You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/test/units/TestUtils.py

946 lines
38 KiB
Python

# -*- coding: utf-8 -*-
import traceback
import unittest
import os
import os.path
import re
import tempfile
import yaml
import passlib.hash
import string
import StringIO
import copy
import tempfile
import shutil
from nose.plugins.skip import SkipTest
from mock import patch
import ansible.utils
import ansible.errors
import ansible.constants as C
import ansible.utils.template as template2
from ansible.module_utils.splitter import split_args
from ansible import __version__
import sys
# Reload the sys module, then set the interpreter-wide default string
# encoding to UTF-8 for everything that runs in this test process.
# NOTE(review): presumably the reload is needed because Python 2 removes
# sys.setdefaultencoding during startup -- confirm before porting; neither
# call exists in this form on Python 3.
reload(sys)
sys.setdefaultencoding("utf8")
class TestUtils(unittest.TestCase):
def _is_fips(self):
try:
data = open('/proc/sys/crypto/fips_enabled').read().strip()
except:
return False
if data != '1':
return False
return True
def test_before_comment(self):
    ''' before_comment() must return the text ahead of a comment marker. Used by INI parser in inventory '''
    # (raw line, text expected back)
    cases = (
        ("before # comment", "before "),
        ("before \# not a comment", "before # not a comment"),
        ("", ""),
        ("#", ""),
    )
    for raw, wanted in cases:
        got = ansible.utils.before_comment(raw)
        self.assertEqual(wanted, got)
#####################################
### check_conditional tests
def test_check_conditional_jinja2_literals(self):
    ''' check_conditional() on literal conditionals with an empty inventory '''
    # see http://jinja.pocoo.org/docs/templates/#literals
    literal_cases = [
        # none
        (None, True),
        ('', True),
        # list
        (['true'], True),
        (['false'], False),
        # non basestring or list
        ({}, {}),
        # boolean
        ('true', True),
        ('false', False),
        ('True', True),
        ('False', False),
        # integer
        ('1', True),
        ('0', False),
        # string, beware, a string is truthy unless empty
        ('"yes"', True),
        ('"no"', True),
        ('""', False),
    ]
    for conditional, outcome in literal_cases:
        result = ansible.utils.check_conditional(conditional, '/', {})
        self.assertEqual(result, outcome)
def test_check_conditional_jinja2_variable_literals(self):
    ''' check_conditional('var', ...) where the inventory value of var is itself a literal '''
    # see http://jinja.pocoo.org/docs/templates/#literals
    value_cases = [
        # boolean
        ('True', True),
        ('true', True),
        ('False', False),
        ('false', False),
        # integer
        ('1', True),
        (1, True),
        ('0', False),
        (0, False),
        # string, beware, a string is truthy unless empty
        ('"yes"', True),
        ('"no"', True),
        ('""', False),
        # Python boolean in Jinja2 expression
        (True, True),
        (False, False),
    ]
    for value, outcome in value_cases:
        result = ansible.utils.check_conditional('var', '/', {'var': value})
        self.assertEqual(result, outcome)
def test_check_conditional_jinja2_expression(self):
    ''' check_conditional() must evaluate comparison expressions given directly '''
    expression_cases = (
        ('1 == 1', {}, True),
        ('bar == 42', {'bar': 42}, True),
        ('bar != 42', {'bar': 42}, False),
    )
    for expression, inventory, outcome in expression_cases:
        result = ansible.utils.check_conditional(expression, '/', inventory)
        self.assertEqual(result, outcome)
def test_check_conditional_jinja2_expression_in_variable(self):
    ''' check_conditional('var', ...) where var holds a comparison expression '''
    inventories = (
        ({'var': '1 == 1'}, True),
        ({'var': 'bar == 42', 'bar': 42}, True),
        ({'var': 'bar != 42', 'bar': 42}, False),
    )
    for inventory, outcome in inventories:
        result = ansible.utils.check_conditional('var', '/', inventory)
        self.assertEqual(result, outcome)
def test_check_conditional_jinja2_unicode(self):
    ''' check_conditional() with non-ASCII text in the conditional and in a variable '''
    quoted_literal = u'"\u00df"'
    self.assertEqual(ansible.utils.check_conditional(quoted_literal, '/', {}), True)

    comparison = u'var == "\u00df"'
    inventory = {'var': u'\u00df'}
    self.assertEqual(ansible.utils.check_conditional(comparison, '/', inventory), True)
#####################################
### key-value parsing
def test_parse_kv_basic(self):
    ''' parse_kv() on plain, quoted and non-ASCII key=value input '''
    parsed = ansible.utils.parse_kv('a=simple b="with space" c="this=that"')
    self.assertEqual(parsed, {'a': 'simple', 'b': 'with space', 'c': 'this=that'})

    parsed = ansible.utils.parse_kv('msg=АБВГД')
    self.assertEqual(parsed, {'msg': 'АБВГД'})
