Introduce new resolver for executables to replace commands_cache usages and speed up everything (#5544)

* remove commands_cache from pyghooks to avoid cc.update_cache on every key press

* create executables.py

* replace cc.locate_binary to locate_executable

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* vc: replace locate_binary

* pyghooks: remove commands_cache

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove unused func _yield_executables

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Move `executables_in` from tools to commands_cache to avoid circular imports.

* First steps to `procs.executables` that is based on `commands_cache` and `tools`.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* test_executables

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add not recommended notes

* tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add `get_paths` with test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix source test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* vc: remove tests because commands cache is not used

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* specs: fix exception for recursive call

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* specs: fix exception for recursive call

* improve test_locate_executable

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix test

* beautify pathext

* tests

* docs

* tests

* update locators

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* locate_file

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* comments

* return environ locate bin test

* comments

* Update xonsh/procs/executables.py

Co-authored-by: Jason R. Coombs <jaraco@jaraco.com>

* Update xonsh/procs/executables.py

Co-authored-by: Jason R. Coombs <jaraco@jaraco.com>

* Update xonsh/procs/executables.py

Co-authored-by: Jason R. Coombs <jaraco@jaraco.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add itertools

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* moving is_executable

* doc

* optimization is_executable_in_windows

* micro improvements

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* news

* news

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* bump test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: a <1@1.1>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jason R. Coombs <jaraco@jaraco.com>
This commit is contained in:
Andy Kipp 2024-07-09 09:44:03 +02:00 committed by GitHub
parent 2c826c9b75
commit 059fc301e7
Failed to generate hash of commit
20 changed files with 341 additions and 185 deletions

View file

@ -0,0 +1,23 @@
**Added:**
* <news item>
**Changed:**
* New executable resolving method was introduced and the commands_cache usages were replaced in the key places. As result we expect speed up in xonsh startup, reducing lagging during typing in prompt and speed ups during the commands execution (#5544 by @anki-code).
**Deprecated:**
* <news item>
**Removed:**
* <news item>
**Fixed:**
* <news item>
**Security:**
* <news item>

View file

@ -1,6 +1,7 @@
import builtins import builtins
import os.path import os.path
from contextlib import contextmanager from contextlib import contextmanager
from pathlib import Path
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
@ -34,9 +35,8 @@ def test_source_current_dir(mockopen, monkeypatch, mocked_execx_checker):
assert mocked_execx_checker == ["foo", "bar"] assert mocked_execx_checker == ["foo", "bar"]
def test_source_path(mockopen, mocked_execx_checker, patch_locate_binary, xession): def test_source_path(mockopen, mocked_execx_checker, xession):
patch_locate_binary(xession.commands_cache) with xession.env.swap(PATH=[Path(__file__).parent.parent / "bin"]):
source_alias(["foo", "bar"]) source_alias(["foo", "bar"])
path_foo = os.path.join("bin", "foo") path_foo = os.path.join("bin", "foo")
path_bar = os.path.join("bin", "bar") path_bar = os.path.join("bin", "bar")

1
tests/bin/bar Executable file
View file

@ -0,0 +1 @@
# Test source_alias

1
tests/bin/foo Executable file
View file

@ -0,0 +1 @@
# Test source_alias

View file

@ -0,0 +1,46 @@
import os
from xonsh.environ import Env
from xonsh.platform import ON_WINDOWS
from xonsh.procs.executables import get_paths, get_possible_names, locate_executable
def test_get_possible_names():
env = Env(PATHEXT=[".EXE", ".COM"])
assert get_possible_names("file", env) == ["file", "file.exe", "file.com"]
assert get_possible_names("FILE", env) == ["FILE", "FILE.EXE", "FILE.COM"]
def test_get_paths(tmpdir):
bindir1 = str(tmpdir.mkdir("bindir1"))
bindir2 = str(tmpdir.mkdir("bindir2"))
env = Env(PATH=[bindir1, bindir2, bindir1, "nodir"])
assert get_paths(env) == (bindir2, bindir1)
def test_locate_executable(tmpdir, xession):
bindir = tmpdir.mkdir("bindir")
bindir.mkdir("subdir")
executables = ["file1.EXE", "file2.COM", "file2.EXE", "file3"]
not_executables = ["file4.EXE", "file5"]
for exefile in executables + not_executables:
f = bindir / exefile
f.write_text("binary", encoding="utf8")
if exefile in executables:
os.chmod(f, 0o777)
pathext = [".EXE", ".COM"] if ON_WINDOWS else []
with xession.env.swap(PATH=str(bindir), PATHEXT=pathext):
assert locate_executable("file1.EXE")
assert locate_executable("nofile") is None
assert locate_executable("file5") is None
assert locate_executable("subdir") is None
if ON_WINDOWS:
assert locate_executable("file1")
assert locate_executable("file4")
assert locate_executable("file2").endswith("file2.exe")
else:
assert locate_executable("file3")
assert locate_executable("file1") is None
assert locate_executable("file4") is None
assert locate_executable("file2") is None

View file

@ -2,7 +2,6 @@ import os
import subprocess as sp import subprocess as sp
import textwrap import textwrap
from pathlib import Path from pathlib import Path
from unittest.mock import Mock
import pytest import pytest
@ -119,27 +118,6 @@ current = yellow reverse
assert not branch.startswith("\u001b[") assert not branch.startswith("\u001b[")
def test_current_branch_calls_locate_binary_for_empty_cmds_cache(xession, monkeypatch):
cache = xession.commands_cache
monkeypatch.setattr(cache, "is_empty", Mock(return_value=True))
monkeypatch.setattr(cache, "locate_binary", Mock(return_value=""))
vc.current_branch()
assert cache.locate_binary.called
def test_current_branch_does_not_call_locate_binary_for_non_empty_cmds_cache(
xession,
monkeypatch,
):
cache = xession.commands_cache
monkeypatch.setattr(cache, "is_empty", Mock(return_value=False))
monkeypatch.setattr(cache, "locate_binary", Mock(return_value=""))
# make lazy locate return nothing to avoid running vc binaries
monkeypatch.setattr(cache, "lazy_locate_binary", Mock(return_value=""))
vc.current_branch()
assert not cache.locate_binary.called
def test_dirty_working_directory(repo, set_xenv): def test_dirty_working_directory(repo, set_xenv):
get_dwd = "{}_dirty_working_directory".format(repo["vc"]) get_dwd = "{}_dirty_working_directory".format(repo["vc"])
set_xenv(repo["dir"]) set_xenv(repo["dir"])

View file

@ -1,6 +1,8 @@
import os import os
import pickle import pickle
import stat
import time import time
from tempfile import TemporaryDirectory
import pytest import pytest
@ -8,12 +10,16 @@ from xonsh.commands_cache import (
SHELL_PREDICTOR_PARSER, SHELL_PREDICTOR_PARSER,
CommandsCache, CommandsCache,
_Commands, _Commands,
executables_in,
predict_false, predict_false,
predict_shell, predict_shell,
predict_true, predict_true,
) )
from xonsh.platform import ON_WINDOWS
from xonsh.pytest.tools import skip_if_on_windows from xonsh.pytest.tools import skip_if_on_windows
PATHEXT_ENV = {"PATHEXT": [".COM", ".EXE", ".BAT"]}
def test_commands_cache_lazy(xession): def test_commands_cache_lazy(xession):
cc = xession.commands_cache cc = xession.commands_cache
@ -260,3 +266,43 @@ def test_nixos_coreutils(tmp_path):
assert cache.predict_threadable(["echo", "1"]) is True assert cache.predict_threadable(["echo", "1"]) is True
assert cache.predict_threadable(["cat", "file"]) is False assert cache.predict_threadable(["cat", "file"]) is False
def test_executables_in(xession):
expected = set()
types = ("file", "directory", "brokensymlink")
if ON_WINDOWS:
# Don't test symlinks on windows since it requires admin
types = ("file", "directory")
executables = (True, False)
with TemporaryDirectory() as test_path:
for _type in types:
for executable in executables:
fname = f"{_type}_{executable}"
if _type == "none":
continue
if _type == "file" and executable:
ext = ".exe" if ON_WINDOWS else ""
expected.add(fname + ext)
else:
ext = ""
path = os.path.join(test_path, fname + ext)
if _type == "file":
with open(path, "w") as f:
f.write(fname)
elif _type == "directory":
os.mkdir(path)
elif _type == "brokensymlink":
tmp_path = os.path.join(test_path, "i_wont_exist")
with open(tmp_path, "w") as f:
f.write("deleteme")
os.symlink(tmp_path, path)
os.remove(tmp_path)
if executable and not _type == "brokensymlink":
os.chmod(path, stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
if ON_WINDOWS:
xession.env = PATHEXT_ENV
result = set(executables_in(test_path))
else:
result = set(executables_in(test_path))
assert expected == result

View file

@ -729,8 +729,11 @@ def test_script(case):
if ON_DARWIN: if ON_DARWIN:
script = script.replace("tests/bin", str(Path(__file__).parent / "bin")) script = script.replace("tests/bin", str(Path(__file__).parent / "bin"))
out, err, rtn = run_xonsh(script) out, err, rtn = run_xonsh(script)
out = out.replace("bash: no job control in this shell\n", "")
if callable(exp_out): if callable(exp_out):
assert exp_out(out) assert exp_out(
out
), f"CASE:\nscript=***\n{script}\n***,\nExpected: {exp_out!r},\nActual: {out!r}"
else: else:
assert exp_out == out assert exp_out == out
assert exp_rtn == rtn assert exp_rtn == rtn

View file

@ -4,10 +4,8 @@ import datetime as dt
import os import os
import pathlib import pathlib
import re import re
import stat
import subprocess import subprocess
import warnings import warnings
from tempfile import TemporaryDirectory
import pytest import pytest
@ -35,7 +33,6 @@ from xonsh.tools import (
ensure_timestamp, ensure_timestamp,
env_path_to_str, env_path_to_str,
escape_windows_cmd_string, escape_windows_cmd_string,
executables_in,
expand_case_matching, expand_case_matching,
expand_path, expand_path,
expandvars, expandvars,
@ -101,7 +98,6 @@ INDENT = " "
TOOLS_ENV = {"EXPAND_ENV_VARS": True, "XONSH_ENCODING_ERRORS": "strict"} TOOLS_ENV = {"EXPAND_ENV_VARS": True, "XONSH_ENCODING_ERRORS": "strict"}
ENCODE_ENV_ONLY = {"XONSH_ENCODING_ERRORS": "strict"} ENCODE_ENV_ONLY = {"XONSH_ENCODING_ERRORS": "strict"}
PATHEXT_ENV = {"PATHEXT": [".COM", ".EXE", ".BAT"]}
def test_random_choice(): def test_random_choice():
@ -1616,46 +1612,6 @@ def test_partial_string(leaders, prefix, quote):
assert obs == exp assert obs == exp
def test_executables_in(xession):
expected = set()
types = ("file", "directory", "brokensymlink")
if ON_WINDOWS:
# Don't test symlinks on windows since it requires admin
types = ("file", "directory")
executables = (True, False)
with TemporaryDirectory() as test_path:
for _type in types:
for executable in executables:
fname = f"{_type}_{executable}"
if _type == "none":
continue
if _type == "file" and executable:
ext = ".exe" if ON_WINDOWS else ""
expected.add(fname + ext)
else:
ext = ""
path = os.path.join(test_path, fname + ext)
if _type == "file":
with open(path, "w") as f:
f.write(fname)
elif _type == "directory":
os.mkdir(path)
elif _type == "brokensymlink":
tmp_path = os.path.join(test_path, "i_wont_exist")
with open(tmp_path, "w") as f:
f.write("deleteme")
os.symlink(tmp_path, path)
os.remove(tmp_path)
if executable and not _type == "brokensymlink":
os.chmod(path, stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
if ON_WINDOWS:
xession.env = PATHEXT_ENV
result = set(executables_in(test_path))
else:
result = set(executables_in(test_path))
assert expected == result
@pytest.mark.parametrize( @pytest.mark.parametrize(
"inp, exp", "inp, exp",
[ [

View file

@ -31,6 +31,7 @@ from xonsh.platform import (
ON_OPENBSD, ON_OPENBSD,
ON_WINDOWS, ON_WINDOWS,
) )
from xonsh.procs.executables import locate_file
from xonsh.procs.jobs import bg, clean_jobs, disown, fg, jobs from xonsh.procs.jobs import bg, clean_jobs, disown, fg, jobs
from xonsh.procs.specs import SpecAttrModifierAlias, SpecModifierAlias from xonsh.procs.specs import SpecAttrModifierAlias, SpecModifierAlias
from xonsh.timings import timeit_alias from xonsh.timings import timeit_alias
@ -637,7 +638,7 @@ def source_alias(args, stdin=None):
for i, fname in enumerate(args): for i, fname in enumerate(args):
fpath = fname fpath = fname
if not os.path.isfile(fpath): if not os.path.isfile(fpath):
fpath = locate_binary(fname) fpath = locate_file(fname)
if fpath is None: if fpath is None:
if env.get("XONSH_DEBUG"): if env.get("XONSH_DEBUG"):
print(f"source: {fname}: No such file", file=sys.stderr) print(f"source: {fname}: No such file", file=sys.stderr)

View file

@ -16,7 +16,12 @@ from pathlib import Path
from xonsh.lib.lazyasd import lazyobject from xonsh.lib.lazyasd import lazyobject
from xonsh.platform import ON_POSIX, ON_WINDOWS, pathbasename from xonsh.platform import ON_POSIX, ON_WINDOWS, pathbasename
from xonsh.tools import executables_in from xonsh.procs.executables import (
get_paths,
get_possible_names,
is_executable_in_posix,
is_executable_in_windows,
)
if ON_WINDOWS: if ON_WINDOWS:
from case_insensitive_dict import CaseInsensitiveDict as CacheDict from case_insensitive_dict import CaseInsensitiveDict as CacheDict
@ -29,12 +34,58 @@ class _Commands(tp.NamedTuple):
cmds: "tuple[str, ...]" cmds: "tuple[str, ...]"
def _yield_accessible_unix_file_names(path):
"""yield file names of executable files in path."""
if not os.path.exists(path):
return
for file_ in os.scandir(path):
if is_executable_in_posix(file_):
yield file_.name
def _executables_in_posix(path):
if not os.path.exists(path):
return
else:
yield from _yield_accessible_unix_file_names(path)
def _executables_in_windows(path):
if not os.path.isdir(path):
return
try:
for x in os.scandir(path):
if is_executable_in_windows(x):
yield x.name
except FileNotFoundError:
# On Windows, there's no guarantee for the directory to really
# exist even if isdir returns True. This may happen for instance
# if the path contains trailing spaces.
return
def executables_in(path) -> tp.Iterable[str]:
"""Returns a generator of files in path that the user could execute."""
if ON_WINDOWS:
func = _executables_in_windows
else:
func = _executables_in_posix
try:
yield from func(path)
except PermissionError:
return
class CommandsCache(cabc.Mapping): class CommandsCache(cabc.Mapping):
"""A lazy cache representing the commands available on the file system. """A lazy cache representing the commands available on the file system.
The keys are the command names and the values a tuple of (loc, has_alias) The keys are the command names and the values a tuple of (loc, has_alias)
where loc is either a str pointing to the executable on the file system or where loc is either a str pointing to the executable on the file system or
None (if no executable exists) and has_alias is a boolean flag for whether None (if no executable exists) and has_alias is a boolean flag for whether
the command has an alias. the command has an alias.
Note! There is ``xonsh.procs.executables`` module with resolving executables.
Usage ``executables`` is preferred instead of commands_cache for cases
where you just need to locate executable command.
""" """
CACHE_FILE = "path-commands-cache.pickle" CACHE_FILE = "path-commands-cache.pickle"
@ -99,25 +150,7 @@ class CommandsCache(cabc.Mapping):
return len(self._cmds_cache) == 0 return len(self._cmds_cache) == 0
def get_possible_names(self, name): def get_possible_names(self, name):
"""Expand name to all possible variants based on `PATHEXT`. return get_possible_names(name, self.env)
PATHEXT is a Windows convention containing extensions to be
considered when searching for an executable file.
Conserves order of any extensions found and gives precedence
to the bare name.
"""
extensions = [""] + self.env.get("PATHEXT", [])
return [name + ext for ext in extensions]
@staticmethod
def remove_dups(paths):
cont = set()
for p in map(os.path.realpath, paths):
if p not in cont:
cont.add(p)
if os.path.isdir(p):
yield p
def _update_aliases_cache(self): def _update_aliases_cache(self):
"""Update aliases checksum and return result: updated or not.""" """Update aliases checksum and return result: updated or not."""
@ -162,10 +195,15 @@ class CommandsCache(cabc.Mapping):
return current_path return current_path
def update_cache(self): def update_cache(self):
"""The main function to update commands cache.
Note! There is ``xonsh.procs.executables`` module with resolving executables.
Usage ``executables`` is preferred instead of commands_cache for cases
where you just need to locate executable command.
"""
env = self.env env = self.env
# iterate backwards so that entries at the front of PATH overwrite # iterate backwards so that entries at the front of PATH overwrite
# entries at the back. # entries at the back.
paths = tuple(reversed(tuple(self.remove_dups(env.get("PATH") or [])))) paths = get_paths(env)
if self._update_and_check_changes(paths): if self._update_and_check_changes(paths):
all_cmds = CacheDict() all_cmds = CacheDict()
for cmd, path in self._iter_binaries(paths): for cmd, path in self._iter_binaries(paths):
@ -256,6 +294,9 @@ class CommandsCache(cabc.Mapping):
def locate_binary(self, name, ignore_alias=False): def locate_binary(self, name, ignore_alias=False):
"""Locates an executable on the file system using the cache. """Locates an executable on the file system using the cache.
NOT RECOMMENDED. Take a look into `xonsh.procs.executables.locate_executable`
before using this function.
Parameters Parameters
---------- ----------
name : str name : str
@ -270,6 +311,9 @@ class CommandsCache(cabc.Mapping):
def lazy_locate_binary(self, name, ignore_alias=False): def lazy_locate_binary(self, name, ignore_alias=False):
"""Locates an executable in the cache, without checking its validity. """Locates an executable in the cache, without checking its validity.
NOT RECOMMENDED. Take a look into `xonsh.procs.executables.locate_executable`
before using this function.
Parameters Parameters
---------- ----------
name : str name : str

View file

@ -4,8 +4,8 @@ import re
import typing as tp import typing as tp
import xonsh.platform as xp import xonsh.platform as xp
import xonsh.tools as xt
from xonsh.built_ins import XSH from xonsh.built_ins import XSH
from xonsh.commands_cache import executables_in
from xonsh.completer import Completer from xonsh.completer import Completer
from xonsh.completers.tools import ( from xonsh.completers.tools import (
RichCompletion, RichCompletion,
@ -35,12 +35,12 @@ def complete_command(command: CommandContext):
kwargs["description"] = "Alias" if is_alias else path kwargs["description"] = "Alias" if is_alias else path
yield RichCompletion(s, append_space=True, **kwargs) yield RichCompletion(s, append_space=True, **kwargs)
if xp.ON_WINDOWS: if xp.ON_WINDOWS:
for i in xt.executables_in("."): for i in executables_in("."):
if i.startswith(cmd): if i.startswith(cmd):
yield RichCompletion(i, append_space=True) yield RichCompletion(i, append_space=True)
base = os.path.basename(cmd) base = os.path.basename(cmd)
if os.path.isdir(base): if os.path.isdir(base):
for i in xt.executables_in(base): for i in executables_in(base):
if i.startswith(cmd): if i.startswith(cmd):
yield RichCompletion(os.path.join(base, i)) yield RichCompletion(os.path.join(base, i))

View file

@ -54,7 +54,6 @@ from xonsh.tools import (
dynamic_cwd_tuple_to_str, dynamic_cwd_tuple_to_str,
ensure_string, ensure_string,
env_path_to_str, env_path_to_str,
executables_in,
history_tuple_to_str, history_tuple_to_str,
intensify_colors_on_win_setter, intensify_colors_on_win_setter,
is_bool, is_bool,
@ -2547,22 +2546,13 @@ class InternalEnvironDict(ChainMap):
local.update(new_local) local.update(new_local)
def _yield_executables(directory, name):
if ON_WINDOWS:
base_name, ext = os.path.splitext(name.lower())
for fname in executables_in(directory):
fbase, fext = os.path.splitext(fname.lower())
if base_name == fbase and (len(ext) == 0 or ext == fext):
yield os.path.join(directory, fname)
else:
for x in executables_in(directory):
if x == name:
yield os.path.join(directory, name)
return
def locate_binary(name): def locate_binary(name):
"""Locates an executable on the file system.""" """Locates an executable on the file system.
NOT RECOMMENDED because ``commands_cache.locate_binary`` contains ``update_cache``
with scanning all files in ``$PATH``. First of all take a look into ``xonsh.specs.executables``
for more fast implementation the locate operation.
"""
return XSH.commands_cache.locate_binary(name) return XSH.commands_cache.locate_binary(name)

View file

@ -1,3 +1,6 @@
from itertools import filterfalse
def as_iterable(iterable_or_scalar): def as_iterable(iterable_or_scalar):
"""Utility for converting an object to an iterable. """Utility for converting an object to an iterable.
Parameters Parameters
@ -34,3 +37,26 @@ def as_iterable(iterable_or_scalar):
return iterable_or_scalar return iterable_or_scalar
else: else:
return (iterable_or_scalar,) return (iterable_or_scalar,)
def unique_everseen(iterable, key=None):
"""Yield unique elements, preserving order. Remember all elements ever seen.
```
unique_everseen('AAAABBBCCDAABBB') A B C D
unique_everseen('ABBcCAD', str.casefold) A B c D
```
Source code: https://docs.python.org/3/library/itertools.html#itertools-recipes
"""
seen = set()
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen.add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen.add(k)
yield element

103
xonsh/procs/executables.py Normal file
View file

@ -0,0 +1,103 @@
"""Interfaces to locate executable files on file system."""
import itertools
import os
from pathlib import Path
from xonsh.built_ins import XSH
from xonsh.lib.itertools import unique_everseen
from xonsh.platform import ON_WINDOWS
def get_possible_names(name, env=None):
"""Expand name to all possible variants based on `PATHEXT`.
PATHEXT is a Windows convention containing extensions to be
considered when searching for an executable file.
Conserves order of any extensions found and gives precedence
to the bare name.
"""
env = env if env is not None else XSH.env
env_pathext = env.get("PATHEXT", [])
if not env_pathext:
return [name]
upper = name.upper() == name
extensions = [""] + env_pathext
return [name + (ext.upper() if upper else ext.lower()) for ext in extensions]
def clear_paths(paths):
"""Remove duplicates and nonexistent directories from paths."""
return filter(os.path.isdir, unique_everseen(map(os.path.realpath, paths)))
def get_paths(env=None):
"""Return tuple with deduplicated and existent paths from ``$PATH``."""
env = env if env is not None else XSH.env
return tuple(reversed(tuple(clear_paths(env.get("PATH") or []))))
def is_executable_in_windows(filepath, env=None):
"""Check the file is executable in Windows."""
filepath = Path(filepath)
try:
try:
if not filepath.is_file():
return False
except OSError:
return False
env = env if env is not None else XSH.env
return any(s.lower() == filepath.suffix.lower() for s in env.get("PATHEXT", []))
except FileNotFoundError:
# On Windows, there's no guarantee for the directory to really
# exist even if isdir returns True. This may happen for instance
# if the path contains trailing spaces.
return False
def is_executable_in_posix(filepath):
"""Check the file is executable in POSIX."""
try:
return filepath.is_file() and os.access(filepath, os.X_OK)
except OSError:
# broken Symlink are neither dir not files
pass
return False
is_executable = is_executable_in_windows if ON_WINDOWS else is_executable_in_posix
def locate_executable(name, env=None):
"""Search executable binary name in ``$PATH`` and return full path."""
return locate_file(name, env=env, check_executable=True, use_pathext=True)
def locate_file(name, env=None, check_executable=False, use_pathext=False):
"""Search file name in ``$PATH`` and return full path.
Compromise. There is no way to get case sensitive file name without listing all files.
If the file name is ``CaMeL.exe`` and we found that ``camel.EXE`` exists there is no way
to get back the case sensitive name. We don't want to read the list of files in all ``$PATH``
directories because of performance reasons. So we're ok to get existent
but case insensitive (or different) result from resolver.
May be in the future file systems as well as Python Path will be smarter to get the case sensitive name.
The task for reading and returning case sensitive filename we give to completer in interactive mode
with ``commands_cache``.
"""
env = env if env is not None else XSH.env
env_path = env.get("PATH", [])
paths = tuple(reversed(tuple(clear_paths(env_path))))
possible_names = get_possible_names(name, env) if use_pathext else [name]
for path, possible_name in itertools.product(paths, possible_names):
filepath = Path(path) / possible_name
try:
if check_executable and not is_executable(filepath):
continue
return str(filepath)
except PermissionError:
return

View file

@ -12,13 +12,13 @@ import stat
import subprocess import subprocess
import sys import sys
import xonsh.environ as xenv
import xonsh.lib.lazyasd as xl import xonsh.lib.lazyasd as xl
import xonsh.lib.lazyimps as xli import xonsh.lib.lazyimps as xli
import xonsh.platform as xp import xonsh.platform as xp
import xonsh.procs.jobs as xj import xonsh.procs.jobs as xj
import xonsh.tools as xt import xonsh.tools as xt
from xonsh.built_ins import XSH from xonsh.built_ins import XSH
from xonsh.procs.executables import locate_executable
from xonsh.procs.pipelines import ( from xonsh.procs.pipelines import (
STDOUT_CAPTURE_KINDS, STDOUT_CAPTURE_KINDS,
CommandPipeline, CommandPipeline,
@ -734,13 +734,13 @@ class SubprocSpec:
alias = self.alias alias = self.alias
if alias is None: if alias is None:
cmd0 = self.cmd[0] cmd0 = self.cmd[0]
binary_loc = xenv.locate_binary(cmd0) binary_loc = locate_executable(cmd0)
if binary_loc == cmd0 and cmd0 in self.alias_stack: if binary_loc is None and cmd0 and cmd0 in self.alias_stack:
raise Exception(f'Recursive calls to "{cmd0}" alias.') raise Exception(f'Recursive calls to "{cmd0}" alias.')
elif callable(alias): elif callable(alias):
binary_loc = None binary_loc = None
else: else:
binary_loc = xenv.locate_binary(alias[0]) binary_loc = locate_executable(alias[0])
self.binary_loc = binary_loc self.binary_loc = binary_loc
def resolve_auto_cd(self): def resolve_auto_cd(self):

View file

@ -13,6 +13,7 @@ import threading
import xonsh.tools as xt import xonsh.tools as xt
from xonsh.built_ins import XSH from xonsh.built_ins import XSH
from xonsh.lib.lazyasd import LazyObject from xonsh.lib.lazyasd import LazyObject
from xonsh.procs.executables import locate_executable
RE_REMOVE_ANSI = LazyObject( RE_REMOVE_ANSI = LazyObject(
lambda: re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]"), lambda: re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]"),
@ -169,11 +170,7 @@ def _first_branch_timeout_message():
def _vc_has(binary): def _vc_has(binary):
"""This allows us to locate binaries after git only if necessary""" """This allows us to locate binaries after git only if necessary"""
cmds = XSH.commands_cache return bool(locate_executable(binary))
if cmds.is_empty():
return bool(cmds.locate_binary(binary, ignore_alias=True))
else:
return bool(cmds.lazy_locate_binary(binary, ignore_alias=True))
def current_branch(): def current_branch():

View file

@ -38,7 +38,6 @@ from xonsh.color_tools import (
make_palette, make_palette,
warn_deprecated_no_color, warn_deprecated_no_color,
) )
from xonsh.commands_cache import CommandsCache
from xonsh.events import events from xonsh.events import events
from xonsh.lib.lazyasd import LazyDict, LazyObject, lazyobject from xonsh.lib.lazyasd import LazyDict, LazyObject, lazyobject
from xonsh.lib.lazyimps import html, os_listxattr, terminal256 from xonsh.lib.lazyimps import html, os_listxattr, terminal256
@ -48,6 +47,7 @@ from xonsh.platform import (
pygments_version_info, pygments_version_info,
win_ansi_support, win_ansi_support,
) )
from xonsh.procs.executables import locate_executable
from xonsh.pygments_cache import add_custom_style, get_style_by_name from xonsh.pygments_cache import add_custom_style, get_style_by_name
from xonsh.style_tools import DEFAULT_STYLE_DICT, norm_name from xonsh.style_tools import DEFAULT_STYLE_DICT, norm_name
from xonsh.tools import ( from xonsh.tools import (
@ -1600,7 +1600,7 @@ def _command_is_valid(cmd):
cmd_abspath = os.path.abspath(os.path.expanduser(cmd)) cmd_abspath = os.path.abspath(os.path.expanduser(cmd))
except OSError: except OSError:
return False return False
return (cmd in XSH.commands_cache and not iskeyword(cmd)) or ( return ((cmd in XSH.aliases or locate_executable(cmd)) and not iskeyword(cmd)) or (
os.path.isfile(cmd_abspath) and os.access(cmd_abspath, os.X_OK) os.path.isfile(cmd_abspath) and os.access(cmd_abspath, os.X_OK)
) )
@ -1649,15 +1649,12 @@ class XonshLexer(Python3Lexer):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# If the lexer is loaded as a pygment plugin, we have to mock # If the lexer is loaded as a pygment plugin, we have to mock
# __xonsh__.env and __xonsh__.commands_cache # __xonsh__.env
if getattr(XSH, "env", None) is None: if getattr(XSH, "env", None) is None:
XSH.env = {} XSH.env = {}
if ON_WINDOWS: if ON_WINDOWS:
pathext = os_environ.get("PATHEXT", [".EXE", ".BAT", ".CMD"]) pathext = os_environ.get("PATHEXT", [".EXE", ".BAT", ".CMD"])
XSH.env["PATHEXT"] = pathext.split(os.pathsep) XSH.env["PATHEXT"] = pathext.split(os.pathsep)
if getattr(XSH, "commands_cache", None) is None:
XSH.commands_cache = CommandsCache(XSH.env)
_ = XSH.commands_cache.all_commands # NOQA
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
tokens = { tokens = {

View file

@ -132,8 +132,8 @@ def mock_executables_in(xession, tmp_path, monkeypatch):
@pytest.fixture @pytest.fixture
def patch_locate_binary(monkeypatch): def patch_locate_binary(monkeypatch):
def locate_binary(self, name): def locate_binary(self, name, *args):
return os.path.join(os.path.dirname(__file__), "bin", name) return str(Path(__file__).parent.parent.parent / "tests" / "bin" / name)
def factory(cc: commands_cache.CommandsCache): def factory(cc: commands_cache.CommandsCache):
monkeypatch.setattr(cc, "locate_binary", types.MethodType(locate_binary, cc)) monkeypatch.setattr(cc, "locate_binary", types.MethodType(locate_binary, cc))

View file

@ -822,62 +822,6 @@ class redirect_stderr(_RedirectStream):
_stream = "stderr" _stream = "stderr"
def _yield_accessible_unix_file_names(path):
"""yield file names of executable files in path."""
if not os.path.exists(path):
return
for file_ in os.scandir(path):
try:
if file_.is_file() and os.access(file_.path, os.X_OK):
yield file_.name
except OSError:
# broken Symlink are neither dir not files
pass
def _executables_in_posix(path):
if not os.path.exists(path):
return
else:
yield from _yield_accessible_unix_file_names(path)
def _executables_in_windows(path):
if not os.path.isdir(path):
return
extensions = xsh.env["PATHEXT"]
try:
for x in os.scandir(path):
try:
is_file = x.is_file()
except OSError:
continue
if is_file:
fname = x.name
else:
continue
base_name, ext = os.path.splitext(fname)
if ext.upper() in extensions:
yield fname
except FileNotFoundError:
# On Windows, there's no guarantee for the directory to really
# exist even if isdir returns True. This may happen for instance
# if the path contains trailing spaces.
return
def executables_in(path) -> tp.Iterable[str]:
"""Returns a generator of files in path that the user could execute."""
if ON_WINDOWS:
func = _executables_in_windows
else:
func = _executables_in_posix
try:
yield from func(path)
except PermissionError:
return
def debian_command_not_found(cmd): def debian_command_not_found(cmd):
"""Uses the debian/ubuntu command-not-found utility to suggest packages for a """Uses the debian/ubuntu command-not-found utility to suggest packages for a
command that cannot currently be found. command that cannot currently be found.