pull/58597/head
Chris Archibald 5 years ago
parent 826f224f02
commit 84848f8291

@ -1,350 +1,460 @@
#!/usr/bin/python
# (c) 2018-2019, NetApp, Inc
# (c) 2019, NetApp, Inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'certified'}
'supported_by': 'community'}
DOCUMENTATION = '''
module: na_ontap_firewall_policy
short_description: NetApp ONTAP Manage a firewall policy
version_added: '2.7'
author: NetApp Ansible Team (@carchi8py) <ng-ansibleteam@netapp.com>
description:
- Configure firewall on an ONTAP node and manage firewall policy for an ONTAP SVM
- Update ONTAP service-prosessor firmware
extends_documentation_fragment:
- netapp.na_ontap
requirements:
- Python package ipaddress. Install using 'pip install ipaddress'
module: na_ontap_firmware_upgrade
options:
state:
description:
- Whether to set up a firewall policy or not
choices: ['present', 'absent']
- Whether the specified ONTAP firmware should be upgraded or not.
default: present
allow_list:
type: str
node:
description:
- A list of IPs and masks to use.
- The host bits of the IP addresses used in this list must be set to 0.
policy:
- Node on which the device is located.
type: str
required: true
clear_logs:
description:
- A policy name for the firewall policy
service:
- Clear logs on the device after update. Default value is true
type: bool
default: true
package:
description:
- The service to apply the policy to
choices: ['dns', 'http', 'https', 'ndmp', 'ndmps', 'ntp', 'rsh', 'snmp', 'ssh', 'telnet']
vserver:
- Name of the package file containing the firmware to be installed. Not required when -baseline is true.
type: str
shelf_module_fw:
description:
- The Vserver to apply the policy to.
enable:
- Shelf module firmware to be updated to.
type: str
disk_fw:
description:
- enable firewall on a node
choices: ['enable', 'disable']
logging:
- disk firmware to be updated to.
type: str
update_type:
description:
- enable logging for firewall on a node
choices: ['enable', 'disable']
node:
- Type of firmware update to be performed. Options include serial_full, serial_differential, network_full.
type: str
install_baseline_image:
description:
- The node to run the firewall configuration on
- Install the version packaged with ONTAP if this parameter is set to true. Otherwise, package must be used to specify the package to install.
type: bool
default: false
firmware_type:
description:
- Type of firmware to be upgraded. Options include shelf, ACP, service-processor, and disk.
- For shelf firmware upgrade the operation is asynchronous, and therefore returns no errors that might occur during the download process.
- Shelf firmware upgrade is idempotent if shelf_module_fw is provided .
- disk firmware upgrade is idempotent if disk_fw is provided .
- With check mode, SP, ACP, disk, and shelf firmware upgrade is not idempotent.
- This operation will only update firmware on shelves/disk that do not have the latest firmware-revision.
choices: ['service-processor', 'shelf', 'acp', 'disk']
type: str
short_description: NetApp ONTAP firmware upgrade for SP, shelf, ACP, and disk.
version_added: "2.9"
'''
EXAMPLES = """
- name: create firewall Policy
na_ontap_firewall_policy:
- name: SP firmware upgrade
na_ontap_firmware_upgrade:
state: present
allow_list: [1.2.3.0/24,1.3.0.0/16]
policy: pizza
service: http
vserver: ci_dev
hostname: "{{ netapp hostname }}"
username: "{{ netapp username }}"
password: "{{ netapp password }}"
- name: Modify firewall Policy
na_ontap_firewall_policy:
node: vsim1
package: "{{ file name }}"
clear_logs: True
install_baseline_image: False
update_type: serial_full
firmware_type: service-processor
hostname: "{{ netapp_hostname }}"
username: "{{ netapp_username }}"
password: "{{ netapp_password }}"
- name: ACP firmware upgrade
na_ontap_firmware_upgrade:
state: present
allow_list: [1.5.3.0/24]
policy: pizza
service: http
vserver: ci_dev
hostname: "{{ netapp hostname }}"
username: "{{ netapp username }}"
password: "{{ netapp password }}"
- name: Destory firewall Policy
na_ontap_firewall_policy:
state: absent
policy: pizza
service: http
vserver: ci_dev
hostname: "{{ netapp hostname }}"
username: "{{ netapp username }}"
password: "{{ netapp password }}"
- name: Enable firewall and logging on a node
na_ontap_firewall_policy:
node: test-vsim1
enable: enable
logging: enable
hostname: "{{ netapp hostname }}"
username: "{{ netapp username }}"
password: "{{ netapp password }}"
node: vsim1
firmware_type: acp
hostname: "{{ netapp_hostname }}"
username: "{{ netapp_username }}"
password: "{{ netapp_password }}"
- name: shelf firmware upgrade
na_ontap_firmware_upgrade:
state: present
firmware_type: shelf
shelf_module_fw: 1221
hostname: "{{ netapp_hostname }}"
username: "{{ netapp_username }}"
password: "{{ netapp_password }}"
- name: disk firmware upgrade
na_ontap_firmware_upgrade:
state: present
firmware_type: disk
disk_fw: NA02
hostname: "{{ netapp_hostname }}"
username: "{{ netapp_username }}"
password: "{{ netapp_password }}"
"""
RETURN = """
"""
import traceback
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native
import ansible.module_utils.netapp as netapp_utils
from ansible.module_utils.netapp_module import NetAppModule
try:
import ipaddress
HAS_IPADDRESS_LIB = True
except ImportError:
HAS_IPADDRESS_LIB = False
import sys
import time
HAS_NETAPP_LIB = netapp_utils.has_netapp_lib()
class NetAppONTAPFirewallPolicy(object):
class NetAppONTAPFirmwareUpgrade(object):
"""
Class with ONTAP firmware upgrade methods
"""
def __init__(self):
self.argument_spec = netapp_utils.na_ontap_host_argument_spec()
self.argument_spec.update(dict(
state=dict(required=False, choices=['present', 'absent'], default='present'),
allow_list=dict(required=False, type="list"),
policy=dict(required=False, type='str'),
service=dict(required=False, type='str', choices=['dns', 'http', 'https', 'ndmp',
'ndmps', 'ntp', 'rsh', 'snmp', 'ssh', 'telnet']),
vserver=dict(required=False, type="str"),
enable=dict(required=False, type="str", choices=['enable', 'disable']),
logging=dict(required=False, type="str", choices=['enable', 'disable']),
node=dict(required=False, type="str")
state=dict(required=False, type='str', default='present'),
node=dict(required=False, type='str'),
firmware_type=dict(required=True, type='str', choices=['service-processor', 'shelf', 'acp', 'disk']),
clear_logs=dict(required=False, type='bool', default=True),
package=dict(required=False, type='str'),
install_baseline_image=dict(required=False, type='bool', default=False),
update_type=dict(required=False, type='str'),
shelf_module_fw=dict(required=False, type='str'),
disk_fw=dict(required=False, type='str')
))
self.module = AnsibleModule(
argument_spec=self.argument_spec,
required_together=(['policy', 'service', 'vserver'],
['enable', 'node']
),
required_if=[
('firmware_type', 'acp', ['node']),
('firmware_type', 'disk', ['node']),
('firmware_type', 'service-processor', ['node', 'update_type']),
],
supports_check_mode=True
)
self.na_helper = NetAppModule()
self.parameters = self.na_helper.set_parameters(self.module.params)
if self.parameters.get('firmware_type') == 'service-processor':
if self.parameters.get('install_baseline_image') and self.parameters.get('package') is not None:
self.module.fail_json(msg='Do not specify both package and install_baseline_image: true')
if not self.parameters.get('package') and self.parameters.get('install_baseline_image') == 'False':
self.module.fail_json(msg='Specify at least one of package or install_baseline_image')
if HAS_NETAPP_LIB is False:
self.module.fail_json(msg="the python NetApp-Lib module is required")
else:
self.server = netapp_utils.setup_na_ontap_zapi(module=self.module)
if HAS_IPADDRESS_LIB is False:
self.module.fail_json(msg="the python ipaddress lib is required for this module")
return
def validate_ip_addresses(self):
'''
Validate if the given IP address is a network address (i.e. it's host bits are set to 0)
ONTAP doesn't validate if the host bits are set,
and hence doesn't add a new address unless the IP is from a different network.
So this validation allows the module to be idempotent.
:return: None
'''
for ip in self.parameters['allow_list']:
# create an IPv4 object for current IP address
if sys.version_info[0] >= 3:
ip_addr = str(ip)
else:
ip_addr = unicode(ip) # pylint: disable=undefined-variable
# get network address from netmask, throw exception if address is not a network address
try:
ipaddress.ip_network(ip_addr)
except ValueError as exc:
self.module.fail_json(msg='Error: Invalid IP address value for allow_list parameter.'
'Please specify a network address without host bits set: %s'
% (to_native(exc)))
def get_firewall_policy(self):
def firmware_image_get_iter(self):
"""
Get a firewall policy
:return: returns a firewall policy object, or returns False if there are none
Compose NaElement object to query current firmware version
:return: NaElement object for firmware_image_get_iter with query
"""
net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-get-iter")
attributes = {
'query': {
'net-firewall-policy-info': self.firewall_policy_attributes()
}
}
net_firewall_policy_obj.translate_struct(attributes)
firmware_image_get = netapp_utils.zapi.NaElement('service-processor-get-iter')
query = netapp_utils.zapi.NaElement('query')
firmware_image_info = netapp_utils.zapi.NaElement('service-processor-info')
firmware_image_info.add_new_child('node', self.parameters['node'])
query.add_child_elem(firmware_image_info)
firmware_image_get.add_child_elem(query)
return firmware_image_get
def firmware_image_get(self, node_name):
"""
Get current firmware image info
:return: True if query successful, else return None
"""
firmware_image_get_iter = self.firmware_image_get_iter()
try:
result = self.server.invoke_successfully(net_firewall_policy_obj, True)
result = self.server.invoke_successfully(firmware_image_get_iter, enable_tunneling=True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg="Error getting firewall policy %s:%s" % (self.parameters['policy'],
to_native(error)),
self.module.fail_json(msg='Error fetching firmware image details: %s: %s'
% (self.parameters['node'], to_native(error)),
exception=traceback.format_exc())
if result.get_child_by_name('num-records') and int(result.get_child_content('num-records')) >= 1:
attributes_list = result.get_child_by_name('attributes-list')
policy_info = attributes_list.get_child_by_name('net-firewall-policy-info')
ips = self.na_helper.get_value_for_list(from_zapi=True,
zapi_parent=policy_info.get_child_by_name('allow-list'))
return {
'service': policy_info['service'],
'allow_list': ips}
# return firmware image details
if result.get_child_by_name('num-records') and int(result.get_child_content('num-records')) > 0:
sp_info = result.get_child_by_name('attributes-list').get_child_by_name('service-processor-info')
firmware_version = sp_info.get_child_content('firmware-version')
return firmware_version
return None
def create_firewall_policy(self):
def acp_firmware_required_get(self):
"""
Create a firewall policy for given vserver
:return: None
where acp firmware upgrade is required
:return: True is firmware upgrade is required else return None
"""
net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-create")
net_firewall_policy_obj.translate_struct(self.firewall_policy_attributes())
if self.parameters.get('allow_list'):
self.validate_ip_addresses()
net_firewall_policy_obj.add_child_elem(self.na_helper.get_value_for_list(from_zapi=False,
zapi_parent='allow-list',
zapi_child='ip-and-mask',
data=self.parameters['allow_list'])
)
acp_firmware_get_iter = netapp_utils.zapi.NaElement('storage-shelf-acp-module-get-iter')
query = netapp_utils.zapi.NaElement('query')
acp_info = netapp_utils.zapi.NaElement('storage-shelf-acp-module')
query.add_child_elem(acp_info)
acp_firmware_get_iter.add_child_elem(query)
try:
self.server.invoke_successfully(net_firewall_policy_obj, enable_tunneling=True)
result = self.server.invoke_successfully(acp_firmware_get_iter, enable_tunneling=True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg="Error creating Firewall Policy: %s" % (to_native(error)), exception=traceback.format_exc())
def destroy_firewall_policy(self):
self.module.fail_json(msg='Error fetching acp firmware details details: %s'
% (to_native(error)), exception=traceback.format_exc())
if result.get_child_by_name('attributes-list').get_child_by_name('storage-shelf-acp-module'):
acp_module_info = result.get_child_by_name('attributes-list').get_child_by_name(
'storage-shelf-acp-module')
state = acp_module_info.get_child_content('state')
if state == 'firmware_update_required':
# acp firmware version upgrade required
return True
return False
def sp_firmware_image_update_progress_get(self, node_name):
"""
Destroy a Firewall Policy from a vserver
:return: None
Get current firmware image update progress info
:return: Dictionary of firmware image update progress if query successful, else return None
"""
net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-destroy")
net_firewall_policy_obj.translate_struct(self.firewall_policy_attributes())
firmware_update_progress_get = netapp_utils.zapi.NaElement('service-processor-image-update-progress-get')
firmware_update_progress_get.add_new_child('node', self.parameters['node'])
firmware_update_progress_info = dict()
try:
self.server.invoke_successfully(net_firewall_policy_obj, enable_tunneling=True)
result = self.server.invoke_successfully(firmware_update_progress_get, enable_tunneling=True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg="Error destroying Firewall Policy: %s" % (to_native(error)), exception=traceback.format_exc())
self.module.fail_json(msg='Error fetching firmware image upgrade progress details: %s'
% (to_native(error)), exception=traceback.format_exc())
# return firmware image update progress details
if result.get_child_by_name('attributes').get_child_by_name('service-processor-image-update-progress-info'):
update_progress_info = result.get_child_by_name('attributes').get_child_by_name('service-processor-image-update-progress-info')
firmware_update_progress_info['is-in-progress'] = update_progress_info.get_child_content('is-in-progress')
firmware_update_progress_info['node'] = update_progress_info.get_child_content('node')
return firmware_update_progress_info
def shelf_firmware_info_get(self):
"""
Get the current firmware of shelf module
:return:dict with module id and firmware info
"""
shelf_id_fw_info = dict()
shelf_firmware_info_get = netapp_utils.zapi.NaElement('storage-shelf-info-get-iter')
desired_attributes = netapp_utils.zapi.NaElement('desired-attributes')
storage_shelf_info = netapp_utils.zapi.NaElement('storage-shelf-info')
shelf_module = netapp_utils.zapi.NaElement('shelf-modules')
shelf_module_info = netapp_utils.zapi.NaElement('storage-shelf-module-info')
shelf_module.add_child_elem(shelf_module_info)
storage_shelf_info.add_child_elem(shelf_module)
desired_attributes.add_child_elem(storage_shelf_info)
shelf_firmware_info_get.add_child_elem(desired_attributes)
def modify_firewall_policy(self, modify):
try:
result = self.server.invoke_successfully(shelf_firmware_info_get, enable_tunneling=True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg='Error fetching shelf module firmware details: %s'
% (to_native(error)), exception=traceback.format_exc())
if result.get_child_by_name('num-records') and int(result.get_child_content('num-records')) > 0:
shelf_info = result.get_child_by_name('attributes-list').get_child_by_name('storage-shelf-info')
if (shelf_info.get_child_by_name('shelf-modules') and
shelf_info.get_child_by_name('shelf-modules').get_child_by_name('storage-shelf-module-info')):
shelves = shelf_info['shelf-modules'].get_children()
for shelf in shelves:
shelf_id_fw_info[shelf.get_child_content('module-id')] = shelf.get_child_content('module-fw-revision')
return shelf_id_fw_info
def disk_firmware_info_get(self):
"""
Modify a firewall Policy on a vserver
:return: none
Get the current firmware of disks module
:return:
"""
self.validate_ip_addresses()
net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-modify")
net_firewall_policy_obj.translate_struct(self.firewall_policy_attributes())
net_firewall_policy_obj.add_child_elem(self.na_helper.get_value_for_list(from_zapi=False,
zapi_parent='allow-list',
zapi_child='ip-and-mask',
data=modify['allow_list']))
disk_id_fw_info = dict()
disk_firmware_info_get = netapp_utils.zapi.NaElement('storage-disk-get-iter')
desired_attributes = netapp_utils.zapi.NaElement('desired-attributes')
storage_disk_info = netapp_utils.zapi.NaElement('storage-disk-info')
disk_inv = netapp_utils.zapi.NaElement('disk-inventory-info')
storage_disk_info.add_child_elem(disk_inv)
desired_attributes.add_child_elem(storage_disk_info)
disk_firmware_info_get.add_child_elem(desired_attributes)
try:
self.server.invoke_successfully(net_firewall_policy_obj, enable_tunneling=True)
result = self.server.invoke_successfully(disk_firmware_info_get, enable_tunneling=True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg="Error modifying Firewall Policy: %s" % (to_native(error)), exception=traceback.format_exc())
self.module.fail_json(msg='Error fetching disk module firmware details: %s'
% (to_native(error)), exception=traceback.format_exc())
if result.get_child_by_name('num-records') and int(result.get_child_content('num-records')) > 0:
disk_info = result.get_child_by_name('attributes-list')
disks = disk_info.get_children()
for disk in disks:
disk_id_fw_info[disk.get_child_content('disk-uid')] = disk.get_child_by_name('disk-inventory-info').get_child_content('firmware-revision')
return disk_id_fw_info
def disk_firmware_required_get(self):
"""
Check weather disk firmware upgrade is required or not
:return: True if the firmware upgrade is required
"""
disk_firmware_info = self.disk_firmware_info_get()
for disk in disk_firmware_info:
if (disk_firmware_info[disk]) != self.parameters['disk_fw']:
return True
return False
def firewall_policy_attributes(self):
return {
'policy': self.parameters['policy'],
'service': self.parameters['service'],
'vserver': self.parameters['vserver'],
}
def shelf_firmware_required_get(self):
"""
Check weather shelf firmware upgrade is required or not
:return: True if the firmware upgrade is required
"""
shelf_firmware_info = self.shelf_firmware_info_get()
for module in shelf_firmware_info:
if (shelf_firmware_info[module]) != self.parameters['shelf_module_fw']:
return True
return False
def get_firewall_config_for_node(self):
def sp_firmware_image_update(self):
"""
Get firewall configuration on the node
:return: dict() with firewall config details
Update current firmware image
"""
if self.parameters.get('logging'):
if self.parameters.get('node') is None:
self.module.fail_json(msg='Error: Missing parameter \'node\' to modify firewall logging')
net_firewall_config_obj = netapp_utils.zapi.NaElement("net-firewall-config-get")
net_firewall_config_obj.add_new_child('node-name', self.parameters['node'])
firmware_update_info = netapp_utils.zapi.NaElement('service-processor-image-update')
if self.parameters.get('package') is not None:
firmware_update_info.add_new_child('package', self.parameters['package'])
if self.parameters.get('clear_logs') is not None:
firmware_update_info.add_new_child('clear-logs', str(self.parameters['clear_logs']))
if self.parameters.get('install_baseline_image') is not None:
firmware_update_info.add_new_child('install-baseline-image', str(self.parameters['install_baseline_image']))
firmware_update_info.add_new_child('node', self.parameters['node'])
firmware_update_info.add_new_child('update-type', self.parameters['update_type'])
try:
result = self.server.invoke_successfully(net_firewall_config_obj, True)
self.server.invoke_successfully(firmware_update_info, enable_tunneling=True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg="Error getting Firewall Configuration: %s" % (to_native(error)),
# Current firmware version matches the version to be installed
if to_native(error.code) == '13001' and (error.message.startswith('Service Processor update skipped')):
return False
self.module.fail_json(msg='Error updating firmware image for %s: %s'
% (self.parameters['node'], to_native(error)),
exception=traceback.format_exc())
if result.get_child_by_name('attributes'):
firewall_info = result['attributes'].get_child_by_name('net-firewall-config-info')
return {'enable': self.change_status_to_bool(firewall_info.get_child_content('is-enabled'), to_zapi=False),
'logging': self.change_status_to_bool(firewall_info.get_child_content('is-logging'), to_zapi=False)}
return None
return True
def modify_firewall_config(self, modify):
def shelf_firmware_upgrade(self):
"""
Modify the configuration of a firewall on node
:return: None
Upgrade shelf firmware image
"""
net_firewall_config_obj = netapp_utils.zapi.NaElement("net-firewall-config-modify")
net_firewall_config_obj.add_new_child('node-name', self.parameters['node'])
if modify.get('enable'):
net_firewall_config_obj.add_new_child('is-enabled', self.change_status_to_bool(self.parameters['enable']))
if modify.get('logging'):
net_firewall_config_obj.add_new_child('is-logging', self.change_status_to_bool(self.parameters['logging']))
shelf_firmware_update_info = netapp_utils.zapi.NaElement('storage-shelf-firmware-update')
try:
self.server.invoke_successfully(net_firewall_config_obj, enable_tunneling=True)
self.server.invoke_successfully(shelf_firmware_update_info, enable_tunneling=True)
return True
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg="Error modifying Firewall Config: %s" % (to_native(error)),
exception=traceback.format_exc())
self.module.fail_json(msg='Error updating shelf firmware image : %s'
% (to_native(error)), exception=traceback.format_exc())
def change_status_to_bool(self, input, to_zapi=True):
if to_zapi:
return 'true' if input == 'enable' else 'false'
else:
return 'enable' if input == 'true' else 'disable'
def acp_firmware_upgrade(self):
"""
Upgrade shelf firmware image
"""
acp_firmware_update_info = netapp_utils.zapi.NaElement('storage-shelf-acp-firmware-update')
acp_firmware_update_info.add_new_child('node-name', self.parameters['node'])
try:
self.server.invoke_successfully(acp_firmware_update_info, enable_tunneling=True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg='Error updating acp firmware image : %s'
% (to_native(error)), exception=traceback.format_exc())
def disk_firmware_upgrade(self):
"""
Upgrade disk firmware
"""
disk_firmware_update_info = netapp_utils.zapi.NaElement('disk-update-disk-fw')
disk_firmware_update_info.add_new_child('node-name', self.parameters['node'])
try:
self.server.invoke_successfully(disk_firmware_update_info, enable_tunneling=True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg='Error updating disk firmware image : %s'
% (to_native(error)), exception=traceback.format_exc())
return True
def autosupport_log(self):
"""
Autosupport log for software_update
:return:
"""
results = netapp_utils.get_cserver(self.server)
cserver = netapp_utils.setup_na_ontap_zapi(module=self.module, vserver=results)
netapp_utils.ems_log_event("na_ontap_firewall_policy", cserver)
netapp_utils.ems_log_event("na_ontap_firmware_upgrade", cserver)
def apply(self):
"""
Apply action to upgrade firmware
"""
changed = False
self.autosupport_log()
cd_action, modify, modify_config = None, None, None
if self.parameters.get('policy'):
current = self.get_firewall_policy()
cd_action = self.na_helper.get_cd_action(current, self.parameters)
if cd_action is None and self.parameters['state'] == 'present':
modify = self.na_helper.get_modified_attributes(current, self.parameters)
if self.parameters.get('node'):
current_config = self.get_firewall_config_for_node()
# firewall config for a node is always present, we cannot create or delete a firewall on a node
modify_config = self.na_helper.get_modified_attributes(current_config, self.parameters)
if self.na_helper.changed:
if self.module.check_mode:
pass
firmware_update_progress = dict()
if self.parameters.get('firmware_type') == 'service-processor':
# service-processor firmware upgrade
current = self.firmware_image_get(self.parameters['node'])
if self.parameters.get('state') == 'present' and current:
if not self.module.check_mode:
if self.sp_firmware_image_update():
changed = True
firmware_update_progress = self.sp_firmware_image_update_progress_get(self.parameters['node'])
while firmware_update_progress.get('is-in-progress') == 'true':
time.sleep(25)
firmware_update_progress = self.sp_firmware_image_update_progress_get(self.parameters['node'])
else:
# we don't know until we try the upgrade
changed = True
elif self.parameters.get('firmware_type') == 'shelf':
# shelf firmware upgrade
if self.parameters.get('shelf_module_fw'):
if self.shelf_firmware_required_get():
if not self.module.check_mode:
changed = self.shelf_firmware_upgrade()
else:
changed = True
else:
if not self.module.check_mode:
changed = self.shelf_firmware_upgrade()
else:
# we don't know until we try the upgrade -- assuming the worst
changed = True
elif self.parameters.get('firmware_type') == 'acp':
# acp firmware upgrade
if self.acp_firmware_required_get():
if not self.module.check_mode:
self.acp_firmware_upgrade()
changed = True
elif self.parameters.get('firmware_type') == 'disk':
# Disk firmware upgrade
if self.parameters.get('disk_fw'):
if self.disk_firmware_required_get():
if not self.module.check_mode:
changed = self.disk_firmware_upgrade()
else:
changed = True
else:
if cd_action == 'create':
self.create_firewall_policy()
elif cd_action == 'delete':
self.destroy_firewall_policy()
if not self.module.check_mode:
changed = self.disk_firmware_upgrade()
else:
if modify:
self.modify_firewall_policy(modify)
if modify_config:
self.modify_firewall_config(modify_config)
self.module.exit_json(changed=self.na_helper.changed)
# we don't know until we try the upgrade -- assuming the worst
changed = True
self.module.exit_json(changed=changed)
def main():
"""
Execute action from playbook
:return: nothing
"""
cg_obj = NetAppONTAPFirewallPolicy()
cg_obj.apply()
"""Execute action"""
community_obj = NetAppONTAPFirmwareUpgrade()
community_obj.apply()
if __name__ == '__main__':

@ -0,0 +1,290 @@
# (c) 2018, NetApp, Inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
''' unit tests ONTAP Ansible module: na_ontap_firmware_upgrade '''
from __future__ import print_function
import json
import pytest
from units.compat import unittest
from units.compat.mock import patch, Mock
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
import ansible.module_utils.netapp as netapp_utils
from ansible.modules.storage.netapp.na_ontap_firmware_upgrade\
import NetAppONTAPFirmwareUpgrade as my_module # module under test
if not netapp_utils.has_netapp_lib():
pytestmark = pytest.mark.skip('skipping as missing required netapp_lib')
def set_module_args(args):
"""prepare arguments so that they will be picked up during module creation"""
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
class AnsibleExitJson(Exception):
"""Exception class to be raised by module.exit_json and caught by the test case"""
pass
class AnsibleFailJson(Exception):
"""Exception class to be raised by module.fail_json and caught by the test case"""
pass
def exit_json(*args, **kwargs): # pylint: disable=unused-argument
"""function to patch over exit_json; package return data into an exception"""
if 'changed' not in kwargs:
kwargs['changed'] = False
raise AnsibleExitJson(kwargs)
def fail_json(*args, **kwargs): # pylint: disable=unused-argument
"""function to patch over fail_json; package return data into an exception"""
kwargs['failed'] = True
raise AnsibleFailJson(kwargs)
class MockONTAPConnection(object):
''' mock server connection to ONTAP host '''
def __init__(self, kind=None, parm1=None, parm2=None, parm3=None):
''' save arguments '''
self.type = kind
self.parm1 = parm1
self.parm2 = parm2
# self.parm3 = parm3
self.xml_in = None
self.xml_out = None
self.firmware_type = 'None'
def invoke_successfully(self, xml, enable_tunneling): # pylint: disable=unused-argument
''' mock invoke_successfully returning xml data '''
self.xml_in = xml
if self.type == 'firmware_upgrade':
xml = self.build_firmware_upgrade_info(self.parm1, self.parm2)
if self.type == 'acp':
xml = self.build_acp_firmware_info(self.firmware_type)
self.xml_out = xml
return xml
def autosupport_log(self):
''' mock autosupport log'''
return None
@staticmethod
def build_firmware_upgrade_info(version, node):
''' build xml data for service-processor firmware info '''
xml = netapp_utils.zapi.NaElement('xml')
data = {
'num-records': 1,
'attributes-list': {'service-processor-info': {'firmware-version': '3.4'}}
}
xml.translate_struct(data)
print(xml.to_string())
return xml
@staticmethod
def build_acp_firmware_info(firmware_type):
''' build xml data for acp firmware info '''
xml = netapp_utils.zapi.NaElement('xml')
data = {
# 'num-records': 1,
'attributes-list': {'storage-shelf-acp-module': {'state': 'firmware_update_required'}}
}
xml.translate_struct(data)
print(xml.to_string())
return xml
class TestMyModule(unittest.TestCase):
''' a group of related Unit Tests '''
def setUp(self):
self.mock_module_helper = patch.multiple(basic.AnsibleModule,
exit_json=exit_json,
fail_json=fail_json)
self.mock_module_helper.start()
self.addCleanup(self.mock_module_helper.stop)
self.server = MockONTAPConnection()
self.use_vsim = False
def set_default_args(self):
if self.use_vsim:
hostname = '10.10.10.10'
username = 'admin'
password = 'admin'
node = 'vsim1'
clear_logs = True
package = 'test1.zip'
install_baseline_image = False
update_type = 'serial_full'
else:
hostname = 'hostname'
username = 'username'
password = 'password'
node = 'abc'
package = 'test1.zip'
clear_logs = True
install_baseline_image = False
update_type = 'serial_full'
return dict({
'hostname': hostname,
'username': username,
'password': password,
'node': node,
'package': package,
'clear_logs': clear_logs,
'install_baseline_image': install_baseline_image,
'update_type': update_type,
'https': 'true'
})
def test_module_fail_when_required_args_missing(self):
''' required arguments are reported as errors '''
with pytest.raises(AnsibleFailJson) as exc:
set_module_args({})
my_module()
print('Info: %s' % exc.value.args[0]['msg'])
def test_missing_parameters(self):
''' fail if firmware_type is missing '''
module_args = {}
module_args.update(self.set_default_args())
set_module_args(module_args)
with pytest.raises(AnsibleFailJson) as exc:
set_module_args(module_args)
my_module()
msg = 'missing required arguments: firmware_type'
print('Info: %s' % exc.value.args[0]['msg'])
assert exc.value.args[0]['msg'] == msg
def test_invalid_firmware_type_parameters(self):
''' fail if firmware_type is missing '''
module_args = {}
module_args.update(self.set_default_args())
module_args.update({'firmware_type': 'service_test'})
set_module_args(module_args)
with pytest.raises(AnsibleFailJson) as exc:
set_module_args(module_args)
my_module()
msg = 'value of firmware_type must be one of: service-processor, shelf, acp, disk, got: %s' % module_args['firmware_type']
print('Info: %s' % exc.value.args[0]['msg'])
assert exc.value.args[0]['msg'] == msg
def test_ensure_sp_firmware_get_called(self):
''' a more interesting test '''
module_args = {}
module_args.update(self.set_default_args())
module_args.update({'firmware_type': 'service-processor'})
set_module_args(module_args)
my_obj = my_module()
my_obj.server = self.server
firmware_image_get = my_obj.firmware_image_get('node')
print('Info: test_firmware_upgrade_get: %s' % repr(firmware_image_get))
assert firmware_image_get is None
def test_ensure_firmware_get_with_package_baseline_called(self):
''' a more interesting test '''
module_args = {}
module_args.update(self.set_default_args())
module_args.update({'firmware_type': 'service-processor'})
module_args.update({'package': 'test1.zip'})
module_args.update({'install_baseline_image': True})
with pytest.raises(AnsibleFailJson) as exc:
set_module_args(module_args)
my_module()
msg = 'Do not specify both package and install_baseline_image: true'
print('info: ' + exc.value.args[0]['msg'])
assert exc.value.args[0]['msg'] == msg
def test_ensure_acp_firmware_required_get_called(self):
''' a test tp verify acp firmware upgrade is required or not '''
module_args = {}
module_args.update(self.set_default_args())
module_args.update({'firmware_type': 'acp'})
set_module_args(module_args)
my_obj = my_module()
# my_obj.server = self.server
my_obj.server = MockONTAPConnection(kind='acp')
acp_firmware_required_get = my_obj.acp_firmware_required_get()
print('Info: test_acp_firmware_upgrade_required_get: %s' % repr(acp_firmware_required_get))
assert acp_firmware_required_get is True
@patch('ansible.modules.storage.netapp.na_ontap_firmware_upgrade.NetAppONTAPFirmwareUpgrade.sp_firmware_image_update')
@patch('ansible.modules.storage.netapp.na_ontap_firmware_upgrade.NetAppONTAPFirmwareUpgrade.sp_firmware_image_update_progress_get')
def test_ensure_apply_for_firmware_upgrade_called(self, get_mock, update_mock):
''' updgrading firmware and checking idempotency '''
module_args = {}
module_args.update(self.set_default_args())
module_args.update({'package': 'test1.zip'})
module_args.update({'firmware_type': 'service-processor'})
set_module_args(module_args)
my_obj = my_module()
my_obj.autosupport_log = Mock(return_value=None)
if not self.use_vsim:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
print('Info: test_firmware_upgrade_apply: %s' % repr(exc.value))
assert not exc.value.args[0]['changed']
if not self.use_vsim:
my_obj.server = MockONTAPConnection('firmware_upgrade', '3.5', 'true')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
print('Info: test_firmware_upgrade_apply: %s' % repr(exc.value))
assert exc.value.args[0]['changed']
update_mock.assert_called_with()
def test_shelf_firmware_upgrade(self):
''' Test shelf firmware upgrade '''
module_args = {}
module_args.update(self.set_default_args())
module_args.update({'firmware_type': 'shelf'})
set_module_args(module_args)
my_obj = my_module()
my_obj.autosupport_log = Mock(return_value=None)
if not self.use_vsim:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
print('Info: test_firmware_upgrade_apply: %s' % repr(exc.value))
assert exc.value.args[0]['changed']
@patch('ansible.modules.storage.netapp.na_ontap_firmware_upgrade.NetAppONTAPFirmwareUpgrade.acp_firmware_upgrade')
@patch('ansible.modules.storage.netapp.na_ontap_firmware_upgrade.NetAppONTAPFirmwareUpgrade.acp_firmware_required_get')
def test_acp_firmware_upgrade(self, get_mock, update_mock):
''' Test ACP firmware upgrade '''
module_args = {}
module_args.update(self.set_default_args())
module_args.update({'firmware_type': 'acp'})
set_module_args(module_args)
my_obj = my_module()
my_obj.autosupport_log = Mock(return_value=None)
if not self.use_vsim:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
print('Info: test_firmware_upgrade_apply: %s' % repr(exc.value))
assert exc.value.args[0]['changed']
@patch('ansible.modules.storage.netapp.na_ontap_firmware_upgrade.NetAppONTAPFirmwareUpgrade.disk_firmware_upgrade')
def test_disk_firmware_upgrade(self, get_mock):
''' Test disk firmware upgrade '''
module_args = {}
module_args.update(self.set_default_args())
module_args.update({'firmware_type': 'disk'})
set_module_args(module_args)
my_obj = my_module()
my_obj.autosupport_log = Mock(return_value=None)
if not self.use_vsim:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
print('Info: test_firmware_upgrade_apply: %s' % repr(exc.value))
assert exc.value.args[0]['changed']
Loading…
Cancel
Save