diff --git a/econtext/core.py b/econtext/core.py index 3506b383..44dc46c5 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -173,15 +173,16 @@ class Message(object): class Channel(object): - def __init__(self, context, handle=None): - self._context = context + def __init__(self, router, dst_id=None, handle=None): + self._router = router self._queue = Queue.Queue() + self._dst_id = dst_id self.handle = handle # Avoid __repr__ crash in add_handler() - self.handle = context.add_handler(self._receive, handle) + self.handle = router.add_handler(self._on_receive, handle) - def _receive(self, msg): + def _on_receive(self, msg): """Callback from the Stream; appends data to the internal queue.""" - IOLOG.debug('%r._receive(%r)', self, msg) + IOLOG.debug('%r._on_receive(%r)', self, msg) self._queue.put(msg) def close(self): @@ -192,7 +193,13 @@ class Channel(object): def put(self, data): """Send `data` to the remote.""" IOLOG.debug('%r.send(%r)', self, data) - self._context.send(self.handle, data) + self._router.send( + Message.pickled( + data, + dst_id=self._dst_id, + handle=self.handle + ) + ) def get(self, timeout=None): """Receive an object, or ``None`` if `timeout` is reached.""" @@ -233,7 +240,7 @@ class Channel(object): return def __repr__(self): - return 'Channel(%r, %r)' % (self._context, self.handle) + return 'Channel(%r, %r)' % (self._router, self.handle) class Importer(object): @@ -580,26 +587,6 @@ class Context(object): self.context_id = context_id self.name = name self.key = key or ('%016x' % random.getrandbits(128)) - #: handle -> (persistent?, func(msg)) - self._handle_map = {} - self._last_handle = itertools.count(1000) - - def add_handler(self, fn, handle=None, persist=True): - """Invoke `fn(msg)` for each Message sent to `handle` from this - context. Unregister after one invocation if `persist` is ``False``. If - `handle` is ``None``, a new handle is allocated and returned.""" - handle = handle or self._last_handle.next() - IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) - self._handle_map[handle] = persist, fn - return handle - - def on_shutdown(self, broker): - """Called during :py:meth:`Broker.shutdown`, informs callbacks - registered with :py:meth:`add_handle_cb` the connection is dead.""" - LOG.debug('%r.on_shutdown(%r)', self, broker) - for handle, (persist, fn) in self._handle_map.iteritems(): - LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) - fn(_DEAD) def on_disconnect(self, broker): LOG.debug('Parent stream is gone, dying.') @@ -619,7 +606,7 @@ class Context(object): raise SystemError('Cannot making blocking call on broker thread') queue = Queue.Queue() - msg.reply_to = self.add_handler(queue.put, persist=False) + msg.reply_to = self.router.add_handler(queue.put, persist=False) LOG.debug('%r.send_await(%r)', self, msg) self.send(msg) @@ -635,22 +622,6 @@ class Context(object): IOLOG.debug('%r._send_await() -> %r', self, msg) return msg - def _invoke(self, msg): - #IOLOG.debug('%r._invoke(%r)', self, msg) - try: - persist, fn = self._handle_map[msg.handle] - except KeyError: - LOG.error('%r: invalid handle: %r', self, msg) - return - - if not persist: - del self._handle_map[msg.handle] - - try: - fn(msg) - except Exception: - LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) - def __repr__(self): return 'Context(%s, %r)' % (self.context_id, self.name) @@ -743,14 +714,17 @@ class Router(object): defined on our parent context. Router.route() straddles the Broker and user threads, it is save to call from anywhere. """ - parent_context = None - def __init__(self, broker): self.broker = broker #: context ID -> Stream self._stream_by_id = {} #: List of contexts to notify of shutdown. self._context_by_id = {} + self._last_handle = itertools.count(1000) + #: handle -> (persistent?, func(msg)) + self._handle_map = { + ADD_ROUTE: (True, self._on_add_route) + } def __repr__(self): return 'Router(%r)' % (self.broker,) @@ -786,12 +760,43 @@ class Router(object): self._context_by_id[context.context_id] = context self.broker.start_receive(stream) + def add_handler(self, fn, handle=None, persist=True): + """Invoke `fn(msg)` for each Message sent to `handle` from this + context. Unregister after one invocation if `persist` is ``False``. If + `handle` is ``None``, a new handle is allocated and returned.""" + handle = handle or self._last_handle.next() + IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) + self._handle_map[handle] = persist, fn + return handle + + def on_shutdown(self, broker): + """Called during :py:meth:`Broker.shutdown`, informs callbacks + registered with :py:meth:`add_handle_cb` the connection is dead.""" + LOG.debug('%r.on_shutdown(%r)', self, broker) + for handle, (persist, fn) in self._handle_map.iteritems(): + LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) + fn(_DEAD) + + def _invoke(self, msg): + #IOLOG.debug('%r._invoke(%r)', self, msg) + try: + persist, fn = self._handle_map[msg.handle] + except KeyError: + LOG.error('%r: invalid handle: %r', self, msg) + return + + if not persist: + del self._handle_map[msg.handle] + + try: + fn(msg) + except Exception: + LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) + def _route(self, msg): IOLOG.debug('%r._route(%r)', self, msg) - context = self._context_by_id.get(msg.src_id) - if context and msg.dst_id == econtext.context_id: - context._invoke(msg) - return + if msg.dst_id == econtext.context_id: + return self._invoke(msg) stream = self._stream_by_id.get(msg.dst_id) if stream is None: @@ -987,7 +992,7 @@ class ExternalContext(object): else: self.parent = Context(self.router, parent_id, 'parent') - self.channel = Channel(self.master, CALL_FUNCTION) + self.channel = Channel(self.router, handle=CALL_FUNCTION) self.stream = Stream(self.router, parent_id, key) self.stream.name = 'parent' self.stream.accept(in_fd, out_fd) diff --git a/econtext/master.py b/econtext/master.py index 937f4355..9328fd59 100644 --- a/econtext/master.py +++ b/econtext/master.py @@ -124,33 +124,39 @@ def discard_until(fd, s, deadline): class LogForwarder(object): - _log = None - - def __init__(self, context): - self._context = context - context.add_handler(self.forward, econtext.core.FORWARD_LOG) - - def forward(self, msg): - if not self._log: - # Delay initialization so Stream has a chance to set Context's - # default name, if one wasn't otherwise specified. - name = '%s.%s' % (RLOG.name, self._context.name) - self._log = logging.getLogger(name) - if msg != econtext.core._DEAD: - name, level_s, s = msg.data.split('\x00', 2) - self._log.log(int(level_s), '%s: %s', name, s) + def __init__(self, router): + self._router = router + self._cache = {} + router.add_handler(self._on_forward_log, econtext.core.FORWARD_LOG) + + def _on_forward_log(self, msg): + if msg == econtext.core._DEAD: + return + + logger = self._cache.get(msg.src_id) + if logger is None: + context = self._router.context_by_id(msg.src_id) + if context is None: + LOG.error('FORWARD_LOG received from src_id %d', msg.src_id) + return + + name = '%s.%s' % (RLOG.name, context.name) + self._cache[msg.src_id] = logger = logging.getLogger(name) + + name, level_s, s = msg.data.split('\x00', 2) + logger.log(int(level_s), '%s: %s', name, s) def __repr__(self): - return 'LogForwarder(%r)' % (self._context,) + return 'LogForwarder(%r)' % (self._router,) class ModuleResponder(object): - def __init__(self, context): - self._context = context - context.add_handler(self.get_module, econtext.core.GET_MODULE) + def __init__(self, router): + self._router = router + router.add_handler(self._on_get_module, econtext.core.GET_MODULE) def __repr__(self): - return 'ModuleResponder(%r)' % (self._context,) + return 'ModuleResponder(%r)' % (self._router,) def _get_module_via_pkgutil(self, fullname): """Attempt to fetch source code via pkgutil. In an ideal world, this @@ -208,7 +214,7 @@ class ModuleResponder(object): _get_module_via_sys_modules, _get_module_via_parent_enumeration] - def get_module(self, msg): + def _on_get_module(self, msg): LOG.debug('%r.get_module(%r)', self, msg) if msg == econtext.core._DEAD: return @@ -235,15 +241,22 @@ class ModuleResponder(object): pkg_present = None compressed = zlib.compress(source) - self._context.send( + self._router.route( econtext.core.Message.pickled( (pkg_present, path, compressed), - handle=msg.reply_to + dst_id=msg.src_id, + handle=msg.reply_to, ) ) except Exception: LOG.debug('While importing %r', fullname, exc_info=True) - self._context.send(reply_to, None) + self._router.route( + econtext.core.Message.pickled( + None, + dst_id=msg.src_id, + handle=msg.reply_to, + ) + ) class ModuleForwarder(object): @@ -251,14 +264,14 @@ class ModuleForwarder(object): Respond to GET_MODULE requests in a slave by forwarding the request to our parent context, or satisfying the request from our local Importer cache. """ - def __init__(self, context, parent_context, importer): - self.context = context + def __init__(self, router, parent_context, importer): + self.router = router self.parent_context = parent_context self.importer = importer - context.add_handler(self._on_get_module, econtext.core.GET_MODULE) + router.add_handler(self._on_get_module, econtext.core.GET_MODULE) def __repr__(self): - return 'ModuleForwarder(%r)' % (self.context,) + return 'ModuleForwarder(%r)' % (self.router,) def _on_get_module(self, msg): LOG.debug('%r._on_get_module(%r)', self, msg) @@ -268,10 +281,11 @@ class ModuleForwarder(object): fullname = msg.data cached = self.importer._cache.get(fullname) if cached: - self.context.send( + self.router.route( econtext.core.Message.pickled( - data=cached, - handle=msg.reply_to + cached, + dst_id=msg.src_id, + handle=msg.reply_to, ) ) else: @@ -290,10 +304,11 @@ class ModuleForwarder(object): LOG.debug('%r._on_got_source(%r, %r)', self, msg, original_msg) fullname = original_msg.data self.importer._cache[fullname] = msg.unpickle() - self.context.send( + self.router.route( econtext.core.Message( data=msg.data, - handle=original_msg.reply_to + dst_id=original_msg.src_id, + handle=original_msg.reply_to, ) ) @@ -423,11 +438,6 @@ class Broker(econtext.core.Broker): class Context(econtext.core.Context): via = None - def __init__(self, *args, **kwargs): - super(Context, self).__init__(*args, **kwargs) - self.responder = ModuleResponder(self) - self.log_forwarder = LogForwarder(self) - def on_disconnect(self, broker): pass @@ -499,7 +509,10 @@ class Context(econtext.core.Context): def _proxy_connect(econtext, name, context_id, klass, kwargs): - econtext.router.__class__ = Router # TODO + if not isinstance(econtext.router, Router): # TODO + econtext.router.__class__ = Router # TODO + LOG.debug('_proxy_connect(): constructing ModuleForwarder') + ModuleForwarder(econtext.router, econtext.parent, econtext.importer) context = econtext.router._connect( context_id, @@ -507,9 +520,6 @@ def _proxy_connect(econtext, name, context_id, klass, kwargs): name=name, **kwargs ) - - LOG.debug('_proxy_connect(): constructing ModuleForwarder for %r', context) - ModuleForwarder(context, econtext.parent, econtext.importer) return context.name @@ -518,6 +528,11 @@ class Router(econtext.core.Router): debug = False + def __init__(self, *args, **kwargs): + super(Router, self).__init__(*args, **kwargs) + self.responder = ModuleResponder(self) + self.log_forwarder = LogForwarder(self) + def enable_debug(self): """ Cause this context and any descendant child contexts to write debug @@ -572,20 +587,22 @@ class Router(econtext.core.Router): name = via_context.call_with_deadline(None, True, _proxy_connect, name, context_id, klass, kwargs ) - name = '%s.%s' % (via_context.name, name) + # name = '%s.%s' % (via_context.name, name) context = Context(self, context_id, name=name) context.via = via_context child = via_context parent = via_context.via while parent is not None: - LOG.info('Adding route to %r for %r via %r', parent, context, child) + LOG.debug('Adding route to %r for %r via %r', parent, context, child) parent.send( econtext.core.Message( data='%s\x00%s' % (context_id, child.context_id), handle=econtext.core.ADD_ROUTE, ) ) + child = parent + parent = parent.via self._context_by_id[context.context_id] = context return context