refactor: extract exec_{send,receive}_chunks from btrfs_{send,receive}

This commit is contained in:
Daniel Langbein 2023-03-18 17:52:49 +01:00
parent c61e1a8c1d
commit 780fe7f1d1
9 changed files with 257 additions and 188 deletions

View File

@ -1,13 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import shlex
from pathlib import Path
from queue import Queue
from subprocess_util.common import Message1, Message2, get_chunk_path, get_inform1_path, get_inform2_path
from subprocess_util.exec_consume_chunks import execute_consume_chunks
from subprocess_util.repeat import repeat_until_successful
from subprocess_util.exec_receive_chunks_active import exec_receive_chunks_active
def main():
@ -22,57 +18,12 @@ def main():
command = ['btrfs', 'receive', str(target_path.parent)]
def handle_chunks(queue_put: Queue.put):
chunk_no = 1
while True:
target_chunk_path = get_chunk_path(target_chunk_dir, target_path, chunk_no)
source_chunk_path = get_chunk_path(source_chunk_dir, target_path, chunk_no)
in_socket: Path = get_inform2_path(source_chunk_dir, target_path)
user_socket = target_chunk_dir.parent.joinpath(f'{target_path.name}.SOCKET.user')
inform_path = get_inform1_path(source_chunk_path)
inform_cmd = ['ssh',
ssh_source,
f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}']
messages = [Message1.OK.value, Message1.EOF.value]
rsync_cmd = ['rsync',
f'{ssh_source}:{source_chunk_path}',
f'{target_chunk_path}']
inform2_cmd = ['ssh',
ssh_source,
f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(in_socket))}"]
# Wait until next chunk can be transferred from sending side.
#
# It can happen,that the sending side has not yet saved the next chunk.
# In that case `cat ...` will fail.
# We will retry 3 times, each time after 5 seconds sleep.
#
# If all 4 `cat ...` attempts failed, we wait until the user has inspected the problem.
msg = repeat_until_successful(inform_cmd, user_socket, retries=3, retry_delay_seconds=5)
if msg not in messages:
raise ValueError(f'Invalid message: {msg}')
last_chunk = msg == Message1.EOF.value
# Transfer chunk.
repeat_until_successful(rsync_cmd, user_socket)
# Add chunk path to queue.
queue_put((target_chunk_path, last_chunk))
# Inform sending side about successful transfer.
repeat_until_successful(inform2_cmd, user_socket)
if last_chunk:
break
chunk_no += 1
returncode = execute_consume_chunks(
command=command,
handle_chunks=handle_chunks,
exec_receive_chunks_active(
command,
ssh_source,
source_chunk_dir,
target_chunk_dir,
)
exit(returncode)
def parse_args():

View File

@ -1,12 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import socket
from pathlib import Path
from queue import Queue
from subprocess_util.exec_consume_chunks import execute_consume_chunks
from subprocess_util.unix_sock_input import accept_loop_until_message
from subprocess_util.exec_receive_chunks_passive import exec_receive_chunks_passive
def main():
@ -15,39 +12,11 @@ def main():
target_chunk_dir: Path = args.chunk_dir
target_path: Path = args.target_path
target_chunk_dir.mkdir(parents=True, exist_ok=True)
command = ['btrfs', 'receive', str(target_path.parent)]
def handle_chunks(queue_put: Queue.put):
socket_file: Path = target_path.parent.joinpath(f'{target_path.name}.SOCKET')
socket_file.parent.mkdir(parents=True, exist_ok=True)
print(f'Listening on socket {socket_file}')
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(str(socket_file))
sock.listen(1)
chunk_no = 1
messages = [b'OK\n', b'EOF\n']
while True:
msg = accept_loop_until_message(sock, messages)
last_chunk = msg == b'EOF\n'
chunk_path = target_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}')
queue_put((chunk_path, last_chunk))
if last_chunk:
break
chunk_no += 1
print(f'Closing socket {socket_file}')
sock.close()
socket_file.unlink(missing_ok=False)
returncode = execute_consume_chunks(
command=command,
handle_chunks=handle_chunks,
returncode = exec_receive_chunks_passive(
command,
target_chunk_dir,
)
exit(returncode)
@ -76,10 +45,12 @@ def parse_args():
# Make all paths absolute. Set default values.
args.target_path = args.target_path.absolute()
if args.chunk_dir:
args.chunk_dir = args.chunk_dir.absolute()
#
if args.chunk_dir is None:
# Default value
args.chunk_dir = args.target_path.parent.joinpath(f'{args.target_path.name}.chunk')
else:
args.chunk_dir = args.target_path.absolute().parent
args.chunk_dir = args.chunk_dir.absolute()
return args

View File

@ -1,11 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import shlex
from pathlib import Path
from subprocess_util.exec_produce_chunks import execute_produce_chunks
from subprocess_util.repeat import repeat_until_successful
from subprocess_util.exec_send_chunks_active import exec_send_chunks_active
def main():
@ -18,7 +16,6 @@ def main():
parent: Path | None = args.parent
source_chunk_dir: Path = args.chunk_dir
target_chunk_dir: Path = args.target_chunk_dir
target_path: Path = args.target_path
command_parts = (
['btrfs', 'send'],
@ -28,36 +25,12 @@ def main():
)
command = [x for xs in command_parts for x in xs]
def get_source_chunk_path(chunk_no: int) -> Path:
return source_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
def handle_chunk(chunk_no: int, last_chunk: bool):
source_chunk_path = get_source_chunk_path(chunk_no)
target_chunk_path = target_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
print(f'Handling chunk {source_chunk_path}')
usr_confirmation_socket = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.SOCKET')
target_socket = target_path.parent.joinpath(f'{target_path.name}.SOCKET')
message = 'EOF' if last_chunk else 'OK'
rsync_cmd = ['rsync',
str(source_chunk_path),
f'{ssh_target}:{str(target_chunk_path)}']
inform_cmd = ['ssh',
ssh_target,
f'echo {message} | nc -U {shlex.quote(str(target_socket))}']
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
# Delete local chunk after it has been transferred.
source_chunk_path.unlink(missing_ok=False)
repeat_until_successful(inform_cmd, usr_confirmation_socket)
returncode = execute_produce_chunks(
command=command,
get_chunk_path=get_source_chunk_path,
handle_chunk=handle_chunk,
chunk_size=chunk_size,
returncode = exec_send_chunks_active(
command,
ssh_target,
source_chunk_dir,
target_chunk_dir,
chunk_size
)
exit(returncode)
@ -89,7 +62,7 @@ def parse_args():
parser.add_argument('--chunk-dir',
help='Chunks are saved in this directory. '
'Defaults to parent directory of CHILD.',
'Defaults to folder `<CHILD_SUBVOLUME>.chunk` next to CHILD_SUBVOLUME.',
dest='chunk_dir',
type=Path,
metavar='CHUNK_DIR',
@ -99,7 +72,7 @@ def parse_args():
parser.add_argument('--target-chunk-dir',
help='Chunks are saved in this directory on SSH_TARGET. '
'Must be an absolute path. '
'Defaults to parent directory of TARGET_PATH.',
'Defaults to directory `<TARGET_PATH>.chunk` next to TARGET_PATH.',
dest='target_chunk_dir',
type=Path,
metavar='TARGET_CHUNK_DIR',
@ -118,7 +91,7 @@ def parse_args():
)
parser.add_argument('target_path',
help='Absolute path where the subvolume will be created on SSH_TARGET.',
help='Absolute path to the read-only subvolume to be created on SSH_TARGET.',
type=Path,
metavar='TARGET_PATH'
)
@ -127,20 +100,24 @@ def parse_args():
# Make all paths absolute. Set default values.
args.child = args.child.absolute()
#
if not args.target_path.is_absolute():
raise ValueError('TARGET_PATH must be absolute')
#
if args.chunk_dir:
if args.chunk_dir is None:
# Default value
args.chunk_dir = args.child.parent.joinpath(f'{args.child.name}.chunk')
else:
args.chunk_dir = args.chunk_dir.absolute()
else: # Default value
args.chunk_dir = args.child.absolute().parent
if args.target_chunk_dir:
#
if args.target_chunk_dir is None:
# Default value
args.target_chunk_dir = args.target_path.parent.joinpath(f'{args.target_path.name}.chunk')
else:
if not args.target_chunk_dir.is_absolute():
raise ValueError('TARGET_CHUNK_DIR must be absolute')
else: # Default value
args.target_chunk_dir = args.target_path.parent
#
if args.parent:
if args.parent is not None:
args.parent = args.parent.absolute()
return args

View File

@ -1,12 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import socket
from pathlib import Path
from subprocess_util.common import Message1, Message2, get_inform1_path, get_inform2_path, get_chunk_path
from subprocess_util.exec_produce_chunks import execute_produce_chunks
from subprocess_util.unix_sock_input import accept_loop_until_message
from subprocess_util.exec_send_chunks_passive import exec_send_chunks_passive
def main():
@ -27,40 +24,10 @@ def main():
[str(child)]
)
command = [x for xs in command_parts for x in xs]
def get_source_chunk_path(chunk_no: int) -> Path:
return get_chunk_path(source_chunk_dir, child, chunk_no)
def handle_chunk(chunk_no: int, last_chunk: bool):
source_chunk_path = get_source_chunk_path(chunk_no)
print(f'Handling chunk {source_chunk_path}')
# Create in_socket
in_socket: Path = get_inform2_path(source_chunk_dir, child)
in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
in_sock.bind(str(in_socket))
in_sock.listen(1)
# Inform receiving side, that the next chunk can be transferred.
inform_path = get_inform1_path(source_chunk_path)
inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value)
# Read from in_socket
# -> Receiving side informs us after the chunk has been transferred.
_command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value])
# Delete local chunk after it has been transferred.
source_chunk_path.unlink(missing_ok=False)
# Close in_socket
in_sock.close()
in_socket.unlink(missing_ok=False)
returncode = execute_produce_chunks(
command=command,
get_chunk_path=get_source_chunk_path,
handle_chunk=handle_chunk,
chunk_size=chunk_size,
returncode = exec_send_chunks_passive(
command,
source_chunk_dir,
chunk_size,
)
exit(returncode)

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from enum import Enum
from pathlib import Path
class Message1(Enum):
@ -12,15 +11,3 @@ class Message1(Enum):
class Message2(Enum):
OK_BINARY = b'OK\n'
OK_STR = 'OK\n'
def get_chunk_path(chunk_dir: Path, subvolume: Path, chunk_no: int) -> Path:
return chunk_dir.joinpath(f'{subvolume.name}.CHUNK.{chunk_no}')
def get_inform1_path(source_chunk_path: Path) -> Path:
return source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE')
def get_inform2_path(source_chunk_dir: Path, subvolume: Path) -> Path:
return source_chunk_dir.joinpath(f'{subvolume.name}.SOCKET.in')

View File

@ -0,0 +1,73 @@
import shlex
from pathlib import Path
from queue import Queue
from subprocess_util.common import Message2, Message1
from subprocess_util.exec_consume_chunks import execute_consume_chunks
from subprocess_util.repeat import repeat_until_successful
def exec_receive_chunks_active(
command: list[str],
ssh_source: str,
source_chunk_dir: Path,
target_chunk_dir: Path,
) -> int:
def get_source_chunk_path(chunk_no: int) -> Path:
return source_chunk_dir.joinpath(f'{chunk_no}')
def get_target_chunk_path(chunk_no: int) -> Path:
return target_chunk_dir.joinpath(f'{chunk_no}')
def handle_chunks(queue_put: Queue.put):
# TODO: get starting value of chunk_no as argument
chunk_no = 1
while True:
target_chunk_path = get_target_chunk_path(chunk_no)
source_chunk_path = get_source_chunk_path(chunk_no)
source_socket = source_chunk_dir.joinpath('SOCKET')
usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET')
inform_path = source_chunk_path.joinpath(f'{chunk_no}.COMPLETE')
inform_cmd = ['ssh',
ssh_source,
f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}']
messages = [Message1.OK.value, Message1.EOF.value]
rsync_cmd = ['rsync',
f'{ssh_source}:{source_chunk_path}',
f'{target_chunk_path}']
inform2_cmd = ['ssh',
ssh_source,
f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(source_socket))}"]
# Wait until next chunk can be transferred from sending side.
#
# It can happen, that the sending side has not yet saved the next chunk.
# In that case `cat ...` will fail.
# We will retry 3 times, each time after 5 seconds sleep.
#
# If all 4 `cat ...` attempts failed, we wait until the user has inspected the problem.
msg = repeat_until_successful(inform_cmd, usr_confirmation_socket, retries=3, retry_delay_seconds=5)
if msg not in messages:
raise ValueError(f'Invalid message: {msg}')
last_chunk = msg == Message1.EOF.value
# Transfer chunk.
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
# Add chunk path to queue.
queue_put((target_chunk_path, last_chunk))
# Inform sending side about successful transfer.
repeat_until_successful(inform2_cmd, usr_confirmation_socket)
if last_chunk:
break
chunk_no += 1
returncode = execute_consume_chunks(
command=command,
handle_chunks=handle_chunks,
)
return returncode

