mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
548 lines
23 KiB
Python
548 lines
23 KiB
Python
# (C) 2013, James Cammarata <jcammarata@ansible.com>
|
|
# Copyright: (c) 2019, Ansible Project
|
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import json
|
|
import os
|
|
import tarfile
|
|
import uuid
|
|
import time
|
|
|
|
from ansible import context
|
|
from ansible.errors import AnsibleError
|
|
from ansible.module_utils.six import string_types
|
|
from ansible.module_utils.six.moves.urllib.error import HTTPError
|
|
from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode
|
|
from ansible.module_utils._text import to_bytes, to_native, to_text
|
|
from ansible.module_utils.urls import open_url
|
|
from ansible.utils.display import Display
|
|
from ansible.utils.hashing import secure_hash_s
|
|
|
|
display = Display()
|
|
|
|
|
|
def g_connect(versions):
|
|
"""
|
|
Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the
|
|
endpoint.
|
|
|
|
:param versions: A list of API versions that the function supports.
|
|
"""
|
|
def decorator(method):
|
|
def wrapped(self, *args, **kwargs):
|
|
if not self._available_api_versions:
|
|
display.vvvv("Initial connection to galaxy_server: %s" % self.api_server)
|
|
|
|
# Determine the type of Galaxy server we are talking to. First try it unauthenticated then with Bearer
|
|
# auth for Automation Hub.
|
|
n_url = self.api_server
|
|
error_context_msg = 'Error when finding available api versions from %s (%s)' % (self.name, n_url)
|
|
|
|
if self.api_server == 'https://galaxy.ansible.com' or self.api_server == 'https://galaxy.ansible.com/':
|
|
n_url = 'https://galaxy.ansible.com/api/'
|
|
|
|
try:
|
|
data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg)
|
|
except (AnsibleError, GalaxyError, ValueError, KeyError):
|
|
# Either the URL doesnt exist, or other error. Or the URL exists, but isn't a galaxy API
|
|
# root (not JSON, no 'available_versions') so try appending '/api/'
|
|
n_url = _urljoin(n_url, '/api/')
|
|
|
|
# let exceptions here bubble up
|
|
data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg)
|
|
if 'available_versions' not in data:
|
|
raise AnsibleError("Tried to find galaxy API root at %s but no 'available_versions' are available on %s"
|
|
% (n_url, self.api_server))
|
|
|
|
# Default to only supporting v1, if only v1 is returned we also assume that v2 is available even though
|
|
# it isn't returned in the available_versions dict.
|
|
available_versions = data.get('available_versions', {u'v1': u'v1/'})
|
|
if list(available_versions.keys()) == [u'v1']:
|
|
available_versions[u'v2'] = u'v2/'
|
|
|
|
self._available_api_versions = available_versions
|
|
display.vvvv("Found API version '%s' with Galaxy server %s (%s)"
|
|
% (', '.join(available_versions.keys()), self.name, self.api_server))
|
|
|
|
# Verify that the API versions the function works with are available on the server specified.
|
|
available_versions = set(self._available_api_versions.keys())
|
|
common_versions = set(versions).intersection(available_versions)
|
|
if not common_versions:
|
|
raise AnsibleError("Galaxy action %s requires API versions '%s' but only '%s' are available on %s %s"
|
|
% (method.__name__, ", ".join(versions), ", ".join(available_versions),
|
|
self.name, self.api_server))
|
|
|
|
return method(self, *args, **kwargs)
|
|
return wrapped
|
|
return decorator
|
|
|
|
|
|
def _urljoin(*args):
|
|
return '/'.join(to_native(a, errors='surrogate_or_strict').strip('/') for a in args + ('',) if a)
|
|
|
|
|
|
class GalaxyError(AnsibleError):
|
|
""" Error for bad Galaxy server responses. """
|
|
|
|
def __init__(self, http_error, message):
|
|
super(GalaxyError, self).__init__(message)
|
|
self.http_code = http_error.code
|
|
self.url = http_error.geturl()
|
|
|
|
try:
|
|
http_msg = to_text(http_error.read())
|
|
err_info = json.loads(http_msg)
|
|
except (AttributeError, ValueError):
|
|
err_info = {}
|
|
|
|
url_split = self.url.split('/')
|
|
if 'v2' in url_split:
|
|
galaxy_msg = err_info.get('message', 'Unknown error returned by Galaxy server.')
|
|
code = err_info.get('code', 'Unknown')
|
|
full_error_msg = u"%s (HTTP Code: %d, Message: %s Code: %s)" % (message, self.http_code, galaxy_msg, code)
|
|
elif 'v3' in url_split:
|
|
errors = err_info.get('errors', [])
|
|
if not errors:
|
|
errors = [{}] # Defaults are set below, we just need to make sure 1 error is present.
|
|
|
|
message_lines = []
|
|
for error in errors:
|
|
error_msg = error.get('detail') or error.get('title') or 'Unknown error returned by Galaxy server.'
|
|
error_code = error.get('code') or 'Unknown'
|
|
message_line = u"(HTTP Code: %d, Message: %s Code: %s)" % (self.http_code, error_msg, error_code)
|
|
message_lines.append(message_line)
|
|
|
|
full_error_msg = "%s %s" % (message, ', '.join(message_lines))
|
|
else:
|
|
# v1 and unknown API endpoints
|
|
galaxy_msg = err_info.get('default', 'Unknown error returned by Galaxy server.')
|
|
full_error_msg = u"%s (HTTP Code: %d, Message: %s)" % (message, self.http_code, galaxy_msg)
|
|
|
|
self.message = to_native(full_error_msg)
|
|
|
|
|
|
class CollectionVersionMetadata:
|
|
|
|
def __init__(self, namespace, name, version, download_url, artifact_sha256, dependencies):
|
|
"""
|
|
Contains common information about a collection on a Galaxy server to smooth through API differences for
|
|
Collection and define a standard meta info for a collection.
|
|
|
|
:param namespace: The namespace name.
|
|
:param name: The collection name.
|
|
:param version: The version that the metadata refers to.
|
|
:param download_url: The URL to download the collection.
|
|
:param artifact_sha256: The SHA256 of the collection artifact for later verification.
|
|
:param dependencies: A dict of dependencies of the collection.
|
|
"""
|
|
self.namespace = namespace
|
|
self.name = name
|
|
self.version = version
|
|
self.download_url = download_url
|
|
self.artifact_sha256 = artifact_sha256
|
|
self.dependencies = dependencies
|
|
|
|
|
|
class GalaxyAPI:
|
|
""" This class is meant to be used as a API client for an Ansible Galaxy server """
|
|
|
|
def __init__(self, galaxy, name, url, username=None, password=None, token=None):
|
|
self.galaxy = galaxy
|
|
self.name = name
|
|
self.username = username
|
|
self.password = password
|
|
self.token = token
|
|
self.api_server = url
|
|
self.validate_certs = not context.CLIARGS['ignore_certs']
|
|
self._available_api_versions = {}
|
|
|
|
display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs))
|
|
|
|
@property
|
|
@g_connect(['v1', 'v2', 'v3'])
|
|
def available_api_versions(self):
|
|
# Calling g_connect will populate self._available_api_versions
|
|
return self._available_api_versions
|
|
|
|
def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None):
|
|
headers = headers or {}
|
|
self._add_auth_token(headers, url, required=auth_required)
|
|
|
|
try:
|
|
display.vvvv("Calling Galaxy at %s" % url)
|
|
resp = open_url(to_native(url), data=args, validate_certs=self.validate_certs, headers=headers,
|
|
method=method, timeout=20)
|
|
except HTTPError as e:
|
|
raise GalaxyError(e, error_context_msg)
|
|
except Exception as e:
|
|
raise AnsibleError("Unknown error when attempting to call Galaxy at '%s': %s" % (url, to_native(e)))
|
|
|
|
resp_data = to_text(resp.read(), errors='surrogate_or_strict')
|
|
try:
|
|
data = json.loads(resp_data)
|
|
except ValueError:
|
|
raise AnsibleError("Failed to parse Galaxy response from '%s' as JSON:\n%s"
|
|
% (resp.url, to_native(resp_data)))
|
|
|
|
return data
|
|
|
|
def _add_auth_token(self, headers, url, token_type=None, required=False):
|
|
# Don't add the auth token if one is already present
|
|
if 'Authorization' in headers:
|
|
return
|
|
|
|
if not self.token and required:
|
|
raise AnsibleError("No access token or username set. A token can be set with --api-key, with "
|
|
"'ansible-galaxy login', or set in ansible.cfg.")
|
|
|
|
if self.token:
|
|
headers.update(self.token.headers())
|
|
|
|
@g_connect(['v1'])
|
|
def authenticate(self, github_token):
|
|
"""
|
|
Retrieve an authentication token
|
|
"""
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "tokens") + '/'
|
|
args = urlencode({"github_token": github_token})
|
|
resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST")
|
|
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
|
|
return data
|
|
|
|
@g_connect(['v1'])
|
|
def create_import_task(self, github_user, github_repo, reference=None, role_name=None):
|
|
"""
|
|
Post an import request
|
|
"""
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports") + '/'
|
|
args = {
|
|
"github_user": github_user,
|
|
"github_repo": github_repo,
|
|
"github_reference": reference if reference else ""
|
|
}
|
|
if role_name:
|
|
args['alternate_role_name'] = role_name
|
|
elif github_repo.startswith('ansible-role'):
|
|
args['alternate_role_name'] = github_repo[len('ansible-role') + 1:]
|
|
data = self._call_galaxy(url, args=urlencode(args), method="POST")
|
|
if data.get('results', None):
|
|
return data['results']
|
|
return data
|
|
|
|
@g_connect(['v1'])
|
|
def get_import_task(self, task_id=None, github_user=None, github_repo=None):
|
|
"""
|
|
Check the status of an import task.
|
|
"""
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports")
|
|
if task_id is not None:
|
|
url = "%s?id=%d" % (url, task_id)
|
|
elif github_user is not None and github_repo is not None:
|
|
url = "%s?github_user=%s&github_repo=%s" % (url, github_user, github_repo)
|
|
else:
|
|
raise AnsibleError("Expected task_id or github_user and github_repo")
|
|
|
|
data = self._call_galaxy(url)
|
|
return data['results']
|
|
|
|
@g_connect(['v1'])
|
|
def lookup_role_by_name(self, role_name, notify=True):
|
|
"""
|
|
Find a role by name.
|
|
"""
|
|
role_name = to_text(urlquote(to_bytes(role_name)))
|
|
|
|
try:
|
|
parts = role_name.split(".")
|
|
user_name = ".".join(parts[0:-1])
|
|
role_name = parts[-1]
|
|
if notify:
|
|
display.display("- downloading role '%s', owned by %s" % (role_name, user_name))
|
|
except Exception:
|
|
raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)
|
|
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles",
|
|
"?owner__username=%s&name=%s" % (user_name, role_name))
|
|
data = self._call_galaxy(url)
|
|
if len(data["results"]) != 0:
|
|
return data["results"][0]
|
|
return None
|
|
|
|
@g_connect(['v1'])
|
|
def fetch_role_related(self, related, role_id):
|
|
"""
|
|
Fetch the list of related items for the given role.
|
|
The url comes from the 'related' field of the role.
|
|
"""
|
|
|
|
results = []
|
|
try:
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles", role_id, related,
|
|
"?page_size=50")
|
|
data = self._call_galaxy(url)
|
|
results = data['results']
|
|
done = (data.get('next_link', None) is None)
|
|
while not done:
|
|
url = _urljoin(self.api_server, data['next_link'])
|
|
data = self._call_galaxy(url)
|
|
results += data['results']
|
|
done = (data.get('next_link', None) is None)
|
|
except Exception as e:
|
|
display.vvvv("Unable to retrive role (id=%s) data (%s), but this is not fatal so we continue: %s"
|
|
% (role_id, related, to_text(e)))
|
|
return results
|
|
|
|
@g_connect(['v1'])
|
|
def get_list(self, what):
|
|
"""
|
|
Fetch the list of items specified.
|
|
"""
|
|
try:
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], what, "?page_size")
|
|
data = self._call_galaxy(url)
|
|
if "results" in data:
|
|
results = data['results']
|
|
else:
|
|
results = data
|
|
done = True
|
|
if "next" in data:
|
|
done = (data.get('next_link', None) is None)
|
|
while not done:
|
|
url = _urljoin(self.api_server, data['next_link'])
|
|
data = self._call_galaxy(url)
|
|
results += data['results']
|
|
done = (data.get('next_link', None) is None)
|
|
return results
|
|
except Exception as error:
|
|
raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error)))
|
|
|
|
@g_connect(['v1'])
|
|
def search_roles(self, search, **kwargs):
|
|
|
|
search_url = _urljoin(self.api_server, self.available_api_versions['v1'], "search", "roles", "?")
|
|
|
|
if search:
|
|
search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search)))
|
|
|
|
tags = kwargs.get('tags', None)
|
|
platforms = kwargs.get('platforms', None)
|
|
page_size = kwargs.get('page_size', None)
|
|
author = kwargs.get('author', None)
|
|
|
|
if tags and isinstance(tags, string_types):
|
|
tags = tags.split(',')
|
|
search_url += '&tags_autocomplete=' + '+'.join(tags)
|
|
|
|
if platforms and isinstance(platforms, string_types):
|
|
platforms = platforms.split(',')
|
|
search_url += '&platforms_autocomplete=' + '+'.join(platforms)
|
|
|
|
if page_size:
|
|
search_url += '&page_size=%s' % page_size
|
|
|
|
if author:
|
|
search_url += '&username_autocomplete=%s' % author
|
|
|
|
data = self._call_galaxy(search_url)
|
|
return data
|
|
|
|
@g_connect(['v1'])
|
|
def add_secret(self, source, github_user, github_repo, secret):
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets") + '/'
|
|
args = urlencode({
|
|
"source": source,
|
|
"github_user": github_user,
|
|
"github_repo": github_repo,
|
|
"secret": secret
|
|
})
|
|
data = self._call_galaxy(url, args=args, method="POST")
|
|
return data
|
|
|
|
@g_connect(['v1'])
|
|
def list_secrets(self):
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets")
|
|
data = self._call_galaxy(url, auth_required=True)
|
|
return data
|
|
|
|
@g_connect(['v1'])
|
|
def remove_secret(self, secret_id):
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets", secret_id) + '/'
|
|
data = self._call_galaxy(url, auth_required=True, method='DELETE')
|
|
return data
|
|
|
|
@g_connect(['v1'])
|
|
def delete_role(self, github_user, github_repo):
|
|
url = _urljoin(self.api_server, self.available_api_versions['v1'], "removerole",
|
|
"?github_user=%s&github_repo=%s" % (github_user, github_repo))
|
|
data = self._call_galaxy(url, auth_required=True, method='DELETE')
|
|
return data
|
|
|
|
# Collection APIs #
|
|
|
|
@g_connect(['v2', 'v3'])
|
|
def publish_collection(self, collection_path):
|
|
"""
|
|
Publishes a collection to a Galaxy server and returns the import task URI.
|
|
|
|
:param collection_path: The path to the collection tarball to publish.
|
|
:return: The import task URI that contains the import results.
|
|
"""
|
|
display.display("Publishing collection artifact '%s' to %s %s" % (collection_path, self.name, self.api_server))
|
|
|
|
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
|
|
if not os.path.exists(b_collection_path):
|
|
raise AnsibleError("The collection path specified '%s' does not exist." % to_native(collection_path))
|
|
elif not tarfile.is_tarfile(b_collection_path):
|
|
raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection "
|
|
"build' to create a proper release artifact." % to_native(collection_path))
|
|
|
|
with open(b_collection_path, 'rb') as collection_tar:
|
|
data = collection_tar.read()
|
|
|
|
boundary = '--------------------------%s' % uuid.uuid4().hex
|
|
b_file_name = os.path.basename(b_collection_path)
|
|
part_boundary = b"--" + to_bytes(boundary, errors='surrogate_or_strict')
|
|
|
|
form = [
|
|
part_boundary,
|
|
b"Content-Disposition: form-data; name=\"sha256\"",
|
|
to_bytes(secure_hash_s(data), errors='surrogate_or_strict'),
|
|
part_boundary,
|
|
b"Content-Disposition: file; name=\"file\"; filename=\"%s\"" % b_file_name,
|
|
b"Content-Type: application/octet-stream",
|
|
b"",
|
|
data,
|
|
b"%s--" % part_boundary,
|
|
]
|
|
data = b"\r\n".join(form)
|
|
|
|
headers = {
|
|
'Content-type': 'multipart/form-data; boundary=%s' % boundary,
|
|
'Content-length': len(data),
|
|
}
|
|
|
|
if 'v3' in self.available_api_versions:
|
|
n_url = _urljoin(self.api_server, self.available_api_versions['v3'], 'artifacts', 'collections') + '/'
|
|
else:
|
|
n_url = _urljoin(self.api_server, self.available_api_versions['v2'], 'collections') + '/'
|
|
|
|
resp = self._call_galaxy(n_url, args=data, headers=headers, method='POST', auth_required=True,
|
|
error_context_msg='Error when publishing collection to %s (%s)'
|
|
% (self.name, self.api_server))
|
|
return resp['task']
|
|
|
|
@g_connect(['v2', 'v3'])
|
|
def wait_import_task(self, task_url, timeout=0):
|
|
"""
|
|
Waits until the import process on the Galaxy server has completed or the timeout is reached.
|
|
|
|
:param task_url: The full URI of the import task to wait for, this is returned by publish_collection.
|
|
:param timeout: The timeout in seconds, 0 is no timeout.
|
|
"""
|
|
# TODO: actually verify that v3 returns the same structure as v2, right now this is just an assumption.
|
|
state = 'waiting'
|
|
data = None
|
|
|
|
display.display("Waiting until Galaxy import task %s has completed" % task_url)
|
|
start = time.time()
|
|
wait = 2
|
|
|
|
while timeout == 0 or (time.time() - start) < timeout:
|
|
data = self._call_galaxy(task_url, method='GET', auth_required=True,
|
|
error_context_msg='Error when getting import task results at %s' % task_url)
|
|
|
|
state = data.get('state', 'waiting')
|
|
|
|
if data.get('finished_at', None):
|
|
break
|
|
|
|
display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again'
|
|
% (state, wait))
|
|
time.sleep(wait)
|
|
|
|
# poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds.
|
|
wait = min(30, wait * 1.5)
|
|
if state == 'waiting':
|
|
raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'"
|
|
% to_native(task_url))
|
|
|
|
for message in data.get('messages', []):
|
|
level = message['level']
|
|
if level == 'error':
|
|
display.error("Galaxy import error message: %s" % message['message'])
|
|
elif level == 'warning':
|
|
display.warning("Galaxy import warning message: %s" % message['message'])
|
|
else:
|
|
display.vvv("Galaxy import message: %s - %s" % (level, message['message']))
|
|
|
|
if state == 'failed':
|
|
code = to_native(data['error'].get('code', 'UNKNOWN'))
|
|
description = to_native(
|
|
data['error'].get('description', "Unknown error, see %s for more details" % task_url))
|
|
raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code))
|
|
|
|
@g_connect(['v2', 'v3'])
|
|
def get_collection_version_metadata(self, namespace, name, version):
|
|
"""
|
|
Gets the collection information from the Galaxy server about a specific Collection version.
|
|
|
|
:param namespace: The collection namespace.
|
|
:param name: The collection name.
|
|
:param version: Optional version of the collection to get the information for.
|
|
:return: CollectionVersionMetadata about the collection at the version requested.
|
|
"""
|
|
api_path = self.available_api_versions.get('v3', self.available_api_versions.get('v2'))
|
|
url_paths = [self.api_server, api_path, 'collections', namespace, name, 'versions', version]
|
|
|
|
n_collection_url = _urljoin(*url_paths)
|
|
error_context_msg = 'Error when getting collection version metadata for %s.%s:%s from %s (%s)' \
|
|
% (namespace, name, version, self.name, self.api_server)
|
|
data = self._call_galaxy(n_collection_url, error_context_msg=error_context_msg)
|
|
|
|
return CollectionVersionMetadata(data['namespace']['name'], data['collection']['name'], data['version'],
|
|
data['download_url'], data['artifact']['sha256'],
|
|
data['metadata']['dependencies'])
|
|
|
|
@g_connect(['v2', 'v3'])
|
|
def get_collection_versions(self, namespace, name):
|
|
"""
|
|
Gets a list of available versions for a collection on a Galaxy server.
|
|
|
|
:param namespace: The collection namespace.
|
|
:param name: The collection name.
|
|
:return: A list of versions that are available.
|
|
"""
|
|
if 'v3' in self.available_api_versions:
|
|
api_path = self.available_api_versions['v3']
|
|
results_key = 'data'
|
|
pagination_path = ['links', 'next']
|
|
else:
|
|
api_path = self.available_api_versions['v2']
|
|
results_key = 'results'
|
|
pagination_path = ['next']
|
|
|
|
n_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions')
|
|
|
|
error_context_msg = 'Error when getting available collection versions for %s.%s from %s (%s)' \
|
|
% (namespace, name, self.name, self.api_server)
|
|
data = self._call_galaxy(n_url, error_context_msg=error_context_msg)
|
|
|
|
versions = []
|
|
while True:
|
|
versions += [v['version'] for v in data[results_key]]
|
|
|
|
next_link = data
|
|
for path in pagination_path:
|
|
next_link = next_link.get(path, {})
|
|
|
|
if not next_link:
|
|
break
|
|
|
|
data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'),
|
|
error_context_msg=error_context_msg)
|
|
|
|
return versions
|