diff --git a/exec_print_transfer.py b/exec_print_transfer.py index aff6925..f51392f 100644 --- a/exec_print_transfer.py +++ b/exec_print_transfer.py @@ -5,33 +5,22 @@ import subprocess from typing import AnyStr, IO, Callable -def _chunk_transfer(stdout_dir: Path): - print(f'Transferring chunks from {stdout_dir} to ...') - - # TODO - # - rsync to remote pc - # - catch error - # - wait until user input, then repeat - # - inform remote pc about complete rsync - # - catch error - # - wait until user input, then repeat +def _chunk_transfer(chunk_file: Path): + print(f'Transferring chunk {chunk_file} to ... (This is the default method, it has no effect)') -def _rotate_chunks(stdout_dir: Path, ct: int, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): +def _rotate_chunk(chunk_file: Path, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): + print(f'Transferring chunk {chunk_file}') chunk_transfer_fun(*chunk_transfer_args) - print(f'Rotating chunks ...') - for i in range(0, ct + 1): - file = stdout_dir.joinpath(str(i)) - # print(f'Removing {file}') - file.unlink(missing_ok=False) + print(f'Removing chunk {chunk_file}') + chunk_file.unlink(missing_ok=False) -def _save_chunk(chunk: bytes, stdout_dir: Path, ct: int): - print(f'Saving chunk {ct}') - file = stdout_dir.joinpath(str(ct)) +def _save_chunk(chunk: bytes, chunk_file: Path): + print(f'Saving chunk {chunk_file}') # Fails if file does already exist. - with open(file, 'xb') as f: + with open(chunk_file, 'xb') as f: f.write(chunk) @@ -48,44 +37,10 @@ def _save_output(pipe: IO[AnyStr], stdout_dir: Path): # pipe.close() -def _save_output_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size): - stdout_dir.mkdir(parents=True, exist_ok=False) - - ct = 0 - remaining_bytes = chunk_size - chunk: bytes = b'' - while True: - # https://docs.python.org/3/library/io.html#io.RawIOBase.read - # If 0 bytes are returned, and size was not 0, this indicates end of file. - # If the object is in non-blocking mode and no bytes are available, None is returned. - b = pipe.read(remaining_bytes) - if len(b) == 0: - # EOF reached - _save_chunk(chunk, stdout_dir, ct) - break - chunk += b - - chunk_len = len(chunk) - if chunk_len == chunk_size: - _save_chunk(chunk, stdout_dir, ct) - chunk = b'' - remaining_bytes = chunk_size - ct += 1 - elif chunk_len < chunk_size: - remaining_bytes = chunk_size - chunk_len - else: - raise ValueError('Invalid state') - - # TODO: Has this any effect? - # pipe.close() - - -def _save_output_rotating_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size, +def _save_output_rotating_chunks(pipe: IO[AnyStr], chunk_file: Path, chunk_size, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): - stdout_dir.mkdir(parents=True, exist_ok=False) # TODO + chunk_file.parent.mkdir(parents=True, exist_ok=True) - ct = 0 - ct_modulo = 2 remaining_bytes = chunk_size chunk: bytes = b'' while True: @@ -95,19 +50,17 @@ def _save_output_rotating_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size, b = pipe.read(remaining_bytes) if len(b) == 0: # EOF reached - _save_chunk(chunk, stdout_dir, ct) - _rotate_chunks(stdout_dir, ct, chunk_transfer_fun, chunk_transfer_args) + _save_chunk(chunk, chunk_file) + _rotate_chunk(chunk_file, chunk_transfer_fun, chunk_transfer_args) break chunk += b chunk_len = len(chunk) if chunk_len == chunk_size: - _save_chunk(chunk, stdout_dir, ct) + _save_chunk(chunk, chunk_file) chunk = b'' remaining_bytes = chunk_size - ct = (ct + 1) % ct_modulo - if ct == 0: - _rotate_chunks(stdout_dir, ct_modulo - 1, chunk_transfer_fun, chunk_transfer_args) + _rotate_chunk(chunk_file, chunk_transfer_fun, chunk_transfer_args) elif chunk_len < chunk_size: remaining_bytes = chunk_size - chunk_len else: @@ -126,28 +79,32 @@ def _print_output(pipe: IO[AnyStr]): # pipe.close() -# Goal: We want to save the stdout to small files and print stderr while running the command. def execute_print_transfer_chunks(command: list[str], - stdout_dir: Path, + chunk_file: Path, chunk_transfer_fun: Callable = _chunk_transfer, chunk_transfer_args: tuple = None, chunk_size=1024 * 1024, ) -> int: """ - Executes the given command saving its stdout to stdout_dir. + Executes the given command. - Stderr is printed in real time. + Until the command has finished: + - saves a small part of the commands stdout to chunk_file + - calls chunk_transfer_fun with chunk_transfer args + - deletes chunk_file - :param stdout_dir: Directory where stdout is saved to. - :param command: Command to execute, e.g. ['ls', '-la', '/home'] + During command execution: Forwards stderr output of the command to stderr. + + :param chunk_file: Chunks are saved as this file before they are transferred. + :param command: Command to execute, e.g. ['cat', '/some/large/file'] :param chunk_transfer_fun: :param chunk_transfer_args: :param chunk_size: Defaults to 1MB (1024*1024). - :return: returncode + :return: returncode of executed command """ if chunk_transfer_args is None: - chunk_transfer_args = (stdout_dir,) + chunk_transfer_args = (chunk_file,) process = subprocess.Popen( command, @@ -158,7 +115,7 @@ def execute_print_transfer_chunks(command: list[str], t_out = threading.Thread( target=_save_output_rotating_chunks, - args=(process.stdout, stdout_dir, chunk_size, chunk_transfer_fun, chunk_transfer_args)) + args=(process.stdout, chunk_file, chunk_size, chunk_transfer_fun, chunk_transfer_args)) t_err = threading.Thread( target=_print_output, args=(process.stderr,)) diff --git a/rsync_inform.py b/rsync_inform.py index f685562..8e447c1 100644 --- a/rsync_inform.py +++ b/rsync_inform.py @@ -24,34 +24,37 @@ def rsync_inform(rsync_cmd: list[str], # - rsync to remote pc # - catch error # - wait until user input, then repeat - while True: - returncode, _out, _err = execute_print_capture(rsync_cmd) - if returncode == 0: - break - else: - print(f'Error while executing: {rsync_cmd}\n' - f'See above output.') - _wait_for_user(user_input_file) + _execute_loop_until_successful(rsync_cmd, user_input_file) # - inform remote pc about complete rsync # - catch error # - wait until user input, then repeat + _execute_loop_until_successful(inform_cmd, user_input_file) + + +def _execute_loop_until_successful(cmd: list[str], user_input_file: Path): while True: - returncode, _out, _err = execute_print_capture(inform_cmd) + returncode, _out, _err = execute_print_capture(cmd) if returncode == 0: break else: - print(f'Error while executing: {rsync_cmd}\n' - f'See above output.') + print(f'\n' + f'Error while executing:\n' + f'\t{cmd}\n' + f'\tFor details, see above output.') _wait_for_user(user_input_file) + print() def _wait_for_user(socket_file: Path): + """ + Waits until user writes 'OK\n' to UNIX-socket. + """ + # INSPIRATION: https://pymotw.com/3/socket/uds.html - print(f'Waiting for user to write "OK\\n" to unix socket {socket_file} ...\n' - f'Hint:\n' - f'\tFirst, fix the error reported above, then continue here:\n' + print(f'Info:\n' + f'\tPlease fix the above error first. Then continue here:\n' f'\tsudo pacman -S --needed openbsd-netcat\n' f'\techo "OK" | nc -U "{socket_file.absolute()}"') diff --git a/test.py b/test.py index 443c690..6196f12 100644 --- a/test.py +++ b/test.py @@ -1,20 +1,22 @@ +import os from pathlib import Path +from exec_capture import execute_capture +from exec_print_capture import execute_print_capture +from exec_print_transfer import execute_print_transfer_chunks from rsync_inform import rsync_inform -from subprocess_util import execute_print_capture -from subprocess_util1 import execute_capture -from subprocess_util_2 import execute_print_transfer_chunks def test(): - test1() - test2() - test3() - test4() + # test1() + # test2() + # test3() + # test4() + test5() def test1(): - print("TEST ZERO") + print("TEST 1") returncode, out, err = execute_capture(['ls', '-la']) print(f'stdout:\n{out}\nstderr:\n{err}') print() @@ -23,28 +25,70 @@ def test1(): def test2(): - print("TEST ONE") + print("TEST 2") returncode, out, err = execute_print_capture(['ls', '-la']) print() returncode, out, err = execute_print_capture(['ls', '/foo/bar']) def test3(): - print("TEST TWO-1") - execute_print_capture(['rm', '-rf', 'test/1', 'test/2', 'test/3']) - returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/1')) - print("TEST TWO-2") - returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/2')) - print("TEST TWO-3") - returncode = execute_print_transfer_chunks(['cat', 'subprocess_util.py'], Path('test/3'), + print("TEST 3-1") + execute_print_capture(['rm', '-rf', 'test/3-1', 'test/3-2', 'test/3-3']) + + returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/3-1')) + print("TEST 3-2") + returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/3-2')) + print("TEST 3-3") + returncode = execute_print_transfer_chunks(['cat', 'rsync_inform.py'], Path('test/3-3'), chunk_size=1024) def test4(): + print("TEST 4") + execute_print_capture(['rm', '-rf', 'test/4-rsync-error', 'test/4-ssh-error', 'test/4-UNIX-socket']) + rsync_inform( - ['ls', 'test/rsync-error'], # rsync src to dst - ['ls', 'test/ssh-error'], # ssh target-pc 'echo "OK" | nc -U "/path/to/unix-socket"' - Path('test/unix-socket') + ['ls', 'test/4-rsync-error'], # rsync src to dst + ['ls', 'test/4-ssh-error'], # ssh target-pc 'echo "OK" | nc -U "/path/to/unix-socket"' + Path('test/4-UNIX-socket') + ) + + +def test5(): + print("TEST 5") + source_file = Path('rsync_inform.py') # A python script file with some content to copy ;) + + chunk_file = Path('test/5') + remote_target_file = Path(f'test/5-copy-of-{source_file}') + ssh_error_file = Path('test/5-ssh-error-while-informing-remote-pc') + user_input_file = Path('test/5-user-input') + concat_script = Path('test/5-concat') + execute_print_capture(['rm', '-rf', + str(chunk_file), + str(remote_target_file), + str(ssh_error_file), + str(user_input_file), + str(concat_script)]) + + concat_script.write_text(f'#!/usr/bin/bash\n' + f'echo "rsync {chunk_file} {remote_target_file} command output ..."\n' + f'cat "$1" >> "$2"') + os.chmod(concat_script, 0o0755) + + rsync_cmd = [str(concat_script), str(chunk_file), str(remote_target_file)] + inform_cmd = ['ls', str(ssh_error_file)] + + # TODO: + # When running this test: + # The intentionally generated error, can be fixed by executing + # f'touch str(ssh_error_file)' + + execute_print_transfer_chunks( + command=['cat', str(source_file)], + chunk_file=chunk_file, + chunk_transfer_fun=rsync_inform, + chunk_transfer_args=(rsync_cmd, inform_cmd, user_input_file), + chunk_size=512, )