diff --git a/docs/howitworks.rst b/docs/howitworks.rst
index a9dd503a..c5197b18 100644
--- a/docs/howitworks.rst
+++ b/docs/howitworks.rst
@@ -813,6 +813,47 @@ on Windows.
 When that happens, it would be nice if the process model on Windows and UNIX
 did not differ, and in fact the code used on both were identical.
 
+Waking Sleeping Threads
+#######################
+
+Due to fundamental deficiencies in Python 2's threading implementation, it is
+not possible to block waiting on synchronization objects sanely. Two major
+problems exist:
+
+* Sleeping with no timeout set causes signals to be blocked, preventing the
+  user from using CTRL+C to terminate the process.
+
+* Sleeping with a timeout set internally makes use of polling, with an
+  exponential backoff that eventually results in the thread sleeping
+  unconditionally in 50ms increments (sketched below). This is a huge source
+  of latency that quickly multiplies.
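+
+The second problem is easiest to see in the pure-Python condition variable
+that underlies Python 2's synchronization primitives. Its wait loop is
+roughly the following (a paraphrased sketch of CPython 2.7's
+``Lib/threading.py``, shown only to illustrate the backoff; it is not Mitogen
+code)::
+
+    # Acquire the waiter lock by spinning with exponentially increasing
+    # sleeps, never sleeping longer than 50ms at a time.
+    delay = 0.0005
+    while True:
+        gotit = waiter.acquire(0)       # non-blocking attempt
+        if gotit:
+            break
+        remaining = endtime - _time()
+        if remaining <= 0:              # timeout expired
+            break
+        delay = min(delay * 2, remaining, .05)
+        _sleep(delay)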
+
+As the UNIX self-pipe trick must already be employed to wake the broker thread
+from its select loop, Mitogen reuses this technique to wake any thread
+synchronization primitive exposed by the library, embodied in a queue-like
+abstraction called a :py:class:`mitogen.core.Latch`.
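+
+In use, a latch behaves like a small queue that any thread may sleep on. The
+following is a hypothetical sketch of the interface rather than an excerpt
+from the test suite, and assumes ``threading`` and ``mitogen.core`` are
+already imported::
+
+    latch = mitogen.core.Latch()
+    results = []
+
+    def wait_for_result():
+        # Blocks on this thread's private socketpair until another thread
+        # calls put(), unless an item is already queued.
+        results.append(latch.get())
+
+    t = threading.Thread(target=wait_for_result)
+    t.start()
+    latch.put('a result')   # wakes one sleeping thread, if any are waiting
+    t.join()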
+
+Unfortunately it is commonplace for hosts to enforce severe per-process file
+descriptor limits, so aside from being inefficient, it is impossible in the
+usual case to create a pair of descriptors for every waitable object, which
+for example includes the result of every single asynchronous function call.
+
+For this reason self-pipes are created on a per-thread basis, with their
+associated :py:func:`socketpairs <socket.socketpair>` kept in thread-local
+storage. When a latch wishes to sleep its thread, this pair is created
+on-demand and temporarily associated with the latch only for the duration of
+the sleep.
+
+Python's garbage collector is relied on to clean up by calling the pair's
+destructor on thread exit. There does not otherwise seem to be a robust
+method to trigger cleanup code on arbitrary threads.
+
+To summarize, file descriptor usage is bounded by the number of threads
+rather than the number of waitables, which is a much smaller number; however,
+it also means that Mitogen requires twice as many file descriptors as there
+are user threads, with a minimum of 4 required in any configuration.
+
+
 .. rubric:: Footnotes
 
 .. [#f1] Compression may seem redundant, however it is basically free and reducing IO
diff --git a/mitogen/core.py b/mitogen/core.py
index ca796ab2..964bcd25 100644
--- a/mitogen/core.py
+++ b/mitogen/core.py
@@ -64,6 +64,7 @@
 SHUTDOWN = 105
 LOAD_MODULE = 106
 CHUNK_SIZE = 16384
+_tls = threading.local()
 
 
 if __name__ == 'mitogen.core':
@@ -327,7 +328,6 @@ class Receiver(object):
         self.handle = handle  # Avoid __repr__ crash in add_handler()
         self.handle = router.add_handler(self._on_receive, handle,
                                          persist, respondent)
-        self._queue = Queue.Queue()
         self._latch = Latch()
 
     def __repr__(self):
@@ -336,21 +336,19 @@ class Receiver(object):
     def _on_receive(self, msg):
         """Callback from the Stream; appends data to the internal queue."""
         IOLOG.debug('%r._on_receive(%r)', self, msg)
-        self._queue.put(msg)
-        self._latch.wake()
+        self._latch.put(msg)
         if self.notify:
             self.notify(self)
 
     def close(self):
-        self._queue.put(_DEAD)
+        self._latch.put(_DEAD)
 
     def empty(self):
-        return self._queue.empty()
+        return self._latch.empty()
 
     def get(self, timeout=None, block=True):
         IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
-        self._latch.wait(timeout=timeout)
-        msg = self._queue.get()
+        msg = self._latch.get(timeout=timeout, block=block)
         #IOLOG.debug('%r.get() got %r', self, msg)
 
         if msg == _DEAD:
@@ -399,7 +397,6 @@ class Importer(object):
         self._lock = threading.Lock()
         # Presence of an entry in this map indicates in-flight GET_MODULE.
         self._callbacks = {}
-        self.tls = threading.local()
         router.add_handler(self._on_load_module, LOAD_MODULE)
         self._cache = {}
         if core_src:
@@ -415,10 +412,10 @@ class Importer(object):
         return 'Importer()'
 
     def find_module(self, fullname, path=None):
-        if hasattr(self.tls, 'running'):
+        if hasattr(_tls, 'running'):
             return None
 
-        self.tls.running = True
+        _tls.running = True
         fullname = fullname.rstrip('.')
         try:
             pkgname, _, _ = fullname.rpartition('.')
@@ -440,7 +437,7 @@ class Importer(object):
             LOG.debug('find_module(%r) returning self', fullname)
             return self
         finally:
-            del self.tls.running
+            del _tls.running
 
     def _load_module_hacks(self, fullname):
         if fullname in ('builtins', '__builtin__'):
@@ -789,42 +786,67 @@ def _unpickle_context(router, context_id, name):
 
 class Latch(object):
     def __init__(self):
-        rfd, wfd = os.pipe()
-        set_cloexec(rfd)
-        set_cloexec(wfd)
-        self.receive_side = Side(self, rfd)
-        self.transmit_side = Side(self, wfd)
+        self.lock = threading.Lock()
+        self.queue = []
+        self.wake_socks = []
 
-    def close(self):
-        self.receive_side.close()
-        self.transmit_side.close()
+    def _tls_init(self):
+        if not hasattr(_tls, 'rsock'):
+            _tls.rsock, _tls.wsock = socket.socketpair()
+            set_cloexec(_tls.rsock.fileno())
+            set_cloexec(_tls.wsock.fileno())
 
-    __del__ = close
+    def empty(self):
+        return len(self.queue) == 0
 
-    def wait(self, timeout=None):
-        while True:
-            rfds, _, _ = select.select([self.receive_side], [], [], timeout)
-            if not rfds:
-                return False
+    def get(self, timeout=None, block=True):
+        self.lock.acquire()
+        try:
+            if self.queue:
+                return self.queue.pop(0)
+            if not block:
+                return
+            self._tls_init()
+            self.wake_socks.append(_tls.wsock)
+        finally:
+            self.lock.release()
 
-            try:
-                self.receive_side.read(1)
-            except OSError, e:
-                if e[0] == errno.EWOULDBLOCK:
-                    continue
-                raise
-            return False
+        rfds, _, _ = select.select([_tls.rsock], [], [], timeout)
+        assert len(rfds) or timeout is not None
 
-    def wake(self):
-        IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
+        self.lock.acquire()
+        try:
+            if _tls.wsock in self.wake_socks:
+                # Nothing woke us, remove stale entry.
+                self.wake_socks.remove(_tls.wsock)
+                return
+            if _tls.rsock in rfds:
+                _tls.rsock.recv(1)
+            return self.queue.pop(0)
+        finally:
+            self.lock.release()
+
+    def put(self, obj):
+        IOLOG.debug('%r.put(%r)', self, obj)
+        self.lock.acquire()
+        try:
+            self.queue.append(obj)
+            woken = len(self.wake_socks) > 0
+            if woken:
+                self._wake(self.wake_socks.pop(0))
+        finally:
+            self.lock.release()
+        LOG.debug('put() done. woken? %s', woken)
+
+    def _wake(self, sock):
        try:
-            self.transmit_side.write(' ')
+            os.write(sock.fileno(), '\x00')
         except OSError, e:
             if e[0] != errno.EBADF:
                 raise
 
 
-class Waker(Latch, BasicStream):
+class Waker(BasicStream):
     """
     :py:class:`BasicStream` subclass implementing the
     `UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when
@@ -833,8 +855,12 @@ class Waker(BasicStream):
     .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
     """
     def __init__(self, broker):
-        super(Waker, self).__init__()
         self._broker = broker
+        rfd, wfd = os.pipe()
+        set_cloexec(rfd)
+        set_cloexec(wfd)
+        self.receive_side = Side(self, rfd)
+        self.transmit_side = Side(self, wfd)
 
     def __repr__(self):
         return 'Waker(%r)' % (self._broker,)
@@ -850,8 +876,13 @@ class Waker(BasicStream):
         Write a byte to the self-pipe, causing the IO multiplexer to wake up.
         Nothing is written if the current thread is the IO multiplexer thread.
         """
+        IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
         if threading.currentThread() != self._broker._thread:
-            super(Waker, self).wake()
+            try:
+                self.transmit_side.write(' ')
+            except OSError, e:
+                if e[0] != errno.EBADF:
+                    raise
 
 
 class IoLogger(BasicStream):