@ -7,24 +7,28 @@ __metaclass__ = type
def main ( ) :
"""
Main program function used to isolate globals from imported code .
Changes to globals in imported modules on Python 2. 7 will overwrite our own globals .
Changes to globals in imported modules on Python 2. x will overwrite our own globals .
"""
import contextlib
import os
import re
import runpy
import sys
import traceback
import types
import warnings
import_dir = os . environ [ ' SANITY_IMPORT_DIR ' ]
minimal_dir = os . environ [ ' SANITY_MINIMAL_DIR ' ]
ansible_path = os . environ [ ' PYTHONPATH ' ]
temp_path = os . environ [ ' SANITY_TEMP_PATH ' ] + os . path . sep
collection_full_name = os . environ . get ( ' SANITY_COLLECTION_FULL_NAME ' )
try :
import importlib . util
imp = None # pylint: disable=invalid-nam e
# noinspection PyCompatibility
from importlib import import_modul e
except ImportError :
importlib = None # pylint: disable=invalid-name
import imp
def import_module ( name ) :
__import__ ( name )
return sys . modules [ name ]
try :
# noinspection PyCompatibility
@ -32,23 +36,55 @@ def main():
except ImportError :
from io import StringIO
import ansible . module_utils . basic
import ansible . module_utils . common . removed
# pre-load an empty ansible package to prevent unwanted code in __init__.py from loading
# without this the ansible.release import there would pull in many Python modules which Ansible modules should not have access to
ansible_module = types . ModuleType ( ' ansible ' )
ansible_module . __file__ = os . path . join ( os . environ [ ' PYTHONPATH ' ] , ' ansible ' , ' __init__.py ' )
ansible_module . __path__ = [ os . path . dirname ( ansible_module . __file__ ) ]
ansible_module . __package__ = ' ansible '
try :
sys . modules [ ' ansible ' ] = ansible_module
if collection_full_name :
# allow importing code from collections when testing a collection
from ansible . utils . collection_loader import AnsibleCollectionLoader
except ImportError :
# noinspection PyPep8Naming
AnsibleCollectionLoader = None
# These are the public attribute sof a doc-only module
doc_keys = ( ' ANSIBLE_METADATA ' ,
' DOCUMENTATION ' ,
' EXAMPLES ' ,
' RETURN ' ,
' absolute_import ' ,
' division ' ,
' print_function ' )
from ansible . module_utils . _text import to_bytes
def get_source ( self , fullname ) :
mod = sys . modules . get ( fullname )
if not mod :
mod = self . load_module ( fullname )
with open ( to_bytes ( mod . __file__ ) , ' rb ' ) as mod_file :
source = mod_file . read ( )
return source
def get_code ( self , fullname ) :
return compile ( source = self . get_source ( fullname ) , filename = self . get_filename ( fullname ) , mode = ' exec ' , flags = 0 , dont_inherit = True )
def is_package ( self , fullname ) :
return self . get_filename ( fullname ) . endswith ( ' __init__.py ' )
def get_filename ( self , fullname ) :
mod = sys . modules . get ( fullname ) or self . load_module ( fullname )
return mod . __file__
# monkeypatch collection loader to work with runpy
# remove this (and the associated code above) once implemented natively in the collection loader
AnsibleCollectionLoader . get_source = get_source
AnsibleCollectionLoader . get_code = get_code
AnsibleCollectionLoader . is_package = is_package
AnsibleCollectionLoader . get_filename = get_filename
collection_loader = AnsibleCollectionLoader ( )
# noinspection PyCallingNonCallable
sys . meta_path . insert ( 0 , collection_loader )
else :
# do not support collection loading when not testing a collection
collection_loader = None
class ImporterAnsibleModuleException ( Exception ) :
""" Exception thrown during initialization of ImporterAnsibleModule. """
@ -58,185 +94,241 @@ def main():
def __init__ ( self , * args , * * kwargs ) :
raise ImporterAnsibleModuleException ( )
# stop Ansible module execution during AnsibleModule instantiation
ansible . module_utils . basic . AnsibleModule = ImporterAnsibleModule
# no-op for _load_params since it may be called before instantiating AnsibleModule
ansible . module_utils . basic . _load_params = lambda * args , * * kwargs : { } # pylint: disable=protected-access
# no-op for removed_module since it is called in place of AnsibleModule instantiation
ansible . module_utils . common . removed . removed_module = lambda * args , * * kwargs : None
class ImportBlacklist :
""" Blacklist inappropriate imports. """
def __init__ ( self , path , name ) :
self . path = path
self . name = name
self . loaded_modules = set ( )
def find_module ( self , fullname , path = None ) :
""" Return self if the given fullname is blacklisted, otherwise return None.
: param fullname : str
: param path : str
: return : ImportBlacklist | None
"""
if fullname in self . loaded_modules :
return None # ignore modules that are already being loaded
if is_name_in_namepace ( fullname , [ ' ansible ' ] ) :
if fullname in ( ' ansible.module_utils.basic ' , ' ansible.module_utils.common.removed ' ) :
return self # intercept loading so we can modify the result
if is_name_in_namepace ( fullname , [ ' ansible.module_utils ' , self . name ] ) :
return None # module_utils and module under test are always allowed
if os . path . exists ( convert_ansible_name_to_absolute_path ( fullname ) ) :
return self # blacklist ansible files that exist
return None # ansible file does not exist, do not blacklist
if is_name_in_namepace ( fullname , [ ' ansible_collections ' ] ) :
if not collection_loader :
return self # blacklist collections when we are not testing a collection
if is_name_in_namepace ( fullname , [ ' ansible_collections...plugins.module_utils ' , self . name ] ) :
return None # module_utils and module under test are always allowed
if collection_loader . find_module ( fullname , path ) :
return self # blacklist collection files that exist
return None # collection file does not exist, do not blacklist
# not a namespace we care about
return None
def load_module ( self , fullname ) :
""" Raise an ImportError.
: type fullname : str
"""
if fullname == ' ansible.module_utils.basic ' :
module = self . __load_module ( fullname )
# stop Ansible module execution during AnsibleModule instantiation
module . AnsibleModule = ImporterAnsibleModule
# no-op for _load_params since it may be called before instantiating AnsibleModule
module . _load_params = lambda * args , * * kwargs : { } # pylint: disable=protected-access
return module
if fullname == ' ansible.module_utils.common.removed ' :
module = self . __load_module ( fullname )
# no-op for removed_module since it is called in place of AnsibleModule instantiation
module . removed_module = lambda * args , * * kwargs : None
return module
raise ImportError ( ' import of " %s " is not allowed in this context ' % fullname )
def __load_module ( self , fullname ) :
""" Load the requested module while avoiding infinite recursion.
: type fullname : str
: rtype : module
"""
self . loaded_modules . add ( fullname )
return import_module ( fullname )
def run ( ) :
""" Main program function. """
base_dir = os . getcwd ( )
messages = set ( )
if AnsibleCollectionLoader :
# allow importing code from collections
# noinspection PyCallingNonCallable
sys . meta_path . insert ( 0 , AnsibleCollectionLoader ( ) )
for path in sys . argv [ 1 : ] or sys . stdin . read ( ) . splitlines ( ) :
test_python_module ( path , base_dir , messages , False )
test_python_module ( path , base_dir , messages , True )
name = convert_relative_path_to_name ( path )
test_python_module ( path , name , base_dir , messages )
if messages :
exit ( 10 )
def test_python_module ( path , base_dir , messages , ansible_module ) :
if ansible_module :
# importing modules with __main__ under Python 2.6 exits with status code 1
if sys . version_info < ( 2 , 7 ) :
return
# only run __main__ protected code for Ansible modules
if not path . startswith ( ' lib/ansible/modules/ ' ) :
return
# __init__ in module directories is empty (enforced by a different test)
if path . endswith ( ' __init__.py ' ) :
return
# async_wrapper is not an Ansible module
if path == ' lib/ansible/modules/utilities/logic/async_wrapper.py ' :
return
def test_python_module ( path , name , base_dir , messages ) :
""" Test the given python module by importing it.
: type path : str
: type name : str
: type base_dir : str
: type messages : set [ str ]
"""
if name in sys . modules :
return # cannot be tested because it has already been loaded
name = calculate_python_module_name ( path )
# show the Ansible module responsible for the exception, even if it was thrown in module_utils
filter_dir = os . path . join ( base_dir , ' lib/ansible/modules ' )
else :
# Calculate module name
name = calculate_python_module_name ( path )
is_ansible_module = ( path . startswith ( ' lib/ansible/modules/ ' ) or path . startswith ( ' plugins/modules/ ' ) ) and os . path . basename ( path ) != ' __init__.py '
run_main = is_ansible_module
# show the Ansible file responsible for the exception, even if it was thrown in 3rd party code
filter_dir = base_dir
if path == ' lib/ansible/modules/utilities/logic/async_wrapper.py ' :
# async_wrapper is a non-standard Ansible module (does not use AnsibleModule) so we cannot test the main function
run_main = False
capture = Capture ( )
capture_normal = Capture ( )
capture_main = Capture ( )
try :
if imp :
with capture_output ( capture ) :
# On Python2 without absolute_import we have to import parent modules all
# the way up the tree
full_path = os . path . abspath ( path )
parent_mod = None
py_packages = name . split ( ' . ' )
# BIG HACK: reimporting module_utils breaks the monkeypatching of basic we did
# above and also breaks modules which import names directly from module_utils
# modules (you'll get errors like ERROR:
# lib/ansible/modules/storage/netapp/na_ontap_vserver_cifs_security.py:151:0:
# AttributeError: 'module' object has no attribute 'netapp').
# So when we import a module_util here, use a munged name.
if ' module_utils ' in py_packages :
# Avoid accidental double underscores by using _1 as a prefix
py_packages [ - 1 ] = ' _1 %s ' % py_packages [ - 1 ]
name = ' . ' . join ( py_packages )
for idx in range ( 1 , len ( py_packages ) ) :
parent_name = ' . ' . join ( py_packages [ : idx ] )
if parent_mod is None :
toplevel_end = full_path . find ( ' ansible/module ' )
toplevel = full_path [ : toplevel_end ]
parent_mod_info = imp . find_module ( parent_name , [ toplevel ] )
else :
parent_mod_info = imp . find_module ( py_packages [ idx - 1 ] , parent_mod . __path__ )
parent_mod = imp . load_module ( parent_name , * parent_mod_info )
# skip distro due to an apparent bug or bad interaction in
# imp.load_module() with our distro/__init__.py.
# distro/__init__.py sets sys.modules['ansible.module_utils.distro']
# = _distro.pyc
# but after running imp.load_module(),
# sys.modules['ansible.module_utils.distro._distro'] = __init__.pyc
# (The opposite of what we set)
# This does not affect runtime so regular import seems to work. It's
# just imp.load_module()
if name == ' ansible.module_utils.distro._1__init__ ' :
return
with open ( path , ' r ' ) as module_fd :
module = imp . load_module ( name , module_fd , full_path , ( ' .py ' , ' r ' , imp . PY_SOURCE ) )
if ansible_module :
run_if_really_module ( module )
else :
spec = importlib . util . spec_from_file_location ( name , os . path . abspath ( path ) )
module = importlib . util . module_from_spec ( spec )
with capture_output ( capture ) :
spec . loader . exec_module ( module )
if ansible_module :
run_if_really_module ( module )
capture_report ( path , capture , messages )
with monitor_sys_modules ( path , messages ) :
with blacklist_imports ( path , name , messages ) :
with capture_output ( capture_normal ) :
import_module ( name )
if run_main :
with monitor_sys_modules ( path , messages ) :
with blacklist_imports ( path , name , messages ) :
with capture_output ( capture_main ) :
runpy . run_module ( name , run_name = ' __main__ ' )
except ImporterAnsibleModuleException :
# module instantiated AnsibleModule without raising an exception
pass
# We truly want to catch anything the plugin might do here, including call sys.exit() so we
# catch BaseException
except BaseException as ex : # pylint: disable=locally-disabled, broad-except
capture_report ( path , capture , messages )
# intentionally catch all exceptions, including calls to sys.exit
exc_type , _exc , exc_tb = sys . exc_info ( )
message = str ( ex )
results = list ( reversed ( traceback . extract_tb ( exc_tb ) ) )
source = None
line = 0
offset = 0
full_path = os . path . join ( base_dir , path )
base_path = base_dir + os . path . sep
source = None
if isinstance ( ex , SyntaxError ) and ex . filename . endswith ( path ) : # pylint: disable=locally-disabled, no-member
# A SyntaxError in the source we're importing will have the correct path, line and offset.
# However, the traceback will report the path to this importer.py script instead.
# We'll use the details from the SyntaxError in this case, as it's more accurate.
source = path
line = ex . lineno or 0 # pylint: disable=locally-disabled, no-member
offset = ex . offset or 0 # pylint: disable=locally-disabled, no-member
message = str ( ex )
# Hack to remove the filename and line number from the message, if present.
message = message . replace ( ' ( %s , line %d ) ' % ( os . path . basename ( path ) , line ) , ' ' )
else :
for result in results :
if result [ 0 ] . startswith ( filter_dir ) :
source = result [ 0 ] [ len ( base_dir ) + 1 : ] . replace ( ' test/lib/ansible_test/_data/sanity/import/ ' , ' ' )
line = result [ 1 ] or 0
break
if not source :
# If none of our source files are found in the traceback, report the file we were testing.
# I haven't been able to come up with a test case that encounters this issue yet.
source = path
message + = ' (in %s : %d ) ' % ( results [ - 1 ] [ 0 ] , results [ - 1 ] [ 1 ] or 0 )
# avoid line wraps in messages
message = re . sub ( r ' \ n * ' , ' : ' , message )
error = ' %s : %d : %d : %s : %s ' % ( source , line , offset , exc_type . __name__ , message )
report_message ( error , messages )
def run_if_really_module ( module ) :
# Module was removed
if ( ' removed ' not in module . ANSIBLE_METADATA [ ' status ' ] and
# Documentation only module
[ attr for attr in
( frozenset ( module . __dict__ . keys ( ) ) . difference ( doc_keys ) )
if not ( attr . startswith ( ' __ ' ) and attr . endswith ( ' __ ' ) ) ] ) :
# Run main() code for ansible_modules
module . main ( )
def calculate_python_module_name ( path ) :
name = None
try :
idx = path . index ( ' ansible/modules ' )
except ValueError :
try :
idx = path . index ( ' ansible/module_utils ' )
except ValueError :
try :
idx = path . index ( ' ansible_collections ' )
except ValueError :
# Default
name = ' module_import_test '
if name is None :
name = path [ idx : - len ( ' .py ' ) ] . replace ( ' / ' , ' . ' )
for result in results :
if result [ 0 ] == full_path :
# save the line number for the file under test
line = result [ 1 ] or 0
if not source and result [ 0 ] . startswith ( base_path ) and not result [ 0 ] . startswith ( temp_path ) :
# save the first path and line number in the traceback which is in our source tree
source = ( os . path . relpath ( result [ 0 ] , base_path ) , result [ 1 ] or 0 , 0 )
if isinstance ( ex , SyntaxError ) :
# SyntaxError has better information than the traceback
if ex . filename == full_path : # pylint: disable=locally-disabled, no-member
# syntax error was reported in the file under test
line = ex . lineno or 0 # pylint: disable=locally-disabled, no-member
offset = ex . offset or 0 # pylint: disable=locally-disabled, no-member
elif ex . filename . startswith ( base_path ) and not ex . filename . startswith ( temp_path ) : # pylint: disable=locally-disabled, no-member
# syntax error was reported in our source tree
source = ( os . path . relpath ( ex . filename , base_path ) , ex . lineno or 0 , ex . offset or 0 ) # pylint: disable=locally-disabled, no-member
# remove the filename and line number from the message
# either it was extracted above, or it's not really useful information
message = re . sub ( r ' \ (.*?, line [0-9]+ \ )$ ' , ' ' , message )
if source and source [ 0 ] != path :
message + = ' (at %s : %d : %d ) ' % ( source [ 0 ] , source [ 1 ] , source [ 2 ] )
report_message ( path , line , offset , ' traceback ' , ' %s : %s ' % ( exc_type . __name__ , message ) , messages )
finally :
capture_report ( path , capture_normal , messages )
capture_report ( path , capture_main , messages )
def is_name_in_namepace ( name , namespaces ) :
""" Returns True if the given name is one of the given namespaces, otherwise returns False. """
name_parts = name . split ( ' . ' )
for namespace in namespaces :
namespace_parts = namespace . split ( ' . ' )
length = min ( len ( name_parts ) , len ( namespace_parts ) )
truncated_name = name_parts [ 0 : length ]
truncated_namespace = namespace_parts [ 0 : length ]
# empty parts in the namespace are treated as wildcards
# to simplify the comparison, use those empty parts to indicate the positions in the name to be empty as well
for idx , part in enumerate ( truncated_namespace ) :
if not part :
truncated_name [ idx ] = part
# example: name=ansible, allowed_name=ansible.module_utils
# example: name=ansible.module_utils.system.ping, allowed_name=ansible.module_utils
if truncated_name == truncated_namespace :
return True
return False
def check_sys_modules ( path , before , messages ) :
""" Check for unwanted changes to sys.modules.
: type path : str
: type before : dict [ str , module ]
: type messages : set [ str ]
"""
after = sys . modules
removed = set ( before . keys ( ) ) - set ( after . keys ( ) )
changed = set ( key for key , value in before . items ( ) if key in after and value != after [ key ] )
# additions are checked by our custom PEP 302 loader, so we don't need to check them again here
for module in sorted ( removed ) :
report_message ( path , 0 , 0 , ' unload ' , ' unloading of " %s " in sys.modules is not supported ' % module , messages )
for module in sorted ( changed ) :
report_message ( path , 0 , 0 , ' reload ' , ' reloading of " %s " in sys.modules is not supported ' % module , messages )
def convert_ansible_name_to_absolute_path ( name ) :
""" Calculate the module path from the given name.
: type name : str
: rtype : str
"""
return os . path . join ( ansible_path , name . replace ( ' . ' , os . path . sep ) )
def convert_relative_path_to_name ( path ) :
""" Calculate the module name from the given path.
: type path : str
: rtype : str
"""
if path . endswith ( ' /__init__.py ' ) :
clean_path = os . path . dirname ( path )
else :
clean_path = path
clean_path = os . path . splitext ( clean_path ) [ 0 ]
name = clean_path . replace ( os . path . sep , ' . ' )
if collection_loader :
# when testing collections the relative paths (and names) being tested are within the collection under test
name = ' ansible_collections. %s . %s ' % ( collection_full_name , name )
else :
# when testing ansible all files being imported reside under the lib directory
name = name [ len ( ' lib/ ' ) : ]
return name
@ -245,7 +337,6 @@ def main():
def __init__ ( self ) :
self . stdout = StringIO ( )
self . stderr = StringIO ( )
self . warnings = [ ]
def capture_report ( path , capture , messages ) :
""" Report on captured output.
@ -255,44 +346,61 @@ def main():
"""
if capture . stdout . getvalue ( ) :
first = capture . stdout . getvalue ( ) . strip ( ) . splitlines ( ) [ 0 ] . strip ( )
message = ' %s : %d : %d : %s : %s ' % ( path , 0 , 0 , ' StandardOutputUsed ' , first )
report_message ( message , messages )
report_message ( path , 0 , 0 , ' stdout ' , first , messages )
if capture . stderr . getvalue ( ) :
first = capture . stderr . getvalue ( ) . strip ( ) . splitlines ( ) [ 0 ] . strip ( )
message = ' %s : %d : %d : %s : %s ' % ( path , 0 , 0 , ' StandardErrorUsed ' , first )
report_message ( message , messages )
for warning in capture . warnings :
msg = re . sub ( r ' \ s+ ' , ' ' , ' %s ' % warning . message ) . strip ( )
filepath = os . path . relpath ( warning . filename )
lineno = warning . lineno
if filepath . startswith ( ' ../ ' ) or filepath . startswith ( minimal_dir ) :
# The warning occurred outside our source tree.
# The best we can do is to report the file which was tested that triggered the warning.
# If the responsible import is in shared code this warning will be repeated for each file tested which imports the shared code.
msg + = ' (in %s : %d ) ' % ( warning . filename , warning . lineno )
filepath = path
lineno = 0
elif filepath . startswith ( import_dir ) :
# Strip the import dir from warning paths in shared code.
# Needed when warnings occur in places like module_utils but are caught by the modules importing the module_utils.
filepath = os . path . relpath ( filepath , import_dir )
message = ' %s : %d : %d : %s : %s ' % ( filepath , lineno , 0 , warning . category . __name__ , msg )
report_message ( message , messages )
def report_message ( message , messages ) :
report_message ( path , 0 , 0 , ' stderr ' , first , messages )
def report_message ( path , line , column , code , message , messages ) :
""" Report message if not already reported.
: type path : str
: type line : int
: type column : int
: type code : str
: type message : str
: type messages : set [ str ]
"""
message = ' %s : %d : %d : %s : %s ' % ( path , line , column , code , message )
if message not in messages :
messages . add ( message )
print ( message )
@contextlib.contextmanager
def blacklist_imports ( path , name , messages ) :
""" Blacklist imports.
: type path : str
: type name : str
: type messages : set [ str ]
"""
blacklist = ImportBlacklist ( path , name )
sys . meta_path . insert ( 0 , blacklist )
try :
yield
finally :
if sys . meta_path [ 0 ] != blacklist :
report_message ( path , 0 , 0 , ' metapath ' , ' changes to sys.meta_path[0] are not permitted ' , messages )
while blacklist in sys . meta_path :
sys . meta_path . remove ( blacklist )
@contextlib.contextmanager
def monitor_sys_modules ( path , messages ) :
""" Monitor sys.modules for unwanted changes, reverting any additions made to our own namespaces. """
snapshot = sys . modules . copy ( )
try :
yield
finally :
check_sys_modules ( path , snapshot , messages )
for key in set ( sys . modules . keys ( ) ) - set ( snapshot . keys ( ) ) :
if is_name_in_namepace ( key , ( ' ansible ' , ' ansible_collections ' ) ) :
del sys . modules [ key ] # only unload our own code since we know it's native Python
@contextlib.contextmanager
def capture_output ( capture ) :
""" Capture sys.stdout and sys.stderr.
@ -304,12 +412,19 @@ def main():
sys . stdout = capture . stdout
sys . stderr = capture . stderr
with warnings . catch_warnings ( record = True ) as captured_warnings :
# clear all warnings registries to make all warnings available
for module in sys . modules . values ( ) :
try :
module . __warningregistry__ . clear ( )
except AttributeError :
pass
with warnings . catch_warnings ( ) :
warnings . simplefilter ( ' error ' )
try :
yield
finally :
capture . warnings = captured_warnings
sys . stdout = old_stdout
sys . stderr = old_stderr