Moving result reading to a background thread

Branch: pull/17679/head
Author: James Cammarata (2016)
Parent: dfb1c0647e
Commit: 5a57c66e3c
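
This commit stops the strategy from polling the TaskQueueManager's final results queue inline. Each StrategyBase now starts a daemon thread (results_thread_main) that blocks on _final_q.get() and appends incoming results to a collections.deque guarded by a threading.Condition; _process_pending_results() pops from that deque instead of calling get(timeout=0.001) in a loop. A new StrategyBase.cleanup() pushes a sentinel object through the queue and joins the thread, and the TaskQueueManager invokes it after each play. The cross-process action_write_locks dictionary also moves out of ansible.plugins.strategy into a new ansible.executor.action_write_locks module, so module_common.py no longer has to import the strategy package.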

lib/ansible/executor/action_write_locks.py
@@ -0,0 +1,43 @@
+# (c) 2016 - Red Hat, Inc. <support@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from multiprocessing import Lock
+from ansible.module_utils.facts import Facts
+
+if 'action_write_locks' not in globals():
+    # Do not initialize this more than once because it seems to bash
+    # the existing one. multiprocessing must be reloading the module
+    # when it forks?
+    action_write_locks = dict()
+
+    # Below is a Lock for use when we weren't expecting a named module.
+    # It gets used when an action plugin directly invokes a module instead
+    # of going through the strategies. Slightly less efficient as all
+    # processes with unexpected module names will wait on this lock
+    action_write_locks[None] = Lock()
+
+    # These plugins are called directly by action plugins (not going through
+    # a strategy). We precreate them here as an optimization
+    mods = set(p['name'] for p in Facts.PKG_MGRS)
+    mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
+    for mod_name in mods:
+        action_write_locks[mod_name] = Lock()
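
The locks have to be created in the parent process before any workers fork, so every child inherits the same Lock object for a given module name. A minimal standalone sketch of the pattern, assuming the fork start method (which Ansible's worker processes rely on); the names below are illustrative, not Ansible's API:

import os
import tempfile
from multiprocessing import Lock, Process

# Locks must exist before fork so all children share them.
write_locks = {'setup': Lock(), None: Lock()}

def write_cached_module(name, data):
    # Fall back to the generic lock for unexpected module names.
    lock = write_locks.get(name, write_locks[None])
    with lock:
        path = os.path.join(tempfile.gettempdir(), '%s.cache' % name)
        with open(path, 'wb') as f:
            f.write(data)

if __name__ == '__main__':
    procs = [Process(target=write_cached_module, args=('setup', b'zipdata')) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()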

lib/ansible/executor/module_common.py
@@ -36,7 +36,7 @@ from ansible.module_utils._text import to_bytes, to_text
 # Must import strategy and use write_locks from there
 # If we import write_locks directly then we end up binding a
 # variable to the object and then it never gets updated.
-from ansible.plugins import strategy
+from ansible.executor import action_write_locks

 try:
     from __main__ import display
@@ -605,16 +605,16 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta
         display.debug('ANSIBALLZ: using cached module: %s' % cached_module_filename)
         zipdata = open(cached_module_filename, 'rb').read()
     else:
-        if module_name in strategy.action_write_locks:
+        if module_name in action_write_locks.action_write_locks:
             display.debug('ANSIBALLZ: Using lock for %s' % module_name)
-            lock = strategy.action_write_locks[module_name]
+            lock = action_write_locks.action_write_locks[module_name]
         else:
             # If the action plugin directly invokes the module (instead of
             # going through a strategy) then we don't have a cross-process
             # Lock specifically for this module. Use the "unexpected
             # module" lock instead
             display.debug('ANSIBALLZ: Using generic lock for %s' % module_name)
-            lock = strategy.action_write_locks[None]
+            lock = action_write_locks.action_write_locks[None]

        display.debug('ANSIBALLZ: Acquiring lock')
        with lock:
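
The module itself is imported (and the dict reached as action_write_locks.action_write_locks) rather than using a from-import of the dict, because a from-import copies the binding at import time: if the owning module ever rebinds the name, the importer keeps seeing the stale object, which is exactly what the comment above warns about. A small runnable illustration of that binding rule (fake_executor is a throwaway module created just for the demo):

import sys
import types

# Build a stand-in module for ansible.executor.action_write_locks.
mod = types.ModuleType('fake_executor')
mod.action_write_locks = {'setup': 'lock-1'}
sys.modules['fake_executor'] = mod

import fake_executor
from fake_executor import action_write_locks as snapshot

# Rebinding the module attribute...
fake_executor.action_write_locks = {'setup': 'lock-2'}

print(fake_executor.action_write_locks['setup'])  # lock-2: attribute lookup is live
print(snapshot['setup'])                          # lock-1: the from-import froze the old binding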

lib/ansible/executor/task_queue_manager.py
@@ -285,6 +285,7 @@ class TaskQueueManager:
         for host_name in iterator.get_failed_hosts():
             self._failed_hosts[host_name] = True

+        strategy.cleanup()
         self._cleanup_processes()
         return play_return
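
strategy.cleanup() (added below in StrategyBase) puts the sentinel on the final queue and joins the strategy's reader thread, so each play's background reader is shut down before the TQM tears down its worker processes.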

lib/ansible/playbook/conditional.py
@@ -25,6 +25,7 @@ from ansible.compat.six import text_type
 from ansible.errors import AnsibleError, AnsibleUndefinedVariable
 from ansible.playbook.attribute import FieldAttribute
 from ansible.template import Templar
+from ansible.module_utils._text import to_native

 class Conditional:
@@ -72,7 +73,7 @@ class Conditional:
             if not self._check_conditional(conditional, templar, all_vars):
                 return False
         except Exception as e:
-            raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (conditional, e), obj=ds)
+            raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds)

         return True
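
to_native() coerces both values to the platform's native str type, so the % formatting can no longer raise a UnicodeEncodeError/UnicodeDecodeError on Python 2 when the conditional expression or the exception message contains non-ASCII text.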

lib/ansible/plugins/strategy/__init__.py
@@ -19,14 +19,18 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

+import os
+import threading
 import time

+from collections import deque
 from multiprocessing import Lock
 from jinja2.exceptions import UndefinedError

 from ansible.compat.six.moves import queue as Queue
 from ansible.compat.six import iteritems, string_types
 from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
+from ansible.executor import action_write_locks
 from ansible.executor.process.worker import WorkerProcess
 from ansible.executor.task_result import TaskResult
 from ansible.inventory.host import Host
@@ -50,25 +54,6 @@ except ImportError:

 __all__ = ['StrategyBase']

-if 'action_write_locks' not in globals():
-    # Do not initialize this more than once because it seems to bash
-    # the existing one. multiprocessing must be reloading the module
-    # when it forks?
-    action_write_locks = dict()
-
-    # Below is a Lock for use when we weren't expecting a named module.
-    # It gets used when an action plugin directly invokes a module instead
-    # of going through the strategies. Slightly less efficient as all
-    # processes with unexpected module names will wait on this lock
-    action_write_locks[None] = Lock()
-
-    # These plugins are called directly by action plugins (not going through
-    # a strategy). We precreate them here as an optimization
-    mods = set(p['name'] for p in Facts.PKG_MGRS)
-    mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
-    for mod_name in mods:
-        action_write_locks[mod_name] = Lock()
-
 # TODO: this should probably be in the plugins/__init__.py, with
 #       a smarter mechanism to set all of the attributes based on
 #       the loaders created there
@@ -86,6 +71,25 @@ class SharedPluginLoaderObj:
         self.module_loader = module_loader

+_sentinel = object()
+
+def results_thread_main(strategy):
+    #print("RESULT THREAD STARTING: %s" % threading.current_thread())
+    while True:
+        try:
+            result = strategy._final_q.get()
+            if type(result) == object:
+                break
+            else:
+                #print("result in thread is: %s" % result._result)
+                strategy._results_lock.acquire()
+                strategy._results.append(result)
+                strategy._results_lock.release()
+        except (IOError, EOFError):
+            break
+        except Queue.Empty:
+            pass
+    #print("RESULT THREAD EXITED: %s" % threading.current_thread())

 class StrategyBase:
     '''
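
The sentinel is recognised by type rather than identity (type(result) == object) because items coming out of a multiprocessing queue have been pickled and unpickled, so what arrives is never the same instance that was put in; a bare object(), though, is the only thing whose type is exactly object. A minimal sketch of the reader-thread pattern with illustrative names, not the Ansible API:

import threading
from collections import deque
from multiprocessing import Queue

_sentinel = object()

def reader_main(final_q, results, results_lock):
    # Block on the queue and stash results until the sentinel arrives.
    while True:
        result = final_q.get()
        if type(result) == object:   # survives pickling, unlike an identity check
            break
        with results_lock:
            results.append(result)

if __name__ == '__main__':
    final_q = Queue()
    results = deque()
    results_lock = threading.Condition(threading.Lock())

    reader = threading.Thread(target=reader_main, args=(final_q, results, results_lock))
    reader.daemon = True
    reader.start()

    final_q.put('fake task result')
    final_q.put(_sentinel)           # what StrategyBase.cleanup() does
    reader.join()
    print(list(results))             # ['fake task result']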
@@ -104,6 +108,7 @@ class StrategyBase:
         self._final_q = tqm._final_q
         self._step = getattr(tqm._options, 'step', False)
         self._diff = getattr(tqm._options, 'diff', False)
+
         # Backwards compat: self._display isn't really needed, just import the global display and use that.
         self._display = display
@@ -115,6 +120,18 @@ class StrategyBase:
         # outstanding tasks still in queue
         self._blocked_hosts = dict()

+        self._results = deque()
+        self._results_lock = threading.Condition(threading.Lock())
+
+        #print("creating thread for strategy %s" % id(self))
+        self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
+        self._results_thread.daemon = True
+        self._results_thread.start()
+
+    def cleanup(self):
+        self._final_q.put(_sentinel)
+        self._results_thread.join()
+
     def run(self, iterator, play_context, result=0):
         # save the failed/unreachable hosts, as the run_handlers()
         # method will clear that information during its execution
@@ -174,10 +191,9 @@ class StrategyBase:
         # tasks inside of play_iterator so we'd have to extract them to do it
         # there.
-        global action_write_locks
-        if task.action not in action_write_locks:
+        if task.action not in action_write_locks.action_write_locks:
             display.debug('Creating lock for %s' % task.action)
-            action_write_locks[task.action] = Lock()
+            action_write_locks.action_write_locks[task.action] = Lock()

         # and then queue the new task
         try:
@@ -211,7 +227,7 @@ class StrategyBase:
             return

         display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))

-    def _process_pending_results(self, iterator, one_pass=False):
+    def _process_pending_results(self, iterator, one_pass=False, max_passes=None):
         '''
         Reads results off the final queue and takes appropriate action
         based on the result (executing callbacks, updating state, etc.).
@@ -270,228 +286,232 @@ class StrategyBase:
             else:
                 return False

-        passes = 0
-        while not self._tqm._terminated:
+        cur_pass = 0
+        while True:
             try:
-                task_result = self._final_q.get(timeout=0.001)
+                self._results_lock.acquire()
+                task_result = self._results.pop()
+            except IndexError:
+                break
+            finally:
+                self._results_lock.release()

             original_host = get_original_host(task_result._host)
             original_task = iterator.get_original_task(original_host, task_result._task)
             task_result._host = original_host
             task_result._task = original_task

             # send callbacks for 'non final' results
             if '_ansible_retry' in task_result._result:
                 self._tqm.send_callback('v2_runner_retry', task_result)
                 continue
             elif '_ansible_item_result' in task_result._result:
                 if task_result.is_failed() or task_result.is_unreachable():
                     self._tqm.send_callback('v2_runner_item_on_failed', task_result)
                 elif task_result.is_skipped():
                     self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
                 else:
                     if 'diff' in task_result._result:
                         if self._diff:
                             self._tqm.send_callback('v2_on_file_diff', task_result)
                     self._tqm.send_callback('v2_runner_item_on_ok', task_result)
                 continue

             if original_task.register:
+                #print("^ REGISTERING RESULT %s" % original_task.register)
                 if original_task.run_once:
                     host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
                 else:
                     host_list = [original_host]

                 clean_copy = strip_internal_keys(task_result._result)
                 if 'invocation' in clean_copy:
                     del clean_copy['invocation']

                 for target_host in host_list:
                     self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy})

             # all host status messages contain 2 entries: (msg, task_result)
             role_ran = False
             if task_result.is_failed():
                 role_ran = True
                 if not original_task.ignore_errors:
                     display.debug("marking %s as failed" % original_host.name)
                     if original_task.run_once:
                         # if we're using run_once, we have to fail every host here
                         for h in self._inventory.get_hosts(iterator._play.hosts):
                             if h.name not in self._tqm._unreachable_hosts:
                                 state, _ = iterator.get_next_task_for_host(h, peek=True)
                                 iterator.mark_host_failed(h)
                                 state, new_task = iterator.get_next_task_for_host(h, peek=True)
                     else:
                         iterator.mark_host_failed(original_host)

                     # only add the host to the failed list officially if it has
                     # been failed by the iterator
                     if iterator.is_failed(original_host):
                         self._tqm._failed_hosts[original_host.name] = True
                         self._tqm._stats.increment('failures', original_host.name)
                     else:
                         # otherwise, we grab the current state and if we're iterating on
                         # the rescue portion of a block then we save the failed task in a
                         # special var for use within the rescue/always
                         state, _ = iterator.get_next_task_for_host(original_host, peek=True)
                         if state.run_state == iterator.ITERATING_RESCUE:
                             self._variable_manager.set_nonpersistent_facts(
                                 original_host,
                                 dict(
                                     ansible_failed_task=original_task.serialize(),
                                     ansible_failed_result=task_result._result,
                                 ),
                             )
                 else:
                     self._tqm._stats.increment('ok', original_host.name)
                 self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors)
             elif task_result.is_unreachable():
                 self._tqm._unreachable_hosts[original_host.name] = True
                 self._tqm._stats.increment('dark', original_host.name)
                 self._tqm.send_callback('v2_runner_on_unreachable', task_result)
             elif task_result.is_skipped():
                 self._tqm._stats.increment('skipped', original_host.name)
                 self._tqm.send_callback('v2_runner_on_skipped', task_result)
             else:
                 role_ran = True

                 if original_task.loop:
                     # this task had a loop, and has more than one result, so
                     # loop over all of them instead of a single result
                     result_items = task_result._result.get('results', [])
                 else:
                     result_items = [ task_result._result ]

                 for result_item in result_items:
                     if '_ansible_notify' in result_item:
                         if task_result.is_changed():
                             # The shared dictionary for notified handlers is a proxy, which
                             # does not detect when sub-objects within the proxy are modified.
                             # So, per the docs, we reassign the list so the proxy picks up and
                             # notifies all other threads
                             for handler_name in result_item['_ansible_notify']:
                                 # Find the handler using the above helper. First we look up the
                                 # dependency chain of the current task (if it's from a role), otherwise
                                 # we just look through the list of handlers in the current play/all
                                 # roles and use the first one that matches the notify name
                                 if handler_name in self._listening_handlers:
                                     for listening_handler_name in self._listening_handlers[handler_name]:
                                         listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
                                         if listening_handler is None:
                                             raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
                                         if original_host not in self._notified_handlers[listening_handler]:
                                             self._notified_handlers[listening_handler].append(original_host)
                                             display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
                                 else:
                                     target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
                                     if target_handler is not None:
                                         if original_host not in self._notified_handlers[target_handler]:
                                             self._notified_handlers[target_handler].append(original_host)
                                             # FIXME: should this be a callback?
                                             display.vv("NOTIFIED HANDLER %s" % (handler_name,))
                                     else:
                                         # As there may be more than one handler with the notified name as the
                                         # parent, so we just keep track of whether or not we found one at all
                                         found = False
                                         for target_handler in self._notified_handlers:
                                             if parent_handler_match(target_handler, handler_name):
                                                 self._notified_handlers[target_handler].append(original_host)
                                                 display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),))
                                                 found = True

                                         # and if none were found, then we raise an error
                                         if not found:
                                             raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)

                     if 'add_host' in result_item:
                         # this task added a new host (add_host module)
                         new_host_info = result_item.get('add_host', dict())
                         self._add_host(new_host_info, iterator)

                     elif 'add_group' in result_item:
                         # this task added a new group (group_by module)
                         self._add_group(original_host, result_item)

                     elif 'ansible_facts' in result_item:
                         loop_var = 'item'
                         if original_task.loop_control:
                             loop_var = original_task.loop_control.loop_var or 'item'

                         item = result_item.get(loop_var, None)

                         if original_task.action == 'include_vars':
                             for (var_name, var_value) in iteritems(result_item['ansible_facts']):
                                 # find the host we're actually refering too here, which may
                                 # be a host that is not really in inventory at all
                                 if original_task.delegate_to is not None and original_task.delegate_facts:
                                     task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task)
                                     self.add_tqm_variables(task_vars, play=iterator._play)
                                     if item is not None:
                                         task_vars[loop_var] = item
                                     templar = Templar(loader=self._loader, variables=task_vars)
                                     host_name = templar.template(original_task.delegate_to)
                                     actual_host = self._inventory.get_host(host_name)
                                     if actual_host is None:
                                         actual_host = Host(name=host_name)
                                 else:
                                     actual_host = original_host

                                 if original_task.run_once:
                                     host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
                                 else:
                                     host_list = [actual_host]

                                 for target_host in host_list:
                                     self._variable_manager.set_host_variable(target_host, var_name, var_value)
                         else:
                             if original_task.run_once:
                                 host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
                             else:
                                 host_list = [original_host]

                             for target_host in host_list:
                                 if original_task.action == 'set_fact':
                                     self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy())
                                 else:
                                     self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy())

                 if 'diff' in task_result._result:
                     if self._diff:
                         self._tqm.send_callback('v2_on_file_diff', task_result)

                 if original_task.action not in ['include', 'include_role']:
                     self._tqm._stats.increment('ok', original_host.name)

                 if 'changed' in task_result._result and task_result._result['changed']:
                     self._tqm._stats.increment('changed', original_host.name)

                 # finally, send the ok for this task
                 self._tqm.send_callback('v2_runner_on_ok', task_result)

             self._pending_results -= 1
             if original_host.name in self._blocked_hosts:
                 del self._blocked_hosts[original_host.name]

             # If this is a role task, mark the parent role as being run (if
             # the task was ok or failed, but not skipped or unreachable)
             if original_task._role is not None and role_ran: #TODO:  and original_task.action != 'include_role':?
                 # lookup the role in the ROLE_CACHE to make sure we're dealing
                 # with the correct object and mark it as executed
                 for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]):
                     if role_obj._uuid == original_task._role._uuid:
                         role_obj._had_task_run[original_host.name] = True

             ret_results.append(task_result)

-            except Queue.Empty:
-                passes += 1
-                if passes > 2:
-                    break
-
-            if one_pass:
+            if one_pass or max_passes is not None and (cur_pass+1) >= max_passes:
                 break
+            cur_pass += 1

         return ret_results

     def _wait_on_pending_results(self, iterator):
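
Condensed, the new consumption loop looks like this (a sketch under the same deque/Condition assumptions as above): an IndexError from pop() doubles as the loop's exit condition, the finally clause guarantees the lock is released on every path, and max_passes bounds how much work a single call may do:

import threading
from collections import deque

results = deque()
results_lock = threading.Condition(threading.Lock())

def drain_results(max_passes=None):
    # Mirror of the new _process_pending_results() loop structure.
    processed = []
    cur_pass = 0
    while True:
        try:
            results_lock.acquire()
            task_result = results.pop()
        except IndexError:
            break
        finally:
            results_lock.release()

        processed.append(task_result)   # the real code runs callbacks/stats here

        if max_passes is not None and (cur_pass + 1) >= max_passes:
            break
        cur_pass += 1
    return processed

results.extend(['r1', 'r2', 'r3'])
print(drain_results(max_passes=2))      # at most two results per call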

lib/ansible/plugins/strategy/linear.py
@@ -263,17 +263,15 @@ class StrategyModule(StrategyBase):
                         if run_once:
                             break

-                    # FIXME: probably not required here any more with the result proc
-                    #        having been removed, so there's no only a single result
-                    #        queue for the main thread
-                    results += self._process_pending_results(iterator, one_pass=True)
+                    results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1)))

                 # go to next host/task group
                 if skip_rest:
                     continue

                 display.debug("done queuing things up, now waiting for results queue to drain")
-                results += self._wait_on_pending_results(iterator)
+                if self._pending_results > 0:
+                    results += self._wait_on_pending_results(iterator)
                 host_results.extend(results)

                 all_role_blocks = []
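
With the reader thread in place, the linear strategy no longer limits itself to a single opportunistic pass per queued task: each iteration now drains at most roughly 10% of the configured worker count (max(1, int(len(self._tqm._workers) * 0.1)) — one pass at 5 forks, five at 50), and the full blocking _wait_on_pending_results() drain is only paid for when results are actually outstanding.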

test/units/plugins/strategy/test_strategy_base.py
@@ -45,16 +45,49 @@ class TestStrategyBase(unittest.TestCase):
         pass

     def test_strategy_base_init(self):
+        queue_items = []
+        def _queue_empty(*args, **kwargs):
+            return len(queue_items) == 0
+        def _queue_get(*args, **kwargs):
+            if len(queue_items) == 0:
+                raise Queue.Empty
+            else:
+                return queue_items.pop()
+        def _queue_put(item, *args, **kwargs):
+            queue_items.append(item)
+
+        mock_queue = MagicMock()
+        mock_queue.empty.side_effect = _queue_empty
+        mock_queue.get.side_effect = _queue_get
+        mock_queue.put.side_effect = _queue_put
+
         mock_tqm = MagicMock(TaskQueueManager)
-        mock_tqm._final_q = MagicMock()
+        mock_tqm._final_q = mock_queue
         mock_tqm._options = MagicMock()
         mock_tqm._notified_handlers = {}
         mock_tqm._listening_handlers = {}
         strategy_base = StrategyBase(tqm=mock_tqm)
+        strategy_base.cleanup()

     def test_strategy_base_run(self):
+        queue_items = []
+        def _queue_empty(*args, **kwargs):
+            return len(queue_items) == 0
+        def _queue_get(*args, **kwargs):
+            if len(queue_items) == 0:
+                raise Queue.Empty
+            else:
+                return queue_items.pop()
+        def _queue_put(item, *args, **kwargs):
+            queue_items.append(item)
+
+        mock_queue = MagicMock()
+        mock_queue.empty.side_effect = _queue_empty
+        mock_queue.get.side_effect = _queue_get
+        mock_queue.put.side_effect = _queue_put
+
         mock_tqm = MagicMock(TaskQueueManager)
-        mock_tqm._final_q = MagicMock()
+        mock_tqm._final_q = mock_queue
         mock_tqm._stats = MagicMock()
         mock_tqm._notified_handlers = {}
         mock_tqm._listening_handlers = {}
@@ -87,8 +120,25 @@ class TestStrategyBase(unittest.TestCase):
         mock_tqm._unreachable_hosts = dict(host1=True)
         mock_iterator.get_failed_hosts.return_value = []
         self.assertEqual(strategy_base.run(iterator=mock_iterator, play_context=mock_play_context, result=False), mock_tqm.RUN_UNREACHABLE_HOSTS)
+        strategy_base.cleanup()

     def test_strategy_base_get_hosts(self):
+        queue_items = []
+        def _queue_empty(*args, **kwargs):
+            return len(queue_items) == 0
+        def _queue_get(*args, **kwargs):
+            if len(queue_items) == 0:
+                raise Queue.Empty
+            else:
+                return queue_items.pop()
+        def _queue_put(item, *args, **kwargs):
+            queue_items.append(item)
+
+        mock_queue = MagicMock()
+        mock_queue.empty.side_effect = _queue_empty
+        mock_queue.get.side_effect = _queue_get
+        mock_queue.put.side_effect = _queue_put
+
         mock_hosts = []
         for i in range(0, 5):
             mock_host = MagicMock()
@@ -100,7 +150,7 @@ class TestStrategyBase(unittest.TestCase):
         mock_inventory.get_hosts.return_value = mock_hosts

         mock_tqm = MagicMock()
-        mock_tqm._final_q = MagicMock()
+        mock_tqm._final_q = mock_queue
         mock_tqm._notified_handlers = {}
         mock_tqm._listening_handlers = {}
         mock_tqm.get_inventory.return_value = mock_inventory
@@ -120,6 +170,7 @@ class TestStrategyBase(unittest.TestCase):
         mock_tqm._unreachable_hosts = ["host02"]
         self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[2:])
+        strategy_base.cleanup()

     @patch.object(WorkerProcess, 'run')
     def test_strategy_base_queue_task(self, mock_worker):
@@ -178,10 +229,13 @@ class TestStrategyBase(unittest.TestCase):
                 raise Queue.Empty
             else:
                 return queue_items.pop()
+        def _queue_put(item, *args, **kwargs):
+            queue_items.append(item)

         mock_queue = MagicMock()
         mock_queue.empty.side_effect = _queue_empty
         mock_queue.get.side_effect = _queue_get
+        mock_queue.put.side_effect = _queue_put
         mock_tqm._final_q = mock_queue

         mock_tqm._stats = MagicMock()
@@ -272,7 +326,7 @@ class TestStrategyBase(unittest.TestCase):
         strategy_base._blocked_hosts['test01'] = True
         strategy_base._pending_results = 1
         mock_iterator.is_failed.return_value = True
-        results = strategy_base._process_pending_results(iterator=mock_iterator)
+        results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
         self.assertEqual(len(results), 1)
         self.assertEqual(results[0], task_result)
         self.assertEqual(strategy_base._pending_results, 0)
@@ -306,7 +360,7 @@ class TestStrategyBase(unittest.TestCase):
         queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
         strategy_base._blocked_hosts['test01'] = True
         strategy_base._pending_results = 1
-        results = strategy_base._process_pending_results(iterator=mock_iterator)
+        results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
         self.assertEqual(len(results), 1)
         self.assertEqual(strategy_base._pending_results, 0)
         self.assertNotIn('test01', strategy_base._blocked_hosts)
@@ -314,7 +368,7 @@ class TestStrategyBase(unittest.TestCase):
         queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo'))))
         strategy_base._blocked_hosts['test01'] = True
         strategy_base._pending_results = 1
-        results = strategy_base._process_pending_results(iterator=mock_iterator)
+        results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
         self.assertEqual(len(results), 1)
         self.assertEqual(strategy_base._pending_results, 0)
         self.assertNotIn('test01', strategy_base._blocked_hosts)
@@ -322,7 +376,7 @@ class TestStrategyBase(unittest.TestCase):
         queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler'])))
         strategy_base._blocked_hosts['test01'] = True
         strategy_base._pending_results = 1
-        results = strategy_base._process_pending_results(iterator=mock_iterator)
+        results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
         self.assertEqual(len(results), 1)
         self.assertEqual(strategy_base._pending_results, 0)
         self.assertNotIn('test01', strategy_base._blocked_hosts)
@@ -341,6 +395,7 @@ class TestStrategyBase(unittest.TestCase):
         #queue_items.append(('bad'))
         #self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator)
+        strategy_base.cleanup()

     def test_strategy_base_load_included_file(self):
         fake_loader = DictDataLoader({
@@ -351,13 +406,30 @@ class TestStrategyBase(unittest.TestCase):
             """,
         })

+        queue_items = []
+        def _queue_empty(*args, **kwargs):
+            return len(queue_items) == 0
+        def _queue_get(*args, **kwargs):
+            if len(queue_items) == 0:
+                raise Queue.Empty
+            else:
+                return queue_items.pop()
+        def _queue_put(item, *args, **kwargs):
+            queue_items.append(item)
+
+        mock_queue = MagicMock()
+        mock_queue.empty.side_effect = _queue_empty
+        mock_queue.get.side_effect = _queue_get
+        mock_queue.put.side_effect = _queue_put
+
         mock_tqm = MagicMock()
-        mock_tqm._final_q = MagicMock()
+        mock_tqm._final_q = mock_queue
         mock_tqm._notified_handlers = {}
         mock_tqm._listening_handlers = {}

         strategy_base = StrategyBase(tqm=mock_tqm)
         strategy_base._loader = fake_loader
+        strategy_base.cleanup()

         mock_play = MagicMock()
@@ -443,4 +515,5 @@ class TestStrategyBase(unittest.TestCase):
             result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context)
         finally:
+            strategy_base.cleanup()
             tqm.cleanup()
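
The same three queue side-effect helpers are pasted into every test above; a hypothetical helper (not part of this commit) shows the pattern once. The newly added put side effect is what makes the cleanup() calls work: StrategyBase.__init__ now starts the reader thread immediately, and the sentinel that cleanup() puts has to land where the thread's get() can retrieve it:

try:
    import queue as Queue               # Python 3
except ImportError:
    import Queue                        # Python 2
from unittest.mock import MagicMock     # Python 2 would use: from mock import MagicMock

def make_mock_final_q():
    # List-backed stand-in for the TQM's multiprocessing final queue:
    # put() appends, get() pops or raises Queue.Empty, matching the
    # interface the strategy's background reader thread expects.
    queue_items = []

    def _queue_empty(*args, **kwargs):
        return len(queue_items) == 0

    def _queue_get(*args, **kwargs):
        if len(queue_items) == 0:
            raise Queue.Empty
        else:
            return queue_items.pop()

    def _queue_put(item, *args, **kwargs):
        queue_items.append(item)

    mock_queue = MagicMock()
    mock_queue.empty.side_effect = _queue_empty
    mock_queue.get.side_effect = _queue_get
    mock_queue.put.side_effect = _queue_put
    return mock_queue, queue_items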
