Moving old xontribs to repositories (#5055)

* Update and rename README to README.rst

* Create hello_world.py

* Update README.rst

* init

* remove bashisms

* Transfer abbrevs to xontrib-abbrevs

* Transfer free-cwd to xontrib-free-cwd

* news

* black

* Transfer fish-completer to xonsh/xontrib-fish-completer

* Transfer vox to xonsh/xontrib-vox

* Transfer pdb, xog to xonsh/xontrib-debug-tools

* remove hello_world

* fix tests

* black

* fix whitespaces

* fix readme

* Update python_virtual_environments.rst

* Update README.rst

* Update xontribs_transfer.rst

---------

Co-authored-by: a <1@1.1>
This commit is contained in:
Andy Kipp 2023-03-17 13:25:22 +06:00 committed by GitHub
parent 868fff93c9
commit 042487745a
Failed to generate hash of commit
25 changed files with 52 additions and 2338 deletions

View file

@ -58,9 +58,12 @@ And visit https://xon.sh for more information:
Extensions Extensions
********** **********
- `Core extensions (xontribs) <https://xon.sh/api/_autosummary/xontribs/xontrib.html>`_
- `External extensions on Github <https://github.com/topics/xontrib>`_ Xonsh has the certain term for extensions and additional materials - xontrib - the short version of "contribution" word.
- `List of awesome xontribs <https://github.com/xonsh/awesome-xontribs>`_
- `Xontribs on Github <https://github.com/topics/xontrib>`_
- `Awesome xontribs <https://github.com/xonsh/awesome-xontribs>`_
- `Core xontribs <https://xon.sh/api/_autosummary/xontribs/xontrib.html>`_
- `Create a xontrib step by step from template <https://github.com/xonsh/xontrib-template>`_ - `Create a xontrib step by step from template <https://github.com/xonsh/xontrib-template>`_
Projects that use xonsh or compatible Projects that use xonsh or compatible

View file

@ -148,5 +148,5 @@ to set :ref:`$XONSH_TRACE_SUBPROC <xonsh_trace_subproc>` to ``True``:
TRACE SUBPROC: (['echo', 'hello\n', 'world'], '|', ['grep', 'hello']) TRACE SUBPROC: (['echo', 'hello\n', 'world'], '|', ['grep', 'hello'])
If after time you still try to type ``export``, ``unset`` or ``!!`` commands If after time you still try to type ``export``, ``unset`` or ``!!`` commands
there are the `bashisms <https://xon.sh/xontribs.html#bashisms>`_ there are the `bashisms <https://github.com/xonsh/xontrib-bashisms>`_
and `sh <https://xon.sh/xontribs.html#sh>`_ xontribs. and `sh <https://github.com/anki-code/xontrib-sh>`_ xontribs.

View file

@ -197,6 +197,10 @@ works by hooking the prompt to reset the current working directory to the root
drive folder whenever the shell is idle. It only works with the prompt-toolkit drive folder whenever the shell is idle. It only works with the prompt-toolkit
back-end. To enable that behaviour run the following: back-end. To enable that behaviour run the following:
.. code-block:: xonshcon
>>> xpip install xontrib-free-cwd
Add this line to your ``~/.xonshrc`` file to have it always enabled. Add this line to your ``~/.xonshrc`` file to have it always enabled.
.. code-block:: xonshcon .. code-block:: xonshcon

View file

@ -8,7 +8,9 @@ Python Virtual Environments
The usual tools for creating Python virtual environments—``venv``, ``virtualenv``, ``pew``—don't play well with xonsh. We won't dig deeper into why it is so, but the general gist is that these tools are hacky and hard-coded for bash, zsh, and other mainstream shells. The usual tools for creating Python virtual environments—``venv``, ``virtualenv``, ``pew``—don't play well with xonsh. We won't dig deeper into why it is so, but the general gist is that these tools are hacky and hard-coded for bash, zsh, and other mainstream shells.
Luckily, xonsh ships with its own virtual environments manager called **Vox**. Luckily, xonsh has its own virtual environments manager called **Vox**. Run to install Vox::
$ xpip install xontrib-vox
Vox Vox
=== ===

View file

@ -0,0 +1,29 @@
**Added:**
* <news item>
**Changed:**
* ``abbrevs`` xontrib transferred to `xontrib-abbrevs <https://github.com/xonsh/xontrib-abbrevs>`_.
* ``bashisms`` xontrib transferred to `xontrib-bashisms <https://github.com/xonsh/xontrib-bashisms>`_.
* ``free_cwd`` xontrib transferred to `xontrib-free-cwd <https://github.com/xonsh/xontrib-free-cwd>`_.
* ``whole_word_jumping`` xontrib transferred to `xontrib-whole-word-jumping <https://github.com/xonsh/xontrib-whole-word-jumping>`_.
* ``fish_completer`` xontrib transferred to `xontrib-fish-completer <https://github.com/xonsh/xontrib-fish-completer>`_.
* ``vox``, ``autovox``, ``voxapi`` xontribs transferred to `xontrib-vox <https://github.com/xonsh/xontrib-vox>`_.
* ``pdb``, ``xog`` xontribs transferred to `xontrib-debug-tools <https://github.com/xonsh/xontrib-debug-tools>`_.
**Deprecated:**
* <news item>
**Removed:**
* <news item>
**Fixed:**
* Fixed xontrib-jupyter to work in JupyterLab and terminal-based `Euporie <https://github.com/joouha/euporie>`_ environment.
**Security:**
* <news item>

View file

@ -96,7 +96,6 @@ per-file-ignores =
xonsh/tokenize.py:F821 F841, xonsh/tokenize.py:F821 F841,
xonsh/tools.py:E731 E305, xonsh/tools.py:E731 E305,
xonsh/xonfig.py:E731, xonsh/xonfig.py:E731,
xontrib/vox.py:F821,
# remove these later # remove these later
xonsh/color_tools.py:E305 xonsh/color_tools.py:E305
xonsh/completers/_aliases.py:E305, xonsh/completers/_aliases.py:E305,

View file

@ -38,9 +38,6 @@ def test_xonfig(args, prefix, exp, xsh_with_aliases, monkeypatch, check_complete
None, None,
{ {
# the list may vary wrt the env. so testing only part of the coreutils. # the list may vary wrt the env. so testing only part of the coreutils.
"abbrevs",
"pdb",
"bashisms",
"coreutils", "coreutils",
}, },
), ),

View file

@ -1,51 +0,0 @@
"""Tests bashisms xontrib."""
import sys
import pytest
@pytest.fixture(name="bash_preproc", scope="module")
def _bash_preproc():
from xontrib.bashisms import bash_preproc
yield bash_preproc
del sys.modules["xontrib.bashisms"]
@pytest.mark.parametrize(
"history, inp, exp",
[
# No history:
([], "!!", ""),
([], "!$", ""),
([], "!^", ""),
([], "!*", ""),
([], "!echo", ""),
# No substitution:
(["aa 1 2", "ab 3 4"], "ls", "ls"),
(["aa 1 2", "ab 3 4"], "x = 42", "x = 42"),
(["aa 1 2", "ab 3 4"], "!", "!"),
# Bang command only:
(["aa 1 2", "ab 3 4"], "!!", "ab 3 4"),
(["aa 1 2", "ab 3 4"], "!$", "4"),
(["aa 1 2", "ab 3 4"], "!^", "ab"),
(["aa 1 2", "ab 3 4"], "!*", "3 4"),
(["aa 1 2", "ab 3 4"], "!a", "ab 3 4"),
(["aa 1 2", "ab 3 4"], "!aa", "aa 1 2"),
(["aa 1 2", "ab 3 4"], "!ab", "ab 3 4"),
# Bang command with others:
(["aa 1 2", "ab 3 4"], "echo !! >log", "echo ab 3 4 >log"),
(["aa 1 2", "ab 3 4"], "echo !$ >log", "echo 4 >log"),
(["aa 1 2", "ab 3 4"], "echo !^ >log", "echo ab >log"),
(["aa 1 2", "ab 3 4"], "echo !* >log", "echo 3 4 >log"),
(["aa 1 2", "ab 3 4"], "echo !a >log", "echo ab 3 4 >log"),
(["aa 1 2", "ab 3 4"], "echo !aa >log", "echo aa 1 2 >log"),
(["aa 1 2", "ab 3 4"], "echo !ab >log", "echo ab 3 4 >log"),
],
)
def test_preproc(history, inp, exp, xession, bash_preproc):
"""Test the bash preprocessor."""
xession.history.inps = history
obs = bash_preproc(inp)
assert exp == obs

View file

