#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import shlex import shutil import socket from pathlib import Path from exec_capture import execute_capture from exec_print_capture import execute_print_capture from exec_print_receive import execute_print_receive_chunks from exec_print_transfer import execute_print_transfer_chunks from transfer_inform import transfer_inform def test(): # test1() # test2() # test3() # test4() # test5() test67() def test1(): _init(1) returncode, out, err = execute_capture(['ls', '-la']) print(f'stdout:\n{out}\nstderr:\n{err}') print() returncode, out, err = execute_capture(['ls', '/foo/bar']) print(f'stdout:\n{out}\nstderr:\n{err}') def test2(): _init(2) returncode, out, err = execute_print_capture(['ls', '-la']) print() returncode, out, err = execute_print_capture(['ls', '/foo/bar']) def test3(): _init(3) def _chunk_transfer(chunk_file: Path, eof: bool): print(f'Transferring chunk {chunk_file} to ... (This is the default method, it has no effect)') if eof: print(f'The last chunk has been transferred.') returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/3-1'), _chunk_transfer, ()) print("TEST 3-2") returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/3-2'), _chunk_transfer, ()) print("TEST 3-3") returncode = execute_print_transfer_chunks(['cat', 'transfer_inform.py'], Path('test/3-3'), _chunk_transfer, (), chunk_size=1024) def test4(): _init(4) transfer_inform( ['ls', 'test/4-rsync-error'], # rsync src to dst ['ls', 'test/4-ssh-error'], # ssh target-pc 'echo "OK" | nc -U "/path/to/unix-socket"' Path('test/4-UNIX-socket') ) def test5(): _init(5) chunk_file_tmpl = Path('test/5') source_file = Path('transfer_inform.py') # A python script file with some content to copy ;) remote_target_file = Path(f'test/5-copy-of-{source_file}') concat_script = Path('test/5-concat') concat_script.write_text(f'#!/usr/bin/bash\n' f'echo "rsync $1 ... command output"\n' f'cat "$1" >> "$2"') os.chmod(concat_script, 0o0755) # TODO: # When running this test: # The intentionally generated error, can be fixed by touching (creating) the missing file. execute_print_transfer_chunks( command=['cat', str(source_file)], chunk_file_tmpl=chunk_file_tmpl, chunk_transfer_fun=_test5_chunk_transfer_fun, chunk_transfer_args=(concat_script, remote_target_file), chunk_size=512, ) def _test5_chunk_transfer_fun(chunk_file: Path, chunk_number: int, eof: bool, concat_script: Path, remote_target_file: Path): rsync_cmd = [str(concat_script), str(chunk_file), str(remote_target_file)] inform_cmd = ['ls', chunk_file.parent.joinpath(f'5.remote.EOF={eof}'), ] transfer_inform( rsync_cmd=rsync_cmd, inform_cmd=inform_cmd, user_input_file=chunk_file.parent.joinpath(f'5.SOCKET'), ) def test67(): hostname = socket.gethostname() if hostname == 'yodaTux': test6() # LOCAL elif hostname == 'danctnix': test7() # REMOTE else: print(f'Unknown hostname {hostname}') def test6(): _init(6) source_file = Path('transfer_inform.py') # A python script file with some content to copy ;) chunk_file_tmpl = Path('test/6-transfer_inform.py') ssh_target = 'pine-pwdless' target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/7-transfer_inform.py') execute_print_transfer_chunks( command=['cat', str(source_file)], chunk_file_tmpl=chunk_file_tmpl, chunk_transfer_fun=_test6_chunk_transfer_fun, chunk_transfer_args=(chunk_file_tmpl, ssh_target, target_file_tmpl), chunk_size=512, ) def _test6_chunk_transfer_fun(source_chunk: Path, chunk_number: int, eof: bool, chunk_file_tmpl: Path, ssh_target: str, target_file_tmpl: Path, ): target_chunk = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.{chunk_number}') rsync_cmd = ['rsync', str(source_chunk), f'{ssh_target}:{str(target_chunk)}'] message = 'EOF' if eof else 'OK' target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET') inform_cmd = ['ssh', ssh_target, f'echo {message} | nc -U {shlex.quote(str(target_socket))}'] transfer_inform( rsync_cmd=rsync_cmd, inform_cmd=inform_cmd, user_input_file=source_chunk.parent.joinpath(f'{source_chunk.name}.SOCKET'), ) def test7(): _init(7) target_file_tmpl = Path(f'/home/alarm/subprocess_util/test/7-transfer_inform.py') target_socket = target_file_tmpl.parent.joinpath(f'{target_file_tmpl.name}.SOCKET') execute_print_receive_chunks( ['tee', str(target_file_tmpl)], target_socket, target_file_tmpl, ) def _init(test_number: int): print(f"TEST {test_number}") test_dir = Path('test') if test_dir.exists(): shutil.rmtree('test') Path('test').mkdir(exist_ok=False) if __name__ == '__main__': test()