ansible-galaxy - increase page size and add retry decorator for throttling (#74240)

* Get available collection versions with page_size=100 for v2 and limit=100 for v3

* Update unit tests for larger page sizes

* Add a generic retry decorator in module_utils/api.py that accepts an Iterable of delays and a callable to determine if an exception inheriting from Exception should be retried

* Use the new decorator to handle Galaxy API rate limiting

* Add unit tests for new retry decorator

* Preserve the decorated function's metadata with functools.wraps

Co-authored-by: Matt Martz <matt@sivel.net>
Co-authored-by: Sviatoslav Sydorenko <wk.cvs.github@sydorenko.org.ua>
pull/74613/head
Sloane Hertel 4 years ago committed by GitHub
parent 51fd05e76b
commit ee725846f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,9 +19,11 @@ import threading
from ansible import constants as C from ansible import constants as C
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
from ansible.galaxy.user_agent import user_agent from ansible.galaxy.user_agent import user_agent
from ansible.module_utils.api import retry_with_delays_and_condition
from ansible.module_utils.api import generate_jittered_backoff
from ansible.module_utils.six import string_types from ansible.module_utils.six import string_types
from ansible.module_utils.six.moves.urllib.error import HTTPError from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode, urlparse from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode, urlparse, parse_qs, urljoin
from ansible.module_utils._text import to_bytes, to_native, to_text from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.urls import open_url, prepare_multipart from ansible.module_utils.urls import open_url, prepare_multipart
from ansible.utils.display import Display from ansible.utils.display import Display
@ -36,6 +38,11 @@ except ImportError:
display = Display() display = Display()
_CACHE_LOCK = threading.Lock() _CACHE_LOCK = threading.Lock()
COLLECTION_PAGE_SIZE = 100
RETRY_HTTP_ERROR_CODES = [ # TODO: Allow user-configuration
429, # Too Many Requests
520, # Galaxy rate limit error code (Cloudflare unknown error)
]
def cache_lock(func): def cache_lock(func):
@ -46,6 +53,13 @@ def cache_lock(func):
return wrapped return wrapped
def is_rate_limit_exception(exception):
# Note: cloud.redhat.com masks rate limit errors with 403 (Forbidden) error codes.
# Since 403 could reflect the actual problem (such as an expired token), we should
# not retry by default.
return isinstance(exception, GalaxyError) and exception.http_code in RETRY_HTTP_ERROR_CODES
def g_connect(versions): def g_connect(versions):
""" """
Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the
@ -309,10 +323,15 @@ class GalaxyAPI:
# Calling g_connect will populate self._available_api_versions # Calling g_connect will populate self._available_api_versions
return self._available_api_versions return self._available_api_versions
@retry_with_delays_and_condition(
backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40),
should_retry_error=is_rate_limit_exception
)
def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None, def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None,
cache=False): cache=False):
url_info = urlparse(url) url_info = urlparse(url)
cache_id = get_cache_id(url) cache_id = get_cache_id(url)
query = parse_qs(url_info.query)
if cache and self._cache: if cache and self._cache:
server_cache = self._cache.setdefault(cache_id, {}) server_cache = self._cache.setdefault(cache_id, {})
iso_datetime_format = '%Y-%m-%dT%H:%M:%SZ' iso_datetime_format = '%Y-%m-%dT%H:%M:%SZ'
@ -322,7 +341,8 @@ class GalaxyAPI:
expires = datetime.datetime.strptime(server_cache[url_info.path]['expires'], iso_datetime_format) expires = datetime.datetime.strptime(server_cache[url_info.path]['expires'], iso_datetime_format)
valid = datetime.datetime.utcnow() < expires valid = datetime.datetime.utcnow() < expires
if valid and not url_info.query: is_paginated_url = 'page' in query or 'offset' in query
if valid and not is_paginated_url:
# Got a hit on the cache and we aren't getting a paginated response # Got a hit on the cache and we aren't getting a paginated response
path_cache = server_cache[url_info.path] path_cache = server_cache[url_info.path]
if path_cache.get('paginated'): if path_cache.get('paginated'):
@ -342,7 +362,7 @@ class GalaxyAPI:
return res return res
elif not url_info.query: elif not is_paginated_url:
# The cache entry had expired or does not exist, start a new blank entry to be filled later. # The cache entry had expired or does not exist, start a new blank entry to be filled later.
expires = datetime.datetime.utcnow() expires = datetime.datetime.utcnow()
expires += datetime.timedelta(days=1) expires += datetime.timedelta(days=1)
@ -781,7 +801,8 @@ class GalaxyAPI:
api_path = self.available_api_versions['v2'] api_path = self.available_api_versions['v2']
pagination_path = ['next'] pagination_path = ['next']
versions_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions', '/') page_size_name = 'limit' if 'v3' in self.available_api_versions else 'page_size'
versions_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions', '/?%s=%d' % (page_size_name, COLLECTION_PAGE_SIZE))
versions_url_info = urlparse(versions_url) versions_url_info = urlparse(versions_url)
# We should only rely on the cache if the collection has not changed. This may slow things down but it ensures # We should only rely on the cache if the collection has not changed. This may slow things down but it ensures
@ -838,6 +859,9 @@ class GalaxyAPI:
elif relative_link: elif relative_link:
# TODO: This assumes the pagination result is relative to the root server. Will need to be verified # TODO: This assumes the pagination result is relative to the root server. Will need to be verified
# with someone who knows the AH API. # with someone who knows the AH API.
# Remove the query string from the versions_url to use the next_link's query
versions_url = urljoin(versions_url, urlparse(versions_url).path)
next_link = versions_url.replace(versions_url_info.path, next_link) next_link = versions_url.replace(versions_url_info.path, next_link)
data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'), data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'),

