diff --git a/lib/ansible/parsing/vault/__init__.py b/lib/ansible/parsing/vault/__init__.py index 2a98511f885..c781a8f92e8 100644 --- a/lib/ansible/parsing/vault/__init__.py +++ b/lib/ansible/parsing/vault/__init__.py @@ -427,6 +427,10 @@ class VaultEditor: # A file to be encrypted into a vaultfile could be any encoding # so treat the contents as a byte string. + + # follow the symlink + filename = os.path.realpath(filename) + b_plaintext = self.read_data(filename) b_ciphertext = self.vault.encrypt(b_plaintext) self.write_data(b_ciphertext, output_file or filename) @@ -435,7 +439,11 @@ class VaultEditor: check_prereqs() + # follow the symlink + filename = os.path.realpath(filename) + ciphertext = self.read_data(filename) + try: plaintext = self.vault.decrypt(ciphertext) except AnsibleError as e: @@ -458,7 +466,11 @@ class VaultEditor: check_prereqs() + # follow the symlink + filename = os.path.realpath(filename) + ciphertext = self.read_data(filename) + try: plaintext = self.vault.decrypt(ciphertext) except AnsibleError as e: @@ -486,8 +498,12 @@ class VaultEditor: check_prereqs() + # follow the symlink + filename = os.path.realpath(filename) + prev = os.stat(filename) ciphertext = self.read_data(filename) + try: plaintext = self.vault.decrypt(ciphertext) except AnsibleError as e: diff --git a/test/units/parsing/vault/test_vault_editor.py b/test/units/parsing/vault/test_vault_editor.py index 15c0c861f56..51b8301a0d7 100644 --- a/test/units/parsing/vault/test_vault_editor.py +++ b/test/units/parsing/vault/test_vault_editor.py @@ -20,7 +20,6 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type -import sys import os import tempfile from nose.plugins.skip import SkipTest @@ -29,8 +28,7 @@ from ansible.compat.tests import unittest from ansible.compat.tests.mock import patch from ansible import errors -from ansible.parsing.vault import VaultLib -from ansible.parsing.vault import VaultEditor +from ansible.parsing import vault from ansible.module_utils._text import to_bytes, to_text @@ -71,25 +69,344 @@ v11_data = """$ANSIBLE_VAULT;1.1;AES256 class TestVaultEditor(unittest.TestCase): def setUp(self): - pass + self._test_dir = None def tearDown(self): - pass + if self._test_dir: + pass + #shutil.rmtree(self._test_dir) + self._test_dir = None def test_methods_exist(self): - v = VaultEditor(None) + v = vault.VaultEditor(None) slots = ['create_file', 'decrypt_file', 'edit_file', 'encrypt_file', 'rekey_file', 'read_data', - 'write_data', - 'shuffle_files'] + 'write_data'] for slot in slots: assert hasattr(v, slot), "VaultLib is missing the %s method" % slot - @patch.object(VaultEditor, '_editor_shell_command') + def _create_test_dir(self): + suffix = '_ansible_unit_test_%s_' % (self.__class__.__name__) + return tempfile.mkdtemp(suffix=suffix) + + def _create_file(self, test_dir, name, content=None, symlink=False): + file_path = os.path.join(test_dir, name) + opened_file = open(file_path, 'wb') + if content: + opened_file.write(content) + opened_file.close() + return file_path + + @patch('ansible.parsing.vault.call') + def test_edit_file_helper_empty_target(self, mock_sp_call): + self._test_dir = self._create_test_dir() + + src_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) + + mock_sp_call.side_effect = self._faux_command + ve = vault.VaultEditor('password') + + b_ciphertext = ve._edit_file_helper(src_file_path) + + self.assertNotEqual(src_contents, b_ciphertext) + + @patch('ansible.parsing.vault.call') + def test_edit_file_helper_call_exception(self, mock_sp_call): + self._test_dir = self._create_test_dir() + + src_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) + + error_txt = 'calling editor raised an exception' + mock_sp_call.side_effect = errors.AnsibleError(error_txt) + + ve = vault.VaultEditor('password') + + self.assertRaisesRegexp(errors.AnsibleError, + error_txt, + ve._edit_file_helper, + src_file_path) + + @patch('ansible.parsing.vault.call') + def test_edit_file_helper_symlink_target(self, mock_sp_call): + self._test_dir = self._create_test_dir() + + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + src_file_link_path = os.path.join(self._test_dir, 'a_link_to_dest_file') + + os.symlink(src_file_path, src_file_link_path) + + mock_sp_call.side_effect = self._faux_command + ve = vault.VaultEditor('password') + + b_ciphertext = ve._edit_file_helper(src_file_link_path) + + self.assertNotEqual(src_file_contents, b_ciphertext, + 'b_ciphertext should be encrypted and not equal to src_contents') + + def _faux_editor(self, editor_args, new_src_contents=None): + if editor_args[0] == 'shred': + return + + tmp_path = editor_args[-1] + + # simulate the tmp file being editted + tmp_file = open(tmp_path, 'wb') + if new_src_contents: + tmp_file.write(new_src_contents) + tmp_file.close() + + def _faux_command(self, tmp_path): + pass + + @patch('ansible.parsing.vault.call') + def test_edit_file_helper_no_change(self, mock_sp_call): + self._test_dir = self._create_test_dir() + + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + # editor invocation doesnt change anything + def faux_editor(editor_args): + self._faux_editor(editor_args, src_file_contents) + + mock_sp_call.side_effect = faux_editor + ve = vault.VaultEditor('password') + + ve._edit_file_helper(src_file_path, existing_data=src_file_contents) + + new_target_file = open(src_file_path, 'rb') + new_target_file_contents = new_target_file.read() + self.assertEqual(src_file_contents, new_target_file_contents) + + def _assert_file_is_encrypted(self, vault_editor, src_file_path, src_contents): + new_src_file = open(src_file_path, 'rb') + new_src_file_contents = new_src_file.read() + + # TODO: assert that it is encrypted + self.assertTrue(vault.is_encrypted(new_src_file_contents)) + + src_file_plaintext = vault_editor.vault.decrypt(new_src_file_contents) + + # the plaintext should not be encrypted + self.assertFalse(vault.is_encrypted(src_file_plaintext)) + + # and the new plaintext should match the original + self.assertEqual(src_file_plaintext, src_contents) + + def _assert_file_is_link(self, src_file_link_path, src_file_path): + self.assertTrue(os.path.islink(src_file_link_path), + 'The dest path (%s) should be a symlink to (%s) but is not' % (src_file_link_path, src_file_path)) + + def test_rekey_file(self): + self._test_dir = self._create_test_dir() + + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + ve = vault.VaultEditor('password') + ve.encrypt_file(src_file_path) + + new_password = 'password2:electricbugaloo' + ve.rekey_file(src_file_path, new_password) + + new_ve = vault.VaultEditor(new_password) + self._assert_file_is_encrypted(new_ve, src_file_path, src_file_contents) + + def test_rekey_file_no_new_password(self): + self._test_dir = self._create_test_dir() + + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + ve = vault.VaultEditor('password') + ve.encrypt_file(src_file_path) + + self.assertRaisesRegexp(errors.AnsibleError, + 'The value for the new_password to rekey', + ve.rekey_file, + src_file_path, + None) + + def test_rekey_file_not_encrypted(self): + self._test_dir = self._create_test_dir() + + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + ve = vault.VaultEditor('password') + + new_password = 'password2:electricbugaloo' + self.assertRaisesRegexp(errors.AnsibleError, + 'input is not vault encrypted data', + ve.rekey_file, + src_file_path, new_password) + + def test_plaintext(self): + self._test_dir = self._create_test_dir() + + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + ve = vault.VaultEditor('password') + ve.encrypt_file(src_file_path) + + res = ve.plaintext(src_file_path) + self.assertEquals(src_file_contents, res) + + def test_plaintext_not_encrypted(self): + self._test_dir = self._create_test_dir() + + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + ve = vault.VaultEditor('password') + self.assertRaisesRegexp(errors.AnsibleError, + 'input is not vault encrypted data', + ve.plaintext, + src_file_path) + + def test_encrypt_file(self): + self._test_dir = self._create_test_dir() + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + ve = vault.VaultEditor('password') + ve.encrypt_file(src_file_path) + + self._assert_file_is_encrypted(ve, src_file_path, src_file_contents) + + def test_encrypt_file_symlink(self): + self._test_dir = self._create_test_dir() + + src_file_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) + + src_file_link_path = os.path.join(self._test_dir, 'a_link_to_dest_file') + os.symlink(src_file_path, src_file_link_path) + + ve = vault.VaultEditor('password') + ve.encrypt_file(src_file_link_path) + + self._assert_file_is_encrypted(ve, src_file_path, src_file_contents) + self._assert_file_is_encrypted(ve, src_file_link_path, src_file_contents) + + self._assert_file_is_link(src_file_link_path, src_file_path) + + @patch('ansible.parsing.vault.call') + def test_edit_file(self, mock_sp_call): + self._test_dir = self._create_test_dir() + src_contents = to_bytes("some info in a file\nyup.") + + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) + + new_src_contents = to_bytes("The info is different now.") + + def faux_editor(editor_args): + self._faux_editor(editor_args, new_src_contents) + + mock_sp_call.side_effect = faux_editor + + ve = vault.VaultEditor('password') + + ve.encrypt_file(src_file_path) + ve.edit_file(src_file_path) + + new_src_file = open(src_file_path, 'rb') + new_src_file_contents = new_src_file.read() + + src_file_plaintext = ve.vault.decrypt(new_src_file_contents) + self.assertEqual(src_file_plaintext, new_src_contents) + + new_stat = os.stat(src_file_path) + print(new_stat) + + @patch('ansible.parsing.vault.call') + def test_edit_file_symlink(self, mock_sp_call): + self._test_dir = self._create_test_dir() + src_contents = to_bytes("some info in a file\nyup.") + + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) + + new_src_contents = to_bytes("The info is different now.") + + def faux_editor(editor_args): + self._faux_editor(editor_args, new_src_contents) + + mock_sp_call.side_effect = faux_editor + + ve = vault.VaultEditor('password') + + ve.encrypt_file(src_file_path) + + src_file_link_path = os.path.join(self._test_dir, 'a_link_to_dest_file') + + os.symlink(src_file_path, src_file_link_path) + + ve.edit_file(src_file_link_path) + + new_src_file = open(src_file_path, 'rb') + new_src_file_contents = new_src_file.read() + + src_file_plaintext = ve.vault.decrypt(new_src_file_contents) + + self._assert_file_is_link(src_file_link_path, src_file_path) + + self.assertEqual(src_file_plaintext, new_src_contents) + + #self.assertEqual(src_file_plaintext, new_src_contents, + # 'The decrypted plaintext of the editted file is not the expected contents.') + + @patch('ansible.parsing.vault.call') + def test_edit_file_not_encrypted(self, mock_sp_call): + self._test_dir = self._create_test_dir() + src_contents = to_bytes("some info in a file\nyup.") + + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) + + new_src_contents = to_bytes("The info is different now.") + + def faux_editor(editor_args): + self._faux_editor(editor_args, new_src_contents) + + mock_sp_call.side_effect = faux_editor + + ve = vault.VaultEditor('password') + self.assertRaisesRegexp(errors.AnsibleError, + 'input is not vault encrypted data', + ve.edit_file, + src_file_path) + + def test_create_file_exists(self): + self._test_dir = self._create_test_dir() + src_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) + + ve = vault.VaultEditor('password') + self.assertRaisesRegexp(errors.AnsibleError, + 'please use .edit. instead', + ve.create_file, + src_file_path) + + def test_decrypt_file_exception(self): + self._test_dir = self._create_test_dir() + src_contents = to_bytes("some info in a file\nyup.") + src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) + + ve = vault.VaultEditor('password') + self.assertRaisesRegexp(errors.AnsibleError, + 'input is not vault encrypted data', + ve.decrypt_file, + src_file_path) + + @patch.object(vault.VaultEditor, '_editor_shell_command') def test_create_file(self, mock_editor_shell_command): def sc_side_effect(filename): @@ -99,7 +416,7 @@ class TestVaultEditor(unittest.TestCase): tmp_file = tempfile.NamedTemporaryFile() os.unlink(tmp_file.name) - ve = VaultEditor("ansible") + ve = vault.VaultEditor("ansible") ve.create_file(tmp_file.name) self.assertTrue(os.path.exists(tmp_file.name)) @@ -113,7 +430,7 @@ class TestVaultEditor(unittest.TestCase): with v10_file as f: f.write(to_bytes(v10_data)) - ve = VaultEditor("ansible") + ve = vault.VaultEditor("ansible") # make sure the password functions for the cipher error_hit = False @@ -130,6 +447,7 @@ class TestVaultEditor(unittest.TestCase): os.unlink(v10_file.name) assert error_hit is False, "error decrypting 1.0 file" + self.assertEquals(fdata.strip(), "foo") assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip() def test_decrypt_1_1(self): @@ -140,7 +458,7 @@ class TestVaultEditor(unittest.TestCase): with v11_file as f: f.write(to_bytes(v11_data)) - ve = VaultEditor("ansible") + ve = vault.VaultEditor("ansible") # make sure the password functions for the cipher error_hit = False @@ -168,7 +486,7 @@ class TestVaultEditor(unittest.TestCase): with v10_file as f: f.write(to_bytes(v10_data)) - ve = VaultEditor("ansible") + ve = vault.VaultEditor("ansible") # make sure the password functions for the cipher error_hit = False @@ -185,7 +503,7 @@ class TestVaultEditor(unittest.TestCase): assert error_hit is False, "error rekeying 1.0 file to 1.1" # ensure filedata can be decrypted, is 1.1 and is AES256 - vl = VaultLib("ansible2") + vl = vault.VaultLib("ansible2") dec_data = None error_hit = False try: