simplify taskman, and add .run_on_main()

This commit is contained in:
Damien Elmes 2020-01-22 14:09:51 +10:00
parent 8236f800ae
commit be3393fcb4
3 changed files with 29 additions and 41 deletions

View File

@ -933,7 +933,7 @@ class DownloaderInstaller(QObject):
self.on_done = on_done self.on_done = on_done
self.mgr.mw.progress.start(immediate=True, parent=self.parent()) self.mgr.mw.progress.start(immediate=True, parent=self.parent())
self.mgr.mw.taskman.run(self._download_all, self._download_done) self.mgr.mw.taskman.run_in_background(self._download_all, self._download_done)
def _progress_callback(self, up: int, down: int) -> None: def _progress_callback(self, up: int, down: int) -> None:
self.dl_bytes += down self.dl_bytes += down
@ -1063,7 +1063,7 @@ def check_for_updates(
on_done(client, result) on_done(client, result)
mgr.mw.taskman.run(check, update_info_received) mgr.mw.taskman.run_in_background(check, update_info_received)
def handle_update_info( def handle_update_info(

View File

@ -224,7 +224,7 @@ class SimpleProcessPlayer(Player): # pylint: disable=abstract-method
self._process: Optional[subprocess.Popen] = None self._process: Optional[subprocess.Popen] = None
def play(self, tag: AVTag, on_done: OnDoneCallback) -> None: def play(self, tag: AVTag, on_done: OnDoneCallback) -> None:
self._taskman.run( self._taskman.run_in_background(
lambda: self._play(tag), lambda res: self._on_done(res, on_done) lambda: self._play(tag), lambda res: self._on_done(res, on_done)
) )

View File

@ -7,39 +7,40 @@ Helper for running tasks on background threads.
from concurrent.futures import Future from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from dataclasses import dataclass
from threading import Lock from threading import Lock
from typing import Any, Callable, Dict, List, Optional from typing import Any, Callable, Dict, List, Optional
from PyQt5.QtCore import QObject, pyqtSignal from PyQt5.QtCore import QObject, pyqtSignal
Closure = Callable[[], None]
@dataclass
class PendingDoneCallback:
callback: Callable[[Any], None]
future: Future
class TaskManager(QObject): class TaskManager(QObject):
_results_available = pyqtSignal() _closures_pending = pyqtSignal()
def __init__(self): def __init__(self):
QObject.__init__(self) QObject.__init__(self)
self._executor = ThreadPoolExecutor() self._executor = ThreadPoolExecutor()
self._closures: List[Closure] = []
self._closures_lock = Lock()
self._closures_pending.connect(self._on_closures_pending) # type: ignore
self._pending_callbacks: List[PendingDoneCallback] = [] def run_on_main(self, closure: Closure):
"Run the provided closure on the main thread."
with self._closures_lock:
self._closures.append(closure)
self._closures_pending.emit() # type: ignore
self._results_lock = Lock() def run_in_background(
self._results_available.connect(self._drain_results) # type: ignore
def run(
self, self,
task: Callable, task: Callable,
on_done: Optional[Callable], on_done: Optional[Callable[[Future], None]] = None,
args: Optional[Dict[str, Any]] = None, args: Optional[Dict[str, Any]] = None,
) -> Future: ) -> Future:
"""Run task on a background thread, calling on_done on the main thread if provided. """Run task on a background thread.
If on_done is provided, it will be called on the main thread with
the completed future.
Args if provided will be passed on as keyword arguments to the task callable.""" Args if provided will be passed on as keyword arguments to the task callable."""
if args is None: if args is None:
@ -48,30 +49,17 @@ class TaskManager(QObject):
fut = self._executor.submit(task, **args) fut = self._executor.submit(task, **args)
if on_done is not None: if on_done is not None:
fut.add_done_callback(
def done_closure(completed_future: Future) -> None: lambda future: self.run_on_main(lambda: on_done(future))
self._handle_done_callback(completed_future, on_done) )
fut.add_done_callback(done_closure)
return fut return fut
def _handle_done_callback(self, future: Future, callback: Callable) -> None: def _on_closures_pending(self):
"""When future completes, schedule its callback to run on the main thread.""" """Run any pending closures. This runs in the main thread."""
# add result to the queue with self._closures_lock:
with self._results_lock: closures = self._closures
self._pending_callbacks.append( self._closures = []
PendingDoneCallback(callback=callback, future=future)
)
# and tell the main thread to flush the queue for closure in closures:
self._results_available.emit() # type: ignore closure()
def _drain_results(self):
"""Fires pending callbacks in the main thread."""
with self._results_lock:
results = self._pending_callbacks
self._pending_callbacks = []
for result in results:
result.callback(result.future)