diff --git a/lib/ansible/module_utils/k8s/scale.py b/lib/ansible/module_utils/k8s/scale.py new file mode 100644 index 00000000000..981e767d249 --- /dev/null +++ b/lib/ansible/module_utils/k8s/scale.py @@ -0,0 +1,245 @@ +# +# Copyright 2018 Red Hat | Ansible +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import absolute_import, division, print_function + +import copy +import math +import time + +from ansible.module_utils.six import iteritems +from ansible.module_utils.k8s.common import KubernetesAnsibleModule, OpenShiftAnsibleModuleHelper +from ansible.module_utils.k8s.helper import AUTH_ARG_SPEC, COMMON_ARG_SPEC + +try: + from kubernetes import watch + from openshift.helper.exceptions import KubernetesException +except ImportError as exc: + class KubernetesException(Exception): + pass + + +SCALE_ARG_SPEC = { + 'replicas': {'type': 'int', 'required': True}, + 'current_replicas': {'type': 'int'}, + 'resource_version': {}, + 'wait': {'type': 'bool', 'default': True}, + 'wait_timeout': {'type': 'int', 'default': 20} +} + + +class KubernetesAnsibleScaleModule(KubernetesAnsibleModule): + + def execute_module(self): + if self.resource_definition: + resource_params = self.resource_to_parameters(self.resource_definition) + self.params.update(resource_params) + + self._authenticate() + + name = self.params.get('name') + namespace = self.params.get('namespace') + current_replicas = self.params.get('current_replicas') + replicas = self.params.get('replicas') + resource_version = self.params.get('resource_version') + + wait = self.params.get('wait') + wait_time = self.params.get('wait_timeout') + existing = None + existing_count = None + return_attributes = dict(changed=False, result=dict()) + + try: + existing = self.helper.get_object(name, namespace) + return_attributes['result'] = existing.to_dict() + except KubernetesException as exc: + self.fail_json(msg='Failed to retrieve requested object: {0}'.format(exc.message), + error=exc.value.get('status')) + + if self.kind == 'job': + existing_count = existing.spec.parallelism + elif hasattr(existing.spec, 'replicas'): + existing_count = existing.spec.replicas + + if existing_count is None: + self.fail_json(msg='Failed to retrieve the available count for the requested object.') + + if resource_version and resource_version != existing.metadata.resource_version: + self.exit_json(**return_attributes) + + if current_replicas is not None and existing_count != current_replicas: + self.exit_json(**return_attributes) + + if existing_count != replicas: + return_attributes['changed'] = True + if not self.check_mode: + if self.kind == 'job': + existing.spec.parallelism = replicas + k8s_obj = self.helper.patch_object(name, namespace, existing) + else: + k8s_obj = self.scale(existing, replicas, wait, wait_time) + return_attributes['result'] = k8s_obj.to_dict() + + self.exit_json(**return_attributes) + + def resource_to_parameters(self, resource): + """ Converts a resource definition to module parameters """ + parameters = {} + for key, value in iteritems(resource): + if key in ('apiVersion', 'kind', 'status'): + continue + elif key == 'metadata' and isinstance(value, dict): + for meta_key, meta_value in iteritems(value): + if meta_key in ('name', 'namespace', 'resourceVersion'): + parameters[meta_key] = meta_value + return parameters + + @property + def _argspec(self): + args = copy.deepcopy(COMMON_ARG_SPEC) + args.pop('state') + args.pop('force') + args.update(AUTH_ARG_SPEC) + args.update(SCALE_ARG_SPEC) + return args + + def scale(self, existing_object, replicas, wait, wait_time): + name = existing_object.metadata.name + namespace = existing_object.metadata.namespace + method_name = 'patch_namespaced_{0}_scale'.format(self.kind) + method = None + model = None + + try: + method = self.helper.lookup_method(method_name=method_name) + except KubernetesException: + self.fail_json( + msg="Failed to get method {0}. Is 'scale' a valid operation for {1}?".format(method_name, self.kind) + ) + + try: + model = self.helper.get_model(self.api_version, 'scale') + except KubernetesException: + self.fail_json( + msg="Failed to fetch the 'Scale' model for API version {0}. Are you using the correct " + "API?".format(self.api_version) + ) + + scale_obj = model() + scale_obj.kind = 'scale' + scale_obj.api_version = self.api_version.lower() + scale_obj.metadata = self.helper.get_model( + self.api_version, + self.helper.get_base_model_name(scale_obj.swagger_types['metadata']) + )() + scale_obj.metadata.name = name + scale_obj.metadata.namespace = namespace + scale_obj.spec = self.helper.get_model( + self.api_version, + self.helper.get_base_model_name(scale_obj.swagger_types['spec']) + )() + scale_obj.spec.replicas = replicas + + return_obj = None + stream = None + + if wait: + w, stream = self._create_stream(namespace, wait_time) + + try: + method(name, namespace, scale_obj) + except Exception as exc: + self.fail_json( + msg="Scale request failed: {0}".format(exc.message) + ) + + if wait and stream is not None: + return_obj = self._read_stream(w, stream, name, replicas) + + if not return_obj: + return_obj = self._wait_for_response(name, namespace) + + return return_obj + + def _create_stream(self, namespace, wait_time): + """ Create a stream of events for the object """ + w = None + stream = None + try: + list_method = self.helper.lookup_method('list', namespace) + w = watch.Watch() + w._api_client = self.helper.api_client + if namespace: + stream = w.stream(list_method, namespace, timeout_seconds=wait_time) + else: + stream = w.stream(list_method, timeout_seconds=wait_time) + except KubernetesException: + pass + except Exception: + raise + return w, stream + + def _read_stream(self, watcher, stream, name, replicas): + """ Wait for ready_replicas to equal the requested number of replicas. """ + return_obj = None + try: + for event in stream: + if event.get('object'): + obj = event['object'] + if obj.metadata.name == name and hasattr(obj, 'status'): + if hasattr(obj.status, 'ready_replicas') and obj.status.ready_replicas == replicas: + return_obj = obj + watcher.stop() + break + except Exception as exc: + self.fail_json(msg="Exception reading event stream: {0}".format(exc.message)) + + if not return_obj: + self.fail_json(msg="Error fetching the patched object. Try a higher wait_timeout value.") + if return_obj.status.ready_replicas is None: + self.fail_json(msg="Failed to fetch the number of ready replicas. Try a higher wait_timeout value.") + if return_obj.status.ready_replicas != replicas: + self.fail_json(msg="Number of ready replicas is {0}. Failed to reach {1} ready replicas within " + "the wait_timeout period.".format(return_obj.status.ready_replicas, replicas)) + return return_obj + + def _wait_for_response(self, name, namespace): + """ Wait for an API response """ + tries = 0 + half = math.ceil(20 / 2) + obj = None + + while tries <= half: + obj = self.helper.get_object(name, namespace) + if obj: + break + tries += 2 + time.sleep(2) + return obj + + +class OpenShiftAnsibleScaleModule(KubernetesAnsibleScaleModule): + + def _get_helper(self, api_version, kind): + helper = None + try: + helper = OpenShiftAnsibleModuleHelper(api_version=api_version, kind=kind, debug=False) + helper.get_model(api_version, kind) + except KubernetesException as exc: + self.exit_json(msg="Error initializing module helper {}".format(exc.message)) + return helper diff --git a/lib/ansible/modules/clustering/k8s/k8s_scale.py b/lib/ansible/modules/clustering/k8s/k8s_scale.py new file mode 100644 index 00000000000..3fda8e08eaa --- /dev/null +++ b/lib/ansible/modules/clustering/k8s/k8s_scale.py @@ -0,0 +1,127 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# (c) 2018, Chris Houseknecht <@chouseknecht> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + + +__metaclass__ = type + +ANSIBLE_METADATA = {'metadata_version': '1.1', + 'status': ['preview'], + 'supported_by': 'community'} + +DOCUMENTATION = ''' + +module: k8s_scale + +short_description: Set a new size for a Deployment, ReplicaSet, Replication Controller, or Job. + +version_added: "2.5" + +author: "Chris Houseknecht (@chouseknecht)" + +description: + - Similar to the kubectl scale command. Use to set the number of replicas for a Deployment, ReplicatSet, + or Replication Controller, or the parallelism attribute of a Job. Supports check mode. + +extends_documentation_fragment: + - k8s_name_options + - k8s_auth_options + - k8s_resource_options + - k8s_scale_options + +requirements: + - "python >= 2.7" + - "openshift >= 0.3" + - "PyYAML >= 3.11" +''' + +EXAMPLES = ''' +- name: Scale deployment up, and extend timeout + k8s_scale: + api_version: v1 + kind: Deployment + name: elastic + namespace: myproject + replicas: 3 + wait_timeout: 60 + +- name: Scale deployment down when current replicas match + k8s_scale: + api_version: v1 + kind: Deployment + name: elastic + namespace: myproject + current_replicas: 3 + replicas: 2 + +- name: Increase job parallelism + k8s_scale: + api_version: batch/v1 + kind: job + name: pi-with-timeout + namespace: testing + replicas: 2 + +# Match object using local file or inline definition + +- name: Scale deployment based on a file from the local filesystem + k8s_scale: + src: /myproject/elastic_deployment.yml + replicas: 3 + wait: no + +- name: Scale deployment based on a template output + k8s_scale: + resource_definition: "{{ lookup('template', '/myproject/elastic_deployment.yml') | from_yaml }}" + replicas: 3 + wait: no + +- name: Scale deployment based on a file from the Ansible controller filesystem + k8s_scale: + resource_definition: "{{ lookup('file', '/myproject/elastic_deployment.yml') | from_yaml }}" + replicas: 3 + wait: no +''' + +RETURN = ''' +result: + description: + - If a change was made, will return the patched object, otherwise returns the existing object. + returned: success + type: complex + contains: + api_version: + description: The versioned schema of this representation of an object. + returned: success + type: str + kind: + description: Represents the REST resource this object represents. + returned: success + type: str + metadata: + description: Standard object metadata. Includes name, namespace, annotations, labels, etc. + returned: success + type: complex + spec: + description: Specific attributes of the object. Will vary based on the I(api_version) and I(kind). + returned: success + type: complex + status: + description: Current status details for the object. + returned: success + type: complex +''' + +from ansible.module_utils.k8s.scale import KubernetesAnsibleScaleModule + + +def main(): + KubernetesAnsibleScaleModule().execute_module() + + +if __name__ == '__main__': + main() diff --git a/lib/ansible/utils/module_docs_fragments/k8s_scale_options.py b/lib/ansible/utils/module_docs_fragments/k8s_scale_options.py new file mode 100644 index 00000000000..6eef7cbb8f7 --- /dev/null +++ b/lib/ansible/utils/module_docs_fragments/k8s_scale_options.py @@ -0,0 +1,51 @@ +# +# Copyright 2018 Red Hat | Ansible +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Options used by scale modules. + + +class ModuleDocFragment(object): + + DOCUMENTATION = ''' +options: + replicas: + description: + - The desired number of replicas. + current_replicas: + description: + - For Deployment, ReplicaSet, Replication Controller, only scale, if the number of existing replicas + matches. In the case of a Job, update parallelism only if the current parallelism value matches. + type: int + resource_version: + description: + - Only attempt to scale, if the current object version matches. + type: str + wait: + description: + - For Deployment, ReplicaSet, Replication Controller, wait for the status value of I(ready_replicas) to change + to the number of I(replicas). In the case of a Job, this option is ignored. + type: bool + default: true + wait_timeout: + description: + - When C(wait) is I(True), the number of seconds to wait for the I(ready_replicas) status to equal I(replicas). + If the status is not reached within the allotted time, an error will result. In the case of a Job, this option + is ignored. + type: int + default: 20 +'''