From 4d01dc3ba6e95e878537b86f40d23d1132d428fc Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 11 Feb 2018 23:34:13 +0545 Subject: [PATCH] Initial pass at module preloading * Don't implement the rules for when preloading occurs yet * Don't attempt to streamily preload modules downstream while this context hasn't yet received the final module. There is quite significant latency buried in here, but for now it's a lot of work to fix. This works well enough to handle at least the mitogen package, but it's likely broken for anything bigger. --- mitogen/core.py | 70 +++++++++++++++++++++------ mitogen/master.py | 117 +++++++++++++++++++++++++--------------------- 2 files changed, 120 insertions(+), 67 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index c6220d64..111b23e3 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -56,6 +56,7 @@ FORWARD_LOG = 102 ADD_ROUTE = 103 ALLOCATE_ID = 104 SHUTDOWN = 105 +LOAD_MODULE = 106 CHUNK_SIZE = 16384 @@ -305,6 +306,8 @@ def _queue_interruptible_get(queue, timeout=None, block=True): if timeout is not None: timeout += time.time() + LOG.info('timeout = %r, block = %r', timeout, block) + msg = None while msg is None and (timeout is None or timeout > time.time()): try: @@ -395,7 +398,7 @@ class Importer(object): :param context: Context to communicate via. """ - def __init__(self, context, core_src): + def __init__(self, router, context, core_src): self._context = context self._present = {'mitogen': [ 'mitogen.ansible', @@ -407,13 +410,19 @@ class Importer(object): 'mitogen.sudo', 'mitogen.utils', ]} + self._lock = threading.Lock() + # Presence of an entry in this map indicates in-flight GET_MODULE. + self._callbacks = {} self.tls = threading.local() + router.add_handler(self._on_load_module, LOAD_MODULE) self._cache = {} if core_src: self._cache['mitogen.core'] = ( + 'mitogen.core', None, 'mitogen/core.py', zlib.compress(core_src), + [], ) def __repr__(self): @@ -468,23 +477,53 @@ class Importer(object): # later. os.environ['PBR_VERSION'] = '0.0.0' + def _on_load_module(self, msg): + tup = msg.unpickle() + fullname = tup[0] + LOG.debug('Importer._on_load_module(%r)', fullname) + + self._lock.acquire() + try: + self._cache[fullname] = tup + callbacks = self._callbacks.pop(fullname, []) + finally: + self._lock.release() + + for callback in callbacks: + callback() + + def _request_module(self, fullname, callback): + self._lock.acquire() + try: + present = fullname in self._cache + if not present: + funcs = self._callbacks.get(fullname) + if funcs is not None: + LOG.debug('_request_module(%r): in flight', fullname) + funcs.append(callback) + else: + LOG.debug('_request_module(%r): new request', fullname) + self._callbacks[fullname] = [callback] + self._context.send(Message(data=fullname, handle=GET_MODULE)) + finally: + self._lock.release() + + if present: + callback() + def load_module(self, fullname): LOG.debug('Importer.load_module(%r)', fullname) self._load_module_hacks(fullname) - try: - ret = self._cache[fullname] - except KeyError: - self._cache[fullname] = ret = ( - self._context.send_await( - Message(data=fullname, handle=GET_MODULE) - ) - ) + event = threading.Event() + self._request_module(fullname, event.set) + event.wait() + ret = self._cache[fullname] if ret is None: raise ImportError('Master does not have %r' % (fullname,)) - pkg_present = ret[0] + pkg_present = ret[1] mod = sys.modules.setdefault(fullname, imp.new_module(fullname)) mod.__file__ = self.get_filename(fullname) mod.__loader__ = self @@ -500,11 +539,11 @@ class Importer(object): def get_filename(self, fullname): if fullname in self._cache: - return 'master:' + self._cache[fullname][1] + return 'master:' + self._cache[fullname][2] def get_source(self, fullname): if fullname in self._cache: - return zlib.decompress(self._cache[fullname][2]) + return zlib.decompress(self._cache[fullname][3]) class LogHandler(logging.Handler): @@ -593,6 +632,7 @@ class Stream(BasicStream): self._router = router self.remote_id = remote_id self.name = 'default' + self.modules_sent = set() self.construct(**kwargs) self._output_buf = collections.deque() @@ -853,6 +893,10 @@ class Router(object): def __repr__(self): return 'Router(%r)' % (self.broker,) + def stream_by_id(self, dst_id): + return self._stream_by_id.get(dst_id, + self._stream_by_id.get(mitogen.parent_id)) + def on_disconnect(self, stream, broker): """Invoked by Stream.on_disconnect().""" for context in self._context_by_id.itervalues(): @@ -1132,7 +1176,7 @@ class ExternalContext(object): else: core_src = None - self.importer = Importer(self.parent, core_src) + self.importer = Importer(self.router, self.parent, core_src) sys.meta_path.append(self.importer) def _setup_package(self, context_id, parent_ids): diff --git a/mitogen/master.py b/mitogen/master.py index f1fecd54..fcedacbd 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -599,13 +599,14 @@ class ModuleFinder(object): stack.extend(set(fullnames).difference(found, stack, [fullname])) found.update(fullnames) - return found + return sorted(found) class ModuleResponder(object): def __init__(self, router): self._router = router self._finder = ModuleFinder() + self._cache = {} # fullname -> pickled router.add_handler(self._on_get_module, mitogen.core.GET_MODULE) def __repr__(self): @@ -622,40 +623,57 @@ class ModuleResponder(object): return src[:match.start()] return src - def _on_get_module(self, msg): - LOG.debug('%r.get_module(%r)', self, msg) - if msg == mitogen.core._DEAD: - return + def _build_tuple(self, fullname): + if fullname in self._cache: + return self._cache[fullname] - fullname = msg.data - try: - path, source, is_pkg = self._finder.get_module_source(fullname) - if source is None: - raise ImportError('could not find %r' % (fullname,)) - - if is_pkg: - pkg_present = get_child_modules(path, fullname) - LOG.debug('get_child_modules(%r, %r) -> %r', - path, fullname, pkg_present) - else: - pkg_present = None + path, source, is_pkg = self._finder.get_module_source(fullname) + if source is None: + raise ImportError('could not find %r' % (fullname,)) - if fullname == '__main__': - source = self.neutralize_main(source) - compressed = zlib.compress(source) - related = list(self._finder.find_related(fullname)) + if is_pkg: + pkg_present = get_child_modules(path, fullname) + LOG.debug('_build_tuple(%r, %r) -> %r', + path, fullname, pkg_present) + else: + pkg_present = None + + if fullname == '__main__': + source = self.neutralize_main(source) + compressed = zlib.compress(source) + related = list(self._finder.find_related(fullname)) + # 0:fullname 1:pkg_present 2:path 3:compressed 4:related + return fullname, pkg_present, path, compressed, related + + def _send_load_module(self, msg, fullname): + stream = self._router.stream_by_id(msg.src_id) + if fullname not in stream.sent_modules: self._router.route( mitogen.core.Message.pickled( - (pkg_present, path, compressed, related), + self._build_tuple(fullname), dst_id=msg.src_id, - handle=msg.reply_to, + handle=mitogen.core.LOAD_MODULE, ) ) + stream.sent_modules.add(fullname) + + def _on_get_module(self, msg): + LOG.debug('%r.get_module(%r)', self, msg) + if msg == mitogen.core._DEAD: + return + + fullname = msg.data + try: + tup = self._build_tuple(fullname) + related = tup[4] + for name in related + [fullname]: + if name not in ('mitogen', 'mitogen.core'): + self._send_load_module(msg, name) except Exception: LOG.debug('While importing %r', fullname, exc_info=True) self._router.route( mitogen.core.Message.pickled( - None, + (fullname, None, None, None, []), dst_id=msg.src_id, handle=msg.reply_to, ) @@ -682,41 +700,28 @@ class ModuleForwarder(object): return fullname = msg.data - cached = self.importer._cache.get(fullname) - if cached: - LOG.debug('%r._on_get_module(): using cached %r', self, fullname) - self.router.route( - mitogen.core.Message.pickled( - cached, - dst_id=msg.src_id, - handle=msg.reply_to, - ) - ) - else: - LOG.debug('%r._on_get_module(): requesting %r', self, fullname) - self.parent_context.send( - mitogen.core.Message( - data=msg.data, - handle=mitogen.core.GET_MODULE, - reply_to=self.router.add_handler( - lambda m: self._on_got_source(m, msg), - persist=False - ) - ) - ) + callback = lambda: self._on_cache_callback(msg, fullname) + self.importer._request_module(fullname, callback) - def _on_got_source(self, msg, original_msg): - LOG.debug('%r._on_got_source(%r, %r)', self, msg, original_msg) - fullname = original_msg.data - self.importer._cache[fullname] = msg.unpickle() + def _send_one_module(self, msg, tup): self.router.route( - mitogen.core.Message( - data=msg.data, - dst_id=original_msg.src_id, - handle=original_msg.reply_to, + mitogen.core.Message.pickled( + self.importer._cache[fullname], + dst_id=msg.src_id, + handle=mitogen.core.LOAD_MODULE, ) ) + def _on_cache_callback(self, msg, fullname): + LOG.debug('%r._on_get_module(): sending %r', self, fullname) + tup = self.importer._cache[fullname] + if tup is not None: + for related in tup[4]: + rtup = self.importer._cache[fullname] + self._send_one_module(rtup) + + self._send_one_module(tup) + class Stream(mitogen.core.Stream): """ @@ -731,6 +736,10 @@ class Stream(mitogen.core.Stream): #: True to cause context to write /tmp/mitogen.stats...log. profiling = False + def __init__(self, *args, **kwargs): + super(Stream, self).__init__(*args, **kwargs) + self.sent_modules = set() + def construct(self, remote_name=None, python_path=None, debug=False, profiling=False, **kwargs): """Get the named context running on the local machine, creating it if