From f3ae420dca314bfab60fcf10596f4d9261ab0705 Mon Sep 17 00:00:00 2001 From: Daniel Langbein Date: Thu, 12 Jan 2023 17:20:39 +0100 Subject: [PATCH] refactor --- src/p1st/btrfs_receive_chunks_v3.py | 9 +- src/p1st/btrfs_send_chunks_v3.py | 5 +- src/p1st/common.py | 16 ++- src/p1st/exec_print_capture.py | 2 +- src/p1st/exec_print_receive.py | 75 ------------ src/p1st/exec_print_transfer.py | 170 ---------------------------- src/p1st/test.py | 108 ------------------ 7 files changed, 16 insertions(+), 369 deletions(-) delete mode 100644 src/p1st/exec_print_receive.py delete mode 100644 src/p1st/exec_print_transfer.py diff --git a/src/p1st/btrfs_receive_chunks_v3.py b/src/p1st/btrfs_receive_chunks_v3.py index 5ded26e..8d5f3b1 100644 --- a/src/p1st/btrfs_receive_chunks_v3.py +++ b/src/p1st/btrfs_receive_chunks_v3.py @@ -5,6 +5,7 @@ import shlex from pathlib import Path from queue import Queue +from p1st.common import Message1, Message2 from p1st.exec_consume_chunks import execute_consume_chunks from p1st.repeat import repeat_until_successful @@ -39,14 +40,14 @@ def main(): f'{target_chunk_path}'] inform2_cmd = ['ssh', ssh_source, - f'echo OK | nc -U {shlex.quote(str(in_socket))}'] + f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(in_socket))}"] # Wait until next chunk can be transferred from sending side. - messages = ['OK\n', 'EOF\n'] + messages = [Message1.OK.value, Message1.EOF.value] msg = repeat_until_successful(inform_cmd, usr_confirmation_socket) if msg not in messages: - raise ValueError(f'Inalid message: >>>{msg}<<<') - last_chunk = msg == 'EOF\n' + raise ValueError(f'Invalid message: {msg}') + last_chunk = msg == Message1.EOF.value # Transfer chunk. repeat_until_successful(rsync_cmd, usr_confirmation_socket) diff --git a/src/p1st/btrfs_send_chunks_v3.py b/src/p1st/btrfs_send_chunks_v3.py index bfe3b9d..30ad8f3 100644 --- a/src/p1st/btrfs_send_chunks_v3.py +++ b/src/p1st/btrfs_send_chunks_v3.py @@ -4,6 +4,7 @@ import argparse import socket from pathlib import Path +from p1st.common import Message1, Message2 from p1st.exec_produce_chunks import execute_produce_chunks from p1st.unix_sock_input import accept_loop_until_message @@ -42,11 +43,11 @@ def main(): # Inform receiving side, that the next chunk can be transferred. inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') - inform_path.write_text('EOF\n' if last_chunk else 'OK\n') + inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value) # Read from in_socket # -> Receiving side informs us after the chunk has been transferred. - _command = accept_loop_until_message(in_sock, [b'OK\n']) + _command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value]) # Close in_socket in_sock.close() diff --git a/src/p1st/common.py b/src/p1st/common.py index cc06c1f..ec80eba 100644 --- a/src/p1st/common.py +++ b/src/p1st/common.py @@ -1,15 +1,13 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from pathlib import Path +from enum import Enum -def _get_chunk_file(chunk_file_tmpl: Path, ct: int): - return chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') +class Message1(Enum): + OK = 'OK\n' + EOF = 'EOF\n' -def _get_remote_socket(target: Path): - return target.parent.joinpath(f'{target}.SOCKET') - - -def _get_chunk_tmpl(subvol: Path): - return subvol.parent.joinpath(f'{subvol.name}.CHUNK') +class Message2(Enum): + OK_BINARY = b'OK\n' + OK_STR = 'OK\n' diff --git a/src/p1st/exec_print_capture.py b/src/p1st/exec_print_capture.py index 6307bf9..434a192 100644 --- a/src/p1st/exec_print_capture.py +++ b/src/p1st/exec_print_capture.py @@ -4,7 +4,7 @@ from queue import Queue import sys import threading import subprocess -from typing import AnyStr, IO, Callable +from typing import AnyStr, IO class _Assert: diff --git a/src/p1st/exec_print_receive.py b/src/p1st/exec_print_receive.py deleted file mode 100644 index 96bca48..0000000 --- a/src/p1st/exec_print_receive.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -import subprocess -import sys -import threading -from pathlib import Path -from typing import IO, AnyStr, Callable - -from p1st.common import _get_chunk_file -from p1st.receive_inform import receive_inform - - -def _print_stdout(bin_pipe: IO[AnyStr]): - b: bytes - for b in bin_pipe: - sys.stdout.write(f'[STDOUT] {b.decode("UTF-8")}') - - # TODO: Has this any effect? - # bin_pipe.close() - - -def _print_stderr(bin_pipe: IO[AnyStr]): - b: bytes - for b in bin_pipe: - sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}') - - # TODO: Has this any effect? - # bin_pipe.close() - - -def execute_print_receive_chunks(command: list[str], - socket_file: Path, - chunk_file_tmpl: Path, - get_chunk_file: Callable[[Path, int], Path] = _get_chunk_file, - ) -> int: - process = subprocess.Popen( - command, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - ) - - # TODO - This function - # - execute the command (print live stdout/stderr) - # - # TODO - Inside receive_inform() - # - while True - # - wait for message at socket_file - # - if 'OK\n' - # - determine chunk filename - # - read chunk, pass to stdin of command - # - delete chunk - # - if 'EOF\n' - # - break - # - # TODO - This function - # - wait for command to finish - - t_out = threading.Thread( - target=_print_stdout, args=(process.stdout,)) - t_err = threading.Thread( - target=_print_stderr, args=(process.stderr,)) - - for t in (t_out, t_err): - t.daemon = True - t.start() - - receive_inform(process.stdin, socket_file, chunk_file_tmpl, get_chunk_file) - - returncode = process.wait() - for t in (t_out, t_err): - t.join() - - return returncode diff --git a/src/p1st/exec_print_transfer.py b/src/p1st/exec_print_transfer.py deleted file mode 100644 index 6bd76a4..0000000 --- a/src/p1st/exec_print_transfer.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -import datetime -import time -from pathlib import Path -import sys -import threading -import subprocess -from typing import AnyStr, IO, Callable - -from p1st.common import _get_chunk_file -from p1st.data_units import DataUnitConverter - - -def _rotate_chunk(chunk_file: Path, chunk_number: int, eof: bool, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): - print(f'Transferring chunk') - chunk_transfer_fun(chunk_file, chunk_number, eof, *chunk_transfer_args) - - print(f'Removing chunk') - chunk_file.unlink(missing_ok=False) - - -def _save_chunk(chunk: bytes, chunk_file: Path): - print(f'Saving chunk {chunk_file}') - - # Fails if file does already exist. - with open(chunk_file, 'xb') as f: - f.write(chunk) - - -# def _save_output(out_pipe: IO[AnyStr], stdout_dir: Path): -# stdout_dir.mkdir(parents=True, exist_ok=False) -# -# b: bytes -# ct: int = 1 -# for b in out_pipe: -# stdout_dir.joinpath(str(ct)).write_bytes(b) -# ct += 1 -# -# # TODO: Has this any effect? -# # out_pipe.close() - - -def _save_output_rotating_chunks(out_pipe: IO[AnyStr], - chunk_file_tmpl: Path, - chunk_size, - chunk_transfer_fun: Callable, - chunk_transfer_args: tuple, - get_chunk_file: Callable[[Path, int], Path], - ): - start_time = time.time() - - ct: int = 1 - remaining_bytes = chunk_size - chunk: bytes = b'' - while True: - # https://docs.python.org/3/library/io.html#io.RawIOBase.read - # If 0 bytes are returned, and size was not 0, this indicates end of file. - # If the object is in non-blocking mode and no bytes are available, None is returned. - b = out_pipe.read(remaining_bytes) - if len(b) == 0: - # EOF reached. - chunk_file = get_chunk_file(chunk_file_tmpl, ct) - chunk_file.parent.mkdir(parents=True, exist_ok=True) - - _save_chunk(chunk, chunk_file) - _rotate_chunk(chunk_file, ct, True, chunk_transfer_fun, chunk_transfer_args) - - current_time = time.time() - elapsed_time = current_time - start_time - print(f'Elapsed time: {datetime.timedelta(seconds=elapsed_time)}\n' - f'Transferred: {DataUnitConverter.to_unit_auto_str(ct * chunk_size)}') - - break - chunk += b - - chunk_len = len(chunk) - if chunk_len == chunk_size: - # Next chunk is full. - chunk_file = get_chunk_file(chunk_file_tmpl, ct) - chunk_file.parent.mkdir(parents=True, exist_ok=True) - - _save_chunk(chunk, chunk_file) - _rotate_chunk(chunk_file, ct, False, chunk_transfer_fun, chunk_transfer_args) - - current_time = time.time() - elapsed_time = current_time - start_time - transferred_bytes = ct * chunk_size - bytes_per_second = transferred_bytes / elapsed_time - print(f'Elapsed time: {datetime.timedelta(seconds=elapsed_time)}\n' - f'Transferred: {DataUnitConverter.to_unit_auto_str(transferred_bytes)}\n' - f'Speed: {DataUnitConverter.to_unit_auto_str(bytes_per_second)}/s') - - chunk = b'' - remaining_bytes = chunk_size - ct += 1 - elif chunk_len < chunk_size: - remaining_bytes = chunk_size - chunk_len - else: - raise ValueError('Invalid state') - - # TODO: Has this any effect? - # out_pipe.close() - - -def _print_stderr(bin_pipe: IO[AnyStr]): - b: bytes - for b in bin_pipe: - sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}') - - # TODO: Has this any effect? - # bin_pipe.close() - - -def execute_print_transfer_chunks(command: list[str], - chunk_file_tmpl: Path, - chunk_transfer_fun: Callable, - chunk_transfer_args: tuple, - chunk_size: int = 1024 * 1024, - get_chunk_file: Callable[[Path, int], Path] = _get_chunk_file, - ) -> int: - """ - Executes the given command. - - Until the command has finished: - - - saves a small part of the commands stdout to chunk_file = f'{chunk_file_tmpl.name}.{chunk_number}' - - calls chunk_transfer_fun with arguments (chunk_file, chunk_number, eof_reached, *chunk_transfer_args) - - eof_reached is True for the last chunk - - the last chunk may be smaller than chunk_size - - deletes chunk_file - - During command execution: Forwards stderr output of the command to stderr. - - :param chunk_file_tmpl: Used by get_chunk_file to calculate the paths of the chunk files. - :param command: Command to execute, e.g. ['cat', '/some/large/file'] - :param chunk_transfer_fun: Called for each chunk with arguments (chunk_file, chunk_number, eof_reached, *chunk_transfer_args) - :param chunk_transfer_args: - :param chunk_size: Defaults to 1MB (1024*1024). - :param get_chunk_file: Gets chunk_file_tmpl and chunk_number as input and returns the path of the corresponding chunk. - :return: returncode of executed command - """ - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - ) - - t_out = threading.Thread( - target=_save_output_rotating_chunks, - args=(process.stdout, - chunk_file_tmpl, - chunk_size, - chunk_transfer_fun, - chunk_transfer_args, - get_chunk_file, - )) - t_err = threading.Thread( - target=_print_stderr, args=(process.stderr,)) - - for t in (t_out, t_err): - t.daemon = True - t.start() - - returncode = process.wait() - for t in (t_out, t_err): - t.join() - - return returncode diff --git a/src/p1st/test.py b/src/p1st/test.py index d40a0d4..f516439 100644 --- a/src/p1st/test.py +++ b/src/p1st/test.py @@ -8,8 +8,6 @@ from pathlib import Path from p1st.exec_capture import execute_capture from p1st.exec_print_capture import execute_print_capture -from p1st.exec_print_receive import execute_print_receive_chunks -from p1st.exec_print_transfer import execute_print_transfer_chunks from p1st.exec_produce_chunks import execute_produce_chunks from p1st.transfer_inform import transfer_inform @@ -17,10 +15,7 @@ from p1st.transfer_inform import transfer_inform def test(): # test1() # test2() - # test3() # test4() - # test5() - # test67() # test8() # test9() # test10() @@ -47,23 +42,6 @@ def test2(): returncode, out, err = execute_print_capture(['ls', '/foo/bar']) -def test3(): - _init(3) - - def _chunk_transfer(chunk_file: Path, ct: int, eof: bool): - print(f'Transferring chunk {chunk_file} to ... (This is the default method, it has no effect)') - if eof: - print(f'The last chunk has been transferred.') - - returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/3-1'), _chunk_transfer, ()) - print("TEST 3-2") - returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/3-2'), _chunk_transfer, ()) - print("TEST 3-3") - returncode = execute_print_transfer_chunks(['cat', 'transfer_inform.py'], Path('test/3-3'), - _chunk_transfer, (), - chunk_size=1024) - - def test4(): _init(4) @@ -74,32 +52,6 @@ def test4(): ) -def test5(): - _init(5) - - chunk_file_tmpl = Path('test/5') - source_file = Path('transfer_inform.py') # A python script file with some content to copy ;) - remote_target_file = Path(f'test/5-copy-of-{source_file}') - concat_script = Path('test/5-concat') - - concat_script.write_text(f'#!/usr/bin/bash\n' - f'echo "rsync $1 ... command output"\n' - f'cat "$1" >> "$2"') - os.chmod(concat_script, 0o0755) - - # TODO: - # When running this test: - # The intentionally generated error, can be fixed by touching (creating) the missing file. - - execute_print_transfer_chunks( - command=['cat', str(source_file)], - chunk_file_tmpl=chunk_file_tmpl, - chunk_transfer_fun=_test5_chunk_transfer_fun, - chunk_transfer_args=(concat_script, remote_target_file), - chunk_size=512, - ) - - def _test5_chunk_transfer_fun(chunk_file: Path, chunk_number: int, eof: bool, @@ -117,66 +69,6 @@ def _test5_chunk_transfer_fun(chunk_file: Path, ) -def test67(): - hostname = socket.gethostname() - if hostname == 'yodaTux': - test6() # LOCAL - elif hostname == 'danctnix': - test7() # REMOTE - else: - print(f'Unknown hostname {hostname}') - - -def test6(): - _init(6) - - source_file = Path('transfer_inform.py') # A python script file with some content to copy ;) - chunk_file_tmpl = Path('test/6-transfer_inform.py') - ssh_target = 'pine-pwdless' - target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/7-transfer_inform.py') - - execute_print_transfer_chunks( - command=['cat', str(source_file)], - chunk_file_tmpl=chunk_file_tmpl, - chunk_transfer_fun=_test6_chunk_transfer_fun, - chunk_transfer_args=(chunk_file_tmpl, ssh_target, target_file_tmpl), - chunk_size=512, - ) - - -def _test6_chunk_transfer_fun(source_chunk: Path, - chunk_number: int, - eof: bool, - chunk_file_tmpl: Path, - ssh_target: str, - target_file_tmpl: Path, - ): - target_chunk = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.{chunk_number}') - rsync_cmd = ['rsync', str(source_chunk), f'{ssh_target}:{str(target_chunk)}'] - message = 'EOF' if eof else 'OK' - target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET') - inform_cmd = ['ssh', ssh_target, f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] - - transfer_inform( - rsync_cmd=rsync_cmd, - inform_cmd=inform_cmd, - user_input_file=source_chunk.parent.joinpath(f'{source_chunk.name}.SOCKET'), - ) - - -def test7(): - _init(7) - - target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/7-transfer_inform.py') - target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET') - - execute_print_receive_chunks( - ['tee', str(target_file_tmpl)], - target_socket, - target_file_tmpl, - ) - - def test8(): repo_name = 'subprocess_util'