diff --git a/changelogs/fragments/openssl-cryptography.yml b/changelogs/fragments/openssl-cryptography.yml deleted file mode 100644 index 394c6adfe9b..00000000000 --- a/changelogs/fragments/openssl-cryptography.yml +++ /dev/null @@ -1,3 +0,0 @@ -minor_changes: -- "openssl_csr - now works with both PyOpenSSL and cryptography Python libraries. Autodetection can be overridden with ``select_crypto_backend`` option." -- "openssl_privatekey - now works with both PyOpenSSL and cryptography Python libraries. Autodetection can be overridden with ``select_crypto_backend`` option." diff --git a/lib/ansible/modules/crypto/openssl_csr.py b/lib/ansible/modules/crypto/openssl_csr.py index e8add2f2c57..e67c9dd0b7f 100644 --- a/lib/ansible/modules/crypto/openssl_csr.py +++ b/lib/ansible/modules/crypto/openssl_csr.py @@ -25,9 +25,7 @@ description: the subjectAltName, keyUsage, extendedKeyUsage, basicConstraints and OCSP Must Staple extensions." requirements: - - "One of the following Python libraries:" - - "cryptography >= 1.3" - - "pyOpenSSL >= 0.15" + - "python-pyOpenSSL >= 0.15" options: state: required: false @@ -171,22 +169,6 @@ options: are required to reject such certificates (see U(https://tools.ietf.org/html/rfc7633#section-4))." version_added: 2.5 - select_crypto_backend: - description: - - "Determines which crypto backend to use. The default choice is C(auto), - which tries to use C(cryptography) if available, and falls back to - C(pyopenssl)." - - "If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) - library." - - "If set to C(cryptography), will try to use the - L(cryptography,https://cryptography.io/) library." - type: str - default: 'auto' - choices: - - auto - - cryptography - - pyopenssl - version_added: "2.8" extends_documentation_fragment: files notes: @@ -307,59 +289,37 @@ ocsp_must_staple: sample: false ''' -import abc import os -from distutils.version import LooseVersion from ansible.module_utils import crypto as crypto_utils from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils._text import to_native, to_bytes, to_text - -MINIMAL_PYOPENSSL_VERSION = '0.15' -MINIMAL_CRYPTOGRAPHY_VERSION = '1.3' +from ansible.module_utils._text import to_native, to_bytes try: import OpenSSL from OpenSSL import crypto - PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) except ImportError: - PYOPENSSL_FOUND = False + pyopenssl_found = False else: - PYOPENSSL_FOUND = True + pyopenssl_found = True if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: # OpenSSL 1.1.0 or newer - OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" - OPENSSL_MUST_STAPLE_VALUE = b"status_request" + MUST_STAPLE_NAME = b"tlsfeature" + MUST_STAPLE_VALUE = b"status_request" else: # OpenSSL 1.0.x or older - OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" - OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" - -try: - import cryptography - import cryptography.x509 - import cryptography.x509.oid - import cryptography.exceptions - import cryptography.hazmat.backends - import cryptography.hazmat.primitives.serialization - import cryptography.hazmat.primitives.hashes - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) -except ImportError: - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True - CRYPTOGRAPHY_MUST_STAPLE_NAME = cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") - CRYPTOGRAPHY_MUST_STAPLE_VALUE = b"\x30\x03\x02\x01\x05" + MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" + MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" class CertificateSigningRequestError(crypto_utils.OpenSSLObjectError): pass -class CertificateSigningRequestBase(crypto_utils.OpenSSLObject): +class CertificateSigningRequest(crypto_utils.OpenSSLObject): def __init__(self, module): - super(CertificateSigningRequestBase, self).__init__( + super(CertificateSigningRequest, self).__init__( module.params['path'], module.params['state'], module.params['force'], @@ -398,22 +358,53 @@ class CertificateSigningRequestBase(crypto_utils.OpenSSLObject): if not self.subjectAltName: for sub in self.subject: - if sub[0] in ('commonName', 'CN'): + if OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])) == 13: # 13 is the NID for "commonName" self.subjectAltName = ['DNS:%s' % sub[1]] break - @abc.abstractmethod - def _generate_csr(self): - pass - def generate(self, module): '''Generate the certificate signing request.''' + if not self.check(module, perms_required=False) or self.force: - result = self._generate_csr() + req = crypto.X509Req() + req.set_version(self.version - 1) + subject = req.get_subject() + for entry in self.subject: + if entry[1] is not None: + # Workaround for https://github.com/pyca/pyopenssl/issues/165 + nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0])) + OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0) + + extensions = [] + if self.subjectAltName: + altnames = ', '.join(self.subjectAltName) + extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii'))) + + if self.keyUsage: + usages = ', '.join(self.keyUsage) + extensions.append(crypto.X509Extension(b"keyUsage", self.keyUsage_critical, usages.encode('ascii'))) + + if self.extendedKeyUsage: + usages = ', '.join(self.extendedKeyUsage) + extensions.append(crypto.X509Extension(b"extendedKeyUsage", self.extendedKeyUsage_critical, usages.encode('ascii'))) + + if self.basicConstraints: + usages = ', '.join(self.basicConstraints) + extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii'))) + + if self.ocspMustStaple: + extensions.append(crypto.X509Extension(MUST_STAPLE_NAME, self.ocspMustStaple_critical, MUST_STAPLE_VALUE)) + + if extensions: + req.add_extensions(extensions) + + req.set_pubkey(self.privatekey) + req.sign(self.privatekey, self.digest) + self.request = req try: csr_file = open(self.path, 'wb') - csr_file.write(result) + csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)) csr_file.close() except (IOError, OSError) as exc: raise CertificateSigningRequestError(exc) @@ -424,91 +415,12 @@ class CertificateSigningRequestBase(crypto_utils.OpenSSLObject): if module.set_fs_attributes_if_different(file_args, False): self.changed = True - @abc.abstractmethod - def _load_private_key(self): - pass - - @abc.abstractmethod - def _check_csr(self): - pass - def check(self, module, perms_required=True): """Ensure the resource is in its desired state.""" - state_and_perms = super(CertificateSigningRequestBase, self).check(module, perms_required) - - self._load_private_key() - - if not state_and_perms: - return False - - return self._check_csr() + state_and_perms = super(CertificateSigningRequest, self).check(module, perms_required) - def dump(self): - '''Serialize the object into a dictionary.''' - - result = { - 'privatekey': self.privatekey_path, - 'filename': self.path, - 'subject': self.subject, - 'subjectAltName': self.subjectAltName, - 'keyUsage': self.keyUsage, - 'extendedKeyUsage': self.extendedKeyUsage, - 'basicConstraints': self.basicConstraints, - 'ocspMustStaple': self.ocspMustStaple, - 'changed': self.changed - } - - return result - - -class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase): - - def __init__(self, module): - super(CertificateSigningRequestPyOpenSSL, self).__init__(module) - - def _generate_csr(self): - req = crypto.X509Req() - req.set_version(self.version - 1) - subject = req.get_subject() - for entry in self.subject: - if entry[1] is not None: - # Workaround for https://github.com/pyca/pyopenssl/issues/165 - nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0])) - OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0) - - extensions = [] - if self.subjectAltName: - altnames = ', '.join(self.subjectAltName) - extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii'))) - - if self.keyUsage: - usages = ', '.join(self.keyUsage) - extensions.append(crypto.X509Extension(b"keyUsage", self.keyUsage_critical, usages.encode('ascii'))) - - if self.extendedKeyUsage: - usages = ', '.join(self.extendedKeyUsage) - extensions.append(crypto.X509Extension(b"extendedKeyUsage", self.extendedKeyUsage_critical, usages.encode('ascii'))) - - if self.basicConstraints: - usages = ', '.join(self.basicConstraints) - extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii'))) - - if self.ocspMustStaple: - extensions.append(crypto.X509Extension(OPENSSL_MUST_STAPLE_NAME, self.ocspMustStaple_critical, OPENSSL_MUST_STAPLE_VALUE)) - - if extensions: - req.add_extensions(extensions) - - req.set_pubkey(self.privatekey) - req.sign(self.privatekey, self.digest) - self.request = req - - return crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request) - - def _load_private_key(self): self.privatekey = crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase) - def _check_csr(self): def _check_subject(csr): subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in self.subject] current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in csr.get_subject().get_components()] @@ -564,7 +476,7 @@ class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase): return _check_keyUsage_(extensions, b'basicConstraints', self.basicConstraints, self.basicConstraints_critical) def _check_ocspMustStaple(extensions): - oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE] + oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == MUST_STAPLE_NAME and to_bytes(ext) == MUST_STAPLE_VALUE] if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: # Older versions of libssl don't know about OCSP Must Staple oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) @@ -585,345 +497,29 @@ class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase): except crypto.Error: return False + if not state_and_perms: + return False + csr = crypto_utils.load_certificate_request(self.path) return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr) + def dump(self): + '''Serialize the object into a dictionary.''' -class CertificateSigningRequestCryptography(CertificateSigningRequestBase): - - def __init__(self, module): - super(CertificateSigningRequestCryptography, self).__init__(module) - self.cryptography_backend = cryptography.hazmat.backends.default_backend() - - def _get_name_oid(self, id): - if id in ('CN', 'commonName'): - return cryptography.x509.oid.NameOID.COMMON_NAME - if id in ('C', 'countryName'): - return cryptography.x509.oid.NameOID.COUNTRY_NAME - if id in ('L', 'localityName'): - return cryptography.x509.oid.NameOID.LOCALITY_NAME - if id in ('ST', 'stateOrProvinceName'): - return cryptography.x509.oid.NameOID.STATE_OR_PROVINCE_NAME - if id in ('street', 'streetAddress'): - return cryptography.x509.oid.NameOID.STREET_ADDRESS - if id in ('O', 'organizationName'): - return cryptography.x509.oid.NameOID.ORGANIZATION_NAME - if id in ('OU', 'organizationalUnitName'): - return cryptography.x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME - if id in ('serialNumber', ): - return cryptography.x509.oid.NameOID.SERIAL_NUMBER - if id in ('SN', 'surname'): - return cryptography.x509.oid.NameOID.SURNAME - if id in ('GN', 'givenName'): - return cryptography.x509.oid.NameOID.GIVEN_NAME - if id in ('title', ): - return cryptography.x509.oid.NameOID.TITLE - if id in ('generationQualifier', ): - return cryptography.x509.oid.NameOID.GENERATION_QUALIFIER - if id in ('x500UniqueIdentifier', ): - return cryptography.x509.oid.NameOID.X500_UNIQUE_IDENTIFIER - if id in ('dnQualifier', ): - return cryptography.x509.oid.NameOID.DN_QUALIFIER - if id in ('pseudonym', ): - return cryptography.x509.oid.NameOID.PSEUDONYM - if id in ('UID', 'userId'): - return cryptography.x509.oid.NameOID.USER_ID - if id in ('DC', 'domainComponent'): - return cryptography.x509.oid.NameOID.DOMAIN_COMPONENT - if id in ('emailAddress', ): - return cryptography.x509.oid.NameOID.EMAIL_ADDRESS - if id in ('jurisdictionC', 'jurisdictionCountryName'): - return cryptography.x509.oid.NameOID.JURISDICTION_COUNTRY_NAME - if id in ('jurisdictionL', 'jurisdictionLocalityName'): - return cryptography.x509.oid.NameOID.JURISDICTION_LOCALITY_NAME - if id in ('jurisdictionST', 'jurisdictionStateOrProvinceName'): - return cryptography.x509.oid.NameOID.JURISDICTION_STATE_OR_PROVINCE_NAME - if id in ('businessCategory', ): - return cryptography.x509.oid.NameOID.BUSINESS_CATEGORY - if id in ('postalAddress', ): - return cryptography.x509.oid.NameOID.POSTAL_ADDRESS - if id in ('postalCode', ): - return cryptography.x509.oid.NameOID.POSTAL_CODE - raise CertificateSigningRequestError('Unknown subject field identifier "{0}"'.format(id)) - - def _get_san(self, name): - if name.startswith('DNS:'): - return cryptography.x509.DNSName(to_text(name[4:])) - if name.startswith('IP:'): - return cryptography.x509.IPAddress(to_text(name[3:])) - if name.startswith('email:'): - return cryptography.x509.RFC822Name(to_text(name[6:])) - if name.startswith('URI:'): - return cryptography.x509.UniformResourceIdentifier(to_text(name[4:])) - if name.startswith('DirName:'): - return cryptography.x509.DirectoryName(to_text(name[8:])) - if ':' not in name: - raise CertificateSigningRequestError('Cannot parse Subject Alternative Name "{0}" (forgot "DNS:" prefix?)'.format(name)) - raise CertificateSigningRequestError('Cannot parse Subject Alternative Name "{0}" (potentially unsupported by cryptography backend)'.format(name)) - - def _get_keyusage(self, usage): - if usage in ('Digital Signature', 'digitalSignature'): - return 'digital_signature' - if usage in ('Non Repudiation', 'nonRepudiation'): - return 'content_commitment' - if usage in ('Key Encipherment', 'keyEncipherment'): - return 'key_encipherment' - if usage in ('Data Encipherment', 'dataEncipherment'): - return 'data_encipherment' - if usage in ('Key Agreement', 'keyAgreement'): - return 'key_agreement' - if usage in ('Certificate Sign', 'keyCertSign'): - return 'key_cert_sign' - if usage in ('CRL Sign', 'cRLSign'): - return 'crl_sign' - if usage in ('Encipher Only', 'encipherOnly'): - return 'encipher_only' - if usage in ('Decipher Only', 'decipherOnly'): - return 'decipher_only' - raise CertificateSigningRequestError('Unknown key usage "{0}"'.format(usage)) - - def _get_ext_keyusage(self, usage): - if usage in ('serverAuth', 'TLS Web Server Authentication'): - return cryptography.x509.oid.ExtendedKeyUsageOID.SERVER_AUTH - if usage in ('clientAuth', 'TLS Web Client Authentication'): - return cryptography.x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH - if usage in ('codeSigning', 'Code Signing'): - return cryptography.x509.oid.ExtendedKeyUsageOID.CODE_SIGNING - if usage in ('emailProtection', 'E-mail Protection'): - return cryptography.x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION - if usage in ('timeStamping', 'Time Stamping'): - return cryptography.x509.oid.ExtendedKeyUsageOID.TIME_STAMPING - if usage in ('OCSPSigning', 'OCSP Signing'): - return cryptography.x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING - if usage in ('anyExtendedKeyUsage', 'Any Extended Key Usage'): - return cryptography.x509.oid.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE - if usage in ('qcStatements', ): - return cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.3") - if usage in ('DVCS', ): - return cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.10") - if usage in ('IPSec User', 'ipsecUser'): - return cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.7") - if usage in ('Biometric Info', 'biometricInfo'): - return cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.2") - # FIXME need some more, probably all from https://www.iana.org/assignments/smi-numbers/smi-numbers.xhtml#smi-numbers-1.3.6.1.5.5.7.3 - raise CertificateSigningRequestError('Unknown extended key usage "{0}"'.format(usage)) - - def _get_basic_constraints(self, constraints): - ca = False - path_length = None - if constraints: - for constraint in constraints: - if constraint.startswith('CA:'): - if constraint == 'CA:TRUE': - ca = True - elif constraint == 'CA:FALSE': - ca = False - else: - raise CertificateSigningRequestError('Unknown basic constraint value "{0}" for CA'.format(constraint[3:])) - elif constraint.startswith('pathlen:'): - v = constraint[len('pathlen:'):] - try: - path_length = int(v) - except Exception as e: - raise CertificateSigningRequestError('Cannot parse path length constraint "{0}" ({1})'.format(v, e)) - else: - raise CertificateSigningRequestError('Unknown basic constraint "{0}"'.format(constraint)) - return ca, path_length - - def _parse_key_usage(self): - params = dict( - digital_signature=False, - content_commitment=False, - key_encipherment=False, - data_encipherment=False, - key_agreement=False, - key_cert_sign=False, - crl_sign=False, - encipher_only=False, - decipher_only=False, - ) - for usage in self.keyUsage: - params[self._get_keyusage(usage)] = True - return params - - def _generate_csr(self): - csr = cryptography.x509.CertificateSigningRequestBuilder() - csr = csr.subject_name(cryptography.x509.Name([ - cryptography.x509.NameAttribute(self._get_name_oid(entry[0]), to_text(entry[1])) for entry in self.subject - ])) - - if self.subjectAltName: - csr = csr.add_extension(cryptography.x509.SubjectAlternativeName([ - self._get_san(name) for name in self.subjectAltName - ]), critical=self.subjectAltName_critical) - - if self.keyUsage: - params = self._parse_key_usage() - csr = csr.add_extension(cryptography.x509.KeyUsage(**params), critical=self.keyUsage_critical) - - if self.extendedKeyUsage: - usages = [self._get_ext_keyusage(usage) for usage in self.extendedKeyUsage] - csr = csr.add_extension(cryptography.x509.ExtendedKeyUsage(usages), critical=self.extendedKeyUsage_critical) - - if self.basicConstraints: - params = {} - ca, path_length = self._get_basic_constraints(self.basicConstraints) - csr = csr.add_extension(cryptography.x509.BasicConstraints(ca, path_length), critical=self.basicConstraints_critical) - - if self.ocspMustStaple: - try: - # This only works with cryptography >= 2.1 - csr = csr.add_extension(cryptography.x509.TLSFeature([cryptography.x509.TLSFeatureType.status_request]), critical=self.ocspMustStaple_critical) - except AttributeError as dummy: - csr = csr.add_extension( - cryptography.x509.UnrecognizedExtension(CRYPTOGRAPHY_MUST_STAPLE_NAME, CRYPTOGRAPHY_MUST_STAPLE_VALUE), - critical=self.ocspMustStaple_critical - ) - - digest = None - if self.digest == 'sha256': - digest = cryptography.hazmat.primitives.hashes.SHA256() - elif self.digest == 'sha384': - digest = cryptography.hazmat.primitives.hashes.SHA384() - elif self.digest == 'sha512': - digest = cryptography.hazmat.primitives.hashes.SHA512() - elif self.digest == 'sha1': - digest = cryptography.hazmat.primitives.hashes.SHA1() - elif self.digest == 'md5': - digest = cryptography.hazmat.primitives.hashes.MD5() - # FIXME - else: - raise CertificateSigningRequestError('Unsupported digest "{0}"'.format(self.digest)) - self.request = csr.sign(self.privatekey, digest, self.cryptography_backend) - - return self.request.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM) - - def _load_private_key(self): - try: - with open(self.privatekey_path, 'rb') as f: - self.privatekey = cryptography.hazmat.primitives.serialization.load_pem_private_key( - f.read(), - None if self.privatekey_passphrase is None else to_bytes(self.privatekey_passphrase), - backend=self.cryptography_backend - ) - except Exception as e: - raise CertificateSigningRequestError(e) - - def _check_csr(self): - def _check_subject(csr): - subject = [(self._get_name_oid(entry[0]), entry[1]) for entry in self.subject] - current_subject = [(sub.oid, sub.value) for sub in csr.subject] - return set(subject) == set(current_subject) - - def _find_extension(extensions, type): - return next( - (ext for ext in extensions if isinstance(ext.value, type)), - None - ) - - def _check_subjectAltName(extensions): - current_altnames_ext = _find_extension(extensions, cryptography.x509.SubjectAlternativeName) - current_altnames = [str(altname) for altname in current_altnames_ext.value] if current_altnames_ext else [] - altnames = [str(self._get_san(altname)) for altname in self.subjectAltName] - if set(altnames) != set(current_altnames): - return False - if altnames: - if current_altnames_ext.critical != self.subjectAltName_critical: - return False - return True - - def _check_keyUsage(extensions): - current_keyusage_ext = _find_extension(extensions, cryptography.x509.KeyUsage) - if not self.keyUsage: - return current_keyusage_ext is None - elif current_keyusage_ext is None: - return False - params = self._parse_key_usage() - for param in params: - if getattr(current_keyusage_ext.value, '_' + param) != params[param]: - return False - if current_keyusage_ext.critical != self.keyUsage_critical: - return False - return True - - def _check_extenededKeyUsage(extensions): - current_usages_ext = _find_extension(extensions, cryptography.x509.ExtendedKeyUsage) - current_usages = [str(usage) for usage in current_usages_ext.value] if current_usages_ext else [] - usages = [str(self._get_ext_keyusage(usage)) for usage in self.extendedKeyUsage] if self.extendedKeyUsage else [] - if set(current_usages) != set(usages): - return False - if usages: - if current_usages_ext.critical != self.extendedKeyUsage_critical: - return False - return True - - def _check_basicConstraints(extensions): - bc_ext = _find_extension(extensions, cryptography.x509.BasicConstraints) - current_ca = bc_ext.ca if bc_ext else False - current_path_length = bc_ext.path_length if bc_ext else None - ca, path_length = self._get_basic_constraints(self.basicConstraints) - # Check CA flag - if ca != current_ca: - return False - # Check path length - if path_length != current_path_length: - return False - # Check criticality - if self.basicConstraints: - if bc_ext.critical != self.basicConstraints_critical: - return False - return True - - def _check_ocspMustStaple(extensions): - try: - # This only works with cryptography >= 2.1 - tlsfeature_ext = _find_extension(extensions, cryptography.x509.TLSFeature) - has_tlsfeature = True - except AttributeError as dummy: - tlsfeature_ext = next( - (ext for ext in extensions if ext.value.oid == CRYPTOGRAPHY_MUST_STAPLE_NAME), - None - ) - has_tlsfeature = False - if self.ocspMustStaple: - if not tlsfeature_ext or tlsfeature_ext.critical != self.ocspMustStaple_critical: - return False - if has_tlsfeature: - return cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value - else: - return tlsfeature_ext.value.value == CRYPTOGRAPHY_MUST_STAPLE_VALUE - else: - return tlsfeature_ext is None - - def _check_extensions(csr): - extensions = csr.extensions - return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and - _check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and - _check_ocspMustStaple(extensions)) - - def _check_signature(csr): - if not csr.is_signature_valid: - return False - # To check whether public key of CSR belongs to private key, - # encode both public keys and compare PEMs. - key_a = csr.public_key().public_bytes( - cryptography.hazmat.primitives.serialization.Encoding.PEM, - cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo - ) - key_b = self.privatekey.public_key().public_bytes( - cryptography.hazmat.primitives.serialization.Encoding.PEM, - cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo - ) - return key_a == key_b - - try: - with open(self.path, 'rb') as f: - csr = cryptography.x509.load_pem_x509_csr(f.read(), self.cryptography_backend) - except Exception as dummy: - return False + result = { + 'privatekey': self.privatekey_path, + 'filename': self.path, + 'subject': self.subject, + 'subjectAltName': self.subjectAltName, + 'keyUsage': self.keyUsage, + 'extendedKeyUsage': self.extendedKeyUsage, + 'basicConstraints': self.basicConstraints, + 'ocspMustStaple': self.ocspMustStaple, + 'changed': self.changed + } - return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr) + return result def main(): @@ -954,54 +550,24 @@ def main(): basicConstraints_critical=dict(aliases=['basic_constraints_critical'], default=False, type='bool'), ocspMustStaple=dict(aliases=['ocsp_must_staple'], default=False, type='bool'), ocspMustStaple_critical=dict(aliases=['ocsp_must_staple_critical'], default=False, type='bool'), - select_crypto_backend=dict(required=False, choices=['auto', 'pyopenssl', 'cryptography'], default='auto', type='str'), ), add_file_common_args=True, supports_check_mode=True, ) + if not pyopenssl_found: + module.fail_json(msg='the python pyOpenSSL module is required') + + try: + getattr(crypto.X509Req, 'get_extensions') + except AttributeError: + module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs') + base_dir = os.path.dirname(module.params['path']) or '.' if not os.path.isdir(base_dir): module.fail_json(name=base_dir, msg='The directory %s does not exist or the file is not a directory' % base_dir) - backend = module.params['select_crypto_backend'] - if backend == 'auto': - # Detection what is possible - can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) - - # Decision - if module.params['cipher'] and module.params['passphrase'] and module.params['cipher'] != 'auto': - # First try pyOpenSSL, then cryptography - if can_use_pyopenssl: - backend = 'pyopenssl' - elif can_use_cryptography: - backend = 'cryptography' - else: - # First try cryptography, then pyOpenSSL - if can_use_cryptography: - backend = 'cryptography' - elif can_use_pyopenssl: - backend = 'pyopenssl' - - # Success? - if backend == 'auto': - module.fail_json(msg=('Can detect none of the Python libraries ' - 'cryptography (>= {0}) and pyOpenSSL (>= {1})').format( - MINIMAL_CRYPTOGRAPHY_VERSION, - MINIMAL_PYOPENSSL_VERSION)) - if backend == 'pyopenssl': - if not PYOPENSSL_FOUND: - module.fail_json(msg='The Python pyOpenSSL library is required') - try: - getattr(crypto.X509Req, 'get_extensions') - except AttributeError: - module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs') - csr = CertificateSigningRequestPyOpenSSL(module) - elif backend == 'cryptography': - if not CRYPTOGRAPHY_FOUND: - module.fail_json(msg='The Python cryptography library is required') - csr = CertificateSigningRequestCryptography(module) + csr = CertificateSigningRequest(module) if module.params['state'] == 'present': diff --git a/test/integration/targets/openssl_csr/tasks/impl.yml b/test/integration/targets/openssl_csr/tasks/impl.yml deleted file mode 100644 index ffc4bf32e13..00000000000 --- a/test/integration/targets/openssl_csr/tasks/impl.yml +++ /dev/null @@ -1,148 +0,0 @@ ---- -- name: Generate privatekey - openssl_privatekey: - path: '{{ output_dir }}/privatekey.pem' - -- name: Generate CSR (check mode) - openssl_csr: - path: '{{ output_dir }}/csr.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: www.ansible.com - select_crypto_backend: '{{ select_crypto_backend }}' - check_mode: yes - register: generate_csr_check - -- name: Generate CSR - openssl_csr: - path: '{{ output_dir }}/csr.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: www.ansible.com - select_crypto_backend: '{{ select_crypto_backend }}' - register: generate_csr - -- name: Generate CSR (idempotent) - openssl_csr: - path: '{{ output_dir }}/csr.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: www.ansible.com - select_crypto_backend: '{{ select_crypto_backend }}' - register: generate_csr_check_idempotent - -- name: Generate CSR (idempotent, check mode) - openssl_csr: - path: '{{ output_dir }}/csr.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: www.ansible.com - select_crypto_backend: '{{ select_crypto_backend }}' - check_mode: yes - register: generate_csr_check_idempotent_check - -# keyUsage longname and shortname should be able to be used -# interchangeably. Hence the long name is specified here -# but the short name is used to test idempotency for ipsecuser -# and vice-versa for biometricInfo -- name: Generate CSR with KU and XKU - openssl_csr: - path: '{{ output_dir }}/csr_ku_xku.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - CN: www.ansible.com - keyUsage: - - digitalSignature - - keyAgreement - extendedKeyUsage: - - qcStatements - - DVCS - - IPSec User - - biometricInfo - select_crypto_backend: '{{ select_crypto_backend }}' - -- name: Generate CSR with KU and XKU (test idempotency) - openssl_csr: - path: '{{ output_dir }}/csr_ku_xku.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: 'www.ansible.com' - keyUsage: - - Key Agreement - - digitalSignature - extendedKeyUsage: - - ipsecUser - - qcStatements - - DVCS - - Biometric Info - select_crypto_backend: '{{ select_crypto_backend }}' - register: csr_ku_xku - -- name: Generate CSR with KU and XKU (test XKU change) - openssl_csr: - path: '{{ output_dir }}/csr_ku_xku.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: 'www.ansible.com' - keyUsage: - - digitalSignature - - keyAgreement - extendedKeyUsage: - - ipsecUser - - qcStatements - - Biometric Info - select_crypto_backend: '{{ select_crypto_backend }}' - register: csr_ku_xku_change - -- name: Generate CSR with KU and XKU (test KU change) - openssl_csr: - path: '{{ output_dir }}/csr_ku_xku.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: 'www.ansible.com' - keyUsage: - - digitalSignature - extendedKeyUsage: - - ipsecUser - - qcStatements - - Biometric Info - select_crypto_backend: '{{ select_crypto_backend }}' - register: csr_ku_xku_change_2 - -- name: Generate CSR with old API - openssl_csr: - path: '{{ output_dir }}/csr_oldapi.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - commonName: www.ansible.com - select_crypto_backend: '{{ select_crypto_backend }}' - -- name: Generate CSR with OCSP Must Staple - openssl_csr: - path: '{{ output_dir }}/csr_ocsp.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject_alt_name: "DNS:www.ansible.com" - ocsp_must_staple: true - select_crypto_backend: '{{ select_crypto_backend }}' - -- name: Generate CSR with OCSP Must Staple (test idempotency) - openssl_csr: - path: '{{ output_dir }}/csr_ocsp.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject_alt_name: "DNS:www.ansible.com" - ocsp_must_staple: true - select_crypto_backend: '{{ select_crypto_backend }}' - register: csr_ocsp_idempotency - -- name: Generate ECC privatekey - openssl_privatekey: - path: '{{ output_dir }}/privatekey2.pem' - type: ECC - curve: secp384r1 - -- name: Generate CSR with ECC privatekey - openssl_csr: - path: '{{ output_dir }}/csr2.csr' - privatekey_path: '{{ output_dir }}/privatekey2.pem' - subject: - commonName: www.ansible.com - select_crypto_backend: '{{ select_crypto_backend }}' diff --git a/test/integration/targets/openssl_csr/tasks/main.yml b/test/integration/targets/openssl_csr/tasks/main.yml index ceed31702f8..2c7806dd190 100644 --- a/test/integration/targets/openssl_csr/tasks/main.yml +++ b/test/integration/targets/openssl_csr/tasks/main.yml @@ -1,30 +1,141 @@ --- - block: - - name: Running tests with pyOpenSSL backend - include_tasks: impl.yml - vars: - select_crypto_backend: pyopenssl + - name: Generate privatekey + openssl_privatekey: + path: '{{ output_dir }}/privatekey.pem' - - import_tasks: ../tests/validate.yml + - name: Generate CSR (check mode) + openssl_csr: + path: '{{ output_dir }}/csr.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + check_mode: yes + register: generate_csr_check - when: pyopenssl_version.stdout is version('0.15', '>=') + - name: Generate CSR + openssl_csr: + path: '{{ output_dir }}/csr.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + register: generate_csr -- name: Remove output directory - file: - path: "{{ output_dir }}" - state: absent + - name: Generate CSR (idempotent) + openssl_csr: + path: '{{ output_dir }}/csr.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + register: generate_csr_check_idempotent -- name: Re-create output directory - file: - path: "{{ output_dir }}" - state: directory + - name: Generate CSR (idempotent, check mode) + openssl_csr: + path: '{{ output_dir }}/csr.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + check_mode: yes + register: generate_csr_check_idempotent_check -- block: - - name: Running tests with cryptography backend - include_tasks: impl.yml - vars: - select_crypto_backend: cryptography + # keyUsage longname and shortname should be able to be used + # interchangeably. Hence the long name is specified here + # but the short name is used to test idempotency for ipsecuser + # and vice-versa for biometricInfo + - name: Generate CSR with KU and XKU + openssl_csr: + path: '{{ output_dir }}/csr_ku_xku.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + CN: www.ansible.com + keyUsage: + - digitalSignature + - keyAgreement + extendedKeyUsage: + - qcStatements + - DVCS + - IPSec User + - biometricInfo + + - name: Generate CSR with KU and XKU (test idempotency) + openssl_csr: + path: '{{ output_dir }}/csr_ku_xku.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: 'www.ansible.com' + keyUsage: + - Key Agreement + - digitalSignature + extendedKeyUsage: + - ipsecUser + - qcStatements + - DVCS + - Biometric Info + register: csr_ku_xku + + - name: Generate CSR with KU and XKU (test XKU change) + openssl_csr: + path: '{{ output_dir }}/csr_ku_xku.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: 'www.ansible.com' + keyUsage: + - digitalSignature + - keyAgreement + extendedKeyUsage: + - ipsecUser + - qcStatements + - Biometric Info + register: csr_ku_xku_change + + - name: Generate CSR with KU and XKU (test KU change) + openssl_csr: + path: '{{ output_dir }}/csr_ku_xku.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: 'www.ansible.com' + keyUsage: + - digitalSignature + extendedKeyUsage: + - ipsecUser + - qcStatements + - Biometric Info + register: csr_ku_xku_change_2 + + - name: Generate CSR with old API + openssl_csr: + path: '{{ output_dir }}/csr_oldapi.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + commonName: www.ansible.com - - import_tasks: ../tests/validate.yml + - name: Generate CSR with OCSP Must Staple + openssl_csr: + path: '{{ output_dir }}/csr_ocsp.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject_alt_name: "DNS:www.ansible.com" + ocsp_must_staple: true - when: cryptography_version.stdout is version('1.3', '>=') + - name: Generate CSR with OCSP Must Staple (test idempotency) + openssl_csr: + path: '{{ output_dir }}/csr_ocsp.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject_alt_name: "DNS:www.ansible.com" + ocsp_must_staple: true + register: csr_ocsp_idempotency + + - name: Generate ECC privatekey + openssl_privatekey: + path: '{{ output_dir }}/privatekey2.pem' + type: ECC + curve: secp256k1 + + - name: Generate CSR with ECC privatekey + openssl_csr: + path: '{{ output_dir }}/csr2.csr' + privatekey_path: '{{ output_dir }}/privatekey2.pem' + subject: + commonName: www.ansible.com + + - import_tasks: ../tests/validate.yml + + when: pyopenssl_version.stdout is version('0.15', '>=')