mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
422 lines
15 KiB
Python
422 lines
15 KiB
Python
"""Code coverage support for integration tests."""
|
|
from __future__ import annotations
|
|
|
|
import abc
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import typing as t
|
|
import zipfile
|
|
|
|
from ...io import (
|
|
write_text_file,
|
|
)
|
|
|
|
from ...ansible_util import (
|
|
run_playbook,
|
|
)
|
|
|
|
from ...config import (
|
|
IntegrationConfig,
|
|
)
|
|
|
|
from ...util import (
|
|
COVERAGE_CONFIG_NAME,
|
|
MODE_DIRECTORY,
|
|
MODE_DIRECTORY_WRITE,
|
|
MODE_FILE,
|
|
SubprocessError,
|
|
cache,
|
|
display,
|
|
generate_name,
|
|
get_generic_type,
|
|
get_type_map,
|
|
remove_tree,
|
|
sanitize_host_name,
|
|
verified_chmod,
|
|
)
|
|
|
|
from ...util_common import (
|
|
ResultType,
|
|
)
|
|
|
|
from ...coverage_util import (
|
|
generate_coverage_config,
|
|
get_coverage_platform,
|
|
)
|
|
|
|
from ...host_configs import (
|
|
HostConfig,
|
|
PosixConfig,
|
|
WindowsConfig,
|
|
WindowsInventoryConfig,
|
|
WindowsRemoteConfig,
|
|
)
|
|
|
|
from ...data import (
|
|
data_context,
|
|
)
|
|
|
|
from ...host_profiles import (
|
|
ControllerProfile,
|
|
HostProfile,
|
|
PosixProfile,
|
|
SshTargetHostProfile,
|
|
)
|
|
|
|
from ...provisioning import (
|
|
HostState,
|
|
)
|
|
|
|
from ...connections import (
|
|
LocalConnection,
|
|
)
|
|
|
|
from ...inventory import (
|
|
create_windows_inventory,
|
|
create_posix_inventory,
|
|
)
|
|
|
|
# Type variable for the HostConfig subclass that a CoverageHandler specialization is parameterized with.
THostConfig = t.TypeVar('THostConfig', bound=HostConfig)
|
|
|
|
|
|
class CoverageHandler(t.Generic[THostConfig], metaclass=abc.ABCMeta):
    """Abstract interface for preparing hosts to collect code coverage during integration tests."""

    def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
        self.args = args
        self.host_state = host_state
        self.inventory_path = inventory_path
        # Resolved last: get_profiles() reads self.host_state, which must already be assigned.
        self.profiles = self.get_profiles()

    def get_profiles(self) -> list[HostProfile]:
        """Return the target profiles whose config is an instance of this handler's host config type."""
        config_type = get_generic_type(type(self), HostConfig)
        matching: list[HostProfile] = []

        for candidate in self.host_state.target_profiles:
            if isinstance(candidate.config, config_type):
                matching.append(candidate)

        return matching

    @property
    @abc.abstractmethod
    def is_active(self) -> bool:
        """Whether this handler should be used (True) or skipped (False)."""

    @abc.abstractmethod
    def setup(self) -> None:
        """Prepare for code coverage collection."""

    @abc.abstractmethod
    def teardown(self) -> None:
        """Clean up after code coverage collection."""

    @abc.abstractmethod
    def create_inventory(self) -> None:
        """Create the inventory when one is required."""

    @abc.abstractmethod
    def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]:
        """Return the environment variables needed to run tests with code coverage enabled."""

    def run_playbook(self, playbook: str, variables: dict[str, str]) -> None:
        """Create the inventory, then run the named playbook against it with the given variables."""
        self.create_inventory()
        run_playbook(
            self.args,
            self.inventory_path,
            playbook,
            capture=False,
            variables=variables,
        )