def test_jsonify(self):
    ''' jsonify() output for None, for a small dict, and with format=True (compared with whitespace removed) '''
    self.assertEqual(ansible.utils.jsonify(None), '{}')

    plain = ansible.utils.jsonify(dict(foo='bar', baz=['qux']))
    self.assertEqual(plain, '{"baz": ["qux"], "foo": "bar"}')

    formatted = ansible.utils.jsonify(dict(foo='bar', baz=['qux']), format=True)
    squeezed = "".join(formatted.split())
    expected = u'{"baz":["qux"],"foo":"bar"}'
    self.assertEqual(squeezed, expected)
def test_is_failed(self):
    ''' is_failed() for various rc / failed combinations in a result dict '''
    verdicts = (
        (dict(rc=0), False),
        (dict(rc=1), True),
        (dict(), False),
        (dict(failed=False), False),
        (dict(failed=True), True),
        (dict(failed='True'), True),
        (dict(failed='true'), True),
    )
    for result, outcome in verdicts:
        self.assertEqual(ansible.utils.is_failed(result), outcome)
def test_is_changed(self):
    ''' is_changed() for missing, boolean and string "changed" values '''
    verdicts = (
        (dict(), False),
        (dict(changed=False), False),
        (dict(changed=True), True),
        (dict(changed='True'), True),
        (dict(changed='true'), True),
    )
    for result, outcome in verdicts:
        self.assertEqual(ansible.utils.is_changed(result), outcome)
def test_path_dwim(self):
    ''' path_dwim() with no basedir: this file's path, "~", and a bare file name '''
    # rstrip('c') turns a compiled ".pyc" path back into the ".py" source path
    source_path = __file__.rstrip('c')
    lookups = (
        (__file__, __file__),
        ('~', os.path.expanduser('~')),
        ('TestUtils.py', source_path),
    )
    for given, wanted in lookups:
        self.assertEqual(ansible.utils.path_dwim(None, given), wanted)
def test_path_dwim_relative(self):
    ''' path_dwim_relative() must resolve TestUtils.py under "units" to this file '''
    test_root = os.path.dirname(os.path.dirname(__file__))
    resolved = ansible.utils.path_dwim_relative(__file__, 'units', 'TestUtils.py', test_root)
    self.assertEqual(resolved, __file__.rstrip('c'))
def test_json_loads(self):
    ''' json_loads() must turn a JSON object string into the matching dict '''
    decoded = ansible.utils.json_loads('{"foo": "bar"}')
    self.assertEqual(decoded, dict(foo='bar'))
def test_parse_json(self):
    ''' parse_json() tolerates a leading junk line and raises ValueError on unparsable input '''
    # leading junk
    self.assertEqual(ansible.utils.parse_json('ansible\n{"foo": "bar"}'), dict(foo="bar"))
    # No closing quotation
    # (assertRaises replaces a hand-written try/except/else that printed the
    # result and called traceback.print_exc() with no exception in flight)
    self.assertRaises(ValueError, ansible.utils.parse_json, 'foo=bar "')
    # Failed to parse
    self.assertRaises(ValueError, ansible.utils.parse_json, '{')
def test_parse_yaml(self):
    ''' parse_yaml() on JSON, broken JSON (with and without path_hint) and YAML input '''
    def expect_ansible_error(*args, **kwargs):
        # passes only when parse_yaml raises AnsibleError for these arguments
        try:
            ansible.utils.parse_yaml(*args, **kwargs)
        except ansible.errors.AnsibleError:
            return
        raise AssertionError

    #json
    self.assertEqual(ansible.utils.parse_yaml('{"foo": "bar"}'), dict(foo='bar'))
    # broken json
    expect_ansible_error('{')
    # broken json with path_hint
    expect_ansible_error('{', path_hint='foo')
    # yaml with front-matter
    self.assertEqual(ansible.utils.parse_yaml("---\nfoo: bar"), dict(foo='bar'))
    # yaml no front-matter
    self.assertEqual(ansible.utils.parse_yaml('foo: bar'), dict(foo='bar'))
    # yaml indented first line (See #6348)
    indented = ansible.utils.parse_yaml(' - foo: bar\n baz: qux')
    self.assertEqual(indented, [dict(foo='bar', baz='qux')])
def test_process_common_errors(self):
    ''' process_common_errors() must return the hint that matches each kind of bad line '''
    # (expected fragment of the message, offending line, column)
    checks = (
        # no quote
        ('YAML thought it', 'foo: {{bar}}', 6),
        # extra colon
        ('an extra unquoted colon', 'foo: bar:', 8),
        # match
        ('same kind of quote', 'foo: "{{bar}}"baz', 6),
        ('same kind of quote', "foo: '{{bar}}'baz", 6),
        # unbalanced
        ('We could be wrong', 'foo: "bad" "wolf"', 6),
        ('We could be wrong', "foo: 'bad' 'wolf'", 6),
    )
    for fragment, probline, column in checks:
        report = ansible.utils.process_common_errors('', probline, column)
        self.assertTrue(fragment in report)
