opennebula: new module one_host (#40041)

pull/40311/head
Rafael 6 years ago committed by René Moser
parent 4fd770f792
commit 44eaa2c007

@ -0,0 +1,352 @@
#
# Copyright 2018 www.privaz.io Valletech AB
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
import time
import ssl
from os import environ
from ansible.module_utils.six import string_types
from ansible.module_utils.basic import AnsibleModule
HAS_PYONE = True
try:
import pyone
from pyone import OneException
from pyone.tester import OneServerTester
except ImportError:
OneException = Exception
HAS_PYONE = False
class OpenNebulaModule:
"""
Base class for all OpenNebula Ansible Modules.
This is basically a wrapper of the common arguments, the pyone client and
Some utility methods. It will also create a Test client if fixtures are
to be replayed or recorded and manage that they are flush to disk when
required.
"""
common_args = dict(
api_url=dict(type='str', aliases=['api_endpoint']),
api_username=dict(type='str'),
api_password=dict(type='str', no_log=True, aliases=['api_token']),
validate_certs=dict(default=True, type='bool'),
wait_timeout=dict(type='int', default=300),
)
def __init__(self, argument_spec, supports_check_mode=False, mutually_exclusive=None):
module_args = OpenNebulaModule.common_args
module_args.update(argument_spec)
self.module = AnsibleModule(argument_spec=module_args,
supports_check_mode=supports_check_mode,
mutually_exclusive=mutually_exclusive)
self.result = dict(changed=False,
original_message='',
message='')
self.one = self.create_one_client()
self.resolved_parameters = self.resolve_parameters()
def create_one_client(self):
"""
Creates an XMLPRC client to OpenNebula.
Dependign on environment variables it will implement a test client.
Returns: the new xmlrpc client.
"""
test_fixture = (environ.get("ONE_TEST_FIXTURE", "False").lower() in ["1", "yes", "true"])
test_fixture_file = environ.get("ONE_TEST_FIXTURE_FILE", "undefined")
test_fixture_replay = (environ.get("ONE_TEST_FIXTURE_REPLAY", "True").lower() in ["1", "yes", "true"])
test_fixture_unit = environ.get("ONE_TEST_FIXTURE_UNIT", "init")
# context required for not validating SSL, old python versions won't validate anyway.
if hasattr(ssl, '_create_unverified_context'):
no_ssl_validation_context = ssl._create_unverified_context()
else:
no_ssl_validation_context = None
# Check if the module can run
if not HAS_PYONE:
self.fail("pyone is required for this module")
if 'api_url' in self.module.params:
url = self.module.params.get("api_url", environ.get("ONE_URL", False))
else:
self.fail("Either api_url or the environment variable ONE_URL must be provided")
if 'api_username' in self.module.params:
username = self.module.params.get("api_username", environ.get("ONE_USERNAME", False))
else:
self.fail("Either api_username or the environment vairable ONE_USERNAME must be provided")
if 'api_password' in self.module.params:
password = self.module.params.get("api_password", environ.get("ONE_PASSWORD", False))
else:
self.fail("Either api_password or the environment vairable ONE_PASSWORD must be provided")
session = "%s:%s" % (username, password)
if not test_fixture:
if not self.module.params.get("validate_certs") and "PYTHONHTTPSVERIFY" not in environ:
return pyone.OneServer(url, session=session, context=no_ssl_validation_context)
else:
return pyone.OneServer(url, session)
else:
if not self.module.params.get("validate_certs") and "PYTHONHTTPSVERIFY" not in environ:
one = OneServerTester(url,
fixture_file=test_fixture_file,
fixture_replay=test_fixture_replay,
session=session,
context=no_ssl_validation_context)
else:
one = OneServerTester(url,
fixture_file=test_fixture_file,
fixture_replay=test_fixture_replay,
session=session)
one.set_fixture_unit_test(test_fixture_unit)
return one
def close_one_client(self):
"""
Closing is only require in the event of fixture recording, as fixtures will be dumped to file
"""
if self.is_fixture_writing():
self.one._close_fixtures()
def fail(self, msg):
"""
Utility failure method, will ensure fixtures are flushed before failing.
Args:
msg: human readable failure reason.
"""
if hasattr(self, 'one'):
self.close_one_client()
self.module.fail_json(msg=msg)
def exit(self):
"""
Utility exit method, will ensure fixtures are flushed before exiting.
"""
if hasattr(self, 'one'):
self.close_one_client()
self.module.exit_json(**self.result)
def resolve_parameters(self):
"""
This method resolves parameters provided by a secondary ID to the primary ID.
For example if cluster_name is present, cluster_id will be introduced by performing
the required resolution
Returns: a copy of the parameters that includes the resolved parameters.
"""
resolved_params = dict(self.module.params)
if 'cluster_name' in self.module.params:
clusters = self.one.clusterpool.info()
for cluster in clusters.CLUSTER:
if cluster.NAME == self.module.params.get('cluster_name'):
resolved_params['cluster_id'] = cluster.ID
return resolved_params
def is_parameter(self, name):
"""
Utility method to check if a parameter was provided or is resolved
Args:
name: the parameter to check
"""
if name in self.resolved_parameters:
return self.get_parameter(name) is not None
else:
return False
def get_parameter(self, name):
"""
Utility method for accessing parameters that includes resolved ID
parameters from provided Name parameters.
"""
return self.resolved_parameters.get(name)
def is_fixture_replay(self):
"""
Returns: true if we are currently running fixtures in replay mode.
"""
return (environ.get("ONE_TEST_FIXTURE", "False").lower() in ["1", "yes", "true"]) and \
(environ.get("ONE_TEST_FIXTURE_REPLAY", "True").lower() in ["1", "yes", "true"])
def is_fixture_writing(self):
"""
Returns: true if we are currently running fixtures in write mode.
"""
return (environ.get("ONE_TEST_FIXTURE", "False").lower() in ["1", "yes", "true"]) and \
(environ.get("ONE_TEST_FIXTURE_REPLAY", "True").lower() in ["0", "no", "false"])
def get_host_by_name(self, name):
'''
Returns a host given its name.
Args:
name: the name of the host
Returns: the host object or None if the host is absent.
'''
hosts = self.one.hostpool.info()
for h in hosts.HOST:
if h.NAME == name:
return h
return None
def get_cluster_by_name(self, name):
"""
Returns a cluster given its name.
Args:
name: the name of the cluster
Returns: the cluster object or None if the host is absent.
"""
clusters = self.one.clusterpool.info()
for c in clusters.CLUSTER:
if c.NAME == name:
return c
return None
def get_template_by_name(self, name):
'''
Returns a template given its name.
Args:
name: the name of the template
Returns: the template object or None if the host is absent.
'''
templates = self.one.templatepool.info()
for t in templates.TEMPLATE:
if t.NAME == name:
return t
return None
def cast_template(self, template):
"""
OpenNebula handles all template elements as strings
At some point there is a cast being performed on types provided by the user
This function mimics that transformation so that required template updates are detected properly
additionally an array will be converted to a comma separated list,
which works for labels and hopefully for something more.
Args:
template: the template to transform
Returns: the transformed template with data casts applied.
"""
# TODO: check formally available data types in templates
# TODO: some arrays might be converted to space separated
for key in template:
value = template[key]
if isinstance(value, dict):
self.cast_template(template[key])
elif isinstance(value, list):
template[key] = ', '.join(value)
elif not isinstance(value, string_types):
template[key] = str(value)
def requires_template_update(self, current, desired):
"""
This function will help decide if a template update is required or not
If a desired key is missing from the current dictionary an update is required
If the intersection of both dictionaries is not deep equal, an update is required
Args:
current: current template as a dictionary
desired: desired template as a dictionary
Returns: True if a template update is required
"""
if not desired:
return False
self.cast_template(desired)
intersection = dict()
for dkey in desired.keys():
if dkey in current.keys():
intersection[dkey] = current[dkey]
else:
return True
return not (desired == intersection)
def wait_for_state(self, element_name, state, state_name, target_states,
invalid_states=None, transition_states=None,
wait_timeout=None):
"""
Args:
element_name: the name of the object we are waiting for: HOST, VM, etc.
state: lambda that returns the current state, will be queried until target state is reached
state_name: lambda that returns the readable form of a given state
target_states: states expected to be reached
invalid_states: if any of this states is reached, fail
transition_states: when used, these are the valid states during the transition.
wait_timeout: timeout period in seconds. Defaults to the provided parameter.
"""
if not wait_timeout:
wait_timeout = self.module.params.get("wait_timeout")
if self.is_fixture_replay():
sleep_time_ms = 0.01
else:
sleep_time_ms = 1
start_time = time.time()
while (time.time() - start_time) < wait_timeout:
current_state = state()
if current_state in invalid_states:
self.fail('invalid %s state %s' % (element_name, state_name(current_state)))
if transition_states:
if current_state not in transition_states:
self.fail('invalid %s transition state %s' % (element_name, state_name(current_state)))
if current_state in target_states:
return True
time.sleep(sleep_time_ms)
self.fail(msg="Wait timeout has expired!")
def run_module(self):
"""
trigger the start of the execution of the module.
Returns:
"""
try:
self.run(self.one, self.module, self.result)
except OneException as e:
self.fail(msg="OpenNebula Exception: %s" % e)
def run(self, one, module, result):
"""
to be implemented by subclass with the actual module actions.
Args:
one: the OpenNebula XMLRPC client
module: the Ansible Module object
result: the Ansible result
"""
raise NotImplementedError("Method requires implementation")

@ -0,0 +1,280 @@
#!/usr/bin/python
#
# Copyright 2018 www.privaz.io Valletech AB
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
ANSIBLE_METADATA = {
'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'
}
DOCUMENTATION = '''
---
module: one_host
short_description: Manages OpenNebula Hosts
version_added: "2.6"
requirements:
- pyone
description:
- "Manages OpenNebula Hosts"
options:
name:
description:
- Hostname of the machine to manage.
required: true
state:
description:
- Takes the host to the desired lifecycle state.
- If C(absent) the host will be deleted from the cluster.
- If C(present) the host will be created in the cluster (includes C(enabled), C(disabled) and C(offline) states).
- If C(enabled) the host is fully operational.
- C(disabled), e.g. to perform maintenance operations.
- C(offline), host is totally offline.
choices:
- absent
- present
- enabled
- disabled
- offline
default: present
im_mad_name:
description:
- The name of the information manager, this values are taken from the oned.conf with the tag name IM_MAD (name)
default: kvm
vmm_mad_name:
description:
- The name of the virtual machine manager mad name, this values are taken from the oned.conf with the tag name VM_MAD (name)
default: kvm
cluster_id:
description:
- The cluster ID.
default: 0
cluster_name:
description:
- The cluster specified by name.
labels:
description:
- The labels for this host.
template:
description:
- The template or attribute changes to merge into the host template.
aliases:
- attributes
extends_documentation_fragment: opennebula
author:
- Rafael del Valle (@rvalle)
'''
EXAMPLES = '''
- name: Create a new host in OpenNebula
one_host:
name: host1
cluster_id: 1
api_url: http://127.0.0.1:2633/RPC2
- name: Create a host and adjust its template
one_host:
name: host2
cluster_name: default
template:
LABELS:
- gold
- ssd
RESERVED_CPU: -100
'''
# TODO: pending setting guidelines on returned values
RETURN = '''
'''
# TODO: Documentation on valid state transitions is required to properly implement all valid cases
# TODO: To be coherent with CLI this module should also provide "flush" functionality
from ansible.module_utils.opennebula import OpenNebulaModule
try:
from pyone import HOST_STATES, HOST_STATUS
except ImportError:
pass # handled at module utils
# Pseudo definitions...
HOST_ABSENT = -99 # the host is absent (special case defined by this module)
class HostModule(OpenNebulaModule):
def __init__(self):
argument_spec = dict(
name=dict(type='str', required=True),
state=dict(choices=['present', 'absent', 'enabled', 'disabled', 'offline'], default='present'),
im_mad_name=dict(type='str', default="kvm"),
vmm_mad_name=dict(type='str', default="kvm"),
cluster_id=dict(type='int', default=0),
cluster_name=dict(type='str'),
labels=dict(type='list'),
template=dict(type='dict', aliases=['attributes']),
)
mutually_exclusive = [
['cluster_id', 'cluster_name']
]
OpenNebulaModule.__init__(self, argument_spec, mutually_exclusive=mutually_exclusive)
def allocate_host(self):
"""
Creates a host entry in OpenNebula
Returns: True on success, fails otherwise.
"""
if not self.one.host.allocate(self.get_parameter('name'),
self.get_parameter('vmm_mad_name'),
self.get_parameter('im_mad_name'),
self.get_parameter('cluster_id')):
self.fail(msg="could not allocate host")
else:
self.result['changed'] = True
return True
def wait_for_host_state(self, host, target_states):
"""
Utility method that waits for a host state.
Args:
host:
target_states:
"""
return self.wait_for_state('host',
lambda: self.one.host.info(host.ID).STATE,
lambda s: HOST_STATES(s).name, target_states,
invalid_states=[HOST_STATES.ERROR, HOST_STATES.MONITORING_ERROR])
def run(self, one, module, result):
# Get the list of hosts
host_name = self.get_parameter("name")
host = self.get_host_by_name(host_name)
# manage host state
desired_state = self.get_parameter('state')
if bool(host):
current_state = host.STATE
current_state_name = HOST_STATES(host.STATE).name
else:
current_state = HOST_ABSENT
current_state_name = "ABSENT"
# apply properties
if desired_state == 'present':
if current_state == HOST_ABSENT:
self.allocate_host()
host = self.get_host_by_name(host_name)
self.wait_for_host_state(host, [HOST_STATES.MONITORED])
elif current_state in [HOST_STATES.ERROR, HOST_STATES.MONITORING_ERROR]:
self.fail(msg="invalid host state %s" % current_state_name)
elif desired_state == 'enabled':
if current_state == HOST_ABSENT:
self.allocate_host()
host = self.get_host_by_name(host_name)
self.wait_for_host_state(host, [HOST_STATES.MONITORED])
elif current_state in [HOST_STATES.DISABLED, HOST_STATES.OFFLINE]:
if one.host.status(host.ID, HOST_STATUS.ENABLED):
self.wait_for_host_state(host, [HOST_STATES.MONITORED])
result['changed'] = True
else:
self.fail(msg="could not enable host")
elif current_state in [HOST_STATES.MONITORED]:
pass
else:
self.fail(msg="unknown host state %s, cowardly refusing to change state to enable" % current_state_name)
elif desired_state == 'disabled':
if current_state == HOST_ABSENT:
self.fail(msg='absent host cannot be put in disabled state')
elif current_state in [HOST_STATES.MONITORED, HOST_STATES.OFFLINE]:
if one.host.status(host.ID, HOST_STATUS.DISABLED):
self.wait_for_host_state(host, [HOST_STATES.DISABLED])
result['changed'] = True
else:
self.fail(msg="could not disable host")
elif current_state in [HOST_STATES.DISABLED]:
pass
else:
self.fail(msg="unknown host state %s, cowardly refusing to change state to disable" % current_state_name)
elif desired_state == 'offline':
if current_state == HOST_ABSENT:
self.fail(msg='absent host cannot be placed in offline state')
elif current_state in [HOST_STATES.MONITORED, HOST_STATES.DISABLED]:
if one.host.status(host.ID, HOST_STATUS.OFFLINE):
self.wait_for_host_state(host, [HOST_STATES.OFFLINE])
result['changed'] = True
else:
self.fail(msg="could not set host offline")
elif current_state in [HOST_STATES.OFFLINE]:
pass
else:
self.fail(msg="unknown host state %s, cowardly refusing to change state to offline" % current_state_name)
elif desired_state == 'absent':
if current_state != HOST_ABSENT:
if one.host.delete(host.ID):
result['changed'] = True
else:
self.fail(msg="could not delete host from cluster")
# if we reach this point we can assume that the host was taken to the desired state
if desired_state != "absent":
# manipulate or modify the template
desired_template_changes = self.get_parameter('template')
if desired_template_changes is None:
desired_template_changes = dict()
# complete the template with speficic ansible parameters
if self.is_parameter('labels'):
desired_template_changes['LABELS'] = self.get_parameter('labels')
if self.requires_template_update(host.TEMPLATE, desired_template_changes):
# setup the root element so that pyone will generate XML instead of attribute vector
desired_template_changes = {"TEMPLATE": desired_template_changes}
if one.host.update(host.ID, desired_template_changes, 1): # merge the template
result['changed'] = True
else:
self.fail(msg="failed to update the host template")
# the cluster
if host.CLUSTER_ID != self.get_parameter('cluster_id'):
if one.cluster.addhost(self.get_parameter('cluster_id'), host.ID):
result['changed'] = True
else:
self.fail(msg="failed to update the host cluster")
# return
self.exit()
def main():
HostModule().run_module()
if __name__ == '__main__':
main()

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
#
# Copyright 2018 www.privaz.io Valletech AB
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
class ModuleDocFragment(object):
# OpenNebula common documentation
DOCUMENTATION = '''
options:
api_url:
description:
- The ENDPOINT URL of the XMLRPC server.
If not specified then the value of the ONE_URL environment variable, if any, is used.
aliases:
- api_endpoint
api_username:
description:
- The name of the user for XMLRPC authentication.
If not specified then the value of the ONE_USERNAME environment variable, if any, is used.
api_password:
description:
- The password or token for XMLRPC authentication.
aliases:
- api_token
validate_certs:
description:
- Whether to validate the SSL certificates or not.
This parameter is ignored if PYTHONHTTPSVERIFY environment variable is used.
type: bool
default: true
wait_timeout:
description:
- time to wait for the desired state to be reached before timeout, in seconds.
default: 300
'''

@ -0,0 +1,20 @@
# This is the configuration template for ansible-test OpenNebula integration tests.
#
# You do not need this template if you are:
#
# 1) Running integration tests without using ansible-test.
# 2) Running integration tests against previously recorded XMLRPC fixtures
#
# If you want to test against a Live OpenNebula platform,
# fill in the values below and save this file without the .template extension.
# This will cause ansible-test to use the given configuration.
#
# If you run with @FIXTURES enabled (true) then you can decide if you want to
# run in @REPLAY mode (true) or, record mode (false).
opennebula_url: @URL
opennebula_username: @USERNAME
opennebula_password: @PASSWORD
opennebula_test_fixture: @FIXTURES
opennebula_test_fixture_replay: @REPLAY

@ -0,0 +1,2 @@
cloud/opennebula
posix/ci/cloud/group4/opennebula

@ -0,0 +1,2 @@
dependencies:
- setup_opennebula

@ -0,0 +1,235 @@
# test code for the one_host module
# ENVIRONENT PREPARACTION
- set_fact: test_number= 0
- name: "test_{{test_number}}: copy fixtures to test host"
copy:
src: testhost/tmp/opennebula-fixtures.json.gz
dest: /tmp
when:
- opennebula_test_fixture
- opennebula_test_fixture_replay
# SETUP INITIAL TESTING CONDITION
- set_fact: test_number={{ test_number | int + 1 }}
- name: "test_{{test_number}}: ensure the tests hosts are absent"
one_host:
name: "{{ item }}"
state: absent
api_endpoint: "{{ opennebula_url }}"
api_username: "{{ opennebula_username }}"
api_token: "{{ opennebula_password }}"
validate_certs: false
environment:
ONE_TEST_FIXTURE: "{{ opennebula_test_fixture }}"
ONE_TEST_FIXTURE_FILE: /tmp/opennebula-fixtures.json.gz
ONE_TEST_FIXTURE_REPLAY: "{{ opennebula_test_fixture_replay }}"
ONE_TEST_FIXTURE_UNIT: "test_{{test_number}}_{{ item }}"
with_items: "{{opennebula_test.hosts}}"
register: result
# NOT EXISTING HOSTS
- set_fact: test_number={{ test_number | int + 1 }}
- name: "test_{{test_number}}: attempt to enable a host that does not exists"
one_host:
name: badhost
state: "{{item}}"
api_url: "{{ opennebula_url }}"
api_username: "{{ opennebula_username }}"
api_password: "{{ opennebula_password }}"
validate_certs: false
environment:
ONE_TEST_FIXTURE: "{{ opennebula_test_fixture }}"
ONE_TEST_FIXTURE_FILE: /tmp/opennebula-fixtures.json.gz
ONE_TEST_FIXTURE_REPLAY: "{{ opennebula_test_fixture_replay }}"
ONE_TEST_FIXTURE_UNIT: "test_{{test_number}}_{{item}}"
ignore_errors: true
register: result
with_items:
- enabled
- disabled
- offline
- name: "assert test_{{test_number}} failed"
assert:
that:
- result is failed
- result.results[0].msg == 'invalid host state ERROR'
# ---
- set_fact: test_number={{ test_number | int + 1 }}
- name: "test_{{test_number}}: delete an unexisting host"
one_host:
name: badhost
state: absent
api_url: "{{ opennebula_url }}"
api_username: "{{ opennebula_username }}"
api_password: "{{ opennebula_password }}"
validate_certs: false
environment:
ONE_TEST_FIXTURE: "{{ opennebula_test_fixture }}"
ONE_TEST_FIXTURE_FILE: /tmp/opennebula-fixtures.json.gz
ONE_TEST_FIXTURE_REPLAY: "{{ opennebula_test_fixture_replay }}"
ONE_TEST_FIXTURE_UNIT: "test_{{test_number}}"
register: result
- name: "assert test_{{test_number}} worked"
assert:
that:
- result.changed
# HOST ENABLEMENT
- set_fact: test_number={{ test_number | int + 1 }}
- name: "test_{{test_number}}: enable the test hosts"
one_host:
name: "{{ item }}"
state: enabled
api_url: "{{ opennebula_url }}"
api_username: "{{ opennebula_username }}"
api_password: "{{ opennebula_password }}"
validate_certs: false
environment:
ONE_TEST_FIXTURE: "{{ opennebula_test_fixture }}"
ONE_TEST_FIXTURE_FILE: /tmp/opennebula-fixtures.json.gz
ONE_TEST_FIXTURE_REPLAY: "{{ opennebula_test_fixture_replay }}"
ONE_TEST_FIXTURE_UNIT: "test_{{test_number}}_{{ item }}"
with_items: "{{opennebula_test.hosts}}"
register: result
- name: "assert test_{{test_number}} worked"
assert:
that:
- result.changed
# TEMPLATE MANAGEMENT
- set_fact: test_number={{ test_number | int + 1 }}
- name: "test_{{test_number}}: setup template values on hosts"
one_host:
name: "{{ item }}"
state: enabled
api_url: "{{ opennebula_url }}"
api_username: "{{ opennebula_username }}"
api_password: "{{ opennebula_password }}"
validate_certs: false
template:
LABELS:
- test
- custom
TEST_VALUE: 2
environment:
ONE_TEST_FIXTURE: "{{ opennebula_test_fixture }}"
ONE_TEST_FIXTURE_FILE: /tmp/opennebula-fixtures.json.gz
ONE_TEST_FIXTURE_REPLAY: "{{ opennebula_test_fixture_replay }}"
ONE_TEST_FIXTURE_UNIT: "test_{{test_number}}_{{ item }}"
with_items: "{{opennebula_test.hosts}}"
register: result
- name: "assert test_{{test_number}} worked"
assert:
that:
- result.changed
# ---
- set_fact: test_number={{ test_number | int + 1 }}
- name: "test_{{test_number}}: setup equivalent template values on hosts"
one_host:
name: "{{ item }}"
state: enabled
api_url: "{{ opennebula_url }}"
api_username: "{{ opennebula_username }}"
api_password: "{{ opennebula_password }}"
validate_certs: false
labels:
- test
- custom
attributes:
TEST_VALUE: "2"
environment:
ONE_TEST_FIXTURE: "{{ opennebula_test_fixture }}"
ONE_TEST_FIXTURE_FILE: /tmp/opennebula-fixtures.json.gz
ONE_TEST_FIXTURE_REPLAY: "{{ opennebula_test_fixture_replay }}"
ONE_TEST_FIXTURE_UNIT: "test_{{test_number}}_{{ item }}"
with_items: "{{opennebula_test.hosts}}"
register: result
- name: "assert test_{{test_number}} worked"
assert:
that:
- result.changed == false
# HOST DISABLEMENT
- set_fact: test_number={{ test_number | int + 1 }}
- name: "test_{{test_number}}: disable the test hosts"
one_host:
name: "{{ item }}"
state: disabled
api_url: "{{ opennebula_url }}"
api_username: "{{ opennebula_username }}"
api_password: "{{ opennebula_password }}"
validate_certs: false
environment:
ONE_TEST_FIXTURE: "{{ opennebula_test_fixture }}"
ONE_TEST_FIXTURE_FILE: /tmp/opennebula-fixtures.json.gz
ONE_TEST_FIXTURE_REPLAY: "{{ opennebula_test_fixture_replay }}"
ONE_TEST_FIXTURE_UNIT: "test_{{test_number}}_{{ item }}"
with_items: "{{opennebula_test.hosts}}"
register: result
- name: "assert test_{{test_number}} worked"
assert:
that:
- result.changed
# HOST OFFLINE
- set_fact: test_number={{ test_number | int + 1 }}
- name: "test_{{test_number}}: offline the test hosts"
one_host:
name: "{{ item }}"
state: offline
api_url: "{{ opennebula_url }}"
api_username: "{{ opennebula_username }}"
api_password: "{{ opennebula_password }}"
validate_certs: false
environment:
ONE_TEST_FIXTURE: "{{ opennebula_test_fixture }}"
ONE_TEST_FIXTURE_FILE: /tmp/opennebula-fixtures.json.gz
ONE_TEST_FIXTURE_REPLAY: "{{ opennebula_test_fixture_replay }}"
ONE_TEST_FIXTURE_UNIT: "test_{{test_number}}_{{ item }}"
with_items: "{{opennebula_test.hosts}}"
register: result
- name: "assert test_{{test_number}} worked"
assert:
that:
- result.changed
# TEARDOWN
- name: fetch fixtures
fetch:
src: /tmp/opennebula-fixtures.json.gz
dest: targets/one_host/files
when:
- opennebula_test_fixture
- not opennebula_test_fixture_replay

@ -0,0 +1,6 @@
---
opennebula_test:
hosts:
- hv1
- hv2

@ -0,0 +1,61 @@
"""OpenNebula plugin for integration tests."""
import os
from lib.cloud import (
CloudProvider,
CloudEnvironment
)
from lib.util import (
find_executable,
ApplicationError,
display,
is_shippable,
)
class OpenNebulaCloudProvider(CloudProvider):
"""Checks if a configuration file has been passed or fixtures are going to be used for testing"""
def filter(self, targets, exclude):
""" no need to filter modules, they can either run from config file or from fixtures"""
pass
def setup(self):
"""Setup the cloud resource before delegation and register a cleanup callback."""
super(OpenNebulaCloudProvider, self).setup()
if not self._use_static_config():
self._setup_dynamic()
def _setup_dynamic(self):
display.info('No config file provided, will run test from fixtures')
config = self._read_config_template()
values = dict(
URL="http://localhost/RPC2",
USERNAME='oneadmin',
PASSWORD='onepass',
FIXTURES='true',
REPLAY='true',
)
config = self._populate_config_template(config, values)
self._write_config(config)
class OpenNebulaCloudEnvironment(CloudEnvironment):
"""
Updates integration test environment after delegation. Will setup the config file as parameter.
"""
def configure_environment(self, env, cmd):
"""
:type env: dict[str, str]
:type cmd: list[str]
"""
cmd.append('-e')
cmd.append('@%s' % self.config_path)
cmd.append('-e')
cmd.append('resource_prefix=%s' % self.resource_prefix)
Loading…
Cancel
Save