mirror of
https://codeberg.org/privacy1st/subprocess-util
synced 2024-12-22 22:06:05 +01:00
add unix_socket_output and btrfs_send_chunks_v3
This commit is contained in:
parent
ddd5b9a1fe
commit
bfd2906ccd
@ -1,60 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
import argparse
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from p1st.exec_print_receive import execute_print_receive_chunks
|
|
||||||
from p1st.common import _get_chunk_file, _get_remote_socket, _get_chunk_tmpl
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
args = parse_args()
|
|
||||||
|
|
||||||
target_path: Path = args.target_path
|
|
||||||
command = ['btrfs', 'receive', str(target_path.parent)]
|
|
||||||
target_socket = _get_remote_socket(target_path)
|
|
||||||
|
|
||||||
execute_print_receive_chunks(
|
|
||||||
command=command,
|
|
||||||
socket_file=target_socket,
|
|
||||||
chunk_file_tmpl=args.chunk_tmpl,
|
|
||||||
get_chunk_file=_get_chunk_file,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_args():
|
|
||||||
parser = argparse.ArgumentParser(prog='btrfs-receive-chunks')
|
|
||||||
|
|
||||||
parser.add_argument('--chunk-tmpl',
|
|
||||||
help='During btrfs-receive, chunks are saved as "CHUNK_TMPL.CHUNK_NUMBER". '
|
|
||||||
'The default value of CHUNK_TMPL is "SUBVOLUME.CHUNK". '
|
|
||||||
'One can change it to e.g. "/tmp/chunk/SUBVOLUME".',
|
|
||||||
dest='chunk_tmpl',
|
|
||||||
default=None,
|
|
||||||
type=Path,
|
|
||||||
metavar='CHUNK_TMPL'
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument('target_path',
|
|
||||||
help='Path where the subvolume will be created. '
|
|
||||||
'The last component of the path '
|
|
||||||
'must be equal to the name of the subvolume on the sending side.',
|
|
||||||
type=Path,
|
|
||||||
metavar='SUBVOLUME'
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
# Make all paths absolute.
|
|
||||||
if args.chunk_tmpl:
|
|
||||||
args.chunk_tmpl = args.chunk_tmpl.absolute()
|
|
||||||
args.target_path = args.target_path.absolute()
|
|
||||||
|
|
||||||
if not args.chunk_tmpl:
|
|
||||||
args.chunk_tmpl = _get_chunk_tmpl(args.target_path)
|
|
||||||
|
|
||||||
return args
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
@ -11,14 +11,16 @@ from p1st.unix_sock_input import accept_loop_until_message
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
args = parse_args()
|
args = parse_args()
|
||||||
|
#
|
||||||
command = ['btrfs', 'receive', str(args.target_path.parent)]
|
|
||||||
|
|
||||||
def handle_chunks(queue_put: Queue.put):
|
|
||||||
chunk_dir: Path = args.chunk_dir
|
chunk_dir: Path = args.chunk_dir
|
||||||
|
target_path: Path = args.target_path
|
||||||
|
|
||||||
chunk_dir.mkdir(parents=True, exist_ok=True)
|
chunk_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
socket_file: Path = args.target_path.parent.joinpath(f'{args.target_path.name}.SOCKET')
|
command = ['btrfs', 'receive', str(target_path.parent)]
|
||||||
|
|
||||||
|
def handle_chunks(queue_put: Queue.put):
|
||||||
|
socket_file: Path = target_path.parent.joinpath(f'{target_path.name}.SOCKET')
|
||||||
socket_file.parent.mkdir(parents=True, exist_ok=True)
|
socket_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
print(f'Listening on socket {socket_file}')
|
print(f'Listening on socket {socket_file}')
|
||||||
@ -32,7 +34,7 @@ def main():
|
|||||||
msg = accept_loop_until_message(sock, messages)
|
msg = accept_loop_until_message(sock, messages)
|
||||||
|
|
||||||
last_chunk = msg == b'EOF\n'
|
last_chunk = msg == b'EOF\n'
|
||||||
chunk_path = chunk_dir.joinpath(f'{args.target_path.name}.CHUNK.{chunk_no}')
|
chunk_path = chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}')
|
||||||
queue_put((chunk_path, last_chunk))
|
queue_put((chunk_path, last_chunk))
|
||||||
|
|
||||||
if last_chunk:
|
if last_chunk:
|
||||||
|
@ -10,33 +10,42 @@ from p1st.repeat import repeat_until_successful
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
args = parse_args()
|
args = parse_args()
|
||||||
|
#
|
||||||
|
ssh_target = args.ssh_target
|
||||||
|
chunk_size = args.chunk_size
|
||||||
|
compressed_data: bool = args.compressed_data
|
||||||
|
child: Path = args.child
|
||||||
|
parent: Path | None = args.parent
|
||||||
|
chunk_dir: Path = args.chunk_dir
|
||||||
|
target_chunk_dir: Path = args.target_chunk_dir
|
||||||
|
target_path: Path = args.target_path
|
||||||
|
|
||||||
command_parts = (
|
command_parts = (
|
||||||
['btrfs', 'send'],
|
['btrfs', 'send'],
|
||||||
['-p', str(args.parent)] if args.parent else [],
|
['-p', str(parent)] if parent else [],
|
||||||
['--compress-data'] if args.compressed_data else [],
|
['--compress-data'] if compressed_data else [],
|
||||||
[str(args.child)]
|
[str(child)]
|
||||||
)
|
)
|
||||||
command = [x for xs in command_parts for x in xs]
|
command = [x for xs in command_parts for x in xs]
|
||||||
|
|
||||||
def get_chunk_path(chunk_no: int) -> Path:
|
def get_chunk_path(chunk_no: int) -> Path:
|
||||||
return args.chunk_dir.joinpath(f'{args.child.name}.CHUNK.{chunk_no}')
|
return chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
|
||||||
|
|
||||||
def handle_chunk(chunk_no: int, last_chunk: bool):
|
def handle_chunk(chunk_no: int, last_chunk: bool):
|
||||||
chunk_path = get_chunk_path(chunk_no)
|
chunk_path = get_chunk_path(chunk_no)
|
||||||
target_chunk_path = args.target_chunk_dir.joinpath(f'{args.child.name}.CHUNK.{chunk_no}')
|
target_chunk_path = target_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
|
||||||
print(f'Handling chunk {chunk_path}')
|
print(f'Handling chunk {chunk_path}')
|
||||||
|
|
||||||
usr_confirmation_socket = chunk_path.parent.joinpath(f'{chunk_path.name}.SOCKET')
|
usr_confirmation_socket = chunk_path.parent.joinpath(f'{chunk_path.name}.SOCKET')
|
||||||
target_socket = args.target_path.parent.joinpath(f'{args.target_path.name}.SOCKET')
|
target_socket = target_path.parent.joinpath(f'{target_path.name}.SOCKET')
|
||||||
|
|
||||||
message = 'EOF' if last_chunk else 'OK'
|
message = 'EOF' if last_chunk else 'OK'
|
||||||
|
|
||||||
rsync_cmd = ['rsync',
|
rsync_cmd = ['rsync',
|
||||||
str(chunk_path),
|
str(chunk_path),
|
||||||
f'{args.ssh_target}:{str(target_chunk_path)}']
|
f'{ssh_target}:{str(target_chunk_path)}']
|
||||||
inform_cmd = ['ssh',
|
inform_cmd = ['ssh',
|
||||||
args.ssh_target,
|
ssh_target,
|
||||||
f'echo {message} | nc -U {shlex.quote(str(target_socket))}']
|
f'echo {message} | nc -U {shlex.quote(str(target_socket))}']
|
||||||
|
|
||||||
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
|
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
|
||||||
@ -46,7 +55,7 @@ def main():
|
|||||||
command=command,
|
command=command,
|
||||||
get_chunk_path=get_chunk_path,
|
get_chunk_path=get_chunk_path,
|
||||||
handle_chunk=handle_chunk,
|
handle_chunk=handle_chunk,
|
||||||
chunk_size=args.chunk_size,
|
chunk_size=chunk_size,
|
||||||
)
|
)
|
||||||
exit(returncode)
|
exit(returncode)
|
||||||
|
|
||||||
|
154
src/p1st/btrfs_send_chunks_v3.py
Normal file
154
src/p1st/btrfs_send_chunks_v3.py
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import argparse
|
||||||
|
import shlex
|
||||||
|
import socket
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from p1st.exec_produce_chunks import execute_produce_chunks
|
||||||
|
from p1st.repeat import repeat_until_successful
|
||||||
|
from p1st.unix_sock_input import accept_loop_until_message
|
||||||
|
from p1st.unix_socket_output import write_message
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
args = parse_args()
|
||||||
|
#
|
||||||
|
chunk_size = args.chunk_size
|
||||||
|
compressed_data: bool = args.compressed_data
|
||||||
|
child: Path = args.child
|
||||||
|
parent: Path | None = args.parent
|
||||||
|
chunk_dir: Path = args.chunk_dir
|
||||||
|
|
||||||
|
chunk_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
command_parts = (
|
||||||
|
['btrfs', 'send'],
|
||||||
|
['-p', str(parent)] if parent else [],
|
||||||
|
['--compress-data'] if compressed_data else [],
|
||||||
|
[str(child)]
|
||||||
|
)
|
||||||
|
command = [x for xs in command_parts for x in xs]
|
||||||
|
|
||||||
|
def get_chunk_path(chunk_no: int) -> Path:
|
||||||
|
return chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}')
|
||||||
|
|
||||||
|
def handle_chunk(chunk_no: int, last_chunk: bool):
|
||||||
|
chunk_path = get_chunk_path(chunk_no)
|
||||||
|
print(f'Handling chunk {chunk_path}')
|
||||||
|
|
||||||
|
# Create in_socket
|
||||||
|
in_socket: Path = chunk_dir.parent.joinpath(f'{child.name}.SOCKET.in')
|
||||||
|
in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
in_sock.bind(str(in_socket))
|
||||||
|
in_sock.listen(1)
|
||||||
|
|
||||||
|
# Write to out_socket
|
||||||
|
# -> Inform receiving side, that the next chung can be transferred.
|
||||||
|
out_socket: Path = chunk_dir.parent.joinpath(f'{child.name}.SOCKET.out')
|
||||||
|
message = b'EOF\n' if last_chunk else b'OK\n'
|
||||||
|
write_message(out_socket, message)
|
||||||
|
|
||||||
|
# Read from in_socket
|
||||||
|
# -> Receiving side informs us after the chunk has been transferred.
|
||||||
|
_command = accept_loop_until_message(in_sock, [b'OK\n'])
|
||||||
|
|
||||||
|
# Close in_socket
|
||||||
|
in_sock.close()
|
||||||
|
in_socket.unlink(missing_ok=False)
|
||||||
|
|
||||||
|
returncode = execute_produce_chunks(
|
||||||
|
command=command,
|
||||||
|
get_chunk_path=get_chunk_path,
|
||||||
|
handle_chunk=handle_chunk,
|
||||||
|
chunk_size=chunk_size,
|
||||||
|
)
|
||||||
|
exit(returncode)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
parser = argparse.ArgumentParser(prog='btrfs-send-chunks')
|
||||||
|
|
||||||
|
parser.add_argument('-p',
|
||||||
|
help='Path to parent subvolume; forwarded to btrfs-send.',
|
||||||
|
dest='parent',
|
||||||
|
default=None,
|
||||||
|
type=Path,
|
||||||
|
metavar='PARENT_SUBVOLUME'
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('--compressed-data',
|
||||||
|
help='Forwarded to btrfs-send.',
|
||||||
|
dest='compressed_data',
|
||||||
|
action='store_true',
|
||||||
|
default=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('--chunk-size',
|
||||||
|
help='Size in bytes; defaults to 128 MB.',
|
||||||
|
dest='chunk_size',
|
||||||
|
type=int,
|
||||||
|
default=128 * 1024 * 1024,
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('--chunk-dir',
|
||||||
|
help='Chunks are saved in this directory. '
|
||||||
|
'Defaults to parent directory of CHILD.',
|
||||||
|
dest='chunk_dir',
|
||||||
|
type=Path,
|
||||||
|
metavar='CHUNK_DIR',
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('--target-chunk-dir',
|
||||||
|
help='Chunks are saved in this directory on SSH_TARGET. '
|
||||||
|
'Must be an absolute path. '
|
||||||
|
'Defaults to parent directory of TARGET_PATH.',
|
||||||
|
dest='target_chunk_dir',
|
||||||
|
type=Path,
|
||||||
|
metavar='TARGET_CHUNK_DIR',
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('child',
|
||||||
|
help='Path to child subvolume. Forwarded to btrfs-send.',
|
||||||
|
type=Path,
|
||||||
|
metavar='CHILD_SUBVOLUME'
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('ssh_target',
|
||||||
|
help='Hostname of target computer; as configured in ~/.ssh/config.',
|
||||||
|
metavar='SSH_TARGET'
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('target_path',
|
||||||
|
help='Absolute path where the subvolume will be created on SSH_TARGET.',
|
||||||
|
type=Path,
|
||||||
|
metavar='TARGET_PATH'
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Make all paths absolute. Set default values.
|
||||||
|
args.child = args.child.absolute()
|
||||||
|
if not args.target_path.is_absolute():
|
||||||
|
raise ValueError('TARGET_PATH must be absolute')
|
||||||
|
#
|
||||||
|
if args.chunk_dir:
|
||||||
|
args.chunk_dir = args.chunk_dir.absolute()
|
||||||
|
else: # Default value
|
||||||
|
args.chunk_dir = args.child.absolute().parent
|
||||||
|
if args.target_chunk_dir:
|
||||||
|
if not args.target_chunk_dir.is_absolute():
|
||||||
|
raise ValueError('TARGET_CHUNK_DIR must be absolute')
|
||||||
|
else: # Default value
|
||||||
|
args.target_chunk_dir = args.target_path.parent
|
||||||
|
#
|
||||||
|
if args.parent:
|
||||||
|
args.parent = args.parent.absolute()
|
||||||
|
|
||||||
|
return args
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
17
src/p1st/unix_socket_output.py
Normal file
17
src/p1st/unix_socket_output.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
import socket
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def write_message(socket_file: Path,
|
||||||
|
message: bytes):
|
||||||
|
|
||||||
|
# INSPIRATION: https://pymotw.com/3/socket/uds.html
|
||||||
|
|
||||||
|
print(f'Connecting to socket {socket_file}')
|
||||||
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
sock.connect(str(socket_file))
|
||||||
|
try:
|
||||||
|
sock.sendall(message)
|
||||||
|
finally:
|
||||||
|
print(f'Closing socket {socket_file}')
|
||||||
|
sock.close()
|
Loading…
Reference in New Issue
Block a user