rhn_channel/register: Porting tests to pytest (#33719)

* rhn_channel/register: Porting tests to pytest

Related: #33387

* rhn unit tests: use mock_request as a fixture
pull/33820/head
Pilou 7 years ago committed by Toshio Kuratomi
parent a2650cbe05
commit 58fdbe7415

@ -1,16 +1,18 @@
import json
from ansible.compat.tests.mock import patch from ansible.compat.tests.mock import patch
from ansible.module_utils import basic
from ansible.module_utils.six.moves import xmlrpc_client from ansible.module_utils.six.moves import xmlrpc_client
from ansible.module_utils._text import to_bytes
import pytest
def get_method_name(request_body): def get_method_name(request_body):
return xmlrpc_client.loads(request_body)[1] return xmlrpc_client.loads(request_body)[1]
def mock_request(responses, module_name): @pytest.fixture
def mock_request(request, mocker):
responses = request.getfuncargvalue('testcase')['calls']
module_name = request.module.TESTED_MODULE
def transport_request(host, handler, request_body, verbose=0): def transport_request(host, handler, request_body, verbose=0):
"""Fake request""" """Fake request"""
method_name = get_method_name(request_body) method_name = get_method_name(request_body)
@ -24,4 +26,4 @@ def mock_request(responses, module_name):
raise Exception('Expected call: %r, called with: %r' % (excepted_name, method_name)) raise Exception('Expected call: %r, called with: %r' % (excepted_name, method_name))
target = '{0}.xmlrpc_client.Transport.request'.format(module_name) target = '{0}.xmlrpc_client.Transport.request'.format(module_name)
return patch(target, side_effect=transport_request) mocker.patch(target, side_effect=transport_request)

@ -1,61 +1,63 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Pierre-Louis Bonicoli <pierre-louis@libregerbil.fr>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from itertools import product
import json import json
from ansible.modules.packaging.os import rhn_channel from ansible.modules.packaging.os import rhn_channel
from units.modules.packaging.utils import mock_request import pytest
from units.modules.utils import set_module_args, AnsibleExitJson, AnsibleFailJson, ModuleTestCase
class TestRhnChannel(ModuleTestCase): pytestmark = pytest.mark.usefixtures('patch_ansible_module')
def setUp(self):
super(TestRhnChannel, self).setUp()
self.module = rhn_channel @pytest.mark.parametrize('patch_ansible_module', [{}], indirect=['patch_ansible_module'])
self.module.HAS_UP2DATE_CLIENT = True def test_without_required_parameters(capfd):
with pytest.raises(SystemExit):
rhn_channel.main()
out, err = capfd.readouterr()
results = json.loads(out)
assert results['failed']
assert 'missing required arguments' in results['msg']
def test_without_required_parameters(self):
"""Failure must occurs when all parameters are missing"""
with self.assertRaises(AnsibleFailJson):
set_module_args({})
self.module.main()
def test_channel_already_here(self): TESTED_MODULE = rhn_channel.__name__
"""Check that result isn't changed""" TEST_CASES = [
set_module_args({ [
# add channel already added, check that result isn't changed
{
'name': 'rhel-x86_64-server-6', 'name': 'rhel-x86_64-server-6',
'sysname': 'server01', 'sysname': 'server01',
'url': 'https://rhn.redhat.com/rpc/api', 'url': 'https://rhn.redhat.com/rpc/api',
'user': 'user', 'user': 'user',
'password': 'pass', 'password': 'pass',
}) },
{
responses = [ 'calls': [
('auth.login', ['X' * 43]), ('auth.login', ['X' * 43]),
('system.listUserSystems', ('system.listUserSystems',
[[{'last_checkin': '2017-08-06 19:49:52.0', 'id': '0123456789', 'name': 'server01'}]]), [[{'last_checkin': '2017-08-06 19:49:52.0', 'id': '0123456789', 'name': 'server01'}]]),
('channel.software.listSystemChannels', ('channel.software.listSystemChannels',
[[{'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', 'channel_label': 'rhel-x86_64-server-6'}]]), [[{'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', 'channel_label': 'rhel-x86_64-server-6'}]]),
('auth.logout', [1]), ('auth.logout', [1]),
] ],
'changed': False,
with mock_request(responses, self.module.__name__): 'msg': 'Channel rhel-x86_64-server-6 already exists',
with self.assertRaises(AnsibleExitJson) as result: }
self.module.main() ],
self.assertFalse(result.exception.args[0]['changed']) [
self.assertFalse(responses) # all responses should have been consumed # add channel, check that result is changed
{
def test_add_channel(self):
"""Add another channel: check that result is changed"""
set_module_args({
'name': 'rhel-x86_64-server-6-debuginfo', 'name': 'rhel-x86_64-server-6-debuginfo',
'sysname': 'server01', 'sysname': 'server01',
'url': 'https://rhn.redhat.com/rpc/api', 'url': 'https://rhn.redhat.com/rpc/api',
'user': 'user', 'user': 'user',
'password': 'pass', 'password': 'pass',
}) },
{
responses = [ 'calls': [
('auth.login', ['X' * 43]), ('auth.login', ['X' * 43]),
('system.listUserSystems', ('system.listUserSystems',
[[{'last_checkin': '2017-08-06 19:49:52.0', 'id': '0123456789', 'name': 'server01'}]]), [[{'last_checkin': '2017-08-06 19:49:52.0', 'id': '0123456789', 'name': 'server01'}]]),
@ -65,52 +67,46 @@ class TestRhnChannel(ModuleTestCase):
[[{'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', 'channel_label': 'rhel-x86_64-server-6'}]]), [[{'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', 'channel_label': 'rhel-x86_64-server-6'}]]),
('system.setChildChannels', [1]), ('system.setChildChannels', [1]),
('auth.logout', [1]), ('auth.logout', [1]),
] ],
'changed': True,
with mock_request(responses, self.module.__name__): 'msg': 'Channel rhel-x86_64-server-6-debuginfo added',
with self.assertRaises(AnsibleExitJson) as result: }
self.module.main() ],
self.assertTrue(result.exception.args[0]['changed']) [
self.assertFalse(responses) # all responses should have been consumed # remove inexistent channel, check that result isn't changed
{
def test_remove_inexistent_channel(self):
"""Check that result isn't changed"""
set_module_args({
'name': 'rhel-x86_64-server-6-debuginfo', 'name': 'rhel-x86_64-server-6-debuginfo',
'state': 'absent', 'state': 'absent',
'sysname': 'server01', 'sysname': 'server01',
'url': 'https://rhn.redhat.com/rpc/api', 'url': 'https://rhn.redhat.com/rpc/api',
'user': 'user', 'user': 'user',
'password': 'pass', 'password': 'pass',
}) },
{
responses = [ 'calls': [
('auth.login', ['X' * 43]), ('auth.login', ['X' * 43]),
('system.listUserSystems', ('system.listUserSystems',
[[{'last_checkin': '2017-08-06 19:49:52.0', 'id': '0123456789', 'name': 'server01'}]]), [[{'last_checkin': '2017-08-06 19:49:52.0', 'id': '0123456789', 'name': 'server01'}]]),
('channel.software.listSystemChannels', ('channel.software.listSystemChannels',
[[{'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', 'channel_label': 'rhel-x86_64-server-6'}]]), [[{'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', 'channel_label': 'rhel-x86_64-server-6'}]]),
('auth.logout', [1]), ('auth.logout', [1]),
] ],
'changed': False,
with mock_request(responses, self.module.__name__): 'msg': 'Not subscribed to channel rhel-x86_64-server-6-debuginfo.',
with self.assertRaises(AnsibleExitJson) as result: }
self.module.main() ],
self.assertFalse(result.exception.args[0]['changed']) [
self.assertFalse(responses) # all responses should have been consumed # remove channel, check that result is changed
{
def test_remove_channel(self):
"""Check that result isn't changed"""
set_module_args({
'name': 'rhel-x86_64-server-6-debuginfo', 'name': 'rhel-x86_64-server-6-debuginfo',
'state': 'absent', 'state': 'absent',
'sysname': 'server01', 'sysname': 'server01',
'url': 'https://rhn.redhat.com/rpc/api', 'url': 'https://rhn.redhat.com/rpc/api',
'user': 'user', 'user': 'user',
'password': 'pass', 'password': 'pass',
}) },
{
responses = [ 'calls': [
('auth.login', ['X' * 43]), ('auth.login', ['X' * 43]),
('system.listUserSystems', ('system.listUserSystems',
[[{'last_checkin': '2017-08-06 19:49:52.0', 'id': '0123456789', 'name': 'server01'}]]), [[{'last_checkin': '2017-08-06 19:49:52.0', 'id': '0123456789', 'name': 'server01'}]]),
@ -124,10 +120,23 @@ class TestRhnChannel(ModuleTestCase):
]]), ]]),
('system.setChildChannels', [1]), ('system.setChildChannels', [1]),
('auth.logout', [1]), ('auth.logout', [1]),
],
'changed': True,
'msg': 'Channel rhel-x86_64-server-6-debuginfo removed'
}
] ]
]
@pytest.mark.parametrize('patch_ansible_module, testcase', TEST_CASES, indirect=['patch_ansible_module'])
def test_rhn_channel(capfd, mocker, testcase, mock_request):
"""Check 'msg' and 'changed' results"""
with pytest.raises(SystemExit):
rhn_channel.main()
with mock_request(responses, self.module.__name__): out, err = capfd.readouterr()
with self.assertRaises(AnsibleExitJson) as result: results = json.loads(out)
self.module.main() assert results['changed'] == testcase['changed']
self.assertTrue(result.exception.args[0]['changed']) assert results['msg'] == testcase['msg']
self.assertFalse(responses) # all responses should have been consumed assert not testcase['calls'] # all calls should have been consumed

@ -1,14 +1,15 @@
import contextlib
import json import json
import os
from ansible.compat.tests import unittest from ansible.compat.tests.mock import mock_open
from ansible.compat.tests.mock import PropertyMock, patch, mock_open
from ansible.module_utils import basic from ansible.module_utils import basic
from ansible.module_utils._text import to_native from ansible.module_utils._text import to_native
import ansible.module_utils.six
from ansible.module_utils.six.moves import xmlrpc_client from ansible.module_utils.six.moves import xmlrpc_client
from ansible.modules.packaging.os import rhn_register from ansible.modules.packaging.os import rhn_register
from units.modules.packaging.utils import mock_request import pytest
from units.modules.utils import set_module_args, AnsibleExitJson, AnsibleFailJson, ModuleTestCase
SYSTEMID = """<?xml version="1.0"?> SYSTEMID = """<?xml version="1.0"?>
@ -30,250 +31,233 @@ def skipWhenAllModulesMissing(modules):
for module in modules: for module in modules:
try: try:
__import__(module) __import__(module)
return lambda func: func return False
except ImportError: except ImportError:
continue continue
return unittest.skip("{0}: none are available".format(', '.join(modules))) return True
class TestRhnRegister(ModuleTestCase): orig_import = __import__
def setUp(self): @pytest.fixture
super(TestRhnRegister, self).setUp() def import_libxml(mocker):
def mock_import(name, *args, **kwargs):
if name in ['libxml2', 'libxml']:
raise ImportError()
else:
return orig_import(name, *args, **kwargs)
if ansible.module_utils.six.PY3:
mocker.patch('builtins.__import__', side_effect=mock_import)
else:
mocker.patch('__builtin__.__import__', side_effect=mock_import)
self.module = rhn_register
self.module.HAS_UP2DATE_CLIENT = True
@pytest.fixture
def patch_rhn(mocker):
load_config_return = { load_config_return = {
'serverURL': 'https://xmlrpc.rhn.redhat.com/XMLRPC', 'serverURL': 'https://xmlrpc.rhn.redhat.com/XMLRPC',
'systemIdPath': '/etc/sysconfig/rhn/systemid' 'systemIdPath': '/etc/sysconfig/rhn/systemid'
} }
self.mock_load_config = patch.object(rhn_register.Rhn, 'load_config', return_value=load_config_return)
self.mock_load_config.start()
self.addCleanup(self.mock_load_config.stop)
enable_patcher = patch.object(rhn_register.Rhn, 'enable')
self.mock_enable = enable_patcher.start()
self.addCleanup(enable_patcher.stop)
# This one fails, module needs to be fixed.
# @patch('os.path.isfile')
# def test_systemid_requirements_missing(self, mock_isfile):
# """Check that missing dependencies are detected"""
#
# def mock_import(name, *args):
# if name in ['libxml2', 'libxml']:
# raise ImportError()
# else:
# return orig_import(name, *args)
#
# mock_isfile.return_value = True
# with patch('ansible.modules.packaging.os.rhn_register.open', mock_open(read_data=SYSTEMID), create=True):
# orig_import = __import__
# with patch('__builtin__.__import__', side_effect=mock_import):
# rhn = self.module.Rhn()
# with self.assertRaises(AnsibleFailJson):
# rhn.systemid
@skipWhenAllModulesMissing(['libxml2', 'libxml'])
@patch('os.path.isfile')
def test_systemid_with_requirements(self, mock_isfile):
"""Check systemid property"""
def mock_import(name, *args):
if name in ['libxml2', 'libxml']:
raise ImportError()
else:
return orig_import(name, *args)
mock_isfile.return_value = True mocker.patch.object(rhn_register.Rhn, 'load_config', return_value=load_config_return)
with patch('ansible.modules.packaging.os.rhn_register.open', mock_open(read_data=SYSTEMID), create=True): mocker.patch.object(rhn_register, 'HAS_UP2DATE_CLIENT', mocker.PropertyMock(return_value=True))
orig_import = __import__
with patch('__builtin__.__import__', side_effect=mock_import):
rhn = self.module.Rhn() @pytest.mark.skipif(skipWhenAllModulesMissing(['libxml2', 'libxml']), reason='none are available: libxml2, libxml')
self.assertEqual('123456789', to_native(rhn.systemid)) def test_systemid_with_requirements(capfd, mocker, patch_rhn):
"""Check 'msg' and 'changed' results"""
def test_without_required_parameters(self): mocker.patch.object(rhn_register.Rhn, 'enable')
mock_isfile = mocker.patch('os.path.isfile', return_value=True)
mocker.patch('ansible.modules.packaging.os.rhn_register.open', mock_open(read_data=SYSTEMID), create=True)
rhn = rhn_register.Rhn()
assert '123456789' == to_native(rhn.systemid)
@pytest.mark.parametrize('patch_ansible_module', [{}], indirect=['patch_ansible_module'])
@pytest.mark.usefixtures('patch_ansible_module')
def test_systemid_requirements_missing(capfd, mocker, patch_rhn, import_libxml):
"""Check that missing dependencies are detected"""
mocker.patch('os.path.isfile', return_value=True)
mocker.patch('ansible.modules.packaging.os.rhn_register.open', mock_open(read_data=SYSTEMID), create=True)
with pytest.raises(SystemExit):
rhn_register.main()
out, err = capfd.readouterr()
results = json.loads(out)
assert results['failed']
assert 'Missing arguments' in results['msg']
@pytest.mark.parametrize('patch_ansible_module', [{}], indirect=['patch_ansible_module'])
@pytest.mark.usefixtures('patch_ansible_module')
def test_without_required_parameters(capfd, patch_rhn):
"""Failure must occurs when all parameters are missing""" """Failure must occurs when all parameters are missing"""
with self.assertRaises(AnsibleFailJson):
set_module_args({})
self.module.main()
def test_register_parameters(self): with pytest.raises(SystemExit):
"""Registering an unregistered host""" rhn_register.main()
set_module_args({ out, err = capfd.readouterr()
'activationkey': 'key', results = json.loads(out)
'username': 'user', assert results['failed']
'password': 'pass', assert 'Missing arguments' in results['msg']
})
responses = [
('auth.login', ['X' * 43]),
('channel.software.listSystemChannels',
[[{'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', 'channel_label': 'rhel-x86_64-server-6'}]]),
('channel.software.setSystemChannels', [1]),
('auth.logout', [1]),
]
with patch.object(basic.AnsibleModule, 'run_command') as run_command: TESTED_MODULE = rhn_register.__name__
run_command.return_value = 0, '', '' # successful execution, no output TEST_CASES = [
with patch.object(rhn_register.Rhn, 'systemid', PropertyMock(return_value=12345)): [
with mock_request(responses, self.module.__name__): # Registering an unregistered host
with self.assertRaises(AnsibleExitJson) as result: {
self.module.main()
self.assertTrue(result.exception.args[0]['changed'])
self.assertFalse(responses) # all responses should have been consumed
self.assertEqual(self.mock_enable.call_count, 1)
self.mock_enable.reset_mock()
self.assertEqual(run_command.call_count, 1)
self.assertEqual(run_command.call_args[0][0][0], '/usr/sbin/rhnreg_ks')
def test_register_add_channel(self):
"""Register an unregistered host and add another channel"""
set_module_args({
'activationkey': 'key', 'activationkey': 'key',
'username': 'user', 'username': 'user',
'password': 'pass', 'password': 'pass',
'channels': 'rhel-x86_64-server-6-debuginfo' },
}) {
'calls': [
responses = [
('auth.login', ['X' * 43]), ('auth.login', ['X' * 43]),
('channel.software.listSystemChannels', [[{ ('channel.software.listSystemChannels',
'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', [[{'channel_name': 'Red Hat Enterprise Linux Server (v. 6 for 64-bit x86_64)', 'channel_label': 'rhel-x86_64-server-6'}]]),
'channel_label': 'rhel-x86_64-server-6'}]]),
('channel.software.setSystemChannels', [1]), ('channel.software.setSystemChannels', [1]),
('auth.logout', [1]), ('auth.logout', [1]),
] ],
'is_registered': False,
with patch.object(basic.AnsibleModule, 'run_command') as run_command: 'is_registered.call_count': 1,
run_command.return_value = 0, '', '' # successful execution, no output 'enable.call_count': 1,
with patch.object(rhn_register.Rhn, 'systemid', PropertyMock(return_value=12345)): 'systemid.call_count': 2,
with mock_request(responses, self.module.__name__): 'changed': True,
with self.assertRaises(AnsibleExitJson) as result: 'msg': "System successfully registered to 'rhn.redhat.com'.",
self.module.main() 'run_command.call_count': 1,
self.assertTrue(result.exception.args[0]['changed']) 'run_command.call_args': '/usr/sbin/rhnreg_ks',
self.assertFalse(responses) # all responses should have been consumed 'request_called': True,
'unlink.call_count': 0,
self.assertEqual(self.mock_enable.call_count, 1) }
self.mock_enable.reset_mock() ],
self.assertEqual(run_command.call_count, 1) [
self.assertEqual(run_command.call_args[0][0][0], '/usr/sbin/rhnreg_ks') # Register an host already registered, check that result is unchanged
{
def test_already_registered(self):
"""Register an host already registered, check that result is
unchanged"""
set_module_args({
'activationkey': 'key', 'activationkey': 'key',
'username': 'user', 'username': 'user',
'password': 'pass', 'password': 'pass',
}) },
{
responses = [] 'calls': [
],
with patch.object(basic.AnsibleModule, 'run_command') as run_command: 'is_registered': True,
with patch.object(rhn_register.Rhn, 'is_registered', PropertyMock(return_value=True)) as mock_systemid: 'is_registered.call_count': 1,
with mock_request(responses, self.module.__name__) as req: 'enable.call_count': 0,
with self.assertRaises(AnsibleExitJson) as result: 'systemid.call_count': 0,
self.module.main() 'changed': False,
self.assertFalse(result.exception.args[0]['changed']) 'msg': 'System already registered.',
self.assertFalse(req.called) 'run_command.call_count': 0,
self.assertEqual(mock_systemid.call_count, 1) 'request_called': False,
'unlink.call_count': 0,
self.assertEqual(self.mock_enable.call_count, 0) },
self.assertFalse(run_command.called) ],
[
@patch('os.unlink') # Unregister an host, check that result is changed
def test_unregister(self, mock_unlink): {
"""Unregister an host, check that result is changed"""
mock_unlink.return_value = True
set_module_args({
'activationkey': 'key', 'activationkey': 'key',
'username': 'user', 'username': 'user',
'password': 'pass', 'password': 'pass',
'state': 'absent', 'state': 'absent',
}) },
{
responses = [ 'calls': [
('auth.login', ['X' * 43]), ('auth.login', ['X' * 43]),
('system.deleteSystems', [1]), ('system.deleteSystems', [1]),
('auth.logout', [1]), ('auth.logout', [1]),
] ],
'is_registered': True,
with patch.object(basic.AnsibleModule, 'run_command') as run_command: 'is_registered.call_count': 1,
run_command.return_value = 0, '', '' # successful execution, no output 'enable.call_count': 0,
mock_is_registered = PropertyMock(return_value=True) 'systemid.call_count': 1,
mock_systemid = PropertyMock(return_value=12345) 'changed': True,
with patch.multiple(rhn_register.Rhn, systemid=mock_systemid, is_registered=mock_is_registered): 'msg': 'System successfully unregistered from rhn.redhat.com.',
with mock_request(responses, self.module.__name__): 'run_command.call_count': 0,
with self.assertRaises(AnsibleExitJson) as result: 'request_called': True,
self.module.main() 'unlink.call_count': 1,
self.assertTrue(result.exception.args[0]['changed']) }
self.assertFalse(responses) # all responses should have been consumed ],
self.assertEqual(mock_systemid.call_count, 1) [
self.assertEqual(mock_is_registered.call_count, 1) # Unregister a unregistered host (systemid missing) locally, check that result is unchanged
self.assertFalse(run_command.called) {
self.assertEqual(mock_unlink.call_count, 1)
@patch('os.unlink')
def test_unregister_not_registered(self, mock_unlink):
"""Unregister a unregistered host (systemid missing)
locally, check that result is unchanged"""
mock_unlink.return_value = True
set_module_args({
'activationkey': 'key', 'activationkey': 'key',
'username': 'user', 'username': 'user',
'password': 'pass', 'password': 'pass',
'state': 'absent', 'state': 'absent',
}) },
{
with patch.object(basic.AnsibleModule, 'run_command') as run_command: 'calls': [],
with patch.object(rhn_register.Rhn, 'is_registered', PropertyMock(return_value=False)) as mock_is_registered: 'is_registered': False,
with patch('ansible.modules.packaging.os.rhn_register.xmlrpc_client.Transport.request') as req: 'is_registered.call_count': 1,
with self.assertRaises(AnsibleExitJson) as result: 'enable.call_count': 0,
self.module.main() 'systemid.call_count': 0,
self.assertFalse(result.exception.args[0]['changed']) 'changed': False,
self.assertFalse(req.called) 'msg': 'System already unregistered.',
self.assertEqual(mock_is_registered.call_count, 1) 'run_command.call_count': 0,
'request_called': False,
self.assertFalse(run_command.called) 'unlink.call_count': 0,
self.assertFalse(mock_unlink.called) }
@patch('os.unlink') ],
def test_unregister_unknown_host(self, mock_unlink): [
"""Unregister an unknown host (an host with a systemid available # Unregister an unknown host (an host with a systemid available locally, check that result contains failed
locally, check that result contains failed""" {
set_module_args({
'activationkey': 'key', 'activationkey': 'key',
'username': 'user', 'username': 'user',
'password': 'pass', 'password': 'pass',
'state': 'absent', 'state': 'absent',
}) },
{
responses = [ 'calls': [
('auth.login', ['X' * 43]), ('auth.login', ['X' * 43]),
('system.deleteSystems', xmlrpc_client.Fault(1003, 'The following systems were NOT deleted: 123456789')), ('system.deleteSystems', xmlrpc_client.Fault(1003, 'The following systems were NOT deleted: 123456789')),
('auth.logout', [1]), ('auth.logout', [1]),
],
'is_registered': True,
'is_registered.call_count': 1,
'enable.call_count': 0,
'systemid.call_count': 1,
'failed': True,
'msg': "Failed to unregister: <Fault 1003: 'The following systems were NOT deleted: 123456789'>",
'run_command.call_count': 0,
'request_called': True,
'unlink.call_count': 0,
}
],
] ]
with patch.object(basic.AnsibleModule, 'run_command') as run_command:
run_command.return_value = 0, '', '' # successful execution, no output @pytest.mark.parametrize('patch_ansible_module, testcase', TEST_CASES, indirect=['patch_ansible_module'])
mock_is_registered = PropertyMock(return_value=True) @pytest.mark.usefixtures('patch_ansible_module')
mock_systemid = PropertyMock(return_value=12345) def test_register_parameters(mocker, capfd, mock_request, patch_rhn, testcase):
with patch.multiple(rhn_register.Rhn, systemid=mock_systemid, is_registered=mock_is_registered): # successful execution, no output
with mock_request(responses, self.module.__name__): mocker.patch.object(basic.AnsibleModule, 'run_command', return_value=(0, '', ''))
with self.assertRaises(AnsibleFailJson) as result: mock_is_registered = mocker.patch.object(rhn_register.Rhn, 'is_registered', mocker.PropertyMock(return_value=testcase['is_registered']))
self.module.main() mocker.patch.object(rhn_register.Rhn, 'enable')
self.assertTrue(result.exception.args[0]['failed']) mock_systemid = mocker.patch.object(rhn_register.Rhn, 'systemid', mocker.PropertyMock(return_value=12345))
self.assertFalse(responses) # all responses should have been consumed mocker.patch('os.unlink', return_value=True)
self.assertEqual(mock_systemid.call_count, 1)
self.assertEqual(mock_is_registered.call_count, 1) with pytest.raises(SystemExit):
self.assertFalse(run_command.called) rhn_register.main()
self.assertFalse(mock_unlink.called)
assert basic.AnsibleModule.run_command.call_count == testcase['run_command.call_count']
if basic.AnsibleModule.run_command.call_count:
assert basic.AnsibleModule.run_command.call_args[0][0][0] == testcase['run_command.call_args']
assert mock_is_registered.call_count == testcase['is_registered.call_count']
assert rhn_register.Rhn.enable.call_count == testcase['enable.call_count']
assert mock_systemid.call_count == testcase['systemid.call_count']
assert xmlrpc_client.Transport.request.called == testcase['request_called']
assert os.unlink.call_count == testcase['unlink.call_count']
out, err = capfd.readouterr()
results = json.loads(out)
assert results.get('changed') == testcase.get('changed')
assert results.get('failed') == testcase.get('failed')
assert results['msg'] == testcase['msg']
assert not testcase['calls'] # all calls should have been consumed

Loading…
Cancel
Save