Beginnings of module_finder_test

pull/45/head
David Wilson 7 years ago
parent fccf1b1cab
commit eb6afee514

@ -1,4 +1,5 @@
-r docs/docs-requirements.txt -r docs/docs-requirements.txt
ansible==2.3.1.0
docker==2.5.1 docker==2.5.1
docker[tls]==2.5.1 docker[tls]==2.5.1
pytest-capturelog==0.7 pytest-capturelog==0.7

@ -405,7 +405,7 @@ class ModuleFinder(object):
"""Attempt to fetch source code via pkgutil. In an ideal world, this """Attempt to fetch source code via pkgutil. In an ideal world, this
would be the only required implementation of get_module().""" would be the only required implementation of get_module()."""
loader = pkgutil.find_loader(fullname) loader = pkgutil.find_loader(fullname)
LOG.debug('pkgutil.find_loader(%r) -> %r', fullname, loader) LOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r', fullname, loader)
if not loader: if not loader:
return return
@ -420,51 +420,54 @@ class ModuleFinder(object):
def _get_module_via_sys_modules(self, fullname): def _get_module_via_sys_modules(self, fullname):
"""Attempt to fetch source code via sys.modules. This is specifically """Attempt to fetch source code via sys.modules. This is specifically
to support __main__, but it may catch a few more cases.""" to support __main__, but it may catch a few more cases."""
if fullname not in sys.modules: module = sys.modules.get(fullname)
LOG.debug('%r does not appear in sys.modules', fullname) if not isinstance(module, types.ModuleType):
LOG.debug('sys.modules[%r] absent or not a regular module',
fullname)
return return
if 'six.moves' in fullname: modpath = getattr(module, '__file__', '')
# TODO: causes inspect.getsource() to explode.
return None, None, None
modpath = getattr(sys.modules[fullname], '__file__', '')
if not modpath.rstrip('co').endswith('.py'): if not modpath.rstrip('co').endswith('.py'):
# Probably a native module. # Probably a native module.
return None, None, None return
is_pkg = hasattr(sys.modules[fullname], '__path__') is_pkg = hasattr(module, '__path__')
try: try:
source = inspect.getsource(sys.modules[fullname]) source = inspect.getsource(module)
except IOError: except IOError:
# Work around inspect.getsourcelines() bug. # Work around inspect.getsourcelines() bug.
if not is_pkg: if not is_pkg:
raise raise
source = '\n' source = '\n'
return (sys.modules[fullname].__file__.rstrip('co'), return (module.__file__.rstrip('co'),
source, source,
hasattr(sys.modules[fullname], '__path__')) hasattr(module, '__path__'))
def _get_module_via_parent_enumeration(self, fullname): def _get_module_via_parent(self, fullname):
"""Attempt to fetch source code by examining the module's (hopefully """Attempt to fetch source code by examining the module's (hopefully
less insane) parent package. Required for ansible.compat.six.""" less insane) parent package. Required for ansible.compat.six."""
# Need to find the ancient version of Ansible with the ancient
# non-package version of six that required this method to exist.
# Currently it doesn't seem to be needed at all, and it's broken for
# packages.
pkgname, _, modname = fullname.rpartition('.') pkgname, _, modname = fullname.rpartition('.')
pkg = sys.modules.get(pkgname) pkg = sys.modules.get(pkgname)
if pkg is None or not hasattr(pkg, '__file__'): if not (isinstance(pkg, types.ModuleType) and hasattr(pkg, '__file__')):
return return
pkg_path = os.path.dirname(pkg.__file__) pkg_path = os.path.dirname(pkg.__file__)
try: try:
fp, path, ext = imp.find_module(modname, [pkg_path]) fp, path, ext = imp.find_module(modname, [pkg_path])
LOG.error('%r', (fp, path, ext)) if ext and ext[-1] == imp.PKG_DIRECTORY:
assert 0, "TODO"
return path, fp.read(), False return path, fp.read(), False
except ImportError, e: except ImportError, e:
LOG.debug('imp.find_module(%r, %r) -> %s', modname, [pkg_path], e) LOG.debug('imp.find_module(%r, %r) -> %s', modname, [pkg_path], e)
get_module_methods = [_get_module_via_pkgutil, get_module_methods = [_get_module_via_pkgutil,
_get_module_via_sys_modules, _get_module_via_sys_modules,
_get_module_via_parent_enumeration] _get_module_via_parent]
def get_module_source(self, fullname): def get_module_source(self, fullname):
"""Given the name of a loaded module `fullname`, attempt to find its """Given the name of a loaded module `fullname`, attempt to find its
@ -489,11 +492,11 @@ class ModuleFinder(object):
"""Given an ImportFrom AST node, guess the prefix that should be tacked """Given an ImportFrom AST node, guess the prefix that should be tacked
on to an alias name to produce a canonical name. `fullname` is the name on to an alias name to produce a canonical name. `fullname` is the name
of the module in which the ImportFrom appears.""" of the module in which the ImportFrom appears."""
if level == 0: if level == 0 or not fullname:
return '' return ''
bits = fullname.split('.') bits = fullname.split('.')
if len(bits) < level: if len(bits) <= level:
# This would be an ImportError in real code. # This would be an ImportError in real code.
return '' return ''
@ -536,13 +539,15 @@ class ModuleFinder(object):
for name in namelist for name in namelist
) )
return self._related_cache.setdefault(fullname, [ return self._related_cache.setdefault(fullname, sorted(
set(
name name
for name in maybe_names for name in maybe_names
if sys.modules.get(name) is not None if sys.modules.get(name) is not None
and not self.is_stdlib_name(name) and not self.is_stdlib_name(name)
and 'six.moves' not in name # TODO: crap and 'six.moves' not in name # TODO: crap
]) )
))
def find_related(self, fullname): def find_related(self, fullname):
stack = [fullname] stack = [fullname]

