From c2edc8a26407b6c5528e31e4b5fbb1126530548b Mon Sep 17 00:00:00 2001 From: Tom Melendez Date: Thu, 26 Jan 2017 13:16:52 -0800 Subject: [PATCH] [GCE] Google Cloud Pubsub Module (#19091) * Google Cloud Pubsub Module The Google Cloud Pubsub module allows the Ansible user to: * Create/Delete Topics * Create/Delete Subscriptions * Change subscription from pull to push (and configure endpoint) * Publish messages to a topic * Pull messages from a Subscription An accessory module, gcpubsub_facts, has been added to list topics and subscriptions. * Added docs for state field to DOCUMENTATION and RETURN blocks. --- lib/ansible/module_utils/gcp.py | 61 ++-- lib/ansible/modules/cloud/google/gcpubsub.py | 325 ++++++++++++++++++ .../modules/cloud/google/gcpubsub_facts.py | 150 ++++++++ test/units/module_utils/gcp/test_utils.py | 41 +++ 4 files changed, 555 insertions(+), 22 deletions(-) create mode 100644 lib/ansible/modules/cloud/google/gcpubsub.py create mode 100644 lib/ansible/modules/cloud/google/gcpubsub_facts.py create mode 100644 test/units/module_utils/gcp/test_utils.py diff --git a/lib/ansible/module_utils/gcp.py b/lib/ansible/module_utils/gcp.py index 3e662d3bf14..227ed74aee2 100644 --- a/lib/ansible/module_utils/gcp.py +++ b/lib/ansible/module_utils/gcp.py @@ -39,22 +39,23 @@ try: except ImportError: HAS_LIBCLOUD_BASE = False +# google-auth +try: + import google.auth + from google.oauth2 import service_account + HAS_GOOGLE_AUTH = True +except ImportError as e: + HAS_GOOGLE_AUTH = False + # google-python-api try: + import google_auth_httplib2 from httplib2 import Http - from oauth2client.service_account import ServiceAccountCredentials from googleapiclient.http import set_user_agent HAS_GOOGLE_API_LIB = True except ImportError: HAS_GOOGLE_API_LIB = False -# google-cloud-python -try: - import google.cloud.core as _GOOGLE_CLOUD_CORE_CHECK__ - from httplib2 import Http - HAS_GOOGLE_CLOUD_CORE = True -except ImportError: - HAS_GOOGLE_CLOUD_CORE = False # Ansible Display object for warnings try: @@ -195,7 +196,7 @@ def _get_gcp_credentials(module, require_valid_json=True, check_libcloud=False): return None else: if credentials_file is None or project_id is None: - module.fail_json(msg=('GCP connection error: enable to determine project (%s) or' + module.fail_json(msg=('GCP connection error: unable to determine project (%s) or ' 'credentials file (%s)' % (project_id, credentials_file))) # ensure the credentials file is found and is in the proper format. @@ -300,16 +301,23 @@ def get_google_cloud_credentials(module, scopes=[]): params dict {'service_account_email': '...', 'credentials_file': '...', 'project_id': ...} :rtype: ``tuple`` """ - creds = _get_gcp_credentials(module, + if not HAS_GOOGLE_AUTH: + module.fail_json(msg='Please install google-auth.') + + conn_params = _get_gcp_credentials(module, require_valid_json=True, check_libcloud=False) try: - credentials = ServiceAccountCredentials.from_json_keyfile_name( - creds['credentials_file'], - scopes=scopes - ) + if conn_params['credentials_file']: + credentials = service_account.Credentials.from_service_account_file( + conn_params['credentials_file']) + if scopes: + credentials = credentials.with_scopes(scopes) + else: + credentials = google.auth.default( + scopes=scopes)[0] - return (credentials, creds) + return (credentials, conn_params) except Exception as e: module.fail_json(msg=unexpected_error_msg(e), changed=False) return (None, None) @@ -318,10 +326,10 @@ def get_google_api_auth(module, scopes=[], user_agent_product='ansible-python-ap """ Authentication for use with google-python-api-client. - Function calls _get_gcp_credentials, which attempts to assemble the credentials from various locations. - Next it attempts to authenticate with Google. + Function calls get_google_cloud_credentials, which attempts to assemble the credentials + from various locations. Next it attempts to authenticate with Google. - This function returns an httplib2 object that can be provided to the Google Python API client. + This function returns an httplib2 (compatible) object that can be provided to the Google Python API client. For libcloud, don't use this function, use gcp_connect instead. For Google Cloud, See get_google_cloud_credentials for how to connect. @@ -330,7 +338,7 @@ def get_google_api_auth(module, scopes=[], user_agent_product='ansible-python-ap U(https://cloud.google.com/apis/docs/client-libraries-explained#google_api_client_libraries) Google API example: - http_auth, conn_params = gcp_api_auth(module, scopes, user_agent_product, user_agent_version) + http_auth, conn_params = get_google_api_auth(module, scopes, user_agent_product, user_agent_version) service = build('myservice', 'v1', http=http_auth) ... @@ -356,15 +364,24 @@ def get_google_api_auth(module, scopes=[], user_agent_product='ansible-python-ap if not scopes: scopes = ['https://www.googleapis.com/auth/cloud-platform'] try: - (credentials, conn_params) = get_google_credentials(module, scopes, - require_valid_json=True, check_libcloud=False) + (credentials, conn_params) = get_google_cloud_credentials(module, scopes) http = set_user_agent(Http(), '%s-%s' % (user_agent_product, user_agent_version)) - http_auth = credentials.authorize(http) + http_auth = google_auth_httplib2.AuthorizedHttp(credentials, http=http) + return (http_auth, conn_params) except Exception as e: module.fail_json(msg=unexpected_error_msg(e), changed=False) return (None, None) +def check_min_pkg_version(pkg_name, minimum_version): + """Minimum required version is >= installed version.""" + from pkg_resources import get_distribution + try: + installed_version = get_distribution(pkg_name).version + return LooseVersion(installed_version) >= minimum_version + except Exception as e: + return False + def unexpected_error_msg(error): """Create an error string based on passed in error.""" return 'Unexpected response: (%s). Detail: %s' % (str(error), traceback.format_exc(error)) diff --git a/lib/ansible/modules/cloud/google/gcpubsub.py b/lib/ansible/modules/cloud/google/gcpubsub.py new file mode 100644 index 00000000000..027629f7407 --- /dev/null +++ b/lib/ansible/modules/cloud/google/gcpubsub.py @@ -0,0 +1,325 @@ +#!/usr/bin/python +# Copyright 2016 Google Inc. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +DOCUMENTATION = ''' +--- +module: gcpubsub +version_added: "2.3" +short_description: Create and Delete Topics/Subscriptions, Publish and pull messages on PubSub. +description: + - Create and Delete Topics/Subscriptions, Publish and pull messages on PubSub. + See U(https://cloud.google.com/pubsub/docs) for an overview. +requirements: + - "python >= 2.6" + - "google-auth >= 0.5.0" + - "google-cloud-pubsub >= 0.22.0" +notes: + - Subscription pull happens before publish. You cannot publish and pull in the same task. +author: + - "Tom Melendez (@supertom) " +options: + topic: + description: + - GCP pubsub topic name. Only the name, not the full path, is required. + required: True + subscription: + description: + - Dictionary containing a subscripton name associated with a topic (required), along with optional ack_deadline, push_endpoint and pull. For pulling from a subscription, message_ack (bool), max_messages (int) and return_immediate are available as subfields. See subfields name, push_endpoint and ack_deadline for more information. + required: False + name: + description: Subfield of subscription. Required if subscription is specified. See examples. + required: False + ack_deadline: + description: Subfield of subscription. Not required. Default deadline for subscriptions to ACK the message before it is resent. See examples. + required: False + pull: + description: Subfield of subscription. Not required. If specified, messages will be retrieved from topic via the provided subscription name. max_messages (int; default None; max number of messages to pull), message_ack (bool; default False; acknowledge the message) and return_immediately (bool; default True, don't wait for messages to appear). If the messages are acknowledged, changed is set to True, otherwise, changed is False. + push_endpoint: + description: Subfield of subscription. Not required. If specified, message will be sent to an endpoint. See U(https://cloud.google.com/pubsub/docs/advanced#push_endpoints) for more information. + required: False + publish: + description: List of dictionaries describing messages and attributes to be published. Dictionary is in message(str):attributes(dict) format. Only message is required. + required: False + state: + description: State of the topic or queue (absent, present). Applies to the most granular resource. Remove the most granular resource. If subcription is specified we remove it. If only topic is specified, that is what is removed. Note that a topic can be removed without first removing the subscription. + required: False + default: "present" +''' +EXAMPLES = ''' +# Create a topic and publish a message to it +# (Message will be pushed; there is no check to see if the message was pushed before +# Topics: +## Create Topic +gcpubsub: + topic: ansible-topic-example + state: present + +## Delete Topic +### Subscriptions associated with topic are not deleted. +gcpubsub: + topic: ansible-topic-example + state: absent + +## Messages: publish multiple messages, with attributes (key:value available with the message) +### setting absent will keep the messages from being sent +gcpubsub: + topic: "{{ topic_name }}" + state: present + publish: + - message: "this is message 1" + attributes: + mykey1: myvalue + mykey2: myvalu2 + mykey3: myvalue3 + - message: "this is message 2" + attributes: + server: prod + sla: "99.9999" + owner: fred + +# Subscriptions +## Create Subscription (pull) +gcpubsub: + topic: ansible-topic-example + subscription: + - name: mysub + state: present + +## Create Subscription with ack_deadline and push endpoint +### pull is default, ack_deadline is not required +gcpubsub: + topic: ansible-topic-example + subscription: + - name: mysub + ack_deadline: "60" + push_endpoint: http://pushendpoint.example.com + state: present + +## Subscription change from push to pull +### setting push_endpoint to "None" converts subscription to pull. +gcpubsub: + topic: ansible-topic-example + subscription: + name: mysub + push_endpoint: "None" + +## Delete subscription +### Topic will not be deleted +gcpubsub: + topic: ansible-topic-example + subscription: + - name: mysub + state: absent + +## Pull messages from subscription +### only pull keyword is required. +gcpubsub: + topic: ansible-topic-example + subscription: + name: ansible-topic-example-sub + pull: + message_ack: yes + max_messages: "100" +''' + +RETURN = ''' +publish: + description: List of dictionaries describing messages and attributes to be published. Dictionary is in message(str):attributes(dict) format. Only message is required. + returned: Only when specified + type: list of dictionary + sample: "publish: ['message': 'my message', attributes: {'key1': 'value1'}]" + +pulled_messages: + description: list of dictionaries containing message info. Fields are ack_id, attributes, data, message_id. + returned: Only when subscription.pull is specified + type: list of dictionary + sample: [{ "ack_id": "XkASTCcYREl...","attributes": {"key1": "val1",...}, "data": "this is message 1", "message_id": "49107464153705"},..] + +state: + description: The state of the topic or subscription. Value will be either 'absent' or 'present'. + returned: Always + type: str + sample: "present" + +subscription: + description: Name of subscription. + returned: When subscription fields are specified + type: str + sample: "mysubscription" + +topic: + description: Name of topic. + returned: Always + type: str + sample: "mytopic" +''' +CLOUD_CLIENT = 'google-cloud-pubsub' +CLOUD_CLIENT_MINIMUM_VERSION = '0.22.0' +CLOUD_CLIENT_USER_AGENT = 'ansible-pubsub-0.1' + +try: + from ast import literal_eval + HAS_PYTHON26 = True +except ImportError: + HAS_PYTHON26 = False; + +try: + from google.cloud import pubsub + HAS_GOOGLE_CLOUD_PUBSUB = True +except ImportError as e: + HAS_GOOGLE_CLOUD_PUBSUB = False + + +def publish_messages(message_list, topic): + with topic.batch() as batch: + for message in message_list: + msg = message['message'] + attrs = {} + if 'attributes' in message: + attrs = message['attributes'] + batch.publish(bytes(msg), **attrs) + return True + +def pull_messages(pull_params, sub): + """ + :rtype: tuple (output, changed) + """ + changed = False + max_messages=pull_params.get('max_messages', None) + message_ack = pull_params.get('message_ack', 'no') + return_immediately = pull_params.get('return_immediately', False) + + output= [] + pulled = sub.pull(return_immediately=return_immediately, + max_messages=max_messages) + + for ack_id, msg in pulled: + msg_dict = {'message_id': msg.message_id, + 'attributes': msg.attributes, + 'data': msg.data, + 'ack_id': ack_id } + output.append(msg_dict) + + if message_ack: + ack_ids = [m['ack_id'] for m in output] + if ack_ids: + sub.acknowledge(ack_ids) + changed = True + return (output, changed) + + +def main(): + + module = AnsibleModule(argument_spec=dict( + topic=dict(required=True), + state=dict(choices=['absent', 'present'], default='present'), + publish=dict(type='list', default=None), + subscription=dict(type='dict', default=None), + service_account_email=dict(), + credentials_file=dict(), + project_id=dict(), ),) + + if not HAS_PYTHON26: + module.fail_json( + msg="GCE module requires python's 'ast' module, python v2.6+") + + if not HAS_GOOGLE_CLOUD_PUBSUB: + module.fail_json(msg="Please install google-cloud-pubsub library.") + + if not check_min_pkg_version(CLOUD_CLIENT, CLOUD_CLIENT_MINIMUM_VERSION): + module.fail_json(msg="Please install %s client version %s" % (CLOUD_CLIENT, CLOUD_CLIENT_MINIMUM_VERSION)) + + mod_params = {} + mod_params['publish'] = module.params.get('publish') + mod_params['state'] = module.params.get('state') + mod_params['topic'] = module.params.get('topic') + mod_params['subscription'] = module.params.get('subscription') + + creds, params = get_google_cloud_credentials(module) + pubsub_client = pubsub.Client(project=params['project_id'], credentials=creds, use_gax=False) + pubsub_client.user_agent = CLOUD_CLIENT_USER_AGENT + + changed = False + json_output = {} + + t = None + if mod_params['topic']: + t = pubsub_client.topic(mod_params['topic']) + s = None + if mod_params['subscription']: + # Note: default ack deadline cannot be changed without deleting/recreating subscription + s = t.subscription(mod_params['subscription']['name'], + ack_deadline=mod_params['subscription'].get('ack_deadline', None), + push_endpoint=mod_params['subscription'].get('push_endpoint', None)) + + if mod_params['state'] == 'absent': + # Remove the most granular resource. If subcription is specified + # we remove it. If only topic is specified, that is what is removed. + # Note that a topic can be removed without first removing the subscription. + # TODO(supertom): Enhancement: Provide an option to only delete a topic + # if there are no subscriptions associated with it (which the API does not support). + if s is not None: + if s.exists(): + s.delete() + changed = True + else: + if t.exists(): + t.delete() + changed = True + elif mod_params['state'] == 'present': + if not t.exists(): + t.create() + changed = True + if s: + if not s.exists(): + s.create() + s.reload() + changed = True + else: + # Subscription operations + # TODO(supertom): if more 'update' operations arise, turn this into a function. + s.reload() + push_endpoint=mod_params['subscription'].get('push_endpoint', None) + if push_endpoint is not None: + if push_endpoint != s.push_endpoint: + if push_endpoint == 'None': + push_endpoint = None + s.modify_push_configuration(push_endpoint=push_endpoint) + s.reload() + changed = push_endpoint == s.push_endpoint + + if 'pull' in mod_params['subscription']: + if s.push_endpoint is not None: + module.fail_json(msg="Cannot pull messages, push_endpoint is configured.") + (json_output['pulled_messages'], changed) = pull_messages( + mod_params['subscription']['pull'], s) + + # publish messages to the topic + if mod_params['publish'] and len(mod_params['publish']) > 0: + changed = publish_messages(mod_params['publish'], t) + + + json_output['changed'] = changed + json_output.update(mod_params) + module.exit_json(**json_output) + +# import module snippets +from ansible.module_utils.basic import * +from ansible.module_utils.gcp import * +if __name__ == '__main__': + main() diff --git a/lib/ansible/modules/cloud/google/gcpubsub_facts.py b/lib/ansible/modules/cloud/google/gcpubsub_facts.py new file mode 100644 index 00000000000..08a60841f37 --- /dev/null +++ b/lib/ansible/modules/cloud/google/gcpubsub_facts.py @@ -0,0 +1,150 @@ +#!/usr/bin/python +# Copyright 2016 Google Inc. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +DOCUMENTATION = ''' +--- +module: gcpubsub_facts +version_added: "2.3" +short_description: List Topics/Subscriptions and Messages from Google PubSub. +description: + - List Topics/Subscriptions from Google PubSub. Use the gcpubsub module for + topic/subscription management. + See U(https://cloud.google.com/pubsub/docs) for an overview. +requirements: + - "python >= 2.6" + - "google-auth >= 0.5.0" + - "google-cloud-pubsub >= 0.22.0" +notes: + - list state enables user to list topics or subscriptions in the project. See examples for details. +author: + - "Tom Melendez (@supertom) " +options: + topic: + description: + - GCP pubsub topic name. Only the name, not the full path, is required. + required: False + view: + description: + - Choices are 'topics' or 'subscriptions' + required: True + state: + description: + - list is the only valid option. + required: False +''' + +EXAMPLES = ''' +## List all Topics in a project +gcpubsub_facts: + view: topics + state: list + +## List all Subscriptions in a project +gcpubsub_facts: + view: subscriptions + state: list + +## List all Subscriptions for a Topic in a project +gcpubsub_facts: + view: subscriptions + topic: my-topic + state: list +''' + +RETURN = ''' +subscriptions: + description: List of subscriptions. + returned: When view is set to subscriptions. + type: list + sample: ["mysubscription", "mysubscription2"] +topic: + description: Name of topic. Used to filter subscriptions. + returned: Always + type: str + sample: "mytopic" +topics: + description: List of topics. + returned: When view is set to topics. + type: list + sample: ["mytopic", "mytopic2"] +''' + +try: + from ast import literal_eval + HAS_PYTHON26 = True +except ImportError: + HAS_PYTHON26 = False; + +try: + from google.cloud import pubsub + HAS_GOOGLE_CLOUD_PUBSUB = True +except ImportError as e: + HAS_GOOGLE_CLOUD_PUBSUB = False +def list_func(data, member='name'): + """Used for state=list.""" + return [getattr(x, member) for x in data] + + +def main(): + module = AnsibleModule(argument_spec=dict( + view=dict(choices=['topics', 'subscriptions'], default='topics'), + topic=dict(required=False), + state=dict(choices=['list'], default='list'), + service_account_email=dict(), + credentials_file=dict(), + project_id=dict(), ),) + + if not HAS_PYTHON26: + module.fail_json( + msg="GCE module requires python's 'ast' module, python v2.6+") + + if not HAS_GOOGLE_CLOUD_PUBSUB: + module.fail_json(msg="Please install google-cloud-pubsub library.") + + CLIENT_MINIMUM_VERSION = '0.22.0' + if not check_min_pkg_version('google-cloud-pubsub', CLIENT_MINIMUM_VERSION): + module.fail_json(msg="Please install google-cloud-pubsub library version %s" % CLIENT_MINIMUM_VERSION) + + mod_params = {} + mod_params['state'] = module.params.get('state') + mod_params['topic'] = module.params.get('topic') + mod_params['view'] = module.params.get('view') + + creds, params = get_google_cloud_credentials(module) + pubsub_client = pubsub.Client(project=params['project_id'], credentials=creds, use_gax=False) + pubsub_client.user_agent = 'ansible-pubsub-0.1' + + json_output = {} + if mod_params['view'] == 'topics': + json_output['topics'] = list_func(pubsub_client.list_topics()) + elif mod_params['view'] == 'subscriptions': + if mod_params['topic']: + t = pubsub_client.topic(mod_params['topic']) + json_output['subscriptions'] = list_func(t.list_subscriptions()) + else: + json_output['subscriptions'] = list_func(pubsub_client.list_subscriptions()) + + json_output['changed'] = False + json_output.update(mod_params) + module.exit_json(**json_output) + +# import module snippets +from ansible.module_utils.basic import * +from ansible.module_utils.gcp import * +if __name__ == '__main__': + main() diff --git a/test/units/module_utils/gcp/test_utils.py b/test/units/module_utils/gcp/test_utils.py new file mode 100644 index 00000000000..c536ff0ff1c --- /dev/null +++ b/test/units/module_utils/gcp/test_utils.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# (c) 2016, Tom Melendez +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . +import os +import sys + +from ansible.compat.tests import mock, unittest +from ansible.module_utils.gcp import (check_min_pkg_version) + +def build_distribution(version): + obj = mock.MagicMock() + obj.version = '0.5.0' + return obj + + +class GCPUtilsTestCase(unittest.TestCase): + + @mock.patch("pkg_resources.get_distribution", side_effect=build_distribution) + def test_check_minimum_pkg_version(self, mockobj): + self.assertTrue(check_min_pkg_version('foobar', '0.4.0')) + self.assertTrue(check_min_pkg_version('foobar', '0.5.0')) + self.assertFalse(check_min_pkg_version('foobar', '0.6.0')) + + +if __name__ == '__main__': + unittest.main()