Refactor galaxy collection API for v3 support (#61510)

* Refactor galaxy collection API for v3 support

* Added unit tests for GalaxyAPI and starting to fix other failures

* finalise tests

* more unit test fixes
pull/61585/head
Jordan Borean 5 years ago committed by GitHub
parent 82c1becd24
commit a7fd6e99d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,29 +1,16 @@
########################################################################
#
# (C) 2013, James Cammarata <jcammarata@ansible.com> # (C) 2013, James Cammarata <jcammarata@ansible.com>
# # Copyright: (c) 2019, Ansible Project
# This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import base64 import base64
import json import json
import os
import tarfile
import uuid
import time
from ansible import context from ansible import context
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
@ -33,125 +20,210 @@ from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlen
from ansible.module_utils._text import to_bytes, to_native, to_text from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.urls import open_url from ansible.module_utils.urls import open_url
from ansible.utils.display import Display from ansible.utils.display import Display
from ansible.utils.hashing import secure_hash_s
display = Display() display = Display()
def g_connect(method): def g_connect(versions):
''' wrapper to lazily initialize connection info to galaxy ''' """
def wrapped(self, *args, **kwargs): Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the
if not self.initialized: endpoint.
display.vvvv("Initial connection to galaxy_server: %s" % self.api_server)
server_version = self._get_server_api_version() :param versions: A list of API versions that the function supports.
"""
def decorator(method):
def wrapped(self, *args, **kwargs):
if not self._available_api_versions:
display.vvvv("Initial connection to galaxy_server: %s" % self.api_server)
# Determine the type of Galaxy server we are talking to. First try it unauthenticated then with Bearer
# auth for Automation Hub.
n_url = _urljoin(self.api_server, 'api')
error_context_msg = 'Error when finding available api versions from %s (%s)' % (self.name, n_url)
try:
data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg)
except GalaxyError as e:
if e.http_code != 401:
raise
# Assume this is v3 (Automation Hub) and auth is required
headers = {}
self._add_auth_token(headers, n_url, token_type='Bearer', required=True)
data = self._call_galaxy(n_url, headers=headers, method='GET', error_context_msg=error_context_msg)
# Default to only supporting v1, if only v1 is returned we also assume that v2 is available even though
# it isn't returned in the available_versions dict.
available_versions = data.get('available_versions', {u'v1': u'/api/v1'})
if list(available_versions.keys()) == [u'v1']:
available_versions[u'v2'] = u'/api/v2'
self._available_api_versions = available_versions
display.vvvv("Found API version '%s' with Galaxy server %s (%s)"
% (', '.join(available_versions.keys()), self.name, self.api_server))
# Verify that the API versions the function works with are available on the server specified.
available_versions = set(self._available_api_versions.keys())
common_versions = set(versions).intersection(available_versions)
if not common_versions:
raise AnsibleError("Galaxy action %s requires API versions '%s' but only '%s' are available on %s %s"
% (method.__name__, ", ".join(versions), ", ".join(available_versions),
self.name, self.api_server))
return method(self, *args, **kwargs)
return wrapped
return decorator
if server_version not in self.SUPPORTED_VERSIONS:
raise AnsibleError("Unsupported Galaxy server API version: %s" % server_version)
self.baseurl = _urljoin(self.api_server, "api", server_version) def _urljoin(*args):
return '/'.join(to_native(a, errors='surrogate_or_strict').strip('/') for a in args + ('',) if a)
self.version = server_version # for future use
display.vvvv("Base API: %s" % self.baseurl) class GalaxyError(AnsibleError):
self.initialized = True """ Error for bad Galaxy server responses. """
return method(self, *args, **kwargs)
return wrapped
def __init__(self, http_error, message):
super(GalaxyError, self).__init__(message)
self.http_code = http_error.code
self.url = http_error.geturl()
def _urljoin(*args): try:
return '/'.join(to_native(a, errors='surrogate_or_strict').rstrip('/') for a in args + ('',)) http_msg = to_text(http_error.read())
err_info = json.loads(http_msg)
except (AttributeError, ValueError):
err_info = {}
url_split = self.url.split('/')
if 'v2' in url_split:
galaxy_msg = err_info.get('message', 'Unknown error returned by Galaxy server.')
code = err_info.get('code', 'Unknown')
full_error_msg = u"%s (HTTP Code: %d, Message: %s Code: %s)" % (message, self.http_code, galaxy_msg, code)
elif 'v3' in url_split:
errors = err_info.get('errors', [])
if not errors:
errors = [{}] # Defaults are set below, we just need to make sure 1 error is present.
message_lines = []
for error in errors:
error_msg = error.get('detail') or error.get('title') or 'Unknown error returned by Galaxy server.'
error_code = error.get('code') or 'Unknown'
message_line = u"(HTTP Code: %d, Message: %s Code: %s)" % (self.http_code, error_msg, error_code)
message_lines.append(message_line)
full_error_msg = "%s %s" % (message, ', '.join(message_lines))
else:
# v1 and unknown API endpoints
galaxy_msg = err_info.get('default', 'Unknown error returned by Galaxy server.')
full_error_msg = u"%s (HTTP Code: %d, Message: %s)" % (message, self.http_code, galaxy_msg)
self.message = to_native(full_error_msg)
class GalaxyAPI(object): class CollectionVersionMetadata:
''' This class is meant to be used as a API client for an Ansible Galaxy server '''
def __init__(self, namespace, name, version, download_url, artifact_sha256, dependencies):
"""
Contains common information about a collection on a Galaxy server to smooth through API differences for
Collection and define a standard meta info for a collection.
:param namespace: The namespace name.
:param name: The collection name.
:param version: The version that the metadata refers to.
:param download_url: The URL to download the collection.
:param artifact_sha256: The SHA256 of the collection artifact for later verification.
:param dependencies: A dict of dependencies of the collection.
"""
self.namespace = namespace
self.name = name
self.version = version
self.download_url = download_url
self.artifact_sha256 = artifact_sha256
self.dependencies = dependencies
SUPPORTED_VERSIONS = ['v1']
def __init__(self, galaxy, name, url, username=None, password=None, token=None, token_type=None): class GalaxyAPI:
""" This class is meant to be used as a API client for an Ansible Galaxy server """
def __init__(self, galaxy, name, url, username=None, password=None, token=None):
self.galaxy = galaxy self.galaxy = galaxy
self.name = name self.name = name
self.username = username self.username = username
self.password = password self.password = password
self.token = token self.token = token
self.token_type = token_type or 'Token'
self.api_server = url self.api_server = url
self.validate_certs = not context.CLIARGS['ignore_certs'] self.validate_certs = not context.CLIARGS['ignore_certs']
self.baseurl = None self._available_api_versions = {}
self.version = None
self.initialized = False
self.available_api_versions = {}
display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs)) display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs))
def _auth_header(self, required=True, token_type=None): @property
'''Generate the Authorization header. @g_connect(['v1', 'v2', 'v3'])
def available_api_versions(self):
# Calling g_connect will populate self._available_api_versions
return self._available_api_versions
def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None):
headers = headers or {}
self._add_auth_token(headers, url, required=auth_required)
try:
display.vvvv("Calling Galaxy at %s" % url)
resp = open_url(to_native(url), data=args, validate_certs=self.validate_certs, headers=headers,
method=method, timeout=20, unredirected_headers=['Authorization'])
except HTTPError as e:
raise GalaxyError(e, error_context_msg)
except Exception as e:
raise AnsibleError("Unknown error when attempting to call Galaxy at '%s': %s" % (url, to_native(e)))
resp_data = to_text(resp.read(), errors='surrogate_or_strict')
try:
data = json.loads(resp_data)
except ValueError:
raise AnsibleError("Failed to parse Galaxy response from '%s' as JSON:\n%s"
% (resp.url, to_native(resp_data)))
return data
def _add_auth_token(self, headers, url, token_type=None, required=False):
# Don't add the auth token if one is already present
if 'Authorization' in headers:
return
Valid token_type values are 'Token' (galaxy v2) and 'Bearer' (galaxy v3)'''
token = self.token.get() if self.token else None token = self.token.get() if self.token else None
# 'Token' for v2 api, 'Bearer' for v3 # 'Token' for v2 api, 'Bearer' for v3 but still allow someone to override the token if necessary.
token_type = token_type or self.token_type is_v3 = 'v3' in url.split('/')
token_type = token_type or ('Bearer' if is_v3 else 'Token')
if token: if token:
return {'Authorization': "%s %s" % (token_type, token)} headers['Authorization'] = '%s %s' % (token_type, token)
elif self.username: elif self.username:
token = "%s:%s" % (to_text(self.username, errors='surrogate_or_strict'), token = "%s:%s" % (to_text(self.username, errors='surrogate_or_strict'),
to_text(self.password, errors='surrogate_or_strict', nonstring='passthru') or '') to_text(self.password, errors='surrogate_or_strict', nonstring='passthru') or '')
b64_val = base64.b64encode(to_bytes(token, encoding='utf-8', errors='surrogate_or_strict')) b64_val = base64.b64encode(to_bytes(token, encoding='utf-8', errors='surrogate_or_strict'))
return {'Authorization': "Basic %s" % to_text(b64_val)} headers['Authorization'] = 'Basic %s' % to_text(b64_val)
elif required: elif required:
raise AnsibleError("No access token or username set. A token can be set with --api-key, with " raise AnsibleError("No access token or username set. A token can be set with --api-key, with "
"'ansible-galaxy login', or set in ansible.cfg.") "'ansible-galaxy login', or set in ansible.cfg.")
else:
return {}
@g_connect
def __call_galaxy(self, url, args=None, headers=None, method=None):
if args and not headers:
headers = self._auth_header()
try:
display.vvv(url)
resp = open_url(url, data=args, validate_certs=self.validate_certs, headers=headers, method=method,
timeout=20)
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
except HTTPError as e:
res = json.loads(to_text(e.fp.read(), errors='surrogate_or_strict'))
raise AnsibleError(res['detail'])
return data
def _get_server_api_version(self):
"""
Fetches the Galaxy API current version to ensure
the API server is up and reachable.
"""
url = _urljoin(self.api_server, "api")
try:
return_data = open_url(url, validate_certs=self.validate_certs)
except Exception as e:
raise AnsibleError("Failed to get data from the API server (%s): %s " % (url, to_native(e)))
try:
data = json.loads(to_text(return_data.read(), errors='surrogate_or_strict'))
except Exception as e:
raise AnsibleError("Could not process data from the API server (%s): %s " % (url, to_native(e)))
return data['current_version'] @g_connect(['v1'])
@g_connect
def authenticate(self, github_token): def authenticate(self, github_token):
""" """
Retrieve an authentication token Retrieve an authentication token
""" """
url = _urljoin(self.baseurl, "tokens") url = _urljoin(self.api_server, self.available_api_versions['v1'], "tokens") + '/'
args = urlencode({"github_token": github_token}) args = urlencode({"github_token": github_token})
resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST") resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST")
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict')) data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
return data return data
@g_connect @g_connect(['v1'])
def create_import_task(self, github_user, github_repo, reference=None, role_name=None): def create_import_task(self, github_user, github_repo, reference=None, role_name=None):
""" """
Post an import request Post an import request
""" """
url = _urljoin(self.baseurl, "imports") url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports") + '/'
args = { args = {
"github_user": github_user, "github_user": github_user,
"github_repo": github_repo, "github_repo": github_repo,
@ -161,17 +233,17 @@ class GalaxyAPI(object):
args['alternate_role_name'] = role_name args['alternate_role_name'] = role_name
elif github_repo.startswith('ansible-role'): elif github_repo.startswith('ansible-role'):
args['alternate_role_name'] = github_repo[len('ansible-role') + 1:] args['alternate_role_name'] = github_repo[len('ansible-role') + 1:]
data = self.__call_galaxy(url, args=urlencode(args), method="POST") data = self._call_galaxy(url, args=urlencode(args), method="POST")
if data.get('results', None): if data.get('results', None):
return data['results'] return data['results']
return data return data
@g_connect @g_connect(['v1'])
def get_import_task(self, task_id=None, github_user=None, github_repo=None): def get_import_task(self, task_id=None, github_user=None, github_repo=None):
""" """
Check the status of an import task. Check the status of an import task.
""" """
url = _urljoin(self.baseurl, "imports") url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports")
if task_id is not None: if task_id is not None:
url = "%s?id=%d" % (url, task_id) url = "%s?id=%d" % (url, task_id)
elif github_user is not None and github_repo is not None: elif github_user is not None and github_repo is not None:
@ -179,10 +251,10 @@ class GalaxyAPI(object):
else: else:
raise AnsibleError("Expected task_id or github_user and github_repo") raise AnsibleError("Expected task_id or github_user and github_repo")
data = self.__call_galaxy(url) data = self._call_galaxy(url)
return data['results'] return data['results']
@g_connect @g_connect(['v1'])
def lookup_role_by_name(self, role_name, notify=True): def lookup_role_by_name(self, role_name, notify=True):
""" """
Find a role by name. Find a role by name.
@ -198,13 +270,14 @@ class GalaxyAPI(object):
except Exception: except Exception:
raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name) raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)
url = _urljoin(self.baseurl, "roles", "?owner__username=%s&name=%s" % (user_name, role_name))[:-1] url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles",
data = self.__call_galaxy(url) "?owner__username=%s&name=%s" % (user_name, role_name))[:-1]
data = self._call_galaxy(url)
if len(data["results"]) != 0: if len(data["results"]) != 0:
return data["results"][0] return data["results"][0]
return None return None
@g_connect @g_connect(['v1'])
def fetch_role_related(self, related, role_id): def fetch_role_related(self, related, role_id):
""" """
Fetch the list of related items for the given role. Fetch the list of related items for the given role.
@ -213,27 +286,29 @@ class GalaxyAPI(object):
results = [] results = []
try: try:
url = _urljoin(self.baseurl, "roles", role_id, related, "?page_size=50")[:-1] url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles", role_id, related,
data = self.__call_galaxy(url) "?page_size=50")[:-1]
data = self._call_galaxy(url)
results = data['results'] results = data['results']
done = (data.get('next_link', None) is None) done = (data.get('next_link', None) is None)
while not done: while not done:
url = _urljoin(self.api_server, data['next_link']) url = _urljoin(self.api_server, data['next_link'])
data = self.__call_galaxy(url) data = self._call_galaxy(url)
results += data['results'] results += data['results']
done = (data.get('next_link', None) is None) done = (data.get('next_link', None) is None)
except Exception as e: except Exception as e:
display.vvvv("Unable to retrive role (id=%s) data (%s), but this is not fatal so we continue: %s" % (role_id, related, to_text(e))) display.vvvv("Unable to retrive role (id=%s) data (%s), but this is not fatal so we continue: %s"
% (role_id, related, to_text(e)))
return results return results
@g_connect @g_connect(['v1'])
def get_list(self, what): def get_list(self, what):
""" """
Fetch the list of items specified. Fetch the list of items specified.
""" """
try: try:
url = _urljoin(self.baseurl, what, "?page_size")[:-1] url = _urljoin(self.api_server, self.available_api_versions['v1'], what, "?page_size")[:-1]
data = self.__call_galaxy(url) data = self._call_galaxy(url)
if "results" in data: if "results" in data:
results = data['results'] results = data['results']
else: else:
@ -243,17 +318,17 @@ class GalaxyAPI(object):
done = (data.get('next_link', None) is None) done = (data.get('next_link', None) is None)
while not done: while not done:
url = _urljoin(self.api_server, data['next_link']) url = _urljoin(self.api_server, data['next_link'])
data = self.__call_galaxy(url) data = self._call_galaxy(url)
results += data['results'] results += data['results']
done = (data.get('next_link', None) is None) done = (data.get('next_link', None) is None)
return results return results
except Exception as error: except Exception as error:
raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error))) raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error)))
@g_connect @g_connect(['v1'])
def search_roles(self, search, **kwargs): def search_roles(self, search, **kwargs):
search_url = _urljoin(self.baseurl, "search", "roles", "?")[:-1] search_url = _urljoin(self.api_server, self.available_api_versions['v1'], "search", "roles", "?")[:-1]
if search: if search:
search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search))) search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search)))
@ -277,35 +352,202 @@ class GalaxyAPI(object):
if author: if author:
search_url += '&username_autocomplete=%s' % author search_url += '&username_autocomplete=%s' % author
data = self.__call_galaxy(search_url) data = self._call_galaxy(search_url)
return data return data
@g_connect @g_connect(['v1'])
def add_secret(self, source, github_user, github_repo, secret): def add_secret(self, source, github_user, github_repo, secret):
url = _urljoin(self.baseurl, "notification_secrets") url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets") + '/'
args = urlencode({ args = urlencode({
"source": source, "source": source,
"github_user": github_user, "github_user": github_user,
"github_repo": github_repo, "github_repo": github_repo,
"secret": secret "secret": secret
}) })
data = self.__call_galaxy(url, args=args, method="POST") data = self._call_galaxy(url, args=args, method="POST")
return data return data
@g_connect @g_connect(['v1'])
def list_secrets(self): def list_secrets(self):
url = _urljoin(self.baseurl, "notification_secrets") url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets")
data = self.__call_galaxy(url, headers=self._auth_header()) data = self._call_galaxy(url, auth_required=True)
return data return data
@g_connect @g_connect(['v1'])
def remove_secret(self, secret_id): def remove_secret(self, secret_id):
url = _urljoin(self.baseurl, "notification_secrets", secret_id) url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets", secret_id) + '/'
data = self.__call_galaxy(url, headers=self._auth_header(), method='DELETE') data = self._call_galaxy(url, auth_required=True, method='DELETE')
return data return data
@g_connect @g_connect(['v1'])
def delete_role(self, github_user, github_repo): def delete_role(self, github_user, github_repo):
url = _urljoin(self.baseurl, "removerole", "?github_user=%s&github_repo=%s" % (github_user, github_repo))[:-1] url = _urljoin(self.api_server, self.available_api_versions['v1'], "removerole",
data = self.__call_galaxy(url, headers=self._auth_header(), method='DELETE') "?github_user=%s&github_repo=%s" % (github_user, github_repo))[:-1]
data = self._call_galaxy(url, auth_required=True, method='DELETE')
return data return data
# Collection APIs #
@g_connect(['v2', 'v3'])
def publish_collection(self, collection_path):
"""
Publishes a collection to a Galaxy server and returns the import task URI.
:param collection_path: The path to the collection tarball to publish.
:return: The import task URI that contains the import results.
"""
display.display("Publishing collection artifact '%s' to %s %s" % (collection_path, self.name, self.api_server))
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
if not os.path.exists(b_collection_path):
raise AnsibleError("The collection path specified '%s' does not exist." % to_native(collection_path))
elif not tarfile.is_tarfile(b_collection_path):
raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection "
"build' to create a proper release artifact." % to_native(collection_path))
with open(b_collection_path, 'rb') as collection_tar:
data = collection_tar.read()
boundary = '--------------------------%s' % uuid.uuid4().hex
b_file_name = os.path.basename(b_collection_path)
part_boundary = b"--" + to_bytes(boundary, errors='surrogate_or_strict')
form = [
part_boundary,
b"Content-Disposition: form-data; name=\"sha256\"",
to_bytes(secure_hash_s(data), errors='surrogate_or_strict'),
part_boundary,
b"Content-Disposition: file; name=\"file\"; filename=\"%s\"" % b_file_name,
b"Content-Type: application/octet-stream",
b"",
data,
b"%s--" % part_boundary,
]
data = b"\r\n".join(form)
headers = {
'Content-type': 'multipart/form-data; boundary=%s' % boundary,
'Content-length': len(data),
}
if 'v3' in self.available_api_versions:
n_url = _urljoin(self.api_server, self.available_api_versions['v3'], 'artifacts', 'collections') + '/'
else:
n_url = _urljoin(self.api_server, self.available_api_versions['v2'], 'collections') + '/'
resp = self._call_galaxy(n_url, args=data, headers=headers, method='POST', auth_required=True,
error_context_msg='Error when publishing collection to %s (%s)'
% (self.name, self.api_server))
return resp['task']
@g_connect(['v2', 'v3'])
def wait_import_task(self, task_url, timeout=0):
"""
Waits until the import process on the Galaxy server has completed or the timeout is reached.
:param task_url: The full URI of the import task to wait for, this is returned by publish_collection.
:param timeout: The timeout in seconds, 0 is no timeout.
"""
# TODO: actually verify that v3 returns the same structure as v2, right now this is just an assumption.
state = 'waiting'
data = None
display.display("Waiting until Galaxy import task %s has completed" % task_url)
start = time.time()
wait = 2
while timeout == 0 or (time.time() - start) < timeout:
data = self._call_galaxy(task_url, method='GET', auth_required=True,
error_context_msg='Error when getting import task results at %s' % task_url)
state = data.get('state', 'waiting')
if data.get('finished_at', None):
break
display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again'
% (state, wait))
time.sleep(wait)
# poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds.
wait = min(30, wait * 1.5)
if state == 'waiting':
raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'"
% to_native(task_url))
for message in data.get('messages', []):
level = message['level']
if level == 'error':
display.error("Galaxy import error message: %s" % message['message'])
elif level == 'warning':
display.warning("Galaxy import warning message: %s" % message['message'])
else:
display.vvv("Galaxy import message: %s - %s" % (level, message['message']))
if state == 'failed':
code = to_native(data['error'].get('code', 'UNKNOWN'))
description = to_native(
data['error'].get('description', "Unknown error, see %s for more details" % task_url))
raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code))
@g_connect(['v2', 'v3'])
def get_collection_version_metadata(self, namespace, name, version):
"""
Gets the collection information from the Galaxy server about a specific Collection version.
:param namespace: The collection namespace.
:param name: The collection name.
:param version: Optional version of the collection to get the information for.
:return: CollectionVersionMetadata about the collection at the version requested.
"""
api_path = self.available_api_versions.get('v3', self.available_api_versions.get('v2'))
url_paths = [self.api_server, api_path, 'collections', namespace, name, 'versions', version]
n_collection_url = _urljoin(*url_paths)
error_context_msg = 'Error when getting collection version metadata for %s.%s:%s from %s (%s)' \
% (namespace, name, version, self.name, self.api_server)
data = self._call_galaxy(n_collection_url, error_context_msg=error_context_msg)
return CollectionVersionMetadata(data['namespace']['name'], data['collection']['name'], data['version'],
data['download_url'], data['artifact']['sha256'],
data['metadata']['dependencies'])
@g_connect(['v2', 'v3'])
def get_collection_versions(self, namespace, name):
"""
Gets a list of available versions for a collection on a Galaxy server.
:param namespace: The collection namespace.
:param name: The collection name.
:return: A list of versions that are available.
"""
if 'v3' in self.available_api_versions:
api_path = self.available_api_versions['v3']
results_key = 'data'
pagination_path = ['links', 'next']
else:
api_path = self.available_api_versions['v2']
results_key = 'results'
pagination_path = ['next']
n_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions')
error_context_msg = 'Error when getting available collection versions for %s.%s from %s (%s)' \
% (namespace, name, self.name, self.api_server)
data = self._call_galaxy(n_url, error_context_msg=error_context_msg)
versions = []
while True:
versions += [v['version'] for v in data[results_key]]
next_link = data
for path in pagination_path:
next_link = next_link.get(path, {})
if not next_link:
break
data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'),
error_context_msg=error_context_msg)
return versions

