diff --git a/src/p1st/btrfs_receive_chunks_v3.py b/src/p1st/btrfs_receive_chunks_v3.py index 8d5f3b1..0420b3d 100644 --- a/src/p1st/btrfs_receive_chunks_v3.py +++ b/src/p1st/btrfs_receive_chunks_v3.py @@ -5,7 +5,7 @@ import shlex from pathlib import Path from queue import Queue -from p1st.common import Message1, Message2 +from p1st.common import Message1, Message2, get_chunk_path, get_inform1_path, get_inform2_path from p1st.exec_consume_chunks import execute_consume_chunks from p1st.repeat import repeat_until_successful @@ -15,7 +15,7 @@ def main(): # ssh_source: str = args.ssh_source source_chunk_dir: Path = args.source_chunk_dir - targe_chunk_dir: Path = args.chunk_dir + target_chunk_dir: Path = args.chunk_dir target_path: Path = args.target_path target_path.parent.mkdir(parents=True, exist_ok=True) @@ -25,13 +25,13 @@ def main(): def handle_chunks(queue_put: Queue.put): chunk_no = 1 while True: - target_chunk_path = targe_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') - source_chunk_path = source_chunk_dir.joinpath(f'{target_path.name}.CHUNK.{chunk_no}') - in_socket: Path = source_chunk_dir.parent.joinpath(f'{target_path.name}.SOCKET.in') + target_chunk_path = get_chunk_path(target_chunk_dir, target_path, chunk_no) + source_chunk_path = get_chunk_path(source_chunk_dir, target_path, chunk_no) + in_socket: Path = get_inform2_path(source_chunk_dir, target_path) - usr_confirmation_socket = target_chunk_path.parent.joinpath(f'{target_chunk_path.name}.SOCKET') + user_socket = target_chunk_dir.parent.joinpath(f'{target_path.name}.SOCKET.user') - inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') + inform_path = get_inform1_path(source_chunk_path) inform_cmd = ['ssh', ssh_source, f'cat {shlex.quote(str(inform_path))} && rm {shlex.quote(str(inform_path))}'] @@ -44,19 +44,19 @@ def main(): # Wait until next chunk can be transferred from sending side. messages = [Message1.OK.value, Message1.EOF.value] - msg = repeat_until_successful(inform_cmd, usr_confirmation_socket) + msg = repeat_until_successful(inform_cmd, user_socket) if msg not in messages: raise ValueError(f'Invalid message: {msg}') last_chunk = msg == Message1.EOF.value # Transfer chunk. - repeat_until_successful(rsync_cmd, usr_confirmation_socket) + repeat_until_successful(rsync_cmd, user_socket) # Add chunk path to queue. queue_put((target_chunk_path, last_chunk)) # Inform sending side about successful transfer. - repeat_until_successful(inform2_cmd, usr_confirmation_socket) + repeat_until_successful(inform2_cmd, user_socket) if last_chunk: break diff --git a/src/p1st/btrfs_send_chunks_v3.py b/src/p1st/btrfs_send_chunks_v3.py index 30ad8f3..2807c5e 100644 --- a/src/p1st/btrfs_send_chunks_v3.py +++ b/src/p1st/btrfs_send_chunks_v3.py @@ -4,7 +4,7 @@ import argparse import socket from pathlib import Path -from p1st.common import Message1, Message2 +from p1st.common import Message1, Message2, get_inform1_path, get_inform2_path, get_chunk_path from p1st.exec_produce_chunks import execute_produce_chunks from p1st.unix_sock_input import accept_loop_until_message @@ -29,20 +29,20 @@ def main(): command = [x for xs in command_parts for x in xs] def get_source_chunk_path(chunk_no: int) -> Path: - return source_chunk_dir.joinpath(f'{child.name}.CHUNK.{chunk_no}') + return get_chunk_path(source_chunk_dir, child, chunk_no) def handle_chunk(chunk_no: int, last_chunk: bool): source_chunk_path = get_source_chunk_path(chunk_no) print(f'Handling chunk {source_chunk_path}') # Create in_socket - in_socket: Path = source_chunk_dir.parent.joinpath(f'{child.name}.SOCKET.in') + in_socket: Path = get_inform2_path(source_chunk_dir, child) in_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) in_sock.bind(str(in_socket)) in_sock.listen(1) # Inform receiving side, that the next chunk can be transferred. - inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') + inform_path = get_inform1_path(source_chunk_path) inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value) # Read from in_socket diff --git a/src/p1st/common.py b/src/p1st/common.py index ec80eba..8aa80b5 100644 --- a/src/p1st/common.py +++ b/src/p1st/common.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- from enum import Enum +from pathlib import Path class Message1(Enum): @@ -11,3 +12,15 @@ class Message1(Enum): class Message2(Enum): OK_BINARY = b'OK\n' OK_STR = 'OK\n' + + +def get_chunk_path(chunk_dir: Path, subvolume: Path, chunk_no: int) -> Path: + return chunk_dir.joinpath(f'{subvolume.name}.CHUNK.{chunk_no}') + + +def get_inform1_path(source_chunk_path: Path) -> Path: + return source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE') + + +def get_inform2_path(source_chunk_dir: Path, subvolume: Path) -> Path: + return source_chunk_dir.joinpath(f'{subvolume.name}.SOCKET.in')