def test_process_yaml_error(self):
    ''' process_yaml_error() must raise AnsibleYAMLValidationFailed carrying the expected text '''
    def check(data, path_hint, fragment):
        # Parse the data; if that raises a YAMLError, hand the error to
        # process_yaml_error and inspect the exception it raises in turn.
        try:
            ansible.utils.parse_yaml(data)
        except yaml.YAMLError as exc:
            try:
                ansible.utils.process_yaml_error(exc, data, path_hint)
            except ansible.errors.AnsibleYAMLValidationFailed as e:
                self.assertTrue(fragment in str(e))
            else:
                raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')

    check('foo: bar\n baz: qux', __file__, 'Syntax Error while loading')
    check('foo: bar\n baz: {{qux}}', __file__, 'Syntax Error while loading')
    check('\xFF', __file__, 'Check over')
    check('\xFF', None, 'Could not parse YAML.')
def test_parse_yaml_from_file(self):
    ''' parse_yaml_from_file() on a directory, a plain file, an encrypted file and a broken file '''
    data_dir = os.path.join(os.path.dirname(__file__), 'inventory_test_data')
    test = os.path.join(data_dir, 'common_vars.yml')
    encrypted = os.path.join(data_dir, 'encrypted.yml')
    broken = os.path.join(data_dir, 'broken.yml')

    # a directory is not a loadable file
    try:
        ansible.utils.parse_yaml_from_file(os.path.dirname(__file__))
    except ansible.errors.AnsibleError:
        pass
    else:
        raise AssertionError('Incorrect exception, expected AnsibleError')

    # Load the reference data through a context manager so the handle is
    # closed deterministically instead of being leaked.
    with open(test) as handle:
        expected = yaml.safe_load(handle)
    self.assertEqual(ansible.utils.parse_yaml_from_file(test), expected)

    # second argument is passed positionally, exactly as before
    self.assertEqual(ansible.utils.parse_yaml_from_file(encrypted, 'ansible'), dict(foo='bar'))

    try:
        ansible.utils.parse_yaml_from_file(broken)
    except ansible.errors.AnsibleYAMLValidationFailed as e:
        self.assertTrue('Syntax Error while loading' in str(e))
    else:
        raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
def test_merge_hash(self):
    ''' merge_hash() with a flat override and with a nested override '''
    flat = ansible.utils.merge_hash(dict(foo='bar', baz='qux'), dict(foo='baz'))
    self.assertEqual(flat, dict(foo='baz', baz='qux'))

    nested = ansible.utils.merge_hash(dict(foo=dict(bar='baz')), dict(foo=dict(bar='qux')))
    self.assertEqual(nested, dict(foo=dict(bar='qux')))
def test_md5s(self):
    ''' md5s('ansible') must equal the known digest; skipped when _is_fips() is true '''
    if self._is_fips():
        raise SkipTest('MD5 unavailable on FIPs enabled systems')
    digest = ansible.utils.md5s('ansible')
    self.assertEqual(digest, '640c8a5376aa12fa15cf02130ce239a6')
    # Need a test that causes UnicodeEncodeError See 4221
def test_md5(self):
    ''' md5() of ansible.cfg must equal the known digest, and be None for a missing file '''
    if self._is_fips():
        raise SkipTest('MD5 unavailable on FIPs enabled systems')
    here = os.path.dirname(__file__)
    present = os.path.join(here, 'ansible.cfg')
    self.assertEqual(ansible.utils.md5(present), 'fb7b5b90ea63f04bde33e804b6fad42c')
    absent = os.path.join(here, 'ansible.cf')
    self.assertEqual(ansible.utils.md5(absent), None)
def test_checksum_s(self):
    ''' checksum_s('ansible') must equal the known digest '''
    digest = ansible.utils.checksum_s('ansible')
    self.assertEqual(digest, 'bef45157a43c9e5f469d188810814a4a8ab9f2ed')
    # Need a test that causes UnicodeEncodeError See 4221
def test_checksum(self):
    ''' checksum() of ansible.cfg must equal the known digest, and be None for a missing file '''
    here = os.path.dirname(__file__)
    present = os.path.join(here, 'ansible.cfg')
    self.assertEqual(ansible.utils.checksum(present), '658b67c8ac7595adde7048425ff1f9aba270721a')
    absent = os.path.join(here, 'ansible.cf')
    self.assertEqual(ansible.utils.checksum(absent), None)
def test_default(self):
    ''' default() returns the factory result for None and the value itself otherwise '''
    fallback = ansible.utils.default(None, lambda: {})
    self.assertEqual(fallback, {})
    kept = ansible.utils.default(dict(foo='bar'), lambda: {})
    self.assertEqual(kept, dict(foo='bar'))