View File

@ -0,0 +1,46 @@
import socket
from pathlib import Path
from queue import Queue
from subprocess_util.exec_consume_chunks import execute_consume_chunks
from subprocess_util.unix_sock_input import accept_loop_until_message
def exec_receive_chunks_passive(
command: list[str],
target_chunk_dir: Path,
) -> int:
def get_target_chunk_path(chunk_no: int) -> Path:
return target_chunk_dir.joinpath(f'{chunk_no}')
def handle_chunks(queue_put: Queue.put):
target_socket = target_chunk_dir.joinpath('SOCKET')
target_socket.parent.mkdir(parents=True, exist_ok=True)
print(f'Listening on socket {target_socket}')
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(str(target_socket))
sock.listen(1)
chunk_no = 1
messages = [b'OK\n', b'EOF\n']
while True:
msg = accept_loop_until_message(sock, messages)
last_chunk = msg == b'EOF\n'
target_chunk_path = get_target_chunk_path(chunk_no)
queue_put((target_chunk_path, last_chunk))
if last_chunk:
break
chunk_no += 1
print(f'Closing socket {target_socket}')
sock.close()
target_socket.unlink(missing_ok=False)
returncode = execute_consume_chunks(
command=command,
handle_chunks=handle_chunks,
)
return returncode

