got some of it

This commit is contained in:
Anthony Scopatz 2016-09-09 00:08:13 -04:00
parent 71445888a9
commit c5285a796a
7 changed files with 68 additions and 22 deletions

View file

@ -10,6 +10,8 @@ from xonsh.execer import Execer
from xonsh.tools import XonshBlockError
from xonsh.events import events
from xonsh.platform import ON_WINDOWS
from xonsh.commands_cache import CommandsCache
from tools import DummyShell, sp, DummyCommandsCache, DummyEnv
@ -40,6 +42,7 @@ def xonsh_builtins():
builtins.__xonsh_stderr_uncaptured__ = None
builtins.__xonsh_ensure_list_of_strs__ = ensure_list_of_strs
builtins.__xonsh_commands_cache__ = DummyCommandsCache()
builtins.__xonsh_all_jobs__ = {}
builtins.XonshBlockError = XonshBlockError
builtins.__xonsh_subproc_captured_hiddenobject__ = sp
builtins.evalx = eval
@ -65,6 +68,7 @@ def xonsh_builtins():
del builtins.__xonsh_subproc_uncaptured__
del builtins.__xonsh_ensure_list_of_strs__
del builtins.__xonsh_commands_cache__
del builtins.__xonsh_all_jobs__
del builtins.XonshBlockError
del builtins.evalx
del builtins.execx
@ -72,6 +76,7 @@ def xonsh_builtins():
del builtins.aliases
del builtins.events
pytest_plugins = ['xonsh', ]
if ON_WINDOWS:
try:

View file

@ -1,22 +1,29 @@
import sys
import os
import sys
import pytest
from xonsh.built_ins import run_subproc
@pytest.yield_fixture(autouse=True, scope='module')
def chdir_to_test_dir():
old_cwd = os.getcwd()
os.chdir(os.path.dirname(__file__))
new_cwd = os.path.dirname(__file__)
os.chdir(new_cwd)
yield
os.chdir(old_cwd)
def test_runsubproc_simple(capfd, xonsh_builtins, xonsh_execer):
run_subproc([['pwd']])
out, err = capfd.readouterr()
assert out.rstrip() == os.getcwd()
#def test_runsubproc_simple(capfd, xonsh_builtins, xonsh_execer):
def test_runsubproc_simple(xonsh_builtins, xonsh_execer):
new_cwd = os.path.dirname(__file__)
xonsh_builtins.__xonsh_env__['PATH'] = os.path.join(new_cwd, 'bin') + \
os.pathsep + os.path.dirname(sys.executable)
out = run_subproc([['pwd']], captured='stdout')
#run_subproc([['pwd']])
#out, err = capfd.readouterr()
assert out.rstrip() == new_cwd
def test_runsubproc_pipe(capfd, xonsh_builtins, xonsh_execer):
run_subproc([['ls'], '|', ['grep', '-i', 'sample']])

View file

@ -1,16 +1,14 @@
def test_simple():
assert 1 + 1 == 2
def test_envionment():
$USER = 'snail'
x = 'USER'
assert x in ${...}
assert ${'U' + 'SER'} == 'snail'
def test_xonsh_party():
x = 'xonsh'
y = 'party'

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh lexer."""
from __future__ import unicode_literals, print_function
import os
import sys
import ast
import builtins
@ -66,8 +67,8 @@ class DummyShell:
class DummyCommandsCache():
def locate_binary(self, *args, **kwargs):
pass
def locate_binary(self, name):
return os.path.join(os.path.dirname(__file__), 'bin', name)
class DummyEnv(MutableMapping):

View file

@ -660,9 +660,12 @@ def _update_last_spec(last, captured=False):
last.universal_newlines = True
elif captured in stdout_capture_kinds:
last.universal_newlines = False
r, w = os.pipe()
last.stdout = safe_open(w, 'wb')
last.captured_stdout = safe_open(r, 'rb')
if callable(last.alias):
r, w = os.pipe()
last.stdout = safe_open(w, 'wb')
last.captured_stdout = safe_open(r, 'rb')
else:
last.stdout = subprocess.PIPE
elif builtins.__xonsh_stdout_uncaptured__ is not None:
last.universal_newlines = True
last.stdout = builtins.__xonsh_stdout_uncaptured__
@ -767,8 +770,9 @@ def run_subproc(cmds, captured=False):
command = Command(specs, procs, starttime=starttime)
# now figure out what we should return.
if captured == 'stdout':
print(1)
command.end()
#raise Exception
print(2)
return command.output
elif captured == 'object' or captured == 'hiddenobject':
return command

View file

@ -619,14 +619,24 @@ class Command:
def __iter__(self):
proc = self.proc
stdout = proc.stdout
if not stdout.readable() and self.spec.captured_stdout is not None:
if ((stdout is None or not stdout.readable()) and
self.spec.captured_stdout is not None):
stdout = self.spec.captured_stdout
print(stdout)
if not stdout or not stdout.readable():
print('not readable')
raise StopIteration()
while proc.poll() is None:
print('reading lines', proc)
#while proc.poll() is None:
while proc.returncode is None:
print('poll', proc.returncode)
yield from stdout.readlines(1024)
print('poll', proc.returncode)
print('ending time')
self._endtime()
print('finish reading lines')
yield from stdout.readlines()
print('read stdout')
#self.end()
def itercheck(self):
@ -642,10 +652,15 @@ class Command:
yields each line.
"""
self.output = '' if self.spec.universal_newlines else b''
print(self.output)
for line in self:
print(line)
sys.stdout.write(line)
print('wrote to stdout')
self.output += line
print('added line to output')
yield line
print('yielded line')
def _decode_uninew(self, b):
"""Decode bytes into a str and apply universal newlines as needed."""
@ -666,18 +681,30 @@ class Command:
"""Waits for the command to complete and then runs any closing and
cleanup procedures that need to be run.
"""
print(self.ended)
if self.ended:
return
print(10)
self.proc.wait()
print(20)
self._endtime()
print(30)
self._close_procs()
print(40)
self._set_input()
print(50)
self._set_output()
print(60)
self._set_errors()
print(70)
self._check_signal()
print(80)
self._apply_to_history()
print(90)
self._raise_subproc_error()
print(100)
self.ended = True
print(110)
def _endtime(self):
"""Sets the closing timestamp if it hasn't been already."""
@ -703,10 +730,14 @@ class Command:
def _set_output(self):
"""Sets the output vaiable."""
output = b''
#output = b''
print('prepping to tee')
for line in self.tee_stdout():
output += line
self.output = self._decode_uninew(output)
pass
print('teed')
# output += line
self.output = self._decode_uninew(self.output)
print('converted')
def _set_errors(self):
"""Sets the errors vaiable."""

View file

@ -559,7 +559,7 @@ def command_not_found(cmd):
def suggest_commands(cmd, env, aliases):
"""Suggests alternative commands given an environment and aliases."""
if not env.get('SUGGEST_COMMANDS'):
return
return ''
thresh = env.get('SUGGEST_THRESHOLD')
max_sugg = env.get('SUGGEST_MAX_NUM')
if max_sugg < 0: