From cd3616423986e02f6104b70efab650b320b987a8 Mon Sep 17 00:00:00 2001 From: Toshio Kuratomi Date: Tue, 5 Dec 2017 12:43:13 -0800 Subject: [PATCH] Porting tests to pytest (#33387) * Porting tests to pytest * Achievement Get: No longer need mock/generator.py * Now done via pytest's parametrization * Port safe_eval to pytest * Port text tests to pytest * Port test_set_mode_if_different to pytest * Change conftest AnsibleModule fixtures to be more flexible * Move the AnsibleModules fixtures to module_utils/conftest.py for sharing * Testing the argspec code requires: * injecting both the argspec and the arguments. * Patching the arguments into sys.stdin at a different level * More porting to obsolete mock/procenv.py * Port run_command to pytest * Port known_hosts tests to pytest * Port safe_eval to pytest * Port test_distribution_version.py to pytest * Port test_log to pytest * Port test__log_invocation to pytest * Remove unneeded import of procenv in test_postgresql * Port test_pip to pytest style * As part of this, create a pytest ansiblemodule fixture in modules/conftest.py. This is slightly different than the approach taken in module_utils because here we need to override the AnsibleModule that the modules will inherit from instead of one that we're instantiating ourselves. * Fixup usage of parametrization in test_deprecate_warn * Check that the pip module failed in our test --- test/units/mock/generator.py | 72 ---- .../basic/test__log_invocation.py | 129 +++---- .../module_utils/basic/test_argument_spec.py | 62 +-- .../module_utils/basic/test_deprecate_warn.py | 64 +-- .../module_utils/basic/test_exit_json.py | 212 +++++----- test/units/module_utils/basic/test_log.py | 296 +++++--------- .../module_utils/basic/test_run_command.py | 363 +++++++++--------- .../module_utils/basic/test_safe_eval.py | 117 +++--- .../basic/test_set_mode_if_different.py | 219 +++++------ test/units/module_utils/conftest.py | 65 ++++ .../module_utils/test_distribution_version.py | 78 ++-- test/units/module_utils/test_known_hosts.py | 240 ++++++------ test/units/module_utils/test_postgresql.py | 6 +- test/units/module_utils/test_text.py | 61 ++- test/units/modules/conftest.py | 24 ++ .../modules/packaging/language/test_pip.py | 27 +- 16 files changed, 865 insertions(+), 1170 deletions(-) delete mode 100644 test/units/mock/generator.py create mode 100644 test/units/module_utils/conftest.py create mode 100644 test/units/modules/conftest.py diff --git a/test/units/mock/generator.py b/test/units/mock/generator.py deleted file mode 100644 index a112fe12de9..00000000000 --- a/test/units/mock/generator.py +++ /dev/null @@ -1,72 +0,0 @@ -# Copyright 2016 Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -from collections import Mapping - - -def make_method(func, args, kwargs): - - def test_method(self): - func(self, *args, **kwargs) - - # Format the argument string - arg_string = ', '.join(repr(a) for a in args) - kwarg_string = ', '.join('{0}={1}'.format(item[0], repr(item[1])) for item in kwargs.items()) - arg_list = [] - if arg_string: - arg_list.append(arg_string) - if kwarg_string: - arg_list.append(kwarg_string) - - test_method.__name__ = 'test_{0}({1})'.format(func.__name__, ', '.join(arg_list)) - return test_method - - -def add_method(func, *combined_args): - """ - Add a test case via a class decorator. - - nose uses generators for this but doesn't work with unittest.TestCase - subclasses. So we have to write our own. - - The first argument to this decorator is a test function. All subsequent - arguments are the arguments to create each generated test function with in - the following format: - - Each set of arguments is a two-tuple. The first element is an iterable of - positional arguments. the second is a dict representing the kwargs. - """ - def wrapper(cls): - for combined_arg in combined_args: - if len(combined_arg) == 2: - args = combined_arg[0] - kwargs = combined_arg[1] - elif isinstance(combined_arg[0], Mapping): - args = [] - kwargs = combined_arg[0] - else: - args = combined_arg[0] - kwargs = {} - test_method = make_method(func, args, kwargs) - setattr(cls, test_method.__name__, test_method) - return cls - - return wrapper diff --git a/test/units/module_utils/basic/test__log_invocation.py b/test/units/module_utils/basic/test__log_invocation.py index 5cdd5c8c526..3beda8bda93 100644 --- a/test/units/module_utils/basic/test__log_invocation.py +++ b/test/units/module_utils/basic/test__log_invocation.py @@ -1,84 +1,55 @@ # -*- coding: utf-8 -*- # (c) 2016, James Cammarata -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# (c) 2017, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -# Make coding more python3-ish -from __future__ import (absolute_import, division) +from __future__ import absolute_import, division, print_function __metaclass__ = type -import json -import sys - -from units.mock.procenv import swap_stdin_and_argv - -from ansible.compat.tests import unittest -from ansible.compat.tests.mock import MagicMock - - -class TestModuleUtilsBasic(unittest.TestCase): - def test_module_utils_basic__log_invocation(self): - with swap_stdin_and_argv(stdin_data=json.dumps(dict( - ANSIBLE_MODULE_ARGS=dict(foo=False, bar=[1, 2, 3], bam="bam", baz=u'baz')), - )): - from ansible.module_utils import basic - - # test basic log invocation - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=dict( - foo=dict(default=True, type='bool'), - bar=dict(default=[], type='list'), - bam=dict(default="bam"), - baz=dict(default=u"baz"), - password=dict(default=True), - no_log=dict(default="you shouldn't see me", no_log=True), - ), - ) - - am.log = MagicMock() - am._log_invocation() - - # Message is generated from a dict so it will be in an unknown order. - # have to check this manually rather than with assert_called_with() - args = am.log.call_args[0] - self.assertEqual(len(args), 1) - message = args[0] - - self.assertEqual( - len(message), - len('Invoked with bam=bam bar=[1, 2, 3] foo=False baz=baz no_log=NOT_LOGGING_PARAMETER password=NOT_LOGGING_PASSWORD') - ) - self.assertTrue(message.startswith('Invoked with ')) - self.assertIn(' bam=bam', message) - self.assertIn(' bar=[1, 2, 3]', message) - self.assertIn(' foo=False', message) - self.assertIn(' baz=baz', message) - self.assertIn(' no_log=NOT_LOGGING_PARAMETER', message) - self.assertIn(' password=NOT_LOGGING_PASSWORD', message) - - kwargs = am.log.call_args[1] - self.assertEqual( - kwargs, - dict(log_args={ - 'foo': 'False', - 'bar': '[1, 2, 3]', - 'bam': 'bam', - 'baz': 'baz', - 'password': 'NOT_LOGGING_PASSWORD', - 'no_log': 'NOT_LOGGING_PARAMETER', - }) - ) +import pytest + + +ARGS = dict(foo=False, bar=[1, 2, 3], bam="bam", baz=u'baz') +ARGUMENT_SPEC = dict( + foo=dict(default=True, type='bool'), + bar=dict(default=[], type='list'), + bam=dict(default="bam"), + baz=dict(default=u"baz"), + password=dict(default=True), + no_log=dict(default="you shouldn't see me", no_log=True), +) + + +@pytest.mark.parametrize('am, stdin', [(ARGUMENT_SPEC, ARGS)], indirect=['am', 'stdin']) +def test_module_utils_basic__log_invocation(am, mocker): + + am.log = mocker.MagicMock() + am._log_invocation() + + # Message is generated from a dict so it will be in an unknown order. + # have to check this manually rather than with assert_called_with() + args = am.log.call_args[0] + assert len(args) == 1 + message = args[0] + + assert len(message) == \ + len('Invoked with bam=bam bar=[1, 2, 3] foo=False baz=baz no_log=NOT_LOGGING_PARAMETER password=NOT_LOGGING_PASSWORD') + + assert message.startswith('Invoked with ') + assert ' bam=bam' in message + assert ' bar=[1, 2, 3]' in message + assert ' foo=False' in message + assert ' baz=baz' in message + assert ' no_log=NOT_LOGGING_PARAMETER' in message + assert ' password=NOT_LOGGING_PASSWORD' in message + + kwargs = am.log.call_args[1] + assert kwargs == \ + dict(log_args={ + 'foo': 'False', + 'bar': '[1, 2, 3]', + 'bam': 'bam', + 'baz': 'baz', + 'password': 'NOT_LOGGING_PASSWORD', + 'no_log': 'NOT_LOGGING_PARAMETER', + }) diff --git a/test/units/module_utils/basic/test_argument_spec.py b/test/units/module_utils/basic/test_argument_spec.py index f6f9689f726..7e167454f20 100644 --- a/test/units/module_utils/basic/test_argument_spec.py +++ b/test/units/module_utils/basic/test_argument_spec.py @@ -6,44 +6,46 @@ __metaclass__ = type import json -from ansible.compat.tests import unittest +import pytest + from ansible.compat.tests.mock import MagicMock -from units.mock.procenv import swap_stdin_and_argv, swap_stdout from ansible.module_utils import basic -class TestCallableTypeValidation(unittest.TestCase): - def setUp(self): - args = json.dumps(dict(ANSIBLE_MODULE_ARGS=dict(arg="42"))) - self.stdin_swap_ctx = swap_stdin_and_argv(stdin_data=args) - self.stdin_swap_ctx.__enter__() +MOCK_VALIDATOR_SUCCESS = MagicMock(return_value=42) +MOCK_VALIDATOR_FAIL = MagicMock(side_effect=TypeError("bad conversion")) +# Data is argspec, argument, expected +VALID_SPECS = ( + ({'arg': {'type': int}}, {'arg': 42}, 42), + ({'arg': {'type': int}}, {'arg': '42'}, 42), + ({'arg': {'type': MOCK_VALIDATOR_SUCCESS}}, {'arg': 42}, 42), +) - # since we can't use context managers and "with" without overriding run(), call them directly - self.stdout_swap_ctx = swap_stdout() - self.fake_stream = self.stdout_swap_ctx.__enter__() +INVALID_SPECS = ( + ({'arg': {'type': int}}, {'arg': "bad"}, "invalid literal for int() with base 10: 'bad'"), + ({'arg': {'type': MOCK_VALIDATOR_FAIL}}, {'arg': "bad"}, "bad conversion"), +) - basic._ANSIBLE_ARGS = None - def tearDown(self): - # since we can't use context managers and "with" without overriding run(), call them directly to clean up - self.stdin_swap_ctx.__exit__(None, None, None) - self.stdout_swap_ctx.__exit__(None, None, None) +@pytest.mark.parametrize('argspec, expected, am, stdin', [(s[0], s[2], s[0], s[1]) for s in VALID_SPECS], + indirect=['am', 'stdin']) +def test_validator_success(am, mocker, argspec, expected): - def test_validate_success(self): - mock_validator = MagicMock(return_value=42) - m = basic.AnsibleModule(argument_spec=dict( - arg=dict(type=mock_validator) - )) + type_ = argspec['arg']['type'] + if isinstance(type_, MagicMock): + assert type_.called + else: + assert isinstance(am.params['arg'], type_) + assert am.params['arg'] == expected - self.assertTrue(mock_validator.called) - self.assertEqual(m.params['arg'], 42) - self.assertEqual(type(m.params['arg']), int) - def test_validate_fail(self): - mock_validator = MagicMock(side_effect=TypeError("bad conversion")) - with self.assertRaises(SystemExit) as ecm: - m = basic.AnsibleModule(argument_spec=dict( - arg=dict(type=mock_validator) - )) +@pytest.mark.parametrize('argspec, expected, stdin', [(s[0], s[2], s[1]) for s in INVALID_SPECS], + indirect=['stdin']) +def test_validator_fail(stdin, capfd, argspec, expected): + with pytest.raises(SystemExit) as ecm: + m = basic.AnsibleModule(argument_spec=argspec) - self.assertIn("bad conversion", json.loads(self.fake_stream.getvalue())['msg']) + out, err = capfd.readouterr() + assert not err + assert expected in json.loads(out)['msg'] + assert json.loads(out)['failed'] diff --git a/test/units/module_utils/basic/test_deprecate_warn.py b/test/units/module_utils/basic/test_deprecate_warn.py index bb70fe2552c..43d2dec23ff 100644 --- a/test/units/module_utils/basic/test_deprecate_warn.py +++ b/test/units/module_utils/basic/test_deprecate_warn.py @@ -1,19 +1,7 @@ # -*- coding: utf-8 -*- # -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) import json import sys @@ -27,49 +15,7 @@ from ansible.module_utils.six import PY3, string_types from ansible.module_utils._text import to_bytes -@pytest.fixture -def stdin(mocker, request): - if isinstance(request.param, string_types): - args = request.param - elif isinstance(request.param, MutableMapping): - if 'ANSIBLE_MODULE_ARGS' not in request.param: - request.param = {'ANSIBLE_MODULE_ARGS': request.param} - args = json.dumps(request.param) - else: - raise Exception('Malformed data to the stdin pytest fixture') - - real_stdin = sys.stdin - fake_stdin = BytesIO(to_bytes(args, errors='surrogate_or_strict')) - if PY3: - sys.stdin = mocker.MagicMock() - sys.stdin.buffer = fake_stdin - else: - sys.stdin = fake_stdin - - yield fake_stdin - - sys.stdin = real_stdin - - -@pytest.fixture -def am(stdin, request): - old_args = ansible.module_utils.basic._ANSIBLE_ARGS - ansible.module_utils.basic._ANSIBLE_ARGS = None - old_argv = sys.argv - sys.argv = ['ansible_unittest'] - - am = ansible.module_utils.basic.AnsibleModule( - argument_spec=dict(), - ) - am._name = 'ansible_unittest' - - yield am - - ansible.module_utils.basic._ANSIBLE_ARGS = old_args - sys.argv = old_argv - - -@pytest.mark.parametrize('stdin', [{}], indirect=True) +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) def test_warn(am, capfd): am.warn('warning1') @@ -80,7 +26,7 @@ def test_warn(am, capfd): assert json.loads(out)['warnings'] == ['warning1', 'warning2'] -@pytest.mark.parametrize('stdin', [{}], indirect=True) +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) def test_deprecate(am, capfd): am.deprecate('deprecation1') am.deprecate('deprecation2', '2.3') @@ -99,7 +45,7 @@ def test_deprecate(am, capfd): ] -@pytest.mark.parametrize('stdin', [{}], indirect=True) +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) def test_deprecate_without_list(am, capfd): with pytest.raises(SystemExit): am.exit_json(deprecations='Simple deprecation warning') diff --git a/test/units/module_utils/basic/test_exit_json.py b/test/units/module_utils/basic/test_exit_json.py index 47b3db0df75..234a1cc1130 100644 --- a/test/units/module_utils/basic/test_exit_json.py +++ b/test/units/module_utils/basic/test_exit_json.py @@ -1,98 +1,76 @@ # -*- coding: utf-8 -*- -# (c) 2015, Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# Copyright (c) 2015-2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish from __future__ import (absolute_import, division) __metaclass__ = type -import copy import json import sys +from itertools import chain -from ansible.compat.tests import unittest -from ansible.module_utils import basic -from units.mock.procenv import swap_stdin_and_argv, swap_stdout - - -empty_invocation = {u'module_args': {}} - - -class TestAnsibleModuleExitJson(unittest.TestCase): - def setUp(self): - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - self.stdin_swap_ctx = swap_stdin_and_argv(stdin_data=args) - self.stdin_swap_ctx.__enter__() - - # since we can't use context managers and "with" without overriding run(), call them directly - self.stdout_swap_ctx = swap_stdout() - self.fake_stream = self.stdout_swap_ctx.__enter__() - - basic._ANSIBLE_ARGS = None - self.module = basic.AnsibleModule(argument_spec=dict()) - - def tearDown(self): - # since we can't use context managers and "with" without overriding run(), call them directly to clean up - self.stdin_swap_ctx.__exit__(None, None, None) - self.stdout_swap_ctx.__exit__(None, None, None) - - def test_exit_json_no_args_exits(self): - with self.assertRaises(SystemExit) as ctx: - self.module.exit_json() - if isinstance(ctx.exception, int): - # Python2.6... why does sys.exit behave this way? - self.assertEquals(ctx.exception, 0) - else: - self.assertEquals(ctx.exception.code, 0) - return_val = json.loads(self.fake_stream.getvalue()) - self.assertEquals(return_val, dict(invocation=empty_invocation)) - - def test_exit_json_args_exits(self): - with self.assertRaises(SystemExit) as ctx: - self.module.exit_json(msg='message') - if isinstance(ctx.exception, int): - # Python2.6... why does sys.exit behave this way? - self.assertEquals(ctx.exception, 0) - else: - self.assertEquals(ctx.exception.code, 0) - return_val = json.loads(self.fake_stream.getvalue()) - self.assertEquals(return_val, dict(msg="message", invocation=empty_invocation)) - - def test_fail_json_exits(self): - with self.assertRaises(SystemExit) as ctx: - self.module.fail_json(msg='message') - if isinstance(ctx.exception, int): - # Python2.6... why does sys.exit behave this way? - self.assertEquals(ctx.exception, 1) - else: - self.assertEquals(ctx.exception.code, 1) - return_val = json.loads(self.fake_stream.getvalue()) - self.assertEquals(return_val, dict(msg="message", failed=True, invocation=empty_invocation)) - - def test_exit_json_proper_changed(self): - with self.assertRaises(SystemExit) as ctx: - self.module.exit_json(changed=True, msg='success') - return_val = json.loads(self.fake_stream.getvalue()) - self.assertEquals(return_val, dict(changed=True, msg='success', invocation=empty_invocation)) - - -class TestAnsibleModuleExitValuesRemoved(unittest.TestCase): +import pytest + + +EMPTY_INVOCATION = {u'module_args': {}} + + +class TestAnsibleModuleExitJson: + """ + Test that various means of calling exitJson and FailJson return the messages they've been given + """ + DATA = ( + ({}, {'invocation': EMPTY_INVOCATION}), + ({'msg': 'message'}, {'msg': 'message', 'invocation': EMPTY_INVOCATION}), + ({'msg': 'success', 'changed': True}, + {'msg': 'success', 'changed': True, 'invocation': EMPTY_INVOCATION}), + ({'msg': 'nochange', 'changed': False}, + {'msg': 'nochange', 'changed': False, 'invocation': EMPTY_INVOCATION}), + ) + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + # pylint: disable=undefined-variable + @pytest.mark.parametrize('args, expected, stdin', ((a, e, {}) for a, e in DATA), indirect=['stdin']) + def test_exit_json_exits(self, am, capfd, args, expected): + with pytest.raises(SystemExit) as ctx: + am.exit_json(**args) + assert ctx.value.code == 0 + + out, err = capfd.readouterr() + return_val = json.loads(out) + assert return_val == expected + + # Fail_json is only legal if it's called with a message + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('args, expected, stdin', + ((a, e, {}) for a, e in DATA if 'msg' in a), # pylint: disable=undefined-variable + indirect=['stdin']) + def test_fail_json_exits(self, am, capfd, args, expected): + with pytest.raises(SystemExit) as ctx: + am.fail_json(**args) + assert ctx.value.code == 1 + + out, err = capfd.readouterr() + return_val = json.loads(out) + # Fail_json should add failed=True + expected['failed'] = True + assert return_val == expected + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_fail_json_no_msg(self, am): + with pytest.raises(AssertionError) as ctx: + am.fail_json() + assert ctx.value.args[0] == "implementation error -- msg to explain the error is required" + + +class TestAnsibleModuleExitValuesRemoved: + """ + Test that ExitJson and FailJson remove password-like values + """ OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER' - dataset = ( + + DATA = ( ( dict(username='person', password='$ecret k3y'), dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/', @@ -119,43 +97,27 @@ class TestAnsibleModuleExitValuesRemoved(unittest.TestCase): ), ) - def test_exit_json_removes_values(self): - self.maxDiff = None - for args, return_val, expected in self.dataset: - params = dict(ANSIBLE_MODULE_ARGS=args) - params = json.dumps(params) - - with swap_stdin_and_argv(stdin_data=params): - with swap_stdout(): - basic._ANSIBLE_ARGS = None - module = basic.AnsibleModule( - argument_spec=dict( - username=dict(), - password=dict(no_log=True), - token=dict(no_log=True), - ), - ) - with self.assertRaises(SystemExit) as ctx: - self.assertEquals(module.exit_json(**return_val), expected) - self.assertEquals(json.loads(sys.stdout.getvalue()), expected) - - def test_fail_json_removes_values(self): - self.maxDiff = None - for args, return_val, expected in self.dataset: - expected = copy.deepcopy(expected) - expected['failed'] = True - params = dict(ANSIBLE_MODULE_ARGS=args) - params = json.dumps(params) - with swap_stdin_and_argv(stdin_data=params): - with swap_stdout(): - basic._ANSIBLE_ARGS = None - module = basic.AnsibleModule( - argument_spec=dict( - username=dict(), - password=dict(no_log=True), - token=dict(no_log=True), - ), - ) - with self.assertRaises(SystemExit) as ctx: - self.assertEquals(module.fail_json(**return_val), expected) - self.assertEquals(json.loads(sys.stdout.getvalue()), expected) + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('am, stdin, return_val, expected', + (({'username': {}, 'password': {'no_log': True}, 'token': {'no_log': True}}, s, r, e) + for s, r, e in DATA), # pylint: disable=undefined-variable + indirect=['am', 'stdin']) + def test_exit_json_removes_values(self, am, capfd, return_val, expected): + with pytest.raises(SystemExit) as ctx: + am.exit_json(**return_val) + out, err = capfd.readouterr() + + assert json.loads(out) == expected + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('am, stdin, return_val, expected', + (({'username': {}, 'password': {'no_log': True}, 'token': {'no_log': True}}, s, r, e) + for s, r, e in DATA), # pylint: disable=undefined-variable + indirect=['am', 'stdin']) + def test_fail_json_removes_values(self, am, capfd, return_val, expected): + expected['failed'] = True + with pytest.raises(SystemExit) as ctx: + am.fail_json(**return_val) == expected + out, err = capfd.readouterr() + + assert json.loads(out) == expected diff --git a/test/units/module_utils/basic/test_log.py b/test/units/module_utils/basic/test_log.py index d635af97e49..80654bb4678 100644 --- a/test/units/module_utils/basic/test_log.py +++ b/test/units/module_utils/basic/test_log.py @@ -1,35 +1,20 @@ # -*- coding: utf-8 -*- # (c) 2012-2014, Michael DeHaan -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# (c) 2017, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -# Make coding more python3-ish -from __future__ import (absolute_import, division) +from __future__ import absolute_import, division, print_function __metaclass__ = type import sys import json import syslog +from itertools import product -from ansible.compat.tests import unittest -from ansible.compat.tests.mock import patch, MagicMock -from units.mock.procenv import swap_stdin_and_argv +import pytest import ansible.module_utils.basic - +from ansible.module_utils.six import PY3 try: # Python 3.4+ @@ -41,80 +26,47 @@ except ImportError: pass -class TestAnsibleModuleSysLogSmokeTest(unittest.TestCase): - def setUp(self): - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap = swap_stdin_and_argv(stdin_data=args) - self.stdin_swap.__enter__() - - ansible.module_utils.basic._ANSIBLE_ARGS = None - self.am = ansible.module_utils.basic.AnsibleModule( - argument_spec=dict(), - ) - self.am._name = 'unittest' - - self.has_journal = ansible.module_utils.basic.has_journal - if self.has_journal: - # Systems with journal can still test syslog - ansible.module_utils.basic.has_journal = False - - def tearDown(self): - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap.__exit__(None, None, None) - ansible.module_utils.basic.has_journal = self.has_journal +class TestAnsibleModuleLogSmokeTest: + DATA = [u'Text string', u'Toshio くらとみ non-ascii test'] + DATA = DATA + [d.encode('utf-8') for d in DATA] + DATA += [b'non-utf8 :\xff: test'] - def test_smoketest_syslog(self): + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('msg, stdin', ((m, {}) for m in DATA), indirect=['stdin']) # pylint: disable=undefined-variable + def test_smoketest_syslog(self, am, mocker, msg): # These talk to the live daemons on the system. Need to do this to # show that what we send doesn't cause an issue once it gets to the # daemon. These are just smoketests to test that we don't fail. + mocker.patch('ansible.module_utils.basic.has_journal', False) - self.am.log(u'Text string') - self.am.log(u'Toshio くらとみ non-ascii test') - - self.am.log(b'Byte string') - self.am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8')) - self.am.log(b'non-utf8 :\xff: test') - - -class TestAnsibleModuleJournaldSmokeTest(unittest.TestCase): - - def setUp(self): - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap = swap_stdin_and_argv(stdin_data=args) - self.stdin_swap.__enter__() + am.log(u'Text string') + am.log(u'Toshio くらとみ non-ascii test') - ansible.module_utils.basic._ANSIBLE_ARGS = None - self.am = ansible.module_utils.basic.AnsibleModule( - argument_spec=dict(), - ) - self.am._name = 'unittest' + am.log(b'Byte string') + am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8')) + am.log(b'non-utf8 :\xff: test') - def tearDown(self): - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap.__exit__(None, None, None) - - @unittest.skipUnless(ansible.module_utils.basic.has_journal, 'python systemd bindings not installed') - def test_smoketest_journal(self): + @pytest.mark.skipif(not ansible.module_utils.basic.has_journal, reason='python systemd bindings not installed') + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('msg, stdin', ((m, {}) for m in DATA), indirect=['stdin']) # pylint: disable=undefined-variable + def test_smoketest_journal(self, am, mocker, msg): # These talk to the live daemons on the system. Need to do this to # show that what we send doesn't cause an issue once it gets to the # daemon. These are just smoketests to test that we don't fail. + mocker.patch('ansible.module_utils.basic.has_journal', True) - self.am.log(u'Text string') - self.am.log(u'Toshio くらとみ non-ascii test') + am.log(u'Text string') + am.log(u'Toshio くらとみ non-ascii test') - self.am.log(b'Byte string') - self.am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8')) - self.am.log(b'non-utf8 :\xff: test') + am.log(b'Byte string') + am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8')) + am.log(b'non-utf8 :\xff: test') -class TestAnsibleModuleLogSyslog(unittest.TestCase): +class TestAnsibleModuleLogSyslog: """Test the AnsibleModule Log Method""" - py2_output_data = { + PY2_OUTPUT_DATA = { u'Text string': b'Text string', u'Toshio くらとみ non-ascii test': u'Toshio くらとみ non-ascii test'.encode('utf-8'), b'Byte string': b'Byte string', @@ -122,7 +74,7 @@ class TestAnsibleModuleLogSyslog(unittest.TestCase): b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'.decode('utf-8', 'replace').encode('utf-8'), } - py3_output_data = { + PY3_OUTPUT_DATA = { u'Text string': u'Text string', u'Toshio くらとみ non-ascii test': u'Toshio くらとみ non-ascii test', b'Byte string': u'Byte string', @@ -130,60 +82,37 @@ class TestAnsibleModuleLogSyslog(unittest.TestCase): b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'.decode('utf-8', 'replace') } - def setUp(self): - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap = swap_stdin_and_argv(stdin_data=args) - self.stdin_swap.__enter__() - - ansible.module_utils.basic._ANSIBLE_ARGS = None - self.am = ansible.module_utils.basic.AnsibleModule( - argument_spec=dict(), - ) - self.am._name = 'unittest' - - self.has_journal = ansible.module_utils.basic.has_journal - if self.has_journal: - # Systems with journal can still test syslog - ansible.module_utils.basic.has_journal = False - - def tearDown(self): - # teardown/reset - ansible.module_utils.basic.has_journal = self.has_journal - - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap.__exit__(None, None, None) - - @patch('syslog.syslog', autospec=True) - def test_no_log(self, mock_func): - no_log = self.am.no_log - - self.am.no_log = True - self.am.log('unittest no_log') - self.assertFalse(mock_func.called) - - self.am.no_log = False - self.am.log('unittest no_log') - mock_func.assert_called_once_with(syslog.LOG_INFO, 'unittest no_log') - - self.am.no_log = no_log - - def test_output_matches(self): - if sys.version_info >= (3,): - output_data = self.py3_output_data + @pytest.mark.parametrize('no_log, stdin', (product((True, False), [{}])), indirect=['stdin']) + def test_no_log(self, am, mocker, no_log): + """Test that when no_log is set, logging does not occur""" + mock_syslog = mocker.patch('syslog.syslog', autospec=True) + mocker.patch('ansible.module_utils.basic.has_journal', False) + am.no_log = no_log + am.log('unittest no_log') + if no_log: + assert not mock_syslog.called else: - output_data = self.py2_output_data + mock_syslog.assert_called_once_with(syslog.LOG_INFO, 'unittest no_log') + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('msg, param, stdin', + ((m, p, {}) for m, p in + (PY3_OUTPUT_DATA.items() if PY3 else PY2_OUTPUT_DATA.items())), # pylint: disable=undefined-variable + indirect=['stdin']) + def test_output_matches(self, am, mocker, msg, param): + """Check that log messages are sent correctly""" + mocker.patch('ansible.module_utils.basic.has_journal', False) + mock_syslog = mocker.patch('syslog.syslog', autospec=True) - for msg, param in output_data.items(): - with patch('syslog.syslog', autospec=True) as mock_func: - self.am.log(msg) - mock_func.assert_called_once_with(syslog.LOG_INFO, param) + am.log(msg) + mock_syslog.assert_called_once_with(syslog.LOG_INFO, param) -class TestAnsibleModuleLogJournal(unittest.TestCase): +@pytest.mark.skipif(not ansible.module_utils.basic.has_journal, reason='python systemd bindings not installed') +class TestAnsibleModuleLogJournal: """Test the AnsibleModule Log Method""" - output_data = { + OUTPUT_DATA = { u'Text string': u'Text string', u'Toshio くらとみ non-ascii test': u'Toshio くらとみ non-ascii test', b'Byte string': u'Byte string', @@ -191,82 +120,43 @@ class TestAnsibleModuleLogJournal(unittest.TestCase): b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'.decode('utf-8', 'replace') } - # overriding run lets us use context managers for setup/teardown-esque behavior - def setUp(self): - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap = swap_stdin_and_argv(stdin_data=args) - self.stdin_swap.__enter__() - - ansible.module_utils.basic._ANSIBLE_ARGS = None - self.am = ansible.module_utils.basic.AnsibleModule( - argument_spec=dict(), - ) - self.am._name = 'unittest' - - self.has_journal = ansible.module_utils.basic.has_journal - ansible.module_utils.basic.has_journal = True - - self.module_patcher = None - - # In case systemd-python is not installed - if not self.has_journal: - self.module_patcher = patch.dict('sys.modules', {'systemd': MagicMock(), 'systemd.journal': MagicMock()}) - self.module_patcher.start() - try: - reload(ansible.module_utils.basic) - except NameError: - self._fake_out_reload(ansible.module_utils.basic) - - def tearDown(self): - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap.__exit__(None, None, None) - - # teardown/reset - ansible.module_utils.basic.has_journal = self.has_journal - - if self.module_patcher: - self.module_patcher.stop() - reload(ansible.module_utils.basic) - - @patch('systemd.journal.send') - def test_no_log(self, mock_func): - no_log = self.am.no_log - - self.am.no_log = True - self.am.log('unittest no_log') - self.assertFalse(mock_func.called) - - self.am.no_log = False - self.am.log('unittest no_log') - self.assertEqual(mock_func.called, 1) - - # Message - # call_args is a 2-tuple of (arg_list, kwarg_dict) - self.assertTrue(mock_func.call_args[0][0].endswith('unittest no_log'), msg='Message was not sent to log') - # log adds this journal field - self.assertIn('MODULE', mock_func.call_args[1]) - self.assertIn('basic.py', mock_func.call_args[1]['MODULE']) - - self.am.no_log = no_log - - def test_output_matches(self): - for msg, param in self.output_data.items(): - with patch('systemd.journal.send', autospec=True) as mock_func: - self.am.log(msg) - self.assertEqual(mock_func.call_count, 1, msg='journal.send not called exactly once') - self.assertTrue(mock_func.call_args[0][0].endswith(param)) - - @patch('systemd.journal.send') - def test_log_args(self, mock_func): - self.am.log('unittest log_args', log_args=dict(TEST='log unittest')) - self.assertEqual(mock_func.called, 1) - self.assertTrue(mock_func.call_args[0][0].endswith('unittest log_args'), msg='Message was not sent to log') + @pytest.mark.parametrize('no_log, stdin', (product((True, False), [{}])), indirect=['stdin']) + def test_no_log(self, am, mocker, no_log): + journal_send = mocker.patch('systemd.journal.send') + am.no_log = no_log + am.log('unittest no_log') + if no_log: + assert not journal_send.called + else: + assert journal_send.called == 1 + # Message + # call_args is a 2-tuple of (arg_list, kwarg_dict) + assert journal_send.call_args[0][0].endswith('unittest no_log'), 'Message was not sent to log' + # log adds this journal field + assert 'MODULE' in journal_send.call_args[1] + assert 'basic.py' in journal_send.call_args[1]['MODULE'] + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('msg, param, stdin', + ((m, p, {}) for m, p in OUTPUT_DATA.items()), # pylint: disable=undefined-variable + indirect=['stdin']) + def test_output_matches(self, am, mocker, msg, param): + journal_send = mocker.patch('systemd.journal.send') + am.log(msg) + assert journal_send.call_count == 1, 'journal.send not called exactly once' + assert journal_send.call_args[0][0].endswith(param) + + @pytest.mark.parametrize('stdin', ({},), indirect=['stdin']) + def test_log_args(self, am, mocker): + journal_send = mocker.patch('systemd.journal.send') + am.log('unittest log_args', log_args=dict(TEST='log unittest')) + assert journal_send.called == 1 + assert journal_send.call_args[0][0].endswith('unittest log_args'), 'Message was not sent to log' # log adds this journal field - self.assertIn('MODULE', mock_func.call_args[1]) - self.assertIn('basic.py', mock_func.call_args[1]['MODULE']) + assert 'MODULE' in journal_send.call_args[1] + assert 'basic.py' in journal_send.call_args[1]['MODULE'] # We added this journal field - self.assertIn('TEST', mock_func.call_args[1]) - self.assertIn('log unittest', mock_func.call_args[1]['TEST']) + assert 'TEST' in journal_send.call_args[1] + assert 'log unittest' in journal_send.call_args[1]['TEST'] diff --git a/test/units/module_utils/basic/test_run_command.py b/test/units/module_utils/basic/test_run_command.py index e29e2f42695..cd1971d3f36 100644 --- a/test/units/module_utils/basic/test_run_command.py +++ b/test/units/module_utils/basic/test_run_command.py @@ -1,38 +1,18 @@ # -*- coding: utf-8 -*- -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish from __future__ import (absolute_import, division) __metaclass__ = type import errno -import json -import sys -import time -from io import BytesIO, StringIO +from itertools import product +from io import BytesIO import pytest -from ansible.compat.tests import unittest -from ansible.compat.tests.mock import call, MagicMock, Mock, patch, sentinel -from ansible.module_utils.six import PY3 -import ansible.module_utils.basic - -from units.mock.procenv import swap_stdin_and_argv +from ansible.module_utils._text import to_native class OpenBytesIO(BytesIO): @@ -45,165 +25,182 @@ class OpenBytesIO(BytesIO): pass -class TestAnsibleModuleRunCommand(unittest.TestCase): - @pytest.fixture(autouse=True) - def run_command_mocked_env(self, mocker): - self.cmd_out = { - # os.read() is returning 'bytes', not strings - sentinel.stdout: BytesIO(), - sentinel.stderr: BytesIO(), - } - - def mock_os_read(fd, nbytes): - return self.cmd_out[fd].read(nbytes) - - def mock_select(rlist, wlist, xlist, timeout=1): - return (rlist, [], []) - - def mock_os_chdir(path): - if path == '/inaccessible': - raise OSError(errno.EPERM, "Permission denied: '/inaccessible'") - - def mock_os_abspath(path): - if path.startswith('/'): - return path - else: - return self.os.getcwd.return_value + '/' + path - - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap = swap_stdin_and_argv(stdin_data=args) - self.stdin_swap.__enter__() - - ansible.module_utils.basic._ANSIBLE_ARGS = None - self.module = ansible.module_utils.basic.AnsibleModule(argument_spec=dict()) - self.module.fail_json = MagicMock(side_effect=SystemExit) - - self.os = mocker.patch('ansible.module_utils.basic.os') - self.os.path.expandvars.side_effect = lambda x: x - self.os.path.expanduser.side_effect = lambda x: x - self.os.environ = {'PATH': '/bin'} - self.os.getcwd.return_value = '/home/foo' - self.os.path.isdir.return_value = True - self.os.chdir.side_effect = mock_os_chdir - self.os.read.side_effect = mock_os_read - self.os.path.abspath.side_effect = mock_os_abspath - - self.subprocess = mocker.patch('ansible.module_utils.basic.subprocess') - self.cmd = Mock() - self.cmd.returncode = 0 - self.cmd.stdin = OpenBytesIO() - self.cmd.stdout.fileno.return_value = sentinel.stdout - self.cmd.stderr.fileno.return_value = sentinel.stderr - self.subprocess.Popen.return_value = self.cmd - - self.select = mocker.patch('ansible.module_utils.basic.select') - self.select.select.side_effect = mock_select - yield - - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - self.stdin_swap.__exit__(None, None, None) - - def test_list_as_args(self): - self.module.run_command(['/bin/ls', 'a', ' b', 'c ']) - self.assertTrue(self.subprocess.Popen.called) - args, kwargs = self.subprocess.Popen.call_args - self.assertEqual(args, (['/bin/ls', 'a', ' b', 'c '], )) - self.assertEqual(kwargs['shell'], False) - - def test_str_as_args(self): - self.module.run_command('/bin/ls a " b" "c "') - self.assertTrue(self.subprocess.Popen.called) - args, kwargs = self.subprocess.Popen.call_args - self.assertEqual(args, (['/bin/ls', 'a', ' b', 'c '], )) - self.assertEqual(kwargs['shell'], False) - - def test_tuple_as_args(self): - self.assertRaises(SystemExit, self.module.run_command, ('ls', '/')) - self.assertTrue(self.module.fail_json.called) - - def test_unsafe_shell(self): - self.module.run_command('ls a " b" "c "', use_unsafe_shell=True) - self.assertTrue(self.subprocess.Popen.called) - args, kwargs = self.subprocess.Popen.call_args - self.assertEqual(args, ('ls a " b" "c "', )) - self.assertEqual(kwargs['shell'], True) - - def test_cwd(self): - self.os.getcwd.return_value = '/old' - self.module.run_command('/bin/ls', cwd='/new') - self.assertEqual(self.os.chdir.mock_calls, - [call('/new'), call('/old'), ]) - - def test_cwd_relative_path(self): - self.os.getcwd.return_value = '/old' - self.module.run_command('/bin/ls', cwd='sub-dir') - self.assertEqual(self.os.chdir.mock_calls, - [call('/old/sub-dir'), call('/old'), ]) - - def test_cwd_not_a_dir(self): - self.os.getcwd.return_value = '/old' - self.os.path.isdir.side_effect = lambda d: d != '/not-a-dir' - self.module.run_command('/bin/ls', cwd='/not-a-dir') - self.assertEqual(self.os.chdir.mock_calls, [call('/old'), ]) - - def test_cwd_inaccessible(self): - self.assertRaises(SystemExit, self.module.run_command, '/bin/ls', cwd='/inaccessible') - self.assertTrue(self.module.fail_json.called) - args, kwargs = self.module.fail_json.call_args - self.assertEqual(kwargs['rc'], errno.EPERM) - - def test_prompt_bad_regex(self): - self.assertRaises(SystemExit, self.module.run_command, 'foo', prompt_regex='[pP)assword:') - self.assertTrue(self.module.fail_json.called) - - def test_prompt_no_match(self): - self.cmd_out[sentinel.stdout] = BytesIO(b'hello') - (rc, _, _) = self.module.run_command('foo', prompt_regex='[pP]assword:') - self.assertEqual(rc, 0) - - def test_prompt_match_wo_data(self): - self.cmd_out[sentinel.stdout] = BytesIO(b'Authentication required!\nEnter password: ') - (rc, _, _) = self.module.run_command('foo', prompt_regex=r'[pP]assword:', data=None) - self.assertEqual(rc, 257) - - def test_check_rc_false(self): - self.cmd.returncode = 1 - (rc, _, _) = self.module.run_command('/bin/false', check_rc=False) - self.assertEqual(rc, 1) - - def test_check_rc_true(self): - self.cmd.returncode = 1 - self.assertRaises(SystemExit, self.module.run_command, '/bin/false', check_rc=True) - self.assertTrue(self.module.fail_json.called) - args, kwargs = self.module.fail_json.call_args - self.assertEqual(kwargs['rc'], 1) - - def test_text_stdin(self): - (rc, stdout, stderr) = self.module.run_command('/bin/foo', data='hello world') - self.assertEqual(self.cmd.stdin.getvalue(), b'hello world\n') - - def test_ascii_stdout(self): - self.cmd_out[sentinel.stdout] = BytesIO(b'hello') - (rc, stdout, stderr) = self.module.run_command('/bin/cat hello.txt') - self.assertEqual(rc, 0) +@pytest.fixture +def mock_os(mocker): + def mock_os_read(fd, nbytes): + return os._cmd_out[fd].read(nbytes) + + def mock_os_chdir(path): + if path == '/inaccessible': + raise OSError(errno.EPERM, "Permission denied: '/inaccessible'") + + def mock_os_abspath(path): + if path.startswith('/'): + return path + else: + return os.getcwd.return_value + '/' + path + + os = mocker.patch('ansible.module_utils.basic.os') + os._cmd_out = { + # os.read() is returning 'bytes', not strings + mocker.sentinel.stdout: BytesIO(), + mocker.sentinel.stderr: BytesIO(), + } + + os.path.expandvars.side_effect = lambda x: x + os.path.expanduser.side_effect = lambda x: x + os.environ = {'PATH': '/bin'} + os.getcwd.return_value = '/home/foo' + os.path.isdir.return_value = True + os.chdir.side_effect = mock_os_chdir + os.read.side_effect = mock_os_read + os.path.abspath.side_effect = mock_os_abspath + + yield os + + +@pytest.fixture +def mock_subprocess(mocker): + def mock_select(rlist, wlist, xlist, timeout=1): + return (rlist, [], []) + + fake_select = mocker.patch('ansible.module_utils.basic.select') + fake_select.select.side_effect = mock_select + + subprocess = mocker.patch('ansible.module_utils.basic.subprocess') + cmd = mocker.MagicMock() + cmd.returncode = 0 + cmd.stdin = OpenBytesIO() + cmd.stdout.fileno.return_value = mocker.sentinel.stdout + cmd.stderr.fileno.return_value = mocker.sentinel.stderr + subprocess.Popen.return_value = cmd + + yield subprocess + + +@pytest.fixture() +def rc_am(mocker, am, mock_os, mock_subprocess): + am.fail_json = mocker.MagicMock(side_effect=SystemExit) + am._os = mock_os + am._subprocess = mock_subprocess + yield am + + +class TestRunCommandArgs: + # Format is command as passed to run_command, command to Popen as list, command to Popen as string + ARGS_DATA = ( + (['/bin/ls', 'a', 'b', 'c'], ['/bin/ls', 'a', 'b', 'c'], '/bin/ls a b c'), + ('/bin/ls a " b" "c "', ['/bin/ls', 'a', ' b', 'c '], '/bin/ls a " b" "c "'), + ) + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + # pylint: disable=undefined-variable + @pytest.mark.parametrize('cmd, expected, shell, stdin', + ((arg, cmd_str if sh else cmd_lst, sh, {}) + for (arg, cmd_lst, cmd_str), sh in product(ARGS_DATA, (True, False))), + indirect=['stdin']) + def test_args(self, cmd, expected, shell, rc_am): + rc_am.run_command(cmd, use_unsafe_shell=shell) + assert rc_am._subprocess.Popen.called + args, kwargs = rc_am._subprocess.Popen.call_args + assert args == (expected, ) + assert kwargs['shell'] == shell + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_tuple_as_args(self, rc_am): + with pytest.raises(SystemExit): + rc_am.run_command(('ls', '/')) + assert rc_am.fail_json.called + + +class TestRunCommandCwd: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_cwd(self, mocker, rc_am): + rc_am._os.getcwd.return_value = '/old' + rc_am.run_command('/bin/ls', cwd='/new') + assert rc_am._os.chdir.mock_calls == [mocker.call('/new'), mocker.call('/old'), ] + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_cwd_relative_path(self, mocker, rc_am): + rc_am._os.getcwd.return_value = '/old' + rc_am.run_command('/bin/ls', cwd='sub-dir') + assert rc_am._os.chdir.mock_calls == [mocker.call('/old/sub-dir'), mocker.call('/old'), ] + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_cwd_not_a_dir(self, mocker, rc_am): + rc_am._os.getcwd.return_value = '/old' + rc_am._os.path.isdir.side_effect = lambda d: d != '/not-a-dir' + rc_am.run_command('/bin/ls', cwd='/not-a-dir') + assert rc_am._os.chdir.mock_calls == [mocker.call('/old'), ] + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_cwd_inaccessible(self, rc_am): + with pytest.raises(SystemExit): + rc_am.run_command('/bin/ls', cwd='/inaccessible') + assert rc_am.fail_json.called + args, kwargs = rc_am.fail_json.call_args + assert kwargs['rc'] == errno.EPERM + + +class TestRunCommandPrompt: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_prompt_bad_regex(self, rc_am): + with pytest.raises(SystemExit): + rc_am.run_command('foo', prompt_regex='[pP)assword:') + assert rc_am.fail_json.called + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_prompt_no_match(self, mocker, rc_am): + rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'hello') + (rc, _, _) = rc_am.run_command('foo', prompt_regex='[pP]assword:') + assert rc == 0 + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_prompt_match_wo_data(self, mocker, rc_am): + rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'Authentication required!\nEnter password: ') + (rc, _, _) = rc_am.run_command('foo', prompt_regex=r'[pP]assword:', data=None) + assert rc == 257 + + +class TestRunCommandRc: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_check_rc_false(self, rc_am): + rc_am._subprocess.Popen.return_value.returncode = 1 + (rc, _, _) = rc_am.run_command('/bin/false', check_rc=False) + assert rc == 1 + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_check_rc_true(self, rc_am): + rc_am._subprocess.Popen.return_value.returncode = 1 + with pytest.raises(SystemExit): + rc_am.run_command('/bin/false', check_rc=True) + assert rc_am.fail_json.called + args, kwargs = rc_am.fail_json.call_args + assert kwargs['rc'] == 1 + + +class TestRunCommandOutput: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_text_stdin(self, rc_am): + (rc, stdout, stderr) = rc_am.run_command('/bin/foo', data='hello world') + assert rc_am._subprocess.Popen.return_value.stdin.getvalue() == b'hello world\n' + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_ascii_stdout(self, mocker, rc_am): + rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'hello') + (rc, stdout, stderr) = rc_am.run_command('/bin/cat hello.txt') + assert rc == 0 # module_utils function. On py3 it returns text and py2 it returns # bytes because it's returning native strings - if PY3: - self.assertEqual(stdout, u'hello') - else: - self.assertEqual(stdout, b'hello') - - def test_utf8_output(self): - self.cmd_out[sentinel.stdout] = BytesIO(u'Žarn§'.encode('utf-8')) - self.cmd_out[sentinel.stderr] = BytesIO(u'لرئيسية'.encode('utf-8')) - (rc, stdout, stderr) = self.module.run_command('/bin/something_ugly') - self.assertEqual(rc, 0) + assert stdout == 'hello' + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_utf8_output(self, mocker, rc_am): + rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(u'Žarn§'.encode('utf-8')) + rc_am._os._cmd_out[mocker.sentinel.stderr] = BytesIO(u'لرئيسية'.encode('utf-8')) + (rc, stdout, stderr) = rc_am.run_command('/bin/something_ugly') + assert rc == 0 # module_utils function. On py3 it returns text and py2 it returns # bytes because it's returning native strings - if PY3: - self.assertEqual(stdout, u'Žarn§') - self.assertEqual(stderr, u'لرئيسية') - else: - self.assertEqual(stdout.decode('utf-8'), u'Žarn§') - self.assertEqual(stderr.decode('utf-8'), u'لرئيسية') + assert stdout == to_native(u'Žarn§') + assert stderr == to_native(u'لرئيسية') diff --git a/test/units/module_utils/basic/test_safe_eval.py b/test/units/module_utils/basic/test_safe_eval.py index cba025755e7..a19266eba6b 100644 --- a/test/units/module_utils/basic/test_safe_eval.py +++ b/test/units/module_utils/basic/test_safe_eval.py @@ -1,99 +1,70 @@ # -*- coding: utf-8 -*- -# (c) 2015-2016, Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# (c) 2015-2017, Toshio Kuratomi +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish from __future__ import (absolute_import, division) __metaclass__ = type -import sys -import json - -from ansible.compat.tests import unittest -from units.mock.procenv import ModuleTestCase -from units.mock.generator import add_method +from itertools import chain +import pytest # Strings that should be converted into a typed value VALID_STRINGS = ( - [("'a'", 'a')], - [("'1'", '1')], - [("1", 1)], - [("True", True)], - [("False", False)], - [("{}", {})], + ("'a'", 'a'), + ("'1'", '1'), + ("1", 1), + ("True", True), + ("False", False), + ("{}", {}), ) # Passing things that aren't strings should just return the object NONSTRINGS = ( - [({'a': 1}, {'a': 1})], + ({'a': 1}, {'a': 1}), ) # These strings are not basic types. For security, these should not be # executed. We return the same string and get an exception for some INVALID_STRINGS = ( - [("a=1", "a=1", SyntaxError)], - [("a.foo()", "a.foo()", None)], - [("import foo", "import foo", None)], - [("__import__('foo')", "__import__('foo')", ValueError)], + ("a=1", "a=1", SyntaxError), + ("a.foo()", "a.foo()", None), + ("import foo", "import foo", None), + ("__import__('foo')", "__import__('foo')", ValueError), ) -def _check_simple_types(self, code, expected): +@pytest.mark.parametrize('code, expected, stdin', + ((c, e, {}) for c, e in chain(VALID_STRINGS, NONSTRINGS)), + indirect=['stdin']) +def test_simple_types(am, code, expected): # test some basic usage for various types - self.assertEqual(self.am.safe_eval(code), expected) + assert am.safe_eval(code) == expected -def _check_simple_types_with_exceptions(self, code, expected): +@pytest.mark.parametrize('code, expected, stdin', + ((c, e, {}) for c, e in chain(VALID_STRINGS, NONSTRINGS)), + indirect=['stdin']) +def test_simple_types_with_exceptions(am, code, expected): # Test simple types with exceptions requested - self.assertEqual(self.am.safe_eval(code, include_exceptions=True), (expected, None)) - - -def _check_invalid_strings(self, code, expected): - self.assertEqual(self.am.safe_eval(code), expected) - - -def _check_invalid_strings_with_exceptions(self, code, expected, exception): - res = self.am.safe_eval("a=1", include_exceptions=True) - self.assertEqual(res[0], "a=1") - self.assertEqual(type(res[1]), SyntaxError) - - -@add_method(_check_simple_types, *VALID_STRINGS) -@add_method(_check_simple_types, *NONSTRINGS) -@add_method(_check_simple_types_with_exceptions, *VALID_STRINGS) -@add_method(_check_simple_types_with_exceptions, *NONSTRINGS) -@add_method(_check_invalid_strings, *[[i[0][0:-1]] for i in INVALID_STRINGS]) -@add_method(_check_invalid_strings_with_exceptions, *INVALID_STRINGS) -class TestSafeEval(ModuleTestCase): - - def setUp(self): - super(TestSafeEval, self).setUp() - - from ansible.module_utils import basic - self.old_ansible_args = basic._ANSIBLE_ARGS - - basic._ANSIBLE_ARGS = None - self.am = basic.AnsibleModule( - argument_spec=dict(), - ) - - def tearDown(self): - super(TestSafeEval, self).tearDown() - - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = self.old_ansible_args + assert am.safe_eval(code, include_exceptions=True), (expected, None) + + +@pytest.mark.parametrize('code, expected, stdin', + ((c, e, {}) for c, e, dummy in INVALID_STRINGS), + indirect=['stdin']) +def test_invalid_strings(am, code, expected): + assert am.safe_eval(code) == expected + + +@pytest.mark.parametrize('code, expected, exception, stdin', + ((c, e, ex, {}) for c, e, ex in INVALID_STRINGS), + indirect=['stdin']) +def test_invalid_strings_with_exceptions(am, code, expected, exception): + res = am.safe_eval(code, include_exceptions=True) + assert res[0] == expected + if exception is None: + assert res[1] == exception + else: + assert type(res[1]) == exception diff --git a/test/units/module_utils/basic/test_set_mode_if_different.py b/test/units/module_utils/basic/test_set_mode_if_different.py index ac74a6f06ee..a184e3a6447 100644 --- a/test/units/module_utils/basic/test_set_mode_if_different.py +++ b/test/units/module_utils/basic/test_set_mode_if_different.py @@ -1,144 +1,147 @@ # -*- coding: utf-8 -*- # (c) 2016, Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish from __future__ import (absolute_import, division, print_function) __metaclass__ = type import os -import pytest +from itertools import product try: import builtins except ImportError: import __builtin__ as builtins -from ansible.compat.tests import unittest -from ansible.compat.tests.mock import patch, MagicMock -from ansible.module_utils import known_hosts - -from units.mock.procenv import ModuleTestCase -from units.mock.generator import add_method +import pytest -class TestSetModeIfDifferentBase(ModuleTestCase): +SYNONYMS_0660 = ( + 0o660, + '0o660', + '660', + 'u+rw-x,g+rw-x,o-rwx', + 'u=rw,g=rw,o-rwx', +) - def setUp(self): - self.mock_stat1 = MagicMock() - self.mock_stat1.st_mode = 0o444 - self.mock_stat2 = MagicMock() - self.mock_stat2.st_mode = 0o660 - super(TestSetModeIfDifferentBase, self).setUp() - from ansible.module_utils import basic - self.old_ANSIBLE_ARGS = basic._ANSIBLE_ARGS - basic._ANSIBLE_ARGS = None +@pytest.fixture +def mock_stats(mocker): + mock_stat1 = mocker.MagicMock() + mock_stat1.st_mode = 0o444 + mock_stat2 = mocker.MagicMock() + mock_stat2.st_mode = 0o660 + yield {"before": mock_stat1, "after": mock_stat2} - self.am = basic.AnsibleModule( - argument_spec=dict(), - ) - def tearDown(self): - super(TestSetModeIfDifferentBase, self).tearDown() - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = self.old_ANSIBLE_ARGS +@pytest.fixture +def am_check_mode(am): + am.check_mode = True + yield am + am.check_mode = False -def _check_no_mode_given_returns_previous_changes(self, previous_changes=True): - with patch('os.lstat', side_effect=[self.mock_stat1]): +@pytest.fixture +def mock_lchmod(mocker): + m_lchmod = mocker.patch('ansible.module_utils.basic.os.lchmod', return_value=None, create=True) + yield m_lchmod - self.assertEqual(self.am.set_mode_if_different('/path/to/file', None, previous_changes), previous_changes) +@pytest.mark.parametrize('previous_changes, check_mode, stdin', + product((True, False), (True, False), ({},)), + indirect=['stdin']) +def test_no_mode_given_returns_previous_changes(am, mock_stats, mock_lchmod, mocker, previous_changes, check_mode): + am.check_mode = check_mode + mocker.patch('os.lstat', side_effect=[mock_stats['before']]) + m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True) -def _check_mode_changed_to_0660(self, mode): - # Note: This is for checking that all the different ways of specifying - # 0660 mode work. It cannot be used to check that setting a mode that is - # not equivalent to 0660 works. - with patch('os.lstat', side_effect=[self.mock_stat1, self.mock_stat2, self.mock_stat2]) as m_lstat: - with patch('os.lchmod', return_value=None, create=True) as m_lchmod: - self.assertEqual(self.am.set_mode_if_different('/path/to/file', mode, False), True) - m_lchmod.assert_called_with(b'/path/to/file', 0o660) + assert am.set_mode_if_different('/path/to/file', None, previous_changes) == previous_changes + assert not m_lchmod.called -def _check_mode_unchanged_when_already_0660(self, mode): +@pytest.mark.parametrize('mode, check_mode, stdin', + product(SYNONYMS_0660, (True, False), ({},)), + indirect=['stdin']) +def test_mode_changed_to_0660(am, mock_stats, mocker, mode, check_mode): # Note: This is for checking that all the different ways of specifying # 0660 mode work. It cannot be used to check that setting a mode that is # not equivalent to 0660 works. - with patch('os.lstat', side_effect=[self.mock_stat2, self.mock_stat2, self.mock_stat2]) as m_lstat: - self.assertEqual(self.am.set_mode_if_different('/path/to/file', mode, False), False) - + am.check_mode = check_mode + mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after'], mock_stats['after']]) + m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True) -SYNONYMS_0660 = ( - [[0o660]], - [['0o660']], - [['660']], -) + assert am.set_mode_if_different('/path/to/file', mode, False) + if check_mode: + assert not m_lchmod.called + else: + m_lchmod.assert_called_with(b'/path/to/file', 0o660) -@add_method(_check_no_mode_given_returns_previous_changes, [dict(previous_changes=True)], [dict(previous_changes=False)], ) -@add_method(_check_mode_changed_to_0660, *SYNONYMS_0660) -@add_method(_check_mode_unchanged_when_already_0660, *SYNONYMS_0660) -class TestSetModeIfDifferent(TestSetModeIfDifferentBase): - def test_module_utils_basic_ansible_module_set_mode_if_different(self): - with patch('os.lstat') as m: - with patch('os.lchmod', return_value=None, create=True) as m_os: - m.side_effect = [self.mock_stat1, self.mock_stat2, self.mock_stat2] - self.am._symbolic_mode_to_octal = MagicMock(side_effect=Exception) - with pytest.raises(SystemExit): - self.am.set_mode_if_different('/path/to/file', 'o+w,g+w,a-r', False) - - original_hasattr = hasattr - - def _hasattr(obj, name): - if obj == os and name == 'lchmod': - return False - return original_hasattr(obj, name) - - # FIXME: this isn't working yet - with patch('os.lstat', side_effect=[self.mock_stat1, self.mock_stat2]): - with patch.object(builtins, 'hasattr', side_effect=_hasattr): - with patch('os.path.islink', return_value=False): - with patch('os.chmod', return_value=None) as m_chmod: - self.assertEqual(self.am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False), True) - with patch('os.lstat', side_effect=[self.mock_stat1, self.mock_stat2]): - with patch.object(builtins, 'hasattr', side_effect=_hasattr): - with patch('os.path.islink', return_value=True): - with patch('os.chmod', return_value=None) as m_chmod: - with patch('os.stat', return_value=self.mock_stat2): - self.assertEqual(self.am.set_mode_if_different('/path/to/file', 0o660, False), True) - - -def _check_knows_to_change_to_0660_in_check_mode(self, mode): +@pytest.mark.parametrize('mode, check_mode, stdin', + product(SYNONYMS_0660, (True, False), ({},)), + indirect=['stdin']) +def test_mode_unchanged_when_already_0660(am, mock_stats, mocker, mode, check_mode): # Note: This is for checking that all the different ways of specifying # 0660 mode work. It cannot be used to check that setting a mode that is # not equivalent to 0660 works. - with patch('os.lstat', side_effect=[self.mock_stat1, self.mock_stat2, self.mock_stat2]) as m_lstat: - self.assertEqual(self.am.set_mode_if_different('/path/to/file', mode, False), True) - - -@add_method(_check_no_mode_given_returns_previous_changes, [dict(previous_changes=True)], [dict(previous_changes=False)],) -@add_method(_check_knows_to_change_to_0660_in_check_mode, *SYNONYMS_0660) -@add_method(_check_mode_unchanged_when_already_0660, *SYNONYMS_0660) -class TestSetModeIfDifferentWithCheckMode(TestSetModeIfDifferentBase): - def setUp(self): - super(TestSetModeIfDifferentWithCheckMode, self).setUp() - self.am.check_mode = True - - def tearDown(self): - super(TestSetModeIfDifferentWithCheckMode, self).tearDown() - self.am.check_mode = False + am.check_mode = check_mode + mocker.patch('os.lstat', side_effect=[mock_stats['after'], mock_stats['after'], mock_stats['after']]) + m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True) + + assert not am.set_mode_if_different('/path/to/file', mode, False) + assert not m_lchmod.called + + +@pytest.mark.parametrize('check_mode, stdin', + product((True, False), ({},)), + indirect=['stdin']) +def test_missing_lchmod_is_not_link(am, mock_stats, mocker, check_mode): + """Some platforms have lchmod (*BSD) others do not (Linux)""" + + am.check_mode = check_mode + original_hasattr = hasattr + + def _hasattr(obj, name): + if obj == os and name == 'lchmod': + return False + return original_hasattr(obj, name) + + mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after']]) + mocker.patch.object(builtins, 'hasattr', side_effect=_hasattr) + mocker.patch('os.path.islink', return_value=False) + m_chmod = mocker.patch('os.chmod', return_value=None) + + assert am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False) + if check_mode: + assert not m_chmod.called + else: + m_chmod.assert_called_with(b'/path/to/file/no_lchmod', 0o660) + + +@pytest.mark.parametrize('check_mode, stdin', + product((True, False), ({},)), + indirect=['stdin']) +def test_missing_lchmod_is_link(am, mock_stats, mocker, check_mode): + """Some platforms have lchmod (*BSD) others do not (Linux)""" + + am.check_mode = check_mode + original_hasattr = hasattr + + def _hasattr(obj, name): + if obj == os and name == 'lchmod': + return False + return original_hasattr(obj, name) + + mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after']]) + mocker.patch.object(builtins, 'hasattr', side_effect=_hasattr) + mocker.patch('os.path.islink', return_value=True) + m_chmod = mocker.patch('os.chmod', return_value=None) + mocker.patch('os.stat', return_value=mock_stats['after']) + + assert am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False) + if check_mode: + assert not m_chmod.called + else: + m_chmod.assert_called_with(b'/path/to/file/no_lchmod', 0o660) diff --git a/test/units/module_utils/conftest.py b/test/units/module_utils/conftest.py new file mode 100644 index 00000000000..8da7df81ec2 --- /dev/null +++ b/test/units/module_utils/conftest.py @@ -0,0 +1,65 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import json +import sys +from collections import MutableMapping +from io import BytesIO + +import pytest + +import ansible.module_utils.basic +from ansible.module_utils.six import PY3, string_types +from ansible.module_utils._text import to_bytes + + +@pytest.fixture +def stdin(mocker, request): + old_args = ansible.module_utils.basic._ANSIBLE_ARGS + ansible.module_utils.basic._ANSIBLE_ARGS = None + old_argv = sys.argv + sys.argv = ['ansible_unittest'] + + if isinstance(request.param, string_types): + args = request.param + elif isinstance(request.param, MutableMapping): + if 'ANSIBLE_MODULE_ARGS' not in request.param: + request.param = {'ANSIBLE_MODULE_ARGS': request.param} + args = json.dumps(request.param) + else: + raise Exception('Malformed data to the stdin pytest fixture') + + fake_stdin = BytesIO(to_bytes(args, errors='surrogate_or_strict')) + if PY3: + mocker.patch('ansible.module_utils.basic.sys.stdin', mocker.MagicMock()) + mocker.patch('ansible.module_utils.basic.sys.stdin.buffer', fake_stdin) + else: + mocker.patch('ansible.module_utils.basic.sys.stdin', fake_stdin) + + yield fake_stdin + + ansible.module_utils.basic._ANSIBLE_ARGS = old_args + sys.argv = old_argv + + +@pytest.fixture +def am(stdin, request): + old_args = ansible.module_utils.basic._ANSIBLE_ARGS + ansible.module_utils.basic._ANSIBLE_ARGS = None + old_argv = sys.argv + sys.argv = ['ansible_unittest'] + + argspec = {} + if hasattr(request, 'param'): + if isinstance(request.param, dict): + argspec = request.param + + am = ansible.module_utils.basic.AnsibleModule( + argument_spec=argspec, + ) + am._name = 'ansible_unittest' + + yield am + + ansible.module_utils.basic._ANSIBLE_ARGS = old_args + sys.argv = old_argv diff --git a/test/units/module_utils/test_distribution_version.py b/test/units/module_utils/test_distribution_version.py index 68dddd9d8bd..06d9162000b 100644 --- a/test/units/module_utils/test_distribution_version.py +++ b/test/units/module_utils/test_distribution_version.py @@ -1,36 +1,18 @@ # -*- coding: utf-8 -*- -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# Copyright: (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -# Make coding more python3-ish -from __future__ import (absolute_import, division) +from __future__ import absolute_import, division, print_function __metaclass__ = type +from itertools import product -# to work around basic.py reading stdin -import json import pytest -from units.mock.procenv import swap_stdin_and_argv - -# for testing -from ansible.compat.tests.mock import patch - -# the module we are actually testing (sort of +# the module we are actually testing (sort of) from ansible.module_utils.facts.system.distribution import DistributionFactCollector + # to generate the testcase data, you can use the script gen_distribution_version_testcase.py in hacking/tests TESTSETS = [ { @@ -880,8 +862,8 @@ DISTRIB_DESCRIPTION="CoreOS 976.0.0 (Coeur Rouge)" ] -@pytest.mark.parametrize("testcase", TESTSETS, ids=lambda x: x['name']) -def test_distribution_version(testcase): +@pytest.mark.parametrize("stdin, testcase", product([{}], TESTSETS), ids=lambda x: x['name'], indirect=['stdin']) +def test_distribution_version(am, mocker, testcase): """tests the distribution parsing code of the Facts class testsets have @@ -891,26 +873,10 @@ def test_distribution_version(testcase): * all files that are not listed here are assumed to not exist at all * the output of pythons platform.dist() * results for the ansible variables distribution* and os_family - """ - - from ansible.module_utils import basic - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - module = basic.AnsibleModule(argument_spec=dict()) - - _test_one_distribution(module, testcase) - - -def _test_one_distribution(module, testcase): - """run the test on one distribution testcase - - * prepare some mock functions to get the testdata in - * run Facts() - * compare with the expected output """ + # prepare some mock functions to get the testdata in def mock_get_file_content(fname, default=None, strip=True): """give fake content if it exists, otherwise pretend the file is empty""" data = default @@ -922,7 +888,7 @@ def _test_one_distribution(module, testcase): data = data.strip() return data - def mock_get_uname_version(module): + def mock_get_uname_version(am): return testcase.get('uname_v', None) def mock_file_exists(fname, allow_empty=False): @@ -942,19 +908,19 @@ def _test_one_distribution(module, testcase): def mock_platform_version(): return testcase.get('platform.version', '') - @patch('ansible.module_utils.facts.system.distribution.get_file_content', mock_get_file_content) - @patch('ansible.module_utils.facts.system.distribution.get_uname_version', mock_get_uname_version) - @patch('ansible.module_utils.facts.system.distribution._file_exists', mock_file_exists) - @patch('platform.dist', lambda: testcase['platform.dist']) - @patch('platform.system', mock_platform_system) - @patch('platform.release', mock_platform_release) - @patch('platform.version', mock_platform_version) - def get_facts(testcase): - distro_collector = DistributionFactCollector() - res = distro_collector.collect(module) - return res + mocker.patch('ansible.module_utils.facts.system.distribution.get_file_content', mock_get_file_content) + mocker.patch('ansible.module_utils.facts.system.distribution.get_uname_version', mock_get_uname_version) + mocker.patch('ansible.module_utils.facts.system.distribution._file_exists', mock_file_exists) + mocker.patch('platform.dist', lambda: testcase['platform.dist']) + mocker.patch('platform.system', mock_platform_system) + mocker.patch('platform.release', mock_platform_release) + mocker.patch('platform.version', mock_platform_version) + + # run Facts() + distro_collector = DistributionFactCollector() + generated_facts = distro_collector.collect(am) - generated_facts = get_facts(testcase) + # compare with the expected output # testcase['result'] has a list of variables and values it expects Facts() to set for key, val in testcase['result'].items(): diff --git a/test/units/module_utils/test_known_hosts.py b/test/units/module_utils/test_known_hosts.py index 7584736dbbd..b9271606958 100644 --- a/test/units/module_utils/test_known_hosts.py +++ b/test/units/module_utils/test_known_hosts.py @@ -1,135 +1,119 @@ # -*- coding: utf-8 -*- # (c) 2015, Michael Scherer -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type import json import os.path -import ansible.module_utils.basic -from ansible.compat.tests import unittest -from ansible.compat.tests.mock import Mock, patch +import pytest + from ansible.module_utils import known_hosts -from units.mock.procenv import swap_stdin_and_argv - - -class TestAnsibleModuleKnownHosts(unittest.TestCase): - urls = { - 'ssh://one.example.org/example.git': { - 'is_ssh_url': True, - 'get_fqdn': 'one.example.org', - 'add_host_key_cmd': " -t rsa one.example.org", - 'port': None, - }, - 'ssh+git://two.example.org/example.git': { - 'is_ssh_url': True, - 'get_fqdn': 'two.example.org', - 'add_host_key_cmd': " -t rsa two.example.org", - 'port': None, - }, - 'rsync://three.example.org/user/example.git': { - 'is_ssh_url': False, - 'get_fqdn': 'three.example.org', - 'add_host_key_cmd': None, # not called for non-ssh urls - 'port': None, - }, - 'git@four.example.org:user/example.git': { - 'is_ssh_url': True, - 'get_fqdn': 'four.example.org', - 'add_host_key_cmd': " -t rsa four.example.org", - 'port': None, - }, - 'git+ssh://five.example.org/example.git': { - 'is_ssh_url': True, - 'get_fqdn': 'five.example.org', - 'add_host_key_cmd': " -t rsa five.example.org", - 'port': None, - }, - 'ssh://six.example.org:21/example.org': { - # ssh on FTP Port? - 'is_ssh_url': True, - 'get_fqdn': 'six.example.org', - 'add_host_key_cmd': " -t rsa -p 21 six.example.org", - 'port': '21', - }, - 'ssh://[2001:DB8::abcd:abcd]/example.git': { - 'is_ssh_url': True, - 'get_fqdn': '[2001:DB8::abcd:abcd]', - 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", - 'port': None, - }, - 'ssh://[2001:DB8::abcd:abcd]:22/example.git': { - 'is_ssh_url': True, - 'get_fqdn': '[2001:DB8::abcd:abcd]', - 'add_host_key_cmd': " -t rsa -p 22 [2001:DB8::abcd:abcd]", - 'port': '22', - }, - 'username@[2001:DB8::abcd:abcd]/example.git': { - 'is_ssh_url': True, - 'get_fqdn': '[2001:DB8::abcd:abcd]', - 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", - 'port': None, - }, - 'username@[2001:DB8::abcd:abcd]:path/example.git': { - 'is_ssh_url': True, - 'get_fqdn': '[2001:DB8::abcd:abcd]', - 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", - 'port': None, - }, - 'ssh://internal.git.server:7999/repos/repo.git': { - 'is_ssh_url': True, - 'get_fqdn': 'internal.git.server', - 'add_host_key_cmd': " -t rsa -p 7999 internal.git.server", - 'port': '7999', - }, - } - - def test_is_ssh_url(self): - for u in self.urls: - self.assertEqual(known_hosts.is_ssh_url(u), self.urls[u]['is_ssh_url']) - - def test_get_fqdn_and_port(self): - for u in self.urls: - self.assertEqual(known_hosts.get_fqdn_and_port(u), (self.urls[u]['get_fqdn'], self.urls[u]['port'])) - - def test_add_host_key(self): - - # Copied - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually - - with swap_stdin_and_argv(stdin_data=args): - ansible.module_utils.basic._ANSIBLE_ARGS = None - self.module = ansible.module_utils.basic.AnsibleModule(argument_spec=dict()) - - get_bin_path = Mock() - get_bin_path.return_value = keyscan_cmd = "/custom/path/ssh-keyscan" - self.module.get_bin_path = get_bin_path - - run_command = Mock() - run_command.return_value = (0, "Needs output, otherwise thinks ssh-keyscan timed out'", "") - self.module.run_command = run_command - - append_to_file = Mock() - append_to_file.return_value = (None,) - self.module.append_to_file = append_to_file - - with patch('os.path.isdir', return_value=True): - with patch('os.path.exists', return_value=True): - for u in self.urls: - if self.urls[u]['is_ssh_url']: - known_hosts.add_host_key(self.module, self.urls[u]['get_fqdn'], port=self.urls[u]['port']) - run_command.assert_called_with(keyscan_cmd + self.urls[u]['add_host_key_cmd']) + + +URLS = { + 'ssh://one.example.org/example.git': { + 'is_ssh_url': True, + 'get_fqdn': 'one.example.org', + 'add_host_key_cmd': " -t rsa one.example.org", + 'port': None, + }, + 'ssh+git://two.example.org/example.git': { + 'is_ssh_url': True, + 'get_fqdn': 'two.example.org', + 'add_host_key_cmd': " -t rsa two.example.org", + 'port': None, + }, + 'rsync://three.example.org/user/example.git': { + 'is_ssh_url': False, + 'get_fqdn': 'three.example.org', + 'add_host_key_cmd': None, # not called for non-ssh urls + 'port': None, + }, + 'git@four.example.org:user/example.git': { + 'is_ssh_url': True, + 'get_fqdn': 'four.example.org', + 'add_host_key_cmd': " -t rsa four.example.org", + 'port': None, + }, + 'git+ssh://five.example.org/example.git': { + 'is_ssh_url': True, + 'get_fqdn': 'five.example.org', + 'add_host_key_cmd': " -t rsa five.example.org", + 'port': None, + }, + 'ssh://six.example.org:21/example.org': { + # ssh on FTP Port? + 'is_ssh_url': True, + 'get_fqdn': 'six.example.org', + 'add_host_key_cmd': " -t rsa -p 21 six.example.org", + 'port': '21', + }, + 'ssh://[2001:DB8::abcd:abcd]/example.git': { + 'is_ssh_url': True, + 'get_fqdn': '[2001:DB8::abcd:abcd]', + 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", + 'port': None, + }, + 'ssh://[2001:DB8::abcd:abcd]:22/example.git': { + 'is_ssh_url': True, + 'get_fqdn': '[2001:DB8::abcd:abcd]', + 'add_host_key_cmd': " -t rsa -p 22 [2001:DB8::abcd:abcd]", + 'port': '22', + }, + 'username@[2001:DB8::abcd:abcd]/example.git': { + 'is_ssh_url': True, + 'get_fqdn': '[2001:DB8::abcd:abcd]', + 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", + 'port': None, + }, + 'username@[2001:DB8::abcd:abcd]:path/example.git': { + 'is_ssh_url': True, + 'get_fqdn': '[2001:DB8::abcd:abcd]', + 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", + 'port': None, + }, + 'ssh://internal.git.server:7999/repos/repo.git': { + 'is_ssh_url': True, + 'get_fqdn': 'internal.git.server', + 'add_host_key_cmd': " -t rsa -p 7999 internal.git.server", + 'port': '7999', + }, +} + + +@pytest.mark.parametrize('url, is_ssh_url', ((k, v['is_ssh_url']) for k, v in URLS.items())) +def test_is_ssh_url(url, is_ssh_url): + assert known_hosts.is_ssh_url(url) == is_ssh_url + + +@pytest.mark.parametrize('url, fqdn, port', ((k, v['get_fqdn'], v['port']) for k, v in URLS.items())) +def test_get_fqdn_and_port(url, fqdn, port): + assert known_hosts.get_fqdn_and_port(url) == (fqdn, port) + + +@pytest.mark.parametrize('fqdn, port, add_host_key_cmd, stdin', + ((v['get_fqdn'], v['port'], v['add_host_key_cmd'], {}) + for v in URLS.values() if v['is_ssh_url']), + indirect=['stdin']) +def test_add_host_key(am, mocker, fqdn, port, add_host_key_cmd): + get_bin_path = mocker.MagicMock() + get_bin_path.return_value = keyscan_cmd = "/custom/path/ssh-keyscan" + am.get_bin_path = get_bin_path + + run_command = mocker.MagicMock() + run_command.return_value = (0, "Needs output, otherwise thinks ssh-keyscan timed out'", "") + am.run_command = run_command + + append_to_file = mocker.MagicMock() + append_to_file.return_value = (None,) + am.append_to_file = append_to_file + + mocker.patch('os.path.isdir', return_value=True) + mocker.patch('os.path.exists', return_value=True) + + known_hosts.add_host_key(am, fqdn, port=port) + run_command.assert_called_with(keyscan_cmd + add_host_key_cmd) diff --git a/test/units/module_utils/test_postgresql.py b/test/units/module_utils/test_postgresql.py index 481b312a531..7823cf92ab9 100644 --- a/test/units/module_utils/test_postgresql.py +++ b/test/units/module_utils/test_postgresql.py @@ -1,13 +1,13 @@ -import json +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + import sys from ansible.compat.tests import unittest from ansible.compat.tests.mock import patch, MagicMock -from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.six.moves import builtins from ansible.module_utils._text import to_native -from units.mock.procenv import swap_stdin_and_argv import pprint diff --git a/test/units/module_utils/test_text.py b/test/units/module_utils/test_text.py index 4997a183769..492aa3e9443 100644 --- a/test/units/module_utils/test_text.py +++ b/test/units/module_utils/test_text.py @@ -1,34 +1,23 @@ # -*- coding: utf-8 -*- # (c) 2016 Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish from __future__ import (absolute_import, division) __metaclass__ = type -from ansible.compat.tests import unittest -from ansible.module_utils.six import PY3 -from units.mock.generator import add_method +import itertools + +import pytest +from ansible.module_utils.six import PY3 # Internal API while this is still being developed. Eventually move to -# module_utils.text +# module_utils.common.text from ansible.module_utils._text import to_text, to_bytes, to_native + # Format: byte representation, text representation, encoding of byte representation VALID_STRINGS = ( (b'abcde', u'abcde', 'ascii'), @@ -40,29 +29,25 @@ VALID_STRINGS = ( ) -def _check_to_text(self, in_string, encoding, expected): +@pytest.mark.parametrize('in_string, encoding, expected', + itertools.chain(((d[0], d[2], d[1]) for d in VALID_STRINGS), + ((d[1], d[2], d[1]) for d in VALID_STRINGS))) +def test_to_text(in_string, encoding, expected): """test happy path of decoding to text""" - self.assertEqual(to_text(in_string, encoding), expected) + assert to_text(in_string, encoding) == expected -def _check_to_bytes(self, in_string, encoding, expected): +@pytest.mark.parametrize('in_string, encoding, expected', + itertools.chain(((d[0], d[2], d[0]) for d in VALID_STRINGS), + ((d[1], d[2], d[0]) for d in VALID_STRINGS))) +def test_to_bytes(in_string, encoding, expected): """test happy path of encoding to bytes""" - self.assertEqual(to_bytes(in_string, encoding), expected) + assert to_bytes(in_string, encoding) == expected -def _check_to_native(self, in_string, encoding, py2_expected, py3_expected): +@pytest.mark.parametrize('in_string, encoding, expected', + itertools.chain(((d[0], d[2], d[1] if PY3 else d[0]) for d in VALID_STRINGS), + ((d[1], d[2], d[1] if PY3 else d[0]) for d in VALID_STRINGS))) +def test_to_native(in_string, encoding, expected): """test happy path of encoding to native strings""" - if PY3: - self.assertEqual(to_native(in_string, encoding), py3_expected) - else: - self.assertEqual(to_native(in_string, encoding), py2_expected) - - -@add_method(_check_to_text, [(i[0], i[2], i[1]) for i in VALID_STRINGS]) -@add_method(_check_to_text, [(i[1], i[2], i[1]) for i in VALID_STRINGS]) -@add_method(_check_to_bytes, [(i[0], i[2], i[0]) for i in VALID_STRINGS]) -@add_method(_check_to_bytes, [(i[1], i[2], i[0]) for i in VALID_STRINGS]) -@add_method(_check_to_native, [(i[0], i[2], i[0], i[1]) for i in VALID_STRINGS]) -@add_method(_check_to_native, [(i[1], i[2], i[0], i[1]) for i in VALID_STRINGS]) -class TestModuleUtilsText(unittest.TestCase): - pass + assert to_native(in_string, encoding) == expected diff --git a/test/units/modules/conftest.py b/test/units/modules/conftest.py new file mode 100644 index 00000000000..68dc0cbedf3 --- /dev/null +++ b/test/units/modules/conftest.py @@ -0,0 +1,24 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import json +from collections import MutableMapping + +import pytest + +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_bytes + + +@pytest.fixture +def patch_ansible_module(request, mocker): + if isinstance(request.param, string_types): + args = request.param + elif isinstance(request.param, MutableMapping): + if 'ANSIBLE_MODULE_ARGS' not in request.param: + request.param = {'ANSIBLE_MODULE_ARGS': request.param} + args = json.dumps(request.param) + else: + raise Exception('Malformed data to the patch_ansible_module pytest fixture') + + mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args)) diff --git a/test/units/modules/packaging/language/test_pip.py b/test/units/modules/packaging/language/test_pip.py index 92e3661ae58..47c5feba196 100644 --- a/test/units/modules/packaging/language/test_pip.py +++ b/test/units/modules/packaging/language/test_pip.py @@ -1,23 +1,24 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) import json -from ansible.compat.tests import unittest -from ansible.compat.tests.mock import patch -from ansible.module_utils import basic +import pytest from ansible.modules.packaging.language import pip -from units.modules.utils import set_module_args, AnsibleFailJson, ModuleTestCase +pytestmark = pytest.mark.usefixtures('patch_ansible_module') -class TestPip(ModuleTestCase): - def setUp(self): - super(TestPip, self).setUp() - @patch.object(basic.AnsibleModule, 'get_bin_path') - def test_failure_when_pip_absent(self, mock_get_bin_path): +@pytest.mark.parametrize('patch_ansible_module', [{'name': 'six'}], indirect=['patch_ansible_module']) +def test_failure_when_pip_absent(mocker, capfd): + get_bin_path = mocker.patch('ansible.module_utils.basic.AnsibleModule.get_bin_path') + get_bin_path.return_value = None - mock_get_bin_path.return_value = None + with pytest.raises(SystemExit): + pip.main() - with self.assertRaises(AnsibleFailJson): - set_module_args({'name': 'six'}) - pip.main() + out, err = capfd.readouterr() + results = json.loads(out) + assert results['failed'] + assert 'pip needs to be installed' in results['msg']