mirror of
https://github.com/xonsh/xonsh.git
synced 2025-03-04 00:14:41 +01:00
Fix vox env dir (#4736)
* refactor: use stdout passed from function better handling of errors * test: no need to skip these on windows * test: load local plugin * test: update vox-runin * test: speedup vox tests we dont need to test venv, only calls made to it via subprocess * test: pip.exe on windows
This commit is contained in:
parent
5ab9812f82
commit
011c5c00e9
5 changed files with 286 additions and 203 deletions
|
@ -0,0 +1,2 @@
|
|||
# when xonsh is not installed in the current env
|
||||
# pytest_plugins = ("xonsh.pytest.plugin",)
|
|
@ -6,7 +6,6 @@ import os
|
|||
import pytest
|
||||
|
||||
from xonsh.aliases import Aliases, ExecAlias
|
||||
from xonsh.pytest.tools import skip_if_on_windows
|
||||
|
||||
|
||||
def cd(args, stdin=None, **kwargs):
|
||||
|
@ -52,7 +51,6 @@ def test_eval_recursive(xession):
|
|||
assert ales.get("color_ls") == ["ls", "- -", "--color=true"]
|
||||
|
||||
|
||||
@skip_if_on_windows
|
||||
def test_eval_recursive_callable_partial(xonsh_execer, xession):
|
||||
ales = make_aliases()
|
||||
xession.env["HOME"] = os.path.expanduser("~")
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
"""Vox tests"""
|
||||
import io
|
||||
import os
|
||||
import pathlib
|
||||
import stat
|
||||
|
@ -8,21 +9,75 @@ import types
|
|||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from py.path import local
|
||||
|
||||
from xonsh.platform import ON_WINDOWS
|
||||
from xonsh.pytest.tools import skip_if_on_conda, skip_if_on_msys
|
||||
from xontrib.voxapi import Vox
|
||||
from xontrib.voxapi import Vox, _get_vox_default_interpreter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pytest_subprocess import FakeProcess
|
||||
|
||||
from xontrib.vox import VoxHandler
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def vox(xession, tmpdir, load_xontrib) -> "VoxHandler":
|
||||
"""vox Alias function"""
|
||||
|
||||
def venv_home(tmpdir, xession):
|
||||
"""Path where VENVs are created"""
|
||||
home = tmpdir / "venvs"
|
||||
home.ensure_dir()
|
||||
# Set up an isolated venv home
|
||||
xession.env["VIRTUALENV_HOME"] = str(tmpdir)
|
||||
xession.env["VIRTUALENV_HOME"] = str(home)
|
||||
return home
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def venv_proc(fake_process: "FakeProcess", venv_home):
|
||||
def version_handle(process):
|
||||
ver = str(sys.version).split()[0]
|
||||
process.stdout.write(f"Python {ver}")
|
||||
|
||||
def venv_handle(process):
|
||||
env_path = local(process.args[3])
|
||||
(env_path / "lib").ensure_dir()
|
||||
bin_path = env_path / ("Scripts" if ON_WINDOWS else "bin")
|
||||
|
||||
(bin_path / "python").write("", ensure=True)
|
||||
(bin_path / "python.exe").write("", ensure=True)
|
||||
for file in bin_path.listdir():
|
||||
st = os.stat(str(file))
|
||||
os.chmod(str(file), st.st_mode | stat.S_IEXEC)
|
||||
|
||||
for pip_name in ["pip", "pip.exe"]:
|
||||
fake_process.register(
|
||||
[str(bin_path / pip_name), "freeze", fake_process.any()], stdout=""
|
||||
)
|
||||
|
||||
# will be used by `vox runin`
|
||||
fake_process.register(
|
||||
[pip_name, "--version"],
|
||||
stdout=f"pip 22.0.4 from {env_path}/lib/python3.10/site-packages/pip (python 3.10)",
|
||||
)
|
||||
fake_process.keep_last_process(True)
|
||||
return env_path
|
||||
|
||||
def get_interpreters():
|
||||
interpreter = _get_vox_default_interpreter()
|
||||
yield interpreter
|
||||
if sys.executable != interpreter:
|
||||
yield sys.executable
|
||||
|
||||
for cmd in get_interpreters():
|
||||
fake_process.register([cmd, "--version"], callback=version_handle)
|
||||
venv = (cmd, "-m", "venv")
|
||||
fake_process.register([*venv, fake_process.any(min=1)], callback=venv_handle)
|
||||
fake_process.keep_last_process(True)
|
||||
return fake_process
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def vox(xession, load_xontrib, venv_proc) -> "VoxHandler":
|
||||
"""vox Alias function"""
|
||||
|
||||
# Set up enough environment for xonsh to function
|
||||
xession.env["PWD"] = os.getcwd()
|
||||
|
@ -31,8 +86,8 @@ def vox(xession, tmpdir, load_xontrib) -> "VoxHandler":
|
|||
xession.env["XONSH_SHOW_TRACEBACK"] = True
|
||||
|
||||
load_xontrib("vox")
|
||||
|
||||
yield xession.aliases["vox"]
|
||||
vox = xession.aliases["vox"]
|
||||
return vox
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -55,46 +110,67 @@ def record_events(xession):
|
|||
yield Listener()
|
||||
|
||||
|
||||
@skip_if_on_msys
|
||||
@skip_if_on_conda
|
||||
def test_vox_flow(xession, vox, record_events, tmpdir):
|
||||
def test_vox_flow(xession, vox, record_events, venv_home):
|
||||
"""
|
||||
Creates a virtual environment, gets it, enumerates it, and then deletes it.
|
||||
"""
|
||||
xession.env["VIRTUALENV_HOME"] = str(tmpdir)
|
||||
|
||||
record_events(
|
||||
"vox_on_create", "vox_on_delete", "vox_on_activate", "vox_on_deactivate"
|
||||
)
|
||||
|
||||
vox(["create", "spam"])
|
||||
assert stat.S_ISDIR(tmpdir.join("spam").stat().mode)
|
||||
assert stat.S_ISDIR(venv_home.join("spam").stat().mode)
|
||||
assert record_events.last == ("vox_on_create", "spam")
|
||||
|
||||
ve = vox.vox["spam"]
|
||||
assert ve.env == str(tmpdir.join("spam"))
|
||||
assert ve.env == str(venv_home.join("spam"))
|
||||
assert os.path.isdir(ve.bin)
|
||||
|
||||
assert "spam" in vox.vox
|
||||
assert "spam" in list(vox.vox)
|
||||
|
||||
# activate/deactivate
|
||||
# activate
|
||||
vox(["activate", "spam"])
|
||||
assert xession.env["VIRTUAL_ENV"] == vox.vox["spam"].env
|
||||
assert record_events.last == ("vox_on_activate", "spam", str(ve.env))
|
||||
vox.deactivate()
|
||||
|
||||
out = io.StringIO()
|
||||
# info
|
||||
vox(["info"], stdout=out)
|
||||
assert "spam" in out.getvalue()
|
||||
out.seek(0)
|
||||
|
||||
# list
|
||||
vox(["list"], stdout=out)
|
||||
print(out.getvalue())
|
||||
assert "spam" in out.getvalue()
|
||||
out.seek(0)
|
||||
|
||||
# wipe
|
||||
vox(["wipe"], stdout=out)
|
||||
print(out.getvalue())
|
||||
assert "Nothing to remove" in out.getvalue()
|
||||
out.seek(0)
|
||||
|
||||
# deactivate
|
||||
vox(["deactivate"])
|
||||
assert "VIRTUAL_ENV" not in xession.env
|
||||
assert record_events.last == ("vox_on_deactivate", "spam", str(ve.env))
|
||||
|
||||
# runin
|
||||
vox(["runin", "spam", "pip", "--version"], stdout=out)
|
||||
print(out.getvalue())
|
||||
assert "spam" in out.getvalue()
|
||||
out.seek(0)
|
||||
|
||||
# removal
|
||||
vox(["rm", "spam", "--force"])
|
||||
assert not tmpdir.join("spam").check()
|
||||
assert not venv_home.join("spam").check()
|
||||
assert record_events.last == ("vox_on_delete", "spam")
|
||||
|
||||
|
||||
@skip_if_on_msys
|
||||
@skip_if_on_conda
|
||||
def test_activate_non_vox_venv(xession, vox, record_events, tmpdir):
|
||||
def test_activate_non_vox_venv(xession, vox, record_events, venv_proc, venv_home):
|
||||
"""
|
||||
Create a virtual environment using Python's built-in venv module
|
||||
(not in VIRTUALENV_HOME) and verify that vox can activate it correctly.
|
||||
|
@ -103,7 +179,7 @@ def test_activate_non_vox_venv(xession, vox, record_events, tmpdir):
|
|||
|
||||
record_events("vox_on_activate", "vox_on_deactivate")
|
||||
|
||||
with tmpdir.as_cwd():
|
||||
with venv_home.as_cwd():
|
||||
venv_dirname = "venv"
|
||||
sp.run([sys.executable, "-m", "venv", venv_dirname])
|
||||
vox(["activate", venv_dirname])
|
||||
|
@ -117,7 +193,7 @@ def test_activate_non_vox_venv(xession, vox, record_events, tmpdir):
|
|||
assert record_events.last == (
|
||||
"vox_on_activate",
|
||||
venv_dirname,
|
||||
str(pathlib.Path(str(tmpdir)) / venv_dirname),
|
||||
str(pathlib.Path(str(venv_home)) / venv_dirname),
|
||||
)
|
||||
|
||||
vox(["deactivate"])
|
||||
|
@ -126,13 +202,13 @@ def test_activate_non_vox_venv(xession, vox, record_events, tmpdir):
|
|||
assert record_events.last == (
|
||||
"vox_on_deactivate",
|
||||
venv_dirname,
|
||||
str(pathlib.Path(str(tmpdir)) / venv_dirname),
|
||||
str(pathlib.Path(str(venv_home)) / venv_dirname),
|
||||
)
|
||||
|
||||
|
||||
@skip_if_on_msys
|
||||
@skip_if_on_conda
|
||||
def test_path(xession, vox, tmpdir, a_venv):
|
||||
def test_path(xession, vox, a_venv):
|
||||
"""
|
||||
Test to make sure Vox properly activates and deactivates by examining $PATH
|
||||
"""
|
||||
|
@ -146,20 +222,17 @@ def test_path(xession, vox, tmpdir, a_venv):
|
|||
assert oldpath == xession.env["PATH"]
|
||||
|
||||
|
||||
@skip_if_on_msys
|
||||
@skip_if_on_conda
|
||||
def test_crud_subdir(xession, tmpdir):
|
||||
def test_crud_subdir(xession, venv_home, venv_proc):
|
||||
"""
|
||||
Creates a virtual environment, gets it, enumerates it, and then deletes it.
|
||||
"""
|
||||
xession.env["VIRTUALENV_HOME"] = str(tmpdir)
|
||||
|
||||
vox = Vox(force_removals=True)
|
||||
vox.create("spam/eggs")
|
||||
assert stat.S_ISDIR(tmpdir.join("spam", "eggs").stat().mode)
|
||||
assert stat.S_ISDIR(venv_home.join("spam", "eggs").stat().mode)
|
||||
|
||||
ve = vox["spam/eggs"]
|
||||
assert ve.env == str(tmpdir.join("spam", "eggs"))
|
||||
assert ve.env == str(venv_home.join("spam", "eggs"))
|
||||
assert os.path.isdir(ve.bin)
|
||||
|
||||
assert "spam/eggs" in vox
|
||||
|
@ -170,16 +243,14 @@ def test_crud_subdir(xession, tmpdir):
|
|||
|
||||
del vox["spam/eggs"]
|
||||
|
||||
assert not tmpdir.join("spam", "eggs").check()
|
||||
assert not venv_home.join("spam", "eggs").check()
|
||||
|
||||
|
||||
@skip_if_on_msys
|
||||
@skip_if_on_conda
|
||||
def test_crud_path(xession, tmpdir):
|
||||
def test_crud_path(xession, tmpdir, venv_proc):
|
||||
"""
|
||||
Creates a virtual environment, gets it, enumerates it, and then deletes it.
|
||||
"""
|
||||
tmp = pathlib.Path(str(tmpdir))
|
||||
tmp = str(tmpdir)
|
||||
|
||||
vox = Vox(force_removals=True)
|
||||
vox.create(tmp)
|
||||
|
@ -217,9 +288,7 @@ def test_reserved_names(xession, tmpdir):
|
|||
|
||||
|
||||
@pytest.mark.parametrize("registered", [False, True])
|
||||
@skip_if_on_msys
|
||||
@skip_if_on_conda
|
||||
def test_autovox(xession, tmpdir, vox, a_venv, load_xontrib, registered):
|
||||
def test_autovox(xession, vox, a_venv, load_xontrib, registered):
|
||||
"""
|
||||
Tests that autovox works
|
||||
"""
|
||||
|
@ -248,39 +317,33 @@ def test_autovox(xession, tmpdir, vox, a_venv, load_xontrib, registered):
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def venv_home(tmpdir):
|
||||
"""Path where VENVs are created"""
|
||||
return tmpdir
|
||||
def create_venv(venv_proc):
|
||||
vox = Vox(force_removals=True)
|
||||
|
||||
def wrapped(name):
|
||||
vox.create(name)
|
||||
return local(vox[name].env)
|
||||
|
||||
return wrapped
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def venvs(venv_home):
|
||||
def venvs(venv_home, create_venv):
|
||||
"""Create virtualenv with names venv0, venv1"""
|
||||
from xonsh.dirstack import popd, pushd
|
||||
|
||||
pushd([str(venv_home)])
|
||||
paths = []
|
||||
for idx in range(2):
|
||||
env_path = venv_home / f"venv{idx}"
|
||||
bin_path = env_path / ("Scripts" if ON_WINDOWS else "bin")
|
||||
paths.append(env_path)
|
||||
|
||||
(bin_path / "python").write("", ensure=True)
|
||||
(bin_path / "python.exe").write("", ensure=True)
|
||||
for file in bin_path.listdir():
|
||||
st = os.stat(str(file))
|
||||
os.chmod(str(file), st.st_mode | stat.S_IEXEC)
|
||||
yield paths
|
||||
yield [create_venv(f"venv{idx}") for idx in range(2)]
|
||||
popd([])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def a_venv(venvs):
|
||||
return venvs[0]
|
||||
def a_venv(create_venv):
|
||||
return create_venv("venv0")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_cmd_cache(xession, vox, venvs, monkeypatch):
|
||||
def patched_cmd_cache(xession, vox, monkeypatch):
|
||||
cc = xession.commands_cache
|
||||
|
||||
def no_change(self, *_):
|
||||
|
@ -289,7 +352,7 @@ def patched_cmd_cache(xession, vox, venvs, monkeypatch):
|
|||
monkeypatch.setattr(cc, "_check_changes", types.MethodType(no_change, cc))
|
||||
monkeypatch.setattr(cc, "_update_cmds_cache", types.MethodType(no_change, cc))
|
||||
bins = {path: (path, False) for path in _PY_BINS}
|
||||
cc._cmds_cache.update(bins)
|
||||
cc._cmds_cache = bins
|
||||
yield cc
|
||||
|
||||
|
||||
|
@ -308,7 +371,6 @@ _VOX_NEW_OPTS = {
|
|||
"--system-site-packages",
|
||||
"--without-pip",
|
||||
}.union(_HELP_OPTS)
|
||||
_VOX_NEW_EXP = _PY_BINS.union(_VOX_NEW_OPTS)
|
||||
|
||||
if ON_WINDOWS:
|
||||
_VOX_NEW_OPTS.add("--symlinks")
|
||||
|
@ -318,7 +380,21 @@ else:
|
|||
_VOX_RM_OPTS = {"-f", "--force"}.union(_HELP_OPTS)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
class TestVoxCompletions:
|
||||
@pytest.fixture
|
||||
def check(self, check_completer, xession, vox):
|
||||
def wrapped(cmd, positionals, options=None):
|
||||
for k in list(xession.completers):
|
||||
if k != "alias":
|
||||
xession.completers.pop(k)
|
||||
assert check_completer(cmd) == positionals
|
||||
xession.env["ALIAS_COMPLETIONS_OPTIONS_BY_DEFAULT"] = True
|
||||
if options:
|
||||
assert check_completer(cmd) == positionals.union(options)
|
||||
|
||||
return wrapped
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"args, positionals, opts",
|
||||
[
|
||||
(
|
||||
|
@ -376,19 +452,20 @@ _VOX_RM_OPTS = {"-f", "--force"}.union(_HELP_OPTS)
|
|||
("vox rm", _VENV_NAMES, _VOX_RM_OPTS),
|
||||
("vox rm venv1", _VENV_NAMES, _VOX_RM_OPTS), # pos nargs: one or more
|
||||
("vox rm venv1 venv2", _VENV_NAMES, _VOX_RM_OPTS), # pos nargs: two or more
|
||||
("vox new --activate --interpreter", _PY_BINS, set()), # option after option
|
||||
("vox new --interpreter", _PY_BINS, set()), # "option: first
|
||||
("vox new --activate env1 --interpreter", _PY_BINS, set()), # option after pos
|
||||
("vox new env1 --interpreter", _PY_BINS, set()), # "option: at end"
|
||||
("vox new env1 --interpreter=", _PY_BINS, set()), # "option: at end with
|
||||
],
|
||||
)
|
||||
def test_vox_completer(
|
||||
args, check_completer, positionals, opts, xession, patched_cmd_cache, venv_home
|
||||
):
|
||||
xession.env["XONSH_DATA_DIR"] = venv_home
|
||||
if positionals:
|
||||
assert check_completer(args) == positionals
|
||||
xession.env["ALIAS_COMPLETIONS_OPTIONS_BY_DEFAULT"] = True
|
||||
if opts:
|
||||
assert check_completer(args) == positionals.union(opts)
|
||||
)
|
||||
def test_vox_commands(self, args, positionals, opts, check, venvs):
|
||||
check(args, positionals, opts)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"args",
|
||||
[
|
||||
"vox new --activate --interpreter", # option after option
|
||||
"vox new --interpreter", # "option: first
|
||||
"vox new --activate env1 --interpreter", # option after pos
|
||||
"vox new env1 --interpreter", # "option: at end"
|
||||
"vox new env1 --interpreter=", # "option: at end with
|
||||
],
|
||||
)
|
||||
def test_interpreter(self, check, args, patched_cmd_cache):
|
||||
check(args, _PY_BINS)
|
||||
|
|
|
@ -582,6 +582,14 @@ class ArgParserAlias:
|
|||
For usage please check ``xonsh.completers.completer.py`` module.
|
||||
"""
|
||||
|
||||
class Error(Exception):
|
||||
"""Special case, when raised, the traceback will not be shown.
|
||||
Instead the process with exit with error code and message"""
|
||||
|
||||
def __init__(self, message: str, errno=1):
|
||||
super().__init__(message)
|
||||
self.errno = errno
|
||||
|
||||
def __init__(self, threadable=True, **kwargs) -> None:
|
||||
if not threadable:
|
||||
from xonsh.tools import unthreadable
|
||||
|
@ -646,11 +654,11 @@ class ArgParserAlias:
|
|||
|
||||
def err(self, *args, **kwargs):
|
||||
"""Write text to error stream"""
|
||||
return self.write_to("stderr", *args, **kwargs)
|
||||
self.write_to("stderr", *args, **kwargs)
|
||||
|
||||
def out(self, *args, **kwargs):
|
||||
"""Write text to output stream"""
|
||||
return self.write_to("stdout", *args, **kwargs)
|
||||
self.write_to("stdout", *args, **kwargs)
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
|
@ -664,6 +672,7 @@ class ArgParserAlias:
|
|||
):
|
||||
self.stdout = stdout
|
||||
self.stderr = stderr
|
||||
try:
|
||||
result = dispatch(
|
||||
self.parser,
|
||||
args,
|
||||
|
@ -676,7 +685,10 @@ class ArgParserAlias:
|
|||
_stack=stack,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
except self.Error as ex:
|
||||
self.err(f"Error: {ex}")
|
||||
sys.exit(getattr(ex, "errno", 1))
|
||||
finally:
|
||||
# free the reference to input/output. Otherwise it will result in errors
|
||||
self.stdout = None
|
||||
self.stderr = None
|
||||
|
|
100
xontrib/vox.py
100
xontrib/vox.py
|
@ -1,7 +1,6 @@
|
|||
"""Python virtual environment manager for xonsh."""
|
||||
import os.path
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import typing as tp
|
||||
from pathlib import Path
|
||||
|
@ -11,6 +10,7 @@ import xontrib.voxapi as voxapi
|
|||
from xonsh.built_ins import XSH
|
||||
from xonsh.dirstack import pushd_fn
|
||||
from xonsh.platform import ON_WINDOWS
|
||||
from xonsh.tools import XonshError
|
||||
|
||||
__all__ = ()
|
||||
|
||||
|
@ -119,7 +119,7 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
Provides an alternative prompt prefix for this environment.
|
||||
"""
|
||||
|
||||
print("Creating environment...")
|
||||
self.out("Creating environment...")
|
||||
|
||||
if temporary:
|
||||
path = tempfile.mkdtemp(prefix=f"vox-env-{name}")
|
||||
|
@ -150,9 +150,9 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
|
||||
if activate:
|
||||
self.activate(name)
|
||||
print(f"Environment {name!r} created and activated.\n")
|
||||
self.out(f"Environment {name!r} created and activated.\n")
|
||||
else:
|
||||
print(
|
||||
self.out(
|
||||
f'Environment {name!r} created. Activate it with "vox activate {name}".\n'
|
||||
)
|
||||
|
||||
|
@ -179,12 +179,11 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
try:
|
||||
self.vox.activate(name)
|
||||
except KeyError:
|
||||
self.parser.error(
|
||||
f'This environment doesn\'t exist. Create it with "vox new {name}".\n',
|
||||
raise self.Error(
|
||||
f'This environment doesn\'t exist. Create it with "vox new {name}"',
|
||||
)
|
||||
return None
|
||||
|
||||
print(f'Activated "{name}".\n')
|
||||
self.out(f'Activated "{name}".\n')
|
||||
if not no_cd:
|
||||
project_dir = self._get_project_dir(name)
|
||||
if project_dir:
|
||||
|
@ -202,16 +201,16 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
"""
|
||||
|
||||
if self.vox.active() is None:
|
||||
self.parser.error(
|
||||
raise self.Error(
|
||||
'No environment currently active. Activate one with "vox activate".\n',
|
||||
)
|
||||
env_name = self.vox.deactivate()
|
||||
if remove:
|
||||
self.vox.force_removals = force
|
||||
del self.vox[env_name]
|
||||
print(f'Environment "{env_name}" deactivated and removed.\n')
|
||||
self.out(f'Environment "{env_name}" deactivated and removed.\n')
|
||||
else:
|
||||
print(f'Environment "{env_name}" deactivated.\n')
|
||||
self.out(f'Environment "{env_name}" deactivated.\n')
|
||||
|
||||
def list(self):
|
||||
"""List available virtual environments."""
|
||||
|
@ -219,16 +218,15 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
try:
|
||||
envs = sorted(self.vox.keys())
|
||||
except PermissionError:
|
||||
self.parser.error("No permissions on VIRTUALENV_HOME")
|
||||
return None
|
||||
raise self.Error("No permissions on VIRTUALENV_HOME")
|
||||
|
||||
if not envs:
|
||||
self.parser.error(
|
||||
raise self.Error(
|
||||
'No environments available. Create one with "vox new".\n',
|
||||
)
|
||||
|
||||
print("Available environments:")
|
||||
print("\n".join(envs))
|
||||
self.out("Available environments:")
|
||||
self.out("\n".join(envs))
|
||||
|
||||
def remove(
|
||||
self,
|
||||
|
@ -253,28 +251,24 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
try:
|
||||
del self.vox[name]
|
||||
except voxapi.EnvironmentInUse:
|
||||
self.parser.error(
|
||||
raise self.Error(
|
||||
f'The "{name}" environment is currently active. '
|
||||
'In order to remove it, deactivate it first with "vox deactivate".\n',
|
||||
)
|
||||
except KeyError:
|
||||
self.parser.error(f'"{name}" environment doesn\'t exist.\n')
|
||||
raise self.Error(f'"{name}" environment doesn\'t exist.\n')
|
||||
else:
|
||||
print(f'Environment "{name}" removed.')
|
||||
print()
|
||||
self.out(f'Environment "{name}" removed.')
|
||||
self.out()
|
||||
|
||||
def _in_venv(self, env_dir: str, command: str, *args, **kwargs):
|
||||
env = XSH.env.detype()
|
||||
env["VIRTUAL_ENV"] = env_dir
|
||||
env["PATH"] = os.pathsep.join(
|
||||
[
|
||||
os.path.join(env_dir, self.vox.sub_dirs[0]),
|
||||
env["PATH"],
|
||||
]
|
||||
)
|
||||
env = {**XSH.env.detype(), "VIRTUAL_ENV": env_dir}
|
||||
|
||||
bin_path = os.path.join(env_dir, self.vox.sub_dirs[0])
|
||||
env["PATH"] = os.pathsep.join([bin_path, env["PATH"]])
|
||||
|
||||
for key in ("PYTHONHOME", "__PYVENV_LAUNCHER__"):
|
||||
if key in env:
|
||||
del env[key]
|
||||
env.pop(key, None)
|
||||
|
||||
try:
|
||||
return subprocess.check_call(
|
||||
|
@ -284,7 +278,7 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
# won't inherit the PATH
|
||||
except OSError as e:
|
||||
if e.errno == 2:
|
||||
self.parser.error(f"Unable to find {command}")
|
||||
raise self.Error(f"Unable to find {command}")
|
||||
raise
|
||||
|
||||
def runin(
|
||||
|
@ -310,7 +304,7 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
"""
|
||||
env_dir = self._get_env_dir(venv)
|
||||
if not args:
|
||||
self.parser.error("No command is passed")
|
||||
raise self.Error("No command is passed")
|
||||
self._in_venv(env_dir, *args)
|
||||
|
||||
def runin_all(
|
||||
|
@ -326,19 +320,18 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
"""
|
||||
errors = False
|
||||
for env in self.vox:
|
||||
print("\n%s:" % env)
|
||||
self.out("\n%s:" % env)
|
||||
try:
|
||||
self.runin(env, *args)
|
||||
except subprocess.CalledProcessError as e:
|
||||
errors = True
|
||||
print(e, file=sys.stderr)
|
||||
sys.exit(errors)
|
||||
self.err(e)
|
||||
self.parser.exit(errors)
|
||||
|
||||
def _sitepackages_dir(self, venv_path: str):
|
||||
env_python = self.vox.get_binary_path("python", venv_path)
|
||||
if not os.path.exists(env_python):
|
||||
self.parser.error("ERROR: no virtualenv active")
|
||||
return
|
||||
raise self.Error("no virtualenv active")
|
||||
|
||||
return Path(
|
||||
subprocess.check_output(
|
||||
|
@ -352,17 +345,18 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
)
|
||||
|
||||
def _get_env_dir(self, venv=None):
|
||||
venv = venv or "..."
|
||||
venv = venv or ...
|
||||
try:
|
||||
env_dir = self.vox[venv].env
|
||||
except KeyError:
|
||||
# check whether the venv is a valid path to an environment
|
||||
if os.path.exists(venv) and os.path.exists(
|
||||
self.vox.get_binary_path("python", venv)
|
||||
if (
|
||||
isinstance(venv, str)
|
||||
and os.path.exists(venv)
|
||||
and os.path.exists(self.vox.get_binary_path("python", venv))
|
||||
):
|
||||
return venv
|
||||
self.parser.error("No virtualenv is found")
|
||||
return
|
||||
raise XonshError("No virtualenv is found")
|
||||
return env_dir
|
||||
|
||||
def toggle_ssp(self):
|
||||
|
@ -374,10 +368,10 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
ngsp_file = site.parent / "no-global-site-packages.txt"
|
||||
if ngsp_file.exists():
|
||||
ngsp_file.unlink()
|
||||
print("Enabled global site-packages")
|
||||
self.out("Enabled global site-packages")
|
||||
else:
|
||||
with ngsp_file.open("w"):
|
||||
print("Disabled global site-packages")
|
||||
self.out("Disabled global site-packages")
|
||||
|
||||
def project_set(
|
||||
self,
|
||||
|
@ -397,9 +391,9 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
|
||||
project = os.path.abspath(project_path or ".")
|
||||
if not os.path.exists(env_dir):
|
||||
self.parser.error(f"Environment '{env_dir}' doesn't exist.")
|
||||
raise self.Error(f"Environment '{env_dir}' doesn't exist.")
|
||||
if not os.path.isdir(project):
|
||||
self.parser.error(f"{project} does not exist")
|
||||
raise self.Error(f"{project} does not exist")
|
||||
|
||||
project_file = self._get_project_file()
|
||||
project_file.write_text(project)
|
||||
|
@ -428,10 +422,10 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
"""
|
||||
project_dir = self._get_project_dir(venv)
|
||||
if project_dir:
|
||||
print(project_dir)
|
||||
self.out(project_dir)
|
||||
else:
|
||||
project_file = self._get_project_file(venv)
|
||||
self.parser.error(
|
||||
raise self.Error(
|
||||
f"Corrupted or outdated: {project_file}\nDirectory: {project_dir} doesn't exist."
|
||||
)
|
||||
|
||||
|
@ -454,11 +448,11 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
ignored = sorted(all_pkgs - pkgs)
|
||||
to_remove = {p.split("==")[0] for p in pkgs}
|
||||
if to_remove:
|
||||
print("Ignoring:\n %s" % "\n ".join(ignored))
|
||||
print("Uninstalling packages:\n %s" % "\n ".join(to_remove))
|
||||
self.out("Ignoring:\n %s" % "\n ".join(ignored))
|
||||
self.out("Uninstalling packages:\n %s" % "\n ".join(to_remove))
|
||||
return subprocess.run([pip_bin, "uninstall", "-y", *to_remove])
|
||||
else:
|
||||
print("Nothing to remove")
|
||||
self.out("Nothing to remove")
|
||||
|
||||
def info(self, venv: _venv_option = None):
|
||||
"""Prints the path for the supplied env
|
||||
|
@ -468,7 +462,7 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
venv
|
||||
name of the venv
|
||||
"""
|
||||
print(self.vox[venv or ...])
|
||||
self.out(self.vox[venv or ...])
|
||||
|
||||
def upgrade(
|
||||
self,
|
||||
|
@ -498,7 +492,7 @@ class VoxHandler(xcli.ArgParserAlias):
|
|||
venv = self.vox.upgrade(
|
||||
name or ..., symlinks=symlinks, with_pip=with_pip, interpreter=interpreter
|
||||
)
|
||||
print(venv)
|
||||
self.out(venv)
|
||||
|
||||
|
||||
XSH.aliases["vox"] = VoxHandler()
|
||||
|
|
Loading…
Add table
Reference in a new issue