pull/51985/head
Matt Martz 3 weeks ago
parent 802e95f580
commit 59fbd9ccc5
No known key found for this signature in database
GPG Key ID: 40832D88E9FC91D8

@ -90,6 +90,9 @@ class CallbackModule(CallbackBase):
if self._last_task_banner != result._task._uuid:
self._print_task_banner(result._task)
if result._task.action in C._ACTION_META:
self._display.vv(f'META: {result._result["msg"]}')
msg = "ok: [%s]" % (host_label,)
color = C.COLOR_OK
@ -116,6 +119,9 @@ class CallbackModule(CallbackBase):
if result._task.loop is not None and 'results' in result._result:
self._process_items(result)
if result._task.action in C._ACTION_META:
self._display.vv(f'META: {result._result["skip_reason"]}')
msg = "skipping: [%s]" % result._host.get_name()
if self._run_is_verbose(result):
msg += " => %s" % self._dump_results(result._result)

@ -347,7 +347,7 @@ class StrategyBase:
vars['ansible_current_hosts'] = self.get_hosts_remaining(play)
vars['ansible_failed_hosts'] = self.get_failed_hosts(play)
def _queue_task(self, host, task, task_vars, play_context):
def _queue_task(self, host, task, task_vars, play_context, iterator=None):
''' handles queueing the task up to be sent to a worker '''
display.debug("entering _queue_task() for %s/%s" % (host.name, task.action))
@ -406,14 +406,19 @@ class StrategyBase:
'play_context': play_context
}
# Pass WorkerProcess its strategy worker number so it can send an identifier along with intra-task requests
worker_prc = WorkerProcess(
self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, plugin_loader, self._cur_worker,
)
self._workers[self._cur_worker] = worker_prc
self._tqm.send_callback('v2_runner_on_start', host, task)
worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
if task.action in C._ACTION_META:
if iterator is not None:
self._tqm.send_callback('v2_runner_on_start', host, task)
self._execute_meta(task, play_context, iterator, host)
else:
# Pass WorkerProcess its strategy worker number so it can send an identifier along with intra-task requests
worker_prc = WorkerProcess(
self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, plugin_loader, self._cur_worker,
)
self._workers[self._cur_worker] = worker_prc
self._tqm.send_callback('v2_runner_on_start', host, task)
worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
queued = True
self._cur_worker += 1
@ -628,7 +633,7 @@ class StrategyBase:
)
else:
self._tqm._stats.increment('failures', original_host.name)
else:
elif not original_task.implicit:
self._tqm._stats.increment('ok', original_host.name)
self._tqm._stats.increment('ignored', original_host.name)
if 'changed' in task_result._result and task_result._result['changed']:
@ -640,7 +645,7 @@ class StrategyBase:
self._tqm._unreachable_hosts[original_host.name] = True
iterator._play._removed_hosts.append(original_host.name)
self._tqm._stats.increment('dark', original_host.name)
else:
elif not original_task.implicit:
self._tqm._stats.increment('ok', original_host.name)
self._tqm._stats.increment('ignored', original_host.name)
self._tqm.send_callback('v2_runner_on_unreachable', task_result)
@ -770,13 +775,14 @@ class StrategyBase:
if self._diff or getattr(original_task, 'diff', False):
self._tqm.send_callback('v2_on_file_diff', task_result)
if not isinstance(original_task, TaskInclude):
if not isinstance(original_task, TaskInclude) and not original_task.implicit:
self._tqm._stats.increment('ok', original_host.name)
if 'changed' in task_result._result and task_result._result['changed']:
self._tqm._stats.increment('changed', original_host.name)
# finally, send the ok for this task
self._tqm.send_callback('v2_runner_on_ok', task_result)
if not original_task.implicit:
self._tqm.send_callback('v2_runner_on_ok', task_result)
# register final results
if original_task.register:
@ -952,10 +958,6 @@ class StrategyBase:
skipped = False
msg = meta_action
skip_reason = '%s conditional evaluated to False' % meta_action
if isinstance(task, Handler):
self._tqm.send_callback('v2_playbook_on_handler_task_start', task)
else:
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
# These don't support "when" conditionals
if meta_action in ('noop', 'refresh_inventory', 'reset_connection') and task.when:
@ -1097,13 +1099,9 @@ class StrategyBase:
else:
result['changed'] = False
if not task.implicit:
header = skip_reason if skipped else msg
display.vv(f"META: {header}")
res = TaskResult(target_host, task, result)
if skipped:
self._tqm.send_callback('v2_runner_on_skipped', res)
with self._results_lock:
self._results.append(res)
return [res]
def _get_cached_role(self, task, play):

