This commit is contained in:
Daniel Langbein 2023-01-12 17:20:39 +01:00
parent 52243d5f3a
commit f3ae420dca
7 changed files with 16 additions and 369 deletions

View File

@ -5,6 +5,7 @@ import shlex
from pathlib import Path
from queue import Queue
from p1st.common import Message1, Message2
from p1st.exec_consume_chunks import execute_consume_chunks
from p1st.repeat import repeat_until_successful
@ -39,14 +40,14 @@ def main():
f'{target_chunk_path}']
inform2_cmd = ['ssh',
ssh_source,
f'echo OK | nc -U {shlex.quote(str(in_socket))}']
f"printf '{Message2.OK_STR.value}' | nc -U {shlex.quote(str(in_socket))}"]
# Wait until next chunk can be transferred from sending side.
messages = ['OK\n', 'EOF\n']
messages = [Message1.OK.value, Message1.EOF.value]
msg = repeat_until_successful(inform_cmd, usr_confirmation_socket)
if msg not in messages:
raise ValueError(f'Inalid message: >>>{msg}<<<')
last_chunk = msg == 'EOF\n'
raise ValueError(f'Invalid message: {msg}')
last_chunk = msg == Message1.EOF.value
# Transfer chunk.
repeat_until_successful(rsync_cmd, usr_confirmation_socket)

View File

@ -4,6 +4,7 @@ import argparse
import socket
from pathlib import Path
from p1st.common import Message1, Message2
from p1st.exec_produce_chunks import execute_produce_chunks
from p1st.unix_sock_input import accept_loop_until_message
@ -42,11 +43,11 @@ def main():
# Inform receiving side, that the next chunk can be transferred.
inform_path = source_chunk_path.parent.joinpath(f'{source_chunk_path.name}.COMPLETE')
inform_path.write_text('EOF\n' if last_chunk else 'OK\n')
inform_path.write_text(Message1.EOF.value if last_chunk else Message1.OK.value)
# Read from in_socket
# -> Receiving side informs us after the chunk has been transferred.
_command = accept_loop_until_message(in_sock, [b'OK\n'])
_command = accept_loop_until_message(in_sock, [Message2.OK_BINARY.value])
# Close in_socket
in_sock.close()

View File

@ -1,15 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
from enum import Enum
def _get_chunk_file(chunk_file_tmpl: Path, ct: int):
return chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}')
class Message1(Enum):
OK = 'OK\n'
EOF = 'EOF\n'
def _get_remote_socket(target: Path):
return target.parent.joinpath(f'{target}.SOCKET')
def _get_chunk_tmpl(subvol: Path):
return subvol.parent.joinpath(f'{subvol.name}.CHUNK')
class Message2(Enum):
OK_BINARY = b'OK\n'
OK_STR = 'OK\n'

View File

@ -4,7 +4,7 @@ from queue import Queue
import sys
import threading
import subprocess
from typing import AnyStr, IO, Callable
from typing import AnyStr, IO
class _Assert:

View File

@ -1,75 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import sys
import threading
from pathlib import Path
from typing import IO, AnyStr, Callable
from p1st.common import _get_chunk_file
from p1st.receive_inform import receive_inform
def _print_stdout(bin_pipe: IO[AnyStr]):
b: bytes
for b in bin_pipe:
sys.stdout.write(f'[STDOUT] {b.decode("UTF-8")}')
# TODO: Has this any effect?
# bin_pipe.close()
def _print_stderr(bin_pipe: IO[AnyStr]):
b: bytes
for b in bin_pipe:
sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}')
# TODO: Has this any effect?
# bin_pipe.close()
def execute_print_receive_chunks(command: list[str],
socket_file: Path,
chunk_file_tmpl: Path,
get_chunk_file: Callable[[Path, int], Path] = _get_chunk_file,
) -> int:
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
)
# TODO - This function
# - execute the command (print live stdout/stderr)
#
# TODO - Inside receive_inform()
# - while True
# - wait for message at socket_file
# - if 'OK\n'
# - determine chunk filename
# - read chunk, pass to stdin of command
# - delete chunk
# - if 'EOF\n'
# - break
#
# TODO - This function
# - wait for command to finish
t_out = threading.Thread(
target=_print_stdout, args=(process.stdout,))
t_err = threading.Thread(
target=_print_stderr, args=(process.stderr,))
for t in (t_out, t_err):
t.daemon = True
t.start()
receive_inform(process.stdin, socket_file, chunk_file_tmpl, get_chunk_file)
returncode = process.wait()
for t in (t_out, t_err):
t.join()
return returncode

View File

