ansible-test - Remove obsolete network provisioning (#85433)

pull/85441/head
Matt Clay 6 months ago committed by GitHub
parent 35252fd96c
commit d19366331f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,40 +0,0 @@
#!/usr/bin/env bash
set -o pipefail -eux
declare -a args
IFS='/:' read -ra args <<< "$1"
platform="${args[0]}"
version="${args[1]}"
python_version="${args[2]}"
target="shippable/${platform}/incidental/"
stage="${S:-prod}"
provider="${P:-default}"
# python versions to test in order
# all versions run full tests
IFS=' ' read -r -a python_versions <<< \
"$(PYTHONPATH="${PWD}/test/lib" python -c 'from ansible_test._internal import constants; print(" ".join(constants.CONTROLLER_PYTHON_VERSIONS))')"
if [ "${python_version}" ]; then
# limit tests to a single python version
python_versions=("${python_version}")
fi
for python_version in "${python_versions[@]}"; do
# terminate remote instances on the final python version tested
if [ "${python_version}" = "${python_versions[-1]}" ]; then
terminate="always"
else
terminate="never"
fi
# shellcheck disable=SC2086
ansible-test network-integration --color -v --retry-on-error "${target}" ${COVERAGE:+"$COVERAGE"} ${CHANGED:+"$CHANGED"} ${UNSTABLE:+"$UNSTABLE"} \
--platform "${platform}/${version}" \
--docker default --python "${python_version}" \
--remote-terminate "${terminate}" --remote-stage "${stage}" --remote-provider "${provider}"
done

@ -0,0 +1,2 @@
minor_changes:
- ansible-test - Removed support for automatic provisioning of obsolete instances for network-integration tests.

@ -1,14 +0,0 @@
# NOTE: This file is used by ansible-test to override specific Ansible constants
# This file is used by `ansible-test network-integration`
[defaults]
host_key_checking = False
timeout = 90
[ssh_connection]
ssh_args = '-o UserKnownHostsFile=/dev/null'
[persistent_connection]
command_timeout = 100
connect_timeout = 100
connect_retry_timeout = 100

@ -1 +0,0 @@
scp # needed by incidental_ios_file

@ -1 +0,0 @@
ios/csr1000v collection=cisco.ios connection=ansible.netcommon.network_cli provider=aws arch=x86_64

@ -115,7 +115,6 @@ test/integration/targets/win_script/files/test_script_with_args.ps1 pslint:PSAvo
test/integration/targets/win_script/files/test_script_with_splatting.ps1 pslint:PSAvoidUsingWriteHost # Keep
test/integration/targets/ssh_agent/fake_agents/ssh-agent-bad-shebang shebang # required for test
test/lib/ansible_test/_data/requirements/sanity.pslint.ps1 pslint:PSCustomUseLiteralPath # Uses wildcards on purpose
test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/cliconf/ios.py pylint:arguments-renamed
test/support/windows-integration/collections/ansible_collections/ansible/windows/plugins/module_utils/WebRequest.psm1 pslint!skip
test/support/windows-integration/collections/ansible_collections/ansible/windows/plugins/modules/win_uri.ps1 pslint!skip
test/support/windows-integration/plugins/modules/async_status.ps1 pslint!skip

@ -1,39 +0,0 @@
#
# Copyright 2018 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
from ansible_collections.ansible.netcommon.plugins.action.network import (
ActionModule as ActionNetworkModule,
)
class ActionModule(ActionNetworkModule):
def run(self, tmp=None, task_vars=None):
del tmp # tmp no longer has any effect
self._config_module = True
if self._play_context.connection.split(".")[-1] != "network_cli":
return {
"failed": True,
"msg": "Connection type %s is not valid for cli_config module"
% self._play_context.connection,
}
return super(ActionModule, self).run(task_vars=task_vars)

@ -1,198 +0,0 @@
# (c) 2018, Ansible Inc,
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import os
import re
import uuid
import hashlib
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.module_utils.connection import Connection, ConnectionError
from ansible.plugins.action import ActionBase
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.utils.display import Display
display = Display()
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
socket_path = None
self._get_network_os(task_vars)
persistent_connection = self._play_context.connection.split(".")[-1]
result = super(ActionModule, self).run(task_vars=task_vars)
if persistent_connection != "network_cli":
# It is supported only with network_cli
result["failed"] = True
result["msg"] = (
"connection type %s is not valid for net_get module,"
" please use fully qualified name of network_cli connection type"
% self._play_context.connection
)
return result
try:
src = self._task.args["src"]
except KeyError as exc:
return {
"failed": True,
"msg": "missing required argument: %s" % exc,
}
# Get destination file if specified
dest = self._task.args.get("dest")
if dest is None:
dest = self._get_default_dest(src)
else:
dest = self._handle_dest_path(dest)
# Get proto
proto = self._task.args.get("protocol")
if proto is None:
proto = "scp"
if socket_path is None:
socket_path = self._connection.socket_path
conn = Connection(socket_path)
sock_timeout = conn.get_option("persistent_command_timeout")
try:
changed = self._handle_existing_file(
conn, src, dest, proto, sock_timeout
)
if changed is False:
result["changed"] = changed
result["destination"] = dest
return result
except Exception as exc:
result["msg"] = (
"Warning: %s idempotency check failed. Check dest" % exc
)
try:
conn.get_file(
source=src, destination=dest, proto=proto, timeout=sock_timeout
)
except Exception as exc:
result["failed"] = True
result["msg"] = "Exception received: %s" % exc
result["changed"] = changed
result["destination"] = dest
return result
def _handle_dest_path(self, dest):
working_path = self._get_working_path()
if os.path.isabs(dest) or urlsplit("dest").scheme:
dst = dest
else:
dst = self._loader.path_dwim_relative(working_path, "", dest)
return dst
def _get_src_filename_from_path(self, src_path):
filename_list = re.split("/|:", src_path)
return filename_list[-1]
def _get_default_dest(self, src_path):
dest_path = self._get_working_path()
src_fname = self._get_src_filename_from_path(src_path)
filename = "%s/%s" % (dest_path, src_fname)
return filename
def _handle_existing_file(self, conn, source, dest, proto, timeout):
"""
Determines whether the source and destination file match.
:return: False if source and dest both exist and have matching sha1 sums, True otherwise.
"""
if not os.path.exists(dest):
return True
cwd = self._loader.get_basedir()
filename = str(uuid.uuid4())
tmp_dest_file = os.path.join(cwd, filename)
try:
conn.get_file(
source=source,
destination=tmp_dest_file,
proto=proto,
timeout=timeout,
)
except ConnectionError as exc:
error = to_text(exc)
if error.endswith("No such file or directory"):
if os.path.exists(tmp_dest_file):
os.remove(tmp_dest_file)
return True
try:
with open(tmp_dest_file, "r") as f:
new_content = f.read()
with open(dest, "r") as f:
old_content = f.read()
except (IOError, OSError):
os.remove(tmp_dest_file)
raise
sha1 = hashlib.sha1()
old_content_b = to_bytes(old_content, errors="surrogate_or_strict")
sha1.update(old_content_b)
checksum_old = sha1.digest()
sha1 = hashlib.sha1()
new_content_b = to_bytes(new_content, errors="surrogate_or_strict")
sha1.update(new_content_b)
checksum_new = sha1.digest()
os.remove(tmp_dest_file)
if checksum_old == checksum_new:
return False
return True
def _get_working_path(self):
cwd = self._loader.get_basedir()
if self._task._role is not None:
cwd = self._task._role._role_path
return cwd
def _get_network_os(self, task_vars):
if "network_os" in self._task.args and self._task.args["network_os"]:
display.vvvv("Getting network OS from task argument")
network_os = self._task.args["network_os"]
elif self._play_context.network_os:
display.vvvv("Getting network OS from inventory")
network_os = self._play_context.network_os
elif (
"network_os" in task_vars.get("ansible_facts", {})
and task_vars["ansible_facts"]["network_os"]
):
display.vvvv("Getting network OS from fact")
network_os = task_vars["ansible_facts"]["network_os"]
else:
raise AnsibleError(
"ansible_network_os must be specified on this host"
)
return network_os

@ -1,234 +0,0 @@
# (c) 2018, Ansible Inc,
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import os
import uuid
import hashlib
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.module_utils.connection import Connection, ConnectionError
from ansible.plugins.action import ActionBase
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.utils.display import Display
display = Display()
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
socket_path = None
network_os = self._get_network_os(task_vars).split(".")[-1]
persistent_connection = self._play_context.connection.split(".")[-1]
result = super(ActionModule, self).run(task_vars=task_vars)
if persistent_connection != "network_cli":
# It is supported only with network_cli
result["failed"] = True
result["msg"] = (
"connection type %s is not valid for net_put module,"
" please use fully qualified name of network_cli connection type"
% self._play_context.connection
)
return result
try:
src = self._task.args["src"]
except KeyError as exc:
return {
"failed": True,
"msg": "missing required argument: %s" % exc,
}
src_file_path_name = src
# Get destination file if specified
dest = self._task.args.get("dest")
# Get proto
proto = self._task.args.get("protocol")
if proto is None:
proto = "scp"
# Get mode if set
mode = self._task.args.get("mode")
if mode is None:
mode = "binary"
if mode == "text":
try:
self._handle_template(convert_data=False)
except ValueError as exc:
return dict(failed=True, msg=to_text(exc))
# Now src has resolved file write to disk in current directory for scp
src = self._task.args.get("src")
filename = str(uuid.uuid4())
cwd = self._loader.get_basedir()
output_file = os.path.join(cwd, filename)
try:
with open(output_file, "wb") as f:
f.write(to_bytes(src, encoding="utf-8"))
except Exception:
os.remove(output_file)
raise
else:
try:
output_file = self._get_binary_src_file(src)
except ValueError as exc:
return dict(failed=True, msg=to_text(exc))
if socket_path is None:
socket_path = self._connection.socket_path
conn = Connection(socket_path)
sock_timeout = conn.get_option("persistent_command_timeout")
if dest is None:
dest = src_file_path_name
try:
changed = self._handle_existing_file(
conn, output_file, dest, proto, sock_timeout
)
if changed is False:
result["changed"] = changed
result["destination"] = dest
return result
except Exception as exc:
result["msg"] = (
"Warning: %s idempotency check failed. Check dest" % exc
)
try:
conn.copy_file(
source=output_file,
destination=dest,
proto=proto,
timeout=sock_timeout,
)
except Exception as exc:
if to_text(exc) == "No response from server":
if network_os == "iosxr":
# IOSXR sometimes closes socket prematurely after completion
# of file transfer
result[
"msg"
] = "Warning: iosxr scp server pre close issue. Please check dest"
else:
result["failed"] = True
result["msg"] = "Exception received: %s" % exc
if mode == "text":
# Cleanup tmp file expanded with ansible vars
os.remove(output_file)
result["changed"] = changed
result["destination"] = dest
return result
def _handle_existing_file(self, conn, source, dest, proto, timeout):
"""
Determines whether the source and destination file match.
:return: False if source and dest both exist and have matching sha1 sums, True otherwise.
"""
cwd = self._loader.get_basedir()
filename = str(uuid.uuid4())
tmp_source_file = os.path.join(cwd, filename)
try:
conn.get_file(
source=dest,
destination=tmp_source_file,
proto=proto,
timeout=timeout,
)
except ConnectionError as exc:
error = to_text(exc)
if error.endswith("No such file or directory"):
if os.path.exists(tmp_source_file):
os.remove(tmp_source_file)
return True
try:
with open(source, "r") as f:
new_content = f.read()
with open(tmp_source_file, "r") as f:
old_content = f.read()
except (IOError, OSError):
os.remove(tmp_source_file)
raise
sha1 = hashlib.sha1()
old_content_b = to_bytes(old_content, errors="surrogate_or_strict")
sha1.update(old_content_b)
checksum_old = sha1.digest()
sha1 = hashlib.sha1()
new_content_b = to_bytes(new_content, errors="surrogate_or_strict")
sha1.update(new_content_b)
checksum_new = sha1.digest()
os.remove(tmp_source_file)
if checksum_old == checksum_new:
return False
return True
def _get_binary_src_file(self, src):
working_path = self._get_working_path()
if os.path.isabs(src) or urlsplit("src").scheme:
source = src
else:
source = self._loader.path_dwim_relative(
working_path, "templates", src
)
if not source:
source = self._loader.path_dwim_relative(working_path, src)
if not os.path.exists(source):
raise ValueError("path specified in src not found")
return source
def _get_working_path(self):
cwd = self._loader.get_basedir()
if self._task._role is not None:
cwd = self._task._role._role_path
return cwd
def _get_network_os(self, task_vars):
if "network_os" in self._task.args and self._task.args["network_os"]:
display.vvvv("Getting network OS from task argument")
network_os = self._task.args["network_os"]
elif self._play_context.network_os:
display.vvvv("Getting network OS from inventory")
network_os = self._play_context.network_os
elif (
"network_os" in task_vars.get("ansible_facts", {})
and task_vars["ansible_facts"]["network_os"]
):
display.vvvv("Getting network OS from fact")
network_os = task_vars["ansible_facts"]["network_os"]
else:
raise AnsibleError(
"ansible_network_os must be specified on this host"
)
return network_os

@ -1,206 +0,0 @@
#
# (c) 2018 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
import os
import time
import re
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.plugins.action.normal import ActionModule as _ActionModule
from ansible.utils.display import Display
display = Display()
PRIVATE_KEYS_RE = re.compile("__.+__")
class ActionModule(_ActionModule):
def run(self, task_vars=None):
config_module = hasattr(self, "_config_module") and self._config_module
if config_module and self._task.args.get("src"):
try:
self._handle_src_option()
except AnsibleError as e:
return {"failed": True, "msg": e.message, "changed": False}
result = super(ActionModule, self).run(task_vars=task_vars)
if (
config_module
and self._task.args.get("backup")
and not result.get("failed")
):
self._handle_backup_option(result, task_vars)
return result
def _handle_backup_option(self, result, task_vars):
filename = None
backup_path = None
try:
content = result["__backup__"]
except KeyError:
raise AnsibleError("Failed while reading configuration backup")
backup_options = self._task.args.get("backup_options")
if backup_options:
filename = backup_options.get("filename")
backup_path = backup_options.get("dir_path")
if not backup_path:
cwd = self._get_working_path()
backup_path = os.path.join(cwd, "backup")
if not filename:
tstamp = time.strftime(
"%Y-%m-%d@%H:%M:%S", time.localtime(time.time())
)
filename = "%s_config.%s" % (
task_vars["inventory_hostname"],
tstamp,
)
dest = os.path.join(backup_path, filename)
backup_path = os.path.expanduser(
os.path.expandvars(
to_bytes(backup_path, errors="surrogate_or_strict")
)
)
if not os.path.exists(backup_path):
os.makedirs(backup_path)
new_task = self._task.copy()
for item in self._task.args:
if not item.startswith("_"):
new_task.args.pop(item, None)
new_task.args.update(dict(content=content, dest=dest))
copy_action = self._shared_loader_obj.action_loader.get(
"copy",
task=new_task,
connection=self._connection,
play_context=self._play_context,
loader=self._loader,
templar=self._templar,
shared_loader_obj=self._shared_loader_obj,
)
copy_result = copy_action.run(task_vars=task_vars)
if copy_result.get("failed"):
result["failed"] = copy_result["failed"]
result["msg"] = copy_result.get("msg")
return
result["backup_path"] = dest
if copy_result.get("changed", False):
result["changed"] = copy_result["changed"]
if backup_options and backup_options.get("filename"):
result["date"] = time.strftime(
"%Y-%m-%d",
time.gmtime(os.stat(result["backup_path"]).st_ctime),
)
result["time"] = time.strftime(
"%H:%M:%S",
time.gmtime(os.stat(result["backup_path"]).st_ctime),
)
else:
result["date"] = tstamp.split("@")[0]
result["time"] = tstamp.split("@")[1]
result["shortname"] = result["backup_path"][::-1].split(".", 1)[1][
::-1
]
result["filename"] = result["backup_path"].split("/")[-1]
# strip out any keys that have two leading and two trailing
# underscore characters
for key in list(result.keys()):
if PRIVATE_KEYS_RE.match(key):
del result[key]
def _get_working_path(self):
cwd = self._loader.get_basedir()
if self._task._role is not None:
cwd = self._task._role._role_path
return cwd
def _handle_src_option(self, convert_data=True):
src = self._task.args.get("src")
working_path = self._get_working_path()
if os.path.isabs(src) or urlsplit("src").scheme:
source = src
else:
source = self._loader.path_dwim_relative(
working_path, "templates", src
)
if not source:
source = self._loader.path_dwim_relative(working_path, src)
if not os.path.exists(source):
raise AnsibleError("path specified in src not found")
try:
with open(source, "r") as f:
template_data = to_text(f.read())
except IOError as e:
raise AnsibleError(
"unable to load src file {0}, I/O error({1}): {2}".format(
source, e.errno, e.strerror
)
)
# Create a template search path in the following order:
# [working_path, self_role_path, dependent_role_paths, dirname(source)]
searchpath = [working_path]
if self._task._role is not None:
searchpath.append(self._task._role._role_path)
if hasattr(self._task, "_block:"):
dep_chain = self._task._block.get_dep_chain()
if dep_chain is not None:
for role in dep_chain:
searchpath.append(role._role_path)
searchpath.append(os.path.dirname(source))
templar = self._templar.copy_with_new_env(searchpath=searchpath)
self._task.args["src"] = templar.template(template_data)
def _get_network_os(self, task_vars):
if "network_os" in self._task.args and self._task.args["network_os"]:
display.vvvv("Getting network OS from task argument")
network_os = self._task.args["network_os"]
elif self._play_context.network_os:
display.vvvv("Getting network OS from inventory")
network_os = self._play_context.network_os
elif (
"network_os" in task_vars.get("ansible_facts", {})
and task_vars["ansible_facts"]["network_os"]
):
display.vvvv("Getting network OS from fact")
network_os = task_vars["ansible_facts"]["network_os"]
else:
raise AnsibleError(
"ansible_network_os must be specified on this host"
)
return network_os

@ -1,96 +0,0 @@
# 2017 Red Hat Inc.
# (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
DOCUMENTATION = """author: Ansible Core Team
connection: persistent
short_description: Use a persistent unix socket for connection
description:
- This is a helper plugin to allow making other connections persistent.
options:
persistent_command_timeout:
type: int
description:
- Configures, in seconds, the amount of time to wait for a command to return from
the remote device. If this timer is exceeded before the command returns, the
connection plugin will raise an exception and close
default: 10
ini:
- section: persistent_connection
key: command_timeout
env:
- name: ANSIBLE_PERSISTENT_COMMAND_TIMEOUT
vars:
- name: ansible_command_timeout
"""
from ansible.executor.task_executor import start_connection
from ansible.plugins.connection import ConnectionBase
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.connection import Connection as SocketConnection
from ansible.utils.display import Display
display = Display()
class Connection(ConnectionBase):
""" Local based connections """
transport = "ansible.netcommon.persistent"
has_pipelining = False
def __init__(self, play_context, new_stdin, *args, **kwargs):
super(Connection, self).__init__(
play_context, new_stdin, *args, **kwargs
)
self._task_uuid = to_text(kwargs.get("task_uuid", ""))
def _connect(self):
self._connected = True
return self
def exec_command(self, cmd, in_data=None, sudoable=True):
display.vvvv(
"exec_command(), socket_path=%s" % self.socket_path,
host=self._play_context.remote_addr,
)
connection = SocketConnection(self.socket_path)
out = connection.exec_command(cmd, in_data=in_data, sudoable=sudoable)
return 0, out, ""
def put_file(self, in_path, out_path):
pass
def fetch_file(self, in_path, out_path):
pass
def close(self):
self._connected = False
def run(self):
"""Returns the path of the persistent connection socket.
Attempts to ensure (within playcontext.timeout seconds) that the
socket path exists. If the path exists (or the timeout has expired),
returns the socket path.
"""
display.vvvv(
"starting connection from persistent connection plugin",
host=self._play_context.remote_addr,
)
variables = {
"ansible_command_timeout": self.get_option(
"persistent_command_timeout"
)
}
socket_path = start_connection(
self._play_context, variables, self._task_uuid
)
display.vvvv(
"local domain socket path is %s" % socket_path,
host=self._play_context.remote_addr,
)
setattr(self, "_socket_path", socket_path)
return socket_path

@ -1,75 +0,0 @@
# -*- coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
class ModuleDocFragment(object):
# Standard files documentation fragment
DOCUMENTATION = r"""
options:
import_modules:
type: boolean
description:
- Reduce CPU usage and network module execution time
by enabling direct execution. Instead of the module being packaged
and executed by the shell, it will be directly executed by the Ansible
control node using the same python interpreter as the Ansible process.
Note- Incompatible with C(asynchronous mode).
Note- Python 3 and Ansible 2.9.16 or greater required.
Note- With Ansible 2.9.x fully qualified modules names are required in tasks.
default: true
ini:
- section: ansible_network
key: import_modules
env:
- name: ANSIBLE_NETWORK_IMPORT_MODULES
vars:
- name: ansible_network_import_modules
persistent_connect_timeout:
type: int
description:
- Configures, in seconds, the amount of time to wait when trying to initially
establish a persistent connection. If this value expires before the connection
to the remote device is completed, the connection will fail.
default: 30
ini:
- section: persistent_connection
key: connect_timeout
env:
- name: ANSIBLE_PERSISTENT_CONNECT_TIMEOUT
vars:
- name: ansible_connect_timeout
persistent_command_timeout:
type: int
description:
- Configures, in seconds, the amount of time to wait for a command to
return from the remote device. If this timer is exceeded before the
command returns, the connection plugin will raise an exception and
close.
default: 30
ini:
- section: persistent_connection
key: command_timeout
env:
- name: ANSIBLE_PERSISTENT_COMMAND_TIMEOUT
vars:
- name: ansible_command_timeout
persistent_log_messages:
type: boolean
description:
- This flag will enable logging the command executed and response received from
target device in the ansible log file. For this option to work 'log_path' ansible
configuration option is required to be set to a file path with write access.
- Be sure to fully understand the security implications of enabling this
option as it could create a security vulnerability by logging sensitive information in log file.
default: False
ini:
- section: persistent_connection
key: log_messages
env:
- name: ANSIBLE_PERSISTENT_LOG_MESSAGES
vars:
- name: ansible_persistent_log_messages
"""

@ -1,28 +0,0 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The base class for all resource modules
"""
from __future__ import annotations
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network import (
get_resource_connection,
)
class ConfigBase(object):
""" The base class for all resource modules
"""
ACTION_STATES = ["merged", "replaced", "overridden", "deleted"]
def __init__(self, module):
self._module = module
self.state = module.params["state"]
self._connection = None
if self.state not in ["rendered", "parsed"]:
self._connection = get_resource_connection(module)

@ -1,475 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# (c) 2016 Red Hat Inc.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
from __future__ import annotations
import re
import hashlib
from ansible.module_utils.six.moves import zip
from ansible.module_utils.common.text.converters import to_bytes, to_native
DEFAULT_COMMENT_TOKENS = ["#", "!", "/*", "*/", "echo"]
DEFAULT_IGNORE_LINES_RE = set(
[
re.compile(r"Using \d+ out of \d+ bytes"),
re.compile(r"Building configuration"),
re.compile(r"Current configuration : \d+ bytes"),
]
)
try:
Pattern = re._pattern_type
except AttributeError:
Pattern = re.Pattern
class ConfigLine(object):
def __init__(self, raw):
self.text = str(raw).strip()
self.raw = raw
self._children = list()
self._parents = list()
def __str__(self):
return self.raw
def __eq__(self, other):
return self.line == other.line
def __ne__(self, other):
return not self.__eq__(other)
def __getitem__(self, key):
for item in self._children:
if item.text == key:
return item
raise KeyError(key)
@property
def line(self):
line = self.parents
line.append(self.text)
return " ".join(line)
@property
def children(self):
return _obj_to_text(self._children)
@property
def child_objs(self):
return self._children
@property
def parents(self):
return _obj_to_text(self._parents)
@property
def path(self):
config = _obj_to_raw(self._parents)
config.append(self.raw)
return "\n".join(config)
@property
def has_children(self):
return len(self._children) > 0
@property
def has_parents(self):
return len(self._parents) > 0
def add_child(self, obj):
if not isinstance(obj, ConfigLine):
raise AssertionError("child must be of type `ConfigLine`")
self._children.append(obj)
def ignore_line(text, tokens=None):
for item in tokens or DEFAULT_COMMENT_TOKENS:
if text.startswith(item):
return True
for regex in DEFAULT_IGNORE_LINES_RE:
if regex.match(text):
return True
def _obj_to_text(x):
return [o.text for o in x]
def _obj_to_raw(x):
return [o.raw for o in x]
def _obj_to_block(objects, visited=None):
items = list()
for o in objects:
if o not in items:
items.append(o)
for child in o._children:
if child not in items:
items.append(child)
return _obj_to_raw(items)
def dumps(objects, output="block", comments=False):
if output == "block":
items = _obj_to_block(objects)
elif output == "commands":
items = _obj_to_text(objects)
elif output == "raw":
items = _obj_to_raw(objects)
else:
raise TypeError("unknown value supplied for keyword output")
if output == "block":
if comments:
for index, item in enumerate(items):
nextitem = index + 1
if (
nextitem < len(items)
and not item.startswith(" ")
and items[nextitem].startswith(" ")
):
item = "!\n%s" % item
items[index] = item
items.append("!")
items.append("end")
return "\n".join(items)
class NetworkConfig(object):
def __init__(self, indent=1, contents=None, ignore_lines=None):
self._indent = indent
self._items = list()
self._config_text = None
if ignore_lines:
for item in ignore_lines:
if not isinstance(item, Pattern):
item = re.compile(item)
DEFAULT_IGNORE_LINES_RE.add(item)
if contents:
self.load(contents)
@property
def items(self):
return self._items
@property
def config_text(self):
return self._config_text
@property
def sha1(self):
sha1 = hashlib.sha1()
sha1.update(to_bytes(str(self), errors="surrogate_or_strict"))
return sha1.digest()
def __getitem__(self, key):
for line in self:
if line.text == key:
return line
raise KeyError(key)
def __iter__(self):
return iter(self._items)
def __str__(self):
return "\n".join([c.raw for c in self.items])
def __len__(self):
return len(self._items)
def load(self, s):
self._config_text = s
self._items = self.parse(s)
def loadfp(self, fp):
with open(fp) as f:
return self.load(f.read())
def parse(self, lines, comment_tokens=None):
toplevel = re.compile(r"\S")
childline = re.compile(r"^\s*(.+)$")
entry_reg = re.compile(r"([{};])")
ancestors = list()
config = list()
indents = [0]
for linenum, line in enumerate(
to_native(lines, errors="surrogate_or_strict").split("\n")
):
text = entry_reg.sub("", line).strip()
cfg = ConfigLine(line)
if not text or ignore_line(text, comment_tokens):
continue
# handle top level commands
if toplevel.match(line):
ancestors = [cfg]
indents = [0]
# handle sub level commands
else:
match = childline.match(line)
line_indent = match.start(1)
if line_indent < indents[-1]:
while indents[-1] > line_indent:
indents.pop()
if line_indent > indents[-1]:
indents.append(line_indent)
curlevel = len(indents) - 1
parent_level = curlevel - 1
cfg._parents = ancestors[:curlevel]
if curlevel > len(ancestors):
config.append(cfg)
continue
for i in range(curlevel, len(ancestors)):
ancestors.pop()
ancestors.append(cfg)
ancestors[parent_level].add_child(cfg)
config.append(cfg)
return config
def get_object(self, path):
for item in self.items:
if item.text == path[-1]:
if item.parents == path[:-1]:
return item
def get_block(self, path):
if not isinstance(path, list):
raise AssertionError("path argument must be a list object")
obj = self.get_object(path)
if not obj:
raise ValueError("path does not exist in config")
return self._expand_block(obj)
def get_block_config(self, path):
block = self.get_block(path)
return dumps(block, "block")
def _expand_block(self, configobj, S=None):
if S is None:
S = list()
S.append(configobj)
for child in configobj._children:
if child in S:
continue
self._expand_block(child, S)
return S
def _diff_line(self, other):
updates = list()
for item in self.items:
if item not in other:
updates.append(item)
return updates
def _diff_strict(self, other):
updates = list()
# block extracted from other does not have all parents
# but the last one. In case of multiple parents we need
# to add additional parents.
if other and isinstance(other, list) and len(other) > 0:
start_other = other[0]
if start_other.parents:
for parent in start_other.parents:
other.insert(0, ConfigLine(parent))
for index, line in enumerate(self.items):
try:
if str(line).strip() != str(other[index]).strip():
updates.append(line)
except (AttributeError, IndexError):
updates.append(line)
return updates
def _diff_exact(self, other):
updates = list()
if len(other) != len(self.items):
updates.extend(self.items)
else:
for ours, theirs in zip(self.items, other):
if ours != theirs:
updates.extend(self.items)
break
return updates
def difference(self, other, match="line", path=None, replace=None):
"""Perform a config diff against the another network config
:param other: instance of NetworkConfig to diff against
:param match: type of diff to perform. valid values are 'line',
'strict', 'exact'
:param path: context in the network config to filter the diff
:param replace: the method used to generate the replacement lines.
valid values are 'block', 'line'
:returns: a string of lines that are different
"""
if path and match != "line":
try:
other = other.get_block(path)
except ValueError:
other = list()
else:
other = other.items
# generate a list of ConfigLines that aren't in other
meth = getattr(self, "_diff_%s" % match)
updates = meth(other)
if replace == "block":
parents = list()
for item in updates:
if not item.has_parents:
parents.append(item)
else:
for p in item._parents:
if p not in parents:
parents.append(p)
updates = list()
for item in parents:
updates.extend(self._expand_block(item))
visited = set()
expanded = list()
for item in updates:
for p in item._parents:
if p.line not in visited:
visited.add(p.line)
expanded.append(p)
expanded.append(item)
visited.add(item.line)
return expanded
def add(self, lines, parents=None):
ancestors = list()
offset = 0
obj = None
# global config command
if not parents:
for line in lines:
# handle ignore lines
if ignore_line(line):
continue
item = ConfigLine(line)
item.raw = line
if item not in self.items:
self.items.append(item)
else:
for index, p in enumerate(parents):
try:
i = index + 1
obj = self.get_block(parents[:i])[0]
ancestors.append(obj)
except ValueError:
# add parent to config
offset = index * self._indent
obj = ConfigLine(p)
obj.raw = p.rjust(len(p) + offset)
if ancestors:
obj._parents = list(ancestors)
ancestors[-1]._children.append(obj)
self.items.append(obj)
ancestors.append(obj)
# add child objects
for line in lines:
# handle ignore lines
if ignore_line(line):
continue
# check if child already exists
for child in ancestors[-1]._children:
if child.text == line:
break
else:
offset = len(parents) * self._indent
item = ConfigLine(line)
item.raw = line.rjust(len(line) + offset)
item._parents = ancestors
ancestors[-1]._children.append(item)
self.items.append(item)
class CustomNetworkConfig(NetworkConfig):
def items_text(self):
return [item.text for item in self.items]
def expand_section(self, configobj, S=None):
if S is None:
S = list()
S.append(configobj)
for child in configobj.child_objs:
if child in S:
continue
self.expand_section(child, S)
return S
def to_block(self, section):
return "\n".join([item.raw for item in section])
def get_section(self, path):
try:
section = self.get_section_objects(path)
return self.to_block(section)
except ValueError:
return list()
def get_section_objects(self, path):
if not isinstance(path, list):
path = [path]
obj = self.get_object(path)
if not obj:
raise ValueError("path does not exist in config")
return self.expand_section(obj)

@ -1,164 +0,0 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The facts base class
this contains methods common to all facts subsets
"""
from __future__ import annotations
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network import (
get_resource_connection,
)
from ansible.module_utils.six import iteritems
class FactsBase(object):
"""
The facts base class
"""
def __init__(self, module):
self._module = module
self._warnings = []
self._gather_subset = module.params.get("gather_subset")
self._gather_network_resources = module.params.get(
"gather_network_resources"
)
self._connection = None
if module.params.get("state") not in ["rendered", "parsed"]:
self._connection = get_resource_connection(module)
self.ansible_facts = {"ansible_network_resources": {}}
self.ansible_facts["ansible_net_gather_network_resources"] = list()
self.ansible_facts["ansible_net_gather_subset"] = list()
if not self._gather_subset:
self._gather_subset = ["!config"]
if not self._gather_network_resources:
self._gather_network_resources = ["!all"]
def gen_runable(self, subsets, valid_subsets, resource_facts=False):
""" Generate the runable subset
:param module: The module instance
:param subsets: The provided subsets
:param valid_subsets: The valid subsets
:param resource_facts: A boolean flag
:rtype: list
:returns: The runable subsets
"""
runable_subsets = set()
exclude_subsets = set()
minimal_gather_subset = set()
if not resource_facts:
minimal_gather_subset = frozenset(["default"])
for subset in subsets:
if subset == "all":
runable_subsets.update(valid_subsets)
continue
if subset == "min" and minimal_gather_subset:
runable_subsets.update(minimal_gather_subset)
continue
if subset.startswith("!"):
subset = subset[1:]
if subset == "min":
exclude_subsets.update(minimal_gather_subset)
continue
if subset == "all":
exclude_subsets.update(
valid_subsets - minimal_gather_subset
)
continue
exclude = True
else:
exclude = False
if subset not in valid_subsets:
self._module.fail_json(
msg="Subset must be one of [%s], got %s"
% (
", ".join(sorted(list(valid_subsets))),
subset,
)
)
if exclude:
exclude_subsets.add(subset)
else:
runable_subsets.add(subset)
if not runable_subsets:
runable_subsets.update(valid_subsets)
runable_subsets.difference_update(exclude_subsets)
return runable_subsets
def get_network_resources_facts(
self, facts_resource_obj_map, resource_facts_type=None, data=None
):
"""
:param fact_resource_subsets:
:param data: previously collected configuration
:return:
"""
if not resource_facts_type:
resource_facts_type = self._gather_network_resources
restorun_subsets = self.gen_runable(
resource_facts_type,
frozenset(facts_resource_obj_map.keys()),
resource_facts=True,
)
if restorun_subsets:
self.ansible_facts["ansible_net_gather_network_resources"] = list(
restorun_subsets
)
instances = list()
for key in restorun_subsets:
fact_cls_obj = facts_resource_obj_map.get(key)
if fact_cls_obj:
instances.append(fact_cls_obj(self._module))
else:
self._warnings.extend(
[
"network resource fact gathering for '%s' is not supported"
% key
]
)
for inst in instances:
inst.populate_facts(self._connection, self.ansible_facts, data)
def get_network_legacy_facts(
self, fact_legacy_obj_map, legacy_facts_type=None
):
if not legacy_facts_type:
legacy_facts_type = self._gather_subset
runable_subsets = self.gen_runable(
legacy_facts_type, frozenset(fact_legacy_obj_map.keys())
)
if runable_subsets:
facts = dict()
# default subset should always returned be with legacy facts subsets
if "default" not in runable_subsets:
runable_subsets.add("default")
self.ansible_facts["ansible_net_gather_subset"] = list(
runable_subsets
)
instances = list()
for key in runable_subsets:
instances.append(fact_legacy_obj_map[key](self._module))
for inst in instances:
inst.populate()
facts.update(inst.facts)
self._warnings.extend(inst.warnings)
for key, value in iteritems(facts):
key = "ansible_net_%s" % key
self.ansible_facts[key] = value

@ -1,181 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# (c) 2017 Red Hat Inc.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
from __future__ import annotations
import sys
from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.module_utils.connection import Connection, ConnectionError
try:
from ncclient.xml_ import NCElement, new_ele, sub_ele
HAS_NCCLIENT = True
except (ImportError, AttributeError):
HAS_NCCLIENT = False
try:
from lxml.etree import Element, fromstring, XMLSyntaxError
except ImportError:
from xml.etree.ElementTree import Element, fromstring
if sys.version_info < (2, 7):
from xml.parsers.expat import ExpatError as XMLSyntaxError
else:
from xml.etree.ElementTree import ParseError as XMLSyntaxError
NS_MAP = {"nc": "urn:ietf:params:xml:ns:netconf:base:1.0"}
def exec_rpc(module, *args, **kwargs):
connection = NetconfConnection(module._socket_path)
return connection.execute_rpc(*args, **kwargs)
class NetconfConnection(Connection):
def __init__(self, socket_path):
super(NetconfConnection, self).__init__(socket_path)
def __rpc__(self, name, *args, **kwargs):
"""Executes the json-rpc and returns the output received
from remote device.
:name: rpc method to be executed over connection plugin that implements jsonrpc 2.0
:args: Ordered list of params passed as arguments to rpc method
:kwargs: Dict of valid key, value pairs passed as arguments to rpc method
For usage refer the respective connection plugin docs.
"""
self.check_rc = kwargs.pop("check_rc", True)
self.ignore_warning = kwargs.pop("ignore_warning", True)
response = self._exec_jsonrpc(name, *args, **kwargs)
if "error" in response:
rpc_error = response["error"].get("data")
return self.parse_rpc_error(
to_bytes(rpc_error, errors="surrogate_then_replace")
)
return fromstring(
to_bytes(response["result"], errors="surrogate_then_replace")
)
def parse_rpc_error(self, rpc_error):
if self.check_rc:
try:
error_root = fromstring(rpc_error)
root = Element("root")
root.append(error_root)
error_list = root.findall(".//nc:rpc-error", NS_MAP)
if not error_list:
raise ConnectionError(
to_text(rpc_error, errors="surrogate_then_replace")
)
warnings = []
for error in error_list:
message_ele = error.find("./nc:error-message", NS_MAP)
if message_ele is None:
message_ele = error.find("./nc:error-info", NS_MAP)
message = (
message_ele.text if message_ele is not None else None
)
severity = error.find("./nc:error-severity", NS_MAP).text
if (
severity == "warning"
and self.ignore_warning
and message is not None
):
warnings.append(message)
else:
raise ConnectionError(
to_text(rpc_error, errors="surrogate_then_replace")
)
return warnings
except XMLSyntaxError:
raise ConnectionError(rpc_error)
def transform_reply():
return b"""<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
<xsl:output method="xml" indent="no"/>
<xsl:template match="/|comment()|processing-instruction()">
<xsl:copy>
<xsl:apply-templates/>
</xsl:copy>
</xsl:template>
<xsl:template match="*">
<xsl:element name="{local-name()}">
<xsl:apply-templates select="@*|node()"/>
</xsl:element>
</xsl:template>
<xsl:template match="@*">
<xsl:attribute name="{local-name()}">
<xsl:value-of select="."/>
</xsl:attribute>
</xsl:template>
</xsl:stylesheet>
"""
# Note: Workaround for ncclient 0.5.3
def remove_namespaces(data):
if not HAS_NCCLIENT:
raise ImportError(
"ncclient is required but does not appear to be installed. "
"It can be installed using `pip install ncclient`"
)
return NCElement(data, transform_reply()).data_xml
def build_root_xml_node(tag):
return new_ele(tag)
def build_child_xml_node(parent, tag, text=None, attrib=None):
element = sub_ele(parent, tag)
if text:
element.text = to_text(text)
if attrib:
element.attrib.update(attrib)
return element
def build_subtree(parent, path):
element = parent
for field in path.split("/"):
sub_element = build_child_xml_node(element, field)
element = sub_element
return element

@ -1,276 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# Copyright (c) 2015 Peter Sprygada, <psprygada@ansible.com>
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import annotations
import traceback
import json
from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.basic import env_fallback
from ansible.module_utils.connection import Connection, ConnectionError
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.netconf import (
NetconfConnection,
)
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.parsing import (
Cli,
)
from ansible.module_utils.six import iteritems
NET_TRANSPORT_ARGS = dict(
host=dict(required=True),
port=dict(type="int"),
username=dict(fallback=(env_fallback, ["ANSIBLE_NET_USERNAME"])),
password=dict(
no_log=True, fallback=(env_fallback, ["ANSIBLE_NET_PASSWORD"])
),
ssh_keyfile=dict(
fallback=(env_fallback, ["ANSIBLE_NET_SSH_KEYFILE"]), type="path"
),
authorize=dict(
default=False,
fallback=(env_fallback, ["ANSIBLE_NET_AUTHORIZE"]),
type="bool",
),
auth_pass=dict(
no_log=True, fallback=(env_fallback, ["ANSIBLE_NET_AUTH_PASS"])
),
provider=dict(type="dict", no_log=True),
transport=dict(choices=list()),
timeout=dict(default=10, type="int"),
)
NET_CONNECTION_ARGS = dict()
NET_CONNECTIONS = dict()
def _transitional_argument_spec():
argument_spec = {}
for key, value in iteritems(NET_TRANSPORT_ARGS):
value["required"] = False
argument_spec[key] = value
return argument_spec
def to_list(val):
if isinstance(val, (list, tuple)):
return list(val)
elif val is not None:
return [val]
else:
return list()
class ModuleStub(object):
def __init__(self, argument_spec, fail_json):
self.params = dict()
for key, value in argument_spec.items():
self.params[key] = value.get("default")
self.fail_json = fail_json
class NetworkError(Exception):
def __init__(self, msg, **kwargs):
super(NetworkError, self).__init__(msg)
self.kwargs = kwargs
class Config(object):
def __init__(self, connection):
self.connection = connection
def __call__(self, commands, **kwargs):
lines = to_list(commands)
return self.connection.configure(lines, **kwargs)
def load_config(self, commands, **kwargs):
commands = to_list(commands)
return self.connection.load_config(commands, **kwargs)
def get_config(self, **kwargs):
return self.connection.get_config(**kwargs)
def save_config(self):
return self.connection.save_config()
class NetworkModule(AnsibleModule):
def __init__(self, *args, **kwargs):
connect_on_load = kwargs.pop("connect_on_load", True)
argument_spec = NET_TRANSPORT_ARGS.copy()
argument_spec["transport"]["choices"] = NET_CONNECTIONS.keys()
argument_spec.update(NET_CONNECTION_ARGS.copy())
if kwargs.get("argument_spec"):
argument_spec.update(kwargs["argument_spec"])
kwargs["argument_spec"] = argument_spec
super(NetworkModule, self).__init__(*args, **kwargs)
self.connection = None
self._cli = None
self._config = None
try:
transport = self.params["transport"] or "__default__"
cls = NET_CONNECTIONS[transport]
self.connection = cls()
except KeyError:
self.fail_json(
msg="Unknown transport or no default transport specified"
)
except (TypeError, NetworkError) as exc:
self.fail_json(
msg=to_native(exc), exception=traceback.format_exc()
)
if connect_on_load:
self.connect()
@property
def cli(self):
if not self.connected:
self.connect()
if self._cli:
return self._cli
self._cli = Cli(self.connection)
return self._cli
@property
def config(self):
if not self.connected:
self.connect()
if self._config:
return self._config
self._config = Config(self.connection)
return self._config
@property
def connected(self):
return self.connection._connected
def _load_params(self):
super(NetworkModule, self)._load_params()
provider = self.params.get("provider") or dict()
for key, value in provider.items():
for args in [NET_TRANSPORT_ARGS, NET_CONNECTION_ARGS]:
if key in args:
if self.params.get(key) is None and value is not None:
self.params[key] = value
def connect(self):
try:
if not self.connected:
self.connection.connect(self.params)
if self.params["authorize"]:
self.connection.authorize(self.params)
self.log(
"connected to %s:%s using %s"
% (
self.params["host"],
self.params["port"],
self.params["transport"],
)
)
except NetworkError as exc:
self.fail_json(
msg=to_native(exc), exception=traceback.format_exc()
)
def disconnect(self):
try:
if self.connected:
self.connection.disconnect()
self.log("disconnected from %s" % self.params["host"])
except NetworkError as exc:
self.fail_json(
msg=to_native(exc), exception=traceback.format_exc()
)
def register_transport(transport, default=False):
def register(cls):
NET_CONNECTIONS[transport] = cls
if default:
NET_CONNECTIONS["__default__"] = cls
return cls
return register
def add_argument(key, value):
NET_CONNECTION_ARGS[key] = value
def get_resource_connection(module):
if hasattr(module, "_connection"):
return module._connection
capabilities = get_capabilities(module)
network_api = capabilities.get("network_api")
if network_api in ("cliconf", "nxapi", "eapi", "exosapi"):
module._connection = Connection(module._socket_path)
elif network_api == "netconf":
module._connection = NetconfConnection(module._socket_path)
elif network_api == "local":
# This isn't supported, but we shouldn't fail here.
# Set the connection to a fake connection so it fails sensibly.
module._connection = LocalResourceConnection(module)
else:
module.fail_json(
msg="Invalid connection type {0!s}".format(network_api)
)
return module._connection
def get_capabilities(module):
if hasattr(module, "capabilities"):
return module._capabilities
try:
capabilities = Connection(module._socket_path).get_capabilities()
except ConnectionError as exc:
module.fail_json(msg=to_text(exc, errors="surrogate_then_replace"))
except AssertionError:
# No socket_path, connection most likely local.
return dict(network_api="local")
module._capabilities = json.loads(capabilities)
return module._capabilities
class LocalResourceConnection:
def __init__(self, module):
self.module = module
def get(self, *args, **kwargs):
self.module.fail_json(
msg="Network resource modules not supported over local connection."
)

@ -1,317 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# Copyright (c) 2015 Peter Sprygada, <psprygada@ansible.com>
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import annotations
import re
import shlex
import time
from ansible.module_utils.parsing.convert_bool import (
BOOLEANS_TRUE,
BOOLEANS_FALSE,
)
from ansible.module_utils.six import string_types, text_type
from ansible.module_utils.six.moves import zip
def to_list(val):
if isinstance(val, (list, tuple)):
return list(val)
elif val is not None:
return [val]
else:
return list()
class FailedConditionsError(Exception):
def __init__(self, msg, failed_conditions):
super(FailedConditionsError, self).__init__(msg)
self.failed_conditions = failed_conditions
class FailedConditionalError(Exception):
def __init__(self, msg, failed_conditional):
super(FailedConditionalError, self).__init__(msg)
self.failed_conditional = failed_conditional
class AddCommandError(Exception):
def __init__(self, msg, command):
super(AddCommandError, self).__init__(msg)
self.command = command
class AddConditionError(Exception):
def __init__(self, msg, condition):
super(AddConditionError, self).__init__(msg)
self.condition = condition
class Cli(object):
def __init__(self, connection):
self.connection = connection
self.default_output = connection.default_output or "text"
self._commands = list()
@property
def commands(self):
return [str(c) for c in self._commands]
def __call__(self, commands, output=None):
objects = list()
for cmd in to_list(commands):
objects.append(self.to_command(cmd, output))
return self.connection.run_commands(objects)
def to_command(
self, command, output=None, prompt=None, response=None, **kwargs
):
output = output or self.default_output
if isinstance(command, Command):
return command
if isinstance(prompt, string_types):
prompt = re.compile(re.escape(prompt))
return Command(
command, output, prompt=prompt, response=response, **kwargs
)
def add_commands(self, commands, output=None, **kwargs):
for cmd in commands:
self._commands.append(self.to_command(cmd, output, **kwargs))
def run_commands(self):
responses = self.connection.run_commands(self._commands)
for resp, cmd in zip(responses, self._commands):
cmd.response = resp
# wipe out the commands list to avoid issues if additional
# commands are executed later
self._commands = list()
return responses
class Command(object):
def __init__(
self, command, output=None, prompt=None, response=None, **kwargs
):
self.command = command
self.output = output
self.command_string = command
self.prompt = prompt
self.response = response
self.args = kwargs
def __str__(self):
return self.command_string
class CommandRunner(object):
def __init__(self, module):
self.module = module
self.items = list()
self.conditionals = set()
self.commands = list()
self.retries = 10
self.interval = 1
self.match = "all"
self._default_output = module.connection.default_output
def add_command(
self, command, output=None, prompt=None, response=None, **kwargs
):
if command in [str(c) for c in self.commands]:
raise AddCommandError(
"duplicated command detected", command=command
)
cmd = self.module.cli.to_command(
command, output=output, prompt=prompt, response=response, **kwargs
)
self.commands.append(cmd)
def get_command(self, command, output=None):
for cmd in self.commands:
if cmd.command == command:
return cmd.response
raise ValueError("command '%s' not found" % command)
def get_responses(self):
return [cmd.response for cmd in self.commands]
def add_conditional(self, condition):
try:
self.conditionals.add(Conditional(condition))
except AttributeError as exc:
raise AddConditionError(msg=str(exc), condition=condition)
def run(self):
while self.retries > 0:
self.module.cli.add_commands(self.commands)
responses = self.module.cli.run_commands()
for item in list(self.conditionals):
if item(responses):
if self.match == "any":
return item
self.conditionals.remove(item)
if not self.conditionals:
break
time.sleep(self.interval)
self.retries -= 1
else:
failed_conditions = [item.raw for item in self.conditionals]
errmsg = (
"One or more conditional statements have not been satisfied"
)
raise FailedConditionsError(errmsg, failed_conditions)
class Conditional(object):
"""Used in command modules to evaluate waitfor conditions
"""
OPERATORS = {
"eq": ["eq", "=="],
"neq": ["neq", "ne", "!="],
"gt": ["gt", ">"],
"ge": ["ge", ">="],
"lt": ["lt", "<"],
"le": ["le", "<="],
"contains": ["contains"],
"matches": ["matches"],
}
def __init__(self, conditional, encoding=None):
self.raw = conditional
self.negate = False
try:
components = shlex.split(conditional)
key, val = components[0], components[-1]
op_components = components[1:-1]
if "not" in op_components:
self.negate = True
op_components.pop(op_components.index("not"))
op = op_components[0]
except ValueError:
raise ValueError("failed to parse conditional")
self.key = key
self.func = self._func(op)
self.value = self._cast_value(val)
def __call__(self, data):
value = self.get_value(dict(result=data))
if not self.negate:
return self.func(value)
else:
return not self.func(value)
def _cast_value(self, value):
if value in BOOLEANS_TRUE:
return True
elif value in BOOLEANS_FALSE:
return False
elif re.match(r"^\d+\.d+$", value):
return float(value)
elif re.match(r"^\d+$", value):
return int(value)
else:
return text_type(value)
def _func(self, oper):
for func, operators in self.OPERATORS.items():
if oper in operators:
return getattr(self, func)
raise AttributeError("unknown operator: %s" % oper)
def get_value(self, result):
try:
return self.get_json(result)
except (IndexError, TypeError, AttributeError):
msg = "unable to apply conditional to result"
raise FailedConditionalError(msg, self.raw)
def get_json(self, result):
string = re.sub(r"\[[\'|\"]", ".", self.key)
string = re.sub(r"[\'|\"]\]", ".", string)
parts = re.split(r"\.(?=[^\]]*(?:\[|$))", string)
for part in parts:
match = re.findall(r"\[(\S+?)\]", part)
if match:
key = part[: part.find("[")]
result = result[key]
for m in match:
try:
m = int(m)
except ValueError:
m = str(m)
result = result[m]
else:
result = result.get(part)
return result
def number(self, value):
if "." in str(value):
return float(value)
else:
return int(value)
def eq(self, value):
return value == self.value
def neq(self, value):
return value != self.value
def gt(self, value):
return self.number(value) > self.value
def ge(self, value):
return self.number(value) >= self.value
def lt(self, value):
return self.number(value) < self.value
def le(self, value):
return self.number(value) <= self.value
def contains(self, value):
return str(self.value) in value
def matches(self, value):
match = re.search(self.value, value, re.M)
return match is not None

@ -1,673 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# (c) 2016 Red Hat Inc.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Networking tools for network modules only
from __future__ import annotations
import re
import ast
import operator
import socket
import json
from itertools import chain
from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.module_utils.six.moves.collections_abc import Mapping
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils import basic
from ansible.module_utils.parsing.convert_bool import boolean
try:
from jinja2 import Environment, StrictUndefined
from jinja2.exceptions import UndefinedError
HAS_JINJA2 = True
except ImportError:
HAS_JINJA2 = False
OPERATORS = frozenset(["ge", "gt", "eq", "neq", "lt", "le"])
ALIASES = frozenset(
[("min", "ge"), ("max", "le"), ("exactly", "eq"), ("neq", "ne")]
)
def to_list(val):
if isinstance(val, (list, tuple, set)):
return list(val)
elif val is not None:
return [val]
else:
return list()
def to_lines(stdout):
for item in stdout:
if isinstance(item, string_types):
item = to_text(item).split("\n")
yield item
def transform_commands(module):
transform = ComplexList(
dict(
command=dict(key=True),
output=dict(),
prompt=dict(type="list"),
answer=dict(type="list"),
newline=dict(type="bool", default=True),
sendonly=dict(type="bool", default=False),
check_all=dict(type="bool", default=False),
),
module,
)
return transform(module.params["commands"])
def sort_list(val):
if isinstance(val, list):
return sorted(val)
return val
class Entity(object):
"""Transforms a dict to with an argument spec
This class will take a dict and apply an Ansible argument spec to the
values. The resulting dict will contain all of the keys in the param
with appropriate values set.
Example::
argument_spec = dict(
command=dict(key=True),
display=dict(default='text', choices=['text', 'json']),
validate=dict(type='bool')
)
transform = Entity(module, argument_spec)
value = dict(command='foo')
result = transform(value)
print result
{'command': 'foo', 'display': 'text', 'validate': None}
Supported argument spec:
* key - specifies how to map a single value to a dict
* read_from - read and apply the argument_spec from the module
* required - a value is required
* type - type of value (uses AnsibleModule type checker)
* fallback - implements fallback function
* choices - set of valid options
* default - default value
"""
def __init__(
self, module, attrs=None, args=None, keys=None, from_argspec=False
):
args = [] if args is None else args
self._attributes = attrs or {}
self._module = module
for arg in args:
self._attributes[arg] = dict()
if from_argspec:
self._attributes[arg]["read_from"] = arg
if keys and arg in keys:
self._attributes[arg]["key"] = True
self.attr_names = frozenset(self._attributes.keys())
_has_key = False
for name, attr in iteritems(self._attributes):
if attr.get("read_from"):
if attr["read_from"] not in self._module.argument_spec:
module.fail_json(
msg="argument %s does not exist" % attr["read_from"]
)
spec = self._module.argument_spec.get(attr["read_from"])
for key, value in iteritems(spec):
if key not in attr:
attr[key] = value
if attr.get("key"):
if _has_key:
module.fail_json(msg="only one key value can be specified")
_has_key = True
attr["required"] = True
def serialize(self):
return self._attributes
def to_dict(self, value):
obj = {}
for name, attr in iteritems(self._attributes):
if attr.get("key"):
obj[name] = value
else:
obj[name] = attr.get("default")
return obj
def __call__(self, value, strict=True):
if not isinstance(value, dict):
value = self.to_dict(value)
if strict:
unknown = set(value).difference(self.attr_names)
if unknown:
self._module.fail_json(
msg="invalid keys: %s" % ",".join(unknown)
)
for name, attr in iteritems(self._attributes):
if value.get(name) is None:
value[name] = attr.get("default")
if attr.get("fallback") and not value.get(name):
fallback = attr.get("fallback", (None,))
fallback_strategy = fallback[0]
fallback_args = []
fallback_kwargs = {}
if fallback_strategy is not None:
for item in fallback[1:]:
if isinstance(item, dict):
fallback_kwargs = item
else:
fallback_args = item
try:
value[name] = fallback_strategy(
*fallback_args, **fallback_kwargs
)
except basic.AnsibleFallbackNotFound:
continue
if attr.get("required") and value.get(name) is None:
self._module.fail_json(
msg="missing required attribute %s" % name
)
if "choices" in attr:
if value[name] not in attr["choices"]:
self._module.fail_json(
msg="%s must be one of %s, got %s"
% (name, ", ".join(attr["choices"]), value[name])
)
if value[name] is not None:
value_type = attr.get("type", "str")
type_checker = self._module._CHECK_ARGUMENT_TYPES_DISPATCHER[
value_type
]
type_checker(value[name])
elif value.get(name):
value[name] = self._module.params[name]
return value
class EntityCollection(Entity):
"""Extends ```Entity``` to handle a list of dicts """
def __call__(self, iterable, strict=True):
if iterable is None:
iterable = [
super(EntityCollection, self).__call__(
self._module.params, strict
)
]
if not isinstance(iterable, (list, tuple)):
self._module.fail_json(msg="value must be an iterable")
return [
(super(EntityCollection, self).__call__(i, strict))
for i in iterable
]
# these two are for backwards compatibility and can be removed once all of the
# modules that use them are updated
class ComplexDict(Entity):
def __init__(self, attrs, module, *args, **kwargs):
super(ComplexDict, self).__init__(module, attrs, *args, **kwargs)
class ComplexList(EntityCollection):
def __init__(self, attrs, module, *args, **kwargs):
super(ComplexList, self).__init__(module, attrs, *args, **kwargs)
def dict_diff(base, comparable):
""" Generate a dict object of differences
This function will compare two dict objects and return the difference
between them as a dict object. For scalar values, the key will reflect
the updated value. If the key does not exist in `comparable`, then then no
key will be returned. For lists, the value in comparable will wholly replace
the value in base for the key. For dicts, the returned value will only
return keys that are different.
:param base: dict object to base the diff on
:param comparable: dict object to compare against base
:returns: new dict object with differences
"""
if not isinstance(base, dict):
raise AssertionError("`base` must be of type <dict>")
if not isinstance(comparable, dict):
if comparable is None:
comparable = dict()
else:
raise AssertionError("`comparable` must be of type <dict>")
updates = dict()
for key, value in iteritems(base):
if isinstance(value, dict):
item = comparable.get(key)
if item is not None:
sub_diff = dict_diff(value, comparable[key])
if sub_diff:
updates[key] = sub_diff
else:
comparable_value = comparable.get(key)
if comparable_value is not None:
if sort_list(base[key]) != sort_list(comparable_value):
updates[key] = comparable_value
for key in set(comparable.keys()).difference(base.keys()):
updates[key] = comparable.get(key)
return updates
def dict_merge(base, other):
""" Return a new dict object that combines base and other
This will create a new dict object that is a combination of the key/value
pairs from base and other. When both keys exist, the value will be
selected from other. If the value is a list object, the two lists will
be combined and duplicate entries removed.
:param base: dict object to serve as base
:param other: dict object to combine with base
:returns: new combined dict object
"""
if not isinstance(base, dict):
raise AssertionError("`base` must be of type <dict>")
if not isinstance(other, dict):
raise AssertionError("`other` must be of type <dict>")
combined = dict()
for key, value in iteritems(base):
if isinstance(value, dict):
if key in other:
item = other.get(key)
if item is not None:
if isinstance(other[key], Mapping):
combined[key] = dict_merge(value, other[key])
else:
combined[key] = other[key]
else:
combined[key] = item
else:
combined[key] = value
elif isinstance(value, list):
if key in other:
item = other.get(key)
if item is not None:
try:
combined[key] = list(set(chain(value, item)))
except TypeError:
value.extend([i for i in item if i not in value])
combined[key] = value
else:
combined[key] = item
else:
combined[key] = value
else:
if key in other:
other_value = other.get(key)
if other_value is not None:
if sort_list(base[key]) != sort_list(other_value):
combined[key] = other_value
else:
combined[key] = value
else:
combined[key] = other_value
else:
combined[key] = value
for key in set(other.keys()).difference(base.keys()):
combined[key] = other.get(key)
return combined
def param_list_to_dict(param_list, unique_key="name", remove_key=True):
"""Rotates a list of dictionaries to be a dictionary of dictionaries.
:param param_list: The aforementioned list of dictionaries
:param unique_key: The name of a key which is present and unique in all of param_list's dictionaries. The value
behind this key will be the key each dictionary can be found at in the new root dictionary
:param remove_key: If True, remove unique_key from the individual dictionaries before returning.
"""
param_dict = {}
for params in param_list:
params = params.copy()
if remove_key:
name = params.pop(unique_key)
else:
name = params.get(unique_key)
param_dict[name] = params
return param_dict
def conditional(expr, val, cast=None):
match = re.match(r"^(.+)\((.+)\)$", str(expr), re.I)
if match:
op, arg = match.groups()
else:
op = "eq"
if " " in str(expr):
raise AssertionError("invalid expression: cannot contain spaces")
arg = expr
if cast is None and val is not None:
arg = type(val)(arg)
elif callable(cast):
arg = cast(arg)
val = cast(val)
op = next((oper for alias, oper in ALIASES if op == alias), op)
if not hasattr(operator, op) and op not in OPERATORS:
raise ValueError("unknown operator: %s" % op)
func = getattr(operator, op)
return func(val, arg)
def ternary(value, true_val, false_val):
""" value ? true_val : false_val """
if value:
return true_val
else:
return false_val
def remove_default_spec(spec):
for item in spec:
if "default" in spec[item]:
del spec[item]["default"]
def validate_ip_address(address):
try:
socket.inet_aton(address)
except socket.error:
return False
return address.count(".") == 3
def validate_ip_v6_address(address):
try:
socket.inet_pton(socket.AF_INET6, address)
except socket.error:
return False
return True
def validate_prefix(prefix):
if prefix and not 0 <= int(prefix) <= 32:
return False
return True
def load_provider(spec, args):
provider = args.get("provider") or {}
for key, value in iteritems(spec):
if key not in provider:
if "fallback" in value:
provider[key] = _fallback(value["fallback"])
elif "default" in value:
provider[key] = value["default"]
else:
provider[key] = None
if "authorize" in provider:
# Coerce authorize to provider if a string has somehow snuck in.
provider["authorize"] = boolean(provider["authorize"] or False)
args["provider"] = provider
return provider
def _fallback(fallback):
strategy = fallback[0]
args = []
kwargs = {}
for item in fallback[1:]:
if isinstance(item, dict):
kwargs = item
else:
args = item
try:
return strategy(*args, **kwargs)
except basic.AnsibleFallbackNotFound:
pass
def generate_dict(spec):
"""
Generate dictionary which is in sync with argspec
:param spec: A dictionary that is the argspec of the module
:rtype: A dictionary
:returns: A dictionary in sync with argspec with default value
"""
obj = {}
if not spec:
return obj
for key, val in iteritems(spec):
if "default" in val:
dct = {key: val["default"]}
elif "type" in val and val["type"] == "dict":
dct = {key: generate_dict(val["options"])}
else:
dct = {key: None}
obj.update(dct)
return obj
def parse_conf_arg(cfg, arg):
"""
Parse config based on argument
:param cfg: A text string which is a line of configuration.
:param arg: A text string which is to be matched.
:rtype: A text string
:returns: A text string if match is found
"""
match = re.search(r"%s (.+)(\n|$)" % arg, cfg, re.M)
if match:
result = match.group(1).strip()
else:
result = None
return result
def parse_conf_cmd_arg(cfg, cmd, res1, res2=None, delete_str="no"):
"""
Parse config based on command
:param cfg: A text string which is a line of configuration.
:param cmd: A text string which is the command to be matched
:param res1: A text string to be returned if the command is present
:param res2: A text string to be returned if the negate command
is present
:param delete_str: A text string to identify the start of the
negate command
:rtype: A text string
:returns: A text string if match is found
"""
match = re.search(r"\n\s+%s(\n|$)" % cmd, cfg)
if match:
return res1
if res2 is not None:
match = re.search(r"\n\s+%s %s(\n|$)" % (delete_str, cmd), cfg)
if match:
return res2
return None
def get_xml_conf_arg(cfg, path, data="text"):
"""
:param cfg: The top level configuration lxml Element tree object
:param path: The relative xpath w.r.t to top level element (cfg)
to be searched in the xml hierarchy
:param data: The type of data to be returned for the matched xml node.
Valid values are text, tag, attrib, with default as text.
:return: Returns the required type for the matched xml node or else None
"""
match = cfg.xpath(path)
if len(match):
if data == "tag":
result = getattr(match[0], "tag")
elif data == "attrib":
result = getattr(match[0], "attrib")
else:
result = getattr(match[0], "text")
else:
result = None
return result
def remove_empties(cfg_dict):
"""
Generate final config dictionary
:param cfg_dict: A dictionary parsed in the facts system
:rtype: A dictionary
:returns: A dictionary by eliminating keys that have null values
"""
final_cfg = {}
if not cfg_dict:
return final_cfg
for key, val in iteritems(cfg_dict):
dct = None
if isinstance(val, dict):
child_val = remove_empties(val)
if child_val:
dct = {key: child_val}
elif (
isinstance(val, list)
and val
and all(isinstance(x, dict) for x in val)
):
child_val = [remove_empties(x) for x in val]
if child_val:
dct = {key: child_val}
elif val not in [None, [], {}, (), ""]:
dct = {key: val}
if dct:
final_cfg.update(dct)
return final_cfg
def validate_config(spec, data):
"""
Validate if the input data against the AnsibleModule spec format
:param spec: Ansible argument spec
:param data: Data to be validated
:return:
"""
params = basic._ANSIBLE_ARGS
basic._ANSIBLE_ARGS = to_bytes(json.dumps({"ANSIBLE_MODULE_ARGS": data}))
validated_data = basic.AnsibleModule(spec).params
basic._ANSIBLE_ARGS = params
return validated_data
def search_obj_in_list(name, lst, key="name"):
if not lst:
return None
else:
for item in lst:
if item.get(key) == name:
return item
class Template:
def __init__(self):
if not HAS_JINJA2:
raise ImportError(
"jinja2 is required but does not appear to be installed. "
"It can be installed using `pip install jinja2`"
)
self.env = Environment(undefined=StrictUndefined)
self.env.filters.update({"ternary": ternary})
def __call__(self, value, variables=None, fail_on_undefined=True):
variables = variables or {}
if not self.contains_vars(value):
return value
try:
value = self.env.from_string(value).render(variables)
except UndefinedError:
if not fail_on_undefined:
return None
raise
if value:
try:
return ast.literal_eval(value)
except Exception:
return str(value)
else:
return None
def contains_vars(self, data):
if isinstance(data, string_types):
for marker in (
self.env.block_start_string,
self.env.variable_start_string,
self.env.comment_start_string,
):
if marker in data:
return True
return False

@ -1,442 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# (c) 2018, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
ANSIBLE_METADATA = {
"metadata_version": "1.1",
"status": ["preview"],
"supported_by": "network",
}
DOCUMENTATION = """module: cli_config
author: Trishna Guha (@trishnaguha)
notes:
- The commands will be returned only for platforms that do not support onbox diff.
The C(--diff) option with the playbook will return the difference in configuration
for devices that has support for onbox diff
short_description: Push text based configuration to network devices over network_cli
description:
- This module provides platform agnostic way of pushing text based configuration to
network devices over network_cli connection plugin.
extends_documentation_fragment:
- ansible.netcommon.network_agnostic
options:
config:
description:
- The config to be pushed to the network device. This argument is mutually exclusive
with C(rollback) and either one of the option should be given as input. The
config should have indentation that the device uses.
type: str
commit:
description:
- The C(commit) argument instructs the module to push the configuration to the
device. This is mapped to module check mode.
type: bool
replace:
description:
- If the C(replace) argument is set to C(yes), it will replace the entire running-config
of the device with the C(config) argument value. For devices that support replacing
running configuration from file on device like NXOS/JUNOS, the C(replace) argument
takes path to the file on the device that will be used for replacing the entire
running-config. The value of C(config) option should be I(None) for such devices.
Nexus 9K devices only support replace. Use I(net_put) or I(nxos_file_copy) in
case of NXOS module to copy the flat file to remote device and then use set
the fullpath to this argument.
type: str
backup:
description:
- This argument will cause the module to create a full backup of the current running
config from the remote device before any changes are made. If the C(backup_options)
value is not given, the backup file is written to the C(backup) folder in the
playbook root directory or role root directory, if playbook is part of an ansible
role. If the directory does not exist, it is created.
type: bool
default: 'no'
rollback:
description:
- The C(rollback) argument instructs the module to rollback the current configuration
to the identifier specified in the argument. If the specified rollback identifier
does not exist on the remote device, the module will fail. To rollback to the
most recent commit, set the C(rollback) argument to 0. This option is mutually
exclusive with C(config).
commit_comment:
description:
- The C(commit_comment) argument specifies a text string to be used when committing
the configuration. If the C(commit) argument is set to False, this argument
is silently ignored. This argument is only valid for the platforms that support
commit operation with comment.
type: str
defaults:
description:
- The I(defaults) argument will influence how the running-config is collected
from the device. When the value is set to true, the command used to collect
the running-config is append with the all keyword. When the value is set to
false, the command is issued without the all keyword.
default: 'no'
type: bool
multiline_delimiter:
description:
- This argument is used when pushing a multiline configuration element to the
device. It specifies the character to use as the delimiting character. This
only applies to the configuration action.
type: str
diff_replace:
description:
- Instructs the module on the way to perform the configuration on the device.
If the C(diff_replace) argument is set to I(line) then the modified lines are
pushed to the device in configuration mode. If the argument is set to I(block)
then the entire command block is pushed to the device in configuration mode
if any line is not correct. Note that this parameter will be ignored if the
platform has onbox diff support.
choices:
- line
- block
- config
diff_match:
description:
- Instructs the module on the way to perform the matching of the set of commands
against the current device config. If C(diff_match) is set to I(line), commands
are matched line by line. If C(diff_match) is set to I(strict), command lines
are matched with respect to position. If C(diff_match) is set to I(exact), command
lines must be an equal match. Finally, if C(diff_match) is set to I(none), the
module will not attempt to compare the source configuration with the running
configuration on the remote device. Note that this parameter will be ignored
if the platform has onbox diff support.
choices:
- line
- strict
- exact
- none
diff_ignore_lines:
description:
- Use this argument to specify one or more lines that should be ignored during
the diff. This is used for lines in the configuration that are automatically
updated by the system. This argument takes a list of regular expressions or
exact line matches. Note that this parameter will be ignored if the platform
has onbox diff support.
backup_options:
description:
- This is a dict object containing configurable options related to backup file
path. The value of this option is read only when C(backup) is set to I(yes),
if C(backup) is set to I(no) this option will be silently ignored.
suboptions:
filename:
description:
- The filename to be used to store the backup configuration. If the filename
is not given it will be generated based on the hostname, current time and
date in format defined by <hostname>_config.<current-date>@<current-time>
dir_path:
description:
- This option provides the path ending with directory name in which the backup
configuration file will be stored. If the directory does not exist it will
be first created and the filename is either the value of C(filename) or
default filename as described in C(filename) options description. If the
path value is not given in that case a I(backup) directory will be created
in the current working directory and backup configuration will be copied
in C(filename) within I(backup) directory.
type: path
type: dict
"""
EXAMPLES = """
- name: configure device with config
cli_config:
config: "{{ lookup('template', 'basic/config.j2') }}"
- name: multiline config
cli_config:
config: |
hostname foo
feature nxapi
- name: configure device with config with defaults enabled
cli_config:
config: "{{ lookup('template', 'basic/config.j2') }}"
defaults: yes
- name: Use diff_match
cli_config:
config: "{{ lookup('file', 'interface_config') }}"
diff_match: none
- name: nxos replace config
cli_config:
replace: 'bootflash:nxoscfg'
- name: junos replace config
cli_config:
replace: '/var/home/ansible/junos01.cfg'
- name: commit with comment
cli_config:
config: set system host-name foo
commit_comment: this is a test
- name: configurable backup path
cli_config:
config: "{{ lookup('template', 'basic/config.j2') }}"
backup: yes
backup_options:
filename: backup.cfg
dir_path: /home/user
"""
RETURN = """
commands:
description: The set of commands that will be pushed to the remote device
returned: always
type: list
sample: ['interface Loopback999', 'no shutdown']
backup_path:
description: The full path to the backup file
returned: when backup is yes
type: str
sample: /playbooks/ansible/backup/hostname_config.2016-07-16@22:28:34
"""
import json
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.connection import Connection
from ansible.module_utils.common.text.converters import to_text
def validate_args(module, device_operations):
"""validate param if it is supported on the platform
"""
feature_list = [
"replace",
"rollback",
"commit_comment",
"defaults",
"multiline_delimiter",
"diff_replace",
"diff_match",
"diff_ignore_lines",
]
for feature in feature_list:
if module.params[feature]:
supports_feature = device_operations.get("supports_%s" % feature)
if supports_feature is None:
module.fail_json(
"This platform does not specify whether %s is supported or not. "
"Please report an issue against this platform's cliconf plugin."
% feature
)
elif not supports_feature:
module.fail_json(
msg="Option %s is not supported on this platform" % feature
)
def run(
module, device_operations, connection, candidate, running, rollback_id
):
result = {}
resp = {}
config_diff = []
banner_diff = {}
replace = module.params["replace"]
commit_comment = module.params["commit_comment"]
multiline_delimiter = module.params["multiline_delimiter"]
diff_replace = module.params["diff_replace"]
diff_match = module.params["diff_match"]
diff_ignore_lines = module.params["diff_ignore_lines"]
commit = not module.check_mode
if replace in ("yes", "true", "True"):
replace = True
elif replace in ("no", "false", "False"):
replace = False
if (
replace is not None
and replace not in [True, False]
and candidate is not None
):
module.fail_json(
msg="Replace value '%s' is a configuration file path already"
" present on the device. Hence 'replace' and 'config' options"
" are mutually exclusive" % replace
)
if rollback_id is not None:
resp = connection.rollback(rollback_id, commit)
if "diff" in resp:
result["changed"] = True
elif device_operations.get("supports_onbox_diff"):
if diff_replace:
module.warn(
"diff_replace is ignored as the device supports onbox diff"
)
if diff_match:
module.warn(
"diff_mattch is ignored as the device supports onbox diff"
)
if diff_ignore_lines:
module.warn(
"diff_ignore_lines is ignored as the device supports onbox diff"
)
if candidate and not isinstance(candidate, list):
candidate = candidate.strip("\n").splitlines()
kwargs = {
"candidate": candidate,
"commit": commit,
"replace": replace,
"comment": commit_comment,
}
resp = connection.edit_config(**kwargs)
if "diff" in resp:
result["changed"] = True
elif device_operations.get("supports_generate_diff"):
kwargs = {"candidate": candidate, "running": running}
if diff_match:
kwargs.update({"diff_match": diff_match})
if diff_replace:
kwargs.update({"diff_replace": diff_replace})
if diff_ignore_lines:
kwargs.update({"diff_ignore_lines": diff_ignore_lines})
diff_response = connection.get_diff(**kwargs)
config_diff = diff_response.get("config_diff")
banner_diff = diff_response.get("banner_diff")
if config_diff:
if isinstance(config_diff, list):
candidate = config_diff
else:
candidate = config_diff.splitlines()
kwargs = {
"candidate": candidate,
"commit": commit,
"replace": replace,
"comment": commit_comment,
}
if commit:
connection.edit_config(**kwargs)
result["changed"] = True
result["commands"] = config_diff.split("\n")
if banner_diff:
candidate = json.dumps(banner_diff)
kwargs = {"candidate": candidate, "commit": commit}
if multiline_delimiter:
kwargs.update({"multiline_delimiter": multiline_delimiter})
if commit:
connection.edit_banner(**kwargs)
result["changed"] = True
if module._diff:
if "diff" in resp:
result["diff"] = {"prepared": resp["diff"]}
else:
diff = ""
if config_diff:
if isinstance(config_diff, list):
diff += "\n".join(config_diff)
else:
diff += config_diff
if banner_diff:
diff += json.dumps(banner_diff)
result["diff"] = {"prepared": diff}
return result
def main():
"""main entry point for execution
"""
backup_spec = dict(filename=dict(), dir_path=dict(type="path"))
argument_spec = dict(
backup=dict(default=False, type="bool"),
backup_options=dict(type="dict", options=backup_spec),
config=dict(type="str"),
commit=dict(type="bool"),
replace=dict(type="str"),
rollback=dict(type="int"),
commit_comment=dict(type="str"),
defaults=dict(default=False, type="bool"),
multiline_delimiter=dict(type="str"),
diff_replace=dict(choices=["line", "block", "config"]),
diff_match=dict(choices=["line", "strict", "exact", "none"]),
diff_ignore_lines=dict(type="list"),
)
mutually_exclusive = [("config", "rollback")]
required_one_of = [["backup", "config", "rollback"]]
module = AnsibleModule(
argument_spec=argument_spec,
mutually_exclusive=mutually_exclusive,
required_one_of=required_one_of,
supports_check_mode=True,
)
result = {"changed": False}
connection = Connection(module._socket_path)
capabilities = module.from_json(connection.get_capabilities())
if capabilities:
device_operations = capabilities.get("device_operations", dict())
validate_args(module, device_operations)
else:
device_operations = dict()
if module.params["defaults"]:
if "get_default_flag" in capabilities.get("rpc"):
flags = connection.get_default_flag()
else:
flags = "all"
else:
flags = []
candidate = module.params["config"]
candidate = (
to_text(candidate, errors="surrogate_then_replace")
if candidate
else None
)
running = connection.get_config(flags=flags)
rollback_id = module.params["rollback"]
if module.params["backup"]:
result["__backup__"] = running
if candidate or rollback_id or module.params["replace"]:
try:
result.update(
run(
module,
device_operations,
connection,
candidate,
running,
rollback_id,
)
)
except Exception as exc:
module.fail_json(msg=to_text(exc))
module.exit_json(**result)
if __name__ == "__main__":
main()

@ -1,184 +0,0 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
# (c) 2015 Toshio Kuratomi <tkuratomi@ansible.com>
# (c) 2017, Peter Sprygada <psprygad@redhat.com>
# (c) 2017 Ansible Project
from __future__ import annotations
import os
from ansible import constants as C
from ansible.plugins.connection import ConnectionBase
from ansible.plugins.loader import connection_loader
from ansible.utils.display import Display
from ansible.utils.path import unfrackpath
display = Display()
__all__ = ["NetworkConnectionBase"]
BUFSIZE = 65536
class NetworkConnectionBase(ConnectionBase):
"""
A base class for network-style connections.
"""
force_persistence = True
# Do not use _remote_is_local in other connections
_remote_is_local = True
def __init__(self, play_context, new_stdin, *args, **kwargs):
super(NetworkConnectionBase, self).__init__(
play_context, new_stdin, *args, **kwargs
)
self._messages = []
self._conn_closed = False
self._network_os = self._play_context.network_os
self._local = connection_loader.get("local", play_context, "/dev/null")
self._local.set_options()
self._sub_plugin = {}
self._cached_variables = (None, None, None)
# reconstruct the socket_path and set instance values accordingly
self._ansible_playbook_pid = kwargs.get("ansible_playbook_pid")
self._update_connection_state()
def __getattr__(self, name):
try:
return self.__dict__[name]
except KeyError:
if not name.startswith("_"):
plugin = self._sub_plugin.get("obj")
if plugin:
method = getattr(plugin, name, None)
if method is not None:
return method
raise AttributeError(
"'%s' object has no attribute '%s'"
% (self.__class__.__name__, name)
)
def exec_command(self, cmd, in_data=None, sudoable=True):
return self._local.exec_command(cmd, in_data, sudoable)
def queue_message(self, level, message):
"""
Adds a message to the queue of messages waiting to be pushed back to the controller process.
:arg level: A string which can either be the name of a method in display, or 'log'. When
the messages are returned to task_executor, a value of log will correspond to
``display.display(message, log_only=True)``, while another value will call ``display.[level](message)``
"""
self._messages.append((level, message))
def pop_messages(self):
messages, self._messages = self._messages, []
return messages
def put_file(self, in_path, out_path):
"""Transfer a file from local to remote"""
return self._local.put_file(in_path, out_path)
def fetch_file(self, in_path, out_path):
"""Fetch a file from remote to local"""
return self._local.fetch_file(in_path, out_path)
def reset(self):
"""
Reset the connection
"""
if self._socket_path:
self.queue_message(
"vvvv",
"resetting persistent connection for socket_path %s"
% self._socket_path,
)
self.close()
self.queue_message("vvvv", "reset call on connection instance")
def close(self):
self._conn_closed = True
if self._connected:
self._connected = False
def get_options(self, hostvars=None):
options = super(NetworkConnectionBase, self).get_options(
hostvars=hostvars
)
if (
self._sub_plugin.get("obj")
and self._sub_plugin.get("type") != "external"
):
try:
options.update(
self._sub_plugin["obj"].get_options(hostvars=hostvars)
)
except AttributeError:
pass
return options
def set_options(self, task_keys=None, var_options=None, direct=None):
super(NetworkConnectionBase, self).set_options(
task_keys=task_keys, var_options=var_options, direct=direct
)
if self.get_option("persistent_log_messages"):
warning = (
"Persistent connection logging is enabled for %s. This will log ALL interactions"
% self._play_context.remote_addr
)
logpath = getattr(C, "DEFAULT_LOG_PATH")
if logpath is not None:
warning += " to %s" % logpath
self.queue_message(
"warning",
"%s and WILL NOT redact sensitive configuration like passwords. USE WITH CAUTION!"
% warning,
)
if (
self._sub_plugin.get("obj")
and self._sub_plugin.get("type") != "external"
):
try:
self._sub_plugin["obj"].set_options(
task_keys=task_keys, var_options=var_options, direct=direct
)
except AttributeError:
pass
def _update_connection_state(self):
"""
Reconstruct the connection socket_path and check if it exists
If the socket path exists then the connection is active and set
both the _socket_path value to the path and the _connected value
to True. If the socket path doesn't exist, leave the socket path
value to None and the _connected value to False
"""
ssh = connection_loader.get("ssh", class_only=True)
control_path = ssh._create_control_path(
self._play_context.remote_addr,
self._play_context.port,
self._play_context.remote_user,
self._play_context.connection,
self._ansible_playbook_pid,
)
tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR)
socket_path = unfrackpath(control_path % dict(directory=tmp_path))
if os.path.exists(socket_path):
self._connected = True
self._socket_path = socket_path
def _log_messages(self, message):
if self.get_option("persistent_log_messages"):
self.queue_message("log", message)

