This commit is contained in:
Daniel Langbein 2023-01-12 17:39:57 +01:00
parent f3ae420dca
commit 4781c88e3c
3 changed files with 27 additions and 14 deletions

View File

@ -5,7 +5,7 @@ import shlex
from pathlib import Path from pathlib import Path
from queue import Queue from queue import Queue
from p1st.common import Message1, Message2 from p1st.common import Message1, Message2, get_chunk_path, get_inform1_path, get_inform2_path
from p1st.exec_consume_chunks import execute_consume_chunks from p1st.exec_consume_chunks import execute_consume_chunks
from p1st.repeat import repeat_until_successful from p1st.repeat import repeat_until_successful
@ -15,7 +15,7 @@ def main():
# #
ssh_source: str = args.ssh_source ssh_source: str = args.ssh_source
source_chunk_dir: Path = args.source_chunk_dir source_chunk_dir: Path = args.source_chunk_dir
targe_chunk_dir: Path = args.chunk_dir target_chunk_dir: Path = args.chunk_dir
target_path: Path = args.target_path target_path: Path = args.target_path
target_path.parent.mkdir(parents=True, exist_ok=True) target_path.parent.mkdir(parents=True, exist_ok=True)
@ -25,13 +25,13 @@ def main():
def handle_chunks(queue_put: Queue.put): def handle_chunks(queue_put: Queue.put):
chunk_no = 1 chunk_no = 1
while True: while True:
target_chunk_path = targe_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') target_chunk_path = get_chunk_path(target_chunk_dir, target_path, chunk_no)
source_chunk_path = source_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') source_chunk_path = get_chunk_path(source_chunk_dir, target_path, chunk_no)
in_socket: Path = source_chunk_dir.parent.joinpath(f'{target_path.name}.SOCKET.in') in_socket: Path = get_inform2_path(source_chunk_dir, target_path)
usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{target_chunk_path.name}.SOCKET') user_socket = target_chunk_dir.parent.joinpath(f'{target_path.name}.SOCKET.user')
inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') inform_path = get_inform1_path(source_chunk_path)
inform_cmd = ['ssh', inform_cmd = ['ssh',
ssh_source, ssh_source,
f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}'] f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}']
@ -44,19 +44,19 @@ def main():
# Wait until next chunk can be transferred from sending side. # Wait until next chunk can be transferred from sending side.
messages = [Message1.OK.value, Message1.EOF.value] messages = [Message1.OK.value, Message1.EOF.value]
msg = repeat_until_successful(inform_cmd, usr_confirmation_socket) msg = repeat_until_successful(inform_cmd, user_socket)
if msg not in messages: if msg not in messages:
raise ValueError(f'Invalid message: {msg}') raise ValueError(f'Invalid message: {msg}')
last_chunk = msg == Message1.EOF.value last_chunk = msg == Message1.EOF.value
# Transfer chunk. # Transfer chunk.
repeat_until_successful(rsync_cmd, usr_confirmation_socket) repeat_until_successful(rsync_cmd, user_socket)
# Add chunk path to queue. # Add chunk path to queue.
queue_put((target_chunk_path, last_chunk)) queue_put((target_chunk_path, last_chunk))
# Inform sending side about successful transfer. # Inform sending side about successful transfer.
repeat_until_successful(inform2_cmd, usr_confirmation_socket) repeat_until_successful(inform2_cmd, user_socket)
if last_chunk: if last_chunk:
break break

View File

@ -4,7 +4,7 @@ import argparse
import socket import socket
from pathlib import Path from pathlib import Path
from p1st.common import Message1, Message2 from p1st.common import Message1, Message2, get_inform1_path, get_inform2_path, get_chunk_path
from p1st.exec_produce_chunks import execute_produce_chunks from p1st.exec_produce_chunks import execute_produce_chunks
from p1st.unix_sock_input import accept_loop_until_message from p1st.unix_sock_input import accept_loop_until_message
@ -29,20 +29,20 @@ def main():
command = [x for xs in command_parts for x in xs] command = [x for xs in command_parts for x in xs]
def get_source_chunk_path(chunk_no: int) -> Path: def get_source_chunk_path(chunk_no: int) -> Path:
return source_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') return get_chunk_path(source_chunk_dir, child, chunk_no)
def handle_chunk(chunk_no: int, last_chunk: bool): def handle_chunk(chunk_no: int, last_chunk: bool):
source_chunk_path = get_source_chunk_path(chunk_no) source_chunk_path = get_source_chunk_path(chunk_no)
print(f'Handling chunk {source_chunk_path}') print(f'Handling chunk {source_chunk_path}')
# Create in_socket # Create in_socket
in_socket: Path = source_chunk_dir.parent.joinpath(f'{child.name}.SOCKET.in') in_socket: Path = get_inform2_path(source_chunk_dir, child)
in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
in_sock.bind(str(in_socket)) in_sock.bind(str(in_socket))
in_sock.listen(1) in_sock.listen(1)
# Inform receiving side, that the next chunk can be transferred. # Inform receiving side, that the next chunk can be transferred.
inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') inform_path = get_inform1_path(source_chunk_path)
inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value) inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value)
# Read from in_socket # Read from in_socket

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from enum import Enum from enum import Enum
from pathlib import Path
class Message1(Enum): class Message1(Enum):
@ -11,3 +12,15 @@ class Message1(Enum):
class Message2(Enum): class Message2(Enum):
OK_BINARY = b'OK\n' OK_BINARY = b'OK\n'
OK_STR = 'OK\n' OK_STR = 'OK\n'
def get_chunk_path(chunk_dir: Path, subvolume: Path, chunk_no: int) -> Path:
return chunk_dir.joinpath(f'{subvolume.name}.CHUNK.{chunk_no}')
def get_inform1_path(source_chunk_path: Path) -> Path:
return source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE')
def get_inform2_path(source_chunk_dir: Path, subvolume: Path) -> Path:
return source_chunk_dir.joinpath(f'{subvolume.name}.SOCKET.in')