Add code coverage target analysis to ansible-test. (#67141)

* Refactor coverage file enumeration.
* Relocate sanitize_filename function.
* Support sets when writing JSON files.
* Generalize setting of info_stderr mode.
* Split out coverage path checking.
* Split out collection regex logic.
* Improve sanitize_filename type hints and docs.
* Clean up coverage erase command.
* Fix docs and type hints for initialize_coverage.
* Update type hints on CoverageConfig.
* Split out logic for finding modules.
* Split out arc enumeration.
* Split out powershell coverage enumeration.
* Raise verbosity level of empty coverage warnings.
* Add code coverage target analysis to ansible-test.
pull/67151/head
Matt Clay 6 years ago committed by GitHub
parent 68b981ae21
commit 5e68bb3d93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,2 @@
minor_changes:
- "ansible-test - Added a ``ansible-test coverage analyze targets`` command to analyze integration test code coverage by test target."

@ -12,6 +12,8 @@ from .init import (
CURRENT_RLIMIT_NOFILE, CURRENT_RLIMIT_NOFILE,
) )
from . import types as t
from .util import ( from .util import (
ApplicationError, ApplicationError,
display, display,
@ -42,7 +44,6 @@ from .executor import (
) )
from .config import ( from .config import (
IntegrationConfig,
PosixIntegrationConfig, PosixIntegrationConfig,
WindowsIntegrationConfig, WindowsIntegrationConfig,
NetworkIntegrationConfig, NetworkIntegrationConfig,
@ -113,11 +114,34 @@ from .coverage.xml import (
command_coverage_xml, command_coverage_xml,
) )
from .coverage.analyze.targets.generate import (
command_coverage_analyze_targets_generate,
CoverageAnalyzeTargetsGenerateConfig,
)
from .coverage.analyze.targets.expand import (
command_coverage_analyze_targets_expand,
CoverageAnalyzeTargetsExpandConfig,
)
from .coverage.analyze.targets.combine import (
command_coverage_analyze_targets_combine,
CoverageAnalyzeTargetsCombineConfig,
)
from .coverage.analyze.targets.missing import (
command_coverage_analyze_targets_missing,
CoverageAnalyzeTargetsMissingConfig,
)
from .coverage import ( from .coverage import (
COVERAGE_GROUPS, COVERAGE_GROUPS,
CoverageConfig, CoverageConfig,
) )
if t.TYPE_CHECKING:
import argparse as argparse_module
def main(): def main():
"""Main program function.""" """Main program function."""
@ -131,7 +155,7 @@ def main():
display.truncate = config.truncate display.truncate = config.truncate
display.redact = config.redact display.redact = config.redact
display.color = config.color display.color = config.color
display.info_stderr = (isinstance(config, SanityConfig) and config.lint) or (isinstance(config, IntegrationConfig) and config.list_targets) display.info_stderr = config.info_stderr
check_startup() check_startup()
check_delegation_args(config) check_delegation_args(config)
configure_timeout(config) configure_timeout(config)
@ -147,6 +171,7 @@ def main():
delegate_args = (ex.exclude, ex.require, ex.integration_targets) delegate_args = (ex.exclude, ex.require, ex.integration_targets)
if delegate_args: if delegate_args:
# noinspection PyTypeChecker
delegate(config, *delegate_args) delegate(config, *delegate_args)
display.review_warnings() display.review_warnings()
@ -513,6 +538,8 @@ def parse_args():
coverage_subparsers = coverage.add_subparsers(metavar='COMMAND') coverage_subparsers = coverage.add_subparsers(metavar='COMMAND')
coverage_subparsers.required = True # work-around for python 3 bug which makes subparsers optional coverage_subparsers.required = True # work-around for python 3 bug which makes subparsers optional
add_coverage_analyze(coverage_subparsers, coverage_common)
coverage_combine = coverage_subparsers.add_parser('combine', coverage_combine = coverage_subparsers.add_parser('combine',
parents=[coverage_common], parents=[coverage_common],
help='combine coverage data and rewrite remote paths') help='combine coverage data and rewrite remote paths')
@ -608,6 +635,129 @@ def parse_args():
return args return args
# noinspection PyProtectedMember
def add_coverage_analyze(coverage_subparsers, coverage_common): # type: (argparse_module._SubParsersAction, argparse_module.ArgumentParser) -> None
"""Add the `coverage analyze` subcommand."""
analyze = coverage_subparsers.add_parser(
'analyze',
help='analyze collected coverage data',
)
analyze_subparsers = analyze.add_subparsers(metavar='COMMAND')
analyze_subparsers.required = True # work-around for python 3 bug which makes subparsers optional
targets = analyze_subparsers.add_parser(
'targets',
help='analyze integration test target coverage',
)
targets_subparsers = targets.add_subparsers(metavar='COMMAND')
targets_subparsers.required = True # work-around for python 3 bug which makes subparsers optional
targets_generate = targets_subparsers.add_parser(
'generate',
parents=[coverage_common],
help='aggregate coverage by integration test target',
)
targets_generate.set_defaults(
func=command_coverage_analyze_targets_generate,
config=CoverageAnalyzeTargetsGenerateConfig,
)
targets_generate.add_argument(
'input_dir',
nargs='?',
help='directory to read coverage from',
)
targets_generate.add_argument(
'output_file',
help='output file for aggregated coverage',
)
targets_expand = targets_subparsers.add_parser(
'expand',
parents=[coverage_common],
help='expand target names from integers in aggregated coverage',
)
targets_expand.set_defaults(
func=command_coverage_analyze_targets_expand,
config=CoverageAnalyzeTargetsExpandConfig,
)
targets_expand.add_argument(
'input_file',
help='input file to read aggregated coverage from',
)
targets_expand.add_argument(
'output_file',
help='output file to write expanded coverage to',
)
targets_combine = targets_subparsers.add_parser(
'combine',
parents=[coverage_common],
help='combine multiple aggregated coverage files',
)
targets_combine.set_defaults(
func=command_coverage_analyze_targets_combine,
config=CoverageAnalyzeTargetsCombineConfig,
)
targets_combine.add_argument(
'input_file',
nargs='+',
help='input file to read aggregated coverage from',
)
targets_combine.add_argument(
'output_file',
help='output file to write aggregated coverage to',
)
targets_missing = targets_subparsers.add_parser(
'missing',
parents=[coverage_common],
help='identify coverage in one file missing in another',
)
targets_missing.set_defaults(
func=command_coverage_analyze_targets_missing,
config=CoverageAnalyzeTargetsMissingConfig,
)
targets_missing.add_argument(
'from_file',
help='input file containing aggregated coverage',
)
targets_missing.add_argument(
'to_file',
help='input file containing aggregated coverage',
)
targets_missing.add_argument(
'output_file',
help='output file to write aggregated coverage to',
)
targets_missing.add_argument(
'--only-gaps',
action='store_true',
help='report only arcs/lines not hit by any target',
)
targets_missing.add_argument(
'--only-exists',
action='store_true',
help='limit results to files that exist',
)
def add_lint(parser): def add_lint(parser):
""" """
:type parser: argparse.ArgumentParser :type parser: argparse.ArgumentParser
@ -923,6 +1073,6 @@ def complete_sanity_test(prefix, parsed_args, **_):
""" """
del parsed_args del parsed_args
tests = sorted(t.name for t in sanity_get_tests()) tests = sorted(test.name for test in sanity_get_tests())
return [i for i in tests if i.startswith(prefix)] return [i for i in tests if i.startswith(prefix)]

@ -225,6 +225,8 @@ class SanityConfig(TestConfig):
else: else:
self.base_branch = '' self.base_branch = ''
self.info_stderr = self.lint
class IntegrationConfig(TestConfig): class IntegrationConfig(TestConfig):
"""Configuration for the integration command.""" """Configuration for the integration command."""
@ -260,6 +262,7 @@ class IntegrationConfig(TestConfig):
if self.list_targets: if self.list_targets:
self.explain = True self.explain = True
self.info_stderr = True
def get_ansible_config(self): # type: () -> str def get_ansible_config(self): # type: () -> str
"""Return the path to the Ansible config for the given config.""" """Return the path to the Ansible config for the given config."""

@ -3,17 +3,28 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import os import os
import re
from .. import types as t from .. import types as t
from ..encoding import (
to_bytes,
)
from ..io import (
read_json_file,
)
from ..util import ( from ..util import (
ApplicationError, ApplicationError,
common_environment, common_environment,
display,
ANSIBLE_TEST_DATA_ROOT, ANSIBLE_TEST_DATA_ROOT,
) )
from ..util_common import ( from ..util_common import (
intercept_command, intercept_command,
ResultType,
) )
from ..config import ( from ..config import (
@ -25,16 +36,24 @@ from ..executor import (
install_command_requirements, install_command_requirements,
) )
from .. target import (
walk_module_targets,
)
from ..data import (
data_context,
)
if t.TYPE_CHECKING:
import coverage as coverage_module
COVERAGE_GROUPS = ('command', 'target', 'environment', 'version') COVERAGE_GROUPS = ('command', 'target', 'environment', 'version')
COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc') COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc')
COVERAGE_OUTPUT_FILE_NAME = 'coverage' COVERAGE_OUTPUT_FILE_NAME = 'coverage'
def initialize_coverage(args): def initialize_coverage(args): # type: (CoverageConfig) -> coverage_module
""" """Delegate execution if requested, install requirements, then import and return the coverage module. Raises an exception if coverage is not available."""
:type args: CoverageConfig
:rtype: coverage
"""
if args.delegate: if args.delegate:
raise Delegate() raise Delegate()
@ -62,15 +81,206 @@ def run_coverage(args, output_file, command, cmd): # type: (CoverageConfig, str
intercept_command(args, target_name='coverage', env=env, cmd=cmd, disable_coverage=True) intercept_command(args, target_name='coverage', env=env, cmd=cmd, disable_coverage=True)
def get_python_coverage_files(): # type: () -> t.List[str]
"""Return the list of Python coverage file paths."""
return get_coverage_files('python')
def get_powershell_coverage_files(): # type: () -> t.List[str]
"""Return the list of PowerShell coverage file paths."""
return get_coverage_files('powershell')
def get_coverage_files(language): # type: (str) -> t.List[str]
"""Return the list of coverage file paths for the given language."""
coverage_dir = ResultType.COVERAGE.path
coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir)
if '=coverage.' in f and '=%s' % language in f]
return coverage_files
def get_collection_path_regexes(): # type: () -> t.Tuple[t.Optional[t.Pattern], t.Optional[t.Pattern]]
"""Return a pair of regexes used for identifying and manipulating collection paths."""
if data_context().content.collection:
collection_search_re = re.compile(r'/%s/' % data_context().content.collection.directory)
collection_sub_re = re.compile(r'^.*?/%s/' % data_context().content.collection.directory)
else:
collection_search_re = None
collection_sub_re = None
return collection_search_re, collection_sub_re
def get_python_modules(): # type: () -> t.Dict[str, str]
"""Return a dictionary of Ansible module names and their paths."""
return dict((target.module, target.path) for target in list(walk_module_targets()) if target.path.endswith('.py'))
def enumerate_python_arcs(
path, # type: str
coverage, # type: coverage_module
modules, # type: t.Dict[str, str]
collection_search_re, # type: t.Optional[t.Pattern]
collection_sub_re, # type: t.Optional[t.Pattern]
): # type: (...) -> t.Generator[t.Tuple[str, t.Set[t.Tuple[int, int]]]]
"""Enumerate Python code coverage arcs in the given file."""
if os.path.getsize(path) == 0:
display.warning('Empty coverage file: %s' % path, verbosity=2)
return
original = coverage.CoverageData()
try:
original.read_file(path)
except Exception as ex: # pylint: disable=locally-disabled, broad-except
display.error(u'%s' % ex)
return
for filename in original.measured_files():
arcs = original.arcs(filename)
if not arcs:
# This is most likely due to using an unsupported version of coverage.
display.warning('No arcs found for "%s" in coverage file: %s' % (filename, path))
continue
filename = sanitize_filename(filename, modules=modules, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re)
if not filename:
continue
yield filename, set(arcs)
def enumerate_powershell_lines(path): # type: (str) -> t.Generator[t.Tuple[str, t.Dict[int, int]]]
"""Enumerate PowerShell code coverage lines in the given file."""
if os.path.getsize(path) == 0:
display.warning('Empty coverage file: %s' % path, verbosity=2)
return
try:
coverage_run = read_json_file(path)
except Exception as ex: # pylint: disable=locally-disabled, broad-except
display.error(u'%s' % ex)
return
for filename, hits in coverage_run.items():
filename = sanitize_filename(filename)
if not filename:
continue
# PowerShell unpacks arrays if there's only a single entry so this is a defensive check on that
if not isinstance(hits, list):
hits = [hits]
hits = dict((hit['Line'], hit['HitCount']) for hit in hits if hit)
yield filename, hits
def sanitize_filename(
filename, # type: str
modules=None, # type: t.Optional[t.Dict[str, str]]
collection_search_re=None, # type: t.Optional[t.Pattern]
collection_sub_re=None, # type: t.Optional[t.Pattern]
): # type: (...) -> t.Optional[str]
"""Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid."""
ansible_path = os.path.abspath('lib/ansible/') + '/'
root_path = data_context().content.root + '/'
integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep
if modules is None:
modules = {}
if '/ansible_modlib.zip/ansible/' in filename:
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier.
new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif collection_search_re and collection_search_re.search(filename):
new_name = os.path.abspath(collection_sub_re.sub('', filename))
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename):
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later.
new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif '/ansible_module_' in filename:
# Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier.
module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename)
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename):
# Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later.
# AnsiballZ versions using zipimporter will match the `.zip` portion of the regex.
# AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex.
module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$',
'\\g<module>', filename).rstrip('_')
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search('^(/.*?)?/root/ansible/', filename):
# Rewrite the path of code running on a remote host or in a docker container as root.
new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif integration_temp_path in filename:
# Rewrite the path of code running from an integration test temporary directory.
new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
return filename
class CoverageConfig(EnvironmentConfig): class CoverageConfig(EnvironmentConfig):
"""Configuration for the coverage command.""" """Configuration for the coverage command."""
def __init__(self, args): def __init__(self, args): # type: (t.Any) -> None
"""
:type args: any
"""
super(CoverageConfig, self).__init__(args, 'coverage') super(CoverageConfig, self).__init__(args, 'coverage')
self.group_by = frozenset(args.group_by) if 'group_by' in args and args.group_by else set() # type: t.FrozenSet[str] self.group_by = frozenset(args.group_by) if 'group_by' in args and args.group_by else set() # type: t.FrozenSet[str]
self.all = args.all if 'all' in args else False # type: bool self.all = args.all if 'all' in args else False # type: bool
self.stub = args.stub if 'stub' in args else False # type: bool self.stub = args.stub if 'stub' in args else False # type: bool
self.coverage = False # temporary work-around to support intercept_command in cover.py self.coverage = False # temporary work-around to support intercept_command in cover.py
class PathChecker:
"""Checks code coverage paths to verify they are valid and reports on the findings."""
def __init__(self, args, collection_search_re=None): # type: (CoverageConfig, t.Optional[t.Pattern]) -> None
self.args = args
self.collection_search_re = collection_search_re
self.invalid_paths = []
self.invalid_path_chars = 0
def check_path(self, path): # type: (str) -> bool
"""Return True if the given coverage path is valid, otherwise display a warning and return False."""
if os.path.isfile(to_bytes(path)):
return True
if self.collection_search_re and self.collection_search_re.search(path) and os.path.basename(path) == '__init__.py':
# the collection loader uses implicit namespace packages, so __init__.py does not need to exist on disk
# coverage is still reported for these non-existent files, but warnings are not needed
return False
self.invalid_paths.append(path)
self.invalid_path_chars += len(path)
if self.args.verbosity > 1:
display.warning('Invalid coverage path: %s' % path)
return False
def report(self): # type: () -> None
"""Display a warning regarding invalid paths if any were found."""
if self.invalid_paths:
display.warning('Ignored %d characters from %d invalid coverage path(s).' % (self.invalid_path_chars, len(self.invalid_paths)))

