master: implement ModuleResponder.forward_module().

pull/262/head
David Wilson 6 years ago
parent a578250bfb
commit f7b368b1fb

@ -538,11 +538,41 @@ class ModuleResponder(object):
self._cache[fullname] = tup
return tup
def _send_load_module(self, stream, msg, fullname):
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
msg.reply(self._build_tuple(fullname),
handle=mitogen.core.LOAD_MODULE)
stream.sent_modules.add(fullname)
def _send_load_module(self, stream, fullname):
if fullname not in stream.sent_modules:
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
self._router._async_route(
mitogen.core.Message.pickled(
self._build_tuple(fullname),
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
)
stream.sent_modules.add(fullname)
def _send_module_load_failed(self, stream, fullname):
stream.send(
mitogen.core.Message.pickled(
(fullname, None, None, None, ()),
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _send_module_and_related(self, stream, fullname):
try:
tup = self._build_tuple(fullname)
for name in tup[4]: # related
parent, _, _ = name.partition('.')
if parent != fullname and parent not in stream.sent_modules:
# Parent hasn't been sent, so don't load submodule yet.
continue
self._send_load_module(stream, name)
self._send_load_module(stream, fullname)
except Exception:
LOG.debug('While importing %r', fullname, exc_info=True)
self._send_module_load_failed(stream, fullname)
def _on_get_module(self, msg):
if msg.is_dead:
@ -555,25 +585,31 @@ class ModuleResponder(object):
LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream)
try:
tup = self._build_tuple(fullname)
for name in tup[4]: # related
parent, _, _ = name.partition('.')
if parent != fullname and parent not in stream.sent_modules:
# Parent hasn't been sent, so don't load submodule yet.
continue
self._send_module_and_related(stream, fullname)
if name in stream.sent_modules:
# Submodule has been sent already, skip.
continue
def _send_forward_module(self, stream, context, fullname):
if stream.remote_id != context.context_id:
stream.send(
mitogen.core.Message(
data='%s\x00%s' % (context.context_id, fullname),
handle=mitogen.core.FORWARD_MODULE,
)
)
self._send_load_module(stream, msg, name)
self._send_load_module(stream, msg, fullname)
def _forward_module(self, context, fullname):
LOG.debug('%r._forward_module(%r, %r)', self, context, fullname)
path = []
while fullname:
path.append(fullname)
fullname, _, _ = fullname.rpartition('.')
except Exception:
LOG.debug('While importing %r', fullname, exc_info=True)
msg.reply((fullname, None, None, None, ()),
handle=mitogen.core.LOAD_MODULE)
for fullname in reversed(path):
stream = self._router.stream_by_id(context.context_id)
self._send_module_and_related(stream, fullname)
self._send_forward_module(stream, context, fullname)
def forward_module(self, context, fullname):
self._router.broker.defer(self._forward_module, context, fullname)
class Broker(mitogen.core.Broker):

Loading…
Cancel
Save