Fix ansible-test coverage analysis option usage. (#68406)

* Fix ansible-test coverage analysis option usage.

The `--input-dir` option for `coverage analyze targets generate` was being ignored.

No changelog entry since this feature has not yet been released.

* Move coverage config to fix type annotations.

Declaring the types before referencing them makes sure they're recognized by tools such as PyCharm.
pull/68236/head
Matt Clay 4 years ago committed by GitHub
parent d049888a92
commit 9765a80bd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -52,6 +52,17 @@ COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc')
COVERAGE_OUTPUT_FILE_NAME = 'coverage' COVERAGE_OUTPUT_FILE_NAME = 'coverage'
class CoverageConfig(EnvironmentConfig):
"""Configuration for the coverage command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageConfig, self).__init__(args, 'coverage')
self.group_by = frozenset(args.group_by) if 'group_by' in args and args.group_by else set() # type: t.FrozenSet[str]
self.all = args.all if 'all' in args else False # type: bool
self.stub = args.stub if 'stub' in args else False # type: bool
self.coverage = False # temporary work-around to support intercept_command in cover.py
def initialize_coverage(args): # type: (CoverageConfig) -> coverage_module def initialize_coverage(args): # type: (CoverageConfig) -> coverage_module
"""Delegate execution if requested, install requirements, then import and return the coverage module. Raises an exception if coverage is not available.""" """Delegate execution if requested, install requirements, then import and return the coverage module. Raises an exception if coverage is not available."""
if args.delegate: if args.delegate:
@ -81,19 +92,19 @@ def run_coverage(args, output_file, command, cmd): # type: (CoverageConfig, str
intercept_command(args, target_name='coverage', env=env, cmd=cmd, disable_coverage=True) intercept_command(args, target_name='coverage', env=env, cmd=cmd, disable_coverage=True)
def get_python_coverage_files(): # type: () -> t.List[str] def get_python_coverage_files(path=None): # type: (t.Optional[str]) -> t.List[str]
"""Return the list of Python coverage file paths.""" """Return the list of Python coverage file paths."""
return get_coverage_files('python') return get_coverage_files('python', path)
def get_powershell_coverage_files(): # type: () -> t.List[str] def get_powershell_coverage_files(path=None): # type: (t.Optional[str]) -> t.List[str]
"""Return the list of PowerShell coverage file paths.""" """Return the list of PowerShell coverage file paths."""
return get_coverage_files('powershell') return get_coverage_files('powershell', path)
def get_coverage_files(language): # type: (str) -> t.List[str] def get_coverage_files(language, path=None): # type: (str, t.Optional[str]) -> t.List[str]
"""Return the list of coverage file paths for the given language.""" """Return the list of coverage file paths for the given language."""
coverage_dir = ResultType.COVERAGE.path coverage_dir = path or ResultType.COVERAGE.path
coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir) coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir)
if '=coverage.' in f and '=%s' % language in f] if '=coverage.' in f and '=%s' % language in f]
@ -247,17 +258,6 @@ def sanitize_filename(
return filename return filename
class CoverageConfig(EnvironmentConfig):
"""Configuration for the coverage command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageConfig, self).__init__(args, 'coverage')
self.group_by = frozenset(args.group_by) if 'group_by' in args and args.group_by else set() # type: t.FrozenSet[str]
self.all = args.all if 'all' in args else False # type: bool
self.stub = args.stub if 'stub' in args else False # type: bool
self.coverage = False # temporary work-around to support intercept_command in cover.py
class PathChecker: class PathChecker:
"""Checks code coverage paths to verify they are valid and reports on the findings.""" """Checks code coverage paths to verify they are valid and reports on the findings."""
def __init__(self, args, collection_search_re=None): # type: (CoverageConfig, t.Optional[t.Pattern]) -> None def __init__(self, args, collection_search_re=None): # type: (CoverageConfig, t.Optional[t.Pattern]) -> None

@ -30,6 +30,14 @@ if t.TYPE_CHECKING:
TargetSetIndexes = t.Dict[t.FrozenSet[int], int] TargetSetIndexes = t.Dict[t.FrozenSet[int], int]
class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig):
"""Configuration for the `coverage analyze targets` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsConfig, self).__init__(args)
self.info_stderr = True
def make_report(target_indexes, arcs, lines): # type: (TargetIndexes, Arcs, Lines) -> t.Dict[str, t.Any] def make_report(target_indexes, arcs, lines): # type: (TargetIndexes, Arcs, Lines) -> t.Dict[str, t.Any]
"""Condense target indexes, arcs and lines into a compact report.""" """Condense target indexes, arcs and lines into a compact report."""
set_indexes = {} set_indexes = {}
@ -144,11 +152,3 @@ def generate_indexes(target_indexes, data): # type: (TargetIndexes, NamedPoints
result_point.add(get_target_index(target_name, target_indexes)) result_point.add(get_target_index(target_name, target_indexes))
return results return results
class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig):
"""Configuration for the `coverage analyze targets` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsConfig, self).__init__(args)
self.info_stderr = True