@ -1,132 +0,0 @@
#
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
import sys
import copy
from ansible_collections.ansible.netcommon.plugins.action.network import (
ActionModule as ActionNetworkModule,
)
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
load_provider,
)
from ansible_collections.cisco.ios.plugins.module_utils.network.ios.ios import (
ios_provider_spec,
)
from ansible.utils.display import Display
display = Display()
class ActionModule(ActionNetworkModule):
def run(self, tmp=None, task_vars=None):
del tmp # tmp no longer has any effect
module_name = self._task.action.split(".")[-1]
self._config_module = True if module_name == "ios_config" else False
persistent_connection = self._play_context.connection.split(".")[-1]
warnings = []
if persistent_connection == "network_cli":
provider = self._task.args.get("provider", {})
if any(provider.values()):
display.warning(
"provider is unnecessary when using network_cli and will be ignored"
)
del self._task.args["provider"]
elif self._play_context.connection == "local":
provider = load_provider(ios_provider_spec, self._task.args)
pc = copy.deepcopy(self._play_context)
pc.connection = "ansible.netcommon.network_cli"
pc.network_os = "cisco.ios.ios"
pc.remote_addr = provider["host"] or self._play_context.remote_addr
pc.port = int(provider["port"] or self._play_context.port or 22)
pc.remote_user = (
provider["username"] or self._play_context.connection_user
)
pc.password = provider["password"] or self._play_context.password
pc.private_key_file = (
provider["ssh_keyfile"] or self._play_context.private_key_file
)
pc.become = provider["authorize"] or False
if pc.become:
pc.become_method = "enable"
pc.become_pass = provider["auth_pass"]
connection = self._shared_loader_obj.connection_loader.get(
"ansible.netcommon.persistent",
pc,
sys.stdin,
task_uuid=self._task._uuid,
)
# TODO: Remove below code after ansible minimal is cut out
if connection is None:
pc.connection = "network_cli"
pc.network_os = "ios"
connection = self._shared_loader_obj.connection_loader.get(
"persistent", pc, sys.stdin, task_uuid=self._task._uuid
)
display.vvv(
"using connection plugin %s (was local)" % pc.connection,
pc.remote_addr,
)
command_timeout = (
int(provider["timeout"])
if provider["timeout"]
else connection.get_option("persistent_command_timeout")
)
connection.set_options(
direct={"persistent_command_timeout": command_timeout}
)
socket_path = connection.run()
display.vvvv("socket_path: %s" % socket_path, pc.remote_addr)
if not socket_path:
return {
"failed": True,
"msg": "unable to open shell. Please see: "
+ "https://docs.ansible.com/ansible/latest/network/user_guide/network_debug_troubleshooting.html#category-unable-to-open-shell",
}
task_vars["ansible_socket"] = socket_path
warnings.append(
[
"connection local support for this module is deprecated and will be removed in version 2.14, use connection %s"
% pc.connection
]
)
else:
return {
"failed": True,
"msg": "Connection type %s is not valid for this module"
% self._play_context.connection,
}
result = super(ActionModule, self).run(task_vars=task_vars)
if warnings:
if "warnings" in result:
result["warnings"].extend(warnings)
else:
result["warnings"] = warnings
return result

