From fd5e962cb3feb8acef536f4bf346ce8efa6e0e9d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 31 Aug 2017 18:06:57 +0530 Subject: [PATCH] Handlers no longer depend on src_id Now there is a single handle namespace in each context, indpendent of the source of the message. Update module forwarder etc. to cope with that. This is to support slave contexts communicating without the master's intercession. --- econtext/core.py | 107 ++++++++++++++++++++++++--------------------- econtext/master.py | 105 +++++++++++++++++++++++++------------------- 2 files changed, 117 insertions(+), 95 deletions(-) diff --git a/econtext/core.py b/econtext/core.py index 3506b383..44dc46c5 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -173,15 +173,16 @@ class Message(object): class Channel(object): - def __init__(self, context, handle=None): - self._context = context + def __init__(self, router, dst_id=None, handle=None): + self._router = router self._queue = Queue.Queue() + self._dst_id = dst_id self.handle = handle # Avoid __repr__ crash in add_handler() - self.handle = context.add_handler(self._receive, handle) + self.handle = router.add_handler(self._on_receive, handle) - def _receive(self, msg): + def _on_receive(self, msg): """Callback from the Stream; appends data to the internal queue.""" - IOLOG.debug('%r._receive(%r)', self, msg) + IOLOG.debug('%r._on_receive(%r)', self, msg) self._queue.put(msg) def close(self): @@ -192,7 +193,13 @@ class Channel(object): def put(self, data): """Send `data` to the remote.""" IOLOG.debug('%r.send(%r)', self, data) - self._context.send(self.handle, data) + self._router.send( + Message.pickled( + data, + dst_id=self._dst_id, + handle=self.handle + ) + ) def get(self, timeout=None): """Receive an object, or ``None`` if `timeout` is reached.""" @@ -233,7 +240,7 @@ class Channel(object): return def __repr__(self): - return 'Channel(%r, %r)' % (self._context, self.handle) + return 'Channel(%r, %r)' % (self._router, self.handle) class Importer(object): @@ -580,26 +587,6 @@ class Context(object): self.context_id = context_id self.name = name self.key = key or ('%016x' % random.getrandbits(128)) - #: handle -> (persistent?, func(msg)) - self._handle_map = {} - self._last_handle = itertools.count(1000) - - def add_handler(self, fn, handle=None, persist=True): - """Invoke `fn(msg)` for each Message sent to `handle` from this - context. Unregister after one invocation if `persist` is ``False``. If - `handle` is ``None``, a new handle is allocated and returned.""" - handle = handle or self._last_handle.next() - IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) - self._handle_map[handle] = persist, fn - return handle - - def on_shutdown(self, broker): - """Called during :py:meth:`Broker.shutdown`, informs callbacks - registered with :py:meth:`add_handle_cb` the connection is dead.""" - LOG.debug('%r.on_shutdown(%r)', self, broker) - for handle, (persist, fn) in self._handle_map.iteritems(): - LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) - fn(_DEAD) def on_disconnect(self, broker): LOG.debug('Parent stream is gone, dying.') @@ -619,7 +606,7 @@ class Context(object): raise SystemError('Cannot making blocking call on broker thread') queue = Queue.Queue() - msg.reply_to = self.add_handler(queue.put, persist=False) + msg.reply_to = self.router.add_handler(queue.put, persist=False) LOG.debug('%r.send_await(%r)', self, msg) self.send(msg) @@ -635,22 +622,6 @@ class Context(object): IOLOG.debug('%r._send_await() -> %r', self, msg) return msg - def _invoke(self, msg): - #IOLOG.debug('%r._invoke(%r)', self, msg) - try: - persist, fn = self._handle_map[msg.handle] - except KeyError: - LOG.error('%r: invalid handle: %r', self, msg) - return - - if not persist: - del self._handle_map[msg.handle] - - try: - fn(msg) - except Exception: - LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) - def __repr__(self): return 'Context(%s, %r)' % (self.context_id, self.name) @@ -743,14 +714,17 @@ class Router(object): defined on our parent context. Router.route() straddles the Broker and user threads, it is save to call from anywhere. """ - parent_context = None - def __init__(self, broker): self.broker = broker #: context ID -> Stream self._stream_by_id = {} #: List of contexts to notify of shutdown. self._context_by_id = {} + self._last_handle = itertools.count(1000) + #: handle -> (persistent?, func(msg)) + self._handle_map = { + ADD_ROUTE: (True, self._on_add_route) + } def __repr__(self): return 'Router(%r)' % (self.broker,) @@ -786,12 +760,43 @@ class Router(object): self._context_by_id[context.context_id] = context self.broker.start_receive(stream) + def add_handler(self, fn, handle=None, persist=True): + """Invoke `fn(msg)` for each Message sent to `handle` from this + context. Unregister after one invocation if `persist` is ``False``. If + `handle` is ``None``, a new handle is allocated and returned.""" + handle = handle or self._last_handle.next() + IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) + self._handle_map[handle] = persist, fn + return handle + + def on_shutdown(self, broker): + """Called during :py:meth:`Broker.shutdown`, informs callbacks + registered with :py:meth:`add_handle_cb` the connection is dead.""" + LOG.debug('%r.on_shutdown(%r)', self, broker) + for handle, (persist, fn) in self._handle_map.iteritems(): + LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) + fn(_DEAD) + + def _invoke(self, msg): + #IOLOG.debug('%r._invoke(%r)', self, msg) + try: + persist, fn = self._handle_map[msg.handle] + except KeyError: + LOG.error('%r: invalid handle: %r', self, msg) + return + + if not persist: + del self._handle_map[msg.handle] + + try: + fn(msg) + except Exception: + LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) + def _route(self, msg): IOLOG.debug('%r._route(%r)', self, msg) - context = self._context_by_id.get(msg.src_id) - if context and msg.dst_id == econtext.context_id: - context._invoke(msg) - return + if msg.dst_id == econtext.context_id: + return self._invoke(msg) stream = self._stream_by_id.get(msg.dst_id) if stream is None: @@ -987,7 +992,7 @@ class ExternalContext(object): else: self.parent = Context(self.router, parent_id, 'parent') - self.channel = Channel(self.master, CALL_FUNCTION) + self.channel = Channel(self.router, handle=CALL_FUNCTION) self.stream = Stream(self.router, parent_id, key) self.stream.name = 'parent' self.stream.accept(in_fd, out_fd) diff --git a/econtext/master.py b/econtext/master.py index 937f4355..9328fd59 100644 --- a/econtext/master.py +++ b/econtext/master.py @@ -124,33 +124,39 @@ def discard_until(fd, s, deadline): class LogForwarder(object): - _log = None - - def __init__(self, context): - self._context = context - context.add_handler(self.forward, econtext.core.FORWARD_LOG) - - def forward(self, msg): - if not self._log: - # Delay initialization so Stream has a chance to set Context's - # default name, if one wasn't otherwise specified. - name = '%s.%s' % (RLOG.name, self._context.name) - self._log = logging.getLogger(name) - if msg != econtext.core._DEAD: - name, level_s, s = msg.data.split('\x00', 2) - self._log.log(int(level_s), '%s: %s', name, s) + def __init__(self, router): + self._router = router + self._cache = {} + router.add_handler(self._on_forward_log, econtext.core.FORWARD_LOG) + + def _on_forward_log(self, msg): + if msg == econtext.core._DEAD: + return + + logger = self._cache.get(msg.src_id) + if logger is None: + context = self._router.context_by_id(msg.src_id) + if context is None: + LOG.error('FORWARD_LOG received from src_id %d', msg.src_id) + return + + name = '%s.%s' % (RLOG.name, context.name) + self._cache[msg.src_id] = logger = logging.getLogger(name) + + name, level_s, s = msg.data.split('\x00', 2) + logger.log(int(level_s), '%s: %s', name, s) def __repr__(self): - return 'LogForwarder(%r)' % (self._context,) + return 'LogForwarder(%r)' % (self._router,) class ModuleResponder(object): - def __init__(self, context): - self._context = context - context.add_handler(self.get_module, econtext.core.GET_MODULE) + def __init__(self, router): + self._router = router + router.add_handler(self._on_get_module, econtext.core.GET_MODULE) def __repr__(self): - return 'ModuleResponder(%r)' % (self._context,) + return 'ModuleResponder(%r)' % (self._router,) def _get_module_via_pkgutil(self, fullname): """Attempt to fetch source code via pkgutil. In an ideal world, this @@ -208,7 +214,7 @@ class ModuleResponder(object): _get_module_via_sys_modules, _get_module_via_parent_enumeration] - def get_module(self, msg): + def _on_get_module(self, msg): LOG.debug('%r.get_module(%r)', self, msg) if msg == econtext.core._DEAD: return @@ -235,15 +241,22 @@ class ModuleResponder(object): pkg_present = None compressed = zlib.compress(source) - self._context.send( + self._router.route( econtext.core.Message.pickled( (pkg_present, path, compressed), - handle=msg.reply_to + dst_id=msg.src_id, + handle=msg.reply_to, ) ) except Exception: LOG.debug('While importing %r', fullname, exc_info=True) - self._context.send(reply_to, None) + self._router.route( + econtext.core.Message.pickled( + None, + dst_id=msg.src_id, + handle=msg.reply_to, + ) + ) class ModuleForwarder(object): @@ -251,14 +264,14 @@ class ModuleForwarder(object): Respond to GET_MODULE requests in a slave by forwarding the request to our parent context, or satisfying the request from our local Importer cache. """ - def __init__(self, context, parent_context, importer): - self.context = context + def __init__(self, router, parent_context, importer): + self.router = router self.parent_context = parent_context self.importer = importer - context.add_handler(self._on_get_module, econtext.core.GET_MODULE) + router.add_handler(self._on_get_module, econtext.core.GET_MODULE) def __repr__(self): - return 'ModuleForwarder(%r)' % (self.context,) + return 'ModuleForwarder(%r)' % (self.router,) def _on_get_module(self, msg): LOG.debug('%r._on_get_module(%r)', self, msg) @@ -268,10 +281,11 @@ class ModuleForwarder(object): fullname = msg.data cached = self.importer._cache.get(fullname) if cached: - self.context.send( + self.router.route( econtext.core.Message.pickled( - data=cached, - handle=msg.reply_to + cached, + dst_id=msg.src_id, + handle=msg.reply_to, ) ) else: @@ -290,10 +304,11 @@ class ModuleForwarder(object): LOG.debug('%r._on_got_source(%r, %r)', self, msg, original_msg) fullname = original_msg.data self.importer._cache[fullname] = msg.unpickle() - self.context.send( + self.router.route( econtext.core.Message( data=msg.data, - handle=original_msg.reply_to + dst_id=original_msg.src_id, + handle=original_msg.reply_to, ) ) @@ -423,11 +438,6 @@ class Broker(econtext.core.Broker): class Context(econtext.core.Context): via = None - def __init__(self, *args, **kwargs): - super(Context, self).__init__(*args, **kwargs) - self.responder = ModuleResponder(self) - self.log_forwarder = LogForwarder(self) - def on_disconnect(self, broker): pass @@ -499,7 +509,10 @@ class Context(econtext.core.Context): def _proxy_connect(econtext, name, context_id, klass, kwargs): - econtext.router.__class__ = Router # TODO + if not isinstance(econtext.router, Router): # TODO + econtext.router.__class__ = Router # TODO + LOG.debug('_proxy_connect(): constructing ModuleForwarder') + ModuleForwarder(econtext.router, econtext.parent, econtext.importer) context = econtext.router._connect( context_id, @@ -507,9 +520,6 @@ def _proxy_connect(econtext, name, context_id, klass, kwargs): name=name, **kwargs ) - - LOG.debug('_proxy_connect(): constructing ModuleForwarder for %r', context) - ModuleForwarder(context, econtext.parent, econtext.importer) return context.name @@ -518,6 +528,11 @@ class Router(econtext.core.Router): debug = False + def __init__(self, *args, **kwargs): + super(Router, self).__init__(*args, **kwargs) + self.responder = ModuleResponder(self) + self.log_forwarder = LogForwarder(self) + def enable_debug(self): """ Cause this context and any descendant child contexts to write debug @@ -572,20 +587,22 @@ class Router(econtext.core.Router): name = via_context.call_with_deadline(None, True, _proxy_connect, name, context_id, klass, kwargs ) - name = '%s.%s' % (via_context.name, name) + # name = '%s.%s' % (via_context.name, name) context = Context(self, context_id, name=name) context.via = via_context child = via_context parent = via_context.via while parent is not None: - LOG.info('Adding route to %r for %r via %r', parent, context, child) + LOG.debug('Adding route to %r for %r via %r', parent, context, child) parent.send( econtext.core.Message( data='%s\x00%s' % (context_id, child.context_id), handle=econtext.core.ADD_ROUTE, ) ) + child = parent + parent = parent.via self._context_by_id[context.context_id] = context return context