@ -40,6 +40,14 @@ except ImportError:
_meta_yml_to_dict = None
if not hasattr ( __builtins__ , ' ModuleNotFoundError ' ) :
# this was introduced in Python 3.6
ModuleNotFoundError = ImportError
PB_EXTENSIONS = ( ' .yml ' , ' .yaml ' )
class _AnsibleCollectionFinder :
def __init__ ( self , paths = None , scan_sys_paths = True ) :
# TODO: accept metadata loader override
@ -663,7 +671,7 @@ class AnsibleCollectionRef:
VALID_REF_TYPES = frozenset ( to_text ( r ) for r in [ ' action ' , ' become ' , ' cache ' , ' callback ' , ' cliconf ' , ' connection ' ,
' doc_fragments ' , ' filter ' , ' httpapi ' , ' inventory ' , ' lookup ' ,
' module_utils ' , ' modules ' , ' netconf ' , ' role ' , ' shell ' , ' strategy ' ,
' terminal ' , ' test ' , ' vars ' ])
' terminal ' , ' test ' , ' vars ' , ' playbook ' ])
# FIXME: tighten this up to match Python identifier reqs, etc
VALID_COLLECTION_NAME_RE = re . compile ( to_text ( r ' ^( \ w+) \ .( \ w+)$ ' ) )
@ -708,6 +716,8 @@ class AnsibleCollectionRef:
if self . ref_type == u ' role ' :
package_components . append ( u ' roles ' )
elif self . ref_type == u ' playbook ' :
package_components . append ( u ' playbooks ' )
else :
# we assume it's a plugin
package_components + = [ u ' plugins ' , self . ref_type ]
@ -716,8 +726,8 @@ class AnsibleCollectionRef:
package_components . append ( self . subdirs )
fqcr_components . append ( self . subdirs )
if self . ref_type == u ' role ' :
# roles are their own resource
if self . ref_type in ( u ' role ' , u ' playbook ' ) :
# playbooks and roles are their own resource
package_components . append ( self . resource )
fqcr_components . append ( self . resource )
@ -751,10 +761,17 @@ class AnsibleCollectionRef:
ref = to_text ( ref , errors = ' strict ' )
ref_type = to_text ( ref_type , errors = ' strict ' )
ext = ' '
resource_splitname = ref . rsplit ( u ' . ' , 1 )
package_remnant = resource_splitname [ 0 ]
resource = resource_splitname [ 1 ]
if ref_type == u ' playbook ' and ref . endswith ( PB_EXTENSIONS ) :
resource_splitname = ref . rsplit ( u ' . ' , 2 )
package_remnant = resource_splitname [ 0 ]
resource = resource_splitname [ 1 ]
ext = ' . ' + resource_splitname [ 2 ]
else :
resource_splitname = ref . rsplit ( u ' . ' , 1 )
package_remnant = resource_splitname [ 0 ]
resource = resource_splitname [ 1 ]
# split the left two components of the collection package name off, anything remaining is plugin-type
# specific subdirs to be added back on below the plugin type
@ -766,7 +783,7 @@ class AnsibleCollectionRef:
collection_name = u ' . ' . join ( package_splitname [ 0 : 2 ] )
return AnsibleCollectionRef ( collection_name , subdirs , resource , ref_type )
return AnsibleCollectionRef ( collection_name , subdirs , resource + ext , ref_type )
@staticmethod
def try_parse_fqcr ( ref , ref_type ) :
@ -829,23 +846,55 @@ class AnsibleCollectionRef:
return bool ( re . match ( AnsibleCollectionRef . VALID_COLLECTION_NAME_RE , collection_name ) )
def _get_collection_playbook_path ( playbook ) :
acr = AnsibleCollectionRef . try_parse_fqcr ( playbook , u ' playbook ' )
if acr :
try :
# get_collection_path
pkg = import_module ( acr . n_python_collection_package_name )
except ( IOError , ModuleNotFoundError ) as e :
# leaving e as debug target, even though not used in normal code
pkg = None
if pkg :
cpath = os . path . join ( sys . modules [ acr . n_python_collection_package_name ] . __file__ . replace ( ' __synthetic__ ' , ' playbooks ' ) )
path = os . path . join ( cpath , to_native ( acr . resource ) )
if os . path . exists ( to_bytes ( path ) ) :
return acr . resource , path , acr . collection
elif not acr . resource . endswith ( PB_EXTENSIONS ) :
for ext in PB_EXTENSIONS :
path = os . path . join ( cpath , to_native ( acr . resource + ext ) )
if os . path . exists ( to_bytes ( path ) ) :
return acr . resource , path , acr . collection
return None
def _get_collection_role_path ( role_name , collection_list = None ) :
acr = AnsibleCollectionRef . try_parse_fqcr ( role_name , ' role ' )
return _get_collection_resource_path ( role_name , u ' role ' , collection_list )
def _get_collection_resource_path ( name , ref_type , collection_list = None ) :
if ref_type == u ' playbook ' :
# they are handled a bit diff due to 'extension variance' and no collection_list
return _get_collection_playbook_path ( name )
acr = AnsibleCollectionRef . try_parse_fqcr ( name , ref_type )
if acr :
# looks like a valid qualified collection ref; skip the collection_list
collection_list = [ acr . collection ]
subdirs = acr . subdirs
resource = acr . resource
elif not collection_list :
return None # not a FQ role and no collection search list spec'd, nothing to do
return None # not a FQ and no collection search list spec'd, nothing to do
else :
resource = role_name # treat as unqualified, loop through the collection search list to try and resolve
resource = name # treat as unqualified, loop through the collection search list to try and resolve
subdirs = ' '
for collection_name in collection_list :
try :
acr = AnsibleCollectionRef ( collection_name = collection_name , subdirs = subdirs , resource = resource , ref_type = ' role ' )
acr = AnsibleCollectionRef ( collection_name = collection_name , subdirs = subdirs , resource = resource , ref_type = ref_type )
# FIXME: error handling/logging; need to catch any import failures and move along
pkg = import_module ( acr . n_python_package_name )
@ -854,7 +903,7 @@ def _get_collection_role_path(role_name, collection_list=None):
path = os . path . dirname ( to_bytes ( sys . modules [ acr . n_python_package_name ] . __file__ , errors = ' surrogate_or_strict ' ) )
return resource , to_text ( path , errors = ' surrogate_or_strict ' ) , collection_name
except IOError :
except ( IOError , ModuleNotFoundError ) as e :
continue
except Exception as ex :
# FIXME: pick out typical import errors first, then error logging
@ -872,8 +921,8 @@ def _get_collection_name_from_path(path):
: return : collection name or None
"""
# FIXME: mess with realpath canonicalization or not?
path = to_native ( path)
# ensure we compare full paths since pkg path will be abspath
path = to_native ( os. path. abspath ( to_bytes ( path ) ) )
path_parts = path . split ( ' / ' )
if path_parts . count ( ' ansible_collections ' ) != 1 :