diff --git a/mitogen/core.py b/mitogen/core.py index 4cac056c..9bc0a2c8 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -26,7 +26,6 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -import Queue import cPickle import cStringIO import collections @@ -41,6 +40,7 @@ import signal import socket import struct import sys +import thread import threading import time import traceback @@ -1039,14 +1039,19 @@ class Latch(object): class Waker(BasicStream): """ - :py:class:`BasicStream` subclass implementing the - `UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when - some of its state has been changed by another thread. + :py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. + Used to wake the multiplexer when another thread needs to modify its state + (via a cross-thread function call). .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html """ + broker_ident = None + def __init__(self, broker): self._broker = broker + self._lock = threading.Lock() + self._deferred = [] + rfd, wfd = os.pipe() self.receive_side = Side(self, rfd) self.transmit_side = Side(self, wfd) @@ -1058,24 +1063,63 @@ class Waker(BasicStream): self.transmit_side.fd, ) - def on_receive(self, broker): + @property + def keep_alive(self): """ - Read a byte from the self-pipe. + Prevent immediate Broker shutdown while deferred functions remain. """ - self.receive_side.read(256) + self._lock.acquire() + try: + return len(self._deferred) + finally: + self._lock.release() - def wake(self): + def on_receive(self, broker): """ - Write a byte to the self-pipe, causing the IO multiplexer to wake up. - Nothing is written if the current thread is the IO multiplexer thread. + Drain the pipe and fire callbacks. Reading multiple bytes is safe since + new bytes corresponding to future .defer() calls are written only after + .defer() takes _lock: either a byte we read corresponds to something + already on the queue by the time we take _lock, or a byte remains + buffered, causing another wake up, because it was written after we + released _lock. """ - _vv and IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) - if threading.currentThread() != self._broker._thread: + _vv and IOLOG.debug('%r.on_receive()', self) + self.receive_side.read(128) + self._lock.acquire() + try: + deferred = self._deferred + self._deferred = [] + finally: + self._lock.release() + + for func, args, kwargs in deferred: try: - self.transmit_side.write(' ') - except OSError, e: - if e[0] != errno.EBADF: - raise + func(*args, **kwargs) + except Exception: + LOG.exception('defer() crashed: %r(*%r, **%r)', + func, args, kwargs) + self._broker.shutdown() + + def defer(self, func, *args, **kwargs): + if thread.get_ident() == self.broker_ident: + _vv and IOLOG.debug('%r.defer() [immediate]', self) + return func(*args, **kwargs) + + _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) + self._lock.acquire() + try: + self._deferred.append((func, args, kwargs)) + finally: + self._lock.release() + + # Wake the multiplexer by writing a byte. If the broker is in the midst + # of tearing itself down, the waker fd may already have been closed, so + # ignore EBADF here. + try: + self.transmit_side.write(' ') + except OSError, e: + if e[0] != errno.EBADF: + raise class IoLogger(BasicStream): @@ -1242,24 +1286,17 @@ class Broker(object): def __init__(self): self._alive = True - self._queue = Queue.Queue() - self._readers = [] - self._writers = [] self._waker = Waker(self) - self.start_receive(self._waker) + self.defer = self._waker.defer + self._readers = [self._waker.receive_side] + self._writers = [] self._thread = threading.Thread( target=_profile_hook, args=('broker', self._broker_main), name='mitogen-broker' ) self._thread.start() - - def defer(self, func, *args, **kwargs): - if threading.currentThread() == self._thread: - func(*args, **kwargs) - else: - self._queue.put((func, args, kwargs)) - self._waker.wake() + self._waker.broker_ident = self._thread.ident def _list_discard(self, lst, value): try: @@ -1296,19 +1333,8 @@ class Broker(object): LOG.exception('%r crashed', stream) stream.on_disconnect(self) - def _run_defer(self): - while not self._queue.empty(): - func, args, kwargs = self._queue.get() - try: - func(*args, **kwargs) - except Exception: - LOG.exception('defer() crashed: %r(*%r, **%r)', - func, args, kwargs) - self.shutdown() - def _loop_once(self, timeout=None): _vv and IOLOG.debug('%r._loop_once(%r)', self, timeout) - self._run_defer() #IOLOG.debug('readers = %r', self._readers) #IOLOG.debug('writers = %r', self._writers) @@ -1327,15 +1353,13 @@ class Broker(object): self._call(side.stream, side.stream.on_transmit) def keep_alive(self): - return (sum((side.keep_alive for side in self._readers), 0) + - (not self._queue.empty())) + return sum((side.keep_alive for side in self._readers), 0) def _broker_main(self): try: while self._alive: self._loop_once() - self._run_defer() fire(self, 'shutdown') for side in set(self._readers).union(self._writers): @@ -1361,8 +1385,9 @@ class Broker(object): def shutdown(self): _v and LOG.debug('%r.shutdown()', self) - self._alive = False - self._waker.wake() + def _shutdown(): + self._alive = False + self.defer(_shutdown) def join(self): self._thread.join()