mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
215 lines
7.5 KiB
Python
215 lines
7.5 KiB
Python
""" unit tests for Ansible module: na_ontap_aggregate """
|
|
|
|
from __future__ import print_function
|
|
import json
|
|
import pytest
|
|
|
|
from units.compat import unittest
|
|
from units.compat.mock import patch, Mock
|
|
from ansible.module_utils import basic
|
|
from ansible.module_utils._text import to_bytes
|
|
import ansible.module_utils.netapp as netapp_utils
|
|
|
|
from ansible.modules.storage.netapp.na_ontap_aggregate \
|
|
import NetAppOntapAggregate as my_module # module under test
|
|
|
|
if not netapp_utils.has_netapp_lib():
    # Assign a skip *mark* rather than calling pytest.skip(): the call form
    # raises immediately while the module is being imported instead of
    # producing a value for pytestmark, so the tests are not cleanly marked
    # as skipped.
    pytestmark = pytest.mark.skip(reason='skipping as missing required netapp_lib')
|
|
|
|
|
|
def set_module_args(args):
    """store the given arguments, JSON-encoded, where module creation will pick them up"""
    payload = {'ANSIBLE_MODULE_ARGS': args}
    encoded = to_bytes(json.dumps(payload))
    basic._ANSIBLE_ARGS = encoded  # pylint: disable=protected-access
|
|
|
|
|
|
class AnsibleExitJson(Exception):
    """Raised by the patched module.exit_json so a test case can catch the module's exit data"""
|
|
|
|
|
|
class AnsibleFailJson(Exception):
    """Raised by the patched module.fail_json so a test case can catch the module's failure data"""
|
|
|
|
|
|
def exit_json(*args, **kwargs):  # pylint: disable=unused-argument
    """replacement for exit_json: raise the return data as AnsibleExitJson

    'changed' defaults to False when the caller did not supply it.
    """
    kwargs.setdefault('changed', False)
    raise AnsibleExitJson(kwargs)
|
|
|
|
|
|
def fail_json(*args, **kwargs):  # pylint: disable=unused-argument
    """replacement for fail_json: raise the return data as AnsibleFailJson

    The data is always flagged with 'failed': True.
    """
    kwargs.update(failed=True)
    raise AnsibleFailJson(kwargs)
|
|
|
|
|
|
class MockONTAPConnection(object):
    ''' stand-in for the server connection to an ONTAP host '''

    def __init__(self, kind=None, parm1=None, parm2=None):
        ''' remember the mock kind and its two parameters; no request seen yet '''
        self.type = kind
        self.parm1 = parm1
        self.parm2 = parm2
        # last request received and last response returned
        self.xml_in = None
        self.xml_out = None

    def invoke_successfully(self, xml, enable_tunneling):  # pylint: disable=unused-argument
        ''' record the request and answer according to the mock kind

        'aggregate' answers with generated aggregate info, 'aggregate_fail'
        raises NaApiError, and any other kind echoes the request back.
        '''
        self.xml_in = xml
        if self.type == 'aggregate_fail':
            raise netapp_utils.zapi.NaApiError(code='TEST', message="This exception is from the unit test")
        response = xml
        if self.type == 'aggregate':
            response = self.build_aggregate_info(self.parm1, self.parm2)
        self.xml_out = response
        return response

    @staticmethod
    def build_aggregate_info(vserver, aggregate):
        ''' build xml data for an offline aggregate and its vserver-info '''
        raid_attributes = {'state': 'offline'}
        aggr_attributes = {
            'aggregate-name': aggregate,
            'aggr-raid-attributes': raid_attributes,
        }
        data = {
            'num-records': 2,
            'attributes-list': {'aggr-attributes': aggr_attributes},
            'vserver-info': {'vserver-name': vserver},
        }
        xml = netapp_utils.zapi.NaElement('xml')
        xml.translate_struct(data)
        print(xml.to_string())
        return xml
