From d8bddc0d221209885f33817c656f087835b9694d Mon Sep 17 00:00:00 2001 From: Mariusz Mazur Date: Thu, 4 Apr 2019 11:36:38 +0200 Subject: [PATCH] Modify kubevirt_vm crud/wait logic (#54404) 1. Adds proper wait support for VM stops and starts 2. Detect https://github.com/kubevirt/ansible-kubevirt-modules/issues/177 and return a sane error 3. Switch to openshift-restclient 0.9.x style wait code --- lib/ansible/module_utils/kubevirt.py | 52 ++--- .../modules/cloud/kubevirt/kubevirt_vm.py | 219 +++++++++++------- 2 files changed, 148 insertions(+), 123 deletions(-) diff --git a/lib/ansible/module_utils/kubevirt.py b/lib/ansible/module_utils/kubevirt.py index e445554e53f..be1d93a1d54 100644 --- a/lib/ansible/module_utils/kubevirt.py +++ b/lib/ansible/module_utils/kubevirt.py @@ -10,37 +10,18 @@ from distutils.version import Version from ansible.module_utils.k8s.common import list_dict_str from ansible.module_utils.k8s.raw import KubernetesRawModule -try: - from openshift import watch - from openshift.helper.exceptions import KubernetesException -except ImportError: - # Handled in k8s common: - pass - import re MAX_SUPPORTED_API_VERSION = 'v1alpha3' API_GROUP = 'kubevirt.io' -VM_COMMON_ARG_SPEC = { - 'name': {'required': True}, - 'namespace': {'required': True}, - 'state': { - 'default': 'present', - 'choices': ['present', 'absent'], - }, - 'force': { - 'type': 'bool', - 'default': False, - }, +# Put all args that (can) modify 'spec:' here: +VM_SPEC_DEF_ARG_SPEC = { 'resource_definition': { 'type': 'dict', 'aliases': ['definition', 'inline'] }, - 'merge_type': {'type': 'list', 'choices': ['json', 'merge', 'strategic-merge']}, - 'wait': {'type': 'bool', 'default': True}, - 'wait_timeout': {'type': 'int', 'default': 120}, 'memory': {'type': 'str'}, 'memory_limit': {'type': 'str'}, 'cpu_cores': {'type': 'int'}, @@ -59,6 +40,23 @@ VM_COMMON_ARG_SPEC = { 'cpu_shares': {'type': 'int'}, 'cpu_features': {'type': 'list'}, } +# And other common args go here: +VM_COMMON_ARG_SPEC = { + 'name': {'required': True}, + 'namespace': {'required': True}, + 'state': { + 'default': 'present', + 'choices': ['present', 'absent'], + }, + 'force': { + 'type': 'bool', + 'default': False, + }, + 'merge_type': {'type': 'list', 'choices': ['json', 'merge', 'strategic-merge']}, + 'wait': {'type': 'bool', 'default': True}, + 'wait_timeout': {'type': 'int', 'default': 120}, +} +VM_COMMON_ARG_SPEC.update(VM_SPEC_DEF_ARG_SPEC) def virtdict(): @@ -144,18 +142,6 @@ class KubeVirtRawModule(KubernetesRawModule): else: yield (k, y[k]) - def _create_stream(self, resource, namespace, wait_timeout): - """ Create a stream of events for the object """ - w = None - stream = None - try: - w = watch.Watch() - w._api_client = self.client.client - stream = w.stream(resource.get, serialize=False, namespace=namespace, timeout_seconds=wait_timeout) - except KubernetesException as exc: - self.fail_json(msg='Failed to initialize watch: {0}'.format(exc.message)) - return w, stream - def get_resource(self, resource): try: existing = resource.get(name=self.name, namespace=self.namespace) diff --git a/lib/ansible/modules/cloud/kubevirt/kubevirt_vm.py b/lib/ansible/modules/cloud/kubevirt/kubevirt_vm.py index 3c6a76b0af2..5fdcc92d2b7 100644 --- a/lib/ansible/modules/cloud/kubevirt/kubevirt_vm.py +++ b/lib/ansible/modules/cloud/kubevirt/kubevirt_vm.py @@ -29,10 +29,10 @@ options: state: description: - Set the virtual machine to either I(present), I(absent), I(running) or I(stopped). - - "I(present) - Create or update virtual machine." - - "I(absent) - Removes virtual machine." - - "I(running) - Create or update virtual machine and run it." - - "I(stopped) - Stops the virtual machine." + - "I(present) - Create or update a virtual machine. (And run it if it's ephemeral.)" + - "I(absent) - Remove a virtual machine." + - "I(running) - Create or update a virtual machine and run it." + - "I(stopped) - Stop a virtual machine. (This deletes ephemeral VMs.)" default: "present" choices: - present @@ -64,11 +64,11 @@ options: type: list template: description: - - "Template to used to create a virtual machine." + - "Name of Template to be used in creation of a virtual machine." type: str template_parameters: description: - - "Value of parameters to be replaced in template parameters." + - "New values of parameters from Template." type: dict extends_documentation_fragment: @@ -219,17 +219,12 @@ import traceback from ansible.module_utils.k8s.common import AUTH_ARG_SPEC -try: - from openshift.dynamic.client import ResourceInstance -except ImportError: - # Handled in module_utils - pass - from ansible.module_utils.k8s.common import AUTH_ARG_SPEC from ansible.module_utils.kubevirt import ( virtdict, KubeVirtRawModule, VM_COMMON_ARG_SPEC, + VM_SPEC_DEF_ARG_SPEC ) VM_ARG_SPEC = { @@ -246,6 +241,9 @@ VM_ARG_SPEC = { 'template_parameters': {'type': 'dict'}, } +# Which params (can) modify 'spec:' contents of a VM: +VM_SPEC_PARAMS = list(VM_SPEC_DEF_ARG_SPEC.keys()) + ['datavolumes', 'template', 'template_parameters'] + class KubeVirtVM(KubeVirtRawModule): @@ -257,84 +255,80 @@ class KubeVirtVM(KubeVirtRawModule): argument_spec.update(VM_ARG_SPEC) return argument_spec - def _manage_state(self, running, resource, existing, wait, wait_timeout): - definition = {'metadata': {'name': self.name, 'namespace': self.namespace}, 'spec': {'running': running}} - self.patch_resource(resource, definition, existing, self.name, self.namespace, merge_type='merge') - - if wait: - resource = self.find_supported_resource('VirtualMachineInstance') - w, stream = self._create_stream(resource, self.namespace, wait_timeout) - - if wait and stream is not None: - self._read_stream(resource, w, stream, self.name, running) - - def _read_stream(self, resource, watcher, stream, name, running): - """ Wait for ready_replicas to equal the requested number of replicas. """ - for event in stream: - if event.get('object'): - obj = ResourceInstance(resource, event['object']) - if running: - if obj.metadata.name == name and hasattr(obj, 'status'): - phase = getattr(obj.status, 'phase', None) - if phase: - if phase == 'Running' and running: - watcher.stop() - return - else: - # TODO: wait for stopped state: - watcher.stop() - return - - self.fail_json(msg="Error waiting for virtual machine. Try a higher wait_timeout value. %s" % obj.to_dict()) - - def manage_state(self, state): - wait = self.params.get('wait') - wait_timeout = self.params.get('wait_timeout') - resource_version = self.params.get('resource_version') - - resource_vm = self.find_supported_resource('VirtualMachine') - existing = self.get_resource(resource_vm) - if resource_version and resource_version != existing.metadata.resourceVersion: - return False - - existing_running = False - resource_vmi = self.find_supported_resource('VirtualMachineInstance') - existing_running_vmi = self.get_resource(resource_vmi) - if existing_running_vmi and hasattr(existing_running_vmi.status, 'phase'): - existing_running = existing_running_vmi.status.phase == 'Running' - - if state == 'running': - if existing_running: - return False - else: - self._manage_state(True, resource_vm, existing, wait, wait_timeout) - return True - elif state == 'stopped': - if not existing_running: - return False + @staticmethod + def fix_serialization(obj): + if obj and hasattr(obj, 'to_dict'): + return obj.to_dict() + return obj + + def _wait_for_vmi_running(self): + for event in self._kind_resource.watch(namespace=self.namespace, timeout=self.params.get('wait_timeout')): + entity = event['object'] + if entity.metadata.name != self.name: + continue + status = entity.get('status', {}) + phase = status.get('phase', None) + if phase == 'Running': + return entity + + self.fail("Timeout occurred while waiting for virtual machine to start. Maybe try a higher wait_timeout value?") + + def _wait_for_vm_state(self, new_state): + if new_state == 'running': + want_created = want_ready = True + else: + want_created = want_ready = False + + for event in self._kind_resource.watch(namespace=self.namespace, timeout=self.params.get('wait_timeout')): + entity = event['object'] + if entity.metadata.name != self.name: + continue + status = entity.get('status', {}) + created = status.get('created', False) + ready = status.get('ready', False) + if (created, ready) == (want_created, want_ready): + return entity + + self.fail("Timeout occurred while waiting for virtual machine to achieve '{0}' state. " + "Maybe try a higher wait_timeout value?".format(new_state)) + + def manage_vm_state(self, new_state, already_changed): + new_running = True if new_state == 'running' else False + changed = False + k8s_obj = {} + + if not already_changed: + k8s_obj = self.get_resource(self._kind_resource) + if not k8s_obj: + self.fail("VirtualMachine object disappeared during module operation, aborting.") + if k8s_obj.spec.get('running', False) == new_running: + return False, k8s_obj + + newdef = dict(metadata=dict(name=self.name, namespace=self.namespace), spec=dict(running=new_running)) + k8s_obj, err = self.patch_resource(self._kind_resource, newdef, k8s_obj, + self.name, self.namespace, merge_type='merge') + if err: + self.fail_json(**err) else: - self._manage_state(False, resource_vm, existing, wait, wait_timeout) - return True + changed = True - def execute_module(self): - # Parse parameters specific for this module: - self.client = self.get_api_client() - definition = virtdict() - ephemeral = self.params.get('ephemeral') - state = self.params.get('state') + if self.params.get('wait'): + k8s_obj = self._wait_for_vm_state(new_state) - if not ephemeral: - definition['spec']['running'] = state == 'running' + return changed, k8s_obj + + def construct_definition(self, kind, our_state, ephemeral): + definition = virtdict() + processedtemplate = {} # Construct the API object definition: vm_template = self.params.get('template') - processedtemplate = {} if vm_template: # Find the template the VM should be created from: template_resource = self.client.resources.get(api_version='template.openshift.io/v1', kind='Template', name='templates') proccess_template = template_resource.get(name=vm_template, namespace=self.params.get('namespace')) - # Set proper template values set by Ansible parameter 'parameters': + # Set proper template values taken from module option 'template_parameters': for k, v in self.params.get('template_parameters', {}).items(): for parameter in proccess_template.parameters: if parameter.name == k: @@ -344,27 +338,72 @@ class KubeVirtVM(KubeVirtRawModule): processedtemplates_res = self.client.resources.get(api_version='template.openshift.io/v1', kind='Template', name='processedtemplates') processedtemplate = processedtemplates_res.create(proccess_template.to_dict()).to_dict()['objects'][0] + if not ephemeral: + definition['spec']['running'] = our_state == 'running' template = definition if ephemeral else definition['spec']['template'] - kind = 'VirtualMachineInstance' if ephemeral else 'VirtualMachine' template['metadata']['labels']['vm.cnv.io/name'] = self.params.get('name') dummy, definition = self.construct_vm_definition(kind, definition, template) definition = dict(self.merge_dicts(processedtemplate, definition)) - # Create the VM: - result = self.execute_crud(kind, definition) - changed = result['changed'] + return definition - # Manage state of the VM: - if state in ['running', 'stopped']: - if not self.check_mode: - ret = self.manage_state(state) - changed = changed or ret + def execute_module(self): + # Parse parameters specific to this module: + ephemeral = self.params.get('ephemeral') + k8s_state = our_state = self.params.get('state') + kind = 'VirtualMachineInstance' if ephemeral else 'VirtualMachine' + _used_params = [name for name in self.params if self.params[name] is not None] + # Is 'spec:' getting changed? + vm_spec_change = True if set(VM_SPEC_PARAMS).intersection(_used_params) else False + changed = False + crud_executed = False + method = '' + + # Underlying module_utils/k8s/* code knows only of state == present/absent; let's make sure not to confuse it + if ephemeral: + # Ephemerals don't actually support running/stopped; we treat those as aliases for present/absent instead + if our_state == 'running': + self.params['state'] = k8s_state = 'present' + elif our_state == 'stopped': + self.params['state'] = k8s_state = 'absent' + else: + if our_state != 'absent': + self.params['state'] = k8s_state = 'present' + + self.client = self.get_api_client() + self._kind_resource = self.find_supported_resource(kind) + k8s_obj = self.get_resource(self._kind_resource) + if not self.check_mode and not vm_spec_change and k8s_state != 'absent' and not k8s_obj: + self.fail("It's impossible to create an empty VM or change state of a non-existent VM.") + + # Changes in VM's spec or any changes to VMIs warrant a full CRUD, the latter because + # VMIs don't really have states to manage; they're either present or don't exist + # Also check_mode always warrants a CRUD, as that'll produce a sane result + if vm_spec_change or ephemeral or k8s_state == 'absent' or self.check_mode: + definition = self.construct_definition(kind, our_state, ephemeral) + result = self.execute_crud(kind, definition) + changed = result['changed'] + k8s_obj = result['result'] + method = result['method'] + crud_executed = True + + if ephemeral and self.params.get('wait') and k8s_state == 'present' and not self.check_mode: + # Waiting for k8s_state==absent is handled inside execute_crud() + k8s_obj = self._wait_for_vmi_running() + + if not ephemeral and our_state in ['running', 'stopped'] and not self.check_mode: + # State==present/absent doesn't involve any additional VMI state management and is fully + # handled inside execute_crud() (including wait logic) + patched, k8s_obj = self.manage_vm_state(our_state, crud_executed) + changed = changed or patched + if changed: + method = method or 'patch' # Return from the module: self.exit_json(**{ 'changed': changed, - 'kubevirt_vm': result.pop('result'), - 'result': result, + 'kubevirt_vm': self.fix_serialization(k8s_obj), + 'method': method })