@ -1,465 +0,0 @@
#
# (c) 2017 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
DOCUMENTATION = """
---
author: Ansible Networking Team
cliconf: ios
short_description: Use ios cliconf to run command on Cisco IOS platform
description:
- This ios plugin provides low level abstraction apis for
sending and receiving CLI commands from Cisco IOS network devices.
version_added: "2.4"
"""
import re
import time
import json
from collections.abc import Mapping
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.six import iteritems
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.config import (
NetworkConfig,
dumps,
)
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
to_list,
)
from ansible.plugins.cliconf import CliconfBase, enable_mode
class Cliconf(CliconfBase):
@enable_mode
def get_config(self, source="running", flags=None, format=None):
if source not in ("running", "startup"):
raise ValueError(
"fetching configuration from %s is not supported" % source
)
if format:
raise ValueError(
"'format' value %s is not supported for get_config" % format
)
if not flags:
flags = []
if source == "running":
cmd = "show running-config "
else:
cmd = "show startup-config "
cmd += " ".join(to_list(flags))
cmd = cmd.strip()
return self.send_command(cmd)
def get_diff(
self,
candidate=None,
running=None,
diff_match="line",
diff_ignore_lines=None,
path=None,
diff_replace="line",
):
"""
Generate diff between candidate and running configuration. If the
remote host supports onbox diff capabilities ie. supports_onbox_diff in that case
candidate and running configurations are not required to be passed as argument.
In case if onbox diff capability is not supported candidate argument is mandatory
and running argument is optional.
:param candidate: The configuration which is expected to be present on remote host.
:param running: The base configuration which is used to generate diff.
:param diff_match: Instructs how to match the candidate configuration with current device configuration
Valid values are 'line', 'strict', 'exact', 'none'.
'line' - commands are matched line by line
'strict' - command lines are matched with respect to position
'exact' - command lines must be an equal match
'none' - will not compare the candidate configuration with the running configuration
:param diff_ignore_lines: Use this argument to specify one or more lines that should be
ignored during the diff. This is used for lines in the configuration
that are automatically updated by the system. This argument takes
a list of regular expressions or exact line matches.
:param path: The ordered set of parents that uniquely identify the section or hierarchy
the commands should be checked against. If the parents argument
is omitted, the commands are checked against the set of top
level or global commands.
:param diff_replace: Instructs on the way to perform the configuration on the device.
If the replace argument is set to I(line) then the modified lines are
pushed to the device in configuration mode. If the replace argument is
set to I(block) then the entire command block is pushed to the device in
configuration mode if any line is not correct.
:return: Configuration diff in json format.
{
'config_diff': '',
'banner_diff': {}
}
"""
diff = {}
device_operations = self.get_device_operations()
option_values = self.get_option_values()
if candidate is None and device_operations["supports_generate_diff"]:
raise ValueError(
"candidate configuration is required to generate diff"
)
if diff_match not in option_values["diff_match"]:
raise ValueError(
"'match' value %s in invalid, valid values are %s"
% (diff_match, ", ".join(option_values["diff_match"]))
)
if diff_replace not in option_values["diff_replace"]:
raise ValueError(
"'replace' value %s in invalid, valid values are %s"
% (diff_replace, ", ".join(option_values["diff_replace"]))
)
# prepare candidate configuration
candidate_obj = NetworkConfig(indent=1)
want_src, want_banners = self._extract_banners(candidate)
candidate_obj.load(want_src)
if running and diff_match != "none":
# running configuration
have_src, have_banners = self._extract_banners(running)
running_obj = NetworkConfig(
indent=1, contents=have_src, ignore_lines=diff_ignore_lines
)
configdiffobjs = candidate_obj.difference(
running_obj, path=path, match=diff_match, replace=diff_replace
)
else:
configdiffobjs = candidate_obj.items
have_banners = {}
diff["config_diff"] = (
dumps(configdiffobjs, "commands") if configdiffobjs else ""
)
banners = self._diff_banners(want_banners, have_banners)
diff["banner_diff"] = banners if banners else {}
return diff
@enable_mode
def edit_config(
self, candidate=None, commit=True, replace=None, comment=None
):
resp = {}
operations = self.get_device_operations()
self.check_edit_config_capability(
operations, candidate, commit, replace, comment
)
results = []
requests = []
if commit:
self.send_command("configure terminal")
for line in to_list(candidate):
if not isinstance(line, Mapping):
line = {"command": line}
cmd = line["command"]
if cmd != "end" and cmd[0] != "!":
results.append(self.send_command(**line))
requests.append(cmd)
self.send_command("end")
else:
raise ValueError("check mode is not supported")
resp["request"] = requests
resp["response"] = results
return resp
def edit_macro(
self, candidate=None, commit=True, replace=None, comment=None
):
"""
ios_config:
lines: "{{ macro_lines }}"
parents: "macro name {{ macro_name }}"
after: '@'
match: line
replace: block
"""
resp = {}
operations = self.get_device_operations()
self.check_edit_config_capability(
operations, candidate, commit, replace, comment
)
results = []
requests = []
if commit:
commands = ""
self.send_command("config terminal")
time.sleep(0.1)
# first item: macro command
commands += candidate.pop(0) + "\n"
multiline_delimiter = candidate.pop(-1)
for line in candidate:
commands += " " + line + "\n"
commands += multiline_delimiter + "\n"
obj = {"command": commands, "sendonly": True}
results.append(self.send_command(**obj))
requests.append(commands)
time.sleep(0.1)
self.send_command("end", sendonly=True)
time.sleep(0.1)
results.append(self.send_command("\n"))
requests.append("\n")
resp["request"] = requests
resp["response"] = results
return resp
def get(
self,
command=None,
prompt=None,
answer=None,
sendonly=False,
output=None,
newline=True,
check_all=False,
):
if not command:
raise ValueError("must provide value of command to execute")
if output:
raise ValueError(
"'output' value %s is not supported for get" % output
)
return self.send_command(
command=command,
prompt=prompt,
answer=answer,
sendonly=sendonly,
newline=newline,
check_all=check_all,
)
def get_device_info(self):
device_info = {}
device_info["network_os"] = "ios"
reply = self.get(command="show version")
data = to_text(reply, errors="surrogate_or_strict").strip()
match = re.search(r"Version (\S+)", data)
if match:
device_info["network_os_version"] = match.group(1).strip(",")
model_search_strs = [
r"^[Cc]isco (.+) \(revision",
r"^[Cc]isco (\S+).+bytes of .*memory",
]
for item in model_search_strs:
match = re.search(item, data, re.M)
if match:
version = match.group(1).split(" ")
device_info["network_os_model"] = version[0]
break
match = re.search(r"^(.+) uptime", data, re.M)
if match:
device_info["network_os_hostname"] = match.group(1)
match = re.search(r'image file is "(.+)"', data)
if match:
device_info["network_os_image"] = match.group(1)
return device_info
def get_device_operations(self):
return {
"supports_diff_replace": True,
"supports_commit": False,
"supports_rollback": False,
"supports_defaults": True,
"supports_onbox_diff": False,
"supports_commit_comment": False,
"supports_multiline_delimiter": True,
"supports_diff_match": True,
"supports_diff_ignore_lines": True,
"supports_generate_diff": True,
"supports_replace": False,
}
def get_option_values(self):
return {
"format": ["text"],
"diff_match": ["line", "strict", "exact", "none"],
"diff_replace": ["line", "block"],
"output": [],
}
def get_capabilities(self):
result = super(Cliconf, self).get_capabilities()
result["rpc"] += [
"edit_banner",
"get_diff",
"run_commands",
"get_defaults_flag",
]
result["device_operations"] = self.get_device_operations()
result.update(self.get_option_values())
return json.dumps(result)
def edit_banner(
self, candidate=None, multiline_delimiter="@", commit=True
):
"""
Edit banner on remote device
:param banners: Banners to be loaded in json format
:param multiline_delimiter: Line delimiter for banner
:param commit: Boolean value that indicates if the device candidate
configuration should be pushed in the running configuration or discarded.
:param diff: Boolean flag to indicate if configuration that is applied on remote host should
generated and returned in response or not
:return: Returns response of executing the configuration command received
from remote host
"""
resp = {}
banners_obj = json.loads(candidate)
results = []
requests = []
if commit:
for key, value in iteritems(banners_obj):
key += " %s" % multiline_delimiter
self.send_command("config terminal", sendonly=True)
for cmd in [key, value, multiline_delimiter]:
obj = {"command": cmd, "sendonly": True}
results.append(self.send_command(**obj))
requests.append(cmd)
self.send_command("end", sendonly=True)
time.sleep(0.1)
results.append(self.send_command("\n"))
requests.append("\n")
resp["request"] = requests
resp["response"] = results
return resp
def run_commands(self, commands=None, check_rc=True):
if commands is None:
raise ValueError("'commands' value is required")
responses = list()
for cmd in to_list(commands):
if not isinstance(cmd, Mapping):
cmd = {"command": cmd}
output = cmd.pop("output", None)
if output:
raise ValueError(
"'output' value %s is not supported for run_commands"
% output
)
try:
out = self.send_command(**cmd)
except AnsibleConnectionFailure as e:
if check_rc:
raise
out = getattr(e, "err", to_text(e))
responses.append(out)
return responses
def get_defaults_flag(self):
"""
The method identifies the filter that should be used to fetch running-configuration
with defaults.
:return: valid default filter
"""
out = self.get("show running-config ?")
out = to_text(out, errors="surrogate_then_replace")
commands = set()
for line in out.splitlines():
if line.strip():
commands.add(line.strip().split()[0])
if "all" in commands:
return "all"
else:
return "full"
def set_cli_prompt_context(self):
"""
Make sure we are in the operational cli mode
:return: None
"""
if self._connection.connected:
out = self._connection.get_prompt()
if out is None:
raise AnsibleConnectionFailure(
message=u"cli prompt is not identified from the last received"
u" response window: %s"
% self._connection._last_recv_window
)
if re.search(
r"config.*\)#",
to_text(out, errors="surrogate_then_replace").strip(),
):
self._connection.queue_message(
"vvvv", "wrong context, sending end to device"
)
self._connection.send_command("end")
def _extract_banners(self, config):
banners = {}
banner_cmds = re.findall(r"^banner (\w+)", config, re.M)
for cmd in banner_cmds:
regex = r"banner %s \^C(.+?)(?=\^C)" % cmd
match = re.search(regex, config, re.S)
if match:
key = "banner %s" % cmd
banners[key] = match.group(1).strip()
for cmd in banner_cmds:
regex = r"banner %s \^C(.+?)(?=\^C)" % cmd
match = re.search(regex, config, re.S)
if match:
config = config.replace(str(match.group(1)), "")
config = re.sub(r"banner \w+ \^C\^C", "!! banner removed", config)
return config, banners
def _diff_banners(self, want, have):
candidate = {}
for key, value in iteritems(want):
if value != have.get(key):
candidate[key] = value
return candidate

