Remove exec side effects from actions that should not exec

pull/85319/head
Matt Martz 8 months ago
parent d3977ebc88
commit 47caea9e46
No known key found for this signature in database
GPG Key ID: 40832D88E9FC91D8

@ -16,7 +16,7 @@ import sys
from contextlib import contextmanager
from importlib import import_module, reload as reload_module
from importlib.machinery import FileFinder
from importlib.util import find_spec, spec_from_loader
from importlib.util import find_spec, module_from_spec, spec_from_loader
from keyword import iskeyword
from types import ModuleType
@ -41,6 +41,18 @@ except ImportError:
# deprecated: description='TraversableResources fallback' python_version='3.8'
TraversableResources = object # type: ignore[assignment,misc]
if sys.version_info >= (3, 10):
from importlib.resources import files
else:
# deprecated: description='reliable importlib.resources.files' python_version='3.10'
def files(name):
spec = find_spec(name)
if spec is None:
raise ImportError(name)
origin = pathlib.Path(spec.origin)
return origin.parent
# NB: this supports import sanity test providing a different impl
try:
from ._collection_meta import _meta_yml_to_dict
@ -86,7 +98,7 @@ class _AnsibleNSTraversable:
self._paths = [pathlib.Path(p) for p in paths]
def __repr__(self):
return "_AnsibleNSTraversable('%s')" % "', '".join(map(_to_text, self._paths))
return "_AnsibleNSTraversable('%s')" % "', '".join(map(str, self._paths))
def iterdir(self):
return itertools.chain.from_iterable(p.iterdir() for p in self._paths if p.is_dir())
@ -355,7 +367,8 @@ class _AnsibleCollectionFinder:
if loader is None:
return None
spec = spec_from_loader(fullname, loader)
origin = getattr(loader, 'path', None)
spec = spec_from_loader(fullname, loader, origin=origin)
if spec is not None and hasattr(loader, '_subpackage_search_paths'):
spec.submodule_search_locations = loader._subpackage_search_paths
return spec
@ -452,6 +465,7 @@ class _AnsibleCollectionPkgLoaderBase:
def __init__(self, fullname, path_list=None):
self._fullname = fullname
self._redirect_module = None
self._redirect_module_spec = None
self._split_name = fullname.split('.')
self._rpart_name = fullname.rpartition('.')
self._parent_package_name = self._rpart_name[0] # eg ansible_collections for ansible_collections.somens, '' for toplevel
@ -534,8 +548,16 @@ class _AnsibleCollectionPkgLoaderBase:
return _AnsibleTraversableResources(fullname, self)
def exec_module(self, module):
# short-circuit redirect; avoid reinitializing existing modules
if self._redirect_module:
if self._redirect_module_spec and not sys.modules.get(self._redirect_module_spec.name):
# mirror import_module behavior
sys.modules[self._redirect_module_spec.name] = self._redirect_module
parent, _sep, child = self._redirect_module_spec.name.rpartition('.')
parent_module = sys.modules[parent]
setattr(parent_module, child, self._redirect_module)
# Actually exec the module
self._redirect_module_spec.loader.exec_module(self._redirect_module)
return
# execute the module's code in its namespace
@ -544,38 +566,18 @@ class _AnsibleCollectionPkgLoaderBase:
exec(code_obj, module.__dict__)
def create_module(self, spec):
# short-circuit redirect; we've already imported the redirected module, so just alias it and return it
spec.submodule_search_locations = self._subpackage_search_paths
if self._redirect_module_spec and (module := sys.modules.get(self._redirect_module_spec.name)):
self._redirect_module = module
elif self._redirect_module_spec:
self._redirect_module = module_from_spec(self._redirect_module_spec)
if self._redirect_module:
return self._redirect_module
else:
return None
def load_module(self, fullname):
# short-circuit redirect; we've already imported the redirected module, so just alias it and return it
if self._redirect_module:
sys.modules[self._fullname] = self._redirect_module
return self._redirect_module
# we're actually loading a module/package
module_attrs = dict(
__loader__=self,
__file__=self.get_filename(fullname),
__package__=self._parent_package_name # sane default for non-packages
)
# eg, I am a package
if self._subpackage_search_paths is not None: # empty is legal
module_attrs['__path__'] = self._subpackage_search_paths
module_attrs['__package__'] = fullname # per PEP366
with self._new_or_existing_module(fullname, **module_attrs) as module:
# execute the module's code in its namespace
code_obj = self.get_code(fullname)
if code_obj is not None: # things like NS packages that can't have code on disk will return None
exec(code_obj, module.__dict__)
return module
def is_package(self, fullname):
if fullname != self._fullname:
raise ValueError('this loader cannot answer is_package for {0}, only {1}'.format(fullname, self._fullname))
@ -742,12 +744,9 @@ class _AnsibleCollectionPkgLoader(_AnsibleCollectionPkgLoaderBase):
self._load_module(module)
def create_module(self, spec):
spec.submodule_search_locations = self._subpackage_search_paths
return None
def load_module(self, fullname):
module = super(_AnsibleCollectionPkgLoader, self).load_module(fullname)
return self._load_module(module)
def _canonicalize_meta(self, meta_dict):
# TODO: rewrite import keys and all redirect targets that start with .. (current namespace) and . (current collection)
# OR we could do it all on the fly?
@ -809,8 +808,11 @@ class _AnsibleCollectionLoader(_AnsibleCollectionPkgLoaderBase):
# see the redirected ancestor package contents instead of the package where they actually live).
if redirect:
# FIXME: wrap this so we can be explicit about a failed redirection
self._redirect_module = import_module(redirect)
if explicit_redirect and hasattr(self._redirect_module, '__path__') and self._redirect_module.__path__:
if not (spec := find_spec(redirect)):
raise ImportError(redirect)
self._redirect_module_spec = spec
self.path = spec.origin
if explicit_redirect and spec.loader.is_package(redirect):
# if the import target looks like a package, store its name so we can rewrite future descendent loads
self._redirected_package_map[self._fullname] = redirect
@ -872,20 +874,6 @@ class _AnsibleInternalRedirectLoader:
def create_module(self, spec):
return None
def load_module(self, fullname):
# since we're delegating to other loaders, this should only be called for internal redirects where we answered
# find_module with this loader, in which case we'll just directly import the redirection target, insert it into
# sys.modules under the name it was requested by, and return the original module.
# should never see this
if not self._redirect:
raise ValueError('no redirect found for {0}'.format(fullname))
# FIXME: smuggle redirection context, provide warning/error that we tried and failed to redirect
mod = import_module(self._redirect)
sys.modules[fullname] = mod
return mod
class AnsibleCollectionRef:
# FUTURE: introspect plugin loaders to get these dynamically?
@ -1081,11 +1069,11 @@ def _get_collection_path(collection_name):
if not collection_name or not isinstance(collection_name, str) or len(collection_name.split('.')) != 2:
raise ValueError('collection_name must be a non-empty string of the form namespace.collection')
try:
collection_pkg = import_module('ansible_collections.' + collection_name)
collection_pkg = files('ansible_collections.' + collection_name)
except ImportError:
raise ValueError('unable to locate collection {0}'.format(collection_name))
return _to_text(os.path.dirname(_to_bytes(collection_pkg.__file__)))
return str(collection_pkg)
def _get_collection_playbook_path(playbook):
@ -1094,27 +1082,26 @@ def _get_collection_playbook_path(playbook):
if acr:
try:
# get_collection_path
pkg = import_module(acr.n_python_collection_package_name)
pkg = files(acr.n_python_collection_package_name)
except (OSError, ModuleNotFoundError) as ex:
# leaving ex as debug target, even though not used in normal code
pkg = None
if pkg:
cpath = os.path.join(sys.modules[acr.n_python_collection_package_name].__file__.replace('__synthetic__', 'playbooks'))
cpath = pkg / 'playbooks'
if acr.subdirs:
paths = [_to_text(x) for x in acr.subdirs.split(u'.')]
paths.insert(0, cpath)
cpath = os.path.join(*paths)
cpath = cpath.joinpath(*paths)
path = os.path.join(cpath, _to_text(acr.resource))
if os.path.exists(_to_bytes(path)):
return acr.resource, path, acr.collection
path = cpath / _to_text(acr.resource)
if path.exists():
return acr.resource, str(path), acr.collection
elif not acr.resource.endswith(PB_EXTENSIONS):
for ext in PB_EXTENSIONS:
path = os.path.join(cpath, _to_text(acr.resource + ext))
if os.path.exists(_to_bytes(path)):
return acr.resource, path, acr.collection
path = (cpath / _to_text(acr.resource)).with_suffix(ext)
if path.exists():
return acr.resource, str(path), acr.collection
return None
@ -1144,13 +1131,8 @@ def _get_collection_resource_path(name, ref_type, collection_list=None):
try:
acr = AnsibleCollectionRef(collection_name=collection_name, subdirs=subdirs, resource=resource, ref_type=ref_type)
# FIXME: error handling/logging; need to catch any import failures and move along
pkg = import_module(acr.n_python_package_name)
if pkg is not None:
# the package is now loaded, get the collection's package and ask where it lives
path = os.path.dirname(_to_bytes(sys.modules[acr.n_python_package_name].__file__))
return resource, _to_text(path), collection_name
pkg = files(acr.n_python_package_name)
return resource, str(pkg), collection_name
except (OSError, ModuleNotFoundError) as ex:
continue
except Exception as ex:
@ -1170,32 +1152,30 @@ def _get_collection_name_from_path(path):
"""
# ensure we compare full paths since pkg path will be abspath
path = _to_text(os.path.abspath(_to_bytes(path)))
path = pathlib.Path(_to_text(path)).absolute()
path_parts = path.split('/')
if path_parts.count('ansible_collections') != 1:
if path.parts.count('ansible_collections') != 1:
return None
ac_pos = path_parts.index('ansible_collections')
ac_pos = path.parts.index('ansible_collections')
# make sure it's followed by at least a namespace and collection name
if len(path_parts) < ac_pos + 3:
if len(path.parts) < ac_pos + 3:
return None
candidate_collection_name = '.'.join(path_parts[ac_pos + 1:ac_pos + 3])
candidate_collection_name = '.'.join(path.parts[ac_pos + 1:ac_pos + 3])
try:
# we've got a name for it, now see if the path prefix matches what the loader sees
imported_pkg_path = _to_text(os.path.dirname(_to_bytes(import_module('ansible_collections.' + candidate_collection_name).__file__)))
imported_pkg_path = files('ansible_collections.' + candidate_collection_name)
except ImportError:
return None
# reassemble the original path prefix up the collection name, and it should match what we just imported. If not
# this is probably a collection root that's not configured.
original_path_prefix = os.path.join('/', *path_parts[0:ac_pos + 3])
original_path_prefix = pathlib.Path(*path.parts[0:ac_pos + 3])
imported_pkg_path = _to_text(os.path.abspath(_to_bytes(imported_pkg_path)))
if original_path_prefix != imported_pkg_path:
return None

@ -1,5 +1,6 @@
from __future__ import annotations
import importlib.util
import inspect
import os
import pkgutil
@ -190,13 +191,14 @@ def test_root_loader():
sys.modules.pop(name, None)
loader = _AnsibleCollectionRootPkgLoader(name, paths)
assert repr(loader).startswith('_AnsibleCollectionRootPkgLoader(path=')
module = loader.load_module(name)
spec = importlib.util.spec_from_loader(name, loader)
module = importlib.util.module_from_spec(spec)
loader.exec_module(module)
assert module.__name__ == name
assert module.__path__ == [p for p in extend_paths(paths, name) if os.path.isdir(p)]
# even if the dir exists somewhere, this loader doesn't support get_data, so make __file__ a non-file
assert module.__file__ == '<ansible_synthetic_collection_package>'
assert module.__package__ == name
assert sys.modules.get(name) == module
def test_nspkg_loader_not_interested():
@ -217,13 +219,14 @@ def test_nspkg_loader_load_module():
sys.modules.pop(name, None)
loader = _AnsibleCollectionNSPkgLoader(name, path_list=paths)
assert repr(loader).startswith('_AnsibleCollectionNSPkgLoader(path=')
module = loader.load_module(name)
spec = importlib.util.spec_from_loader(name, loader)
module = importlib.util.module_from_spec(spec)
loader.exec_module(module)
assert module.__name__ == name
assert isinstance(module.__loader__, _AnsibleCollectionNSPkgLoader)
assert module.__path__ == existing_child_paths
assert module.__package__ == name
assert module.__file__ == '<ansible_synthetic_collection_package>'
assert sys.modules.get(name) == module
def test_collpkg_loader_not_interested():
@ -246,7 +249,9 @@ def test_collpkg_loader_load_module():
sys.modules.pop(name, None)
loader = _AnsibleCollectionPkgLoader(name, path_list=paths)
assert repr(loader).startswith('_AnsibleCollectionPkgLoader(path=')
module = loader.load_module(name)
spec = importlib.util.spec_from_loader(name, loader)
module = importlib.util.module_from_spec(spec)
loader.exec_module(module)
assert module.__name__ == name
assert isinstance(module.__loader__, _AnsibleCollectionPkgLoader)
if is_builtin:
@ -259,7 +264,6 @@ def test_collpkg_loader_load_module():
assert module.__file__ == '<ansible_synthetic_collection_package>'
else:
assert module.__file__.endswith('__synthetic__') and os.path.isdir(os.path.dirname(module.__file__))
assert sys.modules.get(name) == module
assert hasattr(module, '_collection_meta') and isinstance(module._collection_meta, dict)
@ -272,7 +276,10 @@ def test_collpkg_loader_load_module():
with patch.object(_collection_finder, '_meta_yml_to_dict', side_effect=Exception('bang')):
with pytest.raises(Exception) as ex:
_AnsibleCollectionPkgLoader(name, path_list=paths).load_module(name)
loader = _AnsibleCollectionPkgLoader(name, path_list=paths)
spec = importlib.util.spec_from_loader(name, loader)
module = importlib.util.module_from_spec(spec)
loader.exec_module(module)
assert 'error parsing collection metadata' in str(ex.value)
@ -605,7 +612,8 @@ def test_collection_role_name_location(role_name, collection_list, expected_coll
assert found[1] == expected_path
assert found[2] == expected_collection_name
else:
assert expected_collection_name is None and expected_path_suffix is None
assert expected_collection_name is None
assert expected_path_suffix is None
def test_bogus_imports():

Loading…
Cancel
Save