Commit cd2194f1 authored by jpic ∞'s avatar jpic ∞ 💾
Browse files

Initial commit

parents
Pipeline #2214 passed with stages
in 25 seconds
image: yourlabs/python-arch
qa:
stage: test
script: flake8
pytest:
stage: test
script:
- pip install --user -e .
- pytest -vv --cov shlax --cov-report=xml:coverage.xml --junitxml=report.xml --cov-report=term-missing --strict tests
- CI_PROJECT_PATH=yourlabs/shlax CI_BUILD_REPO=https://github.com/yourlabs/cli2 codecov-bash -f coverage.xml
artifacts:
reports:
junit: report.xml
pypi:
stage: deploy
script: pypi-release
only: [tags]
Shlax: Beautiful Async Subprocess executor
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Why?
====
In Python we now have async subprocesses which allows to execute several
subprocesses at the same time. The purpose of this library is to:
- provide an acceptable asyncio subprocess wrapper for my syntaxic taste,
- can stream stderr and stdout in real time while capturing it,
- real time output must be prefixed for when you execute several commands at
the time so that you know which line is for which process, like with
docker-compose logs,
- output coloration in real time with regexps to make even more readable.
This code was copy/pasted between projects and finally extracted on its own.
Usage
=====
Basics
------
Basic example, this will both stream output and capture it:
.. code-block:: python
from shlax import Subprocess
proc = await Subprocess('echo hi').wait()
print(proc.rc, proc.out, proc.err, proc.out_raw, proc.err_raw)
Longer
------
If you want to start the command and wait for completion elsewhere then call
any of ``start()`` and ``wait()``, or both, explicitely:
.. code-block:: python
proc = Subprocess('echo hi')
await proc.start() # start the process
await proc.wait() # wait for completion
Proc alias
----------
Note that shlax defines an alias ``Proc`` to ``Subprocess`` so this also works:
.. code-block:: python
from shlax import Proc
proc = await Proc('echo hi').wait()
Quiet
-----
To disable real time output streaming use the ``quiet`` argument:
.. code-block:: python
proc = await Subprocess('echo hi', quiet=True).wait()
Prefix
------
Using prefixes, you can have real time outputs of parallel commands and at the
same time know which output belongs to which process:
.. code-block:: python
proc0 = Subprocess('find /', prefix='first')
proc1 = Subprocess('find /', prefix='second')
await asyncio.gather(proc0.wait(), proc1.wait())
Coloration and output patching
------------------------------
You can add coloration or patch real time output with regexps, note that it
will be applied line by line:
.. code-block:: python
import sys
regexps = {
'^(.*).py$': '{cyan}\\1',
}
await asyncio.gather(*[
Subprocess(
f'find {path}',
regexps=regexps,
).wait()
for path in sys.path
])
Where is the rest?
==================
Shlax used to be the name of a much more ambitious poc-project that has been
extracted in two projects with clear boundaries, namely `sysplan
<https://yourlabs.io/oss/sysplan>`_ and `podplan
<https://yourlabs.io/oss/podplan>`_ which are still in alpha state, but Shlax
as it is feature complete and stable.
from setuptools import setup
setup(
name='shlax',
versioning='dev',
setup_requires='setupmeta',
extras_require=dict(
test=[
'pytest',
'pytest-cov',
'pytest-asyncio',
],
),
author='James Pic',
author_email='jamespic@gmail.com',
url='https://yourlabs.io/oss/shlax',
include_package_data=True,
license='MIT',
keywords='async subprocess',
python_requires='>=3',
)
from .subprocess import Subprocess # noqa
Proc = Subprocess # noqa
from .colors import colors, c # noqa
theme = dict(
cyan='\033[38;5;51m',
cyan1='\033[38;5;87m',
cyan2='\033[38;5;123m',
cyan3='\033[38;5;159m',
blue='\033[38;5;33m',
blue1='\033[38;5;69m',
blue2='\033[38;5;75m',
blue3='\033[38;5;81m',
blue4='\033[38;5;111m',
blue5='\033[38;5;27m',
green='\033[38;5;10m',
green1='\033[38;5;2m',
green2='\033[38;5;46m',
green3='\033[38;5;47m',
green4='\033[38;5;48m',
green5='\033[38;5;118m',
green6='\033[38;5;119m',
green7='\033[38;5;120m',
purple='\033[38;5;5m',
purple1='\033[38;5;6m',
purple2='\033[38;5;13m',
purple3='\033[38;5;164m',
purple4='\033[38;5;165m',
purple5='\033[38;5;176m',
purple6='\033[38;5;145m',
purple7='\033[38;5;213m',
purple8='\033[38;5;201m',
red='\033[38;5;1m',
red1='\033[38;5;9m',
red2='\033[38;5;196m',
red3='\033[38;5;160m',
red4='\033[38;5;197m',
red5='\033[38;5;198m',
red6='\033[38;5;199m',
yellow='\033[38;5;226m',
yellow1='\033[38;5;227m',
yellow2='\033[38;5;226m',
yellow3='\033[38;5;229m',
yellow4='\033[38;5;220m',
yellow5='\033[38;5;230m',
gray='\033[38;5;250m',
gray1='\033[38;5;251m',
gray2='\033[38;5;252m',
gray3='\033[38;5;253m',
gray4='\033[38;5;254m',
gray5='\033[38;5;255m',
gray6='\033[38;5;249m',
pink='\033[38;5;197m',
pink1='\033[38;5;198m',
pink2='\033[38;5;199m',
pink3='\033[38;5;200m',
pink4='\033[38;5;201m',
pink5='\033[38;5;207m',
pink6='\033[38;5;213m',
orange='\033[38;5;202m',
orange1='\033[38;5;208m',
orange2='\033[38;5;214m',
orange3='\033[38;5;220m',
orange4='\033[38;5;172m',
orange5='\033[38;5;166m',
reset='\033[0m',
)
class Colors:
def __init__(self, **theme):
for name, value in theme.items():
setattr(self, name, value)
setattr(self, f'b{name}', value.replace('[', '[1;'))
c = colors = Colors(**theme)
import asyncio
import functools
import re
import sys
from .colors import colors
class SubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, proc):
self.proc = proc
self.output = bytearray()
def pipe_data_received(self, fd, data):
if fd == 1:
self.proc.stdout(data)
elif fd == 2:
self.proc.stderr(data)
def process_exited(self):
self.proc.exit_future.set_result(True)
class Subprocess:
colors = colors
# arbitrary list of colors
prefix_colors = (
colors.cyan,
colors.blue,
colors.green,
colors.purple,
colors.red,
colors.yellow,
colors.gray,
colors.pink,
colors.orange,
)
# class variables, meant to grow as new prefixes are discovered to ensure
# output alignment
prefixes = dict()
prefix_length = 0
def __init__(
self,
*args,
quiet=None,
prefix=None,
regexps=None,
write=None,
flush=None,
):
if len(args) == 1 and ' ' in args[0]:
args = ['sh', '-euc', args[0]]
self.cmd = ' '.join(args)
self.args = args
self.quiet = quiet if quiet is not None else False
self.prefix = prefix
self.write = write or sys.stdout.buffer.write
self.flush = flush or sys.stdout.flush
self.started = False
self.waited = False
self.out_raw = bytearray()
self.err_raw = bytearray()
self.regexps = dict()
if regexps:
for search, replace in regexps.items():
if isinstance(search, str):
search = search.encode()
search = re.compile(search)
replace = replace.format(**self.colors.__dict__).encode()
self.regexps[search] = replace
async def start(self, wait=True):
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
self.exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
self.transport, self.protocol = await loop.subprocess_exec(
lambda: SubprocessProtocol(self),
*self.args,
stdin=None,
)
self.started = True
async def wait(self, *args, **kwargs):
if not self.started:
await self.start()
if not self.waited:
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await self.exit_future
# Close the stdout pipe.
self.transport.close()
self.waited = True
return self
def stdout(self, data):
self.out_raw.extend(data)
if not self.quiet:
self.output(data)
def stderr(self, data):
self.err_raw.extend(data)
if not self.quiet:
self.output(data)
@functools.cached_property
def out(self):
return self.out_raw.decode().strip()
@functools.cached_property
def err(self):
return self.err_raw.decode().strip()
@functools.cached_property
def rc(self):
return self.transport.get_returncode()
def output(self, data, highlight=True, flush=True):
for line in data.strip().split(b'\n'):
line = [self.highlight(line) if highlight else line]
if self.prefix:
line = self.prefix_line() + line
line.append(b'\n')
line = b''.join(line)
self.write(line)
if flush:
self.flush()
def highlight(self, line, highlight=True):
if not highlight or (
b'\x1b[' in line
or b'\033[' in line
or b'\\e[' in line
):
return line
for search, replace in self.regexps.items():
line = re.sub(search, replace, line)
line = line + self.colors.reset.encode()
return line
def prefix_line(self):
if self.prefix not in self.prefixes:
self.prefixes[self.prefix] = self.prefix_colors[len(self.prefixes)]
if len(self.prefix) > self.prefix_length:
type(self).prefix_length = len(self.prefix)
return [
self.prefixes[self.prefix].encode(),
b' ' * (self.prefix_length - len(self.prefix)),
self.prefix.encode(),
b' ',
self.colors.reset.encode(),
b'| '
]
import shlax
def test_colors():
assert shlax.colors.cyan == '\u001b[38;5;51m'
assert shlax.colors.bcyan == '\u001b[1;38;5;51m'
assert shlax.colors.reset == '\u001b[0m'
import pytest
from unittest.mock import Mock, call
from shlax import Proc
@pytest.mark.asyncio
@pytest.mark.parametrize(
'args',
(
['sh', '-c', 'echo hi'],
['echo hi'],
['sh -c "echo hi"'],
)
)
async def test_proc(args):
proc = Proc(*args, quiet=True)
assert not proc.waited
assert not proc.started
await proc.wait()
assert proc.waited
assert proc.started
assert proc.out == 'hi'
assert proc.err == ''
assert proc.out_raw == b'hi\n'
assert proc.err_raw == b''
assert proc.rc == 0
@pytest.mark.asyncio
async def test_wait_unbound():
proc = await Proc('echo hi', quiet=True).wait()
assert proc.out == 'hi'
@pytest.mark.asyncio
async def test_rc_1():
proc = await Proc(
'NON EXISTING COMMAND',
write=Mock(),
).wait()
assert proc.rc != 0
proc.write.assert_called_once_with(
b'sh: line 1: NON: command not found\x1b[0m\n'
)
@pytest.mark.asyncio
async def test_prefix():
"""
Test output prefixes for when executing multiple commands in parallel.
"""
Proc.prefix_length = 0 # reset
write = Mock()
await Proc(
'echo hi',
write=write,
prefix='test_prefix',
).wait()
await Proc(
'echo hi',
write=write,
prefix='test_prefix_1'
).wait()
await Proc(
'echo hi',
write=write,
prefix='test_prefix',
).wait()
assert write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[1].encode()
# padding has been added because of output1
+ b'test_prefix_1 '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
# padding has been added because of output1
+ b' test_prefix '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
)
]
@pytest.mark.asyncio
async def test_prefix_multiline():
Proc.prefix_length = 0 # reset
proc = await Proc(
'echo -e "a\nb"',
write=Mock(),
prefix='test_prefix',
).wait()
assert proc.write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| a'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
# padding has been added because of output1
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| b'
+ Proc.colors.reset.encode()
+ b'\n'
),
]
@pytest.mark.asyncio
async def test_highlight():
"""
Test that we can color output with regexps.
"""
proc = await Proc(
'echo hi',
write=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.write.assert_called_once_with(b'h\x1b[38;5;51mi\x1b[0m\n')
@pytest.mark.asyncio
async def test_highlight_if_not_colored():
"""
Test that coloration does not apply on output that is already colored.
"""
proc = await Proc(
'echo -e h"\\e[31m"i',
write=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.write.assert_called_once_with(b'h\x1b[31mi\n')
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment