diff --git a/src/subprocess_util/exec_receive_chunks_active.py b/src/subprocess_util/exec_receive_chunks_active.py index d981f96..cdd14d6 100644 --- a/src/subprocess_util/exec_receive_chunks_active.py +++ b/src/subprocess_util/exec_receive_chunks_active.py @@ -19,27 +19,33 @@ def exec_receive_chunks_active( def get_target_chunk_path(chunk_no: int) -> Path: return target_chunk_dir.joinpath(f'{chunk_no}') + def get_source_created_path(chunk_no: int) -> Path: + return source_chunk_dir.joinpath(f'{chunk_no}.COMPLETE') + + def get_source_transferred_path() -> Path: + return source_chunk_dir.joinpath('SOCKET') + def handle_chunks(queue_put: Queue.put): # TODO: get starting value of chunk_no as argument chunk_no = 1 while True: target_chunk_path = get_target_chunk_path(chunk_no) source_chunk_path = get_source_chunk_path(chunk_no) - source_socket = source_chunk_dir.joinpath('SOCKET') + chunk_transferred_sock_path = get_source_transferred_path() usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET') - inform_path = source_chunk_path.joinpath(f'{chunk_no}.COMPLETE') - inform_cmd = ['ssh', - ssh_source, - f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}'] + chunk_created_path = get_source_created_path() + chunk_created_cmd = ['ssh', + ssh_source, + f'cat {shlex.quote(str(chunk_created_path))} && rm {shlex.quote(str(chunk_created_path))}'] messages = [Message1.OK.value, Message1.EOF.value] rsync_cmd = ['rsync', f'{ssh_source}:{source_chunk_path}', f'{target_chunk_path}'] - inform2_cmd = ['ssh', - ssh_source, - f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(source_socket))}"] + chunk_transferred_cmd = ['ssh', + ssh_source, + f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(chunk_transferred_sock_path))}"] # Wait until next chunk can be transferred from sending side. # @@ -48,7 +54,7 @@ def exec_receive_chunks_active( # We will retry 3 times, each time after 5 seconds sleep. # # If all 4 `cat ...` attempts failed, we wait until the user has inspected the problem. - msg = repeat_until_successful(inform_cmd, usr_confirmation_socket, retries=3, retry_delay_seconds=5) + msg = repeat_until_successful(chunk_created_cmd, usr_confirmation_socket, retries=3, retry_delay_seconds=5) if msg not in messages: raise ValueError(f'Invalid message: {msg}') last_chunk = msg == Message1.EOF.value @@ -60,7 +66,7 @@ def exec_receive_chunks_active( queue_put((target_chunk_path, last_chunk)) # Inform sending side about successful transfer. - repeat_until_successful(inform2_cmd, usr_confirmation_socket) + repeat_until_successful(chunk_transferred_cmd, usr_confirmation_socket) if last_chunk: break diff --git a/src/subprocess_util/exec_send_chunks_passive.py b/src/subprocess_util/exec_send_chunks_passive.py index 1c68e5c..252d83a 100644 --- a/src/subprocess_util/exec_send_chunks_passive.py +++ b/src/subprocess_util/exec_send_chunks_passive.py @@ -14,30 +14,36 @@ def exec_send_chunks_passive( def get_source_chunk_path(chunk_no: int) -> Path: return source_chunk_dir.joinpath(f'{chunk_no}') + def get_source_created_path(chunk_no: int) -> Path: + return source_chunk_dir.joinpath(f'{chunk_no}.COMPLETE') + + def get_source_transferred_path() -> Path: + return source_chunk_dir.joinpath('SOCKET') + def handle_chunk(chunk_no: int, last_chunk: bool): source_chunk_path = get_source_chunk_path(chunk_no) print(f'Handling chunk {source_chunk_path}') # Create in_socket - source_socket = source_chunk_dir.joinpath('SOCKET') - in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - in_sock.bind(str(source_socket)) - in_sock.listen(1) + chunk_transferred_sock_path = get_source_transferred_path() + chunk_transferred_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + chunk_transferred_sock.bind(str(chunk_transferred_sock_path)) + chunk_transferred_sock.listen(1) # Inform receiving side, that the next chunk can be transferred. - inform_path = source_chunk_path.joinpath(f'{chunk_no}.COMPLETE') - inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value) + chunk_created_path = get_source_created_path(chunk_no) + chunk_created_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value) # Read from in_socket # -> Receiving side informs us after the chunk has been transferred. - _command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value]) + _command = accept_loop_until_message(chunk_transferred_sock, [Message2.OK_BINARY.value]) # Delete local chunk after it has been transferred. source_chunk_path.unlink(missing_ok=False) # Close in_socket - in_sock.close() - source_socket.unlink(missing_ok=False) + chunk_transferred_sock.close() + chunk_transferred_sock_path.unlink(missing_ok=False) returncode = execute_produce_chunks( command=command,