You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/test/runner/lib/target.py

675 lines
21 KiB
Python

"""Test target identification, iteration and inclusion/exclusion."""
from __future__ import absolute_import, print_function
import collections
import os
import re
import errno
import itertools
import abc
import sys
from lib.util import (
ApplicationError,
display,
read_lines_without_comments,
)
# File extensions which mark a file as a module source (Python and PowerShell).
MODULE_EXTENSIONS = '.py', '.ps1'
def find_target_completion(target_func, prefix):
    """
    Produce shell completion candidates for a partially typed target name.

    Any exception raised while collecting or matching targets is reported
    as a single-item list containing the error text instead of propagating.

    :type target_func: () -> collections.Iterable[CompletionTarget]
    :type prefix: unicode
    :rtype: tuple[str] | list[str]
    """
    try:
        candidates = target_func()

        running_python2 = sys.version_info[0] == 2

        if running_python2:
            prefix = prefix.encode()

        # bash reports COMP_TYPE 63 for double tab completion
        double_tab = os.environ.get('COMP_TYPE') == '63'

        return walk_completion_targets(candidates, prefix, double_tab)
    except Exception as error:  # pylint: disable=locally-disabled, broad-except
        return [u'%s' % error]
def walk_completion_targets(targets, prefix, short=False):
    """
    Collect the aliases which complete the given prefix, descending at most one path level.

    :type targets: collections.Iterable[CompletionTarget]
    :type prefix: str
    :type short: bool
    :rtype: tuple[str]
    """
    known_aliases = set()

    for target in targets:
        known_aliases.update(target.aliases)

    # a fully typed directory prefix should complete to its contents, not to itself
    if prefix.endswith('/'):
        known_aliases.discard(prefix)

    prefix_length = len(prefix)
    matches = []

    for alias in known_aliases:
        if not alias.startswith(prefix):
            continue

        # the last character is ignored so a directory alias may keep its trailing slash
        if '/' in alias[prefix_length:-1]:
            continue

        matches.append(alias)

    if short:
        offset = len(os.path.dirname(prefix))

        if offset:
            offset += 1  # also skip the separator which follows the directory name
            relative_matches = [match[offset:] for match in matches if len(match) > offset]

            # the shortened form is only used when more than one match remains
            if len(relative_matches) > 1:
                matches = relative_matches

    return tuple(sorted(matches))
def walk_internal_targets(targets, includes=None, excludes=None, requires=None):
    """
    Select the targets matching the include, require and exclude patterns, sorted by name.

    An error is raised (by filter_targets) when any include, require or exclude pattern
    matches none of the given targets.

    :type targets: collections.Iterable[T <= CompletionTarget]
    :type includes: list[str]
    :type excludes: list[str]
    :type requires: list[str]
    :rtype: tuple[T <= CompletionTarget]
    """
    all_targets = tuple(targets)

    def by_name(target):
        """Sort key: the target name."""
        return target.name

    selected = list(filter_targets(all_targets, includes, errors=True, directories=False))
    selected.sort(key=by_name)

    if requires:
        required = set(filter_targets(all_targets, requires, errors=True, directories=False))
        selected = [target for target in selected if target in required]

    if excludes:
        # run against the complete target list purely to report exclude patterns which match nothing
        for _ in filter_targets(all_targets, excludes, errors=True, include=False, directories=False):
            pass

    remaining = set(filter_targets(selected, excludes, errors=False, include=False, directories=False))

    return tuple(sorted(remaining, key=by_name))
