openssl_*: proper mode support (#54085)

* Add write helper.

* Adjust modules (except openssl_certificate).

* Adding tests for mode (with openssl_privatekey).

* Add openssl_certificate support.

* Never, ever remove the output file before actually trying to generate new content for it.

Removal is only allowed when state=absent, or when the object has been regenerated and the result needs to be written to that place.

* Add changelog.

* Extend test.
pull/54337/head
Felix Fontein 6 years ago committed by Martin Krizek
parent 9c355e5c52
commit d7a273273a

@ -0,0 +1,5 @@
minor_changes:
- "openssl_pkcs12, openssl_privatekey, openssl_publickey - These modules no longer delete the output file before starting to regenerate the output, or when generating the output failed."
bugfixes:
- "openssl_pkcs12, openssl_privatekey - These modules now accept the output file mode in symbolic form or as a octal string (https://github.com/ansible/ansible/issues/53476)."
- "openssl_certificate, openssl_csr, openssl_pkcs12, openssl_privatekey, openssl_publickey - The modules are now able to overwrite write-protected files (https://github.com/ansible/ansible/issues/48656)."

@ -40,6 +40,7 @@ import errno
import hashlib import hashlib
import os import os
import re import re
import tempfile
from ansible.module_utils import six from ansible.module_utils import six
from ansible.module_utils._text import to_bytes, to_text from ansible.module_utils._text import to_bytes, to_text
@ -235,6 +236,49 @@ def select_message_digest(digest_string):
return digest return digest
def write_file(module, content, default_mode=None):
'''
Writes content into destination file as securely as possible.
Uses file arguments from module.
'''
# Find out parameters for file
file_args = module.load_file_common_arguments(module.params)
if file_args['mode'] is None:
file_args['mode'] = default_mode
# Create tempfile name
tmp_fd, tmp_name = tempfile.mkstemp(prefix=b'.ansible_tmp')
try:
os.close(tmp_fd)
except Exception as dummy:
pass
module.add_cleanup_file(tmp_name) # if we fail, let Ansible try to remove the file
try:
try:
# Create tempfile
file = os.open(tmp_name, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
os.write(file, content)
os.close(file)
except Exception as e:
try:
os.remove(tmp_name)
except Exception as dummy:
pass
module.fail_json(msg='Error while writing result into temporary file: {0}'.format(e))
# Update destination to wanted permissions
if os.path.exists(file_args['path']):
module.set_fs_attributes_if_different(file_args, False)
# Move tempfile to final destination
module.atomic_move(tmp_name, file_args['path'])
# Try to update permissions again
module.set_fs_attributes_if_different(file_args, False)
except Exception as e:
try:
os.remove(tmp_name)
except Exception as dummy:
pass
module.fail_json(msg='Error while writing result: {0}'.format(e))
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class OpenSSLObject(object): class OpenSSLObject(object):

@ -758,12 +758,7 @@ class SelfSignedCertificateCryptography(Certificate):
self.cert = certificate self.cert = certificate
try: crypto_utils.write_file(module, certificate.public_bytes(Encoding.PEM))
with open(self.path, 'wb') as cert_file:
cert_file.write(certificate.public_bytes(Encoding.PEM))
except Exception as exc:
raise CertificateError(exc)
self.changed = True self.changed = True
else: else:
self.cert = crypto_utils.load_certificate(self.path, backend=self.backend) self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
@ -841,12 +836,7 @@ class SelfSignedCertificate(Certificate):
cert.sign(self.privatekey, self.digest) cert.sign(self.privatekey, self.digest)
self.cert = cert self.cert = cert
try: crypto_utils.write_file(module, crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert))
with open(self.path, 'wb') as cert_file:
cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert))
except EnvironmentError as exc:
raise CertificateError(exc)
self.changed = True self.changed = True
file_args = module.load_file_common_arguments(module.params) file_args = module.load_file_common_arguments(module.params)
@ -934,12 +924,7 @@ class OwnCACertificateCryptography(Certificate):
self.cert = certificate self.cert = certificate
try: crypto_utils.write_file(module, certificate.public_bytes(Encoding.PEM))
with open(self.path, 'wb') as cert_file:
cert_file.write(certificate.public_bytes(Encoding.PEM))
except Exception as exc:
raise CertificateError(exc)
self.changed = True self.changed = True
else: else:
self.cert = crypto_utils.load_certificate(self.path, backend=self.backend) self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
@ -1028,12 +1013,7 @@ class OwnCACertificate(Certificate):
cert.sign(self.ca_privatekey, self.digest) cert.sign(self.ca_privatekey, self.digest)
self.cert = cert self.cert = cert
try: crypto_utils.write_file(module, crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert))
with open(self.path, 'wb') as cert_file:
cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert))
except EnvironmentError as exc:
raise CertificateError(exc)
self.changed = True self.changed = True
file_args = module.load_file_common_arguments(module.params) file_args = module.load_file_common_arguments(module.params)
@ -1623,8 +1603,7 @@ class AcmeCertificate(Certificate):
self.csr_path, self.csr_path,
self.challenge_path), self.challenge_path),
check_rc=True)[1] check_rc=True)[1]
with open(self.path, 'wb') as certfile: crypto_utils.write_file(module, to_bytes(crt))
certfile.write(to_bytes(crt))
self.changed = True self.changed = True
except OSError as exc: except OSError as exc:
raise CertificateError(exc) raise CertificateError(exc)

