From 47caea9e4689212cef4bb8f4c03a2e7f88ec027f Mon Sep 17 00:00:00 2001 From: Matt Martz Date: Thu, 12 Jun 2025 13:25:54 -0500 Subject: [PATCH] Remove exec side effects from actions that should not exec --- .../collection_loader/_collection_finder.py | 140 ++++++++---------- .../test_collection_loader.py | 24 ++- 2 files changed, 76 insertions(+), 88 deletions(-) diff --git a/lib/ansible/utils/collection_loader/_collection_finder.py b/lib/ansible/utils/collection_loader/_collection_finder.py index c654667d978..81d3becc5e1 100644 --- a/lib/ansible/utils/collection_loader/_collection_finder.py +++ b/lib/ansible/utils/collection_loader/_collection_finder.py @@ -16,7 +16,7 @@ import sys from contextlib import contextmanager from importlib import import_module, reload as reload_module from importlib.machinery import FileFinder -from importlib.util import find_spec, spec_from_loader +from importlib.util import find_spec, module_from_spec, spec_from_loader from keyword import iskeyword from types import ModuleType @@ -41,6 +41,18 @@ except ImportError: # deprecated: description='TraversableResources fallback' python_version='3.8' TraversableResources = object # type: ignore[assignment,misc] + +if sys.version_info >= (3, 10): + from importlib.resources import files +else: + # deprecated: description='reliable importlib.resources.files' python_version='3.10' + def files(name): + spec = find_spec(name) + if spec is None: + raise ImportError(name) + origin = pathlib.Path(spec.origin) + return origin.parent + # NB: this supports import sanity test providing a different impl try: from ._collection_meta import _meta_yml_to_dict @@ -86,7 +98,7 @@ class _AnsibleNSTraversable: self._paths = [pathlib.Path(p) for p in paths] def __repr__(self): - return "_AnsibleNSTraversable('%s')" % "', '".join(map(_to_text, self._paths)) + return "_AnsibleNSTraversable('%s')" % "', '".join(map(str, self._paths)) def iterdir(self): return itertools.chain.from_iterable(p.iterdir() for p in self._paths if p.is_dir()) @@ -355,7 +367,8 @@ class _AnsibleCollectionFinder: if loader is None: return None - spec = spec_from_loader(fullname, loader) + origin = getattr(loader, 'path', None) + spec = spec_from_loader(fullname, loader, origin=origin) if spec is not None and hasattr(loader, '_subpackage_search_paths'): spec.submodule_search_locations = loader._subpackage_search_paths return spec @@ -452,6 +465,7 @@ class _AnsibleCollectionPkgLoaderBase: def __init__(self, fullname, path_list=None): self._fullname = fullname self._redirect_module = None + self._redirect_module_spec = None self._split_name = fullname.split('.') self._rpart_name = fullname.rpartition('.') self._parent_package_name = self._rpart_name[0] # eg ansible_collections for ansible_collections.somens, '' for toplevel @@ -534,8 +548,16 @@ class _AnsibleCollectionPkgLoaderBase: return _AnsibleTraversableResources(fullname, self) def exec_module(self, module): - # short-circuit redirect; avoid reinitializing existing modules - if self._redirect_module: + if self._redirect_module_spec and not sys.modules.get(self._redirect_module_spec.name): + # mirror import_module behavior + sys.modules[self._redirect_module_spec.name] = self._redirect_module + parent, _sep, child = self._redirect_module_spec.name.rpartition('.') + parent_module = sys.modules[parent] + setattr(parent_module, child, self._redirect_module) + + # Actually exec the module + self._redirect_module_spec.loader.exec_module(self._redirect_module) + return # execute the module's code in its namespace @@ -544,38 +566,18 @@ class _AnsibleCollectionPkgLoaderBase: exec(code_obj, module.__dict__) def create_module(self, spec): - # short-circuit redirect; we've already imported the redirected module, so just alias it and return it + spec.submodule_search_locations = self._subpackage_search_paths + + if self._redirect_module_spec and (module := sys.modules.get(self._redirect_module_spec.name)): + self._redirect_module = module + elif self._redirect_module_spec: + self._redirect_module = module_from_spec(self._redirect_module_spec) + if self._redirect_module: return self._redirect_module else: return None - def load_module(self, fullname): - # short-circuit redirect; we've already imported the redirected module, so just alias it and return it - if self._redirect_module: - sys.modules[self._fullname] = self._redirect_module - return self._redirect_module - - # we're actually loading a module/package - module_attrs = dict( - __loader__=self, - __file__=self.get_filename(fullname), - __package__=self._parent_package_name # sane default for non-packages - ) - - # eg, I am a package - if self._subpackage_search_paths is not None: # empty is legal - module_attrs['__path__'] = self._subpackage_search_paths - module_attrs['__package__'] = fullname # per PEP366 - - with self._new_or_existing_module(fullname, **module_attrs) as module: - # execute the module's code in its namespace - code_obj = self.get_code(fullname) - if code_obj is not None: # things like NS packages that can't have code on disk will return None - exec(code_obj, module.__dict__) - - return module - def is_package(self, fullname): if fullname != self._fullname: raise ValueError('this loader cannot answer is_package for {0}, only {1}'.format(fullname, self._fullname)) @@ -742,12 +744,9 @@ class _AnsibleCollectionPkgLoader(_AnsibleCollectionPkgLoaderBase): self._load_module(module) def create_module(self, spec): + spec.submodule_search_locations = self._subpackage_search_paths return None - def load_module(self, fullname): - module = super(_AnsibleCollectionPkgLoader, self).load_module(fullname) - return self._load_module(module) - def _canonicalize_meta(self, meta_dict): # TODO: rewrite import keys and all redirect targets that start with .. (current namespace) and . (current collection) # OR we could do it all on the fly? @@ -809,8 +808,11 @@ class _AnsibleCollectionLoader(_AnsibleCollectionPkgLoaderBase): # see the redirected ancestor package contents instead of the package where they actually live). if redirect: # FIXME: wrap this so we can be explicit about a failed redirection - self._redirect_module = import_module(redirect) - if explicit_redirect and hasattr(self._redirect_module, '__path__') and self._redirect_module.__path__: + if not (spec := find_spec(redirect)): + raise ImportError(redirect) + self._redirect_module_spec = spec + self.path = spec.origin + if explicit_redirect and spec.loader.is_package(redirect): # if the import target looks like a package, store its name so we can rewrite future descendent loads self._redirected_package_map[self._fullname] = redirect @@ -872,20 +874,6 @@ class _AnsibleInternalRedirectLoader: def create_module(self, spec): return None - def load_module(self, fullname): - # since we're delegating to other loaders, this should only be called for internal redirects where we answered - # find_module with this loader, in which case we'll just directly import the redirection target, insert it into - # sys.modules under the name it was requested by, and return the original module. - - # should never see this - if not self._redirect: - raise ValueError('no redirect found for {0}'.format(fullname)) - - # FIXME: smuggle redirection context, provide warning/error that we tried and failed to redirect - mod = import_module(self._redirect) - sys.modules[fullname] = mod - return mod - class AnsibleCollectionRef: # FUTURE: introspect plugin loaders to get these dynamically? @@ -1081,11 +1069,11 @@ def _get_collection_path(collection_name): if not collection_name or not isinstance(collection_name, str) or len(collection_name.split('.')) != 2: raise ValueError('collection_name must be a non-empty string of the form namespace.collection') try: - collection_pkg = import_module('ansible_collections.' + collection_name) + collection_pkg = files('ansible_collections.' + collection_name) except ImportError: raise ValueError('unable to locate collection {0}'.format(collection_name)) - return _to_text(os.path.dirname(_to_bytes(collection_pkg.__file__))) + return str(collection_pkg) def _get_collection_playbook_path(playbook): @@ -1094,27 +1082,26 @@ def _get_collection_playbook_path(playbook): if acr: try: # get_collection_path - pkg = import_module(acr.n_python_collection_package_name) + pkg = files(acr.n_python_collection_package_name) except (OSError, ModuleNotFoundError) as ex: # leaving ex as debug target, even though not used in normal code pkg = None if pkg: - cpath = os.path.join(sys.modules[acr.n_python_collection_package_name].__file__.replace('__synthetic__', 'playbooks')) + cpath = pkg / 'playbooks' if acr.subdirs: paths = [_to_text(x) for x in acr.subdirs.split(u'.')] - paths.insert(0, cpath) - cpath = os.path.join(*paths) + cpath = cpath.joinpath(*paths) - path = os.path.join(cpath, _to_text(acr.resource)) - if os.path.exists(_to_bytes(path)): - return acr.resource, path, acr.collection + path = cpath / _to_text(acr.resource) + if path.exists(): + return acr.resource, str(path), acr.collection elif not acr.resource.endswith(PB_EXTENSIONS): for ext in PB_EXTENSIONS: - path = os.path.join(cpath, _to_text(acr.resource + ext)) - if os.path.exists(_to_bytes(path)): - return acr.resource, path, acr.collection + path = (cpath / _to_text(acr.resource)).with_suffix(ext) + if path.exists(): + return acr.resource, str(path), acr.collection return None @@ -1144,13 +1131,8 @@ def _get_collection_resource_path(name, ref_type, collection_list=None): try: acr = AnsibleCollectionRef(collection_name=collection_name, subdirs=subdirs, resource=resource, ref_type=ref_type) # FIXME: error handling/logging; need to catch any import failures and move along - pkg = import_module(acr.n_python_package_name) - - if pkg is not None: - # the package is now loaded, get the collection's package and ask where it lives - path = os.path.dirname(_to_bytes(sys.modules[acr.n_python_package_name].__file__)) - return resource, _to_text(path), collection_name - + pkg = files(acr.n_python_package_name) + return resource, str(pkg), collection_name except (OSError, ModuleNotFoundError) as ex: continue except Exception as ex: @@ -1170,32 +1152,30 @@ def _get_collection_name_from_path(path): """ # ensure we compare full paths since pkg path will be abspath - path = _to_text(os.path.abspath(_to_bytes(path))) + path = pathlib.Path(_to_text(path)).absolute() - path_parts = path.split('/') - if path_parts.count('ansible_collections') != 1: + if path.parts.count('ansible_collections') != 1: return None - ac_pos = path_parts.index('ansible_collections') + ac_pos = path.parts.index('ansible_collections') # make sure it's followed by at least a namespace and collection name - if len(path_parts) < ac_pos + 3: + if len(path.parts) < ac_pos + 3: return None - candidate_collection_name = '.'.join(path_parts[ac_pos + 1:ac_pos + 3]) + candidate_collection_name = '.'.join(path.parts[ac_pos + 1:ac_pos + 3]) try: # we've got a name for it, now see if the path prefix matches what the loader sees - imported_pkg_path = _to_text(os.path.dirname(_to_bytes(import_module('ansible_collections.' + candidate_collection_name).__file__))) + imported_pkg_path = files('ansible_collections.' + candidate_collection_name) except ImportError: return None # reassemble the original path prefix up the collection name, and it should match what we just imported. If not # this is probably a collection root that's not configured. - original_path_prefix = os.path.join('/', *path_parts[0:ac_pos + 3]) + original_path_prefix = pathlib.Path(*path.parts[0:ac_pos + 3]) - imported_pkg_path = _to_text(os.path.abspath(_to_bytes(imported_pkg_path))) if original_path_prefix != imported_pkg_path: return None diff --git a/test/units/utils/collection_loader/test_collection_loader.py b/test/units/utils/collection_loader/test_collection_loader.py index 9355817be3c..8c973ae1e2f 100644 --- a/test/units/utils/collection_loader/test_collection_loader.py +++ b/test/units/utils/collection_loader/test_collection_loader.py @@ -1,5 +1,6 @@ from __future__ import annotations +import importlib.util import inspect import os import pkgutil @@ -190,13 +191,14 @@ def test_root_loader(): sys.modules.pop(name, None) loader = _AnsibleCollectionRootPkgLoader(name, paths) assert repr(loader).startswith('_AnsibleCollectionRootPkgLoader(path=') - module = loader.load_module(name) + spec = importlib.util.spec_from_loader(name, loader) + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) assert module.__name__ == name assert module.__path__ == [p for p in extend_paths(paths, name) if os.path.isdir(p)] # even if the dir exists somewhere, this loader doesn't support get_data, so make __file__ a non-file assert module.__file__ == '' assert module.__package__ == name - assert sys.modules.get(name) == module def test_nspkg_loader_not_interested(): @@ -217,13 +219,14 @@ def test_nspkg_loader_load_module(): sys.modules.pop(name, None) loader = _AnsibleCollectionNSPkgLoader(name, path_list=paths) assert repr(loader).startswith('_AnsibleCollectionNSPkgLoader(path=') - module = loader.load_module(name) + spec = importlib.util.spec_from_loader(name, loader) + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) assert module.__name__ == name assert isinstance(module.__loader__, _AnsibleCollectionNSPkgLoader) assert module.__path__ == existing_child_paths assert module.__package__ == name assert module.__file__ == '' - assert sys.modules.get(name) == module def test_collpkg_loader_not_interested(): @@ -246,7 +249,9 @@ def test_collpkg_loader_load_module(): sys.modules.pop(name, None) loader = _AnsibleCollectionPkgLoader(name, path_list=paths) assert repr(loader).startswith('_AnsibleCollectionPkgLoader(path=') - module = loader.load_module(name) + spec = importlib.util.spec_from_loader(name, loader) + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) assert module.__name__ == name assert isinstance(module.__loader__, _AnsibleCollectionPkgLoader) if is_builtin: @@ -259,7 +264,6 @@ def test_collpkg_loader_load_module(): assert module.__file__ == '' else: assert module.__file__.endswith('__synthetic__') and os.path.isdir(os.path.dirname(module.__file__)) - assert sys.modules.get(name) == module assert hasattr(module, '_collection_meta') and isinstance(module._collection_meta, dict) @@ -272,7 +276,10 @@ def test_collpkg_loader_load_module(): with patch.object(_collection_finder, '_meta_yml_to_dict', side_effect=Exception('bang')): with pytest.raises(Exception) as ex: - _AnsibleCollectionPkgLoader(name, path_list=paths).load_module(name) + loader = _AnsibleCollectionPkgLoader(name, path_list=paths) + spec = importlib.util.spec_from_loader(name, loader) + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) assert 'error parsing collection metadata' in str(ex.value) @@ -605,7 +612,8 @@ def test_collection_role_name_location(role_name, collection_list, expected_coll assert found[1] == expected_path assert found[2] == expected_collection_name else: - assert expected_collection_name is None and expected_path_suffix is None + assert expected_collection_name is None + assert expected_path_suffix is None def test_bogus_imports():