Remove unused network test support files (#80080)

* Remove unused network test support files

* Remove obsolete ignores
pull/80084/head
Matt Clay 2 years ago committed by GitHub
parent 6bfe6b899a
commit 086046d478
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -183,7 +183,6 @@ test/support/integration/plugins/modules/sefcontext.py pylint:unused-import
test/support/integration/plugins/modules/zypper.py pylint:unused-import
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/utils.py pylint:unused-import
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/network_cli.py pylint:pointless-string-statement
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py pylint:pointless-string-statement
test/support/windows-integration/plugins/action/win_reboot.py pylint:unused-import
test/support/integration/plugins/modules/timezone.py pylint:disallowed-name
test/support/integration/plugins/module_utils/compat/ipaddress.py future-import-boilerplate
@ -192,12 +191,9 @@ test/support/integration/plugins/module_utils/compat/ipaddress.py no-unicode-lit
test/support/integration/plugins/module_utils/network/common/utils.py future-import-boilerplate
test/support/integration/plugins/module_utils/network/common/utils.py metaclass-boilerplate
test/support/integration/plugins/module_utils/network/common/utils.py pylint:use-a-generator
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py pylint:used-before-assignment
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py pylint:consider-using-dict-comprehension
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/compat/ipaddress.py no-unicode-literals
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/facts/facts.py pylint:unnecessary-comprehension
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/utils.py pylint:use-a-generator
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py pylint:unnecessary-comprehension
test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/cliconf/ios.py pylint:arguments-renamed
test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/modules/ios_config.py pep8:E501
test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/modules/ios_config.py pylint:used-before-assignment

@ -1,90 +0,0 @@
# Copyright: (c) 2015, Ansible Inc,
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import copy
from ansible.errors import AnsibleError
from ansible.plugins.action import ActionBase
from ansible.utils.display import Display
display = Display()
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
del tmp # tmp no longer has any effect
result = {}
play_context = copy.deepcopy(self._play_context)
play_context.network_os = self._get_network_os(task_vars)
new_task = self._task.copy()
module = self._get_implementation_module(
play_context.network_os, self._task.action
)
if not module:
if self._task.args["fail_on_missing_module"]:
result["failed"] = True
else:
result["failed"] = False
result["msg"] = (
"Could not find implementation module %s for %s"
% (self._task.action, play_context.network_os)
)
return result
new_task.action = module
action = self._shared_loader_obj.action_loader.get(
play_context.network_os,
task=new_task,
connection=self._connection,
play_context=play_context,
loader=self._loader,
templar=self._templar,
shared_loader_obj=self._shared_loader_obj,
)
display.vvvv("Running implementation module %s" % module)
return action.run(task_vars=task_vars)
def _get_network_os(self, task_vars):
if "network_os" in self._task.args and self._task.args["network_os"]:
display.vvvv("Getting network OS from task argument")
network_os = self._task.args["network_os"]
elif self._play_context.network_os:
display.vvvv("Getting network OS from inventory")
network_os = self._play_context.network_os
elif (
"network_os" in task_vars.get("ansible_facts", {})
and task_vars["ansible_facts"]["network_os"]
):
display.vvvv("Getting network OS from fact")
network_os = task_vars["ansible_facts"]["network_os"]
else:
raise AnsibleError(
"ansible_network_os must be specified on this host to use platform agnostic modules"
)
return network_os
def _get_implementation_module(self, network_os, platform_agnostic_module):
module_name = (
network_os.split(".")[-1]
+ "_"
+ platform_agnostic_module.partition("_")[2]
)
if "." in network_os:
fqcn_module = ".".join(network_os.split(".")[0:-1])
implementation_module = fqcn_module + "." + module_name
else:
implementation_module = module_name
if implementation_module not in self._shared_loader_obj.module_loader:
implementation_module = None
return implementation_module

@ -1,42 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = """become: enable
short_description: Switch to elevated permissions on a network device
description:
- This become plugins allows elevated permissions on a remote network device.
author: ansible (@core)
options:
become_pass:
description: password
ini:
- section: enable_become_plugin
key: password
vars:
- name: ansible_become_password
- name: ansible_become_pass
- name: ansible_enable_pass
env:
- name: ANSIBLE_BECOME_PASS
- name: ANSIBLE_ENABLE_PASS
notes:
- enable is really implemented in the network connection handler and as such can only
be used with network connections.
- This plugin ignores the 'become_exe' and 'become_user' settings as it uses an API
and not an executable.
"""
from ansible.plugins.become import BecomeBase
class BecomeModule(BecomeBase):
name = "ansible.netcommon.enable"
def build_become_command(self, cmd, shell):
# enable is implemented inside the network connection plugins
return cmd

@ -1,324 +0,0 @@
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = """author: Ansible Networking Team
connection: httpapi
short_description: Use httpapi to run command on network appliances
description:
- This connection plugin provides a connection to remote devices over a HTTP(S)-based
api.
options:
host:
description:
- Specifies the remote device FQDN or IP address to establish the HTTP(S) connection
to.
default: inventory_hostname
vars:
- name: ansible_host
port:
type: int
description:
- Specifies the port on the remote device that listens for connections when establishing
the HTTP(S) connection.
- When unspecified, will pick 80 or 443 based on the value of use_ssl.
ini:
- section: defaults
key: remote_port
env:
- name: ANSIBLE_REMOTE_PORT
vars:
- name: ansible_httpapi_port
network_os:
description:
- Configures the device platform network operating system. This value is used
to load the correct httpapi plugin to communicate with the remote device
vars:
- name: ansible_network_os
remote_user:
description:
- The username used to authenticate to the remote device when the API connection
is first established. If the remote_user is not specified, the connection will
use the username of the logged in user.
- Can be configured from the CLI via the C(--user) or C(-u) options.
ini:
- section: defaults
key: remote_user
env:
- name: ANSIBLE_REMOTE_USER
vars:
- name: ansible_user
password:
description:
- Configures the user password used to authenticate to the remote device when
needed for the device API.
vars:
- name: ansible_password
- name: ansible_httpapi_pass
- name: ansible_httpapi_password
use_ssl:
type: boolean
description:
- Whether to connect using SSL (HTTPS) or not (HTTP).
default: false
vars:
- name: ansible_httpapi_use_ssl
validate_certs:
type: boolean
description:
- Whether to validate SSL certificates
default: true
vars:
- name: ansible_httpapi_validate_certs
use_proxy:
type: boolean
description:
- Whether to use https_proxy for requests.
default: true
vars:
- name: ansible_httpapi_use_proxy
become:
type: boolean
description:
- The become option will instruct the CLI session to attempt privilege escalation
on platforms that support it. Normally this means transitioning from user mode
to C(enable) mode in the CLI session. If become is set to True and the remote
device does not support privilege escalation or the privilege has already been
elevated, then this option is silently ignored.
- Can be configured from the CLI via the C(--become) or C(-b) options.
default: false
ini:
- section: privilege_escalation
key: become
env:
- name: ANSIBLE_BECOME
vars:
- name: ansible_become
become_method:
description:
- This option allows the become method to be specified in for handling privilege
escalation. Typically the become_method value is set to C(enable) but could
be defined as other values.
default: sudo
ini:
- section: privilege_escalation
key: become_method
env:
- name: ANSIBLE_BECOME_METHOD
vars:
- name: ansible_become_method
persistent_connect_timeout:
type: int
description:
- Configures, in seconds, the amount of time to wait when trying to initially
establish a persistent connection. If this value expires before the connection
to the remote device is completed, the connection will fail.
default: 30
ini:
- section: persistent_connection
key: connect_timeout
env:
- name: ANSIBLE_PERSISTENT_CONNECT_TIMEOUT
vars:
- name: ansible_connect_timeout
persistent_command_timeout:
type: int
description:
- Configures, in seconds, the amount of time to wait for a command to return from
the remote device. If this timer is exceeded before the command returns, the
connection plugin will raise an exception and close.
default: 30
ini:
- section: persistent_connection
key: command_timeout
env:
- name: ANSIBLE_PERSISTENT_COMMAND_TIMEOUT
vars:
- name: ansible_command_timeout
persistent_log_messages:
type: boolean
description:
- This flag will enable logging the command executed and response received from
target device in the ansible log file. For this option to work 'log_path' ansible
configuration option is required to be set to a file path with write access.
- Be sure to fully understand the security implications of enabling this option
as it could create a security vulnerability by logging sensitive information
in log file.
default: false
ini:
- section: persistent_connection
key: log_messages
env:
- name: ANSIBLE_PERSISTENT_LOG_MESSAGES
vars:
- name: ansible_persistent_log_messages
"""
from io import BytesIO
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils._text import to_bytes
from ansible.module_utils.six import PY3
from ansible.module_utils.six.moves import cPickle
from ansible.module_utils.six.moves.urllib.error import HTTPError, URLError
from ansible.module_utils.urls import open_url
from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import httpapi_loader
from ansible.plugins.connection import NetworkConnectionBase, ensure_connect
class Connection(NetworkConnectionBase):
"""Network API connection"""
transport = "ansible.netcommon.httpapi"
has_pipelining = True
def __init__(self, play_context, new_stdin, *args, **kwargs):
super(Connection, self).__init__(
play_context, new_stdin, *args, **kwargs
)
self._url = None
self._auth = None
if self._network_os:
self.httpapi = httpapi_loader.get(self._network_os, self)
if self.httpapi:
self._sub_plugin = {
"type": "httpapi",
"name": self.httpapi._load_name,
"obj": self.httpapi,
}
self.queue_message(
"vvvv",
"loaded API plugin %s from path %s for network_os %s"
% (
self.httpapi._load_name,
self.httpapi._original_path,
self._network_os,
),
)
else:
raise AnsibleConnectionFailure(
"unable to load API plugin for network_os %s"
% self._network_os
)
else:
raise AnsibleConnectionFailure(
"Unable to automatically determine host network os. Please "
"manually configure ansible_network_os value for this host"
)
self.queue_message("log", "network_os is set to %s" % self._network_os)
def update_play_context(self, pc_data):
"""Updates the play context information for the connection"""
pc_data = to_bytes(pc_data)
if PY3:
pc_data = cPickle.loads(pc_data, encoding="bytes")
else:
pc_data = cPickle.loads(pc_data)
play_context = PlayContext()
play_context.deserialize(pc_data)
self.queue_message("vvvv", "updating play_context for connection")
if self._play_context.become ^ play_context.become:
self.set_become(play_context)
if play_context.become is True:
self.queue_message("vvvv", "authorizing connection")
else:
self.queue_message("vvvv", "deauthorizing connection")
self._play_context = play_context
def _connect(self):
if not self.connected:
protocol = "https" if self.get_option("use_ssl") else "http"
host = self.get_option("host")
port = self.get_option("port") or (
443 if protocol == "https" else 80
)
self._url = "%s://%s:%s" % (protocol, host, port)
self.queue_message(
"vvv",
"ESTABLISH HTTP(S) CONNECTFOR USER: %s TO %s"
% (self._play_context.remote_user, self._url),
)
self.httpapi.set_become(self._play_context)
self._connected = True
self.httpapi.login(
self.get_option("remote_user"), self.get_option("password")
)
def close(self):
"""
Close the active session to the device
"""
# only close the connection if its connected.
if self._connected:
self.queue_message("vvvv", "closing http(s) connection to device")
self.logout()
super(Connection, self).close()
@ensure_connect
def send(self, path, data, **kwargs):
"""
Sends the command to the device over api
"""
url_kwargs = dict(
timeout=self.get_option("persistent_command_timeout"),
validate_certs=self.get_option("validate_certs"),
use_proxy=self.get_option("use_proxy"),
headers={},
)
url_kwargs.update(kwargs)
if self._auth:
# Avoid modifying passed-in headers
headers = dict(kwargs.get("headers", {}))
headers.update(self._auth)
url_kwargs["headers"] = headers
else:
url_kwargs["force_basic_auth"] = True
url_kwargs["url_username"] = self.get_option("remote_user")
url_kwargs["url_password"] = self.get_option("password")
try:
url = self._url + path
self._log_messages(
"send url '%s' with data '%s' and kwargs '%s'"
% (url, data, url_kwargs)
)
response = open_url(url, data=data, **url_kwargs)
except HTTPError as exc:
is_handled = self.handle_httperror(exc)
if is_handled is True:
return self.send(path, data, **kwargs)
elif is_handled is False:
raise
else:
response = is_handled
except URLError as exc:
raise AnsibleConnectionFailure(
"Could not connect to {0}: {1}".format(
self._url + path, exc.reason
)
)
response_buffer = BytesIO()
resp_data = response.read()
self._log_messages("received response: '%s'" % resp_data)
response_buffer.write(resp_data)
# Try to assign a new auth token if one is given
self._auth = self.update_auth(response, response_buffer) or self._auth
response_buffer.seek(0)
return response, response_buffer

@ -1,404 +0,0 @@
# (c) 2016 Red Hat Inc.
# (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = """author: Ansible Networking Team
connection: netconf
short_description: Provides a persistent connection using the netconf protocol
description:
- This connection plugin provides a connection to remote devices over the SSH NETCONF
subsystem. This connection plugin is typically used by network devices for sending
and receiving RPC calls over NETCONF.
- Note this connection plugin requires ncclient to be installed on the local Ansible
controller.
requirements:
- ncclient
options:
host:
description:
- Specifies the remote device FQDN or IP address to establish the SSH connection
to.
default: inventory_hostname
vars:
- name: ansible_host
port:
type: int
description:
- Specifies the port on the remote device that listens for connections when establishing
the SSH connection.
default: 830
ini:
- section: defaults
key: remote_port
env:
- name: ANSIBLE_REMOTE_PORT
vars:
- name: ansible_port
network_os:
description:
- Configures the device platform network operating system. This value is used
to load a device specific netconf plugin. If this option is not configured
(or set to C(auto)), then Ansible will attempt to guess the correct network_os
to use. If it can not guess a network_os correctly it will use C(default).
vars:
- name: ansible_network_os
remote_user:
description:
- The username used to authenticate to the remote device when the SSH connection
is first established. If the remote_user is not specified, the connection will
use the username of the logged in user.
- Can be configured from the CLI via the C(--user) or C(-u) options.
ini:
- section: defaults
key: remote_user
env:
- name: ANSIBLE_REMOTE_USER
vars:
- name: ansible_user
password:
description:
- Configures the user password used to authenticate to the remote device when
first establishing the SSH connection.
vars:
- name: ansible_password
- name: ansible_ssh_pass
- name: ansible_ssh_password
- name: ansible_netconf_password
private_key_file:
description:
- The private SSH key or certificate file used to authenticate to the remote device
when first establishing the SSH connection.
ini:
- section: defaults
key: private_key_file
env:
- name: ANSIBLE_PRIVATE_KEY_FILE
vars:
- name: ansible_private_key_file
look_for_keys:
default: true
description:
- Enables looking for ssh keys in the usual locations for ssh keys (e.g. :file:`~/.ssh/id_*`).
env:
- name: ANSIBLE_PARAMIKO_LOOK_FOR_KEYS
ini:
- section: paramiko_connection
key: look_for_keys
type: boolean
host_key_checking:
description: Set this to "False" if you want to avoid host key checking by the
underlying tools Ansible uses to connect to the host
type: boolean
default: true
env:
- name: ANSIBLE_HOST_KEY_CHECKING
- name: ANSIBLE_SSH_HOST_KEY_CHECKING
- name: ANSIBLE_NETCONF_HOST_KEY_CHECKING
ini:
- section: defaults
key: host_key_checking
- section: paramiko_connection
key: host_key_checking
vars:
- name: ansible_host_key_checking
- name: ansible_ssh_host_key_checking
- name: ansible_netconf_host_key_checking
persistent_connect_timeout:
type: int
description:
- Configures, in seconds, the amount of time to wait when trying to initially
establish a persistent connection. If this value expires before the connection
to the remote device is completed, the connection will fail.
default: 30
ini:
- section: persistent_connection
key: connect_timeout
env:
- name: ANSIBLE_PERSISTENT_CONNECT_TIMEOUT
vars:
- name: ansible_connect_timeout
persistent_command_timeout:
type: int
description:
- Configures, in seconds, the amount of time to wait for a command to return from
the remote device. If this timer is exceeded before the command returns, the
connection plugin will raise an exception and close.
default: 30
ini:
- section: persistent_connection
key: command_timeout
env:
- name: ANSIBLE_PERSISTENT_COMMAND_TIMEOUT
vars:
- name: ansible_command_timeout
netconf_ssh_config:
description:
- This variable is used to enable bastion/jump host with netconf connection. If
set to True the bastion/jump host ssh settings should be present in ~/.ssh/config
file, alternatively it can be set to custom ssh configuration file path to read
the bastion/jump host settings.
ini:
- section: netconf_connection
key: ssh_config
version_added: '2.7'
env:
- name: ANSIBLE_NETCONF_SSH_CONFIG
vars:
- name: ansible_netconf_ssh_config
version_added: '2.7'
persistent_log_messages:
type: boolean
description:
- This flag will enable logging the command executed and response received from
target device in the ansible log file. For this option to work 'log_path' ansible
configuration option is required to be set to a file path with write access.
- Be sure to fully understand the security implications of enabling this option
as it could create a security vulnerability by logging sensitive information
in log file.
default: false
ini:
- section: persistent_connection
key: log_messages
env:
- name: ANSIBLE_PERSISTENT_LOG_MESSAGES
vars:
- name: ansible_persistent_log_messages
"""
import os
import logging
import json
from ansible.errors import AnsibleConnectionFailure, AnsibleError
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.parsing.convert_bool import (
BOOLEANS_TRUE,
BOOLEANS_FALSE,
)
from ansible.plugins.loader import netconf_loader
from ansible.plugins.connection import NetworkConnectionBase, ensure_connect
try:
from ncclient import manager
from ncclient.operations import RPCError
from ncclient.transport.errors import SSHUnknownHostError
from ncclient.xml_ import to_ele, to_xml
HAS_NCCLIENT = True
NCCLIENT_IMP_ERR = None
except (
ImportError,
AttributeError,
) as err: # paramiko and gssapi are incompatible and raise AttributeError not ImportError
HAS_NCCLIENT = False
NCCLIENT_IMP_ERR = err
logging.getLogger("ncclient").setLevel(logging.INFO)
class Connection(NetworkConnectionBase):
"""NetConf connections"""
transport = "ansible.netcommon.netconf"
has_pipelining = False
def __init__(self, play_context, new_stdin, *args, **kwargs):
super(Connection, self).__init__(
play_context, new_stdin, *args, **kwargs
)
# If network_os is not specified then set the network os to auto
# This will be used to trigger the use of guess_network_os when connecting.
self._network_os = self._network_os or "auto"
self.netconf = netconf_loader.get(self._network_os, self)
if self.netconf:
self._sub_plugin = {
"type": "netconf",
"name": self.netconf._load_name,
"obj": self.netconf,
}
self.queue_message(
"vvvv",
"loaded netconf plugin %s from path %s for network_os %s"
% (
self.netconf._load_name,
self.netconf._original_path,
self._network_os,
),
)
else:
self.netconf = netconf_loader.get("default", self)
self._sub_plugin = {
"type": "netconf",
"name": "default",
"obj": self.netconf,
}
self.queue_message(
"display",
"unable to load netconf plugin for network_os %s, falling back to default plugin"
% self._network_os,
)
self.queue_message("log", "network_os is set to %s" % self._network_os)
self._manager = None
self.key_filename = None
self._ssh_config = None
def exec_command(self, cmd, in_data=None, sudoable=True):
"""Sends the request to the node and returns the reply
The method accepts two forms of request. The first form is as a byte
string that represents xml string be send over netconf session.
The second form is a json-rpc (2.0) byte string.
"""
if self._manager:
# to_ele operates on native strings
request = to_ele(to_native(cmd, errors="surrogate_or_strict"))
if request is None:
return "unable to parse request"
try:
reply = self._manager.rpc(request)
except RPCError as exc:
error = self.internal_error(
data=to_text(to_xml(exc.xml), errors="surrogate_or_strict")
)
return json.dumps(error)
return reply.data_xml
else:
return super(Connection, self).exec_command(cmd, in_data, sudoable)
@property
@ensure_connect
def manager(self):
return self._manager
def _connect(self):
if not HAS_NCCLIENT:
raise AnsibleError(
"%s: %s"
% (
missing_required_lib("ncclient"),
to_native(NCCLIENT_IMP_ERR),
)
)
self.queue_message("log", "ssh connection done, starting ncclient")
allow_agent = True
if self._play_context.password is not None:
allow_agent = False
setattr(self._play_context, "allow_agent", allow_agent)
self.key_filename = (
self._play_context.private_key_file
or self.get_option("private_key_file")
)
if self.key_filename:
self.key_filename = str(os.path.expanduser(self.key_filename))
self._ssh_config = self.get_option("netconf_ssh_config")
if self._ssh_config in BOOLEANS_TRUE:
self._ssh_config = True
elif self._ssh_config in BOOLEANS_FALSE:
self._ssh_config = None
# Try to guess the network_os if the network_os is set to auto
if self._network_os == "auto":
for cls in netconf_loader.all(class_only=True):
network_os = cls.guess_network_os(self)
if network_os:
self.queue_message(
"vvv", "discovered network_os %s" % network_os
)
self._network_os = network_os
# If we have tried to detect the network_os but were unable to i.e. network_os is still 'auto'
# then use default as the network_os
if self._network_os == "auto":
# Network os not discovered. Set it to default
self.queue_message(
"vvv",
"Unable to discover network_os. Falling back to default.",
)
self._network_os = "default"
try:
ncclient_device_handler = self.netconf.get_option(
"ncclient_device_handler"
)
except KeyError:
ncclient_device_handler = "default"
self.queue_message(
"vvv",
"identified ncclient device handler: %s."
% ncclient_device_handler,
)
device_params = {"name": ncclient_device_handler}
try:
port = self._play_context.port or 830
self.queue_message(
"vvv",
"ESTABLISH NETCONF SSH CONNECTION FOR USER: %s on PORT %s TO %s WITH SSH_CONFIG = %s"
% (
self._play_context.remote_user,
port,
self._play_context.remote_addr,
self._ssh_config,
),
)
self._manager = manager.connect(
host=self._play_context.remote_addr,
port=port,
username=self._play_context.remote_user,
password=self._play_context.password,
key_filename=self.key_filename,
hostkey_verify=self.get_option("host_key_checking"),
look_for_keys=self.get_option("look_for_keys"),
device_params=device_params,
allow_agent=self._play_context.allow_agent,
timeout=self.get_option("persistent_connect_timeout"),
ssh_config=self._ssh_config,
)
self._manager._timeout = self.get_option(
"persistent_command_timeout"
)
except SSHUnknownHostError as exc:
raise AnsibleConnectionFailure(to_native(exc))
except ImportError:
raise AnsibleError(
"connection=netconf is not supported on {0}".format(
self._network_os
)
)
if not self._manager.connected:
return 1, b"", b"not connected"
self.queue_message(
"log", "ncclient manager object created successfully"
)
self._connected = True
super(Connection, self)._connect()
return (
0,
to_bytes(self._manager.session_id, errors="surrogate_or_strict"),
b"",
)
def close(self):
if self._manager:
self._manager.close_session()
super(Connection, self).close()

@ -1,66 +0,0 @@
# -*- coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
class ModuleDocFragment(object):
# Standard files documentation fragment
DOCUMENTATION = r"""options:
host:
description:
- Specifies the DNS host name or address for connecting to the remote device over
the specified transport. The value of host is used as the destination address
for the transport.
type: str
required: true
port:
description:
- Specifies the port to use when building the connection to the remote device. The
port value will default to port 830.
type: int
default: 830
username:
description:
- Configures the username to use to authenticate the connection to the remote
device. This value is used to authenticate the SSH session. If the value is
not specified in the task, the value of environment variable C(ANSIBLE_NET_USERNAME)
will be used instead.
type: str
password:
description:
- Specifies the password to use to authenticate the connection to the remote device. This
value is used to authenticate the SSH session. If the value is not specified
in the task, the value of environment variable C(ANSIBLE_NET_PASSWORD) will
be used instead.
type: str
timeout:
description:
- Specifies the timeout in seconds for communicating with the network device for
either connecting or sending commands. If the timeout is exceeded before the
operation is completed, the module will error.
type: int
default: 10
ssh_keyfile:
description:
- Specifies the SSH key to use to authenticate the connection to the remote device. This
value is the path to the key used to authenticate the SSH session. If the value
is not specified in the task, the value of environment variable C(ANSIBLE_NET_SSH_KEYFILE)
will be used instead.
type: path
hostkey_verify:
description:
- If set to C(yes), the ssh host key of the device must match a ssh key present
on the host if set to C(no), the ssh host key of the device is not checked.
type: bool
default: true
look_for_keys:
description:
- Enables looking in the usual locations for the ssh keys (e.g. :file:`~/.ssh/id_*`)
type: bool
default: true
notes:
- For information on using netconf see the :ref:`Platform Options guide using Netconf<netconf_enabled_platform_options>`
- For more information on using Ansible to manage network devices see the :ref:`Ansible
Network Guide <network_guide>`
"""

@ -1,14 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2019 Ansible, Inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
class ModuleDocFragment(object):
# Standard files documentation fragment
DOCUMENTATION = r"""options: {}
notes:
- This module is supported on C(ansible_network_os) network platforms. See the :ref:`Network
Platform Options <platform_options>` for details.
"""

@ -1,531 +0,0 @@
#
# {c) 2017 Red Hat, Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import re
import os
import traceback
import string
from collections.abc import Mapping
from xml.etree.ElementTree import fromstring
from ansible.module_utils._text import to_native, to_text
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
Template,
)
from ansible.module_utils.six import iteritems, string_types
from ansible.errors import AnsibleError, AnsibleFilterError
from ansible.utils.display import Display
from ansible.utils.encrypt import passlib_or_crypt, random_password
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
try:
import textfsm
HAS_TEXTFSM = True
except ImportError:
HAS_TEXTFSM = False
display = Display()
def re_matchall(regex, value):
objects = list()
for match in re.findall(regex.pattern, value, re.M):
obj = {}
if regex.groupindex:
for name, index in iteritems(regex.groupindex):
if len(regex.groupindex) == 1:
obj[name] = match
else:
obj[name] = match[index - 1]
objects.append(obj)
return objects
def re_search(regex, value):
obj = {}
match = regex.search(value, re.M)
if match:
items = list(match.groups())
if regex.groupindex:
for name, index in iteritems(regex.groupindex):
obj[name] = items[index - 1]
return obj
def parse_cli(output, tmpl):
if not isinstance(output, string_types):
raise AnsibleError(
"parse_cli input should be a string, but was given a input of %s"
% (type(output))
)
if not os.path.exists(tmpl):
raise AnsibleError("unable to locate parse_cli template: %s" % tmpl)
try:
template = Template()
except ImportError as exc:
raise AnsibleError(to_native(exc))
with open(tmpl) as tmpl_fh:
tmpl_content = tmpl_fh.read()
spec = yaml.safe_load(tmpl_content)
obj = {}
for name, attrs in iteritems(spec["keys"]):
value = attrs["value"]
try:
variables = spec.get("vars", {})
value = template(value, variables)
except Exception:
pass
if "start_block" in attrs and "end_block" in attrs:
start_block = re.compile(attrs["start_block"])
end_block = re.compile(attrs["end_block"])
blocks = list()
lines = None
block_started = False
for line in output.split("\n"):
match_start = start_block.match(line)
match_end = end_block.match(line)
if match_start:
lines = list()
lines.append(line)
block_started = True
elif match_end:
if lines:
lines.append(line)
blocks.append("\n".join(lines))
block_started = False
elif block_started:
if lines:
lines.append(line)
regex_items = [re.compile(r) for r in attrs["items"]]
objects = list()
for block in blocks:
if isinstance(value, Mapping) and "key" not in value:
items = list()
for regex in regex_items:
match = regex.search(block)
if match:
item_values = match.groupdict()
item_values["match"] = list(match.groups())
items.append(item_values)
else:
items.append(None)
obj = {}
for k, v in iteritems(value):
try:
obj[k] = template(
v, {"item": items}, fail_on_undefined=False
)
except Exception:
obj[k] = None
objects.append(obj)
elif isinstance(value, Mapping):
items = list()
for regex in regex_items:
match = regex.search(block)
if match:
item_values = match.groupdict()
item_values["match"] = list(match.groups())
items.append(item_values)
else:
items.append(None)
key = template(value["key"], {"item": items})
values = dict(
[
(k, template(v, {"item": items}))
for k, v in iteritems(value["values"])
]
)
objects.append({key: values})
return objects
elif "items" in attrs:
regexp = re.compile(attrs["items"])
when = attrs.get("when")
conditional = (
"{%% if %s %%}True{%% else %%}False{%% endif %%}" % when
)
if isinstance(value, Mapping) and "key" not in value:
values = list()
for item in re_matchall(regexp, output):
entry = {}
for item_key, item_value in iteritems(value):
entry[item_key] = template(item_value, {"item": item})
if when:
if template(conditional, {"item": entry}):
values.append(entry)
else:
values.append(entry)
obj[name] = values
elif isinstance(value, Mapping):
values = dict()
for item in re_matchall(regexp, output):
entry = {}
for item_key, item_value in iteritems(value["values"]):
entry[item_key] = template(item_value, {"item": item})
key = template(value["key"], {"item": item})
if when:
if template(
conditional, {"item": {"key": key, "value": entry}}
):
values[key] = entry
else:
values[key] = entry
obj[name] = values
else:
item = re_search(regexp, output)
obj[name] = template(value, {"item": item})
else:
obj[name] = value
return obj
def parse_cli_textfsm(value, template):
if not HAS_TEXTFSM:
raise AnsibleError(
"parse_cli_textfsm filter requires TextFSM library to be installed"
)
if not isinstance(value, string_types):
raise AnsibleError(
"parse_cli_textfsm input should be a string, but was given a input of %s"
% (type(value))
)
if not os.path.exists(template):
raise AnsibleError(
"unable to locate parse_cli_textfsm template: %s" % template
)
try:
template = open(template)
except IOError as exc:
raise AnsibleError(to_native(exc))
re_table = textfsm.TextFSM(template)
fsm_results = re_table.ParseText(value)
results = list()
for item in fsm_results:
results.append(dict(zip(re_table.header, item)))
return results
def _extract_param(template, root, attrs, value):
key = None
when = attrs.get("when")
conditional = "{%% if %s %%}True{%% else %%}False{%% endif %%}" % when
param_to_xpath_map = attrs["items"]
if isinstance(value, Mapping):
key = value.get("key", None)
if key:
value = value["values"]
entries = dict() if key else list()
for element in root.findall(attrs["top"]):
entry = dict()
item_dict = dict()
for param, param_xpath in iteritems(param_to_xpath_map):
fields = None
try:
fields = element.findall(param_xpath)
except Exception:
display.warning(
"Failed to evaluate value of '%s' with XPath '%s'.\nUnexpected error: %s."
% (param, param_xpath, traceback.format_exc())
)
tags = param_xpath.split("/")
# check if xpath ends with attribute.
# If yes set attribute key/value dict to param value in case attribute matches
# else if it is a normal xpath assign matched element text value.
if len(tags) and tags[-1].endswith("]"):
if fields:
if len(fields) > 1:
item_dict[param] = [field.attrib for field in fields]
else:
item_dict[param] = fields[0].attrib
else:
item_dict[param] = {}
else:
if fields:
if len(fields) > 1:
item_dict[param] = [field.text for field in fields]
else:
item_dict[param] = fields[0].text
else:
item_dict[param] = None
if isinstance(value, Mapping):
for item_key, item_value in iteritems(value):
entry[item_key] = template(item_value, {"item": item_dict})
else:
entry = template(value, {"item": item_dict})
if key:
expanded_key = template(key, {"item": item_dict})
if when:
if template(
conditional,
{"item": {"key": expanded_key, "value": entry}},
):
entries[expanded_key] = entry
else:
entries[expanded_key] = entry
else:
if when:
if template(conditional, {"item": entry}):
entries.append(entry)
else:
entries.append(entry)
return entries
def parse_xml(output, tmpl):
if not os.path.exists(tmpl):
raise AnsibleError("unable to locate parse_xml template: %s" % tmpl)
if not isinstance(output, string_types):
raise AnsibleError(
"parse_xml works on string input, but given input of : %s"
% type(output)
)
root = fromstring(output)
try:
template = Template()
except ImportError as exc:
raise AnsibleError(to_native(exc))
with open(tmpl) as tmpl_fh:
tmpl_content = tmpl_fh.read()
spec = yaml.safe_load(tmpl_content)
obj = {}
for name, attrs in iteritems(spec["keys"]):
value = attrs["value"]
try:
variables = spec.get("vars", {})
value = template(value, variables)
except Exception:
pass
if "items" in attrs:
obj[name] = _extract_param(template, root, attrs, value)
else:
obj[name] = value
return obj
def type5_pw(password, salt=None):
if not isinstance(password, string_types):
raise AnsibleFilterError(
"type5_pw password input should be a string, but was given a input of %s"
% (type(password).__name__)
)
salt_chars = u"".join(
(to_text(string.ascii_letters), to_text(string.digits), u"./")
)
if salt is not None and not isinstance(salt, string_types):
raise AnsibleFilterError(
"type5_pw salt input should be a string, but was given a input of %s"
% (type(salt).__name__)
)
elif not salt:
salt = random_password(length=4, chars=salt_chars)
elif not set(salt) <= set(salt_chars):
raise AnsibleFilterError(
"type5_pw salt used inproper characters, must be one of %s"
% (salt_chars)
)
encrypted_password = passlib_or_crypt(password, "md5_crypt", salt=salt)
return encrypted_password
def hash_salt(password):
split_password = password.split("$")
if len(split_password) != 4:
raise AnsibleFilterError(
"Could not parse salt out password correctly from {0}".format(
password
)
)
else:
return split_password[2]
def comp_type5(
unencrypted_password, encrypted_password, return_original=False
):
salt = hash_salt(encrypted_password)
if type5_pw(unencrypted_password, salt) == encrypted_password:
if return_original is True:
return encrypted_password
else:
return True
return False
def vlan_parser(vlan_list, first_line_len=48, other_line_len=44):
"""
Input: Unsorted list of vlan integers
Output: Sorted string list of integers according to IOS-like vlan list rules
1. Vlans are listed in ascending order
2. Runs of 3 or more consecutive vlans are listed with a dash
3. The first line of the list can be first_line_len characters long
4. Subsequent list lines can be other_line_len characters
"""
# Sort and remove duplicates
sorted_list = sorted(set(vlan_list))
if sorted_list[0] < 1 or sorted_list[-1] > 4094:
raise AnsibleFilterError("Valid VLAN range is 1-4094")
parse_list = []
idx = 0
while idx < len(sorted_list):
start = idx
end = start
while end < len(sorted_list) - 1:
if sorted_list[end + 1] - sorted_list[end] == 1:
end += 1
else:
break
if start == end:
# Single VLAN
parse_list.append(str(sorted_list[idx]))
elif start + 1 == end:
# Run of 2 VLANs
parse_list.append(str(sorted_list[start]))
parse_list.append(str(sorted_list[end]))
else:
# Run of 3 or more VLANs
parse_list.append(
str(sorted_list[start]) + "-" + str(sorted_list[end])
)
idx = end + 1
line_count = 0
result = [""]
for vlans in parse_list:
# First line (" switchport trunk allowed vlan ")
if line_count == 0:
if len(result[line_count] + vlans) > first_line_len:
result.append("")
line_count += 1
result[line_count] += vlans + ","
else:
result[line_count] += vlans + ","
# Subsequent lines (" switchport trunk allowed vlan add ")
else:
if len(result[line_count] + vlans) > other_line_len:
result.append("")
line_count += 1
result[line_count] += vlans + ","
else:
result[line_count] += vlans + ","
# Remove trailing orphan commas
for idx in range(0, len(result)):
result[idx] = result[idx].rstrip(",")
# Sometimes text wraps to next line, but there are no remaining VLANs
if "" in result:
result.remove("")
return result
class FilterModule(object):
"""Filters for working with output from network devices"""
filter_map = {
"parse_cli": parse_cli,
"parse_cli_textfsm": parse_cli_textfsm,
"parse_xml": parse_xml,
"type5_pw": type5_pw,
"hash_salt": hash_salt,
"comp_type5": comp_type5,
"vlan_parser": vlan_parser,
}
def filters(self):
return self.filter_map

@ -1,91 +0,0 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = """author: Ansible Networking Team
httpapi: restconf
short_description: HttpApi Plugin for devices supporting Restconf API
description:
- This HttpApi plugin provides methods to connect to Restconf API endpoints.
options:
root_path:
type: str
description:
- Specifies the location of the Restconf root.
default: /restconf
vars:
- name: ansible_httpapi_restconf_root
"""
import json
from ansible.module_utils._text import to_text
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.plugins.httpapi import HttpApiBase
CONTENT_TYPE = "application/yang-data+json"
class HttpApi(HttpApiBase):
def send_request(self, data, **message_kwargs):
if data:
data = json.dumps(data)
path = "/".join(
[
self.get_option("root_path").rstrip("/"),
message_kwargs.get("path", "").lstrip("/"),
]
)
headers = {
"Content-Type": message_kwargs.get("content_type") or CONTENT_TYPE,
"Accept": message_kwargs.get("accept") or CONTENT_TYPE,
}
response, response_data = self.connection.send(
path, data, headers=headers, method=message_kwargs.get("method")
)
return handle_response(response, response_data)
def handle_response(response, response_data):
try:
response_data = json.loads(response_data.read())
except ValueError:
response_data = response_data.read()
if isinstance(response, HTTPError):
if response_data:
if "errors" in response_data:
errors = response_data["errors"]["error"]
error_text = "\n".join(
(error["error-message"] for error in errors)
)
else:
error_text = response_data
raise ConnectionError(error_text, code=response.code)
raise ConnectionError(to_text(response), code=response.code)
return response_data

@ -1,147 +0,0 @@
#
# (c) 2018 Red Hat, Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import json
from copy import deepcopy
from contextlib import contextmanager
try:
from lxml.etree import fromstring, tostring
except ImportError:
from xml.etree.ElementTree import fromstring, tostring
from ansible.module_utils._text import to_text, to_bytes
from ansible.module_utils.connection import Connection, ConnectionError
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.netconf import (
NetconfConnection,
)
IGNORE_XML_ATTRIBUTE = ()
def get_connection(module):
if hasattr(module, "_netconf_connection"):
return module._netconf_connection
capabilities = get_capabilities(module)
network_api = capabilities.get("network_api")
if network_api == "netconf":
module._netconf_connection = NetconfConnection(module._socket_path)
else:
module.fail_json(msg="Invalid connection type %s" % network_api)
return module._netconf_connection
def get_capabilities(module):
if hasattr(module, "_netconf_capabilities"):
return module._netconf_capabilities
capabilities = Connection(module._socket_path).get_capabilities()
module._netconf_capabilities = json.loads(capabilities)
return module._netconf_capabilities
def lock_configuration(module, target=None):
conn = get_connection(module)
return conn.lock(target=target)
def unlock_configuration(module, target=None):
conn = get_connection(module)
return conn.unlock(target=target)
@contextmanager
def locked_config(module, target=None):
try:
lock_configuration(module, target=target)
yield
finally:
unlock_configuration(module, target=target)
def get_config(module, source, filter=None, lock=False):
conn = get_connection(module)
try:
locked = False
if lock:
conn.lock(target=source)
locked = True
response = conn.get_config(source=source, filter=filter)
except ConnectionError as e:
module.fail_json(
msg=to_text(e, errors="surrogate_then_replace").strip()
)
finally:
if locked:
conn.unlock(target=source)
return response
def get(module, filter, lock=False):
conn = get_connection(module)
try:
locked = False
if lock:
conn.lock(target="running")
locked = True
response = conn.get(filter=filter)
except ConnectionError as e:
module.fail_json(
msg=to_text(e, errors="surrogate_then_replace").strip()
)
finally:
if locked:
conn.unlock(target="running")
return response
def dispatch(module, request):
conn = get_connection(module)
try:
response = conn.dispatch(request)
except ConnectionError as e:
module.fail_json(
msg=to_text(e, errors="surrogate_then_replace").strip()
)
return response
def sanitize_xml(data):
tree = fromstring(
to_bytes(deepcopy(data), errors="surrogate_then_replace")
)
for element in tree.getiterator():
# remove attributes
attribute = element.attrib
if attribute:
for key in list(attribute):
if key not in IGNORE_XML_ATTRIBUTE:
attribute.pop(key)
return to_text(tostring(tree), errors="surrogate_then_replace").strip()

@ -1,61 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# (c) 2018 Red Hat Inc.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
from ansible.module_utils.connection import Connection
def get(module, path=None, content=None, fields=None, output="json"):
if path is None:
raise ValueError("path value must be provided")
if content:
path += "?" + "content=%s" % content
if fields:
path += "?" + "field=%s" % fields
accept = None
if output == "xml":
accept = "application/yang-data+xml"
connection = Connection(module._socket_path)
return connection.send_request(
None, path=path, method="GET", accept=accept
)
def edit_config(module, path=None, content=None, method="GET", format="json"):
if path is None:
raise ValueError("path value must be provided")
content_type = None
if format == "xml":
content_type = "application/yang-data+xml"
connection = Connection(module._socket_path)
return connection.send_request(
content, path=path, method=method, content_type=content_type
)

@ -1,71 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# (c) 2018, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {
"metadata_version": "1.1",
"status": ["preview"],
"supported_by": "network",
}
DOCUMENTATION = """module: net_get
author: Deepak Agrawal (@dagrawal)
short_description: Copy a file from a network device to Ansible Controller
description:
- This module provides functionality to copy file from network device to ansible controller.
extends_documentation_fragment:
- ansible.netcommon.network_agnostic
options:
src:
description:
- Specifies the source file. The path to the source file can either be the full
path on the network device or a relative path as per path supported by destination
network device.
required: true
protocol:
description:
- Protocol used to transfer file.
default: scp
choices:
- scp
- sftp
dest:
description:
- Specifies the destination file. The path to the destination file can either
be the full path on the Ansible control host or a relative path from the playbook
or role root directory.
default:
- Same filename as specified in I(src). The path will be playbook root or role
root directory if playbook is part of a role.
requirements:
- scp
notes:
- Some devices need specific configurations to be enabled before scp can work These
configuration should be pre-configured before using this module e.g ios - C(ip scp
server enable).
- User privilege to do scp on network device should be pre-configured e.g. ios - need
user privilege 15 by default for allowing scp.
- Default destination of source file.
"""
EXAMPLES = """
- name: copy file from the network device to Ansible controller
net_get:
src: running_cfg_ios1.txt
- name: copy file from ios to common location at /tmp
net_get:
src: running_cfg_sw1.txt
dest : /tmp/ios1.txt
"""
RETURN = """
"""

@ -1,82 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# (c) 2018, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {
"metadata_version": "1.1",
"status": ["preview"],
"supported_by": "network",
}
DOCUMENTATION = """module: net_put
author: Deepak Agrawal (@dagrawal)
short_description: Copy a file from Ansible Controller to a network device
description:
- This module provides functionality to copy file from Ansible controller to network
devices.
extends_documentation_fragment:
- ansible.netcommon.network_agnostic
options:
src:
description:
- Specifies the source file. The path to the source file can either be the full
path on the Ansible control host or a relative path from the playbook or role
root directory.
required: true
protocol:
description:
- Protocol used to transfer file.
default: scp
choices:
- scp
- sftp
dest:
description:
- Specifies the destination file. The path to destination file can either be the
full path or relative path as supported by network_os.
default:
- Filename from src and at default directory of user shell on network_os.
required: false
mode:
description:
- Set the file transfer mode. If mode is set to I(text) then I(src) file will
go through Jinja2 template engine to replace any vars if present in the src
file. If mode is set to I(binary) then file will be copied as it is to destination
device.
default: binary
choices:
- binary
- text
requirements:
- scp
notes:
- Some devices need specific configurations to be enabled before scp can work These
configuration should be pre-configured before using this module e.g ios - C(ip scp
server enable).
- User privilege to do scp on network device should be pre-configured e.g. ios - need
user privilege 15 by default for allowing scp.
- Default destination of source file.
"""
EXAMPLES = """
- name: copy file from ansible controller to a network device
net_put:
src: running_cfg_ios1.txt
- name: copy file at root dir of flash in slot 3 of sw1(ios)
net_put:
src: running_cfg_sw1.txt
protocol: sftp
dest : flash3:/running_cfg_sw1.txt
"""
RETURN = """
"""

@ -1,70 +0,0 @@
#
# (c) 2017 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = """author: Ansible Networking Team
netconf: default
short_description: Use default netconf plugin to run standard netconf commands as
per RFC
description:
- This default plugin provides low level abstraction apis for sending and receiving
netconf commands as per Netconf RFC specification.
options:
ncclient_device_handler:
type: str
default: default
description:
- Specifies the ncclient device handler name for network os that support default
netconf implementation as per Netconf RFC specification. To identify the ncclient
device handler name refer ncclient library documentation.
"""
import json
from ansible.module_utils._text import to_text
from ansible.plugins.netconf import NetconfBase
class Netconf(NetconfBase):
def get_text(self, ele, tag):
try:
return to_text(
ele.find(tag).text, errors="surrogate_then_replace"
).strip()
except AttributeError:
pass
def get_device_info(self):
device_info = dict()
device_info["network_os"] = "default"
return device_info
def get_capabilities(self):
result = dict()
result["rpc"] = self.get_base_rpc()
result["network_api"] = "netconf"
result["device_info"] = self.get_device_info()
result["server_capabilities"] = [c for c in self.m.server_capabilities]
result["client_capabilities"] = [c for c in self.m.client_capabilities]
result["session_id"] = self.m.session_id
result["device_operations"] = self.get_device_operations(
result["server_capabilities"]
)
return json.dumps(result)
Loading…
Cancel
Save