def test__gitinfo(self):
    ''' placeholder: no assertion is currently active for _gitinfo() '''
    # this fails if not run from git clone
    # self.assertEqual('last updated' in ansible.utils._gitinfo())
    # missing test for git submodule
    # missing test outside of git clone
    return
def test_version(self):
    ''' version('ansible') must start with "ansible " followed by the package __version__ '''
    reported = ansible.utils.version('ansible')
    prefix = 'ansible %s' % __version__
    self.assertTrue(reported.startswith(prefix))
    # this fails if not run from git clone
    # self.assertEqual('last updated' in version)
def test_getch(self):
    ''' placeholder: getch() has no test yet '''
    # figure out how to test this
    return
def test_sanitize_output(self):
    ''' sanitize_output() must mask password values and URL credentials, leaving other text alone '''
    # (raw text, text expected after sanitizing)
    samples = (
        ('password=foo', 'password=VALUE_HIDDEN'),
        ('foo=user:pass@foo/whatever', 'foo=user:********@foo/whatever'),
        ('foo=http://username:pass@wherever/foo', 'foo=http://username:********@wherever/foo'),
        ('foo=http://wherever/foo', 'foo=http://wherever/foo'),
    )
    for raw, cleaned in samples:
        self.assertEqual(ansible.utils.sanitize_output(raw), cleaned)
def test_increment_debug(self):
    ''' increment_debug() must raise ansible.utils.VERBOSITY from 0 to 1 '''
    # VERBOSITY is a module-level global: remember it and put it back so the
    # test does not change the verbosity seen by tests that run afterwards.
    saved_verbosity = getattr(ansible.utils, 'VERBOSITY', 0)
    ansible.utils.VERBOSITY = 0
    try:
        ansible.utils.increment_debug(None, None, None, None)
        self.assertEqual(ansible.utils.VERBOSITY, 1)
    finally:
        ansible.utils.VERBOSITY = saved_verbosity
def test_base_parser(self):
    ''' each *_opts switch of base_parser() must add its command line options '''
    # (keyword switch, options the resulting parser must know about)
    option_groups = (
        ('output_opts', ('--one-line', '--tree')),
        ('runas_opts', ('--sudo', '--sudo-user', '--user', '--su', '--su-user')),
        ('async_opts', ('--poll', '--background')),
        ('connect_opts', ('--connection',)),
        ('subset_opts', ('--limit',)),
        ('check_opts', ('--check',)),
        ('diff_opts', ('--diff',)),
    )
    for switch, options in option_groups:
        parser = ansible.utils.base_parser(**{switch: True})
        self.assertTrue(all(parser.has_option(opt) for opt in options))
def test_do_encrypt(self):
    ''' do_encrypt() with sha256_crypt (explicit and default salt) and with an unknown scheme '''
    alphabet = string.ascii_letters + string.digits + './'
    salt = ansible.utils.random_password(length=8, chars=alphabet)

    salted = ansible.utils.do_encrypt('ansible', 'sha256_crypt', salt=salt)
    self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', salted))

    unsalted = ansible.utils.do_encrypt('ansible', 'sha256_crypt')
    self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', unsalted))

    # 'ansible' is not a hashing scheme
    try:
        ansible.utils.do_encrypt('ansible', 'ansible')
    except ansible.errors.AnsibleError:
        return
    raise AssertionError('Incorrect exception, expected AnsibleError')
def test_do_encrypt_md5(self):
    ''' do_encrypt() with md5_crypt must produce a hash passlib verifies; skipped when _is_fips() is true '''
    if self._is_fips():
        raise SkipTest('MD5 unavailable on FIPS systems')
    hashed = ansible.utils.do_encrypt('ansible', 'md5_crypt', salt_size=4)
    verified = passlib.hash.md5_crypt.verify('ansible', hashed)
    self.assertTrue(verified)
def test_last_non_blank_line(self):
    ''' last_non_blank_line() on multi-line text and on the empty string '''
    for text, wanted in (('a\n\nb\n\nc', 'c'), ('', '')):
        self.assertEqual(ansible.utils.last_non_blank_line(text), wanted)
def test_filter_leading_non_json_lines(self):
    ''' filter_leading_non_json_lines() must drop the lines ahead of a JSON object or array '''
    noise = 'a\nb\nansible!\n'
    for payload in ('{"foo": "bar"}', '["foo", "bar"]'):
        filtered = ansible.utils.filter_leading_non_json_lines(noise + payload)
        self.assertEqual(filtered, payload + '\n')
def test_boolean(self):
    ''' boolean() must map the accepted truthy spellings to True and everything else here to False '''
    for truthy in ("true", "True", "TRUE", "t", "T", "Y", "y", "1", 1):
        self.assertEqual(ansible.utils.boolean(truthy), True)
    for falsy in ("false", "False", "0", 0, "foo"):
        self.assertEqual(ansible.utils.boolean(falsy), False)
