diff --git a/mitogen/core.py b/mitogen/core.py index b952e601..189e6d31 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1454,6 +1454,9 @@ class Poller(object): def __repr__(self): return self._repr + def close(self): + pass + def start_receive(self, fd, data=None): self._rfds[fd] = data or fd diff --git a/mitogen/parent.py b/mitogen/parent.py index ca77df8b..d40bfee8 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -356,23 +356,27 @@ def hybrid_tty_create_child(args): def write_all(fd, s, deadline=None): - poller = PREFERRED_POLLER() - poller.start_transmit(fd) timeout = None written = 0 + poller = PREFERRED_POLLER() + poller.start_transmit(fd) - while written < len(s): - if deadline is not None: - timeout = max(0, deadline - time.time()) - if timeout == 0: - raise mitogen.core.TimeoutError('write timed out') + try: + while written < len(s): + if deadline is not None: + timeout = max(0, deadline - time.time()) + if timeout == 0: + raise mitogen.core.TimeoutError('write timed out') - for fd in poller.poll(timeout): - n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written)) - if disconnected: - raise mitogen.core.StreamError('EOF on stream during write') + for fd in poller.poll(timeout): + n, disconnected = mitogen.core.io_op( + os.write, fd, buffer(s, written)) + if disconnected: + raise mitogen.core.StreamError('EOF on stream during write') - written += n + written += n + finally: + poller.close() def iter_read(fds, deadline=None): @@ -382,28 +386,30 @@ def iter_read(fds, deadline=None): bits = [] timeout = None - while poller.readers: - if deadline is not None: - timeout = max(0, deadline - time.time()) - if timeout == 0: - break - - for fd in poller.poll(timeout): - s, disconnected = mitogen.core.io_op(os.read, fd, 4096) - if disconnected or not s: - IOLOG.debug('iter_read(%r) -> disconnected', fd) - fds.remove(fd) - else: - IOLOG.debug('iter_read(%r) -> %r', fd, s) - bits.append(s) - yield s - - if not fds: + try: + while poller.readers: + if deadline is not None: + timeout = max(0, deadline - time.time()) + if timeout == 0: + break + + for fd in poller.poll(timeout): + s, disconnected = mitogen.core.io_op(os.read, fd, 4096) + if disconnected or not s: + IOLOG.debug('iter_read(%r) -> disconnected', fd) + fds.remove(fd) + else: + IOLOG.debug('iter_read(%r) -> %r', fd, s) + bits.append(s) + yield s + finally: + poller.close() + + if not poller.readers: raise mitogen.core.StreamError( 'EOF on stream; last 300 bytes received: %r' % (''.join(bits)[-300:],) ) - raise mitogen.core.TimeoutError('read timed out') @@ -513,6 +519,9 @@ class KqueuePoller(Poller): self._wfds = {} self._changelist = [] + def close(self): + self._kqueue.close() + @property def readers(self): return list(self._rfds.items()) @@ -575,6 +584,9 @@ class EpollPoller(Poller): self._rfds = {} self._wfds = {} + def close(self): + self._epoll.close() + @property def readers(self): return list(self._rfds.items())