diff --git a/test/units/modules/extras/__init__.py b/test/units/modules/extras/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/units/modules/extras/cloud/__init__.py b/test/units/modules/extras/cloud/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/units/modules/extras/cloud/amazon/__init__.py b/test/units/modules/extras/cloud/amazon/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/units/modules/extras/cloud/amazon/test_ec2_vpc_nat_gateway.py b/test/units/modules/extras/cloud/amazon/test_ec2_vpc_nat_gateway.py new file mode 100644 index 00000000000..573fa377787 --- /dev/null +++ b/test/units/modules/extras/cloud/amazon/test_ec2_vpc_nat_gateway.py @@ -0,0 +1,497 @@ +#!/usr/bin/python + +from nose.plugins.skip import SkipTest + +try: + import boto3 + import botocore + HAS_BOTO3 = True +except ImportError: + HAS_BOTO3 = False + +if not HAS_BOTO3: + raise SkipTest("test_ec2_vpc_nat_gateway.py requires the python module 'boto3' and 'botocore'") + +import unittest + +from collections import namedtuple +from ansible.parsing.dataloader import DataLoader +from ansible.vars import VariableManager +from ansible.inventory import Inventory +from ansible.playbook.play import Play +from ansible.executor.task_queue_manager import TaskQueueManager + +import ansible.modules.extras.cloud.amazon.ec2_vpc_nat_gateway as ng + +Options = ( + namedtuple( + 'Options', [ + 'connection', 'module_path', 'forks', 'become', 'become_method', + 'become_user', 'remote_user', 'private_key_file', 'ssh_common_args', + 'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity', + 'check' + ] + ) +) +# initialize needed objects +variable_manager = VariableManager() +loader = DataLoader() +options = ( + Options( + connection='local', + module_path='cloud/amazon', + forks=1, become=None, become_method=None, become_user=None, check=True, + remote_user=None, private_key_file=None, ssh_common_args=None, + sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, + verbosity=3 + ) +) +passwords = dict(vault_pass='') + +aws_region = 'us-west-2' + +# create inventory and pass to var manager +inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost') +variable_manager.set_inventory(inventory) + +def run(play): + tqm = None + results = None + try: + tqm = TaskQueueManager( + inventory=inventory, + variable_manager=variable_manager, + loader=loader, + options=options, + passwords=passwords, + stdout_callback='default', + ) + results = tqm.run(play) + finally: + if tqm is not None: + tqm.cleanup() + return tqm, results + +class AnsibleVpcNatGatewayTasks(unittest.TestCase): + + def test_create_gateway_using_allocation_id(self): + play_source = dict( + name = "Create new nat gateway with eip allocation-id", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + subnet_id='subnet-12345678', + allocation_id='eipalloc-12345678', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.failUnless(tqm._stats.changed['localhost'] == 1) + + def test_create_gateway_using_allocation_id_idempotent(self): + play_source = dict( + name = "Create new nat gateway with eip allocation-id", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + subnet_id='subnet-123456789', + allocation_id='eipalloc-1234567', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.assertFalse('localhost' in tqm._stats.changed) + + def test_create_gateway_using_eip_address(self): + play_source = dict( + name = "Create new nat gateway with eip address", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + subnet_id='subnet-12345678', + eip_address='55.55.55.55', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.failUnless(tqm._stats.changed['localhost'] == 1) + + def test_create_gateway_using_eip_address_idempotent(self): + play_source = dict( + name = "Create new nat gateway with eip address", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + subnet_id='subnet-123456789', + eip_address='55.55.55.55', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.assertFalse('localhost' in tqm._stats.changed) + + def test_create_gateway_in_subnet_only_if_one_does_not_exist_already(self): + play_source = dict( + name = "Create new nat gateway only if one does not exist already", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + if_exist_do_not_create='yes', + subnet_id='subnet-123456789', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.assertFalse('localhost' in tqm._stats.changed) + + def test_delete_gateway(self): + play_source = dict( + name = "Delete Nat Gateway", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + nat_gateway_id='nat-123456789', + state='absent', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.assertTrue('localhost' in tqm._stats.changed) + +class AnsibleEc2VpcNatGatewayFunctions(unittest.TestCase): + + def test_convert_to_lower(self): + example = ng.DRY_RUN_GATEWAY_UNCONVERTED + converted_example = ng.convert_to_lower(example[0]) + keys = list(converted_example.keys()) + keys.sort() + for i in range(len(keys)): + if i == 0: + self.assertEqual(keys[i], 'create_time') + if i == 1: + self.assertEqual(keys[i], 'nat_gateway_addresses') + gw_addresses_keys = list(converted_example[keys[i]][0].keys()) + gw_addresses_keys.sort() + for j in range(len(gw_addresses_keys)): + if j == 0: + self.assertEqual(gw_addresses_keys[j], 'allocation_id') + if j == 1: + self.assertEqual(gw_addresses_keys[j], 'network_interface_id') + if j == 2: + self.assertEqual(gw_addresses_keys[j], 'private_ip') + if j == 3: + self.assertEqual(gw_addresses_keys[j], 'public_ip') + if i == 2: + self.assertEqual(keys[i], 'nat_gateway_id') + if i == 3: + self.assertEqual(keys[i], 'state') + if i == 4: + self.assertEqual(keys[i], 'subnet_id') + if i == 5: + self.assertEqual(keys[i], 'vpc_id') + + def test_get_nat_gateways(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, stream = ( + ng.get_nat_gateways(client, 'subnet-123456789', check_mode=True) + ) + should_return = ng.DRY_RUN_GATEWAYS + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_get_nat_gateways_no_gateways_found(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, stream = ( + ng.get_nat_gateways(client, 'subnet-1234567', check_mode=True) + ) + self.assertTrue(success) + self.assertEqual(stream, []) + + def test_wait_for_status(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, gws = ( + ng.wait_for_status( + client, 5, 'nat-123456789', 'available', check_mode=True + ) + ) + should_return = ng.DRY_RUN_GATEWAYS[0] + self.assertTrue(success) + self.assertEqual(gws, should_return) + + def test_wait_for_status_to_timeout(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, gws = ( + ng.wait_for_status( + client, 2, 'nat-12345678', 'available', check_mode=True + ) + ) + self.assertFalse(success) + self.assertEqual(gws, {}) + + def test_gateway_in_subnet_exists_with_allocation_id(self): + client = boto3.client('ec2', region_name=aws_region) + gws, err_msg = ( + ng.gateway_in_subnet_exists( + client, 'subnet-123456789', 'eipalloc-1234567', check_mode=True + ) + ) + should_return = ng.DRY_RUN_GATEWAYS + self.assertEqual(gws, should_return) + + def test_gateway_in_subnet_exists_with_allocation_id_does_not_exist(self): + client = boto3.client('ec2', region_name=aws_region) + gws, err_msg = ( + ng.gateway_in_subnet_exists( + client, 'subnet-123456789', 'eipalloc-123', check_mode=True + ) + ) + should_return = list() + self.assertEqual(gws, should_return) + + def test_gateway_in_subnet_exists_without_allocation_id(self): + client = boto3.client('ec2', region_name=aws_region) + gws, err_msg = ( + ng.gateway_in_subnet_exists( + client, 'subnet-123456789', check_mode=True + ) + ) + should_return = ng.DRY_RUN_GATEWAYS + self.assertEqual(gws, should_return) + + def test_get_eip_allocation_id_by_address(self): + client = boto3.client('ec2', region_name=aws_region) + allocation_id, _ = ( + ng.get_eip_allocation_id_by_address( + client, '55.55.55.55', check_mode=True + ) + ) + should_return = 'eipalloc-1234567' + self.assertEqual(allocation_id, should_return) + + def test_get_eip_allocation_id_by_address_does_not_exist(self): + client = boto3.client('ec2', region_name=aws_region) + allocation_id, err_msg = ( + ng.get_eip_allocation_id_by_address( + client, '52.52.52.52', check_mode=True + ) + ) + self.assertEqual(err_msg, 'EIP 52.52.52.52 does not exist') + self.assertTrue(allocation_id is None) + + def test_allocate_eip_address(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, eip_id = ( + ng.allocate_eip_address( + client, check_mode=True + ) + ) + self.assertTrue(success) + + def test_release_address(self): + client = boto3.client('ec2', region_name=aws_region) + success, _ = ( + ng.release_address( + client, 'eipalloc-1234567', check_mode=True + ) + ) + self.assertTrue(success) + + def test_create(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.create( + client, 'subnet-123456', 'eipalloc-1234567', check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + + def test_pre_create(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.pre_create( + client, 'subnet-123456', check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + + def test_pre_create_idemptotent_with_allocation_id(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.pre_create( + client, 'subnet-123456789', allocation_id='eipalloc-1234567', check_mode=True + ) + ) + self.assertTrue(success) + self.assertFalse(changed) + + def test_pre_create_idemptotent_with_eip_address(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.pre_create( + client, 'subnet-123456789', eip_address='55.55.55.55', check_mode=True + ) + ) + self.assertTrue(success) + self.assertFalse(changed) + + def test_pre_create_idemptotent_if_exist_do_not_create(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.pre_create( + client, 'subnet-123456789', if_exist_do_not_create=True, check_mode=True + ) + ) + self.assertTrue(success) + self.assertFalse(changed) + + def test_delete(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, _ = ( + ng.remove( + client, 'nat-123456789', check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + + def test_delete_and_release_ip(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, _ = ( + ng.remove( + client, 'nat-123456789', release_eip=True, check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + + def test_delete_if_does_not_exist(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, _ = ( + ng.remove( + client, 'nat-12345', check_mode=True + ) + ) + self.assertFalse(success) + self.assertFalse(changed) + +def main(): + unittest.main() + +if __name__ == '__main__': + main() diff --git a/test/units/modules/extras/cloud/amazon/test_kinesis_stream.py b/test/units/modules/extras/cloud/amazon/test_kinesis_stream.py new file mode 100644 index 00000000000..7b5dcff67c4 --- /dev/null +++ b/test/units/modules/extras/cloud/amazon/test_kinesis_stream.py @@ -0,0 +1,296 @@ +#!/usr/bin/python + +from nose.plugins.skip import SkipTest + +try: + import boto3 + import botocore + HAS_BOTO3 = True +except ImportError: + HAS_BOTO3 = False + +if not HAS_BOTO3: + raise SkipTest("test_kinesis_stream.py requires the python module 'boto3' and 'botocore'") + +import unittest + +import ansible.modules.extras.cloud.amazon.kinesis_stream as kinesis_stream + +aws_region = 'us-west-2' + + +class AnsibleKinesisStreamFunctions(unittest.TestCase): + + def test_convert_to_lower(self): + example = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + converted_example = kinesis_stream.convert_to_lower(example) + keys = list(converted_example.keys()) + keys.sort() + for i in range(len(keys)): + if i == 0: + self.assertEqual(keys[i], 'has_more_shards') + if i == 1: + self.assertEqual(keys[i], 'retention_period_hours') + if i == 2: + self.assertEqual(keys[i], 'stream_arn') + if i == 3: + self.assertEqual(keys[i], 'stream_name') + if i == 4: + self.assertEqual(keys[i], 'stream_status') + + def test_make_tags_in_aws_format(self): + example = { + 'env': 'development' + } + should_return = [ + { + 'Key': 'env', + 'Value': 'development' + } + ] + aws_tags = kinesis_stream.make_tags_in_aws_format(example) + self.assertEqual(aws_tags, should_return) + + def test_make_tags_in_proper_format(self): + example = [ + { + 'Key': 'env', + 'Value': 'development' + }, + { + 'Key': 'service', + 'Value': 'web' + } + ] + should_return = { + 'env': 'development', + 'service': 'web' + } + proper_tags = kinesis_stream.make_tags_in_proper_format(example) + self.assertEqual(proper_tags, should_return) + + def test_recreate_tags_from_list(self): + example = [('environment', 'development'), ('service', 'web')] + should_return = [ + { + 'Key': 'environment', + 'Value': 'development' + }, + { + 'Key': 'service', + 'Value': 'web' + } + ] + aws_tags = kinesis_stream.recreate_tags_from_list(example) + self.assertEqual(aws_tags, should_return) + + def test_get_tags(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True) + self.assertTrue(success) + should_return = [ + { + 'Key': 'DryRunMode', + 'Value': 'true' + } + ] + self.assertEqual(tags, should_return) + + def test_find_stream(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg, stream = ( + kinesis_stream.find_stream(client, 'test', check_mode=True) + ) + should_return = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_wait_for_status(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg, stream = ( + kinesis_stream.wait_for_status( + client, 'test', 'ACTIVE', check_mode=True + ) + ) + should_return = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_tags_action_create(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'create', check_mode=True + ) + ) + self.assertTrue(success) + + def test_tags_action_delete(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'delete', check_mode=True + ) + ) + self.assertTrue(success) + + def test_tags_action_invalid(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'append', check_mode=True + ) + ) + self.assertFalse(success) + + def test_update_tags(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg = ( + kinesis_stream.update_tags( + client, 'test', tags, check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_create(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'create', check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_delete(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'delete', check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_invalid(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'append', check_mode=True + ) + ) + self.assertFalse(success) + + def test_retention_action_increase(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 48, 'increase', check_mode=True + ) + ) + self.assertTrue(success) + + def test_retention_action_decrease(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 24, 'decrease', check_mode=True + ) + ) + self.assertTrue(success) + + def test_retention_action_invalid(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 24, 'create', check_mode=True + ) + ) + self.assertFalse(success) + + def test_update(self): + client = boto3.client('kinesis', region_name=aws_region) + current_stream = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg = ( + kinesis_stream.update( + client, current_stream, 'test', retention_period=48, + tags=tags, check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') + + def test_create_stream(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg, results = ( + kinesis_stream.create_stream( + client, 'test', number_of_shards=10, retention_period=48, + tags=tags, check_mode=True + ) + ) + should_return = { + 'has_more_shards': True, + 'retention_period_hours': 24, + 'stream_name': 'test', + 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'stream_status': 'ACTIVE', + 'tags': tags, + } + self.assertTrue(success) + self.assertTrue(changed) + self.assertEqual(results, should_return) + self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') + + +def main(): + unittest.main() + +if __name__ == '__main__': + main()