@ -1,82 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2015, Peter Sprygada <psprygada@ansible.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
class ModuleDocFragment(object):
# Standard files documentation fragment
DOCUMENTATION = r"""options:
provider:
description:
- B(Deprecated)
- 'Starting with Ansible 2.5 we recommend using C(connection: network_cli).'
- For more information please see the L(IOS Platform Options guide, ../network/user_guide/platform_ios.html).
- HORIZONTALLINE
- A dict object containing connection details.
type: dict
suboptions:
host:
description:
- Specifies the DNS host name or address for connecting to the remote device
over the specified transport. The value of host is used as the destination
address for the transport.
type: str
required: true
port:
description:
- Specifies the port to use when building the connection to the remote device.
type: int
default: 22
username:
description:
- Configures the username to use to authenticate the connection to the remote
device. This value is used to authenticate the SSH session. If the value
is not specified in the task, the value of environment variable C(ANSIBLE_NET_USERNAME)
will be used instead.
type: str
password:
description:
- Specifies the password to use to authenticate the connection to the remote
device. This value is used to authenticate the SSH session. If the value
is not specified in the task, the value of environment variable C(ANSIBLE_NET_PASSWORD)
will be used instead.
type: str
timeout:
description:
- Specifies the timeout in seconds for communicating with the network device
for either connecting or sending commands. If the timeout is exceeded before
the operation is completed, the module will error.
type: int
default: 10
ssh_keyfile:
description:
- Specifies the SSH key to use to authenticate the connection to the remote
device. This value is the path to the key used to authenticate the SSH
session. If the value is not specified in the task, the value of environment
variable C(ANSIBLE_NET_SSH_KEYFILE) will be used instead.
type: path
authorize:
description:
- Instructs the module to enter privileged mode on the remote device before
sending any commands. If not specified, the device will attempt to execute
all commands in non-privileged mode. If the value is not specified in the
task, the value of environment variable C(ANSIBLE_NET_AUTHORIZE) will be
used instead.
type: bool
default: false
auth_pass:
description:
- Specifies the password to use if required to enter privileged mode on the
remote device. If I(authorize) is false, then this argument does nothing.
If the value is not specified in the task, the value of environment variable
C(ANSIBLE_NET_AUTH_PASS) will be used instead.
type: str
notes:
- For more information on using Ansible to manage network devices see the :ref:`Ansible
Network Guide <network_guide>`
- For more information on using Ansible to manage Cisco devices see the `Cisco integration
page <https://www.ansible.com/integrations/networks/cisco>`_.
"""

