diff --git a/lib/ansible/plugins/callback/logentries.py b/lib/ansible/plugins/callback/logentries.py index 3d5346952f8..746c9e08ba4 100644 --- a/lib/ansible/plugins/callback/logentries.py +++ b/lib/ansible/plugins/callback/logentries.py @@ -1,6 +1,19 @@ -""" -(c) 2015, Logentries.com -Author: Jimmy Tang +""" (c) 2015, Logentries.com, Jimmy Tang + +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . This callback plugin will generate json objects to be sent to logentries for auditing/debugging purposes. @@ -18,7 +31,7 @@ Add this to your ansible.cfg file in the defaults block callback_stdout = logentries callback_whitelist = logentries -Copy the callback plugin into the callback_plugings directory +Copy the callback plugin into the callback_plugins directory Either set the environment variables @@ -34,17 +47,16 @@ Or create a logentries.ini config file that sites next to the plugin with the fo tls_port = 20000 use_tls = no token = dd21fc88-f00a-43ff-b977-e3a4233c53af + flatten = False """ import os -import threading import socket import random import time import codecs -import Queue import ConfigParser import uuid try: @@ -72,15 +84,13 @@ def create_unicode(ch): return unicode(ch, 'utf-8') -class PlainTextSocketAppender(threading.Thread): +class PlainTextSocketAppender(object): def __init__(self, verbose=True, LE_API='data.logentries.com', LE_PORT=80, LE_TLS_PORT=443): - threading.Thread.__init__(self) - self.QUEUE_SIZE = 32768 self.LE_API = LE_API self.LE_PORT = LE_PORT self.LE_TLS_PORT = LE_TLS_PORT @@ -92,13 +102,8 @@ class PlainTextSocketAppender(threading.Thread): # Unicode Line separator character \u2028 self.LINE_SEP = to_unicode('\u2028') - self.daemon = True self.verbose = verbose self._conn = None - self._queue = Queue.Queue(self.QUEUE_SIZE) - - def empty(self): - return self._queue.empty() def open_connection(self): self._conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -131,35 +136,22 @@ class PlainTextSocketAppender(threading.Thread): if self._conn is not None: self._conn.close() - def run(self): - try: - # Open connection - self.reopen_connection() - - # Send data in queue - while True: - # Take data from queue - data = self._queue.get(block=True) - - # Replace newlines with Unicode line separator - # for multi-line events - if not is_unicode(data): - multiline = create_unicode(data).replace( - '\n', self.LINE_SEP) - else: - multiline = data.replace('\n', self.LINE_SEP) - multiline += "\n" - # Send data, reconnect if needed - while True: - try: - self._conn.send(multiline.encode('utf-8')) - except socket.error: - self.reopen_connection() - continue - break - except KeyboardInterrupt: - if self.verbose: - print("Logentries asynchronous socket client interrupted") + def put(self, data): + # Replace newlines with Unicode line separator + # for multi-line events + if not is_unicode(data): + multiline = create_unicode(data).replace('\n', self.LINE_SEP) + else: + multiline = data.replace('\n', self.LINE_SEP) + multiline += "\n" + # Send data, reconnect if needed + while True: + try: + self._conn.send(multiline.encode('utf-8')) + except socket.error: + self.reopen_connection() + continue + break self.close_connection() @@ -213,6 +205,9 @@ class CallbackModule(CallbackBase): self.use_tls = config.getboolean('logentries', 'use_tls') if config.has_option('logentries', 'token'): self.token = config.get('logentries', 'token') + if config.has_option('logentries', 'flatten'): + self.flatten = config.getboolean('logentries', 'flatten') + except: self.api_uri = os.getenv('LOGENTRIES_API') if self.api_uri is None: @@ -245,32 +240,37 @@ class CallbackModule(CallbackBase): self._display.warning( 'Logentries token could not be loaded. The logentries token can be provided using the `LOGENTRIES_TOKEN` environment variable') + self.flatten = os.getenv('LOGENTRIES_FLATTEN') + if self.flatten is None: + self.flatten = False + elif self.flatten.lower() in ['yes', 'true']: + self.flatten = True + self.verbose = False self.timeout = 10 self.le_jobid = str(uuid.uuid4()) if self.use_tls: - self._thread = TLSSocketAppender(verbose=self.verbose, - LE_API=self.api_uri, - LE_TLS_PORT=self.api_tls_port) + self._appender = TLSSocketAppender(verbose=self.verbose, + LE_API=self.api_uri, + LE_TLS_PORT=self.api_tls_port) + else: + self._appender = PlainTextSocketAppender(verbose=self.verbose, + LE_API=self.api_uri, + LE_PORT=self.api_port) + self._appender.reopen_connection() + + def emit_formatted(self, record): + if self.flatten: + results = flatdict.FlatDict(record) + self.emit(self._dump_results(results)) else: - self._thread = PlainTextSocketAppender(verbose=self.verbose, - LE_API=self.api_uri, - LE_PORT=self.api_port) + self.emit(self._dump_results(record)) def emit(self, record): - if not self._thread.is_alive(): - try: - self._thread.start() - if self.verbose: - print("Starting Logentries Asynchronous Socket Appender") - except RuntimeError: # It's already started. - if not self._thread.is_alive(): - raise - msg = record.rstrip('\n') msg = "{} {}".format(self.token, msg) - self._thread._queue.put(msg) + self._appender.put(msg) def runner_on_ok(self, host, res): results = {} @@ -278,8 +278,7 @@ class CallbackModule(CallbackBase): results['hostname'] = host results['results'] = res results['status'] = 'OK' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def runner_on_failed(self, host, res, ignore_errors=False): results = {} @@ -287,16 +286,14 @@ class CallbackModule(CallbackBase): results['hostname'] = host results['results'] = res results['status'] = 'FAILED' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def runner_on_skipped(self, host, item=None): results = {} results['le_jobid'] = self.le_jobid results['hostname'] = host results['status'] = 'SKIPPED' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def runner_on_unreachable(self, host, res): results = {} @@ -304,8 +301,7 @@ class CallbackModule(CallbackBase): results['hostname'] = host results['results'] = res results['status'] = 'UNREACHABLE' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def runner_on_async_failed(self, host, res, jid): results = {} @@ -314,8 +310,7 @@ class CallbackModule(CallbackBase): results['results'] = res results['jid'] = jid results['status'] = 'ASYNC_FAILED' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def v2_playbook_on_play_start(self, play): results = {} @@ -324,13 +319,8 @@ class CallbackModule(CallbackBase): if play.name: results['play'] = play.name results['hosts'] = play.hosts - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def playbook_on_stats(self, stats): - """ flush out queue of messages """ - now = time.time() - while not self._thread.empty(): - time.sleep(0.2) - if time.time() - now > self.timeout: - break + """ close connection """ + self._appender.close_connection()