@ -10,11 +10,14 @@ __metaclass__ = type
import glob
import os
import os . path
import pkgutil
import sys
import warnings
from collections import defaultdict , namedtuple
from traceback import format_exc
from ansible import __version__ as ansible_version
from ansible import constants as C
from ansible . errors import AnsibleError , AnsiblePluginCircularRedirect , AnsiblePluginRemovedError , AnsibleCollectionUnsupportedVersionError
from ansible . module_utils . _text import to_bytes , to_text , to_native
@ -26,8 +29,7 @@ from ansible.plugins import get_plugin_class, MODULE_CACHE, PATH_CACHE, PLUGIN_P
from ansible . utils . collection_loader import AnsibleCollectionConfig , AnsibleCollectionRef
from ansible . utils . collection_loader . _collection_finder import _AnsibleCollectionFinder , _get_collection_metadata
from ansible . utils . display import Display
from ansible . utils . plugin_docs import add_fragments
from ansible import __version__ as ansible_version
from ansible . utils . plugin_docs import add_fragments , find_plugin_docfile
# TODO: take the packaging dep, or vendor SpecifierSet?
@ -399,12 +401,18 @@ class PluginLoader:
# if type name != 'module_doc_fragment':
if type_name in C . CONFIGURABLE_PLUGINS and not C . config . get_configuration_definition ( type_name , name ) :
dstring = AnsibleLoader ( getattr ( module , ' DOCUMENTATION ' , ' ' ) , file_name = path ) . get_single_data ( )
# TODO: allow configurable plugins to use sidecar
# if not dstring:
# filename, cn = find_plugin_docfile( name, type_name, self, [os.path.dirname(path)], C.YAML_DOC_EXTENSIONS)
# # TODO: dstring = AnsibleLoader(, file_name=path).get_single_data()
if dstring :
add_fragments ( dstring , path , fragment_loader = fragment_loader , is_module = ( type_name == ' module ' ) )
if dstring and ' options ' in dstring and isinstance ( dstring [ ' options ' ] , dict ) :
C . config . initialize_plugin_configuration_definitions ( type_name , name , dstring [ ' options ' ] )
display . debug ( ' Loaded config def from plugin ( %s / %s ) ' % ( type_name , name ) )
if ' options ' in dstring and isinstance ( dstring [ ' options ' ] , dict ) :
C . config . initialize_plugin_configuration_definitions ( type_name , name , dstring [ ' options ' ] )
display . debug ( ' Loaded config def from plugin ( %s / %s ) ' % ( type_name , name ) )
def add_directory ( self , directory , with_subdir = False ) :
''' Adds an additional directory to the search path '''
@ -546,8 +554,7 @@ class PluginLoader:
found_files = sorted ( found_files ) # sort to ensure deterministic results, with the shortest match first
if len ( found_files ) > 1 :
# TODO: warn?
pass
display . debug ( ' Found several possible candidates for the plugin but using first: %s ' % ' , ' . join ( found_files ) )
return plugin_load_context . resolve (
full_name , to_text ( found_files [ 0 ] ) , acr . collection ,
@ -764,8 +771,7 @@ class PluginLoader:
# last ditch, if it's something that can be redirected, look for a builtin redirect before giving up
candidate_fqcr = ' ansible.builtin. {0} ' . format ( name )
if ' . ' not in name and AnsibleCollectionRef . is_valid_fqcr ( candidate_fqcr ) :
return self . _find_fq_plugin ( fq_name = candidate_fqcr , extension = suffix , plugin_load_context = plugin_load_context ,
ignore_deprecated = ignore_deprecated )
return self . _find_fq_plugin ( fq_name = candidate_fqcr , extension = suffix , plugin_load_context = plugin_load_context , ignore_deprecated = ignore_deprecated )
return plugin_load_context . nope ( ' {0} is not eligible for last-chance resolution ' . format ( name ) )
@ -896,7 +902,7 @@ class PluginLoader:
def all ( self , * args , * * kwargs ) :
'''
Iterate through all plugins of this type
Iterate through all plugins of this type , in configured paths ( no collections )
A plugin loader is initialized with a specific type . This function is an iterator returning
all of the plugins of that type to the caller .
@ -946,15 +952,17 @@ class PluginLoader:
name = os . path . splitext ( path ) [ 0 ]
basename = os . path . basename ( name )
if basename == ' __init__ ' or basename in _PLUGIN_FILTERS [ self . package ] :
# either empty or ignored by the module blocklist
if basename in _PLUGIN_FILTERS [ self . package ] :
display . debug ( " ' %s ' skipped due to a defined plugin filter " % basename )
continue
if basename == ' base' and self . package == ' ansible.plugins.cache ' :
if basename == ' __init__' or ( basename == ' base' and self . package == ' ansible.plugins.cache ' ) :
# cache has legacy 'base.py' file, which is wrapper for __init__.py
display . debug ( " ' %s ' skipped due to reserved name " % basename )
continue
if dedupe and basename in loaded_modules :
display . debug ( " ' %s ' skipped as duplicate " % basename )
continue
loaded_modules . add ( basename )
@ -964,17 +972,19 @@ class PluginLoader:
continue
if path not in self . _module_cache :
if self . type in ( ' filter ' , ' test ' ) :
# filter and test plugin files can contain multiple plugins
# they must have a unique python module name to prevent them from shadowing each other
full_name = ' {0} _ {1} ' . format ( abs ( hash ( path ) ) , basename )
else :
full_name = basename
try :
if self . subdir in ( ' filter_plugins ' , ' test_plugins ' ) :
# filter and test plugin files can contain multiple plugins
# they must have a unique python module name to prevent them from shadowing each other
full_name = ' {0} _ {1} ' . format ( abs ( hash ( path ) ) , basename )
else :
full_name = basename
module = self . _load_module_source ( full_name , path )
except Exception as e :
display . warning ( " Skipping plugin ( %s ) as it seems to be invali d: %s " % ( path , to_text ( e ) ) )
display . warning ( " Skipping plugin ( %s ), cannot load: %s " % ( path , to_text ( e ) ) )
continue
self . _module_cache [ path ] = module
found_in_cache = False
else :
@ -1017,58 +1027,285 @@ class Jinja2Loader(PluginLoader):
PluginLoader optimized for Jinja2 plugins
The filter and test plugins are Jinja2 plugins encapsulated inside of our plugin format .
The way the calling code is setup , we need to do a few things differently in the all ( ) method
We can ' t use the base class version because of file == plugin assumptions and dedupe logic
We need to do a few things differently in the base class because of file == plugin
assumptions and dedupe logic .
"""
def __init__ ( self , class_name , package , config , subdir , aliases = None , required_base_class = None ) :
super ( Jinja2Loader , self ) . __init__ ( class_name , package , config , subdir , aliases = aliases , required_base_class = required_base_class )
self . _loaded_j2_file_maps = [ ]
def _clear_caches ( self ) :
super ( Jinja2Loader , self ) . _clear_caches ( )
self . _loaded_j2_file_maps = [ ]
def find_plugin ( self , name , mod_type = ' ' , ignore_deprecated = False , check_aliases = False , collection_list = None ) :
''' this is really ' find plugin file ' '''
return super ( Jinja2Loader , self ) . find_plugin ( name , mod_type = mod_type , ignore_deprecated = ignore_deprecated , check_aliases = check_aliases ,
collection_list = collection_list )
# TODO: handle collection plugin find, see 'get_with_context'
# this can really 'find plugin file'
plugin = super ( Jinja2Loader , self ) . find_plugin ( name , mod_type = mod_type , ignore_deprecated = ignore_deprecated , check_aliases = check_aliases ,
collection_list = collection_list )
def get ( self , name , * args , * * kwargs ) :
# if not found, try loading all non collection plugins and see if this in there
if not plugin :
all_plugins = self . all ( )
plugin = all_plugins . get ( name , None )
return plugin
@property
def method_map_name ( self ) :
return get_plugin_class ( self . class_name ) + ' s '
def get_contained_plugins ( self , collection , plugin_path , name ) :
plugins = [ ]
full_name = ' . ' . join ( [ ' ansible_collections ' , collection , ' plugins ' , self . type , name ] )
try :
# use 'parent' loader class to find files, but cannot return this as it can contain multiple plugins per file
if plugin_path not in self . _module_cache :
self . _module_cache [ plugin_path ] = self . _load_module_source ( full_name , plugin_path )
module = self . _module_cache [ plugin_path ]
obj = getattr ( module , self . class_name )
except Exception as e :
raise KeyError ( ' Failed to load %s for %s : %s ' % ( plugin_path , collection , to_native ( e ) ) )
plugin_impl = obj ( )
if plugin_impl is None :
raise KeyError ( ' Could not find %s . %s ' % ( collection , name ) )
try :
method_map = getattr ( plugin_impl , self . method_map_name )
plugin_map = method_map ( ) . items ( )
except Exception as e :
display . warning ( " Ignoring %s plugins in ' %s ' as it seems to be invalid: %r " % ( self . type , to_text ( plugin_path ) , e ) )
return plugins
for func_name , func in plugin_map :
fq_name = ' . ' . join ( ( collection , func_name ) )
pclass = self . _load_jinja2_class ( )
plugin = pclass ( func )
if plugin in plugins :
continue
self . _update_object ( plugin , fq_name , plugin_path )
plugins . append ( plugin )
return plugins
def get_with_context ( self , name , * args , * * kwargs ) :
# found_in_cache = True
class_only = kwargs . pop ( ' class_only ' , False ) # just pop it, dont want to pass through
collection_list = kwargs . pop ( ' collection_list ' , None )
context = PluginLoadContext ( )
# avoid collection path for legacy
name = name . removeprefix ( ' ansible.legacy. ' )
if ' . ' not in name and not collection_list :
# find in builtin/legacy list
for known_plugin in self . all ( * args , * * kwargs ) :
if known_plugin . _load_name == name :
# set context
context . resolved = True
context . plugin_resolved_name = name
context . plugin_resolved_path = known_plugin . _original_path
# TODO: context.plugin_resolved_collection = 'ansible.builtin' if path_with_context.internal else 'ansible.legacy'
return get_with_context_result ( known_plugin , context )
plugin = None
key , leaf_key = get_fqcr_and_name ( name )
seen = set ( )
# follow the meta!
while True :
if key in seen :
raise AnsibleError ( ' recursive collection redirect found for %r ' % name , 0 )
seen . add ( key )
acr = AnsibleCollectionRef . try_parse_fqcr ( key , self . type )
if not acr :
raise KeyError ( ' invalid plugin name: {0} ' . format ( key ) )
try :
ts = _get_collection_metadata ( acr . collection )
except ValueError as e :
# no collection
raise KeyError ( ' Invalid plugin FQCN ( {0} ): {1} ' . format ( key , to_native ( e ) ) )
# TODO: implement cycle detection (unified across collection redir as well)
routing_entry = ts . get ( ' plugin_routing ' , { } ) . get ( self . type , { } ) . get ( leaf_key , { } )
# check deprecations
deprecation_entry = routing_entry . get ( ' deprecation ' )
if deprecation_entry :
warning_text = deprecation_entry . get ( ' warning_text ' )
removal_date = deprecation_entry . get ( ' removal_date ' )
removal_version = deprecation_entry . get ( ' removal_version ' )
if not warning_text :
warning_text = ' {0} " {1} " is deprecated ' . format ( self . type , key )
display . deprecated ( warning_text , version = removal_version , date = removal_date , collection_name = acr . collection )
# check removal
tombstone_entry = routing_entry . get ( ' tombstone ' )
if tombstone_entry :
warning_text = tombstone_entry . get ( ' warning_text ' )
removal_date = tombstone_entry . get ( ' removal_date ' )
removal_version = tombstone_entry . get ( ' removal_version ' )
if not warning_text :
warning_text = ' {0} " {1} " has been removed ' . format ( self . type , key )
if ' . ' in name : # NOTE: this is wrong way to detect collection, see note above for example
return super ( Jinja2Loader , self ) . get ( name , * args , * * kwargs )
exc_msg = display . get_deprecation_message ( warning_text , version = removal_version , date = removal_date ,
collection_name = acr . collection , removed = True )
# Nothing is currently using this method
raise AnsibleError ( ' No code should call " get " for Jinja2Loaders (Not implemented) for non collection use ' )
raise AnsiblePluginRemovedError ( exc_msg )
# check redirects
redirect = routing_entry . get ( ' redirect ' , None )
if redirect :
next_key , leaf_key = get_fqcr_and_name ( redirect , collection = acr . collection )
display . vvv ( ' redirecting (type: {0} ) {1} . {2} to {3} ' . format ( self . type , acr . collection , acr . resource , next_key ) )
key = next_key
else :
break
try :
pkg = import_module ( acr . n_python_package_name )
except ImportError as e :
raise KeyError ( to_native ( e ) )
parent_prefix = acr . collection
if acr . subdirs :
parent_prefix = ' {0} . {1} ' . format ( parent_prefix , acr . subdirs )
try :
for dummy , module_name , ispkg in pkgutil . iter_modules ( pkg . __path__ , prefix = parent_prefix + ' . ' ) :
if ispkg :
continue
try :
# use 'parent' loader class to find files, but cannot return this as it can contain
# multiple plugins per file
plugin_impl = super ( Jinja2Loader , self ) . get_with_context ( module_name , * args , * * kwargs )
except Exception as e :
raise KeyError ( to_native ( e ) )
try :
method_map = getattr ( plugin_impl . object , self . method_map_name )
plugin_map = method_map ( ) . items ( )
except Exception as e :
display . warning ( " Skipping %s plugins in ' %s ' as it seems to be invalid: %r " % ( self . type , to_text ( plugin_impl . object . _original_path ) , e ) )
continue
for func_name , func in plugin_map :
fq_name = ' . ' . join ( ( parent_prefix , func_name ) )
# TODO: load anyways into CACHE so we only match each at end of loop
# the files themseves should already be cached by base class caching of modules(python)
if key in ( func_name , fq_name ) :
pclass = self . _load_jinja2_class ( )
plugin = pclass ( func )
if plugin :
context = plugin_impl . plugin_load_context
self . _update_object ( plugin , fq_name , plugin_impl . object . _original_path )
break # go to next file as it can override if dupe (dont break both loops)
except AnsiblePluginRemovedError as apre :
raise AnsibleError ( to_native ( apre ) , 0 , orig_exc = apre )
except ( AnsibleError , KeyError ) :
raise
except Exception as ex :
display . warning ( ' An unexpected error occurred during Jinja2 plugin loading: {0} ' . format ( to_native ( ex ) ) )
display . vvv ( ' Unexpected error during Jinja2 plugin loading: {0} ' . format ( format_exc ( ) ) )
raise AnsibleError ( to_native ( ex ) , 0 , orig_exc = ex )
return get_with_context_result ( plugin , context )
def all ( self , * args , * * kwargs ) :
# inputs, we ignore 'dedupe' we always do, used in base class to find files for this one
path_only = kwargs . pop ( ' path_only ' , False )
class_only = kwargs . pop ( ' class_only ' , False ) # basically ignored for test/filters since they are functions
# Having both path_only and class_only is a coding bug
if path_only and class_only :
raise AnsibleError ( ' Do not set both path_only and class_only when calling PluginLoader.all() ' )
found = set ( )
# get plugins from files in configured paths (mulitple in each)
for p_map in self . _j2_all_file_maps ( * args , * * kwargs ) :
# p_map is really object from file with class that holds mulitple plugins
plugins_list = getattr ( p_map , self . method_map_name )
try :
plugins = plugins_list ( )
except Exception as e :
display . vvvv ( " Skipping %s plugins in ' %s ' as it seems to be invalid: %r " % ( self . type , to_text ( p_map . _original_path ) , e ) )
continue
for plugin_name in plugins . keys ( ) :
if plugin_name in _PLUGIN_FILTERS [ self . package ] :
display . debug ( " %s skipped due to a defined plugin filter " % plugin_name )
continue
if plugin_name in found :
display . debug ( " %s skipped as duplicate " % plugin_name )
continue
if path_only :
result = p_map . _original_path
else :
# loader class is for the file with multiple plugins, but each plugin now has it's own class
pclass = self . _load_jinja2_class ( )
result = pclass ( plugins [ plugin_name ] ) # if bad plugin, let exception rise
found . add ( plugin_name )
self . _update_object ( result , plugin_name , p_map . _original_path )
yield result
def _load_jinja2_class ( self ) :
""" override the normal method of plugin classname as these are used in the generic funciton
to access the ' multimap ' of filter / tests to function , this is a ' singular ' plugin for
each entry .
"""
Differences with : meth : ` PluginLoader . all ` :
class_name = ' AnsibleJinja2 %s ' % get_plugin_class ( self . class_name ) . capitalize ( )
module = __import__ ( self . package , fromlist = [ class_name ] )
return getattr ( module , class_name )
def _j2_all_file_maps ( self , * args , * * kwargs ) :
"""
* Unlike other plugin types , file != plugin , a file can contain multiple plugins ( of same type ) .
This is why we do not deduplicate ansible file names at this point , we mostly care about
the names of the actual jinja2 plugins which are inside of our files .
* We reverse the order of the list of files compared to other PluginLoaders . This is
because of how calling code chooses to sync the plugins from the list . It adds all the
Jinja2 plugins from one of our Ansible files into a dict . Then it adds the Jinja2
plugins from the next Ansible file , overwriting any Jinja2 plugins that had the same
name . This is an encapsulation violation ( the PluginLoader should not know about what
calling code does with the data ) but we ' re pushing the common code here. We ' ll fix
this in the future by moving more of the common code into this PluginLoader .
* We return a list . We could iterate the list instead but that ' s extra work for no gain because
the API receiving this doesn ' t care. It just needs an iterable
* This method will NOT fetch collection plugins , only those that would be expected under ' ansible.legacy ' .
* This method will NOT fetch collection plugin files , only those that would be expected under ' ansible.builtin/legacy ' .
"""
# We don't deduplicate ansible file names.
# Instead, calling code deduplicates jinja2 plugin names when loading each file.
kwargs [ ' _dedupe ' ] = False
# TODO: move this to initialization and extract/dedupe plugin names in loader and offset this from
# caller. It would have to cache/refresh on add_directory to reevaluate plugin list and dedupe.
# Another option is to always prepend 'ansible.legac'y and force the collection path to
# load/find plugins, just need to check compatibility of that approach.
# This would also enable get/find_plugin for these type of plugins.
# We have to instantiate a list of all files so that we can reverse the list.
# We reverse it so that calling code will deduplicate this correctly.
files = list ( super ( Jinja2Loader , self ) . all ( * args , * * kwargs ) )
files . reverse ( )
return files
# populate cache if needed
if not self . _loaded_j2_file_maps :
# We don't deduplicate ansible file names.
# Instead, calling code deduplicates jinja2 plugin names when loading each file.
kwargs [ ' _dedupe ' ] = False
# To match correct precedence, call base class' all() to get a list of files,
self . _loaded_j2_file_maps = list ( super ( Jinja2Loader , self ) . all ( * args , * * kwargs ) )
return self . _loaded_j2_file_maps
def get_fqcr_and_name ( resource , collection = ' ansible.builtin ' ) :
if ' . ' not in resource :
name = resource
fqcr = collection + ' . ' + resource
else :
name = resource . split ( ' . ' ) [ - 1 ]
fqcr = resource
return fqcr , name
def _load_plugin_filter ( ) :