From 4e1c3a58b376687e4f028e387a5296520372818b Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Wed, 23 Mar 2016 19:09:36 -0700 Subject: [PATCH 01/10] Create, Delete, and Modify a Kinesis Stream. * Create a Kinesis Stream. * Tag a Kinesis Stream. * Update the Retention Period of a Kinesis Stream. * Delete a Kinesis Stream. * Wait for a Kinesis Stream to be in an ACTIVE State. --- cloud/amazon/kinesis_stream.py | 1053 ++++++++++++++++++++++++++++++++ 1 file changed, 1053 insertions(+) create mode 100644 cloud/amazon/kinesis_stream.py diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py new file mode 100644 index 00000000000..cc280655a52 --- /dev/null +++ b/cloud/amazon/kinesis_stream.py @@ -0,0 +1,1053 @@ +#!/usr/bin/python +# +# This is a free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This Ansible library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this library. If not, see . + +DOCUMENTATION = ''' +--- +module: kinesis_stream +short_description: Manage a Kinesis Stream. +description: + - Create or Delete a Kinesis Stream. + - Update the retention period of a Kinesis Stream. + - Update Tags on a Kinesis Stream. +version_added: "2.1" +author: Allen Sanabria (@linuxdynasty) +options: + name: + description: + - "The name of the Kinesis Stream you are managing." + default: None + required: true + shards: + description: + - "The number of shards you want to have with this stream. This can not + be modified after being created." + - "This is required when state == present" + required: false + default: None + retention_period: + description: + - "The default retention period is 24 hours and can not be less than 24 + hours." + - "The retention period can be modified during any point in time." + required: false + default: None + state: + description: + - "Create or Delete the Kinesis Stream." + required: false + default: present + choices: [ 'present', 'absent' ] + wait: + description: + - Wait for operation to complete before returning + required: false + default: true + wait_timeout: + description: + - How many seconds to wait for an operation to complete before timing out + required: false + default: 300 + tags: + description: + - "A dictionary of resource tags of the form: { tag1: value1, tag2: value2 }." + required: false + default: null + aliases: [ "resource_tags" ] +extends_documentation_fragment: + - aws + - ec2 +''' + +EXAMPLES = ''' +# Note: These examples do not set authentication details, see the AWS Guide for details. + +# Basic creation example: +- name: Set up Kinesis Stream with 10 shards and wait for the stream to become ACTIVE + kinesis_stream: + name: test-stream + shards: 10 + wait: yes + wait_timeout: 600 + register: test_stream + +# Basic creation example with tags: +- name: Set up Kinesis Stream with 10 shards, tag the environment, and wait for the stream to become ACTIVE + kinesis_stream: + name: test-stream + shards: 10 + tags: + Env: development + wait: yes + wait_timeout: 600 + register: test_stream + +# Basic creation example with tags and increase the retention period from the default 24 hours to 48 hours: +- name: Set up Kinesis Stream with 10 shards, tag the environment, increase the retention period and wait for the stream to become ACTIVE + kinesis_stream: + name: test-stream + retention_period: 48 + shards: 10 + tags: + Env: development + wait: yes + wait_timeout: 600 + register: test_stream + +# Basic delete example: +- name: Delete Kinesis Stream test-stream and wait for it to finish deleting. + kinesis_stream: + name: test-stream + wait: yes + wait_timeout: 600 + register: test_stream +''' + +RETURN = ''' +stream_name: + description: The name of the Kinesis Stream. + returned: when state == present. + type: string + sample: "test-stream" +stream_arn: + description: The amazon resource identifier + returned: when state == present. + type: string + sample: "arn:aws:kinesis:east-side:123456789:stream/test-stream" +stream_status: + description: The current state of the Kinesis Stream. + returned: when state == present. + type: string + sample: "ACTIVE" +retention_period_hours: + description: Number of hours messages will be kept for a Kinesis Stream. + returned: when state == present. + type: int + sample: 24 +tags: + description: Dictionary containing all the tags associated with the Kinesis stream. + returned: when state == present. + type: dict + sample: { + "Name": "Splunk", + "Env": "development" + } +''' + +try: + import botocore + import boto3 + HAS_BOTO3 = True +except ImportError: + HAS_BOTO3 = False + +import re +import datetime +import time +from functools import reduce + +def convert_to_lower(data): + """Convert all uppercase keys in dict with lowercase_ + Args: + data (dict): Dictionary with keys that have upper cases in them + Example.. FooBar == foo_bar + if a val is of type datetime.datetime, it will be converted to + the ISO 8601 + + Basic Usage: + >>> test = {'FooBar': []} + >>> test = convert_to_lower(test) + { + 'foo_bar': [] + } + + Returns: + Dictionary + """ + results = dict() + if isinstance(data, dict): + for key, val in data.items(): + key = re.sub(r'(([A-Z]{1,3}){1})', r'_\1', key).lower() + if key[0] == '_': + key = key[1:] + if isinstance(val, datetime.datetime): + results[key] = val.isoformat() + elif isinstance(val, dict): + results[key] = convert_to_lower(val) + elif isinstance(val, list): + converted = list() + for item in val: + converted.append(convert_to_lower(item)) + results[key] = converted + else: + results[key] = val + return results + +def make_tags_in_proper_format(tags): + """Take a dictionary of tags and convert them into the AWS Tags format. + Args: + tags (list): The tags you want applied. + + Basic Usage: + >>> tags = [{u'Key': 'env', u'Value': 'development'}] + >>> make_tags_in_proper_format(tags) + { + "env": "development", + } + + Returns: + Dict + """ + formatted_tags = dict() + for tag in tags: + formatted_tags[tag.get('Key')] = tag.get('Value') + + return formatted_tags + +def make_tags_in_aws_format(tags): + """Take a dictionary of tags and convert them into the AWS Tags format. + Args: + tags (dict): The tags you want applied. + + Basic Usage: + >>> tags = {'env': 'development', 'service': 'web'} + >>> make_tags_in_proper_format(tags) + [ + { + "Value": "web", + "Key": "service" + }, + { + "Value": "development", + "key": "env" + } + ] + + Returns: + List + """ + formatted_tags = list() + for key, val in tags.items(): + formatted_tags.append({ + 'Key': key, + 'Value': val + }) + + return formatted_tags + +def get_tags(client, stream_name, check_mode=False): + """Retrieve the tags for a Kinesis Stream. + Args: + client (botocore.client.EC2): Boto3 client. + stream_name (str): Name of the Kinesis stream. + + Kwargs: + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> stream_name = 'test-stream' + >> get_tags(client, stream_name) + + Returns: + Tuple (bool, str, dict) + """ + err_msg = '' + success = False + params = { + 'StreamName': stream_name, + } + results = dict() + try: + if not check_mode: + results = ( + client.list_tags_for_stream(**params)['Tags'] + ) + else: + results = [ + { + 'Key': 'DryRunMode', + 'Value': 'true' + }, + ] + success = True + except botocore.exceptions.ClientError, e: + err_msg = str(e) + + return success, err_msg, results + +def find_stream(client, stream_name, limit=1, check_mode=False): + """Retrieve a Kinesis Stream. + Args: + client (botocore.client.EC2): Boto3 client. + stream_name (str): Name of the Kinesis stream. + + Kwargs: + limit (int): Limit the number of shards to return within a stream. + default=1 + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> stream_name = 'test-stream' + + Returns: + Tuple (bool, str, dict) + """ + err_msg = '' + success = False + params = { + 'StreamName': stream_name, + 'Limit': limit + } + results = dict() + try: + if not check_mode: + results = ( + client.describe_stream(**params)['StreamDescription'] + ) + results.pop('Shards') + else: + results = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': stream_name, + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/{0}'.format(stream_name), + 'StreamStatus': u'ACTIVE' + } + success = True + except botocore.exceptions.ClientError, e: + err_msg = str(e) + + return success, err_msg, results + +def wait_for_status(client, stream_name, status, wait_timeout=300, + check_mode=False): + """Wait for the the status to change for a Kinesis Stream. + Args: + client (botocore.client.EC2): Boto3 client + stream_name (str): The name of the kinesis stream. + status (str): The status to wait for. + examples. status=available, status=deleted + + Kwargs: + wait_timeout (int): Number of seconds to wait, until this timeout is reached. + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> stream_name = 'test-stream' + >>> wait_for_status(client, stream_name, 'ACTIVE', 300) + + Returns: + Tuple (bool, str, dict) + """ + polling_increment_secs = 5 + wait_timeout = time.time() + wait_timeout + status_achieved = False + stream = dict() + err_msg = "" + + if not check_mode: + while wait_timeout > time.time(): + try: + find_success, find_msg, stream = ( + find_stream(client, stream_name) + ) + if status != 'DELETING': + if find_success and stream: + if stream.get('StreamStatus') == status: + status_achieved = True + break + elif status == 'DELETING': + if not find_success: + status_achieved = True + break + else: + time.sleep(polling_increment_secs) + except botocore.exceptions.ClientError as e: + err_msg = str(e) + + else: + status_achieved = True + find_success, find_msg, stream = ( + find_stream(client, stream_name, check_mode=check_mode) + ) + + if not status_achieved: + err_msg = "Wait time out reached, while waiting for results" + + return status_achieved, err_msg, stream + +def tags_action(client, stream_name, tags, action='create', check_mode=False): + """Create or delete multiple tags from a Kinesis Stream. + Args: + client (botocore.client.EC2): Boto3 client. + resource_id (str): The Amazon resource id. + tags (list): List of dictionaries. + examples.. [{Name: "", Values: [""]}] + + Kwargs: + action (str): The action to perform. + valid actions == create and delete + default=create + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('ec2') + >>> resource_id = 'pcx-123345678' + >>> tags = {'env': 'development'} + >>> update_tags(client, resource_id, tags) + [True, ''] + + Returns: + List (bool, str) + """ + success = False + err_msg = "" + params = {'StreamName': stream_name} + try: + if not check_mode: + if action == 'create': + params['Tags'] = tags + client.add_tags_to_stream(**params) + success = True + elif action == 'delete': + params['TagKeys'] = tags.keys() + client.remove_tags_from_stream(**params) + success = True + else: + err_msg = 'Invalid action {0}'.format(action) + else: + if action == 'create': + success = True + elif action == 'delete': + success = True + else: + err_msg = 'Invalid action {0}'.format(action) + + except botocore.exceptions.ClientError, e: + err_msg = str(e) + + return success, err_msg + +def recreate_tags_from_list(list_of_tags): + """Recreate tags from a list of tuples into the Amazon Tag format. + Args: + list_of_tags (list): List of tuples. + + Basic Usage: + >>> list_of_tags = [('Env', 'Development')] + >>> recreate_tags_from_list(list_of_tags) + [ + { + "Value": "Development", + "Key": "Env" + } + ] + + Returns: + List + """ + tags = list() + i = 0 + list_of_tags = list_of_tags + for i in range(len(list_of_tags)): + key_name = list_of_tags[i][0] + key_val = list_of_tags[i][1] + tags.append( + { + 'Key': key_name, + 'Value': key_val + } + ) + return tags + +def update_tags(client, stream_name, tags, check_mode=False): + """Update tags for an amazon resource. + Args: + resource_id (str): The Amazon resource id. + tags (dict): Dictionary of tags you want applied to the Kinesis stream. + + Kwargs: + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('ec2') + >>> stream_name = 'test-stream' + >>> tags = {'env': 'development'} + >>> update_tags(client, stream_name, tags) + [True, ''] + + Return: + Tuple (bool, str) + """ + success = False + err_msg = '' + tag_success, tag_msg, current_tags = get_tags(client, stream_name) + if current_tags: + tags = make_tags_in_aws_format(tags) + current_tags_set = ( + set( + reduce( + lambda x, y: x + y, + [make_tags_in_proper_format(current_tags).items()] + ) + ) + ) + + new_tags_set = ( + set( + reduce( + lambda x, y: x + y, + [make_tags_in_proper_format(tags).items()] + ) + ) + ) + tags_to_delete = list(current_tags_set.difference(new_tags_set)) + tags_to_update = list(new_tags_set.difference(current_tags_set)) + if tags_to_delete: + tags_to_delete = make_tags_in_proper_format( + recreate_tags_from_list(tags_to_delete) + ) + delete_success, delete_msg = ( + tags_action( + client, stream_name, tags_to_delete, action='delete', + check_mode=check_mode + ) + ) + if not delete_success: + return delete_success, delete_msg + if tags_to_update: + tags = make_tags_in_proper_format( + recreate_tags_from_list(tags_to_update) + ) + else: + return True, 'Tags do not need to be updated' + + if tags: + create_success, create_msg = ( + tags_action( + client, stream_name, tags, action='create', + check_mode=check_mode + ) + ) + return create_success, create_msg + + return success, err_msg + +def stream_action(client, stream_name, shard_count=1, action='create', + timeout=300, check_mode=False): + """Create or Delete an Amazon Kinesis Stream. + Args: + client (botocore.client.EC2): Boto3 client. + stream_name (str): The name of the kinesis stream. + + Kwargs: + shard_count (int): Number of shards this stream will use. + action (str): The action to perform. + valid actions == create and delete + default=create + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> stream_name = 'test-stream' + >>> shard_count = 20 + >>> stream_action(client, stream_name, shard_count, action='create') + + Returns: + List (bool, str) + """ + success = False + err_msg = '' + params = { + 'StreamName': stream_name + } + try: + if not check_mode: + if action == 'create': + params['ShardCount'] = shard_count + client.create_stream(**params) + success = True + elif action == 'delete': + client.delete_stream(**params) + success = True + else: + err_msg = 'Invalid action {0}'.format(action) + else: + if action == 'create': + success = True + elif action == 'delete': + success = True + else: + err_msg = 'Invalid action {0}'.format(action) + + except botocore.exceptions.ClientError, e: + err_msg = str(e) + + return success, err_msg + +def retention_action(client, stream_name, retention_period=24, + action='increase', check_mode=False): + """Increase or Decreaste the retention of messages in the Kinesis stream. + Args: + client (botocore.client.EC2): Boto3 client. + stream_name (str): The + + Kwargs: + retention_period (int): This is how long messages will be kept before + they are discarded. This can not be less than 24 hours. + action (str): The action to perform. + valid actions == create and delete + default=create + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> stream_name = 'test-stream' + >>> retention_period = 48 + >>> stream_action(client, stream_name, retention_period, action='create') + + Returns: + Tuple (bool, str) + """ + success = False + err_msg = '' + params = { + 'StreamName': stream_name + } + try: + if not check_mode: + if action == 'increase': + params['RetentionPeriodHours'] = retention_period + client.increase_stream_retention_period(**params) + success = True + elif action == 'decrease': + params['RetentionPeriodHours'] = retention_period + client.decrease_stream_retention_period(**params) + success = True + else: + err_msg = 'Invalid action {0}'.format(action) + else: + if action == 'increase': + success = True + elif action == 'decrease': + success = True + else: + err_msg = 'Invalid action {0}'.format(action) + + except botocore.exceptions.ClientError, e: + err_msg = str(e) + + return success, err_msg + +def update(client, current_stream, stream_name, retention_period=None, + tags=None, wait=False, wait_timeout=300, check_mode=False): + """Update an Amazon Kinesis Stream. + Args: + client (botocore.client.EC2): Boto3 client. + stream_name (str): The name of the kinesis stream. + + Kwargs: + retention_period (int): This is how long messages will be kept before + they are discarded. This can not be less than 24 hours. + tags (dict): The tags you want applied. + wait (bool): Wait until Stream is ACTIVE. + default=False + wait_timeout (int): How long to wait until this operation is considered failed. + default=300 + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> current_stream = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test-stream', + 'StreamARN': 'arn:aws:kinesis:us-west-2:123456789:stream/test-stream', + 'StreamStatus': "ACTIVE' + } + >>> stream_name = 'test-stream' + >>> retention_period = 48 + >>> stream_action(client, current_stream, stream_name, + retention_period, action='create' ) + + Returns: + Tuple (bool, bool, str, dict) + """ + success = False + changed = False + err_msg = '' + if retention_period: + if wait: + wait_success, wait_msg, current_stream = ( + wait_for_status( + client, stream_name, 'ACTIVE', wait_timeout, + check_mode=check_mode + ) + ) + if not wait_success: + return wait_success, True, wait_msg + + if current_stream['StreamStatus'] == 'ACTIVE': + if retention_period > current_stream['RetentionPeriodHours']: + retention_changed, retention_msg = ( + retention_action( + client, stream_name, retention_period, action='increase', + check_mode=check_mode + ) + ) + if retention_changed: + success = True + + elif retention_period < current_stream['RetentionPeriodHours']: + retention_changed, retention_msg = ( + retention_action( + client, stream_name, retention_period, action='decrease', + check_mode=check_mode + ) + ) + if retention_changed: + success = True + + elif retention_period == current_stream['RetentionPeriodHours']: + retention_changed = False + retention_msg = ( + 'Retention {0} is the same as {1}' + .format( + retention_period, + current_stream['RetentionPeriodHours'] + ) + ) + success = True + + changed = retention_changed + err_msg = retention_msg + if changed and wait: + wait_success, wait_msg, current_stream = ( + wait_for_status( + client, stream_name, 'ACTIVE', wait_timeout, + check_mode=check_mode + ) + ) + if not wait_success: + return wait_success, True, wait_msg + elif changed and not wait: + stream_found, stream_msg, current_stream = ( + find_stream(client, stream_name, check_mode=check_mode) + ) + if stream_found: + if current_stream['StreamStatus'] != 'ACTIVE': + err_msg = ( + 'Retention Period for {0} is in the process of updating' + .format(stream_name) + ) + return success, changed, err_msg + else: + err_msg = ( + 'StreamStatus has to be ACTIVE in order to modify the retention period. Current status is {0}' + .format(current_stream['StreamStatus']) + ) + return success, changed, err_msg + + if tags: + changed, err_msg = update_tags(client, stream_name, tags, check_mode) + if changed: + success = True + if wait: + success, err_msg, _ = ( + wait_for_status( + client, stream_name, 'ACTIVE', wait_timeout, + check_mode=check_mode + ) + ) + if success and changed: + err_msg = 'Kinesis Stream {0} updated successfully'.format(stream_name) + elif success and not changed: + err_msg = 'Kinesis Stream {0} did not changed'.format(stream_name) + + return success, changed, err_msg + +def create_stream(client, stream_name, number_of_shards=1, retention_period=None, + tags=None, wait=False, wait_timeout=300, check_mode=None): + """Create an Amazon Kinesis Stream. + Args: + client (botocore.client.EC2): Boto3 client. + stream_name (str): The name of the kinesis stream. + + Kwargs: + number_of_shards (int): Number of shards this stream will use. + default=1 + retention_period (int): Can not be less than 24 hours + default=None + tags (dict): The tags you want applied. + default=None + wait (bool): Wait until Stream is ACTIVE. + default=False + wait_timeout (int): How long to wait until this operation is considered failed. + default=300 + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> stream_name = 'test-stream' + >>> number_of_shards = 10 + >>> tags = {'env': 'test'} + >>> create_stream(client, stream_name, number_of_shards, tags=tags) + + Returns: + Tuple (bool, bool, str, dict) + """ + success = False + changed = False + err_msg = '' + results = dict() + + stream_found, stream_msg, current_stream = ( + find_stream(client, stream_name, check_mode=check_mode) + ) + if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait: + wait_success, wait_msg, current_stream = ( + wait_for_status( + client, stream_name, 'ACTIVE', wait_timeout, check_mode + ) + ) + if stream_found and current_stream['StreamStatus'] != 'DELETING': + success, changed, err_msg = update( + client, current_stream, stream_name, retention_period, tags, + wait, wait_timeout, check_mode + ) + else: + create_success, create_msg = ( + stream_action( + client, stream_name, number_of_shards, action='create', + check_mode=check_mode + ) + ) + if create_success: + changed = True + if wait: + wait_success, wait_msg, results = ( + wait_for_status( + client, stream_name, 'ACTIVE', wait_timeout, check_mode + ) + ) + err_msg = ( + 'Kinesis Stream {0} is in the process of being created' + .format(stream_name) + ) + if not wait_success: + return wait_success, True, wait_msg, results + else: + err_msg = ( + 'Kinesis Stream {0} created successfully' + .format(stream_name) + ) + + if tags: + changed, err_msg = ( + tags_action( + client, stream_name, tags, action='create', + check_mode=check_mode + ) + ) + if changed: + success = True + if not success: + return success, changed, err_msg, results + + stream_found, stream_msg, current_stream = ( + find_stream(client, stream_name, check_mode=check_mode) + ) + if retention_period and current_stream['StreamStatus'] == 'ACTIVE': + changed, err_msg = ( + retention_action( + client, stream_name, retention_period, action='increase', + check_mode=check_mode + ) + ) + if changed: + success = True + if not success: + return success, changed, err_msg, results + else: + err_msg = ( + 'StreamStatus has to be ACTIVE in order to modify the retention period. Current status is {0}' + .format(current_stream['StreamStatus']) + ) + success = create_success + changed = True + + if success: + _, _, results = ( + find_stream(client, stream_name, check_mode=check_mode) + ) + _, _, current_tags = ( + get_tags(client, stream_name, check_mode=check_mode) + ) + if current_tags and not check_mode: + current_tags = make_tags_in_proper_format(current_tags) + results['Tags'] = current_tags + elif check_mode and tags: + results['Tags'] = tags + else: + results['Tags'] = dict() + results = convert_to_lower(results) + + return success, changed, err_msg, results + +def delete_stream(client, stream_name, wait=False, wait_timeout=300, + check_mode=False): + """Delete an Amazon Kinesis Stream. + Args: + client (botocore.client.EC2): Boto3 client. + stream_name (str): The name of the kinesis stream. + + Kwargs: + wait (bool): Wait until Stream is ACTIVE. + default=False + wait_timeout (int): How long to wait until this operation is considered failed. + default=300 + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> stream_name = 'test-stream' + >>> delete_stream(client, stream_name) + + Returns: + Tuple (bool, bool, str, dict) + """ + success = False + changed = False + err_msg = '' + results = dict() + stream_found, stream_msg, current_stream = find_stream(client, stream_name) + if stream_found: + success, err_msg = ( + stream_action( + client, stream_name, action='delete', check_mode=check_mode + ) + ) + if success: + changed = True + if wait: + success, err_msg, results = ( + wait_for_status( + client, stream_name, 'DELETING', wait_timeout, + check_mode + ) + ) + err_msg = 'Stream {0} deleted successfully'.format(stream_name) + if not success: + return success, True, err_msg, results + else: + err_msg = ( + 'Stream {0} is in the process of being deleted' + .format(stream_name) + ) + else: + success = True + changed = False + err_msg = 'Stream {0} does not exist'.format(stream_name) + + return success, changed, err_msg, results + +def main(): + argument_spec = ec2_argument_spec() + argument_spec.update( + dict( + name = dict(default=None, required=True), + shards = dict(default=None, required=False, type='int'), + retention_period = dict(default=None, required=False, type='int'), + tags = dict(default=None, required=False, type='dict', aliases=['resource_tags']), + wait = dict(default=True, required=False, type='bool'), + wait_timeout = dict(default=300, required=False, type='int'), + state = dict(default='present', choices=['present', 'absent']), + ) + ) + module = AnsibleModule( + argument_spec=argument_spec, + supports_check_mode=True, + ) + + retention_period = module.params.get('retention_period') + stream_name = module.params.get('name') + shards = module.params.get('shards') + state = module.params.get('state') + tags = module.params.get('tags') + wait = module.params.get('wait') + wait_timeout = module.params.get('wait_timeout') + + if state == 'present' and not shards: + module.fail_json(msg='shards is required when state == present.') + + if not HAS_BOTO3: + module.fail_json(msg='boto3 is required.') + + check_mode = module.check_mode + try: + region, ec2_url, aws_connect_kwargs = ( + get_aws_connection_info(module, boto3=True) + ) + client = ( + boto3_conn( + module, conn_type='client', resource='kinesis', + region=region, endpoint=ec2_url, **aws_connect_kwargs + ) + ) + except botocore.exceptions.ClientError, e: + err_msg = 'Boto3 Client Error - {0}'.format(str(e.msg)) + module.fail_json( + success=False, changed=False, result={}, msg=err_msg + ) + + if state == 'present': + success, changed, err_msg, results = ( + create_stream( + client, stream_name, shards, retention_period, tags, + wait, wait_timeout, check_mode + ) + ) + elif state == 'absent': + success, changed, err_msg, results = ( + delete_stream(client, stream_name, wait, wait_timeout, check_mode) + ) + + if success: + module.exit_json( + success=success, changed=changed, msg=err_msg, **results + ) + else: + module.fail_json( + success=success, changed=changed, msg=err_msg, result=results + ) + +# import module snippets +from ansible.module_utils.basic import * +from ansible.module_utils.ec2 import * + +if __name__ == '__main__': + main() From 514e285d1af7e10252e657c9497816e90d86475f Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Wed, 23 Mar 2016 19:21:24 -0700 Subject: [PATCH 02/10] update doc string --- cloud/amazon/kinesis_stream.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py index cc280655a52..adc1fbe8c80 100644 --- a/cloud/amazon/kinesis_stream.py +++ b/cloud/amazon/kinesis_stream.py @@ -109,6 +109,7 @@ EXAMPLES = ''' - name: Delete Kinesis Stream test-stream and wait for it to finish deleting. kinesis_stream: name: test-stream + state: absent wait: yes wait_timeout: 600 register: test_stream From 988f468457978011ceee6d545fba0a23cbd2943d Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Thu, 24 Mar 2016 13:33:19 -0700 Subject: [PATCH 03/10] Added test to kinesis_stream module. * Update kinesis_stream based on tests. * Added tests for kinesis_stream. --- cloud/amazon/kinesis_stream.py | 55 ++-- cloud/amazon/test_kinesis_stream.py | 471 ++++++++++++++++++++++++++++ 2 files changed, 497 insertions(+), 29 deletions(-) create mode 100644 cloud/amazon/test_kinesis_stream.py diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py index adc1fbe8c80..222a50dd0b3 100644 --- a/cloud/amazon/kinesis_stream.py +++ b/cloud/amazon/kinesis_stream.py @@ -201,7 +201,7 @@ def make_tags_in_proper_format(tags): tags (list): The tags you want applied. Basic Usage: - >>> tags = [{u'Key': 'env', u'Value': 'development'}] + >>> tags = [{'Key': 'env', 'Value': 'development'}] >>> make_tags_in_proper_format(tags) { "env": "development", @@ -327,7 +327,7 @@ def find_stream(client, stream_name, limit=1, check_mode=False): 'RetentionPeriodHours': 24, 'StreamName': stream_name, 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/{0}'.format(stream_name), - 'StreamStatus': u'ACTIVE' + 'StreamStatus': 'ACTIVE' } success = True except botocore.exceptions.ClientError, e: @@ -363,31 +363,24 @@ def wait_for_status(client, stream_name, status, wait_timeout=300, stream = dict() err_msg = "" - if not check_mode: - while wait_timeout > time.time(): - try: - find_success, find_msg, stream = ( - find_stream(client, stream_name) - ) - if status != 'DELETING': - if find_success and stream: - if stream.get('StreamStatus') == status: - status_achieved = True - break - elif status == 'DELETING': - if not find_success: + while wait_timeout > time.time(): + try: + find_success, find_msg, stream = ( + find_stream(client, stream_name, check_mode=check_mode) + ) + if status != 'DELETING': + if find_success and stream: + if stream.get('StreamStatus') == status: status_achieved = True break - else: - time.sleep(polling_increment_secs) - except botocore.exceptions.ClientError as e: - err_msg = str(e) - - else: - status_achieved = True - find_success, find_msg, stream = ( - find_stream(client, stream_name, check_mode=check_mode) - ) + elif status == 'DELETING': + if not find_success: + status_achieved = True + break + else: + time.sleep(polling_increment_secs) + except botocore.exceptions.ClientError as e: + err_msg = str(e) if not status_achieved: err_msg = "Wait time out reached, while waiting for results" @@ -694,7 +687,7 @@ def update(client, current_stream, stream_name, retention_period=None, retention_period, action='create' ) Returns: - Tuple (bool, bool, str, dict) + Tuple (bool, bool, str) """ success = False changed = False @@ -783,9 +776,9 @@ def update(client, current_stream, stream_name, retention_period=None, ) ) if success and changed: - err_msg = 'Kinesis Stream {0} updated successfully'.format(stream_name) + err_msg = 'Kinesis Stream {0} updated successfully.'.format(stream_name) elif success and not changed: - err_msg = 'Kinesis Stream {0} did not changed'.format(stream_name) + err_msg = 'Kinesis Stream {0} did not changed.'.format(stream_name) return success, changed, err_msg @@ -1003,7 +996,11 @@ def main(): wait_timeout = module.params.get('wait_timeout') if state == 'present' and not shards: - module.fail_json(msg='shards is required when state == present.') + module.fail_json(msg='Shards is required when state == present.') + + if retention_period: + if retention_period < 24: + module.fail_json(msg='Retention period can not be less than 24 hours.') if not HAS_BOTO3: module.fail_json(msg='boto3 is required.') diff --git a/cloud/amazon/test_kinesis_stream.py b/cloud/amazon/test_kinesis_stream.py new file mode 100644 index 00000000000..5b8fefd40db --- /dev/null +++ b/cloud/amazon/test_kinesis_stream.py @@ -0,0 +1,471 @@ +#!/usr/bin/python + +import unittest + +from collections import namedtuple +from ansible.parsing.dataloader import DataLoader +from ansible.vars import VariableManager +from ansible.inventory import Inventory +from ansible.playbook.play import Play +from ansible.executor.task_queue_manager import TaskQueueManager + +import kinesis_stream +import boto3 + +Options = ( + namedtuple( + 'Options', [ + 'connection', 'module_path', 'forks', 'become', 'become_method', + 'become_user', 'remote_user', 'private_key_file', 'ssh_common_args', + 'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity', + 'check' + ] + ) +) +# initialize needed objects +variable_manager = VariableManager() +loader = DataLoader() +options = ( + Options( + connection='local', + module_path='./', + forks=1, become=None, become_method=None, become_user=None, check=True, + remote_user=None, private_key_file=None, ssh_common_args=None, + sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, + verbosity=10 + ) +) +passwords = dict(vault_pass='') + +# create inventory and pass to var manager +inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost') +variable_manager.set_inventory(inventory) + +def run(play): + tqm = None + results = None + try: + tqm = TaskQueueManager( + inventory=inventory, + variable_manager=variable_manager, + loader=loader, + options=options, + passwords=passwords, + stdout_callback='default', + ) + results = tqm.run(play) + finally: + if tqm is not None: + tqm.cleanup() + return tqm, results + +class AnsibleKinesisStreamTasks(unittest.TestCase): + + def test_a_create_stream_1(self): + play_source = dict( + name = "Create Kinesis Stream with 10 Shards", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + name='stream-test', + shards=10, + wait='yes' + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 1) + + def test_a_create_stream_2(self): + play_source = dict( + name = "Create Kinesis Stream with 10 Shards and create a tag called environment", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + name='stream-test', + shards=10, + tags=dict( + env='development' + ), + wait='yes' + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 1) + + def test_a_create_stream_3(self): + play_source = dict( + name = "Create Kinesis Stream with 10 Shards and create a tag called environment and Change the default retention period from 24 to 48", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + name='stream-test', + retention_period=48, + shards=10, + tags=dict( + env='development' + ), + wait='yes' + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 1) + + def test_b_create_stream_1(self): + play_source = dict( + name = "Create Kinesis Stream with out specifying the number of shards", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + name='stream-test', + wait='yes' + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.failures['localhost'] == 1) + + def test_b_create_stream_2(self): + play_source = dict( + name = "Create Kinesis Stream with specifying the retention period less than 24 hours", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + name='stream-test', + retention_period=23, + shards=10, + wait='yes' + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.failures['localhost'] == 1) + + def test_c_delete_stream_(self): + play_source = dict( + name = "Delete Kinesis Stream test-stream", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + name='stream-test', + state='absent', + wait='yes' + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 1) + + +class AnsibleKinesisStreamFunctions(unittest.TestCase): + + def test_convert_to_lower(self): + example = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + converted_example = kinesis_stream.convert_to_lower(example) + keys = converted_example.keys() + keys.sort() + for i in range(len(keys)): + if i == 0: + self.assertEqual(keys[i], 'has_more_shards') + if i == 1: + self.assertEqual(keys[i], 'retention_period_hours') + if i == 2: + self.assertEqual(keys[i], 'stream_arn') + if i == 3: + self.assertEqual(keys[i], 'stream_name') + if i == 4: + self.assertEqual(keys[i], 'stream_status') + + def test_make_tags_in_aws_format(self): + example = { + 'env': 'development' + } + should_return = [ + { + 'Key': 'env', + 'Value': 'development' + } + ] + aws_tags = kinesis_stream.make_tags_in_aws_format(example) + self.assertEqual(aws_tags, should_return) + + def test_make_tags_in_proper_format(self): + example = [ + { + 'Key': 'env', + 'Value': 'development' + }, + { + 'Key': 'service', + 'Value': 'web' + } + ] + should_return = { + 'env': 'development', + 'service': 'web' + } + proper_tags = kinesis_stream.make_tags_in_proper_format(example) + self.assertEqual(proper_tags, should_return) + + def test_recreate_tags_from_list(self): + example = [('environment', 'development'), ('service', 'web')] + should_return = [ + { + 'Key': 'environment', + 'Value': 'development' + }, + { + 'Key': 'service', + 'Value': 'web' + } + ] + aws_tags = kinesis_stream.recreate_tags_from_list(example) + self.assertEqual(aws_tags, should_return) + + def test_get_tags(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg, tags = kinesis_stream.get_tags(client, 'test', True) + self.assertTrue(success) + should_return = [ + { + 'Key': 'DryRunMode', + 'Value': 'true' + } + ] + self.assertEqual(tags, should_return) + + def test_find_stream(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg, stream = ( + kinesis_stream.find_stream(client, 'test', check_mode=True) + ) + should_return = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_wait_for_status(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg, stream = ( + kinesis_stream.wait_for_status( + client, 'test', 'ACTIVE', check_mode=True + ) + ) + should_return = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_tags_action_create(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'create', check_mode=True + ) + ) + self.assertTrue(success) + + def test_tags_action_delete(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'delete', check_mode=True + ) + ) + self.assertTrue(success) + + def test_tags_action_invalid(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'append', check_mode=True + ) + ) + self.assertFalse(success) + + def test_update_tags(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.update_tags( + client, 'test', tags, check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_create(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'create', check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_delete(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'delete', check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_invalid(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'append', check_mode=True + ) + ) + self.assertFalse(success) + + def test_retention_action_increase(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 48, 'increase', check_mode=True + ) + ) + self.assertTrue(success) + + def test_retention_action_decrease(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 24, 'decrease', check_mode=True + ) + ) + self.assertTrue(success) + + def test_retention_action_invalid(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 24, 'create', check_mode=True + ) + ) + self.assertFalse(success) + + def test_update(self): + client = boto3.client('kinesis', region_name='us-west-2') + current_stream = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg = ( + kinesis_stream.update( + client, current_stream, 'test', retention_period=48, + tags=tags, check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') + + def test_create_stream(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg, results = ( + kinesis_stream.create_stream( + client, 'test', number_of_shards=10, retention_period=48, + tags=tags, check_mode=True + ) + ) + should_return = { + 'has_more_shards': True, + 'retention_period_hours': 24, + 'stream_name': 'test', + 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'stream_status': 'ACTIVE', + 'tags': tags, + } + self.assertTrue(success) + self.assertTrue(changed) + self.assertEqual(results, should_return) + self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') + + +def main(): + unittest.main() + +if __name__ == '__main__': + main() From 8c1277a8b78f2136f0b1312c1068d237d91d2816 Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Thu, 24 Mar 2016 22:20:58 -0700 Subject: [PATCH 04/10] Removed test as they will not be ran by Ansible. * I will include tests in my personal repo which will contain all modules written by me with their associated tests. --- cloud/amazon/test_kinesis_stream.py | 471 ---------------------------- 1 file changed, 471 deletions(-) delete mode 100644 cloud/amazon/test_kinesis_stream.py diff --git a/cloud/amazon/test_kinesis_stream.py b/cloud/amazon/test_kinesis_stream.py deleted file mode 100644 index 5b8fefd40db..00000000000 --- a/cloud/amazon/test_kinesis_stream.py +++ /dev/null @@ -1,471 +0,0 @@ -#!/usr/bin/python - -import unittest - -from collections import namedtuple -from ansible.parsing.dataloader import DataLoader -from ansible.vars import VariableManager -from ansible.inventory import Inventory -from ansible.playbook.play import Play -from ansible.executor.task_queue_manager import TaskQueueManager - -import kinesis_stream -import boto3 - -Options = ( - namedtuple( - 'Options', [ - 'connection', 'module_path', 'forks', 'become', 'become_method', - 'become_user', 'remote_user', 'private_key_file', 'ssh_common_args', - 'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity', - 'check' - ] - ) -) -# initialize needed objects -variable_manager = VariableManager() -loader = DataLoader() -options = ( - Options( - connection='local', - module_path='./', - forks=1, become=None, become_method=None, become_user=None, check=True, - remote_user=None, private_key_file=None, ssh_common_args=None, - sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, - verbosity=10 - ) -) -passwords = dict(vault_pass='') - -# create inventory and pass to var manager -inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost') -variable_manager.set_inventory(inventory) - -def run(play): - tqm = None - results = None - try: - tqm = TaskQueueManager( - inventory=inventory, - variable_manager=variable_manager, - loader=loader, - options=options, - passwords=passwords, - stdout_callback='default', - ) - results = tqm.run(play) - finally: - if tqm is not None: - tqm.cleanup() - return tqm, results - -class AnsibleKinesisStreamTasks(unittest.TestCase): - - def test_a_create_stream_1(self): - play_source = dict( - name = "Create Kinesis Stream with 10 Shards", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - name='stream-test', - shards=10, - wait='yes' - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 1) - - def test_a_create_stream_2(self): - play_source = dict( - name = "Create Kinesis Stream with 10 Shards and create a tag called environment", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - name='stream-test', - shards=10, - tags=dict( - env='development' - ), - wait='yes' - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 1) - - def test_a_create_stream_3(self): - play_source = dict( - name = "Create Kinesis Stream with 10 Shards and create a tag called environment and Change the default retention period from 24 to 48", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - name='stream-test', - retention_period=48, - shards=10, - tags=dict( - env='development' - ), - wait='yes' - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 1) - - def test_b_create_stream_1(self): - play_source = dict( - name = "Create Kinesis Stream with out specifying the number of shards", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - name='stream-test', - wait='yes' - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.failures['localhost'] == 1) - - def test_b_create_stream_2(self): - play_source = dict( - name = "Create Kinesis Stream with specifying the retention period less than 24 hours", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - name='stream-test', - retention_period=23, - shards=10, - wait='yes' - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.failures['localhost'] == 1) - - def test_c_delete_stream_(self): - play_source = dict( - name = "Delete Kinesis Stream test-stream", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - name='stream-test', - state='absent', - wait='yes' - ) - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 1) - - -class AnsibleKinesisStreamFunctions(unittest.TestCase): - - def test_convert_to_lower(self): - example = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - converted_example = kinesis_stream.convert_to_lower(example) - keys = converted_example.keys() - keys.sort() - for i in range(len(keys)): - if i == 0: - self.assertEqual(keys[i], 'has_more_shards') - if i == 1: - self.assertEqual(keys[i], 'retention_period_hours') - if i == 2: - self.assertEqual(keys[i], 'stream_arn') - if i == 3: - self.assertEqual(keys[i], 'stream_name') - if i == 4: - self.assertEqual(keys[i], 'stream_status') - - def test_make_tags_in_aws_format(self): - example = { - 'env': 'development' - } - should_return = [ - { - 'Key': 'env', - 'Value': 'development' - } - ] - aws_tags = kinesis_stream.make_tags_in_aws_format(example) - self.assertEqual(aws_tags, should_return) - - def test_make_tags_in_proper_format(self): - example = [ - { - 'Key': 'env', - 'Value': 'development' - }, - { - 'Key': 'service', - 'Value': 'web' - } - ] - should_return = { - 'env': 'development', - 'service': 'web' - } - proper_tags = kinesis_stream.make_tags_in_proper_format(example) - self.assertEqual(proper_tags, should_return) - - def test_recreate_tags_from_list(self): - example = [('environment', 'development'), ('service', 'web')] - should_return = [ - { - 'Key': 'environment', - 'Value': 'development' - }, - { - 'Key': 'service', - 'Value': 'web' - } - ] - aws_tags = kinesis_stream.recreate_tags_from_list(example) - self.assertEqual(aws_tags, should_return) - - def test_get_tags(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg, tags = kinesis_stream.get_tags(client, 'test', True) - self.assertTrue(success) - should_return = [ - { - 'Key': 'DryRunMode', - 'Value': 'true' - } - ] - self.assertEqual(tags, should_return) - - def test_find_stream(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg, stream = ( - kinesis_stream.find_stream(client, 'test', check_mode=True) - ) - should_return = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_wait_for_status(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg, stream = ( - kinesis_stream.wait_for_status( - client, 'test', 'ACTIVE', check_mode=True - ) - ) - should_return = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_tags_action_create(self): - client = boto3.client('kinesis', region_name='us-west-2') - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'create', check_mode=True - ) - ) - self.assertTrue(success) - - def test_tags_action_delete(self): - client = boto3.client('kinesis', region_name='us-west-2') - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'delete', check_mode=True - ) - ) - self.assertTrue(success) - - def test_tags_action_invalid(self): - client = boto3.client('kinesis', region_name='us-west-2') - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'append', check_mode=True - ) - ) - self.assertFalse(success) - - def test_update_tags(self): - client = boto3.client('kinesis', region_name='us-west-2') - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.update_tags( - client, 'test', tags, check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_create(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'create', check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_delete(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'delete', check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_invalid(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'append', check_mode=True - ) - ) - self.assertFalse(success) - - def test_retention_action_increase(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 48, 'increase', check_mode=True - ) - ) - self.assertTrue(success) - - def test_retention_action_decrease(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 24, 'decrease', check_mode=True - ) - ) - self.assertTrue(success) - - def test_retention_action_invalid(self): - client = boto3.client('kinesis', region_name='us-west-2') - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 24, 'create', check_mode=True - ) - ) - self.assertFalse(success) - - def test_update(self): - client = boto3.client('kinesis', region_name='us-west-2') - current_stream = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg = ( - kinesis_stream.update( - client, current_stream, 'test', retention_period=48, - tags=tags, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') - - def test_create_stream(self): - client = boto3.client('kinesis', region_name='us-west-2') - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg, results = ( - kinesis_stream.create_stream( - client, 'test', number_of_shards=10, retention_period=48, - tags=tags, check_mode=True - ) - ) - should_return = { - 'has_more_shards': True, - 'retention_period_hours': 24, - 'stream_name': 'test', - 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'stream_status': 'ACTIVE', - 'tags': tags, - } - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(results, should_return) - self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') - - -def main(): - unittest.main() - -if __name__ == '__main__': - main() From 72988aab146292e373e7a631343230f5f9964a74 Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Fri, 25 Mar 2016 12:38:10 -0700 Subject: [PATCH 05/10] updated module to accept check_mode in every boto call --- cloud/amazon/kinesis_stream.py | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py index 222a50dd0b3..6fbd1ab5402 100644 --- a/cloud/amazon/kinesis_stream.py +++ b/cloud/amazon/kinesis_stream.py @@ -368,15 +368,21 @@ def wait_for_status(client, stream_name, status, wait_timeout=300, find_success, find_msg, stream = ( find_stream(client, stream_name, check_mode=check_mode) ) - if status != 'DELETING': + if check_mode: + status_achieved = True + break + + elif status != 'DELETING': if find_success and stream: if stream.get('StreamStatus') == status: status_achieved = True break - elif status == 'DELETING': + + elif status == 'DELETING' and not check_mode: if not find_success: status_achieved = True break + else: time.sleep(polling_increment_secs) except botocore.exceptions.ClientError as e: @@ -494,7 +500,9 @@ def update_tags(client, stream_name, tags, check_mode=False): """ success = False err_msg = '' - tag_success, tag_msg, current_tags = get_tags(client, stream_name) + tag_success, tag_msg, current_tags = ( + get_tags(client, stream_name, check_mode=check_mode) + ) if current_tags: tags = make_tags_in_aws_format(tags) current_tags_set = ( @@ -765,7 +773,9 @@ def update(client, current_stream, stream_name, retention_period=None, return success, changed, err_msg if tags: - changed, err_msg = update_tags(client, stream_name, tags, check_mode) + changed, err_msg = ( + update_tags(client, stream_name, tags, check_mode=check_mode) + ) if changed: success = True if wait: @@ -783,7 +793,7 @@ def update(client, current_stream, stream_name, retention_period=None, return success, changed, err_msg def create_stream(client, stream_name, number_of_shards=1, retention_period=None, - tags=None, wait=False, wait_timeout=300, check_mode=None): + tags=None, wait=False, wait_timeout=300, check_mode=False): """Create an Amazon Kinesis Stream. Args: client (botocore.client.EC2): Boto3 client. @@ -824,13 +834,14 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait: wait_success, wait_msg, current_stream = ( wait_for_status( - client, stream_name, 'ACTIVE', wait_timeout, check_mode + client, stream_name, 'ACTIVE', wait_timeout, + check_mode=check_mode ) ) if stream_found and current_stream['StreamStatus'] != 'DELETING': success, changed, err_msg = update( client, current_stream, stream_name, retention_period, tags, - wait, wait_timeout, check_mode + wait, wait_timeout, check_mode=check_mode ) else: create_success, create_msg = ( @@ -844,7 +855,8 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None if wait: wait_success, wait_msg, results = ( wait_for_status( - client, stream_name, 'ACTIVE', wait_timeout, check_mode + client, stream_name, 'ACTIVE', wait_timeout, + check_mode=check_mode ) ) err_msg = ( @@ -938,7 +950,9 @@ def delete_stream(client, stream_name, wait=False, wait_timeout=300, changed = False err_msg = '' results = dict() - stream_found, stream_msg, current_stream = find_stream(client, stream_name) + stream_found, stream_msg, current_stream = ( + find_stream(client, stream_name, check_mode=check_mode) + ) if stream_found: success, err_msg = ( stream_action( @@ -951,7 +965,7 @@ def delete_stream(client, stream_name, wait=False, wait_timeout=300, success, err_msg, results = ( wait_for_status( client, stream_name, 'DELETING', wait_timeout, - check_mode + check_mode=check_mode ) ) err_msg = 'Stream {0} deleted successfully'.format(stream_name) From 1cc5ea7418ad61a58b127f876699149abadb02f5 Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Sun, 27 Mar 2016 16:16:52 -0700 Subject: [PATCH 06/10] Including unit tests. * Including unit tests as per https://groups.google.com/forum/#!topic/ansible-devel/ejY4CjKeC34 * This test suite is automatically run in https://github.com/linuxdynasty/ld-ansible-modules --- test/unit/cloud/amazon/test_kinesis_stream.py | 491 ++++++++++++++++++ 1 file changed, 491 insertions(+) create mode 100644 test/unit/cloud/amazon/test_kinesis_stream.py diff --git a/test/unit/cloud/amazon/test_kinesis_stream.py b/test/unit/cloud/amazon/test_kinesis_stream.py new file mode 100644 index 00000000000..5404ef99716 --- /dev/null +++ b/test/unit/cloud/amazon/test_kinesis_stream.py @@ -0,0 +1,491 @@ +#!/usr/bin/python + +import boto3 +import unittest + +from collections import namedtuple +from ansible.parsing.dataloader import DataLoader +from ansible.vars import VariableManager +from ansible.inventory import Inventory +from ansible.playbook.play import Play +from ansible.executor.task_queue_manager import TaskQueueManager + +import cloud.amazon.kinesis_stream as kinesis_stream + +Options = ( + namedtuple( + 'Options', [ + 'connection', 'module_path', 'forks', 'become', 'become_method', + 'become_user', 'remote_user', 'private_key_file', 'ssh_common_args', + 'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity', + 'check' + ] + ) +) +# initialize needed objects +variable_manager = VariableManager() +loader = DataLoader() +options = ( + Options( + connection='local', + module_path='cloud/amazon', + forks=1, become=None, become_method=None, become_user=None, check=True, + remote_user=None, private_key_file=None, ssh_common_args=None, + sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, + verbosity=10 + ) +) +passwords = dict(vault_pass='') + +aws_region = 'us-west-2' + +# create inventory and pass to var manager +inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost') +variable_manager.set_inventory(inventory) + +def run(play): + tqm = None + results = None + try: + tqm = TaskQueueManager( + inventory=inventory, + variable_manager=variable_manager, + loader=loader, + options=options, + passwords=passwords, + stdout_callback='default', + ) + results = tqm.run(play) + finally: + if tqm is not None: + tqm.cleanup() + return tqm, results + +class AnsibleKinesisStreamTasks(unittest.TestCase): + + def test_a_create_stream_1(self): + play_source = dict( + name = "Create Kinesis Stream with 10 Shards", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + args=dict( + name='stream-test', + shards=10, + wait='yes', + region=aws_region, + ) + ), + register='stream', + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 1) + + def test_a_create_stream_2(self): + play_source = dict( + name = "Create Kinesis Stream with 10 Shards and create a tag called environment", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + args=dict( + name='stream-test', + region=aws_region, + shards=10, + tags=dict( + env='development' + ), + wait='yes' + ) + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 1) + + def test_a_create_stream_3(self): + play_source = dict( + name = "Create Kinesis Stream with 10 Shards and create a tag called environment and Change the default retention period from 24 to 48", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + args=dict( + name='stream-test', + retention_period=48, + region=aws_region, + shards=10, + tags=dict( + env='development' + ), + wait='yes' + ) + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 1) + + def test_b_create_stream_1(self): + play_source = dict( + name = "Create Kinesis Stream with out specifying the number of shards", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + args=dict( + name='stream-test', + region=aws_region, + wait='yes' + ) + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.failures['localhost'] == 1) + + def test_b_create_stream_2(self): + play_source = dict( + name = "Create Kinesis Stream with specifying the retention period less than 24 hours", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + args=dict( + name='stream-test', + region=aws_region, + retention_period=23, + shards=10, + wait='yes' + ) + ), + register='stream' + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.failures['localhost'] == 1) + + def test_c_delete_stream_(self): + play_source = dict( + name = "Delete Kinesis Stream test-stream", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='kinesis_stream', + args=dict( + name='stream-test', + region=aws_region, + state='absent', + wait='yes' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 1) + + +class AnsibleKinesisStreamFunctions(unittest.TestCase): + + def test_convert_to_lower(self): + example = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + converted_example = kinesis_stream.convert_to_lower(example) + keys = converted_example.keys() + keys.sort() + for i in range(len(keys)): + if i == 0: + self.assertEqual(keys[i], 'has_more_shards') + if i == 1: + self.assertEqual(keys[i], 'retention_period_hours') + if i == 2: + self.assertEqual(keys[i], 'stream_arn') + if i == 3: + self.assertEqual(keys[i], 'stream_name') + if i == 4: + self.assertEqual(keys[i], 'stream_status') + + def test_make_tags_in_aws_format(self): + example = { + 'env': 'development' + } + should_return = [ + { + 'Key': 'env', + 'Value': 'development' + } + ] + aws_tags = kinesis_stream.make_tags_in_aws_format(example) + self.assertEqual(aws_tags, should_return) + + def test_make_tags_in_proper_format(self): + example = [ + { + 'Key': 'env', + 'Value': 'development' + }, + { + 'Key': 'service', + 'Value': 'web' + } + ] + should_return = { + 'env': 'development', + 'service': 'web' + } + proper_tags = kinesis_stream.make_tags_in_proper_format(example) + self.assertEqual(proper_tags, should_return) + + def test_recreate_tags_from_list(self): + example = [('environment', 'development'), ('service', 'web')] + should_return = [ + { + 'Key': 'environment', + 'Value': 'development' + }, + { + 'Key': 'service', + 'Value': 'web' + } + ] + aws_tags = kinesis_stream.recreate_tags_from_list(example) + self.assertEqual(aws_tags, should_return) + + def test_get_tags(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg, tags = kinesis_stream.get_tags(client, 'test', True) + self.assertTrue(success) + should_return = [ + { + 'Key': 'DryRunMode', + 'Value': 'true' + } + ] + self.assertEqual(tags, should_return) + + def test_find_stream(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg, stream = ( + kinesis_stream.find_stream(client, 'test', check_mode=True) + ) + should_return = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_wait_for_status(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg, stream = ( + kinesis_stream.wait_for_status( + client, 'test', 'ACTIVE', check_mode=True + ) + ) + should_return = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_tags_action_create(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'create', check_mode=True + ) + ) + self.assertTrue(success) + + def test_tags_action_delete(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'delete', check_mode=True + ) + ) + self.assertTrue(success) + + def test_tags_action_invalid(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'append', check_mode=True + ) + ) + self.assertFalse(success) + + def test_update_tags(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.update_tags( + client, 'test', tags, check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_create(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'create', check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_delete(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'delete', check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_invalid(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'append', check_mode=True + ) + ) + self.assertFalse(success) + + def test_retention_action_increase(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 48, 'increase', check_mode=True + ) + ) + self.assertTrue(success) + + def test_retention_action_decrease(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 24, 'decrease', check_mode=True + ) + ) + self.assertTrue(success) + + def test_retention_action_invalid(self): + client = boto3.client('kinesis', region_name='us-west-2') + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 24, 'create', check_mode=True + ) + ) + self.assertFalse(success) + + def test_update(self): + client = boto3.client('kinesis', region_name='us-west-2') + current_stream = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg = ( + kinesis_stream.update( + client, current_stream, 'test', retention_period=48, + tags=tags, check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') + + def test_create_stream(self): + client = boto3.client('kinesis', region_name='us-west-2') + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg, results = ( + kinesis_stream.create_stream( + client, 'test', number_of_shards=10, retention_period=48, + tags=tags, check_mode=True + ) + ) + should_return = { + 'has_more_shards': True, + 'retention_period_hours': 24, + 'stream_name': 'test', + 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'stream_status': 'ACTIVE', + 'tags': tags, + } + self.assertTrue(success) + self.assertTrue(changed) + self.assertEqual(results, should_return) + self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') + + +def main(): + unittest.main() + +if __name__ == '__main__': + main() From e9fcb8b28620268bc57407b010bfbecd08eb8dd1 Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Mon, 28 Mar 2016 07:39:15 -0700 Subject: [PATCH 07/10] Removed Ansible API based tests from this PR --- test/unit/cloud/amazon/test_kinesis_stream.py | 236 ++---------------- 1 file changed, 15 insertions(+), 221 deletions(-) diff --git a/test/unit/cloud/amazon/test_kinesis_stream.py b/test/unit/cloud/amazon/test_kinesis_stream.py index 5404ef99716..280ec5e2de6 100644 --- a/test/unit/cloud/amazon/test_kinesis_stream.py +++ b/test/unit/cloud/amazon/test_kinesis_stream.py @@ -3,216 +3,10 @@ import boto3 import unittest -from collections import namedtuple -from ansible.parsing.dataloader import DataLoader -from ansible.vars import VariableManager -from ansible.inventory import Inventory -from ansible.playbook.play import Play -from ansible.executor.task_queue_manager import TaskQueueManager - import cloud.amazon.kinesis_stream as kinesis_stream -Options = ( - namedtuple( - 'Options', [ - 'connection', 'module_path', 'forks', 'become', 'become_method', - 'become_user', 'remote_user', 'private_key_file', 'ssh_common_args', - 'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity', - 'check' - ] - ) -) -# initialize needed objects -variable_manager = VariableManager() -loader = DataLoader() -options = ( - Options( - connection='local', - module_path='cloud/amazon', - forks=1, become=None, become_method=None, become_user=None, check=True, - remote_user=None, private_key_file=None, ssh_common_args=None, - sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, - verbosity=10 - ) -) -passwords = dict(vault_pass='') - aws_region = 'us-west-2' -# create inventory and pass to var manager -inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost') -variable_manager.set_inventory(inventory) - -def run(play): - tqm = None - results = None - try: - tqm = TaskQueueManager( - inventory=inventory, - variable_manager=variable_manager, - loader=loader, - options=options, - passwords=passwords, - stdout_callback='default', - ) - results = tqm.run(play) - finally: - if tqm is not None: - tqm.cleanup() - return tqm, results - -class AnsibleKinesisStreamTasks(unittest.TestCase): - - def test_a_create_stream_1(self): - play_source = dict( - name = "Create Kinesis Stream with 10 Shards", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - args=dict( - name='stream-test', - shards=10, - wait='yes', - region=aws_region, - ) - ), - register='stream', - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 1) - - def test_a_create_stream_2(self): - play_source = dict( - name = "Create Kinesis Stream with 10 Shards and create a tag called environment", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - args=dict( - name='stream-test', - region=aws_region, - shards=10, - tags=dict( - env='development' - ), - wait='yes' - ) - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 1) - - def test_a_create_stream_3(self): - play_source = dict( - name = "Create Kinesis Stream with 10 Shards and create a tag called environment and Change the default retention period from 24 to 48", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - args=dict( - name='stream-test', - retention_period=48, - region=aws_region, - shards=10, - tags=dict( - env='development' - ), - wait='yes' - ) - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 1) - - def test_b_create_stream_1(self): - play_source = dict( - name = "Create Kinesis Stream with out specifying the number of shards", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - args=dict( - name='stream-test', - region=aws_region, - wait='yes' - ) - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.failures['localhost'] == 1) - - def test_b_create_stream_2(self): - play_source = dict( - name = "Create Kinesis Stream with specifying the retention period less than 24 hours", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - args=dict( - name='stream-test', - region=aws_region, - retention_period=23, - shards=10, - wait='yes' - ) - ), - register='stream' - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.failures['localhost'] == 1) - - def test_c_delete_stream_(self): - play_source = dict( - name = "Delete Kinesis Stream test-stream", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='kinesis_stream', - args=dict( - name='stream-test', - region=aws_region, - state='absent', - wait='yes' - ) - ) - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 1) - class AnsibleKinesisStreamFunctions(unittest.TestCase): @@ -286,7 +80,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertEqual(aws_tags, should_return) def test_get_tags(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg, tags = kinesis_stream.get_tags(client, 'test', True) self.assertTrue(success) should_return = [ @@ -298,7 +92,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertEqual(tags, should_return) def test_find_stream(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg, stream = ( kinesis_stream.find_stream(client, 'test', check_mode=True) ) @@ -313,7 +107,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertEqual(stream, should_return) def test_wait_for_status(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg, stream = ( kinesis_stream.wait_for_status( client, 'test', 'ACTIVE', check_mode=True @@ -330,7 +124,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertEqual(stream, should_return) def test_tags_action_create(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' @@ -343,7 +137,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertTrue(success) def test_tags_action_delete(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' @@ -356,7 +150,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertTrue(success) def test_tags_action_invalid(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' @@ -369,7 +163,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertFalse(success) def test_update_tags(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' @@ -382,7 +176,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertTrue(success) def test_stream_action_create(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.stream_action( client, 'test', 10, 'create', check_mode=True @@ -391,7 +185,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertTrue(success) def test_stream_action_delete(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.stream_action( client, 'test', 10, 'delete', check_mode=True @@ -400,7 +194,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertTrue(success) def test_stream_action_invalid(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.stream_action( client, 'test', 10, 'append', check_mode=True @@ -409,7 +203,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertFalse(success) def test_retention_action_increase(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.retention_action( client, 'test', 48, 'increase', check_mode=True @@ -418,7 +212,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertTrue(success) def test_retention_action_decrease(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.retention_action( client, 'test', 24, 'decrease', check_mode=True @@ -427,7 +221,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertTrue(success) def test_retention_action_invalid(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.retention_action( client, 'test', 24, 'create', check_mode=True @@ -436,7 +230,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertFalse(success) def test_update(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) current_stream = { 'HasMoreShards': True, 'RetentionPeriodHours': 24, @@ -459,7 +253,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') def test_create_stream(self): - client = boto3.client('kinesis', region_name='us-west-2') + client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' From 7cacd7dd2ace6543fc84c9a2937b962fb4af4ce3 Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Fri, 8 Apr 2016 17:37:12 -0700 Subject: [PATCH 08/10] Module requires boto due to ec2.py --- cloud/amazon/kinesis_stream.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py index 6fbd1ab5402..9e46c829c94 100644 --- a/cloud/amazon/kinesis_stream.py +++ b/cloud/amazon/kinesis_stream.py @@ -147,6 +147,7 @@ tags: ''' try: + import boto import botocore import boto3 HAS_BOTO3 = True From 8a17506058dc322f5a85b6ff60fa3e5b86e088de Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Wed, 4 May 2016 16:05:37 -0700 Subject: [PATCH 09/10] version bump --- cloud/amazon/kinesis_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py index 9e46c829c94..a9139053894 100644 --- a/cloud/amazon/kinesis_stream.py +++ b/cloud/amazon/kinesis_stream.py @@ -21,7 +21,7 @@ description: - Create or Delete a Kinesis Stream. - Update the retention period of a Kinesis Stream. - Update Tags on a Kinesis Stream. -version_added: "2.1" +version_added: "2.2" author: Allen Sanabria (@linuxdynasty) options: name: From e9f7fb092754e6a2aa1c540bc39d963594b141dd Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Thu, 14 Jul 2016 16:37:06 -0700 Subject: [PATCH 10/10] Now when number of shards is different than what is the stream currently, it will fail.\n\nShards can not be changed on an already created stream --- cloud/amazon/kinesis_stream.py | 79 +++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py index a9139053894..1ba25e69860 100644 --- a/cloud/amazon/kinesis_stream.py +++ b/cloud/amazon/kinesis_stream.py @@ -147,7 +147,6 @@ tags: ''' try: - import boto import botocore import boto3 HAS_BOTO3 = True @@ -285,20 +284,18 @@ def get_tags(client, stream_name, check_mode=False): }, ] success = True - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg, results -def find_stream(client, stream_name, limit=1, check_mode=False): +def find_stream(client, stream_name, check_mode=False): """Retrieve a Kinesis Stream. Args: client (botocore.client.EC2): Boto3 client. stream_name (str): Name of the Kinesis stream. Kwargs: - limit (int): Limit the number of shards to return within a stream. - default=1 check_mode (bool): This will pass DryRun as one of the parameters to the aws api. default=False @@ -313,15 +310,20 @@ def find_stream(client, stream_name, limit=1, check_mode=False): success = False params = { 'StreamName': stream_name, - 'Limit': limit } results = dict() + has_more_shards = True + shards = list() try: if not check_mode: - results = ( - client.describe_stream(**params)['StreamDescription'] - ) - results.pop('Shards') + while has_more_shards: + results = ( + client.describe_stream(**params)['StreamDescription'] + ) + shards.extend(results.pop('Shards')) + has_more_shards = results['HasMoreShards'] + results['Shards'] = shards + results['ShardsCount'] = len(shards) else: results = { 'HasMoreShards': True, @@ -331,7 +333,7 @@ def find_stream(client, stream_name, limit=1, check_mode=False): 'StreamStatus': 'ACTIVE' } success = True - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg, results @@ -391,6 +393,8 @@ def wait_for_status(client, stream_name, status, wait_timeout=300, if not status_achieved: err_msg = "Wait time out reached, while waiting for results" + else: + err_msg = "Status {0} achieved successfully".format(status) return status_achieved, err_msg, stream @@ -442,7 +446,7 @@ def tags_action(client, stream_name, tags, action='create', check_mode=False): else: err_msg = 'Invalid action {0}'.format(action) - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg @@ -500,6 +504,7 @@ def update_tags(client, stream_name, tags, check_mode=False): Tuple (bool, str) """ success = False + changed = False err_msg = '' tag_success, tag_msg, current_tags = ( get_tags(client, stream_name, check_mode=check_mode) @@ -536,13 +541,13 @@ def update_tags(client, stream_name, tags, check_mode=False): ) ) if not delete_success: - return delete_success, delete_msg + return delete_success, changed, delete_msg if tags_to_update: tags = make_tags_in_proper_format( recreate_tags_from_list(tags_to_update) ) else: - return True, 'Tags do not need to be updated' + return True, changed, 'Tags do not need to be updated' if tags: create_success, create_msg = ( @@ -551,9 +556,11 @@ def update_tags(client, stream_name, tags, check_mode=False): check_mode=check_mode ) ) - return create_success, create_msg + if create_success: + changed = True + return create_success, changed, create_msg - return success, err_msg + return success, changed, err_msg def stream_action(client, stream_name, shard_count=1, action='create', timeout=300, check_mode=False): @@ -603,7 +610,7 @@ def stream_action(client, stream_name, shard_count=1, action='create', else: err_msg = 'Invalid action {0}'.format(action) - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg @@ -644,10 +651,18 @@ def retention_action(client, stream_name, retention_period=24, params['RetentionPeriodHours'] = retention_period client.increase_stream_retention_period(**params) success = True + err_msg = ( + 'Retention Period increased successfully to {0}' + .format(retention_period) + ) elif action == 'decrease': params['RetentionPeriodHours'] = retention_period client.decrease_stream_retention_period(**params) success = True + err_msg = ( + 'Retention Period decreased successfully to {0}' + .format(retention_period) + ) else: err_msg = 'Invalid action {0}'.format(action) else: @@ -658,7 +673,7 @@ def retention_action(client, stream_name, retention_period=24, else: err_msg = 'Invalid action {0}'.format(action) - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg @@ -698,7 +713,7 @@ def update(client, current_stream, stream_name, retention_period=None, Returns: Tuple (bool, bool, str) """ - success = False + success = True changed = False err_msg = '' if retention_period: @@ -710,9 +725,10 @@ def update(client, current_stream, stream_name, retention_period=None, ) ) if not wait_success: - return wait_success, True, wait_msg + return wait_success, False, wait_msg if current_stream['StreamStatus'] == 'ACTIVE': + retention_changed = False if retention_period > current_stream['RetentionPeriodHours']: retention_changed, retention_msg = ( retention_action( @@ -720,8 +736,6 @@ def update(client, current_stream, stream_name, retention_period=None, check_mode=check_mode ) ) - if retention_changed: - success = True elif retention_period < current_stream['RetentionPeriodHours']: retention_changed, retention_msg = ( @@ -730,11 +744,8 @@ def update(client, current_stream, stream_name, retention_period=None, check_mode=check_mode ) ) - if retention_changed: - success = True elif retention_period == current_stream['RetentionPeriodHours']: - retention_changed = False retention_msg = ( 'Retention {0} is the same as {1}' .format( @@ -744,7 +755,10 @@ def update(client, current_stream, stream_name, retention_period=None, ) success = True - changed = retention_changed + if retention_changed: + success = True + changed = True + err_msg = retention_msg if changed and wait: wait_success, wait_msg, current_stream = ( @@ -754,7 +768,7 @@ def update(client, current_stream, stream_name, retention_period=None, ) ) if not wait_success: - return wait_success, True, wait_msg + return wait_success, False, wait_msg elif changed and not wait: stream_found, stream_msg, current_stream = ( find_stream(client, stream_name, check_mode=check_mode) @@ -774,11 +788,9 @@ def update(client, current_stream, stream_name, retention_period=None, return success, changed, err_msg if tags: - changed, err_msg = ( + _, _, err_msg = ( update_tags(client, stream_name, tags, check_mode=check_mode) ) - if changed: - success = True if wait: success, err_msg, _ = ( wait_for_status( @@ -832,6 +844,11 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None stream_found, stream_msg, current_stream = ( find_stream(client, stream_name, check_mode=check_mode) ) + if stream_found: + if current_stream['ShardsCount'] != number_of_shards: + err_msg = 'Can not change the number of shards in a Kinesis Stream' + return success, changed, err_msg, results + if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait: wait_success, wait_msg, current_stream = ( wait_for_status( @@ -1031,7 +1048,7 @@ def main(): region=region, endpoint=ec2_url, **aws_connect_kwargs ) ) - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = 'Boto3 Client Error - {0}'.format(str(e.msg)) module.fail_json( success=False, changed=False, result={}, msg=err_msg