cert validation fixes - Attempt 2 (#55953)

* Attempt 2 of cert validation fixes

* Remove unused code

* Cleanup the tmp cert using atexit

* Fix linting issues

* Only add SSLValidationHandler when not HAS_SSLCONTEXT

* Catch value errors on non PEM certs

* Only catch NotImplementedError to avoid masking issues

* set self._context even with PyOpenSSLContext for conformity

* Fix error building

* normalize how we interact with the context we create

* Remove unused code

* Address test for py3.7 message difference

* open_url should pass the ca_path through

* Account for new error in url lookup test

* Guard some code behind whether or not we are validating certs

* Make _make_context public

* Move atexit.register up to where the tmp file is created
pull/57249/head
Matt Martz 5 years ago committed by ansibot
parent c493593b4b
commit 8bd4e2a144

@ -32,7 +32,9 @@ for users making use of a module. If possible, avoid third party libraries by us
this code instead. this code instead.
''' '''
import atexit
import base64 import base64
import functools
import netrc import netrc
import os import os
import platform import platform
@ -401,9 +403,9 @@ if hasattr(httplib, 'HTTPSConnection') and hasattr(urllib_request, 'HTTPSHandler
httplib.HTTPSConnection.__init__(self, *args, **kwargs) httplib.HTTPSConnection.__init__(self, *args, **kwargs)
self.context = None self.context = None
if HAS_SSLCONTEXT: if HAS_SSLCONTEXT:
self.context = create_default_context() self.context = self._context
elif HAS_URLLIB3_PYOPENSSLCONTEXT: elif HAS_URLLIB3_PYOPENSSLCONTEXT:
self.context = PyOpenSSLContext(PROTOCOL) self.context = self._context = PyOpenSSLContext(PROTOCOL)
if self.context and self.cert_file: if self.context and self.cert_file:
self.context.load_cert_chain(self.cert_file, self.key_file) self.context.load_cert_chain(self.cert_file, self.key_file)
@ -434,7 +436,16 @@ if hasattr(httplib, 'HTTPSConnection') and hasattr(urllib_request, 'HTTPSHandler
class CustomHTTPSHandler(urllib_request.HTTPSHandler): class CustomHTTPSHandler(urllib_request.HTTPSHandler):
def https_open(self, req): def https_open(self, req):
return self.do_open(CustomHTTPSConnection, req) kwargs = {}
if HAS_SSLCONTEXT:
kwargs['context'] = self._context
return self.do_open(
functools.partial(
CustomHTTPSConnection,
**kwargs
),
req
)
https_request = AbstractHTTPHandler.do_request_ https_request = AbstractHTTPHandler.do_request_
@ -635,7 +646,7 @@ class RequestWithMethod(urllib_request.Request):
return urllib_request.Request.get_method(self) return urllib_request.Request.get_method(self)
def RedirectHandlerFactory(follow_redirects=None, validate_certs=True): def RedirectHandlerFactory(follow_redirects=None, validate_certs=True, ca_path=None):
"""This is a class factory that closes over the value of """This is a class factory that closes over the value of
``follow_redirects`` so that the RedirectHandler class has access to ``follow_redirects`` so that the RedirectHandler class has access to
that value without having to use globals, and potentially cause problems that value without having to use globals, and potentially cause problems
@ -650,9 +661,10 @@ def RedirectHandlerFactory(follow_redirects=None, validate_certs=True):
""" """
def redirect_request(self, req, fp, code, msg, hdrs, newurl): def redirect_request(self, req, fp, code, msg, hdrs, newurl):
handler = maybe_add_ssl_handler(newurl, validate_certs) if not HAS_SSLCONTEXT:
if handler: handler = maybe_add_ssl_handler(newurl, validate_certs, ca_path=ca_path)
urllib_request._opener.add_handler(handler) if handler:
urllib_request._opener.add_handler(handler)
# Preserve urllib2 compatibility # Preserve urllib2 compatibility
if follow_redirects == 'urllib2': if follow_redirects == 'urllib2':
@ -735,7 +747,7 @@ def build_ssl_validation_error(hostname, port, paths, exc=None):
' python >= 2.7.9 on your managed machine') ' python >= 2.7.9 on your managed machine')
msg.append(' (the python executable used (%s) is version: %s)' % msg.append(' (the python executable used (%s) is version: %s)' %
(sys.executable, ''.join(sys.version.splitlines()))) (sys.executable, ''.join(sys.version.splitlines())))
if not HAS_URLLIB3_PYOPENSSLCONTEXT or not HAS_URLLIB3_SSL_WRAP_SOCKET: if not HAS_URLLIB3_PYOPENSSLCONTEXT and not HAS_URLLIB3_SSL_WRAP_SOCKET:
msg.append('or you can install the `urllib3`, `pyOpenSSL`,' msg.append('or you can install the `urllib3`, `pyOpenSSL`,'
' `ndg-httpsclient`, and `pyasn1` python modules') ' `ndg-httpsclient`, and `pyasn1` python modules')
@ -752,6 +764,15 @@ def build_ssl_validation_error(hostname, port, paths, exc=None):
raise SSLValidationError(' '.join(msg) % (hostname, port, ", ".join(paths))) raise SSLValidationError(' '.join(msg) % (hostname, port, ", ".join(paths)))
def atexit_remove_file(filename):
if os.path.exists(filename):
try:
os.unlink(filename)
except Exception:
# just ignore if we cannot delete, things should be ok
pass
class SSLValidationHandler(urllib_request.BaseHandler): class SSLValidationHandler(urllib_request.BaseHandler):
''' '''
A custom handler class for SSL validation. A custom handler class for SSL validation.
@ -762,21 +783,38 @@ class SSLValidationHandler(urllib_request.BaseHandler):
''' '''
CONNECT_COMMAND = "CONNECT %s:%s HTTP/1.0\r\n" CONNECT_COMMAND = "CONNECT %s:%s HTTP/1.0\r\n"
def __init__(self, hostname, port): def __init__(self, hostname, port, ca_path=None):
self.hostname = hostname self.hostname = hostname
self.port = port self.port = port
self.ca_path = ca_path
def get_ca_certs(self): def get_ca_certs(self):
# tries to find a valid CA cert in one of the # tries to find a valid CA cert in one of the
# standard locations for the current distribution # standard locations for the current distribution
ca_certs = [] ca_certs = []
cadata = bytearray()
paths_checked = [] paths_checked = []
if self.ca_path:
paths_checked = [self.ca_path]
with open(to_bytes(self.ca_path, errors='surrogate_or_strict'), 'rb') as f:
if HAS_SSLCONTEXT:
cadata.extend(
ssl.PEM_cert_to_DER_cert(
to_native(f.read(), errors='surrogate_or_strict')
)
)
else:
ca_certs.append(f.read())
return ca_certs, cadata, paths_checked
if not HAS_SSLCONTEXT:
paths_checked.append('/etc/ssl/certs')
system = to_text(platform.system(), errors='surrogate_or_strict') system = to_text(platform.system(), errors='surrogate_or_strict')
# build a list of paths to check for .crt/.pem files # build a list of paths to check for .crt/.pem files
# based on the platform type # based on the platform type
paths_checked.append('/etc/ssl/certs')
if system == u'Linux': if system == u'Linux':
paths_checked.append('/etc/pki/ca-trust/extracted/pem') paths_checked.append('/etc/pki/ca-trust/extracted/pem')
paths_checked.append('/etc/pki/tls/certs') paths_checked.append('/etc/pki/tls/certs')
@ -794,13 +832,21 @@ class SSLValidationHandler(urllib_request.BaseHandler):
# location if the OS platform one is not available # location if the OS platform one is not available
paths_checked.append('/etc/ansible') paths_checked.append('/etc/ansible')
tmp_fd, tmp_path = tempfile.mkstemp() tmp_path = None
to_add_fd, to_add_path = tempfile.mkstemp() if not HAS_SSLCONTEXT:
to_add = False tmp_fd, tmp_path = tempfile.mkstemp()
atexit.register(atexit_remove_file, tmp_path)
# Write the dummy ca cert if we are running on macOS # Write the dummy ca cert if we are running on macOS
if system == u'Darwin': if system == u'Darwin':
os.write(tmp_fd, b_DUMMY_CA_CERT) if HAS_SSLCONTEXT:
cadata.extend(
ssl.PEM_cert_to_DER_cert(
to_native(b_DUMMY_CA_CERT, errors='surrogate_or_strict')
)
)
else:
os.write(tmp_fd, b_DUMMY_CA_CERT)
# Default Homebrew path for OpenSSL certs # Default Homebrew path for OpenSSL certs
paths_checked.append('/usr/local/etc/openssl') paths_checked.append('/usr/local/etc/openssl')
@ -814,26 +860,29 @@ class SSLValidationHandler(urllib_request.BaseHandler):
full_path = os.path.join(path, f) full_path = os.path.join(path, f)
if os.path.isfile(full_path) and os.path.splitext(f)[1] in ('.crt', '.pem'): if os.path.isfile(full_path) and os.path.splitext(f)[1] in ('.crt', '.pem'):
try: try:
cert_file = open(full_path, 'rb')
cert = cert_file.read()
cert_file.close()
os.write(tmp_fd, cert)
os.write(tmp_fd, b'\n')
if full_path not in LOADED_VERIFY_LOCATIONS: if full_path not in LOADED_VERIFY_LOCATIONS:
to_add = True with open(full_path, 'rb') as cert_file:
os.write(to_add_fd, cert) b_cert = cert_file.read()
os.write(to_add_fd, b'\n') if HAS_SSLCONTEXT:
LOADED_VERIFY_LOCATIONS.add(full_path) try:
cadata.extend(
ssl.PEM_cert_to_DER_cert(
to_native(b_cert, errors='surrogate_or_strict')
)
)
except ValueError:
continue
else:
os.write(tmp_fd, b_cert)
os.write(tmp_fd, b'\n')
except (OSError, IOError): except (OSError, IOError):
pass pass
if not to_add: if HAS_SSLCONTEXT:
try: default_verify_paths = ssl.get_default_verify_paths()
os.remove(to_add_path) paths_checked[:0] = [default_verify_paths.capath]
except OSError:
pass return (tmp_path, cadata, paths_checked)
to_add_path = None
return (tmp_path, to_add_path, paths_checked)
def validate_proxy_response(self, response, valid_codes=None): def validate_proxy_response(self, response, valid_codes=None):
''' '''
@ -864,47 +913,40 @@ class SSLValidationHandler(urllib_request.BaseHandler):
return False return False
return True return True
def _make_context(self, to_add_ca_cert_path): def make_context(self, cafile, cadata):
cafile = self.ca_path or cafile
if self.ca_path:
cadata = None
else:
cadata = cadata or None
if HAS_SSLCONTEXT: if HAS_SSLCONTEXT:
context = create_default_context() context = create_default_context(cafile=cafile)
elif HAS_URLLIB3_PYOPENSSLCONTEXT: elif HAS_URLLIB3_PYOPENSSLCONTEXT:
context = PyOpenSSLContext(PROTOCOL) context = PyOpenSSLContext(PROTOCOL)
else: else:
raise NotImplementedError('Host libraries are too old to support creating an sslcontext') raise NotImplementedError('Host libraries are too old to support creating an sslcontext')
if to_add_ca_cert_path: if cafile or cadata:
context.load_verify_locations(to_add_ca_cert_path) context.load_verify_locations(cafile=cafile, cadata=cadata)
return context return context
def http_request(self, req): def http_request(self, req):
tmp_ca_cert_path, to_add_ca_cert_path, paths_checked = self.get_ca_certs() tmp_ca_cert_path, cadata, paths_checked = self.get_ca_certs()
# Detect if 'no_proxy' environment variable is set and if our URL is included
use_proxy = self.detect_no_proxy(req.get_full_url())
https_proxy = os.environ.get('https_proxy') https_proxy = os.environ.get('https_proxy')
context = None context = None
try: try:
context = self._make_context(to_add_ca_cert_path) context = self.make_context(tmp_ca_cert_path, cadata)
except Exception: except NotImplementedError:
# We'll make do with no context below # We'll make do with no context below
pass pass
# Detect if 'no_proxy' environment variable is set and if our URL is included
use_proxy = self.detect_no_proxy(req.get_full_url())
if not use_proxy:
# ignore proxy settings for this host request
if tmp_ca_cert_path:
try:
os.remove(tmp_ca_cert_path)
except OSError:
pass
if to_add_ca_cert_path:
try:
os.remove(to_add_ca_cert_path)
except OSError:
pass
return req
try: try:
if https_proxy: if use_proxy and https_proxy:
proxy_parts = generic_urlparse(urlparse(https_proxy)) proxy_parts = generic_urlparse(urlparse(https_proxy))
port = proxy_parts.get('port') or 443 port = proxy_parts.get('port') or 443
proxy_hostname = proxy_parts.get('hostname', None) proxy_hostname = proxy_parts.get('hostname', None)
@ -952,27 +994,12 @@ class SSLValidationHandler(urllib_request.BaseHandler):
except socket.error as e: except socket.error as e:
raise ConnectionError('Failed to connect to %s at port %s: %s' % (self.hostname, self.port, to_native(e))) raise ConnectionError('Failed to connect to %s at port %s: %s' % (self.hostname, self.port, to_native(e)))
try:
# cleanup the temp file created, don't worry
# if it fails for some reason
os.remove(tmp_ca_cert_path)
except Exception:
pass
try:
# cleanup the temp file created, don't worry
# if it fails for some reason
if to_add_ca_cert_path:
os.remove(to_add_ca_cert_path)
except Exception:
pass
return req return req
https_request = http_request https_request = http_request
def maybe_add_ssl_handler(url, validate_certs): def maybe_add_ssl_handler(url, validate_certs, ca_path=None):
parsed = generic_urlparse(urlparse(url)) parsed = generic_urlparse(urlparse(url))
if parsed.scheme == 'https' and validate_certs: if parsed.scheme == 'https' and validate_certs:
if not HAS_SSL: if not HAS_SSL:
@ -981,7 +1008,7 @@ def maybe_add_ssl_handler(url, validate_certs):
# create the SSL validation handler and # create the SSL validation handler and
# add it to the list of handlers # add it to the list of handlers
return SSLValidationHandler(parsed.hostname, parsed.port or 443) return SSLValidationHandler(parsed.hostname, parsed.port or 443, ca_path=ca_path)
def rfc2822_date_string(timetuple, zone='-0000'): def rfc2822_date_string(timetuple, zone='-0000'):
@ -1004,7 +1031,8 @@ def rfc2822_date_string(timetuple, zone='-0000'):
class Request: class Request:
def __init__(self, headers=None, use_proxy=True, force=False, timeout=10, validate_certs=True, def __init__(self, headers=None, use_proxy=True, force=False, timeout=10, validate_certs=True,
url_username=None, url_password=None, http_agent=None, force_basic_auth=False, url_username=None, url_password=None, http_agent=None, force_basic_auth=False,
follow_redirects='urllib2', client_cert=None, client_key=None, cookies=None, unix_socket=None): follow_redirects='urllib2', client_cert=None, client_key=None, cookies=None, unix_socket=None,
ca_path=None):
"""This class works somewhat similarly to the ``Session`` class of from requests """This class works somewhat similarly to the ``Session`` class of from requests
by defining a cookiejar that an be used across requests as well as cascaded defaults that by defining a cookiejar that an be used across requests as well as cascaded defaults that
can apply to repeated requests can apply to repeated requests
@ -1038,6 +1066,7 @@ class Request:
self.client_cert = client_cert self.client_cert = client_cert
self.client_key = client_key self.client_key = client_key
self.unix_socket = unix_socket self.unix_socket = unix_socket
self.ca_path = ca_path
if isinstance(cookies, cookiejar.CookieJar): if isinstance(cookies, cookiejar.CookieJar):
self.cookies = cookies self.cookies = cookies
else: else:
@ -1053,7 +1082,7 @@ class Request:
url_username=None, url_password=None, http_agent=None, url_username=None, url_password=None, http_agent=None,
force_basic_auth=None, follow_redirects=None, force_basic_auth=None, follow_redirects=None,
client_cert=None, client_key=None, cookies=None, use_gssapi=False, client_cert=None, client_key=None, cookies=None, use_gssapi=False,
unix_socket=None): unix_socket=None, ca_path=None):
""" """
Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3) Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3)
@ -1089,7 +1118,8 @@ class Request:
request request
:kwarg use_gssapi: (optional) Use GSSAPI handler of requests. :kwarg use_gssapi: (optional) Use GSSAPI handler of requests.
:kwarg unix_socket: (optional) String of file system path to unix socket file to use when establishing :kwarg unix_socket: (optional) String of file system path to unix socket file to use when establishing
connection to the provided url connection to the provided url
:kwarg ca_path: (optional) String of file system path to CA cert bundle to use
:returns: HTTPResponse :returns: HTTPResponse
""" """
@ -1114,14 +1144,15 @@ class Request:
client_key = self._fallback(client_key, self.client_key) client_key = self._fallback(client_key, self.client_key)
cookies = self._fallback(cookies, self.cookies) cookies = self._fallback(cookies, self.cookies)
unix_socket = self._fallback(unix_socket, self.unix_socket) unix_socket = self._fallback(unix_socket, self.unix_socket)
ca_path = self._fallback(ca_path, self.ca_path)
handlers = [] handlers = []
if unix_socket: if unix_socket:
handlers.append(UnixHTTPHandler(unix_socket)) handlers.append(UnixHTTPHandler(unix_socket))
ssl_handler = maybe_add_ssl_handler(url, validate_certs) ssl_handler = maybe_add_ssl_handler(url, validate_certs, ca_path=ca_path)
if ssl_handler: if ssl_handler and not HAS_SSLCONTEXT:
handlers.append(ssl_handler) handlers.append(ssl_handler)
if HAS_GSSAPI and use_gssapi: if HAS_GSSAPI and use_gssapi:
handlers.append(urllib_gssapi.HTTPSPNEGOAuthHandler()) handlers.append(urllib_gssapi.HTTPSPNEGOAuthHandler())
@ -1182,6 +1213,7 @@ class Request:
proxyhandler = urllib_request.ProxyHandler({}) proxyhandler = urllib_request.ProxyHandler({})
handlers.append(proxyhandler) handlers.append(proxyhandler)
context = None
if HAS_SSLCONTEXT and not validate_certs: if HAS_SSLCONTEXT and not validate_certs:
# In 2.7.9, the default context validates certificates # In 2.7.9, the default context validates certificates
context = SSLContext(ssl.PROTOCOL_SSLv23) context = SSLContext(ssl.PROTOCOL_SSLv23)
@ -1199,13 +1231,23 @@ class Request:
client_key=client_key, client_key=client_key,
unix_socket=unix_socket)) unix_socket=unix_socket))
if ssl_handler and HAS_SSLCONTEXT and validate_certs:
tmp_ca_path, cadata, paths_checked = ssl_handler.get_ca_certs()
try:
context = ssl_handler.make_context(tmp_ca_path, cadata)
except NotImplementedError:
pass
# pre-2.6 versions of python cannot use the custom https # pre-2.6 versions of python cannot use the custom https
# handler, since the socket class is lacking create_connection. # handler, since the socket class is lacking create_connection.
# Some python builds lack HTTPS support. # Some python builds lack HTTPS support.
if hasattr(socket, 'create_connection') and CustomHTTPSHandler: if hasattr(socket, 'create_connection') and CustomHTTPSHandler:
handlers.append(CustomHTTPSHandler) kwargs = {}
if HAS_SSLCONTEXT:
kwargs['context'] = context
handlers.append(CustomHTTPSHandler(**kwargs))
handlers.append(RedirectHandlerFactory(follow_redirects, validate_certs)) handlers.append(RedirectHandlerFactory(follow_redirects, validate_certs, ca_path=ca_path))
# add some nicer cookie handling # add some nicer cookie handling
if cookies is not None: if cookies is not None:
@ -1323,7 +1365,7 @@ def open_url(url, data=None, headers=None, method=None, use_proxy=True,
url_username=None, url_password=None, http_agent=None, url_username=None, url_password=None, http_agent=None,
force_basic_auth=False, follow_redirects='urllib2', force_basic_auth=False, follow_redirects='urllib2',
client_cert=None, client_key=None, cookies=None, client_cert=None, client_key=None, cookies=None,
use_gssapi=False, unix_socket=None): use_gssapi=False, unix_socket=None, ca_path=None):
''' '''
Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3) Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3)
@ -1335,7 +1377,7 @@ def open_url(url, data=None, headers=None, method=None, use_proxy=True,
url_username=url_username, url_password=url_password, http_agent=http_agent, url_username=url_username, url_password=url_password, http_agent=http_agent,
force_basic_auth=force_basic_auth, follow_redirects=follow_redirects, force_basic_auth=force_basic_auth, follow_redirects=follow_redirects,
client_cert=client_cert, client_key=client_key, cookies=cookies, client_cert=client_cert, client_key=client_key, cookies=cookies,
use_gssapi=use_gssapi, unix_socket=unix_socket) use_gssapi=use_gssapi, unix_socket=unix_socket, ca_path=ca_path)
# #
@ -1371,7 +1413,7 @@ def url_argument_spec():
def fetch_url(module, url, data=None, headers=None, method=None, def fetch_url(module, url, data=None, headers=None, method=None,
use_proxy=True, force=False, last_mod_time=None, timeout=10, use_proxy=True, force=False, last_mod_time=None, timeout=10,
use_gssapi=False, unix_socket=None): use_gssapi=False, unix_socket=None, ca_path=None):
"""Sends a request via HTTP(S) or FTP (needs the module as parameter) """Sends a request via HTTP(S) or FTP (needs the module as parameter)
:arg module: The AnsibleModule (used to get username, password etc. (s.b.). :arg module: The AnsibleModule (used to get username, password etc. (s.b.).
@ -1386,7 +1428,8 @@ def fetch_url(module, url, data=None, headers=None, method=None,
:kwarg int timeout: Default: 10 :kwarg int timeout: Default: 10
:kwarg boolean use_gssapi: Default: False :kwarg boolean use_gssapi: Default: False
:kwarg unix_socket: (optional) String of file system path to unix socket file to use when establishing :kwarg unix_socket: (optional) String of file system path to unix socket file to use when establishing
connection to the provided url connection to the provided url
:kwarg ca_path: (optional) String of file system path to CA cert bundle to use
:returns: A tuple of (**response**, **info**). Use ``response.read()`` to read the data. :returns: A tuple of (**response**, **info**). Use ``response.read()`` to read the data.
The **info** contains the 'status' and other meta data. When a HttpError (status > 400) The **info** contains the 'status' and other meta data. When a HttpError (status > 400)
@ -1437,7 +1480,7 @@ def fetch_url(module, url, data=None, headers=None, method=None,
url_password=password, http_agent=http_agent, force_basic_auth=force_basic_auth, url_password=password, http_agent=http_agent, force_basic_auth=force_basic_auth,
follow_redirects=follow_redirects, client_cert=client_cert, follow_redirects=follow_redirects, client_cert=client_cert,
client_key=client_key, cookies=cookies, use_gssapi=use_gssapi, client_key=client_key, cookies=cookies, use_gssapi=use_gssapi,
unix_socket=unix_socket) unix_socket=unix_socket, ca_path=ca_path)
# Lowercase keys, to conform to py2 behavior, so that py3 and py2 are predictable # Lowercase keys, to conform to py2 behavior, so that py3 and py2 are predictable
info.update(dict((k.lower(), v) for k, v in r.info().items())) info.update(dict((k.lower(), v) for k, v in r.info().items()))

@ -119,7 +119,7 @@
assert: assert:
that: that:
- "result is failed" - "result is failed"
- "'Failed to validate the SSL certificate' in result.msg or ( result.msg is match('hostname .* doesn.t match .*'))" - "'Failed to validate the SSL certificate' in result.msg or 'Hostname mismatch' in result.msg or ( result.msg is match('hostname .* doesn.t match .*'))"
- "stat_result.stat.exists == false" - "stat_result.stat.exists == false"
- name: test https fetch to a site with mismatched hostname and certificate and validate_certs=no - name: test https fetch to a site with mismatched hostname and certificate and validate_certs=no

@ -231,7 +231,7 @@
- assert: - assert:
that: that:
- "url_invalid_cert.failed" - "url_invalid_cert.failed"
- "'Error validating the server' in url_invalid_cert.msg or ( url_invalid_cert.msg is search('hostname .* doesn.t match .*'))" - "'Error validating the server' in url_invalid_cert.msg or 'Hostname mismatch' in url_invalid_cert.msg or ( url_invalid_cert.msg is search('hostname .* doesn.t match .*'))"
- name: Test that retrieving a url with invalid cert with validate_certs=False works - name: Test that retrieving a url with invalid cert with validate_certs=False works
set_fact: set_fact:

@ -103,7 +103,7 @@
assert: assert:
that: that:
- result.failed == true - result.failed == true
- "'Failed to validate the SSL certificate' in result.msg or ( result.msg is match('hostname .* doesn.t match .*'))" - "'Failed to validate the SSL certificate' in result.msg or 'Hostname mismatch' in result.msg or (result.msg is match('hostname .* doesn.t match .*'))"
- stat_result.stat.exists == false - stat_result.stat.exists == false
- result.status is defined - result.status is defined
- result.status == -1 - result.status == -1

@ -6,7 +6,7 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type __metaclass__ = type
from ansible.module_utils.urls import RedirectHandlerFactory, urllib_request, urllib_error from ansible.module_utils.urls import HAS_SSLCONTEXT, RedirectHandlerFactory, urllib_request, urllib_error
from ansible.module_utils.six import StringIO from ansible.module_utils.six import StringIO
import pytest import pytest
@ -127,7 +127,7 @@ def test_redir_validate_certs(urllib_req, request_body, mocker):
inst = handler() inst = handler()
inst.redirect_request(urllib_req, request_body, 301, '301 Moved Permanently', {}, 'https://docs.ansible.com/') inst.redirect_request(urllib_req, request_body, 301, '301 Moved Permanently', {}, 'https://docs.ansible.com/')
assert opener_mock.add_handler.call_count == 1 assert opener_mock.add_handler.call_count == int(not HAS_SSLCONTEXT)
def test_redir_http_error_308_urllib2(urllib_req, request_body): def test_redir_http_error_308_urllib2(urllib_req, request_body):

@ -47,6 +47,7 @@ def test_Request_fallback(urlopen_mock, install_opener_mock, mocker):
client_key='/tmp/client.key', client_key='/tmp/client.key',
cookies=cookies, cookies=cookies,
unix_socket='/foo/bar/baz.sock', unix_socket='/foo/bar/baz.sock',
ca_path='/foo/bar/baz.pem',
) )
fallback_mock = mocker.spy(request, '_fallback') fallback_mock = mocker.spy(request, '_fallback')
@ -66,10 +67,11 @@ def test_Request_fallback(urlopen_mock, install_opener_mock, mocker):
call(None, '/tmp/client.key'), # client_key call(None, '/tmp/client.key'), # client_key
call(None, cookies), # cookies call(None, cookies), # cookies
call(None, '/foo/bar/baz.sock'), # unix_socket call(None, '/foo/bar/baz.sock'), # unix_socket
call(None, '/foo/bar/baz.pem'), # ca_path
] ]
fallback_mock.assert_has_calls(calls) fallback_mock.assert_has_calls(calls)
assert fallback_mock.call_count == 13 # All but headers use fallback assert fallback_mock.call_count == 14 # All but headers use fallback
args = urlopen_mock.call_args[0] args = urlopen_mock.call_args[0]
assert args[1] is None # data, this is handled in the Request not urlopen assert args[1] is None # data, this is handled in the Request not urlopen
@ -100,17 +102,22 @@ def test_Request_open(urlopen_mock, install_opener_mock):
opener = install_opener_mock.call_args[0][0] opener = install_opener_mock.call_args[0][0]
handlers = opener.handlers handlers = opener.handlers
expected_handlers = ( if not HAS_SSLCONTEXT:
SSLValidationHandler, expected_handlers = (
RedirectHandlerFactory(), # factory, get handler SSLValidationHandler,
) RedirectHandlerFactory(), # factory, get handler
)
else:
expected_handlers = (
RedirectHandlerFactory(), # factory, get handler
)
found_handlers = [] found_handlers = []
for handler in handlers: for handler in handlers:
if isinstance(handler, SSLValidationHandler) or handler.__class__.__name__ == 'RedirectHandler': if isinstance(handler, SSLValidationHandler) or handler.__class__.__name__ == 'RedirectHandler':
found_handlers.append(handler) found_handlers.append(handler)
assert len(found_handlers) == 2 assert len(found_handlers) == len(expected_handlers)
def test_Request_open_http(urlopen_mock, install_opener_mock): def test_Request_open_http(urlopen_mock, install_opener_mock):
@ -446,4 +453,4 @@ def test_open_url(urlopen_mock, install_opener_mock, mocker):
url_username=None, url_password=None, http_agent=None, url_username=None, url_password=None, http_agent=None,
force_basic_auth=False, follow_redirects='urllib2', force_basic_auth=False, follow_redirects='urllib2',
client_cert=None, client_key=None, cookies=None, use_gssapi=False, client_cert=None, client_key=None, cookies=None, use_gssapi=False,
unix_socket=None) unix_socket=None, ca_path=None)

