add btrfs_receive_chunks_v3

This commit is contained in:
Daniel Langbein 2023-01-12 16:28:18 +01:00
parent bfd2906ccd
commit 52243d5f3a
10 changed files with 173 additions and 70 deletions

View File

@ -33,5 +33,6 @@ where = src
; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html ; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html
console_scripts = console_scripts =
btrfs-send-chunks-v2 = p1st.btrfs_send_chunks_v2:main btrfs-send-chunks-v2 = p1st.btrfs_send_chunks_v2:main
btrfs-receive-chunks = p1st.btrfs_receive_chunks:main btrfs-send-chunks-v3 = p1st.btrfs_send_chunks_v3:main
btrfs-receive-chunks-v2 = p1st.btrfs_receive_chunks_v2:main btrfs-receive-chunks-v2 = p1st.btrfs_receive_chunks_v2:main
btrfs-receive-chunks-v3 = p1st.btrfs_receive_chunks_v3:main

View File

@ -12,10 +12,10 @@ from p1st.unix_sock_input import accept_loop_until_message
def main(): def main():
args = parse_args() args = parse_args()
# #
chunk_dir: Path = args.chunk_dir target_chunk_dir: Path = args.chunk_dir
target_path: Path = args.target_path target_path: Path = args.target_path
chunk_dir.mkdir(parents=True, exist_ok=True) target_chunk_dir.mkdir(parents=True, exist_ok=True)
command = ['btrfs', 'receive', str(target_path.parent)] command = ['btrfs', 'receive', str(target_path.parent)]
@ -34,7 +34,7 @@ def main():
msg = accept_loop_until_message(sock, messages) msg = accept_loop_until_message(sock, messages)
last_chunk = msg == b'EOF\n' last_chunk = msg == b'EOF\n'
chunk_path = chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') chunk_path = target_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}')
queue_put((chunk_path, last_chunk)) queue_put((chunk_path, last_chunk))
if last_chunk: if last_chunk:

View File

@ -0,0 +1,118 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import shlex
from pathlib import Path
from queue import Queue
from p1st.exec_consume_chunks import execute_consume_chunks
from p1st.repeat import repeat_until_successful
def main():
args = parse_args()
#
ssh_source: str = args.ssh_source
source_chunk_dir: Path = args.source_chunk_dir
targe_chunk_dir: Path = args.chunk_dir
target_path: Path = args.target_path
target_path.parent.mkdir(parents=True, exist_ok=True)
command = ['btrfs', 'receive', str(target_path.parent)]
def handle_chunks(queue_put: Queue.put):
chunk_no = 1
while True:
target_chunk_path = targe_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}')
source_chunk_path = source_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}')
in_socket: Path = source_chunk_dir.parent.joinpath(f'{target_path.name}.SOCKET.in')
usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{target_chunk_path.name}.SOCKET')
inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE')
inform_cmd = ['ssh',
ssh_source,
f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}']
rsync_cmd = ['rsync',
f'{ssh_source}:{source_chunk_path}',
f'{target_chunk_path}']
inform2_cmd = ['ssh',
ssh_source,
f'echo OK | nc -U {shlex.quote(str(in_socket))}']
# Wait until next chunk can be transferred from sending side.
messages = ['OK\n', 'EOF\n']
msg = repeat_until_successful(inform_cmd, usr_confirmation_socket)
if msg not in messages:
raise ValueError(f'Inalid message: >>>{msg}<<<')
last_chunk = msg == 'EOF\n'
# Transfer chunk.
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
# Add chunk path to queue.
queue_put((target_chunk_path, last_chunk))
# Inform sending side about successful transfer.
repeat_until_successful(inform2_cmd, usr_confirmation_socket)
if last_chunk:
break
chunk_no += 1
returncode = execute_consume_chunks(
command=command,
handle_chunks=handle_chunks,
)
exit(returncode)
def parse_args():
parser = argparse.ArgumentParser(prog='btrfs-receive-chunks')
parser.add_argument('--chunk-dir',
help='Chunks are saved in this directory. '
'Defaults to parent directory of SUBVOLUME.',
dest='chunk_dir',
type=Path,
metavar='CHUNK_DIR',
default=None,
)
parser.add_argument('ssh_source',
help='Hostname of source computer; as configured in ~/.ssh/config.',
metavar='SSH_SOURCE'
)
parser.add_argument('source_chunk_dir',
help='Chunks are saved in this directory on SSH_SOURCE. '
'Must be an absolute path.',
type=Path,
metavar='SOURCE_CHUNK_DIR',
)
parser.add_argument('target_path',
help='Path where the subvolume will be created. '
'The last component of the path '
'must be equal to the name of the subvolume on the sending side.',
type=Path,
metavar='SUBVOLUME'
)
args = parser.parse_args()
# Make all paths absolute. Set default values.
args.target_path = args.target_path.absolute()
if args.chunk_dir:
args.chunk_dir = args.chunk_dir.absolute()
else:
args.chunk_dir = args.target_path.absolute().parent
if not args.source_chunk_dir.is_absolute():
raise ValueError(f'SOURCE_CHUNK_DIR must be absolute')
return args
if __name__ == '__main__':
main()