@ -1,470 +0,0 @@
"""Vox tests"""
import io
import os
import pathlib
import stat
import subprocess as sp
import sys
import types
from typing import TYPE_CHECKING
import pytest
from py.path import local
from xonsh.platform import ON_WINDOWS
from xonsh.pytest.tools import skip_if_on_conda, skip_if_on_msys
from xontrib.voxapi import Vox, _get_vox_default_interpreter
if TYPE_CHECKING:
from pytest_subprocess import FakeProcess
from xontrib.vox import VoxHandler
@pytest.fixture
def venv_home(tmpdir, xession):
"""Path where VENVs are created"""
home = tmpdir / "venvs"
home.ensure_dir()
# Set up an isolated venv home
xession.env["VIRTUALENV_HOME"] = str(home)
return home
@pytest.fixture
def venv_proc(fake_process: "FakeProcess", venv_home):
def version_handle(process):
ver = str(sys.version).split()[0]
process.stdout.write(f"Python {ver}")
def venv_handle(process):
env_path = local(process.args[3])
(env_path / "lib").ensure_dir()
bin_path = env_path / ("Scripts" if ON_WINDOWS else "bin")
(bin_path / "python").write("", ensure=True)
(bin_path / "python.exe").write("", ensure=True)
for file in bin_path.listdir():
st = os.stat(str(file))
os.chmod(str(file), st.st_mode | stat.S_IEXEC)
for pip_name in ["pip", "pip.exe"]:
fake_process.register(
[str(bin_path / pip_name), "freeze", fake_process.any()], stdout=""
)
# will be used by `vox runin`
fake_process.register(
[pip_name, "--version"],
stdout=f"pip 22.0.4 from {env_path}/lib/python3.10/site-packages/pip (python 3.10)",
)
fake_process.keep_last_process(True)
return env_path
def get_interpreters():
interpreter = _get_vox_default_interpreter()
yield interpreter
if sys.executable != interpreter:
yield sys.executable
for cmd in get_interpreters():
fake_process.register([cmd, "--version"], callback=version_handle)
venv = (cmd, "-m", "venv")
fake_process.register([*venv, fake_process.any(min=1)], callback=venv_handle)
fake_process.keep_last_process(True)
return fake_process
@pytest.fixture
def vox(xession, load_xontrib, venv_proc) -> "VoxHandler":
"""vox Alias function"""
# Set up enough environment for xonsh to function
xession.env["PWD"] = os.getcwd()
xession.env["DIRSTACK_SIZE"] = 10
xession.env["PATH"] = []
xession.env["XONSH_SHOW_TRACEBACK"] = True
load_xontrib("vox")
vox = xession.aliases["vox"]
return vox
@pytest.fixture
def record_events(xession):
class Listener:
def __init__(self):
self.last = None
def listener(self, name):
def _wrapper(**kwargs):
self.last = (name,) + tuple(kwargs.values())
return _wrapper
def __call__(self, *events: str):
for name in events:
event = getattr(xession.builtins.events, name)
event(self.listener(name))
yield Listener()
def test_vox_flow(xession, vox, record_events, venv_home):
"""
Creates a virtual environment, gets it, enumerates it, and then deletes it.
"""
record_events(
"vox_on_create", "vox_on_delete", "vox_on_activate", "vox_on_deactivate"
)
vox(["create", "spam"])
assert stat.S_ISDIR(venv_home.join("spam").stat().mode)
assert record_events.last == ("vox_on_create", "spam")
ve = vox.vox["spam"]
assert ve.env == str(venv_home.join("spam"))
assert os.path.isdir(ve.bin)
assert "spam" in vox.vox
assert "spam" in list(vox.vox)
# activate
vox(["activate", "spam"])
assert xession.env["VIRTUAL_ENV"] == vox.vox["spam"].env
assert record_events.last == ("vox_on_activate", "spam", str(ve.env))
out = io.StringIO()
# info
vox(["info"], stdout=out)
assert "spam" in out.getvalue()
out.seek(0)
# list
vox(["list"], stdout=out)
print(out.getvalue())
assert "spam" in out.getvalue()
out.seek(0)
# wipe
vox(["wipe"], stdout=out)
print(out.getvalue())
assert "Nothing to remove" in out.getvalue()
out.seek(0)
# deactivate
vox(["deactivate"])
assert "VIRTUAL_ENV" not in xession.env
assert record_events.last == ("vox_on_deactivate", "spam", str(ve.env))
# runin
vox(["runin", "spam", "pip", "--version"], stdout=out)
print(out.getvalue())
assert "spam" in out.getvalue()
out.seek(0)
# removal
vox(["rm", "spam", "--force"])
assert not venv_home.join("spam").check()
assert record_events.last == ("vox_on_delete", "spam")
def test_activate_non_vox_venv(xession, vox, record_events, venv_proc, venv_home):
"""
Create a virtual environment using Python's built-in venv module
(not in VIRTUALENV_HOME) and verify that vox can activate it correctly.
"""
xession.env["PATH"] = []
record_events("vox_on_activate", "vox_on_deactivate")
with venv_home.as_cwd():
venv_dirname = "venv"
sp.run([sys.executable, "-m", "venv", venv_dirname])
vox(["activate", venv_dirname])
vxv = vox.vox[venv_dirname]
env = xession.env
assert os.path.isabs(vxv.bin)
assert env["PATH"][0] == vxv.bin
assert os.path.isabs(vxv.env)
assert env["VIRTUAL_ENV"] == vxv.env
assert record_events.last == (
"vox_on_activate",
venv_dirname,
str(pathlib.Path(str(venv_home)) / venv_dirname),
)
vox(["deactivate"])
assert not env["PATH"]
assert "VIRTUAL_ENV" not in env
assert record_events.last == (
"vox_on_deactivate",
venv_dirname,
str(pathlib.Path(str(venv_home)) / venv_dirname),
)
@skip_if_on_msys
@skip_if_on_conda
def test_path(xession, vox, a_venv):
"""
Test to make sure Vox properly activates and deactivates by examining $PATH
"""
oldpath = list(xession.env["PATH"])
vox(["activate", a_venv.basename])
assert oldpath != xession.env["PATH"]
vox.deactivate()
assert oldpath == xession.env["PATH"]
def test_crud_subdir(xession, venv_home, venv_proc):
"""
Creates a virtual environment, gets it, enumerates it, and then deletes it.
"""
vox = Vox(force_removals=True)
vox.create("spam/eggs")
assert stat.S_ISDIR(venv_home.join("spam", "eggs").stat().mode)
ve = vox["spam/eggs"]
assert ve.env == str(venv_home.join("spam", "eggs"))
assert os.path.isdir(ve.bin)
assert "spam/eggs" in vox
assert "spam" not in vox
# assert 'spam/eggs' in list(vox) # This is NOT true on Windows
assert "spam" not in list(vox)
del vox["spam/eggs"]
assert not venv_home.join("spam", "eggs").check()
def test_crud_path(xession, tmpdir, venv_proc):
"""
Creates a virtual environment, gets it, enumerates it, and then deletes it.
"""
tmp = str(tmpdir)
vox = Vox(force_removals=True)
vox.create(tmp)
assert stat.S_ISDIR(tmpdir.join("lib").stat().mode)
ve = vox[tmp]
assert ve.env == str(tmp)
assert os.path.isdir(ve.bin)
del vox[tmp]
assert not tmpdir.check()
@skip_if_on_msys
@skip_if_on_conda
def test_reserved_names(xession, tmpdir):
"""
Tests that reserved words are disallowed.
"""
xession.env["VIRTUALENV_HOME"] = str(tmpdir)
vox = Vox()
with pytest.raises(ValueError):
if ON_WINDOWS:
vox.create("Scripts")
else:
vox.create("bin")
with pytest.raises(ValueError):
if ON_WINDOWS:
vox.create("spameggs/Scripts")
else:
vox.create("spameggs/bin")
@pytest.mark.parametrize("registered", [False, True])
def test_autovox(xession, vox, a_venv, load_xontrib, registered):
"""
Tests that autovox works
"""
from xonsh.dirstack import popd, pushd
# Makes sure that event handlers are registered
load_xontrib("autovox")
env_name = a_venv.basename
env_path = str(a_venv)
# init properly
assert vox.parser
def policy(path, **_):
if str(path) == env_path:
return env_name
if registered:
xession.builtins.events.autovox_policy(policy)
pushd([env_path])
value = env_name if registered else None
assert vox.vox.active() == value
popd([])
@pytest.fixture
def create_venv(venv_proc):
vox = Vox(force_removals=True)
def wrapped(name):
vox.create(name)
return local(vox[name].env)
return wrapped
@pytest.fixture
def venvs(venv_home, create_venv):
"""Create virtualenv with names venv0, venv1"""
from xonsh.dirstack import popd, pushd
pushd([str(venv_home)])
yield [create_venv(f"venv{idx}") for idx in range(2)]
popd([])
@pytest.fixture
def a_venv(create_venv):
return create_venv("venv0")
@pytest.fixture
def patched_cmd_cache(xession, vox, monkeypatch):
cc = xession.commands_cache
def no_change(self, *_):
return False, False
monkeypatch.setattr(cc, "_check_changes", types.MethodType(no_change, cc))
bins = {path: (path, False) for path in _PY_BINS}
monkeypatch.setattr(cc, "_cmds_cache", bins)
yield cc
_VENV_NAMES = {"venv1", "venv1/", "venv0/", "venv0"}
if ON_WINDOWS:
_VENV_NAMES = {"venv1\\", "venv0\\", "venv0", "venv1"}
_HELP_OPTS = {
"-h",
"--help",
}
_PY_BINS = {"/bin/python2", "/bin/python3"}
_VOX_NEW_OPTS = {
"--ssp",
"--system-site-packages",
"--without-pip",
}.union(_HELP_OPTS)
if ON_WINDOWS:
_VOX_NEW_OPTS.add("--symlinks")
else:
_VOX_NEW_OPTS.add("--copies")
_VOX_RM_OPTS = {"-f", "--force"}.union(_HELP_OPTS)
class TestVoxCompletions:
@pytest.fixture
def check(self, check_completer, xession, vox):
def wrapped(cmd, positionals, options=None):
for k in list(xession.completers):
if k != "alias":
xession.completers.pop(k)
assert check_completer(cmd) == positionals
xession.env["ALIAS_COMPLETIONS_OPTIONS_BY_DEFAULT"] = True
if options:
assert check_completer(cmd) == positionals.union(options)
return wrapped
@pytest.mark.parametrize(
"args, positionals, opts",
[
(
"vox",
{
"delete",
"new",
"remove",
"del",
"workon",
"list",
"exit",
"info",
"ls",
"rm",
"deactivate",
"activate",
"enter",
"create",
"project-get",
"project-set",
"runin",
"runin-all",
"toggle-ssp",
"wipe",
"upgrade",
},
_HELP_OPTS,
),
(
"vox create",
set(),
_VOX_NEW_OPTS.union(
{
"-a",
"--activate",
"--wp",
"--without-pip",
"-p",
"--interpreter",
"-i",
"--install",
"-l",
"--link",
"--link-project",
"-r",
"--requirements",
"-t",
"--temp",
"--prompt",
}
),
),
("vox activate", _VENV_NAMES, _HELP_OPTS.union({"-n", "--no-cd"})),
("vox rm", _VENV_NAMES, _VOX_RM_OPTS),
("vox rm venv1", _VENV_NAMES, _VOX_RM_OPTS), # pos nargs: one or more
("vox rm venv1 venv2", _VENV_NAMES, _VOX_RM_OPTS), # pos nargs: two or more
],
)
def test_vox_commands(self, args, positionals, opts, check, venvs):
check(args, positionals, opts)
@pytest.mark.parametrize(
"args",
[
"vox new --activate --interpreter", # option after option
"vox new --interpreter", # "option: first
"vox new --activate env1 --interpreter", # option after pos
"vox new env1 --interpreter", # "option: at end"
"vox new env1 --interpreter=", # "option: at end with
],
)
def test_interpreter(self, check, args, patched_cmd_cache):
check(args, _PY_BINS)

View file

@ -161,4 +161,4 @@ hello = 'world'
def test_xontrib_list(xession, capsys): def test_xontrib_list(xession, capsys):
xontribs_main(["list"]) xontribs_main(["list"])
out, err = capsys.readouterr() out, err = capsys.readouterr()
assert "abbrevs" in out assert "coreutils" in out

View file

@ -1,55 +0,0 @@
"""test xontrib.abbrevs"""
import importlib
import sys
from prompt_toolkit.buffer import Buffer
from pytest import fixture, mark
from xonsh.xontribs import find_xontrib
@fixture
def _buffer():
def _wrapper(text):
buf = Buffer()
buf.insert_text(text)
return buf
return _wrapper
@fixture
def abbrevs_xontrib(monkeypatch, source_path):
monkeypatch.syspath_prepend(source_path)
spec = find_xontrib("abbrevs")
yield importlib.import_module(spec.name)
del sys.modules[spec.name]
ps_special_expand = (
lambda buffer, word: "procs" if buffer.text.startswith(word) else word
)
@mark.parametrize(
"abbr,val,expanded,cur",
[
("ps", "procs", "procs", None),
("ps", ps_special_expand, "procs", None),
("docker ps", ps_special_expand, "docker ps", None),
("kill", "kill <edit> -9", "kill -9", 5),
("pt", "poe<edit>try", "poetry", 3),
],
)
def test_gets_expanded(abbr, val, expanded, cur, abbrevs_xontrib, _buffer):
from xontrib import abbrevs
abbrevs.abbrevs[abbr] = val
abbrev = abbrevs.Abbreviation()
buf = _buffer(abbr)
abbrev.expand(buf)
assert buf.text == expanded
if cur is not None:
assert buf.cursor_position == cur

View file

@ -1,27 +0,0 @@
import pytest
@pytest.fixture
def fish_completer(tmpdir, xession, load_xontrib, fake_process):
"""vox Alias function"""
load_xontrib("fish_completer")
xession.env.update(
dict(
XONSH_DATA_DIR=str(tmpdir),
XONSH_SHOW_TRACEBACK=True,
)
)
fake_process.register_subprocess(
command=["fish", fake_process.any()],
# completion for "git chec"
stdout=b"""\
cherry-pick Apply the change introduced by an existing commit
checkout Checkout and switch to a branch""",
)
return fake_process
def test_fish_completer(fish_completer, check_completer):
assert check_completer("git", prefix="chec") == {"checkout"}

View file

@ -1,5 +1,4 @@
import cgi import cgi
import inspect
import sys import sys
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@ -356,51 +355,3 @@ class AliasesPage(Routes):
def get(self): def get(self):
yield t.div("table-responsive")[self.get_table()] yield t.div("table-responsive")[self.get_table()]
class AbbrevsPage(Routes):
path = "/abbrevs"
mod_name = XontribsPage.mod_name("abbrevs")
def __init__(self, **kwargs):
super().__init__(**kwargs)
# lazy import as to not load by accident
from xontrib.abbrevs import abbrevs # type: ignore
self.abbrevs: "dict[str, str]" = abbrevs
@classmethod
def nav_title(cls):
if cls.mod_name in sys.modules:
return "Abbrevs"
def get_header(self):
yield t.tr()[
t.th("text-right")["Name"],
t.th()["Value"],
]
def get_rows(self):
for name in sorted(self.abbrevs.keys()):
alias = self.abbrevs[name]
if callable(alias):
display = inspect.getsource(alias)
else:
display = str(alias)
# todo:
# 2. way to update
yield t.tr()[
t.td("text-right")[str(name)],
t.td()[t.p()[repr(display)],],
]
def get_table(self):
rows = list(self.get_rows())
yield t.tbl("table-sm", "table-striped")[
self.get_header(),
rows,
]
def get(self):
yield t.div("table-responsive")[self.get_table()]

View file

@ -1,5 +0,0 @@
xontrib is an implicit namespace package. DO NOT add an __init__.py file
to this directory.
Feel free to add both *.xsh and *.py files to the directory, they will be
installed and available.

7
xontrib/README.rst Normal file
View file

@ -0,0 +1,7 @@
Xontributions, or xontribs, are a set of tools and conventions for extending
the functionality of xonsh beyond what is provided by default. This allows
3rd party developers and users to improve their xonsh experience without
having to go through the xonsh development and release cycle.
This xontrib directory represents an implicit namespace package.
DO NOT add an __init__.py file to this directory.

View file

