diff --git a/setup.cfg b/setup.cfg index 0bedd93..3a36d5e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,5 +33,6 @@ where = src ; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html console_scripts = btrfs-send-chunks-v2 = p1st.btrfs_send_chunks_v2:main - btrfs-receive-chunks = p1st.btrfs_receive_chunks:main + btrfs-send-chunks-v3 = p1st.btrfs_send_chunks_v3:main btrfs-receive-chunks-v2 = p1st.btrfs_receive_chunks_v2:main + btrfs-receive-chunks-v3 = p1st.btrfs_receive_chunks_v3:main diff --git a/src/p1st/btrfs_receive_chunks_v2.py b/src/p1st/btrfs_receive_chunks_v2.py index d3626d3..4737b75 100644 --- a/src/p1st/btrfs_receive_chunks_v2.py +++ b/src/p1st/btrfs_receive_chunks_v2.py @@ -12,10 +12,10 @@ from p1st.unix_sock_input import accept_loop_until_message def main(): args = parse_args() # - chunk_dir: Path = args.chunk_dir + target_chunk_dir: Path = args.chunk_dir target_path: Path = args.target_path - chunk_dir.mkdir(parents=True, exist_ok=True) + target_chunk_dir.mkdir(parents=True, exist_ok=True) command = ['btrfs', 'receive', str(target_path.parent)] @@ -34,7 +34,7 @@ def main(): msg = accept_loop_until_message(sock, messages) last_chunk = msg == b'EOF\n' - chunk_path = chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') + chunk_path = target_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') queue_put((chunk_path, last_chunk)) if last_chunk: diff --git a/src/p1st/btrfs_receive_chunks_v3.py b/src/p1st/btrfs_receive_chunks_v3.py new file mode 100644 index 0000000..5ded26e --- /dev/null +++ b/src/p1st/btrfs_receive_chunks_v3.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import argparse +import shlex +from pathlib import Path +from queue import Queue + +from p1st.exec_consume_chunks import execute_consume_chunks +from p1st.repeat import repeat_until_successful + + +def main(): + args = parse_args() + # + ssh_source: str = args.ssh_source + source_chunk_dir: Path = args.source_chunk_dir + targe_chunk_dir: Path = args.chunk_dir + target_path: Path = args.target_path + + target_path.parent.mkdir(parents=True, exist_ok=True) + + command = ['btrfs', 'receive', str(target_path.parent)] + + def handle_chunks(queue_put: Queue.put): + chunk_no = 1 + while True: + target_chunk_path = targe_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') + source_chunk_path = source_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') + in_socket: Path = source_chunk_dir.parent.joinpath(f'{target_path.name}.SOCKET.in') + + usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{target_chunk_path.name}.SOCKET') + + inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') + inform_cmd = ['ssh', + ssh_source, + f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}'] + rsync_cmd = ['rsync', + f'{ssh_source}:{source_chunk_path}', + f'{target_chunk_path}'] + inform2_cmd = ['ssh', + ssh_source, + f'echo OK | nc -U {shlex.quote(str(in_socket))}'] + + # Wait until next chunk can be transferred from sending side. + messages = ['OK\n', 'EOF\n'] + msg = repeat_until_successful(inform_cmd, usr_confirmation_socket) + if msg not in messages: + raise ValueError(f'Inalid message: >>>{msg}<<<') + last_chunk = msg == 'EOF\n' + + # Transfer chunk. + repeat_until_successful(rsync_cmd, usr_confirmation_socket) + + # Add chunk path to queue. + queue_put((target_chunk_path, last_chunk)) + + # Inform sending side about successful transfer. + repeat_until_successful(inform2_cmd, usr_confirmation_socket) + + if last_chunk: + break + chunk_no += 1 + + returncode = execute_consume_chunks( + command=command, + handle_chunks=handle_chunks, + ) + exit(returncode) + + +def parse_args(): + parser = argparse.ArgumentParser(prog='btrfs-receive-chunks') + + parser.add_argument('--chunk-dir', + help='Chunks are saved in this directory. ' + 'Defaults to parent directory of SUBVOLUME.', + dest='chunk_dir', + type=Path, + metavar='CHUNK_DIR', + default=None, + ) + + parser.add_argument('ssh_source', + help='Hostname of source computer; as configured in ~/.ssh/config.', + metavar='SSH_SOURCE' + ) + + parser.add_argument('source_chunk_dir', + help='Chunks are saved in this directory on SSH_SOURCE. ' + 'Must be an absolute path.', + type=Path, + metavar='SOURCE_CHUNK_DIR', + ) + + parser.add_argument('target_path', + help='Path where the subvolume will be created. ' + 'The last component of the path ' + 'must be equal to the name of the subvolume on the sending side.', + type=Path, + metavar='SUBVOLUME' + ) + + args = parser.parse_args() + + # Make all paths absolute. Set default values. + args.target_path = args.target_path.absolute() + if args.chunk_dir: + args.chunk_dir = args.chunk_dir.absolute() + else: + args.chunk_dir = args.target_path.absolute().parent + if not args.source_chunk_dir.is_absolute(): + raise ValueError(f'SOURCE_CHUNK_DIR must be absolute') + + return args + + +if __name__ == '__main__': + main() diff --git a/src/p1st/btrfs_send_chunks_v2.py b/src/p1st/btrfs_send_chunks_v2.py index 64a69de..64f60c8 100644 --- a/src/p1st/btrfs_send_chunks_v2.py +++ b/src/p1st/btrfs_send_chunks_v2.py @@ -16,7 +16,7 @@ def main(): compressed_data: bool = args.compressed_data child: Path = args.child parent: Path | None = args.parent - chunk_dir: Path = args.chunk_dir + source_chunk_dir: Path = args.chunk_dir target_chunk_dir: Path = args.target_chunk_dir target_path: Path = args.target_path @@ -28,21 +28,21 @@ def main(): ) command = [x for xs in command_parts for x in xs] - def get_chunk_path(chunk_no: int) -> Path: - return chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') + def get_source_chunk_path(chunk_no: int) -> Path: + return source_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') def handle_chunk(chunk_no: int, last_chunk: bool): - chunk_path = get_chunk_path(chunk_no) + source_chunk_path = get_source_chunk_path(chunk_no) target_chunk_path = target_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') - print(f'Handling chunk {chunk_path}') + print(f'Handling chunk {source_chunk_path}') - usr_confirmation_socket = chunk_path.parent.joinpath(f'{chunk_path.name}.SOCKET') + usr_confirmation_socket = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.SOCKET') target_socket = target_path.parent.joinpath(f'{target_path.name}.SOCKET') message = 'EOF' if last_chunk else 'OK' rsync_cmd = ['rsync', - str(chunk_path), + str(source_chunk_path), f'{ssh_target}:{str(target_chunk_path)}'] inform_cmd = ['ssh', ssh_target, @@ -53,7 +53,7 @@ def main(): returncode = execute_produce_chunks( command=command, - get_chunk_path=get_chunk_path, + get_chunk_path=get_source_chunk_path, handle_chunk=handle_chunk, chunk_size=chunk_size, ) diff --git a/src/p1st/btrfs_send_chunks_v3.py b/src/p1st/btrfs_send_chunks_v3.py index 7ab2110..bfe3b9d 100644 --- a/src/p1st/btrfs_send_chunks_v3.py +++ b/src/p1st/btrfs_send_chunks_v3.py @@ -1,14 +1,11 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse -import shlex import socket from pathlib import Path from p1st.exec_produce_chunks import execute_produce_chunks -from p1st.repeat import repeat_until_successful from p1st.unix_sock_input import accept_loop_until_message -from p1st.unix_socket_output import write_message def main(): @@ -18,9 +15,9 @@ def main(): compressed_data: bool = args.compressed_data child: Path = args.child parent: Path | None = args.parent - chunk_dir: Path = args.chunk_dir + source_chunk_dir: Path = args.chunk_dir - chunk_dir.mkdir(parents=True, exist_ok=True) + source_chunk_dir.mkdir(parents=True, exist_ok=True) command_parts = ( ['btrfs', 'send'], @@ -30,24 +27,22 @@ def main(): ) command = [x for xs in command_parts for x in xs] - def get_chunk_path(chunk_no: int) -> Path: - return chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') + def get_source_chunk_path(chunk_no: int) -> Path: + return source_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') def handle_chunk(chunk_no: int, last_chunk: bool): - chunk_path = get_chunk_path(chunk_no) - print(f'Handling chunk {chunk_path}') + source_chunk_path = get_source_chunk_path(chunk_no) + print(f'Handling chunk {source_chunk_path}') # Create in_socket - in_socket: Path = chunk_dir.parent.joinpath(f'{child.name}.SOCKET.in') + in_socket: Path = source_chunk_dir.parent.joinpath(f'{child.name}.SOCKET.in') in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) in_sock.bind(str(in_socket)) in_sock.listen(1) - # Write to out_socket - # -> Inform receiving side, that the next chung can be transferred. - out_socket: Path = chunk_dir.parent.joinpath(f'{child.name}.SOCKET.out') - message = b'EOF\n' if last_chunk else b'OK\n' - write_message(out_socket, message) + # Inform receiving side, that the next chunk can be transferred. + inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') + inform_path.write_text('EOF\n' if last_chunk else 'OK\n') # Read from in_socket # -> Receiving side informs us after the chunk has been transferred. @@ -59,7 +54,7 @@ def main(): returncode = execute_produce_chunks( command=command, - get_chunk_path=get_chunk_path, + get_chunk_path=get_source_chunk_path, handle_chunk=handle_chunk, chunk_size=chunk_size, ) @@ -100,49 +95,21 @@ def parse_args(): default=None, ) - parser.add_argument('--target-chunk-dir', - help='Chunks are saved in this directory on SSH_TARGET. ' - 'Must be an absolute path. ' - 'Defaults to parent directory of TARGET_PATH.', - dest='target_chunk_dir', - type=Path, - metavar='TARGET_CHUNK_DIR', - default=None, - ) - parser.add_argument('child', help='Path to child subvolume. Forwarded to btrfs-send.', type=Path, metavar='CHILD_SUBVOLUME' ) - parser.add_argument('ssh_target', - help='Hostname of target computer; as configured in ~/.ssh/config.', - metavar='SSH_TARGET' - ) - - parser.add_argument('target_path', - help='Absolute path where the subvolume will be created on SSH_TARGET.', - type=Path, - metavar='TARGET_PATH' - ) - args = parser.parse_args() # Make all paths absolute. Set default values. args.child = args.child.absolute() - if not args.target_path.is_absolute(): - raise ValueError('TARGET_PATH must be absolute') # if args.chunk_dir: args.chunk_dir = args.chunk_dir.absolute() else: # Default value args.chunk_dir = args.child.absolute().parent - if args.target_chunk_dir: - if not args.target_chunk_dir.is_absolute(): - raise ValueError('TARGET_CHUNK_DIR must be absolute') - else: # Default value - args.target_chunk_dir = args.target_path.parent # if args.parent: args.parent = args.parent.absolute() diff --git a/src/p1st/data_units.py b/src/p1st/data_units.py index 26d2721..3134596 100644 --- a/src/p1st/data_units.py +++ b/src/p1st/data_units.py @@ -32,7 +32,7 @@ class DataUnitConverter: return round(num_bytes / cls._unit_dict[unit], 3) @classmethod - def to_unit_auto(cls, num_bytes: int) -> (float, DataUnit): + def to_unit_auto(cls, num_bytes: int) -> tuple[float, DataUnit]: for unit, factor in cls._unit_dict.items(): converted = cls.to_unit(num_bytes, unit) if converted <= 999: diff --git a/src/p1st/exec_capture.py b/src/p1st/exec_capture.py index 4e3b94a..b5e7cd3 100644 --- a/src/p1st/exec_capture.py +++ b/src/p1st/exec_capture.py @@ -3,7 +3,7 @@ import subprocess -def execute_capture(command: list[str]) -> [int, str, str]: +def execute_capture(command: list[str]) -> tuple[int, str, str]: completed: subprocess.CompletedProcess = subprocess.run( command, capture_output=True, diff --git a/src/p1st/exec_print_capture.py b/src/p1st/exec_print_capture.py index ba97e81..6307bf9 100644 --- a/src/p1st/exec_print_capture.py +++ b/src/p1st/exec_print_capture.py @@ -19,12 +19,14 @@ class _Assert: raise ValueError(f'Expected a and b to be equal: {a}, {b}') -def _read_output(str_pipe: IO[AnyStr], queue_put: Queue.put, list_append: list.append, prefix: str = ''): +def _read_output(str_pipe: IO[AnyStr], + queue_put: Queue.put, + list_append: list.append, + prefix: str = ''): line: str for line in str_pipe: - func: Callable[[str], None] - for func in (queue_put, list_append): - func(f'{prefix}{line}') + queue_put(f'{prefix}{line}') + list_append(line) # TODO: Has this any effect? # str_pipe.close() @@ -73,7 +75,7 @@ def _write_output(queue_get: Queue.get): # # - Read data from stdout and stderr, until end-of-file is reached. # - Note: The data read is buffered in memory, so do not use this method if the data size is large or unlimited. -def execute_print_capture(command: list[str], encoding='UTF-8') -> [int, list[str], list[str]]: +def execute_print_capture(command: list[str], encoding='UTF-8') -> tuple[int, str, str]: """ Executes the given command. @@ -124,4 +126,4 @@ def execute_print_capture(command: list[str], encoding='UTF-8') -> [int, list[st q.put(None) t_write.join() - return returncode, out, err + return returncode, ''.join(out), ''.join(err) diff --git a/src/p1st/repeat.py b/src/p1st/repeat.py index 7dc5edb..b2e2a0c 100644 --- a/src/p1st/repeat.py +++ b/src/p1st/repeat.py @@ -5,7 +5,7 @@ from p1st.unix_sock_input import wait_for_message def repeat_until_successful(command: list[str], - socket_file: Path) -> None: + socket_file: Path) -> str: """ Executes the given `command`. @@ -13,9 +13,11 @@ def repeat_until_successful(command: list[str], `socket_file` and waits for user input. """ while True: - returncode, _out, _err = execute_print_capture(command) + returncode: int + out: str + returncode, out, _err = execute_print_capture(command) if returncode == 0: - return + return out print(f'\n' f'Error while executing:\n' diff --git a/src/p1st/test.py b/src/p1st/test.py index 8c50431..d40a0d4 100644 --- a/src/p1st/test.py +++ b/src/p1st/test.py @@ -25,7 +25,8 @@ def test(): # test9() # test10() # test11() - test12() + # test12() + test13() def test1(): @@ -273,8 +274,20 @@ def test12(): ssh_target = 'rootnas' target_path = '/mnt/data/test/blogger.privacy1st.de/20230104T2255' target_chunk_path = '/mnt/data/test/chunks' - print(f'sudo btrfs-receive-chunks-v2 --chunk-dir={target_chunk_path} {target_path}') - print(f'sudo btrfs-send-chunks-v2 --chunk-dir=/mnt/backup/test/chunks --target-chunk-dir={target_chunk_path} {child_path} {ssh_target} {target_path}') + print(f'nas: sudo btrfs-receive-chunks-v2 --chunk-dir={target_chunk_path} {target_path}') + print( + f'odroid: sudo btrfs-send-chunks-v2 --chunk-dir=/mnt/backup/test/chunks --target-chunk-dir={target_chunk_path} {child_path} {ssh_target} {target_path}') + + +def test13(): + ssh_source = 'rootnas' + source_path = '/mnt/data/snap/blogger.privacy1st.de/20230104T1622_u' + source_chunk_dir = '/mnt/data/test/chunks' + + target_path = '/mnt/backup/test/blogger.privacy1st.de/20230104T1622_u' + + print(f'nas: sudo btrfs-send-chunks-v3 --chunk-dir={source_chunk_dir} {source_path}') + print(f'odroid: sudo btrfs-receive-chunks-v3 {ssh_source} {source_chunk_dir} {target_path}') def _init(test_number: int):