mirror of
https://codeberg.org/privacy1st/subprocess-util
synced 2024-12-22 22:06:05 +01:00
rename to exec_produce_chunks; add exec_consume_chunks; add btrfs_receive_chunks_v2
This commit is contained in:
parent
7f6cd90c18
commit
ddd5b9a1fe
@ -32,6 +32,6 @@ where = src
|
|||||||
[options.entry_points]
|
[options.entry_points]
|
||||||
; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html
|
; https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html
|
||||||
console_scripts =
|
console_scripts =
|
||||||
btrfs-send-chunks = p1st.btrfs_send_chunks:main
|
|
||||||
btrfs-send-chunks-v2 = p1st.btrfs_send_chunks_v2:main
|
btrfs-send-chunks-v2 = p1st.btrfs_send_chunks_v2:main
|
||||||
btrfs-receive-chunks = p1st.btrfs_receive_chunks:main
|
btrfs-receive-chunks = p1st.btrfs_receive_chunks:main
|
||||||
|
btrfs-receive-chunks-v2 = p1st.btrfs_receive_chunks_v2:main
|
||||||
|
86
src/p1st/btrfs_receive_chunks_v2.py
Normal file
86
src/p1st/btrfs_receive_chunks_v2.py
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import argparse
|
||||||
|
import socket
|
||||||
|
from pathlib import Path
|
||||||
|
from queue import Queue
|
||||||
|
|
||||||
|
from p1st.exec_consume_chunks import execute_consume_chunks
|
||||||
|
from p1st.unix_sock_input import accept_loop_until_message
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
args = parse_args()
|
||||||
|
|
||||||
|
command = ['btrfs', 'receive', str(args.target_path.parent)]
|
||||||
|
|
||||||
|
def handle_chunks(queue_put: Queue.put):
|
||||||
|
chunk_dir: Path = args.chunk_dir
|
||||||
|
chunk_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
socket_file: Path = args.target_path.parent.joinpath(f'{args.target_path.name}.SOCKET')
|
||||||
|
socket_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
print(f'Listening on socket {socket_file}')
|
||||||
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
sock.bind(str(socket_file))
|
||||||
|
sock.listen(1)
|
||||||
|
|
||||||
|
chunk_no = 1
|
||||||
|
messages = [b'OK\n', b'EOF\n']
|
||||||
|
while True:
|
||||||
|
msg = accept_loop_until_message(sock, messages)
|
||||||
|
|
||||||
|
last_chunk = msg == b'EOF\n'
|
||||||
|
chunk_path = chunk_dir.joinpath(f'{args.target_path.name}.CHUNK.{chunk_no}')
|
||||||
|
queue_put((chunk_path, last_chunk))
|
||||||
|
|
||||||
|
if last_chunk:
|
||||||
|
break
|
||||||
|
chunk_no += 1
|
||||||
|
|
||||||
|
print(f'Closing socket {socket_file}')
|
||||||
|
sock.close()
|
||||||
|
socket_file.unlink(missing_ok=False)
|
||||||
|
|
||||||
|
returncode = execute_consume_chunks(
|
||||||
|
command=command,
|
||||||
|
handle_chunks=handle_chunks,
|
||||||
|
)
|
||||||
|
exit(returncode)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
parser = argparse.ArgumentParser(prog='btrfs-receive-chunks')
|
||||||
|
|
||||||
|
parser.add_argument('--chunk-dir',
|
||||||
|
help='Chunks are saved in this directory. '
|
||||||
|
'Defaults to parent directory of SUBVOLUME.',
|
||||||
|
dest='chunk_dir',
|
||||||
|
type=Path,
|
||||||
|
metavar='CHUNK_DIR',
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('target_path',
|
||||||
|
help='Path where the subvolume will be created. '
|
||||||
|
'The last component of the path '
|
||||||
|
'must be equal to the name of the subvolume on the sending side.',
|
||||||
|
type=Path,
|
||||||
|
metavar='SUBVOLUME'
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Make all paths absolute. Set default values.
|
||||||
|
args.target_path = args.target_path.absolute()
|
||||||
|
if args.chunk_dir:
|
||||||
|
args.chunk_dir = args.chunk_dir.absolute()
|
||||||
|
else:
|
||||||
|
args.chunk_dir = args.target_path.absolute().parent
|
||||||
|
|
||||||
|
return args
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
@ -1,115 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
import argparse
|
|
||||||
from pathlib import Path
|
|
||||||
import shlex
|
|
||||||
from typing import Callable
|
|
||||||
|
|
||||||
from p1st.exec_print_transfer import execute_print_transfer_chunks
|
|
||||||
from p1st.common import _get_chunk_file, _get_remote_socket, _get_chunk_tmpl
|
|
||||||
from p1st.transfer_inform import transfer_inform
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
args = parse_args()
|
|
||||||
|
|
||||||
command_parts = (
|
|
||||||
['btrfs', 'send'],
|
|
||||||
['-p', str(args.parent)] if args.parent else [],
|
|
||||||
['--compress-data'] if args.compressed_data else [],
|
|
||||||
[str(args.child)]
|
|
||||||
)
|
|
||||||
command = [x for xs in command_parts for x in xs]
|
|
||||||
|
|
||||||
chunk_tmpl = _get_chunk_tmpl(args.child)
|
|
||||||
target_chunk_tmpl = _get_chunk_tmpl(args.target_path)
|
|
||||||
|
|
||||||
execute_print_transfer_chunks(
|
|
||||||
command=command,
|
|
||||||
chunk_file_tmpl=chunk_tmpl,
|
|
||||||
chunk_transfer_fun=chunk_transfer_fun,
|
|
||||||
chunk_transfer_args=(args.ssh_target,
|
|
||||||
args.target_path,
|
|
||||||
_get_chunk_file,
|
|
||||||
target_chunk_tmpl),
|
|
||||||
chunk_size=args.chunk_size,
|
|
||||||
get_chunk_file=_get_chunk_file,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_args():
|
|
||||||
parser = argparse.ArgumentParser(prog='btrfs-send-chunks')
|
|
||||||
|
|
||||||
parser.add_argument('-p',
|
|
||||||
help='Path to parent subvolume; forwarded to btrfs-send.',
|
|
||||||
dest='parent',
|
|
||||||
default=None,
|
|
||||||
type=Path,
|
|
||||||
metavar='PARENT_SUBVOLUME'
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument('--compressed-data',
|
|
||||||
help='Forwarded to btrfs-send.',
|
|
||||||
dest='compressed_data',
|
|
||||||
action='store_true',
|
|
||||||
default=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument('--chunk-size',
|
|
||||||
help='Size in bytes; defaults to 128 MB.',
|
|
||||||
dest='chunk_size',
|
|
||||||
type=int,
|
|
||||||
default=128 * 1024 * 1024,
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument('child',
|
|
||||||
help='Path to child subvolume. Forwarded to btrfs-send.',
|
|
||||||
type=Path,
|
|
||||||
metavar='CHILD_SUBVOLUME'
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument('ssh_target',
|
|
||||||
help='Hostname of target computer; as configured in ~/.ssh/config.',
|
|
||||||
metavar='SSH_TARGET'
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument('target_path',
|
|
||||||
help='Path where the subvolume will be created on the remote ssh target.',
|
|
||||||
type=Path,
|
|
||||||
metavar='TARGET_PATH'
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
# Make all paths absolute.
|
|
||||||
if args.parent:
|
|
||||||
args.parent = args.parent.absolute()
|
|
||||||
args.child = args.child.absolute()
|
|
||||||
if args.target_path:
|
|
||||||
args.target_path = args.target_path.absolute()
|
|
||||||
|
|
||||||
return args
|
|
||||||
|
|
||||||
|
|
||||||
def chunk_transfer_fun(chunk_file: Path, ct: int, eof: bool,
|
|
||||||
ssh_target: str,
|
|
||||||
target_path: Path,
|
|
||||||
get_chunk_file: Callable[[Path, int], Path],
|
|
||||||
target_chunk_tmpl: Path,
|
|
||||||
):
|
|
||||||
target_chunk = get_chunk_file(target_chunk_tmpl, ct)
|
|
||||||
rsync_cmd = ['rsync', str(chunk_file), f'{ssh_target}:{str(target_chunk)}']
|
|
||||||
|
|
||||||
message = 'EOF' if eof else 'OK'
|
|
||||||
target_socket = _get_remote_socket(target_path)
|
|
||||||
inform_cmd = ['ssh', ssh_target, f'echo {message} | nc -U {shlex.quote(str(target_socket))}']
|
|
||||||
|
|
||||||
transfer_inform(
|
|
||||||
rsync_cmd=rsync_cmd,
|
|
||||||
inform_cmd=inform_cmd,
|
|
||||||
user_input_file=chunk_file.parent.joinpath(f'{chunk_file.name}.SOCKET'),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
@ -4,7 +4,7 @@ import argparse
|
|||||||
import shlex
|
import shlex
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from p1st.exec_print_chunk import execute_print_chunk
|
from p1st.exec_produce_chunks import execute_produce_chunks
|
||||||
from p1st.repeat import repeat_until_successful
|
from p1st.repeat import repeat_until_successful
|
||||||
|
|
||||||
|
|
||||||
@ -42,12 +42,13 @@ def main():
|
|||||||
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
|
repeat_until_successful(rsync_cmd, usr_confirmation_socket)
|
||||||
repeat_until_successful(inform_cmd, usr_confirmation_socket)
|
repeat_until_successful(inform_cmd, usr_confirmation_socket)
|
||||||
|
|
||||||
execute_print_chunk(
|
returncode = execute_produce_chunks(
|
||||||
command=command,
|
command=command,
|
||||||
get_chunk_path=get_chunk_path,
|
get_chunk_path=get_chunk_path,
|
||||||
handle_chunk=handle_chunk,
|
handle_chunk=handle_chunk,
|
||||||
chunk_size=args.chunk_size,
|
chunk_size=args.chunk_size,
|
||||||
)
|
)
|
||||||
|
exit(returncode)
|
||||||
|
|
||||||
|
|
||||||
def parse_args():
|
def parse_args():
|
||||||
|
92
src/p1st/exec_consume_chunks.py
Normal file
92
src/p1st/exec_consume_chunks.py
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
from pathlib import Path
|
||||||
|
from queue import Queue
|
||||||
|
from typing import Callable, IO, AnyStr
|
||||||
|
|
||||||
|
|
||||||
|
def execute_consume_chunks(command: list[str],
|
||||||
|
handle_chunks: Callable[[Queue.put], None],
|
||||||
|
) -> int:
|
||||||
|
"""
|
||||||
|
:param command:
|
||||||
|
:param handle_chunks: Has one parameter, `queue_put`. After a chunks is saved, this method must call queue_put.(chunk_path, last_chunk).
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
process = subprocess.Popen(
|
||||||
|
command,
|
||||||
|
stdin=subprocess.PIPE,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
close_fds=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
q = Queue(maxsize=2)
|
||||||
|
|
||||||
|
threads = [
|
||||||
|
threading.Thread(
|
||||||
|
target=_stdin_worker,
|
||||||
|
args=(q.get,
|
||||||
|
process.stdin,
|
||||||
|
)),
|
||||||
|
threading.Thread(
|
||||||
|
target=handle_chunks,
|
||||||
|
args=(q.put,
|
||||||
|
)),
|
||||||
|
threading.Thread(
|
||||||
|
target=_stderr_worker,
|
||||||
|
args=(process.stderr,
|
||||||
|
)),
|
||||||
|
]
|
||||||
|
|
||||||
|
for t in threads:
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
returncode: int = process.wait()
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
return returncode
|
||||||
|
|
||||||
|
|
||||||
|
def _stdin_worker(queue_get: Queue.get,
|
||||||
|
binary_stdin: IO[AnyStr],
|
||||||
|
):
|
||||||
|
while True:
|
||||||
|
chunk_path, last_chunk = queue_get()
|
||||||
|
chunk = _read_chunk(chunk_path)
|
||||||
|
binary_stdin.write(chunk)
|
||||||
|
# binary_stdin.flush() # TODO: is this required?
|
||||||
|
if last_chunk:
|
||||||
|
break
|
||||||
|
|
||||||
|
binary_stdin.flush()
|
||||||
|
|
||||||
|
# TODO: Has this any effect? On stdin probably yes!
|
||||||
|
binary_stdin.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _read_chunk(chunk_path: Path) -> bytes:
|
||||||
|
"""
|
||||||
|
Reads a chunk from the given path.
|
||||||
|
"""
|
||||||
|
print(f'Reading chunk {chunk_path}')
|
||||||
|
|
||||||
|
# Fails if file does not exist.
|
||||||
|
return chunk_path.read_bytes()
|
||||||
|
|
||||||
|
|
||||||
|
def _stderr_worker(binary_stderr: IO[AnyStr]):
|
||||||
|
"""
|
||||||
|
Prints stderr of subprocess to sys.stderr.
|
||||||
|
"""
|
||||||
|
b: bytes
|
||||||
|
for b in binary_stderr:
|
||||||
|
sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}')
|
||||||
|
|
||||||
|
# TODO: Has this any effect?
|
||||||
|
# binary_stderr.close()
|
@ -8,11 +8,27 @@ from queue import Queue
|
|||||||
from typing import Callable, IO, AnyStr
|
from typing import Callable, IO, AnyStr
|
||||||
|
|
||||||
|
|
||||||
def execute_print_chunk(command: list[str],
|
def execute_produce_chunks(command: list[str],
|
||||||
get_chunk_path: Callable[[int], Path],
|
get_chunk_path: Callable[[int], Path],
|
||||||
handle_chunk: Callable[[int, bool], None],
|
handle_chunk: Callable[[int, bool], None],
|
||||||
chunk_size: int = 512, # TODO increase
|
chunk_size: int = 1024 * 1024,
|
||||||
) -> int:
|
) -> int:
|
||||||
|
"""
|
||||||
|
Executes the given command in a subprocess.
|
||||||
|
|
||||||
|
Stdout of the subprocess is saved in chunks.
|
||||||
|
The location of the chunks is determined by `get_chunk_path`.
|
||||||
|
|
||||||
|
A separate thread calls `handle_chunk` once a new chunk was saved.
|
||||||
|
|
||||||
|
Stderr of the subprocess is printed to sys.stderr.
|
||||||
|
|
||||||
|
:param command:
|
||||||
|
:param get_chunk_path:
|
||||||
|
:param handle_chunk:
|
||||||
|
:param chunk_size:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
process = subprocess.Popen(
|
process = subprocess.Popen(
|
||||||
command,
|
command,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
@ -112,14 +128,13 @@ def _chunk_worker(queue_get: Queue.get,
|
|||||||
chunk_no, last_chunk = queue_get()
|
chunk_no, last_chunk = queue_get()
|
||||||
handle_chunk(chunk_no, last_chunk)
|
handle_chunk(chunk_no, last_chunk)
|
||||||
if last_chunk:
|
if last_chunk:
|
||||||
break
|
return
|
||||||
|
|
||||||
|
|
||||||
def _stderr_worker(binary_stderr: IO[AnyStr]):
|
def _stderr_worker(binary_stderr: IO[AnyStr]):
|
||||||
"""
|
"""
|
||||||
Prints stderr of subprocess to sys.stderr.
|
Prints stderr of subprocess to sys.stderr.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
b: bytes
|
b: bytes
|
||||||
for b in binary_stderr:
|
for b in binary_stderr:
|
||||||
sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}')
|
sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}')
|
@ -8,9 +8,9 @@ from pathlib import Path
|
|||||||
|
|
||||||
from p1st.exec_capture import execute_capture
|
from p1st.exec_capture import execute_capture
|
||||||
from p1st.exec_print_capture import execute_print_capture
|
from p1st.exec_print_capture import execute_print_capture
|
||||||
from p1st.exec_print_chunk import execute_print_chunk
|
|
||||||
from p1st.exec_print_receive import execute_print_receive_chunks
|
from p1st.exec_print_receive import execute_print_receive_chunks
|
||||||
from p1st.exec_print_transfer import execute_print_transfer_chunks
|
from p1st.exec_print_transfer import execute_print_transfer_chunks
|
||||||
|
from p1st.exec_produce_chunks import execute_produce_chunks
|
||||||
from p1st.transfer_inform import transfer_inform
|
from p1st.transfer_inform import transfer_inform
|
||||||
|
|
||||||
|
|
||||||
@ -24,7 +24,8 @@ def test():
|
|||||||
# test8()
|
# test8()
|
||||||
# test9()
|
# test9()
|
||||||
# test10()
|
# test10()
|
||||||
test11()
|
# test11()
|
||||||
|
test12()
|
||||||
|
|
||||||
|
|
||||||
def test1():
|
def test1():
|
||||||
@ -232,7 +233,7 @@ def test10():
|
|||||||
# Delete chunk.
|
# Delete chunk.
|
||||||
chunk_path.unlink(missing_ok=False)
|
chunk_path.unlink(missing_ok=False)
|
||||||
|
|
||||||
execute_print_chunk(
|
execute_produce_chunks(
|
||||||
command=['cat', str(source_file)],
|
command=['cat', str(source_file)],
|
||||||
get_chunk_path=get_chunk_path,
|
get_chunk_path=get_chunk_path,
|
||||||
handle_chunk=handle_chunk,
|
handle_chunk=handle_chunk,
|
||||||
@ -254,7 +255,7 @@ def test11():
|
|||||||
print(f'\tsudo mkdir {target_dir}')
|
print(f'\tsudo mkdir {target_dir}')
|
||||||
print(f'\tcd {repo_name} && make && sudo make clean && cd ..')
|
print(f'\tcd {repo_name} && make && sudo make clean && cd ..')
|
||||||
print()
|
print()
|
||||||
print(f'\tsudo btrfs-receive-chunks {target_path}')
|
print(f'\tsudo btrfs-receive-chunks-v2 {target_path}')
|
||||||
print()
|
print()
|
||||||
|
|
||||||
print(f'=== In another shell, connect with "ssh odroid" ===')
|
print(f'=== In another shell, connect with "ssh odroid" ===')
|
||||||
@ -267,6 +268,15 @@ def test11():
|
|||||||
print(f'\tsudo btrfs-send-chunks-v2 {child_path} {ssh_target} {target_path}')
|
print(f'\tsudo btrfs-send-chunks-v2 {child_path} {ssh_target} {target_path}')
|
||||||
|
|
||||||
|
|
||||||
|
def test12():
|
||||||
|
child_path = '/mnt/backup/snap/blogger.privacy1st.de/20230104T2255'
|
||||||
|
ssh_target = 'rootnas'
|
||||||
|
target_path = '/mnt/data/test/blogger.privacy1st.de/20230104T2255'
|
||||||
|
target_chunk_path = '/mnt/data/test/chunks'
|
||||||
|
print(f'sudo btrfs-receive-chunks-v2 --chunk-dir={target_chunk_path} {target_path}')
|
||||||
|
print(f'sudo btrfs-send-chunks-v2 --chunk-dir=/mnt/backup/test/chunks --target-chunk-dir={target_chunk_path} {child_path} {ssh_target} {target_path}')
|
||||||
|
|
||||||
|
|
||||||
def _init(test_number: int):
|
def _init(test_number: int):
|
||||||
print(f"TEST {test_number}")
|
print(f"TEST {test_number}")
|
||||||
test_dir = Path('test')
|
test_dir = Path('test')
|
||||||
|
Loading…
Reference in New Issue
Block a user