Template "original_task" fields in _process_pending_results

Since we no longer use a post-validated task in _process_pending_results, we
need to be sure to template fields used in original_task as they are raw and
may contain variables.

This patch also moves the handler tracking to be per-uuid, not per-object.
Doing it per-object had implications for the above due to the fact that the
copy of the original task is now being used, so the only sure way is to track
based on the uuid instead.

Fixes #18289
pull/19606/head
James Cammarata 8 years ago
parent 7faa041636
commit dd0257b995

@ -138,8 +138,8 @@ class TaskQueueManager:
# then initialize it with the given handler list # then initialize it with the given handler list
for handler in handler_list: for handler in handler_list:
if handler not in self._notified_handlers: if handler._uuid not in self._notified_handlers:
self._notified_handlers[handler] = [] self._notified_handlers[handler._uuid] = []
if handler.listen: if handler.listen:
listeners = handler.listen listeners = handler.listen
if not isinstance(listeners, list): if not isinstance(listeners, list):
@ -147,13 +147,6 @@ class TaskQueueManager:
for listener in listeners: for listener in listeners:
if listener not in self._listening_handlers: if listener not in self._listening_handlers:
self._listening_handlers[listener] = [] self._listening_handlers[listener] = []
# if the handler has a name, we append it to the list of listening
# handlers, otherwise we use the uuid to avoid trampling on other
# nameless listeners
if handler.name:
self._listening_handlers[listener].append(handler.get_name())
else:
self._listening_handlers[listener].append(handler._uuid) self._listening_handlers[listener].append(handler._uuid)
def load_callbacks(self): def load_callbacks(self):

