You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

213 lines
8.5 KiB

# (c) Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ansible import context
from ansible import constants as C
from ansible.collections.list import list_collections
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_native, to_bytes
from ansible.plugins import loader
from ansible.utils.display import Display
from ansible.utils.collection_loader._collection_finder import _get_collection_path
display = Display()
# Not real plugins: files that sit in plugin directories but must be left out
# of listings. Looked up by lower-cased plugin type in _list_plugins_from_paths.
IGNORE = {
    # ptype: names
    'module': ('async_wrapper', ),
    'cache': ('base', ),
}
def get_composite_name(collection, name, path, depth):
    """Build the dotted, collection-qualified name for a plugin.

    :param collection: name of the collection being listed (e.g. ``ns.coll``).
    :param name: plugin name, either short (``copy``) or already prefixed
        with a collection name (``ns.coll.copy``).
    :param path: directory that contains the plugin file.
    :param depth: number of trailing components of ``path`` to include in
        the name; ``0`` includes none.
    :returns: ``<collection>[.<subdir>...].<resource name>`` as a string.
    """
    resolved_collection = collection
    if '.' not in name:
        # short name, use as is
        resource_name = name
    else:
        # a legacy listing may surface names that are qualified as builtin
        if collection == 'ansible.legacy' and name.startswith('ansible.builtin.'):
            resolved_collection = 'ansible.builtin'
        # drop the collection prefix, keep whatever follows it
        resource_name = '.'.join(name.split(f"{resolved_collection}.")[1:])

    # collectionize name
    composite = [resolved_collection]
    if depth:
        composite.extend(path.split(os.path.sep)[depth * -1:])
    # the resource name itself is the last component of the composite
    composite.append(resource_name)
    return '.'.join(composite)
def _list_plugins_from_paths(ptype, dirs, collection, depth=0):
    """Walk ``dirs`` and collect candidate plugin files of type ``ptype``.

    :param ptype: plugin type name (``module``, ``filter``, ``test``, ...).
    :param dirs: iterable of directory paths to search.
    :param collection: name of the collection the directories belong to.
    :param depth: recursion depth below the plugin directory; passed on to
        get_composite_name() so subdirectories become part of the name.
    :returns: dict of ``{plugin_name: b_full_path}``.
    """
    # TODO: update to use importlib.resources
    plugins = {}
    # builtin/legacy are not real on-disk collections (list_plugins adds them by hand)
    is_synthetic = collection in ('ansible.builtin', 'ansible.legacy')

    for path in dirs:
        display.debug("Searching '{0}'s '{1}' for {2} plugins".format(collection, path, ptype))
        b_path = to_bytes(path)

        if os.path.basename(b_path).startswith((b'.', b'__')):
            # skip hidden/special dirs
            continue

        if not os.path.exists(b_path):
            display.debug("Skip listing plugins in '{0}' as it does not exist".format(path))
            continue

        if not os.path.isdir(b_path):
            display.debug("Skip listing plugins in '{0}' as it is not a directory".format(path))
            continue

        bkey = ptype.lower()
        for plugin_file in os.listdir(b_path):

            if plugin_file.startswith((b'.', b'__')):
                # hidden or python internal file/dir
                continue

            display.debug("Found possible plugin: '{0}'".format(plugin_file))
            b_plugin, b_ext = os.path.splitext(plugin_file)
            plugin = to_native(b_plugin)
            full_path = os.path.join(b_path, plugin_file)

            if os.path.isdir(full_path):
                # its a dir, recurse
                if is_synthetic and not os.path.exists(os.path.join(full_path, b'__init__.py')):
                    # dont recurse for synthetic unless __init__.py present
                    continue

                # actually recurse dirs
                plugins.update(_list_plugins_from_paths(ptype, [to_native(full_path)], collection, depth=depth + 1))
                continue

            if any([
                plugin in C.IGNORE_FILES,                # general files to ignore
                to_native(b_ext) in C.REJECT_EXTS,       # general extensions to ignore
                b_ext in (b'.yml', b'.yaml', b'.json'),  # ignore docs files TODO: constant!
                plugin in IGNORE.get(bkey, ()),          # plugin in reject list
                os.path.islink(full_path),               # skip aliases, author should document in 'aliases' field
            ]):
                continue

            plugin_dir = os.path.dirname(to_native(full_path))
            if ptype in ('test', 'filter'):
                # a single file can contain several jinja2 plugins
                try:
                    file_plugins = _list_j2_plugins_from_file(collection, full_path, ptype, plugin)
                except KeyError as e:
                    display.warning('Skipping file %s: %s' % (full_path, to_native(e)))
                    continue

                for j2_plugin in file_plugins:
                    plugin_name = get_composite_name(collection, j2_plugin.ansible_name, plugin_dir, depth)
                    plugins[plugin_name] = full_path
            else:
                plugin_name = get_composite_name(collection, plugin, plugin_dir, depth)
                plugins[plugin_name] = full_path

    return plugins
def _list_j2_plugins_from_file(collection, plugin_path, ptype, plugin_name):
    """Return the plugins that the ``ptype`` loader finds inside one file.

    Looks up ``<ptype>_loader`` on the ``loader`` module and hands the
    arguments straight to its ``get_contained_plugins`` method.
    """
    type_loader = getattr(loader, '{0}_loader'.format(ptype))
    return type_loader.get_contained_plugins(collection, plugin_path, plugin_name)
