From 0d8eefe197acc4f6e42c768f151e3f079c8a3c43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jos=C3=A9=20Pando?= Date: Mon, 23 May 2016 11:39:20 -0400 Subject: [PATCH] fixup sns topic subscriptions (#2232) * fixup sns topic subscriptions * return docs --- cloud/amazon/sns_topic.py | 422 ++++++++++++++++++++++---------------- 1 file changed, 241 insertions(+), 181 deletions(-) mode change 100755 => 100644 cloud/amazon/sns_topic.py diff --git a/cloud/amazon/sns_topic.py b/cloud/amazon/sns_topic.py old mode 100755 new mode 100644 index 2161fdaed94..f4916693edc --- a/cloud/amazon/sns_topic.py +++ b/cloud/amazon/sns_topic.py @@ -97,35 +97,29 @@ EXAMPLES = """ """ RETURN = ''' -topic_created: - description: Whether the topic was newly created - type: bool - returned: changed and state == present - sample: True - -attributes_set: - description: The attributes which were changed - type: list - returned: state == "present" - sample: ["policy", "delivery_policy"] - -subscriptions_added: - description: The subscriptions added to the topic - type: list - returned: state == "present" - sample: [["sms", "my_mobile_number"], ["sms", "my_mobile_2"]] - -subscriptions_deleted: - description: The subscriptions deleted from the topic - type: list - returned: state == "present" - sample: [["sms", "my_mobile_number"], ["sms", "my_mobile_2"]] - sns_arn: description: The ARN of the topic you are modifying type: string - returned: state == "present" sample: "arn:aws:sns:us-east-1:123456789012:my_topic_name" + +sns_topic: + description: Dict of sns topic details + type: dict + sample: + name: sns-topic-name + state: present + display_name: default + policy: {} + delivery_policy: {} + subscriptions_new: [] + subscriptions_existing: [] + subscriptions_deleted: [] + subscriptions_added: [] + subscriptions_purge': false + check_mode: false + topic_created: false + topic_deleted: false + attributes_set: [] ''' import sys @@ -141,38 +135,210 @@ except ImportError: HAS_BOTO = False -def canonicalize_endpoint(protocol, endpoint): - if protocol == 'sms': - import re - return re.sub('[^0-9]*', '', endpoint) - return endpoint - - -def get_all_topics(connection, module): - next_token = None - topics = [] - while True: +class SnsTopicManager(object): + """ Handles SNS Topic creation and destruction """ + + def __init__(self, + module, + name, + state, + display_name, + policy, + delivery_policy, + subscriptions, + purge_subscriptions, + check_mode, + region, + **aws_connect_params): + + self.region = region + self.aws_connect_params = aws_connect_params + self.connection = self._get_boto_connection() + self.changed = False + self.module = module + self.name = name + self.state = state + self.display_name = display_name + self.policy = policy + self.delivery_policy = delivery_policy + self.subscriptions = subscriptions + self.subscriptions_existing = [] + self.subscriptions_deleted = [] + self.subscriptions_added = [] + self.purge_subscriptions = purge_subscriptions + self.check_mode = check_mode + self.topic_created = False + self.topic_deleted = False + self.arn_topic = None + self.attributes_set = [] + + def _get_boto_connection(self): try: - response = connection.get_all_topics(next_token) - except BotoServerError, e: - module.fail_json(msg=e.message) + return connect_to_aws(boto.sns, self.region, + **self.aws_connect_params) + except BotoServerError, err: + self.module.fail_json(msg=err.message) + + def _get_all_topics(self): + next_token = None + topics = [] + while True: + try: + response = self.connection.get_all_topics(next_token) + except BotoServerError, err: + module.fail_json(msg=err.message) + topics.extend(response['ListTopicsResponse']['ListTopicsResult']['Topics']) + next_token = response['ListTopicsResponse']['ListTopicsResult']['NextToken'] + if not next_token: + break + return [t['TopicArn'] for t in topics] + + + def _arn_topic_lookup(self): + # topic names cannot have colons, so this captures the full topic name + all_topics = self._get_all_topics() + lookup_topic = ':%s' % self.name + for topic in all_topics: + if topic.endswith(lookup_topic): + return topic + + + def _create_topic(self): + self.changed = True + self.topic_created = True + if not self.check_mode: + self.connection.create_topic(self.name) + self.arn_topic = self._arn_topic_lookup() + while not self.arn_topic: + time.sleep(3) + self.arn_topic = self._arn_topic_lookup() - topics.extend(response['ListTopicsResponse']['ListTopicsResult']['Topics']) - next_token = \ - response['ListTopicsResponse']['ListTopicsResult']['NextToken'] - if not next_token: - break - return [t['TopicArn'] for t in topics] + def _set_topic_attrs(self): + topic_attributes = self.connection.get_topic_attributes(self.arn_topic) \ + ['GetTopicAttributesResponse'] ['GetTopicAttributesResult'] \ + ['Attributes'] + + if self.display_name and self.display_name != topic_attributes['DisplayName']: + self.changed = True + self.attributes_set.append('display_name') + if not self.check_mode: + self.connection.set_topic_attributes(self.arn_topic, 'DisplayName', + self.display_name) + + if self.policy and self.policy != json.loads(topic_attributes['Policy']): + self.changed = True + self.attributes_set.append('policy') + if not self.check_mode: + self.connection.set_topic_attributes(self.arn_topic, 'Policy', + json.dumps(self.policy)) + + if self.delivery_policy and ('DeliveryPolicy' not in topic_attributes or \ + self.delivery_policy != json.loads(topic_attributes['DeliveryPolicy'])): + self.changed = True + self.attributes_set.append('delivery_policy') + if not self.check_mode: + self.connection.set_topic_attributes(self.arn_topic, 'DeliveryPolicy', + json.dumps(self.delivery_policy)) + + + def _canonicalize_endpoint(self, protocol, endpoint): + if protocol == 'sms': + return re.sub('[^0-9]*', '', endpoint) + return endpoint + + + def _get_topic_subs(self): + next_token = None + while True: + response = self.connection.get_all_subscriptions_by_topic(self.arn_topic, next_token) + self.subscriptions_existing.extend(response['ListSubscriptionsByTopicResponse'] \ + ['ListSubscriptionsByTopicResult']['Subscriptions']) + next_token = response['ListSubscriptionsByTopicResponse'] \ + ['ListSubscriptionsByTopicResult']['NextToken'] + if not next_token: + break + + def _set_topic_subs(self): + subscriptions_existing_list = [] + desired_subscriptions = [(sub['protocol'], + self._canonicalize_endpoint(sub['protocol'], sub['endpoint'])) for sub in + self.subscriptions] + + if self.subscriptions_existing: + for sub in self.subscriptions_existing: + sub_key = (sub['Protocol'], sub['Endpoint']) + subscriptions_existing_list.append(sub_key) + if self.purge_subscriptions and sub_key not in desired_subscriptions and \ + sub['SubscriptionArn'] != 'PendingConfirmation': + self.changed = True + self.subscriptions_deleted.append(sub_key) + if not self.check_mode: + self.connection.unsubscribe(sub['SubscriptionArn']) + + for (protocol, endpoint) in desired_subscriptions: + if (protocol, endpoint) not in subscriptions_existing_list: + self.changed = True + self.subscriptions_added.append(sub) + if not self.check_mode: + self.connection.subscribe(self.arn_topic, protocol, endpoint) + + + def _delete_subscriptions(self): + # NOTE: subscriptions in 'PendingConfirmation' timeout in 3 days + # https://forums.aws.amazon.com/thread.jspa?threadID=85993 + for sub in self.subscriptions_existing: + if sub['SubscriptionArn'] != 'PendingConfirmation': + self.subscriptions_deleted.append(sub['SubscriptionArn']) + self.changed = True + if not self.check_mode: + self.connection.unsubscribe(sub['SubscriptionArn']) + + + def _delete_topic(self): + self.topic_deleted = True + self.changed = True + if not self.check_mode: + self.connection.delete_topic(self.arn_topic) + + + def ensure_ok(self): + self.arn_topic = self._arn_topic_lookup() + if not self.arn_topic: + self._create_topic() + self._set_topic_attrs() + self._get_topic_subs() + self._set_topic_subs() + + def ensure_gone(self): + self.arn_topic = self._arn_topic_lookup() + if self.arn_topic: + self._get_topic_subs() + if self.subscriptions_existing: + self._delete_subscriptions() + self._delete_topic() + + + def get_info(self): + info = { + 'name': self.name, + 'state': self.state, + 'display_name': self.display_name, + 'policy': self.policy, + 'delivery_policy': self.delivery_policy, + 'subscriptions_new': self.subscriptions, + 'subscriptions_existing': self.subscriptions_existing, + 'subscriptions_deleted': self.subscriptions_deleted, + 'subscriptions_added': self.subscriptions_added, + 'subscriptions_purge': self.purge_subscriptions, + 'check_mode': self.check_mode, + 'topic_created': self.topic_created, + 'topic_deleted': self.topic_deleted, + 'attributes_set': self.attributes_set + } + + return info -def arn_topic_lookup(connection, short_topic, module): - # topic names cannot have colons, so this captures the full topic name - all_topics = get_all_topics(connection, module) - lookup_topic = ':%s' % short_topic - for topic in all_topics: - if topic.endswith(lookup_topic): - return topic - return None def main(): @@ -190,7 +356,8 @@ def main(): ) ) - module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True) + module = AnsibleModule(argument_spec=argument_spec, + supports_check_mode=True) if not HAS_BOTO: module.fail_json(msg='boto required for this module') @@ -203,142 +370,35 @@ def main(): subscriptions = module.params.get('subscriptions') purge_subscriptions = module.params.get('purge_subscriptions') check_mode = module.check_mode - changed = False - - topic_created = False - attributes_set = [] - subscriptions_added = [] - subscriptions_deleted = [] region, ec2_url, aws_connect_params = get_aws_connection_info(module) if not region: module.fail_json(msg="region must be specified") - try: - connection = connect_to_aws(boto.sns, region, **aws_connect_params) - except boto.exception.NoAuthHandlerFound, e: - module.fail_json(msg=str(e)) - - # topics cannot contain ':', so thats the decider - if ':' in name: - all_topics = get_all_topics(connection, module) - if name in all_topics: - arn_topic = name - elif state == 'absent': - module.exit_json(changed=False) - else: - module.fail_json(msg="specified an ARN for a topic but it doesn't" - " exist") - else: - arn_topic = arn_topic_lookup(connection, name, module) - if not arn_topic: - if state == 'absent': - module.exit_json(changed=False) - elif check_mode: - module.exit_json(changed=True, topic_created=True, - subscriptions_added=subscriptions, - subscriptions_deleted=[]) - - changed=True - topic_created = True - try: - connection.create_topic(name) - except BotoServerError, e: - module.fail_json(msg=e.message) - arn_topic = arn_topic_lookup(connection, name, module) - while not arn_topic: - time.sleep(3) - arn_topic = arn_topic_lookup(connection, name, module) - - if arn_topic and state == "absent": - if not check_mode: - try: - connection.delete_topic(arn_topic) - except BotoServerError, e: - module.fail_json(msg=e.message) - module.exit_json(changed=True) + sns_topic = SnsTopicManager(module, + name, + state, + display_name, + policy, + delivery_policy, + subscriptions, + purge_subscriptions, + check_mode, + region, + **aws_connect_params) - topic_attributes = connection.get_topic_attributes(arn_topic) \ - ['GetTopicAttributesResponse'] ['GetTopicAttributesResult'] \ - ['Attributes'] - if display_name and display_name != topic_attributes['DisplayName']: - changed = True - attributes_set.append('display_name') - if not check_mode: - try: - connection.set_topic_attributes(arn_topic, 'DisplayName', display_name) - except BotoServerError, e: - module.fail_json(msg=e.message) - - if policy and policy != json.loads(topic_attributes['Policy']): - changed = True - attributes_set.append('policy') - if not check_mode: - try: - connection.set_topic_attributes(arn_topic, 'Policy', json.dumps(policy)) - except BotoServerError, e: - module.fail_json(msg=e.message) - - if delivery_policy and ('DeliveryPolicy' not in topic_attributes or \ - delivery_policy != json.loads(topic_attributes['DeliveryPolicy'])): - changed = True - attributes_set.append('delivery_policy') - if not check_mode: - try: - connection.set_topic_attributes(arn_topic, 'DeliveryPolicy',json.dumps(delivery_policy)) - except BotoServerError, e: - module.fail_json(msg=e.message) + if state == 'present': + sns_topic.ensure_ok() + elif state == 'absent': + sns_topic.ensure_gone() - next_token = None - aws_subscriptions = [] - while True: - try: - response = connection.get_all_subscriptions_by_topic(arn_topic, - next_token) - except BotoServerError, e: - module.fail_json(msg=e.message) + sns_facts = dict(changed=sns_topic.changed, + sns_arn=sns_topic.arn_topic, + sns_topic=sns_topic.get_info()) + + module.exit_json(**sns_facts) - aws_subscriptions.extend(response['ListSubscriptionsByTopicResponse'] \ - ['ListSubscriptionsByTopicResult']['Subscriptions']) - next_token = response['ListSubscriptionsByTopicResponse'] \ - ['ListSubscriptionsByTopicResult']['NextToken'] - if not next_token: - break - - desired_subscriptions = [(sub['protocol'], - canonicalize_endpoint(sub['protocol'], sub['endpoint'])) for sub in - subscriptions] - - aws_subscriptions_list = [] - - for sub in aws_subscriptions: - sub_key = (sub['Protocol'], sub['Endpoint']) - aws_subscriptions_list.append(sub_key) - if purge_subscriptions and sub_key not in desired_subscriptions and \ - sub['SubscriptionArn'] != 'PendingConfirmation': - changed = True - subscriptions_deleted.append(sub_key) - if not check_mode: - try: - connection.unsubscribe(sub['SubscriptionArn']) - except BotoServerError, e: - module.fail_json(msg=e.message) - - for (protocol, endpoint) in desired_subscriptions: - if (protocol, endpoint) not in aws_subscriptions_list: - changed = True - subscriptions_added.append(sub) - if not check_mode: - try: - connection.subscribe(arn_topic, protocol, endpoint) - except BotoServerError, e: - module.fail_json(msg=e.message) - - module.exit_json(changed=changed, topic_created=topic_created, - attributes_set=attributes_set, - subscriptions_added=subscriptions_added, - subscriptions_deleted=subscriptions_deleted, sns_arn=arn_topic) from ansible.module_utils.basic import * from ansible.module_utils.ec2 import *