Reorganize ansible-test coverage code.

This change moves all code for the `ansible-test coverage` command into the `coverage` directory.

Each subcommand is split into a separate file.

Only minor spelling changes were made aside from code relocation.
pull/66973/head
Matt Clay 4 years ago
parent 2ea159eefd
commit d584584474

@ -0,0 +1,2 @@
minor_changes:
- reorganized code for the ``ansible-test coverage`` command for easier maintenance and feature additions

@ -92,15 +92,30 @@ from .util_common import (
CommonConfig,
)
from .cover import (
from .coverage.combine import (
command_coverage_combine,
)
from .coverage.erase import (
command_coverage_erase,
)
from .coverage.html import (
command_coverage_html,
)
from .coverage.report import (
command_coverage_report,
CoverageReportConfig,
)
from .coverage.xml import (
command_coverage_xml,
)
from .coverage import (
COVERAGE_GROUPS,
CoverageConfig,
CoverageReportConfig,
)

@ -330,30 +330,3 @@ class UnitsConfig(TestConfig):
self.requirements = True
elif self.requirements_mode == 'skip':
self.requirements = False
class CoverageConfig(EnvironmentConfig):
"""Configuration for the coverage command."""
def __init__(self, args):
"""
:type args: any
"""
super(CoverageConfig, self).__init__(args, 'coverage')
self.group_by = frozenset(args.group_by) if 'group_by' in args and args.group_by else set() # type: t.FrozenSet[str]
self.all = args.all if 'all' in args else False # type: bool
self.stub = args.stub if 'stub' in args else False # type: bool
self.coverage = False # temporary work-around to support intercept_command in cover.py
class CoverageReportConfig(CoverageConfig):
"""Configuration for the coverage report command."""
def __init__(self, args):
"""
:type args: any
"""
super(CoverageReportConfig, self).__init__(args)
self.show_missing = args.show_missing # type: bool
self.include = args.include # type: str
self.omit = args.omit # type: str

@ -0,0 +1,76 @@
"""Common logic for the coverage subcommand."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from .. import types as t
from ..util import (
ApplicationError,
common_environment,
ANSIBLE_TEST_DATA_ROOT,
)
from ..util_common import (
intercept_command,
)
from ..config import (
EnvironmentConfig,
)
from ..executor import (
Delegate,
install_command_requirements,
)
COVERAGE_GROUPS = ('command', 'target', 'environment', 'version')
COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc')
COVERAGE_OUTPUT_FILE_NAME = 'coverage'
def initialize_coverage(args):
"""
:type args: CoverageConfig
:rtype: coverage
"""
if args.delegate:
raise Delegate()
if args.requirements:
install_command_requirements(args)
try:
import coverage
except ImportError:
coverage = None
if not coverage:
raise ApplicationError('You must install the "coverage" python module to use this command.')
return coverage
def run_coverage(args, output_file, command, cmd): # type: (CoverageConfig, str, str, t.List[str]) -> None
"""Run the coverage cli tool with the specified options."""
env = common_environment()
env.update(dict(COVERAGE_FILE=output_file))
cmd = ['python', '-m', 'coverage', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd
intercept_command(args, target_name='coverage', env=env, cmd=cmd, disable_coverage=True)
class CoverageConfig(EnvironmentConfig):
"""Configuration for the coverage command."""
def __init__(self, args):
"""
:type args: any
"""
super(CoverageConfig, self).__init__(args, 'coverage')
self.group_by = frozenset(args.group_by) if 'group_by' in args and args.group_by else set() # type: t.FrozenSet[str]
self.all = args.all if 'all' in args else False # type: bool
self.stub = args.stub if 'stub' in args else False # type: bool
self.coverage = False # temporary work-around to support intercept_command in cover.py

@ -1,68 +1,37 @@
"""Code coverage utilities."""
"""Combine code coverage files."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import os
import re
import time
from xml.etree.ElementTree import (
Comment,
Element,
SubElement,
tostring,
)
from xml.dom import (
minidom,
)
from . import types as t
from .target import (
from ..target import (
walk_module_targets,
walk_compile_targets,
walk_powershell_targets,
)
from .util import (
from ..util import (
display,
ApplicationError,
common_environment,
ANSIBLE_TEST_DATA_ROOT,
to_text,
make_dirs,
)
from .util_common import (
intercept_command,
from ..util_common import (
ResultType,
write_text_test_results,
write_json_test_results,
)
from .config import (
CoverageConfig,
CoverageReportConfig,
)
from .env import (
get_ansible_version,
)
from .executor import (
Delegate,
install_command_requirements,
)
from .data import (
from ..data import (
data_context,
)
COVERAGE_GROUPS = ('command', 'target', 'environment', 'version')
COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc')
COVERAGE_OUTPUT_FILE_NAME = 'coverage'
from . import (
initialize_coverage,
COVERAGE_OUTPUT_FILE_NAME,
COVERAGE_GROUPS,
CoverageConfig,
)
def command_coverage_combine(args):
@ -132,7 +101,7 @@ def _command_coverage_combine_python(args):
display.warning('No arcs found for "%s" in coverage file: %s' % (filename, coverage_file))
continue
filename = _sanitise_filename(filename, modules=modules, collection_search_re=collection_search_re,
filename = _sanitize_filename(filename, modules=modules, collection_search_re=collection_search_re,
collection_sub_re=collection_sub_re)
if not filename:
continue
@ -188,263 +157,6 @@ def _command_coverage_combine_python(args):
return sorted(output_files)
def _get_coverage_targets(args, walk_func):
"""
:type args: CoverageConfig
:type walk_func: Func
:rtype: list[tuple[str, int]]
"""
sources = []
if args.all or args.stub:
# excludes symlinks of regular files to avoid reporting on the same file multiple times
# in the future it would be nice to merge any coverage for symlinks into the real files
for target in walk_func(include_symlinks=False):
target_path = os.path.abspath(target.path)
with open(target_path, 'r') as target_fd:
target_lines = len(target_fd.read().splitlines())
sources.append((target_path, target_lines))
sources.sort()
return sources
def _build_stub_groups(args, sources, default_stub_value):
"""
:type args: CoverageConfig
:type sources: List[tuple[str, int]]
:type default_stub_value: Func[int]
:rtype: dict
"""
groups = {}
if args.stub:
stub_group = []
stub_groups = [stub_group]
stub_line_limit = 500000
stub_line_count = 0
for source, source_line_count in sources:
stub_group.append((source, source_line_count))
stub_line_count += source_line_count
if stub_line_count > stub_line_limit:
stub_line_count = 0
stub_group = []
stub_groups.append(stub_group)
for stub_index, stub_group in enumerate(stub_groups):
if not stub_group:
continue
groups['=stub-%02d' % (stub_index + 1)] = dict((source, default_stub_value(line_count))
for source, line_count in stub_group)
return groups
def _sanitise_filename(filename, modules=None, collection_search_re=None, collection_sub_re=None):
"""
:type filename: str
:type modules: dict | None
:type collection_search_re: Pattern | None
:type collection_sub_re: Pattern | None
:rtype: str | None
"""
ansible_path = os.path.abspath('lib/ansible/') + '/'
root_path = data_context().content.root + '/'
integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep
if modules is None:
modules = {}
if '/ansible_modlib.zip/ansible/' in filename:
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier.
new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif collection_search_re and collection_search_re.search(filename):
new_name = os.path.abspath(collection_sub_re.sub('', filename))
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename):
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later.
new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif '/ansible_module_' in filename:
# Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier.
module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename)
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename):
# Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later.
# AnsiballZ versions using zipimporter will match the `.zip` portion of the regex.
# AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex.
module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$',
'\\g<module>', filename).rstrip('_')
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search('^(/.*?)?/root/ansible/', filename):
# Rewrite the path of code running on a remote host or in a docker container as root.
new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif integration_temp_path in filename:
# Rewrite the path of code running from an integration test temporary directory.
new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
return filename
def command_coverage_report(args):
"""
:type args: CoverageReportConfig
"""
output_files = command_coverage_combine(args)
for output_file in output_files:
if args.group_by or args.stub:
display.info('>>> Coverage Group: %s' % ' '.join(os.path.basename(output_file).split('=')[1:]))
if output_file.endswith('-powershell'):
display.info(_generate_powershell_output_report(args, output_file))
else:
options = []
if args.show_missing:
options.append('--show-missing')
if args.include:
options.extend(['--include', args.include])
if args.omit:
options.extend(['--omit', args.omit])
run_coverage(args, output_file, 'report', options)
def command_coverage_html(args):
"""
:type args: CoverageConfig
"""
output_files = command_coverage_combine(args)
for output_file in output_files:
if output_file.endswith('-powershell'):
# coverage.py does not support non-Python files so we just skip the local html report.
display.info("Skipping output file %s in html generation" % output_file, verbosity=3)
continue
dir_name = os.path.join(ResultType.REPORTS.path, os.path.basename(output_file))
make_dirs(dir_name)
run_coverage(args, output_file, 'html', ['-i', '-d', dir_name])
display.info('HTML report generated: file:///%s' % os.path.join(dir_name, 'index.html'))
def command_coverage_xml(args):
"""
:type args: CoverageConfig
"""
output_files = command_coverage_combine(args)
for output_file in output_files:
xml_name = '%s.xml' % os.path.basename(output_file)
if output_file.endswith('-powershell'):
report = _generage_powershell_xml(output_file)
rough_string = tostring(report, 'utf-8')
reparsed = minidom.parseString(rough_string)
pretty = reparsed.toprettyxml(indent=' ')
write_text_test_results(ResultType.REPORTS, xml_name, pretty)
else:
xml_path = os.path.join(ResultType.REPORTS.path, xml_name)
make_dirs(ResultType.REPORTS.path)
run_coverage(args, output_file, 'xml', ['-i', '-o', xml_path])
def command_coverage_erase(args):
"""
:type args: CoverageConfig
"""
initialize_coverage(args)
coverage_dir = ResultType.COVERAGE.path
for name in os.listdir(coverage_dir):
if not name.startswith('coverage') and '=coverage.' not in name:
continue
path = os.path.join(coverage_dir, name)
if not args.explain:
os.remove(path)
def initialize_coverage(args):
"""
:type args: CoverageConfig
:rtype: coverage
"""
if args.delegate:
raise Delegate()
if args.requirements:
install_command_requirements(args)
try:
import coverage
except ImportError:
coverage = None
if not coverage:
raise ApplicationError('You must install the "coverage" python module to use this command.')
return coverage
def get_coverage_group(args, coverage_file):
"""
:type args: CoverageConfig
:type coverage_file: str
:rtype: str
"""
parts = os.path.basename(coverage_file).split('=', 4)
if len(parts) != 5 or not parts[4].startswith('coverage.'):
return None
names = dict(
command=parts[0],
target=parts[1],
environment=parts[2],
version=parts[3],
)
group = ''
for part in COVERAGE_GROUPS:
if part in args.group_by:
group += '=%s' % names[part]
return group
def _command_coverage_combine_powershell(args):
"""
:type args: CoverageConfig
@ -491,7 +203,7 @@ def _command_coverage_combine_powershell(args):
coverage_data = groups[group]
filename = _sanitise_filename(filename)
filename = _sanitize_filename(filename)
if not filename:
continue
@ -549,228 +261,150 @@ def _command_coverage_combine_powershell(args):
return sorted(output_files)
def _generage_powershell_xml(coverage_file):
def _get_coverage_targets(args, walk_func):
"""
:type coverage_file: str
:rtype: Element
:type args: CoverageConfig
:type walk_func: Func
:rtype: list[tuple[str, int]]
"""
with open(coverage_file, 'rb') as coverage_fd:
coverage_info = json.loads(to_text(coverage_fd.read()))
content_root = data_context().content.root
is_ansible = data_context().content.is_ansible
packages = {}
for path, results in coverage_info.items():
filename = os.path.splitext(os.path.basename(path))[0]
if filename.startswith('Ansible.ModuleUtils'):
package = 'ansible.module_utils'
elif is_ansible:
package = 'ansible.modules'
else:
rel_path = path[len(content_root) + 1:]
plugin_type = "modules" if rel_path.startswith("plugins/modules") else "module_utils"
package = 'ansible_collections.%splugins.%s' % (data_context().content.collection.prefix, plugin_type)
if package not in packages:
packages[package] = {}
packages[package][path] = results
elem_coverage = Element('coverage')
elem_coverage.append(
Comment(' Generated by ansible-test from the Ansible project: https://www.ansible.com/ '))
elem_coverage.append(
Comment(' Based on https://raw.githubusercontent.com/cobertura/web/master/htdocs/xml/coverage-04.dtd '))
elem_sources = SubElement(elem_coverage, 'sources')
elem_source = SubElement(elem_sources, 'source')
elem_source.text = data_context().content.root
elem_packages = SubElement(elem_coverage, 'packages')
sources = []
total_lines_hit = 0
total_line_count = 0
if args.all or args.stub:
# excludes symlinks of regular files to avoid reporting on the same file multiple times
# in the future it would be nice to merge any coverage for symlinks into the real files
for target in walk_func(include_symlinks=False):
target_path = os.path.abspath(target.path)
for package_name, package_data in packages.items():
lines_hit, line_count = _add_cobertura_package(elem_packages, package_name, package_data)
with open(target_path, 'r') as target_fd:
target_lines = len(target_fd.read().splitlines())
total_lines_hit += lines_hit
total_line_count += line_count
sources.append((target_path, target_lines))
elem_coverage.attrib.update({
'branch-rate': '0',
'branches-covered': '0',
'branches-valid': '0',
'complexity': '0',
'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0",
'lines-covered': str(total_line_count),
'lines-valid': str(total_lines_hit),
'timestamp': str(int(time.time())),
'version': get_ansible_version(),
})
sources.sort()
return elem_coverage
return sources
def _add_cobertura_package(packages, package_name, package_data):
def _build_stub_groups(args, sources, default_stub_value):
"""
:type packages: SubElement
:type package_name: str
:type package_data: Dict[str, Dict[str, int]]
:rtype: Tuple[int, int]
:type args: CoverageConfig
:type sources: List[tuple[str, int]]
:type default_stub_value: Func[int]
:rtype: dict
"""
elem_package = SubElement(packages, 'package')
elem_classes = SubElement(elem_package, 'classes')
total_lines_hit = 0
total_line_count = 0
for path, results in package_data.items():
lines_hit = len([True for hits in results.values() if hits])
line_count = len(results)
total_lines_hit += lines_hit
total_line_count += line_count
elem_class = SubElement(elem_classes, 'class')
class_name = os.path.splitext(os.path.basename(path))[0]
if class_name.startswith("Ansible.ModuleUtils"):
class_name = class_name[20:]
content_root = data_context().content.root
filename = path
if filename.startswith(content_root):
filename = filename[len(content_root) + 1:]
groups = {}
elem_class.attrib.update({
'branch-rate': '0',
'complexity': '0',
'filename': filename,
'line-rate': str(round(lines_hit / line_count, 4)) if line_count else "0",
'name': class_name,
})
if args.stub:
stub_group = []
stub_groups = [stub_group]
stub_line_limit = 500000
stub_line_count = 0
SubElement(elem_class, 'methods')
for source, source_line_count in sources:
stub_group.append((source, source_line_count))
stub_line_count += source_line_count
elem_lines = SubElement(elem_class, 'lines')
if stub_line_count > stub_line_limit:
stub_line_count = 0
stub_group = []
stub_groups.append(stub_group)
for number, hits in results.items():
elem_line = SubElement(elem_lines, 'line')
elem_line.attrib.update(
hits=str(hits),
number=str(number),
)
for stub_index, stub_group in enumerate(stub_groups):
if not stub_group:
continue
elem_package.attrib.update({
'branch-rate': '0',
'complexity': '0',
'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0",
'name': package_name,
})
groups['=stub-%02d' % (stub_index + 1)] = dict((source, default_stub_value(line_count))
for source, line_count in stub_group)
return total_lines_hit, total_line_count
return groups
def _generate_powershell_output_report(args, coverage_file):
def get_coverage_group(args, coverage_file):
"""
:type args: CoverageReportConfig
:type args: CoverageConfig
:type coverage_file: str
:rtype: str
"""
with open(coverage_file, 'rb') as coverage_fd:
coverage_info = json.loads(to_text(coverage_fd.read()))
root_path = data_context().content.root + '/'
name_padding = 7
cover_padding = 8
file_report = []
total_stmts = 0
total_miss = 0
for filename in sorted(coverage_info.keys()):
hit_info = coverage_info[filename]
if filename.startswith(root_path):
filename = filename[len(root_path):]
if args.omit and filename in args.omit:
continue
if args.include and filename not in args.include:
continue
stmts = len(hit_info)
miss = len([c for c in hit_info.values() if c == 0])
name_padding = max(name_padding, len(filename) + 3)
total_stmts += stmts
total_miss += miss
cover = "{0}%".format(int((stmts - miss) / stmts * 100))
missing = []
current_missing = None
sorted_lines = sorted([int(x) for x in hit_info.keys()])
for idx, line in enumerate(sorted_lines):
hit = hit_info[str(line)]
if hit == 0 and current_missing is None:
current_missing = line
elif hit != 0 and current_missing is not None:
end_line = sorted_lines[idx - 1]
if current_missing == end_line:
missing.append(str(current_missing))
else:
missing.append('%s-%s' % (current_missing, end_line))
current_missing = None
if current_missing is not None:
end_line = sorted_lines[-1]
if current_missing == end_line:
missing.append(str(current_missing))
else:
missing.append('%s-%s' % (current_missing, end_line))
file_report.append({'name': filename, 'stmts': stmts, 'miss': miss, 'cover': cover, 'missing': missing})
if total_stmts == 0:
return ''
parts = os.path.basename(coverage_file).split('=', 4)
total_percent = '{0}%'.format(int((total_stmts - total_miss) / total_stmts * 100))
stmts_padding = max(8, len(str(total_stmts)))
miss_padding = max(7, len(str(total_miss)))
if len(parts) != 5 or not parts[4].startswith('coverage.'):
return None
line_length = name_padding + stmts_padding + miss_padding + cover_padding
names = dict(
command=parts[0],
target=parts[1],
environment=parts[2],
version=parts[3],
)
header = 'Name'.ljust(name_padding) + 'Stmts'.rjust(stmts_padding) + 'Miss'.rjust(miss_padding) + \
'Cover'.rjust(cover_padding)
group = ''
if args.show_missing:
header += 'Lines Missing'.rjust(16)
line_length += 16
for part in COVERAGE_GROUPS:
if part in args.group_by:
group += '=%s' % names[part]
line_break = '-' * line_length
lines = ['%s%s%s%s%s' % (f['name'].ljust(name_padding), str(f['stmts']).rjust(stmts_padding),
str(f['miss']).rjust(miss_padding), f['cover'].rjust(cover_padding),
' ' + ', '.join(f['missing']) if args.show_missing else '')
for f in file_report]
totals = 'TOTAL'.ljust(name_padding) + str(total_stmts).rjust(stmts_padding) + \
str(total_miss).rjust(miss_padding) + total_percent.rjust(cover_padding)
return group
report = '{0}\n{1}\n{2}\n{1}\n{3}'.format(header, line_break, "\n".join(lines), totals)
return report
def _sanitize_filename(filename, modules=None, collection_search_re=None, collection_sub_re=None):
"""
:type filename: str
:type modules: dict | None
:type collection_search_re: Pattern | None
:type collection_sub_re: Pattern | None
:rtype: str | None
"""
ansible_path = os.path.abspath('lib/ansible/') + '/'
root_path = data_context().content.root + '/'
integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep
def run_coverage(args, output_file, command, cmd): # type: (CoverageConfig, str, str, t.List[str]) -> None
"""Run the coverage cli tool with the specified options."""
env = common_environment()
env.update(dict(COVERAGE_FILE=output_file))
if modules is None:
modules = {}
cmd = ['python', '-m', 'coverage', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd
if '/ansible_modlib.zip/ansible/' in filename:
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier.
new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif collection_search_re and collection_search_re.search(filename):
new_name = os.path.abspath(collection_sub_re.sub('', filename))
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename):
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later.
new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif '/ansible_module_' in filename:
# Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier.
module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename)
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename):
# Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later.
# AnsiballZ versions using zipimporter will match the `.zip` portion of the regex.
# AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex.
module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$',
'\\g<module>', filename).rstrip('_')
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search('^(/.*?)?/root/ansible/', filename):
# Rewrite the path of code running on a remote host or in a docker container as root.
new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif integration_temp_path in filename:
# Rewrite the path of code running from an integration test temporary directory.
new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
intercept_command(args, target_name='coverage', env=env, cmd=cmd, disable_coverage=True)
return filename

@ -0,0 +1,32 @@
"""Erase code coverage files."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ..util_common import (
ResultType,
)
from . import (
initialize_coverage,
CoverageConfig,
)
def command_coverage_erase(args):
"""
:type args: CoverageConfig
"""
initialize_coverage(args)
coverage_dir = ResultType.COVERAGE.path
for name in os.listdir(coverage_dir):
if not name.startswith('coverage') and '=coverage.' not in name:
continue
path = os.path.join(coverage_dir, name)
if not args.explain:
os.remove(path)

@ -0,0 +1,42 @@
"""Generate HTML code coverage reports."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ..util import (
display,
make_dirs,
)
from ..util_common import (
ResultType,
)
from .combine import (
command_coverage_combine,
)
from . import (
run_coverage,
CoverageConfig,
)
def command_coverage_html(args):
"""
:type args: CoverageConfig
"""
output_files = command_coverage_combine(args)
for output_file in output_files:
if output_file.endswith('-powershell'):
# coverage.py does not support non-Python files so we just skip the local html report.
display.info("Skipping output file %s in html generation" % output_file, verbosity=3)
continue
dir_name = os.path.join(ResultType.REPORTS.path, os.path.basename(output_file))
make_dirs(dir_name)
run_coverage(args, output_file, 'html', ['-i', '-d', dir_name])
display.info('HTML report generated: file:///%s' % os.path.join(dir_name, 'index.html'))

@ -0,0 +1,155 @@
"""Generate console code coverage reports."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import os
from ..util import (
display,
to_text,
)
from ..data import (
data_context,
)
from .combine import (
command_coverage_combine,
)
from . import (
run_coverage,
CoverageConfig,
)
def command_coverage_report(args):
"""
:type args: CoverageReportConfig
"""
output_files = command_coverage_combine(args)
for output_file in output_files:
if args.group_by or args.stub:
display.info('>>> Coverage Group: %s' % ' '.join(os.path.basename(output_file).split('=')[1:]))
if output_file.endswith('-powershell'):
display.info(_generate_powershell_output_report(args, output_file))
else:
options = []
if args.show_missing:
options.append('--show-missing')
if args.include:
options.extend(['--include', args.include])
if args.omit:
options.extend(['--omit', args.omit])
run_coverage(args, output_file, 'report', options)
def _generate_powershell_output_report(args, coverage_file):
"""
:type args: CoverageReportConfig
:type coverage_file: str
:rtype: str
"""
with open(coverage_file, 'rb') as coverage_fd:
coverage_info = json.loads(to_text(coverage_fd.read()))
root_path = data_context().content.root + '/'
name_padding = 7
cover_padding = 8
file_report = []
total_stmts = 0
total_miss = 0
for filename in sorted(coverage_info.keys()):
hit_info = coverage_info[filename]
if filename.startswith(root_path):
filename = filename[len(root_path):]
if args.omit and filename in args.omit:
continue
if args.include and filename not in args.include:
continue
stmts = len(hit_info)
miss = len([c for c in hit_info.values() if c == 0])
name_padding = max(name_padding, len(filename) + 3)
total_stmts += stmts
total_miss += miss
cover = "{0}%".format(int((stmts - miss) / stmts * 100))
missing = []
current_missing = None
sorted_lines = sorted([int(x) for x in hit_info.keys()])
for idx, line in enumerate(sorted_lines):
hit = hit_info[str(line)]
if hit == 0 and current_missing is None:
current_missing = line
elif hit != 0 and current_missing is not None:
end_line = sorted_lines[idx - 1]
if current_missing == end_line:
missing.append(str(current_missing))
else:
missing.append('%s-%s' % (current_missing, end_line))
current_missing = None
if current_missing is not None:
end_line = sorted_lines[-1]
if current_missing == end_line:
missing.append(str(current_missing))
else:
missing.append('%s-%s' % (current_missing, end_line))
file_report.append({'name': filename, 'stmts': stmts, 'miss': miss, 'cover': cover, 'missing': missing})
if total_stmts == 0:
return ''
total_percent = '{0}%'.format(int((total_stmts - total_miss) / total_stmts * 100))
stmts_padding = max(8, len(str(total_stmts)))
miss_padding = max(7, len(str(total_miss)))
line_length = name_padding + stmts_padding + miss_padding + cover_padding
header = 'Name'.ljust(name_padding) + 'Stmts'.rjust(stmts_padding) + 'Miss'.rjust(miss_padding) + \
'Cover'.rjust(cover_padding)
if args.show_missing:
header += 'Lines Missing'.rjust(16)
line_length += 16
line_break = '-' * line_length
lines = ['%s%s%s%s%s' % (f['name'].ljust(name_padding), str(f['stmts']).rjust(stmts_padding),
str(f['miss']).rjust(miss_padding), f['cover'].rjust(cover_padding),
' ' + ', '.join(f['missing']) if args.show_missing else '')
for f in file_report]
totals = 'TOTAL'.ljust(name_padding) + str(total_stmts).rjust(stmts_padding) + \
str(total_miss).rjust(miss_padding) + total_percent.rjust(cover_padding)
report = '{0}\n{1}\n{2}\n{1}\n{3}'.format(header, line_break, "\n".join(lines), totals)
return report
class CoverageReportConfig(CoverageConfig):
"""Configuration for the coverage report command."""
def __init__(self, args):
"""
:type args: any
"""
super(CoverageReportConfig, self).__init__(args)
self.show_missing = args.show_missing # type: bool
self.include = args.include # type: str
self.omit = args.omit # type: str

@ -0,0 +1,193 @@
"""Generate XML code coverage reports."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import os
import time
from xml.etree.ElementTree import (
Comment,
Element,
SubElement,
tostring,
)
from xml.dom import (
minidom,
)
from ..util import (
to_text,
make_dirs,
)
from ..util_common import (
ResultType,
write_text_test_results,
)
from ..env import (
get_ansible_version,
)
from ..data import (
data_context,
)
from .combine import (
command_coverage_combine,
)
from . import (
run_coverage,
CoverageConfig,
)
def command_coverage_xml(args):
"""
:type args: CoverageConfig
"""
output_files = command_coverage_combine(args)
for output_file in output_files:
xml_name = '%s.xml' % os.path.basename(output_file)
if output_file.endswith('-powershell'):
report = _generate_powershell_xml(output_file)
rough_string = tostring(report, 'utf-8')
reparsed = minidom.parseString(rough_string)
pretty = reparsed.toprettyxml(indent=' ')
write_text_test_results(ResultType.REPORTS, xml_name, pretty)
else:
xml_path = os.path.join(ResultType.REPORTS.path, xml_name)
make_dirs(ResultType.REPORTS.path)
run_coverage(args, output_file, 'xml', ['-i', '-o', xml_path])
def _generate_powershell_xml(coverage_file):
"""
:type coverage_file: str
:rtype: Element
"""
with open(coverage_file, 'rb') as coverage_fd:
coverage_info = json.loads(to_text(coverage_fd.read()))
content_root = data_context().content.root
is_ansible = data_context().content.is_ansible
packages = {}
for path, results in coverage_info.items():
filename = os.path.splitext(os.path.basename(path))[0]
if filename.startswith('Ansible.ModuleUtils'):
package = 'ansible.module_utils'
elif is_ansible:
package = 'ansible.modules'
else:
rel_path = path[len(content_root) + 1:]
plugin_type = "modules" if rel_path.startswith("plugins/modules") else "module_utils"
package = 'ansible_collections.%splugins.%s' % (data_context().content.collection.prefix, plugin_type)
if package not in packages:
packages[package] = {}
packages[package][path] = results
elem_coverage = Element('coverage')
elem_coverage.append(
Comment(' Generated by ansible-test from the Ansible project: https://www.ansible.com/ '))
elem_coverage.append(
Comment(' Based on https://raw.githubusercontent.com/cobertura/web/master/htdocs/xml/coverage-04.dtd '))
elem_sources = SubElement(elem_coverage, 'sources')
elem_source = SubElement(elem_sources, 'source')
elem_source.text = data_context().content.root
elem_packages = SubElement(elem_coverage, 'packages')
total_lines_hit = 0
total_line_count = 0
for package_name, package_data in packages.items():
lines_hit, line_count = _add_cobertura_package(elem_packages, package_name, package_data)
total_lines_hit += lines_hit
total_line_count += line_count
elem_coverage.attrib.update({
'branch-rate': '0',
'branches-covered': '0',
'branches-valid': '0',
'complexity': '0',
'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0",
'lines-covered': str(total_line_count),
'lines-valid': str(total_lines_hit),
'timestamp': str(int(time.time())),
'version': get_ansible_version(),
})
return elem_coverage
def _add_cobertura_package(packages, package_name, package_data):
"""
:type packages: SubElement
:type package_name: str
:type package_data: Dict[str, Dict[str, int]]
:rtype: Tuple[int, int]
"""
elem_package = SubElement(packages, 'package')
elem_classes = SubElement(elem_package, 'classes')
total_lines_hit = 0
total_line_count = 0
for path, results in package_data.items():
lines_hit = len([True for hits in results.values() if hits])
line_count = len(results)
total_lines_hit += lines_hit
total_line_count += line_count
elem_class = SubElement(elem_classes, 'class')
class_name = os.path.splitext(os.path.basename(path))[0]
if class_name.startswith("Ansible.ModuleUtils"):
class_name = class_name[20:]
content_root = data_context().content.root
filename = path
if filename.startswith(content_root):
filename = filename[len(content_root) + 1:]
elem_class.attrib.update({
'branch-rate': '0',
'complexity': '0',
'filename': filename,
'line-rate': str(round(lines_hit / line_count, 4)) if line_count else "0",
'name': class_name,
})
SubElement(elem_class, 'methods')
elem_lines = SubElement(elem_class, 'lines')
for number, hits in results.items():
elem_line = SubElement(elem_lines, 'line')
elem_line.attrib.update(
hits=str(hits),
number=str(number),
)
elem_package.attrib.update({
'branch-rate': '0',
'complexity': '0',
'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0",
'name': package_name,
})
return total_lines_hit, total_line_count
Loading…
Cancel
Save