@ -1,170 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import time
from pathlib import Path
import sys
import threading
import subprocess
from typing import AnyStr, IO, Callable
from p1st.common import _get_chunk_file
from p1st.data_units import DataUnitConverter
def _rotate_chunk(chunk_file: Path, chunk_number: int, eof: bool, chunk_transfer_fun: Callable, chunk_transfer_args: tuple):
print(f'Transferring chunk')
chunk_transfer_fun(chunk_file, chunk_number, eof, *chunk_transfer_args)
print(f'Removing chunk')
chunk_file.unlink(missing_ok=False)
def _save_chunk(chunk: bytes, chunk_file: Path):
print(f'Saving chunk {chunk_file}')
# Fails if file does already exist.
with open(chunk_file, 'xb') as f:
f.write(chunk)
# def _save_output(out_pipe: IO[AnyStr], stdout_dir: Path):
# stdout_dir.mkdir(parents=True, exist_ok=False)
#
# b: bytes
# ct: int = 1
# for b in out_pipe:
# stdout_dir.joinpath(str(ct)).write_bytes(b)
# ct += 1
#
# # TODO: Has this any effect?
# # out_pipe.close()
def _save_output_rotating_chunks(out_pipe: IO[AnyStr],
chunk_file_tmpl: Path,
chunk_size,
chunk_transfer_fun: Callable,
chunk_transfer_args: tuple,
get_chunk_file: Callable[[Path, int], Path],
):
start_time = time.time()
ct: int = 1
remaining_bytes = chunk_size
chunk: bytes = b''
while True:
# https://docs.python.org/3/library/io.html#io.RawIOBase.read
# If 0 bytes are returned, and size was not 0, this indicates end of file.
# If the object is in non-blocking mode and no bytes are available, None is returned.
b = out_pipe.read(remaining_bytes)
if len(b) == 0:
# EOF reached.
chunk_file = get_chunk_file(chunk_file_tmpl, ct)
chunk_file.parent.mkdir(parents=True, exist_ok=True)
_save_chunk(chunk, chunk_file)
_rotate_chunk(chunk_file, ct, True, chunk_transfer_fun, chunk_transfer_args)
current_time = time.time()
elapsed_time = current_time - start_time
print(f'Elapsed time: {datetime.timedelta(seconds=elapsed_time)}\n'
f'Transferred: {DataUnitConverter.to_unit_auto_str(ct * chunk_size)}')
break
chunk += b
chunk_len = len(chunk)
if chunk_len == chunk_size:
# Next chunk is full.
chunk_file = get_chunk_file(chunk_file_tmpl, ct)
chunk_file.parent.mkdir(parents=True, exist_ok=True)
_save_chunk(chunk, chunk_file)
_rotate_chunk(chunk_file, ct, False, chunk_transfer_fun, chunk_transfer_args)
current_time = time.time()
elapsed_time = current_time - start_time
transferred_bytes = ct * chunk_size
bytes_per_second = transferred_bytes / elapsed_time
print(f'Elapsed time: {datetime.timedelta(seconds=elapsed_time)}\n'
f'Transferred: {DataUnitConverter.to_unit_auto_str(transferred_bytes)}\n'
f'Speed: {DataUnitConverter.to_unit_auto_str(bytes_per_second)}/s')
chunk = b''
remaining_bytes = chunk_size
ct += 1
elif chunk_len < chunk_size:
remaining_bytes = chunk_size - chunk_len
else:
raise ValueError('Invalid state')
# TODO: Has this any effect?
# out_pipe.close()
def _print_stderr(bin_pipe: IO[AnyStr]):
b: bytes
for b in bin_pipe:
sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}')
# TODO: Has this any effect?
# bin_pipe.close()
def execute_print_transfer_chunks(command: list[str],
chunk_file_tmpl: Path,
chunk_transfer_fun: Callable,
chunk_transfer_args: tuple,
chunk_size: int = 1024 * 1024,
get_chunk_file: Callable[[Path, int], Path] = _get_chunk_file,
) -> int:
"""
Executes the given command.
Until the command has finished:
- saves a small part of the commands stdout to chunk_file = f'{chunk_file_tmpl.name}.{chunk_number}'
- calls chunk_transfer_fun with arguments (chunk_file, chunk_number, eof_reached, *chunk_transfer_args)
- eof_reached is True for the last chunk
- the last chunk may be smaller than chunk_size
- deletes chunk_file
During command execution: Forwards stderr output of the command to stderr.
:param chunk_file_tmpl: Used by get_chunk_file to calculate the paths of the chunk files.
:param command: Command to execute, e.g. ['cat', '/some/large/file']
:param chunk_transfer_fun: Called for each chunk with arguments (chunk_file, chunk_number, eof_reached, *chunk_transfer_args)
:param chunk_transfer_args:
:param chunk_size: Defaults to 1MB (1024*1024).
:param get_chunk_file: Gets chunk_file_tmpl and chunk_number as input and returns the path of the corresponding chunk.
:return: returncode of executed command
"""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
)
t_out = threading.Thread(
target=_save_output_rotating_chunks,
args=(process.stdout,
chunk_file_tmpl,
chunk_size,
chunk_transfer_fun,
chunk_transfer_args,
get_chunk_file,
))
t_err = threading.Thread(
target=_print_stderr, args=(process.stderr,))
for t in (t_out, t_err):
t.daemon = True
t.start()
returncode = process.wait()
for t in (t_out, t_err):
t.join()
return returncode

