From ee725846f070fc6b0dd79b5e8c5199ec652faf87 Mon Sep 17 00:00:00 2001 From: Sloane Hertel <19572925+s-hertel@users.noreply.github.com> Date: Mon, 10 May 2021 13:26:41 -0400 Subject: [PATCH] ansible-galaxy - increase page size and add retry decorator for throttling (#74240) * Get available collection versions with page_size=100 for v2 and limit=100 for v3 * Update unit tests for larger page sizes * Add a generic retry decorator in module_utils/api.py that accepts an Iterable of delays and a callable to determine if an exception inheriting from Exception should be retried * Use the new decorator to handle Galaxy API rate limiting * Add unit tests for new retry decorator * Preserve the decorated function's metadata with functools.wraps Co-authored-by: Matt Martz Co-authored-by: Sviatoslav Sydorenko --- lib/ansible/galaxy/api.py | 32 +++++++++-- lib/ansible/module_utils/api.py | 50 +++++++++++++++++ test/units/galaxy/test_api.py | 34 ++++++++---- test/units/module_utils/test_api.py | 85 +++++++++++++++++++++++++++-- 4 files changed, 180 insertions(+), 21 deletions(-) diff --git a/lib/ansible/galaxy/api.py b/lib/ansible/galaxy/api.py index 352082ded76..5bd37149093 100644 --- a/lib/ansible/galaxy/api.py +++ b/lib/ansible/galaxy/api.py @@ -19,9 +19,11 @@ import threading from ansible import constants as C from ansible.errors import AnsibleError from ansible.galaxy.user_agent import user_agent +from ansible.module_utils.api import retry_with_delays_and_condition +from ansible.module_utils.api import generate_jittered_backoff from ansible.module_utils.six import string_types from ansible.module_utils.six.moves.urllib.error import HTTPError -from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode, urlparse +from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode, urlparse, parse_qs, urljoin from ansible.module_utils._text import to_bytes, to_native, to_text from ansible.module_utils.urls import open_url, prepare_multipart from ansible.utils.display import Display @@ -36,6 +38,11 @@ except ImportError: display = Display() _CACHE_LOCK = threading.Lock() +COLLECTION_PAGE_SIZE = 100 +RETRY_HTTP_ERROR_CODES = [ # TODO: Allow user-configuration + 429, # Too Many Requests + 520, # Galaxy rate limit error code (Cloudflare unknown error) +] def cache_lock(func): @@ -46,6 +53,13 @@ def cache_lock(func): return wrapped +def is_rate_limit_exception(exception): + # Note: cloud.redhat.com masks rate limit errors with 403 (Forbidden) error codes. + # Since 403 could reflect the actual problem (such as an expired token), we should + # not retry by default. + return isinstance(exception, GalaxyError) and exception.http_code in RETRY_HTTP_ERROR_CODES + + def g_connect(versions): """ Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the @@ -309,10 +323,15 @@ class GalaxyAPI: # Calling g_connect will populate self._available_api_versions return self._available_api_versions + @retry_with_delays_and_condition( + backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40), + should_retry_error=is_rate_limit_exception + ) def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None, cache=False): url_info = urlparse(url) cache_id = get_cache_id(url) + query = parse_qs(url_info.query) if cache and self._cache: server_cache = self._cache.setdefault(cache_id, {}) iso_datetime_format = '%Y-%m-%dT%H:%M:%SZ' @@ -322,7 +341,8 @@ class GalaxyAPI: expires = datetime.datetime.strptime(server_cache[url_info.path]['expires'], iso_datetime_format) valid = datetime.datetime.utcnow() < expires - if valid and not url_info.query: + is_paginated_url = 'page' in query or 'offset' in query + if valid and not is_paginated_url: # Got a hit on the cache and we aren't getting a paginated response path_cache = server_cache[url_info.path] if path_cache.get('paginated'): @@ -342,7 +362,7 @@ class GalaxyAPI: return res - elif not url_info.query: + elif not is_paginated_url: # The cache entry had expired or does not exist, start a new blank entry to be filled later. expires = datetime.datetime.utcnow() expires += datetime.timedelta(days=1) @@ -781,7 +801,8 @@ class GalaxyAPI: api_path = self.available_api_versions['v2'] pagination_path = ['next'] - versions_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions', '/') + page_size_name = 'limit' if 'v3' in self.available_api_versions else 'page_size' + versions_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions', '/?%s=%d' % (page_size_name, COLLECTION_PAGE_SIZE)) versions_url_info = urlparse(versions_url) # We should only rely on the cache if the collection has not changed. This may slow things down but it ensures @@ -838,6 +859,9 @@ class GalaxyAPI: elif relative_link: # TODO: This assumes the pagination result is relative to the root server. Will need to be verified # with someone who knows the AH API. + + # Remove the query string from the versions_url to use the next_link's query + versions_url = urljoin(versions_url, urlparse(versions_url).path) next_link = versions_url.replace(versions_url_info.path, next_link) data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'), diff --git a/lib/ansible/module_utils/api.py b/lib/ansible/module_utils/api.py index 46a036d3747..e780ec6b50f 100644 --- a/lib/ansible/module_utils/api.py +++ b/lib/ansible/module_utils/api.py @@ -26,6 +26,8 @@ The 'api' module provides the following common argument specs: from __future__ import (absolute_import, division, print_function) __metaclass__ = type +import functools +import random import sys import time @@ -114,3 +116,51 @@ def retry(retries=None, retry_pause=1): return retried return wrapper + + +def generate_jittered_backoff(retries=10, delay_base=3, delay_threshold=60): + """The "Full Jitter" backoff strategy. + + Ref: https://www.awsarchitectureblog.com/2015/03/backoff.html + + :param retries: The number of delays to generate. + :param delay_base: The base time in seconds used to calculate the exponential backoff. + :param delay_threshold: The maximum time in seconds for any delay. + """ + for retry in range(0, retries): + yield random.randint(0, min(delay_threshold, delay_base * 2 ** retry)) + + +def retry_never(exception_or_result): + return False + + +def retry_with_delays_and_condition(backoff_iterator, should_retry_error=None): + """Generic retry decorator. + + :param backoff_iterator: An iterable of delays in seconds. + :param should_retry_error: A callable that takes an exception of the decorated function and decides whether to retry or not (returns a bool). + """ + if should_retry_error is None: + should_retry_error = retry_never + + def function_wrapper(function): + @functools.wraps(function) + def run_function(*args, **kwargs): + """This assumes the function has not already been called. + If backoff_iterator is empty, we should still run the function a single time with no delay. + """ + call_retryable_function = functools.partial(function, *args, **kwargs) + + for delay in backoff_iterator: + try: + return call_retryable_function() + except Exception as e: + if not should_retry_error(e): + raise + time.sleep(delay) + + # Only or final attempt + return call_retryable_function() + return run_function + return function_wrapper diff --git a/test/units/galaxy/test_api.py b/test/units/galaxy/test_api.py index ebda706c584..8081c7924f0 100644 --- a/test/units/galaxy/test_api.py +++ b/test/units/galaxy/test_api.py @@ -791,9 +791,10 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m actual = api.get_collection_versions('namespace', 'collection') assert actual == [u'1.0.0', u'1.0.1'] + page_query = '?limit=100' if api_version == 'v3' else '?page_size=100' assert mock_open.call_count == 1 assert mock_open.mock_calls[0][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \ - 'versions/' % api_version + 'versions/%s' % (api_version, page_query) if token_ins: assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type @@ -802,9 +803,9 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m ('v2', None, None, [ { 'count': 6, - 'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2', + 'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2&page_size=100', 'previous': None, - 'results': [ + 'results': [ # Pay no mind, using more manageable results than page_size would indicate { 'version': '1.0.0', 'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.0', @@ -817,7 +818,7 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m }, { 'count': 6, - 'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=3', + 'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=3&page_size=100', 'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions', 'results': [ { @@ -833,7 +834,7 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m { 'count': 6, 'next': None, - 'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2', + 'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2&page_size=100', 'results': [ { 'version': '1.0.4', @@ -850,7 +851,8 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m { 'count': 6, 'links': { - 'next': '/api/v3/collections/namespace/collection/versions/?page=2', + # v3 links are relative and the limit is included during pagination + 'next': '/api/v3/collections/namespace/collection/versions/?limit=100&offset=100', 'previous': None, }, 'data': [ @@ -867,7 +869,7 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m { 'count': 6, 'links': { - 'next': '/api/v3/collections/namespace/collection/versions/?page=3', + 'next': '/api/v3/collections/namespace/collection/versions/?limit=100&offset=200', 'previous': '/api/v3/collections/namespace/collection/versions', }, 'data': [ @@ -885,7 +887,7 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m 'count': 6, 'links': { 'next': None, - 'previous': '/api/v3/collections/namespace/collection/versions/?page=2', + 'previous': '/api/v3/collections/namespace/collection/versions/?limit=100&offset=100', }, 'data': [ { @@ -916,12 +918,22 @@ def test_get_collection_versions_pagination(api_version, token_type, token_ins, assert actual == [u'1.0.0', u'1.0.1', u'1.0.2', u'1.0.3', u'1.0.4', u'1.0.5'] assert mock_open.call_count == 3 + + if api_version == 'v3': + query_1 = 'limit=100' + query_2 = 'limit=100&offset=100' + query_3 = 'limit=100&offset=200' + else: + query_1 = 'page_size=100' + query_2 = 'page=2&page_size=100' + query_3 = 'page=3&page_size=100' + assert mock_open.mock_calls[0][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \ - 'versions/' % api_version + 'versions/?%s' % (api_version, query_1) assert mock_open.mock_calls[1][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \ - 'versions/?page=2' % api_version + 'versions/?%s' % (api_version, query_2) assert mock_open.mock_calls[2][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \ - 'versions/?page=3' % api_version + 'versions/?%s' % (api_version, query_3) if token_type: assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type diff --git a/test/units/module_utils/test_api.py b/test/units/module_utils/test_api.py index 0eaea046598..f7e768a8be5 100644 --- a/test/units/module_utils/test_api.py +++ b/test/units/module_utils/test_api.py @@ -7,11 +7,19 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type -from ansible.module_utils.api import rate_limit, retry +from ansible.module_utils.api import rate_limit, retry, retry_with_delays_and_condition import pytest +class CustomException(Exception): + pass + + +class CustomBaseException(BaseException): + pass + + class TestRateLimit: def test_ratelimit(self): @@ -26,17 +34,16 @@ class TestRateLimit: class TestRetry: def test_no_retry_required(self): - self.counter = 0 - @retry(retries=4, retry_pause=2) def login_database(): - self.counter += 1 + login_database.counter += 1 return 'success' + login_database.counter = 0 r = login_database() assert r == 'success' - assert self.counter == 1 + assert login_database.counter == 1 def test_catch_exception(self): @@ -44,5 +51,71 @@ class TestRetry: def login_database(): return 'success' - with pytest.raises(Exception): + with pytest.raises(Exception, match="Retry"): login_database() + + def test_no_retries(self): + + @retry() + def login_database(): + assert False, "Should not execute" + + login_database() + + +class TestRetryWithDelaysAndCondition: + + def test_empty_retry_iterator(self): + @retry_with_delays_and_condition(backoff_iterator=[]) + def login_database(): + login_database.counter += 1 + + login_database.counter = 0 + r = login_database() + assert login_database.counter == 1 + + def test_no_retry_exception(self): + @retry_with_delays_and_condition( + backoff_iterator=[1], + should_retry_error=lambda x: False, + ) + def login_database(): + login_database.counter += 1 + if login_database.counter == 1: + raise CustomException("Error") + + login_database.counter = 0 + with pytest.raises(CustomException, match="Error"): + login_database() + assert login_database.counter == 1 + + def test_no_retry_baseexception(self): + @retry_with_delays_and_condition( + backoff_iterator=[1], + should_retry_error=lambda x: True, # Retry all exceptions inheriting from Exception + ) + def login_database(): + login_database.counter += 1 + if login_database.counter == 1: + # Raise an exception inheriting from BaseException + raise CustomBaseException("Error") + + login_database.counter = 0 + with pytest.raises(CustomBaseException, match="Error"): + login_database() + assert login_database.counter == 1 + + def test_retry_exception(self): + @retry_with_delays_and_condition( + backoff_iterator=[1], + should_retry_error=lambda x: isinstance(x, CustomException), + ) + def login_database(): + login_database.counter += 1 + if login_database.counter == 1: + raise CustomException("Retry") + return 'success' + + login_database.counter = 0 + assert login_database() == 'success' + assert login_database.counter == 2