Merge pull request #1938 from xonsh/sleepless

Sleepless
Morten Enemark Lund 2016-11-08 06:44:29 +01:00 committed by GitHub
commit 3ce27a17fa
4 changed files with 87 additions and 51 deletions


@@ -48,6 +48,8 @@
``while True: sleep 1``.
* Fix for stdin redirects.
* Backgrounding works with ``$XONSH_STORE_STDOUT``.
* ``PopenThread`` blocks its thread from finishing until command has completed
or process is suspended.
* Added a minimum buffer time for command pipelines to check whether
  previous commands have executed successfully. This is helpful
  for pipelines where the last command takes a long time to start up,


@@ -426,7 +426,8 @@ class SubprocSpec:
self.captured_stderr = None
def __str__(self):
s = self.cls.__name__ + '(' + str(self.cmd) + ', '
s = self.__class__.__name__ + '(' + str(self.cmd) + ', '
s += self.cls.__name__ + ', '
kws = [n + '=' + str(getattr(self, n)) for n in self.kwnames]
s += ', '.join(kws) + ')'
return s
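To see what the revised ``__str__`` is aiming at: it now names the spec class itself and then the Popen-like class it will launch with. A minimal, self-contained sketch (``FakeSpec`` and its attributes are made up for illustration, not xonsh's real ``SubprocSpec``):

import subprocess

class FakeSpec:
    """Illustrative stand-in for a subprocess spec."""
    kwnames = ('captured_stdout', 'captured_stderr')

    def __init__(self, cmd, cls):
        self.cmd = cmd                  # argv list, e.g. ['ls', '-l']
        self.cls = cls                  # class that will actually run the command
        self.captured_stdout = None
        self.captured_stderr = None

    def __str__(self):
        # spec class name first, then the command ...
        s = self.__class__.__name__ + '(' + str(self.cmd) + ', '
        # ... then the launcher class, then the keyword attributes
        s += self.cls.__name__ + ', '
        kws = [n + '=' + str(getattr(self, n)) for n in self.kwnames]
        s += ', '.join(kws) + ')'
        return s

print(FakeSpec(['ls', '-l'], subprocess.Popen))
# -> FakeSpec(['ls', '-l'], Popen, captured_stdout=None, captured_stderr=None)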


@@ -115,7 +115,10 @@ class HistoryGC(threading.Thread):
file) tuples.
"""
# pylint: disable=no-member
xdd = builtins.__xonsh_env__.get('XONSH_DATA_DIR')
env = getattr(builtins, '__xonsh_env__', None)
if env is None:
return []
xdd = env.get('XONSH_DATA_DIR')
xdd = expanduser_abs_path(xdd)
fs = [f for f in glob.iglob(os.path.join(xdd, 'xonsh-*.json'))]
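The new lookup avoids an AttributeError when ``builtins.__xonsh_env__`` has not been installed (e.g. in a bare test process or during interpreter teardown) and simply reports no history files. A small sketch of the same defensive pattern, with a hypothetical helper name:

import builtins
import glob
import os

def history_files(pattern='xonsh-*.json'):
    """Hypothetical helper: list history files, or nothing if xonsh is not set up."""
    env = getattr(builtins, '__xonsh_env__', None)
    if env is None:
        return []                       # no environment -> behave as "no history"
    xdd = os.path.abspath(os.path.expanduser(env.get('XONSH_DATA_DIR')))
    return list(glob.iglob(os.path.join(xdd, pattern)))

# with no __xonsh_env__ installed this is simply empty, not an exception
print(history_files())                  # -> []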


@@ -82,24 +82,29 @@ class QueueReader:
"""
self.fd = fd
self.timeout = timeout
self.sleepscale = 0
self.closed = False
self.queue = queue.Queue()
self.thread = None
def close(self):
"""close the reader"""
self.closed = True
def read_queue(self, timeout=None):
"""Reads a single chunk from the queue. This is non-blocking."""
timeout = timeout or self.timeout
def is_fully_read(self):
"""Returns whether or not the queue is fully read and the reader is
closed.
"""
return (self.closed
and (self.thread is None or not self.thread.is_alive())
and self.queue.empty())
def read_queue(self):
"""Reads a single chunk from the queue. This is blocking if
the timeout is None and non-blocking otherwise.
"""
try:
self.sleepscale = 0
return self.queue.get(block=timeout is not None,
timeout=timeout)
return self.queue.get(block=True, timeout=self.timeout)
except queue.Empty:
self.sleepscale = min(self.sleepscale + 1, 3)
time.sleep(timeout * 10**self.sleepscale)
return b''
def read(self, size=-1):
@@ -131,14 +136,26 @@ class QueueReader:
i += len(line)
return buf
def _read_all_lines(self):
"""This reads all remaining lines in a blocking fashion."""
lines = []
while not self.is_fully_read():
chunk = self.read_queue()
lines.extend(chunk.splitlines(keepends=True))
return lines
def readlines(self, hint=-1):
"""Reads lines from the file descriptor."""
"""Reads lines from the file descriptor. This is blocking for negative
hints (i.e. read all the remaining lines) and non-blocking otherwise.
"""
if hint == -1:
return self._read_all_lines()
lines = []
while len(lines) != hint:
line = self.readline(size=-1)
if not line:
chunk = self.read_queue()
if not chunk:
break
lines.append(line)
lines.extend(chunk.splitlines(keepends=True))
return lines
def fileno(self):
@@ -150,6 +167,14 @@ class QueueReader:
"""Returns true, because this object is always readable."""
return True
def iterqueue(self):
"""Iterates through all remaining chunks in a blocking fashion."""
while not self.is_fully_read():
chunk = self.read_queue()
if not chunk:
continue
yield chunk
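For orientation, here is how a reader with this interface gets drained: ``is_fully_read()`` only turns true once ``close()`` has been called, the feeder thread has exited, and the queue is empty, so a ``while not is_fully_read(): read_queue()`` loop (the shape of ``_read_all_lines`` and ``iterqueue``) consumes everything without busy-waiting. A runnable stand-in (``DummyReader`` and ``feed`` are hypothetical, not xonsh classes):

import queue
import threading
import time

class DummyReader:
    """Minimal stand-in with the same read protocol (hypothetical)."""
    def __init__(self, timeout=1e-2):
        self.timeout = timeout
        self.closed = False
        self.queue = queue.Queue()
        self.thread = None

    def close(self):
        self.closed = True

    def is_fully_read(self):
        # fully read == closed, feeder thread finished, queue drained
        return (self.closed
                and (self.thread is None or not self.thread.is_alive())
                and self.queue.empty())

    def read_queue(self):
        # blocks for at most self.timeout, returns b'' on timeout
        try:
            return self.queue.get(block=True, timeout=self.timeout)
        except queue.Empty:
            return b''

def feed(reader):
    for chunk in (b'hello\n', b'world\n'):
        reader.queue.put(chunk)
        time.sleep(0.01)
    reader.close()

r = DummyReader()
r.thread = threading.Thread(target=feed, args=(r,))
r.thread.start()
data = b''
while not r.is_fully_read():      # same loop shape as _read_all_lines / iterqueue
    data += r.read_queue()
print(data.decode())              # -> hello / world, in order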
def populate_fd_queue(reader, fd, queue):
"""Reads 1 kb of data from a file descriptor into a queue.
@@ -242,6 +267,8 @@ class BufferedFDParallelReader:
def _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd):
# if we are getting close to the end of the console buffer,
# expand it so that we can read from it successfully.
if cols == 0:
return orig_posize[-1], max_offset, orig_posize
rows = ((max_offset + expandsize)//cols) + 1
winutils.set_console_screen_buffer_size(cols, rows, fd=fd)
orig_posize = orig_posize[:3] + (rows,)
@@ -251,9 +278,10 @@ def _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd):
def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None):
"""Reads bytes from the file descriptor and puts lines into the queue.
The reads happen in parallel, using xonsh.winutils.read_console_output_character(),
and are thus only available on Windows. If the read fails for any reason, the reader is
flagged as closed.
The reads happen in parallel,
using xonsh.winutils.read_console_output_character(),
and are thus only available on Windows. If the read fails for any reason,
the reader is flagged as closed.
"""
# OK, so this function is super annoying because Windows stores its
# buffers as a 2D regular, dense array -- without trailing newlines.
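To make the 2D bookkeeping concrete: the console buffer is addressed by (x, y) character cells, a cursor position maps to a flat offset as ``offset = cols * y + x``, and when that offset plus ``expandsize`` approaches the end of the buffer the row count is grown, which is what ``_expand_console_buffer`` above does. A tiny arithmetic-only sketch (no winutils calls):

def to_offset(x, y, cols):
    """Flatten an (x, y) console coordinate into a linear character offset."""
    return cols * y + x

def to_xy(offset, cols):
    """Inverse mapping: linear offset back to (x, y)."""
    return offset % cols, offset // cols

cols = 80
assert to_offset(5, 3, cols) == 245     # 80*3 + 5
assert to_xy(245, cols) == (5, 3)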
@@ -316,7 +344,7 @@ def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None):
if reader.closed:
break
else:
time.sleep(reader.timeout * 10**reader.sleepscale)
time.sleep(reader.timeout)
continue
elif max_offset <= offset + expandsize:
ecb = _expand_console_buffer(cols, max_offset, expandsize,
@@ -348,7 +376,7 @@ def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None):
buf = buf.rstrip()
nread = len(buf)
if nread == 0:
time.sleep(reader.timeout * 10**reader.sleepscale)
time.sleep(reader.timeout)
continue
cur_x, cur_y = posize[0], posize[1]
cur_offset = (cols*cur_y) + cur_x
@@ -364,7 +392,7 @@ def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None):
for l in range(yshift)]
lines = [line for line in lines if line]
if not lines:
time.sleep(reader.timeout * 10**reader.sleepscale)
time.sleep(reader.timeout)
continue
# put lines in the queue
nl = b'\n'
@@ -469,9 +497,8 @@ class PopenThread(threading.Thread):
def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs):
super().__init__()
self.lock = threading.RLock()
self.stdout_fd = 1 if stdout is None else stdout.fileno()
self._set_pty_size()
env = builtins.__xonsh_env__
# stdin setup
self.orig_stdin = stdin
if stdin is None:
self.stdin_fd = 0
@@ -482,6 +509,12 @@ class PopenThread(threading.Thread):
self.store_stdin = env.get('XONSH_STORE_STDIN')
self.in_alt_mode = False
self.stdin_mode = None
# stdout setup
self.orig_stdout = stdout
self.stdout_fd = 1 if stdout is None else stdout.fileno()
self._set_pty_size()
# stderr setup
self.orig_stderr = stderr
# Set some signal handlers, if we can. Must come before process
# is started to prevent deadlock on windows
self.proc = None # has to be here for closure for handles
@@ -561,33 +594,35 @@ class PopenThread(threading.Thread):
# loop over reads while process is running.
cnt = 1
while proc.poll() is None:
i = self._read_write(procout, stdout, sys.__stdout__)
j = self._read_write(procerr, stderr, sys.__stderr__)
if self.suspended:
break
elif self.in_alt_mode:
# this is here for CPU performance reasons.
if i + j == 0:
cnt = min(cnt + 1, 1000)
else:
cnt = 1
time.sleep(self.timeout * cnt)
elif self.prevs_are_closed:
break
else:
time.sleep(self.timeout)
# final closing read.
cntout = cnterr = 0
while cntout < 10 and cnterr < 10:
i = self._read_write(procout, stdout, sys.__stdout__)
j = self._read_write(procerr, stderr, sys.__stderr__)
cntout = 0 if i > 0 else cntout + 1
cnterr = 0 if j > 0 else cnterr + 1
time.sleep(self.timeout * (10 - cntout))
procout.timeout = procerr.timeout = self.timeout * cnt
if self.suspended:
return
# close files to send EOF to non-blocking reader.
# capout & caperr seem to be needed only by Windows, while
# orig_stdout & orig_stderr are needed by POSIX and Windows.
# Probably best to close them all. Also, order seems to matter here,
# with orig_* needing to be closed before cap*
safe_fdclose(self.orig_stdout)
safe_fdclose(self.orig_stderr)
safe_fdclose(capout)
safe_fdclose(caperr)
# read in the remaining data in a blocking fashion.
while (procout is not None and not procout.is_fully_read()) or \
(procerr is not None and not procerr.is_fully_read()):
self._read_write(procout, stdout, sys.__stdout__)
self._read_write(procerr, stderr, sys.__stderr__)
# kill the process if it is still alive. Happens when piping.
time.sleep(self.timeout)
if proc.poll() is None and not self.suspended:
time.sleep(self.timeout)
if proc.poll() is None:
proc.terminate()
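The structure of the new ``run`` tail is: poll and forward output while the child is alive (with a simple linear backoff, ``cnt``, capped in alt mode), close the writer ends so the readers see EOF, drain the readers in a blocking fashion until ``is_fully_read()``, and only then terminate a child that is somehow still running. A POSIX-only sketch of that shape using ``selectors`` instead of xonsh's background reader threads (``run_and_drain`` is hypothetical):

import selectors
import subprocess
import sys
import time

def run_and_drain(cmd, timeout=1e-2):
    """Hypothetical sketch: forward a child's stdout while it runs,
    then drain the rest, then terminate if it is still alive."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    sel = selectors.DefaultSelector()
    sel.register(proc.stdout, selectors.EVENT_READ)
    # 1. poll loop: forward whatever is ready while the process is alive
    while proc.poll() is None:
        if sel.select(timeout):
            chunk = proc.stdout.read1(1024)
            if chunk:
                sys.stdout.buffer.write(chunk)
        else:
            time.sleep(timeout)          # nothing ready; plain sleep, no backoff
    sel.close()
    # 2. final blocking drain: safe now because the child has exited,
    #    so EOF is guaranteed to arrive
    sys.stdout.buffer.write(proc.stdout.read())
    sys.stdout.buffer.flush()
    # 3. safety net, mirroring the diff: terminate if somehow still running
    if proc.poll() is None:
        proc.terminate()
    return proc.wait()

if __name__ == '__main__':
    run_and_drain(['echo', 'hello'])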
def _wait_and_getattr(self, name):
@@ -692,7 +727,6 @@ class PopenThread(threading.Thread):
def _signal_int(self, signum, frame):
"""Signal handler for SIGINT - Ctrl+C may have been pressed."""
self.send_signal(signum)
time.sleep(self.timeout)
if self.proc.poll() is not None:
self._restore_sigint(frame=frame)
@@ -794,9 +828,7 @@ class PopenThread(threading.Thread):
"""
self._disable_cbreak_stdin()
rtn = self.proc.wait(timeout=timeout)
while self.is_alive():
self.join(timeout=1e-7)
time.sleep(1e-7)
self.join()
# need to replace the old signal handlers somewhere...
if self.old_winch_handler is not None and on_main_thread():
signal.signal(signal.SIGWINCH, self.old_winch_handler)
@@ -1267,9 +1299,7 @@ class ProcProxyThread(threading.Thread):
def wait(self, timeout=None):
"""Waits for the process to finish and returns the return code."""
while self.is_alive():
self.join(timeout=1e-7)
time.sleep(1e-7)
self.join()
return self.returncode
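Both ``wait`` methods drop the spin loop (``join(timeout=1e-7)`` plus ``sleep``) in favor of a single blocking ``join()``, which is the idiomatic way to wait for a thread. A minimal stand-alone illustration (``Worker`` is hypothetical, not a xonsh class):

import threading

class Worker(threading.Thread):
    """Hypothetical thread that records a return code when its work finishes."""
    def __init__(self, func):
        super().__init__()
        self.func = func
        self.returncode = None

    def run(self):
        self.returncode = self.func()

    def wait(self, timeout=None):
        # join() already blocks until run() has returned; no polling needed
        self.join(timeout=timeout)
        return self.returncode

w = Worker(lambda: 0)
w.start()
print(w.wait())     # -> 0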
# The code below (_get_devnull, _get_handles, and _make_inheritable) comes
@@ -1558,6 +1588,8 @@ class CommandPipeline:
"stderr_redirect", "timestamps", "executed_cmd", 'input',
'output', 'errors')
nonblocking = (io.BytesIO, NonBlockingFDReader, ConsoleParallelReader)
def __init__(self, specs, procs, starttime=None, captured=False):
"""
Parameters
@@ -1635,8 +1667,7 @@ class CommandPipeline:
stdout = spec.captured_stdout
if hasattr(stdout, 'buffer'):
stdout = stdout.buffer
if stdout is not None and \
not isinstance(stdout, (io.BytesIO, NonBlockingFDReader, ConsoleParallelReader)):
if stdout is not None and not isinstance(stdout, self.nonblocking):
stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout)
if not stdout or not safe_readable(stdout):
# we get here if the process is not threadable or the
@@ -1654,8 +1685,7 @@ class CommandPipeline:
stderr = spec.captured_stderr
if hasattr(stderr, 'buffer'):
stderr = stderr.buffer
if stderr is not None and \
not isinstance(stderr, (io.BytesIO, NonBlockingFDReader, ConsoleParallelReader)):
if stderr is not None and not isinstance(stderr, self.nonblocking):
stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout)
# read from process while it is running
check_prev_done = len(self.procs) == 1
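The ``nonblocking`` class attribute folds the tuple that was previously written out twice into one place, so both isinstance checks stay short and in sync. A compact sketch of the pattern with made-up reader types:

import io

class FakeFDReader:        # stand-in for NonBlockingFDReader / ConsoleParallelReader
    pass

class Pipeline:
    # single source of truth for "already non-blocking" stream types
    nonblocking = (io.BytesIO, FakeFDReader)

    def wrap(self, stream):
        """Wrap a raw stream unless it is already one of the non-blocking types."""
        if stream is not None and not isinstance(stream, self.nonblocking):
            stream = io.BytesIO(stream.read())   # illustrative wrapping only
        return stream

p = Pipeline()
already = io.BytesIO(b'data')
assert p.wrap(already) is already        # left untouched: it is already non-blocking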