Latch v2: combined queue + one self-pipe-per-thread

Turns out it is far too easy to burn through available file descriptors,
so try something else: self-pipes are per thread, and only temporarily
associated with a Lack that wishes to sleep.

Reduce pointless locking by giving Latch its own queue, and removing
Queue.Queue() use in some places.

Temporarily undo merging of of Waker and Latch, let's do this one step
at a time.
pull/87/head
David Wilson 7 years ago
parent 8121530144
commit a6324aaeb1

@ -813,6 +813,47 @@ on Windows. When that happens, it would be nice if the process model on Windows
and UNIX did not differ, and in fact the code used on both were identical.
Waking Sleeping Threads
#######################
Due to fundamental deficiencies in Python 2's threading implementation, it is
not possible to block waiting on synchronization objects sanely. Two major
problems exist:
* Sleeping with no timeout set causes signals to be blocked, preventing the
user from using CTRL+C to terminate the process.
* Sleeping with a timeout set internally makes use of polling, with an
exponential backoff that eventually results in the thread sleeping
unconditionally in 50ms increments. . This is a huge source of latency that
quickly multiplies.
As the UNIX self-pipe trick must already be employed to wake the broker thread
from its select loop, Mitogen reuses this technique to wake any thread
synchronization primitive exposed by the library, embodied in a queue-like
abstraction called a :py:class:`mitogen.core.Latch`.
Unfortunately it is commonplace for hosts to enforce severe per-process file
descriptors limits, so aside from being inefficient, it is impossible in the
usual case to create a pair of descriptors for every waitable object, which for
example includes the result of every single asynchronous function call.
For this reason self-pipes are created on a per-thread basis, with their
associated :py:func:`socketpairs <socket.socketpair>` kept in thread-local
storage. When a latch wishes to sleep its thread, this pair is created
on-demand and temporarily associated with it only for the duration of the
sleep.
Python's garbage collector is relied on to clean up by calling the pair's
destructor on thread exit. There does not otherwise seem to be a robust method
to trigger cleanup code on arbitrary threads.
To summarize, file descriptor usage is bounded by the number of threads rather
than the number of waitables, which is a much smaller number, however it also
means that Mitogen requires twice as many file descriptors as there are user
threads, with a minimum of 4 required in any configuration.
.. rubric:: Footnotes
.. [#f1] Compression may seem redundant, however it is basically free and reducing IO

@ -64,6 +64,7 @@ SHUTDOWN = 105
LOAD_MODULE = 106
CHUNK_SIZE = 16384
_tls = threading.local()
if __name__ == 'mitogen.core':
@ -327,7 +328,6 @@ class Receiver(object):
self.handle = handle # Avoid __repr__ crash in add_handler()
self.handle = router.add_handler(self._on_receive, handle,
persist, respondent)
self._queue = Queue.Queue()
self._latch = Latch()
def __repr__(self):
@ -336,21 +336,19 @@ class Receiver(object):
def _on_receive(self, msg):
"""Callback from the Stream; appends data to the internal queue."""
IOLOG.debug('%r._on_receive(%r)', self, msg)
self._queue.put(msg)
self._latch.wake()
self._latch.put(msg)
if self.notify:
self.notify(self)
def close(self):
self._queue.put(_DEAD)
self._latch.put(_DEAD)
def empty(self):
return self._queue.empty()
return self._latch.empty()
def get(self, timeout=None, block=True):
IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
self._latch.wait(timeout=timeout)
msg = self._queue.get()
msg = self._latch.get(timeout=timeout, block=block)
#IOLOG.debug('%r.get() got %r', self, msg)
if msg == _DEAD:
@ -399,7 +397,6 @@ class Importer(object):
self._lock = threading.Lock()
# Presence of an entry in this map indicates in-flight GET_MODULE.
self._callbacks = {}
self.tls = threading.local()
router.add_handler(self._on_load_module, LOAD_MODULE)
self._cache = {}
if core_src:
@ -415,10 +412,10 @@ class Importer(object):
return 'Importer()'
def find_module(self, fullname, path=None):
if hasattr(self.tls, 'running'):
if hasattr(_tls, 'running'):
return None
self.tls.running = True
_tls.running = True
fullname = fullname.rstrip('.')
try:
pkgname, _, _ = fullname.rpartition('.')
@ -440,7 +437,7 @@ class Importer(object):
LOG.debug('find_module(%r) returning self', fullname)
return self
finally:
del self.tls.running
del _tls.running
def _load_module_hacks(self, fullname):
if fullname in ('builtins', '__builtin__'):
@ -789,42 +786,67 @@ def _unpickle_context(router, context_id, name):
class Latch(object):
def __init__(self):
rfd, wfd = os.pipe()
set_cloexec(rfd)
set_cloexec(wfd)
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
self.lock = threading.Lock()
self.queue = []
self.wake_socks = []
def close(self):
self.receive_side.close()
self.transmit_side.close()
def _tls_init(self):
if not hasattr(_tls, 'rsock'):
_tls.rsock, _tls.wsock = socket.socketpair()
set_cloexec(_tls.rsock.fileno())
set_cloexec(_tls.wsock.fileno())
def empty(self):
return len(self.queue) == 0
__del__ = close
def get(self, timeout=None, block=True):
self.lock.acquire()
try:
if self.queue:
return self.queue.pop(0)
if not block:
return
self._tls_init()
self.wake_socks.append(_tls.wsock)
finally:
self.lock.release()
def wait(self, timeout=None):
while True:
rfds, _, _ = select.select([self.receive_side], [], [], timeout)
if not rfds:
return False
rfds, _, _ = select.select([_tls.rsock], [], [], timeout)
assert len(rfds) or timeout is None
self.lock.acquire()
try:
self.receive_side.read(1)
except OSError, e:
if e[0] == errno.EWOULDBLOCK:
continue
raise
return False
if _tls.wsock in self.wake_socks:
# Nothing woke us, remove stale entry.
self.wake_socks.remove(_tls.wsock)
return
if _tls.rsock in rfds:
_tls.rsock.recv(1)
return self.queue.pop(0)
finally:
self.lock.release()
def wake(self):
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
def put(self, obj):
IOLOG.debug('%r.put(%r)', self, obj)
self.lock.acquire()
try:
self.transmit_side.write(' ')
self.queue.append(obj)
woken = len(self.wake_socks) > 0
if woken:
self._wake(self.wake_socks.pop(0))
finally:
self.lock.release()
LOG.debug('put() done. woken? %s', woken)
def _wake(self, sock):
try:
os.write(sock.fileno(), '\x00')
except OSError, e:
if e[0] != errno.EBADF:
raise
class Waker(Latch, BasicStream):
class Waker(BasicStream):
"""
:py:class:`BasicStream` subclass implementing the
`UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when
@ -833,8 +855,12 @@ class Waker(Latch, BasicStream):
.. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
"""
def __init__(self, broker):
super(Waker, self).__init__()
self._broker = broker
rfd, wfd = os.pipe()
set_cloexec(rfd)
set_cloexec(wfd)
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
def __repr__(self):
return 'Waker(%r)' % (self._broker,)
@ -850,8 +876,13 @@ class Waker(Latch, BasicStream):
Write a byte to the self-pipe, causing the IO multiplexer to wake up.
Nothing is written if the current thread is the IO multiplexer thread.
"""
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
if threading.currentThread() != self._broker._thread:
super(Waker, self).wake()
try:
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
class IoLogger(BasicStream):

Loading…
Cancel
Save