@ -13,7 +13,6 @@ import tarfile
import tempfile import tempfile
import threading import threading
import time import time
import uuid
import yaml import yaml
from contextlib import contextmanager from contextlib import contextmanager
@ -30,9 +29,9 @@ except ImportError:
import ansible.constants as C import ansible.constants as C
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
from ansible.galaxy import get_collections_galaxy_meta_info from ansible.galaxy import get_collections_galaxy_meta_info
from ansible.galaxy.api import _urljoin from ansible.galaxy.api import CollectionVersionMetadata, GalaxyError
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils import six from ansible.module_utils import six
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.utils.collection_loader import AnsibleCollectionRef from ansible.utils.collection_loader import AnsibleCollectionRef
from ansible.utils.display import Display from ansible.utils.display import Display
from ansible.utils.hashing import secure_hash, secure_hash_s from ansible.utils.hashing import secure_hash, secure_hash_s
@ -65,7 +64,8 @@ class CollectionRequirement:
:param requirement: The version requirement string used to verify the list of versions fit the requirements. :param requirement: The version requirement string used to verify the list of versions fit the requirements.
:param force: Whether the force flag applied to the collection. :param force: Whether the force flag applied to the collection.
:param parent: The name of the parent the collection is a dependency of. :param parent: The name of the parent the collection is a dependency of.
:param metadata: The collection metadata dict if it has already been retrieved. :param metadata: The galaxy.api.CollectionVersionMetadata that has already been retrieved from the Galaxy
server.
:param files: The files that exist inside the collection. This is based on the FILES.json file inside the :param files: The files that exist inside the collection. This is based on the FILES.json file inside the
collection artifact. collection artifact.
:param skip: Whether to skip installing the collection. Should be set if the collection is already installed :param skip: Whether to skip installing the collection. Should be set if the collection is already installed
@ -82,7 +82,6 @@ class CollectionRequirement:
self._metadata = metadata self._metadata = metadata
self._files = files self._files = files
self._galaxy_info = None
self.add_requirement(parent, requirement) self.add_requirement(parent, requirement)
@ -102,12 +101,12 @@ class CollectionRequirement:
@property @property
def dependencies(self): def dependencies(self):
if self._metadata: if self._metadata:
return self._metadata['dependencies'] return self._metadata.dependencies
elif len(self.versions) > 1: elif len(self.versions) > 1:
return None return None
self._get_metadata() self._get_metadata()
return self._metadata['dependencies'] return self._metadata.dependencies
def add_requirement(self, parent, requirement): def add_requirement(self, parent, requirement):
self.required_by.append((parent, requirement)) self.required_by.append((parent, requirement))
@ -150,9 +149,10 @@ class CollectionRequirement:
display.display("Installing '%s:%s' to '%s'" % (to_text(self), self.latest_version, collection_path)) display.display("Installing '%s:%s' to '%s'" % (to_text(self), self.latest_version, collection_path))
if self.b_path is None: if self.b_path is None:
download_url = self._galaxy_info['download_url'] download_url = self._metadata.download_url
artifact_hash = self._galaxy_info['artifact']['sha256'] artifact_hash = self._metadata.artifact_sha256
headers = self.api._auth_header(required=False) headers = {}
self.api._add_auth_token(headers, download_url)
self.b_path = _download_file(download_url, b_temp_path, artifact_hash, self.api.validate_certs, self.b_path = _download_file(download_url, b_temp_path, artifact_hash, self.api.validate_certs,
headers=headers) headers=headers)
@ -186,13 +186,7 @@ class CollectionRequirement:
def _get_metadata(self): def _get_metadata(self):
if self._metadata: if self._metadata:
return return
self._metadata = self.api.get_collection_version_metadata(self.namespace, self.name, self.latest_version)
n_collection_url = _urljoin(self.api.api_server, 'api', 'v2', 'collections', self.namespace, self.name,
'versions', self.latest_version)
details = json.load(open_url(n_collection_url, validate_certs=self.api.validate_certs,
headers=self.api._auth_header(required=False)))
self._galaxy_info = details
self._metadata = details['metadata']
def _meets_requirements(self, version, requirements, parent): def _meets_requirements(self, version, requirements, parent):
""" """
@ -260,6 +254,7 @@ class CollectionRequirement:
namespace = meta['namespace'] namespace = meta['namespace']
name = meta['name'] name = meta['name']
version = meta['version'] version = meta['version']
meta = CollectionVersionMetadata(namespace, name, version, None, None, meta['dependencies'])
return CollectionRequirement(namespace, name, b_path, None, [version], version, force, parent=parent, return CollectionRequirement(namespace, name, b_path, None, [version], version, force, parent=parent,
metadata=meta, files=files) metadata=meta, files=files)
@ -280,22 +275,21 @@ class CollectionRequirement:
% to_native(b_file_path)) % to_native(b_file_path))
if 'manifest_file' in info: if 'manifest_file' in info:
meta = info['manifest_file']['collection_info'] manifest = info['manifest_file']['collection_info']
namespace = manifest['namespace']
name = manifest['name']
version = manifest['version']
dependencies = manifest['dependencies']
else: else:
display.warning("Collection at '%s' does not have a MANIFEST.json file, cannot detect version." display.warning("Collection at '%s' does not have a MANIFEST.json file, cannot detect version."
% to_text(b_path)) % to_text(b_path))
parent_dir, name = os.path.split(to_text(b_path, errors='surrogate_or_strict')) parent_dir, name = os.path.split(to_text(b_path, errors='surrogate_or_strict'))
namespace = os.path.split(parent_dir)[1] namespace = os.path.split(parent_dir)[1]
meta = {
'namespace': namespace,
'name': name,
'version': '*',
'dependencies': {},
}
namespace = meta['namespace'] version = '*'
name = meta['name'] dependencies = {}
version = meta['version']
meta = CollectionVersionMetadata(namespace, name, version, None, None, dependencies)
files = info.get('files_file', {}).get('files', {}) files = info.get('files_file', {}).get('files', {})
@ -305,67 +299,31 @@ class CollectionRequirement:
@staticmethod @staticmethod
def from_name(collection, apis, requirement, force, parent=None): def from_name(collection, apis, requirement, force, parent=None):
namespace, name = collection.split('.', 1) namespace, name = collection.split('.', 1)
galaxy_info = None
galaxy_meta = None galaxy_meta = None
for api in apis: for api in apis:
collection_url_paths = [api.api_server, 'api', 'v2', 'collections', namespace, name, 'versions']
available_api_versions = get_available_api_versions(api)
if 'v3' in available_api_versions:
# /api/v3/ exists, use it
collection_url_paths[2] = 'v3'
# update this v3 GalaxyAPI to use Bearer token from now on
api.token_type = 'Bearer'
headers = api._auth_header(required=False)
is_single = False
if not (requirement == '*' or requirement.startswith('<') or requirement.startswith('>') or
requirement.startswith('!=')):
if requirement.startswith('='):
requirement = requirement.lstrip('=')
collection_url_paths.append(requirement)
is_single = True
n_collection_url = _urljoin(*collection_url_paths)
try: try:
resp = json.load(open_url(n_collection_url, validate_certs=api.validate_certs, headers=headers)) if not (requirement == '*' or requirement.startswith('<') or requirement.startswith('>') or
except urllib_error.HTTPError as err: requirement.startswith('!=')):
if requirement.startswith('='):
if err.code == 404: requirement = requirement.lstrip('=')
display.vvv("Collection '%s' is not available from server %s %s" % (collection, api.name, api.api_server))
continue
_handle_http_error(err, api, available_api_versions, resp = api.get_collection_version_metadata(namespace, name, requirement)
'Error fetching info for %s from %s (%s)' % (collection, api.name, api.api_server))
if is_single: galaxy_meta = resp
galaxy_info = resp versions = [resp.version]
galaxy_meta = resp['metadata'] else:
versions = [resp['version']] resp = api.get_collection_versions(namespace, name)
else:
versions = []
results_key = 'results'
if 'v3' in available_api_versions:
results_key = 'data'
while True:
# Galaxy supports semver but ansible-galaxy does not. We ignore any versions that don't match # Galaxy supports semver but ansible-galaxy does not. We ignore any versions that don't match
# StrictVersion (x.y.z) and only support pre-releases if an explicit version was set (done above). # StrictVersion (x.y.z) and only support pre-releases if an explicit version was set (done above).
versions += [v['version'] for v in resp[results_key] if StrictVersion.version_re.match(v['version'])] versions = [v for v in resp if StrictVersion.version_re.match(v)]
except GalaxyError as err:
next_link = resp.get('next', None) if err.http_code == 404:
if 'v3' in available_api_versions: display.vvv("Collection '%s' is not available from server %s %s"
next_link = resp['links']['next'] % (collection, api.name, api.api_server))
continue
if next_link is None: raise
break
resp = json.load(open_url(to_native(next_link, errors='surrogate_or_strict'),
validate_certs=api.validate_certs, headers=headers))
display.vvv("Collection '%s' obtained from server %s %s" % (collection, api.name, api.api_server)) display.vvv("Collection '%s' obtained from server %s %s" % (collection, api.name, api.api_server))
break break
@ -374,49 +332,9 @@ class CollectionRequirement:
req = CollectionRequirement(namespace, name, None, api, versions, requirement, force, parent=parent, req = CollectionRequirement(namespace, name, None, api, versions, requirement, force, parent=parent,
metadata=galaxy_meta) metadata=galaxy_meta)
req._galaxy_info = galaxy_info
return req return req
def get_available_api_versions(galaxy_api):
headers = {}
headers.update(galaxy_api._auth_header(required=False))
url = _urljoin(galaxy_api.api_server, "api")
try:
return_data = open_url(url, headers=headers, validate_certs=galaxy_api.validate_certs)
except urllib_error.HTTPError as err:
if err.code != 401:
_handle_http_error(err, galaxy_api, {},
"Error when finding available api versions from %s (%s)" %
(galaxy_api.name, galaxy_api.api_server))
# assume this is v3 and auth is required.
headers = {}
headers.update(galaxy_api._auth_header(token_type='Bearer', required=True))
# try again with auth
try:
return_data = open_url(url, headers=headers, validate_certs=galaxy_api.validate_certs)
except urllib_error.HTTPError as authed_err:
_handle_http_error(authed_err, galaxy_api, {},
"Error when finding available api versions from %s using auth (%s)" %
(galaxy_api.name, galaxy_api.api_server))
except Exception as e:
raise AnsibleError("Failed to get data from the API server (%s): %s " % (url, to_native(e)))
try:
data = json.loads(to_text(return_data.read(), errors='surrogate_or_strict'))
except Exception as e:
raise AnsibleError("Could not process data from the API server (%s): %s " % (url, to_native(e)))
available_versions = data.get('available_versions',
{'v1': '/api/v1',
'v2': '/api/v2'})
return available_versions
def build_collection(collection_path, output_path, force): def build_collection(collection_path, output_path, force):
""" """
Creates the Ansible collection artifact in a .tar.gz file. Creates the Ansible collection artifact in a .tar.gz file.
@ -461,40 +379,11 @@ def publish_collection(collection_path, api, wait, timeout):
:param wait: Whether to wait until the import process is complete. :param wait: Whether to wait until the import process is complete.
:param timeout: The time in seconds to wait for the import process to finish, 0 is indefinite. :param timeout: The time in seconds to wait for the import process to finish, 0 is indefinite.
""" """
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict') import_uri = api.publish_collection(collection_path)
if not os.path.exists(b_collection_path):
raise AnsibleError("The collection path specified '%s' does not exist." % to_native(collection_path))
elif not tarfile.is_tarfile(b_collection_path):
raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection "
"build' to create a proper release artifact." % to_native(collection_path))
display.display("Publishing collection artifact '%s' to %s %s" % (collection_path, api.name, api.api_server))
n_url = _urljoin(api.api_server, 'api', 'v2', 'collections')
available_api_versions = get_available_api_versions(api)
if 'v3' in available_api_versions:
n_url = _urljoin(api.api_server, 'api', 'v3', 'artifacts', 'collections')
api.token_type = 'Bearer'
headers = {}
headers.update(api._auth_header())
data, content_type = _get_mime_data(b_collection_path)
headers.update({
'Content-type': content_type,
'Content-length': len(data),
})
try:
resp = json.load(open_url(n_url, data=data, headers=headers, method='POST', validate_certs=api.validate_certs))
except urllib_error.HTTPError as err:
_handle_http_error(err, api, available_api_versions, "Error when publishing collection to %s (%s)" % (api.name, api.api_server))
import_uri = resp['task']
if wait: if wait:
display.display("Collection has been published to the Galaxy server %s %s" % (api.name, api.api_server)) display.display("Collection has been published to the Galaxy server %s %s" % (api.name, api.api_server))
_wait_import(import_uri, api, timeout) with _display_progress():
api.wait_import_task(import_uri, timeout)
display.display("Collection has been successfully published and imported to the Galaxy server %s %s" display.display("Collection has been successfully published and imported to the Galaxy server %s %s"
% (api.name, api.api_server)) % (api.name, api.api_server))
else: else:
@ -836,76 +725,6 @@ def _build_collection_tar(b_collection_path, b_tar_path, collection_manifest, fi
display.display('Created collection for %s at %s' % (collection_name, to_text(b_tar_path))) display.display('Created collection for %s at %s' % (collection_name, to_text(b_tar_path)))
def _get_mime_data(b_collection_path):
with open(b_collection_path, 'rb') as collection_tar:
data = collection_tar.read()
boundary = '--------------------------%s' % uuid.uuid4().hex
b_file_name = os.path.basename(b_collection_path)
part_boundary = b"--" + to_bytes(boundary, errors='surrogate_or_strict')
form = [
part_boundary,
b"Content-Disposition: form-data; name=\"sha256\"",
to_bytes(secure_hash_s(data), errors='surrogate_or_strict'),
part_boundary,
b"Content-Disposition: file; name=\"file\"; filename=\"%s\"" % b_file_name,
b"Content-Type: application/octet-stream",
b"",
data,
b"%s--" % part_boundary,
]
content_type = 'multipart/form-data; boundary=%s' % boundary
return b"\r\n".join(form), content_type
def _wait_import(task_url, api, timeout):
headers = api._auth_header()
state = 'waiting'
resp = None
display.display("Waiting until Galaxy import task %s has completed" % task_url)
with _display_progress():
start = time.time()
wait = 2
while timeout == 0 or (time.time() - start) < timeout:
resp = json.load(open_url(to_native(task_url, errors='surrogate_or_strict'), headers=headers,
method='GET', validate_certs=api.validate_certs))
state = resp.get('state', 'waiting')
if resp.get('finished_at', None):
break
display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again'
% (state, wait))
time.sleep(wait)
# poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds.
wait = min(30, wait * 1.5)
if state == 'waiting':
raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'"
% to_native(task_url))
for message in resp.get('messages', []):
level = message['level']
if level == 'error':
display.error("Galaxy import error message: %s" % message['message'])
elif level == 'warning':
display.warning("Galaxy import warning message: %s" % message['message'])
else:
display.vvv("Galaxy import message: %s - %s" % (level, message['message']))
if state == 'failed':
code = to_native(resp['error'].get('code', 'UNKNOWN'))
description = to_native(resp['error'].get('description', "Unknown error, see %s for more details" % task_url))
raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code))
def _find_existing_collections(path): def _find_existing_collections(path):
collections = [] collections = []
@ -1075,33 +894,3 @@ def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
os.makedirs(b_parent_dir) os.makedirs(b_parent_dir)
shutil.move(to_bytes(tmpfile_obj.name, errors='surrogate_or_strict'), b_dest_filepath) shutil.move(to_bytes(tmpfile_obj.name, errors='surrogate_or_strict'), b_dest_filepath)
def _handle_http_error(http_error, api, available_api_versions, context_error_message):
try:
err_info = json.load(http_error)
except (AttributeError, ValueError):
err_info = {}
if 'v3' in available_api_versions:
message_lines = []
errors = err_info.get('errors', None)
if not errors:
errors = [{'detail': 'Unknown error returned by Galaxy server.',
'code': 'Unknown'}]
for error in errors:
error_msg = error.get('detail') or error.get('title') or 'Unknown error returned by Galaxy server.'
error_code = error.get('code') or 'Unknown'
message_line = "(HTTP Code: %d, Message: %s Code: %s)" % (http_error.code, error_msg, error_code)
message_lines.append(message_line)
full_error_msg = "%s %s" % (context_error_message, ', '.join(message_lines))
else:
code = to_native(err_info.get('code', 'Unknown'))
message = to_native(err_info.get('message', 'Unknown error returned by Galaxy server.'))
full_error_msg = "%s (HTTP Code: %d, Message: %s Code: %s)" \
% (context_error_message, http_error.code, message, code)
raise AnsibleError(full_error_msg)

