From 339a02c3847ce41ac8560b3e1f429f8d1d2e88f3 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Wed, 27 May 2015 03:20:54 -0500 Subject: [PATCH] Started reworking module_utils/basic unit tests (v2) --- lib/ansible/module_utils/basic.py | 4 +- test/units/module_utils/test_basic.py | 510 ++++++++++++-------------- 2 files changed, 227 insertions(+), 287 deletions(-) diff --git a/lib/ansible/module_utils/basic.py b/lib/ansible/module_utils/basic.py index 2e4805cb86b..c222bb4d168 100644 --- a/lib/ansible/module_utils/basic.py +++ b/lib/ansible/module_utils/basic.py @@ -930,7 +930,7 @@ class AnsibleModule(object): for check in spec: count = self._count_terms(check) if count > 1: - self.fail_json(msg="parameters are mutually exclusive: %s" % check) + self.fail_json(msg="parameters are mutually exclusive: %s" % (check,)) def _check_required_one_of(self, spec): if spec is None: @@ -948,7 +948,7 @@ class AnsibleModule(object): non_zero = [ c for c in counts if c > 0 ] if len(non_zero) > 0: if 0 in counts: - self.fail_json(msg="parameters are required together: %s" % check) + self.fail_json(msg="parameters are required together: %s" % (check,)) def _check_required_arguments(self): ''' ensure all required arguments are present ''' diff --git a/test/units/module_utils/test_basic.py b/test/units/module_utils/test_basic.py index 60f501ba28b..c3db5138bf2 100644 --- a/test/units/module_utils/test_basic.py +++ b/test/units/module_utils/test_basic.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # (c) 2012-2014, Michael DeHaan # # This file is part of Ansible @@ -16,301 +17,167 @@ # along with Ansible. If not, see . # Make coding more python3-ish -#from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division) __metaclass__ = type -import os -import tempfile +import __builtin__ + +from nose.tools import timed from ansible.compat.tests import unittest from ansible.compat.tests.mock import patch, MagicMock -from ansible.errors import * -from ansible.executor.module_common import modify_module -from ansible.module_utils.basic import heuristic_log_sanitize -from ansible.utils.hashing import checksum as utils_checksum - -TEST_MODULE_DATA = """ -from ansible.module_utils.basic import * - -def get_module(): - return AnsibleModule( - argument_spec = dict(), - supports_check_mode = True, - no_log = True, - ) - -get_module() - -""" - class TestModuleUtilsBasic(unittest.TestCase): - def cleanup_temp_file(self, fd, path): - try: - os.close(fd) - os.remove(path) - except: - pass - - def cleanup_temp_dir(self, path): - try: - os.rmdir(path) - except: - pass - def setUp(self): - # create a temporary file for the test module - # we're about to generate - self.tmp_fd, self.tmp_path = tempfile.mkstemp() - os.write(self.tmp_fd, TEST_MODULE_DATA) - - # template the module code and eval it - module_data, module_style, shebang = modify_module(self.tmp_path, {}) - - d = {} - exec(module_data, d, d) - self.module = d['get_module']() - - # module_utils/basic.py screws with CWD, let's save it and reset - self.cwd = os.getcwd() + pass def tearDown(self): - self.cleanup_temp_file(self.tmp_fd, self.tmp_path) - # Reset CWD back to what it was before basic.py changed it - os.chdir(self.cwd) - - ################################################################################# - # run_command() tests - - # test run_command with a string command - def test_run_command_string(self): - (rc, out, err) = self.module.run_command("/bin/echo -n 'foo bar'") - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - (rc, out, err) = self.module.run_command("/bin/echo -n 'foo bar'", use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - - # test run_command with an array of args (with both use_unsafe_shell=True|False) - def test_run_command_args(self): - (rc, out, err) = self.module.run_command(['/bin/echo', '-n', "foo bar"]) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - (rc, out, err) = self.module.run_command(['/bin/echo', '-n', "foo bar"], use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - - # test run_command with leading environment variables - #@raises(SystemExit) - def test_run_command_string_with_env_variables(self): - self.assertRaises(SystemExit, self.module.run_command, 'FOO=bar /bin/echo -n "foo bar"') - - #@raises(SystemExit) - def test_run_command_args_with_env_variables(self): - self.assertRaises(SystemExit, self.module.run_command, ['FOO=bar', '/bin/echo', '-n', 'foo bar']) - - def test_run_command_string_unsafe_with_env_variables(self): - (rc, out, err) = self.module.run_command('FOO=bar /bin/echo -n "foo bar"', use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - - # test run_command with a command pipe (with both use_unsafe_shell=True|False) - def test_run_command_string_unsafe_with_pipe(self): - (rc, out, err) = self.module.run_command('echo "foo bar" | cat', use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar\n') - - # test run_command with a shell redirect in (with both use_unsafe_shell=True|False) - def test_run_command_string_unsafe_with_redirect_in(self): - (rc, out, err) = self.module.run_command('cat << EOF\nfoo bar\nEOF', use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar\n') - - # test run_command with a shell redirect out (with both use_unsafe_shell=True|False) - def test_run_command_string_unsafe_with_redirect_out(self): - tmp_fd, tmp_path = tempfile.mkstemp() - try: - (rc, out, err) = self.module.run_command('echo "foo bar" > %s' % tmp_path, use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertTrue(os.path.exists(tmp_path)) - checksum = utils_checksum(tmp_path) - self.assertEqual(checksum, 'd53a205a336e07cf9eac45471b3870f9489288ec') - except: - raise - finally: - self.cleanup_temp_file(tmp_fd, tmp_path) - - # test run_command with a double shell redirect out (append) (with both use_unsafe_shell=True|False) - def test_run_command_string_unsafe_with_double_redirect_out(self): - tmp_fd, tmp_path = tempfile.mkstemp() - try: - (rc, out, err) = self.module.run_command('echo "foo bar" >> %s' % tmp_path, use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertTrue(os.path.exists(tmp_path)) - checksum = utils_checksum(tmp_path) - self.assertEqual(checksum, 'd53a205a336e07cf9eac45471b3870f9489288ec') - except: - raise - finally: - self.cleanup_temp_file(tmp_fd, tmp_path) - - # test run_command with data - def test_run_command_string_with_data(self): - (rc, out, err) = self.module.run_command('cat', data='foo bar') - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar\n') - - # test run_command with binary data - def test_run_command_string_with_binary_data(self): - (rc, out, err) = self.module.run_command('cat', data='\x41\x42\x43\x44', binary_data=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'ABCD') - - # test run_command with a cwd set - def test_run_command_string_with_cwd(self): - tmp_path = tempfile.mkdtemp() - try: - (rc, out, err) = self.module.run_command('pwd', cwd=tmp_path) - self.assertEqual(rc, 0) - self.assertTrue(os.path.exists(tmp_path)) - self.assertEqual(out.strip(), os.path.realpath(tmp_path)) - except: - raise - finally: - self.cleanup_temp_dir(tmp_path) - - -class TestModuleUtilsBasicHelpers(unittest.TestCase): - ''' Test some implementation details of AnsibleModule - - Some pieces of AnsibleModule are implementation details but they have - potential cornercases that we need to check. Go ahead and test at - this level that the functions are behaving even though their API may - change and we'd have to rewrite these tests so that we know that we - need to check for those problems in any rewrite. - - In the future we might want to restructure higher level code to be - friendlier to unittests so that we can test at the level that the public - is interacting with the APIs. - ''' - - MANY_RECORDS = 7000 - URL_SECRET = 'http://username:pas:word@foo.com/data' - SSH_SECRET = 'username:pas:word@foo.com/data' - - def cleanup_temp_file(self, fd, path): - try: - os.close(fd) - os.remove(path) - except: + pass + + def test_module_utils_basic_imports(self): + realimport = __builtin__.__import__ + + def _mock_import(name, *args, **kwargs): + if name == 'json': + raise ImportError() + realimport(name, *args, **kwargs) + + with patch.object(__builtin__, '__import__', _mock_import, create=True) as m: + m('ansible.module_utils.basic') + __builtin__.__import__('ansible.module_utils.basic') + + def test_module_utils_basic_get_platform(self): + with patch('platform.system', return_value='foo'): + from ansible.module_utils.basic import get_platform + self.assertEqual(get_platform(), 'foo') + + def test_module_utils_basic_get_distribution(self): + from ansible.module_utils.basic import get_distribution + + with patch('platform.system', return_value='Foo'): + self.assertEqual(get_distribution(), None) + + with patch('platform.system', return_value='Linux'): + with patch('platform.linux_distribution', return_value=("foo", "1", "One")): + self.assertEqual(get_distribution(), "Foo") + + with patch('os.path.isfile', return_value=True): + def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): + if supported_dists != (): + return ("AmazonFooBar", "", "") + else: + return ("", "", "") + + with patch('platform.linux_distribution', side_effect=_dist): + self.assertEqual(get_distribution(), "Amazon") + + def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): + if supported_dists != (): + return ("Bar", "2", "Two") + else: + return ("", "", "") + + with patch('platform.linux_distribution', side_effect=_dist): + self.assertEqual(get_distribution(), "OtherLinux") + + with patch('platform.linux_distribution', side_effect=Exception("boo")): + with patch('platform.dist', return_value=("bar", "2", "Two")): + self.assertEqual(get_distribution(), "Bar") + + def test_module_utils_basic_get_distribution_version(self): + from ansible.module_utils.basic import get_distribution_version + + with patch('platform.system', return_value='Foo'): + self.assertEqual(get_distribution_version(), None) + + with patch('platform.system', return_value='Linux'): + with patch('platform.linux_distribution', return_value=("foo", "1", "One")): + self.assertEqual(get_distribution_version(), "1") + + with patch('os.path.isfile', return_value=True): + def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): + if supported_dists != (): + return ("AmazonFooBar", "2", "") + else: + return ("", "", "") + + with patch('platform.linux_distribution', side_effect=_dist): + self.assertEqual(get_distribution_version(), "2") + + with patch('platform.linux_distribution', side_effect=Exception("boo")): + with patch('platform.dist', return_value=("bar", "3", "Three")): + self.assertEqual(get_distribution_version(), "3") + + def test_module_utils_basic_load_platform_subclass(self): + class LinuxTest: pass - def cleanup_temp_dir(self, path): - try: - os.rmdir(path) - except: - pass - - def _gen_data(self, records, per_rec, top_level, secret_text): - hostvars = {'hostvars': {}} - for i in range(1, records, 1): - host_facts = {'host%s' % i: - {'pstack': - {'running': '875.1', - 'symlinked': '880.0', - 'tars': [], - 'versions': ['885.0']}, - }} - - if per_rec: - host_facts['host%s' % i]['secret'] = secret_text - hostvars['hostvars'].update(host_facts) - if top_level: - hostvars['secret'] = secret_text - return hostvars - - def setUp(self): - self.many_url = repr(self._gen_data(self.MANY_RECORDS, True, True, - self.URL_SECRET)) - self.many_ssh = repr(self._gen_data(self.MANY_RECORDS, True, True, - self.SSH_SECRET)) - self.one_url = repr(self._gen_data(self.MANY_RECORDS, False, True, - self.URL_SECRET)) - self.one_ssh = repr(self._gen_data(self.MANY_RECORDS, False, True, - self.SSH_SECRET)) - self.zero_secrets = repr(self._gen_data(self.MANY_RECORDS, False, - False, '')) - self.few_url = repr(self._gen_data(2, True, True, self.URL_SECRET)) - self.few_ssh = repr(self._gen_data(2, True, True, self.SSH_SECRET)) - - # create a temporary file for the test module - # we're about to generate - self.tmp_fd, self.tmp_path = tempfile.mkstemp() - os.write(self.tmp_fd, TEST_MODULE_DATA) - - # template the module code and eval it - module_data, module_style, shebang = modify_module(self.tmp_path, {}) - - d = {} - exec(module_data, d, d) - self.module = d['get_module']() - - # module_utils/basic.py screws with CWD, let's save it and reset - self.cwd = os.getcwd() - - def tearDown(self): - self.cleanup_temp_file(self.tmp_fd, self.tmp_path) - # Reset CWD back to what it was before basic.py changed it - os.chdir(self.cwd) - - - ################################################################################# - - # - # Speed tests - # - - # Previously, we used regexes which had some pathologically slow cases for - # parameters with large amounts of data with many ':' but no '@'. The - # present function gets slower when there are many replacements so we may - # want to explore regexes in the future (for the speed when substituting - # or flexibility). These speed tests will hopefully tell us if we're - # introducing code that has cases that are simply too slow. - # - # Some regex notes: - # * re.sub() is faster than re.match() + str.join(). - # * We may be able to detect a large number of '@' symbols and then use - # a regex else use the present function. - - #@timed(5) - #def test_log_sanitize_speed_many_url(self): - # heuristic_log_sanitize(self.many_url) - - #@timed(5) - #def test_log_sanitize_speed_many_ssh(self): - # heuristic_log_sanitize(self.many_ssh) - - #@timed(5) - #def test_log_sanitize_speed_one_url(self): - # heuristic_log_sanitize(self.one_url) - - #@timed(5) - #def test_log_sanitize_speed_one_ssh(self): - # heuristic_log_sanitize(self.one_ssh) - - #@timed(5) - #def test_log_sanitize_speed_zero_secrets(self): - # heuristic_log_sanitize(self.zero_secrets) - - # - # Test that the password obfuscation sanitizes somewhat cleanly. - # - - def test_log_sanitize_correctness(self): - url_data = repr(self._gen_data(3, True, True, self.URL_SECRET)) - ssh_data = repr(self._gen_data(3, True, True, self.SSH_SECRET)) + class Foo(LinuxTest): + platform = "Linux" + distribution = None + + class Bar(LinuxTest): + platform = "Linux" + distribution = "Bar" + + from ansible.module_utils.basic import load_platform_subclass + + # match just the platform class, not a specific distribution + with patch('ansible.module_utils.basic.get_platform', return_value="Linux"): + with patch('ansible.module_utils.basic.get_distribution', return_value=None): + self.assertIs(type(load_platform_subclass(LinuxTest)), Foo) + + # match both the distribution and platform class + with patch('ansible.module_utils.basic.get_platform', return_value="Linux"): + with patch('ansible.module_utils.basic.get_distribution', return_value="Bar"): + self.assertIs(type(load_platform_subclass(LinuxTest)), Bar) + + # if neither match, the fallback should be the top-level class + with patch('ansible.module_utils.basic.get_platform', return_value="Foo"): + with patch('ansible.module_utils.basic.get_distribution', return_value=None): + self.assertIs(type(load_platform_subclass(LinuxTest)), LinuxTest) + + def test_module_utils_basic_json_dict_converters(self): + from ansible.module_utils.basic import json_dict_unicode_to_bytes, json_dict_bytes_to_unicode + + test_data = dict( + item1 = u"Fóo", + item2 = [u"Bár", u"Bam"], + item3 = dict(sub1=u"Súb"), + item4 = (u"föo", u"bär", u"©"), + item5 = 42, + ) + res = json_dict_unicode_to_bytes(test_data) + res2 = json_dict_bytes_to_unicode(res) + + self.assertEqual(test_data, res2) + + def test_module_utils_basic_heuristic_log_sanitize(self): + from ansible.module_utils.basic import heuristic_log_sanitize + + URL_SECRET = 'http://username:pas:word@foo.com/data' + SSH_SECRET = 'username:pas:word@foo.com/data' + + def _gen_data(records, per_rec, top_level, secret_text): + hostvars = {'hostvars': {}} + for i in range(1, records, 1): + host_facts = {'host%s' % i: + {'pstack': + {'running': '875.1', + 'symlinked': '880.0', + 'tars': [], + 'versions': ['885.0']}, + }} + if per_rec: + host_facts['host%s' % i]['secret'] = secret_text + hostvars['hostvars'].update(host_facts) + if top_level: + hostvars['secret'] = secret_text + return hostvars + + url_data = repr(_gen_data(3, True, True, URL_SECRET)) + ssh_data = repr(_gen_data(3, True, True, SSH_SECRET)) url_output = heuristic_log_sanitize(url_data) ssh_output = heuristic_log_sanitize(ssh_data) @@ -349,7 +216,80 @@ class TestModuleUtilsBasicHelpers(unittest.TestCase): # python2.6 or less's unittest self.assertTrue(":********@foo.com/data'" in ssh_output, '%s is not present in %s' % (":********@foo.com/data'", ssh_output)) - # The overzealous-ness here may lead to us changing the algorithm in - # the future. We could make it consume less of the data (with the - # possibility of leaving partial passwords exposed) and encourage - # people to use no_log instead of relying on this obfuscation. + + def test_module_utils_basic_ansible_module_creation(self): + from ansible.module_utils import basic + + basic.MODULE_COMPLEX_ARGS = '{}' + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + arg_spec = dict( + foo = dict(required=True), + bar = dict(), + bam = dict(), + baz = dict(), + ) + mut_ex = (('bar', 'bam'),) + req_to = (('bam', 'baz'),) + + # should test ok + basic.MODULE_COMPLEX_ARGS = '{"foo":"hello"}' + am = basic.AnsibleModule( + argument_spec = arg_spec, + mutually_exclusive = mut_ex, + required_together = req_to, + no_log=True, + check_invalid_arguments=False, + add_file_common_args=True, + supports_check_mode=True, + ) + + # fail, because a required param was not specified + basic.MODULE_COMPLEX_ARGS = '{}' + self.assertRaises( + SystemExit, + basic.AnsibleModule, + argument_spec = arg_spec, + mutually_exclusive = mut_ex, + required_together = req_to, + no_log=True, + check_invalid_arguments=False, + add_file_common_args=True, + supports_check_mode=True, + ) + + # fail because of mutually exclusive parameters + basic.MODULE_COMPLEX_ARGS = '{"foo":"hello", "bar": "bad", "bam": "bad"}' + self.assertRaises( + SystemExit, + basic.AnsibleModule, + argument_spec = arg_spec, + mutually_exclusive = mut_ex, + required_together = req_to, + no_log=True, + check_invalid_arguments=False, + add_file_common_args=True, + supports_check_mode=True, + ) + + # fail because a param required due to another param was not specified + basic.MODULE_COMPLEX_ARGS = '{"bam":"bad"}' + self.assertRaises( + SystemExit, + basic.AnsibleModule, + argument_spec = arg_spec, + mutually_exclusive = mut_ex, + required_together = req_to, + no_log=True, + check_invalid_arguments=False, + add_file_common_args=True, + supports_check_mode=True, + ) + + def test_module_utils_basic_get_module_path(self): + from ansible.module_utils.basic import get_module_path + with patch('os.path.realpath', return_value='/path/to/foo/'): + self.assertEqual(get_module_path(), '/path/to/foo') +