Removed tests migrated to ansible/ansible repo. (#3330)

pull/18777/head
Matt Clay 8 years ago
parent db0ac4936b
commit 4ada3463ca

@ -1,76 +0,0 @@
- name: Launching NAT Gateway and allocate a new eip.
ec2_vpc_nat_gateway:
region: us-west-2
state: present
subnet_id: "{{ test_subnet_id }}"
wait: yes
wait_timeout: 600
register: nat
- debug:
var: nat
- fail:
msg: "Failed to create"
when: '"{{ nat["changed"] }}" != "True"'
- name: Launch a new gateway only if one does not exist already in this subnet.
ec2_vpc_nat_gateway:
if_exist_do_not_create: yes
region: us-west-2
state: present
subnet_id: "{{ test_subnet_id }}"
wait: yes
wait_timeout: 600
register: nat_idempotent
- debug:
var: nat_idempotent
- fail:
msg: "Failed to be idempotent"
when: '"{{ nat_idempotent["changed"] }}" == "True"'
- name: Launching NAT Gateway and allocate a new eip even if one already exists in the subnet.
ec2_vpc_nat_gateway:
region: us-west-2
state: present
subnet_id: "{{ test_subnet_id }}"
wait: yes
wait_timeout: 600
register: new_nat
- debug:
var: new_nat
- fail:
msg: "Failed to create"
when: '"{{ new_nat["changed"] }}" != "True"'
- name: Launching NAT Gateway with allocation id, this call is idempotent and will not create anything.
ec2_vpc_nat_gateway:
allocation_id: eipalloc-1234567
region: us-west-2
state: present
subnet_id: "{{ test_subnet_id }}"
wait: yes
wait_timeout: 600
register: nat_with_eipalloc
- debug:
var: nat_with_eipalloc
- fail:
msg: 'Failed to be idempotent.'
when: '"{{ nat_with_eipalloc["changed"] }}" == "True"'
- name: Delete the 1st nat gateway and do not wait for it to finish
ec2_vpc_nat_gateway:
region: us-west-2
nat_gateway_id: "{{ nat.nat_gateway_id }}"
state: absent
- name: Delete the nat_with_eipalloc and release the eip
ec2_vpc_nat_gateway:
region: us-west-2
nat_gateway_id: "{{ new_nat.nat_gateway_id }}"
release_eip: yes
state: absent
wait: yes
wait_timeout: 600

@ -1,3 +0,0 @@
- hosts: 127.0.0.1
roles:
- { role: ec2_vpc_nat_gateway }

@ -1,486 +0,0 @@
#!/usr/bin/python
import boto3
import unittest
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
import cloud.amazon.ec2_vpc_nat_gateway as ng
Options = (
namedtuple(
'Options', [
'connection', 'module_path', 'forks', 'become', 'become_method',
'become_user', 'remote_user', 'private_key_file', 'ssh_common_args',
'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity',
'check'
]
)
)
# initialize needed objects
variable_manager = VariableManager()
loader = DataLoader()
options = (
Options(
connection='local',
module_path='cloud/amazon',
forks=1, become=None, become_method=None, become_user=None, check=True,
remote_user=None, private_key_file=None, ssh_common_args=None,
sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None,
verbosity=3
)
)
passwords = dict(vault_pass='')
aws_region = 'us-west-2'
# create inventory and pass to var manager
inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost')
variable_manager.set_inventory(inventory)
def run(play):
tqm = None
results = None
try:
tqm = TaskQueueManager(
inventory=inventory,
variable_manager=variable_manager,
loader=loader,
options=options,
passwords=passwords,
stdout_callback='default',
)
results = tqm.run(play)
finally:
if tqm is not None:
tqm.cleanup()
return tqm, results
class AnsibleVpcNatGatewayTasks(unittest.TestCase):
def test_create_gateway_using_allocation_id(self):
play_source = dict(
name = "Create new nat gateway with eip allocation-id",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
subnet_id='subnet-12345678',
allocation_id='eipalloc-12345678',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.failUnless(tqm._stats.changed['localhost'] == 1)
def test_create_gateway_using_allocation_id_idempotent(self):
play_source = dict(
name = "Create new nat gateway with eip allocation-id",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
subnet_id='subnet-123456789',
allocation_id='eipalloc-1234567',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.assertFalse(tqm._stats.changed.has_key('localhost'))
def test_create_gateway_using_eip_address(self):
play_source = dict(
name = "Create new nat gateway with eip address",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
subnet_id='subnet-12345678',
eip_address='55.55.55.55',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.failUnless(tqm._stats.changed['localhost'] == 1)
def test_create_gateway_using_eip_address_idempotent(self):
play_source = dict(
name = "Create new nat gateway with eip address",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
subnet_id='subnet-123456789',
eip_address='55.55.55.55',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.assertFalse(tqm._stats.changed.has_key('localhost'))
def test_create_gateway_in_subnet_only_if_one_does_not_exist_already(self):
play_source = dict(
name = "Create new nat gateway only if one does not exist already",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
if_exist_do_not_create='yes',
subnet_id='subnet-123456789',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.assertFalse(tqm._stats.changed.has_key('localhost'))
def test_delete_gateway(self):
play_source = dict(
name = "Delete Nat Gateway",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
nat_gateway_id='nat-123456789',
state='absent',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.assertTrue(tqm._stats.changed.has_key('localhost'))
class AnsibleEc2VpcNatGatewayFunctions(unittest.TestCase):
def test_convert_to_lower(self):
example = ng.DRY_RUN_GATEWAY_UNCONVERTED
converted_example = ng.convert_to_lower(example[0])
keys = converted_example.keys()
keys.sort()
for i in range(len(keys)):
if i == 0:
self.assertEqual(keys[i], 'create_time')
if i == 1:
self.assertEqual(keys[i], 'nat_gateway_addresses')
gw_addresses_keys = converted_example[keys[i]][0].keys()
gw_addresses_keys.sort()
for j in range(len(gw_addresses_keys)):
if j == 0:
self.assertEqual(gw_addresses_keys[j], 'allocation_id')
if j == 1:
self.assertEqual(gw_addresses_keys[j], 'network_interface_id')
if j == 2:
self.assertEqual(gw_addresses_keys[j], 'private_ip')
if j == 3:
self.assertEqual(gw_addresses_keys[j], 'public_ip')
if i == 2:
self.assertEqual(keys[i], 'nat_gateway_id')
if i == 3:
self.assertEqual(keys[i], 'state')
if i == 4:
self.assertEqual(keys[i], 'subnet_id')
if i == 5:
self.assertEqual(keys[i], 'vpc_id')
def test_get_nat_gateways(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, stream = (
ng.get_nat_gateways(client, 'subnet-123456789', check_mode=True)
)
should_return = ng.DRY_RUN_GATEWAYS
self.assertTrue(success)
self.assertEqual(stream, should_return)
def test_get_nat_gateways_no_gateways_found(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, stream = (
ng.get_nat_gateways(client, 'subnet-1234567', check_mode=True)
)
self.assertTrue(success)
self.assertEqual(stream, [])
def test_wait_for_status(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, gws = (
ng.wait_for_status(
client, 5, 'nat-123456789', 'available', check_mode=True
)
)
should_return = ng.DRY_RUN_GATEWAYS[0]
self.assertTrue(success)
self.assertEqual(gws, should_return)
def test_wait_for_status_to_timeout(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, gws = (
ng.wait_for_status(
client, 2, 'nat-12345678', 'available', check_mode=True
)
)
self.assertFalse(success)
self.assertEqual(gws, {})
def test_gateway_in_subnet_exists_with_allocation_id(self):
client = boto3.client('ec2', region_name=aws_region)
gws, err_msg = (
ng.gateway_in_subnet_exists(
client, 'subnet-123456789', 'eipalloc-1234567', check_mode=True
)
)
should_return = ng.DRY_RUN_GATEWAYS
self.assertEqual(gws, should_return)
def test_gateway_in_subnet_exists_with_allocation_id_does_not_exist(self):
client = boto3.client('ec2', region_name=aws_region)
gws, err_msg = (
ng.gateway_in_subnet_exists(
client, 'subnet-123456789', 'eipalloc-123', check_mode=True
)
)
should_return = list()
self.assertEqual(gws, should_return)
def test_gateway_in_subnet_exists_without_allocation_id(self):
client = boto3.client('ec2', region_name=aws_region)
gws, err_msg = (
ng.gateway_in_subnet_exists(
client, 'subnet-123456789', check_mode=True
)
)
should_return = ng.DRY_RUN_GATEWAYS
self.assertEqual(gws, should_return)
def test_get_eip_allocation_id_by_address(self):
client = boto3.client('ec2', region_name=aws_region)
allocation_id, _ = (
ng.get_eip_allocation_id_by_address(
client, '55.55.55.55', check_mode=True
)
)
should_return = 'eipalloc-1234567'
self.assertEqual(allocation_id, should_return)
def test_get_eip_allocation_id_by_address_does_not_exist(self):
client = boto3.client('ec2', region_name=aws_region)
allocation_id, err_msg = (
ng.get_eip_allocation_id_by_address(
client, '52.52.52.52', check_mode=True
)
)
self.assertEqual(err_msg, 'EIP 52.52.52.52 does not exist')
self.assertIsNone(allocation_id)
def test_allocate_eip_address(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, eip_id = (
ng.allocate_eip_address(
client, check_mode=True
)
)
self.assertTrue(success)
def test_release_address(self):
client = boto3.client('ec2', region_name=aws_region)
success, _ = (
ng.release_address(
client, 'eipalloc-1234567', check_mode=True
)
)
self.assertTrue(success)
def test_create(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.create(
client, 'subnet-123456', 'eipalloc-1234567', check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
def test_pre_create(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.pre_create(
client, 'subnet-123456', check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
def test_pre_create_idemptotent_with_allocation_id(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.pre_create(
client, 'subnet-123456789', allocation_id='eipalloc-1234567', check_mode=True
)
)
self.assertTrue(success)
self.assertFalse(changed)
def test_pre_create_idemptotent_with_eip_address(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.pre_create(
client, 'subnet-123456789', eip_address='55.55.55.55', check_mode=True
)
)
self.assertTrue(success)
self.assertFalse(changed)
def test_pre_create_idemptotent_if_exist_do_not_create(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.pre_create(
client, 'subnet-123456789', if_exist_do_not_create=True, check_mode=True
)
)
self.assertTrue(success)
self.assertFalse(changed)
def test_delete(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, _ = (
ng.remove(
client, 'nat-123456789', check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
def test_delete_and_release_ip(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, _ = (
ng.remove(
client, 'nat-123456789', release_eip=True, check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
def test_delete_if_does_not_exist(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, _ = (
ng.remove(
client, 'nat-12345', check_mode=True
)
)
self.assertFalse(success)
self.assertFalse(changed)
def main():
unittest.main()
if __name__ == '__main__':
main()

@ -1,285 +0,0 @@
#!/usr/bin/python
import boto3
import unittest
import cloud.amazon.kinesis_stream as kinesis_stream
aws_region = 'us-west-2'
class AnsibleKinesisStreamFunctions(unittest.TestCase):
def test_convert_to_lower(self):
example = {
'HasMoreShards': True,
'RetentionPeriodHours': 24,
'StreamName': 'test',
'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
'StreamStatus': 'ACTIVE'
}
converted_example = kinesis_stream.convert_to_lower(example)
keys = converted_example.keys()
keys.sort()
for i in range(len(keys)):
if i == 0:
self.assertEqual(keys[i], 'has_more_shards')
if i == 1:
self.assertEqual(keys[i], 'retention_period_hours')
if i == 2:
self.assertEqual(keys[i], 'stream_arn')
if i == 3:
self.assertEqual(keys[i], 'stream_name')
if i == 4:
self.assertEqual(keys[i], 'stream_status')
def test_make_tags_in_aws_format(self):
example = {
'env': 'development'
}
should_return = [
{
'Key': 'env',
'Value': 'development'
}
]
aws_tags = kinesis_stream.make_tags_in_aws_format(example)
self.assertEqual(aws_tags, should_return)
def test_make_tags_in_proper_format(self):
example = [
{
'Key': 'env',
'Value': 'development'
},
{
'Key': 'service',
'Value': 'web'
}
]
should_return = {
'env': 'development',
'service': 'web'
}
proper_tags = kinesis_stream.make_tags_in_proper_format(example)
self.assertEqual(proper_tags, should_return)
def test_recreate_tags_from_list(self):
example = [('environment', 'development'), ('service', 'web')]
should_return = [
{
'Key': 'environment',
'Value': 'development'
},
{
'Key': 'service',
'Value': 'web'
}
]
aws_tags = kinesis_stream.recreate_tags_from_list(example)
self.assertEqual(aws_tags, should_return)
def test_get_tags(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg, tags = kinesis_stream.get_tags(client, 'test', True)
self.assertTrue(success)
should_return = [
{
'Key': 'DryRunMode',
'Value': 'true'
}
]
self.assertEqual(tags, should_return)
def test_find_stream(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg, stream = (
kinesis_stream.find_stream(client, 'test', check_mode=True)
)
should_return = {
'HasMoreShards': True,
'RetentionPeriodHours': 24,
'StreamName': 'test',
'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
'StreamStatus': 'ACTIVE'
}
self.assertTrue(success)
self.assertEqual(stream, should_return)
def test_wait_for_status(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg, stream = (
kinesis_stream.wait_for_status(
client, 'test', 'ACTIVE', check_mode=True
)
)
should_return = {
'HasMoreShards': True,
'RetentionPeriodHours': 24,
'StreamName': 'test',
'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
'StreamStatus': 'ACTIVE'
}
self.assertTrue(success)
self.assertEqual(stream, should_return)
def test_tags_action_create(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, err_msg = (
kinesis_stream.tags_action(
client, 'test', tags, 'create', check_mode=True
)
)
self.assertTrue(success)
def test_tags_action_delete(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, err_msg = (
kinesis_stream.tags_action(
client, 'test', tags, 'delete', check_mode=True
)
)
self.assertTrue(success)
def test_tags_action_invalid(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, err_msg = (
kinesis_stream.tags_action(
client, 'test', tags, 'append', check_mode=True
)
)
self.assertFalse(success)
def test_update_tags(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, err_msg = (
kinesis_stream.update_tags(
client, 'test', tags, check_mode=True
)
)
self.assertTrue(success)
def test_stream_action_create(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.stream_action(
client, 'test', 10, 'create', check_mode=True
)
)
self.assertTrue(success)
def test_stream_action_delete(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.stream_action(
client, 'test', 10, 'delete', check_mode=True
)
)
self.assertTrue(success)
def test_stream_action_invalid(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.stream_action(
client, 'test', 10, 'append', check_mode=True
)
)
self.assertFalse(success)
def test_retention_action_increase(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.retention_action(
client, 'test', 48, 'increase', check_mode=True
)
)
self.assertTrue(success)
def test_retention_action_decrease(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.retention_action(
client, 'test', 24, 'decrease', check_mode=True
)
)
self.assertTrue(success)
def test_retention_action_invalid(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.retention_action(
client, 'test', 24, 'create', check_mode=True
)
)
self.assertFalse(success)
def test_update(self):
client = boto3.client('kinesis', region_name=aws_region)
current_stream = {
'HasMoreShards': True,
'RetentionPeriodHours': 24,
'StreamName': 'test',
'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
'StreamStatus': 'ACTIVE'
}
tags = {
'env': 'development',
'service': 'web'
}
success, changed, err_msg = (
kinesis_stream.update(
client, current_stream, 'test', retention_period=48,
tags=tags, check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')
def test_create_stream(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, changed, err_msg, results = (
kinesis_stream.create_stream(
client, 'test', number_of_shards=10, retention_period=48,
tags=tags, check_mode=True
)
)
should_return = {
'has_more_shards': True,
'retention_period_hours': 24,
'stream_name': 'test',
'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test',
'stream_status': 'ACTIVE',
'tags': tags,
}
self.assertTrue(success)
self.assertTrue(changed)
self.assertEqual(results, should_return)
self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')
def main():
unittest.main()
if __name__ == '__main__':
main()
Loading…
Cancel
Save