@ -21,6 +21,15 @@ if t.TYPE_CHECKING:
) )
class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets combine` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsCombineConfig, self).__init__(args)
self.input_files = args.input_file # type: t.List[str]
self.output_file = args.output_file # type: str
def command_coverage_analyze_targets_combine(args): # type: (CoverageAnalyzeTargetsCombineConfig) -> None def command_coverage_analyze_targets_combine(args): # type: (CoverageAnalyzeTargetsCombineConfig) -> None
"""Combine integration test target code coverage reports.""" """Combine integration test target code coverage reports."""
combined_target_indexes = {} # type: TargetIndexes combined_target_indexes = {} # type: TargetIndexes
@ -53,12 +62,3 @@ def merge_indexes(
for covered_target_index in covered_target_indexes: for covered_target_index in covered_target_indexes:
combined_point.add(get_target_index(source_index[covered_target_index], combined_index)) combined_point.add(get_target_index(source_index[covered_target_index], combined_index))
class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets combine` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsCombineConfig, self).__init__(args)
self.input_files = args.input_file # type: t.List[str]
self.output_file = args.output_file # type: str

@ -17,6 +17,15 @@ from . import (
) )
class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets expand` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsExpandConfig, self).__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str
def command_coverage_analyze_targets_expand(args): # type: (CoverageAnalyzeTargetsExpandConfig) -> None def command_coverage_analyze_targets_expand(args): # type: (CoverageAnalyzeTargetsExpandConfig) -> None
"""Expand target names in an aggregated coverage file.""" """Expand target names in an aggregated coverage file."""
covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file)
@ -28,12 +37,3 @@ def command_coverage_analyze_targets_expand(args): # type: (CoverageAnalyzeTarg
if not args.explain: if not args.explain:
write_json_file(args.output_file, report, encoder=SortedSetEncoder) write_json_file(args.output_file, report, encoder=SortedSetEncoder)
class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets expand` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsExpandConfig, self).__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str

@ -22,6 +22,19 @@ if t.TYPE_CHECKING:
) )
class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets filter` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsFilterConfig, self).__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str
self.include_targets = args.include_targets # type: t.List[str]
self.exclude_targets = args.exclude_targets # type: t.List[str]
self.include_path = args.include_path # type: t.Optional[str]
self.exclude_path = args.exclude_path # type: t.Optional[str]
def command_coverage_analyze_targets_filter(args): # type: (CoverageAnalyzeTargetsFilterConfig) -> None def command_coverage_analyze_targets_filter(args): # type: (CoverageAnalyzeTargetsFilterConfig) -> None
"""Filter target names in an aggregated coverage file.""" """Filter target names in an aggregated coverage file."""
covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file)
@ -89,16 +102,3 @@ def filter_data(
result[src_path] = dst_points result[src_path] = dst_points
return result return result
class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets filter` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsFilterConfig, self).__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str
self.include_targets = args.include_targets # type: t.List[str]
self.exclude_targets = args.exclude_targets # type: t.List[str]
self.include_path = args.include_path # type: t.Optional[str]
self.exclude_path = args.exclude_path # type: t.Optional[str]

