#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import subprocess
import sys
import threading
import time
from pathlib import Path
from queue import Queue
from typing import Callable, IO, AnyStr

from p1st.data_units import DataUnitConverter


def execute_consume_chunks(command: list[str],
                           handle_chunks: Callable[[Callable[..., None]], None],
                           ) -> int:
    """
    The `command` is executed in a subprocess.

    `handle_chunks` is executed in a separate thread.
    After `handle_chunks` has saved a new chunk, the path of the saved chunk
    (together with a `last_chunk` flag) is added to a `Queue`.

    For each chunk that has been added to the `Queue`, we
    - read the chunk from disk
    - delete the chunk
    - and write the chunk to the stdin of the subprocess.

    The stdout and stderr of the subprocess are forwarded line by line to
    `sys.stdout` / `sys.stderr` (prefixed with `[STDOUT] ` / `[STDERR] `).
    Both pipes must be drained, otherwise a chatty subprocess fills the pipe
    buffer, blocks, and this function deadlocks.

    NOTE: If `handle_chunks` never queues an item with `last_chunk=True`
    (e.g. because it raises), the stdin of the subprocess is never closed and
    this call blocks.

    :param command: Command and arguments of the subprocess; it reads the
        concatenated chunks from its stdin.
    :param handle_chunks: Has one parameter, `queue_put`.
        `handle_chunks` must call `queue_put((chunk_path, last_chunk))` --
        a single 2-tuple -- for each saved chunk. `last_chunk` must be True
        for the final chunk. `queue_put` blocks while two chunks are
        already waiting.
    :return: The return code of the subprocess.
    """
    with subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True,
    ) as process:
        # At most two saved chunks wait on disk; the producer blocks in
        # `queue_put` until the stdin worker has consumed one.
        q: Queue = Queue(maxsize=2)

        threads = [
            threading.Thread(
                target=_stdin_worker,
                args=(q.get, process.stdin,)),
            threading.Thread(
                target=handle_chunks,
                args=(q.put,)),
            threading.Thread(
                target=_stderr_worker,
                args=(process.stderr,)),
            threading.Thread(
                target=_stdout_worker,
                args=(process.stdout,)),
        ]
        for t in threads:
            t.daemon = True
            t.start()

        returncode: int = process.wait()
        for t in threads:
            t.join()
    # Leaving the `with` block closes the remaining pipe file objects.
    return returncode


def _stdin_worker(queue_get: Callable[[], tuple[Path, bool]],
                  binary_stdin: IO[bytes],
                  ):
    """
    Feeds queued chunks into the stdin of the subprocess.

    Takes `(chunk_path, last_chunk)` tuples from `queue_get` until one with
    `last_chunk=True` has been processed. Each chunk file is read, deleted and
    then written to `binary_stdin`. After every successful write the elapsed
    time, the total transferred amount and the average speed are printed.

    If the subprocess stops reading (`BrokenPipeError`), remaining chunks are
    still taken from the queue and deleted, but no longer written. Otherwise
    the producer would block forever on the full queue.

    `binary_stdin` is always closed on exit (closing also flushes it), so the
    subprocess receives EOF even if this worker fails.
    """
    start_time = time.monotonic()
    transferred_bytes = 0
    pipe_broken = False
    try:
        while True:
            chunk_path: Path
            chunk_path, last_chunk = queue_get()

            if pipe_broken:
                # Nobody reads the data anymore; just free the disk space.
                chunk_path.unlink(missing_ok=False)
            else:
                chunk = _read_chunk(chunk_path)
                chunk_path.unlink(missing_ok=False)
                try:
                    binary_stdin.write(chunk)
                except BrokenPipeError:
                    pipe_broken = True
                    print('Subprocess closed its stdin, '
                          'discarding remaining chunks.', file=sys.stderr)
                else:
                    transferred_bytes += len(chunk)
                    _print_progress(start_time, transferred_bytes)

            if last_chunk:
                break
    finally:
        try:
            binary_stdin.close()
        except BrokenPipeError:
            # Flushing buffered data to an already exited subprocess fails;
            # the file object is closed nevertheless.
            pass


def _print_progress(start_time: float, transferred_bytes: int) -> None:
    """
    Prints the elapsed time since `start_time` (a `time.monotonic()` value),
    the total transferred amount and the average transfer speed.
    """
    # TODO: print total transfer speed and last chunk transfer speed
    elapsed_time = time.monotonic() - start_time
    # Guard against a zero interval on clocks with coarse resolution.
    bytes_per_second = round(transferred_bytes / elapsed_time) if elapsed_time > 0 else 0
    print(f'Elapsed time: {datetime.timedelta(seconds=elapsed_time)}\n'
          f'Transferred: {DataUnitConverter.to_unit_auto_str(transferred_bytes)}\n'
          f'Speed: {DataUnitConverter.to_unit_auto_str(bytes_per_second)}/s')


def _read_chunk(chunk_path: Path) -> bytes:
    """
    Reads a chunk from the given path.

    Raises `FileNotFoundError` if the file does not exist.
    """
    print(f'Reading chunk {chunk_path}')
    return chunk_path.read_bytes()


def _stderr_worker(binary_stderr: IO[bytes]):
    """
    Prints stderr of subprocess to sys.stderr, line by line,
    each line prefixed with `[STDERR] `.
    """
    _forward_lines(binary_stderr, sys.stderr, '[STDERR] ')


def _stdout_worker(binary_stdout: IO[bytes]):
    """
    Prints stdout of subprocess to sys.stdout, line by line,
    each line prefixed with `[STDOUT] `.
    """
    _forward_lines(binary_stdout, sys.stdout, '[STDOUT] ')


def _forward_lines(binary_stream: IO[bytes], text_out: IO[str], prefix: str):
    """
    Copies `binary_stream` line by line to `text_out` until EOF,
    decoding each line as UTF-8 and prefixing it with `prefix`.
    """
    for line in binary_stream:
        # errors='replace': an invalid byte sequence must not kill this
        # thread, otherwise the pipe fills up and the subprocess blocks.
        text_out.write(f'{prefix}{line.decode("UTF-8", errors="replace")}')