From ea9ef50b3c0056694db56a5fa2f4af73c4b3592e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 20 Jan 2019 13:39:12 +0000 Subject: [PATCH] issue #415: replace default Poller with select.poll() 30% latency reduction for IPC. --- docs/changelog.rst | 5 ++++ mitogen/core.py | 59 ++++++++++++++++++++++++++++++++-------------- mitogen/parent.py | 5 ---- 3 files changed, 46 insertions(+), 23 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 5d1fd9ca..dab5821e 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -153,6 +153,11 @@ Enhancements ``mitogen_host_pinned`` strategy wraps the ``host_pinned`` strategy introduced in Ansible 2.7. +* `#415 <https://github.com/dw/mitogen/issues/415>`_: the interface employed for + in-process queues was changed from Kqueue/epoll() to poll(), which requires + no setup or teardown, yielding a 30% latency reduction for inter-thread + communication. This may manifest as a runtime improvement in many-host runs. + Fixes ^^^^^ diff --git a/mitogen/core.py b/mitogen/core.py index 9bc11277..825c580c 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1690,7 +1690,7 @@ class Poller(object): A poller manages OS file descriptors the user is waiting to become available for IO. The :meth:`poll` method blocks the calling thread until one or more become ready. The default implementation is based on - :func:`select.select`. + :func:`select.poll`. Each descriptor has an associated `data` element, which is unique for each readiness type, and defaults to being the same as the file descriptor. The @@ -1713,6 +1713,15 @@ class Poller(object): Pollers may only be used by one thread at a time. """ + # This changed from select() to poll() in Mitogen 0.2.4. Since poll() has + # no upper FD limit, it is suitable for use with Latch, which must handle + # FDs larger than select's limit during many-host runs. 
We want this + # because poll() requires no setup and teardown: just a single system call, + # which is important because Latch.get() creates a Poller on each + # invocation. In a microbenchmark, poll() vs. epoll_ctl() is 30% faster in + # this scenario. If select() must return in future, it is important + # Latch.poller_class is set from parent.py to point to the industrial + # strength poller for the OS, otherwise Latch will fail randomly. #: Increments on every poll(). Used to version _rfds and _wfds. _generation = 1 @@ -1720,6 +1729,7 @@ class Poller(object): def __init__(self): self._rfds = {} self._wfds = {} + self._pollobj = select.poll() def __repr__(self): return '%s(%#x)' % (type(self).__name__, id(self)) @@ -1746,11 +1756,23 @@ class Poller(object): """ pass + def _update(self, fd): + mask = (((fd in self._rfds) and select.POLLIN) | + ((fd in self._wfds) and select.POLLOUT)) + if mask: + self._pollobj.register(fd, mask) + else: + try: + self._pollobj.unregister(fd) + except KeyError: + pass + def start_receive(self, fd, data=None): """ Cause :meth:`poll` to yield `data` when `fd` is readable. """ self._rfds[fd] = (data or fd, self._generation) + self._update(fd) def stop_receive(self, fd): """ @@ -1760,12 +1782,14 @@ class Poller(object): change in future. """ self._rfds.pop(fd, None) + self._update(fd) def start_transmit(self, fd, data=None): """ Cause :meth:`poll` to yield `data` when `fd` is writeable. """ self._wfds[fd] = (data or fd, self._generation) + self._update(fd) def stop_transmit(self, fd): """ @@ -1775,25 +1799,24 @@ class Poller(object): change in future. 
""" self._wfds.pop(fd, None) + self._update(fd) def _poll(self, timeout): - (rfds, wfds, _), _ = io_op(select.select, - self._rfds, - self._wfds, - (), timeout - ) - - for fd in rfds: - _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) - data, gen = self._rfds.get(fd, (None, None)) - if gen and gen < self._generation: - yield data - - for fd in wfds: - _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) - data, gen = self._wfds.get(fd, (None, None)) - if gen and gen < self._generation: - yield data + if timeout: + timeout *= 1000 + + events, _ = io_op(self._pollobj.poll, timeout) + for fd, event in events: + if event & select.POLLIN: + _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) + data, gen = self._rfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + if event & select.POLLOUT: + _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) + data, gen = self._wfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data def poll(self, timeout=None): """ diff --git a/mitogen/parent.py b/mitogen/parent.py index e247b7fb..fbb488e5 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -942,11 +942,6 @@ PREFERRED_POLLER = POLLER_BY_SYSNAME.get( mitogen.core.Poller, ) -# For apps that start threads dynamically, it's possible Latch will also get -# very high-numbered wait fds when there are many connections, and so select() -# becomes useless there too. So swap in our favourite poller. -mitogen.core.Latch.poller_class = PREFERRED_POLLER - class DiagLogStream(mitogen.core.BasicStream): """