From e6a107c5aa3ac1200c55f33e86d1e2b521ebf48e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 15 Feb 2018 01:59:49 +0545 Subject: [PATCH] core: replace Queue with Latch On Python 2.x, operations on pthread objects with a timeout set actually cause internal polling. When polling fails to yield a positive result, it quickly backs off to a 50ms loop, which results in a huge amount of latency throughout. Instead, give up using Queue.Queue.get(timeout=...) and replace it with the UNIX self-pipe trick. Knocks another 45% off my.yml in the Ansible examples directory against a local VM. This has the potential to burn a *lot* of file descriptors, but hell, it's not the 1940s any more, RAM is all but infinite. I can live with that. This gets things down to around 75ms per playbook step, still hunting for additional sources of latency. --- ansible_mitogen/strategy/mitogen.py | 1 - mitogen/core.py | 94 ++++++++++++++++------------- mitogen/master.py | 5 +- 3 files changed, 55 insertions(+), 45 deletions(-) diff --git a/ansible_mitogen/strategy/mitogen.py b/ansible_mitogen/strategy/mitogen.py index 7be52f59..8eab8408 100644 --- a/ansible_mitogen/strategy/mitogen.py +++ b/ansible_mitogen/strategy/mitogen.py @@ -52,7 +52,6 @@ class ContextProxyService(mitogen.service.Service): return isinstance(args, dict) def dispatch(self, dct, msg): - print dct.get('via') key = repr(sorted(dct.items())) if key not in self._context_by_id: method = getattr(self.router, dct.pop('method')) diff --git a/mitogen/core.py b/mitogen/core.py index 1ec2bc57..ca796ab2 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -318,28 +318,6 @@ class Sender(object): ) -def _queue_interruptible_get(queue, timeout=None, block=True): - # bool is subclass of int, cannot use isinstance! - assert timeout is None or type(timeout) in (int, long, float) - assert isinstance(block, bool) - - if timeout is not None: - timeout += time.time() - - msg = None - while msg is None and (timeout is None or timeout > time.time()): - try: - msg = queue.get(block, 0.5) - except Queue.Empty: - if not block: - break - - if msg is None: - raise TimeoutError('deadline exceeded.') - - return msg - - class Receiver(object): notify = None raise_channelerror = True @@ -350,6 +328,7 @@ class Receiver(object): self.handle = router.add_handler(self._on_receive, handle, persist, respondent) self._queue = Queue.Queue() + self._latch = Latch() def __repr__(self): return 'Receiver(%r, %r)' % (self.router, self.handle) @@ -358,6 +337,7 @@ class Receiver(object): """Callback from the Stream; appends data to the internal queue.""" IOLOG.debug('%r._on_receive(%r)', self, msg) self._queue.put(msg) + self._latch.wake() if self.notify: self.notify(self) @@ -369,9 +349,9 @@ class Receiver(object): def get(self, timeout=None, block=True): IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) - - msg = _queue_interruptible_get(self._queue, timeout, block=block) - IOLOG.debug('%r.get() got %r', self, msg) + self._latch.wait(timeout=timeout) + msg = self._queue.get() + #IOLOG.debug('%r.get() got %r', self, msg) if msg == _DEAD: raise ChannelError(ChannelError.local_msg) @@ -807,7 +787,44 @@ def _unpickle_context(router, context_id, name): return router.context_class(router, context_id, name) -class Waker(BasicStream): +class Latch(object): + def __init__(self): + rfd, wfd = os.pipe() + set_cloexec(rfd) + set_cloexec(wfd) + self.receive_side = Side(self, rfd) + self.transmit_side = Side(self, wfd) + + def close(self): + self.receive_side.close() + self.transmit_side.close() + + __del__ = close + + def wait(self, timeout=None): + while True: + rfds, _, _ = select.select([self.receive_side], [], [], timeout) + if not rfds: + return False + + try: + self.receive_side.read(1) + except OSError, e: + if e[0] == errno.EWOULDBLOCK: + continue + raise + return False + + def wake(self): + IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) + try: + self.transmit_side.write(' ') + except OSError, e: + if e[0] != errno.EBADF: + raise + + +class Waker(Latch, BasicStream): """ :py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when @@ -816,34 +833,25 @@ class Waker(BasicStream): .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html """ def __init__(self, broker): + super(Waker, self).__init__() self._broker = broker - rfd, wfd = os.pipe() - set_cloexec(rfd) - set_cloexec(wfd) - self.receive_side = Side(self, rfd) - self.transmit_side = Side(self, wfd) def __repr__(self): return 'Waker(%r)' % (self._broker,) + def on_receive(self, broker): + """ + Read a byte from the self-pipe. + """ + self.receive_side.read(256) + def wake(self): """ Write a byte to the self-pipe, causing the IO multiplexer to wake up. Nothing is written if the current thread is the IO multiplexer thread. """ - IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) if threading.currentThread() != self._broker._thread: - try: - self.transmit_side.write(' ') - except OSError, e: - if e[0] != errno.EBADF: - raise - - def on_receive(self, broker): - """ - Read a byte from the self-pipe. - """ - self.receive_side.read(256) + super(Waker, self).wake() class IoLogger(BasicStream): diff --git a/mitogen/master.py b/mitogen/master.py index 9d1f33b2..7905ed7d 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -127,11 +127,13 @@ class Select(object): self._receivers = [] self._oneshot = oneshot self._queue = Queue.Queue() + self._latch = mitogen.core.Latch() for recv in receivers: self.add(recv) def _put(self, value): self._queue.put(value) + self._latch.wake() if self.notify: self.notify(self) @@ -200,7 +202,8 @@ class Select(object): raise SelectError(self.empty_msg) while True: - recv = mitogen.core._queue_interruptible_get(self._queue, timeout) + self._latch.wait() + recv = self._queue.get() try: msg = recv.get(block=False) if self._oneshot: