diff --git a/test/units/module_utils/__init__.py b/test/units/module_utils/__init__.py new file mode 100644 index 00000000000..785fc459921 --- /dev/null +++ b/test/units/module_utils/__init__.py @@ -0,0 +1,21 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/test/units/module_utils/test_basic.py b/test/units/module_utils/test_basic.py new file mode 100644 index 00000000000..60f501ba28b --- /dev/null +++ b/test/units/module_utils/test_basic.py @@ -0,0 +1,355 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +#from __future__ import (absolute_import, division, print_function) +from __future__ import (absolute_import, division) +__metaclass__ = type + +import os +import tempfile + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, MagicMock + +from ansible.errors import * +from ansible.executor.module_common import modify_module +from ansible.module_utils.basic import heuristic_log_sanitize +from ansible.utils.hashing import checksum as utils_checksum + +TEST_MODULE_DATA = """ +from ansible.module_utils.basic import * + +def get_module(): + return AnsibleModule( + argument_spec = dict(), + supports_check_mode = True, + no_log = True, + ) + +get_module() + +""" + +class TestModuleUtilsBasic(unittest.TestCase): + + def cleanup_temp_file(self, fd, path): + try: + os.close(fd) + os.remove(path) + except: + pass + + def cleanup_temp_dir(self, path): + try: + os.rmdir(path) + except: + pass + + def setUp(self): + # create a temporary file for the test module + # we're about to generate + self.tmp_fd, self.tmp_path = tempfile.mkstemp() + os.write(self.tmp_fd, TEST_MODULE_DATA) + + # template the module code and eval it + module_data, module_style, shebang = modify_module(self.tmp_path, {}) + + d = {} + exec(module_data, d, d) + self.module = d['get_module']() + + # module_utils/basic.py screws with CWD, let's save it and reset + self.cwd = os.getcwd() + + def tearDown(self): + self.cleanup_temp_file(self.tmp_fd, self.tmp_path) + # Reset CWD back to what it was before basic.py changed it + os.chdir(self.cwd) + + ################################################################################# + # run_command() tests + + # test run_command with a string command + def test_run_command_string(self): + (rc, out, err) = self.module.run_command("/bin/echo -n 'foo bar'") + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo bar') + (rc, out, err) = self.module.run_command("/bin/echo -n 'foo bar'", use_unsafe_shell=True) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo bar') + + # test run_command with an array of args (with both use_unsafe_shell=True|False) + def test_run_command_args(self): + (rc, out, err) = self.module.run_command(['/bin/echo', '-n', "foo bar"]) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo bar') + (rc, out, err) = self.module.run_command(['/bin/echo', '-n', "foo bar"], use_unsafe_shell=True) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo bar') + + # test run_command with leading environment variables + #@raises(SystemExit) + def test_run_command_string_with_env_variables(self): + self.assertRaises(SystemExit, self.module.run_command, 'FOO=bar /bin/echo -n "foo bar"') + + #@raises(SystemExit) + def test_run_command_args_with_env_variables(self): + self.assertRaises(SystemExit, self.module.run_command, ['FOO=bar', '/bin/echo', '-n', 'foo bar']) + + def test_run_command_string_unsafe_with_env_variables(self): + (rc, out, err) = self.module.run_command('FOO=bar /bin/echo -n "foo bar"', use_unsafe_shell=True) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo bar') + + # test run_command with a command pipe (with both use_unsafe_shell=True|False) + def test_run_command_string_unsafe_with_pipe(self): + (rc, out, err) = self.module.run_command('echo "foo bar" | cat', use_unsafe_shell=True) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo bar\n') + + # test run_command with a shell redirect in (with both use_unsafe_shell=True|False) + def test_run_command_string_unsafe_with_redirect_in(self): + (rc, out, err) = self.module.run_command('cat << EOF\nfoo bar\nEOF', use_unsafe_shell=True) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo bar\n') + + # test run_command with a shell redirect out (with both use_unsafe_shell=True|False) + def test_run_command_string_unsafe_with_redirect_out(self): + tmp_fd, tmp_path = tempfile.mkstemp() + try: + (rc, out, err) = self.module.run_command('echo "foo bar" > %s' % tmp_path, use_unsafe_shell=True) + self.assertEqual(rc, 0) + self.assertTrue(os.path.exists(tmp_path)) + checksum = utils_checksum(tmp_path) + self.assertEqual(checksum, 'd53a205a336e07cf9eac45471b3870f9489288ec') + except: + raise + finally: + self.cleanup_temp_file(tmp_fd, tmp_path) + + # test run_command with a double shell redirect out (append) (with both use_unsafe_shell=True|False) + def test_run_command_string_unsafe_with_double_redirect_out(self): + tmp_fd, tmp_path = tempfile.mkstemp() + try: + (rc, out, err) = self.module.run_command('echo "foo bar" >> %s' % tmp_path, use_unsafe_shell=True) + self.assertEqual(rc, 0) + self.assertTrue(os.path.exists(tmp_path)) + checksum = utils_checksum(tmp_path) + self.assertEqual(checksum, 'd53a205a336e07cf9eac45471b3870f9489288ec') + except: + raise + finally: + self.cleanup_temp_file(tmp_fd, tmp_path) + + # test run_command with data + def test_run_command_string_with_data(self): + (rc, out, err) = self.module.run_command('cat', data='foo bar') + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo bar\n') + + # test run_command with binary data + def test_run_command_string_with_binary_data(self): + (rc, out, err) = self.module.run_command('cat', data='\x41\x42\x43\x44', binary_data=True) + self.assertEqual(rc, 0) + self.assertEqual(out, 'ABCD') + + # test run_command with a cwd set + def test_run_command_string_with_cwd(self): + tmp_path = tempfile.mkdtemp() + try: + (rc, out, err) = self.module.run_command('pwd', cwd=tmp_path) + self.assertEqual(rc, 0) + self.assertTrue(os.path.exists(tmp_path)) + self.assertEqual(out.strip(), os.path.realpath(tmp_path)) + except: + raise + finally: + self.cleanup_temp_dir(tmp_path) + + +class TestModuleUtilsBasicHelpers(unittest.TestCase): + ''' Test some implementation details of AnsibleModule + + Some pieces of AnsibleModule are implementation details but they have + potential cornercases that we need to check. Go ahead and test at + this level that the functions are behaving even though their API may + change and we'd have to rewrite these tests so that we know that we + need to check for those problems in any rewrite. + + In the future we might want to restructure higher level code to be + friendlier to unittests so that we can test at the level that the public + is interacting with the APIs. + ''' + + MANY_RECORDS = 7000 + URL_SECRET = 'http://username:pas:word@foo.com/data' + SSH_SECRET = 'username:pas:word@foo.com/data' + + def cleanup_temp_file(self, fd, path): + try: + os.close(fd) + os.remove(path) + except: + pass + + def cleanup_temp_dir(self, path): + try: + os.rmdir(path) + except: + pass + + def _gen_data(self, records, per_rec, top_level, secret_text): + hostvars = {'hostvars': {}} + for i in range(1, records, 1): + host_facts = {'host%s' % i: + {'pstack': + {'running': '875.1', + 'symlinked': '880.0', + 'tars': [], + 'versions': ['885.0']}, + }} + + if per_rec: + host_facts['host%s' % i]['secret'] = secret_text + hostvars['hostvars'].update(host_facts) + if top_level: + hostvars['secret'] = secret_text + return hostvars + + def setUp(self): + self.many_url = repr(self._gen_data(self.MANY_RECORDS, True, True, + self.URL_SECRET)) + self.many_ssh = repr(self._gen_data(self.MANY_RECORDS, True, True, + self.SSH_SECRET)) + self.one_url = repr(self._gen_data(self.MANY_RECORDS, False, True, + self.URL_SECRET)) + self.one_ssh = repr(self._gen_data(self.MANY_RECORDS, False, True, + self.SSH_SECRET)) + self.zero_secrets = repr(self._gen_data(self.MANY_RECORDS, False, + False, '')) + self.few_url = repr(self._gen_data(2, True, True, self.URL_SECRET)) + self.few_ssh = repr(self._gen_data(2, True, True, self.SSH_SECRET)) + + # create a temporary file for the test module + # we're about to generate + self.tmp_fd, self.tmp_path = tempfile.mkstemp() + os.write(self.tmp_fd, TEST_MODULE_DATA) + + # template the module code and eval it + module_data, module_style, shebang = modify_module(self.tmp_path, {}) + + d = {} + exec(module_data, d, d) + self.module = d['get_module']() + + # module_utils/basic.py screws with CWD, let's save it and reset + self.cwd = os.getcwd() + + def tearDown(self): + self.cleanup_temp_file(self.tmp_fd, self.tmp_path) + # Reset CWD back to what it was before basic.py changed it + os.chdir(self.cwd) + + + ################################################################################# + + # + # Speed tests + # + + # Previously, we used regexes which had some pathologically slow cases for + # parameters with large amounts of data with many ':' but no '@'. The + # present function gets slower when there are many replacements so we may + # want to explore regexes in the future (for the speed when substituting + # or flexibility). These speed tests will hopefully tell us if we're + # introducing code that has cases that are simply too slow. + # + # Some regex notes: + # * re.sub() is faster than re.match() + str.join(). + # * We may be able to detect a large number of '@' symbols and then use + # a regex else use the present function. + + #@timed(5) + #def test_log_sanitize_speed_many_url(self): + # heuristic_log_sanitize(self.many_url) + + #@timed(5) + #def test_log_sanitize_speed_many_ssh(self): + # heuristic_log_sanitize(self.many_ssh) + + #@timed(5) + #def test_log_sanitize_speed_one_url(self): + # heuristic_log_sanitize(self.one_url) + + #@timed(5) + #def test_log_sanitize_speed_one_ssh(self): + # heuristic_log_sanitize(self.one_ssh) + + #@timed(5) + #def test_log_sanitize_speed_zero_secrets(self): + # heuristic_log_sanitize(self.zero_secrets) + + # + # Test that the password obfuscation sanitizes somewhat cleanly. + # + + def test_log_sanitize_correctness(self): + url_data = repr(self._gen_data(3, True, True, self.URL_SECRET)) + ssh_data = repr(self._gen_data(3, True, True, self.SSH_SECRET)) + + url_output = heuristic_log_sanitize(url_data) + ssh_output = heuristic_log_sanitize(ssh_data) + + # Basic functionality: Successfully hid the password + try: + self.assertNotIn('pas:word', url_output) + self.assertNotIn('pas:word', ssh_output) + + # Slightly more advanced, we hid all of the password despite the ":" + self.assertNotIn('pas', url_output) + self.assertNotIn('pas', ssh_output) + except AttributeError: + # python2.6 or less's unittest + self.assertFalse('pas:word' in url_output, '%s is present in %s' % ('"pas:word"', url_output)) + self.assertFalse('pas:word' in ssh_output, '%s is present in %s' % ('"pas:word"', ssh_output)) + + self.assertFalse('pas' in url_output, '%s is present in %s' % ('"pas"', url_output)) + self.assertFalse('pas' in ssh_output, '%s is present in %s' % ('"pas"', ssh_output)) + + # In this implementation we replace the password with 8 "*" which is + # also the length of our password. The url fields should be able to + # accurately detect where the password ends so the length should be + # the same: + self.assertEqual(len(url_output), len(url_data)) + + # ssh checking is harder as the heuristic is overzealous in many + # cases. Since the input will have at least one ":" present before + # the password we can tell some things about the beginning and end of + # the data, though: + self.assertTrue(ssh_output.startswith("{'")) + self.assertTrue(ssh_output.endswith("}")) + try: + self.assertIn(":********@foo.com/data'", ssh_output) + except AttributeError: + # python2.6 or less's unittest + self.assertTrue(":********@foo.com/data'" in ssh_output, '%s is not present in %s' % (":********@foo.com/data'", ssh_output)) + + # The overzealous-ness here may lead to us changing the algorithm in + # the future. We could make it consume less of the data (with the + # possibility of leaving partial passwords exposed) and encourage + # people to use no_log instead of relying on this obfuscation. diff --git a/test/units/module_utils/test_database.py b/test/units/module_utils/test_database.py new file mode 100644 index 00000000000..67da0b60e0b --- /dev/null +++ b/test/units/module_utils/test_database.py @@ -0,0 +1,118 @@ +import collections +import mock +import os +import re + +from nose.tools import eq_ +try: + from nose.tools import assert_raises_regexp +except ImportError: + # Python < 2.7 + def assert_raises_regexp(expected, regexp, callable, *a, **kw): + try: + callable(*a, **kw) + except expected as e: + if isinstance(regexp, basestring): + regexp = re.compile(regexp) + if not regexp.search(str(e)): + raise Exception('"%s" does not match "%s"' % + (regexp.pattern, str(e))) + else: + if hasattr(expected,'__name__'): excName = expected.__name__ + else: excName = str(expected) + raise AssertionError("%s not raised" % excName) + +from ansible.module_utils.database import ( + pg_quote_identifier, + SQLParseError, +) + + +# Note: Using nose's generator test cases here so we can't inherit from +# unittest.TestCase +class TestQuotePgIdentifier(object): + + # These are all valid strings + # The results are based on interpreting the identifier as a table name + valid = { + # User quoted + '"public.table"': '"public.table"', + '"public"."table"': '"public"."table"', + '"schema test"."table test"': '"schema test"."table test"', + + # We quote part + 'public.table': '"public"."table"', + '"public".table': '"public"."table"', + 'public."table"': '"public"."table"', + 'schema test.table test': '"schema test"."table test"', + '"schema test".table test': '"schema test"."table test"', + 'schema test."table test"': '"schema test"."table test"', + + # Embedded double quotes + 'table "test"': '"table ""test"""', + 'public."table ""test"""': '"public"."table ""test"""', + 'public.table "test"': '"public"."table ""test"""', + 'schema "test".table': '"schema ""test"""."table"', + '"schema ""test""".table': '"schema ""test"""."table"', + '"""wat"""."""test"""': '"""wat"""."""test"""', + # Sigh, handle these as well: + '"no end quote': '"""no end quote"', + 'schema."table': '"schema"."""table"', + '"schema.table': '"""schema"."table"', + 'schema."table.something': '"schema"."""table"."something"', + + # Embedded dots + '"schema.test"."table.test"': '"schema.test"."table.test"', + '"schema.".table': '"schema."."table"', + '"schema."."table"': '"schema."."table"', + 'schema.".table"': '"schema".".table"', + '"schema".".table"': '"schema".".table"', + '"schema.".".table"': '"schema.".".table"', + # These are valid but maybe not what the user intended + '."table"': '".""table"""', + 'table.': '"table."', + } + + invalid = { + ('test.too.many.dots', 'table'): 'PostgreSQL does not support table with more than 3 dots', + ('"test.too".many.dots', 'database'): 'PostgreSQL does not support database with more than 1 dots', + ('test.too."many.dots"', 'database'): 'PostgreSQL does not support database with more than 1 dots', + ('"test"."too"."many"."dots"', 'database'): "PostgreSQL does not support database with more than 1 dots", + ('"test"."too"."many"."dots"', 'schema'): "PostgreSQL does not support schema with more than 2 dots", + ('"test"."too"."many"."dots"', 'table'): "PostgreSQL does not support table with more than 3 dots", + ('"test"."too"."many"."dots"."for"."column"', 'column'): "PostgreSQL does not support column with more than 4 dots", + ('"table "invalid" double quote"', 'table'): 'User escaped identifiers must escape extra quotes', + ('"schema "invalid"""."table "invalid"', 'table'): 'User escaped identifiers must escape extra quotes', + ('"schema."table"','table'): 'User escaped identifiers must escape extra quotes', + ('"schema".', 'table'): 'Identifier name unspecified or unquoted trailing dot', + } + + def check_valid_quotes(self, identifier, quoted_identifier): + eq_(pg_quote_identifier(identifier, 'table'), quoted_identifier) + + def test_valid_quotes(self): + for identifier in self.valid: + yield self.check_valid_quotes, identifier, self.valid[identifier] + + def check_invalid_quotes(self, identifier, id_type, msg): + assert_raises_regexp(SQLParseError, msg, pg_quote_identifier, *(identifier, id_type)) + + def test_invalid_quotes(self): + for test in self.invalid: + yield self.check_invalid_quotes, test[0], test[1], self.invalid[test] + + def test_how_many_dots(self): + eq_(pg_quote_identifier('role', 'role'), '"role"') + assert_raises_regexp(SQLParseError, "PostgreSQL does not support role with more than 1 dots", pg_quote_identifier, *('role.more', 'role')) + + eq_(pg_quote_identifier('db', 'database'), '"db"') + assert_raises_regexp(SQLParseError, "PostgreSQL does not support database with more than 1 dots", pg_quote_identifier, *('db.more', 'database')) + + eq_(pg_quote_identifier('db.schema', 'schema'), '"db"."schema"') + assert_raises_regexp(SQLParseError, "PostgreSQL does not support schema with more than 2 dots", pg_quote_identifier, *('db.schema.more', 'schema')) + + eq_(pg_quote_identifier('db.schema.table', 'table'), '"db"."schema"."table"') + assert_raises_regexp(SQLParseError, "PostgreSQL does not support table with more than 3 dots", pg_quote_identifier, *('db.schema.table.more', 'table')) + + eq_(pg_quote_identifier('db.schema.table.column', 'column'), '"db"."schema"."table"."column"') + assert_raises_regexp(SQLParseError, "PostgreSQL does not support column with more than 4 dots", pg_quote_identifier, *('db.schema.table.column.more', 'column'))