issue #413: paper over harmless duplicate del_route()

Ideally it would only be called once, and in future maybe it can, but
right now we need to cope with these cases:

* Downstream parent notifies us of disconnection (DEL_ROUTE)
* We notify ourself of disconnection
* We notify ourself and so does downstream parent

It's case 3 that causes the error.
issue260
David Wilson 6 years ago
parent fea0fb41fc
commit 76ec4f201c

@ -303,12 +303,9 @@ class ContextService(mitogen.service.Service):
longer reachable context. This method runs in the Broker thread and longer reachable context. This method runs in the Broker thread and
must not to block. must not to block.
""" """
# TODO: there is a race between creation of a context and disconnection
# of its related stream. An error reply should be sent to any message
# in _latches_by_key below.
self._lock.acquire() self._lock.acquire()
try: try:
LOG.info('Forgetting %r due to stream disconnect', context) LOG.info('%r: Forgetting %r due to stream disconnect', self, context)
self._forget_context_unlocked(context) self._forget_context_unlocked(context)
finally: finally:
self._lock.release() self._lock.release()

@ -2104,8 +2104,8 @@ class Router(object):
def _on_del_route(self, msg): def _on_del_route(self, msg):
""" """
Stub DEL_ROUTE handler; fires 'disconnect' events on the corresponding Stub :data:`DEL_ROUTE` handler; fires 'disconnect' events on the
member of :attr:`_context_by_id`. This handler is replaced by corresponding :attr:`_context_by_id` member. This is replaced by
:class:`mitogen.parent.RouteMonitor` in an upgraded context. :class:`mitogen.parent.RouteMonitor` in an upgraded context.
""" """
LOG.error('%r._on_del_route() %r', self, msg) LOG.error('%r._on_del_route() %r', self, msg)

@ -1558,6 +1558,9 @@ class RouteMonitor(object):
#: stream; used to cleanup routes during disconnection. #: stream; used to cleanup routes during disconnection.
self._routes_by_stream = {} self._routes_by_stream = {}
def __repr__(self):
return 'RouteMonitor()'
def _send_one(self, stream, handle, target_id, name): def _send_one(self, stream, handle, target_id, name):
""" """
Compose and send an update message on a stream. Compose and send an update message on a stream.
@ -1644,7 +1647,8 @@ class RouteMonitor(object):
Respond to disconnection of a local stream by Respond to disconnection of a local stream by
""" """
routes = self._routes_by_stream.pop(stream) routes = self._routes_by_stream.pop(stream)
LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes) LOG.debug('%r: %r is gone; propagating DEL_ROUTE for %r',
self, stream, routes)
for target_id in routes: for target_id in routes:
self.router.del_route(target_id) self.router.del_route(target_id)
self._propagate_up(mitogen.core.DEL_ROUTE, target_id) self._propagate_up(mitogen.core.DEL_ROUTE, target_id)
@ -1692,18 +1696,21 @@ class RouteMonitor(object):
target_id = int(msg.data) target_id = int(msg.data)
registered_stream = self.router.stream_by_id(target_id) registered_stream = self.router.stream_by_id(target_id)
if registered_stream is None:
return
stream = self.router.stream_by_id(msg.auth_id) stream = self.router.stream_by_id(msg.auth_id)
if registered_stream != stream: if registered_stream != stream:
LOG.error('Received DEL_ROUTE for %d from %r, expected %r', LOG.error('%r: received DEL_ROUTE for %d from %r, expected %r',
target_id, stream, registered_stream) self, target_id, stream, registered_stream)
return return
context = self.router.context_by_id(target_id, create=False) context = self.router.context_by_id(target_id, create=False)
if context: if context:
LOG.debug('%r: Firing local disconnect for %r', self, context) LOG.debug('%r: firing local disconnect for %r', self, context)
mitogen.core.fire(context, 'disconnect') mitogen.core.fire(context, 'disconnect')
LOG.debug('Deleting route to %d via %r', target_id, stream) LOG.debug('%r: deleting route to %d via %r', self, target_id, stream)
routes = self._routes_by_stream.get(stream) routes = self._routes_by_stream.get(stream)
if routes: if routes:
routes.discard(target_id) routes.discard(target_id)

Loading…
Cancel
Save