@ -26,6 +26,8 @@ The 'api' module provides the following common argument specs:
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import functools
import random
import sys import sys
import time import time
@ -114,3 +116,51 @@ def retry(retries=None, retry_pause=1):
return retried return retried
return wrapper return wrapper
def generate_jittered_backoff(retries=10, delay_base=3, delay_threshold=60):
"""The "Full Jitter" backoff strategy.
Ref: https://www.awsarchitectureblog.com/2015/03/backoff.html
:param retries: The number of delays to generate.
:param delay_base: The base time in seconds used to calculate the exponential backoff.
:param delay_threshold: The maximum time in seconds for any delay.
"""
for retry in range(0, retries):
yield random.randint(0, min(delay_threshold, delay_base * 2 ** retry))
def retry_never(exception_or_result):
return False
def retry_with_delays_and_condition(backoff_iterator, should_retry_error=None):
"""Generic retry decorator.
:param backoff_iterator: An iterable of delays in seconds.
:param should_retry_error: A callable that takes an exception of the decorated function and decides whether to retry or not (returns a bool).
"""
if should_retry_error is None:
should_retry_error = retry_never
def function_wrapper(function):
@functools.wraps(function)
def run_function(*args, **kwargs):
"""This assumes the function has not already been called.
If backoff_iterator is empty, we should still run the function a single time with no delay.
"""
call_retryable_function = functools.partial(function, *args, **kwargs)
for delay in backoff_iterator:
try:
return call_retryable_function()
except Exception as e:
if not should_retry_error(e):
raise
time.sleep(delay)
# Only or final attempt
return call_retryable_function()
return run_function
return function_wrapper

