From 11efa5c48bc0175770814af8efc446a90d3293c8 Mon Sep 17 00:00:00 2001 From: Matt Martz Date: Tue, 6 Feb 2024 10:50:19 -0600 Subject: [PATCH] Reduce complexity of Request.open (#81988) --- .../request-open-reduce-complexity.yml | 2 + lib/ansible/module_utils/urls.py | 129 ++++++++++-------- 2 files changed, 73 insertions(+), 58 deletions(-) create mode 100644 changelogs/fragments/request-open-reduce-complexity.yml diff --git a/changelogs/fragments/request-open-reduce-complexity.yml b/changelogs/fragments/request-open-reduce-complexity.yml new file mode 100644 index 00000000000..e74f217c6ef --- /dev/null +++ b/changelogs/fragments/request-open-reduce-complexity.yml @@ -0,0 +1,2 @@ +minor_changes: +- urls - reduce complexity of ``Request.open`` diff --git a/lib/ansible/module_utils/urls.py b/lib/ansible/module_utils/urls.py index c7152311057..c4c8e3ab7df 100644 --- a/lib/ansible/module_utils/urls.py +++ b/lib/ansible/module_utils/urls.py @@ -633,6 +633,74 @@ def rfc2822_date_string(timetuple, zone='-0000'): zone) +def _configure_auth(url, url_username, url_password, use_gssapi, force_basic_auth, use_netrc): + headers = {} + handlers = [] + + parsed = urlparse(url) + if parsed.scheme == 'ftp': + return url, headers, handlers + + username = url_username + password = url_password + + if username: + netloc = parsed.netloc + elif '@' in parsed.netloc: + credentials, netloc = parsed.netloc.split('@', 1) + if ':' in credentials: + username, password = credentials.split(':', 1) + else: + username = credentials + password = '' + username = unquote(username) + password = unquote(password) + + # reconstruct url without credentials + url = urlunparse(parsed._replace(netloc=netloc)) + + if use_gssapi: + if HTTPGSSAPIAuthHandler: # type: ignore[truthy-function] + handlers.append(HTTPGSSAPIAuthHandler(username, password)) + else: + imp_err_msg = missing_required_lib('gssapi', reason='for use_gssapi=True', + url='https://pypi.org/project/gssapi/') + raise MissingModuleError(imp_err_msg, import_traceback=GSSAPI_IMP_ERR) + + elif username and not force_basic_auth: + passman = urllib.request.HTTPPasswordMgrWithDefaultRealm() + + # this creates a password manager + passman.add_password(None, netloc, username, password) + + # because we have put None at the start it will always + # use this username/password combination for urls + # for which `theurl` is a super-url + authhandler = urllib.request.HTTPBasicAuthHandler(passman) + digest_authhandler = urllib.request.HTTPDigestAuthHandler(passman) + + # create the AuthHandler + handlers.append(authhandler) + handlers.append(digest_authhandler) + + elif username and force_basic_auth: + headers["Authorization"] = basic_auth_header(username, password) + + elif use_netrc: + try: + rc = netrc.netrc(os.environ.get('NETRC')) + login = rc.authenticators(parsed.hostname) + except IOError: + login = None + + if login: + username, dummy, password = login + if username and password: + headers["Authorization"] = basic_auth_header(username, password) + + return url, headers, handlers + + class Request: def __init__(self, headers=None, use_proxy=True, force=False, timeout=10, validate_certs=True, url_username=None, url_password=None, http_agent=None, force_basic_auth=False, @@ -772,64 +840,9 @@ class Request: if unix_socket: handlers.append(UnixHTTPHandler(unix_socket)) - parsed = urlparse(url) - if parsed.scheme != 'ftp': - username = url_username - password = url_password - - if username: - netloc = parsed.netloc - elif '@' in parsed.netloc: - credentials, netloc = parsed.netloc.split('@', 1) - if ':' in credentials: - username, password = credentials.split(':', 1) - else: - username = credentials - password = '' - username = unquote(username) - password = unquote(password) - - # reconstruct url without credentials - url = urlunparse(parsed._replace(netloc=netloc)) - - if use_gssapi: - if HTTPGSSAPIAuthHandler: # type: ignore[truthy-function] - handlers.append(HTTPGSSAPIAuthHandler(username, password)) - else: - imp_err_msg = missing_required_lib('gssapi', reason='for use_gssapi=True', - url='https://pypi.org/project/gssapi/') - raise MissingModuleError(imp_err_msg, import_traceback=GSSAPI_IMP_ERR) - - elif username and not force_basic_auth: - passman = urllib.request.HTTPPasswordMgrWithDefaultRealm() - - # this creates a password manager - passman.add_password(None, netloc, username, password) - - # because we have put None at the start it will always - # use this username/password combination for urls - # for which `theurl` is a super-url - authhandler = urllib.request.HTTPBasicAuthHandler(passman) - digest_authhandler = urllib.request.HTTPDigestAuthHandler(passman) - - # create the AuthHandler - handlers.append(authhandler) - handlers.append(digest_authhandler) - - elif username and force_basic_auth: - headers["Authorization"] = basic_auth_header(username, password) - - elif use_netrc: - try: - rc = netrc.netrc(os.environ.get('NETRC')) - login = rc.authenticators(parsed.hostname) - except IOError: - login = None - - if login: - username, dummy, password = login - if username and password: - headers["Authorization"] = basic_auth_header(username, password) + url, auth_headers, auth_handlers = _configure_auth(url, url_username, url_password, use_gssapi, force_basic_auth, use_netrc) + headers.update(auth_headers) + handlers.extend(auth_handlers) if not use_proxy: proxyhandler = urllib.request.ProxyHandler({})