diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py
index 35643d7d..90b5e41a 100644
--- a/ansible_mitogen/connection.py
+++ b/ansible_mitogen/connection.py
@@ -31,7 +31,6 @@ import logging
 import os
 import shlex
 import stat
-import sys
 import time
 
 import jinja2.runtime
@@ -58,6 +57,7 @@ def _connect_local(spec):
         }
     }
 
+
 def wrap_or_none(klass, value):
     if value is not None:
         return klass(value)
@@ -298,13 +298,17 @@ class Connection(ansible.plugins.connection.ConnectionBase):
     router = None
 
     #: mitogen.master.Context representing the parent Context, which is
-    #: presently always the master process.
+    #: presently always the connection multiplexer process.
     parent = None
 
     #: mitogen.master.Context connected to the target user account on the
     #: target machine (i.e. via sudo).
     context = None
 
+    #: mitogen.master.Context connected to the fork parent process in the
+    #: target user account.
+    fork_context = None
+
     #: Only sudo and su are supported for now.
     become_methods = ['sudo', 'su']
 
@@ -336,7 +340,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
     host_vars = None
 
     #: Set after connection to the target context's home directory.
-    _homedir = None
+    home_dir = None
 
     def __init__(self, play_context, new_stdin, **kwargs):
         assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
@@ -376,7 +380,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
     @property
     def homedir(self):
         self._connect()
-        return self._homedir
+        return self.home_dir
 
     @property
     def connected(self):
@@ -389,7 +393,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
             raise ansible.errors.AnsibleConnectionFailure(
                 self.unknown_via_msg % (
                     self.mitogen_via,
-                    config['inventory_name'],
+                    inventory_name,
                 )
             )
 
@@ -470,15 +474,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
             raise ansible.errors.AnsibleConnectionFailure(dct['msg'])
 
         self.context = dct['context']
-        self._homedir = dct['home_dir']
-
-    def get_context_name(self):
-        """
-        Return the name of the target context we issue commands against, i.e. a
-        unique string useful as a key for related data, such as a list of
-        modules uploaded to the target.
-        """
-        return self.context.name
+        self.fork_context = dct['init_child_result']['fork_context']
+        self.home_dir = dct['init_child_result']['home_dir']
 
     def close(self, new_task=False):
         """
@@ -526,6 +523,17 @@ class Connection(ansible.plugins.connection.ConnectionBase):
             LOG.debug('Call took %d ms: %s%r', 1000 * (time.time() - t0),
                       func.func_name, args)
 
+    def create_fork_child(self):
+        """
+        Fork a new child off the target context. The actual fork occurs from
+        the 'virginal fork parent', which does not load any Ansible modules
+        prior to fork, to avoid conflicts resulting from custom module_utils
+        paths.
+
+        :returns:
+            mitogen.core.Context of the new child.
+ """ + return self.call(ansible_mitogen.target.create_fork_child) + def exec_command(self, cmd, in_data='', sudoable=True, mitogen_chdir=None): """ Implement exec_command() by calling the corresponding @@ -613,7 +621,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): utimes=(st.st_atime, st.st_mtime)) self.parent.call_service( - service_name='ansible_mitogen.services.FileService', + service_name='mitogen.service.FileService', method_name='register', path=mitogen.utils.cast(in_path) ) diff --git a/ansible_mitogen/mixins.py b/ansible_mitogen/mixins.py index fdd104b2..efa0bd5a 100644 --- a/ansible_mitogen/mixins.py +++ b/ansible_mitogen/mixins.py @@ -32,7 +32,6 @@ import logging import os import pwd import shutil -import tempfile import traceback from ansible.module_utils._text import to_bytes @@ -43,11 +42,6 @@ import ansible.constants import ansible.plugins import ansible.plugins.action -try: - from ansible.plugins.loader import module_loader -except ImportError: # Ansible<2.4 - from ansible.plugins import module_loader - import mitogen.core import mitogen.select import mitogen.utils @@ -294,6 +288,15 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): # ~root/.ansible -> /root/.ansible return self.call(os.path.expanduser, mitogen.utils.cast(path)) + def get_task_timeout_secs(self): + """ + Return the task "async:" value, portable across 2.4-2.5. + """ + try: + return self._task.async_val + except AttributeError: + return getattr(self._task, 'async') + def _execute_module(self, module_name=None, module_args=None, tmp=None, task_vars=None, persist_files=False, delete_remote_tmp=True, wrap_async=False): @@ -313,6 +316,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): env = {} self._compute_environment_string(env) + self._connection._connect() return ansible_mitogen.planner.invoke( ansible_mitogen.planner.Invocation( action=self, @@ -323,6 +327,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): templar=self._templar, env=mitogen.utils.cast(env), wrap_async=wrap_async, + timeout_secs=self.get_task_timeout_secs(), ) ) diff --git a/ansible_mitogen/module_finder.py b/ansible_mitogen/module_finder.py index 28522dd9..79e1882c 100644 --- a/ansible_mitogen/module_finder.py +++ b/ansible_mitogen/module_finder.py @@ -72,35 +72,42 @@ def is_pkg(module): def find(name, path=(), parent=None): """ Return a Module instance describing the first matching module found on the - given search path. + search path. :param str name: Module name. - :param str path: - Search path. + :param list path: + List of directory names to search for the module. :param Module parent: - If given, make the found module a child of this module. + Optional module parent. """ + assert isinstance(path, tuple) head, _, tail = name.partition('.') try: tup = imp.find_module(head, list(path)) except ImportError: return parent - fp, path, (suffix, mode, kind) = tup + fp, modpath, (suffix, mode, kind) = tup + if parent and modpath == parent.path: + # 'from timeout import timeout', where 'timeout' is a function but also + # the name of the module being imported. 
+ return None + if fp: fp.close() if kind == imp.PKG_DIRECTORY: - path = os.path.join(path, '__init__.py') - module = Module(head, path, kind, parent) + modpath = os.path.join(modpath, '__init__.py') + module = Module(head, modpath, kind, parent) if tail: return find_relative(module, tail, path) return module def find_relative(parent, name, path=()): - path = [os.path.dirname(parent.path)] + list(path) + if parent.kind == imp.PKG_DIRECTORY: + path = (os.path.dirname(parent.path),) + path return find(name, path, parent=parent) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 6605686c..8ea3886a 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -35,8 +35,10 @@ files/modules known missing. """ from __future__ import absolute_import +import json import logging import os +import random from ansible.executor import module_common import ansible.errors @@ -50,9 +52,7 @@ except ImportError: # Ansible <2.4 from ansible.plugins import module_utils_loader import mitogen -import mitogen.service import ansible_mitogen.target -import ansible_mitogen.services LOG = logging.getLogger(__name__) @@ -94,7 +94,7 @@ class Invocation(object): target.run_module() or helpers.run_module_async() in the target context. """ def __init__(self, action, connection, module_name, module_args, - task_vars, templar, env, wrap_async): + task_vars, templar, env, wrap_async, timeout_secs): #: ActionBase instance invoking the module. Required to access some #: output postprocessing methods that don't belong in ActionBase at #: all. @@ -114,7 +114,8 @@ class Invocation(object): self.env = env #: Boolean, if :py:data:`True`, launch the module asynchronously. self.wrap_async = wrap_async - + #: Integer, if >0, limit the time an asynchronous job may run for. + self.timeout_secs = timeout_secs #: Initially ``None``, but set by :func:`invoke`. The path on the #: master to the module's implementation file. self.module_path = None @@ -132,20 +133,36 @@ class Planner(object): file, indicates whether or not it understands how to run the module, and exports a method to run the module. """ - def detect(self, invocation): + def __init__(self, invocation): + self._inv = invocation + + def detect(self): """ Return true if the supplied `invocation` matches the module type implemented by this planner. """ raise NotImplementedError() - def get_should_fork(self, invocation): + def should_fork(self): """ Asynchronous tasks must always be forked. """ - return invocation.wrap_async + return self._inv.wrap_async + + def get_push_files(self): + """ + Return a list of files that should be propagated to the target context + using PushFileService. The default implementation pushes nothing. + """ + return [] - def plan(self, invocation, **kwargs): + def get_module_deps(self): + """ + Return a list of the Python module names imported by the module. 
+ """ + return [] + + def get_kwargs(self, **kwargs): """ If :meth:`detect` returned :data:`True`, plan for the module's execution, including granting access to or delivering any files to it @@ -161,9 +178,7 @@ class Planner(object): } """ kwargs.setdefault('emulate_tty', True) - kwargs.setdefault('service_context', invocation.connection.parent) - kwargs.setdefault('should_fork', self.get_should_fork(invocation)) - kwargs.setdefault('wrap_async', invocation.wrap_async) + kwargs.setdefault('service_context', self._inv.connection.parent) return kwargs def __repr__(self): @@ -177,26 +192,19 @@ class BinaryPlanner(Planner): """ runner_name = 'BinaryRunner' - def detect(self, invocation): - return module_common._is_binary(invocation.module_source) + def detect(self): + return module_common._is_binary(self._inv.module_source) - def _grant_file_service_access(self, invocation): - invocation.connection._connect() - invocation.connection.parent.call_service( - service_name='ansible_mitogen.services.FileService', - method_name='register', - path=invocation.module_path, - ) + def get_push_files(self): + return [self._inv.module_path] - def plan(self, invocation, **kwargs): - self._grant_file_service_access(invocation) - return super(BinaryPlanner, self).plan( - invocation=invocation, + def get_kwargs(self, **kwargs): + return super(BinaryPlanner, self).get_kwargs( runner_name=self.runner_name, - module=invocation.module_name, - path=invocation.module_path, - args=invocation.module_args, - env=invocation.env, + module=self._inv.module_name, + path=self._inv.module_path, + args=self._inv.module_args, + env=self._inv.env, **kwargs ) @@ -206,24 +214,25 @@ class ScriptPlanner(BinaryPlanner): Common functionality for script module planners -- handle interpreter detection and rewrite. 
""" - def _get_interpreter(self, invocation): - interpreter, arg = parse_script_interpreter(invocation.module_source) + def _get_interpreter(self): + interpreter, arg = parse_script_interpreter( + self._inv.module_source + ) if interpreter is None: raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % ( - invocation.module_name, + self._inv.module_name, )) key = u'ansible_%s_interpreter' % os.path.basename(interpreter).strip() try: - template = invocation.task_vars[key].strip() - return invocation.templar.template(template), arg + template = self._inv.task_vars[key].strip() + return self._inv.templar.template(template), arg except KeyError: return interpreter, arg - def plan(self, invocation, **kwargs): - interpreter, arg = self._get_interpreter(invocation) - return super(ScriptPlanner, self).plan( - invocation=invocation, + def get_kwargs(self, **kwargs): + interpreter, arg = self._get_interpreter() + return super(ScriptPlanner, self).get_kwargs( interpreter_arg=arg, interpreter=interpreter, **kwargs @@ -237,8 +246,8 @@ class JsonArgsPlanner(ScriptPlanner): """ runner_name = 'JsonArgsRunner' - def detect(self, invocation): - return module_common.REPLACER_JSONARGS in invocation.module_source + def detect(self): + return module_common.REPLACER_JSONARGS in self._inv.module_source class WantJsonPlanner(ScriptPlanner): @@ -255,8 +264,8 @@ class WantJsonPlanner(ScriptPlanner): """ runner_name = 'WantJsonRunner' - def detect(self, invocation): - return 'WANT_JSON' in invocation.module_source + def detect(self): + return 'WANT_JSON' in self._inv.module_source class NewStylePlanner(ScriptPlanner): @@ -267,53 +276,59 @@ class NewStylePlanner(ScriptPlanner): """ runner_name = 'NewStyleRunner' - def _get_interpreter(self, invocation): + def detect(self): + return 'from ansible.module_utils.' in self._inv.module_source + + def _get_interpreter(self): return None, None - def _grant_file_service_access(self, invocation): - """ - Stub out BinaryPlanner's method since ModuleDepService makes internal - calls to grant file access, avoiding 2 IPCs per task invocation. - """ + def get_push_files(self): + return super(NewStylePlanner, self).get_push_files() + [ + path + for fullname, path, is_pkg in self.get_module_map()['custom'] + ] + + def get_module_deps(self): + return self.get_module_map()['builtin'] - def get_should_fork(self, invocation): + def should_fork(self): """ In addition to asynchronous tasks, new-style modules should be forked - if mitogen_task_isolation=fork. + if the user specifies mitogen_task_isolation=fork, or if the new-style + module has a custom module search path. """ return ( - super(NewStylePlanner, self).get_should_fork(invocation) or - (invocation.task_vars.get('mitogen_task_isolation') == 'fork') + super(NewStylePlanner, self).should_fork() or + (self._inv.task_vars.get('mitogen_task_isolation') == 'fork') or + (len(self.get_module_map()['custom']) > 0) ) - def detect(self, invocation): - return 'from ansible.module_utils.' 
in invocation.module_source - - def get_search_path(self, invocation): + def get_search_path(self): return tuple( path for path in module_utils_loader._get_paths(subdirs=False) if os.path.isdir(path) ) - def get_module_utils(self, invocation): - invocation.connection._connect() - return invocation.connection.parent.call_service( - service_name='ansible_mitogen.services.ModuleDepService', - method_name='scan', + _module_map = None - module_name='ansible_module_%s' % (invocation.module_name,), - module_path=invocation.module_path, - search_path=self.get_search_path(invocation), - builtin_path=module_common._MODULE_UTILS_PATH, - ) + def get_module_map(self): + if self._module_map is None: + self._module_map = self._inv.connection.parent.call_service( + service_name='ansible_mitogen.services.ModuleDepService', + method_name='scan', - def plan(self, invocation): - module_utils = self.get_module_utils(invocation) - return super(NewStylePlanner, self).plan( - invocation, - module_utils=module_utils, - should_fork=(self.get_should_fork(invocation) or bool(module_utils)), + module_name='ansible_module_%s' % (self._inv.module_name,), + module_path=self._inv.module_path, + search_path=self.get_search_path(), + builtin_path=module_common._MODULE_UTILS_PATH, + context=self._inv.connection.context, + ) + return self._module_map + + def get_kwargs(self): + return super(NewStylePlanner, self).get_kwargs( + module_map=self.get_module_map(), ) @@ -343,14 +358,14 @@ class ReplacerPlanner(NewStylePlanner): """ runner_name = 'ReplacerRunner' - def detect(self, invocation): - return module_common.REPLACER in invocation.module_source + def detect(self): + return module_common.REPLACER in self._inv.module_source class OldStylePlanner(ScriptPlanner): runner_name = 'OldStyleRunner' - def detect(self, invocation): + def detect(self): # Everything else. return True @@ -372,24 +387,85 @@ def get_module_data(name): return path, source -def invoke(invocation): - """ - Find a suitable Planner that knows how to run `invocation`. - """ - (invocation.module_path, - invocation.module_source) = get_module_data(invocation.module_name) +def _propagate_deps(invocation, planner, context): + invocation.connection.parent.call_service( + service_name='mitogen.service.PushFileService', + method_name='propagate_paths_and_modules', + context=context, + paths=planner.get_push_files(), + modules=planner.get_module_deps(), + ) + + +def _invoke_async_task(invocation, planner): + job_id = '%016x' % random.randint(0, 2**64) + context = invocation.connection.create_fork_child() + _propagate_deps(invocation, planner, context) + context.call_no_reply( + ansible_mitogen.target.run_module_async, + job_id=job_id, + timeout_secs=invocation.timeout_secs, + kwargs=planner.get_kwargs(), + ) + + return { + 'stdout': json.dumps({ + # modules/utilities/logic/async_wrapper.py::_run_module(). 
+            'changed': True,
+            'started': 1,
+            'finished': 0,
+            'ansible_job_id': job_id,
+        })
+    }
+
+
+def _invoke_forked_task(invocation, planner):
+    context = invocation.connection.create_fork_child()
+    _propagate_deps(invocation, planner, context)
+    try:
+        return context.call(
+            ansible_mitogen.target.run_module,
+            kwargs=planner.get_kwargs(),
+        )
+    finally:
+        context.shutdown()
+
+def _get_planner(invocation):
     for klass in _planners:
-        planner = klass()
-        if planner.detect(invocation):
+        planner = klass(invocation)
+        if planner.detect():
             LOG.debug('%r accepted %r (filename %r)', planner,
                       invocation.module_name, invocation.module_path)
-            return invocation.action._postprocess_response(
-                invocation.connection.call(
-                    ansible_mitogen.target.run_module,
-                    planner.plan(invocation),
-                )
-            )
+            return planner
         LOG.debug('%r rejected %r', planner, invocation.module_name)
-
     raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))
+
+
+def invoke(invocation):
+    """
+    Find a Planner subclass corresponding to `invocation` and use it to invoke
+    the module.
+
+    :param Invocation invocation:
+    :returns:
+        Module return dict.
+    :raises ansible.errors.AnsibleError:
+        Unrecognized/unsupported module type.
+    """
+    (invocation.module_path,
+     invocation.module_source) = get_module_data(invocation.module_name)
+    planner = _get_planner(invocation)
+
+    if invocation.wrap_async:
+        response = _invoke_async_task(invocation, planner)
+    elif planner.should_fork():
+        response = _invoke_forked_task(invocation, planner)
+    else:
+        _propagate_deps(invocation, planner, invocation.connection.context)
+        response = invocation.connection.call(
+            ansible_mitogen.target.run_module,
+            kwargs=planner.get_kwargs(),
+        )
+
+    return invocation.action._postprocess_response(response)
diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py
index 4946aa29..c4f58310 100644
--- a/ansible_mitogen/process.py
+++ b/ansible_mitogen/process.py
@@ -35,6 +35,7 @@ import sys
 
 import mitogen
 import mitogen.core
+import mitogen.debug
 import mitogen.master
 import mitogen.parent
 import mitogen.service
@@ -135,7 +136,7 @@ class MuxProcess(object):
         """
         Construct a Router, Broker, and mitogen.unix listener
         """
-        self.router = mitogen.master.Router(max_message_size=4096*1048576)
+        self.router = mitogen.master.Router(max_message_size=4096 * 1048576)
         self.router.responder.whitelist_prefix('ansible')
         self.router.responder.whitelist_prefix('ansible_mitogen')
         mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown)
@@ -145,22 +146,21 @@ class MuxProcess(object):
         )
         if 'MITOGEN_ROUTER_DEBUG' in os.environ:
             self.router.enable_debug()
+        if 'MITOGEN_DUMP_THREAD_STACKS' in os.environ:
+            mitogen.debug.dump_to_logger()
 
     def _setup_services(self):
         """
         Construct a ContextService and a thread to service requests for it
         arriving from worker processes.
""" - file_service = ansible_mitogen.services.FileService(router=self.router) self.pool = mitogen.service.Pool( router=self.router, services=[ - file_service, + mitogen.service.FileService(router=self.router), + mitogen.service.PushFileService(router=self.router), ansible_mitogen.services.ContextService(self.router), - ansible_mitogen.services.ModuleDepService( - router=self.router, - file_service=file_service, - ), + ansible_mitogen.services.ModuleDepService(self.router), ], size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), ) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 807623f7..e14d26bd 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -42,12 +42,11 @@ import imp import json import logging import os -import shutil import sys import tempfile import types -import mitogen.service +import mitogen.core import ansible_mitogen.target # TODO: circular import try: @@ -111,17 +110,27 @@ class Runner(object): Context to which we should direct FileService calls. For now, always the connection multiplexer process on the controller. :param dict args: - Ansible module arguments. A strange mixture of user and internal keys - created by ActionBase._execute_module(). + Ansible module arguments. A mixture of user and internal keys created + by :meth:`ansible.plugins.action.ActionBase._execute_module`. :param dict env: Additional environment variables to set during the run. + + :param mitogen.core.ExternalContext econtext: + When `detach` is :data:`True`, a reference to the ExternalContext the + runner is executing in. + :param bool detach: + When :data:`True`, indicate the runner should detach the context from + its parent after setup has completed successfully. """ - def __init__(self, module, service_context, args=None, env=None): + def __init__(self, module, service_context, args=None, env=None, + econtext=None, detach=False): if args is None: args = {} self.module = utf8(module) self.service_context = service_context + self.econtext = econtext + self.detach = detach self.args = args self.env = env @@ -177,6 +186,9 @@ class Runner(object): Module result dictionary. """ self.setup() + if self.detach: + self.econtext.detach() + try: return self._run() finally: @@ -208,7 +220,7 @@ class ModuleUtilsImporter(object): def load_module(self, fullname): path, is_pkg = self._by_fullname[fullname] - source = ansible_mitogen.target.get_file(self._context, path) + source = ansible_mitogen.target.get_small_file(self._context, path) code = compile(source, path, 'exec') mod = sys.modules.setdefault(fullname, imp.new_module(fullname)) mod.__file__ = "master:%s" % (path,) @@ -273,7 +285,7 @@ class ProgramRunner(Runner): :param str path: Absolute path to the program file on the master, as it can be retrieved - via :class:`ansible_mitogen.services.FileService`. + via :class:`mitogen.service.FileService`. :param bool emulate_tty: If :data:`True`, execute the program with `stdout` and `stderr` merged into a single pipe, emulating Ansible behaviour when an SSH TTY is in @@ -315,7 +327,7 @@ class ProgramRunner(Runner): """ Fetch the module binary from the master if necessary. """ - return ansible_mitogen.target.get_file( + return ansible_mitogen.target.get_small_file( context=self.service_context, path=self.path, ) @@ -443,12 +455,30 @@ class NewStyleRunner(ScriptRunner): #: path => new-style module bytecode. 
     _code_by_path = {}
 
-    def __init__(self, module_utils, **kwargs):
+    def __init__(self, module_map, **kwargs):
         super(NewStyleRunner, self).__init__(**kwargs)
-        self.module_utils = module_utils
+        self.module_map = module_map
+
+    def _setup_imports(self):
+        """
+        Ensure the local importer and PushFileService have everything for the
+        Ansible module before setup() completes, and in particular before
+        detach() is called in an asynchronous task.
+
+        The master automatically streams modules towards us concurrently with
+        the runner invocation, however there is no public API to synchronize
+        on the completion of those preloads. Instead simply reuse the
+        importer's synchronization mechanism by importing everything the
+        module will need prior to detaching.
+        """
+        for fullname, _, _ in self.module_map['custom']:
+            mitogen.core.import_module(fullname)
+        for fullname in self.module_map['builtin']:
+            mitogen.core.import_module(fullname)
 
     def setup(self):
         super(NewStyleRunner, self).setup()
+
         self._stdio = NewStyleStdio(self.args)
         # It is possible that not supplying the script filename will break some
         # module, but this has never been a bug report. Instead act like an
         # interpreter that had its script piped on stdin.
         self._argv = TemporaryArgv([''])
         self._importer = ModuleUtilsImporter(
             context=self.service_context,
-            module_utils=self.module_utils,
+            module_utils=self.module_map['custom'],
         )
+        self._setup_imports()
         if libc__res_init:
             libc__res_init()
@@ -476,14 +507,12 @@ class NewStyleRunner(ScriptRunner):
             pass
 
     def _setup_program(self):
-        pass
-
-    def _get_code(self):
-        self.source = ansible_mitogen.target.get_file(
+        self.source = ansible_mitogen.target.get_small_file(
             context=self.service_context,
             path=self.path,
         )
 
+    def _get_code(self):
         try:
             return self._code_by_path[self.path]
         except KeyError:
diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index 24e1f5b1..0331efd2 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -38,18 +38,13 @@ when a child has completed a job.
 """
 
 from __future__ import absolute_import
-import grp
 import logging
 import os
 import os.path
-import pwd
-import stat
 import sys
 import threading
-import zlib
 
 import mitogen
-import mitogen.master
 import mitogen.service
 
 import ansible_mitogen.module_finder
 import ansible_mitogen.target
@@ -227,6 +222,20 @@ class ContextService(mitogen.service.Service):
         finally:
             self._lock.release()
 
+    ALWAYS_PRELOAD = (
+        'ansible.module_utils.basic',
+        'ansible.module_utils.json_utils',
+        'ansible.release',
+        'ansible_mitogen.runner',
+        'ansible_mitogen.target',
+        'mitogen.fork',
+        'mitogen.service',
+    )
+
+    def _send_module_forwards(self, context):
+        for fullname in self.ALWAYS_PRELOAD:
+            self.router.responder.forward_module(context, fullname)
+
     def _connect(self, key, spec, via=None):
         """
         Actual connect implementation. Arranges for the Mitogen connection to
@@ -242,13 +251,18 @@ class ContextService(mitogen.service.Service):
 
             {
                 'context': mitogen.core.Context or None,
-                'home_dir': str or None,
+                'init_child_result': {
+                    'fork_context': mitogen.core.Context,
+                    'home_dir': str or None,
+                },
                 'msg': str or None
             }
 
-        Where either `msg` is an error message and the remaining fields are
-        :data:`None`, or `msg` is :data:`None` and the remaining fields are
-        set.
+ Where `context` is a reference to the newly constructed context, + `init_child_result` is the result of executing + :func:`ansible_mitogen.target.init_child` in that context, `msg` is + an error message and the remaining fields are :data:`None`, or + `msg` is :data:`None` and the remaining fields are set. """ try: method = getattr(self.router, spec['method']) @@ -266,11 +280,8 @@ class ContextService(mitogen.service.Service): mitogen.core.listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream)) - home_dir = context.call(os.path.expanduser, '~') - - # We don't need to wait for the result of this. Ideally we'd check its - # return value somewhere, but logs will catch a failure anyway. - context.call_async(ansible_mitogen.target.init_child) + self._send_module_forwards(context) + init_child_result = context.call(ansible_mitogen.target.init_child) if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'): from mitogen import debug @@ -280,7 +291,7 @@ class ContextService(mitogen.service.Service): self._refs_by_context[context] = 0 return { 'context': context, - 'home_dir': home_dir, + 'init_child_result': init_child_result, 'msg': None, } @@ -331,7 +342,7 @@ class ContextService(mitogen.service.Service): :returns dict: * context: mitogen.master.Context or None. - * homedir: Context's home directory or None. + * init_child_result: Result of :func:`init_child`. * msg: StreamError exception text or None. * method_name: string failing method name. """ @@ -346,7 +357,7 @@ class ContextService(mitogen.service.Service): except mitogen.core.StreamError as e: return { 'context': None, - 'home_dir': None, + 'init_child_result': None, 'method_name': spec['method'], 'msg': str(e), } @@ -354,288 +365,50 @@ class ContextService(mitogen.service.Service): return result -class StreamState(object): - def __init__(self): - #: List of [(Sender, file object)] - self.jobs = [] - self.completing = {} - #: In-flight byte count. - self.unacked = 0 - #: Lock. - self.lock = threading.Lock() - - -class FileService(mitogen.service.Service): - """ - Streaming file server, used to serve small files like Ansible modules and - huge files like ISO images. Paths must be registered by a trusted context - before they will be served to a child. - - Transfers are divided among the physical streams that connect external - contexts, ensuring each stream never has excessive data buffered in RAM, - while still maintaining enough to fully utilize available bandwidth. This - is achieved by making an initial bandwidth assumption, enqueueing enough - chunks to fill that assumed pipe, then responding to delivery - acknowledgements from the receiver by scheduling new chunks. - - Transfers proceed one-at-a-time per stream. When multiple contexts exist on - a stream (e.g. one is the SSH account, another is a sudo account, and a - third is a proxied SSH connection), each request is satisfied in turn - before subsequent requests start flowing. This ensures when a stream is - contended, priority is given to completing individual transfers rather than - potentially aborting many partial transfers, causing the bandwidth to be - wasted. - - Theory of operation: - 1. Trusted context (i.e. WorkerProcess) calls register(), making a - file available to any untrusted context. - 2. Requestee context creates a mitogen.core.Receiver() to receive - chunks, then calls fetch(path, recv.to_sender()), to set up the - transfer. - 3. fetch() replies to the call with the file's metadata, then - schedules an initial burst up to the window size limit (1MiB). 
- 4. Chunks begin to arrive in the requestee, which calls acknowledge() - for each 128KiB received. - 5. The acknowledge() call arrives at FileService, which scheduled a new - chunk to refill the drained window back to the size limit. - 6. When the last chunk has been pumped for a single transfer, - Sender.close() is called causing the receive loop in - target.py::_get_file() to exit, allowing that code to compare the - transferred size with the total file size from the metadata. - 7. If the sizes mismatch, _get_file()'s caller is informed which will - discard the result and log/raise an error. - - Shutdown: - 1. process.py calls service.Pool.shutdown(), which arranges for the - service pool threads to exit and be joined, guranteeing no new - requests can arrive, before calling Service.on_shutdown() for each - registered service. - 2. FileService.on_shutdown() walks every in-progress transfer and calls - Sender.close(), causing Receiver loops in the requestees to exit - early. The size check fails and any partially downloaded file is - discarded. - 3. Control exits _get_file() in every target, and graceful shutdown can - proceed normally, without the associated thread needing to be - forcefully killed. - """ - unregistered_msg = 'Path is not registered with FileService.' - context_mismatch_msg = 'sender= kwarg context must match requestee context' - - #: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which - #: is 5x what SSH can handle on a 2011 era 2.4Ghz Core i5. - window_size_bytes = 1048576 - - def __init__(self, router): - super(FileService, self).__init__(router) - #: Mapping of registered path -> file size. - self._metadata_by_path = {} - #: Mapping of Stream->StreamState. - self._state_by_stream = {} - - def _name_or_none(self, func, n, attr): - try: - return getattr(func(n), attr) - except KeyError: - return None - - @mitogen.service.expose(policy=mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'paths': list - }) - def register_many(self, paths): - """ - Batch version of register(). - """ - for path in paths: - self.register(path) - - @mitogen.service.expose(policy=mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'path': basestring - }) - def register(self, path): - """ - Authorize a path for access by children. Repeat calls with the same - path is harmless. - - :param str path: - File path. - """ - if path in self._metadata_by_path: - return - - st = os.stat(path) - if not stat.S_ISREG(st.st_mode): - raise IOError('%r is not a regular file.' % (in_path,)) - - LOG.debug('%r: registering %r', self, path) - self._metadata_by_path[path] = { - 'size': st.st_size, - 'mode': st.st_mode, - 'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'), - 'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'), - 'mtime': st.st_mtime, - 'atime': st.st_atime, - } - - def on_shutdown(self): - """ - Respond to shutdown by sending close() to every target, allowing their - receive loop to exit and clean up gracefully. - """ - LOG.debug('%r.on_shutdown()', self) - for stream, state in self._state_by_stream.items(): - state.lock.acquire() - try: - for sender, fp in reversed(state.jobs): - sender.close() - fp.close() - state.jobs.pop() - finally: - state.lock.release() - - # The IO loop pumps 128KiB chunks. An ideal message is a multiple of this, - # odd-sized messages waste one tiny write() per message on the trailer. - # Therefore subtract 10 bytes pickle overhead + 24 bytes header. 
- IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + ( - len( - mitogen.core.Message.pickled( - mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE) - ).data - ) - mitogen.core.CHUNK_SIZE - )) - - def _schedule_pending_unlocked(self, state): - """ - Consider the pending transfers for a stream, pumping new chunks while - the unacknowledged byte count is below :attr:`window_size_bytes`. Must - be called with the StreamState lock held. - - :param StreamState state: - Stream to schedule chunks for. - """ - while state.jobs and state.unacked < self.window_size_bytes: - sender, fp = state.jobs[0] - s = fp.read(self.IO_SIZE) - if s: - state.unacked += len(s) - sender.send(mitogen.core.Blob(s)) - else: - # File is done. Cause the target's receive loop to exit by - # closing the sender, close the file, and remove the job entry. - sender.close() - fp.close() - state.jobs.pop(0) - - @mitogen.service.expose(policy=mitogen.service.AllowAny()) - @mitogen.service.no_reply() - @mitogen.service.arg_spec({ - 'path': basestring, - 'sender': mitogen.core.Sender, - }) - def fetch(self, path, sender, msg): - """ - Start a transfer for a registered path. - - :param str path: - File path. - :param mitogen.core.Sender sender: - Sender to receive file data. - :returns: - Dict containing the file metadata: - - * ``size``: File size in bytes. - * ``mode``: Integer file mode. - * ``owner``: Owner account name on host machine. - * ``group``: Owner group name on host machine. - * ``mtime``: Floating point modification time. - * ``ctime``: Floating point change time. - :raises Error: - Unregistered path, or Sender did not match requestee context. - """ - if path not in self._metadata_by_path: - raise Error(self.unregistered_msg) - if msg.src_id != sender.context.context_id: - raise Error(self.context_mismatch_msg) - - LOG.debug('Serving %r', path) - fp = open(path, 'rb', self.IO_SIZE) - # Response must arrive first so requestee can begin receive loop, - # otherwise first ack won't arrive until all pending chunks were - # delivered. In that case max BDP would always be 128KiB, aka. max - # ~10Mbit/sec over a 100ms link. - msg.reply(self._metadata_by_path[path]) - - stream = self.router.stream_by_id(sender.context.context_id) - state = self._state_by_stream.setdefault(stream, StreamState()) - state.lock.acquire() - try: - state.jobs.append((sender, fp)) - self._schedule_pending_unlocked(state) - finally: - state.lock.release() - - @mitogen.service.expose(policy=mitogen.service.AllowAny()) - @mitogen.service.no_reply() - @mitogen.service.arg_spec({ - 'size': int, - }) - @mitogen.service.no_reply() - def acknowledge(self, size, msg): - """ - Acknowledge bytes received by a transfer target, scheduling new chunks - to keep the window full. This should be called for every chunk received - by the target. - """ - stream = self.router.stream_by_id(msg.src_id) - state = self._state_by_stream[stream] - state.lock.acquire() - try: - if state.unacked < size: - LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d', - self, msg.src_id, state.unacked, size) - state.unacked -= min(state.unacked, size) - self._schedule_pending_unlocked(state) - finally: - state.lock.release() - - class ModuleDepService(mitogen.service.Service): """ Scan a new-style module and produce a cached mapping of module_utils names to their resolved filesystem paths. 
""" - def __init__(self, file_service, **kwargs): - super(ModuleDepService, self).__init__(**kwargs) - self._file_service = file_service + def __init__(self, *args, **kwargs): + super(ModuleDepService, self).__init__(*args, **kwargs) self._cache = {} + def _get_builtin_names(self, builtin_path, resolved): + return [ + fullname + for fullname, path, is_pkg in resolved + if os.path.abspath(path).startswith(builtin_path) + ] + + def _get_custom_tups(self, builtin_path, resolved): + return [ + (fullname, path, is_pkg) + for fullname, path, is_pkg in resolved + if not os.path.abspath(path).startswith(builtin_path) + ] + @mitogen.service.expose(policy=mitogen.service.AllowParents()) @mitogen.service.arg_spec({ 'module_name': basestring, 'module_path': basestring, 'search_path': tuple, 'builtin_path': basestring, + 'context': mitogen.core.Context, }) - def scan(self, module_name, module_path, search_path, builtin_path): - if (module_name, search_path) not in self._cache: + def scan(self, module_name, module_path, search_path, builtin_path, context): + key = (module_name, search_path) + if key not in self._cache: resolved = ansible_mitogen.module_finder.scan( module_name=module_name, module_path=module_path, search_path=tuple(search_path) + (builtin_path,), ) builtin_path = os.path.abspath(builtin_path) - filtered = [ - (fullname, path, is_pkg) - for fullname, path, is_pkg in resolved - if not os.path.abspath(path).startswith(builtin_path) - ] - self._cache[module_name, search_path] = filtered - - # Grant FileService access to paths in here to avoid another 2 IPCs - # from WorkerProcess. - self._file_service.register(path=module_path) - for fullname, path, is_pkg in filtered: - self._file_service.register(path=path) - - return self._cache[module_name, search_path] + builtin = self._get_builtin_names(builtin_path, resolved) + custom = self._get_custom_tups(builtin_path, resolved) + self._cache[key] = { + 'builtin': builtin, + 'custom': custom, + } + return self._cache[key] diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index ce528c7f..2fd4cb91 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -29,7 +29,6 @@ from __future__ import absolute_import import os -import ansible.errors import ansible_mitogen.mixins import ansible_mitogen.process diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 090730d8..9d4d8d3a 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -32,7 +32,6 @@ for file transfer, module execution and sundry bits like changing file modes. """ from __future__ import absolute_import -import cStringIO import errno import grp import json @@ -40,17 +39,15 @@ import logging import operator import os import pwd -import random import re +import signal import stat import subprocess import tempfile -import time import traceback import ansible.module_utils.json_utils import ansible_mitogen.runner -import ansible_mitogen.services import mitogen.core import mitogen.fork import mitogen.parent @@ -63,61 +60,12 @@ LOG = logging.getLogger(__name__) #: the duration of the process. temp_dir = None -#: Caching of fetched file data. -_file_cache = {} - #: Initialized to an econtext.parent.Context pointing at a pristine fork of #: the target Python interpreter before it executes any code or imports. _fork_parent = None -def _get_file(context, path, out_fp): - """ - Streamily download a file from the connection multiplexer process in the - controller. 
- - :param mitogen.core.Context context: - Reference to the context hosting the FileService that will be used to - fetch the file. - :param bytes in_path: - FileService registered name of the input file. - :param bytes out_path: - Name of the output path on the local disk. - :returns: - :data:`True` on success, or :data:`False` if the transfer was - interrupted and the output should be discarded. - """ - LOG.debug('_get_file(): fetching %r from %r', path, context) - t0 = time.time() - recv = mitogen.core.Receiver(router=context.router) - metadata = context.call_service( - service_name='ansible_mitogen.services.FileService', - method_name='fetch', - path=path, - sender=recv.to_sender(), - ) - - for chunk in recv: - s = chunk.unpickle() - LOG.debug('_get_file(%r): received %d bytes', path, len(s)) - context.call_service_async( - service_name='ansible_mitogen.services.FileService', - method_name='acknowledge', - size=len(s), - ).close() - out_fp.write(s) - - ok = out_fp.tell() == metadata['size'] - if not ok: - LOG.error('get_file(%r): receiver was closed early, controller ' - 'is likely shutting down.', path) - - LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', - metadata['size'], path, context, 1000 * (time.time() - t0)) - return ok, metadata - - -def get_file(context, path): +def get_small_file(context, path): """ Basic in-memory caching module fetcher. This generates an one roundtrip for every previously unseen file, so it is only a temporary solution. @@ -131,13 +79,9 @@ def get_file(context, path): :returns: Bytestring file data. """ - if path not in _file_cache: - io = cStringIO.StringIO() - ok, metadata = _get_file(context, path, io) - if not ok: - raise IOError('transfer of %r was interrupted.' % (path,)) - _file_cache[path] = io.getvalue() - return _file_cache[path] + pool = mitogen.service.get_or_create_pool(router=context.router) + service = pool.get_service('mitogen.service.PushFileService') + return service.get(path) def transfer_file(context, in_path, out_path, sync=False, set_owner=False): @@ -170,7 +114,11 @@ def transfer_file(context, in_path, out_path, sync=False, set_owner=False): try: try: - ok, metadata = _get_file(context, in_path, fp) + ok, metadata = mitogen.service.FileService.get( + context=context, + path=in_path, + out_fp=fp, + ) if not ok: raise IOError('transfer of %r was interrupted.' % (in_path,)) @@ -261,50 +209,51 @@ def init_child(econtext): This is necessary to prevent modules that are executed in-process from polluting the global interpreter state in a way that effects explicitly isolated modules. + + :returns: + Dict like:: + + { + 'fork_context': mitogen.core.Context. + 'home_dir': str. + } + + Where `fork_context` refers to the newly forked 'fork parent' context + the controller will use to start forked jobs, and `home_dir` is the + home directory for the active user account. """ global _fork_parent mitogen.parent.upgrade_router(econtext) _fork_parent = econtext.router.fork() reset_temp_dir(econtext) + return { + 'fork_context': _fork_parent, + 'home_dir': os.path.expanduser('~'), + } + @mitogen.core.takes_econtext -def start_fork_child(wrap_async, kwargs, econtext): +def create_fork_child(econtext): + """ + For helper functions executed in the fork parent context, arrange for + the context's router to be upgraded as necessary and for a new child to be + prepared. 
+ """ mitogen.parent.upgrade_router(econtext) context = econtext.router.fork() context.call(reset_temp_dir) - if not wrap_async: - try: - return context.call(run_module, kwargs) - finally: - context.shutdown() - - job_id = '%016x' % random.randint(0, 2**64) - context.call_async(run_module_async, job_id, kwargs) - return { - 'stdout': json.dumps({ - # modules/utilities/logic/async_wrapper.py::_run_module(). - 'changed': True, - 'started': 1, - 'finished': 0, - 'ansible_job_id': job_id, - }) - } + LOG.debug('create_fork_child() -> %r', context) + return context -@mitogen.core.takes_econtext -def run_module(kwargs, econtext): +def run_module(kwargs): """ Set up the process environment in preparation for running an Ansible module. This monkey-patches the Ansible libraries in various places to prevent it from trying to kill the process on completion, and to prevent it from reading sys.stdin. """ - should_fork = kwargs.pop('should_fork', False) - wrap_async = kwargs.pop('wrap_async', False) - if should_fork: - return _fork_parent.call(start_fork_child, wrap_async, kwargs) - runner_name = kwargs.pop('runner_name') klass = getattr(ansible_mitogen.runner, runner_name) impl = klass(**kwargs) @@ -335,21 +284,52 @@ def _write_job_status(job_id, dct): os.rename(path + '.tmp', path) -def _run_module_async(job_id, kwargs, econtext): +def _sigalrm(broker, timeout_secs, job_id): + """ + Respond to SIGALRM (job timeout) by updating the job file and killing the + process. """ - Body on run_module_async(). + msg = "Job reached maximum time limit of %d seconds." % (timeout_secs,) + _write_job_status(job_id, { + "failed": 1, + "finished": 1, + "msg": msg, + }) + broker.shutdown() + +def _install_alarm(broker, timeout_secs, job_id): + handler = lambda *_: _sigalrm(broker, timeout_secs, job_id) + signal.signal(signal.SIGALRM, handler) + signal.alarm(timeout_secs) + + +def _run_module_async(kwargs, job_id, timeout_secs, econtext): + """ 1. Immediately updates the status file to mark the job as started. 2. Installs a timer/signal handler to implement the time limit. 3. Runs as with run_module(), writing the result to the status file. + + :param dict kwargs: + Runner keyword arguments. + :param str job_id: + String job ID. + :param int timeout_secs: + If >0, limit the task's maximum run time. """ _write_job_status(job_id, { 'started': 1, - 'finished': 0 + 'finished': 0, + 'pid': os.getpid() }) + if timeout_secs > 0: + _install_alarm(econtext.broker, timeout_secs, job_id) + + kwargs['detach'] = True + kwargs['econtext'] = econtext kwargs['emulate_tty'] = False - dct = run_module(kwargs, econtext) + dct = run_module(kwargs) if mitogen.core.PY3: for key in 'stdout', 'stderr': dct[key] = dct[key].decode('utf-8', 'surrogateescape') @@ -373,18 +353,21 @@ def _run_module_async(job_id, kwargs, econtext): @mitogen.core.takes_econtext -def run_module_async(job_id, kwargs, econtext): +def run_module_async(kwargs, job_id, timeout_secs, econtext): """ - Since run_module_async() is invoked with .call_async(), with nothing to - read the result from the corresponding Receiver, wrap the body in an - exception logger, and wrap that in something that tears down the context on - completion. + Arrange for a module to be executed with its run status and result + serialized to a disk file. This function expects to run in a child forked + using :func:`create_fork_child`. 
""" try: try: - _run_module_async(job_id, kwargs, econtext) + _run_module_async(kwargs, job_id, timeout_secs, econtext) except Exception: - LOG.exception('_run_module_async crashed') + # Catch any (ansible_mitogen) bugs and write them to the job file. + _write_job_status(job_id, { + "failed": 1, + "msg": traceback.format_exc(), + }) finally: econtext.broker.shutdown() diff --git a/docs/ansible.rst b/docs/ansible.rst index c401dd42..be615375 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -69,6 +69,12 @@ Installation per-run basis. Like ``mitogen_linear``, the ``mitogen_free`` strategy exists to mimic the ``free`` strategy. +5. If targets have a restrictive ``sudoers`` file, add a rule like: + + .. code-block:: plain + + deploy = (ALL) NOPASSWD:/usr/bin/python -c* + Demo ~~~~ @@ -134,9 +140,6 @@ Noteworthy Differences artificial serialization, causing slowdown equivalent to `task_duration * num_targets`. This will be fixed soon. -* Asynchronous jobs presently exist only for the duration of a run, and time - limits are not implemented. - * "Module Replacer" style modules are not supported. These rarely appear in practice, and light web searches failed to reveal many examples of them. @@ -145,8 +148,8 @@ Noteworthy Differences may be established in parallel by default, this can be modified by setting the ``MITOGEN_POOL_SIZE`` environment variable. -* Performance does not scale perfectly linearly with target count. This will - improve over time. +* Performance does not scale linearly with target count. This will improve over + time. * SSH and ``become`` are treated distinctly when applying timeouts, and timeouts apply up to the point when the new interpreter is ready to accept @@ -195,11 +198,6 @@ container. Connection delegation is a work in progress, bug reports are welcome. - * While imports are cached on intermediaries, module scripts are needlessly - reuploaded for each target. Fixing this is equivalent to implementing - **Topology-Aware File Synchronization**, so it may remain unfixed until - that feature is started. - * Delegated connection setup is single-threaded; only one connection can be constructed in parallel per intermediary. @@ -642,6 +640,9 @@ is necessary. File-based logging can be enabled by setting When file-based logging is enabled, one file per context will be created on the local machine and every target machine, as ``/tmp/mitogen..log``. +If you are experiencing a hang, ``MITOGEN_DUMP_THREAD_STACKS=1`` causes every +process to dump every thread stack into the logging framework every 5 seconds. + Getting Help ~~~~~~~~~~~~ diff --git a/docs/api.rst b/docs/api.rst index 20389180..114f58d0 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -857,7 +857,7 @@ Context Class .. method:: call (fn, \*args, \*\*kwargs) - Equivalent to :py:meth:`call_async(fn, \*args, \**kwargs).get_data() + Equivalent to :py:meth:`call_async(fn, \*args, \**kwargs).get().unpickle() `. :returns: @@ -866,6 +866,14 @@ Context Class :raises mitogen.core.CallError: An exception was raised in the remote context during execution. + .. method:: call_no_reply (fn, \*args, \*\*kwargs) + + Send a function call, but expect no return value. If the call fails, + the full exception will be logged to the target context's logging framework. + + :raises mitogen.core.CallError: + An exception was raised in the remote context during execution. 
+
 
 
 Receiver Class
diff --git a/docs/getting_started.rst b/docs/getting_started.rst
index 985f796a..0947e040 100644
--- a/docs/getting_started.rst
+++ b/docs/getting_started.rst
@@ -40,6 +40,41 @@ and possibly your team and its successors with:
   appropriate, prefer a higher level solution instead.
 
 
+First Principles
+----------------
+
+Before starting, take a moment to reflect on writing a program that will
+operate across machines and privilege domains:
+
+* As with multithreaded programming, writing a program that spans multiple
+  hosts is exposed to many asynchrony issues. Unlike multithreaded programming,
+  the margin for unexpected failures is much higher, even between only two
+  peers, as communication may fail at any moment, since that communication
+  depends on the reliability of an external network.
+
+* Since a multi-host program always spans trust and privilege domains, trust
+  must be taken into consideration in your design from the outset. Mitogen
+  attempts to protect the consuming application by default where possible;
+  however it is paramount that trust considerations are always in mind when
+  exposing any privileged functionality to a potentially untrusted network of
+  peers.
+
+  A parent must always assume data received from a child is suspect, and must
+  not base privileged control decisions on that data. As a small example, a
+  parent should not form a command to execute in a subprocess using strings
+  received from a child.
+
+* As the program spans multiple hosts, its design will benefit from a strict
+  separation of program and data. This entails avoiding some common Python
+  idioms that rely on its ability to manipulate functions and closures as if
+  they were data, such as passing a lambda closed over some program state as a
+  callback parameter.
+
+  In the general case this is both difficult and unsafe to support in a
+  distributed program, and so (for now at least) it should be assumed this
+  functionality is unlikely to appear in future.
+
+
 Broker And Router
 -----------------
 
@@ -330,6 +365,15 @@ Subclasses of built-in types must be undecorated using
 :py:func:`mitogen.utils.cast`.
 
 
+Test Your Design
+----------------
+
+Use ``netem`` to simulate a high latency or lossy network while testing, for
+example:
+
+``tc qdisc add dev eth0 root netem delay 250ms``
+
+
 .. _troubleshooting:
 
 Troubleshooting
diff --git a/docs/services.rst b/docs/services.rst
index 4c3f0ab1..49108e80 100644
--- a/docs/services.rst
+++ b/docs/services.rst
@@ -17,8 +17,9 @@ Overview
 Service
 
 * User-supplied class with explicitly exposed methods.
-* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass).
 * May be auto-imported/constructed in a child from a parent simply by calling it
+* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass) by
+  default, but may use any naming scheme the configured activator understands.
 * Children receive refusals if the class is not already activated by a parent
 * Has an associated Select instance which may be dynamically loaded with
   receivers over time, on_message_received() invoked if any receiver becomes
@@ -28,9 +29,12 @@ Invoker
 
 * Abstracts mechanism for calling a service method and verifying permissions.
 * Built-in 'service.Invoker': concurrent execution of all methods on the
   thread pool.
+* Built-in 'service.SerializedInvoker': serialization of all calls on a single
+  thread borrowed from the pool while any request is pending.
* Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct
-  (method, kwargs) key, only one such method executes, return value is cached
-  and broadcast to all requesters.
+ (method, kwargs) key, only one such method ever executes, return value is + cached and broadcast to all request waiters. Waiters do not block additional + pool threads. Activator diff --git a/mitogen/core.py b/mitogen/core.py index ccaf9ab0..ebd41f84 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -75,10 +75,6 @@ IOLOG.setLevel(logging.INFO) _v = False _vv = False -# Also taken by Broker, no blocking work can occur with it held. -_service_call_lock = threading.Lock() -_service_calls = [] - GET_MODULE = 100 CALL_FUNCTION = 101 FORWARD_LOG = 102 @@ -555,6 +551,7 @@ class Importer(object): 'jail', 'lxc', 'master', + 'minify', 'parent', 'select', 'service', @@ -622,6 +619,7 @@ class Importer(object): return None _tls.running = True + # TODO: hack: this is papering over a bug elsewhere. fullname = fullname.rstrip('.') try: pkgname, dot, _ = fullname.rpartition('.') @@ -715,6 +713,8 @@ class Importer(object): def load_module(self, fullname): _v and LOG.debug('Importer.load_module(%r)', fullname) + # TODO: hack: this is papering over a bug elsewhere. + fullname = fullname.rstrip('.') self._refuse_imports(fullname) event = threading.Event() @@ -863,7 +863,7 @@ class Stream(BasicStream): self._router = router self.remote_id = remote_id self.name = 'default' - self.sent_modules = set() + self.sent_modules = set(['mitogen', 'mitogen.core']) self.construct(**kwargs) self._input_buf = collections.deque() self._output_buf = collections.deque() @@ -1644,27 +1644,22 @@ class ExternalContext(object): if not self.config['profiling']: os.kill(os.getpid(), signal.SIGTERM) + def _service_stub_main(self, msg): + import mitogen.service + pool = mitogen.service.get_or_create_pool(router=self.router) + pool._receiver._on_receive(msg) + def _on_call_service_msg(self, msg): """ - Stub CALL_SERVICE handler, push message on temporary queue and invoke - _on_stub_call() from the main thread. + Stub service handler. Start a thread to import the mitogen.service + implementation from, and deliver the message to the newly constructed + pool. This must be done as CALL_SERVICE for e.g. PushFileService may + race with a CALL_FUNCTION blocking the main thread waiting for a result + from that service. 
""" - if msg.is_dead: - return - _service_call_lock.acquire() - try: - _service_calls.append(msg) - finally: - _service_call_lock.release() - - self.router.route( - Message.pickled( - dst_id=mitogen.context_id, - handle=CALL_FUNCTION, - obj=('mitogen.service', None, '_on_stub_call', (), {}), - router=self.router, - ) - ) + if not msg.is_dead: + th = threading.Thread(target=self._service_stub_main, args=(msg,)) + th.start() def _on_shutdown_msg(self, msg): _v and LOG.debug('_on_shutdown_msg(%r)', msg) @@ -1707,6 +1702,7 @@ class ExternalContext(object): enable_profiling() self.broker = Broker() self.router = Router(self.broker) + self.router.debug = self.config.get('debug', False) self.router.undirectional = self.config['unidirectional'] self.router.add_handler( fn=self._on_shutdown_msg, @@ -1855,11 +1851,17 @@ class ExternalContext(object): for msg in self.recv: try: - msg.reply(self._dispatch_one(msg)) + ret = self._dispatch_one(msg) + _v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret) + if msg.reply_to: + msg.reply(ret) except Exception: e = sys.exc_info()[1] - _v and LOG.debug('_dispatch_calls: %s', e) - msg.reply(CallError(e)) + if msg.reply_to: + _v and LOG.debug('_dispatch_calls: %s', e) + msg.reply(CallError(e)) + else: + LOG.exception('_dispatch_calls: %r', msg) self.dispatch_stopped = True def main(self): diff --git a/mitogen/debug.py b/mitogen/debug.py index f2746380..95f7db3e 100644 --- a/mitogen/debug.py +++ b/mitogen/debug.py @@ -41,7 +41,6 @@ import time import traceback import mitogen.core -import mitogen.master import mitogen.parent @@ -53,14 +52,26 @@ def _hex(n): return '%08x' % n +def get_subclasses(klass): + """ + Rather than statically import every interesting subclass, forcing it all to + be transferred and potentially disrupting the debugged environment, + enumerate only those loaded in memory. Also returns the original class. + """ + stack = [klass] + seen = set() + while stack: + klass = stack.pop() + seen.add(klass) + stack.extend(klass.__subclasses__()) + return seen + + def get_routers(): + kl return { _hex(id(router)): router - for klass in ( - mitogen.core.Router, - mitogen.parent.Router, - mitogen.master.Router, - ) + for klass in get_subclasses(mitogen.core.Router) for router in gc.get_referrers(klass) if isinstance(router, mitogen.core.Router) } diff --git a/mitogen/fork.py b/mitogen/fork.py index 8b29ad0c..dd6008db 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -51,7 +51,7 @@ def fixup_prngs(): sys.modules['ssl'].RAND_add(s, 75.0) -def break_logging_locks(): +def reset_logging_framework(): """ After fork, ensure any logging.Handler locks are recreated, as a variety of threads in the parent may have been using the logging package at the moment @@ -61,10 +61,19 @@ def break_logging_locks(): https://github.com/dw/mitogen/issues/150 for a full discussion. """ logging._lock = threading.RLock() - for name in logging.Logger.manager.loggerDict: + + # The root logger does not appear in the loggerDict. + for name in [None] + list(logging.Logger.manager.loggerDict): for handler in logging.getLogger(name).handlers: handler.createLock() + root = logging.getLogger() + root.handlers = [ + handler + for handler in root.handlers + if not isinstance(handler, mitogen.core.LogHandler) + ] + def handle_child_crash(): """ @@ -125,10 +134,10 @@ class Stream(mitogen.parent.Stream): handle_child_crash() def _child_main(self, childfp): + reset_logging_framework() # Must be first! 
diff --git a/mitogen/master.py b/mitogen/master.py
index 9a5b7e83..b024cdd1 100644
--- a/mitogen/master.py
+++ b/mitogen/master.py
@@ -52,8 +52,11 @@ if not hasattr(pkgutil, 'find_loader'):
     # been kept intentionally 2.3 compatible so we can reuse it.
     from mitogen.compat import pkgutil

+import mitogen
 import mitogen.core
+import mitogen.minify
 import mitogen.parent
+from mitogen.core import IOLOG
 from mitogen.core import LOG

@@ -79,6 +82,19 @@ def get_child_modules(path):
     return [name for _, name, _ in it]

+def get_core_source():
+    """
+    Master version of parent.get_core_source().
+    """
+    source = inspect.getsource(mitogen.core)
+    return mitogen.minify.minimize_source(source)
+
+
+if mitogen.is_master:
+    # TODO: find a less surprising way of installing this.
+    mitogen.parent.get_core_source = get_core_source
+
+
 LOAD_CONST = dis.opname.index('LOAD_CONST')
 IMPORT_NAME = dis.opname.index('IMPORT_NAME')

@@ -290,8 +306,8 @@ class ModuleFinder(object):
         """Attempt to fetch source code via pkgutil. In an ideal world, this
         would be the only required implementation of get_module()."""
         loader = pkgutil.find_loader(fullname)
-        LOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r',
-                  fullname, loader)
+        IOLOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r',
+                    fullname, loader)
         if not loader:
             return

@@ -523,11 +539,41 @@ class ModuleResponder(object):
         self._cache[fullname] = tup
         return tup

-    def _send_load_module(self, stream, msg, fullname):
-        LOG.debug('_send_load_module(%r, %r)', stream, fullname)
-        msg.reply(self._build_tuple(fullname),
-                  handle=mitogen.core.LOAD_MODULE)
-        stream.sent_modules.add(fullname)
+    def _send_load_module(self, stream, fullname):
+        if fullname not in stream.sent_modules:
+            LOG.debug('_send_load_module(%r, %r)', stream, fullname)
+            self._router._async_route(
+                mitogen.core.Message.pickled(
+                    self._build_tuple(fullname),
+                    dst_id=stream.remote_id,
+                    handle=mitogen.core.LOAD_MODULE,
+                )
+            )
+            stream.sent_modules.add(fullname)
+
+    def _send_module_load_failed(self, stream, fullname):
+        stream.send(
+            mitogen.core.Message.pickled(
+                (fullname, None, None, None, ()),
+                dst_id=stream.remote_id,
+                handle=mitogen.core.LOAD_MODULE,
+            )
+        )
+
+    def _send_module_and_related(self, stream, fullname):
+        try:
+            tup = self._build_tuple(fullname)
+            for name in tup[4]:  # related
+                parent, _, _ = name.partition('.')
+                if parent != fullname and parent not in stream.sent_modules:
+                    # Parent hasn't been sent, so don't load submodule yet.
+                    continue
+
+                self._send_load_module(stream, name)
+            self._send_load_module(stream, fullname)
+        except Exception:
+            LOG.debug('While importing %r', fullname, exc_info=True)
+            self._send_module_load_failed(stream, fullname)

     def _on_get_module(self, msg):
         if msg.is_dead:
@@ -540,25 +586,32 @@
             LOG.warning('_on_get_module(): dup request for %r from %r',
                         fullname, stream)

-        try:
-            tup = self._build_tuple(fullname)
-            for name in tup[4]:  # related
-                parent, _, _ = name.partition('.')
-                if parent != fullname and parent not in stream.sent_modules:
-                    # Parent hasn't been sent, so don't load submodule yet.
-                    continue
+        self._send_module_and_related(stream, fullname)

-                if name in stream.sent_modules:
-                    # Submodule has been sent already, skip.
- continue + def _send_forward_module(self, stream, context, fullname): + if stream.remote_id != context.context_id: + stream.send( + mitogen.core.Message( + data='%s\x00%s' % (context.context_id, fullname), + handle=mitogen.core.FORWARD_MODULE, + dst_id=stream.remote_id, + ) + ) - self._send_load_module(stream, msg, name) - self._send_load_module(stream, msg, fullname) + def _forward_module(self, context, fullname): + IOLOG.debug('%r._forward_module(%r, %r)', self, context, fullname) + path = [] + while fullname: + path.append(fullname) + fullname, _, _ = fullname.rpartition('.') - except Exception: - LOG.debug('While importing %r', fullname, exc_info=True) - msg.reply((fullname, None, None, None, ()), - handle=mitogen.core.LOAD_MODULE) + for fullname in reversed(path): + stream = self._router.stream_by_id(context.context_id) + self._send_module_and_related(stream, fullname) + self._send_forward_module(stream, context, fullname) + + def forward_module(self, context, fullname): + self._router.broker.defer(self._forward_module, context, fullname) class Broker(mitogen.core.Broker): @@ -652,7 +705,7 @@ class IdAllocator(object): id_ = self.next_id self.next_id += self.BLOCK_SIZE end_id = id_ + self.BLOCK_SIZE - LOG.debug('%r: allocating (%d..%d]', self, id_, end_id) + LOG.debug('%r: allocating [%d..%d)', self, id_, end_id) return id_, end_id finally: self.lock.release() @@ -666,5 +719,5 @@ class IdAllocator(object): allocated = self.router.context_by_id(id_, msg.src_id) LOG.debug('%r: allocating [%r..%r) to %r', - self, allocated, requestee, msg.src_id) + self, id_, last_id, requestee) msg.reply((id_, last_id)) diff --git a/mitogen/minify.py b/mitogen/minify.py new file mode 100644 index 00000000..1d6f8d11 --- /dev/null +++ b/mitogen/minify.py @@ -0,0 +1,134 @@ +# Copyright 2017, Alex Willmer +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. 
+
+import sys
+
+
+try:
+    from cStringIO import StringIO as BytesIO
+except ImportError:
+    from io import BytesIO
+
+if sys.version_info < (2, 7, 11):
+    from mitogen.compat import tokenize
+else:
+    import tokenize
+
+try:
+    from functools import lru_cache
+except ImportError:
+    from mitogen.compat.functools import lru_cache
+
+
+@lru_cache()
+def minimize_source(source):
+    """Remove most comments and docstrings from Python source code.
+    """
+    tokens = tokenize.generate_tokens(BytesIO(source).readline)
+    tokens = strip_comments(tokens)
+    tokens = strip_docstrings(tokens)
+    tokens = reindent(tokens)
+    return tokenize.untokenize(tokens)
+
+
+def strip_comments(tokens):
+    """Drop comment tokens from a `tokenize` stream.
+
+    Comments on lines 1-2 are kept, to preserve hashbang and encoding.
+    Trailing whitespace is removed from all lines.
+    """
+    prev_typ = None
+    prev_end_col = 0
+    for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens:
+        if typ in (tokenize.NL, tokenize.NEWLINE):
+            if prev_typ in (tokenize.NL, tokenize.NEWLINE):
+                start_col = 0
+            else:
+                start_col = prev_end_col
+            end_col = start_col + 1
+        elif typ == tokenize.COMMENT and start_row > 2:
+            continue
+        prev_typ = typ
+        prev_end_col = end_col
+        yield typ, tok, (start_row, start_col), (end_row, end_col), line
+
+
+def strip_docstrings(tokens):
+    """Replace docstring tokens with NL tokens in a `tokenize` stream.
+
+    Any STRING token not part of an expression is deemed a docstring.
+    Indented docstrings are not yet recognised.
+    """
+    stack = []
+    state = 'wait_string'
+    for t in tokens:
+        typ = t[0]
+        if state == 'wait_string':
+            if typ in (tokenize.NL, tokenize.COMMENT):
+                yield t
+            elif typ in (tokenize.DEDENT, tokenize.INDENT, tokenize.STRING):
+                stack.append(t)
+            elif typ == tokenize.NEWLINE:
+                stack.append(t)
+                start_line, end_line = stack[0][2][0], stack[-1][3][0]+1
+                for i in range(start_line, end_line):
+                    yield tokenize.NL, '\n', (i, 0), (i,1), '\n'
+                for t in stack:
+                    if t[0] in (tokenize.DEDENT, tokenize.INDENT):
+                        yield t[0], t[1], (i+1, t[2][1]), (i+1, t[3][1]), t[4]
+                del stack[:]
+            else:
+                stack.append(t)
+                for t in stack: yield t
+                del stack[:]
+                state = 'wait_newline'
+        elif state == 'wait_newline':
+            if typ == tokenize.NEWLINE:
+                state = 'wait_string'
+            yield t
+
+
+def reindent(tokens, indent='    '):
+    """Replace existing indentation in a token stream with `indent`.
+    """
+    old_levels = []
+    old_level = 0
+    new_level = 0
+    for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens:
+        if typ == tokenize.INDENT:
+            old_levels.append(old_level)
+            old_level = len(tok)
+            new_level += 1
+            tok = indent * new_level
+        elif typ == tokenize.DEDENT:
+            old_level = old_levels.pop()
+            new_level -= 1
+        start_col = max(0, start_col - old_level + new_level)
+        if start_row == end_row:
+            end_col = start_col + len(tok)
+        yield typ, tok, (start_row, start_col), (end_row, end_col), line
diff --git a/mitogen/parent.py b/mitogen/parent.py
index 9436591e..7e095dea 100644
--- a/mitogen/parent.py
+++ b/mitogen/parent.py
@@ -52,21 +52,6 @@ import zlib

 # Absolute imports for <2.5.
select = __import__('select') -try: - from cStringIO import StringIO as BytesIO -except ImportError: - from io import BytesIO - -if sys.version_info < (2, 7, 11): - from mitogen.compat import tokenize -else: - import tokenize - -try: - from functools import lru_cache -except ImportError: - from mitogen.compat.functools import lru_cache - import mitogen.core from mitogen.core import LOG from mitogen.core import IOLOG @@ -82,101 +67,21 @@ def get_log_level(): return (LOG.level or logging.getLogger().level or logging.INFO) -def is_immediate_child(msg, stream): - """ - Handler policy that requires messages to arrive only from immediately - connected children. - """ - return msg.src_id == stream.remote_id - - -@lru_cache() -def minimize_source(source): - """Remove most comments and docstrings from Python source code. +def get_core_source(): """ - tokens = tokenize.generate_tokens(BytesIO(source).readline) - tokens = strip_comments(tokens) - tokens = strip_docstrings(tokens) - tokens = reindent(tokens) - return tokenize.untokenize(tokens) - - -def strip_comments(tokens): - """Drop comment tokens from a `tokenize` stream. - - Comments on lines 1-2 are kept, to preserve hashbang and encoding. - Trailing whitespace is remove from all lines. + In non-masters, simply fetch the cached mitogen.core source code via the + import mechanism. In masters, this function is replaced with a version that + performs minification directly. """ - prev_typ = None - prev_end_col = 0 - for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens: - if typ in (tokenize.NL, tokenize.NEWLINE): - if prev_typ in (tokenize.NL, tokenize.NEWLINE): - start_col = 0 - else: - start_col = prev_end_col - end_col = start_col + 1 - elif typ == tokenize.COMMENT and start_row > 2: - continue - prev_typ = typ - prev_end_col = end_col - yield typ, tok, (start_row, start_col), (end_row, end_col), line - + return inspect.getsource(mitogen.core) -def strip_docstrings(tokens): - """Replace docstring tokens with NL tokens in a `tokenize` stream. - Any STRING token not part of an expression is deemed a docstring. - Indented docstrings are not yet recognised. +def is_immediate_child(msg, stream): """ - stack = [] - state = 'wait_string' - for t in tokens: - typ = t[0] - if state == 'wait_string': - if typ in (tokenize.NL, tokenize.COMMENT): - yield t - elif typ in (tokenize.DEDENT, tokenize.INDENT, tokenize.STRING): - stack.append(t) - elif typ == tokenize.NEWLINE: - stack.append(t) - start_line, end_line = stack[0][2][0], stack[-1][3][0]+1 - for i in range(start_line, end_line): - yield tokenize.NL, '\n', (i, 0), (i,1), '\n' - for t in stack: - if t[0] in (tokenize.DEDENT, tokenize.INDENT): - yield t[0], t[1], (i+1, t[2][1]), (i+1, t[3][1]), t[4] - del stack[:] - else: - stack.append(t) - for t in stack: yield t - del stack[:] - state = 'wait_newline' - elif state == 'wait_newline': - if typ == tokenize.NEWLINE: - state = 'wait_string' - yield t - - -def reindent(tokens, indent=' '): - """Replace existing indentation in a token steam, with `indent`. + Handler policy that requires messages to arrive only from immediately + connected children. 
""" - old_levels = [] - old_level = 0 - new_level = 0 - for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens: - if typ == tokenize.INDENT: - old_levels.append(old_level) - old_level = len(tok) - new_level += 1 - tok = indent * new_level - elif typ == tokenize.DEDENT: - old_level = old_levels.pop() - new_level -= 1 - start_col = max(0, start_col - old_level + new_level) - if start_row == end_row: - end_col = start_col + len(tok) - yield typ, tok, (start_row, start_col), (end_row, end_col), line + return msg.src_id == stream.remote_id def flags(names): @@ -498,8 +403,7 @@ def stream_by_method_name(name): @mitogen.core.takes_econtext def _proxy_connect(name, method_name, kwargs, econtext): - - mitogen.parent.upgrade_router(econtext) + upgrade_router(econtext) try: context = econtext.router._connect( klass=stream_by_method_name(method_name), @@ -921,11 +825,11 @@ class Stream(mitogen.core.Stream): } def get_preamble(self): - source = inspect.getsource(mitogen.core) + source = get_core_source() source += '\nExternalContext(%r).main()\n' % ( self.get_econtext_config(), ) - return zlib.compress(minimize_source(source), 9) + return zlib.compress(source, 9) create_child = staticmethod(create_child) create_child_args = {} @@ -1008,6 +912,11 @@ class Context(mitogen.core.Context): receiver = self.call_async(fn, *args, **kwargs) return receiver.get().unpickle(throw_dead=False) + def call_no_reply(self, fn, *args, **kwargs): + LOG.debug('%r.call_no_reply(%r, *%r, **%r)', + self, fn, args, kwargs) + self.send(make_call_msg(fn, *args, **kwargs)) + def shutdown(self, wait=False): LOG.debug('%r.shutdown() sending SHUTDOWN', self) latch = mitogen.core.Latch() @@ -1327,7 +1236,7 @@ class ModuleForwarder(object): if msg.is_dead: return - context_id_s, fullname = msg.data.partition('\x00') + context_id_s, _, fullname = msg.data.partition('\x00') context_id = int(context_id_s) stream = self.router.stream_by_id(context_id) if stream.remote_id == mitogen.parent_id: @@ -1335,15 +1244,18 @@ class ModuleForwarder(object): self, context_id, fullname) return + if fullname in stream.sent_modules: + return + LOG.debug('%r._on_forward_module() sending %r to %r via %r', self, fullname, context_id, stream.remote_id) self._send_module_and_related(stream, fullname) if stream.remote_id != context_id: stream._send( mitogen.core.Message( - dst_id=stream.remote_id, - handle=mitogen.core.FORWARD_MODULE, data=msg.data, + handle=mitogen.core.FORWARD_MODULE, + dst_id=stream.remote_id, ) ) diff --git a/mitogen/service.py b/mitogen/service.py index 6719f833..62180e33 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -26,9 +26,15 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import grp +import os +import os.path import pprint +import pwd +import stat import sys import threading +import time import mitogen.core import mitogen.select @@ -37,34 +43,23 @@ from mitogen.core import LOG DEFAULT_POOL_SIZE = 16 _pool = None +_pool_pid = None +#: Serialize pool construction. +_pool_lock = threading.Lock() @mitogen.core.takes_router def get_or_create_pool(size=None, router=None): global _pool - if _pool is None: - _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE) - return _pool - - -@mitogen.core.takes_router -def _on_stub_call(router): - """ - Called for each message received by the core.py stub CALL_SERVICE handler. - Create the pool if it doesn't already exist, and push enqueued messages - into the pool's receiver. 
This may be called more than once as the stub - service handler runs in asynchronous context, while _on_stub_call() happens - on the main thread. Multiple CALL_SERVICE may end up enqueued before Pool - has a chance to install the real CALL_SERVICE handler. - """ - pool = get_or_create_pool(router=router) - mitogen.core._service_call_lock.acquire() + global _pool_pid + _pool_lock.acquire() try: - for msg in mitogen.core._service_calls: - pool._receiver._on_receive(msg) - del mitogen.core._service_calls[:] + if _pool_pid != os.getpid(): + _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE) + _pool_pid = os.getpid() + return _pool finally: - mitogen.core._service_call_lock.release() + _pool_lock.release() def validate_arg_spec(spec, args): @@ -149,6 +144,7 @@ class Error(Exception): """ Raised when an error occurs configuring a service or pool. """ + pass # cope with minify_source() bug. class Policy(object): @@ -183,12 +179,12 @@ class Activator(object): def activate(self, pool, service_name, msg): mod_name, _, class_name = service_name.rpartition('.') - if not self.is_permitted(mod_name, class_name, msg): + if msg and not self.is_permitted(mod_name, class_name, msg): raise mitogen.core.CallError(self.not_active_msg, service_name) module = mitogen.core.import_module(mod_name) klass = getattr(module, class_name) - service = klass(pool.router) + service = klass(router=pool.router) pool.add(service) return service @@ -238,7 +234,8 @@ class Invoker(object): except Exception: if no_reply: LOG.exception('While calling no-reply method %s.%s', - type(self).__name__, method.func_name) + type(self.service).__name__, + method.func_name) else: raise @@ -249,6 +246,50 @@ class Invoker(object): msg.reply(response) +class SerializedInvoker(Invoker): + def __init__(self, **kwargs): + super(SerializedInvoker, self).__init__(**kwargs) + self._lock = threading.Lock() + self._queue = [] + self._running = False + + def _pop(self): + self._lock.acquire() + try: + try: + return self._queue.pop(0) + except IndexError: + self._running = False + finally: + self._lock.release() + + def _run(self): + while True: + tup = self._pop() + if tup is None: + return + method_name, kwargs, msg = tup + try: + super(SerializedInvoker, self).invoke(method_name, kwargs, msg) + except Exception: + LOG.exception('%r: while invoking %r of %r', + self, method_name, self.service) + msg.reply(mitogen.core.Message.dead()) + + def invoke(self, method_name, kwargs, msg): + self._lock.acquire() + try: + self._queue.append((method_name, kwargs, msg)) + first = not self._running + self._running = True + finally: + self._lock.release() + + if first: + self._run() + return Service.NO_REPLY + + class DeduplicatingInvoker(Invoker): """ A service that deduplicates and caches expensive responses. 
    Requests are
@@ -398,6 +439,8 @@ class Pool(object):
             thread.start()
             self._threads.append(thread)

+        LOG.debug('%r: initialized', self)
+
     @property
     def size(self):
         return len(self._threads)
@@ -407,7 +450,7 @@
         if name in self._invoker_by_name:
             raise Error('service named %r already registered' % (name,))
         assert service.select not in self._func_by_recv
-        invoker = service.invoker_class(service)
+        invoker = service.invoker_class(service=service)
         self._invoker_by_name[name] = invoker
         self._func_by_recv[service.select] = service.on_message
@@ -427,13 +470,17 @@
             invoker = self._invoker_by_name.get(name)
             if not invoker:
                 service = self._activator.activate(self, name, msg)
-                invoker = service.invoker_class(service)
+                invoker = service.invoker_class(service=service)
                 self._invoker_by_name[name] = invoker
         finally:
             self._lock.release()

         return invoker

+    def get_service(self, name):
+        invoker = self.get_invoker(name, None)
+        return invoker.service
+
     def _validate(self, msg):
         tup = msg.unpickle(throw=False)
         if not (isinstance(tup, tuple) and
@@ -454,7 +501,8 @@
             LOG.warning('%r: call error: %s: %s', self, msg, e)
             msg.reply(e)
         except Exception:
-            LOG.exception('While invoking %r._invoke()', self)
+            LOG.exception('%r: while invoking %r of %r',
+                          self, method_name, service_name)
             e = sys.exc_info()[1]
             msg.reply(mitogen.core.CallError(e))
@@ -488,3 +536,405 @@
             len(self._threads),
             th.name,
         )
+
+
+class FileStreamState(object):
+    def __init__(self):
+        #: List of [(Sender, file object)]
+        self.jobs = []
+        self.completing = {}
+        #: In-flight byte count.
+        self.unacked = 0
+        #: Lock.
+        self.lock = threading.Lock()
+
+
+class PushFileService(Service):
+    """
+    Push-based file service. Files are delivered and cached in RAM, sent
+    recursively from parent to child. A child that requests a file via
+    :meth:`get` will block until it has been delivered by a parent.
+
+    This service will eventually be merged into FileService.
+    """
+    invoker_class = SerializedInvoker
+
+    def __init__(self, **kwargs):
+        super(PushFileService, self).__init__(**kwargs)
+        self._lock = threading.Lock()
+        self._cache = {}
+        self._waiters = {}
+        self._sent_by_stream = {}
+
+    def get(self, path):
+        self._lock.acquire()
+        try:
+            if path in self._cache:
+                return self._cache[path]
+            waiters = self._waiters.setdefault(path, [])
+            latch = mitogen.core.Latch()
+            waiters.append(lambda: latch.put(None))
+        finally:
+            self._lock.release()
+
+        LOG.debug('%r.get(%r) waiting for uncached file to arrive', self, path)
+        latch.get()
+        LOG.debug('%r.get(%r) -> %r', self, path, self._cache[path])
+        return self._cache[path]
+
+    def _forward(self, context, path):
+        stream = self.router.stream_by_id(context.context_id)
+        child = mitogen.core.Context(self.router, stream.remote_id)
+        sent = self._sent_by_stream.setdefault(stream, set())
+        if path in sent and child.context_id != context.context_id:
+            child.call_service_async(
+                service_name=self.name(),
+                method_name='forward',
+                path=path,
+                context=context
+            ).close()
+        else:
+            child.call_service_async(
+                service_name=self.name(),
+                method_name='store_and_forward',
+                path=path,
+                data=self._cache[path],
+                context=context
+            ).close()
+
+    @expose(policy=AllowParents())
+    @arg_spec({
+        'context': mitogen.core.Context,
+        'paths': list,
+        'modules': list,
+    })
+    def propagate_paths_and_modules(self, context, paths, modules):
+        """
+        One-size-fits-all method to ensure a target context has been preloaded
+        with a set of small files and Python modules.
+        """
+        for path in paths:
+            self.propagate_to(context, path)
+        for fullname in modules:
+            self.router.responder.forward_module(context, fullname)
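From a parent's point of view, the method above is invoked as an ordinary service call. A hedged sketch, with illustrative path and module names:

    def preload(parent_context, target_context):
        # Ask the PushFileService hosted in parent_context (e.g. the Ansible
        # connection multiplexer) to push one small file and one Python
        # package down to target_context before they are first needed.
        parent_context.call_service(
            service_name='mitogen.service.PushFileService',
            method_name='propagate_paths_and_modules',
            context=target_context,
            paths=['/etc/myapp/settings.json'],  # illustrative
            modules=['myapp.util'],              # illustrative
        )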
+ """ + for path in paths: + self.propagate_to(context, path) + for fullname in modules: + self.router.responder.forward_module(context, fullname) + + @expose(policy=AllowParents()) + @arg_spec({ + 'context': mitogen.core.Context, + 'path': basestring, + }) + def propagate_to(self, context, path): + LOG.debug('%r.propagate_to(%r, %r)', self, context, path) + if path not in self._cache: + fp = open(path, 'rb') + try: + self._cache[path] = mitogen.core.Blob(fp.read()) + finally: + fp.close() + self._forward(context, path) + + def _store(self, path, data): + self._lock.acquire() + try: + self._cache[path] = data + return self._waiters.pop(path, []) + finally: + self._lock.release() + + @expose(policy=AllowParents()) + @no_reply() + @arg_spec({ + 'path': basestring, + 'data': mitogen.core.Blob, + 'context': mitogen.core.Context, + }) + def store_and_forward(self, path, data, context): + LOG.debug('%r.store_and_forward(%r, %r, %r)', + self, path, data, context) + waiters = self._store(path, data) + if context.context_id != mitogen.context_id: + self._forward(context, path) + for callback in waiters: + callback() + + @expose(policy=AllowParents()) + @no_reply() + @arg_spec({ + 'path': basestring, + 'context': mitogen.core.Context, + }) + def forward(self, path, context): + LOG.debug('%r.forward(%r, %r)', self, path, context) + if path not in self._cache: + LOG.error('%r: %r is not in local cache', self, path) + return + self._forward(path, context) + + +class FileService(Service): + """ + Streaming file server, used to serve small and huge files alike. Paths must + be registered by a trusted context before they will be served to a child. + + Transfers are divided among the physical streams that connect external + contexts, ensuring each stream never has excessive data buffered in RAM, + while still maintaining enough to fully utilize available bandwidth. This + is achieved by making an initial bandwidth assumption, enqueueing enough + chunks to fill that assumed pipe, then responding to delivery + acknowledgements from the receiver by scheduling new chunks. + + Transfers proceed one-at-a-time per stream. When multiple contexts exist on + a stream (e.g. one is the SSH account, another is a sudo account, and a + third is a proxied SSH connection), each request is satisfied in turn + before subsequent requests start flowing. This ensures when a stream is + contended, priority is given to completing individual transfers rather than + potentially aborting many partial transfers, causing the bandwidth to be + wasted. + + Theory of operation: + 1. Trusted context (i.e. WorkerProcess) calls register(), making a + file available to any untrusted context. + 2. Requestee context creates a mitogen.core.Receiver() to receive + chunks, then calls fetch(path, recv.to_sender()), to set up the + transfer. + 3. fetch() replies to the call with the file's metadata, then + schedules an initial burst up to the window size limit (1MiB). + 4. Chunks begin to arrive in the requestee, which calls acknowledge() + for each 128KiB received. + 5. The acknowledge() call arrives at FileService, which scheduled a new + chunk to refill the drained window back to the size limit. + 6. When the last chunk has been pumped for a single transfer, + Sender.close() is called causing the receive loop in + target.py::_get_file() to exit, allowing that code to compare the + transferred size with the total file size from the metadata. + 7. 
+
+    Shutdown:
+        1. process.py calls service.Pool.shutdown(), which arranges for the
+           service pool threads to exit and be joined, guaranteeing no new
+           requests can arrive, before calling Service.on_shutdown() for each
+           registered service.
+        2. FileService.on_shutdown() walks every in-progress transfer and calls
+           Sender.close(), causing Receiver loops in the requestees to exit
+           early. The size check fails and any partially downloaded file is
+           discarded.
+        3. Control exits _get_file() in every target, and graceful shutdown can
+           proceed normally, without the associated thread needing to be
+           forcefully killed.
+    """
+    unregistered_msg = 'Path is not registered with FileService.'
+    context_mismatch_msg = 'sender= kwarg context must match requestee context'
+
+    #: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which
+    #: is 5x what SSH can handle on a 2011 era 2.4GHz Core i5.
+    window_size_bytes = 1048576
+
+    def __init__(self, router):
+        super(FileService, self).__init__(router)
+        #: Mapping of registered path -> file metadata.
+        self._metadata_by_path = {}
+        #: Mapping of Stream->FileStreamState.
+        self._state_by_stream = {}
+
+    def _name_or_none(self, func, n, attr):
+        try:
+            return getattr(func(n), attr)
+        except KeyError:
+            return None
+
+    @expose(policy=AllowParents())
+    @arg_spec({
+        'path': basestring,
+    })
+    def register(self, path):
+        """
+        Authorize a path for access by children. Repeated calls with the same
+        path are harmless.
+
+        :param str path:
+            File path.
+        """
+        if path in self._metadata_by_path:
+            return
+
+        st = os.stat(path)
+        if not stat.S_ISREG(st.st_mode):
+            raise IOError('%r is not a regular file.' % (path,))
+
+        LOG.debug('%r: registering %r', self, path)
+        self._metadata_by_path[path] = {
+            'size': st.st_size,
+            'mode': st.st_mode,
+            'owner': self._name_or_none(pwd.getpwuid, st.st_uid, 'pw_name'),
+            'group': self._name_or_none(grp.getgrgid, st.st_gid, 'gr_name'),
+            'mtime': st.st_mtime,
+            'atime': st.st_atime,
+        }
+
+    def on_shutdown(self):
+        """
+        Respond to shutdown by sending close() to every target, allowing their
+        receive loop to exit and clean up gracefully.
+        """
+        LOG.debug('%r.on_shutdown()', self)
+        for stream, state in self._state_by_stream.items():
+            state.lock.acquire()
+            try:
+                for sender, fp in reversed(state.jobs):
+                    sender.close()
+                    fp.close()
+                    state.jobs.pop()
+            finally:
+                state.lock.release()
+
+    # The IO loop pumps 128KiB chunks. An ideal message is a multiple of this;
+    # odd-sized messages waste one tiny write() per message on the trailer.
+    # Therefore subtract 10 bytes pickle overhead + 24 bytes header.
+    IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
+        len(
+            mitogen.core.Message.pickled(
+                mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE)
+            ).data
+        ) - mitogen.core.CHUNK_SIZE
+    ))
+
+    def _schedule_pending_unlocked(self, state):
+        """
+        Consider the pending transfers for a stream, pumping new chunks while
+        the unacknowledged byte count is below :attr:`window_size_bytes`. Must
+        be called with the FileStreamState lock held.
+
+        :param FileStreamState state:
+            Stream to schedule chunks for.
+        """
+        while state.jobs and state.unacked < self.window_size_bytes:
+            sender, fp = state.jobs[0]
+            s = fp.read(self.IO_SIZE)
+            if s:
+                state.unacked += len(s)
+                sender.send(mitogen.core.Blob(s))
+            else:
+                # File is done. Cause the target's receive loop to exit by
+                # closing the sender, close the file, and remove the job entry.
+                sender.close()
+                fp.close()
+                state.jobs.pop(0)
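The requestee's half of this protocol is small. A hedged sketch of a transfer target, which deliberately mirrors the get() classmethod appearing later in this class:

    import mitogen.core

    def fetch_file(service_context, path, out_fp):
        # Create a Receiver, ask FileService to pump chunks at it, and
        # acknowledge every chunk so the sender's window keeps refilling.
        recv = mitogen.core.Receiver(router=service_context.router)
        metadata = service_context.call_service(
            service_name='mitogen.service.FileService',
            method_name='fetch',
            path=path,
            sender=recv.to_sender(),
        )
        for chunk in recv:
            data = chunk.unpickle()
            out_fp.write(data)
            service_context.call_service_async(
                service_name='mitogen.service.FileService',
                method_name='acknowledge',
                size=len(data),
            ).close()
        # A short read means the sender was closed early (e.g. shutdown).
        return out_fp.tell() == metadata['size']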
+
+    @expose(policy=AllowAny())
+    @no_reply()
+    @arg_spec({
+        'path': basestring,
+        'sender': mitogen.core.Sender,
+    })
+    def fetch(self, path, sender, msg):
+        """
+        Start a transfer for a registered path.
+
+        :param str path:
+            File path.
+        :param mitogen.core.Sender sender:
+            Sender to receive file data.
+        :returns:
+            Dict containing the file metadata:
+
+            * ``size``: File size in bytes.
+            * ``mode``: Integer file mode.
+            * ``owner``: Owner account name on host machine.
+            * ``group``: Owner group name on host machine.
+            * ``mtime``: Floating point modification time.
+            * ``atime``: Floating point access time.
+        :raises Error:
+            Unregistered path, or Sender did not match requestee context.
+        """
+        if path not in self._metadata_by_path:
+            raise Error(self.unregistered_msg)
+        if msg.src_id != sender.context.context_id:
+            raise Error(self.context_mismatch_msg)
+
+        LOG.debug('Serving %r', path)
+        fp = open(path, 'rb', self.IO_SIZE)
+        # Response must arrive first so requestee can begin receive loop,
+        # otherwise first ack won't arrive until all pending chunks were
+        # delivered. In that case max BDP would always be 128KiB, aka. max
+        # ~10Mbit/sec over a 100ms link.
+        msg.reply(self._metadata_by_path[path])
+
+        stream = self.router.stream_by_id(sender.context.context_id)
+        state = self._state_by_stream.setdefault(stream, FileStreamState())
+        state.lock.acquire()
+        try:
+            state.jobs.append((sender, fp))
+            self._schedule_pending_unlocked(state)
+        finally:
+            state.lock.release()
+
+    @expose(policy=AllowAny())
+    @no_reply()
+    @arg_spec({
+        'size': int,
+    })
+    def acknowledge(self, size, msg):
+        """
+        Acknowledge bytes received by a transfer target, scheduling new chunks
+        to keep the window full. This should be called for every chunk received
+        by the target.
+        """
+        stream = self.router.stream_by_id(msg.src_id)
+        state = self._state_by_stream[stream]
+        state.lock.acquire()
+        try:
+            if state.unacked < size:
+                LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d',
+                          self, msg.src_id, state.unacked, size)
+            state.unacked -= min(state.unacked, size)
+            self._schedule_pending_unlocked(state)
+        finally:
+            state.lock.release()
+
+    @classmethod
+    def get(cls, context, path, out_fp):
+        """
+        Streamily download a file from the connection multiplexer process in
+        the controller.
+
+        :param mitogen.core.Context context:
+            Reference to the context hosting the FileService that will be used
+            to fetch the file.
+        :param bytes path:
+            FileService registered name of the input file.
+        :param file out_fp:
+            Open file object on the local disk to which chunks are written.
+        :returns:
+            Tuple of (ok, metadata): `ok` is :data:`True` on success, or
+            :data:`False` if the transfer was interrupted and the output
+            should be discarded.
+ """ + LOG.debug('get_file(): fetching %r from %r', path, context) + t0 = time.time() + recv = mitogen.core.Receiver(router=context.router) + metadata = context.call_service( + service_name=cls.name(), + method_name='fetch', + path=path, + sender=recv.to_sender(), + ) + + for chunk in recv: + s = chunk.unpickle() + LOG.debug('get_file(%r): received %d bytes', path, len(s)) + context.call_service_async( + service_name=cls.name(), + method_name='acknowledge', + size=len(s), + ).close() + out_fp.write(s) + + ok = out_fp.tell() == metadata['size'] + if not ok: + LOG.error('get_file(%r): receiver was closed early, controller ' + 'is likely shutting down.', path) + + LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', + metadata['size'], path, context, 1000 * (time.time() - t0)) + return ok, metadata diff --git a/preamble_size.py b/preamble_size.py index 6e3c7924..bca55ab1 100644 --- a/preamble_size.py +++ b/preamble_size.py @@ -8,7 +8,9 @@ import zlib import mitogen.fakessh import mitogen.master +import mitogen.minify import mitogen.parent +import mitogen.select import mitogen.service import mitogen.ssh import mitogen.sudo @@ -34,16 +36,17 @@ print( ) for mod in ( - mitogen.master, mitogen.parent, - mitogen.service, mitogen.ssh, mitogen.sudo, + mitogen.select, + mitogen.service, mitogen.fakessh, + mitogen.master, ): original = inspect.getsource(mod) original_size = len(original) - minimized = mitogen.parent.minimize_source(original) + minimized = mitogen.minify.minimize_source(original) minimized_size = len(minimized) compressed = zlib.compress(minimized, 9) compressed_size = len(compressed) diff --git a/tests/ansible/gcloud/ansible.cfg b/tests/ansible/gcloud/ansible.cfg index d1fcd982..75be745c 100644 --- a/tests/ansible/gcloud/ansible.cfg +++ b/tests/ansible/gcloud/ansible.cfg @@ -1,3 +1,6 @@ [defaults] +strategy_plugins = ../../../ansible_mitogen/plugins/strategy +strategy = mitogen inventory = hosts retry_files_enabled = False +host_key_checking = False diff --git a/tests/ansible/gcloud/controller.yml b/tests/ansible/gcloud/controller.yml index 4c768510..48f233d9 100644 --- a/tests/ansible/gcloud/controller.yml +++ b/tests/ansible/gcloud/controller.yml @@ -1,9 +1,6 @@ - hosts: controller tasks: - - shell: "rsync -a ~/.ssh {{inventory_hostname}}:" - connection: local - - lineinfile: line: "net.ipv4.ip_forward=1" path: /etc/sysctl.conf @@ -30,6 +27,10 @@ - libsasl2-dev - build-essential - git + - rsync + + - shell: "rsync -a ~/.ssh {{inventory_hostname}}:" + connection: local - git: dest: ~/mitogen @@ -39,7 +40,7 @@ - git: dest: ~/ansible repo: https://github.com/dw/ansible.git - version: lazy-vars + version: dmw - pip: virtualenv: ~/venv diff --git a/tests/ansible/integration/async/all.yml b/tests/ansible/integration/async/all.yml index b295b526..17969ead 100644 --- a/tests/ansible/integration/async/all.yml +++ b/tests/ansible/integration/async/all.yml @@ -1,5 +1,8 @@ - import_playbook: result_binary_producing_json.yml - import_playbook: result_binary_producing_junk.yml - import_playbook: result_shell_echo_hi.yml +- import_playbook: runner_new_process.yml - import_playbook: runner_one_job.yml +- import_playbook: runner_timeout_then_polling.yml +- import_playbook: runner_with_polling_and_timeout.yml - import_playbook: runner_two_simultaneous_jobs.yml diff --git a/tests/ansible/integration/async/result_binary_producing_json.yml b/tests/ansible/integration/async/result_binary_producing_json.yml index 8b6d59b9..61d63a08 100644 --- 
a/tests/ansible/integration/async/result_binary_producing_json.yml
+++ b/tests/ansible/integration/async/result_binary_producing_json.yml
@@ -10,7 +10,21 @@
         poll: 0
         register: job

-    - shell: sleep 1
+    - assert:
+        that: |
+          job.ansible_job_id and
+          (job.changed == True) and
+          (job.started == 1) and
+          (job.finished == 0)
+
+    - name: busy-poll up to 100000 times
+      async_status:
+        jid: "{{job.ansible_job_id}}"
+      register: result
+      until: result.finished
+      retries: 100000
+      delay: 0

     - slurp:
         src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}"
diff --git a/tests/ansible/integration/async/runner_job_timeout.yml b/tests/ansible/integration/async/runner_job_timeout.yml
deleted file mode 100644
index e279c5cb..00000000
--- a/tests/ansible/integration/async/runner_job_timeout.yml
+++ /dev/null
@@ -1,52 +0,0 @@
-# Verify 'async: ' functions as desired.
-
-- name: integration/async/runner_job_timeout.yml
-  hosts: test-targets
-  any_errors_fatal: true
-  tasks:
-
-  # Verify async-with-polling-and-timeout behaviour.
-
-  - name: sleep for 7 seconds, but timeout after 1 second.
-    ignore_errors: true
-    shell: sleep 7
-    async: 1
-    poll: 1
-    register: job1
-
-  - assert:
-      that:
-      - job1.changed == False
-      - job1.failed == True
-      - job1.msg == "async task did not complete within the requested time"
-      - job1.keys()|sort == ['changed', 'failed', 'msg']
-
-  # Verify async-with-timeout-then-poll behaviour.
-  # This is broken in upstream Ansible, so disable the tests there.
-  #
-  # TODO: the tests below are totally broken, not clear what Ansible is
-  # supposed to do here, so can't emulate it in Mitogen.
-
-  - name: sleep for 7 seconds, but timeout after 1 second.
-    ignore_errors: true
-    shell: sleep 7
-    async: 1
-    poll: 0
-    register: job2
-    when: false # is_mitogen
-
-  - name: poll up to 10 times.
-    async_status:
-      jid: "{{job2.ansible_job_id}}"
-    register: result2
-    until: result2.finished
-    retries: 10
-    delay: 1
-    when: false # is_mitogen
-
-  - assert:
-      that:
-      - result1.rc == 0
-      - result2.rc == 0
-      - result2.stdout == 'im_alive'
-    when: false # is_mitogen
diff --git a/tests/ansible/integration/async/runner_new_process.yml b/tests/ansible/integration/async/runner_new_process.yml
new file mode 100644
index 00000000..7b0bf628
--- /dev/null
+++ b/tests/ansible/integration/async/runner_new_process.yml
@@ -0,0 +1,54 @@
+# Verify async jobs run in a new process.
+
+- name: integration/async/runner_new_process.yml
+  hosts: test-targets
+  any_errors_fatal: true
+  tasks:
+
+    - name: get process ID.
+      custom_python_detect_environment:
+      register: sync_proc1
+
+    - name: get process ID again.
+      custom_python_detect_environment:
+      register: sync_proc2
+
+    - assert:
+        that:
+        - sync_proc1.pid == sync_proc2.pid
+      when: is_mitogen
+
+    - name: get async process ID.
+      custom_python_detect_environment:
+      register: async_proc1
+      async: 1000
+      poll: 0
+
+    - name: busy-poll up to 100000 times
+      async_status:
+        jid: "{{async_proc1.ansible_job_id}}"
+      register: async_result1
+      until: async_result1.finished
+      retries: 100000
+      delay: 0
+
+    - name: get async process ID again.
+      custom_python_detect_environment:
+      register: async_proc2
+      async: 1000
+      poll: 0
+
+    - name: busy-poll up to 100000 times
+      async_status:
+        jid: "{{async_proc2.ansible_job_id}}"
+      register: async_result2
+      until: async_result2.finished
+      retries: 100000
+      delay: 0
+
+    - assert:
+        that:
+        - sync_proc1.pid == sync_proc2.pid
+        - async_result1.pid != sync_proc1.pid
+        - async_result1.pid != async_result2.pid
+      when: is_mitogen
diff --git a/tests/ansible/integration/async/runner_one_job.yml b/tests/ansible/integration/async/runner_one_job.yml
index 989a7cda..04ffc5ea 100644
--- a/tests/ansible/integration/async/runner_one_job.yml
+++ b/tests/ansible/integration/async/runner_one_job.yml
@@ -6,56 +6,6 @@
   any_errors_fatal: true
   tasks:

-  # Verify async jobs run in a new process.
-
-  - name: get process ID.
-    custom_python_detect_environment:
-    register: sync_proc1
-
-  - name: get process ID again.
-    custom_python_detect_environment:
-    register: sync_proc2
-
-  - assert:
-      that:
-      - sync_proc1.pid == sync_proc2.pid
-    when: is_mitogen
-
-  - name: get async process ID.
-    custom_python_detect_environment:
-    register: async_proc1
-    async: 1000
-    poll: 0
-
-  - name: busy-poll up to 100000 times
-    async_status:
-      jid: "{{async_proc1.ansible_job_id}}"
-    register: async_result1
-    until: async_result1.finished
-    retries: 100000
-    delay: 0
-
-  - name: get async process ID again.
-    custom_python_detect_environment:
-    register: async_proc2
-    async: 1000
-    poll: 0
-
-  - name: busy-poll up to 100000 times
-    async_status:
-      jid: "{{async_proc2.ansible_job_id}}"
-    register: async_result2
-    until: async_result2.finished
-    retries: 100000
-    delay: 0
-
-  - assert:
-      that:
-      - sync_proc1.pid == sync_proc2.pid
-      - async_result1.pid != sync_proc1.pid
-      - async_result1.pid != async_result2.pid
-    when: is_mitogen
-
   # Verify output of a single async job.

   - name: start 2 second op
diff --git a/tests/ansible/integration/async/runner_timeout_then_polling.yml b/tests/ansible/integration/async/runner_timeout_then_polling.yml
new file mode 100644
index 00000000..5490e711
--- /dev/null
+++ b/tests/ansible/integration/async/runner_timeout_then_polling.yml
@@ -0,0 +1,34 @@
+# Verify 'async: ' functions as desired.
+
+- name: integration/async/runner_timeout_then_polling.yml
+  hosts: test-targets
+  any_errors_fatal: true
+  tasks:
+
+  # Verify async-with-timeout-then-poll behaviour.
+  # This is semi-broken in upstream Ansible: it does not bother to update the
+  # job file on failure. So only test on Mitogen.
+
+  - name: sleep for 10 seconds, but timeout after 1 second.
+    shell: sleep 10
+    async: 1
+    poll: 0
+    register: job
+    when: is_mitogen
+
+  - name: busy-poll up to 500 times
+    async_status:
+      jid: "{{job.ansible_job_id}}"
+    register: result
+    until: result.finished
+    retries: 500
+    delay: 0
+    when: is_mitogen
+    ignore_errors: true
+
+  - assert:
+      that:
+      - result.failed == 1
+      - result.finished == 1
+      - result.msg == "Job reached maximum time limit of 1 seconds."
+    when: is_mitogen
diff --git a/tests/ansible/integration/async/runner_with_polling_and_timeout.yml b/tests/ansible/integration/async/runner_with_polling_and_timeout.yml
new file mode 100644
index 00000000..6d87fe6c
--- /dev/null
+++ b/tests/ansible/integration/async/runner_with_polling_and_timeout.yml
@@ -0,0 +1,24 @@
+# Verify 'async: ' functions as desired.
+
+- name: integration/async/runner_with_polling_and_timeout.yml
+  hosts: test-targets
+  any_errors_fatal: true
+  tasks:
+
+  # Verify async-with-polling-and-timeout behaviour.
+ + - name: sleep for 7 seconds, but timeout after 1 second. + ignore_errors: true + shell: sleep 7 + async: 1 + poll: 1 + register: job1 + + - assert: + that: + - job1.changed == False + - job1.failed == True + - | + job1.msg == "async task did not complete within the requested time" or + job1.msg == "Job reached maximum time limit of 1 seconds." + diff --git a/tests/minimize_source_test.py b/tests/minimize_source_test.py index 857fc339..b98cdebd 100644 --- a/tests/minimize_source_test.py +++ b/tests/minimize_source_test.py @@ -1,7 +1,6 @@ import unittest2 -from mitogen.parent import minimize_source - +import mitogen.minify import testlib @@ -14,40 +13,42 @@ def read_sample(fname): class MinimizeSource(unittest2.TestCase): + func = staticmethod(mitogen.minify.minimize_source) + def test_class(self): original = read_sample('class.py') expected = read_sample('class_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_comment(self): original = read_sample('comment.py') expected = read_sample('comment_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_def(self): original = read_sample('def.py') expected = read_sample('def_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_hashbang(self): original = read_sample('hashbang.py') expected = read_sample('hashbang_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_mod(self): original = read_sample('mod.py') expected = read_sample('mod_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_pass(self): original = read_sample('pass.py') expected = read_sample('pass_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) def test_obstacle_course(self): original = read_sample('obstacle_course.py') expected = read_sample('obstacle_course_min.py') - self.assertEqual(expected, minimize_source(original)) + self.assertEqual(expected, self.func(original)) if __name__ == '__main__': diff --git a/tests/module_finder_test.py b/tests/module_finder_test.py index c4b65e11..1c77bdee 100644 --- a/tests/module_finder_test.py +++ b/tests/module_finder_test.py @@ -200,6 +200,7 @@ class FindRelatedTest(testlib.TestCase): 'mitogen.compat.functools', 'mitogen.core', 'mitogen.master', + 'mitogen.minify', 'mitogen.parent', ]) diff --git a/tests/responder_test.py b/tests/responder_test.py index 837beb3e..3f6f66a9 100644 --- a/tests/responder_test.py +++ b/tests/responder_test.py @@ -52,9 +52,9 @@ class BrokenModulesTest(unittest2.TestCase): responder = mitogen.master.ModuleResponder(router) responder._on_get_module(msg) - self.assertEquals(1, len(router.route.mock_calls)) + self.assertEquals(1, len(router._async_route.mock_calls)) - call = router.route.mock_calls[0] + call = router._async_route.mock_calls[0] msg, = call[1] self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle) self.assertEquals(('non_existent_module', None, None, None, ()), @@ -81,9 +81,9 @@ class BrokenModulesTest(unittest2.TestCase): responder = mitogen.master.ModuleResponder(router) responder._on_get_module(msg) - self.assertEquals(1, len(router.route.mock_calls)) + self.assertEquals(1, len(router._async_route.mock_calls)) - call = router.route.mock_calls[0] + call = 
router._async_route.mock_calls[0] msg, = call[1] self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle) self.assertIsInstance(msg.unpickle(), tuple)
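Finally, the relocated minifier is easy to exercise by hand. A minimal sketch, complementing the samples used by minimize_source_test.py above (the sample source is illustrative, and exact whitespace in the output may vary by Python version):

    import mitogen.minify

    SOURCE = (
        '#!/usr/bin/env python\n'
        'def add(a, b):\n'
        '    """Docstring replaced by blank lines in the output."""\n'
        '    # Comment dropped: it falls after line 2.\n'
        '    return a + b\n'
    )

    # Comments on lines 1-2 (hashbang/encoding) survive; later comments and
    # docstrings are stripped, and indentation is normalized.
    print(mitogen.minify.minimize_source(SOURCE))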