@ -1,199 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# (c) 2016 Red Hat Inc.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
from __future__ import annotations
import json
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.basic import env_fallback
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
to_list,
)
from ansible.module_utils.connection import Connection, ConnectionError
_DEVICE_CONFIGS = {}
ios_provider_spec = {
"host": dict(),
"port": dict(type="int"),
"username": dict(fallback=(env_fallback, ["ANSIBLE_NET_USERNAME"])),
"password": dict(
fallback=(env_fallback, ["ANSIBLE_NET_PASSWORD"]), no_log=True
),
"ssh_keyfile": dict(
fallback=(env_fallback, ["ANSIBLE_NET_SSH_KEYFILE"]), type="path"
),
"authorize": dict(
fallback=(env_fallback, ["ANSIBLE_NET_AUTHORIZE"]), type="bool"
),
"auth_pass": dict(
fallback=(env_fallback, ["ANSIBLE_NET_AUTH_PASS"]), no_log=True
),
"timeout": dict(type="int"),
}
ios_argument_spec = {
"provider": dict(
type="dict", options=ios_provider_spec, removed_in_version=2.14
)
}
def get_provider_argspec():
return ios_provider_spec
def get_connection(module):
if hasattr(module, "_ios_connection"):
return module._ios_connection
capabilities = get_capabilities(module)
network_api = capabilities.get("network_api")
if network_api == "cliconf":
module._ios_connection = Connection(module._socket_path)
else:
module.fail_json(msg="Invalid connection type %s" % network_api)
return module._ios_connection
def get_capabilities(module):
if hasattr(module, "_ios_capabilities"):
return module._ios_capabilities
try:
capabilities = Connection(module._socket_path).get_capabilities()
except ConnectionError as exc:
module.fail_json(msg=to_text(exc, errors="surrogate_then_replace"))
module._ios_capabilities = json.loads(capabilities)
return module._ios_capabilities
def get_defaults_flag(module):
connection = get_connection(module)
try:
out = connection.get_defaults_flag()
except ConnectionError as exc:
module.fail_json(msg=to_text(exc, errors="surrogate_then_replace"))
return to_text(out, errors="surrogate_then_replace").strip()
def get_config(module, flags=None):
flags = to_list(flags)
section_filter = False
if flags and "section" in flags[-1]:
section_filter = True
flag_str = " ".join(flags)
try:
return _DEVICE_CONFIGS[flag_str]
except KeyError:
connection = get_connection(module)
try:
out = connection.get_config(flags=flags)
except ConnectionError as exc:
if section_filter:
# Some ios devices don't understand `| section foo`
out = get_config(module, flags=flags[:-1])
else:
module.fail_json(
msg=to_text(exc, errors="surrogate_then_replace")
)
cfg = to_text(out, errors="surrogate_then_replace").strip()
_DEVICE_CONFIGS[flag_str] = cfg
return cfg
def run_commands(module, commands, check_rc=True):
connection = get_connection(module)
try:
return connection.run_commands(commands=commands, check_rc=check_rc)
except ConnectionError as exc:
module.fail_json(msg=to_text(exc))
def load_config(module, commands):
connection = get_connection(module)
try:
resp = connection.edit_config(commands)
return resp.get("response")
except ConnectionError as exc:
module.fail_json(msg=to_text(exc))
def normalize_interface(name):
"""Return the normalized interface name
"""
if not name:
return
def _get_number(name):
digits = ""
for char in name:
if char.isdigit() or char in "/.":
digits += char
return digits
if name.lower().startswith("gi"):
if_type = "GigabitEthernet"
elif name.lower().startswith("te"):
if_type = "TenGigabitEthernet"
elif name.lower().startswith("fa"):
if_type = "FastEthernet"
elif name.lower().startswith("fo"):
if_type = "FortyGigabitEthernet"
elif name.lower().startswith("et"):
if_type = "Ethernet"
elif name.lower().startswith("vl"):
if_type = "Vlan"
elif name.lower().startswith("lo"):
if_type = "loopback"
elif name.lower().startswith("po"):
if_type = "port-channel"
elif name.lower().startswith("nv"):
if_type = "nve"
elif name.lower().startswith("twe"):
if_type = "TwentyFiveGigE"
elif name.lower().startswith("hu"):
if_type = "HundredGigE"
else:
if_type = None
number_list = name.split(" ")
if len(number_list) == 2:
if_number = number_list[-1].strip()
else:
if_number = _get_number(name)
if if_type:
proper_interface = if_type + if_number
else:
proper_interface = name
return proper_interface

