mirror of
https://codeberg.org/privacy1st/subprocess-util
synced 2024-12-22 22:06:05 +01:00
175 lines
5.1 KiB
Python
175 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
import os
|
|
import shlex
|
|
import shutil
|
|
import socket
|
|
from pathlib import Path
|
|
|
|
from exec_capture import execute_capture
|
|
from exec_print_capture import execute_print_capture
|
|
from exec_print_receive import execute_print_receive_chunks
|
|
from exec_print_transfer import execute_print_transfer_chunks
|
|
from transfer_inform import transfer_inform
|
|
|
|
|
|
def test():
    """Run the currently selected manual test scenario.

    Only the host-dependent scenario ``test67`` is active.  The other
    scenarios are left below as disabled toggles that can be re-enabled
    by removing the leading ``#``.
    """
    # Disabled scenarios:
    # test1()
    # test2()
    # test3()
    # test4()
    # test5()

    # Active scenario:
    test67()
|
|
|
|
|
|
def test1():
    """Run ``ls`` twice through ``execute_capture`` and print what was captured.

    The first command lists the current directory, the second one targets
    ``/foo/bar``.  An empty line separates the two reports.
    """
    _init(1)

    commands = (
        ['ls', '-la'],
        ['ls', '/foo/bar'],
    )
    for position, command in enumerate(commands):
        if position > 0:
            # Blank separator line between the two reports.
            print()
        # The result is a 3-tuple; its first element is not used here.
        _returncode, captured_out, captured_err = execute_capture(command)
        print(f'stdout:\n{captured_out}\nstderr:\n{captured_err}')
|
|
|
|
|
|
def test2():
    """Run ``ls`` twice through ``execute_print_capture``.

    The returned 3-tuples are unpacked but not inspected; this function
    itself only prints an empty line between the two invocations.
    """
    _init(2)

    _rc_first, _out_first, _err_first = execute_print_capture(['ls', '-la'])
    print()
    _rc_second, _out_second, _err_second = execute_print_capture(['ls', '/foo/bar'])
|
|
|
|
|
|
def test3():
    """Feed three commands through ``execute_print_transfer_chunks``.

    Each invocation gets its own chunk-file template below ``test/``.
    The third invocation additionally passes ``chunk_size=1024``.
    Return values of the calls are ignored.
    """
    _init(3)

    chunk_dir = Path('test')

    execute_print_transfer_chunks(['ls', '-la'], chunk_dir / '3-1')

    print("TEST 3-2")
    execute_print_transfer_chunks(['ls', '/foo/bar'], chunk_dir / '3-2')

    print("TEST 3-3")
    execute_print_transfer_chunks(
        ['cat', 'transfer_inform.py'],
        chunk_dir / '3-3',
        chunk_size=1024,
    )
|
|
|
|
|
|
def test4():
    """Call ``transfer_inform`` with two ``ls`` commands as stand-ins.

    The three arguments are passed positionally: the command standing in
    for the transfer, the command standing in for the notification, and a
    path below ``test/``.
    """
    _init(4)

    # Stand-in for: rsync src to dst
    transfer_stand_in = ['ls', 'test/4-rsync-error']
    # Stand-in for: ssh target-pc 'echo "OK" | nc -U "/path/to/unix-socket"'
    inform_stand_in = ['ls', 'test/4-ssh-error']
    socket_path = Path('test/4-UNIX-socket')

    transfer_inform(transfer_stand_in, inform_stand_in, socket_path)
|
|
|
|
|
|
def test5():
    """Transfer ``transfer_inform.py`` in 512-byte chunks using a local helper script.

    A small bash script is written to ``test/5-concat`` and made executable.
    It echoes a line mentioning its first argument and appends the file named
    by the first argument to the file named by the second.  The script path
    and the target file are handed to ``_test5_chunk_transfer_fun`` through
    ``chunk_transfer_args``.
    """
    _init(5)

    source_file = Path('transfer_inform.py')  # A python script file with some content to copy ;)
    chunk_file_tmpl = Path('test/5')
    remote_target_file = Path(f'test/5-copy-of-{source_file}')
    concat_script = Path('test/5-concat')

    # Script body; joined with newlines and no trailing newline.
    script_lines = (
        '#!/usr/bin/bash',
        'echo "rsync $1 ... command output"',
        'cat "$1" >> "$2"',
    )
    concat_script.write_text('\n'.join(script_lines))
    concat_script.chmod(0o0755)

    # TODO:
    #   When running this test:
    #   The intentionally generated error can be fixed by touching (creating) the missing file.

    execute_print_transfer_chunks(
        command=['cat', str(source_file)],
        chunk_file_tmpl=chunk_file_tmpl,
        chunk_transfer_fun=_test5_chunk_transfer_fun,
        chunk_transfer_args=(concat_script, remote_target_file),
        chunk_size=512,
    )
