Merge branch 'wip-module-loader'

pull/79/head
David Wilson 7 years ago
commit bf5fbc9719

@ -1,5 +1,6 @@
-r docs/docs-requirements.txt
ansible==2.3.1.0
Django==1.11.5 # for module_finder_test
docker==2.5.1
docker[tls]==2.5.1
mock==2.0.0

@ -1,6 +1,9 @@
# Makefile for Sphinx documentation
#
default:
sphinx-autobuild . _build/html/
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build

@ -36,8 +36,8 @@ neatly indented, and the purpose of the snippet seems clear.
2. How will the ``if`` statement behave if there is a problem with the machine,
and, say, the ``/bin/grep`` binary is absent?
3. Ignoring quoting, are there any other syntax problems?
4. If this snippet is pasted snippet from its original script into an
interactive shell, will it behave the same as before?
4. If this snippet is pasted from its original script into an interactive
shell, will it behave the same as before?
5. Can you think offhand of differences in how the arguments to ``sudo
...`` and ``ssh fileserver ...`` are parsed?
6. In which context will the ``*`` glob be expanded, if it is expanded at all?
@ -107,13 +107,13 @@ each stage:
'
Even with Python handling the heavy lifting of quoting each shell layer, and
and even if we fixed the aforementioned minor disk-wiping issue, I am still not
100% confident that I know precisely the argument handling rules for all of
``su``, ``sudo``, ``ssh``, and ``bash``.
even if the aforementioned minor disk-wiping issue was fixed, it is still not
100% clear that argument handling rules for all of ``su``, ``sudo``, ``ssh``,
and ``bash`` are correctly respected.
Finally, if any of the login shells involved may not be set to ``bash``, we
must introduce additional layers of quoting, in order to explicitly invoke
``bash`` at each stage, causing an explosion in quoting:
Finally, if any login shell involved is not ``bash``, we must introduce
additional quoting in order to explicitly invoke ``bash`` at each stage,
causing an explosion in quoting:
.. code-block:: bash

@ -634,7 +634,7 @@ Before replying to a child's request for a module with dependencies:
determines the :py:mod:`django` module code in the master has :keyword:`import`
statements for :py:mod:`django.utils`, :py:mod:`django.utils.lru_cache`, and
:py:mod:`django.utils.version`,
and that exceution of the module code on the master caused those modules to
and that execution of the module code on the master caused those modules to
appear in the master's :py:data:`sys.modules`, there is high probability
execution of the :py:mod:`django` module code in the child will cause the
same modules to be loaded. Since all those modules exist within the
@ -642,7 +642,7 @@ Before replying to a child's request for a module with dependencies:
it is safe to assume the child will make follow-up requests for those modules
too.
In the example, this replaces 4 round-trips with 1 round-trip.
In the example, 4 round-trips are replaced by 1 round-trip.
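For illustration only, the payload of a single :py:data:`LOAD_MODULE` message
can be pictured as the 5-tuple built by ``ModuleResponder._build_tuple()``
later in this diff; the concrete values below are invented, and each
preloading dependency named in the final field is delivered in its own
:py:data:`LOAD_MODULE` message ahead of the requested module:

.. code-block:: python

    import zlib

    # Hypothetical LOAD_MODULE payload answering GET_MODULE('django').
    # Layout: 0:fullname 1:pkg_present 2:path 3:compressed 4:related
    load_module_reply = (
        'django',                                # 0: requested module
        ['apps', 'conf', 'db', 'utils'],         # 1: child modules of the package
        'django/__init__.py',                    # 2: path on the master
        zlib.compress(b'# module source here'),  # 3: zlib-compressed source
        ['django.utils',                         # 4: dependencies the master
         'django.utils.lru_cache',               #    preloads, each in its own
         'django.utils.version'],                #    LOAD_MODULE beforehand
    )
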
For any package module ever requested by a child, the parent keeps a note of
the name of the package for one final optimization:
@ -663,7 +663,7 @@ the name of the package for one final optimization:
:py:mod:`django.dispatch`, and 7 :py:mod:`django.utils` indirect dependencies
for :py:mod:`django.db`.
In the example, this replaces 17 round-trips with 1 round-trip.
In the example, 17 round-trips are replaced by 1 round-trip.
The method used to detect import statements is similar to the standard library
:py:mod:`modulefinder` module: rather than analyze module source code,
@ -679,6 +679,86 @@ Child Module Enumeration
Package children are enumerated using :py:func:`pkgutil.iter_modules`.
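A minimal standalone sketch of that call, assuming Django is installed as in
the test suite:

.. code-block:: python

    import pkgutil

    import django

    # iter_modules() yields (module_finder, name, is_pkg) for every module
    # or package found directly beneath the given search path.
    children = sorted(name for _, name, _ in pkgutil.iter_modules(django.__path__))
    print(children)  # e.g. ['apps', 'conf', 'core', ...]
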
Concurrency
###########
Duplicate requests must never be issued to the parent, either due to a local
import or any :py:data:`GET_MODULE` originating from a child. This lets parents
assume a module requested once by a downstream connection need never be
re-sent, for example, if it appears as a preloading dependency in a subsequent
:py:data:`GET_MODULE`, or had been requested immediately after being sent as a
preloading dependency for an unrelated request by a descendent.
Therefore each tree layer must deduplicate :py:data:`GET_MODULE` requests, and
synchronize their descendents and local threads on corresponding
:py:data:`LOAD_MODULE` responses from the parent.
In each context, pending requests are serialized by a
:py:class:`threading.Lock` within :py:class:`mitogen.core.Importer`, which may
only be held for operations that cannot block, since :py:class:`ModuleForwarder
<mitogen.master.ModuleForwarder>` must acquire it while synchronizing
:py:data:`GET_MODULE` requests from children on the IO multiplexer thread.
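A toy model of this deduplication, reduced to its essentials (the real
implementation is ``Importer._request_module()`` further down this diff; the
class and names below are illustrative only):

.. code-block:: python

    import threading

    class GetModuleDeduper(object):
        """Illustrative only: one callback list per module name, guarded by a lock."""
        def __init__(self, send_get_module):
            self._lock = threading.Lock()
            self._cache = {}        # fullname -> received LOAD_MODULE tuple
            self._callbacks = {}    # fullname -> callbacks awaiting the reply
            self._send_get_module = send_get_module

        def request(self, fullname, callback):
            with self._lock:
                cached = fullname in self._cache
                if not cached:
                    if fullname in self._callbacks:
                        # Already in flight: piggyback on the existing request.
                        self._callbacks[fullname].append(callback)
                    else:
                        # First request: record it and ask the parent exactly once.
                        self._callbacks[fullname] = [callback]
                        self._send_get_module(fullname)
            if cached:
                callback()

        def deliver(self, fullname, tup):
            with self._lock:
                self._cache[fullname] = tup
                callbacks = self._callbacks.pop(fullname, [])
            for cb in callbacks:    # fired outside the lock; they may block
                cb()
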
Requests From Local Threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~
When Mitogen begins satisfying an import, it is known the module has never been
imported in the local process. :py:class:`Importer <mitogen.core.Importer>`
executes under the runtime importer lock, ensuring :py:keyword:`import`
statements executing in local threads are serialized.
.. note::
In Python 2, :py:exc:`ImportError` is raised when :py:keyword:`import` is
attempted while the runtime import lock is held by another thread,
therefore imports must be serialized by only attempting them from the main
(:py:data:`CALL_FUNCTION`) thread.
The problem is most likely to manifest in third party libraries that lazily
import optional dependencies at runtime from a non-main thread. The
workaround is to explicitly import those dependencies from the main thread
before initializing the third party library.
This was fixed in Python 3.5, but Python 3.x is not yet supported. See
`Python Issue #9260`_.
.. _Python Issue #9260: https://bugs.python.org/issue9260
While holding its own lock, :py:class:`Importer <mitogen.core.Importer>`
checks whether the source is already cached; if not, it determines whether an
in-flight :py:data:`GET_MODULE` exists for it, starting one if none does, adds
itself to the list of callbacks fired when the corresponding
:py:data:`LOAD_MODULE` arrives from the parent, then sleeps until the callback
fires.
When the source becomes available, the module is constructed on the calling
thread using the best practice documented in `PEP 302`_.
.. _PEP 302: https://www.python.org/dev/peps/pep-0302/
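That construction step amounts to the loader protocol below; a minimal
sketch, assuming Python 2 (as targeted here) and mirroring
``Importer.load_module()`` later in this diff:

.. code-block:: python

    import imp
    import sys

    def construct_module(fullname, source, loader, filename, is_pkg=False):
        # PEP 302: reuse any half-initialized entry already in sys.modules.
        mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
        mod.__file__ = filename
        mod.__loader__ = loader
        if is_pkg:
            mod.__path__ = []
            mod.__package__ = fullname
        else:
            mod.__package__ = fullname.rpartition('.')[0]
        # Execute the module body in its own namespace.
        exec(compile(source, filename, 'exec'), mod.__dict__)
        return mod
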
Requests From Children
~~~~~~~~~~~~~~~~~~~~~~
As with local imports, when :py:data:`GET_MODULE` is received from a child,
:py:class:`ModuleForwarder <mitogen.master.ModuleForwarder>`, while holding
the :py:class:`Importer <mitogen.core.Importer>` lock, checks whether the
source is already cached; if not, it determines whether an in-flight
:py:data:`GET_MODULE` toward the parent exists for it, starting one if none
does, then adds a completion handler to the list of callbacks fired when the
corresponding :py:data:`LOAD_MODULE` arrives from the parent.
When the source becomes available, the completion handler issues
:py:data:`LOAD_MODULE` messages toward the child for the requested module,
preceded by any messages required for dependencies known to be absent from
the child.
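In effect the completion handler replays cached tuples in dependency-first
order; a hypothetical sketch (the real code is
``ModuleForwarder._on_cache_callback()`` later in this diff):

.. code-block:: python

    def forward_module(importer, send_load_module, fullname):
        # Sketch only: forward every cached dependency first, then the
        # requested module itself, mirroring the ordering described above.
        tup = importer._cache[fullname]
        for related in tup[4]:                      # 4: related module names
            if related != fullname and related in importer._cache:
                send_load_module(importer._cache[related])
        send_load_module(tup)
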
Since intermediaries do not know a module's dependencies until the module's
source arrives, it is not possible to preemptively issue :py:data:`LOAD_MODULE`
for those dependencies toward a requesting child as they become available from
the parent at the intermediary. This creates needless network serialization and
latency that should be addressed in a future design.
Use Of Threads
--------------

@ -170,6 +170,22 @@ Importer Class
:members:
Responder Class
---------------
.. currentmodule:: mitogen.master
.. autoclass:: ModuleResponder
:members:
Forwarder Class
---------------
.. currentmodule:: mitogen.master
.. autoclass:: ModuleForwarder
:members:
ExternalContext Class
---------------------

@ -56,6 +56,7 @@ FORWARD_LOG = 102
ADD_ROUTE = 103
ALLOCATE_ID = 104
SHUTDOWN = 105
LOAD_MODULE = 106
CHUNK_SIZE = 16384
@ -305,6 +306,8 @@ def _queue_interruptible_get(queue, timeout=None, block=True):
if timeout is not None:
timeout += time.time()
LOG.info('timeout = %r, block = %r', timeout, block)
msg = None
while msg is None and (timeout is None or timeout > time.time()):
try:
@ -395,7 +398,7 @@ class Importer(object):
:param context: Context to communicate via.
"""
def __init__(self, context, core_src):
def __init__(self, router, context, core_src):
self._context = context
self._present = {'mitogen': [
'mitogen.ansible',
@ -407,13 +410,19 @@ class Importer(object):
'mitogen.sudo',
'mitogen.utils',
]}
self._lock = threading.Lock()
# Presence of an entry in this map indicates in-flight GET_MODULE.
self._callbacks = {}
self.tls = threading.local()
router.add_handler(self._on_load_module, LOAD_MODULE)
self._cache = {}
if core_src:
self._cache['mitogen.core'] = (
'mitogen.core',
None,
'mitogen/core.py',
zlib.compress(core_src),
[],
)
def __repr__(self):
@ -468,23 +477,53 @@ class Importer(object):
# later.
os.environ['PBR_VERSION'] = '0.0.0'
def _on_load_module(self, msg):
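# LOAD_MODULE arrived from the parent: cache the module tuple, then fire
# any callbacks registered by _request_module() while waiting for it.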
tup = msg.unpickle()
fullname = tup[0]
LOG.debug('Importer._on_load_module(%r)', fullname)
self._lock.acquire()
try:
self._cache[fullname] = tup
callbacks = self._callbacks.pop(fullname, [])
finally:
self._lock.release()
for callback in callbacks:
callback()
def _request_module(self, fullname, callback):
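# Deduplicate GET_MODULE: if the module is already cached, fire the
# callback immediately; if a request is already in flight, piggyback on
# it; otherwise record the callback and ask the parent exactly once.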
self._lock.acquire()
try:
present = fullname in self._cache
if not present:
funcs = self._callbacks.get(fullname)
if funcs is not None:
LOG.debug('_request_module(%r): in flight', fullname)
funcs.append(callback)
else:
LOG.debug('_request_module(%r): new request', fullname)
self._callbacks[fullname] = [callback]
self._context.send(Message(data=fullname, handle=GET_MODULE))
finally:
self._lock.release()
if present:
callback()
def load_module(self, fullname):
LOG.debug('Importer.load_module(%r)', fullname)
self._load_module_hacks(fullname)
try:
ret = self._cache[fullname]
except KeyError:
self._cache[fullname] = ret = (
self._context.send_await(
Message(data=fullname, handle=GET_MODULE)
)
)
event = threading.Event()
self._request_module(fullname, event.set)
event.wait()
ret = self._cache[fullname]
if ret is None:
raise ImportError('Master does not have %r' % (fullname,))
pkg_present = ret[0]
pkg_present = ret[1]
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__file__ = self.get_filename(fullname)
mod.__loader__ = self
@ -500,11 +539,11 @@ class Importer(object):
def get_filename(self, fullname):
if fullname in self._cache:
return 'master:' + self._cache[fullname][1]
return 'master:' + self._cache[fullname][2]
def get_source(self, fullname):
if fullname in self._cache:
return zlib.decompress(self._cache[fullname][2])
return zlib.decompress(self._cache[fullname][3])
class LogHandler(logging.Handler):
@ -593,6 +632,7 @@ class Stream(BasicStream):
self._router = router
self.remote_id = remote_id
self.name = 'default'
self.modules_sent = set()
self.construct(**kwargs)
self._output_buf = collections.deque()
@ -853,6 +893,10 @@ class Router(object):
def __repr__(self):
return 'Router(%r)' % (self.broker,)
def stream_by_id(self, dst_id):
return self._stream_by_id.get(dst_id,
self._stream_by_id.get(mitogen.parent_id))
def on_disconnect(self, stream, broker):
"""Invoked by Stream.on_disconnect()."""
for context in self._context_by_id.itervalues():
@ -1132,7 +1176,7 @@ class ExternalContext(object):
else:
core_src = None
self.importer = Importer(self.parent, core_src)
self.importer = Importer(self.router, self.parent, core_src)
sys.meta_path.append(self.importer)
def _setup_package(self, context_id, parent_ids):

@ -266,9 +266,12 @@ def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'),
for c in ordit)
opit, opit2, opit3 = itertools.tee(opit, 3)
next(opit2)
next(opit3)
next(opit3)
try:
next(opit2)
next(opit3)
next(opit3)
except StopIteration:
return
for oparg1, oparg2, (op3, arg3) in itertools.izip(opit, opit2, opit3):
if op3 == IMPORT_NAME:
@ -532,6 +535,10 @@ class ModuleFinder(object):
"""Given an ImportFrom AST node, guess the prefix that should be tacked
on to an alias name to produce a canonical name. `fullname` is the name
of the module in which the ImportFrom appears."""
mod = sys.modules.get(fullname, None)
if hasattr(mod, '__path__'):
fullname += '.__init__'
if level == 0 or not fullname:
return ''
@ -540,11 +547,11 @@ class ModuleFinder(object):
# This would be an ImportError in real code.
return ''
return '.'.join(bits[:-level]) + '.'
return '.'.join(bits[:-level])
def generate_parent_names(self, fullname):
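# e.g. generate_parent_names('django.db.models') yields 'django.db',
# then 'django'.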
while '.' in fullname:
fullname = fullname[:fullname.rindex('.')]
fullname, _, _ = fullname.rpartition('.')
yield fullname
def find_related_imports(self, fullname):
@ -599,13 +606,14 @@ class ModuleFinder(object):
stack.extend(set(fullnames).difference(found, stack, [fullname]))
found.update(fullnames)
return found
return sorted(found)
class ModuleResponder(object):
def __init__(self, router):
self._router = router
self._finder = ModuleFinder()
self._cache = {} # fullname -> pickled
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
def __repr__(self):
@ -622,40 +630,78 @@ class ModuleResponder(object):
return src[:match.start()]
return src
def _build_tuple(self, fullname):
if fullname in self._cache:
return self._cache[fullname]
path, source, is_pkg = self._finder.get_module_source(fullname)
if source is None:
raise ImportError('could not find %r' % (fullname,))
if is_pkg:
pkg_present = get_child_modules(path, fullname)
LOG.debug('_build_tuple(%r, %r) -> %r',
path, fullname, pkg_present)
else:
pkg_present = None
if fullname == '__main__':
source = self.neutralize_main(source)
compressed = zlib.compress(source)
related = list(self._finder.find_related(fullname))
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related
tup = fullname, pkg_present, path, compressed, related
self._cache[fullname] = tup
return tup
def _send_load_module(self, stream, msg, fullname):
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
self._router.route(
mitogen.core.Message.pickled(
self._build_tuple(fullname),
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
)
stream.sent_modules.add(fullname)
def _on_get_module(self, msg):
LOG.debug('%r.get_module(%r)', self, msg)
if msg == mitogen.core._DEAD:
return
stream = self._router.stream_by_id(msg.src_id)
fullname = msg.data
try:
path, source, is_pkg = self._finder.get_module_source(fullname)
if source is None:
raise ImportError('could not find %r' % (fullname,))
if is_pkg:
pkg_present = get_child_modules(path, fullname)
LOG.debug('get_child_modules(%r, %r) -> %r',
path, fullname, pkg_present)
else:
pkg_present = None
tup = self._build_tuple(fullname)
for name in tup[4]: # related
if name == fullname:
# Must be sent last
continue
parent_pkg, _, _ = name.partition('.')
if parent_pkg != fullname and parent_pkg not in stream.sent_packages:
# Parent hasn't been required, so don't load this guy yet.
continue
if name in stream.sent_modules:
# Submodule has been sent already, skip.
continue
self._send_load_module(stream, msg, name)
self._send_load_module(stream, msg, fullname)
if tup[1] is not None:
# It's a package, record the fact it was sent.
stream.sent_packages.add(fullname)
if fullname == '__main__':
source = self.neutralize_main(source)
compressed = zlib.compress(source)
related = list(self._finder.find_related(fullname))
self._router.route(
mitogen.core.Message.pickled(
(pkg_present, path, compressed, related),
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
except Exception:
LOG.debug('While importing %r', fullname, exc_info=True)
self._router.route(
mitogen.core.Message.pickled(
None,
(fullname, None, None, None, []),
dst_id=msg.src_id,
handle=msg.reply_to,
)
@ -682,41 +728,28 @@ class ModuleForwarder(object):
return
fullname = msg.data
cached = self.importer._cache.get(fullname)
if cached:
LOG.debug('%r._on_get_module(): using cached %r', self, fullname)
self.router.route(
mitogen.core.Message.pickled(
cached,
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
else:
LOG.debug('%r._on_get_module(): requesting %r', self, fullname)
self.parent_context.send(
mitogen.core.Message(
data=msg.data,
handle=mitogen.core.GET_MODULE,
reply_to=self.router.add_handler(
lambda m: self._on_got_source(m, msg),
persist=False
)
)
)
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)
def _on_got_source(self, msg, original_msg):
LOG.debug('%r._on_got_source(%r, %r)', self, msg, original_msg)
fullname = original_msg.data
self.importer._cache[fullname] = msg.unpickle()
def _send_one_module(self, msg, tup):
self.router.route(
mitogen.core.Message(
data=msg.data,
dst_id=original_msg.src_id,
handle=original_msg.reply_to,
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _on_cache_callback(self, msg, fullname):
LOG.debug('%r._on_get_module(): sending %r', self, fullname)
tup = self.importer._cache[fullname]
if tup is not None:
for related in tup[4]:
rtup = self.importer._cache[related]
self._send_one_module(msg, rtup)
self._send_one_module(msg, tup)
class Stream(mitogen.core.Stream):
"""
@ -731,6 +764,11 @@ class Stream(mitogen.core.Stream):
#: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
profiling = False
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.sent_packages = set(['mitogen'])
def construct(self, remote_name=None, python_path=None, debug=False,
profiling=False, **kwargs):
"""Get the named context running on the local machine, creating it if

@ -144,6 +144,195 @@ class FindRelatedImportsTest(testlib.TestCase):
'mitogen.master',
])
def test_django_pkg(self):
import django
related = self.call('django')
self.assertEquals(related, [
'django',
'django.utils',
'django.utils.lru_cache',
'django.utils.version',
])
def test_django_db(self):
import django.db
related = self.call('django.db')
self.assertEquals(related, [
'django',
'django.conf',
'django.conf.global_settings',
'django.core',
'django.core.exceptions',
'django.core.signals',
'django.db',
'django.db.utils',
'django.dispatch',
'django.dispatch.dispatcher',
'django.dispatch.weakref_backports',
'django.utils',
'django.utils._os',
'django.utils.deprecation',
'django.utils.encoding',
'django.utils.functional',
'django.utils.inspect',
'django.utils.lru_cache',
'django.utils.module_loading',
'django.utils.six',
'django.utils.version',
])
def test_django_db_models(self):
import django.db.models
related = self.call('django.db.models')
self.assertEquals(related, [
'django',
'django.apps',
'django.conf',
'django.conf.global_settings',
'django.core',
'django.core.cache',
'django.core.cache.backends',
'django.core.cache.backends.base',
'django.core.checks',
'django.core.checks.caches',
'django.core.checks.compatibility',
'django.core.checks.compatibility.django_1_10',
'django.core.checks.compatibility.django_1_8_0',
'django.core.checks.database',
'django.core.checks.model_checks',
'django.core.checks.security',
'django.core.checks.security.base',
'django.core.checks.security.csrf',
'django.core.checks.security.sessions',
'django.core.checks.templates',
'django.core.checks.urls',
'django.core.checks.utils',
'django.core.exceptions',
'django.core.files',
'django.core.files.base',
'django.core.files.images',
'django.core.files.locks',
'django.core.files.move',
'django.core.files.storage',
'django.core.files.utils',
'django.core.signals',
'django.core.validators',
'django.db',
'django.db.backends',
'django.db.backends.utils',
'django.db.models',
'django.db.models.aggregates',
'django.db.models.base',
'django.db.models.constants',
'django.db.models.deletion',
'django.db.models.expressions',
'django.db.models.fields',
'django.db.models.fields.files',
'django.db.models.fields.proxy',
'django.db.models.fields.related',
'django.db.models.fields.related_descriptors',
'django.db.models.fields.related_lookups',
'django.db.models.fields.reverse_related',
'django.db.models.functions',
'django.db.models.indexes',
'django.db.models.lookups',
'django.db.models.manager',
'django.db.models.options',
'django.db.models.query',
'django.db.models.query_utils',
'django.db.models.signals',
'django.db.models.sql',
'django.db.models.sql.constants',
'django.db.models.sql.datastructures',
'django.db.models.sql.query',
'django.db.models.sql.subqueries',
'django.db.models.sql.where',
'django.db.models.utils',
'django.db.transaction',
'django.db.utils',
'django.dispatch',
'django.dispatch.dispatcher',
'django.dispatch.weakref_backports',
'django.forms',
'django.forms.boundfield',
'django.forms.fields',
'django.forms.forms',
'django.forms.formsets',
'django.forms.models',
'django.forms.renderers',
'django.forms.utils',
'django.forms.widgets',
'django.template',
'django.template.backends',
'django.template.backends.base',
'django.template.backends.django',
'django.template.backends.jinja2',
'django.template.base',
'django.template.context',
'django.template.engine',
'django.template.exceptions',
'django.template.library',
'django.template.loader',
'django.template.utils',
'django.templatetags',
'django.templatetags.static',
'django.utils',
'django.utils._os',
'django.utils.crypto',
'django.utils.datastructures',
'django.utils.dateformat',
'django.utils.dateparse',
'django.utils.dates',
'django.utils.datetime_safe',
'django.utils.deconstruct',
'django.utils.decorators',
'django.utils.deprecation',
'django.utils.duration',
'django.utils.encoding',
'django.utils.formats',
'django.utils.functional',
'django.utils.html',
'django.utils.html_parser',
'django.utils.http',
'django.utils.inspect',
'django.utils.ipv6',
'django.utils.itercompat',
'django.utils.lru_cache',
'django.utils.module_loading',
'django.utils.numberformat',
'django.utils.safestring',
'django.utils.six',
'django.utils.text',
'django.utils.timezone',
'django.utils.translation',
'django.utils.tree',
'django.utils.version',
'jinja2',
'jinja2._compat',
'jinja2.bccache',
'jinja2.compiler',
'jinja2.defaults',
'jinja2.environment',
'jinja2.exceptions',
'jinja2.filters',
'jinja2.idtracking',
'jinja2.lexer',
'jinja2.loaders',
'jinja2.nodes',
'jinja2.optimizer',
'jinja2.parser',
'jinja2.runtime',
'jinja2.tests',
'jinja2.utils',
'jinja2.visitor',
'markupsafe',
'markupsafe._compat',
'markupsafe._speedups',
'pytz',
'pytz.exceptions',
'pytz.lazy',
'pytz.tzfile',
'pytz.tzinfo',
])
if __name__ == '__main__':
unittest2.main()
