This commit is contained in:
Daniel Langbein 2023-03-19 16:13:54 +01:00
parent 780fe7f1d1
commit 2b93336fbd
2 changed files with 31 additions and 19 deletions

View File

@ -19,27 +19,33 @@ def exec_receive_chunks_active(
def get_target_chunk_path(chunk_no: int) -> Path: def get_target_chunk_path(chunk_no: int) -> Path:
return target_chunk_dir.joinpath(f'{chunk_no}') return target_chunk_dir.joinpath(f'{chunk_no}')
def get_source_created_path(chunk_no: int) -> Path:
return source_chunk_dir.joinpath(f'{chunk_no}.COMPLETE')
def get_source_transferred_path() -> Path:
return source_chunk_dir.joinpath('SOCKET')
def handle_chunks(queue_put: Queue.put): def handle_chunks(queue_put: Queue.put):
# TODO: get starting value of chunk_no as argument # TODO: get starting value of chunk_no as argument
chunk_no = 1 chunk_no = 1
while True: while True:
target_chunk_path = get_target_chunk_path(chunk_no) target_chunk_path = get_target_chunk_path(chunk_no)
source_chunk_path = get_source_chunk_path(chunk_no) source_chunk_path = get_source_chunk_path(chunk_no)
source_socket = source_chunk_dir.joinpath('SOCKET') chunk_transferred_sock_path = get_source_transferred_path()
usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET') usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET')
inform_path = source_chunk_path.joinpath(f'{chunk_no}.COMPLETE') chunk_created_path = get_source_created_path()
inform_cmd = ['ssh', chunk_created_cmd = ['ssh',
ssh_source, ssh_source,
f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}'] f'cat {shlex.quote(str(chunk_created_path))} && rm {shlex.quote(str(chunk_created_path))}']
messages = [Message1.OK.value, Message1.EOF.value] messages = [Message1.OK.value, Message1.EOF.value]
rsync_cmd = ['rsync', rsync_cmd = ['rsync',
f'{ssh_source}:{source_chunk_path}', f'{ssh_source}:{source_chunk_path}',
f'{target_chunk_path}'] f'{target_chunk_path}']
inform2_cmd = ['ssh', chunk_transferred_cmd = ['ssh',
ssh_source, ssh_source,
f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(source_socket))}"] f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(chunk_transferred_sock_path))}"]
# Wait until next chunk can be transferred from sending side. # Wait until next chunk can be transferred from sending side.
# #
@ -48,7 +54,7 @@ def exec_receive_chunks_active(
# We will retry 3 times, each time after 5 seconds sleep. # We will retry 3 times, each time after 5 seconds sleep.
# #
# If all 4 `cat ...` attempts failed, we wait until the user has inspected the problem. # If all 4 `cat ...` attempts failed, we wait until the user has inspected the problem.
msg = repeat_until_successful(inform_cmd, usr_confirmation_socket, retries=3, retry_delay_seconds=5) msg = repeat_until_successful(chunk_created_cmd, usr_confirmation_socket, retries=3, retry_delay_seconds=5)
if msg not in messages: if msg not in messages:
raise ValueError(f'Invalid message: {msg}') raise ValueError(f'Invalid message: {msg}')
last_chunk = msg == Message1.EOF.value last_chunk = msg == Message1.EOF.value
@ -60,7 +66,7 @@ def exec_receive_chunks_active(
queue_put((target_chunk_path, last_chunk)) queue_put((target_chunk_path, last_chunk))
# Inform sending side about successful transfer. # Inform sending side about successful transfer.
repeat_until_successful(inform2_cmd, usr_confirmation_socket) repeat_until_successful(chunk_transferred_cmd, usr_confirmation_socket)
if last_chunk: if last_chunk:
break break

View File

@ -14,30 +14,36 @@ def exec_send_chunks_passive(
def get_source_chunk_path(chunk_no: int) -> Path: def get_source_chunk_path(chunk_no: int) -> Path:
return source_chunk_dir.joinpath(f'{chunk_no}') return source_chunk_dir.joinpath(f'{chunk_no}')
def get_source_created_path(chunk_no: int) -> Path:
return source_chunk_dir.joinpath(f'{chunk_no}.COMPLETE')
def get_source_transferred_path() -> Path:
return source_chunk_dir.joinpath('SOCKET')
def handle_chunk(chunk_no: int, last_chunk: bool): def handle_chunk(chunk_no: int, last_chunk: bool):
source_chunk_path = get_source_chunk_path(chunk_no) source_chunk_path = get_source_chunk_path(chunk_no)
print(f'Handling chunk {source_chunk_path}') print(f'Handling chunk {source_chunk_path}')
# Create in_socket # Create in_socket
source_socket = source_chunk_dir.joinpath('SOCKET') chunk_transferred_sock_path = get_source_transferred_path()
in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) chunk_transferred_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
in_sock.bind(str(source_socket)) chunk_transferred_sock.bind(str(chunk_transferred_sock_path))
in_sock.listen(1) chunk_transferred_sock.listen(1)
# Inform receiving side, that the next chunk can be transferred. # Inform receiving side, that the next chunk can be transferred.
inform_path = source_chunk_path.joinpath(f'{chunk_no}.COMPLETE') chunk_created_path = get_source_created_path(chunk_no)
inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value) chunk_created_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value)
# Read from in_socket # Read from in_socket
# -> Receiving side informs us after the chunk has been transferred. # -> Receiving side informs us after the chunk has been transferred.
_command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value]) _command = accept_loop_until_message(chunk_transferred_sock, [Message2.OK_BINARY.value])
# Delete local chunk after it has been transferred. # Delete local chunk after it has been transferred.
source_chunk_path.unlink(missing_ok=False) source_chunk_path.unlink(missing_ok=False)
# Close in_socket # Close in_socket
in_sock.close() chunk_transferred_sock.close()
source_socket.unlink(missing_ok=False) chunk_transferred_sock_path.unlink(missing_ok=False)
returncode = execute_produce_chunks( returncode = execute_produce_chunks(
command=command, command=command,