View File

@ -0,0 +1,49 @@
import shlex
from pathlib import Path
from subprocess_util.exec_produce_chunks import execute_produce_chunks
from subprocess_util.repeat import repeat_until_successful
def exec_send_chunks_active(
command: list[str],
ssh_target: str,
source_chunk_dir: Path,
target_chunk_dir: Path,
chunk_size: int = 128 * 1024 * 1024,
) -> int:
def get_source_chunk_path(chunk_no: int) -> Path:
return source_chunk_dir.joinpath(f'{chunk_no}')
def get_target_chunk_path(chunk_no: int) -> Path:
return target_chunk_dir.joinpath(f'{chunk_no}')
def handle_chunk(chunk_no: int, last_chunk: bool):
source_chunk_path = get_source_chunk_path(chunk_no)
target_chunk_path = get_target_chunk_path(chunk_no)
print(f'Handling chunk {source_chunk_path}')
usr_confirmation_socket = source_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET')
target_socket = target_chunk_dir.joinpath('SOCKET')
message = 'EOF' if last_chunk else 'OK'
rsync_cmd = ['rsync',
str(source_chunk_path),
f'{ssh_target}:{str(target_chunk_path)}']
inform_cmd = ['ssh',
ssh_target,
f'echo {message} | nc -U {shlex.quote(str(target_socket))}']
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
# Delete local chunk after it has been transferred.
source_chunk_path.unlink(missing_ok=False)
repeat_until_successful(inform_cmd, usr_confirmation_socket)
returncode = execute_produce_chunks(
command=command,
get_chunk_path=get_source_chunk_path,
handle_chunk=handle_chunk,
chunk_size=chunk_size,
)
return returncode

