core: reduce chance of Latch.read()/write()/close() race.

Previously it was possible for a thread to call Waker.defer() after
Broker had torn its Waker down and the underlying file descriptor had
been reallocated by the OS to some other component.

This manifested as latches of a subsequent test invocation receiving the
waker byte (' ') rather than their expected byte '\x7f'.

This doesn't fix the problem; it just significantly reduces the chance
of it occurring. In future, Side.write()/read()/close() must be
synchronized with a lock.

Previously the problem could be reliably triggered with:

    while :; do
        python tests/call_function_test.py -vf CallFunctionTest.{test_aborted_on_local_broker_shutdown,test_aborted_on_local_context_disconnect}
    done
pull/295/head
David Wilson 6 years ago
parent 3e48f95439
commit ff2f44b046

@ -869,17 +869,23 @@ class Side(object):
def close(self): def close(self):
if not self.closed: if not self.closed:
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
os.close(self.fd)
self.closed = True self.closed = True
os.close(self.fd)
def read(self, n=CHUNK_SIZE): def read(self, n=CHUNK_SIZE):
if self.closed:
# Refuse to touch the handle after closed, it may have been reused
# by another thread. TODO: synchronize read()/write()/close().
return b('')
s, disconnected = io_op(os.read, self.fd, n) s, disconnected = io_op(os.read, self.fd, n)
if disconnected: if disconnected:
return '' return b('')
return s return s
def write(self, s): def write(self, s):
if self.fd is None: if self.closed or self.fd is None:
# Refuse to touch the handle after closed, it may have been reused
# by another thread.
return None return None
written, disconnected = io_op(os.write, self.fd, s) written, disconnected = io_op(os.write, self.fd, s)
@ -1303,11 +1309,13 @@ class Latch(object):
When a result is not immediately available, sleep waiting for When a result is not immediately available, sleep waiting for
:meth:`put` to write a byte to our socket pair. :meth:`put` to write a byte to our socket pair.
""" """
_vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)', _vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)',
self, timeout, block) self, timeout, block, rsock.fileno(),
wsock.fileno())
e = None e = None
woken = None
try: try:
l = list(poller.poll(timeout)) woken = list(poller.poll(timeout))
except Exception: except Exception:
e = sys.exc_info()[1] e = sys.exc_info()[1]
@ -1317,7 +1325,9 @@ class Latch(object):
del self._sleeping[i] del self._sleeping[i]
self._sockets.append((rsock, wsock)) self._sockets.append((rsock, wsock))
if i >= self._waking: if i >= self._waking:
raise e or TimeoutError(repr(l)) recv = rsock.recv(10) if woken else None
s = '%r: woken=%r, recv=%r' % (self, woken, recv)
raise e or TimeoutError(s)
self._waking -= 1 self._waking -= 1
byte = rsock.recv(10) byte = rsock.recv(10)
if byte != b('\x7f'): if byte != b('\x7f'):

Loading…
Cancel
Save