@ -1,142 +0,0 @@
"""
Command abbreviations.
This expands input words from `abbrevs` dictionary as you type.
Adds ``abbrevs`` dictionary to hold user-defined "command abbreviations.
The dictionary is searched as you type the matching words are replaced
at the command line by the corresponding dictionary contents once you hit
'Space' or 'Return' key.
For instance a frequently used command such as ``git status`` can be abbreviated to ``gst`` as follows::
$ xontrib load abbrevs
$ abbrevs['gst'] = 'git status'
$ gst # Once you hit <space> or <return> 'gst' gets expanded to 'git status'.
one can set a callback function that receives current buffer and word to customize the expanded word based on context
.. code-block:: python
$ abbrevs['ps'] = lambda buffer, word: "procs" if buffer.text.startswith(word) else word
It is also possible to set the cursor position after expansion with,
$ abbrevs['gp'] = "git push <edit> --force"
"""
import builtins
import typing as tp
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.filters import IsMultiline
from xonsh.built_ins import DynamicAccessProxy, XonshSession
from xonsh.tools import check_for_partial_string
__all__ = ()
if tp.TYPE_CHECKING:
class AbbrCallType(tp.Protocol):
def __call__(self, word: str, buffer: Buffer) -> str:
...
AbbrValType = tp.Union[str, AbbrCallType]
abbrevs: "dict[str, AbbrValType]" = dict()
class _LastExpanded(tp.NamedTuple):
word: str
expanded: str
class Abbreviation:
"""A container class to handle state related to abbreviating keywords"""
last_expanded: tp.Optional[_LastExpanded] = None
def expand(self, buffer: Buffer) -> bool:
"""expand the given abbr text. Return true if cursor position changed."""
if not abbrevs:
return False
document = buffer.document
word = document.get_word_before_cursor(WORD=True)
if word in abbrevs.keys():
partial = document.text[: document.cursor_position]
startix, endix, quote = check_for_partial_string(partial)
if startix is not None and endix is None:
return False
text = get_abbreviated(word, buffer)
buffer.delete_before_cursor(count=len(word))
buffer.insert_text(text)
self.last_expanded = _LastExpanded(word, text)
if EDIT_SYMBOL in text:
set_cursor_position(buffer, text)
return True
return False
def revert(self, buffer) -> bool:
if self.last_expanded is None:
return False
document = buffer.document
expansion = self.last_expanded.expanded + " "
if not document.text_before_cursor.endswith(expansion):
return False
buffer.delete_before_cursor(count=len(expansion))
buffer.insert_text(self.last_expanded.word)
self.last_expanded = None
return True
EDIT_SYMBOL = "<edit>"
def get_abbreviated(key: str, buffer) -> str:
abbr = abbrevs[key]
if callable(abbr):
text = abbr(buffer=buffer, word=key)
else:
text = abbr
return text
def set_cursor_position(buffer, expanded: str) -> None:
pos = expanded.rfind(EDIT_SYMBOL)
if pos == -1:
return
buffer.cursor_position = buffer.cursor_position - (len(expanded) - pos)
buffer.delete(len(EDIT_SYMBOL))
def _load_xontrib_(xsh: XonshSession, **_):
@xsh.builtins.events.on_ptk_create # type:ignore
def custom_keybindings(bindings, **kw):
from prompt_toolkit.filters import EmacsInsertMode, ViInsertMode
handler = bindings.add
insert_mode = ViInsertMode() | EmacsInsertMode()
abbrev = Abbreviation()
@handler(" ", filter=IsMultiline() & insert_mode)
def handle_space(event):
buffer = event.app.current_buffer
add_space = True
if not abbrev.revert(buffer):
position_changed = abbrev.expand(buffer)
if position_changed:
add_space = False
if add_space:
buffer.insert_text(" ")
# XSH.builtins is a namespace and extendable
xsh.builtins.abbrevs = abbrevs
proxy = DynamicAccessProxy("abbrevs", "__xonsh__.builtins.abbrevs")
builtins.abbrevs = proxy # type: ignore
return {"abbrevs": abbrevs}

View file

@ -1,108 +0,0 @@
"""
Manages automatic activation of virtual environments.
This coordinates multiple automatic vox policies and deals with some of the
mechanics of venv searching and chdir handling.
This provides no interface for end users.
Developers should look at XSH.builtins.events.autovox_policy
"""
import itertools
import warnings
from pathlib import Path
import xontrib.voxapi as voxapi
from xonsh.built_ins import XSH, XonshSession
__all__ = ()
def autovox_policy(path: "Path") -> "str|Path|None":
"""
Register a policy with autovox.
A policy is a function that takes a Path and returns the venv associated with it,
if any.
NOTE: The policy should only return a venv for this path exactly, not for
parent paths. Parent walking is handled by autovox so that all policies can
be queried at each level.
"""
class MultipleVenvsWarning(RuntimeWarning):
pass
def get_venv(vox, dirpath):
# Search up the directory tree until a venv is found, or none
for path in itertools.chain((dirpath,), dirpath.parents):
venvs = [
vox[p]
for p in XSH.builtins.events.autovox_policy.fire(path=path)
if p is not None and p in vox # Filter out venvs that don't exist
]
if len(venvs) == 0:
continue
else:
if len(venvs) > 1:
warnings.warn(
MultipleVenvsWarning(
"Found {numvenvs} venvs for {path}; using the first".format(
numvenvs=len(venvs), path=path
)
)
)
return venvs[0]
def check_for_new_venv(curdir, olddir):
vox = voxapi.Vox()
if olddir is ... or olddir is None:
try:
oldve = vox[...]
except KeyError:
oldve = None
else:
oldve = get_venv(vox, olddir)
newve = get_venv(vox, curdir)
if oldve != newve:
if newve is None:
vox.deactivate()
else:
vox.activate(newve.env)
# Core mechanism: Check for venv when the current directory changes
def cd_handler(newdir, olddir, **_):
check_for_new_venv(Path(newdir), Path(olddir))
# Recalculate when venvs are created or destroyed
def create_handler(**_):
check_for_new_venv(Path.cwd(), ...)
def destroy_handler(**_):
check_for_new_venv(Path.cwd(), ...)
# Initial activation before first prompt
def load_handler(**_):
check_for_new_venv(Path.cwd(), None)
def _load_xontrib_(xsh: XonshSession, **_):
xsh.builtins.events.register(autovox_policy)
xsh.builtins.events.on_chdir(cd_handler)
xsh.builtins.events.vox_on_create(create_handler)
xsh.builtins.events.vox_on_destroy(destroy_handler)
xsh.builtins.events.on_post_init(load_handler)

View file

@ -1,153 +0,0 @@
"""Bash-like interface extensions for xonsh.
Enables additional Bash-like syntax while at the command prompt.
For example, the ``!!`` syntax for running the previous command is now usable.
Note that these features are implemented as precommand events and
these additions do not affect the xonsh language when run as script.
That said, you might find them useful if you have strong muscle memory.
**Warning:** This xontrib may modify user command line input to implement its behavior.
To see the modifications as they are applied (in unified diffformat), please set ``$XONSH_DEBUG`` to ``2`` or higher.
The xontrib also adds commands: ``alias``, ``export``, ``unset``, ``set``, ``shopt``, ``complete``.
"""
import re
import shlex
import sys
from xonsh.built_ins import XSH, XonshSession
__all__ = ()
def _warn_not_supported(msg: str):
print(
f"""Not supported ``{msg}`` in xontrib bashisms.
PRs are welcome - https://github.com/xonsh/xonsh/blob/main/xontrib/bashisms.py""",
file=sys.stderr,
)
def bash_preproc(cmd, **kw):
bang_previous = {
"!": lambda x: x,
"$": lambda x: shlex.split(x)[-1],
"^": lambda x: shlex.split(x)[0],
"*": lambda x: " ".join(shlex.split(x)[1:]),
}
def replace_bang(m):
arg = m.group(1)
inputs = XSH.history.inps
# Dissect the previous command.
if arg in bang_previous:
try:
return bang_previous[arg](inputs[-1])
except IndexError:
print(f"xonsh: no history for '!{arg}'")
return ""
# Look back in history for a matching command.
else:
try:
return next(x for x in reversed(inputs) if x.startswith(arg))
except StopIteration:
print(f"xonsh: no previous commands match '!{arg}'")
return ""
return re.sub(r"!([!$^*]|[\w]+)", replace_bang, cmd)
def alias(args, stdin=None):
ret = 0
if args:
for arg in args:
if "=" in arg:
# shlex.split to remove quotes, e.g. "foo='echo hey'" into
# "foo=echo hey"
name, cmd = shlex.split(arg)[0].split("=", 1)
XSH.aliases[name] = shlex.split(cmd)
elif arg in XSH.aliases:
print(f"{arg}={XSH.aliases[arg]}")
else:
print(f"alias: {arg}: not found", file=sys.stderr)
ret = 1
else:
for alias, cmd in XSH.aliases.items():
print(f"{alias}={cmd}")
return ret
def _unset(args):
if not args:
print("Usage: unset ENV_VARIABLE", file=sys.stderr)
for v in args:
try:
XSH.env.pop(v)
except KeyError:
print(f"{v} not found", file=sys.stderr)
def _export(args):
if not args:
print("Usage: export ENV_VARIABLE=VALUE", file=sys.stderr)
for eq in args:
if "=" in eq:
name, val = shlex.split(eq)[0].split("=", 1)
XSH.env[name] = val
else:
print(f"{eq} equal sign not found", file=sys.stderr)
def _set(args):
arg = args[0]
if arg == "-e":
XSH.env["RAISE_SUBPROC_ERROR"] = True
elif arg == "+e":
XSH.env["RAISE_SUBPROC_ERROR"] = False
elif arg == "-x":
XSH.env["XONSH_TRACE_SUBPROC"] = True
elif arg == "+x":
XSH.env["XONSH_TRACE_SUBPROC"] = False
else:
_warn_not_supported(f"set {arg}")
def _shopt(args):
supported_shopt = ["DOTGLOB"]
args_len = len(args)
if args_len == 0:
for so in supported_shopt:
onoff = "on" if so in XSH.env and XSH.env[so] else "off"
print(f"dotglob\t{onoff}")
return
elif args_len < 2 or args[0] in ["-h", "--help"]:
print(f'Usage: shopt <-s|-u> <{"|".join(supported_shopt).lower()}>')
return
opt = args[0]
optname = args[1]
if opt == "-s" and optname == "dotglob":
XSH.env["DOTGLOB"] = True
elif opt == "-u" and optname == "dotglob":
XSH.env["DOTGLOB"] = False
else:
_warn_not_supported(f"shopt {args}")
def _load_xontrib_(xsh: XonshSession, **_):
xsh.builtins.events.on_transform_command(bash_preproc)
xsh.aliases.register(_unset)
xsh.aliases.register(_export)
xsh.aliases.register(_shopt)
xsh.aliases.register(_set)
xsh.aliases["complete"] = "completer list".split()
xsh.aliases["alias"] = alias
xsh.env["THREAD_SUBPROCS"] = False

