get_certificate: add cryptography backend (#60599)

* Add cryptography backend for get_certificate.

* Add changelog.

* Use short names (if possible).

* Adjust version (to behave as pyOpenSSL).

* Work around bugs (needed for cryptography 1.2.3).

* Don't run cryptography backend tests for CentOS 6.

* Bump cryptography requirement to 1.6 or newer.

Otherwise, signature_algorithm_oid isn't there, either.

* Simplify requirement text.

* CentOS 6 has cryptography 1.9, so we still need to block.

* Add auto-detect test.

* Improve YAML.
pull/60705/head
Felix Fontein 5 years ago committed by GitHub
parent 38435e1bd0
commit 601a4b8f47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,2 @@
minor_changes:
- "get_certificate - now works with both PyOpenSSL and cryptography Python libraries. Autodetection can be overridden with ``select_crypto_backend`` option."

@ -1504,6 +1504,7 @@ _OID_MAP = {
_OID_LOOKUP = dict()
_NORMALIZE_NAMES = dict()
_NORMALIZE_NAMES_SHORT = dict()
for dotted, names in _OID_MAP.items():
for name in names:
@ -1513,6 +1514,7 @@ for dotted, names in _OID_MAP.items():
.format(name, dotted, _OID_LOOKUP[name])
)
_NORMALIZE_NAMES[name] = names[0]
_NORMALIZE_NAMES_SHORT[name] = names[-1]
_OID_LOOKUP[name] = dotted
for alias, original in [('userID', 'userId')]:
if alias in _NORMALIZE_NAMES:
@ -1521,14 +1523,18 @@ for alias, original in [('userID', 'userId')]:
.format(alias, original, _OID_LOOKUP[alias])
)
_NORMALIZE_NAMES[alias] = original
_NORMALIZE_NAMES_SHORT[alias] = _NORMALIZE_NAMES_SHORT[original]
_OID_LOOKUP[alias] = _OID_LOOKUP[original]
def pyopenssl_normalize_name(name):
def pyopenssl_normalize_name(name, short=False):
nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(name))
if nid != 0:
b_name = OpenSSL._util.lib.OBJ_nid2ln(nid)
name = to_text(OpenSSL._util.ffi.string(b_name))
if short:
return _NORMALIZE_NAMES_SHORT.get(name, name)
else:
return _NORMALIZE_NAMES.get(name, name)
@ -1695,10 +1701,13 @@ def cryptography_name_to_oid(name):
return x509.oid.ObjectIdentifier(dotted)
def cryptography_oid_to_name(oid):
def cryptography_oid_to_name(oid, short=False):
dotted_string = oid.dotted_string
names = _OID_MAP.get(dotted_string)
name = names[0] if names else oid._name
if short:
return _NORMALIZE_NAMES_SHORT.get(name, name)
else:
return _NORMALIZE_NAMES.get(name, name)

