From 5a57c66e3c14e26fa5f20f1ce9bb038c667b19e5 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Fri, 16 Sep 2016 00:14:53 -0500 Subject: [PATCH] Moving result reading to a background thread --- lib/ansible/executor/action_write_locks.py | 43 ++ lib/ansible/executor/module_common.py | 8 +- lib/ansible/executor/task_queue_manager.py | 1 + lib/ansible/playbook/conditional.py | 3 +- lib/ansible/plugins/strategy/__init__.py | 456 +++++++++--------- lib/ansible/plugins/strategy/linear.py | 8 +- .../plugins/strategies/test_strategy_base.py | 89 +++- 7 files changed, 372 insertions(+), 236 deletions(-) create mode 100644 lib/ansible/executor/action_write_locks.py diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py new file mode 100644 index 00000000000..413d56d9d72 --- /dev/null +++ b/lib/ansible/executor/action_write_locks.py @@ -0,0 +1,43 @@ +# (c) 2016 - Red Hat, Inc. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from multiprocessing import Lock +from ansible.module_utils.facts import Facts + +if 'action_write_locks' not in globals(): + # Do not initialize this more than once because it seems to bash + # the existing one. multiprocessing must be reloading the module + # when it forks? + action_write_locks = dict() + + # Below is a Lock for use when we weren't expecting a named module. + # It gets used when an action plugin directly invokes a module instead + # of going through the strategies. Slightly less efficient as all + # processes with unexpected module names will wait on this lock + action_write_locks[None] = Lock() + + # These plugins are called directly by action plugins (not going through + # a strategy). We precreate them here as an optimization + mods = set(p['name'] for p in Facts.PKG_MGRS) + mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) + for mod_name in mods: + action_write_locks[mod_name] = Lock() + diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py index d7dac13de0a..86549ea0b7a 100644 --- a/lib/ansible/executor/module_common.py +++ b/lib/ansible/executor/module_common.py @@ -36,7 +36,7 @@ from ansible.module_utils._text import to_bytes, to_text # Must import strategy and use write_locks from there # If we import write_locks directly then we end up binding a # variable to the object and then it never gets updated. -from ansible.plugins import strategy +from ansible.executor import action_write_locks try: from __main__ import display @@ -605,16 +605,16 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta display.debug('ANSIBALLZ: using cached module: %s' % cached_module_filename) zipdata = open(cached_module_filename, 'rb').read() else: - if module_name in strategy.action_write_locks: + if module_name in action_write_locks.action_write_locks: display.debug('ANSIBALLZ: Using lock for %s' % module_name) - lock = strategy.action_write_locks[module_name] + lock = action_write_locks.action_write_locks[module_name] else: # If the action plugin directly invokes the module (instead of # going through a strategy) then we don't have a cross-process # Lock specifically for this module. Use the "unexpected # module" lock instead display.debug('ANSIBALLZ: Using generic lock for %s' % module_name) - lock = strategy.action_write_locks[None] + lock = action_write_locks.action_write_locks[None] display.debug('ANSIBALLZ: Acquiring lock') with lock: diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index c0032753065..2e6948f1e0c 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -285,6 +285,7 @@ class TaskQueueManager: for host_name in iterator.get_failed_hosts(): self._failed_hosts[host_name] = True + strategy.cleanup() self._cleanup_processes() return play_return diff --git a/lib/ansible/playbook/conditional.py b/lib/ansible/playbook/conditional.py index 14f50f88295..1fb54df9982 100644 --- a/lib/ansible/playbook/conditional.py +++ b/lib/ansible/playbook/conditional.py @@ -25,6 +25,7 @@ from ansible.compat.six import text_type from ansible.errors import AnsibleError, AnsibleUndefinedVariable from ansible.playbook.attribute import FieldAttribute from ansible.template import Templar +from ansible.module_utils._text import to_native class Conditional: @@ -72,7 +73,7 @@ class Conditional: if not self._check_conditional(conditional, templar, all_vars): return False except Exception as e: - raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (conditional, e), obj=ds) + raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds) return True diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index aaa164ee75b..d6db1dbde87 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -19,14 +19,18 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +import os +import threading import time +from collections import deque from multiprocessing import Lock from jinja2.exceptions import UndefinedError from ansible.compat.six.moves import queue as Queue from ansible.compat.six import iteritems, string_types from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable +from ansible.executor import action_write_locks from ansible.executor.process.worker import WorkerProcess from ansible.executor.task_result import TaskResult from ansible.inventory.host import Host @@ -50,25 +54,6 @@ except ImportError: __all__ = ['StrategyBase'] -if 'action_write_locks' not in globals(): - # Do not initialize this more than once because it seems to bash - # the existing one. multiprocessing must be reloading the module - # when it forks? - action_write_locks = dict() - - # Below is a Lock for use when we weren't expecting a named module. - # It gets used when an action plugin directly invokes a module instead - # of going through the strategies. Slightly less efficient as all - # processes with unexpected module names will wait on this lock - action_write_locks[None] = Lock() - - # These plugins are called directly by action plugins (not going through - # a strategy). We precreate them here as an optimization - mods = set(p['name'] for p in Facts.PKG_MGRS) - mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) - for mod_name in mods: - action_write_locks[mod_name] = Lock() - # TODO: this should probably be in the plugins/__init__.py, with # a smarter mechanism to set all of the attributes based on # the loaders created there @@ -86,6 +71,25 @@ class SharedPluginLoaderObj: self.module_loader = module_loader +_sentinel = object() +def results_thread_main(strategy): + #print("RESULT THREAD STARTING: %s" % threading.current_thread()) + while True: + try: + result = strategy._final_q.get() + if type(result) == object: + break + else: + #print("result in thread is: %s" % result._result) + strategy._results_lock.acquire() + strategy._results.append(result) + strategy._results_lock.release() + except (IOError, EOFError): + break + except Queue.Empty: + pass + #print("RESULT THREAD EXITED: %s" % threading.current_thread()) + class StrategyBase: ''' @@ -104,6 +108,7 @@ class StrategyBase: self._final_q = tqm._final_q self._step = getattr(tqm._options, 'step', False) self._diff = getattr(tqm._options, 'diff', False) + # Backwards compat: self._display isn't really needed, just import the global display and use that. self._display = display @@ -115,6 +120,18 @@ class StrategyBase: # outstanding tasks still in queue self._blocked_hosts = dict() + self._results = deque() + self._results_lock = threading.Condition(threading.Lock()) + + #print("creating thread for strategy %s" % id(self)) + self._results_thread = threading.Thread(target=results_thread_main, args=(self,)) + self._results_thread.daemon = True + self._results_thread.start() + + def cleanup(self): + self._final_q.put(_sentinel) + self._results_thread.join() + def run(self, iterator, play_context, result=0): # save the failed/unreachable hosts, as the run_handlers() # method will clear that information during its execution @@ -174,10 +191,9 @@ class StrategyBase: # tasks inside of play_iterator so we'd have to extract them to do it # there. - global action_write_locks - if task.action not in action_write_locks: + if task.action not in action_write_locks.action_write_locks: display.debug('Creating lock for %s' % task.action) - action_write_locks[task.action] = Lock() + action_write_locks.action_write_locks[task.action] = Lock() # and then queue the new task try: @@ -211,7 +227,7 @@ class StrategyBase: return display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) - def _process_pending_results(self, iterator, one_pass=False): + def _process_pending_results(self, iterator, one_pass=False, max_passes=None): ''' Reads results off the final queue and takes appropriate action based on the result (executing callbacks, updating state, etc.). @@ -270,228 +286,232 @@ class StrategyBase: else: return False - passes = 0 - while not self._tqm._terminated: + cur_pass = 0 + while True: try: - task_result = self._final_q.get(timeout=0.001) - original_host = get_original_host(task_result._host) - original_task = iterator.get_original_task(original_host, task_result._task) - task_result._host = original_host - task_result._task = original_task - - # send callbacks for 'non final' results - if '_ansible_retry' in task_result._result: - self._tqm.send_callback('v2_runner_retry', task_result) - continue - elif '_ansible_item_result' in task_result._result: - if task_result.is_failed() or task_result.is_unreachable(): - self._tqm.send_callback('v2_runner_item_on_failed', task_result) - elif task_result.is_skipped(): - self._tqm.send_callback('v2_runner_item_on_skipped', task_result) - else: - if 'diff' in task_result._result: - if self._diff: - self._tqm.send_callback('v2_on_file_diff', task_result) - self._tqm.send_callback('v2_runner_item_on_ok', task_result) - continue + self._results_lock.acquire() + task_result = self._results.pop() + except IndexError: + break + finally: + self._results_lock.release() + + original_host = get_original_host(task_result._host) + original_task = iterator.get_original_task(original_host, task_result._task) + task_result._host = original_host + task_result._task = original_task + + # send callbacks for 'non final' results + if '_ansible_retry' in task_result._result: + self._tqm.send_callback('v2_runner_retry', task_result) + continue + elif '_ansible_item_result' in task_result._result: + if task_result.is_failed() or task_result.is_unreachable(): + self._tqm.send_callback('v2_runner_item_on_failed', task_result) + elif task_result.is_skipped(): + self._tqm.send_callback('v2_runner_item_on_skipped', task_result) + else: + if 'diff' in task_result._result: + if self._diff: + self._tqm.send_callback('v2_on_file_diff', task_result) + self._tqm.send_callback('v2_runner_item_on_ok', task_result) + continue + + if original_task.register: + #print("^ REGISTERING RESULT %s" % original_task.register) + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [original_host] + + clean_copy = strip_internal_keys(task_result._result) + if 'invocation' in clean_copy: + del clean_copy['invocation'] - if original_task.register: + for target_host in host_list: + self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) + + # all host status messages contain 2 entries: (msg, task_result) + role_ran = False + if task_result.is_failed(): + role_ran = True + if not original_task.ignore_errors: + display.debug("marking %s as failed" % original_host.name) if original_task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + # if we're using run_once, we have to fail every host here + for h in self._inventory.get_hosts(iterator._play.hosts): + if h.name not in self._tqm._unreachable_hosts: + state, _ = iterator.get_next_task_for_host(h, peek=True) + iterator.mark_host_failed(h) + state, new_task = iterator.get_next_task_for_host(h, peek=True) else: - host_list = [original_host] - - clean_copy = strip_internal_keys(task_result._result) - if 'invocation' in clean_copy: - del clean_copy['invocation'] - - for target_host in host_list: - self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) - - # all host status messages contain 2 entries: (msg, task_result) - role_ran = False - if task_result.is_failed(): - role_ran = True - if not original_task.ignore_errors: - display.debug("marking %s as failed" % original_host.name) - if original_task.run_once: - # if we're using run_once, we have to fail every host here - for h in self._inventory.get_hosts(iterator._play.hosts): - if h.name not in self._tqm._unreachable_hosts: - state, _ = iterator.get_next_task_for_host(h, peek=True) - iterator.mark_host_failed(h) - state, new_task = iterator.get_next_task_for_host(h, peek=True) - else: - iterator.mark_host_failed(original_host) + iterator.mark_host_failed(original_host) - # only add the host to the failed list officially if it has - # been failed by the iterator - if iterator.is_failed(original_host): - self._tqm._failed_hosts[original_host.name] = True - self._tqm._stats.increment('failures', original_host.name) - else: - # otherwise, we grab the current state and if we're iterating on - # the rescue portion of a block then we save the failed task in a - # special var for use within the rescue/always - state, _ = iterator.get_next_task_for_host(original_host, peek=True) - if state.run_state == iterator.ITERATING_RESCUE: - self._variable_manager.set_nonpersistent_facts( - original_host, - dict( - ansible_failed_task=original_task.serialize(), - ansible_failed_result=task_result._result, - ), - ) + # only add the host to the failed list officially if it has + # been failed by the iterator + if iterator.is_failed(original_host): + self._tqm._failed_hosts[original_host.name] = True + self._tqm._stats.increment('failures', original_host.name) else: - self._tqm._stats.increment('ok', original_host.name) - self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors) - elif task_result.is_unreachable(): - self._tqm._unreachable_hosts[original_host.name] = True - self._tqm._stats.increment('dark', original_host.name) - self._tqm.send_callback('v2_runner_on_unreachable', task_result) - elif task_result.is_skipped(): - self._tqm._stats.increment('skipped', original_host.name) - self._tqm.send_callback('v2_runner_on_skipped', task_result) + # otherwise, we grab the current state and if we're iterating on + # the rescue portion of a block then we save the failed task in a + # special var for use within the rescue/always + state, _ = iterator.get_next_task_for_host(original_host, peek=True) + if state.run_state == iterator.ITERATING_RESCUE: + self._variable_manager.set_nonpersistent_facts( + original_host, + dict( + ansible_failed_task=original_task.serialize(), + ansible_failed_result=task_result._result, + ), + ) else: - role_ran = True + self._tqm._stats.increment('ok', original_host.name) + self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors) + elif task_result.is_unreachable(): + self._tqm._unreachable_hosts[original_host.name] = True + self._tqm._stats.increment('dark', original_host.name) + self._tqm.send_callback('v2_runner_on_unreachable', task_result) + elif task_result.is_skipped(): + self._tqm._stats.increment('skipped', original_host.name) + self._tqm.send_callback('v2_runner_on_skipped', task_result) + else: + role_ran = True - if original_task.loop: - # this task had a loop, and has more than one result, so - # loop over all of them instead of a single result - result_items = task_result._result.get('results', []) - else: - result_items = [ task_result._result ] - - for result_item in result_items: - if '_ansible_notify' in result_item: - if task_result.is_changed(): - # The shared dictionary for notified handlers is a proxy, which - # does not detect when sub-objects within the proxy are modified. - # So, per the docs, we reassign the list so the proxy picks up and - # notifies all other threads - for handler_name in result_item['_ansible_notify']: - # Find the handler using the above helper. First we look up the - # dependency chain of the current task (if it's from a role), otherwise - # we just look through the list of handlers in the current play/all - # roles and use the first one that matches the notify name - if handler_name in self._listening_handlers: - for listening_handler_name in self._listening_handlers[handler_name]: - listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) - if listening_handler is None: - raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name) - if original_host not in self._notified_handlers[listening_handler]: - self._notified_handlers[listening_handler].append(original_host) - display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) + if original_task.loop: + # this task had a loop, and has more than one result, so + # loop over all of them instead of a single result + result_items = task_result._result.get('results', []) + else: + result_items = [ task_result._result ] + + for result_item in result_items: + if '_ansible_notify' in result_item: + if task_result.is_changed(): + # The shared dictionary for notified handlers is a proxy, which + # does not detect when sub-objects within the proxy are modified. + # So, per the docs, we reassign the list so the proxy picks up and + # notifies all other threads + for handler_name in result_item['_ansible_notify']: + # Find the handler using the above helper. First we look up the + # dependency chain of the current task (if it's from a role), otherwise + # we just look through the list of handlers in the current play/all + # roles and use the first one that matches the notify name + if handler_name in self._listening_handlers: + for listening_handler_name in self._listening_handlers[handler_name]: + listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) + if listening_handler is None: + raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name) + if original_host not in self._notified_handlers[listening_handler]: + self._notified_handlers[listening_handler].append(original_host) + display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) + else: + target_handler = search_handler_blocks(handler_name, iterator._play.handlers) + if target_handler is not None: + if original_host not in self._notified_handlers[target_handler]: + self._notified_handlers[target_handler].append(original_host) + # FIXME: should this be a callback? + display.vv("NOTIFIED HANDLER %s" % (handler_name,)) else: - target_handler = search_handler_blocks(handler_name, iterator._play.handlers) - if target_handler is not None: - if original_host not in self._notified_handlers[target_handler]: + # As there may be more than one handler with the notified name as the + # parent, so we just keep track of whether or not we found one at all + found = False + for target_handler in self._notified_handlers: + if parent_handler_match(target_handler, handler_name): self._notified_handlers[target_handler].append(original_host) - # FIXME: should this be a callback? - display.vv("NOTIFIED HANDLER %s" % (handler_name,)) - else: - # As there may be more than one handler with the notified name as the - # parent, so we just keep track of whether or not we found one at all - found = False - for target_handler in self._notified_handlers: - if parent_handler_match(target_handler, handler_name): - self._notified_handlers[target_handler].append(original_host) - display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),)) - found = True - - # and if none were found, then we raise an error - if not found: - raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name) - - - if 'add_host' in result_item: - # this task added a new host (add_host module) - new_host_info = result_item.get('add_host', dict()) - self._add_host(new_host_info, iterator) - - elif 'add_group' in result_item: - # this task added a new group (group_by module) - self._add_group(original_host, result_item) - - elif 'ansible_facts' in result_item: - loop_var = 'item' - if original_task.loop_control: - loop_var = original_task.loop_control.loop_var or 'item' - - item = result_item.get(loop_var, None) - - if original_task.action == 'include_vars': - for (var_name, var_value) in iteritems(result_item['ansible_facts']): - # find the host we're actually refering too here, which may - # be a host that is not really in inventory at all - if original_task.delegate_to is not None and original_task.delegate_facts: - task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task) - self.add_tqm_variables(task_vars, play=iterator._play) - if item is not None: - task_vars[loop_var] = item - templar = Templar(loader=self._loader, variables=task_vars) - host_name = templar.template(original_task.delegate_to) - actual_host = self._inventory.get_host(host_name) - if actual_host is None: - actual_host = Host(name=host_name) - else: - actual_host = original_host - - if original_task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] - else: - host_list = [actual_host] + display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),)) + found = True + + # and if none were found, then we raise an error + if not found: + raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name) + + + if 'add_host' in result_item: + # this task added a new host (add_host module) + new_host_info = result_item.get('add_host', dict()) + self._add_host(new_host_info, iterator) + + elif 'add_group' in result_item: + # this task added a new group (group_by module) + self._add_group(original_host, result_item) + + elif 'ansible_facts' in result_item: + loop_var = 'item' + if original_task.loop_control: + loop_var = original_task.loop_control.loop_var or 'item' + + item = result_item.get(loop_var, None) + + if original_task.action == 'include_vars': + for (var_name, var_value) in iteritems(result_item['ansible_facts']): + # find the host we're actually refering too here, which may + # be a host that is not really in inventory at all + if original_task.delegate_to is not None and original_task.delegate_facts: + task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task) + self.add_tqm_variables(task_vars, play=iterator._play) + if item is not None: + task_vars[loop_var] = item + templar = Templar(loader=self._loader, variables=task_vars) + host_name = templar.template(original_task.delegate_to) + actual_host = self._inventory.get_host(host_name) + if actual_host is None: + actual_host = Host(name=host_name) + else: + actual_host = original_host - for target_host in host_list: - self._variable_manager.set_host_variable(target_host, var_name, var_value) - else: if original_task.run_once: host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] else: - host_list = [original_host] + host_list = [actual_host] for target_host in host_list: - if original_task.action == 'set_fact': - self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) - else: - self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) + self._variable_manager.set_host_variable(target_host, var_name, var_value) + else: + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [original_host] - if 'diff' in task_result._result: - if self._diff: - self._tqm.send_callback('v2_on_file_diff', task_result) + for target_host in host_list: + if original_task.action == 'set_fact': + self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) + else: + self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) - if original_task.action not in ['include', 'include_role']: - self._tqm._stats.increment('ok', original_host.name) - if 'changed' in task_result._result and task_result._result['changed']: - self._tqm._stats.increment('changed', original_host.name) + if 'diff' in task_result._result: + if self._diff: + self._tqm.send_callback('v2_on_file_diff', task_result) - # finally, send the ok for this task - self._tqm.send_callback('v2_runner_on_ok', task_result) + if original_task.action not in ['include', 'include_role']: + self._tqm._stats.increment('ok', original_host.name) + if 'changed' in task_result._result and task_result._result['changed']: + self._tqm._stats.increment('changed', original_host.name) - self._pending_results -= 1 - if original_host.name in self._blocked_hosts: - del self._blocked_hosts[original_host.name] + # finally, send the ok for this task + self._tqm.send_callback('v2_runner_on_ok', task_result) - # If this is a role task, mark the parent role as being run (if - # the task was ok or failed, but not skipped or unreachable) - if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':? - # lookup the role in the ROLE_CACHE to make sure we're dealing - # with the correct object and mark it as executed - for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]): - if role_obj._uuid == original_task._role._uuid: - role_obj._had_task_run[original_host.name] = True + self._pending_results -= 1 + if original_host.name in self._blocked_hosts: + del self._blocked_hosts[original_host.name] - ret_results.append(task_result) + # If this is a role task, mark the parent role as being run (if + # the task was ok or failed, but not skipped or unreachable) + if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':? + # lookup the role in the ROLE_CACHE to make sure we're dealing + # with the correct object and mark it as executed + for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]): + if role_obj._uuid == original_task._role._uuid: + role_obj._had_task_run[original_host.name] = True - except Queue.Empty: - passes += 1 - if passes > 2: - break + ret_results.append(task_result) - if one_pass: + if one_pass or max_passes is not None and (cur_pass+1) >= max_passes: break + cur_pass += 1 + return ret_results def _wait_on_pending_results(self, iterator): diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index c6ef0a347b9..8f42ef81b57 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -263,17 +263,15 @@ class StrategyModule(StrategyBase): if run_once: break - # FIXME: probably not required here any more with the result proc - # having been removed, so there's no only a single result - # queue for the main thread - results += self._process_pending_results(iterator, one_pass=True) + results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1))) # go to next host/task group if skip_rest: continue display.debug("done queuing things up, now waiting for results queue to drain") - results += self._wait_on_pending_results(iterator) + if self._pending_results > 0: + results += self._wait_on_pending_results(iterator) host_results.extend(results) all_role_blocks = [] diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py index 49fd095bd49..faa6173eb48 100644 --- a/test/units/plugins/strategies/test_strategy_base.py +++ b/test/units/plugins/strategies/test_strategy_base.py @@ -45,16 +45,49 @@ class TestStrategyBase(unittest.TestCase): pass def test_strategy_base_init(self): + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) + + mock_queue = MagicMock() + mock_queue.empty.side_effect = _queue_empty + mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put + mock_tqm = MagicMock(TaskQueueManager) - mock_tqm._final_q = MagicMock() + mock_tqm._final_q = mock_queue mock_tqm._options = MagicMock() mock_tqm._notified_handlers = {} mock_tqm._listening_handlers = {} strategy_base = StrategyBase(tqm=mock_tqm) + strategy_base.cleanup() def test_strategy_base_run(self): + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) + + mock_queue = MagicMock() + mock_queue.empty.side_effect = _queue_empty + mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put + mock_tqm = MagicMock(TaskQueueManager) - mock_tqm._final_q = MagicMock() + mock_tqm._final_q = mock_queue mock_tqm._stats = MagicMock() mock_tqm._notified_handlers = {} mock_tqm._listening_handlers = {} @@ -87,8 +120,25 @@ class TestStrategyBase(unittest.TestCase): mock_tqm._unreachable_hosts = dict(host1=True) mock_iterator.get_failed_hosts.return_value = [] self.assertEqual(strategy_base.run(iterator=mock_iterator, play_context=mock_play_context, result=False), mock_tqm.RUN_UNREACHABLE_HOSTS) + strategy_base.cleanup() def test_strategy_base_get_hosts(self): + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) + + mock_queue = MagicMock() + mock_queue.empty.side_effect = _queue_empty + mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put + mock_hosts = [] for i in range(0, 5): mock_host = MagicMock() @@ -100,7 +150,7 @@ class TestStrategyBase(unittest.TestCase): mock_inventory.get_hosts.return_value = mock_hosts mock_tqm = MagicMock() - mock_tqm._final_q = MagicMock() + mock_tqm._final_q = mock_queue mock_tqm._notified_handlers = {} mock_tqm._listening_handlers = {} mock_tqm.get_inventory.return_value = mock_inventory @@ -120,6 +170,7 @@ class TestStrategyBase(unittest.TestCase): mock_tqm._unreachable_hosts = ["host02"] self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[2:]) + strategy_base.cleanup() @patch.object(WorkerProcess, 'run') def test_strategy_base_queue_task(self, mock_worker): @@ -178,10 +229,13 @@ class TestStrategyBase(unittest.TestCase): raise Queue.Empty else: return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) mock_queue = MagicMock() mock_queue.empty.side_effect = _queue_empty mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put mock_tqm._final_q = mock_queue mock_tqm._stats = MagicMock() @@ -272,7 +326,7 @@ class TestStrategyBase(unittest.TestCase): strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 mock_iterator.is_failed.return_value = True - results = strategy_base._process_pending_results(iterator=mock_iterator) + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) self.assertEqual(strategy_base._pending_results, 0) @@ -306,7 +360,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo'])))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - results = strategy_base._process_pending_results(iterator=mock_iterator) + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) self.assertNotIn('test01', strategy_base._blocked_hosts) @@ -314,7 +368,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo')))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - results = strategy_base._process_pending_results(iterator=mock_iterator) + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) self.assertNotIn('test01', strategy_base._blocked_hosts) @@ -322,7 +376,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler']))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - results = strategy_base._process_pending_results(iterator=mock_iterator) + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) self.assertNotIn('test01', strategy_base._blocked_hosts) @@ -341,6 +395,7 @@ class TestStrategyBase(unittest.TestCase): #queue_items.append(('bad')) #self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator) + strategy_base.cleanup() def test_strategy_base_load_included_file(self): fake_loader = DictDataLoader({ @@ -351,13 +406,30 @@ class TestStrategyBase(unittest.TestCase): """, }) + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) + + mock_queue = MagicMock() + mock_queue.empty.side_effect = _queue_empty + mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put + mock_tqm = MagicMock() - mock_tqm._final_q = MagicMock() + mock_tqm._final_q = mock_queue mock_tqm._notified_handlers = {} mock_tqm._listening_handlers = {} strategy_base = StrategyBase(tqm=mock_tqm) strategy_base._loader = fake_loader + strategy_base.cleanup() mock_play = MagicMock() @@ -443,4 +515,5 @@ class TestStrategyBase(unittest.TestCase): result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context) finally: + strategy_base.cleanup() tqm.cleanup()