Remove threading and queues. Added license information and cleaned up callback.

pull/12510/head
Jimmy Tang 9 years ago committed by Jimmy Tang
parent 85277c8aae
commit c02ceb8f12

@ -1,6 +1,19 @@
""" """ (c) 2015, Logentries.com, Jimmy Tang <jimmy.tang@logentries.com>
(c) 2015, Logentries.com
Author: Jimmy Tang <jimmy.tang@logentries.com> # This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
This callback plugin will generate json objects to be sent to logentries This callback plugin will generate json objects to be sent to logentries
for auditing/debugging purposes. for auditing/debugging purposes.
@ -18,7 +31,7 @@ Add this to your ansible.cfg file in the defaults block
callback_stdout = logentries callback_stdout = logentries
callback_whitelist = logentries callback_whitelist = logentries
Copy the callback plugin into the callback_plugings directory Copy the callback plugin into the callback_plugins directory
Either set the environment variables Either set the environment variables
@ -34,17 +47,16 @@ Or create a logentries.ini config file that sites next to the plugin with the fo
tls_port = 20000 tls_port = 20000
use_tls = no use_tls = no
token = dd21fc88-f00a-43ff-b977-e3a4233c53af token = dd21fc88-f00a-43ff-b977-e3a4233c53af
flatten = False
""" """
import os import os
import threading
import socket import socket
import random import random
import time import time
import codecs import codecs
import Queue
import ConfigParser import ConfigParser
import uuid import uuid
try: try:
@ -72,15 +84,13 @@ def create_unicode(ch):
return unicode(ch, 'utf-8') return unicode(ch, 'utf-8')
class PlainTextSocketAppender(threading.Thread): class PlainTextSocketAppender(object):
def __init__(self, def __init__(self,
verbose=True, verbose=True,
LE_API='data.logentries.com', LE_API='data.logentries.com',
LE_PORT=80, LE_PORT=80,
LE_TLS_PORT=443): LE_TLS_PORT=443):
threading.Thread.__init__(self)
self.QUEUE_SIZE = 32768
self.LE_API = LE_API self.LE_API = LE_API
self.LE_PORT = LE_PORT self.LE_PORT = LE_PORT
self.LE_TLS_PORT = LE_TLS_PORT self.LE_TLS_PORT = LE_TLS_PORT
@ -92,13 +102,8 @@ class PlainTextSocketAppender(threading.Thread):
# Unicode Line separator character \u2028 # Unicode Line separator character \u2028
self.LINE_SEP = to_unicode('\u2028') self.LINE_SEP = to_unicode('\u2028')
self.daemon = True
self.verbose = verbose self.verbose = verbose
self._conn = None self._conn = None
self._queue = Queue.Queue(self.QUEUE_SIZE)
def empty(self):
return self._queue.empty()
def open_connection(self): def open_connection(self):
self._conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -131,21 +136,11 @@ class PlainTextSocketAppender(threading.Thread):
if self._conn is not None: if self._conn is not None:
self._conn.close() self._conn.close()
def run(self): def put(self, data):
try:
# Open connection
self.reopen_connection()
# Send data in queue
while True:
# Take data from queue
data = self._queue.get(block=True)
# Replace newlines with Unicode line separator # Replace newlines with Unicode line separator
# for multi-line events # for multi-line events
if not is_unicode(data): if not is_unicode(data):
multiline = create_unicode(data).replace( multiline = create_unicode(data).replace('\n', self.LINE_SEP)
'\n', self.LINE_SEP)
else: else:
multiline = data.replace('\n', self.LINE_SEP) multiline = data.replace('\n', self.LINE_SEP)
multiline += "\n" multiline += "\n"
@ -157,9 +152,6 @@ class PlainTextSocketAppender(threading.Thread):
self.reopen_connection() self.reopen_connection()
continue continue
break break
except KeyboardInterrupt:
if self.verbose:
print("Logentries asynchronous socket client interrupted")
self.close_connection() self.close_connection()
@ -213,6 +205,9 @@ class CallbackModule(CallbackBase):
self.use_tls = config.getboolean('logentries', 'use_tls') self.use_tls = config.getboolean('logentries', 'use_tls')
if config.has_option('logentries', 'token'): if config.has_option('logentries', 'token'):
self.token = config.get('logentries', 'token') self.token = config.get('logentries', 'token')
if config.has_option('logentries', 'flatten'):
self.flatten = config.getboolean('logentries', 'flatten')
except: except:
self.api_uri = os.getenv('LOGENTRIES_API') self.api_uri = os.getenv('LOGENTRIES_API')
if self.api_uri is None: if self.api_uri is None:
@ -245,32 +240,37 @@ class CallbackModule(CallbackBase):
self._display.warning( self._display.warning(
'Logentries token could not be loaded. The logentries token can be provided using the `LOGENTRIES_TOKEN` environment variable') 'Logentries token could not be loaded. The logentries token can be provided using the `LOGENTRIES_TOKEN` environment variable')
self.flatten = os.getenv('LOGENTRIES_FLATTEN')
if self.flatten is None:
self.flatten = False
elif self.flatten.lower() in ['yes', 'true']:
self.flatten = True
self.verbose = False self.verbose = False
self.timeout = 10 self.timeout = 10
self.le_jobid = str(uuid.uuid4()) self.le_jobid = str(uuid.uuid4())
if self.use_tls: if self.use_tls:
self._thread = TLSSocketAppender(verbose=self.verbose, self._appender = TLSSocketAppender(verbose=self.verbose,
LE_API=self.api_uri, LE_API=self.api_uri,
LE_TLS_PORT=self.api_tls_port) LE_TLS_PORT=self.api_tls_port)
else: else:
self._thread = PlainTextSocketAppender(verbose=self.verbose, self._appender = PlainTextSocketAppender(verbose=self.verbose,
LE_API=self.api_uri, LE_API=self.api_uri,
LE_PORT=self.api_port) LE_PORT=self.api_port)
self._appender.reopen_connection()
def emit(self, record): def emit_formatted(self, record):
if not self._thread.is_alive(): if self.flatten:
try: results = flatdict.FlatDict(record)
self._thread.start() self.emit(self._dump_results(results))
if self.verbose: else:
print("Starting Logentries Asynchronous Socket Appender") self.emit(self._dump_results(record))
except RuntimeError: # It's already started.
if not self._thread.is_alive():
raise
def emit(self, record):
msg = record.rstrip('\n') msg = record.rstrip('\n')
msg = "{} {}".format(self.token, msg) msg = "{} {}".format(self.token, msg)
self._thread._queue.put(msg) self._appender.put(msg)
def runner_on_ok(self, host, res): def runner_on_ok(self, host, res):
results = {} results = {}
@ -278,8 +278,7 @@ class CallbackModule(CallbackBase):
results['hostname'] = host results['hostname'] = host
results['results'] = res results['results'] = res
results['status'] = 'OK' results['status'] = 'OK'
results = flatdict.FlatDict(results) self.emit_formatted(results)
self.emit(self._dump_results(results))
def runner_on_failed(self, host, res, ignore_errors=False): def runner_on_failed(self, host, res, ignore_errors=False):
results = {} results = {}
@ -287,16 +286,14 @@ class CallbackModule(CallbackBase):
results['hostname'] = host results['hostname'] = host
results['results'] = res results['results'] = res
results['status'] = 'FAILED' results['status'] = 'FAILED'
results = flatdict.FlatDict(results) self.emit_formatted(results)
self.emit(self._dump_results(results))
def runner_on_skipped(self, host, item=None): def runner_on_skipped(self, host, item=None):
results = {} results = {}
results['le_jobid'] = self.le_jobid results['le_jobid'] = self.le_jobid
results['hostname'] = host results['hostname'] = host
results['status'] = 'SKIPPED' results['status'] = 'SKIPPED'
results = flatdict.FlatDict(results) self.emit_formatted(results)
self.emit(self._dump_results(results))
def runner_on_unreachable(self, host, res): def runner_on_unreachable(self, host, res):
results = {} results = {}
@ -304,8 +301,7 @@ class CallbackModule(CallbackBase):
results['hostname'] = host results['hostname'] = host
results['results'] = res results['results'] = res
results['status'] = 'UNREACHABLE' results['status'] = 'UNREACHABLE'
results = flatdict.FlatDict(results) self.emit_formatted(results)
self.emit(self._dump_results(results))
def runner_on_async_failed(self, host, res, jid): def runner_on_async_failed(self, host, res, jid):
results = {} results = {}
@ -314,8 +310,7 @@ class CallbackModule(CallbackBase):
results['results'] = res results['results'] = res
results['jid'] = jid results['jid'] = jid
results['status'] = 'ASYNC_FAILED' results['status'] = 'ASYNC_FAILED'
results = flatdict.FlatDict(results) self.emit_formatted(results)
self.emit(self._dump_results(results))
def v2_playbook_on_play_start(self, play): def v2_playbook_on_play_start(self, play):
results = {} results = {}
@ -324,13 +319,8 @@ class CallbackModule(CallbackBase):
if play.name: if play.name:
results['play'] = play.name results['play'] = play.name
results['hosts'] = play.hosts results['hosts'] = play.hosts
results = flatdict.FlatDict(results) self.emit_formatted(results)
self.emit(self._dump_results(results))
def playbook_on_stats(self, stats): def playbook_on_stats(self, stats):
""" flush out queue of messages """ """ close connection """
now = time.time() self._appender.close_connection()
while not self._thread.empty():
time.sleep(0.2)
if time.time() - now > self.timeout:
break

Loading…
Cancel
Save