@ -1,230 +0,0 @@
#!/usr/bin/python
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
ANSIBLE_METADATA = {
"metadata_version": "1.1",
"status": ["preview"],
"supported_by": "network",
}
DOCUMENTATION = """module: ios_command
author: Peter Sprygada (@privateip)
short_description: Run commands on remote devices running Cisco IOS
description:
- Sends arbitrary commands to an ios node and returns the results read from the device.
This module includes an argument that will cause the module to wait for a specific
condition before returning or timing out if the condition is not met.
- This module does not support running commands in configuration mode. Please use
M(ios_config) to configure IOS devices.
extends_documentation_fragment:
- cisco.ios.ios
notes:
- Tested against IOS 15.6
options:
commands:
description:
- List of commands to send to the remote ios device over the configured provider.
The resulting output from the command is returned. If the I(wait_for) argument
is provided, the module is not returned until the condition is satisfied or
the number of retries has expired. If a command sent to the device requires
answering a prompt, it is possible to pass a dict containing I(command), I(answer)
and I(prompt). Common answers are 'y' or "\r" (carriage return, must be double
quotes). See examples.
required: true
wait_for:
description:
- List of conditions to evaluate against the output of the command. The task will
wait for each condition to be true before moving forward. If the conditional
is not true within the configured number of retries, the task fails. See examples.
aliases:
- waitfor
match:
description:
- The I(match) argument is used in conjunction with the I(wait_for) argument to
specify the match policy. Valid values are C(all) or C(any). If the value
is set to C(all) then all conditionals in the wait_for must be satisfied. If
the value is set to C(any) then only one of the values must be satisfied.
default: all
choices:
- any
- all
retries:
description:
- Specifies the number of retries a command should by tried before it is considered
failed. The command is run on the target device every retry and evaluated against
the I(wait_for) conditions.
default: 10
interval:
description:
- Configures the interval in seconds to wait between retries of the command. If
the command does not pass the specified conditions, the interval indicates how
long to wait before trying the command again.
default: 1
"""
EXAMPLES = r"""
tasks:
- name: run show version on remote devices
ios_command:
commands: show version
- name: run show version and check to see if output contains IOS
ios_command:
commands: show version
wait_for: result[0] contains IOS
- name: run multiple commands on remote nodes
ios_command:
commands:
- show version
- show interfaces
- name: run multiple commands and evaluate the output
ios_command:
commands:
- show version
- show interfaces
wait_for:
- result[0] contains IOS
- result[1] contains Loopback0
- name: run commands that require answering a prompt
ios_command:
commands:
- command: 'clear counters GigabitEthernet0/1'
prompt: 'Clear "show interface" counters on this interface \[confirm\]'
answer: 'y'
- command: 'clear counters GigabitEthernet0/2'
prompt: '[confirm]'
answer: "\r"
"""
RETURN = """
stdout:
description: The set of responses from the commands
returned: always apart from low level errors (such as action plugin)
type: list
sample: ['...', '...']
stdout_lines:
description: The value of stdout split into a list
returned: always apart from low level errors (such as action plugin)
type: list
sample: [['...', '...'], ['...'], ['...']]
failed_conditions:
description: The list of conditionals that have failed
returned: failed
type: list
sample: ['...', '...']
"""
import time
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.parsing import (
Conditional,
)
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
transform_commands,
to_lines,
)
from ansible_collections.cisco.ios.plugins.module_utils.network.ios.ios import (
run_commands,
)
from ansible_collections.cisco.ios.plugins.module_utils.network.ios.ios import (
ios_argument_spec,
)
def parse_commands(module, warnings):
commands = transform_commands(module)
if module.check_mode:
for item in list(commands):
if not item["command"].startswith("show"):
warnings.append(
"Only show commands are supported when using check mode, not "
"executing %s" % item["command"]
)
commands.remove(item)
return commands
def main():
"""main entry point for module execution
"""
argument_spec = dict(
commands=dict(type="list", required=True),
wait_for=dict(type="list", aliases=["waitfor"]),
match=dict(default="all", choices=["all", "any"]),
retries=dict(default=10, type="int"),
interval=dict(default=1, type="int"),
)
argument_spec.update(ios_argument_spec)
module = AnsibleModule(
argument_spec=argument_spec, supports_check_mode=True
)
warnings = list()
result = {"changed": False, "warnings": warnings}
commands = parse_commands(module, warnings)
wait_for = module.params["wait_for"] or list()
try:
conditionals = [Conditional(c) for c in wait_for]
except AttributeError as exc:
module.fail_json(msg=to_text(exc))
retries = module.params["retries"]
interval = module.params["interval"]
match = module.params["match"]
while retries > 0:
responses = run_commands(module, commands)
for item in list(conditionals):
if item(responses):
if match == "any":
conditionals = list()
break
conditionals.remove(item)
if not conditionals:
break
time.sleep(interval)
retries -= 1
if conditionals:
failed_conditions = [item.raw for item in conditionals]
msg = "One or more conditional statements have not been satisfied"
module.fail_json(msg=msg, failed_conditions=failed_conditions)
result.update(
{"stdout": responses, "stdout_lines": list(to_lines(responses))}
)
module.exit_json(**result)
if __name__ == "__main__":
main()

@ -1,599 +0,0 @@
#!/usr/bin/python
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
ANSIBLE_METADATA = {
"metadata_version": "1.1",
"status": ["preview"],
"supported_by": "network",
}
DOCUMENTATION = """module: ios_config
author: Peter Sprygada (@privateip)
short_description: Manage Cisco IOS configuration sections
description:
- Cisco IOS configurations use a simple block indent file syntax for segmenting configuration
into sections. This module provides an implementation for working with IOS configuration
sections in a deterministic way.
extends_documentation_fragment:
- cisco.ios.ios
notes:
- Tested against IOS 15.6
- Abbreviated commands are NOT idempotent,
see L(Network FAQ,../network/user_guide/faq.html#why-do-the-config-modules-always-return-changed-true-with-abbreviated-commands).
options:
lines:
description:
- The ordered set of commands that should be configured in the section. The commands
must be the exact same commands as found in the device running-config. Be sure
to note the configuration command syntax as some commands are automatically
modified by the device config parser.
aliases:
- commands
parents:
description:
- The ordered set of parents that uniquely identify the section or hierarchy the
commands should be checked against. If the parents argument is omitted, the
commands are checked against the set of top level or global commands.
src:
description:
- Specifies the source path to the file that contains the configuration or configuration
template to load. The path to the source file can either be the full path on
the Ansible control host or a relative path from the playbook or role root directory. This
argument is mutually exclusive with I(lines), I(parents).
before:
description:
- The ordered set of commands to push on to the command stack if a change needs
to be made. This allows the playbook designer the opportunity to perform configuration
commands prior to pushing any changes without affecting how the set of commands
are matched against the system.
after:
description:
- The ordered set of commands to append to the end of the command stack if a change
needs to be made. Just like with I(before) this allows the playbook designer
to append a set of commands to be executed after the command set.
match:
description:
- Instructs the module on the way to perform the matching of the set of commands
against the current device config. If match is set to I(line), commands are
matched line by line. If match is set to I(strict), command lines are matched
with respect to position. If match is set to I(exact), command lines must be
an equal match. Finally, if match is set to I(none), the module will not attempt
to compare the source configuration with the running configuration on the remote
device.
choices:
- line
- strict
- exact
- none
default: line
replace:
description:
- Instructs the module on the way to perform the configuration on the device.
If the replace argument is set to I(line) then the modified lines are pushed
to the device in configuration mode. If the replace argument is set to I(block)
then the entire command block is pushed to the device in configuration mode
if any line is not correct.
default: line
choices:
- line
- block
multiline_delimiter:
description:
- This argument is used when pushing a multiline configuration element to the
IOS device. It specifies the character to use as the delimiting character. This
only applies to the configuration action.
default: '@'
backup:
description:
- This argument will cause the module to create a full backup of the current C(running-config)
from the remote device before any changes are made. If the C(backup_options)
value is not given, the backup file is written to the C(backup) folder in the
playbook root directory or role root directory, if playbook is part of an ansible
role. If the directory does not exist, it is created.
type: bool
default: 'no'
running_config:
description:
- The module, by default, will connect to the remote device and retrieve the current
running-config to use as a base for comparing against the contents of source.
There are times when it is not desirable to have the task get the current running-config
for every task in a playbook. The I(running_config) argument allows the implementer
to pass in the configuration to use as the base config for comparison.
aliases:
- config
defaults:
description:
- This argument specifies whether or not to collect all defaults when getting
the remote device running config. When enabled, the module will get the current
config by issuing the command C(show running-config all).
type: bool
default: 'no'
save_when:
description:
- When changes are made to the device running-configuration, the changes are not
copied to non-volatile storage by default. Using this argument will change
that before. If the argument is set to I(always), then the running-config will
always be copied to the startup-config and the I(modified) flag will always
be set to True. If the argument is set to I(modified), then the running-config
will only be copied to the startup-config if it has changed since the last save
to startup-config. If the argument is set to I(never), the running-config will
never be copied to the startup-config. If the argument is set to I(changed),
then the running-config will only be copied to the startup-config if the task
has made a change. I(changed) was added in Ansible 2.5.
default: never
choices:
- always
- never
- modified
- changed
diff_against:
description:
- When using the C(ansible-playbook --diff) command line argument the module can
generate diffs against different sources.
- When this option is configure as I(startup), the module will return the diff
of the running-config against the startup-config.
- When this option is configured as I(intended), the module will return the diff
of the running-config against the configuration provided in the C(intended_config)
argument.
- When this option is configured as I(running), the module will return the before
and after diff of the running-config with respect to any changes made to the
device configuration.
choices:
- running
- startup
- intended
diff_ignore_lines:
description:
- Use this argument to specify one or more lines that should be ignored during
the diff. This is used for lines in the configuration that are automatically
updated by the system. This argument takes a list of regular expressions or
exact line matches.
intended_config:
description:
- The C(intended_config) provides the master configuration that the node should
conform to and is used to check the final running-config against. This argument
will not modify any settings on the remote device and is strictly used to check
the compliance of the current device's configuration against. When specifying
this argument, the task should also modify the C(diff_against) value and set
it to I(intended).
backup_options:
description:
- This is a dict object containing configurable options related to backup file
path. The value of this option is read only when C(backup) is set to I(yes),
if C(backup) is set to I(no) this option will be silently ignored.
suboptions:
filename:
description:
- The filename to be used to store the backup configuration. If the filename
is not given it will be generated based on the hostname, current time and
date in format defined by <hostname>_config.<current-date>@<current-time>
dir_path:
description:
- This option provides the path ending with directory name in which the backup
configuration file will be stored. If the directory does not exist it will
be first created and the filename is either the value of C(filename) or
default filename as described in C(filename) options description. If the
path value is not given in that case a I(backup) directory will be created
in the current working directory and backup configuration will be copied
in C(filename) within I(backup) directory.
type: path
type: dict
"""
EXAMPLES = """
- name: configure top level configuration
ios_config:
lines: hostname {{ inventory_hostname }}
- name: configure interface settings
ios_config:
lines:
- description test interface
- ip address 172.31.1.1 255.255.255.0
parents: interface Ethernet1
- name: configure ip helpers on multiple interfaces
ios_config:
lines:
- ip helper-address 172.26.1.10
- ip helper-address 172.26.3.8
parents: "{{ item }}"
with_items:
- interface Ethernet1
- interface Ethernet2
- interface GigabitEthernet1
- name: configure policer in Scavenger class
ios_config:
lines:
- conform-action transmit
- exceed-action drop
parents:
- policy-map Foo
- class Scavenger
- police cir 64000
- name: load new acl into device
ios_config:
lines:
- 10 permit ip host 192.0.2.1 any log
- 20 permit ip host 192.0.2.2 any log
- 30 permit ip host 192.0.2.3 any log
- 40 permit ip host 192.0.2.4 any log
- 50 permit ip host 192.0.2.5 any log
parents: ip access-list extended test
before: no ip access-list extended test
match: exact
- name: check the running-config against master config
ios_config:
diff_against: intended
intended_config: "{{ lookup('file', 'master.cfg') }}"
- name: check the startup-config against the running-config
ios_config:
diff_against: startup
diff_ignore_lines:
- ntp clock .*
- name: save running to startup when modified
ios_config:
save_when: modified
- name: for idempotency, use full-form commands
ios_config:
lines:
# - shut
- shutdown
# parents: int gig1/0/11
parents: interface GigabitEthernet1/0/11
# Set boot image based on comparison to a group_var (version) and the version
# that is returned from the `ios_facts` module
- name: SETTING BOOT IMAGE
ios_config:
lines:
- no boot system
- boot system flash bootflash:{{new_image}}
host: "{{ inventory_hostname }}"
when: ansible_net_version != version
- name: render a Jinja2 template onto an IOS device
ios_config:
backup: yes
src: ios_template.j2
- name: configurable backup path
ios_config:
src: ios_template.j2
backup: yes
backup_options:
filename: backup.cfg
dir_path: /home/user
"""
RETURN = """
updates:
description: The set of commands that will be pushed to the remote device
returned: always
type: list
sample: ['hostname foo', 'router ospf 1', 'router-id 192.0.2.1']
commands:
description: The set of commands that will be pushed to the remote device
returned: always
type: list
sample: ['hostname foo', 'router ospf 1', 'router-id 192.0.2.1']
backup_path:
description: The full path to the backup file
returned: when backup is yes
type: str
sample: /playbooks/ansible/backup/ios_config.2016-07-16@22:28:34
filename:
description: The name of the backup file
returned: when backup is yes and filename is not specified in backup options
type: str
sample: ios_config.2016-07-16@22:28:34
shortname:
description: The full path to the backup file excluding the timestamp
returned: when backup is yes and filename is not specified in backup options
type: str
sample: /playbooks/ansible/backup/ios_config
date:
description: The date extracted from the backup file name
returned: when backup is yes
type: str
sample: "2016-07-16"
time:
description: The time extracted from the backup file name
returned: when backup is yes
type: str
sample: "22:28:34"
"""
import json
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.connection import ConnectionError
from ansible_collections.cisco.ios.plugins.module_utils.network.ios.ios import (
run_commands,
get_config,
)
from ansible_collections.cisco.ios.plugins.module_utils.network.ios.ios import (
get_defaults_flag,
get_connection,
)
from ansible_collections.cisco.ios.plugins.module_utils.network.ios.ios import (
ios_argument_spec,
)
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.config import (
NetworkConfig,
dumps,
)
def check_args(module, warnings):
if module.params["multiline_delimiter"]:
if len(module.params["multiline_delimiter"]) != 1:
module.fail_json(
msg="multiline_delimiter value can only be a "
"single character"
)
def edit_config_or_macro(connection, commands):
# only catch the macro configuration command,
# not negated 'no' variation.
if commands[0].startswith("macro name"):
connection.edit_macro(candidate=commands)
else:
connection.edit_config(candidate=commands)
def get_candidate_config(module):
candidate = ""
if module.params["src"]:
candidate = module.params["src"]
elif module.params["lines"]:
candidate_obj = NetworkConfig(indent=1)
parents = module.params["parents"] or list()
candidate_obj.add(module.params["lines"], parents=parents)
candidate = dumps(candidate_obj, "raw")
return candidate
def get_running_config(module, current_config=None, flags=None):
running = module.params["running_config"]
if not running:
if not module.params["defaults"] and current_config:
running = current_config
else:
running = get_config(module, flags=flags)
return running
def save_config(module, result):
result["changed"] = True
if not module.check_mode:
run_commands(module, "copy running-config startup-config\r")
else:
module.warn(
"Skipping command `copy running-config startup-config` "
"due to check_mode. Configuration not copied to "
"non-volatile storage"
)
def main():
""" main entry point for module execution
"""
backup_spec = dict(filename=dict(), dir_path=dict(type="path"))
argument_spec = dict(
src=dict(type="path"),
lines=dict(aliases=["commands"], type="list"),
parents=dict(type="list"),
before=dict(type="list"),
after=dict(type="list"),
match=dict(
default="line", choices=["line", "strict", "exact", "none"]
),
replace=dict(default="line", choices=["line", "block"]),
multiline_delimiter=dict(default="@"),
running_config=dict(aliases=["config"]),
intended_config=dict(),
defaults=dict(type="bool", default=False),
backup=dict(type="bool", default=False),
backup_options=dict(type="dict", options=backup_spec),
save_when=dict(
choices=["always", "never", "modified", "changed"], default="never"
),
diff_against=dict(choices=["startup", "intended", "running"]),
diff_ignore_lines=dict(type="list"),
)
argument_spec.update(ios_argument_spec)
mutually_exclusive = [("lines", "src"), ("parents", "src")]
required_if = [
("match", "strict", ["lines"]),
("match", "exact", ["lines"]),
("replace", "block", ["lines"]),
("diff_against", "intended", ["intended_config"]),
]
module = AnsibleModule(
argument_spec=argument_spec,
mutually_exclusive=mutually_exclusive,
required_if=required_if,
supports_check_mode=True,
)
result = {"changed": False}
warnings = list()
check_args(module, warnings)
result["warnings"] = warnings
diff_ignore_lines = module.params["diff_ignore_lines"]
config = None
contents = None
flags = get_defaults_flag(module) if module.params["defaults"] else []
connection = get_connection(module)
if module.params["backup"] or (
module._diff and module.params["diff_against"] == "running"
):
contents = get_config(module, flags=flags)
config = NetworkConfig(indent=1, contents=contents)
if module.params["backup"]:
result["__backup__"] = contents
if any((module.params["lines"], module.params["src"])):
match = module.params["match"]
replace = module.params["replace"]
path = module.params["parents"]
candidate = get_candidate_config(module)
running = get_running_config(module, contents, flags=flags)
try:
response = connection.get_diff(
candidate=candidate,
running=running,
diff_match=match,
diff_ignore_lines=diff_ignore_lines,
path=path,
diff_replace=replace,
)
except ConnectionError as exc:
module.fail_json(msg=to_text(exc, errors="surrogate_then_replace"))
config_diff = response["config_diff"]
banner_diff = response["banner_diff"]
if config_diff or banner_diff:
commands = config_diff.split("\n")
if module.params["before"]:
commands[:0] = module.params["before"]
if module.params["after"]:
commands.extend(module.params["after"])
result["commands"] = commands
result["updates"] = commands
result["banners"] = banner_diff
# send the configuration commands to the device and merge
# them with the current running config
if not module.check_mode:
if commands:
edit_config_or_macro(connection, commands)
if banner_diff:
connection.edit_banner(
candidate=json.dumps(banner_diff),
multiline_delimiter=module.params[
"multiline_delimiter"
],
)
result["changed"] = True
running_config = module.params["running_config"]
startup_config = None
if module.params["save_when"] == "always":
save_config(module, result)
elif module.params["save_when"] == "modified":
output = run_commands(
module, ["show running-config", "show startup-config"]
)
running_config = NetworkConfig(
indent=1, contents=output[0], ignore_lines=diff_ignore_lines
)
startup_config = NetworkConfig(
indent=1, contents=output[1], ignore_lines=diff_ignore_lines
)
if running_config.sha1 != startup_config.sha1:
save_config(module, result)
elif module.params["save_when"] == "changed" and result["changed"]:
save_config(module, result)
if module._diff:
if not running_config:
output = run_commands(module, "show running-config")
contents = output[0]
else:
contents = running_config
# recreate the object in order to process diff_ignore_lines
running_config = NetworkConfig(
indent=1, contents=contents, ignore_lines=diff_ignore_lines
)
if module.params["diff_against"] == "running":
if module.check_mode:
module.warn(
"unable to perform diff against running-config due to check mode"
)
contents = None
else:
contents = config.config_text
elif module.params["diff_against"] == "startup":
if not startup_config:
output = run_commands(module, "show startup-config")
contents = output[0]
else:
contents = startup_config.config_text
elif module.params["diff_against"] == "intended":
contents = module.params["intended_config"]
if contents is not None:
base_config = NetworkConfig(
indent=1, contents=contents, ignore_lines=diff_ignore_lines
)
if running_config.sha1 != base_config.sha1:
before, after = "", ""
if module.params["diff_against"] == "intended":
before = running_config
after = base_config
elif module.params["diff_against"] in ("startup", "running"):
before = base_config
after = running_config
result.update(
{
"changed": True,
"diff": {"before": str(before), "after": str(after)},
}
)
module.exit_json(**result)
if __name__ == "__main__":
main()

