ansible/lib/ansible/galaxy/collection.py

# Copyright: (c) 2019, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import fnmatch
import json
import operator
import os
import shutil
import stat
import sys
import tarfile
import tempfile
import threading
import time
import yaml
from collections import namedtuple
from contextlib import contextmanager
from distutils.version import LooseVersion
from hashlib import sha256
from io import BytesIO
from yaml.error import YAMLError
try:
import queue
except ImportError:
import Queue as queue # Python 2
import ansible.constants as C
from ansible.errors import AnsibleError
from ansible.galaxy import get_collections_galaxy_meta_info
from ansible.galaxy.api import CollectionVersionMetadata, GalaxyError
from ansible.galaxy.user_agent import user_agent
from ansible.module_utils import six
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.utils.collection_loader import AnsibleCollectionRef
from ansible.utils.display import Display
from ansible.utils.hashing import secure_hash, secure_hash_s
from ansible.utils.version import SemanticVersion
from ansible.module_utils.urls import open_url
urlparse = six.moves.urllib.parse.urlparse
urllib_error = six.moves.urllib.error
display = Display()
MANIFEST_FORMAT = 1
ModifiedContent = namedtuple('ModifiedContent', ['filename', 'expected', 'installed'])
class CollectionRequirement:
_FILE_MAPPING = [(b'MANIFEST.json', 'manifest_file'), (b'FILES.json', 'files_file')]
def __init__(self, namespace, name, b_path, api, versions, requirement, force, parent=None, metadata=None,
files=None, skip=False, allow_pre_releases=False):
"""
Represents a collection requirement, the versions that are available to be installed as well as any
dependencies the collection has.
:param namespace: The collection namespace.
:param name: The collection name.
:param b_path: Byte str of the path to the collection tarball if it has already been downloaded.
:param api: The GalaxyAPI to use if the collection is from Galaxy.
:param versions: A list of versions of the collection that are available.
:param requirement: The version requirement string used to verify the list of versions fit the requirements.
:param force: Whether the force flag was applied to the collection.
:param parent: The name of the parent the collection is a dependency of.
:param metadata: The galaxy.api.CollectionVersionMetadata that has already been retrieved from the Galaxy
server.
:param files: The files that exist inside the collection. This is based on the FILES.json file inside the
collection artifact.
:param skip: Whether to skip installing the collection. Should be set if the collection is already installed
and force is not set.
:param allow_pre_releases: Whether pre-release versions of the collection are allowed to be selected.
"""
self.namespace = namespace
self.name = name
self.b_path = b_path
self.api = api
self._versions = set(versions)
self.force = force
self.skip = skip
self.required_by = []
self.allow_pre_releases = allow_pre_releases
self._metadata = metadata
self._files = files
self.add_requirement(parent, requirement)
def __str__(self):
return to_native("%s.%s" % (self.namespace, self.name))
def __unicode__(self):
return u"%s.%s" % (self.namespace, self.name)
@property
def metadata(self):
self._get_metadata()
return self._metadata
@property
def versions(self):
if self.allow_pre_releases:
return self._versions
return set(v for v in self._versions if v == '*' or not SemanticVersion(v).is_prerelease)
@versions.setter
def versions(self, value):
self._versions = set(value)
@property
def pre_releases(self):
return set(v for v in self._versions if SemanticVersion(v).is_prerelease)
@property
def latest_version(self):
try:
return max([v for v in self.versions if v != '*'], key=SemanticVersion)
except ValueError: # ValueError: max() arg is an empty sequence
return '*'
@property
def dependencies(self):
if not self._metadata:
if len(self.versions) > 1:
return {}
self._get_metadata()
dependencies = self._metadata.dependencies
if dependencies is None:
return {}
return dependencies
def add_requirement(self, parent, requirement):
self.required_by.append((parent, requirement))
new_versions = set(v for v in self.versions if self._meets_requirements(v, requirement, parent))
if len(new_versions) == 0:
if self.skip:
force_flag = '--force-with-deps' if parent else '--force'
version = self.latest_version if self.latest_version != '*' else 'unknown'
msg = "Cannot meet requirement %s:%s as it is already installed at version '%s'. Use %s to overwrite" \
% (to_text(self), requirement, version, force_flag)
raise AnsibleError(msg)
elif parent is None:
msg = "Cannot meet requirement %s for dependency %s" % (requirement, to_text(self))
else:
msg = "Cannot meet dependency requirement '%s:%s' for collection %s" \
% (to_text(self), requirement, parent)
collection_source = to_text(self.b_path, nonstring='passthru') or self.api.api_server
req_by = "\n".join(
"\t%s - '%s:%s'" % (to_text(p) if p else 'base', to_text(self), r)
for p, r in self.required_by
)
versions = ", ".join(sorted(self.versions, key=SemanticVersion))
if not self.versions and self.pre_releases:
pre_release_msg = (
'\nThis collection only contains pre-releases. Utilize `--pre` to install pre-releases, or '
'explicitly provide the pre-release version.'
)
else:
pre_release_msg = ''
raise AnsibleError(
"%s from source '%s'. Available versions before last requirement added: %s\nRequirements from:\n%s%s"
% (msg, collection_source, versions, req_by, pre_release_msg)
)
self.versions = new_versions
def download(self, b_path):
download_url = self._metadata.download_url
artifact_hash = self._metadata.artifact_sha256
headers = {}
self.api._add_auth_token(headers, download_url, required=False)
b_collection_path = _download_file(download_url, b_path, artifact_hash, self.api.validate_certs,
headers=headers)
return to_text(b_collection_path, errors='surrogate_or_strict')
def install(self, path, b_temp_path):
if self.skip:
display.display("Skipping '%s' as it is already installed" % to_text(self))
return
# Install if it is not already installed
collection_path = os.path.join(path, self.namespace, self.name)
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
display.display("Installing '%s:%s' to '%s'" % (to_text(self), self.latest_version, collection_path))
if self.b_path is None:
self.b_path = self.download(b_temp_path)
if os.path.exists(b_collection_path):
shutil.rmtree(b_collection_path)
os.makedirs(b_collection_path)
try:
with tarfile.open(self.b_path, mode='r') as collection_tar:
files_member_obj = collection_tar.getmember('FILES.json')
with _tarfile_extract(collection_tar, files_member_obj) as files_obj:
files = json.loads(to_text(files_obj.read(), errors='surrogate_or_strict'))
_extract_tar_file(collection_tar, 'MANIFEST.json', b_collection_path, b_temp_path)
_extract_tar_file(collection_tar, 'FILES.json', b_collection_path, b_temp_path)
for file_info in files['files']:
file_name = file_info['name']
if file_name == '.':
continue
if file_info['ftype'] == 'file':
_extract_tar_file(collection_tar, file_name, b_collection_path, b_temp_path,
expected_hash=file_info['chksum_sha256'])
else:
os.makedirs(os.path.join(b_collection_path, to_bytes(file_name, errors='surrogate_or_strict')))
except Exception:
# Ensure we don't leave the dir behind in case of a failure.
shutil.rmtree(b_collection_path)
b_namespace_path = os.path.dirname(b_collection_path)
if not os.listdir(b_namespace_path):
os.rmdir(b_namespace_path)
raise
def set_latest_version(self):
self.versions = set([self.latest_version])
self._get_metadata()
def verify(self, remote_collection, path, b_temp_tar_path):
if not self.skip:
display.display("'%s' has not been installed, nothing to verify" % (to_text(self)))
return
collection_path = os.path.join(path, self.namespace, self.name)
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
display.vvv("Verifying '%s:%s'." % (to_text(self), self.latest_version))
display.vvv("Installed collection found at '%s'" % collection_path)
display.vvv("Remote collection found at '%s'" % remote_collection.metadata.download_url)
# Compare installed version versus requirement version
if self.latest_version != remote_collection.latest_version:
err = "%s has the version '%s' but is being compared to '%s'" % (to_text(self), self.latest_version, remote_collection.latest_version)
display.display(err)
return
modified_content = []
# Verify the manifest hash matches before verifying the file manifest
expected_hash = _get_tar_file_hash(b_temp_tar_path, 'MANIFEST.json')
self._verify_file_hash(b_collection_path, 'MANIFEST.json', expected_hash, modified_content)
manifest = _get_json_from_tar_file(b_temp_tar_path, 'MANIFEST.json')
# Use the manifest to verify the file manifest checksum
file_manifest_data = manifest['file_manifest_file']
file_manifest_filename = file_manifest_data['name']
expected_hash = file_manifest_data['chksum_%s' % file_manifest_data['chksum_type']]
# Verify the file manifest before using it to verify individual files
self._verify_file_hash(b_collection_path, file_manifest_filename, expected_hash, modified_content)
file_manifest = _get_json_from_tar_file(b_temp_tar_path, file_manifest_filename)
# Use the file manifest to verify individual file checksums
for manifest_data in file_manifest['files']:
if manifest_data['ftype'] == 'file':
expected_hash = manifest_data['chksum_%s' % manifest_data['chksum_type']]
self._verify_file_hash(b_collection_path, manifest_data['name'], expected_hash, modified_content)
if modified_content:
display.display("Collection %s contains modified content in the following files:" % to_text(self))
display.display(to_text(self))
display.vvv(to_text(self.b_path))
for content_change in modified_content:
display.display(' %s' % content_change.filename)
display.vvv(" Expected: %s\n Found: %s" % (content_change.expected, content_change.installed))
else:
display.vvv("Successfully verified that checksums for '%s:%s' match the remote collection" % (to_text(self), self.latest_version))
def _verify_file_hash(self, b_path, filename, expected_hash, error_queue):
b_file_path = to_bytes(os.path.join(to_text(b_path), filename), errors='surrogate_or_strict')
if not os.path.isfile(b_file_path):
actual_hash = None
else:
with open(b_file_path, mode='rb') as file_object:
actual_hash = _consume_file(file_object)
if expected_hash != actual_hash:
error_queue.append(ModifiedContent(filename=filename, expected=expected_hash, installed=actual_hash))
def _get_metadata(self):
if self._metadata:
return
self._metadata = self.api.get_collection_version_metadata(self.namespace, self.name, self.latest_version)
def _meets_requirements(self, version, requirements, parent):
"""
Supported version identifiers are '==', '!=', '>', '>=', '<', '<=', and '*'. Multiple requirements are delimited by ','.
"""
op_map = {
'!=': operator.ne,
'==': operator.eq,
'=': operator.eq,
'>=': operator.ge,
'>': operator.gt,
'<=': operator.le,
'<': operator.lt,
}
for req in list(requirements.split(',')):
op_pos = 2 if len(req) > 1 and req[1] == '=' else 1
op = op_map.get(req[:op_pos])
requirement = req[op_pos:]
if not op:
requirement = req
op = operator.eq
# When checking a new requirement against a base requirement (parent != None) we can't accept a
# version of '*' (unknown version) unless the requirement is also '*'.
if parent and version == '*' and requirement != '*':
display.warning("Failed to validate the collection requirement '%s:%s' for %s when the existing "
"install does not have a version set, the collection may not work."
% (to_text(self), req, parent))
continue
elif requirement == '*' or version == '*':
continue
if not op(SemanticVersion(version), SemanticVersion.from_loose_version(LooseVersion(requirement))):
break
else:
return True
# The loop was broken early, it does not meet all the requirements
return False
@staticmethod
def from_tar(b_path, force, parent=None):
if not tarfile.is_tarfile(b_path):
raise AnsibleError("Collection artifact at '%s' is not a valid tar file." % to_native(b_path))
info = {}
with tarfile.open(b_path, mode='r') as collection_tar:
for b_member_name, property_name in CollectionRequirement._FILE_MAPPING:
n_member_name = to_native(b_member_name)
try:
member = collection_tar.getmember(n_member_name)
except KeyError:
raise AnsibleError("Collection at '%s' does not contain the required file %s."
% (to_native(b_path), n_member_name))
with _tarfile_extract(collection_tar, member) as member_obj:
try:
info[property_name] = json.loads(to_text(member_obj.read(), errors='surrogate_or_strict'))
except ValueError:
raise AnsibleError("Collection tar file member %s does not contain a valid json string."
% n_member_name)
meta = info['manifest_file']['collection_info']
files = info['files_file']['files']
namespace = meta['namespace']
name = meta['name']
version = meta['version']
meta = CollectionVersionMetadata(namespace, name, version, None, None, meta['dependencies'])
if SemanticVersion(version).is_prerelease:
allow_pre_release = True
else:
allow_pre_release = False
return CollectionRequirement(namespace, name, b_path, None, [version], version, force, parent=parent,
metadata=meta, files=files, allow_pre_releases=allow_pre_release)
@staticmethod
def from_path(b_path, force, parent=None):
info = {}
for b_file_name, property_name in CollectionRequirement._FILE_MAPPING:
b_file_path = os.path.join(b_path, b_file_name)
if not os.path.exists(b_file_path):
continue
with open(b_file_path, 'rb') as file_obj:
try:
info[property_name] = json.loads(to_text(file_obj.read(), errors='surrogate_or_strict'))
except ValueError:
raise AnsibleError("Collection file at '%s' does not contain a valid json string."
% to_native(b_file_path))
allow_pre_release = False
if 'manifest_file' in info:
manifest = info['manifest_file']['collection_info']
namespace = manifest['namespace']
name = manifest['name']
version = to_text(manifest['version'], errors='surrogate_or_strict')
try:
_v = SemanticVersion()
_v.parse(version)
if _v.is_prerelease:
allow_pre_release = True
except ValueError:
display.warning("Collection at '%s' does not have a valid version set, falling back to '*'. Found "
"version: '%s'" % (to_text(b_path), version))
version = '*'
dependencies = manifest['dependencies']
else:
display.warning("Collection at '%s' does not have a MANIFEST.json file, cannot detect version."
% to_text(b_path))
parent_dir, name = os.path.split(to_text(b_path, errors='surrogate_or_strict'))
namespace = os.path.split(parent_dir)[1]
version = '*'
dependencies = {}
meta = CollectionVersionMetadata(namespace, name, version, None, None, dependencies)
files = info.get('files_file', {}).get('files', {})
return CollectionRequirement(namespace, name, b_path, None, [version], version, force, parent=parent,
metadata=meta, files=files, skip=True, allow_pre_releases=allow_pre_release)
@staticmethod
def from_name(collection, apis, requirement, force, parent=None, allow_pre_release=False):
namespace, name = collection.split('.', 1)
galaxy_meta = None
for api in apis:
try:
if not (requirement == '*' or requirement.startswith('<') or requirement.startswith('>') or
requirement.startswith('!=')):
# Exact requirement
allow_pre_release = True
if requirement.startswith('='):
requirement = requirement.lstrip('=')
resp = api.get_collection_version_metadata(namespace, name, requirement)
galaxy_meta = resp
versions = [resp.version]
else:
versions = api.get_collection_versions(namespace, name)
except GalaxyError as err:
if err.http_code == 404:
display.vvv("Collection '%s' is not available from server %s %s"
% (collection, api.name, api.api_server))
continue
raise
display.vvv("Collection '%s' obtained from server %s %s" % (collection, api.name, api.api_server))
break
else:
raise AnsibleError("Failed to find collection %s:%s" % (collection, requirement))
req = CollectionRequirement(namespace, name, None, api, versions, requirement, force, parent=parent,
metadata=galaxy_meta, allow_pre_releases=allow_pre_release)
return req
def build_collection(collection_path, output_path, force):
"""
Creates the Ansible collection artifact in a .tar.gz file.
:param collection_path: The path to the collection to build. This should be the directory that contains the
galaxy.yml file.
:param output_path: The path to create the collection build artifact. This should be a directory.
:param force: Whether to overwrite an existing collection build artifact or fail.
:return: The path to the collection build artifact.
"""
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
b_galaxy_path = os.path.join(b_collection_path, b'galaxy.yml')
if not os.path.exists(b_galaxy_path):
raise AnsibleError("The collection galaxy.yml path '%s' does not exist." % to_native(b_galaxy_path))
collection_meta = _get_galaxy_yml(b_galaxy_path)
file_manifest = _build_files_manifest(b_collection_path, collection_meta['namespace'], collection_meta['name'],
collection_meta['build_ignore'])
collection_manifest = _build_manifest(**collection_meta)
collection_output = os.path.join(output_path, "%s-%s-%s.tar.gz" % (collection_meta['namespace'],
collection_meta['name'],
collection_meta['version']))
b_collection_output = to_bytes(collection_output, errors='surrogate_or_strict')
if os.path.exists(b_collection_output):
if os.path.isdir(b_collection_output):
raise AnsibleError("The output collection artifact '%s' already exists, "
"but is a directory - aborting" % to_native(collection_output))
elif not force:
raise AnsibleError("The file '%s' already exists. You can use --force to re-create "
"the collection artifact." % to_native(collection_output))
_build_collection_tar(b_collection_path, b_collection_output, collection_manifest, file_manifest)
def download_collections(collections, output_path, apis, validate_certs, no_deps, allow_pre_release):
"""
Download Ansible collections as tarballs from a Galaxy server to the path specified and create a requirements
file of the downloaded collections to be used for an install.
:param collections: The collections to download, should be a list of tuples with (name, requirement, Galaxy Server).
:param output_path: The path to download the collections to.
:param apis: A list of GalaxyAPIs to query when searching for a collection.
:param validate_certs: Whether to validate the certificate if downloading a tarball from a non-Galaxy host.
:param no_deps: Ignore any collection dependencies and only download the base requirements.
:param allow_pre_release: Do not ignore pre-release versions when selecting the latest.
"""
with _tempdir() as b_temp_path:
display.display("Process install dependency map")
with _display_progress():
dep_map = _build_dependency_map(collections, [], b_temp_path, apis, validate_certs, True, True, no_deps,
allow_pre_release=allow_pre_release)
requirements = []
display.display("Starting collection download process to '%s'" % output_path)
with _display_progress():
for name, requirement in dep_map.items():
collection_filename = "%s-%s-%s.tar.gz" % (requirement.namespace, requirement.name,
requirement.latest_version)
dest_path = os.path.join(output_path, collection_filename)
requirements.append({'name': collection_filename, 'version': requirement.latest_version})
display.display("Downloading collection '%s' to '%s'" % (name, dest_path))
b_temp_download_path = requirement.download(b_temp_path)
shutil.move(b_temp_download_path, to_bytes(dest_path, errors='surrogate_or_strict'))
requirements_path = os.path.join(output_path, 'requirements.yml')
display.display("Writing requirements.yml file of downloaded collections to '%s'" % requirements_path)
with open(to_bytes(requirements_path, errors='surrogate_or_strict'), mode='wb') as req_fd:
req_fd.write(to_bytes(yaml.safe_dump({'collections': requirements}), errors='surrogate_or_strict'))
def publish_collection(collection_path, api, wait, timeout):
"""
Publish an Ansible collection tarball to an Ansible Galaxy server.
:param collection_path: The path to the collection tarball to publish.
:param api: A GalaxyAPI to publish the collection to.
:param wait: Whether to wait until the import process is complete.
:param timeout: The time in seconds to wait for the import process to finish, 0 is indefinite.
"""
import_uri = api.publish_collection(collection_path)
if wait:
# Galaxy returns a url fragment which differs between v2 and v3. The second to last entry is
# always the task_id, though.
# v2: {"task": "https://galaxy-dev.ansible.com/api/v2/collection-imports/35573/"}
# v3: {"task": "/api/automation-hub/v3/imports/collections/838d1308-a8f4-402c-95cb-7823f3806cd8/"}
task_id = None
for path_segment in reversed(import_uri.split('/')):
if path_segment:
task_id = path_segment
break
if not task_id:
raise AnsibleError("Publishing the collection did not return valid task info. Cannot wait for task status. Returned task info: '%s'" % import_uri)
display.display("Collection has been published to the Galaxy server %s %s" % (api.name, api.api_server))
with _display_progress():
api.wait_import_task(task_id, timeout)
display.display("Collection has been successfully published and imported to the Galaxy server %s %s"
% (api.name, api.api_server))
else:
display.display("Collection has been pushed to the Galaxy server %s %s, not waiting until import has "
"completed due to --no-wait being set. Import task results can be found at %s"
% (api.name, api.api_server, import_uri))
def install_collections(collections, output_path, apis, validate_certs, ignore_errors, no_deps, force, force_deps,
allow_pre_release=False):
"""
Install Ansible collections to the path specified.
:param collections: The collections to install, should be a list of tuples with (name, requirement, Galaxy server).
:param output_path: The path to install the collections to.
:param apis: A list of GalaxyAPIs to query when searching for a collection.
:param validate_certs: Whether to validate the certificates if downloading a tarball.
:param ignore_errors: Whether to ignore any errors when installing the collection.
:param no_deps: Ignore any collection dependencies and only install the base requirements.
:param force: Re-install a collection if it has already been installed.
:param force_deps: Re-install a collection as well as its dependencies if they have already been installed.
"""
existing_collections = find_existing_collections(output_path)
with _tempdir() as b_temp_path:
display.display("Process install dependency map")
with _display_progress():
dependency_map = _build_dependency_map(collections, existing_collections, b_temp_path, apis,
validate_certs, force, force_deps, no_deps,
allow_pre_release=allow_pre_release)
display.display("Starting collection install process")
with _display_progress():
for collection in dependency_map.values():
try:
collection.install(output_path, b_temp_path)
except AnsibleError as err:
if ignore_errors:
display.warning("Failed to install collection %s but skipping due to --ignore-errors being set. "
"Error: %s" % (to_text(collection), to_text(err)))
else:
raise
def validate_collection_name(name):
"""
Validates that a collection name, supplied by the user or a requirements file, fits the expected format.
:param name: The input name with optional range specifier split by ':'.
:return: The input value, required for argparse validation.
"""
collection, dummy, dummy = name.partition(':')
if AnsibleCollectionRef.is_valid_collection_name(collection):
return name
raise AnsibleError("Invalid collection name '%s', "
"name must be in the format <namespace>.<collection>. \n"
"Please make sure namespace and collection name contains "
"characters from [a-zA-Z0-9_] only." % name)
def validate_collection_path(collection_path):
""" Ensure a given path ends with 'ansible_collections'
:param collection_path: The path that should end in 'ansible_collections'
:return: The collection_path with 'ansible_collections' appended if it does not already end with it.
"""
if os.path.split(collection_path)[1] != 'ansible_collections':
return os.path.join(collection_path, 'ansible_collections')
return collection_path
def verify_collections(collections, search_paths, apis, validate_certs, ignore_errors, allow_pre_release=False):
with _display_progress():
with _tempdir() as b_temp_path:
for collection in collections:
try:
local_collection = None
b_collection = to_bytes(collection[0], errors='surrogate_or_strict')
if os.path.isfile(b_collection) or urlparse(collection[0]).scheme.lower() in ['http', 'https'] or len(collection[0].split('.')) != 2:
raise AnsibleError(message="'%s' is not a valid collection name. The format namespace.name is expected." % collection[0])
collection_name = collection[0]
namespace, name = collection_name.split('.')
collection_version = collection[1]
# Verify local collection exists before downloading it from a galaxy server
for search_path in search_paths:
b_search_path = to_bytes(os.path.join(search_path, namespace, name), errors='surrogate_or_strict')
if os.path.isdir(b_search_path):
local_collection = CollectionRequirement.from_path(b_search_path, False)
break
if local_collection is None:
raise AnsibleError(message='Collection %s is not installed in any of the collection paths.' % collection_name)
# Download the collection from a Galaxy server for comparison
try:
remote_collection = CollectionRequirement.from_name(collection_name, apis, collection_version, False, parent=None,
allow_pre_release=allow_pre_release)
except AnsibleError as e:
if e.message == 'Failed to find collection %s:%s' % (collection[0], collection[1]):
raise AnsibleError('Failed to find remote collection %s:%s on any of the galaxy servers' % (collection[0], collection[1]))
raise
download_url = remote_collection.metadata.download_url
headers = {}
remote_collection.api._add_auth_token(headers, download_url, required=False)
b_temp_tar_path = _download_file(download_url, b_temp_path, None, validate_certs, headers=headers)
local_collection.verify(remote_collection, search_path, b_temp_tar_path)
except AnsibleError as err:
if ignore_errors:
display.warning("Failed to verify collection %s but skipping due to --ignore-errors being set. "
"Error: %s" % (collection[0], to_text(err)))
else:
raise
@contextmanager
def _tempdir():
b_temp_path = tempfile.mkdtemp(dir=to_bytes(C.DEFAULT_LOCAL_TMP, errors='surrogate_or_strict'))
yield b_temp_path
shutil.rmtree(b_temp_path)
@contextmanager
def _tarfile_extract(tar, member):
tar_obj = tar.extractfile(member)
yield tar_obj
tar_obj.close()
@contextmanager
def _display_progress():
config_display = C.GALAXY_DISPLAY_PROGRESS
display_wheel = sys.stdout.isatty() if config_display is None else config_display
if not display_wheel:
yield
return
def progress(display_queue, actual_display):
actual_display.debug("Starting display_progress display thread")
t = threading.current_thread()
while True:
for c in "|/-\\":
actual_display.display(c + "\b", newline=False)
time.sleep(0.1)
# Relay any display calls queued by the main thread
while True:
try:
method, args, kwargs = display_queue.get(block=False, timeout=0.1)
except queue.Empty:
break
else:
func = getattr(actual_display, method)
func(*args, **kwargs)
if getattr(t, "finish", False):
actual_display.debug("Received end signal for display_progress display thread")
return
class DisplayThread(object):
def __init__(self, display_queue):
self.display_queue = display_queue
def __getattr__(self, attr):
def call_display(*args, **kwargs):
self.display_queue.put((attr, args, kwargs))
return call_display
# Temporarily override the global display object with our own, which adds display calls to a queue for the progress thread to process.
global display
old_display = display
try:
display_queue = queue.Queue()
display = DisplayThread(display_queue)
t = threading.Thread(target=progress, args=(display_queue, old_display))
t.daemon = True
t.start()
try:
yield
finally:
t.finish = True
t.join()
except Exception:
# The exception is re-raised so we can be sure the thread has finished and is no longer using the display
raise
finally:
display = old_display
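# A hedged usage sketch (not from the original source): when progress display is enabled,
# a spinner is drawn on a background thread while display calls made by the main thread
# are queued and replayed by that thread, e.g.:
#
#   with _display_progress():
#       display.display("Working...")   # routed through the DisplayThread queue
#       do_long_running_work()          # hypothetical long-running call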
def _get_galaxy_yml(b_galaxy_yml_path):
meta_info = get_collections_galaxy_meta_info()
mandatory_keys = set()
string_keys = set()
list_keys = set()
dict_keys = set()
for info in meta_info:
if info.get('required', False):
mandatory_keys.add(info['key'])
key_list_type = {
'str': string_keys,
'list': list_keys,
'dict': dict_keys,
}[info.get('type', 'str')]
key_list_type.add(info['key'])
all_keys = frozenset(list(mandatory_keys) + list(string_keys) + list(list_keys) + list(dict_keys))
try:
with open(b_galaxy_yml_path, 'rb') as g_yaml:
galaxy_yml = yaml.safe_load(g_yaml)
except YAMLError as err:
raise AnsibleError("Failed to parse the galaxy.yml at '%s' with the following error:\n%s"
% (to_native(b_galaxy_yml_path), to_native(err)))
set_keys = set(galaxy_yml.keys())
missing_keys = mandatory_keys.difference(set_keys)
if missing_keys:
raise AnsibleError("The collection galaxy.yml at '%s' is missing the following mandatory keys: %s"
% (to_native(b_galaxy_yml_path), ", ".join(sorted(missing_keys))))
extra_keys = set_keys.difference(all_keys)
if len(extra_keys) > 0:
display.warning("Found unknown keys in collection galaxy.yml at '%s': %s"
% (to_text(b_galaxy_yml_path), ", ".join(extra_keys)))
# Add the defaults if they have not been set
for optional_string in string_keys:
if optional_string not in galaxy_yml:
galaxy_yml[optional_string] = None
for optional_list in list_keys:
list_val = galaxy_yml.get(optional_list, None)
if list_val is None:
galaxy_yml[optional_list] = []
elif not isinstance(list_val, list):
galaxy_yml[optional_list] = [list_val]
for optional_dict in dict_keys:
if optional_dict not in galaxy_yml:
galaxy_yml[optional_dict] = {}
# 'license' is a builtin name in Python; to avoid confusion we rename the key to license_ids
galaxy_yml['license_ids'] = galaxy_yml['license']
del galaxy_yml['license']
return galaxy_yml
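# A minimal galaxy.yml that this parser might accept could look like the following
# (illustrative only; the authoritative key list comes from get_collections_galaxy_meta_info()):
#
#   namespace: my_namespace
#   name: my_collection
#   version: 1.0.0
#   readme: README.md
#   authors:
#     - Jane Doe <jane@example.com>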
def _build_files_manifest(b_collection_path, namespace, name, ignore_patterns):
# We always ignore .pyc and .retry files as well as some well known version control directories. The ignore
# patterns can be extended by the build_ignore key in galaxy.yml
b_ignore_patterns = [
b'galaxy.yml',
b'*.pyc',
b'*.retry',
b'tests/output', # Ignore ansible-test result output directory.
to_bytes('{0}-{1}-*.tar.gz'.format(namespace, name)), # Ignores previously built artifacts in the root dir.
]
b_ignore_patterns += [to_bytes(p) for p in ignore_patterns]
b_ignore_dirs = frozenset([b'CVS', b'.bzr', b'.hg', b'.git', b'.svn', b'__pycache__', b'.tox'])
entry_template = {
'name': None,
'ftype': None,
'chksum_type': None,
'chksum_sha256': None,
'format': MANIFEST_FORMAT
}
manifest = {
'files': [
{
'name': '.',
'ftype': 'dir',
'chksum_type': None,
'chksum_sha256': None,
'format': MANIFEST_FORMAT,
},
],
'format': MANIFEST_FORMAT,
}
def _walk(b_path, b_top_level_dir):
for b_item in os.listdir(b_path):
b_abs_path = os.path.join(b_path, b_item)
b_rel_base_dir = b'' if b_path == b_top_level_dir else b_path[len(b_top_level_dir) + 1:]
b_rel_path = os.path.join(b_rel_base_dir, b_item)
rel_path = to_text(b_rel_path, errors='surrogate_or_strict')
if os.path.isdir(b_abs_path):
if any(b_item == b_path for b_path in b_ignore_dirs) or \
any(fnmatch.fnmatch(b_rel_path, b_pattern) for b_pattern in b_ignore_patterns):
display.vvv("Skipping '%s' for collection build" % to_text(b_abs_path))
continue
if os.path.islink(b_abs_path):
b_link_target = os.path.realpath(b_abs_path)
if not b_link_target.startswith(b_top_level_dir):
display.warning("Skipping '%s' as it is a symbolic link to a directory outside the collection"
% to_text(b_abs_path))
continue
manifest_entry = entry_template.copy()
manifest_entry['name'] = rel_path
manifest_entry['ftype'] = 'dir'
manifest['files'].append(manifest_entry)
_walk(b_abs_path, b_top_level_dir)
else:
if any(fnmatch.fnmatch(b_rel_path, b_pattern) for b_pattern in b_ignore_patterns):
display.vvv("Skipping '%s' for collection build" % to_text(b_abs_path))
continue
manifest_entry = entry_template.copy()
manifest_entry['name'] = rel_path
manifest_entry['ftype'] = 'file'
manifest_entry['chksum_type'] = 'sha256'
manifest_entry['chksum_sha256'] = secure_hash(b_abs_path, hash_func=sha256)
manifest['files'].append(manifest_entry)
_walk(b_collection_path, b_collection_path)
return manifest
def _build_manifest(namespace, name, version, authors, readme, tags, description, license_ids, license_file,
dependencies, repository, documentation, homepage, issues, **kwargs):
manifest = {
'collection_info': {
'namespace': namespace,
'name': name,
'version': version,
'authors': authors,
'readme': readme,
'tags': tags,
'description': description,
'license': license_ids,
'license_file': license_file if license_file else None, # Handle galaxy.yml having an empty string (None)
'dependencies': dependencies,
'repository': repository,
'documentation': documentation,
'homepage': homepage,
'issues': issues,
},
'file_manifest_file': {
'name': 'FILES.json',
'ftype': 'file',
'chksum_type': 'sha256',
'chksum_sha256': None, # Filled out in _build_collection_tar
'format': MANIFEST_FORMAT
},
'format': MANIFEST_FORMAT,
}
return manifest
def _build_collection_tar(b_collection_path, b_tar_path, collection_manifest, file_manifest):
files_manifest_json = to_bytes(json.dumps(file_manifest, indent=True), errors='surrogate_or_strict')
collection_manifest['file_manifest_file']['chksum_sha256'] = secure_hash_s(files_manifest_json, hash_func=sha256)
collection_manifest_json = to_bytes(json.dumps(collection_manifest, indent=True), errors='surrogate_or_strict')
with _tempdir() as b_temp_path:
b_tar_filepath = os.path.join(b_temp_path, os.path.basename(b_tar_path))
with tarfile.open(b_tar_filepath, mode='w:gz') as tar_file:
# Add the MANIFEST.json and FILES.json file to the archive
for name, b in [('MANIFEST.json', collection_manifest_json), ('FILES.json', files_manifest_json)]:
b_io = BytesIO(b)
tar_info = tarfile.TarInfo(name)
tar_info.size = len(b)
tar_info.mtime = time.time()
tar_info.mode = 0o0644
tar_file.addfile(tarinfo=tar_info, fileobj=b_io)
for file_info in file_manifest['files']:
if file_info['name'] == '.':
continue
# arcname expects a native string, cannot be bytes
filename = to_native(file_info['name'], errors='surrogate_or_strict')
b_src_path = os.path.join(b_collection_path, to_bytes(filename, errors='surrogate_or_strict'))
def reset_stat(tarinfo):
existing_is_exec = tarinfo.mode & stat.S_IXUSR
tarinfo.mode = 0o0755 if existing_is_exec or tarinfo.isdir() else 0o0644
tarinfo.uid = tarinfo.gid = 0
tarinfo.uname = tarinfo.gname = ''
return tarinfo
tar_file.add(os.path.realpath(b_src_path), arcname=filename, recursive=False, filter=reset_stat)
shutil.copy(b_tar_filepath, b_tar_path)
collection_name = "%s.%s" % (collection_manifest['collection_info']['namespace'],
collection_manifest['collection_info']['name'])
display.display('Created collection for %s at %s' % (collection_name, to_text(b_tar_path)))
def find_existing_collections(path):
collections = []
b_path = to_bytes(path, errors='surrogate_or_strict')
for b_namespace in os.listdir(b_path):
b_namespace_path = os.path.join(b_path, b_namespace)
if os.path.isfile(b_namespace_path):
continue
for b_collection in os.listdir(b_namespace_path):
b_collection_path = os.path.join(b_namespace_path, b_collection)
if os.path.isdir(b_collection_path):
req = CollectionRequirement.from_path(b_collection_path, False)
display.vvv("Found installed collection %s:%s at '%s'" % (to_text(req), req.latest_version,
to_text(b_collection_path)))
collections.append(req)
return collections
def _build_dependency_map(collections, existing_collections, b_temp_path, apis, validate_certs, force, force_deps,
no_deps, allow_pre_release=False):
dependency_map = {}
# First build the dependency map on the actual requirements
for name, version, source in collections:
_get_collection_info(dependency_map, existing_collections, name, version, source, b_temp_path, apis,
validate_certs, (force or force_deps), allow_pre_release=allow_pre_release)
checked_parents = set([to_text(c) for c in dependency_map.values() if c.skip])
while len(dependency_map) != len(checked_parents):
while not no_deps: # Only parse dependencies if no_deps was not set
parents_to_check = set(dependency_map.keys()).difference(checked_parents)
deps_exhausted = True
for parent in parents_to_check:
parent_info = dependency_map[parent]
if parent_info.dependencies:
deps_exhausted = False
for dep_name, dep_requirement in parent_info.dependencies.items():
_get_collection_info(dependency_map, existing_collections, dep_name, dep_requirement,
parent_info.api, b_temp_path, apis, validate_certs, force_deps,
parent=parent, allow_pre_release=allow_pre_release)
checked_parents.add(parent)
# No extra dependencies were resolved, exit loop
if deps_exhausted:
break
# Now that the deps are resolved to the best extent possible, select the latest version for collections with
# multiple candidate versions and go from there
deps_not_checked = set(dependency_map.keys()).difference(checked_parents)
for collection in deps_not_checked:
dependency_map[collection].set_latest_version()
if no_deps or len(dependency_map[collection].dependencies) == 0:
checked_parents.add(collection)
return dependency_map
def _get_collection_info(dep_map, existing_collections, collection, requirement, source, b_temp_path, apis,
validate_certs, force, parent=None, allow_pre_release=False):
dep_msg = ""
if parent:
dep_msg = " - as dependency of %s" % parent
display.vvv("Processing requirement collection '%s'%s" % (to_text(collection), dep_msg))
b_tar_path = None
if os.path.isfile(to_bytes(collection, errors='surrogate_or_strict')):
display.vvvv("Collection requirement '%s' is a tar artifact" % to_text(collection))
b_tar_path = to_bytes(collection, errors='surrogate_or_strict')
elif urlparse(collection).scheme.lower() in ['http', 'https']:
display.vvvv("Collection requirement '%s' is a URL to a tar artifact" % collection)
try:
b_tar_path = _download_file(collection, b_temp_path, None, validate_certs)
except urllib_error.URLError as err:
raise AnsibleError("Failed to download collection tar from '%s': %s"
% (to_native(collection), to_native(err)))
if b_tar_path:
req = CollectionRequirement.from_tar(b_tar_path, force, parent=parent)
collection_name = to_text(req)
if collection_name in dep_map:
collection_info = dep_map[collection_name]
collection_info.add_requirement(None, req.latest_version)
else:
collection_info = req
else:
validate_collection_name(collection)
display.vvvv("Collection requirement '%s' is the name of a collection" % collection)
if collection in dep_map:
collection_info = dep_map[collection]
collection_info.add_requirement(parent, requirement)
else:
apis = [source] if source else apis
collection_info = CollectionRequirement.from_name(collection, apis, requirement, force, parent=parent,
allow_pre_release=allow_pre_release)
existing = [c for c in existing_collections if to_text(c) == to_text(collection_info)]
if existing and not collection_info.force:
# Test that the installed collection fits the requirement
existing[0].add_requirement(parent, requirement)
collection_info = existing[0]
dep_map[to_text(collection_info)] = collection_info
def _download_file(url, b_path, expected_hash, validate_certs, headers=None):
urlsplit = os.path.splitext(to_text(url.rsplit('/', 1)[1]))
b_file_name = to_bytes(urlsplit[0], errors='surrogate_or_strict')
b_file_ext = to_bytes(urlsplit[1], errors='surrogate_or_strict')
b_file_path = tempfile.NamedTemporaryFile(dir=b_path, prefix=b_file_name, suffix=b_file_ext, delete=False).name
display.vvv("Downloading %s to %s" % (url, to_text(b_path)))
# Galaxy redirects downloads to S3, which rejects the request if an Authorization header is attached, so don't forward that header on redirect
resp = open_url(to_native(url, errors='surrogate_or_strict'), validate_certs=validate_certs, headers=headers,
unredirected_headers=['Authorization'], http_agent=user_agent())
with open(b_file_path, 'wb') as download_file:
actual_hash = _consume_file(resp, download_file)
if expected_hash:
display.vvvv("Validating downloaded file hash %s with expected hash %s" % (actual_hash, expected_hash))
if expected_hash != actual_hash:
raise AnsibleError("Mismatch artifact hash with downloaded file")
return b_file_path
def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
with _get_tar_file_member(tar, filename) as tar_obj:
with tempfile.NamedTemporaryFile(dir=b_temp_path, delete=False) as tmpfile_obj:
actual_hash = _consume_file(tar_obj, tmpfile_obj)
if expected_hash and actual_hash != expected_hash:
raise AnsibleError("Checksum mismatch for '%s' inside collection at '%s'"
% (to_native(filename, errors='surrogate_or_strict'), to_native(tar.name)))
b_dest_filepath = os.path.abspath(os.path.join(b_dest, to_bytes(filename, errors='surrogate_or_strict')))
b_parent_dir = os.path.dirname(b_dest_filepath)
if b_parent_dir != b_dest and not b_parent_dir.startswith(b_dest + to_bytes(os.path.sep)):
raise AnsibleError("Cannot extract tar entry '%s' as it will be placed outside the collection directory"
% to_native(filename, errors='surrogate_or_strict'))
if not os.path.exists(b_parent_dir):
# Seems like Galaxy does not validate if all file entries have a corresponding dir ftype entry. This check
# makes sure we create the parent directory even if it wasn't set in the metadata.
os.makedirs(b_parent_dir, mode=0o0755)
shutil.move(to_bytes(tmpfile_obj.name, errors='surrogate_or_strict'), b_dest_filepath)
# Default to rw-r--r-- and only add execute if the tar file has execute.
tar_member = tar.getmember(to_native(filename, errors='surrogate_or_strict'))
new_mode = 0o644
if stat.S_IMODE(tar_member.mode) & stat.S_IXUSR:
new_mode |= 0o0111
os.chmod(b_dest_filepath, new_mode)
def _get_tar_file_member(tar, filename):
n_filename = to_native(filename, errors='surrogate_or_strict')
try:
member = tar.getmember(n_filename)
except KeyError:
raise AnsibleError("Collection tar at '%s' does not contain the expected file '%s'." % (
to_native(tar.name),
n_filename))
return _tarfile_extract(tar, member)
def _get_json_from_tar_file(b_path, filename):
file_contents = ''
with tarfile.open(b_path, mode='r') as collection_tar:
with _get_tar_file_member(collection_tar, filename) as tar_obj:
bufsize = 65536
data = tar_obj.read(bufsize)
while data:
file_contents += to_text(data)
data = tar_obj.read(bufsize)
return json.loads(file_contents)
def _get_tar_file_hash(b_path, filename):
with tarfile.open(b_path, mode='r') as collection_tar:
with _get_tar_file_member(collection_tar, filename) as tar_obj:
return _consume_file(tar_obj)
def _consume_file(read_from, write_to=None):
bufsize = 65536
sha256_digest = sha256()
data = read_from.read(bufsize)
while data:
if write_to is not None:
write_to.write(data)
write_to.flush()
sha256_digest.update(data)
data = read_from.read(bufsize)
return sha256_digest.hexdigest()
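# For example (not from the original source), _consume_file can be used to stream a
# file and obtain its SHA256 hex digest, optionally copying the bytes elsewhere:
#
#   with open(b'/tmp/example.tar.gz', 'rb') as src:
#       checksum = _consume_file(src)   # reads in 64 KiB chunks, returns the hex digest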