subprocess-util/subprocess_util_2.py

108 lines
2.9 KiB
Python

from pathlib import Path
import sys
import threading
import subprocess
from typing import AnyStr, IO
def _save_chunk(chunk: bytes, stdout_dir: Path, ct: int):
print(f"Saving chunk {ct}")
file = stdout_dir.joinpath(str(ct))
file.write_bytes(chunk)
def _save_output(pipe: IO[AnyStr], stdout_dir: Path):
stdout_dir.mkdir(parents=True, exist_ok=False) # TODO
b: bytes
ct: int = 1
for b in pipe:
stdout_dir.joinpath(str(ct)).write_bytes(b)
ct += 1
# TODO: Has this any effect?
# pipe.close()
def _save_output_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size):
stdout_dir.mkdir(parents=True, exist_ok=False) # TODO
# b: bytes
# ct: int = 1
# for b in pipe:
# stdout_dir.joinpath(str(ct)).write_bytes(b)
# ct += 1
ct = 0
remaining_bytes = chunk_size
chunk: bytes = b''
while True:
# https://docs.python.org/3/library/io.html#io.RawIOBase.read
# If 0 bytes are returned, and size was not 0, this indicates end of file.
# If the object is in non-blocking mode and no bytes are available, None is returned.
b = pipe.read(remaining_bytes)
if len(b) == 0:
# EOF reached
_save_chunk(chunk, stdout_dir, ct)
break
chunk += b
chunk_len = len(chunk)
if chunk_len == chunk_size:
_save_chunk(chunk, stdout_dir, ct)
chunk = b''
remaining_bytes = chunk_size
ct += 1
elif chunk_len < chunk_size:
remaining_bytes = chunk_size - chunk_len
else:
raise ValueError("Invalid state")
# TODO: Has this any effect?
# pipe.close()
def _print_output(pipe: IO[AnyStr]):
line: str
for line in pipe:
sys.stderr.write(f'[STDERR] {line}')
# TODO: Has this any effect?
# pipe.close()
# Goal: We want to save the stdout to small files and print stderr while running the command.
def execute_print_capture_bin(command: list[str], stdout_dir: Path, chunk_size = 1024 * 1024) -> int:
"""
Executes the given command saving its stdout to stdout_dir.
Stderr is printed in real time.
:param chunk_size: Defaults to 1MB (1024*1024).
:param stdout_dir: Directory where stdout is saved to.
:param command: Command to execute, e.g. ['ls', '-la', '/home']
:return: returncode
"""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
)
t_out = threading.Thread(
target=_save_output_chunks, args=(process.stdout, stdout_dir, chunk_size))
t_err = threading.Thread(
target=_print_output, args=(process.stderr,))
for t in (t_out, t_err):
t.daemon = True
t.start()
returncode = process.wait()
for t in (t_out, t_err):
t.join()
return returncode