issue #249: port Latch to poller too.

This is probably going to suck for perf :/
pull/255/head
David Wilson 6 years ago
parent 1070dfae72
commit 36a1024861

@ -1019,7 +1019,60 @@ def _unpickle_context(router, context_id, name):
return router.context_class(router, context_id, name)
class Poller(object):
def __init__(self):
self._rfds = {}
self._wfds = {}
_repr = 'Poller()'
@property
def readers(self):
return list(self._rfds.items())
@property
def writers(self):
return list(self._wfds.items())
def __repr__(self):
return self._repr
def close(self):
pass
def start_receive(self, fd, data=None):
self._rfds[fd] = data or fd
def stop_receive(self, fd):
self._rfds.pop(fd, None)
def start_transmit(self, fd, data=None):
self._wfds[fd] = data or fd
def stop_transmit(self, fd):
self._wfds.pop(fd, None)
def poll(self, timeout=None):
_vv and IOLOG.debug('%r.poll(%r)', self, timeout)
IOLOG.debug('readers = %r', self._rfds)
IOLOG.debug('writers = %r', self._wfds)
(rfds, wfds, _), _ = io_op(select.select,
self._rfds,
self._wfds,
(), timeout
)
for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
yield self._rfds[fd]
for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
yield self._wfds[fd]
class Latch(object):
poller_class = Poller
closed = False
_waking = 0
_sockets = []
@ -1084,10 +1137,15 @@ class Latch(object):
_vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)',
self, timeout, block)
e = None
poller = self.poller_class()
poller.start_receive(rsock.fileno())
try:
io_op(select.select, [rsock], [], [], timeout)
except Exception:
e = sys.exc_info()[1]
try:
list(poller.poll(timeout))
except Exception:
e = sys.exc_info()[1]
finally:
poller.close()
self._lock.acquire()
try:
@ -1435,58 +1493,6 @@ class Router(object):
self.broker.defer(self._async_route, msg)
class Poller(object):
def __init__(self):
self._rfds = {}
self._wfds = {}
_repr = 'Poller()'
@property
def readers(self):
return list(self._rfds.items())
@property
def writers(self):
return list(self._wfds.items())
def __repr__(self):
return self._repr
def close(self):
pass
def start_receive(self, fd, data=None):
self._rfds[fd] = data or fd
def stop_receive(self, fd):
self._rfds.pop(fd, None)
def start_transmit(self, fd, data=None):
self._wfds[fd] = data or fd
def stop_transmit(self, fd):
self._wfds.pop(fd, None)
def poll(self, timeout=None):
_vv and IOLOG.debug('%r.poll(%r)', self, timeout)
IOLOG.debug('readers = %r', self._rfds)
IOLOG.debug('writers = %r', self._wfds)
(rfds, wfds, _), _ = io_op(select.select,
self._rfds,
self._wfds,
(), timeout
)
for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
yield self._rfds[fd]
for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
yield self._wfds[fd]
class Broker(object):
poller_class = Poller
_waker = None

@ -654,11 +654,17 @@ POLLER_BY_SYSNAME = {
'FreeBSD': KqueuePoller,
'Linux': EpollPoller,
}
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
os.uname()[0],
mitogen.core.Poller,
)
# For apps that start threads dynamically, it's possible Latch will also get
# very high-numbered wait fds when there are many connections, and so select()
# becomes useless there too. So swap in our favourite poller.
mitogen.core.Latch.poller_class = PREFERRED_POLLER
class TtyLogStream(mitogen.core.BasicStream):
"""

Loading…
Cancel
Save