improve completers (#4648)

* fix: pip -r appends spaces at the end

modularize completing output from subproc-out

* docs:

* fix: flake8

* fix: failing pip comp tests

* refactor: naming xonsh conflicts with actual package

the IDE completions don't work.
we add this naming convention instead.

* feat: option to filter after completion returned

this will help reduce some boilerplate, and we can enrich the filtering
behaviour

* feat: add gh completions

* fix: filtering out completions

* refactor: simplify invoking completer interface

* test: add fixture for xsh with os-env

* test: add tests for gh-completions

* fix: flake error

* fix: mypy errors and update gh completer tests

* fix: handle cross-platform line endings

* feat: include man,bash completer only if available

* todo: improve man page completions

* fix: failing man page tests

* fix: py 3.7 compatibility

* fix: qa error

* fix: stop dir completions

* feat: improve man page completions

now shows descriptions, recognizes more number of options correctly

* fix: update man page completions

* feat: support filtering based on display as well

* Update xonsh/completer.py

Co-authored-by: Gil Forsyth <gforsyth@users.noreply.github.com>

* style:

* test: xfail ptk-shell tests on windows

Co-authored-by: Gil Forsyth <gforsyth@users.noreply.github.com>
This commit is contained in:
Noorhteen Raja NJ 2022-01-27 21:22:36 +05:30 committed by GitHub
parent 278bd87d7c
commit e76115676b
Failed to generate hash of commit
19 changed files with 590 additions and 215 deletions

View file

@ -0,0 +1,23 @@
**Added:**
* completions from man page will now show the description for the options if available.
**Changed:**
* <news item>
**Deprecated:**
* <news item>
**Removed:**
* <news item>
**Fixed:**
* ``pip`` completer now handles path completions correctly
**Security:**
* <news item>

View file

@ -0,0 +1,21 @@
import pytest
from tests.tools import skip_if_not_has
pytestmark = skip_if_not_has("gh")
@pytest.mark.parametrize(
"line, exp",
[
["gh rep", {"repo"}],
["gh repo ", {"archive", "clone", "create", "delete", "edit", "fork"}],
],
)
def test_completions(line, exp, check_completer, xsh_with_env):
# use the actual PATH from os. Otherwise subproc will fail on windows. `unintialized python...`
comps = check_completer(line, prefix=None)
if callable(exp):
exp = exp()
assert comps.intersection(exp)

View file

@ -1,3 +1,6 @@
import json
import subprocess
import pytest
from xonsh.completers.commands import complete_xompletions
@ -37,17 +40,23 @@ def test_pip_list_re1(line):
assert complete_xompletions.matcher.search_completer(line) is None
def pip_installed():
out = subprocess.check_output(["pip", "list", "--format=json"]).decode()
pkgs = json.loads(out)
return {p["name"] for p in pkgs}
@pytest.mark.parametrize(
"line, prefix, exp",
[
["pip", "c", {"cache", "check", "config"}],
["pip show", "", {"setuptools", "wheel", "pip"}],
["pip show", "", pip_installed],
],
)
def test_completions(line, prefix, exp, check_completer, xession, os_env, monkeypatch):
def test_completions(line, prefix, exp, check_completer, xsh_with_env):
# use the actual PATH from os. Otherwise subproc will fail on windows. `unintialized python...`
monkeypatch.setattr(xession, "env", os_env)
comps = check_completer(line, prefix=prefix)
if callable(exp):
exp = exp()
assert comps.intersection(exp)

View file

@ -224,9 +224,16 @@ def xession(mock_xonsh_session) -> XonshSession:
@pytest.fixture
def xsh_with_aliases(mock_xonsh_session) -> XonshSession:
"""Xonsh mock-session with default set of aliases"""
return mock_xonsh_session("aliases")
@pytest.fixture
def xsh_with_env(mock_xonsh_session) -> XonshSession:
"""Xonsh mock-session with os.environ"""
return mock_xonsh_session("env")
@pytest.fixture(scope="session")
def completion_context_parse():
return CompletionContextParser().parse
@ -242,8 +249,35 @@ def check_completer(completer_obj):
"""Helper function to run completer and parse the results as set of strings"""
completer = completer_obj
def _factory(line: str, prefix="", send_original=False):
completions, _ = completer.complete_line(line, prefix=prefix)
def _factory(
line: str, prefix: "None|str" = "", send_original=False, complete_fn=None
):
"""
Parameters
----------
line
prefix
send_original
if True, return the original result from the completer (e.g. RichCompletion instances ...)
complete_fn
if given, use that to get the completions
Returns
-------
completions as set of string if not send
"""
if prefix is not None:
line += " " + prefix
if complete_fn is None:
completions, _ = completer.complete_line(line)
else:
ctx = completer_obj.parse(line)
out = complete_fn(ctx)
if isinstance(out, tuple):
completions = out[0]
else:
completions = out
values = {getattr(i, "value", i).strip() for i in completions}
if send_original:

BIN
tests/man1/man.1.gz Normal file

Binary file not shown.

View file

@ -1,27 +1,107 @@
import os
import subprocess
import pytest # noqa F401
from tools import skip_if_on_windows, skip_if_not_on_darwin
from xonsh.completers.man import complete_from_man
from tools import skip_if_on_windows
from xonsh.parsers.completion_context import (
CompletionContext,
CommandContext,
CommandArg,
)
@skip_if_on_windows
def test_man_completion(monkeypatch, tmpdir, xession):
tempdir = tmpdir.mkdir("test_man")
monkeypatch.setitem(
os.environ, "MANPATH", os.path.dirname(os.path.abspath(__file__))
@pytest.mark.parametrize(
"cmd,exp",
[
[
"yes",
{"--version", "--help"},
],
[
"man",
{
"--all",
"--apropos",
"--ascii",
"--catman",
"--config-file",
"--debug",
"--default",
"--ditroff",
"--encoding",
"--extension",
"--global-apropos",
"--gxditview",
"--help",
"--html",
"--ignore-case",
"--local-file",
"--locale",
"--location",
"--location-cat",
"--manpath",
"--match-case",
"--names-only",
"--nh",
"--nj",
"--no-subpages",
"--pager",
"--preprocessor",
"--prompt",
"--recode",
"--regex",
"--sections",
"--systems",
"--troff",
"--troff-device",
"--update",
"--usage",
"--version",
"--warnings",
"--whatis",
"--wildcard",
},
],
],
)
xession.env.update({"XONSH_DATA_DIR": str(tempdir)})
completions = complete_from_man(
CompletionContext(
CommandContext(args=(CommandArg("yes"),), arg_index=1, prefix="--")
def test_man_completion(xession, check_completer, cmd, exp):
xession.env["MANPATH"] = os.path.dirname(os.path.abspath(__file__))
completions = check_completer(cmd, complete_fn=complete_from_man, prefix="-")
assert completions == exp
@skip_if_not_on_darwin
@pytest.mark.parametrize(
"cmd,exp",
[
[
"ar",
{
"-L",
"-S",
"-T",
"-a",
"-b",
"-c",
"-d",
"-i",
"-m",
"-o",
"-p",
"-q",
"-r",
"-s",
"-t",
"-u",
"-x",
},
],
],
)
)
assert "--version" in completions
assert "--help" in completions
def test_bsd_man_page_completions(xession, check_completer, cmd, exp):
proc = subprocess.run([cmd, "--version"], stderr=subprocess.PIPE)
if (cmd == "ar" and proc.returncode != 1) or (
cmd == "man" and proc.stderr.strip() not in {b"man, version 1.6g"}
):
pytest.skip("A different man page version is installed")
# BSD & Linux have different man page version
completions = check_completer(cmd, complete_fn=complete_from_man, prefix="-")
assert completions == exp

View file

@ -9,6 +9,7 @@ import pyte
from xonsh.ptk_shell.shell import tokenize_ansi
from xonsh.shell import Shell
from tests.tools import ON_WINDOWS
@pytest.mark.parametrize(
@ -106,6 +107,10 @@ def test_tokenize_ansi(prompt_tokens, ansi_string_parts):
["2 * 3", "6"],
],
)
@pytest.mark.xfail(
ON_WINDOWS,
reason="Recent versions use Proactor event loop. This may need some handling",
)
def test_ptk_prompt(line, exp, ptk_shell, capsys):
inp, out, shell = ptk_shell
inp.send_text(f"{line}\nexit\n") # note: terminate with '\n'

View file

@ -1,6 +1,7 @@
"""Tests the xonsh lexer."""
import copy
import os
import shutil
import sys
import ast
import platform
@ -39,11 +40,19 @@ skip_if_on_unix = pytest.mark.skipif(not ON_WINDOWS, reason="Windows stuff")
skip_if_on_darwin = pytest.mark.skipif(ON_DARWIN, reason="not Mac friendly")
skip_if_not_on_darwin = pytest.mark.skipif(not ON_DARWIN, reason="Mac only")
skip_if_on_travis = pytest.mark.skipif(ON_TRAVIS, reason="not Travis CI friendly")
skip_if_pre_3_8 = pytest.mark.skipif(VER_FULL < (3, 8), reason="Python >= 3.8 feature")
def skip_if_not_has(exe: str):
has_exe = shutil.which(exe)
return pytest.mark.skipif(not has_exe, reason=f"{exe} is not available.")
def sp(cmd):
return subprocess.check_output(cmd, universal_newlines=True)

View file

@ -1,6 +1,5 @@
from xonsh.cli_utils import ArgparseCompleter
from xonsh.completers.tools import get_filter_function
from xonsh.parsers.completion_context import CommandContext
@ -10,8 +9,4 @@ def xonsh_complete(command: CommandContext):
from xonsh.main import parser
completer = ArgparseCompleter(parser, command=command)
fltr = get_filter_function()
for comp in completer.complete():
if fltr(comp, command.prefix):
yield comp
# todo: part of return value will have unfiltered=False/True. based on that we can use fuzzy to rank the results
return completer.complete(), False

25
xompletions/gh.py Normal file
View file

@ -0,0 +1,25 @@
"""Completers for gh CLI"""
from xonsh.completers.tools import sub_proc_get_output, completion_from_cmd_output
from xonsh.parsers.completion_context import CommandContext
def _complete(cmd, *args):
out, _ = sub_proc_get_output(cmd, "__complete", *args)
if out:
# directives
# shellCompDirectiveError 1
# shellCompDirectiveNoSpace 2
# shellCompDirectiveNoFileComp 4
# shellCompDirectiveFilterFileExt 8
# shellCompDirectiveFilterDirs 16
# todo: implement directive-numbers above
*lines, dir_num = out.decode().splitlines()
for ln in lines:
yield completion_from_cmd_output(ln)
def xonsh_complete(ctx: CommandContext):
cmd, *args = [arg.value for arg in ctx.args] + [ctx.prefix]
return _complete(cmd, *args)

View file

@ -1,14 +1,14 @@
from xonsh.completers.man import complete_from_man
from xonsh.completers.path import complete_dir
from xonsh.parsers.completion_context import CompletionContext, CommandContext
from xonsh.parsers.completion_context import CommandContext
def xonsh_complete(command: CommandContext):
def xonsh_complete(ctx: CommandContext):
"""
Completion for "rmdir", includes only valid directory names.
"""
opts = complete_from_man(CompletionContext(command))
comps, lp = complete_dir(command)
if len(comps) == 0 and len(opts) == 0:
raise StopIteration
return comps | opts, lp
# if starts with the given prefix then it will get completions from man page
if not ctx.prefix.startswith("-") and ctx.arg_index > 0:
comps, lprefix = complete_dir(ctx)
if not comps:
raise StopIteration # no further file completions
return comps, lprefix

View file

@ -610,7 +610,7 @@ class XonshSession:
self.modules_cache = {}
self.all_jobs = {}
self.completers = default_completers()
self.completers = default_completers(self.commands_cache)
self.builtins = get_default_builtins(execer)
self._initial_builtin_names = frozenset(vars(self.builtins))

View file

@ -9,6 +9,7 @@ from xonsh.completers.tools import (
RichCompletion,
apply_lprefix,
is_exclusive_completer,
get_filter_function,
)
from xonsh.built_ins import XSH
from xonsh.parsers.completion_context import CompletionContext, CompletionContextParser
@ -21,18 +22,48 @@ class Completer:
def __init__(self):
self.context_parser = CompletionContextParser()
def complete_line(self, line: str, prefix: str = None):
"""Handy wrapper to build completion-context when cursor is at the end"""
line = line.strip()
if prefix:
begidx = len(line) + 1
def parse(
self, text: str, cursor_index: "None|int" = None, ctx=None
) -> "CompletionContext":
"""Parse the given text
Parameters
----------
text
multi-line text
cursor_index
position of the cursor. If not given, then it is considered to be at the end.
ctx
Execution context
"""
cursor_index = len(text) if cursor_index is None else cursor_index
return self.context_parser.parse(text, cursor_index, ctx)
def complete_line(self, text: str):
"""Handy wrapper to build command-completion-context when cursor is at the end.
Notes
-----
suffix is not supported; text after last space is parsed as prefix.
"""
ctx = self.parse(text)
cmd_ctx = ctx.command
if not cmd_ctx:
raise RuntimeError("Only Command context is empty")
prefix = cmd_ctx.prefix
line = text
begidx = text.rfind(prefix)
endidx = begidx + len(prefix)
line = " ".join([line, prefix])
else:
line += " "
begidx = endidx = len(line)
return self.complete(
prefix, line, begidx, endidx, cursor_index=len(line), multiline_text=line
prefix,
line,
begidx,
endidx,
cursor_index=len(line),
multiline_text=line,
completion_context=ctx,
)
def complete(
@ -44,6 +75,7 @@ class Completer:
ctx=None,
multiline_text=None,
cursor_index=None,
completion_context=None,
):
"""Complete the string, given a possible execution context.
@ -74,16 +106,16 @@ class Completer:
Length of the prefix to be replaced in the completion.
"""
if multiline_text is not None and cursor_index is not None:
completion_context: tp.Optional[
CompletionContext
] = self.context_parser.parse(
if (
(multiline_text is not None)
and (cursor_index is not None)
and (completion_context is None)
):
completion_context: tp.Optional[CompletionContext] = self.parse(
multiline_text,
cursor_index,
ctx,
)
else:
completion_context = None
ctx = ctx or {}
return self.complete_from_context(
@ -140,6 +172,8 @@ class Completer:
def generate_completions(
completion_context, old_completer_args, trace: bool
) -> tp.Iterator[tp.Tuple[Completion, int]]:
filter_func = get_filter_function()
for name, func in XSH.completers.items():
try:
if is_contextual_completer(func):
@ -167,24 +201,38 @@ class Completer:
and completion_context is not None
and completion_context.command is not None
)
# -- set comp-defaults --
# the default is that the completer function filters out as necessary
# we can change that once fuzzy/substring matches are added
is_filtered = True
custom_lprefix = False
prefix = ""
if completing_contextual_command:
prefix = completion_context.command.prefix
elif old_completer_args is not None:
prefix = old_completer_args[0]
lprefix = len(prefix)
if isinstance(out, cabc.Sequence):
res, lprefix = out
# update comp-defaults from
res, lprefix_filtered = out
if isinstance(lprefix_filtered, bool):
is_filtered = lprefix_filtered
else:
lprefix = lprefix_filtered
custom_lprefix = True
else:
res = out
custom_lprefix = False
if completing_contextual_command:
lprefix = len(completion_context.command.prefix)
elif old_completer_args is not None:
lprefix = len(old_completer_args[0])
else:
lprefix = 0
if res is None:
continue
items = []
for comp in res:
if (not is_filtered) and (not filter_func(comp, prefix)):
continue
comp = Completer._format_completion(
comp,
completion_context,
@ -215,7 +263,10 @@ class Completer:
print("\nTRACE COMPLETIONS: Getting completions with context:")
sys.displayhook(completion_context)
lprefix = 0
completions = set()
# using dict to keep order py3.6+
completions = {}
query_limit = XSH.env.get("COMPLETION_QUERY_LIMIT")
for comp in self.generate_completions(
@ -224,7 +275,7 @@ class Completer:
trace,
):
completion, lprefix = comp
completions.add(completion)
completions[completion] = None
if query_limit and len(completions) >= query_limit:
if trace:
print(
@ -233,6 +284,7 @@ class Completer:
break
def sortkey(s):
# todo: should sort with prefix > substring > fuzzy
return s.lstrip(''''"''').lower()
# the last completer's lprefix is returned. other lprefix values are inside the RichCompletions.

View file

@ -5,10 +5,7 @@ from xonsh.completers.completer import (
remove_completer,
add_one_completer,
)
from xonsh.completers.tools import (
contextual_command_completer,
get_filter_function,
)
from xonsh.completers.tools import contextual_command_completer
from xonsh.parsers.completion_context import CommandContext
# for backward compatibility
@ -105,8 +102,7 @@ class CompleterAlias(xcli.ArgParserAlias):
def complete(
self,
line: xcli.Annotated["list[str]", xcli.Arg(nargs="...")],
prefix: "str | None" = None,
line: str,
):
"""Output the completions to stdout
@ -119,16 +115,18 @@ class CompleterAlias(xcli.ArgParserAlias):
Examples
--------
To get completions such as `git checkout`
To get completions such as ``pip install``
$ completer complete --prefix=check git
$ completer complete 'pip in'
To get ``pip`` sub-commands, pass the command with a space at the end
$ completer complete 'pip '
"""
from xonsh.completer import Completer
completer = Completer()
completions, prefix_length = completer.complete_line(
" ".join(line), prefix=prefix
)
completions, prefix_length = completer.complete_line(line)
self.out(f"Prefix Length: {prefix_length}")
for comp in completions:
@ -172,7 +170,4 @@ def complete_aliases(command: CommandContext):
return
possible = completer(command=command, alias=alias)
fltr = get_filter_function()
for comp in possible:
if fltr(comp, command.prefix):
yield comp
return possible, False

View file

@ -1,3 +1,4 @@
import contextlib
import functools
import importlib
import importlib.util as im_util
@ -121,9 +122,7 @@ class ModuleMatcher:
extra search paths to use if finding module on namespace package fails
"""
# list of pre-defined patterns. More can be added using the public method ``.wrap``
self._patterns: tp.Dict[str, str] = {
r"\bx?pip(?:\d|\.)*(exe)?$": "pip",
}
self._patterns: tp.Dict[str, str] = {}
self._compiled: tp.Dict[str, tp.Pattern] = {}
self.contextual = True
self.base = base
@ -182,13 +181,16 @@ class ModuleMatcher:
return module
@functools.lru_cache(maxsize=10)
def get_module(self, name):
try:
def get_module(self, module: str):
for name in [
module,
f"_{module}", # naming convention to not clash with actual python package
]:
with contextlib.suppress(ModuleNotFoundError):
return importlib.import_module(f"{self.base}.{name}")
except ModuleNotFoundError:
file = self._find_file_path(name)
file = self._find_file_path(module)
if file:
return self.import_module(file, self.base, name)
return self.import_module(file, self.base, module)
def search_completer(self, cmd: str, cleaned=False):
if not cleaned:
@ -221,6 +223,7 @@ class CommandCompleter:
"xompletions",
extra_paths=XSH.env.get("XONSH_COMPLETER_DIRS", []),
)
self._matcher.wrap(r"\bx?pip(?:\d|\.)*(exe)?$", "pip")
return self._matcher
@staticmethod

View file

@ -19,10 +19,9 @@ from xonsh.completers._aliases import complete_aliases
from xonsh.completers.environment import complete_environment_vars
def default_completers():
def default_completers(cmd_cache):
"""Creates a copy of the default completers."""
return collections.OrderedDict(
[
defaults = [
# non-exclusive completers:
("end_proc_tokens", complete_end_proc_tokens),
("end_proc_keywords", complete_end_proc_keywords),
@ -33,9 +32,19 @@ def default_completers():
("alias", complete_aliases),
("xompleter", complete_xompletions),
("import", complete_import),
]
for cmd, func in [
("bash", complete_from_bash),
("man", complete_from_man),
]:
if cmd in cmd_cache:
defaults.append((cmd, func))
defaults.extend(
[
("python", complete_python),
("path", complete_path),
]
)
return collections.OrderedDict(defaults)

View file

@ -1,27 +1,128 @@
import os
import functools
import json
import re
import pickle
import shutil
import subprocess
import typing as tp
import textwrap
from pathlib import Path
from xonsh.parsers.completion_context import CommandContext
from xonsh.built_ins import XSH
import xonsh.lazyasd as xl
from xonsh.completers.tools import get_filter_function, contextual_command_completer
OPTIONS: tp.Optional[tp.Dict[str, tp.Any]] = None
OPTIONS_PATH: tp.Optional[str] = None
from xonsh.completers.tools import (
contextual_command_completer,
RichCompletion,
)
from xonsh.parsers.completion_context import CommandContext
@xl.lazyobject
def SCRAPE_RE():
return re.compile(r"^(?:\s*(?:-\w|--[a-z0-9-]+)[\s,])+", re.M)
@functools.lru_cache(maxsize=None)
def get_man_completions_path() -> Path:
env = XSH.env or {}
datadir = Path(env["XONSH_DATA_DIR"]) / "generated_completions" / "man"
if datadir.exists() and (not datadir.is_dir()):
shutil.move(datadir, datadir.with_suffix(".bkp"))
if not datadir.exists():
datadir.mkdir(exist_ok=True, parents=True)
return datadir
@xl.lazyobject
def INNER_OPTIONS_RE():
return re.compile(r"-\w|--[a-z0-9-]+")
def _get_man_page(cmd: str):
"""without control characters"""
env = XSH.env.detype()
manpage = subprocess.Popen(
["man", cmd], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, env=env
)
# This is a trick to get rid of reverse line feeds
return subprocess.check_output(["col", "-b"], stdin=manpage.stdout, env=env)
@functools.lru_cache(maxsize=None)
def _man_option_string_regex():
return re.compile(
r"(?:(,\s?)|^|(\sor\s))(?P<option>-[\w]|--[\w-]+)(?=\[?(\s|,|=\w+|$))"
)
def generate_options_of(cmd: str):
out = _get_man_page(cmd)
if not out:
return
def get_headers(text: str):
"""split as header-body based on indent"""
if not text:
return
header = ""
body = []
for line in textwrap.dedent(text.replace("\n\t", "\n ")).splitlines():
if not line.strip():
continue
if line.startswith((" ", "\t")):
body.append(line)
else:
if header or body:
yield header, body
# found new section
header = line.strip()
body = []
if header or body:
yield header, body
def split_options_string(text: str):
text = text.strip()
regex = _man_option_string_regex()
regex.findall(text)
options = []
for match in regex.finditer(text):
option = match.groupdict().pop("option", None)
if option:
options.append(option)
text = text[match.end() :]
return options, text.strip()
def get_option_section():
option_sect = dict(get_headers(out.decode()))
small_names = {k.lower(): k for k in option_sect}
for head in (
"options",
"command options",
"description",
): # prefer sections in this order
if head in small_names:
title = small_names[head]
return "\n".join(option_sect[title])
def get_options(text):
"""finally get the options"""
# return old section if
for opt, lines in get_headers(text):
# todo: some have [+-] or such vague notations
if opt.startswith("-"):
# sometime a single line will have both desc and options
option_strings, rest = split_options_string(opt)
descs = []
if rest:
descs.append(rest)
if lines:
descs.append(textwrap.dedent("\n".join(lines)))
if option_strings:
yield ". ".join(descs), tuple(option_strings)
elif lines:
# sometimes the options are nested inside subheaders
yield from get_options("\n".join(lines))
yield from get_options(get_option_section())
@functools.lru_cache(maxsize=10)
def _parse_man_page_options(cmd: str) -> "dict[str, tuple[str, ...]]":
path = get_man_completions_path() / f"{cmd}.json"
if path.exists():
return json.loads(path.read_text())
options = dict(generate_options_of(cmd))
path.write_text(json.dumps(options))
return options
@contextual_command_completer
@ -30,31 +131,15 @@ def complete_from_man(context: CommandContext):
Completes an option name, based on the contents of the associated man
page.
"""
global OPTIONS, OPTIONS_PATH
if OPTIONS is None:
datadir: str = XSH.env["XONSH_DATA_DIR"] # type: ignore
OPTIONS_PATH = os.path.join(datadir, "man_completions_cache")
try:
with open(OPTIONS_PATH, "rb") as f:
OPTIONS = pickle.load(f)
except Exception:
OPTIONS = {}
if context.arg_index == 0 or not context.prefix.startswith("-"):
return set()
return
cmd = context.args[0].value
if cmd not in OPTIONS:
try:
manpage = subprocess.Popen(
["man", cmd], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
def completions():
for desc, opts in _parse_man_page_options(cmd).items():
yield RichCompletion(
value=opts[-1], display=", ".join(opts), description=desc
)
# This is a trick to get rid of reverse line feeds
enc_text = subprocess.check_output(["col", "-b"], stdin=manpage.stdout)
text = enc_text.decode("utf-8")
scraped_text = " ".join(SCRAPE_RE.findall(text))
matches = INNER_OPTIONS_RE.findall(scraped_text)
OPTIONS[cmd] = matches
with open(tp.cast(str, OPTIONS_PATH), "wb") as f:
pickle.dump(OPTIONS, f)
except Exception:
return set()
return {s for s in OPTIONS[cmd] if get_filter_function()(s, context.prefix)}
return completions(), False

View file

@ -1,22 +1,32 @@
"""Xonsh completer tools."""
import inspect
import os
import shlex
import subprocess
import textwrap
import typing as tp
from functools import wraps
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.lazyasd import lazyobject
from xonsh.parsers.completion_context import CompletionContext, CommandContext
def _filter_normal(s, x):
return s.startswith(x)
def _filter_with_func(text, prefix, func):
if isinstance(text, RichCompletion) and text.display:
parts = [p.strip() for p in text.display.split(",")]
return any(map(lambda part: func(part.strip(), prefix), parts))
return func(text, prefix)
def _filter_ignorecase(s, x):
return s.lower().startswith(x.lower())
def _filter_normal(text, prefix):
return _filter_with_func(text, prefix, str.startswith)
def _filter_ignorecase(text, prefix):
func = lambda txt, pre: txt.lower().startswith(pre.lower())
return _filter_with_func(text, prefix, func)
def get_filter_function():
@ -205,33 +215,84 @@ def apply_lprefix(comps, lprefix):
yield RichCompletion(comp, prefix_len=lprefix)
def completion_from_cmd_output(line: str, append_space=False):
line = line.strip()
if "\t" in line:
cmd, desc = map(str.strip, line.split("\t", maxsplit=1))
else:
cmd, desc = line, ""
# special treatment for path completions.
# not appending space even if it is a single candidate.
if cmd.endswith(os.pathsep) or (os.altsep and cmd.endswith(os.altsep)):
append_space = False
return RichCompletion(
cmd,
description=desc,
append_space=append_space,
)
def sub_proc_get_output(*args, **env_vars: str) -> "tuple[bytes, bool]":
env = {}
# env.detype is mutable, so update the newly created variable
env.update(XSH.env.detype())
env.update(env_vars) # prefer passed env variables
out = b""
not_found = False
try:
out = subprocess.run(
args,
env=env,
stderr=subprocess.DEVNULL,
stdout=subprocess.PIPE,
).stdout
except FileNotFoundError:
not_found = True
except Exception as ex:
xt.print_exception(f"Failed to get completions from sub-proc: {args} ({ex!r})")
return out, not_found
def complete_from_sub_proc(*args: str, sep=None, filter_prefix=None, **env_vars: str):
if sep is None:
sep = str.splitlines
filter_func = get_filter_function()
stdout, _ = sub_proc_get_output(*args, **env_vars)
if stdout:
output = stdout.decode().strip()
if callable(sep):
lines = sep(output)
else:
lines = output.split(sep)
# if there is a single completion candidate then maybe it is over
append_space = len(lines) == 1
for line in lines:
if filter_prefix and (not filter_func(line, filter_prefix)):
continue
comp = completion_from_cmd_output(line, append_space)
yield comp
def comp_based_completer(ctx: CommandContext, **env: str):
"""Helper function to complete commands such as ``pip``,``django-admin``,... that use bash's ``complete``"""
prefix = ctx.prefix
filter_func = get_filter_function()
args = [arg.value for arg in ctx.args]
if prefix:
args.append(prefix)
args = [arg.raw_value for arg in ctx.args]
env.update(
{
"COMP_WORDS": " ".join(args),
"COMP_CWORD": str(ctx.arg_index),
}
yield from complete_from_sub_proc(
args[0],
sep=shlex.split,
COMP_WORDS=os.linesep.join(args) + os.linesep,
COMP_CWORD=str(ctx.arg_index),
**env,
)
env.update(XSH.env.detype())
try:
proc = subprocess.run(
[args[0]],
stderr=subprocess.DEVNULL,
stdout=subprocess.PIPE,
env=env,
)
except FileNotFoundError:
return None
if proc.stdout:
out = shlex.split(proc.stdout.decode())
for cmp in out:
if filter_func(cmp, prefix):
yield RichCompletion(cmp, append_space=True)

View file

@ -1,30 +1,8 @@
from xonsh.completers import completer
from xonsh.completers.tools import RichCompletion, contextual_command_completer
import os
import subprocess as sp
from xonsh.built_ins import XSH
from xonsh.completers.tools import contextual_command_completer, complete_from_sub_proc
from xonsh.parsers.completion_context import CommandContext
def create_rich_completion(line: str, append_space=False):
line = line.strip()
if "\t" in line:
cmd, desc = map(str.strip, line.split("\t", maxsplit=1))
else:
cmd, desc = line, ""
# special treatment for path completions.
# not appending space even if it is a single candidate.
if cmd.endswith(os.pathsep):
append_space = False
return RichCompletion(
cmd,
description=desc,
append_space=append_space,
)
@contextual_command_completer
def fish_proc_completer(ctx: CommandContext):
"""Populate completions using fish shell and remove bash-completer"""
@ -36,21 +14,12 @@ def fish_proc_completer(ctx: CommandContext):
f"complete --no-files {ctx.command}", # switch off basic file completions for the executable
f"complete -C '{line}'",
]
args = ["fish", "-c", "; ".join(script_lines)]
env = XSH.env.detype()
try:
output = sp.check_output(args, env=env, stderr=sp.DEVNULL).decode()
except Exception as ex:
print(f"Failed to get fish-completions: {ex}")
return
if output:
lines = output.strip().splitlines(keepends=False)
# if there is a single completion candidate then maybe it is over
append_space = len(lines) == 1
for line in lines:
comp = create_rich_completion(line, append_space)
yield comp
yield from complete_from_sub_proc(
"fish",
"-c",
"; ".join(script_lines),
)
completer.add_one_completer("fish", fish_proc_completer, "<bash")