@ -18,6 +18,9 @@ version_added: "2.8"
short_description: Get a certificate from a host:port
description:
- Makes a secure connection and returns information about the presented certificate
- The module can use the cryptography Python library, or the pyOpenSSL Python
library. By default, it tries to detect which one is available. This can be
overridden with the I(select_crypto_backend) option."
options:
host:
description:
@ -50,13 +53,23 @@ options:
- The timeout in seconds
type: int
default: 10
select_crypto_backend:
description:
- Determines which crypto backend to use.
- The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
- If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
- If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
type: str
default: auto
choices: [ auto, cryptography, pyopenssl ]
version_added: "2.9"
notes:
- When using ca_cert on OS X it has been reported that in some conditions the validate will always succeed.
requirements:
- "python >= 2.7 when using C(proxy_host)"
- "pyOpenSSL >= 0.15"
- "cryptography >= 1.6 or pyOpenSSL >= 0.15"
'''
RETURN = '''
@ -126,14 +139,22 @@ EXAMPLES = '''
expire_days: "{{ (( cert.not_after | to_datetime('%Y%m%d%H%M%SZ')) - (ansible_date_time.iso8601 | to_datetime('%Y-%m-%dT%H:%M:%SZ')) ).days }}"
'''
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils._text import to_bytes
from ansible.module_utils import crypto as crypto_utils
from distutils.version import LooseVersion
from os.path import isfile
from ssl import get_server_certificate, DER_cert_to_PEM_cert, CERT_NONE, CERT_OPTIONAL
from socket import setdefaulttimeout, socket
from ssl import get_server_certificate, DER_cert_to_PEM_cert, CERT_NONE, CERT_OPTIONAL
import atexit
import base64
import datetime
import traceback
MINIMAL_PYOPENSSL_VERSION = '0.15'
MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
CREATE_DEFAULT_CONTEXT_IMP_ERR = None
try:
@ -146,12 +167,27 @@ else:
PYOPENSSL_IMP_ERR = None
try:
import OpenSSL
from OpenSSL import crypto
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
except ImportError:
PYOPENSSL_IMP_ERR = traceback.format_exc()
pyopenssl_found = False
PYOPENSSL_FOUND = False
else:
PYOPENSSL_FOUND = True
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
import cryptography.exceptions
import cryptography.x509
from cryptography.hazmat.backends import default_backend as cryptography_backend
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
pyopenssl_found = True
CRYPTOGRAPHY_FOUND = True
def main():
@ -163,6 +199,7 @@ def main():
proxy_host=dict(type='str'),
proxy_port=dict(type='int', default=8080),
timeout=dict(type='int', default=10),
select_crypto_backend=dict(type='str', choices=['auto', 'pyopenssl', 'cryptography'], default='auto'),
),
)
@ -173,11 +210,39 @@ def main():
proxy_port = module.params.get('proxy_port')
timeout = module.params.get('timeout')
backend = module.params.get('select_crypto_backend')
if backend == 'auto':
# Detection what is possible
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
# First try cryptography, then pyOpenSSL
if can_use_cryptography:
backend = 'cryptography'
elif can_use_pyopenssl:
backend = 'pyopenssl'
# Success?
if backend == 'auto':
module.fail_json(msg=("Can't detect any of the required Python libraries "
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
MINIMAL_CRYPTOGRAPHY_VERSION,
MINIMAL_PYOPENSSL_VERSION))
if backend == 'pyopenssl':
if not PYOPENSSL_FOUND:
module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
exception=PYOPENSSL_IMP_ERR)
elif backend == 'cryptography':
if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
exception=CRYPTOGRAPHY_IMP_ERR)
result = dict(
changed=False,
)
if not pyopenssl_found:
if not PYOPENSSL_FOUND:
module.fail_json(msg=missing_required_lib('pyOpenSSL >= 0.15'), exception=PYOPENSSL_IMP_ERR)
if timeout:
@ -210,18 +275,19 @@ def main():
cert = ctx.wrap_socket(sock, server_hostname=host).getpeercert(True)
cert = DER_cert_to_PEM_cert(cert)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
except Exception as e:
module.fail_json(msg="Failed to get cert from port with error: {0}".format(e))
else:
try:
cert = get_server_certificate((host, port), ca_certs=ca_cert)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
except Exception as e:
module.fail_json(msg="Failed to get cert from port with error: {0}".format(e))
result['cert'] = cert
if backend == 'pyopenssl':
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
result['subject'] = {}
for component in x509.get_subject().get_components():
result['subject'][component[0]] = component[1]
@ -250,6 +316,41 @@ def main():
result['version'] = x509.get_version()
elif backend == 'cryptography':
x509 = cryptography.x509.load_pem_x509_certificate(to_bytes(cert), cryptography_backend())
result['subject'] = {}
for attribute in x509.subject:
result['subject'][crypto_utils.cryptography_oid_to_name(attribute.oid, short=True)] = attribute.value
result['expired'] = x509.not_valid_after < datetime.datetime.utcnow()
result['extensions'] = []
for dotted_number, entry in crypto_utils.cryptography_get_extensions_from_cert(x509).items():
oid = cryptography.x509.oid.ObjectIdentifier(dotted_number)
result['extensions'].append({
'critical': entry['critical'],
'asn1_data': base64.b64decode(entry['value']),
'name': crypto_utils.cryptography_oid_to_name(oid, short=True),
})
result['issuer'] = {}
for attribute in x509.issuer:
result['issuer'][crypto_utils.cryptography_oid_to_name(attribute.oid, short=True)] = attribute.value
result['not_after'] = x509.not_valid_after.strftime('%Y%m%d%H%M%SZ')
result['not_before'] = x509.not_valid_before.strftime('%Y%m%d%H%M%SZ')
result['serial_number'] = x509.serial_number
result['signature_algorithm'] = crypto_utils.cryptography_oid_to_name(x509.signature_algorithm_oid)
# We need the -1 offset to get the same values as pyOpenSSL
if x509.version == cryptography.x509.Version.v1:
result['version'] = 1 - 1
elif x509.version == cryptography.x509.Version.v3:
result['version'] = 3 - 1
else:
result['version'] = "unknown"
module.exit_json(**result)