|
|
|
|
|
|
class TestMyModule(unittest.TestCase):
    ''' unit tests for na_ontap_aggregate, run against a mocked ONTAP connection '''

    def setUp(self):
        ''' patch exit_json/fail_json on AnsibleModule so module exits surface as exceptions '''
        self.mock_module_helper = patch.multiple(basic.AnsibleModule,
                                                 exit_json=exit_json,
                                                 fail_json=fail_json)
        self.mock_module_helper.start()
        self.addCleanup(self.mock_module_helper.stop)
        self.server = MockONTAPConnection('aggregate', '12', 'name')
        # whether to use a mock or a simulator
        self.onbox = False

    def set_default_args(self):
        ''' return the connection arguments and aggregate name shared by the tests '''
        if self.onbox:
            # NOTE(review): simulator address and credentials are hard-coded;
            # consider supplying them from outside the test file.
            hostname = '10.193.74.78'
            username = 'admin'
            password = 'netapp1!'
        else:
            hostname = 'hostname'
            username = 'username'
            password = 'password'
        return {
            'hostname': hostname,
            'username': username,
            'password': password,
            'name': 'name',
        }

    def call_command(self, module_args):
        ''' run apply() with module_args plus the defaults and return the 'changed' flag

        module_args is updated in place; the default arguments override any
        keys it already carries.
        '''
        module_args.update(self.set_default_args())
        set_module_args(module_args)
        my_obj = my_module()
        my_obj.asup_log_for_cserver = Mock(return_value=None)
        if not self.onbox:
            # mock the connection
            my_obj.server = MockONTAPConnection('aggregate', '12', 'test_name')
        with pytest.raises(AnsibleExitJson) as exc:
            my_obj.apply()
        return exc.value.args[0]['changed']

    def test_module_fail_when_required_args_missing(self):
        ''' required arguments are reported as errors '''
        with pytest.raises(AnsibleFailJson) as exc:
            set_module_args({})
            my_module()
        print('Info: %s' % exc.value.args[0]['msg'])

    def test_is_mirrored(self):
        ''' is_mirrored with a disk count reports no change against the mock '''
        module_args = {
            'disk_count': '2',
            'is_mirrored': 'true',
        }
        changed = self.call_command(module_args)
        assert not changed

    def test_disks_list(self):
        ''' an explicit disks list reports no change against the mock '''
        module_args = {
            'disk_count': '2',
            'disks': ['1', '2'],
        }
        changed = self.call_command(module_args)
        assert not changed

    def test_mirror_disks(self):
        ''' disks together with mirror_disks report no change against the mock '''
        module_args = {
            'disk_count': '2',
            'disks': ['1', '2'],
            'mirror_disks': ['3', '4'],
        }
        changed = self.call_command(module_args)
        assert not changed

    def test_spare_pool(self):
        ''' spare_pool with a disk count reports no change against the mock '''
        module_args = {
            'disk_count': '2',
            'spare_pool': 'Pool1',
        }
        changed = self.call_command(module_args)
        assert not changed

    def test_rename(self):
        ''' from_name reports no change against the mock '''
        module_args = {
            'from_name': 'test_name2',
        }
        changed = self.call_command(module_args)
        assert not changed

    def test_if_all_methods_catch_exception(self):
        ''' every module method turns a failing API call into fail_json '''
        module_args = self.set_default_args()
        module_args.update({
            'service_state': 'online',
            'unmount_volumes': 'True',
            'from_name': 'test_name2',
        })
        set_module_args(module_args)
        my_obj = my_module()
        if not self.onbox:
            my_obj.server = MockONTAPConnection('aggregate_fail')

        with pytest.raises(AnsibleFailJson) as exc:
            my_obj.aggr_get_iter(module_args.get('name'))
        # The previous check, `'' in msg`, was true for any string and so
        # verified nothing; require the mock's error text instead.
        # NOTE(review): assumes the module forwards the NaApiError text into
        # 'msg' - confirm against aggr_get_iter.
        assert 'This exception is from the unit test' in exc.value.args[0]['msg']

        # each call below must fail with the matching error message
        expected_failures = (
            (my_obj.aggregate_online, 'Error changing the state of aggregate'),
            (my_obj.aggregate_offline, 'Error changing the state of aggregate'),
            (my_obj.create_aggr, 'Error provisioning aggregate'),
            (my_obj.delete_aggr, 'Error removing aggregate'),
            (my_obj.rename_aggregate, 'Error renaming aggregate'),
        )
        for method, expected_msg in expected_failures:
            with pytest.raises(AnsibleFailJson) as exc:
                method()
            assert expected_msg in exc.value.args[0]['msg']

        my_obj.asup_log_for_cserver = Mock(return_value=None)
        with pytest.raises(AnsibleFailJson) as exc:
            my_obj.apply()
        assert 'Error renaming: aggregate test_name2 does not exist' in exc.value.args[0]['msg']
|