mirror of
https://codeberg.org/privacy1st/subprocess-util
synced 2024-12-22 22:06:05 +01:00
126 lines
4.1 KiB
Python
126 lines
4.1 KiB
Python
from queue import Queue
|
|
import sys
|
|
import threading
|
|
import subprocess
|
|
from typing import AnyStr, IO, Callable
|
|
|
|
|
|
class _Assert:
|
|
@staticmethod
|
|
def true(a):
|
|
if not a:
|
|
raise ValueError(f'Expected a to be true: {a}')
|
|
|
|
@staticmethod
|
|
def equal(a, b):
|
|
if a != b:
|
|
raise ValueError(f'Expected a and b to be equal: {a}, {b}')
|
|
|
|
|
|
def _read_output(pipe: IO[AnyStr], queue_put: Queue.put, list_append: list.append, prefix: str = ''):
|
|
line: str
|
|
for line in pipe:
|
|
func: Callable[[str], None]
|
|
for func in (queue_put, list_append):
|
|
func(f'{prefix}{line}')
|
|
|
|
# TODO: Has this any effect?
|
|
# pipe.close()
|
|
|
|
|
|
def _write_output(queue_get: Queue.get):
|
|
# Take items out of queue until taken item is None.
|
|
for line in iter(queue_get, None):
|
|
sys.stdout.write(line)
|
|
|
|
|
|
# Goal: We want to **capture** and **print** stdout/stderr while running the command.
|
|
#
|
|
#
|
|
# https://docs.python.org/3/library/subprocess.html#using-the-subprocess-module
|
|
#
|
|
# The recommended approach to invoking subprocesses is to use the run() function for all use cases it can handle.
|
|
# For more advanced use cases, the underlying Popen interface can be used directly.
|
|
#
|
|
# subprocess.run():
|
|
#
|
|
# - Run the command described by args. Wait for command to complete,
|
|
# then return a CompletedProcess instance.
|
|
#
|
|
# - capture_output: If capture_output is true, stdout and stderr will be captured.
|
|
# When used, the internal Popen object is automatically created with stdout=PIPE and stderr=PIPE.
|
|
# The stdout and stderr arguments may not be supplied at the same time as capture_output.
|
|
#
|
|
# - Conclusion: One cannot print and capture stdout/stderr at the same time.
|
|
# - If we need to stream output as it appears in real time, we can use Popen instead.
|
|
# (https://csatlas.com/python-subprocess-run-stdout-stderr/)
|
|
#
|
|
# subprocess.Popen():
|
|
#
|
|
# - args: Should be a sequence of program arguments. By default, the program to execute is the first item in args.
|
|
# - Warning: For maximum reliability, use a fully qualified path for the executable.
|
|
# To search for an unqualified name on PATH, use shutil.which().
|
|
# - stdin, stdout, stderr:
|
|
# Valid values are PIPE, DEVNULL, an existing file descriptor (a positive integer),
|
|
# an existing file object with a valid file descriptor, and None.
|
|
# PIPE indicates that a new pipe to the child should be created.
|
|
#
|
|
# subprocess.PIPE: Most useful with Popen.communicate().
|
|
#
|
|
# Popen.communicate():
|
|
#
|
|
# - Read data from stdout and stderr, until end-of-file is reached.
|
|
# - Note: The data read is buffered in memory, so do not use this method if the data size is large or unlimited.
|
|
def execute_print_capture(command: list[str], encoding='UTF-8') -> [int, list[str], list[str]]:
|
|
"""
|
|
Executes the given command.
|
|
|
|
Stdout and stderr are printed in real time.
|
|
|
|
:param command: Command to execute, e.g. ['ls', '-la', '/home']
|
|
:param encoding:
|
|
:return: (returncode, stdout, stderr)
|
|
"""
|
|
|
|
# INSPIRATION OF THIS IMPLEMENTATION:
|
|
# https://www.itcodar.com/python/subprocess-popen-cloning-stdout-and-stderr-both-to-terminal-and-variables.html
|
|
# ALTERNATIVE IMPLEMENTATION:
|
|
# https://stackoverflow.com/a/26611142
|
|
|
|
_Assert.equal(sys.stdout.encoding.upper(), encoding)
|
|
|
|
process = subprocess.Popen(
|
|
command,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
close_fds=True,
|
|
bufsize=1,
|
|
text=True,
|
|
encoding=encoding,
|
|
)
|
|
|
|
q = Queue()
|
|
out: list[str] = []
|
|
err: list[str] = []
|
|
|
|
t_out = threading.Thread(
|
|
target=_read_output, args=(process.stdout, q.put, out.append, "[STDOUT] "))
|
|
t_err = threading.Thread(
|
|
target=_read_output, args=(process.stderr, q.put, err.append, "[STDERR] "))
|
|
t_write = threading.Thread(
|
|
target=_write_output, args=(q.get,))
|
|
|
|
for t in (t_out, t_err, t_write):
|
|
t.daemon = True
|
|
t.start()
|
|
returncode = process.wait()
|
|
|
|
for t in (t_out, t_err):
|
|
t.join()
|
|
|
|
# This ends iter(queue_get, None), see above.
|
|
q.put(None)
|
|
t_write.join()
|
|
|
|
return returncode, out, err
|