View file

@ -1,30 +0,0 @@
"""Populate rich completions using fish and remove the default bash based completer"""
from xonsh.completers import completer
from xonsh.completers.tools import complete_from_sub_proc, contextual_command_completer
from xonsh.parsers.completion_context import CommandContext
@contextual_command_completer
def fish_proc_completer(ctx: CommandContext):
if not ctx.args:
return
line = ctx.text_before_cursor
script_lines = [
f"complete --no-files {ctx.command}", # switch off basic file completions for the executable
f"complete -C '{line}'",
]
return (
complete_from_sub_proc(
"fish",
"-c",
"; ".join(script_lines),
),
False,
)
def _load_xontrib_(**_):
completer.add_one_completer("fish", fish_proc_completer, "<bash")

View file

@ -1,106 +0,0 @@
"""Windows only xontrib, to release the lock on the current directory whenever the prompt is shown.
Enabling this will allow other programs or
Windows Explorer to delete or rename the current or parent
directories. Internally, it is accomplished by temporarily resetting
CWD to the root drive folder while waiting at the prompt. This only
works with the prompt_toolkit backend and can cause issues
if any extensions are enabled that hook the prompt and relies on
``os.getcwd()``.
"""
import functools
import os
from pathlib import Path
from xonsh.built_ins import XSH, XonshSession
from xonsh.platform import ON_CYGWIN, ON_MSYS, ON_WINDOWS
from xonsh.tools import print_exception
def _chdir_up(path):
"""Change directory to path or if path does not exist
the first valid parent.
"""
path = Path(path)
try:
os.chdir(path)
return str(path.absolute())
except (FileNotFoundError, NotADirectoryError):
path.resolve()
return _chdir_up(path.parent)
def _cwd_release_wrapper(func):
"""Decorator for Windows to wrap the prompt function and release
the process lock on the current directory while the prompt is
displayed. This works by temporarily setting
the workdir to the users home directory.
"""
env = XSH.env
if env.get("UPDATE_PROMPT_ON_KEYPRESS"):
return func if not hasattr(func, "_orgfunc") else func._orgfunc
if hasattr(func, "_orgfunc"):
# Already wrapped
return func
else:
@functools.wraps(func)
def wrapper(*args, **kwargs):
anchor = Path(os.getcwd()).anchor
os.chdir(anchor)
try:
out = func(*args, **kwargs)
finally:
try:
pwd = env.get("PWD", anchor)
os.chdir(pwd)
except (FileNotFoundError, NotADirectoryError):
print_exception()
newpath = _chdir_up(pwd)
XSH.env["PWD"] = newpath
raise KeyboardInterrupt
return out
wrapper._orgfunc = func
return wrapper
def _cwd_restore_wrapper(func):
"""Decorator for Windows which will temporary restore the true working
directory. Designed to wrap completer callbacks from the
prompt_toolkit or readline.
"""
env = XSH.env
if env.get("UPDATE_PROMPT_ON_KEYPRESS"):
return func if not hasattr(func, "_orgfunc") else func._orgfunc
if hasattr(func, "_orgfunc"):
# Already wrapped
return func
else:
@functools.wraps(func)
def wrapper(*args, **kwargs):
workdir = os.getcwd()
_chdir_up(env.get("PWD", workdir))
out = func(*args, **kwargs)
_chdir_up(workdir)
return out
wrapper._orgfunc = func
return wrapper
def setup_release_cwd_hook(prompter, history, completer, bindings, **kw):
if ON_WINDOWS and not ON_CYGWIN and not ON_MSYS:
prompter.prompt = _cwd_release_wrapper(prompter.prompt)
if completer.completer:
# Temporarily restore cwd for callbacks to the completer
completer.completer.complete = _cwd_restore_wrapper(
completer.completer.complete
)
def _load_xontrib_(xsh: XonshSession, **_):
xsh.builtins.events.on_ptk_create(setup_release_cwd_hook)

View file

@ -1,15 +0,0 @@
"""Simple built-in debugger. Runs pdb on reception of SIGUSR1 signal."""
import signal
from xonsh.built_ins import XonshSession
def handle_sigusr1(sig, frame):
print("\nSIGUSR1 signal received. Starting interactive debugger...", flush=True)
import pdb # noqa
pdb.Pdb().set_trace(frame) # noqa
def _load_xontrib_(xsh: XonshSession, **_):
signal.signal(signal.SIGUSR1, handle_sigusr1)

View file

