issue #415: replace default Poller with select.poll()

30% latency reduction for IPC.
issue510
David Wilson 6 years ago
parent 7c33979e25
commit ea9ef50b3c

@ -153,6 +153,11 @@ Enhancements
``mitogen_host_pinned`` strategy wraps the ``host_pinned`` strategy
introduced in Ansible 2.7.
* `#415 <https://github.com/dw/mitogen/issues/415>`_: the interface employed for
in-process queues was changed from Kqueue/epoll() to poll(), which requires
no setup or teardown, yielding a 30% latency reduction for inter-thread
communication. This may manifest as a runtime improvement in many-host runs.
Fixes
^^^^^

@ -1690,7 +1690,7 @@ class Poller(object):
A poller manages OS file descriptors the user is waiting to become
available for IO. The :meth:`poll` method blocks the calling thread
until one or more become ready. The default implementation is based on
:func:`select.select`.
:func:`select.poll`.
Each descriptor has an associated `data` element, which is unique for each
readiness type, and defaults to being the same as the file descriptor. The
@ -1713,6 +1713,15 @@ class Poller(object):
Pollers may only be used by one thread at a time.
"""
# This changed from select() to poll() in Mitogen 0.2.4. Since poll() has
# no upper FD limit, it is suitable for use with Latch, which must handle
# FDs larger than select's limit during many-host runs. We want this
# because poll() requires no setup and teardown: just a single system call,
# which is important because Latch.get() creates a Poller on each
# invocation. In a microbenchmark, poll() vs. epoll_ctl() is 30% faster in
# this scenario. If select() must return in future, it is important
# Latch.poller_class is set from parent.py to point to the industrial
# strength poller for the OS, otherwise Latch will fail randomly.
#: Increments on every poll(). Used to version _rfds and _wfds.
_generation = 1
@ -1720,6 +1729,7 @@ class Poller(object):
def __init__(self):
self._rfds = {}
self._wfds = {}
self._pollobj = select.poll()
def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self))
@ -1746,11 +1756,23 @@ class Poller(object):
"""
pass
def _update(self, fd):
mask = (((fd in self._rfds) and select.POLLIN) |
((fd in self._wfds) and select.POLLOUT))
if mask:
self._pollobj.register(fd, mask)
else:
try:
self._pollobj.unregister(fd)
except KeyError:
pass
def start_receive(self, fd, data=None):
"""
Cause :meth:`poll` to yield `data` when `fd` is readable.
"""
self._rfds[fd] = (data or fd, self._generation)
self._update(fd)
def stop_receive(self, fd):
"""
@ -1760,12 +1782,14 @@ class Poller(object):
change in future.
"""
self._rfds.pop(fd, None)
self._update(fd)
def start_transmit(self, fd, data=None):
"""
Cause :meth:`poll` to yield `data` when `fd` is writeable.
"""
self._wfds[fd] = (data or fd, self._generation)
self._update(fd)
def stop_transmit(self, fd):
"""
@ -1775,25 +1799,24 @@ class Poller(object):
change in future.
"""
self._wfds.pop(fd, None)
self._update(fd)
def _poll(self, timeout):
(rfds, wfds, _), _ = io_op(select.select,
self._rfds,
self._wfds,
(), timeout
)
for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if timeout:
timeout *= 1000
events, _ = io_op(self._pollobj.poll, timeout)
for fd, event in events:
if event & select.POLLIN:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if event & select.POLLOUT:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
def poll(self, timeout=None):
"""

@ -942,11 +942,6 @@ PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
mitogen.core.Poller,
)
# For apps that start threads dynamically, it's possible Latch will also get
# very high-numbered wait fds when there are many connections, and so select()
# becomes useless there too. So swap in our favourite poller.
mitogen.core.Latch.poller_class = PREFERRED_POLLER
class DiagLogStream(mitogen.core.BasicStream):
"""

Loading…
Cancel
Save