@ -28,9 +28,11 @@ description:
- Many properties that can be specified in this module are for validation of an
existing or newly generated certificate . The proper place to specify them , if you
want to receive a certificate with these properties is a CSR ( Certificate Signing Request ) .
- It uses the pyOpenSSL python library to interact with OpenSSL .
- It uses the pyOpenSSL or cryptography python library to interact with OpenSSL .
- If both the cryptography and PyOpenSSL libraries are available ( and meet the minimum version requirements )
cryptography will be preferred as a backend over PyOpenSSL ( unless the backend is forced with C ( select_crypto_backend ) )
requirements :
- python - pyOpenSSL > = 0.15 ( if using C ( selfsigned ) or C ( assertonly ) provider )
- PyOpenSSL > = 0.15 or cryptography > = 1.6 ( if using C ( selfsigned ) or C ( assertonly ) provider )
- acme - tiny ( if using the C ( acme ) provider )
author :
- Yanis Guenane ( @Spredzy )
@ -346,6 +348,17 @@ options:
default : no
aliases : [ subjectAltName_strict ]
select_crypto_backend :
description :
- Determines which crypto backend to use .
- The default choice is C ( auto ) , which tries to use C ( cryptography ) if available , and falls back to C ( pyopenssl ) .
- If set to C ( pyopenssl ) , will try to use the L ( pyOpenSSL , https : / / pypi . org / project / pyOpenSSL / ) library .
- If set to C ( cryptography ) , will try to use the L ( cryptography , https : / / cryptography . io / ) library .
type : str
default : auto
choices : [ auto , cryptography , pyopenssl ]
version_added : " 2.8 "
extends_documentation_fragment : files
notes :
- All ASN .1 TIME values should be specified following the YYYYMMDDHHMMSSZ pattern .
@ -504,20 +517,39 @@ from random import randint
import datetime
import os
import traceback
from distutils . version import LooseVersion
from ansible . module_utils import crypto as crypto_utils
from ansible . module_utils . basic import AnsibleModule , missing_required_lib
from ansible . module_utils . _text import to_native , to_bytes
from ansible . module_utils . _text import to_native , to_bytes , to_text
MINIMAL_CRYPTOGRAPHY_VERSION = ' 1.6 '
MINIMAL_PYOPENSSL_VERSION = ' 0.15 '
PYOPENSSL_IMP_ERR = None
try :
import OpenSSL
from OpenSSL import crypto
PYOPENSSL_VERSION = LooseVersion ( OpenSSL . __version__ )
except ImportError :
PYOPENSSL_IMP_ERR = traceback . format_exc ( )
pyopenssl_found = False
PYOPENSSL_FOUND = False
else :
PYOPENSSL_FOUND = True
CRYPTOGRAPHY_IMP_ERR = None
try :
import cryptography
from cryptography import x509
from cryptography . hazmat . backends import default_backend
from cryptography . hazmat . primitives . serialization import Encoding
from cryptography . x509 import NameAttribute , Name
CRYPTOGRAPHY_VERSION = LooseVersion ( cryptography . __version__ )
except ImportError :
CRYPTOGRAPHY_IMP_ERR = traceback . format_exc ( )
CRYPTOGRAPHY_FOUND = False
else :
pyopenssl_found = True
CRYPTOGRAPHY_FOUND = True
class CertificateError ( crypto_utils . OpenSSLObjectError ) :
@ -526,7 +558,7 @@ class CertificateError(crypto_utils.OpenSSLObjectError):
class Certificate ( crypto_utils . OpenSSLObject ) :
def __init__ ( self , module ):
def __init__ ( self , module , backend ):
super ( Certificate , self ) . __init__ (
module . params [ ' path ' ] ,
module . params [ ' state ' ] ,
@ -541,6 +573,7 @@ class Certificate(crypto_utils.OpenSSLObject):
self . cert = None
self . privatekey = None
self . csr = None
self . backend = backend
self . module = module
def get_relative_time_option ( self , input_string , input_name ) :
@ -548,37 +581,55 @@ class Certificate(crypto_utils.OpenSSLObject):
or an ASN1 formatted string is provided . """
result = input_string
if result . startswith ( " + " ) or result . startswith ( " - " ) :
result = crypto_utils . convert_relative_to_datetime (
result ) . strftime ( " % Y % m %d % H % M % SZ " )
result_datetime = crypto_utils . convert_relative_to_datetime (
result )
if self . backend == ' pyopenssl ' :
return result_datetime . strftime ( " % Y % m %d % H % M % SZ " )
elif self . backend == ' cryptography ' :
return result_datetime
if result is None :
raise CertificateError (
' The timespec " %s " for %s is not valid ' %
input_string , input_name )
if self . backend == ' cryptography ' :
for date_fmt in [ ' % Y % m %d % H % M % SZ ' , ' % Y % m %d % H % MZ ' , ' % Y % m %d % H % M % S % z ' , ' % Y % m %d % H % M % z ' ] :
try :
result = datetime . datetime . strptime ( input_string , date_fmt )
break
except ValueError :
pass
if not isinstance ( result , datetime . datetime ) :
raise CertificateError (
' The time spec " %s " for %s is invalid ' %
( input_string , input_name )
)
return result
def check ( self , module , perms_required = True ) :
""" Ensure the resource is in its desired state. """
state_and_perms = super ( Certificate , self ) . check ( module , perms_required )
def _validate_privatekey ( ) :
if self . privatekey_path :
ctx = OpenSSL . SSL . Context ( OpenSSL . SSL . TLSv1_2_METHOD )
ctx . use_privatekey ( self . privatekey )
ctx . use_certificate ( self . cert )
try :
ctx . check_privatekey ( )
return True
except OpenSSL . SSL . Error :
return False
def _validate_privatekey ( self ) :
if self . backend == ' pyopenssl ' :
ctx = OpenSSL . SSL . Context ( OpenSSL . SSL . TLSv1_2_METHOD )
ctx . use_privatekey ( self . privatekey )
ctx . use_certificate ( self . cert )
try :
ctx . check_privatekey ( )
return True
except OpenSSL . SSL . Error :
return False
elif self . backend == ' cryptography ' :
return self . cert . public_key ( ) . public_numbers ( ) == self . privatekey . public_key ( ) . public_numbers ( )
def _validate_csr ( ) :
def _validate_csr ( self ) :
if self . backend == ' pyopenssl ' :
# Verify that CSR is signed by certificate's private key
try :
self . csr . verify ( self . cert . get_pubkey ( ) )
except OpenSSL . crypto . Error :
return False
# Check subject
if self . csr . get_subject ( ) != self . cert . get_subject ( ) :
return False
# Check extensions
csr_extensions = self . csr . get_extensions ( )
cert_extension_count = self . cert . get_extension_count ( )
if len ( csr_extensions ) != cert_extension_count :
@ -589,35 +640,156 @@ class Certificate(crypto_utils.OpenSSLObject):
if cert_extension . get_data ( ) != list ( csr_extension ) [ 0 ] . get_data ( ) :
return False
return True
elif self . backend == ' cryptography ' :
# Verify that CSR is signed by certificate's private key
if not self . csr . is_signature_valid :
return False
if self . csr . public_key ( ) . public_numbers ( ) != self . cert . public_key ( ) . public_numbers ( ) :
return False
# Check subject
if self . csr . subject != self . cert . subject :
return False
# Check extensions
cert_exts = self . cert . extensions
csr_exts = self . csr . extensions
if len ( cert_exts ) != len ( csr_exts ) :
return False
for cert_ext in cert_exts :
try :
csr_ext = csr_exts . get_extension_for_oid ( cert_ext . oid )
if cert_ext != csr_ext :
return False
except cryptography . x509 . ExtensionNotFound as e :
return False
return True
def check ( self , module , perms_required = True ) :
""" Ensure the resource is in its desired state. """
state_and_perms = super ( Certificate , self ) . check ( module , perms_required )
if not state_and_perms :
return False
self . cert = crypto_utils . load_certificate ( self . path )
self . cert = crypto_utils . load_certificate ( self . path , backend = self . backend )
if self . privatekey_path :
try :
self . privatekey = crypto_utils . load_privatekey (
self . privatekey_path ,
self . privatekey_passphrase
self . privatekey_passphrase ,
backend = self . backend
)
except crypto_utils . OpenSSLBadPassphraseError as exc :
raise CertificateError ( exc )
return _validate_privatekey ( )
return self . _validate_privatekey ( )
if self . csr_path :
self . csr = crypto_utils . load_certificate_request ( self . csr_path )
if not _validate_csr ( ) :
self . csr = crypto_utils . load_certificate_request ( self . csr_path , backend = self . backend )
if not self . _validate_csr ( ) :
return False
return True
class SelfSignedCertificateCryptography ( Certificate ) :
""" Generate the self-signed certificate, using the cryptography backend """
def __init__ ( self , module ) :
super ( SelfSignedCertificateCryptography , self ) . __init__ ( module , ' cryptography ' )
self . notBefore = self . get_relative_time_option ( module . params [ ' selfsigned_not_before ' ] , ' selfsigned_not_before ' )
self . notAfter = self . get_relative_time_option ( module . params [ ' selfsigned_not_after ' ] , ' selfsigned_not_after ' )
self . digest = crypto_utils . select_message_digest ( module . params [ ' selfsigned_digest ' ] )
self . version = module . params [ ' selfsigned_version ' ]
self . serial_number = x509 . random_serial_number ( )
self . csr = crypto_utils . load_certificate_request ( self . csr_path , backend = self . backend )
self . _module = module
try :
self . privatekey = crypto_utils . load_privatekey (
self . privatekey_path , self . privatekey_passphrase , backend = self . backend
)
except crypto_utils . OpenSSLBadPassphraseError as exc :
module . fail_json ( msg = to_native ( exc ) )
if self . digest is None :
raise CertificateError (
' The digest %s is not supported with the cryptography backend ' % module . params [ ' selfsigned_digest ' ]
)
def generate ( self , module ) :
if not os . path . exists ( self . privatekey_path ) :
raise CertificateError (
' The private key %s does not exist ' % self . privatekey_path
)
if not os . path . exists ( self . csr_path ) :
raise CertificateError (
' The certificate signing request file %s does not exist ' % self . csr_path
)
if not self . check ( module , perms_required = False ) or self . force :
try :
cert_builder = x509 . CertificateBuilder ( )
cert_builder = cert_builder . subject_name ( self . csr . subject )
cert_builder = cert_builder . issuer_name ( self . csr . subject )
cert_builder = cert_builder . serial_number ( self . serial_number )
cert_builder = cert_builder . not_valid_before ( self . notBefore )
cert_builder = cert_builder . not_valid_after ( self . notAfter )
cert_builder = cert_builder . public_key ( self . privatekey . public_key ( ) )
for extension in self . csr . extensions :
cert_builder = cert_builder . add_extension ( extension . value , critical = extension . critical )
except ValueError as e :
raise CertificateError ( str ( e ) )
certificate = cert_builder . sign (
private_key = self . privatekey , algorithm = self . digest ,
backend = default_backend ( )
)
self . cert = certificate
try :
with open ( self . path , ' wb ' ) as cert_file :
cert_file . write ( certificate . public_bytes ( Encoding . PEM ) )
except Exception as exc :
raise CertificateError ( exc )
self . changed = True
else :
self . cert = crypto_utils . load_certificate ( self . path , backend = self . backend )
file_args = module . load_file_common_arguments ( module . params )
if module . set_fs_attributes_if_different ( file_args , False ) :
self . changed = True
def dump ( self , check_mode = False ) :
result = {
' changed ' : self . changed ,
' filename ' : self . path ,
' privatekey ' : self . privatekey_path ,
' csr ' : self . csr_path
}
if check_mode :
result . update ( {
' notBefore ' : self . notBefore . strftime ( " % Y % m %d % H % M % SZ " ) ,
' notAfter ' : self . notAfter . strftime ( " % Y % m %d % H % M % SZ " ) ,
' serial_number ' : self . serial_number ,
} )
else :
result . update ( {
' notBefore ' : self . cert . not_valid_before . strftime ( " % Y % m %d % H % M % SZ " ) ,
' notAfter ' : self . cert . not_valid_after . strftime ( " % Y % m %d % H % M % SZ " ) ,
' serial_number ' : self . cert . serial_number ,
} )
return result
class SelfSignedCertificate ( Certificate ) :
""" Generate the self-signed certificate. """
def __init__ ( self , module ) :
super ( SelfSignedCertificate , self ) . __init__ ( module )
super ( SelfSignedCertificate , self ) . __init__ ( module , ' pyopenssl ' )
self . notBefore = self . get_relative_time_option ( module . params [ ' selfsigned_not_before ' ] , ' selfsigned_not_before ' )
self . notAfter = self . get_relative_time_option ( module . params [ ' selfsigned_not_after ' ] , ' selfsigned_not_after ' )
self . digest = module . params [ ' selfsigned_digest ' ]
@ -694,11 +866,108 @@ class SelfSignedCertificate(Certificate):
return result
class OwnCACertificateCryptography ( Certificate ) :
""" Generate the own CA certificate. Using the cryptography backend """
def __init__ ( self , module ) :
super ( OwnCACertificateCryptography , self ) . __init__ ( module , ' cryptography ' )
self . notBefore = self . get_relative_time_option ( module . params [ ' ownca_not_before ' ] , ' ownca_not_before ' )
self . notAfter = self . get_relative_time_option ( module . params [ ' ownca_not_after ' ] , ' ownca_not_after ' )
self . digest = crypto_utils . select_message_digest ( module . params [ ' ownca_digest ' ] )
self . version = module . params [ ' ownca_version ' ]
self . serial_number = x509 . random_serial_number ( )
self . ca_cert_path = module . params [ ' ownca_path ' ]
self . ca_privatekey_path = module . params [ ' ownca_privatekey_path ' ]
self . ca_privatekey_passphrase = module . params [ ' ownca_privatekey_passphrase ' ]
self . csr = crypto_utils . load_certificate_request ( self . csr_path , backend = self . backend )
self . ca_cert = crypto_utils . load_certificate ( self . ca_cert_path , backend = self . backend )
try :
self . ca_private_key = crypto_utils . load_privatekey (
self . ca_privatekey_path , self . ca_privatekey_passphrase , backend = self . backend
)
except crypto_utils . OpenSSLBadPassphraseError as exc :
module . fail_json ( msg = str ( exc ) )
def generate ( self , module ) :
if not os . path . exists ( self . ca_cert_path ) :
raise CertificateError (
' The CA certificate %s does not exist ' % self . ca_cert_path
)
if not os . path . exists ( self . ca_privatekey_path ) :
raise CertificateError (
' The CA private key %s does not exist ' % self . ca_privatekey_path
)
if not os . path . exists ( self . csr_path ) :
raise CertificateError (
' The certificate signing request file %s does not exist ' % self . csr_path
)
if not self . check ( module , perms_required = False ) or self . force :
cert_builder = x509 . CertificateBuilder ( )
cert_builder = cert_builder . subject_name ( self . csr . subject )
cert_builder = cert_builder . issuer_name ( self . ca_cert . subject )
cert_builder = cert_builder . serial_number ( self . serial_number )
cert_builder = cert_builder . not_valid_before ( self . notBefore )
cert_builder = cert_builder . not_valid_after ( self . notAfter )
cert_builder = cert_builder . public_key ( self . csr . public_key ( ) )
for extension in self . csr . extensions :
cert_builder = cert_builder . add_extension ( extension . value , critical = extension . critical )
certificate = cert_builder . sign (
private_key = self . ca_private_key , algorithm = self . digest ,
backend = default_backend ( )
)
self . cert = certificate
try :
with open ( self . path , ' wb ' ) as cert_file :
cert_file . write ( certificate . public_bytes ( Encoding . PEM ) )
except Exception as exc :
raise CertificateError ( exc )
self . changed = True
else :
self . cert = crypto_utils . load_certificate ( self . path , backend = self . backend )
file_args = module . load_file_common_arguments ( module . params )
if module . set_fs_attributes_if_different ( file_args , False ) :
self . changed = True
def dump ( self , check_mode = False ) :
result = {
' changed ' : self . changed ,
' filename ' : self . path ,
' privatekey ' : self . privatekey_path ,
' csr ' : self . csr_path ,
' ca_cert ' : self . ca_cert_path ,
' ca_privatekey ' : self . ca_privatekey_path
}
if check_mode :
result . update ( {
' notBefore ' : self . notBefore . strftime ( " % Y % m %d % H % M % SZ " ) ,
' notAfter ' : self . notAfter . strftime ( " % Y % m %d % H % M % SZ " ) ,
' serial_number ' : self . serial_number ,
} )
else :
result . update ( {
' notBefore ' : self . cert . not_valid_before . strftime ( " % Y % m %d % H % M % SZ " ) ,
' notAfter ' : self . cert . not_valid_after . strftime ( " % Y % m %d % H % M % SZ " ) ,
' serial_number ' : self . cert . serial_number ,
} )
return result
class OwnCACertificate ( Certificate ) :
""" Generate the own CA certificate. """
def __init__ ( self , module ) :
super ( OwnCACertificate , self ) . __init__ ( module )
super ( OwnCACertificate , self ) . __init__ ( module , ' pyopenssl ' )
self . notBefore = self . get_relative_time_option ( module . params [ ' ownca_not_before ' ] , ' ownca_not_before ' )
self . notAfter = self . get_relative_time_option ( module . params [ ' ownca_not_after ' ] , ' ownca_not_after ' )
self . digest = module . params [ ' ownca_digest ' ]
@ -786,11 +1055,408 @@ class OwnCACertificate(Certificate):
return result
class AssertOnlyCertificateCryptography ( Certificate ) :
""" Validate the supplied cert, using the cryptography backend """
def __init__ ( self , module ) :
super ( AssertOnlyCertificateCryptography , self ) . __init__ ( module , ' cryptography ' )
self . signature_algorithms = module . params [ ' signature_algorithms ' ]
if module . params [ ' subject ' ] :
self . subject = crypto_utils . parse_name_field ( module . params [ ' subject ' ] )
else :
self . subject = [ ]
self . subject_strict = module . params [ ' subject_strict ' ]
if module . params [ ' issuer ' ] :
self . issuer = crypto_utils . parse_name_field ( module . params [ ' issuer ' ] )
else :
self . issuer = [ ]
self . issuer_strict = module . params [ ' issuer_strict ' ]
self . has_expired = module . params [ ' has_expired ' ]
self . version = module . params [ ' version ' ]
self . keyUsage = module . params [ ' key_usage ' ]
self . keyUsage_strict = module . params [ ' key_usage_strict ' ]
self . extendedKeyUsage = module . params [ ' extended_key_usage ' ]
self . extendedKeyUsage_strict = module . params [ ' extended_key_usage_strict ' ]
self . subjectAltName = module . params [ ' subject_alt_name ' ]
self . subjectAltName_strict = module . params [ ' subject_alt_name_strict ' ]
self . notBefore = module . params [ ' not_before ' ] ,
self . notAfter = module . params [ ' not_after ' ] ,
self . valid_at = module . params [ ' valid_at ' ] ,
self . invalid_at = module . params [ ' invalid_at ' ] ,
self . valid_in = module . params [ ' valid_in ' ] ,
self . message = [ ]
def _get_name_oid ( self , id ) :
if id in ( ' CN ' , ' commonName ' ) :
return cryptography . x509 . oid . NameOID . COMMON_NAME
if id in ( ' C ' , ' countryName ' ) :
return cryptography . x509 . oid . NameOID . COUNTRY_NAME
if id in ( ' L ' , ' localityName ' ) :
return cryptography . x509 . oid . NameOID . LOCALITY_NAME
if id in ( ' ST ' , ' stateOrProvinceName ' ) :
return cryptography . x509 . oid . NameOID . STATE_OR_PROVINCE_NAME
if id in ( ' street ' , ' streetAddress ' ) :
return cryptography . x509 . oid . NameOID . STREET_ADDRESS
if id in ( ' O ' , ' organizationName ' ) :
return cryptography . x509 . oid . NameOID . ORGANIZATION_NAME
if id in ( ' OU ' , ' organizationalUnitName ' ) :
return cryptography . x509 . oid . NameOID . ORGANIZATIONAL_UNIT_NAME
if id in ( ' serialNumber ' , ) :
return cryptography . x509 . oid . NameOID . SERIAL_NUMBER
if id in ( ' SN ' , ' surname ' ) :
return cryptography . x509 . oid . NameOID . SURNAME
if id in ( ' GN ' , ' givenName ' ) :
return cryptography . x509 . oid . NameOID . GIVEN_NAME
if id in ( ' title ' , ) :
return cryptography . x509 . oid . NameOID . TITLE
if id in ( ' generationQualifier ' , ) :
return cryptography . x509 . oid . NameOID . GENERATION_QUALIFIER
if id in ( ' x500UniqueIdentifier ' , ) :
return cryptography . x509 . oid . NameOID . X500_UNIQUE_IDENTIFIER
if id in ( ' dnQualifier ' , ) :
return cryptography . x509 . oid . NameOID . DN_QUALIFIER
if id in ( ' pseudonym ' , ) :
return cryptography . x509 . oid . NameOID . PSEUDONYM
if id in ( ' UID ' , ' userId ' ) :
return cryptography . x509 . oid . NameOID . USER_ID
if id in ( ' DC ' , ' domainComponent ' ) :
return cryptography . x509 . oid . NameOID . DOMAIN_COMPONENT
if id in ( ' emailAddress ' , ) :
return cryptography . x509 . oid . NameOID . EMAIL_ADDRESS
if id in ( ' jurisdictionC ' , ' jurisdictionCountryName ' ) :
return cryptography . x509 . oid . NameOID . JURISDICTION_COUNTRY_NAME
if id in ( ' jurisdictionL ' , ' jurisdictionLocalityName ' ) :
return cryptography . x509 . oid . NameOID . JURISDICTION_LOCALITY_NAME
if id in ( ' jurisdictionST ' , ' jurisdictionStateOrProvinceName ' ) :
return cryptography . x509 . oid . NameOID . JURISDICTION_STATE_OR_PROVINCE_NAME
if id in ( ' businessCategory ' , ) :
return cryptography . x509 . oid . NameOID . BUSINESS_CATEGORY
if id in ( ' postalAddress ' , ) :
return cryptography . x509 . oid . NameOID . POSTAL_ADDRESS
if id in ( ' postalCode ' , ) :
return cryptography . x509 . oid . NameOID . POSTAL_CODE
def _get_san ( self , name ) :
if name . startswith ( ' DNS: ' ) :
return cryptography . x509 . DNSName ( to_native ( name [ 4 : ] ) )
if name . startswith ( ' IP: ' ) :
return cryptography . x509 . IPAddress ( to_native ( name [ 3 : ] ) )
if name . startswith ( ' email: ' ) :
return cryptography . x509 . RFC822Name ( to_native ( name [ 6 : ] ) )
if name . startswith ( ' URI: ' ) :
return cryptography . x509 . UniformResourceIdentifier ( to_native ( name [ 4 : ] ) )
if name . startswith ( ' DirName: ' ) :
return cryptography . x509 . DirectoryName ( to_native ( name [ 8 : ] ) )
if ' : ' not in name :
raise CertificateError ( ' Cannot parse Subject Alternative Name " {0} " (forgot " DNS: " prefix?) ' . format ( name ) )
raise CertificateError ( ' Cannot parse Subject Alternative Name " {0} " (potentially unsupported by cryptography backend) ' . format ( name ) )
def _get_keyusage ( self , usage ) :
if usage in ( ' Digital Signature ' , ' digitalSignature ' ) :
return ' digital_signature '
if usage in ( ' Non Repudiation ' , ' nonRepudiation ' ) :
return ' content_commitment '
if usage in ( ' Key Encipherment ' , ' keyEncipherment ' ) :
return ' key_encipherment '
if usage in ( ' Data Encipherment ' , ' dataEncipherment ' ) :
return ' data_encipherment '
if usage in ( ' Key Agreement ' , ' keyAgreement ' ) :
return ' key_agreement '
if usage in ( ' Certificate Sign ' , ' keyCertSign ' ) :
return ' key_cert_sign '
if usage in ( ' CRL Sign ' , ' cRLSign ' ) :
return ' crl_sign '
if usage in ( ' Encipher Only ' , ' encipherOnly ' ) :
return ' encipher_only '
if usage in ( ' Decipher Only ' , ' decipherOnly ' ) :
return ' decipher_only '
raise CertificateError ( ' Unknown key usage " {0} " ' . format ( usage ) )
def _get_ext_keyusage ( self , usage ) :
if usage in ( ' serverAuth ' , ' TLS Web Server Authentication ' ) :
return cryptography . x509 . oid . ExtendedKeyUsageOID . SERVER_AUTH
if usage in ( ' clientAuth ' , ' TLS Web Client Authentication ' ) :
return cryptography . x509 . oid . ExtendedKeyUsageOID . CLIENT_AUTH
if usage in ( ' codeSigning ' , ' Code Signing ' ) :
return cryptography . x509 . oid . ExtendedKeyUsageOID . CODE_SIGNING
if usage in ( ' emailProtection ' , ' E-mail Protection ' ) :
return cryptography . x509 . oid . ExtendedKeyUsageOID . EMAIL_PROTECTION
if usage in ( ' timeStamping ' , ' Time Stamping ' ) :
return cryptography . x509 . oid . ExtendedKeyUsageOID . TIME_STAMPING
if usage in ( ' OCSPSigning ' , ' OCSP Signing ' ) :
return cryptography . x509 . oid . ExtendedKeyUsageOID . OCSP_SIGNING
if usage in ( ' anyExtendedKeyUsage ' , ' Any Extended Key Usage ' ) :
return cryptography . x509 . oid . ExtendedKeyUsageOID . ANY_EXTENDED_KEY_USAGE
if usage in ( ' qcStatements ' , ) :
return cryptography . x509 . oid . ObjectIdentifier ( " 1.3.6.1.5.5.7.1.3 " )
if usage in ( ' DVCS ' , ) :
return cryptography . x509 . oid . ObjectIdentifier ( " 1.3.6.1.5.5.7.3.10 " )
if usage in ( ' IPSec User ' , ' ipsecUser ' ) :
return cryptography . x509 . oid . ObjectIdentifier ( " 1.3.6.1.5.5.7.3.7 " )
if usage in ( ' Biometric Info ' , ' biometricInfo ' ) :
return cryptography . x509 . oid . ObjectIdentifier ( " 1.3.6.1.5.5.7.1.2 " )
# FIXME need some more, probably all from https://www.iana.org/assignments/smi-numbers/smi-numbers.xhtml#smi-numbers-1.3.6.1.5.5.7.3
raise CertificateError ( ' Unknown extended key usage " {0} " ' . format ( usage ) )
def _get_basic_constraints ( self , constraints ) :
ca = False
path_length = None
if constraints :
for constraint in constraints :
if constraint . startswith ( ' CA: ' ) :
if constraint == ' CA:TRUE ' :
ca = True
elif constraint == ' CA:FALSE ' :
ca = False
else :
raise CertificateError ( ' Unknown basic constraint value " {0} " for CA ' . format ( constraint [ 3 : ] ) )
elif constraint . startswith ( ' pathlen: ' ) :
v = constraint [ len ( ' pathlen: ' ) : ]
try :
path_length = int ( v )
except Exception as e :
raise CertificateError ( ' Cannot parse path length constraint " {0} " ( {1} ) ' . format ( v , e ) )
else :
raise CertificateError ( ' Unknown basic constraint " {0} " ' . format ( constraint ) )
return ca , path_length
def _parse_key_usage ( self ) :
params = dict (
digital_signature = False ,
content_commitment = False ,
key_encipherment = False ,
data_encipherment = False ,
key_agreement = False ,
key_cert_sign = False ,
crl_sign = False ,
encipher_only = False ,
decipher_only = False ,
)
for usage in self . keyUsage :
params [ self . _get_keyusage ( usage ) ] = True
return params
def assertonly ( self ) :
self . cert = crypto_utils . load_certificate ( self . path , backend = self . backend )
def _validate_signature_algorithms ( ) :
if self . signature_algorithms :
if self . cert . signature_algorithm_oid . _name not in self . signature_algorithms :
self . message . append (
' Invalid signature algorithm (got %s , expected one of %s ) ' %
( self . cert . signature_algorithm_oid . _name , self . signature_algorithms )
)
def _validate_subject ( ) :
if self . subject :
expected_subject = Name ( [ NameAttribute ( oid = self . _get_name_oid ( sub [ 0 ] ) , value = to_text ( sub [ 1 ] ) )
for sub in self . subject ] )
cert_subject = self . cert . subject
if ( not self . subject_strict and not all ( x in cert_subject for x in expected_subject ) ) or \
( self . subject_strict and not set ( expected_subject ) == set ( cert_subject ) ) :
self . message . append (
' Invalid subject component (got %s , expected all of %s to be present) ' %
( cert_subject , expected_subject )
)
def _validate_issuer ( ) :
if self . issuer :
expected_issuer = Name ( [ NameAttribute ( oid = self . _get_name_oid ( iss [ 0 ] ) , value = to_text ( iss [ 1 ] ) )
for iss in self . issuer ] )
cert_issuer = self . cert . issuer
if ( not self . issuer_strict and not all ( x in cert_issuer for x in expected_issuer ) ) or \
( self . issuer_strict and not set ( expected_issuer ) == set ( cert_issuer ) ) :
self . message . append (
' Invalid issuer component (got %s , expected all of %s to be present) ' % ( cert_issuer , self . issuer )
)
def _validate_has_expired ( ) :
cert_not_after = self . cert . not_valid_after
cert_expired = cert_not_after < datetime . datetime . utcnow ( )
if self . has_expired != cert_expired :
self . message . append (
' Certificate expiration check failed (certificate expiration is %s , expected %s ) ' % ( cert_expired , self . has_expired )
)
def _validate_version ( ) :
# FIXME
if self . version :
expected_version = x509 . Version ( int ( self . version ) - 1 )
if expected_version != self . cert . version :
self . message . append (
' Invalid certificate version number (got %s , expected %s ) ' % ( self . cert . version , self . version )
)
def _validate_keyUsage ( ) :
if self . keyUsage :
try :
current_keyusage = self . cert . extensions . get_extension_for_class ( x509 . KeyUsage ) . value
expected_keyusage = x509 . KeyUsage ( * * self . _parse_key_usage ( ) )
test_keyusage = dict (
digital_signature = current_keyusage . digital_signature ,
content_commitment = current_keyusage . content_commitment ,
key_encipherment = current_keyusage . key_encipherment ,
data_encipherment = current_keyusage . data_encipherment ,
key_agreement = current_keyusage . key_agreement ,
key_cert_sign = current_keyusage . key_cert_sign ,
crl_sign = current_keyusage . crl_sign ,
)
if test_keyusage [ ' key_agreement ' ] :
test_keyusage . update ( dict (
encipher_only = current_keyusage . encipher_only ,
decipher_only = current_keyusage . decipher_only
) )
else :
test_keyusage . update ( dict (
encipher_only = False ,
decipher_only = False
) )
if ( not self . keyUsage_strict and not all ( self . _parse_key_usage ( ) [ x ] == test_keyusage [ x ] for x in self . _parse_key_usage ( ) ) ) or \
( self . keyUsage_strict and current_keyusage != expected_keyusage ) :
self . message . append (
' Invalid keyUsage components (got %s , expected all of %s to be present) ' %
( [ x for x in test_keyusage if x is True ] , [ x for x in self . keyUsage if x is True ] )
)
except cryptography . x509 . ExtensionNotFound :
self . message . append ( ' Found no keyUsage extension ' )
def _validate_extendedKeyUsage ( ) :
if self . extendedKeyUsage :
try :
current_ext_keyusage = self . cert . extensions . get_extension_for_class ( x509 . ExtendedKeyUsage ) . value
usages = [ self . _get_ext_keyusage ( usage ) for usage in self . extendedKeyUsage ]
expected_ext_keyusage = x509 . ExtendedKeyUsage ( usages )
if ( not self . extendedKeyUsage_strict and not all ( x in expected_ext_keyusage for x in current_ext_keyusage ) ) or \
( self . extendedKeyUsage_strict and not current_ext_keyusage == expected_ext_keyusage ) :
self . message . append (
' Invalid extendedKeyUsage component (got %s , expected all of %s to be present) ' % ( [ xku . value for xku in current_ext_keyusage ] ,
[ exku . value for exku in expected_ext_keyusage ] )
)
except cryptography . x509 . ExtensionNotFound :
self . message . append ( ' Found no extendedKeyUsage extension ' )
def _validate_subjectAltName ( ) :
if self . subjectAltName :
try :
current_san = self . cert . extensions . get_extension_for_class ( x509 . SubjectAlternativeName ) . value
expected_san = [ self . _get_san ( san ) for san in self . subjectAltName ]
if ( not self . subjectAltName_strict and not all ( x in current_san for x in expected_san ) ) or \
( self . subjectAltName_strict and not set ( current_san ) == set ( expected_san ) ) :
self . message . append (
' Invalid subjectAltName component (got %s , expected all of %s to be present) ' %
( current_san , self . subjectAltName )
)
except cryptography . x509 . ExtensionNotFound :
self . message . append ( ' Found no subjectAltName extension ' )
def _validate_notBefore ( ) :
if self . notBefore [ 0 ] :
# try:
if self . cert . not_valid_before != self . get_relative_time_option ( self . notBefore [ 0 ] , ' not_before ' ) :
self . message . append (
' Invalid notBefore component (got %s , expected %s to be present) ' % ( self . cert . not_valid_before , self . notBefore )
)
# except AttributeError:
# self.message.append(str(self.notBefore))
def _validate_notAfter ( ) :
if self . notAfter [ 0 ] :
if self . cert . not_valid_after != self . get_relative_time_option ( self . notAfter [ 0 ] , ' not_after ' ) :
self . message . append (
' Invalid notAfter component (got %s , expected %s to be present) ' % ( self . cert . not_valid_after , self . notAfter )
)
def _validate_valid_at ( ) :
if self . valid_at [ 0 ] :
rt = self . get_relative_time_option ( self . valid_at [ 0 ] , ' valid_at ' )
if not ( self . cert . not_valid_before < = rt < = self . cert . not_valid_after ) :
self . message . append (
' Certificate is not valid for the specified date ( %s ) - notBefore: %s - notAfter: %s ' % ( self . valid_at ,
self . cert . not_valid_before ,
self . cert . not_valid_after )
)
def _validate_invalid_at ( ) :
if self . invalid_at [ 0 ] :
if ( self . get_relative_time_option ( self . invalid_at [ 0 ] , ' invalid_at ' ) > self . cert . not_valid_before ) \
or ( self . get_relative_time_option ( self . invalid_at , ' invalid_at ' ) > = self . cert . not_valid_after ) :
self . message . append (
' Certificate is not invalid for the specified date ( %s ) - notBefore: %s - notAfter: %s ' % ( self . invalid_at ,
self . cert . not_valid_before ,
self . cert . not_valid_after )
)
def _validate_valid_in ( ) :
if self . valid_in [ 0 ] :
if not self . valid_in [ 0 ] . startswith ( " + " ) and not self . valid_in [ 0 ] . startswith ( " - " ) :
try :
int ( self . valid_in [ 0 ] )
except ValueError :
raise CertificateError (
' The supplied value for " valid_in " ( %s ) is not an integer or a valid timespec ' % self . valid_in )
self . valid_in = " + " + self . valid_in + " s "
valid_in_date = self . get_relative_time_option ( self . valid_in [ 0 ] , " valid_in " )
if not self . cert . not_valid_before < = valid_in_date < = self . cert . not_valid_after :
self . message . append (
' Certificate is not valid in %s from now (that would be %s ) - notBefore: %s - notAfter: %s '
% ( self . valid_in , valid_in_date ,
self . cert . not_valid_before ,
self . cert . not_valid_after ) )
for validation in [ ' signature_algorithms ' , ' subject ' , ' issuer ' ,
' has_expired ' , ' version ' , ' keyUsage ' ,
' extendedKeyUsage ' , ' subjectAltName ' ,
' notBefore ' , ' notAfter ' , ' valid_at ' , ' valid_in ' , ' invalid_at ' ] :
f_name = locals ( ) [ ' _validate_ %s ' % validation ]
f_name ( )
def generate ( self , module ) :
""" Don ' t generate anything - only assert """
self . assertonly ( )
try :
if self . privatekey_path and \
not super ( AssertOnlyCertificateCryptography , self ) . check ( module , perms_required = False ) :
self . message . append (
' Certificate %s and private key %s do not match ' % ( self . path , self . privatekey_path )
)
except CertificateError as e :
self . message . append (
' Error while reading private key %s : %s ' % ( self . privatekey_path , str ( e ) )
)
if len ( self . message ) :
module . fail_json ( msg = ' | ' . join ( self . message ) )
def check ( self , module , perms_required = False ) :
""" Ensure the resource is in its desired state. """
parent_check = super ( AssertOnlyCertificateCryptography , self ) . check ( module , perms_required )
self . assertonly ( )
assertonly_check = not len ( self . message )
self . message = [ ]
return parent_check and assertonly_check
def dump ( self , check_mode = False ) :
result = {
' changed ' : self . changed ,
' filename ' : self . path ,
' privatekey ' : self . privatekey_path ,
' csr ' : self . csr_path ,
}
return result
class AssertOnlyCertificate ( Certificate ) :
""" validate the supplied certificate. """
def __init__ ( self , module ) :
super ( AssertOnlyCertificate , self ) . __init__ ( module )
super ( AssertOnlyCertificate , self ) . __init__ ( module , ' pyopenssl ' )
self . signature_algorithms = module . params [ ' signature_algorithms ' ]
if module . params [ ' subject ' ] :
self . subject = crypto_utils . parse_name_field ( module . params [ ' subject ' ] )
@ -991,8 +1657,7 @@ class AssertOnlyCertificate(Certificate):
self . valid_in = " + " + self . valid_in + " s "
valid_in_asn1 = self . get_relative_time_option ( self . valid_in , " valid_in " )
valid_in_date = to_bytes ( valid_in_asn1 , errors = ' surrogate_or_strict ' )
if not ( self . cert . get_notBefore ( ) < = valid_in_date < =
self . cert . get_notAfter ( ) ) :
if not ( self . cert . get_notBefore ( ) < = valid_in_date < = self . cert . get_notAfter ( ) ) :
self . message . append (
' Certificate is not valid in %s from now (that would be %s ) - notBefore: %s - notAfter: %s '
% ( self . valid_in , valid_in_date ,
@ -1051,8 +1716,11 @@ class AssertOnlyCertificate(Certificate):
class AcmeCertificate ( Certificate ) :
""" Retrieve a certificate using the ACME protocol. """
def __init__ ( self , module ) :
super ( AcmeCertificate , self ) . __init__ ( module )
# Since there's no real use of the backend,
# other than the 'self.check' function, we just pass the backend to the constructor
def __init__ ( self , module , backend ) :
super ( AcmeCertificate , self ) . __init__ ( module , backend )
self . accountkey_path = module . params [ ' acme_accountkey_path ' ]
self . challenge_path = module . params [ ' acme_challenge_path ' ]
self . use_chain = module . params [ ' acme_chain ' ]
@ -1123,6 +1791,7 @@ def main():
provider = dict ( type = ' str ' , choices = [ ' acme ' , ' assertonly ' , ' ownca ' , ' selfsigned ' ] ) ,
force = dict ( type = ' bool ' , default = False , ) ,
csr_path = dict ( type = ' path ' ) ,
select_crypto_backend = dict ( type = ' str ' , default = ' auto ' , choices = [ ' auto ' , ' cryptography ' , ' pyopenssl ' ] ) ,
# General properties of a certificate
privatekey_path = dict ( type = ' path ' ) ,
@ -1172,14 +1841,6 @@ def main():
add_file_common_args = True ,
)
if not pyopenssl_found :
module . fail_json ( msg = missing_required_lib ( ' pyOpenSSL ' ) , exception = PYOPENSSL_IMP_ERR )
if module . params [ ' provider ' ] in [ ' selfsigned ' , ' ownca ' , ' assertonly ' ] :
try :
getattr ( crypto . X509Req , ' get_extensions ' )
except AttributeError :
module . fail_json ( msg = ' You need to have PyOpenSSL>=0.15 ' )
if module . params [ ' provider ' ] != ' assertonly ' and module . params [ ' csr_path ' ] is None :
module . fail_json ( msg = ' csr_path is required when provider is not assertonly ' )
@ -1192,14 +1853,60 @@ def main():
provider = module . params [ ' provider ' ]
if provider == ' selfsigned ' :
certificate = SelfSignedCertificate ( module )
elif provider == ' acme ' :
certificate = AcmeCertificate ( module )
elif provider == ' ownca ' :
certificate = OwnCACertificate ( module )
else :
certificate = AssertOnlyCertificate ( module )
backend = module . params [ ' select_crypto_backend ' ]
if backend == ' auto ' :
# Detect what backend we can use
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION > = LooseVersion ( MINIMAL_CRYPTOGRAPHY_VERSION )
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION > = LooseVersion ( MINIMAL_PYOPENSSL_VERSION )
# If cryptography is available we'll use it
if can_use_cryptography :
backend = ' cryptography '
elif can_use_pyopenssl :
backend = ' pyopenssl '
if module . params [ ' selfsigned_version ' ] == 2 or module . params [ ' ownca_version ' ] == 2 :
module . warn ( ' crypto backend forced to pyopenssl. The cryptography library does not support v2 certificates ' )
backend = ' pyopenssl '
# Fail if no backend has been found
if backend == ' auto ' :
module . fail_json ( msg = ( " Can ' t detect none of the required Python libraries "
" cryptography (>= {0} ) or PyOpenSSL (>= {1} ) " ) . format (
MINIMAL_CRYPTOGRAPHY_VERSION ,
MINIMAL_PYOPENSSL_VERSION ) )
if backend == ' pyopenssl ' :
if not PYOPENSSL_FOUND :
module . fail_json ( msg = missing_required_lib ( ' pyOpenSSL ' ) , exception = PYOPENSSL_IMP_ERR )
if module . params [ ' provider ' ] in [ ' selfsigned ' , ' ownca ' , ' assertonly ' ] :
try :
getattr ( crypto . X509Req , ' get_extensions ' )
except AttributeError :
module . fail_json ( msg = ' You need to have PyOpenSSL>=0.15 ' )
if provider == ' selfsigned ' :
certificate = SelfSignedCertificate ( module )
elif provider == ' acme ' :
certificate = AcmeCertificate ( module , ' pyopenssl ' )
elif provider == ' ownca ' :
certificate = OwnCACertificate ( module )
else :
certificate = AssertOnlyCertificate ( module )
elif backend == ' cryptography ' :
if not CRYPTOGRAPHY_FOUND :
module . fail_json ( msg = missing_required_lib ( ' cryptography ' ) , exception = CRYPTOGRAPHY_IMP_ERR )
if module . params [ ' selfsigned_version ' ] == 2 or module . params [ ' ownca_version ' ] == 2 :
module . fail_json ( msg = ' The cryptography backend does not support v2 certificates, '
' use select_crypto_backend=pyopenssl for v2 certificates ' )
if provider == ' selfsigned ' :
certificate = SelfSignedCertificateCryptography ( module )
elif provider == ' acme ' :
certificate = AcmeCertificate ( module , ' cryptography ' )
elif provider == ' ownca ' :
certificate = OwnCACertificateCryptography ( module )
else :
certificate = AssertOnlyCertificateCryptography ( module )
if module . params [ ' state ' ] == ' present ' :