diff --git a/mitogen/master.py b/mitogen/master.py index d2fe66c8..e6a7a8ed 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -167,27 +167,22 @@ def write_all(fd, s): return written -def read_with_deadline(fd, size, deadline): - timeout = deadline - time.time() - if timeout > 0: - rfds, _, _ = select.select([fd], [], [], timeout) - if rfds: - return os.read(fd, size) - - raise mitogen.core.TimeoutError('read timed out') +def iter_read(fd, deadline=None): + bits = [] + timeout = None + while True: + if deadline is not None: + timeout = max(0, deadline - time.time()) + if timeout == 0: + break -def iter_read(fd, deadline): - if deadline is not None: - LOG.debug('Warning: iter_read(.., deadline=...) unimplemented') + rfds, _, _ = select.select([fd], [], [], timeout) + if not rfds: + continue - bits = [] - while True: s, disconnected = mitogen.core.io_op(os.read, fd, 4096) if disconnected: - s = '' - - if not s: raise mitogen.core.StreamError( 'EOF on stream; last 300 bytes received: %r' % (''.join(bits)[-300:],) @@ -196,6 +191,8 @@ def iter_read(fd, deadline): bits.append(s) yield s + raise mitogen.core.TimeoutError('read timed out') + def discard_until(fd, s, deadline): for buf in iter_read(fd, deadline): diff --git a/tests/data/iter_read_generator.sh b/tests/data/iter_read_generator.sh new file mode 100755 index 00000000..3aa6d6ac --- /dev/null +++ b/tests/data/iter_read_generator.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# I produce text every 100ms, for testing mitogen.core.iter_read() + +i=0 + +while :; do + i=$(($i + 1)) + echo "$i" + sleep 0.1 +done diff --git a/tests/master_test.py b/tests/master_test.py new file mode 100644 index 00000000..72318f46 --- /dev/null +++ b/tests/master_test.py @@ -0,0 +1,56 @@ + +import subprocess +import time +import unittest + +import testlib +import mitogen.master + + +class IterReadTest(unittest.TestCase): + func = staticmethod(mitogen.master.iter_read) + + def make_proc(self): + args = [testlib.data_path('iter_read_generator.sh')] + return subprocess.Popen(args, stdout=subprocess.PIPE) + + def test_no_deadline(self): + proc = self.make_proc() + try: + reader = self.func(proc.stdout.fileno()) + for i, chunk in enumerate(reader, 1): + assert i == int(chunk) + if i > 3: + break + finally: + proc.terminate() + + def test_deadline_exceeded_before_call(self): + proc = self.make_proc() + reader = self.func(proc.stdout.fileno(), 0) + try: + got = [] + try: + for chunk in reader: + got.append(chunk) + assert 0, 'TimeoutError not raised' + except mitogen.core.TimeoutError: + assert len(got) == 0 + finally: + proc.terminate() + + def test_deadline_exceeded_during_call(self): + proc = self.make_proc() + reader = self.func(proc.stdout.fileno(), time.time() + 0.4) + try: + got = [] + try: + for chunk in reader: + got.append(chunk) + assert 0, 'TimeoutError not raised' + except mitogen.core.TimeoutError: + # Give a little wiggle room in case of imperfect scheduling. + # Ideal number should be 9. + assert 3 < len(got) < 5 + finally: + proc.terminate()