@@ -24,9 +24,7 @@ import datetime
 import json
 import os
 import re
-import subprocess
 import sys
-import tempfile
 import traceback
 import warnings
@@ -301,8 +299,8 @@ class ModuleValidator(Validator):
     ACCEPTLIST_FUTURE_IMPORTS = frozenset(('absolute_import', 'division', 'print_function'))
 
-    def __init__(self, path, analyze_arg_spec=False, collection=None, collection_version=None,
-                 base_branch=None, git_cache=None, reporter=None, routing=None, plugin_type='module'):
+    def __init__(self, path, git_cache: GitCache, analyze_arg_spec=False, collection=None, collection_version=None,
+                 reporter=None, routing=None, plugin_type='module'):
         super(ModuleValidator, self).__init__(reporter=reporter or Reporter())
 
         self.path = path
@@ -328,8 +326,8 @@ class ModuleValidator(Validator):
             self.collection_version_str = collection_version
             self.collection_version = SemanticVersion(collection_version)
 
-        self.base_branch = base_branch
-        self.git_cache = git_cache or GitCache()
+        self.git_cache = git_cache
+        self.base_module = self.git_cache.get_original_path(self.path)
         self._python_module_override = False
@ -341,11 +339,6 @@ class ModuleValidator(Validator):
except Exception :
self . ast = None
if base_branch :
self . base_module = self . _get_base_file ( )
else :
self . base_module = None
def _create_version ( self , v , collection_name = None ) :
if not v :
raise ValueError ( ' Empty string is not a valid version ' )
@@ -368,13 +361,7 @@ class ModuleValidator(Validator):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        if not self.base_module:
-            return
-
-        try:
-            os.remove(self.base_module)
-        except Exception:
-            pass
+        pass
 
     @property
     def object_name(self):
@@ -426,36 +413,9 @@ class ModuleValidator(Validator):
         except AttributeError:
             return False
 
-    def _get_base_branch_module_path(self):
-        """List all paths within lib/ansible/modules to try and match a moved module"""
-        return self.git_cache.base_module_paths.get(self.object_name)
-
-    def _has_alias(self):
-        """Return true if the module has any aliases."""
-        return self.object_name in self.git_cache.head_aliased_modules
-
-    def _get_base_file(self):
-        # In case of module moves, look for the original location
-        base_path = self._get_base_branch_module_path()
-        ext = os.path.splitext(base_path or self.path)[1]
-
-        command = ['git', 'show', '%s:%s' % (self.base_branch, base_path or self.path)]
-        p = subprocess.run(command, stdin=subprocess.DEVNULL, capture_output=True, check=False)
-
-        if int(p.returncode) != 0:
-            return None
-
-        t = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
-        t.write(p.stdout)
-        t.close()
-
-        return t.name
-
-    def _is_new_module(self):
-        if self._has_alias():
-            return False
-
-        return not self.object_name.startswith('_') and bool(self.base_branch) and not bool(self.base_module)
+    def _is_new_module(self) -> bool | None:
+        """Return True if the content is new, False if it is not and None if the information is not available."""
+        return self.git_cache.is_new(self.path)
 
     def _check_interpreter(self, powershell=False):
         if powershell:
@@ -2050,7 +2010,7 @@ class ModuleValidator(Validator):
         )
 
     def _check_for_new_args(self, doc):
-        if not self.base_branch or self._is_new_module():
+        if not self.base_module:
             return
 
         with CaptureStd():
@@ -2284,7 +2244,7 @@ class ModuleValidator(Validator):
         # We can only validate PowerShell arg spec if it is using the new Ansible.Basic.AnsibleModule util
         pattern = r'(?im)^#\s*ansiblerequires\s+\-csharputil\s*Ansible\.Basic'
         if re.search(pattern, self.text) and self.object_name not in self.PS_ARG_VALIDATE_REJECTLIST:
-            with ModuleValidator(docs_path, base_branch=self.base_branch, git_cache=self.git_cache) as docs_mv:
+            with ModuleValidator(docs_path, git_cache=self.git_cache) as docs_mv:
                 docs = docs_mv._validate_docs()[1]
                 self._validate_ansible_module_call(docs)
@@ -2329,6 +2289,84 @@ class PythonPackageValidator(Validator):
             )
 
 
+class GitCache(metaclass=abc.ABCMeta):
+    """Base class for access to original files."""
+
+    @abc.abstractmethod
+    def get_original_path(self, path: str) -> str | None:
+        """Return the path to the original version of the specified file, or None if there isn't one."""
+
+    @abc.abstractmethod
+    def is_new(self, path: str) -> bool | None:
+        """Return True if the content is new, False if it is not and None if the information is not available."""
+
+    @staticmethod
+    def create(original_plugins: str | None, plugin_type: str) -> GitCache:
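+        # Use the no-op cache when no original plugins were provided.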
+        return CoreGitCache(original_plugins, plugin_type) if original_plugins else NoOpGitCache()
+
+
+class CoreGitCache(GitCache):
+    """Provides access to original files when testing core."""
+
+    def __init__(self, original_plugins: str | None, plugin_type: str) -> None:
+        super().__init__()
+
+        self.original_plugins = original_plugins
+
+        rel_path = 'lib/ansible/modules/' if plugin_type == 'module' else f'lib/ansible/plugins/{plugin_type}/'
+        head_tree = self._find_files(rel_path)
+
+        head_aliased_modules = set()
+
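+        # Leading-underscore symlinks are deprecated aliases; record the real plugin each one points to.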
+        for path in head_tree:
+            filename = os.path.basename(path)
+
+            if filename.startswith('_') and filename != '__init__.py':
+                if os.path.islink(path):
+                    head_aliased_modules.add(os.path.basename(os.path.realpath(path)))
+
+        self._head_aliased_modules = head_aliased_modules
+
+    def get_original_path(self, path: str) -> str | None:
+        """Return the path to the original version of the specified file, or None if there isn't one."""
+        path = os.path.join(self.original_plugins, path)
+
+        if not os.path.exists(path):
+            path = None
+
+        return path
+
+    def is_new(self, path: str) -> bool | None:
+        """Return True if the content is new, False if it is not and None if the information is not available."""
+        if os.path.basename(path).startswith('_'):
+            return False
+
+        if os.path.basename(path) in self._head_aliased_modules:
+            return False
+
+        return not self.get_original_path(path)
+
+    @staticmethod
+    def _find_files(path: str) -> list[str]:
+        """Return a list of files found in the specified directory."""
+        paths = []
+
+        for (dir_path, dir_names, file_names) in os.walk(path):
+            for file_name in file_names:
+                paths.append(os.path.join(dir_path, file_name))
+
+        return sorted(paths)
+
+
+class NoOpGitCache(GitCache):
+    """Provides a no-op interface for access to original files."""
+
+    def get_original_path(self, path: str) -> str | None:
+        """Return the path to the original version of the specified file, or None if there isn't one."""
+        return None
+
+    def is_new(self, path: str) -> bool | None:
+        """Return True if the content is new, False if it is not and None if the information is not available."""
+        return None
+
+
 def re_compile(value):
     """
     Argparse expects things to raise TypeError, re.compile raises an re.error
@@ -2354,8 +2392,6 @@ def run():
                         type=re_compile)
     parser.add_argument('--arg-spec', help='Analyze module argument spec',
                         action='store_true', default=False)
-    parser.add_argument('--base-branch', default=None,
-                        help='Used in determining if new options were added')
     parser.add_argument('--format', choices=['json', 'plain'], default='plain',
                         help='Output format. Default: "%(default)s"')
     parser.add_argument('--output', default='-',
@@ -2372,13 +2408,14 @@ def run():
     parser.add_argument('--plugin-type',
                         default='module',
                         help='The plugin type to validate. Defaults to %(default)s')
+    parser.add_argument('--original-plugins')
 
     args = parser.parse_args()
 
     args.plugins = [m.rstrip('/') for m in args.plugins]
 
     reporter = Reporter()
-    git_cache = GitCache(args.base_branch, args.plugin_type)
+    git_cache = GitCache.create(args.original_plugins, args.plugin_type)
 
     check_dirs = set()
@@ -2403,7 +2440,7 @@ def run():
             if ModuleValidator.is_on_rejectlist(path):
                 continue
             with ModuleValidator(path, collection=args.collection, collection_version=args.collection_version,
-                                 analyze_arg_spec=args.arg_spec, base_branch=args.base_branch,
+                                 analyze_arg_spec=args.arg_spec,
                                  git_cache=git_cache, reporter=reporter, routing=routing,
                                  plugin_type=args.plugin_type) as mv1:
                 mv1.validate()
@@ -2428,7 +2465,7 @@ def run():
                 if ModuleValidator.is_on_rejectlist(path):
                     continue
                 with ModuleValidator(path, collection=args.collection, collection_version=args.collection_version,
-                                     analyze_arg_spec=args.arg_spec, base_branch=args.base_branch,
+                                     analyze_arg_spec=args.arg_spec,
                                      git_cache=git_cache, reporter=reporter, routing=routing,
                                      plugin_type=args.plugin_type) as mv2:
                     mv2.validate()
@@ -2444,75 +2481,6 @@ def run():
         sys.exit(reporter.json(warnings=args.warnings, output=args.output))
 
 
-class GitCache:
-    def __init__(self, base_branch, plugin_type):
-        self.base_branch = base_branch
-        self.plugin_type = plugin_type
-
-        self.rel_path = 'lib/ansible/modules/'
-        if plugin_type != 'module':
-            self.rel_path = 'lib/ansible/plugins/%s/' % plugin_type
-
-        if self.base_branch:
-            self.base_tree = self._git(['ls-tree', '-r', '--name-only', self.base_branch, self.rel_path])
-        else:
-            self.base_tree = []
-
-        try:
-            self.head_tree = self._git(['ls-tree', '-r', '--name-only', 'HEAD', self.rel_path])
-        except GitError as ex:
-            if ex.status == 128:
-                # fallback when there is no .git directory
-                self.head_tree = self._get_module_files()
-            else:
-                raise
-        except FileNotFoundError:
-            # fallback when git is not installed
-            self.head_tree = self._get_module_files()
-
-        allowed_exts = ('.py', '.ps1')
-        if plugin_type != 'module':
-            allowed_exts = ('.py', )
-
-        self.base_module_paths = dict((os.path.basename(p), p) for p in self.base_tree if os.path.splitext(p)[1] in allowed_exts)
-
-        self.base_module_paths.pop('__init__.py', None)
-
-        self.head_aliased_modules = set()
-
-        for path in self.head_tree:
-            filename = os.path.basename(path)
-
-            if filename.startswith('_') and filename != '__init__.py':
-                if os.path.islink(path):
-                    self.head_aliased_modules.add(os.path.basename(os.path.realpath(path)))
-
-    def _get_module_files(self):
-        module_files = []
-
-        for (dir_path, dir_names, file_names) in os.walk(self.rel_path):
-            for file_name in file_names:
-                module_files.append(os.path.join(dir_path, file_name))
-
-        return module_files
-
-    @staticmethod
-    def _git(args):
-        cmd = ['git'] + args
-        p = subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True, text=True, check=False)
-
-        if p.returncode != 0:
-            raise GitError(p.stderr, p.returncode)
-        return p.stdout.splitlines()
-
-
-class GitError(Exception):
-    def __init__(self, message, status):
-        super(GitError, self).__init__(message)
-
-        self.status = status
-
-
 def main():
     try:
         run()