@ -0,0 +1,19 @@
"""Common logic for the `coverage analyze` subcommand."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ... import types as t
from .. import (
CoverageConfig,
)
class CoverageAnalyzeConfig(CoverageConfig):
"""Configuration for the `coverage analyze` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeConfig, self).__init__(args)
# avoid mixing log messages with file output when using `/dev/stdout` for the output file on commands
# this may be worth considering as the default behavior in the future, instead of being dependent on the command or options used
self.info_stderr = True

@ -0,0 +1,115 @@
"""Analyze integration test target code coverage."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from .... import types as t
from ....io import (
read_json_file,
write_json_file,
)
from ....util import (
ApplicationError,
display,
)
from .. import (
CoverageAnalyzeConfig,
)
if t.TYPE_CHECKING:
Arcs = t.Dict[str, t.Dict[t.Tuple[int, int], t.Set[int]]]
Lines = t.Dict[str, t.Dict[int, t.Set[int]]]
TargetIndexes = t.Dict[str, int]
TargetSetIndexes = t.Dict[t.FrozenSet[int], int]
def make_report(target_indexes, arcs, lines): # type: (TargetIndexes, Arcs, Lines) -> t.Dict[str, t.Any]
"""Condense target indexes, arcs and lines into a compact report."""
set_indexes = {}
arc_refs = dict((path, dict((format_arc(arc), get_target_set_index(indexes, set_indexes)) for arc, indexes in data.items())) for path, data in arcs.items())
line_refs = dict((path, dict((line, get_target_set_index(indexes, set_indexes)) for line, indexes in data.items())) for path, data in lines.items())
report = dict(
targets=[name for name, index in sorted(target_indexes.items(), key=lambda kvp: kvp[1])],
target_sets=[sorted(data) for data, index in sorted(set_indexes.items(), key=lambda kvp: kvp[1])],
arcs=arc_refs,
lines=line_refs,
)
return report
def load_report(report): # type: (t.Dict[str, t.Any]) -> t.Tuple[t.List[str], Arcs, Lines]
"""Extract target indexes, arcs and lines from an existing report."""
try:
target_indexes = report['targets'] # type: t.List[str]
target_sets = report['target_sets'] # type: t.List[t.List[int]]
arc_data = report['arcs'] # type: t.Dict[str, t.Dict[str, int]]
line_data = report['lines'] # type: t.Dict[str, t.Dict[int, int]]
except KeyError as ex:
raise ApplicationError('Document is missing key "%s".' % ex.args)
except TypeError:
raise ApplicationError('Document is type "%s" instead of "dict".' % type(report).__name__)
arcs = dict((path, dict((parse_arc(arc), set(target_sets[index])) for arc, index in data.items())) for path, data in arc_data.items())
lines = dict((path, dict((int(line), set(target_sets[index])) for line, index in data.items())) for path, data in line_data.items())
return target_indexes, arcs, lines
def read_report(path): # type: (str) -> t.Tuple[t.List[str], Arcs, Lines]
"""Read a JSON report from disk."""
try:
report = read_json_file(path)
except Exception as ex:
raise ApplicationError('File "%s" is not valid JSON: %s' % (path, ex))
try:
return load_report(report)
except ApplicationError as ex:
raise ApplicationError('File "%s" is not an aggregated coverage data file. %s' % (path, ex))
def write_report(args, report, path): # type: (CoverageAnalyzeTargetsConfig, t.Dict[str, t.Any], str) -> None
"""Write a JSON report to disk."""
if args.explain:
return
write_json_file(path, report, formatted=False)
display.info('Generated %d byte report with %d targets covering %d files.' % (
os.path.getsize(path), len(report['targets']), len(set(report['arcs'].keys()) | set(report['lines'].keys())),
), verbosity=1)
def format_arc(value): # type: (t.Tuple[int, int]) -> str
"""Format an arc tuple as a string."""
return '%d:%d' % value
def parse_arc(value): # type: (str) -> t.Tuple[int, int]
"""Parse an arc string into a tuple."""
first, last = tuple(map(int, value.split(':')))
return first, last
def get_target_set_index(data, target_set_indexes): # type: (t.Set[int], TargetSetIndexes) -> int
"""Find or add the target set in the result set and return the target set index."""
return target_set_indexes.setdefault(frozenset(data), len(target_set_indexes))
def get_target_index(name, target_indexes): # type: (str, TargetIndexes) -> int
"""Find or add the target in the result set and return the target index."""
return target_indexes.setdefault(name, len(target_indexes))
class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig):
"""Configuration for the `coverage analyze targets` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsConfig, self).__init__(args)
self.info_stderr = True

