diff --git a/docs/howitworks.rst b/docs/howitworks.rst index ef6b35cf..4710b2f4 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -862,12 +862,11 @@ Latch Internals Attributes: * `lock` – :py:class:`threading.Lock`. -* `queue` – enqueued items. -* `wake_socks` – the write sides of the socketpairs for each currently - sleeping thread. While the lock is held, a non-empty `wake_socks` indicates - not only the presence of sleeping threads, but threads that have recently - woken but have not yet to retrieved their item from `queue`. -* `closed` – a simple boolean defaulting to :py:data:`False`. Every time `lock` +* `queue` – items waiting to be dequeued. +* `sleeping` – write sides of the socketpairs for each sleeping thread, and + threads in the process of waking from sleep. +* `waking` – integer number of `sleeping` threads in the process of waking up. +* `closed` – boolean defaulting to :py:data:`False`. Every time `lock` is acquired, `closed` must be tested, and if it is :py:data:`True`, :py:class:`mitogen.core.LatchError` must be thrown. @@ -875,17 +874,23 @@ Attributes: Latch.put() ~~~~~~~~~~~ -:py:meth:`mitogen.core.Latch.put` operates simply by acquiring `lock`, -appending the item on to `queue`, then if `wake_socks` is non-empty, a byte is -written to the first socket in the list before finally releasing `lock`. +:py:meth:`mitogen.core.Latch.put` operates by: + +1. Acquiring `lock` +2. Appending the item on to `queue`. +3. If `waking` is less than the length of `sleeping`, write a byte to the + socket at `sleeping[waking]` and increment `waking`. + +In this way each thread is woken only once, and receives each element according +to when its socket was placed on `sleeping`. Latch.close() ~~~~~~~~~~~ :py:meth:`mitogen.core.Latch.close` acquires `lock`, sets `closed` to -:py:data:`True`, then writes a byte to every socket in `wake_socks`. As above, -on waking from sleep, after removing itself from `wake_socks`, each sleeping +:py:data:`True`, then writes a byte to every socket in `sleeping`. Per above, +on waking from sleep, after removing itself from `sleeping`, each sleeping thread tests if `closed` is :py:data:`True`, and if so throws :py:class:`mitogen.core.LatchError`. @@ -899,30 +904,31 @@ first thread to attempt to retrieve an item always receives the first available item. **1. Non-empty, No Waiters, No sleep** - On entry `lock` is taken, and if `queue` is non-empty, and `wake_socks` is + On entry `lock` is taken, and if `queue` is non-empty, and `sleeping` is empty, it is safe to return `queue`'s first item without blocking. **2. Non-empty, Waiters Present, Sleep** - In this case `wake_socks` is non-empty, and it is not safe to pop the item + In this case `sleeping` is non-empty, and it is not safe to pop the item even though we are holding `lock`, as it would bump the calling thread to the front of the line, starving any sleeping thread of their item, since a - race exists between a thread waking from :py:func:`select.select` and its - re-acquiring of `lock`. + race exists between a thread waking from :py:func:`select.select` and + re-acquiring `lock`. - This avoids the need for a retry loop for waking threads, and a sleeping - thread being continually re-woken only to discover `queue` drained by a - thread that never slept. + This avoids the need for a retry loop for waking threads, and a thread + being continually re-woken to discover `queue` drained by a thread that + never slept. **3. Sleep** - Since `queue` was empty, or `wake_socks` was non-empty, the thread adds its - socket to `wake_socks` before releasing `lock`, and sleeping in + Since `queue` was empty, or `sleeping` was non-empty, the thread adds its + socket to `sleeping` before releasing `lock`, and sleeping in :py:func:`select.select` waiting for a write from :py:meth:`mitogen.core.Latch.put`. **4. Wake, Non-empty** - On wake it re-acquires `lock`, removes itself from `wake_socks`, throws - :py:class:`mitogen.core.TimeoutError` if no byte was written, otherwise - pops and returns the first item in `queue` that is guaranteed to exist. + On wake it re-acquires `lock`, removes itself from `sleeping`, decrementing + `waking`, throws :py:class:`mitogen.core.TimeoutError` if no byte was + written, otherwise pops and returns the first item in `queue` that is + guaranteed to exist. .. rubric:: Footnotes diff --git a/mitogen/core.py b/mitogen/core.py index 9cd317b6..b9dd1488 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -907,12 +907,12 @@ def _unpickle_context(router, context_id, name): class Latch(object): closed = False + _waking = 0 def __init__(self): self._lock = threading.Lock() self._queue = [] self._sleeping = [] - self._pending = {} def close(self): self._lock.acquire() @@ -940,7 +940,7 @@ class Latch(object): try: if self.closed: raise LatchError() - if self._queue: + if self._queue and not self._sleeping: _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0]) return self._queue.pop(0) if not block: @@ -956,6 +956,8 @@ class Latch(object): self._lock.acquire() try: + self._sleeping.remove(_tls.wsock) + self._waking -= 1 if self.closed: raise LatchError() if not rfds: @@ -963,9 +965,8 @@ class Latch(object): if _tls.rsock.recv(2) != '\x7f': raise LatchError('internal error: received >1 wakeups') try: - obj = self._pending.pop(_tls.wsock) - _vv and IOLOG.debug('%r.get() wake -> %r', self, obj) - return obj + _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0]) + return self._queue.pop(0) except IndexError: IOLOG.exception('%r.get() INDEX ERROR', self) raise @@ -978,14 +979,13 @@ class Latch(object): try: if self.closed: raise LatchError() - if self._sleeping: - sock = self._sleeping.pop(0) - self._pending[sock] = obj + self._queue.append(obj) + if self._waking < len(self._sleeping): + sock = self._sleeping[self._waking] + self._waking += 1 _vv and IOLOG.debug('%r.put() -> waking wfd=%r', self, sock.fileno()) self._wake(sock) - else: - self._queue.append(obj) finally: self._lock.release()