This commit is contained in:
Daniel Langbein 2023-03-19 16:33:58 +01:00
parent 74366a3a1b
commit a0e2db42e6
3 changed files with 16 additions and 4 deletions

View File

@ -0,0 +1,10 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from enum import Enum
class NextChunkEnum(Enum):
OK_BINARY = b'OK\n'
OK_STR = 'OK\n'
EOF_BINARY = b'EOF\n'
EOF_STR = 'EOF\n'

View File

@ -2,6 +2,7 @@ import socket
from pathlib import Path from pathlib import Path
from queue import Queue from queue import Queue
from subprocess_util.chunked_transfer.active_send.common import NextChunkEnum
from subprocess_util.exec_consume_chunks import execute_consume_chunks from subprocess_util.exec_consume_chunks import execute_consume_chunks
from subprocess_util.unix_sock_input import accept_loop_until_message from subprocess_util.unix_sock_input import accept_loop_until_message
@ -23,11 +24,11 @@ def exec_receive_chunks_passive(
sock.listen(1) sock.listen(1)
chunk_no = 1 chunk_no = 1
messages = [b'OK\n', b'EOF\n'] messages: list[bytes] = [NextChunkEnum.OK_BINARY.value, NextChunkEnum.EOF_BINARY.value]
while True: while True:
msg = accept_loop_until_message(sock, messages) msg = accept_loop_until_message(sock, messages)
last_chunk = msg == b'EOF\n' last_chunk = msg == NextChunkEnum.EOF_BINARY.value
target_chunk_path = get_target_chunk_path(chunk_no) target_chunk_path = get_target_chunk_path(chunk_no)
queue_put((target_chunk_path, last_chunk)) queue_put((target_chunk_path, last_chunk))

View File

@ -1,6 +1,7 @@
import shlex import shlex
from pathlib import Path from pathlib import Path
from subprocess_util.chunked_transfer.active_send.common import NextChunkEnum
from subprocess_util.exec_produce_chunks import execute_produce_chunks from subprocess_util.exec_produce_chunks import execute_produce_chunks
from subprocess_util.repeat import repeat_until_successful from subprocess_util.repeat import repeat_until_successful
@ -26,14 +27,14 @@ def exec_send_chunks_active(
usr_confirmation_socket = source_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET') usr_confirmation_socket = source_chunk_path.parent.joinpath(f'{chunk_no}.SOCKET')
target_socket = target_chunk_dir.joinpath('SOCKET') target_socket = target_chunk_dir.joinpath('SOCKET')
message = 'EOF' if last_chunk else 'OK' message: str = NextChunkEnum.EOF_STR.value if last_chunk else NextChunkEnum.OK_STR.value
rsync_cmd = ['rsync', rsync_cmd = ['rsync',
str(source_chunk_path), str(source_chunk_path),
f'{ssh_target}:{str(target_chunk_path)}'] f'{ssh_target}:{str(target_chunk_path)}']
inform_cmd = ['ssh', inform_cmd = ['ssh',
ssh_target, ssh_target,
f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] f"printf '{message}' | nc -U {shlex.quote(str(target_socket))}"]
repeat_until_successful(rsync_cmd, usr_confirmation_socket) repeat_until_successful(rsync_cmd, usr_confirmation_socket)
# Delete local chunk after it has been transferred. # Delete local chunk after it has been transferred.