def test_make_sudo_cmd(self):
    ''' make_sudo_cmd() must return a 3-tuple whose first item is the command, with matching prompt and success key '''
    result = ansible.utils.make_sudo_cmd(C.DEFAULT_SUDO_EXE, 'root', '/bin/sh', '/bin/ls')
    self.assertTrue(isinstance(result, tuple))
    self.assertEqual(len(result), 3)

    command = result[0]
    self.assertTrue('-u root' in command)
    # the prompt embedded in the command must match the one returned
    prompt_ok = '-p "[sudo via ansible, key=' in command and result[1].startswith('[sudo via ansible, key')
    self.assertTrue(prompt_ok)
    # likewise for the success marker
    marker_ok = 'echo BECOME-SUCCESS-' in command and result[2].startswith('BECOME-SUCCESS-')
    self.assertTrue(marker_ok)
    self.assertTrue('sudo -k' in command)
def test_make_su_cmd(self):
    ''' make_su_cmd() must return a 3-tuple whose first item is the command, with a matching success key '''
    result = ansible.utils.make_su_cmd('root', '/bin/sh', '/bin/ls')
    self.assertTrue(isinstance(result, tuple))
    self.assertEqual(len(result), 3)

    command = result[0]
    # the shell may or may not be wrapped in double quotes
    self.assertTrue('root -c "/bin/sh' in command or ' root -c /bin/sh' in command)
    marker_ok = 'echo BECOME-SUCCESS-' in command and result[2].startswith('BECOME-SUCCESS-')
    self.assertTrue(marker_ok)
def test_to_unicode(self):
    ''' to_unicode() on a unicode string, on None with nonstring='passthru', and on a byte string '''
    convert = ansible.utils.unicode.to_unicode

    from_unicode = convert(u'ansible')
    self.assertTrue(isinstance(from_unicode, unicode))
    self.assertEqual(from_unicode, u'ansible')

    passed_through = convert(None, nonstring='passthru')
    self.assertTrue(isinstance(passed_through, type(None)))
    self.assertTrue(passed_through is None)

    from_bytes = convert('ansible')
    self.assertTrue(isinstance(from_bytes, unicode))
    self.assertEqual(from_bytes, u'ansible')
def test_is_list_of_strings(self):
    ''' is_list_of_strings() accepts str/unicode items only '''
    candidates = (
        (['foo', 'bar', u'baz'], True),
        (['foo', 'bar', True], False),
        (['one', 2, 'three'], False),
    )
    for items, outcome in candidates:
        self.assertEqual(ansible.utils.is_list_of_strings(items), outcome)
def test_contains_vars(self):
    ''' contains_vars() must spot "{{foo}}" and "$foo" but not plain text '''
    for templated in ('{{foo}}', '$foo'):
        self.assertTrue(ansible.utils.contains_vars(templated))
    self.assertFalse(ansible.utils.contains_vars('foo'))
def test_safe_eval(self):
    ''' safe_eval() results with and without include_exceptions for several kinds of input '''
    evaluate = ansible.utils.safe_eval

    # Inputs evaluated without extra variables: (input, expected value)
    simple_cases = (
        # Not basestring
        (len, len),
        (1, 1),
        # module
        ('foo.bar(', 'foo.bar('),
        # import
        ('import foo', 'import foo'),
        # valid simple eval
        ('True', True),
    )
    for given, wanted in simple_cases:
        self.assertEqual(evaluate(given), wanted)
        self.assertEqual(evaluate(given, include_exceptions=True), (wanted, None))

    # valid eval with lookup
    self.assertEqual(evaluate('foo + bar', dict(foo=1, bar=2)), 3)
    self.assertEqual(evaluate('foo + bar', dict(foo=1, bar=2), include_exceptions=True), (3, None))

    # invalid eval
    self.assertEqual(evaluate('foo'), 'foo')
    outcome = evaluate('foo', include_exceptions=True)
    self.assertTrue(isinstance(outcome, tuple))
    self.assertEqual(outcome[0], 'foo')
    self.assertTrue(isinstance(outcome[1], NameError))
def test_listify_lookup_plugin_terms(self):
    ''' placeholder: the listify_lookup_plugin_terms() assertions below are commented out '''
    # only referenced by the disabled assertions
    basedir = os.path.dirname(__file__)

    # Straight lookups
    #self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=[])), [])
    #self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=['one', 'two'])), ['one', 'two'])