|
|
|
|
|
|
def _test5_chunk_transfer_fun(chunk_file: Path,
                              eof: bool,
                              concat_script: Path,
                              remote_target_file: Path):
    """Chunk-transfer callback used by ``test5``.

    Builds two commands and hands them to ``transfer_inform``:

    * the "rsync" command runs ``concat_script`` with ``chunk_file`` and
      ``remote_target_file`` as arguments;
    * the "inform" command runs ``ls`` on ``5.remote.EOF=<eof>`` located
      next to ``chunk_file``.

    Every element of both command lists is a plain ``str``.  The original
    code passed a ``Path`` object inside ``inform_cmd`` while ``rsync_cmd``
    was fully stringified.

    :param chunk_file: Path of the chunk to transfer.
    :param eof: Flag that is embedded into the name of the file probed by
        the inform command.
    :param concat_script: Executable used as the transfer command.
    :param remote_target_file: File passed to ``concat_script`` as its
        second argument.
    """
    test_dir = chunk_file.parent

    rsync_cmd = [str(concat_script), str(chunk_file), str(remote_target_file)]
    # Stringified so that both command lists have uniform element types.
    inform_cmd = ['ls', str(test_dir.joinpath(f'5.remote.EOF={eof}'))]

    transfer_inform(
        rsync_cmd=rsync_cmd,
        inform_cmd=inform_cmd,
        user_input_file=test_dir.joinpath('5.SOCKET'),
    )
|
|
|
|
|
|
def test67():
    """Pick the scenario matching the machine this script runs on.

    The hostname reported by ``socket.gethostname()`` selects ``test6``
    (local side) or ``test7`` (remote side).  For any other hostname a
    message is printed and nothing else happens.
    """
    this_host = socket.gethostname()

    if this_host == 'yodaTux':
        # LOCAL
        test6()
        return

    if this_host == 'danctnix':
        # REMOTE
        test7()
        return

    print(f'Unknown hostname {this_host}')
|
|
|
|
|
|
def test6():
    """Local side of the two-host scenario (invoked by ``test67``).

    Runs ``cat transfer_inform.py`` through ``execute_print_transfer_chunks``
    with 512-byte chunks.  Each chunk is handled by
    ``_test6_chunk_transfer_fun``, which receives the chunk template, the
    SSH target name and the target file template as extra arguments.
    """
    _init(6)

    ssh_target = 'pine-pwdless'
    source_file = Path('transfer_inform.py')  # A python script file with some content to copy ;)
    chunk_file_tmpl = Path('test/6-transfer_inform.py')
    target_file_tmpl = Path('/home/alarm/subprocess_util/test/6-transfer_inform.py')

    extra_args = (chunk_file_tmpl, ssh_target, target_file_tmpl)
    execute_print_transfer_chunks(
        command=['cat', str(source_file)],
        chunk_file_tmpl=chunk_file_tmpl,
        chunk_transfer_fun=_test6_chunk_transfer_fun,
        chunk_transfer_args=extra_args,
        chunk_size=512,
    )
|
|
|
|
|
|
def _test6_chunk_transfer_fun(source_chunk: Path,
                              eof: bool,
                              chunk_file_tmpl: Path,
                              ssh_target: str,
                              target_file_tmpl: Path,
                              ):
    """Chunk-transfer callback used by ``test6``.

    Builds an ``rsync`` command that copies ``source_chunk`` into the
    directory of ``target_file_tmpl`` on ``ssh_target`` (keeping the chunk's
    file name), and an ``ssh`` command that pipes ``EOF`` or ``OK`` into
    ``nc -U`` on the socket ``<target_file_tmpl>.SOCKET``.  Both commands are
    passed to ``transfer_inform`` together with a ``.SOCKET`` path next to
    ``source_chunk``.

    :param source_chunk: Local chunk file to transfer.
    :param eof: Selects the message sent by the inform command
        (``EOF`` when true, ``OK`` otherwise).
    :param chunk_file_tmpl: Not used inside this function.
    :param ssh_target: Host name given to ``rsync`` and ``ssh``.
    :param target_file_tmpl: Remote file template; its directory and name
        determine the remote chunk and socket paths.
    """
    remote_dir = target_file_tmpl.parent
    remote_chunk = remote_dir / source_chunk.name
    remote_socket = remote_dir / f'{target_file_tmpl.name}.SOCKET'
    local_socket = source_chunk.parent / f'{source_chunk.name}.SOCKET'

    if eof:
        message = 'EOF'
    else:
        message = 'OK'

    # The socket path is shell-quoted because the remote side evaluates
    # this string as a shell command line.
    remote_shell_line = f'echo {message} | nc -U {shlex.quote(str(remote_socket))}'

    transfer_inform(
        rsync_cmd=['rsync', str(source_chunk), f'{ssh_target}:{remote_chunk}'],
        inform_cmd=['ssh', ssh_target, remote_shell_line],
        user_input_file=local_socket,
    )
|
|
|
|
|
|
def test7():
    """Remote side of the two-host scenario (invoked by ``test67``).

    Calls ``execute_print_receive_chunks`` with a ``tee`` command writing to
    the target file, the socket path ``<target file>.SOCKET`` and the target
    file template itself.
    """
    _init(7)

    target_file_tmpl = Path('/home/alarm/subprocess_util/test/6-transfer_inform.py')
    target_socket = target_file_tmpl.parent / f'{target_file_tmpl.name}.SOCKET'
    receive_cmd = ['tee', str(target_file_tmpl)]

    execute_print_receive_chunks(receive_cmd, target_socket, target_file_tmpl)
|
|
|
|
|
|
def _init(test_number: int):
|
|
print(f"TEST {test_number}")
|
|
test_dir = Path('test')
|
|
if test_dir.exists():
|
|
shutil.rmtree('test')
|
|
Path('test').mkdir(exist_ok=False)
|
|
|
|
|
|
# Run the selected scenario only when executed as a script, not on import.
if __name__ == "__main__":
    test()
|