From be3393fcb4f7c5b0e3bd8f937c71848d42163d06 Mon Sep 17 00:00:00 2001 From: Damien Elmes Date: Wed, 22 Jan 2020 14:09:51 +1000 Subject: [PATCH] simplify taskman, and add .run_on_main() --- qt/aqt/addons.py | 4 +-- qt/aqt/sound.py | 2 +- qt/aqt/taskman.py | 64 +++++++++++++++++++---------------------------- 3 files changed, 29 insertions(+), 41 deletions(-) diff --git a/qt/aqt/addons.py b/qt/aqt/addons.py index de207d0ba..334450632 100644 --- a/qt/aqt/addons.py +++ b/qt/aqt/addons.py @@ -933,7 +933,7 @@ class DownloaderInstaller(QObject): self.on_done = on_done self.mgr.mw.progress.start(immediate=True, parent=self.parent()) - self.mgr.mw.taskman.run(self._download_all, self._download_done) + self.mgr.mw.taskman.run_in_background(self._download_all, self._download_done) def _progress_callback(self, up: int, down: int) -> None: self.dl_bytes += down @@ -1063,7 +1063,7 @@ def check_for_updates( on_done(client, result) - mgr.mw.taskman.run(check, update_info_received) + mgr.mw.taskman.run_in_background(check, update_info_received) def handle_update_info( diff --git a/qt/aqt/sound.py b/qt/aqt/sound.py index 1d71c6589..50eb9ee52 100644 --- a/qt/aqt/sound.py +++ b/qt/aqt/sound.py @@ -224,7 +224,7 @@ class SimpleProcessPlayer(Player): # pylint: disable=abstract-method self._process: Optional[subprocess.Popen] = None def play(self, tag: AVTag, on_done: OnDoneCallback) -> None: - self._taskman.run( + self._taskman.run_in_background( lambda: self._play(tag), lambda res: self._on_done(res, on_done) ) diff --git a/qt/aqt/taskman.py b/qt/aqt/taskman.py index 34325de51..be305b7b5 100644 --- a/qt/aqt/taskman.py +++ b/qt/aqt/taskman.py @@ -7,39 +7,40 @@ Helper for running tasks on background threads. from concurrent.futures import Future from concurrent.futures.thread import ThreadPoolExecutor -from dataclasses import dataclass from threading import Lock from typing import Any, Callable, Dict, List, Optional from PyQt5.QtCore import QObject, pyqtSignal - -@dataclass -class PendingDoneCallback: - callback: Callable[[Any], None] - future: Future +Closure = Callable[[], None] class TaskManager(QObject): - _results_available = pyqtSignal() + _closures_pending = pyqtSignal() def __init__(self): QObject.__init__(self) - self._executor = ThreadPoolExecutor() + self._closures: List[Closure] = [] + self._closures_lock = Lock() + self._closures_pending.connect(self._on_closures_pending) # type: ignore - self._pending_callbacks: List[PendingDoneCallback] = [] + def run_on_main(self, closure: Closure): + "Run the provided closure on the main thread." + with self._closures_lock: + self._closures.append(closure) + self._closures_pending.emit() # type: ignore - self._results_lock = Lock() - self._results_available.connect(self._drain_results) # type: ignore - - def run( + def run_in_background( self, task: Callable, - on_done: Optional[Callable], + on_done: Optional[Callable[[Future], None]] = None, args: Optional[Dict[str, Any]] = None, ) -> Future: - """Run task on a background thread, calling on_done on the main thread if provided. + """Run task on a background thread. + + If on_done is provided, it will be called on the main thread with + the completed future. Args if provided will be passed on as keyword arguments to the task callable.""" if args is None: @@ -48,30 +49,17 @@ class TaskManager(QObject): fut = self._executor.submit(task, **args) if on_done is not None: - - def done_closure(completed_future: Future) -> None: - self._handle_done_callback(completed_future, on_done) - - fut.add_done_callback(done_closure) + fut.add_done_callback( + lambda future: self.run_on_main(lambda: on_done(future)) + ) return fut - def _handle_done_callback(self, future: Future, callback: Callable) -> None: - """When future completes, schedule its callback to run on the main thread.""" - # add result to the queue - with self._results_lock: - self._pending_callbacks.append( - PendingDoneCallback(callback=callback, future=future) - ) + def _on_closures_pending(self): + """Run any pending closures. This runs in the main thread.""" + with self._closures_lock: + closures = self._closures + self._closures = [] - # and tell the main thread to flush the queue - self._results_available.emit() # type: ignore - - def _drain_results(self): - """Fires pending callbacks in the main thread.""" - with self._results_lock: - results = self._pending_callbacks - self._pending_callbacks = [] - - for result in results: - result.callback(result.future) + for closure in closures: + closure()