@ -422,14 +422,7 @@ class CertificateSigningRequestBase(crypto_utils.OpenSSLObject):
'''Generate the certificate signing request.''' '''Generate the certificate signing request.'''
if not self.check(module, perms_required=False) or self.force: if not self.check(module, perms_required=False) or self.force:
result = self._generate_csr() result = self._generate_csr()
crypto_utils.write_file(module, result)
try:
csr_file = open(self.path, 'wb')
csr_file.write(result)
csr_file.close()
except (IOError, OSError) as exc:
raise CertificateSigningRequestError(exc)
self.changed = True self.changed = True
file_args = module.load_file_common_arguments(module.params) file_args = module.load_file_common_arguments(module.params)

@ -199,9 +199,9 @@ class Pkcs(crypto_utils.OpenSSLObject):
self.privatekey_passphrase = module.params['privatekey_passphrase'] self.privatekey_passphrase = module.params['privatekey_passphrase']
self.privatekey_path = module.params['privatekey_path'] self.privatekey_path = module.params['privatekey_path']
self.src = module.params['src'] self.src = module.params['src']
self.mode = module.params['mode']
if not self.mode: if module.params['mode'] is None:
self.mode = 0o400 module.params['mode'] = '0400'
def check(self, module, perms_required=True): def check(self, module, perms_required=True):
"""Ensure the resource is in its desired state.""" """Ensure the resource is in its desired state."""
@ -240,11 +240,6 @@ class Pkcs(crypto_utils.OpenSSLObject):
self.pkcs12 = crypto.PKCS12() self.pkcs12 = crypto.PKCS12()
try:
self.remove(module)
except PkcsError as exc:
module.fail_json(msg=to_native(exc))
if self.ca_certificates: if self.ca_certificates:
ca_certs = [crypto_utils.load_certificate(ca_cert) for ca_cert ca_certs = [crypto_utils.load_certificate(ca_cert) for ca_cert
in self.ca_certificates] in self.ca_certificates]
@ -266,22 +261,16 @@ class Pkcs(crypto_utils.OpenSSLObject):
except crypto_utils.OpenSSLBadPassphraseError as exc: except crypto_utils.OpenSSLBadPassphraseError as exc:
raise PkcsError(exc) raise PkcsError(exc)
try: crypto_utils.write_file(
pkcs12_file = os.open(self.path, module,
os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.pkcs12.export(self.passphrase, self.iter_size, self.maciter_size),
self.mode) 0o600
os.write(pkcs12_file, self.pkcs12.export(self.passphrase, )
self.iter_size, self.maciter_size))
os.close(pkcs12_file)
except (IOError, OSError) as exc:
self.remove(module)
raise PkcsError(exc)
def parse(self, module): def parse(self, module):
"""Read PKCS#12 file.""" """Read PKCS#12 file."""
try: try:
self.remove(module)
with open(self.src, 'rb') as pkcs12_fh: with open(self.src, 'rb') as pkcs12_fh:
pkcs12_content = pkcs12_fh.read() pkcs12_content = pkcs12_fh.read()
p12 = crypto.load_pkcs12(pkcs12_content, p12 = crypto.load_pkcs12(pkcs12_content,
@ -291,14 +280,9 @@ class Pkcs(crypto_utils.OpenSSLObject):
crt = crypto.dump_certificate(crypto.FILETYPE_PEM, crt = crypto.dump_certificate(crypto.FILETYPE_PEM,
p12.get_certificate()) p12.get_certificate())
pkcs12_file = os.open(self.path, crypto_utils.write_file(module, b'%s%s' % (pkey, crt))
os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
self.mode)
os.write(pkcs12_file, b'%s%s' % (pkey, crt))
os.close(pkcs12_file)
except IOError as exc: except IOError as exc:
self.remove(module)
raise PkcsError(exc) raise PkcsError(exc)

@ -275,9 +275,8 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject):
self.backup = module.params['backup'] self.backup = module.params['backup']
self.backup_file = None self.backup_file = None
self.mode = module.params.get('mode', None) if module.params['mode'] is None:
if self.mode is None: module.params['mode'] = '0600'
self.mode = 0o600
@abc.abstractmethod @abc.abstractmethod
def _generate_private_key_data(self): def _generate_private_key_data(self):
@ -294,26 +293,8 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject):
if self.backup: if self.backup:
self.backup_file = module.backup_local(self.path) self.backup_file = module.backup_local(self.path)
privatekey_data = self._generate_private_key_data() privatekey_data = self._generate_private_key_data()
try: crypto_utils.write_file(module, privatekey_data, 0o600)
privatekey_file = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
os.close(privatekey_file)
if isinstance(self.mode, string_types):
try:
self.mode = int(self.mode, 8)
except ValueError as e:
try:
st = os.lstat(self.path)
self.mode = AnsibleModule._symbolic_mode_to_octal(st, self.mode)
except ValueError as e:
module.fail_json(msg="%s" % to_native(e), exception=traceback.format_exc())
os.chmod(self.path, self.mode)
privatekey_file = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode)
os.write(privatekey_file, privatekey_data)
os.close(privatekey_file)
self.changed = True self.changed = True
except IOError as exc:
self.remove()
raise PrivateKeyError(exc)
self.fingerprint = self._get_fingerprint() self.fingerprint = self._get_fingerprint()
file_args = module.load_file_common_arguments(module.params) file_args = module.load_file_common_arguments(module.params)