|
|
|
|
|
|
class PosixCoverageHandler(CoverageHandler[PosixConfig]):
    """Coverage handler for the POSIX hosts taking part in integration tests."""

    def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
        super().__init__(args, host_state, inventory_path)

        # One temporary directory path shared by every POSIX host; it will be created world writeable.
        self.common_temp_path = f'/tmp/ansible-test-{generate_name()}'

    def get_profiles(self) -> list[HostProfile]:
        """Return the relevant profiles, leaving out a controller profile that uses the controller's own Python."""
        kept: list[HostProfile] = []

        for profile in super().get_profiles():
            if isinstance(profile, ControllerProfile) and profile.python.path == self.host_state.controller_profile.python.path:
                # Same interpreter as the controller, so it is not tracked as a separate target.
                continue

            kept.append(profile)

        return kept

    @property
    def is_active(self) -> bool:
        """Always True for this handler."""
        return True

    @property
    def target_profile(self) -> t.Optional[PosixProfile]:
        """The first retained POSIX profile, or None when no profile was retained."""
        if not self.profiles:
            return None

        return t.cast(PosixProfile, self.profiles[0])

    def setup(self) -> None:
        """Set up code coverage on the controller, then on the target."""
        self.setup_controller()
        self.setup_target()

    def teardown(self) -> None:
        """Tear down code coverage on the controller, then on the target."""
        self.teardown_controller()
        self.teardown_target()

    def setup_controller(self) -> None:
        """Write the coverage config and create the coverage output directory on the controller."""
        config_path = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME)
        output_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name)

        write_text_file(config_path, generate_coverage_config(self.args), create_directories=True)
        verified_chmod(config_path, MODE_FILE)

        os.mkdir(output_path)
        verified_chmod(output_path, MODE_DIRECTORY_WRITE)

    def setup_target(self) -> None:
        """Run the setup playbook, unless there is no target profile or it is a controller profile."""
        target = self.target_profile

        if not target or isinstance(target, ControllerProfile):
            return

        self.run_playbook('posix_coverage_setup.yml', self.get_playbook_variables())

    def teardown_controller(self) -> None:
        """Copy controller coverage files into the results directory, then remove the common temp directory."""
        results_dir = os.path.join(self.common_temp_path, ResultType.COVERAGE.name)
        platform = get_coverage_platform(self.args.controller)

        for name in os.listdir(results_dir):
            source = os.path.join(results_dir, name)
            destination = os.path.join(ResultType.COVERAGE.path, update_coverage_filename(name, platform))
            shutil.copyfile(source, destination)

        remove_tree(self.common_temp_path)

    def teardown_target(self) -> None:
        """Fetch coverage results from the target, then run the teardown playbook.

        Nothing is done when there is no target profile or it is a controller profile.
        A failure to create the remote archive is reported as a warning and the playbook still runs.
        """
        target = self.target_profile

        if not target or isinstance(target, ControllerProfile):
            return

        ssh_profile = t.cast(SshTargetHostProfile, target)
        platform = get_coverage_platform(ssh_profile.config)
        connection = ssh_profile.get_controller_target_connections()[0]

        with tempfile.NamedTemporaryFile(prefix='ansible-test-coverage-', suffix='.tgz') as archive:
            try:
                connection.create_archive(chdir=self.common_temp_path, name=ResultType.COVERAGE.name, dst=archive)
            except SubprocessError as ex:
                display.warning(f'Failed to download coverage results: {ex}')
            else:
                # Rewind so the archive that was just written can be read back for extraction.
                archive.seek(0)

                with tempfile.TemporaryDirectory() as extract_dir:
                    LocalConnection(self.args).extract_archive(chdir=extract_dir, src=archive)

                    results_dir = os.path.join(extract_dir, ResultType.COVERAGE.name)

                    for name in os.listdir(results_dir):
                        source = os.path.join(results_dir, name)
                        destination = os.path.join(ResultType.COVERAGE.path, update_coverage_filename(name, platform))
                        shutil.copyfile(source, destination)

        self.run_playbook('posix_coverage_teardown.yml', self.get_playbook_variables())

    def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]:
        """Return the environment variables needed to run tests with code coverage enabled."""

        # Turns on code coverage collection for Ansible modules, whether local or remote.
        # The AnsiballZ wrapper generator in lib/ansible/executor/module_common.py uses this to support code coverage.
        config_file = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME)

        # The command, target and a platform marker are included so the remote host can build a filename from them.
        # The generated AnsiballZ wrapper is responsible for adding '=python-{X.Y}=coverage.{hostname}.{pid}.{id}'
        marker = '='.join((self.args.command, target_name, 'platform'))
        coverage_file = os.path.join(self.common_temp_path, ResultType.COVERAGE.name, marker)

        return {
            '_ANSIBLE_COVERAGE_CONFIG': config_file,
            # An empty output path causes the 'coverage' module to be found, but not imported or enabled.
            '_ANSIBLE_COVERAGE_OUTPUT': '' if self.args.coverage_check else coverage_file,
        }

    def create_inventory(self) -> None:
        """Create the POSIX inventory for the target profiles."""
        create_posix_inventory(self.args, self.inventory_path, self.host_state.target_profiles)

    def get_playbook_variables(self) -> dict[str, str]:
        """Return the variables passed to the POSIX coverage setup and teardown playbooks."""
        return {
            'common_temp_dir': self.common_temp_path,
            'coverage_config': generate_coverage_config(self.args),
            'coverage_config_path': os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME),
            'coverage_output_path': os.path.join(self.common_temp_path, ResultType.COVERAGE.name),
            # Modes are passed as zero-padded octal strings.
            'mode_directory': format(MODE_DIRECTORY, '04o'),
            'mode_directory_write': format(MODE_DIRECTORY_WRITE, '04o'),
            'mode_file': format(MODE_FILE, '04o'),
        }
