start of large buffer size

This commit is contained in:
Anthony Scopatz 2016-11-05 11:50:42 -04:00
parent 3545358a4b
commit 495d04f6d6

View file

@ -68,51 +68,15 @@ def RE_VT100_ESCAPE():
return re.compile(b'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
def populate_char_queue(reader, fd, queue):
"""Reads single characters from a file descriptor into a queue.
If this ends or fails, it flags the calling reader object as closed.
"""
while True:
try:
c = os.read(fd, 1)
except OSError:
reader.closed = True
break
if c:
queue.put(c)
else:
reader.closed = True
break
class QueueReader:
"""Provides a file-like interface to reading from a queue."""
def close(self):
"""close the reader"""
self.closed = True
class NonBlockingFDReader:
"""A class for reading characters from a file descriptor on a background
thread. This has the advantages that the calling thread can close the
file and that the reading does not block the calling thread.
"""
def __init__(self, fd, timeout=None):
"""
Parameters
----------
fd : int
A file descriptor
timeout : float or None, optional
The queue reading timeout.
"""
self.fd = fd
self.queue = queue.Queue()
self.timeout = timeout
self.sleepscale = 0
self.closed = False
# start reading from stream
self.thread = threading.Thread(target=populate_char_queue,
args=(self, self.fd, self.queue))
self.thread.daemon = True
self.thread.start()
def read_char(self, timeout=None):
"""Reads a single character from the queue."""
def read_queue(self, timeout=None):
"""Reads a single 'line' from the queue."""
timeout = timeout or self.timeout
try:
self.sleepscale = 0
@ -127,13 +91,13 @@ class NonBlockingFDReader:
"""Reads bytes from the file."""
i = 0
buf = b''
while i != size:
c = self.read_char()
if c:
buf += c
while size < 0 or i != size:
line = self.read_queue()
if line:
buf += line
else:
break
i += 1
i += len(line)
return buf
def readline(self, size=-1):
@ -141,15 +105,15 @@ class NonBlockingFDReader:
i = 0
nl = b'\n'
buf = b''
while i != size:
c = self.read_char()
if c:
buf += c
if c == nl:
while size < 0 or i != size:
line = self.read_queue()
if line:
buf += line
if line.endswith(nl):
break
else:
break
i += 1
i += len(line)
return buf
def readlines(self, hint=-1):
@ -172,6 +136,51 @@ class NonBlockingFDReader:
return True
def populate_fd_queue(reader, fd, queue):
"""Reads single characters from a file descriptor into a queue.
If this ends or fails, it flags the calling reader object as closed.
"""
while True:
try:
c = os.read(fd, 1024)
except OSError:
reader.closed = True
break
if c:
queue.put(c)
else:
reader.closed = True
break
class NonBlockingFDReader(QueueReader):
"""A class for reading characters from a file descriptor on a background
thread. This has the advantages that the calling thread can close the
file and that the reading does not block the calling thread.
"""
def __init__(self, fd, timeout=None):
"""
Parameters
----------
fd : int
A file descriptor
timeout : float or None, optional
The queue reading timeout.
"""
self.fd = fd
self.queue = queue.Queue()
self.timeout = timeout
self.sleepscale = 0
self.closed = False
# start reading from stream
self.thread = threading.Thread(target=populate_fd_queue,
args=(self, self.fd, self.queue))
self.thread.daemon = True
self.thread.start()
def populate_buffer(reader, fd, buffer, chunksize):
"""Reads bytes from the file descriptor and copies them into a buffer.
The reads happend in parallel, using pread(), and is thus only
@ -367,7 +376,7 @@ def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None):
time.sleep(reader.timeout)
class ConsoleParallelReader:
class ConsoleParallelReader(QueueReader):
"""Parallel reader for consoles that runs in a background thread.
This is only needed, available, and useful on Windows.
"""
@ -402,70 +411,6 @@ class ConsoleParallelReader:
self.thread.daemon = True
self.thread.start()
def close(self):
"""close the reader"""
self.closed = True
def read_queue(self, timeout=None):
"""Reads a single 'line' from the queue."""
timeout = timeout or self.timeout
try:
self.sleepscale = 0
return self.queue.get(block=timeout is not None,
timeout=timeout)
except queue.Empty:
self.sleepscale = min(self.sleepscale + 1, 3)
time.sleep(timeout * 10**self.sleepscale)
return b''
def read(self, size=-1):
"""Reads bytes from the file."""
i = 0
buf = b''
while size < 0 or i != size:
line = self.read_queue()
if line:
buf += line
else:
break
i += len(line)
return buf
def readline(self, size=-1):
"""Reads a line, or a partial line from the file descriptor."""
i = 0
nl = b'\n'
buf = b''
while size < 0 or i != size:
line = self.read_queue()
if line:
buf += line
if line.endswith(nl):
break
else:
break
i += len(line)
return buf
def readlines(self, hint=-1):
"""Reads lines from the file descriptor."""
lines = []
while len(lines) != hint:
line = self.readline(size=-1)
if not line:
break
lines.append(line)
return lines
def fileno(self):
"""Returns the file descriptor number."""
return self.fd
@staticmethod
def readable():
"""Returns true, because this object is always readable."""
return True
def safe_fdclose(handle, cache=None):
"""Closes a file handle in the safest way possible, and potentially
@ -652,7 +597,8 @@ class PopenThread(threading.Thread):
if reader is None:
return 0
i = -1
for i, line in enumerate(iter(reader.readline, b'')):
#for i, line in enumerate(iter(reader.readline, b'')):
for i, line in enumerate(iter(reader.read_queue, b'')):
self._alt_mode_switch(line, writer, stdbuf)
if i >= 0:
writer.flush()