@ -1,499 +0,0 @@
"""Python virtual environment manager for xonsh."""
import os.path
import subprocess
import tempfile
import typing as tp
from pathlib import Path
import xonsh.cli_utils as xcli
import xontrib.voxapi as voxapi
from xonsh.built_ins import XSH, XonshSession
from xonsh.dirstack import pushd_fn
from xonsh.platform import ON_WINDOWS
from xonsh.tools import XonshError
__all__ = ()
def venv_names_completer(command, alias: "VoxHandler", **_):
envs = alias.vox.keys()
from xonsh.completers.path import complete_dir
yield from envs
paths, _ = complete_dir(command)
yield from paths
def py_interpreter_path_completer(xsh, **_):
for _, (path, is_alias) in xsh.commands_cache.all_commands.items():
if not is_alias and ("/python" in path or "/pypy" in path):
yield path
_venv_option = xcli.Annotated[
tp.Optional[str],
xcli.Arg(metavar="ENV", nargs="?", completer=venv_names_completer),
]
class VoxHandler(xcli.ArgParserAlias):
"""Vox is a virtual environment manager for xonsh."""
def build(self):
"""lazily called during dispatch"""
self.vox = voxapi.Vox()
parser = self.create_parser(prog="vox")
parser.add_command(self.new, aliases=["create"])
parser.add_command(self.activate, aliases=["workon", "enter"])
parser.add_command(self.deactivate, aliases=["exit"])
parser.add_command(self.list, aliases=["ls"])
parser.add_command(self.remove, aliases=["rm", "delete", "del"])
parser.add_command(self.info)
parser.add_command(self.runin)
parser.add_command(self.runin_all)
parser.add_command(self.toggle_ssp)
parser.add_command(self.wipe)
parser.add_command(self.project_set)
parser.add_command(self.project_get)
parser.add_command(self.upgrade)
return parser
def hook_pre_add_argument(self, param: str, func, flags, kwargs):
if func.__name__ in {"new", "upgrade"}:
if ON_WINDOWS and param == "symlinks":
# copies by default on windows
kwargs["default"] = False
kwargs["action"] = "store_true"
kwargs["help"] = "Try to use symlinks rather than copies"
flags = ["--symlinks"]
return flags, kwargs
def hook_post_add_argument(self, action, param: str, **_):
if param == "interpreter":
action.completer = py_interpreter_path_completer
def new(
self,
name: xcli.Annotated[str, xcli.Arg(metavar="ENV")],
interpreter: "str|None" = None,
system_site_packages=False,
symlinks=True,
without_pip=False,
activate=False,
temporary=False,
packages: xcli.Annotated[tp.Sequence[str], xcli.Arg(nargs="*")] = (),
requirements: xcli.Annotated[tp.Sequence[str], xcli.Arg(action="append")] = (),
link_project_dir=False,
prompt: "str|None" = None,
):
"""Create a virtual environment in $VIRTUALENV_HOME with python3's ``venv``.
Parameters
----------
name : str
Virtual environment name
interpreter: -p, --interpreter
Python interpreter used to create the virtual environment.
Can be configured via the $VOX_DEFAULT_INTERPRETER environment variable.
system_site_packages : --system-site-packages, --ssp
If True, the system (global) site-packages dir is available to
created environments.
symlinks : --copies
Try to use copies rather than symlinks.
without_pip : --without-pip, --wp
Skips installing or upgrading pip in the virtual environment
activate : -a, --activate
Activate the newly created virtual environment.
temporary: -t, --temp
Create the virtualenv under a temporary directory.
packages: -i, --install
Install one or more packages (by repeating the option) after the environment is created using pip
requirements: -r, --requirements
The argument value is passed to ``pip -r`` to be installed.
link_project_dir: -l, --link, --link-project
Associate the current directory with the new environment.
prompt: --prompt
Provides an alternative prompt prefix for this environment.
"""
self.out("Creating environment...")
if temporary:
path = tempfile.mkdtemp(prefix=f"vox-env-{name}")
name = os.path.join(path, name)
self.vox.create(
name,
system_site_packages=system_site_packages,
symlinks=symlinks,
with_pip=(not without_pip),
interpreter=interpreter,
prompt=prompt,
)
if link_project_dir:
self.project_set(name)
if packages:
self.runin(name, ["pip", "install", *packages])
if requirements:
def _generate_args():
for req in requirements:
yield "-r"
yield req
self.runin(name, ["pip", "install"] + list(_generate_args()))
if activate:
self.activate(name)
self.out(f"Environment {name!r} created and activated.\n")
else:
self.out(
f'Environment {name!r} created. Activate it with "vox activate {name}".\n'
)
def activate(
self,
name: _venv_option = None,
no_cd=False,
):
"""Activate a virtual environment.
Parameters
----------
name
The environment to activate.
ENV can be either a name from the venvs shown by ``vox list``
or the path to an arbitrary venv
no_cd: -n, --no-cd
Do not change current working directory even if a project path is associated with ENV.
"""
if name is None:
return self.list()
try:
self.vox.activate(name)
except KeyError:
raise self.Error(
f'This environment doesn\'t exist. Create it with "vox new {name}"',
)
self.out(f'Activated "{name}".\n')
if not no_cd:
project_dir = self._get_project_dir(name)
if project_dir:
pushd_fn(project_dir)
def deactivate(self, remove=False, force=False):
"""Deactivate the active virtual environment.
Parameters
----------
remove: -r, --remove
Remove the virtual environment after leaving it.
force: -f, --force-removal
Remove the virtual environment without prompt
"""
if self.vox.active() is None:
raise self.Error(
'No environment currently active. Activate one with "vox activate".\n',
)
env_name = self.vox.deactivate()
if remove:
self.vox.force_removals = force
del self.vox[env_name]
self.out(f'Environment "{env_name}" deactivated and removed.\n')
else:
self.out(f'Environment "{env_name}" deactivated.\n')
def list(self):
"""List available virtual environments."""
try:
envs = sorted(self.vox.keys())
except PermissionError:
raise self.Error("No permissions on VIRTUALENV_HOME")
if not envs:
raise self.Error(
'No environments available. Create one with "vox new".\n',
)
self.out("Available environments:")
self.out("\n".join(envs))
def remove(
self,
names: xcli.Annotated[
tp.List[str],
xcli.Arg(metavar="ENV", nargs="+", completer=venv_names_completer),
],
force=False,
):
"""Remove virtual environments.
Parameters
----------
names
The environments to remove. ENV can be either a name from the venvs shown by vox
list or the path to an arbitrary venv
force : -f, --force
Delete virtualenv without prompt
"""
self.vox.force_removals = force
for name in names:
try:
del self.vox[name]
except voxapi.EnvironmentInUse:
raise self.Error(
f'The "{name}" environment is currently active. '
'In order to remove it, deactivate it first with "vox deactivate".\n',
)
except KeyError:
raise self.Error(f'"{name}" environment doesn\'t exist.\n')
else:
self.out(f'Environment "{name}" removed.')
self.out()
def _in_venv(self, env_dir: str, command: str, *args, **kwargs):
env = {**XSH.env.detype(), "VIRTUAL_ENV": env_dir}
bin_path = os.path.join(env_dir, self.vox.sub_dirs[0])
env["PATH"] = os.pathsep.join([bin_path, env["PATH"]])
for key in ("PYTHONHOME", "__PYVENV_LAUNCHER__"):
env.pop(key, None)
try:
return subprocess.check_call(
[command] + list(args), shell=bool(ON_WINDOWS), env=env, **kwargs
)
# need to have shell=True on windows, otherwise the PYTHONPATH
# won't inherit the PATH
except OSError as e:
if e.errno == 2:
raise self.Error(f"Unable to find {command}")
raise
def runin(
self,
venv: xcli.Annotated[
str,
xcli.Arg(completer=venv_names_completer),
],
args: xcli.Annotated[tp.Sequence[str], xcli.Arg(nargs="...")],
):
"""Run the command in the given environment
Parameters
----------
venv
The environment to run the command for
args
The actual command to run
Examples
--------
vox runin venv1 black --check-only
"""
env_dir = self._get_env_dir(venv)
if not args:
raise self.Error("No command is passed")
self._in_venv(env_dir, *args)
def runin_all(
self,
args: xcli.Annotated[tp.Sequence[str], xcli.Arg(nargs="...")],
):
"""Run the command in all environments found under $VIRTUALENV_HOME
Parameters
----------
args
The actual command to run with arguments
"""
errors = False
for env in self.vox:
self.out("\n%s:" % env)
try:
self.runin(env, *args)
except subprocess.CalledProcessError as e:
errors = True
self.err(e)
self.parser.exit(errors)
def _sitepackages_dir(self, venv_path: str):
env_python = self.vox.get_binary_path("python", venv_path)
if not os.path.exists(env_python):
raise self.Error("no virtualenv active")
return Path(
subprocess.check_output(
[
str(env_python),
"-c",
"import distutils; \
print(distutils.sysconfig.get_python_lib())",
]
).decode()
)
def _get_env_dir(self, venv=None):
venv = venv or ...
try:
env_dir = self.vox[venv].env
except KeyError:
# check whether the venv is a valid path to an environment
if (
isinstance(venv, str)
and os.path.exists(venv)
and os.path.exists(self.vox.get_binary_path("python", venv))
):
return venv
raise XonshError("No virtualenv is found")
return env_dir
def toggle_ssp(self):
"""Controls whether the active virtualenv will access the packages
in the global Python site-packages directory."""
# https://virtualenv.pypa.io/en/legacy/userguide.html#the-system-site-packages-option
env_dir = self._get_env_dir() # current
site = self._sitepackages_dir(env_dir)
ngsp_file = site.parent / "no-global-site-packages.txt"
if ngsp_file.exists():
ngsp_file.unlink()
self.out("Enabled global site-packages")
else:
with ngsp_file.open("w"):
self.out("Disabled global site-packages")
def project_set(
self,
venv: _venv_option = None,
project_path=None,
):
"""Bind an existing virtualenv to an existing project.
Parameters
----------
venv
Name of the virtualenv, while the default being currently active venv.
project_path
Path to the project, while the default being current directory.
"""
env_dir = self._get_env_dir(venv) # current
project = os.path.abspath(project_path or ".")
if not os.path.exists(env_dir):
raise self.Error(f"Environment '{env_dir}' doesn't exist.")
if not os.path.isdir(project):
raise self.Error(f"{project} does not exist")
project_file = self._get_project_file()
project_file.write_text(project)
def _get_project_file(
self,
venv=None,
):
env_dir = Path(self._get_env_dir(venv)) # current
return env_dir / ".project"
def _get_project_dir(self, venv=None):
project_file = self._get_project_file(venv)
if project_file.exists():
project_dir = project_file.read_text()
if os.path.exists(project_dir):
return project_dir
def project_get(self, venv: _venv_option = None):
"""Return a virtualenv's project directory.
Parameters
----------
venv
Name of the virtualenv under $VIRTUALENV_HOME, while default being currently active venv.
"""
project_dir = self._get_project_dir(venv)
if project_dir:
self.out(project_dir)
else:
project_file = self._get_project_file(venv)
raise self.Error(
f"Corrupted or outdated: {project_file}\nDirectory: {project_dir} doesn't exist."
)
def wipe(self, venv: _venv_option = None):
"""Remove all installed packages from the current (or supplied) env.
Parameters
----------
venv
name of the venv. Defaults to currently active venv
"""
env_dir = self._get_env_dir(venv)
pip_bin = self.vox.get_binary_path("pip", env_dir)
all_pkgs = set(
subprocess.check_output([pip_bin, "freeze", "--local"])
.decode()
.splitlines()
)
pkgs = {p for p in all_pkgs if len(p.split("==")) == 2}
ignored = sorted(all_pkgs - pkgs)
to_remove = {p.split("==")[0] for p in pkgs}
if to_remove:
self.out("Ignoring:\n %s" % "\n ".join(ignored))
self.out("Uninstalling packages:\n %s" % "\n ".join(to_remove))
return subprocess.run([pip_bin, "uninstall", "-y", *to_remove])
else:
self.out("Nothing to remove")
def info(self, venv: _venv_option = None):
"""Prints the path for the supplied env
Parameters
----------
venv
name of the venv
"""
self.out(self.vox[venv or ...])
def upgrade(
self,
name: _venv_option = None,
interpreter: "str|None" = None,
symlinks=True,
with_pip=False,
):
"""Upgrade the environment directory to use this version
of Python, assuming Python has been upgraded in-place.
WARNING: If a virtual environment was created with symlinks or without PIP, you must
specify these options again on upgrade.
Parameters
----------
name
Name or the path to the virtual environment
interpreter: -p, --interpreter
Python interpreter used to create the virtual environment.
Can be configured via the $VOX_DEFAULT_INTERPRETER environment variable.
symlinks : --copies
Try to use copies rather than symlinks.
with_pip : --without-pip, --wp
Skips installing or upgrading pip in the virtual environment
"""
venv = self.vox.upgrade(
name or ..., symlinks=symlinks, with_pip=with_pip, interpreter=interpreter
)
self.out(venv)
def _load_xontrib_(xsh: XonshSession, **_):
xsh.aliases["vox"] = VoxHandler(threadable=False)

View file

@ -1,450 +0,0 @@
"""
API for Vox, the Python virtual environment manager for xonsh.
Vox defines several events related to the life cycle of virtual environments:
* ``vox_on_create(env: str) -> None``
* ``vox_on_activate(env: str, path: pathlib.Path) -> None``
* ``vox_on_deactivate(env: str, path: pathlib.Path) -> None``
* ``vox_on_delete(env: str) -> None``
"""
import collections.abc
import logging
import os
import shutil
import subprocess as sp
import sys
import typing
from xonsh.built_ins import XSH
# This is because builtins aren't globally created during testing.
# FIXME: Is there a better way?
from xonsh.events import events
from xonsh.platform import ON_POSIX, ON_WINDOWS
events.doc(
"vox_on_create",
"""
vox_on_create(env: str) -> None
Fired after an environment is created.
""",
)
events.doc(
"vox_on_activate",
"""
vox_on_activate(env: str, path: pathlib.Path) -> None
Fired after an environment is activated.
""",
)
events.doc(
"vox_on_deactivate",
"""
vox_on_deactivate(env: str, path: pathlib.Path) -> None
Fired after an environment is deactivated.
""",
)
events.doc(
"vox_on_delete",
"""
vox_on_delete(env: str) -> None
Fired after an environment is deleted (through vox).
""",
)
class VirtualEnvironment(typing.NamedTuple):
env: str
bin: str
lib: str
inc: str
def _subdir_names():
"""
Gets the names of the special dirs in a venv.
This is not necessarily exhaustive of all the directories that could be in a venv, and there
may additional logic to get to useful places.
"""
if ON_WINDOWS:
return "Scripts", "Lib", "Include"
elif ON_POSIX:
return "bin", "lib", "include"
else:
raise OSError("This OS is not supported.")
def _mkvenv(env_dir):
"""
Constructs a VirtualEnvironment based on the given base path.
This only cares about the platform. No filesystem calls are made.
"""
env_dir = os.path.abspath(env_dir)
if ON_WINDOWS:
binname = os.path.join(env_dir, "Scripts")
incpath = os.path.join(env_dir, "Include")
libpath = os.path.join(env_dir, "Lib", "site-packages")
elif ON_POSIX:
binname = os.path.join(env_dir, "bin")
incpath = os.path.join(env_dir, "include")
libpath = os.path.join(
env_dir, "lib", "python%d.%d" % sys.version_info[:2], "site-packages"
)
else:
raise OSError("This OS is not supported.")
return VirtualEnvironment(env_dir, binname, libpath, incpath)
class EnvironmentInUse(Exception):
"""The given environment is currently activated, and the operation cannot be performed."""
class NoEnvironmentActive(Exception):
"""No environment is currently activated, and the operation cannot be performed."""
class Vox(collections.abc.Mapping):
"""API access to Vox and virtual environments, in a dict-like format.
Makes use of the VirtualEnvironment namedtuple:
1. ``env``: The full path to the environment
2. ``bin``: The full path to the bin/Scripts directory of the environment
"""
def __init__(self, force_removals=False):
if not XSH.env.get("VIRTUALENV_HOME"):
home_path = os.path.expanduser("~")
self.venvdir = os.path.join(home_path, ".virtualenvs")
XSH.env["VIRTUALENV_HOME"] = self.venvdir
else:
self.venvdir = XSH.env["VIRTUALENV_HOME"]
self.force_removals = force_removals
self.sub_dirs = _subdir_names()
def create(
self,
name,
interpreter=None,
system_site_packages=False,
symlinks=False,
with_pip=True,
prompt=None,
):
"""Create a virtual environment in $VIRTUALENV_HOME with python3's ``venv``.
Parameters
----------
name : str
Virtual environment name
interpreter: str
Python interpreter used to create the virtual environment.
Can be configured via the $VOX_DEFAULT_INTERPRETER environment variable.
system_site_packages : bool
If True, the system (global) site-packages dir is available to
created environments.
symlinks : bool
If True, attempt to symlink rather than copy files into virtual
environment.
with_pip : bool
If True, ensure pip is installed in the virtual environment. (Default is True)
prompt: str
Provides an alternative prompt prefix for this environment.
"""
if interpreter is None:
interpreter = _get_vox_default_interpreter()
print(f"Using Interpreter: {interpreter}")
# NOTE: clear=True is the same as delete then create.
# NOTE: upgrade=True is its own method
if isinstance(name, os.PathLike):
env_path = os.fspath(name)
else:
env_path = os.path.join(self.venvdir, name)
if not self._check_reserved(env_path):
raise ValueError(
"venv can't contain reserved names ({})".format(
", ".join(self.sub_dirs)
)
)
self._create(
env_path,
interpreter,
system_site_packages,
symlinks,
with_pip,
prompt=prompt,
)
events.vox_on_create.fire(name=name)
def upgrade(self, name, symlinks=False, with_pip=True, interpreter=None):
"""Create a virtual environment in $VIRTUALENV_HOME with python3's ``venv``.
WARNING: If a virtual environment was created with symlinks or without PIP, you must
specify these options again on upgrade.
Parameters
----------
name : str
Virtual environment name
interpreter: str
The Python interpreter used to create the virtualenv
symlinks : bool
If True, attempt to symlink rather than copy files into virtual
environment.
with_pip : bool
If True, ensure pip is installed in the virtual environment.
"""
if interpreter is None:
interpreter = _get_vox_default_interpreter()
print(f"Using Interpreter: {interpreter}")
# venv doesn't reload this, so we have to do it ourselves.
# Is there a bug for this in Python? There should be.
venv = self[name]
cfgfile = os.path.join(venv.env, "pyvenv.cfg")
cfgops = {}
with open(cfgfile) as cfgfile:
for line in cfgfile:
line = line.strip()
if "=" not in line:
continue
k, v = line.split("=", 1)
cfgops[k.strip()] = v.strip()
flags = {
"system_site_packages": cfgops["include-system-site-packages"] == "true",
"symlinks": symlinks,
"with_pip": with_pip,
}
prompt = cfgops.get("prompt")
if prompt:
flags["prompt"] = prompt.lstrip("'\"").rstrip("'\"")
# END things we shouldn't be doing.
# Ok, do what we came here to do.
self._create(venv.env, interpreter, upgrade=True, **flags)
return venv
@staticmethod
def _create(
env_path,
interpreter,
system_site_packages=False,
symlinks=False,
with_pip=True,
upgrade=False,
prompt=None,
):
version_output = sp.check_output(
[interpreter, "--version"], stderr=sp.STDOUT, text=True
)
interpreter_major_version = int(version_output.split()[-1].split(".")[0])
module = "venv" if interpreter_major_version >= 3 else "virtualenv"
system_site_packages = "--system-site-packages" if system_site_packages else ""
symlinks = "--symlinks" if symlinks and interpreter_major_version >= 3 else ""
with_pip = "" if with_pip else "--without-pip"
upgrade = "--upgrade" if upgrade else ""
cmd = [
interpreter,
"-m",
module,
env_path,
system_site_packages,
symlinks,
with_pip,
upgrade,
]
if prompt and module == "venv":
cmd.extend(["--prompt", prompt])
cmd = [arg for arg in cmd if arg] # remove empty args
logging.debug(cmd)
sp.check_call(cmd)
def _check_reserved(self, name):
return (
os.path.basename(name) not in self.sub_dirs
) # FIXME: Check the middle components, too
def __getitem__(self, name) -> "VirtualEnvironment":
"""Get information about a virtual environment.
Parameters
----------
name : str or Ellipsis
Virtual environment name or absolute path. If ... is given, return
the current one (throws a KeyError if there isn't one).
"""
if name is ...:
env = XSH.env
env_paths = [env["VIRTUAL_ENV"]]
elif isinstance(name, os.PathLike):
env_paths = [os.fspath(name)]
else:
if not self._check_reserved(name):
# Don't allow a venv that could be a venv special dir
raise KeyError()
env_paths = []
if os.path.isdir(name):
env_paths += [name]
env_paths += [os.path.join(self.venvdir, name)]
for ep in env_paths:
ve = _mkvenv(ep)
# Actually check if this is an actual venv or just a organizational directory
# eg, if 'spam/eggs' is a venv, reject 'spam'
if not os.path.exists(ve.bin):
continue
return ve
else:
raise KeyError()
def __contains__(self, name):
# For some reason, MutableMapping seems to do this against iter, which is just silly.
try:
self[name]
except KeyError:
return False
else:
return True
def get_binary_path(self, binary: str, *dirs: str):
bin_, _, _ = self.sub_dirs
python_exec = binary
if ON_WINDOWS and not python_exec.endswith(".exe"):
python_exec += ".exe"
return os.path.join(*dirs, bin_, python_exec)
def __iter__(self):
"""List available virtual environments found in $VIRTUALENV_HOME."""
for dirpath, dirnames, _ in os.walk(self.venvdir):
python_exec = self.get_binary_path("python", dirpath)
if os.access(python_exec, os.X_OK):
yield dirpath[len(self.venvdir) + 1 :] # +1 is to remove the separator
dirnames.clear()
def __len__(self):
"""Counts known virtual environments, using the same rules as iter()."""
line = 0
for _ in self:
line += 1
return line
def active(self):
"""Get the name of the active virtual environment.
You can use this as a key to get further information.
Returns None if no environment is active.
"""
env = XSH.env
if "VIRTUAL_ENV" not in env:
return
env_path = env["VIRTUAL_ENV"]
if env_path.startswith(self.venvdir):
name = env_path[len(self.venvdir) :]
if name[0] in "/\\":
name = name[1:]
return name
else:
return env_path
def activate(self, name):
"""
Activate a virtual environment.
Parameters
----------
name : str
Virtual environment name or absolute path.
"""
env = XSH.env
ve = self[name]
if "VIRTUAL_ENV" in env:
self.deactivate()
type(self).oldvars = {"PATH": list(env["PATH"])}
env["PATH"].insert(0, ve.bin)
env["VIRTUAL_ENV"] = ve.env
if "PYTHONHOME" in env:
type(self).oldvars["PYTHONHOME"] = env.pop("PYTHONHOME")
events.vox_on_activate.fire(name=name, path=ve.env)
def deactivate(self):
"""
Deactivate the active virtual environment. Returns its name.
"""
env = XSH.env
if "VIRTUAL_ENV" not in env:
raise NoEnvironmentActive("No environment currently active.")
env_name = self.active()
if hasattr(type(self), "oldvars"):
for k, v in type(self).oldvars.items():
env[k] = v
del type(self).oldvars
del env["VIRTUAL_ENV"]
events.vox_on_deactivate.fire(name=env_name, path=self[env_name].env)
return env_name
def __delitem__(self, name):
"""
Permanently deletes a virtual environment.
Parameters
----------
name : str
Virtual environment name or absolute path.
"""
env_path = self[name].env
try:
if self[...].env == env_path:
raise EnvironmentInUse(
'The "%s" environment is currently active.' % name
)
except KeyError:
# No current venv, ... fails
pass
env_path = os.path.abspath(env_path)
if not self.force_removals:
print(f"The directory {env_path}")
print("and all of its content will be deleted.")
answer = input("Do you want to continue? [Y/n]")
if "n" in answer:
return
shutil.rmtree(env_path)
events.vox_on_delete.fire(name=name)
def _get_vox_default_interpreter():
"""Return the interpreter set by the $VOX_DEFAULT_INTERPRETER if set else sys.executable"""
default = "python3"
if default in XSH.commands_cache:
default = XSH.commands_cache.locate_binary(default)
else:
default = sys.executable
return XSH.env.get("VOX_DEFAULT_INTERPRETER", default)

