From 526b0a514b323ac8eae597ff4416dafd8765a293 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 20 Mar 2018 13:14:51 +0545 Subject: [PATCH] issue #156: prevent Latch.close() triggering spurious wakeups --- docs/howitworks.rst | 17 ++++++++++++++--- mitogen/core.py | 11 ++++++----- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/docs/howitworks.rst b/docs/howitworks.rst index c49828e5..ac496a67 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -889,11 +889,17 @@ Latch.close() ~~~~~~~~~~~ :py:meth:`mitogen.core.Latch.close` acquires `lock`, sets `closed` to -:py:data:`True`, then writes a byte to every socket in `sleeping`. Per above, -on waking from sleep, after removing itself from `sleeping`, each sleeping -thread tests if `closed` is :py:data:`True`, and if so throws +:py:data:`True`, then writes a byte to every `sleeping[waking]` socket, while +incrementing `waking`, until no more unwoken sockets exist. Per above, on +waking from sleep, after removing itself from `sleeping`, each sleeping thread +tests if `closed` is :py:data:`True`, and if so throws :py:class:`mitogen.core.LatchError`. +It is necessary to ensure at most one byte is delivered on each socket, even if +the latch is being torn down, as the sockets outlive the scope of a single +latch, and must never have extraneous data buffered on them, as this will cause +unexpected wakeups if future latches sleep on the same thread. + Latch.get() ~~~~~~~~~~~ @@ -930,6 +936,11 @@ item. `waking`, then pops and returns the first item in `queue` that is guaranteed to exist. + It is paramount that in every case, if :py:func:`select.select` indicates a + byte was written to the socket, that the byte is read away. The socket is + reused by subsequent latches sleeping on the same thread, and unexpected + wakeups are triggered if extraneous data remains buffered on the socket. + .. rubric:: Footnotes diff --git a/mitogen/core.py b/mitogen/core.py index 7617be26..fbe0d6eb 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -918,8 +918,9 @@ class Latch(object): self._lock.acquire() try: self.closed = True - for wsock in self._sleeping: - self._wake(wsock) + while self._waking < len(self._sleeping): + self._wake(self._sleeping[self._waking]) + self._waking += 1 finally: self._lock.release() @@ -957,13 +958,13 @@ class Latch(object): self._lock.acquire() try: self._sleeping.remove(_tls.wsock) - if self.closed: - raise LatchError() if not rfds: raise TimeoutError() - self._waking -= 1 if _tls.rsock.recv(2) != '\x7f': raise LatchError('internal error: received >1 wakeups') + self._waking -= 1 + if self.closed: + raise LatchError() try: _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0]) return self._queue.pop(0)