issue #156: fix several more races

* Don't need to sleep if queue>sleepers, can just pop the right queue
  element and return it.

* If queue>sleeping and waking==sleeping, no mechanism existed to ensure
  a thread newly added to sleeping would ever be woken. Above change
  fixes that.

* Cannot trust select() return value, scheduler might sleep us
  indefinitely while put() writes a byte.

* Sleeping threads didn't pop FIFO, they popped in whatever order
  scheduler woke them up. Must recover index and use it to pick the pop
  index.
pull/167/head
David Wilson 7 years ago
parent 526b0a514b
commit 20f5d89dfa

@ -859,6 +859,8 @@ threads, with a minimum of 4 required in any configuration.
Latch Internals Latch Internals
~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~
.. currentmodule:: mitogen.core
Attributes: Attributes:
* `lock` :py:class:`threading.Lock`. * `lock` :py:class:`threading.Lock`.
@ -868,15 +870,15 @@ Attributes:
* `waking` integer number of `sleeping` threads in the process of waking up. * `waking` integer number of `sleeping` threads in the process of waking up.
* `closed` boolean defaulting to :py:data:`False`. Every time `lock` * `closed` boolean defaulting to :py:data:`False`. Every time `lock`
is acquired, `closed` must be tested, and if it is :py:data:`True`, is acquired, `closed` must be tested, and if it is :py:data:`True`,
:py:class:`mitogen.core.LatchError` must be thrown. :py:class:`LatchError` must be thrown.
Latch.put() Latch.put()
~~~~~~~~~~~ ~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.put` operates by: :py:meth:`Latch.put` operates by:
1. Acquiring `lock` 1. Acquiring `lock`.
2. Appending the item on to `queue`. 2. Appending the item on to `queue`.
3. If `waking` is less than the length of `sleeping`, write a byte to the 3. If `waking` is less than the length of `sleeping`, write a byte to the
socket at `sleeping[waking]` and increment `waking`. socket at `sleeping[waking]` and increment `waking`.
@ -886,14 +888,13 @@ to when its socket was placed on `sleeping`.
Latch.close() Latch.close()
~~~~~~~~~~~ ~~~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.close` acquires `lock`, sets `closed` to :py:meth:`Latch.close` acquires `lock`, sets `closed` to :py:data:`True`, then
:py:data:`True`, then writes a byte to every `sleeping[waking]` socket, while writes a byte to every `sleeping[waking]` socket, while incrementing `waking`,
incrementing `waking`, until no more unwoken sockets exist. Per above, on until no more unwoken sockets exist. Per above, on waking from sleep, after
waking from sleep, after removing itself from `sleeping`, each sleeping thread removing itself from `sleeping`, each sleeping thread tests if `closed` is
tests if `closed` is :py:data:`True`, and if so throws :py:data:`True`, and if so throws :py:class:`LatchError`.
:py:class:`mitogen.core.LatchError`.
It is necessary to ensure at most one byte is delivered on each socket, even if It is necessary to ensure at most one byte is delivered on each socket, even if
the latch is being torn down, as the sockets outlive the scope of a single the latch is being torn down, as the sockets outlive the scope of a single
@ -904,42 +905,53 @@ unexpected wakeups if future latches sleep on the same thread.
Latch.get() Latch.get()
~~~~~~~~~~~ ~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.get` is far more fiddly, as there are a variety of :py:meth:`Latch.get` is far more intricate, as there are many outcomes to
outcomes to handle. Queue ordering is strictly first-in first-out, and the handle. Queue ordering is strictly first-in first-out, and threads always
first thread to attempt to retrieve an item always receives the first available receive items in the order they are requested, as they become available.
item.
**1. Non-empty, No Waiters, No sleep** **1. Non-empty, No Waiters, No sleep**
On entry `lock` is taken, and if `queue` is non-empty, and `sleeping` is On entry `lock` is taken, and if `queue` is non-empty, and `sleeping` is
empty, it is safe to return `queue`'s first item without blocking. empty, it is safe to return `queue`'s first item without blocking.
**2. Non-empty, Waiters Present, Sleep** **2. Non-empty, Waiters Present, Queue > Waiters, No sleep**
In this case `sleeping` is non-empty, and it is not safe to pop the item When `sleeping` is non-empty but there are more items than sleeping
even though we are holding `lock`, as it would bump the calling thread to threads, it is safe to pop `queue[len(sleeping)]` without blocking.
the front of the line, starving any sleeping thread of their item, since a
race exists between a thread waking from :py:func:`select.select` and **3. Non-empty, Waiters Present, Queue <= Waiters**
re-acquiring `lock`. In this case `sleeping` is non-empty and there are no surplus items. It is
not safe to pop any item even though we are holding `lock`, as it would
starve waking threads of their position in favour of the calling thread,
since scheduling uncertainty exists between a thread waking from
:py:func:`select.select` and re-acquiring `lock`.
This avoids the need for a retry loop for waking threads, and a thread This avoids the need for a retry loop for waking threads, and a thread
being continually re-woken to discover `queue` drained by a thread that being continually re-woken to discover `queue` drained by a thread that
never slept. never slept.
**3. Sleep** **4. Sleep**
Since `queue` was empty, or `sleeping` was non-empty, the thread adds its Since no surplus items existed, the thread adds its socket to `sleeping`
socket to `sleeping` before releasing `lock`, and sleeping in before releasing `lock`, and sleeping in :py:func:`select.select` waiting
:py:func:`select.select` waiting for a write from for timeout, or a write from :py:meth:`Latch.put` or
:py:meth:`mitogen.core.Latch.put`. :py:meth:`Latch.close`.
**4. Wake, Non-empty** **5. Wake, Non-empty**
On wake it re-acquires `lock`, removes itself from `sleeping`, throws On wake `lock` is re-acquired, the socket is removed from `sleeping` after
:py:class:`mitogen.core.TimeoutError` if no byte was written, decrements noting its index, and :py:class:`TimeoutError` is thrown if `waking`
`waking`, then pops and returns the first item in `queue` that is indicates :py:meth:`Latch.put()` nor :py:meth:`Latch.close` have yet to
guaranteed to exist. send a wake byte to that index. The byte is then read off,
:py:class:`LatchError` is thrown if `closed` is :py:data:`True`, otherwise
It is paramount that in every case, if :py:func:`select.select` indicates a the queue item corresponding to the thread's index is popped and returned.
byte was written to the socket, that the byte is read away. The socket is
reused by subsequent latches sleeping on the same thread, and unexpected It is paramount that in every case, if a byte was written to the socket,
wakeups are triggered if extraneous data remains buffered on the socket. that the byte is read away. The socket is reused by subsequent latches
sleeping on the same thread, and unexpected wakeups are triggered if
extraneous data remains buffered on the socket.
It is also necessary to favour the synchronized `waking` variable over the
return value of :py:func:`select.select`, as scheduling uncertainty
introduces a race between the select timing out, and :py:meth:`Latch.put()`
or :py:meth:`Latch.close` writing a wake byte before :py:meth:`Latch.get`
has re-acquired `lock`.
.. rubric:: Footnotes .. rubric:: Footnotes

@ -941,9 +941,10 @@ class Latch(object):
try: try:
if self.closed: if self.closed:
raise LatchError() raise LatchError()
if self._queue and not self._sleeping: i = len(self._sleeping)
_vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0]) if len(self._queue) > i:
return self._queue.pop(0) _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[i])
return self._queue.pop(i)
if not block: if not block:
raise TimeoutError() raise TimeoutError()
self._tls_init() self._tls_init()
@ -952,25 +953,21 @@ class Latch(object):
self._lock.release() self._lock.release()
_vv and IOLOG.debug('%r.get() -> sleeping', self) _vv and IOLOG.debug('%r.get() -> sleeping', self)
rfds, _, _ = restart(select.select, [_tls.rsock], [], [], timeout) restart(select.select, [_tls.rsock], [], [], timeout)
assert len(rfds) or timeout is not None
self._lock.acquire() self._lock.acquire()
try: try:
self._sleeping.remove(_tls.wsock) i = self._sleeping.index(_tls.wsock)
if not rfds: del self._sleeping[i]
self._waking -= 1
if i > self._waking:
raise TimeoutError() raise TimeoutError()
if _tls.rsock.recv(2) != '\x7f': if _tls.rsock.recv(2) != '\x7f':
raise LatchError('internal error: received >1 wakeups') raise LatchError('internal error: received >1 wakeups')
self._waking -= 1
if self.closed: if self.closed:
raise LatchError() raise LatchError()
try: _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[i])
_vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0]) return self._queue.pop(i)
return self._queue.pop(0)
except IndexError:
IOLOG.exception('%r.get() INDEX ERROR', self)
raise
finally: finally:
self._lock.release() self._lock.release()

Loading…
Cancel
Save