diff --git a/mitogen/core.py b/mitogen/core.py index 2b1771a5..2ac9bdbe 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1945,17 +1945,14 @@ class Waker(BasicStream): def on_receive(self, broker): """ - Drain the pipe and fire callbacks. Reading multiple bytes is safe since - new bytes corresponding to future .defer() calls are written only after - .defer() takes _lock: either a byte we read corresponds to something - already on the queue by the time we take _lock, or a byte remains - buffered, causing another wake up, because it was written after we - released _lock. + Drain the pipe and fire callbacks. Since :attr:`_deferred` is + synchronized, :meth:`defer` and :meth:`on_receive` can conspire to + ensure only one byte needs to be pending regardless of queue length. """ _vv and IOLOG.debug('%r.on_receive()', self) - self.receive_side.read(128) self._lock.acquire() try: + self.receive_side.read(1) deferred = self._deferred self._deferred = [] finally: @@ -1969,6 +1966,18 @@ class Waker(BasicStream): func, args, kwargs) self._broker.shutdown() + def _wake(self): + """ + Wake the multiplexer by writing a byte. If Broker is midway through + teardown, the FD may already be closed, so ignore EBADF. + """ + try: + self.transmit_side.write(b(' ')) + except OSError: + e = sys.exc_info()[1] + if e.args[0] != errno.EBADF: + raise + def defer(self, func, *args, **kwargs): if threading.currentThread().ident == self.broker_ident: _vv and IOLOG.debug('%r.defer() [immediate]', self) @@ -1977,20 +1986,12 @@ class Waker(BasicStream): _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) self._lock.acquire() try: + if not self._deferred: + self._wake() self._deferred.append((func, args, kwargs)) finally: self._lock.release() - # Wake the multiplexer by writing a byte. If the broker is in the midst - # of tearing itself down, the waker fd may already have been closed, so - # ignore EBADF here. - try: - self.transmit_side.write(b(' ')) - except OSError: - e = sys.exc_info()[1] - if e.args[0] != errno.EBADF: - raise - class IoLogger(BasicStream): """