mirror of
https://github.com/xonsh/xonsh.git
synced 2025-03-04 08:24:40 +01:00
feat: add superhelp and additional context via new FuncAlias (#5366)
### Goals * Make callable aliases transparent. * Catch errors in callable aliases and show the name of the source. * Show additional attributes: thredable, capturable. * Closes #5266 ## Exception ### Before ```xsh aliases['cd'] # <function xonsh.dirstack.cd> aliases['trace'] # <function xonsh.aliases.trace> aliases['null'] = lambda: 1/0 null # ZeroDivisionError: division by zero @aliases.register('catch') @aliases.register('me') @aliases.register('if') @aliases.register('you') @aliases.register('can') def _exc(args, stdin, stdout): for line in stdin.readlines(): print(line.strip() + '!', file=stdout, flush=True) return 1/0 if 'i' in $__ALIAS_NAME else 0 echo hey | catch | me | if | you | can # ZeroDivisionError: division by zero <--- ??? # hey!!!!! ``` ### After ```xsh aliases['cd'] # FuncAlias({'name': 'cd', 'func': 'cd'}) aliases['trace'] # FuncAlias({'name': 'trace', 'func': 'trace', '__xonsh_threadable__': False}) $XONSH_SHOW_TRACEBACK=False $RAISE_SUBPROC_ERROR = False aliases['null'] = lambda: 1/0 null #Exception in thread {'cls': 'ProcProxyThread', 'name': 'Thread-15', 'func': FuncAlias({'name': 'null', 'func': '<lambda>'}), 'alias': 'null', 'pid': None} #ZeroDivisionError: division by zero @aliases.register('catch') @aliases.register('me') @aliases.register('if') @aliases.register('you') @aliases.register('can') def _exc(args, stdin, stdout): for line in stdin.readlines(): print(line.strip() + '!', file=stdout, flush=True) return 1/0 if 'i' in $__ALIAS_NAME else 0 echo hey | catch | me | if | you | can # Exception in thread {'cls': 'ProcProxyThread', 'name': 'Thread-8', 'func': FuncAlias({'name': 'if', 'func': '_exc'}), 'alias': 'if', 'pid': None} # ZeroDivisionError: division by zero # hey!!!!! ``` ## Superhelp ### Before ```xsh @aliases.register("hello") def _alias_hello(): """Show world.""" print('world') hello? # No manual entry for hello ``` ### After ```xsh @aliases.register("hello") def _alias_hello(): """Show world.""" print('world') hello? # FuncAlias({'name': 'hello', 'func': '_alias_hello'}): # Show world. ``` ## For community ⬇️ **Please click the 👍 reaction instead of leaving a `+1` or 👍 comment** --------- Co-authored-by: a <1@1.1> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
55b341d477
commit
bb394a8e84
8 changed files with 257 additions and 81 deletions
24
news/funcalias.rst
Normal file
24
news/funcalias.rst
Normal file
|
@ -0,0 +1,24 @@
|
|||
**Added:**
|
||||
|
||||
* Added FuncAlias to process callable aliases.
|
||||
* Added alias name printing in case of exception in alias.
|
||||
|
||||
**Changed:**
|
||||
|
||||
* <news item>
|
||||
|
||||
**Deprecated:**
|
||||
|
||||
* <news item>
|
||||
|
||||
**Removed:**
|
||||
|
||||
* <news item>
|
||||
|
||||
**Fixed:**
|
||||
|
||||
* Fixed showing alias description using superhelp e.g. ``which?``.
|
||||
|
||||
**Security:**
|
||||
|
||||
* <news item>
|
|
@ -9,7 +9,7 @@ import pytest
|
|||
from xonsh.aliases import Aliases, ExecAlias
|
||||
|
||||
|
||||
def cd(args, stdin=None, **kwargs):
|
||||
def cd(args, stdin=None):
|
||||
return args
|
||||
|
||||
|
||||
|
@ -30,10 +30,11 @@ def test_imports(xession):
|
|||
"o": ["omg", "lala"],
|
||||
"ls": ["ls", "- -"],
|
||||
"color_ls": ["ls", "--color=true"],
|
||||
"cd": cd,
|
||||
"cd": "FuncAlias",
|
||||
"indirect_cd": ["cd", ".."],
|
||||
}
|
||||
raw = ales._raw
|
||||
raw["cd"] = type(ales["cd"]).__name__
|
||||
assert raw == expected
|
||||
|
||||
|
||||
|
|
|
@ -1000,6 +1000,177 @@ def test_run_fail_not_on_path():
|
|||
assert out != "Hello world"
|
||||
|
||||
|
||||
ALIASES_THREADABLE_PRINT_CASES = [
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = False
|
||||
$XONSH_SHOW_TRACEBACK = False
|
||||
aliases['f'] = lambda: 1/0
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"^f1f1f1\nException in thread.*FuncAlias.*\nZeroDivisionError.*\nf2f2f2\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = True
|
||||
$XONSH_SHOW_TRACEBACK = False
|
||||
aliases['f'] = lambda: 1/0
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nException in thread.*\nZeroDivisionError: .*\nsubprocess.CalledProcessError.*\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = True
|
||||
$XONSH_SHOW_TRACEBACK = True
|
||||
aliases['f'] = lambda: 1/0
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nException in thread.*\nTraceback.*\nZeroDivisionError: .*\nsubprocess.CalledProcessError.*\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = False
|
||||
$XONSH_SHOW_TRACEBACK = True
|
||||
aliases['f'] = lambda: 1/0
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nException in thread.*FuncAlias.*\nTraceback.*\nZeroDivisionError.*\nf2f2f2\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = False
|
||||
$XONSH_SHOW_TRACEBACK = False
|
||||
aliases['f'] = lambda: (None, "I failed", 2)
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"^f1f1f1\nI failed\nf2f2f2\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = True
|
||||
$XONSH_SHOW_TRACEBACK = False
|
||||
aliases['f'] = lambda: (None, "I failed", 2)
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nI failed\nsubprocess.CalledProcessError.*\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = True
|
||||
$XONSH_SHOW_TRACEBACK = True
|
||||
aliases['f'] = lambda: (None, "I failed", 2)
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nI failed.*\nTraceback.*\nsubprocess.CalledProcessError.*\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = False
|
||||
$XONSH_SHOW_TRACEBACK = True
|
||||
aliases['f'] = lambda: (None, "I failed", 2)
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nI failed\nf2f2f2\n$",
|
||||
),
|
||||
]
|
||||
|
||||
ALIASES_UNTHREADABLE_PRINT_CASES = [
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = False
|
||||
$XONSH_SHOW_TRACEBACK = False
|
||||
aliases['f'] = lambda: 1/0
|
||||
aliases['f'].__xonsh_threadable__ = False
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"^f1f1f1\nException in.*FuncAlias.*\nZeroDivisionError.*\nf2f2f2\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = True
|
||||
$XONSH_SHOW_TRACEBACK = False
|
||||
aliases['f'] = lambda: 1/0
|
||||
aliases['f'].__xonsh_threadable__ = False
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nException in.*\nZeroDivisionError: .*\nsubprocess.CalledProcessError.*\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = True
|
||||
$XONSH_SHOW_TRACEBACK = True
|
||||
aliases['f'] = lambda: 1/0
|
||||
aliases['f'].__xonsh_threadable__ = False
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nException in.*\nTraceback.*\nZeroDivisionError: .*\nsubprocess.CalledProcessError.*\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = False
|
||||
$XONSH_SHOW_TRACEBACK = True
|
||||
aliases['f'] = lambda: 1/0
|
||||
aliases['f'].__xonsh_threadable__ = False
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nException in.*FuncAlias.*\nTraceback.*\nZeroDivisionError.*\nf2f2f2\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = False
|
||||
$XONSH_SHOW_TRACEBACK = False
|
||||
aliases['f'] = lambda: (None, "I failed", 2)
|
||||
aliases['f'].__xonsh_threadable__ = False
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"^f1f1f1\nI failed\nf2f2f2\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = True
|
||||
$XONSH_SHOW_TRACEBACK = False
|
||||
aliases['f'] = lambda: (None, "I failed", 2)
|
||||
aliases['f'].__xonsh_threadable__ = False
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nI failed\nsubprocess.CalledProcessError.*\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = True
|
||||
$XONSH_SHOW_TRACEBACK = True
|
||||
aliases['f'] = lambda: (None, "I failed", 2)
|
||||
aliases['f'].__xonsh_threadable__ = False
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nI failed.*\nTraceback.*\nsubprocess.CalledProcessError.*\n$",
|
||||
),
|
||||
(
|
||||
"""
|
||||
$RAISE_SUBPROC_ERROR = False
|
||||
$XONSH_SHOW_TRACEBACK = True
|
||||
aliases['f'] = lambda: (None, "I failed", 2)
|
||||
aliases['f'].__xonsh_threadable__ = False
|
||||
echo f1f1f1 ; f ; echo f2f2f2
|
||||
""",
|
||||
"f1f1f1\nI failed\nf2f2f2\n$",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@skip_if_on_windows
|
||||
@pytest.mark.parametrize(
|
||||
"case", ALIASES_THREADABLE_PRINT_CASES + ALIASES_UNTHREADABLE_PRINT_CASES
|
||||
)
|
||||
def test_aliases_print(case):
|
||||
cmd, match = case
|
||||
out, err, ret = run_xonsh(cmd=cmd, single_command=False)
|
||||
assert re.match(
|
||||
match, out, re.MULTILINE | re.DOTALL
|
||||
), f"\nFailed:\n```\n{cmd.strip()}\n```,\nresult: {out!r}\nexpected: {match!r}."
|
||||
|
||||
|
||||
@skip_if_on_windows
|
||||
@pytest.mark.parametrize("interactive", [True, False])
|
||||
def test_raise_subproc_error_with_show_traceback(monkeypatch, interactive):
|
||||
|
|
|
@ -2122,7 +2122,7 @@ def test_print_exception_error(xession, capsys):
|
|||
match,
|
||||
cap.err,
|
||||
re.MULTILINE | re.DOTALL,
|
||||
), f"Assert: {cap.err!r} not matched with {match!r}"
|
||||
), f"\nAssert: {cap.err!r},\nexpected: {match!r}"
|
||||
|
||||
with xession.env.swap(XONSH_SHOW_TRACEBACK=True):
|
||||
try:
|
||||
|
@ -2130,9 +2130,9 @@ def test_print_exception_error(xession, capsys):
|
|||
except subprocess.CalledProcessError:
|
||||
print_exception(msg="MSG")
|
||||
cap = capsys.readouterr()
|
||||
match = ".*Traceback.*subprocess.CalledProcessError: Command .* returned non-zero exit status .*\nMSG\n"
|
||||
match = ".*Traceback.*subprocess.CalledProcessError: Command .* returned non-zero exit status .*MSG\n"
|
||||
assert re.match(
|
||||
match,
|
||||
cap.err,
|
||||
re.MULTILINE | re.DOTALL,
|
||||
), f"Assert: {cap.err!r} not matched with {match!r}"
|
||||
), f"\nAssert: {cap.err!r},\nexpected {match!r}"
|
||||
|
|
|
@ -54,6 +54,37 @@ def EXEC_ALIAS_RE():
|
|||
return re.compile(r"@\(|\$\(|!\(|\$\[|!\[|\&\&|\|\||\s+and\s+|\s+or\s+|[>|<]")
|
||||
|
||||
|
||||
class FuncAlias:
|
||||
"""Provides a callable alias for xonsh commands."""
|
||||
|
||||
attributes_show = ["__xonsh_threadable__", "__xonsh_capturable__"]
|
||||
attributes_inherit = attributes_show + ["__doc__"]
|
||||
|
||||
def __init__(self, name, func):
|
||||
self.__name__ = self.name = name
|
||||
self.func = func
|
||||
for attr in self.attributes_inherit:
|
||||
if (val := getattr(func, attr, None)) is not None:
|
||||
self.__setattr__(attr, val)
|
||||
|
||||
def __repr__(self):
|
||||
r = {"name": self.name, "func": self.func.__name__}
|
||||
r |= {
|
||||
attr: val
|
||||
for attr in self.attributes_show
|
||||
if (val := getattr(self, attr, None)) is not None
|
||||
}
|
||||
return f"FuncAlias({repr(r)})"
|
||||
|
||||
def __call__(
|
||||
self, args=None, stdin=None, stdout=None, stderr=None, spec=None, stack=None
|
||||
):
|
||||
func_args = [args, stdin, stdout, stderr, spec, stack][
|
||||
: len(inspect.signature(self.func).parameters)
|
||||
]
|
||||
return self.func(*func_args)
|
||||
|
||||
|
||||
class Aliases(cabc.MutableMapping):
|
||||
"""Represents a location to hold and look up aliases."""
|
||||
|
||||
|
@ -182,6 +213,8 @@ class Aliases(cabc.MutableMapping):
|
|||
else:
|
||||
# need to exec alias
|
||||
self._raw[key] = ExecAlias(val, filename=f)
|
||||
elif isinstance(val, types.FunctionType):
|
||||
self._raw[key] = FuncAlias(key, val)
|
||||
else:
|
||||
self._raw[key] = val
|
||||
|
||||
|
@ -225,7 +258,7 @@ class Aliases(cabc.MutableMapping):
|
|||
|
||||
|
||||
class ExecAlias:
|
||||
"""Provides a callable alias for xonsh source code."""
|
||||
"""Provides an exec alias for xonsh source code."""
|
||||
|
||||
def __init__(self, src, filename="<exec-alias>"):
|
||||
"""
|
||||
|
|
|
@ -9,7 +9,6 @@ licensed to the Python Software foundation under a Contributor Agreement.
|
|||
|
||||
import collections.abc as cabc
|
||||
import functools
|
||||
import inspect
|
||||
import io
|
||||
import os
|
||||
import signal
|
||||
|
@ -274,7 +273,7 @@ def parse_proxy_return(r, stdout, stderr):
|
|||
stdout.write(str(r[0]))
|
||||
stdout.flush()
|
||||
if rlen > 1 and r[1] is not None:
|
||||
stderr.write(str(r[1]))
|
||||
stderr.write(xt.endswith_newline(str(r[1])))
|
||||
stderr.flush()
|
||||
if rlen > 2 and isinstance(r[2], int):
|
||||
cmd_result = r[2]
|
||||
|
@ -285,69 +284,6 @@ def parse_proxy_return(r, stdout, stderr):
|
|||
return cmd_result
|
||||
|
||||
|
||||
def proxy_zero(f, args, stdin, stdout, stderr, spec, stack):
|
||||
"""Calls a proxy function which takes no parameters."""
|
||||
return f()
|
||||
|
||||
|
||||
def proxy_one(f, args, stdin, stdout, stderr, spec, stack):
|
||||
"""Calls a proxy function which takes one parameter: args"""
|
||||
return f(args)
|
||||
|
||||
|
||||
def proxy_two(f, args, stdin, stdout, stderr, spec, stack):
|
||||
"""Calls a proxy function which takes two parameter: args and stdin."""
|
||||
return f(args, stdin)
|
||||
|
||||
|
||||
def proxy_three(f, args, stdin, stdout, stderr, spec, stack):
|
||||
"""Calls a proxy function which takes three parameter: args, stdin, stdout."""
|
||||
return f(args, stdin, stdout)
|
||||
|
||||
|
||||
def proxy_four(f, args, stdin, stdout, stderr, spec, stack):
|
||||
"""Calls a proxy function which takes four parameter: args, stdin, stdout,
|
||||
and stderr.
|
||||
"""
|
||||
return f(args, stdin, stdout, stderr)
|
||||
|
||||
|
||||
def proxy_five(f, args, stdin, stdout, stderr, spec, stack):
|
||||
"""Calls a proxy function which takes four parameter: args, stdin, stdout,
|
||||
stderr, and spec.
|
||||
"""
|
||||
return f(args, stdin, stdout, stderr, spec)
|
||||
|
||||
|
||||
PROXIES = (proxy_zero, proxy_one, proxy_two, proxy_three, proxy_four, proxy_five)
|
||||
|
||||
|
||||
def partial_proxy(f):
|
||||
"""Dispatches the appropriate proxy function based on the number of args."""
|
||||
numargs = 0
|
||||
|
||||
for name, param in inspect.signature(f).parameters.items():
|
||||
# handle *args/**kwargs signature
|
||||
if param.kind in {param.VAR_KEYWORD, param.VAR_POSITIONAL}:
|
||||
numargs = 6
|
||||
break
|
||||
if (
|
||||
param.kind == param.POSITIONAL_ONLY
|
||||
or param.kind == param.POSITIONAL_OR_KEYWORD
|
||||
):
|
||||
numargs += 1
|
||||
elif name in xt.ALIAS_KWARG_NAMES and param.kind == param.KEYWORD_ONLY:
|
||||
numargs += 1
|
||||
if numargs < 6:
|
||||
return functools.partial(PROXIES[numargs], f)
|
||||
elif numargs == 6:
|
||||
# don't need to partial.
|
||||
return f
|
||||
else:
|
||||
e = "Expected proxy with 6 or fewer arguments for {}, not {}"
|
||||
raise xt.XonshError(e.format(", ".join(xt.ALIAS_KWARG_NAMES), numargs))
|
||||
|
||||
|
||||
def get_proc_proxy_name(cls):
|
||||
func_name = cls.f
|
||||
if type(cls.f) is functools.partial:
|
||||
|
@ -409,8 +345,7 @@ class ProcProxyThread(threading.Thread):
|
|||
env : Mapping, optional
|
||||
Environment mapping.
|
||||
"""
|
||||
self.orig_f = f
|
||||
self.f = partial_proxy(f)
|
||||
self.f = f
|
||||
self.args = args
|
||||
self.pid = None
|
||||
self.returncode = None
|
||||
|
@ -799,8 +734,7 @@ class ProcProxy:
|
|||
close_fds=False,
|
||||
env=None,
|
||||
):
|
||||
self.orig_f = f
|
||||
self.f = partial_proxy(f)
|
||||
self.f = f
|
||||
self.args = args
|
||||
self.pid = os.getpid()
|
||||
self.returncode = None
|
||||
|
|
|
@ -488,10 +488,18 @@ class SubprocSpec:
|
|||
except FileNotFoundError as ex:
|
||||
cmd0 = self.cmd[0]
|
||||
if len(self.cmd) == 1 and cmd0.endswith("?"):
|
||||
with contextlib.suppress(OSError):
|
||||
return self.cls(
|
||||
["man", cmd0.rstrip("?")], bufsize=bufsize, **kwargs
|
||||
cmdq = cmd0.rstrip("?")
|
||||
if cmdq in XSH.aliases:
|
||||
alias = XSH.aliases[cmdq]
|
||||
descr = (
|
||||
repr(alias) + (":\n" + doc)
|
||||
if (doc := getattr(alias, "__doc__", ""))
|
||||
else ""
|
||||
)
|
||||
return self.cls(["echo", descr], bufsize=bufsize, **kwargs)
|
||||
else:
|
||||
with contextlib.suppress(OSError):
|
||||
return self.cls(["man", cmdq], bufsize=bufsize, **kwargs)
|
||||
e = f"xonsh: subprocess mode: command not found: {repr(cmd0)}"
|
||||
env = XSH.env
|
||||
sug = xt.suggest_commands(cmd0, env)
|
||||
|
@ -701,7 +709,7 @@ class SubprocSpec:
|
|||
if not callable(self.alias):
|
||||
return
|
||||
# check that we actual need the stack
|
||||
sig = inspect.signature(self.alias)
|
||||
sig = inspect.signature(getattr(self.alias, "func", self.alias))
|
||||
if len(sig.parameters) <= 5 and "stack" not in sig.parameters:
|
||||
return
|
||||
# compute the stack, and filter out these build methods
|
||||
|
|
|
@ -1116,7 +1116,7 @@ def display_colored_error_message(exc_info, strip_xonsh_error_types=True, limit=
|
|||
content = traceback.format_exception(*exc_info, limit=limit)
|
||||
|
||||
if no_trace_and_raise_subproc_error and "Error:" in content[-1]:
|
||||
content = [content[-1].rstrip()]
|
||||
content = [content[-1]]
|
||||
|
||||
traceback_str = "".join([v for v in content])
|
||||
|
||||
|
@ -1133,7 +1133,7 @@ def display_colored_error_message(exc_info, strip_xonsh_error_types=True, limit=
|
|||
lexer = pygments.lexers.python.PythonTracebackLexer()
|
||||
tokens = list(pygments.lex(traceback_str, lexer=lexer))
|
||||
# this goes to stdout, but since we are interactive it doesn't matter
|
||||
print_color(tokens, end="\n", file=sys.stderr)
|
||||
print_color(tokens, end="", file=sys.stderr)
|
||||
return
|
||||
|
||||
|
||||
|
@ -2858,3 +2858,8 @@ def unquote(s: str, chars="'\""):
|
|||
if len(s) >= 2 and s[0] == s[-1] and s[0] in chars:
|
||||
return s[1:-1]
|
||||
return s
|
||||
|
||||
|
||||
def endswith_newline(s: str):
|
||||
"""Force one new line character end to string."""
|
||||
return s.rstrip("\n") + "\n"
|
||||
|
|
Loading…
Add table
Reference in a new issue