add exec_print_chunk and btrfs_send_chunks_v2

This commit is contained in:
Daniel Langbein 2023-01-12 13:19:21 +01:00
parent 097c195e0e
commit 7f6cd90c18
8 changed files with 380 additions and 42 deletions

View File

@ -33,4 +33,5 @@ where = src
; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html ; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html
console_scripts = console_scripts =
btrfs-send-chunks = p1st.btrfs_send_chunks:main btrfs-send-chunks = p1st.btrfs_send_chunks:main
btrfs-send-chunks-v2 = p1st.btrfs_send_chunks_v2:main
btrfs-receive-chunks = p1st.btrfs_receive_chunks:main btrfs-receive-chunks = p1st.btrfs_receive_chunks:main

View File

@ -0,0 +1,138 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import shlex
from pathlib import Path
from p1st.exec_print_chunk import execute_print_chunk
from p1st.repeat import repeat_until_successful
def main():
args = parse_args()
command_parts = (
['btrfs', 'send'],
['-p', str(args.parent)] if args.parent else [],
['--compress-data'] if args.compressed_data else [],
[str(args.child)]
)
command = [x for xs in command_parts for x in xs]
def get_chunk_path(chunk_no: int) -> Path:
return args.chunk_dir.joinpath(f'{args.child.name}.CHUNK.{chunk_no}')
def handle_chunk(chunk_no: int, last_chunk: bool):
chunk_path = get_chunk_path(chunk_no)
target_chunk_path = args.target_chunk_dir.joinpath(f'{args.child.name}.CHUNK.{chunk_no}')
print(f'Handling chunk {chunk_path}')
usr_confirmation_socket = chunk_path.parent.joinpath(f'{chunk_path.name}.SOCKET')
target_socket = args.target_path.parent.joinpath(f'{args.target_path.name}.SOCKET')
message = 'EOF' if last_chunk else 'OK'
rsync_cmd = ['rsync',
str(chunk_path),
f'{args.ssh_target}:{str(target_chunk_path)}']
inform_cmd = ['ssh',
args.ssh_target,
f'echo {message} | nc -U {shlex.quote(str(target_socket))}']
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
repeat_until_successful(inform_cmd, usr_confirmation_socket)
execute_print_chunk(
command=command,
get_chunk_path=get_chunk_path,
handle_chunk=handle_chunk,
chunk_size=args.chunk_size,
)
def parse_args():
parser = argparse.ArgumentParser(prog='btrfs-send-chunks')
parser.add_argument('-p',
help='Path to parent subvolume; forwarded to btrfs-send.',
dest='parent',
default=None,
type=Path,
metavar='PARENT_SUBVOLUME'
)
parser.add_argument('--compressed-data',
help='Forwarded to btrfs-send.',
dest='compressed_data',
action='store_true',
default=False,
)
parser.add_argument('--chunk-size',
help='Size in bytes; defaults to 128 MB.',
dest='chunk_size',
type=int,
default=128 * 1024 * 1024,
)
parser.add_argument('--chunk-dir',
help='Chunks are saved in this directory. '
'Defaults to parent directory of CHILD.',
dest='chunk_dir',
type=Path,
metavar='CHUNK_DIR',
default=None,
)
parser.add_argument('--target-chunk-dir',
help='Chunks are saved in this directory on SSH_TARGET. '
'Must be an absolute path. '
'Defaults to parent directory of TARGET_PATH.',
dest='target_chunk_dir',
type=Path,
metavar='TARGET_CHUNK_DIR',
default=None,
)
parser.add_argument('child',
help='Path to child subvolume. Forwarded to btrfs-send.',
type=Path,
metavar='CHILD_SUBVOLUME'
)
parser.add_argument('ssh_target',
help='Hostname of target computer; as configured in ~/.ssh/config.',
metavar='SSH_TARGET'
)
parser.add_argument('target_path',
help='Absolute path where the subvolume will be created on SSH_TARGET.',
type=Path,
metavar='TARGET_PATH'
)
args = parser.parse_args()
# Make all paths absolute. Set default values.
args.child = args.child.absolute()
if not args.target_path.is_absolute():
raise ValueError('TARGET_PATH must be absolute')
#
if args.chunk_dir:
args.chunk_dir = args.chunk_dir.absolute()
else: # Default value
args.chunk_dir = args.child.absolute().parent
if args.target_chunk_dir:
if not args.target_chunk_dir.is_absolute():
raise ValueError('TARGET_CHUNK_DIR must be absolute')
else: # Default value
args.target_chunk_dir = args.target_path.parent
#
if args.parent:
args.parent = args.parent.absolute()
return args
if __name__ == '__main__':
main()

