Switch from multiprocessing.Queue to SimpleQueue (#80838)

pull/80867/head
Matt Martz 1 year ago committed by GitHub
parent 202195f5e4
commit 61157f6a9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,3 @@
bugfixes:
- Switch result queue from a ``multiprocessing.queues.Queue` to ``multiprocessing.queues.SimpleQueue``, primarily to allow properly handling
pickling errors, to prevent an infinite hang waiting for task results

@ -194,12 +194,27 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
# put the result on the result queue
display.debug("sending task result for task %s" % self._task._uuid)
self._final_q.send_task_result(
self._host.name,
self._task._uuid,
executor_result,
task_fields=self._task.dump_attrs(),
)
try:
self._final_q.send_task_result(
self._host.name,
self._task._uuid,
executor_result,
task_fields=self._task.dump_attrs(),
)
except Exception as e:
display.debug(f'failed to send task result ({e}), sending surrogate result')
self._final_q.send_task_result(
self._host.name,
self._task._uuid,
# Overriding the task result, to represent the failure
{
'failed': True,
'msg': f'{e}',
'exception': traceback.format_exc(),
},
# The failure pickling may have been caused by the task attrs, omit for safety
{},
)
display.debug("done sending task result for task %s" % self._task._uuid)
except AnsibleConnectionFailure:

@ -76,15 +76,14 @@ class PromptSend:
complete_input: t.Iterable[bytes] = None
class FinalQueue(multiprocessing.queues.Queue):
class FinalQueue(multiprocessing.queues.SimpleQueue):
def __init__(self, *args, **kwargs):
kwargs['ctx'] = multiprocessing_context
super(FinalQueue, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def send_callback(self, method_name, *args, **kwargs):
self.put(
CallbackSend(method_name, *args, **kwargs),
block=False
)
def send_task_result(self, *args, **kwargs):
@ -94,19 +93,16 @@ class FinalQueue(multiprocessing.queues.Queue):
tr = TaskResult(*args, **kwargs)
self.put(
tr,
block=False
)
def send_display(self, *args, **kwargs):
self.put(
DisplaySend(*args, **kwargs),
block=False
)
def send_prompt(self, **kwargs):
self.put(
PromptSend(**kwargs),
block=False
)

@ -1,3 +1,3 @@
shippable/posix/group3
context/controller
skip/macos
needs/target/test_utils

@ -7,7 +7,7 @@ echo "testing for stdio deadlock on forked workers (10s timeout)..."
# Enable a callback that trips deadlocks on forked-child stdout, time out after 10s; forces running
# in a pty, since that tends to be much slower than raw file I/O and thus more likely to trigger the deadlock.
# Redirect stdout to /dev/null since it's full of non-printable garbage we don't want to display unless it failed
ANSIBLE_CALLBACKS_ENABLED=spewstdio SPEWSTDIO_ENABLED=1 python run-with-pty.py timeout 10s ansible-playbook -i hosts -f 5 test.yml > stdout.txt && RC=$? || RC=$?
ANSIBLE_CALLBACKS_ENABLED=spewstdio SPEWSTDIO_ENABLED=1 python run-with-pty.py ../test_utils/scripts/timeout.py -- 10 ansible-playbook -i hosts -f 5 test.yml > stdout.txt && RC=$? || RC=$?
if [ $RC != 0 ]; then
echo "failed; likely stdout deadlock. dumping raw output (may be very large)"

@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
# Copyright: Contributors to the Ansible project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.plugins.action import ActionBase
from jinja2 import Undefined
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
return {'obj': Undefined('obj')}

@ -0,0 +1,3 @@
shippable/posix/group5
context/controller
needs/target/test_utils

@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -ux
export ANSIBLE_ROLES_PATH=../
is_timeout() {
rv=$?
if [ "$rv" == "124" ]; then
echo "***hang detected, this likely means the strategy never received a result for the task***"
fi
exit $rv
}
trap "is_timeout" EXIT
../test_utils/scripts/timeout.py -- 10 ansible-playbook -i ../../inventory runme.yml -v "$@"

@ -0,0 +1,7 @@
- hosts: all
gather_facts: false
tasks:
- include_role:
name: result_pickle_error
# Just for caution loop 3 times to ensure no issues
loop: '{{ range(3) }}'

@ -0,0 +1,14 @@
- name: Ensure pickling error doesn't cause a hang
result_pickle_error:
ignore_errors: true
register: result
- assert:
that:
- result.msg == expected_msg
- result is failed
vars:
expected_msg: "cannot pickle 'Undefined' object"
- debug:
msg: Success, no hang

@ -0,0 +1,21 @@
#!/usr/bin/env python
import argparse
import subprocess
import sys
parser = argparse.ArgumentParser()
parser.add_argument('duration', type=int)
parser.add_argument('command', nargs='+')
args = parser.parse_args()
try:
p = subprocess.run(
' '.join(args.command),
shell=True,
timeout=args.duration,
check=False,
)
sys.exit(p.returncode)
except subprocess.TimeoutExpired:
sys.exit(124)
Loading…
Cancel
Save