mirror of
https://github.com/xonsh/xonsh.git
synced 2025-03-04 08:24:40 +01:00

* todo * test: remove usage of DummyEnv and setting .env attribute on xession fixture one step closer to making too much of tweaking to xession during tests * test: fix tests vox and gitstatus-prompt * docs: update test-fixture usage * fix: import flake8 error * test: remove direct access to XSH in tests * test: remove usage of XSH in test files * todo * test: use tmp-dir to create stubs * refactor: use fixture factory to mock out XonshSession * refactor: remove direct access of XSH from functions * refactor: remove direct access of XSH from functions * fix: qa checks * refactor: rename variables to match their values * test: update failing tests because it had no PATH set previously * fix: remove builtins usage from pyghooks.py * style: * refactor: update tests to use fixtures * fix: env variable is set up per function some tests accidentally update the env variables and that is leaking into next tests * fix: failing vox tests * test: set commands_cache per test * test: fix failing tests * fix: failing tests on linux ptk-highlight * fix: failing tests on Windows cwd-prompt * test: copy env as to not affect original object * fix: lazily evaluate cmds-cache in pyghooks * test: fix failing tests * fix: qa errors import * test: set commands-cache per test while caching path results * test: speedup test_thread_local_swap * fix: failing tests on windows * refactor: Execer doesn't control session * refactor: XSH.unload will take care of reversing builtins attrs set * test: use env.update over monkeypatch * Revert "test: use env.update over monkeypatch" This reverts commit 010a5022247a098f1741966b8af1bf758663480e.
169 lines
5.4 KiB
Python
169 lines
5.4 KiB
Python
"""Tests the xonsh.procs.specs"""
|
|
import itertools
|
|
import sys
|
|
from subprocess import Popen
|
|
|
|
import pytest
|
|
|
|
from xonsh.procs.specs import cmds_to_specs, run_subproc
|
|
from xonsh.procs.posix import PopenThread
|
|
from xonsh.procs.proxies import ProcProxy, ProcProxyThread, STDOUT_DISPATCHER
|
|
|
|
from tests.tools import skip_if_on_windows
|
|
|
|
|
|
@skip_if_on_windows
def test_cmds_to_specs_thread_subproc(xession):
    """Check which process class ``cmds_to_specs`` picks for the first spec.

    ``XONSH_CAPTURE_ALWAYS`` and ``THREAD_SUBPROCS`` are toggled on the
    session environment, and ``specs[0].cls`` is inspected first for an
    external command (``pwd``) and then for a callable alias. Every call uses
    ``captured="hiddenobject"``.
    """
    env = xession.env

    def first_spec_cls(commands):
        # Class selected for the first spec under the current env settings.
        return cmds_to_specs(commands, captured="hiddenobject")[0].cls

    external = [["pwd"]]

    # XONSH_CAPTURE_ALWAYS=False should disable interactive threaded subprocs
    env["XONSH_CAPTURE_ALWAYS"] = False
    env["THREAD_SUBPROCS"] = True
    assert first_spec_cls(external) is Popen

    # Now for the other situations
    env["XONSH_CAPTURE_ALWAYS"] = True

    # Threadable subprocs become threaded when threading is on, and fall back
    # to plain Popen once it is turned off.
    for threaded, expected_cls in ((True, PopenThread), (False, Popen)):
        env["THREAD_SUBPROCS"] = threaded
        assert first_spec_cls(external) is expected_cls

    # Now check the threadability of callable aliases: ProcProxyThread with
    # threading on, ProcProxy with it off.
    callable_alias = [[lambda: "Keras Selyrian"]]
    for threaded, expected_cls in ((True, ProcProxyThread), (False, ProcProxy)):
        env["THREAD_SUBPROCS"] = threaded
        assert first_spec_cls(callable_alias) is expected_cls
|
|
|
|
|
|
@pytest.mark.parametrize("thread_subprocs", [True, False])
def test_cmds_to_specs_capture_stdout_not_stderr(thread_subprocs, xonsh_session):
    """With ``captured="stdout"`` the first spec gets a stdout but no stderr.

    Runs once with ``THREAD_SUBPROCS`` enabled and once with it disabled.
    """
    xonsh_session.env["THREAD_SUBPROCS"] = thread_subprocs

    first_spec = cmds_to_specs((["ls", "/root"],), captured="stdout")[0]

    assert first_spec.stdout is not None
    assert first_spec.stderr is None
