diff --git a/changelogs/fragments/simple-result-queue.yml b/changelogs/fragments/simple-result-queue.yml new file mode 100644 index 00000000000..300e1495cb3 --- /dev/null +++ b/changelogs/fragments/simple-result-queue.yml @@ -0,0 +1,3 @@ +bugfixes: +- Switch result queue from a ``multiprocessing.queues.Queue`` to ``multiprocessing.queues.SimpleQueue``, primarily to allow properly handling + pickling errors, to prevent an infinite hang waiting for task results diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 43bd292cf57..c043137c95f 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -194,12 +194,27 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin # put the result on the result queue display.debug("sending task result for task %s" % self._task._uuid) - self._final_q.send_task_result( - self._host.name, - self._task._uuid, - executor_result, - task_fields=self._task.dump_attrs(), - ) + try: + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + executor_result, + task_fields=self._task.dump_attrs(), + ) + except Exception as e: + display.debug(f'failed to send task result ({e}), sending surrogate result') + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + # Overriding the task result, to represent the failure + { + 'failed': True, + 'msg': f'{e}', + 'exception': traceback.format_exc(), + }, + # The failure pickling may have been caused by the task attrs, omit for safety + {}, + ) display.debug("done sending task result for task %s" % self._task._uuid) except AnsibleConnectionFailure: diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 9af431de3de..eb3fd351602 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -76,15 +76,14 @@ class PromptSend: complete_input: t.Iterable[bytes] = None 
-class FinalQueue(multiprocessing.queues.Queue): +class FinalQueue(multiprocessing.queues.SimpleQueue): def __init__(self, *args, **kwargs): kwargs['ctx'] = multiprocessing_context - super(FinalQueue, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) def send_callback(self, method_name, *args, **kwargs): self.put( CallbackSend(method_name, *args, **kwargs), - block=False ) def send_task_result(self, *args, **kwargs): @@ -94,19 +93,16 @@ class FinalQueue(multiprocessing.queues.Queue): tr = TaskResult(*args, **kwargs) self.put( tr, - block=False ) def send_display(self, *args, **kwargs): self.put( DisplaySend(*args, **kwargs), - block=False ) def send_prompt(self, **kwargs): self.put( PromptSend(**kwargs), - block=False ) diff --git a/test/integration/targets/fork_safe_stdio/aliases b/test/integration/targets/fork_safe_stdio/aliases index e968db7203b..7761837eaab 100644 --- a/test/integration/targets/fork_safe_stdio/aliases +++ b/test/integration/targets/fork_safe_stdio/aliases @@ -1,3 +1,3 @@ shippable/posix/group3 context/controller -skip/macos +needs/target/test_utils diff --git a/test/integration/targets/fork_safe_stdio/runme.sh b/test/integration/targets/fork_safe_stdio/runme.sh index 4438c3fe90d..863582f3f25 100755 --- a/test/integration/targets/fork_safe_stdio/runme.sh +++ b/test/integration/targets/fork_safe_stdio/runme.sh @@ -7,7 +7,7 @@ echo "testing for stdio deadlock on forked workers (10s timeout)..." # Enable a callback that trips deadlocks on forked-child stdout, time out after 10s; forces running # in a pty, since that tends to be much slower than raw file I/O and thus more likely to trigger the deadlock. # Redirect stdout to /dev/null since it's full of non-printable garbage we don't want to display unless it failed -ANSIBLE_CALLBACKS_ENABLED=spewstdio SPEWSTDIO_ENABLED=1 python run-with-pty.py timeout 10s ansible-playbook -i hosts -f 5 test.yml > stdout.txt && RC=$? || RC=$? 
+ANSIBLE_CALLBACKS_ENABLED=spewstdio SPEWSTDIO_ENABLED=1 python run-with-pty.py ../test_utils/scripts/timeout.py -- 10 ansible-playbook -i hosts -f 5 test.yml > stdout.txt && RC=$? || RC=$? if [ $RC != 0 ]; then echo "failed; likely stdout deadlock. dumping raw output (may be very large)" diff --git a/test/integration/targets/result_pickle_error/action_plugins/result_pickle_error.py b/test/integration/targets/result_pickle_error/action_plugins/result_pickle_error.py new file mode 100644 index 00000000000..e8d712a337b --- /dev/null +++ b/test/integration/targets/result_pickle_error/action_plugins/result_pickle_error.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +# Copyright: Contributors to the Ansible project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.plugins.action import ActionBase +from jinja2 import Undefined + + +class ActionModule(ActionBase): + + def run(self, tmp=None, task_vars=None): + return {'obj': Undefined('obj')} diff --git a/test/integration/targets/result_pickle_error/aliases b/test/integration/targets/result_pickle_error/aliases new file mode 100644 index 00000000000..70fbe57e9b1 --- /dev/null +++ b/test/integration/targets/result_pickle_error/aliases @@ -0,0 +1,3 @@ +shippable/posix/group5 +context/controller +needs/target/test_utils diff --git a/test/integration/targets/result_pickle_error/runme.sh b/test/integration/targets/result_pickle_error/runme.sh new file mode 100755 index 00000000000..e2ec37b8956 --- /dev/null +++ b/test/integration/targets/result_pickle_error/runme.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash + +set -ux +export ANSIBLE_ROLES_PATH=../ + +is_timeout() { + rv=$? 
+ if [ "$rv" == "124" ]; then + echo "***hang detected, this likely means the strategy never received a result for the task***" + fi + exit $rv +} + +trap "is_timeout" EXIT + +../test_utils/scripts/timeout.py -- 10 ansible-playbook -i ../../inventory runme.yml -v "$@" diff --git a/test/integration/targets/result_pickle_error/runme.yml b/test/integration/targets/result_pickle_error/runme.yml new file mode 100644 index 00000000000..605084985f4 --- /dev/null +++ b/test/integration/targets/result_pickle_error/runme.yml @@ -0,0 +1,7 @@ +- hosts: all + gather_facts: false + tasks: + - include_role: + name: result_pickle_error + # Just for caution loop 3 times to ensure no issues + loop: '{{ range(3) }}' diff --git a/test/integration/targets/result_pickle_error/tasks/main.yml b/test/integration/targets/result_pickle_error/tasks/main.yml new file mode 100644 index 00000000000..895475dd09a --- /dev/null +++ b/test/integration/targets/result_pickle_error/tasks/main.yml @@ -0,0 +1,14 @@ +- name: Ensure pickling error doesn't cause a hang + result_pickle_error: + ignore_errors: true + register: result + +- assert: + that: + - result.msg == expected_msg + - result is failed + vars: + expected_msg: "cannot pickle 'Undefined' object" + +- debug: + msg: Success, no hang diff --git a/test/integration/targets/test_utils/aliases b/test/integration/targets/test_utils/aliases new file mode 100644 index 00000000000..136c05e0d02 --- /dev/null +++ b/test/integration/targets/test_utils/aliases @@ -0,0 +1 @@ +hidden diff --git a/test/integration/targets/test_utils/scripts/timeout.py b/test/integration/targets/test_utils/scripts/timeout.py new file mode 100755 index 00000000000..f88f3e4e15c --- /dev/null +++ b/test/integration/targets/test_utils/scripts/timeout.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +import argparse +import subprocess +import sys + +parser = argparse.ArgumentParser() +parser.add_argument('duration', type=int) +parser.add_argument('command', nargs='+') +args = 
parser.parse_args() + +try: + p = subprocess.run( + # Run the argv list directly (no shell) so each argument keeps the caller's quoting + args.command, + timeout=args.duration, + check=False, + ) + sys.exit(p.returncode) +except subprocess.TimeoutExpired: + sys.exit(124)