from pathlib import Path
import sys
import threading
import subprocess
from typing import AnyStr, IO, Callable


def _chunk_transfer(stdout_dir: Path):
    """Default chunk-transfer hook; currently only a placeholder.

    It merely prints which directory would be transferred. The planned
    behavior is listed in the TODO below.

    :param stdout_dir: Directory containing the chunk files to transfer.
    """
    print(f'Transferring chunks from {stdout_dir} to ...')
    # TODO
    # - rsync to remote pc
    #   - catch error
    #   - wait until user input, then repeat
    # - inform remote pc about complete rsync
    #   - catch error
    #   - wait until user input, then repeat


def _rotate_chunks(stdout_dir: Path, ct: int, chunk_transfer_fun: Callable, chunk_transfer_args: tuple):
    """Transfer the chunk files ``0..ct`` and then delete them.

    :param stdout_dir: Directory holding the chunk files (named by their index).
    :param ct: Highest chunk index to delete (inclusive).
    :param chunk_transfer_fun: Called as ``chunk_transfer_fun(*chunk_transfer_args)``
        before any file is deleted.
    :param chunk_transfer_args: Positional arguments for ``chunk_transfer_fun``.
    :raises FileNotFoundError: If one of the chunk files ``0..ct`` is missing.
    """
    chunk_transfer_fun(*chunk_transfer_args)
    print('Rotating chunks ...')
    for i in range(ct + 1):
        # missing_ok=False: a missing chunk means the bookkeeping is wrong,
        # so fail loudly instead of silently skipping it.
        stdout_dir.joinpath(str(i)).unlink(missing_ok=False)


def _save_chunk(chunk: bytes, stdout_dir: Path, ct: int):
    """Write *chunk* to the file ``stdout_dir/<ct>``.

    :raises FileExistsError: If that file already exists. The file is opened
        with mode ``'xb'``, so an existing chunk is never overwritten.
    """
    print(f'Saving chunk {ct}')
    file = stdout_dir.joinpath(str(ct))
    with open(file, 'xb') as f:
        f.write(chunk)


def _save_output(pipe: IO[AnyStr], stdout_dir: Path):
    """Save every line read from *pipe* into its own file in *stdout_dir*.

    Files are named ``1``, ``2``, ... in reading order. Lines are written with
    ``Path.write_bytes``, so *pipe* is expected to yield ``bytes``.

    :raises FileExistsError: If *stdout_dir* already exists.
    """
    stdout_dir.mkdir(parents=True, exist_ok=False)
    for ct, line in enumerate(pipe, start=1):
        stdout_dir.joinpath(str(ct)).write_bytes(line)
    # The pipe is deliberately left open: it is owned by whoever created it
    # (e.g. a Popen object), which is responsible for closing it.


def _save_output_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size):
    """Split everything read from *pipe* into files of *chunk_size* bytes.

    Chunks are written to ``stdout_dir/0``, ``stdout_dir/1``, ...; only the
    last one may be shorter than *chunk_size*. Empty chunks are never written,
    so output whose length is a multiple of *chunk_size* (including empty
    output) does not leave a trailing empty file.

    :param pipe: Binary stream to read until EOF.
    :param stdout_dir: Directory to create and fill with chunk files.
    :param chunk_size: Size of each chunk in bytes; must be positive.
    :raises ValueError: If *chunk_size* is not positive.
    :raises FileExistsError: If *stdout_dir* already exists.
    """
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be positive, got {chunk_size}')
    stdout_dir.mkdir(parents=True, exist_ok=False)

    ct = 0
    chunk = b''
    while True:
        # https://docs.python.org/3/library/io.html#io.RawIOBase.read
        # A raw read may return fewer bytes than requested, so keep reading
        # until the chunk is full. b'' signals EOF. None (non-blocking stream
        # without data) is not supported and fails in len() below.
        b = pipe.read(chunk_size - len(chunk))
        if len(b) == 0:
            break
        chunk += b
        if len(chunk) == chunk_size:
            _save_chunk(chunk, stdout_dir, ct)
            chunk = b''
            ct += 1
        elif len(chunk) > chunk_size:
            raise ValueError('Invalid state')

    # Flush the final partial chunk, if any.
    if chunk:
        _save_chunk(chunk, stdout_dir, ct)
    # The pipe is deliberately left open: it is owned by whoever created it
    # (e.g. a Popen object), which is responsible for closing it.
