|
|
|
@ -909,21 +909,21 @@ class Latch(object):
|
|
|
|
|
closed = False
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.lock = threading.Lock()
|
|
|
|
|
self.queue = []
|
|
|
|
|
self.wake_socks = []
|
|
|
|
|
self._lock = threading.Lock()
|
|
|
|
|
self._queue = []
|
|
|
|
|
self._waiters = []
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
self.lock.acquire()
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
self.closed = True
|
|
|
|
|
for wsock in self.wake_socks:
|
|
|
|
|
for wsock in self._waiters:
|
|
|
|
|
self._wake(wsock)
|
|
|
|
|
finally:
|
|
|
|
|
self.lock.release()
|
|
|
|
|
self._lock.release()
|
|
|
|
|
|
|
|
|
|
def empty(self):
|
|
|
|
|
return len(self.queue) == 0
|
|
|
|
|
return len(self._queue) == 0
|
|
|
|
|
|
|
|
|
|
def _tls_init(self):
|
|
|
|
|
if not hasattr(_tls, 'rsock'):
|
|
|
|
@ -935,54 +935,54 @@ class Latch(object):
|
|
|
|
|
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)',
|
|
|
|
|
self, timeout, block)
|
|
|
|
|
|
|
|
|
|
self.lock.acquire()
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
if self.closed:
|
|
|
|
|
raise LatchError()
|
|
|
|
|
if self.queue and not self.wake_socks:
|
|
|
|
|
_vv and IOLOG.debug('%r.get() -> %r', self, self.queue[0])
|
|
|
|
|
return self.queue.pop(0)
|
|
|
|
|
if self._queue and not self._waiters:
|
|
|
|
|
_vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0])
|
|
|
|
|
return self._queue.pop(0)
|
|
|
|
|
if not block:
|
|
|
|
|
raise TimeoutError()
|
|
|
|
|
self._tls_init()
|
|
|
|
|
self.wake_socks.append(_tls.wsock)
|
|
|
|
|
self._waiters.append(_tls.wsock)
|
|
|
|
|
finally:
|
|
|
|
|
self.lock.release()
|
|
|
|
|
self._lock.release()
|
|
|
|
|
|
|
|
|
|
_vv and IOLOG.debug('%r.get() -> sleeping', self)
|
|
|
|
|
rfds, _, _ = restart(select.select, [_tls.rsock], [], [], timeout)
|
|
|
|
|
assert len(rfds) or timeout is not None
|
|
|
|
|
|
|
|
|
|
self.lock.acquire()
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
self.wake_socks.remove(_tls.wsock)
|
|
|
|
|
self._waiters.remove(_tls.wsock)
|
|
|
|
|
if self.closed:
|
|
|
|
|
raise LatchError()
|
|
|
|
|
if not rfds:
|
|
|
|
|
raise TimeoutError()
|
|
|
|
|
assert _tls.rsock.recv(1) == '\x7f'
|
|
|
|
|
try:
|
|
|
|
|
_vv and IOLOG.debug('%r.get() wake -> %r', self, self.queue[0])
|
|
|
|
|
return self.queue.pop(0)
|
|
|
|
|
_vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0])
|
|
|
|
|
return self._queue.pop(0)
|
|
|
|
|
except IndexError:
|
|
|
|
|
IOLOG.exception('%r.get() INDEX ERROR', self)
|
|
|
|
|
raise
|
|
|
|
|
finally:
|
|
|
|
|
self.lock.release()
|
|
|
|
|
self._lock.release()
|
|
|
|
|
|
|
|
|
|
def put(self, obj):
|
|
|
|
|
_vv and IOLOG.debug('%r.put(%r)', self, obj)
|
|
|
|
|
self.lock.acquire()
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
if self.closed:
|
|
|
|
|
raise LatchError()
|
|
|
|
|
self.queue.append(obj)
|
|
|
|
|
if self.wake_socks:
|
|
|
|
|
self._queue.append(obj)
|
|
|
|
|
if self._waiters:
|
|
|
|
|
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
|
|
|
|
|
self, self.wake_socks[0].fileno())
|
|
|
|
|
self._wake(self.wake_socks[0])
|
|
|
|
|
self, self._waiters[0].fileno())
|
|
|
|
|
self._wake(self._waiters[0])
|
|
|
|
|
finally:
|
|
|
|
|
self.lock.release()
|
|
|
|
|
self._lock.release()
|
|
|
|
|
|
|
|
|
|
def _wake(self, sock):
|
|
|
|
|
try:
|
|
|
|
@ -996,7 +996,7 @@ class Latch(object):
|
|
|
|
|
wsock = getattr(_tls, 'wsock', None)
|
|
|
|
|
return 'Latch(%#x, size=%d, t=%r, r=%r, w=%r)' % (
|
|
|
|
|
id(self),
|
|
|
|
|
len(self.queue),
|
|
|
|
|
len(self._queue),
|
|
|
|
|
threading.currentThread().name,
|
|
|
|
|
rsock and rsock.fileno(),
|
|
|
|
|
wsock and wsock.fileno(),
|
|
|
|
|