diff --git a/mitogen/master.py b/mitogen/master.py index 9665377f..cce9a292 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -538,11 +538,41 @@ class ModuleResponder(object): self._cache[fullname] = tup return tup - def _send_load_module(self, stream, msg, fullname): - LOG.debug('_send_load_module(%r, %r)', stream, fullname) - msg.reply(self._build_tuple(fullname), - handle=mitogen.core.LOAD_MODULE) - stream.sent_modules.add(fullname) + def _send_load_module(self, stream, fullname): + if fullname not in stream.sent_modules: + LOG.debug('_send_load_module(%r, %r)', stream, fullname) + self._router._async_route( + mitogen.core.Message.pickled( + self._build_tuple(fullname), + dst_id=stream.remote_id, + handle=mitogen.core.LOAD_MODULE, + ) + ) + stream.sent_modules.add(fullname) + + def _send_module_load_failed(self, stream, fullname): + stream.send( + mitogen.core.Message.pickled( + (fullname, None, None, None, ()), + dst_id=stream.remote_id, + handle=mitogen.core.LOAD_MODULE, + ) + ) + + def _send_module_and_related(self, stream, fullname): + try: + tup = self._build_tuple(fullname) + for name in tup[4]: # related + parent, _, _ = name.partition('.') + if parent != fullname and parent not in stream.sent_modules: + # Parent hasn't been sent, so don't load submodule yet. + continue + + self._send_load_module(stream, name) + self._send_load_module(stream, fullname) + except Exception: + LOG.debug('While importing %r', fullname, exc_info=True) + self._send_module_load_failed(stream, fullname) def _on_get_module(self, msg): if msg.is_dead: @@ -555,25 +585,31 @@ class ModuleResponder(object): LOG.warning('_on_get_module(): dup request for %r from %r', fullname, stream) - try: - tup = self._build_tuple(fullname) - for name in tup[4]: # related - parent, _, _ = name.partition('.') - if parent != fullname and parent not in stream.sent_modules: - # Parent hasn't been sent, so don't load submodule yet. - continue + self._send_module_and_related(stream, fullname) - if name in stream.sent_modules: - # Submodule has been sent already, skip. - continue + def _send_forward_module(self, stream, context, fullname): + if stream.remote_id != context.context_id: + stream.send( + mitogen.core.Message( + data='%s\x00%s' % (context.context_id, fullname), + handle=mitogen.core.FORWARD_MODULE, + ) + ) - self._send_load_module(stream, msg, name) - self._send_load_module(stream, msg, fullname) + def _forward_module(self, context, fullname): + LOG.debug('%r._forward_module(%r, %r)', self, context, fullname) + path = [] + while fullname: + path.append(fullname) + fullname, _, _ = fullname.rpartition('.') - except Exception: - LOG.debug('While importing %r', fullname, exc_info=True) - msg.reply((fullname, None, None, None, ()), - handle=mitogen.core.LOAD_MODULE) + for fullname in reversed(path): + stream = self._router.stream_by_id(context.context_id) + self._send_module_and_related(stream, fullname) + self._send_forward_module(stream, context, fullname) + + def forward_module(self, context, fullname): + self._router.broker.defer(self._forward_module, context, fullname) class Broker(mitogen.core.Broker):