def _save_output_rotating_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size,
                                 chunk_transfer_fun: Callable, chunk_transfer_args: tuple,
                                 chunks_per_transfer: int = 2):
    """Read *pipe* in chunks and hand them off in groups, deleting them afterwards.

    Each full chunk of *chunk_size* bytes is written to ``stdout_dir/<i>``
    (``i`` in ``0..chunks_per_transfer-1``). Each time that many chunks have
    been written, ``_rotate_chunks`` is invoked. It calls
    ``chunk_transfer_fun(*chunk_transfer_args)`` and then deletes the files so
    the indices can be reused. At EOF the pending chunks, including a final
    partial one, are handed off the same way. Empty chunks are never written
    or transferred.

    :param pipe: Binary stream to read until EOF.
    :param stdout_dir: Directory to create and use for the chunk files.
    :param chunk_size: Size of each chunk in bytes; must be positive.
    :param chunk_transfer_fun: Transfer hook called before chunks are deleted.
    :param chunk_transfer_args: Positional arguments for the transfer hook.
    :param chunks_per_transfer: Number of chunks per transfer (default 2).
    :raises ValueError: If *chunk_size* or *chunks_per_transfer* is not positive.
    :raises FileExistsError: If *stdout_dir* already exists.
    """
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be positive, got {chunk_size}')
    if chunks_per_transfer < 1:
        raise ValueError(f'chunks_per_transfer must be >= 1, got {chunks_per_transfer}')
    stdout_dir.mkdir(parents=True, exist_ok=False)

    ct = 0  # index of the next chunk file to write
    chunk = b''
    while True:
        # https://docs.python.org/3/library/io.html#io.RawIOBase.read
        # A raw read may return fewer bytes than requested, so keep reading
        # until the chunk is full. b'' signals EOF. None (non-blocking stream
        # without data) is not supported and fails in len() below.
        b = pipe.read(chunk_size - len(chunk))
        if len(b) == 0:
            # EOF: save the final partial chunk (if any), then hand off every
            # chunk written since the last rotation (indices 0..ct-1).
            if chunk:
                _save_chunk(chunk, stdout_dir, ct)
                ct += 1
            if ct > 0:
                _rotate_chunks(stdout_dir, ct - 1, chunk_transfer_fun, chunk_transfer_args)
            break
        chunk += b
        if len(chunk) == chunk_size:
            _save_chunk(chunk, stdout_dir, ct)
            chunk = b''
            ct = (ct + 1) % chunks_per_transfer
            if ct == 0:
                # A full group has been written: transfer and delete it.
                _rotate_chunks(stdout_dir, chunks_per_transfer - 1,
                               chunk_transfer_fun, chunk_transfer_args)
        elif len(chunk) > chunk_size:
            raise ValueError('Invalid state')


def _print_output(pipe: IO[AnyStr]):
    """Echo every line read from *pipe* to stderr, prefixed with ``[STDERR] ``.

    Popen pipes opened without ``text=True`` yield ``bytes``. Those lines are
    decoded as UTF-8, with undecodable bytes replaced, so the text itself is
    printed instead of a ``b'...'`` repr. A final line without a trailing
    newline gets one, so later output starts on a new line.
    """
    for line in pipe:
        if isinstance(line, bytes):
            line = line.decode('utf-8', errors='replace')
        if not line.endswith('\n'):
            line += '\n'
        sys.stderr.write(f'[STDERR] {line}')


def _run_and_kill_on_error(process: subprocess.Popen, errors: list, target: Callable, *args):
    """Run ``target(*args)``; on failure record the exception and kill *process*.

    A failing pipe consumer stops draining its pipe. The child could then block
    forever on a full pipe, and ``process.wait()`` in the caller would never
    return. Killing the child avoids that deadlock. The exception is stored in
    *errors* so the caller can re-raise it in the main thread.
    """
    try:
        target(*args)
    except Exception as exc:
        errors.append(exc)
        try:
            process.kill()
        except OSError:
            # The child has already exited; nothing left to kill.
            pass


# Goal: We want to save the stdout to small files and print stderr while running the command.
def execute_print_transfer_chunks(command: list[str],
                                  stdout_dir: Path,
                                  chunk_transfer_fun: Callable = _chunk_transfer,
                                  chunk_transfer_args: tuple = None,
                                  chunk_size=1024 * 1024,
                                  ) -> int:
    """
    Executes the given command saving its stdout to stdout_dir.
    Stderr is printed in real time.

    :param command: Command to execute, e.g. ['ls', '-la', '/home']
    :param stdout_dir: Directory where stdout chunks are saved to. It must not
        exist yet; it is created by the stdout reader thread.
    :param chunk_transfer_fun: Called as ``chunk_transfer_fun(*chunk_transfer_args)``
        whenever a group of chunks is ready. The chunk files are deleted
        afterwards.
    :param chunk_transfer_args: Arguments for ``chunk_transfer_fun``.
        Defaults to ``(stdout_dir,)``.
    :param chunk_size: Chunk size in bytes; must be positive. Defaults to 1 MiB (1024*1024).
    :return: returncode
    :raises ValueError: If *chunk_size* is not positive.
    :raises Exception: Any exception raised while consuming stdout/stderr
        (e.g. FileExistsError if *stdout_dir* exists). The child is killed and
        the exception is re-raised here once all threads have finished.
    """
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be positive, got {chunk_size}')
    if chunk_transfer_args is None:
        chunk_transfer_args = (stdout_dir,)

    errors: list = []
    # Using Popen as a context manager closes both pipes and waits for the
    # child on exit, so the pipe file descriptors are not leaked.
    with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True,
    ) as process:
        t_out = threading.Thread(
            target=_run_and_kill_on_error,
            args=(process, errors, _save_output_rotating_chunks,
                  process.stdout, stdout_dir, chunk_size, chunk_transfer_fun, chunk_transfer_args))
        t_err = threading.Thread(
            target=_run_and_kill_on_error,
            args=(process, errors, _print_output, process.stderr))

        for t in (t_out, t_err):
            t.daemon = True
            t.start()

        returncode = process.wait()

        for t in (t_out, t_err):
            t.join()

    if errors:
        raise errors[0]
    return returncode