diff --git a/src/p1st/data_units.py b/src/p1st/data_units.py index 3134596..2e50bea 100644 --- a/src/p1st/data_units.py +++ b/src/p1st/data_units.py @@ -7,6 +7,16 @@ def test(): class DataUnitConverter: + """ + A class to convert between data units. + + Example: 1.000.000 B == 1 MB + + The main purpose is to convert a number with many digits + to a "larger" unit so that the number is shorter + and more readable. + """ + class DataUnit(Enum): B = 'B' KB = 'KB' diff --git a/src/p1st/exec_capture.py b/src/p1st/exec_capture.py index b5e7cd3..06de106 100644 --- a/src/p1st/exec_capture.py +++ b/src/p1st/exec_capture.py @@ -4,6 +4,10 @@ import subprocess def execute_capture(command: list[str]) -> tuple[int, str, str]: + """ + :param command: Executes the given `command` in a subprocess. + :return: (returncode, stdout, stderr) + """ completed: subprocess.CompletedProcess = subprocess.run( command, capture_output=True, diff --git a/src/p1st/exec_consume_chunks.py b/src/p1st/exec_consume_chunks.py index 5719a32..bed722e 100644 --- a/src/p1st/exec_consume_chunks.py +++ b/src/p1st/exec_consume_chunks.py @@ -16,7 +16,16 @@ def execute_consume_chunks(command: list[str], handle_chunks: Callable[[Queue.put], None], ) -> int: """ - Local chunks are deleted after they are fed to stdin of subprocess. + The `command` is executed in a subprocess. + + `handle_chunks` is executed in a separate thread. + After `handle_chunks` has saved a new chunk, + the path of the saved chunk is added to a `Queue`. + + For each chunk that has been added to the `Queue`, we + - read the chunk from disk + - write the chunk to the stdin of the subprocess + - and delete the chunk :param command: :param handle_chunks: Has one parameter, `queue_put`. `handle_chunks` must call queue_put.(chunk_path, last_chunk) for each saved chunk. diff --git a/src/p1st/exec_print_capture.py b/src/p1st/exec_print_capture.py index 434a192..ce5fe58 100644 --- a/src/p1st/exec_print_capture.py +++ b/src/p1st/exec_print_capture.py @@ -7,60 +7,29 @@ import subprocess from typing import AnyStr, IO -class _Assert: - @staticmethod - def true(a): - if not a: - raise ValueError(f'Expected a to be true: {a}') - - @staticmethod - def equal(a, b): - if a != b: - raise ValueError(f'Expected a and b to be equal: {a}, {b}') - - -def _read_output(str_pipe: IO[AnyStr], - queue_put: Queue.put, - list_append: list.append, - prefix: str = ''): - line: str - for line in str_pipe: - queue_put(f'{prefix}{line}') - list_append(line) - - # TODO: Has this any effect? - # str_pipe.close() - - -def _write_output(queue_get: Queue.get): - # Take items out of queue until taken item is None. - for line in iter(queue_get, None): - sys.stdout.write(line) - - # Goal: We want to **capture** and **print** stdout/stderr while running the command. -# -# +# +# # https://docs.python.org/3/library/subprocess.html#using-the-subprocess-module -# +# # The recommended approach to invoking subprocesses is to use the run() function for all use cases it can handle. # For more advanced use cases, the underlying Popen interface can be used directly. -# +# # subprocess.run(): -# +# # - Run the command described by args. Wait for command to complete, # then return a CompletedProcess instance. -# +# # - capture_output: If capture_output is true, stdout and stderr will be captured. # When used, the internal Popen object is automatically created with stdout=PIPE and stderr=PIPE. # The stdout and stderr arguments may not be supplied at the same time as capture_output. -# +# # - Conclusion: One cannot print and capture stdout/stderr at the same time. # - If we need to stream output as it appears in real time, we can use Popen instead. # (https://csatlas.com/python-subprocess-run-stdout-stderr/) -# +# # subprocess.Popen(): -# +# # - args: Should be a sequence of program arguments. By default, the program to execute is the first item in args. # - Warning: For maximum reliability, use a fully qualified path for the executable. # To search for an unqualified name on PATH, use shutil.which(). @@ -68,18 +37,19 @@ def _write_output(queue_get: Queue.get): # Valid values are PIPE, DEVNULL, an existing file descriptor (a positive integer), # an existing file object with a valid file descriptor, and None. # PIPE indicates that a new pipe to the child should be created. -# +# # subprocess.PIPE: Most useful with Popen.communicate(). -# +# # Popen.communicate(): -# +# # - Read data from stdout and stderr, until end-of-file is reached. # - Note: The data read is buffered in memory, so do not use this method if the data size is large or unlimited. def execute_print_capture(command: list[str], encoding='UTF-8') -> tuple[int, str, str]: """ Executes the given command. - Stdout and stderr are printed in real time. + The stdout and stderr are printed in real time + and returned after the command has finished. :param command: Command to execute, e.g. ['ls', '-la', '/home'] :param encoding: @@ -127,3 +97,34 @@ def execute_print_capture(command: list[str], encoding='UTF-8') -> tuple[int, st t_write.join() return returncode, ''.join(out), ''.join(err) + + +class _Assert: + @staticmethod + def true(a): + if not a: + raise ValueError(f'Expected a to be true: {a}') + + @staticmethod + def equal(a, b): + if a != b: + raise ValueError(f'Expected a and b to be equal: {a}, {b}') + + +def _read_output(str_pipe: IO[AnyStr], + queue_put: Queue.put, + list_append: list.append, + prefix: str = ''): + line: str + for line in str_pipe: + queue_put(f'{prefix}{line}') + list_append(line) + + # TODO: Has this any effect? + # str_pipe.close() + + +def _write_output(queue_get: Queue.get): + # Take items out of queue until taken item is None. + for line in iter(queue_get, None): + sys.stdout.write(line) diff --git a/src/p1st/exec_produce_chunks.py b/src/p1st/exec_produce_chunks.py index 0b0b080..974ea2a 100644 --- a/src/p1st/exec_produce_chunks.py +++ b/src/p1st/exec_produce_chunks.py @@ -14,15 +14,15 @@ def execute_produce_chunks(command: list[str], chunk_size: int = 1024 * 1024, ) -> int: """ - Executes the given command in a subprocess. + Executes the given `command` in a subprocess. - Stdout of the subprocess is saved in chunks. - The location of the chunks is determined by `get_chunk_path`. + The stdout of the subprocess is saved in chunks. + The location of the chunks is determined by `get_chunk_path(chunk_no)`. - A separate thread calls `handle_chunk` once a new chunk was saved. - It is the duty of `handle_chunk` to delete processed chunks from disk. + A separate thread calls `handle_chunk(chunk_no, last_chunk)` once a new chunk was saved. + It is the duty of `handle_chunk` to delete the processed chunk. - Stderr of the subprocess is printed to sys.stderr. + The stderr of the subprocess is printed to sys.stderr. :param command: :param get_chunk_path: diff --git a/src/p1st/receive_inform.py b/src/p1st/receive_inform.py index 5ea544f..fb9e58b 100644 --- a/src/p1st/receive_inform.py +++ b/src/p1st/receive_inform.py @@ -13,6 +13,19 @@ def receive_inform(in_pipe: IO[AnyStr], get_chunk_file: Callable[[Path, int], Path], ) -> None: """ + Creates a listening UNIX socket at `socket_file`. + + Waits for incoming messages `OK` and `EOF` on the socket. + + Once a message is received, + the file returned by `get_chunk_file(chunk_file_tmpl, chunk_no)` + is read and then written to `in_pipe`. + + If the received message was `EOF`, then the listening loop ends. + If the received message was `OK`, then chunk_no is increased by one and the loop continues. + + Finally, the socket is closed. + :param get_chunk_file: :param in_pipe: :param chunk_file_tmpl: diff --git a/src/p1st/repeat.py b/src/p1st/repeat.py index bd51a7f..5bd66f3 100644 --- a/src/p1st/repeat.py +++ b/src/p1st/repeat.py @@ -10,30 +10,40 @@ def repeat_until_successful(command: list[str], retries: int = 0, retry_delay_seconds: float = 5) -> str: """ - Executes the given `command`. + Executes the given `command` and returns its stdout. - If an error occurs, it creates a UNIX socket at - `socket_file` and waits for user input. + If the command has failed, it is executed at most `retires` times again, until successful. + Between each retry we wait for `retry_delay_seconds`. + + If no more retries are left, a UNIX socket is created at `socket_file` + and we wait for user input. """ while True: returncode: int - out: str - returncode, out, _err = execute_print_capture(command) + stdout: str + returncode, stdout, _stderr = execute_print_capture(command) + + # If no error occurred, return stdout of subprocess. if returncode == 0: - time.sleep(retry_delay_seconds) - return out + return stdout + # Else, an error has occurred. print(f'\n' f'Error while executing:\n' f'\t{command}\n' f'\tFor details, see above output.') + # If retry attempts are left, + # wait `retry_delay_seconds` and then execute the command again. if retries > 0: - print('\tRetrying the failed command.' - '\n') - retries = retries - 1 + print(f'\tRetrying the failed command in {retry_delay_seconds} seconds.' + f'\n') + time.sleep(retry_delay_seconds) + retries -= 1 continue + # Else, no more retry attempts are left. + # Print message and wait for user input. print(f'Info:\n' f'\tPlease fix the above error first. Then continue here:\n' f'\tsudo pacman -S --needed openbsd-netcat\n' diff --git a/src/p1st/unix_sock_input.py b/src/p1st/unix_sock_input.py index 2c680c2..3e7c1ea 100644 --- a/src/p1st/unix_sock_input.py +++ b/src/p1st/unix_sock_input.py @@ -14,7 +14,7 @@ def wait_for_message(socket_file: Path, Closes the UNIX socket. - :returns: The message that was received. + :return: The message that was received. """ # INSPIRATION: https://pymotw.com/3/socket/uds.html diff --git a/src/p1st/unix_socket_output.py b/src/p1st/unix_socket_output.py index 39e04e4..9f93063 100644 --- a/src/p1st/unix_socket_output.py +++ b/src/p1st/unix_socket_output.py @@ -4,6 +4,13 @@ from pathlib import Path def write_message(socket_file: Path, message: bytes): + """ + Writes `message` to the UNIX socket `socket_file`. + + :param socket_file: + :param message: + :return: + """ # INSPIRATION: https://pymotw.com/3/socket/uds.html