From 07d1fcbca9ed92683b57e505e8a425c056472378 Mon Sep 17 00:00:00 2001 From: Daniel Langbein Date: Wed, 11 Jan 2023 11:04:27 +0100 Subject: [PATCH] add execute_print_receive_chunks(); add Makefile; add test --- .gitignore | 3 +- Makefile | 33 +++++++++++++++++ exec_capture.py | 2 ++ exec_print_capture.py | 8 +++-- exec_print_receive.py | 72 +++++++++++++++++++++++++++++++++++++ exec_print_transfer.py | 26 +++++++------- receive_inform.py | 41 +++++++++++---------- test.py | 81 +++++++++++++++++++++++++++++++++++++----- transfer_inform.py | 2 ++ unix_sock_input.py | 2 ++ 10 files changed, 224 insertions(+), 46 deletions(-) create mode 100644 Makefile create mode 100644 exec_print_receive.py diff --git a/.gitignore b/.gitignore index d56e3c4..e209ae4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /.idea -/test \ No newline at end of file +/test +/venv \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..eb74769 --- /dev/null +++ b/Makefile @@ -0,0 +1,33 @@ +.ONESHELL: +SHELL := bash +# https://github.com/JordanMartinez/purescript-cookbook/blob/master/makefile +# set -e = bash immediately exits if any command has a non-zero exit status. +# set -u = a reference to any shell variable you haven't previously +# defined -- with the exceptions of $* and $@ -- is an error, and causes +# the program to immediately exit with non-zero code. +# set -o pipefail = the first non-zero exit code emitted in one part of a +# pipeline (e.g. `cat file.txt | grep 'foo'`) will be used as the exit +# code for the entire pipeline. If all exit codes of a pipeline are zero, +# the pipeline will emit an exit code of 0. +.SHELLFLAGS := -eu -o pipefail -c + +.PHONY: all +all: venv + +# Python Dependency Locking with pip-tools +# https://lincolnloop.com/insights/python-dependency-locking-pip-tools/ + +.PHONY: test +test: venv + source venv/bin/activate + python3 test.py + +venv: + if [ -d venv ]; then + rm -r venv + fi + python3 -m venv venv + +.PHONY: clean +clean: + rm -rf venv .mypy_cache build dist __pycache__ test diff --git a/exec_capture.py b/exec_capture.py index a55a895..4e3b94a 100644 --- a/exec_capture.py +++ b/exec_capture.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- import subprocess diff --git a/exec_print_capture.py b/exec_print_capture.py index bb27ade..ba97e81 100644 --- a/exec_print_capture.py +++ b/exec_print_capture.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- from queue import Queue import sys import threading @@ -17,15 +19,15 @@ class _Assert: raise ValueError(f'Expected a and b to be equal: {a}, {b}') -def _read_output(pipe: IO[AnyStr], queue_put: Queue.put, list_append: list.append, prefix: str = ''): +def _read_output(str_pipe: IO[AnyStr], queue_put: Queue.put, list_append: list.append, prefix: str = ''): line: str - for line in pipe: + for line in str_pipe: func: Callable[[str], None] for func in (queue_put, list_append): func(f'{prefix}{line}') # TODO: Has this any effect? - # pipe.close() + # str_pipe.close() def _write_output(queue_get: Queue.get): diff --git a/exec_print_receive.py b/exec_print_receive.py new file mode 100644 index 0000000..da7187f --- /dev/null +++ b/exec_print_receive.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import subprocess +import sys +import threading +from pathlib import Path +from typing import IO, AnyStr + +from receive_inform import receive_inform + + +def _print_stdout(bin_pipe: IO[AnyStr]): + b: bytes + for b in bin_pipe: + sys.stdout.write(f'[STDOUT] {b.decode("UTF-8")}') + + # TODO: Has this any effect? + # bin_pipe.close() + + +def _print_stderr(bin_pipe: IO[AnyStr]): + b: bytes + for b in bin_pipe: + sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}') + + # TODO: Has this any effect? + # bin_pipe.close() + + +def execute_print_receive_chunks(command: list[str], + socket_file: Path, + chunk_file_tmpl: Path) -> int: + process = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + ) + + # TODO - This function + # - execute the command (print live stdout/stderr) + # + # TODO - Inside receive_inform() + # - while True + # - wait for message at socket_file + # - if 'OK\n' + # - determine chunk filename + # - read chunk, pass to stdin of command + # - delete chunk + # - if 'EOF\n' + # - break + # + # TODO - This function + # - wait for command to finish + + t_out = threading.Thread( + target=_print_stdout, args=(process.stdout,)) + t_err = threading.Thread( + target=_print_stderr, args=(process.stderr,)) + + for t in (t_out, t_err): + t.daemon = True + t.start() + + receive_inform(process.stdin, socket_file, chunk_file_tmpl) + + returncode = process.wait() + for t in (t_out, t_err): + t.join() + + return returncode diff --git a/exec_print_transfer.py b/exec_print_transfer.py index 500e1f3..c07236d 100644 --- a/exec_print_transfer.py +++ b/exec_print_transfer.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- from pathlib import Path import sys import threading @@ -26,20 +28,20 @@ def _save_chunk(chunk: bytes, chunk_file: Path): f.write(chunk) -def _save_output(pipe: IO[AnyStr], stdout_dir: Path): +def _save_output(out_pipe: IO[AnyStr], stdout_dir: Path): stdout_dir.mkdir(parents=True, exist_ok=False) b: bytes ct: int = 1 - for b in pipe: + for b in out_pipe: stdout_dir.joinpath(str(ct)).write_bytes(b) ct += 1 # TODO: Has this any effect? - # pipe.close() + # out_pipe.close() -def _save_output_rotating_chunks(pipe: IO[AnyStr], chunk_file_tmpl: Path, chunk_size, +def _save_output_rotating_chunks(out_pipe: IO[AnyStr], chunk_file_tmpl: Path, chunk_size, chunk_transfer_fun: Callable, chunk_transfer_args: tuple): chunk_file_tmpl.parent.mkdir(parents=True, exist_ok=True) @@ -50,7 +52,7 @@ def _save_output_rotating_chunks(pipe: IO[AnyStr], chunk_file_tmpl: Path, chunk_ # https://docs.python.org/3/library/io.html#io.RawIOBase.read # If 0 bytes are returned, and size was not 0, this indicates end of file. # If the object is in non-blocking mode and no bytes are available, None is returned. - b = pipe.read(remaining_bytes) + b = out_pipe.read(remaining_bytes) if len(b) == 0: # EOF reached. chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') @@ -78,16 +80,16 @@ def _save_output_rotating_chunks(pipe: IO[AnyStr], chunk_file_tmpl: Path, chunk_ raise ValueError('Invalid state') # TODO: Has this any effect? - # pipe.close() + # out_pipe.close() -def _print_output(pipe: IO[AnyStr]): - line: str - for line in pipe: - sys.stderr.write(f'[STDERR] {line}') +def _print_stderr(bin_pipe: IO[AnyStr]): + b: bytes + for b in bin_pipe: + sys.stderr.write(f'[STDERR] {b.decode("UTF-8")}') # TODO: Has this any effect? - # pipe.close() + # bin_pipe.close() def execute_print_transfer_chunks(command: list[str], @@ -130,7 +132,7 @@ def execute_print_transfer_chunks(command: list[str], target=_save_output_rotating_chunks, args=(process.stdout, chunk_file_tmpl, chunk_size, chunk_transfer_fun, chunk_transfer_args)) t_err = threading.Thread( - target=_print_output, args=(process.stderr,)) + target=_print_stderr, args=(process.stderr,)) for t in (t_out, t_err): t.daemon = True diff --git a/receive_inform.py b/receive_inform.py index 769a8a2..e7c506b 100644 --- a/receive_inform.py +++ b/receive_inform.py @@ -1,30 +1,22 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- import socket from pathlib import Path +from typing import IO, AnyStr from unix_sock_input import accept_loop_until_command_received -def receive_inform(cmd: list[str], socket_file: Path, chunk_file_tmpl: Path): +def receive_inform(in_pipe: IO[AnyStr], + socket_file: Path, + chunk_file_tmpl: Path) -> None: """ + :param in_pipe: :param chunk_file_tmpl: :param cmd: :param socket_file: Create a UNIX socket and wait for messages. :return: """ - - # TODO - # - execute the command - - # TODO - # - while True - # - wait for message at socket_file - # - if 'OK\n' - # - determine chunk filename - # - read chunk, pass to stdin of command - # - delete chunk - # - if 'EOF\n' - # - break - print(f'Listening on socket {socket_file}') sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(str(socket_file)) @@ -34,11 +26,16 @@ def receive_inform(cmd: list[str], socket_file: Path, chunk_file_tmpl: Path): commands = [b'OK\n', b'EOF\n'] while True: command = accept_loop_until_command_received(sock, commands) + if command not in commands: + raise ValueError("Invalid state") + + chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') + chunk = chunk_file.read_bytes() + in_pipe.write(chunk) + # in_pipe.flush() # TODO: is this required? + chunk_file.unlink(missing_ok=False) + if command == b'OK\n': - chunk_file = chunk_file_tmpl.parent.joinpath(f'{chunk_file_tmpl.name}.{ct}') - chunk = chunk_file.read_bytes() - # TODO send chunk to stdin of command - chunk_file.unlink(missing_ok=False) ct += 1 elif command == b'EOF\n': break @@ -49,5 +46,7 @@ def receive_inform(cmd: list[str], socket_file: Path, chunk_file_tmpl: Path): sock.close() socket_file.unlink(missing_ok=False) - # TODO - # - wait for command to finish + in_pipe.flush() + + # TODO: Has this any effect? On stdin probably yes! + in_pipe.close() diff --git a/test.py b/test.py index 3fda382..bcd2d8e 100644 --- a/test.py +++ b/test.py @@ -1,9 +1,14 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- import os +import shlex import shutil +import socket from pathlib import Path from exec_capture import execute_capture from exec_print_capture import execute_print_capture +from exec_print_receive import execute_print_receive_chunks from exec_print_transfer import execute_print_transfer_chunks from transfer_inform import transfer_inform @@ -13,7 +18,8 @@ def test(): # test2() # test3() # test4() - test5() + # test5() + test67() def test1(): @@ -61,7 +67,6 @@ def test5(): chunk_file_tmpl = Path('test/5') source_file = Path('transfer_inform.py') # A python script file with some content to copy ;) remote_target_file = Path(f'test/5-copy-of-{source_file}') - ssh_error_file = Path('test/5-ssh-error-while-informing-remote-pc') concat_script = Path('test/5-concat') concat_script.write_text(f'#!/usr/bin/bash\n' @@ -76,17 +81,16 @@ def test5(): execute_print_transfer_chunks( command=['cat', str(source_file)], chunk_file_tmpl=chunk_file_tmpl, - chunk_transfer_fun=_chunk_transfer_fun, - chunk_transfer_args=(ssh_error_file, concat_script, remote_target_file), + chunk_transfer_fun=_test5_chunk_transfer_fun, + chunk_transfer_args=(concat_script, remote_target_file), chunk_size=512, ) -def _chunk_transfer_fun(chunk_file: Path, - eof: bool, - ssh_error_file: Path, - concat_script: Path, - remote_target_file: Path): +def _test5_chunk_transfer_fun(chunk_file: Path, + eof: bool, + concat_script: Path, + remote_target_file: Path): rsync_cmd = [str(concat_script), str(chunk_file), str(remote_target_file)] inform_cmd = ['ls', chunk_file.parent.joinpath(f'5.remote.EOF={eof}'), @@ -99,6 +103,65 @@ def _chunk_transfer_fun(chunk_file: Path, ) +def test67(): + hostname = socket.gethostname() + if hostname == 'yodaTux': + test6() # LOCAL + elif hostname == 'danctnix': + test7() # REMOTE + else: + print(f'Unknown hostname {hostname}') + + +def test6(): + _init(6) + + source_file = Path('transfer_inform.py') # A python script file with some content to copy ;) + chunk_file_tmpl = Path('test/6-transfer_inform.py') + ssh_target = 'pine-pwdless' + target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/6-transfer_inform.py') + + execute_print_transfer_chunks( + command=['cat', str(source_file)], + chunk_file_tmpl=chunk_file_tmpl, + chunk_transfer_fun=_test6_chunk_transfer_fun, + chunk_transfer_args=(chunk_file_tmpl, ssh_target, target_file_tmpl), + chunk_size=512, + ) + + +def _test6_chunk_transfer_fun(source_chunk: Path, + eof: bool, + chunk_file_tmpl: Path, + ssh_target: str, + target_file_tmpl: Path, + ): + target_chunk = target_file_tmpl.parent.joinpath(f'{source_chunk.name}') + rsync_cmd = ['rsync', str(source_chunk), f'{ssh_target}:{str(target_chunk)}'] + message = 'EOF' if eof else 'OK' + target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET') + inform_cmd = ['ssh', ssh_target, f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] + + transfer_inform( + rsync_cmd=rsync_cmd, + inform_cmd=inform_cmd, + user_input_file=source_chunk.parent.joinpath(f'{source_chunk.name}.SOCKET'), + ) + + +def test7(): + _init(7) + + target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/6-transfer_inform.py') + target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET') + + execute_print_receive_chunks( + ['tee', str(target_file_tmpl)], + target_socket, + target_file_tmpl, + ) + + def _init(test_number: int): print(f"TEST {test_number}") test_dir = Path('test') diff --git a/transfer_inform.py b/transfer_inform.py index 1ffca4b..4e66348 100644 --- a/transfer_inform.py +++ b/transfer_inform.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- from pathlib import Path from exec_print_capture import execute_print_capture diff --git a/unix_sock_input.py b/unix_sock_input.py index a3da123..e74ca0b 100644 --- a/unix_sock_input.py +++ b/unix_sock_input.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- import socket from pathlib import Path