From cfde90b92efc51b5a4151fff197bcd70b4bebba8 Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Sun, 16 Oct 2016 12:07:39 -0400 Subject: [PATCH 1/8] some piping aliases --- tests/test_integrations.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_integrations.py b/tests/test_integrations.py index cd2228a31..af9eda68f 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -80,6 +80,21 @@ aliases['test-stream'] = _test_stream x = !(test-stream) print(x.returncode) """, "hallo on err\n1\n", 0), +# test piping aliases +(""" +def dummy(args, inn, out, err): + out.write('hey!') + return 0 + +def dummy2(args, inn, out, err): + s = inn.read() + out.write(s.upper()) + return 0 + +aliases['d'] = dummy +aliases['d2'] = dummy2 +d | d2 +""", "HEY!", 0), ] From 7e57f1d272dde31e9dd4c7347668946d466807ea Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Tue, 18 Oct 2016 21:14:08 -0400 Subject: [PATCH 2/8] non-blocking aliases --- news/rs.rst | 3 ++- tests/test_integrations.py | 9 +++++++++ xonsh/proc.py | 11 ++++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/news/rs.rst b/news/rs.rst index ca2d8d6c9..dd183ad29 100644 --- a/news/rs.rst +++ b/news/rs.rst @@ -6,7 +6,8 @@ * New ``CommandPipeline`` and ``HiddenCommandPipeline`` classes manage the execution of a pipeline of commands via the execution of the last command in the pipeline. Instances may be iterated and stream lines from the - stdout buffer. + stdout buffer. These pipelines read from the stdout & stderr streams in a + non-blocking manner. * ``$XONSH_STORE_STDOUT`` is now available on all platforms! * The ``CommandsCache`` now has the ability to predict whether or not a command must be run in the foreground using ``Popen`` or may use a diff --git a/tests/test_integrations.py b/tests/test_integrations.py index bf6a82186..031e83438 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -122,6 +122,15 @@ aliases['d'] = dummy aliases['d2'] = dummy2 d | d2 """, "HEY!", 0), +# test output larger than most pipe buffers +(""" +def _g(args, stdin=None): + for i in range(1000): + print('x' * 100) + +aliases['g'] = _g +g +""", (("x"*100) + '\n') * 1000, 0), ] diff --git a/xonsh/proc.py b/xonsh/proc.py index a8cfe6115..49e2053bb 100644 --- a/xonsh/proc.py +++ b/xonsh/proc.py @@ -184,6 +184,11 @@ class NonBlockingFDReader: """Returns the file descriptor number.""" return self.fd + @staticmethod + def readable(): + """Returns true, because this object is always readable.""" + return True + def populate_buffer(reader, fd, buffer, chunksize): """Reads bytes from the file descriptor and copies them into a buffer. @@ -1443,6 +1448,7 @@ class CommandPipeline: """ # get approriate handles proc = self.proc + timeout = builtins.__xonsh_env__.get('XONSH_PROC_FREQUENCY') # get the correct stdout stdout = proc.stdout if ((stdout is None or not safe_readable(stdout)) and @@ -1450,6 +1456,8 @@ class CommandPipeline: stdout = self.spec.captured_stdout if hasattr(stdout, 'buffer'): stdout = stdout.buffer + if not isinstance(stdout, (io.BytesIO, NonBlockingFDReader)): + stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout) if not stdout or not safe_readable(stdout): # we get here if the process is not bacgroundable or the # class is the real Popen @@ -1466,8 +1474,9 @@ class CommandPipeline: stderr = self.spec.captured_stderr if hasattr(stderr, 'buffer'): stderr = stderr.buffer + if not isinstance(stderr, (io.BytesIO, NonBlockingFDReader)): + stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout) # read from process while it is running - timeout = builtins.__xonsh_env__.get('XONSH_PROC_FREQUENCY') while proc.poll() is None: if getattr(proc, 'suspended', False): return From 5cd5e18a641dc86d77cd48f7d576bbd304703fca Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Wed, 19 Oct 2016 14:45:46 -0400 Subject: [PATCH 3/8] None check --- xonsh/proc.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/xonsh/proc.py b/xonsh/proc.py index 88867c49e..499584d5c 100644 --- a/xonsh/proc.py +++ b/xonsh/proc.py @@ -1456,7 +1456,8 @@ class CommandPipeline: stdout = self.spec.captured_stdout if hasattr(stdout, 'buffer'): stdout = stdout.buffer - if not isinstance(stdout, (io.BytesIO, NonBlockingFDReader)): + if stdout is not None and \ + not isinstance(stdout, (io.BytesIO, NonBlockingFDReader)): stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout) if not stdout or not safe_readable(stdout): # we get here if the process is not bacgroundable or the @@ -1474,7 +1475,8 @@ class CommandPipeline: stderr = self.spec.captured_stderr if hasattr(stderr, 'buffer'): stderr = stderr.buffer - if not isinstance(stderr, (io.BytesIO, NonBlockingFDReader)): + if stderr is not None and \ + not isinstance(stderr, (io.BytesIO, NonBlockingFDReader)): stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout) # read from process while it is running check_prev_done = len(self.procs) == 1 From 358c79cc86ead99314c7f053233ad12d9fb935eb Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Thu, 20 Oct 2016 01:37:16 -0400 Subject: [PATCH 4/8] fix flake8 --- xonsh/proc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xonsh/proc.py b/xonsh/proc.py index 499584d5c..4e8390280 100644 --- a/xonsh/proc.py +++ b/xonsh/proc.py @@ -1457,7 +1457,7 @@ class CommandPipeline: if hasattr(stdout, 'buffer'): stdout = stdout.buffer if stdout is not None and \ - not isinstance(stdout, (io.BytesIO, NonBlockingFDReader)): + not isinstance(stdout, (io.BytesIO, NonBlockingFDReader)): stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout) if not stdout or not safe_readable(stdout): # we get here if the process is not bacgroundable or the @@ -1476,7 +1476,7 @@ class CommandPipeline: if hasattr(stderr, 'buffer'): stderr = stderr.buffer if stderr is not None and \ - not isinstance(stderr, (io.BytesIO, NonBlockingFDReader)): + not isinstance(stderr, (io.BytesIO, NonBlockingFDReader)): stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout) # read from process while it is running check_prev_done = len(self.procs) == 1 From 89369af9a71a4d77dd8b3e457b80203860302397 Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Mon, 24 Oct 2016 22:15:12 -0400 Subject: [PATCH 5/8] proc stuff --- xonsh/proc.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/xonsh/proc.py b/xonsh/proc.py index 4e8390280..6c1ddca8c 100644 --- a/xonsh/proc.py +++ b/xonsh/proc.py @@ -1029,6 +1029,13 @@ class ProcProxyThread(threading.Thread): env = builtins.__xonsh_env__ enc = env.get('XONSH_ENCODING') err = env.get('XONSH_ENCODING_ERRORS') + if ON_WINDOWS: + if self.p2cread != -1: + self.p2cread = msvcrt.open_osfhandle(self.p2cread.Detach(), 0) + if self.c2pwrite != -1: + self.c2pwrite = msvcrt.open_osfhandle(self.c2pwrite.Detach(), 0) + if self.errwrite != -1: + self.errwrite = msvcrt.open_osfhandle(self.errwrite.Detach(), 0) # get stdin if self.stdin is None: sp_stdin = None @@ -1037,11 +1044,6 @@ class ProcProxyThread(threading.Thread): encoding=enc, errors=err) else: sp_stdin = sys.stdin - if ON_WINDOWS: - if self.c2pwrite != -1: - self.c2pwrite = msvcrt.open_osfhandle(self.c2pwrite.Detach(), 0) - if self.errwrite != -1: - self.errwrite = msvcrt.open_osfhandle(self.errwrite.Detach(), 0) # stdout if self.c2pwrite != -1: sp_stdout = io.TextIOWrapper(io.open(self.c2pwrite, 'wb', -1), @@ -1071,8 +1073,6 @@ class ProcProxyThread(threading.Thread): safe_flush(sp_stdout) safe_flush(sp_stderr) self.returncode = parse_proxy_return(r, sp_stdout, sp_stderr) - if not last_in_pipeline: - return # clean up # scopz: not sure why this is needed, but stdin cannot go here # and stdout & stderr must. @@ -1496,6 +1496,7 @@ class CommandPipeline: self._close_prev_procs() proc.prevs_are_closed = True break + #print(stdout) stdout_lines = safe_readlines(stdout, 1024) yield from stdout_lines stderr_lines = safe_readlines(stderr, 1024) @@ -1663,7 +1664,7 @@ class CommandPipeline: """Sets the input vaiable.""" stdin = self.proc.stdin if stdin is None or isinstance(stdin, int) or stdin.closed or \ - not stdin.seekable(): + not stdin.seekable() or not safe_readable(stdin): input = b'' else: stdin.seek(0) From 86f554f487aac9d471235b1a6f371303560e3fb2 Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Mon, 24 Oct 2016 22:16:11 -0400 Subject: [PATCH 6/8] removed comment --- xonsh/proc.py | 1 - 1 file changed, 1 deletion(-) diff --git a/xonsh/proc.py b/xonsh/proc.py index 6c1ddca8c..ef2067e92 100644 --- a/xonsh/proc.py +++ b/xonsh/proc.py @@ -1496,7 +1496,6 @@ class CommandPipeline: self._close_prev_procs() proc.prevs_are_closed = True break - #print(stdout) stdout_lines = safe_readlines(stdout, 1024) yield from stdout_lines stderr_lines = safe_readlines(stderr, 1024) From e41ef2fa6a49d0a332cf7642ae9e97d671d05074 Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Mon, 24 Oct 2016 22:41:01 -0400 Subject: [PATCH 7/8] hopefully fix test erros --- tests/test_integrations.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/test_integrations.py b/tests/test_integrations.py index 031e83438..824e65c5b 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -85,17 +85,18 @@ def _f(): aliases['f'] = _f print(![f].returncode) """, "42\n", 0), -# test uncaptured streaming alias +# test uncaptured streaming alias, +# order actually printed in is non-deterministic (""" def _test_stream(args, stdin, stdout, stderr): - print('hallo on err', file=stderr) - print('hallo on out', file=stdout) + print('hallo on stream', file=stderr) + print('hallo on stream', file=stdout) return 1 aliases['test-stream'] = _test_stream x = ![test-stream] print(x.returncode) -""", "hallo on out\nhallo on err\n1\n", 0), +""", "hallo on stream\nhallo on stream\n1\n", 0), # test captured streaming alias (""" def _test_stream(args, stdin, stdout, stderr): From be91e65daab6941fc12447fa83b74035e1dd76ff Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Mon, 24 Oct 2016 23:11:18 -0400 Subject: [PATCH 8/8] update for mac windows --- xonsh/proc.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/xonsh/proc.py b/xonsh/proc.py index ef2067e92..b57486f93 100644 --- a/xonsh/proc.py +++ b/xonsh/proc.py @@ -1073,6 +1073,10 @@ class ProcProxyThread(threading.Thread): safe_flush(sp_stdout) safe_flush(sp_stderr) self.returncode = parse_proxy_return(r, sp_stdout, sp_stderr) + if not last_in_pipeline and not ON_WINDOWS: + # mac requires us *not to* close the handles here while + # windows requires us *to* close the handles here + return # clean up # scopz: not sure why this is needed, but stdin cannot go here # and stdout & stderr must.