diff --git a/exec_print_receive.py b/exec_print_receive.py index da7187f..417fe35 100644 --- a/exec_print_receive.py +++ b/exec_print_receive.py @@ -4,8 +4,9 @@ import subprocess import sys import threading from pathlib import Path -from typing import IO, AnyStr +from typing import IO, AnyStr, Callable +from get_chunk_file import _get_chunk_file from receive_inform import receive_inform @@ -29,7 +30,9 @@ def _print_stderr(bin_pipe: IO[AnyStr]): def execute_print_receive_chunks(command: list[str], socket_file: Path, - chunk_file_tmpl: Path) -> int: + chunk_file_tmpl: Path, + get_chunk_file: Callable[[Path, int], Path] = _get_chunk_file, + ) -> int: process = subprocess.Popen( command, stdin=subprocess.PIPE, @@ -63,7 +66,7 @@ def execute_print_receive_chunks(command: list[str], t.daemon = True t.start() - receive_inform(process.stdin, socket_file, chunk_file_tmpl) + receive_inform(process.stdin, socket_file, chunk_file_tmpl, get_chunk_file) returncode = process.wait() for t in (t_out, t_err): diff --git a/exec_print_transfer.py b/exec_print_transfer.py index c07236d..24189c0 100644 --- a/exec_print_transfer.py +++ b/exec_print_transfer.py @@ -6,16 +6,12 @@ import threading import subprocess from typing import AnyStr, IO, Callable - -def _chunk_transfer(chunk_file: Path, eof: bool): - print(f'Transferring chunk {chunk_file} to ... (This is the default method, it has no effect)') - if eof: - print(f'The last chunk has been transferred.') +from get_chunk_file import _get_chunk_file -def _rotate_chunk(chunk_file: Path, eof: bool, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): +def _rotate_chunk(chunk_file: Path, chunk_number: int, eof: bool, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): print(f'Transferring chunk {chunk_file}') - chunk_transfer_fun(chunk_file, eof, *chunk_transfer_args) + chunk_transfer_fun(chunk_file, chunk_number, eof, *chunk_transfer_args) print(f'Removing chunk {chunk_file}') chunk_file.unlink(missing_ok=False) @@ -28,21 +24,26 @@ def _save_chunk(chunk: bytes, chunk_file: Path): f.write(chunk) -def _save_output(out_pipe: IO[AnyStr], stdout_dir: Path): - stdout_dir.mkdir(parents=True, exist_ok=False) - - b: bytes - ct: int = 1 - for b in out_pipe: - stdout_dir.joinpath(str(ct)).write_bytes(b) - ct += 1 - - # TODO: Has this any effect? - # out_pipe.close() +# def _save_output(out_pipe: IO[AnyStr], stdout_dir: Path): +# stdout_dir.mkdir(parents=True, exist_ok=False) +# +# b: bytes +# ct: int = 1 +# for b in out_pipe: +# stdout_dir.joinpath(str(ct)).write_bytes(b) +# ct += 1 +# +# # TODO: Has this any effect? +# # out_pipe.close() -def _save_output_rotating_chunks(out_pipe: IO[AnyStr], chunk_file_tmpl: Path, chunk_size, - chunk_transfer_fun: Callable, chunk_transfer_args: tuple): +def _save_output_rotating_chunks(out_pipe: IO[AnyStr], + chunk_file_tmpl: Path, + chunk_size, + chunk_transfer_fun: Callable, + chunk_transfer_args: tuple, + get_chunk_file: Callable[[Path, int], Path], + ): chunk_file_tmpl.parent.mkdir(parents=True, exist_ok=True) ct: int = 1 @@ -55,10 +56,10 @@ def _save_output_rotating_chunks(out_pipe: IO[AnyStr], chunk_file_tmpl: Path, ch b = out_pipe.read(remaining_bytes) if len(b) == 0: # EOF reached. - chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') + chunk_file = get_chunk_file(chunk_file_tmpl, ct) _save_chunk(chunk, chunk_file) - _rotate_chunk(chunk_file, True, chunk_transfer_fun, chunk_transfer_args) + _rotate_chunk(chunk_file, ct, True, chunk_transfer_fun, chunk_transfer_args) break chunk += b @@ -66,10 +67,10 @@ def _save_output_rotating_chunks(out_pipe: IO[AnyStr], chunk_file_tmpl: Path, ch chunk_len = len(chunk) if chunk_len == chunk_size: # Next chunk is full. - chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') + chunk_file = get_chunk_file(chunk_file_tmpl, ct) _save_chunk(chunk, chunk_file) - _rotate_chunk(chunk_file, False, chunk_transfer_fun, chunk_transfer_args) + _rotate_chunk(chunk_file, ct, False, chunk_transfer_fun, chunk_transfer_args) chunk = b'' remaining_bytes = chunk_size @@ -94,9 +95,10 @@ def _print_stderr(bin_pipe: IO[AnyStr]): def execute_print_transfer_chunks(command: list[str], chunk_file_tmpl: Path, - chunk_transfer_fun: Callable = _chunk_transfer, - chunk_transfer_args: tuple = None, - chunk_size=1024 * 1024, + chunk_transfer_fun: Callable, + chunk_transfer_args: tuple, + chunk_size: int = 1024 * 1024, + get_chunk_file: Callable[[Path, int], Path] = _get_chunk_file, ) -> int: """ Executes the given command. @@ -104,7 +106,7 @@ def execute_print_transfer_chunks(command: list[str], Until the command has finished: - saves a small part of the commands stdout to chunk_file = f'{chunk_file_tmpl.name}.{chunk_number}' - - calls chunk_transfer_fun with arguments (chunk_file, eof_reached, *chunk_transfer args) + - calls chunk_transfer_fun with arguments (chunk_file, chunk_number, eof_reached, *chunk_transfer_args) - eof_reached is True for the last chunk - the last chunk may be smaller than chunk_size - deletes chunk_file @@ -113,14 +115,12 @@ def execute_print_transfer_chunks(command: list[str], :param chunk_file_tmpl: Chunks are saved as this file before they are transferred. :param command: Command to execute, e.g. ['cat', '/some/large/file'] - :param chunk_transfer_fun: + :param chunk_transfer_fun: Called for each chunk with arguments (chunk_file, chunk_number, eof_reached, *chunk_transfer_args) :param chunk_transfer_args: :param chunk_size: Defaults to 1MB (1024*1024). + :param get_chunk_file: Gets chunk_file_tmpl and chunk_number as input and returns the path of the corresponding chunk. :return: returncode of executed command """ - if chunk_transfer_args is None: - chunk_transfer_args = tuple() - process = subprocess.Popen( command, stdout=subprocess.PIPE, @@ -130,7 +130,13 @@ def execute_print_transfer_chunks(command: list[str], t_out = threading.Thread( target=_save_output_rotating_chunks, - args=(process.stdout, chunk_file_tmpl, chunk_size, chunk_transfer_fun, chunk_transfer_args)) + args=(process.stdout, + chunk_file_tmpl, + chunk_size, + chunk_transfer_fun, + chunk_transfer_args, + get_chunk_file, + )) t_err = threading.Thread( target=_print_stderr, args=(process.stderr,)) diff --git a/get_chunk_file.py b/get_chunk_file.py new file mode 100644 index 0000000..827362c --- /dev/null +++ b/get_chunk_file.py @@ -0,0 +1,5 @@ +from pathlib import Path + + +def _get_chunk_file(chunk_file_tmpl: Path, ct: int): + return chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') diff --git a/receive_inform.py b/receive_inform.py index e7c506b..ceba041 100644 --- a/receive_inform.py +++ b/receive_inform.py @@ -2,18 +2,20 @@ # -*- coding: utf-8 -*- import socket from pathlib import Path -from typing import IO, AnyStr +from typing import IO, AnyStr, Callable from unix_sock_input import accept_loop_until_command_received def receive_inform(in_pipe: IO[AnyStr], socket_file: Path, - chunk_file_tmpl: Path) -> None: + chunk_file_tmpl: Path, + get_chunk_file: Callable[[Path, int], Path], + ) -> None: """ + :param get_chunk_file: :param in_pipe: :param chunk_file_tmpl: - :param cmd: :param socket_file: Create a UNIX socket and wait for messages. :return: """ @@ -29,7 +31,7 @@ def receive_inform(in_pipe: IO[AnyStr], if command not in commands: raise ValueError("Invalid state") - chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') + chunk_file = get_chunk_file(chunk_file_tmpl, ct) chunk = chunk_file.read_bytes() in_pipe.write(chunk) # in_pipe.flush() # TODO: is this required? diff --git a/test.py b/test.py index bcd2d8e..ac0f106 100644 --- a/test.py +++ b/test.py @@ -43,11 +43,17 @@ def test2(): def test3(): _init(3) - returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/3-1')) + def _chunk_transfer(chunk_file: Path, eof: bool): + print(f'Transferring chunk {chunk_file} to ... (This is the default method, it has no effect)') + if eof: + print(f'The last chunk has been transferred.') + + returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/3-1'), _chunk_transfer, ()) print("TEST 3-2") - returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/3-2')) + returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/3-2'), _chunk_transfer, ()) print("TEST 3-3") returncode = execute_print_transfer_chunks(['cat', 'transfer_inform.py'], Path('test/3-3'), + _chunk_transfer, (), chunk_size=1024) @@ -88,6 +94,7 @@ def test5(): def _test5_chunk_transfer_fun(chunk_file: Path, + chunk_number: int, eof: bool, concat_script: Path, remote_target_file: Path): @@ -119,7 +126,7 @@ def test6(): source_file = Path('transfer_inform.py') # A python script file with some content to copy ;) chunk_file_tmpl = Path('test/6-transfer_inform.py') ssh_target = 'pine-pwdless' - target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/6-transfer_inform.py') + target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/7-transfer_inform.py') execute_print_transfer_chunks( command=['cat', str(source_file)], @@ -131,12 +138,13 @@ def test6(): def _test6_chunk_transfer_fun(source_chunk: Path, + chunk_number: int, eof: bool, chunk_file_tmpl: Path, ssh_target: str, target_file_tmpl: Path, ): - target_chunk = target_file_tmpl.parent.joinpath(f'{source_chunk.name}') + target_chunk = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.{chunk_number}') rsync_cmd = ['rsync', str(source_chunk), f'{ssh_target}:{str(target_chunk)}'] message = 'EOF' if eof else 'OK' target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET') @@ -152,7 +160,7 @@ def _test6_chunk_transfer_fun(source_chunk: Path, def test7(): _init(7) - target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/6-transfer_inform.py') + target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/7-transfer_inform.py') target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET') execute_print_receive_chunks(