diff --git a/news/procs.rst b/news/procs.rst new file mode 100644 index 000000000..1b7bed32c --- /dev/null +++ b/news/procs.rst @@ -0,0 +1,24 @@ +**Added:** + +* New ``xonsh.procs`` subpackage for handling subprocess mode. + +**Changed:** + +* + +**Deprecated:** + +* + +**Removed:** + +* The deprecated ``foreground`` decorator has been removed. + Please use ``unthreadable`` instead. + +**Fixed:** + +* + +**Security:** + +* diff --git a/xonsh/procs/posix.py b/xonsh/procs/posix.py new file mode 100644 index 000000000..b125073c3 --- /dev/null +++ b/xonsh/procs/posix.py @@ -0,0 +1,488 @@ +"""Interface for running subprocess-mode commands on posix systems.""" +import os +import io +import time +import array +import signal +import builtins +import threading +import subprocess + +import xonsh.lazyasd as xl +import xonsh.platform as xp +import xonsh.tools as xt + +from xonsh.procs.readers import BufferedFDParallelReader, NonBlockingFDReader, safe_fdclose + + +# The following escape codes are xterm codes. +# See http://rtfm.etla.org/xterm/ctlseq.html for more. +MODE_NUMS = ("1049", "47", "1047") + + +@xl.lazyobject +def START_ALTERNATE_MODE(): + return frozenset("\x1b[?{0}h".format(i).encode() for i in MODE_NUMS) + + +@xl.lazyobject +def END_ALTERNATE_MODE(): + return frozenset("\x1b[?{0}l".format(i).encode() for i in MODE_NUMS) + + +@xl.lazyobject +def ALTERNATE_MODE_FLAGS(): + return tuple(START_ALTERNATE_MODE) + tuple(END_ALTERNATE_MODE) + + +class PopenThread(threading.Thread): + """A thread for running and managing subprocess. This allows reading + from the stdin, stdout, and stderr streams in a non-blocking fashion. + + This takes the same arguments and keyword arguments as regular Popen. + This requires that the captured_stdout and captured_stderr attributes + to be set following instantiation. + """ + + def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs): + super().__init__() + self.lock = threading.RLock() + env = builtins.__xonsh__.env + # stdin setup + self.orig_stdin = stdin + if stdin is None: + self.stdin_fd = 0 + elif isinstance(stdin, int): + self.stdin_fd = stdin + else: + self.stdin_fd = stdin.fileno() + self.store_stdin = env.get("XONSH_STORE_STDIN") + self.timeout = env.get("XONSH_PROC_FREQUENCY") + self.in_alt_mode = False + self.stdin_mode = None + self._tc_cc_vsusp = b"\x1a" # default is usually ^Z + self._disable_suspend_keybind() + # stdout setup + self.orig_stdout = stdout + self.stdout_fd = 1 if stdout is None else stdout.fileno() + self._set_pty_size() + # stderr setup + self.orig_stderr = stderr + # Set some signal handles, if we can. Must come before process + # is started to prevent deadlock on windows + self.proc = None # has to be here for closure for handles + self.old_int_handler = self.old_winch_handler = None + self.old_tstp_handler = self.old_quit_handler = None + if xt.on_main_thread(): + self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int) + if xp.ON_POSIX: + self.old_tstp_handler = signal.signal(signal.SIGTSTP, self._signal_tstp) + self.old_quit_handler = signal.signal(signal.SIGQUIT, self._signal_quit) + if xp.CAN_RESIZE_WINDOW: + self.old_winch_handler = signal.signal( + signal.SIGWINCH, self._signal_winch + ) + # start up process + if xp.ON_WINDOWS and stdout is not None: + os.set_inheritable(stdout.fileno(), False) + + try: + self.proc = proc = subprocess.Popen( + *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwargs + ) + except Exception: + self._clean_up() + raise + + self.pid = proc.pid + self.universal_newlines = uninew = proc.universal_newlines + if uninew: + self.encoding = enc = env.get("XONSH_ENCODING") + self.encoding_errors = err = env.get("XONSH_ENCODING_ERRORS") + self.stdin = io.BytesIO() # stdin is always bytes! + self.stdout = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err) + self.stderr = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err) + else: + self.encoding = self.encoding_errors = None + self.stdin = io.BytesIO() + self.stdout = io.BytesIO() + self.stderr = io.BytesIO() + self.suspended = False + self.prevs_are_closed = False + self.start() + + def run(self): + """Runs the subprocess by performing a parallel read on stdin if allowed, + and copying bytes from captured_stdout to stdout and bytes from + captured_stderr to stderr. + """ + proc = self.proc + spec = self._wait_and_getattr("spec") + # get stdin and apply parallel reader if needed. + stdin = self.stdin + if self.orig_stdin is None: + origin = None + elif xp.ON_POSIX and self.store_stdin: + origin = self.orig_stdin + origfd = origin if isinstance(origin, int) else origin.fileno() + origin = BufferedFDParallelReader(origfd, buffer=stdin) + else: + origin = None + # get non-blocking stdout + stdout = self.stdout.buffer if self.universal_newlines else self.stdout + capout = spec.captured_stdout + if capout is None: + procout = None + else: + procout = NonBlockingFDReader(capout.fileno(), timeout=self.timeout) + # get non-blocking stderr + stderr = self.stderr.buffer if self.universal_newlines else self.stderr + caperr = spec.captured_stderr + if caperr is None: + procerr = None + else: + procerr = NonBlockingFDReader(caperr.fileno(), timeout=self.timeout) + # initial read from buffer + self._read_write(procout, stdout, sys.__stdout__) + self._read_write(procerr, stderr, sys.__stderr__) + # loop over reads while process is running. + i = j = cnt = 1 + while proc.poll() is None: + # this is here for CPU performance reasons. + if i + j == 0: + cnt = min(cnt + 1, 1000) + tout = self.timeout * cnt + if procout is not None: + procout.timeout = tout + if procerr is not None: + procerr.timeout = tout + elif cnt == 1: + pass + else: + cnt = 1 + if procout is not None: + procout.timeout = self.timeout + if procerr is not None: + procerr.timeout = self.timeout + # redirect some output! + i = self._read_write(procout, stdout, sys.__stdout__) + j = self._read_write(procerr, stderr, sys.__stderr__) + if self.suspended: + break + if self.suspended: + return + # close files to send EOF to non-blocking reader. + # capout & caperr seem to be needed only by Windows, while + # orig_stdout & orig_stderr are need by posix and Windows. + # Also, order seems to matter here, + # with orig_* needed to be closed before cap* + safe_fdclose(self.orig_stdout) + safe_fdclose(self.orig_stderr) + if xp.ON_WINDOWS: + safe_fdclose(capout) + safe_fdclose(caperr) + # read in the remaining data in a blocking fashion. + while (procout is not None and not procout.is_fully_read()) or ( + procerr is not None and not procerr.is_fully_read() + ): + self._read_write(procout, stdout, sys.__stdout__) + self._read_write(procerr, stderr, sys.__stderr__) + # kill the process if it is still alive. Happens when piping. + if proc.poll() is None: + proc.terminate() + + def _wait_and_getattr(self, name): + """make sure the instance has a certain attr, and return it.""" + while not hasattr(self, name): + time.sleep(1e-7) + return getattr(self, name) + + def _read_write(self, reader, writer, stdbuf): + """Reads a chunk of bytes from a buffer and write into memory or back + down to the standard buffer, as appropriate. Returns the number of + successful reads. + """ + if reader is None: + return 0 + i = -1 + for i, chunk in enumerate(iter(reader.read_queue, b"")): + self._alt_mode_switch(chunk, writer, stdbuf) + if i >= 0: + writer.flush() + stdbuf.flush() + return i + 1 + + def _alt_mode_switch(self, chunk, membuf, stdbuf): + """Enables recursively switching between normal capturing mode + and 'alt' mode, which passes through values to the standard + buffer. Pagers, text editors, curses applications, etc. use + alternate mode. + """ + i, flag = xt.findfirst(chunk, ALTERNATE_MODE_FLAGS) + if flag is None: + self._alt_mode_writer(chunk, membuf, stdbuf) + else: + # This code is executed when the child process switches the + # terminal into or out of alternate mode. The line below assumes + # that the user has opened vim, less, or similar, and writes writes + # to stdin. + j = i + len(flag) + # write the first part of the chunk in the current mode. + self._alt_mode_writer(chunk[:i], membuf, stdbuf) + # switch modes + # write the flag itself the current mode where alt mode is on + # so that it is streamed to the terminal ASAP. + # this is needed for terminal emulators to find the correct + # positions before and after alt mode. + alt_mode = flag in START_ALTERNATE_MODE + if alt_mode: + self.in_alt_mode = alt_mode + self._alt_mode_writer(flag, membuf, stdbuf) + self._enable_cbreak_stdin() + else: + self._alt_mode_writer(flag, membuf, stdbuf) + self.in_alt_mode = alt_mode + self._disable_cbreak_stdin() + # recurse this function, but without the current flag. + self._alt_mode_switch(chunk[j:], membuf, stdbuf) + + def _alt_mode_writer(self, chunk, membuf, stdbuf): + """Write bytes to the standard buffer if in alt mode or otherwise + to the in-memory buffer. + """ + if not chunk: + pass # don't write empty values + elif self.in_alt_mode: + stdbuf.buffer.write(chunk) + else: + with self.lock: + p = membuf.tell() + membuf.seek(0, io.SEEK_END) + membuf.write(chunk) + membuf.seek(p) + + # + # Window resize handlers + # + + def _signal_winch(self, signum, frame): + """Signal handler for SIGWINCH - window size has changed.""" + self.send_signal(signal.SIGWINCH) + self._set_pty_size() + + def _set_pty_size(self): + """Sets the window size of the child pty based on the window size of + our own controlling terminal. + """ + if xp.ON_WINDOWS or not os.isatty(self.stdout_fd): + return + # Get the terminal size of the real terminal, set it on the + # pseudoterminal. + buf = array.array("h", [0, 0, 0, 0]) + # 1 = stdout here + try: + xli.fcntl.ioctl(1, xli.termios.TIOCGWINSZ, buf, True) + xli.fcntl.ioctl(self.stdout_fd, xli.termios.TIOCSWINSZ, buf) + except OSError: + pass + + # + # SIGINT handler + # + + def _signal_int(self, signum, frame): + """Signal handler for SIGINT - Ctrl+C may have been pressed.""" + self.send_signal(signal.CTRL_C_EVENT if xp.ON_WINDOWS else signum) + if self.proc is not None and self.proc.poll() is not None: + self._restore_sigint(frame=frame) + if xt.on_main_thread() and not xp.ON_WINDOWS: + signal.pthread_kill(threading.get_ident(), signal.SIGINT) + + def _restore_sigint(self, frame=None): + old = self.old_int_handler + if old is not None: + if xt.on_main_thread(): + signal.signal(signal.SIGINT, old) + self.old_int_handler = None + if frame is not None: + self._disable_cbreak_stdin() + if old is not None and old is not self._signal_int: + old(signal.SIGINT, frame) + + # + # SIGTSTP handler + # + + def _signal_tstp(self, signum, frame): + """Signal handler for suspending SIGTSTP - Ctrl+Z may have been pressed. + """ + self.suspended = True + self.send_signal(signum) + self._restore_sigtstp(frame=frame) + + def _restore_sigtstp(self, frame=None): + old = self.old_tstp_handler + if old is not None: + if xt.on_main_thread(): + signal.signal(signal.SIGTSTP, old) + self.old_tstp_handler = None + if frame is not None: + self._disable_cbreak_stdin() + self._restore_suspend_keybind() + + def _disable_suspend_keybind(self): + if xp.ON_WINDOWS: + return + try: + mode = xli.termios.tcgetattr(0) # only makes sense for stdin + self._tc_cc_vsusp = mode[CC][xli.termios.VSUSP] + mode[CC][xli.termios.VSUSP] = b"\x00" # set ^Z (ie SIGSTOP) to undefined + xli.termios.tcsetattr(0, xli.termios.TCSANOW, mode) + except xli.termios.error: + return + + def _restore_suspend_keybind(self): + if xp.ON_WINDOWS: + return + try: + mode = xli.termios.tcgetattr(0) # only makes sense for stdin + mode[CC][ + xli.termios.VSUSP + ] = self._tc_cc_vsusp # set ^Z (ie SIGSTOP) to original + # this usually doesn't work in interactive mode, + # but we should try it anyway. + xli.termios.tcsetattr(0, xli.termios.TCSANOW, mode) + except xli.termios.error: + pass + + # + # SIGQUIT handler + # + + def _signal_quit(self, signum, frame): + r"""Signal handler for quiting SIGQUIT - Ctrl+\ may have been pressed. + """ + self.send_signal(signum) + self._restore_sigquit(frame=frame) + + def _restore_sigquit(self, frame=None): + old = self.old_quit_handler + if old is not None: + if xt.on_main_thread(): + signal.signal(signal.SIGQUIT, old) + self.old_quit_handler = None + if frame is not None: + self._disable_cbreak_stdin() + + # + # cbreak mode handlers + # + + def _enable_cbreak_stdin(self): + if not xp.ON_POSIX: + return + try: + self.stdin_mode = xli.termios.tcgetattr(self.stdin_fd)[:] + except xli.termios.error: + # this can happen for cases where another process is controlling + # xonsh's tty device, such as in testing. + self.stdin_mode = None + return + new = self.stdin_mode[:] + new[LFLAG] &= ~(xli.termios.ECHO | xli.termios.ICANON) + new[CC][xli.termios.VMIN] = 1 + new[CC][xli.termios.VTIME] = 0 + try: + # termios.TCSAFLUSH may be less reliable than termios.TCSANOW + xli.termios.tcsetattr(self.stdin_fd, xli.termios.TCSANOW, new) + except xli.termios.error: + self._disable_cbreak_stdin() + + def _disable_cbreak_stdin(self): + if not xp.ON_POSIX or self.stdin_mode is None: + return + new = self.stdin_mode[:] + new[LFLAG] |= xli.termios.ECHO | xli.termios.ICANON + new[CC][xli.termios.VMIN] = 1 + new[CC][xli.termios.VTIME] = 0 + try: + xli.termios.tcsetattr(self.stdin_fd, xli.termios.TCSANOW, new) + except xli.termios.error: + pass + + # + # Dispatch methods + # + + def poll(self): + """Dispatches to Popen.returncode.""" + return self.proc.returncode + + def wait(self, timeout=None): + """Dispatches to Popen.wait(), but also does process cleanup such as + joining this thread and replacing the original window size signal + handler. + """ + self._disable_cbreak_stdin() + rtn = self.proc.wait(timeout=timeout) + self.join() + # need to replace the old signal handlers somewhere... + if self.old_winch_handler is not None and xt.on_main_thread(): + signal.signal(signal.SIGWINCH, self.old_winch_handler) + self.old_winch_handler = None + self._clean_up() + return rtn + + def _clean_up(self): + self._restore_sigint() + self._restore_sigtstp() + self._restore_sigquit() + + @property + def returncode(self): + """Process return code.""" + return self.proc.returncode + + @returncode.setter + def returncode(self, value): + """Process return code.""" + self.proc.returncode = value + + @property + def signal(self): + """Process signal, or None.""" + s = getattr(self.proc, "signal", None) + if s is None: + rtn = self.returncode + if rtn is not None and rtn != 0: + s = (-1 * rtn, rtn < 0 if xp.ON_WINDOWS else os.WCOREDUMP(rtn)) + return s + + @signal.setter + def signal(self, value): + """Process signal, or None.""" + self.proc.signal = value + + def send_signal(self, signal): + """Dispatches to Popen.send_signal().""" + dt = 0.0 + while self.proc is None and dt < self.timeout: + time.sleep(1e-7) + dt += 1e-7 + if self.proc is None: + return + try: + rtn = self.proc.send_signal(signal) + except ProcessLookupError: + # This can happen in the case of !(cmd) when the command has ended + rtn = None + return rtn + + def terminate(self): + """Dispatches to Popen.terminate().""" + return self.proc.terminate() + + def kill(self): + """Dispatches to Popen.kill().""" + return self.proc.kill() + diff --git a/xonsh/procs/readers.py b/xonsh/procs/readers.py new file mode 100644 index 000000000..6f46c7488 --- /dev/null +++ b/xonsh/procs/readers.py @@ -0,0 +1,422 @@ +"""File handle readers and related tools.""" +import os +import io +import time +import queue +import builtins +import threading + +import xonsh.lazyimps as xli + + +class QueueReader: + """Provides a file-like interface to reading from a queue.""" + + def __init__(self, fd, timeout=None): + """ + Parameters + ---------- + fd : int + A file descriptor + timeout : float or None, optional + The queue reading timeout. + """ + self.fd = fd + self.timeout = timeout + self.closed = False + self.queue = queue.Queue() + self.thread = None + + def close(self): + """close the reader""" + self.closed = True + + def is_fully_read(self): + """Returns whether or not the queue is fully read and the reader is + closed. + """ + return ( + self.closed + and (self.thread is None or not self.thread.is_alive()) + and self.queue.empty() + ) + + def read_queue(self): + """Reads a single chunk from the queue. This is blocking if + the timeout is None and non-blocking otherwise. + """ + try: + return self.queue.get(block=True, timeout=self.timeout) + except queue.Empty: + return b"" + + def read(self, size=-1): + """Reads bytes from the file.""" + i = 0 + buf = b"" + while size < 0 or i != size: + line = self.read_queue() + if line: + buf += line + else: + break + i += len(line) + return buf + + def readline(self, size=-1): + """Reads a line, or a partial line from the file descriptor.""" + i = 0 + nl = b"\n" + buf = b"" + while size < 0 or i != size: + line = self.read_queue() + if line: + buf += line + if line.endswith(nl): + break + else: + break + i += len(line) + return buf + + def _read_all_lines(self): + """This reads all remaining lines in a blocking fashion.""" + lines = [] + while not self.is_fully_read(): + chunk = self.read_queue() + lines.extend(chunk.splitlines(keepends=True)) + return lines + + def readlines(self, hint=-1): + """Reads lines from the file descriptor. This is blocking for negative + hints (i.e. read all the remaining lines) and non-blocking otherwise. + """ + if hint == -1: + return self._read_all_lines() + lines = [] + while len(lines) != hint: + chunk = self.read_queue() + if not chunk: + break + lines.extend(chunk.splitlines(keepends=True)) + return lines + + def fileno(self): + """Returns the file descriptor number.""" + return self.fd + + @staticmethod + def readable(): + """Returns true, because this object is always readable.""" + return True + + def iterqueue(self): + """Iterates through all remaining chunks in a blocking fashion.""" + while not self.is_fully_read(): + chunk = self.read_queue() + if not chunk: + continue + yield chunk + + +def populate_fd_queue(reader, fd, queue): + """Reads 1 kb of data from a file descriptor into a queue. + If this ends or fails, it flags the calling reader object as closed. + """ + while True: + try: + c = os.read(fd, 1024) + except OSError: + reader.closed = True + break + if c: + queue.put(c) + else: + reader.closed = True + break + + +class NonBlockingFDReader(QueueReader): + """A class for reading characters from a file descriptor on a background + thread. This has the advantages that the calling thread can close the + file and that the reading does not block the calling thread. + """ + + def __init__(self, fd, timeout=None): + """ + Parameters + ---------- + fd : int + A file descriptor + timeout : float or None, optional + The queue reading timeout. + """ + super().__init__(fd, timeout=timeout) + # start reading from stream + self.thread = threading.Thread( + target=populate_fd_queue, args=(self, self.fd, self.queue) + ) + self.thread.daemon = True + self.thread.start() + + +def populate_buffer(reader, fd, buffer, chunksize): + """Reads bytes from the file descriptor and copies them into a buffer. + + The reads happen in parallel using the pread() syscall; which is only + available on POSIX systems. If the read fails for any reason, the reader is + flagged as closed. + """ + offset = 0 + while True: + try: + buf = os.pread(fd, chunksize, offset) + except OSError: + reader.closed = True + break + if buf: + buffer.write(buf) + offset += len(buf) + else: + reader.closed = True + break + + +class BufferedFDParallelReader: + """Buffered, parallel background thread reader.""" + + def __init__(self, fd, buffer=None, chunksize=1024): + """ + Parameters + ---------- + fd : int + File descriptor from which to read. + buffer : binary file-like or None, optional + A buffer to write bytes into. If None, a new BytesIO object + is created. + chunksize : int, optional + The max size of the parallel reads, default 1 kb. + """ + self.fd = fd + self.buffer = io.BytesIO() if buffer is None else buffer + self.chunksize = chunksize + self.closed = False + # start reading from stream + self.thread = threading.Thread( + target=populate_buffer, args=(self, fd, self.buffer, chunksize) + ) + self.thread.daemon = True + + self.thread.start() + + +def _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd): + # if we are getting close to the end of the console buffer, + # expand it so that we can read from it successfully. + if cols == 0: + return orig_posize[-1], max_offset, orig_posize + rows = ((max_offset + expandsize) // cols) + 1 + xli.winutils.set_console_screen_buffer_size(cols, rows, fd=fd) + orig_posize = orig_posize[:3] + (rows,) + max_offset = (rows - 1) * cols + return rows, max_offset, orig_posize + + +def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None): + """Reads bytes from the file descriptor and puts lines into the queue. + The reads happened in parallel, + using xonsh.winutils.read_console_output_character(), + and is thus only available on windows. If the read fails for any reason, + the reader is flagged as closed. + """ + # OK, so this function is super annoying because Windows stores its + # buffers as a 2D regular, dense array -- without trailing newlines. + # Meanwhile, we want to add *lines* to the queue. Also, as is typical + # with parallel reads, the entire buffer that you ask for may not be + # filled. Thus we have to deal with the full generality. + # 1. reads may end in the middle of a line + # 2. excess whitespace at the end of a line may not be real, unless + # 3. you haven't read to the end of the line yet! + # So there are alignment issues everywhere. Also, Windows will automatically + # read past the current cursor position, even though there is presumably + # nothing to see there. + # + # These chunked reads basically need to happen like this because, + # a. The default buffer size is HUGE for the console (90k lines x 120 cols) + # as so we can't just read in everything at the end and see what we + # care about without a noticeable performance hit. + # b. Even with this huge size, it is still possible to write more lines than + # this, so we should scroll along with the console. + # Unfortunately, because we do not have control over the terminal emulator, + # It is not possible to compute how far back we should set the beginning + # read position because we don't know how many characters have been popped + # off the top of the buffer. If we did somehow know this number we could do + # something like the following: + # + # new_offset = (y*cols) + x + # if new_offset == max_offset: + # new_offset -= scrolled_offset + # x = new_offset%cols + # y = new_offset//cols + # continue + # + # So this method is imperfect and only works as long as the screen has + # room to expand to. Thus the trick here is to expand the screen size + # when we get close enough to the end of the screen. There remain some + # async issues related to not being able to set the cursor position. + # but they just affect the alignment / capture of the output of the + # first command run after a screen resize. + if expandsize is None: + expandsize = 100 * chunksize + x, y, cols, rows = posize = xli.winutils.get_position_size(fd) + pre_x = pre_y = -1 + orig_posize = posize + offset = (cols * y) + x + max_offset = (rows - 1) * cols + # I believe that there is a bug in PTK that if we reset the + # cursor position, the cursor on the next prompt is accidentally on + # the next line. If this is fixed, uncomment the following line. + # if max_offset < offset + expandsize: + # rows, max_offset, orig_posize = _expand_console_buffer( + # cols, max_offset, expandsize, + # orig_posize, fd) + # winutils.set_console_cursor_position(x, y, fd=fd) + while True: + posize = xli.winutils.get_position_size(fd) + offset = (cols * y) + x + if ((posize[1], posize[0]) <= (y, x) and posize[2:] == (cols, rows)) or ( + pre_x == x and pre_y == y + ): + # already at or ahead of the current cursor position. + if reader.closed: + break + else: + time.sleep(reader.timeout) + continue + elif max_offset <= offset + expandsize: + ecb = _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd) + rows, max_offset, orig_posize = ecb + continue + elif posize[2:] == (cols, rows): + # cursor updated but screen size is the same. + pass + else: + # screen size changed, which is offset preserving + orig_posize = posize + cols, rows = posize[2:] + x = offset % cols + y = offset // cols + pre_x = pre_y = -1 + max_offset = (rows - 1) * cols + continue + try: + buf = xli.winutils.read_console_output_character( + x=x, y=y, fd=fd, buf=buffer, bufsize=chunksize, raw=True + ) + except (OSError, IOError): + reader.closed = True + break + # cursor position and offset + if not reader.closed: + buf = buf.rstrip() + nread = len(buf) + if nread == 0: + time.sleep(reader.timeout) + continue + cur_x, cur_y = posize[0], posize[1] + cur_offset = (cols * cur_y) + cur_x + beg_offset = (cols * y) + x + end_offset = beg_offset + nread + if end_offset > cur_offset and cur_offset != max_offset: + buf = buf[: cur_offset - end_offset] + # convert to lines + xshift = cols - x + yshift = (nread // cols) + (1 if nread % cols > 0 else 0) + lines = [buf[:xshift]] + lines += [ + buf[l * cols + xshift : (l + 1) * cols + xshift] for l in range(yshift) + ] + lines = [line for line in lines if line] + if not lines: + time.sleep(reader.timeout) + continue + # put lines in the queue + nl = b"\n" + for line in lines[:-1]: + queue.put(line.rstrip() + nl) + if len(lines[-1]) == xshift: + queue.put(lines[-1].rstrip() + nl) + else: + queue.put(lines[-1]) + # update x and y locations + if (beg_offset + len(buf)) % cols == 0: + new_offset = beg_offset + len(buf) + else: + new_offset = beg_offset + len(buf.rstrip()) + pre_x = x + pre_y = y + x = new_offset % cols + y = new_offset // cols + time.sleep(reader.timeout) + + +class ConsoleParallelReader(QueueReader): + """Parallel reader for consoles that runs in a background thread. + This is only needed, available, and useful on Windows. + """ + + def __init__(self, fd, buffer=None, chunksize=1024, timeout=None): + """ + Parameters + ---------- + fd : int + Standard buffer file descriptor, 0 for stdin, 1 for stdout (default), + and 2 for stderr. + buffer : ctypes.c_wchar_p, optional + An existing buffer to (re-)use. + chunksize : int, optional + The max size of the parallel reads, default 1 kb. + timeout : float, optional + The queue reading timeout. + """ + timeout = timeout or builtins.__xonsh__.env.get("XONSH_PROC_FREQUENCY") + super().__init__(fd, timeout=timeout) + self._buffer = buffer # this cannot be public + if buffer is None: + self._buffer = ctypes.c_char_p(b" " * chunksize) + self.chunksize = chunksize + # start reading from stream + self.thread = threading.Thread( + target=populate_console, + args=(self, fd, self._buffer, chunksize, self.queue), + ) + self.thread.daemon = True + self.thread.start() + + +def safe_fdclose(handle, cache=None): + """Closes a file handle in the safest way possible, and potentially + storing the result. + """ + if cache is not None and cache.get(handle, False): + return + status = True + if handle is None: + pass + elif isinstance(handle, int): + if handle >= 3: + # don't close stdin, stdout, stderr, -1 + try: + os.close(handle) + except OSError: + status = False + elif handle is sys.stdin or handle is sys.stdout or handle is sys.stderr: + # don't close stdin, stdout, or stderr + pass + else: + try: + handle.close() + except OSError: + status = False + if cache is not None: + cache[handle] = status