|
|
|
@ -505,6 +505,48 @@ def write_all(fd, s, deadline=None):
|
|
|
|
|
poller.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IteratingRead(object):
|
|
|
|
|
def __init__(self, fds, deadline=None):
|
|
|
|
|
self.deadline = deadline
|
|
|
|
|
self.poller = PREFERRED_POLLER()
|
|
|
|
|
for fd in fds:
|
|
|
|
|
self.poller.start_receive(fd)
|
|
|
|
|
|
|
|
|
|
self.bits = []
|
|
|
|
|
self.timeout = None
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
self.poller.close()
|
|
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
def next(self):
|
|
|
|
|
while self.poller.readers:
|
|
|
|
|
if self.deadline is not None:
|
|
|
|
|
timeout = max(0, self.deadline - time.time())
|
|
|
|
|
if timeout == 0:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
for fd in self.poller.poll(timeout):
|
|
|
|
|
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
|
|
|
|
|
if disconnected or not s:
|
|
|
|
|
IOLOG.debug('iter_read(%r) -> disconnected', fd)
|
|
|
|
|
self.poller.stop_receive(fd)
|
|
|
|
|
else:
|
|
|
|
|
IOLOG.debug('iter_read(%r) -> %r', fd, s)
|
|
|
|
|
self.bits.append(s)
|
|
|
|
|
return s
|
|
|
|
|
|
|
|
|
|
if not poller.readers:
|
|
|
|
|
raise EofError(u'EOF on stream; last 300 bytes received: %r' %
|
|
|
|
|
(b('').join(self.bits)[-300:].decode('latin1'),))
|
|
|
|
|
|
|
|
|
|
raise mitogen.core.TimeoutError('read timed out')
|
|
|
|
|
|
|
|
|
|
__next__ = next
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def iter_read(fds, deadline=None):
|
|
|
|
|
"""Return a generator that arranges for up to 4096-byte chunks to be read
|
|
|
|
|
at a time from the file descriptor `fd` until the generator is destroyed.
|
|
|
|
@ -522,36 +564,7 @@ def iter_read(fds, deadline=None):
|
|
|
|
|
:raises mitogen.core.StreamError:
|
|
|
|
|
Attempt to read past end of file.
|
|
|
|
|
"""
|
|
|
|
|
poller = PREFERRED_POLLER()
|
|
|
|
|
for fd in fds:
|
|
|
|
|
poller.start_receive(fd)
|
|
|
|
|
|
|
|
|
|
bits = []
|
|
|
|
|
timeout = None
|
|
|
|
|
try:
|
|
|
|
|
while poller.readers:
|
|
|
|
|
if deadline is not None:
|
|
|
|
|
timeout = max(0, deadline - time.time())
|
|
|
|
|
if timeout == 0:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
for fd in poller.poll(timeout):
|
|
|
|
|
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
|
|
|
|
|
if disconnected or not s:
|
|
|
|
|
IOLOG.debug('iter_read(%r) -> disconnected', fd)
|
|
|
|
|
poller.stop_receive(fd)
|
|
|
|
|
else:
|
|
|
|
|
IOLOG.debug('iter_read(%r) -> %r', fd, s)
|
|
|
|
|
bits.append(s)
|
|
|
|
|
yield s
|
|
|
|
|
finally:
|
|
|
|
|
poller.close()
|
|
|
|
|
|
|
|
|
|
if not poller.readers:
|
|
|
|
|
raise EofError(u'EOF on stream; last 300 bytes received: %r' %
|
|
|
|
|
(b('').join(bits)[-300:].decode('latin1'),))
|
|
|
|
|
|
|
|
|
|
raise mitogen.core.TimeoutError('read timed out')
|
|
|
|
|
return IteratingRead(fds=fds, deadline=deadline)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def discard_until(fd, s, deadline):
|
|
|
|
|