def walk_external_targets(targets, includes=None, excludes=None, requires=None):
    """
    Resolve include and exclude patterns into two tuples of targets, each sorted by name.

    Consecutive directory targets sharing a name are merged into one target
    whose modules are the sorted union of the merged targets' modules.
    An error is raised (by filter_targets) when a pattern matches no target.

    :type targets: collections.Iterable[CompletionTarget]
    :type includes: list[str]
    :type excludes: list[str]
    :type requires: list[str]
    :rtype: tuple[CompletionTarget], tuple[CompletionTarget]
    """
    targets = tuple(targets)

    def sorted_matches(patterns):
        """Return the targets matching the patterns, sorted by name."""
        return sorted(filter_targets(targets, patterns, errors=True), key=lambda t: t.name)

    if requires:
        include_targets = list(filter_targets(targets, includes, errors=True, directories=False))
        require_targets = set(filter_targets(targets, requires, errors=True, directories=False))
        includes = [target.name for target in include_targets if target in require_targets]

        # an empty pattern list would match every target, so it must not reach filter_targets
        include_targets = sorted_matches(includes) if includes else []
    else:
        include_targets = sorted_matches(includes)

    exclude_targets = sorted_matches(excludes) if excludes else []

    return _merge_directory_targets(include_targets), _merge_directory_targets(exclude_targets)


def _merge_directory_targets(sorted_targets):
    """
    Collapse runs of directory targets with the same name into a single target.

    The merged modules are sorted so the result does not depend on set iteration order,
    which keeps the repr, equality and hash of the merged target stable between runs.

    :type sorted_targets: list[CompletionTarget]
    :rtype: tuple[CompletionTarget]
    """
    merged = []
    previous = None

    for target in sorted_targets:
        if isinstance(previous, DirectoryTarget) and isinstance(target, DirectoryTarget) \
                and previous.name == target.name:
            previous.modules = tuple(sorted(set(previous.modules) | set(target.modules)))
        else:
            merged.append(target)
            previous = target

    return tuple(merged)
def filter_targets(targets, patterns, include=True, directories=True, errors=True):
    """
    Yield the targets whose aliases match (include) or do not match (exclude) the given patterns.

    Each pattern is a regular expression which must match a complete alias.
    When no patterns are given, every target is yielded.
    When directories is set and a directory alias matched, a DirectoryTarget is yielded in place of the target.
    When errors is set, TargetPatternsNotMatched is raised after iteration if any pattern matched nothing.

    :type targets: collections.Iterable[CompletionTarget]
    :type patterns: list[str]
    :type include: bool
    :type directories: bool
    :type errors: bool
    :rtype: collections.Iterable[CompletionTarget]
    """
    unmatched = set(patterns or ())
    compiled_patterns = dict((p, re.compile('^%s$' % p)) for p in patterns) if patterns else None

    def directory_for(candidate, directory_alias):
        """Pick the directory to report: the base path when it is longer than the alias, else the alias."""
        base_path = candidate.base_path

        if base_path and len(base_path) > len(directory_alias):
            return base_path

        return directory_alias

    for target in targets:
        matched_directories = set()
        match = False

        if patterns:
            for alias in target.aliases:
                is_directory = alias.endswith('/')

                for pattern in patterns:
                    if not compiled_patterns[pattern].match(alias):
                        continue

                    match = True
                    unmatched.discard(pattern)

                    if is_directory:
                        matched_directories.add(directory_for(target, alias))
        elif include:
            match = True

            if not target.base_path:
                matched_directories.add('.')

            matched_directories.update(directory_for(target, alias) for alias in target.aliases if alias.endswith('/'))

        if match != include:
            continue

        if directories and matched_directories:
            # report the shortest matched directory
            yield DirectoryTarget(min(matched_directories, key=len), target.modules)
        else:
            yield target

    if errors and unmatched:
        raise TargetPatternsNotMatched(unmatched)
def walk_module_targets():
    """
    Iterate over the test targets beneath lib/ansible/modules which have a module name.

    :rtype: collections.Iterable[TestTarget]
    """
    path = 'lib/ansible/modules'
    candidates = walk_test_targets(path, path + '/', extensions=MODULE_EXTENSIONS)

    for candidate in candidates:
        if candidate.module:
            yield candidate
