issue #498: wrap Router dict mutations in a lock

issue510
David Wilson 6 years ago
parent 60fe3fd6f5
commit e2478dcb9f

@ -2423,9 +2423,10 @@ class Router(object):
listen(broker, 'exit', self._on_broker_exit)
self._setup_logging()
#: context ID -> Stream
self._write_lock = threading.Lock()
#: context ID -> Stream; must hold _write_lock to edit or iterate
self._stream_by_id = {}
#: List of contexts to notify of shutdown.
#: List of contexts to notify of shutdown; must hold _write_lock
self._context_by_id = {}
self._last_handle = itertools.count(1000)
#: handle -> (persistent?, func(msg))
@ -2456,20 +2457,30 @@ class Router(object):
:class:`mitogen.parent.RouteMonitor` in an upgraded context.
"""
LOG.error('%r._on_del_route() %r', self, msg)
if not msg.is_dead:
target_id_s, _, name = bytes_partition(msg.data, b(':'))
target_id = int(target_id_s, 10)
if target_id not in self._context_by_id:
LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg)
if msg.is_dead:
return
fire(self._context_by_id[target_id], 'disconnect')
target_id_s, _, name = bytes_partition(msg.data, b(':'))
context = self._context_by_id.get(int(target_id_s, 10))
if context:
fire(context, 'disconnect')
else:
LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg)
def _on_stream_disconnect(self, stream):
for context in self._context_by_id.values():
notify = []
self._write_lock.acquire()
try:
for context in list(self._context_by_id.values()):
stream_ = self._stream_by_id.get(context.context_id)
if stream_ is stream:
del self._stream_by_id[context.context_id]
notify.append(context)
finally:
self._write_lock.release()
# Happens outside lock as e.g. RouteMonitor wants the same lock.
for context in notify:
context.on_disconnect()
broker_exit_msg = 'Broker has exitted'
@ -2492,14 +2503,27 @@ class Router(object):
def context_by_id(self, context_id, via_id=None, create=True, name=None):
"""
Messy factory/lookup function to find a context by its ID, or construct
it. In future this will be replaced by a much more sensible interface.
it. This will eventually be replaced by a more sensible interface.
"""
context = self._context_by_id.get(context_id)
if context:
return context
if create and via_id is not None:
via = self.context_by_id(via_id)
else:
via = None
self._write_lock.acquire()
try:
context = self._context_by_id.get(context_id)
if create and not context:
context = self.context_class(self, context_id, name=name)
if via_id is not None:
context.via = self.context_by_id(via_id)
context.via = via
self._context_by_id[context_id] = context
finally:
self._write_lock.release()
return context
def register(self, context, stream):
@ -2509,8 +2533,13 @@ class Router(object):
public while the design has not yet settled.
"""
_v and LOG.debug('register(%r, %r)', context, stream)
self._write_lock.acquire()
try:
self._stream_by_id[context.context_id] = stream
self._context_by_id[context.context_id] = context
finally:
self._write_lock.release()
self.broker.start_receive(stream)
listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream))
@ -2520,8 +2549,10 @@ class Router(object):
`dst_id`. If a specific route for `dst_id` is not known, a reference to
the parent context's stream is returned.
"""
parent = self._stream_by_id.get(mitogen.parent_id)
return self._stream_by_id.get(dst_id, parent)
return (
self._stream_by_id.get(dst_id) or
self._stream_by_id.get(mitogen.parent_id)
)
def del_handler(self, handle):
"""

@ -1688,6 +1688,10 @@ class RouteMonitor(object):
child is beging upgraded in preparation to become a parent of children of
its own.
By virtue of only being active while responding to messages from a handler,
RouteMonitor lives entirely on the broker thread, so its data requires no
locking.
:param Router router:
Router to install handlers on.
:param Context parent:
@ -1697,6 +1701,9 @@ class RouteMonitor(object):
def __init__(self, router, parent=None):
self.router = router
self.parent = parent
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.
self._routes_by_stream = {}
self.router.add_handler(
fn=self._on_add_route,
handle=mitogen.core.ADD_ROUTE,
@ -1711,9 +1718,6 @@ class RouteMonitor(object):
policy=is_immediate_child,
overwrite=True,
)
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.
self._routes_by_stream = {}
def __repr__(self):
return 'RouteMonitor()'
@ -1775,7 +1779,7 @@ class RouteMonitor(object):
:param int target_id:
ID of the connecting or disconnecting context.
"""
for stream in itervalues(self.router._stream_by_id):
for stream in self.router.get_streams():
if target_id in stream.egress_ids:
self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None)
@ -1918,6 +1922,16 @@ class Router(mitogen.core.Router):
stream.detached = True
msg.reply(None)
def get_streams(self):
"""
Return a snapshot of all streams in existence at time of call.
"""
self._write_lock.acquire()
try:
return itervalues(self._stream_by_id)
finally:
self._write_lock.release()
def add_route(self, target_id, stream):
"""
Arrange for messages whose `dst_id` is `target_id` to be forwarded on
@ -1929,11 +1943,12 @@ class Router(mitogen.core.Router):
LOG.debug('%r.add_route(%r, %r)', self, target_id, stream)
assert isinstance(target_id, int)
assert isinstance(stream, Stream)
self._write_lock.acquire()
try:
self._stream_by_id[target_id] = stream
except KeyError:
LOG.error('%r: cant add route to %r via %r: no such stream',
self, target_id, stream)
finally:
self._write_lock.release()
def del_route(self, target_id):
LOG.debug('%r.del_route(%r)', self, target_id)
@ -1942,7 +1957,11 @@ class Router(mitogen.core.Router):
# 'disconnect' event on the appropriate Context instance. In that case,
# we won't a matching _stream_by_id entry for the disappearing route,
# so don't raise an error for a missing key here.
self._write_lock.acquire()
try:
self._stream_by_id.pop(target_id, None)
finally:
self._write_lock.release()
def get_module_blacklist(self):
if mitogen.context_id == 0:
@ -2001,7 +2020,11 @@ class Router(mitogen.core.Router):
name = u'%s.%s' % (via_context.name, resp['name'])
context = self.context_class(self, resp['id'], name=name)
context.via = via_context
self._write_lock.acquire()
try:
self._context_by_id[context.context_id] = context
finally:
self._write_lock.release()
return context
def doas(self, **kwargs):

Loading…
Cancel
Save