Handlers no longer depend on src_id

Now there is a single handle namespace in each context, indpendent of
the source of the message. Update module forwarder etc. to cope with
that.

This is to support slave contexts communicating without the master's
intercession.
pull/35/head
David Wilson 7 years ago
parent ad182bc128
commit fd5e962cb3

@ -173,15 +173,16 @@ class Message(object):
class Channel(object): class Channel(object):
def __init__(self, context, handle=None): def __init__(self, router, dst_id=None, handle=None):
self._context = context self._router = router
self._queue = Queue.Queue() self._queue = Queue.Queue()
self._dst_id = dst_id
self.handle = handle # Avoid __repr__ crash in add_handler() self.handle = handle # Avoid __repr__ crash in add_handler()
self.handle = context.add_handler(self._receive, handle) self.handle = router.add_handler(self._on_receive, handle)
def _receive(self, msg): def _on_receive(self, msg):
"""Callback from the Stream; appends data to the internal queue.""" """Callback from the Stream; appends data to the internal queue."""
IOLOG.debug('%r._receive(%r)', self, msg) IOLOG.debug('%r._on_receive(%r)', self, msg)
self._queue.put(msg) self._queue.put(msg)
def close(self): def close(self):
@ -192,7 +193,13 @@ class Channel(object):
def put(self, data): def put(self, data):
"""Send `data` to the remote.""" """Send `data` to the remote."""
IOLOG.debug('%r.send(%r)', self, data) IOLOG.debug('%r.send(%r)', self, data)
self._context.send(self.handle, data) self._router.send(
Message.pickled(
data,
dst_id=self._dst_id,
handle=self.handle
)
)
def get(self, timeout=None): def get(self, timeout=None):
"""Receive an object, or ``None`` if `timeout` is reached.""" """Receive an object, or ``None`` if `timeout` is reached."""
@ -233,7 +240,7 @@ class Channel(object):
return return
def __repr__(self): def __repr__(self):
return 'Channel(%r, %r)' % (self._context, self.handle) return 'Channel(%r, %r)' % (self._router, self.handle)
class Importer(object): class Importer(object):
@ -580,26 +587,6 @@ class Context(object):
self.context_id = context_id self.context_id = context_id
self.name = name self.name = name
self.key = key or ('%016x' % random.getrandbits(128)) self.key = key or ('%016x' % random.getrandbits(128))
#: handle -> (persistent?, func(msg))
self._handle_map = {}
self._last_handle = itertools.count(1000)
def add_handler(self, fn, handle=None, persist=True):
"""Invoke `fn(msg)` for each Message sent to `handle` from this
context. Unregister after one invocation if `persist` is ``False``. If
`handle` is ``None``, a new handle is allocated and returned."""
handle = handle or self._last_handle.next()
IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
self._handle_map[handle] = persist, fn
return handle
def on_shutdown(self, broker):
"""Called during :py:meth:`Broker.shutdown`, informs callbacks
registered with :py:meth:`add_handle_cb` the connection is dead."""
LOG.debug('%r.on_shutdown(%r)', self, broker)
for handle, (persist, fn) in self._handle_map.iteritems():
LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(_DEAD)
def on_disconnect(self, broker): def on_disconnect(self, broker):
LOG.debug('Parent stream is gone, dying.') LOG.debug('Parent stream is gone, dying.')
@ -619,7 +606,7 @@ class Context(object):
raise SystemError('Cannot making blocking call on broker thread') raise SystemError('Cannot making blocking call on broker thread')
queue = Queue.Queue() queue = Queue.Queue()
msg.reply_to = self.add_handler(queue.put, persist=False) msg.reply_to = self.router.add_handler(queue.put, persist=False)
LOG.debug('%r.send_await(%r)', self, msg) LOG.debug('%r.send_await(%r)', self, msg)
self.send(msg) self.send(msg)
@ -635,22 +622,6 @@ class Context(object):
IOLOG.debug('%r._send_await() -> %r', self, msg) IOLOG.debug('%r._send_await() -> %r', self, msg)
return msg return msg
def _invoke(self, msg):
#IOLOG.debug('%r._invoke(%r)', self, msg)
try:
persist, fn = self._handle_map[msg.handle]
except KeyError:
LOG.error('%r: invalid handle: %r', self, msg)
return
if not persist:
del self._handle_map[msg.handle]
try:
fn(msg)
except Exception:
LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn)
def __repr__(self): def __repr__(self):
return 'Context(%s, %r)' % (self.context_id, self.name) return 'Context(%s, %r)' % (self.context_id, self.name)
@ -743,14 +714,17 @@ class Router(object):
defined on our parent context. Router.route() straddles the Broker and user defined on our parent context. Router.route() straddles the Broker and user
threads, it is save to call from anywhere. threads, it is save to call from anywhere.
""" """
parent_context = None
def __init__(self, broker): def __init__(self, broker):
self.broker = broker self.broker = broker
#: context ID -> Stream #: context ID -> Stream
self._stream_by_id = {} self._stream_by_id = {}
#: List of contexts to notify of shutdown. #: List of contexts to notify of shutdown.
self._context_by_id = {} self._context_by_id = {}
self._last_handle = itertools.count(1000)
#: handle -> (persistent?, func(msg))
self._handle_map = {
ADD_ROUTE: (True, self._on_add_route)
}
def __repr__(self): def __repr__(self):
return 'Router(%r)' % (self.broker,) return 'Router(%r)' % (self.broker,)
@ -786,12 +760,43 @@ class Router(object):
self._context_by_id[context.context_id] = context self._context_by_id[context.context_id] = context
self.broker.start_receive(stream) self.broker.start_receive(stream)
def add_handler(self, fn, handle=None, persist=True):
"""Invoke `fn(msg)` for each Message sent to `handle` from this
context. Unregister after one invocation if `persist` is ``False``. If
`handle` is ``None``, a new handle is allocated and returned."""
handle = handle or self._last_handle.next()
IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
self._handle_map[handle] = persist, fn
return handle
def on_shutdown(self, broker):
"""Called during :py:meth:`Broker.shutdown`, informs callbacks
registered with :py:meth:`add_handle_cb` the connection is dead."""
LOG.debug('%r.on_shutdown(%r)', self, broker)
for handle, (persist, fn) in self._handle_map.iteritems():
LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(_DEAD)
def _invoke(self, msg):
#IOLOG.debug('%r._invoke(%r)', self, msg)
try:
persist, fn = self._handle_map[msg.handle]
except KeyError:
LOG.error('%r: invalid handle: %r', self, msg)
return
if not persist:
del self._handle_map[msg.handle]
try:
fn(msg)
except Exception:
LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn)
def _route(self, msg): def _route(self, msg):
IOLOG.debug('%r._route(%r)', self, msg) IOLOG.debug('%r._route(%r)', self, msg)
context = self._context_by_id.get(msg.src_id) if msg.dst_id == econtext.context_id:
if context and msg.dst_id == econtext.context_id: return self._invoke(msg)
context._invoke(msg)
return
stream = self._stream_by_id.get(msg.dst_id) stream = self._stream_by_id.get(msg.dst_id)
if stream is None: if stream is None:
@ -987,7 +992,7 @@ class ExternalContext(object):
else: else:
self.parent = Context(self.router, parent_id, 'parent') self.parent = Context(self.router, parent_id, 'parent')
self.channel = Channel(self.master, CALL_FUNCTION) self.channel = Channel(self.router, handle=CALL_FUNCTION)
self.stream = Stream(self.router, parent_id, key) self.stream = Stream(self.router, parent_id, key)
self.stream.name = 'parent' self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd) self.stream.accept(in_fd, out_fd)

