core: replace Queue with Latch

On Python 2.x, operations on pthread objects with a timeout set actually
cause internal polling. When polling fails to yield a positive result,
it quickly backs off to a 50ms loop, which results in a huge amount of
latency throughout.

Instead, give up using Queue.Queue.get(timeout=...) and replace it with
the UNIX self-pipe trick. Knocks another 45% off my.yml in the Ansible
examples directory against a local VM.

This has the potential to burn a *lot* of file descriptors, but hell,
it's not the 1940s any more, RAM is all but infinite. I can live with
that.

This gets things down to around 75ms per playbook step, still hunting
for additional sources of latency.
pull/81/head
David Wilson 7 years ago
parent 1c6e529458
commit 8121530144

@ -52,7 +52,6 @@ class ContextProxyService(mitogen.service.Service):
return isinstance(args, dict) return isinstance(args, dict)
def dispatch(self, dct, msg): def dispatch(self, dct, msg):
print dct.get('via')
key = repr(sorted(dct.items())) key = repr(sorted(dct.items()))
if key not in self._context_by_id: if key not in self._context_by_id:
method = getattr(self.router, dct.pop('method')) method = getattr(self.router, dct.pop('method'))

@ -318,28 +318,6 @@ class Sender(object):
) )
def _queue_interruptible_get(queue, timeout=None, block=True):
# bool is subclass of int, cannot use isinstance!
assert timeout is None or type(timeout) in (int, long, float)
assert isinstance(block, bool)
if timeout is not None:
timeout += time.time()
msg = None
while msg is None and (timeout is None or timeout > time.time()):
try:
msg = queue.get(block, 0.5)
except Queue.Empty:
if not block:
break
if msg is None:
raise TimeoutError('deadline exceeded.')
return msg
class Receiver(object): class Receiver(object):
notify = None notify = None
raise_channelerror = True raise_channelerror = True
@ -350,6 +328,7 @@ class Receiver(object):
self.handle = router.add_handler(self._on_receive, handle, self.handle = router.add_handler(self._on_receive, handle,
persist, respondent) persist, respondent)
self._queue = Queue.Queue() self._queue = Queue.Queue()
self._latch = Latch()
def __repr__(self): def __repr__(self):
return 'Receiver(%r, %r)' % (self.router, self.handle) return 'Receiver(%r, %r)' % (self.router, self.handle)
@ -358,6 +337,7 @@ class Receiver(object):
"""Callback from the Stream; appends data to the internal queue.""" """Callback from the Stream; appends data to the internal queue."""
IOLOG.debug('%r._on_receive(%r)', self, msg) IOLOG.debug('%r._on_receive(%r)', self, msg)
self._queue.put(msg) self._queue.put(msg)
self._latch.wake()
if self.notify: if self.notify:
self.notify(self) self.notify(self)
@ -369,9 +349,9 @@ class Receiver(object):
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
self._latch.wait(timeout=timeout)
msg = _queue_interruptible_get(self._queue, timeout, block=block) msg = self._queue.get()
IOLOG.debug('%r.get() got %r', self, msg) #IOLOG.debug('%r.get() got %r', self, msg)
if msg == _DEAD: if msg == _DEAD:
raise ChannelError(ChannelError.local_msg) raise ChannelError(ChannelError.local_msg)
@ -807,7 +787,44 @@ def _unpickle_context(router, context_id, name):
return router.context_class(router, context_id, name) return router.context_class(router, context_id, name)
class Waker(BasicStream): class Latch(object):
def __init__(self):
rfd, wfd = os.pipe()
set_cloexec(rfd)
set_cloexec(wfd)
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
def close(self):
self.receive_side.close()
self.transmit_side.close()
__del__ = close
def wait(self, timeout=None):
while True:
rfds, _, _ = select.select([self.receive_side], [], [], timeout)
if not rfds:
return False
try:
self.receive_side.read(1)
except OSError, e:
if e[0] == errno.EWOULDBLOCK:
continue
raise
return False
def wake(self):
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
try:
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
class Waker(Latch, BasicStream):
""" """
:py:class:`BasicStream` subclass implementing the :py:class:`BasicStream` subclass implementing the
`UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when `UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when
@ -816,34 +833,25 @@ class Waker(BasicStream):
.. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
""" """
def __init__(self, broker): def __init__(self, broker):
super(Waker, self).__init__()
self._broker = broker self._broker = broker
rfd, wfd = os.pipe()
set_cloexec(rfd)
set_cloexec(wfd)
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
def __repr__(self): def __repr__(self):
return 'Waker(%r)' % (self._broker,) return 'Waker(%r)' % (self._broker,)
def on_receive(self, broker):
"""
Read a byte from the self-pipe.
"""
self.receive_side.read(256)
def wake(self): def wake(self):
""" """
Write a byte to the self-pipe, causing the IO multiplexer to wake up. Write a byte to the self-pipe, causing the IO multiplexer to wake up.
Nothing is written if the current thread is the IO multiplexer thread. Nothing is written if the current thread is the IO multiplexer thread.
""" """
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
if threading.currentThread() != self._broker._thread: if threading.currentThread() != self._broker._thread:
try: super(Waker, self).wake()
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
def on_receive(self, broker):
"""
Read a byte from the self-pipe.
"""
self.receive_side.read(256)
class IoLogger(BasicStream): class IoLogger(BasicStream):

@ -127,11 +127,13 @@ class Select(object):
self._receivers = [] self._receivers = []
self._oneshot = oneshot self._oneshot = oneshot
self._queue = Queue.Queue() self._queue = Queue.Queue()
self._latch = mitogen.core.Latch()
for recv in receivers: for recv in receivers:
self.add(recv) self.add(recv)
def _put(self, value): def _put(self, value):
self._queue.put(value) self._queue.put(value)
self._latch.wake()
if self.notify: if self.notify:
self.notify(self) self.notify(self)
@ -200,7 +202,8 @@ class Select(object):
raise SelectError(self.empty_msg) raise SelectError(self.empty_msg)
while True: while True:
recv = mitogen.core._queue_interruptible_get(self._queue, timeout) self._latch.wait()
recv = self._queue.get()
try: try:
msg = recv.get(block=False) msg = recv.get(block=False)
if self._oneshot: if self._oneshot:

Loading…
Cancel
Save