Merge pull request #18275 from mattclay/test-move

Move amazon unit tests and apply fixes.
pull/18276/head
Matt Clay 8 years ago committed by GitHub
commit 723872c400

@ -0,0 +1,497 @@
#!/usr/bin/python
from nose.plugins.skip import SkipTest
try:
import boto3
import botocore
HAS_BOTO3 = True
except ImportError:
HAS_BOTO3 = False
if not HAS_BOTO3:
raise SkipTest("test_ec2_vpc_nat_gateway.py requires the python module 'boto3' and 'botocore'")
import unittest
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
import ansible.modules.extras.cloud.amazon.ec2_vpc_nat_gateway as ng
Options = (
namedtuple(
'Options', [
'connection', 'module_path', 'forks', 'become', 'become_method',
'become_user', 'remote_user', 'private_key_file', 'ssh_common_args',
'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity',
'check'
]
)
)
# initialize needed objects
variable_manager = VariableManager()
loader = DataLoader()
options = (
Options(
connection='local',
module_path='cloud/amazon',
forks=1, become=None, become_method=None, become_user=None, check=True,
remote_user=None, private_key_file=None, ssh_common_args=None,
sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None,
verbosity=3
)
)
passwords = dict(vault_pass='')
aws_region = 'us-west-2'
# create inventory and pass to var manager
inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost')
variable_manager.set_inventory(inventory)
def run(play):
tqm = None
results = None
try:
tqm = TaskQueueManager(
inventory=inventory,
variable_manager=variable_manager,
loader=loader,
options=options,
passwords=passwords,
stdout_callback='default',
)
results = tqm.run(play)
finally:
if tqm is not None:
tqm.cleanup()
return tqm, results
class AnsibleVpcNatGatewayTasks(unittest.TestCase):
def test_create_gateway_using_allocation_id(self):
play_source = dict(
name = "Create new nat gateway with eip allocation-id",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
subnet_id='subnet-12345678',
allocation_id='eipalloc-12345678',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.failUnless(tqm._stats.changed['localhost'] == 1)
def test_create_gateway_using_allocation_id_idempotent(self):
play_source = dict(
name = "Create new nat gateway with eip allocation-id",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
subnet_id='subnet-123456789',
allocation_id='eipalloc-1234567',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.assertFalse('localhost' in tqm._stats.changed)
def test_create_gateway_using_eip_address(self):
play_source = dict(
name = "Create new nat gateway with eip address",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
subnet_id='subnet-12345678',
eip_address='55.55.55.55',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.failUnless(tqm._stats.changed['localhost'] == 1)
def test_create_gateway_using_eip_address_idempotent(self):
play_source = dict(
name = "Create new nat gateway with eip address",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
subnet_id='subnet-123456789',
eip_address='55.55.55.55',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.assertFalse('localhost' in tqm._stats.changed)
def test_create_gateway_in_subnet_only_if_one_does_not_exist_already(self):
play_source = dict(
name = "Create new nat gateway only if one does not exist already",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
if_exist_do_not_create='yes',
subnet_id='subnet-123456789',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.assertFalse('localhost' in tqm._stats.changed)
def test_delete_gateway(self):
play_source = dict(
name = "Delete Nat Gateway",
hosts = 'localhost',
gather_facts = 'no',
tasks = [
dict(
action=dict(
module='ec2_vpc_nat_gateway',
args=dict(
nat_gateway_id='nat-123456789',
state='absent',
wait='yes',
region=aws_region,
)
),
register='nat_gateway',
),
dict(
action=dict(
module='debug',
args=dict(
msg='{{nat_gateway}}'
)
)
)
]
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
tqm, results = run(play)
self.failUnless(tqm._stats.ok['localhost'] == 2)
self.assertTrue('localhost' in tqm._stats.changed)
class AnsibleEc2VpcNatGatewayFunctions(unittest.TestCase):
def test_convert_to_lower(self):
example = ng.DRY_RUN_GATEWAY_UNCONVERTED
converted_example = ng.convert_to_lower(example[0])
keys = list(converted_example.keys())
keys.sort()
for i in range(len(keys)):
if i == 0:
self.assertEqual(keys[i], 'create_time')
if i == 1:
self.assertEqual(keys[i], 'nat_gateway_addresses')
gw_addresses_keys = list(converted_example[keys[i]][0].keys())
gw_addresses_keys.sort()
for j in range(len(gw_addresses_keys)):
if j == 0:
self.assertEqual(gw_addresses_keys[j], 'allocation_id')
if j == 1:
self.assertEqual(gw_addresses_keys[j], 'network_interface_id')
if j == 2:
self.assertEqual(gw_addresses_keys[j], 'private_ip')
if j == 3:
self.assertEqual(gw_addresses_keys[j], 'public_ip')
if i == 2:
self.assertEqual(keys[i], 'nat_gateway_id')
if i == 3:
self.assertEqual(keys[i], 'state')
if i == 4:
self.assertEqual(keys[i], 'subnet_id')
if i == 5:
self.assertEqual(keys[i], 'vpc_id')
def test_get_nat_gateways(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, stream = (
ng.get_nat_gateways(client, 'subnet-123456789', check_mode=True)
)
should_return = ng.DRY_RUN_GATEWAYS
self.assertTrue(success)
self.assertEqual(stream, should_return)
def test_get_nat_gateways_no_gateways_found(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, stream = (
ng.get_nat_gateways(client, 'subnet-1234567', check_mode=True)
)
self.assertTrue(success)
self.assertEqual(stream, [])
def test_wait_for_status(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, gws = (
ng.wait_for_status(
client, 5, 'nat-123456789', 'available', check_mode=True
)
)
should_return = ng.DRY_RUN_GATEWAYS[0]
self.assertTrue(success)
self.assertEqual(gws, should_return)
def test_wait_for_status_to_timeout(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, gws = (
ng.wait_for_status(
client, 2, 'nat-12345678', 'available', check_mode=True
)
)
self.assertFalse(success)
self.assertEqual(gws, {})
def test_gateway_in_subnet_exists_with_allocation_id(self):
client = boto3.client('ec2', region_name=aws_region)
gws, err_msg = (
ng.gateway_in_subnet_exists(
client, 'subnet-123456789', 'eipalloc-1234567', check_mode=True
)
)
should_return = ng.DRY_RUN_GATEWAYS
self.assertEqual(gws, should_return)
def test_gateway_in_subnet_exists_with_allocation_id_does_not_exist(self):
client = boto3.client('ec2', region_name=aws_region)
gws, err_msg = (
ng.gateway_in_subnet_exists(
client, 'subnet-123456789', 'eipalloc-123', check_mode=True
)
)
should_return = list()
self.assertEqual(gws, should_return)
def test_gateway_in_subnet_exists_without_allocation_id(self):
client = boto3.client('ec2', region_name=aws_region)
gws, err_msg = (
ng.gateway_in_subnet_exists(
client, 'subnet-123456789', check_mode=True
)
)
should_return = ng.DRY_RUN_GATEWAYS
self.assertEqual(gws, should_return)
def test_get_eip_allocation_id_by_address(self):
client = boto3.client('ec2', region_name=aws_region)
allocation_id, _ = (
ng.get_eip_allocation_id_by_address(
client, '55.55.55.55', check_mode=True
)
)
should_return = 'eipalloc-1234567'
self.assertEqual(allocation_id, should_return)
def test_get_eip_allocation_id_by_address_does_not_exist(self):
client = boto3.client('ec2', region_name=aws_region)
allocation_id, err_msg = (
ng.get_eip_allocation_id_by_address(
client, '52.52.52.52', check_mode=True
)
)
self.assertEqual(err_msg, 'EIP 52.52.52.52 does not exist')
self.assertTrue(allocation_id is None)
def test_allocate_eip_address(self):
client = boto3.client('ec2', region_name=aws_region)
success, err_msg, eip_id = (
ng.allocate_eip_address(
client, check_mode=True
)
)
self.assertTrue(success)
def test_release_address(self):
client = boto3.client('ec2', region_name=aws_region)
success, _ = (
ng.release_address(
client, 'eipalloc-1234567', check_mode=True
)
)
self.assertTrue(success)
def test_create(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.create(
client, 'subnet-123456', 'eipalloc-1234567', check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
def test_pre_create(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.pre_create(
client, 'subnet-123456', check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
def test_pre_create_idemptotent_with_allocation_id(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.pre_create(
client, 'subnet-123456789', allocation_id='eipalloc-1234567', check_mode=True
)
)
self.assertTrue(success)
self.assertFalse(changed)
def test_pre_create_idemptotent_with_eip_address(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.pre_create(
client, 'subnet-123456789', eip_address='55.55.55.55', check_mode=True
)
)
self.assertTrue(success)
self.assertFalse(changed)
def test_pre_create_idemptotent_if_exist_do_not_create(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, results = (
ng.pre_create(
client, 'subnet-123456789', if_exist_do_not_create=True, check_mode=True
)
)
self.assertTrue(success)
self.assertFalse(changed)
def test_delete(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, _ = (
ng.remove(
client, 'nat-123456789', check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
def test_delete_and_release_ip(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, _ = (
ng.remove(
client, 'nat-123456789', release_eip=True, check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
def test_delete_if_does_not_exist(self):
client = boto3.client('ec2', region_name=aws_region)
success, changed, err_msg, _ = (
ng.remove(
client, 'nat-12345', check_mode=True
)
)
self.assertFalse(success)
self.assertFalse(changed)
def main():
unittest.main()
if __name__ == '__main__':
main()

@ -0,0 +1,296 @@
#!/usr/bin/python
from nose.plugins.skip import SkipTest
try:
import boto3
import botocore
HAS_BOTO3 = True
except ImportError:
HAS_BOTO3 = False
if not HAS_BOTO3:
raise SkipTest("test_kinesis_stream.py requires the python module 'boto3' and 'botocore'")
import unittest
import ansible.modules.extras.cloud.amazon.kinesis_stream as kinesis_stream
aws_region = 'us-west-2'
class AnsibleKinesisStreamFunctions(unittest.TestCase):
def test_convert_to_lower(self):
example = {
'HasMoreShards': True,
'RetentionPeriodHours': 24,
'StreamName': 'test',
'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
'StreamStatus': 'ACTIVE'
}
converted_example = kinesis_stream.convert_to_lower(example)
keys = list(converted_example.keys())
keys.sort()
for i in range(len(keys)):
if i == 0:
self.assertEqual(keys[i], 'has_more_shards')
if i == 1:
self.assertEqual(keys[i], 'retention_period_hours')
if i == 2:
self.assertEqual(keys[i], 'stream_arn')
if i == 3:
self.assertEqual(keys[i], 'stream_name')
if i == 4:
self.assertEqual(keys[i], 'stream_status')
def test_make_tags_in_aws_format(self):
example = {
'env': 'development'
}
should_return = [
{
'Key': 'env',
'Value': 'development'
}
]
aws_tags = kinesis_stream.make_tags_in_aws_format(example)
self.assertEqual(aws_tags, should_return)
def test_make_tags_in_proper_format(self):
example = [
{
'Key': 'env',
'Value': 'development'
},
{
'Key': 'service',
'Value': 'web'
}
]
should_return = {
'env': 'development',
'service': 'web'
}
proper_tags = kinesis_stream.make_tags_in_proper_format(example)
self.assertEqual(proper_tags, should_return)
def test_recreate_tags_from_list(self):
example = [('environment', 'development'), ('service', 'web')]
should_return = [
{
'Key': 'environment',
'Value': 'development'
},
{
'Key': 'service',
'Value': 'web'
}
]
aws_tags = kinesis_stream.recreate_tags_from_list(example)
self.assertEqual(aws_tags, should_return)
def test_get_tags(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True)
self.assertTrue(success)
should_return = [
{
'Key': 'DryRunMode',
'Value': 'true'
}
]
self.assertEqual(tags, should_return)
def test_find_stream(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg, stream = (
kinesis_stream.find_stream(client, 'test', check_mode=True)
)
should_return = {
'HasMoreShards': True,
'RetentionPeriodHours': 24,
'StreamName': 'test',
'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
'StreamStatus': 'ACTIVE'
}
self.assertTrue(success)
self.assertEqual(stream, should_return)
def test_wait_for_status(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg, stream = (
kinesis_stream.wait_for_status(
client, 'test', 'ACTIVE', check_mode=True
)
)
should_return = {
'HasMoreShards': True,
'RetentionPeriodHours': 24,
'StreamName': 'test',
'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
'StreamStatus': 'ACTIVE'
}
self.assertTrue(success)
self.assertEqual(stream, should_return)
def test_tags_action_create(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, err_msg = (
kinesis_stream.tags_action(
client, 'test', tags, 'create', check_mode=True
)
)
self.assertTrue(success)
def test_tags_action_delete(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, err_msg = (
kinesis_stream.tags_action(
client, 'test', tags, 'delete', check_mode=True
)
)
self.assertTrue(success)
def test_tags_action_invalid(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, err_msg = (
kinesis_stream.tags_action(
client, 'test', tags, 'append', check_mode=True
)
)
self.assertFalse(success)
def test_update_tags(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, changed, err_msg = (
kinesis_stream.update_tags(
client, 'test', tags, check_mode=True
)
)
self.assertTrue(success)
def test_stream_action_create(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.stream_action(
client, 'test', 10, 'create', check_mode=True
)
)
self.assertTrue(success)
def test_stream_action_delete(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.stream_action(
client, 'test', 10, 'delete', check_mode=True
)
)
self.assertTrue(success)
def test_stream_action_invalid(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.stream_action(
client, 'test', 10, 'append', check_mode=True
)
)
self.assertFalse(success)
def test_retention_action_increase(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.retention_action(
client, 'test', 48, 'increase', check_mode=True
)
)
self.assertTrue(success)
def test_retention_action_decrease(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.retention_action(
client, 'test', 24, 'decrease', check_mode=True
)
)
self.assertTrue(success)
def test_retention_action_invalid(self):
client = boto3.client('kinesis', region_name=aws_region)
success, err_msg = (
kinesis_stream.retention_action(
client, 'test', 24, 'create', check_mode=True
)
)
self.assertFalse(success)
def test_update(self):
client = boto3.client('kinesis', region_name=aws_region)
current_stream = {
'HasMoreShards': True,
'RetentionPeriodHours': 24,
'StreamName': 'test',
'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
'StreamStatus': 'ACTIVE'
}
tags = {
'env': 'development',
'service': 'web'
}
success, changed, err_msg = (
kinesis_stream.update(
client, current_stream, 'test', retention_period=48,
tags=tags, check_mode=True
)
)
self.assertTrue(success)
self.assertTrue(changed)
self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')
def test_create_stream(self):
client = boto3.client('kinesis', region_name=aws_region)
tags = {
'env': 'development',
'service': 'web'
}
success, changed, err_msg, results = (
kinesis_stream.create_stream(
client, 'test', number_of_shards=10, retention_period=48,
tags=tags, check_mode=True
)
)
should_return = {
'has_more_shards': True,
'retention_period_hours': 24,
'stream_name': 'test',
'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test',
'stream_status': 'ACTIVE',
'tags': tags,
}
self.assertTrue(success)
self.assertTrue(changed)
self.assertEqual(results, should_return)
self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')
def main():
unittest.main()
if __name__ == '__main__':
main()
Loading…
Cancel
Save