@ -48,6 +48,7 @@ class AnsibleCollectionLoader(with_metaclass(Singleton, object)):
self . _n_configured_paths = [ to_native ( os . path . expanduser ( p ) , errors = ' surrogate_or_strict ' ) for p in self . _n_configured_paths ]
self . _n_playbook_paths = [ ]
self . _default_collection = None
# pre-inject grafted package maps so we can force them to use the right loader instead of potentially delegating to a "normal" loader
for syn_pkg_def in ( p for p in iteritems ( _SYNTHETIC_PACKAGES ) if p [ 1 ] . get ( ' graft ' ) ) :
pkg_name = syn_pkg_def [ 0 ]
@ -70,6 +71,14 @@ class AnsibleCollectionLoader(with_metaclass(Singleton, object)):
def n_collection_paths ( self ) :
return self . _n_playbook_paths + self . _n_configured_paths
def get_collection_path ( self , collection_name ) :
if not AnsibleCollectionRef . is_valid_collection_name ( collection_name ) :
raise ValueError ( ' {0} is not a valid collection name ' . format ( to_native ( collection_name ) ) )
m = import_module ( ' ansible_collections. {0} ' . format ( collection_name ) )
return m . __file__
def set_playbook_paths ( self , b_playbook_paths ) :
if isinstance ( b_playbook_paths , string_types ) :
b_playbook_paths = [ b_playbook_paths ]
@ -81,6 +90,15 @@ class AnsibleCollectionLoader(with_metaclass(Singleton, object)):
self . _n_playbook_paths = [ os . path . join ( to_native ( p ) , ' collections ' ) for p in b_playbook_paths if not ( p in added_paths or added_paths . add ( p ) ) ]
# FIXME: only allow setting this once, or handle any necessary cache/package path invalidations internally?
# FIXME: is there a better place to store this?
# FIXME: only allow setting this once
def set_default_collection ( self , collection_name ) :
self . _default_collection = collection_name
@property
def default_collection ( self ) :
return self . _default_collection
def find_module ( self , fullname , path = None ) :
# this loader is only concerned with items under the Ansible Collections namespace hierarchy, ignore others
if fullname . startswith ( ' ansible_collections. ' ) or fullname == ' ansible_collections ' :
@ -432,14 +450,17 @@ def get_collection_role_path(role_name, collection_list=None):
# looks like a valid qualified collection ref; skip the collection_list
role = acr . resource
collection_list = [ acr . collection ]
subdirs = acr . subdirs
resource = acr . resource
elif not collection_list :
return None # not a FQ role and no collection search list spec'd, nothing to do
else :
role = role_name # treat as unqualified, loop through the collection search list to try and resolve
resource = role_name # treat as unqualified, loop through the collection search list to try and resolve
subdirs = ' '
for collection_name in collection_list :
try :
acr = AnsibleCollectionRef ( collection_name = collection_name , subdirs = acr. subdirs, resource = acr. resource , ref_type = acr . ref_type )
acr = AnsibleCollectionRef ( collection_name = collection_name , subdirs = subdirs, resource = resource, ref_type = ' role ' )
# FIXME: error handling/logging; need to catch any import failures and move along
# FIXME: this line shouldn't be necessary, but py2 pkgutil.get_data is delegating back to built-in loader when it shouldn't
@ -448,7 +469,7 @@ def get_collection_role_path(role_name, collection_list=None):
if pkg is not None :
# the package is now loaded, get the collection's package and ask where it lives
path = os . path . dirname ( to_bytes ( sys . modules [ acr . n_python_package_name ] . __file__ , errors = ' surrogate_or_strict ' ) )
return r ol e, to_text ( path , errors = ' surrogate_or_strict ' ) , collection_name
return r esourc e, to_text ( path , errors = ' surrogate_or_strict ' ) , collection_name
except IOError :
continue
@ -459,5 +480,50 @@ def get_collection_role_path(role_name, collection_list=None):
return None
_N_COLLECTION_PATH_RE = re . compile ( r ' /ansible_collections/([^/]+)/([^/]+) ' )
def get_collection_name_from_path ( path ) :
"""
Return the containing collection name for a given path , or None if the path is not below a configured collection , or
the collection cannot be loaded ( eg , the collection is masked by another of the same name higher in the configured
collection roots ) .
: param n_path : native - string path to evaluate for collection containment
: return : collection name or None
"""
n_collection_paths = [ to_native ( os . path . realpath ( to_bytes ( p ) ) ) for p in AnsibleCollectionLoader ( ) . n_collection_paths ]
b_path = os . path . realpath ( to_bytes ( path ) )
n_path = to_native ( b_path )
for coll_path in n_collection_paths :
common_prefix = to_native ( os . path . commonprefix ( [ b_path , to_bytes ( coll_path ) ] ) )
if common_prefix == coll_path :
# strip off the common prefix (handle weird testing cases of nested collection roots, eg)
collection_remnant = n_path [ len ( coll_path ) : ]
# commonprefix may include the trailing /, prepend to the remnant if necessary (eg trailing / on root)
if collection_remnant [ 0 ] != ' / ' :
collection_remnant = ' / ' + collection_remnant
# the path lives under this collection root, see if it maps to a collection
found_collection = _N_COLLECTION_PATH_RE . search ( collection_remnant )
if not found_collection :
continue
n_collection_name = ' {0} . {1} ' . format ( * found_collection . groups ( ) )
loaded_collection_path = AnsibleCollectionLoader ( ) . get_collection_path ( n_collection_name )
if not loaded_collection_path :
return None
# ensure we're using the canonical real path, with the bogus __synthetic__ stripped off
b_loaded_collection_path = os . path . dirname ( os . path . realpath ( to_bytes ( loaded_collection_path ) ) )
# if the collection path prefix matches the path prefix we were passed, it's the same collection that's loaded
if os . path . commonprefix ( [ b_path , b_loaded_collection_path ] ) == b_loaded_collection_path :
return n_collection_name
return None # if not, it's a collection, but not the same collection the loader sees, so ignore it
def set_collection_playbook_paths ( b_playbook_paths ) :
AnsibleCollectionLoader ( ) . set_playbook_paths ( b_playbook_paths )