Merge pull request #1870 from xonsh/funcalias

Non-blocking Function Aliases
This commit is contained in:
Gil Forsyth 2016-10-25 09:32:48 -04:00 committed by GitHub
commit 524f61fcc7
3 changed files with 54 additions and 13 deletions

View file

@ -6,7 +6,8 @@
* New ``CommandPipeline`` and ``HiddenCommandPipeline`` classes manage the
execution of a pipeline of commands via the execution of the last command
in the pipeline. Instances may be iterated and stream lines from the
stdout buffer.
stdout buffer. These pipelines read from the stdout & stderr streams in a
non-blocking manner.
* ``$XONSH_STORE_STDOUT`` is now available on all platforms!
* The ``CommandsCache`` now has the ability to predict whether or not a
command must be run in the foreground using ``Popen`` or may use a

View file

@ -85,17 +85,18 @@ def _f():
aliases['f'] = _f
print(![f].returncode)
""", "42\n", 0),
# test uncaptured streaming alias
# test uncaptured streaming alias,
# order actually printed in is non-deterministic
("""
def _test_stream(args, stdin, stdout, stderr):
print('hallo on err', file=stderr)
print('hallo on out', file=stdout)
print('hallo on stream', file=stderr)
print('hallo on stream', file=stdout)
return 1
aliases['test-stream'] = _test_stream
x = ![test-stream]
print(x.returncode)
""", "hallo on out\nhallo on err\n1\n", 0),
""", "hallo on stream\nhallo on stream\n1\n", 0),
# test captured streaming alias
("""
def _test_stream(args, stdin, stdout, stderr):
@ -107,6 +108,30 @@ aliases['test-stream'] = _test_stream
x = !(test-stream)
print(x.returncode)
""", "hallo on err\n1\n", 0),
# test piping aliases
("""
def dummy(args, inn, out, err):
out.write('hey!')
return 0
def dummy2(args, inn, out, err):
s = inn.read()
out.write(s.upper())
return 0
aliases['d'] = dummy
aliases['d2'] = dummy2
d | d2
""", "HEY!", 0),
# test output larger than most pipe buffers
("""
def _g(args, stdin=None):
for i in range(1000):
print('x' * 100)
aliases['g'] = _g
g
""", (("x"*100) + '\n') * 1000, 0),
]

View file

@ -184,6 +184,11 @@ class NonBlockingFDReader:
"""Returns the file descriptor number."""
return self.fd
@staticmethod
def readable():
"""Returns true, because this object is always readable."""
return True
def populate_buffer(reader, fd, buffer, chunksize):
"""Reads bytes from the file descriptor and copies them into a buffer.
@ -1024,6 +1029,13 @@ class ProcProxyThread(threading.Thread):
env = builtins.__xonsh_env__
enc = env.get('XONSH_ENCODING')
err = env.get('XONSH_ENCODING_ERRORS')
if ON_WINDOWS:
if self.p2cread != -1:
self.p2cread = msvcrt.open_osfhandle(self.p2cread.Detach(), 0)
if self.c2pwrite != -1:
self.c2pwrite = msvcrt.open_osfhandle(self.c2pwrite.Detach(), 0)
if self.errwrite != -1:
self.errwrite = msvcrt.open_osfhandle(self.errwrite.Detach(), 0)
# get stdin
if self.stdin is None:
sp_stdin = None
@ -1032,11 +1044,6 @@ class ProcProxyThread(threading.Thread):
encoding=enc, errors=err)
else:
sp_stdin = sys.stdin
if ON_WINDOWS:
if self.c2pwrite != -1:
self.c2pwrite = msvcrt.open_osfhandle(self.c2pwrite.Detach(), 0)
if self.errwrite != -1:
self.errwrite = msvcrt.open_osfhandle(self.errwrite.Detach(), 0)
# stdout
if self.c2pwrite != -1:
sp_stdout = io.TextIOWrapper(io.open(self.c2pwrite, 'wb', -1),
@ -1066,7 +1073,9 @@ class ProcProxyThread(threading.Thread):
safe_flush(sp_stdout)
safe_flush(sp_stderr)
self.returncode = parse_proxy_return(r, sp_stdout, sp_stderr)
if not last_in_pipeline:
if not last_in_pipeline and not ON_WINDOWS:
# mac requires us *not to* close the handles here while
# windows requires us *to* close the handles here
return
# clean up
# scopz: not sure why this is needed, but stdin cannot go here
@ -1443,6 +1452,7 @@ class CommandPipeline:
"""
# get approriate handles
proc = self.proc
timeout = builtins.__xonsh_env__.get('XONSH_PROC_FREQUENCY')
# get the correct stdout
stdout = proc.stdout
if ((stdout is None or not safe_readable(stdout)) and
@ -1450,6 +1460,9 @@ class CommandPipeline:
stdout = self.spec.captured_stdout
if hasattr(stdout, 'buffer'):
stdout = stdout.buffer
if stdout is not None and \
not isinstance(stdout, (io.BytesIO, NonBlockingFDReader)):
stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout)
if not stdout or not safe_readable(stdout):
# we get here if the process is not bacgroundable or the
# class is the real Popen
@ -1466,8 +1479,10 @@ class CommandPipeline:
stderr = self.spec.captured_stderr
if hasattr(stderr, 'buffer'):
stderr = stderr.buffer
if stderr is not None and \
not isinstance(stderr, (io.BytesIO, NonBlockingFDReader)):
stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout)
# read from process while it is running
timeout = builtins.__xonsh_env__.get('XONSH_PROC_FREQUENCY')
check_prev_done = len(self.procs) == 1
while proc.poll() is None:
if getattr(proc, 'suspended', False):
@ -1652,7 +1667,7 @@ class CommandPipeline:
"""Sets the input vaiable."""
stdin = self.proc.stdin
if stdin is None or isinstance(stdin, int) or stdin.closed or \
not stdin.seekable():
not stdin.seekable() or not safe_readable(stdin):
input = b''
else:
stdin.seek(0)