|
|
|
|
|
|
@skip_if_on_windows
@pytest.mark.parametrize("pipe", (True, False))
@pytest.mark.parametrize("alias_type", (None, "func", "exec", "simple"))
@pytest.mark.parametrize(
    "thread_subprocs, capture_always", list(itertools.product((True, False), repeat=2))
)
@pytest.mark.flaky(reruns=5, reruns_delay=2)
def test_capture_always(
    capfd, thread_subprocs, capture_always, alias_type, pipe, monkeypatch, xonsh_session
):
    """Check what ``run_subproc`` prints and captures in each capture mode.

    The same command (optionally piped through ``grep`` and/or wrapped in a
    ``tst`` alias) is run with ``captured`` set to ``"hiddenobject"``
    (``![]``), ``"object"`` (``!()``), ``"stdout"`` (``$()``) and ``False``
    (``$[]``), for every combination of ``THREAD_SUBPROCS`` and
    ``XONSH_CAPTURE_ALWAYS``. After each run the test asserts whether the
    expected text reached the real stdout (via ``capfd``) and whether it was
    captured on the returned object.
    """
    if not thread_subprocs and alias_type in ["func", "exec"]:
        # Known failures tracked upstream. ``pytest.skip`` raises, so nothing
        # after this call runs and no ``return`` is needed.
        issue_url = (
            "https://github.com/xonsh/xonsh/issues/4443"
            if pipe
            else "https://github.com/xonsh/xonsh/issues/4444"
        )
        pytest.skip(issue_url)

    env = xonsh_session.env
    exp = "HELLO\nBYE\n"
    cmds = [["echo", "-n", exp]]
    if pipe:
        exp = exp.splitlines()[1] + "\n"  # second line
        cmds += ["|", ["grep", "--color=never", exp.strip()]]

    if alias_type:
        first_cmd = cmds[0]
        # Enable capfd for function aliases:
        monkeypatch.setattr(STDOUT_DISPATCHER, "default", sys.stdout)
        if alias_type == "func":
            xonsh_session.aliases["tst"] = (
                lambda: run_subproc([first_cmd], "hiddenobject") and None
            )  # Don't return a value
        elif alias_type == "exec":
            first_cmd = " ".join(repr(arg) for arg in first_cmd)
            xonsh_session.aliases["tst"] = f"![{first_cmd}]"
        else:
            # alias_type == "simple"
            xonsh_session.aliases["tst"] = first_cmd

        # Run the alias in place of the original first command.
        cmds[0] = ["tst"]

    env["THREAD_SUBPROCS"] = thread_subprocs
    env["XONSH_CAPTURE_ALWAYS"] = capture_always

    # NOTE: each ``capfd.readouterr()`` call below consumes the output
    # produced so far, so the order of runs and reads must not change.
    hidden = run_subproc(cmds, "hiddenobject")  # ![]
    # Check that interactive subprocs are always printed
    assert exp in capfd.readouterr().out

    if capture_always and thread_subprocs:
        # Check that the interactive output was captured
        assert hidden.out == exp
    else:
        # without THREAD_SUBPROCS capturing in ![] isn't possible
        assert not hidden.out

    # Explicitly captured commands are always captured
    hidden = run_subproc(cmds, "object")  # !()
    hidden.end()
    if thread_subprocs:
        assert exp not in capfd.readouterr().out
        assert hidden.out == exp
    else:
        # for some reason THREAD_SUBPROCS=False fails to capture in `!()` but still succeeds in `$()`
        assert exp in capfd.readouterr().out
        assert not hidden.out

    output = run_subproc(cmds, "stdout")  # $()
    assert exp not in capfd.readouterr().out
    assert output == exp

    # Explicitly non-captured commands are never captured (/always printed)
    run_subproc(cmds, captured=False)  # $[]
    assert exp in capfd.readouterr().out
|
|
|
|
|
|
@skip_if_on_windows
@pytest.mark.parametrize(
    "captured, exp_is_none",
    [
        ("object", False),
        ("stdout", True),
        ("hiddenobject", True),
        (False, True),
    ],
)
def test_run_subproc_background(captured, exp_is_none):
    """Check the return value of ``run_subproc`` for a backgrounded command.

    The command list ends with ``"&"``. The parametrization expects ``None``
    for every ``captured`` value except ``"object"``.
    """
    background_cmds = (["echo", "hello"], "&")
    result = run_subproc(background_cmds, captured)
    assert (result is None) == exp_is_none
|
|
|
|
|
|
@pytest.mark.parametrize("thread_subprocs", [False, True])
def test_callable_alias_cls(thread_subprocs, xession):
    """An instance of a class defining ``__call__`` works as an alias.

    The instance is registered as the ``tst`` alias. The process object
    returned by running the first spec must expose it as ``proc.f``, with
    ``THREAD_SUBPROCS`` both off and on.
    """

    class Cls:
        def __call__(self, *args, **kwargs):
            print(args, kwargs)

    alias_obj = Cls()
    xession.aliases["tst"] = alias_obj
    xession.env["THREAD_SUBPROCS"] = thread_subprocs

    first_spec = cmds_to_specs((["tst", "/root"],), captured="stdout")[0]
    running = first_spec.run()

    assert running.f == alias_obj
|