@ -44,25 +44,35 @@ if t.TYPE_CHECKING:
) )
class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets generate` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsGenerateConfig, self).__init__(args)
self.input_dir = args.input_dir or ResultType.COVERAGE.path # type: str
self.output_file = args.output_file # type: str
def command_coverage_analyze_targets_generate(args): # type: (CoverageAnalyzeTargetsGenerateConfig) -> None def command_coverage_analyze_targets_generate(args): # type: (CoverageAnalyzeTargetsGenerateConfig) -> None
"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line.""" """Analyze code coverage data to determine which integration test targets provide coverage for each arc or line."""
root = data_context().content.root root = data_context().content.root
target_indexes = {} target_indexes = {}
arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, target_indexes).items()) arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, args.input_dir, target_indexes).items())
lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, target_indexes).items()) lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, args.input_dir, target_indexes).items())
report = make_report(target_indexes, arcs, lines) report = make_report(target_indexes, arcs, lines)
write_report(args, report, args.output_file) write_report(args, report, args.output_file)
def analyze_python_coverage( def analyze_python_coverage(
args, # type: CoverageAnalyzeTargetsConfig args, # type: CoverageAnalyzeTargetsGenerateConfig
path, # type: str
target_indexes, # type: TargetIndexes target_indexes, # type: TargetIndexes
): # type: (...) -> Arcs ): # type: (...) -> Arcs
"""Analyze Python code coverage.""" """Analyze Python code coverage."""
results = {} # type: Arcs results = {} # type: Arcs
collection_search_re, collection_sub_re = get_collection_path_regexes() collection_search_re, collection_sub_re = get_collection_path_regexes()
modules = get_python_modules() modules = get_python_modules()
python_files = get_python_coverage_files() python_files = get_python_coverage_files(path)
coverage = initialize_coverage(args) coverage = initialize_coverage(args)
for python_file in python_files: for python_file in python_files:
@ -85,13 +95,14 @@ def analyze_python_coverage(
def analyze_powershell_coverage( def analyze_powershell_coverage(
args, # type: CoverageAnalyzeTargetsConfig args, # type: CoverageAnalyzeTargetsGenerateConfig
path, # type: str
target_indexes, # type: TargetIndexes target_indexes, # type: TargetIndexes
): # type: (...) -> Lines ): # type: (...) -> Lines
"""Analyze PowerShell code coverage""" """Analyze PowerShell code coverage"""
results = {} # type: Lines results = {} # type: Lines
collection_search_re, collection_sub_re = get_collection_path_regexes() collection_search_re, collection_sub_re = get_collection_path_regexes()
powershell_files = get_powershell_coverage_files() powershell_files = get_powershell_coverage_files(path)
for powershell_file in powershell_files: for powershell_file in powershell_files:
if not is_integration_coverage_file(powershell_file): if not is_integration_coverage_file(powershell_file):
@ -113,7 +124,7 @@ def analyze_powershell_coverage(
def prune_invalid_filenames( def prune_invalid_filenames(
args, # type: CoverageAnalyzeTargetsConfig args, # type: CoverageAnalyzeTargetsGenerateConfig
results, # type: t.Dict[str, t.Any] results, # type: t.Dict[str, t.Any]
collection_search_re=None, # type: t.Optional[str] collection_search_re=None, # type: t.Optional[str]
): # type: (...) -> None ): # type: (...) -> None
@ -133,12 +144,3 @@ def get_target_name(path): # type: (str) -> str
def is_integration_coverage_file(path): # type: (str) -> bool def is_integration_coverage_file(path): # type: (str) -> bool
"""Returns True if the coverage file came from integration tests, otherwise False.""" """Returns True if the coverage file came from integration tests, otherwise False."""
return os.path.basename(path).split('=')[0] in ('integration', 'windows-integration', 'network-integration') return os.path.basename(path).split('=')[0] in ('integration', 'windows-integration', 'network-integration')
class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets generate` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsGenerateConfig, self).__init__(args)
self.input_dir = args.input_dir or ResultType.COVERAGE.path # type: str
self.output_file = args.output_file # type: str

@ -25,6 +25,19 @@ if t.TYPE_CHECKING:
) )
class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets missing` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsMissingConfig, self).__init__(args)
self.from_file = args.from_file # type: str
self.to_file = args.to_file # type: str
self.output_file = args.output_file # type: str
self.only_gaps = args.only_gaps # type: bool
self.only_exists = args.only_exists # type: bool
def command_coverage_analyze_targets_missing(args): # type: (CoverageAnalyzeTargetsMissingConfig) -> None def command_coverage_analyze_targets_missing(args): # type: (CoverageAnalyzeTargetsMissingConfig) -> None
"""Identify aggregated coverage in one file missing from another.""" """Identify aggregated coverage in one file missing from another."""
from_targets, from_path_arcs, from_path_lines = read_report(args.from_file) from_targets, from_path_arcs, from_path_lines = read_report(args.from_file)
@ -94,16 +107,3 @@ def find_missing(
target_index.update(get_target_index(name, target_indexes) for name in remaining_targets) target_index.update(get_target_index(name, target_indexes) for name in remaining_targets)
return target_data return target_data
class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets missing` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsMissingConfig, self).__init__(args)
self.from_file = args.from_file # type: str
self.to_file = args.to_file # type: str
self.output_file = args.output_file # type: str
self.only_gaps = args.only_gaps # type: bool
self.only_exists = args.only_exists # type: bool

Loading…
Cancel
Save