|
|
|
|
|
|
class WindowsCoverageHandler(CoverageHandler[WindowsConfig]):
    """Configure integration test code coverage for Windows hosts."""

    def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
        super().__init__(args, host_state, inventory_path)

        # Common temporary directory used on all Windows hosts that will be created writable by everyone.
        self.remote_temp_path = f'C:\\ansible_test_coverage_{generate_name()}'

    @property
    def is_active(self) -> bool:
        """True if there is at least one profile and coverage checking is not requested, otherwise False."""
        return bool(self.profiles) and not self.args.coverage_check

    def setup(self) -> None:
        """Perform setup for code coverage by running the Windows setup playbook."""
        self.run_playbook('windows_coverage_setup.yml', self.get_playbook_variables())

    def teardown(self) -> None:
        """Perform teardown for code coverage.

        Runs the Windows teardown playbook with a local temporary directory, then treats every
        file found in that directory as a zip archive and extracts its members into the coverage
        results directory. Each member is renamed to carry a platform built from the profile
        kind ('remote' or 'inventory') and the archive's file name without its extension.

        Raises:
            NotImplementedError: an archive is present but the profiles are neither all
                WindowsRemoteConfig nor all WindowsInventoryConfig.
            Exception: an archive contains a directory entry, or a member has an unexpected name.
        """
        with tempfile.TemporaryDirectory() as local_temp_path:
            variables = self.get_playbook_variables()
            variables.update(
                local_temp_path=local_temp_path,
            )

            self.run_playbook('windows_coverage_teardown.yml', variables)

            # The prefix depends only on self.profiles, so classify once rather than once per archive.
            prefix = self._get_platform_prefix()

            for filename in os.listdir(local_temp_path):
                if prefix is None:
                    # Raised only when there is an archive to process, so an empty directory is still a no-op.
                    raise NotImplementedError()

                platform = f'{prefix}-{sanitize_host_name(os.path.splitext(filename)[0])}'

                with zipfile.ZipFile(os.path.join(local_temp_path, filename)) as coverage_zip:
                    for item in coverage_zip.infolist():
                        if item.is_dir():
                            raise Exception(f'Unexpected directory in zip file: {item.filename}')

                        # Reassigning the member's filename changes the name it is extracted under.
                        # update_coverage_filename() rejects anything that is not a bare file name.
                        item.filename = update_coverage_filename(item.filename, platform)

                        coverage_zip.extract(item, ResultType.COVERAGE.path)

    def _get_platform_prefix(self) -> t.Optional[str]:
        """Return 'remote' or 'inventory' when every profile has that kind of config, otherwise None."""
        configs = [profile.config for profile in self.profiles]

        if all(isinstance(config, WindowsRemoteConfig) for config in configs):
            return 'remote'

        if all(isinstance(config, WindowsInventoryConfig) for config in configs):
            return 'inventory'

        return None

    def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]:
        """Return a dictionary of environment variables for running tests with code coverage."""

        # Include the command, target and platform marker so the remote host can create a filename with that info.
        # The remote is responsible for adding '={language-version}=coverage.{hostname}.{pid}.{id}'
        coverage_name = '='.join((self.args.command, target_name, 'platform'))

        variables = dict(
            _ANSIBLE_COVERAGE_REMOTE_OUTPUT=os.path.join(self.remote_temp_path, coverage_name),
            _ANSIBLE_COVERAGE_REMOTE_PATH_FILTER=os.path.join(data_context().content.root, '*'),
        )

        return variables

    def create_inventory(self) -> None:
        """Create the Windows inventory for the target profiles."""
        create_windows_inventory(self.args, self.inventory_path, self.host_state.target_profiles)

    def get_playbook_variables(self) -> dict[str, str]:
        """Return a dictionary of variables for setup and teardown of Windows coverage."""
        return dict(
            remote_temp_path=self.remote_temp_path,
        )
