delete processed chunks

This commit is contained in:
Daniel Langbein 2023-01-12 21:37:39 +01:00
parent 15bbb728e8
commit 3fed1b4490
5 changed files with 17 additions and 3 deletions

View File

@ -49,6 +49,8 @@ def main():
f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] f'echo {message} | nc -U {shlex.quote(str(target_socket))}']
repeat_until_successful(rsync_cmd, usr_confirmation_socket) repeat_until_successful(rsync_cmd, usr_confirmation_socket)
# Delete local chunk after it has been transferred.
source_chunk_path.unlink(missing_ok=False)
repeat_until_successful(inform_cmd, usr_confirmation_socket) repeat_until_successful(inform_cmd, usr_confirmation_socket)
returncode = execute_produce_chunks( returncode = execute_produce_chunks(

View File

@ -49,6 +49,9 @@ def main():
# -> Receiving side informs us after the chunk has been transferred. # -> Receiving side informs us after the chunk has been transferred.
_command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value]) _command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value])
# Delete local chunk after it has been transferred.
source_chunk_path.unlink(missing_ok=False)
# Close in_socket # Close in_socket
in_sock.close() in_sock.close()
in_socket.unlink(missing_ok=False) in_socket.unlink(missing_ok=False)

View File

@ -16,8 +16,10 @@ def execute_consume_chunks(command: list[str],
handle_chunks: Callable[[Queue.put], None], handle_chunks: Callable[[Queue.put], None],
) -> int: ) -> int:
""" """
Local chunks are deleted after they are fed to stdin of subprocess.
:param command: :param command:
:param handle_chunks: Has one parameter, `queue_put`. After a chunks is saved, this method must call queue_put.(chunk_path, last_chunk). :param handle_chunks: Has one parameter, `queue_put`. `handle_chunks` must call queue_put.(chunk_path, last_chunk) for each saved chunk.
:return: :return:
""" """
process = subprocess.Popen( process = subprocess.Popen(
@ -64,8 +66,10 @@ def _stdin_worker(queue_get: Queue.get,
transferred_bytes = 0 transferred_bytes = 0
while True: while True:
chunk_path: Path
chunk_path, last_chunk = queue_get() chunk_path, last_chunk = queue_get()
chunk = _read_chunk(chunk_path) chunk = _read_chunk(chunk_path)
chunk_path.unlink(missing_ok=False)
binary_stdin.write(chunk) binary_stdin.write(chunk)
# binary_stdin.flush() # TODO: is this required? # binary_stdin.flush() # TODO: is this required?

View File

@ -20,6 +20,7 @@ def execute_produce_chunks(command: list[str],
The location of the chunks is determined by `get_chunk_path`. The location of the chunks is determined by `get_chunk_path`.
A separate thread calls `handle_chunk` once a new chunk was saved. A separate thread calls `handle_chunk` once a new chunk was saved.
It is the duty of `handle_chunk` to delete processed chunks from disk.
Stderr of the subprocess is printed to sys.stderr. Stderr of the subprocess is printed to sys.stderr.

View File

@ -153,14 +153,18 @@ def test13():
def test14(): def test14():
chunk_size = 256 * 1024 * 1024 # 256MB
ssh_source = 'rootnas' ssh_source = 'rootnas'
source_path = '/mnt/data/snap/cloud.privacy1st.de/20221015T1432_u' source_path = '/mnt/data/snap/cloud.privacy1st.de/20221015T1432_u'
source_chunk_dir = '/mnt/data/btrfs-transfer-chunks' source_chunk_dir = '/mnt/data/btrfs-transfer-chunks'
target_path = '/mnt/backup/snap/cloud.privacy1st.de/20221015T1432_u' target_path = '/mnt/backup/snap/cloud.privacy1st.de/20221015T1432_u'
print(f'nas: sudo btrfs-send-chunks-passive --chunk-dir={source_chunk_dir} {source_path}') print(f'nas: [screen] ' +
print(f'odroid: sudo btrfs-receive-chunks-active {ssh_source} {source_chunk_dir} {target_path}') f'sudo btrfs-send-chunks-passive --chunk-size={chunk_size} --chunk-dir={source_chunk_dir} {source_path}')
print(f'odroid: [screen] ' +
f'sudo btrfs-receive-chunks-active {ssh_source} {source_chunk_dir} {target_path}')
def _init(test_number: int): def _init(test_number: int):