@ -6,12 +6,12 @@
from __future__ import annotations
from __future__ import annotations
import collections . abc as _c
import os
import os
import pathlib
import pathlib
import typing as t
import typing as t
from collections import namedtuple
from collections import namedtuple
from collections . abc import MutableSequence , MutableMapping
from glob import iglob
from glob import iglob
from urllib . parse import urlparse
from urllib . parse import urlparse
from yaml import safe_load
from yaml import safe_load
@ -43,7 +43,12 @@ _SOURCE_METADATA_FILE = b'GALAXY.yml'
display = Display ( )
display = Display ( )
def get_validated_source_info ( b_source_info_path , namespace , name , version ) :
def get_validated_source_info (
b_source_info_path : bytes ,
namespace : str ,
name : str ,
version : str ,
) - > dict [ str , object ] | None :
source_info_path = to_text ( b_source_info_path , errors = ' surrogate_or_strict ' )
source_info_path = to_text ( b_source_info_path , errors = ' surrogate_or_strict ' )
if not os . path . isfile ( b_source_info_path ) :
if not os . path . isfile ( b_source_info_path ) :
@ -58,7 +63,7 @@ def get_validated_source_info(b_source_info_path, namespace, name, version):
)
)
return None
return None
if not isinstance ( metadata , MutableMapping ) :
if not isinstance ( metadata , dict ) :
display . warning ( f " Error getting collection source information at ' { source_info_path } ' : expected a YAML dictionary " )
display . warning ( f " Error getting collection source information at ' { source_info_path } ' : expected a YAML dictionary " )
return None
return None
@ -72,7 +77,12 @@ def get_validated_source_info(b_source_info_path, namespace, name, version):
return metadata
return metadata
def _validate_v1_source_info_schema ( namespace , name , version , provided_arguments ) :
def _validate_v1_source_info_schema (
namespace : str ,
name : str ,
version : str ,
provided_arguments : dict [ str , object ] ,
) - > list [ str ] :
argument_spec_data = dict (
argument_spec_data = dict (
format_version = dict ( choices = [ " 1.0.0 " ] ) ,
format_version = dict ( choices = [ " 1.0.0 " ] ) ,
download_url = dict ( ) ,
download_url = dict ( ) ,
@ -102,24 +112,24 @@ def _validate_v1_source_info_schema(namespace, name, version, provided_arguments
return validation_result . error_messages
return validation_result . error_messages
def _is_collection_src_dir ( dir_path ):
def _is_collection_src_dir ( dir_path : bytes | str ) - > bool :
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
return os . path . isfile ( os . path . join ( b_dir_path , _GALAXY_YAML ) )
return os . path . isfile ( os . path . join ( b_dir_path , _GALAXY_YAML ) )
def _is_installed_collection_dir ( dir_path ):
def _is_installed_collection_dir ( dir_path : bytes | str ) - > bool :
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
return os . path . isfile ( os . path . join ( b_dir_path , _MANIFEST_JSON ) )
return os . path . isfile ( os . path . join ( b_dir_path , _MANIFEST_JSON ) )
def _is_collection_dir ( dir_path ):
def _is_collection_dir ( dir_path : bytes | str ) - > bool :
return (
return (
_is_installed_collection_dir ( dir_path ) or
_is_installed_collection_dir ( dir_path ) or
_is_collection_src_dir ( dir_path )
_is_collection_src_dir ( dir_path )
)
)
def _find_collections_in_subdirs ( dir_path ):
def _find_collections_in_subdirs ( dir_path : str ) - > _c . Iterator [ bytes ] :
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
subdir_glob_pattern = os . path . join (
subdir_glob_pattern = os . path . join (
@ -135,23 +145,23 @@ def _find_collections_in_subdirs(dir_path):
yield subdir
yield subdir
def _is_collection_namespace_dir ( tested_str ):
def _is_collection_namespace_dir ( tested_str : str ) - > bool :
return any ( _find_collections_in_subdirs ( tested_str ) )
return any ( _find_collections_in_subdirs ( tested_str ) )
def _is_file_path ( tested_str ):
def _is_file_path ( tested_str : str ) - > bool :
return os . path . isfile ( to_bytes ( tested_str , errors = ' surrogate_or_strict ' ) )
return os . path . isfile ( to_bytes ( tested_str , errors = ' surrogate_or_strict ' ) )
def _is_http_url ( tested_str ):
def _is_http_url ( tested_str : str ) - > bool :
return urlparse ( tested_str ) . scheme . lower ( ) in { ' http ' , ' https ' }
return urlparse ( tested_str ) . scheme . lower ( ) in { ' http ' , ' https ' }
def _is_git_url ( tested_str ):
def _is_git_url ( tested_str : str ) - > bool :
return tested_str . startswith ( ( ' git+ ' , ' git@ ' ) )
return tested_str . startswith ( ( ' git+ ' , ' git@ ' ) )
def _is_concrete_artifact_pointer ( tested_str ):
def _is_concrete_artifact_pointer ( tested_str : str ) - > bool :
return any (
return any (
predicate ( tested_str )
predicate ( tested_str )
for predicate in (
for predicate in (
@ -168,7 +178,7 @@ def _is_concrete_artifact_pointer(tested_str):
class _ComputedReqKindsMixin :
class _ComputedReqKindsMixin :
UNIQUE_ATTRS = ( ' fqcn ' , ' ver ' , ' src ' , ' type ' )
UNIQUE_ATTRS = ( ' fqcn ' , ' ver ' , ' src ' , ' type ' )
def __init__ ( self , * args , * * kwargs ) :
def __init__ ( self , * args , * * kwargs ) - > None :
if not self . may_have_offline_galaxy_info :
if not self . may_have_offline_galaxy_info :
self . _source_info = None
self . _source_info = None
else :
else :
@ -181,18 +191,18 @@ class _ComputedReqKindsMixin:
self . ver
self . ver
)
)
def __hash__ ( self ) :
def __hash__ ( self ) - > int :
return hash ( tuple ( getattr ( self , attr ) for attr in _ComputedReqKindsMixin . UNIQUE_ATTRS ) )
return hash ( tuple ( getattr ( self , attr ) for attr in _ComputedReqKindsMixin . UNIQUE_ATTRS ) )
def __eq__ ( self , candidate ):
def __eq__ ( self , candidate : _c . Hashable ) - > bool :
return hash ( self ) == hash ( candidate )
return hash ( self ) == hash ( candidate )
@classmethod
@classmethod
def from_dir_path_as_unknown ( # type: ignore[misc]
def from_dir_path_as_unknown (
cls , # type: t.Type[Collection]
cls ,
dir_path , # type : bytes
dir_path : bytes,
art_mgr , # type: ConcreteArtifactsManager
art_mgr : ConcreteArtifactsManager ,
) : # type: (...) -> Collection
) - > t . Self :
""" Make collection from an unspecified dir type.
""" Make collection from an unspecified dir type.
This alternative constructor attempts to grab metadata from the
This alternative constructor attempts to grab metadata from the
@ -215,11 +225,11 @@ class _ComputedReqKindsMixin:
return cls . from_dir_path_implicit ( dir_path )
return cls . from_dir_path_implicit ( dir_path )
@classmethod
@classmethod
def from_dir_path ( # type: ignore[misc]
def from_dir_path (
cls , # type: t.Type[Collection]
cls ,
dir_path , # type : bytes
dir_path : bytes,
art_mgr , # type: ConcreteArtifactsManager
art_mgr : ConcreteArtifactsManager ,
) : # type: (...) -> Collection
) - > t . Self :
""" Make collection from an directory with metadata. """
""" Make collection from an directory with metadata. """
if dir_path . endswith ( to_bytes ( os . path . sep ) ) :
if dir_path . endswith ( to_bytes ( os . path . sep ) ) :
dir_path = dir_path . rstrip ( to_bytes ( os . path . sep ) )
dir_path = dir_path . rstrip ( to_bytes ( os . path . sep ) )
@ -262,10 +272,10 @@ class _ComputedReqKindsMixin:
return cls ( req_name , req_version , dir_path , ' dir ' , None )
return cls ( req_name , req_version , dir_path , ' dir ' , None )
@classmethod
@classmethod
def from_dir_path_implicit ( # type: ignore[misc]
def from_dir_path_implicit (
cls , # type: t.Type[Collection]
cls ,
dir_path , # type : bytes
dir_path : bytes,
) : # type: (...) -> Collection
) - > t . Self :
""" Construct a collection instance based on an arbitrary dir.
""" Construct a collection instance based on an arbitrary dir.
This alternative constructor infers the FQCN based on the parent
This alternative constructor infers the FQCN based on the parent
@ -278,11 +288,16 @@ class _ComputedReqKindsMixin:
u_dir_path = to_text ( dir_path , errors = ' surrogate_or_strict ' )
u_dir_path = to_text ( dir_path , errors = ' surrogate_or_strict ' )
path_list = u_dir_path . split ( os . path . sep )
path_list = u_dir_path . split ( os . path . sep )
req_name = ' . ' . join ( path_list [ - 2 : ] )
req_name = ' . ' . join ( path_list [ - 2 : ] )
return cls ( req_name , ' * ' , dir_path , ' dir ' , None ) # type: ignore[call-arg]
return cls ( req_name , ' * ' , dir_path , ' dir ' , None )
@classmethod
@classmethod
def from_string ( cls , collection_input , artifacts_manager , supplemental_signatures ) :
def from_string (
req = { }
cls ,
collection_input : str ,
artifacts_manager : ConcreteArtifactsManager ,
supplemental_signatures : list [ str ] | None ,
) - > t . Self :
req : dict [ str , str | list [ str ] | None ] = { }
if _is_concrete_artifact_pointer ( collection_input ) or AnsibleCollectionRef . is_valid_collection_name ( collection_input ) :
if _is_concrete_artifact_pointer ( collection_input ) or AnsibleCollectionRef . is_valid_collection_name ( collection_input ) :
# Arg is a file path or URL to a collection, or just a collection
# Arg is a file path or URL to a collection, or just a collection
req [ ' name ' ] = collection_input
req [ ' name ' ] = collection_input
@ -307,7 +322,14 @@ class _ComputedReqKindsMixin:
return cls . from_requirement_dict ( req , artifacts_manager )
return cls . from_requirement_dict ( req , artifacts_manager )
@classmethod
@classmethod
def from_requirement_dict ( cls , collection_req , art_mgr , validate_signature_options = True ) :
def from_requirement_dict (
cls ,
# NOTE: The actual `collection_req` shape is supposed to be
# NOTE: `dict[str, str | list[str] | None]`
collection_req : dict [ str , t . Any ] ,
art_mgr : ConcreteArtifactsManager ,
validate_signature_options : bool = True ,
) - > t . Self :
req_name = collection_req . get ( ' name ' , None )
req_name = collection_req . get ( ' name ' , None )
req_version = collection_req . get ( ' version ' , ' * ' )
req_version = collection_req . get ( ' version ' , ' * ' )
req_type = collection_req . get ( ' type ' )
req_type = collection_req . get ( ' type ' )
@ -320,7 +342,7 @@ class _ComputedReqKindsMixin:
f " Signatures were provided to verify { req_name } but no keyring was configured. "
f " Signatures were provided to verify { req_name } but no keyring was configured. "
)
)
if not isinstance ( req_signature_sources , MutableSequence) :
if not isinstance ( req_signature_sources , _c. MutableSequence) :
req_signature_sources = [ req_signature_sources ]
req_signature_sources = [ req_signature_sources ]
req_signature_sources = frozenset ( req_signature_sources )
req_signature_sources = frozenset ( req_signature_sources )
@ -434,7 +456,11 @@ class _ComputedReqKindsMixin:
format ( not_url = req_source . api_server ) ,
format ( not_url = req_source . api_server ) ,
)
)
if req_type == ' dir ' and req_source . endswith ( os . path . sep ) :
if (
req_type == ' dir '
and isinstance ( req_source , str )
and req_source . endswith ( os . path . sep )
) :
req_source = req_source . rstrip ( os . path . sep )
req_source = req_source . rstrip ( os . path . sep )
tmp_inst_req = cls ( req_name , req_version , req_source , req_type , req_signature_sources )
tmp_inst_req = cls ( req_name , req_version , req_source , req_type , req_signature_sources )
@ -451,16 +477,16 @@ class _ComputedReqKindsMixin:
req_signature_sources ,
req_signature_sources ,
)
)
def __repr__ ( self ) :
def __repr__ ( self ) - > str :
return (
return (
' < {self!s} of type {coll_type!r} from {src!s} > ' .
' < {self!s} of type {coll_type!r} from {src!s} > ' .
format ( self = self , coll_type = self . type , src = self . src or ' Galaxy ' )
format ( self = self , coll_type = self . type , src = self . src or ' Galaxy ' )
)
)
def __str__ ( self ) :
def __str__ ( self ) - > str :
return to_native ( self . __unicode__ ( ) )
return to_native ( self . __unicode__ ( ) )
def __unicode__ ( self ) :
def __unicode__ ( self ) - > str :
if self . fqcn is None :
if self . fqcn is None :
return (
return (
f ' { self . type } collection from a Git repo ' if self . is_scm
f ' { self . type } collection from a Git repo ' if self . is_scm
@ -473,7 +499,7 @@ class _ComputedReqKindsMixin:
)
)
@property
@property
def may_have_offline_galaxy_info ( self ) :
def may_have_offline_galaxy_info ( self ) - > bool :
if self . fqcn is None :
if self . fqcn is None :
# Virtual collection
# Virtual collection
return False
return False
@ -482,7 +508,7 @@ class _ComputedReqKindsMixin:
return False
return False
return True
return True
def construct_galaxy_info_path ( self , b_collection_path ):
def construct_galaxy_info_path ( self , b_collection_path : bytes ) - > bytes :
if not self . may_have_offline_galaxy_info and not self . type == ' galaxy ' :
if not self . may_have_offline_galaxy_info and not self . type == ' galaxy ' :
raise TypeError ( ' Only installed collections from a Galaxy server have offline Galaxy info ' )
raise TypeError ( ' Only installed collections from a Galaxy server have offline Galaxy info ' )
@ -502,21 +528,21 @@ class _ComputedReqKindsMixin:
return self . fqcn . split ( ' . ' )
return self . fqcn . split ( ' . ' )
@property
@property
def namespace ( self ) :
def namespace ( self ) - > str :
if self . is_virtual :
if self . is_virtual :
raise TypeError ( f ' { self . type } collections do not have a namespace ' )
raise TypeError ( f ' { self . type } collections do not have a namespace ' )
return self . _get_separate_ns_n_name ( ) [ 0 ]
return self . _get_separate_ns_n_name ( ) [ 0 ]
@property
@property
def name ( self ) :
def name ( self ) - > str :
if self . is_virtual :
if self . is_virtual :
raise TypeError ( f ' { self . type } collections do not have a name ' )
raise TypeError ( f ' { self . type } collections do not have a name ' )
return self . _get_separate_ns_n_name ( ) [ - 1 ]
return self . _get_separate_ns_n_name ( ) [ - 1 ]
@property
@property
def canonical_package_id ( self ) :
def canonical_package_id ( self ) - > str :
if not self . is_virtual :
if not self . is_virtual :
return to_native ( self . fqcn )
return to_native ( self . fqcn )
@ -526,46 +552,46 @@ class _ComputedReqKindsMixin:
)
)
@property
@property
def is_virtual ( self ) :
def is_virtual ( self ) - > bool :
return self . is_scm or self . is_subdirs
return self . is_scm or self . is_subdirs
@property
@property
def is_file ( self ) :
def is_file ( self ) - > bool :
return self . type == ' file '
return self . type == ' file '
@property
@property
def is_dir ( self ) :
def is_dir ( self ) - > bool :
return self . type == ' dir '
return self . type == ' dir '
@property
@property
def namespace_collection_paths ( self ) :
def namespace_collection_paths ( self ) - > list [ str ] :
return [
return [
to_native ( path )
to_native ( path )
for path in _find_collections_in_subdirs ( self . src )
for path in _find_collections_in_subdirs ( self . src )
]
]
@property
@property
def is_subdirs ( self ) :
def is_subdirs ( self ) - > bool :
return self . type == ' subdirs '
return self . type == ' subdirs '
@property
@property
def is_url ( self ) :
def is_url ( self ) - > bool :
return self . type == ' url '
return self . type == ' url '
@property
@property
def is_scm ( self ) :
def is_scm ( self ) - > bool :
return self . type == ' git '
return self . type == ' git '
@property
@property
def is_concrete_artifact ( self ) :
def is_concrete_artifact ( self ) - > bool :
return self . type in { ' git ' , ' url ' , ' file ' , ' dir ' , ' subdirs ' }
return self . type in { ' git ' , ' url ' , ' file ' , ' dir ' , ' subdirs ' }
@property
@property
def is_online_index_pointer ( self ) :
def is_online_index_pointer ( self ) - > bool :
return not self . is_concrete_artifact
return not self . is_concrete_artifact
@property
@property
def is_pinned ( self ) :
def is_pinned ( self ) - > bool :
""" Indicate if the version set is considered pinned.
""" Indicate if the version set is considered pinned.
This essentially computes whether the version field of the current
This essentially computes whether the version field of the current
@ -585,7 +611,7 @@ class _ComputedReqKindsMixin:
)
)
@property
@property
def source_info ( self ) :
def source_info ( self ) - > dict [ str , object ] | None :
return self . _source_info
return self . _source_info
@ -601,11 +627,11 @@ class Requirement(
) :
) :
""" An abstract requirement request. """
""" An abstract requirement request. """
def __new__ ( cls , * args , * * kwargs ):
def __new__ ( cls , * args : object , * * kwargs : object ) - > t . Self :
self = RequirementNamedTuple . __new__ ( cls , * args , * * kwargs )
self = RequirementNamedTuple . __new__ ( cls , * args , * * kwargs )
return self
return self
def __init__ ( self , * args , * * kwargs ):
def __init__ ( self , * args : object , * * kwargs : object ) - > None :
super ( Requirement , self ) . __init__ ( )
super ( Requirement , self ) . __init__ ( )
@ -615,14 +641,14 @@ class Candidate(
) :
) :
""" A concrete collection candidate with its version resolved. """
""" A concrete collection candidate with its version resolved. """
def __new__ ( cls , * args , * * kwargs ):
def __new__ ( cls , * args : object , * * kwargs : object ) - > t . Self :
self = CandidateNamedTuple . __new__ ( cls , * args , * * kwargs )
self = CandidateNamedTuple . __new__ ( cls , * args , * * kwargs )
return self
return self
def __init__ ( self , * args , * * kwargs ):
def __init__ ( self , * args : object , * * kwargs : object ) - > None :
super ( Candidate , self ) . __init__ ( )
super ( Candidate , self ) . __init__ ( )
def with_signatures_repopulated ( self ) : # type: (Candidate) -> Candidate
def with_signatures_repopulated ( self ) - > Candidate :
""" Populate a new Candidate instance with Galaxy signatures.
""" Populate a new Candidate instance with Galaxy signatures.
: raises AnsibleAssertionError : If the supplied candidate is not sourced from a Galaxy - like index .
: raises AnsibleAssertionError : If the supplied candidate is not sourced from a Galaxy - like index .
"""
"""