@ -1,5 +1,42 @@
---
- block:
- name: Get servers certificate with backend auto-detection
get_certificate:
host: "{{ httpbin_host }}"
port: 443
when: |
pyopenssl_version.stdout is version('0.15', '>=') or
(cryptography_version.stdout is version('1.6', '>=') and (ansible_distribution != 'CentOS' or ansible_distribution_major_version|int > 6))
- block:
- include_tasks: ../tests/validate.yml
vars:
select_crypto_backend: pyopenssl
when: pyopenssl_version.stdout is version('0.15', '>=')
- name: Remove output directory
file:
path: "{{ output_dir }}"
state: absent
- name: Re-create output directory
file:
path: "{{ output_dir }}"
state: directory
- block:
- include_tasks: ../tests/validate.yml
vars:
select_crypto_backend: cryptography
# The module doesn't work with CentOS 6. Since the pyOpenSSL installed there is too old,
# we never noticed before. This becomes a problem with the new cryptography backend,
# since there is a new enough cryptography version...
when: |
cryptography_version.stdout is version('1.6', '>=') and
(ansible_distribution != 'CentOS' or ansible_distribution_major_version|int > 6)

@ -1,7 +1,9 @@
---
- name: Get servers certificate
get_certificate:
host: "{{ httpbin_host }}"
port: 443
select_crypto_backend: "{{ select_crypto_backend }}"
register: result
- debug: var=result
@ -18,6 +20,7 @@
get_certificate:
host: "{{ httpbin_host }}"
port: 80
select_crypto_backend: "{{ select_crypto_backend }}"
register: result
ignore_errors: true
@ -33,6 +36,7 @@
host: "{{ httpbin_host }}"
port: 1234
timeout: 1
select_crypto_backend: "{{ select_crypto_backend }}"
register: result
ignore_errors: true
@ -48,6 +52,7 @@
host: "{{ httpbin_host }}"
port: 443
ca_cert: dn.e
select_crypto_backend: "{{ select_crypto_backend }}"
register: result
ignore_errors: true
@ -68,6 +73,7 @@
ca_cert: '{{ output_dir }}/temp.pem'
host: "{{ httpbin_host }}"
port: 443
select_crypto_backend: "{{ select_crypto_backend }}"
register: result
- assert:
@ -90,6 +96,7 @@
ca_cert: '{{ my_temp_dir.path }}/bogus_ca.pem'
host: "{{ httpbin_host }}"
port: 443
select_crypto_backend: "{{ select_crypto_backend }}"
register: result
ignore_errors: true

Loading…
Cancel
Save