def test_deprecated(self):
    ''' deprecated() must warn on stderr, stay silent when warnings are disabled, and raise when removed=True '''
    sys_stderr = sys.stderr
    warnings = C.DEPRECATION_WARNINGS
    # Both sys.stderr and the DEPRECATION_WARNINGS constant are process-wide;
    # the finally clause restores them even when an assertion fails.
    try:
        # with a version: the version appears in the warning
        sys.stderr = StringIO.StringIO()
        ansible.utils.deprecated('Ack!', '0.0')
        out = sys.stderr.getvalue()
        self.assertTrue('0.0' in out)
        self.assertTrue('[DEPRECATION WARNING]' in out)

        # without a version: warning only
        sys.stderr = StringIO.StringIO()
        ansible.utils.deprecated('Ack!', None)
        out = sys.stderr.getvalue()
        self.assertTrue('0.0' not in out)
        self.assertTrue('[DEPRECATION WARNING]' in out)

        # warnings switched off: nothing is written
        sys.stderr = StringIO.StringIO()
        C.DEPRECATION_WARNINGS = False
        ansible.utils.deprecated('Ack!', None)
        out = sys.stderr.getvalue()
        self.assertTrue(not out)
    finally:
        C.DEPRECATION_WARNINGS = warnings
        sys.stderr = sys_stderr

    # third positional argument True: an error is raised instead of a warning
    try:
        ansible.utils.deprecated('Ack!', '0.0', True)
    except ansible.errors.AnsibleError as e:
        self.assertTrue('0.0' not in str(e))
        self.assertTrue('[DEPRECATED]' in str(e))
    else:
        raise AssertionError("Incorrect exception, expected AnsibleError")
def test_warning(self):
    ''' warning() must write "[WARNING]: <message>" to stderr '''
    sys_stderr = sys.stderr
    sys.stderr = StringIO.StringIO()
    # restore the real stderr even if warning() itself raises
    try:
        ansible.utils.warning('ANSIBLE')
        out = sys.stderr.getvalue()
    finally:
        sys.stderr = sys_stderr
    self.assertTrue('[WARNING]: ANSIBLE' in out)
def test_combine_vars(self):
    ''' combine_vars() must follow C.DEFAULT_HASH_BEHAVIOUR ('replace' or 'merge') '''
    one = {'foo': {'bar': True}, 'baz': {'one': 'qux'}}
    two = {'baz': {'two': 'qux'}}
    replace = {'baz': {'two': 'qux'}, 'foo': {'bar': True}}
    merge = {'baz': {'two': 'qux', 'one': 'qux'}, 'foo': {'bar': True}}

    # DEFAULT_HASH_BEHAVIOUR is a process-wide constant; put the original
    # value back so later tests are not silently run in 'merge' mode.
    saved_behaviour = C.DEFAULT_HASH_BEHAVIOUR
    try:
        C.DEFAULT_HASH_BEHAVIOUR = 'replace'
        self.assertEqual(ansible.utils.combine_vars(one, two), replace)
        C.DEFAULT_HASH_BEHAVIOUR = 'merge'
        self.assertEqual(ansible.utils.combine_vars(one, two), merge)
    finally:
        C.DEFAULT_HASH_BEHAVIOUR = saved_behaviour
def test_err(self):
    ''' err() must write the message plus a newline to stderr '''
    sys_stderr = sys.stderr
    sys.stderr = StringIO.StringIO()
    # restore the real stderr even if err() itself raises
    try:
        ansible.utils.err('ANSIBLE')
        out = sys.stderr.getvalue()
    finally:
        sys.stderr = sys_stderr
    self.assertEqual(out, 'ANSIBLE\n')
def test_exit(self):
    ''' exit() must write the message to stderr and raise SystemExit with code 1 '''
    real_stderr = sys.stderr
    captured = StringIO.StringIO()
    sys.stderr = captured
    try:
        ansible.utils.exit('ansible')
    except SystemExit as stop:
        self.assertEqual(stop.code, 1)
        self.assertEqual(sys.stderr.getvalue(), 'ansible\n')
    else:
        raise AssertionError('Incorrect exception, expected SystemExit')
    finally:
        sys.stderr = real_stderr
def test_unfrackpath(self):
    ''' unfrackpath() must expand $TEST_ROOT and collapse ".." so the result is this file's path '''
    # Remember any pre-existing TEST_ROOT so the environment is left exactly
    # as it was found instead of leaking the variable to later tests.
    previous_root = os.environ.get('TEST_ROOT')
    os.environ['TEST_ROOT'] = os.path.dirname(os.path.dirname(__file__))
    try:
        self.assertEqual(ansible.utils.unfrackpath('$TEST_ROOT/units/../units/TestUtils.py'), __file__.rstrip('c'))
    finally:
        if previous_root is None:
            del os.environ['TEST_ROOT']
        else:
            os.environ['TEST_ROOT'] = previous_root
