From df3e4cd7f42fe4724441290fc779243cd766b841 Mon Sep 17 00:00:00 2001 From: Adrian Likins Date: Wed, 8 Feb 2017 16:09:34 -0500 Subject: [PATCH] Don't check for var._obj in template._clean_data (#20868) * Don't check for var._obj in template._clean_data AnsibleUnsafe or other unsafe vars used to have a '_obj' slot but no longer do. This was causing attribute errors if a object was 'unsafe' but not a string. Add tests for AnsibleUnsafe, lookups, and AnsibleContext --- lib/ansible/template/__init__.py | 6 +- test/units/template/test_templar.py | 379 ++++++++++++++++++++++++++-- 2 files changed, 365 insertions(+), 20 deletions(-) diff --git a/lib/ansible/template/__init__.py b/lib/ansible/template/__init__.py index 2704eb915b0..6adb880dcaf 100644 --- a/lib/ansible/template/__init__.py +++ b/lib/ansible/template/__init__.py @@ -345,11 +345,7 @@ class Templar: if hasattr(variable, '__UNSAFE__'): if isinstance(variable, text_type): rval = self._clean_data(variable) - else: - # Do we need to convert these into text_type as well? - # return self._clean_data(to_text(variable._obj, nonstring='passthru')) - rval = self._clean_data(variable._obj) - return rval + return rval try: if convert_bare: diff --git a/test/units/template/test_templar.py b/test/units/template/test_templar.py index 6c8f5e7eb64..80d305a9a3d 100644 --- a/test/units/template/test_templar.py +++ b/test/units/template/test_templar.py @@ -19,25 +19,23 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +from jinja2.runtime import Context + from ansible.compat.tests import unittest -from ansible.compat.tests.mock import patch, MagicMock +from ansible.compat.tests.mock import patch +from ansible.compat.six import string_types from ansible import constants as C -from ansible.errors import * -from ansible.plugins import filter_loader, lookup_loader, module_loader -from ansible.plugins.strategy import SharedPluginLoaderObj -from ansible.template import Templar - +from ansible.errors import AnsibleError, AnsibleUndefinedVariable +from ansible.template import Templar, AnsibleContext, AnsibleEnvironment +from ansible.vars.unsafe_proxy import AnsibleUnsafe, wrap_var +#from ansible.unsafe_proxy import AnsibleUnsafe, wrap_var from units.mock.loader import DictDataLoader -class TestTemplar(unittest.TestCase): +class BaseTemplar(object): def setUp(self): - fake_loader = DictDataLoader({ - "/path/to/my_file.txt": "foo\n", - }) - shared_loader = SharedPluginLoaderObj() - variables = dict( + self.test_vars = dict( foo="bar", bam="{{foo}}", num=1, @@ -47,12 +45,218 @@ class TestTemplar(unittest.TestCase): bad_dict="{a='b'", var_list=[1], recursive="{{recursive}}", + some_var="blip", + some_static_var="static_blip", + some_keyword="{{ foo }}", + some_unsafe_var=wrap_var("unsafe_blip"), + some_static_unsafe_var=wrap_var("static_unsafe_blip"), + some_unsafe_keyword=wrap_var("{{ foo }}"), ) - self.templar = Templar(loader=fake_loader, variables=variables) + self.fake_loader = DictDataLoader({ + "/path/to/my_file.txt": "foo\n", + }) + self.templar = Templar(loader=self.fake_loader, variables=self.test_vars) + + def is_unsafe(self, obj): + if obj is None: + return False + + if hasattr(obj, '__UNSAFE__'): + return True + + if isinstance(obj, AnsibleUnsafe): + return True + + if isinstance(obj, dict): + for key in obj.keys(): + if self.is_unsafe(key) or self.is_unsafe(obj[key]): + return True + + if isinstance(obj, list): + for item in obj: + if self.is_unsafe(item): + return True + + if isinstance(obj, string_types) and hasattr(obj, '__UNSAFE__'): + return True + + return False + + +# class used for testing arbitrary objects passed to template +class SomeClass(object): + foo = 'bar' + + def __init__(self): + self.blip = 'blip' + + +class SomeUnsafeClass(AnsibleUnsafe): + def __init__(self): + super(SomeUnsafeClass, self).__init__() + self.blip = 'unsafe blip' + + +class TestTemplarTemplate(BaseTemplar, unittest.TestCase): + def test_lookup_jinja_dict_key_in_static_vars(self): + res = self.templar.template("{'some_static_var': '{{ some_var }}'}", + static_vars=['some_static_var']) + #self.assertEqual(res['{{ a_keyword }}'], "blip") + print(res) + + def test_templatable(self): + res = self.templar.templatable('foo') + self.assertTrue(res) + + def test_templatable_none(self): + res = self.templar.templatable(None) + self.assertTrue(res) + + @patch('ansible.template.Templar.template', side_effect=AnsibleError) + def test_templatable_exception(self, mock_template): + res = self.templar.templatable('foo') + self.assertFalse(res) + + def test_template_convert_bare_string(self): + # Note: no bare_deprecated=False so we hit the deprecation path + res = self.templar.template('foo', convert_bare=True) + self.assertEqual(res, 'bar') + + def test_template_convert_bare_nested(self): + res = self.templar.template('bam', convert_bare=True, bare_deprecated=False) + self.assertEqual(res, 'bar') + + def test_template_convert_bare_unsafe(self): + res = self.templar.template('some_unsafe_var', convert_bare=True, bare_deprecated=False) + self.assertEqual(res, 'unsafe_blip') + #self.assertIsInstance(res, AnsibleUnsafe) + self.assertTrue(self.is_unsafe(res), 'returned value from template.template (%s) is not marked unsafe' % res) + + def test_template_convert_bare_filter(self): + res = self.templar.template('bam|capitalize', convert_bare=True, bare_deprecated=False) + self.assertEqual(res, 'Bar') + + def test_template_convert_bare_filter_unsafe(self): + res = self.templar.template('some_unsafe_var|capitalize', convert_bare=True, bare_deprecated=False) + self.assertEqual(res, 'Unsafe_blip') + #self.assertIsInstance(res, AnsibleUnsafe) + self.assertTrue(self.is_unsafe(res), 'returned value from template.template (%s) is not marked unsafe' % res) + + def test_template_convert_data(self): + res = self.templar.template('{{foo}}', convert_data=True) + self.assertTrue(res) + self.assertEqual(res, 'bar') + + @patch('ansible.template.safe_eval', side_effect=AnsibleError) + def test_template_convert_data_template_in_data(self, mock_safe_eval): + res = self.templar.template('{{bam}}', convert_data=True) + self.assertTrue(res) + self.assertEqual(res, 'bar') + + def test_template_convert_data_bare(self): + res = self.templar.template('bam', convert_data=True) + self.assertTrue(res) + self.assertEqual(res, 'bam') + + def test_template_convert_data_to_json(self): + res = self.templar.template('{{bam|to_json}}', convert_data=True) + self.assertTrue(res) + self.assertEqual(res, '"bar"') + + def test_template_convert_data_convert_bare_data_bare(self): + res = self.templar.template('bam', convert_data=True, convert_bare=True) + self.assertTrue(res) + self.assertEqual(res, 'bar') + + def test_template_unsafe_non_string(self): + unsafe_obj = AnsibleUnsafe() + res = self.templar.template(unsafe_obj) + self.assertTrue(self.is_unsafe(res), 'returned value from template.template (%s) is not marked unsafe' % res) + + def test_template_unsafe_non_string_subclass(self): + unsafe_obj = SomeUnsafeClass() + res = self.templar.template(unsafe_obj) + self.assertTrue(self.is_unsafe(res), 'returned value from template.template (%s) is not marked unsafe' % res) + + @patch('ansible.template.Templar._clean_data', side_effect=AnsibleError) + def test_template_unsafe_clean_data_exception(self, mock_clean_data): + self.assertRaises(AnsibleError, + self.templar.template, + wrap_var('blip bar')) + + # TODO: not sure what template is supposed to do it, but it currently throws attributeError + @patch('ansible.template.Templar._clean_data') + def test_template_unsafe_non_string_clean_data_exception(self, mock_clean_data): + msg = 'Error raised from _clean_data by test_template_unsafe_non_string_clean_data_exception' + mock_clean_data.side_effect = AnsibleError(msg) + unsafe_obj = AnsibleUnsafe() + res = self.templar.template(unsafe_obj) + self.assertTrue(self.is_unsafe(res), 'returned value from template.template (%s) is not marked unsafe' % res) + + # TODO: not sure what template is supposed to do it, but it currently throws attributeError + @patch('ansible.template.Templar._clean_data', side_effect=AnsibleError) + def test_template_unsafe_non_string_subclass_clean_data_exception(self, mock_clean_data): + unsafe_obj = SomeUnsafeClass() + self.assertTrue(self.is_unsafe(unsafe_obj)) + res = self.templar.template(unsafe_obj) + self.assertTrue(self.is_unsafe(res), 'returned value from template.template (%s) is not marked unsafe' % res) - def tearDown(self): - pass + def test_weird(self): + data = u'''1 2 #}huh{# %}ddfg{% }}dfdfg{{ {%what%} {{#foo#}} {%{bar}%} {#%blip%#} {{asdfsd%} 3 4 {{foo}} 5 6 7''' + self.assertRaisesRegexp(AnsibleError, + 'template error while templating string', + self.templar.template, + data) + +class TestTemplarCleanData(BaseTemplar, unittest.TestCase): + def test_clean_data(self): + res = self.templar._clean_data(u'some string') + self.assertEqual(res, u'some string') + + def test_clean_data_not_stringtype(self): + res = self.templar._clean_data(None) + # None vs NoneType + self.assertEqual(res, None) + + def test_clean_data_jinja(self): + res = self.templar._clean_data(u'1 2 {what} 3 4 {{foo}} 5 6 7') + self.assertEqual(res, u'1 2 {what} 3 4 {#foo#} 5 6 7') + + def test_clean_data_block(self): + res = self.templar._clean_data(u'1 2 {%what%} 3 4 {{foo}} 5 6 7') + self.assertEqual(res, u'1 2 {#what#} 3 4 {#foo#} 5 6 7') + +# def test_clean_data_weird(self): +# res = self.templar._clean_data(u'1 2 #}huh{# %}ddfg{% }}dfdfg{{ {%what%} {{#foo#}} {%{bar}%} {#%blip%#} {{asdfsd%} 3 4 {{foo}} 5 6 7') +# print(res) + + self.assertEqual(res, u'1 2 {#what#} 3 4 {#foo#} 5 6 7') + + def test_clean_data_object(self): + obj = {'foo': [1, 2, 3, 'bdasdf', '{what}', '{{foo}}', 5]} + res = self.templar._clean_data(obj) + self.assertEqual(res, obj) + + def test_clean_data_object_unsafe(self): + rval = [1, 2, 3, wrap_var('bdasdf'), '{what}', wrap_var('{{unsafe_foo}}'), 5] + obj = {'foo': rval} + res = self.templar._clean_data(obj) + self.assertEqual(res, obj) + self.assertTrue(self.is_unsafe(res), 'returned value of _clean_data (%s) is not marked unsafe.' % res) + + def test_clean_data_bad_dict(self): + res = self.templar._clean_data(u'{{bad_dict}}') + self.assertEqual(res, u'{#bad_dict#}') + + def test_clean_data_unsafe_obj(self): + some_obj = SomeClass() + unsafe_obj = wrap_var(some_obj) + res = self.templar._clean_data(unsafe_obj) + self.assertIsInstance(res, SomeClass) + + +class TestTemplarMisc(BaseTemplar, unittest.TestCase): def test_templar_simple(self): templar = self.templar @@ -111,3 +315,148 @@ class TestTemplar(unittest.TestCase): finally: C.DEFAULT_JINJA2_EXTENSIONS = old_exts + +class TestTemplarLookup(BaseTemplar, unittest.TestCase): + def test_lookup_missing_plugin(self): + self.assertRaisesRegexp(AnsibleError, + 'lookup plugin \(not_a_real_lookup_plugin\) not found', + self.templar._lookup, + 'not_a_real_lookup_plugin', + 'an_arg', a_keyword_arg='a_keyword_arg_value') + + def test_lookup_list(self): + res = self.templar._lookup('list', 'an_arg', 'another_arg') + self.assertEqual(res, 'an_arg,another_arg') + + def test_lookup_jinja_undefined(self): + self.assertRaisesRegexp(AnsibleUndefinedVariable, + "'an_undefined_jinja_var' is undefined", + self.templar._lookup, + 'list', '{{ an_undefined_jinja_var }}') + + def test_lookup_jinja_defined(self): + res = self.templar._lookup('list', '{{ some_var }}') + self.assertTrue(self.is_unsafe(res)) + #self.assertIsInstance(res, AnsibleUnsafe) + + def test_lookup_jinja_dict_string_passed(self): + self.assertRaisesRegexp(AnsibleError, + "with_dict expects a dict", + self.templar._lookup, + 'dict', + '{{ some_var }}') + + def test_lookup_jinja_dict_list_passed(self): + self.assertRaisesRegexp(AnsibleError, + "with_dict expects a dict", + self.templar._lookup, + 'dict', + ['foo', 'bar']) + + def test_lookup_jinja_kwargs(self): + res = self.templar._lookup('list', 'blip', random_keyword='12345') + self.assertTrue(self.is_unsafe(res)) + #self.assertIsInstance(res, AnsibleUnsafe) + + def test_lookup_jinja_list_wantlist(self): + res = self.templar._lookup('list', '{{ some_var }}', wantlist=True) + self.assertEqual(res, ["blip"]) + + def test_lookup_jinja_list_wantlist_undefined(self): + self.assertRaisesRegexp(AnsibleUndefinedVariable, + "'some_undefined_var' is undefined", + self.templar._lookup, + 'list', + '{{ some_undefined_var }}', + wantlist=True) + + def test_lookup_jinja_list_wantlist_unsafe(self): + res = self.templar._lookup('list', '{{ some_unsafe_var }}', wantlist=True) + for lookup_result in res: + self.assertTrue(self.is_unsafe(lookup_result)) + #self.assertIsInstance(lookup_result, AnsibleUnsafe) + + # Should this be an AnsibleUnsafe + # self.assertIsInstance(res, AnsibleUnsafe) + + def test_lookup_jinja_dict(self): + res = self.templar._lookup('list', {'{{ a_keyword }}': '{{ some_var }}'}) + self.assertEqual(res['{{ a_keyword }}'], "blip") + # TODO: Should this be an AnsibleUnsafe + #self.assertIsInstance(res['{{ a_keyword }}'], AnsibleUnsafe) + #self.assertIsInstance(res, AnsibleUnsafe) + + def test_lookup_jinja_dict_unsafe(self): + res = self.templar._lookup('list', {'{{ some_unsafe_key }}': '{{ some_unsafe_var }}'}) + self.assertTrue(self.is_unsafe(res['{{ some_unsafe_key }}'])) + #self.assertIsInstance(res['{{ some_unsafe_key }}'], AnsibleUnsafe) + # TODO: Should this be an AnsibleUnsafe + #self.assertIsInstance(res, AnsibleUnsafe) + + def test_lookup_jinja_dict_unsafe_value(self): + res = self.templar._lookup('list', {'{{ a_keyword }}': '{{ some_unsafe_var }}'}) + self.assertTrue(self.is_unsafe(res['{{ a_keyword }}'])) + #self.assertIsInstance(res['{{ a_keyword }}'], AnsibleUnsafe) + # TODO: Should this be an AnsibleUnsafe + #self.assertIsInstance(res, AnsibleUnsafe) + + def test_lookup_jinja_none(self): + res = self.templar._lookup('list', None) + self.assertIsNone(res) + + +class TestAnsibleContext(BaseTemplar, unittest.TestCase): + def _context(self, variables=None): + variables = variables or {} + + env = AnsibleEnvironment() + context = AnsibleContext(env, parent={}, name='some_context', + blocks={}) + + for key, value in variables.items(): + context.vars[key] = value + + return context + + def test(self): + context = self._context() + self.assertIsInstance(context, AnsibleContext) + self.assertIsInstance(context, Context) + + def test_resolve_unsafe(self): + context = self._context(variables={'some_unsafe_key': wrap_var('some_unsafe_string')}) + res = context.resolve('some_unsafe_key') + #self.assertIsInstance(res, AnsibleUnsafe) + self.assertTrue(self.is_unsafe(res), + 'return of AnsibleContext.resolve (%s) was expected to be marked unsafe but was not' % res) + + def test_resolve_unsafe_list(self): + context = self._context(variables={'some_unsafe_key': [wrap_var('some unsafe string 1')]}) + res = context.resolve('some_unsafe_key') + #self.assertIsInstance(res[0], AnsibleUnsafe) + self.assertTrue(self.is_unsafe(res), + 'return of AnsibleContext.resolve (%s) was expected to be marked unsafe but was not' % res) + + def test_resolve_unsafe_dict(self): + context = self._context(variables={'some_unsafe_key': + {'an_unsafe_dict': wrap_var('some unsafe string 1')} + }) + res = context.resolve('some_unsafe_key') + self.assertTrue(self.is_unsafe(res['an_unsafe_dict']), + 'return of AnsibleContext.resolve (%s) was expected to be marked unsafe but was not' % res['an_unsafe_dict']) + + def test_resolve(self): + context = self._context(variables={'some_key': 'some_string'}) + res = context.resolve('some_key') + self.assertEqual(res, 'some_string') + #self.assertNotIsInstance(res, AnsibleUnsafe) + self.assertFalse(self.is_unsafe(res), + 'return of AnsibleContext.resolve (%s) was not expected to be marked unsafe but was' % res) + + def test_resolve_none(self): + context = self._context(variables={'some_key': None}) + res = context.resolve('some_key') + self.assertEqual(res, None) + #self.assertNotIsInstance(res, AnsibleUnsafe) + self.assertFalse(self.is_unsafe(res), + 'return of AnsibleContext.resolve (%s) was not expected to be marked unsafe but was' % res)