|
|
|
@ -1175,6 +1175,21 @@ class Poller(object):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Latch(object):
|
|
|
|
|
"""
|
|
|
|
|
A latch is a :py:class:`Queue.Queue`-like object that supports mutation and
|
|
|
|
|
waiting from multiple threads, however unlike :py:class:`Queue.Queue`,
|
|
|
|
|
waiting threads always remain interruptible, so CTRL+C always succeeds, and
|
|
|
|
|
waits where a timeout is set experience no wake up latency. These
|
|
|
|
|
properties are not possible in combination using the built-in threading
|
|
|
|
|
primitives available in Python 2.x.
|
|
|
|
|
|
|
|
|
|
Latches implement queues using the UNIX self-pipe trick, and a per-thread
|
|
|
|
|
:py:func:`socket.socketpair` that is lazily created the first time any
|
|
|
|
|
latch attempts to sleep on a thread, and dynamically associated with the
|
|
|
|
|
waiting Latch only for duration of the wait.
|
|
|
|
|
|
|
|
|
|
See :ref:`waking-sleeping-threads` for further discussion.
|
|
|
|
|
"""
|
|
|
|
|
poller_class = Poller
|
|
|
|
|
closed = False
|
|
|
|
|
_waking = 0
|
|
|
|
@ -1188,11 +1203,18 @@ class Latch(object):
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def _on_fork(cls):
|
|
|
|
|
"""
|
|
|
|
|
Clean up any files belonging to the parent process after a fork.
|
|
|
|
|
"""
|
|
|
|
|
cls._sockets = []
|
|
|
|
|
while cls._allsockets:
|
|
|
|
|
cls._allsockets.pop().close()
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
"""
|
|
|
|
|
Mark the latch as closed, and cause every sleeping thread to be woken,
|
|
|
|
|
with :py:class:`mitogen.core.LatchError` raised in each thread.
|
|
|
|
|
"""
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
self.closed = True
|
|
|
|
@ -1203,9 +1225,25 @@ class Latch(object):
|
|
|
|
|
self._lock.release()
|
|
|
|
|
|
|
|
|
|
def empty(self):
|
|
|
|
|
"""
|
|
|
|
|
Return :py:data:`True` if calling :py:meth:`get` would block.
|
|
|
|
|
|
|
|
|
|
As with :py:class:`Queue.Queue`, :py:data:`True` may be returned even
|
|
|
|
|
though a subsequent call to :py:meth:`get` will succeed, since a
|
|
|
|
|
message may be posted at any moment between :py:meth:`empty` and
|
|
|
|
|
:py:meth:`get`.
|
|
|
|
|
|
|
|
|
|
As with :py:class:`Queue.Queue`, :py:data:`False` may be returned even
|
|
|
|
|
though a subsequent call to :py:meth:`get` will block, since another
|
|
|
|
|
waiting thread may be woken at any moment between :py:meth:`empty` and
|
|
|
|
|
:py:meth:`get`.
|
|
|
|
|
"""
|
|
|
|
|
return len(self._queue) == 0
|
|
|
|
|
|
|
|
|
|
def _tls_init(self):
|
|
|
|
|
def _get_socketpair(self):
|
|
|
|
|
"""
|
|
|
|
|
Return an unused socketpair, creating one if none exist.
|
|
|
|
|
"""
|
|
|
|
|
# pop() must be atomic, which is true for GIL-equipped interpreters.
|
|
|
|
|
try:
|
|
|
|
|
return self._sockets.pop()
|
|
|
|
@ -1217,6 +1255,25 @@ class Latch(object):
|
|
|
|
|
return rsock, wsock
|
|
|
|
|
|
|
|
|
|
def get(self, timeout=None, block=True):
|
|
|
|
|
"""
|
|
|
|
|
Return the next enqueued object, or sleep waiting for one.
|
|
|
|
|
|
|
|
|
|
:param float timeout:
|
|
|
|
|
If not :py:data:`None`, specifies a timeout in seconds.
|
|
|
|
|
|
|
|
|
|
:param bool block:
|
|
|
|
|
If :py:data:`False`, immediately raise
|
|
|
|
|
:py:class:`mitogen.core.TimeoutError` if the latch is empty.
|
|
|
|
|
|
|
|
|
|
:raises mitogen.core.LatchError:
|
|
|
|
|
:py:meth:`close` has been called, and the object is no longer valid.
|
|
|
|
|
|
|
|
|
|
:raises mitogen.core.TimeoutError:
|
|
|
|
|
Timeout was reached.
|
|
|
|
|
|
|
|
|
|
:returns:
|
|
|
|
|
The de-queued object.
|
|
|
|
|
"""
|
|
|
|
|
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)',
|
|
|
|
|
self, timeout, block)
|
|
|
|
|
self._lock.acquire()
|
|
|
|
@ -1229,7 +1286,7 @@ class Latch(object):
|
|
|
|
|
return self._queue.pop(i)
|
|
|
|
|
if not block:
|
|
|
|
|
raise TimeoutError()
|
|
|
|
|
rsock, wsock = self._tls_init()
|
|
|
|
|
rsock, wsock = self._get_socketpair()
|
|
|
|
|
self._sleeping.append(wsock)
|
|
|
|
|
finally:
|
|
|
|
|
self._lock.release()
|
|
|
|
@ -1242,11 +1299,15 @@ class Latch(object):
|
|
|
|
|
poller.close()
|
|
|
|
|
|
|
|
|
|
def _get_sleep(self, poller, timeout, block, rsock, wsock):
|
|
|
|
|
"""
|
|
|
|
|
When a result is not immediately available, sleep waiting for
|
|
|
|
|
:meth:`put` to write a byte to our socket pair.
|
|
|
|
|
"""
|
|
|
|
|
_vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)',
|
|
|
|
|
self, timeout, block)
|
|
|
|
|
e = None
|
|
|
|
|
try:
|
|
|
|
|
list(poller.poll(timeout))
|
|
|
|
|
l = list(poller.poll(timeout))
|
|
|
|
|
except Exception:
|
|
|
|
|
e = sys.exc_info()[1]
|
|
|
|
|
|
|
|
|
@ -1256,7 +1317,7 @@ class Latch(object):
|
|
|
|
|
del self._sleeping[i]
|
|
|
|
|
self._sockets.append((rsock, wsock))
|
|
|
|
|
if i >= self._waking:
|
|
|
|
|
raise e or TimeoutError()
|
|
|
|
|
raise e or TimeoutError(repr(l))
|
|
|
|
|
self._waking -= 1
|
|
|
|
|
byte = rsock.recv(10)
|
|
|
|
|
if byte != b('\x7f'):
|
|
|
|
@ -1272,6 +1333,13 @@ class Latch(object):
|
|
|
|
|
self._lock.release()
|
|
|
|
|
|
|
|
|
|
def put(self, obj):
|
|
|
|
|
"""
|
|
|
|
|
Enqueue an object, waking the first thread waiting for a result, if one
|
|
|
|
|
exists.
|
|
|
|
|
|
|
|
|
|
:raises mitogen.core.LatchError:
|
|
|
|
|
:py:meth:`close` has been called, and the object is no longer valid.
|
|
|
|
|
"""
|
|
|
|
|
_vv and IOLOG.debug('%r.put(%r)', self, obj)
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|