def test_is_executable(self):
    ''' is_executable() must be 0 for this file and non-zero for bin/ansible '''
    self.assertEqual(ansible.utils.is_executable(__file__), 0)

    # bin/ansible lives three directory levels above this file
    checkout_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    bin_ansible = os.path.join(checkout_root, 'bin', 'ansible')
    self.assertNotEqual(ansible.utils.is_executable(bin_ansible), 0)
def test_get_diff(self):
    ''' get_diff() must report "fooo" -> "foo" as a one-line change '''
    standard = dict(
        before_header='foo',
        after_header='bar',
        before='fooo',
        after='foo'
    )
    # workaround py26 and py27 difflib differences: compare only what follows
    # the first three lines of the diff, not the header text itself
    standard_expected = """-fooo+foo"""
    diff = ansible.utils.get_diff(standard)
    diff = '\n'.join(diff.split('\n')[3:])
    self.assertEqual(diff, unicode(standard_expected))
def test_split_args(self):
# split_args is a smarter shlex.split for the needs of the way ansible uses it
def _split_info(input, desired, actual):
print "SENT: ", input
print "WANT: ", desired
print "GOT: ", actual
def _test_combo(input, desired):
actual = split_args(input)
_split_info(input, desired, actual)
assert actual == desired
# trivial splitting
_test_combo('a b=c d=f', ['a', 'b=c', 'd=f' ])
# mixed quotes
_test_combo('a b=\'c\' d="e" f=\'g\'', ['a', "b='c'", 'd="e"', "f='g'" ])
# with spaces
# FIXME: this fails, commenting out only for now
# _test_combo('a "\'one two three\'"', ['a', "'one two three'" ])
# TODO: ...
# jinja2 preservation
_test_combo('a {{ y }} z', ['a', '{{ y }}', 'z' ])
# jinja2 preservation with spaces and filters and other hard things
_test_combo(
'a {{ x | filter(\'moo\', \'param\') }} z {{ chicken }} "waffles"',
['a', "{{ x | filter('moo', 'param') }}", 'z', '{{ chicken }}', '"waffles"']
)
# invalid quote detection
self.assertRaises(Exception, split_args, 'hey I started a quote"')
self.assertRaises(Exception, split_args, 'hey I started a\' quote')
# jinja2 loop blocks with lots of complexity
_test_combo(
# in memory of neighbors cat
# we preserve line breaks unless a line continuation character precedes them
'a {% if x %} y {%else %} {{meow}} {% endif %} "cookie\nchip" \\\ndone\nand done',
['a', '{% if x %}', 'y', '{%else %}', '{{meow}}', '{% endif %}', '"cookie\nchip"', 'done\n', 'and', 'done']
)
# test space preservation within quotes
_test_combo(
'content="1 2 3 4 " foo=bar',
['content="1 2 3 4 "', 'foo=bar']
)
# invalid jinja2 nesting detection
# invalid quote nesting detection
def test_clean_data(self):
    ''' _clean_data() must neutralise jinja2 tags as expected for remote and inventory data '''
    # clean data removes jinja2 tags from data
    # (input text, keyword selecting the data origin, expected output)
    samples = (
        ('this is a normal string', 'from_remote',
         'this is a normal string'),
        ('this string has a {{variable}}', 'from_remote',
         'this string has a {#variable#}'),
        ('this string {{has}} two {{variables}} in it', 'from_remote',
         'this string {#has#} two {#variables#} in it'),
        ('this string has a {{variable with a\nnewline}}', 'from_remote',
         'this string has a {#variable with a\nnewline#}'),
        ('this string is from inventory {{variable}}', 'from_inventory',
         'this string is from inventory {{variable}}'),
        ('this string is from inventory too but uses lookup {{lookup("foo","bar")}}', 'from_inventory',
         'this string is from inventory too but uses lookup {#lookup("foo","bar")#}'),
        ('this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}', 'from_remote',
         'this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}'),
        ('this string contains unicode: ¢ £ ¤ ¥', 'from_remote',
         'this string contains unicode: ¢ £ ¤ ¥'),
    )
    for text, origin, cleaned in samples:
        result = ansible.utils._clean_data(text, **{origin: True})
        self.assertEqual(result, cleaned)
def test_censor_unlogged_data(self):
    ''' censor_unlogged_data() keeps only the status keys and adds a "censored" note; used by the no_log attribute '''
    raw_result = dict(
        password='sekrit',
        rc=12,
        failed=True,
        changed=False,
        skipped=True,
        msg='moo',
    )
    data = ansible.utils.censor_unlogged_data(raw_result)

    # status keys are preserved
    for kept in ('rc', 'failed', 'changed', 'skipped'):
        assert kept in data
    # everything else is dropped
    for dropped in ('password', 'msg'):
        assert dropped not in data
    assert data['censored'] == 'results hidden due to no_log parameter'
