From 599da0689a2697b735d032630aebb003f37aa8d9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 27 Jan 2019 03:58:05 +0000 Subject: [PATCH] issue #477 / ansible: avoid a race in async job startup. Ansible 2.3/Python 2.4 work revealed there is no guarantee a slow target will have written the initial job status file out before a fast controller makes an initial check for it. Therefore, provide AsyncRunner with a sender it should send a message to when the initial job file has been written. As a bonus, also catch and report exceptions happening early in AsyncRunner, rather than leaving them to end up in -vvv output. --- ansible_mitogen/planner.py | 42 ++++++++++++------- ansible_mitogen/target.py | 23 ++++++++-- .../integration/async/runner_one_job.yml | 7 +++- 3 files changed, 52 insertions(+), 20 deletions(-) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 80cf5f8b..f3e4500e 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -46,6 +46,7 @@ from ansible.executor import module_common import ansible.errors import ansible.module_utils import mitogen.core +import mitogen.select import ansible_mitogen.loaders import ansible_mitogen.parsing @@ -416,22 +417,33 @@ def _invoke_async_task(invocation, planner): job_id = '%016x' % random.randint(0, 2**64) context = invocation.connection.spawn_isolated_child() _propagate_deps(invocation, planner, context) - context.call_no_reply( - ansible_mitogen.target.run_module_async, - job_id=job_id, - timeout_secs=invocation.timeout_secs, - kwargs=planner.get_kwargs(), - ) - return { - 'stdout': json.dumps({ - # modules/utilities/logic/async_wrapper.py::_run_module(). - 'changed': True, - 'started': 1, - 'finished': 0, - 'ansible_job_id': job_id, - }) - } + with mitogen.core.Receiver(context.router) as started_recv: + call_recv = context.call_async( + ansible_mitogen.target.run_module_async, + job_id=job_id, + timeout_secs=invocation.timeout_secs, + started_sender=started_recv.to_sender(), + kwargs=planner.get_kwargs(), + ) + + # Wait for run_module_async() to crash, or for AsyncRunner to indicate + # the job file has been written. + for msg in mitogen.select.Select([started_recv, call_recv]): + if msg.receiver is call_recv: + # It can only be an exception. + raise msg.unpickle() + break + + return { + 'stdout': json.dumps({ + # modules/utilities/logic/async_wrapper.py::_run_module(). + 'changed': True, + 'started': 1, + 'finished': 0, + 'ansible_job_id': job_id, + }) + } def _invoke_isolated_task(invocation, planner): diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 0db9af75..84fd3359 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -430,9 +430,10 @@ def _get_async_dir(): class AsyncRunner(object): - def __init__(self, job_id, timeout_secs, econtext, kwargs): + def __init__(self, job_id, timeout_secs, started_sender, econtext, kwargs): self.job_id = job_id self.timeout_secs = timeout_secs + self.started_sender = started_sender self.econtext = econtext self.kwargs = kwargs self._timed_out = False @@ -515,6 +516,7 @@ class AsyncRunner(object): 'finished': 0, 'pid': os.getpid() }) + self.started_sender.send(True) if self.timeout_secs > 0: self._install_alarm() @@ -550,13 +552,26 @@ class AsyncRunner(object): @mitogen.core.takes_econtext -def run_module_async(kwargs, job_id, timeout_secs, econtext): +def run_module_async(kwargs, job_id, timeout_secs, started_sender, econtext): """ Execute a module with its run status and result written to a file, terminating on the process on completion. This function must run in a child forked using :func:`create_fork_child`. - """ - arunner = AsyncRunner(job_id, timeout_secs, econtext, kwargs) + + @param mitogen.core.Sender started_sender: + A sender that will receive :data:`True` once the job has reached a + point where its initial job file has been written. This is required to + avoid a race where an overly eager controller can check for a task + before it has reached that point in execution, which is possible at + least on Python 2.4, where forking is not available for async tasks. + """ + arunner = AsyncRunner( + job_id, + timeout_secs, + started_sender, + econtext, + kwargs + ) arunner.run() diff --git a/tests/ansible/integration/async/runner_one_job.yml b/tests/ansible/integration/async/runner_one_job.yml index 04ffc5ea..19fba7de 100644 --- a/tests/ansible/integration/async/runner_one_job.yml +++ b/tests/ansible/integration/async/runner_one_job.yml @@ -40,7 +40,6 @@ - result1.cmd == "sleep 1;\n echo alldone" - result1.delta|length == 14 - result1.start|length == 26 - - result1.failed == False - result1.finished == 1 - result1.rc == 0 - result1.start|length == 26 @@ -48,3 +47,9 @@ - result1.stderr_lines == [] - result1.stdout == "alldone" - result1.stdout_lines == ["alldone"] + + - assert: + that: + - result1.failed == False + when: ansible_version.full > '2.4' +