From de5c18af7ccf21c45b04a136e27bff6f73a2036d Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Tue, 18 Feb 2020 22:23:50 +0100 Subject: [PATCH] openssl_* / x509_* modules: refactoring (#67540) * Move common code to module_utils. * Unify get_relative_time_option. --- lib/ansible/module_utils/crypto.py | 88 ++++++++++++++++++- .../modules/crypto/openssl_certificate.py | 62 ++++--------- .../crypto/openssl_certificate_info.py | 27 +----- lib/ansible/modules/crypto/x509_crl.py | 88 ++----------------- 4 files changed, 114 insertions(+), 151 deletions(-) diff --git a/lib/ansible/module_utils/crypto.py b/lib/ansible/module_utils/crypto.py index 8eebeeb3ecf..e67eeff1b42 100644 --- a/lib/ansible/module_utils/crypto.py +++ b/lib/ansible/module_utils/crypto.py @@ -109,6 +109,7 @@ try: except ImportError: CRYPTOGRAPHY_HAS_ED448 = False + HAS_CRYPTOGRAPHY = True except ImportError: # Error handled in the calling module. CRYPTOGRAPHY_HAS_X25519 = False @@ -116,6 +117,7 @@ except ImportError: CRYPTOGRAPHY_HAS_X448 = False CRYPTOGRAPHY_HAS_ED25519 = False CRYPTOGRAPHY_HAS_ED448 = False + HAS_CRYPTOGRAPHY = False import abc @@ -129,7 +131,7 @@ import re import tempfile from ansible.module_utils import six -from ansible.module_utils._text import to_bytes, to_text +from ansible.module_utils._text import to_native, to_bytes, to_text class OpenSSLObjectError(Exception): @@ -362,6 +364,40 @@ def convert_relative_to_datetime(relative_time_string): return datetime.datetime.utcnow() - offset +def get_relative_time_option(input_string, input_name, backend='cryptography'): + """Return an absolute timespec if a relative timespec or an ASN1 formatted + string is provided. + + The return value will be a datetime object for the cryptography backend, + and a ASN1 formatted string for the pyopenssl backend.""" + result = to_native(input_string) + if result is None: + raise OpenSSLObjectError( + 'The timespec "%s" for %s is not valid' % + input_string, input_name) + # Relative time + if result.startswith("+") or result.startswith("-"): + result_datetime = convert_relative_to_datetime(result) + if backend == 'pyopenssl': + return result_datetime.strftime("%Y%m%d%H%M%SZ") + elif backend == 'cryptography': + return result_datetime + # Absolute time + if backend == 'pyopenssl': + return input_string + elif backend == 'cryptography': + for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']: + try: + return datetime.datetime.strptime(result, date_fmt) + except ValueError: + pass + + raise OpenSSLObjectError( + 'The time spec "%s" for %s is invalid' % + (input_string, input_name) + ) + + def select_message_digest(digest_string): digest = None if digest_string == 'sha256': @@ -2037,3 +2073,53 @@ def cryptography_compare_public_keys(key1, key2): b = key2.public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw) return a == b return key1.public_numbers() == key2.public_numbers() + + +if HAS_CRYPTOGRAPHY: + REVOCATION_REASON_MAP = { + 'unspecified': x509.ReasonFlags.unspecified, + 'key_compromise': x509.ReasonFlags.key_compromise, + 'ca_compromise': x509.ReasonFlags.ca_compromise, + 'affiliation_changed': x509.ReasonFlags.affiliation_changed, + 'superseded': x509.ReasonFlags.superseded, + 'cessation_of_operation': x509.ReasonFlags.cessation_of_operation, + 'certificate_hold': x509.ReasonFlags.certificate_hold, + 'privilege_withdrawn': x509.ReasonFlags.privilege_withdrawn, + 'aa_compromise': x509.ReasonFlags.aa_compromise, + 'remove_from_crl': x509.ReasonFlags.remove_from_crl, + } + REVOCATION_REASON_MAP_INVERSE = dict() + for k, v in REVOCATION_REASON_MAP.items(): + REVOCATION_REASON_MAP_INVERSE[v] = k + + +def cryptography_decode_revoked_certificate(cert): + result = { + 'serial_number': cert.serial_number, + 'revocation_date': cert.revocation_date, + 'issuer': None, + 'issuer_critical': False, + 'reason': None, + 'reason_critical': False, + 'invalidity_date': None, + 'invalidity_date_critical': False, + } + try: + ext = cert.extensions.get_extension_for_class(x509.CertificateIssuer) + result['issuer'] = list(ext.value) + result['issuer_critical'] = ext.critical + except x509.ExtensionNotFound: + pass + try: + ext = cert.extensions.get_extension_for_class(x509.CRLReason) + result['reason'] = ext.value.reason + result['reason_critical'] = ext.critical + except x509.ExtensionNotFound: + pass + try: + ext = cert.extensions.get_extension_for_class(x509.InvalidityDate) + result['invalidity_date'] = ext.value.invalidity_date + result['invalidity_date_critical'] = ext.critical + except x509.ExtensionNotFound: + pass + return result diff --git a/lib/ansible/modules/crypto/openssl_certificate.py b/lib/ansible/modules/crypto/openssl_certificate.py index 88dd09e9a91..4bd5e5c4682 100644 --- a/lib/ansible/modules/crypto/openssl_certificate.py +++ b/lib/ansible/modules/crypto/openssl_certificate.py @@ -959,34 +959,6 @@ class Certificate(crypto_utils.OpenSSLObject): self.backup = module.params['backup'] self.backup_file = None - def get_relative_time_option(self, input_string, input_name): - """Return an ASN1 formatted string if a relative timespec - or an ASN1 formatted string is provided.""" - result = to_native(input_string) - if result is None: - raise CertificateError( - 'The timespec "%s" for %s is not valid' % - input_string, input_name) - if result.startswith("+") or result.startswith("-"): - result_datetime = crypto_utils.convert_relative_to_datetime( - result) - if self.backend == 'pyopenssl': - return result_datetime.strftime("%Y%m%d%H%M%SZ") - elif self.backend == 'cryptography': - return result_datetime - if self.backend == 'cryptography': - for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']: - try: - return datetime.datetime.strptime(result, date_fmt) - except ValueError: - pass - - raise CertificateError( - 'The time spec "%s" for %s is invalid' % - (input_string, input_name) - ) - return input_string - def _validate_privatekey(self): if self.backend == 'pyopenssl': ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD) @@ -1147,8 +1119,8 @@ class SelfSignedCertificateCryptography(Certificate): def __init__(self, module): super(SelfSignedCertificateCryptography, self).__init__(module, 'cryptography') self.create_subject_key_identifier = module.params['selfsigned_create_subject_key_identifier'] - self.notBefore = self.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before') - self.notAfter = self.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after') + self.notBefore = crypto_utils.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend) + self.notAfter = crypto_utils.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend) self.digest = crypto_utils.select_message_digest(module.params['selfsigned_digest']) self.version = module.params['selfsigned_version'] self.serial_number = x509.random_serial_number() @@ -1280,8 +1252,8 @@ class SelfSignedCertificate(Certificate): super(SelfSignedCertificate, self).__init__(module, 'pyopenssl') if module.params['selfsigned_create_subject_key_identifier'] != 'create_if_not_provided': module.fail_json(msg='selfsigned_create_subject_key_identifier cannot be used with the pyOpenSSL backend!') - self.notBefore = self.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before') - self.notAfter = self.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after') + self.notBefore = crypto_utils.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend) + self.notAfter = crypto_utils.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend) self.digest = module.params['selfsigned_digest'] self.version = module.params['selfsigned_version'] self.serial_number = randint(1000, 99999) @@ -1379,8 +1351,8 @@ class OwnCACertificateCryptography(Certificate): super(OwnCACertificateCryptography, self).__init__(module, 'cryptography') self.create_subject_key_identifier = module.params['ownca_create_subject_key_identifier'] self.create_authority_key_identifier = module.params['ownca_create_authority_key_identifier'] - self.notBefore = self.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before') - self.notAfter = self.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after') + self.notBefore = crypto_utils.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before', backend=self.backend) + self.notAfter = crypto_utils.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after', backend=self.backend) self.digest = crypto_utils.select_message_digest(module.params['ownca_digest']) self.version = module.params['ownca_version'] self.serial_number = x509.random_serial_number() @@ -1575,8 +1547,8 @@ class OwnCACertificate(Certificate): def __init__(self, module): super(OwnCACertificate, self).__init__(module, 'pyopenssl') - self.notBefore = self.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before') - self.notAfter = self.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after') + self.notBefore = crypto_utils.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before', backend=self.backend) + self.notAfter = crypto_utils.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after', backend=self.backend) self.digest = module.params['ownca_digest'] self.version = module.params['ownca_version'] self.serial_number = randint(1000, 99999) @@ -1937,7 +1909,7 @@ class AssertOnlyCertificateBase(Certificate): if self.not_before is not None: cert_not_valid_before = self._validate_not_before() - if cert_not_valid_before != self.get_relative_time_option(self.not_before, 'not_before'): + if cert_not_valid_before != crypto_utils.get_relative_time_option(self.not_before, 'not_before', backend=self.backend): messages.append( 'Invalid not_before component (got %s, expected %s to be present)' % (cert_not_valid_before, self.not_before) @@ -1945,7 +1917,7 @@ class AssertOnlyCertificateBase(Certificate): if self.not_after is not None: cert_not_valid_after = self._validate_not_after() - if cert_not_valid_after != self.get_relative_time_option(self.not_after, 'not_after'): + if cert_not_valid_after != crypto_utils.get_relative_time_option(self.not_after, 'not_after', backend=self.backend): messages.append( 'Invalid not_after component (got %s, expected %s to be present)' % (cert_not_valid_after, self.not_after) @@ -2120,15 +2092,15 @@ class AssertOnlyCertificateCryptography(AssertOnlyCertificateBase): return self.cert.not_valid_after def _validate_valid_at(self): - rt = self.get_relative_time_option(self.valid_at, 'valid_at') + rt = crypto_utils.get_relative_time_option(self.valid_at, 'valid_at', backend=self.backend) return self.cert.not_valid_before, rt, self.cert.not_valid_after def _validate_invalid_at(self): - rt = self.get_relative_time_option(self.invalid_at, 'invalid_at') + rt = crypto_utils.get_relative_time_option(self.invalid_at, 'invalid_at', backend=self.backend) return self.cert.not_valid_before, rt, self.cert.not_valid_after def _validate_valid_in(self): - valid_in_date = self.get_relative_time_option(self.valid_in, "valid_in") + valid_in_date = crypto_utils.get_relative_time_option(self.valid_in, "valid_in", backend=self.backend) return self.cert.not_valid_before, valid_in_date, self.cert.not_valid_after @@ -2285,17 +2257,17 @@ class AssertOnlyCertificate(AssertOnlyCertificateBase): return self.cert.get_notAfter() def _validate_valid_at(self): - rt = self.get_relative_time_option(self.valid_at, "valid_at") + rt = crypto_utils.get_relative_time_option(self.valid_at, "valid_at", backend=self.backend) rt = to_bytes(rt, errors='surrogate_or_strict') return self.cert.get_notBefore(), rt, self.cert.get_notAfter() def _validate_invalid_at(self): - rt = self.get_relative_time_option(self.invalid_at, "invalid_at") + rt = crypto_utils.get_relative_time_option(self.invalid_at, "invalid_at", backend=self.backend) rt = to_bytes(rt, errors='surrogate_or_strict') return self.cert.get_notBefore(), rt, self.cert.get_notAfter() def _validate_valid_in(self): - valid_in_asn1 = self.get_relative_time_option(self.valid_in, "valid_in") + valid_in_asn1 = crypto_utils.get_relative_time_option(self.valid_in, "valid_in", backend=self.backend) valid_in_date = to_bytes(valid_in_asn1, errors='surrogate_or_strict') return self.cert.get_notBefore(), valid_in_date, self.cert.get_notAfter() @@ -2306,7 +2278,7 @@ class EntrustCertificate(Certificate): def __init__(self, module, backend): super(EntrustCertificate, self).__init__(module, backend) self.trackingId = None - self.notAfter = self.get_relative_time_option(module.params['entrust_not_after'], 'entrust_not_after') + self.notAfter = crypto_utils.get_relative_time_option(module.params['entrust_not_after'], 'entrust_not_after', backend=self.backend) if self.csr_content is None or not os.path.exists(self.csr_path): raise CertificateError( diff --git a/lib/ansible/modules/crypto/openssl_certificate_info.py b/lib/ansible/modules/crypto/openssl_certificate_info.py index cf6d0fc2a06..2d7459ae9df 100644 --- a/lib/ansible/modules/crypto/openssl_certificate_info.py +++ b/lib/ansible/modules/crypto/openssl_certificate_info.py @@ -348,31 +348,6 @@ else: TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" -def get_relative_time_option(input_string, input_name): - """Return an ASN1 formatted string if a relative timespec - or an ASN1 formatted string is provided.""" - result = input_string - if result.startswith("+") or result.startswith("-"): - return crypto_utils.convert_relative_to_datetime(result) - if result is None: - raise crypto_utils.OpenSSLObjectError( - 'The timespec "%s" for %s is not valid' % - input_string, input_name) - for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']: - try: - result = datetime.datetime.strptime(input_string, date_fmt) - break - except ValueError: - pass - - if not isinstance(result, datetime.datetime): - raise crypto_utils.OpenSSLObjectError( - 'The time spec "%s" for %s is invalid' % - (input_string, input_name) - ) - return result - - class CertificateInfo(crypto_utils.OpenSSLObject): def __init__(self, module, backend): super(CertificateInfo, self).__init__( @@ -394,7 +369,7 @@ class CertificateInfo(crypto_utils.OpenSSLObject): self.module.fail_json( msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v)) ) - self.valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k)) + self.valid_at[k] = crypto_utils.get_relative_time_option(v, 'valid_at.{0}'.format(k)) def generate(self): # Empty method because crypto_utils.OpenSSLObject wants this diff --git a/lib/ansible/modules/crypto/x509_crl.py b/lib/ansible/modules/crypto/x509_crl.py index 7bfb5e779b2..ef601edadc0 100644 --- a/lib/ansible/modules/crypto/x509_crl.py +++ b/lib/ansible/modules/crypto/x509_crl.py @@ -347,7 +347,6 @@ crl: ''' -import datetime import os import traceback from distutils.version import LooseVersion @@ -369,25 +368,8 @@ try: RevokedCertificateBuilder, NameAttribute, Name, - ReasonFlags, ) CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) - - REASON_MAP = { - 'unspecified': ReasonFlags.unspecified, - 'key_compromise': ReasonFlags.key_compromise, - 'ca_compromise': ReasonFlags.ca_compromise, - 'affiliation_changed': ReasonFlags.affiliation_changed, - 'superseded': ReasonFlags.superseded, - 'cessation_of_operation': ReasonFlags.cessation_of_operation, - 'certificate_hold': ReasonFlags.certificate_hold, - 'privilege_withdrawn': ReasonFlags.privilege_withdrawn, - 'aa_compromise': ReasonFlags.aa_compromise, - 'remove_from_crl': ReasonFlags.remove_from_crl, - } - REASON_MAP_INVERSE = dict() - for k, v in REASON_MAP.items(): - REASON_MAP_INVERSE[v] = k except ImportError: CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() CRYPTOGRAPHY_FOUND = False @@ -404,27 +386,6 @@ class CRLError(crypto_utils.OpenSSLObjectError): class CRL(crypto_utils.OpenSSLObject): - def get_relative_time_option(self, input_string, input_name): - """Return an ASN1 formatted string if a relative timespec - or an ASN1 formatted string is provided.""" - result = to_native(input_string) - if result is None: - raise CRLError( - 'The timespec "%s" for %s is not valid' % - input_string, input_name) - if result.startswith("+") or result.startswith("-"): - return crypto_utils.convert_relative_to_datetime(result) - for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']: - try: - return datetime.datetime.strptime(result, date_fmt) - except ValueError: - pass - - raise CRLError( - 'The time spec "%s" for %s is invalid' % - (input_string, input_name) - ) - def __init__(self, module): super(CRL, self).__init__( module.params['path'], @@ -447,8 +408,8 @@ class CRL(crypto_utils.OpenSSLObject): self.issuer = crypto_utils.parse_name_field(module.params['issuer']) self.issuer = [(entry[0], entry[1]) for entry in self.issuer if entry[1]] - self.last_update = self.get_relative_time_option(module.params['last_update'], 'last_update') - self.next_update = self.get_relative_time_option(module.params['next_update'], 'next_update') + self.last_update = crypto_utils.get_relative_time_option(module.params['last_update'], 'last_update') + self.next_update = crypto_utils.get_relative_time_option(module.params['next_update'], 'next_update') self.digest = crypto_utils.select_message_digest(module.params['digest']) if self.digest is None: @@ -494,15 +455,15 @@ class CRL(crypto_utils.OpenSSLObject): if rc['issuer']: result['issuer'] = [crypto_utils.cryptography_get_name(issuer) for issuer in rc['issuer']] result['issuer_critical'] = rc['issuer_critical'] - result['revocation_date'] = self.get_relative_time_option( + result['revocation_date'] = crypto_utils.get_relative_time_option( rc['revocation_date'], path_prefix + 'revocation_date' ) if rc['reason']: - result['reason'] = REASON_MAP[rc['reason']] + result['reason'] = crypto_utils.REVOCATION_REASON_MAP[rc['reason']] result['reason_critical'] = rc['reason_critical'] if rc['invalidity_date']: - result['invalidity_date'] = self.get_relative_time_option( + result['invalidity_date'] = crypto_utils.get_relative_time_option( rc['invalidity_date'], path_prefix + 'invalidity_date' ) @@ -563,37 +524,6 @@ class CRL(crypto_utils.OpenSSLObject): entry['invalidity_date_critical'], ) - def _decode_revoked(self, cert): - result = { - 'serial_number': cert.serial_number, - 'revocation_date': cert.revocation_date, - 'issuer': None, - 'issuer_critical': False, - 'reason': None, - 'reason_critical': False, - 'invalidity_date': None, - 'invalidity_date_critical': False, - } - try: - ext = cert.extensions.get_extension_for_class(x509.CertificateIssuer) - result['issuer'] = list(ext.value) - result['issuer_critical'] = ext.critical - except x509.ExtensionNotFound: - pass - try: - ext = cert.extensions.get_extension_for_class(x509.CRLReason) - result['reason'] = ext.value.reason - result['reason_critical'] = ext.critical - except x509.ExtensionNotFound: - pass - try: - ext = cert.extensions.get_extension_for_class(x509.InvalidityDate) - result['invalidity_date'] = ext.value.invalidity_date - result['invalidity_date_critical'] = ext.critical - except x509.ExtensionNotFound: - pass - return result - def check(self, perms_required=True): """Ensure the resource is in its desired state.""" @@ -616,7 +546,7 @@ class CRL(crypto_utils.OpenSSLObject): if want_issuer != [(sub.oid, sub.value) for sub in self.crl.issuer]: return False - old_entries = [self._compress_entry(self._decode_revoked(cert)) for cert in self.crl] + old_entries = [self._compress_entry(crypto_utils.cryptography_decode_revoked_certificate(cert)) for cert in self.crl] new_entries = [self._compress_entry(cert) for cert in self.revoked_certificates] if self.update: # We don't simply use a set so that duplicate entries are treated correctly @@ -649,7 +579,7 @@ class CRL(crypto_utils.OpenSSLObject): if self.update and self.crl: new_entries = set([self._compress_entry(entry) for entry in self.revoked_certificates]) for entry in self.crl: - decoded_entry = self._compress_entry(self._decode_revoked(entry)) + decoded_entry = self._compress_entry(crypto_utils.cryptography_decode_revoked_certificate(entry)) if decoded_entry not in new_entries: crl = crl.add_revoked_certificate(entry) for entry in self.revoked_certificates: @@ -700,7 +630,7 @@ class CRL(crypto_utils.OpenSSLObject): [crypto_utils.cryptography_decode_name(issuer) for issuer in entry['issuer']] if entry['issuer'] is not None else None, 'issuer_critical': entry['issuer_critical'], - 'reason': REASON_MAP_INVERSE.get(entry['reason']) if entry['reason'] is not None else None, + 'reason': crypto_utils.REVOCATION_REASON_MAP_INVERSE.get(entry['reason']) if entry['reason'] is not None else None, 'reason_critical': entry['reason_critical'], 'invalidity_date': entry['invalidity_date'].strftime(TIMESTAMP_FORMAT) @@ -758,7 +688,7 @@ class CRL(crypto_utils.OpenSSLObject): result['issuer'][k] = v result['revoked_certificates'] = [] for cert in self.crl: - entry = self._decode_revoked(cert) + entry = crypto_utils.cryptography_decode_revoked_certificate(cert) result['revoked_certificates'].append(self._dump_revoked(entry)) if self.return_content: