core: have ExternalContext accept a config dict rather than kwargs.

The parameter lists had gotten out of control.
pull/255/head
David Wilson 7 years ago
parent eac4cc7afe
commit 00edf0d66d

@ -1596,11 +1596,14 @@ class Broker(object):
class ExternalContext(object): class ExternalContext(object):
detached = False detached = False
def __init__(self, config):
self.config = config
def _on_broker_shutdown(self): def _on_broker_shutdown(self):
self.channel.close() self.channel.close()
def _on_broker_exit(self): def _on_broker_exit(self):
if not self.profiling: if not self.config['profiling']:
os.kill(os.getpid(), signal.SIGTERM) os.kill(os.getpid(), signal.SIGTERM)
def _on_shutdown_msg(self, msg): def _on_shutdown_msg(self, msg):
@ -1638,29 +1641,31 @@ class ExternalContext(object):
LOG.error('Stream had %d bytes after 2000ms', pending) LOG.error('Stream had %d bytes after 2000ms', pending)
self.broker.defer(stream.on_disconnect, self.broker) self.broker.defer(stream.on_disconnect, self.broker)
def _setup_master(self, max_message_size, profiling, unidirectional, def _setup_master(self):
parent_id, context_id, in_fd, out_fd): Router.max_message_size = self.config['max_message_size']
Router.max_message_size = max_message_size if self.config['profiling']:
self.profiling = profiling
if profiling:
enable_profiling() enable_profiling()
self.broker = Broker() self.broker = Broker()
self.router = Router(self.broker) self.router = Router(self.broker)
self.router.undirectional = unidirectional self.router.undirectional = self.config['unidirectional']
self.router.add_handler( self.router.add_handler(
fn=self._on_shutdown_msg, fn=self._on_shutdown_msg,
handle=SHUTDOWN, handle=SHUTDOWN,
policy=has_parent_authority, policy=has_parent_authority,
) )
self.master = Context(self.router, 0, 'master') self.master = Context(self.router, 0, 'master')
parent_id = self.config['parent_ids'][0]
if parent_id == 0: if parent_id == 0:
self.parent = self.master self.parent = self.master
else: else:
self.parent = Context(self.router, parent_id, 'parent') self.parent = Context(self.router, parent_id, 'parent')
self.channel = Receiver(router=self.router, in_fd = self.config.get('in_fd', 100)
handle=CALL_FUNCTION, out_fd = self.config.get('out_fd', 1)
policy=has_parent_authority)
self.recv = Receiver(router=self.router,
handle=CALL_FUNCTION,
policy=has_parent_authority)
self.stream = Stream(self.router, parent_id) self.stream = Stream(self.router, parent_id)
self.stream.name = 'parent' self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd) self.stream.accept(in_fd, out_fd)
@ -1678,20 +1683,22 @@ class ExternalContext(object):
except OSError: except OSError:
pass # No first stage exists (e.g. fakessh) pass # No first stage exists (e.g. fakessh)
def _setup_logging(self, debug, log_level): def _setup_logging(self):
root = logging.getLogger() root = logging.getLogger()
root.setLevel(log_level) root.setLevel(self.config['log_level'])
root.handlers = [LogHandler(self.master)] root.handlers = [LogHandler(self.master)]
if debug: if self.config['debug']:
enable_debug_logging() enable_debug_logging()
def _setup_importer(self, importer, core_src_fd, whitelist, blacklist): def _setup_importer(self):
importer = self.config.get('importer')
if importer: if importer:
importer._install_handler(self.router) importer._install_handler(self.router)
importer._context = self.parent importer._context = self.parent
else: else:
core_src_fd = self.config.get('core_src_fd', 101)
if core_src_fd: if core_src_fd:
fp = os.fdopen(101, 'r', 1) fp = os.fdopen(core_src_fd, 'r', 1)
try: try:
core_size = int(fp.readline()) core_size = int(fp.readline())
core_src = fp.read(core_size) core_src = fp.read(core_size)
@ -1702,8 +1709,13 @@ class ExternalContext(object):
else: else:
core_src = None core_src = None
importer = Importer(self.router, self.parent, importer = Importer(
core_src, whitelist, blacklist) self.router,
self.parent,
core_src,
self.config.get('whitelist', ()),
self.config.get('blacklist', ()),
)
self.importer = importer self.importer = importer
self.router.importer = importer self.router.importer = importer
@ -1723,12 +1735,12 @@ class ExternalContext(object):
sys.modules['mitogen.core'] = mitogen.core sys.modules['mitogen.core'] = mitogen.core
del sys.modules['__main__'] del sys.modules['__main__']
def _setup_globals(self, version, context_id, parent_ids): def _setup_globals(self):
mitogen.__version__ = version
mitogen.is_master = False mitogen.is_master = False
mitogen.context_id = context_id mitogen.__version__ = self.config['version']
mitogen.parent_ids = parent_ids mitogen.context_id = self.config['context_id']
mitogen.parent_id = parent_ids[0] mitogen.parent_ids = self.config['parent_ids'][:]
mitogen.parent_id = mitogen.parent_ids[0]
def _setup_stdio(self): def _setup_stdio(self):
# We must open this prior to closing stdout, otherwise it will recycle # We must open this prior to closing stdout, otherwise it will recycle
@ -1774,7 +1786,7 @@ class ExternalContext(object):
return fn(*args, **kwargs) return fn(*args, **kwargs)
def _dispatch_calls(self): def _dispatch_calls(self):
for msg in self.channel: for msg in self.recv:
try: try:
msg.reply(self._dispatch_one(msg)) msg.reply(self._dispatch_one(msg))
except Exception: except Exception:
@ -1783,28 +1795,24 @@ class ExternalContext(object):
msg.reply(CallError(e)) msg.reply(CallError(e))
self.dispatch_stopped = True self.dispatch_stopped = True
def main(self, parent_ids, context_id, debug, profiling, log_level, def main(self):
unidirectional, max_message_size, version, in_fd=100, out_fd=1, self._setup_master()
core_src_fd=101, setup_stdio=True, setup_package=True,
importer=None, whitelist=(), blacklist=()):
self._setup_master(max_message_size, profiling, unidirectional,
parent_ids[0], context_id, in_fd, out_fd)
try: try:
try: try:
self._setup_logging(debug, log_level) self._setup_logging()
self._setup_importer(importer, core_src_fd, whitelist, blacklist) self._setup_importer()
self._reap_first_stage() self._reap_first_stage()
if setup_package: if self.config.get('setup_package', True):
self._setup_package() self._setup_package()
self._setup_globals(version, context_id, parent_ids) self._setup_globals()
if setup_stdio: if self.config.get('setup_stdio', True):
self._setup_stdio() self._setup_stdio()
self.router.register(self.parent, self.stream) self.router.register(self.parent, self.stream)
sys.executable = os.environ.pop('ARGV0', sys.executable) sys.executable = os.environ.pop('ARGV0', sys.executable)
_v and LOG.debug('Connected to %s; my ID is %r, PID is %r', _v and LOG.debug('Connected to %s; my ID is %r, PID is %r',
self.parent, context_id, os.getpid()) self.parent, mitogen.context_id, os.getpid())
_v and LOG.debug('Recovered sys.executable: %r', sys.executable) _v and LOG.debug('Recovered sys.executable: %r', sys.executable)
_profile_hook('main', self._dispatch_calls) _profile_hook('main', self._dispatch_calls)

@ -304,6 +304,25 @@ def _fakessh_main(dest_context_id, econtext):
process.control.put(('exit', None)) process.control.put(('exit', None))
def _get_econtext_config(context, sock2):
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
return {
'context_id': context.context_id,
'core_src_fd': None,
'debug': getattr(context.router, 'debug', False),
'in_fd': sock2.fileno(),
'log_level': mitogen.parent.get_log_level(),
'max_message_size': context.router.max_message_size,
'out_fd': sock2.fileno(),
'parent_ids': parent_ids,
'profiling': getattr(context.router, 'profiling', False),
'unidirectional': getattr(context.router, 'unidirectional', False),
'setup_stdio': False,
'version': mitogen.__version__,
}
# #
# Public API. # Public API.
# #
@ -328,9 +347,6 @@ def run(dest, router, args, deadline=None, econtext=None):
# Held in socket buffer until process is booted. # Held in socket buffer until process is booted.
fakessh.call_async(_fakessh_main, dest.context_id) fakessh.call_async(_fakessh_main, dest.context_id)
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
tmp_path = tempfile.mkdtemp(prefix='mitogen_fakessh') tmp_path = tempfile.mkdtemp(prefix='mitogen_fakessh')
try: try:
ssh_path = os.path.join(tmp_path, 'ssh') ssh_path = os.path.join(tmp_path, 'ssh')
@ -339,20 +355,9 @@ def run(dest, router, args, deadline=None, econtext=None):
fp.write('#!%s\n' % (sys.executable,)) fp.write('#!%s\n' % (sys.executable,))
fp.write(inspect.getsource(mitogen.core)) fp.write(inspect.getsource(mitogen.core))
fp.write('\n') fp.write('\n')
fp.write('ExternalContext().main(**%r)\n' % ({ fp.write('ExternalContext(%r).main()\n' % (
'context_id': context_id, _get_econtext_config(context, sock2),
'core_src_fd': None, ))
'debug': getattr(router, 'debug', False),
'in_fd': sock2.fileno(),
'log_level': mitogen.parent.get_log_level(),
'max_message_size': router.max_message_size,
'out_fd': sock2.fileno(),
'parent_ids': parent_ids,
'profiling': getattr(router, 'profiling', False),
'unidirectional': getattr(router, 'unidirectional', False),
'setup_stdio': False,
'version': mitogen.__version__,
},))
finally: finally:
fp.close() fp.close()

