Merge pull request #1758 from xonsh/rs

Run Subproc Refactor
This commit is contained in:
Gil Forsyth 2016-10-05 13:26:11 -04:00 committed by GitHub
commit 30e0b911a2
28 changed files with 2481 additions and 1061 deletions

1
.gitignore vendored

@ -46,3 +46,4 @@ include/
.coverage .coverage
feedstock/ feedstock/
*.cred *.cred
tests/tttt


@ -1053,14 +1053,28 @@ Callable Aliases
---------------- ----------------
Lastly, if an alias value is a function (or other callable), then this Lastly, if an alias value is a function (or other callable), then this
function is called *instead* of going to a subprocess command. Such functions function is called *instead* of going to a subprocess command. Such functions
must have one of the following two signatures may have one of the following signatures:
.. code-block:: python .. code-block:: python
def _mycmd(args, stdin=None): def mycmd0():
"""args will be a list of strings representing the arguments to this """This form takes no arguments but may return output or a return code.
command. stdin will be a string, if present. This is used to pipe """
the output of the previous command into this one. return "some output."
def mycmd1(args):
"""This form takes a single argument, args. This is a list of strings
representing the arguments to this command. Feel free to parse them
however you wish!
"""
# perform some action.
return 0
def mycmd2(args, stdin=None):
"""This form takes two arguments. The args list like above, as a well
as standard input. stdin will be a file like object that the command
can read from, if the user piped input to this command. If no input
was provided this will be None.
""" """
# do whatever you want! Anything you print to stdout or stderr # do whatever you want! Anything you print to stdout or stderr
# will be captured for you automatically. This allows callable # will be captured for you automatically. This allows callable
@ -1092,13 +1106,21 @@ must have one of the following two signatures
# examples the return code would be 0/success. # examples the return code would be 0/success.
return (None, "I failed", 2) return (None, "I failed", 2)
def mycmd3(args, stdin=None, stdout=None):
"""This form has three parameters. The first two are the same as above.
The last argument represents the standard output. This is a file-like
object that the command may write to.
"""
# you can either use stdout
stdout.write("Hello, ")
# or print()!
print("Mom!")
return
.. code-block:: python def mycmd4(args, stdin=None, stdout=None, stderr=None):
"""Lastly, the full form of subprocess callables takes all of the
def _mycmd2(args, stdin, stdout, stderr): arguments shown above as well as the standard error stream.
"""args will be a list of strings representing the arguments to this As with stdout, this is a write-only file-like object.
command. stdin is a read-only file-like object, and stdout and stderr
are write-only file-like objects
""" """
# This form allows "streaming" data to stdout and stderr # This form allows "streaming" data to stdout and stderr
import time import time
@ -1132,7 +1154,7 @@ with keyword arguments:
.. code-block:: xonshcon .. code-block:: xonshcon
>>> aliases['banana'] = lambda args,stdin=None: "Banana for scale.\n" >>> aliases['banana'] = lambda: "Banana for scale.\n"
>>> banana >>> banana
Banana for scale. Banana for scale.
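For illustration, here is a minimal sketch that registers a callable alias
using the fullest of the signatures documented above (the ``greet`` name and
its behavior are hypothetical, not part of this change):

.. code-block:: python

    def _greet(args, stdin=None, stdout=None, stderr=None):
        """Greets each argument on stdout; complains on stderr."""
        if not args:
            stderr.write("greet: expected at least one name\n")
            return 1
        for name in args:
            stdout.write("Hello, {}!\n".format(name))
        return 0

    aliases['greet'] = _greet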

50
news/rs.rst Normal file

@ -0,0 +1,50 @@
**Added:**
* New subprocess specification class ``SubprocSpec`` is used for specifying
and manipulating subprocess classes prior to execution.
* New ``PopenThread`` class runs subprocesses on a separate thread.
* New ``CommandPipeline`` and ``HiddenCommandPipeline`` classes manage the
execution of a pipeline of commands via the execution of the last command
in the pipeline. Instances may be iterated over to stream lines from the
stdout buffer.
* ``$XONSH_STORE_STDOUT`` is now available on all platforms!
* The ``CommandsCache`` now has the ability to predict whether a command
must be run in the foreground using ``Popen`` or may be run on a
background thread using ``PopenThread``.
* Callable aliases may now use the full gamut of function signatures:
``f()``, ``f(args)``, ``f(args, stdin=None)``,
``f(args, stdin=None, stdout=None)``, and
``f(args, stdin=None, stdout=None, stderr=None)``.
* Uncaptured subprocesses now receive a PTY file handle for stdout and
stderr.
* New ``$XONSH_PROC_FREQUENCY`` environment variable that specifies how long
loops in the subprocess framework should sleep. This may be adjusted from
its default value to improve performance and mitigate "leaky" pipes on
slower machines.
**Changed:**
* The ``run_subproc()`` function has been replaced with a new implementation.
* Piping between processes now uses OS pipes.
* ``$XONSH_STORE_STDIN`` now uses ``os.pread()`` rather than ``tee`` and a new
file.
**Deprecated:** None
**Removed:**
* ``CompletedCommand`` and ``HiddenCompletedCommand`` classes have been removed
in favor of ``CommandPipeline`` and ``HiddenCommandPipeline``.
* ``SimpleProcProxy`` and ``SimpleForegroundProcProxy`` have been removed
in favor of a more general mechanism for dispatching callable aliases
implemented in the ``ProcProxy`` class.
**Fixed:**
* May now Ctrl-C out of an infinite loop with a subprocess, such as
``while True: sleep 1``.
* Fix for stdin redirects.
* Backgrounding works with ``$XONSH_STORE_STDOUT``.
**Security:** None
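As a simplified, standalone illustration of the OS-pipe plumbing mentioned
above (POSIX-only; ``echo`` and ``tr`` are assumed to be on ``$PATH``; this is
not xonsh's actual implementation):

.. code-block:: python

    import os
    import subprocess

    # connect two processes with a raw OS pipe, as the refactored pipeline does
    r, w = os.pipe()
    p1 = subprocess.Popen(['echo', 'hello world'], stdout=w)
    p2 = subprocess.Popen(['tr', 'a-z', 'A-Z'], stdin=r, stdout=subprocess.PIPE)
    # the parent must close its copies of the descriptors so p2 sees EOF
    os.close(w)
    os.close(r)
    out, _ = p2.communicate()
    p1.wait()
    print(out)  # b'HELLO WORLD\n'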


@ -1,4 +1,4 @@
[pytest] [tool:pytest]
flake8-max-line-length = 180 flake8-max-line-length = 180
flake8-ignore = flake8-ignore =
*.py E122 *.py E122

4
tests/bin/pwd Executable file

@ -0,0 +1,4 @@
#!/usr/bin/env python
import os
x = os.getcwd()
print(x)

15
tests/bin/pwd.bat Normal file

@ -0,0 +1,15 @@
@echo on
call :s_which py.exe
rem note that %~dp0 is dir of this batch script
if not "%_path%" == "" (
py -3 %~dp0pwd %*
) else (
python %~dp0pwd %*
)
goto :eof
:s_which
setlocal
endlocal & set _path=%~$PATH:1
goto :eof


@ -8,15 +8,19 @@ import xonsh.built_ins
from xonsh.built_ins import ensure_list_of_strs from xonsh.built_ins import ensure_list_of_strs
from xonsh.execer import Execer from xonsh.execer import Execer
from xonsh.tools import XonshBlockError from xonsh.tools import XonshBlockError
from xonsh.jobs import tasks
from xonsh.events import events from xonsh.events import events
from xonsh.platform import ON_WINDOWS from xonsh.platform import ON_WINDOWS
from tools import DummyShell, sp from xonsh.commands_cache import CommandsCache
from tools import DummyShell, sp, DummyCommandsCache, DummyEnv, DummyHistory
@pytest.fixture @pytest.fixture
def xonsh_execer(monkeypatch): def xonsh_execer(monkeypatch):
"""Initiate the Execer with a mocked nop `load_builtins`""" """Initiate the Execer with a mocked nop `load_builtins`"""
monkeypatch.setattr(xonsh.built_ins, 'load_builtins', lambda *args, **kwargs: None) monkeypatch.setattr(xonsh.built_ins, 'load_builtins',
lambda *args, **kwargs: None)
execer = Execer(login=False, unload=False) execer = Execer(login=False, unload=False)
builtins.__xonsh_execer__ = execer builtins.__xonsh_execer__ = execer
return execer return execer
@ -25,7 +29,7 @@ def xonsh_execer(monkeypatch):
@pytest.yield_fixture @pytest.yield_fixture
def xonsh_builtins(): def xonsh_builtins():
"""Mock out most of the builtins xonsh attributes.""" """Mock out most of the builtins xonsh attributes."""
builtins.__xonsh_env__ = {} builtins.__xonsh_env__ = DummyEnv()
if ON_WINDOWS: if ON_WINDOWS:
builtins.__xonsh_env__['PATHEXT'] = ['.EXE', '.BAT', '.CMD'] builtins.__xonsh_env__['PATHEXT'] = ['.EXE', '.BAT', '.CMD']
builtins.__xonsh_ctx__ = {} builtins.__xonsh_ctx__ = {}
@ -38,7 +42,12 @@ def xonsh_builtins():
builtins.__xonsh_expand_path__ = lambda x: x builtins.__xonsh_expand_path__ = lambda x: x
builtins.__xonsh_subproc_captured__ = sp builtins.__xonsh_subproc_captured__ = sp
builtins.__xonsh_subproc_uncaptured__ = sp builtins.__xonsh_subproc_uncaptured__ = sp
builtins.__xonsh_stdout_uncaptured__ = None
builtins.__xonsh_stderr_uncaptured__ = None
builtins.__xonsh_ensure_list_of_strs__ = ensure_list_of_strs builtins.__xonsh_ensure_list_of_strs__ = ensure_list_of_strs
builtins.__xonsh_commands_cache__ = DummyCommandsCache()
builtins.__xonsh_all_jobs__ = {}
builtins.__xonsh_history__ = DummyHistory()
builtins.XonshBlockError = XonshBlockError builtins.XonshBlockError = XonshBlockError
builtins.__xonsh_subproc_captured_hiddenobject__ = sp builtins.__xonsh_subproc_captured_hiddenobject__ = sp
builtins.evalx = eval builtins.evalx = eval
@ -58,15 +67,21 @@ def xonsh_builtins():
del builtins.__xonsh_superhelp__ del builtins.__xonsh_superhelp__
del builtins.__xonsh_regexpath__ del builtins.__xonsh_regexpath__
del builtins.__xonsh_expand_path__ del builtins.__xonsh_expand_path__
del builtins.__xonsh_stdout_uncaptured__
del builtins.__xonsh_stderr_uncaptured__
del builtins.__xonsh_subproc_captured__ del builtins.__xonsh_subproc_captured__
del builtins.__xonsh_subproc_uncaptured__ del builtins.__xonsh_subproc_uncaptured__
del builtins.__xonsh_ensure_list_of_strs__ del builtins.__xonsh_ensure_list_of_strs__
del builtins.__xonsh_commands_cache__
del builtins.__xonsh_all_jobs__
del builtins.__xonsh_history__
del builtins.XonshBlockError del builtins.XonshBlockError
del builtins.evalx del builtins.evalx
del builtins.execx del builtins.execx
del builtins.compilex del builtins.compilex
del builtins.aliases del builtins.aliases
del builtins.events del builtins.events
tasks.clear() # must do this to enable resetting all_jobs
if ON_WINDOWS: if ON_WINDOWS:

1
tests/run_pwd.xsh Normal file

@ -0,0 +1 @@
pwd


@ -0,0 +1,44 @@
import pytest
from xonsh.commands_cache import CommandsCache, predict_shell, SHELL_PREDICTOR_PARSER
def test_commands_cache_lazy(xonsh_builtins):
cc = CommandsCache()
assert not cc.lazyin('xonsh')
assert 0 == len(list(cc.lazyiter()))
assert 0 == cc.lazylen()
TRUE_SHELL_ARGS = [
['-c', 'yo'],
['-c=yo'],
['file'],
['-i', '-l', 'file'],
['-i', '-c', 'yo'],
['-i', 'file'],
['-i', '-c', 'yo', 'file'],
]
@pytest.mark.parametrize('args', TRUE_SHELL_ARGS)
def test_predict_shell_parser(args):
ns, unknown = SHELL_PREDICTOR_PARSER.parse_known_args(args)
if ns.filename is not None:
assert not ns.filename.startswith('-')
@pytest.mark.parametrize('args', TRUE_SHELL_ARGS)
def test_predict_shell_true(args):
assert predict_shell(args)
FALSE_SHELL_ARGS = [
[],
['-c'],
['-i'],
['-i', '-l'],
]
@pytest.mark.parametrize('args', FALSE_SHELL_ARGS)
def test_predict_shell_false(args):
assert not predict_shell(args)
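# Reading of the cases above (informal note, not part of the test file):
# predict_shell(args) should be truthy exactly when the nested shell would run
# a command or script to completion ('-c ...' or a filename present) and can
# therefore be backgrounded, and falsy when it would only start an interactive
# session ('-i', '-l', or no arguments). A hypothetical additional case:
#
#     assert predict_shell(['-c', 'echo hi'])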


@ -50,19 +50,28 @@ def shares_setup(tmpdir_factory):
, [r'uncpushd_test_PARENT', TEMP_DRIVE[3], PARENT]] , [r'uncpushd_test_PARENT', TEMP_DRIVE[3], PARENT]]
for s, d, l in shares: # set up some shares on local machine. dirs already exist test case must invoke wd_setup. for s, d, l in shares: # set up some shares on local machine. dirs already exist test case must invoke wd_setup.
subprocess.call(['NET', 'SHARE', s, '/delete'], universal_newlines=True) # clean up from previous run after good, long wait. rtn = subprocess.call(['NET', 'SHARE', s, '/delete'], universal_newlines=True) # clean up from previous run after good, long wait.
subprocess.call(['NET', 'SHARE', s + '=' + l], universal_newlines=True) if rtn != 0:
subprocess.call(['NET', 'USE', d, r"\\localhost" + '\\' + s], universal_newlines=True) yield None
return
rtn = subprocess.call(['NET', 'SHARE', s + '=' + l], universal_newlines=True)
if rtn != 0:
yield None
return
rtn = subprocess.call(['NET', 'USE', d, r"\\localhost" + '\\' + s], universal_newlines=True)
if rtn != 0:
yield None
return
yield [[r"\\localhost" + '\\' + s[0], s[1], s[2]] for s in shares] yield [[r"\\localhost" + '\\' + s[0], s[1], s[2]] for s in shares]
# we want to delete the test shares we've created, but can't do that if unc shares in DIRSTACK # we want to delete the test shares we've created, but can't do that if unc shares in DIRSTACK
# (left over from assert fail aborted test) # (left over from assert fail aborted test)
os.chdir(HERE) os.chdir(HERE)
for dl in _unc_tempDrives: for dl in _unc_tempDrives:
subprocess.call(['net', 'use', dl, '/delete'], universal_newlines=True) rtn = subprocess.call(['net', 'use', dl, '/delete'], universal_newlines=True)
for s, d, l in shares: for s, d, l in shares:
subprocess.call(['net', 'use', d, '/delete'], universal_newlines=True) rtn = subprocess.call(['net', 'use', d, '/delete'], universal_newlines=True)
# subprocess.call(['net', 'share', s, '/delete'], universal_newlines=True) # fails with access denied, # subprocess.call(['net', 'share', s, '/delete'], universal_newlines=True) # fails with access denied,
# unless I wait > 10 sec. see http://stackoverflow.com/questions/38448413/access-denied-in-net-share-delete # unless I wait > 10 sec. see http://stackoverflow.com/questions/38448413/access-denied-in-net-share-delete
@ -92,8 +101,9 @@ def test_cd_dot(xonsh_builtins):
@pytest.mark.skipif( not ON_WINDOWS, reason="Windows-only UNC functionality") @pytest.mark.skipif( not ON_WINDOWS, reason="Windows-only UNC functionality")
def test_uncpushd_simple_push_pop(xonsh_builtins, shares_setup): def test_uncpushd_simple_push_pop(xonsh_builtins, shares_setup):
if shares_setup is None:
return
xonsh_builtins.__xonsh_env__ = Env(CDPATH=PARENT, PWD=HERE) xonsh_builtins.__xonsh_env__ = Env(CDPATH=PARENT, PWD=HERE)
dirstack.cd([PARENT]) dirstack.cd([PARENT])
owd = os.getcwd() owd = os.getcwd()
assert owd.casefold() == xonsh_builtins.__xonsh_env__['PWD'].casefold() assert owd.casefold() == xonsh_builtins.__xonsh_env__['PWD'].casefold()
@ -106,8 +116,10 @@ def test_uncpushd_simple_push_pop(xonsh_builtins, shares_setup):
assert len(_unc_tempDrives) == 0 assert len(_unc_tempDrives) == 0
@pytest.mark.skipif( not ON_WINDOWS, reason="Windows-only UNC functionality") @pytest.mark.skipif(not ON_WINDOWS, reason="Windows-only UNC functionality")
def test_uncpushd_push_to_same_share(xonsh_builtins): def test_uncpushd_push_to_same_share(xonsh_builtins, shares_setup):
if shares_setup is None:
return
xonsh_builtins.__xonsh_env__ = Env(CDPATH=PARENT, PWD=HERE) xonsh_builtins.__xonsh_env__ = Env(CDPATH=PARENT, PWD=HERE)
dirstack.cd([PARENT]) dirstack.cd([PARENT])
@ -135,9 +147,11 @@ def test_uncpushd_push_to_same_share(xonsh_builtins):
@pytest.mark.skipif( not ON_WINDOWS, reason="Windows-only UNC functionality") @pytest.mark.skipif( not ON_WINDOWS, reason="Windows-only UNC functionality")
def test_uncpushd_push_other_push_same(xonsh_builtins): def test_uncpushd_push_other_push_same(xonsh_builtins, shares_setup):
"""push to a, then to b. verify drive letter is TEMP_DRIVE[2], skipping already used TEMP_DRIVE[1] """push to a, then to b. verify drive letter is TEMP_DRIVE[2], skipping already used TEMP_DRIVE[1]
Then push to a again. Pop (check b unmapped and a still mapped), pop, pop (check a is unmapped)""" Then push to a again. Pop (check b unmapped and a still mapped), pop, pop (check a is unmapped)"""
if shares_setup is None:
return
xonsh_builtins.__xonsh_env__ = Env(CDPATH=PARENT, PWD=HERE) xonsh_builtins.__xonsh_env__ = Env(CDPATH=PARENT, PWD=HERE)
dirstack.cd([PARENT]) dirstack.cd([PARENT])
@ -181,7 +195,7 @@ def test_uncpushd_push_other_push_same(xonsh_builtins):
assert not os.path.isdir(TEMP_DRIVE[0] + '\\') assert not os.path.isdir(TEMP_DRIVE[0] + '\\')
@pytest.mark.skipif( not ON_WINDOWS, reason="Windows-only UNC functionality") @pytest.mark.skipif(not ON_WINDOWS, reason="Windows-only UNC functionality")
def test_uncpushd_push_base_push_rempath(xonsh_builtins): def test_uncpushd_push_base_push_rempath(xonsh_builtins):
"""push to subdir under share, verify mapped path includes subdir""" """push to subdir under share, verify mapped path includes subdir"""
pass pass
@ -238,6 +252,7 @@ def with_unc_check_disabled(): # just like the above, but value is 1 to *disabl
@pytest.fixture() @pytest.fixture()
def xonsh_builtins_cd(xonsh_builtins): def xonsh_builtins_cd(xonsh_builtins):
xonsh_builtins.__xonsh_env__['CDPATH'] = PARENT
xonsh_builtins.__xonsh_env__['PWD'] = os.getcwd() xonsh_builtins.__xonsh_env__['PWD'] = os.getcwd()
xonsh_builtins.__xonsh_env__['DIRSTACK_SIZE'] = 20 xonsh_builtins.__xonsh_env__['DIRSTACK_SIZE'] = 20
return xonsh_builtins return xonsh_builtins
@ -247,7 +262,8 @@ def xonsh_builtins_cd(xonsh_builtins):
def test_uncpushd_cd_unc_auto_pushd(xonsh_builtins_cd, with_unc_check_enabled): def test_uncpushd_cd_unc_auto_pushd(xonsh_builtins_cd, with_unc_check_enabled):
xonsh_builtins_cd.__xonsh_env__['AUTO_PUSHD'] = True xonsh_builtins_cd.__xonsh_env__['AUTO_PUSHD'] = True
so, se, rc = dirstack.cd([r'\\localhost\uncpushd_test_PARENT']) so, se, rc = dirstack.cd([r'\\localhost\uncpushd_test_PARENT'])
assert rc == 0 if rc != 0:
return
assert os.getcwd().casefold() == TEMP_DRIVE[0] + '\\' assert os.getcwd().casefold() == TEMP_DRIVE[0] + '\\'
assert len(DIRSTACK) == 1 assert len(DIRSTACK) == 1
assert os.path.isdir(TEMP_DRIVE[0] + '\\') assert os.path.isdir(TEMP_DRIVE[0] + '\\')
@ -255,12 +271,16 @@ def test_uncpushd_cd_unc_auto_pushd(xonsh_builtins_cd, with_unc_check_enabled):
@pytest.mark.skipif(not ON_WINDOWS, reason="Windows-only UNC functionality") @pytest.mark.skipif(not ON_WINDOWS, reason="Windows-only UNC functionality")
def test_uncpushd_cd_unc_nocheck(xonsh_builtins_cd, with_unc_check_disabled): def test_uncpushd_cd_unc_nocheck(xonsh_builtins_cd, with_unc_check_disabled):
if with_unc_check_disabled == 0:
return
dirstack.cd([r'\\localhost\uncpushd_test_HERE']) dirstack.cd([r'\\localhost\uncpushd_test_HERE'])
assert os.getcwd().casefold() == r'\\localhost\uncpushd_test_here' assert os.getcwd().casefold() == r'\\localhost\uncpushd_test_here'
@pytest.mark.skipif(not ON_WINDOWS, reason="Windows-only UNC functionality") @pytest.mark.skipif(not ON_WINDOWS, reason="Windows-only UNC functionality")
def test_uncpushd_cd_unc_no_auto_pushd(xonsh_builtins_cd, with_unc_check_enabled): def test_uncpushd_cd_unc_no_auto_pushd(xonsh_builtins_cd, with_unc_check_enabled):
if with_unc_check_enabled == 0:
return
so, se, rc = dirstack.cd([r'\\localhost\uncpushd_test_PARENT']) so, se, rc = dirstack.cd([r'\\localhost\uncpushd_test_PARENT'])
assert rc != 0 assert rc != 0
assert so is None or len(so) == 0 assert so is None or len(so) == 0


@ -9,6 +9,7 @@ from xonsh.tools import ON_WINDOWS
import pytest import pytest
from xonsh.commands_cache import CommandsCache
from xonsh.environ import Env, load_static_config, locate_binary from xonsh.environ import Env, load_static_config, locate_binary
from tools import skip_if_on_unix from tools import skip_if_on_unix
@ -133,8 +134,9 @@ def test_locate_binary_on_windows(xonsh_builtins):
'PATH': [tmpdir], 'PATH': [tmpdir],
'PATHEXT': ['.COM', '.EXE', '.BAT'], 'PATHEXT': ['.COM', '.EXE', '.BAT'],
}) })
assert locate_binary('file1') == os.path.join(tmpdir,'file1.exe') xonsh_builtins.__xonsh_commands_cache__ = CommandsCache()
assert locate_binary('file1.exe') == os.path.join(tmpdir,'file1.exe') assert locate_binary('file1') == os.path.join(tmpdir, 'file1.exe')
assert locate_binary('file2') == os.path.join(tmpdir,'FILE2.BAT') assert locate_binary('file1.exe') == os.path.join(tmpdir, 'file1.exe')
assert locate_binary('file2.bat') == os.path.join(tmpdir,'FILE2.BAT') assert locate_binary('file2') == os.path.join(tmpdir, 'FILE2.BAT')
assert locate_binary('file2.bat') == os.path.join(tmpdir, 'FILE2.BAT')
assert locate_binary('file3') is None assert locate_binary('file3') is None


@ -1,12 +1,13 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Testing xonsh import hooks""" """Testing xonsh import hooks"""
import os import os
import builtins
import pytest import pytest
from xonsh import imphooks from xonsh import imphooks
from xonsh.environ import Env from xonsh.environ import Env
from xonsh.built_ins import load_builtins, unload_builtins from xonsh.built_ins import load_builtins, unload_builtins
import builtins
imphooks.install_hook() imphooks.install_hook()

47
tests/test_run_subproc.py Normal file

@ -0,0 +1,47 @@
import os
import sys
import builtins
import pytest
from xonsh.platform import ON_WINDOWS
from xonsh.built_ins import run_subproc
from tools import skip_if_on_windows
@pytest.yield_fixture(autouse=True)
def chdir_to_test_dir(xonsh_builtins):
old_cwd = os.getcwd()
new_cwd = os.path.dirname(__file__)
os.chdir(new_cwd)
yield
os.chdir(old_cwd)
@skip_if_on_windows
def test_runsubproc_simple(xonsh_builtins, xonsh_execer):
new_cwd = os.path.dirname(__file__)
xonsh_builtins.__xonsh_env__['PATH'] = os.path.join(new_cwd, 'bin') + \
os.pathsep + os.path.dirname(sys.executable)
xonsh_builtins.__xonsh_env__['XONSH_ENCODING'] = 'utf8'
xonsh_builtins.__xonsh_env__['XONSH_ENCODING_ERRORS'] = 'surrogateescape'
xonsh_builtins.__xonsh_env__['XONSH_PROC_FREQUENCY'] = 1e-4
if ON_WINDOWS:
pathext = xonsh_builtins.__xonsh_env__['PATHEXT']
xonsh_builtins.__xonsh_env__['PATHEXT'] = ';'.join(pathext)
pwd = 'PWD.BAT'
else:
pwd = 'pwd'
out = run_subproc([[pwd]], captured='stdout')
assert out.rstrip() == new_cwd
@skip_if_on_windows
def test_runsubproc_redirect_out_to_file(xonsh_builtins, xonsh_execer):
xonsh_builtins.__xonsh_env__['XONSH_PROC_FREQUENCY'] = 1e-4
run_subproc([['pwd', 'out>', 'tttt']], captured='stdout')
with open('tttt') as f:
assert f.read().rstrip() == os.getcwd()
os.remove('tttt')


@ -26,7 +26,6 @@ from xonsh.tools import (
pathsep_to_upper_seq, seq_to_upper_pathsep, expandvars, is_int_as_str, is_slice_as_str, pathsep_to_upper_seq, seq_to_upper_pathsep, expandvars, is_int_as_str, is_slice_as_str,
ensure_timestamp, get_portions ensure_timestamp, get_portions
) )
from xonsh.commands_cache import CommandsCache
from xonsh.built_ins import expand_path from xonsh.built_ins import expand_path
from xonsh.environ import Env from xonsh.environ import Env
@ -1118,13 +1117,6 @@ def test_expand_case_matching(inp, exp):
assert exp == obs assert exp == obs
def test_commands_cache_lazy(xonsh_builtins):
cc = CommandsCache()
assert not cc.lazyin('xonsh')
assert 0 == len(list(cc.lazyiter()))
assert 0 == cc.lazylen()
@pytest.mark.parametrize('inp, exp', [ @pytest.mark.parametrize('inp, exp', [
("foo", "foo"), ("foo", "foo"),
("$foo $bar", "bar $bar"), ("$foo $bar", "bar $bar"),


@ -1,18 +1,16 @@
def test_simple(): def test_simple():
assert 1 + 1 == 2 assert 1 + 1 == 2
def test_envionment(): def test_envionment():
$USER = 'snail' $USER = 'snail'
x = 'USER' x = 'USER'
assert x in ${...} assert x in ${...}
assert ${'U' + 'SER'} == 'snail' assert ${'U' + 'SER'} == 'snail'
def test_xonsh_party(): def test_xonsh_party():
x = 'xonsh' x = 'xonsh'
y = 'party' y = 'party'
out = $(echo @(x + ' ' + y)) out = $(echo @(x + '-' + y)).strip()
assert out == 'xonsh party\n', 'Out really was <' + out + '>, sorry.' assert out == 'xonsh-party', 'Out really was <' + out + '>, sorry.'


@ -1,12 +1,14 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Tests the xonsh lexer.""" """Tests the xonsh lexer."""
from __future__ import unicode_literals, print_function from __future__ import unicode_literals, print_function
import os
import sys import sys
import ast import ast
import builtins import builtins
import platform import platform
import subprocess import subprocess
from collections import defaultdict from collections import defaultdict
from collections.abc import MutableMapping
import pytest import pytest
@ -63,6 +65,47 @@ class DummyShell:
return self._shell return self._shell
class DummyCommandsCache:
def locate_binary(self, name):
return os.path.join(os.path.dirname(__file__), 'bin', name)
def predict_backgroundable(self, cmd):
return True
class DummyHistory:
last_cmd_rtn = 0
last_cmd_out = ''
def append(self, x):
pass
class DummyEnv(MutableMapping):
def __init__(self, *args, **kwargs):
self._d = dict(*args, **kwargs)
def detype(self):
return {k: str(v) for k, v in self._d.items()}
def __getitem__(self, k):
return self._d[k]
def __setitem__(self, k, v):
self._d[k] = v
def __delitem__(self, k):
del self._d[k]
def __len__(self):
return len(self._d)
def __iter__(self):
yield from self._d
# #
# Execer tools # Execer tools
# #


@ -19,8 +19,6 @@ else:
_sys.modules['xonsh.ansi_colors'] = __amalgam__ _sys.modules['xonsh.ansi_colors'] = __amalgam__
codecache = __amalgam__ codecache = __amalgam__
_sys.modules['xonsh.codecache'] = __amalgam__ _sys.modules['xonsh.codecache'] = __amalgam__
lazyimps = __amalgam__
_sys.modules['xonsh.lazyimps'] = __amalgam__
platform = __amalgam__ platform = __amalgam__
_sys.modules['xonsh.platform'] = __amalgam__ _sys.modules['xonsh.platform'] = __amalgam__
pretty = __amalgam__ pretty = __amalgam__
@ -29,10 +27,10 @@ else:
_sys.modules['xonsh.timings'] = __amalgam__ _sys.modules['xonsh.timings'] = __amalgam__
jobs = __amalgam__ jobs = __amalgam__
_sys.modules['xonsh.jobs'] = __amalgam__ _sys.modules['xonsh.jobs'] = __amalgam__
lazyimps = __amalgam__
_sys.modules['xonsh.lazyimps'] = __amalgam__
parser = __amalgam__ parser = __amalgam__
_sys.modules['xonsh.parser'] = __amalgam__ _sys.modules['xonsh.parser'] = __amalgam__
teepty = __amalgam__
_sys.modules['xonsh.teepty'] = __amalgam__
tokenize = __amalgam__ tokenize = __amalgam__
_sys.modules['xonsh.tokenize'] = __amalgam__ _sys.modules['xonsh.tokenize'] = __amalgam__
tools = __amalgam__ tools = __amalgam__


@ -22,94 +22,191 @@ if ON_WINDOWS:
kernel32.SetConsoleTitleW.argtypes = [ctypes.c_wchar_p] kernel32.SetConsoleTitleW.argtypes = [ctypes.c_wchar_p]
class _TeeOut(object): class _TeeStdBuf(io.RawIOBase):
"""Tees stdout into the original sys.stdout and another buffer.""" """A dispatcher for bytes to two buffers, as std stream buffer and an
in memory buffer.
"""
def __init__(self, buf): def __init__(self, stdbuf, membuf):
self.buffer = buf """
self.stdout = sys.stdout Parameters
self.encoding = self.stdout.encoding ----------
self.errors = self.stdout.errors stdbuf : BytesIO-like
sys.stdout = self The std stream buffer.
membuf : BytesIO-like
The in memory stream buffer.
"""
self.stdbuf = stdbuf
self.membuf = membuf
def fileno(self):
"""Returns the file descriptor of the std buffer."""
return self.stdbuf.fileno()
def seek(self, offset, whence=io.SEEK_SET):
"""Sets the location in both the stdbuf and the membuf."""
self.stdbuf.seek(offset, whence)
self.membuf.seek(offset, whence)
def truncate(self, size=None):
"""Truncate both buffers."""
self.stdbuf.truncate(size)
self.membuf.truncate(size)
def readinto(self, b):
"""Read bytes into buffer from both streams."""
self.stdbuf.readinto(b)
return self.membuf.readinto(b)
def write(self, b):
"""Write bytes into both buffers."""
self.stdbuf.write(b)
return self.membuf.write(b)
class _TeeStd(io.TextIOBase):
"""Tees a std stream into an in-memory container and the original stream."""
def __init__(self, name, mem):
"""
Parameters
----------
name : str
The name of the buffer in the sys module, e.g. 'stdout'.
mem : io.TextIOBase-like
The in-memory text-based representation.
"""
self._name = name
self.std = std = getattr(sys, name)
self.mem = mem
self.buffer = _TeeStdBuf(std.buffer, mem.buffer)
setattr(sys, name, self)
@property
def encoding(self):
"""The encoding of the in-memory buffer."""
return self.mem.encoding
@property
def errors(self):
"""The errors of the in-memory buffer."""
return self.mem.errors
@property
def newlines(self):
"""The newlines of the in-memory buffer."""
return self.mem.newlines
def _replace_std(self):
std = self.std
if std is None:
return
setattr(sys, self._name, std)
self.std = self._name = None
def __del__(self): def __del__(self):
sys.stdout = self.stdout self._replace_std()
def close(self): def close(self):
"""Restores the original stdout.""" """Restores the original std stream."""
sys.stdout = self.stdout self._replace_std()
def write(self, data): def write(self, s):
"""Writes data to the original stdout and the buffer.""" """Writes data to the original std stream and the in-memory object."""
# data = data.replace('\001', '').replace('\002', '') self.std.write(s)
self.stdout.write(data) self.mem.write(s)
self.buffer.write(data)
def flush(self): def flush(self):
"""Flushes both the original stdout and the buffer.""" """Flushes both the original stdout and the buffer."""
self.stdout.flush() self.std.flush()
self.buffer.flush() self.mem.flush()
def fileno(self): def fileno(self):
"""Tunnel fileno() calls.""" """Tunnel fileno() calls to the std stream."""
return self.stdout.fileno() return self.std.fileno()
def seek(self, offset, whence=io.SEEK_SET):
"""Seek to a location in both streams."""
self.std.seek(offset, whence)
self.mem.seek(offset, whence)
def truncate(self, size=None):
"""Seek to a location in both streams."""
self.std.truncate(size)
self.mem.truncate(size)
def detach(self):
"""This operation is not supported."""
raise io.UnsupportedOperation
def read(self, size=None):
"""Read from the in-memory stream and seek to a new location in the
std stream.
"""
s = self.mem.read(size)
loc = self.std.tell()
self.std.seek(loc + len(s))
return s
def readline(self, size=-1):
"""Read a line from the in-memory stream and seek to a new location
in the std stream.
"""
s = self.mem.readline(size)
loc = self.std.tell()
self.std.seek(loc + len(s))
return s
def write(self, s):
"""Write a string to both streams and return the length written to the
in-memory stream.
"""
self.std.write(s)
return self.mem.write(s)
class _TeeErr(object): class Tee:
"""Tees stderr into the original sys.stdout and another buffer.""" """Class that merges tee'd stdout and stderr into a single strea,.
def __init__(self, buf):
self.buffer = buf
self.stderr = sys.stderr
self.encoding = self.stderr.encoding
self.errors = self.stderr.errors
sys.stderr = self
def __del__(self):
sys.stderr = self.stderr
def close(self):
"""Restores the original stderr."""
sys.stderr = self.stderr
def write(self, data):
"""Writes data to the original stderr and the buffer."""
# data = data.replace('\001', '').replace('\002', '')
self.stderr.write(data)
self.buffer.write(data)
def flush(self):
"""Flushes both the original stderr and the buffer."""
self.stderr.flush()
self.buffer.flush()
def fileno(self):
"""Tunnel fileno() calls."""
return self.stderr.fileno()
class Tee(io.StringIO):
"""Class that merges tee'd stdout and stderr into a single buffer.
This represents what a user would actually see on the command line. This represents what a user would actually see on the command line.
This class has the same interface as io.TextIOWrapper, except that
the buffer is optional.
""" """
# pylint is a stupid about counting public methods when using inheritance. # pylint is a stupid about counting public methods when using inheritance.
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
def __init__(self, *args, **kwargs): def __init__(self, buffer=None, encoding=None, errors=None,
super().__init__(*args, **kwargs) newline=None, line_buffering=False, write_through=False):
self.stdout = _TeeOut(self) self.buffer = io.BytesIO() if buffer is None else buffer
self.stderr = _TeeErr(self) self.memory = io.TextIOWrapper(self.buffer, encoding=encoding,
errors=errors, newline=newline,
line_buffering=line_buffering,
write_through=write_through)
self.stdout = _TeeStd('stdout', self.memory)
self.stderr = _TeeStd('stderr', self.memory)
@property
def line_buffering(self):
return self.memory.line_buffering
def __del__(self): def __del__(self):
del self.stdout, self.stderr del self.stdout, self.stderr
super().__del__() self.stdout = self.stderr = None
def close(self): def close(self):
"""Closes the buffer as well as the stdout and stderr tees.""" """Closes the buffer as well as the stdout and stderr tees."""
self.stdout.close() self.stdout.close()
self.stderr.close() self.stderr.close()
super().close() self.memory.close()
def getvalue(self):
"""Gets the current contents of the in-memory buffer."""
m = self.memory
loc = m.tell()
m.seek(0)
s = m.read()
m.seek(loc)
return s
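# Illustrative usage sketch (annotation only, not part of the module): the Tee
# above mirrors everything written to sys.stdout and sys.stderr into a single
# in-memory text stream, which is what getvalue() hands to the history backend.
#
#     tee = Tee(encoding='utf-8', errors='backslashreplace')
#     try:
#         print("hello")                    # reaches the real stdout AND the buffer
#         print("oops", file=sys.stderr)    # stderr is merged into the same buffer
#         captured = tee.getvalue()         # "hello\noops\n"
#     finally:
#         tee.close()                       # restores sys.stdout / sys.stderr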
class BaseShell(object): class BaseShell(object):
@ -164,13 +261,14 @@ class BaseShell(object):
src, code = self.push(line) src, code = self.push(line)
if code is None: if code is None:
return return
events.on_precommand.fire(src) events.on_precommand.fire(src)
env = builtins.__xonsh_env__
hist = builtins.__xonsh_history__ # pylint: disable=no-member hist = builtins.__xonsh_history__ # pylint: disable=no-member
ts1 = None ts1 = None
store_stdout = builtins.__xonsh_env__.get('XONSH_STORE_STDOUT') # pylint: disable=no-member store_stdout = env.get('XONSH_STORE_STDOUT') # pylint: disable=no-member
tee = Tee() if store_stdout else io.StringIO() enc = env.get('XONSH_ENCODING')
err = env.get('XONSH_ENCODING_ERRORS')
tee = Tee(encoding=enc, errors=err) if store_stdout else io.StringIO()
try: try:
ts0 = time.time() ts0 = time.time()
run_compiled_code(code, self.ctx, None, 'single') run_compiled_code(code, self.ctx, None, 'single')
@ -189,7 +287,6 @@ class BaseShell(object):
ts1 = ts1 or time.time() ts1 = ts1 or time.time()
self._append_history(inp=src, ts=[ts0, ts1], tee_out=tee.getvalue()) self._append_history(inp=src, ts=[ts0, ts1], tee_out=tee.getvalue())
tee.close() tee.close()
self._fix_cwd() self._fix_cwd()
if builtins.__xonsh_exit__: # pylint: disable=no-member if builtins.__xonsh_exit__: # pylint: disable=no-member
return True return True


@ -4,6 +4,7 @@
Note that this module is named 'built_ins' so as not to be confused with the Note that this module is named 'built_ins' so as not to be confused with the
special Python builtins module. special Python builtins module.
""" """
import io
import os import os
import re import re
import sys import sys
@ -30,13 +31,14 @@ from xonsh.foreign_shells import load_foreign_aliases
from xonsh.jobs import add_job, wait_for_active_job from xonsh.jobs import add_job, wait_for_active_job
from xonsh.platform import ON_POSIX, ON_WINDOWS from xonsh.platform import ON_POSIX, ON_WINDOWS
from xonsh.proc import ( from xonsh.proc import (
ProcProxy, SimpleProcProxy, ForegroundProcProxy, PopenThread, ProcProxy, ForegroundProcProxy,
SimpleForegroundProcProxy, TeePTYProc, pause_call_resume, CompletedCommand, pause_call_resume, CommandPipeline,
HiddenCompletedCommand) HiddenCommandPipeline, STDOUT_CAPTURE_KINDS)
from xonsh.tools import ( from xonsh.tools import (
suggest_commands, expandvars, globpath, XonshError, suggest_commands, expandvars, globpath, XonshError,
XonshCalledProcessError, XonshBlockError XonshCalledProcessError, XonshBlockError
) )
from xonsh.lazyimps import pty
from xonsh.commands_cache import CommandsCache from xonsh.commands_cache import CommandsCache
from xonsh.events import events from xonsh.events import events
@ -55,24 +57,6 @@ def AT_EXIT_SIGNALS():
return sigs return sigs
@lazyobject
def SIGNAL_MESSAGES():
sm = {
signal.SIGABRT: 'Aborted',
signal.SIGFPE: 'Floating point exception',
signal.SIGILL: 'Illegal instructions',
signal.SIGTERM: 'Terminated',
signal.SIGSEGV: 'Segmentation fault',
}
if ON_POSIX:
sm.update({
signal.SIGQUIT: 'Quit',
signal.SIGHUP: 'Hangup',
signal.SIGKILL: 'Killed',
})
return sm
def resetting_signal_handle(sig, f): def resetting_signal_handle(sig, f):
"""Sets a new signal handle that will automatically restore the old value """Sets a new signal handle that will automatically restore the old value
once the new handle is finished. once the new handle is finished.
@ -211,8 +195,7 @@ def _un_shebang(x):
def get_script_subproc_command(fname, args): def get_script_subproc_command(fname, args):
""" """Given the name of a script outside the path, returns a list representing
Given the name of a script outside the path, returns a list representing
an appropriate subprocess command to execute the script. Raises an appropriate subprocess command to execute the script. Raises
PermissionError if the script is not executable. PermissionError if the script is not executable.
""" """
@ -281,12 +264,11 @@ def _is_redirect(x):
return isinstance(x, str) and _REDIR_REGEX.match(x) return isinstance(x, str) and _REDIR_REGEX.match(x)
def _open(fname, mode): def safe_open(fname, mode, buffering=-1):
"""Safely attempts to open a file in for xonsh subprocs."""
# file descriptors # file descriptors
if isinstance(fname, int):
return fname
try: try:
return open(fname, mode) return io.open(fname, mode, buffering=buffering)
except PermissionError: except PermissionError:
raise XonshError('xonsh: {0}: permission denied'.format(fname)) raise XonshError('xonsh: {0}: permission denied'.format(fname))
except FileNotFoundError: except FileNotFoundError:
@ -295,13 +277,20 @@ def _open(fname, mode):
raise XonshError('xonsh: {0}: unable to open file'.format(fname)) raise XonshError('xonsh: {0}: unable to open file'.format(fname))
def _redirect_io(streams, r, loc=None): def safe_close(x):
# special case of redirecting stderr to stdout """Safely attempts to close an object."""
if r.replace('&', '') in _E2O_MAP: if not isinstance(x, io.IOBase):
if 'stderr' in streams:
raise XonshError('Multiple redirects for stderr')
streams['stderr'] = ('<stdout>', 'a', subprocess.STDOUT)
return return
if x.closed:
return
try:
x.close()
except Exception:
pass
def _parse_redirects(r):
"""returns origin, mode, destination tuple"""
orig, mode, dest = _REDIR_REGEX.match(r).groups() orig, mode, dest = _REDIR_REGEX.match(r).groups()
# redirect to fd # redirect to fd
if dest.startswith('&'): if dest.startswith('&'):
@ -317,44 +306,435 @@ def _redirect_io(streams, r, loc=None):
except Exception: except Exception:
pass pass
mode = _MODES.get(mode, None) mode = _MODES.get(mode, None)
if mode == 'r' and (len(orig) > 0 or len(dest) > 0):
raise XonshError('Unrecognized redirection command: {}'.format(r))
elif mode in _WRITE_MODES and len(dest) > 0:
raise XonshError('Unrecognized redirection command: {}'.format(r))
return orig, mode, dest
def _redirect_streams(r, loc=None):
"""Returns stdin, stdout, stderr tuple of redirections."""
stdin = stdout = stderr = None
# special case of redirecting stderr to stdout
if r.replace('&', '') in _E2O_MAP:
stderr = subprocess.STDOUT
return stdin, stdout, stderr
# get streams
orig, mode, dest = _parse_redirects(r)
if mode == 'r': if mode == 'r':
if len(orig) > 0 or len(dest) > 0: stdin = safe_open(loc, mode)
raise XonshError('Unrecognized redirection command: {}'.format(r))
elif 'stdin' in streams:
raise XonshError('Multiple inputs for stdin')
else:
streams['stdin'] = (loc, 'r', _open(loc, mode))
elif mode in _WRITE_MODES: elif mode in _WRITE_MODES:
if orig in _REDIR_ALL: if orig in _REDIR_ALL:
if 'stderr' in streams: stdout = stderr = safe_open(loc, mode)
raise XonshError('Multiple redirects for stderr')
elif 'stdout' in streams:
raise XonshError('Multiple redirects for stdout')
elif len(dest) > 0:
e = 'Unrecognized redirection command: {}'.format(r)
raise XonshError(e)
targets = ['stdout', 'stderr']
elif orig in _REDIR_ERR:
if 'stderr' in streams:
raise XonshError('Multiple redirects for stderr')
elif len(dest) > 0:
e = 'Unrecognized redirection command: {}'.format(r)
raise XonshError(e)
targets = ['stderr']
elif orig in _REDIR_OUT: elif orig in _REDIR_OUT:
if 'stdout' in streams: stdout = safe_open(loc, mode)
raise XonshError('Multiple redirects for stdout') elif orig in _REDIR_ERR:
elif len(dest) > 0: stderr = safe_open(loc, mode)
e = 'Unrecognized redirection command: {}'.format(r)
raise XonshError(e)
targets = ['stdout']
else: else:
raise XonshError('Unrecognized redirection command: {}'.format(r)) raise XonshError('Unrecognized redirection command: {}'.format(r))
f = _open(loc, mode)
for t in targets:
streams[t] = (loc, mode, f)
else: else:
raise XonshError('Unrecognized redirection command: {}'.format(r)) raise XonshError('Unrecognized redirection command: {}'.format(r))
return stdin, stdout, stderr
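# Illustrative mapping (annotation only; the 'out>' token is exercised by the
# new test_run_subproc.py test, while the other spellings shown here are
# assumptions about xonsh's usual redirect tokens):
#
#     _redirect_streams('out>', 'log.txt')  # -> (None, <log.txt 'w'>, None)
#     _redirect_streams('err>', 'log.txt')  # -> (None, None, <log.txt 'w'>)
#     _redirect_streams('all>', 'log.txt')  # -> (None, <f>, <f>)  same handle
#     _redirect_streams('err>out')          # -> (None, None, subprocess.STDOUT)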
def default_signal_pauser(n, f):
"""Pauses a signal, as needed."""
signal.pause()
def no_pg_xonsh_preexec_fn():
"""Default subprocess preexec function for when there is no existing
pipeline group.
"""
os.setpgrp()
signal.signal(signal.SIGTSTP, default_signal_pauser)
class SubprocSpec:
"""A container for specifiying how a subprocess command should be
executed.
"""
kwnames = ('stdin', 'stdout', 'stderr', 'universal_newlines')
def __init__(self, cmd, *, cls=subprocess.Popen, stdin=None, stdout=None,
stderr=None, universal_newlines=False):
"""
Parameters
----------
cmd : list of str
Command to be run.
cls : Popen-like
Class to run the subprocess with.
stdin : file-like
Popen file descriptor or flag for stdin.
stdout : file-like
Popen file descriptor or flag for stdout.
stderr : file-like
Popen file descriptor or flag for stderr.
universal_newlines : bool
Whether or not to use universal newlines.
Attributes
----------
args : list of str
Arguments as originally supplied.
alias : list of str, callable, or None
The alias that was resolved for this command, if any.
binary_loc : str or None
Path to binary to execute.
is_proxy : bool
Whether or not the subprocess is or should be run as a proxy.
background : bool
Whether or not the subprocess should be started in the background.
backgroundable : bool
Whether or not the subprocess is able to be run in the background.
last_in_pipeline : bool
Whether the subprocess is the last in the execution pipeline.
captured_stdout : file-like
Handle to captured stdout
captured_stderr : file-like
Handle to captured stderr
"""
self._stdin = self._stdout = self._stderr = None
# args
self.cmd = list(cmd)
self.cls = cls
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.universal_newlines = universal_newlines
# pure attrs
self.args = list(cmd)
self.alias = None
self.binary_loc = None
self.is_proxy = False
self.background = False
self.backgroundable = True
self.last_in_pipeline = False
self.captured_stdout = None
self.captured_stderr = None
def __str__(self):
s = self.cls.__name__ + '(' + str(self.cmd) + ', '
kws = [n + '=' + str(getattr(self, n)) for n in self.kwnames]
s += ', '.join(kws) + ')'
return s
def __repr__(self):
s = self.__class__.__name__ + '(' + repr(self.cmd) + ', '
s += self.cls.__name__ + ', '
kws = [n + '=' + repr(getattr(self, n)) for n in self.kwnames]
s += ', '.join(kws) + ')'
return s
#
# Properties
#
@property
def stdin(self):
return self._stdin
@stdin.setter
def stdin(self, value):
if self._stdin is None:
self._stdin = value
elif value is None:
pass
else:
safe_close(value)
msg = 'Multiple inputs for stdin for {0!r}'
msg = msg.format(' '.join(self.args))
raise XonshError(msg)
@property
def stdout(self):
return self._stdout
@stdout.setter
def stdout(self, value):
if self._stdout is None:
self._stdout = value
elif value is None:
pass
else:
safe_close(value)
msg = 'Multiple redirections for stdout for {0!r}'
msg = msg.format(' '.join(self.args))
raise XonshError(msg)
@property
def stderr(self):
return self._stderr
@stderr.setter
def stderr(self, value):
if self._stderr is None:
self._stderr = value
elif value is None:
pass
else:
safe_close(value)
msg = 'Multiple redirections for stderr for {0!r}'
msg = msg.format(' '.join(self.args))
raise XonshError(msg)
#
# Execution methods
#
def run(self, *, pipeline_group=None):
"""Launches the subprocess and returns the object."""
kwargs = {n: getattr(self, n) for n in self.kwnames}
self.prep_env(kwargs)
self.prep_preexec_fn(kwargs, pipeline_group=pipeline_group)
if callable(self.alias):
p = self.cls(self.alias, self.cmd, **kwargs)
else:
p = self._run_binary(kwargs)
p.last_in_pipeline = self.last_in_pipeline
p.captured_stdout = self.captured_stdout
p.captured_stderr = self.captured_stderr
return p
def _run_binary(self, kwargs):
try:
bufsize = 1
p = self.cls(self.cmd, bufsize=bufsize, **kwargs)
except PermissionError:
e = 'xonsh: subprocess mode: permission denied: {0}'
raise XonshError(e.format(self.cmd[0]))
except FileNotFoundError:
cmd0 = self.cmd[0]
e = 'xonsh: subprocess mode: command not found: {0}'.format(cmd0)
env = builtins.__xonsh_env__
sug = suggest_commands(cmd0, env, builtins.aliases)
if len(sug.strip()) > 0:
e += '\n' + suggest_commands(cmd0, env, builtins.aliases)
raise XonshError(e)
return p
def prep_env(self, kwargs):
"""Prepares the environment to use in the subprocess."""
denv = builtins.__xonsh_env__.detype()
if ON_WINDOWS:
# Over write prompt variable as xonsh's $PROMPT does
# not make much sense for other subprocs
denv['PROMPT'] = '$P$G'
kwargs['env'] = denv
def prep_preexec_fn(self, kwargs, pipeline_group=None):
"""Prepares the 'preexec_fn' keyword argument"""
if not (ON_POSIX and self.cls is subprocess.Popen):
return
if pipeline_group is None:
xonsh_preexec_fn = no_pg_xonsh_preexec_fn
else:
def xonsh_preexec_fn():
"""Preexec function bound to a pipeline group."""
os.setpgid(0, pipeline_group)
signal.signal(signal.SIGTSTP, default_signal_pauser)
kwargs['preexec_fn'] = xonsh_preexec_fn
#
# Building methods
#
@classmethod
def build(kls, cmd, *, cls=subprocess.Popen, **kwargs):
"""Creates an instance of the subprocess command, with any
modifications and adjustments based on the actual cmd that
was received.
"""
# modifications that do not alter cmds may come before creating instance
spec = kls(cmd, cls=cls, **kwargs)
# modifications that alter cmds must come after creating instance
spec.redirect_leading()
spec.redirect_trailing()
spec.resolve_alias()
spec.resolve_binary_loc()
spec.resolve_auto_cd()
spec.resolve_executable_commands()
spec.resolve_alias_cls()
return spec
def redirect_leading(self):
"""Manage leading redirects such as with '< input.txt COMMAND'. """
while len(self.cmd) >= 3 and self.cmd[0] == '<':
self.stdin = safe_open(self.cmd[1], 'r')
self.cmd = self.cmd[2:]
def redirect_trailing(self):
"""Manages trailing redirects."""
while True:
cmd = self.cmd
if len(cmd) >= 3 and _is_redirect(cmd[-2]):
streams = _redirect_streams(cmd[-2], cmd[-1])
self.stdin, self.stdout, self.stderr = streams
self.cmd = cmd[:-2]
elif len(cmd) >= 2 and _is_redirect(cmd[-1]):
streams = _redirect_streams(cmd[-1])
self.stdin, self.stdout, self.stderr = streams
self.cmd = cmd[:-1]
else:
break
def resolve_alias(self):
"""Sets alias in command, if applicable."""
cmd0 = self.cmd[0]
if callable(cmd0):
alias = cmd0
else:
alias = builtins.aliases.get(cmd0, None)
self.alias = alias
def resolve_binary_loc(self):
"""Sets the binary location"""
alias = self.alias
if alias is None:
binary_loc = locate_binary(self.cmd[0])
elif callable(alias):
binary_loc = None
else:
binary_loc = locate_binary(alias[0])
self.binary_loc = binary_loc
def resolve_auto_cd(self):
"""Implements AUTO_CD functionality."""
if not (self.alias is None and
self.binary_loc is None and
len(self.cmd) == 1 and
builtins.__xonsh_env__.get('AUTO_CD') and
os.path.isdir(self.cmd[0])):
return
self.cmd.insert(0, 'cd')
self.alias = builtins.aliases.get('cd', None)
def resolve_executable_commands(self):
"""Resolve command executables, if applicable."""
alias = self.alias
if callable(alias):
self.cmd.pop(0)
return
elif alias is None:
pass
else:
self.cmd = alias + self.cmd[1:]
if self.binary_loc is None:
return
try:
self.cmd = get_script_subproc_command(self.binary_loc, self.cmd[1:])
except PermissionError:
e = 'xonsh: subprocess mode: permission denied: {0}'
raise XonshError(e.format(self.cmd[0]))
def resolve_alias_cls(self):
"""Determine which proxy class to run an alias with."""
alias = self.alias
if not callable(alias):
return
self.is_proxy = True
bgable = getattr(alias, '__xonsh_backgroundable__', True)
cls = ProcProxy if bgable else ForegroundProcProxy
self.cls = cls
self.backgroundable = bgable
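# Illustrative sketch (annotation only): specs are normally constructed through
# the build() classmethod rather than __init__, e.g.
#
#     spec = SubprocSpec.build(['ls', '-l', 'out>', 'files.txt'])
#     # trailing redirect consumed into spec.stdout, alias and binary location
#     # resolved, spec.cls chosen (Popen, ProcProxy, ...); launch with spec.run()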
def _update_last_spec(last, captured=False):
last.last_in_pipeline = True
env = builtins.__xonsh_env__
if not captured:
return
callable_alias = callable(last.alias)
if callable_alias:
pass
else:
bgable = (last.stdin is not None) or \
builtins.__xonsh_commands_cache__.predict_backgroundable(last.args)
if captured and bgable:
last.cls = PopenThread
elif not bgable:
# foreground processes should use Popen and not pipe stdout, stderr
last.backgroundable = False
return
# cannot use PTY pipes for aliases, for some dark reason,
# and must use normal pipes instead.
use_tty = ON_POSIX and not callable_alias
# Do not set standard in! Popen is not a fan of redirections here
# set standard out
if last.stdout is not None:
last.universal_newlines = True
elif captured in STDOUT_CAPTURE_KINDS:
last.universal_newlines = False
r, w = os.pipe()
last.stdout = safe_open(w, 'wb')
last.captured_stdout = safe_open(r, 'rb')
elif builtins.__xonsh_stdout_uncaptured__ is not None:
last.universal_newlines = True
last.stdout = builtins.__xonsh_stdout_uncaptured__
last.captured_stdout = last.stdout
else:
last.universal_newlines = True
r, w = pty.openpty() if use_tty else os.pipe()
last.stdout = safe_open(w, 'w')
last.captured_stdout = safe_open(r, 'r')
# set standard error
if last.stderr is not None:
pass
elif captured == 'object':
r, w = os.pipe()
last.stderr = safe_open(w, 'w')
last.captured_stderr = safe_open(r, 'r')
elif builtins.__xonsh_stderr_uncaptured__ is not None:
last.stderr = builtins.__xonsh_stderr_uncaptured__
last.captured_stderr = last.stderr
else:
r, w = pty.openpty() if use_tty else os.pipe()
last.stderr = safe_open(w, 'w')
last.captured_stderr = safe_open(r, 'r')
def cmds_to_specs(cmds, captured=False):
"""Converts a list of cmds to a list of SubprocSpec objects that are
ready to be executed.
"""
# first build the subprocs independently and separate from the redirects
specs = []
redirects = []
for cmd in cmds:
if isinstance(cmd, str):
redirects.append(cmd)
else:
if cmd[-1] == '&':
cmd = cmd[:-1]
redirects.append('&')
spec = SubprocSpec.build(cmd)
specs.append(spec)
# now modify the subprocs based on the redirects.
for i, redirect in enumerate(redirects):
if redirect == '|':
# these should remain integer file descriptors, and not Python
# file objects since they connect processes.
r, w = os.pipe()
specs[i].stdout = w
specs[i + 1].stdin = r
elif redirect == '&' and i == len(redirects) - 1:
specs[-1].background = True
else:
raise XonshError('unrecognized redirect {0!r}'.format(redirect))
# Apply boundary conditions
_update_last_spec(specs[-1], captured=captured)
return specs
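# Illustrative sketch of the expected `cmds` layout (annotation only): lists
# are individual commands, strings are the connecting redirects, e.g.
#
#     cmds_to_specs([['ls', '-l'], '|', ['grep', 'py']], captured='stdout')
#     # -> two SubprocSpecs whose stdout/stdin share an os.pipe(); the last
#     #    spec additionally receives capture pipes from _update_last_spec().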
def _should_set_title(captured=False):
env = builtins.__xonsh_env__
return (env.get('XONSH_INTERACTIVE') and
not env.get('XONSH_STORE_STDOUT') and
captured not in STDOUT_CAPTURE_KINDS and
hasattr(builtins, '__xonsh_shell__'))
def run_subproc(cmds, captured=False): def run_subproc(cmds, captured=False):
@ -371,268 +751,49 @@ def run_subproc(cmds, captured=False):
Lastly, the captured argument affects only the last real command. Lastly, the captured argument affects only the last real command.
""" """
env = builtins.__xonsh_env__ env = builtins.__xonsh_env__
background = False specs = cmds_to_specs(cmds, captured=captured)
procinfo = {}
if cmds[-1] == '&':
background = True
cmds = cmds[:-1]
_pipeline_group = None
write_target = None
last_cmd = len(cmds) - 1
procs = [] procs = []
prev_proc = None proc = pipeline_group = None
_capture_streams = captured in {'stdout', 'object'} for spec in specs:
for ix, cmd in enumerate(cmds):
starttime = time.time() starttime = time.time()
procinfo['args'] = list(cmd) proc = spec.run(pipeline_group=pipeline_group)
stdin = None
stderr = None
if isinstance(cmd, str):
continue
streams = {}
while True:
if len(cmd) >= 3 and _is_redirect(cmd[-2]):
_redirect_io(streams, cmd[-2], cmd[-1])
cmd = cmd[:-2]
elif len(cmd) >= 2 and _is_redirect(cmd[-1]):
_redirect_io(streams, cmd[-1])
cmd = cmd[:-1]
elif len(cmd) >= 3 and cmd[0] == '<':
_redirect_io(streams, cmd[0], cmd[1])
cmd = cmd[2:]
else:
break
# set standard input
if 'stdin' in streams:
if prev_proc is not None:
raise XonshError('Multiple inputs for stdin')
stdin = streams['stdin'][-1]
procinfo['stdin_redirect'] = streams['stdin'][:-1]
elif prev_proc is not None:
stdin = prev_proc.stdout
# set standard output
_stdout_name = None
_stderr_name = None
if 'stdout' in streams:
if ix != last_cmd:
raise XonshError('Multiple redirects for stdout')
stdout = streams['stdout'][-1]
procinfo['stdout_redirect'] = streams['stdout'][:-1]
elif ix != last_cmd:
stdout = subprocess.PIPE
elif _capture_streams:
_nstdout = stdout = tempfile.NamedTemporaryFile(delete=False)
_stdout_name = stdout.name
elif builtins.__xonsh_stdout_uncaptured__ is not None:
stdout = builtins.__xonsh_stdout_uncaptured__
else:
stdout = None
# set standard error
if 'stderr' in streams:
stderr = streams['stderr'][-1]
procinfo['stderr_redirect'] = streams['stderr'][:-1]
elif captured == 'object' and ix == last_cmd:
_nstderr = stderr = tempfile.NamedTemporaryFile(delete=False)
_stderr_name = stderr.name
elif builtins.__xonsh_stderr_uncaptured__ is not None:
stderr = builtins.__xonsh_stderr_uncaptured__
uninew = (ix == last_cmd) and (not _capture_streams)
# find alias
if callable(cmd[0]):
alias = cmd[0]
else:
alias = builtins.aliases.get(cmd[0], None)
procinfo['alias'] = alias
# find binary location, if not callable
if alias is None:
binary_loc = locate_binary(cmd[0])
elif not callable(alias):
binary_loc = locate_binary(alias[0])
# implement AUTO_CD
if (alias is None and
builtins.__xonsh_env__.get('AUTO_CD') and
len(cmd) == 1 and
os.path.isdir(cmd[0]) and
binary_loc is None):
cmd.insert(0, 'cd')
alias = builtins.aliases.get('cd', None)
if callable(alias):
aliased_cmd = alias
else:
if alias is not None:
aliased_cmd = alias + cmd[1:]
else:
aliased_cmd = cmd
if binary_loc is not None:
try:
aliased_cmd = get_script_subproc_command(binary_loc,
aliased_cmd[1:])
except PermissionError:
e = 'xonsh: subprocess mode: permission denied: {0}'
raise XonshError(e.format(cmd[0]))
_stdin_file = None
if (stdin is not None and
env.get('XONSH_STORE_STDIN') and
captured == 'object' and
__xonsh_commands_cache__.lazy_locate_binary('cat') and
__xonsh_commands_cache__.lazy_locate_binary('tee')):
_stdin_file = tempfile.NamedTemporaryFile()
cproc = subprocess.Popen(['cat'], stdin=stdin,
stdout=subprocess.PIPE)
tproc = subprocess.Popen(['tee', _stdin_file.name],
stdin=cproc.stdout, stdout=subprocess.PIPE)
stdin = tproc.stdout
if callable(aliased_cmd):
prev_is_proxy = True
bgable = getattr(aliased_cmd, '__xonsh_backgroundable__', True)
numargs = len(inspect.signature(aliased_cmd).parameters)
if numargs == 2:
cls = SimpleProcProxy if bgable else SimpleForegroundProcProxy
elif numargs == 4:
cls = ProcProxy if bgable else ForegroundProcProxy
else:
e = 'Expected callable with 2 or 4 arguments, not {}'
raise XonshError(e.format(numargs))
proc = cls(aliased_cmd, cmd[1:],
stdin, stdout, stderr,
universal_newlines=uninew)
else:
prev_is_proxy = False
usetee = ((stdout is None) and
(not background) and
env.get('XONSH_STORE_STDOUT', False))
cls = TeePTYProc if usetee else subprocess.Popen
subproc_kwargs = {}
if ON_POSIX and cls is subprocess.Popen:
def _subproc_pre():
if _pipeline_group is None:
os.setpgrp()
else:
os.setpgid(0, _pipeline_group)
signal.signal(signal.SIGTSTP, lambda n, f: signal.pause())
subproc_kwargs['preexec_fn'] = _subproc_pre
denv = env.detype()
if ON_WINDOWS:
# Over write prompt variable as xonsh's $PROMPT does
# not make much sense for other subprocs
denv['PROMPT'] = '$P$G'
try:
proc = cls(aliased_cmd,
universal_newlines=uninew,
env=denv,
stdin=stdin,
stdout=stdout,
stderr=stderr,
**subproc_kwargs)
except PermissionError:
e = 'xonsh: subprocess mode: permission denied: {0}'
raise XonshError(e.format(aliased_cmd[0]))
except FileNotFoundError:
cmd = aliased_cmd[0]
e = 'xonsh: subprocess mode: command not found: {0}'.format(cmd)
sug = suggest_commands(cmd, env, builtins.aliases)
if len(sug.strip()) > 0:
e += '\n' + suggest_commands(cmd, env, builtins.aliases)
raise XonshError(e)
procs.append(proc)
prev_proc = proc
if ON_POSIX and cls is subprocess.Popen and _pipeline_group is None:
_pipeline_group = prev_proc.pid
if not prev_is_proxy:
add_job({
'cmds': cmds,
'pids': [i.pid for i in procs],
'obj': prev_proc,
'bg': background
})
if (env.get('XONSH_INTERACTIVE') and
not env.get('XONSH_STORE_STDOUT') and
not _capture_streams and
hasattr(builtins, '__xonsh_shell__')):
# set title here to get current command running
pause_call_resume(prev_proc, builtins.__xonsh_shell__.settitle)
if background:
return
if prev_is_proxy:
prev_proc.wait()
wait_for_active_job()
for proc in procs[:-1]:
try:
proc.stdout.close()
except OSError:
pass
hist = builtins.__xonsh_history__
hist.last_cmd_rtn = prev_proc.returncode
# get output
output = b''
if write_target is None:
if _stdout_name is not None:
with open(_stdout_name, 'rb') as stdoutfile:
output = stdoutfile.read()
try:
_nstdout.close()
except Exception:
pass
os.unlink(_stdout_name)
elif prev_proc.stdout not in (None, sys.stdout):
output = prev_proc.stdout.read()
if _capture_streams:
# to get proper encoding from Popen, we have to
# use a byte stream and then implement universal_newlines here
output = output.decode(encoding=env.get('XONSH_ENCODING'),
errors=env.get('XONSH_ENCODING_ERRORS'))
output = output.replace('\r\n', '\n')
else:
hist.last_cmd_out = output
if captured == 'object': # get stderr as well
named = _stderr_name is not None
unnamed = prev_proc.stderr not in {None, sys.stderr}
if named:
with open(_stderr_name, 'rb') as stderrfile:
errout = stderrfile.read()
try:
_nstderr.close()
except Exception:
pass
os.unlink(_stderr_name)
elif unnamed:
errout = prev_proc.stderr.read()
if named or unnamed:
errout = errout.decode(encoding=env.get('XONSH_ENCODING'),
errors=env.get('XONSH_ENCODING_ERRORS'))
errout = errout.replace('\r\n', '\n')
procinfo['stderr'] = errout
if getattr(prev_proc, 'signal', None):
sig, core = prev_proc.signal
sig_str = SIGNAL_MESSAGES.get(sig)
if sig_str:
if core:
sig_str += ' (core dumped)'
print(sig_str, file=sys.stderr)
if (not prev_is_proxy and
hist.last_cmd_rtn is not None and
hist.last_cmd_rtn > 0 and
env.get('RAISE_SUBPROC_ERROR')):
raise subprocess.CalledProcessError(hist.last_cmd_rtn, aliased_cmd,
output=output)
if captured == 'stdout':
return output
elif captured is not False:
procinfo['executed_cmd'] = aliased_cmd
procinfo['pid'] = prev_proc.pid
procinfo['returncode'] = prev_proc.returncode
procinfo['timestamp'] = (starttime, time.time())
if captured == 'object':
procinfo['stdout'] = output
if _stdin_file is not None:
_stdin_file.seek(0)
procinfo['stdin'] = _stdin_file.read().decode()
_stdin_file.close()
return CompletedCommand(**procinfo)
else:
return HiddenCompletedCommand(**procinfo)

procs.append(proc)
if ON_POSIX and pipeline_group is None and \
spec.cls is subprocess.Popen:
pipeline_group = proc.pid
if not spec.is_proxy:
add_job({
'cmds': cmds,
'pids': [i.pid for i in procs],
'obj': proc,
'bg': spec.background,
})
if _should_set_title(captured=captured):
# set title here to get currently executing command
pause_call_resume(proc, builtins.__xonsh_shell__.settitle)
# create command or return if backgrounding.
if spec.background:
return
#if not captured:
# pass
if captured == 'hiddenobject':
command = HiddenCommandPipeline(specs, procs, starttime=starttime,
captured=captured)
else:
command = CommandPipeline(specs, procs, starttime=starttime,
captured=captured)
# now figure out what we should return.
if captured == 'stdout':
command.end()
return command.output
elif captured == 'object':
return command
elif captured == 'hiddenobject':
command.end()
return command
else:
command.end()
return
def subproc_captured_stdout(*cmds):
@@ -651,15 +812,14 @@ def subproc_captured_inject(*cmds):
def subproc_captured_object(*cmds):
"""
Runs a subprocess, capturing the output. Returns an instance of
``CompletedCommand`` representing the completed command.
CommandPipeline representing the completed command.
"""
return run_subproc(cmds, captured='object')
def subproc_captured_hiddenobject(*cmds):
"""
Runs a subprocess, capturing the output. Returns an instance of
``HiddenCompletedCommand`` representing the completed command.
"""Runs a subprocess, capturing the output. Returns an instance of
HiddenCommandPipeline representing the completed command.
"""
return run_subproc(cmds, captured='hiddenobject')
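For orientation, these three helpers differ only in the ``captured`` mode they forward to ``run_subproc``; the new return logic shown above then decides whether the caller gets the pipeline object, just its output, or nothing. The sketch below imitates that branch with a stand-in pipeline class; ``_FakePipeline`` and ``_return_for`` are illustrative names for this example only, not part of xonsh.

    class _FakePipeline:
        """Toy stand-in for CommandPipeline, only for this illustration."""
        def __init__(self, output):
            self.output = output
            self.ended = False

        def end(self):
            # the real method waits on the procs and finalizes capture
            self.ended = True


    def _return_for(captured, command):
        """Mirror the captured-mode branches added to run_subproc above."""
        if captured == 'stdout':
            command.end()
            return command.output
        elif captured == 'object':
            return command   # left running; the caller may keep reading it
        elif captured == 'hiddenobject':
            command.end()
            return command
        else:
            command.end()
            return None


    print(_return_for('stdout', _FakePipeline('hi\n')))  # prints 'hi'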

View file

@ -1,10 +1,20 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Module for caching command & alias names as well as for predicting whether
a command will be able to be run in the background.
A background predictor is a function that accepts a single argument list
and returns whether or not the process can be run in the background (returns
True) or must be run in the foreground (returns False).
"""
import os import os
import builtins import builtins
import argparse
import collections
import collections.abc as cabc import collections.abc as cabc
from xonsh.platform import ON_WINDOWS
from xonsh.platform import ON_WINDOWS, pathbasename
from xonsh.tools import executables_in from xonsh.tools import executables_in
from xonsh.lazyasd import lazyobject
class CommandsCache(cabc.Mapping): class CommandsCache(cabc.Mapping):
@ -20,6 +30,7 @@ class CommandsCache(cabc.Mapping):
self._path_checksum = None self._path_checksum = None
self._alias_checksum = None self._alias_checksum = None
self._path_mtime = -1 self._path_mtime = -1
self.backgroundable_predictors = default_backgroundable_predictors()
def __contains__(self, key): def __contains__(self, key):
_ = self.all_commands _ = self.all_commands
@ -97,17 +108,22 @@ class CommandsCache(cabc.Mapping):
self._cmds_cache = allcmds self._cmds_cache = allcmds
return allcmds return allcmds
def cached_name(self, name):
"""Returns the name that would appear in the cache, if it was exists."""
if name is None:
return None
cached = pathbasename(name)
if ON_WINDOWS:
keys = self.get_possible_names(cached)
cached = next((k for k in keys if k in self._cmds_cache), None)
return cached
def lazyin(self, key): def lazyin(self, key):
"""Checks if the value is in the current cache without the potential to """Checks if the value is in the current cache without the potential to
update the cache. It just says whether the value is known *now*. This update the cache. It just says whether the value is known *now*. This
may not reflect precisely what is on the $PATH. may not reflect precisely what is on the $PATH.
""" """
if ON_WINDOWS:
keys = self.get_possible_names(key)
cached_key = next((k for k in keys if k in self._cmds_cache), None)
return cached_key is not None
else:
return key in self._cmds_cache
return self.cached_name(key) in self._cmds_cache
def lazyiter(self): def lazyiter(self):
"""Returns an iterator over the current cache contents without the """Returns an iterator over the current cache contents without the
@ -125,11 +141,7 @@ class CommandsCache(cabc.Mapping):
def lazyget(self, key, default=None): def lazyget(self, key, default=None):
"""A lazy value getter.""" """A lazy value getter."""
if ON_WINDOWS:
keys = self.get_possible_names(key)
cached_key = next((k for k in keys if k in self._cmds_cache), None)
key = cached_key if cached_key is not None else key
return self._cmds_cache.get(key, default)
return self._cmds_cache.get(self.cached_name(key), default)
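The point of ``cached_name`` is that both ``lazyin`` and ``lazyget`` collapse to one-liners: a command is normalized to its basename, and on Windows to whichever extension-bearing candidate actually appears in the cache. A rough, self-contained imitation of that lookup follows; the toy cache and the hard-coded extension list are assumptions made for the example, where the real code consults ``get_possible_names``.

    import os

    _toy_cache = {'ls': ('/bin/ls', False), 'git.exe': ('C:\\Git\\git.exe', False)}


    def _toy_cached_name(name, on_windows=False):
        """Imitates CommandsCache.cached_name against the toy cache above."""
        if name is None:
            return None
        cached = os.path.basename(name)   # xonsh uses pathbasename() here
        if on_windows:
            candidates = [cached, cached + '.exe', cached + '.bat', cached + '.cmd']
            cached = next((k for k in candidates if k in _toy_cache), None)
        return cached


    print(_toy_cached_name('/usr/local/bin/ls'))     # -> 'ls'
    print(_toy_cached_name('git', on_windows=True))  # -> 'git.exe'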
def locate_binary(self, name): def locate_binary(self, name):
"""Locates an executable on the file system using the cache.""" """Locates an executable on the file system using the cache."""
@ -153,3 +165,65 @@ class CommandsCache(cabc.Mapping):
return self._cmds_cache[cached][0] return self._cmds_cache[cached][0]
elif os.path.isfile(name) and name != os.path.basename(name): elif os.path.isfile(name) and name != os.path.basename(name):
return name return name
def predict_backgroundable(self, cmd):
"""Predics whether a command list is backgroundable."""
name = self.cached_name(cmd[0])
path, is_alias = self.lazyget(name, (None, None))
if path is None or is_alias:
return True
predictor = self.backgroundable_predictors[name]
return predictor(cmd[1:])
#
# Background Predictors
#
def predict_true(args):
"""Always say the process is backgroundable."""
return True
def predict_false(args):
"""Never say the process is backgroundable."""
return False
@lazyobject
def SHELL_PREDICTOR_PARSER():
p = argparse.ArgumentParser('shell')
p.add_argument('-c', nargs='?', default=None)
p.add_argument('filename', nargs='?', default=None)
return p
def predict_shell(args):
"""Precict the backgroundability of the normal shell interface, which
comes down to whether it is being run in subproc mode.
"""
ns, _ = SHELL_PREDICTOR_PARSER.parse_known_args(args)
if ns.c is None and ns.filename is None:
pred = False
else:
pred = True
return pred
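In other words, ``sh -c '...'`` or ``sh script.sh`` is predicted to exit on its own and is therefore backgroundable, while a bare ``sh`` would want the terminal. The snippet below rebuilds the same argparse-based check stand-alone, since the real code goes through the lazily constructed ``SHELL_PREDICTOR_PARSER`` above.

    import argparse

    parser = argparse.ArgumentParser('shell')
    parser.add_argument('-c', nargs='?', default=None)
    parser.add_argument('filename', nargs='?', default=None)

    for args in (['-c', 'echo hi'], ['setup.sh'], []):
        ns, _ = parser.parse_known_args(args)
        backgroundable = not (ns.c is None and ns.filename is None)
        print(args, '->', backgroundable)
    # ['-c', 'echo hi'] -> True, ['setup.sh'] -> True, [] -> False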
def default_backgroundable_predictors():
"""Generates a new defaultdict for known backgroundable predictors.
The default is to predict true.
"""
return collections.defaultdict(lambda: predict_true,
sh=predict_shell,
zsh=predict_shell,
ksh=predict_shell,
csh=predict_shell,
tcsh=predict_shell,
bash=predict_shell,
fish=predict_shell,
xonsh=predict_shell,
ssh=predict_false,
startx=predict_false,
vi=predict_false,
vim=predict_false,
)
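Because the predictors live in a plain ``defaultdict`` hanging off the commands cache, they look overridable at runtime. A hypothetical ``~/.xonshrc`` tweak is sketched below, assuming the module path is ``xonsh.commands_cache`` and that the cache is reachable as ``builtins.__xonsh_commands_cache__`` as elsewhere in this diff; the registration point itself is not documented API here.

    import builtins
    from xonsh.commands_cache import predict_false

    # never push editors into the background, even if someone tries `emacs &`
    cc = builtins.__xonsh_commands_cache__
    cc.backgroundable_predictors['emacs'] = predict_false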

View file

@ -140,7 +140,6 @@ def DEFAULT_ENSURERS():
'BOTTOM_TOOLBAR': (is_string_or_callable, ensure_string, ensure_string), 'BOTTOM_TOOLBAR': (is_string_or_callable, ensure_string, ensure_string),
'SUBSEQUENCE_PATH_COMPLETION': (is_bool, to_bool, bool_to_str), 'SUBSEQUENCE_PATH_COMPLETION': (is_bool, to_bool, bool_to_str),
'SUPPRESS_BRANCH_TIMEOUT_MESSAGE': (is_bool, to_bool, bool_to_str), 'SUPPRESS_BRANCH_TIMEOUT_MESSAGE': (is_bool, to_bool, bool_to_str),
'TEEPTY_PIPE_DELAY': (is_float, float, str),
'UPDATE_OS_ENVIRON': (is_bool, to_bool, bool_to_str), 'UPDATE_OS_ENVIRON': (is_bool, to_bool, bool_to_str),
'VC_BRANCH_TIMEOUT': (is_float, float, str), 'VC_BRANCH_TIMEOUT': (is_float, float, str),
'VI_MODE': (is_bool, to_bool, bool_to_str), 'VI_MODE': (is_bool, to_bool, bool_to_str),
@ -156,6 +155,7 @@ def DEFAULT_ENSURERS():
'XONSH_ENCODING_ERRORS': (is_string, ensure_string, ensure_string), 'XONSH_ENCODING_ERRORS': (is_string, ensure_string, ensure_string),
'XONSH_HISTORY_SIZE': (is_history_tuple, to_history_tuple, history_tuple_to_str), 'XONSH_HISTORY_SIZE': (is_history_tuple, to_history_tuple, history_tuple_to_str),
'XONSH_LOGIN': (is_bool, to_bool, bool_to_str), 'XONSH_LOGIN': (is_bool, to_bool, bool_to_str),
'XONSH_PROC_FREQUENCY': (is_float, float, str),
'XONSH_SHOW_TRACEBACK': (is_bool, to_bool, bool_to_str), 'XONSH_SHOW_TRACEBACK': (is_bool, to_bool, bool_to_str),
'XONSH_STORE_STDOUT': (is_bool, to_bool, bool_to_str), 'XONSH_STORE_STDOUT': (is_bool, to_bool, bool_to_str),
'XONSH_STORE_STDIN': (is_bool, to_bool, bool_to_str), 'XONSH_STORE_STDIN': (is_bool, to_bool, bool_to_str),
@ -275,7 +275,6 @@ def DEFAULT_VALUES():
'SUGGEST_COMMANDS': True, 'SUGGEST_COMMANDS': True,
'SUGGEST_MAX_NUM': 5, 'SUGGEST_MAX_NUM': 5,
'SUGGEST_THRESHOLD': 3, 'SUGGEST_THRESHOLD': 3,
'TEEPTY_PIPE_DELAY': 0.01,
'TITLE': DEFAULT_TITLE, 'TITLE': DEFAULT_TITLE,
'UPDATE_OS_ENVIRON': False, 'UPDATE_OS_ENVIRON': False,
'VC_BRANCH_TIMEOUT': 0.2 if ON_WINDOWS else 0.1, 'VC_BRANCH_TIMEOUT': 0.2 if ON_WINDOWS else 0.1,
@ -298,6 +297,7 @@ def DEFAULT_VALUES():
'XONSH_HISTORY_FILE': os.path.expanduser('~/.xonsh_history.json'), 'XONSH_HISTORY_FILE': os.path.expanduser('~/.xonsh_history.json'),
'XONSH_HISTORY_SIZE': (8128, 'commands'), 'XONSH_HISTORY_SIZE': (8128, 'commands'),
'XONSH_LOGIN': False, 'XONSH_LOGIN': False,
'XONSH_PROC_FREQUENCY': 1e-4,
'XONSH_SHOW_TRACEBACK': False, 'XONSH_SHOW_TRACEBACK': False,
'XONSH_STORE_STDIN': False, 'XONSH_STORE_STDIN': False,
'XONSH_STORE_STDOUT': False, 'XONSH_STORE_STDOUT': False,
@ -532,15 +532,6 @@ def DEFAULT_DOCS():
'tab completion of paths.'), 'tab completion of paths.'),
'SUPPRESS_BRANCH_TIMEOUT_MESSAGE': VarDocs( 'SUPPRESS_BRANCH_TIMEOUT_MESSAGE': VarDocs(
'Whether or not to supress branch timeout warning messages.'), 'Whether or not to supress branch timeout warning messages.'),
'TEEPTY_PIPE_DELAY': VarDocs(
'The number of [seconds] to delay a spawned process if it has '
'information being piped in via stdin. This value must be a float. '
'If a value less than or equal to zero is passed in, no delay is '
'used. This can be used to fix situations where a spawned process, '
'such as piping into ``grep``, exits too quickly for the piping '
'operation itself. TeePTY (and thus this variable) are currently '
'only used when ``$XONSH_STORE_STDOUT`` is True.',
configurable=ON_LINUX),
'TERM': VarDocs( 'TERM': VarDocs(
'TERM is sometimes set by the terminal emulator. This is used (when ' 'TERM is sometimes set by the terminal emulator. This is used (when '
"valid) to determine whether or not to set the title. Users shouldn't " "valid) to determine whether or not to set the title. Users shouldn't "
@ -660,6 +651,9 @@ def DEFAULT_DOCS():
'XONSH_LOGIN': VarDocs( 'XONSH_LOGIN': VarDocs(
'``True`` if xonsh is running as a login shell, and ``False`` otherwise.', '``True`` if xonsh is running as a login shell, and ``False`` otherwise.',
configurable=False), configurable=False),
'XONSH_PROC_FREQUENCY': VarDocs('The process frequency is the time that '
'xonsh process threads sleep for while running command pipelines. '
'The value has units of seconds [s].'),
'XONSH_SHOW_TRACEBACK': VarDocs( 'XONSH_SHOW_TRACEBACK': VarDocs(
'Controls if a traceback is shown if exceptions occur in the shell. ' 'Controls if a traceback is shown if exceptions occur in the shell. '
'Set to ``True`` to always show traceback or ``False`` to always hide. ' 'Set to ``True`` to always show traceback or ``False`` to always hide. '
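On the new ``XONSH_PROC_FREQUENCY`` knob above: a smaller value makes the pipeline threads poll more often at the cost of CPU. A hypothetical tweak from Python-side startup code is shown below; in xonsh syntax this is simply ``$XONSH_PROC_FREQUENCY = 1e-3``.

    import builtins

    # sleep 1 ms between polls of a running command pipeline
    builtins.__xonsh_env__['XONSH_PROC_FREQUENCY'] = 1e-3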

View file

@ -251,9 +251,7 @@ def get_next_job_number():
def add_job(info): def add_job(info):
""" """Add a new job to the jobs dictionary."""
Add a new job to the jobs dictionary.
"""
num = get_next_job_number() num = get_next_job_number()
info['started'] = time.time() info['started'] = time.time()
info['status'] = "running" info['status'] = "running"
@ -377,16 +375,15 @@ def fg(args, stdin=None):
def bg(args, stdin=None):
"""
xonsh command: bg
"""xonsh command: bg
Resume execution of the currently active job in the background, or, if a
single number is given as an argument, resume that job in the background.
"""
res = fg(args, stdin)
if res is None:
curTask = get_task(tasks[0])
curTask['bg'] = True
_continue(curTask)
curtask = get_task(tasks[0])
curtask['bg'] = True
_continue(curtask)
else:
return res

View file

@ -1,9 +1,42 @@
"""Lazy imports that may apply across the xonsh package.""" """Lazy imports that may apply across the xonsh package."""
import importlib import importlib
from xonsh.lazyasd import LazyObject
from xonsh.platform import ON_WINDOWS
from xonsh.lazyasd import LazyObject, lazyobject
pygments = LazyObject(lambda: importlib.import_module('pygments'), pygments = LazyObject(lambda: importlib.import_module('pygments'),
globals(), 'pygments') globals(), 'pygments')
pyghooks = LazyObject(lambda: importlib.import_module('xonsh.pyghooks'), pyghooks = LazyObject(lambda: importlib.import_module('xonsh.pyghooks'),
globals(), 'pyghooks') globals(), 'pyghooks')
@lazyobject
def pty():
if ON_WINDOWS:
return
else:
return importlib.import_module('pty')
@lazyobject
def termios():
if ON_WINDOWS:
return
else:
return importlib.import_module('termios')
@lazyobject
def fcntl():
if ON_WINDOWS:
return
else:
return importlib.import_module('fcntl')
@lazyobject
def tty():
if ON_WINDOWS:
return
else:
return importlib.import_module('tty')
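These wrappers exist so that importing ``xonsh.lazyimps`` on Windows never touches the POSIX-only modules; the import happens on first attribute access. A minimal stand-alone sketch of the same idea follows; ``_Lazy`` is a toy written for this example, not ``xonsh.lazyasd.lazyobject``.

    import importlib


    class _Lazy:
        """Defer a module import until the first attribute access."""
        def __init__(self, loader):
            self._loader = loader
            self._mod = None

        def __getattr__(self, name):
            if self._mod is None:
                self._mod = self._loader()
            return getattr(self._mod, name)


    resource = _Lazy(lambda: importlib.import_module('resource'))  # POSIX-only
    # nothing has been imported yet; resource.getpagesize() would trigger it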

View file

@ -11,7 +11,7 @@ from xonsh import __version__
from xonsh.lazyasd import lazyobject from xonsh.lazyasd import lazyobject
from xonsh.shell import Shell from xonsh.shell import Shell
from xonsh.pretty import pretty from xonsh.pretty import pretty
from xonsh.proc import HiddenCompletedCommand
from xonsh.proc import HiddenCommandPipeline
from xonsh.jobs import ignore_sigtstp from xonsh.jobs import ignore_sigtstp
from xonsh.tools import setup_win_unicode_console, print_color from xonsh.tools import setup_win_unicode_console, print_color
from xonsh.platform import HAS_PYGMENTS, ON_WINDOWS from xonsh.platform import HAS_PYGMENTS, ON_WINDOWS
@ -124,7 +124,7 @@ def _pprint_displayhook(value):
if value is None: if value is None:
return return
builtins._ = None # Set '_' to None to avoid recursion builtins._ = None # Set '_' to None to avoid recursion
if isinstance(value, HiddenCompletedCommand):
if isinstance(value, HiddenCommandPipeline):
builtins._ = value builtins._ = value
return return
env = builtins.__xonsh_env__ env = builtins.__xonsh_env__

View file

@ -4,6 +4,7 @@ on a platform.
""" """
import os import os
import sys import sys
import signal
import pathlib import pathlib
import platform import platform
import functools import functools
@ -64,7 +65,11 @@ ON_ANACONDA = LazyBool(
lambda: any(s in sys.version for s in {'Anaconda', 'Continuum'}), lambda: any(s in sys.version for s in {'Anaconda', 'Continuum'}),
globals(), 'ON_ANACONDA') globals(), 'ON_ANACONDA')
""" ``True`` if executed in an Anaconda instance, else ``False``. """ """ ``True`` if executed in an Anaconda instance, else ``False``. """
CAN_RESIZE_WINDOW = LazyBool(lambda: hasattr(signal, 'SIGWINCH'),
globals(), 'CAN_RESIZE_WINDOW')
"""``True`` if we can resize terminal window, as provided by the presense of
signal.SIGWINCH, else ``False``.
"""
@lazybool @lazybool
def HAS_PYGMENTS(): def HAS_PYGMENTS():
@ -131,6 +136,35 @@ def is_readline_available():
return (spec is not None) return (spec is not None)
@lazyobject
def seps():
"""String of all path separators."""
s = os.path.sep
if os.path.altsep is not None:
s += os.path.altsep
return s
def pathsplit(p):
"""This is a safe version of os.path.split(), which does not work on input
without a drive.
"""
n = len(p)
while n and p[n-1] not in seps:
n -= 1
pre = p[:n]
pre = pre.rstrip(seps) or pre
post = p[n:]
return pre, post
def pathbasename(p):
"""This is a safe version of os.path.basename(), which does not work on
input without a drive.
"""
return pathsplit(p)[-1]
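Once this branch is installed, the helpers can be exercised directly; forward slashes keep the example portable, since ``seps`` includes every separator the platform knows about.

    from xonsh.platform import pathsplit, pathbasename

    print(pathsplit('src/xonsh/proc.py'))      # ('src/xonsh', 'proc.py')
    print(pathbasename('src/xonsh/proc.py'))   # 'proc.py'
    print(pathbasename('proc.py'))             # 'proc.py'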
# #
# Dev release info # Dev release info
# #

File diff suppressed because it is too large

View file

@ -1,376 +0,0 @@
# -*- coding: utf-8 -*-
"""This implements a psuedo-TTY that tees its output into a Python buffer.
This file was forked from a version distibuted under an MIT license and
Copyright (c) 2011 Joshua D. Bartlett.
See http://sqizit.bartletts.id.au/2011/02/14/pseudo-terminals-in-python/ for
more information.
"""
import io
import re
import os
import sys
import time
import array
import select
import signal
import tempfile
import importlib
import threading
from xonsh.lazyasd import LazyObject, lazyobject
from xonsh.platform import ON_WINDOWS
#
# Explicit lazy imports for windows
#
@lazyobject
def tty():
if ON_WINDOWS:
return
else:
return importlib.import_module('tty')
@lazyobject
def pty():
if ON_WINDOWS:
return
else:
return importlib.import_module('pty')
@lazyobject
def termios():
if ON_WINDOWS:
return
else:
return importlib.import_module('termios')
@lazyobject
def fcntl():
if ON_WINDOWS:
return
else:
return importlib.import_module('fcntl')
# The following escape codes are xterm codes.
# See http://rtfm.etla.org/xterm/ctlseq.html for more.
MODE_NUMS = ('1049', '47', '1047')
START_ALTERNATE_MODE = LazyObject(
lambda: frozenset('\x1b[?{0}h'.format(i).encode() for i in MODE_NUMS),
globals(), 'START_ALTERNATE_MODE')
END_ALTERNATE_MODE = LazyObject(
lambda: frozenset('\x1b[?{0}l'.format(i).encode() for i in MODE_NUMS),
globals(), 'END_ALTERNATE_MODE')
ALTERNATE_MODE_FLAGS = LazyObject(
lambda: tuple(START_ALTERNATE_MODE) + tuple(END_ALTERNATE_MODE),
globals(), 'ALTERNATE_MODE_FLAGS')
RE_HIDDEN_BYTES = LazyObject(lambda: re.compile(b'(\001.*?\002)'),
globals(), 'RE_HIDDEN')
RE_COLOR = LazyObject(lambda: re.compile(b'\033\[\d+;?\d*m'),
globals(), 'RE_COLOR')
def _findfirst(s, substrs):
"""Finds whichever of the given substrings occurs first in the given string
and returns that substring, or returns None if no such strings occur.
"""
i = len(s)
result = None
for substr in substrs:
pos = s.find(substr)
if -1 < pos < i:
i = pos
result = substr
return i, result
def _on_main_thread():
"""Checks if we are on the main thread or not. Duplicated from xonsh.tools
here so that this module only relies on the Python standard library.
"""
return threading.current_thread() is threading.main_thread()
def _find_error_code(e):
"""Gets the approriate error code for an exception e, see
http://tldp.org/LDP/abs/html/exitcodes.html for exit codes.
"""
if isinstance(e, PermissionError):
code = 126
elif isinstance(e, FileNotFoundError):
code = 127
else:
code = 1
return code
class TeePTY(object):
"""This class is a pseudo terminal that tees the stdout and stderr into a buffer."""
def __init__(self, bufsize=1024, remove_color=True, encoding='utf-8',
errors='strict'):
"""
Parameters
----------
bufsize : int, optional
The buffer size to read from the root terminal to/from the tee'd terminal.
remove_color : bool, optional
Removes color codes from the tee'd buffer, though not the TTY.
encoding : str, optional
The encoding to use when decoding into a str.
errors : str, optional
The encoding error flag to use when decoding into a str.
"""
self.bufsize = bufsize
self.pid = self.master_fd = None
self._in_alt_mode = False
self.remove_color = remove_color
self.encoding = encoding
self.errors = errors
self.buffer = io.BytesIO()
self.wcode = None # os.wait encoded retval
self._temp_stdin = None
def __str__(self):
return self.buffer.getvalue().decode(encoding=self.encoding,
errors=self.errors)
def __del__(self):
if self._temp_stdin is not None:
self._temp_stdin.close()
self._temp_stdin = None
def spawn(self, argv=None, env=None, stdin=None, delay=None):
"""Create a spawned process. Based on the code for pty.spawn().
This cannot be used except from the main thread.
Parameters
----------
argv : list of str, optional
Arguments to pass in as a subprocess. If None, will execute $SHELL.
env : Mapping, optional
Environment to pass execute in.
delay : float, optional
Delay timing before executing process if piping in data. The value
is passed into time.sleep() so it is in [seconds]. If delay is None,
its value will attempted to be looked up from the environment
variable $TEEPTY_PIPE_DELAY, from the passed in env or os.environ.
If not present or not positive valued, no delay is used.
Returns
-------
wcode : int
Return code for the spawned process encoded as os.wait format.
"""
assert self.master_fd is None
self._in_alt_mode = False
if not argv:
argv = [os.environ.get('SHELL', 'sh')]
argv = self._put_stdin_in_argv(argv, stdin)
pid, master_fd = pty.fork()
self.pid = pid
self.master_fd = master_fd
if pid == pty.CHILD:
# determine if a piping delay is needed.
if self._temp_stdin is not None:
self._delay_for_pipe(env=env, delay=delay)
# ok, go
try:
if env is None:
os.execvp(argv[0], argv)
else:
os.execvpe(argv[0], argv, env)
except OSError as e:
os._exit(_find_error_code(e))
else:
self._pipe_stdin(stdin)
on_main_thread = _on_main_thread()
if on_main_thread:
old_handler = signal.signal(signal.SIGWINCH, self._signal_winch)
try:
mode = tty.tcgetattr(pty.STDIN_FILENO)
tty.setraw(pty.STDIN_FILENO)
restore = True
except tty.error: # This is the same as termios.error
restore = False
self._init_fd()
try:
self._copy()
except (IOError, OSError):
if restore:
tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, mode)
_, self.wcode = os.waitpid(pid, 0)
os.close(master_fd)
self.master_fd = None
self._in_alt_mode = False
if on_main_thread:
signal.signal(signal.SIGWINCH, old_handler)
return self.wcode
def _init_fd(self):
"""Called once when the pty is first set up."""
self._set_pty_size()
def _signal_winch(self, signum, frame):
"""Signal handler for SIGWINCH - window size has changed."""
self._set_pty_size()
def _set_pty_size(self):
"""Sets the window size of the child pty based on the window size of
our own controlling terminal.
"""
assert self.master_fd is not None
# Get the terminal size of the real terminal, set it on the
# pseudoterminal.
buf = array.array('h', [0, 0, 0, 0])
fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, buf)
def _copy(self):
"""Main select loop. Passes all data to self.master_read() or self.stdin_read().
"""
assert self.master_fd is not None
master_fd = self.master_fd
bufsize = self.bufsize
while True:
try:
rfds, wfds, xfds = select.select([master_fd, pty.STDIN_FILENO], [], [])
except OSError as e:
if e.errno == 4: # Interrupted system call.
continue # This happens at terminal resize.
if master_fd in rfds:
data = os.read(master_fd, bufsize)
self.write_stdout(data)
if pty.STDIN_FILENO in rfds:
data = os.read(pty.STDIN_FILENO, bufsize)
self.write_stdin(data)
def _sanatize_data(self, data):
i, flag = _findfirst(data, ALTERNATE_MODE_FLAGS)
if flag is None and self._in_alt_mode:
return b''
elif flag is not None:
if flag in START_ALTERNATE_MODE:
# This code is executed when the child process switches the terminal into
# alternate mode. The line below assumes that the user has opened vim,
less, or similar, and writes to stdin.
d0 = data[:i]
self._in_alt_mode = True
d1 = self._sanatize_data(data[i+len(flag):])
data = d0 + d1
elif flag in END_ALTERNATE_MODE:
# This code is executed when the child process switches the terminal back
# out of alternate mode. The line below assumes that the user has
# returned to the command prompt.
self._in_alt_mode = False
data = self._sanatize_data(data[i+len(flag):])
data = RE_HIDDEN_BYTES.sub(b'', data)
if self.remove_color:
data = RE_COLOR.sub(b'', data)
return data
def write_stdout(self, data):
"""Writes to stdout as if the child process had written the data (bytes)."""
os.write(pty.STDOUT_FILENO, data) # write to real terminal
# tee to buffer
data = self._sanatize_data(data)
if len(data) > 0:
self.buffer.write(data)
def write_stdin(self, data):
"""Writes to the child process from its controlling terminal."""
master_fd = self.master_fd
assert master_fd is not None
while len(data) > 0:
n = os.write(master_fd, data)
data = data[n:]
def _stdin_filename(self, stdin):
if stdin is None:
rtn = None
elif isinstance(stdin, io.FileIO) and os.path.isfile(stdin.name):
rtn = stdin.name
elif isinstance(stdin, (io.BufferedIOBase, str, bytes)):
self._temp_stdin = tsi = tempfile.NamedTemporaryFile()
rtn = tsi.name
else:
raise ValueError('stdin not understood {0!r}'.format(stdin))
return rtn
def _put_stdin_in_argv(self, argv, stdin):
stdin_filename = self._stdin_filename(stdin)
if stdin_filename is None:
return argv
argv = list(argv)
# a lone dash '-' argument means stdin
if argv.count('-') == 0:
argv.append(stdin_filename)
else:
argv[argv.index('-')] = stdin_filename
return argv
def _pipe_stdin(self, stdin):
if stdin is None or isinstance(stdin, io.FileIO):
return None
tsi = self._temp_stdin
bufsize = self.bufsize
if isinstance(stdin, io.BufferedIOBase):
buf = stdin.read(bufsize)
while len(buf) != 0:
tsi.write(buf)
tsi.flush()
buf = stdin.read(bufsize)
elif isinstance(stdin, (str, bytes)):
raw = stdin.encode() if isinstance(stdin, str) else stdin
for i in range((len(raw)//bufsize) + 1):
tsi.write(raw[i*bufsize:(i + 1)*bufsize])
tsi.flush()
else:
raise ValueError('stdin not understood {0!r}'.format(stdin))
def _delay_for_pipe(self, env=None, delay=None):
# This delay is sometimes needed because the temporary stdin file that
is being written (the pipe) may not have even hit its first flush()
# call by the time the spawned process starts up and determines there
# is nothing in the file. The spawn can thus exit, without doing any
# real work. Consider the case of piping something into grep:
#
# $ ps aux | grep root
#
# grep will exit on EOF and so there is a race between the buffersize
# and flushing the temporary file and grep. However, this race is not
# always meaningful. Pagers, for example, update when the file is written
# to. So what is important is that we start the spawned process ASAP:
#
# $ ps aux | less
#
So there is a push-and-pull between the competing objectives of
# not blocking and letting the spawned process have enough to work with
# such that it doesn't exit prematurely. Unfortunately, there is no
# way to know a priori how big the file is, how long the spawned process
will run for, etc. Thus a user-definable delay lets the user
# find something that works for them.
if delay is None:
delay = (env or os.environ).get('TEEPTY_PIPE_DELAY', -1.0)
delay = float(delay)
if 0.0 < delay:
time.sleep(delay)
def _teepty_main():
tpty = TeePTY()
tpty.spawn(sys.argv[1:])
print('-=-'*10)
print(tpty.buffer.getvalue())
print('-=-'*10)
print(tpty)
print('-=-'*10)
print('Returned with status {0}'.format(tpty.wcode))

View file

@ -121,6 +121,20 @@ def decode_bytes(b):
return b.decode(encoding=enc, errors=err) return b.decode(encoding=enc, errors=err)
def findfirst(s, substrs):
"""Finds whichever of the given substrings occurs first in the given string
and returns that substring, or returns None if no such strings occur.
"""
i = len(s)
result = None
for substr in substrs:
pos = s.find(substr)
if -1 < pos < i:
i = pos
result = substr
return i, result
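Note that, despite the one-line summary, ``findfirst`` returns an ``(index, substring)`` pair, with the index equal to ``len(s)`` and the substring ``None`` when nothing matches.

    from xonsh.tools import findfirst  # available once this branch is merged

    print(findfirst('ps aux | grep root', ['|', '&&']))   # (7, '|')
    print(findfirst('plain text', ['|', '&&']))           # (10, None)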
class EnvPath(collections.MutableSequence): class EnvPath(collections.MutableSequence):
"""A class that implements an environment path, which is a list of """A class that implements an environment path, which is a list of
strings. Provides a custom method that expands all paths if the strings. Provides a custom method that expands all paths if the
@ -559,7 +573,7 @@ def command_not_found(cmd):
def suggest_commands(cmd, env, aliases): def suggest_commands(cmd, env, aliases):
"""Suggests alternative commands given an environment and aliases.""" """Suggests alternative commands given an environment and aliases."""
if not env.get('SUGGEST_COMMANDS'): if not env.get('SUGGEST_COMMANDS'):
return
return ''
thresh = env.get('SUGGEST_THRESHOLD') thresh = env.get('SUGGEST_THRESHOLD')
max_sugg = env.get('SUGGEST_MAX_NUM') max_sugg = env.get('SUGGEST_MAX_NUM')
if max_sugg < 0: if max_sugg < 0: