@ -6,12 +6,12 @@
from __future__ import annotations
import collections . abc as _c
import os
import pathlib
import typing as t
from collections import namedtuple
from collections . abc import MutableSequence , MutableMapping
from glob import iglob
from urllib . parse import urlparse
from yaml import safe_load
@ -43,7 +43,12 @@ _SOURCE_METADATA_FILE = b'GALAXY.yml'
display = Display ( )
def get_validated_source_info ( b_source_info_path , namespace , name , version ) :
def get_validated_source_info (
b_source_info_path : bytes ,
namespace : str ,
name : str ,
version : str ,
) - > dict [ str , object ] | None :
source_info_path = to_text ( b_source_info_path , errors = ' surrogate_or_strict ' )
if not os . path . isfile ( b_source_info_path ) :
@ -58,7 +63,7 @@ def get_validated_source_info(b_source_info_path, namespace, name, version):
)
return None
if not isinstance ( metadata , MutableMapping ) :
if not isinstance ( metadata , dict ) :
display . warning ( f " Error getting collection source information at ' { source_info_path } ' : expected a YAML dictionary " )
return None
@ -72,7 +77,12 @@ def get_validated_source_info(b_source_info_path, namespace, name, version):
return metadata
def _validate_v1_source_info_schema ( namespace , name , version , provided_arguments ) :
def _validate_v1_source_info_schema (
namespace : str ,
name : str ,
version : str ,
provided_arguments : dict [ str , object ] ,
) - > list [ str ] :
argument_spec_data = dict (
format_version = dict ( choices = [ " 1.0.0 " ] ) ,
download_url = dict ( ) ,
@ -102,24 +112,24 @@ def _validate_v1_source_info_schema(namespace, name, version, provided_arguments
return validation_result . error_messages
def _is_collection_src_dir ( dir_path ):
def _is_collection_src_dir ( dir_path : bytes | str ) - > bool :
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
return os . path . isfile ( os . path . join ( b_dir_path , _GALAXY_YAML ) )
def _is_installed_collection_dir ( dir_path ):
def _is_installed_collection_dir ( dir_path : bytes | str ) - > bool :
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
return os . path . isfile ( os . path . join ( b_dir_path , _MANIFEST_JSON ) )
def _is_collection_dir ( dir_path ):
def _is_collection_dir ( dir_path : bytes | str ) - > bool :
return (
_is_installed_collection_dir ( dir_path ) or
_is_collection_src_dir ( dir_path )
)
def _find_collections_in_subdirs ( dir_path ):
def _find_collections_in_subdirs ( dir_path : str ) - > _c . Iterator [ bytes ] :
b_dir_path = to_bytes ( dir_path , errors = ' surrogate_or_strict ' )
subdir_glob_pattern = os . path . join (
@ -135,23 +145,23 @@ def _find_collections_in_subdirs(dir_path):
yield subdir
def _is_collection_namespace_dir ( tested_str ):
def _is_collection_namespace_dir ( tested_str : str ) - > bool :
return any ( _find_collections_in_subdirs ( tested_str ) )
def _is_file_path ( tested_str ):
def _is_file_path ( tested_str : str ) - > bool :
return os . path . isfile ( to_bytes ( tested_str , errors = ' surrogate_or_strict ' ) )
def _is_http_url ( tested_str ):
def _is_http_url ( tested_str : str ) - > bool :
return urlparse ( tested_str ) . scheme . lower ( ) in { ' http ' , ' https ' }
def _is_git_url ( tested_str ):
def _is_git_url ( tested_str : str ) - > bool :
return tested_str . startswith ( ( ' git+ ' , ' git@ ' ) )
def _is_concrete_artifact_pointer ( tested_str ):
def _is_concrete_artifact_pointer ( tested_str : str ) - > bool :
return any (
predicate ( tested_str )
for predicate in (
@ -168,7 +178,7 @@ def _is_concrete_artifact_pointer(tested_str):
class _ComputedReqKindsMixin :
UNIQUE_ATTRS = ( ' fqcn ' , ' ver ' , ' src ' , ' type ' )
def __init__ ( self , * args , * * kwargs ) :
def __init__ ( self , * args , * * kwargs ) - > None :
if not self . may_have_offline_galaxy_info :
self . _source_info = None
else :
@ -181,18 +191,18 @@ class _ComputedReqKindsMixin:
self . ver
)
def __hash__ ( self ) :
def __hash__ ( self ) - > int :
return hash ( tuple ( getattr ( self , attr ) for attr in _ComputedReqKindsMixin . UNIQUE_ATTRS ) )
def __eq__ ( self , candidate ):
def __eq__ ( self , candidate : _c . Hashable ) - > bool :
return hash ( self ) == hash ( candidate )
@classmethod
def from_dir_path_as_unknown ( # type: ignore[misc]
cls , # type: t.Type[Collection]
dir_path , # type : bytes
art_mgr , # type: ConcreteArtifactsManager
) : # type: (...) -> Collection
def from_dir_path_as_unknown (
cls ,
dir_path : bytes,
art_mgr : ConcreteArtifactsManager ,
) - > t . Self :
""" Make collection from an unspecified dir type.
This alternative constructor attempts to grab metadata from the
@ -215,11 +225,11 @@ class _ComputedReqKindsMixin:
return cls . from_dir_path_implicit ( dir_path )
@classmethod
def from_dir_path ( # type: ignore[misc]
cls , # type: t.Type[Collection]
dir_path , # type : bytes
art_mgr , # type: ConcreteArtifactsManager
) : # type: (...) -> Collection
def from_dir_path (
cls ,
dir_path : bytes,
art_mgr : ConcreteArtifactsManager ,
) - > t . Self :
""" Make collection from an directory with metadata. """
if dir_path . endswith ( to_bytes ( os . path . sep ) ) :
dir_path = dir_path . rstrip ( to_bytes ( os . path . sep ) )
@ -262,10 +272,10 @@ class _ComputedReqKindsMixin:
return cls ( req_name , req_version , dir_path , ' dir ' , None )
@classmethod
def from_dir_path_implicit ( # type: ignore[misc]
cls , # type: t.Type[Collection]
dir_path , # type : bytes
) : # type: (...) -> Collection
def from_dir_path_implicit (
cls ,
dir_path : bytes,
) - > t . Self :
""" Construct a collection instance based on an arbitrary dir.
This alternative constructor infers the FQCN based on the parent
@ -278,11 +288,16 @@ class _ComputedReqKindsMixin:
u_dir_path = to_text ( dir_path , errors = ' surrogate_or_strict ' )
path_list = u_dir_path . split ( os . path . sep )
req_name = ' . ' . join ( path_list [ - 2 : ] )
return cls ( req_name , ' * ' , dir_path , ' dir ' , None ) # type: ignore[call-arg]
return cls ( req_name , ' * ' , dir_path , ' dir ' , None )
@classmethod
def from_string ( cls , collection_input , artifacts_manager , supplemental_signatures ) :
req = { }
def from_string (
cls ,
collection_input : str ,
artifacts_manager : ConcreteArtifactsManager ,
supplemental_signatures : list [ str ] | None ,
) - > t . Self :
req : dict [ str , str | list [ str ] | None ] = { }
if _is_concrete_artifact_pointer ( collection_input ) or AnsibleCollectionRef . is_valid_collection_name ( collection_input ) :
# Arg is a file path or URL to a collection, or just a collection
req [ ' name ' ] = collection_input
@ -307,7 +322,14 @@ class _ComputedReqKindsMixin:
return cls . from_requirement_dict ( req , artifacts_manager )
@classmethod
def from_requirement_dict ( cls , collection_req , art_mgr , validate_signature_options = True ) :
def from_requirement_dict (
cls ,
# NOTE: The actual `collection_req` shape is supposed to be
# NOTE: `dict[str, str | list[str] | None]`
collection_req : dict [ str , t . Any ] ,
art_mgr : ConcreteArtifactsManager ,
validate_signature_options : bool = True ,
) - > t . Self :
req_name = collection_req . get ( ' name ' , None )
req_version = collection_req . get ( ' version ' , ' * ' )
req_type = collection_req . get ( ' type ' )
@ -320,7 +342,7 @@ class _ComputedReqKindsMixin:
f " Signatures were provided to verify { req_name } but no keyring was configured. "
)
if not isinstance ( req_signature_sources , MutableSequence) :
if not isinstance ( req_signature_sources , _c. MutableSequence) :
req_signature_sources = [ req_signature_sources ]
req_signature_sources = frozenset ( req_signature_sources )
@ -434,7 +456,11 @@ class _ComputedReqKindsMixin:
format ( not_url = req_source . api_server ) ,
)
if req_type == ' dir ' and req_source . endswith ( os . path . sep ) :
if (
req_type == ' dir '
and isinstance ( req_source , str )
and req_source . endswith ( os . path . sep )
) :
req_source = req_source . rstrip ( os . path . sep )
tmp_inst_req = cls ( req_name , req_version , req_source , req_type , req_signature_sources )
@ -451,16 +477,16 @@ class _ComputedReqKindsMixin:
req_signature_sources ,
)
def __repr__ ( self ) :
def __repr__ ( self ) - > str :
return (
' < {self!s} of type {coll_type!r} from {src!s} > ' .
format ( self = self , coll_type = self . type , src = self . src or ' Galaxy ' )
)
def __str__ ( self ) :
def __str__ ( self ) - > str :
return to_native ( self . __unicode__ ( ) )
def __unicode__ ( self ) :
def __unicode__ ( self ) - > str :
if self . fqcn is None :
return (
f ' { self . type } collection from a Git repo ' if self . is_scm
@ -473,7 +499,7 @@ class _ComputedReqKindsMixin:
)
@property
def may_have_offline_galaxy_info ( self ) :
def may_have_offline_galaxy_info ( self ) - > bool :
if self . fqcn is None :
# Virtual collection
return False
@ -482,7 +508,7 @@ class _ComputedReqKindsMixin:
return False
return True
def construct_galaxy_info_path ( self , b_collection_path ):
def construct_galaxy_info_path ( self , b_collection_path : bytes ) - > bytes :
if not self . may_have_offline_galaxy_info and not self . type == ' galaxy ' :
raise TypeError ( ' Only installed collections from a Galaxy server have offline Galaxy info ' )
@ -502,21 +528,21 @@ class _ComputedReqKindsMixin:
return self . fqcn . split ( ' . ' )
@property
def namespace ( self ) :
def namespace ( self ) - > str :
if self . is_virtual :
raise TypeError ( f ' { self . type } collections do not have a namespace ' )
return self . _get_separate_ns_n_name ( ) [ 0 ]
@property
def name ( self ) :
def name ( self ) - > str :
if self . is_virtual :
raise TypeError ( f ' { self . type } collections do not have a name ' )
return self . _get_separate_ns_n_name ( ) [ - 1 ]
@property
def canonical_package_id ( self ) :
def canonical_package_id ( self ) - > str :
if not self . is_virtual :
return to_native ( self . fqcn )
@ -526,46 +552,46 @@ class _ComputedReqKindsMixin:
)
@property
def is_virtual ( self ) :
def is_virtual ( self ) - > bool :
return self . is_scm or self . is_subdirs
@property
def is_file ( self ) :
def is_file ( self ) - > bool :
return self . type == ' file '
@property
def is_dir ( self ) :
def is_dir ( self ) - > bool :
return self . type == ' dir '
@property
def namespace_collection_paths ( self ) :
def namespace_collection_paths ( self ) - > list [ str ] :
return [
to_native ( path )
for path in _find_collections_in_subdirs ( self . src )
]
@property
def is_subdirs ( self ) :
def is_subdirs ( self ) - > bool :
return self . type == ' subdirs '
@property
def is_url ( self ) :
def is_url ( self ) - > bool :
return self . type == ' url '
@property
def is_scm ( self ) :
def is_scm ( self ) - > bool :
return self . type == ' git '
@property
def is_concrete_artifact ( self ) :
def is_concrete_artifact ( self ) - > bool :
return self . type in { ' git ' , ' url ' , ' file ' , ' dir ' , ' subdirs ' }
@property
def is_online_index_pointer ( self ) :
def is_online_index_pointer ( self ) - > bool :
return not self . is_concrete_artifact
@property
def is_pinned ( self ) :
def is_pinned ( self ) - > bool :
""" Indicate if the version set is considered pinned.
This essentially computes whether the version field of the current
@ -585,7 +611,7 @@ class _ComputedReqKindsMixin:
)
@property
def source_info ( self ) :
def source_info ( self ) - > dict [ str , object ] | None :
return self . _source_info
@ -601,11 +627,11 @@ class Requirement(
) :
""" An abstract requirement request. """
def __new__ ( cls , * args , * * kwargs ):
def __new__ ( cls , * args : object , * * kwargs : object ) - > t . Self :
self = RequirementNamedTuple . __new__ ( cls , * args , * * kwargs )
return self
def __init__ ( self , * args , * * kwargs ):
def __init__ ( self , * args : object , * * kwargs : object ) - > None :
super ( Requirement , self ) . __init__ ( )
@ -615,14 +641,14 @@ class Candidate(
) :
""" A concrete collection candidate with its version resolved. """
def __new__ ( cls , * args , * * kwargs ):
def __new__ ( cls , * args : object , * * kwargs : object ) - > t . Self :
self = CandidateNamedTuple . __new__ ( cls , * args , * * kwargs )
return self
def __init__ ( self , * args , * * kwargs ):
def __init__ ( self , * args : object , * * kwargs : object ) - > None :
super ( Candidate , self ) . __init__ ( )
def with_signatures_repopulated ( self ) : # type: (Candidate) -> Candidate
def with_signatures_repopulated ( self ) - > Candidate :
""" Populate a new Candidate instance with Galaxy signatures.
: raises AnsibleAssertionError : If the supplied candidate is not sourced from a Galaxy - like index .
"""