diff --git a/mitogen/core.py b/mitogen/core.py index 428965eb..f58eac83 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1019,7 +1019,60 @@ def _unpickle_context(router, context_id, name): return router.context_class(router, context_id, name) +class Poller(object): + def __init__(self): + self._rfds = {} + self._wfds = {} + + _repr = 'Poller()' + + @property + def readers(self): + return list(self._rfds.items()) + + @property + def writers(self): + return list(self._wfds.items()) + + def __repr__(self): + return self._repr + + def close(self): + pass + + def start_receive(self, fd, data=None): + self._rfds[fd] = data or fd + + def stop_receive(self, fd): + self._rfds.pop(fd, None) + + def start_transmit(self, fd, data=None): + self._wfds[fd] = data or fd + + def stop_transmit(self, fd): + self._wfds.pop(fd, None) + + def poll(self, timeout=None): + _vv and IOLOG.debug('%r.poll(%r)', self, timeout) + IOLOG.debug('readers = %r', self._rfds) + IOLOG.debug('writers = %r', self._wfds) + (rfds, wfds, _), _ = io_op(select.select, + self._rfds, + self._wfds, + (), timeout + ) + + for fd in rfds: + _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) + yield self._rfds[fd] + + for fd in wfds: + _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) + yield self._wfds[fd] + + class Latch(object): + poller_class = Poller closed = False _waking = 0 _sockets = [] @@ -1084,10 +1137,15 @@ class Latch(object): _vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)', self, timeout, block) e = None + poller = self.poller_class() + poller.start_receive(rsock.fileno()) try: - io_op(select.select, [rsock], [], [], timeout) - except Exception: - e = sys.exc_info()[1] + try: + list(poller.poll(timeout)) + except Exception: + e = sys.exc_info()[1] + finally: + poller.close() self._lock.acquire() try: @@ -1435,58 +1493,6 @@ class Router(object): self.broker.defer(self._async_route, msg) -class Poller(object): - def __init__(self): - self._rfds = {} - self._wfds = {} - - _repr = 'Poller()' - - @property - def readers(self): - return list(self._rfds.items()) - - @property - def writers(self): - return list(self._wfds.items()) - - def __repr__(self): - return self._repr - - def close(self): - pass - - def start_receive(self, fd, data=None): - self._rfds[fd] = data or fd - - def stop_receive(self, fd): - self._rfds.pop(fd, None) - - def start_transmit(self, fd, data=None): - self._wfds[fd] = data or fd - - def stop_transmit(self, fd): - self._wfds.pop(fd, None) - - def poll(self, timeout=None): - _vv and IOLOG.debug('%r.poll(%r)', self, timeout) - IOLOG.debug('readers = %r', self._rfds) - IOLOG.debug('writers = %r', self._wfds) - (rfds, wfds, _), _ = io_op(select.select, - self._rfds, - self._wfds, - (), timeout - ) - - for fd in rfds: - _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) - yield self._rfds[fd] - - for fd in wfds: - _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) - yield self._wfds[fd] - - class Broker(object): poller_class = Poller _waker = None diff --git a/mitogen/parent.py b/mitogen/parent.py index 9e40d17f..3cfff8fb 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -654,11 +654,17 @@ POLLER_BY_SYSNAME = { 'FreeBSD': KqueuePoller, 'Linux': EpollPoller, } + PREFERRED_POLLER = POLLER_BY_SYSNAME.get( os.uname()[0], mitogen.core.Poller, ) +# For apps that start threads dynamically, it's possible Latch will also get +# very high-numbered wait fds when there are many connections, and so select() +# becomes useless there too. So swap in our favourite poller. +mitogen.core.Latch.poller_class = PREFERRED_POLLER + class TtyLogStream(mitogen.core.BasicStream): """