mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
942 lines
36 KiB
Python
942 lines
36 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (c) 2019 Ansible Project
|
|
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import os
|
|
|
|
from collections import deque
|
|
from itertools import chain
|
|
|
|
from ansible.module_utils.common.collections import is_iterable
|
|
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
|
|
from ansible.module_utils.common.warnings import warn
|
|
from ansible.module_utils.errors import (
|
|
AliasError,
|
|
AnsibleFallbackNotFound,
|
|
AnsibleValidationErrorMultiple,
|
|
ArgumentTypeError,
|
|
ArgumentValueError,
|
|
ElementError,
|
|
MutuallyExclusiveError,
|
|
NoLogError,
|
|
RequiredByError,
|
|
RequiredError,
|
|
RequiredIfError,
|
|
RequiredOneOfError,
|
|
RequiredTogetherError,
|
|
SubParameterTypeError,
|
|
)
|
|
from ansible.module_utils.parsing.convert_bool import BOOLEANS_FALSE, BOOLEANS_TRUE
|
|
|
|
from ansible.module_utils.six.moves.collections_abc import (
|
|
KeysView,
|
|
Set,
|
|
Sequence,
|
|
Mapping,
|
|
MutableMapping,
|
|
MutableSet,
|
|
MutableSequence,
|
|
)
|
|
|
|
from ansible.module_utils.six import (
|
|
binary_type,
|
|
integer_types,
|
|
string_types,
|
|
text_type,
|
|
PY2,
|
|
PY3,
|
|
)
|
|
|
|
from ansible.module_utils.common.validation import (
|
|
check_mutually_exclusive,
|
|
check_required_arguments,
|
|
check_required_together,
|
|
check_required_one_of,
|
|
check_required_if,
|
|
check_required_by,
|
|
check_type_bits,
|
|
check_type_bool,
|
|
check_type_bytes,
|
|
check_type_dict,
|
|
check_type_float,
|
|
check_type_int,
|
|
check_type_jsonarg,
|
|
check_type_list,
|
|
check_type_path,
|
|
check_type_raw,
|
|
check_type_str,
|
|
)
|
|
|
|
# Python2 & 3 way to get NoneType
|
|
NoneType = type(None)
|
|
|
|
_ADDITIONAL_CHECKS = (
|
|
{'func': check_required_together, 'attr': 'required_together', 'err': RequiredTogetherError},
|
|
{'func': check_required_one_of, 'attr': 'required_one_of', 'err': RequiredOneOfError},
|
|
{'func': check_required_if, 'attr': 'required_if', 'err': RequiredIfError},
|
|
{'func': check_required_by, 'attr': 'required_by', 'err': RequiredByError},
|
|
)
|
|
|
|
# if adding boolean attribute, also add to PASS_BOOL
|
|
# some of this dupes defaults from controller config
|
|
# keep in sync with copy in lib/ansible/module_utils/csharp/Ansible.Basic.cs
|
|
PASS_VARS = {
|
|
'check_mode': ('check_mode', False),
|
|
'debug': ('_debug', False),
|
|
'diff': ('_diff', False),
|
|
'keep_remote_files': ('_keep_remote_files', False),
|
|
'ignore_unknown_opts': ('_ignore_unknown_opts', False),
|
|
'module_name': ('_name', None),
|
|
'no_log': ('no_log', False),
|
|
'remote_tmp': ('_remote_tmp', None),
|
|
'target_log_info': ('_target_log_info', None),
|
|
'selinux_special_fs': ('_selinux_special_fs', ['fuse', 'nfs', 'vboxsf', 'ramfs', '9p', 'vfat']),
|
|
'shell_executable': ('_shell', '/bin/sh'),
|
|
'socket': ('_socket_path', None),
|
|
'string_conversion_action': ('_string_conversion_action', 'warn'),
|
|
'syslog_facility': ('_syslog_facility', 'INFO'),
|
|
'tmpdir': ('_tmpdir', None),
|
|
'verbosity': ('_verbosity', 0),
|
|
'version': ('ansible_version', '0.0'),
|
|
}
|
|
|
|
PASS_BOOLS = ('check_mode', 'debug', 'diff', 'keep_remote_files', 'ignore_unknown_opts', 'no_log')
|
|
|
|
DEFAULT_TYPE_VALIDATORS = {
|
|
'str': check_type_str,
|
|
'list': check_type_list,
|
|
'dict': check_type_dict,
|
|
'bool': check_type_bool,
|
|
'int': check_type_int,
|
|
'float': check_type_float,
|
|
'path': check_type_path,
|
|
'raw': check_type_raw,
|
|
'jsonarg': check_type_jsonarg,
|
|
'json': check_type_jsonarg,
|
|
'bytes': check_type_bytes,
|
|
'bits': check_type_bits,
|
|
}
|
|
|
|
|
|
def _get_type_validator(wanted):
|
|
"""Returns the callable used to validate a wanted type and the type name.
|
|
|
|
:arg wanted: String or callable. If a string, get the corresponding
|
|
validation function from DEFAULT_TYPE_VALIDATORS. If callable,
|
|
get the name of the custom callable and return that for the type_checker.
|
|
|
|
:returns: Tuple of callable function or None, and a string that is the name
|
|
of the wanted type.
|
|
"""
|
|
|
|
# Use one of our builtin validators.
|
|
if not callable(wanted):
|
|
if wanted is None:
|
|
# Default type for parameters
|
|
wanted = 'str'
|
|
|
|
type_checker = DEFAULT_TYPE_VALIDATORS.get(wanted)
|
|
|
|
# Use the custom callable for validation.
|
|
else:
|
|
type_checker = wanted
|
|
wanted = getattr(wanted, '__name__', to_native(type(wanted)))
|
|
|
|
return type_checker, wanted
|
|
|
|
|
|
def _get_legal_inputs(argument_spec, parameters, aliases=None):
|
|
if aliases is None:
|
|
aliases = _handle_aliases(argument_spec, parameters)
|
|
|
|
return list(aliases.keys()) + list(argument_spec.keys())
|
|
|
|
|
|
def _get_unsupported_parameters(argument_spec, parameters, legal_inputs=None, options_context=None, store_supported=None):
|
|
"""Check keys in parameters against those provided in legal_inputs
|
|
to ensure they contain legal values. If legal_inputs are not supplied,
|
|
they will be generated using the argument_spec.
|
|
|
|
:arg argument_spec: Dictionary of parameters, their type, and valid values.
|
|
:arg parameters: Dictionary of parameters.
|
|
:arg legal_inputs: List of valid key names property names. Overrides values
|
|
in argument_spec.
|
|
:arg options_context: List of parent keys for tracking the context of where
|
|
a parameter is defined.
|
|
|
|
:returns: Set of unsupported parameters. Empty set if no unsupported parameters
|
|
are found.
|
|
"""
|
|
|
|
if legal_inputs is None:
|
|
legal_inputs = _get_legal_inputs(argument_spec, parameters)
|
|
|
|
unsupported_parameters = set()
|
|
for k in parameters.keys():
|
|
if k not in legal_inputs:
|
|
context = k
|
|
if options_context:
|
|
context = tuple(options_context + [k])
|
|
|
|
unsupported_parameters.add(context)
|
|
|
|
if store_supported is not None:
|
|
supported_aliases = _handle_aliases(argument_spec, parameters)
|
|
supported_params = []
|
|
for option in legal_inputs:
|
|
if option in supported_aliases:
|
|
continue
|
|
supported_params.append(option)
|
|
|
|
store_supported.update({context: (supported_params, supported_aliases)})
|
|
|
|
return unsupported_parameters
|
|
|
|
|
|
def _handle_aliases(argument_spec, parameters, alias_warnings=None, alias_deprecations=None):
|
|
"""Process aliases from an argument_spec including warnings and deprecations.
|
|
|
|
Modify ``parameters`` by adding a new key for each alias with the supplied
|
|
value from ``parameters``.
|
|
|
|
If a list is provided to the alias_warnings parameter, it will be filled with tuples
|
|
(option, alias) in every case where both an option and its alias are specified.
|
|
|
|
If a list is provided to alias_deprecations, it will be populated with dictionaries,
|
|
each containing deprecation information for each alias found in argument_spec.
|
|
|
|
:param argument_spec: Dictionary of parameters, their type, and valid values.
|
|
:type argument_spec: dict
|
|
|
|
:param parameters: Dictionary of parameters.
|
|
:type parameters: dict
|
|
|
|
:param alias_warnings:
|
|
:type alias_warnings: list
|
|
|
|
:param alias_deprecations:
|
|
:type alias_deprecations: list
|
|
"""
|
|
|
|
aliases_results = {} # alias:canon
|
|
|
|
for (k, v) in argument_spec.items():
|
|
aliases = v.get('aliases', None)
|
|
default = v.get('default', None)
|
|
required = v.get('required', False)
|
|
|
|
if alias_deprecations is not None:
|
|
for alias in argument_spec[k].get('deprecated_aliases', []):
|
|
if alias.get('name') in parameters:
|
|
alias_deprecations.append(alias)
|
|
|
|
if default is not None and required:
|
|
# not alias specific but this is a good place to check this
|
|
raise ValueError("internal error: required and default are mutually exclusive for %s" % k)
|
|
|
|
if aliases is None:
|
|
continue
|
|
|
|
if not is_iterable(aliases) or isinstance(aliases, (binary_type, text_type)):
|
|
raise TypeError('internal error: aliases must be a list or tuple')
|
|
|
|
for alias in aliases:
|
|
aliases_results[alias] = k
|
|
if alias in parameters:
|
|
if k in parameters and alias_warnings is not None:
|
|
alias_warnings.append((k, alias))
|
|
parameters[k] = parameters[alias]
|
|
|
|
return aliases_results
|
|
|
|
|
|
def _list_deprecations(argument_spec, parameters, prefix=''):
|
|
"""Return a list of deprecations
|
|
|
|
:arg argument_spec: An argument spec dictionary
|
|
:arg parameters: Dictionary of parameters
|
|
|
|
:returns: List of dictionaries containing a message and version in which
|
|
the deprecated parameter will be removed, or an empty list.
|
|
|
|
:Example return:
|
|
|
|
.. code-block:: python
|
|
|
|
[
|
|
{
|
|
'msg': "Param 'deptest' is deprecated. See the module docs for more information",
|
|
'version': '2.9'
|
|
}
|
|
]
|
|
"""
|
|
|
|
deprecations = []
|
|
for arg_name, arg_opts in argument_spec.items():
|
|
if arg_name in parameters:
|
|
if prefix:
|
|
sub_prefix = '%s["%s"]' % (prefix, arg_name)
|
|
else:
|
|
sub_prefix = arg_name
|
|
if arg_opts.get('removed_at_date') is not None:
|
|
deprecations.append({
|
|
'msg': "Param '%s' is deprecated. See the module docs for more information" % sub_prefix,
|
|
'date': arg_opts.get('removed_at_date'),
|
|
'collection_name': arg_opts.get('removed_from_collection'),
|
|
})
|
|
elif arg_opts.get('removed_in_version') is not None:
|
|
deprecations.append({
|
|
'msg': "Param '%s' is deprecated. See the module docs for more information" % sub_prefix,
|
|
'version': arg_opts.get('removed_in_version'),
|
|
'collection_name': arg_opts.get('removed_from_collection'),
|
|
})
|
|
# Check sub-argument spec
|
|
sub_argument_spec = arg_opts.get('options')
|
|
if sub_argument_spec is not None:
|
|
sub_arguments = parameters[arg_name]
|
|
if isinstance(sub_arguments, Mapping):
|
|
sub_arguments = [sub_arguments]
|
|
if isinstance(sub_arguments, list):
|
|
for sub_params in sub_arguments:
|
|
if isinstance(sub_params, Mapping):
|
|
deprecations.extend(_list_deprecations(sub_argument_spec, sub_params, prefix=sub_prefix))
|
|
|
|
return deprecations
|
|
|
|
|
|
def _list_no_log_values(argument_spec, params):
|
|
"""Return set of no log values
|
|
|
|
:arg argument_spec: An argument spec dictionary
|
|
:arg params: Dictionary of all parameters
|
|
|
|
:returns: :class:`set` of strings that should be hidden from output:
|
|
"""
|
|
|
|
no_log_values = set()
|
|
for arg_name, arg_opts in argument_spec.items():
|
|
if arg_opts.get('no_log', False):
|
|
# Find the value for the no_log'd param
|
|
no_log_object = params.get(arg_name, None)
|
|
|
|
if no_log_object:
|
|
try:
|
|
no_log_values.update(_return_datastructure_name(no_log_object))
|
|
except TypeError as e:
|
|
raise TypeError('Failed to convert "%s": %s' % (arg_name, to_native(e)))
|
|
|
|
# Get no_log values from suboptions
|
|
sub_argument_spec = arg_opts.get('options')
|
|
if sub_argument_spec is not None:
|
|
wanted_type = arg_opts.get('type')
|
|
sub_parameters = params.get(arg_name)
|
|
|
|
if sub_parameters is not None:
|
|
if wanted_type == 'dict' or (wanted_type == 'list' and arg_opts.get('elements', '') == 'dict'):
|
|
# Sub parameters can be a dict or list of dicts. Ensure parameters are always a list.
|
|
if not isinstance(sub_parameters, list):
|
|
sub_parameters = [sub_parameters]
|
|
|
|
for sub_param in sub_parameters:
|
|
# Validate dict fields in case they came in as strings
|
|
|
|
if isinstance(sub_param, string_types):
|
|
sub_param = check_type_dict(sub_param)
|
|
|
|
if not isinstance(sub_param, Mapping):
|
|
raise TypeError("Value '{1}' in the sub parameter field '{0}' must be a {2}, "
|
|
"not '{1.__class__.__name__}'".format(arg_name, sub_param, wanted_type))
|
|
|
|
no_log_values.update(_list_no_log_values(sub_argument_spec, sub_param))
|
|
|
|
return no_log_values
|
|
|
|
|
|
def _return_datastructure_name(obj):
|
|
""" Return native stringified values from datastructures.
|
|
|
|
For use with removing sensitive values pre-jsonification."""
|
|
if isinstance(obj, (text_type, binary_type)):
|
|
if obj:
|
|
yield to_native(obj, errors='surrogate_or_strict')
|
|
return
|
|
elif isinstance(obj, Mapping):
|
|
for element in obj.items():
|
|
for subelement in _return_datastructure_name(element[1]):
|
|
yield subelement
|
|
elif is_iterable(obj):
|
|
for element in obj:
|
|
for subelement in _return_datastructure_name(element):
|
|
yield subelement
|
|
elif obj is None or isinstance(obj, bool):
|
|
# This must come before int because bools are also ints
|
|
return
|
|
elif isinstance(obj, tuple(list(integer_types) + [float])):
|
|
yield to_native(obj, nonstring='simplerepr')
|
|
else:
|
|
raise TypeError('Unknown parameter type: %s' % (type(obj)))
|
|
|
|
|
|
def _remove_values_conditions(value, no_log_strings, deferred_removals):
|
|
"""
|
|
Helper function for :meth:`remove_values`.
|
|
|
|
:arg value: The value to check for strings that need to be stripped
|
|
:arg no_log_strings: set of strings which must be stripped out of any values
|
|
:arg deferred_removals: List which holds information about nested
|
|
containers that have to be iterated for removals. It is passed into
|
|
this function so that more entries can be added to it if value is
|
|
a container type. The format of each entry is a 2-tuple where the first
|
|
element is the ``value`` parameter and the second value is a new
|
|
container to copy the elements of ``value`` into once iterated.
|
|
|
|
:returns: if ``value`` is a scalar, returns ``value`` with two exceptions:
|
|
|
|
1. :class:`~datetime.datetime` objects which are changed into a string representation.
|
|
2. objects which are in ``no_log_strings`` are replaced with a placeholder
|
|
so that no sensitive data is leaked.
|
|
|
|
If ``value`` is a container type, returns a new empty container.
|
|
|
|
``deferred_removals`` is added to as a side-effect of this function.
|
|
|
|
.. warning:: It is up to the caller to make sure the order in which value
|
|
is passed in is correct. For instance, higher level containers need
|
|
to be passed in before lower level containers. For example, given
|
|
``{'level1': {'level2': 'level3': [True]} }`` first pass in the
|
|
dictionary for ``level1``, then the dict for ``level2``, and finally
|
|
the list for ``level3``.
|
|
"""
|
|
if isinstance(value, (text_type, binary_type)):
|
|
# Need native str type
|
|
native_str_value = value
|
|
if isinstance(value, text_type):
|
|
value_is_text = True
|
|
if PY2:
|
|
native_str_value = to_bytes(value, errors='surrogate_or_strict')
|
|
elif isinstance(value, binary_type):
|
|
value_is_text = False
|
|
if PY3:
|
|
native_str_value = to_text(value, errors='surrogate_or_strict')
|
|
|
|
if native_str_value in no_log_strings:
|
|
return 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
|
|
for omit_me in no_log_strings:
|
|
native_str_value = native_str_value.replace(omit_me, '*' * 8)
|
|
|
|
if value_is_text and isinstance(native_str_value, binary_type):
|
|
value = to_text(native_str_value, encoding='utf-8', errors='surrogate_then_replace')
|
|
elif not value_is_text and isinstance(native_str_value, text_type):
|
|
value = to_bytes(native_str_value, encoding='utf-8', errors='surrogate_then_replace')
|
|
else:
|
|
value = native_str_value
|
|
|
|
elif isinstance(value, Sequence):
|
|
if isinstance(value, MutableSequence):
|
|
new_value = type(value)()
|
|
else:
|
|
new_value = [] # Need a mutable value
|
|
deferred_removals.append((value, new_value))
|
|
value = new_value
|
|
|
|
elif isinstance(value, Set):
|
|
if isinstance(value, MutableSet):
|
|
new_value = type(value)()
|
|
else:
|
|
new_value = set() # Need a mutable value
|
|
deferred_removals.append((value, new_value))
|
|
value = new_value
|
|
|
|
elif isinstance(value, Mapping):
|
|
if isinstance(value, MutableMapping):
|
|
new_value = type(value)()
|
|
else:
|
|
new_value = {} # Need a mutable value
|
|
deferred_removals.append((value, new_value))
|
|
value = new_value
|
|
|
|
elif isinstance(value, tuple(chain(integer_types, (float, bool, NoneType)))):
|
|
stringy_value = to_native(value, encoding='utf-8', errors='surrogate_or_strict')
|
|
if stringy_value in no_log_strings:
|
|
return 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
|
|
for omit_me in no_log_strings:
|
|
if omit_me in stringy_value:
|
|
return 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
|
|
|
|
elif isinstance(value, (datetime.datetime, datetime.date)):
|
|
value = value.isoformat()
|
|
else:
|
|
raise TypeError('Value of unknown type: %s, %s' % (type(value), value))
|
|
|
|
return value
|
|
|
|
|
|
def _set_defaults(argument_spec, parameters, set_default=True):
|
|
"""Set default values for parameters when no value is supplied.
|
|
|
|
Modifies parameters directly.
|
|
|
|
:arg argument_spec: Argument spec
|
|
:type argument_spec: dict
|
|
|
|
:arg parameters: Parameters to evaluate
|
|
:type parameters: dict
|
|
|
|
:kwarg set_default: Whether or not to set the default values
|
|
:type set_default: bool
|
|
|
|
:returns: Set of strings that should not be logged.
|
|
:rtype: set
|
|
"""
|
|
|
|
no_log_values = set()
|
|
for param, value in argument_spec.items():
|
|
|
|
# TODO: Change the default value from None to Sentinel to differentiate between
|
|
# user supplied None and a default value set by this function.
|
|
default = value.get('default', None)
|
|
|
|
# This prevents setting defaults on required items on the 1st run,
|
|
# otherwise will set things without a default to None on the 2nd.
|
|
if param not in parameters and (default is not None or set_default):
|
|
# Make sure any default value for no_log fields are masked.
|
|
if value.get('no_log', False) and default:
|
|
no_log_values.add(default)
|
|
|
|
parameters[param] = default
|
|
|
|
return no_log_values
|
|
|
|
|
|
def _sanitize_keys_conditions(value, no_log_strings, ignore_keys, deferred_removals):
|
|
""" Helper method to :func:`sanitize_keys` to build ``deferred_removals`` and avoid deep recursion. """
|
|
if isinstance(value, (text_type, binary_type)):
|
|
return value
|
|
|
|
if isinstance(value, Sequence):
|
|
if isinstance(value, MutableSequence):
|
|
new_value = type(value)()
|
|
else:
|
|
new_value = [] # Need a mutable value
|
|
deferred_removals.append((value, new_value))
|
|
return new_value
|
|
|
|
if isinstance(value, Set):
|
|
if isinstance(value, MutableSet):
|
|
new_value = type(value)()
|
|
else:
|
|
new_value = set() # Need a mutable value
|
|
deferred_removals.append((value, new_value))
|
|
return new_value
|
|
|
|
if isinstance(value, Mapping):
|
|
if isinstance(value, MutableMapping):
|
|
new_value = type(value)()
|
|
else:
|
|
new_value = {} # Need a mutable value
|
|
deferred_removals.append((value, new_value))
|
|
return new_value
|
|
|
|
if isinstance(value, tuple(chain(integer_types, (float, bool, NoneType)))):
|
|
return value
|
|
|
|
if isinstance(value, (datetime.datetime, datetime.date)):
|
|
return value
|
|
|
|
raise TypeError('Value of unknown type: %s, %s' % (type(value), value))
|
|
|
|
|
|
def _validate_elements(wanted_type, parameter, values, options_context=None, errors=None):
|
|
|
|
if errors is None:
|
|
errors = AnsibleValidationErrorMultiple()
|
|
|
|
type_checker, wanted_element_type = _get_type_validator(wanted_type)
|
|
validated_parameters = []
|
|
# Get param name for strings so we can later display this value in a useful error message if needed
|
|
# Only pass 'kwargs' to our checkers and ignore custom callable checkers
|
|
kwargs = {}
|
|
if wanted_element_type == 'str' and isinstance(wanted_type, string_types):
|
|
if isinstance(parameter, string_types):
|
|
kwargs['param'] = parameter
|
|
elif isinstance(parameter, dict):
|
|
kwargs['param'] = list(parameter.keys())[0]
|
|
|
|
for value in values:
|
|
try:
|
|
validated_parameters.append(type_checker(value, **kwargs))
|
|
except (TypeError, ValueError) as e:
|
|
msg = "Elements value for option '%s'" % parameter
|
|
if options_context:
|
|
msg += " found in '%s'" % " -> ".join(options_context)
|
|
msg += " is of type %s and we were unable to convert to %s: %s" % (type(value), wanted_element_type, to_native(e))
|
|
errors.append(ElementError(msg))
|
|
return validated_parameters
|
|
|
|
|
|
def _validate_argument_types(argument_spec, parameters, prefix='', options_context=None, errors=None):
|
|
"""Validate that parameter types match the type in the argument spec.
|
|
|
|
Determine the appropriate type checker function and run each
|
|
parameter value through that function. All error messages from type checker
|
|
functions are returned. If any parameter fails to validate, it will not
|
|
be in the returned parameters.
|
|
|
|
:arg argument_spec: Argument spec
|
|
:type argument_spec: dict
|
|
|
|
:arg parameters: Parameters
|
|
:type parameters: dict
|
|
|
|
:kwarg prefix: Name of the parent key that contains the spec. Used in the error message
|
|
:type prefix: str
|
|
|
|
:kwarg options_context: List of contexts?
|
|
:type options_context: list
|
|
|
|
:returns: Two item tuple containing validated and coerced parameters
|
|
and a list of any errors that were encountered.
|
|
:rtype: tuple
|
|
|
|
"""
|
|
|
|
if errors is None:
|
|
errors = AnsibleValidationErrorMultiple()
|
|
|
|
for param, spec in argument_spec.items():
|
|
if param not in parameters:
|
|
continue
|
|
|
|
value = parameters[param]
|
|
if value is None and not spec.get('required') and spec.get('default') is None:
|
|
continue
|
|
|
|
wanted_type = spec.get('type')
|
|
type_checker, wanted_name = _get_type_validator(wanted_type)
|
|
# Get param name for strings so we can later display this value in a useful error message if needed
|
|
# Only pass 'kwargs' to our checkers and ignore custom callable checkers
|
|
kwargs = {}
|
|
if wanted_name == 'str' and isinstance(wanted_type, string_types):
|
|
kwargs['param'] = list(parameters.keys())[0]
|
|
|
|
# Get the name of the parent key if this is a nested option
|
|
if prefix:
|
|
kwargs['prefix'] = prefix
|
|
|
|
try:
|
|
parameters[param] = type_checker(value, **kwargs)
|
|
elements_wanted_type = spec.get('elements', None)
|
|
if elements_wanted_type:
|
|
elements = parameters[param]
|
|
if wanted_type != 'list' or not isinstance(elements, list):
|
|
msg = "Invalid type %s for option '%s'" % (wanted_name, elements)
|
|
if options_context:
|
|
msg += " found in '%s'." % " -> ".join(options_context)
|
|
msg += ", elements value check is supported only with 'list' type"
|
|
errors.append(ArgumentTypeError(msg))
|
|
parameters[param] = _validate_elements(elements_wanted_type, param, elements, options_context, errors)
|
|
|
|
except (TypeError, ValueError) as e:
|
|
msg = "argument '%s' is of type %s" % (param, type(value))
|
|
if options_context:
|
|
msg += " found in '%s'." % " -> ".join(options_context)
|
|
msg += " and we were unable to convert to %s: %s" % (wanted_name, to_native(e))
|
|
errors.append(ArgumentTypeError(msg))
|
|
|
|
|
|
def _validate_argument_values(argument_spec, parameters, options_context=None, errors=None):
|
|
"""Ensure all arguments have the requested values, and there are no stray arguments"""
|
|
|
|
if errors is None:
|
|
errors = AnsibleValidationErrorMultiple()
|
|
|
|
for param, spec in argument_spec.items():
|
|
choices = spec.get('choices')
|
|
if choices is None:
|
|
continue
|
|
|
|
if isinstance(choices, (frozenset, KeysView, Sequence)) and not isinstance(choices, (binary_type, text_type)):
|
|
if param in parameters:
|
|
# Allow one or more when type='list' param with choices
|
|
if isinstance(parameters[param], list):
|
|
diff_list = [item for item in parameters[param] if item not in choices]
|
|
if diff_list:
|
|
choices_str = ", ".join([to_native(c) for c in choices])
|
|
diff_str = ", ".join(diff_list)
|
|
msg = "value of %s must be one or more of: %s. Got no match for: %s" % (param, choices_str, diff_str)
|
|
if options_context:
|
|
msg = "{0} found in {1}".format(msg, " -> ".join(options_context))
|
|
errors.append(ArgumentValueError(msg))
|
|
elif parameters[param] not in choices:
|
|
# PyYaml converts certain strings to bools. If we can unambiguously convert back, do so before checking
|
|
# the value. If we can't figure this out, module author is responsible.
|
|
if parameters[param] == 'False':
|
|
overlap = BOOLEANS_FALSE.intersection(choices)
|
|
if len(overlap) == 1:
|
|
# Extract from a set
|
|
(parameters[param],) = overlap
|
|
|
|
if parameters[param] == 'True':
|
|
overlap = BOOLEANS_TRUE.intersection(choices)
|
|
if len(overlap) == 1:
|
|
(parameters[param],) = overlap
|
|
|
|
if parameters[param] not in choices:
|
|
choices_str = ", ".join([to_native(c) for c in choices])
|
|
msg = "value of %s must be one of: %s, got: %s" % (param, choices_str, parameters[param])
|
|
if options_context:
|
|
msg = "{0} found in {1}".format(msg, " -> ".join(options_context))
|
|
errors.append(ArgumentValueError(msg))
|
|
else:
|
|
msg = "internal error: choices for argument %s are not iterable: %s" % (param, choices)
|
|
if options_context:
|
|
msg = "{0} found in {1}".format(msg, " -> ".join(options_context))
|
|
errors.append(ArgumentTypeError(msg))
|
|
|
|
|
|
def _validate_sub_spec(
|
|
argument_spec,
|
|
parameters,
|
|
prefix="",
|
|
options_context=None,
|
|
errors=None,
|
|
no_log_values=None,
|
|
unsupported_parameters=None,
|
|
supported_parameters=None,
|
|
alias_deprecations=None,
|
|
):
|
|
"""Validate sub argument spec.
|
|
|
|
This function is recursive.
|
|
"""
|
|
|
|
if options_context is None:
|
|
options_context = []
|
|
|
|
if errors is None:
|
|
errors = AnsibleValidationErrorMultiple()
|
|
|
|
if no_log_values is None:
|
|
no_log_values = set()
|
|
|
|
if unsupported_parameters is None:
|
|
unsupported_parameters = set()
|
|
if supported_parameters is None:
|
|
supported_parameters = dict()
|
|
|
|
for param, value in argument_spec.items():
|
|
wanted = value.get('type')
|
|
if wanted == 'dict' or (wanted == 'list' and value.get('elements', '') == 'dict'):
|
|
sub_spec = value.get('options')
|
|
if value.get('apply_defaults', False):
|
|
if sub_spec is not None:
|
|
if parameters.get(param) is None:
|
|
parameters[param] = {}
|
|
else:
|
|
continue
|
|
elif sub_spec is None or param not in parameters or parameters[param] is None:
|
|
continue
|
|
|
|
# Keep track of context for warning messages
|
|
options_context.append(param)
|
|
|
|
# Make sure we can iterate over the elements
|
|
if not isinstance(parameters[param], Sequence) or isinstance(parameters[param], string_types):
|
|
elements = [parameters[param]]
|
|
else:
|
|
elements = parameters[param]
|
|
|
|
for idx, sub_parameters in enumerate(elements):
|
|
no_log_values.update(set_fallbacks(sub_spec, sub_parameters))
|
|
|
|
if not isinstance(sub_parameters, dict):
|
|
errors.append(SubParameterTypeError("value of '%s' must be of type dict or list of dicts" % param))
|
|
continue
|
|
|
|
# Set prefix for warning messages
|
|
new_prefix = prefix + param
|
|
if wanted == 'list':
|
|
new_prefix += '[%d]' % idx
|
|
new_prefix += '.'
|
|
|
|
alias_warnings = []
|
|
alias_deprecations_sub = []
|
|
try:
|
|
options_aliases = _handle_aliases(sub_spec, sub_parameters, alias_warnings, alias_deprecations_sub)
|
|
except (TypeError, ValueError) as e:
|
|
options_aliases = {}
|
|
errors.append(AliasError(to_native(e)))
|
|
|
|
for option, alias in alias_warnings:
|
|
warn('Both option %s%s and its alias %s%s are set.' % (new_prefix, option, new_prefix, alias))
|
|
|
|
if alias_deprecations is not None:
|
|
for deprecation in alias_deprecations_sub:
|
|
alias_deprecations.append({
|
|
'name': '%s%s' % (new_prefix, deprecation['name']),
|
|
'version': deprecation.get('version'),
|
|
'date': deprecation.get('date'),
|
|
'collection_name': deprecation.get('collection_name'),
|
|
})
|
|
|
|
try:
|
|
no_log_values.update(_list_no_log_values(sub_spec, sub_parameters))
|
|
except TypeError as te:
|
|
errors.append(NoLogError(to_native(te)))
|
|
|
|
legal_inputs = _get_legal_inputs(sub_spec, sub_parameters, options_aliases)
|
|
unsupported_parameters.update(
|
|
_get_unsupported_parameters(
|
|
sub_spec,
|
|
sub_parameters,
|
|
legal_inputs,
|
|
options_context,
|
|
store_supported=supported_parameters,
|
|
)
|
|
)
|
|
|
|
try:
|
|
check_mutually_exclusive(value.get('mutually_exclusive'), sub_parameters, options_context)
|
|
except TypeError as e:
|
|
errors.append(MutuallyExclusiveError(to_native(e)))
|
|
|
|
no_log_values.update(_set_defaults(sub_spec, sub_parameters, False))
|
|
|
|
try:
|
|
check_required_arguments(sub_spec, sub_parameters, options_context)
|
|
except TypeError as e:
|
|
errors.append(RequiredError(to_native(e)))
|
|
|
|
_validate_argument_types(sub_spec, sub_parameters, new_prefix, options_context, errors=errors)
|
|
_validate_argument_values(sub_spec, sub_parameters, options_context, errors=errors)
|
|
|
|
for check in _ADDITIONAL_CHECKS:
|
|
try:
|
|
check['func'](value.get(check['attr']), sub_parameters, options_context)
|
|
except TypeError as e:
|
|
errors.append(check['err'](to_native(e)))
|
|
|
|
no_log_values.update(_set_defaults(sub_spec, sub_parameters))
|
|
|
|
# Handle nested specs
|
|
_validate_sub_spec(
|
|
sub_spec, sub_parameters, new_prefix, options_context, errors, no_log_values,
|
|
unsupported_parameters, supported_parameters, alias_deprecations)
|
|
|
|
options_context.pop()
|
|
|
|
|
|
def env_fallback(*args, **kwargs):
|
|
"""Load value from environment variable"""
|
|
|
|
for arg in args:
|
|
if arg in os.environ:
|
|
return os.environ[arg]
|
|
raise AnsibleFallbackNotFound
|
|
|
|
|
|
def set_fallbacks(argument_spec, parameters):
|
|
no_log_values = set()
|
|
for param, value in argument_spec.items():
|
|
fallback = value.get('fallback', (None,))
|
|
fallback_strategy = fallback[0]
|
|
fallback_args = []
|
|
fallback_kwargs = {}
|
|
if param not in parameters and fallback_strategy is not None:
|
|
for item in fallback[1:]:
|
|
if isinstance(item, dict):
|
|
fallback_kwargs = item
|
|
else:
|
|
fallback_args = item
|
|
try:
|
|
fallback_value = fallback_strategy(*fallback_args, **fallback_kwargs)
|
|
except AnsibleFallbackNotFound:
|
|
continue
|
|
else:
|
|
if value.get('no_log', False) and fallback_value:
|
|
no_log_values.add(fallback_value)
|
|
parameters[param] = fallback_value
|
|
|
|
return no_log_values
|
|
|
|
|
|
def sanitize_keys(obj, no_log_strings, ignore_keys=frozenset()):
|
|
"""Sanitize the keys in a container object by removing ``no_log`` values from key names.
|
|
|
|
This is a companion function to the :func:`remove_values` function. Similar to that function,
|
|
we make use of ``deferred_removals`` to avoid hitting maximum recursion depth in cases of
|
|
large data structures.
|
|
|
|
:arg obj: The container object to sanitize. Non-container objects are returned unmodified.
|
|
:arg no_log_strings: A set of string values we do not want logged.
|
|
:kwarg ignore_keys: A set of string values of keys to not sanitize.
|
|
|
|
:returns: An object with sanitized keys.
|
|
"""
|
|
|
|
deferred_removals = deque()
|
|
|
|
no_log_strings = [to_native(s, errors='surrogate_or_strict') for s in no_log_strings]
|
|
new_value = _sanitize_keys_conditions(obj, no_log_strings, ignore_keys, deferred_removals)
|
|
|
|
while deferred_removals:
|
|
old_data, new_data = deferred_removals.popleft()
|
|
|
|
if isinstance(new_data, Mapping):
|
|
for old_key, old_elem in old_data.items():
|
|
if old_key in ignore_keys or old_key.startswith('_ansible'):
|
|
new_data[old_key] = _sanitize_keys_conditions(old_elem, no_log_strings, ignore_keys, deferred_removals)
|
|
else:
|
|
# Sanitize the old key. We take advantage of the sanitizing code in
|
|
# _remove_values_conditions() rather than recreating it here.
|
|
new_key = _remove_values_conditions(old_key, no_log_strings, None)
|
|
new_data[new_key] = _sanitize_keys_conditions(old_elem, no_log_strings, ignore_keys, deferred_removals)
|
|
else:
|
|
for elem in old_data:
|
|
new_elem = _sanitize_keys_conditions(elem, no_log_strings, ignore_keys, deferred_removals)
|
|
if isinstance(new_data, MutableSequence):
|
|
new_data.append(new_elem)
|
|
elif isinstance(new_data, MutableSet):
|
|
new_data.add(new_elem)
|
|
else:
|
|
raise TypeError('Unknown container type encountered when removing private values from keys')
|
|
|
|
return new_value
|
|
|
|
|
|
def remove_values(value, no_log_strings):
|
|
"""Remove strings in ``no_log_strings`` from value.
|
|
|
|
If value is a container type, then remove a lot more.
|
|
|
|
Use of ``deferred_removals`` exists, rather than a pure recursive solution,
|
|
because of the potential to hit the maximum recursion depth when dealing with
|
|
large amounts of data (see `issue #24560 <https://github.com/ansible/ansible/issues/24560>`_).
|
|
"""
|
|
|
|
deferred_removals = deque()
|
|
|
|
no_log_strings = [to_native(s, errors='surrogate_or_strict') for s in no_log_strings]
|
|
new_value = _remove_values_conditions(value, no_log_strings, deferred_removals)
|
|
|
|
while deferred_removals:
|
|
old_data, new_data = deferred_removals.popleft()
|
|
if isinstance(new_data, Mapping):
|
|
for old_key, old_elem in old_data.items():
|
|
new_elem = _remove_values_conditions(old_elem, no_log_strings, deferred_removals)
|
|
new_data[old_key] = new_elem
|
|
else:
|
|
for elem in old_data:
|
|
new_elem = _remove_values_conditions(elem, no_log_strings, deferred_removals)
|
|
if isinstance(new_data, MutableSequence):
|
|
new_data.append(new_elem)
|
|
elif isinstance(new_data, MutableSet):
|
|
new_data.add(new_elem)
|
|
else:
|
|
raise TypeError('Unknown container type encountered when removing private values from output')
|
|
|
|
return new_value
|