From aba6cb302a85f824ecdff4543ac0d483e2289baf Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 28 May 2018 23:31:10 +0100 Subject: [PATCH 01/40] docs: add example sudoers rule hat tip @seuf :) --- docs/ansible.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/ansible.rst b/docs/ansible.rst index c401dd42..a737d47f 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -69,6 +69,12 @@ Installation per-run basis. Like ``mitogen_linear``, the ``mitogen_free`` strategy exists to mimic the ``free`` strategy. +5. If targets have a restrictive ``sudoers`` file, add a rule like: + + .. code-block:: plain + + deploy = (ALL) NOPASSWD:/usr/bin/python -c* + Demo ~~~~ From cee76ee7b9a861e8d4cb6ae8819bcc3c956b2d8d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 00:45:11 +0100 Subject: [PATCH 02/40] tests: gcloud setup fixes. --- tests/ansible/gcloud/ansible.cfg | 3 +++ tests/ansible/gcloud/controller.yml | 9 +++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/ansible/gcloud/ansible.cfg b/tests/ansible/gcloud/ansible.cfg index d1fcd982..75be745c 100644 --- a/tests/ansible/gcloud/ansible.cfg +++ b/tests/ansible/gcloud/ansible.cfg @@ -1,3 +1,6 @@ [defaults] +strategy_plugins = ../../../ansible_mitogen/plugins/strategy +strategy = mitogen inventory = hosts retry_files_enabled = False +host_key_checking = False diff --git a/tests/ansible/gcloud/controller.yml b/tests/ansible/gcloud/controller.yml index 4c768510..48f233d9 100644 --- a/tests/ansible/gcloud/controller.yml +++ b/tests/ansible/gcloud/controller.yml @@ -1,9 +1,6 @@ - hosts: controller tasks: - - shell: "rsync -a ~/.ssh {{inventory_hostname}}:" - connection: local - - lineinfile: line: "net.ipv4.ip_forward=1" path: /etc/sysctl.conf @@ -30,6 +27,10 @@ - libsasl2-dev - build-essential - git + - rsync + + - shell: "rsync -a ~/.ssh {{inventory_hostname}}:" + connection: local - git: dest: ~/mitogen @@ -39,7 +40,7 @@ - git: dest: ~/ansible repo: https://github.com/dw/ansible.git - version: lazy-vars + version: dmw - pip: virtualenv: ~/venv From 325d13538f89b9b91daa7a40c271487fed7c864b Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 00:51:11 +0100 Subject: [PATCH 03/40] issue #196: debug: don't statically import master. --- mitogen/debug.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/mitogen/debug.py b/mitogen/debug.py index f2746380..95f7db3e 100644 --- a/mitogen/debug.py +++ b/mitogen/debug.py @@ -41,7 +41,6 @@ import time import traceback import mitogen.core -import mitogen.master import mitogen.parent @@ -53,14 +52,26 @@ def _hex(n): return '%08x' % n +def get_subclasses(klass): + """ + Rather than statically import every interesting subclass, forcing it all to + be transferred and potentially disrupting the debugged environment, + enumerate only those loaded in memory. Also returns the original class. + """ + stack = [klass] + seen = set() + while stack: + klass = stack.pop() + seen.add(klass) + stack.extend(klass.__subclasses__()) + return seen + + def get_routers(): + kl return { _hex(id(router)): router - for klass in ( - mitogen.core.Router, - mitogen.parent.Router, - mitogen.master.Router, - ) + for klass in get_subclasses(mitogen.core.Router) for router in gc.get_referrers(klass) if isinstance(router, mitogen.core.Router) } From 9492dbc4d7c2ac24d50e750c59fdd428d5a5c694 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 13:00:29 +0100 Subject: [PATCH 04/40] parent: split out minify.py and add stub where master can install it. This needs a cleaner mechanism to install it, at least this one is documented. --- mitogen/core.py | 1 + mitogen/master.py | 15 +++++ mitogen/minify.py | 134 +++++++++++++++++++++++++++++++++++++++++++++ mitogen/parent.py | 120 ++++------------------------------------ mitogen/service.py | 1 + preamble_size.py | 7 ++- 6 files changed, 167 insertions(+), 111 deletions(-) create mode 100644 mitogen/minify.py diff --git a/mitogen/core.py b/mitogen/core.py index ccaf9ab0..66842852 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -555,6 +555,7 @@ class Importer(object): 'jail', 'lxc', 'master', + 'minify', 'parent', 'select', 'service', diff --git a/mitogen/master.py b/mitogen/master.py index 9a5b7e83..9665377f 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -52,7 +52,9 @@ if not hasattr(pkgutil, 'find_loader'): # been kept intentionally 2.3 compatible so we can reuse it. from mitogen.compat import pkgutil +import mitogen import mitogen.core +import mitogen.minify import mitogen.parent from mitogen.core import LOG @@ -79,6 +81,19 @@ def get_child_modules(path): return [name for _, name, _ in it] +def get_core_source(): + """ + Master version of parent.get_core_source(). + """ + source = inspect.getsource(mitogen.core) + return mitogen.minify.minimize_source(source) + + +if mitogen.is_master: + # TODO: find a less surprising way of installing this. + mitogen.parent.get_core_source = get_core_source + + LOAD_CONST = dis.opname.index('LOAD_CONST') IMPORT_NAME = dis.opname.index('IMPORT_NAME') diff --git a/mitogen/minify.py b/mitogen/minify.py new file mode 100644 index 00000000..1d6f8d11 --- /dev/null +++ b/mitogen/minify.py @@ -0,0 +1,134 @@ +# Copyright 2017, Alex Willmer +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import sys + + +try: + from cStringIO import StringIO as BytesIO +except ImportError: + from io import BytesIO + +if sys.version_info < (2, 7, 11): + from mitogen.compat import tokenize +else: + import tokenize + +try: + from functools import lru_cache +except ImportError: + from mitogen.compat.functools import lru_cache + + +@lru_cache() +def minimize_source(source): + """Remove most comments and docstrings from Python source code. + """ + tokens = tokenize.generate_tokens(BytesIO(source).readline) + tokens = strip_comments(tokens) + tokens = strip_docstrings(tokens) + tokens = reindent(tokens) + return tokenize.untokenize(tokens) + + +def strip_comments(tokens): + """Drop comment tokens from a `tokenize` stream. + + Comments on lines 1-2 are kept, to preserve hashbang and encoding. + Trailing whitespace is remove from all lines. + """ + prev_typ = None + prev_end_col = 0 + for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens: + if typ in (tokenize.NL, tokenize.NEWLINE): + if prev_typ in (tokenize.NL, tokenize.NEWLINE): + start_col = 0 + else: + start_col = prev_end_col + end_col = start_col + 1 + elif typ == tokenize.COMMENT and start_row > 2: + continue + prev_typ = typ + prev_end_col = end_col + yield typ, tok, (start_row, start_col), (end_row, end_col), line + + +def strip_docstrings(tokens): + """Replace docstring tokens with NL tokens in a `tokenize` stream. + + Any STRING token not part of an expression is deemed a docstring. + Indented docstrings are not yet recognised. + """ + stack = [] + state = 'wait_string' + for t in tokens: + typ = t[0] + if state == 'wait_string': + if typ in (tokenize.NL, tokenize.COMMENT): + yield t + elif typ in (tokenize.DEDENT, tokenize.INDENT, tokenize.STRING): + stack.append(t) + elif typ == tokenize.NEWLINE: + stack.append(t) + start_line, end_line = stack[0][2][0], stack[-1][3][0]+1 + for i in range(start_line, end_line): + yield tokenize.NL, '\n', (i, 0), (i,1), '\n' + for t in stack: + if t[0] in (tokenize.DEDENT, tokenize.INDENT): + yield t[0], t[1], (i+1, t[2][1]), (i+1, t[3][1]), t[4] + del stack[:] + else: + stack.append(t) + for t in stack: yield t + del stack[:] + state = 'wait_newline' + elif state == 'wait_newline': + if typ == tokenize.NEWLINE: + state = 'wait_string' + yield t + + +def reindent(tokens, indent=' '): + """Replace existing indentation in a token steam, with `indent`. + """ + old_levels = [] + old_level = 0 + new_level = 0 + for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens: + if typ == tokenize.INDENT: + old_levels.append(old_level) + old_level = len(tok) + new_level += 1 + tok = indent * new_level + elif typ == tokenize.DEDENT: + old_level = old_levels.pop() + new_level -= 1 + start_col = max(0, start_col - old_level + new_level) + if start_row == end_row: + end_col = start_col + len(tok) + yield typ, tok, (start_row, start_col), (end_row, end_col), line diff --git a/mitogen/parent.py b/mitogen/parent.py index 9436591e..bec03f85 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -52,21 +52,6 @@ import zlib # Absolute imports for <2.5. select = __import__('select') -try: - from cStringIO import StringIO as BytesIO -except ImportError: - from io import BytesIO - -if sys.version_info < (2, 7, 11): - from mitogen.compat import tokenize -else: - import tokenize - -try: - from functools import lru_cache -except ImportError: - from mitogen.compat.functools import lru_cache - import mitogen.core from mitogen.core import LOG from mitogen.core import IOLOG @@ -82,101 +67,21 @@ def get_log_level(): return (LOG.level or logging.getLogger().level or logging.INFO) -def is_immediate_child(msg, stream): - """ - Handler policy that requires messages to arrive only from immediately - connected children. - """ - return msg.src_id == stream.remote_id - - -@lru_cache() -def minimize_source(source): - """Remove most comments and docstrings from Python source code. +def get_core_source(): """ - tokens = tokenize.generate_tokens(BytesIO(source).readline) - tokens = strip_comments(tokens) - tokens = strip_docstrings(tokens) - tokens = reindent(tokens) - return tokenize.untokenize(tokens) - - -def strip_comments(tokens): - """Drop comment tokens from a `tokenize` stream. - - Comments on lines 1-2 are kept, to preserve hashbang and encoding. - Trailing whitespace is remove from all lines. + In non-masters, simply fetch the cached mitogen.core source code via the + import mechanism. In masters, this function is replaced with a version that + performs minification directly. """ - prev_typ = None - prev_end_col = 0 - for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens: - if typ in (tokenize.NL, tokenize.NEWLINE): - if prev_typ in (tokenize.NL, tokenize.NEWLINE): - start_col = 0 - else: - start_col = prev_end_col - end_col = start_col + 1 - elif typ == tokenize.COMMENT and start_row > 2: - continue - prev_typ = typ - prev_end_col = end_col - yield typ, tok, (start_row, start_col), (end_row, end_col), line - + return inspect.getsource(mitogen.core) -def strip_docstrings(tokens): - """Replace docstring tokens with NL tokens in a `tokenize` stream. - Any STRING token not part of an expression is deemed a docstring. - Indented docstrings are not yet recognised. +def is_immediate_child(msg, stream): """ - stack = [] - state = 'wait_string' - for t in tokens: - typ = t[0] - if state == 'wait_string': - if typ in (tokenize.NL, tokenize.COMMENT): - yield t - elif typ in (tokenize.DEDENT, tokenize.INDENT, tokenize.STRING): - stack.append(t) - elif typ == tokenize.NEWLINE: - stack.append(t) - start_line, end_line = stack[0][2][0], stack[-1][3][0]+1 - for i in range(start_line, end_line): - yield tokenize.NL, '\n', (i, 0), (i,1), '\n' - for t in stack: - if t[0] in (tokenize.DEDENT, tokenize.INDENT): - yield t[0], t[1], (i+1, t[2][1]), (i+1, t[3][1]), t[4] - del stack[:] - else: - stack.append(t) - for t in stack: yield t - del stack[:] - state = 'wait_newline' - elif state == 'wait_newline': - if typ == tokenize.NEWLINE: - state = 'wait_string' - yield t - - -def reindent(tokens, indent=' '): - """Replace existing indentation in a token steam, with `indent`. + Handler policy that requires messages to arrive only from immediately + connected children. """ - old_levels = [] - old_level = 0 - new_level = 0 - for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens: - if typ == tokenize.INDENT: - old_levels.append(old_level) - old_level = len(tok) - new_level += 1 - tok = indent * new_level - elif typ == tokenize.DEDENT: - old_level = old_levels.pop() - new_level -= 1 - start_col = max(0, start_col - old_level + new_level) - if start_row == end_row: - end_col = start_col + len(tok) - yield typ, tok, (start_row, start_col), (end_row, end_col), line + return msg.src_id == stream.remote_id def flags(names): @@ -498,8 +403,7 @@ def stream_by_method_name(name): @mitogen.core.takes_econtext def _proxy_connect(name, method_name, kwargs, econtext): - - mitogen.parent.upgrade_router(econtext) + upgrade_router(econtext) try: context = econtext.router._connect( klass=stream_by_method_name(method_name), @@ -921,11 +825,11 @@ class Stream(mitogen.core.Stream): } def get_preamble(self): - source = inspect.getsource(mitogen.core) + source = get_core_source() source += '\nExternalContext(%r).main()\n' % ( self.get_econtext_config(), ) - return zlib.compress(minimize_source(source), 9) + return zlib.compress(source, 9) create_child = staticmethod(create_child) create_child_args = {} diff --git a/mitogen/service.py b/mitogen/service.py index 6719f833..4d824f3d 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -149,6 +149,7 @@ class Error(Exception): """ Raised when an error occurs configuring a service or pool. """ + pass # cope with minify_source() bug. class Policy(object): diff --git a/preamble_size.py b/preamble_size.py index 6e3c7924..df1b3330 100644 --- a/preamble_size.py +++ b/preamble_size.py @@ -8,6 +8,7 @@ import zlib import mitogen.fakessh import mitogen.master +import mitogen.minify import mitogen.parent import mitogen.service import mitogen.ssh @@ -34,16 +35,16 @@ print( ) for mod in ( - mitogen.master, mitogen.parent, - mitogen.service, mitogen.ssh, mitogen.sudo, + mitogen.service, mitogen.fakessh, + mitogen.master, ): original = inspect.getsource(mod) original_size = len(original) - minimized = mitogen.parent.minimize_source(original) + minimized = mitogen.minify.minimize_source(original) minimized_size = len(minimized) compressed = zlib.compress(minimized, 9) compressed_size = len(compressed) From a578250bfb4adb6fdb9c73c0288a7232805b2a40 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 13:05:52 +0100 Subject: [PATCH 05/40] ansible: remove indirect master.py imports. Avoids sending 10 modules: 77d76 < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'ansible_mitogen.module_finder') 79d77 < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'ansible_mitogen.services') 81,84d78 < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'mitogen.compat') < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'mitogen.compat.collections') < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'mitogen.compat.functools') < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'mitogen.compat.tokenize') 86,87d79 < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'mitogen.master') < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'mitogen.minify') 89,90d80 < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'mitogen.select') < _send_load_module(mitogen.ssh.Stream(u'ssh.localhost'), 'mitogen.service') --- ansible_mitogen/runner.py | 1 - ansible_mitogen/target.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 807623f7..c9e6d53b 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -47,7 +47,6 @@ import sys import tempfile import types -import mitogen.service import ansible_mitogen.target # TODO: circular import try: diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 090730d8..089e5167 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -50,11 +50,9 @@ import traceback import ansible.module_utils.json_utils import ansible_mitogen.runner -import ansible_mitogen.services import mitogen.core import mitogen.fork import mitogen.parent -import mitogen.service LOG = logging.getLogger(__name__) From f7b368b1fbe38708a312c7bb1f60b88f46abb7fa Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 13:33:58 +0100 Subject: [PATCH 06/40] master: implement ModuleResponder.forward_module(). --- mitogen/master.py | 78 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 21 deletions(-) diff --git a/mitogen/master.py b/mitogen/master.py index 9665377f..cce9a292 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -538,11 +538,41 @@ class ModuleResponder(object): self._cache[fullname] = tup return tup - def _send_load_module(self, stream, msg, fullname): - LOG.debug('_send_load_module(%r, %r)', stream, fullname) - msg.reply(self._build_tuple(fullname), - handle=mitogen.core.LOAD_MODULE) - stream.sent_modules.add(fullname) + def _send_load_module(self, stream, fullname): + if fullname not in stream.sent_modules: + LOG.debug('_send_load_module(%r, %r)', stream, fullname) + self._router._async_route( + mitogen.core.Message.pickled( + self._build_tuple(fullname), + dst_id=stream.remote_id, + handle=mitogen.core.LOAD_MODULE, + ) + ) + stream.sent_modules.add(fullname) + + def _send_module_load_failed(self, stream, fullname): + stream.send( + mitogen.core.Message.pickled( + (fullname, None, None, None, ()), + dst_id=stream.remote_id, + handle=mitogen.core.LOAD_MODULE, + ) + ) + + def _send_module_and_related(self, stream, fullname): + try: + tup = self._build_tuple(fullname) + for name in tup[4]: # related + parent, _, _ = name.partition('.') + if parent != fullname and parent not in stream.sent_modules: + # Parent hasn't been sent, so don't load submodule yet. + continue + + self._send_load_module(stream, name) + self._send_load_module(stream, fullname) + except Exception: + LOG.debug('While importing %r', fullname, exc_info=True) + self._send_module_load_failed(stream, fullname) def _on_get_module(self, msg): if msg.is_dead: @@ -555,25 +585,31 @@ class ModuleResponder(object): LOG.warning('_on_get_module(): dup request for %r from %r', fullname, stream) - try: - tup = self._build_tuple(fullname) - for name in tup[4]: # related - parent, _, _ = name.partition('.') - if parent != fullname and parent not in stream.sent_modules: - # Parent hasn't been sent, so don't load submodule yet. - continue + self._send_module_and_related(stream, fullname) - if name in stream.sent_modules: - # Submodule has been sent already, skip. - continue + def _send_forward_module(self, stream, context, fullname): + if stream.remote_id != context.context_id: + stream.send( + mitogen.core.Message( + data='%s\x00%s' % (context.context_id, fullname), + handle=mitogen.core.FORWARD_MODULE, + ) + ) - self._send_load_module(stream, msg, name) - self._send_load_module(stream, msg, fullname) + def _forward_module(self, context, fullname): + LOG.debug('%r._forward_module(%r, %r)', self, context, fullname) + path = [] + while fullname: + path.append(fullname) + fullname, _, _ = fullname.rpartition('.') - except Exception: - LOG.debug('While importing %r', fullname, exc_info=True) - msg.reply((fullname, None, None, None, ()), - handle=mitogen.core.LOAD_MODULE) + for fullname in reversed(path): + stream = self._router.stream_by_id(context.context_id) + self._send_module_and_related(stream, fullname) + self._send_forward_module(stream, context, fullname) + + def forward_module(self, context, fullname): + self._router.broker.defer(self._forward_module, context, fullname) class Broker(mitogen.core.Broker): From 8d45e609ee35d22fd1940eb082b6620f00cfea3d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 13:34:20 +0100 Subject: [PATCH 07/40] ansible: preload always-requested modules. Avoid 9 roundtrips during setup. In combination with previous change, reduces 'ansible -m stat' execution over 25ms link from 3.7s to 3.07s. 1,3d0 < _on_get_module('ansible') < _on_get_module('ansible.module_utils') < _on_get_module('ansible.module_utils.basic') 69,74d65 < _on_get_module('ansible.module_utils.json_utils') < _on_get_module('ansible.release') < _on_get_module('ansible_mitogen') < _on_get_module('ansible_mitogen.runner') < _on_get_module('ansible_mitogen.target') < _on_get_module('mitogen.fork') --- ansible_mitogen/services.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 24e1f5b1..ee4ca010 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -227,6 +227,19 @@ class ContextService(mitogen.service.Service): finally: self._lock.release() + ALWAYS_PRELOAD = ( + 'ansible_mitogen.target', + 'ansible.release', + 'ansible.module_utils.json_utils', + 'ansible_mitogen.runner', + 'mitogen.fork', + 'ansible.module_utils.basic', + ) + + def _send_module_forwards(self, context): + for fullname in self.ALWAYS_PRELOAD: + self.router.responder.forward_module(context, fullname) + def _connect(self, key, spec, via=None): """ Actual connect implementation. Arranges for the Mitogen connection to @@ -266,6 +279,7 @@ class ContextService(mitogen.service.Service): mitogen.core.listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream)) + self._send_module_forwards(context) home_dir = context.call(os.path.expanduser, '~') # We don't need to wait for the result of this. Ideally we'd check its From f7d2eace085bf088981212a24063b2975aa7db9a Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 13:44:04 +0100 Subject: [PATCH 08/40] tests: importer fixes --- mitogen/core.py | 2 +- mitogen/master.py | 1 + mitogen/parent.py | 9 ++++++--- tests/minimize_source_test.py | 19 ++++++++++--------- tests/module_finder_test.py | 1 + tests/responder_test.py | 8 ++++---- 6 files changed, 23 insertions(+), 17 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 66842852..70c2f588 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -864,7 +864,7 @@ class Stream(BasicStream): self._router = router self.remote_id = remote_id self.name = 'default' - self.sent_modules = set() + self.sent_modules = set(['mitogen', 'mitogen.core']) self.construct(**kwargs) self._input_buf = collections.deque() self._output_buf = collections.deque() diff --git a/mitogen/master.py b/mitogen/master.py index cce9a292..8c06ef8f 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -593,6 +593,7 @@ class ModuleResponder(object): mitogen.core.Message( data='%s\x00%s' % (context.context_id, fullname), handle=mitogen.core.FORWARD_MODULE, + dst_id=stream.remote_id, ) ) diff --git a/mitogen/parent.py b/mitogen/parent.py index bec03f85..1d645f23 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1231,7 +1231,7 @@ class ModuleForwarder(object): if msg.is_dead: return - context_id_s, fullname = msg.data.partition('\x00') + context_id_s, _, fullname = msg.data.partition('\x00') context_id = int(context_id_s) stream = self.router.stream_by_id(context_id) if stream.remote_id == mitogen.parent_id: @@ -1239,15 +1239,18 @@ class ModuleForwarder(object): self, context_id, fullname) return + if fullname in stream.sent_modules: + return + LOG.debug('%r._on_forward_module() sending %r to %r via %r', self, fullname, context_id, stream.remote_id) self._send_module_and_related(stream, fullname) if stream.remote_id != context_id: stream._send( mitogen.core.Message( - dst_id=stream.remote_id, - handle=mitogen.core.FORWARD_MODULE, data=msg.data, + handle=mitogen.core.FORWARD_MODULE, + dst_id=stream.remote_id, ) ) diff --git a/tests/minimize_source_test.py b/tests/minimize_source_test.py index 857fc339..b98cdebd 100644 --- a/tests/minimize_source_test.py +++ b/tests/minimize_source_test.py @@ -1,7 +1,6 @@ import unittest2 -from mitogen.parent import minimize_source - +import mitogen.minify import testlib @@ -14,40 +13,42 @@ def read_sample(fname): class MinimizeSource(unittest2.TestCase): + func = staticmethod(mitogen.minify.minimize_source) + def test_class(self): original = read_sample('class.py') expected = read_sample('class_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_comment(self): original = read_sample('comment.py') expected = read_sample('comment_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_def(self): original = read_sample('def.py') expected = read_sample('def_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_hashbang(self): original = read_sample('hashbang.py') expected = read_sample('hashbang_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_mod(self): original = read_sample('mod.py') expected = read_sample('mod_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_pass(self): original = read_sample('pass.py') expected = read_sample('pass_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_obstacle_course(self): original = read_sample('obstacle_course.py') expected = read_sample('obstacle_course_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) if __name__ == '__main__': diff --git a/tests/module_finder_test.py b/tests/module_finder_test.py index c4b65e11..1c77bdee 100644 --- a/tests/module_finder_test.py +++ b/tests/module_finder_test.py @@ -200,6 +200,7 @@ class FindRelatedTest(testlib.TestCase): 'mitogen.compat.functools', 'mitogen.core', 'mitogen.master', + 'mitogen.minify', 'mitogen.parent', ]) diff --git a/tests/responder_test.py b/tests/responder_test.py index 837beb3e..3f6f66a9 100644 --- a/tests/responder_test.py +++ b/tests/responder_test.py @@ -52,9 +52,9 @@ class BrokenModulesTest(unittest2.TestCase): responder = mitogen.master.ModuleResponder(router) responder._on_get_module(msg) - self.assertEquals(1, len(router.route.mock_calls)) + self.assertEquals(1, len(router._async_route.mock_calls)) - call = router.route.mock_calls[0] + call = router._async_route.mock_calls[0] msg, = call[1] self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle) self.assertEquals(('non_existent_module', None, None, None, ()), @@ -81,9 +81,9 @@ class BrokenModulesTest(unittest2.TestCase): responder = mitogen.master.ModuleResponder(router) responder._on_get_module(msg) - self.assertEquals(1, len(router.route.mock_calls)) + self.assertEquals(1, len(router._async_route.mock_calls)) - call = router.route.mock_calls[0] + call = router._async_route.mock_calls[0] msg, = call[1] self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle) self.assertIsInstance(msg.unpickle(), tuple) From b577a11f86fc533ded7417a988b31dd3780c91be Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 14:26:05 +0100 Subject: [PATCH 09/40] master: fix IdAllocator log messages. --- mitogen/master.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mitogen/master.py b/mitogen/master.py index 8c06ef8f..1c346d36 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -704,7 +704,7 @@ class IdAllocator(object): id_ = self.next_id self.next_id += self.BLOCK_SIZE end_id = id_ + self.BLOCK_SIZE - LOG.debug('%r: allocating (%d..%d]', self, id_, end_id) + LOG.debug('%r: allocating [%d..%d)', self, id_, end_id) return id_, end_id finally: self.lock.release() @@ -718,5 +718,5 @@ class IdAllocator(object): allocated = self.router.context_by_id(id_, msg.src_id) LOG.debug('%r: allocating [%r..%r) to %r', - self, allocated, requestee, msg.src_id) + self, id_, last_id, requestee) msg.reply((id_, last_id)) From fdbd9541132a91218ca04814c951dffad321ba18 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 14:45:12 +0100 Subject: [PATCH 10/40] ansible: preload built-in modules in ModuleDepScanner. For "ansible -m setup" over a 25ms link, avoids 65 roundtrips and reduces runtime from 5.7s to 4.1s (-28%). For "ansible -m setup" over a simulated 250 ms link, reduces runtime from m27.015s to 0m8.254s (-69%). --- ansible_mitogen/mixins.py | 1 + ansible_mitogen/planner.py | 3 +-- ansible_mitogen/services.py | 38 +++++++++++++++++++++++++++---------- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/ansible_mitogen/mixins.py b/ansible_mitogen/mixins.py index fdd104b2..18d29da3 100644 --- a/ansible_mitogen/mixins.py +++ b/ansible_mitogen/mixins.py @@ -313,6 +313,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): env = {} self._compute_environment_string(env) + self._connection._connect() return ansible_mitogen.planner.invoke( ansible_mitogen.planner.Invocation( action=self, diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 6605686c..29a1670a 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -181,7 +181,6 @@ class BinaryPlanner(Planner): return module_common._is_binary(invocation.module_source) def _grant_file_service_access(self, invocation): - invocation.connection._connect() invocation.connection.parent.call_service( service_name='ansible_mitogen.services.FileService', method_name='register', @@ -297,7 +296,6 @@ class NewStylePlanner(ScriptPlanner): ) def get_module_utils(self, invocation): - invocation.connection._connect() return invocation.connection.parent.call_service( service_name='ansible_mitogen.services.ModuleDepService', method_name='scan', @@ -306,6 +304,7 @@ class NewStylePlanner(ScriptPlanner): module_path=invocation.module_path, search_path=self.get_search_path(invocation), builtin_path=module_common._MODULE_UTILS_PATH, + context=invocation.connection.context, ) def plan(self, invocation): diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index ee4ca010..916a4c1a 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -624,32 +624,50 @@ class ModuleDepService(mitogen.service.Service): self._file_service = file_service self._cache = {} + def _get_builtin_names(self, builtin_path, resolved): + return [ + fullname + for fullname, path, is_pkg in resolved + if os.path.abspath(path).startswith(builtin_path) + ] + + def _get_custom_tups(self, builtin_path, resolved): + return [ + (fullname, path, is_pkg) + for fullname, path, is_pkg in resolved + if not os.path.abspath(path).startswith(builtin_path) + ] + @mitogen.service.expose(policy=mitogen.service.AllowParents()) @mitogen.service.arg_spec({ 'module_name': basestring, 'module_path': basestring, 'search_path': tuple, 'builtin_path': basestring, + 'context': mitogen.core.Context, }) - def scan(self, module_name, module_path, search_path, builtin_path): - if (module_name, search_path) not in self._cache: + def scan(self, module_name, module_path, search_path, builtin_path, context): + key = (module_name, search_path) + if key not in self._cache: resolved = ansible_mitogen.module_finder.scan( module_name=module_name, module_path=module_path, search_path=tuple(search_path) + (builtin_path,), ) builtin_path = os.path.abspath(builtin_path) - filtered = [ - (fullname, path, is_pkg) - for fullname, path, is_pkg in resolved - if not os.path.abspath(path).startswith(builtin_path) - ] - self._cache[module_name, search_path] = filtered + builtin = self._get_builtin_names(builtin_path, resolved) + custom = self._get_custom_tups(builtin_path, resolved) + self._cache[key] = { + 'builtin': builtin, + 'custom': custom, + } # Grant FileService access to paths in here to avoid another 2 IPCs # from WorkerProcess. self._file_service.register(path=module_path) - for fullname, path, is_pkg in filtered: + for fullname, path, is_pkg in custom: self._file_service.register(path=path) - return self._cache[module_name, search_path] + for name in self._cache[key]['builtin']: + self.router.responder.forward_module(context, name) + return self._cache[key]['custom'] From 9cb3878f3ff2558da33d7fa6226506f4fedde770 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 15:30:14 +0100 Subject: [PATCH 11/40] nsible: remove unused master import --- ansible_mitogen/services.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 916a4c1a..0add61c5 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -49,7 +49,6 @@ import threading import zlib import mitogen -import mitogen.master import mitogen.service import ansible_mitogen.module_finder import ansible_mitogen.target From d9087c510ba9be4b3743a39987f3e1b829286a1c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 17:07:58 +0100 Subject: [PATCH 12/40] ansible: move FileService into mitogen.service. --- ansible_mitogen/connection.py | 2 +- ansible_mitogen/planner.py | 2 +- ansible_mitogen/process.py | 2 +- ansible_mitogen/runner.py | 2 +- ansible_mitogen/services.py | 250 ---------------------------------- ansible_mitogen/target.py | 4 +- mitogen/service.py | 240 ++++++++++++++++++++++++++++++++ 7 files changed, 246 insertions(+), 256 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 35643d7d..ee34c22b 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -613,7 +613,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): utimes=(st.st_atime, st.st_mtime)) self.parent.call_service( - service_name='ansible_mitogen.services.FileService', + service_name='mitogen.service.FileService', method_name='register', path=mitogen.utils.cast(in_path) ) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 29a1670a..825e5897 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -182,7 +182,7 @@ class BinaryPlanner(Planner): def _grant_file_service_access(self, invocation): invocation.connection.parent.call_service( - service_name='ansible_mitogen.services.FileService', + service_name='mitogen.service.FileService', method_name='register', path=invocation.module_path, ) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 4946aa29..236e8ab7 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -151,7 +151,7 @@ class MuxProcess(object): Construct a ContextService and a thread to service requests for it arriving from worker processes. """ - file_service = ansible_mitogen.services.FileService(router=self.router) + file_service = mitogen.service.FileService(router=self.router) self.pool = mitogen.service.Pool( router=self.router, services=[ diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index c9e6d53b..fe14365f 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -272,7 +272,7 @@ class ProgramRunner(Runner): :param str path: Absolute path to the program file on the master, as it can be retrieved - via :class:`ansible_mitogen.services.FileService`. + via :class:`mitogen.service.FileService`. :param bool emulate_tty: If :data:`True`, execute the program with `stdout` and `stderr` merged into a single pipe, emulating Ansible behaviour when an SSH TTY is in diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 0add61c5..c99d2840 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -38,15 +38,11 @@ when a child has completed a job. """ from __future__ import absolute_import -import grp import logging import os import os.path -import pwd -import stat import sys import threading -import zlib import mitogen import mitogen.service @@ -367,252 +363,6 @@ class ContextService(mitogen.service.Service): return result -class StreamState(object): - def __init__(self): - #: List of [(Sender, file object)] - self.jobs = [] - self.completing = {} - #: In-flight byte count. - self.unacked = 0 - #: Lock. - self.lock = threading.Lock() - - -class FileService(mitogen.service.Service): - """ - Streaming file server, used to serve small files like Ansible modules and - huge files like ISO images. Paths must be registered by a trusted context - before they will be served to a child. - - Transfers are divided among the physical streams that connect external - contexts, ensuring each stream never has excessive data buffered in RAM, - while still maintaining enough to fully utilize available bandwidth. This - is achieved by making an initial bandwidth assumption, enqueueing enough - chunks to fill that assumed pipe, then responding to delivery - acknowledgements from the receiver by scheduling new chunks. - - Transfers proceed one-at-a-time per stream. When multiple contexts exist on - a stream (e.g. one is the SSH account, another is a sudo account, and a - third is a proxied SSH connection), each request is satisfied in turn - before subsequent requests start flowing. This ensures when a stream is - contended, priority is given to completing individual transfers rather than - potentially aborting many partial transfers, causing the bandwidth to be - wasted. - - Theory of operation: - 1. Trusted context (i.e. WorkerProcess) calls register(), making a - file available to any untrusted context. - 2. Requestee context creates a mitogen.core.Receiver() to receive - chunks, then calls fetch(path, recv.to_sender()), to set up the - transfer. - 3. fetch() replies to the call with the file's metadata, then - schedules an initial burst up to the window size limit (1MiB). - 4. Chunks begin to arrive in the requestee, which calls acknowledge() - for each 128KiB received. - 5. The acknowledge() call arrives at FileService, which scheduled a new - chunk to refill the drained window back to the size limit. - 6. When the last chunk has been pumped for a single transfer, - Sender.close() is called causing the receive loop in - target.py::_get_file() to exit, allowing that code to compare the - transferred size with the total file size from the metadata. - 7. If the sizes mismatch, _get_file()'s caller is informed which will - discard the result and log/raise an error. - - Shutdown: - 1. process.py calls service.Pool.shutdown(), which arranges for the - service pool threads to exit and be joined, guranteeing no new - requests can arrive, before calling Service.on_shutdown() for each - registered service. - 2. FileService.on_shutdown() walks every in-progress transfer and calls - Sender.close(), causing Receiver loops in the requestees to exit - early. The size check fails and any partially downloaded file is - discarded. - 3. Control exits _get_file() in every target, and graceful shutdown can - proceed normally, without the associated thread needing to be - forcefully killed. - """ - unregistered_msg = 'Path is not registered with FileService.' - context_mismatch_msg = 'sender= kwarg context must match requestee context' - - #: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which - #: is 5x what SSH can handle on a 2011 era 2.4Ghz Core i5. - window_size_bytes = 1048576 - - def __init__(self, router): - super(FileService, self).__init__(router) - #: Mapping of registered path -> file size. - self._metadata_by_path = {} - #: Mapping of Stream->StreamState. - self._state_by_stream = {} - - def _name_or_none(self, func, n, attr): - try: - return getattr(func(n), attr) - except KeyError: - return None - - @mitogen.service.expose(policy=mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'paths': list - }) - def register_many(self, paths): - """ - Batch version of register(). - """ - for path in paths: - self.register(path) - - @mitogen.service.expose(policy=mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'path': basestring - }) - def register(self, path): - """ - Authorize a path for access by children. Repeat calls with the same - path is harmless. - - :param str path: - File path. - """ - if path in self._metadata_by_path: - return - - st = os.stat(path) - if not stat.S_ISREG(st.st_mode): - raise IOError('%r is not a regular file.' % (in_path,)) - - LOG.debug('%r: registering %r', self, path) - self._metadata_by_path[path] = { - 'size': st.st_size, - 'mode': st.st_mode, - 'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'), - 'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'), - 'mtime': st.st_mtime, - 'atime': st.st_atime, - } - - def on_shutdown(self): - """ - Respond to shutdown by sending close() to every target, allowing their - receive loop to exit and clean up gracefully. - """ - LOG.debug('%r.on_shutdown()', self) - for stream, state in self._state_by_stream.items(): - state.lock.acquire() - try: - for sender, fp in reversed(state.jobs): - sender.close() - fp.close() - state.jobs.pop() - finally: - state.lock.release() - - # The IO loop pumps 128KiB chunks. An ideal message is a multiple of this, - # odd-sized messages waste one tiny write() per message on the trailer. - # Therefore subtract 10 bytes pickle overhead + 24 bytes header. - IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + ( - len( - mitogen.core.Message.pickled( - mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE) - ).data - ) - mitogen.core.CHUNK_SIZE - )) - - def _schedule_pending_unlocked(self, state): - """ - Consider the pending transfers for a stream, pumping new chunks while - the unacknowledged byte count is below :attr:`window_size_bytes`. Must - be called with the StreamState lock held. - - :param StreamState state: - Stream to schedule chunks for. - """ - while state.jobs and state.unacked < self.window_size_bytes: - sender, fp = state.jobs[0] - s = fp.read(self.IO_SIZE) - if s: - state.unacked += len(s) - sender.send(mitogen.core.Blob(s)) - else: - # File is done. Cause the target's receive loop to exit by - # closing the sender, close the file, and remove the job entry. - sender.close() - fp.close() - state.jobs.pop(0) - - @mitogen.service.expose(policy=mitogen.service.AllowAny()) - @mitogen.service.no_reply() - @mitogen.service.arg_spec({ - 'path': basestring, - 'sender': mitogen.core.Sender, - }) - def fetch(self, path, sender, msg): - """ - Start a transfer for a registered path. - - :param str path: - File path. - :param mitogen.core.Sender sender: - Sender to receive file data. - :returns: - Dict containing the file metadata: - - * ``size``: File size in bytes. - * ``mode``: Integer file mode. - * ``owner``: Owner account name on host machine. - * ``group``: Owner group name on host machine. - * ``mtime``: Floating point modification time. - * ``ctime``: Floating point change time. - :raises Error: - Unregistered path, or Sender did not match requestee context. - """ - if path not in self._metadata_by_path: - raise Error(self.unregistered_msg) - if msg.src_id != sender.context.context_id: - raise Error(self.context_mismatch_msg) - - LOG.debug('Serving %r', path) - fp = open(path, 'rb', self.IO_SIZE) - # Response must arrive first so requestee can begin receive loop, - # otherwise first ack won't arrive until all pending chunks were - # delivered. In that case max BDP would always be 128KiB, aka. max - # ~10Mbit/sec over a 100ms link. - msg.reply(self._metadata_by_path[path]) - - stream = self.router.stream_by_id(sender.context.context_id) - state = self._state_by_stream.setdefault(stream, StreamState()) - state.lock.acquire() - try: - state.jobs.append((sender, fp)) - self._schedule_pending_unlocked(state) - finally: - state.lock.release() - - @mitogen.service.expose(policy=mitogen.service.AllowAny()) - @mitogen.service.no_reply() - @mitogen.service.arg_spec({ - 'size': int, - }) - @mitogen.service.no_reply() - def acknowledge(self, size, msg): - """ - Acknowledge bytes received by a transfer target, scheduling new chunks - to keep the window full. This should be called for every chunk received - by the target. - """ - stream = self.router.stream_by_id(msg.src_id) - state = self._state_by_stream[stream] - state.lock.acquire() - try: - if state.unacked < size: - LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d', - self, msg.src_id, state.unacked, size) - state.unacked -= min(state.unacked, size) - self._schedule_pending_unlocked(state) - finally: - state.lock.release() - - class ModuleDepService(mitogen.service.Service): """ Scan a new-style module and produce a cached mapping of module_utils names diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 089e5167..7aaf50fb 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -89,7 +89,7 @@ def _get_file(context, path, out_fp): t0 = time.time() recv = mitogen.core.Receiver(router=context.router) metadata = context.call_service( - service_name='ansible_mitogen.services.FileService', + service_name='mitogen.service.FileService', method_name='fetch', path=path, sender=recv.to_sender(), @@ -99,7 +99,7 @@ def _get_file(context, path, out_fp): s = chunk.unpickle() LOG.debug('_get_file(%r): received %d bytes', path, len(s)) context.call_service_async( - service_name='ansible_mitogen.services.FileService', + service_name='mitogen.service.FileService', method_name='acknowledge', size=len(s), ).close() diff --git a/mitogen/service.py b/mitogen/service.py index 4d824f3d..842eac1e 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -26,7 +26,12 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import grp +import os +import os.path import pprint +import pwd +import stat import sys import threading @@ -489,3 +494,238 @@ class Pool(object): len(self._threads), th.name, ) + + +class FileStreamState(object): + def __init__(self): + #: List of [(Sender, file object)] + self.jobs = [] + self.completing = {} + #: In-flight byte count. + self.unacked = 0 + #: Lock. + self.lock = threading.Lock() + + +class FileService(Service): + """ + Streaming file server, used to serve small files and huge files alike. + Paths must be registered by a trusted context before they will be served to + a child. + + Transfers are divided among the physical streams that connect external + contexts, ensuring each stream never has excessive data buffered in RAM, + while still maintaining enough to fully utilize available bandwidth. This + is achieved by making an initial bandwidth assumption, enqueueing enough + chunks to fill that assumed pipe, then responding to delivery + acknowledgements from the receiver by scheduling new chunks. + + Transfers proceed one-at-a-time per stream. When multiple contexts exist on + a stream (e.g. one is the SSH account, another is a sudo account, and a + third is a proxied SSH connection), each request is satisfied in turn + before subsequent requests start flowing. This ensures when a stream is + contended, priority is given to completing individual transfers rather than + potentially aborting many partial transfers, causing the bandwidth to be + wasted. + + Theory of operation: + 1. Trusted context (i.e. WorkerProcess) calls register(), making a + file available to any untrusted context. + 2. Requestee context creates a mitogen.core.Receiver() to receive + chunks, then calls fetch(path, recv.to_sender()), to set up the + transfer. + 3. fetch() replies to the call with the file's metadata, then + schedules an initial burst up to the window size limit (1MiB). + 4. Chunks begin to arrive in the requestee, which calls acknowledge() + for each 128KiB received. + 5. The acknowledge() call arrives at FileService, which scheduled a new + chunk to refill the drained window back to the size limit. + 6. When the last chunk has been pumped for a single transfer, + Sender.close() is called causing the receive loop in + target.py::_get_file() to exit, allowing that code to compare the + transferred size with the total file size from the metadata. + 7. If the sizes mismatch, _get_file()'s caller is informed which will + discard the result and log/raise an error. + + Shutdown: + 1. process.py calls service.Pool.shutdown(), which arranges for the + service pool threads to exit and be joined, guranteeing no new + requests can arrive, before calling Service.on_shutdown() for each + registered service. + 2. FileService.on_shutdown() walks every in-progress transfer and calls + Sender.close(), causing Receiver loops in the requestees to exit + early. The size check fails and any partially downloaded file is + discarded. + 3. Control exits _get_file() in every target, and graceful shutdown can + proceed normally, without the associated thread needing to be + forcefully killed. + """ + unregistered_msg = 'Path is not registered with FileService.' + context_mismatch_msg = 'sender= kwarg context must match requestee context' + + #: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which + #: is 5x what SSH can handle on a 2011 era 2.4Ghz Core i5. + window_size_bytes = 1048576 + + def __init__(self, router): + super(FileService, self).__init__(router) + #: Mapping of registered path -> file size. + self._metadata_by_path = {} + #: Mapping of Stream->FileStreamState. + self._state_by_stream = {} + + def _name_or_none(self, func, n, attr): + try: + return getattr(func(n), attr) + except KeyError: + return None + + @expose(policy=AllowParents()) + @arg_spec({ + 'path': basestring, + }) + def register(self, path): + """ + Authorize a path for access by children. Repeat calls with the same + path is harmless. + + :param str path: + File path. + """ + if path in self._metadata_by_path: + return + + st = os.stat(path) + if not stat.S_ISREG(st.st_mode): + raise IOError('%r is not a regular file.' % (in_path,)) + + LOG.debug('%r: registering %r', self, path) + self._metadata_by_path[path] = { + 'size': st.st_size, + 'mode': st.st_mode, + 'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'), + 'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'), + 'mtime': st.st_mtime, + 'atime': st.st_atime, + } + + def on_shutdown(self): + """ + Respond to shutdown by sending close() to every target, allowing their + receive loop to exit and clean up gracefully. + """ + LOG.debug('%r.on_shutdown()', self) + for stream, state in self._state_by_stream.items(): + state.lock.acquire() + try: + for sender, fp in reversed(state.jobs): + sender.close() + fp.close() + state.jobs.pop() + finally: + state.lock.release() + + # The IO loop pumps 128KiB chunks. An ideal message is a multiple of this, + # odd-sized messages waste one tiny write() per message on the trailer. + # Therefore subtract 10 bytes pickle overhead + 24 bytes header. + IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + ( + len( + mitogen.core.Message.pickled( + mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE) + ).data + ) - mitogen.core.CHUNK_SIZE + )) + + def _schedule_pending_unlocked(self, state): + """ + Consider the pending transfers for a stream, pumping new chunks while + the unacknowledged byte count is below :attr:`window_size_bytes`. Must + be called with the FileStreamState lock held. + + :param FileStreamState state: + Stream to schedule chunks for. + """ + while state.jobs and state.unacked < self.window_size_bytes: + sender, fp = state.jobs[0] + s = fp.read(self.IO_SIZE) + if s: + state.unacked += len(s) + sender.send(mitogen.core.Blob(s)) + else: + # File is done. Cause the target's receive loop to exit by + # closing the sender, close the file, and remove the job entry. + sender.close() + fp.close() + state.jobs.pop(0) + + @expose(policy=AllowAny()) + @no_reply() + @arg_spec({ + 'path': basestring, + 'sender': mitogen.core.Sender, + }) + def fetch(self, path, sender, msg): + """ + Start a transfer for a registered path. + + :param str path: + File path. + :param mitogen.core.Sender sender: + Sender to receive file data. + :returns: + Dict containing the file metadata: + + * ``size``: File size in bytes. + * ``mode``: Integer file mode. + * ``owner``: Owner account name on host machine. + * ``group``: Owner group name on host machine. + * ``mtime``: Floating point modification time. + * ``ctime``: Floating point change time. + :raises Error: + Unregistered path, or Sender did not match requestee context. + """ + if path not in self._metadata_by_path: + raise Error(self.unregistered_msg) + if msg.src_id != sender.context.context_id: + raise Error(self.context_mismatch_msg) + + LOG.debug('Serving %r', path) + fp = open(path, 'rb', self.IO_SIZE) + # Response must arrive first so requestee can begin receive loop, + # otherwise first ack won't arrive until all pending chunks were + # delivered. In that case max BDP would always be 128KiB, aka. max + # ~10Mbit/sec over a 100ms link. + msg.reply(self._metadata_by_path[path]) + + stream = self.router.stream_by_id(sender.context.context_id) + state = self._state_by_stream.setdefault(stream, FileStreamState()) + state.lock.acquire() + try: + state.jobs.append((sender, fp)) + self._schedule_pending_unlocked(state) + finally: + state.lock.release() + + @expose(policy=AllowAny()) + @no_reply() + @arg_spec({ + 'size': int, + }) + @no_reply() + def acknowledge(self, size, msg): + """ + Acknowledge bytes received by a transfer target, scheduling new chunks + to keep the window full. This should be called for every chunk received + by the target. + """ + stream = self.router.stream_by_id(msg.src_id) + state = self._state_by_stream[stream] + state.lock.acquire() + try: + if state.unacked < size: + LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d', + self, msg.src_id, state.unacked, size) + state.unacked -= min(state.unacked, size) + self._schedule_pending_unlocked(state) + finally: + state.lock.release() From 7162c13e05ecd56e2c9592521bd3fcfdcdab9962 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 17:15:36 +0100 Subject: [PATCH 13/40] docs: add more notes for getting_started.rst --- docs/getting_started.rst | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/docs/getting_started.rst b/docs/getting_started.rst index 985f796a..0947e040 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -40,6 +40,41 @@ and possibly your team and its successors with: appropriate, prefer a higher level solution instead. +First Principles +---------------- + +Before starting, take a moment to reflect on writing a program that will +operate across machines and privilege domains: + +* As with multithreaded programming, writing a program that spans multiple + hosts is exposed to many asynchrony issues. Unlike multithreaded programming, + the margin for unexpected failures is much higher, even between only two + peers, as communication may be fail at any moment, since that communication + depends on reliability of an external network. + +* Since a multi-host program always spans trust and privilege domains, trust + must be taken into consideration in your design from the outset. Mitogen + attempts to protect the consuming application by default where possible, + however it is paramount that trust considerations are always in mind when + exposing any privileged functionality to a potentially untrusted network of + peers. + + A parent must always assume data received from a child is suspect, and must + not base privileged control decisions on that data. As a small example, a + parent should not form a command to execute in a subprocess using strings + received from a child. + +* As the program spans multiple hosts, its design will benefit from a strict + separation of program and data. This entails avoiding some common Python + idioms that rely on its ability to manipulate functions and closures as if + they were data, such as passing a lambda closed over some program state as a + callback parameter. + + In the general case this is both difficult and unsafe to support in a + distributed program, and so (for now at least) it should be assumed this + functionality is unlikely to appear in future. + + Broker And Router ----------------- @@ -330,6 +365,12 @@ Subclasses of built-in types must be undecorated using :py:func:`mitogen.utils.cast`. +Test Your Design +---------------- + +``tc qdisc add dev eth0 root netem delay 250ms`` + + .. _troubleshooting: Troubleshooting From daa9cfd0a8ce9c93ab8fde2623344cca10f0158e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 18:08:26 +0100 Subject: [PATCH 14/40] ansible: MITOGEN_DUMP_THREAD_STACKS for mux process too --- ansible_mitogen/process.py | 3 +++ docs/ansible.rst | 3 +++ 2 files changed, 6 insertions(+) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 236e8ab7..e36f173d 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -35,6 +35,7 @@ import sys import mitogen import mitogen.core +import mitogen.debug import mitogen.master import mitogen.parent import mitogen.service @@ -145,6 +146,8 @@ class MuxProcess(object): ) if 'MITOGEN_ROUTER_DEBUG' in os.environ: self.router.enable_debug() + if 'MITOGEN_DUMP_THREAD_STACKS' in os.environ: + mitogen.debug.dump_to_logger() def _setup_services(self): """ diff --git a/docs/ansible.rst b/docs/ansible.rst index a737d47f..210e85bf 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -648,6 +648,9 @@ is necessary. File-based logging can be enabled by setting When file-based logging is enabled, one file per context will be created on the local machine and every target machine, as ``/tmp/mitogen..log``. +If you are experiencing a hang, ``MITOGEN_DUMP_THREAD_STACKS=1`` causes every +process to dump every thread stack into the logging framework every 5 seconds. + Getting Help ~~~~~~~~~~~~ From 3aadfbcfa16ca292f1c39753583ae666fae2399c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 May 2018 18:14:29 +0100 Subject: [PATCH 15/40] Add select to preamble_size --- preamble_size.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/preamble_size.py b/preamble_size.py index df1b3330..bca55ab1 100644 --- a/preamble_size.py +++ b/preamble_size.py @@ -10,6 +10,7 @@ import mitogen.fakessh import mitogen.master import mitogen.minify import mitogen.parent +import mitogen.select import mitogen.service import mitogen.ssh import mitogen.sudo @@ -38,6 +39,7 @@ for mod in ( mitogen.parent, mitogen.ssh, mitogen.sudo, + mitogen.select, mitogen.service, mitogen.fakessh, mitogen.master, From 088a7e5cff8c6d93e85b56e777ad7c15beb9b32f Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:25:34 +0100 Subject: [PATCH 16/40] ansible: handle "from timeout import timeout" imports. It's not simple without executing a module to determine whether the above refers to a submodule of a package, or an object defined within a module. Therefore detect when resolution of a child module yields the same path as the parent, and ignore the result. --- ansible_mitogen/module_finder.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/ansible_mitogen/module_finder.py b/ansible_mitogen/module_finder.py index 28522dd9..1d062454 100644 --- a/ansible_mitogen/module_finder.py +++ b/ansible_mitogen/module_finder.py @@ -72,35 +72,42 @@ def is_pkg(module): def find(name, path=(), parent=None): """ Return a Module instance describing the first matching module found on the - given search path. + search path. :param str name: Module name. - :param str path: - Search path. + :param list path: + List of directory names to search for the module. :param Module parent: - If given, make the found module a child of this module. + Optional module parent. """ + assert isinstance(path, tuple) head, _, tail = name.partition('.') try: tup = imp.find_module(head, list(path)) - except ImportError: + except ImportError as e: return parent - fp, path, (suffix, mode, kind) = tup + fp, modpath, (suffix, mode, kind) = tup + if parent and modpath == parent.path: + # 'from timeout import timeout', where 'timeout' is a function but also + # the name of the module being imported. + return None + if fp: fp.close() if kind == imp.PKG_DIRECTORY: - path = os.path.join(path, '__init__.py') - module = Module(head, path, kind, parent) + modpath = os.path.join(modpath, '__init__.py') + module = Module(head, modpath, kind, parent) if tail: return find_relative(module, tail, path) return module def find_relative(parent, name, path=()): - path = [os.path.dirname(parent.path)] + list(path) + if parent.kind == imp.PKG_DIRECTORY: + path = (os.path.dirname(parent.path),) + path return find(name, path, parent=parent) From 34daec4a7a9d73c306c7a024f4ffb0beaf013c7f Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:27:24 +0100 Subject: [PATCH 17/40] core: prevent warning when CALL_FUNCTION used without reply_to Such as when the stub CALL_SERVICE handler is used. --- mitogen/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mitogen/core.py b/mitogen/core.py index 70c2f588..58e004ae 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1856,7 +1856,9 @@ class ExternalContext(object): for msg in self.recv: try: - msg.reply(self._dispatch_one(msg)) + ret = self._dispatch_one(msg) + if msg.reply_to: + msg.reply(ret) except Exception: e = sys.exc_info()[1] _v and LOG.debug('_dispatch_calls: %s', e) From 1745c3aff08884ec4095349e362f3fa7591d5b29 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:31:01 +0100 Subject: [PATCH 18/40] issue #186: ansible: detach asynchronous tasks After Runner.setup() has executed, but before the module executes. This relies on subsequent commits to ensure all files are preloaded. --- ansible_mitogen/runner.py | 8 +++++++- ansible_mitogen/target.py | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index fe14365f..d96fb2d7 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -115,12 +115,15 @@ class Runner(object): :param dict env: Additional environment variables to set during the run. """ - def __init__(self, module, service_context, args=None, env=None): + def __init__(self, module, service_context, econtext=None, detach=False, + args=None, env=None): if args is None: args = {} self.module = utf8(module) self.service_context = service_context + self.econtext = econtext + self.detach = detach self.args = args self.env = env @@ -176,6 +179,9 @@ class Runner(object): Module result dictionary. """ self.setup() + if self.detach: + self.econtext.detach() + try: return self._run() finally: diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 7aaf50fb..38889039 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -278,6 +278,8 @@ def start_fork_child(wrap_async, kwargs, econtext): context.shutdown() job_id = '%016x' % random.randint(0, 2**64) + kwargs['detach'] = True + kwargs['econtext'] = econtext context.call_async(run_module_async, job_id, kwargs) return { 'stdout': json.dumps({ From 2e8c027322e28321d8ab0c10b45596f0cdc03778 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:38:00 +0100 Subject: [PATCH 19/40] issue #213: avoid service.Pool construction race Ensure concurrent calls to service.Pool do not result in a duplicate pool being constructed in a child. --- mitogen/service.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/mitogen/service.py b/mitogen/service.py index 842eac1e..d41240c2 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -42,14 +42,20 @@ from mitogen.core import LOG DEFAULT_POOL_SIZE = 16 _pool = None +#: Serialize pool construction. +_pool_lock = threading.Lock() @mitogen.core.takes_router def get_or_create_pool(size=None, router=None): global _pool - if _pool is None: - _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE) - return _pool + _pool_lock.acquire() + try: + if _pool is None: + _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE) + return _pool + finally: + _pool_lock.release() @mitogen.core.takes_router From a3b747af1b2b78fc23e42a8a661933be2403174c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:40:50 +0100 Subject: [PATCH 20/40] issue #186: add PushFileService This is like FileService but blocks until the file is pushed by a parent context, with deduplicating behaviour at each level in the hierarchy. It does not stream large files, so it is only suitable for small files like Python modules. Additionally add SerializedInvoker for use with PushFileService, which ensures all method calls to a single service occur in sequence. --- docs/services.rst | 10 ++- mitogen/service.py | 164 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 166 insertions(+), 8 deletions(-) diff --git a/docs/services.rst b/docs/services.rst index 4c3f0ab1..49108e80 100644 --- a/docs/services.rst +++ b/docs/services.rst @@ -17,8 +17,9 @@ Overview Service * User-supplied class with explicitly exposed methods. -* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass). * May be auto-imported/constructed in a child from a parent simply by calling it +* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass) by + default, but may use any naming scheme the configured activator understands. * Children receive refusals if the class is not already activated by a aprent * Has an associated Select instance which may be dynamically loaded with receivers over time, on_message_received() invoked if any receiver becomes @@ -28,9 +29,12 @@ Invoker * Abstracts mechanism for calling a service method and verifying permissions. * Built-in 'service.Invoker': concurrent execution of all methods on the thread pool. +* Built-in 'service.SerializedInvoker': serialization of all calls on a single + thread borrowed from the pool while any request is pending. * Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct - (method, kwargs) key, only one such method executes, return value is cached - and broadcast to all requesters. + (method, kwargs) key, only one such method ever executes, return value is + cached and broadcast to all request waiters. Waiters do not block additional + pool threads. Activator diff --git a/mitogen/service.py b/mitogen/service.py index d41240c2..33757836 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -195,12 +195,12 @@ class Activator(object): def activate(self, pool, service_name, msg): mod_name, _, class_name = service_name.rpartition('.') - if not self.is_permitted(mod_name, class_name, msg): + if msg and not self.is_permitted(mod_name, class_name, msg): raise mitogen.core.CallError(self.not_active_msg, service_name) module = mitogen.core.import_module(mod_name) klass = getattr(module, class_name) - service = klass(pool.router) + service = klass(router=pool.router) pool.add(service) return service @@ -261,6 +261,50 @@ class Invoker(object): msg.reply(response) +class SerializedInvoker(Invoker): + def __init__(self, **kwargs): + super(SerializedInvoker, self).__init__(**kwargs) + self._lock = threading.Lock() + self._queue = [] + self._running = False + + def _pop(self): + self._lock.acquire() + try: + try: + return self._queue.pop(0) + except IndexError: + self._running = False + finally: + self._lock.release() + + def _run(self): + while True: + tup = self._pop() + if tup is None: + return + method_name, kwargs, msg = tup + try: + super(SerializedInvoker, self).invoke(method_name, kwargs, msg) + except Exception: + LOG.exception('%r: while invoking %r of %r', + self, method_name, self.service) + msg.reply(mitogen.core.Message.dead()) + + def invoke(self, method_name, kwargs, msg): + self._lock.acquire() + try: + self._queue.append((method_name, kwargs, msg)) + first = not self._running + self._running = True + finally: + self._lock.release() + + if first: + self._run() + return Service.NO_REPLY + + class DeduplicatingInvoker(Invoker): """ A service that deduplicates and caches expensive responses. Requests are @@ -419,7 +463,7 @@ class Pool(object): if name in self._invoker_by_name: raise Error('service named %r already registered' % (name,)) assert service.select not in self._func_by_recv - invoker = service.invoker_class(service) + invoker = service.invoker_class(service=service) self._invoker_by_name[name] = invoker self._func_by_recv[service.select] = service.on_message @@ -439,13 +483,17 @@ class Pool(object): invoker = self._invoker_by_name.get(name) if not invoker: service = self._activator.activate(self, name, msg) - invoker = service.invoker_class(service) + invoker = service.invoker_class(service=service) self._invoker_by_name[name] = invoker finally: self._lock.release() return invoker + def get_service(self, name): + invoker = self.get_invoker(name, None) + return invoker.service + def _validate(self, msg): tup = msg.unpickle(throw=False) if not (isinstance(tup, tuple) and @@ -466,7 +514,8 @@ class Pool(object): LOG.warning('%r: call error: %s: %s', self, msg, e) msg.reply(e) except Exception: - LOG.exception('While invoking %r._invoke()', self) + LOG.exception('%r: while invoking %r of %r', + self, method_name, service_name) e = sys.exc_info()[1] msg.reply(mitogen.core.CallError(e)) @@ -513,6 +562,111 @@ class FileStreamState(object): self.lock = threading.Lock() +class PushFileService(Service): + """ + Push-based file service. Files are delivered and cached in RAM, sent + recursively from parent to child. A child that requests a file via + :meth:`get` will block until it has ben delivered by a parent. + + This service will eventually be merged into FileService. + """ + invoker_class = SerializedInvoker + + def __init__(self, **kwargs): + super(PushFileService, self).__init__(**kwargs) + self._lock = threading.Lock() + self._cache = {} + self._waiters = {} + self._sent_by_stream = {} + + def get(self, path): + self._lock.acquire() + try: + if path in self._cache: + return self._cache[path] + waiters = self._waiters.setdefault(path, []) + latch = mitogen.core.Latch() + waiters.append(lambda: latch.put(None)) + finally: + self._lock.release() + + latch.get() + LOG.debug('%r.get(%r) -> %r', self, path, self._cache[path]) + return self._cache[path] + + def _forward(self, context, path): + stream = self.router.stream_by_id(context.context_id) + child = mitogen.core.Context(self.router, stream.remote_id) + sent = self._sent_by_stream.setdefault(stream, set()) + if path in sent and child.context_id != context.context_id: + child.call_service_async( + service_name=self.name(), + method_name='forward', + path=path, + context=context + ).close() + else: + child.call_service_async( + service_name=self.name(), + method_name='store_and_forward', + path=path, + data=self._cache[path], + context=context + ).close() + + @expose(policy=AllowParents()) + @arg_spec({ + 'context': mitogen.core.Context, + 'path': basestring, + }) + def propagate_to(self, context, path): + LOG.debug('%r.propagate_to(%r, %r)', self, context, path) + if path not in self._cache: + fp = open(path, 'rb') + try: + self._cache[path] = mitogen.core.Blob(fp.read()) + finally: + fp.close() + self._forward(context, path) + + def _store(self, path, data): + self._lock.acquire() + try: + self._cache[path] = data + return self._waiters.pop(path, []) + finally: + self._lock.release() + + @expose(policy=AllowParents()) + @no_reply() + @arg_spec({ + 'path': basestring, + 'data': mitogen.core.Blob, + 'context': mitogen.core.Context, + }) + def store_and_forward(self, path, data, context): + LOG.debug('%r.store_and_forward(%r, %r, %r)', + self, path, data, context) + waiters = self._store(path, data) + if context.context_id != mitogen.context_id: + self._forward(path, context) + for callback in waiters: + callback() + + @expose(policy=AllowParents()) + @no_reply() + @arg_spec({ + 'path': basestring, + 'context': mitogen.core.Context, + }) + def forward(self, path, context): + LOG.debug('%r.forward(%r, %r)', self, path, context) + if path not in self._cache: + LOG.error('%r: %r is not in local cache', self, path) + return + self._forward(path, context) + + class FileService(Service): """ Streaming file server, used to serve small files and huge files alike. From 76beea6554f4ddfce64b25e899cbffc75b85b2c8 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:44:02 +0100 Subject: [PATCH 21/40] issue #186: move target._get_file into mitogen.service For lack of a better place to keep the client function, make it a classmethod of FileService itself for now. The old _get_file() is removed in a subsequent commit. --- ansible_mitogen/target.py | 7 +++++- mitogen/service.py | 51 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 38889039..02f51d40 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -53,6 +53,7 @@ import ansible_mitogen.runner import mitogen.core import mitogen.fork import mitogen.parent +import mitogen.service LOG = logging.getLogger(__name__) @@ -168,7 +169,11 @@ def transfer_file(context, in_path, out_path, sync=False, set_owner=False): try: try: - ok, metadata = _get_file(context, in_path, fp) + ok, metadata = mitogen.service.FileService.get( + context=context, + path=in_path, + out_fp=fp, + ) if not ok: raise IOError('transfer of %r was interrupted.' % (in_path,)) diff --git a/mitogen/service.py b/mitogen/service.py index 33757836..424f21c8 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -669,9 +669,8 @@ class PushFileService(Service): class FileService(Service): """ - Streaming file server, used to serve small files and huge files alike. - Paths must be registered by a trusted context before they will be served to - a child. + Streaming file server, used to serve small and huge files alike. Paths must + be registered by a trusted context before they will be served to a child. Transfers are divided among the physical streams that connect external contexts, ensuring each stream never has excessive data buffered in RAM, @@ -889,3 +888,49 @@ class FileService(Service): self._schedule_pending_unlocked(state) finally: state.lock.release() + + @classmethod + def get(cls, context, path, out_fp): + """ + Streamily download a file from the connection multiplexer process in + the controller. + + :param mitogen.core.Context context: + Reference to the context hosting the FileService that will be used + to fetch the file. + :param bytes in_path: + FileService registered name of the input file. + :param bytes out_path: + Name of the output path on the local disk. + :returns: + :data:`True` on success, or :data:`False` if the transfer was + interrupted and the output should be discarded. + """ + LOG.debug('get_file(): fetching %r from %r', path, context) + t0 = time.time() + recv = mitogen.core.Receiver(router=context.router) + metadata = context.call_service( + service_name=cls.name(), + method_name='fetch', + path=path, + sender=recv.to_sender(), + ) + + for chunk in recv: + s = chunk.unpickle() + LOG.debug('get_file(%r): received %d bytes', path, len(s)) + context.call_service_async( + service_name=cls.name(), + method_name='acknowledge', + size=len(s), + ).close() + out_fp.write(s) + + ok = out_fp.tell() == metadata['size'] + if not ok: + LOG.error('get_file(%r): receiver was closed early, controller ' + 'is likely shutting down.', path) + + LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', + metadata['size'], path, context, 1000 * (time.time() - t0)) + return ok, metadata From 7d4f4b205f6b5683c6c6c46872bb823af8f6188d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:45:02 +0100 Subject: [PATCH 22/40] ansible: update module preload list. --- ansible_mitogen/services.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index c99d2840..5e192e0a 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -223,12 +223,13 @@ class ContextService(mitogen.service.Service): self._lock.release() ALWAYS_PRELOAD = ( - 'ansible_mitogen.target', - 'ansible.release', + 'ansible.module_utils.basic', 'ansible.module_utils.json_utils', + 'ansible.release', 'ansible_mitogen.runner', + 'ansible_mitogen.target', 'mitogen.fork', - 'ansible.module_utils.basic', + 'mitogen.service', ) def _send_module_forwards(self, context): From 569c12a2d6bb84c2ed4f684458f94d5995294105 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:48:42 +0100 Subject: [PATCH 23/40] ansible: use PushFileService for module deps. planner.py: * Rather than grant FileService access to a file for children, use PushFileService to trigger deduplicating send of the file through the hierarchy immediately. * Send the complete list of Ansible module imports to the target so runner.py knows which files and scripts must be loaded via PushFileService prior to detaching. runner.py: * Teach NewStyleRunner to use the full module map to block until everything is loaded prior to detach(). target.py: * Delete old _get_file(), replace get_file() with get_small_file() which uses PushFileService instead. Closes #186 --- ansible_mitogen/planner.py | 16 ++++++---- ansible_mitogen/process.py | 4 ++- ansible_mitogen/runner.py | 32 +++++++++++++++---- ansible_mitogen/services.py | 17 ++++++++--- ansible_mitogen/target.py | 61 +++---------------------------------- 5 files changed, 55 insertions(+), 75 deletions(-) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 825e5897..ce635239 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -182,9 +182,10 @@ class BinaryPlanner(Planner): def _grant_file_service_access(self, invocation): invocation.connection.parent.call_service( - service_name='mitogen.service.FileService', - method_name='register', + service_name='mitogen.service.PushFileService', + method_name='propagate_to', path=invocation.module_path, + context=invocation.connection.context, ) def plan(self, invocation, **kwargs): @@ -295,7 +296,7 @@ class NewStylePlanner(ScriptPlanner): if os.path.isdir(path) ) - def get_module_utils(self, invocation): + def get_module_map(self, invocation): return invocation.connection.parent.call_service( service_name='ansible_mitogen.services.ModuleDepService', method_name='scan', @@ -308,11 +309,14 @@ class NewStylePlanner(ScriptPlanner): ) def plan(self, invocation): - module_utils = self.get_module_utils(invocation) + module_map = self.get_module_map(invocation) return super(NewStylePlanner, self).plan( invocation, - module_utils=module_utils, - should_fork=(self.get_should_fork(invocation) or bool(module_utils)), + module_map=module_map, + should_fork=( + self.get_should_fork(invocation) or + len(module_map['custom']) > 0 + ) ) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index e36f173d..96703e64 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -155,14 +155,16 @@ class MuxProcess(object): arriving from worker processes. """ file_service = mitogen.service.FileService(router=self.router) + push_file_service = mitogen.service.PushFileService(router=self.router) self.pool = mitogen.service.Pool( router=self.router, services=[ file_service, + push_file_service, ansible_mitogen.services.ContextService(self.router), ansible_mitogen.services.ModuleDepService( router=self.router, - file_service=file_service, + push_file_service=push_file_service, ), ], size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index d96fb2d7..6c353800 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -47,6 +47,7 @@ import sys import tempfile import types +import mitogen.core import ansible_mitogen.target # TODO: circular import try: @@ -213,7 +214,7 @@ class ModuleUtilsImporter(object): def load_module(self, fullname): path, is_pkg = self._by_fullname[fullname] - source = ansible_mitogen.target.get_file(self._context, path) + source = ansible_mitogen.target.get_small_file(self._context, path) code = compile(source, path, 'exec') mod = sys.modules.setdefault(fullname, imp.new_module(fullname)) mod.__file__ = "master:%s" % (path,) @@ -320,7 +321,7 @@ class ProgramRunner(Runner): """ Fetch the module binary from the master if necessary. """ - return ansible_mitogen.target.get_file( + return ansible_mitogen.target.get_small_file( context=self.service_context, path=self.path, ) @@ -448,12 +449,30 @@ class NewStyleRunner(ScriptRunner): #: path => new-style module bytecode. _code_by_path = {} - def __init__(self, module_utils, **kwargs): + def __init__(self, module_map, **kwargs): super(NewStyleRunner, self).__init__(**kwargs) - self.module_utils = module_utils + self.module_map = module_map + + def _setup_imports(self): + """ + Ensure the local importer has loaded every module needed by the Ansible + module before setup() completes, but before detach() is called in an + asynchronous task. + + The master automatically streams modules towards us concurrent to the + runner invocation, however there is no public API to synchronize on the + completion of those preloads. Instead simply reuse the importer's + synchronization mechanism by importing everything the module will need + prior to detaching. + """ + for fullname, _, _ in self.module_map['custom']: + mitogen.core.import_module(fullname) + for fullname in self.module_map['builtin']: + mitogen.core.import_module(fullname) def setup(self): super(NewStyleRunner, self).setup() + self._stdio = NewStyleStdio(self.args) # It is possible that not supplying the script filename will break some # module, but this has never been a bug report. Instead act like an @@ -461,8 +480,9 @@ class NewStyleRunner(ScriptRunner): self._argv = TemporaryArgv(['']) self._importer = ModuleUtilsImporter( context=self.service_context, - module_utils=self.module_utils, + module_utils=self.module_map['custom'], ) + self._setup_imports() if libc__res_init: libc__res_init() @@ -484,7 +504,7 @@ class NewStyleRunner(ScriptRunner): pass def _get_code(self): - self.source = ansible_mitogen.target.get_file( + self.source = ansible_mitogen.target.get_small_file( context=self.service_context, path=self.path, ) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 5e192e0a..928af5fd 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -369,9 +369,9 @@ class ModuleDepService(mitogen.service.Service): Scan a new-style module and produce a cached mapping of module_utils names to their resolved filesystem paths. """ - def __init__(self, file_service, **kwargs): + def __init__(self, push_file_service, **kwargs): super(ModuleDepService, self).__init__(**kwargs) - self._file_service = file_service + self._push_file_service = push_file_service self._cache = {} def _get_builtin_names(self, builtin_path, resolved): @@ -414,10 +414,17 @@ class ModuleDepService(mitogen.service.Service): # Grant FileService access to paths in here to avoid another 2 IPCs # from WorkerProcess. - self._file_service.register(path=module_path) + self._push_file_service.propagate_to( + path=module_path, + context=context, + ) + for fullname, path, is_pkg in custom: - self._file_service.register(path=path) + self._push_file_service.propagate_to( + path=path, + context=context, + ) for name in self._cache[key]['builtin']: self.router.responder.forward_module(context, name) - return self._cache[key]['custom'] + return self._cache[key] diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 02f51d40..1b3d587b 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -62,61 +62,12 @@ LOG = logging.getLogger(__name__) #: the duration of the process. temp_dir = None -#: Caching of fetched file data. -_file_cache = {} - #: Initialized to an econtext.parent.Context pointing at a pristine fork of #: the target Python interpreter before it executes any code or imports. _fork_parent = None -def _get_file(context, path, out_fp): - """ - Streamily download a file from the connection multiplexer process in the - controller. - - :param mitogen.core.Context context: - Reference to the context hosting the FileService that will be used to - fetch the file. - :param bytes in_path: - FileService registered name of the input file. - :param bytes out_path: - Name of the output path on the local disk. - :returns: - :data:`True` on success, or :data:`False` if the transfer was - interrupted and the output should be discarded. - """ - LOG.debug('_get_file(): fetching %r from %r', path, context) - t0 = time.time() - recv = mitogen.core.Receiver(router=context.router) - metadata = context.call_service( - service_name='mitogen.service.FileService', - method_name='fetch', - path=path, - sender=recv.to_sender(), - ) - - for chunk in recv: - s = chunk.unpickle() - LOG.debug('_get_file(%r): received %d bytes', path, len(s)) - context.call_service_async( - service_name='mitogen.service.FileService', - method_name='acknowledge', - size=len(s), - ).close() - out_fp.write(s) - - ok = out_fp.tell() == metadata['size'] - if not ok: - LOG.error('get_file(%r): receiver was closed early, controller ' - 'is likely shutting down.', path) - - LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', - metadata['size'], path, context, 1000 * (time.time() - t0)) - return ok, metadata - - -def get_file(context, path): +def get_small_file(context, path): """ Basic in-memory caching module fetcher. This generates an one roundtrip for every previously unseen file, so it is only a temporary solution. @@ -130,13 +81,9 @@ def get_file(context, path): :returns: Bytestring file data. """ - if path not in _file_cache: - io = cStringIO.StringIO() - ok, metadata = _get_file(context, path, io) - if not ok: - raise IOError('transfer of %r was interrupted.' % (path,)) - _file_cache[path] = io.getvalue() - return _file_cache[path] + pool = mitogen.service.get_or_create_pool() + service = pool.get_service('mitogen.service.PushFileService') + return service.get(path) def transfer_file(context, in_path, out_path, sync=False, set_owner=False): From 785df88fa42693ea5aaf910f45e822b2aa395534 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 17:42:40 +0100 Subject: [PATCH 24/40] issue #186: core: remove long-forgotten hack. This is likely to break something, it was definitely needed at some point, but I never put much effort into figuring out why. Meanwhile, Python appears to make find_module('ansible.module_utils.facts.') requests in some circumstances, which causes us to indicate the module exists while this hack exists. So remove it, and let's see what breaks. --- mitogen/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/mitogen/core.py b/mitogen/core.py index 58e004ae..9c37f16d 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -623,7 +623,6 @@ class Importer(object): return None _tls.running = True - fullname = fullname.rstrip('.') try: pkgname, dot, _ = fullname.rpartition('.') _v and LOG.debug('%r.find_module(%r)', self, fullname) From 23b2a545cfddb7e3816cbcc77be6a81dc8a5e0b9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 17:04:10 +0100 Subject: [PATCH 25/40] fork: avoid another logging deadlock at startup. The very first task /must/ be clearing out logging locks, since _at_fork() functions call LOG.debug() via Side.close(). Additionally, the root logger is not included in loggerDict, so we must specify it explicitly. --- mitogen/fork.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/mitogen/fork.py b/mitogen/fork.py index 8b29ad0c..dd6008db 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -51,7 +51,7 @@ def fixup_prngs(): sys.modules['ssl'].RAND_add(s, 75.0) -def break_logging_locks(): +def reset_logging_framework(): """ After fork, ensure any logging.Handler locks are recreated, as a variety of threads in the parent may have been using the logging package at the moment @@ -61,10 +61,19 @@ def break_logging_locks(): https://github.com/dw/mitogen/issues/150 for a full discussion. """ logging._lock = threading.RLock() - for name in logging.Logger.manager.loggerDict: + + # The root logger does not appear in the loggerDict. + for name in [None] + list(logging.Logger.manager.loggerDict): for handler in logging.getLogger(name).handlers: handler.createLock() + root = logging.getLogger() + root.handlers = [ + handler + for handler in root.handlers + if not isinstance(handler, mitogen.core.LogHandler) + ] + def handle_child_crash(): """ @@ -125,10 +134,10 @@ class Stream(mitogen.parent.Stream): handle_child_crash() def _child_main(self, childfp): + reset_logging_framework() # Must be first! + fixup_prngs() mitogen.core.Latch._on_fork() mitogen.core.Side._on_fork() - break_logging_locks() - fixup_prngs() if self.on_fork: self.on_fork() mitogen.core.set_block(childfp.fileno()) From 64b60be50c38cbacb6f1f3165f12857094c3b2cd Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 19:51:44 +0100 Subject: [PATCH 26/40] tests: split runner_new_process out of runner_one_job --- tests/ansible/integration/async/all.yml | 1 + .../integration/async/runner_new_process.yml | 54 +++++++++++++++++++ .../integration/async/runner_one_job.yml | 50 ----------------- 3 files changed, 55 insertions(+), 50 deletions(-) create mode 100644 tests/ansible/integration/async/runner_new_process.yml diff --git a/tests/ansible/integration/async/all.yml b/tests/ansible/integration/async/all.yml index b295b526..bae2c308 100644 --- a/tests/ansible/integration/async/all.yml +++ b/tests/ansible/integration/async/all.yml @@ -1,5 +1,6 @@ - import_playbook: result_binary_producing_json.yml - import_playbook: result_binary_producing_junk.yml - import_playbook: result_shell_echo_hi.yml +- import_playbook: runner_new_process.yml - import_playbook: runner_one_job.yml - import_playbook: runner_two_simultaneous_jobs.yml diff --git a/tests/ansible/integration/async/runner_new_process.yml b/tests/ansible/integration/async/runner_new_process.yml new file mode 100644 index 00000000..7b0bf628 --- /dev/null +++ b/tests/ansible/integration/async/runner_new_process.yml @@ -0,0 +1,54 @@ +# Verify async jobs run in a new process. + +- name: integration/async/runner_new_process.yml + hosts: test-targets + any_errors_fatal: true + tasks: + + - name: get process ID. + custom_python_detect_environment: + register: sync_proc1 + + - name: get process ID again. + custom_python_detect_environment: + register: sync_proc2 + + - assert: + that: + - sync_proc1.pid == sync_proc2.pid + when: is_mitogen + + - name: get async process ID. + custom_python_detect_environment: + register: async_proc1 + async: 1000 + poll: 0 + + - name: busy-poll up to 100000 times + async_status: + jid: "{{async_proc1.ansible_job_id}}" + register: async_result1 + until: async_result1.finished + retries: 100000 + delay: 0 + + - name: get async process ID again. + custom_python_detect_environment: + register: async_proc2 + async: 1000 + poll: 0 + + - name: busy-poll up to 100000 times + async_status: + jid: "{{async_proc2.ansible_job_id}}" + register: async_result2 + until: async_result2.finished + retries: 100000 + delay: 0 + + - assert: + that: + - sync_proc1.pid == sync_proc2.pid + - async_result1.pid != sync_proc1.pid + - async_result1.pid != async_result2.pid + when: is_mitogen diff --git a/tests/ansible/integration/async/runner_one_job.yml b/tests/ansible/integration/async/runner_one_job.yml index 989a7cda..04ffc5ea 100644 --- a/tests/ansible/integration/async/runner_one_job.yml +++ b/tests/ansible/integration/async/runner_one_job.yml @@ -6,56 +6,6 @@ any_errors_fatal: true tasks: - # Verify async jobs run in a new process. - - - name: get process ID. - custom_python_detect_environment: - register: sync_proc1 - - - name: get process ID again. - custom_python_detect_environment: - register: sync_proc2 - - - assert: - that: - - sync_proc1.pid == sync_proc2.pid - when: is_mitogen - - - name: get async process ID. - custom_python_detect_environment: - register: async_proc1 - async: 1000 - poll: 0 - - - name: busy-poll up to 100000 times - async_status: - jid: "{{async_proc1.ansible_job_id}}" - register: async_result1 - until: async_result1.finished - retries: 100000 - delay: 0 - - - name: get async process ID again. - custom_python_detect_environment: - register: async_proc2 - async: 1000 - poll: 0 - - - name: busy-poll up to 100000 times - async_status: - jid: "{{async_proc2.ansible_job_id}}" - register: async_result2 - until: async_result2.finished - retries: 100000 - delay: 0 - - - assert: - that: - - sync_proc1.pid == sync_proc2.pid - - async_result1.pid != sync_proc1.pid - - async_result1.pid != async_result2.pid - when: is_mitogen - # Verify output of a single async job. - name: start 2 second op From b3a5fa70b0e97d7eda71f3f42d34a8fb08363f87 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 20:45:07 +0100 Subject: [PATCH 27/40] core: copy debug setting to child's Router too. core.Router doesn't pay attention to this attribute, but after upgrade_router() has been called, the new parent.Router will. --- mitogen/core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mitogen/core.py b/mitogen/core.py index 9c37f16d..023b68ca 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1707,6 +1707,7 @@ class ExternalContext(object): enable_profiling() self.broker = Broker() self.router = Router(self.broker) + self.router.debug = self.config.get('debug', False) self.router.undirectional = self.config['unidirectional'] self.router.add_handler( fn=self._on_shutdown_msg, From 9e78c20ebaa04dd62b798377dfbbf7d271c045d4 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 21:01:58 +0100 Subject: [PATCH 28/40] core/parent: add Context.call_no_reply(). --- docs/api.rst | 10 +++++++++- mitogen/core.py | 8 ++++++-- mitogen/parent.py | 5 +++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 20389180..114f58d0 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -857,7 +857,7 @@ Context Class .. method:: call (fn, \*args, \*\*kwargs) - Equivalent to :py:meth:`call_async(fn, \*args, \**kwargs).get_data() + Equivalent to :py:meth:`call_async(fn, \*args, \**kwargs).get().unpickle() `. :returns: @@ -866,6 +866,14 @@ Context Class :raises mitogen.core.CallError: An exception was raised in the remote context during execution. + .. method:: call_no_reply (fn, \*args, \*\*kwargs) + + Send a function call, but expect no return value. If the call fails, + the full exception will be logged to the target context's logging framework. + + :raises mitogen.core.CallError: + An exception was raised in the remote context during execution. + Receiver Class diff --git a/mitogen/core.py b/mitogen/core.py index 023b68ca..3ce3d97a 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1857,12 +1857,16 @@ class ExternalContext(object): for msg in self.recv: try: ret = self._dispatch_one(msg) + _v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret) if msg.reply_to: msg.reply(ret) except Exception: e = sys.exc_info()[1] - _v and LOG.debug('_dispatch_calls: %s', e) - msg.reply(CallError(e)) + if msg.reply_to: + _v and LOG.debug('_dispatch_calls: %s', e) + msg.reply(CallError(e)) + else: + LOG.exception('_dispatch_calls: %r', msg) self.dispatch_stopped = True def main(self): diff --git a/mitogen/parent.py b/mitogen/parent.py index 1d645f23..7e095dea 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -912,6 +912,11 @@ class Context(mitogen.core.Context): receiver = self.call_async(fn, *args, **kwargs) return receiver.get().unpickle(throw_dead=False) + def call_no_reply(self, fn, *args, **kwargs): + LOG.debug('%r.call_no_reply(%r, *%r, **%r)', + self, fn, args, kwargs) + self.send(make_call_msg(fn, *args, **kwargs)) + def shutdown(self, wait=False): LOG.debug('%r.shutdown() sending SHUTDOWN', self) latch = mitogen.core.Latch() From 526590027afce04193ea329ea44f19efb8bbaaed Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 22:07:59 +0100 Subject: [PATCH 29/40] issue #186: PushFileService improvements. New method to send all modules and files in one roundtrip. --- mitogen/service.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/mitogen/service.py b/mitogen/service.py index 424f21c8..1bd426d1 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -590,6 +590,7 @@ class PushFileService(Service): finally: self._lock.release() + LOG.debug('%r.get(%r) waiting for uncached file to arrive', self, path) latch.get() LOG.debug('%r.get(%r) -> %r', self, path, self._cache[path]) return self._cache[path] @@ -614,6 +615,22 @@ class PushFileService(Service): context=context ).close() + @expose(policy=AllowParents()) + @arg_spec({ + 'context': mitogen.core.Context, + 'paths': list, + 'modules': list, + }) + def propagate_paths_and_modules(self, context, paths, modules): + """ + One size fits all method to ensure a target context has been preloaded + with a set of small files and Python modules. + """ + for path in paths: + self.propagate_to(context, path) + for fullname in modules: + self.router.responder.forward_module(context, fullname) + @expose(policy=AllowParents()) @arg_spec({ 'context': mitogen.core.Context, @@ -649,7 +666,7 @@ class PushFileService(Service): self, path, data, context) waiters = self._store(path, data) if context.context_id != mitogen.context_id: - self._forward(path, context) + self._forward(context, path) for callback in waiters: callback() From caffaa79f7636886f67418ef5108ec7a26cd09a1 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 22:11:26 +0100 Subject: [PATCH 30/40] issue #186: rework async/forked tasks again. The controller must know the ID of the forked child in order to propagate dependencies to it, so forking+starting the module run cannot happen entirely on the target, without some additional mechanism to wait-and-repropagate the deps as they arrive on the target. Rework things so that init_child() also handles starting the fork parent, and returns it along with the context's home directory in a single round trip. Now master knows the identity of the fork parent, it can directly create fork children and call run_module_async() in them. This necessitates 2 roundtrips to start an asynchronous task. This whole thing sucks and entirely needs simplified, but for now things almost work, so keeping it. connection.py: * Expect ContextService to return the entire dict return value of init_child(). Store the fork_contxt from the return value. planner.py: * Rework Planner to store the invocation as an instance attribute, to simplify method calls. * Add Planner.get_push_files() and Planner.get_module_deps(). * Add _propagate_deps() which takes a Planner and ensures the deps it describes are sent to a (non forked or forked) context. * Move async task logic out of target.py and into invoke() / _invoke_*(). process.py: * Services no longer need references to each other. planner.py handles sending module deps with one extra RPC. services.py: * Return "init_child_result" key instead of simple "home_dir" key. * Get rid of dep propagation from ModuleDepService, it lives in planner.py now. target.py: * Get rid of async task start logic, lives in planner.py now. --- ansible_mitogen/connection.py | 32 +++-- ansible_mitogen/planner.py | 247 ++++++++++++++++++++++------------ ansible_mitogen/process.py | 11 +- ansible_mitogen/runner.py | 15 ++- ansible_mitogen/services.py | 46 +++---- ansible_mitogen/target.py | 89 ++++++------ 6 files changed, 257 insertions(+), 183 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index ee34c22b..b5f80910 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -298,13 +298,17 @@ class Connection(ansible.plugins.connection.ConnectionBase): router = None #: mitogen.master.Context representing the parent Context, which is - #: presently always the master process. + #: presently always the connection multiplexer process. parent = None #: mitogen.master.Context connected to the target user account on the #: target machine (i.e. via sudo). context = None + #: mitogen.master.Context connected to the fork parent process in the + #: target user account. + fork_context = None + #: Only sudo and su are supported for now. become_methods = ['sudo', 'su'] @@ -336,7 +340,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): host_vars = None #: Set after connection to the target context's home directory. - _homedir = None + home_dir = None def __init__(self, play_context, new_stdin, **kwargs): assert ansible_mitogen.process.MuxProcess.unix_listener_path, ( @@ -376,7 +380,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): @property def homedir(self): self._connect() - return self._homedir + return self.home_dir @property def connected(self): @@ -470,15 +474,8 @@ class Connection(ansible.plugins.connection.ConnectionBase): raise ansible.errors.AnsibleConnectionFailure(dct['msg']) self.context = dct['context'] - self._homedir = dct['home_dir'] - - def get_context_name(self): - """ - Return the name of the target context we issue commands against, i.e. a - unique string useful as a key for related data, such as a list of - modules uploaded to the target. - """ - return self.context.name + self.fork_context = dct['init_child_result']['fork_context'] + self.home_dir = dct['init_child_result']['home_dir'] def close(self, new_task=False): """ @@ -526,6 +523,17 @@ class Connection(ansible.plugins.connection.ConnectionBase): LOG.debug('Call took %d ms: %s%r', 1000 * (time.time() - t0), func.func_name, args) + def create_fork_child(self): + """ + Fork a new child off the target context. The actual fork occurs from + the 'virginal fork parent', which does not any Ansible modules prior to + fork, to avoid conflicts resulting from custom module_utils paths. + + :returns: + mitogen.core.Context of the new child. + """ + return self.call(ansible_mitogen.target.create_fork_child) + def exec_command(self, cmd, in_data='', sudoable=True, mitogen_chdir=None): """ Implement exec_command() by calling the corresponding diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index ce635239..c540119b 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -35,8 +35,10 @@ files/modules known missing. """ from __future__ import absolute_import +import json import logging import os +import random from ansible.executor import module_common import ansible.errors @@ -132,20 +134,36 @@ class Planner(object): file, indicates whether or not it understands how to run the module, and exports a method to run the module. """ - def detect(self, invocation): + def __init__(self, invocation): + self._inv = invocation + + def detect(self): """ Return true if the supplied `invocation` matches the module type implemented by this planner. """ raise NotImplementedError() - def get_should_fork(self, invocation): + def should_fork(self): """ Asynchronous tasks must always be forked. """ - return invocation.wrap_async + return self._inv.wrap_async + + def get_push_files(self): + """ + Return a list of files that should be propagated to the target context + using PushFileService. The default implementation pushes nothing. + """ + return [] + + def get_module_deps(self): + """ + Return a list of the Python module names imported by the module. + """ + return [] - def plan(self, invocation, **kwargs): + def get_kwargs(self, **kwargs): """ If :meth:`detect` returned :data:`True`, plan for the module's execution, including granting access to or delivering any files to it @@ -161,9 +179,7 @@ class Planner(object): } """ kwargs.setdefault('emulate_tty', True) - kwargs.setdefault('service_context', invocation.connection.parent) - kwargs.setdefault('should_fork', self.get_should_fork(invocation)) - kwargs.setdefault('wrap_async', invocation.wrap_async) + kwargs.setdefault('service_context', self._inv.connection.parent) return kwargs def __repr__(self): @@ -177,26 +193,19 @@ class BinaryPlanner(Planner): """ runner_name = 'BinaryRunner' - def detect(self, invocation): - return module_common._is_binary(invocation.module_source) + def detect(self): + return module_common._is_binary(self._inv.module_source) - def _grant_file_service_access(self, invocation): - invocation.connection.parent.call_service( - service_name='mitogen.service.PushFileService', - method_name='propagate_to', - path=invocation.module_path, - context=invocation.connection.context, - ) + def get_push_files(self): + return [self._inv.module_path] - def plan(self, invocation, **kwargs): - self._grant_file_service_access(invocation) - return super(BinaryPlanner, self).plan( - invocation=invocation, + def get_kwargs(self, **kwargs): + return super(BinaryPlanner, self).get_kwargs( runner_name=self.runner_name, - module=invocation.module_name, - path=invocation.module_path, - args=invocation.module_args, - env=invocation.env, + module=self._inv.module_name, + path=self._inv.module_path, + args=self._inv.module_args, + env=self._inv.env, **kwargs ) @@ -206,24 +215,25 @@ class ScriptPlanner(BinaryPlanner): Common functionality for script module planners -- handle interpreter detection and rewrite. """ - def _get_interpreter(self, invocation): - interpreter, arg = parse_script_interpreter(invocation.module_source) + def _get_interpreter(self): + interpreter, arg = parse_script_interpreter( + self._inv.module_source + ) if interpreter is None: raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % ( - invocation.module_name, + self._inv.module_name, )) key = u'ansible_%s_interpreter' % os.path.basename(interpreter).strip() try: - template = invocation.task_vars[key].strip() - return invocation.templar.template(template), arg + template = self._inv.task_vars[key].strip() + return self._inv.templar.template(template), arg except KeyError: return interpreter, arg - def plan(self, invocation, **kwargs): - interpreter, arg = self._get_interpreter(invocation) - return super(ScriptPlanner, self).plan( - invocation=invocation, + def get_kwargs(self, **kwargs): + interpreter, arg = self._get_interpreter() + return super(ScriptPlanner, self).get_kwargs( interpreter_arg=arg, interpreter=interpreter, **kwargs @@ -237,8 +247,8 @@ class JsonArgsPlanner(ScriptPlanner): """ runner_name = 'JsonArgsRunner' - def detect(self, invocation): - return module_common.REPLACER_JSONARGS in invocation.module_source + def detect(self): + return module_common.REPLACER_JSONARGS in self._inv.module_source class WantJsonPlanner(ScriptPlanner): @@ -255,8 +265,8 @@ class WantJsonPlanner(ScriptPlanner): """ runner_name = 'WantJsonRunner' - def detect(self, invocation): - return 'WANT_JSON' in invocation.module_source + def detect(self): + return 'WANT_JSON' in self._inv.module_source class NewStylePlanner(ScriptPlanner): @@ -267,56 +277,59 @@ class NewStylePlanner(ScriptPlanner): """ runner_name = 'NewStyleRunner' - def _get_interpreter(self, invocation): + def detect(self): + return 'from ansible.module_utils.' in self._inv.module_source + + def _get_interpreter(self): return None, None - def _grant_file_service_access(self, invocation): - """ - Stub out BinaryPlanner's method since ModuleDepService makes internal - calls to grant file access, avoiding 2 IPCs per task invocation. - """ + def get_push_files(self): + return super(NewStylePlanner, self).get_push_files() + [ + path + for fullname, path, is_pkg in self.get_module_map()['custom'] + ] - def get_should_fork(self, invocation): + def get_module_deps(self): + return self.get_module_map()['builtin'] + + def should_fork(self): """ In addition to asynchronous tasks, new-style modules should be forked - if mitogen_task_isolation=fork. + if the user specifies mitogen_task_isolation=fork, or if the new-style + module has a custom module search path. """ return ( - super(NewStylePlanner, self).get_should_fork(invocation) or - (invocation.task_vars.get('mitogen_task_isolation') == 'fork') + super(NewStylePlanner, self).should_fork() or + (self._inv.task_vars.get('mitogen_task_isolation') == 'fork') or + (len(self.get_module_map()['custom']) > 0) ) - def detect(self, invocation): - return 'from ansible.module_utils.' in invocation.module_source - - def get_search_path(self, invocation): + def get_search_path(self): return tuple( path for path in module_utils_loader._get_paths(subdirs=False) if os.path.isdir(path) ) - def get_module_map(self, invocation): - return invocation.connection.parent.call_service( - service_name='ansible_mitogen.services.ModuleDepService', - method_name='scan', + _module_map = None - module_name='ansible_module_%s' % (invocation.module_name,), - module_path=invocation.module_path, - search_path=self.get_search_path(invocation), - builtin_path=module_common._MODULE_UTILS_PATH, - context=invocation.connection.context, - ) + def get_module_map(self): + if self._module_map is None: + self._module_map = self._inv.connection.parent.call_service( + service_name='ansible_mitogen.services.ModuleDepService', + method_name='scan', - def plan(self, invocation): - module_map = self.get_module_map(invocation) - return super(NewStylePlanner, self).plan( - invocation, - module_map=module_map, - should_fork=( - self.get_should_fork(invocation) or - len(module_map['custom']) > 0 + module_name='ansible_module_%s' % (self._inv.module_name,), + module_path=self._inv.module_path, + search_path=self.get_search_path(), + builtin_path=module_common._MODULE_UTILS_PATH, + context=self._inv.connection.context, ) + return self._module_map + + def get_kwargs(self): + return super(NewStylePlanner, self).get_kwargs( + module_map=self.get_module_map(), ) @@ -346,14 +359,14 @@ class ReplacerPlanner(NewStylePlanner): """ runner_name = 'ReplacerRunner' - def detect(self, invocation): - return module_common.REPLACER in invocation.module_source + def detect(self): + return module_common.REPLACER in self._inv.module_source class OldStylePlanner(ScriptPlanner): runner_name = 'OldStyleRunner' - def detect(self, invocation): + def detect(self): # Everything else. return True @@ -375,24 +388,84 @@ def get_module_data(name): return path, source -def invoke(invocation): - """ - Find a suitable Planner that knows how to run `invocation`. - """ - (invocation.module_path, - invocation.module_source) = get_module_data(invocation.module_name) +def _propagate_deps(invocation, planner, context): + invocation.connection.parent.call_service( + service_name='mitogen.service.PushFileService', + method_name='propagate_paths_and_modules', + context=context, + paths=planner.get_push_files(), + modules=planner.get_module_deps(), + ) + + +def _invoke_async_task(invocation, planner): + job_id = '%016x' % random.randint(0, 2**64) + context = invocation.connection.create_fork_child() + _propagate_deps(invocation, planner, context) + context.call_no_reply( + ansible_mitogen.target.run_module_async, + job_id=job_id, + kwargs=planner.get_kwargs(), + ) + + return { + 'stdout': json.dumps({ + # modules/utilities/logic/async_wrapper.py::_run_module(). + 'changed': True, + 'started': 1, + 'finished': 0, + 'ansible_job_id': job_id, + }) + } + + +def _invoke_forked_task(invocation, planner): + context = invocation.connection.create_fork_child() + _propagate_deps(invocation, planner, context) + try: + return context.call( + ansible_mitogen.target.run_module, + kwargs=planner.get_kwargs(), + ) + finally: + context.shutdown() + +def _get_planner(invocation): for klass in _planners: - planner = klass() - if planner.detect(invocation): + planner = klass(invocation) + if planner.detect(): LOG.debug('%r accepted %r (filename %r)', planner, invocation.module_name, invocation.module_path) - return invocation.action._postprocess_response( - invocation.connection.call( - ansible_mitogen.target.run_module, - planner.plan(invocation), - ) - ) + return planner LOG.debug('%r rejected %r', planner, invocation.module_name) - raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation)) + + +def invoke(invocation): + """ + Find a Planner subclass corresnding to `invocation` and use it to invoke + the module. + + :param Invocation invocation: + :returns: + Module return dict. + :raises ansible.errors.AnsibleError: + Unrecognized/unsupported module type. + """ + (invocation.module_path, + invocation.module_source) = get_module_data(invocation.module_name) + planner = _get_planner(invocation) + + if invocation.wrap_async: + response = _invoke_async_task(invocation, planner) + elif planner.should_fork(): + response = _invoke_forked_task(invocation, planner) + else: + _propagate_deps(invocation, planner, invocation.connection.context) + response = invocation.connection.call( + ansible_mitogen.target.run_module, + kwargs=planner.get_kwargs(), + ) + + return invocation.action._postprocess_response(response) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 96703e64..e97d695d 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -154,18 +154,13 @@ class MuxProcess(object): Construct a ContextService and a thread to service requests for it arriving from worker processes. """ - file_service = mitogen.service.FileService(router=self.router) - push_file_service = mitogen.service.PushFileService(router=self.router) self.pool = mitogen.service.Pool( router=self.router, services=[ - file_service, - push_file_service, + mitogen.service.FileService(router=self.router), + mitogen.service.PushFileService(router=self.router), ansible_mitogen.services.ContextService(self.router), - ansible_mitogen.services.ModuleDepService( - router=self.router, - push_file_service=push_file_service, - ), + ansible_mitogen.services.ModuleDepService(self.router), ], size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), ) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 6c353800..a399b453 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -111,13 +111,20 @@ class Runner(object): Context to which we should direct FileService calls. For now, always the connection multiplexer process on the controller. :param dict args: - Ansible module arguments. A strange mixture of user and internal keys - created by ActionBase._execute_module(). + Ansible module arguments. A mixture of user and internal keys created + by :meth:`ansible.plugins.action.ActionBase._execute_module`. :param dict env: Additional environment variables to set during the run. + + :param mitogen.core.ExternalContext econtext: + When `detach` is :data:`True`, a reference to the ExternalContext the + runner is executing in. + :param bool detach: + When :data:`True`, indicate the runner should detach the context from + its parent after setup has completed successfully. """ - def __init__(self, module, service_context, econtext=None, detach=False, - args=None, env=None): + def __init__(self, module, service_context, args=None, env=None, + econtext=None, detach=False): if args is None: args = {} diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 928af5fd..0331efd2 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -251,13 +251,18 @@ class ContextService(mitogen.service.Service): { 'context': mitogen.core.Context or None, - 'home_dir': str or None, + 'init_child_result': { + 'fork_context': mitogen.core.Context, + 'home_dir': str or None, + }, 'msg': str or None } - Where either `msg` is an error message and the remaining fields are - :data:`None`, or `msg` is :data:`None` and the remaining fields are - set. + Where `context` is a reference to the newly constructed context, + `init_child_result` is the result of executing + :func:`ansible_mitogen.target.init_child` in that context, `msg` is + an error message and the remaining fields are :data:`None`, or + `msg` is :data:`None` and the remaining fields are set. """ try: method = getattr(self.router, spec['method']) @@ -276,11 +281,7 @@ class ContextService(mitogen.service.Service): lambda: self._on_stream_disconnect(stream)) self._send_module_forwards(context) - home_dir = context.call(os.path.expanduser, '~') - - # We don't need to wait for the result of this. Ideally we'd check its - # return value somewhere, but logs will catch a failure anyway. - context.call_async(ansible_mitogen.target.init_child) + init_child_result = context.call(ansible_mitogen.target.init_child) if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'): from mitogen import debug @@ -290,7 +291,7 @@ class ContextService(mitogen.service.Service): self._refs_by_context[context] = 0 return { 'context': context, - 'home_dir': home_dir, + 'init_child_result': init_child_result, 'msg': None, } @@ -341,7 +342,7 @@ class ContextService(mitogen.service.Service): :returns dict: * context: mitogen.master.Context or None. - * homedir: Context's home directory or None. + * init_child_result: Result of :func:`init_child`. * msg: StreamError exception text or None. * method_name: string failing method name. """ @@ -356,7 +357,7 @@ class ContextService(mitogen.service.Service): except mitogen.core.StreamError as e: return { 'context': None, - 'home_dir': None, + 'init_child_result': None, 'method_name': spec['method'], 'msg': str(e), } @@ -369,9 +370,8 @@ class ModuleDepService(mitogen.service.Service): Scan a new-style module and produce a cached mapping of module_utils names to their resolved filesystem paths. """ - def __init__(self, push_file_service, **kwargs): - super(ModuleDepService, self).__init__(**kwargs) - self._push_file_service = push_file_service + def __init__(self, *args, **kwargs): + super(ModuleDepService, self).__init__(*args, **kwargs) self._cache = {} def _get_builtin_names(self, builtin_path, resolved): @@ -411,20 +411,4 @@ class ModuleDepService(mitogen.service.Service): 'builtin': builtin, 'custom': custom, } - - # Grant FileService access to paths in here to avoid another 2 IPCs - # from WorkerProcess. - self._push_file_service.propagate_to( - path=module_path, - context=context, - ) - - for fullname, path, is_pkg in custom: - self._push_file_service.propagate_to( - path=path, - context=context, - ) - - for name in self._cache[key]['builtin']: - self.router.responder.forward_module(context, name) return self._cache[key] diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 1b3d587b..63adba5f 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -40,7 +40,6 @@ import logging import operator import os import pwd -import random import re import stat import subprocess @@ -81,7 +80,7 @@ def get_small_file(context, path): :returns: Bytestring file data. """ - pool = mitogen.service.get_or_create_pool() + pool = mitogen.service.get_or_create_pool(router=context.router) service = pool.get_service('mitogen.service.PushFileService') return service.get(path) @@ -211,52 +210,51 @@ def init_child(econtext): This is necessary to prevent modules that are executed in-process from polluting the global interpreter state in a way that effects explicitly isolated modules. + + :returns: + Dict like:: + + { + 'fork_context': mitogen.core.Context. + 'home_dir': str. + } + + Where `fork_context` refers to the newly forked 'fork parent' context + the controller will use to start forked jobs, and `home_dir` is the + home directory for the active user account. """ global _fork_parent mitogen.parent.upgrade_router(econtext) _fork_parent = econtext.router.fork() reset_temp_dir(econtext) + return { + 'fork_context': _fork_parent, + 'home_dir': os.path.expanduser('~'), + } + @mitogen.core.takes_econtext -def start_fork_child(wrap_async, kwargs, econtext): +def create_fork_child(econtext): + """ + For helper functions executed in the fork parent context, arrange for + the context's router to be upgraded as necessary and for a new child to be + prepared. + """ mitogen.parent.upgrade_router(econtext) context = econtext.router.fork() context.call(reset_temp_dir) - if not wrap_async: - try: - return context.call(run_module, kwargs) - finally: - context.shutdown() - - job_id = '%016x' % random.randint(0, 2**64) - kwargs['detach'] = True - kwargs['econtext'] = econtext - context.call_async(run_module_async, job_id, kwargs) - return { - 'stdout': json.dumps({ - # modules/utilities/logic/async_wrapper.py::_run_module(). - 'changed': True, - 'started': 1, - 'finished': 0, - 'ansible_job_id': job_id, - }) - } + LOG.debug('create_fork_child() -> %r', context) + return context -@mitogen.core.takes_econtext -def run_module(kwargs, econtext): +def run_module(kwargs): """ Set up the process environment in preparation for running an Ansible module. This monkey-patches the Ansible libraries in various places to prevent it from trying to kill the process on completion, and to prevent it from reading sys.stdin. """ - should_fork = kwargs.pop('should_fork', False) - wrap_async = kwargs.pop('wrap_async', False) - if should_fork: - return _fork_parent.call(start_fork_child, wrap_async, kwargs) - runner_name = kwargs.pop('runner_name') klass = getattr(ansible_mitogen.runner, runner_name) impl = klass(**kwargs) @@ -287,21 +285,27 @@ def _write_job_status(job_id, dct): os.rename(path + '.tmp', path) -def _run_module_async(job_id, kwargs, econtext): +def _run_module_async(kwargs, job_id, econtext): """ - Body on run_module_async(). - 1. Immediately updates the status file to mark the job as started. 2. Installs a timer/signal handler to implement the time limit. 3. Runs as with run_module(), writing the result to the status file. + + :param dict kwargs: + Runner keyword arguments. + :param str job_id: + String job ID. """ _write_job_status(job_id, { 'started': 1, - 'finished': 0 + 'finished': 0, + 'pid': os.getpid() }) + #kwargs['detach'] = True + #kwargs['econtext'] = econtext kwargs['emulate_tty'] = False - dct = run_module(kwargs, econtext) + dct = run_module(kwargs) if mitogen.core.PY3: for key in 'stdout', 'stderr': dct[key] = dct[key].decode('utf-8', 'surrogateescape') @@ -325,18 +329,21 @@ def _run_module_async(job_id, kwargs, econtext): @mitogen.core.takes_econtext -def run_module_async(job_id, kwargs, econtext): +def run_module_async(kwargs, job_id, econtext): """ - Since run_module_async() is invoked with .call_async(), with nothing to - read the result from the corresponding Receiver, wrap the body in an - exception logger, and wrap that in something that tears down the context on - completion. + Arrange for a module to be executed with its run status and result + serialized to a disk file. This function expects to run in a child forked + using :func:`create_fork_child`. """ try: try: - _run_module_async(job_id, kwargs, econtext) + _run_module_async(kwargs, job_id, econtext) except Exception: - LOG.exception('_run_module_async crashed') + # Catch any (ansible_mitogen) bugs and write them to the job file. + _write_job_status(job_id, { + "failed": 1, + "msg": traceback.format_exc(), + }) finally: econtext.broker.shutdown() From f6d9b074ff149d73164bac224f76addb2f77d231 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 22:20:46 +0100 Subject: [PATCH 31/40] master: reduce module verbosity somewhat. --- mitogen/master.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mitogen/master.py b/mitogen/master.py index 1c346d36..b024cdd1 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -56,6 +56,7 @@ import mitogen import mitogen.core import mitogen.minify import mitogen.parent +from mitogen.core import IOLOG from mitogen.core import LOG @@ -305,8 +306,8 @@ class ModuleFinder(object): """Attempt to fetch source code via pkgutil. In an ideal world, this would be the only required implementation of get_module().""" loader = pkgutil.find_loader(fullname) - LOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r', - fullname, loader) + IOLOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r', + fullname, loader) if not loader: return @@ -598,7 +599,7 @@ class ModuleResponder(object): ) def _forward_module(self, context, fullname): - LOG.debug('%r._forward_module(%r, %r)', self, context, fullname) + IOLOG.debug('%r._forward_module(%r, %r)', self, context, fullname) path = [] while fullname: path.append(fullname) From 92ecf295592797bd52c173b4896b4e1ce40567c9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 22:27:54 +0100 Subject: [PATCH 32/40] core: check in the hacks that let Ansible work just now. --- mitogen/core.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mitogen/core.py b/mitogen/core.py index 3ce3d97a..b9b202a2 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -623,6 +623,8 @@ class Importer(object): return None _tls.running = True + # TODO: hack: this is papering over a bug elsewhere. + fullname = fullname.rstrip('.') try: pkgname, dot, _ = fullname.rpartition('.') _v and LOG.debug('%r.find_module(%r)', self, fullname) @@ -715,6 +717,8 @@ class Importer(object): def load_module(self, fullname): _v and LOG.debug('Importer.load_module(%r)', fullname) + # TODO: hack: this is papering over a bug elsewhere. + fullname = fullname.rstrip('.') self._refuse_imports(fullname) event = threading.Event() From 05e0b134f92fe5e37914ab6f73a938978def3fec Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Jun 2018 23:06:35 +0100 Subject: [PATCH 33/40] service: simplify CALL_SERVICE stub and fix race. If PushService.store_and_forward() loses the race to arrive at a brand new context first, and the context's main thread is already executing a CALL_FUNCTION that is blocked on the result of PushService, deadlock could occur in the old scheme. Instead (for now) simply spam a thread for each incoming message, and use the get_or_create_pool() lock to ensure things work out in the end. This could potentially generate a huge number of threads given the wrong app, but we'll fix that problem when it appears. --- mitogen/core.py | 35 +++++++++++++---------------------- mitogen/service.py | 23 ++--------------------- 2 files changed, 15 insertions(+), 43 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index b9b202a2..ebd41f84 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -75,10 +75,6 @@ IOLOG.setLevel(logging.INFO) _v = False _vv = False -# Also taken by Broker, no blocking work can occur with it held. -_service_call_lock = threading.Lock() -_service_calls = [] - GET_MODULE = 100 CALL_FUNCTION = 101 FORWARD_LOG = 102 @@ -1648,27 +1644,22 @@ class ExternalContext(object): if not self.config['profiling']: os.kill(os.getpid(), signal.SIGTERM) + def _service_stub_main(self, msg): + import mitogen.service + pool = mitogen.service.get_or_create_pool(router=self.router) + pool._receiver._on_receive(msg) + def _on_call_service_msg(self, msg): """ - Stub CALL_SERVICE handler, push message on temporary queue and invoke - _on_stub_call() from the main thread. + Stub service handler. Start a thread to import the mitogen.service + implementation from, and deliver the message to the newly constructed + pool. This must be done as CALL_SERVICE for e.g. PushFileService may + race with a CALL_FUNCTION blocking the main thread waiting for a result + from that service. """ - if msg.is_dead: - return - _service_call_lock.acquire() - try: - _service_calls.append(msg) - finally: - _service_call_lock.release() - - self.router.route( - Message.pickled( - dst_id=mitogen.context_id, - handle=CALL_FUNCTION, - obj=('mitogen.service', None, '_on_stub_call', (), {}), - router=self.router, - ) - ) + if not msg.is_dead: + th = threading.Thread(target=self._service_stub_main, args=(msg,)) + th.start() def _on_shutdown_msg(self, msg): _v and LOG.debug('_on_shutdown_msg(%r)', msg) diff --git a/mitogen/service.py b/mitogen/service.py index 1bd426d1..811791b6 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -58,26 +58,6 @@ def get_or_create_pool(size=None, router=None): _pool_lock.release() -@mitogen.core.takes_router -def _on_stub_call(router): - """ - Called for each message received by the core.py stub CALL_SERVICE handler. - Create the pool if it doesn't already exist, and push enqueued messages - into the pool's receiver. This may be called more than once as the stub - service handler runs in asynchronous context, while _on_stub_call() happens - on the main thread. Multiple CALL_SERVICE may end up enqueued before Pool - has a chance to install the real CALL_SERVICE handler. - """ - pool = get_or_create_pool(router=router) - mitogen.core._service_call_lock.acquire() - try: - for msg in mitogen.core._service_calls: - pool._receiver._on_receive(msg) - del mitogen.core._service_calls[:] - finally: - mitogen.core._service_call_lock.release() - - def validate_arg_spec(spec, args): for name in spec: try: @@ -250,7 +230,8 @@ class Invoker(object): except Exception: if no_reply: LOG.exception('While calling no-reply method %s.%s', - type(self).__name__, method.func_name) + type(self.service).__name__, + method.func_name) else: raise From ae20a689efff6a3f906909911a57417d7becd58c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 00:05:24 +0100 Subject: [PATCH 34/40] issue #186: finally enable detach. --- ansible_mitogen/runner.py | 15 +++++++-------- ansible_mitogen/target.py | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index a399b453..21a9ee99 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -462,9 +462,9 @@ class NewStyleRunner(ScriptRunner): def _setup_imports(self): """ - Ensure the local importer has loaded every module needed by the Ansible - module before setup() completes, but before detach() is called in an - asynchronous task. + Ensure the local importer and PushFileService has everything for the + Ansible module before setup() completes, but before detach() is called + in an asynchronous task. The master automatically streams modules towards us concurrent to the runner invocation, however there is no public API to synchronize on the @@ -476,6 +476,10 @@ class NewStyleRunner(ScriptRunner): mitogen.core.import_module(fullname) for fullname in self.module_map['builtin']: mitogen.core.import_module(fullname) + self.source = ansible_mitogen.target.get_small_file( + context=self.service_context, + path=self.path, + ) def setup(self): super(NewStyleRunner, self).setup() @@ -511,11 +515,6 @@ class NewStyleRunner(ScriptRunner): pass def _get_code(self): - self.source = ansible_mitogen.target.get_small_file( - context=self.service_context, - path=self.path, - ) - try: return self._code_by_path[self.path] except KeyError: diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 63adba5f..3ac97c2f 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -302,8 +302,8 @@ def _run_module_async(kwargs, job_id, econtext): 'pid': os.getpid() }) - #kwargs['detach'] = True - #kwargs['econtext'] = econtext + kwargs['detach'] = True + kwargs['econtext'] = econtext kwargs['emulate_tty'] = False dct = run_module(kwargs) if mitogen.core.PY3: From 3909cb11f624a4fde6b18b8a102467602f0395ce Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 00:57:14 +0100 Subject: [PATCH 35/40] service: recreate the pool after fork. --- mitogen/service.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mitogen/service.py b/mitogen/service.py index 811791b6..3e05deb7 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -42,6 +42,7 @@ from mitogen.core import LOG DEFAULT_POOL_SIZE = 16 _pool = None +_pool_pid = None #: Serialize pool construction. _pool_lock = threading.Lock() @@ -49,10 +50,12 @@ _pool_lock = threading.Lock() @mitogen.core.takes_router def get_or_create_pool(size=None, router=None): global _pool + global _pool_pid _pool_lock.acquire() try: - if _pool is None: + if _pool_pid != os.getpid(): _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE) + _pool_pid = os.getpid() return _pool finally: _pool_lock.release() @@ -435,6 +438,8 @@ class Pool(object): thread.start() self._threads.append(thread) + LOG.debug('%r: initialized', self) + @property def size(self): return len(self._threads) From 4bd992e35a14ddd0a74e24347dceb997170cec3c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 00:59:24 +0100 Subject: [PATCH 36/40] issue #186: move module code fetch back to overridden method --- ansible_mitogen/runner.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 21a9ee99..522f1e2e 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -476,10 +476,6 @@ class NewStyleRunner(ScriptRunner): mitogen.core.import_module(fullname) for fullname in self.module_map['builtin']: mitogen.core.import_module(fullname) - self.source = ansible_mitogen.target.get_small_file( - context=self.service_context, - path=self.path, - ) def setup(self): super(NewStyleRunner, self).setup() @@ -512,7 +508,10 @@ class NewStyleRunner(ScriptRunner): pass def _setup_program(self): - pass + self.source = ansible_mitogen.target.get_small_file( + context=self.service_context, + path=self.path, + ) def _get_code(self): try: From df8fe59eda6c37870a4079f228f76d2535bbf166 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 01:00:12 +0100 Subject: [PATCH 37/40] tests: replace hard-coded sleep with a polling loop --- .../async/result_binary_producing_json.yml | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/ansible/integration/async/result_binary_producing_json.yml b/tests/ansible/integration/async/result_binary_producing_json.yml index 8b6d59b9..61d63a08 100644 --- a/tests/ansible/integration/async/result_binary_producing_json.yml +++ b/tests/ansible/integration/async/result_binary_producing_json.yml @@ -10,7 +10,21 @@ poll: 0 register: job - - shell: sleep 1 + - assert: + that: | + job.ansible_job_id and + (job.changed == True) and + (job.started == 1) and + (job.changed == True) and + (job.finished == 0) + + - name: busy-poll up to 100000 times + async_status: + jid: "{{job.ansible_job_id}}" + register: result + until: result.finished + retries: 100000 + delay: 0 - slurp: src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}" From e35694acd50f46d9a212b8f00abe756279cc0d1d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 01:22:46 +0100 Subject: [PATCH 38/40] ansible: flake8 fixes. --- ansible_mitogen/connection.py | 4 ++-- ansible_mitogen/mixins.py | 6 ------ ansible_mitogen/module_finder.py | 2 +- ansible_mitogen/planner.py | 2 -- ansible_mitogen/process.py | 2 +- ansible_mitogen/runner.py | 1 - ansible_mitogen/strategy.py | 1 - ansible_mitogen/target.py | 2 -- mitogen/service.py | 1 + 9 files changed, 5 insertions(+), 16 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index b5f80910..90b5e41a 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -31,7 +31,6 @@ import logging import os import shlex import stat -import sys import time import jinja2.runtime @@ -58,6 +57,7 @@ def _connect_local(spec): } } + def wrap_or_none(klass, value): if value is not None: return klass(value) @@ -393,7 +393,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): raise ansible.errors.AnsibleConnectionFailure( self.unknown_via_msg % ( self.mitogen_via, - config['inventory_name'], + inventory_name, ) ) diff --git a/ansible_mitogen/mixins.py b/ansible_mitogen/mixins.py index 18d29da3..e4a7a748 100644 --- a/ansible_mitogen/mixins.py +++ b/ansible_mitogen/mixins.py @@ -32,7 +32,6 @@ import logging import os import pwd import shutil -import tempfile import traceback from ansible.module_utils._text import to_bytes @@ -43,11 +42,6 @@ import ansible.constants import ansible.plugins import ansible.plugins.action -try: - from ansible.plugins.loader import module_loader -except ImportError: # Ansible<2.4 - from ansible.plugins import module_loader - import mitogen.core import mitogen.select import mitogen.utils diff --git a/ansible_mitogen/module_finder.py b/ansible_mitogen/module_finder.py index 1d062454..79e1882c 100644 --- a/ansible_mitogen/module_finder.py +++ b/ansible_mitogen/module_finder.py @@ -85,7 +85,7 @@ def find(name, path=(), parent=None): head, _, tail = name.partition('.') try: tup = imp.find_module(head, list(path)) - except ImportError as e: + except ImportError: return parent fp, modpath, (suffix, mode, kind) = tup diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index c540119b..1006956c 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -52,9 +52,7 @@ except ImportError: # Ansible <2.4 from ansible.plugins import module_utils_loader import mitogen -import mitogen.service import ansible_mitogen.target -import ansible_mitogen.services LOG = logging.getLogger(__name__) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index e97d695d..c4f58310 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -136,7 +136,7 @@ class MuxProcess(object): """ Construct a Router, Broker, and mitogen.unix listener """ - self.router = mitogen.master.Router(max_message_size=4096*1048576) + self.router = mitogen.master.Router(max_message_size=4096 * 1048576) self.router.responder.whitelist_prefix('ansible') self.router.responder.whitelist_prefix('ansible_mitogen') mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 522f1e2e..e14d26bd 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -42,7 +42,6 @@ import imp import json import logging import os -import shutil import sys import tempfile import types diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index ce528c7f..2fd4cb91 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -29,7 +29,6 @@ from __future__ import absolute_import import os -import ansible.errors import ansible_mitogen.mixins import ansible_mitogen.process diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 3ac97c2f..6dbaca8c 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -32,7 +32,6 @@ for file transfer, module execution and sundry bits like changing file modes. """ from __future__ import absolute_import -import cStringIO import errno import grp import json @@ -44,7 +43,6 @@ import re import stat import subprocess import tempfile -import time import traceback import ansible.module_utils.json_utils diff --git a/mitogen/service.py b/mitogen/service.py index 3e05deb7..62180e33 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -34,6 +34,7 @@ import pwd import stat import sys import threading +import time import mitogen.core import mitogen.select From d2accbce53170ca6a7036a72c519f2f0de2f6bca Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 01:34:15 +0100 Subject: [PATCH 39/40] docs: remove more Ansible limitations --- docs/ansible.rst | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/docs/ansible.rst b/docs/ansible.rst index 210e85bf..9593bd4f 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -140,8 +140,7 @@ Noteworthy Differences artificial serialization, causing slowdown equivalent to `task_duration * num_targets`. This will be fixed soon. -* Asynchronous jobs presently exist only for the duration of a run, and time - limits are not implemented. +* Asynchronous job time limits are not implemented. * "Module Replacer" style modules are not supported. These rarely appear in practice, and light web searches failed to reveal many examples of them. @@ -151,8 +150,8 @@ Noteworthy Differences may be established in parallel by default, this can be modified by setting the ``MITOGEN_POOL_SIZE`` environment variable. -* Performance does not scale perfectly linearly with target count. This will - improve over time. +* Performance does not scale linearly with target count. This will improve over + time. * SSH and ``become`` are treated distinctly when applying timeouts, and timeouts apply up to the point when the new interpreter is ready to accept @@ -201,11 +200,6 @@ container. Connection delegation is a work in progress, bug reports are welcome. - * While imports are cached on intermediaries, module scripts are needlessly - reuploaded for each target. Fixing this is equivalent to implementing - **Topology-Aware File Synchronization**, so it may remain unfixed until - that feature is started. - * Delegated connection setup is single-threaded; only one connection can be constructed in parallel per intermediary. From 3994f1b30a9c149b95b77662a732b30280ad882d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 02:11:41 +0100 Subject: [PATCH 40/40] ansible: implment async job time limit. --- ansible_mitogen/mixins.py | 10 ++++ ansible_mitogen/planner.py | 6 ++- ansible_mitogen/target.py | 32 ++++++++++-- docs/ansible.rst | 2 - tests/ansible/integration/async/all.yml | 2 + .../integration/async/runner_job_timeout.yml | 52 ------------------- .../async/runner_timeout_then_polling.yml | 34 ++++++++++++ .../async/runner_with_polling_and_timeout.yml | 24 +++++++++ 8 files changed, 103 insertions(+), 59 deletions(-) delete mode 100644 tests/ansible/integration/async/runner_job_timeout.yml create mode 100644 tests/ansible/integration/async/runner_timeout_then_polling.yml create mode 100644 tests/ansible/integration/async/runner_with_polling_and_timeout.yml diff --git a/ansible_mitogen/mixins.py b/ansible_mitogen/mixins.py index e4a7a748..efa0bd5a 100644 --- a/ansible_mitogen/mixins.py +++ b/ansible_mitogen/mixins.py @@ -288,6 +288,15 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): # ~root/.ansible -> /root/.ansible return self.call(os.path.expanduser, mitogen.utils.cast(path)) + def get_task_timeout_secs(self): + """ + Return the task "async:" value, portable across 2.4-2.5. + """ + try: + return self._task.async_val + except AttributeError: + return getattr(self._task, 'async') + def _execute_module(self, module_name=None, module_args=None, tmp=None, task_vars=None, persist_files=False, delete_remote_tmp=True, wrap_async=False): @@ -318,6 +327,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): templar=self._templar, env=mitogen.utils.cast(env), wrap_async=wrap_async, + timeout_secs=self.get_task_timeout_secs(), ) ) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 1006956c..8ea3886a 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -94,7 +94,7 @@ class Invocation(object): target.run_module() or helpers.run_module_async() in the target context. """ def __init__(self, action, connection, module_name, module_args, - task_vars, templar, env, wrap_async): + task_vars, templar, env, wrap_async, timeout_secs): #: ActionBase instance invoking the module. Required to access some #: output postprocessing methods that don't belong in ActionBase at #: all. @@ -114,7 +114,8 @@ class Invocation(object): self.env = env #: Boolean, if :py:data:`True`, launch the module asynchronously. self.wrap_async = wrap_async - + #: Integer, if >0, limit the time an asynchronous job may run for. + self.timeout_secs = timeout_secs #: Initially ``None``, but set by :func:`invoke`. The path on the #: master to the module's implementation file. self.module_path = None @@ -403,6 +404,7 @@ def _invoke_async_task(invocation, planner): context.call_no_reply( ansible_mitogen.target.run_module_async, job_id=job_id, + timeout_secs=invocation.timeout_secs, kwargs=planner.get_kwargs(), ) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 6dbaca8c..9d4d8d3a 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -40,6 +40,7 @@ import operator import os import pwd import re +import signal import stat import subprocess import tempfile @@ -283,7 +284,27 @@ def _write_job_status(job_id, dct): os.rename(path + '.tmp', path) -def _run_module_async(kwargs, job_id, econtext): +def _sigalrm(broker, timeout_secs, job_id): + """ + Respond to SIGALRM (job timeout) by updating the job file and killing the + process. + """ + msg = "Job reached maximum time limit of %d seconds." % (timeout_secs,) + _write_job_status(job_id, { + "failed": 1, + "finished": 1, + "msg": msg, + }) + broker.shutdown() + + +def _install_alarm(broker, timeout_secs, job_id): + handler = lambda *_: _sigalrm(broker, timeout_secs, job_id) + signal.signal(signal.SIGALRM, handler) + signal.alarm(timeout_secs) + + +def _run_module_async(kwargs, job_id, timeout_secs, econtext): """ 1. Immediately updates the status file to mark the job as started. 2. Installs a timer/signal handler to implement the time limit. @@ -293,6 +314,8 @@ def _run_module_async(kwargs, job_id, econtext): Runner keyword arguments. :param str job_id: String job ID. + :param int timeout_secs: + If >0, limit the task's maximum run time. """ _write_job_status(job_id, { 'started': 1, @@ -300,6 +323,9 @@ def _run_module_async(kwargs, job_id, econtext): 'pid': os.getpid() }) + if timeout_secs > 0: + _install_alarm(econtext.broker, timeout_secs, job_id) + kwargs['detach'] = True kwargs['econtext'] = econtext kwargs['emulate_tty'] = False @@ -327,7 +353,7 @@ def _run_module_async(kwargs, job_id, econtext): @mitogen.core.takes_econtext -def run_module_async(kwargs, job_id, econtext): +def run_module_async(kwargs, job_id, timeout_secs, econtext): """ Arrange for a module to be executed with its run status and result serialized to a disk file. This function expects to run in a child forked @@ -335,7 +361,7 @@ def run_module_async(kwargs, job_id, econtext): """ try: try: - _run_module_async(kwargs, job_id, econtext) + _run_module_async(kwargs, job_id, timeout_secs, econtext) except Exception: # Catch any (ansible_mitogen) bugs and write them to the job file. _write_job_status(job_id, { diff --git a/docs/ansible.rst b/docs/ansible.rst index 9593bd4f..be615375 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -140,8 +140,6 @@ Noteworthy Differences artificial serialization, causing slowdown equivalent to `task_duration * num_targets`. This will be fixed soon. -* Asynchronous job time limits are not implemented. - * "Module Replacer" style modules are not supported. These rarely appear in practice, and light web searches failed to reveal many examples of them. diff --git a/tests/ansible/integration/async/all.yml b/tests/ansible/integration/async/all.yml index bae2c308..17969ead 100644 --- a/tests/ansible/integration/async/all.yml +++ b/tests/ansible/integration/async/all.yml @@ -3,4 +3,6 @@ - import_playbook: result_shell_echo_hi.yml - import_playbook: runner_new_process.yml - import_playbook: runner_one_job.yml +- import_playbook: runner_timeout_then_polling.yml +- import_playbook: runner_with_polling_and_timeout.yml - import_playbook: runner_two_simultaneous_jobs.yml diff --git a/tests/ansible/integration/async/runner_job_timeout.yml b/tests/ansible/integration/async/runner_job_timeout.yml deleted file mode 100644 index e279c5cb..00000000 --- a/tests/ansible/integration/async/runner_job_timeout.yml +++ /dev/null @@ -1,52 +0,0 @@ -# Verify 'async: ' functions as desired. - -- name: integration/async/runner_job_timeout.yml - hosts: test-targets - any_errors_fatal: true - tasks: - - # Verify async-with-polling-and-timeout behaviour. - - - name: sleep for 7 seconds, but timeout after 1 second. - ignore_errors: true - shell: sleep 7 - async: 1 - poll: 1 - register: job1 - - - assert: - that: - - job1.changed == False - - job1.failed == True - - job1.msg == "async task did not complete within the requested time" - - job1.keys()|sort == ['changed', 'failed', 'msg'] - - # Verify async-with-timeout-then-poll behaviour. - # This is broken in upstream Ansible, so disable the tests there. - # - # TODO: the tests below are totally broken, not clear what Ansible is - # supposed to do here, so can't emulate it in Mitogen. - - - name: sleep for 7 seconds, but timeout after 1 second. - ignore_errors: true - shell: sleep 7 - async: 1 - poll: 0 - register: job2 - when: false # is_mitogen - - - name: poll up to 10 times. - async_status: - jid: "{{job2.ansible_job_id}}" - register: result2 - until: result2.finished - retries: 10 - delay: 1 - when: false # is_mitogen - - - assert: - that: - - result1.rc == 0 - - result2.rc == 0 - - result2.stdout == 'im_alive' - when: false # is_mitogen diff --git a/tests/ansible/integration/async/runner_timeout_then_polling.yml b/tests/ansible/integration/async/runner_timeout_then_polling.yml new file mode 100644 index 00000000..5490e711 --- /dev/null +++ b/tests/ansible/integration/async/runner_timeout_then_polling.yml @@ -0,0 +1,34 @@ +# Verify 'async: ' functions as desired. + +- name: integration/async/runner_timeout_then_polling.yml + hosts: test-targets + any_errors_fatal: true + tasks: + + # Verify async-with-timeout-then-poll behaviour. + # This is semi-broken in upstream Ansible, it does not bother to update the + # job file on failure. So only test on Mitogen. + + - name: sleep for 7 seconds, but timeout after 1 second. + shell: sleep 10 + async: 1 + poll: 0 + register: job + when: is_mitogen + + - name: busy-poll up to 500 times + async_status: + jid: "{{job.ansible_job_id}}" + register: result + until: result.finished + retries: 500 + delay: 0 + when: is_mitogen + ignore_errors: true + + - assert: + that: + - result.failed == 1 + - result.finished == 1 + - result.msg == "Job reached maximum time limit of 1 seconds." + when: is_mitogen diff --git a/tests/ansible/integration/async/runner_with_polling_and_timeout.yml b/tests/ansible/integration/async/runner_with_polling_and_timeout.yml new file mode 100644 index 00000000..6d87fe6c --- /dev/null +++ b/tests/ansible/integration/async/runner_with_polling_and_timeout.yml @@ -0,0 +1,24 @@ +# Verify 'async: ' functions as desired. + +- name: integration/async/runner_with_polling_and_timeout.yml + hosts: test-targets + any_errors_fatal: true + tasks: + + # Verify async-with-polling-and-timeout behaviour. + + - name: sleep for 7 seconds, but timeout after 1 second. + ignore_errors: true + shell: sleep 7 + async: 1 + poll: 1 + register: job1 + + - assert: + that: + - job1.changed == False + - job1.failed == True + - | + job1.msg == "async task did not complete within the requested time" or + job1.msg == "Job reached maximum time limit of 1 seconds." +