def walk_units_targets():
    """
    Iterate over the Python files beneath test/units whose names start with 'test_'.

    :rtype: collections.Iterable[TestTarget]
    """
    options = dict(
        path='test/units',
        module_path='test/units/modules/',
        extensions=('.py',),
        prefix='test_',
    )

    return walk_test_targets(**options)
def walk_compile_targets():
    """
    Iterate over the Python files found from the current directory, plus the regular files in bin.

    :rtype: collections.Iterable[TestTarget]
    """
    python_only = ('.py',)
    additional_dirs = ('bin',)

    return walk_test_targets(module_path='lib/ansible/modules/', extensions=python_only, extra_dirs=additional_dirs)
def walk_sanity_targets():
    """
    Iterate over the files found from the current directory, without extension or prefix filtering.

    :rtype: collections.Iterable[TestTarget]
    """
    module_root = 'lib/ansible/modules/'

    return walk_test_targets(module_path=module_root)
def walk_posix_integration_targets(include_hidden=False):
    """
    Iterate over the integration targets carrying the 'posix/' alias.

    Targets carrying the 'hidden/posix/' alias are included as well when include_hidden is set.

    :type include_hidden: bool
    :rtype: collections.Iterable[IntegrationTarget]
    """
    for target in walk_integration_targets():
        aliases = target.aliases

        if 'posix/' in aliases:
            yield target
        elif include_hidden and 'hidden/posix/' in aliases:
            yield target
def walk_network_integration_targets(include_hidden=False):
    """
    Iterate over the integration targets carrying the 'network/' alias.

    Targets carrying the 'hidden/network/' alias are included as well when include_hidden is set.

    :type include_hidden: bool
    :rtype: collections.Iterable[IntegrationTarget]
    """
    for target in walk_integration_targets():
        aliases = target.aliases

        if 'network/' in aliases:
            yield target
        elif include_hidden and 'hidden/network/' in aliases:
            yield target
def walk_windows_integration_targets(include_hidden=False):
    """
    Iterate over the integration targets carrying the 'windows/' alias.

    Targets carrying the 'hidden/windows/' alias are included as well when include_hidden is set.

    :type include_hidden: bool
    :rtype: collections.Iterable[IntegrationTarget]
    """
    for target in walk_integration_targets():
        aliases = target.aliases

        if 'windows/' in aliases:
            yield target
        elif include_hidden and 'hidden/windows/' in aliases:
            yield target
def walk_integration_targets():
    """
    Iterate over the integration targets: one per directory in test/integration/targets, in sorted path order.

    :rtype: collections.Iterable[IntegrationTarget]
    """
    targets_root = 'test/integration/targets'
    module_names = frozenset(target.module for target in walk_module_targets())

    candidate_paths = [os.path.join(targets_root, entry) for entry in os.listdir(targets_root)]
    candidate_paths.sort()

    prefixes = load_integration_prefixes()

    for candidate_path in candidate_paths:
        # plain files in the targets directory are not targets
        if not os.path.isdir(candidate_path):
            continue

        yield IntegrationTarget(candidate_path, module_names, prefixes)
def load_integration_prefixes():
    """
    Read every test/integration/target-prefixes.* file into one mapping.

    Each line of a file becomes a key, mapped to that file's extension (without the dot).
    Files are read in sorted name order, so a later file wins when a line appears in several files.

    :rtype: dict[str, str]
    """
    base_dir = 'test/integration'
    prefixes = {}

    for file_name in sorted(os.listdir(base_dir)):
        stem, extension = os.path.splitext(file_name)

        if stem != 'target-prefixes':
            continue

        group = extension[1:]

        with open(os.path.join(base_dir, file_name), 'r') as prefix_fd:
            for line in prefix_fd.read().splitlines():
                prefixes[line] = group

    return prefixes