@ -197,8 +197,7 @@ class PublicKey(crypto_utils.OpenSSLObject):
) )
publickey_content = crypto.dump_publickey(crypto.FILETYPE_PEM, self.privatekey) publickey_content = crypto.dump_publickey(crypto.FILETYPE_PEM, self.privatekey)
with open(self.path, 'wb') as publickey_file: crypto_utils.write_file(module, publickey_content)
publickey_file.write(publickey_content)
self.changed = True self.changed = True
except crypto_utils.OpenSSLBadPassphraseError as exc: except crypto_utils.OpenSSLBadPassphraseError as exc:
@ -206,7 +205,6 @@ class PublicKey(crypto_utils.OpenSSLObject):
except (IOError, OSError) as exc: except (IOError, OSError) as exc:
raise PublicKeyError(exc) raise PublicKeyError(exc)
except AttributeError as exc: except AttributeError as exc:
self.remove(module)
raise PublicKeyError('You need to have PyOpenSSL>=16.0.0 to generate public keys') raise PublicKeyError('You need to have PyOpenSSL>=16.0.0 to generate public keys')
self.fingerprint = crypto_utils.get_fingerprint( self.fingerprint = crypto_utils.get_fingerprint(

@ -203,3 +203,33 @@
backup: yes backup: yes
state: absent state: absent
register: remove_2 register: remove_2
- name: Generate privatekey_mode (mode 0400)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_mode.pem'
mode: '0400'
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_mode_1
- name: Stat for privatekey_mode
stat:
path: '{{ output_dir }}/privatekey_mode.pem'
register: privatekey_mode_1_stat
- name: Generate privatekey_mode (mode 0400, idempotency)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_mode.pem'
mode: '0400'
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_mode_2
- name: Generate privatekey_mode (mode 0400, force)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_mode.pem'
mode: '0400'
force: yes
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_mode_3
- name: Stat for privatekey_mode
stat:
path: '{{ output_dir }}/privatekey_mode.pem'
register: privatekey_mode_3_stat

@ -126,3 +126,13 @@
- remove_2 is not changed - remove_2 is not changed
- remove_1.backup_file is string - remove_1.backup_file is string
- remove_2.backup_file is undefined - remove_2.backup_file is undefined
- name: Validate mode
assert:
that:
- privatekey_mode_1 is changed
- privatekey_mode_1_stat.stat.mode == '0400'
- privatekey_mode_2 is not changed
- privatekey_mode_3 is changed
- privatekey_mode_3_stat.stat.mode == '0400'
- privatekey_mode_1_stat.stat.mtime != privatekey_mode_3_stat.stat.mtime

Loading…
Cancel
Save