@ -0,0 +1,6 @@
import sys
def say_hi():
print 'hi'

@ -0,0 +1,3 @@
from __future__ import absolute_import
from module_finder_testmod.regular_mod import say_hi

@ -2,26 +2,168 @@
import unittest import unittest
import mitogen.master import mitogen.master
import testlib
class CompilerModuleTest(unittest.TestCase):
klass = mitogen.master.ModuleScanner
@classmethod class ConstructorTest(testlib.TestCase):
def setUpClass(cls): klass = mitogen.master.ModuleFinder
super(CompilerModuleTest, cls).setUpClass()
#import compiler def test_simple(self):
#mitogen.master.ast = None self.klass()
#mitogen.master.compiler = compiler
class ReprTest(testlib.TestCase):
klass = mitogen.master.ModuleFinder
def test_simple(self):
self.assertEquals('ModuleFinder()', repr(self.klass()))
class IsStdlibNameTest(testlib.TestCase):
klass = mitogen.master.ModuleFinder
def call(self, fullname):
return self.klass().is_stdlib_name(fullname)
def test_builtin(self):
import sys
self.assertTrue(self.call('sys'))
def test_stdlib_1(self):
import logging
self.assertTrue(self.call('logging'))
def test_stdlib_2(self):
# virtualenv only symlinks some paths to its local site-packages
# directory. Ensure both halves of the search path return the correct
# result.
import email
self.assertTrue(self.call('email'))
def test_mitogen_core(self):
import mitogen.core
self.assertFalse(self.call('mitogen.core'))
def test_mitogen_fakessh(self):
import mitogen.fakessh
self.assertFalse(self.call('mitogen.fakessh'))
class GetModuleViaPkgutilTest(testlib.TestCase):
klass = mitogen.master.ModuleFinder
def call(self, fullname):
return self.klass()._get_module_via_pkgutil(fullname)
def test_empty_source_pkg(self):
path, src, is_pkg = self.call('module_finder_testmod')
self.assertEquals(path,
testlib.data_path('module_finder_testmod/__init__.py'))
self.assertEquals('', src)
self.assertTrue(is_pkg)
def test_empty_source_module(self):
path, src, is_pkg = self.call('module_finder_testmod.empty_mod')
self.assertEquals(path,
testlib.data_path('module_finder_testmod/empty_mod.py'))
self.assertEquals('', src)
self.assertFalse(is_pkg)
def test_regular_mod(self):
from module_finder_testmod import regular_mod
path, src, is_pkg = self.call('module_finder_testmod.regular_mod')
self.assertEquals(path,
testlib.data_path('module_finder_testmod/regular_mod.py'))
self.assertEquals(src, file(regular_mod.__file__).read())
self.assertFalse(is_pkg)
class GetModuleViaSysModulesTest(testlib.TestCase):
klass = mitogen.master.ModuleFinder
def call(self, fullname):
return self.klass()._get_module_via_sys_modules(fullname)
def test_main(self):
import __main__
path, src, is_pkg = self.call('__main__')
self.assertEquals(path, __main__.__file__)
self.assertEquals(src, file(path).read())
self.assertFalse(is_pkg)
def test_dylib_fails(self):
# _socket comes from a .so
import _socket
tup = self.call('_socket')
self.assertEquals(None, tup)
def test_builtin_fails(self):
# sys is built-in
tup = self.call('sys')
self.assertEquals(None, tup)
class GetModuleViaParentEnumerationTest(testlib.TestCase):
klass = mitogen.master.ModuleFinder
def call(self, fullname):
return self.klass()._get_module_via_parent(fullname)
def test_simple_module(self):
import email.utils
path, src, is_pkg = self.call('email.utils')
self.assertEquals(path, email.utils.__file__.rstrip('co'))
self.assertEquals(src, file(email.utils.__file__.rstrip('co')).read())
self.assertFalse(is_pkg)
def test_ansible_compat_six(self):
# See comment in _get_module_via_parent
raise unittest.SkipTest()
import ansible.compat.six
path, src, is_pkg = self.call('ansible.compat.six')
self.assertEquals(path, __main__.__file__)
self.assertEquals(src, file(path).read())
self.assertFalse(is_pkg)
class ResolveRelPathTest(testlib.TestCase):
klass = mitogen.master.ModuleFinder
def call(self, fullname, level):
return self.klass().resolve_relpath(fullname, level)
def test_empty(self):
self.assertEquals('', self.call('', 0))
self.assertEquals('', self.call('', 1))
self.assertEquals('', self.call('', 2))
def test_absolute(self):
self.assertEquals('', self.call('email.utils', 0))
def test_rel1(self):
self.assertEquals('email.', self.call('email.utils', 1))
def test_rel2(self):
self.assertEquals('', self.call('email.utils', 2))
def test_rel_overflow(self):
self.assertEquals('', self.call('email.utils', 3))
class FindRelatedImportsTest(testlib.TestCase):
klass = mitogen.master.ModuleFinder
def call(self, fullname):
return self.klass().find_related_imports(fullname)
def test_simple(self): def test_simple(self):
for x in range(100):
finder = self.klass()
from pprint import pprint
import time
t0 = time.time()
import mitogen.fakessh import mitogen.fakessh
pprint(finder.find_related('mitogen.fakessh')) related = self.call('mitogen.fakessh')
print 1000 * (time.time() - t0) self.assertEquals(related, [
'mitogen',
'mitogen.core',
'mitogen.master',
])
if __name__ == '__main__': if __name__ == '__main__':

Loading…
Cancel
Save