subprocess-util/src/p1st/exec_consume_chunks.py

93 lines
2.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import sys
import threading
from pathlib import Path
from queue import Queue
from typing import Callable, IO, AnyStr
def execute_consume_chunks(command: list[str],
                           handle_chunks: Callable[[Callable[..., None]], None],
                           ) -> int:
    """
    Runs `command` as a subprocess and feeds the content of chunk files into its stdin.

    Three worker threads are started:

    * `handle_chunks` announces saved chunks through a bounded queue,
    * `_stdin_worker` takes the announcements from that queue and writes each chunk's bytes to the stdin of the subprocess,
    * `_stderr_worker` forwards the stderr of the subprocess to `sys.stderr`.

    The call blocks until the subprocess has exited and all three threads have finished.

    :param command: Program and arguments, passed unchanged to `subprocess.Popen`.
    :param handle_chunks: Has one parameter, `queue_put`. After a chunk is saved, this callable must call
        `queue_put((chunk_path, last_chunk))`, i.e. put ONE tuple. `chunk_path` is the path of the saved chunk and
        `last_chunk` must be true for the final chunk only. The queue holds at most two items, so `queue_put` blocks
        while two chunks are still waiting to be consumed.
    :return: The return code of the subprocess.
    """
    # Hand-over point between the producer (handle_chunks) and the stdin writer.
    chunk_queue = Queue(maxsize=2)

    # The context manager closes the remaining pipes (stderr) once we are done.
    with subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            # Nothing in this function reads the stdout of the subprocess. With a pipe, the child would block
            # as soon as the OS pipe buffer is full and process.wait() would never return; discard it instead.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            close_fds=True,
    ) as process:
        workers = [
            threading.Thread(
                target=_stdin_worker,
                args=(chunk_queue.get, process.stdin),
                daemon=True,
            ),
            threading.Thread(
                target=handle_chunks,
                args=(chunk_queue.put,),
                daemon=True,
            ),
            threading.Thread(
                target=_stderr_worker,
                args=(process.stderr,),
                daemon=True,
            ),
        ]
        for worker in workers:
            worker.start()

        returncode: int = process.wait()

        # Wait for the workers so that all stderr output has been forwarded before returning.
        for worker in workers:
            worker.join()

    return returncode
def _stdin_worker(queue_get: Callable[[], tuple[Path, bool]],
                  binary_stdin: IO[bytes],
                  ):
    """
    Writes the content of announced chunk files to `binary_stdin` until the last chunk was written.

    :param queue_get: Called without arguments; each call must return one `(chunk_path, last_chunk)` tuple.
        The loop stops after the first tuple whose `last_chunk` is true.
    :param binary_stdin: Binary stream the chunk bytes are written to. It is always closed before this
        function returns or raises.
    """
    try:
        while True:
            chunk_path, last_chunk = queue_get()
            binary_stdin.write(_read_chunk(chunk_path))
            if last_chunk:
                break
        binary_stdin.flush()
    finally:
        # Closing stdin is what signals end-of-input to the reader. It must also happen when fetching or
        # reading a chunk fails, otherwise the reader waits for more input forever.
        binary_stdin.close()
def _read_chunk(chunk_path: Path) -> bytes:
    """
    Returns the complete binary content of the chunk file at `chunk_path`.

    A progress line naming the chunk is printed to stdout before the file is read.
    Opening the file raises if it does not exist.
    """
    print('Reading chunk {}'.format(chunk_path))
    with chunk_path.open('rb') as chunk_file:
        content = chunk_file.read()
    return content
def _stderr_worker(binary_stderr: IO[bytes]):
    """
    Prints stderr of subprocess to sys.stderr.

    Every line read from `binary_stderr` is written to `sys.stderr` with the prefix `[STDERR] `.
    The function returns when the stream is exhausted; it does not close the stream.

    :param binary_stderr: Binary stream that yields lines of bytes when iterated.
    """
    raw_line: bytes
    for raw_line in binary_stderr:
        # Bytes that are not valid UTF-8 are replaced instead of raising. A decode error would end this
        # loop early and leave the rest of the stream unread.
        text = raw_line.decode("UTF-8", errors="replace")
        sys.stderr.write(f'[STDERR] {text}')