def test_repo_url_to_role_name(self):
    ''' repo_url_to_role_name() must derive the role name from each kind of source string '''
    expectations = (
        ("http://git.example.com/repos/repo.git", "repo"),
        ("ssh://git@git.example.com:repos/role-name", "role-name"),
        ("ssh://git@git.example.com:repos/role-name,v0.1", "role-name"),
        ("directory/role/is/installed/in", "directory/role/is/installed/in"),
    )
    for source, role_name in expectations:
        derived = ansible.utils.repo_url_to_role_name(source)
        self.assertEqual(derived, role_name)
def test_role_spec_parse(self):
    ''' role_spec_parse() must split "src,version,name" specs into scm / src / version / name '''
    # (spec string, parsed result expected)
    expectations = (
        (
            "git+http://git.example.com/repos/repo.git,v1.0",
            dict(scm='git', src='http://git.example.com/repos/repo.git',
                 version='v1.0', name='repo'),
        ),
        (
            "http://repo.example.com/download/tarfile.tar.gz",
            dict(scm=None, src='http://repo.example.com/download/tarfile.tar.gz',
                 version='', name='tarfile'),
        ),
        (
            "http://repo.example.com/download/tarfile.tar.gz,,nicename",
            dict(scm=None, src='http://repo.example.com/download/tarfile.tar.gz',
                 version='', name='nicename'),
        ),
        (
            "git+http://git.example.com/repos/repo.git,v1.0,awesome",
            dict(scm='git', src='http://git.example.com/repos/repo.git',
                 version='v1.0', name='awesome'),
        ),
        (
            # test that http://github URLs are assumed git+http:// unless they end in .tar.gz
            "http://github.com/ansible/fakerole/fake",
            dict(scm='git', src='http://github.com/ansible/fakerole/fake',
                 version='master', name='fake'),
        ),
        (
            # test that http://github URLs are assumed git+http:// unless they end in .tar.gz
            "http://github.com/ansible/fakerole/fake/archive/master.tar.gz",
            dict(scm=None, src='http://github.com/ansible/fakerole/fake/archive/master.tar.gz',
                 version='', name='master'),
        ),
    )
    for spec, parsed in expectations:
        self.assertEqual(ansible.utils.role_spec_parse(spec), parsed)
def test_role_yaml_parse(self):
    ''' role_yaml_parse() must normalise old-style ("role") and new-style ("src") role dicts '''
    # (role dict as written in YAML, normalised result expected)
    expectations = [
        # Old style
        (
            dict(role='debops.elasticsearch', name='elks'),
            dict(role='debops.elasticsearch', name='elks', scm=None,
                 src='debops.elasticsearch', version=''),
        ),
        (
            dict(role='debops.elasticsearch,1.0,elks', my_param='foo'),
            dict(role='debops.elasticsearch,1.0,elks', name='elks', scm=None,
                 src='debops.elasticsearch', version='1.0', my_param='foo'),
        ),
        (
            dict(role='debops.elasticsearch,1.0', my_param='foo'),
            dict(role='debops.elasticsearch,1.0', name='debops.elasticsearch', scm=None,
                 src='debops.elasticsearch', version='1.0', my_param='foo'),
        ),
        # New style
        (
            dict(src='debops.elasticsearch', name='elks', my_param='foo'),
            dict(name='elks', scm=None, src='debops.elasticsearch',
                 version='', my_param='foo'),
        ),
    ]
    for role, normalised in expectations:
        self.assertEqual(ansible.utils.role_yaml_parse(role), normalised)
@patch('ansible.utils.plugins.module_finder._get_paths')
def test_find_plugin(self, mock_get_paths):
    ''' find_plugin() must return regular files (with or without extension) and None for a directory or a missing name

    The module finder's _get_paths is patched to return a single scratch
    directory, which is removed again even when an assertion fails.
    '''
    tmp_path = tempfile.mkdtemp()
    try:
        mock_get_paths.return_value = [tmp_path,]

        right_module_1 = 'module.py'
        right_module_2 = 'module_without_extension'
        wrong_module_1 = 'folder'
        wrong_module_2 = 'inexistent'

        path_right_module_1 = os.path.join(tmp_path, right_module_1)
        path_right_module_2 = os.path.join(tmp_path, right_module_2)
        path_wrong_module_1 = os.path.join(tmp_path, wrong_module_1)

        # two empty files that are valid plugins, one directory that is not;
        # wrong_module_2 is deliberately never created
        open(path_right_module_1, 'w').close()
        open(path_right_module_2, 'w').close()
        os.mkdir(path_wrong_module_1)

        find_plugin = ansible.utils.plugins.module_finder.find_plugin
        self.assertEqual(find_plugin(right_module_1), path_right_module_1)
        self.assertEqual(find_plugin(right_module_2), path_right_module_2)
        self.assertEqual(find_plugin(wrong_module_1), None)
        self.assertEqual(find_plugin(wrong_module_2), None)
    finally:
        # previously skipped whenever an assertion above failed
        shutil.rmtree(tmp_path)