From ee0f21d57f5e4efc51e8f96910b6ee915444c678 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 29 Mar 2018 14:15:43 +0545 Subject: [PATCH] core: remove Queue locking from broker loop. Move defer handling out of Broker and into Waker (where it belongs?). Now the lock must only be taken if Waker was actually woken. Knocks 400-item run_hostname_100_times from 10.62s to 10.05s (-5.3%). --- mitogen/core.py | 111 +++++++++++++++++++++++++++++------------------- 1 file changed, 68 insertions(+), 43 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 4cac056c..9bc0a2c8 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -26,7 +26,6 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -import Queue import cPickle import cStringIO import collections @@ -41,6 +40,7 @@ import signal import socket import struct import sys +import thread import threading import time import traceback @@ -1039,14 +1039,19 @@ class Latch(object): class Waker(BasicStream): """ - :py:class:`BasicStream` subclass implementing the - `UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when - some of its state has been changed by another thread. + :py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. + Used to wake the multiplexer when another thread needs to modify its state + (via a cross-thread function call). .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html """ + broker_ident = None + def __init__(self, broker): self._broker = broker + self._lock = threading.Lock() + self._deferred = [] + rfd, wfd = os.pipe() self.receive_side = Side(self, rfd) self.transmit_side = Side(self, wfd) @@ -1058,24 +1063,63 @@ class Waker(BasicStream): self.transmit_side.fd, ) - def on_receive(self, broker): + @property + def keep_alive(self): """ - Read a byte from the self-pipe. + Prevent immediate Broker shutdown while deferred functions remain. """ - self.receive_side.read(256) + self._lock.acquire() + try: + return len(self._deferred) + finally: + self._lock.release() - def wake(self): + def on_receive(self, broker): """ - Write a byte to the self-pipe, causing the IO multiplexer to wake up. - Nothing is written if the current thread is the IO multiplexer thread. + Drain the pipe and fire callbacks. Reading multiple bytes is safe since + new bytes corresponding to future .defer() calls are written only after + .defer() takes _lock: either a byte we read corresponds to something + already on the queue by the time we take _lock, or a byte remains + buffered, causing another wake up, because it was written after we + released _lock. """ - _vv and IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) - if threading.currentThread() != self._broker._thread: + _vv and IOLOG.debug('%r.on_receive()', self) + self.receive_side.read(128) + self._lock.acquire() + try: + deferred = self._deferred + self._deferred = [] + finally: + self._lock.release() + + for func, args, kwargs in deferred: try: - self.transmit_side.write(' ') - except OSError, e: - if e[0] != errno.EBADF: - raise + func(*args, **kwargs) + except Exception: + LOG.exception('defer() crashed: %r(*%r, **%r)', + func, args, kwargs) + self._broker.shutdown() + + def defer(self, func, *args, **kwargs): + if thread.get_ident() == self.broker_ident: + _vv and IOLOG.debug('%r.defer() [immediate]', self) + return func(*args, **kwargs) + + _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) + self._lock.acquire() + try: + self._deferred.append((func, args, kwargs)) + finally: + self._lock.release() + + # Wake the multiplexer by writing a byte. If the broker is in the midst + # of tearing itself down, the waker fd may already have been closed, so + # ignore EBADF here. + try: + self.transmit_side.write(' ') + except OSError, e: + if e[0] != errno.EBADF: + raise class IoLogger(BasicStream): @@ -1242,24 +1286,17 @@ class Broker(object): def __init__(self): self._alive = True - self._queue = Queue.Queue() - self._readers = [] - self._writers = [] self._waker = Waker(self) - self.start_receive(self._waker) + self.defer = self._waker.defer + self._readers = [self._waker.receive_side] + self._writers = [] self._thread = threading.Thread( target=_profile_hook, args=('broker', self._broker_main), name='mitogen-broker' ) self._thread.start() - - def defer(self, func, *args, **kwargs): - if threading.currentThread() == self._thread: - func(*args, **kwargs) - else: - self._queue.put((func, args, kwargs)) - self._waker.wake() + self._waker.broker_ident = self._thread.ident def _list_discard(self, lst, value): try: @@ -1296,19 +1333,8 @@ class Broker(object): LOG.exception('%r crashed', stream) stream.on_disconnect(self) - def _run_defer(self): - while not self._queue.empty(): - func, args, kwargs = self._queue.get() - try: - func(*args, **kwargs) - except Exception: - LOG.exception('defer() crashed: %r(*%r, **%r)', - func, args, kwargs) - self.shutdown() - def _loop_once(self, timeout=None): _vv and IOLOG.debug('%r._loop_once(%r)', self, timeout) - self._run_defer() #IOLOG.debug('readers = %r', self._readers) #IOLOG.debug('writers = %r', self._writers) @@ -1327,15 +1353,13 @@ class Broker(object): self._call(side.stream, side.stream.on_transmit) def keep_alive(self): - return (sum((side.keep_alive for side in self._readers), 0) + - (not self._queue.empty())) + return sum((side.keep_alive for side in self._readers), 0) def _broker_main(self): try: while self._alive: self._loop_once() - self._run_defer() fire(self, 'shutdown') for side in set(self._readers).union(self._writers): @@ -1361,8 +1385,9 @@ class Broker(object): def shutdown(self): _v and LOG.debug('%r.shutdown()', self) - self._alive = False - self._waker.wake() + def _shutdown(): + self._alive = False + self.defer(_shutdown) def join(self): self._thread.join()