|
|
|
@ -1949,15 +1949,60 @@ class Broker(object):
|
|
|
|
|
return 'Broker(%#x)' % (id(self),)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Dispatcher(object):
|
|
|
|
|
def __init__(self, econtext):
|
|
|
|
|
self.econtext = econtext
|
|
|
|
|
self.recv = Receiver(router=econtext.router,
|
|
|
|
|
handle=CALL_FUNCTION,
|
|
|
|
|
policy=has_parent_authority)
|
|
|
|
|
listen(econtext.broker, 'shutdown', self._on_broker_shutdown)
|
|
|
|
|
|
|
|
|
|
def _on_broker_shutdown(self):
|
|
|
|
|
self.recv.close()
|
|
|
|
|
|
|
|
|
|
def _dispatch_one(self, msg):
|
|
|
|
|
data = msg.unpickle(throw=False)
|
|
|
|
|
_v and LOG.debug('_dispatch_one(%r)', data)
|
|
|
|
|
|
|
|
|
|
modname, klass, func, args, kwargs = data
|
|
|
|
|
obj = import_module(modname)
|
|
|
|
|
if klass:
|
|
|
|
|
obj = getattr(obj, klass)
|
|
|
|
|
fn = getattr(obj, func)
|
|
|
|
|
if getattr(fn, 'mitogen_takes_econtext', None):
|
|
|
|
|
kwargs.setdefault('econtext', self.econtext)
|
|
|
|
|
if getattr(fn, 'mitogen_takes_router', None):
|
|
|
|
|
kwargs.setdefault('router', self.econtext.router)
|
|
|
|
|
return fn(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
def _dispatch_calls(self):
|
|
|
|
|
for msg in self.recv:
|
|
|
|
|
try:
|
|
|
|
|
ret = self._dispatch_one(msg)
|
|
|
|
|
_v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret)
|
|
|
|
|
if msg.reply_to:
|
|
|
|
|
msg.reply(ret)
|
|
|
|
|
except Exception:
|
|
|
|
|
e = sys.exc_info()[1]
|
|
|
|
|
if msg.reply_to:
|
|
|
|
|
_v and LOG.debug('_dispatch_calls: %s', e)
|
|
|
|
|
msg.reply(CallError(e))
|
|
|
|
|
else:
|
|
|
|
|
LOG.exception('_dispatch_calls: %r', msg)
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
if self.econtext.config.get('on_start'):
|
|
|
|
|
self.econtext.config['on_start'](self)
|
|
|
|
|
|
|
|
|
|
_profile_hook('main', self._dispatch_calls)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExternalContext(object):
|
|
|
|
|
detached = False
|
|
|
|
|
|
|
|
|
|
def __init__(self, config):
|
|
|
|
|
self.config = config
|
|
|
|
|
|
|
|
|
|
def _on_broker_shutdown(self):
|
|
|
|
|
self.recv.close()
|
|
|
|
|
|
|
|
|
|
def _on_broker_exit(self):
|
|
|
|
|
if not self.config['profiling']:
|
|
|
|
|
os.kill(os.getpid(), signal.SIGTERM)
|
|
|
|
@ -2041,16 +2086,12 @@ class ExternalContext(object):
|
|
|
|
|
|
|
|
|
|
in_fd = self.config.get('in_fd', 100)
|
|
|
|
|
out_fd = self.config.get('out_fd', 1)
|
|
|
|
|
self.recv = Receiver(router=self.router,
|
|
|
|
|
handle=CALL_FUNCTION,
|
|
|
|
|
policy=has_parent_authority)
|
|
|
|
|
self.stream = Stream(self.router, parent_id)
|
|
|
|
|
self.stream.name = 'parent'
|
|
|
|
|
self.stream.accept(in_fd, out_fd)
|
|
|
|
|
self.stream.receive_side.keep_alive = False
|
|
|
|
|
|
|
|
|
|
listen(self.stream, 'disconnect', self._on_parent_disconnect)
|
|
|
|
|
listen(self.broker, 'shutdown', self._on_broker_shutdown)
|
|
|
|
|
listen(self.broker, 'exit', self._on_broker_exit)
|
|
|
|
|
|
|
|
|
|
os.close(in_fd)
|
|
|
|
@ -2148,40 +2189,6 @@ class ExternalContext(object):
|
|
|
|
|
# Reopen with line buffering.
|
|
|
|
|
sys.stdout = os.fdopen(1, 'w', 1)
|
|
|
|
|
|
|
|
|
|
def _dispatch_one(self, msg):
|
|
|
|
|
data = msg.unpickle(throw=False)
|
|
|
|
|
_v and LOG.debug('_dispatch_calls(%r)', data)
|
|
|
|
|
|
|
|
|
|
modname, klass, func, args, kwargs = data
|
|
|
|
|
obj = import_module(modname)
|
|
|
|
|
if klass:
|
|
|
|
|
obj = getattr(obj, klass)
|
|
|
|
|
fn = getattr(obj, func)
|
|
|
|
|
if getattr(fn, 'mitogen_takes_econtext', None):
|
|
|
|
|
kwargs.setdefault('econtext', self)
|
|
|
|
|
if getattr(fn, 'mitogen_takes_router', None):
|
|
|
|
|
kwargs.setdefault('router', self.router)
|
|
|
|
|
return fn(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
def _dispatch_calls(self):
|
|
|
|
|
if self.config.get('on_start'):
|
|
|
|
|
self.config['on_start'](self)
|
|
|
|
|
|
|
|
|
|
for msg in self.recv:
|
|
|
|
|
try:
|
|
|
|
|
ret = self._dispatch_one(msg)
|
|
|
|
|
_v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret)
|
|
|
|
|
if msg.reply_to:
|
|
|
|
|
msg.reply(ret)
|
|
|
|
|
except Exception:
|
|
|
|
|
e = sys.exc_info()[1]
|
|
|
|
|
if msg.reply_to:
|
|
|
|
|
_v and LOG.debug('_dispatch_calls: %s', e)
|
|
|
|
|
msg.reply(CallError(e))
|
|
|
|
|
else:
|
|
|
|
|
LOG.exception('_dispatch_calls: %r', msg)
|
|
|
|
|
self.dispatch_stopped = True
|
|
|
|
|
|
|
|
|
|
def main(self):
|
|
|
|
|
self._setup_master()
|
|
|
|
|
try:
|
|
|
|
@ -2203,7 +2210,8 @@ class ExternalContext(object):
|
|
|
|
|
self.parent, mitogen.context_id, os.getpid())
|
|
|
|
|
_v and LOG.debug('Recovered sys.executable: %r', sys.executable)
|
|
|
|
|
|
|
|
|
|
_profile_hook('main', self._dispatch_calls)
|
|
|
|
|
self.dispatcher = Dispatcher(self)
|
|
|
|
|
self.dispatcher.run()
|
|
|
|
|
_v and LOG.debug('ExternalContext.main() normal exit')
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
LOG.debug('KeyboardInterrupt received, exiting gracefully.')
|
|
|
|
|