Remove incidental_cs_role_permission (#72380)

* Add explicit argspec tests for choices

* ci_complete ci_coverage

* Remove incidental_cs_role_permission

* ci_complete ci_coverage

* ci_complete ci_coverage
pull/72446/head
Matt Martz 4 years ago committed by GitHub
parent 2ee5af514b
commit 6543c7bc5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -140,7 +140,6 @@ matrix:
- env: T=i/aws/2.7/1 - env: T=i/aws/2.7/1
- env: T=i/aws/3.6/1 - env: T=i/aws/3.6/1
- env: T=i/vcenter//1 - env: T=i/vcenter//1
- env: T=i/cs//1
- env: T=i/tower//1 - env: T=i/tower//1
- env: T=i/cloud//1 - env: T=i/cloud//1

@ -91,6 +91,13 @@ def main():
'off', 'off',
], ],
}, },
'choices': {
'type': 'str',
'choices': [
'foo',
'bar',
],
}
}, },
required_if=( required_if=(
('state', 'present', ('path', 'content'), True), ('state', 'present', ('path', 'content'), True),

@ -259,6 +259,18 @@
required_one_of_one: value required_one_of_one: value
register: argspec_choices_with_strings_like_bools_false register: argspec_choices_with_strings_like_bools_false
- argspec:
required: value
required_one_of_one: value
choices: foo
- argspec:
required: value
required_one_of_one: value
choices: baz
register: argspec_choices_bad_choice
ignore_errors: true
- assert: - assert:
that: that:
- argspec_required_fail is failed - argspec_required_fail is failed
@ -310,3 +322,5 @@
- argspec_choices_with_strings_like_bools_true.choices_with_strings_like_bools == 'on' - argspec_choices_with_strings_like_bools_true.choices_with_strings_like_bools == 'on'
- argspec_choices_with_strings_like_bools_true_bool.choices_with_strings_like_bools == 'on' - argspec_choices_with_strings_like_bools_true_bool.choices_with_strings_like_bools == 'on'
- argspec_choices_with_strings_like_bools_false.choices_with_strings_like_bools == 'off' - argspec_choices_with_strings_like_bools_false.choices_with_strings_like_bools == 'off'
- argspec_choices_bad_choice is failed

@ -1,3 +0,0 @@
---
dependencies:
- incidental_cs_common

@ -1,303 +0,0 @@
- name: pre-setup
cs_role:
name: "testRole"
register: testRole
- name: verify pre-setup
assert:
that:
- testRole is successful
- name: setup
cs_role_permission:
name: "fakeRolePerm"
role: "{{ testRole.id }}"
state: absent
register: roleperm
- name: verify setup
assert:
that:
- roleperm is successful
- name: setup2
cs_role_permission:
name: "fakeRolePerm2"
role: "{{ testRole.id }}"
state: absent
register: roleperm2
- name: verify setup2
assert:
that:
- roleperm2 is successful
- name: test fail if missing name
cs_role_permission:
role: "{{ testRole.id }}"
register: roleperm
ignore_errors: true
- name: verify results of fail if missing name
assert:
that:
- roleperm is failed
- 'roleperm.msg == "missing required arguments: name"'
- name: test fail if missing role
cs_role_permission:
name: "fakeRolePerm"
register: roleperm
ignore_errors: true
- name: verify results of fail if missing role
assert:
that:
- roleperm is failed
- 'roleperm.msg == "missing required arguments: role"'
- name: test fail if role does not exist
cs_role_permission:
name: "fakeRolePerm"
role: "testtest"
register: roleperm
ignore_errors: true
- name: verify results of fail if role does not exist
assert:
that:
- roleperm is failed
- roleperm.msg == "Role 'testtest' not found"
- name: test fail if state is incorrcect
cs_role_permission:
state: badstate
role: "{{ testRole.id }}"
name: "fakeRolePerm"
permission: allow
register: roleperm
ignore_errors: true
- name: verify results of fail if state is incorrcect
assert:
that:
- roleperm is failed
- 'roleperm.msg == "value of state must be one of: present, absent, got: badstate"'
- name: test create role permission in check mode
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
permission: allow
description: "fakeRolePerm description"
register: roleperm
check_mode: yes
- name: verify results of role permission in check mode
assert:
that:
- roleperm is successful
- roleperm is changed
- name: test create role permission
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
permission: allow
description: "fakeRolePerm description"
register: roleperm
- name: verify results of role permission
assert:
that:
- roleperm is successful
- roleperm is changed
- roleperm.name == "fakeRolePerm"
- roleperm.permission == "allow"
- roleperm.description == "fakeRolePerm description"
- name: test create role permission idempotency
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
permission: allow
description: "fakeRolePerm description"
register: roleperm
- name: verify results of role permission idempotency
assert:
that:
- roleperm is successful
- roleperm is not changed
- roleperm.name == "fakeRolePerm"
- roleperm.permission == "allow"
- roleperm.description == "fakeRolePerm description"
- name: test update role permission in check_mode
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
permission: deny
description: "fakeRolePerm description"
register: roleperm
check_mode: yes
- name: verify results of update role permission in check mode
assert:
that:
- roleperm is successful
- roleperm is changed
- roleperm.name == "fakeRolePerm"
- roleperm.permission == "allow"
- roleperm.description == "fakeRolePerm description"
- name: test update role permission
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
permission: deny
description: "fakeRolePerm description"
register: roleperm
- name: verify results of update role permission
assert:
that:
- roleperm is successful
- roleperm is changed
- roleperm.name == "fakeRolePerm"
- roleperm.permission == "deny"
- roleperm.description == "fakeRolePerm description"
- name: test update role permission idempotency
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
permission: deny
description: "fakeRolePerm description"
register: roleperm
- name: verify results of update role permission idempotency
assert:
that:
- roleperm is successful
- roleperm is not changed
- roleperm.name == "fakeRolePerm"
- roleperm.permission == "deny"
- roleperm.description == "fakeRolePerm description"
- name: test create a second role permission
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm2"
permission: allow
register: roleperm2
- name: verify results of create a second role permission
assert:
that:
- roleperm2 is successful
- roleperm2 is changed
- roleperm2.name == "fakeRolePerm2"
- name: test update rules order in check_mode
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
parent: "{{ roleperm2.id }}"
register: roleperm
check_mode: yes
- name: verify results of update rule order check mode
assert:
that:
- roleperm is successful
- roleperm is changed
- roleperm.name == "fakeRolePerm"
- name: test update rules order
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
parent: "{{ roleperm2.id }}"
register: roleperm
- name: verify results of update rule order
assert:
that:
- roleperm is successful
- roleperm is changed
- roleperm.name == "fakeRolePerm"
- name: test update rules order to the top of the list
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
parent: 0
register: roleperm
- name: verify results of update rule order to the top of the list
assert:
that:
- roleperm is successful
- roleperm is changed
- roleperm.name == "fakeRolePerm"
- name: test update rules order with parent NAME
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
parent: "{{ roleperm2.name }}"
register: roleperm
- name: verify results of update rule order with parent NAME
assert:
that:
- roleperm is successful
- roleperm is changed
- roleperm.name == "fakeRolePerm"
- name: test fail if permission AND parent args are present
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
permission: allow
parent: 0
register: roleperm
ignore_errors: true
- name: verify results of fail if permission AND parent args are present
assert:
that:
- roleperm is failed
- 'roleperm.msg == "parameters are mutually exclusive: permission|parent"'
- name: test fail if parent does not exist
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
parent: "badParent"
register: roleperm
ignore_errors: true
- name: verify results of fail if parent does not exist
assert:
that:
- roleperm is failed
- roleperm.msg == "Parent rule 'badParent' not found"
- name: test remove role permission in check_mode
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
state: absent
register: roleperm
check_mode: yes
- name: verify results of rename role permission in check_mode
assert:
that:
- roleperm is successful
- roleperm is changed
- name: test remove role permission
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm"
state: absent
register: roleperm
- name: verify results of remove role permission
assert:
that:
- roleperm is successful
- roleperm is changed
- name: remove second role permission
cs_role_permission:
role: "{{ testRole.id }}"
name: "fakeRolePerm2"
state: absent
register: roleperm
- name: verify results of remove second role permission
assert:
that:
- roleperm is successful
- roleperm is changed

@ -1,664 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015, René Moser <mail@renemoser.net>
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import os
import sys
import time
import traceback
from ansible.module_utils._text import to_text, to_native
from ansible.module_utils.basic import missing_required_lib
CS_IMP_ERR = None
try:
from cs import CloudStack, CloudStackException, read_config
HAS_LIB_CS = True
except ImportError:
CS_IMP_ERR = traceback.format_exc()
HAS_LIB_CS = False
if sys.version_info > (3,):
long = int
def cs_argument_spec():
return dict(
api_key=dict(default=os.environ.get('CLOUDSTACK_KEY')),
api_secret=dict(default=os.environ.get('CLOUDSTACK_SECRET'), no_log=True),
api_url=dict(default=os.environ.get('CLOUDSTACK_ENDPOINT')),
api_http_method=dict(choices=['get', 'post'], default=os.environ.get('CLOUDSTACK_METHOD')),
api_timeout=dict(type='int', default=os.environ.get('CLOUDSTACK_TIMEOUT')),
api_region=dict(default=os.environ.get('CLOUDSTACK_REGION') or 'cloudstack'),
)
def cs_required_together():
return [['api_key', 'api_secret']]
class AnsibleCloudStack:
def __init__(self, module):
if not HAS_LIB_CS:
module.fail_json(msg=missing_required_lib('cs'), exception=CS_IMP_ERR)
self.result = {
'changed': False,
'diff': {
'before': dict(),
'after': dict()
}
}
# Common returns, will be merged with self.returns
# search_for_key: replace_with_key
self.common_returns = {
'id': 'id',
'name': 'name',
'created': 'created',
'zonename': 'zone',
'state': 'state',
'project': 'project',
'account': 'account',
'domain': 'domain',
'displaytext': 'display_text',
'displayname': 'display_name',
'description': 'description',
}
# Init returns dict for use in subclasses
self.returns = {}
# these values will be casted to int
self.returns_to_int = {}
# these keys will be compared case sensitive in self.has_changed()
self.case_sensitive_keys = [
'id',
'displaytext',
'displayname',
'description',
]
self.module = module
self._cs = None
# Helper for VPCs
self._vpc_networks_ids = None
self.domain = None
self.account = None
self.project = None
self.ip_address = None
self.network = None
self.physical_network = None
self.vpc = None
self.zone = None
self.vm = None
self.vm_default_nic = None
self.os_type = None
self.hypervisor = None
self.capabilities = None
self.network_acl = None
@property
def cs(self):
if self._cs is None:
api_config = self.get_api_config()
self._cs = CloudStack(**api_config)
return self._cs
def get_api_config(self):
api_region = self.module.params.get('api_region') or os.environ.get('CLOUDSTACK_REGION')
try:
config = read_config(api_region)
except KeyError:
config = {}
api_config = {
'endpoint': self.module.params.get('api_url') or config.get('endpoint'),
'key': self.module.params.get('api_key') or config.get('key'),
'secret': self.module.params.get('api_secret') or config.get('secret'),
'timeout': self.module.params.get('api_timeout') or config.get('timeout') or 10,
'method': self.module.params.get('api_http_method') or config.get('method') or 'get',
}
self.result.update({
'api_region': api_region,
'api_url': api_config['endpoint'],
'api_key': api_config['key'],
'api_timeout': int(api_config['timeout']),
'api_http_method': api_config['method'],
})
if not all([api_config['endpoint'], api_config['key'], api_config['secret']]):
self.fail_json(msg="Missing api credentials: can not authenticate")
return api_config
def fail_json(self, **kwargs):
self.result.update(kwargs)
self.module.fail_json(**self.result)
def get_or_fallback(self, key=None, fallback_key=None):
value = self.module.params.get(key)
if not value:
value = self.module.params.get(fallback_key)
return value
def has_changed(self, want_dict, current_dict, only_keys=None, skip_diff_for_keys=None):
result = False
for key, value in want_dict.items():
# Optionally limit by a list of keys
if only_keys and key not in only_keys:
continue
# Skip None values
if value is None:
continue
if key in current_dict:
if isinstance(value, (int, float, long, complex)):
# ensure we compare the same type
if isinstance(value, int):
current_dict[key] = int(current_dict[key])
elif isinstance(value, float):
current_dict[key] = float(current_dict[key])
elif isinstance(value, long):
current_dict[key] = long(current_dict[key])
elif isinstance(value, complex):
current_dict[key] = complex(current_dict[key])
if value != current_dict[key]:
if skip_diff_for_keys and key not in skip_diff_for_keys:
self.result['diff']['before'][key] = current_dict[key]
self.result['diff']['after'][key] = value
result = True
else:
before_value = to_text(current_dict[key])
after_value = to_text(value)
if self.case_sensitive_keys and key in self.case_sensitive_keys:
if before_value != after_value:
if skip_diff_for_keys and key not in skip_diff_for_keys:
self.result['diff']['before'][key] = before_value
self.result['diff']['after'][key] = after_value
result = True
# Test for diff in case insensitive way
elif before_value.lower() != after_value.lower():
if skip_diff_for_keys and key not in skip_diff_for_keys:
self.result['diff']['before'][key] = before_value
self.result['diff']['after'][key] = after_value
result = True
else:
if skip_diff_for_keys and key not in skip_diff_for_keys:
self.result['diff']['before'][key] = None
self.result['diff']['after'][key] = to_text(value)
result = True
return result
def _get_by_key(self, key=None, my_dict=None):
if my_dict is None:
my_dict = {}
if key:
if key in my_dict:
return my_dict[key]
self.fail_json(msg="Something went wrong: %s not found" % key)
return my_dict
def query_api(self, command, **args):
try:
res = getattr(self.cs, command)(**args)
if 'errortext' in res:
self.fail_json(msg="Failed: '%s'" % res['errortext'])
except CloudStackException as e:
self.fail_json(msg='CloudStackException: %s' % to_native(e))
except Exception as e:
self.fail_json(msg=to_native(e))
return res
def get_network_acl(self, key=None):
if self.network_acl is None:
args = {
'name': self.module.params.get('network_acl'),
'vpcid': self.get_vpc(key='id'),
}
network_acls = self.query_api('listNetworkACLLists', **args)
if network_acls:
self.network_acl = network_acls['networkacllist'][0]
self.result['network_acl'] = self.network_acl['name']
if self.network_acl:
return self._get_by_key(key, self.network_acl)
else:
self.fail_json(msg="Network ACL %s not found" % self.module.params.get('network_acl'))
def get_vpc(self, key=None):
"""Return a VPC dictionary or the value of given key of."""
if self.vpc:
return self._get_by_key(key, self.vpc)
vpc = self.module.params.get('vpc')
if not vpc:
vpc = os.environ.get('CLOUDSTACK_VPC')
if not vpc:
return None
args = {
'account': self.get_account(key='name'),
'domainid': self.get_domain(key='id'),
'projectid': self.get_project(key='id'),
'zoneid': self.get_zone(key='id'),
}
vpcs = self.query_api('listVPCs', **args)
if not vpcs:
self.fail_json(msg="No VPCs available.")
for v in vpcs['vpc']:
if vpc in [v['name'], v['displaytext'], v['id']]:
# Fail if the identifyer matches more than one VPC
if self.vpc:
self.fail_json(msg="More than one VPC found with the provided identifyer '%s'" % vpc)
else:
self.vpc = v
self.result['vpc'] = v['name']
if self.vpc:
return self._get_by_key(key, self.vpc)
self.fail_json(msg="VPC '%s' not found" % vpc)
def is_vpc_network(self, network_id):
"""Returns True if network is in VPC."""
# This is an efficient way to query a lot of networks at a time
if self._vpc_networks_ids is None:
args = {
'account': self.get_account(key='name'),
'domainid': self.get_domain(key='id'),
'projectid': self.get_project(key='id'),
'zoneid': self.get_zone(key='id'),
}
vpcs = self.query_api('listVPCs', **args)
self._vpc_networks_ids = []
if vpcs:
for vpc in vpcs['vpc']:
for n in vpc.get('network', []):
self._vpc_networks_ids.append(n['id'])
return network_id in self._vpc_networks_ids
def get_physical_network(self, key=None):
if self.physical_network:
return self._get_by_key(key, self.physical_network)
physical_network = self.module.params.get('physical_network')
args = {
'zoneid': self.get_zone(key='id')
}
physical_networks = self.query_api('listPhysicalNetworks', **args)
if not physical_networks:
self.fail_json(msg="No physical networks available.")
for net in physical_networks['physicalnetwork']:
if physical_network in [net['name'], net['id']]:
self.physical_network = net
self.result['physical_network'] = net['name']
return self._get_by_key(key, self.physical_network)
self.fail_json(msg="Physical Network '%s' not found" % physical_network)
def get_network(self, key=None):
"""Return a network dictionary or the value of given key of."""
if self.network:
return self._get_by_key(key, self.network)
network = self.module.params.get('network')
if not network:
vpc_name = self.get_vpc(key='name')
if vpc_name:
self.fail_json(msg="Could not find network for VPC '%s' due missing argument: network" % vpc_name)
return None
args = {
'account': self.get_account(key='name'),
'domainid': self.get_domain(key='id'),
'projectid': self.get_project(key='id'),
'zoneid': self.get_zone(key='id'),
'vpcid': self.get_vpc(key='id')
}
networks = self.query_api('listNetworks', **args)
if not networks:
self.fail_json(msg="No networks available.")
for n in networks['network']:
# ignore any VPC network if vpc param is not given
if 'vpcid' in n and not self.get_vpc(key='id'):
continue
if network in [n['displaytext'], n['name'], n['id']]:
self.result['network'] = n['name']
self.network = n
return self._get_by_key(key, self.network)
self.fail_json(msg="Network '%s' not found" % network)
def get_project(self, key=None):
if self.project:
return self._get_by_key(key, self.project)
project = self.module.params.get('project')
if not project:
project = os.environ.get('CLOUDSTACK_PROJECT')
if not project:
return None
args = {
'account': self.get_account(key='name'),
'domainid': self.get_domain(key='id')
}
projects = self.query_api('listProjects', **args)
if projects:
for p in projects['project']:
if project.lower() in [p['name'].lower(), p['id']]:
self.result['project'] = p['name']
self.project = p
return self._get_by_key(key, self.project)
self.fail_json(msg="project '%s' not found" % project)
def get_ip_address(self, key=None):
if self.ip_address:
return self._get_by_key(key, self.ip_address)
ip_address = self.module.params.get('ip_address')
if not ip_address:
self.fail_json(msg="IP address param 'ip_address' is required")
args = {
'ipaddress': ip_address,
'account': self.get_account(key='name'),
'domainid': self.get_domain(key='id'),
'projectid': self.get_project(key='id'),
'vpcid': self.get_vpc(key='id'),
}
ip_addresses = self.query_api('listPublicIpAddresses', **args)
if not ip_addresses:
self.fail_json(msg="IP address '%s' not found" % args['ipaddress'])
self.ip_address = ip_addresses['publicipaddress'][0]
return self._get_by_key(key, self.ip_address)
def get_vm_guest_ip(self):
vm_guest_ip = self.module.params.get('vm_guest_ip')
default_nic = self.get_vm_default_nic()
if not vm_guest_ip:
return default_nic['ipaddress']
for secondary_ip in default_nic['secondaryip']:
if vm_guest_ip == secondary_ip['ipaddress']:
return vm_guest_ip
self.fail_json(msg="Secondary IP '%s' not assigned to VM" % vm_guest_ip)
def get_vm_default_nic(self):
if self.vm_default_nic:
return self.vm_default_nic
nics = self.query_api('listNics', virtualmachineid=self.get_vm(key='id'))
if nics:
for n in nics['nic']:
if n['isdefault']:
self.vm_default_nic = n
return self.vm_default_nic
self.fail_json(msg="No default IP address of VM '%s' found" % self.module.params.get('vm'))
def get_vm(self, key=None, filter_zone=True):
if self.vm:
return self._get_by_key(key, self.vm)
vm = self.module.params.get('vm')
if not vm:
self.fail_json(msg="Virtual machine param 'vm' is required")
args = {
'account': self.get_account(key='name'),
'domainid': self.get_domain(key='id'),
'projectid': self.get_project(key='id'),
'zoneid': self.get_zone(key='id') if filter_zone else None,
'fetch_list': True,
}
vms = self.query_api('listVirtualMachines', **args)
if vms:
for v in vms:
if vm.lower() in [v['name'].lower(), v['displayname'].lower(), v['id']]:
self.vm = v
return self._get_by_key(key, self.vm)
self.fail_json(msg="Virtual machine '%s' not found" % vm)
def get_disk_offering(self, key=None):
disk_offering = self.module.params.get('disk_offering')
if not disk_offering:
return None
# Do not add domain filter for disk offering listing.
disk_offerings = self.query_api('listDiskOfferings')
if disk_offerings:
for d in disk_offerings['diskoffering']:
if disk_offering in [d['displaytext'], d['name'], d['id']]:
return self._get_by_key(key, d)
self.fail_json(msg="Disk offering '%s' not found" % disk_offering)
def get_zone(self, key=None):
if self.zone:
return self._get_by_key(key, self.zone)
zone = self.module.params.get('zone')
if not zone:
zone = os.environ.get('CLOUDSTACK_ZONE')
zones = self.query_api('listZones')
if not zones:
self.fail_json(msg="No zones available. Please create a zone first")
# use the first zone if no zone param given
if not zone:
self.zone = zones['zone'][0]
self.result['zone'] = self.zone['name']
return self._get_by_key(key, self.zone)
if zones:
for z in zones['zone']:
if zone.lower() in [z['name'].lower(), z['id']]:
self.result['zone'] = z['name']
self.zone = z
return self._get_by_key(key, self.zone)
self.fail_json(msg="zone '%s' not found" % zone)
def get_os_type(self, key=None):
if self.os_type:
return self._get_by_key(key, self.zone)
os_type = self.module.params.get('os_type')
if not os_type:
return None
os_types = self.query_api('listOsTypes')
if os_types:
for o in os_types['ostype']:
if os_type in [o['description'], o['id']]:
self.os_type = o
return self._get_by_key(key, self.os_type)
self.fail_json(msg="OS type '%s' not found" % os_type)
def get_hypervisor(self):
if self.hypervisor:
return self.hypervisor
hypervisor = self.module.params.get('hypervisor')
hypervisors = self.query_api('listHypervisors')
# use the first hypervisor if no hypervisor param given
if not hypervisor:
self.hypervisor = hypervisors['hypervisor'][0]['name']
return self.hypervisor
for h in hypervisors['hypervisor']:
if hypervisor.lower() == h['name'].lower():
self.hypervisor = h['name']
return self.hypervisor
self.fail_json(msg="Hypervisor '%s' not found" % hypervisor)
def get_account(self, key=None):
if self.account:
return self._get_by_key(key, self.account)
account = self.module.params.get('account')
if not account:
account = os.environ.get('CLOUDSTACK_ACCOUNT')
if not account:
return None
domain = self.module.params.get('domain')
if not domain:
self.fail_json(msg="Account must be specified with Domain")
args = {
'name': account,
'domainid': self.get_domain(key='id'),
'listall': True
}
accounts = self.query_api('listAccounts', **args)
if accounts:
self.account = accounts['account'][0]
self.result['account'] = self.account['name']
return self._get_by_key(key, self.account)
self.fail_json(msg="Account '%s' not found" % account)
def get_domain(self, key=None):
if self.domain:
return self._get_by_key(key, self.domain)
domain = self.module.params.get('domain')
if not domain:
domain = os.environ.get('CLOUDSTACK_DOMAIN')
if not domain:
return None
args = {
'listall': True,
}
domains = self.query_api('listDomains', **args)
if domains:
for d in domains['domain']:
if d['path'].lower() in [domain.lower(), "root/" + domain.lower(), "root" + domain.lower()]:
self.domain = d
self.result['domain'] = d['path']
return self._get_by_key(key, self.domain)
self.fail_json(msg="Domain '%s' not found" % domain)
def query_tags(self, resource, resource_type):
args = {
'resourceid': resource['id'],
'resourcetype': resource_type,
}
tags = self.query_api('listTags', **args)
return self.get_tags(resource=tags, key='tag')
def get_tags(self, resource=None, key='tags'):
existing_tags = []
for tag in resource.get(key) or []:
existing_tags.append({'key': tag['key'], 'value': tag['value']})
return existing_tags
def _process_tags(self, resource, resource_type, tags, operation="create"):
if tags:
self.result['changed'] = True
if not self.module.check_mode:
args = {
'resourceids': resource['id'],
'resourcetype': resource_type,
'tags': tags,
}
if operation == "create":
response = self.query_api('createTags', **args)
else:
response = self.query_api('deleteTags', **args)
self.poll_job(response)
def _tags_that_should_exist_or_be_updated(self, resource, tags):
existing_tags = self.get_tags(resource)
return [tag for tag in tags if tag not in existing_tags]
def _tags_that_should_not_exist(self, resource, tags):
existing_tags = self.get_tags(resource)
return [tag for tag in existing_tags if tag not in tags]
def ensure_tags(self, resource, resource_type=None):
if not resource_type or not resource:
self.fail_json(msg="Error: Missing resource or resource_type for tags.")
if 'tags' in resource:
tags = self.module.params.get('tags')
if tags is not None:
self._process_tags(resource, resource_type, self._tags_that_should_not_exist(resource, tags), operation="delete")
self._process_tags(resource, resource_type, self._tags_that_should_exist_or_be_updated(resource, tags))
resource['tags'] = self.query_tags(resource=resource, resource_type=resource_type)
return resource
def get_capabilities(self, key=None):
if self.capabilities:
return self._get_by_key(key, self.capabilities)
capabilities = self.query_api('listCapabilities')
self.capabilities = capabilities['capability']
return self._get_by_key(key, self.capabilities)
def poll_job(self, job=None, key=None):
if 'jobid' in job:
while True:
res = self.query_api('queryAsyncJobResult', jobid=job['jobid'])
if res['jobstatus'] != 0 and 'jobresult' in res:
if 'errortext' in res['jobresult']:
self.fail_json(msg="Failed: '%s'" % res['jobresult']['errortext'])
if key and key in res['jobresult']:
job = res['jobresult'][key]
break
time.sleep(2)
return job
def update_result(self, resource, result=None):
if result is None:
result = dict()
if resource:
returns = self.common_returns.copy()
returns.update(self.returns)
for search_key, return_key in returns.items():
if search_key in resource:
result[return_key] = resource[search_key]
# Bad bad API does not always return int when it should.
for search_key, return_key in self.returns_to_int.items():
if search_key in resource:
result[return_key] = int(resource[search_key])
if 'tags' in resource:
result['tags'] = resource['tags']
return result
def get_result(self, resource):
return self.update_result(resource, self.result)
def get_result_and_facts(self, facts_name, resource):
result = self.get_result(resource)
ansible_facts = {
facts_name: result.copy()
}
for k in ['diff', 'changed']:
if k in ansible_facts[facts_name]:
del ansible_facts[facts_name][k]
result.update(ansible_facts=ansible_facts)
return result

@ -1,211 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# (c) 2016, René Moser <mail@renemoser.net>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = '''
---
module: cs_role
short_description: Manages user roles on Apache CloudStack based clouds.
description:
- Create, update, delete user roles.
version_added: '2.3'
author: René Moser (@resmo)
options:
name:
description:
- Name of the role.
type: str
required: true
uuid:
description:
- ID of the role.
- If provided, I(uuid) is used as key.
type: str
aliases: [ id ]
role_type:
description:
- Type of the role.
- Only considered for creation.
type: str
default: User
choices: [ User, DomainAdmin, ResourceAdmin, Admin ]
description:
description:
- Description of the role.
type: str
state:
description:
- State of the role.
type: str
default: present
choices: [ present, absent ]
extends_documentation_fragment: cloudstack
'''
EXAMPLES = '''
- name: Ensure an user role is present
cs_role:
name: myrole_user
delegate_to: localhost
- name: Ensure a role having particular ID is named as myrole_user
cs_role:
name: myrole_user
id: 04589590-ac63-4ffc-93f5-b698b8ac38b6
delegate_to: localhost
- name: Ensure a role is absent
cs_role:
name: myrole_user
state: absent
delegate_to: localhost
'''
RETURN = '''
---
id:
description: UUID of the role.
returned: success
type: str
sample: 04589590-ac63-4ffc-93f5-b698b8ac38b6
name:
description: Name of the role.
returned: success
type: str
sample: myrole
description:
description: Description of the role.
returned: success
type: str
sample: "This is my role description"
role_type:
description: Type of the role.
returned: success
type: str
sample: User
'''
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.cloudstack import (
AnsibleCloudStack,
cs_argument_spec,
cs_required_together,
)
class AnsibleCloudStackRole(AnsibleCloudStack):
def __init__(self, module):
super(AnsibleCloudStackRole, self).__init__(module)
self.returns = {
'type': 'role_type',
}
def get_role(self):
uuid = self.module.params.get('uuid')
if uuid:
args = {
'id': uuid,
}
roles = self.query_api('listRoles', **args)
if roles:
return roles['role'][0]
else:
args = {
'name': self.module.params.get('name'),
}
roles = self.query_api('listRoles', **args)
if roles:
return roles['role'][0]
return None
def present_role(self):
role = self.get_role()
if role:
role = self._update_role(role)
else:
role = self._create_role(role)
return role
def _create_role(self, role):
self.result['changed'] = True
args = {
'name': self.module.params.get('name'),
'type': self.module.params.get('role_type'),
'description': self.module.params.get('description'),
}
if not self.module.check_mode:
res = self.query_api('createRole', **args)
role = res['role']
return role
def _update_role(self, role):
args = {
'id': role['id'],
'name': self.module.params.get('name'),
'description': self.module.params.get('description'),
}
if self.has_changed(args, role):
self.result['changed'] = True
if not self.module.check_mode:
res = self.query_api('updateRole', **args)
# The API as in 4.9 does not return an updated role yet
if 'role' not in res:
role = self.get_role()
else:
role = res['role']
return role
def absent_role(self):
role = self.get_role()
if role:
self.result['changed'] = True
args = {
'id': role['id'],
}
if not self.module.check_mode:
self.query_api('deleteRole', **args)
return role
def main():
argument_spec = cs_argument_spec()
argument_spec.update(dict(
uuid=dict(aliases=['id']),
name=dict(required=True),
description=dict(),
role_type=dict(choices=['User', 'DomainAdmin', 'ResourceAdmin', 'Admin'], default='User'),
state=dict(choices=['present', 'absent'], default='present'),
))
module = AnsibleModule(
argument_spec=argument_spec,
required_together=cs_required_together(),
supports_check_mode=True
)
acs_role = AnsibleCloudStackRole(module)
state = module.params.get('state')
if state == 'absent':
role = acs_role.absent_role()
else:
role = acs_role.present_role()
result = acs_role.get_result(role)
module.exit_json(**result)
if __name__ == '__main__':
main()

@ -1,351 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017, David Passante (@dpassante)
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = '''
---
module: cs_role_permission
short_description: Manages role permissions on Apache CloudStack based clouds.
description:
- Create, update and remove CloudStack role permissions.
- Managing role permissions only supported in CloudStack >= 4.9.
version_added: '2.6'
author: David Passante (@dpassante)
options:
name:
description:
- The API name of the permission.
type: str
required: true
role:
description:
- Name or ID of the role.
type: str
required: true
permission:
description:
- The rule permission, allow or deny. Defaulted to deny.
type: str
choices: [ allow, deny ]
default: deny
state:
description:
- State of the role permission.
type: str
choices: [ present, absent ]
default: present
description:
description:
- The description of the role permission.
type: str
parent:
description:
- The parent role permission uuid. use 0 to move this rule at the top of the list.
type: str
extends_documentation_fragment: cloudstack
'''
EXAMPLES = '''
- name: Create a role permission
cs_role_permission:
role: My_Custom_role
name: createVPC
permission: allow
description: My comments
delegate_to: localhost
- name: Remove a role permission
cs_role_permission:
state: absent
role: My_Custom_role
name: createVPC
delegate_to: localhost
- name: Update a system role permission
cs_role_permission:
role: Domain Admin
name: createVPC
permission: deny
delegate_to: localhost
- name: Update rules order. Move the rule at the top of list
cs_role_permission:
role: Domain Admin
name: createVPC
parent: 0
delegate_to: localhost
'''
RETURN = '''
---
id:
description: The ID of the role permission.
returned: success
type: str
sample: a6f7a5fc-43f8-11e5-a151-feff819cdc9f
name:
description: The API name of the permission.
returned: success
type: str
sample: createVPC
permission:
description: The permission type of the api name.
returned: success
type: str
sample: allow
role_id:
description: The ID of the role to which the role permission belongs.
returned: success
type: str
sample: c6f7a5fc-43f8-11e5-a151-feff819cdc7f
description:
description: The description of the role permission
returned: success
type: str
sample: Deny createVPC for users
'''
from distutils.version import LooseVersion
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.cloudstack import (
AnsibleCloudStack,
cs_argument_spec,
cs_required_together,
)
class AnsibleCloudStackRolePermission(AnsibleCloudStack):
def __init__(self, module):
super(AnsibleCloudStackRolePermission, self).__init__(module)
cloudstack_min_version = LooseVersion('4.9.2')
self.returns = {
'id': 'id',
'roleid': 'role_id',
'rule': 'name',
'permission': 'permission',
'description': 'description',
}
self.role_permission = None
self.cloudstack_version = self._cloudstack_ver()
if self.cloudstack_version < cloudstack_min_version:
self.fail_json(msg="This module requires CloudStack >= %s." % cloudstack_min_version)
def _cloudstack_ver(self):
capabilities = self.get_capabilities()
return LooseVersion(capabilities['cloudstackversion'])
def _get_role_id(self):
role = self.module.params.get('role')
if not role:
return None
res = self.query_api('listRoles')
roles = res['role']
if roles:
for r in roles:
if role in [r['name'], r['id']]:
return r['id']
self.fail_json(msg="Role '%s' not found" % role)
def _get_role_perm(self):
role_permission = self.role_permission
args = {
'roleid': self._get_role_id(),
}
rp = self.query_api('listRolePermissions', **args)
if rp:
role_permission = rp['rolepermission']
return role_permission
def _get_rule(self, rule=None):
if not rule:
rule = self.module.params.get('name')
if self._get_role_perm():
for _rule in self._get_role_perm():
if rule == _rule['rule'] or rule == _rule['id']:
return _rule
return None
def _get_rule_order(self):
perms = self._get_role_perm()
rules = []
if perms:
for i, rule in enumerate(perms):
rules.append(rule['id'])
return rules
def replace_rule(self):
old_rule = self._get_rule()
if old_rule:
rules_order = self._get_rule_order()
old_pos = rules_order.index(old_rule['id'])
self.remove_role_perm()
new_rule = self.create_role_perm()
if new_rule:
perm_order = self.order_permissions(int(old_pos - 1), new_rule['id'])
return perm_order
return None
def order_permissions(self, parent, rule_id):
rules = self._get_rule_order()
if isinstance(parent, int):
parent_pos = parent
elif parent == '0':
parent_pos = -1
else:
parent_rule = self._get_rule(parent)
if not parent_rule:
self.fail_json(msg="Parent rule '%s' not found" % parent)
parent_pos = rules.index(parent_rule['id'])
r_id = rules.pop(rules.index(rule_id))
rules.insert((parent_pos + 1), r_id)
rules = ','.join(map(str, rules))
return rules
def create_or_update_role_perm(self):
role_permission = self._get_rule()
if not role_permission:
role_permission = self.create_role_perm()
else:
role_permission = self.update_role_perm(role_permission)
return role_permission
def create_role_perm(self):
role_permission = None
self.result['changed'] = True
args = {
'rule': self.module.params.get('name'),
'description': self.module.params.get('description'),
'roleid': self._get_role_id(),
'permission': self.module.params.get('permission'),
}
if not self.module.check_mode:
res = self.query_api('createRolePermission', **args)
role_permission = res['rolepermission']
return role_permission
def update_role_perm(self, role_perm):
perm_order = None
if not self.module.params.get('parent'):
args = {
'ruleid': role_perm['id'],
'roleid': role_perm['roleid'],
'permission': self.module.params.get('permission'),
}
if self.has_changed(args, role_perm, only_keys=['permission']):
self.result['changed'] = True
if not self.module.check_mode:
if self.cloudstack_version >= LooseVersion('4.11.0'):
self.query_api('updateRolePermission', **args)
role_perm = self._get_rule()
else:
perm_order = self.replace_rule()
else:
perm_order = self.order_permissions(self.module.params.get('parent'), role_perm['id'])
if perm_order:
args = {
'roleid': role_perm['roleid'],
'ruleorder': perm_order,
}
self.result['changed'] = True
if not self.module.check_mode:
self.query_api('updateRolePermission', **args)
role_perm = self._get_rule()
return role_perm
def remove_role_perm(self):
role_permission = self._get_rule()
if role_permission:
self.result['changed'] = True
args = {
'id': role_permission['id'],
}
if not self.module.check_mode:
self.query_api('deleteRolePermission', **args)
return role_permission
def main():
argument_spec = cs_argument_spec()
argument_spec.update(dict(
role=dict(required=True),
name=dict(required=True),
permission=dict(choices=['allow', 'deny'], default='deny'),
description=dict(),
state=dict(choices=['present', 'absent'], default='present'),
parent=dict(),
))
module = AnsibleModule(
argument_spec=argument_spec,
required_together=cs_required_together(),
mutually_exclusive=(
['permission', 'parent'],
),
supports_check_mode=True
)
acs_role_perm = AnsibleCloudStackRolePermission(module)
state = module.params.get('state')
if state in ['absent']:
role_permission = acs_role_perm.remove_role_perm()
else:
role_permission = acs_role_perm.create_or_update_role_perm()
result = acs_role_perm.get_result(role_permission)
module.exit_json(**result)
if __name__ == '__main__':
main()
Loading…
Cancel
Save