@ -124,33 +124,39 @@ def discard_until(fd, s, deadline):
class LogForwarder(object): class LogForwarder(object):
_log = None def __init__(self, router):
self._router = router
def __init__(self, context): self._cache = {}
self._context = context router.add_handler(self._on_forward_log, econtext.core.FORWARD_LOG)
context.add_handler(self.forward, econtext.core.FORWARD_LOG)
def _on_forward_log(self, msg):
def forward(self, msg): if msg == econtext.core._DEAD:
if not self._log: return
# Delay initialization so Stream has a chance to set Context's
# default name, if one wasn't otherwise specified. logger = self._cache.get(msg.src_id)
name = '%s.%s' % (RLOG.name, self._context.name) if logger is None:
self._log = logging.getLogger(name) context = self._router.context_by_id(msg.src_id)
if msg != econtext.core._DEAD: if context is None:
name, level_s, s = msg.data.split('\x00', 2) LOG.error('FORWARD_LOG received from src_id %d', msg.src_id)
self._log.log(int(level_s), '%s: %s', name, s) return
name = '%s.%s' % (RLOG.name, context.name)
self._cache[msg.src_id] = logger = logging.getLogger(name)
name, level_s, s = msg.data.split('\x00', 2)
logger.log(int(level_s), '%s: %s', name, s)
def __repr__(self): def __repr__(self):
return 'LogForwarder(%r)' % (self._context,) return 'LogForwarder(%r)' % (self._router,)
class ModuleResponder(object): class ModuleResponder(object):
def __init__(self, context): def __init__(self, router):
self._context = context self._router = router
context.add_handler(self.get_module, econtext.core.GET_MODULE) router.add_handler(self._on_get_module, econtext.core.GET_MODULE)
def __repr__(self): def __repr__(self):
return 'ModuleResponder(%r)' % (self._context,) return 'ModuleResponder(%r)' % (self._router,)
def _get_module_via_pkgutil(self, fullname): def _get_module_via_pkgutil(self, fullname):
"""Attempt to fetch source code via pkgutil. In an ideal world, this """Attempt to fetch source code via pkgutil. In an ideal world, this
@ -208,7 +214,7 @@ class ModuleResponder(object):
_get_module_via_sys_modules, _get_module_via_sys_modules,
_get_module_via_parent_enumeration] _get_module_via_parent_enumeration]
def get_module(self, msg): def _on_get_module(self, msg):
LOG.debug('%r.get_module(%r)', self, msg) LOG.debug('%r.get_module(%r)', self, msg)
if msg == econtext.core._DEAD: if msg == econtext.core._DEAD:
return return
@ -235,15 +241,22 @@ class ModuleResponder(object):
pkg_present = None pkg_present = None
compressed = zlib.compress(source) compressed = zlib.compress(source)
self._context.send( self._router.route(
econtext.core.Message.pickled( econtext.core.Message.pickled(
(pkg_present, path, compressed), (pkg_present, path, compressed),
handle=msg.reply_to dst_id=msg.src_id,
handle=msg.reply_to,
) )
) )
except Exception: except Exception:
LOG.debug('While importing %r', fullname, exc_info=True) LOG.debug('While importing %r', fullname, exc_info=True)
self._context.send(reply_to, None) self._router.route(
econtext.core.Message.pickled(
None,
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
class ModuleForwarder(object): class ModuleForwarder(object):
@ -251,14 +264,14 @@ class ModuleForwarder(object):
Respond to GET_MODULE requests in a slave by forwarding the request to our Respond to GET_MODULE requests in a slave by forwarding the request to our
parent context, or satisfying the request from our local Importer cache. parent context, or satisfying the request from our local Importer cache.
""" """
def __init__(self, context, parent_context, importer): def __init__(self, router, parent_context, importer):
self.context = context self.router = router
self.parent_context = parent_context self.parent_context = parent_context
self.importer = importer self.importer = importer
context.add_handler(self._on_get_module, econtext.core.GET_MODULE) router.add_handler(self._on_get_module, econtext.core.GET_MODULE)
def __repr__(self): def __repr__(self):
return 'ModuleForwarder(%r)' % (self.context,) return 'ModuleForwarder(%r)' % (self.router,)
def _on_get_module(self, msg): def _on_get_module(self, msg):
LOG.debug('%r._on_get_module(%r)', self, msg) LOG.debug('%r._on_get_module(%r)', self, msg)
@ -268,10 +281,11 @@ class ModuleForwarder(object):
fullname = msg.data fullname = msg.data
cached = self.importer._cache.get(fullname) cached = self.importer._cache.get(fullname)
if cached: if cached:
self.context.send( self.router.route(
econtext.core.Message.pickled( econtext.core.Message.pickled(
data=cached, cached,
handle=msg.reply_to dst_id=msg.src_id,
handle=msg.reply_to,
) )
) )
else: else:
@ -290,10 +304,11 @@ class ModuleForwarder(object):
LOG.debug('%r._on_got_source(%r, %r)', self, msg, original_msg) LOG.debug('%r._on_got_source(%r, %r)', self, msg, original_msg)
fullname = original_msg.data fullname = original_msg.data
self.importer._cache[fullname] = msg.unpickle() self.importer._cache[fullname] = msg.unpickle()
self.context.send( self.router.route(
econtext.core.Message( econtext.core.Message(
data=msg.data, data=msg.data,
handle=original_msg.reply_to dst_id=original_msg.src_id,
handle=original_msg.reply_to,
) )
) )
@ -423,11 +438,6 @@ class Broker(econtext.core.Broker):
class Context(econtext.core.Context): class Context(econtext.core.Context):
via = None via = None
def __init__(self, *args, **kwargs):
super(Context, self).__init__(*args, **kwargs)
self.responder = ModuleResponder(self)
self.log_forwarder = LogForwarder(self)
def on_disconnect(self, broker): def on_disconnect(self, broker):
pass pass
@ -499,7 +509,10 @@ class Context(econtext.core.Context):
def _proxy_connect(econtext, name, context_id, klass, kwargs): def _proxy_connect(econtext, name, context_id, klass, kwargs):
econtext.router.__class__ = Router # TODO if not isinstance(econtext.router, Router): # TODO
econtext.router.__class__ = Router # TODO
LOG.debug('_proxy_connect(): constructing ModuleForwarder')
ModuleForwarder(econtext.router, econtext.parent, econtext.importer)
context = econtext.router._connect( context = econtext.router._connect(
context_id, context_id,
@ -507,9 +520,6 @@ def _proxy_connect(econtext, name, context_id, klass, kwargs):
name=name, name=name,
**kwargs **kwargs
) )
LOG.debug('_proxy_connect(): constructing ModuleForwarder for %r', context)
ModuleForwarder(context, econtext.parent, econtext.importer)
return context.name return context.name
@ -518,6 +528,11 @@ class Router(econtext.core.Router):
debug = False debug = False
def __init__(self, *args, **kwargs):
super(Router, self).__init__(*args, **kwargs)
self.responder = ModuleResponder(self)
self.log_forwarder = LogForwarder(self)
def enable_debug(self): def enable_debug(self):
""" """
Cause this context and any descendant child contexts to write debug Cause this context and any descendant child contexts to write debug
@ -572,20 +587,22 @@ class Router(econtext.core.Router):
name = via_context.call_with_deadline(None, True, name = via_context.call_with_deadline(None, True,
_proxy_connect, name, context_id, klass, kwargs _proxy_connect, name, context_id, klass, kwargs
) )
name = '%s.%s' % (via_context.name, name) # name = '%s.%s' % (via_context.name, name)
context = Context(self, context_id, name=name) context = Context(self, context_id, name=name)
context.via = via_context context.via = via_context
child = via_context child = via_context
parent = via_context.via parent = via_context.via
while parent is not None: while parent is not None:
LOG.info('Adding route to %r for %r via %r', parent, context, child) LOG.debug('Adding route to %r for %r via %r', parent, context, child)
parent.send( parent.send(
econtext.core.Message( econtext.core.Message(
data='%s\x00%s' % (context_id, child.context_id), data='%s\x00%s' % (context_id, child.context_id),
handle=econtext.core.ADD_ROUTE, handle=econtext.core.ADD_ROUTE,
) )
) )
child = parent
parent = parent.via
self._context_by_id[context.context_id] = context self._context_by_id[context.context_id] = context
return context return context

Loading…
Cancel
Save