From ddd5b9a1fe8f8648f82fc81c701cd684d9c096b2 Mon Sep 17 00:00:00 2001 From: Daniel Langbein Date: Thu, 12 Jan 2023 14:33:20 +0100 Subject: [PATCH] rename to exec_produce_chunks; add exec_consume_chunks; add btrfs_receive_chunks_v2 --- setup.cfg | 2 +- src/p1st/btrfs_receive_chunks_v2.py | 86 +++++++++++++ src/p1st/btrfs_send_chunks.py | 115 ------------------ src/p1st/btrfs_send_chunks_v2.py | 5 +- src/p1st/exec_consume_chunks.py | 92 ++++++++++++++ ..._print_chunk.py => exec_produce_chunks.py} | 29 +++-- src/p1st/test.py | 18 ++- 7 files changed, 218 insertions(+), 129 deletions(-) create mode 100644 src/p1st/btrfs_receive_chunks_v2.py delete mode 100644 src/p1st/btrfs_send_chunks.py create mode 100644 src/p1st/exec_consume_chunks.py rename src/p1st/{exec_print_chunk.py => exec_produce_chunks.py} (81%) diff --git a/setup.cfg b/setup.cfg index 131d758..0bedd93 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,6 @@ where = src [options.entry_points] ; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html console_scripts = - btrfs-send-chunks = p1st.btrfs_send_chunks:main btrfs-send-chunks-v2 = p1st.btrfs_send_chunks_v2:main btrfs-receive-chunks = p1st.btrfs_receive_chunks:main + btrfs-receive-chunks-v2 = p1st.btrfs_receive_chunks_v2:main diff --git a/src/p1st/btrfs_receive_chunks_v2.py b/src/p1st/btrfs_receive_chunks_v2.py new file mode 100644 index 0000000..c5242f5 --- /dev/null +++ b/src/p1st/btrfs_receive_chunks_v2.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import argparse +import socket +from pathlib import Path +from queue import Queue + +from p1st.exec_consume_chunks import execute_consume_chunks +from p1st.unix_sock_input import accept_loop_until_message + + +def main(): + args = parse_args() + + command = ['btrfs', 'receive', str(args.target_path.parent)] + + def handle_chunks(queue_put: Queue.put): + chunk_dir: Path = args.chunk_dir + chunk_dir.mkdir(parents=True, exist_ok=True) + + socket_file: Path = args.target_path.parent.joinpath(f'{args.target_path.name}.SOCKET') + socket_file.parent.mkdir(parents=True, exist_ok=True) + + print(f'Listening on socket {socket_file}') + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(str(socket_file)) + sock.listen(1) + + chunk_no = 1 + messages = [b'OK\n', b'EOF\n'] + while True: + msg = accept_loop_until_message(sock, messages) + + last_chunk = msg == b'EOF\n' + chunk_path = chunk_dir.joinpath(f'{args.target_path.name}.CHUNK.{chunk_no}') + queue_put((chunk_path, last_chunk)) + + if last_chunk: + break + chunk_no += 1 + + print(f'Closing socket {socket_file}') + sock.close() + socket_file.unlink(missing_ok=False) + + returncode = execute_consume_chunks( + command=command, + handle_chunks=handle_chunks, + ) + exit(returncode) + + +def parse_args(): + parser = argparse.ArgumentParser(prog='btrfs-receive-chunks') + + parser.add_argument('--chunk-dir', + help='Chunks are saved in this directory. ' + 'Defaults to parent directory of SUBVOLUME.', + dest='chunk_dir', + type=Path, + metavar='CHUNK_DIR', + default=None, + ) + + parser.add_argument('target_path', + help='Path where the subvolume will be created. ' + 'The last component of the path ' + 'must be equal to the name of the subvolume on the sending side.', + type=Path, + metavar='SUBVOLUME' + ) + + args = parser.parse_args() + + # Make all paths absolute. Set default values. + args.target_path = args.target_path.absolute() + if args.chunk_dir: + args.chunk_dir = args.chunk_dir.absolute() + else: + args.chunk_dir = args.target_path.absolute().parent + + return args + + +if __name__ == '__main__': + main() diff --git a/src/p1st/btrfs_send_chunks.py b/src/p1st/btrfs_send_chunks.py deleted file mode 100644 index 5b4b441..0000000 --- a/src/p1st/btrfs_send_chunks.py +++ /dev/null @@ -1,115 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -import argparse -from pathlib import Path -import shlex -from typing import Callable - -from p1st.exec_print_transfer import execute_print_transfer_chunks -from p1st.common import _get_chunk_file, _get_remote_socket, _get_chunk_tmpl -from p1st.transfer_inform import transfer_inform - - -def main(): - args = parse_args() - - command_parts = ( - ['btrfs', 'send'], - ['-p', str(args.parent)] if args.parent else [], - ['--compress-data'] if args.compressed_data else [], - [str(args.child)] - ) - command = [x for xs in command_parts for x in xs] - - chunk_tmpl = _get_chunk_tmpl(args.child) - target_chunk_tmpl = _get_chunk_tmpl(args.target_path) - - execute_print_transfer_chunks( - command=command, - chunk_file_tmpl=chunk_tmpl, - chunk_transfer_fun=chunk_transfer_fun, - chunk_transfer_args=(args.ssh_target, - args.target_path, - _get_chunk_file, - target_chunk_tmpl), - chunk_size=args.chunk_size, - get_chunk_file=_get_chunk_file, - ) - - -def parse_args(): - parser = argparse.ArgumentParser(prog='btrfs-send-chunks') - - parser.add_argument('-p', - help='Path to parent subvolume; forwarded to btrfs-send.', - dest='parent', - default=None, - type=Path, - metavar='PARENT_SUBVOLUME' - ) - - parser.add_argument('--compressed-data', - help='Forwarded to btrfs-send.', - dest='compressed_data', - action='store_true', - default=False, - ) - - parser.add_argument('--chunk-size', - help='Size in bytes; defaults to 128 MB.', - dest='chunk_size', - type=int, - default=128 * 1024 * 1024, - ) - - parser.add_argument('child', - help='Path to child subvolume. Forwarded to btrfs-send.', - type=Path, - metavar='CHILD_SUBVOLUME' - ) - - parser.add_argument('ssh_target', - help='Hostname of target computer; as configured in ~/.ssh/config.', - metavar='SSH_TARGET' - ) - - parser.add_argument('target_path', - help='Path where the subvolume will be created on the remote ssh target.', - type=Path, - metavar='TARGET_PATH' - ) - - args = parser.parse_args() - - # Make all paths absolute. - if args.parent: - args.parent = args.parent.absolute() - args.child = args.child.absolute() - if args.target_path: - args.target_path = args.target_path.absolute() - - return args - - -def chunk_transfer_fun(chunk_file: Path, ct: int, eof: bool, - ssh_target: str, - target_path: Path, - get_chunk_file: Callable[[Path, int], Path], - target_chunk_tmpl: Path, - ): - target_chunk = get_chunk_file(target_chunk_tmpl, ct) - rsync_cmd = ['rsync', str(chunk_file), f'{ssh_target}:{str(target_chunk)}'] - - message = 'EOF' if eof else 'OK' - target_socket = _get_remote_socket(target_path) - inform_cmd = ['ssh', ssh_target, f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] - - transfer_inform( - rsync_cmd=rsync_cmd, - inform_cmd=inform_cmd, - user_input_file=chunk_file.parent.joinpath(f'{chunk_file.name}.SOCKET'), - ) - - -if __name__ == '__main__': - main() diff --git a/src/p1st/btrfs_send_chunks_v2.py b/src/p1st/btrfs_send_chunks_v2.py index 18b0ab1..c352a9b 100644 --- a/src/p1st/btrfs_send_chunks_v2.py +++ b/src/p1st/btrfs_send_chunks_v2.py @@ -4,7 +4,7 @@ import argparse import shlex from pathlib import Path -from p1st.exec_print_chunk import execute_print_chunk +from p1st.exec_produce_chunks import execute_produce_chunks from p1st.repeat import repeat_until_successful @@ -42,12 +42,13 @@ def main(): repeat_until_successful(rsync_cmd, usr_confirmation_socket) repeat_until_successful(inform_cmd, usr_confirmation_socket) - execute_print_chunk( + returncode = execute_produce_chunks( command=command, get_chunk_path=get_chunk_path, handle_chunk=handle_chunk, chunk_size=args.chunk_size, ) + exit(returncode) def parse_args(): diff --git a/src/p1st/exec_consume_chunks.py b/src/p1st/exec_consume_chunks.py new file mode 100644 index 0000000..bcc0d22 --- /dev/null +++ b/src/p1st/exec_consume_chunks.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import subprocess +import sys +import threading +from pathlib import Path +from queue import Queue +from typing import Callable, IO, AnyStr + + +def execute_consume_chunks(command: list[str], + handle_chunks: Callable[[Queue.put], None], + ) -> int: + """ + :param command: + :param handle_chunks: Has one parameter, `queue_put`. After a chunks is saved, this method must call queue_put.(chunk_path, last_chunk). + :return: + """ + process = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + ) + + q = Queue(maxsize=2) + + threads = [ + threading.Thread( + target=_stdin_worker, + args=(q.get, + process.stdin, + )), + threading.Thread( + target=handle_chunks, + args=(q.put, + )), + threading.Thread( + target=_stderr_worker, + args=(process.stderr, + )), + ] + + for t in threads: + t.daemon = True + t.start() + + returncode: int = process.wait() + for t in threads: + t.join() + + return returncode + + +def _stdin_worker(queue_get: Queue.get, + binary_stdin: IO[AnyStr], + ): + while True: + chunk_path, last_chunk = queue_get() + chunk = _read_chunk(chunk_path) + binary_stdin.write(chunk) + # binary_stdin.flush() # TODO: is this required? + if last_chunk: + break + + binary_stdin.flush() + + # TODO: Has this any effect? On stdin probably yes! + binary_stdin.close() + + +def _read_chunk(chunk_path: Path) -> bytes: + """ + Reads a chunk from the given path. + """ + print(f'Reading chunk {chunk_path}') + + # Fails if file does not exist. + return chunk_path.read_bytes() + + +def _stderr_worker(binary_stderr: IO[AnyStr]): + """ + Prints stderr of subprocess to sys.stderr. + """ + b: bytes + for b in binary_stderr: + sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}') + + # TODO: Has this any effect? + # binary_stderr.close() diff --git a/src/p1st/exec_print_chunk.py b/src/p1st/exec_produce_chunks.py similarity index 81% rename from src/p1st/exec_print_chunk.py rename to src/p1st/exec_produce_chunks.py index af5289d..33b0c4d 100644 --- a/src/p1st/exec_print_chunk.py +++ b/src/p1st/exec_produce_chunks.py @@ -8,11 +8,27 @@ from queue import Queue from typing import Callable, IO, AnyStr -def execute_print_chunk(command: list[str], - get_chunk_path: Callable[[int], Path], - handle_chunk: Callable[[int, bool], None], - chunk_size: int = 512, # TODO increase - ) -> int: +def execute_produce_chunks(command: list[str], + get_chunk_path: Callable[[int], Path], + handle_chunk: Callable[[int, bool], None], + chunk_size: int = 1024 * 1024, + ) -> int: + """ + Executes the given command in a subprocess. + + Stdout of the subprocess is saved in chunks. + The location of the chunks is determined by `get_chunk_path`. + + A separate thread calls `handle_chunk` once a new chunk was saved. + + Stderr of the subprocess is printed to sys.stderr. + + :param command: + :param get_chunk_path: + :param handle_chunk: + :param chunk_size: + :return: + """ process = subprocess.Popen( command, stdout=subprocess.PIPE, @@ -112,14 +128,13 @@ def _chunk_worker(queue_get: Queue.get, chunk_no, last_chunk = queue_get() handle_chunk(chunk_no, last_chunk) if last_chunk: - break + return def _stderr_worker(binary_stderr: IO[AnyStr]): """ Prints stderr of subprocess to sys.stderr. """ - b: bytes for b in binary_stderr: sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}') diff --git a/src/p1st/test.py b/src/p1st/test.py index f9bd259..8c50431 100644 --- a/src/p1st/test.py +++ b/src/p1st/test.py @@ -8,9 +8,9 @@ from pathlib import Path from p1st.exec_capture import execute_capture from p1st.exec_print_capture import execute_print_capture -from p1st.exec_print_chunk import execute_print_chunk from p1st.exec_print_receive import execute_print_receive_chunks from p1st.exec_print_transfer import execute_print_transfer_chunks +from p1st.exec_produce_chunks import execute_produce_chunks from p1st.transfer_inform import transfer_inform @@ -24,7 +24,8 @@ def test(): # test8() # test9() # test10() - test11() + # test11() + test12() def test1(): @@ -232,7 +233,7 @@ def test10(): # Delete chunk. chunk_path.unlink(missing_ok=False) - execute_print_chunk( + execute_produce_chunks( command=['cat', str(source_file)], get_chunk_path=get_chunk_path, handle_chunk=handle_chunk, @@ -254,7 +255,7 @@ def test11(): print(f'\tsudo mkdir {target_dir}') print(f'\tcd {repo_name} && make && sudo make clean && cd ..') print() - print(f'\tsudo btrfs-receive-chunks {target_path}') + print(f'\tsudo btrfs-receive-chunks-v2 {target_path}') print() print(f'=== In another shell, connect with "ssh odroid" ===') @@ -267,6 +268,15 @@ def test11(): print(f'\tsudo btrfs-send-chunks-v2 {child_path} {ssh_target} {target_path}') +def test12(): + child_path = '/mnt/backup/snap/blogger.privacy1st.de/20230104T2255' + ssh_target = 'rootnas' + target_path = '/mnt/data/test/blogger.privacy1st.de/20230104T2255' + target_chunk_path = '/mnt/data/test/chunks' + print(f'sudo btrfs-receive-chunks-v2 --chunk-dir={target_chunk_path} {target_path}') + print(f'sudo btrfs-send-chunks-v2 --chunk-dir=/mnt/backup/test/chunks --target-chunk-dir={target_chunk_path} {child_path} {ssh_target} {target_path}') + + def _init(test_number: int): print(f"TEST {test_number}") test_dir = Path('test')