Merge remote-tracking branch 'origin/dmw'

- #413
issue260
David Wilson 6 years ago
commit 1064778dfd

@ -303,12 +303,9 @@ class ContextService(mitogen.service.Service):
longer reachable context. This method runs in the Broker thread and longer reachable context. This method runs in the Broker thread and
must not to block. must not to block.
""" """
# TODO: there is a race between creation of a context and disconnection
# of its related stream. An error reply should be sent to any message
# in _latches_by_key below.
self._lock.acquire() self._lock.acquire()
try: try:
LOG.info('Forgetting %r due to stream disconnect', context) LOG.info('%r: Forgetting %r due to stream disconnect', self, context)
self._forget_context_unlocked(context) self._forget_context_unlocked(context)
finally: finally:
self._lock.release() self._lock.release()

@ -201,7 +201,8 @@ Core Library
receivers to wake with :class:`mitogen.core.ChannelError`, even when one receivers to wake with :class:`mitogen.core.ChannelError`, even when one
participant is not a parent of the other. participant is not a parent of the other.
* `#387 <https://github.com/dw/mitogen/issues/387>`_: dead messages include an * `#387 <https://github.com/dw/mitogen/issues/387>`_,
`#413 <https://github.com/dw/mitogen/issues/413>`_: dead messages include an
optional reason in their body. This is used to cause optional reason in their body. This is used to cause
:class:`mitogen.core.ChannelError` to report far more useful diagnostics at :class:`mitogen.core.ChannelError` to report far more useful diagnostics at
the point the error occurs that previously would have been buried in debug the point the error occurs that previously would have been buried in debug
@ -219,7 +220,8 @@ Core Library
* `#406 <https://github.com/dw/mitogen/issues/406>`_: connections could leak * `#406 <https://github.com/dw/mitogen/issues/406>`_: connections could leak
FDs when a child process failed to start. FDs when a child process failed to start.
* `#406 <https://github.com/dw/mitogen/issues/406>`_, * `#288 <https://github.com/dw/mitogen/issues/288>`_,
`#406 <https://github.com/dw/mitogen/issues/406>`_,
`#417 <https://github.com/dw/mitogen/issues/417>`_: connections could leave `#417 <https://github.com/dw/mitogen/issues/417>`_: connections could leave
FD wrapper objects that had not been closed lying around to be closed during FD wrapper objects that had not been closed lying around to be closed during
garbage collection, causing reused FD numbers to be closed at random moments. garbage collection, causing reused FD numbers to be closed at random moments.

@ -2104,8 +2104,8 @@ class Router(object):
def _on_del_route(self, msg): def _on_del_route(self, msg):
""" """
Stub DEL_ROUTE handler; fires 'disconnect' events on the corresponding Stub :data:`DEL_ROUTE` handler; fires 'disconnect' events on the
member of :attr:`_context_by_id`. This handler is replaced by corresponding :attr:`_context_by_id` member. This is replaced by
:class:`mitogen.parent.RouteMonitor` in an upgraded context. :class:`mitogen.parent.RouteMonitor` in an upgraded context.
""" """
LOG.error('%r._on_del_route() %r', self, msg) LOG.error('%r._on_del_route() %r', self, msg)

@ -1558,6 +1558,9 @@ class RouteMonitor(object):
#: stream; used to cleanup routes during disconnection. #: stream; used to cleanup routes during disconnection.
self._routes_by_stream = {} self._routes_by_stream = {}
def __repr__(self):
return 'RouteMonitor()'
def _send_one(self, stream, handle, target_id, name): def _send_one(self, stream, handle, target_id, name):
""" """
Compose and send an update message on a stream. Compose and send an update message on a stream.
@ -1644,7 +1647,8 @@ class RouteMonitor(object):
Respond to disconnection of a local stream by Respond to disconnection of a local stream by
""" """
routes = self._routes_by_stream.pop(stream) routes = self._routes_by_stream.pop(stream)
LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes) LOG.debug('%r: %r is gone; propagating DEL_ROUTE for %r',
self, stream, routes)
for target_id in routes: for target_id in routes:
self.router.del_route(target_id) self.router.del_route(target_id)
self._propagate_up(mitogen.core.DEL_ROUTE, target_id) self._propagate_up(mitogen.core.DEL_ROUTE, target_id)
@ -1692,18 +1696,21 @@ class RouteMonitor(object):
target_id = int(msg.data) target_id = int(msg.data)
registered_stream = self.router.stream_by_id(target_id) registered_stream = self.router.stream_by_id(target_id)
if registered_stream is None:
return
stream = self.router.stream_by_id(msg.auth_id) stream = self.router.stream_by_id(msg.auth_id)
if registered_stream != stream: if registered_stream != stream:
LOG.error('Received DEL_ROUTE for %d from %r, expected %r', LOG.error('%r: received DEL_ROUTE for %d from %r, expected %r',
target_id, stream, registered_stream) self, target_id, stream, registered_stream)
return return
context = self.router.context_by_id(target_id, create=False) context = self.router.context_by_id(target_id, create=False)
if context: if context:
LOG.debug('%r: Firing local disconnect for %r', self, context) LOG.debug('%r: firing local disconnect for %r', self, context)
mitogen.core.fire(context, 'disconnect') mitogen.core.fire(context, 'disconnect')
LOG.debug('Deleting route to %d via %r', target_id, stream) LOG.debug('%r: deleting route to %d via %r', self, target_id, stream)
routes = self._routes_by_stream.get(stream) routes = self._routes_by_stream.get(stream)
if routes: if routes:
routes.discard(target_id) routes.discard(target_id)

Loading…
Cancel
Save