@ -244,7 +244,7 @@ class StrategyBase:
else: else:
return self._inventory.get_host(host_name) return self._inventory.get_host(host_name)
def search_handler_blocks(handler_name, handler_blocks): def search_handler_blocks_by_name(handler_name, handler_blocks):
for handler_block in handler_blocks: for handler_block in handler_blocks:
for handler_task in handler_block.block: for handler_task in handler_block.block:
if handler_task.name: if handler_task.name:
@ -268,10 +268,13 @@ class StrategyBase:
# set_fact or some other method, and we don't want to error # set_fact or some other method, and we don't want to error
# out unnecessarily # out unnecessarily
continue continue
else: return None
# if the handler name is not set, we check via the handlers uuid.
# this is mainly used by listening handlers only
if handler_name == handler_task._uuid: def search_handler_blocks_by_uuid(handler_uuid, handler_blocks):
for handler_block in handler_blocks:
for handler_task in handler_block.block:
if handler_uuid == handler_task._uuid:
return handler_task return handler_task
return None return None
@ -294,6 +297,11 @@ class StrategyBase:
else: else:
return False return False
# a Templar class to use for templating things later, as we're using
# original/non-validated objects here on the manager side. We set the
# variables in use later inside the loop below
templar = Templar(loader=self._loader)
cur_pass = 0 cur_pass = 0
while True: while True:
try: try:
@ -304,11 +312,24 @@ class StrategyBase:
finally: finally:
self._results_lock.release() self._results_lock.release()
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
original_host = get_original_host(task_result._host) original_host = get_original_host(task_result._host)
original_task = iterator.get_original_task(original_host, task_result._task) original_task = iterator.get_original_task(original_host, task_result._task)
task_result._host = original_host task_result._host = original_host
task_result._task = original_task task_result._task = original_task
# get the correct loop var for use later
if original_task.loop_control:
loop_var = original_task.loop_control.loop_var or 'item'
else:
loop_var = 'item'
# get the vars for this task/host pair, make them the active set of vars for our templar above
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=original_host, task=original_task)
self.add_tqm_variables(task_vars, play=iterator._play)
templar.set_available_variables(task_vars)
# send callbacks for 'non final' results # send callbacks for 'non final' results
if '_ansible_retry' in task_result._result: if '_ansible_retry' in task_result._result:
self._tqm.send_callback('v2_runner_retry', task_result) self._tqm.send_callback('v2_runner_retry', task_result)
@ -325,8 +346,9 @@ class StrategyBase:
self._tqm.send_callback('v2_runner_item_on_ok', task_result) self._tqm.send_callback('v2_runner_item_on_ok', task_result)
continue continue
run_once = templar.template(original_task.run_once)
if original_task.register: if original_task.register:
if original_task.run_once: if run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else: else:
host_list = [original_host] host_list = [original_host]
@ -342,9 +364,10 @@ class StrategyBase:
role_ran = False role_ran = False
if task_result.is_failed(): if task_result.is_failed():
role_ran = True role_ran = True
if not original_task.ignore_errors: ignore_errors = templar.template(original_task.ignore_errors)
if not ignore_errors:
display.debug("marking %s as failed" % original_host.name) display.debug("marking %s as failed" % original_host.name)
if original_task.run_once: if run_once:
# if we're using run_once, we have to fail every host here # if we're using run_once, we have to fail every host here
for h in self._inventory.get_hosts(iterator._play.hosts): for h in self._inventory.get_hosts(iterator._play.hosts):
if h.name not in self._tqm._unreachable_hosts: if h.name not in self._tqm._unreachable_hosts:
@ -377,7 +400,7 @@ class StrategyBase:
self._tqm._stats.increment('ok', original_host.name) self._tqm._stats.increment('ok', original_host.name)
if 'changed' in task_result._result and task_result._result['changed']: if 'changed' in task_result._result and task_result._result['changed']:
self._tqm._stats.increment('changed', original_host.name) self._tqm._stats.increment('changed', original_host.name)
self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors) self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=ignore_errors)
elif task_result.is_unreachable(): elif task_result.is_unreachable():
self._tqm._unreachable_hosts[original_host.name] = True self._tqm._unreachable_hosts[original_host.name] = True
iterator._play._removed_hosts.append(original_host.name) iterator._play._removed_hosts.append(original_host.name)
@ -398,43 +421,46 @@ class StrategyBase:
for result_item in result_items: for result_item in result_items:
if '_ansible_notify' in result_item: if '_ansible_notify' in result_item:
print("GOT A NOTIFY")
if task_result.is_changed(): if task_result.is_changed():
# The shared dictionary for notified handlers is a proxy, which # The shared dictionary for notified handlers is a proxy, which
# does not detect when sub-objects within the proxy are modified. # does not detect when sub-objects within the proxy are modified.
# So, per the docs, we reassign the list so the proxy picks up and # So, per the docs, we reassign the list so the proxy picks up and
# notifies all other threads # notifies all other threads
for handler_name in result_item['_ansible_notify']: for handler_name in result_item['_ansible_notify']:
print("TRYING TO SEND NOTIFICATION TO HANDLER: %s" % handler_name)
found = False found = False
# Find the handler using the above helper. First we look up the # Find the handler using the above helper. First we look up the
# dependency chain of the current task (if it's from a role), otherwise # dependency chain of the current task (if it's from a role), otherwise
# we just look through the list of handlers in the current play/all # we just look through the list of handlers in the current play/all
# roles and use the first one that matches the notify name # roles and use the first one that matches the notify name
target_handler = search_handler_blocks(handler_name, iterator._play.handlers) target_handler = search_handler_blocks_by_name(handler_name, iterator._play.handlers)
if target_handler is not None: if target_handler is not None:
found = True found = True
if original_host not in self._notified_handlers[target_handler]: if original_host._uuid not in self._notified_handlers[target_handler._uuid]:
self._notified_handlers[target_handler].append(original_host) self._notified_handlers[target_handler._uuid].append(original_host)
# FIXME: should this be a callback? # FIXME: should this be a callback?
display.vv("NOTIFIED HANDLER %s" % (handler_name,)) display.vv("NOTIFIED HANDLER %s" % (handler_name,))
else: else:
# As there may be more than one handler with the notified name as the # As there may be more than one handler with the notified name as the
# parent, so we just keep track of whether or not we found one at all # parent, so we just keep track of whether or not we found one at all
for target_handler in self._notified_handlers: for target_handler_uuid in self._notified_handlers:
if parent_handler_match(target_handler, handler_name): target_handler = search_handler_blocks_by_uuid(target_handler_uuid, iterator._play.handlers)
self._notified_handlers[target_handler].append(original_host) if target_handler and parent_handler_match(target_handler, handler_name):
self._notified_handlers[target_handler._uuid].append(original_host)
display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),)) display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),))
found = True found = True
if handler_name in self._listening_handlers: if handler_name in self._listening_handlers:
for listening_handler_name in self._listening_handlers[handler_name]: for listening_handler_uuid in self._listening_handlers[handler_name]:
listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) listening_handler = search_handler_blocks_by_uuid(listening_handler_uuid, iterator._play.handlers)
if listening_handler is not None: if listening_handler is not None:
found = True found = True
else: else:
continue continue
if original_host not in self._notified_handlers[listening_handler]: if original_host not in self._notified_handlers[listening_handler._uuid]:
self._notified_handlers[listening_handler].append(original_host) self._notified_handlers[listening_handler._uuid].append(original_host)
display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) display.vv("NOTIFIED HANDLER %s" % (listening_handler.get_name(),))
# and if none were found, then we raise an error # and if none were found, then we raise an error
if not found: if not found:
@ -455,21 +481,11 @@ class StrategyBase:
elif 'ansible_facts' in result_item: elif 'ansible_facts' in result_item:
# set correct loop var
if original_task.loop_control:
loop_var = original_task.loop_control.loop_var or 'item'
else:
loop_var = 'item'
item = result_item.get(loop_var, None)
# if delegated fact and we are delegating facts, we need to change target host for them # if delegated fact and we are delegating facts, we need to change target host for them
if original_task.delegate_to is not None and original_task.delegate_facts: if original_task.delegate_to is not None and original_task.delegate_facts:
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=original_host, task=original_task) item = result_item.get(loop_var, None)
self.add_tqm_variables(task_vars, play=iterator._play)
if item is not None: if item is not None:
task_vars[loop_var] = item task_vars[loop_var] = item
templar = Templar(loader=self._loader, variables=task_vars)
host_name = templar.template(original_task.delegate_to) host_name = templar.template(original_task.delegate_to)
actual_host = self._inventory.get_host(host_name) actual_host = self._inventory.get_host(host_name)
if actual_host is None: if actual_host is None:
@ -482,7 +498,7 @@ class StrategyBase:
# find the host we're actually referring too here, which may # find the host we're actually referring too here, which may
# be a host that is not really in inventory at all # be a host that is not really in inventory at all
if original_task.run_once: if run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else: else:
host_list = [actual_host] host_list = [actual_host]
@ -490,7 +506,7 @@ class StrategyBase:
for target_host in host_list: for target_host in host_list:
self._variable_manager.set_host_variable(target_host, var_name, var_value) self._variable_manager.set_host_variable(target_host, var_name, var_value)
else: else:
if original_task.run_once: if run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else: else:
host_list = [actual_host] host_list = [actual_host]
@ -719,7 +735,7 @@ class StrategyBase:
# but this may take some work in the iterator and gets tricky when # but this may take some work in the iterator and gets tricky when
# we consider the ability of meta tasks to flush handlers # we consider the ability of meta tasks to flush handlers
for handler in handler_block.block: for handler in handler_block.block:
if handler in self._notified_handlers and len(self._notified_handlers[handler]): if handler._uuid in self._notified_handlers and len(self._notified_handlers[handler._uuid]):
result = self._do_handler_run(handler, handler.get_name(), iterator=iterator, play_context=play_context) result = self._do_handler_run(handler, handler.get_name(), iterator=iterator, play_context=play_context)
if not result: if not result:
break break
@ -738,7 +754,7 @@ class StrategyBase:
handler.name = saved_name handler.name = saved_name
if notified_hosts is None: if notified_hosts is None:
notified_hosts = self._notified_handlers[handler] notified_hosts = self._notified_handlers[handler._uuid]
run_once = False run_once = False
try: try:
@ -802,7 +818,7 @@ class StrategyBase:
continue continue
# wipe the notification list # wipe the notification list
self._notified_handlers[handler] = [] self._notified_handlers[handler._uuid] = []
display.debug("done running handlers, result is: %s" % result) display.debug("done running handlers, result is: %s" % result)
return result return result