@ -67,7 +67,7 @@ def test_fetch_url(open_url_mock, fake_ansible_module):
open_url_mock.assert_called_once_with('http://ansible.com/', client_cert=None, client_key=None, cookies=kwargs['cookies'], data=None, open_url_mock.assert_called_once_with('http://ansible.com/', client_cert=None, client_key=None, cookies=kwargs['cookies'], data=None,
follow_redirects='urllib2', force=False, force_basic_auth='', headers=None, follow_redirects='urllib2', force=False, force_basic_auth='', headers=None,
http_agent='ansible-httpget', last_mod_time=None, method=None, timeout=10, url_password='', url_username='', http_agent='ansible-httpget', last_mod_time=None, method=None, timeout=10, url_password='', url_username='',
use_proxy=True, validate_certs=True, use_gssapi=False, unix_socket=None) use_proxy=True, validate_certs=True, use_gssapi=False, unix_socket=None, ca_path=None)
def test_fetch_url_params(open_url_mock, fake_ansible_module): def test_fetch_url_params(open_url_mock, fake_ansible_module):
@ -89,7 +89,7 @@ def test_fetch_url_params(open_url_mock, fake_ansible_module):
open_url_mock.assert_called_once_with('http://ansible.com/', client_cert='client.pem', client_key='client.key', cookies=kwargs['cookies'], data=None, open_url_mock.assert_called_once_with('http://ansible.com/', client_cert='client.pem', client_key='client.key', cookies=kwargs['cookies'], data=None,
follow_redirects='all', force=False, force_basic_auth=True, headers=None, follow_redirects='all', force=False, force_basic_auth=True, headers=None,
http_agent='ansible-test', last_mod_time=None, method=None, timeout=10, url_password='passwd', url_username='user', http_agent='ansible-test', last_mod_time=None, method=None, timeout=10, url_password='passwd', url_username='user',
use_proxy=True, validate_certs=False, use_gssapi=False, unix_socket=None) use_proxy=True, validate_certs=False, use_gssapi=False, unix_socket=None, ca_path=None)
def test_fetch_url_cookies(mocker, fake_ansible_module): def test_fetch_url_cookies(mocker, fake_ansible_module):

Loading…
Cancel
Save