From 3fed1b44909f9f6e32758a76b3e694bec90a01fc Mon Sep 17 00:00:00 2001 From: Daniel Langbein Date: Thu, 12 Jan 2023 21:37:39 +0100 Subject: [PATCH] delete processed chunks --- src/p1st/btrfs_send_chunks_active.py | 2 ++ src/p1st/btrfs_send_chunks_passive.py | 3 +++ src/p1st/exec_consume_chunks.py | 6 +++++- src/p1st/exec_produce_chunks.py | 1 + src/p1st/test.py | 8 ++++++-- 5 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/p1st/btrfs_send_chunks_active.py b/src/p1st/btrfs_send_chunks_active.py index 64f60c8..8ec26c8 100644 --- a/src/p1st/btrfs_send_chunks_active.py +++ b/src/p1st/btrfs_send_chunks_active.py @@ -49,6 +49,8 @@ def main(): f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] repeat_until_successful(rsync_cmd, usr_confirmation_socket) + # Delete local chunk after it has been transferred. + source_chunk_path.unlink(missing_ok=False) repeat_until_successful(inform_cmd, usr_confirmation_socket) returncode = execute_produce_chunks( diff --git a/src/p1st/btrfs_send_chunks_passive.py b/src/p1st/btrfs_send_chunks_passive.py index 2807c5e..8ddae92 100644 --- a/src/p1st/btrfs_send_chunks_passive.py +++ b/src/p1st/btrfs_send_chunks_passive.py @@ -49,6 +49,9 @@ def main(): # -> Receiving side informs us after the chunk has been transferred. _command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value]) + # Delete local chunk after it has been transferred. + source_chunk_path.unlink(missing_ok=False) + # Close in_socket in_sock.close() in_socket.unlink(missing_ok=False) diff --git a/src/p1st/exec_consume_chunks.py b/src/p1st/exec_consume_chunks.py index dab959b..a63fcff 100644 --- a/src/p1st/exec_consume_chunks.py +++ b/src/p1st/exec_consume_chunks.py @@ -16,8 +16,10 @@ def execute_consume_chunks(command: list[str], handle_chunks: Callable[[Queue.put], None], ) -> int: """ + Local chunks are deleted after they are fed to stdin of subprocess. + :param command: - :param handle_chunks: Has one parameter, `queue_put`. After a chunks is saved, this method must call queue_put.(chunk_path, last_chunk). + :param handle_chunks: Has one parameter, `queue_put`. `handle_chunks` must call queue_put.(chunk_path, last_chunk) for each saved chunk. :return: """ process = subprocess.Popen( @@ -64,8 +66,10 @@ def _stdin_worker(queue_get: Queue.get, transferred_bytes = 0 while True: + chunk_path: Path chunk_path, last_chunk = queue_get() chunk = _read_chunk(chunk_path) + chunk_path.unlink(missing_ok=False) binary_stdin.write(chunk) # binary_stdin.flush() # TODO: is this required? diff --git a/src/p1st/exec_produce_chunks.py b/src/p1st/exec_produce_chunks.py index 33b0c4d..0b0b080 100644 --- a/src/p1st/exec_produce_chunks.py +++ b/src/p1st/exec_produce_chunks.py @@ -20,6 +20,7 @@ def execute_produce_chunks(command: list[str], The location of the chunks is determined by `get_chunk_path`. A separate thread calls `handle_chunk` once a new chunk was saved. + It is the duty of `handle_chunk` to delete processed chunks from disk. Stderr of the subprocess is printed to sys.stderr. diff --git a/src/p1st/test.py b/src/p1st/test.py index 416563c..818bdf8 100644 --- a/src/p1st/test.py +++ b/src/p1st/test.py @@ -153,14 +153,18 @@ def test13(): def test14(): + chunk_size = 256 * 1024 * 1024 # 256MB + ssh_source = 'rootnas' source_path = '/mnt/data/snap/cloud.privacy1st.de/20221015T1432_u' source_chunk_dir = '/mnt/data/btrfs-transfer-chunks' target_path = '/mnt/backup/snap/cloud.privacy1st.de/20221015T1432_u' - print(f'nas: sudo btrfs-send-chunks-passive --chunk-dir={source_chunk_dir} {source_path}') - print(f'odroid: sudo btrfs-receive-chunks-active {ssh_source} {source_chunk_dir} {target_path}') + print(f'nas: [screen] ' + + f'sudo btrfs-send-chunks-passive --chunk-size={chunk_size} --chunk-dir={source_chunk_dir} {source_path}') + print(f'odroid: [screen] ' + + f'sudo btrfs-receive-chunks-active {ssh_source} {source_chunk_dir} {target_path}') def _init(test_number: int):