@ -0,0 +1,63 @@
"""Combine integration test target code coverage reports."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from .... import types as t
from . import (
CoverageAnalyzeTargetsConfig,
get_target_index,
make_report,
read_report,
write_report,
)
if t.TYPE_CHECKING:
from . import (
Arcs,
Lines,
TargetIndexes,
)
def command_coverage_analyze_targets_combine(args): # type: (CoverageAnalyzeTargetsCombineConfig) -> None
"""Combine integration test target code coverage reports."""
combined_target_indexes = {} # type: TargetIndexes
combined_path_arcs = {} # type: Arcs
combined_path_lines = {} # type: Lines
for report_path in args.input_files:
covered_targets, covered_path_arcs, covered_path_lines = read_report(report_path)
merge_indexes(covered_path_arcs, covered_targets, combined_path_arcs, combined_target_indexes)
merge_indexes(covered_path_lines, covered_targets, combined_path_lines, combined_target_indexes)
report = make_report(combined_target_indexes, combined_path_arcs, combined_path_lines)
write_report(args, report, args.output_file)
def merge_indexes(
source_data, # type: t.Dict[str, t.Dict[t.Any, t.Set[int]]]
source_index, # type: t.List[str]
combined_data, # type: t.Dict[str, t.Dict[t.Any, t.Set[int]]]
combined_index, # type: TargetIndexes
): # type: (...) -> None
"""Merge indexes from the source into the combined data set (arcs or lines)."""
for covered_path, covered_points in source_data.items():
combined_points = combined_data.setdefault(covered_path, {})
for covered_point, covered_target_indexes in covered_points.items():
combined_point = combined_points.setdefault(covered_point, set())
for covered_target_index in covered_target_indexes:
combined_point.add(get_target_index(source_index[covered_target_index], combined_index))
class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets combine` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsCombineConfig, self).__init__(args)
self.input_files = args.input_file # type: t.List[str]
self.output_file = args.output_file # type: str

@ -0,0 +1,58 @@
"""Expand target names in an aggregated coverage file."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from .... import types as t
from ....io import (
SortedSetEncoder,
write_json_file,
)
from . import (
CoverageAnalyzeTargetsConfig,
format_arc,
read_report,
)
def command_coverage_analyze_targets_expand(args): # type: (CoverageAnalyzeTargetsExpandConfig) -> None
"""Expand target names in an aggregated coverage file."""
covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file)
report = dict(
arcs=expand_indexes(covered_path_arcs, covered_targets, format_arc),
lines=expand_indexes(covered_path_lines, covered_targets, str),
)
if not args.explain:
write_json_file(args.output_file, report, encoder=SortedSetEncoder)
def expand_indexes(
source_data, # type: t.Dict[str, t.Dict[t.Any, t.Set[int]]]
source_index, # type: t.List[str]
format_func, # type: t.Callable[t.Tuple[t.Any], str]
): # type: (...) -> t.Dict[str, t.Dict[t.Any, t.Set[str]]]
"""Merge indexes from the source into the combined data set (arcs or lines)."""
combined_data = {} # type: t.Dict[str, t.Dict[t.Any, t.Set[str]]]
for covered_path, covered_points in source_data.items():
combined_points = combined_data.setdefault(covered_path, {})
for covered_point, covered_target_indexes in covered_points.items():
combined_point = combined_points.setdefault(format_func(covered_point), set())
for covered_target_index in covered_target_indexes:
combined_point.add(source_index[covered_target_index])
return combined_data
class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets expand` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsExpandConfig, self).__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str

@ -0,0 +1,143 @@
"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from .... import types as t
from ....encoding import (
to_text,
)
from ....data import (
data_context,
)
from ....util_common import (
ResultType,
)
from ... import (
enumerate_powershell_lines,
enumerate_python_arcs,
get_collection_path_regexes,
get_powershell_coverage_files,
get_python_coverage_files,
get_python_modules,
initialize_coverage,
PathChecker,
)
from . import (
CoverageAnalyzeTargetsConfig,
get_target_index,
make_report,
write_report,
)
if t.TYPE_CHECKING:
from . import (
Arcs,
Lines,
TargetIndexes,
)
def command_coverage_analyze_targets_generate(args): # type: (CoverageAnalyzeTargetsGenerateConfig) -> None
"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line."""
root = data_context().content.root
target_indexes = {}
arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, target_indexes).items())
lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, target_indexes).items())
report = make_report(target_indexes, arcs, lines)
write_report(args, report, args.output_file)
def analyze_python_coverage(
args, # type: CoverageAnalyzeTargetsConfig
target_indexes, # type: TargetIndexes
): # type: (...) -> Arcs
"""Analyze Python code coverage."""
results = {} # type: Arcs
collection_search_re, collection_sub_re = get_collection_path_regexes()
modules = get_python_modules()
python_files = get_python_coverage_files()
coverage = initialize_coverage(args)
for python_file in python_files:
if not is_integration_coverage_file(python_file):
continue
target_name = get_target_name(python_file)
target_index = get_target_index(target_name, target_indexes)
for filename, covered_arcs in enumerate_python_arcs(python_file, coverage, modules, collection_search_re, collection_sub_re):
arcs = results.setdefault(filename, {})
for covered_arc in covered_arcs:
arc = arcs.setdefault(covered_arc, set())
arc.add(target_index)
prune_invalid_filenames(args, results, collection_search_re=collection_search_re)
return results
def analyze_powershell_coverage(
args, # type: CoverageAnalyzeTargetsConfig
target_indexes, # type: TargetIndexes
): # type: (...) -> Lines
"""Analyze PowerShell code coverage"""
results = {} # type: Lines
powershell_files = get_powershell_coverage_files()
for powershell_file in powershell_files:
if not is_integration_coverage_file(powershell_file):
continue
target_name = get_target_name(powershell_file)
target_index = get_target_index(target_name, target_indexes)
for filename, hits in enumerate_powershell_lines(powershell_file):
lines = results.setdefault(filename, {})
for covered_line in hits:
line = lines.setdefault(covered_line, set())
line.add(target_index)
prune_invalid_filenames(args, results)
return results
def prune_invalid_filenames(
args, # type: CoverageAnalyzeTargetsConfig
results, # type: t.Dict[str, t.Any]
collection_search_re=None, # type: t.Optional[str]
): # type: (...) -> None
"""Remove invalid filenames from the given result set."""
path_checker = PathChecker(args, collection_search_re)
for path in list(results.keys()):
if not path_checker.check_path(path):
del results[path]
def get_target_name(path): # type: (str) -> str
"""Extract the test target name from the given coverage path."""
return to_text(os.path.basename(path).split('=')[1])
def is_integration_coverage_file(path): # type: (str) -> bool
"""Returns True if the coverage file came from integration tests, otherwise False."""
return os.path.basename(path).split('=')[0] in ('integration', 'windows-integration', 'network-integration')
class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets generate` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsGenerateConfig, self).__init__(args)
self.input_dir = args.input_dir or ResultType.COVERAGE.path # type: str
self.output_file = args.output_file # type: str

