ansible-galaxy - define multiple galaxy instances in ansible.cfg (#60553)

* ansible-galaxy: support multiple servers on install

* Added docs for the server configuration file

* Fix up doc string for requirements file format

* Fix bugs after testing

* Fix kwarg doc and added version

* Fix typo and doc improvement

* Fix base64 encoding and allow --server to override list
pull/60986/head
Jordan Borean 5 years ago committed by GitHub
parent 4d424d0830
commit e747487720

@ -381,6 +381,74 @@ You can also setup a ``requirements.yml`` file to install multiple collections i
The ``version`` key can take in the same range identifier format documented above.

Roles can also be specified and placed under the ``roles`` key. The values follow the same format as a requirements
file used in older Ansible releases.

.. note::
    While both roles and collections can be specified in one requirements file, they need to be installed separately.
    ``ansible-galaxy role install -r requirements.yml`` will only install roles and
    ``ansible-galaxy collection install -r requirements.yml -p ./`` will only install collections.
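A quick way to see how one combined file splits into the two sets of content is sketched below; this is plain
PyYAML rather than ``ansible-galaxy`` itself, and the role and collection names are only illustrative.

.. code-block:: python

    import yaml  # requires PyYAML

    requirements_yml = (
        "roles:\n"
        "  - name: geerlingguy.java\n"
        "collections:\n"
        "  - name: my_namespace.my_collection\n"
        "    version: '>=1.0.0,<2.0.0'\n"
    )

    data = yaml.safe_load(requirements_yml)
    # Handled by 'ansible-galaxy role install -r requirements.yml'
    print(data["roles"])
    # Handled by 'ansible-galaxy collection install -r requirements.yml'
    print(data["collections"])
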

.. _galaxy_server_config:

Galaxy server configuration list
--------------------------------

By default, running ``ansible-galaxy`` will use the :ref:`galaxy_server` config value or the ``--server`` command line
argument when it performs an action against a Galaxy server. ``ansible-galaxy collection install`` supports
installing collections from multiple servers as defined in the :ref:`ansible_configuration_settings_locations` file
using the :ref:`galaxy_server_list` configuration option. To define multiple Galaxy servers, create entries like
the following:

.. code-block:: ini

    [galaxy]
    server_list = my_org_hub, release_galaxy, test_galaxy

    [galaxy_server.my_org_hub]
    url=https://automation.my_org/
    username=my_user
    password=my_pass

    [galaxy_server.release_galaxy]
    url=https://galaxy.ansible.com/
    token=my_token

    [galaxy_server.test_galaxy]
    url=https://galaxy-dev.ansible.com/
    token=my_token

.. note::
    You can use the ``--server`` command line argument to select an explicit Galaxy server in the ``server_list``;
    the value of this argument should match the name of the server. If the value of ``--server`` is not a
    pre-defined server in ``ansible.cfg``, then the value specified is used as the URL of the server to access and
    all pre-defined servers are ignored. The ``--api-key`` argument is not applied to any of the pre-defined
    servers; it is only applied if no server list is defined or a URL was specified by ``--server``.
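
The precedence described above can be summarized with a rough sketch (plain Python, not the actual
``ansible-galaxy`` code; the function and variable names are made up for illustration):

.. code-block:: python

    def pick_servers(configured_servers, cmd_server, default_url):
        """Return the server definitions the install will query, in order."""
        if cmd_server:
            # --server matching a configured name selects only that entry ...
            named = [s for s in configured_servers if s["name"] == cmd_server]
            if named:
                return named
            # ... otherwise it is treated as a plain URL and the list is ignored.
            return [{"name": "cmd_arg", "url": cmd_server}]
        # No --server: use the configured list, or fall back to the single default server.
        return configured_servers or [{"name": "default", "url": default_url}]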

The :ref:`galaxy_server_list` option is a list of server identifiers in a prioritized order. When searching for a
collection, the install process searches the servers in that order, for example ``my_org_hub`` first, then
``release_galaxy``, and finally ``test_galaxy``, until the collection is found. The actual Galaxy instance is then
defined under the section ``[galaxy_server.{{ id }}]`` where ``{{ id }}`` is the server identifier defined in the
list. This section can then define the following keys:

* ``url``: The URL of the Galaxy instance to connect to. This is required.
* ``token``: A token key to use for authentication against the Galaxy instance. This is mutually exclusive with ``username``.
* ``username``: The username to use for basic authentication against the Galaxy instance. This is mutually exclusive with ``token``.
* ``password``: The password to use for basic authentication.

As well as being defined in the ``ansible.cfg`` file, these server options can be defined as environment variables.
The environment variable takes the form ``ANSIBLE_GALAXY_SERVER_{{ id }}_{{ key }}`` where ``{{ id }}`` is the upper
case form of the server identifier and ``{{ key }}`` is the key to define. For example, you can define ``token`` for
``release_galaxy`` by setting ``ANSIBLE_GALAXY_SERVER_RELEASE_GALAXY_TOKEN=secret_token``.
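
The mapping from server identifier and key to the environment variable name is simply upper-casing and joining
with underscores; a minimal sketch of that scheme (illustrative only, not the implementation):

.. code-block:: python

    def galaxy_server_env_var(server_id, key):
        # galaxy_server_env_var('release_galaxy', 'token')
        # -> 'ANSIBLE_GALAXY_SERVER_RELEASE_GALAXY_TOKEN'
        return "ANSIBLE_GALAXY_SERVER_%s_%s" % (server_id.upper(), key.upper())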

For operations where only one Galaxy server is used, such as ``publish``, ``info``, or ``login``, the first entry in
the ``server_list`` is used unless an explicit server was passed in as a command line argument.

.. note::
    Once a collection is found, any of its requirements are only searched within the same Galaxy instance as the
    parent collection. The install process will not search for a collection requirement in a different Galaxy
    instance.

Using collections
=================

@ -25,7 +25,7 @@ Playbook
Command Line
============

-No notable changes
+* The location of the Galaxy token file has changed from ``~/.ansible_galaxy`` to ``~/.ansible/galaxy_token``. You can configure both path and file name with the :ref:`galaxy_token_path` config.

Deprecated

@ -13,6 +13,7 @@ import time
import yaml
from jinja2 import BaseLoader, Environment, FileSystemLoader
from yaml.error import YAMLError
import ansible.constants as C
from ansible import context
@ -21,13 +22,14 @@ from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.galaxy import Galaxy, get_collections_galaxy_meta_info
from ansible.galaxy.api import GalaxyAPI
-from ansible.galaxy.collection import build_collection, install_collections, parse_collections_requirements_file, \
-    publish_collection, validate_collection_name
+from ansible.galaxy.collection import build_collection, install_collections, publish_collection, \
+    validate_collection_name
from ansible.galaxy.login import GalaxyLogin
from ansible.galaxy.role import GalaxyRole
-from ansible.galaxy.token import GalaxyToken
+from ansible.galaxy.token import GalaxyToken, NoTokenSentinel
from ansible.module_utils.ansible_release import __version__ as ansible_version
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.playbook.role.requirement import RoleRequirement
from ansible.utils.display import Display
from ansible.utils.plugin_docs import get_versioned_doclink
@ -48,7 +50,7 @@ class GalaxyCLI(CLI):
idx = 2 if args[1].startswith('-v') else 1
args.insert(idx, 'role')
-self.api = None
+self.api_servers = []
self.galaxy = None
super(GalaxyCLI, self).__init__(args)
@ -61,8 +63,11 @@ class GalaxyCLI(CLI):
# Common arguments that apply to more than 1 action
common = opt_help.argparse.ArgumentParser(add_help=False)
-common.add_argument('-s', '--server', dest='api_server', default=C.GALAXY_SERVER,
-                    help='The Galaxy API server URL')
+common.add_argument('-s', '--server', dest='api_server', help='The Galaxy API server URL')
+common.add_argument('--api-key', dest='api_key',
+                    help='The Ansible Galaxy API key which can be found at '
+                    'https://galaxy.ansible.com/me/preferences. You can also use ansible-galaxy login to '
+                    'retrieve this key or set the token for the GALAXY_SERVER_LIST entry.')
common.add_argument('-c', '--ignore-certs', action='store_true', dest='ignore_certs',
default=C.GALAXY_IGNORE_CERTS, help='Ignore SSL certificate validation errors.')
opt_help.add_verbosity_options(common)
@ -278,10 +283,6 @@ class GalaxyCLI(CLI):
publish_parser.add_argument('args', metavar='collection_path',
help='The path to the collection tarball to publish.')
-publish_parser.add_argument('--api-key', dest='api_key',
-                            help='The Ansible Galaxy API key which can be found at '
-                            'https://galaxy.ansible.com/me/preferences. You can also use ansible-galaxy '
-                            'login to retrieve this key.')
publish_parser.add_argument('--no-wait', dest='wait', action='store_false', default=True,
help="Don't wait for import validation results.")
@ -296,9 +297,166 @@ class GalaxyCLI(CLI):
self.galaxy = Galaxy()
-self.api = GalaxyAPI(self.galaxy)
+def server_config_def(section, key, required):
return {
'description': 'The %s of the %s Galaxy server' % (key, section),
'ini': [
{
'section': 'galaxy_server.%s' % section,
'key': key,
}
],
'environment': [
{'name': 'ANSIBLE_GALAXY_SERVER_%s_%s' % (section.upper(), key.upper())},
],
'required': required,
}
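# Illustration only (not part of the change): for a server named 'release_galaxy',
# server_config_def('release_galaxy', 'token', False) produces a definition that reads the
# ini entry [galaxy_server.release_galaxy] token=... and the environment variable
# ANSIBLE_GALAXY_SERVER_RELEASE_GALAXY_TOKEN.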
server_def = [('url', True), ('username', False), ('password', False), ('token', False)]
config_servers = []
for server_key in (C.GALAXY_SERVER_LIST or []):
# Config definitions are looked up dynamically based on the C.GALAXY_SERVER_LIST entry. We look up the
# section [galaxy_server.<server>] for the values url, username, password, and token.
config_dict = dict((k, server_config_def(server_key, k, req)) for k, req in server_def)
defs = AnsibleLoader(yaml.safe_dump(config_dict)).get_single_data()
C.config.initialize_plugin_configuration_definitions('galaxy_server', server_key, defs)
server_options = C.config.get_plugin_options('galaxy_server', server_key)
token_val = server_options['token'] or NoTokenSentinel
server_options['token'] = GalaxyToken(token=token_val)
config_servers.append(GalaxyAPI(self.galaxy, server_key, **server_options))
cmd_server = context.CLIARGS['api_server']
cmd_token = GalaxyToken(token=context.CLIARGS['api_key'])
if cmd_server:
# Cmd args take precedence over the config entry but first check if the arg was a name and use that config
# entry, otherwise create a new API entry for the server specified.
config_server = next((s for s in config_servers if s.name == cmd_server), None)
if config_server:
self.api_servers.append(config_server)
else:
self.api_servers.append(GalaxyAPI(self.galaxy, 'cmd_arg', cmd_server, token=cmd_token))
else:
self.api_servers = config_servers
# Default to C.GALAXY_SERVER if no servers were defined
if len(self.api_servers) == 0:
self.api_servers.append(GalaxyAPI(self.galaxy, 'default', C.GALAXY_SERVER, token=cmd_token))
context.CLIARGS['func']()
@property
def api(self):
return self.api_servers[0]
def _parse_requirements_file(self, requirements_file, allow_old_format=True):
"""
Parses an Ansible requirement.yml file and returns all the roles and/or collections defined in it. There are two
requirements file formats:
# v1 (roles only)
- src: The source of the role, required if include is not set. Can be Galaxy role name, URL to a SCM repo or tarball.
name: Downloads the role to the specified name, defaults to Galaxy name from Galaxy or name of repo if src is a URL.
scm: If src is a URL, specify the SCM. Only git or hg are supported and defaults to git.
version: The version of the role to download. Can also be tag, commit, or branch name and defaults to master.
include: Path to additional requirements.yml files.
# v2 (roles and collections)
---
roles:
# Same as v1 format just under the roles key
collections:
- namespace.collection
- name: namespace.collection
version: version identifier, multiple identifiers are separated by ','
source: the URL or a predefined source name that relates to C.GALAXY_SERVER_LIST
:param requirements_file: The path to the requirements file.
:param allow_old_format: Will fail if a v1 requirements file is found and this is set to False.
:return: a dict containing the roles and collections found in the requirements file.
"""
requirements = {
'roles': [],
'collections': [],
}
b_requirements_file = to_bytes(requirements_file, errors='surrogate_or_strict')
if not os.path.exists(b_requirements_file):
raise AnsibleError("The requirements file '%s' does not exist." % to_native(requirements_file))
display.vvv("Reading requirement file at '%s'" % requirements_file)
with open(b_requirements_file, 'rb') as req_obj:
try:
file_requirements = yaml.safe_load(req_obj)
except YAMLError as err:
raise AnsibleError(
"Failed to parse the requirements yml at '%s' with the following error:\n%s"
% (to_native(requirements_file), to_native(err)))
if file_requirements is None:
raise AnsibleError("No requirements found in file '%s'" % to_native(requirements_file))
def parse_role_req(requirement):
if "include" not in requirement:
role = RoleRequirement.role_yaml_parse(requirement)
display.vvv("found role %s in yaml file" % to_text(role))
if "name" not in role and "src" not in role:
raise AnsibleError("Must specify name or src for role")
return [GalaxyRole(self.galaxy, **role)]
else:
b_include_path = to_bytes(requirement["include"], errors="surrogate_or_strict")
if not os.path.isfile(b_include_path):
raise AnsibleError("Failed to find include requirements file '%s' in '%s'"
% (to_native(b_include_path), to_native(requirements_file)))
with open(b_include_path, 'rb') as f_include:
try:
return [GalaxyRole(self.galaxy, **r) for r in
(RoleRequirement.role_yaml_parse(i) for i in yaml.safe_load(f_include))]
except Exception as e:
raise AnsibleError("Unable to load data from include requirements file: %s %s"
% (to_native(requirements_file), to_native(e)))
if isinstance(file_requirements, list):
# Older format that contains only roles
if not allow_old_format:
raise AnsibleError("Expecting requirements file to be a dict with the key 'collections' that contains "
"a list of collections to install")
for role_req in file_requirements:
requirements['roles'] += parse_role_req(role_req)
else:
# Newer format with a collections and/or roles key
extra_keys = set(file_requirements.keys()).difference(set(['roles', 'collections']))
if extra_keys:
raise AnsibleError("Expecting only 'roles' and/or 'collections' as base keys in the requirements "
"file. Found: %s" % (to_native(", ".join(extra_keys))))
for role_req in file_requirements.get('roles', []):
requirements['roles'] += parse_role_req(role_req)
for collection_req in file_requirements.get('collections', []):
if isinstance(collection_req, dict):
req_name = collection_req.get('name', None)
if req_name is None:
raise AnsibleError("Collections requirement entry should contain the key name.")
req_version = collection_req.get('version', '*')
req_source = collection_req.get('source', None)
if req_source:
# Try and match up the requirement source with our list of Galaxy API servers defined in the
# config, otherwise create a server with that URL without any auth.
req_source = next(iter([a for a in self.api_servers if req_source in [a.name, a.api_server]]),
GalaxyAPI(self.galaxy, "explicit_requirement_%s" % req_name, req_source))
requirements['collections'].append((req_name, req_version, req_source))
else:
requirements['collections'].append((collection_req, '*', None))
return requirements
@staticmethod
def exit_without_ignore(rc=1):
"""
@ -605,8 +763,6 @@ class GalaxyCLI(CLI):
collections = context.CLIARGS['args']
force = context.CLIARGS['force']
output_path = context.CLIARGS['collections_path']
-# TODO: use a list of server that have been configured in ~/.ansible_galaxy
-servers = [context.CLIARGS['api_server']]
ignore_certs = context.CLIARGS['ignore_certs']
ignore_errors = context.CLIARGS['ignore_errors']
requirements_file = context.CLIARGS['requirements']
@ -620,12 +776,12 @@ class GalaxyCLI(CLI):
if requirements_file:
requirements_file = GalaxyCLI._resolve_path(requirements_file)
-collection_requirements = parse_collections_requirements_file(requirements_file)
+requirements = self._parse_requirements_file(requirements_file, allow_old_format=False)['collections']
else:
-collection_requirements = []
+requirements = []
for collection_input in collections:
name, dummy, requirement = collection_input.partition(':')
-collection_requirements.append((name, requirement or '*', None))
+requirements.append((name, requirement or '*', None))
output_path = GalaxyCLI._resolve_path(output_path)
collections_path = C.COLLECTIONS_PATHS
@ -642,7 +798,7 @@ class GalaxyCLI(CLI):
if not os.path.exists(b_output_path):
os.makedirs(b_output_path)
-install_collections(collection_requirements, output_path, servers, (not ignore_certs), ignore_errors,
+install_collections(requirements, output_path, self.api_servers, (not ignore_certs), ignore_errors,
no_deps, force, force_deps)

return 0
@ -660,41 +816,10 @@ class GalaxyCLI(CLI):
roles_left = []
if role_file:
-try:
-f = open(role_file, 'r')
-if role_file.endswith('.yaml') or role_file.endswith('.yml'):
-try:
-required_roles = yaml.safe_load(f.read())
-except Exception as e:
-raise AnsibleError(
-"Unable to load data from the requirements file (%s): %s" % (role_file, to_native(e))
-)
-if required_roles is None:
-raise AnsibleError("No roles found in file: %s" % role_file)
-for role in required_roles:
-if "include" not in role:
-role = RoleRequirement.role_yaml_parse(role)
-display.vvv("found role %s in yaml file" % str(role))
-if "name" not in role and "scm" not in role:
-raise AnsibleError("Must specify name or src for role")
-roles_left.append(GalaxyRole(self.galaxy, **role))
-else:
-with open(role["include"]) as f_include:
-try:
-roles_left += [
-GalaxyRole(self.galaxy, **r) for r in
-(RoleRequirement.role_yaml_parse(i) for i in yaml.safe_load(f_include))
-]
-except Exception as e:
-msg = "Unable to load data from the include requirements file: %s %s"
-raise AnsibleError(msg % (role_file, e))
-else:
-raise AnsibleError("Invalid role requirements file")
-f.close()
-except (IOError, OSError) as e:
-raise AnsibleError('Unable to open %s: %s' % (role_file, to_native(e)))
+if not (role_file.endswith('.yaml') or role_file.endswith('.yml')):
+raise AnsibleError("Invalid role requirements file, it must end with a .yml or .yaml extension")
+roles_left = self._parse_requirements_file(role_file)['roles']
else:
# roles were specified directly, so we'll just go out grab them
# (and their dependencies, unless the user doesn't want us to).
@ -850,13 +975,10 @@ class GalaxyCLI(CLI):
""" """
Publish a collection into Ansible Galaxy. Requires the path to the collection tarball to publish. Publish a collection into Ansible Galaxy. Requires the path to the collection tarball to publish.
""" """
api_key = context.CLIARGS['api_key'] or GalaxyToken().get()
api_server = context.CLIARGS['api_server']
collection_path = GalaxyCLI._resolve_path(context.CLIARGS['args']) collection_path = GalaxyCLI._resolve_path(context.CLIARGS['args'])
ignore_certs = context.CLIARGS['ignore_certs']
wait = context.CLIARGS['wait'] wait = context.CLIARGS['wait']
publish_collection(collection_path, api_server, api_key, ignore_certs, wait) publish_collection(collection_path, self.api, wait)
def execute_search(self):
''' searches for roles on the Ansible Galaxy server'''

@ -1344,6 +1344,18 @@ GALAXY_SERVER:
ini:
- {key: server, section: galaxy}
yaml: {key: galaxy.server}
GALAXY_SERVER_LIST:
description:
- A list of Galaxy servers to use when installing a collection.
- The value corresponds to the config ini header ``[galaxy_server.{{item}}]`` which defines the server details.
- 'See :ref:`galaxy_server_config` for more details on how to define a Galaxy server.'
- The order of servers in this list is used as the order in which a collection is resolved.
- Setting this config option will ignore the :ref:`galaxy_server` config option.
env: [{name: ANSIBLE_GALAXY_SERVER_LIST}]
ini:
- {key: server_list, section: galaxy}
type: list
version_added: "2.9"
GALAXY_TOKEN:
default: null
description: "GitHub personal access token"
@ -1358,6 +1370,7 @@ GALAXY_TOKEN_PATH:
ini:
- {key: token_path, section: galaxy}
type: path
version_added: "2.9"
HOST_KEY_CHECKING:
name: Check host keys
default: True

@ -26,6 +26,7 @@ __metaclass__ = type
import os
import yaml
import ansible.constants as C
from ansible import context
from ansible.module_utils._text import to_bytes
@ -45,7 +46,7 @@ class Galaxy(object):
def __init__(self):
# roles_path needs to be a list and will be by default
-roles_path = context.CLIARGS.get('roles_path', tuple())
+roles_path = context.CLIARGS.get('roles_path', C.DEFAULT_ROLES_PATH)
# cli option handling is responsible for splitting roles_path
self.roles_paths = roles_path

@ -22,14 +22,11 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import base64
import json
-from functools import wraps
from ansible import context
-import ansible.constants as C
from ansible.errors import AnsibleError
-from ansible.galaxy.token import GalaxyToken
from ansible.module_utils.six import string_types
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode
@ -40,26 +37,16 @@ from ansible.utils.display import Display
display = Display()
-def requires_token(func):
-''' wrapper to laziliy initialize token file '''
-@wraps(func)
-def wrapped(self, *args, **kwargs):
-if self.token is None:
-self.token = GalaxyToken()
-return func(self, *args, **kwargs)
-return wrapped
def g_connect(method):
''' wrapper to lazily initialize connection info to galaxy '''
def wrapped(self, *args, **kwargs):
if not self.initialized:
-display.vvvv("Initial connection to galaxy_server: %s" % self._api_server)
+display.vvvv("Initial connection to galaxy_server: %s" % self.api_server)
server_version = self._get_server_api_version()
if server_version not in self.SUPPORTED_VERSIONS:
raise AnsibleError("Unsupported Galaxy server API version: %s" % server_version)
-self.baseurl = '%s/api/%s' % (self._api_server, server_version)
+self.baseurl = _urljoin(self.api_server, "api", server_version)
self.version = server_version # for future use
display.vvvv("Base API: %s" % self.baseurl)
self.initialized = True
@ -67,40 +54,52 @@ def g_connect(method):
return wrapped
def _urljoin(*args):
return '/'.join(to_native(a, errors='surrogate_or_strict').rstrip('/') for a in args + ('',))
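# Illustration only (not part of the change): _urljoin strips trailing slashes from each piece
# and always appends a trailing slash, e.g.
#   _urljoin('https://galaxy.ansible.com/', 'api', 'v1') -> 'https://galaxy.ansible.com/api/v1/'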
class GalaxyAPI(object):
''' This class is meant to be used as a API client for an Ansible Galaxy server '''
SUPPORTED_VERSIONS = ['v1']
-def __init__(self, galaxy):
+def __init__(self, galaxy, name, url, username=None, password=None, token=None):
self.galaxy = galaxy
-self.token = None
-self._api_server = C.GALAXY_SERVER
-self._validate_certs = not context.CLIARGS['ignore_certs']
+self.name = name
+self.username = username
+self.password = password
+self.token = token
+self.api_server = url
+self.validate_certs = not context.CLIARGS['ignore_certs']
self.baseurl = None
self.version = None
self.initialized = False
-display.debug('Validate TLS certificates: %s' % self._validate_certs)
+display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs))
-# set the API server
-if context.CLIARGS['api_server'] != C.GALAXY_SERVER:
-self._api_server = context.CLIARGS['api_server']
-@requires_token
-def __auth_header(self):
-token = self.token.get()
-if token is None:
-raise AnsibleError("No access token. You must first use login to authenticate and obtain an access token.")
-return {'Authorization': 'Token ' + token}
+def _auth_header(self, required=True):
+token = self.token.get() if self.token else None
+if token:
+return {'Authorization': "Token %s" % token}
+elif self.username:
+token = "%s:%s" % (to_text(self.username, errors='surrogate_or_strict'),
+to_text(self.password, errors='surrogate_or_strict', nonstring='passthru') or '')
+b64_val = base64.b64encode(to_bytes(token, encoding='utf-8', errors='surrogate_or_strict'))
+return {'Authorization': "Basic %s" % to_text(b64_val)}
+elif required:
+raise AnsibleError("No access token or username set. A token can be set with --api-key, with "
+"'ansible-galaxy login', or set in ansible.cfg.")
+else:
+return {}
@g_connect
def __call_galaxy(self, url, args=None, headers=None, method=None):
if args and not headers:
-headers = self.__auth_header()
+headers = self._auth_header()
try:
display.vvv(url)
-resp = open_url(url, data=args, validate_certs=self._validate_certs, headers=headers, method=method,
+resp = open_url(url, data=args, validate_certs=self.validate_certs, headers=headers, method=method,
timeout=20)
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
except HTTPError as e:
@ -108,22 +107,14 @@ class GalaxyAPI(object):
raise AnsibleError(res['detail'])
return data
-@property
-def api_server(self):
-return self._api_server
-@property
-def validate_certs(self):
-return self._validate_certs
def _get_server_api_version(self):
"""
Fetches the Galaxy API current version to ensure
the API server is up and reachable.
"""
-url = '%s/api/' % self._api_server
+url = _urljoin(self.api_server, "api")
try:
-return_data = open_url(url, validate_certs=self._validate_certs)
+return_data = open_url(url, validate_certs=self.validate_certs)
except Exception as e:
raise AnsibleError("Failed to get data from the API server (%s): %s " % (url, to_native(e)))
@ -142,9 +133,9 @@ class GalaxyAPI(object):
""" """
Retrieve an authentication token Retrieve an authentication token
""" """
url = '%s/tokens/' % self.baseurl url = _urljoin(self.baseurl, "tokens")
args = urlencode({"github_token": github_token}) args = urlencode({"github_token": github_token})
resp = open_url(url, data=args, validate_certs=self._validate_certs, method="POST") resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST")
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict')) data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
return data return data
@ -153,7 +144,7 @@ class GalaxyAPI(object):
""" """
Post an import request Post an import request
""" """
url = '%s/imports/' % self.baseurl url = _urljoin(self.baseurl, "imports")
args = { args = {
"github_user": github_user, "github_user": github_user,
"github_repo": github_repo, "github_repo": github_repo,
@ -173,7 +164,7 @@ class GalaxyAPI(object):
""" """
Check the status of an import task. Check the status of an import task.
""" """
url = '%s/imports/' % self.baseurl url = _urljoin(self.baseurl, "imports")
if task_id is not None: if task_id is not None:
url = "%s?id=%d" % (url, task_id) url = "%s?id=%d" % (url, task_id)
elif github_user is not None and github_repo is not None: elif github_user is not None and github_repo is not None:
@ -200,7 +191,7 @@ class GalaxyAPI(object):
except Exception:
raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)
-url = '%s/roles/?owner__username=%s&name=%s' % (self.baseurl, user_name, role_name)
+url = _urljoin(self.baseurl, "roles", "?owner__username=%s&name=%s" % (user_name, role_name))[:-1]
data = self.__call_galaxy(url)
if len(data["results"]) != 0:
return data["results"][0]
@ -215,12 +206,12 @@ class GalaxyAPI(object):
results = []
try:
-url = '%s/roles/%s/%s/?page_size=50' % (self.baseurl, role_id, related)
+url = _urljoin(self.baseurl, "roles", role_id, related, "?page_size=50")[:-1]
data = self.__call_galaxy(url)
results = data['results']
done = (data.get('next_link', None) is None)
while not done:
-url = '%s%s' % (self._api_server, data['next_link'])
+url = _urljoin(self.api_server, data['next_link'])
data = self.__call_galaxy(url)
results += data['results']
done = (data.get('next_link', None) is None)
@ -234,7 +225,7 @@ class GalaxyAPI(object):
Fetch the list of items specified.
"""
try:
-url = '%s/%s/?page_size' % (self.baseurl, what)
+url = _urljoin(self.baseurl, what, "?page_size")[:-1]
data = self.__call_galaxy(url)
if "results" in data:
results = data['results']
@ -244,7 +235,7 @@ class GalaxyAPI(object):
if "next" in data: if "next" in data:
done = (data.get('next_link', None) is None) done = (data.get('next_link', None) is None)
while not done: while not done:
url = '%s%s' % (self._api_server, data['next_link']) url = _urljoin(self.api_server, data['next_link'])
data = self.__call_galaxy(url) data = self.__call_galaxy(url)
results += data['results'] results += data['results']
done = (data.get('next_link', None) is None) done = (data.get('next_link', None) is None)
@ -255,7 +246,7 @@ class GalaxyAPI(object):
@g_connect
def search_roles(self, search, **kwargs):
-search_url = self.baseurl + '/search/roles/?'
+search_url = _urljoin(self.baseurl, "search", "roles", "?")[:-1]
if search:
search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search)))
@ -284,7 +275,7 @@ class GalaxyAPI(object):
@g_connect
def add_secret(self, source, github_user, github_repo, secret):
-url = "%s/notification_secrets/" % self.baseurl
+url = _urljoin(self.baseurl, "notification_secrets")
args = urlencode({
"source": source,
"github_user": github_user,
@ -296,18 +287,18 @@ class GalaxyAPI(object):
@g_connect
def list_secrets(self):
-url = "%s/notification_secrets" % self.baseurl
-data = self.__call_galaxy(url, headers=self.__auth_header())
+url = _urljoin(self.baseurl, "notification_secrets")
+data = self.__call_galaxy(url, headers=self._auth_header())
return data
@g_connect
def remove_secret(self, secret_id):
-url = "%s/notification_secrets/%s/" % (self.baseurl, secret_id)
-data = self.__call_galaxy(url, headers=self.__auth_header(), method='DELETE')
+url = _urljoin(self.baseurl, "notification_secrets", secret_id)
+data = self.__call_galaxy(url, headers=self._auth_header(), method='DELETE')
return data
@g_connect
def delete_role(self, github_user, github_repo):
-url = "%s/removerole/?github_user=%s&github_repo=%s" % (self.baseurl, github_user, github_repo)
-data = self.__call_galaxy(url, headers=self.__auth_header(), method='DELETE')
+url = _urljoin(self.baseurl, "removerole", "?github_user=%s&github_repo=%s" % (github_user, github_repo))[:-1]
+data = self.__call_galaxy(url, headers=self._auth_header(), method='DELETE')
return data

@ -24,6 +24,7 @@ from yaml.error import YAMLError
import ansible.constants as C
from ansible.errors import AnsibleError
from ansible.galaxy import get_collections_galaxy_meta_info
from ansible.galaxy.api import _urljoin
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils import six
from ansible.utils.collection_loader import is_collection_ref
@ -44,8 +45,8 @@ class CollectionRequirement:
_FILE_MAPPING = [(b'MANIFEST.json', 'manifest_file'), (b'FILES.json', 'files_file')]
-def __init__(self, namespace, name, b_path, source, versions, requirement, force, parent=None, validate_certs=True,
-             metadata=None, files=None, skip=False):
+def __init__(self, namespace, name, b_path, api, versions, requirement, force, parent=None, metadata=None,
+             files=None, skip=False):
"""
Represents a collection requirement, the versions that are available to be installed as well as any
dependencies the collection has.
@ -53,12 +54,11 @@ class CollectionRequirement:
:param namespace: The collection namespace.
:param name: The collection name.
:param b_path: Byte str of the path to the collection tarball if it has already been downloaded.
-:param source: The Galaxy server URL to download if the collection is from Galaxy.
+:param api: The GalaxyAPI to use if the collection is from Galaxy.
:param versions: A list of versions of the collection that are available.
:param requirement: The version requirement string used to verify the list of versions fit the requirements.
:param force: Whether the force flag applied to the collection.
:param parent: The name of the parent the collection is a dependency of.
-:param validate_certs: Whether to validate the Galaxy server certificate.
:param metadata: The collection metadata dict if it has already been retrieved.
:param files: The files that exist inside the collection. This is based on the FILES.json file inside the
collection artifact.
@ -68,12 +68,11 @@ class CollectionRequirement:
self.namespace = namespace
self.name = name
self.b_path = b_path
-self.source = source
+self.api = api
self.versions = set(versions)
self.force = force
self.skip = skip
self.required_by = []
-self._validate_certs = validate_certs
self._metadata = metadata
self._files = files
@ -120,7 +119,7 @@ class CollectionRequirement:
msg = "Cannot meet dependency requirement '%s:%s' for collection %s" \ msg = "Cannot meet dependency requirement '%s:%s' for collection %s" \
% (to_text(self), requirement, parent) % (to_text(self), requirement, parent)
collection_source = to_text(self.b_path, nonstring='passthru') or self.source collection_source = to_text(self.b_path, nonstring='passthru') or self.api.api_server
req_by = "\n".join( req_by = "\n".join(
"\t%s - '%s:%s'" % (to_text(p) if p else 'base', to_text(self), r) "\t%s - '%s:%s'" % (to_text(p) if p else 'base', to_text(self), r)
for p, r in self.required_by for p, r in self.required_by
@ -147,7 +146,9 @@ class CollectionRequirement:
if self.b_path is None:
download_url = self._galaxy_info['download_url']
artifact_hash = self._galaxy_info['artifact']['sha256']
-self.b_path = _download_file(download_url, b_temp_path, artifact_hash, self._validate_certs)
+headers = self.api._auth_header(required=False)
+self.b_path = _download_file(download_url, b_temp_path, artifact_hash, self.api.validate_certs,
+                             headers=headers)
if os.path.exists(b_collection_path):
shutil.rmtree(b_collection_path)
@ -180,9 +181,10 @@ class CollectionRequirement:
if self._metadata:
return
-n_collection_url = _urljoin(self.source, 'api', 'v2', 'collections', self.namespace, self.name, 'versions',
-                            self.latest_version)
-details = json.load(open_url(n_collection_url, validate_certs=self._validate_certs))
+n_collection_url = _urljoin(self.api.api_server, 'api', 'v2', 'collections', self.namespace, self.name,
+                            'versions', self.latest_version)
+details = json.load(open_url(n_collection_url, validate_certs=self.api.validate_certs,
+                             headers=self.api._auth_header(required=False)))
self._galaxy_info = details
self._metadata = details['metadata']
@ -225,7 +227,7 @@ class CollectionRequirement:
return False
@staticmethod
-def from_tar(b_path, validate_certs, force, parent=None):
+def from_tar(b_path, force, parent=None):
if not tarfile.is_tarfile(b_path):
raise AnsibleError("Collection artifact at '%s' is not a valid tar file." % to_native(b_path))
@ -254,10 +256,10 @@ class CollectionRequirement:
version = meta['version']
return CollectionRequirement(namespace, name, b_path, None, [version], version, force, parent=parent,
-                             validate_certs=validate_certs, metadata=meta, files=files)
+                             metadata=meta, files=files)
@staticmethod
-def from_path(b_path, validate_certs, force, parent=None):
+def from_path(b_path, force, parent=None):
info = {}
for b_file_name, property_name in CollectionRequirement._FILE_MAPPING:
b_file_path = os.path.join(b_path, b_file_name)
@ -292,16 +294,17 @@ class CollectionRequirement:
files = info.get('files_file', {}).get('files', {})
return CollectionRequirement(namespace, name, b_path, None, [version], version, force, parent=parent,
-                             validate_certs=validate_certs, metadata=meta, files=files, skip=True)
+                             metadata=meta, files=files, skip=True)
@staticmethod
-def from_name(collection, servers, requirement, validate_certs, force, parent=None):
+def from_name(collection, apis, requirement, force, parent=None):
namespace, name = collection.split('.', 1)
galaxy_info = None
galaxy_meta = None
-for server in servers:
-collection_url_paths = [server, 'api', 'v2', 'collections', namespace, name, 'versions']
+for api in apis:
+collection_url_paths = [api.api_server, 'api', 'v2', 'collections', namespace, name, 'versions']
+headers = api._auth_header(required=False)
is_single = False
if not (requirement == '*' or requirement.startswith('<') or requirement.startswith('>') or
@ -314,7 +317,7 @@ class CollectionRequirement:
n_collection_url = _urljoin(*collection_url_paths)
try:
-resp = json.load(open_url(n_collection_url, validate_certs=validate_certs))
+resp = json.load(open_url(n_collection_url, validate_certs=api.validate_certs, headers=headers))
except urllib_error.HTTPError as err:
if err.code == 404:
continue
@ -333,14 +336,14 @@ class CollectionRequirement:
if resp['next'] is None:
break
resp = json.load(open_url(to_native(resp['next'], errors='surrogate_or_strict'),
-                          validate_certs=validate_certs))
+                          validate_certs=api.validate_certs, headers=headers))
break
else:
raise AnsibleError("Failed to find collection %s:%s" % (collection, requirement))
-req = CollectionRequirement(namespace, name, None, server, versions, requirement, force, parent=parent,
-                            validate_certs=validate_certs, metadata=galaxy_meta)
+req = CollectionRequirement(namespace, name, None, api, versions, requirement, force, parent=parent,
+                            metadata=galaxy_meta)
req._galaxy_info = galaxy_info
return req
@ -380,14 +383,13 @@ def build_collection(collection_path, output_path, force):
_build_collection_tar(b_collection_path, b_collection_output, collection_manifest, file_manifest)
-def publish_collection(collection_path, server, key, ignore_certs, wait):
+def publish_collection(collection_path, api, wait):
"""
Publish an Ansible collection tarball into an Ansible Galaxy server.
:param collection_path: The path to the collection tarball to publish.
-:param server: A native string of the Ansible Galaxy server to publish to.
-:param key: The API key to use for authorization.
-:param ignore_certs: Whether to ignore certificate validation when interacting with the server.
+:param api: A GalaxyAPI to publish the collection to.
+:param wait: Whether to wait until the import process is complete.
"""
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
if not os.path.exists(b_collection_path):
@ -396,21 +398,19 @@ def publish_collection(collection_path, server, key, ignore_certs, wait):
raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection " raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection "
"build' to create a proper release artifact." % to_native(collection_path)) "build' to create a proper release artifact." % to_native(collection_path))
display.display("Publishing collection artifact '%s' to %s" % (collection_path, server)) display.display("Publishing collection artifact '%s' to %s %s" % (collection_path, api.name, api.api_server))
n_url = _urljoin(server, 'api', 'v2', 'collections') n_url = _urljoin(api.api_server, 'api', 'v2', 'collections')
data, content_type = _get_mime_data(b_collection_path) data, content_type = _get_mime_data(b_collection_path)
headers = { headers = {
'Content-type': content_type, 'Content-type': content_type,
'Content-length': len(data), 'Content-length': len(data),
} }
if key: headers.update(api._auth_header())
headers['Authorization'] = "Token %s" % key
validate_certs = not ignore_certs
try: try:
resp = json.load(open_url(n_url, data=data, headers=headers, method='POST', validate_certs=validate_certs)) resp = json.load(open_url(n_url, data=data, headers=headers, method='POST', validate_certs=api.validate_certs))
except urllib_error.HTTPError as err: except urllib_error.HTTPError as err:
try: try:
err_info = json.load(err) err_info = json.load(err)
@ -423,24 +423,24 @@ def publish_collection(collection_path, server, key, ignore_certs, wait):
raise AnsibleError("Error when publishing collection (HTTP Code: %d, Message: %s Code: %s)" raise AnsibleError("Error when publishing collection (HTTP Code: %d, Message: %s Code: %s)"
% (err.code, message, code)) % (err.code, message, code))
display.vvv("Collection has been pushed to the Galaxy server %s" % server) display.vvv("Collection has been pushed to the Galaxy server %s %s" % (api.name, api.api_server))
import_uri = resp['task'] import_uri = resp['task']
if wait: if wait:
_wait_import(import_uri, key, validate_certs) _wait_import(import_uri, api)
display.display("Collection has been successfully published to the Galaxy server") display.display("Collection has been successfully published to the Galaxy server")
else: else:
display.display("Collection has been pushed to the Galaxy server, not waiting until import has completed " display.display("Collection has been pushed to the Galaxy server, not waiting until import has completed "
"due to --no-wait being set. Import task results can be found at %s" % import_uri) "due to --no-wait being set. Import task results can be found at %s" % import_uri)
-def install_collections(collections, output_path, servers, validate_certs, ignore_errors, no_deps, force, force_deps):
+def install_collections(collections, output_path, apis, validate_certs, ignore_errors, no_deps, force, force_deps):
"""
Install Ansible collections to the path specified.
:param collections: The collections to install, should be a list of tuples with (name, requirement, Galaxy server).
:param output_path: The path to install the collections to.
-:param servers: A list of Galaxy servers to query when searching for a collection.
-:param validate_certs: Whether to validate the Galaxy server certificates.
+:param apis: A list of GalaxyAPIs to query when searching for a collection.
+:param validate_certs: Whether to validate the certificates if downloading a tarball.
:param ignore_errors: Whether to ignore any errors when installing the collection.
:param no_deps: Ignore any collection dependencies and only install the base requirements.
:param force: Re-install a collection if it has already been installed.
@ -449,7 +449,7 @@ def install_collections(collections, output_path, servers, validate_certs, ignor
existing_collections = _find_existing_collections(output_path)
with _tempdir() as b_temp_path:
-dependency_map = _build_dependency_map(collections, existing_collections, b_temp_path, servers, validate_certs,
+dependency_map = _build_dependency_map(collections, existing_collections, b_temp_path, apis, validate_certs,
force, force_deps, no_deps)
for collection in dependency_map.values():
@ -463,56 +463,6 @@ def install_collections(collections, output_path, servers, validate_certs, ignor
raise
def parse_collections_requirements_file(requirements_file):
"""
Parses an Ansible requirement.yml file and returns all the collections defined in it. This value ca be used with
install_collection(). The requirements file is in the form:
---
collections:
- namespace.collection
- name: namespace.collection
version: version identifier, multiple identifiers are separated by ','
source: the URL or prededefined source name in ~/.ansible_galaxy to pull the collection from
:param requirements_file: The path to the requirements file.
:return: A list of tuples (name, version, source).
"""
collection_info = []
b_requirements_file = to_bytes(requirements_file, errors='surrogate_or_strict')
if not os.path.exists(b_requirements_file):
raise AnsibleError("The requirements file '%s' does not exist." % to_native(requirements_file))
display.vvv("Reading collection requirement file at '%s'" % requirements_file)
with open(b_requirements_file, 'rb') as req_obj:
try:
requirements = yaml.safe_load(req_obj)
except YAMLError as err:
raise AnsibleError("Failed to parse the collection requirements yml at '%s' with the following error:\n%s"
% (to_native(requirements_file), to_native(err)))
if not isinstance(requirements, dict) or 'collections' not in requirements:
# TODO: Link to documentation page that documents the requirements.yml format for collections.
raise AnsibleError("Expecting collections requirements file to be a dict with the key "
"collections that contains a list of collections to install.")
for collection_req in requirements['collections']:
if isinstance(collection_req, dict):
req_name = collection_req.get('name', None)
if req_name is None:
raise AnsibleError("Collections requirement entry should contain the key name.")
req_version = collection_req.get('version', '*')
req_source = collection_req.get('source', None)
collection_info.append((req_name, req_version, req_source))
else:
collection_info.append((collection_req, '*', None))
return collection_info
def validate_collection_name(name):
"""
Validates the collection name as an input from the user or a requirements file fit the requirements.
@ -779,17 +729,15 @@ def _get_mime_data(b_collection_path):
return b"\r\n".join(form), content_type return b"\r\n".join(form), content_type
def _wait_import(task_url, key, validate_certs): def _wait_import(task_url, api):
headers = {} headers = api._auth_header()
if key:
headers['Authorization'] = "Token %s" % key
display.vvv('Waiting until galaxy import task %s has completed' % task_url) display.vvv('Waiting until galaxy import task %s has completed' % task_url)
wait = 2 wait = 2
while True: while True:
resp = json.load(open_url(to_native(task_url, errors='surrogate_or_strict'), headers=headers, method='GET', resp = json.load(open_url(to_native(task_url, errors='surrogate_or_strict'), headers=headers, method='GET',
validate_certs=validate_certs)) validate_certs=api.validate_certs))
if resp.get('finished_at', None): if resp.get('finished_at', None):
break break
@ -830,7 +778,7 @@ def _find_existing_collections(path):
for b_collection in os.listdir(b_namespace_path):
b_collection_path = os.path.join(b_namespace_path, b_collection)
if os.path.isdir(b_collection_path):
-req = CollectionRequirement.from_path(b_collection_path, True, False)
+req = CollectionRequirement.from_path(b_collection_path, False)
display.vvv("Found installed collection %s:%s at '%s'" % (to_text(req), req.latest_version,
to_text(b_collection_path)))
collections.append(req)
@ -838,13 +786,13 @@ def _find_existing_collections(path):
return collections
-def _build_dependency_map(collections, existing_collections, b_temp_path, servers, validate_certs, force, force_deps,
+def _build_dependency_map(collections, existing_collections, b_temp_path, apis, validate_certs, force, force_deps,
no_deps):
dependency_map = {}
# First build the dependency map on the actual requirements
for name, version, source in collections:
-_get_collection_info(dependency_map, existing_collections, name, version, source, b_temp_path, servers,
+_get_collection_info(dependency_map, existing_collections, name, version, source, b_temp_path, apis,
validate_certs, (force or force_deps))
checked_parents = set([to_text(c) for c in dependency_map.values() if c.skip])
@ -860,7 +808,7 @@ def _build_dependency_map(collections, existing_collections, b_temp_path, server
deps_exhausted = False
for dep_name, dep_requirement in parent_info.dependencies.items():
_get_collection_info(dependency_map, existing_collections, dep_name, dep_requirement,
-parent_info.source, b_temp_path, servers, validate_certs, force_deps,
+parent_info.api, b_temp_path, apis, validate_certs, force_deps,
parent=parent)
checked_parents.add(parent)
@ -880,7 +828,7 @@ def _build_dependency_map(collections, existing_collections, b_temp_path, server
return dependency_map
-def _get_collection_info(dep_map, existing_collections, collection, requirement, source, b_temp_path, server_list,
+def _get_collection_info(dep_map, existing_collections, collection, requirement, source, b_temp_path, apis,
validate_certs, force, parent=None):
dep_msg = ""
if parent:
@ -896,7 +844,7 @@ def _get_collection_info(dep_map, existing_collections, collection, requirement,
b_tar_path = _download_file(collection, b_temp_path, None, validate_certs)
if b_tar_path:
-req = CollectionRequirement.from_tar(b_tar_path, validate_certs, force, parent=parent)
+req = CollectionRequirement.from_tar(b_tar_path, force, parent=parent)
collection_name = to_text(req)
if collection_name in dep_map:
@ -912,9 +860,8 @@ def _get_collection_info(dep_map, existing_collections, collection, requirement,
collection_info = dep_map[collection] collection_info = dep_map[collection]
collection_info.add_requirement(parent, requirement) collection_info.add_requirement(parent, requirement)
else: else:
servers = [source] if source else server_list apis = [source] if source else apis
collection_info = CollectionRequirement.from_name(collection, servers, requirement, validate_certs, force, collection_info = CollectionRequirement.from_name(collection, apis, requirement, force, parent=parent)
parent=parent)
existing = [c for c in existing_collections if to_text(c) == to_text(collection_info)] existing = [c for c in existing_collections if to_text(c) == to_text(collection_info)]
if existing and not collection_info.force: if existing and not collection_info.force:
@ -925,11 +872,7 @@ def _get_collection_info(dep_map, existing_collections, collection, requirement,
dep_map[to_text(collection_info)] = collection_info dep_map[to_text(collection_info)] = collection_info
-def _urljoin(*args):
-return '/'.join(to_native(a, errors='surrogate_or_strict').rstrip('/') for a in args + ('',))
-def _download_file(url, b_path, expected_hash, validate_certs):
+def _download_file(url, b_path, expected_hash, validate_certs, headers=None):
bufsize = 65536
digest = sha256()
@@ -939,7 +882,9 @@ def _download_file(url, b_path, expected_hash, validate_certs):
b_file_path = tempfile.NamedTemporaryFile(dir=b_path, prefix=b_file_name, suffix=b_file_ext, delete=False).name
display.vvv("Downloading %s to %s" % (url, to_text(b_path)))
-resp = open_url(to_native(url, errors='surrogate_or_strict'), validate_certs=validate_certs)
+# Galaxy redirs downloads to S3 which reject the request if an Authorization header is attached so don't redir that
+resp = open_url(to_native(url, errors='surrogate_or_strict'), validate_certs=validate_certs, headers=headers,
+unredirected_headers=['Authorization'])
with open(b_file_path, 'wb') as download_file:
data = resp.read(bufsize)

@@ -31,6 +31,7 @@ import yaml
from distutils.version import LooseVersion
from shutil import rmtree
+import ansible.constants as C
from ansible import context
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_native, to_text
@@ -204,7 +205,7 @@ class GalaxyRole(object):
role_data = self.src
tmp_file = self.fetch(role_data)
else:
-api = GalaxyAPI(self.galaxy)
+api = GalaxyAPI(self.galaxy, 'role_default', C.GALAXY_SERVER)
role_data = api.lookup_role_by_name(self.src)
if not role_data:
raise AnsibleError("- sorry, %s was not found on %s." % (self.src, api.api_server))
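For reference, role installs keep using the single configured default Galaxy server rather than the new ``server_list``. A minimal sketch of that lookup, assuming ``None`` is acceptable as the first argument (as in the unit tests) and using a made-up role name; the ``CLIARGS`` line mirrors what the test fixtures do so ``GalaxyAPI`` can be built outside the CLI:

.. code-block:: python

    import ansible.constants as C
    from ansible import context
    from ansible.galaxy.api import GalaxyAPI

    context.CLIARGS._store = {'ignore_certs': False}  # as in the unit test fixtures

    # 'role_default' mirrors the identifier used above; C.GALAXY_SERVER is the
    # single galaxy server config value, not an entry from server_list.
    api = GalaxyAPI(None, 'role_default', C.GALAXY_SERVER)
    role_data = api.lookup_role_by_name('namespace.role_name')  # hypothetical role name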

@@ -33,33 +33,49 @@ from ansible.utils.display import Display
display = Display()
+class NoTokenSentinel(object):
+""" Represents an ansible.cfg server with not token defined (will ignore cmdline and GALAXY_TOKEN_PATH. """
+def __new__(cls, *args, **kwargs):
+return cls
class GalaxyToken(object):
''' Class to storing and retrieving local galaxy token '''
-def __init__(self):
-self.b_file = to_bytes(C.GALAXY_TOKEN_PATH)
-self.config = yaml.safe_load(self.__open_config_for_read())
-if not self.config:
-self.config = {}
+def __init__(self, token=None):
+self.b_file = to_bytes(C.GALAXY_TOKEN_PATH, errors='surrogate_or_strict')
+# Done so the config file is only opened when set/get/save is called
+self._config = None
+self._token = token
+@property
+def config(self):
+if not self._config:
+self._config = self._read()
-def __open_config_for_read(self):
-f = None
+# Prioritise the token passed into the constructor
+if self._token:
+self._config['token'] = None if self._token is NoTokenSentinel else self._token
+return self._config
+def _read(self):
action = 'Opened'
if not os.path.isfile(self.b_file):
# token file not found, create and chomd u+rw
-f = open(self.b_file, 'w')
-f.close()
+open(self.b_file, 'w').close()
os.chmod(self.b_file, S_IRUSR | S_IWUSR) # owner has +rw
action = 'Created'
-f = open(self.b_file, 'r')
+with open(self.b_file, 'r') as f:
+config = yaml.safe_load(f)
display.vvv('%s %s' % (action, to_text(self.b_file)))
-return f
+return config or {}
def set(self, token):
-self.config['token'] = token
+self._token = token
self.save()
def get(self):
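A rough sketch of how the reworked token handling behaves, assuming placeholder token values; note that reading ``config`` also opens (and creates, if missing) the file at ``C.GALAXY_TOKEN_PATH``:

.. code-block:: python

    from ansible.galaxy.token import GalaxyToken, NoTokenSentinel

    # A token passed in explicitly (for example from a galaxy_server section)
    # takes priority over whatever is stored in the token file.
    explicit = GalaxyToken(token='my_token')      # placeholder token
    assert explicit.config['token'] == 'my_token'

    # NoTokenSentinel marks a server entry that must not fall back to the
    # token file or the --api-key value at all.
    anonymous = GalaxyToken(token=NoTokenSentinel)
    assert anonymous.config['token'] is None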

@@ -1085,7 +1085,7 @@ class Request:
url_username=None, url_password=None, http_agent=None,
force_basic_auth=None, follow_redirects=None,
client_cert=None, client_key=None, cookies=None, use_gssapi=False,
-unix_socket=None, ca_path=None):
+unix_socket=None, ca_path=None, unredirected_headers=None):
"""
Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3)
@@ -1123,7 +1123,8 @@ class Request:
:kwarg unix_socket: (optional) String of file system path to unix socket file to use when establishing
connection to the provided url
:kwarg ca_path: (optional) String of file system path to CA cert bundle to use
-:returns: HTTPResponse
+:kwarg unredirected_headers: (optional) A list of headers to not attach on a redirected request
+:returns: HTTPResponse. Added in Ansible 2.9
"""
method = method.upper()
@@ -1277,8 +1278,12 @@ class Request:
request.add_header('If-Modified-Since', tstamp)
# user defined headers now, which may override things we've set above
+unredirected_headers = unredirected_headers or []
for header in headers:
-request.add_header(header, headers[header])
+if header in unredirected_headers:
+request.add_unredirected_header(header, headers[header])
+else:
+request.add_header(header, headers[header])
urlopen_args = [request, None]
if sys.version_info >= (2, 6, 0):
@@ -1368,7 +1373,8 @@ def open_url(url, data=None, headers=None, method=None, use_proxy=True,
url_username=None, url_password=None, http_agent=None,
force_basic_auth=False, follow_redirects='urllib2',
client_cert=None, client_key=None, cookies=None,
-use_gssapi=False, unix_socket=None, ca_path=None):
+use_gssapi=False, unix_socket=None, ca_path=None,
+unredirected_headers=None):
'''
Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3)
@@ -1380,7 +1386,8 @@ def open_url(url, data=None, headers=None, method=None, use_proxy=True,
url_username=url_username, url_password=url_password, http_agent=http_agent,
force_basic_auth=force_basic_auth, follow_redirects=follow_redirects,
client_cert=client_cert, client_key=client_key, cookies=cookies,
-use_gssapi=use_gssapi, unix_socket=unix_socket, ca_path=ca_path)
+use_gssapi=use_gssapi, unix_socket=unix_socket, ca_path=ca_path,
+unredirected_headers=unredirected_headers)
#
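As a usage sketch of the new keyword, mirroring the ``_download_file`` change above (the URL and token below are placeholders):

.. code-block:: python

    from ansible.module_utils.urls import open_url

    # The Authorization header is only sent to the original host; when the
    # server redirects the download (for example Galaxy handing it off to S3),
    # the redirected request goes out without it.
    resp = open_url('https://galaxy.ansible.com/download/ns-name-1.0.0.tar.gz',  # placeholder URL
                    headers={'Authorization': 'Token my_token'},                 # placeholder token
                    validate_certs=True,
                    unredirected_headers=['Authorization'])
    data = resp.read()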

@@ -32,8 +32,9 @@ import yaml
import ansible.constants as C
from ansible import context
from ansible.cli.galaxy import GalaxyCLI
+from ansible.galaxy.api import GalaxyAPI
from ansible.errors import AnsibleError
-from ansible.module_utils._text import to_text
+from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.utils import context_objects as co
from units.compat import unittest
from units.compat.mock import patch, MagicMock
@@ -754,7 +755,9 @@ def test_collection_install_with_names(collection_install):
assert mock_install.call_args[0][0] == [('namespace.collection', '*', None),
('namespace2.collection', '1.0.1', None)]
assert mock_install.call_args[0][1] == collection_path
-assert mock_install.call_args[0][2] == ['https://galaxy.ansible.com']
+assert len(mock_install.call_args[0][2]) == 1
+assert mock_install.call_args[0][2][0].api_server == 'https://galaxy.ansible.com'
+assert mock_install.call_args[0][2][0].validate_certs is True
assert mock_install.call_args[0][3] is True
assert mock_install.call_args[0][4] is False
assert mock_install.call_args[0][5] is False
@@ -789,7 +792,9 @@ collections:
assert mock_install.call_args[0][0] == [('namespace.coll', '*', None),
('namespace2.coll', '>2.0.1', None)]
assert mock_install.call_args[0][1] == collection_path
-assert mock_install.call_args[0][2] == ['https://galaxy.ansible.com']
+assert len(mock_install.call_args[0][2]) == 1
+assert mock_install.call_args[0][2][0].api_server == 'https://galaxy.ansible.com'
+assert mock_install.call_args[0][2][0].validate_certs is True
assert mock_install.call_args[0][3] is True
assert mock_install.call_args[0][4] is False
assert mock_install.call_args[0][5] is False
@@ -801,8 +806,8 @@ def test_collection_install_with_relative_path(collection_install, monkeypatch):
mock_install = collection_install[0]
mock_req = MagicMock()
-mock_req.return_value = [('namespace.coll', '*', None)]
-monkeypatch.setattr(ansible.cli.galaxy, 'parse_collections_requirements_file', mock_req)
+mock_req.return_value = {'collections': [('namespace.coll', '*', None)]}
+monkeypatch.setattr(ansible.cli.galaxy.GalaxyCLI, '_parse_requirements_file', mock_req)
monkeypatch.setattr(os, 'makedirs', MagicMock())
@@ -815,7 +820,9 @@ def test_collection_install_with_relative_path(collection_install, monkeypatch):
assert mock_install.call_count == 1
assert mock_install.call_args[0][0] == [('namespace.coll', '*', None)]
assert mock_install.call_args[0][1] == os.path.abspath(collections_path)
-assert mock_install.call_args[0][2] == ['https://galaxy.ansible.com']
+assert len(mock_install.call_args[0][2]) == 1
+assert mock_install.call_args[0][2][0].api_server == 'https://galaxy.ansible.com'
+assert mock_install.call_args[0][2][0].validate_certs is True
assert mock_install.call_args[0][3] is True
assert mock_install.call_args[0][4] is False
assert mock_install.call_args[0][5] is False
@@ -830,8 +837,8 @@ def test_collection_install_with_unexpanded_path(collection_install, monkeypatch
mock_install = collection_install[0]
mock_req = MagicMock()
-mock_req.return_value = [('namespace.coll', '*', None)]
-monkeypatch.setattr(ansible.cli.galaxy, 'parse_collections_requirements_file', mock_req)
+mock_req.return_value = {'collections': [('namespace.coll', '*', None)]}
+monkeypatch.setattr(ansible.cli.galaxy.GalaxyCLI, '_parse_requirements_file', mock_req)
monkeypatch.setattr(os, 'makedirs', MagicMock())
@@ -844,7 +851,9 @@ def test_collection_install_with_unexpanded_path(collection_install, monkeypatch
assert mock_install.call_count == 1
assert mock_install.call_args[0][0] == [('namespace.coll', '*', None)]
assert mock_install.call_args[0][1] == os.path.expanduser(os.path.expandvars(collections_path))
-assert mock_install.call_args[0][2] == ['https://galaxy.ansible.com']
+assert len(mock_install.call_args[0][2]) == 1
+assert mock_install.call_args[0][2][0].api_server == 'https://galaxy.ansible.com'
+assert mock_install.call_args[0][2][0].validate_certs is True
assert mock_install.call_args[0][3] is True
assert mock_install.call_args[0][4] is False
assert mock_install.call_args[0][5] is False
@@ -870,7 +879,9 @@ def test_collection_install_in_collection_dir(collection_install, monkeypatch):
assert mock_install.call_args[0][0] == [('namespace.collection', '*', None),
('namespace2.collection', '1.0.1', None)]
assert mock_install.call_args[0][1] == os.path.join(collections_path, 'ansible_collections')
-assert mock_install.call_args[0][2] == ['https://galaxy.ansible.com']
+assert len(mock_install.call_args[0][2]) == 1
+assert mock_install.call_args[0][2][0].api_server == 'https://galaxy.ansible.com'
+assert mock_install.call_args[0][2][0].validate_certs is True
assert mock_install.call_args[0][3] is True
assert mock_install.call_args[0][4] is False
assert mock_install.call_args[0][5] is False
@@ -914,7 +925,9 @@ def test_collection_install_path_with_ansible_collections(collection_install):
assert mock_install.call_args[0][0] == [('namespace.collection', '*', None),
('namespace2.collection', '1.0.1', None)]
assert mock_install.call_args[0][1] == collection_path
-assert mock_install.call_args[0][2] == ['https://galaxy.ansible.com']
+assert len(mock_install.call_args[0][2]) == 1
+assert mock_install.call_args[0][2][0].api_server == 'https://galaxy.ansible.com'
+assert mock_install.call_args[0][2][0].validate_certs is True
assert mock_install.call_args[0][3] is True
assert mock_install.call_args[0][4] is False
assert mock_install.call_args[0][5] is False
@@ -979,4 +992,191 @@ def test_collection_install_custom_server(collection_install):
'--server', 'https://galaxy-dev.ansible.com']
GalaxyCLI(args=galaxy_args).run()
-assert mock_install.call_args[0][2] == ['https://galaxy-dev.ansible.com']
+assert len(mock_install.call_args[0][2]) == 1
+assert mock_install.call_args[0][2][0].api_server == 'https://galaxy-dev.ansible.com'
+assert mock_install.call_args[0][2][0].validate_certs is True
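The assertions above reflect that the install code now receives a list of ``GalaxyAPI`` objects rather than bare server URLs. A hedged sketch of what such a list might look like when built by hand (the server names are examples and this is not the exact CLI code; the ``CLIARGS`` line mirrors the test fixtures so ``GalaxyAPI`` can be created outside the CLI):

.. code-block:: python

    from ansible import context
    from ansible.galaxy.api import GalaxyAPI
    from ansible.galaxy.token import GalaxyToken

    context.CLIARGS._store = {'ignore_certs': False}  # as in the test fixtures

    api_servers = [
        GalaxyAPI(None, 'primary_galaxy', 'https://galaxy.ansible.com',    # example server name
                  token=GalaxyToken(token='my_token')),                    # placeholder token
        GalaxyAPI(None, 'dev_galaxy', 'https://galaxy-dev.ansible.com',    # example server name
                  username='user', password='pass'),                       # placeholder credentials
    ]
    # Collections are looked up on each API object in order until one resolves.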
@pytest.fixture()
def requirements_file(request, tmp_path_factory):
content = request.param
test_dir = to_text(tmp_path_factory.mktemp('test-ÅÑŚÌβŁÈ Collections Requirements'))
requirements_file = os.path.join(test_dir, 'requirements.yml')
if content:
with open(requirements_file, 'wb') as req_obj:
req_obj.write(to_bytes(content))
yield requirements_file
@pytest.fixture()
def requirements_cli(monkeypatch):
monkeypatch.setattr(GalaxyCLI, 'execute_install', MagicMock())
cli = GalaxyCLI(args=['ansible-galaxy', 'install'])
cli.run()
return cli
@pytest.mark.parametrize('requirements_file', [None], indirect=True)
def test_parse_requirements_file_that_doesnt_exist(requirements_cli, requirements_file):
expected = "The requirements file '%s' does not exist." % to_native(requirements_file)
with pytest.raises(AnsibleError, match=expected):
requirements_cli._parse_requirements_file(requirements_file)
@pytest.mark.parametrize('requirements_file', ['not a valid yml file: hi: world'], indirect=True)
def test_parse_requirements_file_that_isnt_yaml(requirements_cli, requirements_file):
expected = "Failed to parse the requirements yml at '%s' with the following error" % to_native(requirements_file)
with pytest.raises(AnsibleError, match=expected):
requirements_cli._parse_requirements_file(requirements_file)
@pytest.mark.parametrize('requirements_file', [('''
# Older role based requirements.yml
- galaxy.role
- anotherrole
''')], indirect=True)
def test_parse_requirements_in_older_format_illega(requirements_cli, requirements_file):
expected = "Expecting requirements file to be a dict with the key 'collections' that contains a list of " \
"collections to install"
with pytest.raises(AnsibleError, match=expected):
requirements_cli._parse_requirements_file(requirements_file, allow_old_format=False)
@pytest.mark.parametrize('requirements_file', ['''
collections:
- version: 1.0.0
'''], indirect=True)
def test_parse_requirements_without_mandatory_name_key(requirements_cli, requirements_file):
expected = "Collections requirement entry should contain the key name."
with pytest.raises(AnsibleError, match=expected):
requirements_cli._parse_requirements_file(requirements_file)
@pytest.mark.parametrize('requirements_file', [('''
collections:
- namespace.collection1
- namespace.collection2
'''), ('''
collections:
- name: namespace.collection1
- name: namespace.collection2
''')], indirect=True)
def test_parse_requirements(requirements_cli, requirements_file):
expected = {
'roles': [],
'collections': [('namespace.collection1', '*', None), ('namespace.collection2', '*', None)]
}
actual = requirements_cli._parse_requirements_file(requirements_file)
assert actual == expected
@pytest.mark.parametrize('requirements_file', ['''
collections:
- name: namespace.collection1
version: ">=1.0.0,<=2.0.0"
source: https://galaxy-dev.ansible.com
- namespace.collection2'''], indirect=True)
def test_parse_requirements_with_extra_info(requirements_cli, requirements_file):
actual = requirements_cli._parse_requirements_file(requirements_file)
assert len(actual['roles']) == 0
assert len(actual['collections']) == 2
assert actual['collections'][0][0] == 'namespace.collection1'
assert actual['collections'][0][1] == '>=1.0.0,<=2.0.0'
assert actual['collections'][0][2].api_server == 'https://galaxy-dev.ansible.com'
assert actual['collections'][0][2].name == 'explicit_requirement_namespace.collection1'
assert actual['collections'][0][2].token is None
assert actual['collections'][0][2].username is None
assert actual['collections'][0][2].password is None
assert actual['collections'][0][2].validate_certs is True
assert actual['collections'][1] == ('namespace.collection2', '*', None)
@pytest.mark.parametrize('requirements_file', ['''
roles:
- username.role_name
- src: username2.role_name2
- src: ssh://github.com/user/repo
scm: git
collections:
- namespace.collection2
'''], indirect=True)
def test_parse_requirements_with_roles_and_collections(requirements_cli, requirements_file):
actual = requirements_cli._parse_requirements_file(requirements_file)
assert len(actual['roles']) == 3
assert actual['roles'][0].name == 'username.role_name'
assert actual['roles'][1].name == 'username2.role_name2'
assert actual['roles'][2].name == 'repo'
assert actual['roles'][2].src == 'ssh://github.com/user/repo'
assert len(actual['collections']) == 1
assert actual['collections'][0] == ('namespace.collection2', '*', None)
@pytest.mark.parametrize('requirements_file', ['''
collections:
- name: namespace.collection
- name: namespace2.collection2
source: https://galaxy-dev.ansible.com/
- name: namespace3.collection3
source: server
'''], indirect=True)
def test_parse_requirements_with_collection_source(requirements_cli, requirements_file):
galaxy_api = GalaxyAPI(requirements_cli.api, 'server', 'https://config-server')
requirements_cli.api_servers.append(galaxy_api)
actual = requirements_cli._parse_requirements_file(requirements_file)
assert actual['roles'] == []
assert len(actual['collections']) == 3
assert actual['collections'][0] == ('namespace.collection', '*', None)
assert actual['collections'][1][0] == 'namespace2.collection2'
assert actual['collections'][1][1] == '*'
assert actual['collections'][1][2].api_server == 'https://galaxy-dev.ansible.com/'
assert actual['collections'][1][2].name == 'explicit_requirement_namespace2.collection2'
assert actual['collections'][1][2].token is None
assert actual['collections'][2] == ('namespace3.collection3', '*', galaxy_api)
@pytest.mark.parametrize('requirements_file', ['''
- username.included_role
- src: https://github.com/user/repo
'''], indirect=True)
def test_parse_requirements_roles_with_include(requirements_cli, requirements_file):
reqs = [
'ansible.role',
{'include': requirements_file},
]
parent_requirements = os.path.join(os.path.dirname(requirements_file), 'parent.yaml')
with open(to_bytes(parent_requirements), 'wb') as req_fd:
req_fd.write(to_bytes(yaml.safe_dump(reqs)))
actual = requirements_cli._parse_requirements_file(parent_requirements)
assert len(actual['roles']) == 3
assert actual['collections'] == []
assert actual['roles'][0].name == 'ansible.role'
assert actual['roles'][1].name == 'username.included_role'
assert actual['roles'][2].name == 'repo'
assert actual['roles'][2].src == 'https://github.com/user/repo'
@pytest.mark.parametrize('requirements_file', ['''
- username.role
- include: missing.yml
'''], indirect=True)
def test_parse_requirements_roles_with_include_missing(requirements_cli, requirements_file):
expected = "Failed to find include requirements file 'missing.yml' in '%s'" % to_native(requirements_file)
with pytest.raises(AnsibleError, match=expected):
requirements_cli._parse_requirements_file(requirements_file)

@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible import context
from ansible.errors import AnsibleError
from ansible.galaxy.api import GalaxyAPI
from ansible.galaxy.token import GalaxyToken
from ansible.utils import context_objects as co
@pytest.fixture(autouse='function')
def reset_cli_args():
co.GlobalCLIArgs._Singleton__instance = None
# Required to initialise the GalaxyAPI object
context.CLIARGS._store = {'ignore_certs': False}
yield
co.GlobalCLIArgs._Singleton__instance = None
def test_api_no_auth():
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com")
actual = api._auth_header(required=False)
assert actual == {}
def test_api_no_auth_but_required():
expected = "No access token or username set. A token can be set with --api-key, with 'ansible-galaxy login', " \
"or set in ansible.cfg."
with pytest.raises(AnsibleError, match=expected):
GalaxyAPI(None, "test", "https://galaxy.ansible.com")._auth_header()
def test_api_token_auth():
token = GalaxyToken(token=u"my_token")
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", token=token)
actual = api._auth_header()
assert actual == {'Authorization': 'Token my_token'}
def test_api_basic_auth_password():
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", username=u"user", password=u"pass")
actual = api._auth_header()
assert actual == {'Authorization': 'Basic dXNlcjpwYXNz'}
def test_api_basic_auth_no_password():
api = GalaxyAPI(None, "test", "https://galaxy.ansible.com", username=u"user",)
actual = api._auth_header()
assert actual == {'Authorization': 'Basic dXNlcjo='}
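The Basic auth values asserted above are just the base64 encoding of ``username:password``; a quick way to double-check them:

.. code-block:: python

    import base64

    assert base64.b64encode(b'user:pass').decode() == 'dXNlcjpwYXNz'
    assert base64.b64encode(b'user:').decode() == 'dXNlcjo='  # username with no password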

@@ -21,9 +21,10 @@ from units.compat.mock import MagicMock
import ansible.module_utils.six.moves.urllib.error as urllib_error
+from ansible import context
from ansible.cli.galaxy import GalaxyCLI
from ansible.errors import AnsibleError
-from ansible.galaxy import collection
+from ansible.galaxy import api, collection, token
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.utils import context_objects as co
from ansible.utils.display import Display
@@ -77,20 +78,6 @@ def collection_artifact(monkeypatch, tmp_path_factory):
return input_file, mock_open
-@pytest.fixture()
-def requirements_file(request, tmp_path_factory):
-content = request.param
-test_dir = to_text(tmp_path_factory.mktemp('test-ÅÑŚÌβŁÈ Collections Requirements'))
-requirements_file = os.path.join(test_dir, 'requirements.yml')
-if content:
-with open(requirements_file, 'wb') as req_obj:
-req_obj.write(to_bytes(content))
-yield requirements_file
@pytest.fixture()
def galaxy_yml(request, tmp_path_factory):
b_test_dir = to_bytes(tmp_path_factory.mktemp('test-ÅÑŚÌβŁÈ Collections'))
@@ -123,6 +110,14 @@ def tmp_tarfile(tmp_path_factory):
yield temp_dir, tfile, filename, sha256_hash.hexdigest()
+@pytest.fixture()
+def galaxy_server():
+context.CLIARGS._store = {'ignore_certs': False}
+galaxy_api = api.GalaxyAPI(None, 'test_server', 'https://galaxy.ansible.com',
+token=token.GalaxyToken(token='key'))
+return galaxy_api
def test_build_collection_no_galaxy_yaml():
fake_path = u'/fake/ÅÑŚÌβŁÈ/path'
expected = to_native("The collection galaxy.yml path '%s/galaxy.yml' does not exist." % fake_path)
@@ -411,7 +406,7 @@ def test_publish_missing_file():
expected = to_native("The collection path specified '%s' does not exist." % fake_path)
with pytest.raises(AnsibleError, match=expected):
-collection.publish_collection(fake_path, None, None, False, True)
+collection.publish_collection(fake_path, None, True)
def test_publish_not_a_tarball():
@@ -422,24 +417,23 @@ def test_publish_not_a_tarball():
temp_file.write(b"\x00")
temp_file.flush()
with pytest.raises(AnsibleError, match=expected.format(to_native(temp_file.name))):
-collection.publish_collection(temp_file.name, None, None, False, True)
+collection.publish_collection(temp_file.name, None, True)
-def test_publish_no_wait(collection_artifact, monkeypatch):
+def test_publish_no_wait(galaxy_server, collection_artifact, monkeypatch):
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
artifact_path, mock_open = collection_artifact
fake_import_uri = 'https://galaxy.server.com/api/v2/import/1234'
-server = 'https://galaxy.com'
mock_open.return_value = StringIO(u'{"task":"%s"}' % fake_import_uri)
expected_form, expected_content_type = collection._get_mime_data(to_bytes(artifact_path))
-collection.publish_collection(artifact_path, server, 'key', False, False)
+collection.publish_collection(artifact_path, galaxy_server, False)
assert mock_open.call_count == 1
-assert mock_open.mock_calls[0][1][0] == 'https://galaxy.com/api/v2/collections/'
+assert mock_open.mock_calls[0][1][0] == '%s/api/v2/collections/' % galaxy_server.api_server
assert mock_open.mock_calls[0][2]['data'] == expected_form
assert mock_open.mock_calls[0][2]['method'] == 'POST'
assert mock_open.mock_calls[0][2]['validate_certs'] is True
@@ -448,24 +442,26 @@ def test_publish_no_wait(collection_artifact, monkeypatch):
assert mock_open.mock_calls[0][2]['headers']['Content-type'] == expected_content_type
assert mock_display.call_count == 2
-assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s" % (artifact_path, server)
+assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s" \
+% (artifact_path, galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[1][1][0] == \
"Collection has been pushed to the Galaxy server, not waiting until import has completed due to --no-wait " \
"being set. Import task results can be found at %s" % fake_import_uri
-def test_publish_dont_validate_cert(collection_artifact):
+def test_publish_dont_validate_cert(galaxy_server, collection_artifact):
+galaxy_server.validate_certs = False
artifact_path, mock_open = collection_artifact
mock_open.return_value = StringIO(u'{"task":"https://galaxy.server.com/api/v2/import/1234"}')
-collection.publish_collection(artifact_path, 'https://galaxy.server.com', 'key', True, False)
+collection.publish_collection(artifact_path, galaxy_server, False)
assert mock_open.call_count == 1
assert mock_open.mock_calls[0][2]['validate_certs'] is False
-def test_publish_failure(collection_artifact):
+def test_publish_failure(galaxy_server, collection_artifact):
artifact_path, mock_open = collection_artifact
mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 500, 'msg', {}, StringIO())
@@ -473,10 +469,10 @@ def test_publish_failure(collection_artifact):
expected = 'Error when publishing collection (HTTP Code: 500, Message: Unknown error returned by Galaxy ' \
'server. Code: Unknown)'
with pytest.raises(AnsibleError, match=re.escape(expected)):
-collection.publish_collection(artifact_path, 'https://galaxy.server.com', 'key', False, True)
+collection.publish_collection(artifact_path, galaxy_server, True)
-def test_publish_failure_with_json_info(collection_artifact):
+def test_publish_failure_with_json_info(galaxy_server, collection_artifact):
artifact_path, mock_open = collection_artifact
return_content = StringIO(u'{"message":"Galaxy error message","code":"GWE002"}')
@@ -484,10 +480,10 @@ def test_publish_failure_with_json_info(collection_artifact):
expected = 'Error when publishing collection (HTTP Code: 503, Message: Galaxy error message Code: GWE002)'
with pytest.raises(AnsibleError, match=re.escape(expected)):
-collection.publish_collection(artifact_path, 'https://galaxy.server.com', 'key', False, True)
+collection.publish_collection(artifact_path, galaxy_server, True)
-def test_publish_with_wait(collection_artifact, monkeypatch):
+def test_publish_with_wait(galaxy_server, collection_artifact, monkeypatch):
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
@@ -495,7 +491,6 @@ def test_publish_with_wait(collection_artifact, monkeypatch):
monkeypatch.setattr(Display, 'vvv', mock_vvv)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234'
-server = 'https://galaxy.server.com'
artifact_path, mock_open = collection_artifact
@@ -504,7 +499,7 @@ def test_publish_with_wait(collection_artifact, monkeypatch):
StringIO(u'{"finished_at":"some_time","state":"success"}')
)
-collection.publish_collection(artifact_path, server, 'key', False, True)
+collection.publish_collection(artifact_path, galaxy_server, True)
assert mock_open.call_count == 2
assert mock_open.mock_calls[1][1][0] == fake_import_uri
@@ -513,12 +508,14 @@ def test_publish_with_wait(collection_artifact, monkeypatch):
assert mock_open.mock_calls[1][2]['method'] == 'GET'
assert mock_display.call_count == 2
-assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s" % (artifact_path, server)
+assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s" \
+% (artifact_path, galaxy_server.name, galaxy_server.api_server)
assert mock_display.mock_calls[1][1][0] == 'Collection has been successfully published to the Galaxy server'
-assert mock_vvv.call_count == 2
-assert mock_vvv.mock_calls[0][1][0] == 'Collection has been pushed to the Galaxy server %s' % server
-assert mock_vvv.mock_calls[1][1][0] == 'Waiting until galaxy import task %s has completed' % fake_import_uri
+assert mock_vvv.call_count == 3
+assert mock_vvv.mock_calls[1][1][0] == 'Collection has been pushed to the Galaxy server %s %s' \
+% (galaxy_server.name, galaxy_server.api_server)
+assert mock_vvv.mock_calls[2][1][0] == 'Waiting until galaxy import task %s has completed' % fake_import_uri
def test_publish_with_wait_timeout(collection_artifact, monkeypatch):
@@ -564,7 +561,9 @@ def test_publish_with_wait_timeout(collection_artifact, monkeypatch):
'Galaxy import process has a status of waiting, wait 2 seconds before trying again'
-def test_publish_with_wait_timeout(collection_artifact, monkeypatch):
+def test_publish_with_wait_timeout(galaxy_server, collection_artifact, monkeypatch):
+galaxy_server.validate_certs = False
monkeypatch.setattr(time, 'sleep', MagicMock())
mock_display = MagicMock()
@@ -574,7 +573,6 @@ def test_publish_with_wait_timeout(collection_artifact, monkeypatch):
monkeypatch.setattr(Display, 'vvv', mock_vvv)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234'
-server = 'https://galaxy.server.com'
artifact_path, mock_open = collection_artifact
@@ -592,7 +590,7 @@ def test_publish_with_wait_timeout(collection_artifact, monkeypatch):
expected = "Timeout while waiting for the Galaxy import process to finish, check progress at '%s'" \
% fake_import_uri
with pytest.raises(AnsibleError, match=expected):
-collection.publish_collection(artifact_path, server, 'key', True, True)
+collection.publish_collection(artifact_path, galaxy_server, True)
assert mock_open.call_count == 8
for i in range(7):
@@ -603,21 +601,23 @@ def test_publish_with_wait_timeout(collection_artifact, monkeypatch):
assert mock_call[2]['method'] == 'GET'
assert mock_display.call_count == 1
-assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s" % (artifact_path, server)
+assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s" \
+% (artifact_path, galaxy_server.name, galaxy_server.api_server)
expected_wait_msg = 'Galaxy import process has a status of waiting, wait {0} seconds before trying again'
-assert mock_vvv.call_count == 8
-assert mock_vvv.mock_calls[0][1][0] == 'Collection has been pushed to the Galaxy server %s' % server
-assert mock_vvv.mock_calls[1][1][0] == 'Waiting until galaxy import task %s has completed' % fake_import_uri
-assert mock_vvv.mock_calls[2][1][0] == expected_wait_msg.format(2)
-assert mock_vvv.mock_calls[3][1][0] == expected_wait_msg.format(3)
-assert mock_vvv.mock_calls[4][1][0] == expected_wait_msg.format(4)
-assert mock_vvv.mock_calls[5][1][0] == expected_wait_msg.format(6)
-assert mock_vvv.mock_calls[6][1][0] == expected_wait_msg.format(10)
-assert mock_vvv.mock_calls[7][1][0] == expected_wait_msg.format(15)
+assert mock_vvv.call_count == 9
+assert mock_vvv.mock_calls[1][1][0] == 'Collection has been pushed to the Galaxy server %s %s' \
+% (galaxy_server.name, galaxy_server.api_server)
+assert mock_vvv.mock_calls[2][1][0] == 'Waiting until galaxy import task %s has completed' % fake_import_uri
+assert mock_vvv.mock_calls[3][1][0] == expected_wait_msg.format(2)
+assert mock_vvv.mock_calls[4][1][0] == expected_wait_msg.format(3)
+assert mock_vvv.mock_calls[5][1][0] == expected_wait_msg.format(4)
+assert mock_vvv.mock_calls[6][1][0] == expected_wait_msg.format(6)
+assert mock_vvv.mock_calls[7][1][0] == expected_wait_msg.format(10)
+assert mock_vvv.mock_calls[8][1][0] == expected_wait_msg.format(15)
-def test_publish_with_wait_and_failure(collection_artifact, monkeypatch):
+def test_publish_with_wait_and_failure(galaxy_server, collection_artifact, monkeypatch):
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
@@ -631,7 +631,6 @@ def test_publish_with_wait_and_failure(collection_artifact, monkeypatch):
monkeypatch.setattr(Display, 'error', mock_err)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234'
-server = 'https://galaxy.server.com'
artifact_path, mock_open = collection_artifact
@@ -666,21 +665,23 @@ def test_publish_with_wait_and_failure(collection_artifact, monkeypatch):
expected = 'Galaxy import process failed: Because I said so! (Code: GW001)'
with pytest.raises(AnsibleError, match=re.escape(expected)):
-collection.publish_collection(artifact_path, server, 'key', True, True)
+collection.publish_collection(artifact_path, galaxy_server, True)
assert mock_open.call_count == 2
assert mock_open.mock_calls[1][1][0] == fake_import_uri
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == 'Token key'
-assert mock_open.mock_calls[1][2]['validate_certs'] is False
+assert mock_open.mock_calls[1][2]['validate_certs'] is True
assert mock_open.mock_calls[1][2]['method'] == 'GET'
assert mock_display.call_count == 1
-assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s" % (artifact_path, server)
+assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s" \
+% (artifact_path, galaxy_server.name, galaxy_server.api_server)
-assert mock_vvv.call_count == 3
-assert mock_vvv.mock_calls[0][1][0] == 'Collection has been pushed to the Galaxy server %s' % server
-assert mock_vvv.mock_calls[1][1][0] == 'Waiting until galaxy import task %s has completed' % fake_import_uri
-assert mock_vvv.mock_calls[2][1][0] == 'Galaxy import message: info - Some info'
+assert mock_vvv.call_count == 4
+assert mock_vvv.mock_calls[1][1][0] == 'Collection has been pushed to the Galaxy server %s %s' \
+% (galaxy_server.name, galaxy_server.api_server)
+assert mock_vvv.mock_calls[2][1][0] == 'Waiting until galaxy import task %s has completed' % fake_import_uri
+assert mock_vvv.mock_calls[3][1][0] == 'Galaxy import message: info - Some info'
assert mock_warn.call_count == 1
assert mock_warn.mock_calls[0][1][0] == 'Galaxy import warning message: Some warning'
@@ -689,7 +690,7 @@ def test_publish_with_wait_and_failure(collection_artifact, monkeypatch):
assert mock_err.mock_calls[0][1][0] == 'Galaxy import error message: Some error'
-def test_publish_with_wait_and_failure_and_no_error(collection_artifact, monkeypatch):
+def test_publish_with_wait_and_failure_and_no_error(galaxy_server, collection_artifact, monkeypatch):
mock_display = MagicMock()
monkeypatch.setattr(Display, 'display', mock_display)
@@ -703,7 +704,6 @@ def test_publish_with_wait_and_failure_and_no_error(collection_artifact, monkeyp
monkeypatch.setattr(Display, 'error', mock_err)
fake_import_uri = 'https://galaxy-server/api/v2/import/1234'
-server = 'https://galaxy.server.com'
artifact_path, mock_open = collection_artifact
@@ -734,21 +734,23 @@ def test_publish_with_wait_and_failure_and_no_error(collection_artifact, monkeyp
expected = 'Galaxy import process failed: Unknown error, see %s for more details (Code: UNKNOWN)' % fake_import_uri
with pytest.raises(AnsibleError, match=re.escape(expected)):
-collection.publish_collection(artifact_path, server, 'key', True, True)
+collection.publish_collection(artifact_path, galaxy_server, True)
assert mock_open.call_count == 2
assert mock_open.mock_calls[1][1][0] == fake_import_uri
assert mock_open.mock_calls[1][2]['headers']['Authorization'] == 'Token key'
-assert mock_open.mock_calls[1][2]['validate_certs'] is False
+assert mock_open.mock_calls[1][2]['validate_certs'] is True
assert mock_open.mock_calls[1][2]['method'] == 'GET'
assert mock_display.call_count == 1
-assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s" % (artifact_path, server)
+assert mock_display.mock_calls[0][1][0] == "Publishing collection artifact '%s' to %s %s" \
+% (artifact_path, galaxy_server.name, galaxy_server.api_server)
-assert mock_vvv.call_count == 3
-assert mock_vvv.mock_calls[0][1][0] == 'Collection has been pushed to the Galaxy server %s' % server
-assert mock_vvv.mock_calls[1][1][0] == 'Waiting until galaxy import task %s has completed' % fake_import_uri
-assert mock_vvv.mock_calls[2][1][0] == 'Galaxy import message: info - Some info'
+assert mock_vvv.call_count == 4
+assert mock_vvv.mock_calls[1][1][0] == 'Collection has been pushed to the Galaxy server %s %s' \
+% (galaxy_server.name, galaxy_server.api_server)
+assert mock_vvv.mock_calls[2][1][0] == 'Waiting until galaxy import task %s has completed' % fake_import_uri
+assert mock_vvv.mock_calls[3][1][0] == 'Galaxy import message: info - Some info'
assert mock_warn.call_count == 1
assert mock_warn.mock_calls[0][1][0] == 'Galaxy import warning message: Some warning'
@@ -757,78 +759,6 @@ def test_publish_with_wait_and_failure_and_no_error(collection_artifact, monkeyp
assert mock_err.mock_calls[0][1][0] == 'Galaxy import error message: Some error'
@pytest.mark.parametrize('requirements_file', [None], indirect=True)
def test_parse_requirements_file_that_doesnt_exist(requirements_file):
expected = "The requirements file '%s' does not exist." % to_native(requirements_file)
with pytest.raises(AnsibleError, match=expected):
collection.parse_collections_requirements_file(requirements_file)
@pytest.mark.parametrize('requirements_file', ['not a valid yml file: hi: world'], indirect=True)
def test_parse_requirements_file_that_isnt_yaml(requirements_file):
expected = "Failed to parse the collection requirements yml at '%s' with the following error" \
% to_native(requirements_file)
with pytest.raises(AnsibleError, match=expected):
collection.parse_collections_requirements_file(requirements_file)
@pytest.mark.parametrize('requirements_file', [('''
# Older role based requirements.yml
- galaxy.role
- anotherrole
'''), ('''
# Doesn't have collections key
roles:
- galaxy.role
- anotherole
''')], indirect=True)
def test_parse_requirements_in_invalid_format(requirements_file):
expected = "Expecting collections requirements file to be a dict with the key collections that contains a list " \
"of collections to install."
with pytest.raises(AnsibleError, match=expected):
collection.parse_collections_requirements_file(requirements_file)
@pytest.mark.parametrize('requirements_file', ['''
collections:
- version: 1.0.0
'''], indirect=True)
def test_parse_requirements_without_mandatory_name_key(requirements_file):
expected = "Collections requirement entry should contain the key name."
with pytest.raises(AnsibleError, match=expected):
collection.parse_collections_requirements_file(requirements_file)
@pytest.mark.parametrize('requirements_file', [('''
collections:
- namespace.collection1
- namespace.collection2
'''), ('''
collections:
- name: namespace.collection1
- name: namespace.collection2
''')], indirect=True)
def test_parse_requirements(requirements_file):
expected = [('namespace.collection1', '*', None), ('namespace.collection2', '*', None)]
actual = collection.parse_collections_requirements_file(requirements_file)
assert actual == expected
@pytest.mark.parametrize('requirements_file', ['''
collections:
- name: namespace.collection1
version: ">=1.0.0,<=2.0.0"
source: https://galaxy-dev.ansible.com
- namespace.collection2'''], indirect=True)
def test_parse_requirements_with_extra_info(requirements_file):
expected = [('namespace.collection1', '>=1.0.0,<=2.0.0', 'https://galaxy-dev.ansible.com'),
('namespace.collection2', '*', None)]
actual = collection.parse_collections_requirements_file(requirements_file)
assert actual == expected
def test_find_existing_collections(tmp_path_factory, monkeypatch):
test_dir = to_text(tmp_path_factory.mktemp('test-ÅÑŚÌβŁÈ Collections'))
collection1 = os.path.join(test_dir, 'namespace1', 'collection1')
@@ -869,7 +799,7 @@ def test_find_existing_collections(tmp_path_factory, monkeypatch):
assert actual_collection.namespace == 'namespace1'
assert actual_collection.name == 'collection1'
assert actual_collection.b_path == to_bytes(collection1)
-assert actual_collection.source is None
+assert actual_collection.api is None
assert actual_collection.versions == set(['1.2.3'])
assert actual_collection.latest_version == '1.2.3'
assert actual_collection.dependencies == {}
@@ -877,7 +807,7 @@ def test_find_existing_collections(tmp_path_factory, monkeypatch):
assert actual_collection.namespace == 'namespace2'
assert actual_collection.name == 'collection2'
assert actual_collection.b_path == to_bytes(collection2)
-assert actual_collection.source is None
+assert actual_collection.api is None
assert actual_collection.versions == set(['*'])
assert actual_collection.latest_version == '*'
assert actual_collection.dependencies == {}

@@ -6,6 +6,7 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
+import copy
import json
import os
import pytest
@@ -19,9 +20,10 @@ from units.compat.mock import MagicMock
import ansible.module_utils.six.moves.urllib.error as urllib_error
+from ansible import context
from ansible.cli.galaxy import GalaxyCLI
from ansible.errors import AnsibleError
-from ansible.galaxy import collection
+from ansible.galaxy import collection, api, Galaxy
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.utils import context_objects as co
from ansible.utils.display import Display
@@ -71,6 +73,13 @@ def artifact_versions_json(namespace, name, versions, server):
return to_text(json_str)
+@pytest.fixture(autouse='function')
+def reset_cli_args():
+co.GlobalCLIArgs._Singleton__instance = None
+yield
+co.GlobalCLIArgs._Singleton__instance = None
@pytest.fixture()
def collection_artifact(request, tmp_path_factory):
test_dir = to_text(tmp_path_factory.mktemp('test-ÅÑŚÌβŁÈ Collections Input'))
@@ -99,13 +108,20 @@ def collection_artifact(request, tmp_path_factory):
return to_bytes(collection_path), to_bytes(collection_tar)
+@pytest.fixture()
+def galaxy_server():
+context.CLIARGS._store = {'ignore_certs': False}
+galaxy_api = api.GalaxyAPI(None, 'test_server', 'https://galaxy.ansible.com')
+return galaxy_api
def test_build_requirement_from_path(collection_artifact):
-actual = collection.CollectionRequirement.from_path(collection_artifact[0], True, True)
+actual = collection.CollectionRequirement.from_path(collection_artifact[0], True)
assert actual.namespace == u'ansible_namespace'
assert actual.name == u'collection'
assert actual.b_path == collection_artifact[0]
-assert actual.source is None
+assert actual.api is None
assert actual.skip is True
assert actual.versions == set([u'*'])
assert actual.latest_version == u'*'
@@ -127,13 +143,13 @@ def test_build_requirement_from_path_with_manifest(collection_artifact):
with open(manifest_path, 'wb') as manifest_obj:
manifest_obj.write(to_bytes(manifest_value))
-actual = collection.CollectionRequirement.from_path(collection_artifact[0], True, True)
+actual = collection.CollectionRequirement.from_path(collection_artifact[0], True)
# While the folder name suggests a different collection, we treat MANIFEST.json as the source of truth.
assert actual.namespace == u'namespace'
assert actual.name == u'name'
assert actual.b_path == collection_artifact[0]
-assert actual.source is None
+assert actual.api is None
assert actual.skip is True
assert actual.versions == set([u'1.1.1'])
assert actual.latest_version == u'1.1.1'
@ -147,7 +163,7 @@ def test_build_requirement_from_path_invalid_manifest(collection_artifact):
expected = "Collection file at '%s' does not contain a valid json string." % to_native(manifest_path) expected = "Collection file at '%s' does not contain a valid json string." % to_native(manifest_path)
with pytest.raises(AnsibleError, match=expected): with pytest.raises(AnsibleError, match=expected):
collection.CollectionRequirement.from_path(collection_artifact[0], True, True) collection.CollectionRequirement.from_path(collection_artifact[0], True)

def test_build_requirement_from_tar(collection_artifact):
@@ -156,7 +172,7 @@ def test_build_requirement_from_tar(collection_artifact):
    assert actual.namespace == u'ansible_namespace'
    assert actual.name == u'collection'
    assert actual.b_path == collection_artifact[1]
-    assert actual.source is None
+    assert actual.api is None
    assert actual.skip is False
    assert actual.versions == set([u'0.1.0'])
    assert actual.latest_version == u'0.1.0'
@@ -237,9 +253,8 @@ def test_build_requirement_from_tar_invalid_manifest(tmp_path_factory):
        collection.CollectionRequirement.from_tar(tar_path, True, True)


-def test_build_requirement_from_name(monkeypatch):
-    galaxy_server = 'https://galaxy.ansible.com'
-    json_str = artifact_versions_json('namespace', 'collection', ['2.1.9', '2.1.10'], galaxy_server)
+def test_build_requirement_from_name(galaxy_server, monkeypatch):
+    json_str = artifact_versions_json('namespace', 'collection', ['2.1.9', '2.1.10'], galaxy_server.api_server)
    mock_open = MagicMock()
    mock_open.return_value = StringIO(json_str)
    monkeypatch.setattr(collection, 'open_url', mock_open)
@@ -249,20 +264,20 @@ def test_build_requirement_from_name(monkeypatch):
    assert actual.namespace == u'namespace'
    assert actual.name == u'collection'
    assert actual.b_path is None
-    assert actual.source == to_text(galaxy_server)
+    assert actual.api == galaxy_server
    assert actual.skip is False
    assert actual.versions == set([u'2.1.9', u'2.1.10'])
    assert actual.latest_version == u'2.1.10'
    assert actual.dependencies is None
    assert mock_open.call_count == 1
-    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" % galaxy_server
-    assert mock_open.mock_calls[0][2] == {'validate_certs': True}
+    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" % galaxy_server.api_server
+    assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}


-def test_build_requirement_from_name_with_prerelease(monkeypatch):
-    galaxy_server = 'https://galaxy-dev.ansible.com'
-    json_str = artifact_versions_json('namespace', 'collection', ['1.0.1', '2.0.1-beta.1', '2.0.1'], galaxy_server)
+def test_build_requirement_from_name_with_prerelease(galaxy_server, monkeypatch):
+    json_str = artifact_versions_json('namespace', 'collection', ['1.0.1', '2.0.1-beta.1', '2.0.1'],
+                                      galaxy_server.api_server)
    mock_open = MagicMock()
    mock_open.return_value = StringIO(json_str)
@@ -273,20 +288,20 @@ def test_build_requirement_from_name_with_prerelease(monkeypatch):
    assert actual.namespace == u'namespace'
    assert actual.name == u'collection'
    assert actual.b_path is None
-    assert actual.source == to_text(galaxy_server)
+    assert actual.api == galaxy_server
    assert actual.skip is False
    assert actual.versions == set([u'1.0.1', u'2.0.1'])
    assert actual.latest_version == u'2.0.1'
    assert actual.dependencies is None
    assert mock_open.call_count == 1
-    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" % galaxy_server
-    assert mock_open.mock_calls[0][2] == {'validate_certs': True}
+    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" \
+        % galaxy_server.api_server
+    assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}


-def test_build_requirment_from_name_with_prerelease_explicit(monkeypatch):
-    galaxy_server = 'https://galaxy-dev.ansible.com'
-    json_str = artifact_json('namespace', 'collection', '2.0.1-beta.1', {}, galaxy_server)
+def test_build_requirment_from_name_with_prerelease_explicit(galaxy_server, monkeypatch):
+    json_str = artifact_json('namespace', 'collection', '2.0.1-beta.1', {}, galaxy_server.api_server)
    mock_open = MagicMock()
    mock_open.return_value = StringIO(json_str)
@@ -298,7 +313,7 @@ def test_build_requirment_from_name_with_prerelease_explicit(monkeypatch):
    assert actual.namespace == u'namespace'
    assert actual.name == u'collection'
    assert actual.b_path is None
-    assert actual.source == to_text(galaxy_server)
+    assert actual.api == galaxy_server
    assert actual.skip is False
    assert actual.versions == set([u'2.0.1-beta.1'])
    assert actual.latest_version == u'2.0.1-beta.1'
@@ -306,13 +321,12 @@ def test_build_requirment_from_name_with_prerelease_explicit(monkeypatch):
    assert mock_open.call_count == 1
    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/2.0.1-beta.1/" \
-        % galaxy_server
+        % galaxy_server.api_server
-    assert mock_open.mock_calls[0][2] == {'validate_certs': True}
+    assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}


-def test_build_requirement_from_name_second_server(monkeypatch):
-    galaxy_server = 'https://galaxy-dev.ansible.com'
-    json_str = artifact_versions_json('namespace', 'collection', ['1.0.1', '1.0.2', '1.0.3'], galaxy_server)
+def test_build_requirement_from_name_second_server(galaxy_server, monkeypatch):
+    json_str = artifact_versions_json('namespace', 'collection', ['1.0.1', '1.0.2', '1.0.3'], galaxy_server.api_server)
    mock_open = MagicMock()
    mock_open.side_effect = (
        urllib_error.HTTPError('https://galaxy.server.com', 404, 'msg', {}, None),
@@ -321,13 +335,15 @@ def test_build_requirement_from_name_second_server(monkeypatch):
    monkeypatch.setattr(collection, 'open_url', mock_open)
-    actual = collection.CollectionRequirement.from_name('namespace.collection', ['https://broken.com/', galaxy_server],
+    broken_server = copy.copy(galaxy_server)
+    broken_server.api_server = 'https://broken.com/'
+    actual = collection.CollectionRequirement.from_name('namespace.collection', [broken_server, galaxy_server],
                                                         '>1.0.1', False, True)
    assert actual.namespace == u'namespace'
    assert actual.name == u'collection'
    assert actual.b_path is None
-    assert actual.source == to_text(galaxy_server)
+    assert actual.api == galaxy_server
    assert actual.skip is False
    assert actual.versions == set([u'1.0.2', u'1.0.3'])
    assert actual.latest_version == u'1.0.3'
@@ -335,12 +351,13 @@ def test_build_requirement_from_name_second_server(monkeypatch):
    assert mock_open.call_count == 2
    assert mock_open.mock_calls[0][1][0] == u"https://broken.com/api/v2/collections/namespace/collection/versions/"
-    assert mock_open.mock_calls[0][2] == {'validate_certs': False}
+    assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
-    assert mock_open.mock_calls[1][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" % galaxy_server
-    assert mock_open.mock_calls[1][2] == {'validate_certs': False}
+    assert mock_open.mock_calls[1][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" \
+        % galaxy_server.api_server
+    assert mock_open.mock_calls[1][2] == {'validate_certs': True, "headers": {}}

-def test_build_requirement_from_name_missing(monkeypatch):
+def test_build_requirement_from_name_missing(galaxy_server, monkeypatch):
    mock_open = MagicMock()
    mock_open.side_effect = urllib_error.HTTPError('https://galaxy.server.com', 404, 'msg', {}, None)
@@ -349,12 +366,11 @@ def test_build_requirement_from_name_missing(monkeypatch):
    expected = "Failed to find collection namespace.collection:*"
    with pytest.raises(AnsibleError, match=expected):
        collection.CollectionRequirement.from_name('namespace.collection',
-                                                   ['https://broken.com/', 'https://broken2.com'], '*', False, True)
+                                                   [galaxy_server, galaxy_server], '*', False, True)


-def test_build_requirement_from_name_single_version(monkeypatch):
-    galaxy_server = 'https://galaxy.ansible.com'
-    json_str = artifact_json('namespace', 'collection', '2.0.0', {}, galaxy_server)
+def test_build_requirement_from_name_single_version(galaxy_server, monkeypatch):
+    json_str = artifact_json('namespace', 'collection', '2.0.0', {}, galaxy_server.api_server)
    mock_open = MagicMock()
    mock_open.return_value = StringIO(json_str)
@@ -365,7 +381,7 @@ def test_build_requirement_from_name_single_version(monkeypatch):
    assert actual.namespace == u'namespace'
    assert actual.name == u'collection'
    assert actual.b_path is None
-    assert actual.source == to_text(galaxy_server)
+    assert actual.api == galaxy_server
    assert actual.skip is False
    assert actual.versions == set([u'2.0.0'])
    assert actual.latest_version == u'2.0.0'
@@ -373,14 +389,14 @@ def test_build_requirement_from_name_single_version(monkeypatch):
    assert mock_open.call_count == 1
    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/2.0.0/" \
-        % galaxy_server
+        % galaxy_server.api_server
-    assert mock_open.mock_calls[0][2] == {'validate_certs': True}
+    assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}


-def test_build_requirement_from_name_multiple_versions_one_match(monkeypatch):
-    galaxy_server = 'https://galaxy.ansible.com'
-    json_str1 = artifact_versions_json('namespace', 'collection', ['2.0.0', '2.0.1', '2.0.2'], galaxy_server)
-    json_str2 = artifact_json('namespace', 'collection', '2.0.1', {}, galaxy_server)
+def test_build_requirement_from_name_multiple_versions_one_match(galaxy_server, monkeypatch):
+    json_str1 = artifact_versions_json('namespace', 'collection', ['2.0.0', '2.0.1', '2.0.2'],
+                                       galaxy_server.api_server)
+    json_str2 = artifact_json('namespace', 'collection', '2.0.1', {}, galaxy_server.api_server)
    mock_open = MagicMock()
    mock_open.side_effect = (StringIO(json_str1), StringIO(json_str2))
@@ -392,38 +408,37 @@ def test_build_requirement_from_name_multiple_versions_one_match(monkeypatch):
    assert actual.namespace == u'namespace'
    assert actual.name == u'collection'
    assert actual.b_path is None
-    assert actual.source == to_text(galaxy_server)
+    assert actual.api == galaxy_server
    assert actual.skip is False
    assert actual.versions == set([u'2.0.1'])
    assert actual.latest_version == u'2.0.1'
    assert actual.dependencies == {}
    assert mock_open.call_count == 2
-    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" % galaxy_server
-    assert mock_open.mock_calls[0][2] == {'validate_certs': True}
+    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" \
+        % galaxy_server.api_server
+    assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
    assert mock_open.mock_calls[1][1][0] == u"%s/api/v2/collections/namespace/collection/versions/2.0.1/" \
-        % galaxy_server
+        % galaxy_server.api_server
-    assert mock_open.mock_calls[1][2] == {'validate_certs': True}
+    assert mock_open.mock_calls[1][2] == {'validate_certs': True, "headers": {}}

-def test_build_requirement_from_name_multiple_version_results(monkeypatch):
-    galaxy_server = 'https://galaxy-dev.ansible.com'
+def test_build_requirement_from_name_multiple_version_results(galaxy_server, monkeypatch):
    json_str1 = json.dumps({
        'count': 6,
-        'next': '%s/api/v2/collections/namespace/collection/versions/?page=2' % galaxy_server,
+        'next': '%s/api/v2/collections/namespace/collection/versions/?page=2' % galaxy_server.api_server,
        'previous': None,
        'results': [
            {
-                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.0/' % galaxy_server,
+                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.0/' % galaxy_server.api_server,
                'version': '2.0.0',
            },
            {
-                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.1/' % galaxy_server,
+                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.1/' % galaxy_server.api_server,
                'version': '2.0.1',
            },
            {
-                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.2/' % galaxy_server,
+                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.2/' % galaxy_server.api_server,
                'version': '2.0.2',
            },
        ]
@@ -431,18 +446,18 @@ def test_build_requirement_from_name_multiple_version_results(monkeypatch):
    json_str2 = json.dumps({
        'count': 6,
        'next': None,
-        'previous': '%s/api/v2/collections/namespace/collection/versions/?page=1' % galaxy_server,
+        'previous': '%s/api/v2/collections/namespace/collection/versions/?page=1' % galaxy_server.api_server,
        'results': [
            {
-                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.3/' % galaxy_server,
+                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.3/' % galaxy_server.api_server,
                'version': '2.0.3',
            },
            {
-                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.4/' % galaxy_server,
+                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.4/' % galaxy_server.api_server,
                'version': '2.0.4',
            },
            {
-                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.5/' % galaxy_server,
+                'href': '%s/api/v2/collections/namespace/collection/versions/2.0.5/' % galaxy_server.api_server,
                'version': '2.0.5',
            },
        ]
@@ -458,18 +473,19 @@ def test_build_requirement_from_name_multiple_version_results(monkeypatch):
    assert actual.namespace == u'namespace'
    assert actual.name == u'collection'
    assert actual.b_path is None
-    assert actual.source == to_text(galaxy_server)
+    assert actual.api == galaxy_server
    assert actual.skip is False
    assert actual.versions == set([u'2.0.0', u'2.0.1', u'2.0.3', u'2.0.4', u'2.0.5'])
    assert actual.latest_version == u'2.0.5'
    assert actual.dependencies is None
    assert mock_open.call_count == 2
-    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" % galaxy_server
-    assert mock_open.mock_calls[0][2] == {'validate_certs': True}
+    assert mock_open.mock_calls[0][1][0] == u"%s/api/v2/collections/namespace/collection/versions/" \
+        % galaxy_server.api_server
+    assert mock_open.mock_calls[0][2] == {'validate_certs': True, "headers": {}}
    assert mock_open.mock_calls[1][1][0] == u"%s/api/v2/collections/namespace/collection/versions/?page=2" \
-        % galaxy_server
+        % galaxy_server.api_server
-    assert mock_open.mock_calls[1][2] == {'validate_certs': True}
+    assert mock_open.mock_calls[1][2] == {'validate_certs': True, "headers": {}}


@pytest.mark.parametrize('versions, requirement, expected_filter, expected_latest', [
@@ -507,26 +523,24 @@ def test_add_collection_wildcard_requirement_to_unknown_installed_version():
    assert req.latest_version == '*'

-def test_add_collection_requirement_with_conflict():
-    source = 'https://galaxy.ansible.com'
+def test_add_collection_requirement_with_conflict(galaxy_server):
    expected = "Cannot meet requirement ==1.0.2 for dependency namespace.name from source '%s'. Available versions " \
               "before last requirement added: 1.0.0, 1.0.1\n" \
               "Requirements from:\n" \
-              "\tbase - 'namespace.name:==1.0.2'" % source
+              "\tbase - 'namespace.name:==1.0.2'" % galaxy_server.api_server
    with pytest.raises(AnsibleError, match=expected):
-        collection.CollectionRequirement('namespace', 'name', None, source, ['1.0.0', '1.0.1'], '==1.0.2', False)
+        collection.CollectionRequirement('namespace', 'name', None, galaxy_server, ['1.0.0', '1.0.1'], '==1.0.2',
+                                         False)


-def test_add_requirement_to_existing_collection_with_conflict():
-    source = 'https://galaxy.ansible.com'
-    req = collection.CollectionRequirement('namespace', 'name', None, source, ['1.0.0', '1.0.1'], '*', False)
+def test_add_requirement_to_existing_collection_with_conflict(galaxy_server):
+    req = collection.CollectionRequirement('namespace', 'name', None, galaxy_server, ['1.0.0', '1.0.1'], '*', False)
    expected = "Cannot meet dependency requirement 'namespace.name:1.0.2' for collection namespace.collection2 from " \
               "source '%s'. Available versions before last requirement added: 1.0.0, 1.0.1\n" \
               "Requirements from:\n" \
               "\tbase - 'namespace.name:*'\n" \
-              "\tnamespace.collection2 - 'namespace.name:1.0.2'" % source
+              "\tnamespace.collection2 - 'namespace.name:1.0.2'" % galaxy_server.api_server
    with pytest.raises(AnsibleError, match=re.escape(expected)):
        req.add_requirement('namespace.collection2', '1.0.2')
@@ -591,7 +605,7 @@ def test_install_collection(collection_artifact, monkeypatch):
        % to_text(collection_path)


-def test_install_collection_with_download(collection_artifact, monkeypatch):
+def test_install_collection_with_download(galaxy_server, collection_artifact, monkeypatch):
    collection_tar = collection_artifact[1]
    output_path = os.path.join(os.path.split(collection_tar)[0], b'output')
    collection_path = os.path.join(output_path, b'ansible_namespace', b'collection')
@@ -606,7 +620,7 @@ def test_install_collection_with_download(collection_artifact, monkeypatch):
    temp_path = os.path.join(os.path.split(collection_tar)[0], b'temp')
    os.makedirs(temp_path)

-    req = collection.CollectionRequirement('ansible_namespace', 'collection', None, ['https://galaxy.ansible.com'],
+    req = collection.CollectionRequirement('ansible_namespace', 'collection', None, galaxy_server,
                                           ['0.1.0'], '*', False)
    req._galaxy_info = {
        'download_url': 'https://downloadme.com',
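All of the changes to this test module follow one pattern: where the old tests passed the Galaxy server around as a bare URL string (and asserted on ``actual.source``), the new ones pass a ``GalaxyAPI`` object built by the ``galaxy_server`` fixture and assert on ``actual.api``, with the URL now living on its ``api_server`` attribute. A minimal standalone sketch of that pattern, reusing only the constructor call from the fixture above (the server names here are illustrative, not taken from the commit):

.. code-block:: python

    import copy

    from ansible import context
    from ansible.galaxy import api

    # The fixture primes CLIARGS before constructing GalaxyAPI, which appears to
    # read ignore_certs from it, so the sketch does the same.
    context.CLIARGS._store = {'ignore_certs': False}

    primary = api.GalaxyAPI(None, 'release_galaxy', 'https://galaxy.ansible.com')
    fallback = copy.copy(primary)
    fallback.api_server = 'https://galaxy-dev.ansible.com'  # mirrors broken_server above

    # CollectionRequirement.from_name() now receives these objects in priority order,
    # as exercised by test_build_requirement_from_name_second_server.
    servers = [primary, fallback]
    print([server.api_server for server in servers])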

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import pytest
import ansible.constants as C
from ansible.galaxy.token import GalaxyToken, NoTokenSentinel
from ansible.module_utils._text import to_bytes, to_text
@pytest.fixture()
def b_token_file(request, tmp_path_factory):
b_test_dir = to_bytes(tmp_path_factory.mktemp('test-ÅÑŚÌβŁÈ Token'))
b_token_path = os.path.join(b_test_dir, b"token.yml")
token = getattr(request, 'param', None)
if token:
with open(b_token_path, 'wb') as token_fd:
token_fd.write(b"token: %s" % to_bytes(token))
orig_token_path = C.GALAXY_TOKEN_PATH
C.GALAXY_TOKEN_PATH = to_text(b_token_path)
try:
yield b_token_path
finally:
C.GALAXY_TOKEN_PATH = orig_token_path
def test_token_explicit(b_token_file):
assert GalaxyToken(token="explicit").get() == "explicit"
@pytest.mark.parametrize('b_token_file', ['file'], indirect=True)
def test_token_explicit_override_file(b_token_file):
assert GalaxyToken(token="explicit").get() == "explicit"
@pytest.mark.parametrize('b_token_file', ['file'], indirect=True)
def test_token_from_file(b_token_file):
assert GalaxyToken().get() == "file"
def test_token_from_file_missing(b_token_file):
assert GalaxyToken().get() is None
@pytest.mark.parametrize('b_token_file', ['file'], indirect=True)
def test_token_none(b_token_file):
assert GalaxyToken(token=NoTokenSentinel).get() is None
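Read together, these cases pin down the precedence the new token helper is expected to honour: an explicit token always wins, otherwise the file at ``C.GALAXY_TOKEN_PATH`` is consulted, a missing file yields ``None``, and ``NoTokenSentinel`` suppresses the file-based token entirely. A small standalone sketch of the same behaviour, using only the calls exercised above (the temporary-file handling is illustrative, not from the commit):

.. code-block:: python

    import os
    import tempfile

    import ansible.constants as C
    from ansible.galaxy.token import GalaxyToken, NoTokenSentinel

    # Point the config at a throwaway token file, the same way the b_token_file
    # fixture does, then check the precedence the tests above assert.
    tmp_dir = tempfile.mkdtemp()
    token_path = os.path.join(tmp_dir, 'token.yml')
    with open(token_path, 'w') as fd:
        fd.write('token: from_file')
    C.GALAXY_TOKEN_PATH = token_path

    assert GalaxyToken(token='explicit').get() == 'explicit'   # explicit token wins
    assert GalaxyToken().get() == 'from_file'                  # falls back to the file
    assert GalaxyToken(token=NoTokenSentinel).get() is None    # sentinel disables the file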

@@ -453,4 +453,4 @@ def test_open_url(urlopen_mock, install_opener_mock, mocker):
                                     url_username=None, url_password=None, http_agent=None,
                                     force_basic_auth=False, follow_redirects='urllib2',
                                     client_cert=None, client_key=None, cookies=None, use_gssapi=False,
-                                     unix_socket=None, ca_path=None)
+                                     unix_socket=None, ca_path=None, unredirected_headers=None)
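The only change to the expected ``open_url()`` call is the new ``unredirected_headers`` keyword, defaulting to ``None``. Its apparent purpose, judging by the name and by urllib's ``add_unredirected_header()`` rather than by anything stated in this diff, is to let a caller mark headers (such as a Galaxy token) that should not be re-sent if the request is redirected. A hedged usage sketch, assuming the keyword accepts a list of header names:

.. code-block:: python

    from ansible.module_utils.urls import open_url

    # Hypothetical usage, not taken from the commit: the list-of-header-names form
    # for unredirected_headers is an assumption based on the keyword shown in the
    # assertion above and on urllib's add_unredirected_header() behaviour.
    resp = open_url(
        'https://galaxy.ansible.com/api/v2/collections/namespace/collection/versions/',
        headers={'Authorization': 'Token my_token'},  # illustrative token header
        unredirected_headers=['Authorization'],       # assumed: not re-sent on redirects
        validate_certs=True,
    )
    print(resp.code)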
