Reorganize more ansible-test code. (#74611)

* Split out shell command.
* Relocate ansible-test integration code.
pull/74612/head
Matt Clay 4 years ago committed by GitHub
parent 065fc3ca17
commit e6d7aecbe4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,2 @@
minor_changes:
- ansible-test - Reorganize integration test implementation by command.

@ -0,0 +1,2 @@
minor_changes:
- ansible-test - Split out shell command implementation.

@ -22,6 +22,7 @@ from .util import (
read_lines_without_comments,
MAXFD,
ANSIBLE_TEST_DATA_ROOT,
SUPPORTED_PYTHON_VERSIONS,
)
from .delegation import (
@ -30,17 +31,28 @@ from .delegation import (
)
from .executor import (
command_posix_integration,
command_network_integration,
command_windows_integration,
command_shell,
SUPPORTED_PYTHON_VERSIONS,
ApplicationWarning,
Delegate,
generate_pip_install,
configure_pypi_proxy,
)
from .commands.integration.posix import (
command_posix_integration,
)
from .commands.integration.network import (
command_network_integration,
)
from .commands.integration.windows import (
command_windows_integration,
)
from .commands.shell import (
command_shell,
)
from .config import (
PosixIntegrationConfig,
WindowsIntegrationConfig,

@ -3,10 +3,17 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import contextlib
import datetime
import difflib
import filecmp
import json
import os
import random
import re
import shutil
import string
import tempfile
import time
from ... import types as t
@ -14,9 +21,29 @@ from ...encoding import (
to_bytes,
)
from ...ansible_util import (
ansible_environment,
check_pyyaml,
)
from ...executor import (
get_python_version,
get_changes_filter,
AllTargetsSkipped,
Delegate,
install_command_requirements,
)
from ...ci import (
get_ci_provider,
)
from ...target import (
analyze_integration_target_dependencies,
walk_integration_targets,
IntegrationTarget,
walk_internal_targets,
TIntegrationTarget,
)
from ...config import (
@ -24,12 +51,14 @@ from ...config import (
NetworkIntegrationConfig,
PosixIntegrationConfig,
WindowsIntegrationConfig,
TIntegrationConfig,
)
from ...io import (
make_dirs,
write_text_file,
read_text_file,
open_text_file,
)
from ...util import (
@ -39,11 +68,23 @@ from ...util import (
MODE_DIRECTORY,
MODE_DIRECTORY_WRITE,
MODE_FILE,
SubprocessError,
remove_tree,
find_executable,
raw_command,
ANSIBLE_TEST_DATA_ROOT,
SUPPORTED_PYTHON_VERSIONS,
get_hash,
)
from ...util_common import (
named_temporary_file,
ResultType,
get_docker_completion,
get_remote_completion,
intercept_command,
run_command,
write_json_test_results,
)
from ...coverage_util import (
@ -56,6 +97,10 @@ from ...cache import (
from .cloud import (
CloudEnvironmentConfig,
cloud_filter,
cloud_init,
get_cloud_environment,
get_cloud_platforms,
)
from ...data import (
@ -322,6 +367,726 @@ def integration_test_config_file(args, env_config, integration_dir):
yield path
def get_integration_filter(args, targets):
"""
:type args: IntegrationConfig
:type targets: tuple[IntegrationTarget]
:rtype: list[str]
"""
if args.docker:
return get_integration_docker_filter(args, targets)
if args.remote:
return get_integration_remote_filter(args, targets)
return get_integration_local_filter(args, targets)
def common_integration_filter(args, targets, exclude):
"""
:type args: IntegrationConfig
:type targets: tuple[IntegrationTarget]
:type exclude: list[str]
"""
override_disabled = set(target for target in args.include if target.startswith('disabled/'))
if not args.allow_disabled:
skip = 'disabled/'
override = [target.name for target in targets if override_disabled & set(target.aliases)]
skipped = [target.name for target in targets if skip in target.aliases and target.name not in override]
if skipped:
exclude.extend(skipped)
display.warning('Excluding tests marked "%s" which require --allow-disabled or prefixing with "disabled/": %s'
% (skip.rstrip('/'), ', '.join(skipped)))
override_unsupported = set(target for target in args.include if target.startswith('unsupported/'))
if not args.allow_unsupported:
skip = 'unsupported/'
override = [target.name for target in targets if override_unsupported & set(target.aliases)]
skipped = [target.name for target in targets if skip in target.aliases and target.name not in override]
if skipped:
exclude.extend(skipped)
display.warning('Excluding tests marked "%s" which require --allow-unsupported or prefixing with "unsupported/": %s'
% (skip.rstrip('/'), ', '.join(skipped)))
override_unstable = set(target for target in args.include if target.startswith('unstable/'))
if args.allow_unstable_changed:
override_unstable |= set(args.metadata.change_description.focused_targets or [])
if not args.allow_unstable:
skip = 'unstable/'
override = [target.name for target in targets if override_unstable & set(target.aliases)]
skipped = [target.name for target in targets if skip in target.aliases and target.name not in override]
if skipped:
exclude.extend(skipped)
display.warning('Excluding tests marked "%s" which require --allow-unstable or prefixing with "unstable/": %s'
% (skip.rstrip('/'), ', '.join(skipped)))
# only skip a Windows test if using --windows and all the --windows versions are defined in the aliases as skip/windows/%s
if isinstance(args, WindowsIntegrationConfig) and args.windows:
all_skipped = []
not_skipped = []
for target in targets:
if "skip/windows/" not in target.aliases:
continue
skip_valid = []
skip_missing = []
for version in args.windows:
if "skip/windows/%s/" % version in target.aliases:
skip_valid.append(version)
else:
skip_missing.append(version)
if skip_missing and skip_valid:
not_skipped.append((target.name, skip_valid, skip_missing))
elif skip_valid:
all_skipped.append(target.name)
if all_skipped:
exclude.extend(all_skipped)
skip_aliases = ["skip/windows/%s/" % w for w in args.windows]
display.warning('Excluding tests marked "%s" which are set to skip with --windows %s: %s'
% ('", "'.join(skip_aliases), ', '.join(args.windows), ', '.join(all_skipped)))
if not_skipped:
for target, skip_valid, skip_missing in not_skipped:
# warn when failing to skip due to lack of support for skipping only some versions
display.warning('Including test "%s" which was marked to skip for --windows %s but not %s.'
% (target, ', '.join(skip_valid), ', '.join(skip_missing)))
def get_integration_local_filter(args, targets):
"""
:type args: IntegrationConfig
:type targets: tuple[IntegrationTarget]
:rtype: list[str]
"""
exclude = []
common_integration_filter(args, targets, exclude)
if not args.allow_root and os.getuid() != 0:
skip = 'needs/root/'
skipped = [target.name for target in targets if skip in target.aliases]
if skipped:
exclude.append(skip)
display.warning('Excluding tests marked "%s" which require --allow-root or running as root: %s'
% (skip.rstrip('/'), ', '.join(skipped)))
override_destructive = set(target for target in args.include if target.startswith('destructive/'))
if not args.allow_destructive:
skip = 'destructive/'
override = [target.name for target in targets if override_destructive & set(target.aliases)]
skipped = [target.name for target in targets if skip in target.aliases and target.name not in override]
if skipped:
exclude.extend(skipped)
display.warning('Excluding tests marked "%s" which require --allow-destructive or prefixing with "destructive/" to run locally: %s'
% (skip.rstrip('/'), ', '.join(skipped)))
exclude_targets_by_python_version(targets, args.python_version, exclude)
return exclude
def get_integration_docker_filter(args, targets):
"""
:type args: IntegrationConfig
:type targets: tuple[IntegrationTarget]
:rtype: list[str]
"""
exclude = []
common_integration_filter(args, targets, exclude)
skip = 'skip/docker/'
skipped = [target.name for target in targets if skip in target.aliases]
if skipped:
exclude.append(skip)
display.warning('Excluding tests marked "%s" which cannot run under docker: %s'
% (skip.rstrip('/'), ', '.join(skipped)))
if not args.docker_privileged:
skip = 'needs/privileged/'
skipped = [target.name for target in targets if skip in target.aliases]
if skipped:
exclude.append(skip)
display.warning('Excluding tests marked "%s" which require --docker-privileged to run under docker: %s'
% (skip.rstrip('/'), ', '.join(skipped)))
python_version = get_python_version(args, get_docker_completion(), args.docker_raw)
exclude_targets_by_python_version(targets, python_version, exclude)
return exclude
def get_integration_remote_filter(args, targets):
"""
:type args: IntegrationConfig
:type targets: tuple[IntegrationTarget]
:rtype: list[str]
"""
remote = args.parsed_remote
exclude = []
common_integration_filter(args, targets, exclude)
skips = {
'skip/%s' % remote.platform: remote.platform,
'skip/%s/%s' % (remote.platform, remote.version): '%s %s' % (remote.platform, remote.version),
'skip/%s%s' % (remote.platform, remote.version): '%s %s' % (remote.platform, remote.version), # legacy syntax, use above format
}
if remote.arch:
skips.update({
'skip/%s/%s' % (remote.arch, remote.platform): '%s on %s' % (remote.platform, remote.arch),
'skip/%s/%s/%s' % (remote.arch, remote.platform, remote.version): '%s %s on %s' % (remote.platform, remote.version, remote.arch),
})
for skip, description in skips.items():
skipped = [target.name for target in targets if skip in target.skips]
if skipped:
exclude.append(skip + '/')
display.warning('Excluding tests marked "%s" which are not supported on %s: %s' % (skip, description, ', '.join(skipped)))
python_version = get_python_version(args, get_remote_completion(), args.remote)
exclude_targets_by_python_version(targets, python_version, exclude)
return exclude
def exclude_targets_by_python_version(targets, python_version, exclude):
"""
:type targets: tuple[IntegrationTarget]
:type python_version: str
:type exclude: list[str]
"""
if not python_version:
display.warning('Python version unknown. Unable to skip tests based on Python version.')
return
python_major_version = python_version.split('.')[0]
skip = 'skip/python%s/' % python_version
skipped = [target.name for target in targets if skip in target.aliases]
if skipped:
exclude.append(skip)
display.warning('Excluding tests marked "%s" which are not supported on python %s: %s'
% (skip.rstrip('/'), python_version, ', '.join(skipped)))
skip = 'skip/python%s/' % python_major_version
skipped = [target.name for target in targets if skip in target.aliases]
if skipped:
exclude.append(skip)
display.warning('Excluding tests marked "%s" which are not supported on python %s: %s'
% (skip.rstrip('/'), python_version, ', '.join(skipped)))
def command_integration_filter(args, # type: TIntegrationConfig
targets, # type: t.Iterable[TIntegrationTarget]
init_callback=None, # type: t.Callable[[TIntegrationConfig, t.Tuple[TIntegrationTarget, ...]], None]
): # type: (...) -> t.Tuple[TIntegrationTarget, ...]
"""Filter the given integration test targets."""
targets = tuple(target for target in targets if 'hidden/' not in target.aliases)
changes = get_changes_filter(args)
# special behavior when the --changed-all-target target is selected based on changes
if args.changed_all_target in changes:
# act as though the --changed-all-target target was in the include list
if args.changed_all_mode == 'include' and args.changed_all_target not in args.include:
args.include.append(args.changed_all_target)
args.delegate_args += ['--include', args.changed_all_target]
# act as though the --changed-all-target target was in the exclude list
elif args.changed_all_mode == 'exclude' and args.changed_all_target not in args.exclude:
args.exclude.append(args.changed_all_target)
require = args.require + changes
exclude = args.exclude
internal_targets = walk_internal_targets(targets, args.include, exclude, require)
environment_exclude = get_integration_filter(args, internal_targets)
environment_exclude += cloud_filter(args, internal_targets)
if environment_exclude:
exclude += environment_exclude
internal_targets = walk_internal_targets(targets, args.include, exclude, require)
if not internal_targets:
raise AllTargetsSkipped()
if args.start_at and not any(target.name == args.start_at for target in internal_targets):
raise ApplicationError('Start at target matches nothing: %s' % args.start_at)
if init_callback:
init_callback(args, internal_targets)
cloud_init(args, internal_targets)
vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path)
if os.path.exists(vars_file_src):
def integration_config_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None
"""
Add the integration config vars file to the payload file list.
This will preserve the file during delegation even if the file is ignored by source control.
"""
files.append((vars_file_src, data_context().content.integration_vars_path))
data_context().register_payload_callback(integration_config_callback)
if args.delegate:
raise Delegate(require=require, exclude=exclude)
extra_requirements = []
for cloud_platform in get_cloud_platforms(args):
extra_requirements.append('%s.cloud.%s' % (args.command, cloud_platform))
install_command_requirements(args, extra_requirements=extra_requirements)
return internal_targets
def command_integration_filtered(
args, # type: IntegrationConfig
targets, # type: t.Tuple[IntegrationTarget]
all_targets, # type: t.Tuple[IntegrationTarget]
inventory_path, # type: str
pre_target=None, # type: t.Optional[t.Callable[IntegrationTarget]]
post_target=None, # type: t.Optional[t.Callable[IntegrationTarget]]
remote_temp_path=None, # type: t.Optional[str]
):
"""Run integration tests for the specified targets."""
found = False
passed = []
failed = []
targets_iter = iter(targets)
all_targets_dict = dict((target.name, target) for target in all_targets)
setup_errors = []
setup_targets_executed = set()
for target in all_targets:
for setup_target in target.setup_once + target.setup_always:
if setup_target not in all_targets_dict:
setup_errors.append('Target "%s" contains invalid setup target: %s' % (target.name, setup_target))
if setup_errors:
raise ApplicationError('Found %d invalid setup aliases:\n%s' % (len(setup_errors), '\n'.join(setup_errors)))
check_pyyaml(args, args.python_version)
test_dir = os.path.join(ResultType.TMP.path, 'output_dir')
if not args.explain and any('needs/ssh/' in target.aliases for target in targets):
max_tries = 20
display.info('SSH service required for tests. Checking to make sure we can connect.')
for i in range(1, max_tries + 1):
try:
run_command(args, ['ssh', '-o', 'BatchMode=yes', 'localhost', 'id'], capture=True)
display.info('SSH service responded.')
break
except SubprocessError:
if i == max_tries:
raise
seconds = 3
display.warning('SSH service not responding. Waiting %d second(s) before checking again.' % seconds)
time.sleep(seconds)
start_at_task = args.start_at_task
results = {}
current_environment = None # type: t.Optional[EnvironmentDescription]
# common temporary directory path that will be valid on both the controller and the remote
# it must be common because it will be referenced in environment variables that are shared across multiple hosts
common_temp_path = '/tmp/ansible-test-%s' % ''.join(random.choice(string.ascii_letters + string.digits) for _idx in range(8))
setup_common_temp_dir(args, common_temp_path)
try:
for target in targets_iter:
if args.start_at and not found:
found = target.name == args.start_at
if not found:
continue
if args.list_targets:
print(target.name)
continue
tries = 2 if args.retry_on_error else 1
verbosity = args.verbosity
cloud_environment = get_cloud_environment(args, target)
original_environment = current_environment if current_environment else EnvironmentDescription(args)
current_environment = None
display.info('>>> Environment Description\n%s' % original_environment, verbosity=3)
try:
while tries:
tries -= 1
try:
if cloud_environment:
cloud_environment.setup_once()
run_setup_targets(args, test_dir, target.setup_once, all_targets_dict, setup_targets_executed, inventory_path, common_temp_path, False)
start_time = time.time()
if pre_target:
pre_target(target)
run_setup_targets(args, test_dir, target.setup_always, all_targets_dict, setup_targets_executed, inventory_path, common_temp_path, True)
if not args.explain:
# create a fresh test directory for each test target
remove_tree(test_dir)
make_dirs(test_dir)
try:
if target.script_path:
command_integration_script(args, target, test_dir, inventory_path, common_temp_path,
remote_temp_path=remote_temp_path)
else:
command_integration_role(args, target, start_at_task, test_dir, inventory_path,
common_temp_path, remote_temp_path=remote_temp_path)
start_at_task = None
finally:
if post_target:
post_target(target)
end_time = time.time()
results[target.name] = dict(
name=target.name,
type=target.type,
aliases=target.aliases,
modules=target.modules,
run_time_seconds=int(end_time - start_time),
setup_once=target.setup_once,
setup_always=target.setup_always,
coverage=args.coverage,
coverage_label=args.coverage_label,
python_version=args.python_version,
)
break
except SubprocessError:
if cloud_environment:
cloud_environment.on_failure(target, tries)
if not original_environment.validate(target.name, throw=False):
raise
if not tries:
raise
display.warning('Retrying test target "%s" with maximum verbosity.' % target.name)
display.verbosity = args.verbosity = 6
start_time = time.time()
current_environment = EnvironmentDescription(args)
end_time = time.time()
EnvironmentDescription.check(original_environment, current_environment, target.name, throw=True)
results[target.name]['validation_seconds'] = int(end_time - start_time)
passed.append(target)
except Exception as ex:
failed.append(target)
if args.continue_on_error:
display.error(ex)
continue
display.notice('To resume at this test target, use the option: --start-at %s' % target.name)
next_target = next(targets_iter, None)
if next_target:
display.notice('To resume after this test target, use the option: --start-at %s' % next_target.name)
raise
finally:
display.verbosity = args.verbosity = verbosity
finally:
if not args.explain:
if args.coverage:
coverage_temp_path = os.path.join(common_temp_path, ResultType.COVERAGE.name)
coverage_save_path = ResultType.COVERAGE.path
for filename in os.listdir(coverage_temp_path):
shutil.copy(os.path.join(coverage_temp_path, filename), os.path.join(coverage_save_path, filename))
remove_tree(common_temp_path)
result_name = '%s-%s.json' % (
args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0))))
data = dict(
targets=results,
)
write_json_test_results(ResultType.DATA, result_name, data)
if failed:
raise ApplicationError('The %d integration test(s) listed below (out of %d) failed. See error output above for details:\n%s' % (
len(failed), len(passed) + len(failed), '\n'.join(target.name for target in failed)))
def command_integration_script(args, target, test_dir, inventory_path, temp_path, remote_temp_path=None):
"""
:type args: IntegrationConfig
:type target: IntegrationTarget
:type test_dir: str
:type inventory_path: str
:type temp_path: str
:type remote_temp_path: str | None
"""
display.info('Running %s integration test script' % target.name)
env_config = None
if isinstance(args, PosixIntegrationConfig):
cloud_environment = get_cloud_environment(args, target)
if cloud_environment:
env_config = cloud_environment.get_environment_config()
if env_config:
display.info('>>> Environment Config\n%s' % json.dumps(dict(
env_vars=env_config.env_vars,
ansible_vars=env_config.ansible_vars,
callback_plugins=env_config.callback_plugins,
module_defaults=env_config.module_defaults,
), indent=4, sort_keys=True), verbosity=3)
with integration_test_environment(args, target, inventory_path) as test_env:
cmd = ['./%s' % os.path.basename(target.script_path)]
if args.verbosity:
cmd.append('-' + ('v' * args.verbosity))
env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config)
cwd = os.path.join(test_env.targets_dir, target.relative_path)
env.update(dict(
# support use of adhoc ansible commands in collections without specifying the fully qualified collection name
ANSIBLE_PLAYBOOK_DIR=cwd,
))
if env_config and env_config.env_vars:
env.update(env_config.env_vars)
with integration_test_config_file(args, env_config, test_env.integration_dir) as config_path:
if config_path:
cmd += ['-e', '@%s' % config_path]
module_coverage = 'non_local/' not in target.aliases
intercept_command(args, cmd, target_name=target.name, env=env, cwd=cwd, temp_path=temp_path,
remote_temp_path=remote_temp_path, module_coverage=module_coverage)
def command_integration_role(args, target, start_at_task, test_dir, inventory_path, temp_path, remote_temp_path=None):
"""
:type args: IntegrationConfig
:type target: IntegrationTarget
:type start_at_task: str | None
:type test_dir: str
:type inventory_path: str
:type temp_path: str
:type remote_temp_path: str | None
"""
display.info('Running %s integration test role' % target.name)
env_config = None
vars_files = []
variables = dict(
output_dir=test_dir,
)
if isinstance(args, WindowsIntegrationConfig):
hosts = 'windows'
gather_facts = False
variables.update(dict(
win_output_dir=r'C:\ansible_testing',
))
elif isinstance(args, NetworkIntegrationConfig):
hosts = target.network_platform
gather_facts = False
else:
hosts = 'testhost'
gather_facts = True
if not isinstance(args, NetworkIntegrationConfig):
cloud_environment = get_cloud_environment(args, target)
if cloud_environment:
env_config = cloud_environment.get_environment_config()
if env_config:
display.info('>>> Environment Config\n%s' % json.dumps(dict(
env_vars=env_config.env_vars,
ansible_vars=env_config.ansible_vars,
callback_plugins=env_config.callback_plugins,
module_defaults=env_config.module_defaults,
), indent=4, sort_keys=True), verbosity=3)
with integration_test_environment(args, target, inventory_path) as test_env:
if os.path.exists(test_env.vars_file):
vars_files.append(os.path.relpath(test_env.vars_file, test_env.integration_dir))
play = dict(
hosts=hosts,
gather_facts=gather_facts,
vars_files=vars_files,
vars=variables,
roles=[
target.name,
],
)
if env_config:
if env_config.ansible_vars:
variables.update(env_config.ansible_vars)
play.update(dict(
environment=env_config.env_vars,
module_defaults=env_config.module_defaults,
))
playbook = json.dumps([play], indent=4, sort_keys=True)
with named_temporary_file(args=args, directory=test_env.integration_dir, prefix='%s-' % target.name, suffix='.yml', content=playbook) as playbook_path:
filename = os.path.basename(playbook_path)
display.info('>>> Playbook: %s\n%s' % (filename, playbook.strip()), verbosity=3)
cmd = ['ansible-playbook', filename, '-i', os.path.relpath(test_env.inventory_path, test_env.integration_dir)]
if start_at_task:
cmd += ['--start-at-task', start_at_task]
if args.tags:
cmd += ['--tags', args.tags]
if args.skip_tags:
cmd += ['--skip-tags', args.skip_tags]
if args.diff:
cmd += ['--diff']
if isinstance(args, NetworkIntegrationConfig):
if args.testcase:
cmd += ['-e', 'testcase=%s' % args.testcase]
if args.verbosity:
cmd.append('-' + ('v' * args.verbosity))
env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config)
cwd = test_env.integration_dir
env.update(dict(
# support use of adhoc ansible commands in collections without specifying the fully qualified collection name
ANSIBLE_PLAYBOOK_DIR=cwd,
))
if env_config and env_config.env_vars:
env.update(env_config.env_vars)
env['ANSIBLE_ROLES_PATH'] = test_env.targets_dir
module_coverage = 'non_local/' not in target.aliases
intercept_command(args, cmd, target_name=target.name, env=env, cwd=cwd, temp_path=temp_path,
remote_temp_path=remote_temp_path, module_coverage=module_coverage)
def run_setup_targets(args, test_dir, target_names, targets_dict, targets_executed, inventory_path, temp_path, always):
"""
:type args: IntegrationConfig
:type test_dir: str
:type target_names: list[str]
:type targets_dict: dict[str, IntegrationTarget]
:type targets_executed: set[str]
:type inventory_path: str
:type temp_path: str
:type always: bool
"""
for target_name in target_names:
if not always and target_name in targets_executed:
continue
target = targets_dict[target_name]
if not args.explain:
# create a fresh test directory for each test target
remove_tree(test_dir)
make_dirs(test_dir)
if target.script_path:
command_integration_script(args, target, test_dir, inventory_path, temp_path)
else:
command_integration_role(args, target, None, test_dir, inventory_path, temp_path)
targets_executed.add(target_name)
def integration_environment(args, target, test_dir, inventory_path, ansible_config, env_config):
"""
:type args: IntegrationConfig
:type target: IntegrationTarget
:type test_dir: str
:type inventory_path: str
:type ansible_config: str | None
:type env_config: CloudEnvironmentConfig | None
:rtype: dict[str, str]
"""
env = ansible_environment(args, ansible_config=ansible_config)
callback_plugins = ['junit'] + (env_config.callback_plugins or [] if env_config else [])
integration = dict(
JUNIT_OUTPUT_DIR=ResultType.JUNIT.path,
ANSIBLE_CALLBACKS_ENABLED=','.join(sorted(set(callback_plugins))),
ANSIBLE_TEST_CI=args.metadata.ci_provider or get_ci_provider().code,
ANSIBLE_TEST_COVERAGE='check' if args.coverage_check else ('yes' if args.coverage else ''),
OUTPUT_DIR=test_dir,
INVENTORY_PATH=os.path.abspath(inventory_path),
)
if args.debug_strategy:
env.update(dict(ANSIBLE_STRATEGY='debug'))
if 'non_local/' in target.aliases:
if args.coverage:
display.warning('Skipping coverage reporting on Ansible modules for non-local test: %s' % target.name)
env.update(dict(ANSIBLE_TEST_REMOTE_INTERPRETER=''))
env.update(integration)
return env
class IntegrationEnvironment:
"""Details about the integration environment."""
def __init__(self, integration_dir, targets_dir, inventory_path, ansible_config, vars_file):
@ -347,3 +1112,167 @@ class IntegrationCache(CommonCache):
:rtype: dict[str, set[IntegrationTarget]]
"""
return self.get('dependency_map', lambda: generate_dependency_map(self.integration_targets))
class EnvironmentDescription:
"""Description of current running environment."""
def __init__(self, args):
"""Initialize snapshot of environment configuration.
:type args: IntegrationConfig
"""
self.args = args
if self.args.explain:
self.data = {}
return
warnings = []
versions = ['']
versions += SUPPORTED_PYTHON_VERSIONS
versions += list(set(v.split('.')[0] for v in SUPPORTED_PYTHON_VERSIONS))
version_check = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'versions.py')
python_paths = dict((v, find_executable('python%s' % v, required=False)) for v in sorted(versions))
pip_paths = dict((v, find_executable('pip%s' % v, required=False)) for v in sorted(versions))
program_versions = dict((v, self.get_version([python_paths[v], version_check], warnings)) for v in sorted(python_paths) if python_paths[v])
pip_interpreters = dict((v, self.get_shebang(pip_paths[v])) for v in sorted(pip_paths) if pip_paths[v])
known_hosts_hash = get_hash(os.path.expanduser('~/.ssh/known_hosts'))
for version in sorted(versions):
self.check_python_pip_association(version, python_paths, pip_paths, pip_interpreters, warnings)
for warning in warnings:
display.warning(warning, unique=True)
self.data = dict(
python_paths=python_paths,
pip_paths=pip_paths,
program_versions=program_versions,
pip_interpreters=pip_interpreters,
known_hosts_hash=known_hosts_hash,
warnings=warnings,
)
@staticmethod
def check_python_pip_association(version, python_paths, pip_paths, pip_interpreters, warnings):
"""
:type version: str
:param python_paths: dict[str, str]
:param pip_paths: dict[str, str]
:param pip_interpreters: dict[str, str]
:param warnings: list[str]
"""
python_label = 'Python%s' % (' %s' % version if version else '')
pip_path = pip_paths.get(version)
python_path = python_paths.get(version)
if not python_path or not pip_path:
# skip checks when either python or pip are missing for this version
return
pip_shebang = pip_interpreters.get(version)
match = re.search(r'#!\s*(?P<command>[^\s]+)', pip_shebang)
if not match:
warnings.append('A %s pip was found at "%s", but it does not have a valid shebang: %s' % (python_label, pip_path, pip_shebang))
return
pip_interpreter = os.path.realpath(match.group('command'))
python_interpreter = os.path.realpath(python_path)
if pip_interpreter == python_interpreter:
return
try:
identical = filecmp.cmp(pip_interpreter, python_interpreter)
except OSError:
identical = False
if identical:
return
warnings.append('A %s pip was found at "%s", but it uses interpreter "%s" instead of "%s".' % (
python_label, pip_path, pip_interpreter, python_interpreter))
def __str__(self):
"""
:rtype: str
"""
return json.dumps(self.data, sort_keys=True, indent=4)
def validate(self, target_name, throw):
"""
:type target_name: str
:type throw: bool
:rtype: bool
"""
current = EnvironmentDescription(self.args)
return self.check(self, current, target_name, throw)
@staticmethod
def check(original, current, target_name, throw):
"""
:type original: EnvironmentDescription
:type current: EnvironmentDescription
:type target_name: str
:type throw: bool
:rtype: bool
"""
original_json = str(original)
current_json = str(current)
if original_json == current_json:
return True
unified_diff = '\n'.join(difflib.unified_diff(
a=original_json.splitlines(),
b=current_json.splitlines(),
fromfile='original.json',
tofile='current.json',
lineterm='',
))
message = ('Test target "%s" has changed the test environment!\n'
'If these changes are necessary, they must be reverted before the test finishes.\n'
'>>> Original Environment\n'
'%s\n'
'>>> Current Environment\n'
'%s\n'
'>>> Environment Diff\n'
'%s'
% (target_name, original_json, current_json, unified_diff))
if throw:
raise ApplicationError(message)
display.error(message)
return False
@staticmethod
def get_version(command, warnings):
"""
:type command: list[str]
:type warnings: list[text]
:rtype: list[str]
"""
try:
stdout, stderr = raw_command(command, capture=True, cmd_verbosity=2)
except SubprocessError as ex:
warnings.append(u'%s' % ex)
return None # all failures are equal, we don't care why it failed, only that it did
return [line.strip() for line in ((stdout or '').strip() + (stderr or '').strip()).splitlines()]
@staticmethod
def get_shebang(path):
"""
:type path: str
:rtype: str
"""
with open_text_file(path) as script_fd:
return script_fd.readline().strip()

@ -0,0 +1,246 @@
"""Network integration testing."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import time
import textwrap
import functools
from ... import types as t
from ...thread import (
WrappedThread,
)
from ...core_ci import (
AnsibleCoreCI,
SshKey,
)
from ...manage_ci import (
ManageNetworkCI,
get_network_settings,
)
from ...io import (
write_text_file,
)
from ...util import (
ApplicationError,
display,
ANSIBLE_TEST_CONFIG_ROOT,
)
from ...util_common import (
get_python_path,
handle_layout_messages,
)
from ...target import (
IntegrationTarget,
walk_network_integration_targets,
)
from ...config import (
NetworkIntegrationConfig,
)
from . import (
command_integration_filter,
command_integration_filtered,
get_inventory_relative_path,
check_inventory,
delegate_inventory,
)
from ...data import (
data_context,
)
def command_network_integration(args):
"""
:type args: NetworkIntegrationConfig
"""
handle_layout_messages(data_context().content.integration_messages)
inventory_relative_path = get_inventory_relative_path(args)
template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, os.path.basename(inventory_relative_path)) + '.template'
if args.inventory:
inventory_path = os.path.join(data_context().content.root, data_context().content.integration_path, args.inventory)
else:
inventory_path = os.path.join(data_context().content.root, inventory_relative_path)
if args.no_temp_workdir:
# temporary solution to keep DCI tests working
inventory_exists = os.path.exists(inventory_path)
else:
inventory_exists = os.path.isfile(inventory_path)
if not args.explain and not args.platform and not inventory_exists:
raise ApplicationError(
'Inventory not found: %s\n'
'Use --inventory to specify the inventory path.\n'
'Use --platform to provision resources and generate an inventory file.\n'
'See also inventory template: %s' % (inventory_path, template_path)
)
check_inventory(args, inventory_path)
delegate_inventory(args, inventory_path)
all_targets = tuple(walk_network_integration_targets(include_hidden=True))
internal_targets = command_integration_filter(args, all_targets, init_callback=network_init)
instances = [] # type: t.List[WrappedThread]
if args.platform:
get_python_path(args, args.python_executable) # initialize before starting threads
configs = dict((config['platform_version'], config) for config in args.metadata.instance_config)
for platform_version in args.platform:
platform, version = platform_version.split('/', 1)
config = configs.get(platform_version)
if not config:
continue
instance = WrappedThread(functools.partial(network_run, args, platform, version, config))
instance.daemon = True
instance.start()
instances.append(instance)
while any(instance.is_alive() for instance in instances):
time.sleep(1)
remotes = [instance.wait_for_result() for instance in instances]
inventory = network_inventory(args, remotes)
display.info('>>> Inventory: %s\n%s' % (inventory_path, inventory.strip()), verbosity=3)
if not args.explain:
write_text_file(inventory_path, inventory)
success = False
try:
command_integration_filtered(args, internal_targets, all_targets, inventory_path)
success = True
finally:
if args.remote_terminate == 'always' or (args.remote_terminate == 'success' and success):
for instance in instances:
instance.result.stop()
def network_init(args, internal_targets): # type: (NetworkIntegrationConfig, t.Tuple[IntegrationTarget, ...]) -> None
"""Initialize platforms for network integration tests."""
if not args.platform:
return
if args.metadata.instance_config is not None:
return
platform_targets = set(a for target in internal_targets for a in target.aliases if a.startswith('network/'))
instances = [] # type: t.List[WrappedThread]
# generate an ssh key (if needed) up front once, instead of for each instance
SshKey(args)
for platform_version in args.platform:
platform, version = platform_version.split('/', 1)
platform_target = 'network/%s/' % platform
if platform_target not in platform_targets:
display.warning('Skipping "%s" because selected tests do not target the "%s" platform.' % (
platform_version, platform))
continue
instance = WrappedThread(functools.partial(network_start, args, platform, version))
instance.daemon = True
instance.start()
instances.append(instance)
while any(instance.is_alive() for instance in instances):
time.sleep(1)
args.metadata.instance_config = [instance.wait_for_result() for instance in instances]
def network_start(args, platform, version):
"""
:type args: NetworkIntegrationConfig
:type platform: str
:type version: str
:rtype: AnsibleCoreCI
"""
core_ci = AnsibleCoreCI(args, platform, version, stage=args.remote_stage, provider=args.remote_provider)
core_ci.start()
return core_ci.save()
def network_run(args, platform, version, config):
"""
:type args: NetworkIntegrationConfig
:type platform: str
:type version: str
:type config: dict[str, str]
:rtype: AnsibleCoreCI
"""
core_ci = AnsibleCoreCI(args, platform, version, stage=args.remote_stage, provider=args.remote_provider, load=False)
core_ci.load(config)
core_ci.wait()
manage = ManageNetworkCI(args, core_ci)
manage.wait()
return core_ci
def network_inventory(args, remotes):
"""
:type args: NetworkIntegrationConfig
:type remotes: list[AnsibleCoreCI]
:rtype: str
"""
groups = dict([(remote.platform, []) for remote in remotes])
net = []
for remote in remotes:
options = dict(
ansible_host=remote.connection.hostname,
ansible_user=remote.connection.username,
ansible_ssh_private_key_file=os.path.abspath(remote.ssh_key.key),
)
settings = get_network_settings(args, remote.platform, remote.version)
options.update(settings.inventory_vars)
groups[remote.platform].append(
'%s %s' % (
remote.name.replace('.', '-'),
' '.join('%s="%s"' % (k, options[k]) for k in sorted(options)),
)
)
net.append(remote.platform)
groups['net:children'] = net
template = ''
for group in groups:
hosts = '\n'.join(groups[group])
template += textwrap.dedent("""
[%s]
%s
""") % (group, hosts)
inventory = template
return inventory

@ -0,0 +1,57 @@
"""POSIX integration testing."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ... import types as t
from ...util import (
ANSIBLE_TEST_DATA_ROOT,
)
from ...util_common import (
handle_layout_messages,
)
from ...containers import (
SshConnectionDetail,
create_container_hooks,
)
from ...target import (
walk_posix_integration_targets,
)
from ...config import (
PosixIntegrationConfig,
)
from . import (
command_integration_filter,
command_integration_filtered,
get_inventory_relative_path,
)
from ...data import (
data_context,
)
def command_posix_integration(args):
"""
:type args: PosixIntegrationConfig
"""
handle_layout_messages(data_context().content.integration_messages)
inventory_relative_path = get_inventory_relative_path(args)
inventory_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, os.path.basename(inventory_relative_path))
all_targets = tuple(walk_posix_integration_targets(include_hidden=True))
internal_targets = command_integration_filter(args, all_targets)
managed_connections = None # type: t.Optional[t.List[SshConnectionDetail]]
pre_target, post_target = create_container_hooks(args, managed_connections)
command_integration_filtered(args, internal_targets, all_targets, inventory_path, pre_target=pre_target, post_target=post_target)

@ -0,0 +1,310 @@
"""Windows integration testing."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import time
import textwrap
import functools
from ... import types as t
from ...thread import (
WrappedThread,
)
from ...core_ci import (
AnsibleCoreCI,
SshKey,
)
from ...manage_ci import (
ManageWindowsCI,
)
from ...io import (
write_text_file,
)
from ...util import (
ApplicationError,
display,
ANSIBLE_TEST_CONFIG_ROOT,
tempdir,
open_zipfile,
)
from ...util_common import (
get_python_path,
ResultType,
handle_layout_messages,
)
from ...containers import (
SshConnectionDetail,
create_container_hooks,
)
from ...ansible_util import (
run_playbook,
)
from ...target import (
IntegrationTarget,
walk_windows_integration_targets,
)
from ...config import (
WindowsIntegrationConfig,
)
from . import (
command_integration_filter,
command_integration_filtered,
get_inventory_relative_path,
check_inventory,
delegate_inventory,
)
from ...data import (
data_context,
)
from ...executor import (
parse_inventory,
get_hosts,
)
def command_windows_integration(args):
"""
:type args: WindowsIntegrationConfig
"""
handle_layout_messages(data_context().content.integration_messages)
inventory_relative_path = get_inventory_relative_path(args)
template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, os.path.basename(inventory_relative_path)) + '.template'
if args.inventory:
inventory_path = os.path.join(data_context().content.root, data_context().content.integration_path, args.inventory)
else:
inventory_path = os.path.join(data_context().content.root, inventory_relative_path)
if not args.explain and not args.windows and not os.path.isfile(inventory_path):
raise ApplicationError(
'Inventory not found: %s\n'
'Use --inventory to specify the inventory path.\n'
'Use --windows to provision resources and generate an inventory file.\n'
'See also inventory template: %s' % (inventory_path, template_path)
)
check_inventory(args, inventory_path)
delegate_inventory(args, inventory_path)
all_targets = tuple(walk_windows_integration_targets(include_hidden=True))
internal_targets = command_integration_filter(args, all_targets, init_callback=windows_init)
instances = [] # type: t.List[WrappedThread]
managed_connections = [] # type: t.List[SshConnectionDetail]
if args.windows:
get_python_path(args, args.python_executable) # initialize before starting threads
configs = dict((config['platform_version'], config) for config in args.metadata.instance_config)
for version in args.windows:
config = configs['windows/%s' % version]
instance = WrappedThread(functools.partial(windows_run, args, version, config))
instance.daemon = True
instance.start()
instances.append(instance)
while any(instance.is_alive() for instance in instances):
time.sleep(1)
remotes = [instance.wait_for_result() for instance in instances]
inventory = windows_inventory(remotes)
display.info('>>> Inventory: %s\n%s' % (inventory_path, inventory.strip()), verbosity=3)
if not args.explain:
write_text_file(inventory_path, inventory)
for core_ci in remotes:
ssh_con = core_ci.connection
ssh = SshConnectionDetail(core_ci.name, ssh_con.hostname, 22, ssh_con.username, core_ci.ssh_key.key, shell_type='powershell')
managed_connections.append(ssh)
elif args.explain:
identity_file = SshKey(args).key
# mock connection details to prevent tracebacks in explain mode
managed_connections = [SshConnectionDetail(
name='windows',
host='windows',
port=22,
user='administrator',
identity_file=identity_file,
shell_type='powershell',
)]
else:
inventory = parse_inventory(args, inventory_path)
hosts = get_hosts(inventory, 'windows')
identity_file = SshKey(args).key
managed_connections = [SshConnectionDetail(
name=name,
host=config['ansible_host'],
port=22,
user=config['ansible_user'],
identity_file=identity_file,
shell_type='powershell',
) for name, config in hosts.items()]
if managed_connections:
display.info('Generated SSH connection details from inventory:\n%s' % (
'\n'.join('%s %s@%s:%d' % (ssh.name, ssh.user, ssh.host, ssh.port) for ssh in managed_connections)), verbosity=1)
pre_target, post_target = create_container_hooks(args, managed_connections)
remote_temp_path = None
if args.coverage and not args.coverage_check:
# Create the remote directory that is writable by everyone. Use Ansible to talk to the remote host.
remote_temp_path = 'C:\\ansible_test_coverage_%s' % time.time()
playbook_vars = {'remote_temp_path': remote_temp_path}
run_playbook(args, inventory_path, 'windows_coverage_setup.yml', playbook_vars)
success = False
try:
command_integration_filtered(args, internal_targets, all_targets, inventory_path, pre_target=pre_target,
post_target=post_target, remote_temp_path=remote_temp_path)
success = True
finally:
if remote_temp_path:
# Zip up the coverage files that were generated and fetch it back to localhost.
with tempdir() as local_temp_path:
playbook_vars = {'remote_temp_path': remote_temp_path, 'local_temp_path': local_temp_path}
run_playbook(args, inventory_path, 'windows_coverage_teardown.yml', playbook_vars)
for filename in os.listdir(local_temp_path):
with open_zipfile(os.path.join(local_temp_path, filename)) as coverage_zip:
coverage_zip.extractall(ResultType.COVERAGE.path)
if args.remote_terminate == 'always' or (args.remote_terminate == 'success' and success):
for instance in instances:
instance.result.stop()
# noinspection PyUnusedLocal
def windows_init(args, internal_targets): # pylint: disable=locally-disabled, unused-argument
"""
:type args: WindowsIntegrationConfig
:type internal_targets: tuple[IntegrationTarget]
"""
# generate an ssh key (if needed) up front once, instead of for each instance
SshKey(args)
if not args.windows:
return
if args.metadata.instance_config is not None:
return
instances = [] # type: t.List[WrappedThread]
for version in args.windows:
instance = WrappedThread(functools.partial(windows_start, args, version))
instance.daemon = True
instance.start()
instances.append(instance)
while any(instance.is_alive() for instance in instances):
time.sleep(1)
args.metadata.instance_config = [instance.wait_for_result() for instance in instances]
def windows_start(args, version):
"""
:type args: WindowsIntegrationConfig
:type version: str
:rtype: AnsibleCoreCI
"""
core_ci = AnsibleCoreCI(args, 'windows', version, stage=args.remote_stage, provider=args.remote_provider)
core_ci.start()
return core_ci.save()
def windows_run(args, version, config):
"""
:type args: WindowsIntegrationConfig
:type version: str
:type config: dict[str, str]
:rtype: AnsibleCoreCI
"""
core_ci = AnsibleCoreCI(args, 'windows', version, stage=args.remote_stage, provider=args.remote_provider, load=False)
core_ci.load(config)
core_ci.wait()
manage = ManageWindowsCI(core_ci)
manage.wait()
return core_ci
def windows_inventory(remotes):
"""
:type remotes: list[AnsibleCoreCI]
:rtype: str
"""
hosts = []
for remote in remotes:
options = dict(
ansible_host=remote.connection.hostname,
ansible_user=remote.connection.username,
ansible_password=remote.connection.password,
ansible_port=remote.connection.port,
)
# used for the connection_windows_ssh test target
if remote.ssh_key:
options["ansible_ssh_private_key_file"] = os.path.abspath(remote.ssh_key.key)
if remote.name == 'windows-2016':
options.update(
# force 2016 to use NTLM + HTTP message encryption
ansible_connection='winrm',
ansible_winrm_server_cert_validation='ignore',
ansible_winrm_transport='ntlm',
ansible_winrm_scheme='http',
ansible_port='5985',
)
else:
options.update(
ansible_connection='winrm',
ansible_winrm_server_cert_validation='ignore',
)
hosts.append(
'%s %s' % (
remote.name.replace('/', '_'),
' '.join('%s="%s"' % (k, options[k]) for k in sorted(options)),
)
)
template = """
[windows]
%s
# support winrm binary module tests (temporary solution)
[testhost:children]
windows
"""
template = textwrap.dedent(template)
inventory = template % ('\n'.join(hosts))
return inventory

@ -0,0 +1,30 @@
"""Open a shell prompt inside an ansible-test environment."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ...util_common import (
run_command,
)
from ...config import (
ShellConfig,
)
from ...executor import (
Delegate,
create_shell_command,
install_command_requirements,
)
def command_shell(args):
"""
:type args: ShellConfig
"""
if args.delegate:
raise Delegate()
install_command_requirements(args)
cmd = create_shell_command(['bash', '-i'])
run_command(args, cmd)

@ -16,7 +16,6 @@ from .io import (
)
from .executor import (
SUPPORTED_PYTHON_VERSIONS,
create_shell_command,
run_pypi_proxy,
get_python_interpreter,
@ -54,6 +53,7 @@ from .util import (
ANSIBLE_LIB_ROOT,
ANSIBLE_TEST_ROOT,
tempdir,
SUPPORTED_PYTHON_VERSIONS,
)
from .util_common import (

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save