From 4fe94d380078c4bcd76c3463b8d7f5a9e8c498ec Mon Sep 17 00:00:00 2001 From: Daniel Langbein Date: Tue, 10 Jan 2023 22:13:56 +0100 Subject: [PATCH] add receive_inform --- exec_print_transfer.py | 41 ++++++++++++++-------- receive_inform.py | 53 ++++++++++++++++++++++++++++ rsync_inform.py | 80 ------------------------------------------ test.py | 71 ++++++++++++++++++++++--------------- transfer_inform.py | 52 +++++++++++++++++++++++++++ unix_sock_input.py | 59 +++++++++++++++++++++++++++++++ 6 files changed, 234 insertions(+), 122 deletions(-) create mode 100644 receive_inform.py delete mode 100644 rsync_inform.py create mode 100644 transfer_inform.py create mode 100644 unix_sock_input.py diff --git a/exec_print_transfer.py b/exec_print_transfer.py index f51392f..500e1f3 100644 --- a/exec_print_transfer.py +++ b/exec_print_transfer.py @@ -5,13 +5,15 @@ import subprocess from typing import AnyStr, IO, Callable -def _chunk_transfer(chunk_file: Path): +def _chunk_transfer(chunk_file: Path, eof: bool): print(f'Transferring chunk {chunk_file} to ... (This is the default method, it has no effect)') + if eof: + print(f'The last chunk has been transferred.') -def _rotate_chunk(chunk_file: Path, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): +def _rotate_chunk(chunk_file: Path, eof: bool, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): print(f'Transferring chunk {chunk_file}') - chunk_transfer_fun(*chunk_transfer_args) + chunk_transfer_fun(chunk_file, eof, *chunk_transfer_args) print(f'Removing chunk {chunk_file}') chunk_file.unlink(missing_ok=False) @@ -37,10 +39,11 @@ def _save_output(pipe: IO[AnyStr], stdout_dir: Path): # pipe.close() -def _save_output_rotating_chunks(pipe: IO[AnyStr], chunk_file: Path, chunk_size, +def _save_output_rotating_chunks(pipe: IO[AnyStr], chunk_file_tmpl: Path, chunk_size, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): - chunk_file.parent.mkdir(parents=True, exist_ok=True) + chunk_file_tmpl.parent.mkdir(parents=True, exist_ok=True) + ct: int = 1 remaining_bytes = chunk_size chunk: bytes = b'' while True: @@ -49,18 +52,26 @@ def _save_output_rotating_chunks(pipe: IO[AnyStr], chunk_file: Path, chunk_size, # If the object is in non-blocking mode and no bytes are available, None is returned. b = pipe.read(remaining_bytes) if len(b) == 0: - # EOF reached + # EOF reached. + chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') + _save_chunk(chunk, chunk_file) - _rotate_chunk(chunk_file, chunk_transfer_fun, chunk_transfer_args) + _rotate_chunk(chunk_file, True, chunk_transfer_fun, chunk_transfer_args) + break chunk += b chunk_len = len(chunk) if chunk_len == chunk_size: + # Next chunk is full. + chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') + _save_chunk(chunk, chunk_file) + _rotate_chunk(chunk_file, False, chunk_transfer_fun, chunk_transfer_args) + chunk = b'' remaining_bytes = chunk_size - _rotate_chunk(chunk_file, chunk_transfer_fun, chunk_transfer_args) + ct += 1 elif chunk_len < chunk_size: remaining_bytes = chunk_size - chunk_len else: @@ -80,7 +91,7 @@ def _print_output(pipe: IO[AnyStr]): def execute_print_transfer_chunks(command: list[str], - chunk_file: Path, + chunk_file_tmpl: Path, chunk_transfer_fun: Callable = _chunk_transfer, chunk_transfer_args: tuple = None, chunk_size=1024 * 1024, @@ -90,13 +101,15 @@ def execute_print_transfer_chunks(command: list[str], Until the command has finished: - - saves a small part of the commands stdout to chunk_file - - calls chunk_transfer_fun with chunk_transfer args + - saves a small part of the commands stdout to chunk_file = f'{chunk_file_tmpl.name}.{chunk_number}' + - calls chunk_transfer_fun with arguments (chunk_file, eof_reached, *chunk_transfer args) + - eof_reached is True for the last chunk + - the last chunk may be smaller than chunk_size - deletes chunk_file During command execution: Forwards stderr output of the command to stderr. - :param chunk_file: Chunks are saved as this file before they are transferred. + :param chunk_file_tmpl: Chunks are saved as this file before they are transferred. :param command: Command to execute, e.g. ['cat', '/some/large/file'] :param chunk_transfer_fun: :param chunk_transfer_args: @@ -104,7 +117,7 @@ def execute_print_transfer_chunks(command: list[str], :return: returncode of executed command """ if chunk_transfer_args is None: - chunk_transfer_args = (chunk_file,) + chunk_transfer_args = tuple() process = subprocess.Popen( command, @@ -115,7 +128,7 @@ def execute_print_transfer_chunks(command: list[str], t_out = threading.Thread( target=_save_output_rotating_chunks, - args=(process.stdout, chunk_file, chunk_size, chunk_transfer_fun, chunk_transfer_args)) + args=(process.stdout, chunk_file_tmpl, chunk_size, chunk_transfer_fun, chunk_transfer_args)) t_err = threading.Thread( target=_print_output, args=(process.stderr,)) diff --git a/receive_inform.py b/receive_inform.py new file mode 100644 index 0000000..769a8a2 --- /dev/null +++ b/receive_inform.py @@ -0,0 +1,53 @@ +import socket +from pathlib import Path + +from unix_sock_input import accept_loop_until_command_received + + +def receive_inform(cmd: list[str], socket_file: Path, chunk_file_tmpl: Path): + """ + :param chunk_file_tmpl: + :param cmd: + :param socket_file: Create a UNIX socket and wait for messages. + :return: + """ + + # TODO + # - execute the command + + # TODO + # - while True + # - wait for message at socket_file + # - if 'OK\n' + # - determine chunk filename + # - read chunk, pass to stdin of command + # - delete chunk + # - if 'EOF\n' + # - break + + print(f'Listening on socket {socket_file}') + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(str(socket_file)) + sock.listen(1) + + ct = 1 + commands = [b'OK\n', b'EOF\n'] + while True: + command = accept_loop_until_command_received(sock, commands) + if command == b'OK\n': + chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') + chunk = chunk_file.read_bytes() + # TODO send chunk to stdin of command + chunk_file.unlink(missing_ok=False) + ct += 1 + elif command == b'EOF\n': + break + else: + raise ValueError("Invalid state") + + print(f'Closing socket {socket_file}') + sock.close() + socket_file.unlink(missing_ok=False) + + # TODO + # - wait for command to finish diff --git a/rsync_inform.py b/rsync_inform.py deleted file mode 100644 index 8e447c1..0000000 --- a/rsync_inform.py +++ /dev/null @@ -1,80 +0,0 @@ -import socket -from pathlib import Path - -from exec_print_capture import execute_print_capture - - -def rsync_inform(rsync_cmd: list[str], - inform_cmd: list[str], - user_input_file: Path): - """ - First, this method transfers files to a remote pc. - - Then, it informs the remote pc. - - If some error occurs during the above steps, - it waits for user input (via a UNIX-socket) and repeats the failed step. - - :param rsync_cmd: - :param inform_cmd: - :param user_input_file: - :return: - """ - - # - rsync to remote pc - # - catch error - # - wait until user input, then repeat - _execute_loop_until_successful(rsync_cmd, user_input_file) - - # - inform remote pc about complete rsync - # - catch error - # - wait until user input, then repeat - _execute_loop_until_successful(inform_cmd, user_input_file) - - -def _execute_loop_until_successful(cmd: list[str], user_input_file: Path): - while True: - returncode, _out, _err = execute_print_capture(cmd) - if returncode == 0: - break - else: - print(f'\n' - f'Error while executing:\n' - f'\t{cmd}\n' - f'\tFor details, see above output.') - _wait_for_user(user_input_file) - print() - - -def _wait_for_user(socket_file: Path): - """ - Waits until user writes 'OK\n' to UNIX-socket. - """ - - # INSPIRATION: https://pymotw.com/3/socket/uds.html - - print(f'Info:\n' - f'\tPlease fix the above error first. Then continue here:\n' - f'\tsudo pacman -S --needed openbsd-netcat\n' - f'\techo "OK" | nc -U "{socket_file.absolute()}"') - - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.bind(str(socket_file)) - sock.listen(1) - - while True: - connection, client_address = sock.accept() - try: - b: bytes = connection.recv(32) - if b == b'OK\n': - print('Received "OK" from user.') - break - else: - print(f'Received unknown message: {b}') - except Exception as e: - print(f'Error while reading message: {e}') - finally: - connection.close() - - sock.close() - socket_file.unlink(missing_ok=False) diff --git a/test.py b/test.py index 6196f12..3fda382 100644 --- a/test.py +++ b/test.py @@ -1,10 +1,11 @@ import os +import shutil from pathlib import Path from exec_capture import execute_capture from exec_print_capture import execute_print_capture from exec_print_transfer import execute_print_transfer_chunks -from rsync_inform import rsync_inform +from transfer_inform import transfer_inform def test(): @@ -16,7 +17,8 @@ def test(): def test1(): - print("TEST 1") + _init(1) + returncode, out, err = execute_capture(['ls', '-la']) print(f'stdout:\n{out}\nstderr:\n{err}') print() @@ -25,29 +27,28 @@ def test1(): def test2(): - print("TEST 2") + _init(2) + returncode, out, err = execute_print_capture(['ls', '-la']) print() returncode, out, err = execute_print_capture(['ls', '/foo/bar']) def test3(): - print("TEST 3-1") - execute_print_capture(['rm', '-rf', 'test/3-1', 'test/3-2', 'test/3-3']) + _init(3) returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/3-1')) print("TEST 3-2") returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/3-2')) print("TEST 3-3") - returncode = execute_print_transfer_chunks(['cat', 'rsync_inform.py'], Path('test/3-3'), + returncode = execute_print_transfer_chunks(['cat', 'transfer_inform.py'], Path('test/3-3'), chunk_size=1024) def test4(): - print("TEST 4") - execute_print_capture(['rm', '-rf', 'test/4-rsync-error', 'test/4-ssh-error', 'test/4-UNIX-socket']) + _init(4) - rsync_inform( + transfer_inform( ['ls', 'test/4-rsync-error'], # rsync src to dst ['ls', 'test/4-ssh-error'], # ssh target-pc 'echo "OK" | nc -U "/path/to/unix-socket"' Path('test/4-UNIX-socket') @@ -55,42 +56,56 @@ def test4(): def test5(): - print("TEST 5") - source_file = Path('rsync_inform.py') # A python script file with some content to copy ;) + _init(5) - chunk_file = Path('test/5') + chunk_file_tmpl = Path('test/5') + source_file = Path('transfer_inform.py') # A python script file with some content to copy ;) remote_target_file = Path(f'test/5-copy-of-{source_file}') ssh_error_file = Path('test/5-ssh-error-while-informing-remote-pc') - user_input_file = Path('test/5-user-input') concat_script = Path('test/5-concat') - execute_print_capture(['rm', '-rf', - str(chunk_file), - str(remote_target_file), - str(ssh_error_file), - str(user_input_file), - str(concat_script)]) concat_script.write_text(f'#!/usr/bin/bash\n' - f'echo "rsync {chunk_file} {remote_target_file} command output ..."\n' + f'echo "rsync $1 ... command output"\n' f'cat "$1" >> "$2"') os.chmod(concat_script, 0o0755) - rsync_cmd = [str(concat_script), str(chunk_file), str(remote_target_file)] - inform_cmd = ['ls', str(ssh_error_file)] - # TODO: # When running this test: - # The intentionally generated error, can be fixed by executing - # f'touch str(ssh_error_file)' + # The intentionally generated error, can be fixed by touching (creating) the missing file. execute_print_transfer_chunks( command=['cat', str(source_file)], - chunk_file=chunk_file, - chunk_transfer_fun=rsync_inform, - chunk_transfer_args=(rsync_cmd, inform_cmd, user_input_file), + chunk_file_tmpl=chunk_file_tmpl, + chunk_transfer_fun=_chunk_transfer_fun, + chunk_transfer_args=(ssh_error_file, concat_script, remote_target_file), chunk_size=512, ) +def _chunk_transfer_fun(chunk_file: Path, + eof: bool, + ssh_error_file: Path, + concat_script: Path, + remote_target_file: Path): + rsync_cmd = [str(concat_script), str(chunk_file), str(remote_target_file)] + inform_cmd = ['ls', + chunk_file.parent.joinpath(f'5.remote.EOF={eof}'), + ] + + transfer_inform( + rsync_cmd=rsync_cmd, + inform_cmd=inform_cmd, + user_input_file=chunk_file.parent.joinpath(f'5.SOCKET'), + ) + + +def _init(test_number: int): + print(f"TEST {test_number}") + test_dir = Path('test') + if test_dir.exists(): + shutil.rmtree('test') + Path('test').mkdir(exist_ok=False) + + if __name__ == '__main__': test() diff --git a/transfer_inform.py b/transfer_inform.py new file mode 100644 index 0000000..1ffca4b --- /dev/null +++ b/transfer_inform.py @@ -0,0 +1,52 @@ +from pathlib import Path + +from exec_print_capture import execute_print_capture +from unix_sock_input import wait_until_command_received + + +def transfer_inform(rsync_cmd: list[str], + inform_cmd: list[str], + user_input_file: Path): + """ + First, this method transfers files to a remote pc. + + Then, it informs the remote pc. + + If some error occurs during the above steps, + it waits for user input (via a UNIX-socket) and repeats the failed step. + + :param rsync_cmd: + :param inform_cmd: + :param user_input_file: + :return: + """ + + # - rsync to remote pc + # - catch error + # - wait until user input, then repeat + _execute_loop_until_successful(rsync_cmd, user_input_file) + + # - inform remote pc about complete rsync + # - catch error + # - wait until user input, then repeat + _execute_loop_until_successful(inform_cmd, user_input_file) + + +def _execute_loop_until_successful(cmd: list[str], user_input_file: Path): + while True: + returncode, _out, _err = execute_print_capture(cmd) + if returncode == 0: + break + else: + print(f'\n' + f'Error while executing:\n' + f'\t{cmd}\n' + f'\tFor details, see above output.') + + print(f'Info:\n' + f'\tPlease fix the above error first. Then continue here:\n' + f'\tsudo pacman -S --needed openbsd-netcat\n' + f'\techo "OK" | nc -U "{user_input_file.absolute()}"') + wait_until_command_received(user_input_file, [b'OK\n']) + + print() diff --git a/unix_sock_input.py b/unix_sock_input.py new file mode 100644 index 0000000..a3da123 --- /dev/null +++ b/unix_sock_input.py @@ -0,0 +1,59 @@ +import socket +from pathlib import Path + + +def wait_until_command_received(socket_file: Path, commands: list[bytes]) -> bytes: + """ + Creates a UNIX socket at socket_file. + + Accepts connections on the UNIX socket, + until one client sends one of the given commands as first message. + + Closes the UNIX socket. + + :returns: The command that was received. + """ + + # INSPIRATION: https://pymotw.com/3/socket/uds.html + + print(f'Listening on socket {socket_file}') + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(str(socket_file)) + sock.listen(1) + + command = accept_loop_until_command_received(sock, commands) + + print(f'Closing socket {socket_file}') + sock.close() + socket_file.unlink(missing_ok=False) + + return command + + +def accept_loop_until_command_received(sock: socket.socket, commands: list[bytes]) -> bytes: + """ + Uses an open UNIX socket. + + Accepts connections on the UNIX socket, + until one client sends one of the given commands as first message. + + Does not close the UNIX socket. + + :returns: The command that was received. + """ + + bufsize = max([len(cmd) for cmd in commands]) + len(b'\n') + + while True: + connection, client_address = sock.accept() + try: + b: bytes = connection.recv(bufsize) + for cmd in commands: + if b == cmd: + print(f'Received "{cmd}".') + return cmd + print(f'Received unknown message: {b}') + except Exception as e: + print(f'Error while reading message: {e}') + finally: + connection.close()