Commit fd4fd42e authored by ∞'s avatar 💻

Code split into multiple files, some hacks removed

parent 49780036
This diff is collapsed.
# flake8: noqa
"""cli2 is like click but as laxist as docopts."""
from .colors import * # noqa
from .exceptions import Cli2Exception, Cli2ArgsException
from .parser import Parser
from .introspection import docfile, Callable, Importable, DocDescriptor
from .command import command, Command
from .console_script import ConsoleScript, Group
from .test import autotest
from .cli import debug, docmod, help, run
"""
cli2 makes your python callbacks work on CLI too !
cli2 provides sub-commands to introspect python modules or callables docstrings
or to execute callables or help working with cli2 itself.
"""
import os
import textwrap
import types
from .console_script import ConsoleScript, Group
from .command import command, option
from .exceptions import Cli2Exception, Cli2ArgsException
from .colors import GREEN, RED, RESET, YELLOW
from .introspection import docfile, Callable, Importable
def docmod(module_name):
"""Docstring for a module in dotted path.
Example: cli2 docmod cli2
"""
return Group.factory(module_name).doc
@command(color=GREEN)
def help(*args):
"""
Get help for a command.
Example::
$ cli2 help help
"""
console_script = ConsoleScript.singleton
if not args:
# show console script documentation
yield console_script.doc
else:
# show command documentation if possible
if args[0] in console_script:
yield console_script[args[0]].doc
else:
importable = Importable.factory(args[0])
if importable.target and not importable.is_module:
yield importable.doc
elif importable.module:
if not importable.target:
yield f'{RED}Cannot import {args[0]}{RESET}'
yield ' '.join([
YELLOW,
'Showing help for',
importable.module.__name__ + RESET
])
yield Group.factory(importable.module.__name__).doc
@command(color=GREEN)
def debug(callback, *args, **kwargs):
"""
Dump parsed variables.
Example usage::
cli2 debug test to=see --how -it=parses
"""
cs = console_script
parser = cs.parser
yield textwrap.dedent(f'''
Callable: {RED}{callback}{RESET}
Args: {YELLOW}{args}{RESET}
Kwargs: {YELLOW}{kwargs}{RESET}
console_script.parser.dashargs: {GREEN}{parser.dashargs}{RESET}
console_script.parser.dashkwargs: {GREEN}{parser.dashkwargs}{RESET}
''').strip()
@option('debug', help='Also print debug output', color=GREEN, alias='d')
def run(callback, *args, **kwargs):
"""
Execute a python callback on the command line.
To call your.module.callback('arg1', 'argN', kwarg1='foo'):
cli2 your.module.callback arg1 argN kwarg1=foo
You can also prefix arguments with a dash, those that contain equal sign
will end in dict console_script.parser.dashkwargs, those without equal
sign will end up in a list in console_script.parser.dashargs.
For examples, try `cli2 debug`.
For other commands, try `cli2 help`.
"""
if console_script.parser.options.get('debug', False):
print('HELLO')
cb = Callable.factory(callback)
if cb.target and not cb.is_module:
try:
result = cb(*args, **kwargs)
except Cli2ArgsException as e:
print(e)
print(cb.doc)
result = None
console_script.exit_code = 1
except Exception as e:
out = [f'{RED}Running {callback}(']
if args and kwargs:
out.append(f'*{args}, **{kwargs}')
elif args:
out.append(f'*{args}')
elif kwargs:
out.append(f'**{kwargs}')
out.append(f') raised {type(e)}{RESET}')
e.args = (e.args[0] + '\n' + cb.doc,) + e.args[1:]
raise
if isinstance(result, types.GeneratorType):
yield from result
else:
yield result
else:
if '.' in callback:
yield f'{RED}Could not import callback: {callback}{RESET}'
else:
yield f'{RED}Cannot run a module{RESET}: try {callback}.something'
if cb.module:
yield ' '.join([
'However we could import module',
f'{GREEN}{cb.module.__name__}{RESET}',
])
doc = docmod(cb.module.__name__)
if doc:
yield f'Showing help for {GREEN}{cb.module.__name__}{RESET}:'
yield doc
else:
return f'Docstring not found in {cb.module.__name__}'
elif callback != callback.split('.')[0]:
yield ' '.join([
RED,
'Could not import module:',
callback.split('.')[0],
RESET,
])
console_script = ConsoleScript(
__doc__,
default_command='run'
).add_commands(
help,
run,
debug,
command(color=GREEN)(docmod),
command(color=GREEN)(docfile),
)
import colorama
GREEN = colorama.Fore.GREEN
RED = colorama.Fore.RED
YELLOW = colorama.Fore.YELLOW
RESET = colorama.Style.RESET_ALL
import collections
from .introspection import Callable
from .colors import YELLOW
class Command(Callable):
def __init__(self, name, target, color=None, options=None, doc=None):
super().__init__(name, target)
self.color = color or YELLOW
self.options = options or collections.OrderedDict()
def command(**config):
def wrap(cb):
if 'cli2' not in cb.__dict__:
cb.cli2 = Command(cb.__name__, cb, **config)
else:
cb.cli2.__dict__.update(config)
return cb
return wrap
class Option:
def __init__(self, name, help=None, color=None, alias=None):
self.name = name
self.help = help or 'Undocumented option'
self.color = color or ''
self.alias = alias
def option(name, **cfg):
def wrap(cb):
if 'cli2' not in cb.__dict__:
cb = command()(cb)
cb.cli2.options[name] = Option(name, **cfg)
return cb
return wrap
import collections
import inspect
import colorama
import pprint
import sys
import types
from .colors import RESET
from .parser import Parser
from .command import Command
from .exceptions import Cli2ArgsException, Cli2Exception
from .introspection import Importable
class GroupDocDescriptor:
def __get__(self, obj, objtype):
ret = []
if 'value' in self.__dict__:
ret.append(self.value.strip() + '\n')
width = len(max(obj.keys(), key=len)) + 2
for name, cmd in obj.items():
line = ' ' + ''.join([
cmd.color,
name,
RESET,
(width - len(name)) * ' '
])
if cmd.target:
doc = inspect.getdoc(cmd.target)
if doc:
line += doc.split('\n')[0]
else:
line += cmd.name + str(inspect.signature(cmd.target))
else:
line += 'Callback not found'
ret.append(line)
return '\n'.join(ret)
def __set__(self, obj, value):
self.value = value
class Group(collections.OrderedDict):
doc = GroupDocDescriptor()
def __init__(self, name, doc=None):
self.name = name
if doc:
self.doc = doc
def add_help(self):
from .cli import help
self['help'] = Command('help', help)
return self
def add_module(self, module_name):
importable = Importable.factory(module_name)
if not importable.module:
raise Cli2Exception('Module not found' + importable.module)
for cb in importable.get_callables():
self[cb.name] = getattr(
cb.target, 'cli2', Command(cb.name, cb.target))
return self
def add_commands(self, *callbacks):
for cb in callbacks:
self[cb.__name__] = getattr(
cb, 'cli2',
Command(cb.__name__, cb)
)
return self
@classmethod
def factory(cls, module_name):
doc = Importable.factory(module_name).doc
return cls(module_name, doc).add_module(module_name)
class ConsoleScript(Group):
def __init__(self, doc=None, argv=None, default_command='help'):
ConsoleScript.singleton = self
self.default_command = default_command
argv = argv if argv is not None else sys.argv
Group.__init__(self, argv[0].split('/')[-1], doc)
self.argv = argv
self.exit_code = 0
self.add_help()
def __call__(self):
ConsoleScript.singleton = self
self.parser = Parser(self.argv[1:], self)
self.parser.parse()
colorama.init()
result = None
try:
result = self.call(self.parser.command)
if isinstance(result, (types.GeneratorType, list)):
for r in result:
self.result_handler(r)
else:
self.result_handler(result)
except Cli2ArgsException as e:
self.exit_code = 1
print('\n'.join([str(e), '', self.parser.command.doc]))
except Cli2Exception as e:
self.exit_code = 1
print(str(e))
except Exception:
self.exit_code = 1
raise
return self.exit_code
def call(self, command):
return command(*self.parser.funcargs, **self.parser.funckwargs)
def result_handler(self, result):
if isinstance(result, str):
print(result)
elif result is None:
pass
else:
pprint.PrettyPrinter(indent=4).pprint(result)
def result_handler_set(self, result_handler):
self.result_handler = result_handler.__get__(self, type(self))
ConsoleScript.singleton = ConsoleScript()
from .colors import RED, RESET
class Cli2Exception(Exception):
pass
class Cli2ArgsException(Cli2Exception):
def __init__(self, command, passed_args):
msg = ['Got arguments:'] if len(passed_args) else []
for arg, i in enumerate(passed_args, start=0):
msg.append('='.join(command.required_args[i], arg))
msg.append(
'Missing arguments: ' + ', '.join([*map(
lambda i: f'{RED}{i}{RESET}',
command.required_args[len(passed_args):],
)])
)
super().__init__('\n'.join(msg))
import inspect
import importlib
import types
import os
from .colors import GREEN, RED, RESET
from .exceptions import Cli2Exception, Cli2ArgsException
def docfile(filepath):
"""
Docstring for a file path.
Examples:
cli2 docfile cli2.py
"""
if not os.path.exists(filepath):
raise Cli2Exception(f'{RED}{filepath}{RESET} not found')
try:
co = compile(open(filepath).read(), filepath, 'exec')
except SyntaxError:
print(f'{RED}SyntaxError in {filepath}{RESET} shown below:')
raise
if co.co_consts and isinstance(co.co_consts[0], str):
docstring = co.co_consts[0]
else:
docstring = None
return docstring
class DocDescriptor:
def __get__(self, obj, objtype):
if obj.is_module:
if 'value' in self.__dict__:
return self.value
# Only show module docstring
from .cli import docfile
return docfile(obj.module.__file__)
elif obj.target:
# Show callable docstring + signature
ret = []
if callable(obj.target):
# TODO: enhance output of the signature help
ret.append(''.join([
f'Signature: {GREEN}{obj.name}{RESET}',
f'{inspect.signature(obj.target)}'
]))
if 'value' in self.__dict__:
ret.append(self.value)
else:
ret.append(inspect.getdoc(obj.target) or 'No docstring found')
if obj.options:
ret += ['', 'Extra CLI options:', '']
width = len(max(obj.options.keys(), key=len)) + 4
for name, option in obj.options.items():
ret.append(' ' + ''.join([
option.color,
f'--{name},-{option.alias}'
if option.alias else f'--{name}',
RESET,
(width - len(name) + 2) * ' ',
option.help,
]))
return '\n'.join(ret)
def __set__(self, obj, value):
self.value = value
class Importable:
doc = DocDescriptor()
def __init__(self, name, target, module=None):
self.name = name
self.target = target
if module:
self.module = module
elif isinstance(target, types.ModuleType):
self.module = target
elif target:
self.module = target.__module__
else:
self.module = None
def __str__(self):
return self.name
__repr__ = __str__
def __eq__(self, other):
return other.name == self.name and other.target == self.target
@classmethod
def factory(cls, name):
module = None
parts = name.split('.')
for i, part in reversed(list(enumerate(parts))):
modname = '.'.join(parts[:i + 1])
if not modname:
break
try:
module = importlib.import_module(modname)
except ImportError:
continue
else:
break
if module:
ret = module
for part in parts[i + 1:]:
if isinstance(ret, dict) and part in ret:
ret = ret.get(part)
elif isinstance(ret, list) and part.isnumeric():
ret = ret[int(part)]
else:
ret = getattr(ret, part, None)
else:
ret = None
return cls(name, ret, module)
def get_callables(self, whitelist=None, blacklist=None):
for name, member in inspect.getmembers(self.target):
if not callable(member):
continue
if isinstance(member, type):
continue
if whitelist and name not in whitelist:
continue
if blacklist and name in blacklist:
continue
if name.startswith('_'):
continue
yield Callable(name, member)
@property
def is_module(self):
return self.module == self.target
class Callable(Importable):
doc = DocDescriptor()
def __call__(self, *args, **kwargs):
if len(args) < len(self.required_args):
raise Cli2ArgsException(self, args)
return self.target(*args, **kwargs)
@property
def required_args(self):
if self.is_module:
return []
argspec = inspect.getfullargspec(self.target)
if argspec.defaults:
return argspec.args[:-len(argspec.defaults)]
else:
return argspec.args
from .command import Command
class Parser:
def __init__(self, argv_all, group=None):
self.argv_all = argv_all
self.root = group
self.group = group
self.command = None
# this parse() will provision
self.argv = []
self.funcargs = []
self.funckwargs = {}
self.dashargs = []
self.dashkwargs = {}
self.options = {}
def parse(self):
from .console_script import Group
for arg in self.argv_all:
if not self.command and arg in self.group:
item = self.group[arg]
if isinstance(item, Group):
self.group = item
elif isinstance(item, Command):
self.command = item
else:
self.argv.append(arg)
if not self.command:
self.command = self.group[self.group.default_command]
for arg in self.argv:
self.append(arg)
def get_option(self, name):
name = name.lstrip('-')
for option in self.command.options.values():
if name == option.name or name == option.alias:
return option
return False
def append(self, arg):
if '=' in arg:
if arg.startswith('-'):
key, value = arg.lstrip('-').split('=')
option = self.get_option(key)
if option:
self.options[option.name] = value
else:
self.dashkwargs[key] = value
else:
key, value = arg.split('=', 1)
self.funckwargs[key] = value
else:
if arg.startswith('-'):
stripped = arg.lstrip('-')
option = self.get_option(stripped)
if option:
self.options[option.name] = True
else:
self.dashargs.append(stripped)
else:
self.funcargs.append(arg)
import io
import os
import pkg_resources
import re
import shlex
import subprocess
import unittest
def entrypoint_get(name):
for ep in pkg_resources.iter_entry_points('console_scripts'):
if ep.name == name:
return ep
def autotest(path, cmd, ignore=None):
"""
The autowriting test pattern, minimal for testing cli2 scripts.
Example:
cli2.autotest(
'tests/djcli_save_user.txt',
'djcli save auth.User username="test"',
)
"""
name = cmd.split(' ')[0]
ep = entrypoint_get(name)
if not ep:
raise Exception('Could not find entrypoint {name}')
console_script = ep.load()
console_script.argv = shlex.split(cmd)
# for debugging this function, the following helps me:
# return print(cmd, console_script())
@unittest.mock.patch('sys.stderr', new_callable=io.StringIO)
@unittest.mock.patch('sys.stdout', new_callable=io.StringIO)
def test(mock_stdout, mock_stderr):
console_script()
return mock_stdout, mock_stderr
out, err = test()
out.seek(0)
test_out = out.read()
err.seek(0)
test_err = err.read()
fixture = '\n'.join([
'command: ' + cmd,
'retcode: ' + str(console_script.exit_code),
'stdout:',
test_out,
])
if test_err:
fixture += '\n'.join([
'stderr:',
test_err,
])
for r in ignore or []:
fixture = re.compile(r).sub(f'redacted', fixture)
if not os.path.exists(path):
dirname = '/'.join(path.split('/')[:-1])
if not os.path.exists(dirname):
os.makedirs(dirname)
with open(path, 'w+') as f:
f.write(fixture)
raise type('FixtureCreated', (Exception,), {})(
f'''
{path} was not in workding and was created with:
{fixture}
'''.strip(),
)
diff_cmd = 'diff -U 1 - "%s" | sed "1,2 d"' % path
proc = subprocess.Popen(
diff_cmd,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True
)
diff_out, diff_err = proc.communicate(input=fixture.encode('utf8'))
if diff_out:
raise type(f'''
DiffFound
- {cmd}
+ {path}
'''.strip(), (Exception,), {})('\n' + diff_out.decode('utf8'))
......@@ -18,8 +18,8 @@ import pytest
('help_module_attr_notfound', 'help cli2.skipppp'),
('docmod', 'docmod cli2'),
('docmod_noargs', 'docmod'),