From d33ef1866e649cd5ef53aac9b5cb4bf41c4b72e3 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 03:06:18 +0000 Subject: [PATCH 01/11] ansible: wrap socket calls in io_op() Breaks under signal stress test. --- ansible_mitogen/process.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index c4f58310..d61de0a2 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -110,7 +110,7 @@ class MuxProcess(object): if cls.child_pid: cls.child_sock.close() cls.child_sock = None - cls.worker_sock.recv(1) + mitogen.core.io_op(cls.worker_sock.recv, 1) else: cls.worker_sock.close() cls.worker_sock = None @@ -128,9 +128,9 @@ class MuxProcess(object): self._setup_services() # Let the parent know our listening socket is ready. - self.child_sock.send('1') + mitogen.core.io_op(self.child_sock.send, '1') # Block until the socket is closed, which happens on parent exit. - self.child_sock.recv(1) + mitogen.core.io_op(self.child_sock.recv, 1) def _setup_master(self): """ From 6377f2d69c95691eaa3965a6660670984deb41c9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 03:06:59 +0000 Subject: [PATCH 02/11] issue #257: split pool shutdown and join. 
--- ansible_mitogen/process.py | 19 +++++++++++++++---- mitogen/service.py | 6 +++++- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index d61de0a2..ff637299 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -30,6 +30,7 @@ from __future__ import absolute_import import errno import logging import os +import signal import socket import sys @@ -140,6 +141,7 @@ class MuxProcess(object): self.router.responder.whitelist_prefix('ansible') self.router.responder.whitelist_prefix('ansible_mitogen') mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown) + mitogen.core.listen(self.router.broker, 'exit', self.on_broker_exit) self.listener = mitogen.unix.Listener( router=self.router, path=self.unix_listener_path, @@ -168,14 +170,23 @@ class MuxProcess(object): def on_broker_shutdown(self): """ - Respond to the Router shutdown (indirectly triggered through exit of - the main thread) by unlinking the listening socket. Ideally this would - happen explicitly, but Ansible provides no hook to allow it. + Respond to broker shutdown by beginning service pool shutdown. Do not + join on the pool yet, since that would block the broker thread which + then cannot clean up pending handlers, which is required for the + threads to exit gracefully. """ - self.pool.stop() + self.pool.stop(join=False) try: os.unlink(self.listener.path) except OSError, e: # Prevent a shutdown race with the parent process. if e.args[0] != errno.ENOENT: raise + + def on_broker_exit(self): + """ + Respond to the broker thread about to exit by sending SIGTERM to + ourself. In future this should gracefully join the pool, but TERM is + fine for now. 
+ """ + os.kill(os.getpid(), signal.SIGTERM) diff --git a/mitogen/service.py b/mitogen/service.py index 62180e33..dd8ac5cb 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -456,9 +456,13 @@ class Pool(object): closed = False - def stop(self): + def stop(self, join=True): self.closed = True self._select.close() + if join: + self.join() + + def join(self): for th in self._threads: th.join() for invoker in self._invoker_by_name.itervalues(): From d5c4333b9e00520fb95bcbb5b7433288090706d3 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 04:46:24 +0000 Subject: [PATCH 03/11] debug: functions for triggering EINTR --- mitogen/debug.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/mitogen/debug.py b/mitogen/debug.py index 95f7db3e..312c8d9a 100644 --- a/mitogen/debug.py +++ b/mitogen/debug.py @@ -48,6 +48,15 @@ LOG = logging.getLogger(__name__) _last = None +def enable_evil_interrupts(): + signal.signal(signal.SIGALRM, (lambda a, b: None)) + signal.setitimer(signal.ITIMER_REAL, 0.01, 0.01) + + +def disable_evil_interrupts(): + signal.setitimer(signal.ITIMER_REAL, 0, 0) + + def _hex(n): return '%08x' % n From 1d96d80e8dfd4bab4b621688e05e222c8ce60464 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 04:50:58 +0000 Subject: [PATCH 04/11] tests: osx_setup.yml missing line --- tests/ansible/osx_setup.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/ansible/osx_setup.yml b/tests/ansible/osx_setup.yml index 655c7605..7a6ff23f 100644 --- a/tests/ansible/osx_setup.yml +++ b/tests/ansible/osx_setup.yml @@ -35,6 +35,7 @@ - has_sudo_pubkey - require_tty - pw_required + - readonly_homedir - require_tty_pw_required - slow_user when: ansible_system != 'Darwin' From ffc7306cf899ab19040ac83f6f8a3bbdb55d41a7 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 05:00:42 +0000 Subject: [PATCH 05/11] tests: better runner_two_simultaneous_jobs.yml. 
--- .../async/runner_two_simultaneous_jobs.yml | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml b/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml index 51ceef2f..9474263b 100644 --- a/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml +++ b/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml @@ -6,16 +6,20 @@ # Start 2 duplicate jobs, verify they run concurrently. + - file: + path: /tmp/flurp + state: absent + - name: create semaphore file and sleep for 5 seconds. shell: | exec 2>/dev/null; bash -c ' - echo im_alive $$ > /tmp/flurp + echo im_alive $$ > /tmp/flurp; sleep 60; '; rm -f /tmp/flurp; echo alldone - async: 1000 + async: 30 poll: 0 register: job1 @@ -24,30 +28,29 @@ # below compltes quickly. - name: verify semaphore file exists while this job exists. shell: | - [ -f /tmp/flurp ] && { - read im_alive pid < /tmp/flurp - echo $im_alive - kill $pid &>/dev/null - } - async: 1000 + while [ ! 
-f /tmp/flurp ]; do sleep 0.1; done; + read im_alive pid < /tmp/flurp + echo $im_alive + kill $pid &>/dev/null + async: 30 poll: 0 register: job2 - - name: (job1) busy-poll up to 100000 times + - name: (job1) poll async_status: jid: "{{job1.ansible_job_id}}" register: result1 until: result1.finished - retries: 100000 - delay: 0 + retries: 5 + delay: 1 - - name: (job2) busy-poll up to 100000 times + - name: (job2) poll async_status: jid: "{{job2.ansible_job_id}}" register: result2 until: result2.finished - retries: 100000 - delay: 0 + retries: 5 + delay: 1 - assert: that: From 42797d5cff333069607b2e9085a936c2d67133af Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 13:40:14 +0100 Subject: [PATCH 06/11] try to catch EINTR on travis --- .travis/ansible_tests.sh | 2 +- tests/ansible/all.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis/ansible_tests.sh b/.travis/ansible_tests.sh index b5162ae0..38c15090 100755 --- a/.travis/ansible_tests.sh +++ b/.travis/ansible_tests.sh @@ -59,7 +59,7 @@ echo travis_fold:end:job_setup echo travis_fold:start:ansible -/usr/bin/time ./run_ansible_playbook.sh \ +/usr/bin/time ./run_ansible_playbook.sh -vvv \ all.yml \ -i "${TMPDIR}/hosts" echo travis_fold:end:ansible diff --git a/tests/ansible/all.yml b/tests/ansible/all.yml index a68831f7..f1615a1d 100644 --- a/tests/ansible/all.yml +++ b/tests/ansible/all.yml @@ -1,3 +1,3 @@ -- import_playbook: regression/all.yml +#- import_playbook: regression/all.yml - import_playbook: integration/all.yml From fbb67e837e6f4338275a386291f7e4003decfa42 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 15:03:34 +0100 Subject: [PATCH 07/11] tests: import nice_stdout plugin --- tests/ansible/ansible.cfg | 1 + tests/ansible/lib/callback/nice_stdout.py | 54 +++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 tests/ansible/lib/callback/nice_stdout.py diff --git a/tests/ansible/ansible.cfg b/tests/ansible/ansible.cfg 
index 437e67b7..eeb82109 100644 --- a/tests/ansible/ansible.cfg +++ b/tests/ansible/ansible.cfg @@ -4,6 +4,7 @@ gathering = explicit strategy_plugins = ../../ansible_mitogen/plugins/strategy action_plugins = lib/action callback_plugins = lib/callback +stdout_callback = nice_stdout library = lib/modules # module_utils = lib/module_utils retry_files_enabled = False diff --git a/tests/ansible/lib/callback/nice_stdout.py b/tests/ansible/lib/callback/nice_stdout.py new file mode 100644 index 00000000..08a3757b --- /dev/null +++ b/tests/ansible/lib/callback/nice_stdout.py @@ -0,0 +1,54 @@ +from __future__ import unicode_literals +import io + +try: + from ansible.plugins import callback_loader +except ImportError: + from ansible.plugins.loader import callback_loader + + +def printi(tio, obj, key=None, indent=0): + def write(s, *args): + if args: + s %= args + tio.write(' ' * indent) + if key is not None: + tio.write('%s: ' % (key,)) + tio.write(s) + tio.write('\n') + + if isinstance(obj, (list, tuple)): + write('[') + for i, obj2 in enumerate(obj): + printi(tio, obj2, key=i, indent=indent+1) + key = None + write(']') + elif isinstance(obj, dict): + write('{') + for key2, obj2 in sorted(obj.iteritems()): + if not (key2.startswith('_ansible_') or + key2.endswith('_lines')): + printi(tio, obj2, key=key2, indent=indent+1) + key = None + write('}') + elif isinstance(obj, basestring): + if isinstance(obj, str): + obj = obj.decode('utf-8', 'replace') + for line in obj.splitlines(): + write('%s', line.rstrip('\r\n')) + else: + write('%r', obj) + + +DefaultModule = callback_loader.get('default', class_only=True) + +class CallbackModule(DefaultModule): + def _dump_results(self, result, *args, **kwargs): + try: + tio = io.StringIO() + printi(tio, result) + return tio.getvalue().encode('ascii', 'replace') + except: + import traceback + traceback.print_exc() + raise From 45b748833daa6031674c7e0240a4bf5b138d7500 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 
15:25:11 +0100 Subject: [PATCH 08/11] ansible: don't randomly fail due to temp directory cleanup. Happens about 1 time in 3 when async task times out. --- ansible_mitogen/runner.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index e14d26bd..7155aee1 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -38,6 +38,7 @@ how to build arguments for it, preseed related data, etc. from __future__ import absolute_import import cStringIO import ctypes +import errno import imp import json import logging @@ -148,7 +149,7 @@ class Runner(object): implementation simply restores the original environment. """ self._env.revert() - self._cleanup_temp() + self._try_cleanup_temp() def _cleanup_temp(self): """ @@ -162,6 +163,20 @@ class Runner(object): LOG.debug('Deleting %r', path) ansible_mitogen.target.prune_tree(path) + def _try_cleanup_temp(self): + """ + During broker shutdown triggered by async task timeout or loss of + connection to the parent, it is possible for prune_tree() in + target.py::_on_broker_shutdown() to run before _cleanup_temp(), so skip + cleanup if the directory or a file disappears from beneath us. + """ + try: + self._cleanup_temp() + except (IOError, OSError) as e: + if e.args[0] == errno.ENOENT: + return + raise + def _run(self): """ The _run() method is expected to return a dictionary in the form of From 205052ed9099c115052dbdb2a05a264dcef19fd0 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 15:26:14 +0100 Subject: [PATCH 09/11] service: fix SerializedInvoker CallError handling. This cut-and-paste needs refactoring. Ensure the caller receives a copy of the exception. 
--- mitogen/service.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/mitogen/service.py b/mitogen/service.py index dd8ac5cb..ffb45649 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -271,9 +271,12 @@ class SerializedInvoker(Invoker): method_name, kwargs, msg = tup try: super(SerializedInvoker, self).invoke(method_name, kwargs, msg) + except mitogen.core.CallError: + e = sys.exc_info()[1] + LOG.warning('%r: call error: %s: %s', self, msg, e) + msg.reply(e) except Exception: - LOG.exception('%r: while invoking %r of %r', - self, method_name, self.service) + LOG.exception('%r: while invoking %s()', self, method_name) msg.reply(mitogen.core.Message.dead()) def invoke(self, method_name, kwargs, msg): From 08538d327b3f3664ce5c1d5bbf4963f5699831f7 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 15:27:06 +0100 Subject: [PATCH 10/11] ansible: don't write failed job result after async timeout. The failed job result is likely to be "interrupted system call", and we don't want that to overwrite the SIGALRM handler's "the task timed out", so just discard it. --- ansible_mitogen/planner.py | 1 - ansible_mitogen/target.py | 203 +++++++++++++++++++++---------------- 2 files changed, 115 insertions(+), 89 deletions(-) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 8ea3886a..801950f9 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -51,7 +51,6 @@ except ImportError: # Ansible <2.4 from ansible.plugins import module_loader from ansible.plugins import module_utils_loader -import mitogen import ansible_mitogen.target diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 9d4d8d3a..9bf239d3 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -266,75 +266,67 @@ def _get_async_dir(): ) -def _write_job_status(job_id, dct): - """ - Update an async job status file. 
- """ - LOG.info('_write_job_status(%r, %r)', job_id, dct) - dct.setdefault('ansible_job_id', job_id) - dct.setdefault('data', '') - - async_dir = _get_async_dir() - if not os.path.exists(async_dir): - os.makedirs(async_dir) - - path = os.path.join(async_dir, job_id) - with open(path + '.tmp', 'w') as fp: - fp.write(json.dumps(dct)) - os.rename(path + '.tmp', path) - - -def _sigalrm(broker, timeout_secs, job_id): - """ - Respond to SIGALRM (job timeout) by updating the job file and killing the - process. - """ - msg = "Job reached maximum time limit of %d seconds." % (timeout_secs,) - _write_job_status(job_id, { - "failed": 1, - "finished": 1, - "msg": msg, - }) - broker.shutdown() - - -def _install_alarm(broker, timeout_secs, job_id): - handler = lambda *_: _sigalrm(broker, timeout_secs, job_id) - signal.signal(signal.SIGALRM, handler) - signal.alarm(timeout_secs) - - -def _run_module_async(kwargs, job_id, timeout_secs, econtext): - """ - 1. Immediately updates the status file to mark the job as started. - 2. Installs a timer/signal handler to implement the time limit. - 3. Runs as with run_module(), writing the result to the status file. - - :param dict kwargs: - Runner keyword arguments. - :param str job_id: - String job ID. - :param int timeout_secs: - If >0, limit the task's maximum run time. - """ - _write_job_status(job_id, { - 'started': 1, - 'finished': 0, - 'pid': os.getpid() - }) - - if timeout_secs > 0: - _install_alarm(econtext.broker, timeout_secs, job_id) +class AsyncRunner(object): + def __init__(self, job_id, timeout_secs, econtext, kwargs): + self.job_id = job_id + self.timeout_secs = timeout_secs + self.econtext = econtext + self.kwargs = kwargs + self._timed_out = False + self._init_path() + + def _init_path(self): + async_dir = _get_async_dir() + if not os.path.exists(async_dir): + os.makedirs(async_dir) + self.path = os.path.join(async_dir, self.job_id) + + def _update(self, dct): + """ + Update an async job status file. 
+ """ + LOG.info('%r._update(%r, %r)', self, self.job_id, dct) + dct.setdefault('ansible_job_id', self.job_id) + dct.setdefault('data', '') + + with open(self.path + '.tmp', 'w') as fp: + fp.write(json.dumps(dct)) + os.rename(self.path + '.tmp', self.path) + + def _on_sigalrm(self, signum, frame): + """ + Respond to SIGALRM (job timeout) by updating the job file and killing + the process. + """ + msg = "Job reached maximum time limit of %d seconds." % ( + self.timeout_secs, + ) + self._update({ + "failed": 1, + "finished": 1, + "msg": msg, + }) + self._timed_out = True + self.econtext.broker.shutdown() + + def _install_alarm(self): + signal.signal(signal.SIGALRM, self._on_sigalrm) + signal.alarm(self.timeout_secs) + + def _run_module(self): + kwargs = dict(self.kwargs, **{ + 'detach': True, + 'econtext': self.econtext, + 'emulate_tty': False, + }) - kwargs['detach'] = True - kwargs['econtext'] = econtext - kwargs['emulate_tty'] = False - dct = run_module(kwargs) - if mitogen.core.PY3: - for key in 'stdout', 'stderr': - dct[key] = dct[key].decode('utf-8', 'surrogateescape') + dct = run_module(kwargs) + if mitogen.core.PY3: + for key in 'stdout', 'stderr': + dct[key] = dct[key].decode('utf-8', 'surrogateescape') + return dct - try: + def _parse_result(self, dct): filtered, warnings = ( ansible.module_utils.json_utils. _filter_non_json_lines(dct['stdout']) @@ -342,34 +334,69 @@ def _run_module_async(kwargs, job_id, timeout_secs, econtext): result = json.loads(filtered) result.setdefault('warnings', []).extend(warnings) result['stderr'] = dct['stderr'] - _write_job_status(job_id, result) - except Exception: - _write_job_status(job_id, { - "failed": 1, - "msg": traceback.format_exc(), - "data": dct['stdout'], # temporary notice only - "stderr": dct['stderr'] + self._update(result) + + def _run(self): + """ + 1. Immediately updates the status file to mark the job as started. + 2. Installs a timer/signal handler to implement the time limit. + 3. 
Runs as with run_module(), writing the result to the status file. + + :param dict kwargs: + Runner keyword arguments. + :param str job_id: + String job ID. + :param int timeout_secs: + If >0, limit the task's maximum run time. + """ + self._update({ + 'started': 1, + 'finished': 0, + 'pid': os.getpid() }) + if self.timeout_secs > 0: + self._install_alarm() + + dct = self._run_module() + if not self._timed_out: + # After SIGALRM fires, there is a window between broker responding + # to shutdown() by killing the process, and work continuing on the + # main thread. If main thread was asleep in at least + # basic.py/select.select(), an EINTR will be raised. We want to + # discard that exception. + try: + self._parse_result(dct) + except Exception: + self._update({ + "failed": 1, + "msg": traceback.format_exc(), + "data": dct['stdout'], # temporary notice only + "stderr": dct['stderr'] + }) + + def run(self): + try: + try: + self._run() + except Exception: + self._update({ + "failed": 1, + "msg": traceback.format_exc(), + }) + finally: + self.econtext.broker.shutdown() + @mitogen.core.takes_econtext def run_module_async(kwargs, job_id, timeout_secs, econtext): """ - Arrange for a module to be executed with its run status and result - serialized to a disk file. This function expects to run in a child forked - using :func:`create_fork_child`. + Execute a module with its run status and result written to a file, + terminating on the process on completion. This function must run in a child + forked using :func:`create_fork_child`. """ - try: - try: - _run_module_async(kwargs, job_id, timeout_secs, econtext) - except Exception: - # Catch any (ansible_mitogen) bugs and write them to the job file. 
- _write_job_status(job_id, { - "failed": 1, - "msg": traceback.format_exc(), - }) - finally: - econtext.broker.shutdown() + arunner = AsyncRunner(job_id, timeout_secs, econtext, kwargs) + arunner.run() def make_temp_directory(base_dir): From 9617f4d7bffcd292714237d1575178cdf73788d3 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 15:28:33 +0100 Subject: [PATCH 11/11] Revert "try to catch EINTR on travis" This reverts commit 42797d5cff333069607b2e9085a936c2d67133af. --- .travis/ansible_tests.sh | 2 +- tests/ansible/all.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis/ansible_tests.sh b/.travis/ansible_tests.sh index 38c15090..b5162ae0 100755 --- a/.travis/ansible_tests.sh +++ b/.travis/ansible_tests.sh @@ -59,7 +59,7 @@ echo travis_fold:end:job_setup echo travis_fold:start:ansible -/usr/bin/time ./run_ansible_playbook.sh -vvv \ +/usr/bin/time ./run_ansible_playbook.sh \ all.yml \ -i "${TMPDIR}/hosts" echo travis_fold:end:ansible diff --git a/tests/ansible/all.yml b/tests/ansible/all.yml index f1615a1d..a68831f7 100644 --- a/tests/ansible/all.yml +++ b/tests/ansible/all.yml @@ -1,3 +1,3 @@ -#- import_playbook: regression/all.yml +- import_playbook: regression/all.yml - import_playbook: integration/all.yml