@ -0,0 +1,110 @@
"""Identify aggregated coverage in one file missing from another."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from .... import types as t
from ....encoding import (
to_bytes,
)
from . import (
CoverageAnalyzeTargetsConfig,
get_target_index,
make_report,
read_report,
write_report,
)
if t.TYPE_CHECKING:
from . import (
TargetIndexes,
)
TargetKey = t.TypeVar('TargetKey', int, t.Tuple[int, int])
def command_coverage_analyze_targets_missing(args): # type: (CoverageAnalyzeTargetsMissingConfig) -> None
"""Identify aggregated coverage in one file missing from another."""
from_targets, from_path_arcs, from_path_lines = read_report(args.from_file)
to_targets, to_path_arcs, to_path_lines = read_report(args.to_file)
target_indexes = {}
if args.only_gaps:
arcs = find_gaps(from_path_arcs, from_targets, to_path_arcs, target_indexes, args.only_exists)
lines = find_gaps(from_path_lines, from_targets, to_path_lines, target_indexes, args.only_exists)
else:
arcs = find_missing(from_path_arcs, from_targets, to_path_arcs, to_targets, target_indexes, args.only_exists)
lines = find_missing(from_path_lines, from_targets, to_path_lines, to_targets, target_indexes, args.only_exists)
report = make_report(target_indexes, arcs, lines)
write_report(args, report, args.output_file)
def find_gaps(
from_data, # type: t.Dict[str, t.Dict[TargetKey, t.Set[int]]]
from_index, # type: t.List[str]
to_data, # type: t.Dict[str, t.Dict[TargetKey, t.Set[int]]]
target_indexes, # type: TargetIndexes,
only_exists, # type: bool
): # type: (...) -> t.Dict[str, t.Dict[TargetKey, t.Set[int]]]
"""Find gaps in coverage between the from and to data sets."""
target_data = {}
for from_path, from_points in from_data.items():
if only_exists and not os.path.isfile(to_bytes(from_path)):
continue
to_points = to_data.get(from_path, {})
gaps = set(from_points.keys()) - set(to_points.keys())
if gaps:
gap_points = dict((key, value) for key, value in from_points.items() if key in gaps)
target_data[from_path] = dict((gap, set(get_target_index(from_index[i], target_indexes) for i in indexes)) for gap, indexes in gap_points.items())
return target_data
def find_missing(
from_data, # type: t.Dict[str, t.Dict[TargetKey, t.Set[int]]]
from_index, # type: t.List[str]
to_data, # type: t.Dict[str, t.Dict[TargetKey, t.Set[int]]]
to_index, # type: t.List[str]
target_indexes, # type: TargetIndexes,
only_exists, # type: bool
): # type: (...) -> t.Dict[str, t.Dict[TargetKey, t.Set[int]]]
"""Find coverage in from_data not present in to_data (arcs or lines)."""
target_data = {}
for from_path, from_points in from_data.items():
if only_exists and not os.path.isfile(to_bytes(from_path)):
continue
to_points = to_data.get(from_path, {})
for from_point, from_target_indexes in from_points.items():
to_target_indexes = to_points.get(from_point, set())
remaining_targets = set(from_index[i] for i in from_target_indexes) - set(to_index[i] for i in to_target_indexes)
if remaining_targets:
target_index = target_data.setdefault(from_path, {}).setdefault(from_point, set())
target_index.update(get_target_index(name, target_indexes) for name in remaining_targets)
return target_data
class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets missing` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsMissingConfig, self).__init__(args)
self.from_file = args.from_file # type: str
self.to_file = args.to_file # type: str
self.output_file = args.output_file # type: str
self.only_gaps = args.only_gaps # type: bool
self.only_exists = args.only_exists # type: bool

@ -3,16 +3,13 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import os import os
import re
from ..target import ( from ..target import (
walk_module_targets,
walk_compile_targets, walk_compile_targets,
walk_powershell_targets, walk_powershell_targets,
) )
from ..io import ( from ..io import (
read_json_file,
read_text_file, read_text_file,
) )
@ -25,15 +22,18 @@ from ..util_common import (
write_json_test_results, write_json_test_results,
) )
from ..data import (
data_context,
)
from . import ( from . import (
enumerate_python_arcs,
enumerate_powershell_lines,
get_collection_path_regexes,
get_python_coverage_files,
get_python_modules,
get_powershell_coverage_files,
initialize_coverage, initialize_coverage,
COVERAGE_OUTPUT_FILE_NAME, COVERAGE_OUTPUT_FILE_NAME,
COVERAGE_GROUPS, COVERAGE_GROUPS,
CoverageConfig, CoverageConfig,
PathChecker,
) )
@ -57,58 +57,27 @@ def _command_coverage_combine_python(args):
""" """
coverage = initialize_coverage(args) coverage = initialize_coverage(args)
modules = dict((target.module, target.path) for target in list(walk_module_targets()) if target.path.endswith('.py')) modules = get_python_modules()
coverage_dir = ResultType.COVERAGE.path coverage_files = get_python_coverage_files()
coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir)
if '=coverage.' in f and '=python' in f]
counter = 0 counter = 0
sources = _get_coverage_targets(args, walk_compile_targets) sources = _get_coverage_targets(args, walk_compile_targets)
groups = _build_stub_groups(args, sources, lambda line_count: set()) groups = _build_stub_groups(args, sources, lambda line_count: set())
if data_context().content.collection: collection_search_re, collection_sub_re = get_collection_path_regexes()
collection_search_re = re.compile(r'/%s/' % data_context().content.collection.directory)
collection_sub_re = re.compile(r'^.*?/%s/' % data_context().content.collection.directory)
else:
collection_search_re = None
collection_sub_re = None
for coverage_file in coverage_files: for coverage_file in coverage_files:
counter += 1 counter += 1
display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2) display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)
original = coverage.CoverageData()
group = get_coverage_group(args, coverage_file) group = get_coverage_group(args, coverage_file)
if group is None: if group is None:
display.warning('Unexpected name for coverage file: %s' % coverage_file) display.warning('Unexpected name for coverage file: %s' % coverage_file)
continue continue
if os.path.getsize(coverage_file) == 0: for filename, arcs in enumerate_python_arcs(coverage_file, coverage, modules, collection_search_re, collection_sub_re):
display.warning('Empty coverage file: %s' % coverage_file)
continue
try:
original.read_file(coverage_file)
except Exception as ex: # pylint: disable=locally-disabled, broad-except
display.error(u'%s' % ex)
continue
for filename in original.measured_files():
arcs = set(original.arcs(filename) or [])
if not arcs:
# This is most likely due to using an unsupported version of coverage.
display.warning('No arcs found for "%s" in coverage file: %s' % (filename, coverage_file))
continue
filename = _sanitize_filename(filename, modules=modules, collection_search_re=collection_search_re,
collection_sub_re=collection_sub_re)
if not filename:
continue
if group not in groups: if group not in groups:
groups[group] = {} groups[group] = {}
@ -120,28 +89,18 @@ def _command_coverage_combine_python(args):
arc_data[filename].update(arcs) arc_data[filename].update(arcs)
output_files = [] output_files = []
invalid_path_count = 0
invalid_path_chars = 0
coverage_file = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME) coverage_file = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME)
path_checker = PathChecker(args, collection_search_re)
for group in sorted(groups): for group in sorted(groups):
arc_data = groups[group] arc_data = groups[group]
updated = coverage.CoverageData() updated = coverage.CoverageData()
for filename in arc_data: for filename in arc_data:
if not os.path.isfile(filename): if not path_checker.check_path(filename):
if collection_search_re and collection_search_re.search(filename) and os.path.basename(filename) == '__init__.py':
# the collection loader uses implicit namespace packages, so __init__.py does not need to exist on disk
continue
invalid_path_count += 1
invalid_path_chars += len(filename)
if args.verbosity > 1:
display.warning('Invalid coverage path: %s' % filename)
continue continue
updated.add_arcs({filename: list(arc_data[filename])}) updated.add_arcs({filename: list(arc_data[filename])})
@ -154,8 +113,7 @@ def _command_coverage_combine_python(args):
updated.write_file(output_file) updated.write_file(output_file)
output_files.append(output_file) output_files.append(output_file)
if invalid_path_count > 0: path_checker.report()
display.warning('Ignored %d characters from %d invalid coverage path(s).' % (invalid_path_chars, invalid_path_count))
return sorted(output_files) return sorted(output_files)
@ -165,9 +123,7 @@ def _command_coverage_combine_powershell(args):
:type args: CoverageConfig :type args: CoverageConfig
:rtype: list[str] :rtype: list[str]
""" """
coverage_dir = ResultType.COVERAGE.path coverage_files = get_powershell_coverage_files()
coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir)
if '=coverage.' in f and '=powershell' in f]
def _default_stub_value(lines): def _default_stub_value(lines):
val = {} val = {}
@ -189,57 +145,26 @@ def _command_coverage_combine_powershell(args):
display.warning('Unexpected name for coverage file: %s' % coverage_file) display.warning('Unexpected name for coverage file: %s' % coverage_file)
continue continue
if os.path.getsize(coverage_file) == 0: for filename, hits in enumerate_powershell_lines(coverage_file):
display.warning('Empty coverage file: %s' % coverage_file)
continue
try:
coverage_run = read_json_file(coverage_file)
except Exception as ex: # pylint: disable=locally-disabled, broad-except
display.error(u'%s' % ex)
continue
for filename, hit_info in coverage_run.items():
if group not in groups: if group not in groups:
groups[group] = {} groups[group] = {}
coverage_data = groups[group] coverage_data = groups[group]
filename = _sanitize_filename(filename)
if not filename:
continue
if filename not in coverage_data: if filename not in coverage_data:
coverage_data[filename] = {} coverage_data[filename] = {}
file_coverage = coverage_data[filename] file_coverage = coverage_data[filename]
if not isinstance(hit_info, list): for line_no, hit_count in hits.items():
hit_info = [hit_info] file_coverage[line_no] = file_coverage.get(line_no, 0) + hit_count
for hit_entry in hit_info:
if not hit_entry:
continue
line_count = file_coverage.get(hit_entry['Line'], 0) + hit_entry['HitCount']
file_coverage[hit_entry['Line']] = line_count
output_files = [] output_files = []
invalid_path_count = 0
invalid_path_chars = 0
for group in sorted(groups):
coverage_data = groups[group]
for filename in coverage_data: path_checker = PathChecker(args)
if not os.path.isfile(filename):
invalid_path_count += 1
invalid_path_chars += len(filename)
if args.verbosity > 1: for group in sorted(groups):
display.warning('Invalid coverage path: %s' % filename) coverage_data = dict((filename, data) for filename, data in groups[group].items() if path_checker.check_path(filename))
continue
if args.all: if args.all:
# Add 0 line entries for files not in coverage_data # Add 0 line entries for files not in coverage_data
@ -256,9 +181,7 @@ def _command_coverage_combine_powershell(args):
output_files.append(os.path.join(ResultType.COVERAGE.path, output_file)) output_files.append(os.path.join(ResultType.COVERAGE.path, output_file))
if invalid_path_count > 0: path_checker.report()
display.warning(
'Ignored %d characters from %d invalid coverage path(s).' % (invalid_path_chars, invalid_path_count))
return sorted(output_files) return sorted(output_files)
@ -346,67 +269,3 @@ def get_coverage_group(args, coverage_file):
group += '=%s' % names[part] group += '=%s' % names[part]
return group return group
def _sanitize_filename(filename, modules=None, collection_search_re=None, collection_sub_re=None):
"""
:type filename: str
:type modules: dict | None
:type collection_search_re: Pattern | None
:type collection_sub_re: Pattern | None
:rtype: str | None
"""
ansible_path = os.path.abspath('lib/ansible/') + '/'
root_path = data_context().content.root + '/'
integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep
if modules is None:
modules = {}
if '/ansible_modlib.zip/ansible/' in filename:
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier.
new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif collection_search_re and collection_search_re.search(filename):
new_name = os.path.abspath(collection_sub_re.sub('', filename))
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename):
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later.
new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif '/ansible_module_' in filename:
# Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier.
module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename)
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename):
# Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later.
# AnsiballZ versions using zipimporter will match the `.zip` portion of the regex.
# AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex.
module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$',
'\\g<module>', filename).rstrip('_')
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search('^(/.*?)?/root/ansible/', filename):
# Rewrite the path of code running on a remote host or in a docker container as root.
new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif integration_temp_path in filename:
# Rewrite the path of code running from an integration test temporary directory.
new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
return filename

@ -9,17 +9,12 @@ from ..util_common import (
) )
from . import ( from . import (
initialize_coverage,
CoverageConfig, CoverageConfig,
) )
def command_coverage_erase(args): def command_coverage_erase(args): # type: (CoverageConfig) -> None
""" """Erase code coverage data files collected during test runs."""
:type args: CoverageConfig
"""
initialize_coverage(args)
coverage_dir = ResultType.COVERAGE.path coverage_dir = ResultType.COVERAGE.path
for name in os.listdir(coverage_dir): for name in os.listdir(coverage_dir):

@ -41,9 +41,20 @@ def make_dirs(path): # type: (str) -> None
raise raise
def write_json_file(path, content, create_directories=False, formatted=True): # type: (str, t.Union[t.List[t.Any], t.Dict[str, t.Any]], bool, bool) -> None def write_json_file(path, # type: str
content, # type: t.Union[t.List[t.Any], t.Dict[str, t.Any]]
create_directories=False, # type: bool
formatted=True, # type: bool
encoder=None, # type: t.Optional[t.Callable[[t.Any], t.Any]]
): # type: (...) -> None
"""Write the given json content to the specified path, optionally creating missing directories.""" """Write the given json content to the specified path, optionally creating missing directories."""
text_content = json.dumps(content, sort_keys=formatted, indent=4 if formatted else None, separators=(', ', ': ') if formatted else (',', ':')) + '\n' text_content = json.dumps(content,
sort_keys=formatted,
indent=4 if formatted else None,
separators=(', ', ': ') if formatted else (',', ':'),
cls=encoder,
) + '\n'
write_text_file(path, text_content, create_directories=create_directories) write_text_file(path, text_content, create_directories=create_directories)
@ -72,3 +83,12 @@ def open_binary_file(path, mode='rb'): # type: (str, str) -> t.BinaryIO
# noinspection PyTypeChecker # noinspection PyTypeChecker
return io.open(to_bytes(path), mode) return io.open(to_bytes(path), mode)
class SortedSetEncoder(json.JSONEncoder):
"""Encode sets as sorted lists."""
def default(self, obj): # pylint: disable=method-hidden, arguments-differ
if isinstance(obj, set):
return sorted(obj)
return super(SortedSetEncoder).default(self, obj)

@ -101,6 +101,8 @@ class CommonConfig:
self.truncate = args.truncate # type: int self.truncate = args.truncate # type: int
self.redact = args.redact # type: bool self.redact = args.redact # type: bool
self.info_stderr = False # type: bool
self.cache = {} self.cache = {}
def get_ansible_config(self): # type: () -> str def get_ansible_config(self): # type: () -> str
@ -143,10 +145,15 @@ def named_temporary_file(args, prefix, suffix, directory, content):
yield tempfile_fd.name yield tempfile_fd.name
def write_json_test_results(category, name, content, formatted=True): # type: (ResultType, str, t.Union[t.List[t.Any], t.Dict[str, t.Any]], bool) -> None def write_json_test_results(category, # type: ResultType
name, # type: str
content, # type: t.Union[t.List[t.Any], t.Dict[str, t.Any]]
formatted=True, # type: bool
encoder=None, # type: t.Optional[t.Callable[[t.Any], t.Any]]
): # type: (...) -> None
"""Write the given json content to the specified test results path, creating directories as needed.""" """Write the given json content to the specified test results path, creating directories as needed."""
path = os.path.join(category.path, name) path = os.path.join(category.path, name)
write_json_file(path, content, create_directories=True, formatted=formatted) write_json_file(path, content, create_directories=True, formatted=formatted, encoder=encoder)
def write_text_test_results(category, name, content): # type: (ResultType, str, str) -> None def write_text_test_results(category, name, content): # type: (ResultType, str, str) -> None

@ -103,6 +103,9 @@ function cleanup
ansible-test coverage xml --color -v --requirements --group-by command --group-by version ${stub:+"$stub"} ansible-test coverage xml --color -v --requirements --group-by command --group-by version ${stub:+"$stub"}
cp -a test/results/reports/coverage=*.xml shippable/codecoverage/ cp -a test/results/reports/coverage=*.xml shippable/codecoverage/
# analyze and capture code coverage aggregated by integration test target
ansible-test coverage analyze targets generate -v shippable/testresults/coverage-analyze-targets.json
# upload coverage report to codecov.io only when using complete on-demand coverage # upload coverage report to codecov.io only when using complete on-demand coverage
if [ "${COVERAGE}" == "--coverage" ] && [ "${CHANGED}" == "" ]; then if [ "${COVERAGE}" == "--coverage" ] && [ "${CHANGED}" == "" ]; then
for file in test/results/reports/coverage=*.xml; do for file in test/results/reports/coverage=*.xml; do

Loading…
Cancel
Save