subprocess-util/test.py

112 lines
3.1 KiB
Python

import os
import shutil
from pathlib import Path
from exec_capture import execute_capture
from exec_print_capture import execute_print_capture
from exec_print_transfer import execute_print_transfer_chunks
from transfer_inform import transfer_inform
def test():
# test1()
# test2()
# test3()
# test4()
test5()
def test1():
_init(1)
returncode, out, err = execute_capture(['ls', '-la'])
print(f'stdout:\n{out}\nstderr:\n{err}')
print()
returncode, out, err = execute_capture(['ls', '/foo/bar'])
print(f'stdout:\n{out}\nstderr:\n{err}')
def test2():
_init(2)
returncode, out, err = execute_print_capture(['ls', '-la'])
print()
returncode, out, err = execute_print_capture(['ls', '/foo/bar'])
def test3():
_init(3)
returncode = execute_print_transfer_chunks(['ls', '-la'], Path('test/3-1'))
print("TEST 3-2")
returncode = execute_print_transfer_chunks(['ls', '/foo/bar'], Path('test/3-2'))
print("TEST 3-3")
returncode = execute_print_transfer_chunks(['cat', 'transfer_inform.py'], Path('test/3-3'),
chunk_size=1024)
def test4():
_init(4)
transfer_inform(
['ls', 'test/4-rsync-error'], # rsync src to dst
['ls', 'test/4-ssh-error'], # ssh target-pc 'echo "OK" | nc -U "/path/to/unix-socket"'
Path('test/4-UNIX-socket')
)
def test5():
_init(5)
chunk_file_tmpl = Path('test/5')
source_file = Path('transfer_inform.py') # A python script file with some content to copy ;)
remote_target_file = Path(f'test/5-copy-of-{source_file}')
ssh_error_file = Path('test/5-ssh-error-while-informing-remote-pc')
concat_script = Path('test/5-concat')
concat_script.write_text(f'#!/usr/bin/bash\n'
f'echo "rsync $1 ... command output"\n'
f'cat "$1" >> "$2"')
os.chmod(concat_script, 0o0755)
# TODO:
# When running this test:
# The intentionally generated error, can be fixed by touching (creating) the missing file.
execute_print_transfer_chunks(
command=['cat', str(source_file)],
chunk_file_tmpl=chunk_file_tmpl,
chunk_transfer_fun=_chunk_transfer_fun,
chunk_transfer_args=(ssh_error_file, concat_script, remote_target_file),
chunk_size=512,
)
def _chunk_transfer_fun(chunk_file: Path,
eof: bool,
ssh_error_file: Path,
concat_script: Path,
remote_target_file: Path):
rsync_cmd = [str(concat_script), str(chunk_file), str(remote_target_file)]
inform_cmd = ['ls',
chunk_file.parent.joinpath(f'5.remote.EOF={eof}'),
]
transfer_inform(
rsync_cmd=rsync_cmd,
inform_cmd=inform_cmd,
user_input_file=chunk_file.parent.joinpath(f'5.SOCKET'),
)
def _init(test_number: int):
print(f"TEST {test_number}")
test_dir = Path('test')
if test_dir.exists():
shutil.rmtree('test')
Path('test').mkdir(exist_ok=False)
if __name__ == '__main__':
test()