@ -791,9 +791,10 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m
actual = api.get_collection_versions('namespace', 'collection') actual = api.get_collection_versions('namespace', 'collection')
assert actual == [u'1.0.0', u'1.0.1'] assert actual == [u'1.0.0', u'1.0.1']
page_query = '?limit=100' if api_version == 'v3' else '?page_size=100'
assert mock_open.call_count == 1 assert mock_open.call_count == 1
assert mock_open.mock_calls[0][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \ assert mock_open.mock_calls[0][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \
'versions/' % api_version 'versions/%s' % (api_version, page_query)
if token_ins: if token_ins:
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
@ -802,9 +803,9 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m
('v2', None, None, [ ('v2', None, None, [
{ {
'count': 6, 'count': 6,
'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2', 'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2&page_size=100',
'previous': None, 'previous': None,
'results': [ 'results': [ # Pay no mind, using more manageable results than page_size would indicate
{ {
'version': '1.0.0', 'version': '1.0.0',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.0', 'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.0',
@ -817,7 +818,7 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m
}, },
{ {
'count': 6, 'count': 6,
'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=3', 'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=3&page_size=100',
'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions', 'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions',
'results': [ 'results': [
{ {
@ -833,7 +834,7 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m
{ {
'count': 6, 'count': 6,
'next': None, 'next': None,
'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2', 'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2&page_size=100',
'results': [ 'results': [
{ {
'version': '1.0.4', 'version': '1.0.4',
@ -850,7 +851,8 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m
{ {
'count': 6, 'count': 6,
'links': { 'links': {
'next': '/api/v3/collections/namespace/collection/versions/?page=2', # v3 links are relative and the limit is included during pagination
'next': '/api/v3/collections/namespace/collection/versions/?limit=100&offset=100',
'previous': None, 'previous': None,
}, },
'data': [ 'data': [
@ -867,7 +869,7 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m
{ {
'count': 6, 'count': 6,
'links': { 'links': {
'next': '/api/v3/collections/namespace/collection/versions/?page=3', 'next': '/api/v3/collections/namespace/collection/versions/?limit=100&offset=200',
'previous': '/api/v3/collections/namespace/collection/versions', 'previous': '/api/v3/collections/namespace/collection/versions',
}, },
'data': [ 'data': [
@ -885,7 +887,7 @@ def test_get_collection_versions(api_version, token_type, token_ins, response, m
'count': 6, 'count': 6,
'links': { 'links': {
'next': None, 'next': None,
'previous': '/api/v3/collections/namespace/collection/versions/?page=2', 'previous': '/api/v3/collections/namespace/collection/versions/?limit=100&offset=100',
}, },
'data': [ 'data': [
{ {
@ -916,12 +918,22 @@ def test_get_collection_versions_pagination(api_version, token_type, token_ins,
assert actual == [u'1.0.0', u'1.0.1', u'1.0.2', u'1.0.3', u'1.0.4', u'1.0.5'] assert actual == [u'1.0.0', u'1.0.1', u'1.0.2', u'1.0.3', u'1.0.4', u'1.0.5']
assert mock_open.call_count == 3 assert mock_open.call_count == 3
if api_version == 'v3':
query_1 = 'limit=100'
query_2 = 'limit=100&offset=100'
query_3 = 'limit=100&offset=200'
else:
query_1 = 'page_size=100'
query_2 = 'page=2&page_size=100'
query_3 = 'page=3&page_size=100'
assert mock_open.mock_calls[0][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \ assert mock_open.mock_calls[0][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \
'versions/' % api_version 'versions/?%s' % (api_version, query_1)
assert mock_open.mock_calls[1][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \ assert mock_open.mock_calls[1][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \
'versions/?page=2' % api_version 'versions/?%s' % (api_version, query_2)
assert mock_open.mock_calls[2][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \ assert mock_open.mock_calls[2][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \
'versions/?page=3' % api_version 'versions/?%s' % (api_version, query_3)
if token_type: if token_type:
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type

@ -7,11 +7,19 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
from ansible.module_utils.api import rate_limit, retry from ansible.module_utils.api import rate_limit, retry, retry_with_delays_and_condition
import pytest import pytest
class CustomException(Exception):
pass
class CustomBaseException(BaseException):
pass
class TestRateLimit: class TestRateLimit:
def test_ratelimit(self): def test_ratelimit(self):
@ -26,17 +34,16 @@ class TestRateLimit:
class TestRetry: class TestRetry:
def test_no_retry_required(self): def test_no_retry_required(self):
self.counter = 0
@retry(retries=4, retry_pause=2) @retry(retries=4, retry_pause=2)
def login_database(): def login_database():
self.counter += 1 login_database.counter += 1
return 'success' return 'success'
login_database.counter = 0
r = login_database() r = login_database()
assert r == 'success' assert r == 'success'
assert self.counter == 1 assert login_database.counter == 1
def test_catch_exception(self): def test_catch_exception(self):
@ -44,5 +51,71 @@ class TestRetry:
def login_database(): def login_database():
return 'success' return 'success'
with pytest.raises(Exception): with pytest.raises(Exception, match="Retry"):
login_database()
def test_no_retries(self):
@retry()
def login_database():
assert False, "Should not execute"
login_database()
class TestRetryWithDelaysAndCondition:
def test_empty_retry_iterator(self):
@retry_with_delays_and_condition(backoff_iterator=[])
def login_database():
login_database.counter += 1
login_database.counter = 0
r = login_database()
assert login_database.counter == 1
def test_no_retry_exception(self):
@retry_with_delays_and_condition(
backoff_iterator=[1],
should_retry_error=lambda x: False,
)
def login_database():
login_database.counter += 1
if login_database.counter == 1:
raise CustomException("Error")
login_database.counter = 0
with pytest.raises(CustomException, match="Error"):
login_database()
assert login_database.counter == 1
def test_no_retry_baseexception(self):
@retry_with_delays_and_condition(
backoff_iterator=[1],
should_retry_error=lambda x: True, # Retry all exceptions inheriting from Exception
)
def login_database():
login_database.counter += 1
if login_database.counter == 1:
# Raise an exception inheriting from BaseException
raise CustomBaseException("Error")
login_database.counter = 0
with pytest.raises(CustomBaseException, match="Error"):
login_database() login_database()
assert login_database.counter == 1
def test_retry_exception(self):
@retry_with_delays_and_condition(
backoff_iterator=[1],
should_retry_error=lambda x: isinstance(x, CustomException),
)
def login_database():
login_database.counter += 1
if login_database.counter == 1:
raise CustomException("Retry")
return 'success'
login_database.counter = 0
assert login_database() == 'success'
assert login_database.counter == 2

Loading…
Cancel
Save