diff --git a/btrfs_receive_chunks.py b/btrfs_receive_chunks.py new file mode 100644 index 0000000..8f7e275 --- /dev/null +++ b/btrfs_receive_chunks.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import argparse +from pathlib import Path + +from exec_print_receive import execute_print_receive_chunks +from common import _get_chunk_file, _get_remote_socket + + +def main(): + args = parse_args() + + target_path: Path = args.target_path + command = ['btrfs', 'receive', str(target_path)] + target_socket = _get_remote_socket(target_path) + + execute_print_receive_chunks( + command=command, + socket_file=target_socket, + chunk_file_tmpl=args.chunk_tmpl, + get_chunk_file=_get_chunk_file, + ) + + +def parse_args(): + parser = argparse.ArgumentParser(prog='btrfs-receive-chunks') + + parser.add_argument('--chunk-tmpl', + help='During btrfs-receive, chunks are saved as "CHUNK_TMPL.CHUNK_NUMBER". ' + 'The default value of CHUNK_TMPL is "SUBVOLUME.CHUNK". ' + 'One can change it to e.g. "/tmp/chunk/SUBVOLUME".', + dest='chunk_tmpl', + default=None, + type=Path, + metavar='CHUNK_TMPL' + ) + + parser.add_argument('target_path', + help='Path were the subvolume will be created; forwarded to btrfs-receive.', + type=Path, + metavar='SUBVOLUME' + ) + + args = parser.parse_args() + + if not args.chunk_tmpl: + target_path: Path = args.target_path + args.chunk_tmpl = target_path.parent.joinpath(f'{target_path.name}.CHUNK') + + return args + + +if __name__ == '__main__': + main() diff --git a/btrfs_send_chunks.py b/btrfs_send_chunks.py new file mode 100644 index 0000000..a61841d --- /dev/null +++ b/btrfs_send_chunks.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import argparse +from pathlib import Path +import shlex + +from exec_print_transfer import execute_print_transfer_chunks +from common import _get_chunk_file, _get_remote_socket +from transfer_inform import transfer_inform + + +def main(): + args = parse_args() + + command_parts = ( + ['btrfs', 'send'], + ['-p', str(args.parent)] if args.parent else [], + ['--compress-data'] if args.compressed_data else [], + [str(args.child)] + ) + command = [x for xs in command_parts for x in xs] + + execute_print_transfer_chunks( + command=command, + chunk_file_tmpl=args.chunk_tmpl, + chunk_transfer_fun=chunk_transfer_fun, + chunk_transfer_args=(args.ssh_target, args.target_path, args.chunk_tmpl), + chunk_size=args.chunk_size, + ) + + +def parse_args(): + parser = argparse.ArgumentParser(prog='btrfs-send-chunks') + + parser.add_argument('-p', + help='Parent subvolume; forwarded to btrfs-send.', + dest='parent', + default=None, + type=Path, + metavar='PARENT_SUBVOLUME' + ) + + parser.add_argument('--compressed-data', + help='Forwarded to btrfs-send.', + dest='compressed_data', + action='store_true', + default=False, + ) + + parser.add_argument('--chunk-tmpl', + help='During btrfs-send, chunks are saved as "CHUNK_TMPL.CHUNK_NUMBER". ' + 'The default value of CHUNK_TMPL is "CHILD_SUBVOLUME.CHUNK". ' + 'One can change it to e.g. "/tmp/chunk/CHILD_SUBVOLUME".', + dest='chunk_tmpl', + default=None, + type=Path, + metavar='CHUNK_TMPL' + ) + + parser.add_argument('--chunk-size', + help='Size in bytes; defaults to 64 MB.', + dest='chunk_size', + type=int, + default=64 * 1024 * 1024, + ) + + parser.add_argument('child', + help='Forwarded to btrfs-send.', + type=Path, + metavar='CHILD_SUBVOLUME' + ) + + parser.add_argument('ssh_target', + help='Hostname of target computer; as configured in ~/.ssh/config.', + metavar='SSH_TARGET' + ) + + parser.add_argument('target_path', + help='Path where the subvolume will be created on the remote ssh target.', + type=Path, + metavar='TARGET_PATH' + ) + + args = parser.parse_args() + + if not args.chunk_tmpl: + child: Path = args.child + args.chunk_tmpl = child.parent.joinpath(f'{child.name}.CHUNK') + + return args + + +def chunk_transfer_fun(chunk_file: Path, ct: int, eof: bool, + ssh_target: str, + target_path: Path, + chunk_tmpl: Path): + target_chunk = _get_chunk_file(chunk_tmpl, ct) + rsync_cmd = ['rsync', str(chunk_file), f'{ssh_target}:{str(target_chunk)}'] + + message = 'EOF' if eof else 'OK' + target_socket = _get_remote_socket(target_path) + inform_cmd = ['ssh', ssh_target, f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] + + transfer_inform( + rsync_cmd=rsync_cmd, + inform_cmd=inform_cmd, + user_input_file=chunk_file.parent.joinpath(f'{chunk_file.name}.SOCKET'), + ) + + +if __name__ == '__main__': + main() diff --git a/get_chunk_file.py b/common.py similarity index 52% rename from get_chunk_file.py rename to common.py index 827362c..395cea8 100644 --- a/get_chunk_file.py +++ b/common.py @@ -1,5 +1,11 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- from pathlib import Path def _get_chunk_file(chunk_file_tmpl: Path, ct: int): return chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') + + +def _get_remote_socket(target: Path): + return target.parent.joinpath(f'{target}.SOCKET') diff --git a/exec_print_receive.py b/exec_print_receive.py index 417fe35..e5edbb8 100644 --- a/exec_print_receive.py +++ b/exec_print_receive.py @@ -6,7 +6,7 @@ import threading from pathlib import Path from typing import IO, AnyStr, Callable -from get_chunk_file import _get_chunk_file +from common import _get_chunk_file from receive_inform import receive_inform diff --git a/exec_print_transfer.py b/exec_print_transfer.py index 24189c0..b556931 100644 --- a/exec_print_transfer.py +++ b/exec_print_transfer.py @@ -6,7 +6,7 @@ import threading import subprocess from typing import AnyStr, IO, Callable -from get_chunk_file import _get_chunk_file +from common import _get_chunk_file def _rotate_chunk(chunk_file: Path, chunk_number: int, eof: bool, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): @@ -44,8 +44,6 @@ def _save_output_rotating_chunks(out_pipe: IO[AnyStr], chunk_transfer_args: tuple, get_chunk_file: Callable[[Path, int], Path], ): - chunk_file_tmpl.parent.mkdir(parents=True, exist_ok=True) - ct: int = 1 remaining_bytes = chunk_size chunk: bytes = b'' @@ -57,6 +55,7 @@ def _save_output_rotating_chunks(out_pipe: IO[AnyStr], if len(b) == 0: # EOF reached. chunk_file = get_chunk_file(chunk_file_tmpl, ct) + chunk_file.parent.mkdir(parents=True, exist_ok=True) _save_chunk(chunk, chunk_file) _rotate_chunk(chunk_file, ct, True, chunk_transfer_fun, chunk_transfer_args) @@ -68,6 +67,7 @@ def _save_output_rotating_chunks(out_pipe: IO[AnyStr], if chunk_len == chunk_size: # Next chunk is full. chunk_file = get_chunk_file(chunk_file_tmpl, ct) + chunk_file.parent.mkdir(parents=True, exist_ok=True) _save_chunk(chunk, chunk_file) _rotate_chunk(chunk_file, ct, False, chunk_transfer_fun, chunk_transfer_args) @@ -113,7 +113,7 @@ def execute_print_transfer_chunks(command: list[str], During command execution: Forwards stderr output of the command to stderr. - :param chunk_file_tmpl: Chunks are saved as this file before they are transferred. + :param chunk_file_tmpl: Used by get_chunk_file to calculate the paths of the chunk files. :param command: Command to execute, e.g. ['cat', '/some/large/file'] :param chunk_transfer_fun: Called for each chunk with arguments (chunk_file, chunk_number, eof_reached, *chunk_transfer_args) :param chunk_transfer_args: