mirror of https://github.com/ansible/ansible.git
New options of na_ontap_aggregate (#48906)
* add new options for na_ontap_aggregate * add gpl line * remove dup option * Put files in wrong directory * change unit test to match the request from PR 48941 * Changed for review comments * pep8pull/49422/head
parent
34c57b4c42
commit
62dd1fe29e
@ -0,0 +1,214 @@
|
||||
""" unit tests for Ansible module: na_ontap_aggregate """
|
||||
|
||||
from __future__ import print_function
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from units.compat import unittest
|
||||
from units.compat.mock import patch, Mock
|
||||
from ansible.module_utils import basic
|
||||
from ansible.module_utils._text import to_bytes
|
||||
import ansible.module_utils.netapp as netapp_utils
|
||||
|
||||
from ansible.modules.storage.netapp.na_ontap_aggregate \
|
||||
import NetAppOntapAggregate as my_module # module under test
|
||||
|
||||
if not netapp_utils.has_netapp_lib():
|
||||
pytestmark = pytest.skip('skipping as missing required netapp_lib')
|
||||
|
||||
|
||||
def set_module_args(args):
|
||||
"""prepare arguments so that they will be picked up during module creation"""
|
||||
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
|
||||
basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
|
||||
|
||||
|
||||
class AnsibleExitJson(Exception):
|
||||
"""Exception class to be raised by module.exit_json and caught by the test case"""
|
||||
pass
|
||||
|
||||
|
||||
class AnsibleFailJson(Exception):
|
||||
"""Exception class to be raised by module.fail_json and caught by the test case"""
|
||||
pass
|
||||
|
||||
|
||||
def exit_json(*args, **kwargs): # pylint: disable=unused-argument
|
||||
"""function to patch over exit_json; package return data into an exception"""
|
||||
if 'changed' not in kwargs:
|
||||
kwargs['changed'] = False
|
||||
raise AnsibleExitJson(kwargs)
|
||||
|
||||
|
||||
def fail_json(*args, **kwargs): # pylint: disable=unused-argument
|
||||
"""function to patch over fail_json; package return data into an exception"""
|
||||
kwargs['failed'] = True
|
||||
raise AnsibleFailJson(kwargs)
|
||||
|
||||
|
||||
class MockONTAPConnection(object):
|
||||
''' mock server connection to ONTAP host '''
|
||||
|
||||
def __init__(self, kind=None, parm1=None, parm2=None):
|
||||
''' save arguments '''
|
||||
self.type = kind
|
||||
self.parm1 = parm1
|
||||
self.parm2 = parm2
|
||||
self.xml_in = None
|
||||
self.xml_out = None
|
||||
|
||||
def invoke_successfully(self, xml, enable_tunneling): # pylint: disable=unused-argument
|
||||
''' mock invoke_successfully returning xml data '''
|
||||
self.xml_in = xml
|
||||
if self.type == 'aggregate':
|
||||
xml = self.build_aggregate_info(self.parm1, self.parm2)
|
||||
elif self.type == 'aggregate_fail':
|
||||
raise netapp_utils.zapi.NaApiError(code='TEST', message="This exception is from the unit test")
|
||||
self.xml_out = xml
|
||||
return xml
|
||||
|
||||
@staticmethod
|
||||
def build_aggregate_info(vserver, aggregate):
|
||||
''' build xml data for aggregatte and vserser-info '''
|
||||
xml = netapp_utils.zapi.NaElement('xml')
|
||||
data = {'num-records': 2,
|
||||
'attributes-list':
|
||||
{'aggr-attributes':
|
||||
{'aggregate-name': aggregate,
|
||||
'aggr-raid-attributes':
|
||||
{'state': 'offline'
|
||||
}
|
||||
},
|
||||
},
|
||||
'vserver-info':
|
||||
{'vserver-name': vserver
|
||||
}
|
||||
}
|
||||
xml.translate_struct(data)
|
||||
print(xml.to_string())
|
||||
return xml
|
||||
|
||||
|
||||
class TestMyModule(unittest.TestCase):
|
||||
''' a group of related Unit Tests '''
|
||||
|
||||
def setUp(self):
|
||||
self.mock_module_helper = patch.multiple(basic.AnsibleModule,
|
||||
exit_json=exit_json,
|
||||
fail_json=fail_json)
|
||||
self.mock_module_helper.start()
|
||||
self.addCleanup(self.mock_module_helper.stop)
|
||||
self.server = MockONTAPConnection('aggregate', '12', 'name')
|
||||
# whether to use a mock or a simulator
|
||||
self.onbox = False
|
||||
|
||||
def set_default_args(self):
|
||||
if self.onbox:
|
||||
hostname = '10.193.74.78'
|
||||
username = 'admin'
|
||||
password = 'netapp1!'
|
||||
name = 'name'
|
||||
else:
|
||||
hostname = 'hostname'
|
||||
username = 'username'
|
||||
password = 'password'
|
||||
name = 'name'
|
||||
return dict({
|
||||
'hostname': hostname,
|
||||
'username': username,
|
||||
'password': password,
|
||||
'name': name
|
||||
})
|
||||
|
||||
def call_command(self, module_args):
|
||||
''' utility function to call apply '''
|
||||
module_args.update(self.set_default_args())
|
||||
set_module_args(module_args)
|
||||
my_obj = my_module()
|
||||
my_obj.asup_log_for_cserver = Mock(return_value=None)
|
||||
if not self.onbox:
|
||||
# mock the connection
|
||||
my_obj.server = MockONTAPConnection('aggregate', '12', 'test_name')
|
||||
with pytest.raises(AnsibleExitJson) as exc:
|
||||
my_obj.apply()
|
||||
return exc.value.args[0]['changed']
|
||||
|
||||
def test_module_fail_when_required_args_missing(self):
|
||||
''' required arguments are reported as errors '''
|
||||
with pytest.raises(AnsibleFailJson) as exc:
|
||||
set_module_args({})
|
||||
my_module()
|
||||
print('Info: %s' % exc.value.args[0]['msg'])
|
||||
|
||||
def test_is_mirrored(self):
|
||||
module_args = {
|
||||
'disk_count': '2',
|
||||
'is_mirrored': 'true',
|
||||
}
|
||||
changed = self.call_command(module_args)
|
||||
assert not changed
|
||||
|
||||
def test_disks_list(self):
|
||||
module_args = {
|
||||
'disk_count': '2',
|
||||
'disks': ['1', '2'],
|
||||
}
|
||||
changed = self.call_command(module_args)
|
||||
assert not changed
|
||||
|
||||
def test_mirror_disks(self):
|
||||
module_args = {
|
||||
'disk_count': '2',
|
||||
'disks': ['1', '2'],
|
||||
'mirror_disks': ['3', '4']
|
||||
}
|
||||
changed = self.call_command(module_args)
|
||||
assert not changed
|
||||
|
||||
def test_spare_pool(self):
|
||||
module_args = {
|
||||
'disk_count': '2',
|
||||
'spare_pool': 'Pool1'
|
||||
}
|
||||
changed = self.call_command(module_args)
|
||||
assert not changed
|
||||
|
||||
def test_rename(self):
|
||||
module_args = {
|
||||
'from_name': 'test_name2'
|
||||
}
|
||||
changed = self.call_command(module_args)
|
||||
assert not changed
|
||||
|
||||
def test_if_all_methods_catch_exception(self):
|
||||
module_args = {}
|
||||
module_args.update(self.set_default_args())
|
||||
module_args.update({'service_state': 'online'})
|
||||
module_args.update({'unmount_volumes': 'True'})
|
||||
module_args.update({'from_name': 'test_name2'})
|
||||
set_module_args(module_args)
|
||||
my_obj = my_module()
|
||||
if not self.onbox:
|
||||
my_obj.server = MockONTAPConnection('aggregate_fail')
|
||||
with pytest.raises(AnsibleFailJson) as exc:
|
||||
my_obj.aggr_get_iter(module_args.get('name'))
|
||||
assert '' in exc.value.args[0]['msg']
|
||||
with pytest.raises(AnsibleFailJson) as exc:
|
||||
my_obj.aggregate_online()
|
||||
assert 'Error changing the state of aggregate' in exc.value.args[0]['msg']
|
||||
with pytest.raises(AnsibleFailJson) as exc:
|
||||
my_obj.aggregate_offline()
|
||||
assert 'Error changing the state of aggregate' in exc.value.args[0]['msg']
|
||||
with pytest.raises(AnsibleFailJson) as exc:
|
||||
my_obj.create_aggr()
|
||||
assert 'Error provisioning aggregate' in exc.value.args[0]['msg']
|
||||
with pytest.raises(AnsibleFailJson) as exc:
|
||||
my_obj.delete_aggr()
|
||||
assert 'Error removing aggregate' in exc.value.args[0]['msg']
|
||||
with pytest.raises(AnsibleFailJson) as exc:
|
||||
my_obj.rename_aggregate()
|
||||
assert 'Error renaming aggregate' in exc.value.args[0]['msg']
|
||||
with pytest.raises(AnsibleFailJson) as exc:
|
||||
my_obj.asup_log_for_cserver = Mock(return_value=None)
|
||||
my_obj.apply()
|
||||
assert 'Error renaming: aggregate test_name2 does not exist' in exc.value.args[0]['msg']
|
Loading…
Reference in New Issue