From 086046d4780c306b4892d4d1d053470a64612ae7 Mon Sep 17 00:00:00 2001 From: Matt Clay Date: Thu, 23 Feb 2023 13:19:19 -0800 Subject: [PATCH] Remove unused network test support files (#80080) * Remove unused network test support files * Remove obsolete ignores --- test/sanity/ignore.txt | 4 - .../netcommon/plugins/action/net_base.py | 90 -- .../netcommon/plugins/become/enable.py | 42 - .../netcommon/plugins/connection/httpapi.py | 324 ----- .../netcommon/plugins/connection/netconf.py | 404 ------ .../plugins/doc_fragments/netconf.py | 66 - .../plugins/doc_fragments/network_agnostic.py | 14 - .../netcommon/plugins/filter/ipaddr.py | 1186 ----------------- .../netcommon/plugins/filter/network.py | 531 -------- .../netcommon/plugins/httpapi/restconf.py | 91 -- .../module_utils/network/netconf/netconf.py | 147 -- .../module_utils/network/restconf/restconf.py | 61 - .../netcommon/plugins/modules/net_get.py | 71 - .../netcommon/plugins/modules/net_put.py | 82 -- .../netcommon/plugins/netconf/default.py | 70 - 15 files changed, 3183 deletions(-) delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/action/net_base.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/become/enable.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/httpapi.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/netconf.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/netconf.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/network_agnostic.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/httpapi/restconf.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/restconf/restconf.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_get.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_put.py delete mode 100644 test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py diff --git a/test/sanity/ignore.txt b/test/sanity/ignore.txt index 577e59d0c8b..272b59bc200 100644 --- a/test/sanity/ignore.txt +++ b/test/sanity/ignore.txt @@ -183,7 +183,6 @@ test/support/integration/plugins/modules/sefcontext.py pylint:unused-import test/support/integration/plugins/modules/zypper.py pylint:unused-import test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/utils.py pylint:unused-import test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/network_cli.py pylint:pointless-string-statement -test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py pylint:pointless-string-statement test/support/windows-integration/plugins/action/win_reboot.py pylint:unused-import test/support/integration/plugins/modules/timezone.py pylint:disallowed-name test/support/integration/plugins/module_utils/compat/ipaddress.py future-import-boilerplate @@ -192,12 +191,9 @@ test/support/integration/plugins/module_utils/compat/ipaddress.py no-unicode-lit test/support/integration/plugins/module_utils/network/common/utils.py future-import-boilerplate test/support/integration/plugins/module_utils/network/common/utils.py metaclass-boilerplate test/support/integration/plugins/module_utils/network/common/utils.py pylint:use-a-generator -test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py pylint:used-before-assignment -test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py pylint:consider-using-dict-comprehension test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/compat/ipaddress.py no-unicode-literals test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/facts/facts.py pylint:unnecessary-comprehension test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/utils.py pylint:use-a-generator -test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py pylint:unnecessary-comprehension test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/cliconf/ios.py pylint:arguments-renamed test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/modules/ios_config.py pep8:E501 test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/modules/ios_config.py pylint:used-before-assignment diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/action/net_base.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/action/net_base.py deleted file mode 100644 index 542dcfef874..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/action/net_base.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright: (c) 2015, Ansible Inc, -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - -import copy - -from ansible.errors import AnsibleError -from ansible.plugins.action import ActionBase -from ansible.utils.display import Display - -display = Display() - - -class ActionModule(ActionBase): - def run(self, tmp=None, task_vars=None): - del tmp # tmp no longer has any effect - - result = {} - play_context = copy.deepcopy(self._play_context) - play_context.network_os = self._get_network_os(task_vars) - new_task = self._task.copy() - - module = self._get_implementation_module( - play_context.network_os, self._task.action - ) - if not module: - if self._task.args["fail_on_missing_module"]: - result["failed"] = True - else: - result["failed"] = False - - result["msg"] = ( - "Could not find implementation module %s for %s" - % (self._task.action, play_context.network_os) - ) - return result - - new_task.action = module - - action = self._shared_loader_obj.action_loader.get( - play_context.network_os, - task=new_task, - connection=self._connection, - play_context=play_context, - loader=self._loader, - templar=self._templar, - shared_loader_obj=self._shared_loader_obj, - ) - display.vvvv("Running implementation module %s" % module) - return action.run(task_vars=task_vars) - - def _get_network_os(self, task_vars): - if "network_os" in self._task.args and self._task.args["network_os"]: - display.vvvv("Getting network OS from task argument") - network_os = self._task.args["network_os"] - elif self._play_context.network_os: - display.vvvv("Getting network OS from inventory") - network_os = self._play_context.network_os - elif ( - "network_os" in task_vars.get("ansible_facts", {}) - and task_vars["ansible_facts"]["network_os"] - ): - display.vvvv("Getting network OS from fact") - network_os = task_vars["ansible_facts"]["network_os"] - else: - raise AnsibleError( - "ansible_network_os must be specified on this host to use platform agnostic modules" - ) - - return network_os - - def _get_implementation_module(self, network_os, platform_agnostic_module): - module_name = ( - network_os.split(".")[-1] - + "_" - + platform_agnostic_module.partition("_")[2] - ) - if "." in network_os: - fqcn_module = ".".join(network_os.split(".")[0:-1]) - implementation_module = fqcn_module + "." + module_name - else: - implementation_module = module_name - - if implementation_module not in self._shared_loader_obj.module_loader: - implementation_module = None - - return implementation_module diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/become/enable.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/become/enable.py deleted file mode 100644 index 33938fd1e70..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/become/enable.py +++ /dev/null @@ -1,42 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright: (c) 2018, Ansible Project -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - -DOCUMENTATION = """become: enable -short_description: Switch to elevated permissions on a network device -description: -- This become plugins allows elevated permissions on a remote network device. -author: ansible (@core) -options: - become_pass: - description: password - ini: - - section: enable_become_plugin - key: password - vars: - - name: ansible_become_password - - name: ansible_become_pass - - name: ansible_enable_pass - env: - - name: ANSIBLE_BECOME_PASS - - name: ANSIBLE_ENABLE_PASS -notes: -- enable is really implemented in the network connection handler and as such can only - be used with network connections. -- This plugin ignores the 'become_exe' and 'become_user' settings as it uses an API - and not an executable. -""" - -from ansible.plugins.become import BecomeBase - - -class BecomeModule(BecomeBase): - - name = "ansible.netcommon.enable" - - def build_become_command(self, cmd, shell): - # enable is implemented inside the network connection plugins - return cmd diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/httpapi.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/httpapi.py deleted file mode 100644 index b063ef0d606..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/httpapi.py +++ /dev/null @@ -1,324 +0,0 @@ -# (c) 2018 Red Hat Inc. -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - -DOCUMENTATION = """author: Ansible Networking Team -connection: httpapi -short_description: Use httpapi to run command on network appliances -description: -- This connection plugin provides a connection to remote devices over a HTTP(S)-based - api. -options: - host: - description: - - Specifies the remote device FQDN or IP address to establish the HTTP(S) connection - to. - default: inventory_hostname - vars: - - name: ansible_host - port: - type: int - description: - - Specifies the port on the remote device that listens for connections when establishing - the HTTP(S) connection. - - When unspecified, will pick 80 or 443 based on the value of use_ssl. - ini: - - section: defaults - key: remote_port - env: - - name: ANSIBLE_REMOTE_PORT - vars: - - name: ansible_httpapi_port - network_os: - description: - - Configures the device platform network operating system. This value is used - to load the correct httpapi plugin to communicate with the remote device - vars: - - name: ansible_network_os - remote_user: - description: - - The username used to authenticate to the remote device when the API connection - is first established. If the remote_user is not specified, the connection will - use the username of the logged in user. - - Can be configured from the CLI via the C(--user) or C(-u) options. - ini: - - section: defaults - key: remote_user - env: - - name: ANSIBLE_REMOTE_USER - vars: - - name: ansible_user - password: - description: - - Configures the user password used to authenticate to the remote device when - needed for the device API. - vars: - - name: ansible_password - - name: ansible_httpapi_pass - - name: ansible_httpapi_password - use_ssl: - type: boolean - description: - - Whether to connect using SSL (HTTPS) or not (HTTP). - default: false - vars: - - name: ansible_httpapi_use_ssl - validate_certs: - type: boolean - description: - - Whether to validate SSL certificates - default: true - vars: - - name: ansible_httpapi_validate_certs - use_proxy: - type: boolean - description: - - Whether to use https_proxy for requests. - default: true - vars: - - name: ansible_httpapi_use_proxy - become: - type: boolean - description: - - The become option will instruct the CLI session to attempt privilege escalation - on platforms that support it. Normally this means transitioning from user mode - to C(enable) mode in the CLI session. If become is set to True and the remote - device does not support privilege escalation or the privilege has already been - elevated, then this option is silently ignored. - - Can be configured from the CLI via the C(--become) or C(-b) options. - default: false - ini: - - section: privilege_escalation - key: become - env: - - name: ANSIBLE_BECOME - vars: - - name: ansible_become - become_method: - description: - - This option allows the become method to be specified in for handling privilege - escalation. Typically the become_method value is set to C(enable) but could - be defined as other values. - default: sudo - ini: - - section: privilege_escalation - key: become_method - env: - - name: ANSIBLE_BECOME_METHOD - vars: - - name: ansible_become_method - persistent_connect_timeout: - type: int - description: - - Configures, in seconds, the amount of time to wait when trying to initially - establish a persistent connection. If this value expires before the connection - to the remote device is completed, the connection will fail. - default: 30 - ini: - - section: persistent_connection - key: connect_timeout - env: - - name: ANSIBLE_PERSISTENT_CONNECT_TIMEOUT - vars: - - name: ansible_connect_timeout - persistent_command_timeout: - type: int - description: - - Configures, in seconds, the amount of time to wait for a command to return from - the remote device. If this timer is exceeded before the command returns, the - connection plugin will raise an exception and close. - default: 30 - ini: - - section: persistent_connection - key: command_timeout - env: - - name: ANSIBLE_PERSISTENT_COMMAND_TIMEOUT - vars: - - name: ansible_command_timeout - persistent_log_messages: - type: boolean - description: - - This flag will enable logging the command executed and response received from - target device in the ansible log file. For this option to work 'log_path' ansible - configuration option is required to be set to a file path with write access. - - Be sure to fully understand the security implications of enabling this option - as it could create a security vulnerability by logging sensitive information - in log file. - default: false - ini: - - section: persistent_connection - key: log_messages - env: - - name: ANSIBLE_PERSISTENT_LOG_MESSAGES - vars: - - name: ansible_persistent_log_messages -""" - -from io import BytesIO - -from ansible.errors import AnsibleConnectionFailure -from ansible.module_utils._text import to_bytes -from ansible.module_utils.six import PY3 -from ansible.module_utils.six.moves import cPickle -from ansible.module_utils.six.moves.urllib.error import HTTPError, URLError -from ansible.module_utils.urls import open_url -from ansible.playbook.play_context import PlayContext -from ansible.plugins.loader import httpapi_loader -from ansible.plugins.connection import NetworkConnectionBase, ensure_connect - - -class Connection(NetworkConnectionBase): - """Network API connection""" - - transport = "ansible.netcommon.httpapi" - has_pipelining = True - - def __init__(self, play_context, new_stdin, *args, **kwargs): - super(Connection, self).__init__( - play_context, new_stdin, *args, **kwargs - ) - - self._url = None - self._auth = None - - if self._network_os: - - self.httpapi = httpapi_loader.get(self._network_os, self) - if self.httpapi: - self._sub_plugin = { - "type": "httpapi", - "name": self.httpapi._load_name, - "obj": self.httpapi, - } - self.queue_message( - "vvvv", - "loaded API plugin %s from path %s for network_os %s" - % ( - self.httpapi._load_name, - self.httpapi._original_path, - self._network_os, - ), - ) - else: - raise AnsibleConnectionFailure( - "unable to load API plugin for network_os %s" - % self._network_os - ) - - else: - raise AnsibleConnectionFailure( - "Unable to automatically determine host network os. Please " - "manually configure ansible_network_os value for this host" - ) - self.queue_message("log", "network_os is set to %s" % self._network_os) - - def update_play_context(self, pc_data): - """Updates the play context information for the connection""" - pc_data = to_bytes(pc_data) - if PY3: - pc_data = cPickle.loads(pc_data, encoding="bytes") - else: - pc_data = cPickle.loads(pc_data) - play_context = PlayContext() - play_context.deserialize(pc_data) - - self.queue_message("vvvv", "updating play_context for connection") - if self._play_context.become ^ play_context.become: - self.set_become(play_context) - if play_context.become is True: - self.queue_message("vvvv", "authorizing connection") - else: - self.queue_message("vvvv", "deauthorizing connection") - - self._play_context = play_context - - def _connect(self): - if not self.connected: - protocol = "https" if self.get_option("use_ssl") else "http" - host = self.get_option("host") - port = self.get_option("port") or ( - 443 if protocol == "https" else 80 - ) - self._url = "%s://%s:%s" % (protocol, host, port) - - self.queue_message( - "vvv", - "ESTABLISH HTTP(S) CONNECTFOR USER: %s TO %s" - % (self._play_context.remote_user, self._url), - ) - self.httpapi.set_become(self._play_context) - self._connected = True - - self.httpapi.login( - self.get_option("remote_user"), self.get_option("password") - ) - - def close(self): - """ - Close the active session to the device - """ - # only close the connection if its connected. - if self._connected: - self.queue_message("vvvv", "closing http(s) connection to device") - self.logout() - - super(Connection, self).close() - - @ensure_connect - def send(self, path, data, **kwargs): - """ - Sends the command to the device over api - """ - url_kwargs = dict( - timeout=self.get_option("persistent_command_timeout"), - validate_certs=self.get_option("validate_certs"), - use_proxy=self.get_option("use_proxy"), - headers={}, - ) - url_kwargs.update(kwargs) - if self._auth: - # Avoid modifying passed-in headers - headers = dict(kwargs.get("headers", {})) - headers.update(self._auth) - url_kwargs["headers"] = headers - else: - url_kwargs["force_basic_auth"] = True - url_kwargs["url_username"] = self.get_option("remote_user") - url_kwargs["url_password"] = self.get_option("password") - - try: - url = self._url + path - self._log_messages( - "send url '%s' with data '%s' and kwargs '%s'" - % (url, data, url_kwargs) - ) - response = open_url(url, data=data, **url_kwargs) - except HTTPError as exc: - is_handled = self.handle_httperror(exc) - if is_handled is True: - return self.send(path, data, **kwargs) - elif is_handled is False: - raise - else: - response = is_handled - except URLError as exc: - raise AnsibleConnectionFailure( - "Could not connect to {0}: {1}".format( - self._url + path, exc.reason - ) - ) - - response_buffer = BytesIO() - resp_data = response.read() - self._log_messages("received response: '%s'" % resp_data) - response_buffer.write(resp_data) - - # Try to assign a new auth token if one is given - self._auth = self.update_auth(response, response_buffer) or self._auth - - response_buffer.seek(0) - - return response, response_buffer diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/netconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/netconf.py deleted file mode 100644 index 1e2d3caa486..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/netconf.py +++ /dev/null @@ -1,404 +0,0 @@ -# (c) 2016 Red Hat Inc. -# (c) 2017 Ansible Project -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - -DOCUMENTATION = """author: Ansible Networking Team -connection: netconf -short_description: Provides a persistent connection using the netconf protocol -description: -- This connection plugin provides a connection to remote devices over the SSH NETCONF - subsystem. This connection plugin is typically used by network devices for sending - and receiving RPC calls over NETCONF. -- Note this connection plugin requires ncclient to be installed on the local Ansible - controller. -requirements: -- ncclient -options: - host: - description: - - Specifies the remote device FQDN or IP address to establish the SSH connection - to. - default: inventory_hostname - vars: - - name: ansible_host - port: - type: int - description: - - Specifies the port on the remote device that listens for connections when establishing - the SSH connection. - default: 830 - ini: - - section: defaults - key: remote_port - env: - - name: ANSIBLE_REMOTE_PORT - vars: - - name: ansible_port - network_os: - description: - - Configures the device platform network operating system. This value is used - to load a device specific netconf plugin. If this option is not configured - (or set to C(auto)), then Ansible will attempt to guess the correct network_os - to use. If it can not guess a network_os correctly it will use C(default). - vars: - - name: ansible_network_os - remote_user: - description: - - The username used to authenticate to the remote device when the SSH connection - is first established. If the remote_user is not specified, the connection will - use the username of the logged in user. - - Can be configured from the CLI via the C(--user) or C(-u) options. - ini: - - section: defaults - key: remote_user - env: - - name: ANSIBLE_REMOTE_USER - vars: - - name: ansible_user - password: - description: - - Configures the user password used to authenticate to the remote device when - first establishing the SSH connection. - vars: - - name: ansible_password - - name: ansible_ssh_pass - - name: ansible_ssh_password - - name: ansible_netconf_password - private_key_file: - description: - - The private SSH key or certificate file used to authenticate to the remote device - when first establishing the SSH connection. - ini: - - section: defaults - key: private_key_file - env: - - name: ANSIBLE_PRIVATE_KEY_FILE - vars: - - name: ansible_private_key_file - look_for_keys: - default: true - description: - - Enables looking for ssh keys in the usual locations for ssh keys (e.g. :file:`~/.ssh/id_*`). - env: - - name: ANSIBLE_PARAMIKO_LOOK_FOR_KEYS - ini: - - section: paramiko_connection - key: look_for_keys - type: boolean - host_key_checking: - description: Set this to "False" if you want to avoid host key checking by the - underlying tools Ansible uses to connect to the host - type: boolean - default: true - env: - - name: ANSIBLE_HOST_KEY_CHECKING - - name: ANSIBLE_SSH_HOST_KEY_CHECKING - - name: ANSIBLE_NETCONF_HOST_KEY_CHECKING - ini: - - section: defaults - key: host_key_checking - - section: paramiko_connection - key: host_key_checking - vars: - - name: ansible_host_key_checking - - name: ansible_ssh_host_key_checking - - name: ansible_netconf_host_key_checking - persistent_connect_timeout: - type: int - description: - - Configures, in seconds, the amount of time to wait when trying to initially - establish a persistent connection. If this value expires before the connection - to the remote device is completed, the connection will fail. - default: 30 - ini: - - section: persistent_connection - key: connect_timeout - env: - - name: ANSIBLE_PERSISTENT_CONNECT_TIMEOUT - vars: - - name: ansible_connect_timeout - persistent_command_timeout: - type: int - description: - - Configures, in seconds, the amount of time to wait for a command to return from - the remote device. If this timer is exceeded before the command returns, the - connection plugin will raise an exception and close. - default: 30 - ini: - - section: persistent_connection - key: command_timeout - env: - - name: ANSIBLE_PERSISTENT_COMMAND_TIMEOUT - vars: - - name: ansible_command_timeout - netconf_ssh_config: - description: - - This variable is used to enable bastion/jump host with netconf connection. If - set to True the bastion/jump host ssh settings should be present in ~/.ssh/config - file, alternatively it can be set to custom ssh configuration file path to read - the bastion/jump host settings. - ini: - - section: netconf_connection - key: ssh_config - version_added: '2.7' - env: - - name: ANSIBLE_NETCONF_SSH_CONFIG - vars: - - name: ansible_netconf_ssh_config - version_added: '2.7' - persistent_log_messages: - type: boolean - description: - - This flag will enable logging the command executed and response received from - target device in the ansible log file. For this option to work 'log_path' ansible - configuration option is required to be set to a file path with write access. - - Be sure to fully understand the security implications of enabling this option - as it could create a security vulnerability by logging sensitive information - in log file. - default: false - ini: - - section: persistent_connection - key: log_messages - env: - - name: ANSIBLE_PERSISTENT_LOG_MESSAGES - vars: - - name: ansible_persistent_log_messages -""" - -import os -import logging -import json - -from ansible.errors import AnsibleConnectionFailure, AnsibleError -from ansible.module_utils._text import to_bytes, to_native, to_text -from ansible.module_utils.basic import missing_required_lib -from ansible.module_utils.parsing.convert_bool import ( - BOOLEANS_TRUE, - BOOLEANS_FALSE, -) -from ansible.plugins.loader import netconf_loader -from ansible.plugins.connection import NetworkConnectionBase, ensure_connect - -try: - from ncclient import manager - from ncclient.operations import RPCError - from ncclient.transport.errors import SSHUnknownHostError - from ncclient.xml_ import to_ele, to_xml - - HAS_NCCLIENT = True - NCCLIENT_IMP_ERR = None -except ( - ImportError, - AttributeError, -) as err: # paramiko and gssapi are incompatible and raise AttributeError not ImportError - HAS_NCCLIENT = False - NCCLIENT_IMP_ERR = err - -logging.getLogger("ncclient").setLevel(logging.INFO) - - -class Connection(NetworkConnectionBase): - """NetConf connections""" - - transport = "ansible.netcommon.netconf" - has_pipelining = False - - def __init__(self, play_context, new_stdin, *args, **kwargs): - super(Connection, self).__init__( - play_context, new_stdin, *args, **kwargs - ) - - # If network_os is not specified then set the network os to auto - # This will be used to trigger the use of guess_network_os when connecting. - self._network_os = self._network_os or "auto" - - self.netconf = netconf_loader.get(self._network_os, self) - if self.netconf: - self._sub_plugin = { - "type": "netconf", - "name": self.netconf._load_name, - "obj": self.netconf, - } - self.queue_message( - "vvvv", - "loaded netconf plugin %s from path %s for network_os %s" - % ( - self.netconf._load_name, - self.netconf._original_path, - self._network_os, - ), - ) - else: - self.netconf = netconf_loader.get("default", self) - self._sub_plugin = { - "type": "netconf", - "name": "default", - "obj": self.netconf, - } - self.queue_message( - "display", - "unable to load netconf plugin for network_os %s, falling back to default plugin" - % self._network_os, - ) - - self.queue_message("log", "network_os is set to %s" % self._network_os) - self._manager = None - self.key_filename = None - self._ssh_config = None - - def exec_command(self, cmd, in_data=None, sudoable=True): - """Sends the request to the node and returns the reply - The method accepts two forms of request. The first form is as a byte - string that represents xml string be send over netconf session. - The second form is a json-rpc (2.0) byte string. - """ - if self._manager: - # to_ele operates on native strings - request = to_ele(to_native(cmd, errors="surrogate_or_strict")) - - if request is None: - return "unable to parse request" - - try: - reply = self._manager.rpc(request) - except RPCError as exc: - error = self.internal_error( - data=to_text(to_xml(exc.xml), errors="surrogate_or_strict") - ) - return json.dumps(error) - - return reply.data_xml - else: - return super(Connection, self).exec_command(cmd, in_data, sudoable) - - @property - @ensure_connect - def manager(self): - return self._manager - - def _connect(self): - if not HAS_NCCLIENT: - raise AnsibleError( - "%s: %s" - % ( - missing_required_lib("ncclient"), - to_native(NCCLIENT_IMP_ERR), - ) - ) - - self.queue_message("log", "ssh connection done, starting ncclient") - - allow_agent = True - if self._play_context.password is not None: - allow_agent = False - setattr(self._play_context, "allow_agent", allow_agent) - - self.key_filename = ( - self._play_context.private_key_file - or self.get_option("private_key_file") - ) - if self.key_filename: - self.key_filename = str(os.path.expanduser(self.key_filename)) - - self._ssh_config = self.get_option("netconf_ssh_config") - if self._ssh_config in BOOLEANS_TRUE: - self._ssh_config = True - elif self._ssh_config in BOOLEANS_FALSE: - self._ssh_config = None - - # Try to guess the network_os if the network_os is set to auto - if self._network_os == "auto": - for cls in netconf_loader.all(class_only=True): - network_os = cls.guess_network_os(self) - if network_os: - self.queue_message( - "vvv", "discovered network_os %s" % network_os - ) - self._network_os = network_os - - # If we have tried to detect the network_os but were unable to i.e. network_os is still 'auto' - # then use default as the network_os - - if self._network_os == "auto": - # Network os not discovered. Set it to default - self.queue_message( - "vvv", - "Unable to discover network_os. Falling back to default.", - ) - self._network_os = "default" - try: - ncclient_device_handler = self.netconf.get_option( - "ncclient_device_handler" - ) - except KeyError: - ncclient_device_handler = "default" - self.queue_message( - "vvv", - "identified ncclient device handler: %s." - % ncclient_device_handler, - ) - device_params = {"name": ncclient_device_handler} - - try: - port = self._play_context.port or 830 - self.queue_message( - "vvv", - "ESTABLISH NETCONF SSH CONNECTION FOR USER: %s on PORT %s TO %s WITH SSH_CONFIG = %s" - % ( - self._play_context.remote_user, - port, - self._play_context.remote_addr, - self._ssh_config, - ), - ) - self._manager = manager.connect( - host=self._play_context.remote_addr, - port=port, - username=self._play_context.remote_user, - password=self._play_context.password, - key_filename=self.key_filename, - hostkey_verify=self.get_option("host_key_checking"), - look_for_keys=self.get_option("look_for_keys"), - device_params=device_params, - allow_agent=self._play_context.allow_agent, - timeout=self.get_option("persistent_connect_timeout"), - ssh_config=self._ssh_config, - ) - - self._manager._timeout = self.get_option( - "persistent_command_timeout" - ) - except SSHUnknownHostError as exc: - raise AnsibleConnectionFailure(to_native(exc)) - except ImportError: - raise AnsibleError( - "connection=netconf is not supported on {0}".format( - self._network_os - ) - ) - - if not self._manager.connected: - return 1, b"", b"not connected" - - self.queue_message( - "log", "ncclient manager object created successfully" - ) - - self._connected = True - - super(Connection, self)._connect() - - return ( - 0, - to_bytes(self._manager.session_id, errors="surrogate_or_strict"), - b"", - ) - - def close(self): - if self._manager: - self._manager.close_session() - super(Connection, self).close() diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/netconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/netconf.py deleted file mode 100644 index 8789075af8b..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/netconf.py +++ /dev/null @@ -1,66 +0,0 @@ -# -*- coding: utf-8 -*- - -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - - -class ModuleDocFragment(object): - - # Standard files documentation fragment - DOCUMENTATION = r"""options: - host: - description: - - Specifies the DNS host name or address for connecting to the remote device over - the specified transport. The value of host is used as the destination address - for the transport. - type: str - required: true - port: - description: - - Specifies the port to use when building the connection to the remote device. The - port value will default to port 830. - type: int - default: 830 - username: - description: - - Configures the username to use to authenticate the connection to the remote - device. This value is used to authenticate the SSH session. If the value is - not specified in the task, the value of environment variable C(ANSIBLE_NET_USERNAME) - will be used instead. - type: str - password: - description: - - Specifies the password to use to authenticate the connection to the remote device. This - value is used to authenticate the SSH session. If the value is not specified - in the task, the value of environment variable C(ANSIBLE_NET_PASSWORD) will - be used instead. - type: str - timeout: - description: - - Specifies the timeout in seconds for communicating with the network device for - either connecting or sending commands. If the timeout is exceeded before the - operation is completed, the module will error. - type: int - default: 10 - ssh_keyfile: - description: - - Specifies the SSH key to use to authenticate the connection to the remote device. This - value is the path to the key used to authenticate the SSH session. If the value - is not specified in the task, the value of environment variable C(ANSIBLE_NET_SSH_KEYFILE) - will be used instead. - type: path - hostkey_verify: - description: - - If set to C(yes), the ssh host key of the device must match a ssh key present - on the host if set to C(no), the ssh host key of the device is not checked. - type: bool - default: true - look_for_keys: - description: - - Enables looking in the usual locations for the ssh keys (e.g. :file:`~/.ssh/id_*`) - type: bool - default: true -notes: -- For information on using netconf see the :ref:`Platform Options guide using Netconf` -- For more information on using Ansible to manage network devices see the :ref:`Ansible - Network Guide ` -""" diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/network_agnostic.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/network_agnostic.py deleted file mode 100644 index ad65f6ef73f..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/network_agnostic.py +++ /dev/null @@ -1,14 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright: (c) 2019 Ansible, Inc -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - - -class ModuleDocFragment(object): - - # Standard files documentation fragment - DOCUMENTATION = r"""options: {} -notes: -- This module is supported on C(ansible_network_os) network platforms. See the :ref:`Network - Platform Options ` for details. -""" diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py deleted file mode 100644 index 6ae47a7302e..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py +++ /dev/null @@ -1,1186 +0,0 @@ -# (c) 2014, Maciej Delmanowski -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - -from functools import partial -import types - -try: - import netaddr -except ImportError: - # in this case, we'll make the filters return error messages (see bottom) - netaddr = None -else: - - class mac_linux(netaddr.mac_unix): - pass - - mac_linux.word_fmt = "%.2x" - -from ansible import errors - - -# ---- IP address and network query helpers ---- -def _empty_ipaddr_query(v, vtype): - # We don't have any query to process, so just check what type the user - # expects, and return the IP address in a correct format - if v: - if vtype == "address": - return str(v.ip) - elif vtype == "network": - return str(v) - - -def _first_last(v): - if v.size == 2: - first_usable = int(netaddr.IPAddress(v.first)) - last_usable = int(netaddr.IPAddress(v.last)) - return first_usable, last_usable - elif v.size > 1: - first_usable = int(netaddr.IPAddress(v.first + 1)) - last_usable = int(netaddr.IPAddress(v.last - 1)) - return first_usable, last_usable - - -def _6to4_query(v, vtype, value): - if v.version == 4: - - if v.size == 1: - ipconv = str(v.ip) - elif v.size > 1: - if v.ip != v.network: - ipconv = str(v.ip) - else: - ipconv = False - - if ipaddr(ipconv, "public"): - numbers = list(map(int, ipconv.split("."))) - - try: - return "2002:{:02x}{:02x}:{:02x}{:02x}::1/48".format(*numbers) - except Exception: - return False - - elif v.version == 6: - if vtype == "address": - if ipaddr(str(v), "2002::/16"): - return value - elif vtype == "network": - if v.ip != v.network: - if ipaddr(str(v.ip), "2002::/16"): - return value - else: - return False - - -def _ip_query(v): - if v.size == 1: - return str(v.ip) - if v.size > 1: - # /31 networks in netaddr have no broadcast address - if v.ip != v.network or not v.broadcast: - return str(v.ip) - - -def _gateway_query(v): - if v.size > 1: - if v.ip != v.network: - return str(v.ip) + "/" + str(v.prefixlen) - - -def _address_prefix_query(v): - if v.size > 1: - if v.ip != v.network: - return str(v.ip) + "/" + str(v.prefixlen) - - -def _bool_ipaddr_query(v): - if v: - return True - - -def _broadcast_query(v): - if v.size > 2: - return str(v.broadcast) - - -def _cidr_query(v): - return str(v) - - -def _cidr_lookup_query(v, iplist, value): - try: - if v in iplist: - return value - except Exception: - return False - - -def _first_usable_query(v, vtype): - if vtype == "address": - "Does it make sense to raise an error" - raise errors.AnsibleFilterError("Not a network address") - elif vtype == "network": - if v.size == 2: - return str(netaddr.IPAddress(int(v.network))) - elif v.size > 1: - return str(netaddr.IPAddress(int(v.network) + 1)) - - -def _host_query(v): - if v.size == 1: - return str(v) - elif v.size > 1: - if v.ip != v.network: - return str(v.ip) + "/" + str(v.prefixlen) - - -def _hostmask_query(v): - return str(v.hostmask) - - -def _int_query(v, vtype): - if vtype == "address": - return int(v.ip) - elif vtype == "network": - return str(int(v.ip)) + "/" + str(int(v.prefixlen)) - - -def _ip_prefix_query(v): - if v.size == 2: - return str(v.ip) + "/" + str(v.prefixlen) - elif v.size > 1: - if v.ip != v.network: - return str(v.ip) + "/" + str(v.prefixlen) - - -def _ip_netmask_query(v): - if v.size == 2: - return str(v.ip) + " " + str(v.netmask) - elif v.size > 1: - if v.ip != v.network: - return str(v.ip) + " " + str(v.netmask) - - -""" -def _ip_wildcard_query(v): - if v.size == 2: - return str(v.ip) + ' ' + str(v.hostmask) - elif v.size > 1: - if v.ip != v.network: - return str(v.ip) + ' ' + str(v.hostmask) -""" - - -def _ipv4_query(v, value): - if v.version == 6: - try: - return str(v.ipv4()) - except Exception: - return False - else: - return value - - -def _ipv6_query(v, value): - if v.version == 4: - return str(v.ipv6()) - else: - return value - - -def _last_usable_query(v, vtype): - if vtype == "address": - "Does it make sense to raise an error" - raise errors.AnsibleFilterError("Not a network address") - elif vtype == "network": - if v.size > 1: - first_usable, last_usable = _first_last(v) - return str(netaddr.IPAddress(last_usable)) - - -def _link_local_query(v, value): - v_ip = netaddr.IPAddress(str(v.ip)) - if v.version == 4: - if ipaddr(str(v_ip), "169.254.0.0/24"): - return value - - elif v.version == 6: - if ipaddr(str(v_ip), "fe80::/10"): - return value - - -def _loopback_query(v, value): - v_ip = netaddr.IPAddress(str(v.ip)) - if v_ip.is_loopback(): - return value - - -def _multicast_query(v, value): - if v.is_multicast(): - return value - - -def _net_query(v): - if v.size > 1: - if v.ip == v.network: - return str(v.network) + "/" + str(v.prefixlen) - - -def _netmask_query(v): - return str(v.netmask) - - -def _network_query(v): - """Return the network of a given IP or subnet""" - return str(v.network) - - -def _network_id_query(v): - """Return the network of a given IP or subnet""" - return str(v.network) - - -def _network_netmask_query(v): - return str(v.network) + " " + str(v.netmask) - - -def _network_wildcard_query(v): - return str(v.network) + " " + str(v.hostmask) - - -def _next_usable_query(v, vtype): - if vtype == "address": - "Does it make sense to raise an error" - raise errors.AnsibleFilterError("Not a network address") - elif vtype == "network": - if v.size > 1: - first_usable, last_usable = _first_last(v) - next_ip = int(netaddr.IPAddress(int(v.ip) + 1)) - if next_ip >= first_usable and next_ip <= last_usable: - return str(netaddr.IPAddress(int(v.ip) + 1)) - - -def _peer_query(v, vtype): - if vtype == "address": - raise errors.AnsibleFilterError("Not a network address") - elif vtype == "network": - if v.size == 2: - return str(netaddr.IPAddress(int(v.ip) ^ 1)) - if v.size == 4: - if int(v.ip) % 4 == 0: - raise errors.AnsibleFilterError( - "Network address of /30 has no peer" - ) - if int(v.ip) % 4 == 3: - raise errors.AnsibleFilterError( - "Broadcast address of /30 has no peer" - ) - return str(netaddr.IPAddress(int(v.ip) ^ 3)) - raise errors.AnsibleFilterError("Not a point-to-point network") - - -def _prefix_query(v): - return int(v.prefixlen) - - -def _previous_usable_query(v, vtype): - if vtype == "address": - "Does it make sense to raise an error" - raise errors.AnsibleFilterError("Not a network address") - elif vtype == "network": - if v.size > 1: - first_usable, last_usable = _first_last(v) - previous_ip = int(netaddr.IPAddress(int(v.ip) - 1)) - if previous_ip >= first_usable and previous_ip <= last_usable: - return str(netaddr.IPAddress(int(v.ip) - 1)) - - -def _private_query(v, value): - if v.is_private(): - return value - - -def _public_query(v, value): - v_ip = netaddr.IPAddress(str(v.ip)) - if ( - v_ip.is_unicast() - and not v_ip.is_private() - and not v_ip.is_loopback() - and not v_ip.is_netmask() - and not v_ip.is_hostmask() - ): - return value - - -def _range_usable_query(v, vtype): - if vtype == "address": - "Does it make sense to raise an error" - raise errors.AnsibleFilterError("Not a network address") - elif vtype == "network": - if v.size > 1: - first_usable, last_usable = _first_last(v) - first_usable = str(netaddr.IPAddress(first_usable)) - last_usable = str(netaddr.IPAddress(last_usable)) - return "{0}-{1}".format(first_usable, last_usable) - - -def _revdns_query(v): - v_ip = netaddr.IPAddress(str(v.ip)) - return v_ip.reverse_dns - - -def _size_query(v): - return v.size - - -def _size_usable_query(v): - if v.size == 1: - return 0 - elif v.size == 2: - return 2 - return v.size - 2 - - -def _subnet_query(v): - return str(v.cidr) - - -def _type_query(v): - if v.size == 1: - return "address" - if v.size > 1: - if v.ip != v.network: - return "address" - else: - return "network" - - -def _unicast_query(v, value): - if v.is_unicast(): - return value - - -def _version_query(v): - return v.version - - -def _wrap_query(v, vtype, value): - if v.version == 6: - if vtype == "address": - return "[" + str(v.ip) + "]" - elif vtype == "network": - return "[" + str(v.ip) + "]/" + str(v.prefixlen) - else: - return value - - -# ---- HWaddr query helpers ---- -def _bare_query(v): - v.dialect = netaddr.mac_bare - return str(v) - - -def _bool_hwaddr_query(v): - if v: - return True - - -def _int_hwaddr_query(v): - return int(v) - - -def _cisco_query(v): - v.dialect = netaddr.mac_cisco - return str(v) - - -def _empty_hwaddr_query(v, value): - if v: - return value - - -def _linux_query(v): - v.dialect = mac_linux - return str(v) - - -def _postgresql_query(v): - v.dialect = netaddr.mac_pgsql - return str(v) - - -def _unix_query(v): - v.dialect = netaddr.mac_unix - return str(v) - - -def _win_query(v): - v.dialect = netaddr.mac_eui48 - return str(v) - - -# ---- IP address and network filters ---- - -# Returns a minified list of subnets or a single subnet that spans all of -# the inputs. -def cidr_merge(value, action="merge"): - if not hasattr(value, "__iter__"): - raise errors.AnsibleFilterError( - "cidr_merge: expected iterable, got " + repr(value) - ) - - if action == "merge": - try: - return [str(ip) for ip in netaddr.cidr_merge(value)] - except Exception as e: - raise errors.AnsibleFilterError( - "cidr_merge: error in netaddr:\n%s" % e - ) - - elif action == "span": - # spanning_cidr needs at least two values - if len(value) == 0: - return None - elif len(value) == 1: - try: - return str(netaddr.IPNetwork(value[0])) - except Exception as e: - raise errors.AnsibleFilterError( - "cidr_merge: error in netaddr:\n%s" % e - ) - else: - try: - return str(netaddr.spanning_cidr(value)) - except Exception as e: - raise errors.AnsibleFilterError( - "cidr_merge: error in netaddr:\n%s" % e - ) - - else: - raise errors.AnsibleFilterError( - "cidr_merge: invalid action '%s'" % action - ) - - -def ipaddr(value, query="", version=False, alias="ipaddr"): - """ Check if string is an IP address or network and filter it """ - - query_func_extra_args = { - "": ("vtype",), - "6to4": ("vtype", "value"), - "cidr_lookup": ("iplist", "value"), - "first_usable": ("vtype",), - "int": ("vtype",), - "ipv4": ("value",), - "ipv6": ("value",), - "last_usable": ("vtype",), - "link-local": ("value",), - "loopback": ("value",), - "lo": ("value",), - "multicast": ("value",), - "next_usable": ("vtype",), - "peer": ("vtype",), - "previous_usable": ("vtype",), - "private": ("value",), - "public": ("value",), - "unicast": ("value",), - "range_usable": ("vtype",), - "wrap": ("vtype", "value"), - } - - query_func_map = { - "": _empty_ipaddr_query, - "6to4": _6to4_query, - "address": _ip_query, - "address/prefix": _address_prefix_query, # deprecate - "bool": _bool_ipaddr_query, - "broadcast": _broadcast_query, - "cidr": _cidr_query, - "cidr_lookup": _cidr_lookup_query, - "first_usable": _first_usable_query, - "gateway": _gateway_query, # deprecate - "gw": _gateway_query, # deprecate - "host": _host_query, - "host/prefix": _address_prefix_query, # deprecate - "hostmask": _hostmask_query, - "hostnet": _gateway_query, # deprecate - "int": _int_query, - "ip": _ip_query, - "ip/prefix": _ip_prefix_query, - "ip_netmask": _ip_netmask_query, - # 'ip_wildcard': _ip_wildcard_query, built then could not think of use case - "ipv4": _ipv4_query, - "ipv6": _ipv6_query, - "last_usable": _last_usable_query, - "link-local": _link_local_query, - "lo": _loopback_query, - "loopback": _loopback_query, - "multicast": _multicast_query, - "net": _net_query, - "next_usable": _next_usable_query, - "netmask": _netmask_query, - "network": _network_query, - "network_id": _network_id_query, - "network/prefix": _subnet_query, - "network_netmask": _network_netmask_query, - "network_wildcard": _network_wildcard_query, - "peer": _peer_query, - "prefix": _prefix_query, - "previous_usable": _previous_usable_query, - "private": _private_query, - "public": _public_query, - "range_usable": _range_usable_query, - "revdns": _revdns_query, - "router": _gateway_query, # deprecate - "size": _size_query, - "size_usable": _size_usable_query, - "subnet": _subnet_query, - "type": _type_query, - "unicast": _unicast_query, - "v4": _ipv4_query, - "v6": _ipv6_query, - "version": _version_query, - "wildcard": _hostmask_query, - "wrap": _wrap_query, - } - - vtype = None - - if not value: - return False - - elif value is True: - return False - - # Check if value is a list and parse each element - elif isinstance(value, (list, tuple, types.GeneratorType)): - - _ret = [] - for element in value: - if ipaddr(element, str(query), version): - _ret.append(ipaddr(element, str(query), version)) - - if _ret: - return _ret - else: - return list() - - # Check if value is a number and convert it to an IP address - elif str(value).isdigit(): - - # We don't know what IP version to assume, so let's check IPv4 first, - # then IPv6 - try: - if (not version) or (version and version == 4): - v = netaddr.IPNetwork("0.0.0.0/0") - v.value = int(value) - v.prefixlen = 32 - elif version and version == 6: - v = netaddr.IPNetwork("::/0") - v.value = int(value) - v.prefixlen = 128 - - # IPv4 didn't work the first time, so it definitely has to be IPv6 - except Exception: - try: - v = netaddr.IPNetwork("::/0") - v.value = int(value) - v.prefixlen = 128 - - # The value is too big for IPv6. Are you a nanobot? - except Exception: - return False - - # We got an IP address, let's mark it as such - value = str(v) - vtype = "address" - - # value has not been recognized, check if it's a valid IP string - else: - try: - v = netaddr.IPNetwork(value) - - # value is a valid IP string, check if user specified - # CIDR prefix or just an IP address, this will indicate default - # output format - try: - address, prefix = value.split("/") - vtype = "network" - except Exception: - vtype = "address" - - # value hasn't been recognized, maybe it's a numerical CIDR? - except Exception: - try: - address, prefix = value.split("/") - address.isdigit() - address = int(address) - prefix.isdigit() - prefix = int(prefix) - - # It's not numerical CIDR, give up - except Exception: - return False - - # It is something, so let's try and build a CIDR from the parts - try: - v = netaddr.IPNetwork("0.0.0.0/0") - v.value = address - v.prefixlen = prefix - - # It's not a valid IPv4 CIDR - except Exception: - try: - v = netaddr.IPNetwork("::/0") - v.value = address - v.prefixlen = prefix - - # It's not a valid IPv6 CIDR. Give up. - except Exception: - return False - - # We have a valid CIDR, so let's write it in correct format - value = str(v) - vtype = "network" - - # We have a query string but it's not in the known query types. Check if - # that string is a valid subnet, if so, we can check later if given IP - # address/network is inside that specific subnet - try: - # ?? 6to4 and link-local were True here before. Should they still? - if ( - query - and (query not in query_func_map or query == "cidr_lookup") - and not str(query).isdigit() - and ipaddr(query, "network") - ): - iplist = netaddr.IPSet([netaddr.IPNetwork(query)]) - query = "cidr_lookup" - except Exception: - pass - - # This code checks if value maches the IP version the user wants, ie. if - # it's any version ("ipaddr()"), IPv4 ("ipv4()") or IPv6 ("ipv6()") - # If version does not match, return False - if version and v.version != version: - return False - - extras = [] - for arg in query_func_extra_args.get(query, tuple()): - extras.append(locals()[arg]) - try: - return query_func_map[query](v, *extras) - except KeyError: - try: - float(query) - if v.size == 1: - if vtype == "address": - return str(v.ip) - elif vtype == "network": - return str(v) - - elif v.size > 1: - try: - return str(v[query]) + "/" + str(v.prefixlen) - except Exception: - return False - - else: - return value - - except Exception: - raise errors.AnsibleFilterError( - alias + ": unknown filter type: %s" % query - ) - - return False - - -def ipmath(value, amount): - try: - if "/" in value: - ip = netaddr.IPNetwork(value).ip - else: - ip = netaddr.IPAddress(value) - except (netaddr.AddrFormatError, ValueError): - msg = "You must pass a valid IP address; {0} is invalid".format(value) - raise errors.AnsibleFilterError(msg) - - if not isinstance(amount, int): - msg = ( - "You must pass an integer for arithmetic; " - "{0} is not a valid integer" - ).format(amount) - raise errors.AnsibleFilterError(msg) - - return str(ip + amount) - - -def ipwrap(value, query=""): - try: - if isinstance(value, (list, tuple, types.GeneratorType)): - _ret = [] - for element in value: - if ipaddr(element, query, version=False, alias="ipwrap"): - _ret.append(ipaddr(element, "wrap")) - else: - _ret.append(element) - - return _ret - else: - _ret = ipaddr(value, query, version=False, alias="ipwrap") - if _ret: - return ipaddr(_ret, "wrap") - else: - return value - - except Exception: - return value - - -def ipv4(value, query=""): - return ipaddr(value, query, version=4, alias="ipv4") - - -def ipv6(value, query=""): - return ipaddr(value, query, version=6, alias="ipv6") - - -# Split given subnet into smaller subnets or find out the biggest subnet of -# a given IP address with given CIDR prefix -# Usage: -# -# - address or address/prefix | ipsubnet -# returns CIDR subnet of a given input -# -# - address/prefix | ipsubnet(cidr) -# returns number of possible subnets for given CIDR prefix -# -# - address/prefix | ipsubnet(cidr, index) -# returns new subnet with given CIDR prefix -# -# - address | ipsubnet(cidr) -# returns biggest subnet with given CIDR prefix that address belongs to -# -# - address | ipsubnet(cidr, index) -# returns next indexed subnet which contains given address -# -# - address/prefix | ipsubnet(subnet/prefix) -# return the index of the subnet in the subnet -def ipsubnet(value, query="", index="x"): - """ Manipulate IPv4/IPv6 subnets """ - - try: - vtype = ipaddr(value, "type") - if vtype == "address": - v = ipaddr(value, "cidr") - elif vtype == "network": - v = ipaddr(value, "subnet") - - value = netaddr.IPNetwork(v) - except Exception: - return False - query_string = str(query) - if not query: - return str(value) - - elif query_string.isdigit(): - vsize = ipaddr(v, "size") - query = int(query) - - try: - float(index) - index = int(index) - - if vsize > 1: - try: - return str(list(value.subnet(query))[index]) - except Exception: - return False - - elif vsize == 1: - try: - return str(value.supernet(query)[index]) - except Exception: - return False - - except Exception: - if vsize > 1: - try: - return str(len(list(value.subnet(query)))) - except Exception: - return False - - elif vsize == 1: - try: - return str(value.supernet(query)[0]) - except Exception: - return False - - elif query_string: - vtype = ipaddr(query, "type") - if vtype == "address": - v = ipaddr(query, "cidr") - elif vtype == "network": - v = ipaddr(query, "subnet") - else: - msg = "You must pass a valid subnet or IP address; {0} is invalid".format( - query_string - ) - raise errors.AnsibleFilterError(msg) - query = netaddr.IPNetwork(v) - for i, subnet in enumerate(query.subnet(value.prefixlen), 1): - if subnet == value: - return str(i) - msg = "{0} is not in the subnet {1}".format(value.cidr, query.cidr) - raise errors.AnsibleFilterError(msg) - return False - - -# Returns the nth host within a network described by value. -# Usage: -# -# - address or address/prefix | nthhost(nth) -# returns the nth host within the given network -def nthhost(value, query=""): - """ Get the nth host within a given network """ - try: - vtype = ipaddr(value, "type") - if vtype == "address": - v = ipaddr(value, "cidr") - elif vtype == "network": - v = ipaddr(value, "subnet") - - value = netaddr.IPNetwork(v) - except Exception: - return False - - if not query: - return False - - try: - nth = int(query) - if value.size > nth: - return value[nth] - - except ValueError: - return False - - return False - - -# Returns the next nth usable ip within a network described by value. -def next_nth_usable(value, offset): - try: - vtype = ipaddr(value, "type") - if vtype == "address": - v = ipaddr(value, "cidr") - elif vtype == "network": - v = ipaddr(value, "subnet") - - v = netaddr.IPNetwork(v) - except Exception: - return False - - if type(offset) != int: - raise errors.AnsibleFilterError("Must pass in an integer") - if v.size > 1: - first_usable, last_usable = _first_last(v) - nth_ip = int(netaddr.IPAddress(int(v.ip) + offset)) - if nth_ip >= first_usable and nth_ip <= last_usable: - return str(netaddr.IPAddress(int(v.ip) + offset)) - - -# Returns the previous nth usable ip within a network described by value. -def previous_nth_usable(value, offset): - try: - vtype = ipaddr(value, "type") - if vtype == "address": - v = ipaddr(value, "cidr") - elif vtype == "network": - v = ipaddr(value, "subnet") - - v = netaddr.IPNetwork(v) - except Exception: - return False - - if type(offset) != int: - raise errors.AnsibleFilterError("Must pass in an integer") - if v.size > 1: - first_usable, last_usable = _first_last(v) - nth_ip = int(netaddr.IPAddress(int(v.ip) - offset)) - if nth_ip >= first_usable and nth_ip <= last_usable: - return str(netaddr.IPAddress(int(v.ip) - offset)) - - -def _range_checker(ip_check, first, last): - """ - Tests whether an ip address is within the bounds of the first and last address. - - :param ip_check: The ip to test if it is within first and last. - :param first: The first IP in the range to test against. - :param last: The last IP in the range to test against. - - :return: bool - """ - if ip_check >= first and ip_check <= last: - return True - else: - return False - - -def _address_normalizer(value): - """ - Used to validate an address or network type and return it in a consistent format. - This is being used for future use cases not currently available such as an address range. - - :param value: The string representation of an address or network. - - :return: The address or network in the normalized form. - """ - try: - vtype = ipaddr(value, "type") - if vtype == "address" or vtype == "network": - v = ipaddr(value, "subnet") - except Exception: - return False - - return v - - -def network_in_usable(value, test): - """ - Checks whether 'test' is a useable address or addresses in 'value' - - :param: value: The string representation of an address or network to test against. - :param test: The string representation of an address or network to validate if it is within the range of 'value'. - - :return: bool - """ - # normalize value and test variables into an ipaddr - v = _address_normalizer(value) - w = _address_normalizer(test) - - # get first and last addresses as integers to compare value and test; or cathes value when case is /32 - v_first = ipaddr(ipaddr(v, "first_usable") or ipaddr(v, "address"), "int") - v_last = ipaddr(ipaddr(v, "last_usable") or ipaddr(v, "address"), "int") - w_first = ipaddr(ipaddr(w, "network") or ipaddr(w, "address"), "int") - w_last = ipaddr(ipaddr(w, "broadcast") or ipaddr(w, "address"), "int") - - if _range_checker(w_first, v_first, v_last) and _range_checker( - w_last, v_first, v_last - ): - return True - else: - return False - - -def network_in_network(value, test): - """ - Checks whether the 'test' address or addresses are in 'value', including broadcast and network - - :param: value: The network address or range to test against. - :param test: The address or network to validate if it is within the range of 'value'. - - :return: bool - """ - # normalize value and test variables into an ipaddr - v = _address_normalizer(value) - w = _address_normalizer(test) - - # get first and last addresses as integers to compare value and test; or cathes value when case is /32 - v_first = ipaddr(ipaddr(v, "network") or ipaddr(v, "address"), "int") - v_last = ipaddr(ipaddr(v, "broadcast") or ipaddr(v, "address"), "int") - w_first = ipaddr(ipaddr(w, "network") or ipaddr(w, "address"), "int") - w_last = ipaddr(ipaddr(w, "broadcast") or ipaddr(w, "address"), "int") - - if _range_checker(w_first, v_first, v_last) and _range_checker( - w_last, v_first, v_last - ): - return True - else: - return False - - -def reduce_on_network(value, network): - """ - Reduces a list of addresses to only the addresses that match a given network. - - :param: value: The list of addresses to filter on. - :param: network: The network to validate against. - - :return: The reduced list of addresses. - """ - # normalize network variable into an ipaddr - n = _address_normalizer(network) - - # get first and last addresses as integers to compare value and test; or cathes value when case is /32 - n_first = ipaddr(ipaddr(n, "network") or ipaddr(n, "address"), "int") - n_last = ipaddr(ipaddr(n, "broadcast") or ipaddr(n, "address"), "int") - - # create an empty list to fill and return - r = [] - - for address in value: - # normalize address variables into an ipaddr - a = _address_normalizer(address) - - # get first and last addresses as integers to compare value and test; or cathes value when case is /32 - a_first = ipaddr(ipaddr(a, "network") or ipaddr(a, "address"), "int") - a_last = ipaddr(ipaddr(a, "broadcast") or ipaddr(a, "address"), "int") - - if _range_checker(a_first, n_first, n_last) and _range_checker( - a_last, n_first, n_last - ): - r.append(address) - - return r - - -# Returns the SLAAC address within a network for a given HW/MAC address. -# Usage: -# -# - prefix | slaac(mac) -def slaac(value, query=""): - """ Get the SLAAC address within given network """ - try: - vtype = ipaddr(value, "type") - if vtype == "address": - v = ipaddr(value, "cidr") - elif vtype == "network": - v = ipaddr(value, "subnet") - - if ipaddr(value, "version") != 6: - return False - - value = netaddr.IPNetwork(v) - except Exception: - return False - - if not query: - return False - - try: - mac = hwaddr(query, alias="slaac") - - eui = netaddr.EUI(mac) - except Exception: - return False - - return eui.ipv6(value.network) - - -# ---- HWaddr / MAC address filters ---- -def hwaddr(value, query="", alias="hwaddr"): - """ Check if string is a HW/MAC address and filter it """ - - query_func_extra_args = {"": ("value",)} - - query_func_map = { - "": _empty_hwaddr_query, - "bare": _bare_query, - "bool": _bool_hwaddr_query, - "int": _int_hwaddr_query, - "cisco": _cisco_query, - "eui48": _win_query, - "linux": _linux_query, - "pgsql": _postgresql_query, - "postgresql": _postgresql_query, - "psql": _postgresql_query, - "unix": _unix_query, - "win": _win_query, - } - - try: - v = netaddr.EUI(value) - except Exception: - if query and query != "bool": - raise errors.AnsibleFilterError( - alias + ": not a hardware address: %s" % value - ) - - extras = [] - for arg in query_func_extra_args.get(query, tuple()): - extras.append(locals()[arg]) - try: - return query_func_map[query](v, *extras) - except KeyError: - raise errors.AnsibleFilterError( - alias + ": unknown filter type: %s" % query - ) - - return False - - -def macaddr(value, query=""): - return hwaddr(value, query, alias="macaddr") - - -def _need_netaddr(f_name, *args, **kwargs): - raise errors.AnsibleFilterError( - "The %s filter requires python's netaddr be " - "installed on the ansible controller" % f_name - ) - - -def ip4_hex(arg, delimiter=""): - """ Convert an IPv4 address to Hexadecimal notation """ - numbers = list(map(int, arg.split("."))) - return "{0:02x}{sep}{1:02x}{sep}{2:02x}{sep}{3:02x}".format( - *numbers, sep=delimiter - ) - - -# ---- Ansible filters ---- -class FilterModule(object): - """ IP address and network manipulation filters """ - - filter_map = { - # IP addresses and networks - "cidr_merge": cidr_merge, - "ipaddr": ipaddr, - "ipmath": ipmath, - "ipwrap": ipwrap, - "ip4_hex": ip4_hex, - "ipv4": ipv4, - "ipv6": ipv6, - "ipsubnet": ipsubnet, - "next_nth_usable": next_nth_usable, - "network_in_network": network_in_network, - "network_in_usable": network_in_usable, - "reduce_on_network": reduce_on_network, - "nthhost": nthhost, - "previous_nth_usable": previous_nth_usable, - "slaac": slaac, - # MAC / HW addresses - "hwaddr": hwaddr, - "macaddr": macaddr, - } - - def filters(self): - if netaddr: - return self.filter_map - else: - # Need to install python's netaddr for these filters to work - return dict( - (f, partial(_need_netaddr, f)) for f in self.filter_map - ) diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py deleted file mode 100644 index 72d6c8684c1..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py +++ /dev/null @@ -1,531 +0,0 @@ -# -# {c) 2017 Red Hat, Inc. -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - -import re -import os -import traceback -import string - -from collections.abc import Mapping -from xml.etree.ElementTree import fromstring - -from ansible.module_utils._text import to_native, to_text -from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import ( - Template, -) -from ansible.module_utils.six import iteritems, string_types -from ansible.errors import AnsibleError, AnsibleFilterError -from ansible.utils.display import Display -from ansible.utils.encrypt import passlib_or_crypt, random_password - -try: - import yaml - - HAS_YAML = True -except ImportError: - HAS_YAML = False - -try: - import textfsm - - HAS_TEXTFSM = True -except ImportError: - HAS_TEXTFSM = False - -display = Display() - - -def re_matchall(regex, value): - objects = list() - for match in re.findall(regex.pattern, value, re.M): - obj = {} - if regex.groupindex: - for name, index in iteritems(regex.groupindex): - if len(regex.groupindex) == 1: - obj[name] = match - else: - obj[name] = match[index - 1] - objects.append(obj) - return objects - - -def re_search(regex, value): - obj = {} - match = regex.search(value, re.M) - if match: - items = list(match.groups()) - if regex.groupindex: - for name, index in iteritems(regex.groupindex): - obj[name] = items[index - 1] - return obj - - -def parse_cli(output, tmpl): - if not isinstance(output, string_types): - raise AnsibleError( - "parse_cli input should be a string, but was given a input of %s" - % (type(output)) - ) - - if not os.path.exists(tmpl): - raise AnsibleError("unable to locate parse_cli template: %s" % tmpl) - - try: - template = Template() - except ImportError as exc: - raise AnsibleError(to_native(exc)) - - with open(tmpl) as tmpl_fh: - tmpl_content = tmpl_fh.read() - - spec = yaml.safe_load(tmpl_content) - obj = {} - - for name, attrs in iteritems(spec["keys"]): - value = attrs["value"] - - try: - variables = spec.get("vars", {}) - value = template(value, variables) - except Exception: - pass - - if "start_block" in attrs and "end_block" in attrs: - start_block = re.compile(attrs["start_block"]) - end_block = re.compile(attrs["end_block"]) - - blocks = list() - lines = None - block_started = False - - for line in output.split("\n"): - match_start = start_block.match(line) - match_end = end_block.match(line) - - if match_start: - lines = list() - lines.append(line) - block_started = True - - elif match_end: - if lines: - lines.append(line) - blocks.append("\n".join(lines)) - block_started = False - - elif block_started: - if lines: - lines.append(line) - - regex_items = [re.compile(r) for r in attrs["items"]] - objects = list() - - for block in blocks: - if isinstance(value, Mapping) and "key" not in value: - items = list() - for regex in regex_items: - match = regex.search(block) - if match: - item_values = match.groupdict() - item_values["match"] = list(match.groups()) - items.append(item_values) - else: - items.append(None) - - obj = {} - for k, v in iteritems(value): - try: - obj[k] = template( - v, {"item": items}, fail_on_undefined=False - ) - except Exception: - obj[k] = None - objects.append(obj) - - elif isinstance(value, Mapping): - items = list() - for regex in regex_items: - match = regex.search(block) - if match: - item_values = match.groupdict() - item_values["match"] = list(match.groups()) - items.append(item_values) - else: - items.append(None) - - key = template(value["key"], {"item": items}) - values = dict( - [ - (k, template(v, {"item": items})) - for k, v in iteritems(value["values"]) - ] - ) - objects.append({key: values}) - - return objects - - elif "items" in attrs: - regexp = re.compile(attrs["items"]) - when = attrs.get("when") - conditional = ( - "{%% if %s %%}True{%% else %%}False{%% endif %%}" % when - ) - - if isinstance(value, Mapping) and "key" not in value: - values = list() - - for item in re_matchall(regexp, output): - entry = {} - - for item_key, item_value in iteritems(value): - entry[item_key] = template(item_value, {"item": item}) - - if when: - if template(conditional, {"item": entry}): - values.append(entry) - else: - values.append(entry) - - obj[name] = values - - elif isinstance(value, Mapping): - values = dict() - - for item in re_matchall(regexp, output): - entry = {} - - for item_key, item_value in iteritems(value["values"]): - entry[item_key] = template(item_value, {"item": item}) - - key = template(value["key"], {"item": item}) - - if when: - if template( - conditional, {"item": {"key": key, "value": entry}} - ): - values[key] = entry - else: - values[key] = entry - - obj[name] = values - - else: - item = re_search(regexp, output) - obj[name] = template(value, {"item": item}) - - else: - obj[name] = value - - return obj - - -def parse_cli_textfsm(value, template): - if not HAS_TEXTFSM: - raise AnsibleError( - "parse_cli_textfsm filter requires TextFSM library to be installed" - ) - - if not isinstance(value, string_types): - raise AnsibleError( - "parse_cli_textfsm input should be a string, but was given a input of %s" - % (type(value)) - ) - - if not os.path.exists(template): - raise AnsibleError( - "unable to locate parse_cli_textfsm template: %s" % template - ) - - try: - template = open(template) - except IOError as exc: - raise AnsibleError(to_native(exc)) - - re_table = textfsm.TextFSM(template) - fsm_results = re_table.ParseText(value) - - results = list() - for item in fsm_results: - results.append(dict(zip(re_table.header, item))) - - return results - - -def _extract_param(template, root, attrs, value): - - key = None - when = attrs.get("when") - conditional = "{%% if %s %%}True{%% else %%}False{%% endif %%}" % when - param_to_xpath_map = attrs["items"] - - if isinstance(value, Mapping): - key = value.get("key", None) - if key: - value = value["values"] - - entries = dict() if key else list() - - for element in root.findall(attrs["top"]): - entry = dict() - item_dict = dict() - for param, param_xpath in iteritems(param_to_xpath_map): - fields = None - try: - fields = element.findall(param_xpath) - except Exception: - display.warning( - "Failed to evaluate value of '%s' with XPath '%s'.\nUnexpected error: %s." - % (param, param_xpath, traceback.format_exc()) - ) - - tags = param_xpath.split("/") - - # check if xpath ends with attribute. - # If yes set attribute key/value dict to param value in case attribute matches - # else if it is a normal xpath assign matched element text value. - if len(tags) and tags[-1].endswith("]"): - if fields: - if len(fields) > 1: - item_dict[param] = [field.attrib for field in fields] - else: - item_dict[param] = fields[0].attrib - else: - item_dict[param] = {} - else: - if fields: - if len(fields) > 1: - item_dict[param] = [field.text for field in fields] - else: - item_dict[param] = fields[0].text - else: - item_dict[param] = None - - if isinstance(value, Mapping): - for item_key, item_value in iteritems(value): - entry[item_key] = template(item_value, {"item": item_dict}) - else: - entry = template(value, {"item": item_dict}) - - if key: - expanded_key = template(key, {"item": item_dict}) - if when: - if template( - conditional, - {"item": {"key": expanded_key, "value": entry}}, - ): - entries[expanded_key] = entry - else: - entries[expanded_key] = entry - else: - if when: - if template(conditional, {"item": entry}): - entries.append(entry) - else: - entries.append(entry) - - return entries - - -def parse_xml(output, tmpl): - if not os.path.exists(tmpl): - raise AnsibleError("unable to locate parse_xml template: %s" % tmpl) - - if not isinstance(output, string_types): - raise AnsibleError( - "parse_xml works on string input, but given input of : %s" - % type(output) - ) - - root = fromstring(output) - try: - template = Template() - except ImportError as exc: - raise AnsibleError(to_native(exc)) - - with open(tmpl) as tmpl_fh: - tmpl_content = tmpl_fh.read() - - spec = yaml.safe_load(tmpl_content) - obj = {} - - for name, attrs in iteritems(spec["keys"]): - value = attrs["value"] - - try: - variables = spec.get("vars", {}) - value = template(value, variables) - except Exception: - pass - - if "items" in attrs: - obj[name] = _extract_param(template, root, attrs, value) - else: - obj[name] = value - - return obj - - -def type5_pw(password, salt=None): - if not isinstance(password, string_types): - raise AnsibleFilterError( - "type5_pw password input should be a string, but was given a input of %s" - % (type(password).__name__) - ) - - salt_chars = u"".join( - (to_text(string.ascii_letters), to_text(string.digits), u"./") - ) - if salt is not None and not isinstance(salt, string_types): - raise AnsibleFilterError( - "type5_pw salt input should be a string, but was given a input of %s" - % (type(salt).__name__) - ) - elif not salt: - salt = random_password(length=4, chars=salt_chars) - elif not set(salt) <= set(salt_chars): - raise AnsibleFilterError( - "type5_pw salt used inproper characters, must be one of %s" - % (salt_chars) - ) - - encrypted_password = passlib_or_crypt(password, "md5_crypt", salt=salt) - - return encrypted_password - - -def hash_salt(password): - - split_password = password.split("$") - if len(split_password) != 4: - raise AnsibleFilterError( - "Could not parse salt out password correctly from {0}".format( - password - ) - ) - else: - return split_password[2] - - -def comp_type5( - unencrypted_password, encrypted_password, return_original=False -): - - salt = hash_salt(encrypted_password) - if type5_pw(unencrypted_password, salt) == encrypted_password: - if return_original is True: - return encrypted_password - else: - return True - return False - - -def vlan_parser(vlan_list, first_line_len=48, other_line_len=44): - - """ - Input: Unsorted list of vlan integers - Output: Sorted string list of integers according to IOS-like vlan list rules - - 1. Vlans are listed in ascending order - 2. Runs of 3 or more consecutive vlans are listed with a dash - 3. The first line of the list can be first_line_len characters long - 4. Subsequent list lines can be other_line_len characters - """ - - # Sort and remove duplicates - sorted_list = sorted(set(vlan_list)) - - if sorted_list[0] < 1 or sorted_list[-1] > 4094: - raise AnsibleFilterError("Valid VLAN range is 1-4094") - - parse_list = [] - idx = 0 - while idx < len(sorted_list): - start = idx - end = start - while end < len(sorted_list) - 1: - if sorted_list[end + 1] - sorted_list[end] == 1: - end += 1 - else: - break - - if start == end: - # Single VLAN - parse_list.append(str(sorted_list[idx])) - elif start + 1 == end: - # Run of 2 VLANs - parse_list.append(str(sorted_list[start])) - parse_list.append(str(sorted_list[end])) - else: - # Run of 3 or more VLANs - parse_list.append( - str(sorted_list[start]) + "-" + str(sorted_list[end]) - ) - idx = end + 1 - - line_count = 0 - result = [""] - for vlans in parse_list: - # First line (" switchport trunk allowed vlan ") - if line_count == 0: - if len(result[line_count] + vlans) > first_line_len: - result.append("") - line_count += 1 - result[line_count] += vlans + "," - else: - result[line_count] += vlans + "," - - # Subsequent lines (" switchport trunk allowed vlan add ") - else: - if len(result[line_count] + vlans) > other_line_len: - result.append("") - line_count += 1 - result[line_count] += vlans + "," - else: - result[line_count] += vlans + "," - - # Remove trailing orphan commas - for idx in range(0, len(result)): - result[idx] = result[idx].rstrip(",") - - # Sometimes text wraps to next line, but there are no remaining VLANs - if "" in result: - result.remove("") - - return result - - -class FilterModule(object): - """Filters for working with output from network devices""" - - filter_map = { - "parse_cli": parse_cli, - "parse_cli_textfsm": parse_cli_textfsm, - "parse_xml": parse_xml, - "type5_pw": type5_pw, - "hash_salt": hash_salt, - "comp_type5": comp_type5, - "vlan_parser": vlan_parser, - } - - def filters(self): - return self.filter_map diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/httpapi/restconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/httpapi/restconf.py deleted file mode 100644 index 8afb3e5e284..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/httpapi/restconf.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright (c) 2018 Cisco and/or its affiliates. -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . -# - -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - -DOCUMENTATION = """author: Ansible Networking Team -httpapi: restconf -short_description: HttpApi Plugin for devices supporting Restconf API -description: -- This HttpApi plugin provides methods to connect to Restconf API endpoints. -options: - root_path: - type: str - description: - - Specifies the location of the Restconf root. - default: /restconf - vars: - - name: ansible_httpapi_restconf_root -""" - -import json - -from ansible.module_utils._text import to_text -from ansible.module_utils.connection import ConnectionError -from ansible.module_utils.six.moves.urllib.error import HTTPError -from ansible.plugins.httpapi import HttpApiBase - - -CONTENT_TYPE = "application/yang-data+json" - - -class HttpApi(HttpApiBase): - def send_request(self, data, **message_kwargs): - if data: - data = json.dumps(data) - - path = "/".join( - [ - self.get_option("root_path").rstrip("/"), - message_kwargs.get("path", "").lstrip("/"), - ] - ) - - headers = { - "Content-Type": message_kwargs.get("content_type") or CONTENT_TYPE, - "Accept": message_kwargs.get("accept") or CONTENT_TYPE, - } - response, response_data = self.connection.send( - path, data, headers=headers, method=message_kwargs.get("method") - ) - - return handle_response(response, response_data) - - -def handle_response(response, response_data): - try: - response_data = json.loads(response_data.read()) - except ValueError: - response_data = response_data.read() - - if isinstance(response, HTTPError): - if response_data: - if "errors" in response_data: - errors = response_data["errors"]["error"] - error_text = "\n".join( - (error["error-message"] for error in errors) - ) - else: - error_text = response_data - - raise ConnectionError(error_text, code=response.code) - raise ConnectionError(to_text(response), code=response.code) - - return response_data diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py deleted file mode 100644 index 1f03299bea8..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py +++ /dev/null @@ -1,147 +0,0 @@ -# -# (c) 2018 Red Hat, Inc. -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . -# -import json - -from copy import deepcopy -from contextlib import contextmanager - -try: - from lxml.etree import fromstring, tostring -except ImportError: - from xml.etree.ElementTree import fromstring, tostring - -from ansible.module_utils._text import to_text, to_bytes -from ansible.module_utils.connection import Connection, ConnectionError -from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.netconf import ( - NetconfConnection, -) - - -IGNORE_XML_ATTRIBUTE = () - - -def get_connection(module): - if hasattr(module, "_netconf_connection"): - return module._netconf_connection - - capabilities = get_capabilities(module) - network_api = capabilities.get("network_api") - if network_api == "netconf": - module._netconf_connection = NetconfConnection(module._socket_path) - else: - module.fail_json(msg="Invalid connection type %s" % network_api) - - return module._netconf_connection - - -def get_capabilities(module): - if hasattr(module, "_netconf_capabilities"): - return module._netconf_capabilities - - capabilities = Connection(module._socket_path).get_capabilities() - module._netconf_capabilities = json.loads(capabilities) - return module._netconf_capabilities - - -def lock_configuration(module, target=None): - conn = get_connection(module) - return conn.lock(target=target) - - -def unlock_configuration(module, target=None): - conn = get_connection(module) - return conn.unlock(target=target) - - -@contextmanager -def locked_config(module, target=None): - try: - lock_configuration(module, target=target) - yield - finally: - unlock_configuration(module, target=target) - - -def get_config(module, source, filter=None, lock=False): - conn = get_connection(module) - try: - locked = False - if lock: - conn.lock(target=source) - locked = True - response = conn.get_config(source=source, filter=filter) - - except ConnectionError as e: - module.fail_json( - msg=to_text(e, errors="surrogate_then_replace").strip() - ) - - finally: - if locked: - conn.unlock(target=source) - - return response - - -def get(module, filter, lock=False): - conn = get_connection(module) - try: - locked = False - if lock: - conn.lock(target="running") - locked = True - - response = conn.get(filter=filter) - - except ConnectionError as e: - module.fail_json( - msg=to_text(e, errors="surrogate_then_replace").strip() - ) - - finally: - if locked: - conn.unlock(target="running") - - return response - - -def dispatch(module, request): - conn = get_connection(module) - try: - response = conn.dispatch(request) - except ConnectionError as e: - module.fail_json( - msg=to_text(e, errors="surrogate_then_replace").strip() - ) - - return response - - -def sanitize_xml(data): - tree = fromstring( - to_bytes(deepcopy(data), errors="surrogate_then_replace") - ) - for element in tree.getiterator(): - # remove attributes - attribute = element.attrib - if attribute: - for key in list(attribute): - if key not in IGNORE_XML_ATTRIBUTE: - attribute.pop(key) - return to_text(tostring(tree), errors="surrogate_then_replace").strip() diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/restconf/restconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/restconf/restconf.py deleted file mode 100644 index fba46be0d6b..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/restconf/restconf.py +++ /dev/null @@ -1,61 +0,0 @@ -# This code is part of Ansible, but is an independent component. -# This particular file snippet, and this file snippet only, is BSD licensed. -# Modules you write using this snippet, which is embedded dynamically by Ansible -# still belong to the author of the module, and may assign their own license -# to the complete work. -# -# (c) 2018 Red Hat Inc. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. -# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# - -from ansible.module_utils.connection import Connection - - -def get(module, path=None, content=None, fields=None, output="json"): - if path is None: - raise ValueError("path value must be provided") - if content: - path += "?" + "content=%s" % content - if fields: - path += "?" + "field=%s" % fields - - accept = None - if output == "xml": - accept = "application/yang-data+xml" - - connection = Connection(module._socket_path) - return connection.send_request( - None, path=path, method="GET", accept=accept - ) - - -def edit_config(module, path=None, content=None, method="GET", format="json"): - if path is None: - raise ValueError("path value must be provided") - - content_type = None - if format == "xml": - content_type = "application/yang-data+xml" - - connection = Connection(module._socket_path) - return connection.send_request( - content, path=path, method=method, content_type=content_type - ) diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_get.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_get.py deleted file mode 100644 index f0910f52e62..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_get.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# (c) 2018, Ansible by Red Hat, inc -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - - -ANSIBLE_METADATA = { - "metadata_version": "1.1", - "status": ["preview"], - "supported_by": "network", -} - - -DOCUMENTATION = """module: net_get -author: Deepak Agrawal (@dagrawal) -short_description: Copy a file from a network device to Ansible Controller -description: -- This module provides functionality to copy file from network device to ansible controller. -extends_documentation_fragment: -- ansible.netcommon.network_agnostic -options: - src: - description: - - Specifies the source file. The path to the source file can either be the full - path on the network device or a relative path as per path supported by destination - network device. - required: true - protocol: - description: - - Protocol used to transfer file. - default: scp - choices: - - scp - - sftp - dest: - description: - - Specifies the destination file. The path to the destination file can either - be the full path on the Ansible control host or a relative path from the playbook - or role root directory. - default: - - Same filename as specified in I(src). The path will be playbook root or role - root directory if playbook is part of a role. -requirements: -- scp -notes: -- Some devices need specific configurations to be enabled before scp can work These - configuration should be pre-configured before using this module e.g ios - C(ip scp - server enable). -- User privilege to do scp on network device should be pre-configured e.g. ios - need - user privilege 15 by default for allowing scp. -- Default destination of source file. -""" - -EXAMPLES = """ -- name: copy file from the network device to Ansible controller - net_get: - src: running_cfg_ios1.txt - -- name: copy file from ios to common location at /tmp - net_get: - src: running_cfg_sw1.txt - dest : /tmp/ios1.txt -""" - -RETURN = """ -""" diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_put.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_put.py deleted file mode 100644 index 2fc4a98c012..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_put.py +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# (c) 2018, Ansible by Red Hat, inc -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - - -ANSIBLE_METADATA = { - "metadata_version": "1.1", - "status": ["preview"], - "supported_by": "network", -} - - -DOCUMENTATION = """module: net_put -author: Deepak Agrawal (@dagrawal) -short_description: Copy a file from Ansible Controller to a network device -description: -- This module provides functionality to copy file from Ansible controller to network - devices. -extends_documentation_fragment: -- ansible.netcommon.network_agnostic -options: - src: - description: - - Specifies the source file. The path to the source file can either be the full - path on the Ansible control host or a relative path from the playbook or role - root directory. - required: true - protocol: - description: - - Protocol used to transfer file. - default: scp - choices: - - scp - - sftp - dest: - description: - - Specifies the destination file. The path to destination file can either be the - full path or relative path as supported by network_os. - default: - - Filename from src and at default directory of user shell on network_os. - required: false - mode: - description: - - Set the file transfer mode. If mode is set to I(text) then I(src) file will - go through Jinja2 template engine to replace any vars if present in the src - file. If mode is set to I(binary) then file will be copied as it is to destination - device. - default: binary - choices: - - binary - - text -requirements: -- scp -notes: -- Some devices need specific configurations to be enabled before scp can work These - configuration should be pre-configured before using this module e.g ios - C(ip scp - server enable). -- User privilege to do scp on network device should be pre-configured e.g. ios - need - user privilege 15 by default for allowing scp. -- Default destination of source file. -""" - -EXAMPLES = """ -- name: copy file from ansible controller to a network device - net_put: - src: running_cfg_ios1.txt - -- name: copy file at root dir of flash in slot 3 of sw1(ios) - net_put: - src: running_cfg_sw1.txt - protocol: sftp - dest : flash3:/running_cfg_sw1.txt -""" - -RETURN = """ -""" diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py deleted file mode 100644 index e9332f26d91..00000000000 --- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py +++ /dev/null @@ -1,70 +0,0 @@ -# -# (c) 2017 Red Hat Inc. -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . -# -from __future__ import absolute_import, division, print_function - -__metaclass__ = type - -DOCUMENTATION = """author: Ansible Networking Team -netconf: default -short_description: Use default netconf plugin to run standard netconf commands as - per RFC -description: -- This default plugin provides low level abstraction apis for sending and receiving - netconf commands as per Netconf RFC specification. -options: - ncclient_device_handler: - type: str - default: default - description: - - Specifies the ncclient device handler name for network os that support default - netconf implementation as per Netconf RFC specification. To identify the ncclient - device handler name refer ncclient library documentation. -""" -import json - -from ansible.module_utils._text import to_text -from ansible.plugins.netconf import NetconfBase - - -class Netconf(NetconfBase): - def get_text(self, ele, tag): - try: - return to_text( - ele.find(tag).text, errors="surrogate_then_replace" - ).strip() - except AttributeError: - pass - - def get_device_info(self): - device_info = dict() - device_info["network_os"] = "default" - return device_info - - def get_capabilities(self): - result = dict() - result["rpc"] = self.get_base_rpc() - result["network_api"] = "netconf" - result["device_info"] = self.get_device_info() - result["server_capabilities"] = [c for c in self.m.server_capabilities] - result["client_capabilities"] = [c for c in self.m.client_capabilities] - result["session_id"] = self.m.session_id - result["device_operations"] = self.get_device_operations( - result["server_capabilities"] - ) - return json.dumps(result)