diff --git a/setup.cfg b/setup.cfg index df7857d..131d758 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,4 +33,5 @@ where = src ; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html console_scripts = btrfs-send-chunks = p1st.btrfs_send_chunks:main + btrfs-send-chunks-v2 = p1st.btrfs_send_chunks_v2:main btrfs-receive-chunks = p1st.btrfs_receive_chunks:main diff --git a/src/p1st/btrfs_send_chunks_v2.py b/src/p1st/btrfs_send_chunks_v2.py new file mode 100644 index 0000000..18b0ab1 --- /dev/null +++ b/src/p1st/btrfs_send_chunks_v2.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import argparse +import shlex +from pathlib import Path + +from p1st.exec_print_chunk import execute_print_chunk +from p1st.repeat import repeat_until_successful + + +def main(): + args = parse_args() + + command_parts = ( + ['btrfs', 'send'], + ['-p', str(args.parent)] if args.parent else [], + ['--compress-data'] if args.compressed_data else [], + [str(args.child)] + ) + command = [x for xs in command_parts for x in xs] + + def get_chunk_path(chunk_no: int) -> Path: + return args.chunk_dir.joinpath(f'{args.child.name}.CHUNK.{chunk_no}') + + def handle_chunk(chunk_no: int, last_chunk: bool): + chunk_path = get_chunk_path(chunk_no) + target_chunk_path = args.target_chunk_dir.joinpath(f'{args.child.name}.CHUNK.{chunk_no}') + print(f'Handling chunk {chunk_path}') + + usr_confirmation_socket = chunk_path.parent.joinpath(f'{chunk_path.name}.SOCKET') + target_socket = args.target_path.parent.joinpath(f'{args.target_path.name}.SOCKET') + + message = 'EOF' if last_chunk else 'OK' + + rsync_cmd = ['rsync', + str(chunk_path), + f'{args.ssh_target}:{str(target_chunk_path)}'] + inform_cmd = ['ssh', + args.ssh_target, + f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] + + repeat_until_successful(rsync_cmd, usr_confirmation_socket) + repeat_until_successful(inform_cmd, usr_confirmation_socket) + + execute_print_chunk( + command=command, + get_chunk_path=get_chunk_path, + handle_chunk=handle_chunk, + chunk_size=args.chunk_size, + ) + + +def parse_args(): + parser = argparse.ArgumentParser(prog='btrfs-send-chunks') + + parser.add_argument('-p', + help='Path to parent subvolume; forwarded to btrfs-send.', + dest='parent', + default=None, + type=Path, + metavar='PARENT_SUBVOLUME' + ) + + parser.add_argument('--compressed-data', + help='Forwarded to btrfs-send.', + dest='compressed_data', + action='store_true', + default=False, + ) + + parser.add_argument('--chunk-size', + help='Size in bytes; defaults to 128 MB.', + dest='chunk_size', + type=int, + default=128 * 1024 * 1024, + ) + + parser.add_argument('--chunk-dir', + help='Chunks are saved in this directory. ' + 'Defaults to parent directory of CHILD.', + dest='chunk_dir', + type=Path, + metavar='CHUNK_DIR', + default=None, + ) + + parser.add_argument('--target-chunk-dir', + help='Chunks are saved in this directory on SSH_TARGET. ' + 'Must be an absolute path. ' + 'Defaults to parent directory of TARGET_PATH.', + dest='target_chunk_dir', + type=Path, + metavar='TARGET_CHUNK_DIR', + default=None, + ) + + parser.add_argument('child', + help='Path to child subvolume. Forwarded to btrfs-send.', + type=Path, + metavar='CHILD_SUBVOLUME' + ) + + parser.add_argument('ssh_target', + help='Hostname of target computer; as configured in ~/.ssh/config.', + metavar='SSH_TARGET' + ) + + parser.add_argument('target_path', + help='Absolute path where the subvolume will be created on SSH_TARGET.', + type=Path, + metavar='TARGET_PATH' + ) + + args = parser.parse_args() + + # Make all paths absolute. Set default values. + args.child = args.child.absolute() + if not args.target_path.is_absolute(): + raise ValueError('TARGET_PATH must be absolute') + # + if args.chunk_dir: + args.chunk_dir = args.chunk_dir.absolute() + else: # Default value + args.chunk_dir = args.child.absolute().parent + if args.target_chunk_dir: + if not args.target_chunk_dir.is_absolute(): + raise ValueError('TARGET_CHUNK_DIR must be absolute') + else: # Default value + args.target_chunk_dir = args.target_path.parent + # + if args.parent: + args.parent = args.parent.absolute() + + return args + + +if __name__ == '__main__': + main() diff --git a/src/p1st/exec_print_chunk.py b/src/p1st/exec_print_chunk.py new file mode 100644 index 0000000..af5289d --- /dev/null +++ b/src/p1st/exec_print_chunk.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import subprocess +import sys +import threading +from pathlib import Path +from queue import Queue +from typing import Callable, IO, AnyStr + + +def execute_print_chunk(command: list[str], + get_chunk_path: Callable[[int], Path], + handle_chunk: Callable[[int, bool], None], + chunk_size: int = 512, # TODO increase + ) -> int: + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + ) + + q = Queue(maxsize=2) + + threads = [ + threading.Thread( + target=_stdout_worker, + args=(process.stdout, + get_chunk_path, + q.put, + chunk_size, + )), + threading.Thread( + target=_chunk_worker, + args=(q.get, + handle_chunk, + )), + threading.Thread( + target=_stderr_worker, + args=(process.stderr, + )), + ] + + for t in threads: + t.daemon = True + t.start() + + returncode: int = process.wait() + for t in threads: + t.join() + + return returncode + + +def _stdout_worker(binary_stdout: IO[AnyStr], + get_chunk_path: Callable[[int], Path], + queue_put: Queue.put, + chunk_size: int, + ): + chunk_no: int = 1 + remaining_bytes: int = chunk_size + chunk: bytes = b'' + + while True: + b = binary_stdout.read(remaining_bytes) + if len(b) == 0: # EOF reached. + chunk_path: Path = get_chunk_path(chunk_no) + _save_chunk(chunk, chunk_path) + queue_put((chunk_no, True)) + + break + + chunk += b + chunk_len: int = len(chunk) + if chunk_len == chunk_size: # Next chunk is full. + chunk_path: Path = get_chunk_path(chunk_no) + _save_chunk(chunk, chunk_path) + queue_put((chunk_no, False)) + + chunk = b'' + remaining_bytes = chunk_size + chunk_no += 1 + elif chunk_len < chunk_size: + remaining_bytes = chunk_size - chunk_len + else: + raise ValueError('Invalid state') + + # TODO: Has this any effect? + # binary_stdout.close() + + +def _save_chunk(chunk: bytes, chunk_path: Path): + """ + Saves a chunk at the given path. + """ + print(f'Saving chunk {chunk_path}') + chunk_path.parent.mkdir(parents=True, exist_ok=True) + + # Fails if file does already exist. + with open(chunk_path, 'xb') as f: + f.write(chunk) + + +def _chunk_worker(queue_get: Queue.get, + handle_chunk: Callable[[int, bool], None], + ): + """ + Calls handle_chunk(chunk_no, last_chunk) for each chunk + that has been saved. + """ + while True: + chunk_no, last_chunk = queue_get() + handle_chunk(chunk_no, last_chunk) + if last_chunk: + break + + +def _stderr_worker(binary_stderr: IO[AnyStr]): + """ + Prints stderr of subprocess to sys.stderr. + """ + + b: bytes + for b in binary_stderr: + sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}') + + # TODO: Has this any effect? + # binary_stderr.close() diff --git a/src/p1st/receive_inform.py b/src/p1st/receive_inform.py index 07af909..5ea544f 100644 --- a/src/p1st/receive_inform.py +++ b/src/p1st/receive_inform.py @@ -4,7 +4,7 @@ import socket from pathlib import Path from typing import IO, AnyStr, Callable -from p1st.unix_sock_input import accept_loop_until_command_received +from p1st.unix_sock_input import accept_loop_until_message def receive_inform(in_pipe: IO[AnyStr], @@ -25,10 +25,10 @@ def receive_inform(in_pipe: IO[AnyStr], sock.listen(1) ct = 1 - commands = [b'OK\n', b'EOF\n'] + messages = [b'OK\n', b'EOF\n'] while True: - command = accept_loop_until_command_received(sock, commands) - if command not in commands: + msg = accept_loop_until_message(sock, messages) + if msg not in messages: raise ValueError("Invalid state") chunk_file = get_chunk_file(chunk_file_tmpl, ct) @@ -37,9 +37,9 @@ def receive_inform(in_pipe: IO[AnyStr], # in_pipe.flush() # TODO: is this required? chunk_file.unlink(missing_ok=False) - if command == b'OK\n': + if msg == b'OK\n': ct += 1 - elif command == b'EOF\n': + elif msg == b'EOF\n': break else: raise ValueError("Invalid state") diff --git a/src/p1st/repeat.py b/src/p1st/repeat.py new file mode 100644 index 0000000..7dc5edb --- /dev/null +++ b/src/p1st/repeat.py @@ -0,0 +1,30 @@ +from pathlib import Path + +from p1st.exec_print_capture import execute_print_capture +from p1st.unix_sock_input import wait_for_message + + +def repeat_until_successful(command: list[str], + socket_file: Path) -> None: + """ + Executes the given `command`. + + If an error occurs, it creates a UNIX socket at + `socket_file` and waits for user input. + """ + while True: + returncode, _out, _err = execute_print_capture(command) + if returncode == 0: + return + + print(f'\n' + f'Error while executing:\n' + f'\t{command}\n' + f'\tFor details, see above output.') + + print(f'Info:\n' + f'\tPlease fix the above error first. Then continue here:\n' + f'\tsudo pacman -S --needed openbsd-netcat\n' + f'\techo "OK" | nc -U "{socket_file.absolute()}"') + wait_for_message(socket_file, [b'OK\n']) + print() diff --git a/src/p1st/test.py b/src/p1st/test.py index 856fe57..f9bd259 100644 --- a/src/p1st/test.py +++ b/src/p1st/test.py @@ -8,6 +8,7 @@ from pathlib import Path from p1st.exec_capture import execute_capture from p1st.exec_print_capture import execute_print_capture +from p1st.exec_print_chunk import execute_print_chunk from p1st.exec_print_receive import execute_print_receive_chunks from p1st.exec_print_transfer import execute_print_transfer_chunks from p1st.transfer_inform import transfer_inform @@ -21,7 +22,9 @@ def test(): # test5() # test67() # test8() - test9() + # test9() + # test10() + test11() def test1(): @@ -207,6 +210,63 @@ def test9(): print(f'sudo btrfs-send-chunks {child_path} {ssh_target} {target_path}') +def test10(): + _init(10) + + source_file = Path('src/p1st/transfer_inform.py') + chunk_dir = Path('test') + target_file = Path('test/transfer_inform.py') + + def get_chunk_path(chunk_no: int): + return chunk_dir.joinpath(f'{source_file.name}.CHUNK.{chunk_no}') + + def handle_chunk(chunk_no: int, last_chunk: bool): + chunk_path = get_chunk_path(chunk_no) + print(f'Handling chunk {chunk_path}') + + # Read chunk. + chunk = chunk_path.read_bytes() + # Append chunk to target. + with target_file.open("ab") as f: + f.write(chunk) + # Delete chunk. + chunk_path.unlink(missing_ok=False) + + execute_print_chunk( + command=['cat', str(source_file)], + get_chunk_path=get_chunk_path, + handle_chunk=handle_chunk, + chunk_size=512, + ) + + +def test11(): + repo_name = 'subprocess_util' + + child_name = 'test-subvolume' + child_dir = '/mnt/backup/test-dir' + child_path = f'{child_dir}/{child_name}' + target_dir = '/mnt/data/test-dir' + target_path = f'{target_dir}/{child_name}' + ssh_target = 'rootnas' + + print(f'=== In one shell, connect with "ssh nas" ===') + print(f'\tsudo mkdir {target_dir}') + print(f'\tcd {repo_name} && make && sudo make clean && cd ..') + print() + print(f'\tsudo btrfs-receive-chunks {target_path}') + print() + + print(f'=== In another shell, connect with "ssh odroid" ===') + print(f'\tsudo mkdir {child_dir}') + print(f'\tsudo btrfs subvolume create {child_path}.writeable') + print(f'\techo foo | sudo tee {child_path}.writeable/bar') + print(f'\tsudo btrfs subvolume snapshot -r {child_path}.writeable {child_path}') + print(f'\tcd {repo_name} && make && sudo make clean && cd ..') + print() + print(f'\tsudo btrfs-send-chunks-v2 {child_path} {ssh_target} {target_path}') + + def _init(test_number: int): print(f"TEST {test_number}") test_dir = Path('test') diff --git a/src/p1st/transfer_inform.py b/src/p1st/transfer_inform.py index 550fd6f..a97ad67 100644 --- a/src/p1st/transfer_inform.py +++ b/src/p1st/transfer_inform.py @@ -2,8 +2,7 @@ # -*- coding: utf-8 -*- from pathlib import Path -from p1st.exec_print_capture import execute_print_capture -from p1st.unix_sock_input import wait_until_command_received +from p1st.repeat import repeat_until_successful def transfer_inform(rsync_cmd: list[str], @@ -26,29 +25,9 @@ def transfer_inform(rsync_cmd: list[str], # - rsync to remote pc # - catch error # - wait until user input, then repeat - _execute_loop_until_successful(rsync_cmd, user_input_file) + repeat_until_successful(rsync_cmd, user_input_file) # - inform remote pc about complete rsync # - catch error # - wait until user input, then repeat - _execute_loop_until_successful(inform_cmd, user_input_file) - - -def _execute_loop_until_successful(cmd: list[str], user_input_file: Path): - while True: - returncode, _out, _err = execute_print_capture(cmd) - if returncode == 0: - break - else: - print(f'\n' - f'Error while executing:\n' - f'\t{cmd}\n' - f'\tFor details, see above output.') - - print(f'Info:\n' - f'\tPlease fix the above error first. Then continue here:\n' - f'\tsudo pacman -S --needed openbsd-netcat\n' - f'\techo "OK" | nc -U "{user_input_file.absolute()}"') - wait_until_command_received(user_input_file, [b'OK\n']) - - print() + repeat_until_successful(inform_cmd, user_input_file) diff --git a/src/p1st/unix_sock_input.py b/src/p1st/unix_sock_input.py index e74ca0b..2c680c2 100644 --- a/src/p1st/unix_sock_input.py +++ b/src/p1st/unix_sock_input.py @@ -4,16 +4,17 @@ import socket from pathlib import Path -def wait_until_command_received(socket_file: Path, commands: list[bytes]) -> bytes: +def wait_for_message(socket_file: Path, + messages: list[bytes]) -> bytes: """ - Creates a UNIX socket at socket_file. + Creates a UNIX socket at `socket_file`. Accepts connections on the UNIX socket, - until one client sends one of the given commands as first message. + until a client sends one of the given `messages`. Closes the UNIX socket. - :returns: The command that was received. + :returns: The message that was received. """ # INSPIRATION: https://pymotw.com/3/socket/uds.html @@ -23,7 +24,7 @@ def wait_until_command_received(socket_file: Path, commands: list[bytes]) -> byt sock.bind(str(socket_file)) sock.listen(1) - command = accept_loop_until_command_received(sock, commands) + command = accept_loop_until_message(sock, messages) print(f'Closing socket {socket_file}') sock.close() @@ -32,7 +33,8 @@ def wait_until_command_received(socket_file: Path, commands: list[bytes]) -> byt return command -def accept_loop_until_command_received(sock: socket.socket, commands: list[bytes]) -> bytes: +def accept_loop_until_message(sock: socket.socket, + messages: list[bytes]) -> bytes: """ Uses an open UNIX socket. @@ -44,16 +46,16 @@ def accept_loop_until_command_received(sock: socket.socket, commands: list[bytes :returns: The command that was received. """ - bufsize = max([len(cmd) for cmd in commands]) + len(b'\n') + bufsize = max([len(msg) for msg in messages]) + len(b'\n') while True: connection, client_address = sock.accept() try: b: bytes = connection.recv(bufsize) - for cmd in commands: - if b == cmd: - print(f'Received "{cmd}".') - return cmd + for msg in messages: + if b == msg: + print(f'Received "{msg}".') + return msg print(f'Received unknown message: {b}') except Exception as e: print(f'Error while reading message: {e}')