From f8cb5392c49126e0c2ac31cefdf9bb2154c98954 Mon Sep 17 00:00:00 2001 From: Anthony Scopatz Date: Sat, 24 Sep 2016 11:13:11 -0400 Subject: [PATCH] parallel readed --- xonsh/proc.py | 78 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/xonsh/proc.py b/xonsh/proc.py index 0ae812a1b..148d66bf4 100644 --- a/xonsh/proc.py +++ b/xonsh/proc.py @@ -68,6 +68,20 @@ ALTERNATE_MODE_FLAGS = LazyObject( globals(), 'ALTERNATE_MODE_FLAGS') +def populate_char_queue(nb, fd, queue): + while True: + try: + c = os.read(fd, 1) + except OSError: + nb.closed = True + break + if c: + queue.put(c) + else: + nb.closed = True + break + + class NonBlockingFDReader: def __init__(self, fd, timeout=None): @@ -75,21 +89,8 @@ class NonBlockingFDReader: self.queue = queue.Queue() self.timeout = timeout self.closed = False - - def populate_queue(nb, fd, queue): - while True: - try: - c = os.read(fd, 1) - except OSError: - nb.closed = True - break - if c: - queue.put(c) - else: - break - # start reading from stream - self.thread = threading.Thread(target=populate_queue, + self.thread = threading.Thread(target=populate_char_queue, args=(self, self.fd, self.queue)) self.thread.daemon = True self.thread.start() @@ -142,6 +143,38 @@ class NonBlockingFDReader: return self.fd +def populate_char_queue(reader, fd, buffer, chunksize): + offset = 0 + while True: + try: + buf = os.pread(fd, chunksize, offset) + except OSError: + reader.closed = True + break + if c: + buffer.write(buf) + offset += len(buf) + else: + reader.closed = True + break + + +class BufferedFDParallelReader: + + def __init__(self, fd, buffer=None, chunksize=1024): + self.fd = fd + self.buffer = io.BytesIO() if buffer is None else buffer + self.chunksize = chunksize + self.closed = False + # start reading from stream + self.thread = threading.Thread(target=populate_buffer, + args=(self, fd, self.buffer, chunksize)) + self.thread.daemon = True + self.thread.start() + + + + class PopenThread(threading.Thread): def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs): @@ -155,8 +188,6 @@ class PopenThread(threading.Thread): self._signal_winch) # start up process self.orig_stdin = stdin - if stdin is not None: - stdin = subprocess.PIPE self.proc = proc = subprocess.Popen(*args, stdin=stdin, stdout=stdout, @@ -189,6 +220,9 @@ class PopenThread(threading.Thread): self._wait_for_attr('captured_stdin') self._wait_for_attr('captured_stdout') self._wait_for_attr('captured_stderr') + stdin = self.stdin + stdout = self.stdout + stderr = self.stderr #if self.piped_stdin is None: # pipein = None #else: @@ -202,25 +236,23 @@ class PopenThread(threading.Thread): else: origin = self.orig_stdin origfd = origin if isinstance(origin, int) else origin.fileno() - origin = NonBlockingFDReader(origfd, timeout=self.timeout) + origin = BufferedFDParallelReader(origfd, buffer=stdin, + timeout=self.timeout) procout = NonBlockingFDReader(self.captured_stdout.fileno(), timeout=self.timeout) procerr = NonBlockingFDReader(self.captured_stderr.fileno(), timeout=self.timeout) - stdin = self.stdin - stdout = self.stdout - stderr = self.stderr #print('a') #print(self.piped_stdin) #self._read_write_in(pipein, sysin, stdin, capin) - self._communicate_in(origin, stdin) + #self._communicate_in(origin, stdin) self._read_write(procout, stdout, sys.stdout) self._read_write(procerr, stderr, sys.stderr) #print('b') while proc.poll() is None: #print('c') #self._read_write_in(pipein, sysin, stdin, capin) - self._communicate_in(origin, stdin) + #self._communicate_in(origin, stdin) self._read_write(procout, stdout, sys.stdout) self._read_write(procerr, stderr, sys.stderr) #if pipein is not None and pipein.closed: @@ -233,7 +265,7 @@ class PopenThread(threading.Thread): #print('d') #print('e') #self._read_write_in(pipein, sysin, stdin, capin) - self._communicate_in(origin, stdin) + #self._communicate_in(origin, stdin) self._read_write(procout, stdout, sys.stdout) self._read_write(procerr, stderr, sys.stderr) #print('f')