@ -250,15 +250,20 @@ class TestStrategyBase(unittest.TestCase):
mock_task = MagicMock() mock_task = MagicMock()
mock_task._role = None mock_task._role = None
mock_task._parent = None
mock_task.ignore_errors = False mock_task.ignore_errors = False
mock_task._uuid = uuid.uuid4() mock_task._uuid = uuid.uuid4()
mock_task.loop = None mock_task.loop = None
mock_task.copy.return_value = mock_task
mock_handler_task = MagicMock(Handler) mock_handler_task = MagicMock(Handler)
mock_handler_task.name = 'test handler' mock_handler_task.name = 'test handler'
mock_handler_task.action = 'foo' mock_handler_task.action = 'foo'
mock_handler_task._parent = None
mock_handler_task.get_name.return_value = "test handler" mock_handler_task.get_name.return_value = "test handler"
mock_handler_task.has_triggered.return_value = False mock_handler_task.has_triggered.return_value = False
mock_handler_task._uuid = 'xxxxxxxxxxxxx'
mock_handler_task.copy.return_value = mock_handler_task
mock_iterator = MagicMock() mock_iterator = MagicMock()
mock_iterator._play = mock_play mock_iterator._play = mock_play
@ -272,7 +277,7 @@ class TestStrategyBase(unittest.TestCase):
mock_handler_block.always = [] mock_handler_block.always = []
mock_play.handlers = [mock_handler_block] mock_play.handlers = [mock_handler_block]
mock_tqm._notified_handlers = {mock_handler_task: []} mock_tqm._notified_handlers = {mock_handler_task._uuid: []}
mock_tqm._listening_handlers = {} mock_tqm._listening_handlers = {}
mock_group = MagicMock() mock_group = MagicMock()
@ -298,6 +303,7 @@ class TestStrategyBase(unittest.TestCase):
mock_var_mgr = MagicMock() mock_var_mgr = MagicMock()
mock_var_mgr.set_host_variable.return_value = None mock_var_mgr.set_host_variable.return_value = None
mock_var_mgr.set_host_facts.return_value = None mock_var_mgr.set_host_facts.return_value = None
mock_var_mgr.get_vars.return_value = dict()
strategy_base = StrategyBase(tqm=mock_tqm) strategy_base = StrategyBase(tqm=mock_tqm)
strategy_base._inventory = mock_inventory strategy_base._inventory = mock_inventory
@ -307,7 +313,7 @@ class TestStrategyBase(unittest.TestCase):
def _has_dead_workers(): def _has_dead_workers():
return False return False
strategy_base._tqm.has_dead_workers = _has_dead_workers strategy_base._tqm.has_dead_workers.side_effect = _has_dead_workers
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0) self.assertEqual(len(results), 0)
@ -380,8 +386,8 @@ class TestStrategyBase(unittest.TestCase):
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0) self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts) self.assertNotIn('test01', strategy_base._blocked_hosts)
self.assertIn(mock_handler_task, strategy_base._notified_handlers) self.assertIn(mock_handler_task._uuid, strategy_base._notified_handlers)
self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task]) self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task._uuid])
#queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar')) #queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
#results = strategy_base._process_pending_results(iterator=mock_iterator) #results = strategy_base._process_pending_results(iterator=mock_iterator)
@ -440,6 +446,7 @@ class TestStrategyBase(unittest.TestCase):
mock_task = MagicMock() mock_task = MagicMock()
mock_task._block = mock_block mock_task._block = mock_block
mock_task._role = None mock_task._role = None
mock_task._parent = None
mock_iterator = MagicMock() mock_iterator = MagicMock()
mock_iterator.mark_host_failed.return_value = None mock_iterator.mark_host_failed.return_value = None
@ -467,6 +474,8 @@ class TestStrategyBase(unittest.TestCase):
mock_handler_task.has_triggered.return_value = False mock_handler_task.has_triggered.return_value = False
mock_handler_task.listen = None mock_handler_task.listen = None
mock_handler_task._role = None mock_handler_task._role = None
mock_handler_task._parent = None
mock_handler_task._uuid = 'xxxxxxxxxxxxxxxx'
mock_handler = MagicMock() mock_handler = MagicMock()
mock_handler.block = [mock_handler_task] mock_handler.block = [mock_handler_task]
@ -508,7 +517,7 @@ class TestStrategyBase(unittest.TestCase):
strategy_base = StrategyBase(tqm=tqm) strategy_base = StrategyBase(tqm=tqm)
strategy_base._inventory = mock_inventory strategy_base._inventory = mock_inventory
strategy_base._notified_handlers = {mock_handler_task: [mock_host]} strategy_base._notified_handlers = {mock_handler_task._uuid: [mock_host]}
task_result = TaskResult(Host('host01'), Handler(), dict(changed=False)) task_result = TaskResult(Host('host01'), Handler(), dict(changed=False))
tqm._final_q.put(task_result) tqm._final_q.put(task_result)

Loading…
Cancel
Save