subprocess-util/test.py
2023-01-10 20:23:28 +01:00

97 lines
3.0 KiB
Python

import os
from pathlib import Path
from exec_capture import execute_capture
from exec_print_capture import execute_print_capture
from exec_print_transfer import execute_print_transfer_chunks
from rsync_inform import rsync_inform
def test():
    """Run the currently enabled test scenario(s).

    Only ``test5`` is active; the remaining scenarios are kept as
    commented-out toggles so they can be re-enabled by hand.
    """
    # Disabled scenarios -- uncomment to run them as well:
    # test1()
    # test2()
    # test3()
    # test4()
    test5()
def test1():
    """Run two ``ls`` commands via ``execute_capture`` and print their output.

    The second command lists ``/foo/bar``, a path that presumably does not
    exist, so it is expected to exercise the stderr side of the capture.
    """
    print("TEST 1")
    commands = (['ls', '-la'], ['ls', '/foo/bar'])
    for position, command in enumerate(commands):
        if position > 0:
            # Blank separator line between the two runs.
            print()
        _exit_status, captured_out, captured_err = execute_capture(command)
        print(f'stdout:\n{captured_out}\nstderr:\n{captured_err}')
def test2():
    """Run two ``ls`` commands via ``execute_print_capture``.

    Nothing is printed here apart from the header and a blank separator;
    the returned triple is unpacked but not used any further.
    """
    print("TEST 2")
    listing_cmd = ['ls', '-la']
    missing_path_cmd = ['ls', '/foo/bar']  # presumably a non-existent path

    _status, _stdout, _stderr = execute_print_capture(listing_cmd)
    print()
    _status, _stdout, _stderr = execute_print_capture(missing_path_cmd)
def test3():
    """Run three commands through ``execute_print_transfer_chunks``.

    Each sub-test gets its own path below ``test/``; leftovers from an
    earlier run are removed first with ``rm -rf``.
    """
    print("TEST 3-1")
    stale_paths = ['test/3-1', 'test/3-2', 'test/3-3']
    execute_print_capture(['rm', '-rf'] + stale_paths)

    first_target = Path('test/3-1')
    exit_status = execute_print_transfer_chunks(['ls', '-la'], first_target)

    print("TEST 3-2")
    second_target = Path('test/3-2')
    # '/foo/bar' presumably does not exist, so this run covers a failing command.
    exit_status = execute_print_transfer_chunks(['ls', '/foo/bar'], second_target)

    print("TEST 3-3")
    third_target = Path('test/3-3')
    exit_status = execute_print_transfer_chunks(
        ['cat', 'rsync_inform.py'],
        third_target,
        chunk_size=1024,
    )
def test4():
    """Call ``rsync_inform`` once with ``ls`` commands as stand-ins.

    The three paths used here are removed up front with ``rm -rf``, so both
    ``ls`` invocations initially point at paths that do not exist.
    """
    print("TEST 4")
    leftovers = ['test/4-rsync-error', 'test/4-ssh-error', 'test/4-UNIX-socket']
    execute_print_capture(['rm', '-rf', *leftovers])

    # Stand-in for the real command: rsync src to dst
    rsync_stand_in = ['ls', 'test/4-rsync-error']
    # Stand-in for: ssh target-pc 'echo "OK" | nc -U "/path/to/unix-socket"'
    inform_stand_in = ['ls', 'test/4-ssh-error']
    socket_path = Path('test/4-UNIX-socket')

    rsync_inform(rsync_stand_in, inform_stand_in, socket_path)
def test5():
    """Feed ``cat <source_file>`` through ``execute_print_transfer_chunks``.

    ``rsync_inform`` is passed as the chunk transfer function. Instead of a
    real rsync, a small generated bash script appends the chunk file to a
    local "remote" target file. The inform command is an ``ls`` on a file
    that was just removed, so it fails on purpose until that file is created
    by hand (see the note further down).
    """
    print("TEST 5")
    source_file = Path('rsync_inform.py')  # A python script file with some content to copy ;)
    chunk_file = Path('test/5')
    remote_target_file = Path(f'test/5-copy-of-{source_file}')
    ssh_error_file = Path('test/5-ssh-error-while-informing-remote-pc')
    user_input_file = Path('test/5-user-input')
    concat_script = Path('test/5-concat')

    # Start from a clean slate: drop everything a previous run left behind.
    execute_print_capture(['rm', '-rf',
                           str(chunk_file),
                           str(remote_target_file),
                           str(ssh_error_file),
                           str(user_input_file),
                           str(concat_script)])

    # The 'test/' directory may not exist on a fresh checkout; without it
    # write_text() below would raise FileNotFoundError.
    concat_script.parent.mkdir(parents=True, exist_ok=True)

    # Fake "rsync": print a message, then append file $1 to file $2.
    concat_script.write_text(f'#!/usr/bin/bash\n'
                             f'echo "rsync {chunk_file} {remote_target_file} command output ..."\n'
                             f'cat "$1" >> "$2"')
    os.chmod(concat_script, 0o0755)  # make the generated script executable

    rsync_cmd = [str(concat_script), str(chunk_file), str(remote_target_file)]
    inform_cmd = ['ls', str(ssh_error_file)]

    # NOTE: While this test is running, the inform command fails on purpose
    # because ssh_error_file does not exist. The error can be resolved by
    # creating that file manually, i.e. by executing:
    #   touch test/5-ssh-error-while-informing-remote-pc
    execute_print_transfer_chunks(
        command=['cat', str(source_file)],
        chunk_file=chunk_file,
        chunk_transfer_fun=rsync_inform,
        chunk_transfer_args=(rsync_cmd, inform_cmd, user_input_file),
        chunk_size=512,
    )
# Run the enabled scenarios only when executed as a script, not on import.
if __name__ == "__main__":
    test()