diff --git a/exec_capture.py b/exec_capture.py new file mode 100644 index 0000000..a55a895 --- /dev/null +++ b/exec_capture.py @@ -0,0 +1,10 @@ +import subprocess + + +def execute_capture(command: list[str]) -> [int, str, str]: + completed: subprocess.CompletedProcess = subprocess.run( + command, + capture_output=True, + text=True, + ) + return completed.returncode, completed.stdout, completed.stderr diff --git a/subprocess_util.py b/exec_print_capture.py similarity index 100% rename from subprocess_util.py rename to exec_print_capture.py diff --git a/exec_print_transfer.py b/exec_print_transfer.py new file mode 100644 index 0000000..aff6925 --- /dev/null +++ b/exec_print_transfer.py @@ -0,0 +1,173 @@ +from pathlib import Path +import sys +import threading +import subprocess +from typing import AnyStr, IO, Callable + + +def _chunk_transfer(stdout_dir: Path): + print(f'Transferring chunks from {stdout_dir} to ...') + + # TODO + # - rsync to remote pc + # - catch error + # - wait until user input, then repeat + # - inform remote pc about complete rsync + # - catch error + # - wait until user input, then repeat + + +def _rotate_chunks(stdout_dir: Path, ct: int, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): + chunk_transfer_fun(*chunk_transfer_args) + + print(f'Rotating chunks ...') + for i in range(0, ct + 1): + file = stdout_dir.joinpath(str(i)) + # print(f'Removing {file}') + file.unlink(missing_ok=False) + + +def _save_chunk(chunk: bytes, stdout_dir: Path, ct: int): + print(f'Saving chunk {ct}') + file = stdout_dir.joinpath(str(ct)) + # Fails if file does already exist. + with open(file, 'xb') as f: + f.write(chunk) + + +def _save_output(pipe: IO[AnyStr], stdout_dir: Path): + stdout_dir.mkdir(parents=True, exist_ok=False) + + b: bytes + ct: int = 1 + for b in pipe: + stdout_dir.joinpath(str(ct)).write_bytes(b) + ct += 1 + + # TODO: Has this any effect? + # pipe.close() + + +def _save_output_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size): + stdout_dir.mkdir(parents=True, exist_ok=False) + + ct = 0 + remaining_bytes = chunk_size + chunk: bytes = b'' + while True: + # https://docs.python.org/3/library/io.html#io.RawIOBase.read + # If 0 bytes are returned, and size was not 0, this indicates end of file. + # If the object is in non-blocking mode and no bytes are available, None is returned. + b = pipe.read(remaining_bytes) + if len(b) == 0: + # EOF reached + _save_chunk(chunk, stdout_dir, ct) + break + chunk += b + + chunk_len = len(chunk) + if chunk_len == chunk_size: + _save_chunk(chunk, stdout_dir, ct) + chunk = b'' + remaining_bytes = chunk_size + ct += 1 + elif chunk_len < chunk_size: + remaining_bytes = chunk_size - chunk_len + else: + raise ValueError('Invalid state') + + # TODO: Has this any effect? + # pipe.close() + + +def _save_output_rotating_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size, + chunk_transfer_fun: Callable, chunk_transfer_args: tuple): + stdout_dir.mkdir(parents=True, exist_ok=False) # TODO + + ct = 0 + ct_modulo = 2 + remaining_bytes = chunk_size + chunk: bytes = b'' + while True: + # https://docs.python.org/3/library/io.html#io.RawIOBase.read + # If 0 bytes are returned, and size was not 0, this indicates end of file. + # If the object is in non-blocking mode and no bytes are available, None is returned. + b = pipe.read(remaining_bytes) + if len(b) == 0: + # EOF reached + _save_chunk(chunk, stdout_dir, ct) + _rotate_chunks(stdout_dir, ct, chunk_transfer_fun, chunk_transfer_args) + break + chunk += b + + chunk_len = len(chunk) + if chunk_len == chunk_size: + _save_chunk(chunk, stdout_dir, ct) + chunk = b'' + remaining_bytes = chunk_size + ct = (ct + 1) % ct_modulo + if ct == 0: + _rotate_chunks(stdout_dir, ct_modulo - 1, chunk_transfer_fun, chunk_transfer_args) + elif chunk_len < chunk_size: + remaining_bytes = chunk_size - chunk_len + else: + raise ValueError('Invalid state') + + # TODO: Has this any effect? + # pipe.close() + + +def _print_output(pipe: IO[AnyStr]): + line: str + for line in pipe: + sys.stderr.write(f'[STDERR] {line}') + + # TODO: Has this any effect? + # pipe.close() + + +# Goal: We want to save the stdout to small files and print stderr while running the command. +def execute_print_transfer_chunks(command: list[str], + stdout_dir: Path, + chunk_transfer_fun: Callable = _chunk_transfer, + chunk_transfer_args: tuple = None, + chunk_size=1024 * 1024, + ) -> int: + """ + Executes the given command saving its stdout to stdout_dir. + + Stderr is printed in real time. + + + :param stdout_dir: Directory where stdout is saved to. + :param command: Command to execute, e.g. ['ls', '-la', '/home'] + :param chunk_transfer_fun: + :param chunk_transfer_args: + :param chunk_size: Defaults to 1MB (1024*1024). + :return: returncode + """ + if chunk_transfer_args is None: + chunk_transfer_args = (stdout_dir,) + + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + ) + + t_out = threading.Thread( + target=_save_output_rotating_chunks, + args=(process.stdout, stdout_dir, chunk_size, chunk_transfer_fun, chunk_transfer_args)) + t_err = threading.Thread( + target=_print_output, args=(process.stderr,)) + + for t in (t_out, t_err): + t.daemon = True + t.start() + + returncode = process.wait() + for t in (t_out, t_err): + t.join() + + return returncode diff --git a/rsync_inform.py b/rsync_inform.py new file mode 100644 index 0000000..f685562 --- /dev/null +++ b/rsync_inform.py @@ -0,0 +1,77 @@ +import socket +from pathlib import Path + +from exec_print_capture import execute_print_capture + + +def rsync_inform(rsync_cmd: list[str], + inform_cmd: list[str], + user_input_file: Path): + """ + First, this method transfers files to a remote pc. + + Then, it informs the remote pc. + + If some error occurs during the above steps, + it waits for user input (via a UNIX-socket) and repeats the failed step. + + :param rsync_cmd: + :param inform_cmd: + :param user_input_file: + :return: + """ + + # - rsync to remote pc + # - catch error + # - wait until user input, then repeat + while True: + returncode, _out, _err = execute_print_capture(rsync_cmd) + if returncode == 0: + break + else: + print(f'Error while executing: {rsync_cmd}\n' + f'See above output.') + _wait_for_user(user_input_file) + + # - inform remote pc about complete rsync + # - catch error + # - wait until user input, then repeat + while True: + returncode, _out, _err = execute_print_capture(inform_cmd) + if returncode == 0: + break + else: + print(f'Error while executing: {rsync_cmd}\n' + f'See above output.') + _wait_for_user(user_input_file) + + +def _wait_for_user(socket_file: Path): + # INSPIRATION: https://pymotw.com/3/socket/uds.html + + print(f'Waiting for user to write "OK\\n" to unix socket {socket_file} ...\n' + f'Hint:\n' + f'\tFirst, fix the error reported above, then continue here:\n' + f'\tsudo pacman -S --needed openbsd-netcat\n' + f'\techo "OK" | nc -U "{socket_file.absolute()}"') + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(str(socket_file)) + sock.listen(1) + + while True: + connection, client_address = sock.accept() + try: + b: bytes = connection.recv(32) + if b == b'OK\n': + print('Received "OK" from user.') + break + else: + print(f'Received unknown message: {b}') + except Exception as e: + print(f'Error while reading message: {e}') + finally: + connection.close() + + sock.close() + socket_file.unlink(missing_ok=False) diff --git a/subprocess_util_2.py b/subprocess_util_2.py deleted file mode 100644 index 4e27624..0000000 --- a/subprocess_util_2.py +++ /dev/null @@ -1,107 +0,0 @@ -from pathlib import Path -import sys -import threading -import subprocess -from typing import AnyStr, IO - - -def _save_chunk(chunk: bytes, stdout_dir: Path, ct: int): - print(f"Saving chunk {ct}") - file = stdout_dir.joinpath(str(ct)) - file.write_bytes(chunk) - - -def _save_output(pipe: IO[AnyStr], stdout_dir: Path): - stdout_dir.mkdir(parents=True, exist_ok=False) # TODO - - b: bytes - ct: int = 1 - for b in pipe: - stdout_dir.joinpath(str(ct)).write_bytes(b) - ct += 1 - - # TODO: Has this any effect? - # pipe.close() - - -def _save_output_chunks(pipe: IO[AnyStr], stdout_dir: Path, chunk_size): - stdout_dir.mkdir(parents=True, exist_ok=False) # TODO - - # b: bytes - # ct: int = 1 - # for b in pipe: - # stdout_dir.joinpath(str(ct)).write_bytes(b) - # ct += 1 - - ct = 0 - remaining_bytes = chunk_size - chunk: bytes = b'' - while True: - - # https://docs.python.org/3/library/io.html#io.RawIOBase.read - # If 0 bytes are returned, and size was not 0, this indicates end of file. - # If the object is in non-blocking mode and no bytes are available, None is returned. - b = pipe.read(remaining_bytes) - if len(b) == 0: - # EOF reached - _save_chunk(chunk, stdout_dir, ct) - break - chunk += b - - chunk_len = len(chunk) - if chunk_len == chunk_size: - _save_chunk(chunk, stdout_dir, ct) - chunk = b'' - remaining_bytes = chunk_size - ct += 1 - elif chunk_len < chunk_size: - remaining_bytes = chunk_size - chunk_len - else: - raise ValueError("Invalid state") - - # TODO: Has this any effect? - # pipe.close() - - -def _print_output(pipe: IO[AnyStr]): - line: str - for line in pipe: - sys.stderr.write(f'[STDERR] {line}') - - # TODO: Has this any effect? - # pipe.close() - - -# Goal: We want to save the stdout to small files and print stderr while running the command. -def execute_print_capture_bin(command: list[str], stdout_dir: Path, chunk_size = 1024 * 1024) -> int: - """ - Executes the given command saving its stdout to stdout_dir. - - Stderr is printed in real time. - - :param chunk_size: Defaults to 1MB (1024*1024). - :param stdout_dir: Directory where stdout is saved to. - :param command: Command to execute, e.g. ['ls', '-la', '/home'] - :return: returncode - """ - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - ) - - t_out = threading.Thread( - target=_save_output_chunks, args=(process.stdout, stdout_dir, chunk_size)) - t_err = threading.Thread( - target=_print_output, args=(process.stderr,)) - - for t in (t_out, t_err): - t.daemon = True - t.start() - - returncode = process.wait() - for t in (t_out, t_err): - t.join() - - return returncode diff --git a/test.py b/test.py index e871a3b..443c690 100644 --- a/test.py +++ b/test.py @@ -1,24 +1,51 @@ from pathlib import Path +from rsync_inform import rsync_inform from subprocess_util import execute_print_capture -from subprocess_util_2 import execute_print_capture_bin +from subprocess_util1 import execute_capture +from subprocess_util_2 import execute_print_transfer_chunks def test(): - print("TEST ONE") + test1() + test2() + test3() + test4() + +def test1(): + print("TEST ZERO") + returncode, out, err = execute_capture(['ls', '-la']) + print(f'stdout:\n{out}\nstderr:\n{err}') + print() + returncode, out, err = execute_capture(['ls', '/foo/bar']) + print(f'stdout:\n{out}\nstderr:\n{err}') + + +def test2(): + print("TEST ONE") returncode, out, err = execute_print_capture(['ls', '-la']) print() returncode, out, err = execute_print_capture(['ls', '/foo/bar']) + +def test3(): print("TEST TWO-1") execute_print_capture(['rm', '-rf', 'test/1', 'test/2', 'test/3']) - returncode = execute_print_capture_bin(['ls', '-la'], Path('test/1')) + returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/1')) print("TEST TWO-2") - returncode = execute_print_capture_bin(['ls', '/foo/bar'], Path('test/2')) + returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/2')) print("TEST TWO-3") - returncode = execute_print_capture_bin(['cat', 'subprocess_util.py'], Path('test/3'), - chunk_size=1024) + returncode = execute_print_transfer_chunks(['cat', 'subprocess_util.py'], Path('test/3'), + chunk_size=1024) + + +def test4(): + rsync_inform( + ['ls', 'test/rsync-error'], # rsync src to dst + ['ls', 'test/ssh-error'], # ssh target-pc 'echo "OK" | nc -U "/path/to/unix-socket"' + Path('test/unix-socket') + ) if __name__ == '__main__':