View File

@ -0,0 +1,48 @@
import socket
from pathlib import Path
from subprocess_util.common import Message1, Message2
from subprocess_util.exec_produce_chunks import execute_produce_chunks
from subprocess_util.unix_sock_input import accept_loop_until_message
def exec_send_chunks_passive(
command: list[str],
source_chunk_dir: Path,
chunk_size: int = 128 * 1024 * 1024,
) -> int:
def get_source_chunk_path(chunk_no: int) -> Path:
return source_chunk_dir.joinpath(f'{chunk_no}')
def handle_chunk(chunk_no: int, last_chunk: bool):
source_chunk_path = get_source_chunk_path(chunk_no)
print(f'Handling chunk {source_chunk_path}')
# Create in_socket
source_socket = source_chunk_dir.joinpath('SOCKET')
in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
in_sock.bind(str(source_socket))
in_sock.listen(1)
# Inform receiving side, that the next chunk can be transferred.
inform_path = source_chunk_path.joinpath(f'{chunk_no}.COMPLETE')
inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value)
# Read from in_socket
# -> Receiving side informs us after the chunk has been transferred.
_command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value])
# Delete local chunk after it has been transferred.
source_chunk_path.unlink(missing_ok=False)
# Close in_socket
in_sock.close()
source_socket.unlink(missing_ok=False)
returncode = execute_produce_chunks(
command=command,
get_chunk_path=get_source_chunk_path,
handle_chunk=handle_chunk,
chunk_size=chunk_size,
)
return returncode