@ -30,9 +30,6 @@ _SYNTHETIC_PACKAGES = {
' ansible_collections.ansible.builtin.plugins.modules ' : dict ( type = ' flatmap ' , flatmap = ' ansible.modules ' , graft = True ) ,
' ansible_collections.ansible.builtin.plugins.modules ' : dict ( type = ' flatmap ' , flatmap = ' ansible.modules ' , graft = True ) ,
}
}
# TODO: tighten this up to subset Python identifier requirements (and however we want to restrict ns/collection names)
_collection_qualified_re = re . compile ( to_text ( r ' ^( \ w+) \ .( \ w+) \ .( \ w+)$ ' ) )
# FIXME: exception handling/error logging
# FIXME: exception handling/error logging
class AnsibleCollectionLoader ( with_metaclass ( Singleton , object ) ) :
class AnsibleCollectionLoader ( with_metaclass ( Singleton , object ) ) :
@ -271,32 +268,186 @@ class AnsibleFlatMapLoader(object):
# def is_package(self, fullname):
# def is_package(self, fullname):
class AnsibleCollectionRef :
# FUTURE: introspect plugin loaders to get these dynamically?
VALID_REF_TYPES = frozenset ( to_text ( r ) for r in [ ' action ' , ' become ' , ' cache ' , ' callback ' , ' cliconf ' , ' connection ' ,
' doc_fragments ' , ' filter ' , ' httpapi ' , ' inventory ' , ' lookup ' ,
' module_utils ' , ' modules ' , ' netconf ' , ' role ' , ' shell ' , ' strategy ' ,
' terminal ' , ' test ' , ' vars ' ] )
# FIXME: tighten this up to match Python identifier reqs, etc
VALID_COLLECTION_NAME_RE = re . compile ( to_text ( r ' ^( \ w+) \ .( \ w+)$ ' ) )
VALID_SUBDIRS_RE = re . compile ( to_text ( r ' ^ \ w+( \ . \ w+)*$ ' ) )
VALID_FQCR_RE = re . compile ( to_text ( r ' ^ \ w+ \ . \ w+ \ . \ w+( \ . \ w+)*$ ' ) ) # can have 0-N included subdirs as well
def __init__ ( self , collection_name , subdirs , resource , ref_type ) :
"""
Create an AnsibleCollectionRef from components
: param collection_name : a collection name of the form ' namespace.collectionname '
: param subdirs : optional subdir segments to be appended below the plugin type ( eg , ' subdir1.subdir2 ' )
: param resource : the name of the resource being references ( eg , ' mymodule ' , ' someaction ' , ' a_role ' )
: param ref_type : the type of the reference , eg ' module ' , ' role ' , ' doc_fragment '
"""
collection_name = to_text ( collection_name , errors = ' strict ' )
if subdirs is not None :
subdirs = to_text ( subdirs , errors = ' strict ' )
resource = to_text ( resource , errors = ' strict ' )
ref_type = to_text ( ref_type , errors = ' strict ' )
if not self . is_valid_collection_name ( collection_name ) :
raise ValueError ( ' invalid collection name (must be of the form namespace.collection): {0} ' . format ( to_native ( collection_name ) ) )
if ref_type not in self . VALID_REF_TYPES :
raise ValueError ( ' invalid collection ref_type: {0} ' . format ( ref_type ) )
self . collection = collection_name
if subdirs :
if not re . match ( self . VALID_SUBDIRS_RE , subdirs ) :
raise ValueError ( ' invalid subdirs entry: {0} (must be empty/None or of the form subdir1.subdir2) ' . format ( to_native ( subdirs ) ) )
self . subdirs = subdirs
else :
self . subdirs = u ' '
self . resource = resource
self . ref_type = ref_type
package_components = [ u ' ansible_collections ' , self . collection ]
if self . ref_type == u ' role ' :
package_components . append ( u ' roles ' )
else :
# we assume it's a plugin
package_components + = [ u ' plugins ' , self . ref_type ]
if self . subdirs :
package_components . append ( self . subdirs )
if self . ref_type == u ' role ' :
# roles are their own resource
package_components . append ( self . resource )
self . n_python_package_name = to_native ( ' . ' . join ( package_components ) )
@staticmethod
def from_fqcr ( ref , ref_type ) :
"""
Parse a string as a fully - qualified collection reference , raises ValueError if invalid
: param ref : collection reference to parse ( a valid ref is of the form ' ns.coll.resource ' or ' ns.coll.subdir1.subdir2.resource ' )
: param ref_type : the type of the reference , eg ' module ' , ' role ' , ' doc_fragment '
: return : a populated AnsibleCollectionRef object
"""
# assuming the fq_name is of the form (ns).(coll).(optional_subdir_N).(resource_name),
# we split the resource name off the right, split ns and coll off the left, and we're left with any optional
# subdirs that need to be added back below the plugin-specific subdir we'll add. So:
# ns.coll.resource -> ansible_collections.ns.coll.plugins.(plugintype).resource
# ns.coll.subdir1.resource -> ansible_collections.ns.coll.plugins.subdir1.(plugintype).resource
# ns.coll.rolename -> ansible_collections.ns.coll.roles.rolename
if not AnsibleCollectionRef . is_valid_fqcr ( ref ) :
raise ValueError ( ' {0} is not a valid collection reference ' . format ( to_native ( ref ) ) )
ref = to_text ( ref , errors = ' strict ' )
ref_type = to_text ( ref_type , errors = ' strict ' )
resource_splitname = ref . rsplit ( u ' . ' , 1 )
package_remnant = resource_splitname [ 0 ]
resource = resource_splitname [ 1 ]
# split the left two components of the collection package name off, anything remaining is plugin-type
# specific subdirs to be added back on below the plugin type
package_splitname = package_remnant . split ( u ' . ' , 2 )
if len ( package_splitname ) == 3 :
subdirs = package_splitname [ 2 ]
else :
subdirs = u ' '
collection_name = u ' . ' . join ( package_splitname [ 0 : 2 ] )
return AnsibleCollectionRef ( collection_name , subdirs , resource , ref_type )
@staticmethod
def try_parse_fqcr ( ref , ref_type ) :
"""
Attempt to parse a string as a fully - qualified collection reference , returning None on failure ( instead of raising an error )
: param ref : collection reference to parse ( a valid ref is of the form ' ns.coll.resource ' or ' ns.coll.subdir1.subdir2.resource ' )
: param ref_type : the type of the reference , eg ' module ' , ' role ' , ' doc_fragment '
: return : a populated AnsibleCollectionRef object on successful parsing , else None
"""
try :
return AnsibleCollectionRef . from_fqcr ( ref , ref_type )
except ValueError :
pass
@staticmethod
def legacy_plugin_dir_to_plugin_type ( legacy_plugin_dir_name ) :
"""
Utility method to convert from a PluginLoader dir name to a plugin ref_type
: param legacy_plugin_dir_name : PluginLoader dir name ( eg , ' action_plugins ' , ' library ' )
: return : the corresponding plugin ref_type ( eg , ' action ' , ' role ' )
"""
legacy_plugin_dir_name = to_text ( legacy_plugin_dir_name )
plugin_type = legacy_plugin_dir_name . replace ( u ' _plugins ' , u ' ' )
if plugin_type == u ' library ' :
plugin_type = u ' modules '
if plugin_type not in AnsibleCollectionRef . VALID_REF_TYPES :
raise ValueError ( ' {0} cannot be mapped to a valid collection ref type ' . format ( to_native ( legacy_plugin_dir_name ) ) )
return plugin_type
@staticmethod
def is_valid_fqcr ( ref , ref_type = None ) :
"""
Validates if is string is a well - formed fully - qualified collection reference ( does not look up the collection itself )
: param ref : candidate collection reference to validate ( a valid ref is of the form ' ns.coll.resource ' or ' ns.coll.subdir1.subdir2.resource ' )
: param ref_type : optional reference type to enable deeper validation , eg ' module ' , ' role ' , ' doc_fragment '
: return : True if the collection ref passed is well - formed , False otherwise
"""
ref = to_text ( ref )
if not ref_type :
return bool ( re . match ( AnsibleCollectionRef . VALID_FQCR_RE , ref ) )
return bool ( AnsibleCollectionRef . try_parse_fqcr ( ref , ref_type ) )
@staticmethod
def is_valid_collection_name ( collection_name ) :
"""
Validates if is string is a well - formed collection name ( does not look up the collection itself )
: param collection_name : candidate collection name to validate ( a valid name is of the form ' ns.collname ' )
: return : True if the collection name passed is well - formed , False otherwise
"""
collection_name = to_text ( collection_name )
return bool ( re . match ( AnsibleCollectionRef . VALID_COLLECTION_NAME_RE , collection_name ) )
def get_collection_role_path ( role_name , collection_list = None ) :
def get_collection_role_path ( role_name , collection_list = None ) :
match = _collection_qualified_re . match ( role_name )
acr = AnsibleCollectionRef . try_parse_fqcr ( role_name , ' role ' )
if match :
if acr :
grps = match . groups ( )
# looks like a valid qualified collection ref; skip the collection_list
collection_list = [ ' . ' . join ( grps [ : 2 ] ) ]
role = acr . resource
role = grps [ 2 ]
collection_list = [ acr . collection ]
elif not collection_list :
elif not collection_list :
return None # not a FQ role and no collection search list spec'd, nothing to do
return None # not a FQ role and no collection search list spec'd, nothing to do
else :
else :
role = role_name
role = role_name # treat as unqualified, loop through the collection search list to try and resolve
for collection_name in collection_list :
for collection_name in collection_list :
try :
try :
role_package = u ' ansible_collections. {0} .roles. {1} ' . format ( collection_name , role )
acr = AnsibleCollectionRef ( collection_name = collection_name , subdirs = acr . subdirs , resource = acr . resource , ref_type = acr . ref_typ e)
# FIXME: error handling/logging; need to catch any import failures and move along
# FIXME: error handling/logging; need to catch any import failures and move along
# FIXME: this line shouldn't be necessary, but py2 pkgutil.get_data is delegating back to built-in loader when it shouldn't
# FIXME: this line shouldn't be necessary, but py2 pkgutil.get_data is delegating back to built-in loader when it shouldn't
pkg = import_module ( role_package + u ' .tasks ' )
pkg = import_module ( acr . n_python_package_name )
# get_data input must be a native string
tasks_file = pkgutil . get_data ( to_native ( role_package ) + ' .tasks ' , ' main.yml ' )
if tasks_file is not None :
if pkg is not None :
# the package is now loaded, get the collection's package and ask where it lives
# the package is now loaded, get the collection's package and ask where it lives
path = os . path . dirname ( to_bytes ( sys . modules [ role_package ] . __file__ , errors = ' surrogate_or_strict ' ) )
path = os . path . dirname ( to_bytes ( sys . modules [ acr. n_python_package_nam e] . __file__ , errors = ' surrogate_or_strict ' ) )
return role , to_text ( path , errors = ' surrogate_or_strict ' ) , collection_name
return role , to_text ( path , errors = ' surrogate_or_strict ' ) , collection_name
except IOError :
except IOError :
@ -308,9 +459,5 @@ def get_collection_role_path(role_name, collection_list=None):
return None
return None
def is_collection_ref ( candidate_name ) :
return bool ( _collection_qualified_re . match ( candidate_name ) )
def set_collection_playbook_paths ( b_playbook_paths ) :
def set_collection_playbook_paths ( b_playbook_paths ) :
AnsibleCollectionLoader ( ) . set_playbook_paths ( b_playbook_paths )
AnsibleCollectionLoader ( ) . set_playbook_paths ( b_playbook_paths )