View File

@ -16,7 +16,7 @@ def main():
compressed_data: bool = args.compressed_data compressed_data: bool = args.compressed_data
child: Path = args.child child: Path = args.child
parent: Path | None = args.parent parent: Path | None = args.parent
chunk_dir: Path = args.chunk_dir source_chunk_dir: Path = args.chunk_dir
target_chunk_dir: Path = args.target_chunk_dir target_chunk_dir: Path = args.target_chunk_dir
target_path: Path = args.target_path target_path: Path = args.target_path
@ -28,21 +28,21 @@ def main():
) )
command = [x for xs in command_parts for x in xs] command = [x for xs in command_parts for x in xs]
def get_chunk_path(chunk_no: int) -> Path: def get_source_chunk_path(chunk_no: int) -> Path:
return chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') return source_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
def handle_chunk(chunk_no: int, last_chunk: bool): def handle_chunk(chunk_no: int, last_chunk: bool):
chunk_path = get_chunk_path(chunk_no) source_chunk_path = get_source_chunk_path(chunk_no)
target_chunk_path = target_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') target_chunk_path = target_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
print(f'Handling chunk {chunk_path}') print(f'Handling chunk {source_chunk_path}')
usr_confirmation_socket = chunk_path.parent.joinpath(f'{chunk_path.name}.SOCKET') usr_confirmation_socket = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.SOCKET')
target_socket = target_path.parent.joinpath(f'{target_path.name}.SOCKET') target_socket = target_path.parent.joinpath(f'{target_path.name}.SOCKET')
message = 'EOF' if last_chunk else 'OK' message = 'EOF' if last_chunk else 'OK'
rsync_cmd = ['rsync', rsync_cmd = ['rsync',
str(chunk_path), str(source_chunk_path),
f'{ssh_target}:{str(target_chunk_path)}'] f'{ssh_target}:{str(target_chunk_path)}']
inform_cmd = ['ssh', inform_cmd = ['ssh',
ssh_target, ssh_target,
@ -53,7 +53,7 @@ def main():
returncode = execute_produce_chunks( returncode = execute_produce_chunks(
command=command, command=command,
get_chunk_path=get_chunk_path, get_chunk_path=get_source_chunk_path,
handle_chunk=handle_chunk, handle_chunk=handle_chunk,
chunk_size=chunk_size, chunk_size=chunk_size,
) )

View File

@ -1,14 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import argparse import argparse
import shlex
import socket import socket
from pathlib import Path from pathlib import Path
from p1st.exec_produce_chunks import execute_produce_chunks from p1st.exec_produce_chunks import execute_produce_chunks
from p1st.repeat import repeat_until_successful
from p1st.unix_sock_input import accept_loop_until_message from p1st.unix_sock_input import accept_loop_until_message
from p1st.unix_socket_output import write_message
def main(): def main():
@ -18,9 +15,9 @@ def main():
compressed_data: bool = args.compressed_data compressed_data: bool = args.compressed_data
child: Path = args.child child: Path = args.child
parent: Path | None = args.parent parent: Path | None = args.parent
chunk_dir: Path = args.chunk_dir source_chunk_dir: Path = args.chunk_dir
chunk_dir.mkdir(parents=True, exist_ok=True) source_chunk_dir.mkdir(parents=True, exist_ok=True)
command_parts = ( command_parts = (
['btrfs', 'send'], ['btrfs', 'send'],
@ -30,24 +27,22 @@ def main():
) )
command = [x for xs in command_parts for x in xs] command = [x for xs in command_parts for x in xs]
def get_chunk_path(chunk_no: int) -> Path: def get_source_chunk_path(chunk_no: int) -> Path:
return chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') return source_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
def handle_chunk(chunk_no: int, last_chunk: bool): def handle_chunk(chunk_no: int, last_chunk: bool):
chunk_path = get_chunk_path(chunk_no) source_chunk_path = get_source_chunk_path(chunk_no)
print(f'Handling chunk {chunk_path}') print(f'Handling chunk {source_chunk_path}')
# Create in_socket # Create in_socket
in_socket: Path = chunk_dir.parent.joinpath(f'{child.name}.SOCKET.in') in_socket: Path = source_chunk_dir.parent.joinpath(f'{child.name}.SOCKET.in')
in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
in_sock.bind(str(in_socket)) in_sock.bind(str(in_socket))
in_sock.listen(1) in_sock.listen(1)
# Write to out_socket # Inform receiving side, that the next chunk can be transferred.
# -> Inform receiving side, that the next chung can be transferred. inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE')
out_socket: Path = chunk_dir.parent.joinpath(f'{child.name}.SOCKET.out') inform_path.write_text('EOF\n' if last_chunk else 'OK\n')
message = b'EOF\n' if last_chunk else b'OK\n'
write_message(out_socket, message)
# Read from in_socket # Read from in_socket
# -> Receiving side informs us after the chunk has been transferred. # -> Receiving side informs us after the chunk has been transferred.
@ -59,7 +54,7 @@ def main():
returncode = execute_produce_chunks( returncode = execute_produce_chunks(
command=command, command=command,
get_chunk_path=get_chunk_path, get_chunk_path=get_source_chunk_path,
handle_chunk=handle_chunk, handle_chunk=handle_chunk,
chunk_size=chunk_size, chunk_size=chunk_size,
) )
@ -100,49 +95,21 @@ def parse_args():
default=None, default=None,
) )
parser.add_argument('--target-chunk-dir',
help='Chunks are saved in this directory on SSH_TARGET. '
'Must be an absolute path. '
'Defaults to parent directory of TARGET_PATH.',
dest='target_chunk_dir',
type=Path,
metavar='TARGET_CHUNK_DIR',
default=None,
)
parser.add_argument('child', parser.add_argument('child',
help='Path to child subvolume. Forwarded to btrfs-send.', help='Path to child subvolume. Forwarded to btrfs-send.',
type=Path, type=Path,
metavar='CHILD_SUBVOLUME' metavar='CHILD_SUBVOLUME'
) )
parser.add_argument('ssh_target',
help='Hostname of target computer; as configured in ~/.ssh/config.',
metavar='SSH_TARGET'
)
parser.add_argument('target_path',
help='Absolute path where the subvolume will be created on SSH_TARGET.',
type=Path,
metavar='TARGET_PATH'
)
args = parser.parse_args() args = parser.parse_args()
# Make all paths absolute. Set default values. # Make all paths absolute. Set default values.
args.child = args.child.absolute() args.child = args.child.absolute()
if not args.target_path.is_absolute():
raise ValueError('TARGET_PATH must be absolute')
# #
if args.chunk_dir: if args.chunk_dir:
args.chunk_dir = args.chunk_dir.absolute() args.chunk_dir = args.chunk_dir.absolute()
else: # Default value else: # Default value
args.chunk_dir = args.child.absolute().parent args.chunk_dir = args.child.absolute().parent
if args.target_chunk_dir:
if not args.target_chunk_dir.is_absolute():
raise ValueError('TARGET_CHUNK_DIR must be absolute')
else: # Default value
args.target_chunk_dir = args.target_path.parent
# #
if args.parent: if args.parent:
args.parent = args.parent.absolute() args.parent = args.parent.absolute()

