Resolve perf issue with async callback events (#76783)

* Don't send full task with async callback events. Fixes #76729

* Use args for async_status task, instead of k=v

* Make sure we send back the async task attrs for polling

* Add clog frag

* load is a staticmethod
pull/76822/head
Matt Martz 2 years ago committed by GitHub
parent d7d1bd6269
commit 96ce4804ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,3 @@
bugfixes:
- async - Improve performance of sending async callback events by never sending the full task through the queue
(https://github.com/ansible/ansible/issues/76729)

@ -621,14 +621,14 @@ class TaskExecutor:
self._final_q.send_callback(
'v2_runner_on_async_failed',
TaskResult(self._host.name,
self._task, # We send the full task here, because the controller knows nothing about it, the TE created it
self._task._uuid,
result,
task_fields=self._task.dump_attrs()))
else:
self._final_q.send_callback(
'v2_runner_on_async_ok',
TaskResult(self._host.name,
self._task, # We send the full task here, because the controller knows nothing about it, the TE created it
self._task._uuid,
result,
task_fields=self._task.dump_attrs()))
@ -797,7 +797,7 @@ class TaskExecutor:
# that (with a sleep for "poll" seconds between each retry) until the
# async time limit is exceeded.
async_task = Task().load(dict(action='async_status jid=%s' % async_jid, environment=self._task.environment))
async_task = Task.load(dict(action='async_status', args={'jid': async_jid}, environment=self._task.environment))
# FIXME: this is no longer the case, normal takes care of all, see if this can just be generalized
# Because this is an async task, the action handler is async. However,
@ -849,9 +849,9 @@ class TaskExecutor:
'v2_runner_on_async_poll',
TaskResult(
self._host.name,
async_task, # We send the full task here, because the controller knows nothing about it, the TE created it
async_task._uuid,
async_result,
task_fields=self._task.dump_attrs(),
task_fields=async_task.dump_attrs(),
),
)
@ -863,7 +863,7 @@ class TaskExecutor:
else:
# If the async task finished, automatically cleanup the temporary
# status file left behind.
cleanup_task = Task().load(
cleanup_task = Task.load(
{
'async_status': {
'jid': async_jid,

@ -48,6 +48,7 @@ from ansible.playbook.conditional import Conditional
from ansible.playbook.handler import Handler
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task import Task
from ansible.playbook.task_include import TaskInclude
from ansible.plugins import loader as plugin_loader
from ansible.template import Templar
@ -471,9 +472,18 @@ class StrategyBase:
if isinstance(task_result._task, string_types):
# If the value is a string, it is ``Task._uuid``
queue_cache_entry = (task_result._host.name, task_result._task)
found_task = self._queued_task_cache.get(queue_cache_entry)['task']
original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
original_task._parent = found_task._parent
try:
found_task = self._queued_task_cache[queue_cache_entry]['task']
except KeyError:
# This should only happen due to an implicit task created by the
# TaskExecutor, restrict this behavior to the explicit use case
# of an implicit async_status task
if task_result._task_fields.get('action') != 'async_status':
raise
original_task = Task()
else:
original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
original_task._parent = found_task._parent
original_task.from_attrs(task_result._task_fields)
task_result._task = original_task

Loading…
Cancel
Save