@ -148,12 +148,12 @@ class Stream(mitogen.parent.Stream):
os.close(devnull) os.close(devnull)
childfp.close() childfp.close()
kwargs = self.get_main_kwargs() config = self.get_econtext_config()
kwargs['core_src_fd'] = None config['core_src_fd'] = None
kwargs['importer'] = self.importer config['importer'] = self.importer
kwargs['setup_package'] = False config['setup_package'] = False
try: try:
mitogen.core.ExternalContext().main(**kwargs) mitogen.core.ExternalContext(config).main()
finally: finally:
# Don't trigger atexit handlers, they were copied from the parent. # Don't trigger atexit handlers, they were copied from the parent.
os._exit(0) os._exit(0)

@ -896,7 +896,7 @@ class Stream(mitogen.core.Stream):
'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded,) 'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded,)
] ]
def get_main_kwargs(self): def get_econtext_config(self):
assert self.max_message_size is not None assert self.max_message_size is not None
parent_ids = mitogen.parent_ids[:] parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id) parent_ids.insert(0, mitogen.context_id)
@ -915,8 +915,8 @@ class Stream(mitogen.core.Stream):
def get_preamble(self): def get_preamble(self):
source = inspect.getsource(mitogen.core) source = inspect.getsource(mitogen.core)
source += '\nExternalContext().main(**%r)\n' % ( source += '\nExternalContext(%r).main()\n' % (
self.get_main_kwargs(), self.get_econtext_config(),
) )
return zlib.compress(minimize_source(source), 9) return zlib.compress(minimize_source(source), 9)

@ -1,6 +1,7 @@
import errno import errno
import os import os
import subprocess import subprocess
import sys
import tempfile import tempfile
import time import time

Loading…
Cancel
Save