From 22cc1a3689ce8eb9d05ae19ceea496799a5160d9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 25 Mar 2018 11:40:05 +0545 Subject: [PATCH] issue #155: core: refactor Latch to avoid TLS use TLS destructors are not called after fork, therefore we must explicitly track a global list of free file descriptors, and arrange for that list to explicitly be destroyed from fork.py. --- mitogen/core.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index e646157e..e43ecb42 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -899,12 +899,20 @@ def _unpickle_context(router, context_id, name): class Latch(object): closed = False _waking = 0 + _sockets = [] def __init__(self): self._lock = threading.Lock() self._queue = [] self._sleeping = [] + @classmethod + def _on_fork(cls): + while cls._sockets: + rsock, wsock = cls._sockets.pop() + rsock.close() + wsock.close() + def close(self): self._lock.acquire() try: @@ -919,10 +927,12 @@ class Latch(object): return len(self._queue) == 0 def _tls_init(self): - if not hasattr(_tls, 'rsock'): - _tls.rsock, _tls.wsock = socket.socketpair() - set_cloexec(_tls.rsock.fileno()) - set_cloexec(_tls.wsock.fileno()) + if self._sockets: + return self._sockets.pop() + rsock, wsock = socket.socketpair() + set_cloexec(rsock.fileno()) + set_cloexec(wsock.fileno()) + return rsock, wsock def get(self, timeout=None, block=True): _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', @@ -938,30 +948,31 @@ class Latch(object): return self._queue.pop(i) if not block: raise TimeoutError() - self._tls_init() - self._sleeping.append(_tls.wsock) + rsock, wsock = self._tls_init() + self._sleeping.append(wsock) finally: self._lock.release() - return self._get_sleep(timeout, block) + return self._get_sleep(timeout, block, rsock, wsock) - def _get_sleep(self, timeout, block): + def _get_sleep(self, timeout, block, rsock, wsock): _vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)', self, timeout, block) e = None try: - io_op(select.select, [_tls.rsock], [], [], timeout) + io_op(select.select, [rsock], [], [], timeout) except Exception, e: pass self._lock.acquire() try: - i = self._sleeping.index(_tls.wsock) + i = self._sleeping.index(wsock) del self._sleeping[i] + self._sockets.append((rsock, wsock)) if i >= self._waking: raise TimeoutError() self._waking -= 1 - if _tls.rsock.recv(2) != '\x7f': + if rsock.recv(2) != '\x7f': raise LatchError('internal error: received >1 wakeups') if e: raise e