@ -1,114 +0,0 @@
#
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
import json
import re
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.plugins.terminal import TerminalBase
from ansible.utils.display import Display
display = Display()
class TerminalModule(TerminalBase):
terminal_stdout_re = [
re.compile(br"[\r\n]?[\w\+\-\.:\/\[\]]+(?:\([^\)]+\)){0,3}(?:[>#]) ?$")
]
terminal_stderr_re = [
re.compile(br"% ?Error"),
# re.compile(br"^% \w+", re.M),
re.compile(br"% ?Bad secret"),
re.compile(br"[\r\n%] Bad passwords"),
re.compile(br"invalid input", re.I),
re.compile(br"(?:incomplete|ambiguous) command", re.I),
re.compile(br"connection timed out", re.I),
re.compile(br"[^\r\n]+ not found"),
re.compile(br"'[^']' +returned error code: ?\d+"),
re.compile(br"Bad mask", re.I),
re.compile(br"% ?(\S+) ?overlaps with ?(\S+)", re.I),
re.compile(br"[%\S] ?Error: ?[\s]+", re.I),
re.compile(br"[%\S] ?Informational: ?[\s]+", re.I),
re.compile(br"Command authorization failed"),
]
def on_open_shell(self):
try:
self._exec_cli_command(b"terminal length 0")
except AnsibleConnectionFailure:
raise AnsibleConnectionFailure("unable to set terminal parameters")
try:
self._exec_cli_command(b"terminal width 512")
try:
self._exec_cli_command(b"terminal width 0")
except AnsibleConnectionFailure:
pass
except AnsibleConnectionFailure:
display.display(
"WARNING: Unable to set terminal width, command responses may be truncated"
)
def on_become(self, passwd=None):
if self._get_prompt().endswith(b"#"):
return
cmd = {u"command": u"enable"}
if passwd:
# Note: python-3.5 cannot combine u"" and r"" together. Thus make
# an r string and use to_text to ensure it's text on both py2 and py3.
cmd[u"prompt"] = to_text(
r"[\r\n]?(?:.*)?[Pp]assword: ?$", errors="surrogate_or_strict"
)
cmd[u"answer"] = passwd
cmd[u"prompt_retry_check"] = True
try:
self._exec_cli_command(
to_bytes(json.dumps(cmd), errors="surrogate_or_strict")
)
prompt = self._get_prompt()
if prompt is None or not prompt.endswith(b"#"):
raise AnsibleConnectionFailure(
"failed to elevate privilege to enable mode still at prompt [%s]"
% prompt
)
except AnsibleConnectionFailure as e:
prompt = self._get_prompt()
raise AnsibleConnectionFailure(
"unable to elevate privilege to enable mode, at prompt [%s] with error: %s"
% (prompt, e.message)
)
def on_unbecome(self):
prompt = self._get_prompt()
if prompt is None:
# if prompt is None most likely the terminal is hung up at a prompt
return
if b"(config" in prompt:
self._exec_cli_command(b"end")
self._exec_cli_command(b"disable")
elif prompt.endswith(b"#"):
self._exec_cli_command(b"disable")
Loading…
Cancel
Save