From bfd2906ccdf8f7916a40b9eeac5c928db835779f Mon Sep 17 00:00:00 2001 From: Daniel Langbein Date: Thu, 12 Jan 2023 15:38:01 +0100 Subject: [PATCH] add unix_socket_output and btrfs_send_chunks_v3 --- src/p1st/btrfs_receive_chunks.py | 60 ----------- src/p1st/btrfs_receive_chunks_v2.py | 14 +-- src/p1st/btrfs_send_chunks_v2.py | 27 +++-- src/p1st/btrfs_send_chunks_v3.py | 154 ++++++++++++++++++++++++++++ src/p1st/unix_socket_output.py | 17 +++ 5 files changed, 197 insertions(+), 75 deletions(-) delete mode 100644 src/p1st/btrfs_receive_chunks.py create mode 100644 src/p1st/btrfs_send_chunks_v3.py create mode 100644 src/p1st/unix_socket_output.py diff --git a/src/p1st/btrfs_receive_chunks.py b/src/p1st/btrfs_receive_chunks.py deleted file mode 100644 index 6d113a0..0000000 --- a/src/p1st/btrfs_receive_chunks.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -import argparse -from pathlib import Path - -from p1st.exec_print_receive import execute_print_receive_chunks -from p1st.common import _get_chunk_file, _get_remote_socket, _get_chunk_tmpl - - -def main(): - args = parse_args() - - target_path: Path = args.target_path - command = ['btrfs', 'receive', str(target_path.parent)] - target_socket = _get_remote_socket(target_path) - - execute_print_receive_chunks( - command=command, - socket_file=target_socket, - chunk_file_tmpl=args.chunk_tmpl, - get_chunk_file=_get_chunk_file, - ) - - -def parse_args(): - parser = argparse.ArgumentParser(prog='btrfs-receive-chunks') - - parser.add_argument('--chunk-tmpl', - help='During btrfs-receive, chunks are saved as "CHUNK_TMPL.CHUNK_NUMBER". ' - 'The default value of CHUNK_TMPL is "SUBVOLUME.CHUNK". ' - 'One can change it to e.g. "/tmp/chunk/SUBVOLUME".', - dest='chunk_tmpl', - default=None, - type=Path, - metavar='CHUNK_TMPL' - ) - - parser.add_argument('target_path', - help='Path where the subvolume will be created. 
' - 'The last component of the path ' - 'must be equal to the name of the subvolume on the sending side.', - type=Path, - metavar='SUBVOLUME' - ) - - args = parser.parse_args() - - # Make all paths absolute. - if args.chunk_tmpl: - args.chunk_tmpl = args.chunk_tmpl.absolute() - args.target_path = args.target_path.absolute() - - if not args.chunk_tmpl: - args.chunk_tmpl = _get_chunk_tmpl(args.target_path) - - return args - - -if __name__ == '__main__': - main() diff --git a/src/p1st/btrfs_receive_chunks_v2.py b/src/p1st/btrfs_receive_chunks_v2.py index c5242f5..d3626d3 100644 --- a/src/p1st/btrfs_receive_chunks_v2.py +++ b/src/p1st/btrfs_receive_chunks_v2.py @@ -11,14 +11,16 @@ from p1st.unix_sock_input import accept_loop_until_message def main(): args = parse_args() + # + chunk_dir: Path = args.chunk_dir + target_path: Path = args.target_path - command = ['btrfs', 'receive', str(args.target_path.parent)] + chunk_dir.mkdir(parents=True, exist_ok=True) + + command = ['btrfs', 'receive', str(target_path.parent)] def handle_chunks(queue_put: Queue.put): - chunk_dir: Path = args.chunk_dir - chunk_dir.mkdir(parents=True, exist_ok=True) - - socket_file: Path = args.target_path.parent.joinpath(f'{args.target_path.name}.SOCKET') + socket_file: Path = target_path.parent.joinpath(f'{target_path.name}.SOCKET') socket_file.parent.mkdir(parents=True, exist_ok=True) print(f'Listening on socket {socket_file}') @@ -32,7 +34,7 @@ def main(): msg = accept_loop_until_message(sock, messages) last_chunk = msg == b'EOF\n' - chunk_path = chunk_dir.joinpath(f'{args.target_path.name}.CHUNK.{chunk_no}') + chunk_path = chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') queue_put((chunk_path, last_chunk)) if last_chunk: diff --git a/src/p1st/btrfs_send_chunks_v2.py b/src/p1st/btrfs_send_chunks_v2.py index c352a9b..64a69de 100644 --- a/src/p1st/btrfs_send_chunks_v2.py +++ b/src/p1st/btrfs_send_chunks_v2.py @@ -10,33 +10,42 @@ from p1st.repeat import repeat_until_successful def main(): 
args = parse_args() + # + ssh_target = args.ssh_target + chunk_size = args.chunk_size + compressed_data: bool = args.compressed_data + child: Path = args.child + parent: Path | None = args.parent + chunk_dir: Path = args.chunk_dir + target_chunk_dir: Path = args.target_chunk_dir + target_path: Path = args.target_path command_parts = ( ['btrfs', 'send'], - ['-p', str(args.parent)] if args.parent else [], - ['--compress-data'] if args.compressed_data else [], - [str(args.child)] + ['-p', str(parent)] if parent else [], + ['--compress-data'] if compressed_data else [], + [str(child)] ) command = [x for xs in command_parts for x in xs] def get_chunk_path(chunk_no: int) -> Path: - return args.chunk_dir.joinpath(f'{args.child.name}.CHUNK.{chunk_no}') + return chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') def handle_chunk(chunk_no: int, last_chunk: bool): chunk_path = get_chunk_path(chunk_no) - target_chunk_path = args.target_chunk_dir.joinpath(f'{args.child.name}.CHUNK.{chunk_no}') + target_chunk_path = target_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') print(f'Handling chunk {chunk_path}') usr_confirmation_socket = chunk_path.parent.joinpath(f'{chunk_path.name}.SOCKET') - target_socket = args.target_path.parent.joinpath(f'{args.target_path.name}.SOCKET') + target_socket = target_path.parent.joinpath(f'{target_path.name}.SOCKET') message = 'EOF' if last_chunk else 'OK' rsync_cmd = ['rsync', str(chunk_path), - f'{args.ssh_target}:{str(target_chunk_path)}'] + f'{ssh_target}:{str(target_chunk_path)}'] inform_cmd = ['ssh', - args.ssh_target, + ssh_target, f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] repeat_until_successful(rsync_cmd, usr_confirmation_socket) @@ -46,7 +55,7 @@ def main(): command=command, get_chunk_path=get_chunk_path, handle_chunk=handle_chunk, - chunk_size=args.chunk_size, + chunk_size=chunk_size, ) exit(returncode) diff --git a/src/p1st/btrfs_send_chunks_v3.py b/src/p1st/btrfs_send_chunks_v3.py new file mode 100644 index 
0000000..7ab2110
--- /dev/null
+++ b/src/p1st/btrfs_send_chunks_v3.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+import argparse
+import shlex
+import socket
+from pathlib import Path
+
+from p1st.exec_produce_chunks import execute_produce_chunks
+from p1st.repeat import repeat_until_successful
+from p1st.unix_sock_input import accept_loop_until_message
+from p1st.unix_socket_output import write_message
+
+
+def main():
+    """Run btrfs-send and hand its output over chunk by chunk.
+
+    The process exits with the value returned by execute_produce_chunks.
+    """
+    args = parse_args()
+    #
+    chunk_size = args.chunk_size
+    compressed_data: bool = args.compressed_data
+    child: Path = args.child
+    parent: Path | None = args.parent
+    chunk_dir: Path = args.chunk_dir
+
+    chunk_dir.mkdir(parents=True, exist_ok=True)
+
+    # Optional parts contribute an empty list and vanish when flattened.
+    command_parts = (
+        ['btrfs', 'send'],
+        ['-p', str(parent)] if parent else [],
+        ['--compress-data'] if compressed_data else [],
+        [str(child)]
+    )
+    command = [x for xs in command_parts for x in xs]
+
+    def get_chunk_path(chunk_no: int) -> Path:
+        """Return the path of chunk number chunk_no inside chunk_dir."""
+        return chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
+
+    def handle_chunk(chunk_no: int, last_chunk: bool):
+        """Announce one chunk and block until its transfer is confirmed.
+
+        The reply socket is closed and its file removed even when announcing
+        or waiting fails; a leftover socket file would make the next bind()
+        on the same path fail.
+        """
+        chunk_path = get_chunk_path(chunk_no)
+        print(f'Handling chunk {chunk_path}')
+
+        in_socket: Path = chunk_dir.parent.joinpath(f'{child.name}.SOCKET.in')
+        out_socket: Path = chunk_dir.parent.joinpath(f'{child.name}.SOCKET.out')
+        message = b'EOF\n' if last_chunk else b'OK\n'
+
+        # Create in_socket before announcing the chunk, so that the reply
+        # cannot arrive before we are listening.
+        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as in_sock:
+            in_sock.bind(str(in_socket))
+            try:
+                in_sock.listen(1)
+
+                # Write to out_socket
+                # -> Inform receiving side, that the next chunk can be transferred.
+                write_message(out_socket, message)
+
+                # Read from in_socket
+                # -> Receiving side informs us after the chunk has been transferred.
+                accept_loop_until_message(in_sock, [b'OK\n'])
+            finally:
+                # Only reached after a successful bind(), which created the file.
+                in_socket.unlink(missing_ok=False)
+
+    returncode = execute_produce_chunks(
+        command=command,
+        get_chunk_path=get_chunk_path,
+        handle_chunk=handle_chunk,
+        chunk_size=chunk_size,
+    )
+    # The exit() builtin is injected by the site module and is not guaranteed
+    # to exist; raising SystemExit has the same effect.
+    raise SystemExit(returncode)
+
+
+def parse_args():
+    """Parse the command line, make local paths absolute and set defaults.
+
+    NOTE(review): ssh_target, target_path and target_chunk_dir are parsed and
+    validated here, but main() in this module never reads them.
+
+    Raises:
+        ValueError: if TARGET_PATH or TARGET_CHUNK_DIR is not absolute.
+    """
+    parser = argparse.ArgumentParser(prog='btrfs-send-chunks')
+
+    parser.add_argument('-p',
+                        help='Path to parent subvolume; forwarded to btrfs-send.',
+                        dest='parent',
+                        default=None,
+                        type=Path,
+                        metavar='PARENT_SUBVOLUME'
+                        )
+
+    parser.add_argument('--compressed-data',
+                        help='Forwarded to btrfs-send.',
+                        dest='compressed_data',
+                        action='store_true',
+                        default=False,
+                        )
+
+    parser.add_argument('--chunk-size',
+                        help='Size in bytes; defaults to 128 MB.',
+                        dest='chunk_size',
+                        type=int,
+                        default=128 * 1024 * 1024,
+                        )
+
+    parser.add_argument('--chunk-dir',
+                        help='Chunks are saved in this directory. '
+                             'Defaults to parent directory of CHILD.',
+                        dest='chunk_dir',
+                        type=Path,
+                        metavar='CHUNK_DIR',
+                        default=None,
+                        )
+
+    parser.add_argument('--target-chunk-dir',
+                        help='Chunks are saved in this directory on SSH_TARGET. '
+                             'Must be an absolute path. '
+                             'Defaults to parent directory of TARGET_PATH.',
+                        dest='target_chunk_dir',
+                        type=Path,
+                        metavar='TARGET_CHUNK_DIR',
+                        default=None,
+                        )
+
+    parser.add_argument('child',
+                        help='Path to child subvolume. Forwarded to btrfs-send.',
+                        type=Path,
+                        metavar='CHILD_SUBVOLUME'
+                        )
+
+    parser.add_argument('ssh_target',
+                        help='Hostname of target computer; as configured in ~/.ssh/config.',
+                        metavar='SSH_TARGET'
+                        )
+
+    parser.add_argument('target_path',
+                        help='Absolute path where the subvolume will be created on SSH_TARGET.',
+                        type=Path,
+                        metavar='TARGET_PATH'
+                        )
+
+    args = parser.parse_args()
+
+    # Make all paths absolute. Set default values.
+    args.child = args.child.absolute()
+    if not args.target_path.is_absolute():
+        raise ValueError('TARGET_PATH must be absolute')
+    #
+    if args.chunk_dir:
+        args.chunk_dir = args.chunk_dir.absolute()
+    else:  # Default value
+        args.chunk_dir = args.child.parent
+    if args.target_chunk_dir:
+        if not args.target_chunk_dir.is_absolute():
+            raise ValueError('TARGET_CHUNK_DIR must be absolute')
+    else:  # Default value
+        args.target_chunk_dir = args.target_path.parent
+    #
+    if args.parent:
+        args.parent = args.parent.absolute()
+
+    return args
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/p1st/unix_socket_output.py b/src/p1st/unix_socket_output.py
new file mode 100644
index 0000000..39e04e4
--- /dev/null
+++ b/src/p1st/unix_socket_output.py
@@ -0,0 +1,22 @@
+import socket
+from pathlib import Path
+
+
+def write_message(socket_file: Path,
+                  message: bytes):
+    """Send message to the Unix stream socket bound at socket_file.
+
+    The socket is closed again in every case, including a failed connect().
+    """
+    # INSPIRATION: https://pymotw.com/3/socket/uds.html
+
+    print(f'Connecting to socket {socket_file}')
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    try:
+        # connect() is inside the try block so that a refused or missing
+        # socket does not leak the file descriptor.
+        sock.connect(str(socket_file))
+        sock.sendall(message)
+    finally:
+        print(f'Closing socket {socket_file}')
+        sock.close()