subprocess-util/src/subprocess_util/exec_consume_chunks.py

123 lines
3.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2023-01-12 21:06:52 +01:00
import datetime
import subprocess
import sys
import threading
2023-01-12 21:06:52 +01:00
import time
from pathlib import Path
from queue import Queue
from typing import Callable, IO, AnyStr
from subprocess_util.data_units import DataUnitConverter
2023-01-12 21:06:52 +01:00
def execute_consume_chunks(command: list[str],
                           handle_chunks: Callable[[Queue.put], None],
                           ) -> int:
    """
    Execute `command` in a subprocess and feed saved chunks to its stdin.

    `handle_chunks` is executed in a separate thread.
    After `handle_chunks` has saved a new chunk,
    the path of the saved chunk is added to a `Queue`.

    For each chunk that has been added to the `Queue`, we

    - read the chunk from disk
    - write the chunk to the stdin of the subprocess
    - and delete the chunk

    The stderr of the subprocess is forwarded to `sys.stderr` by a third
    thread. The stdout of the subprocess is discarded.

    :param command: The command (program and arguments) to execute.
    :param handle_chunks: Has one parameter, `queue_put`.
        `handle_chunks` must call `queue_put((chunk_path, last_chunk))`
        for each saved chunk, with `last_chunk` being true for the final one.
    :return: The return code of the subprocess.
    """
    process = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        # Nothing in this function reads the stdout of the subprocess.
        # With a pipe, the child would block as soon as the OS pipe buffer
        # is full and `process.wait()` below would never return,
        # so the output is discarded explicitly instead.
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        close_fds=True,
    )

    # Bounded queue: `queue_put` blocks while two chunks are still waiting
    # to be consumed, which limits how far the producer can run ahead.
    q = Queue(maxsize=2)

    threads = [
        # Consumer: takes chunk paths from the queue and writes the chunks to stdin.
        threading.Thread(
            target=_stdin_worker,
            args=(q.get,
                  process.stdin,
                  )),
        # Producer: saves chunks and announces them via `q.put`.
        threading.Thread(
            target=handle_chunks,
            args=(q.put,
                  )),
        # Forwards the stderr of the subprocess.
        threading.Thread(
            target=_stderr_worker,
            args=(process.stderr,
                  )),
    ]

    for t in threads:
        t.daemon = True
        t.start()

    returncode: int = process.wait()

    # NOTE(review): if the subprocess exits before the last chunk has been
    # consumed, the producer may stay blocked in `queue_put` and this join
    # would not return - confirm whether callers need that case handled.
    for t in threads:
        t.join()

    return returncode
def _stdin_worker(queue_get: Queue.get,
                  binary_stdin: IO[AnyStr],
                  ):
    """
    Writes queued chunks to the stdin of the subprocess.

    Each queue entry is a tuple `(chunk_path, last_chunk)`. The chunk is read
    from `chunk_path`, the file is deleted, and the content is written to
    `binary_stdin`. After each chunk, the elapsed time, the transferred amount
    and the average speed are printed. The loop ends after the entry whose
    `last_chunk` is true.

    `binary_stdin` is always closed on exit, also when an error occurs, so
    that the subprocess sees EOF instead of waiting for more input forever.

    :param queue_get: Blocking getter returning `(chunk_path, last_chunk)`.
    :param binary_stdin: Binary stream connected to the stdin of the subprocess.
    """
    # Monotonic clock: not affected by adjustments of the system time.
    start_time = time.monotonic()
    transferred_bytes = 0

    try:
        while True:
            chunk_path: Path
            chunk_path, last_chunk = queue_get()
            chunk = _read_chunk(chunk_path)
            # The chunk is already in memory; the file is removed before writing.
            chunk_path.unlink(missing_ok=False)
            binary_stdin.write(chunk)
            # TODO: is a flush after each chunk required?

            # TODO: print total transfer speed and last chunk transfer speed
            elapsed_time = time.monotonic() - start_time
            transferred_bytes += len(chunk)
            # The clock may not have advanced yet (coarse timer resolution);
            # the speed is undefined then, so 0 is reported instead of
            # failing with a ZeroDivisionError.
            if elapsed_time > 0:
                bytes_per_second = round(transferred_bytes / elapsed_time)
            else:
                bytes_per_second = 0
            print(f'Elapsed time: {datetime.timedelta(seconds=elapsed_time)}\n'
                  f'Transferred: {DataUnitConverter.to_unit_auto_str(transferred_bytes)}\n'
                  f'Speed: {DataUnitConverter.to_unit_auto_str(bytes_per_second)}/s')

            if last_chunk:
                break

        binary_stdin.flush()
    finally:
        # Closing stdin signals EOF to the subprocess. Without this, an error
        # above would leave the subprocess (and the caller waiting for it) hanging.
        binary_stdin.close()
def _read_chunk(chunk_path: Path) -> bytes:
"""
Reads a chunk from the given path.
"""
print(f'Reading chunk {chunk_path}')
# Fails if file does not exist.
return chunk_path.read_bytes()
def _stderr_worker(binary_stderr: IO[AnyStr]):
"""
Prints stderr of subprocess to sys.stderr.
"""
b: bytes
for b in binary_stderr:
sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}')
# TODO: Has this any effect?
# binary_stderr.close()