def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None, extra_dirs=None):
    """
    Walk a directory tree (the current directory by default) and yield a test target for each accepted file.

    Files are skipped when their name starts with a dot, when their extension is not one of the
    given extensions, or when their name lacks the given prefix. Symlinks are skipped, with one exception.
    Regular, non-symlink files directly inside each of extra_dirs are yielded afterwards without filtering.

    :type path: str | None
    :type module_path: str | None
    :type extensions: tuple[str] | None
    :type prefix: str | None
    :type extra_dirs: tuple[str] | None
    :rtype: collections.Iterable[TestTarget]
    """
    search_root = path or '.'

    for root, _, file_names in os.walk(search_root, topdown=False):
        if root.endswith('/__pycache__') or '/.tox/' in root:
            continue

        if path is None:
            root = root[2:]  # drop the leading './' produced by walking '.'

        if root.startswith('.') and root != '.github':
            continue

        for file_name in file_names:
            name, ext = os.path.splitext(os.path.basename(file_name))

            rejected = (
                name.startswith('.')
                or (extensions and ext not in extensions)
                or (prefix and not name.startswith(prefix))
            )

            if rejected:
                continue

            file_path = os.path.join(root, file_name)

            # special case to allow a symlink of ansible_release.py -> ../release.py
            if os.path.islink(file_path) and file_path != 'lib/ansible/module_utils/ansible_release.py':
                continue

            yield TestTarget(file_path, module_path, prefix, path)

    for extra_dir in extra_dirs or ():
        for file_name in os.listdir(extra_dir):
            file_path = os.path.join(extra_dir, file_name)

            if not os.path.isfile(file_path) or os.path.islink(file_path):
                continue

            yield TestTarget(file_path, module_path, prefix, path)
def analyze_integration_target_dependencies(integration_targets):
    """
    Map each target name to the names of the targets which depend on it, directly or indirectly.

    Dependencies are collected from setup and needs/target declarations, from symlinks pointing
    into another target's directory, and from mentions of hidden role names in files under meta/.

    :type integration_targets: list[IntegrationTarget]
    :rtype: dict[str,set[str]]
    """
    real_target_root = os.path.realpath('test/integration/targets') + '/'

    hidden_role_target_names = set(
        target.name for target in integration_targets if target.type == 'role' and 'hidden/' in target.aliases)

    dependencies = collections.defaultdict(set)

    # handle setup dependencies
    for target in integration_targets:
        for setup_target_name in target.setup_always + target.setup_once:
            dependencies[setup_target_name].add(target.name)

    # handle target dependencies
    for target in integration_targets:
        for need_target in target.needs_target:
            dependencies[need_target].add(target.name)

    # handle symlink dependencies between targets
    # this use case is supported, but discouraged
    for target in integration_targets:
        for root, _dummy, file_names in os.walk(target.path):
            for name in file_names:
                path = os.path.join(root, name)

                if not os.path.islink(path):
                    continue

                real_link_path = os.path.realpath(path)

                if not real_link_path.startswith(real_target_root):
                    continue

                # the first path component beneath the targets root names the linked target
                link_target = real_link_path[len(real_target_root):].split('/')[0]

                if link_target != target.name:
                    dependencies[link_target].add(target.name)

    # intentionally primitive analysis of role meta to avoid a dependency on pyyaml
    # script based targets are scanned as they may execute a playbook with role dependencies
    for target in integration_targets:
        meta_dir = os.path.join(target.path, 'meta')

        if not os.path.isdir(meta_dir):
            continue

        for meta_path in sorted(os.path.join(meta_dir, name) for name in os.listdir(meta_dir)):
            if not os.path.exists(meta_path):
                continue

            with open(meta_path, 'r') as meta_fd:
                meta_lines = meta_fd.read().splitlines()

            for meta_line in meta_lines:
                is_comment = re.search(r'^ *#.*$', meta_line)

                if is_comment or not meta_line.strip():
                    continue

                for hidden_target_name in hidden_role_target_names:
                    if hidden_target_name in meta_line:
                        dependencies[hidden_target_name].add(target.name)

    # propagate indirect dependencies until a full pass adds nothing new
    changed = True

    while changed:
        changed = False

        for dependent_target_names in dependencies.values():
            for dependent_target_name in list(dependent_target_names):
                inherited = dependencies.get(dependent_target_name)

                if not inherited:
                    continue

                missing = [name for name in inherited if name not in dependent_target_names]

                if missing:
                    dependent_target_names.update(missing)
                    changed = True

    for target_name in sorted(dependencies):
        consumers = dependencies[target_name]

        if consumers:
            display.info('%s:' % target_name, verbosity=4)

            for consumer in sorted(consumers):
                display.info(' %s' % consumer, verbosity=4)

    return dependencies
