From 4bb37b82c4f97a586ed0932d423d622bae1515c0 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Wed, 29 Apr 2015 01:06:33 -0500 Subject: [PATCH] Fix duplicate callback issue in v2 All v2+ callbacks can now optionally define a CALLBACK_TYPE, which when set to 'stdout' will limit those callbacks which are used for primary output to a single callback plugin (specified to the TaskQueueManager object and configurable in ansible.cfg/environment) --- v2/ansible/constants.py | 1 + v2/ansible/executor/playbook_executor.py | 2 +- v2/ansible/executor/task_queue_manager.py | 47 ++++++++++++++++++----- v2/ansible/plugins/__init__.py | 7 +++- v2/ansible/plugins/callback/default.py | 1 + v2/ansible/plugins/callback/minimal.py | 1 + v2/bin/ansible | 2 +- 7 files changed, 48 insertions(+), 13 deletions(-) diff --git a/v2/ansible/constants.py b/v2/ansible/constants.py index eaca382a98e..09935693ace 100644 --- a/v2/ansible/constants.py +++ b/v2/ansible/constants.py @@ -162,6 +162,7 @@ DEFAULT_CONNECTION_PLUGIN_PATH = get_config(p, DEFAULTS, 'connection_plugins', ' DEFAULT_LOOKUP_PLUGIN_PATH = get_config(p, DEFAULTS, 'lookup_plugins', 'ANSIBLE_LOOKUP_PLUGINS', '~/.ansible/plugins/lookup_plugins:/usr/share/ansible_plugins/lookup_plugins') DEFAULT_VARS_PLUGIN_PATH = get_config(p, DEFAULTS, 'vars_plugins', 'ANSIBLE_VARS_PLUGINS', '~/.ansible/plugins/vars_plugins:/usr/share/ansible_plugins/vars_plugins') DEFAULT_FILTER_PLUGIN_PATH = get_config(p, DEFAULTS, 'filter_plugins', 'ANSIBLE_FILTER_PLUGINS', '~/.ansible/plugins/filter_plugins:/usr/share/ansible_plugins/filter_plugins') +DEFAULT_STDOUT_CALLBACK = get_config(p, DEFAULTS, 'stdout_callback', 'ANSIBLE_STDOUT_CALLBACK', 'default') CACHE_PLUGIN = get_config(p, DEFAULTS, 'fact_caching', 'ANSIBLE_CACHE_PLUGIN', 'memory') CACHE_PLUGIN_CONNECTION = get_config(p, DEFAULTS, 'fact_caching_connection', 'ANSIBLE_CACHE_PLUGIN_CONNECTION', None) diff --git a/v2/ansible/executor/playbook_executor.py b/v2/ansible/executor/playbook_executor.py index 6f0bf31f337..777587f7536 100644 --- a/v2/ansible/executor/playbook_executor.py +++ b/v2/ansible/executor/playbook_executor.py @@ -48,7 +48,7 @@ class PlaybookExecutor: if options.listhosts or options.listtasks or options.listtags: self._tqm = None else: - self._tqm = TaskQueueManager(inventory=inventory, callback='default', variable_manager=variable_manager, loader=loader, display=display, options=options, passwords=self.passwords) + self._tqm = TaskQueueManager(inventory=inventory, variable_manager=variable_manager, loader=loader, display=display, options=options, passwords=self.passwords) def run(self): diff --git a/v2/ansible/executor/task_queue_manager.py b/v2/ansible/executor/task_queue_manager.py index e13930c6df8..5f09e7ff8a8 100644 --- a/v2/ansible/executor/task_queue_manager.py +++ b/v2/ansible/executor/task_queue_manager.py @@ -24,6 +24,7 @@ import os import socket import sys +from ansible import constants as C from ansible.errors import AnsibleError from ansible.executor.connection_info import ConnectionInformation from ansible.executor.play_iterator import PlayIterator @@ -48,7 +49,7 @@ class TaskQueueManager: which dispatches the Play's tasks to hosts. ''' - def __init__(self, inventory, callback, variable_manager, loader, display, options, passwords): + def __init__(self, inventory, variable_manager, loader, display, options, passwords, stdout_callback=None): self._inventory = inventory self._variable_manager = variable_manager @@ -70,14 +71,8 @@ class TaskQueueManager: self._final_q = multiprocessing.Queue() - # load all available callback plugins - # FIXME: we need an option to white-list callback plugins - self._callback_plugins = [] - for callback_plugin in callback_loader.all(class_only=True): - if hasattr(callback_plugin, 'CALLBACK_VERSION') and callback_plugin.CALLBACK_VERSION >= 2.0: - self._callback_plugins.append(callback_plugin(self._display)) - else: - self._callback_plugins.append(callback_plugin()) + # load callback plugins + self._callback_plugins = self._load_callbacks(stdout_callback) # create the pool of worker threads, based on the number of forks specified try: @@ -120,6 +115,40 @@ class TaskQueueManager: for handler in handler_list: self._notified_handlers[handler.get_name()] = [] + def _load_callbacks(self, stdout_callback): + ''' + Loads all available callbacks, with the exception of those which + utilize the CALLBACK_TYPE option. When CALLBACK_TYPE is set to 'stdout', + only one such callback plugin will be loaded. + ''' + + loaded_plugins = [] + + stdout_callback_loaded = False + if stdout_callback is None: + stdout_callback = C.DEFAULT_STDOUT_CALLBACK + + if stdout_callback not in callback_loader: + raise AnsibleError("Invalid callback for stdout specified: %s" % stdout_callback) + + for callback_plugin in callback_loader.all(class_only=True): + if hasattr(callback_plugin, 'CALLBACK_VERSION') and callback_plugin.CALLBACK_VERSION >= 2.0: + # we only allow one callback of type 'stdout' to be loaded, so check + # the name of the current plugin and type to see if we need to skip + # loading this callback plugin + callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', None) + (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path)) + if callback_type == 'stdout': + if callback_name != stdout_callback or stdout_callback_loaded: + continue + stdout_callback_loaded = True + + loaded_plugins.append(callback_plugin(self._display)) + else: + loaded_plugins.append(callback_plugin()) + + return loaded_plugins + def run(self, play): ''' Iterates over the roles/tasks in a play, using the given (or default) diff --git a/v2/ansible/plugins/__init__.py b/v2/ansible/plugins/__init__.py index d16eecd3c39..f81f8c9d387 100644 --- a/v2/ansible/plugins/__init__.py +++ b/v2/ansible/plugins/__init__.py @@ -243,9 +243,12 @@ class PluginLoader: if path not in self._module_cache: self._module_cache[path] = imp.load_source('.'.join([self.package, name]), path) if kwargs.get('class_only', False): - yield getattr(self._module_cache[path], self.class_name) + obj = getattr(self._module_cache[path], self.class_name) else: - yield getattr(self._module_cache[path], self.class_name)(*args, **kwargs) + obj = getattr(self._module_cache[path], self.class_name)(*args, **kwargs) + # set extra info on the module, in case we want it later + setattr(obj, '_original_path', path) + yield obj action_loader = PluginLoader( 'ActionModule', diff --git a/v2/ansible/plugins/callback/default.py b/v2/ansible/plugins/callback/default.py index bb87dc4a942..262303dc570 100644 --- a/v2/ansible/plugins/callback/default.py +++ b/v2/ansible/plugins/callback/default.py @@ -31,6 +31,7 @@ class CallbackModule(CallbackBase): ''' CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'stdout' def v2_on_any(self, *args, **kwargs): pass diff --git a/v2/ansible/plugins/callback/minimal.py b/v2/ansible/plugins/callback/minimal.py index 95dfaee8785..4e9c8fffd2d 100644 --- a/v2/ansible/plugins/callback/minimal.py +++ b/v2/ansible/plugins/callback/minimal.py @@ -32,6 +32,7 @@ class CallbackModule(CallbackBase): ''' CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'stdout' def v2_on_any(self, *args, **kwargs): pass diff --git a/v2/bin/ansible b/v2/bin/ansible index d269790983e..8966b4bc65f 100755 --- a/v2/bin/ansible +++ b/v2/bin/ansible @@ -150,7 +150,7 @@ class Cli(object): # now create a task queue manager to execute the play try: display = Display() - tqm = TaskQueueManager(inventory=inventory, callback='minimal', variable_manager=variable_manager, loader=loader, display=display, options=options, passwords=passwords) + tqm = TaskQueueManager(inventory=inventory, variable_manager=variable_manager, loader=loader, display=display, options=options, passwords=passwords, stdout_callback='minimal') result = tqm.run(play) tqm.cleanup() except AnsibleError: