# Copyright 2018, Matt Martz # Copyright 2019, Andrew Klychkov @Andersson007 # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import annotations import os import json import pytest from collections.abc import Mapping from datetime import date, datetime, timezone, timedelta from ansible.parsing.ajson import AnsibleJSONEncoder, AnsibleJSONDecoder from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode from ansible.utils.unsafe_proxy import AnsibleUnsafeText def test_AnsibleJSONDecoder_vault(): with open(os.path.join(os.path.dirname(__file__), 'fixtures/ajson.json')) as f: data = json.load(f, cls=AnsibleJSONDecoder) assert isinstance(data['password'], AnsibleVaultEncryptedUnicode) assert isinstance(data['bar']['baz'][0]['password'], AnsibleVaultEncryptedUnicode) assert isinstance(data['foo']['password'], AnsibleVaultEncryptedUnicode) def test_encode_decode_unsafe(): data = { 'key_value': AnsibleUnsafeText(u'{#NOTACOMMENT#}'), 'list': [AnsibleUnsafeText(u'{#NOTACOMMENT#}')], 'list_dict': [{'key_value': AnsibleUnsafeText(u'{#NOTACOMMENT#}')}]} json_expected = ( '{"key_value": {"__ansible_unsafe": "{#NOTACOMMENT#}"}, ' '"list": [{"__ansible_unsafe": "{#NOTACOMMENT#}"}], ' '"list_dict": [{"key_value": {"__ansible_unsafe": "{#NOTACOMMENT#}"}}]}' ) assert json.dumps(data, cls=AnsibleJSONEncoder, preprocess_unsafe=True, sort_keys=True) == json_expected assert json.loads(json_expected, cls=AnsibleJSONDecoder) == data def vault_data(): """ Prepare AnsibleVaultEncryptedUnicode test data for AnsibleJSONEncoder.default(). Return a list of tuples (input, expected). """ with open(os.path.join(os.path.dirname(__file__), 'fixtures/ajson.json')) as f: data = json.load(f, cls=AnsibleJSONDecoder) data_0 = data['password'] data_1 = data['bar']['baz'][0]['password'] expected_0 = (u'$ANSIBLE_VAULT;1.1;AES256\n34646264306632313333393636316' '562356435376162633631326264383934326565333633366238\n3863' '373264326461623132613931346165636465346337310a32643431383' '0316337393263616439\n646539373134633963666338613632666334' '65663730303633323534363331316164623237363831\n35363335613' '93238370a313330316263373938326162386433313336613532653538' '376662306435\n3339\n') expected_1 = (u'$ANSIBLE_VAULT;1.1;AES256\n34646264306632313333393636316' '562356435376162633631326264383934326565333633366238\n3863' '373264326461623132613931346165636465346337310a32643431383' '0316337393263616439\n646539373134633963666338613632666334' '65663730303633323534363331316164623237363831\n35363335613' '93238370a313330316263373938326162386433313336613532653538' '376662306435\n3338\n') return [ (data_0, expected_0), (data_1, expected_1), ] class TestAnsibleJSONEncoder: """ Namespace for testing AnsibleJSONEncoder. """ @pytest.fixture(scope='class') def mapping(self, request): """ Returns object of Mapping mock class. The object is used for testing handling of Mapping objects in AnsibleJSONEncoder.default(). Using a plain dictionary instead is not suitable because it is handled by default encoder of the superclass (json.JSONEncoder). """ class M(Mapping): """Mock mapping class.""" def __init__(self, *args, **kwargs): self.__dict__.update(*args, **kwargs) def __getitem__(self, key): return self.__dict__[key] def __iter__(self): return iter(self.__dict__) def __len__(self): return len(self.__dict__) mapping = M(request.param) assert isinstance(len(mapping), int) # ensure coverage of __len__ return mapping @pytest.fixture def ansible_json_encoder(self): """Return AnsibleJSONEncoder object.""" return AnsibleJSONEncoder() ############### # Test methods: @pytest.mark.parametrize( 'test_input,expected', [ (datetime(2019, 5, 14, 13, 39, 38, 569047), '2019-05-14T13:39:38.569047'), (datetime(2019, 5, 14, 13, 47, 16, 923866), '2019-05-14T13:47:16.923866'), (date(2019, 5, 14), '2019-05-14'), (date(2020, 5, 14), '2020-05-14'), (datetime(2019, 6, 15, 14, 45, tzinfo=timezone.utc), '2019-06-15T14:45:00+00:00'), (datetime(2019, 6, 15, 14, 45, tzinfo=timezone(timedelta(hours=1, minutes=40))), '2019-06-15T14:45:00+01:40'), ] ) def test_date_datetime(self, ansible_json_encoder, test_input, expected): """ Test for passing datetime.date or datetime.datetime objects to AnsibleJSONEncoder.default(). """ assert ansible_json_encoder.default(test_input) == expected @pytest.mark.parametrize( 'mapping,expected', [ ({1: 1}, {1: 1}), ({2: 2}, {2: 2}), ({1: 2}, {1: 2}), ({2: 1}, {2: 1}), ], indirect=['mapping'], ) def test_mapping(self, ansible_json_encoder, mapping, expected): """ Test for passing Mapping object to AnsibleJSONEncoder.default(). """ assert ansible_json_encoder.default(mapping) == expected @pytest.mark.parametrize('test_input,expected', vault_data()) def test_ansible_json_decoder_vault(self, ansible_json_encoder, test_input, expected): """ Test for passing AnsibleVaultEncryptedUnicode to AnsibleJSONEncoder.default(). """ assert ansible_json_encoder.default(test_input) == {'__ansible_vault': expected} assert json.dumps(test_input, cls=AnsibleJSONEncoder, preprocess_unsafe=True) == '{"__ansible_vault": "%s"}' % expected.replace('\n', '\\n') @pytest.mark.parametrize( 'test_input,expected', [ ({1: 'first'}, {1: 'first'}), ({2: 'second'}, {2: 'second'}), ] ) def test_default_encoder(self, ansible_json_encoder, test_input, expected): """ Test for the default encoder of AnsibleJSONEncoder.default(). If objects of different classes that are not tested above were passed, AnsibleJSONEncoder.default() invokes 'default()' method of json.JSONEncoder superclass. """ assert ansible_json_encoder.default(test_input) == expected @pytest.mark.parametrize('test_input', [1, 1.1, 'string', [1, 2], set('set'), True, None]) def test_default_encoder_unserializable(self, ansible_json_encoder, test_input): """ Test for the default encoder of AnsibleJSONEncoder.default(), not serializable objects. It must fail with TypeError 'object is not serializable'. """ with pytest.raises(TypeError): ansible_json_encoder.default(test_input)