View File

@ -0,0 +1,128 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import sys
import threading
from pathlib import Path
from queue import Queue
from typing import Callable, IO, AnyStr
def execute_print_chunk(command: list[str],
get_chunk_path: Callable[[int], Path],
handle_chunk: Callable[[int, bool], None],
chunk_size: int = 512, # TODO increase
) -> int:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
)
q = Queue(maxsize=2)
threads = [
threading.Thread(
target=_stdout_worker,
args=(process.stdout,
get_chunk_path,
q.put,
chunk_size,
)),
threading.Thread(
target=_chunk_worker,
args=(q.get,
handle_chunk,
)),
threading.Thread(
target=_stderr_worker,
args=(process.stderr,
)),
]
for t in threads:
t.daemon = True
t.start()
returncode: int = process.wait()
for t in threads:
t.join()
return returncode
def _stdout_worker(binary_stdout: IO[AnyStr],
get_chunk_path: Callable[[int], Path],
queue_put: Queue.put,
chunk_size: int,
):
chunk_no: int = 1
remaining_bytes: int = chunk_size
chunk: bytes = b''
while True:
b = binary_stdout.read(remaining_bytes)
if len(b) == 0: # EOF reached.
chunk_path: Path = get_chunk_path(chunk_no)
_save_chunk(chunk, chunk_path)
queue_put((chunk_no, True))
break
chunk += b
chunk_len: int = len(chunk)
if chunk_len == chunk_size: # Next chunk is full.
chunk_path: Path = get_chunk_path(chunk_no)
_save_chunk(chunk, chunk_path)
queue_put((chunk_no, False))
chunk = b''
remaining_bytes = chunk_size
chunk_no += 1
elif chunk_len < chunk_size:
remaining_bytes = chunk_size - chunk_len
else:
raise ValueError('Invalid state')
# TODO: Has this any effect?
# binary_stdout.close()
def _save_chunk(chunk: bytes, chunk_path: Path):
"""
Saves a chunk at the given path.
"""
print(f'Saving chunk {chunk_path}')
chunk_path.parent.mkdir(parents=True, exist_ok=True)
# Fails if file does already exist.
with open(chunk_path, 'xb') as f:
f.write(chunk)
def _chunk_worker(queue_get: Queue.get,
handle_chunk: Callable[[int, bool], None],
):
"""
Calls handle_chunk(chunk_no, last_chunk) for each chunk
that has been saved.
"""
while True:
chunk_no, last_chunk = queue_get()
handle_chunk(chunk_no, last_chunk)
if last_chunk:
break
def _stderr_worker(binary_stderr: IO[AnyStr]):
"""
Prints stderr of subprocess to sys.stderr.
"""
b: bytes
for b in binary_stderr:
sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}')
# TODO: Has this any effect?
# binary_stderr.close()

View File