def list_collection_plugins(ptype, collections, search_paths=None):
    """List plugins of type ``ptype`` found in the given collections.

    :param ptype: plugin type name; must match a ``<ptype>_loader`` attribute
        of the ``loader`` module.
    :param collections: dict of ``{collection_name: b_collection_path}``.
    :param search_paths: accepted for interface compatibility; not used here.
    :returns: dict of ``{plugin_name: (filepath, class_or_None)}``.
    :raises AnsibleError: if no loader exists for ``ptype``.
    """
    # TODO: update to use importlib.resources

    # starts at {plugin_name: filepath, ...}, but changes at the end
    plugins = {}
    try:
        ploader = getattr(loader, '{0}_loader'.format(ptype))
    except AttributeError:
        raise AnsibleError('Cannot list plugins, incorrect plugin type supplied: {0}'.format(ptype))

    # get plugins for each collection
    for collection, b_collection_path in collections.items():
        if collection == 'ansible.builtin':
            # dirs from ansible install, but not configured paths
            dirs = [d.path for d in ploader._get_paths_with_context() if d.internal]
        elif collection == 'ansible.legacy':
            # configured paths + search paths (should include basedirs/-M)
            dirs = [d.path for d in ploader._get_paths_with_context() if not d.internal]
            if context.CLIARGS.get('module_path', None):
                dirs.extend(context.CLIARGS['module_path'])
        else:
            # search path in this case is for locating collection itself
            b_ptype = to_bytes(C.COLLECTION_PTYPE_COMPAT.get(ptype, ptype))
            dirs = [to_native(os.path.join(b_collection_path, b'plugins', b_ptype))]
            # acr = AnsibleCollectionRef.try_parse_fqcr(collection, ptype)
            # if acr:
            #     dirs = acr.subdirs
            # else:
            #     raise Exception('bad acr for %s, %s' % (collection, ptype))

        plugins.update(_list_plugins_from_paths(ptype, dirs, collection))

    # return plugin and its class object, None for those not verifiable or failing
    if ptype in ('module',):
        # no 'invalid' tests for modules
        for plugin in plugins:
            plugins[plugin] = (plugins[plugin], None)
    else:
        # detect invalid plugin candidates AND add loaded object to return data
        for plugin in list(plugins.keys()):
            pobj = None
            try:
                pobj = ploader.get(plugin, class_only=True)
            except Exception as e:
                # best effort: a plugin that fails to load is still listed, with no class
                display.vvv("The '{0}' {1} plugin could not be loaded from '{2}': {3}".format(plugin, ptype, plugins[plugin], to_native(e)))

            # sets final {plugin_name: (filepath, class|NONE if not loaded), ...}
            plugins[plugin] = (plugins[plugin], pobj)

    # {plugin_name: (filepath, class), ...}
    return plugins
def list_plugins(ptype, collection=None, search_paths=None):
    """List plugins of type ``ptype``, optionally limited to one collection.

    :param ptype: plugin type name.
    :param collection: collection name to restrict the listing to; ``None``
        lists every collection found plus ``ansible.builtin`` and
        ``ansible.legacy``.
    :param search_paths: passed to list_collections() when ``collection`` is None.
    :returns: dict of ``{plugin_name: (filepath, class)}``.
    :raises AnsibleError: if the path of the supplied collection cannot be resolved.
    """
    # {plugin_name: (filepath, class), ...}
    plugins = {}
    collections = {}
    if collection is None:
        # list all collections, add synthetic ones
        collections['ansible.builtin'] = b''
        collections['ansible.legacy'] = b''
        collections.update(list_collections(search_paths=search_paths, dedupe=True))
    elif collection == 'ansible.legacy':
        # add builtin, since legacy also resolves to these
        collections[collection] = b''
        collections['ansible.builtin'] = b''
    else:
        try:
            collections[collection] = to_bytes(_get_collection_path(collection))
        except ValueError as e:
            raise AnsibleError("Cannot use supplied collection {0}: {1}".format(collection, to_native(e)), orig_exc=e)

    if collections:
        plugins.update(list_collection_plugins(ptype, collections))

    return plugins
# wrappers
def list_plugin_names(ptype, collection=None):
    """Return the names of all plugins of type ``ptype`` as a list of strings.

    :param ptype: plugin type name.
    :param collection: optional collection name to restrict the listing to.
    """
    # list_plugins() returns a dict keyed by plugin name; the keys are plain
    # strings, so reading an ``ansible_name`` attribute from them would fail.
    return list(list_plugins(ptype, collection))
def list_plugin_files(ptype, collection=None):
    """Return the file path of every plugin listed by list_plugins()."""
    # each value is a (filepath, class) pair; keep only the path
    return [entry[0] for entry in list_plugins(ptype, collection).values()]
def list_plugin_classes(ptype, collection=None):
    """Return the second element (the class slot) of every list_plugins() entry."""
    # each value is a (filepath, class) pair; keep only the class slot
    return [entry[1] for entry in list_plugins(ptype, collection).values()]