|
|
@ -1431,6 +1431,29 @@ class Context(mitogen.core.Context):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RouteMonitor(object):
|
|
|
|
class RouteMonitor(object):
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
Generate and respond to :data:`mitogen.core.ADD_ROUTE` and
|
|
|
|
|
|
|
|
:data:`mitogen.core.DEL_ROUTE` messages sent to the local context by
|
|
|
|
|
|
|
|
maintaining a table of available routes, and propagating messages towards
|
|
|
|
|
|
|
|
parents and siblings as appropriate.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
:class:`RouteMonitor` is responsible for generating routing messages for
|
|
|
|
|
|
|
|
directly attached children. It learns of new children via
|
|
|
|
|
|
|
|
:meth:`notice_stream` called by :class:`Router`, and subscribes to their
|
|
|
|
|
|
|
|
``disconnect`` event to learn when they disappear.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
In children, constructing this class overwrites the stub
|
|
|
|
|
|
|
|
:data:`mitogen.core.DEL_ROUTE` handler installed by
|
|
|
|
|
|
|
|
:class:`mitogen.core.ExternalContext`, which is expected behaviour when a
|
|
|
|
|
|
|
|
child is beging upgraded in preparation to become a parent of children of
|
|
|
|
|
|
|
|
its own.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
:param Router router:
|
|
|
|
|
|
|
|
Router to install handlers on.
|
|
|
|
|
|
|
|
:param Context parent:
|
|
|
|
|
|
|
|
:data:`None` in the master process, or reference to the parent context
|
|
|
|
|
|
|
|
we should propagate route updates towards.
|
|
|
|
|
|
|
|
"""
|
|
|
|
def __init__(self, router, parent=None):
|
|
|
|
def __init__(self, router, parent=None):
|
|
|
|
self.router = router
|
|
|
|
self.router = router
|
|
|
|
self.parent = parent
|
|
|
|
self.parent = parent
|
|
|
@ -1451,6 +1474,18 @@ class RouteMonitor(object):
|
|
|
|
self._routes_by_stream = {}
|
|
|
|
self._routes_by_stream = {}
|
|
|
|
|
|
|
|
|
|
|
|
def _send_one(self, stream, handle, target_id, name):
|
|
|
|
def _send_one(self, stream, handle, target_id, name):
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
Compose and send an update message on a stream.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
:param mitogen.core.Stream stream:
|
|
|
|
|
|
|
|
Stream to send it on.
|
|
|
|
|
|
|
|
:param int handle:
|
|
|
|
|
|
|
|
:data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE`
|
|
|
|
|
|
|
|
:param int target_id:
|
|
|
|
|
|
|
|
ID of the connecting or disconnecting context.
|
|
|
|
|
|
|
|
:param str name:
|
|
|
|
|
|
|
|
Context name or :data:`None`.
|
|
|
|
|
|
|
|
"""
|
|
|
|
data = str(target_id)
|
|
|
|
data = str(target_id)
|
|
|
|
if name:
|
|
|
|
if name:
|
|
|
|
data = '%s:%s' % (target_id, mitogen.core.b(name))
|
|
|
|
data = '%s:%s' % (target_id, mitogen.core.b(name))
|
|
|
@ -1462,20 +1497,34 @@ class RouteMonitor(object):
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def _propagate(self, handle, target_id, name=None):
|
|
|
|
def _propagate_up(self, handle, target_id, name=None):
|
|
|
|
if not self.parent:
|
|
|
|
"""
|
|
|
|
# self.parent is None in the master.
|
|
|
|
In a non-master context, propagate an update towards the master.
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
:param int handle:
|
|
|
|
|
|
|
|
:data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE`
|
|
|
|
|
|
|
|
:param int target_id:
|
|
|
|
|
|
|
|
ID of the connecting or disconnecting context.
|
|
|
|
|
|
|
|
:param str name:
|
|
|
|
|
|
|
|
For :data:`mitogen.core.ADD_ROUTE`, the name of the new context
|
|
|
|
|
|
|
|
assigned by its parent. This is used by parents to assign the
|
|
|
|
|
|
|
|
:attr:`mitogen.core.Context.name` attribute.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
if self.parent:
|
|
|
|
stream = self.router.stream_by_id(self.parent.context_id)
|
|
|
|
stream = self.router.stream_by_id(self.parent.context_id)
|
|
|
|
self._send_one(stream, handle, target_id, name)
|
|
|
|
self._send_one(stream, handle, target_id, name)
|
|
|
|
|
|
|
|
|
|
|
|
def _child_propagate(self, handle, target_id):
|
|
|
|
def _propagate_down(self, handle, target_id):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
For DEL_ROUTE, we additionally want to broadcast the message to any
|
|
|
|
For DEL_ROUTE, we additionally want to broadcast the message to any
|
|
|
|
stream that has ever communicated with the disconnecting ID, so
|
|
|
|
stream that has ever communicated with the disconnecting ID, so
|
|
|
|
core.py's :meth:`mitogen.core.Router._on_del_route` can turn the
|
|
|
|
core.py's :meth:`mitogen.core.Router._on_del_route` can turn the
|
|
|
|
message into a disconnect event.
|
|
|
|
message into a disconnect event.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
:param int handle:
|
|
|
|
|
|
|
|
:data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE`
|
|
|
|
|
|
|
|
:param int target_id:
|
|
|
|
|
|
|
|
ID of the connecting or disconnecting context.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
for stream in itervalues(self.router._stream_by_id):
|
|
|
|
for stream in itervalues(self.router._stream_by_id):
|
|
|
|
if target_id in stream.egress_ids:
|
|
|
|
if target_id in stream.egress_ids:
|
|
|
@ -1488,7 +1537,7 @@ class RouteMonitor(object):
|
|
|
|
if/when that child disconnects.
|
|
|
|
if/when that child disconnects.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
self._routes_by_stream[stream] = set([stream.remote_id])
|
|
|
|
self._routes_by_stream[stream] = set([stream.remote_id])
|
|
|
|
self._propagate(mitogen.core.ADD_ROUTE, stream.remote_id,
|
|
|
|
self._propagate_up(mitogen.core.ADD_ROUTE, stream.remote_id,
|
|
|
|
stream.name)
|
|
|
|
stream.name)
|
|
|
|
mitogen.core.listen(
|
|
|
|
mitogen.core.listen(
|
|
|
|
obj=stream,
|
|
|
|
obj=stream,
|
|
|
@ -1513,14 +1562,19 @@ class RouteMonitor(object):
|
|
|
|
LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes)
|
|
|
|
LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes)
|
|
|
|
for target_id in routes:
|
|
|
|
for target_id in routes:
|
|
|
|
self.router.del_route(target_id)
|
|
|
|
self.router.del_route(target_id)
|
|
|
|
self._propagate(mitogen.core.DEL_ROUTE, target_id)
|
|
|
|
self._propagate_up(mitogen.core.DEL_ROUTE, target_id)
|
|
|
|
self._child_propagate(mitogen.core.DEL_ROUTE, target_id)
|
|
|
|
self._propagate_down(mitogen.core.DEL_ROUTE, target_id)
|
|
|
|
|
|
|
|
|
|
|
|
context = self.router.context_by_id(target_id, create=False)
|
|
|
|
context = self.router.context_by_id(target_id, create=False)
|
|
|
|
if context:
|
|
|
|
if context:
|
|
|
|
mitogen.core.fire(context, 'disconnect')
|
|
|
|
mitogen.core.fire(context, 'disconnect')
|
|
|
|
|
|
|
|
|
|
|
|
def _on_add_route(self, msg):
|
|
|
|
def _on_add_route(self, msg):
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
Respond to :data:`mitogen.core.ADD_ROUTE` by validating the source of
|
|
|
|
|
|
|
|
the message, updating the local table, and propagating the message
|
|
|
|
|
|
|
|
upwards.
|
|
|
|
|
|
|
|
"""
|
|
|
|
if msg.is_dead:
|
|
|
|
if msg.is_dead:
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
@ -1539,9 +1593,15 @@ class RouteMonitor(object):
|
|
|
|
LOG.debug('Adding route to %d via %r', target_id, stream)
|
|
|
|
LOG.debug('Adding route to %d via %r', target_id, stream)
|
|
|
|
self._routes_by_stream[stream].add(target_id)
|
|
|
|
self._routes_by_stream[stream].add(target_id)
|
|
|
|
self.router.add_route(target_id, stream)
|
|
|
|
self.router.add_route(target_id, stream)
|
|
|
|
self._propagate(mitogen.core.ADD_ROUTE, target_id, target_name)
|
|
|
|
self._propagate_up(mitogen.core.ADD_ROUTE, target_id, target_name)
|
|
|
|
|
|
|
|
|
|
|
|
def _on_del_route(self, msg):
|
|
|
|
def _on_del_route(self, msg):
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
Respond to :data:`mitogen.core.DEL_ROUTE` by validating the source of
|
|
|
|
|
|
|
|
the message, updating the local table, propagating the message
|
|
|
|
|
|
|
|
upwards, and downwards towards any stream that every had a message
|
|
|
|
|
|
|
|
forwarded from it towards the disconnecting context.
|
|
|
|
|
|
|
|
"""
|
|
|
|
if msg.is_dead:
|
|
|
|
if msg.is_dead:
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
@ -1565,8 +1625,8 @@ class RouteMonitor(object):
|
|
|
|
|
|
|
|
|
|
|
|
self.router.del_route(target_id)
|
|
|
|
self.router.del_route(target_id)
|
|
|
|
if stream.remote_id != mitogen.parent_id:
|
|
|
|
if stream.remote_id != mitogen.parent_id:
|
|
|
|
self._propagate(mitogen.core.DEL_ROUTE, target_id)
|
|
|
|
self._propagate_up(mitogen.core.DEL_ROUTE, target_id)
|
|
|
|
self._child_propagate(mitogen.core.DEL_ROUTE, target_id)
|
|
|
|
self._propagate_down(mitogen.core.DEL_ROUTE, target_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Router(mitogen.core.Router):
|
|
|
|
class Router(mitogen.core.Router):
|
|
|
|