@ -4,7 +4,7 @@ import socket
from pathlib import Path from pathlib import Path
from typing import IO, AnyStr, Callable from typing import IO, AnyStr, Callable
from p1st.unix_sock_input import accept_loop_until_command_received from p1st.unix_sock_input import accept_loop_until_message
def receive_inform(in_pipe: IO[AnyStr], def receive_inform(in_pipe: IO[AnyStr],
@ -25,10 +25,10 @@ def receive_inform(in_pipe: IO[AnyStr],
sock.listen(1) sock.listen(1)
ct = 1 ct = 1
commands = [b'OK\n', b'EOF\n'] messages = [b'OK\n', b'EOF\n']
while True: while True:
command = accept_loop_until_command_received(sock, commands) msg = accept_loop_until_message(sock, messages)
if command not in commands: if msg not in messages:
raise ValueError("Invalid state") raise ValueError("Invalid state")
chunk_file = get_chunk_file(chunk_file_tmpl, ct) chunk_file = get_chunk_file(chunk_file_tmpl, ct)
@ -37,9 +37,9 @@ def receive_inform(in_pipe: IO[AnyStr],
# in_pipe.flush() # TODO: is this required? # in_pipe.flush() # TODO: is this required?
chunk_file.unlink(missing_ok=False) chunk_file.unlink(missing_ok=False)
if command == b'OK\n': if msg == b'OK\n':
ct += 1 ct += 1
elif command == b'EOF\n': elif msg == b'EOF\n':
break break
else: else:
raise ValueError("Invalid state") raise ValueError("Invalid state")

30
src/p1st/repeat.py Normal file
View File

@ -0,0 +1,30 @@
from pathlib import Path
from p1st.exec_print_capture import execute_print_capture
from p1st.unix_sock_input import wait_for_message
def repeat_until_successful(command: list[str],
socket_file: Path) -> None:
"""
Executes the given `command`.
If an error occurs, it creates a UNIX socket at
`socket_file` and waits for user input.
"""
while True:
returncode, _out, _err = execute_print_capture(command)
if returncode == 0:
return
print(f'\n'
f'Error while executing:\n'
f'\t{command}\n'
f'\tFor details, see above output.')
print(f'Info:\n'
f'\tPlease fix the above error first. Then continue here:\n'
f'\tsudo pacman -S --needed openbsd-netcat\n'
f'\techo "OK" | nc -U "{socket_file.absolute()}"')
wait_for_message(socket_file, [b'OK\n'])
print()

View File

@ -8,6 +8,7 @@ from pathlib import Path
from p1st.exec_capture import execute_capture from p1st.exec_capture import execute_capture
from p1st.exec_print_capture import execute_print_capture from p1st.exec_print_capture import execute_print_capture
from p1st.exec_print_chunk import execute_print_chunk
from p1st.exec_print_receive import execute_print_receive_chunks from p1st.exec_print_receive import execute_print_receive_chunks
from p1st.exec_print_transfer import execute_print_transfer_chunks from p1st.exec_print_transfer import execute_print_transfer_chunks
from p1st.transfer_inform import transfer_inform from p1st.transfer_inform import transfer_inform
@ -21,7 +22,9 @@ def test():
# test5() # test5()
# test67() # test67()
# test8() # test8()
test9() # test9()
# test10()
test11()
def test1(): def test1():
@ -207,6 +210,63 @@ def test9():
print(f'sudo btrfs-send-chunks {child_path} {ssh_target} {target_path}') print(f'sudo btrfs-send-chunks {child_path} {ssh_target} {target_path}')
def test10():
_init(10)
source_file = Path('src/p1st/transfer_inform.py')
chunk_dir = Path('test')
target_file = Path('test/transfer_inform.py')
def get_chunk_path(chunk_no: int):
return chunk_dir.joinpath(f'{source_file.name}.CHUNK.{chunk_no}')
def handle_chunk(chunk_no: int, last_chunk: bool):
chunk_path = get_chunk_path(chunk_no)
print(f'Handling chunk {chunk_path}')
# Read chunk.
chunk = chunk_path.read_bytes()
# Append chunk to target.
with target_file.open("ab") as f:
f.write(chunk)
# Delete chunk.
chunk_path.unlink(missing_ok=False)
execute_print_chunk(
command=['cat', str(source_file)],
get_chunk_path=get_chunk_path,
handle_chunk=handle_chunk,
chunk_size=512,
)
def test11():
repo_name = 'subprocess_util'
child_name = 'test-subvolume'
child_dir = '/mnt/backup/test-dir'
child_path = f'{child_dir}/{child_name}'
target_dir = '/mnt/data/test-dir'
target_path = f'{target_dir}/{child_name}'
ssh_target = 'rootnas'
print(f'=== In one shell, connect with "ssh nas" ===')
print(f'\tsudo mkdir {target_dir}')
print(f'\tcd {repo_name} && make && sudo make clean && cd ..')
print()
print(f'\tsudo btrfs-receive-chunks {target_path}')
print()
print(f'=== In another shell, connect with "ssh odroid" ===')
print(f'\tsudo mkdir {child_dir}')
print(f'\tsudo btrfs subvolume create {child_path}.writeable')
print(f'\techo foo | sudo tee {child_path}.writeable/bar')
print(f'\tsudo btrfs subvolume snapshot -r {child_path}.writeable {child_path}')
print(f'\tcd {repo_name} && make && sudo make clean && cd ..')
print()
print(f'\tsudo btrfs-send-chunks-v2 {child_path} {ssh_target} {target_path}')
def _init(test_number: int): def _init(test_number: int):
print(f"TEST {test_number}") print(f"TEST {test_number}")
test_dir = Path('test') test_dir = Path('test')

View File

@ -2,8 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from pathlib import Path from pathlib import Path
from p1st.exec_print_capture import execute_print_capture from p1st.repeat import repeat_until_successful
from p1st.unix_sock_input import wait_until_command_received
def transfer_inform(rsync_cmd: list[str], def transfer_inform(rsync_cmd: list[str],
@ -26,29 +25,9 @@ def transfer_inform(rsync_cmd: list[str],
# - rsync to remote pc # - rsync to remote pc
# - catch error # - catch error
# - wait until user input, then repeat # - wait until user input, then repeat
_execute_loop_until_successful(rsync_cmd, user_input_file) repeat_until_successful(rsync_cmd, user_input_file)
# - inform remote pc about complete rsync # - inform remote pc about complete rsync
# - catch error # - catch error
# - wait until user input, then repeat # - wait until user input, then repeat
_execute_loop_until_successful(inform_cmd, user_input_file) repeat_until_successful(inform_cmd, user_input_file)
def _execute_loop_until_successful(cmd: list[str], user_input_file: Path):
while True:
returncode, _out, _err = execute_print_capture(cmd)
if returncode == 0:
break
else:
print(f'\n'
f'Error while executing:\n'
f'\t{cmd}\n'
f'\tFor details, see above output.')
print(f'Info:\n'
f'\tPlease fix the above error first. Then continue here:\n'
f'\tsudo pacman -S --needed openbsd-netcat\n'
f'\techo "OK" | nc -U "{user_input_file.absolute()}"')
wait_until_command_received(user_input_file, [b'OK\n'])
print()

View File

@ -4,16 +4,17 @@ import socket
from pathlib import Path from pathlib import Path
def wait_until_command_received(socket_file: Path, commands: list[bytes]) -> bytes: def wait_for_message(socket_file: Path,
messages: list[bytes]) -> bytes:
""" """
Creates a UNIX socket at socket_file. Creates a UNIX socket at `socket_file`.
Accepts connections on the UNIX socket, Accepts connections on the UNIX socket,
until one client sends one of the given commands as first message. until a client sends one of the given `messages`.
Closes the UNIX socket. Closes the UNIX socket.
:returns: The command that was received. :returns: The message that was received.
""" """
# INSPIRATION: https://pymotw.com/3/socket/uds.html # INSPIRATION: https://pymotw.com/3/socket/uds.html
@ -23,7 +24,7 @@ def wait_until_command_received(socket_file: Path, commands: list[bytes]) -> byt
sock.bind(str(socket_file)) sock.bind(str(socket_file))
sock.listen(1) sock.listen(1)
command = accept_loop_until_command_received(sock, commands) command = accept_loop_until_message(sock, messages)
print(f'Closing socket {socket_file}') print(f'Closing socket {socket_file}')
sock.close() sock.close()
@ -32,7 +33,8 @@ def wait_until_command_received(socket_file: Path, commands: list[bytes]) -> byt
return command return command
def accept_loop_until_command_received(sock: socket.socket, commands: list[bytes]) -> bytes: def accept_loop_until_message(sock: socket.socket,
messages: list[bytes]) -> bytes:
""" """
Uses an open UNIX socket. Uses an open UNIX socket.
@ -44,16 +46,16 @@ def accept_loop_until_command_received(sock: socket.socket, commands: list[bytes
:returns: The command that was received. :returns: The command that was received.
""" """
bufsize = max([len(cmd) for cmd in commands]) + len(b'\n') bufsize = max([len(msg) for msg in messages]) + len(b'\n')
while True: while True:
connection, client_address = sock.accept() connection, client_address = sock.accept()
try: try:
b: bytes = connection.recv(bufsize) b: bytes = connection.recv(bufsize)
for cmd in commands: for msg in messages:
if b == cmd: if b == msg:
print(f'Received "{cmd}".') print(f'Received "{msg}".')
return cmd return msg
print(f'Received unknown message: {b}') print(f'Received unknown message: {b}')
except Exception as e: except Exception as e:
print(f'Error while reading message: {e}') print(f'Error while reading message: {e}')