diff --git a/mitogen/core.py b/mitogen/core.py index edc15f68..ad9511cc 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -909,21 +909,21 @@ class Latch(object): closed = False def __init__(self): - self.lock = threading.Lock() - self.queue = [] - self.wake_socks = [] + self._lock = threading.Lock() + self._queue = [] + self._waiters = [] def close(self): - self.lock.acquire() + self._lock.acquire() try: self.closed = True - for wsock in self.wake_socks: + for wsock in self._waiters: self._wake(wsock) finally: - self.lock.release() + self._lock.release() def empty(self): - return len(self.queue) == 0 + return len(self._queue) == 0 def _tls_init(self): if not hasattr(_tls, 'rsock'): @@ -935,54 +935,54 @@ class Latch(object): _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) - self.lock.acquire() + self._lock.acquire() try: if self.closed: raise LatchError() - if self.queue and not self.wake_socks: - _vv and IOLOG.debug('%r.get() -> %r', self, self.queue[0]) - return self.queue.pop(0) + if self._queue and not self._waiters: + _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0]) + return self._queue.pop(0) if not block: raise TimeoutError() self._tls_init() - self.wake_socks.append(_tls.wsock) + self._waiters.append(_tls.wsock) finally: - self.lock.release() + self._lock.release() _vv and IOLOG.debug('%r.get() -> sleeping', self) rfds, _, _ = restart(select.select, [_tls.rsock], [], [], timeout) assert len(rfds) or timeout is not None - self.lock.acquire() + self._lock.acquire() try: - self.wake_socks.remove(_tls.wsock) + self._waiters.remove(_tls.wsock) if self.closed: raise LatchError() if not rfds: raise TimeoutError() assert _tls.rsock.recv(1) == '\x7f' try: - _vv and IOLOG.debug('%r.get() wake -> %r', self, self.queue[0]) - return self.queue.pop(0) + _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0]) + return self._queue.pop(0) except IndexError: IOLOG.exception('%r.get() INDEX ERROR', self) raise finally: - self.lock.release() + self._lock.release() def put(self, obj): _vv and IOLOG.debug('%r.put(%r)', self, obj) - self.lock.acquire() + self._lock.acquire() try: if self.closed: raise LatchError() - self.queue.append(obj) - if self.wake_socks: + self._queue.append(obj) + if self._waiters: _vv and IOLOG.debug('%r.put() -> waking wfd=%r', - self, self.wake_socks[0].fileno()) - self._wake(self.wake_socks[0]) + self, self._waiters[0].fileno()) + self._wake(self._waiters[0]) finally: - self.lock.release() + self._lock.release() def _wake(self, sock): try: @@ -996,7 +996,7 @@ class Latch(object): wsock = getattr(_tls, 'wsock', None) return 'Latch(%#x, size=%d, t=%r, r=%r, w=%r)' % ( id(self), - len(self.queue), + len(self._queue), threading.currentThread().name, rsock and rsock.fileno(), wsock and wsock.fileno(),