From 780fe7f1d165b62520775e48a627a6bfea1ddaa4 Mon Sep 17 00:00:00 2001 From: Daniel Langbein Date: Sat, 18 Mar 2023 17:52:49 +0100 Subject: [PATCH] refactor: extract exec_{send,receive}_chunks from btrfs_{send,receive} --- .../btrfs_receive_chunks_active.py | 61 ++-------------- .../btrfs_receive_chunks_passive.py | 47 +++--------- .../btrfs_send_chunks_active.py | 65 ++++++----------- .../btrfs_send_chunks_passive.py | 43 ++--------- src/subprocess_util/common.py | 13 ---- .../exec_receive_chunks_active.py | 73 +++++++++++++++++++ .../exec_receive_chunks_passive.py | 46 ++++++++++++ .../exec_send_chunks_active.py | 49 +++++++++++++ .../exec_send_chunks_passive.py | 48 ++++++++++++ 9 files changed, 257 insertions(+), 188 deletions(-) create mode 100644 src/subprocess_util/exec_receive_chunks_active.py create mode 100644 src/subprocess_util/exec_receive_chunks_passive.py create mode 100644 src/subprocess_util/exec_send_chunks_active.py create mode 100644 src/subprocess_util/exec_send_chunks_passive.py diff --git a/src/subprocess_util/btrfs_receive_chunks_active.py b/src/subprocess_util/btrfs_receive_chunks_active.py index 1a15819..194923f 100644 --- a/src/subprocess_util/btrfs_receive_chunks_active.py +++ b/src/subprocess_util/btrfs_receive_chunks_active.py @@ -1,13 +1,9 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse -import shlex from pathlib import Path -from queue import Queue -from subprocess_util.common import Message1, Message2, get_chunk_path, get_inform1_path, get_inform2_path -from subprocess_util.exec_consume_chunks import execute_consume_chunks -from subprocess_util.repeat import repeat_until_successful +from subprocess_util.exec_receive_chunks_active import exec_receive_chunks_active def main(): @@ -22,57 +18,12 @@ def main(): command = ['btrfs', 'receive', str(target_path.parent)] - def handle_chunks(queue_put: Queue.put): - chunk_no = 1 - while True: - target_chunk_path = get_chunk_path(target_chunk_dir, target_path, chunk_no) - source_chunk_path = get_chunk_path(source_chunk_dir, target_path, chunk_no) - in_socket: Path = get_inform2_path(source_chunk_dir, target_path) - - user_socket = target_chunk_dir.parent.joinpath(f'{target_path.name}.SOCKET.user') - - inform_path = get_inform1_path(source_chunk_path) - inform_cmd = ['ssh', - ssh_source, - f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}'] - messages = [Message1.OK.value, Message1.EOF.value] - rsync_cmd = ['rsync', - f'{ssh_source}:{source_chunk_path}', - f'{target_chunk_path}'] - inform2_cmd = ['ssh', - ssh_source, - f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(in_socket))}"] - - # Wait until next chunk can be transferred from sending side. - # - # It can happen,that the sending side has not yet saved the next chunk. - # In that case `cat ...` will fail. - # We will retry 3 times, each time after 5 seconds sleep. - # - # If all 4 `cat ...` attempts failed, we wait until the user has inspected the problem. - msg = repeat_until_successful(inform_cmd, user_socket, retries=3, retry_delay_seconds=5) - if msg not in messages: - raise ValueError(f'Invalid message: {msg}') - last_chunk = msg == Message1.EOF.value - - # Transfer chunk. - repeat_until_successful(rsync_cmd, user_socket) - - # Add chunk path to queue. - queue_put((target_chunk_path, last_chunk)) - - # Inform sending side about successful transfer. - repeat_until_successful(inform2_cmd, user_socket) - - if last_chunk: - break - chunk_no += 1 - - returncode = execute_consume_chunks( - command=command, - handle_chunks=handle_chunks, + exec_receive_chunks_active( + command, + ssh_source, + source_chunk_dir, + target_chunk_dir, ) - exit(returncode) def parse_args(): diff --git a/src/subprocess_util/btrfs_receive_chunks_passive.py b/src/subprocess_util/btrfs_receive_chunks_passive.py index a316976..940dec2 100644 --- a/src/subprocess_util/btrfs_receive_chunks_passive.py +++ b/src/subprocess_util/btrfs_receive_chunks_passive.py @@ -1,12 +1,9 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse -import socket from pathlib import Path -from queue import Queue -from subprocess_util.exec_consume_chunks import execute_consume_chunks -from subprocess_util.unix_sock_input import accept_loop_until_message +from subprocess_util.exec_receive_chunks_passive import exec_receive_chunks_passive def main(): @@ -15,39 +12,11 @@ def main(): target_chunk_dir: Path = args.chunk_dir target_path: Path = args.target_path - target_chunk_dir.mkdir(parents=True, exist_ok=True) - command = ['btrfs', 'receive', str(target_path.parent)] - def handle_chunks(queue_put: Queue.put): - socket_file: Path = target_path.parent.joinpath(f'{target_path.name}.SOCKET') - socket_file.parent.mkdir(parents=True, exist_ok=True) - - print(f'Listening on socket {socket_file}') - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.bind(str(socket_file)) - sock.listen(1) - - chunk_no = 1 - messages = [b'OK\n', b'EOF\n'] - while True: - msg = accept_loop_until_message(sock, messages) - - last_chunk = msg == b'EOF\n' - chunk_path = target_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') - queue_put((chunk_path, last_chunk)) - - if last_chunk: - break - chunk_no += 1 - - print(f'Closing socket {socket_file}') - sock.close() - socket_file.unlink(missing_ok=False) - - returncode = execute_consume_chunks( - command=command, - handle_chunks=handle_chunks, + returncode = exec_receive_chunks_passive( + command, + target_chunk_dir, ) exit(returncode) @@ -76,10 +45,12 @@ def parse_args(): # Make all paths absolute. Set default values. args.target_path = args.target_path.absolute() - if args.chunk_dir: - args.chunk_dir = args.chunk_dir.absolute() + # + if args.chunk_dir is None: + # Default value + args.chunk_dir = args.target_path.parent.joinpath(f'{args.target_path.name}.chunk') else: - args.chunk_dir = args.target_path.absolute().parent + args.chunk_dir = args.chunk_dir.absolute() return args diff --git a/src/subprocess_util/btrfs_send_chunks_active.py b/src/subprocess_util/btrfs_send_chunks_active.py index b6d0743..ab23189 100644 --- a/src/subprocess_util/btrfs_send_chunks_active.py +++ b/src/subprocess_util/btrfs_send_chunks_active.py @@ -1,11 +1,9 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse -import shlex from pathlib import Path -from subprocess_util.exec_produce_chunks import execute_produce_chunks -from subprocess_util.repeat import repeat_until_successful +from subprocess_util.exec_send_chunks_active import exec_send_chunks_active def main(): @@ -18,7 +16,6 @@ def main(): parent: Path | None = args.parent source_chunk_dir: Path = args.chunk_dir target_chunk_dir: Path = args.target_chunk_dir - target_path: Path = args.target_path command_parts = ( ['btrfs', 'send'], @@ -28,36 +25,12 @@ def main(): ) command = [x for xs in command_parts for x in xs] - def get_source_chunk_path(chunk_no: int) -> Path: - return source_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') - - def handle_chunk(chunk_no: int, last_chunk: bool): - source_chunk_path = get_source_chunk_path(chunk_no) - target_chunk_path = target_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') - print(f'Handling chunk {source_chunk_path}') - - usr_confirmation_socket = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.SOCKET') - target_socket = target_path.parent.joinpath(f'{target_path.name}.SOCKET') - - message = 'EOF' if last_chunk else 'OK' - - rsync_cmd = ['rsync', - str(source_chunk_path), - f'{ssh_target}:{str(target_chunk_path)}'] - inform_cmd = ['ssh', - ssh_target, - f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] - - repeat_until_successful(rsync_cmd, usr_confirmation_socket) - # Delete local chunk after it has been transferred. - source_chunk_path.unlink(missing_ok=False) - repeat_until_successful(inform_cmd, usr_confirmation_socket) - - returncode = execute_produce_chunks( - command=command, - get_chunk_path=get_source_chunk_path, - handle_chunk=handle_chunk, - chunk_size=chunk_size, + returncode = exec_send_chunks_active( + command, + ssh_target, + source_chunk_dir, + target_chunk_dir, + chunk_size ) exit(returncode) @@ -89,7 +62,7 @@ def parse_args(): parser.add_argument('--chunk-dir', help='Chunks are saved in this directory. ' - 'Defaults to parent directory of CHILD.', + 'Defaults to folder `.chunk` next to CHILD_SUBVOLUME.', dest='chunk_dir', type=Path, metavar='CHUNK_DIR', @@ -99,7 +72,7 @@ def parse_args(): parser.add_argument('--target-chunk-dir', help='Chunks are saved in this directory on SSH_TARGET. ' 'Must be an absolute path. ' - 'Defaults to parent directory of TARGET_PATH.', + 'Defaults to directory `.chunk` next to TARGET_PATH.', dest='target_chunk_dir', type=Path, metavar='TARGET_CHUNK_DIR', @@ -118,7 +91,7 @@ def parse_args(): ) parser.add_argument('target_path', - help='Absolute path where the subvolume will be created on SSH_TARGET.', + help='Absolute path to the read-only subvolume to be created on SSH_TARGET.', type=Path, metavar='TARGET_PATH' ) @@ -127,20 +100,24 @@ def parse_args(): # Make all paths absolute. Set default values. args.child = args.child.absolute() + # if not args.target_path.is_absolute(): raise ValueError('TARGET_PATH must be absolute') # - if args.chunk_dir: + if args.chunk_dir is None: + # Default value + args.chunk_dir = args.child.parent.joinpath(f'{args.child.name}.chunk') + else: args.chunk_dir = args.chunk_dir.absolute() - else: # Default value - args.chunk_dir = args.child.absolute().parent - if args.target_chunk_dir: + # + if args.target_chunk_dir is None: + # Default value + args.target_chunk_dir = args.target_path.parent.joinpath(f'{args.target_path.name}.chunk') + else: if not args.target_chunk_dir.is_absolute(): raise ValueError('TARGET_CHUNK_DIR must be absolute') - else: # Default value - args.target_chunk_dir = args.target_path.parent # - if args.parent: + if args.parent is not None: args.parent = args.parent.absolute() return args diff --git a/src/subprocess_util/btrfs_send_chunks_passive.py b/src/subprocess_util/btrfs_send_chunks_passive.py index e4907ab..b2c9a49 100644 --- a/src/subprocess_util/btrfs_send_chunks_passive.py +++ b/src/subprocess_util/btrfs_send_chunks_passive.py @@ -1,12 +1,9 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse -import socket from pathlib import Path -from subprocess_util.common import Message1, Message2, get_inform1_path, get_inform2_path, get_chunk_path -from subprocess_util.exec_produce_chunks import execute_produce_chunks -from subprocess_util.unix_sock_input import accept_loop_until_message +from subprocess_util.exec_send_chunks_passive import exec_send_chunks_passive def main(): @@ -27,40 +24,10 @@ def main(): [str(child)] ) command = [x for xs in command_parts for x in xs] - - def get_source_chunk_path(chunk_no: int) -> Path: - return get_chunk_path(source_chunk_dir, child, chunk_no) - - def handle_chunk(chunk_no: int, last_chunk: bool): - source_chunk_path = get_source_chunk_path(chunk_no) - print(f'Handling chunk {source_chunk_path}') - - # Create in_socket - in_socket: Path = get_inform2_path(source_chunk_dir, child) - in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - in_sock.bind(str(in_socket)) - in_sock.listen(1) - - # Inform receiving side, that the next chunk can be transferred. - inform_path = get_inform1_path(source_chunk_path) - inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value) - - # Read from in_socket - # -> Receiving side informs us after the chunk has been transferred. - _command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value]) - - # Delete local chunk after it has been transferred. - source_chunk_path.unlink(missing_ok=False) - - # Close in_socket - in_sock.close() - in_socket.unlink(missing_ok=False) - - returncode = execute_produce_chunks( - command=command, - get_chunk_path=get_source_chunk_path, - handle_chunk=handle_chunk, - chunk_size=chunk_size, + returncode = exec_send_chunks_passive( + command, + source_chunk_dir, + chunk_size, ) exit(returncode) diff --git a/src/subprocess_util/common.py b/src/subprocess_util/common.py index 8aa80b5..ec80eba 100644 --- a/src/subprocess_util/common.py +++ b/src/subprocess_util/common.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- from enum import Enum -from pathlib import Path class Message1(Enum): @@ -12,15 +11,3 @@ class Message1(Enum): class Message2(Enum): OK_BINARY = b'OK\n' OK_STR = 'OK\n' - - -def get_chunk_path(chunk_dir: Path, subvolume: Path, chunk_no: int) -> Path: - return chunk_dir.joinpath(f'{subvolume.name}.CHUNK.{chunk_no}') - - -def get_inform1_path(source_chunk_path: Path) -> Path: - return source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') - - -def get_inform2_path(source_chunk_dir: Path, subvolume: Path) -> Path: - return source_chunk_dir.joinpath(f'{subvolume.name}.SOCKET.in') diff --git a/src/subprocess_util/exec_receive_chunks_active.py b/src/subprocess_util/exec_receive_chunks_active.py new file mode 100644 index 0000000..d981f96 --- /dev/null +++ b/src/subprocess_util/exec_receive_chunks_active.py @@ -0,0 +1,73 @@ +import shlex +from pathlib import Path +from queue import Queue + +from subprocess_util.common import Message2, Message1 +from subprocess_util.exec_consume_chunks import execute_consume_chunks +from subprocess_util.repeat import repeat_until_successful + + +def exec_receive_chunks_active( + command: list[str], + ssh_source: str, + source_chunk_dir: Path, + target_chunk_dir: Path, +) -> int: + def get_source_chunk_path(chunk_no: int) -> Path: + return source_chunk_dir.joinpath(f'{chunk_no}') + + def get_target_chunk_path(chunk_no: int) -> Path: + return target_chunk_dir.joinpath(f'{chunk_no}') + + def handle_chunks(queue_put: Queue.put): + # TODO: get starting value of chunk_no as argument + chunk_no = 1 + while True: + target_chunk_path = get_target_chunk_path(chunk_no) + source_chunk_path = get_source_chunk_path(chunk_no) + source_socket = source_chunk_dir.joinpath('SOCKET') + + usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET') + + inform_path = source_chunk_path.joinpath(f'{chunk_no}.COMPLETE') + inform_cmd = ['ssh', + ssh_source, + f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}'] + messages = [Message1.OK.value, Message1.EOF.value] + rsync_cmd = ['rsync', + f'{ssh_source}:{source_chunk_path}', + f'{target_chunk_path}'] + inform2_cmd = ['ssh', + ssh_source, + f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(source_socket))}"] + + # Wait until next chunk can be transferred from sending side. + # + # It can happen, that the sending side has not yet saved the next chunk. + # In that case `cat ...` will fail. + # We will retry 3 times, each time after 5 seconds sleep. + # + # If all 4 `cat ...` attempts failed, we wait until the user has inspected the problem. + msg = repeat_until_successful(inform_cmd, usr_confirmation_socket, retries=3, retry_delay_seconds=5) + if msg not in messages: + raise ValueError(f'Invalid message: {msg}') + last_chunk = msg == Message1.EOF.value + + # Transfer chunk. + repeat_until_successful(rsync_cmd, usr_confirmation_socket) + + # Add chunk path to queue. + queue_put((target_chunk_path, last_chunk)) + + # Inform sending side about successful transfer. + repeat_until_successful(inform2_cmd, usr_confirmation_socket) + + if last_chunk: + break + chunk_no += 1 + + returncode = execute_consume_chunks( + command=command, + handle_chunks=handle_chunks, + ) + return returncode diff --git a/src/subprocess_util/exec_receive_chunks_passive.py b/src/subprocess_util/exec_receive_chunks_passive.py new file mode 100644 index 0000000..4493f59 --- /dev/null +++ b/src/subprocess_util/exec_receive_chunks_passive.py @@ -0,0 +1,46 @@ +import socket +from pathlib import Path +from queue import Queue + +from subprocess_util.exec_consume_chunks import execute_consume_chunks +from subprocess_util.unix_sock_input import accept_loop_until_message + + +def exec_receive_chunks_passive( + command: list[str], + target_chunk_dir: Path, +) -> int: + def get_target_chunk_path(chunk_no: int) -> Path: + return target_chunk_dir.joinpath(f'{chunk_no}') + + def handle_chunks(queue_put: Queue.put): + target_socket = target_chunk_dir.joinpath('SOCKET') + target_socket.parent.mkdir(parents=True, exist_ok=True) + + print(f'Listening on socket {target_socket}') + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(str(target_socket)) + sock.listen(1) + + chunk_no = 1 + messages = [b'OK\n', b'EOF\n'] + while True: + msg = accept_loop_until_message(sock, messages) + + last_chunk = msg == b'EOF\n' + target_chunk_path = get_target_chunk_path(chunk_no) + queue_put((target_chunk_path, last_chunk)) + + if last_chunk: + break + chunk_no += 1 + + print(f'Closing socket {target_socket}') + sock.close() + target_socket.unlink(missing_ok=False) + + returncode = execute_consume_chunks( + command=command, + handle_chunks=handle_chunks, + ) + return returncode diff --git a/src/subprocess_util/exec_send_chunks_active.py b/src/subprocess_util/exec_send_chunks_active.py new file mode 100644 index 0000000..871c4b6 --- /dev/null +++ b/src/subprocess_util/exec_send_chunks_active.py @@ -0,0 +1,49 @@ +import shlex +from pathlib import Path + +from subprocess_util.exec_produce_chunks import execute_produce_chunks +from subprocess_util.repeat import repeat_until_successful + + +def exec_send_chunks_active( + command: list[str], + ssh_target: str, + source_chunk_dir: Path, + target_chunk_dir: Path, + chunk_size: int = 128 * 1024 * 1024, +) -> int: + def get_source_chunk_path(chunk_no: int) -> Path: + return source_chunk_dir.joinpath(f'{chunk_no}') + + def get_target_chunk_path(chunk_no: int) -> Path: + return target_chunk_dir.joinpath(f'{chunk_no}') + + def handle_chunk(chunk_no: int, last_chunk: bool): + source_chunk_path = get_source_chunk_path(chunk_no) + target_chunk_path = get_target_chunk_path(chunk_no) + print(f'Handling chunk {source_chunk_path}') + + usr_confirmation_socket = source_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET') + target_socket = target_chunk_dir.joinpath('SOCKET') + + message = 'EOF' if last_chunk else 'OK' + + rsync_cmd = ['rsync', + str(source_chunk_path), + f'{ssh_target}:{str(target_chunk_path)}'] + inform_cmd = ['ssh', + ssh_target, + f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] + + repeat_until_successful(rsync_cmd, usr_confirmation_socket) + # Delete local chunk after it has been transferred. + source_chunk_path.unlink(missing_ok=False) + repeat_until_successful(inform_cmd, usr_confirmation_socket) + + returncode = execute_produce_chunks( + command=command, + get_chunk_path=get_source_chunk_path, + handle_chunk=handle_chunk, + chunk_size=chunk_size, + ) + return returncode diff --git a/src/subprocess_util/exec_send_chunks_passive.py b/src/subprocess_util/exec_send_chunks_passive.py new file mode 100644 index 0000000..1c68e5c --- /dev/null +++ b/src/subprocess_util/exec_send_chunks_passive.py @@ -0,0 +1,48 @@ +import socket +from pathlib import Path + +from subprocess_util.common import Message1, Message2 +from subprocess_util.exec_produce_chunks import execute_produce_chunks +from subprocess_util.unix_sock_input import accept_loop_until_message + + +def exec_send_chunks_passive( + command: list[str], + source_chunk_dir: Path, + chunk_size: int = 128 * 1024 * 1024, +) -> int: + def get_source_chunk_path(chunk_no: int) -> Path: + return source_chunk_dir.joinpath(f'{chunk_no}') + + def handle_chunk(chunk_no: int, last_chunk: bool): + source_chunk_path = get_source_chunk_path(chunk_no) + print(f'Handling chunk {source_chunk_path}') + + # Create in_socket + source_socket = source_chunk_dir.joinpath('SOCKET') + in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + in_sock.bind(str(source_socket)) + in_sock.listen(1) + + # Inform receiving side, that the next chunk can be transferred. + inform_path = source_chunk_path.joinpath(f'{chunk_no}.COMPLETE') + inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value) + + # Read from in_socket + # -> Receiving side informs us after the chunk has been transferred. + _command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value]) + + # Delete local chunk after it has been transferred. + source_chunk_path.unlink(missing_ok=False) + + # Close in_socket + in_sock.close() + source_socket.unlink(missing_ok=False) + + returncode = execute_produce_chunks( + command=command, + get_chunk_path=get_source_chunk_path, + handle_chunk=handle_chunk, + chunk_size=chunk_size, + ) + return returncode