@ -6,13 +6,26 @@
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import json
import os
import re
import pytest import pytest
import tarfile
import tempfile
import time
from io import BytesIO, StringIO
from units.compat.mock import MagicMock
from ansible import context from ansible import context
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
from ansible.galaxy.api import GalaxyAPI from ansible.galaxy import api as galaxy_api
from ansible.galaxy.api import CollectionVersionMetadata, GalaxyAPI, GalaxyError
from ansible.galaxy.token import GalaxyToken from ansible.galaxy.token import GalaxyToken
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.six.moves.urllib import error as urllib_error
from ansible.utils import context_objects as co from ansible.utils import context_objects as co
from ansible.utils.display import Display
@pytest.fixture(autouse='function') @pytest.fixture(autouse='function')
@ -24,9 +37,34 @@ def reset_cli_args():
co.GlobalCLIArgs._Singleton__instance = None co.GlobalCLIArgs._Singleton__instance = None
@pytest.fixture()
def collection_artifact(tmp_path_factory):
''' Creates a collection artifact tarball that is ready to be published '''
output_dir = to_text(tmp_path_factory.mktemp('test-ÅÑŚÌβŁÈ Output'))
tar_path = os.path.join(output_dir, 'namespace-collection-v1.0.0.tar.gz')
with tarfile.open(tar_path, 'w:gz') as tfile:
b_io = BytesIO(b"\x00\x01\x02\x03")
tar_info = tarfile.TarInfo('test')
tar_info.size = 4
tar_info.mode = 0o0644
tfile.addfile(tarinfo=tar_info, fileobj=b_io)
yield tar_path
def get_test_galaxy_api(url, version):
api = GalaxyAPI(None, "test", url)
api._available_api_versions = {version: '/api/%s' % version}
api.token = GalaxyToken(token="my token")
return api
def test_api_no_auth(): def test_api_no_auth():
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com") api = GalaxyAPI(None, "test", "https://galaxy.ansible.com")
actual = api._auth_header(required=False) actual = {}
api._add_auth_token(actual, "")
assert actual == {} assert actual == {}
@ -34,23 +72,722 @@ def test_api_no_auth_but_required():
expected = "No access token or username set. A token can be set with --api-key, with 'ansible-galaxy login', " \ expected = "No access token or username set. A token can be set with --api-key, with 'ansible-galaxy login', " \
"or set in ansible.cfg." "or set in ansible.cfg."
with pytest.raises(AnsibleError, match=expected): with pytest.raises(AnsibleError, match=expected):
GalaxyAPI(None, "test", "https://galaxy.ansible.com")._auth_header() GalaxyAPI(None, "test", "https://galaxy.ansible.com")._add_auth_token({}, "", required=True)
def test_api_token_auth(): def test_api_token_auth():
token = GalaxyToken(token=u"my_token") token = GalaxyToken(token=u"my_token")
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=token) api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=token)
actual = api._auth_header() actual = {}
api._add_auth_token(actual, "")
assert actual == {'Authorization': 'Token my_token'}
def test_api_token_auth_with_token_type():
token = GalaxyToken(token=u"my_token")
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=token)
actual = {}
api._add_auth_token(actual, "", token_type="Bearer")
assert actual == {'Authorization': 'Bearer my_token'}
def test_api_token_auth_with_v3_url():
token = GalaxyToken(token=u"my_token")
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=token)
actual = {}
api._add_auth_token(actual, "https://galaxy.ansible.com/api/v3/resource/name")
assert actual == {'Authorization': 'Bearer my_token'}
def test_api_token_auth_with_v2_url():
token = GalaxyToken(token=u"my_token")
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=token)
actual = {}
# Add v3 to random part of URL but response should only see the v2 as the full URI path segment.
api._add_auth_token(actual, "https://galaxy.ansible.com/api/v2/resourcev3/name")
assert actual == {'Authorization': 'Token my_token'} assert actual == {'Authorization': 'Token my_token'}
def test_api_basic_auth_password(): def test_api_basic_auth_password():
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", username=u"user", password=u"pass") api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", username=u"user", password=u"pass")
actual = api._auth_header() actual = {}
api._add_auth_token(actual, "")
assert actual == {'Authorization': 'Basic dXNlcjpwYXNz'} assert actual == {'Authorization': 'Basic dXNlcjpwYXNz'}
def test_api_basic_auth_no_password(): def test_api_basic_auth_no_password():
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", username=u"user",) api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", username=u"user",)
actual = api._auth_header() actual = {}
api._add_auth_token(actual, "")
assert actual == {'Authorization': 'Basic dXNlcjo='} assert actual == {'Authorization': 'Basic dXNlcjo='}
def test_api_dont_override_auth_header():
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com")
actual = {'Authorization': 'Custom token'}
api._add_auth_token(actual, "")
assert actual == {'Authorization': 'Custom token'}
def test_initialise_galaxy(monkeypatch):
mock_open = MagicMock()
mock_open.side_effect = [
StringIO(u'{"available_versions":{"v1":"/api/v1"}}'),
StringIO(u'{"token":"my token"}'),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com")
actual = api.authenticate("github_token")
assert len(api.available_api_versions) == 2
assert api.available_api_versions['v1'] == u'/api/v1'
assert api.available_api_versions['v2'] == u'/api/v2'
assert actual == {u'token': u'my token'}
assert mock_open.call_count == 2
assert mock_open.mock_calls[0][1][0] == 'https://galaxy.ansible.com/api'
assert mock_open.mock_calls[1][1][0] == 'https://galaxy.ansible.com/api/v1/tokens/'
assert mock_open.mock_calls[1][2]['data'] == 'github_token=github_token'
def test_initialise_galaxy_with_auth(monkeypatch):
mock_open = MagicMock()
mock_open.side_effect = [
StringIO(u'{"available_versions":{"v1":"/api/v1"}}'),
StringIO(u'{"token":"my token"}'),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=GalaxyToken(token='my_token'))
actual = api.authenticate("github_token")
assert len(api.available_api_versions) == 2
assert api.available_api_versions['v1'] == u'/api/v1'
assert api.available_api_versions['v2'] == u'/api/v2'
assert actual == {u'token': u'my token'}
assert mock_open.call_count == 2
assert mock_open.mock_calls[0][1][0] == 'https://galaxy.ansible.com/api'
assert mock_open.mock_calls[0][2]['headers'] == {'Authorization': 'Token my_token'}
assert mock_open.mock_calls[1][1][0] == 'https://galaxy.ansible.com/api/v1/tokens/'
assert mock_open.mock_calls[1][2]['data'] == 'github_token=github_token'
def test_initialise_automation_hub(monkeypatch):
mock_open = MagicMock()
mock_open.side_effect = [
urllib_error.HTTPError('https://galaxy.ansible.com/api', 401, 'msg', {}, StringIO()),
# AH won't return v1 but we do for authenticate() to work.
StringIO(u'{"available_versions":{"v1":"/api/v1","v3":"/api/v3"}}'),
StringIO(u'{"token":"my token"}'),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=GalaxyToken(token='my_token'))
actual = api.authenticate("github_token")
assert len(api.available_api_versions) == 2
assert api.available_api_versions['v1'] == u'/api/v1'
assert api.available_api_versions['v3'] == u'/api/v3'
assert actual == {u'token': u'my token'}
assert mock_open.call_count == 3
assert mock_open.mock_calls[0][1][0] == 'https://galaxy.ansible.com/api'
assert mock_open.mock_calls[0][2]['headers'] == {'Authorization': 'Token my_token'}
assert mock_open.mock_calls[1][1][0] == 'https://galaxy.ansible.com/api'
assert mock_open.mock_calls[1][2]['headers'] == {'Authorization': 'Bearer my_token'}
assert mock_open.mock_calls[2][1][0] == 'https://galaxy.ansible.com/api/v1/tokens/'
assert mock_open.mock_calls[2][2]['data'] == 'github_token=github_token'
def test_initialise_unknown(monkeypatch):
mock_open = MagicMock()
mock_open.side_effect = [
urllib_error.HTTPError('https://galaxy.ansible.com/api', 500, 'msg', {}, StringIO(u'{"msg":"raw error"}')),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=GalaxyToken(token='my_token'))
expected = "Error when finding available api versions from test (%s/api) (HTTP Code: 500, Message: Unknown " \
"error returned by Galaxy server.)" % api.api_server
with pytest.raises(GalaxyError, match=re.escape(expected)):
api.authenticate("github_token")
def test_get_available_api_versions(monkeypatch):
mock_open = MagicMock()
mock_open.side_effect = [
StringIO(u'{"available_versions":{"v1":"/api/v1","v2":"/api/v2"}}'),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com")
actual = api.available_api_versions
assert len(actual) == 2
assert actual['v1'] == u'/api/v1'
assert actual['v2'] == u'/api/v2'
assert mock_open.call_count == 1
assert mock_open.mock_calls[0][1][0] == 'https://galaxy.ansible.com/api'
def test_publish_collection_missing_file():
fake_path = u'/fake/ÅÑŚÌβŁÈ/path'
expected = to_native("The collection path specified '%s' does not exist." % fake_path)
api = get_test_galaxy_api("https://galaxy.ansible.com", "v2")
with pytest.raises(AnsibleError, match=expected):
api.publish_collection(fake_path)
def test_publish_collection_not_a_tarball():
expected = "The collection path specified '{0}' is not a tarball, use 'ansible-galaxy collection build' to " \
"create a proper release artifact."
api = get_test_galaxy_api("https://galaxy.ansible.com", "v2")
with tempfile.NamedTemporaryFile(prefix=u'ÅÑŚÌβŁÈ') as temp_file:
temp_file.write(b"\x00")
temp_file.flush()
with pytest.raises(AnsibleError, match=expected.format(to_native(temp_file.name))):
api.publish_collection(temp_file.name)
def test_publish_collection_unsupported_version():
expected = "Galaxy action publish_collection requires API versions 'v2, v3' but only 'v1' are available on test " \
"https://galaxy.ansible.com"
api = get_test_galaxy_api("https://galaxy.ansible.com", "v1")
with pytest.raises(AnsibleError, match=expected):
api.publish_collection("path")
@pytest.mark.parametrize('api_version, collection_url', [
('v2', 'collections'),
('v3', 'artifacts/collections'),
])
def test_publish_collection(api_version, collection_url, collection_artifact, monkeypatch):
api = get_test_galaxy_api("https://galaxy.ansible.com", api_version)
mock_call = MagicMock()
mock_call.return_value = {'task': 'http://task.url/'}
monkeypatch.setattr(api, '_call_galaxy', mock_call)
actual = api.publish_collection(collection_artifact)
assert actual == 'http://task.url/'
assert mock_call.call_count == 1
assert mock_call.mock_calls[0][1][0] == 'https://galaxy.ansible.com/api/%s/%s/' % (api_version, collection_url)
assert mock_call.mock_calls[0][2]['headers']['Content-length'] == len(mock_call.mock_calls[0][2]['args'])
assert mock_call.mock_calls[0][2]['headers']['Content-type'].startswith(
'multipart/form-data; boundary=--------------------------')
assert mock_call.mock_calls[0][2]['args'].startswith(b'--------------------------')
assert mock_call.mock_calls[0][2]['method'] == 'POST'
assert mock_call.mock_calls[0][2]['auth_required'] is True
@pytest.mark.parametrize('api_version, collection_url, response, expected', [
('v2', 'collections', {},
'Error when publishing collection to test (%s) (HTTP Code: 500, Message: Unknown error returned by Galaxy '
'server. Code: Unknown)'),
('v2', 'collections', {
'message': u'Galaxy error messäge',
'code': 'GWE002',
}, u'Error when publishing collection to test (%s) (HTTP Code: 500, Message: Galaxy error messäge Code: GWE002)'),
('v3', 'artifact/collections', {},
'Error when publishing collection to test (%s) (HTTP Code: 500, Message: Unknown error returned by Galaxy '
'server. Code: Unknown)'),
('v3', 'artifact/collections', {
'errors': [
{
'code': 'conflict.collection_exists',
'detail': 'Collection "mynamespace-mycollection-4.1.1" already exists.',
'title': 'Conflict.',
'status': '400',
},
{
'code': 'quantum_improbability',
'title': u'Rändom(?) quantum improbability.',
'source': {'parameter': 'the_arrow_of_time'},
'meta': {'remediation': 'Try again before'},
},
],
}, u'Error when publishing collection to test (%s) (HTTP Code: 500, Message: Collection '
u'"mynamespace-mycollection-4.1.1" already exists. Code: conflict.collection_exists), (HTTP Code: 500, '
u'Message: Rändom(?) quantum improbability. Code: quantum_improbability)')
])
def test_publish_failure(api_version, collection_url, response, expected, collection_artifact, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
expected_url = '%s/api/%s/%s' % (api.api_server, api_version, collection_url)
mock_open = MagicMock()
mock_open.side_effect = urllib_error.HTTPError(expected_url, 500, 'msg', {},
StringIO(to_text(json.dumps(response))))
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
with pytest.raises(GalaxyError, match=re.escape(to_native(expected % api.api_server))):
api.publish_collection(collection_artifact)
@pytest.mark.parametrize('api_version, token_type', [
('v2', 'Token'),
('v3', 'Bearer'),
])
def test_wait_import_task(api_version, token_type, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
import_uri = 'https://galaxy.server.com/api/%s/task/1234' % api_version
mock_open = MagicMock()
mock_open.return_value = StringIO(u'{"state":"success","finished_at":"time"}')
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
api.wait_import_task(import_uri)
assert mock_open.call_count == 1
assert mock_open.mock_calls[0][1][0] == import_uri
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_display.call_count == 1
assert mock_display.mock_calls[0][1][0] == 'Waiting until Galaxy import task %s has completed' % import_uri
@pytest.mark.parametrize('api_version, token_type', [
('v2', 'Token'),
('v3', 'Bearer'),
])
def test_wait_import_task_multiple_requests(api_version, token_type, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
import_uri = 'https://galaxy.server.com/api/%s/task/1234' % api_version
mock_open = MagicMock()
mock_open.side_effect = [
StringIO(u'{"state":"test"}'),
StringIO(u'{"state":"success","finished_at":"time"}'),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
mock_vvv = MagicMock()
monkeypatch.setattr(Display, 'vvv', mock_vvv)
monkeypatch.setattr(time, 'sleep', MagicMock())
api.wait_import_task(import_uri)
assert mock_open.call_count == 2
assert mock_open.mock_calls[0][1][0] == import_uri
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_open.mock_calls[1][1][0] == import_uri
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_display.call_count == 1
assert mock_display.mock_calls[0][1][0] == 'Waiting until Galaxy import task %s has completed' % import_uri
assert mock_vvv.call_count == 2 # 1st is opening Galaxy token file.
assert mock_vvv.mock_calls[1][1][0] == \
'Galaxy import process has a status of test, wait 2 seconds before trying again'
@pytest.mark.parametrize('api_version, token_type', [
('v2', 'Token'),
('v3', 'Bearer'),
])
def test_wait_import_task_with_failure(api_version, token_type, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
import_uri = 'https://galaxy.server.com/api/%s/task/1234' % api_version
mock_open = MagicMock()
mock_open.side_effect = [
StringIO(to_text(json.dumps({
'finished_at': 'some_time',
'state': 'failed',
'error': {
'code': 'GW001',
'description': u'Becäuse I said so!',
},
'messages': [
{
'level': 'error',
'message': u'Somé error',
},
{
'level': 'warning',
'message': u'Some wärning',
},
{
'level': 'info',
'message': u'Somé info',
},
],
}))),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
mock_vvv = MagicMock()
monkeypatch.setattr(Display, 'vvv', mock_vvv)
mock_warn = MagicMock()
monkeypatch.setattr(Display, 'warning', mock_warn)
mock_err = MagicMock()
monkeypatch.setattr(Display, 'error', mock_err)
expected = to_native(u'Galaxy import process failed: Becäuse I said so! (Code: GW001)')
with pytest.raises(AnsibleError, match=re.escape(expected)):
api.wait_import_task(import_uri)
assert mock_open.call_count == 1
assert mock_open.mock_calls[0][1][0] == import_uri
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_display.call_count == 1
assert mock_display.mock_calls[0][1][0] == 'Waiting until Galaxy import task %s has completed' % import_uri
assert mock_vvv.call_count == 2 # 1st is opening Galaxy token file.
assert mock_vvv.mock_calls[1][1][0] == u'Galaxy import message: info - Somé info'
assert mock_warn.call_count == 1
assert mock_warn.mock_calls[0][1][0] == u'Galaxy import warning message: Some wärning'
assert mock_err.call_count == 1
assert mock_err.mock_calls[0][1][0] == u'Galaxy import error message: Somé error'
@pytest.mark.parametrize('api_version, token_type', [
('v2', 'Token'),
('v3', 'Bearer'),
])
def test_wait_import_task_with_failure_no_error(api_version, token_type, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
import_uri = 'https://galaxy.server.com/api/%s/task/1234' % api_version
mock_open = MagicMock()
mock_open.side_effect = [
StringIO(to_text(json.dumps({
'finished_at': 'some_time',
'state': 'failed',
'error': {},
'messages': [
{
'level': 'error',
'message': u'Somé error',
},
{
'level': 'warning',
'message': u'Some wärning',
},
{
'level': 'info',
'message': u'Somé info',
},
],
}))),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
mock_vvv = MagicMock()
monkeypatch.setattr(Display, 'vvv', mock_vvv)
mock_warn = MagicMock()
monkeypatch.setattr(Display, 'warning', mock_warn)
mock_err = MagicMock()
monkeypatch.setattr(Display, 'error', mock_err)
expected = 'Galaxy import process failed: Unknown error, see %s for more details (Code: UNKNOWN)' % import_uri
with pytest.raises(AnsibleError, match=re.escape(expected)):
api.wait_import_task(import_uri)
assert mock_open.call_count == 1
assert mock_open.mock_calls[0][1][0] == import_uri
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_display.call_count == 1
assert mock_display.mock_calls[0][1][0] == 'Waiting until Galaxy import task %s has completed' % import_uri
assert mock_vvv.call_count == 2 # 1st is opening Galaxy token file.
assert mock_vvv.mock_calls[1][1][0] == u'Galaxy import message: info - Somé info'
assert mock_warn.call_count == 1
assert mock_warn.mock_calls[0][1][0] == u'Galaxy import warning message: Some wärning'
assert mock_err.call_count == 1
assert mock_err.mock_calls[0][1][0] == u'Galaxy import error message: Somé error'
@pytest.mark.parametrize('api_version, token_type', [
('v2', 'Token'),
('v3', 'Bearer'),
])
def test_wait_import_task_timeout(api_version, token_type, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
import_uri = 'https://galaxy.server.com/api/%s/task/1234' % api_version
def return_response(*args, **kwargs):
return StringIO(u'{"state":"waiting"}')
mock_open = MagicMock()
mock_open.side_effect = return_response
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
mock_vvv = MagicMock()
monkeypatch.setattr(Display, 'vvv', mock_vvv)
monkeypatch.setattr(time, 'sleep', MagicMock())
expected = "Timeout while waiting for the Galaxy import process to finish, check progress at '%s'" % import_uri
with pytest.raises(AnsibleError, match=expected):
api.wait_import_task(import_uri, 1)
assert mock_open.call_count > 1
assert mock_open.mock_calls[0][1][0] == import_uri
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_open.mock_calls[1][1][0] == import_uri
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_display.call_count == 1
assert mock_display.mock_calls[0][1][0] == 'Waiting until Galaxy import task %s has completed' % import_uri
expected_wait_msg = 'Galaxy import process has a status of waiting, wait {0} seconds before trying again'
assert mock_vvv.call_count > 9 # 1st is opening Galaxy token file.
assert mock_vvv.mock_calls[1][1][0] == expected_wait_msg.format(2)
assert mock_vvv.mock_calls[2][1][0] == expected_wait_msg.format(3)
assert mock_vvv.mock_calls[3][1][0] == expected_wait_msg.format(4)
assert mock_vvv.mock_calls[4][1][0] == expected_wait_msg.format(6)
assert mock_vvv.mock_calls[5][1][0] == expected_wait_msg.format(10)
assert mock_vvv.mock_calls[6][1][0] == expected_wait_msg.format(15)
assert mock_vvv.mock_calls[7][1][0] == expected_wait_msg.format(22)
assert mock_vvv.mock_calls[8][1][0] == expected_wait_msg.format(30)
@pytest.mark.parametrize('api_version, token_type, version', [
('v2', 'Token', 'v2.1.13'),
('v3', 'Bearer', 'v1.0.0'),
])
def test_get_collection_version_metadata_no_version(api_version, token_type, version, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
mock_open = MagicMock()
mock_open.side_effect = [
StringIO(to_text(json.dumps({
'download_url': 'https://downloadme.com',
'artifact': {
'sha256': 'ac47b6fac117d7c171812750dacda655b04533cf56b31080b82d1c0db3c9d80f',
},
'namespace': {
'name': 'namespace',
},
'collection': {
'name': 'collection',
},
'version': version,
'metadata': {
'dependencies': {},
}
}))),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
actual = api.get_collection_version_metadata('namespace', 'collection', version)
assert isinstance(actual, CollectionVersionMetadata)
assert actual.namespace == u'namespace'
assert actual.name == u'collection'
assert actual.download_url == u'https://downloadme.com'
assert actual.artifact_sha256 == u'ac47b6fac117d7c171812750dacda655b04533cf56b31080b82d1c0db3c9d80f'
assert actual.version == version
assert actual.dependencies == {}
assert mock_open.call_count == 1
assert mock_open.mock_calls[0][1][0] == '%s/api/%s/collections/namespace/collection/versions/%s' \
% (api.api_server, api_version, version)
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
@pytest.mark.parametrize('api_version, token_type, response', [
('v2', 'Token', {
'count': 2,
'next': None,
'previous': None,
'results': [
{
'version': '1.0.0',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.0',
},
{
'version': '1.0.1',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.1',
},
],
}),
# TODO: Verify this once Automation Hub is actually out
('v3', 'Bearer', {
'count': 2,
'next': None,
'previous': None,
'data': [
{
'version': '1.0.0',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.0',
},
{
'version': '1.0.1',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.1',
},
],
}),
])
def test_get_collection_versions(api_version, token_type, response, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
mock_open = MagicMock()
mock_open.side_effect = [
StringIO(to_text(json.dumps(response))),
]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
actual = api.get_collection_versions('namespace', 'collection')
assert actual == [u'1.0.0', u'1.0.1']
assert mock_open.call_count == 1
assert mock_open.mock_calls[0][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \
'versions' % api_version
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
@pytest.mark.parametrize('api_version, token_type, responses', [
('v2', 'Token', [
{
'count': 6,
'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2',
'previous': None,
'results': [
{
'version': '1.0.0',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.0',
},
{
'version': '1.0.1',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.1',
},
],
},
{
'count': 6,
'next': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=3',
'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions',
'results': [
{
'version': '1.0.2',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.2',
},
{
'version': '1.0.3',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.3',
},
],
},
{
'count': 6,
'next': None,
'previous': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/?page=2',
'results': [
{
'version': '1.0.4',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.4',
},
{
'version': '1.0.5',
'href': 'https://galaxy.server.com/api/v2/collections/namespace/collection/versions/1.0.5',
},
],
},
]),
('v3', 'Bearer', [
{
'count': 6,
'links': {
'next': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/?page=2',
'previous': None,
},
'data': [
{
'version': '1.0.0',
'href': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/1.0.0',
},
{
'version': '1.0.1',
'href': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/1.0.1',
},
],
},
{
'count': 6,
'links': {
'next': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/?page=3',
'previous': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions',
},
'data': [
{
'version': '1.0.2',
'href': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/1.0.2',
},
{
'version': '1.0.3',
'href': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/1.0.3',
},
],
},
{
'count': 6,
'links': {
'next': None,
'previous': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/?page=2',
},
'data': [
{
'version': '1.0.4',
'href': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/1.0.4',
},
{
'version': '1.0.5',
'href': 'https://galaxy.server.com/api/v3/collections/namespace/collection/versions/1.0.5',
},
],
},
]),
])
def test_get_collection_versions_pagination(api_version, token_type, responses, monkeypatch):
api = get_test_galaxy_api('https://galaxy.server.com', api_version)
mock_open = MagicMock()
mock_open.side_effect = [StringIO(to_text(json.dumps(r))) for r in responses]
monkeypatch.setattr(galaxy_api, 'open_url', mock_open)
actual = api.get_collection_versions('namespace', 'collection')
a = ''
assert actual == [u'1.0.0', u'1.0.1', u'1.0.2', u'1.0.3', u'1.0.4', u'1.0.5']
assert mock_open.call_count == 3
assert mock_open.mock_calls[0][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \
'versions' % api_version
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_open.mock_calls[1][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \
'versions/?page=2' % api_version
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == '%s my token' % token_type
assert mock_open.mock_calls[2][1][0] == 'https://galaxy.server.com/api/%s/collections/namespace/collection/' \
'versions/?page=3' % api_version
assert mock_open.mock_calls[2][2]['headers']['Authorization'] == '%s my token' % token_type

@ -9,18 +9,13 @@ __metaclass__ = type
import json import json
import os import os
import pytest import pytest
import re
import tarfile import tarfile
import tempfile
import time
import uuid import uuid
from hashlib import sha256 from hashlib import sha256
from io import BytesIO, StringIO from io import BytesIO
from units.compat.mock import MagicMock from units.compat.mock import MagicMock
import ansible.module_utils.six.moves.urllib.error as urllib_error
from ansible import context from ansible import context
from ansible.cli.galaxy import GalaxyCLI from ansible.cli.galaxy import GalaxyCLI
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
@ -55,13 +50,6 @@ def collection_input(tmp_path_factory):
return collection_dir, output_dir return collection_dir, output_dir
@pytest.fixture()
def galaxy_api_version(monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
@pytest.fixture() @pytest.fixture()
def collection_artifact(monkeypatch, tmp_path_factory): def collection_artifact(monkeypatch, tmp_path_factory):
''' Creates a temp collection artifact and mocked open_url instance for publishing tests ''' ''' Creates a temp collection artifact and mocked open_url instance for publishing tests '''
@ -408,440 +396,53 @@ def test_build_with_symlink_inside_collection(collection_input):
assert actual_file == '63444bfc766154e1bc7557ef6280de20d03fcd81' assert actual_file == '63444bfc766154e1bc7557ef6280de20d03fcd81'
def test_publish_missing_file():
fake_path = u'/fake/ÅÑŚÌβŁÈ/path'
expected = to_native("The collection path specified '%s' does not exist." % fake_path)
with pytest.raises(AnsibleError, match=expected):
collection.publish_collection(fake_path, None, True, 0)
def test_publish_not_a_tarball():
expected = "The collection path specified '{0}' is not a tarball, use 'ansible-galaxy collection build' to " \
"create a proper release artifact."
with tempfile.NamedTemporaryFile(prefix=u'ÅÑŚÌβŁÈ') as temp_file:
temp_file.write(b"\x00")
temp_file.flush()
with pytest.raises(AnsibleError, match=expected.format(to_native(temp_file.name))):
collection.publish_collection(temp_file.name, None, True, 0)
def test_publish_no_wait(galaxy_server, collection_artifact, monkeypatch): def test_publish_no_wait(galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
mock_display = MagicMock() mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display) monkeypatch.setattr(Display, 'display', mock_display)
artifact_path, mock_open = collection_artifact artifact_path, mock_open = collection_artifact
fake_import_uri = 'https://galaxy.server.com/api/v2/import/1234' fake_import_uri = 'https://galaxy.server.com/api/v2/import/1234'
mock_open.return_value = StringIO(u'{"task":"%s"}' % fake_import_uri) mock_publish = MagicMock()
expected_form, expected_content_type = collection._get_mime_data(to_bytes(artifact_path)) mock_publish.return_value = fake_import_uri
monkeypatch.setattr(galaxy_server, 'publish_collection', mock_publish)
collection.publish_collection(artifact_path, galaxy_server, False, 0) collection.publish_collection(artifact_path, galaxy_server, False, 0)
assert mock_open.call_count == 1 assert mock_publish.call_count == 1
assert mock_open.mock_calls[0][1][0] == '%s/api/v2/collections/' % galaxy_server.api_server assert mock_publish.mock_calls[0][1][0] == artifact_path
assert mock_open.mock_calls[0][2]['data'] == expected_form
assert mock_open.mock_calls[0][2]['method'] == 'POST'
assert mock_open.mock_calls[0][2]['validate_certs'] is True
assert mock_open.mock_calls[0][2]['headers']['Authorization'] == 'Token key'
assert mock_open.mock_calls[0][2]['headers']['Content-length'] == len(expected_form)
assert mock_open.mock_calls[0][2]['headers']['Content-type'] == expected_content_type
assert mock_display.call_count == 2
assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s" \
% (artifact_path, galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[1][1][0] == \
"Collection has been pushed to the Galaxy server %s %s, not waiting until import has completed due to --no-wait " \
"being set. Import task results can be found at %s" % (galaxy_server.name, galaxy_server.api_server, fake_import_uri)
def test_publish_dont_validate_cert(galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
galaxy_server.validate_certs = False
artifact_path, mock_open = collection_artifact
mock_open.return_value = StringIO(u'{"task":"https://galaxy.server.com/api/v2/import/1234"}')
collection.publish_collection(artifact_path, galaxy_server, False, 0)
assert mock_open.call_count == 1
assert mock_open.mock_calls[0][2]['validate_certs'] is False
def test_publish_failure(galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
artifact_path, mock_open = collection_artifact
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 500, 'msg', {}, StringIO())
expected = 'Error when publishing collection to test_server (https://galaxy.ansible.com) ' \
'(HTTP Code: 500, Message: Unknown error returned by Galaxy ' \
'server. Code: Unknown)'
with pytest.raises(AnsibleError, match=re.escape(expected)):
collection.publish_collection(artifact_path, galaxy_server, True, 0)
def test_publish_failure_with_json_info(galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
artifact_path, mock_open = collection_artifact
return_content = StringIO(u'{"message":"Galaxy error message","code":"GWE002"}')
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 503, 'msg', {}, return_content)
expected = 'Error when publishing collection to test_server (https://galaxy.ansible.com) ' \
'(HTTP Code: 503, Message: Galaxy error message Code: GWE002)'
with pytest.raises(AnsibleError, match=re.escape(expected)):
collection.publish_collection(artifact_path, galaxy_server, True, 0)
@pytest.mark.parametrize("api_version,token_type", [
('v2', 'Token'),
('v3', 'Bearer')
])
def test_publish_with_wait(api_version, token_type, galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {api_version: '/api/%s' % api_version}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234'
artifact_path, mock_open = collection_artifact
mock_open.side_effect = (
StringIO(u'{"task":"%s"}' % fake_import_uri),
StringIO(u'{"finished_at":"some_time","state":"success"}')
)
collection.publish_collection(artifact_path, galaxy_server, True, 0)
assert mock_open.call_count == 2
assert mock_open.mock_calls[1][1][0] == fake_import_uri
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == '%s key' % token_type
assert mock_open.mock_calls[1][2]['validate_certs'] is True
assert mock_open.mock_calls[1][2]['method'] == 'GET'
assert mock_display.call_count == 5
assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s" \
% (artifact_path, galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[1][1][0] == 'Collection has been published to the Galaxy server %s %s'\
% (galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[2][1][0] == 'Waiting until Galaxy import task %s has completed' % fake_import_uri
assert mock_display.mock_calls[4][1][0] == 'Collection has been successfully published and imported to the ' \
'Galaxy server %s %s' % (galaxy_server.name, galaxy_server.api_server)
@pytest.mark.parametrize("api_version,exp_api_url,token_type", [
('v2', '/api/v2/collections/', 'Token'),
('v3', '/api/v3/artifacts/collections/', 'Bearer')
])
def test_publish_with_wait_timeout(api_version, exp_api_url, token_type, galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {api_version: '/api/%s' % api_version}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
monkeypatch.setattr(time, 'sleep', MagicMock())
mock_vvv = MagicMock()
monkeypatch.setattr(Display, 'vvv', mock_vvv)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234'
artifact_path, mock_open = collection_artifact
mock_open.side_effect = (
StringIO(u'{"task":"%s"}' % fake_import_uri),
StringIO(u'{"finished_at":null}'),
StringIO(u'{"finished_at":"some_time","state":"success"}')
)
collection.publish_collection(artifact_path, galaxy_server, True, 60)
assert mock_open.call_count == 3
assert mock_open.mock_calls[1][1][0] == fake_import_uri
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == '%s key' % token_type
assert mock_open.mock_calls[1][2]['validate_certs'] is True
assert mock_open.mock_calls[1][2]['method'] == 'GET'
assert mock_open.mock_calls[2][1][0] == fake_import_uri
assert mock_open.mock_calls[2][2]['headers']['Authorization'] == '%s key' % token_type
assert mock_open.mock_calls[2][2]['validate_certs'] is True
assert mock_open.mock_calls[2][2]['method'] == 'GET'
assert mock_vvv.call_count == 2
assert mock_vvv.mock_calls[1][1][0] == \
'Galaxy import process has a status of waiting, wait 2 seconds before trying again'
def test_publish_with_wait_timeout_failure(galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
monkeypatch.setattr(time, 'sleep', MagicMock())
mock_vvv = MagicMock()
monkeypatch.setattr(Display, 'vvv', mock_vvv)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234' assert mock_display.call_count == 1
assert mock_display.mock_calls[0][1][0] == \
artifact_path, mock_open = collection_artifact "Collection has been pushed to the Galaxy server %s %s, not waiting until import has completed due to " \
"--no-wait being set. Import task results can be found at %s" % (galaxy_server.name, galaxy_server.api_server,
first_call = True fake_import_uri)
def open_value(*args, **kwargs):
if first_call:
return StringIO(u'{"task":"%s"}' % fake_import_uri)
else:
return StringIO(u'{"finished_at":null}')
mock_open.side_effect = open_value
expected = "Timeout while waiting for the Galaxy import process to finish, check progress at '%s'" \
% fake_import_uri
with pytest.raises(AnsibleError, match=expected):
collection.publish_collection(artifact_path, galaxy_server, True, 2)
# While the seconds exceed the time we are testing that the exponential backoff gets to 30 and then sits there
# Because we mock time.sleep() there should be thousands of calls here
expected_wait_msg = 'Galaxy import process has a status of waiting, wait {0} seconds before trying again'
assert mock_vvv.call_count > 9
assert mock_vvv.mock_calls[1][1][0] == expected_wait_msg.format(2)
assert mock_vvv.mock_calls[2][1][0] == expected_wait_msg.format(3)
assert mock_vvv.mock_calls[3][1][0] == expected_wait_msg.format(4)
assert mock_vvv.mock_calls[4][1][0] == expected_wait_msg.format(6)
assert mock_vvv.mock_calls[5][1][0] == expected_wait_msg.format(10)
assert mock_vvv.mock_calls[6][1][0] == expected_wait_msg.format(15)
assert mock_vvv.mock_calls[7][1][0] == expected_wait_msg.format(22)
assert mock_vvv.mock_calls[8][1][0] == expected_wait_msg.format(30)
def test_publish_with_wait_and_failure(galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
def test_publish_with_wait(galaxy_server, collection_artifact, monkeypatch):
mock_display = MagicMock() mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display) monkeypatch.setattr(Display, 'display', mock_display)
mock_vvv = MagicMock()
monkeypatch.setattr(Display, 'vvv', mock_vvv)
mock_warn = MagicMock()
monkeypatch.setattr(Display, 'warning', mock_warn)
mock_err = MagicMock()
monkeypatch.setattr(Display, 'error', mock_err)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234'
artifact_path, mock_open = collection_artifact artifact_path, mock_open = collection_artifact
fake_import_uri = 'https://galaxy.server.com/api/v2/import/1234'
import_stat = { mock_publish = MagicMock()
'finished_at': 'some_time', mock_publish.return_value = fake_import_uri
'state': 'failed', monkeypatch.setattr(galaxy_server, 'publish_collection', mock_publish)
'error': {
'code': 'GW001',
'description': 'Because I said so!',
},
'messages': [
{
'level': 'error',
'message': 'Some error',
},
{
'level': 'warning',
'message': 'Some warning',
},
{
'level': 'info',
'message': 'Some info',
},
],
}
mock_open.side_effect = (
StringIO(u'{"task":"%s"}' % fake_import_uri),
StringIO(to_text(json.dumps(import_stat)))
)
expected = 'Galaxy import process failed: Because I said so! (Code: GW001)'
with pytest.raises(AnsibleError, match=re.escape(expected)):
collection.publish_collection(artifact_path, galaxy_server, True, 0)
assert mock_open.call_count == 2
assert mock_open.mock_calls[1][1][0] == fake_import_uri
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == 'Token key'
assert mock_open.mock_calls[1][2]['validate_certs'] is True
assert mock_open.mock_calls[1][2]['method'] == 'GET'
assert mock_display.call_count == 4
assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s"\
% (artifact_path, galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[1][1][0] == 'Collection has been published to the Galaxy server %s %s'\
% (galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[2][1][0] == 'Waiting until Galaxy import task %s has completed' % fake_import_uri
assert mock_vvv.call_count == 2
assert mock_vvv.mock_calls[1][1][0] == 'Galaxy import message: info - Some info'
assert mock_warn.call_count == 1
assert mock_warn.mock_calls[0][1][0] == 'Galaxy import warning message: Some warning'
assert mock_err.call_count == 1
assert mock_err.mock_calls[0][1][0] == 'Galaxy import error message: Some error'
def test_publish_with_wait_and_failure_and_no_error(galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
mock_vvv = MagicMock()
monkeypatch.setattr(Display, 'vvv', mock_vvv)
mock_warn = MagicMock()
monkeypatch.setattr(Display, 'warning', mock_warn)
mock_err = MagicMock()
monkeypatch.setattr(Display, 'error', mock_err)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234'
artifact_path, mock_open = collection_artifact
import_stat = {
'finished_at': 'some_time',
'state': 'failed',
'error': {},
'messages': [
{
'level': 'error',
'message': 'Some error',
},
{
'level': 'warning',
'message': 'Some warning',
},
{
'level': 'info',
'message': 'Some info',
},
],
}
mock_open.side_effect = (
StringIO(u'{"task":"%s"}' % fake_import_uri),
StringIO(to_text(json.dumps(import_stat)))
)
expected = 'Galaxy import process failed: Unknown error, see %s for more details (Code: UNKNOWN)' % fake_import_uri
with pytest.raises(AnsibleError, match=re.escape(expected)):
collection.publish_collection(artifact_path, galaxy_server, True, 0)
assert mock_open.call_count == 2
assert mock_open.mock_calls[1][1][0] == fake_import_uri
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == 'Token key'
assert mock_open.mock_calls[1][2]['validate_certs'] is True
assert mock_open.mock_calls[1][2]['method'] == 'GET'
assert mock_display.call_count == 4
assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s"\
% (artifact_path, galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[1][1][0] == 'Collection has been published to the Galaxy server %s %s'\
% (galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[2][1][0] == 'Waiting until Galaxy import task %s has completed' % fake_import_uri
assert mock_vvv.call_count == 2
assert mock_vvv.mock_calls[1][1][0] == 'Galaxy import message: info - Some info'
assert mock_warn.call_count == 1
assert mock_warn.mock_calls[0][1][0] == 'Galaxy import warning message: Some warning'
assert mock_err.call_count == 1
assert mock_err.mock_calls[0][1][0] == 'Galaxy import error message: Some error'
def test_publish_failure_v3_with_json_info_409_conflict(galaxy_server, collection_artifact, monkeypatch):
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v3': '/api/v3'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
artifact_path, mock_open = collection_artifact
error_response = {
"errors": [
{
"code": "conflict.collection_exists",
"detail": 'Collection "testing-ansible_testing_content-4.0.4" already exists.',
"title": "Conflict.",
"status": "409",
},
]
}
return_content = StringIO(to_text(json.dumps(error_response)))
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 409, 'msg', {}, return_content)
expected = 'Error when publishing collection to test_server (https://galaxy.ansible.com) ' \ mock_wait = MagicMock()
'(HTTP Code: 409, Message: Collection "testing-ansible_testing_content-4.0.4"' \ monkeypatch.setattr(galaxy_server, 'wait_import_task', mock_wait)
' already exists. Code: conflict.collection_exists)'
with pytest.raises(AnsibleError, match=re.escape(expected)):
collection.publish_collection(artifact_path, galaxy_server, True, 0)
collection.publish_collection(artifact_path, galaxy_server, True, 0)
def test_publish_failure_v3_with_json_info_multiple_errors(galaxy_server, collection_artifact, monkeypatch): assert mock_publish.call_count == 1
mock_avail_ver = MagicMock() assert mock_publish.mock_calls[0][1][0] == artifact_path
mock_avail_ver.return_value = {'v3': '/api/v3'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
artifact_path, mock_open = collection_artifact assert mock_wait.call_count == 1
assert mock_wait.mock_calls[0][1][0] == fake_import_uri
error_response = { assert mock_display.mock_calls[0][1][0] == "Collection has been published to the Galaxy server test_server %s" \
"errors": [ % galaxy_server.api_server
{
"code": "conflict.collection_exists",
"detail": 'Collection "mynamespace-mycollection-4.1.1" already exists.',
"title": "Conflict.",
"status": "400",
},
{
"code": "quantum_improbability",
"title": "Random(?) quantum improbability.",
"source": {"parameter": "the_arrow_of_time"},
"meta": {"remediation": "Try again before"}
},
]
}
return_content = StringIO(to_text(json.dumps(error_response)))
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 400, 'msg', {}, return_content)
expected = 'Error when publishing collection to test_server (https://galaxy.ansible.com) ' \
'(HTTP Code: 400, Message: Collection "mynamespace-mycollection-4.1.1"' \
' already exists. Code: conflict.collection_exists),' \
' (HTTP Code: 400, Message: Random(?) quantum improbability. Code: quantum_improbability)'
with pytest.raises(AnsibleError, match=re.escape(expected)):
collection.publish_collection(artifact_path, galaxy_server, True, 0)
def test_find_existing_collections(tmp_path_factory, monkeypatch): def test_find_existing_collections(tmp_path_factory, monkeypatch):
@ -962,91 +563,3 @@ def test_extract_tar_file_missing_parent_dir(tmp_tarfile):
collection._extract_tar_file(tfile, filename, output_dir, temp_dir, checksum) collection._extract_tar_file(tfile, filename, output_dir, temp_dir, checksum)
os.path.isfile(output_file) os.path.isfile(output_file)
def test_get_available_api_versions_v2_auth_not_required_without_auth(galaxy_server, collection_artifact, monkeypatch):
# mock_avail_ver = MagicMock()
# mock_avail_ver.side_effect = {api_version: '/api/%s' % api_version}
# monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
response_obj = {
"description": "GALAXY REST API",
"current_version": "v1",
"available_versions": {
"v1": "/api/v1/",
"v2": "/api/v2/"
},
"server_version": "3.2.4",
"version_name": "Doin' it Right",
"team_members": [
"chouseknecht",
"cutwater",
"alikins",
"newswangerd",
"awcrosby",
"tima",
"gregdek"
]
}
artifact_path, mock_open = collection_artifact
return_content = StringIO(to_text(json.dumps(response_obj)))
mock_open.return_value = return_content
res = collection.get_available_api_versions(galaxy_server)
assert res == {'v1': '/api/v1/', 'v2': '/api/v2/'}
def test_get_available_api_versions_v3_auth_required_without_auth(galaxy_server, collection_artifact, monkeypatch):
# mock_avail_ver = MagicMock()
# mock_avail_ver.side_effect = {api_version: '/api/%s' % api_version}
# monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
error_response = {'code': 'unauthorized', 'detail': 'The request was not authorized'}
artifact_path, mock_open = collection_artifact
return_content = StringIO(to_text(json.dumps(error_response)))
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 401, 'msg', {'WWW-Authenticate': 'Bearer'}, return_content)
with pytest.raises(AnsibleError):
collection.get_available_api_versions(galaxy_server)
def test_get_available_api_versions_v3_auth_required_with_auth_on_retry(galaxy_server, collection_artifact, monkeypatch):
# mock_avail_ver = MagicMock()
# mock_avail_ver.side_effect = {api_version: '/api/%s' % api_version}
# monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
error_obj = {'code': 'unauthorized', 'detail': 'The request was not authorized'}
success_obj = {
"description": "GALAXY REST API",
"current_version": "v1",
"available_versions": {
"v3": "/api/v3/"
},
"server_version": "3.2.4",
"version_name": "Doin' it Right",
"team_members": [
"chouseknecht",
"cutwater",
"alikins",
"newswangerd",
"awcrosby",
"tima",
"gregdek"
]
}
artifact_path, mock_open = collection_artifact
error_response = StringIO(to_text(json.dumps(error_obj)))
success_response = StringIO(to_text(json.dumps(success_obj)))
mock_open.side_effect = [
urllib_error.HTTPError('https://galaxy.server.com', 401, 'msg', {'WWW-Authenticate': 'Bearer'}, error_response),
success_response,
]
try:
res = collection.get_available_api_versions(galaxy_server)
except AnsibleError as err:
print(err)
raise
assert res == {'v3': '/api/v3/'}

@ -290,21 +290,10 @@ def test_build_requirement_from_tar_invalid_manifest(tmp_path_factory):
collection.CollectionRequirement.from_tar(tar_path, True, True) collection.CollectionRequirement.from_tar(tar_path, True, True)
@pytest.mark.parametrize("api_version,exp_api_url", [ def test_build_requirement_from_name(galaxy_server, monkeypatch):
('v2', '/api/v2/collections/namespace/collection/versions/'), mock_get_versions = MagicMock()
('v3', '/api/v3/collections/namespace/collection/versions/') mock_get_versions.return_value = ['2.1.9', '2.1.10']
]) monkeypatch.setattr(galaxy_server, 'get_collection_versions', mock_get_versions)
def test_build_requirement_from_name(api_version, exp_api_url, galaxy_server, monkeypatch):
mock_avail_ver = MagicMock()
avail_api_versions = {api_version: '/api/%s' % api_version}
mock_avail_ver.return_value = avail_api_versions
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
json_str = artifact_versions_json('namespace', 'collection', ['2.1.9', '2.1.10'], galaxy_server, avail_api_versions)
mock_open = MagicMock()
mock_open.return_value = StringIO(json_str)
monkeypatch.setattr(collection, 'open_url', mock_open)
actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '*', True, True) actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '*', True, True)
@ -317,27 +306,14 @@ def test_build_requirement_from_name(api_version, exp_api_url, galaxy_server, mo
assert actual.latest_version == u'2.1.10' assert actual.latest_version == u'2.1.10'
assert actual.dependencies is None assert actual.dependencies is None
assert mock_open.call_count == 1 assert mock_get_versions.call_count == 1
assert mock_open.mock_calls[0][1][0] == '%s%s' % (galaxy_server.api_server, exp_api_url) assert mock_get_versions.mock_calls[0][1] == ('namespace', 'collection')
assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
@pytest.mark.parametrize("api_version,exp_api_url", [
('v2', '/api/v2/collections/namespace/collection/versions/'),
('v3', '/api/v3/collections/namespace/collection/versions/')
])
def test_build_requirement_from_name_with_prerelease(api_version, exp_api_url, galaxy_server, monkeypatch):
mock_avail_ver = MagicMock()
avail_api_versions = {api_version: '/api/%s' % api_version}
mock_avail_ver.return_value = avail_api_versions
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
json_str = artifact_versions_json('namespace', 'collection', ['1.0.1', '2.0.1-beta.1', '2.0.1'],
galaxy_server, avail_api_versions)
mock_open = MagicMock()
mock_open.return_value = StringIO(json_str)
monkeypatch.setattr(collection, 'open_url', mock_open) def test_build_requirement_from_name_with_prerelease(galaxy_server, monkeypatch):
mock_get_versions = MagicMock()
mock_get_versions.return_value = ['1.0.1', '2.0.1-beta.1', '2.0.1']
monkeypatch.setattr(galaxy_server, 'get_collection_versions', mock_get_versions)
actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '*', True, True) actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '*', True, True)
@ -350,28 +326,15 @@ def test_build_requirement_from_name_with_prerelease(api_version, exp_api_url, g
assert actual.latest_version == u'2.0.1' assert actual.latest_version == u'2.0.1'
assert actual.dependencies is None assert actual.dependencies is None
assert mock_open.call_count == 1 assert mock_get_versions.call_count == 1
assert mock_open.mock_calls[0][1][0] == '%s%s' % (galaxy_server.api_server, exp_api_url) assert mock_get_versions.mock_calls[0][1] == ('namespace', 'collection')
assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
@pytest.mark.parametrize("api_version,exp_api_url", [
('v2', '/api/v2/collections/namespace/collection/versions/2.0.1-beta.1/'),
('v3', '/api/v3/collections/namespace/collection/versions/2.0.1-beta.1/')
])
def test_build_requirment_from_name_with_prerelease_explicit(api_version, exp_api_url, galaxy_server, monkeypatch):
mock_avail_ver = MagicMock()
avail_api_versions = {api_version: '/api/%s' % api_version}
mock_avail_ver.return_value = avail_api_versions
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
json_str = artifact_json('namespace', 'collection', '2.0.1-beta.1', {}, galaxy_server.api_server)
mock_open = MagicMock()
mock_open.side_effect = (
StringIO(json_str),
)
monkeypatch.setattr(collection, 'open_url', mock_open) def test_build_requirment_from_name_with_prerelease_explicit(galaxy_server, monkeypatch):
mock_get_info = MagicMock()
mock_get_info.return_value = api.CollectionVersionMetadata('namespace', 'collection', '2.0.1-beta.1', None, None,
{})
monkeypatch.setattr(galaxy_server, 'get_collection_version_metadata', mock_get_info)
actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '2.0.1-beta.1', True, actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '2.0.1-beta.1', True,
True) True)
@ -385,32 +348,22 @@ def test_build_requirment_from_name_with_prerelease_explicit(api_version, exp_ap
assert actual.latest_version == u'2.0.1-beta.1' assert actual.latest_version == u'2.0.1-beta.1'
assert actual.dependencies == {} assert actual.dependencies == {}
assert mock_open.call_count == 1 assert mock_get_info.call_count == 1
assert mock_open.mock_calls[0][1][0] == '%s%s' % (galaxy_server.api_server, exp_api_url) assert mock_get_info.mock_calls[0][1] == ('namespace', 'collection', '2.0.1-beta.1')
assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
@pytest.mark.parametrize("api_version,exp_api_url", [
('v2', '/api/v2/collections/namespace/collection/versions/'),
('v3', '/api/v3/collections/namespace/collection/versions/')
])
def test_build_requirement_from_name_second_server(api_version, exp_api_url, galaxy_server, monkeypatch):
mock_avail_ver = MagicMock()
avail_api_versions = {api_version: '/api/%s' % api_version}
mock_avail_ver.return_value = avail_api_versions
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
json_str = artifact_versions_json('namespace', 'collection', ['1.0.1', '1.0.2', '1.0.3'], galaxy_server, avail_api_versions)
mock_open = MagicMock()
mock_open.side_effect = (
urllib_error.HTTPError('https://galaxy.server.com', 404, 'msg', {}, None),
StringIO(json_str)
)
monkeypatch.setattr(collection, 'open_url', mock_open) def test_build_requirement_from_name_second_server(galaxy_server, monkeypatch):
mock_get_versions = MagicMock()
mock_get_versions.return_value = ['1.0.1', '1.0.2', '1.0.3']
monkeypatch.setattr(galaxy_server, 'get_collection_versions', mock_get_versions)
broken_server = copy.copy(galaxy_server) broken_server = copy.copy(galaxy_server)
broken_server.api_server = 'https://broken.com/' broken_server.api_server = 'https://broken.com/'
mock_404 = MagicMock()
mock_404.side_effect = api.GalaxyError(urllib_error.HTTPError('https://galaxy.server.com', 404, 'msg', {},
StringIO()), "custom msg")
monkeypatch.setattr(broken_server, 'get_collection_versions', mock_404)
actual = collection.CollectionRequirement.from_name('namespace.collection', [broken_server, galaxy_server], actual = collection.CollectionRequirement.from_name('namespace.collection', [broken_server, galaxy_server],
'>1.0.1', False, True) '>1.0.1', False, True)
@ -423,99 +376,46 @@ def test_build_requirement_from_name_second_server(api_version, exp_api_url, gal
assert actual.latest_version == u'1.0.3' assert actual.latest_version == u'1.0.3'
assert actual.dependencies is None assert actual.dependencies is None
assert mock_open.call_count == 2 assert mock_404.call_count == 1
assert mock_open.mock_calls[0][1][0] == u"https://broken.com%s" % exp_api_url assert mock_404.mock_calls[0][1] == ('namespace', 'collection')
assert mock_open.mock_calls[1][1][0] == u"%s%s" % (galaxy_server.api_server, exp_api_url)
assert mock_open.mock_calls[1][2] == {'validate_certs': True, "headers": {}} assert mock_get_versions.call_count == 1
assert mock_get_versions.mock_calls[0][1] == ('namespace', 'collection')
def test_build_requirement_from_name_missing(galaxy_server, monkeypatch): def test_build_requirement_from_name_missing(galaxy_server, monkeypatch):
mock_open = MagicMock() mock_open = MagicMock()
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 404, 'msg', {}, None) mock_open.side_effect = api.GalaxyError(urllib_error.HTTPError('https://galaxy.server.com', 404, 'msg', {},
StringIO()), "")
monkeypatch.setattr(collection, 'open_url', mock_open)
mock_avail_ver = MagicMock() monkeypatch.setattr(galaxy_server, 'get_collection_versions', mock_open)
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
expected = "Failed to find collection namespace.collection:*" expected = "Failed to find collection namespace.collection:*"
with pytest.raises(AnsibleError, match=expected): with pytest.raises(AnsibleError, match=expected):
collection.CollectionRequirement.from_name('namespace.collection', collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server, galaxy_server], '*', False,
[galaxy_server, galaxy_server], '*', False, True) True)
@pytest.mark.parametrize("api_version,errors_to_return,expected", [
('v2',
[],
'Error fetching info for .*\\..* \\(HTTP Code: 400, Message: Unknown error returned by Galaxy server. Code: Unknown\\)'),
('v2',
[{'message': 'Polarization error. Try flipping it over.', 'code': 'polarization_error'}],
'Error fetching info for .*\\..* \\(HTTP Code: 400, Message: Polarization error. Try flipping it over. Code: polarization_error\\)'),
('v3',
[],
'Error fetching info for .*\\..* \\(HTTP Code: 400, Message: Unknown error returned by Galaxy server. Code: Unknown\\)'),
('v3',
[{'code': 'invalid_param', 'detail': '"easy" is not a valid query param'}],
'Error fetching info for .*\\..* \\(HTTP Code: 400, Message: "easy" is not a valid query param Code: invalid_param\\)'),
])
def test_build_requirement_from_name_400_bad_request(api_version, errors_to_return, expected, galaxy_server, monkeypatch):
mock_avail_ver = MagicMock()
available_api_versions = {api_version: '/api/%s' % api_version}
mock_avail_ver.return_value = available_api_versions
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
json_str = error_json(galaxy_server, errors_to_return=errors_to_return, available_api_versions=available_api_versions)
def test_build_requirement_from_name_401_unauthorized(galaxy_server, monkeypatch):
mock_open = MagicMock() mock_open = MagicMock()
monkeypatch.setattr(collection, 'open_url', mock_open) mock_open.side_effect = api.GalaxyError(urllib_error.HTTPError('https://galaxy.server.com', 401, 'msg', {},
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 400, 'msg', {}, StringIO(json_str)) StringIO()), "error")
with pytest.raises(AnsibleError, match=expected): monkeypatch.setattr(galaxy_server, 'get_collection_versions', mock_open)
collection.CollectionRequirement.from_name('namespace.collection',
[galaxy_server, galaxy_server], '*', False)
@pytest.mark.parametrize("api_version,errors_to_return,expected", [
('v2',
[],
'Error fetching info for .*\\..* \\(HTTP Code: 401, Message: Unknown error returned by Galaxy server. Code: Unknown\\)'),
('v3',
[],
'Error fetching info for .*\\..* \\(HTTP Code: 401, Message: Unknown error returned by Galaxy server. Code: Unknown\\)'),
('v3',
[{'code': 'unauthorized', 'detail': 'The request was not authorized'}],
'Error fetching info for .*\\..* \\(HTTP Code: 401, Message: The request was not authorized Code: unauthorized\\)'),
])
def test_build_requirement_from_name_401_unauthorized(api_version, errors_to_return, expected, galaxy_server, monkeypatch):
mock_avail_ver = MagicMock()
available_api_versions = {api_version: '/api/%s' % api_version}
mock_avail_ver.return_value = available_api_versions
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
json_str = error_json(galaxy_server, errors_to_return=errors_to_return, available_api_versions=available_api_versions)
mock_open = MagicMock() expected = "error (HTTP Code: 401, Message: Unknown error returned by Galaxy server.)"
monkeypatch.setattr(collection, 'open_url', mock_open) with pytest.raises(api.GalaxyError, match=re.escape(expected)):
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 401, 'msg', {}, StringIO(json_str)) collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server, galaxy_server], '*', False)
with pytest.raises(AnsibleError, match=expected):
collection.CollectionRequirement.from_name('namespace.collection',
[galaxy_server, galaxy_server], '*', False)
def test_build_requirement_from_name_single_version(galaxy_server, monkeypatch): def test_build_requirement_from_name_single_version(galaxy_server, monkeypatch):
json_str = artifact_json('namespace', 'collection', '2.0.0', {}, galaxy_server.api_server) mock_get_info = MagicMock()
mock_open = MagicMock() mock_get_info.return_value = api.CollectionVersionMetadata('namespace', 'collection', '2.0.0', None, None,
mock_open.return_value = StringIO(json_str) {})
monkeypatch.setattr(galaxy_server, 'get_collection_version_metadata', mock_get_info)
monkeypatch.setattr(collection, 'open_url', mock_open)
mock_avail_ver = MagicMock()
mock_avail_ver.return_value = {'v2': '/api/v2'}
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '2.0.0', True, True) actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '2.0.0', True,
True)
assert actual.namespace == u'namespace' assert actual.namespace == u'namespace'
assert actual.name == u'collection' assert actual.name == u'collection'
@ -526,24 +426,19 @@ def test_build_requirement_from_name_single_version(galaxy_server, monkeypatch):
assert actual.latest_version == u'2.0.0' assert actual.latest_version == u'2.0.0'
assert actual.dependencies == {} assert actual.dependencies == {}
assert mock_open.call_count == 1 assert mock_get_info.call_count == 1
assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/2.0.0/" \ assert mock_get_info.mock_calls[0][1] == ('namespace', 'collection', '2.0.0')
% galaxy_server.api_server
assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
def test_build_requirement_from_name_multiple_versions_one_match(galaxy_server, monkeypatch): def test_build_requirement_from_name_multiple_versions_one_match(galaxy_server, monkeypatch):
json_str1 = artifact_versions_json('namespace', 'collection', ['2.0.0', '2.0.1', '2.0.2'], mock_get_versions = MagicMock()
galaxy_server) mock_get_versions.return_value = ['2.0.0', '2.0.1', '2.0.2']
json_str2 = artifact_json('namespace', 'collection', '2.0.1', {}, galaxy_server.api_server) monkeypatch.setattr(galaxy_server, 'get_collection_versions', mock_get_versions)
mock_open = MagicMock()
mock_open.side_effect = (StringIO(json_str1), StringIO(json_str2))
monkeypatch.setattr(collection, 'open_url', mock_open) mock_get_info = MagicMock()
mock_get_info.return_value = api.CollectionVersionMetadata('namespace', 'collection', '2.0.1', None, None,
mock_avail_ver = MagicMock() {})
mock_avail_ver.return_value = {'v2': '/api/v2'} monkeypatch.setattr(galaxy_server, 'get_collection_version_metadata', mock_get_info)
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '>=2.0.1,<2.0.2', actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '>=2.0.1,<2.0.2',
True, True) True, True)
@ -557,62 +452,17 @@ def test_build_requirement_from_name_multiple_versions_one_match(galaxy_server,
assert actual.latest_version == u'2.0.1' assert actual.latest_version == u'2.0.1'
assert actual.dependencies == {} assert actual.dependencies == {}
assert mock_open.call_count == 2 assert mock_get_versions.call_count == 1
assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" \ assert mock_get_versions.mock_calls[0][1] == ('namespace', 'collection')
% galaxy_server.api_server
assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
assert mock_open.mock_calls[1][1][0] == u"%s/api/v2/collections/namespace/collection/versions/2.0.1/" \
% galaxy_server.api_server
assert mock_open.mock_calls[1][2] == {'validate_certs': True, "headers": {}}
assert mock_get_info.call_count == 1
assert mock_get_info.mock_calls[0][1] == ('namespace', 'collection', '2.0.1')
def test_build_requirement_from_name_multiple_version_results(galaxy_server, monkeypatch):
json_str1 = json.dumps({
'count': 6,
'next': '%s/api/v2/collections/namespace/collection/versions/?page=2' % galaxy_server.api_server,
'previous': None,
'results': [
{
'href': '%s/api/v2/collections/namespace/collection/versions/2.0.0/' % galaxy_server.api_server,
'version': '2.0.0',
},
{
'href': '%s/api/v2/collections/namespace/collection/versions/2.0.1/' % galaxy_server.api_server,
'version': '2.0.1',
},
{
'href': '%s/api/v2/collections/namespace/collection/versions/2.0.2/' % galaxy_server.api_server,
'version': '2.0.2',
},
]
})
json_str2 = json.dumps({
'count': 6,
'next': None,
'previous': '%s/api/v2/collections/namespace/collection/versions/?page=1' % galaxy_server.api_server,
'results': [
{
'href': '%s/api/v2/collections/namespace/collection/versions/2.0.3/' % galaxy_server.api_server,
'version': '2.0.3',
},
{
'href': '%s/api/v2/collections/namespace/collection/versions/2.0.4/' % galaxy_server.api_server,
'version': '2.0.4',
},
{
'href': '%s/api/v2/collections/namespace/collection/versions/2.0.5/' % galaxy_server.api_server,
'version': '2.0.5',
},
]
})
mock_open = MagicMock()
mock_open.side_effect = (StringIO(to_text(json_str1)), StringIO(to_text(json_str2)))
monkeypatch.setattr(collection, 'open_url', mock_open) def test_build_requirement_from_name_multiple_version_results(galaxy_server, monkeypatch):
mock_get_versions = MagicMock()
mock_avail_ver = MagicMock() mock_get_versions.return_value = ['2.0.0', '2.0.1', '2.0.2', '2.0.3', '2.0.4', '2.0.5']
mock_avail_ver.return_value = {'v2': '/api/v2'} monkeypatch.setattr(galaxy_server, 'get_collection_versions', mock_get_versions)
monkeypatch.setattr(collection, 'get_available_api_versions', mock_avail_ver)
actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '!=2.0.2', actual = collection.CollectionRequirement.from_name('namespace.collection', [galaxy_server], '!=2.0.2',
True, True) True, True)
@ -626,13 +476,8 @@ def test_build_requirement_from_name_multiple_version_results(galaxy_server, mon
assert actual.latest_version == u'2.0.5' assert actual.latest_version == u'2.0.5'
assert actual.dependencies is None assert actual.dependencies is None
assert mock_open.call_count == 2 assert mock_get_versions.call_count == 1
assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" \ assert mock_get_versions.mock_calls[0][1] == ('namespace', 'collection')
% galaxy_server.api_server
assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
assert mock_open.mock_calls[1][1][0] == u"%s/api/v2/collections/namespace/collection/versions/?page=2" \
% galaxy_server.api_server
assert mock_open.mock_calls[1][2] == {'validate_certs': True, "headers": {}}
@pytest.mark.parametrize('versions, requirement, expected_filter, expected_latest', [ @pytest.mark.parametrize('versions, requirement, expected_filter, expected_latest', [
@ -767,14 +612,10 @@ def test_install_collection_with_download(galaxy_server, collection_artifact, mo
temp_path = os.path.join(os.path.split(collection_tar)[0], b'temp') temp_path = os.path.join(os.path.split(collection_tar)[0], b'temp')
os.makedirs(temp_path) os.makedirs(temp_path)
meta = api.CollectionVersionMetadata('ansible_namespace', 'collection', '0.1.0', 'https://downloadme.com',
'myhash', {})
req = collection.CollectionRequirement('ansible_namespace', 'collection', None, galaxy_server, req = collection.CollectionRequirement('ansible_namespace', 'collection', None, galaxy_server,
['0.1.0'], '*', False) ['0.1.0'], '*', False, metadata=meta)
req._galaxy_info = {
'download_url': 'https://downloadme.com',
'artifact': {
'sha256': 'myhash',
},
}
req.install(to_text(output_path), temp_path) req.install(to_text(output_path), temp_path)
# Ensure the temp directory is empty, nothing is left behind # Ensure the temp directory is empty, nothing is left behind

Loading…
Cancel
Save