class CompletionTarget(object):
    """
    Command-line argument completion target base class.

    Targets are compared for equality and hashed by their repr (name plus modules)
    and ordered by name.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        self.name = None
        self.path = None
        self.base_path = None
        self.modules = tuple()
        self.aliases = tuple()

    def __eq__(self, other):
        if isinstance(other, CompletionTarget):
            return self.__repr__() == other.__repr__()

        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        # let Python try the reflected operation instead of failing on a missing attribute
        if not isinstance(other, CompletionTarget):
            return NotImplemented

        return self.name < other.name

    def __gt__(self, other):
        # let Python try the reflected operation instead of failing on a missing attribute
        if not isinstance(other, CompletionTarget):
            return NotImplemented

        return self.name > other.name

    def __hash__(self):
        return hash(self.__repr__())

    def __repr__(self):
        if self.modules:
            return '%s (%s)' % (self.name, ', '.join(self.modules))

        # formatting guarantees a string result even while the name is still unset
        return '%s' % (self.name,)
class DirectoryTarget(CompletionTarget):
    """Directory target, named after its path."""
    def __init__(self, path, modules):
        """
        :type path: str
        :type modules: tuple[str]
        """
        super(DirectoryTarget, self).__init__()

        # the path doubles as the name
        self.name = self.path = path
        self.modules = modules
class TestTarget(CompletionTarget):
    """Generic test target backed by a single file path."""
    def __init__(self, path, module_path, module_prefix, base_path):
        """
        :type path: str
        :type module_path: str | None
        :type module_prefix: str | None
        :type base_path: str
        """
        super(TestTarget, self).__init__()

        self.name = path
        self.path = path
        self.base_path = base_path + '/' if base_path else None

        stem, extension = os.path.splitext(os.path.basename(path))

        # only non-__init__ files with a module extension beneath module_path are treated as modules
        is_module = module_path and path.startswith(module_path) and stem != '__init__' and extension in MODULE_EXTENSIONS

        if is_module:
            # the module name is the file name without the prefix and any leading underscores
            self.module = stem[len(module_prefix or ''):].lstrip('_')
            self.modules = (self.module,)
        else:
            self.module = None
            self.modules = tuple()

        # the target can be referenced by its path, its module name and each of its parent directories
        segments = path.split('/')
        directory_aliases = ['%s/' % '/'.join(segments[:depth]) for depth in range(1, len(segments))]
        candidates = [self.path, self.module] + directory_aliases

        self.aliases = tuple(sorted(alias for alias in candidates if alias))
class IntegrationTarget(CompletionTarget):
    """Integration test target backed by a directory."""
    non_posix = frozenset((
        'network',
        'windows',
    ))

    categories = frozenset(non_posix | frozenset((
        'posix',
        'module',
        'needs',
        'skip',
    )))

    def __init__(self, path, modules, prefixes):
        """
        :type path: str
        :type modules: frozenset[str]
        :type prefixes: dict[str, str]
        """
        super(IntegrationTarget, self).__init__()

        self.name = os.path.basename(path)
        self.path = path

        # script_path and type
        entries = sorted(os.listdir(path))
        stems = [os.path.splitext(entry)[0] for entry in entries]
        runme_files = [entry for entry, stem in zip(entries, stems) if stem == 'runme']

        if runme_files:
            self.type = 'script'
            self.script_path = os.path.join(path, runme_files[0])
        else:
            self.script_path = None

            if 'test' in stems:
                self.type = 'special'
            else:
                # this covers directories without tasks or defaults too:
                # ansible will consider these empty roles, so ansible-test should as well
                self.type = 'role'

        # static_aliases
        aliases_path = os.path.join(path, 'aliases')

        try:
            static_aliases = tuple(read_lines_without_comments(aliases_path, remove_blank_lines=True))
        except IOError as ex:
            if ex.errno == errno.ENOENT:
                static_aliases = tuple()  # a missing aliases file means there are no static aliases
            else:
                raise

        # modules
        if self.name in modules:
            module_name = self.name
        elif self.name.startswith('win_') and self.name[4:] in modules:
            module_name = self.name[4:]
        else:
            module_name = None

        module_candidates = static_aliases + (module_name,)
        self.modules = tuple(sorted(candidate for candidate in module_candidates if candidate in modules))

        # groups
        groups = [self.type]
        groups.extend(alias for alias in static_aliases if alias not in modules)
        groups.extend('module/%s' % module for module in self.modules)

        if not self.modules:
            groups.append('non_module')

        if 'destructive' not in groups:
            groups.append('non_destructive')

        prefix = self.name[:self.name.find('_')] if '_' in self.name else None

        if prefix in prefixes:
            group = prefixes[prefix]

            if group != prefix:
                group = '%s/%s' % (group, prefix)

            groups.append(group)

        if self.name.startswith('win_'):
            groups.append('windows')

        if self.name.startswith('connection_'):
            groups.append('connection')

        if self.name.startswith(('setup_', 'prepare_')):
            groups.append('hidden')

        if self.type not in ('script', 'role'):
            groups.append('hidden')

        # Collect file paths before group expansion to avoid including the directories.
        # Ignore references to test targets, as those must be defined using `needs/target/*` or other target references.
        needed_files = set(
            '/'.join(group.split('/')[2:]) for group in groups
            if group.startswith('needs/file/') and not group.startswith('needs/file/test/integration/targets/'))

        self.needs_file = tuple(sorted(needed_files))

        # expand each nested group with all of its parent groups, e.g. 'a/b/c' adds 'a' and 'a/b'
        parent_groups = []

        for group in groups:
            parts = group.split('/')

            for depth in range(1, len(parts)):
                parent_groups.append('/'.join(parts[:depth]))

        groups.extend(parent_groups)

        # a target which is not in any of the non-posix groups is a posix target
        if self.non_posix.isdisjoint(groups):
            groups.append('posix')

        # aliases
        aliases = [self.name]
        aliases.extend('%s/' % group for group in groups)
        aliases.extend('%s/%s' % (group, self.name) for group in groups if group not in self.categories)

        if 'hidden/' in aliases:
            # hidden targets are only reachable through aliases beneath 'hidden/'
            aliases = ['hidden/'] + ['hidden/%s' % alias for alias in aliases if not alias.startswith('hidden/')]

        self.aliases = tuple(sorted(set(aliases)))

        # configuration
        def third_segments(marker):
            """Return the sorted, de-duplicated third path segment of each group starting with the marker."""
            return tuple(sorted(set(group.split('/')[2] for group in groups if group.startswith(marker))))

        self.setup_once = third_segments('setup/once/')
        self.setup_always = third_segments('setup/always/')
        self.needs_target = third_segments('needs/target/')
class TargetPatternsNotMatched(ApplicationError):
    """One or more targets were not matched when a match was required."""
    def __init__(self, patterns):
        """
        :type patterns: set[str]
        """
        ordered = sorted(patterns)
        self.patterns = ordered

        # one pattern is reported on a single line, several patterns are listed one per line
        if len(patterns) <= 1:
            message = 'Target pattern not matched: %s' % ordered[0]
        else:
            message = 'Target patterns not matched:\n%s' % '\n'.join(ordered)

        super(TargetPatternsNotMatched, self).__init__(message)