diff --git a/mitogen/core.py b/mitogen/core.py index c11d7065..ac4a1bef 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -188,7 +188,7 @@ def is_blacklisted_import(importer, fullname): treated as blacklisted. """ return ((not any(fullname.startswith(s) for s in importer.whitelist)) or - (any(fullname.startswith(s) for s in importer.blacklist))) + (any(fullname.startswith(s) for s in importer.blacklist))) def set_cloexec(fd): @@ -913,30 +913,36 @@ class Latch(object): self.queue = [] self.wake_socks = [] - def _tls_init(self): - if not hasattr(_tls, 'rsock'): - _tls.rsock, _tls.wsock = socket.socketpair() - set_cloexec(_tls.rsock.fileno()) - set_cloexec(_tls.wsock.fileno()) - def close(self): self.lock.acquire() try: self.closed = True - for sock in self.wake_socks: - self._wake(sock) + while self.wake_socks: + self._wake(self.wake_socks.pop()) finally: self.lock.release() def empty(self): return len(self.queue) == 0 + def _tls_init(self): + if not hasattr(_tls, 'rsock'): + _tls.rsock, _tls.wsock = socket.socketpair() + set_cloexec(_tls.rsock.fileno()) + set_cloexec(_tls.wsock.fileno()) + def get(self, timeout=None, block=True): + _vv and IOLOG.debug( + '%r.get(timeout=%r, block=%r)', + self, timeout, block + ) + self.lock.acquire() try: if self.closed: raise LatchError() if self.queue: + _vv and IOLOG.debug('%r.get() -> %r', self, self.queue[0]) return self.queue.pop(0) if not block: raise TimeoutError() @@ -945,6 +951,7 @@ class Latch(object): finally: self.lock.release() + _vv and IOLOG.debug('%r.get() -> sleeping', self) rfds, _, _ = restart(select.select, [_tls.rsock], [], [], timeout) assert len(rfds) or timeout is not None @@ -958,7 +965,8 @@ class Latch(object): raise TimeoutError() assert _tls.rsock in rfds - _tls.rsock.recv(1) + assert _tls.rsock.recv(1) == '\x7f' + _vv and IOLOG.debug('%r.get() wake -> %r', self, self.queue[0]) return self.queue.pop(0) finally: self.lock.release() @@ -979,11 +987,21 @@ class Latch(object): def _wake(self, sock): try: - os.write(sock.fileno(), '\x00') + os.write(sock.fileno(), '\x7f') except OSError, e: if e[0] != errno.EBADF: raise + def __repr__(self): + rsock = getattr(_tls, 'rsock', None) + wsock = getattr(_tls, 'wsock', None) + return 'Latch(%r, t=%r, r=%r, w=%r)' % ( + id(self), + threading.currentThread().name, + rsock and rsock.fileno(), + wsock and wsock.fileno(), + ) + class Waker(BasicStream): """