|
|
|
|
|
|
class CoverageManager:
    """Holds the coverage handlers that apply to a test run and forwards setup, teardown and environment requests to them."""

    def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
        self.args = args
        self.host_state = host_state
        self.inventory_path = inventory_path

        handler_types = set()

        if self.args.coverage:
            handler_types = {get_handler_type(type(profile.config)) for profile in host_state.profiles}
            # Config types with no associated handler map to None.
            handler_types.discard(None)

        # Every handler is constructed first; only afterwards are the inactive ones dropped.
        candidates = [handler_type(args=args, host_state=host_state, inventory_path=inventory_path) for handler_type in handler_types]

        self.handlers = [candidate for candidate in candidates if candidate.is_active]

    def setup(self) -> None:
        """Run setup on every active handler when coverage is enabled."""
        if self.args.coverage:
            for handler in self.handlers:
                handler.setup()

    def teardown(self) -> None:
        """Run teardown on every active handler when coverage is enabled."""
        if self.args.coverage:
            for handler in self.handlers:
                handler.teardown()

    def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]:
        """Return the merged coverage environment variables from all active handlers.

        An empty dictionary is returned when coverage is disabled or the aliases include 'non_local/'.
        """
        env: dict[str, str] = {}

        if not self.args.coverage:
            return env

        if 'non_local/' in aliases:
            return env

        for handler in self.handlers:
            env.update(handler.get_environment(target_name, aliases))

        return env
|
|
|
|
|
|
@cache
def get_config_handler_type_map() -> dict[t.Type[HostConfig], t.Type[CoverageHandler]]:
    """Return the HostConfig type to CoverageHandler type mapping produced by get_type_map."""
    type_map = get_type_map(CoverageHandler, HostConfig)
    return type_map
|
|
|
|
|
|
def get_handler_type(config_type: t.Type[HostConfig]) -> t.Optional[t.Type[CoverageHandler]]:
    """Find the coverage handler type for a host config type, searching its base classes breadth-first; None if there is none."""
    handlers_by_config = get_config_handler_type_map()
    pending = [config_type]

    while pending:
        candidate = pending.pop(0)
        handler = handlers_by_config.get(candidate)

        if not handler:
            # No handler for this type itself, so queue its direct bases to be checked later.
            pending.extend(candidate.__bases__)
            continue

        return handler

    return None
|
|
|
|
|
|
def update_coverage_filename(original_filename: str, platform: str) -> str:
    """Check the filename's layout, replace its 'platform' marker with the given platform, and return the new name.

    The name must be a bare file name (no directory part) made of exactly five '='-separated
    fields, with the third field being the literal 'platform'. Anything else raises Exception.
    """
    fields = original_filename.split('=')

    is_bare_name = os.path.basename(original_filename) == original_filename
    has_expected_layout = len(fields) == 5 and fields[2] == 'platform'

    if not (is_bare_name and has_expected_layout):
        raise Exception(f'Unexpected coverage filename: {original_filename}')

    updated_filename = '='.join((*fields[:2], platform, *fields[3:]))

    display.info(f'Coverage file for platform "{platform}": {original_filename} -> {updated_filename}', verbosity=3)

    return updated_filename
|