parallel readed

This commit is contained in:
Anthony Scopatz 2016-09-24 11:13:11 -04:00
parent 2b3ae98289
commit f8cb5392c4

View file

@ -68,6 +68,20 @@ ALTERNATE_MODE_FLAGS = LazyObject(
globals(), 'ALTERNATE_MODE_FLAGS')
def populate_char_queue(nb, fd, queue):
while True:
try:
c = os.read(fd, 1)
except OSError:
nb.closed = True
break
if c:
queue.put(c)
else:
nb.closed = True
break
class NonBlockingFDReader:
def __init__(self, fd, timeout=None):
@ -75,21 +89,8 @@ class NonBlockingFDReader:
self.queue = queue.Queue()
self.timeout = timeout
self.closed = False
def populate_queue(nb, fd, queue):
while True:
try:
c = os.read(fd, 1)
except OSError:
nb.closed = True
break
if c:
queue.put(c)
else:
break
# start reading from stream
self.thread = threading.Thread(target=populate_queue,
self.thread = threading.Thread(target=populate_char_queue,
args=(self, self.fd, self.queue))
self.thread.daemon = True
self.thread.start()
@ -142,6 +143,38 @@ class NonBlockingFDReader:
return self.fd
def populate_char_queue(reader, fd, buffer, chunksize):
offset = 0
while True:
try:
buf = os.pread(fd, chunksize, offset)
except OSError:
reader.closed = True
break
if c:
buffer.write(buf)
offset += len(buf)
else:
reader.closed = True
break
class BufferedFDParallelReader:
def __init__(self, fd, buffer=None, chunksize=1024):
self.fd = fd
self.buffer = io.BytesIO() if buffer is None else buffer
self.chunksize = chunksize
self.closed = False
# start reading from stream
self.thread = threading.Thread(target=populate_buffer,
args=(self, fd, self.buffer, chunksize))
self.thread.daemon = True
self.thread.start()
class PopenThread(threading.Thread):
def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs):
@ -155,8 +188,6 @@ class PopenThread(threading.Thread):
self._signal_winch)
# start up process
self.orig_stdin = stdin
if stdin is not None:
stdin = subprocess.PIPE
self.proc = proc = subprocess.Popen(*args,
stdin=stdin,
stdout=stdout,
@ -189,6 +220,9 @@ class PopenThread(threading.Thread):
self._wait_for_attr('captured_stdin')
self._wait_for_attr('captured_stdout')
self._wait_for_attr('captured_stderr')
stdin = self.stdin
stdout = self.stdout
stderr = self.stderr
#if self.piped_stdin is None:
# pipein = None
#else:
@ -202,25 +236,23 @@ class PopenThread(threading.Thread):
else:
origin = self.orig_stdin
origfd = origin if isinstance(origin, int) else origin.fileno()
origin = NonBlockingFDReader(origfd, timeout=self.timeout)
origin = BufferedFDParallelReader(origfd, buffer=stdin,
timeout=self.timeout)
procout = NonBlockingFDReader(self.captured_stdout.fileno(),
timeout=self.timeout)
procerr = NonBlockingFDReader(self.captured_stderr.fileno(),
timeout=self.timeout)
stdin = self.stdin
stdout = self.stdout
stderr = self.stderr
#print('a')
#print(self.piped_stdin)
#self._read_write_in(pipein, sysin, stdin, capin)
self._communicate_in(origin, stdin)
#self._communicate_in(origin, stdin)
self._read_write(procout, stdout, sys.stdout)
self._read_write(procerr, stderr, sys.stderr)
#print('b')
while proc.poll() is None:
#print('c')
#self._read_write_in(pipein, sysin, stdin, capin)
self._communicate_in(origin, stdin)
#self._communicate_in(origin, stdin)
self._read_write(procout, stdout, sys.stdout)
self._read_write(procerr, stderr, sys.stderr)
#if pipein is not None and pipein.closed:
@ -233,7 +265,7 @@ class PopenThread(threading.Thread):
#print('d')
#print('e')
#self._read_write_in(pipein, sysin, stdin, capin)
self._communicate_in(origin, stdin)
#self._communicate_in(origin, stdin)
self._read_write(procout, stdout, sys.stdout)
self._read_write(procerr, stderr, sys.stderr)
#print('f')