You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/test/lib/ansible_test/_internal/ansible_util.py

308 lines
10 KiB
Python

"""Miscellaneous utility functions and classes specific to ansible cli tools."""
from __future__ import annotations
import json
import os
import typing as t
from .constants import (
SOFT_RLIMIT_NOFILE,
)
from .io import (
write_text_file,
)
from .util import (
common_environment,
ApplicationError,
ANSIBLE_LIB_ROOT,
ANSIBLE_TEST_DATA_ROOT,
ANSIBLE_BIN_PATH,
ANSIBLE_SOURCE_ROOT,
ANSIBLE_TEST_TOOLS_ROOT,
get_ansible_version,
raw_command,
)
from .util_common import (
create_temp_dir,
ResultType,
intercept_python,
get_injector_path,
)
from .config import (
IntegrationConfig,
PosixIntegrationConfig,
EnvironmentConfig,
CommonConfig,
)
from .data import (
data_context,
)
from .python_requirements import (
install_requirements,
)
from .host_configs import (
PythonConfig,
)
from .thread import (
mutex,
)
def parse_inventory(args: EnvironmentConfig, inventory_path: str) -> dict[str, t.Any]:
"""Return a dict parsed from the given inventory file."""
cmd = ['ansible-inventory', '-i', inventory_path, '--list']
env = ansible_environment(args)
inventory = json.loads(intercept_python(args, args.controller_python, cmd, env, capture=True, always=True)[0])
return inventory
def get_hosts(inventory: dict[str, t.Any], group_name: str) -> dict[str, dict[str, t.Any]]:
"""Return a dict of hosts from the specified group in the given inventory."""
hostvars = inventory.get('_meta', {}).get('hostvars', {})
group = inventory.get(group_name, {})
host_names = group.get('hosts', [])
hosts = dict((name, hostvars.get(name, {})) for name in host_names)
return hosts
def ansible_environment(args: CommonConfig, color: bool = True, ansible_config: t.Optional[str] = None) -> dict[str, str]:
"""Return a dictionary of environment variables to use when running Ansible commands."""
env = common_environment()
path = env['PATH']
if not path.startswith(ANSIBLE_BIN_PATH + os.path.pathsep):
path = ANSIBLE_BIN_PATH + os.path.pathsep + path
if not ansible_config:
# use the default empty configuration unless one has been provided
ansible_config = args.get_ansible_config()
if not args.explain and not os.path.exists(ansible_config):
raise ApplicationError('Configuration not found: %s' % ansible_config)
ansible = dict(
ANSIBLE_PYTHON_MODULE_RLIMIT_NOFILE=str(SOFT_RLIMIT_NOFILE),
ANSIBLE_FORCE_COLOR='%s' % 'true' if args.color and color else 'false',
ANSIBLE_FORCE_HANDLERS='true', # allow cleanup handlers to run when tests fail
ANSIBLE_HOST_PATTERN_MISMATCH='error', # prevent tests from unintentionally passing when hosts are not found
ANSIBLE_INVENTORY='/dev/null', # force tests to provide inventory
ANSIBLE_DEPRECATION_WARNINGS='false',
ANSIBLE_HOST_KEY_CHECKING='false',
ANSIBLE_RETRY_FILES_ENABLED='false',
ANSIBLE_CONFIG=ansible_config,
ANSIBLE_LIBRARY='/dev/null',
ANSIBLE_DEVEL_WARNING='false', # Don't show warnings that CI is running devel
PYTHONPATH=get_ansible_python_path(args),
PAGER='/bin/cat',
PATH=path,
# give TQM worker processes time to report code coverage results
# without this the last task in a play may write no coverage file, an empty file, or an incomplete file
# enabled even when not using code coverage to surface warnings when worker processes do not exit cleanly
ANSIBLE_WORKER_SHUTDOWN_POLL_COUNT='100',
ANSIBLE_WORKER_SHUTDOWN_POLL_DELAY='0.1',
)
if isinstance(args, IntegrationConfig) and args.coverage:
# standard path injection is not effective for ansible-connection, instead the location must be configured
# ansible-connection only requires the injector for code coverage
# the correct python interpreter is already selected using the sys.executable used to invoke ansible
ansible.update(
ANSIBLE_CONNECTION_PATH=os.path.join(get_injector_path(), 'ansible-connection'),
)
if isinstance(args, PosixIntegrationConfig):
ansible.update(
ANSIBLE_PYTHON_INTERPRETER='/set/ansible_python_interpreter/in/inventory', # force tests to set ansible_python_interpreter in inventory
)
env.update(ansible)
if args.debug:
env.update(
ANSIBLE_DEBUG='true',
ANSIBLE_LOG_PATH=os.path.join(ResultType.LOGS.name, 'debug.log'),
)
if data_context().content.collection:
env.update(
ANSIBLE_COLLECTIONS_PATH=data_context().content.collection.root,
)
if data_context().content.is_ansible:
env.update(configure_plugin_paths(args))
return env
def configure_plugin_paths(args: CommonConfig) -> dict[str, str]:
"""Return environment variables with paths to plugins relevant for the current command."""
if not isinstance(args, IntegrationConfig):
return {}
support_path = os.path.join(ANSIBLE_SOURCE_ROOT, 'test', 'support', args.command)
# provide private copies of collections for integration tests
collection_root = os.path.join(support_path, 'collections')
env = dict(
ANSIBLE_COLLECTIONS_PATH=collection_root,
)
# provide private copies of plugins for integration tests
plugin_root = os.path.join(support_path, 'plugins')
plugin_list = [
'action',
'become',
'cache',
'callback',
'cliconf',
'connection',
'filter',
'httpapi',
'inventory',
'lookup',
'netconf',
# 'shell' is not configurable
'strategy',
'terminal',
'test',
'vars',
]
# most plugins follow a standard naming convention
plugin_map = dict(('%s_plugins' % name, name) for name in plugin_list)
# these plugins do not follow the standard naming convention
plugin_map.update(
doc_fragment='doc_fragments',
library='modules',
module_utils='module_utils',
)
env.update(dict(('ANSIBLE_%s' % key.upper(), os.path.join(plugin_root, value)) for key, value in plugin_map.items()))
# only configure directories which exist
env = dict((key, value) for key, value in env.items() if os.path.isdir(value))
return env
@mutex
def get_ansible_python_path(args: CommonConfig) -> str:
"""
Return a directory usable for PYTHONPATH, containing only the ansible package.
If a temporary directory is required, it will be cached for the lifetime of the process and cleaned up at exit.
"""
try:
return get_ansible_python_path.python_path # type: ignore[attr-defined]
except AttributeError:
pass
if ANSIBLE_SOURCE_ROOT:
# when running from source there is no need for a temporary directory to isolate the ansible package
python_path = os.path.dirname(ANSIBLE_LIB_ROOT)
else:
# when not running from source the installed directory is unsafe to add to PYTHONPATH
# doing so would expose many unwanted packages on sys.path
# instead a temporary directory is created which contains only ansible using a symlink
python_path = create_temp_dir(prefix='ansible-test-')
os.symlink(ANSIBLE_LIB_ROOT, os.path.join(python_path, 'ansible'))
if not args.explain:
generate_egg_info(python_path)
get_ansible_python_path.python_path = python_path # type: ignore[attr-defined]
return python_path
def generate_egg_info(path: str) -> None:
"""Generate an egg-info in the specified base directory."""
# minimal PKG-INFO stub following the format defined in PEP 241
# required for older setuptools versions to avoid a traceback when importing pkg_resources from packages like cryptography
# newer setuptools versions are happy with an empty directory
# including a stub here means we don't need to locate the existing file or run any tools to generate it when running from source
pkg_info = '''
Metadata-Version: 1.0
Name: ansible
Version: %s
Platform: UNKNOWN
Summary: Radically simple IT automation
Author-email: info@ansible.com
License: GPLv3+
''' % get_ansible_version()
pkg_info_path = os.path.join(path, 'ansible_core.egg-info', 'PKG-INFO')
if os.path.exists(pkg_info_path):
return
write_text_file(pkg_info_path, pkg_info.lstrip(), create_directories=True)
class CollectionDetail:
"""Collection detail."""
def __init__(self) -> None:
self.version: t.Optional[str] = None
class CollectionDetailError(ApplicationError):
"""An error occurred retrieving collection detail."""
def __init__(self, reason: str) -> None:
super().__init__('Error collecting collection detail: %s' % reason)
self.reason = reason
def get_collection_detail(python: PythonConfig) -> CollectionDetail:
"""Return collection detail."""
collection = data_context().content.collection
directory = os.path.join(collection.root, collection.directory)
stdout = raw_command([python.path, os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'collection_detail.py'), directory], capture=True)[0]
result = json.loads(stdout)
error = result.get('error')
if error:
raise CollectionDetailError(error)
version = result.get('version')
detail = CollectionDetail()
detail.version = str(version) if version is not None else None
return detail
def run_playbook(
args: EnvironmentConfig,
inventory_path: str,
playbook: str,
capture: bool,
variables: t.Optional[dict[str, t.Any]] = None,
) -> None:
"""Run the specified playbook using the given inventory file and playbook variables."""
playbook_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'playbooks', playbook)
cmd = ['ansible-playbook', '-i', inventory_path, playbook_path]
if variables:
cmd.extend(['-e', json.dumps(variables)])
if args.verbosity:
cmd.append('-%s' % ('v' * args.verbosity))
install_requirements(args, args.controller_python, ansible=True) # run_playbook()
env = ansible_environment(args)
intercept_python(args, args.controller_python, cmd, env, capture=capture)