View File

@ -32,7 +32,7 @@ class DataUnitConverter:
return round(num_bytes / cls._unit_dict[unit], 3) return round(num_bytes / cls._unit_dict[unit], 3)
@classmethod @classmethod
def to_unit_auto(cls, num_bytes: int) -> (float, DataUnit): def to_unit_auto(cls, num_bytes: int) -> tuple[float, DataUnit]:
for unit, factor in cls._unit_dict.items(): for unit, factor in cls._unit_dict.items():
converted = cls.to_unit(num_bytes, unit) converted = cls.to_unit(num_bytes, unit)
if converted <= 999: if converted <= 999:

View File

@ -3,7 +3,7 @@
import subprocess import subprocess
def execute_capture(command: list[str]) -> [int, str, str]: def execute_capture(command: list[str]) -> tuple[int, str, str]:
completed: subprocess.CompletedProcess = subprocess.run( completed: subprocess.CompletedProcess = subprocess.run(
command, command,
capture_output=True, capture_output=True,

View File

@ -19,12 +19,14 @@ class _Assert:
raise ValueError(f'Expected a and b to be equal: {a}, {b}') raise ValueError(f'Expected a and b to be equal: {a}, {b}')
def _read_output(str_pipe: IO[AnyStr], queue_put: Queue.put, list_append: list.append, prefix: str = ''): def _read_output(str_pipe: IO[AnyStr],
queue_put: Queue.put,
list_append: list.append,
prefix: str = ''):
line: str line: str
for line in str_pipe: for line in str_pipe:
func: Callable[[str], None] queue_put(f'{prefix}{line}')
for func in (queue_put, list_append): list_append(line)
func(f'{prefix}{line}')
# TODO: Has this any effect? # TODO: Has this any effect?
# str_pipe.close() # str_pipe.close()
@ -73,7 +75,7 @@ def _write_output(queue_get: Queue.get):
# #
# - Read data from stdout and stderr, until end-of-file is reached. # - Read data from stdout and stderr, until end-of-file is reached.
# - Note: The data read is buffered in memory, so do not use this method if the data size is large or unlimited. # - Note: The data read is buffered in memory, so do not use this method if the data size is large or unlimited.
def execute_print_capture(command: list[str], encoding='UTF-8') -> [int, list[str], list[str]]: def execute_print_capture(command: list[str], encoding='UTF-8') -> tuple[int, str, str]:
""" """
Executes the given command. Executes the given command.
@ -124,4 +126,4 @@ def execute_print_capture(command: list[str], encoding='UTF-8') -> [int, list[st
q.put(None) q.put(None)
t_write.join() t_write.join()
return returncode, out, err return returncode, ''.join(out), ''.join(err)

View File

@ -5,7 +5,7 @@ from p1st.unix_sock_input import wait_for_message
def repeat_until_successful(command: list[str], def repeat_until_successful(command: list[str],
socket_file: Path) -> None: socket_file: Path) -> str:
""" """
Executes the given `command`. Executes the given `command`.
@ -13,9 +13,11 @@ def repeat_until_successful(command: list[str],
`socket_file` and waits for user input. `socket_file` and waits for user input.
""" """
while True: while True:
returncode, _out, _err = execute_print_capture(command) returncode: int
out: str
returncode, out, _err = execute_print_capture(command)
if returncode == 0: if returncode == 0:
return return out
print(f'\n' print(f'\n'
f'Error while executing:\n' f'Error while executing:\n'

View File

@ -25,7 +25,8 @@ def test():
# test9() # test9()
# test10() # test10()
# test11() # test11()
test12() # test12()
test13()
def test1(): def test1():
@ -273,8 +274,20 @@ def test12():
ssh_target = 'rootnas' ssh_target = 'rootnas'
target_path = '/mnt/data/test/blogger.privacy1st.de/20230104T2255' target_path = '/mnt/data/test/blogger.privacy1st.de/20230104T2255'
target_chunk_path = '/mnt/data/test/chunks' target_chunk_path = '/mnt/data/test/chunks'
print(f'sudo btrfs-receive-chunks-v2 --chunk-dir={target_chunk_path} {target_path}') print(f'nas: sudo btrfs-receive-chunks-v2 --chunk-dir={target_chunk_path} {target_path}')
print(f'sudo btrfs-send-chunks-v2 --chunk-dir=/mnt/backup/test/chunks --target-chunk-dir={target_chunk_path} {child_path} {ssh_target} {target_path}') print(
f'odroid: sudo btrfs-send-chunks-v2 --chunk-dir=/mnt/backup/test/chunks --target-chunk-dir={target_chunk_path} {child_path} {ssh_target} {target_path}')
def test13():
ssh_source = 'rootnas'
source_path = '/mnt/data/snap/blogger.privacy1st.de/20230104T1622_u'
source_chunk_dir = '/mnt/data/test/chunks'
target_path = '/mnt/backup/test/blogger.privacy1st.de/20230104T1622_u'
print(f'nas: sudo btrfs-send-chunks-v3 --chunk-dir={source_chunk_dir} {source_path}')
print(f'odroid: sudo btrfs-receive-chunks-v3 {ssh_source} {source_chunk_dir} {target_path}')
def _init(test_number: int): def _init(test_number: int):