@ -39,13 +39,30 @@ except ImportError:
from configparser import ConfigParser
DOCKER_COMPLETION = { }
COVERAGE _PATHS = { } # type: dict[str, str]
PYTHON _PATHS = { } # type: dict[str, str]
try :
MAXFD = subprocess . MAXFD
except AttributeError :
MAXFD = - 1
COVERAGE_CONFIG_PATH = ' .coveragerc '
COVERAGE_OUTPUT_PATH = ' coverage '
# Modes are set to allow all users the same level of access.
# This permits files to be used in tests that change users.
# The only exception is write access to directories for the user creating them.
# This avoids having to modify the directory permissions a second time.
MODE_READ = stat . S_IRUSR | stat . S_IRGRP | stat . S_IROTH
MODE_FILE = MODE_READ
MODE_FILE_EXECUTE = MODE_FILE | stat . S_IXUSR | stat . S_IXGRP | stat . S_IXOTH
MODE_FILE_WRITE = MODE_FILE | stat . S_IWUSR | stat . S_IWGRP | stat . S_IWOTH
MODE_DIRECTORY = MODE_READ | stat . S_IWUSR | stat . S_IXUSR | stat . S_IXGRP | stat . S_IXOTH
MODE_DIRECTORY_WRITE = MODE_DIRECTORY | stat . S_IWGRP | stat . S_IWOTH
def get_docker_completion ( ) :
"""
@ -107,6 +124,83 @@ def read_lines_without_comments(path, remove_blank_lines=False):
return lines
def get_python_path ( args , interpreter ) :
"""
: type args : TestConfig
: type interpreter : str
: rtype : str
"""
python_path = PYTHON_PATHS . get ( interpreter )
if python_path :
return python_path
prefix = ' python- '
suffix = ' -ansible '
root_temp_dir = ' /tmp '
if args . explain :
return os . path . join ( root_temp_dir , ' ' . join ( ( prefix , ' temp ' , suffix ) ) )
python_path = tempfile . mkdtemp ( prefix = prefix , suffix = suffix , dir = root_temp_dir )
os . chmod ( python_path , MODE_DIRECTORY )
os . symlink ( interpreter , os . path . join ( python_path , ' python ' ) )
if not PYTHON_PATHS :
atexit . register ( cleanup_python_paths )
PYTHON_PATHS [ interpreter ] = python_path
return python_path
def cleanup_python_paths ( ) :
""" Clean up all temporary python directories. """
for path in sorted ( PYTHON_PATHS . values ( ) ) :
display . info ( ' Cleaning up temporary python directory: %s ' % path , verbosity = 2 )
shutil . rmtree ( path )
def get_coverage_environment ( args , target_name , version , temp_path ) :
"""
: type args : TestConfig
: type target_name : str
: type version : str
: type temp_path : str
: rtype : dict [ str , str ]
"""
if temp_path :
# integration tests (both localhost and the optional testhost)
# config and results are in a temporary directory
coverage_config_base_path = temp_path
coverage_output_base_path = temp_path
else :
# unit tests, sanity tests and other special cases (localhost only)
# config and results are in the source tree
coverage_config_base_path = os . getcwd ( )
coverage_output_base_path = os . path . abspath ( os . path . join ( ' test/results ' ) )
config_file = os . path . join ( coverage_config_base_path , COVERAGE_CONFIG_PATH )
coverage_file = os . path . join ( coverage_output_base_path , COVERAGE_OUTPUT_PATH , ' %s = %s = %s = %s =coverage ' % (
args . command , target_name , args . coverage_label or ' local- %s ' % version , ' python- %s ' % version ) )
if args . coverage_check :
coverage_file = ' '
env = dict (
# both AnsiballZ and the ansible-test coverage injector rely on this
_ANSIBLE_COVERAGE_CONFIG = config_file ,
# used during AnsiballZ wrapper creation to set COVERAGE_FILE for the module
_ANSIBLE_COVERAGE_OUTPUT = coverage_file ,
# handle cases not covered by the AnsiballZ wrapper creation above
COVERAGE_FILE = coverage_file ,
)
return env
def find_executable ( executable , cwd = None , path = None , required = True ) :
"""
: type executable : str
@ -183,18 +277,19 @@ def generate_pip_command(python):
return [ python , ' -m ' , ' pip.__main__ ' ]
def intercept_command ( args , cmd , target_name , capture= Fals e, env = Non e, data = None , cwd = None , python_version = None , path= None , coverage = None ) :
def intercept_command ( args , cmd , target_name , env, capture= Fals e, data = None , cwd = None , python_version = None , temp_ path= None , coverage = None , virtualenv = None ) :
"""
: type args : TestConfig
: type cmd : collections . Iterable [ str ]
: type target_name : str
: type env : dict [ str , str ]
: type capture : bool
: type env : dict [ str , str ] | None
: type data : str | None
: type cwd : str | None
: type python_version : str | None
: type path: str | None
: type temp_ path: str | None
: type coverage : bool | None
: type virtualenv : str | None
: rtype : str | None , str | None
"""
if not env :
@ -205,108 +300,26 @@ def intercept_command(args, cmd, target_name, capture=False, env=None, data=None
cmd = list ( cmd )
version = python_version or args . python_version
interpreter = find_python ( version , path )
inject_path = get_coverage_path ( args , interpreter )
config_path = os . path . join ( inject_path , ' injector.json ' )
coverage_file = os . path . abspath ( os . path . join ( inject_path , ' .. ' , ' output ' , ' %s = %s = %s = %s =coverage ' % (
args . command , target_name , args . coverage_label or ' local- %s ' % version , ' python- %s ' % version ) ) )
interpreter = virtualenv or find_python ( version )
inject_path = os . path . abspath ( ' test/runner/injector ' )
if args . coverage_check :
coverage_file = ' '
if not virtualenv :
# injection of python into the path is required when not activating a virtualenv
# otherwise scripts may find the wrong interpreter or possibly no interpreter
python_path = get_python_path ( args , interpreter )
inject_path = python_path + os . path . pathsep + inject_path
env [ ' PATH ' ] = inject_path + os . path . pathsep + env [ ' PATH ' ]
env [ ' ANSIBLE_TEST_PYTHON_VERSION ' ] = version
env [ ' ANSIBLE_TEST_PYTHON_INTERPRETER ' ] = interpreter
if coverage :
env [ ' _ANSIBLE_COVERAGE_CONFIG ' ] = os . path . join ( inject_path , ' .coveragerc ' )
env [ ' _ANSIBLE_COVERAGE_OUTPUT ' ] = coverage_file
config = dict (
python_interpreter = interpreter ,
coverage_file = coverage_file if coverage else None ,
)
if not args . explain :
with open ( config_path , ' w ' ) as config_fd :
json . dump ( config , config_fd , indent = 4 , sort_keys = True )
# add the necessary environment variables to enable code coverage collection
env . update ( get_coverage_environment ( args , target_name , version , temp_path ) )
return run_command ( args , cmd , capture = capture , env = env , data = data , cwd = cwd )
def get_coverage_path ( args , interpreter ) :
"""
: type args : TestConfig
: type interpreter : str
: rtype : str
"""
coverage_path = COVERAGE_PATHS . get ( interpreter )
if coverage_path :
return os . path . join ( coverage_path , ' coverage ' )
prefix = ' ansible-test-coverage- '
tmp_dir = ' /tmp '
if args . explain :
return os . path . join ( tmp_dir , ' %s tmp ' % prefix , ' coverage ' )
src = os . path . abspath ( os . path . join ( os . getcwd ( ) , ' test/runner/injector/ ' ) )
coverage_path = tempfile . mkdtemp ( ' ' , prefix , dir = tmp_dir )
os . chmod ( coverage_path , stat . S_IRWXU | stat . S_IRGRP | stat . S_IXGRP | stat . S_IROTH | stat . S_IXOTH )
shutil . copytree ( src , os . path . join ( coverage_path , ' coverage ' ) )
shutil . copy ( ' .coveragerc ' , os . path . join ( coverage_path , ' coverage ' , ' .coveragerc ' ) )
for root , dir_names , file_names in os . walk ( coverage_path ) :
for name in dir_names + file_names :
os . chmod ( os . path . join ( root , name ) , stat . S_IRWXU | stat . S_IRGRP | stat . S_IXGRP | stat . S_IROTH | stat . S_IXOTH )
for directory in ' output ' , ' logs ' :
os . mkdir ( os . path . join ( coverage_path , directory ) )
os . chmod ( os . path . join ( coverage_path , directory ) , stat . S_IRWXU | stat . S_IRWXG | stat . S_IRWXO )
os . symlink ( interpreter , os . path . join ( coverage_path , ' coverage ' , ' python ' ) )
if not COVERAGE_PATHS :
atexit . register ( cleanup_coverage_dirs )
COVERAGE_PATHS [ interpreter ] = coverage_path
return os . path . join ( coverage_path , ' coverage ' )
def cleanup_coverage_dirs ( ) :
""" Clean up all coverage directories. """
for path in COVERAGE_PATHS . values ( ) :
display . info ( ' Cleaning up coverage directory: %s ' % path , verbosity = 2 )
cleanup_coverage_dir ( path )
def cleanup_coverage_dir ( coverage_path ) :
""" Copy over coverage data from temporary directory and purge temporary directory.
: type coverage_path : str
"""
output_dir = os . path . join ( coverage_path , ' output ' )
for filename in os . listdir ( output_dir ) :
src = os . path . join ( output_dir , filename )
dst = os . path . join ( os . getcwd ( ) , ' test ' , ' results ' , ' coverage ' )
shutil . copy ( src , dst )
logs_dir = os . path . join ( coverage_path , ' logs ' )
for filename in os . listdir ( logs_dir ) :
random_suffix = ' ' . join ( random . choice ( string . ascii_letters + string . digits ) for _ in range ( 8 ) )
new_name = ' %s . %s .log ' % ( os . path . splitext ( os . path . basename ( filename ) ) [ 0 ] , random_suffix )
src = os . path . join ( logs_dir , filename )
dst = os . path . join ( os . getcwd ( ) , ' test ' , ' results ' , ' logs ' , new_name )
shutil . copy ( src , dst )
shutil . rmtree ( coverage_path )
def run_command ( args , cmd , capture = False , env = None , data = None , cwd = None , always = False , stdin = None , stdout = None ,
cmd_verbosity = 1 , str_errors = ' strict ' ) :
"""