|
|
@ -737,10 +737,6 @@ class Router(object):
|
|
|
|
def __repr__(self):
|
|
|
|
def __repr__(self):
|
|
|
|
return 'Router(%r)' % (self.broker,)
|
|
|
|
return 'Router(%r)' % (self.broker,)
|
|
|
|
|
|
|
|
|
|
|
|
def set_parent(self, context):
|
|
|
|
|
|
|
|
self.parent_context = context
|
|
|
|
|
|
|
|
context.add_handler(self._on_add_route, ADD_ROUTE)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def on_disconnect(self, stream, broker):
|
|
|
|
def on_disconnect(self, stream, broker):
|
|
|
|
"""Invoked by Stream.on_disconnect()."""
|
|
|
|
"""Invoked by Stream.on_disconnect()."""
|
|
|
|
for context in self._context_by_id.itervalues():
|
|
|
|
for context in self._context_by_id.itervalues():
|
|
|
@ -772,15 +768,16 @@ class Router(object):
|
|
|
|
self.broker.start_receive(stream)
|
|
|
|
self.broker.start_receive(stream)
|
|
|
|
|
|
|
|
|
|
|
|
def _route(self, msg):
|
|
|
|
def _route(self, msg):
|
|
|
|
#LOG.debug('%r._route(%r)', self, msg)
|
|
|
|
IOLOG.debug('%r._route(%r)', self, msg)
|
|
|
|
context = self._context_by_id.get(msg.src_id)
|
|
|
|
context = self._context_by_id.get(msg.src_id)
|
|
|
|
if msg.dst_id == econtext.context_id and context is not None:
|
|
|
|
if context and msg.dst_id == econtext.context_id:
|
|
|
|
context._invoke(msg)
|
|
|
|
context._invoke(msg)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
stream = self._stream_by_id.get(msg.dst_id)
|
|
|
|
stream = self._stream_by_id.get(msg.dst_id)
|
|
|
|
if stream is None:
|
|
|
|
if stream is None:
|
|
|
|
LOG.error('%r: no route for %r, my ID is %r', self, msg, econtext.context_id)
|
|
|
|
LOG.error('%r: no route for %r, my ID is %r',
|
|
|
|
|
|
|
|
self, msg, econtext.context_id)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
stream.send(msg)
|
|
|
|
stream.send(msg)
|
|
|
@ -963,10 +960,15 @@ class ExternalContext(object):
|
|
|
|
def _setup_master(self, parent_id, context_id, key):
|
|
|
|
def _setup_master(self, parent_id, context_id, key):
|
|
|
|
self.broker = Broker()
|
|
|
|
self.broker = Broker()
|
|
|
|
self.router = Router(self.broker)
|
|
|
|
self.router = Router(self.broker)
|
|
|
|
self.context = Context(self.router, parent_id, 'master', key=key)
|
|
|
|
self.master = Context(self.router, 0, 'master')
|
|
|
|
self.router.set_parent(self.context)
|
|
|
|
if parent_id == 0:
|
|
|
|
self.channel = Channel(self.context, CALL_FUNCTION)
|
|
|
|
self.parent = self.master
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
self.parent = Context(self.router, parent_id, 'parent')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.channel = Channel(self.master, CALL_FUNCTION)
|
|
|
|
self.stream = Stream(self.router, parent_id, key)
|
|
|
|
self.stream = Stream(self.router, parent_id, key)
|
|
|
|
|
|
|
|
self.stream.name = 'parent'
|
|
|
|
self.stream.accept(100, 1)
|
|
|
|
self.stream.accept(100, 1)
|
|
|
|
|
|
|
|
|
|
|
|
os.wait() # Reap first stage.
|
|
|
|
os.wait() # Reap first stage.
|
|
|
@ -975,7 +977,7 @@ class ExternalContext(object):
|
|
|
|
def _setup_logging(self, debug, log_level):
|
|
|
|
def _setup_logging(self, debug, log_level):
|
|
|
|
root = logging.getLogger()
|
|
|
|
root = logging.getLogger()
|
|
|
|
root.setLevel(log_level)
|
|
|
|
root.setLevel(log_level)
|
|
|
|
root.handlers = [LogHandler(self.context)]
|
|
|
|
root.handlers = [LogHandler(self.master)]
|
|
|
|
if debug:
|
|
|
|
if debug:
|
|
|
|
enable_debug_logging()
|
|
|
|
enable_debug_logging()
|
|
|
|
|
|
|
|
|
|
|
@ -987,7 +989,7 @@ class ExternalContext(object):
|
|
|
|
core_src = '\n'.join(core_src.splitlines()[:-1])
|
|
|
|
core_src = '\n'.join(core_src.splitlines()[:-1])
|
|
|
|
fp.close()
|
|
|
|
fp.close()
|
|
|
|
|
|
|
|
|
|
|
|
self.importer = Importer(self.context, core_src)
|
|
|
|
self.importer = Importer(self.master, core_src)
|
|
|
|
sys.meta_path.append(self.importer)
|
|
|
|
sys.meta_path.append(self.importer)
|
|
|
|
|
|
|
|
|
|
|
|
def _setup_package(self, context_id):
|
|
|
|
def _setup_package(self, context_id):
|
|
|
@ -1030,10 +1032,10 @@ class ExternalContext(object):
|
|
|
|
obj = getattr(obj, klass)
|
|
|
|
obj = getattr(obj, klass)
|
|
|
|
fn = getattr(obj, func)
|
|
|
|
fn = getattr(obj, func)
|
|
|
|
ret = fn(*args, **kwargs)
|
|
|
|
ret = fn(*args, **kwargs)
|
|
|
|
self.context.send(Message.pickled(ret, handle=msg.reply_to))
|
|
|
|
self.master.send(Message.pickled(ret, handle=msg.reply_to))
|
|
|
|
except Exception, e:
|
|
|
|
except Exception, e:
|
|
|
|
e = CallError(str(e))
|
|
|
|
e = CallError(str(e))
|
|
|
|
self.context.send(Message.pickled(e, handle=msg.reply_to))
|
|
|
|
self.master.send(Message.pickled(e, handle=msg.reply_to))
|
|
|
|
|
|
|
|
|
|
|
|
def main(self, parent_id, context_id, key, debug, log_level):
|
|
|
|
def main(self, parent_id, context_id, key, debug, log_level):
|
|
|
|
self._setup_master(parent_id, context_id, key)
|
|
|
|
self._setup_master(parent_id, context_id, key)
|
|
|
@ -1043,10 +1045,12 @@ class ExternalContext(object):
|
|
|
|
self._setup_importer()
|
|
|
|
self._setup_importer()
|
|
|
|
self._setup_package(context_id)
|
|
|
|
self._setup_package(context_id)
|
|
|
|
self._setup_stdio()
|
|
|
|
self._setup_stdio()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.router.register(self.parent, self.stream)
|
|
|
|
|
|
|
|
self.router.register(self.master, self.stream)
|
|
|
|
LOG.debug('Connected to %s; my ID is %r, PID is %r',
|
|
|
|
LOG.debug('Connected to %s; my ID is %r, PID is %r',
|
|
|
|
self.context, context_id, os.getpid())
|
|
|
|
self.parent, context_id, os.getpid())
|
|
|
|
|
|
|
|
|
|
|
|
self.router.register(self.context, self.stream)
|
|
|
|
|
|
|
|
self._dispatch_calls()
|
|
|
|
self._dispatch_calls()
|
|
|
|
LOG.debug('ExternalContext.main() normal exit')
|
|
|
|
LOG.debug('ExternalContext.main() normal exit')
|
|
|
|
except BaseException:
|
|
|
|
except BaseException:
|
|
|
|