@ -180,23 +180,19 @@ class StrategyModule(StrategyBase):
del self._blocked_hosts[host_name]
continue
if task.action in C._ACTION_META:
self._execute_meta(task, play_context, iterator, target_host=host)
self._blocked_hosts[host_name] = False
else:
# handle step if needed, skip meta actions as they are used internally
if not self._step or self._take_step(task, host_name):
if task.any_errors_fatal:
display.warning("Using any_errors_fatal with the free strategy is not supported, "
"as tasks are executed independently on each host")
if isinstance(task, Handler):
self._tqm.send_callback('v2_playbook_on_handler_task_start', task)
else:
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
self._queue_task(host, task, task_vars, play_context)
# each task is counted as a worker being busy
workers_free -= 1
del task_vars
# handle step if needed, skip meta actions as they are used internally
if task.action in C._ACTION_META or not self._step or self._take_step(task, host_name):
if task.any_errors_fatal:
display.warning("Using any_errors_fatal with the free strategy is not supported, "
"as tasks are executed independently on each host")
if isinstance(task, Handler):
self._tqm.send_callback('v2_playbook_on_handler_task_start', task)
else:
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
self._queue_task(host, task, task_vars, play_context, iterator)
# each task is counted as a worker being busy
workers_free -= 1
del task_vars
else:
display.debug("%s is blocked, skipping for now" % host_name)

@ -143,7 +143,7 @@ class StrategyModule(StrategyBase):
display.debug("done getting the remaining hosts for this loop")
# queue up this task for each host in the inventory
callback_sent = False
prev_uuid = None
work_to_do = False
host_tasks = self._get_next_task_lockstep(hosts_left, iterator)
@ -194,51 +194,47 @@ class StrategyModule(StrategyBase):
# corresponding action plugin
action = None
# handle step if needed, skip meta actions as they are used internally
if task.action not in C._ACTION_META and self._step and choose_step:
if self._take_step(task):
choose_step = False
else:
skip_rest = True
break
run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False)
if task_action in C._ACTION_META:
# for the linear strategy, we run meta tasks just once and for
# all hosts currently being iterated over rather than one host
results.extend(self._execute_meta(task, play_context, iterator, host))
if task.args.get('_raw_params', None) not in ('noop', 'reset_connection', 'end_host', 'role_complete', 'flush_handlers'):
run_once = True
if (task.any_errors_fatal or run_once) and not task.ignore_errors:
any_errors_fatal = True
else:
# handle step if needed, skip meta actions as they are used internally
if self._step and choose_step:
if self._take_step(task):
choose_step = False
else:
skip_rest = True
break
run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False)
if (task.any_errors_fatal or run_once) and not task.ignore_errors:
any_errors_fatal = True
if not callback_sent:
display.debug("sending task start callback, copying the task so we can template it temporarily")
saved_name = task.name
display.debug("done copying, going to template now")
try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
display.debug("done templating")
except Exception:
# just ignore any errors during task name templating,
# we don't care if it just shows the raw name
display.debug("templating failed for some reason")
display.debug("here goes the callback...")
if isinstance(task, Handler):
self._tqm.send_callback('v2_playbook_on_handler_task_start', task)
else:
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
task.name = saved_name
callback_sent = True
display.debug("sending task start callback")
self._blocked_hosts[host.get_name()] = True
self._queue_task(host, task, task_vars, play_context)
del task_vars
if (task.any_errors_fatal or run_once) and not task.ignore_errors:
any_errors_fatal = True
if prev_uuid != task._uuid:
display.debug("sending task start callback, copying the task so we can template it temporarily")
saved_name = task.name
display.debug("done copying, going to template now")
try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
display.debug("done templating")
except Exception:
# just ignore any errors during task name templating,
# we don't care if it just shows the raw name
display.debug("templating failed for some reason")
display.debug("here goes the callback...")
if isinstance(task, Handler):
self._tqm.send_callback('v2_playbook_on_handler_task_start', task)
else:
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
task.name = saved_name
prev_uuid = task._uuid
display.debug("sending task start callback")
self._blocked_hosts[host.get_name()] = True
self._queue_task(host, task, task_vars, play_context, iterator)
del task_vars
if isinstance(task, Handler):
if run_once:

Loading…
Cancel
Save