View file

@ -1,99 +0,0 @@
"""Jump/delete across whole (non-whitespace) words with Ctrl+Left/Right/Delete/Backspace.
Control+left/right: Jump to previous/next whole word
Control+backspace: Delete to beginning of whole word
Control+delete: Delete to end of whole word
Shift+delete: Delete whole word
Alt+Left/Right/Delete/Backspace remain unmodified:
Alt+left/right: Jump to previous/next token
Alt+backspace: Delete to beginning of token
Alt+delete: Delete to end of token
Some terminals cannot differentiate between Backspace and Control+Backspace.
In this case, users can set `$XONSH_WHOLE_WORD_CTRL_BKSP = False` to skip
configuration of the Control+Backspace key binding.
"""
import prompt_toolkit.input.ansi_escape_sequences as ansiseq
from prompt_toolkit.filters import EmacsInsertMode, ViInsertMode
from prompt_toolkit.key_binding.bindings.named_commands import get_by_name
from prompt_toolkit.keys import Keys
from xonsh.built_ins import XSH, XonshSession
from xonsh.platform import ON_WINDOWS
def custom_keybindings(bindings, **kw):
insert_mode = ViInsertMode() | EmacsInsertMode()
# Key bindings for jumping over whole words (everything that's not
# white space) using Ctrl+Left and Ctrl+Right;
# Alt+Left and Alt+Right still jump over smaller word segments.
# See https://github.com/xonsh/xonsh/issues/2403
@bindings.add(Keys.ControlLeft)
def ctrl_left(event):
buff = event.current_buffer
pos = buff.document.find_previous_word_beginning(count=event.arg, WORD=True)
if pos:
buff.cursor_position += pos
@bindings.add(Keys.ControlRight)
def ctrl_right(event):
buff = event.current_buffer
pos = buff.document.find_next_word_ending(count=event.arg, WORD=True)
if pos:
buff.cursor_position += pos
@bindings.add(Keys.ShiftDelete, filter=insert_mode)
def delete_surrounding_big_word(event):
buff = event.current_buffer
startpos, endpos = buff.document.find_boundaries_of_current_word(WORD=True)
startpos = buff.cursor_position + startpos - 1
startpos = 0 if startpos < 0 else startpos
endpos = buff.cursor_position + endpos
endpos = endpos + 1 if startpos == 0 else endpos
buff.text = buff.text[:startpos] + buff.text[endpos:]
buff.cursor_position = startpos
@bindings.add(Keys.ControlDelete, filter=insert_mode)
def delete_big_word(event):
buff = event.current_buffer
pos = buff.document.find_next_word_ending(count=event.arg, WORD=True)
if pos:
buff.delete(count=pos)
@bindings.add(Keys.Escape, Keys.Delete, filter=insert_mode)
def delete_small_word(event):
get_by_name("kill-word").call(event)
# PTK sets both "\x7f" (^?) and "\x08" (^H) to the same behavior. Refs:
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/65c3d0607c69c19d80abb052a18569a2546280e5/src/prompt_toolkit/input/ansi_escape_sequences.py#L65
# https://github.com/prompt-toolkit/python-prompt-toolkit/issues/257#issuecomment-190328366
# We patch the ANSI sequences used by PTK. This requires a terminal
# that sends different codes for <backspace> and <control-h>.
# PTK sets Keys.Backspace = Keys.ControlH, so we hardcode the code.
# Windows has the codes reversed, see https://github.com/xonsh/xonsh/commit/406d20f78f18af39d9bbaf9580b0a763df78a0db
if XSH.env.get("XONSH_WHOLE_WORD_CTRL_BKSP", True):
CONTROL_BKSP = "\x08"
if ON_WINDOWS:
# issue #4845: Windows only (isort competes with black :-()
from prompt_toolkit.input import win32 as ptk_win32 # black:skip
# On windows BKSP is "\x08" and CTRL-BKSP is "\x7f"
CONTROL_BKSP = "\x7f"
ptk_win32.ConsoleInputReader.mappings[b"\x7f"] = CONTROL_BKSP
ansiseq.ANSI_SEQUENCES[CONTROL_BKSP] = CONTROL_BKSP
ansiseq.REVERSE_ANSI_SEQUENCES[CONTROL_BKSP] = CONTROL_BKSP
@bindings.add(CONTROL_BKSP, filter=insert_mode)
def backward_delete_big_word(event):
get_by_name("unix-word-rubout").call(event)
# backward_delete_small_word works on Alt+Backspace by default
def _load_xontrib_(xsh: XonshSession, **_):
xsh.builtins.events.on_ptk_create(custom_keybindings)

View file

@ -1,68 +0,0 @@
"""
This adds `xog` - a simple command to establish and print temporary traceback log file.
"""
import os
import pathlib
import tempfile
from xonsh.built_ins import XSH, XonshSession
def _get_log_file_name():
return pathlib.Path(f"{tempfile.gettempdir()}/xonsh-{os.getpid()}.log")
def _clear_log(logfile, stderr):
try:
os.remove(logfile)
except OSError as e:
print(f"xog: {e}", file=stderr)
return False
return True
def _print_log(logfile, stdout, stderr):
try:
with open(logfile) as log:
for line in log:
print(line, end="", file=stdout)
except OSError as e:
print(f"xog: {e}", file=stderr)
return False
return True
def _print_help(stdout):
print(
"""Usage: xog [OPTIONS]
Prints contents of the shell's traceback log file.
Options:
-c, --clear\t\tclear the log file contents
-?, --help\t\tprint this help text
"""
)
def _xog(args, stdout=None, stderr=None):
if "-?" in args or "--help" in args:
_print_help(stdout)
return 0
logfile = XSH.env.get("XONSH_TRACEBACK_LOGFILE", "")
if not (logfile and os.path.isfile(logfile)):
print("Traceback log file doesn't exist.", file=stderr)
return -1
if "-c" in args or "--clear" in args:
rc = _clear_log(logfile, stderr=stderr)
return 0 if rc else -1
rc = _print_log(logfile, stdout, stderr)
return 0 if rc else -1
def _load_xontrib_(xsh: XonshSession, **_):
xsh.env["XONSH_TRACEBACK_LOGFILE"] = _get_log_file_name()
xsh.aliases["xog"] = _xog