Merge pull request #262 from dw/dmw

Fully functional async tasks, minify refactor, import trimming, module/script preloading/deduplication, service framework enhancements, fix logging deadlock
dw 8 years ago committed by GitHub
commit 3a2f422725

@ -31,7 +31,6 @@ import logging
import os
import shlex
import stat
import sys
import time
import jinja2.runtime
@ -58,6 +57,7 @@ def _connect_local(spec):
}
}
def wrap_or_none(klass, value):
if value is not None:
return klass(value)
@ -298,13 +298,17 @@ class Connection(ansible.plugins.connection.ConnectionBase):
router = None
#: mitogen.master.Context representing the parent Context, which is
#: presently always the master process.
#: presently always the connection multiplexer process.
parent = None
#: mitogen.master.Context connected to the target user account on the
#: target machine (i.e. via sudo).
context = None
#: mitogen.master.Context connected to the fork parent process in the
#: target user account.
fork_context = None
#: Only sudo and su are supported for now.
become_methods = ['sudo', 'su']
@ -336,7 +340,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
host_vars = None
#: Set after connection to the target context's home directory.
_homedir = None
home_dir = None
def __init__(self, play_context, new_stdin, **kwargs):
assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
@ -376,7 +380,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
@property
def homedir(self):
self._connect()
return self._homedir
return self.home_dir
@property
def connected(self):
@ -389,7 +393,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
raise ansible.errors.AnsibleConnectionFailure(
self.unknown_via_msg % (
self.mitogen_via,
config['inventory_name'],
inventory_name,
)
)
@ -470,15 +474,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
raise ansible.errors.AnsibleConnectionFailure(dct['msg'])
self.context = dct['context']
self._homedir = dct['home_dir']
def get_context_name(self):
"""
Return the name of the target context we issue commands against, i.e. a
unique string useful as a key for related data, such as a list of
modules uploaded to the target.
"""
return self.context.name
self.fork_context = dct['init_child_result']['fork_context']
self.home_dir = dct['init_child_result']['home_dir']
def close(self, new_task=False):
"""
@ -526,6 +523,17 @@ class Connection(ansible.plugins.connection.ConnectionBase):
LOG.debug('Call took %d ms: %s%r', 1000 * (time.time() - t0),
func.func_name, args)
def create_fork_child(self):
"""
Fork a new child off the target context. The actual fork occurs from
the 'virginal fork parent', which does not import any Ansible modules prior to
fork, to avoid conflicts resulting from custom module_utils paths.
:returns:
mitogen.core.Context of the new child.
"""
return self.call(ansible_mitogen.target.create_fork_child)
def exec_command(self, cmd, in_data='', sudoable=True, mitogen_chdir=None):
"""
Implement exec_command() by calling the corresponding
@ -613,7 +621,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
utimes=(st.st_atime, st.st_mtime))
self.parent.call_service(
service_name='ansible_mitogen.services.FileService',
service_name='mitogen.service.FileService',
method_name='register',
path=mitogen.utils.cast(in_path)
)

@ -32,7 +32,6 @@ import logging
import os
import pwd
import shutil
import tempfile
import traceback
from ansible.module_utils._text import to_bytes
@ -43,11 +42,6 @@ import ansible.constants
import ansible.plugins
import ansible.plugins.action
try:
from ansible.plugins.loader import module_loader
except ImportError: # Ansible<2.4
from ansible.plugins import module_loader
import mitogen.core
import mitogen.select
import mitogen.utils
@ -294,6 +288,15 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
# ~root/.ansible -> /root/.ansible
return self.call(os.path.expanduser, mitogen.utils.cast(path))
def get_task_timeout_secs(self):
"""
Return the task "async:" value, portable across 2.4-2.5.
"""
try:
return self._task.async_val
except AttributeError:
return getattr(self._task, 'async')
def _execute_module(self, module_name=None, module_args=None, tmp=None,
task_vars=None, persist_files=False,
delete_remote_tmp=True, wrap_async=False):
@ -313,6 +316,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
env = {}
self._compute_environment_string(env)
self._connection._connect()
return ansible_mitogen.planner.invoke(
ansible_mitogen.planner.Invocation(
action=self,
@ -323,6 +327,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
templar=self._templar,
env=mitogen.utils.cast(env),
wrap_async=wrap_async,
timeout_secs=self.get_task_timeout_secs(),
)
)

@ -72,35 +72,42 @@ def is_pkg(module):
def find(name, path=(), parent=None):
"""
Return a Module instance describing the first matching module found on the
given search path.
search path.
:param str name:
Module name.
:param str path:
Search path.
:param list path:
List of directory names to search for the module.
:param Module parent:
If given, make the found module a child of this module.
Optional module parent.
"""
assert isinstance(path, tuple)
head, _, tail = name.partition('.')
try:
tup = imp.find_module(head, list(path))
except ImportError:
return parent
fp, path, (suffix, mode, kind) = tup
fp, modpath, (suffix, mode, kind) = tup
if parent and modpath == parent.path:
# 'from timeout import timeout', where 'timeout' is a function but also
# the name of the module being imported.
return None
if fp:
fp.close()
if kind == imp.PKG_DIRECTORY:
path = os.path.join(path, '__init__.py')
module = Module(head, path, kind, parent)
modpath = os.path.join(modpath, '__init__.py')
module = Module(head, modpath, kind, parent)
if tail:
return find_relative(module, tail, path)
return module
def find_relative(parent, name, path=()):
path = [os.path.dirname(parent.path)] + list(path)
if parent.kind == imp.PKG_DIRECTORY:
path = (os.path.dirname(parent.path),) + path
return find(name, path, parent=parent)

@ -35,8 +35,10 @@ files/modules known missing.
"""
from __future__ import absolute_import
import json
import logging
import os
import random
from ansible.executor import module_common
import ansible.errors
@ -50,9 +52,7 @@ except ImportError: # Ansible <2.4
from ansible.plugins import module_utils_loader
import mitogen
import mitogen.service
import ansible_mitogen.target
import ansible_mitogen.services
LOG = logging.getLogger(__name__)
@ -94,7 +94,7 @@ class Invocation(object):
target.run_module() or helpers.run_module_async() in the target context.
"""
def __init__(self, action, connection, module_name, module_args,
task_vars, templar, env, wrap_async):
task_vars, templar, env, wrap_async, timeout_secs):
#: ActionBase instance invoking the module. Required to access some
#: output postprocessing methods that don't belong in ActionBase at
#: all.
@ -114,7 +114,8 @@ class Invocation(object):
self.env = env
#: Boolean, if :py:data:`True`, launch the module asynchronously.
self.wrap_async = wrap_async
#: Integer, if >0, limit the time an asynchronous job may run for.
self.timeout_secs = timeout_secs
#: Initially ``None``, but set by :func:`invoke`. The path on the
#: master to the module's implementation file.
self.module_path = None
@ -132,20 +133,36 @@ class Planner(object):
file, indicates whether or not it understands how to run the module, and
exports a method to run the module.
"""
def detect(self, invocation):
def __init__(self, invocation):
self._inv = invocation
def detect(self):
"""
Return true if the supplied `invocation` matches the module type
implemented by this planner.
"""
raise NotImplementedError()
def get_should_fork(self, invocation):
def should_fork(self):
"""
Asynchronous tasks must always be forked.
"""
return invocation.wrap_async
return self._inv.wrap_async
def get_push_files(self):
"""
Return a list of files that should be propagated to the target context
using PushFileService. The default implementation pushes nothing.
"""
return []
def plan(self, invocation, **kwargs):
def get_module_deps(self):
"""
Return a list of the Python module names imported by the module.
"""
return []
def get_kwargs(self, **kwargs):
"""
If :meth:`detect` returned :data:`True`, plan for the module's
execution, including granting access to or delivering any files to it
@ -161,9 +178,7 @@ class Planner(object):
}
"""
kwargs.setdefault('emulate_tty', True)
kwargs.setdefault('service_context', invocation.connection.parent)
kwargs.setdefault('should_fork', self.get_should_fork(invocation))
kwargs.setdefault('wrap_async', invocation.wrap_async)
kwargs.setdefault('service_context', self._inv.connection.parent)
return kwargs
def __repr__(self):
@ -177,26 +192,19 @@ class BinaryPlanner(Planner):
"""
runner_name = 'BinaryRunner'
def detect(self, invocation):
return module_common._is_binary(invocation.module_source)
def detect(self):
return module_common._is_binary(self._inv.module_source)
def _grant_file_service_access(self, invocation):
invocation.connection._connect()
invocation.connection.parent.call_service(
service_name='ansible_mitogen.services.FileService',
method_name='register',
path=invocation.module_path,
)
def get_push_files(self):
return [self._inv.module_path]
def plan(self, invocation, **kwargs):
self._grant_file_service_access(invocation)
return super(BinaryPlanner, self).plan(
invocation=invocation,
def get_kwargs(self, **kwargs):
return super(BinaryPlanner, self).get_kwargs(
runner_name=self.runner_name,
module=invocation.module_name,
path=invocation.module_path,
args=invocation.module_args,
env=invocation.env,
module=self._inv.module_name,
path=self._inv.module_path,
args=self._inv.module_args,
env=self._inv.env,
**kwargs
)
@ -206,24 +214,25 @@ class ScriptPlanner(BinaryPlanner):
Common functionality for script module planners -- handle interpreter
detection and rewrite.
"""
def _get_interpreter(self, invocation):
interpreter, arg = parse_script_interpreter(invocation.module_source)
def _get_interpreter(self):
interpreter, arg = parse_script_interpreter(
self._inv.module_source
)
if interpreter is None:
raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % (
invocation.module_name,
self._inv.module_name,
))
key = u'ansible_%s_interpreter' % os.path.basename(interpreter).strip()
try:
template = invocation.task_vars[key].strip()
return invocation.templar.template(template), arg
template = self._inv.task_vars[key].strip()
return self._inv.templar.template(template), arg
except KeyError:
return interpreter, arg
def plan(self, invocation, **kwargs):
interpreter, arg = self._get_interpreter(invocation)
return super(ScriptPlanner, self).plan(
invocation=invocation,
def get_kwargs(self, **kwargs):
interpreter, arg = self._get_interpreter()
return super(ScriptPlanner, self).get_kwargs(
interpreter_arg=arg,
interpreter=interpreter,
**kwargs
@ -237,8 +246,8 @@ class JsonArgsPlanner(ScriptPlanner):
"""
runner_name = 'JsonArgsRunner'
def detect(self, invocation):
return module_common.REPLACER_JSONARGS in invocation.module_source
def detect(self):
return module_common.REPLACER_JSONARGS in self._inv.module_source
class WantJsonPlanner(ScriptPlanner):
@ -255,8 +264,8 @@ class WantJsonPlanner(ScriptPlanner):
"""
runner_name = 'WantJsonRunner'
def detect(self, invocation):
return 'WANT_JSON' in invocation.module_source
def detect(self):
return 'WANT_JSON' in self._inv.module_source
class NewStylePlanner(ScriptPlanner):
@ -267,53 +276,59 @@ class NewStylePlanner(ScriptPlanner):
"""
runner_name = 'NewStyleRunner'
def _get_interpreter(self, invocation):
def detect(self):
return 'from ansible.module_utils.' in self._inv.module_source
def _get_interpreter(self):
return None, None
def _grant_file_service_access(self, invocation):
"""
Stub out BinaryPlanner's method since ModuleDepService makes internal
calls to grant file access, avoiding 2 IPCs per task invocation.
"""
def get_push_files(self):
return super(NewStylePlanner, self).get_push_files() + [
path
for fullname, path, is_pkg in self.get_module_map()['custom']
]
def get_module_deps(self):
return self.get_module_map()['builtin']
def get_should_fork(self, invocation):
def should_fork(self):
"""
In addition to asynchronous tasks, new-style modules should be forked
if mitogen_task_isolation=fork.
if the user specifies mitogen_task_isolation=fork, or if the new-style
module has a custom module search path.
"""
return (
super(NewStylePlanner, self).get_should_fork(invocation) or
(invocation.task_vars.get('mitogen_task_isolation') == 'fork')
super(NewStylePlanner, self).should_fork() or
(self._inv.task_vars.get('mitogen_task_isolation') == 'fork') or
(len(self.get_module_map()['custom']) > 0)
)
def detect(self, invocation):
return 'from ansible.module_utils.' in invocation.module_source
def get_search_path(self, invocation):
def get_search_path(self):
return tuple(
path
for path in module_utils_loader._get_paths(subdirs=False)
if os.path.isdir(path)
)
def get_module_utils(self, invocation):
invocation.connection._connect()
return invocation.connection.parent.call_service(
service_name='ansible_mitogen.services.ModuleDepService',
method_name='scan',
_module_map = None
module_name='ansible_module_%s' % (invocation.module_name,),
module_path=invocation.module_path,
search_path=self.get_search_path(invocation),
builtin_path=module_common._MODULE_UTILS_PATH,
)
def get_module_map(self):
if self._module_map is None:
self._module_map = self._inv.connection.parent.call_service(
service_name='ansible_mitogen.services.ModuleDepService',
method_name='scan',
def plan(self, invocation):
module_utils = self.get_module_utils(invocation)
return super(NewStylePlanner, self).plan(
invocation,
module_utils=module_utils,
should_fork=(self.get_should_fork(invocation) or bool(module_utils)),
module_name='ansible_module_%s' % (self._inv.module_name,),
module_path=self._inv.module_path,
search_path=self.get_search_path(),
builtin_path=module_common._MODULE_UTILS_PATH,
context=self._inv.connection.context,
)
return self._module_map
def get_kwargs(self):
return super(NewStylePlanner, self).get_kwargs(
module_map=self.get_module_map(),
)
@ -343,14 +358,14 @@ class ReplacerPlanner(NewStylePlanner):
"""
runner_name = 'ReplacerRunner'
def detect(self, invocation):
return module_common.REPLACER in invocation.module_source
def detect(self):
return module_common.REPLACER in self._inv.module_source
class OldStylePlanner(ScriptPlanner):
runner_name = 'OldStyleRunner'
def detect(self, invocation):
def detect(self):
# Everything else.
return True
@ -372,24 +387,85 @@ def get_module_data(name):
return path, source
def invoke(invocation):
"""
Find a suitable Planner that knows how to run `invocation`.
"""
(invocation.module_path,
invocation.module_source) = get_module_data(invocation.module_name)
def _propagate_deps(invocation, planner, context):
invocation.connection.parent.call_service(
service_name='mitogen.service.PushFileService',
method_name='propagate_paths_and_modules',
context=context,
paths=planner.get_push_files(),
modules=planner.get_module_deps(),
)
def _invoke_async_task(invocation, planner):
job_id = '%016x' % random.randint(0, 2**64)
context = invocation.connection.create_fork_child()
_propagate_deps(invocation, planner, context)
context.call_no_reply(
ansible_mitogen.target.run_module_async,
job_id=job_id,
timeout_secs=invocation.timeout_secs,
kwargs=planner.get_kwargs(),
)
return {
'stdout': json.dumps({
# modules/utilities/logic/async_wrapper.py::_run_module().
'changed': True,
'started': 1,
'finished': 0,
'ansible_job_id': job_id,
})
}
def _invoke_forked_task(invocation, planner):
context = invocation.connection.create_fork_child()
_propagate_deps(invocation, planner, context)
try:
return context.call(
ansible_mitogen.target.run_module,
kwargs=planner.get_kwargs(),
)
finally:
context.shutdown()
def _get_planner(invocation):
for klass in _planners:
planner = klass()
if planner.detect(invocation):
planner = klass(invocation)
if planner.detect():
LOG.debug('%r accepted %r (filename %r)', planner,
invocation.module_name, invocation.module_path)
return invocation.action._postprocess_response(
invocation.connection.call(
ansible_mitogen.target.run_module,
planner.plan(invocation),
)
)
return planner
LOG.debug('%r rejected %r', planner, invocation.module_name)
raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))
def invoke(invocation):
"""
Find a Planner subclass corresponding to `invocation` and use it to invoke
the module.
:param Invocation invocation:
:returns:
Module return dict.
:raises ansible.errors.AnsibleError:
Unrecognized/unsupported module type.
"""
(invocation.module_path,
invocation.module_source) = get_module_data(invocation.module_name)
planner = _get_planner(invocation)
if invocation.wrap_async:
response = _invoke_async_task(invocation, planner)
elif planner.should_fork():
response = _invoke_forked_task(invocation, planner)
else:
_propagate_deps(invocation, planner, invocation.connection.context)
response = invocation.connection.call(
ansible_mitogen.target.run_module,
kwargs=planner.get_kwargs(),
)
return invocation.action._postprocess_response(response)

@ -35,6 +35,7 @@ import sys
import mitogen
import mitogen.core
import mitogen.debug
import mitogen.master
import mitogen.parent
import mitogen.service
@ -135,7 +136,7 @@ class MuxProcess(object):
"""
Construct a Router, Broker, and mitogen.unix listener
"""
self.router = mitogen.master.Router(max_message_size=4096*1048576)
self.router = mitogen.master.Router(max_message_size=4096 * 1048576)
self.router.responder.whitelist_prefix('ansible')
self.router.responder.whitelist_prefix('ansible_mitogen')
mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown)
@ -145,22 +146,21 @@ class MuxProcess(object):
)
if 'MITOGEN_ROUTER_DEBUG' in os.environ:
self.router.enable_debug()
if 'MITOGEN_DUMP_THREAD_STACKS' in os.environ:
mitogen.debug.dump_to_logger()
def _setup_services(self):
"""
Construct a ContextService and a thread to service requests for it
arriving from worker processes.
"""
file_service = ansible_mitogen.services.FileService(router=self.router)
self.pool = mitogen.service.Pool(
router=self.router,
services=[
file_service,
mitogen.service.FileService(router=self.router),
mitogen.service.PushFileService(router=self.router),
ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.ModuleDepService(
router=self.router,
file_service=file_service,
),
ansible_mitogen.services.ModuleDepService(self.router),
],
size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')),
)

@ -42,12 +42,11 @@ import imp
import json
import logging
import os
import shutil
import sys
import tempfile
import types
import mitogen.service
import mitogen.core
import ansible_mitogen.target # TODO: circular import
try:
@ -111,17 +110,27 @@ class Runner(object):
Context to which we should direct FileService calls. For now, always
the connection multiplexer process on the controller.
:param dict args:
Ansible module arguments. A strange mixture of user and internal keys
created by ActionBase._execute_module().
Ansible module arguments. A mixture of user and internal keys created
by :meth:`ansible.plugins.action.ActionBase._execute_module`.
:param dict env:
Additional environment variables to set during the run.
:param mitogen.core.ExternalContext econtext:
When `detach` is :data:`True`, a reference to the ExternalContext the
runner is executing in.
:param bool detach:
When :data:`True`, indicate the runner should detach the context from
its parent after setup has completed successfully.
"""
def __init__(self, module, service_context, args=None, env=None):
def __init__(self, module, service_context, args=None, env=None,
econtext=None, detach=False):
if args is None:
args = {}
self.module = utf8(module)
self.service_context = service_context
self.econtext = econtext
self.detach = detach
self.args = args
self.env = env
@ -177,6 +186,9 @@ class Runner(object):
Module result dictionary.
"""
self.setup()
if self.detach:
self.econtext.detach()
try:
return self._run()
finally:
@ -208,7 +220,7 @@ class ModuleUtilsImporter(object):
def load_module(self, fullname):
path, is_pkg = self._by_fullname[fullname]
source = ansible_mitogen.target.get_file(self._context, path)
source = ansible_mitogen.target.get_small_file(self._context, path)
code = compile(source, path, 'exec')
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__file__ = "master:%s" % (path,)
@ -273,7 +285,7 @@ class ProgramRunner(Runner):
:param str path:
Absolute path to the program file on the master, as it can be retrieved
via :class:`ansible_mitogen.services.FileService`.
via :class:`mitogen.service.FileService`.
:param bool emulate_tty:
If :data:`True`, execute the program with `stdout` and `stderr` merged
into a single pipe, emulating Ansible behaviour when an SSH TTY is in
@ -315,7 +327,7 @@ class ProgramRunner(Runner):
"""
Fetch the module binary from the master if necessary.
"""
return ansible_mitogen.target.get_file(
return ansible_mitogen.target.get_small_file(
context=self.service_context,
path=self.path,
)
@ -443,12 +455,30 @@ class NewStyleRunner(ScriptRunner):
#: path => new-style module bytecode.
_code_by_path = {}
def __init__(self, module_utils, **kwargs):
def __init__(self, module_map, **kwargs):
super(NewStyleRunner, self).__init__(**kwargs)
self.module_utils = module_utils
self.module_map = module_map
def _setup_imports(self):
"""
Ensure the local importer and PushFileService have everything they need for
the Ansible module before setup() completes, and in particular before
detach() is called for an asynchronous task.
The master automatically streams modules towards us concurrently with the
runner invocation; however, there is no public API to synchronize on the
completion of those preloads. Instead, simply reuse the importer's
synchronization mechanism by importing everything the module will need
prior to detaching.
"""
for fullname, _, _ in self.module_map['custom']:
mitogen.core.import_module(fullname)
for fullname in self.module_map['builtin']:
mitogen.core.import_module(fullname)
def setup(self):
super(NewStyleRunner, self).setup()
self._stdio = NewStyleStdio(self.args)
# It is possible that not supplying the script filename will break some
# module, but this has never been reported as a bug. Instead act like an
@ -456,8 +486,9 @@ class NewStyleRunner(ScriptRunner):
self._argv = TemporaryArgv([''])
self._importer = ModuleUtilsImporter(
context=self.service_context,
module_utils=self.module_utils,
module_utils=self.module_map['custom'],
)
self._setup_imports()
if libc__res_init:
libc__res_init()
@ -476,14 +507,12 @@ class NewStyleRunner(ScriptRunner):
pass
def _setup_program(self):
pass
def _get_code(self):
self.source = ansible_mitogen.target.get_file(
self.source = ansible_mitogen.target.get_small_file(
context=self.service_context,
path=self.path,
)
def _get_code(self):
try:
return self._code_by_path[self.path]
except KeyError:

@ -38,18 +38,13 @@ when a child has completed a job.
"""
from __future__ import absolute_import
import grp
import logging
import os
import os.path
import pwd
import stat
import sys
import threading
import zlib
import mitogen
import mitogen.master
import mitogen.service
import ansible_mitogen.module_finder
import ansible_mitogen.target
@ -227,6 +222,20 @@ class ContextService(mitogen.service.Service):
finally:
self._lock.release()
ALWAYS_PRELOAD = (
'ansible.module_utils.basic',
'ansible.module_utils.json_utils',
'ansible.release',
'ansible_mitogen.runner',
'ansible_mitogen.target',
'mitogen.fork',
'mitogen.service',
)
def _send_module_forwards(self, context):
for fullname in self.ALWAYS_PRELOAD:
self.router.responder.forward_module(context, fullname)
def _connect(self, key, spec, via=None):
"""
Actual connect implementation. Arranges for the Mitogen connection to
@ -242,13 +251,18 @@ class ContextService(mitogen.service.Service):
{
'context': mitogen.core.Context or None,
'home_dir': str or None,
'init_child_result': {
'fork_context': mitogen.core.Context,
'home_dir': str or None,
},
'msg': str or None
}
Where either `msg` is an error message and the remaining fields are
:data:`None`, or `msg` is :data:`None` and the remaining fields are
set.
Where `context` is a reference to the newly constructed context,
`init_child_result` is the result of executing
:func:`ansible_mitogen.target.init_child` in that context, `msg` is
an error message and the remaining fields are :data:`None`, or
`msg` is :data:`None` and the remaining fields are set.
"""
try:
method = getattr(self.router, spec['method'])
@ -266,11 +280,8 @@ class ContextService(mitogen.service.Service):
mitogen.core.listen(stream, 'disconnect',
lambda: self._on_stream_disconnect(stream))
home_dir = context.call(os.path.expanduser, '~')
# We don't need to wait for the result of this. Ideally we'd check its
# return value somewhere, but logs will catch a failure anyway.
context.call_async(ansible_mitogen.target.init_child)
self._send_module_forwards(context)
init_child_result = context.call(ansible_mitogen.target.init_child)
if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'):
from mitogen import debug
@ -280,7 +291,7 @@ class ContextService(mitogen.service.Service):
self._refs_by_context[context] = 0
return {
'context': context,
'home_dir': home_dir,
'init_child_result': init_child_result,
'msg': None,
}
@ -331,7 +342,7 @@ class ContextService(mitogen.service.Service):
:returns dict:
* context: mitogen.master.Context or None.
* homedir: Context's home directory or None.
* init_child_result: Result of :func:`init_child`.
* msg: StreamError exception text or None.
* method_name: string failing method name.
"""
@ -346,7 +357,7 @@ class ContextService(mitogen.service.Service):
except mitogen.core.StreamError as e:
return {
'context': None,
'home_dir': None,
'init_child_result': None,
'method_name': spec['method'],
'msg': str(e),
}
@ -354,288 +365,50 @@ class ContextService(mitogen.service.Service):
return result
class StreamState(object):
def __init__(self):
#: List of [(Sender, file object)]
self.jobs = []
self.completing = {}
#: In-flight byte count.
self.unacked = 0
#: Lock.
self.lock = threading.Lock()
class FileService(mitogen.service.Service):
"""
Streaming file server, used to serve small files like Ansible modules and
huge files like ISO images. Paths must be registered by a trusted context
before they will be served to a child.
Transfers are divided among the physical streams that connect external
contexts, ensuring each stream never has excessive data buffered in RAM,
while still maintaining enough to fully utilize available bandwidth. This
is achieved by making an initial bandwidth assumption, enqueueing enough
chunks to fill that assumed pipe, then responding to delivery
acknowledgements from the receiver by scheduling new chunks.
Transfers proceed one-at-a-time per stream. When multiple contexts exist on
a stream (e.g. one is the SSH account, another is a sudo account, and a
third is a proxied SSH connection), each request is satisfied in turn
before subsequent requests start flowing. This ensures when a stream is
contended, priority is given to completing individual transfers rather than
potentially aborting many partial transfers, causing the bandwidth to be
wasted.
Theory of operation:
1. Trusted context (i.e. WorkerProcess) calls register(), making a
file available to any untrusted context.
2. Requestee context creates a mitogen.core.Receiver() to receive
chunks, then calls fetch(path, recv.to_sender()), to set up the
transfer.
3. fetch() replies to the call with the file's metadata, then
schedules an initial burst up to the window size limit (1MiB).
4. Chunks begin to arrive in the requestee, which calls acknowledge()
for each 128KiB received.
5. The acknowledge() call arrives at FileService, which schedules a new
chunk to refill the drained window back to the size limit.
6. When the last chunk has been pumped for a single transfer,
Sender.close() is called causing the receive loop in
target.py::_get_file() to exit, allowing that code to compare the
transferred size with the total file size from the metadata.
7. If the sizes mismatch, _get_file()'s caller is informed which will
discard the result and log/raise an error.
Shutdown:
1. process.py calls service.Pool.shutdown(), which arranges for the
service pool threads to exit and be joined, guaranteeing no new
requests can arrive, before calling Service.on_shutdown() for each
registered service.
2. FileService.on_shutdown() walks every in-progress transfer and calls
Sender.close(), causing Receiver loops in the requestees to exit
early. The size check fails and any partially downloaded file is
discarded.
3. Control exits _get_file() in every target, and graceful shutdown can
proceed normally, without the associated thread needing to be
forcefully killed.
"""
unregistered_msg = 'Path is not registered with FileService.'
context_mismatch_msg = 'sender= kwarg context must match requestee context'
#: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which
#: is 5x what SSH can handle on a 2011 era 2.4Ghz Core i5.
window_size_bytes = 1048576
def __init__(self, router):
super(FileService, self).__init__(router)
#: Mapping of registered path -> file size.
self._metadata_by_path = {}
#: Mapping of Stream->StreamState.
self._state_by_stream = {}
def _name_or_none(self, func, n, attr):
try:
return getattr(func(n), attr)
except KeyError:
return None
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'paths': list
})
def register_many(self, paths):
"""
Batch version of register().
"""
for path in paths:
self.register(path)
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'path': basestring
})
def register(self, path):
"""
Authorize a path for access by children. Repeated calls with the same
path are harmless.
:param str path:
File path.
"""
if path in self._metadata_by_path:
return
st = os.stat(path)
if not stat.S_ISREG(st.st_mode):
raise IOError('%r is not a regular file.' % (path,))
LOG.debug('%r: registering %r', self, path)
self._metadata_by_path[path] = {
'size': st.st_size,
'mode': st.st_mode,
'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'),
'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'),
'mtime': st.st_mtime,
'atime': st.st_atime,
}
def on_shutdown(self):
"""
Respond to shutdown by sending close() to every target, allowing their
receive loop to exit and clean up gracefully.
"""
LOG.debug('%r.on_shutdown()', self)
for stream, state in self._state_by_stream.items():
state.lock.acquire()
try:
for sender, fp in reversed(state.jobs):
sender.close()
fp.close()
state.jobs.pop()
finally:
state.lock.release()
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
def _schedule_pending_unlocked(self, state):
"""
Consider the pending transfers for a stream, pumping new chunks while
the unacknowledged byte count is below :attr:`window_size_bytes`. Must
be called with the StreamState lock held.
:param StreamState state:
Stream to schedule chunks for.
"""
while state.jobs and state.unacked < self.window_size_bytes:
sender, fp = state.jobs[0]
s = fp.read(self.IO_SIZE)
if s:
state.unacked += len(s)
sender.send(mitogen.core.Blob(s))
else:
# File is done. Cause the target's receive loop to exit by
# closing the sender, close the file, and remove the job entry.
sender.close()
fp.close()
state.jobs.pop(0)
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.no_reply()
@mitogen.service.arg_spec({
'path': basestring,
'sender': mitogen.core.Sender,
})
def fetch(self, path, sender, msg):
"""
Start a transfer for a registered path.
:param str path:
File path.
:param mitogen.core.Sender sender:
Sender to receive file data.
:returns:
Dict containing the file metadata:
* ``size``: File size in bytes.
* ``mode``: Integer file mode.
* ``owner``: Owner account name on host machine.
* ``group``: Owner group name on host machine.
* ``mtime``: Floating point modification time.
* ``ctime``: Floating point change time.
:raises Error:
Unregistered path, or Sender did not match requestee context.
"""
if path not in self._metadata_by_path:
raise Error(self.unregistered_msg)
if msg.src_id != sender.context.context_id:
raise Error(self.context_mismatch_msg)
LOG.debug('Serving %r', path)
fp = open(path, 'rb', self.IO_SIZE)
# Response must arrive first so requestee can begin receive loop,
# otherwise first ack won't arrive until all pending chunks were
# delivered. In that case max BDP would always be 128KiB, aka. max
# ~10Mbit/sec over a 100ms link.
msg.reply(self._metadata_by_path[path])
stream = self.router.stream_by_id(sender.context.context_id)
state = self._state_by_stream.setdefault(stream, StreamState())
state.lock.acquire()
try:
state.jobs.append((sender, fp))
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.no_reply()
@mitogen.service.arg_spec({
'size': int,
})
@mitogen.service.no_reply()
def acknowledge(self, size, msg):
"""
Acknowledge bytes received by a transfer target, scheduling new chunks
to keep the window full. This should be called for every chunk received
by the target.
"""
stream = self.router.stream_by_id(msg.src_id)
state = self._state_by_stream[stream]
state.lock.acquire()
try:
if state.unacked < size:
LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d',
self, msg.src_id, state.unacked, size)
state.unacked -= min(state.unacked, size)
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
class ModuleDepService(mitogen.service.Service):
"""
Scan a new-style module and produce a cached mapping of module_utils names
to their resolved filesystem paths.
"""
def __init__(self, file_service, **kwargs):
super(ModuleDepService, self).__init__(**kwargs)
self._file_service = file_service
def __init__(self, *args, **kwargs):
super(ModuleDepService, self).__init__(*args, **kwargs)
self._cache = {}
def _get_builtin_names(self, builtin_path, resolved):
return [
fullname
for fullname, path, is_pkg in resolved
if os.path.abspath(path).startswith(builtin_path)
]
def _get_custom_tups(self, builtin_path, resolved):
return [
(fullname, path, is_pkg)
for fullname, path, is_pkg in resolved
if not os.path.abspath(path).startswith(builtin_path)
]
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'module_name': basestring,
'module_path': basestring,
'search_path': tuple,
'builtin_path': basestring,
'context': mitogen.core.Context,
})
def scan(self, module_name, module_path, search_path, builtin_path):
if (module_name, search_path) not in self._cache:
def scan(self, module_name, module_path, search_path, builtin_path, context):
key = (module_name, search_path)
if key not in self._cache:
resolved = ansible_mitogen.module_finder.scan(
module_name=module_name,
module_path=module_path,
search_path=tuple(search_path) + (builtin_path,),
)
builtin_path = os.path.abspath(builtin_path)
filtered = [
(fullname, path, is_pkg)
for fullname, path, is_pkg in resolved
if not os.path.abspath(path).startswith(builtin_path)
]
self._cache[module_name, search_path] = filtered
# Grant FileService access to paths in here to avoid another 2 IPCs
# from WorkerProcess.
self._file_service.register(path=module_path)
for fullname, path, is_pkg in filtered:
self._file_service.register(path=path)
return self._cache[module_name, search_path]
builtin = self._get_builtin_names(builtin_path, resolved)
custom = self._get_custom_tups(builtin_path, resolved)
self._cache[key] = {
'builtin': builtin,
'custom': custom,
}
return self._cache[key]

@ -29,7 +29,6 @@
from __future__ import absolute_import
import os
import ansible.errors
import ansible_mitogen.mixins
import ansible_mitogen.process

@ -32,7 +32,6 @@ for file transfer, module execution and sundry bits like changing file modes.
"""
from __future__ import absolute_import
import cStringIO
import errno
import grp
import json
@ -40,17 +39,15 @@ import logging
import operator
import os
import pwd
import random
import re
import signal
import stat
import subprocess
import tempfile
import time
import traceback
import ansible.module_utils.json_utils
import ansible_mitogen.runner
import ansible_mitogen.services
import mitogen.core
import mitogen.fork
import mitogen.parent
@ -63,61 +60,12 @@ LOG = logging.getLogger(__name__)
#: the duration of the process.
temp_dir = None
#: Caching of fetched file data.
_file_cache = {}
#: Initialized to an econtext.parent.Context pointing at a pristine fork of
#: the target Python interpreter before it executes any code or imports.
_fork_parent = None
def _get_file(context, path, out_fp):
"""
Streamily download a file from the connection multiplexer process in the
controller.
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used to
fetch the file.
:param bytes in_path:
FileService registered name of the input file.
:param bytes out_path:
Name of the output path on the local disk.
:returns:
:data:`True` on success, or :data:`False` if the transfer was
interrupted and the output should be discarded.
"""
LOG.debug('_get_file(): fetching %r from %r', path, context)
t0 = time.time()
recv = mitogen.core.Receiver(router=context.router)
metadata = context.call_service(
service_name='ansible_mitogen.services.FileService',
method_name='fetch',
path=path,
sender=recv.to_sender(),
)
for chunk in recv:
s = chunk.unpickle()
LOG.debug('_get_file(%r): received %d bytes', path, len(s))
context.call_service_async(
service_name='ansible_mitogen.services.FileService',
method_name='acknowledge',
size=len(s),
).close()
out_fp.write(s)
ok = out_fp.tell() == metadata['size']
if not ok:
LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down.', path)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
metadata['size'], path, context, 1000 * (time.time() - t0))
return ok, metadata
def get_file(context, path):
def get_small_file(context, path):
"""
Basic in-memory caching module fetcher. This generates one roundtrip for
every previously unseen file, so it is only a temporary solution.
@ -131,13 +79,9 @@ def get_file(context, path):
:returns:
Bytestring file data.
"""
if path not in _file_cache:
io = cStringIO.StringIO()
ok, metadata = _get_file(context, path, io)
if not ok:
raise IOError('transfer of %r was interrupted.' % (path,))
_file_cache[path] = io.getvalue()
return _file_cache[path]
pool = mitogen.service.get_or_create_pool(router=context.router)
service = pool.get_service('mitogen.service.PushFileService')
return service.get(path)
def transfer_file(context, in_path, out_path, sync=False, set_owner=False):
@ -170,7 +114,11 @@ def transfer_file(context, in_path, out_path, sync=False, set_owner=False):
try:
try:
ok, metadata = _get_file(context, in_path, fp)
ok, metadata = mitogen.service.FileService.get(
context=context,
path=in_path,
out_fp=fp,
)
if not ok:
raise IOError('transfer of %r was interrupted.' % (in_path,))
@ -261,50 +209,51 @@ def init_child(econtext):
This is necessary to prevent modules that are executed in-process from
polluting the global interpreter state in a way that affects explicitly
isolated modules.
:returns:
Dict like::
{
'fork_context': mitogen.core.Context.
'home_dir': str.
}
Where `fork_context` refers to the newly forked 'fork parent' context
the controller will use to start forked jobs, and `home_dir` is the
home directory for the active user account.
"""
global _fork_parent
mitogen.parent.upgrade_router(econtext)
_fork_parent = econtext.router.fork()
reset_temp_dir(econtext)
return {
'fork_context': _fork_parent,
'home_dir': os.path.expanduser('~'),
}
@mitogen.core.takes_econtext
def start_fork_child(wrap_async, kwargs, econtext):
def create_fork_child(econtext):
"""
For helper functions executed in the fork parent context, arrange for
the context's router to be upgraded as necessary and for a new child to be
prepared.
"""
mitogen.parent.upgrade_router(econtext)
context = econtext.router.fork()
context.call(reset_temp_dir)
if not wrap_async:
try:
return context.call(run_module, kwargs)
finally:
context.shutdown()
job_id = '%016x' % random.randint(0, 2**64)
context.call_async(run_module_async, job_id, kwargs)
return {
'stdout': json.dumps({
# modules/utilities/logic/async_wrapper.py::_run_module().
'changed': True,
'started': 1,
'finished': 0,
'ansible_job_id': job_id,
})
}
LOG.debug('create_fork_child() -> %r', context)
return context
@mitogen.core.takes_econtext
def run_module(kwargs, econtext):
def run_module(kwargs):
"""
Set up the process environment in preparation for running an Ansible
module. This monkey-patches the Ansible libraries in various places to
prevent it from trying to kill the process on completion, and to prevent it
from reading sys.stdin.
"""
should_fork = kwargs.pop('should_fork', False)
wrap_async = kwargs.pop('wrap_async', False)
if should_fork:
return _fork_parent.call(start_fork_child, wrap_async, kwargs)
runner_name = kwargs.pop('runner_name')
klass = getattr(ansible_mitogen.runner, runner_name)
impl = klass(**kwargs)
@ -335,21 +284,52 @@ def _write_job_status(job_id, dct):
os.rename(path + '.tmp', path)
def _run_module_async(job_id, kwargs, econtext):
def _sigalrm(broker, timeout_secs, job_id):
"""
Respond to SIGALRM (job timeout) by updating the job file and killing the
process.
"""
Body of run_module_async().
msg = "Job reached maximum time limit of %d seconds." % (timeout_secs,)
_write_job_status(job_id, {
"failed": 1,
"finished": 1,
"msg": msg,
})
broker.shutdown()
def _install_alarm(broker, timeout_secs, job_id):
handler = lambda *_: _sigalrm(broker, timeout_secs, job_id)
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_secs)
def _run_module_async(kwargs, job_id, timeout_secs, econtext):
"""
1. Immediately updates the status file to mark the job as started.
2. Installs a timer/signal handler to implement the time limit.
3. Runs as with run_module(), writing the result to the status file.
:param dict kwargs:
Runner keyword arguments.
:param str job_id:
String job ID.
:param int timeout_secs:
If >0, limit the task's maximum run time.
"""
_write_job_status(job_id, {
'started': 1,
'finished': 0
'finished': 0,
'pid': os.getpid()
})
if timeout_secs > 0:
_install_alarm(econtext.broker, timeout_secs, job_id)
kwargs['detach'] = True
kwargs['econtext'] = econtext
kwargs['emulate_tty'] = False
dct = run_module(kwargs, econtext)
dct = run_module(kwargs)
if mitogen.core.PY3:
for key in 'stdout', 'stderr':
dct[key] = dct[key].decode('utf-8', 'surrogateescape')
@ -373,18 +353,21 @@ def _run_module_async(job_id, kwargs, econtext):
@mitogen.core.takes_econtext
def run_module_async(job_id, kwargs, econtext):
def run_module_async(kwargs, job_id, timeout_secs, econtext):
"""
Since run_module_async() is invoked with .call_async(), with nothing to
read the result from the corresponding Receiver, wrap the body in an
exception logger, and wrap that in something that tears down the context on
completion.
Arrange for a module to be executed with its run status and result
serialized to a disk file. This function expects to run in a child forked
using :func:`create_fork_child`.
"""
try:
try:
_run_module_async(job_id, kwargs, econtext)
_run_module_async(kwargs, job_id, timeout_secs, econtext)
except Exception:
LOG.exception('_run_module_async crashed')
# Catch any (ansible_mitogen) bugs and write them to the job file.
_write_job_status(job_id, {
"failed": 1,
"msg": traceback.format_exc(),
})
finally:
econtext.broker.shutdown()

@ -69,6 +69,12 @@ Installation
per-run basis. Like ``mitogen_linear``, the ``mitogen_free`` strategy exists
to mimic the ``free`` strategy.
5. If targets have a restrictive ``sudoers`` file, add a rule like:
.. code-block:: plain
deploy = (ALL) NOPASSWD:/usr/bin/python -c*
Demo
~~~~
@ -134,9 +140,6 @@ Noteworthy Differences
artificial serialization, causing slowdown equivalent to `task_duration *
num_targets`. This will be fixed soon.
* Asynchronous jobs presently exist only for the duration of a run, and time
limits are not implemented.
* "Module Replacer" style modules are not supported. These rarely appear in
practice, and light web searches failed to reveal many examples of them.
@ -145,8 +148,8 @@ Noteworthy Differences
may be established in parallel by default, this can be modified by setting
the ``MITOGEN_POOL_SIZE`` environment variable.
* Performance does not scale perfectly linearly with target count. This will
improve over time.
* Performance does not scale linearly with target count. This will improve over
time.
* SSH and ``become`` are treated distinctly when applying timeouts, and
timeouts apply up to the point when the new interpreter is ready to accept
@ -195,11 +198,6 @@ container.
Connection delegation is a work in progress, bug reports are welcome.
* While imports are cached on intermediaries, module scripts are needlessly
reuploaded for each target. Fixing this is equivalent to implementing
**Topology-Aware File Synchronization**, so it may remain unfixed until
that feature is started.
* Delegated connection setup is single-threaded; only one connection can be
constructed in parallel per intermediary.
@ -642,6 +640,9 @@ is necessary. File-based logging can be enabled by setting
When file-based logging is enabled, one file per context will be created on the
local machine and every target machine, as ``/tmp/mitogen.<pid>.log``.
If you are experiencing a hang, ``MITOGEN_DUMP_THREAD_STACKS=1`` causes every
process to dump every thread stack into the logging framework every 5 seconds.
Getting Help
~~~~~~~~~~~~

@ -857,7 +857,7 @@ Context Class
.. method:: call (fn, \*args, \*\*kwargs)
Equivalent to :py:meth:`call_async(fn, \*args, \**kwargs).get_data()
Equivalent to :py:meth:`call_async(fn, \*args, \**kwargs).get().unpickle()
<call_async>`.
:returns:
@ -866,6 +866,14 @@ Context Class
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
.. method:: call_no_reply (fn, \*args, \*\*kwargs)
Send a function call, but expect no return value. If the call fails,
the full exception will be logged to the target context's logging framework.
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
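A minimal usage sketch; the target function and path are illustrative only:

.. code-block:: python

    import shutil

    # Fire-and-forget: no reply is requested, so any exception raised by
    # shutil.rmtree() in the target is logged there rather than raised here.
    context.call_no_reply(shutil.rmtree, '/tmp/scratch')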
Receiver Class

@ -40,6 +40,41 @@ and possibly your team and its successors with:
appropriate, prefer a higher level solution instead.
First Principles
----------------
Before starting, take a moment to reflect on writing a program that will
operate across machines and privilege domains:
* As with multithreaded programming, writing a program that spans multiple
hosts is exposed to many asynchrony issues. Unlike multithreaded programming,
the margin for unexpected failures is much higher, even between only two
peers, as communication may fail at any moment, since that communication
depends on reliability of an external network.
* Since a multi-host program always spans trust and privilege domains, trust
must be taken into consideration in your design from the outset. Mitogen
attempts to protect the consuming application by default where possible,
however it is paramount that trust considerations are always in mind when
exposing any privileged functionality to a potentially untrusted network of
peers.
A parent must always assume data received from a child is suspect, and must
not base privileged control decisions on that data. As a small example, a
parent should not form a command to execute in a subprocess using strings
received from a child.
* As the program spans multiple hosts, its design will benefit from a strict
separation of program and data. This entails avoiding some common Python
idioms that rely on its ability to manipulate functions and closures as if
they were data, such as passing a lambda closed over some program state as a
callback parameter.
In the general case this is both difficult and unsafe to support in a
distributed program, and so (for now at least) it should be assumed this
functionality is unlikely to appear in the future; a short sketch of the
preferred style follows this list.
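A short sketch of the preferred style, assuming a hypothetical
``myproject.tasks`` module that is importable on both the parent and the
target:

.. code-block:: python

    # myproject/tasks.py -- hypothetical module importable on both ends.
    import glob
    import os

    def clean_sockets(pattern):
        # Plain data in, plain data out: no closures or callables cross the
        # connection, only the module/function name and its arguments.
        removed = []
        for path in glob.glob(pattern):
            os.unlink(path)
            removed.append(path)
        return removed

    # Parent side: pass the named, importable function plus plain arguments,
    # rather than a lambda closed over local state, e.g.:
    #   context.call(myproject.tasks.clean_sockets, '/tmp/myapp-*.sock')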
Broker And Router
-----------------
@ -330,6 +365,12 @@ Subclasses of built-in types must be undecorated using
:py:func:`mitogen.utils.cast`.
Test Your Design
----------------
``tc qdisc add dev eth0 root netem delay 250ms``
.. _troubleshooting:
Troubleshooting

@ -17,8 +17,9 @@ Overview
Service
* User-supplied class with explicitly exposed methods.
* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass).
* May be auto-imported/constructed in a child from a parent simply by calling it
* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass) by
default, but may use any naming scheme the configured activator understands.
* Children receive refusals if the class is not already activated by a parent
* Has an associated Select instance which may be dynamically loaded with
receivers over time, on_message_received() invoked if any receiver becomes
@ -28,9 +29,12 @@ Invoker
* Abstracts mechanism for calling a service method and verifying permissions.
* Built-in 'service.Invoker': concurrent execution of all methods on the thread pool.
* Built-in 'service.SerializedInvoker': serialization of all calls on a single
thread borrowed from the pool while any request is pending.
* Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct
(method, kwargs) key, only one such method executes, return value is cached
and broadcast to all requesters.
(method, kwargs) key, only one such method ever executes, return value is
cached and broadcast to all request waiters. Waiters do not block additional
pool threads.
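As a rough illustration of the Service bullets above, a minimal
user-supplied service might look like the following sketch, reusing the
decorators shown elsewhere in this change; the class and method names are
invented for the example:

.. code-block:: python

    import mitogen.service

    class CounterService(mitogen.service.Service):
        def __init__(self, router):
            super(CounterService, self).__init__(router)
            self._count = 0

        @mitogen.service.expose(policy=mitogen.service.AllowParents())
        @mitogen.service.arg_spec({'delta': int})
        def increment(self, delta):
            # Exposed to parent contexts only; 'delta' is type-checked
            # against arg_spec before the call is dispatched.
            self._count += delta
            return self._count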
Activator

@ -75,10 +75,6 @@ IOLOG.setLevel(logging.INFO)
_v = False
_vv = False
# Also taken by Broker, no blocking work can occur with it held.
_service_call_lock = threading.Lock()
_service_calls = []
GET_MODULE = 100
CALL_FUNCTION = 101
FORWARD_LOG = 102
@ -555,6 +551,7 @@ class Importer(object):
'jail',
'lxc',
'master',
'minify',
'parent',
'select',
'service',
@ -622,6 +619,7 @@ class Importer(object):
return None
_tls.running = True
# TODO: hack: this is papering over a bug elsewhere.
fullname = fullname.rstrip('.')
try:
pkgname, dot, _ = fullname.rpartition('.')
@ -715,6 +713,8 @@ class Importer(object):
def load_module(self, fullname):
_v and LOG.debug('Importer.load_module(%r)', fullname)
# TODO: hack: this is papering over a bug elsewhere.
fullname = fullname.rstrip('.')
self._refuse_imports(fullname)
event = threading.Event()
@ -863,7 +863,7 @@ class Stream(BasicStream):
self._router = router
self.remote_id = remote_id
self.name = 'default'
self.sent_modules = set()
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.construct(**kwargs)
self._input_buf = collections.deque()
self._output_buf = collections.deque()
@ -1644,27 +1644,22 @@ class ExternalContext(object):
if not self.config['profiling']:
os.kill(os.getpid(), signal.SIGTERM)
def _service_stub_main(self, msg):
import mitogen.service
pool = mitogen.service.get_or_create_pool(router=self.router)
pool._receiver._on_receive(msg)
def _on_call_service_msg(self, msg):
"""
Stub CALL_SERVICE handler, push message on temporary queue and invoke
_on_stub_call() from the main thread.
Stub service handler. Start a thread from which to import the
mitogen.service implementation and deliver the message to the newly
constructed pool. This must happen off the main thread, as a CALL_SERVICE
for e.g. PushFileService may race with a CALL_FUNCTION that is blocking the
main thread while it waits for a result from that service.
"""
if msg.is_dead:
return
_service_call_lock.acquire()
try:
_service_calls.append(msg)
finally:
_service_call_lock.release()
self.router.route(
Message.pickled(
dst_id=mitogen.context_id,
handle=CALL_FUNCTION,
obj=('mitogen.service', None, '_on_stub_call', (), {}),
router=self.router,
)
)
if not msg.is_dead:
th = threading.Thread(target=self._service_stub_main, args=(msg,))
th.start()
def _on_shutdown_msg(self, msg):
_v and LOG.debug('_on_shutdown_msg(%r)', msg)
@ -1707,6 +1702,7 @@ class ExternalContext(object):
enable_profiling()
self.broker = Broker()
self.router = Router(self.broker)
self.router.debug = self.config.get('debug', False)
self.router.undirectional = self.config['unidirectional']
self.router.add_handler(
fn=self._on_shutdown_msg,
@ -1855,11 +1851,17 @@ class ExternalContext(object):
for msg in self.recv:
try:
msg.reply(self._dispatch_one(msg))
ret = self._dispatch_one(msg)
_v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret)
if msg.reply_to:
msg.reply(ret)
except Exception:
e = sys.exc_info()[1]
_v and LOG.debug('_dispatch_calls: %s', e)
msg.reply(CallError(e))
if msg.reply_to:
_v and LOG.debug('_dispatch_calls: %s', e)
msg.reply(CallError(e))
else:
LOG.exception('_dispatch_calls: %r', msg)
self.dispatch_stopped = True
def main(self):

@ -41,7 +41,6 @@ import time
import traceback
import mitogen.core
import mitogen.master
import mitogen.parent
@ -53,14 +52,26 @@ def _hex(n):
return '%08x' % n
def get_subclasses(klass):
"""
Rather than statically import every interesting subclass, forcing it all to
be transferred and potentially disrupting the debugged environment,
enumerate only those loaded in memory. Also returns the original class.
"""
stack = [klass]
seen = set()
while stack:
klass = stack.pop()
seen.add(klass)
stack.extend(klass.__subclasses__())
return seen
def get_routers():
return {
_hex(id(router)): router
for klass in (
mitogen.core.Router,
mitogen.parent.Router,
mitogen.master.Router,
)
for klass in get_subclasses(mitogen.core.Router)
for router in gc.get_referrers(klass)
if isinstance(router, mitogen.core.Router)
}

@ -51,7 +51,7 @@ def fixup_prngs():
sys.modules['ssl'].RAND_add(s, 75.0)
def break_logging_locks():
def reset_logging_framework():
"""
After fork, ensure any logging.Handler locks are recreated, as a variety of
threads in the parent may have been using the logging package at the moment
@ -61,10 +61,19 @@ def break_logging_locks():
https://github.com/dw/mitogen/issues/150 for a full discussion.
"""
logging._lock = threading.RLock()
for name in logging.Logger.manager.loggerDict:
# The root logger does not appear in the loggerDict.
for name in [None] + list(logging.Logger.manager.loggerDict):
for handler in logging.getLogger(name).handlers:
handler.createLock()
root = logging.getLogger()
root.handlers = [
handler
for handler in root.handlers
if not isinstance(handler, mitogen.core.LogHandler)
]
def handle_child_crash():
"""
@ -125,10 +134,10 @@ class Stream(mitogen.parent.Stream):
handle_child_crash()
def _child_main(self, childfp):
reset_logging_framework() # Must be first!
fixup_prngs()
mitogen.core.Latch._on_fork()
mitogen.core.Side._on_fork()
break_logging_locks()
fixup_prngs()
if self.on_fork:
self.on_fork()
mitogen.core.set_block(childfp.fileno())

@ -52,8 +52,11 @@ if not hasattr(pkgutil, 'find_loader'):
# been kept intentionally 2.3 compatible so we can reuse it.
from mitogen.compat import pkgutil
import mitogen
import mitogen.core
import mitogen.minify
import mitogen.parent
from mitogen.core import IOLOG
from mitogen.core import LOG
@ -79,6 +82,19 @@ def get_child_modules(path):
return [name for _, name, _ in it]
def get_core_source():
"""
Master version of parent.get_core_source().
"""
source = inspect.getsource(mitogen.core)
return mitogen.minify.minimize_source(source)
if mitogen.is_master:
# TODO: find a less surprising way of installing this.
mitogen.parent.get_core_source = get_core_source
LOAD_CONST = dis.opname.index('LOAD_CONST')
IMPORT_NAME = dis.opname.index('IMPORT_NAME')
@ -290,8 +306,8 @@ class ModuleFinder(object):
"""Attempt to fetch source code via pkgutil. In an ideal world, this
would be the only required implementation of get_module()."""
loader = pkgutil.find_loader(fullname)
LOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r',
fullname, loader)
IOLOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r',
fullname, loader)
if not loader:
return
@ -523,11 +539,41 @@ class ModuleResponder(object):
self._cache[fullname] = tup
return tup
def _send_load_module(self, stream, msg, fullname):
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
msg.reply(self._build_tuple(fullname),
handle=mitogen.core.LOAD_MODULE)
stream.sent_modules.add(fullname)
def _send_load_module(self, stream, fullname):
if fullname not in stream.sent_modules:
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
self._router._async_route(
mitogen.core.Message.pickled(
self._build_tuple(fullname),
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
)
stream.sent_modules.add(fullname)
def _send_module_load_failed(self, stream, fullname):
stream.send(
mitogen.core.Message.pickled(
(fullname, None, None, None, ()),
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _send_module_and_related(self, stream, fullname):
try:
tup = self._build_tuple(fullname)
for name in tup[4]: # related
parent, _, _ = name.partition('.')
if parent != fullname and parent not in stream.sent_modules:
# Parent hasn't been sent, so don't load submodule yet.
continue
self._send_load_module(stream, name)
self._send_load_module(stream, fullname)
except Exception:
LOG.debug('While importing %r', fullname, exc_info=True)
self._send_module_load_failed(stream, fullname)
def _on_get_module(self, msg):
if msg.is_dead:
@ -540,25 +586,32 @@ class ModuleResponder(object):
LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream)
try:
tup = self._build_tuple(fullname)
for name in tup[4]: # related
parent, _, _ = name.partition('.')
if parent != fullname and parent not in stream.sent_modules:
# Parent hasn't been sent, so don't load submodule yet.
continue
self._send_module_and_related(stream, fullname)
if name in stream.sent_modules:
# Submodule has been sent already, skip.
continue
def _send_forward_module(self, stream, context, fullname):
if stream.remote_id != context.context_id:
stream.send(
mitogen.core.Message(
data='%s\x00%s' % (context.context_id, fullname),
handle=mitogen.core.FORWARD_MODULE,
dst_id=stream.remote_id,
)
)
self._send_load_module(stream, msg, name)
self._send_load_module(stream, msg, fullname)
def _forward_module(self, context, fullname):
IOLOG.debug('%r._forward_module(%r, %r)', self, context, fullname)
path = []
while fullname:
path.append(fullname)
fullname, _, _ = fullname.rpartition('.')
except Exception:
LOG.debug('While importing %r', fullname, exc_info=True)
msg.reply((fullname, None, None, None, ()),
handle=mitogen.core.LOAD_MODULE)
for fullname in reversed(path):
stream = self._router.stream_by_id(context.context_id)
self._send_module_and_related(stream, fullname)
self._send_forward_module(stream, context, fullname)
def forward_module(self, context, fullname):
self._router.broker.defer(self._forward_module, context, fullname)
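forward_module() has to deliver ancestor packages before the leaf module, since the remote importer cannot load a.b.c until a and a.b already exist. The ordering step in isolation, as a hedged sketch (parent_first is an illustrative name, not a mitogen API):

def parent_first(fullname):
    # 'pkg.sub.mod' -> ['pkg', 'pkg.sub', 'pkg.sub.mod']
    path = []
    while fullname:
        path.append(fullname)
        fullname, _, _ = fullname.rpartition('.')
    return list(reversed(path))

assert parent_first('mitogen.compat.functools') == [
    'mitogen', 'mitogen.compat', 'mitogen.compat.functools',
]
# _forward_module() walks this list, sending each module and its related
# files before emitting the FORWARD_MODULE message for the same name.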
class Broker(mitogen.core.Broker):
@ -652,7 +705,7 @@ class IdAllocator(object):
id_ = self.next_id
self.next_id += self.BLOCK_SIZE
end_id = id_ + self.BLOCK_SIZE
LOG.debug('%r: allocating (%d..%d]', self, id_, end_id)
LOG.debug('%r: allocating [%d..%d)', self, id_, end_id)
return id_, end_id
finally:
self.lock.release()
@ -666,5 +719,5 @@ class IdAllocator(object):
allocated = self.router.context_by_id(id_, msg.src_id)
LOG.debug('%r: allocating [%r..%r) to %r',
self, allocated, requestee, msg.src_id)
self, id_, last_id, requestee)
msg.reply((id_, last_id))

@ -0,0 +1,134 @@
# Copyright 2017, Alex Willmer
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import sys
try:
from cStringIO import StringIO as BytesIO
except ImportError:
from io import BytesIO
if sys.version_info < (2, 7, 11):
from mitogen.compat import tokenize
else:
import tokenize
try:
from functools import lru_cache
except ImportError:
from mitogen.compat.functools import lru_cache
@lru_cache()
def minimize_source(source):
"""Remove most comments and docstrings from Python source code.
"""
tokens = tokenize.generate_tokens(BytesIO(source).readline)
tokens = strip_comments(tokens)
tokens = strip_docstrings(tokens)
tokens = reindent(tokens)
return tokenize.untokenize(tokens)
def strip_comments(tokens):
"""Drop comment tokens from a `tokenize` stream.
Comments on lines 1-2 are kept, to preserve hashbang and encoding.
Trailing whitespace is removed from all lines.
"""
prev_typ = None
prev_end_col = 0
for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens:
if typ in (tokenize.NL, tokenize.NEWLINE):
if prev_typ in (tokenize.NL, tokenize.NEWLINE):
start_col = 0
else:
start_col = prev_end_col
end_col = start_col + 1
elif typ == tokenize.COMMENT and start_row > 2:
continue
prev_typ = typ
prev_end_col = end_col
yield typ, tok, (start_row, start_col), (end_row, end_col), line
def strip_docstrings(tokens):
"""Replace docstring tokens with NL tokens in a `tokenize` stream.
Any STRING token not part of an expression is deemed a docstring.
Indented docstrings are not yet recognised.
"""
stack = []
state = 'wait_string'
for t in tokens:
typ = t[0]
if state == 'wait_string':
if typ in (tokenize.NL, tokenize.COMMENT):
yield t
elif typ in (tokenize.DEDENT, tokenize.INDENT, tokenize.STRING):
stack.append(t)
elif typ == tokenize.NEWLINE:
stack.append(t)
start_line, end_line = stack[0][2][0], stack[-1][3][0]+1
for i in range(start_line, end_line):
yield tokenize.NL, '\n', (i, 0), (i,1), '\n'
for t in stack:
if t[0] in (tokenize.DEDENT, tokenize.INDENT):
yield t[0], t[1], (i+1, t[2][1]), (i+1, t[3][1]), t[4]
del stack[:]
else:
stack.append(t)
for t in stack: yield t
del stack[:]
state = 'wait_newline'
elif state == 'wait_newline':
if typ == tokenize.NEWLINE:
state = 'wait_string'
yield t
def reindent(tokens, indent=' '):
"""Replace existing indentation in a token steam, with `indent`.
"""
old_levels = []
old_level = 0
new_level = 0
for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens:
if typ == tokenize.INDENT:
old_levels.append(old_level)
old_level = len(tok)
new_level += 1
tok = indent * new_level
elif typ == tokenize.DEDENT:
old_level = old_levels.pop()
new_level -= 1
start_col = max(0, start_col - old_level + new_level)
if start_row == end_row:
end_col = start_col + len(tok)
yield typ, tok, (start_row, start_col), (end_row, end_col), line
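mitogen.minify is a straight extraction of the minifier that previously lived in mitogen.parent; its public surface is just minimize_source(). A hedged usage sketch (the sample module text is illustrative, and the exact whitespace of the result is an implementation detail):

import mitogen.minify

SAMPLE = '''#!/usr/bin/env python
"""Module docstring: replaced by a blank line."""

# This comment sits below line 2, so it is stripped.
def add(a, b):
    """Function docstring: also stripped."""
    return a + b
'''

minified = mitogen.minify.minimize_source(SAMPLE)
print(minified)
# The hashbang on line 1 survives, later comments and both docstrings are
# gone, and indentation is regenerated by reindent().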

@ -52,21 +52,6 @@ import zlib
# Absolute imports for <2.5.
select = __import__('select')
try:
from cStringIO import StringIO as BytesIO
except ImportError:
from io import BytesIO
if sys.version_info < (2, 7, 11):
from mitogen.compat import tokenize
else:
import tokenize
try:
from functools import lru_cache
except ImportError:
from mitogen.compat.functools import lru_cache
import mitogen.core
from mitogen.core import LOG
from mitogen.core import IOLOG
@ -82,101 +67,21 @@ def get_log_level():
return (LOG.level or logging.getLogger().level or logging.INFO)
def is_immediate_child(msg, stream):
"""
Handler policy that requires messages to arrive only from immediately
connected children.
"""
return msg.src_id == stream.remote_id
@lru_cache()
def minimize_source(source):
"""Remove most comments and docstrings from Python source code.
def get_core_source():
"""
tokens = tokenize.generate_tokens(BytesIO(source).readline)
tokens = strip_comments(tokens)
tokens = strip_docstrings(tokens)
tokens = reindent(tokens)
return tokenize.untokenize(tokens)
def strip_comments(tokens):
"""Drop comment tokens from a `tokenize` stream.
Comments on lines 1-2 are kept, to preserve hashbang and encoding.
Trailing whitespace is remove from all lines.
In non-masters, simply fetch the cached mitogen.core source code via the
import mechanism. In masters, this function is replaced with a version that
performs minification directly.
"""
prev_typ = None
prev_end_col = 0
for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens:
if typ in (tokenize.NL, tokenize.NEWLINE):
if prev_typ in (tokenize.NL, tokenize.NEWLINE):
start_col = 0
else:
start_col = prev_end_col
end_col = start_col + 1
elif typ == tokenize.COMMENT and start_row > 2:
continue
prev_typ = typ
prev_end_col = end_col
yield typ, tok, (start_row, start_col), (end_row, end_col), line
return inspect.getsource(mitogen.core)
def strip_docstrings(tokens):
"""Replace docstring tokens with NL tokens in a `tokenize` stream.
Any STRING token not part of an expression is deemed a docstring.
Indented docstrings are not yet recognised.
def is_immediate_child(msg, stream):
"""
stack = []
state = 'wait_string'
for t in tokens:
typ = t[0]
if state == 'wait_string':
if typ in (tokenize.NL, tokenize.COMMENT):
yield t
elif typ in (tokenize.DEDENT, tokenize.INDENT, tokenize.STRING):
stack.append(t)
elif typ == tokenize.NEWLINE:
stack.append(t)
start_line, end_line = stack[0][2][0], stack[-1][3][0]+1
for i in range(start_line, end_line):
yield tokenize.NL, '\n', (i, 0), (i,1), '\n'
for t in stack:
if t[0] in (tokenize.DEDENT, tokenize.INDENT):
yield t[0], t[1], (i+1, t[2][1]), (i+1, t[3][1]), t[4]
del stack[:]
else:
stack.append(t)
for t in stack: yield t
del stack[:]
state = 'wait_newline'
elif state == 'wait_newline':
if typ == tokenize.NEWLINE:
state = 'wait_string'
yield t
def reindent(tokens, indent=' '):
"""Replace existing indentation in a token steam, with `indent`.
Handler policy that requires messages to arrive only from immediately
connected children.
"""
old_levels = []
old_level = 0
new_level = 0
for typ, tok, (start_row, start_col), (end_row, end_col), line in tokens:
if typ == tokenize.INDENT:
old_levels.append(old_level)
old_level = len(tok)
new_level += 1
tok = indent * new_level
elif typ == tokenize.DEDENT:
old_level = old_levels.pop()
new_level -= 1
start_col = max(0, start_col - old_level + new_level)
if start_row == end_row:
end_col = start_col + len(tok)
yield typ, tok, (start_row, start_col), (end_row, end_col), line
return msg.src_id == stream.remote_id
def flags(names):
@ -498,8 +403,7 @@ def stream_by_method_name(name):
@mitogen.core.takes_econtext
def _proxy_connect(name, method_name, kwargs, econtext):
mitogen.parent.upgrade_router(econtext)
upgrade_router(econtext)
try:
context = econtext.router._connect(
klass=stream_by_method_name(method_name),
@ -921,11 +825,11 @@ class Stream(mitogen.core.Stream):
}
def get_preamble(self):
source = inspect.getsource(mitogen.core)
source = get_core_source()
source += '\nExternalContext(%r).main()\n' % (
self.get_econtext_config(),
)
return zlib.compress(minimize_source(source), 9)
return zlib.compress(source, 9)
create_child = staticmethod(create_child)
create_child_args = {}
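get_preamble() no longer minifies inline: on the master, mitogen.master patches get_core_source() to return minified mitogen.core source, and the stream only appends the bootstrap call and compresses. A rough sketch of the equivalent pipeline on a master, with the ExternalContext config shortened to an empty dict for illustration:

import inspect
import zlib

import mitogen.core
import mitogen.minify

# What the master-side get_core_source() override produces:
source = mitogen.minify.minimize_source(inspect.getsource(mitogen.core))

# What Stream.get_preamble() then ships to the bootstrap interpreter; the
# real call passes the ExternalContext config dict rather than {}.
# (Python 2 str; encode to bytes first on Python 3.)
preamble = zlib.compress(source + '\nExternalContext({}).main()\n', 9)
print('%d bytes compressed' % len(preamble))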
@ -1008,6 +912,11 @@ class Context(mitogen.core.Context):
receiver = self.call_async(fn, *args, **kwargs)
return receiver.get().unpickle(throw_dead=False)
def call_no_reply(self, fn, *args, **kwargs):
LOG.debug('%r.call_no_reply(%r, *%r, **%r)',
self, fn, args, kwargs)
self.send(make_call_msg(fn, *args, **kwargs))
def shutdown(self, wait=False):
LOG.debug('%r.shutdown() sending SHUTDOWN', self)
latch = mitogen.core.Latch()
@ -1327,7 +1236,7 @@ class ModuleForwarder(object):
if msg.is_dead:
return
context_id_s, fullname = msg.data.partition('\x00')
context_id_s, _, fullname = msg.data.partition('\x00')
context_id = int(context_id_s)
stream = self.router.stream_by_id(context_id)
if stream.remote_id == mitogen.parent_id:
@ -1335,15 +1244,18 @@ class ModuleForwarder(object):
self, context_id, fullname)
return
if fullname in stream.sent_modules:
return
LOG.debug('%r._on_forward_module() sending %r to %r via %r',
self, fullname, context_id, stream.remote_id)
self._send_module_and_related(stream, fullname)
if stream.remote_id != context_id:
stream._send(
mitogen.core.Message(
dst_id=stream.remote_id,
handle=mitogen.core.FORWARD_MODULE,
data=msg.data,
handle=mitogen.core.FORWARD_MODULE,
dst_id=stream.remote_id,
)
)

@ -26,9 +26,15 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import grp
import os
import os.path
import pprint
import pwd
import stat
import sys
import threading
import time
import mitogen.core
import mitogen.select
@ -37,34 +43,23 @@ from mitogen.core import LOG
DEFAULT_POOL_SIZE = 16
_pool = None
_pool_pid = None
#: Serialize pool construction.
_pool_lock = threading.Lock()
@mitogen.core.takes_router
def get_or_create_pool(size=None, router=None):
global _pool
if _pool is None:
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE)
return _pool
@mitogen.core.takes_router
def _on_stub_call(router):
"""
Called for each message received by the core.py stub CALL_SERVICE handler.
Create the pool if it doesn't already exist, and push enqueued messages
into the pool's receiver. This may be called more than once as the stub
service handler runs in asynchronous context, while _on_stub_call() happens
on the main thread. Multiple CALL_SERVICE may end up enqueued before Pool
has a chance to install the real CALL_SERVICE handler.
"""
pool = get_or_create_pool(router=router)
mitogen.core._service_call_lock.acquire()
global _pool_pid
_pool_lock.acquire()
try:
for msg in mitogen.core._service_calls:
pool._receiver._on_receive(msg)
del mitogen.core._service_calls[:]
if _pool_pid != os.getpid():
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE)
_pool_pid = os.getpid()
return _pool
finally:
mitogen.core._service_call_lock.release()
_pool_lock.release()
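Keying the pool singleton on os.getpid() means a pool inherited across fork() is quietly rebuilt instead of being reused with worker threads that no longer exist in the child. The guard pattern in isolation, as a sketch with illustrative names:

import os
import threading

_singleton = None
_singleton_pid = None
_singleton_lock = threading.Lock()

def get_or_create(factory):
    """Return a process-local singleton, rebuilding it after fork()."""
    global _singleton, _singleton_pid
    _singleton_lock.acquire()
    try:
        if _singleton_pid != os.getpid():
            _singleton = factory()       # first call, or stale post-fork copy
            _singleton_pid = os.getpid()
        return _singleton
    finally:
        _singleton_lock.release()

# e.g. pool = get_or_create(lambda: Pool(router, [], size=DEFAULT_POOL_SIZE))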
def validate_arg_spec(spec, args):
@ -149,6 +144,7 @@ class Error(Exception):
"""
Raised when an error occurs configuring a service or pool.
"""
pass  # cope with minimize_source() bug.
class Policy(object):
@ -183,12 +179,12 @@ class Activator(object):
def activate(self, pool, service_name, msg):
mod_name, _, class_name = service_name.rpartition('.')
if not self.is_permitted(mod_name, class_name, msg):
if msg and not self.is_permitted(mod_name, class_name, msg):
raise mitogen.core.CallError(self.not_active_msg, service_name)
module = mitogen.core.import_module(mod_name)
klass = getattr(module, class_name)
service = klass(pool.router)
service = klass(router=pool.router)
pool.add(service)
return service
@ -238,7 +234,8 @@ class Invoker(object):
except Exception:
if no_reply:
LOG.exception('While calling no-reply method %s.%s',
type(self).__name__, method.func_name)
type(self.service).__name__,
method.func_name)
else:
raise
@ -249,6 +246,50 @@ class Invoker(object):
msg.reply(response)
class SerializedInvoker(Invoker):
def __init__(self, **kwargs):
super(SerializedInvoker, self).__init__(**kwargs)
self._lock = threading.Lock()
self._queue = []
self._running = False
def _pop(self):
self._lock.acquire()
try:
try:
return self._queue.pop(0)
except IndexError:
self._running = False
finally:
self._lock.release()
def _run(self):
while True:
tup = self._pop()
if tup is None:
return
method_name, kwargs, msg = tup
try:
super(SerializedInvoker, self).invoke(method_name, kwargs, msg)
except Exception:
LOG.exception('%r: while invoking %r of %r',
self, method_name, self.service)
msg.reply(mitogen.core.Message.dead())
def invoke(self, method_name, kwargs, msg):
self._lock.acquire()
try:
self._queue.append((method_name, kwargs, msg))
first = not self._running
self._running = True
finally:
self._lock.release()
if first:
self._run()
return Service.NO_REPLY
class DeduplicatingInvoker(Invoker):
"""
A service that deduplicates and caches expensive responses. Requests are
@ -398,6 +439,8 @@ class Pool(object):
thread.start()
self._threads.append(thread)
LOG.debug('%r: initialized', self)
@property
def size(self):
return len(self._threads)
@ -407,7 +450,7 @@ class Pool(object):
if name in self._invoker_by_name:
raise Error('service named %r already registered' % (name,))
assert service.select not in self._func_by_recv
invoker = service.invoker_class(service)
invoker = service.invoker_class(service=service)
self._invoker_by_name[name] = invoker
self._func_by_recv[service.select] = service.on_message
@ -427,13 +470,17 @@ class Pool(object):
invoker = self._invoker_by_name.get(name)
if not invoker:
service = self._activator.activate(self, name, msg)
invoker = service.invoker_class(service)
invoker = service.invoker_class(service=service)
self._invoker_by_name[name] = invoker
finally:
self._lock.release()
return invoker
def get_service(self, name):
invoker = self.get_invoker(name, None)
return invoker.service
def _validate(self, msg):
tup = msg.unpickle(throw=False)
if not (isinstance(tup, tuple) and
@ -454,7 +501,8 @@ class Pool(object):
LOG.warning('%r: call error: %s: %s', self, msg, e)
msg.reply(e)
except Exception:
LOG.exception('While invoking %r._invoke()', self)
LOG.exception('%r: while invoking %r of %r',
self, method_name, service_name)
e = sys.exc_info()[1]
msg.reply(mitogen.core.CallError(e))
@ -488,3 +536,405 @@ class Pool(object):
len(self._threads),
th.name,
)
class FileStreamState(object):
def __init__(self):
#: List of [(Sender, file object)]
self.jobs = []
self.completing = {}
#: In-flight byte count.
self.unacked = 0
#: Lock.
self.lock = threading.Lock()
class PushFileService(Service):
"""
Push-based file service. Files are delivered and cached in RAM, sent
recursively from parent to child. A child that requests a file via
:meth:`get` will block until it has been delivered by a parent.
This service will eventually be merged into FileService.
"""
invoker_class = SerializedInvoker
def __init__(self, **kwargs):
super(PushFileService, self).__init__(**kwargs)
self._lock = threading.Lock()
self._cache = {}
self._waiters = {}
self._sent_by_stream = {}
def get(self, path):
self._lock.acquire()
try:
if path in self._cache:
return self._cache[path]
waiters = self._waiters.setdefault(path, [])
latch = mitogen.core.Latch()
waiters.append(lambda: latch.put(None))
finally:
self._lock.release()
LOG.debug('%r.get(%r) waiting for uncached file to arrive', self, path)
latch.get()
LOG.debug('%r.get(%r) -> %r', self, path, self._cache[path])
return self._cache[path]
def _forward(self, context, path):
stream = self.router.stream_by_id(context.context_id)
child = mitogen.core.Context(self.router, stream.remote_id)
sent = self._sent_by_stream.setdefault(stream, set())
if path in sent and child.context_id != context.context_id:
child.call_service_async(
service_name=self.name(),
method_name='forward',
path=path,
context=context
).close()
else:
child.call_service_async(
service_name=self.name(),
method_name='store_and_forward',
path=path,
data=self._cache[path],
context=context
).close()
@expose(policy=AllowParents())
@arg_spec({
'context': mitogen.core.Context,
'paths': list,
'modules': list,
})
def propagate_paths_and_modules(self, context, paths, modules):
"""
One-size-fits-all method to ensure a target context has been preloaded
with a set of small files and Python modules.
"""
for path in paths:
self.propagate_to(context, path)
for fullname in modules:
self.router.responder.forward_module(context, fullname)
@expose(policy=AllowParents())
@arg_spec({
'context': mitogen.core.Context,
'path': basestring,
})
def propagate_to(self, context, path):
LOG.debug('%r.propagate_to(%r, %r)', self, context, path)
if path not in self._cache:
fp = open(path, 'rb')
try:
self._cache[path] = mitogen.core.Blob(fp.read())
finally:
fp.close()
self._forward(context, path)
def _store(self, path, data):
self._lock.acquire()
try:
self._cache[path] = data
return self._waiters.pop(path, [])
finally:
self._lock.release()
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
'path': basestring,
'data': mitogen.core.Blob,
'context': mitogen.core.Context,
})
def store_and_forward(self, path, data, context):
LOG.debug('%r.store_and_forward(%r, %r, %r)',
self, path, data, context)
waiters = self._store(path, data)
if context.context_id != mitogen.context_id:
self._forward(context, path)
for callback in waiters:
callback()
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
'path': basestring,
'context': mitogen.core.Context,
})
def forward(self, path, context):
LOG.debug('%r.forward(%r, %r)', self, path, context)
if path not in self._cache:
LOG.error('%r: %r is not in local cache', self, path)
return
self._forward(context, path)
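PushFileService inverts the usual pull model: a parent pushes a file (or Python modules) ahead of time with propagate_to()/propagate_paths_and_modules(), and a child's get() blocks on a Latch until the blob lands in its local cache. A hedged sketch of both sides, assuming an established router and a child context named child_context (both placeholders):

import mitogen.service

# Parent side: cache /etc/hosts in RAM and push it towards child_context,
# hop by hop through any intermediate contexts.
pool = mitogen.service.get_or_create_pool(router=router)
pusher = pool.get_service('mitogen.service.PushFileService')
pusher.propagate_to(context=child_context, path='/etc/hosts')

# Child side: blocks until some parent has delivered the blob, then returns
# the cached mitogen.core.Blob.
blob = mitogen.service.get_or_create_pool(router=router).get_service(
    'mitogen.service.PushFileService'
).get('/etc/hosts')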
class FileService(Service):
"""
Streaming file server, used to serve small and huge files alike. Paths must
be registered by a trusted context before they will be served to a child.
Transfers are divided among the physical streams that connect external
contexts, ensuring each stream never has excessive data buffered in RAM,
while still maintaining enough to fully utilize available bandwidth. This
is achieved by making an initial bandwidth assumption, enqueueing enough
chunks to fill that assumed pipe, then responding to delivery
acknowledgements from the receiver by scheduling new chunks.
Transfers proceed one-at-a-time per stream. When multiple contexts exist on
a stream (e.g. one is the SSH account, another is a sudo account, and a
third is a proxied SSH connection), each request is satisfied in turn
before subsequent requests start flowing. This ensures when a stream is
contended, priority is given to completing individual transfers rather than
potentially aborting many partial transfers, causing the bandwidth to be
wasted.
Theory of operation:
1. Trusted context (i.e. WorkerProcess) calls register(), making a
file available to any untrusted context.
2. Requestee context creates a mitogen.core.Receiver() to receive
chunks, then calls fetch(path, recv.to_sender()), to set up the
transfer.
3. fetch() replies to the call with the file's metadata, then
schedules an initial burst up to the window size limit (1MiB).
4. Chunks begin to arrive in the requestee, which calls acknowledge()
for each 128KiB received.
5. The acknowledge() call arrives at FileService, which schedules a new
chunk to refill the drained window back to the size limit.
6. When the last chunk has been pumped for a single transfer,
Sender.close() is called causing the receive loop in
target.py::_get_file() to exit, allowing that code to compare the
transferred size with the total file size from the metadata.
7. If the sizes mismatch, _get_file()'s caller is informed which will
discard the result and log/raise an error.
Shutdown:
1. process.py calls service.Pool.shutdown(), which arranges for the
service pool threads to exit and be joined, guaranteeing no new
requests can arrive, before calling Service.on_shutdown() for each
registered service.
2. FileService.on_shutdown() walks every in-progress transfer and calls
Sender.close(), causing Receiver loops in the requestees to exit
early. The size check fails and any partially downloaded file is
discarded.
3. Control exits _get_file() in every target, and graceful shutdown can
proceed normally, without the associated thread needing to be
forcefully killed.
"""
unregistered_msg = 'Path is not registered with FileService.'
context_mismatch_msg = 'sender= kwarg context must match requestee context'
#: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which
#: is 5x what SSH can handle on a 2011 era 2.4Ghz Core i5.
window_size_bytes = 1048576
def __init__(self, router):
super(FileService, self).__init__(router)
#: Mapping of registered path -> file size.
self._metadata_by_path = {}
#: Mapping of Stream->FileStreamState.
self._state_by_stream = {}
def _name_or_none(self, func, n, attr):
try:
return getattr(func(n), attr)
except KeyError:
return None
@expose(policy=AllowParents())
@arg_spec({
'path': basestring,
})
def register(self, path):
"""
Authorize a path for access by children. Repeat calls with the same
path are harmless.
:param str path:
File path.
"""
if path in self._metadata_by_path:
return
st = os.stat(path)
if not stat.S_ISREG(st.st_mode):
raise IOError('%r is not a regular file.' % (path,))
LOG.debug('%r: registering %r', self, path)
self._metadata_by_path[path] = {
'size': st.st_size,
'mode': st.st_mode,
'owner': self._name_or_none(pwd.getpwuid, st.st_uid, 'pw_name'),
'group': self._name_or_none(grp.getgrgid, st.st_gid, 'gr_name'),
'mtime': st.st_mtime,
'atime': st.st_atime,
}
def on_shutdown(self):
"""
Respond to shutdown by sending close() to every target, allowing their
receive loop to exit and clean up gracefully.
"""
LOG.debug('%r.on_shutdown()', self)
for stream, state in self._state_by_stream.items():
state.lock.acquire()
try:
for sender, fp in reversed(state.jobs):
sender.close()
fp.close()
state.jobs.pop()
finally:
state.lock.release()
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
def _schedule_pending_unlocked(self, state):
"""
Consider the pending transfers for a stream, pumping new chunks while
the unacknowledged byte count is below :attr:`window_size_bytes`. Must
be called with the FileStreamState lock held.
:param FileStreamState state:
Stream to schedule chunks for.
"""
while state.jobs and state.unacked < self.window_size_bytes:
sender, fp = state.jobs[0]
s = fp.read(self.IO_SIZE)
if s:
state.unacked += len(s)
sender.send(mitogen.core.Blob(s))
else:
# File is done. Cause the target's receive loop to exit by
# closing the sender, close the file, and remove the job entry.
sender.close()
fp.close()
state.jobs.pop(0)
@expose(policy=AllowAny())
@no_reply()
@arg_spec({
'path': basestring,
'sender': mitogen.core.Sender,
})
def fetch(self, path, sender, msg):
"""
Start a transfer for a registered path.
:param str path:
File path.
:param mitogen.core.Sender sender:
Sender to receive file data.
:returns:
Dict containing the file metadata:
* ``size``: File size in bytes.
* ``mode``: Integer file mode.
* ``owner``: Owner account name on host machine.
* ``group``: Owner group name on host machine.
* ``mtime``: Floating point modification time.
* ``atime``: Floating point access time.
:raises Error:
Unregistered path, or Sender did not match requestee context.
"""
if path not in self._metadata_by_path:
raise Error(self.unregistered_msg)
if msg.src_id != sender.context.context_id:
raise Error(self.context_mismatch_msg)
LOG.debug('Serving %r', path)
fp = open(path, 'rb', self.IO_SIZE)
# Response must arrive first so requestee can begin receive loop,
# otherwise the first ack won't arrive until all pending chunks have been
# delivered. In that case max BDP would always be 128KiB, aka. max
# ~10Mbit/sec over a 100ms link.
msg.reply(self._metadata_by_path[path])
stream = self.router.stream_by_id(sender.context.context_id)
state = self._state_by_stream.setdefault(stream, FileStreamState())
state.lock.acquire()
try:
state.jobs.append((sender, fp))
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
@expose(policy=AllowAny())
@no_reply()
@arg_spec({
'size': int,
})
@no_reply()
def acknowledge(self, size, msg):
"""
Acknowledge bytes received by a transfer target, scheduling new chunks
to keep the window full. This should be called for every chunk received
by the target.
"""
stream = self.router.stream_by_id(msg.src_id)
state = self._state_by_stream[stream]
state.lock.acquire()
try:
if state.unacked < size:
LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d',
self, msg.src_id, state.unacked, size)
state.unacked -= min(state.unacked, size)
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
@classmethod
def get(cls, context, path, out_fp):
"""
Streamily download a file from the connection multiplexer process in
the controller.
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used
to fetch the file.
:param bytes path:
FileService registered name of the input file.
:param file out_fp:
Open file object the downloaded bytes are written to on the local disk.
:returns:
Tuple of (`ok`, `metadata`): `ok` is :data:`True` on success, or
:data:`False` if the transfer was interrupted and the output should be
discarded; `metadata` is the dict returned by fetch().
"""
LOG.debug('get_file(): fetching %r from %r', path, context)
t0 = time.time()
recv = mitogen.core.Receiver(router=context.router)
metadata = context.call_service(
service_name=cls.name(),
method_name='fetch',
path=path,
sender=recv.to_sender(),
)
for chunk in recv:
s = chunk.unpickle()
LOG.debug('get_file(%r): received %d bytes', path, len(s))
context.call_service_async(
service_name=cls.name(),
method_name='acknowledge',
size=len(s),
).close()
out_fp.write(s)
ok = out_fp.tell() == metadata['size']
if not ok:
LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down.', path)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
metadata['size'], path, context, 1000 * (time.time() - t0))
return ok, metadata
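The register()/fetch()/acknowledge() trio is what the connection plugin drives for transfers: a trusted context registers the path with the context hosting FileService, and the target side lets the FileService.get() helper run the windowed receive loop. A hedged end-to-end sketch, assuming file_service_context is a connected mitogen.core.Context hosting the service (paths are illustrative):

import mitogen.service

# Controller side: authorize the path, mirroring the register() call made by
# the Ansible connection plugin before a transfer.
file_service_context.call_service(
    service_name='mitogen.service.FileService',
    method_name='register',
    path='/tmp/large.bin',
)

# Target side: stream the registered file into a local copy. get() returns
# (ok, metadata); ok is False when the sender was closed early, e.g. during
# controller shutdown, and the partial output should be discarded.
out_fp = open('/tmp/large.bin.copy', 'wb')
try:
    ok, metadata = mitogen.service.FileService.get(
        context=file_service_context,
        path='/tmp/large.bin',
        out_fp=out_fp,
    )
finally:
    out_fp.close()
if not ok:
    raise IOError('transfer interrupted; discarding partial download')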

@ -8,7 +8,9 @@ import zlib
import mitogen.fakessh
import mitogen.master
import mitogen.minify
import mitogen.parent
import mitogen.select
import mitogen.service
import mitogen.ssh
import mitogen.sudo
@ -34,16 +36,17 @@ print(
)
for mod in (
mitogen.master,
mitogen.parent,
mitogen.service,
mitogen.ssh,
mitogen.sudo,
mitogen.select,
mitogen.service,
mitogen.fakessh,
mitogen.master,
):
original = inspect.getsource(mod)
original_size = len(original)
minimized = mitogen.parent.minimize_source(original)
minimized = mitogen.minify.minimize_source(original)
minimized_size = len(minimized)
compressed = zlib.compress(minimized, 9)
compressed_size = len(compressed)

@ -1,3 +1,6 @@
[defaults]
strategy_plugins = ../../../ansible_mitogen/plugins/strategy
strategy = mitogen
inventory = hosts
retry_files_enabled = False
host_key_checking = False

@ -1,9 +1,6 @@
- hosts: controller
tasks:
- shell: "rsync -a ~/.ssh {{inventory_hostname}}:"
connection: local
- lineinfile:
line: "net.ipv4.ip_forward=1"
path: /etc/sysctl.conf
@ -30,6 +27,10 @@
- libsasl2-dev
- build-essential
- git
- rsync
- shell: "rsync -a ~/.ssh {{inventory_hostname}}:"
connection: local
- git:
dest: ~/mitogen
@ -39,7 +40,7 @@
- git:
dest: ~/ansible
repo: https://github.com/dw/ansible.git
version: lazy-vars
version: dmw
- pip:
virtualenv: ~/venv

@ -1,5 +1,8 @@
- import_playbook: result_binary_producing_json.yml
- import_playbook: result_binary_producing_junk.yml
- import_playbook: result_shell_echo_hi.yml
- import_playbook: runner_new_process.yml
- import_playbook: runner_one_job.yml
- import_playbook: runner_timeout_then_polling.yml
- import_playbook: runner_with_polling_and_timeout.yml
- import_playbook: runner_two_simultaneous_jobs.yml

@ -10,7 +10,21 @@
poll: 0
register: job
- shell: sleep 1
- assert:
that: |
job.ansible_job_id and
(job.changed == True) and
(job.started == 1) and
(job.finished == 0)
- name: busy-poll up to 100000 times
async_status:
jid: "{{job.ansible_job_id}}"
register: result
until: result.finished
retries: 100000
delay: 0
- slurp:
src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}"

@ -1,52 +0,0 @@
# Verify 'async: <timeout>' functions as desired.
- name: integration/async/runner_job_timeout.yml
hosts: test-targets
any_errors_fatal: true
tasks:
# Verify async-with-polling-and-timeout behaviour.
- name: sleep for 7 seconds, but timeout after 1 second.
ignore_errors: true
shell: sleep 7
async: 1
poll: 1
register: job1
- assert:
that:
- job1.changed == False
- job1.failed == True
- job1.msg == "async task did not complete within the requested time"
- job1.keys()|sort == ['changed', 'failed', 'msg']
# Verify async-with-timeout-then-poll behaviour.
# This is broken in upstream Ansible, so disable the tests there.
#
# TODO: the tests below are totally broken, not clear what Ansible is
# supposed to do here, so can't emulate it in Mitogen.
- name: sleep for 7 seconds, but timeout after 1 second.
ignore_errors: true
shell: sleep 7
async: 1
poll: 0
register: job2
when: false # is_mitogen
- name: poll up to 10 times.
async_status:
jid: "{{job2.ansible_job_id}}"
register: result2
until: result2.finished
retries: 10
delay: 1
when: false # is_mitogen
- assert:
that:
- result1.rc == 0
- result2.rc == 0
- result2.stdout == 'im_alive'
when: false # is_mitogen

@ -0,0 +1,54 @@
# Verify async jobs run in a new process.
- name: integration/async/runner_new_process.yml
hosts: test-targets
any_errors_fatal: true
tasks:
- name: get process ID.
custom_python_detect_environment:
register: sync_proc1
- name: get process ID again.
custom_python_detect_environment:
register: sync_proc2
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
when: is_mitogen
- name: get async process ID.
custom_python_detect_environment:
register: async_proc1
async: 1000
poll: 0
- name: busy-poll up to 100000 times
async_status:
jid: "{{async_proc1.ansible_job_id}}"
register: async_result1
until: async_result1.finished
retries: 100000
delay: 0
- name: get async process ID again.
custom_python_detect_environment:
register: async_proc2
async: 1000
poll: 0
- name: busy-poll up to 100000 times
async_status:
jid: "{{async_proc2.ansible_job_id}}"
register: async_result2
until: async_result2.finished
retries: 100000
delay: 0
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
- async_result1.pid != sync_proc1.pid
- async_result1.pid != async_result2.pid
when: is_mitogen

@ -6,56 +6,6 @@
any_errors_fatal: true
tasks:
# Verify async jobs run in a new process.
- name: get process ID.
custom_python_detect_environment:
register: sync_proc1
- name: get process ID again.
custom_python_detect_environment:
register: sync_proc2
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
when: is_mitogen
- name: get async process ID.
custom_python_detect_environment:
register: async_proc1
async: 1000
poll: 0
- name: busy-poll up to 100000 times
async_status:
jid: "{{async_proc1.ansible_job_id}}"
register: async_result1
until: async_result1.finished
retries: 100000
delay: 0
- name: get async process ID again.
custom_python_detect_environment:
register: async_proc2
async: 1000
poll: 0
- name: busy-poll up to 100000 times
async_status:
jid: "{{async_proc2.ansible_job_id}}"
register: async_result2
until: async_result2.finished
retries: 100000
delay: 0
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
- async_result1.pid != sync_proc1.pid
- async_result1.pid != async_result2.pid
when: is_mitogen
# Verify output of a single async job.
- name: start 2 second op

@ -0,0 +1,34 @@
# Verify 'async: <timeout>' functions as desired.
- name: integration/async/runner_timeout_then_polling.yml
hosts: test-targets
any_errors_fatal: true
tasks:
# Verify async-with-timeout-then-poll behaviour.
# This is semi-broken in upstream Ansible, it does not bother to update the
# job file on failure. So only test on Mitogen.
- name: sleep for 10 seconds, but timeout after 1 second.
shell: sleep 10
async: 1
poll: 0
register: job
when: is_mitogen
- name: busy-poll up to 500 times
async_status:
jid: "{{job.ansible_job_id}}"
register: result
until: result.finished
retries: 500
delay: 0
when: is_mitogen
ignore_errors: true
- assert:
that:
- result.failed == 1
- result.finished == 1
- result.msg == "Job reached maximum time limit of 1 seconds."
when: is_mitogen

@ -0,0 +1,24 @@
# Verify 'async: <timeout>' functions as desired.
- name: integration/async/runner_with_polling_and_timeout.yml
hosts: test-targets
any_errors_fatal: true
tasks:
# Verify async-with-polling-and-timeout behaviour.
- name: sleep for 7 seconds, but timeout after 1 second.
ignore_errors: true
shell: sleep 7
async: 1
poll: 1
register: job1
- assert:
that:
- job1.changed == False
- job1.failed == True
- |
job1.msg == "async task did not complete within the requested time" or
job1.msg == "Job reached maximum time limit of 1 seconds."

@ -1,7 +1,6 @@
import unittest2
from mitogen.parent import minimize_source
import mitogen.minify
import testlib
@ -14,40 +13,42 @@ def read_sample(fname):
class MinimizeSource(unittest2.TestCase):
func = staticmethod(mitogen.minify.minimize_source)
def test_class(self):
original = read_sample('class.py')
expected = read_sample('class_min.py')
self.assertEqual(expected, minimize_source(original))
self.assertEqual(expected, self.func(original))
def test_comment(self):
original = read_sample('comment.py')
expected = read_sample('comment_min.py')
self.assertEqual(expected, minimize_source(original))
self.assertEqual(expected, self.func(original))
def test_def(self):
original = read_sample('def.py')
expected = read_sample('def_min.py')
self.assertEqual(expected, minimize_source(original))
self.assertEqual(expected, self.func(original))
def test_hashbang(self):
original = read_sample('hashbang.py')
expected = read_sample('hashbang_min.py')
self.assertEqual(expected, minimize_source(original))
self.assertEqual(expected, self.func(original))
def test_mod(self):
original = read_sample('mod.py')
expected = read_sample('mod_min.py')
self.assertEqual(expected, minimize_source(original))
self.assertEqual(expected, self.func(original))
def test_pass(self):
original = read_sample('pass.py')
expected = read_sample('pass_min.py')
self.assertEqual(expected, minimize_source(original))
self.assertEqual(expected, self.func(original))
def test_obstacle_course(self):
original = read_sample('obstacle_course.py')
expected = read_sample('obstacle_course_min.py')
self.assertEqual(expected, minimize_source(original))
self.assertEqual(expected, self.func(original))
if __name__ == '__main__':

@ -200,6 +200,7 @@ class FindRelatedTest(testlib.TestCase):
'mitogen.compat.functools',
'mitogen.core',
'mitogen.master',
'mitogen.minify',
'mitogen.parent',
])

@ -52,9 +52,9 @@ class BrokenModulesTest(unittest2.TestCase):
responder = mitogen.master.ModuleResponder(router)
responder._on_get_module(msg)
self.assertEquals(1, len(router.route.mock_calls))
self.assertEquals(1, len(router._async_route.mock_calls))
call = router.route.mock_calls[0]
call = router._async_route.mock_calls[0]
msg, = call[1]
self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle)
self.assertEquals(('non_existent_module', None, None, None, ()),
@ -81,9 +81,9 @@ class BrokenModulesTest(unittest2.TestCase):
responder = mitogen.master.ModuleResponder(router)
responder._on_get_module(msg)
self.assertEquals(1, len(router.route.mock_calls))
self.assertEquals(1, len(router._async_route.mock_calls))
call = router.route.mock_calls[0]
call = router._async_route.mock_calls[0]
msg, = call[1]
self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle)
self.assertIsInstance(msg.unpickle(), tuple)