View File

@ -8,8 +8,6 @@ from pathlib import Path
from p1st.exec_capture import execute_capture
from p1st.exec_print_capture import execute_print_capture
from p1st.exec_print_receive import execute_print_receive_chunks
from p1st.exec_print_transfer import execute_print_transfer_chunks
from p1st.exec_produce_chunks import execute_produce_chunks
from p1st.transfer_inform import transfer_inform
@ -17,10 +15,7 @@ from p1st.transfer_inform import transfer_inform
def test():
# test1()
# test2()
# test3()
# test4()
# test5()
# test67()
# test8()
# test9()
# test10()
@ -47,23 +42,6 @@ def test2():
returncode, out, err = execute_print_capture(['ls', '/foo/bar'])
def test3():
_init(3)
def _chunk_transfer(chunk_file: Path, ct: int, eof: bool):
print(f'Transferring chunk {chunk_file} to ... (This is the default method, it has no effect)')
if eof:
print(f'The last chunk has been transferred.')
returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/3-1'), _chunk_transfer, ())
print("TEST 3-2")
returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/3-2'), _chunk_transfer, ())
print("TEST 3-3")
returncode = execute_print_transfer_chunks(['cat', 'transfer_inform.py'], Path('test/3-3'),
_chunk_transfer, (),
chunk_size=1024)
def test4():
_init(4)
@ -74,32 +52,6 @@ def test4():
)
def test5():
_init(5)
chunk_file_tmpl = Path('test/5')
source_file = Path('transfer_inform.py') # A python script file with some content to copy ;)
remote_target_file = Path(f'test/5-copy-of-{source_file}')
concat_script = Path('test/5-concat')
concat_script.write_text(f'#!/usr/bin/bash\n'
f'echo "rsync $1 ... command output"\n'
f'cat "$1" >> "$2"')
os.chmod(concat_script, 0o0755)
# TODO:
# When running this test:
# The intentionally generated error, can be fixed by touching (creating) the missing file.
execute_print_transfer_chunks(
command=['cat', str(source_file)],
chunk_file_tmpl=chunk_file_tmpl,
chunk_transfer_fun=_test5_chunk_transfer_fun,
chunk_transfer_args=(concat_script, remote_target_file),
chunk_size=512,
)
def _test5_chunk_transfer_fun(chunk_file: Path,
chunk_number: int,
eof: bool,
@ -117,66 +69,6 @@ def _test5_chunk_transfer_fun(chunk_file: Path,
)
def test67():
hostname = socket.gethostname()
if hostname == 'yodaTux':
test6() # LOCAL
elif hostname == 'danctnix':
test7() # REMOTE
else:
print(f'Unknown hostname {hostname}')
def test6():
_init(6)
source_file = Path('transfer_inform.py') # A python script file with some content to copy ;)
chunk_file_tmpl = Path('test/6-transfer_inform.py')
ssh_target = 'pine-pwdless'
target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/7-transfer_inform.py')
execute_print_transfer_chunks(
command=['cat', str(source_file)],
chunk_file_tmpl=chunk_file_tmpl,
chunk_transfer_fun=_test6_chunk_transfer_fun,
chunk_transfer_args=(chunk_file_tmpl, ssh_target, target_file_tmpl),
chunk_size=512,
)
def _test6_chunk_transfer_fun(source_chunk: Path,
chunk_number: int,
eof: bool,
chunk_file_tmpl: Path,
ssh_target: str,
target_file_tmpl: Path,
):
target_chunk = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.{chunk_number}')
rsync_cmd = ['rsync', str(source_chunk), f'{ssh_target}:{str(target_chunk)}']
message = 'EOF' if eof else 'OK'
target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET')
inform_cmd = ['ssh', ssh_target, f'echo {message} | nc -U {shlex.quote(str(target_socket))}']
transfer_inform(
rsync_cmd=rsync_cmd,
inform_cmd=inform_cmd,
user_input_file=source_chunk.parent.joinpath(f'{source_chunk.name}.SOCKET'),
)
def test7():
_init(7)
target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/7-transfer_inform.py')
target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET')
execute_print_receive_chunks(
['tee', str(target_file_tmpl)],
target_socket,
target_file_tmpl,
)
def test8():
repo_name = 'subprocess_util'