#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
import sys
import threading
import subprocess
from typing import AnyStr, IO, Callable

from common import _get_chunk_file


def _rotate_chunk(chunk_file: Path, chunk_number: int, eof: bool,
                  chunk_transfer_fun: Callable, chunk_transfer_args: tuple):
    """
    Hand a saved chunk file to ``chunk_transfer_fun``, then delete the file.

    The file is only removed if ``chunk_transfer_fun`` returns normally; if it
    raises, the exception propagates and the chunk file stays on disk.
    """
    print(f'Transferring chunk {chunk_file}')
    chunk_transfer_fun(chunk_file, chunk_number, eof, *chunk_transfer_args)
    print(f'Removing chunk {chunk_file}')
    chunk_file.unlink(missing_ok=False)


def _save_chunk(chunk: bytes, chunk_file: Path):
    """Write ``chunk`` to the new file ``chunk_file``."""
    print(f'Saving chunk {chunk_file}')
    # Mode 'x' raises FileExistsError if the file already exists, so an
    # existing chunk file is never overwritten.
    with open(chunk_file, 'xb') as f:
        f.write(chunk)


def _save_output_rotating_chunks(out_pipe: IO[bytes],
                                 chunk_file_tmpl: Path,
                                 chunk_size: int,
                                 chunk_transfer_fun: Callable,
                                 chunk_transfer_args: tuple,
                                 get_chunk_file: Callable[[Path, int], Path],
                                 ):
    """
    Read ``out_pipe`` until EOF, saving and transferring it chunk by chunk.

    Every chunk except the last holds exactly ``chunk_size`` bytes and is
    transferred with ``eof=False``. When EOF is reached, the bytes collected
    so far are saved and transferred with ``eof=True``. That final chunk may
    be shorter than ``chunk_size``, and is empty when the output length is a
    multiple of ``chunk_size`` (including empty output).

    :param out_pipe: Binary, blocking stream to read from.
    :param chunk_file_tmpl: Passed to ``get_chunk_file`` to derive chunk paths.
    :param chunk_size: Chunk size in bytes; must be >= 1.
    :param chunk_transfer_fun: See ``execute_print_transfer_chunks``.
    :param chunk_transfer_args: Extra positional args for ``chunk_transfer_fun``.
    :param get_chunk_file: Maps (chunk_file_tmpl, chunk_number) to a path.
    :raises ValueError: If a read returned more bytes than requested.
    """
    chunk_number = 1
    chunk = b''
    while True:
        # https://docs.python.org/3/library/io.html#io.BufferedIOBase.read
        # b'' signals EOF. A short (non-empty) read is simply topped up on the
        # next iteration, because we only ask for the bytes still missing.
        data = out_pipe.read(chunk_size - len(chunk))
        eof = len(data) == 0
        chunk += data
        if len(chunk) > chunk_size:
            raise ValueError('Invalid state')
        if not eof and len(chunk) < chunk_size:
            continue

        chunk_file = get_chunk_file(chunk_file_tmpl, chunk_number)
        chunk_file.parent.mkdir(parents=True, exist_ok=True)
        _save_chunk(chunk, chunk_file)
        _rotate_chunk(chunk_file, chunk_number, eof, chunk_transfer_fun, chunk_transfer_args)
        if eof:
            return
        chunk = b''
        chunk_number += 1


def _print_stderr(bin_pipe: IO[bytes]):
    """Forward every line of ``bin_pipe`` to our stderr, prefixed with '[STDERR] '."""
    for line in bin_pipe:
        # errors='replace': an undecodable byte must not kill this thread,
        # otherwise nobody drains the pipe and the child eventually blocks
        # writing to it.
        sys.stderr.write(f'[STDERR] {line.decode("UTF-8", errors="replace")}')


def _run_or_kill(process: subprocess.Popen, errors: list, target: Callable, *args):
    """
    Run ``target(*args)``; on failure, record the exception and kill ``process``.

    Each target is the only reader of one of the child's pipes. If it stopped
    silently, the child could block forever on a full pipe and
    ``process.wait()`` would never return, so the child is killed instead.
    The recorded exception is re-raised by the caller.
    """
    try:
        target(*args)
    except Exception as exc:
        errors.append(exc)
        try:
            process.kill()
        except ProcessLookupError:
            # The child has already exited and been reaped.
            pass


def execute_print_transfer_chunks(command: list[str],
                                  chunk_file_tmpl: Path,
                                  chunk_transfer_fun: Callable,
                                  chunk_transfer_args: tuple,
                                  chunk_size: int = 1024 * 1024,
                                  get_chunk_file: Callable[[Path, int], Path] = _get_chunk_file,
                                  ) -> int:
    """
    Execute the given command and transfer its stdout chunk by chunk.

    While the command runs:
        - each ``chunk_size`` part of its stdout is saved to
          ``chunk_file = get_chunk_file(chunk_file_tmpl, chunk_number)``
          (chunk numbers start at 1);
        - ``chunk_transfer_fun(chunk_file, chunk_number, eof_reached, *chunk_transfer_args)``
          is called;
            - ``eof_reached`` is True only for the last chunk;
            - the last chunk may be smaller than ``chunk_size``, and is empty
              if the output length is a multiple of ``chunk_size``;
        - ``chunk_file`` is deleted after a successful transfer;
        - the command's stderr is forwarded line by line to our stderr.

    If saving or transferring a chunk (or forwarding stderr) raises, the
    command is killed and that exception is re-raised here once all threads
    have finished.

    :param command: Command to execute, e.g. ['cat', '/some/large/file']
    :param chunk_file_tmpl: Used by get_chunk_file to calculate the paths of the chunk files.
    :param chunk_transfer_fun: Called for each chunk with arguments
        (chunk_file, chunk_number, eof_reached, *chunk_transfer_args)
    :param chunk_transfer_args: Extra positional arguments appended to every
        chunk_transfer_fun call.
    :param chunk_size: Chunk size in bytes, must be >= 1. Defaults to 1MB (1024*1024).
    :param get_chunk_file: Gets chunk_file_tmpl and chunk_number as input and
        returns the path of the corresponding chunk.
    :return: returncode of executed command
    :raises ValueError: If chunk_size < 1.
    """
    if chunk_size < 1:
        raise ValueError(f'chunk_size must be >= 1, got {chunk_size}')

    errors: list[Exception] = []
    # Used as a context manager, Popen closes stdout/stderr and waits for the
    # child on exit.
    with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True,
    ) as process:
        t_out = threading.Thread(
            target=_run_or_kill,
            args=(process, errors, _save_output_rotating_chunks,
                  process.stdout, chunk_file_tmpl, chunk_size,
                  chunk_transfer_fun, chunk_transfer_args, get_chunk_file,
                  ))
        t_err = threading.Thread(
            target=_run_or_kill,
            args=(process, errors, _print_stderr, process.stderr))
        threads = (t_out, t_err)
        for t in threads:
            t.daemon = True
            t.start()

        returncode = process.wait()
        # Both threads stop once their pipe hits EOF, which happens when the
        # child exits (or is killed).
        for t in threads:
            t.join()

    if errors:
        raise errors[0]
    return returncode