Get routing working well.

pull/35/head
David Wilson 7 years ago
parent 0a0a060492
commit 0d0c87f910

@ -563,7 +563,8 @@ class Context(object):
"""send `obj` to `handle`, and tell the broker we have output. May """send `obj` to `handle`, and tell the broker we have output. May
be called from any thread.""" be called from any thread."""
msg.dst_id = self.context_id msg.dst_id = self.context_id
msg.src_id = econtext.context_id if msg.src_id is None:
msg.src_id = econtext.context_id
self.router.route(msg) self.router.route(msg)
def send_await(self, msg, deadline=None): def send_await(self, msg, deadline=None):
@ -726,6 +727,7 @@ class Router(object):
context.on_shutdown() context.on_shutdown()
def add_route(self, target_id, via_id): def add_route(self, target_id, via_id):
LOG.info('add_route(%r, %r)', target_id, via_id)
try: try:
self._stream_by_id[target_id] = self._stream_by_id[via_id] self._stream_by_id[target_id] = self._stream_by_id[via_id]
except KeyError: except KeyError:
@ -733,10 +735,12 @@ class Router(object):
self, target_id, via_id) self, target_id, via_id)
def _on_add_route(self, msg): def _on_add_route(self, msg):
target_id, via_id = map(int, msg.data.split('\x00')) if msg != _DEAD:
self.add_route(target_id, via_id) target_id, via_id = map(int, msg.data.split('\x00'))
self.add_route(target_id, via_id)
def register(self, context, stream): def register(self, context, stream):
LOG.debug('register(%r, %r)', context, stream)
self._stream_by_id[context.context_id] = stream self._stream_by_id[context.context_id] = stream
self._context_by_id[context.context_id] = context self._context_by_id[context.context_id] = context
self.broker.start_receive(stream) self.broker.start_receive(stream)
@ -941,7 +945,6 @@ class ExternalContext(object):
os.close(100) os.close(100)
def _setup_logging(self, log_level): def _setup_logging(self, log_level):
return
logging.basicConfig(level=log_level) logging.basicConfig(level=log_level)
root = logging.getLogger() root = logging.getLogger()
root.setLevel(log_level) root.setLevel(log_level)
@ -1011,7 +1014,7 @@ class ExternalContext(object):
self._setup_importer() self._setup_importer()
self._setup_package(context_id) self._setup_package(context_id)
self._setup_stdio() self._setup_stdio()
LOG.debug('Connected to %s', self.context) LOG.debug('Connected to %s; my ID is %r', self.context, context_id)
self.router.register(self.context, self.stream) self.router.register(self.context, self.stream)
self._dispatch_calls() self._dispatch_calls()

@ -331,7 +331,8 @@ class Stream(econtext.core.Stream):
def get_preamble(self): def get_preamble(self):
source = inspect.getsource(econtext.core) source = inspect.getsource(econtext.core)
source += '\nExternalContext().main%r\n' % (( source += '\nExternalContext().main%r\n' % ((
econtext.context_id, # parent_id # econtext.context_id, # parent_id
0,
self.remote_id, # context_id self.remote_id, # context_id
self.key, self.key,
LOG.level or logging.getLogger().level or logging.INFO, LOG.level or logging.getLogger().level or logging.INFO,
@ -367,6 +368,8 @@ class Broker(econtext.core.Broker):
class Context(econtext.core.Context): class Context(econtext.core.Context):
via = None
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(Context, self).__init__(*args, **kwargs) super(Context, self).__init__(*args, **kwargs)
self.responder = ModuleResponder(self) self.responder = ModuleResponder(self)
@ -457,6 +460,20 @@ class Router(econtext.core.Router):
_proxy_connect, name, context_id, klass, kwargs _proxy_connect, name, context_id, klass, kwargs
) )
name = '%s.%s' % (via_context.name, name) name = '%s.%s' % (via_context.name, name)
print ['got name:', name] context = Context(self, context_id, name=name)
self.add_route(context_id, via.context_id) context.via = via_context
return Context(self, context_id, name=name)
child = via_context
parent = via_context.via
while parent is not None:
LOG.info('Adding route to %r for %r via %r', parent, context, child)
parent.send(
econtext.core.Message(
data='%s\x00%s' % (context_id, child.context_id),
handle=econtext.core.ADD_ROUTE,
)
)
self.add_route(context_id, via_context.context_id)
self._context_by_id[context.context_id] = context
return context

Loading…
Cancel
Save