Merge pull request #255 from dw/dmw

Dmw
dw 6 years ago committed by GitHub
commit c24d29d367

@ -458,13 +458,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
                 )
             )
-        dct = mitogen.service.call(
-            context=self.parent,
-            handle=ansible_mitogen.services.ContextService.handle,
-            method='get',
-            kwargs=mitogen.utils.cast({
-                'stack': stack,
-            })
+        dct = self.parent.call_service(
+            service_name='ansible_mitogen.services.ContextService',
+            method_name='get',
+            stack=mitogen.utils.cast(list(stack)),
         )
         if dct['msg']:

@ -490,13 +487,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
         multiple times.
         """
         if self.context:
-            mitogen.service.call(
-                context=self.parent,
-                handle=ansible_mitogen.services.ContextService.handle,
-                method='put',
-                kwargs={
-                    'context': self.context
-                }
+            self.parent.call_service(
+                service_name='ansible_mitogen.services.ContextService',
+                method_name='put',
+                context=self.context
             )
             self.context = None

@ -618,13 +612,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
             return self.put_data(out_path, s, mode=st.st_mode,
                                  utimes=(st.st_atime, st.st_mtime))
-        mitogen.service.call(
-            context=self.parent,
-            handle=ansible_mitogen.services.FileService.handle,
-            method='register',
-            kwargs={
-                'path': mitogen.utils.cast(in_path)
-            }
+        self.parent.call_service(
+            service_name='ansible_mitogen.services.FileService',
+            method_name='register',
+            path=mitogen.utils.cast(in_path)
         )
         self.call(
             ansible_mitogen.target.transfer_file,
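For orientation, a hedged sketch of the convention these hunks migrate to: `Context.call_service()` names the target service by its canonical class path and forwards the remaining keyword arguments to the exposed method. Names are taken from this diff; this is illustrative, not an authoritative API reference.

    # Hedged sketch: 'parent' is the mitogen.core.Context hosting the service
    # pool; keyword arguments are passed through to FileService.register().
    parent.call_service(
        service_name='ansible_mitogen.services.FileService',
        method_name='register',
        path='/path/to/some/module.py',
    )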

@ -49,7 +49,7 @@ except ImportError:  # Ansible<2.4
     from ansible.plugins import module_loader
 
 import mitogen.core
-import mitogen.master
+import mitogen.select
 import mitogen.utils
 
 import ansible_mitogen.connection

@ -253,7 +253,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
         """
         LOG.debug('_remote_chmod(%r, mode=%r, sudoable=%r)',
                   paths, mode, sudoable)
-        return self.fake_shell(lambda: mitogen.master.Select.all(
+        return self.fake_shell(lambda: mitogen.select.Select.all(
             self._connection.call_async(
                 ansible_mitogen.target.set_file_mode, path, mode
             )

@ -268,7 +268,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
         LOG.debug('_remote_chown(%r, user=%r, sudoable=%r)',
                   paths, user, sudoable)
         ent = self.call(pwd.getpwnam, user)
-        return self.fake_shell(lambda: mitogen.master.Select.all(
+        return self.fake_shell(lambda: mitogen.select.Select.all(
             self._connection.call_async(
                 os.chown, path, ent.pw_uid, ent.pw_gid
             )

@ -1,80 +1,89 @@
+# Copyright 2017, David Wilson
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# 3. Neither the name of the copyright holder nor the names of its contributors
+# may be used to endorse or promote products derived from this software without
+# specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+from __future__ import absolute_import
+import collections
 import imp
 import os
-import sys
 
 import mitogen.master
 
 
-class Name(object):
-    def __str__(self):
-        return self.identifier
-
-    def __repr__(self):
-        return 'Name(%r)' % (self.identifier,)
-
-    def __init__(self, identifier):
-        self.identifier = identifier
-
-    def head(self):
-        head, _, tail = self.identifier.partition('.')
-        return head
-
-    def tail(self):
-        head, _, tail = self.identifier.partition('.')
-        return tail
-
-    def pop_n(self, level):
-        name = self.identifier
-        for _ in xrange(level):
-            if '.' not in name:
-                return None
-            name, _, _ = self.identifier.rpartition('.')
-        return Name(name)
-
-    def append(self, part):
-        return Name('%s.%s' % (self.identifier, part))
-
-
-class Module(object):
-    def __init__(self, name, path, kind=imp.PY_SOURCE, parent=None):
-        self.name = Name(name)
-        self.path = path
-        if kind == imp.PKG_DIRECTORY:
-            self.path = os.path.join(self.path, '__init__.py')
-        self.kind = kind
-        self.parent = parent
-
-    def fullname(self):
-        bits = [str(self.name)]
-        while self.parent:
-            bits.append(str(self.parent.name))
-            self = self.parent
-        return '.'.join(reversed(bits))
-
-    def __repr__(self):
-        return 'Module(%r, path=%r, parent=%r)' % (
-            self.name,
-            self.path,
-            self.parent,
-        )
-
-    def dirname(self):
-        return os.path.dirname(self.path)
-
-    def code(self):
-        fp = open(self.path)
-        try:
-            return compile(fp.read(), str(self.name), 'exec')
-        finally:
-            fp.close()
+PREFIX = 'ansible.module_utils.'
+
+
+Module = collections.namedtuple('Module', 'name path kind parent')
+
+
+def get_fullname(module):
+    """
+    Reconstruct a Module's canonical path by recursing through its parents.
+    """
+    bits = [str(module.name)]
+    while module.parent:
+        bits.append(str(module.parent.name))
+        module = module.parent
+    return '.'.join(reversed(bits))
+
+
+def get_code(module):
+    """
+    Compile and return a Module's code object.
+    """
+    fp = open(module.path)
+    try:
+        return compile(fp.read(), str(module.name), 'exec')
+    finally:
+        fp.close()
+
+
+def is_pkg(module):
+    """
+    Return :data:`True` if a Module represents a package.
+    """
+    return module.kind == imp.PKG_DIRECTORY
 
 
 def find(name, path=(), parent=None):
     """
-    (Name, search path) -> Module instance or None.
+    Return a Module instance describing the first matching module found on the
+    given search path.
+
+    :param str name:
+        Module name.
+    :param str path:
+        Search path.
+    :param Module parent:
+        If given, make the found module a child of this module.
     """
+    head, _, tail = name.partition('.')
     try:
-        tup = imp.find_module(name.head(), list(path))
+        tup = imp.find_module(head, list(path))
     except ImportError:
         return parent
@ -82,56 +91,54 @@ def find(name, path=(), parent=None):
     if fp:
         fp.close()
 
-    module = Module(name.head(), path, kind, parent)
-    if name.tail():
-        return find_relative(module, Name(name.tail()), path)
+    if kind == imp.PKG_DIRECTORY:
+        path = os.path.join(path, '__init__.py')
+    module = Module(head, path, kind, parent)
+    if tail:
+        return find_relative(module, tail, path)
     return module
 
 
 def find_relative(parent, name, path=()):
-    path = [parent.dirname()] + list(path)
+    path = [os.path.dirname(parent.path)] + list(path)
     return find(name, path, parent=parent)
 
 
-def path_pop(s, n):
-    return os.pathsep.join(s.split(os.pathsep)[-n:])
-
-
-def scan(module, path):
-    scanner = mitogen.master.scan_code_imports(module.code())
-    for level, modname_s, fromlist in scanner:
-        modname = Name(modname_s)
-        if level == -1:
-            imported = find_relative(module, modname, path)
-        elif level:
-            subpath = [path_pop(module.dirname(), level)] + list(path)
-            imported = find(modname.pop_n(level), subpath)
-        else:
-            imported = find(modname.pop_n(level), path)
-        if imported and mitogen.master.is_stdlib_path(imported.path):
-            continue
-        if imported and fromlist:
-            have = False
-            for fromname_s in fromlist:
-                fromname = modname.append(fromname_s)
-                f_imported = find_relative(imported, fromname, path)
-                if f_imported and f_imported.fullname() == fromname.identifier:
-                    have = True
-                    yield fromname, f_imported, None
-            if have:
-                continue
-        if imported:
-            yield modname, imported
-
-
-module = Module(name='ansible_module_apt', path='/Users/dmw/src/mitogen/.venv/lib/python2.7/site-packages/ansible/modules/packaging/os/apt.py')
-path = tuple(sys.path)
-path = ('/Users/dmw/src/ansible/lib',) + path
-
-from pprint import pprint
-for name, imported in scan(module, sys.path):
-    print '%s: %s' % (name, imported and (str(name) == imported.fullname()))
+def scan_fromlist(code):
+    for level, modname_s, fromlist in mitogen.master.scan_code_imports(code):
+        for name in fromlist:
+            yield level, '%s.%s' % (modname_s, name)
+        if not fromlist:
+            yield level, modname_s
+
+
+def scan(module_name, module_path, search_path):
+    module = Module(module_name, module_path, imp.PY_SOURCE, None)
+    stack = [module]
+    seen = set()
+
+    while stack:
+        module = stack.pop(0)
+        for level, fromname in scan_fromlist(get_code(module)):
+            if not fromname.startswith(PREFIX):
+                continue
+
+            imported = find(fromname[len(PREFIX):], search_path)
+            if imported is None or imported in seen:
+                continue
+
+            seen.add(imported)
+            stack.append(imported)
+            parent = imported.parent
+            while parent:
+                fullname = get_fullname(parent)
+                module = Module(fullname, parent.path, parent.kind, None)
+                if module not in seen:
+                    seen.add(module)
+                    stack.append(module)
+                parent = parent.parent
+
+    return sorted(
+        (PREFIX + get_fullname(module), module.path, is_pkg(module))
+        for module in seen
+    )
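A hedged usage sketch of the rewritten scanner above; the paths are hypothetical, since in the real flow the planner supplies them from Ansible's loaders:

    from ansible_mitogen.module_finder import scan

    module_utils = scan(
        module_name='ansible_module_ping',
        module_path='/usr/lib/python2.7/site-packages/ansible/modules/system/ping.py',
        search_path=('/etc/ansible/module_utils',),
    )
    for fullname, path, is_pkg in module_utils:
        print '%s -> %s (pkg=%r)' % (fullname, path, is_pkg)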

@ -35,17 +35,19 @@ files/modules known missing.
 """
 from __future__ import absolute_import
-import json
 import logging
 import os
 
 from ansible.executor import module_common
 import ansible.errors
+import ansible.module_utils
 
 try:
     from ansible.plugins.loader import module_loader
+    from ansible.plugins.loader import module_utils_loader
 except ImportError:  # Ansible <2.4
     from ansible.plugins import module_loader
+    from ansible.plugins import module_utils_loader
 
 import mitogen
 import mitogen.service
@ -178,16 +180,16 @@ class BinaryPlanner(Planner):
     def detect(self, invocation):
         return module_common._is_binary(invocation.module_source)
 
-    def plan(self, invocation, **kwargs):
+    def _grant_file_service_access(self, invocation):
         invocation.connection._connect()
-        mitogen.service.call(
-            context=invocation.connection.parent,
-            handle=ansible_mitogen.services.FileService.handle,
-            method='register',
-            kwargs={
-                'path': invocation.module_path
-            }
+        invocation.connection.parent.call_service(
+            service_name='ansible_mitogen.services.FileService',
+            method_name='register',
+            path=invocation.module_path,
         )
+
+    def plan(self, invocation, **kwargs):
+        self._grant_file_service_access(invocation)
         return super(BinaryPlanner, self).plan(
             invocation=invocation,
             runner_name=self.runner_name,
@ -228,36 +230,6 @@ class ScriptPlanner(BinaryPlanner):
         )
 
 
-class ReplacerPlanner(BinaryPlanner):
-    """
-    The Module Replacer framework is the original framework implementing
-    new-style modules. It is essentially a preprocessor (like the C
-    Preprocessor for those familiar with that programming language). It does
-    straight substitutions of specific substring patterns in the module file.
-    There are two types of substitutions.
-
-    * Replacements that only happen in the module file. These are public
-      replacement strings that modules can utilize to get helpful boilerplate
-      or access to arguments.
-
-      "from ansible.module_utils.MOD_LIB_NAME import *" is replaced with the
-      contents of the ansible/module_utils/MOD_LIB_NAME.py. These should only
-      be used with new-style Python modules.
-
-      "#<<INCLUDE_ANSIBLE_MODULE_COMMON>>" is equivalent to
-      "from ansible.module_utils.basic import *" and should also only apply to
-      new-style Python modules.
-
-      "# POWERSHELL_COMMON" substitutes the contents of
-      "ansible/module_utils/powershell.ps1". It should only be used with
-      new-style Powershell modules.
-    """
-    runner_name = 'ReplacerRunner'
-
-    def detect(self, invocation):
-        return module_common.REPLACER in invocation.module_source
-
-
 class JsonArgsPlanner(ScriptPlanner):
     """
     Script that has its interpreter directive and the task arguments
@ -298,6 +270,12 @@ class NewStylePlanner(ScriptPlanner):
     def _get_interpreter(self, invocation):
         return None, None
 
+    def _grant_file_service_access(self, invocation):
+        """
+        Stub out BinaryPlanner's method since ModuleDepService makes internal
+        calls to grant file access, avoiding 2 IPCs per task invocation.
+        """
+
     def get_should_fork(self, invocation):
         """
         In addition to asynchronous tasks, new-style modules should be forked
@ -311,8 +289,58 @@ class NewStylePlanner(ScriptPlanner):
     def detect(self, invocation):
         return 'from ansible.module_utils.' in invocation.module_source
 
+    def get_search_path(self, invocation):
+        return tuple(
+            path
+            for path in module_utils_loader._get_paths(subdirs=False)
+            if os.path.isdir(path)
+        )
+
+    def get_module_utils(self, invocation):
+        invocation.connection._connect()
+        return invocation.connection.parent.call_service(
+            service_name='ansible_mitogen.services.ModuleDepService',
+            method_name='scan',
+            module_name='ansible_module_%s' % (invocation.module_name,),
+            module_path=invocation.module_path,
+            search_path=self.get_search_path(invocation),
+            builtin_path=module_common._MODULE_UTILS_PATH,
+        )
+
+    def plan(self, invocation):
+        module_utils = self.get_module_utils(invocation)
+        return super(NewStylePlanner, self).plan(
+            invocation,
+            module_utils=module_utils,
+            should_fork=(self.get_should_fork(invocation) or bool(module_utils)),
+        )
+
 
 class ReplacerPlanner(NewStylePlanner):
+    """
+    The Module Replacer framework is the original framework implementing
+    new-style modules. It is essentially a preprocessor (like the C
+    Preprocessor for those familiar with that programming language). It does
+    straight substitutions of specific substring patterns in the module file.
+    There are two types of substitutions.
+
+    * Replacements that only happen in the module file. These are public
+      replacement strings that modules can utilize to get helpful boilerplate
+      or access to arguments.
+
+      "from ansible.module_utils.MOD_LIB_NAME import *" is replaced with the
+      contents of the ansible/module_utils/MOD_LIB_NAME.py. These should only
+      be used with new-style Python modules.
+
+      "#<<INCLUDE_ANSIBLE_MODULE_COMMON>>" is equivalent to
+      "from ansible.module_utils.basic import *" and should also only apply to
+      new-style Python modules.
+
+      "# POWERSHELL_COMMON" substitutes the contents of
+      "ansible/module_utils/powershell.ps1". It should only be used with
+      new-style Powershell modules.
+    """
     runner_name = 'ReplacerRunner'
 
     def detect(self, invocation):

@ -151,11 +151,16 @@ class MuxProcess(object):
         Construct a ContextService and a thread to service requests for it
         arriving from worker processes.
         """
+        file_service = ansible_mitogen.services.FileService(router=self.router)
         self.pool = mitogen.service.Pool(
             router=self.router,
             services=[
+                file_service,
                 ansible_mitogen.services.ContextService(self.router),
-                ansible_mitogen.services.FileService(self.router),
+                ansible_mitogen.services.ModuleDepService(
+                    router=self.router,
+                    file_service=file_service,
+                ),
             ],
             size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')),
         )

@ -38,6 +38,7 @@ how to build arguments for it, preseed related data, etc.
 from __future__ import absolute_import
 import cStringIO
 import ctypes
+import imp
 import json
 import logging
 import os
@ -182,6 +183,46 @@ class Runner(object):
         self.revert()
 
 
+class ModuleUtilsImporter(object):
+    """
+    :param list module_utils:
+        List of `(fullname, path, is_pkg)` tuples.
+    """
+    def __init__(self, context, module_utils):
+        self._context = context
+        self._by_fullname = {
+            fullname: (path, is_pkg)
+            for fullname, path, is_pkg in module_utils
+        }
+        self._loaded = set()
+        sys.meta_path.insert(0, self)
+
+    def revert(self):
+        sys.meta_path.remove(self)
+        for fullname in self._loaded:
+            sys.modules.pop(fullname, None)
+
+    def find_module(self, fullname, path=None):
+        if fullname in self._by_fullname:
+            return self
+
+    def load_module(self, fullname):
+        path, is_pkg = self._by_fullname[fullname]
+        source = ansible_mitogen.target.get_file(self._context, path)
+        code = compile(source, path, 'exec')
+        mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
+        mod.__file__ = "master:%s" % (path,)
+        mod.__loader__ = self
+        if is_pkg:
+            mod.__path__ = []
+            mod.__package__ = fullname
+        else:
+            mod.__package__ = fullname.rpartition('.')[0]
+        exec(code, mod.__dict__)
+        self._loaded.add(fullname)
+        return mod
+
+
 class TemporaryEnvironment(object):
     def __init__(self, env=None):
         self.original = os.environ.copy()
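A hedged sketch of the importer's lifecycle on the target; the context and tuple values are stand-ins, since in practice they arrive via the runner's kwargs from ModuleDepService:

    importer = ModuleUtilsImporter(
        context=service_context,     # parent mitogen.core.Context hosting FileService
        module_utils=[
            ('ansible.module_utils.basic', '/path/to/basic.py', False),
        ],
    )
    try:
        import ansible.module_utils.basic   # leaf served by the importer above
    finally:
        importer.revert()   # unhook sys.meta_path and discard loaded modules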
@ -402,6 +443,10 @@ class NewStyleRunner(ScriptRunner):
     #: path => new-style module bytecode.
     _code_by_path = {}
 
+    def __init__(self, module_utils, **kwargs):
+        super(NewStyleRunner, self).__init__(**kwargs)
+        self.module_utils = module_utils
+
     def setup(self):
         super(NewStyleRunner, self).setup()
         self._stdio = NewStyleStdio(self.args)
@ -409,6 +454,10 @@ class NewStyleRunner(ScriptRunner):
         # module, but this has never been a bug report. Instead act like an
         # interpreter that had its script piped on stdin.
         self._argv = TemporaryArgv([''])
+        self._importer = ModuleUtilsImporter(
+            context=self.service_context,
+            module_utils=self.module_utils,
+        )
         if libc__res_init:
             libc__res_init()

@ -49,7 +49,9 @@ import threading
 import zlib
 
 import mitogen
+import mitogen.master
 import mitogen.service
 
+import ansible_mitogen.module_finder
 import ansible_mitogen.target
@ -74,8 +76,6 @@ class ContextService(mitogen.service.Service):
     processes and arranging for the worker to select one according to a hash of
     the connection parameters (sharding).
     """
-    handle = 500
-    max_message_size = 1000
     max_interpreters = int(os.getenv('MITOGEN_MAX_INTERPRETERS', '20'))
 
     def __init__(self, *args, **kwargs):
@ -418,8 +418,6 @@ class FileService(mitogen.service.Service):
         proceed normally, without the associated thread needing to be
         forcefully killed.
     """
-    handle = 501
-    max_message_size = 1000
     unregistered_msg = 'Path is not registered with FileService.'
     context_mismatch_msg = 'sender= kwarg context must match requestee context'
@ -440,6 +438,17 @@
         except KeyError:
             return None
 
+    @mitogen.service.expose(policy=mitogen.service.AllowParents())
+    @mitogen.service.arg_spec({
+        'paths': list
+    })
+    def register_many(self, paths):
+        """
+        Batch version of register().
+        """
+        for path in paths:
+            self.register(path)
+
     @mitogen.service.expose(policy=mitogen.service.AllowParents())
     @mitogen.service.arg_spec({
         'path': basestring
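A hedged sketch of driving the batch method from a parent context, assuming the same call_service() convention used elsewhere in this commit:

    # One IPC registers several paths, rather than one register() call each.
    parent.call_service(
        service_name='ansible_mitogen.services.FileService',
        method_name='register_many',
        paths=['/path/a.py', '/path/b.py'],
    )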
@ -589,3 +598,44 @@
             self._schedule_pending_unlocked(state)
         finally:
             state.lock.release()
+
+
+class ModuleDepService(mitogen.service.Service):
+    """
+    Scan a new-style module and produce a cached mapping of module_utils names
+    to their resolved filesystem paths.
+    """
+    def __init__(self, file_service, **kwargs):
+        super(ModuleDepService, self).__init__(**kwargs)
+        self._file_service = file_service
+        self._cache = {}
+
+    @mitogen.service.expose(policy=mitogen.service.AllowParents())
+    @mitogen.service.arg_spec({
+        'module_name': basestring,
+        'module_path': basestring,
+        'search_path': tuple,
+        'builtin_path': basestring,
+    })
+    def scan(self, module_name, module_path, search_path, builtin_path):
+        if (module_name, search_path) not in self._cache:
+            resolved = ansible_mitogen.module_finder.scan(
+                module_name=module_name,
+                module_path=module_path,
+                search_path=tuple(search_path) + (builtin_path,),
+            )
+            builtin_path = os.path.abspath(builtin_path)
+            filtered = [
+                (fullname, path, is_pkg)
+                for fullname, path, is_pkg in resolved
+                if not os.path.abspath(path).startswith(builtin_path)
+            ]
+            self._cache[module_name, search_path] = filtered
+            # Grant FileService access to paths in here to avoid another 2 IPCs
+            # from WorkerProcess.
+            self._file_service.register(path=module_path)
+            for fullname, path, is_pkg in filtered:
+                self._file_service.register(path=path)
+        return self._cache[module_name, search_path]
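A hedged sketch of a scan() round trip as a parent would issue it; the arguments are illustrative, and in this commit the planner builds the real ones from Ansible's loaders:

    module_utils = parent.call_service(
        service_name='ansible_mitogen.services.ModuleDepService',
        method_name='scan',
        module_name='ansible_module_ping',
        module_path='/path/to/ping.py',
        search_path=(),
        builtin_path='/path/to/ansible/module_utils',
    )
    # Each entry is (fullname, path, is_pkg); the list later reaches
    # NewStyleRunner(module_utils=...), where ModuleUtilsImporter serves the
    # files via FileService.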

@ -90,26 +90,20 @@ def _get_file(context, path, out_fp):
     LOG.debug('_get_file(): fetching %r from %r', path, context)
     t0 = time.time()
     recv = mitogen.core.Receiver(router=context.router)
-    metadata = mitogen.service.call(
-        context=context,
-        handle=ansible_mitogen.services.FileService.handle,
-        method='fetch',
-        kwargs={
-            'path': path,
-            'sender': recv.to_sender()
-        }
+    metadata = context.call_service(
+        service_name='ansible_mitogen.services.FileService',
+        method_name='fetch',
+        path=path,
+        sender=recv.to_sender(),
     )
 
     for chunk in recv:
         s = chunk.unpickle()
         LOG.debug('_get_file(%r): received %d bytes', path, len(s))
-        mitogen.service.call_async(
-            context=context,
-            handle=ansible_mitogen.services.FileService.handle,
-            method='acknowledge',
-            kwargs={
-                'size': len(s),
-            }
+        context.call_service_async(
+            service_name='ansible_mitogen.services.FileService',
+            method_name='acknowledge',
+            size=len(s),
         ).close()
         out_fp.write(s)
@ -518,7 +512,7 @@ def write_path(path, s, owner=None, group=None, mode=None,
                                  prefix='.ansible_mitogen_transfer-',
                                  dir=os.path.dirname(path))
     fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
-    LOG.debug('write_path(path=%r) tempory file: %s', path, tmp_path)
+    LOG.debug('write_path(path=%r) temporary file: %s', path, tmp_path)
     try:
         try:

@ -1,8 +1,9 @@
-r docs/docs-requirements.txt -r docs/docs-requirements.txt
ansible==2.3.1.0 ansible==2.5.2
coverage==4.5.1 coverage==4.5.1
Django==1.6.11; python_version < '2.7' Django==1.6.11; python_version < '2.7'
Django==1.11.5; python_version >= '2.7' # for module_finder_test Django==1.11.5; python_version >= '2.7' # for module_finder_test
debops==0.7.2
https://github.com/docker/docker-py/archive/1.10.6.tar.gz; python_version < '2.7' https://github.com/docker/docker-py/archive/1.10.6.tar.gz; python_version < '2.7'
docker[tls]==2.5.1; python_version >= '2.7' docker[tls]==2.5.1; python_version >= '2.7'
mock==2.0.0 mock==2.0.0

@ -137,14 +137,6 @@ Noteworthy Differences
 * Asynchronous jobs presently exist only for the duration of a run, and time
   limits are not implemented.
 
-* Due to use of :func:`select.select` the IO multiplexer breaks down around 100
-  targets, expect performance degradation as this number is approached and
-  errant behaviour as it is exceeded. A replacement will appear soon.
-
-* The undocumented ability to extend and override :mod:`ansible.module_utils`
-  by supplying a ``module_utils`` directory alongside a custom new-style module
-  is not yet supported.
-
 * "Module Replacer" style modules are not supported. These rarely appear in
   practice, and light web searches failed to reveal many examples of them.

@ -28,11 +28,7 @@ mitogen Package
 mitogen.core
 ------------
 
-.. module:: mitogen.core
-
-This module implements most package functionality, but remains separate from
-non-essential code in order to reduce its size, since it is also serves as the
-bootstrap implementation sent to every new slave context.
+.. automodule:: mitogen.core
 
 .. currentmodule:: mitogen.core
 
 .. decorator:: takes_econtext
@ -63,250 +59,25 @@ bootstrap implementation sent to every new slave context.
mitogen.master
--------------

-.. module:: mitogen.master
+.. automodule:: mitogen.master
This module implements functionality required by master processes, such as
starting new contexts via SSH. Its size is also restricted, since it must
be sent to any context that will be used to establish additional child
contexts.
.. currentmodule:: mitogen.master
.. class:: Select (receivers=(), oneshot=True)
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:py:class:`mitogen.core.Receiver` or :py:class:`mitogen.master.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is ``True``, then remove each receiver as it yields a result;
since :py:meth:`__iter__` terminates once the final receiver is removed,
this makes it convenient to respond to calls made in parallel:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.master.Select(recvs):
print 'Got %s from %s' % (msg, msg.receiver)
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
:py:class:`Select` may drive a long-running scheduler:
.. code-block:: python
with mitogen.master.Select(oneshot=False) as select:
while running():
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:py:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.master.Select(get_some_work()),
mitogen.master.Select(get_some_work()),
mitogen.master.Select([
mitogen.master.Select(get_some_work()),
mitogen.master.Select(get_some_work())
])
]
for msg in mitogen.master.Select(selects):
print msg.unpickle()
.. py:classmethod:: all (it)
Take an iterable of receivers and retrieve a :py:class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch :py:class:`Context.call_async`
invocations:
.. code-block:: python
print('Total disk usage: %.02fMiB' % (sum(
mitogen.master.Select.all(
context.call_async(get_disk_usage)
for context in contexts
) / 1048576.0
),))
However, unlike in a naive comprehension such as:
.. code-block:: python
sum(context.call_async(get_disk_usage).get().unpickle()
    for context in contexts)

+mitogen.parent
+--------------
Result processing happens concurrently to new results arriving, so
:py:meth:`all` should always be faster.
.. py:method:: get (timeout=None)
Fetch the next available value from any receiver, or raise
:py:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :py:attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:return:
:py:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the underlying latch is no
longer valid.
.. py:method:: __bool__ ()
Return ``True`` if any receivers are registered with this select.
.. py:method:: close ()
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :py:meth:`get` to be woken with
:py:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
used.
.. py:method:: empty ()
Return ``True`` if calling :py:meth:`get` would block.
As with :py:class:`Queue.Queue`, ``True`` may be returned even though a
subsequent call to :py:meth:`get` will succeed, since a message may be
posted at any moment between :py:meth:`empty` and :py:meth:`get`.
:py:meth:`empty` may return ``False`` even when :py:meth:`get` would
block if another thread has drained a receiver added to this select.
This can be avoided by only consuming each receiver from a single
thread.
.. py:method:: __iter__ (self)
Yield the result of :py:meth:`get` until no receivers remain in the
select, either because `oneshot` is ``True``, or each receiver was
explicitly removed via :py:meth:`remove`.
.. py:method:: add (recv)
Add the :py:class:`mitogen.core.Receiver` or
+.. automodule:: mitogen.parent
:py:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :py:meth:`remove`, then it will
still be returned by a subsequent :py:meth:`get`. This may change in a
future version.
mitogen.fakessh
---------------

-.. module:: mitogen.fakessh
+.. image:: images/fakessh.png
+    :align: right
fakessh is a stream implementation that starts a local subprocess with its
environment modified such that ``PATH`` searches for `ssh` return an mitogen
implementation of the SSH command. When invoked, this tool arranges for the
command line supplied by the calling program to be executed in a context
already established by the master process, reusing the master's (possibly
proxied) connection to that context.
This allows tools like `rsync` and `scp` to transparently reuse the connections
and tunnels already established by the host program to connect to a target
machine, without wasteful redundant SSH connection setup, 3-way handshakes, or
firewall hopping configurations, and enables these tools to be used in
impossible scenarios, such as over `sudo` with ``requiretty`` enabled.
The fake `ssh` command source is written to a temporary file on disk, and
consists of a copy of the :py:mod:`mitogen.core` source code (just like any
other child context), with a line appended to cause it to connect back to the
host process over an FD it inherits. As there is no reliance on an existing
filesystem file, it is possible for child contexts to use fakessh.
As a consequence of connecting back through an inherited FD, only one SSH
invocation is possible, which is fine for tools like `rsync`, however in future
this restriction will be lifted.
Sequence:
1. ``fakessh`` Context and Stream created by parent context. The stream's
buffer has a :py:func:`_fakessh_main` :py:data:`CALL_FUNCTION
<mitogen.core.CALL_FUNCTION>` enqueued.
2. Target program (`rsync/scp/sftp`) invoked, which internally executes
`ssh` from ``PATH``.
3. :py:mod:`mitogen.core` bootstrap begins, recovers the stream FD
inherited via the target program, established itself as the fakessh
context.
4. :py:func:`_fakessh_main` :py:data:`CALL_FUNCTION
<mitogen.core.CALL_FUNCTION>` is read by fakessh context,
a. sets up :py:class:`IoPump` for stdio, registers
stdin_handle for local context.
b. Enqueues :py:data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>` for
:py:func:`_start_slave` invoked in target context,
i. the program from the `ssh` command line is started
ii. sets up :py:class:`IoPump` for `ssh` command line process's
stdio pipes
iii. returns `(control_handle, stdin_handle)` to
:py:func:`_fakessh_main`
5. :py:func:`_fakessh_main` receives control/stdin handles from from
:py:func:`_start_slave`,
a. registers remote's stdin_handle with local :py:class:`IoPump`.
b. sends `("start", local_stdin_handle)` to remote's control_handle
c. registers local :py:class:`IoPump` with
:py:class:`mitogen.core.Broker`.
d. loops waiting for `local stdout closed && remote stdout closed`
6. :py:func:`_start_slave` control channel receives `("start", stdin_handle)`,
a. registers remote's stdin_handle with local :py:class:`IoPump`
b. registers local :py:class:`IoPump` with
:py:class:`mitogen.core.Broker`.
c. loops waiting for `local stdout closed && remote stdout closed`
+.. automodule:: mitogen.fakessh

.. currentmodule:: mitogen.fakessh

-.. function:: run (dest, router, args, daedline=None, econtext=None)
+.. autofunction:: run (dest, router, args, daedline=None, econtext=None)
Run the command specified by the argument vector `args` such that ``PATH``
searches for SSH by the command will cause its attempt to use SSH to
execute a remote program to be redirected to use mitogen to execute that
program using the context `dest` instead.
:param mitogen.core.Context dest:
The destination context to execute the SSH command line in.
:param mitogen.core.Router router:
:param list[str] args:
Command line arguments for local program, e.g.
``['rsync', '/tmp', 'remote:/tmp']``
:returns:
Exit status of the child process.
Message Class

@ -316,6 +87,11 @@ Message Class
 .. class:: Message
 
+    Messages are the fundamental unit of communication, comprising the fields
+    from in the :ref:`stream-protocol` header, an optional reference to the
+    receiving :class:`mitogen.core.Router` for ingress messages, and helper
+    methods for deserialization and generating replies.
+
     .. attribute:: router
 
         The :py:class:`mitogen.core.Router` responsible for routing the

@ -324,21 +100,41 @@ Message Class
     .. attribute:: receiver
 
         The :py:class:`mitogen.core.Receiver` over which the message was last
-        received. Part of the :py:class:`mitogen.master.Select` interface.
+        received. Part of the :py:class:`mitogen.select.Select` interface.
         Defaults to :py:data:`None`.
 
     .. attribute:: dst_id
 
+        Integer target context ID. :py:class:`mitogen.core.Router` delivers
+        messages locally when their :attr:`dst_id` matches
+        :data:`mitogen.context_id`, otherwise they are routed up or downstream.
+
     .. attribute:: src_id
 
+        Integer source context ID. Used as the target of replies if any are
+        generated.
+
     .. attribute:: auth_id
 
+        The context ID under whose authority the message is acting. See
+        :py:ref:`source-verification`.
+
     .. attribute:: handle
 
+        Integer target handle in the destination context. This is one of the
+        :py:ref:`standard-handles`, or a dynamically generated handle used to
+        receive a one-time reply, such as the return value of a function call.
+
     .. attribute:: reply_to
 
+        Integer target handle to direct any reply to this message. Used to
+        receive a one-time reply, such as the return value of a function call.
+        :data:`IS_DEAD` has a special meaning when it appears in this field.
+
     .. attribute:: data
 
+        Message data, which may be raw or pickled.
+
     .. attribute:: is_dead
 
         :data:`True` if :attr:`reply_to` is set to the magic value
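As an illustrative aside (not part of this changeset), a hedged sketch of how these fields combine when generating a reply to an incoming message:

.. code-block:: python

    # Hedged sketch: route a pickled reply back to the message's origin.
    def pong(router, msg):
        router.route(
            mitogen.core.Message.pickled(
                'pong',
                dst_id=msg.src_id,     # deliver to the sending context
                handle=msg.reply_to,   # its one-shot reply handle
            )
        )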
@ -389,7 +185,6 @@ Message Class
 
 Router Class
 ============
 
 .. currentmodule:: mitogen.core
 
 .. class:: Router

@ -402,6 +197,24 @@ Router Class
     **Note:** This is the somewhat limited core version of the Router class
     used by child contexts. The master subclass is documented below this one.
 
+    .. attribute:: unidirectional
+
+        When :data:`True`, permit children to only communicate with the current
+        context or a parent of the current context. Routing between siblings or
+        children of parents is prohibited, ensuring no communication is
+        possible between intentionally partitioned networks, such as when a
+        program simultaneously manipulates hosts spread across a corporate and
+        a production network, or production networks that are otherwise
+        air-gapped.
+
+        Sending a prohibited message causes an error to be logged and a dead
+        message to be sent in reply to the errant message, if that message has
+        ``reply_to`` set.
+
+        The value of :data:`unidirectional` becomes the default for the
+        :meth:`local() <mitogen.master.Router.local>` `unidirectional`
+        parameter.
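An illustrative sketch (not part of this changeset) of enabling the attribute before any children exist:

.. code-block:: python

    # Hedged sketch: partition the tree before constructing children.
    broker = mitogen.master.Broker()
    router = mitogen.master.Router(broker)
    router.unidirectional = True
    # Children created from here on may only talk to their own parents.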
    .. method:: stream_by_id (dst_id)

        Return the :py:class:`mitogen.core.Stream` that should be used to
@ -523,13 +336,13 @@ Router Class
         :py:class:`Broker` instance to use. If not specified, a private
         :py:class:`Broker` is created.
 
-    .. data:: profiling
+    .. attribute:: profiling
 
-        When enabled, causes the broker thread and any subsequent broker and
-        main threads existing in any child to write
+        When :data:`True`, cause the broker thread and any subsequent broker
+        and main threads existing in any child to write
         ``/tmp/mitogen.stats.<pid>.<thread_name>.log`` containing a
-        :py:mod:`cProfile` dump on graceful exit. Must be set prior to any
-        :py:class:`Broker` being constructed, e.g. via:
+        :py:mod:`cProfile` dump on graceful exit. Must be set prior to
+        construction of any :py:class:`Broker`, e.g. via:
 
         .. code::
@ -557,7 +370,7 @@ Router Class
     **Context Factories**
 
-    .. method:: fork (new_stack=False, on_fork=None, debug=False, profiling=False, via=None)
+    .. method:: fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None)
 
         Construct a context on the local machine by forking the current
         process. The forked child receives a new identity, sets up a new broker
@ -631,18 +444,19 @@ Router Class
         The associated stream implementation is
         :py:class:`mitogen.fork.Stream`.
 
-        :param bool new_stack:
-            If :py:data:`True`, arrange for the local thread stack to be
-            discarded, by forking from a new thread. Aside from clean
-            tracebacks, this has the effect of causing objects referenced by
-            the stack to cease existing in the child.
-
         :param function on_fork:
             Function invoked as `on_fork()` from within the child process. This
             permits supplying a program-specific cleanup function to break
             locks and close file descriptors belonging to the parent from
             within the child.
 
+        :param function on_start:
+            Invoked as `on_start(econtext)` from within the child process after
+            it has been set up, but before the function dispatch loop starts.
+            This permits supplying a custom child main function that inherits
+            rich data structures that cannot normally be passed via a
+            serialization.
+
         :param Context via:
             Same as the `via` parameter for :py:meth:`local`.
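An illustrative sketch (not part of this changeset) of `on_start` inheriting an unpicklable object across the fork; process() is a placeholder:

.. code-block:: python

    import Queue

    work_queue = Queue.Queue()        # not picklable; inherited via fork

    def child_main(econtext):
        # Runs in the child after setup, before the dispatch loop starts.
        while True:
            process(work_queue.get())

    child = router.fork(on_start=child_main)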
@ -674,19 +488,26 @@ Router Class
             ``python2.7``. In future this may default to ``sys.executable``.
 
         :param bool debug:
-            If ``True``, arrange for debug logging (:py:meth:`enable_debug`) to
-            be enabled in the new context. Automatically ``True`` when
+            If :data:`True`, arrange for debug logging (:py:meth:`enable_debug`) to
+            be enabled in the new context. Automatically :data:`True` when
             :py:meth:`enable_debug` has been called, but may be used
             selectively otherwise.
 
+        :param bool unidirectional:
+            If :data:`True`, arrange for the child's router to be constructed
+            with :attr:`unidirectional routing
+            <mitogen.core.Router.unidirectional>` enabled. Automatically
+            :data:`True` when it was enabled for this router, but may still be
+            explicitly set to :data:`False`.
+
         :param float connect_timeout:
             Fractional seconds to wait for the subprocess to indicate it is
             healthy. Defaults to 30 seconds.
 
         :param bool profiling:
-            If ``True``, arrange for profiling (:py:data:`profiling`) to be
-            enabled in the new context. Automatically ``True`` when
-            :py:data:`profiling` is ``True``, but may be used selectively
+            If :data:`True`, arrange for profiling (:py:data:`profiling`) to be
+            enabled in the new context. Automatically :data:`True` when
+            :py:data:`profiling` is :data:`True`, but may be used selectively
             otherwise.
 
         :param mitogen.core.Context via:
@ -1032,7 +853,7 @@ Context Class
         Asynchronous calls may be dispatched in parallel to multiple
         contexts and consumed as they complete using
-        :py:class:`mitogen.master.Select`.
+        :py:class:`mitogen.select.Select`.
 
     .. method:: call (fn, \*args, \*\*kwargs)
@ -1048,7 +869,7 @@ Context Class
 
 Receiver Class
---------------
+==============
 
 .. currentmodule:: mitogen.core
@ -1067,8 +888,8 @@ Receiver Class
        handle is chosen.
 
    :param bool persist:
-       If ``True``, do not unregister the receiver's handler after the first
-       message.
+       If :data:`True`, do not unregister the receiver's handler after the
+       first message.
 
    :param mitogen.core.Context respondent:
        Reference to the context this receiver is receiving from. If not

@ -1080,7 +901,7 @@ Receiver Class
        If not ``None``, a reference to a function invoked as
        `notify(receiver)` when a new message is delivered to this receiver.
-       Used by :py:class:`mitogen.master.Select` to implement waiting on
+       Used by :py:class:`mitogen.select.Select` to implement waiting on
        multiple receivers.
 
    .. py:method:: to_sender ()
@ -1102,11 +923,12 @@ Receiver Class
    .. py:method:: empty ()
 
-       Return ``True`` if calling :py:meth:`get` would block.
+       Return :data:`True` if calling :py:meth:`get` would block.
 
-       As with :py:class:`Queue.Queue`, ``True`` may be returned even though a
-       subsequent call to :py:meth:`get` will succeed, since a message may be
-       posted at any moment between :py:meth:`empty` and :py:meth:`get`.
+       As with :py:class:`Queue.Queue`, :data:`True` may be returned even
+       though a subsequent call to :py:meth:`get` will succeed, since a
+       message may be posted at any moment between :py:meth:`empty` and
+       :py:meth:`get`.
 
       :py:meth:`empty` is only useful to avoid a race while installing
       :py:attr:`notify`:
@ -1156,7 +978,7 @@ Receiver Class
 
 Sender Class
-------------
+============
 
 .. currentmodule:: mitogen.core
@ -1183,8 +1005,164 @@ Sender Class
 
        Send `data` to the remote end.
Select Class
============
.. module:: mitogen.select
.. currentmodule:: mitogen.select
.. class:: Select (receivers=(), oneshot=True)
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:py:class:`mitogen.core.Receiver` or :py:class:`mitogen.select.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :py:meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.select.Select(recvs):
print 'Got %s from %s' % (msg, msg.receiver)
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
:py:class:`Select` may drive a long-running scheduler:
.. code-block:: python
with mitogen.select.Select(oneshot=False) as select:
while running():
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:py:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work()),
mitogen.select.Select([
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work())
])
]
for msg in mitogen.select.Select(selects):
print msg.unpickle()
.. py:classmethod:: all (it)
Take an iterable of receivers and retrieve a :py:class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch :py:class:`Context.call_async`
invocations:
.. code-block:: python
print('Total disk usage: %.02fMiB' % (sum(
mitogen.select.Select.all(
context.call_async(get_disk_usage)
for context in contexts
) / 1048576.0
),))
However, unlike in a naive comprehension such as:
.. code-block:: python
sum(context.call_async(get_disk_usage).get().unpickle()
for context in contexts)
Result processing happens concurrently to new results arriving, so
:py:meth:`all` should always be faster.
.. py:method:: get (timeout=None, block=True)
Fetch the next available value from any receiver, or raise
:py:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :py:attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:param bool block:
If :py:data:`False`, immediately raise
:py:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:py:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the underlying latch is no
longer valid.
.. py:method:: __bool__ ()
Return :data:`True` if any receivers are registered with this select.
.. py:method:: close ()
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :py:meth:`get` to be woken with
:py:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
used.
.. py:method:: empty ()
Return :data:`True` if calling :py:meth:`get` would block.
As with :py:class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and
:py:meth:`get`.
:py:meth:`empty` may return ``False`` even when :py:meth:`get` would
block if another thread has drained a receiver added to this select.
This can be avoided by only consuming each receiver from a single
thread.
.. py:method:: __iter__ (self)
Yield the result of :py:meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :py:meth:`remove`.
.. py:method:: add (recv)
Add the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :py:meth:`remove`, then it will
still be returned by a subsequent :py:meth:`get`. This may change in a
future version.
 Channel Class
--------------
+=============
 
 .. currentmodule:: mitogen.core
@ -1256,10 +1234,10 @@ Broker Class
    .. method:: keep_alive
 
-       Return ``True`` if any reader's :py:attr:`Side.keep_alive` attribute is
-       ``True``, or any :py:class:`Context` is still registered that is not
-       the master. Used to delay shutdown while some important work is in
-       progress (e.g. log draining).
+       Return :data:`True` if any reader's :py:attr:`Side.keep_alive`
+       attribute is :data:`True`, or any :py:class:`Context` is still
+       registered that is not the master. Used to delay shutdown while some
+       important work is in progress (e.g. log draining).
 
    **Internal Methods**
@ -1284,9 +1262,10 @@ Broker Class
        non-payment results in termination for one customer.
 
    :param bool install_watcher:
-       If ``True``, an additional thread is started to monitor the lifetime of
-       the main thread, triggering :py:meth:`shutdown` automatically in case
-       the user forgets to call it, or their code crashed.
+       If :data:`True`, an additional thread is started to monitor the
+       lifetime of the main thread, triggering :py:meth:`shutdown`
+       automatically in case the user forgets to call it, or their code
+       crashed.
 
        You should not rely on this functionality in your program, it is only
        intended as a fail-safe and to simplify the API for new users. In
@ -1348,12 +1327,12 @@ A random assortment of utility functions useful on masters and children.
    are written to :py:data:`sys.stderr`.
 
    :param bool io:
-       If ``True``, include extremely verbose IO logs in the output. Useful
-       for debugging hangs, less useful for debugging application code.
+       If :data:`True`, include extremely verbose IO logs in the output.
+       Useful for debugging hangs, less useful for debugging application code.
 
    :parm bool usec:
-       If ``True``, include microsecond timestamps. This greatly helps when
-       debugging races and similar determinism issues.
+       If :data:`True`, include microsecond timestamps. This greatly helps
+       when debugging races and similar determinism issues.
 
    :param str level:
       Name of the :py:mod:`logging` package constant that is the minimum

@ -29,8 +29,8 @@ sponsorship and outstanding future-thinking of its early adopters.
    <p>
    <br clear="all">
-   For global career opportunities, please visit <a
-   href="http://www.cgi.com/en/careers/working-at-cgi">http://www.cgi.com/en/careers/working-at-cgi</a>.
+   For career opportunities, please visit <a
+   href="http://cgi-group.co.uk/defence-and-intelligence-opportunities">cgi-group.co.uk/defence-and-intelligence-opportunities</a>.
    </p>
    <p style="margin-bottom: 0px;">

@ -256,7 +256,7 @@ without the need for writing asynchronous code::
 
     contexts = [router.ssh(hostname=hn) for hn in hostnames]
     calls = [context.call(my_func) for context in contexts]
-    for recv, (msg, data) in mitogen.master.Select(calls):
+    for msg in mitogen.select.Select(calls):
         print 'Reply from %s: %s' % (recv.context, data)

@ -258,7 +258,7 @@ Stream Protocol
 .. currentmodule:: mitogen.core
 
 Once connected, a basic framing protocol is used to communicate between
-parent and child:
+parent and child. Integers use big endian in their encoded form.
 
 .. list-table::
    :header-rows: 1
@ -342,23 +342,6 @@ Masters listen on the following handles:
    million parent contexts to be created and destroyed before the associated
    Router must be recreated.
 
-.. _IS_DEAD:
-.. currentmodule:: mitogen.core
-.. data:: IS_DEAD
-
-   Special value used to signal disconnection or the inability to route a
-   message, when it appears in the `reply_to` field. Usually causes
-   :class:`mitogen.core.ChannelError` to be raised when it is received.
-
-   It indicates the sender did not know how to process the message, or wishes
-   no further messages to be delivered to it. It is used when:
-
-   * a remote receiver is disconnected or explicitly closed.
-   * a related message could not be delivered due to no route existing for it.
-   * a router is being torn down, as a sentinel value to notify
-     :py:meth:`mitogen.core.Router.add_handler` callbacks to clean up.
-
 Children listen on the following handles:
.. _LOAD_MODULE:

@ -434,6 +417,22 @@ also listen on the following handles:
   route from its local table, then propagates the message upward towards its
   own parent.
.. currentmodule:: mitogen.core
.. data:: DETACHING
Sent to inform a parent that user code has invoked
:meth:`ExternalContext.detach` to decouple the lifecycle of a directly
connected context and its subtree from the running program.
A child usually shuts down immediately if it loses its parent connection,
and parents usually terminate any related Python/SSH subprocess on
disconnection. Receiving :data:`DETACHING` informs the parent the
connection will soon drop, but the process intends to continue life
independently, and to avoid terminating the related subprocess if that
subprocess is the child itself.
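An illustrative sketch (not part of this changeset) of the call that triggers this message from inside a child:

.. code-block:: python

    # Hedged sketch: detach so the child outlives its parent connection.
    @mitogen.core.takes_econtext
    def become_standalone(econtext):
        econtext.detach()   # the parent receives DETACHING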
Non-master parents also listen on the following handles:
.. currentmodule:: mitogen.core
.. data:: GET_MODULE
@ -446,18 +445,39 @@ also listen on the following handles:
    a direct descendant.
 
 .. currentmodule:: mitogen.core
-.. data:: DETACHING
+.. data:: FORWARD_MODULE
 
-   Sent to inform a parent that user code has invoked
-   :meth:`ExternalContext.detach` to decouple the lifecycle of a directly
-   connected context and its subtree from the running program.
-
-   A child usually shuts down immediately if it loses its parent connection,
-   and parents usually terminate any related Python/SSH subprocess on
-   disconnection. Receiving :data:`DETACHING` informs the parent the
-   connection will soon drop, but the process intends to continue life
-   independently, and to avoid terminating the related subprocess if that
-   subprocess is the child itself.
+   Receives `(context, fullname)` tuples from its parent and arranges for a
+   :data:`LOAD_MODULE` to be sent towards `context` for the module `fullname`
+   and any related modules. The module must already have been delivered to the
+   current context by its parent in a prior :data:`LOAD_MODULE` message.
+
+   If the receiver is the immediate parent of `context`, then only
+   :data:`LOAD_MODULE` is sent to the child. Otherwise :data:`LOAD_MODULE` is
+   sent to the next closest parent if the module has not previously been sent
+   on that stream, followed by a copy of the :data:`FORWARD_MODULE` message.
+
+   This message is used to recursively preload indirect children with modules,
+   ensuring they are cached and deduplicated at each hop in the chain leading
+   to the target context.
Special values for the `reply_to` field:
.. _IS_DEAD:
.. currentmodule:: mitogen.core
.. data:: IS_DEAD
Special value used to signal disconnection or the inability to route a
message, when it appears in the `reply_to` field. Usually causes
:class:`mitogen.core.ChannelError` to be raised when it is received.
It indicates the sender did not know how to process the message, or wishes
no further messages to be delivered to it. It is used when:
* a remote receiver is disconnected or explicitly closed.
* a related message could not be delivered due to no route existing for it.
* a router is being torn down, as a sentinel value to notify
:py:meth:`mitogen.core.Router.add_handler` callbacks to clean up.
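From the caller's point of view a dead reply usually surfaces as an exception; a hedged sketch, assuming `recv` is a Receiver returned by an earlier asynchronous call:

    try:
        result = recv.get().unpickle()
    except mitogen.core.ChannelError:
        # IS_DEAD appeared in reply_to: the peer disconnected, or no route
        # existed for the original message.
        result = None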
Additional handles are created to receive the result of every function call Additional handles are created to receive the result of every function call
@ -261,6 +261,19 @@ Other Stream Subclasses
:members: :members:
Poller Class
------------
.. currentmodule:: mitogen.core
.. autoclass:: Poller
.. currentmodule:: mitogen.parent
.. autoclass:: KqueuePoller
.. currentmodule:: mitogen.parent
.. autoclass:: EpollPoller
Importer Class Importer Class
-------------- --------------

@ -6,14 +6,48 @@ Service Framework
================= =================
Mitogen includes a simple framework for implementing services exposed to other Mitogen includes a simple framework for implementing services exposed to other
contexts, with built-in subclasses that capture some common service models. contexts, with some built-in subclasses to capture common designs. This is a
This is a work in progress, and new functionality will be added as common usage work in progress, and new functionality will be added as common usage patterns
patterns emerge. emerge.
Overview Overview
-------- --------
Service
* User-supplied class with explicitly exposed methods.
* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass).
* May be auto-imported and constructed in a child by a parent simply by
calling it.
* Children receive a refusal if the class has not already been activated by a
parent.
* Has an associated Select instance which may be dynamically loaded with
receivers over time, with on_message_received() invoked whenever any receiver
becomes ready.
Invoker
* Abstracts mechanism for calling a service method and verifying permissions.
* Built-in 'service.Invoker': concurrent execution of all methods on the thread pool.
* Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct
(method, kwargs) key; only one such call executes, and its return value is
cached and broadcast to all requesters.
Activator
* Abstracts mechanism for activating a service and verifying activation
permission.
* Built-in activator looks the service up by its fully.qualified.ClassName
using the Python import mechanism, and only permits parents to trigger
activation.
Pool
* Manages a fixed-size thread pool, a mapping of service name to Invoker, and
an aggregate Select over every activated service's Select.
* Constructed automatically in children in response to the first
CALL_SERVICE message sent to them by a parent.
* Must be constructed manually in parent context.
* Has close() and add() methods.
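Putting the pieces together from the caller's side, a sketch of invoking a service purely by name; 'mypkg.services.PingService' and its 'ping' method are hypothetical, and the target context's Pool is assumed to activate the class on first use (which only a parent may trigger):

    reply = remote.call_service(
        service_name='mypkg.services.PingService',   # hypothetical class
        method_name='ping',
    )

The Context.call_service() helper this relies on is added later in this diff.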
Example Example
------- -------
@ -18,6 +18,7 @@ import time
import mitogen.core import mitogen.core
import mitogen.master import mitogen.master
import mitogen.select
import mitogen.utils import mitogen.utils
@ -185,7 +186,7 @@ def main(router):
sys.exit(1) sys.exit(1)
delay = 2.0 delay = 2.0
select = mitogen.master.Select(oneshot=False) select = mitogen.select.Select(oneshot=False)
hosts = [] hosts = []
# For each hostname on the command line, create a Host instance, a Mitogen # For each hostname on the command line, create a Host instance, a Mitogen
@ -11,9 +11,6 @@ import mitogen.unix
class PingService(mitogen.service.Service): class PingService(mitogen.service.Service):
well_known_id = 500
max_message_size = 1000
def dispatch(self, dct, msg): def dispatch(self, dct, msg):
return 'Hello, world' return 'Hello, world'
@ -36,33 +36,19 @@ be expected. On the slave, it is built dynamically during startup.
__version__ = (0, 0, 2) __version__ = (0, 0, 2)
#: This is ``False`` in slave contexts. It is used in single-file Python #: This is :data:`False` in slave contexts. Previously it was used to prevent
#: programs to avoid reexecuting the program's :py:func:`main` function in the #: re-execution of :mod:`__main__` in single file programs, however that now
#: slave. For example: #: happens automatically.
#:
#: .. code-block:: python
#:
#: def do_work():
#: os.system('hostname')
#:
#: def main(broker):
#: context = mitogen.master.connect(broker)
#: context.call(do_work) # Causes slave to import __main__.
#:
#: if __name__ == '__main__' and mitogen.is_master:
#: import mitogen.utils
#: mitogen.utils.run_with_broker(main)
#:
is_master = True is_master = True
#: This is ``0`` in a master, otherwise it is a master-generated ID unique to #: This is `0` in a master, otherwise it is the master-assigned ID unique to
#: the slave context used for message routing. #: the slave context used for message routing.
context_id = 0 context_id = 0
#: This is ``None`` in a master, otherwise it is the master-generated ID unique #: This is :data:`None` in a master, otherwise it is the master-assigned ID
#: to the slave's parent context. #: unique to the slave's parent context.
parent_id = None parent_id = None
@ -76,8 +62,8 @@ def main(log_level='INFO', profiling=False):
Convenience decorator primarily useful for writing discardable test Convenience decorator primarily useful for writing discardable test
scripts. scripts.
In the master process, when `func` is defined in the ``__main__`` module, In the master process, when `func` is defined in the :mod:`__main__`
arranges for `func(router)` to be invoked immediately, with module, arranges for `func(router)` to be invoked immediately, with
:py:class:`mitogen.master.Router` construction and destruction handled just :py:class:`mitogen.master.Router` construction and destruction handled just
as in :py:func:`mitogen.utils.run_with_router`. In slaves, this function as in :py:func:`mitogen.utils.run_with_router`. In slaves, this function
does nothing. does nothing.
@ -26,6 +26,12 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
"""
This module implements most package functionality, but remains separate from
non-essential code in order to reduce its size, since it also serves as the
bootstrap implementation sent to every new slave context.
"""
import collections import collections
import errno import errno
import fcntl import fcntl
@ -33,7 +39,6 @@ import imp
import itertools import itertools
import logging import logging
import os import os
import select
import signal import signal
import socket import socket
import struct import struct
@ -45,6 +50,9 @@ import warnings
import weakref import weakref
import zlib import zlib
# Absolute imports for <2.5.
select = __import__('select')
try: try:
import cPickle import cPickle
except ImportError: except ImportError:
@ -67,6 +75,10 @@ IOLOG.setLevel(logging.INFO)
_v = False _v = False
_vv = False _vv = False
# Also taken by Broker; no blocking work can occur while it is held.
_service_call_lock = threading.Lock()
_service_calls = []
GET_MODULE = 100 GET_MODULE = 100
CALL_FUNCTION = 101 CALL_FUNCTION = 101
FORWARD_LOG = 102 FORWARD_LOG = 102
@ -75,9 +87,16 @@ DEL_ROUTE = 104
ALLOCATE_ID = 105 ALLOCATE_ID = 105
SHUTDOWN = 106 SHUTDOWN = 106
LOAD_MODULE = 107 LOAD_MODULE = 107
DETACHING = 108 FORWARD_MODULE = 108
DETACHING = 109
CALL_SERVICE = 110
IS_DEAD = 999 IS_DEAD = 999
try:
BaseException
except NameError:
BaseException = Exception
PY3 = sys.version_info > (3,) PY3 = sys.version_info > (3,)
if PY3: if PY3:
b = lambda s: s.encode('latin-1') b = lambda s: s.encode('latin-1')
@ -134,7 +153,7 @@ class Secret(UnicodeType):
class CallError(Error): class CallError(Error):
def __init__(self, fmt=None, *args): def __init__(self, fmt=None, *args):
if not isinstance(fmt, Exception): if not isinstance(fmt, BaseException):
Error.__init__(self, fmt, *args) Error.__init__(self, fmt, *args)
else: else:
e = fmt e = fmt
@ -229,7 +248,7 @@ def io_op(func, *args):
while True: while True:
try: try:
return func(*args), False return func(*args), False
except (select.error, OSError): except (select.error, OSError, IOError):
e = sys.exc_info()[1] e = sys.exc_info()[1]
_vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e) _vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
if e[0] == errno.EINTR: if e[0] == errno.EINTR:
@ -267,6 +286,9 @@ class PidfulStreamHandler(logging.StreamHandler):
def enable_debug_logging(): def enable_debug_logging():
global _v, _vv
_v = True
_vv = True
root = logging.getLogger() root = logging.getLogger()
root.setLevel(logging.DEBUG) root.setLevel(logging.DEBUG)
IOLOG.setLevel(logging.DEBUG) IOLOG.setLevel(logging.DEBUG)
@ -280,9 +302,11 @@ def enable_debug_logging():
_profile_hook = lambda name, func, *args: func(*args) _profile_hook = lambda name, func, *args: func(*args)
def enable_profiling(): def enable_profiling():
global _profile_hook global _profile_hook
import cProfile, pstats import cProfile
import pstats
def _profile_hook(name, func, *args): def _profile_hook(name, func, *args):
profiler = cProfile.Profile() profiler = cProfile.Profile()
profiler.enable() profiler.enable()
@ -299,6 +323,13 @@ def enable_profiling():
fp.close() fp.close()
def import_module(modname):
"""
Import the module named `modname` and return the module object.
"""
return __import__(modname, None, None, [''])
class Message(object): class Message(object):
dst_id = None dst_id = None
src_id = None src_id = None
@ -363,7 +394,10 @@ class Message(object):
msg.dst_id = self.src_id msg.dst_id = self.src_id
msg.handle = self.reply_to msg.handle = self.reply_to
vars(msg).update(kwargs) vars(msg).update(kwargs)
(self.router or router).route(msg) if msg.handle:
(self.router or router).route(msg)
else:
LOG.debug('Message.reply(): discarding due to zero handle: %r', msg)
def unpickle(self, throw=True, throw_dead=True): def unpickle(self, throw=True, throw_dead=True):
"""Deserialize `data` into an object.""" """Deserialize `data` into an object."""
@ -522,6 +556,7 @@ class Importer(object):
'lxc', 'lxc',
'master', 'master',
'parent', 'parent',
'select',
'service', 'service',
'setns', 'setns',
'ssh', 'ssh',
@ -753,6 +788,7 @@ class Side(object):
def __init__(self, stream, fd, cloexec=True, keep_alive=True): def __init__(self, stream, fd, cloexec=True, keep_alive=True):
self.stream = stream self.stream = stream
self.fd = fd self.fd = fd
self.closed = False
self.keep_alive = keep_alive self.keep_alive = keep_alive
self._fork_refs[id(self)] = self self._fork_refs[id(self)] = self
if cloexec: if cloexec:
@ -762,21 +798,16 @@ class Side(object):
def __repr__(self): def __repr__(self):
return '<Side of %r fd %s>' % (self.stream, self.fd) return '<Side of %r fd %s>' % (self.stream, self.fd)
def fileno(self):
if self.fd is None:
raise StreamError('%r.fileno() called but no FD set', self)
return self.fd
@classmethod @classmethod
def _on_fork(cls): def _on_fork(cls):
for side in list(cls._fork_refs.values()): for side in list(cls._fork_refs.values()):
side.close() side.close()
def close(self): def close(self):
if self.fd is not None: if not self.closed:
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
os.close(self.fd) os.close(self.fd)
self.fd = None self.closed = True
def read(self, n=CHUNK_SIZE): def read(self, n=CHUNK_SIZE):
s, disconnected = io_op(os.read, self.fd, n) s, disconnected = io_op(os.read, self.fd, n)
@ -800,11 +831,11 @@ class BasicStream(object):
def on_disconnect(self, broker): def on_disconnect(self, broker):
LOG.debug('%r.on_disconnect()', self) LOG.debug('%r.on_disconnect()', self)
broker.stop_receive(self)
broker._stop_transmit(self)
if self.receive_side: if self.receive_side:
broker.stop_receive(self)
self.receive_side.close() self.receive_side.close()
if self.transmit_side: if self.transmit_side:
broker._stop_transmit(self)
self.transmit_side.close() self.transmit_side.close()
fire(self, 'disconnect') fire(self, 'disconnect')
@ -974,12 +1005,6 @@ class Context(object):
_v and LOG.debug('%r.on_disconnect()', self) _v and LOG.debug('%r.on_disconnect()', self)
fire(self, 'disconnect') fire(self, 'disconnect')
def send(self, msg):
"""send `obj` to `handle`, and tell the broker we have output. May
be called from any thread."""
msg.dst_id = self.context_id
self.router.route(msg)
def send_async(self, msg, persist=False): def send_async(self, msg, persist=False):
if self.router.broker._thread == threading.currentThread(): # TODO if self.router.broker._thread == threading.currentThread(): # TODO
raise SystemError('Cannot make blocking call on broker thread') raise SystemError('Cannot make blocking call on broker thread')
@ -992,6 +1017,25 @@ class Context(object):
self.send(msg) self.send(msg)
return receiver return receiver
def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('%r.call_service_async(%r, %r, %r)',
self, service_name, method_name, kwargs)
if not isinstance(service_name, basestring):
service_name = service_name.name() # Service.name()
tup = (service_name, method_name, kwargs)
msg = Message.pickled(tup, handle=CALL_SERVICE)
return self.send_async(msg)
def send(self, msg):
"""send `obj` to `handle`, and tell the broker we have output. May
be called from any thread."""
msg.dst_id = self.context_id
self.router.route(msg)
def call_service(self, service_name, method_name, **kwargs):
recv = self.call_service_async(service_name, method_name, **kwargs)
return recv.get().unpickle()
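For example, the asynchronous form hands back a Receiver that may be waited on directly or added to a Select; the service name is hypothetical:

    recv = context.call_service_async(
        service_name='mypkg.services.PingService',
        method_name='ping',
    )
    # Equivalent to call_service(): block for the reply and deserialize it.
    reply = recv.get().unpickle()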
def send_await(self, msg, deadline=None): def send_await(self, msg, deadline=None):
"""Send `msg` and wait for a response with an optional timeout.""" """Send `msg` and wait for a response with an optional timeout."""
receiver = self.send_async(msg) receiver = self.send_async(msg)
@ -1014,7 +1058,56 @@ def _unpickle_context(router, context_id, name):
return router.context_class(router, context_id, name) return router.context_class(router, context_id, name)
class Poller(object):
def __init__(self):
self._rfds = {}
self._wfds = {}
@property
def readers(self):
return list(self._rfds.items())
@property
def writers(self):
return list(self._wfds.items())
def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self))
def close(self):
pass
def start_receive(self, fd, data=None):
self._rfds[fd] = data or fd
def stop_receive(self, fd):
self._rfds.pop(fd, None)
def start_transmit(self, fd, data=None):
self._wfds[fd] = data or fd
def stop_transmit(self, fd):
self._wfds.pop(fd, None)
def poll(self, timeout=None):
_vv and IOLOG.debug('%r.poll(%r)', self, timeout)
(rfds, wfds, _), _ = io_op(select.select,
self._rfds,
self._wfds,
(), timeout
)
for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
yield self._rfds[fd]
for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
yield self._wfds[fd]
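A small usage sketch of the new Poller outside the Broker, watching one end of a pipe:

    import os
    import mitogen.core

    rfd, wfd = os.pipe()
    poller = mitogen.core.Poller()
    poller.start_receive(rfd, data='readable')

    os.write(wfd, 'x')
    for data in poller.poll(timeout=1.0):
        # poll() yields the data token registered for each ready FD.
        print data          # -> 'readable'
    poller.close()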
class Latch(object): class Latch(object):
poller_class = Poller
closed = False closed = False
_waking = 0 _waking = 0
_sockets = [] _sockets = []
@ -1058,7 +1151,6 @@ class Latch(object):
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)',
self, timeout, block) self, timeout, block)
self._lock.acquire() self._lock.acquire()
try: try:
if self.closed: if self.closed:
@ -1074,14 +1166,19 @@ class Latch(object):
finally: finally:
self._lock.release() self._lock.release()
return self._get_sleep(timeout, block, rsock, wsock) poller = self.poller_class()
poller.start_receive(rsock.fileno())
try:
return self._get_sleep(poller, timeout, block, rsock, wsock)
finally:
poller.close()
def _get_sleep(self, timeout, block, rsock, wsock): def _get_sleep(self, poller, timeout, block, rsock, wsock):
_vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)', _vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)',
self, timeout, block) self, timeout, block)
e = None e = None
try: try:
io_op(select.select, [rsock], [], [], timeout) list(poller.poll(timeout))
except Exception: except Exception:
e = sys.exc_info()[1] e = sys.exc_info()[1]
@ -1091,7 +1188,7 @@ class Latch(object):
del self._sleeping[i] del self._sleeping[i]
self._sockets.append((rsock, wsock)) self._sockets.append((rsock, wsock))
if i >= self._waking: if i >= self._waking:
raise TimeoutError() raise e or TimeoutError()
self._waking -= 1 self._waking -= 1
if rsock.recv(2) != '\x7f': if rsock.recv(2) != '\x7f':
raise LatchError('internal error: received >1 wakeups') raise LatchError('internal error: received >1 wakeups')
@ -1348,7 +1445,7 @@ class Router(object):
refused_msg = 'Refused by policy.' refused_msg = 'Refused by policy.'
def _invoke(self, msg, stream): def _invoke(self, msg, stream):
#IOLOG.debug('%r._invoke(%r)', self, msg) # IOLOG.debug('%r._invoke(%r)', self, msg)
try: try:
persist, fn, policy = self._handle_map[msg.handle] persist, fn, policy = self._handle_map[msg.handle]
except KeyError: except KeyError:
@ -1432,16 +1529,20 @@ class Router(object):
class Broker(object): class Broker(object):
poller_class = Poller
_waker = None _waker = None
_thread = None _thread = None
shutdown_timeout = 3.0 shutdown_timeout = 3.0
def __init__(self): def __init__(self, poller_class=None):
self._alive = True self._alive = True
self._waker = Waker(self) self._waker = Waker(self)
self.defer = self._waker.defer self.defer = self._waker.defer
self._readers = [self._waker.receive_side] self.poller = self.poller_class()
self._writers = [] self.poller.start_receive(
self._waker.receive_side.fd,
(self._waker.receive_side, self._waker.on_receive)
)
self._thread = threading.Thread( self._thread = threading.Thread(
target=_profile_hook, target=_profile_hook,
args=('broker', self._broker_main), args=('broker', self._broker_main),
@ -1450,33 +1551,30 @@ class Broker(object):
self._thread.start() self._thread.start()
self._waker.broker_ident = self._thread.ident self._waker.broker_ident = self._thread.ident
def _list_discard(self, lst, value):
try:
lst.remove(value)
except ValueError:
pass
def _list_add(self, lst, value):
if value not in lst:
lst.append(value)
def start_receive(self, stream): def start_receive(self, stream):
_vv and IOLOG.debug('%r.start_receive(%r)', self, stream) _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
assert stream.receive_side and stream.receive_side.fd is not None side = stream.receive_side
self.defer(self._list_add, self._readers, stream.receive_side) assert side and side.fd is not None
self.defer(self.poller.start_receive,
side.fd, (side, stream.on_receive))
def stop_receive(self, stream): def stop_receive(self, stream):
IOLOG.debug('%r.stop_receive(%r)', self, stream) _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
self.defer(self._list_discard, self._readers, stream.receive_side) self.defer(self.poller.stop_receive, stream.receive_side.fd)
def _start_transmit(self, stream): def _start_transmit(self, stream):
IOLOG.debug('%r._start_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
assert stream.transmit_side and stream.transmit_side.fd is not None side = stream.transmit_side
self._list_add(self._writers, stream.transmit_side) assert side and side.fd is not None
self.poller.start_transmit(side.fd, (side, stream.on_transmit))
def _stop_transmit(self, stream): def _stop_transmit(self, stream):
IOLOG.debug('%r._stop_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream)
self._list_discard(self._writers, stream.transmit_side) self.poller.stop_transmit(stream.transmit_side.fd)
def keep_alive(self):
it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
return sum(it, 0)
def _call(self, stream, func): def _call(self, stream, func):
try: try:
@ -1486,26 +1584,12 @@ class Broker(object):
stream.on_disconnect(self) stream.on_disconnect(self)
def _loop_once(self, timeout=None): def _loop_once(self, timeout=None):
_vv and IOLOG.debug('%r._loop_once(%r)', self, timeout) _vv and IOLOG.debug('%r._loop_once(%r, %r)',
self, timeout, self.poller)
#IOLOG.debug('readers = %r', self._readers) #IOLOG.debug('readers =\n%s', pformat(self.poller.readers))
#IOLOG.debug('writers = %r', self._writers) #IOLOG.debug('writers =\n%s', pformat(self.poller.writers))
(rsides, wsides, _), _ = io_op(select.select, for (side, func) in self.poller.poll(timeout):
self._readers, self._call(side.stream, func)
self._writers,
(), timeout
)
for side in rsides:
_vv and IOLOG.debug('%r: POLLIN for %r', self, side)
self._call(side.stream, side.stream.on_receive)
for side in wsides:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, side)
self._call(side.stream, side.stream.on_transmit)
def keep_alive(self):
return sum((side.keep_alive for side in self._readers), 0)
def _broker_main(self): def _broker_main(self):
try: try:
@ -1513,8 +1597,7 @@ class Broker(object):
self._loop_once() self._loop_once()
fire(self, 'shutdown') fire(self, 'shutdown')
for _, (side, _) in self.poller.readers + self.poller.writers:
for side in set(self._readers).union(self._writers):
self._call(side.stream, side.stream.on_shutdown) self._call(side.stream, side.stream.on_shutdown)
deadline = time.time() + self.shutdown_timeout deadline = time.time() + self.shutdown_timeout
@ -1527,7 +1610,7 @@ class Broker(object):
'more child processes still connected to ' 'more child processes still connected to '
'our stdout/stderr pipes.', self) 'our stdout/stderr pipes.', self)
for side in set(self._readers).union(self._writers): for _, (side, _) in self.poller.readers + self.poller.writers:
LOG.error('_broker_main() force disconnecting %r', side) LOG.error('_broker_main() force disconnecting %r', side)
side.stream.on_disconnect(self) side.stream.on_disconnect(self)
except Exception: except Exception:
@ -1551,13 +1634,38 @@ class Broker(object):
class ExternalContext(object): class ExternalContext(object):
detached = False detached = False
def __init__(self, config):
self.config = config
def _on_broker_shutdown(self): def _on_broker_shutdown(self):
self.channel.close() self.recv.close()
def _on_broker_exit(self): def _on_broker_exit(self):
if not self.profiling: if not self.config['profiling']:
os.kill(os.getpid(), signal.SIGTERM) os.kill(os.getpid(), signal.SIGTERM)
def _on_call_service_msg(self, msg):
"""
Stub CALL_SERVICE handler, push message on temporary queue and invoke
_on_stub_call() from the main thread.
"""
if msg.is_dead:
return
_service_call_lock.acquire()
try:
_service_calls.append(msg)
finally:
_service_call_lock.release()
self.router.route(
Message.pickled(
dst_id=mitogen.context_id,
handle=CALL_FUNCTION,
obj=('mitogen.service', None, '_on_stub_call', (), {}),
router=self.router,
)
)
def _on_shutdown_msg(self, msg): def _on_shutdown_msg(self, msg):
_v and LOG.debug('_on_shutdown_msg(%r)', msg) _v and LOG.debug('_on_shutdown_msg(%r)', msg)
if not msg.is_dead: if not msg.is_dead:
@ -1593,29 +1701,35 @@ class ExternalContext(object):
LOG.error('Stream had %d bytes after 2000ms', pending) LOG.error('Stream had %d bytes after 2000ms', pending)
self.broker.defer(stream.on_disconnect, self.broker) self.broker.defer(stream.on_disconnect, self.broker)
def _setup_master(self, max_message_size, profiling, unidirectional, def _setup_master(self):
parent_id, context_id, in_fd, out_fd): Router.max_message_size = self.config['max_message_size']
Router.max_message_size = max_message_size if self.config['profiling']:
self.profiling = profiling
if profiling:
enable_profiling() enable_profiling()
self.broker = Broker() self.broker = Broker()
self.router = Router(self.broker) self.router = Router(self.broker)
self.router.undirectional = unidirectional self.router.undirectional = self.config['unidirectional']
self.router.add_handler( self.router.add_handler(
fn=self._on_shutdown_msg, fn=self._on_shutdown_msg,
handle=SHUTDOWN, handle=SHUTDOWN,
policy=has_parent_authority, policy=has_parent_authority,
) )
self.router.add_handler(
fn=self._on_call_service_msg,
handle=CALL_SERVICE,
policy=has_parent_authority,
)
self.master = Context(self.router, 0, 'master') self.master = Context(self.router, 0, 'master')
parent_id = self.config['parent_ids'][0]
if parent_id == 0: if parent_id == 0:
self.parent = self.master self.parent = self.master
else: else:
self.parent = Context(self.router, parent_id, 'parent') self.parent = Context(self.router, parent_id, 'parent')
self.channel = Receiver(router=self.router, in_fd = self.config.get('in_fd', 100)
handle=CALL_FUNCTION, out_fd = self.config.get('out_fd', 1)
policy=has_parent_authority) self.recv = Receiver(router=self.router,
handle=CALL_FUNCTION,
policy=has_parent_authority)
self.stream = Stream(self.router, parent_id) self.stream = Stream(self.router, parent_id)
self.stream.name = 'parent' self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd) self.stream.accept(in_fd, out_fd)
@ -1633,20 +1747,22 @@ class ExternalContext(object):
except OSError: except OSError:
pass # No first stage exists (e.g. fakessh) pass # No first stage exists (e.g. fakessh)
def _setup_logging(self, debug, log_level): def _setup_logging(self):
root = logging.getLogger() root = logging.getLogger()
root.setLevel(log_level) root.setLevel(self.config['log_level'])
root.handlers = [LogHandler(self.master)] root.handlers = [LogHandler(self.master)]
if debug: if self.config['debug']:
enable_debug_logging() enable_debug_logging()
def _setup_importer(self, importer, core_src_fd, whitelist, blacklist): def _setup_importer(self):
importer = self.config.get('importer')
if importer: if importer:
importer._install_handler(self.router) importer._install_handler(self.router)
importer._context = self.parent importer._context = self.parent
else: else:
core_src_fd = self.config.get('core_src_fd', 101)
if core_src_fd: if core_src_fd:
fp = os.fdopen(101, 'r', 1) fp = os.fdopen(core_src_fd, 'r', 1)
try: try:
core_size = int(fp.readline()) core_size = int(fp.readline())
core_src = fp.read(core_size) core_src = fp.read(core_size)
@ -1657,8 +1773,13 @@ class ExternalContext(object):
else: else:
core_src = None core_src = None
importer = Importer(self.router, self.parent, importer = Importer(
core_src, whitelist, blacklist) self.router,
self.parent,
core_src,
self.config.get('whitelist', ()),
self.config.get('blacklist', ()),
)
self.importer = importer self.importer = importer
self.router.importer = importer self.router.importer = importer
@ -1678,12 +1799,12 @@ class ExternalContext(object):
sys.modules['mitogen.core'] = mitogen.core sys.modules['mitogen.core'] = mitogen.core
del sys.modules['__main__'] del sys.modules['__main__']
def _setup_globals(self, version, context_id, parent_ids): def _setup_globals(self):
mitogen.__version__ = version
mitogen.is_master = False mitogen.is_master = False
mitogen.context_id = context_id mitogen.__version__ = self.config['version']
mitogen.parent_ids = parent_ids mitogen.context_id = self.config['context_id']
mitogen.parent_id = parent_ids[0] mitogen.parent_ids = self.config['parent_ids'][:]
mitogen.parent_id = mitogen.parent_ids[0]
def _setup_stdio(self): def _setup_stdio(self):
# We must open this prior to closing stdout, otherwise it will recycle # We must open this prior to closing stdout, otherwise it will recycle
@ -1718,7 +1839,7 @@ class ExternalContext(object):
_v and LOG.debug('_dispatch_calls(%r)', data) _v and LOG.debug('_dispatch_calls(%r)', data)
modname, klass, func, args, kwargs = data modname, klass, func, args, kwargs = data
obj = __import__(modname, {}, {}, ['']) obj = import_module(modname)
if klass: if klass:
obj = getattr(obj, klass) obj = getattr(obj, klass)
fn = getattr(obj, func) fn = getattr(obj, func)
@ -1729,7 +1850,10 @@ class ExternalContext(object):
return fn(*args, **kwargs) return fn(*args, **kwargs)
def _dispatch_calls(self): def _dispatch_calls(self):
for msg in self.channel: if self.config.get('on_start'):
self.config['on_start'](self)
for msg in self.recv:
try: try:
msg.reply(self._dispatch_one(msg)) msg.reply(self._dispatch_one(msg))
except Exception: except Exception:
@ -1738,28 +1862,24 @@ class ExternalContext(object):
msg.reply(CallError(e)) msg.reply(CallError(e))
self.dispatch_stopped = True self.dispatch_stopped = True
def main(self, parent_ids, context_id, debug, profiling, log_level, def main(self):
unidirectional, max_message_size, version, in_fd=100, out_fd=1, self._setup_master()
core_src_fd=101, setup_stdio=True, setup_package=True,
importer=None, whitelist=(), blacklist=()):
self._setup_master(max_message_size, profiling, unidirectional,
parent_ids[0], context_id, in_fd, out_fd)
try: try:
try: try:
self._setup_logging(debug, log_level) self._setup_logging()
self._setup_importer(importer, core_src_fd, whitelist, blacklist) self._setup_importer()
self._reap_first_stage() self._reap_first_stage()
if setup_package: if self.config.get('setup_package', True):
self._setup_package() self._setup_package()
self._setup_globals(version, context_id, parent_ids) self._setup_globals()
if setup_stdio: if self.config.get('setup_stdio', True):
self._setup_stdio() self._setup_stdio()
self.router.register(self.parent, self.stream) self.router.register(self.parent, self.stream)
sys.executable = os.environ.pop('ARGV0', sys.executable) sys.executable = os.environ.pop('ARGV0', sys.executable)
_v and LOG.debug('Connected to %s; my ID is %r, PID is %r', _v and LOG.debug('Connected to %s; my ID is %r, PID is %r',
self.parent, context_id, os.getpid()) self.parent, mitogen.context_id, os.getpid())
_v and LOG.debug('Recovered sys.executable: %r', sys.executable) _v and LOG.debug('Recovered sys.executable: %r', sys.executable)
_profile_hook('main', self._dispatch_calls) _profile_hook('main', self._dispatch_calls)
@ -26,6 +26,70 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
"""
:mod:`mitogen.fakessh` is a stream implementation that starts a subprocess with
its environment modified such that ``PATH`` searches for `ssh` return a Mitogen
implementation of SSH. When invoked, this implementation arranges for the
command line supplied by the caller to be executed in a remote context, reusing
the parent context's (possibly proxied) connection to that remote context.
This allows tools like `rsync` and `scp` to transparently reuse the connections
and tunnels already established by the host program to connect to a target
machine, without wasteful redundant SSH connection setup, 3-way handshakes, or
firewall hopping configurations, and enables these tools to be used in
impossible scenarios, such as over `sudo` with ``requiretty`` enabled.
The fake `ssh` command source is written to a temporary file on disk, and
consists of a copy of the :py:mod:`mitogen.core` source code (just like any
other child context), with a line appended to cause it to connect back to the
host process over an FD it inherits. As there is no reliance on an existing
filesystem file, it is possible for child contexts to use fakessh.
As a consequence of connecting back through an inherited FD, only one SSH
invocation is possible, which is fine for tools like `rsync`; in future this
restriction will be lifted.
Sequence:
1. ``fakessh`` Context and Stream created by parent context. The stream's
buffer has a :py:func:`_fakessh_main` :py:data:`CALL_FUNCTION
<mitogen.core.CALL_FUNCTION>` enqueued.
2. Target program (`rsync/scp/sftp`) invoked, which internally executes
`ssh` from ``PATH``.
3. :py:mod:`mitogen.core` bootstrap begins, recovers the stream FD
inherited via the target program, and establishes itself as the fakessh
context.
4. :py:func:`_fakessh_main` :py:data:`CALL_FUNCTION
<mitogen.core.CALL_FUNCTION>` is read by fakessh context,
a. sets up :py:class:`IoPump` for stdio, registers
stdin_handle for local context.
b. Enqueues :py:data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>` for
:py:func:`_start_slave` invoked in target context,
i. the program from the `ssh` command line is started
ii. sets up :py:class:`IoPump` for `ssh` command line process's
stdio pipes
iii. returns `(control_handle, stdin_handle)` to
:py:func:`_fakessh_main`
5. :py:func:`_fakessh_main` receives control/stdin handles from
:py:func:`_start_slave`,
a. registers remote's stdin_handle with local :py:class:`IoPump`.
b. sends `("start", local_stdin_handle)` to remote's control_handle
c. registers local :py:class:`IoPump` with
:py:class:`mitogen.core.Broker`.
d. loops waiting for `local stdout closed && remote stdout closed`
6. :py:func:`_start_slave` control channel receives `("start", stdin_handle)`,
a. registers remote's stdin_handle with local :py:class:`IoPump`
b. registers local :py:class:`IoPump` with
:py:class:`mitogen.core.Broker`.
c. loops waiting for `local stdout closed && remote stdout closed`
"""
import getopt import getopt
import inspect import inspect
import os import os
@ -304,6 +368,25 @@ def _fakessh_main(dest_context_id, econtext):
process.control.put(('exit', None)) process.control.put(('exit', None))
def _get_econtext_config(context, sock2):
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
return {
'context_id': context.context_id,
'core_src_fd': None,
'debug': getattr(context.router, 'debug', False),
'in_fd': sock2.fileno(),
'log_level': mitogen.parent.get_log_level(),
'max_message_size': context.router.max_message_size,
'out_fd': sock2.fileno(),
'parent_ids': parent_ids,
'profiling': getattr(context.router, 'profiling', False),
'unidirectional': getattr(context.router, 'unidirectional', False),
'setup_stdio': False,
'version': mitogen.__version__,
}
# #
# Public API. # Public API.
# #
@ -311,6 +394,26 @@ def _fakessh_main(dest_context_id, econtext):
@mitogen.core.takes_econtext @mitogen.core.takes_econtext
@mitogen.core.takes_router @mitogen.core.takes_router
def run(dest, router, args, deadline=None, econtext=None): def run(dest, router, args, deadline=None, econtext=None):
"""
Run the command specified by `args`, with its environment modified so that
``PATH`` lookups for `ssh` made by the command resolve to a Mitogen
implementation, causing any attempt to execute a remote program over SSH to
instead execute that program in the context `dest`.

:param mitogen.core.Context dest:
The destination context to execute the SSH command line in.
:param mitogen.core.Router router:
:param list[str] args:
Command line arguments for the local program, e.g.
``['rsync', '/tmp', 'remote:/tmp']``
:returns:
Exit status of the child process.
"""
if econtext is not None: if econtext is not None:
mitogen.parent.upgrade_router(econtext) mitogen.parent.upgrade_router(econtext)
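An illustrative invocation matching the docstring above; `remote` is assumed to be an already-connected mitogen.core.Context:

    import mitogen.fakessh

    status = mitogen.fakessh.run(
        dest=remote,
        router=router,
        args=['rsync', '/tmp', 'remote:/tmp'],
    )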
@ -328,9 +431,6 @@ def run(dest, router, args, deadline=None, econtext=None):
# Held in socket buffer until process is booted. # Held in socket buffer until process is booted.
fakessh.call_async(_fakessh_main, dest.context_id) fakessh.call_async(_fakessh_main, dest.context_id)
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
tmp_path = tempfile.mkdtemp(prefix='mitogen_fakessh') tmp_path = tempfile.mkdtemp(prefix='mitogen_fakessh')
try: try:
ssh_path = os.path.join(tmp_path, 'ssh') ssh_path = os.path.join(tmp_path, 'ssh')
@ -339,20 +439,9 @@ def run(dest, router, args, deadline=None, econtext=None):
fp.write('#!%s\n' % (sys.executable,)) fp.write('#!%s\n' % (sys.executable,))
fp.write(inspect.getsource(mitogen.core)) fp.write(inspect.getsource(mitogen.core))
fp.write('\n') fp.write('\n')
fp.write('ExternalContext().main(**%r)\n' % ({ fp.write('ExternalContext(%r).main()\n' % (
'context_id': context_id, _get_econtext_config(context, sock2),
'core_src_fd': None, ))
'debug': getattr(router, 'debug', False),
'in_fd': sock2.fileno(),
'log_level': mitogen.parent.get_log_level(),
'max_message_size': router.max_message_size,
'out_fd': sock2.fileno(),
'parent_ids': parent_ids,
'profiling': getattr(router, 'profiling', False),
'unidirectional': getattr(router, 'unidirectional', False),
'setup_stdio': False,
'version': mitogen.__version__,
},))
finally: finally:
fp.close() fp.close()
@ -90,12 +90,14 @@ class Stream(mitogen.parent.Stream):
on_fork = None on_fork = None
def construct(self, old_router, max_message_size, on_fork=None, def construct(self, old_router, max_message_size, on_fork=None,
debug=False, profiling=False, unidirectional=False): debug=False, profiling=False, unidirectional=False,
on_start=None):
# fork method only supports a tiny subset of options. # fork method only supports a tiny subset of options.
super(Stream, self).construct(max_message_size=max_message_size, super(Stream, self).construct(max_message_size=max_message_size,
debug=debug, profiling=profiling, debug=debug, profiling=profiling,
unidirectional=False) unidirectional=False)
self.on_fork = on_fork self.on_fork = on_fork
self.on_start = on_start
responder = getattr(old_router, 'responder', None) responder = getattr(old_router, 'responder', None)
if isinstance(responder, mitogen.parent.ModuleForwarder): if isinstance(responder, mitogen.parent.ModuleForwarder):
@ -134,6 +136,7 @@ class Stream(mitogen.parent.Stream):
# Expected by the ExternalContext.main(). # Expected by the ExternalContext.main().
os.dup2(childfp.fileno(), 1) os.dup2(childfp.fileno(), 1)
os.dup2(childfp.fileno(), 100) os.dup2(childfp.fileno(), 100)
# Overwritten by ExternalContext.main(); we must replace the # Overwritten by ExternalContext.main(); we must replace the
# parent-inherited descriptors that were closed by Side._on_fork() to # parent-inherited descriptors that were closed by Side._on_fork() to
# avoid ExternalContext.main() accidentally allocating new files over # avoid ExternalContext.main() accidentally allocating new files over
@ -146,14 +149,24 @@ class Stream(mitogen.parent.Stream):
if devnull != 2: if devnull != 2:
os.dup2(devnull, 2) os.dup2(devnull, 2)
os.close(devnull) os.close(devnull)
childfp.close()
kwargs = self.get_main_kwargs() # If we're unlucky, childfp.fileno() may coincidentally be one of our
kwargs['core_src_fd'] = None # desired FDs. In that case closing it breaks ExternalContext.main().
kwargs['importer'] = self.importer if childfp.fileno() not in (0, 1, 100):
kwargs['setup_package'] = False childfp.close()
config = self.get_econtext_config()
config['core_src_fd'] = None
config['importer'] = self.importer
config['setup_package'] = False
if self.on_start:
config['on_start'] = self.on_start
try: try:
mitogen.core.ExternalContext().main(**kwargs) mitogen.core.ExternalContext(config).main()
except Exception:
# TODO: report exception somehow.
os._exit(72)
finally: finally:
# Don't trigger atexit handlers, they were copied from the parent. # Don't trigger atexit handlers, they were copied from the parent.
os._exit(0) os._exit(0)
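A hedged sketch of the new on_start hook from the parent's side, assuming router.fork() forwards keyword arguments to Stream.construct() as the other connection methods do; the hook runs in the child before the dispatch loop begins (see _dispatch_calls() earlier in this diff):

    import os

    def boot(econtext):
        # Hypothetical preparation performed in the forked child.
        os.environ['APP_ROLE'] = 'fork-child'

    child = router.fork(on_start=boot)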
@ -26,6 +26,13 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
"""
This module implements functionality required by master processes, such as
starting new contexts via SSH. Its size is also restricted, since it must
be sent to any context that will be used to establish additional child
contexts.
"""
import dis import dis
import imp import imp
import inspect import inspect
@ -181,110 +188,6 @@ class ThreadWatcher(object):
return watcher return watcher
class SelectError(mitogen.core.Error):
pass
class Select(object):
notify = None
@classmethod
def all(cls, receivers):
return list(msg.unpickle() for msg in cls(receivers))
def __init__(self, receivers=(), oneshot=True):
self._receivers = []
self._oneshot = oneshot
self._latch = mitogen.core.Latch()
for recv in receivers:
self.add(recv)
def _put(self, value):
self._latch.put(value)
if self.notify:
self.notify(self)
def __bool__(self):
return bool(self._receivers)
def __enter__(self):
return self
def __exit__(self, e_type, e_val, e_tb):
self.close()
def __iter__(self):
while self._receivers:
yield self.get()
loop_msg = 'Adding this Select instance would create a Select cycle'
def _check_no_loop(self, recv):
if recv is self:
raise SelectError(self.loop_msg)
for recv_ in self._receivers:
if recv_ == recv:
raise SelectError(self.loop_msg)
if isinstance(recv_, Select):
recv_._check_no_loop(recv)
owned_msg = 'Cannot add: Receiver is already owned by another Select'
def add(self, recv):
if isinstance(recv, Select):
recv._check_no_loop(self)
self._receivers.append(recv)
if recv.notify is not None:
raise SelectError(self.owned_msg)
recv.notify = self._put
# Avoid race by polling once after installation.
if not recv.empty():
self._put(recv)
not_present_msg = 'Instance is not a member of this Select'
def remove(self, recv):
try:
if recv.notify != self._put:
raise ValueError
self._receivers.remove(recv)
recv.notify = None
except (IndexError, ValueError):
raise SelectError(self.not_present_msg)
def close(self):
for recv in self._receivers[:]:
self.remove(recv)
self._latch.close()
def empty(self):
return self._latch.empty()
empty_msg = 'Cannot get(), Select instance is empty'
def get(self, timeout=None):
if not self._receivers:
raise SelectError(self.empty_msg)
while True:
recv = self._latch.get(timeout=timeout)
try:
msg = recv.get(block=False)
if self._oneshot:
self.remove(recv)
msg.receiver = recv
return msg
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
continue
class LogForwarder(object): class LogForwarder(object):
def __init__(self, router): def __init__(self, router):
self._router = router self._router = router
@ -661,6 +564,7 @@ class ModuleResponder(object):
class Broker(mitogen.core.Broker): class Broker(mitogen.core.Broker):
shutdown_timeout = 5.0 shutdown_timeout = 5.0
_watcher = None _watcher = None
poller_class = mitogen.parent.PREFERRED_POLLER
def __init__(self, install_watcher=True): def __init__(self, install_watcher=True):
if install_watcher: if install_watcher:
@ -26,13 +26,18 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
"""
This module defines functionality common to master and parent processes. It is
sent to any child context that is due to become a parent, as part of recursive
connection.
"""
import errno import errno
import fcntl import fcntl
import getpass import getpass
import inspect import inspect
import logging import logging
import os import os
import select
import signal import signal
import socket import socket
import subprocess import subprocess
@ -44,6 +49,9 @@ import time
import types import types
import zlib import zlib
# Absolute imports for <2.5.
select = __import__('select')
try: try:
from cStringIO import StringIO as BytesIO from cStringIO import StringIO as BytesIO
except ImportError: except ImportError:
@ -70,23 +78,6 @@ except:
SC_OPEN_MAX = 1024 SC_OPEN_MAX = 1024
class Argv(object):
def __init__(self, argv):
self.argv = argv
def escape(self, x):
s = '"'
for c in x:
if c in '\\$"`':
s += '\\'
s += c
s += '"'
return s
def __str__(self):
return ' '.join(map(self.escape, self.argv))
def get_log_level(): def get_log_level():
return (LOG.level or logging.getLogger().level or logging.INFO) return (LOG.level or logging.getLogger().level or logging.INFO)
@ -375,55 +366,58 @@ def hybrid_tty_create_child(args):
def write_all(fd, s, deadline=None): def write_all(fd, s, deadline=None):
timeout = None timeout = None
written = 0 written = 0
poller = PREFERRED_POLLER()
poller.start_transmit(fd)
while written < len(s): try:
if deadline is not None: while written < len(s):
timeout = max(0, deadline - time.time()) if deadline is not None:
if timeout == 0: timeout = max(0, deadline - time.time())
raise mitogen.core.TimeoutError('write timed out') if timeout == 0:
raise mitogen.core.TimeoutError('write timed out')
_, wfds, _ = select.select([], [fd], [], timeout)
if not wfds:
continue
n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written)) for fd in poller.poll(timeout):
if disconnected: n, disconnected = mitogen.core.io_op(
raise mitogen.core.StreamError('EOF on stream during write') os.write, fd, buffer(s, written))
if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
written += n written += n
finally:
poller.close()
def iter_read(fds, deadline=None): def iter_read(fds, deadline=None):
fds = list(fds) poller = PREFERRED_POLLER()
for fd in fds:
poller.start_receive(fd)
bits = [] bits = []
timeout = None timeout = None
try:
while fds: while poller.readers:
if deadline is not None: if deadline is not None:
timeout = max(0, deadline - time.time()) timeout = max(0, deadline - time.time())
if timeout == 0: if timeout == 0:
break break
rfds, _, _ = select.select(fds, [], [], timeout) for fd in poller.poll(timeout):
if not rfds: s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
continue if disconnected or not s:
IOLOG.debug('iter_read(%r) -> disconnected', fd)
for fd in rfds: poller.stop_receive(fd)
s, disconnected = mitogen.core.io_op(os.read, fd, 4096) else:
if disconnected or not s: IOLOG.debug('iter_read(%r) -> %r', fd, s)
IOLOG.debug('iter_read(%r) -> disconnected', fd) bits.append(s)
fds.remove(fd) yield s
else: finally:
IOLOG.debug('iter_read(%r) -> %r', fd, s) poller.close()
bits.append(s)
yield s if not poller.readers:
if not fds:
raise mitogen.core.StreamError( raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' % 'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],) (''.join(bits)[-300:],)
) )
raise mitogen.core.TimeoutError('read timed out') raise mitogen.core.TimeoutError('read timed out')
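An illustrative caller of the reworked iter_read(); `child_fd` is a hypothetical readable descriptor, and EOF is reported via StreamError as in the code above:

    import time
    import mitogen.core
    import mitogen.parent

    output = ''
    try:
        for chunk in mitogen.parent.iter_read([child_fd],
                                              deadline=time.time() + 10.0):
            output += chunk
    except mitogen.core.StreamError:
        pass    # EOF reached; `output` holds everything read before it.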
@ -436,8 +430,41 @@ def discard_until(fd, s, deadline):
return return
def _upgrade_broker(broker):
"""
Extract the poller state from Broker and replace it with the industrial
strength poller for this OS. Must run on the Broker thread.
"""
# This function is deadly! The act of calling start_receive() generates log
# messages which must be silenced as the upgrade progresses, otherwise the
# poller state will change as it is copied, resulting in write fds that are
# lost. (Due to LogHandler->Router->Stream->Broker->Poller, where Stream
# only calls start_transmit() when transitioning from empty to non-empty
# buffer. If the start_transmit() is lost, writes from the child hang
# permanently).
root = logging.getLogger()
old_level = root.level
root.setLevel(logging.CRITICAL)
old = broker.poller
new = PREFERRED_POLLER()
for fd, data in old.readers:
new.start_receive(fd, data)
for fd, data in old.writers:
new.start_transmit(fd, data)
old.close()
broker.poller = new
root.setLevel(old_level)
LOG.debug('replaced %r with %r (new: %d readers, %d writers; '
'old: %d readers, %d writers)', old, new,
len(new.readers), len(new.writers),
len(old.readers), len(old.writers))
def upgrade_router(econtext): def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO if not isinstance(econtext.router, Router): # TODO
econtext.broker.defer(_upgrade_broker, econtext.broker)
econtext.router.__class__ = Router # TODO econtext.router.__class__ = Router # TODO
econtext.router.upgrade( econtext.router.upgrade(
importer=econtext.importer, importer=econtext.importer,
@ -465,9 +492,8 @@ def stream_by_method_name(name):
""" """
if name == 'local': if name == 'local':
name = 'parent' name = 'parent'
Stream = None module = mitogen.core.import_module('mitogen.' + name)
exec('from mitogen.%s import Stream' % (name,)) return module.Stream
return Stream
@mitogen.core.takes_econtext @mitogen.core.takes_econtext
@ -497,6 +523,182 @@ def _proxy_connect(name, method_name, kwargs, econtext):
} }
class Argv(object):
def __init__(self, argv):
self.argv = argv
def escape(self, x):
s = '"'
for c in x:
if c in '\\$"`':
s += '\\'
s += c
s += '"'
return s
def __str__(self):
return ' '.join(map(self.escape, self.argv))
class KqueuePoller(mitogen.core.Poller):
_repr = 'KqueuePoller()'
def __init__(self):
self._kqueue = select.kqueue()
self._rfds = {}
self._wfds = {}
self._changelist = []
def close(self):
self._kqueue.close()
@property
def readers(self):
return list(self._rfds.items())
@property
def writers(self):
return list(self._wfds.items())
def _control(self, fd, filters, flags):
mitogen.core._vv and IOLOG.debug(
'%r._control(%r, %r, %r)', self, fd, filters, flags)
self._changelist.append(select.kevent(fd, filters, flags))
def start_receive(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
self, fd, data)
if fd not in self._rfds:
self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
self._rfds[fd] = data or fd
def stop_receive(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd)
if fd in self._rfds:
self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
del self._rfds[fd]
def start_transmit(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
self, fd, data)
if fd not in self._wfds:
self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
self._wfds[fd] = data or fd
def stop_transmit(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd)
if fd in self._wfds:
self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
del self._wfds[fd]
def poll(self, timeout=None):
changelist = self._changelist
self._changelist = []
events, _ = mitogen.core.io_op(self._kqueue.control,
changelist, 32, timeout)
for event in events:
fd = event.ident
if event.filter == select.KQ_FILTER_READ and fd in self._rfds:
# Events can still be read for an already-discarded fd.
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
yield self._rfds[fd]
elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
yield self._wfds[fd]
class EpollPoller(mitogen.core.Poller):
_repr = 'EpollPoller()'
def __init__(self):
self._epoll = select.epoll(32)
self._registered_fds = set()
self._rfds = {}
self._wfds = {}
def close(self):
self._epoll.close()
@property
def readers(self):
return list(self._rfds.items())
@property
def writers(self):
return list(self._wfds.items())
def _control(self, fd):
mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd)
mask = (((fd in self._rfds) and select.EPOLLIN) |
((fd in self._wfds) and select.EPOLLOUT))
if mask:
if fd in self._registered_fds:
self._epoll.modify(fd, mask)
else:
self._epoll.register(fd, mask)
self._registered_fds.add(fd)
elif fd in self._registered_fds:
self._epoll.unregister(fd)
self._registered_fds.remove(fd)
def start_receive(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
self, fd, data)
self._rfds[fd] = data or fd
self._control(fd)
def stop_receive(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd)
self._rfds.pop(fd, None)
self._control(fd)
def start_transmit(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
self, fd, data)
self._wfds[fd] = data or fd
self._control(fd)
def stop_transmit(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd)
self._wfds.pop(fd, None)
self._control(fd)
_inmask = (getattr(select, 'EPOLLIN', 0) |
getattr(select, 'EPOLLHUP', 0))
def poll(self, timeout=None):
the_timeout = -1
if timeout is not None:
the_timeout = timeout
events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32)
for fd, event in events:
if event & self._inmask and fd in self._rfds:
# Events can still be read for an already-discarded fd.
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
yield self._rfds[fd]
if event & select.EPOLLOUT and fd in self._wfds:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
yield self._wfds[fd]
POLLER_BY_SYSNAME = {
'Darwin': KqueuePoller,
'FreeBSD': KqueuePoller,
'Linux': EpollPoller,
}
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
os.uname()[0],
mitogen.core.Poller,
)
# For apps that start threads dynamically, it's possible Latch will also get
# very high-numbered wait fds when there are many connections, and so select()
# becomes useless there too. So swap in our favourite poller.
mitogen.core.Latch.poller_class = PREFERRED_POLLER
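A quick way to check which poller was selected on the current platform; PREFERRED_POLLER and the per-OS mapping are defined just above:

    import mitogen.parent

    # EpollPoller on Linux, KqueuePoller on Darwin/FreeBSD, otherwise the
    # select()-based mitogen.core.Poller fallback.
    print mitogen.parent.PREFERRED_POLLER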
class TtyLogStream(mitogen.core.BasicStream): class TtyLogStream(mitogen.core.BasicStream):
""" """
For "hybrid TTY/socketpair" mode, after a connection has been setup, a For "hybrid TTY/socketpair" mode, after a connection has been setup, a
@ -592,7 +794,7 @@ class Stream(mitogen.core.Stream):
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Request the slave gracefully shut itself down.""" """Request the slave gracefully shut itself down."""
LOG.debug('%r closing CALL_FUNCTION channel', self) LOG.debug('%r closing CALL_FUNCTION channel', self)
self.send( self._send(
mitogen.core.Message( mitogen.core.Message(
src_id=mitogen.context_id, src_id=mitogen.context_id,
dst_id=self.remote_id, dst_id=self.remote_id,
@ -628,7 +830,7 @@ class Stream(mitogen.core.Stream):
except OSError: except OSError:
e = sys.exc_info()[1] e = sys.exc_info()[1]
if e.args[0] == errno.ECHILD: if e.args[0] == errno.ECHILD:
LOG.warn('%r: waitpid(%r) produced ECHILD', self.pid, self) LOG.warn('%r: waitpid(%r) produced ECHILD', self, self.pid)
return return
raise raise
@ -701,7 +903,7 @@ class Stream(mitogen.core.Stream):
'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded,) 'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded,)
] ]
def get_main_kwargs(self): def get_econtext_config(self):
assert self.max_message_size is not None assert self.max_message_size is not None
parent_ids = mitogen.parent_ids[:] parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id) parent_ids.insert(0, mitogen.context_id)
@ -720,8 +922,8 @@ class Stream(mitogen.core.Stream):
def get_preamble(self): def get_preamble(self):
source = inspect.getsource(mitogen.core) source = inspect.getsource(mitogen.core)
source += '\nExternalContext().main(**%r)\n' % ( source += '\nExternalContext(%r).main()\n' % (
self.get_main_kwargs(), self.get_econtext_config(),
) )
return zlib.compress(minimize_source(source), 9) return zlib.compress(minimize_source(source), 9)
@ -1011,7 +1213,6 @@ class Router(mitogen.core.Router):
try: try:
stream.connect() stream.connect()
except mitogen.core.TimeoutError: except mitogen.core.TimeoutError:
e = sys.exc_info()[1]
raise mitogen.core.StreamError(self.connection_timeout_msg) raise mitogen.core.StreamError(self.connection_timeout_msg)
context.name = stream.name context.name = stream.name
self.route_monitor.notice_stream(stream) self.route_monitor.notice_stream(stream)
@ -1106,6 +1307,12 @@ class ModuleForwarder(object):
self.router = router self.router = router
self.parent_context = parent_context self.parent_context = parent_context
self.importer = importer self.importer = importer
router.add_handler(
fn=self._on_forward_module,
handle=mitogen.core.FORWARD_MODULE,
persist=True,
policy=mitogen.core.has_parent_authority,
)
router.add_handler( router.add_handler(
fn=self._on_get_module, fn=self._on_get_module,
handle=mitogen.core.GET_MODULE, handle=mitogen.core.GET_MODULE,
@ -1116,34 +1323,64 @@ class ModuleForwarder(object):
def __repr__(self): def __repr__(self):
return 'ModuleForwarder(%r)' % (self.router,) return 'ModuleForwarder(%r)' % (self.router,)
def _on_forward_module(self, msg):
if msg.is_dead:
return
context_id_s, _, fullname = msg.data.partition('\x00')
context_id = int(context_id_s)
stream = self.router.stream_by_id(context_id)
if stream.remote_id == mitogen.parent_id:
LOG.error('%r: dropping FORWARD_MODULE(%d, %r): no route to child',
self, context_id, fullname)
return
LOG.debug('%r._on_forward_module() sending %r to %r via %r',
self, fullname, context_id, stream.remote_id)
self._send_module_and_related(stream, fullname)
if stream.remote_id != context_id:
stream._send(
mitogen.core.Message(
dst_id=stream.remote_id,
handle=mitogen.core.FORWARD_MODULE,
data=msg.data,
)
)
def _on_get_module(self, msg): def _on_get_module(self, msg):
LOG.debug('%r._on_get_module(%r)', self, msg) LOG.debug('%r._on_get_module(%r)', self, msg)
if msg.is_dead: if msg.is_dead:
return return
fullname = msg.data fullname = msg.data
callback = lambda: self._on_cache_callback(msg, fullname) self.importer._request_module(fullname,
self.importer._request_module(fullname, callback) lambda: self._on_cache_callback(msg, fullname)
def _send_one_module(self, msg, tup):
self.router._async_route(
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
) )
def _on_cache_callback(self, msg, fullname): def _on_cache_callback(self, msg, fullname):
LOG.debug('%r._on_get_module(): sending %r', self, fullname) LOG.debug('%r._on_get_module(): sending %r', self, fullname)
stream = self.router.stream_by_id(msg.src_id)
self._send_module_and_related(stream, fullname)
def _send_module_and_related(self, stream, fullname):
tup = self.importer._cache[fullname] tup = self.importer._cache[fullname]
if tup is not None: for related in tup[4]:
for related in tup[4]: rtup = self.importer._cache.get(related)
rtup = self.importer._cache.get(related) if rtup:
if not rtup: self._send_one_module(stream, rtup)
LOG.debug('%r._on_get_module(): skipping absent %r', else:
self, related) LOG.debug('%r._send_module_and_related(%r): absent: %r',
continue self, fullname, related)
self._send_one_module(msg, rtup)
self._send_one_module(stream, tup)
self._send_one_module(msg, tup)
def _send_one_module(self, stream, tup):
if tup[0] not in stream.sent_modules:
stream.sent_modules.add(tup[0])
self.router._async_route(
mitogen.core.Message.pickled(
tup,
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
)

@ -0,0 +1,133 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import mitogen.core
class Error(mitogen.core.Error):
pass
class Select(object):
notify = None
@classmethod
def all(cls, receivers):
return list(msg.unpickle() for msg in cls(receivers))
def __init__(self, receivers=(), oneshot=True):
self._receivers = []
self._oneshot = oneshot
self._latch = mitogen.core.Latch()
for recv in receivers:
self.add(recv)
def _put(self, value):
self._latch.put(value)
if self.notify:
self.notify(self)
def __bool__(self):
return bool(self._receivers)
def __enter__(self):
return self
def __exit__(self, e_type, e_val, e_tb):
self.close()
def __iter__(self):
while self._receivers:
yield self.get()
loop_msg = 'Adding this Select instance would create a Select cycle'
def _check_no_loop(self, recv):
if recv is self:
raise Error(self.loop_msg)
for recv_ in self._receivers:
if recv_ == recv:
raise Error(self.loop_msg)
if isinstance(recv_, Select):
recv_._check_no_loop(recv)
owned_msg = 'Cannot add: Receiver is already owned by another Select'
def add(self, recv):
if isinstance(recv, Select):
recv._check_no_loop(self)
self._receivers.append(recv)
if recv.notify is not None:
raise Error(self.owned_msg)
recv.notify = self._put
# Avoid race by polling once after installation.
if not recv.empty():
self._put(recv)
not_present_msg = 'Instance is not a member of this Select'
def remove(self, recv):
try:
if recv.notify != self._put:
raise ValueError
self._receivers.remove(recv)
recv.notify = None
except (IndexError, ValueError):
raise Error(self.not_present_msg)
def close(self):
for recv in self._receivers[:]:
self.remove(recv)
self._latch.close()
def empty(self):
return self._latch.empty()
empty_msg = 'Cannot get(), Select instance is empty'
def get(self, timeout=None, block=True):
if not self._receivers:
raise Error(self.empty_msg)
while True:
recv = self._latch.get(timeout=timeout, block=block)
try:
msg = recv.get(block=False)
if self._oneshot:
self.remove(recv)
msg.receiver = recv
return msg
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
continue
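The module above is the new home of Select (previously mitogen.master.Select). A rough usage sketch, assuming the caller already holds some mitogen.core.Receiver objects; first_reply() is invented for illustration:

```
import mitogen.select

def first_reply(receivers, timeout=10.0):
    # Wait on several receivers at once and return whichever message
    # arrives first. With the default oneshot=True, the winning receiver
    # is removed from the Select after delivering one message.
    with mitogen.select.Select(receivers) as sel:
        msg = sel.get(timeout=timeout)
        return msg.receiver, msg.unpickle()
```

Select.all(receivers) is shorthand for draining each receiver once and returning the unpickled results as a list.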

@ -31,27 +31,40 @@ import sys
import threading import threading
import mitogen.core import mitogen.core
import mitogen.master import mitogen.select
from mitogen.core import LOG from mitogen.core import LOG
class Policy(object): DEFAULT_POOL_SIZE = 16
""" _pool = None
Base security policy.
"""
def is_authorized(self, service, msg):
raise NotImplementedError()
class AllowAny(Policy): @mitogen.core.takes_router
def is_authorized(self, service, msg): def get_or_create_pool(size=None, router=None):
return True global _pool
if _pool is None:
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE)
return _pool
class AllowParents(Policy): @mitogen.core.takes_router
def is_authorized(self, service, msg): def _on_stub_call(router):
return (msg.auth_id in mitogen.parent_ids or """
msg.auth_id == mitogen.context_id) Called for each message received by the core.py stub CALL_SERVICE handler.
Create the pool if it doesn't already exist, and push enqueued messages
into the pool's receiver. This may be called more than once as the stub
service handler runs in asynchronous context, while _on_stub_call() happens
on the main thread. Multiple CALL_SERVICE messages may end up enqueued before Pool
has a chance to install the real CALL_SERVICE handler.
"""
pool = get_or_create_pool(router=router)
mitogen.core._service_call_lock.acquire()
try:
for msg in mitogen.core._service_calls:
pool._receiver._on_receive(msg)
del mitogen.core._service_calls[:]
finally:
mitogen.core._service_call_lock.release()
def validate_arg_spec(spec, args): def validate_arg_spec(spec, args):
@ -132,63 +145,86 @@ def no_reply():
return wrapper return wrapper
class Service(object): class Error(Exception):
#: Sentinel object to suppress reply generation, since returning ``None`` """
#: will trigger a response message containing the pickled ``None``. Raised when an error occurs configuring a service or pool.
NO_REPLY = object() """
#: If ``None``, a handle is dynamically allocated, otherwise the fixed
#: integer handle to use.
handle = None
max_message_size = 0
def __init__(self, router): class Policy(object):
self.router = router """
self.recv = mitogen.core.Receiver(router, self.handle) Base security policy.
self.recv.service = self """
self.handle = self.recv.handle def is_authorized(self, service, msg):
self.running = True raise NotImplementedError()
def __repr__(self):
return '%s()' % (self.__class__.__name__,)
def on_shutdown(self): class AllowAny(Policy):
""" def is_authorized(self, service, msg):
Called by Pool.shutdown() once the last worker thread has exitted. return True
"""
def dispatch(self, args, msg):
raise NotImplementedError()
def _validate_message(self, msg): class AllowParents(Policy):
if len(msg.data) > self.max_message_size: def is_authorized(self, service, msg):
raise mitogen.core.CallError('Message size exceeded.') return (msg.auth_id in mitogen.parent_ids or
msg.auth_id == mitogen.context_id)
pair = msg.unpickle(throw=False)
if not (isinstance(pair, tuple) and
len(pair) == 2 and
isinstance(pair[0], basestring)):
raise mitogen.core.CallError('Invalid message format.')
method_name, kwargs = pair class Activator(object):
method = getattr(self, method_name, None) """
"""
def is_permitted(self, mod_name, class_name, msg):
return mitogen.core.has_parent_authority(msg)
not_active_msg = (
'Service %r is not yet activated in this context, and the '
'caller is not privileged, therefore autoactivation is disabled.'
)
def activate(self, pool, service_name, msg):
mod_name, _, class_name = service_name.rpartition('.')
if not self.is_permitted(mod_name, class_name, msg):
raise mitogen.core.CallError(self.not_active_msg, service_name)
module = mitogen.core.import_module(mod_name)
klass = getattr(module, class_name)
service = klass(pool.router)
pool.add(service)
return service
class Invoker(object):
def __init__(self, service):
self.service = service
def __repr__(self):
return '%s(%s)' % (type(self).__name__, self.service)
unauthorized_msg = (
'Caller is not authorized to invoke %r of service %r'
)
def _validate(self, method_name, kwargs, msg):
method = getattr(self.service, method_name, None)
if method is None: if method is None:
raise mitogen.core.CallError('No such method exists.') raise mitogen.core.CallError('No such method: %r', method_name)
policies = getattr(method, 'mitogen_service__policies', None) policies = getattr(method, 'mitogen_service__policies', None)
if not policies: if not policies:
raise mitogen.core.CallError('Method has no policies set.') raise mitogen.core.CallError('Method has no policies set.')
if not all(p.is_authorized(self, msg) for p in policies): if not all(p.is_authorized(self.service, msg) for p in policies):
raise mitogen.core.CallError('Unauthorized') raise mitogen.core.CallError(
self.unauthorized_msg,
method_name,
self.service.name()
)
required = getattr(method, 'mitogen_service__arg_spec', {}) required = getattr(method, 'mitogen_service__arg_spec', {})
validate_arg_spec(required, kwargs) validate_arg_spec(required, kwargs)
return method_name, kwargs
def _on_receive_message(self, msg): def _invoke(self, method_name, kwargs, msg):
method_name, kwargs = self._validate_message(msg) method = getattr(self.service, method_name)
method = getattr(self, method_name)
if 'msg' in method.func_code.co_varnames: if 'msg' in method.func_code.co_varnames:
kwargs['msg'] = msg # TODO: hack kwargs['msg'] = msg # TODO: hack
@ -197,7 +233,7 @@ class Service(object):
try: try:
ret = method(**kwargs) ret = method(**kwargs)
if no_reply: if no_reply:
return self.NO_REPLY return Service.NO_REPLY
return ret return ret
except Exception: except Exception:
if no_reply: if no_reply:
@ -206,22 +242,14 @@ class Service(object):
else: else:
raise raise
def on_receive_message(self, msg): def invoke(self, method_name, kwargs, msg):
try: self._validate(method_name, kwargs, msg)
response = self._on_receive_message(msg) response = self._invoke(method_name, kwargs, msg)
if response is not self.NO_REPLY: if response is not Service.NO_REPLY:
msg.reply(response) msg.reply(response)
except mitogen.core.CallError:
e = sys.exc_info()[1]
LOG.warning('%r: call error: %s: %s', self, msg, e)
msg.reply(e)
except Exception:
LOG.exception('While invoking %r.dispatch()', self)
e = sys.exc_info()[1]
msg.reply(mitogen.core.CallError(e))
class DeduplicatingService(Service): class DeduplicatingInvoker(Invoker):
""" """
A service that deduplicates and caches expensive responses. Requests are A service that deduplicates and caches expensive responses. Requests are
deduplicated according to a customizable key, and the single expensive deduplicated according to a customizable key, and the single expensive
@ -233,8 +261,8 @@ class DeduplicatingService(Service):
Only one pool thread is blocked during generation of the response, Only one pool thread is blocked during generation of the response,
regardless of the number of requestors. regardless of the number of requestors.
""" """
def __init__(self, router): def __init__(self, service):
super(DeduplicatingService, self).__init__(router) super(DeduplicatingInvoker, self).__init__(service)
self._responses = {} self._responses = {}
self._waiters = {} self._waiters = {}
self._lock = threading.Lock() self._lock = threading.Lock()
@ -261,10 +289,8 @@ class DeduplicatingService(Service):
finally: finally:
self._lock.release() self._lock.release()
def _on_receive_message(self, msg): def _invoke(self, method_name, kwargs, msg):
method_name, kwargs = self._validate_message(msg)
key = self.key_from_request(method_name, kwargs) key = self.key_from_request(method_name, kwargs)
self._lock.acquire() self._lock.acquire()
try: try:
if key in self._responses: if key in self._responses:
@ -272,7 +298,7 @@ class DeduplicatingService(Service):
if key in self._waiters: if key in self._waiters:
self._waiters[key].append(msg) self._waiters[key].append(msg)
return self.NO_REPLY return Service.NO_REPLY
self._waiters[key] = [msg] self._waiters[key] = [msg]
finally: finally:
@ -289,7 +315,37 @@ class DeduplicatingService(Service):
e = sys.exc_info()[1] e = sys.exc_info()[1]
self._produce_response(key, mitogen.core.CallError(e)) self._produce_response(key, mitogen.core.CallError(e))
return self.NO_REPLY return Service.NO_REPLY
class Service(object):
#: Sentinel object to suppress reply generation, since returning ``None``
#: will trigger a response message containing the pickled ``None``.
NO_REPLY = object()
invoker_class = Invoker
@classmethod
def name(cls):
return '%s.%s' % (cls.__module__, cls.__name__)
def __init__(self, router):
self.router = router
self.select = mitogen.select.Select()
def __repr__(self):
return '%s()' % (self.__class__.__name__,)
def on_message(self, recv, msg):
"""
Called when a message arrives on any of :attr:`select`'s registered
receivers.
"""
def on_shutdown(self):
"""
Called by Pool.shutdown() once the last worker thread has exited.
"""
class Pool(object): class Pool(object):
@ -299,7 +355,7 @@ class Pool(object):
Internally this is implemented by subscribing every :py:class:`Service`'s Internally this is implemented by subscribing every :py:class:`Service`'s
:py:class:`mitogen.core.Receiver` using a single :py:class:`mitogen.core.Receiver` using a single
:py:class:`mitogen.master.Select`, then arranging for every thread to :py:class:`mitogen.select.Select`, then arranging for every thread to
consume messages delivered to that select. consume messages delivered to that select.
In this way the threads are fairly shared by all available services, and no In this way the threads are fairly shared by all available services, and no
@ -308,21 +364,33 @@ class Pool(object):
There is no penalty for exposing large numbers of services; the list of There is no penalty for exposing large numbers of services; the list of
exposed services could even be generated dynamically in response to your exposed services could even be generated dynamically in response to your
program's configuration or its input data. program's configuration or its input data.
:param mitogen.core.Router router:
Router to listen for ``CALL_SERVICE`` messages on.
:param list services:
Initial list of services to register.
""" """
activator_class = Activator
def __init__(self, router, services, size=1): def __init__(self, router, services, size=1):
assert size > 0
self.router = router self.router = router
self.services = list(services) self._activator = self.activator_class()
self.size = size self._receiver = mitogen.core.Receiver(
self._select = mitogen.master.Select( router=router,
receivers=[ handle=mitogen.core.CALL_SERVICE,
service.recv
for service in self.services
],
oneshot=False,
) )
self._select = mitogen.select.Select(oneshot=False)
self._select.add(self._receiver)
#: Serialize service construction.
self._lock = threading.Lock()
self._func_by_recv = {self._receiver: self._on_service_call}
self._invoker_by_name = {}
for service in services:
self.add(service)
self._threads = [] self._threads = []
for x in xrange(size): for x in range(size):
thread = threading.Thread( thread = threading.Thread(
name='mitogen.service.Pool.%x.worker-%d' % (id(self), x,), name='mitogen.service.Pool.%x.worker-%d' % (id(self), x,),
target=self._worker_main, target=self._worker_main,
@ -330,6 +398,19 @@ class Pool(object):
thread.start() thread.start()
self._threads.append(thread) self._threads.append(thread)
@property
def size(self):
return len(self._threads)
def add(self, service):
name = service.name()
if name in self._invoker_by_name:
raise Error('service named %r already registered' % (name,))
assert service.select not in self._func_by_recv
invoker = service.invoker_class(service)
self._invoker_by_name[name] = invoker
self._func_by_recv[service.select] = service.on_message
closed = False closed = False
def stop(self): def stop(self):
@ -337,8 +418,45 @@ class Pool(object):
self._select.close() self._select.close()
for th in self._threads: for th in self._threads:
th.join() th.join()
for service in self.services: for invoker in self._invoker_by_name.itervalues():
service.on_shutdown() invoker.service.on_shutdown()
def get_invoker(self, name, msg):
self._lock.acquire()
try:
invoker = self._invoker_by_name.get(name)
if not invoker:
service = self._activator.activate(self, name, msg)
invoker = service.invoker_class(service)
self._invoker_by_name[name] = invoker
finally:
self._lock.release()
return invoker
def _validate(self, msg):
tup = msg.unpickle(throw=False)
if not (isinstance(tup, tuple) and
len(tup) == 3 and
isinstance(tup[0], basestring) and
isinstance(tup[1], basestring) and
isinstance(tup[2], dict)):
raise mitogen.core.CallError('Invalid message format.')
def _on_service_call(self, recv, msg):
self._validate(msg)
service_name, method_name, kwargs = msg.unpickle()
try:
invoker = self.get_invoker(service_name, msg)
return invoker.invoke(method_name, kwargs, msg)
except mitogen.core.CallError:
e = sys.exc_info()[1]
LOG.warning('%r: call error: %s: %s', self, msg, e)
msg.reply(e)
except Exception:
LOG.exception('While invoking %r._invoke()', self)
e = sys.exc_info()[1]
msg.reply(mitogen.core.CallError(e))
def _worker_run(self): def _worker_run(self):
while not self.closed: while not self.closed:
@ -349,11 +467,11 @@ class Pool(object):
LOG.info('%r: channel or latch closed, exiting: %s', self, e) LOG.info('%r: channel or latch closed, exiting: %s', self, e)
return return
service = msg.receiver.service func = self._func_by_recv[msg.receiver]
try: try:
service.on_receive_message(msg) func(msg.receiver, msg)
except Exception: except Exception:
LOG.exception('While handling %r using %r', msg, service) LOG.exception('While handling %r using %r', msg, func)
def _worker_main(self): def _worker_main(self):
try: try:
@ -367,19 +485,6 @@ class Pool(object):
th = threading.currentThread() th = threading.currentThread()
return 'mitogen.service.Pool(%#x, size=%d, th=%r)' % ( return 'mitogen.service.Pool(%#x, size=%d, th=%r)' % (
id(self), id(self),
self.size, len(self._threads),
th.name, th.name,
) )
def call_async(context, handle, method, kwargs=None):
LOG.debug('service.call_async(%r, %r, %r, %r)',
context, handle, method, kwargs)
pair = (method, kwargs or {})
msg = mitogen.core.Message.pickled(pair, handle=handle)
return context.send_async(msg)
def call(context, handle, method, kwargs):
recv = call_async(context, handle, method, kwargs)
return recv.get().unpickle()

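With the old module-level call()/call_async() helpers removed above, services are reached through Context.call_service() and hosted by a Pool whose worker threads multiplex every registered service's Select. A minimal sketch of the new shape; ExampleService and serve() are invented for illustration and are not part of this change:

```
import mitogen.service

class ExampleService(mitogen.service.Service):
    # Hypothetical service; expose() and AllowAny() are the real
    # decorator/policy defined in mitogen.service above.
    @mitogen.service.expose(policy=mitogen.service.AllowAny())
    def greet(self, name):
        return 'hello, %s' % (name,)

def serve(router):
    # Register the service on a pool of four worker threads. Any context
    # with a route to this one can then call:
    #   context.call_service(ExampleService.name(), 'greet', name='world')
    pool = mitogen.service.Pool(router, services=[ExampleService(router)], size=4)
    try:
        pass  # run the program's main work here
    finally:
        pool.stop()
```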
@ -6,5 +6,5 @@ omit =
mitogen/compat/* mitogen/compat/*
[flake8] [flake8]
ignore = E402,E128,W503 ignore = E402,E128,W503,E731
exclude = mitogen/compat exclude = mitogen/compat

@ -1,5 +1,5 @@
[defaults] [defaults]
inventory = hosts inventory = hosts,lib/inventory
gathering = explicit gathering = explicit
strategy_plugins = ../../ansible_mitogen/plugins/strategy strategy_plugins = ../../ansible_mitogen/plugins/strategy
action_plugins = lib/action action_plugins = lib/action

@ -0,0 +1,6 @@
# Command line.
```
time LANG=C LC_ALL=C ANSIBLE_STRATEGY=mitogen MITOGEN_GCLOUD_GROUP=debops_all_hosts debops common
```

@ -0,0 +1,3 @@
[defaults]
inventory = hosts
retry_files_enabled = False

@ -0,0 +1,86 @@
- hosts: controller
tasks:
- shell: "rsync -a ~/.ssh {{inventory_hostname}}:"
connection: local
- lineinfile:
line: "net.ipv4.ip_forward=1"
path: /etc/sysctl.conf
register: sysctl_conf
become: true
- shell: /sbin/sysctl -p
when: sysctl_conf.changed
become: true
- shell: |
iptables -t nat -F;
iptables -t nat -X;
iptables -t nat -A POSTROUTING -j MASQUERADE;
become: true
- apt: name={{item}} state=installed
become: true
with_items:
- python-pip
- python-virtualenv
- strace
- libldap2-dev
- libsasl2-dev
- build-essential
- git
- git:
dest: ~/mitogen
repo: https://github.com/dw/mitogen.git
version: dmw
- git:
dest: ~/ansible
repo: https://github.com/dw/ansible.git
version: lazy-vars
- pip:
virtualenv: ~/venv
requirements: ~/mitogen/dev_requirements.txt
- pip:
virtualenv: ~/venv
editable: true
name: ~/mitogen
- pip:
virtualenv: ~/venv
editable: true
name: ~/ansible
- lineinfile:
line: "source $HOME/venv/bin/activate"
path: ~/.profile
- name: debops-init
shell: ~/venv/bin/debops-init ~/prj
args:
creates: ~/prj
- name: grpvars
copy:
dest: "{{ansible_user_dir}}/prj/ansible/inventory/group_vars/all/dhparam.yml"
content: |
---
dhparam__bits: [ '256' ]
- blockinfile:
path: ~/prj/.debops.cfg
insertafter: '\[ansible defaults\]'
block: |
strategy_plugins = {{ansible_user_dir}}/mitogen/ansible_mitogen/plugins/strategy
forks = 50
host_key_checking = False
- file:
path: ~/prj/ansible/inventory/gcloud.py
state: link
src: ~/mitogen/tests/ansible/lib/inventory/gcloud.py

@ -0,0 +1,2 @@
[controller]
35.206.145.240

@ -1 +1,2 @@
- import_playbook: timeouts.yml - import_playbook: timeouts.yml
- import_playbook: variables.yml

@ -0,0 +1,114 @@
# These tests don't run under vanilla Ansible because ssh-askpass wants to
# run for whatever reason.
- name: integration/ssh/variables.yml
hosts: test-targets
connection: local
vars:
# ControlMaster has the effect of caching the previous auth to the same
# account, so disable it. It can't be disabled with "ControlMaster no" since
# that option already appears on the command line, so override ControlPath
# with junk instead.
ansible_ssh_common_args: |
-o "ControlPath /tmp/mitogen-ansible-test-{{18446744073709551615|random}}"
tasks:
- name: ansible_ssh_user
# Remaining tests just use "ansible_user".
shell: >
ANSIBLE_STRATEGY=mitogen_linear
ANSIBLE_SSH_ARGS=""
ansible -m shell -a whoami -i "{{inventory_file}}" test-targets
-e ansible_ssh_user=mitogen__has_sudo
-e ansible_ssh_pass=has_sudo_password
register: out
when: is_mitogen
- shell: >
ANSIBLE_STRATEGY=mitogen_linear
ANSIBLE_SSH_ARGS=""
ansible -m shell -a whoami -i "{{inventory_file}}" test-targets
-e ansible_ssh_user=mitogen__has_sudo
-e ansible_ssh_pass=wrong_password
register: out
ignore_errors: true
when: is_mitogen
- assert:
that: out.rc == 4 # unreachable
when: is_mitogen
- name: ansible_ssh_pass
shell: >
ANSIBLE_STRATEGY=mitogen_linear
ANSIBLE_SSH_ARGS=""
ansible -m shell -a whoami -i "{{inventory_file}}" test-targets
-e ansible_user=mitogen__has_sudo
-e ansible_ssh_pass=has_sudo_password
register: out
when: is_mitogen
- shell: >
ANSIBLE_STRATEGY=mitogen_linear
ANSIBLE_SSH_ARGS=""
ansible -m shell -a whoami -i "{{inventory_file}}" test-targets
-e ansible_user=mitogen__has_sudo
-e ansible_ssh_pass=wrong_password
register: out
ignore_errors: true
when: is_mitogen
- assert:
that: out.rc == 4 # unreachable
when: is_mitogen
- name: ansible_password
shell: >
ANSIBLE_STRATEGY=mitogen_linear
ANSIBLE_SSH_ARGS=""
ansible -m shell -a whoami -i "{{inventory_file}}" test-targets
-e ansible_user=mitogen__has_sudo
-e ansible_password=has_sudo_password
register: out
when: is_mitogen
- shell: >
ANSIBLE_STRATEGY=mitogen_linear
ANSIBLE_SSH_ARGS=""
ansible -m shell -a whoami -i "{{inventory_file}}" test-targets
-e ansible_user=mitogen__has_sudo
-e ansible_password=wrong_password
register: out
ignore_errors: true
when: is_mitogen
- assert:
that: out.rc == 4 # unreachable
when: is_mitogen
- name: ansible_ssh_private_key_file
shell: >
ANSIBLE_STRATEGY=mitogen_linear
ANSIBLE_SSH_ARGS=""
ansible -m shell -a whoami -i "{{inventory_file}}" test-targets
-e ansible_user=mitogen__has_sudo_pubkey
-e ansible_ssh_private_key_file=../data/docker/mitogen__has_sudo_pubkey.key
register: out
when: is_mitogen
- shell: >
ANSIBLE_STRATEGY=mitogen_linear
ANSIBLE_SSH_ARGS=""
ansible -m shell -a whoami -i "{{inventory_file}}" test-targets
-e ansible_user=mitogen__has_sudo
-e ansible_ssh_private_key_file=/dev/null
register: out
ignore_errors: true
when: is_mitogen
- assert:
that: out.rc == 4 # unreachable
when: is_mitogen

@ -25,10 +25,8 @@ class ActionModule(ActionBase):
self._connection._connect() self._connection._connect()
return { return {
'changed': True, 'changed': True,
'result': mitogen.service.call( 'result': self._connection.parent.call_service(
context=self._connection.parent, service_name='ansible_mitogen.services.ContextService',
handle=ansible_mitogen.services.ContextService.handle, method_name='shutdown_all',
method='shutdown_all',
kwargs={}
) )
} }

@ -1,15 +1,20 @@
#!/usr/bin/env python #!/usr/bin/env python
import json
import os import os
import sys import sys
if (not os.environ.get('MITOGEN_GCLOUD_GROUP')) or any('--host' in s for s in sys.argv):
sys.stdout.write('{}')
sys.exit(0)
import googleapiclient.discovery import googleapiclient.discovery
def main(): def main():
project = 'mitogen-load-testing' project = 'mitogen-load-testing'
zone = 'asia-south1-c' zone = 'europe-west1-d'
group_name = 'micro-debian9' group_name = 'target'
client = googleapiclient.discovery.build('compute', 'v1') client = googleapiclient.discovery.build('compute', 'v1')
resp = client.instances().list(project=project, zone=zone).execute() resp = client.instances().list(project=project, zone=zone).execute()
@ -24,12 +29,20 @@ def main():
#for config in interface['accessConfigs'] #for config in interface['accessConfigs']
) )
print 'Addresses:', ips sys.stderr.write('Addresses: %s\n' % (ips,))
os.execvp('ansible-playbook', [ gname = os.environ['MITOGEN_GCLOUD_GROUP']
'anisble-playbook', groups = {
'--user=dw', gname: {
'--inventory-file=' + ','.join(ips) + ',' 'hosts': ips
] + sys.argv[1:]) }
}
for i in 1, 10, 20, 50, 100:
groups['%s-%s' % (gname, i)] = {
'hosts': ips[:i]
}
sys.stdout.write(json.dumps(groups, indent=4))
if __name__ == '__main__': if __name__ == '__main__':

@ -20,12 +20,19 @@
# #
# Hashed passwords. # Hashed passwords.
# #
- name: Create Mitogen test group
group:
name: "mitogen__group"
- name: Create Mitogen test users - name: Create Mitogen test users
user: user:
name: "mitogen__{{item}}" name: "mitogen__{{item}}"
shell: /bin/bash shell: /bin/bash
groups: mitogen__group
password: "{{ (item + '_password') | password_hash('sha256') }}" password: "{{ (item + '_password') | password_hash('sha256') }}"
with_items: with_items:
- has_sudo
- has_sudo_pubkey
- require_tty - require_tty
- pw_required - pw_required
- require_tty_pw_required - require_tty_pw_required
@ -47,8 +54,11 @@
user: user:
name: "mitogen__{{item}}" name: "mitogen__{{item}}"
shell: /bin/bash shell: /bin/bash
groups: mitogen__group
password: "{{item}}_password" password: "{{item}}_password"
with_items: with_items:
- has_sudo
- has_sudo_pubkey
- require_tty - require_tty
- pw_required - pw_required
- require_tty_pw_required - require_tty_pw_required
@ -98,6 +108,20 @@
- bashrc - bashrc
- profile - profile
- name: Install pubkey for one account
file:
path: ~mitogen__has_sudo_pubkey/.ssh
state: directory
mode: go=
owner: mitogen__has_sudo_pubkey
- name: Install pubkey for one account
copy:
dest: ~mitogen__has_sudo_pubkey/.ssh/authorized_keys
src: ../data/docker/mitogen__has_sudo_pubkey.key.pub
mode: go=
owner: mitogen__has_sudo_pubkey
- name: Require a TTY for two accounts - name: Require a TTY for two accounts
lineinfile: lineinfile:
path: /etc/sudoers path: /etc/sudoers

@ -22,6 +22,11 @@ class ConstructorTest(unittest2.TestCase):
e = self.klass(ve) e = self.klass(ve)
self.assertEquals(e[0], 'exceptions.ValueError: eek') self.assertEquals(e[0], 'exceptions.ValueError: eek')
def test_from_base_exc(self):
ve = SystemExit('eek')
e = self.klass(ve)
self.assertEquals(e[0], 'exceptions.SystemExit: eek')
def test_from_exc_tb(self): def test_from_exc_tb(self):
try: try:
raise ValueError('eek') raise ValueError('eek')

@ -1,10 +0,0 @@
import mitogen.core
import mitogen.parent
@mitogen.core.takes_econtext
def allocate_an_id(econtext):
mitogen.parent.upgrade_router(econtext)
return econtext.router.allocate_id()

@ -70,6 +70,13 @@ class ForkTest(testlib.RouterMixin, unittest2.TestCase):
context = self.router.fork() context = self.router.fork()
self.assertEqual(2, context.call(exercise_importer, 1)) self.assertEqual(2, context.call(exercise_importer, 1))
def test_on_start(self):
recv = mitogen.core.Receiver(self.router)
def on_start(econtext):
sender = mitogen.core.Sender(econtext.parent, recv.handle)
sender.send(123)
context = self.router.fork(on_start=on_start)
self.assertEquals(123, recv.get().unpickle())
class DoubleChildTest(testlib.RouterMixin, unittest2.TestCase): class DoubleChildTest(testlib.RouterMixin, unittest2.TestCase):
def test_okay(self): def test_okay(self):

@ -2,7 +2,15 @@
import unittest2 import unittest2
import testlib import testlib
import id_allocation
import mitogen.core
import mitogen.parent
@mitogen.core.takes_econtext
def allocate_an_id(econtext):
mitogen.parent.upgrade_router(econtext)
return econtext.router.allocate_id()
class SlaveTest(testlib.RouterMixin, testlib.TestCase): class SlaveTest(testlib.RouterMixin, testlib.TestCase):
@ -12,11 +20,11 @@ class SlaveTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(1, context.context_id) self.assertEquals(1, context.context_id)
# First call from slave allocates a block (2..1001) # First call from slave allocates a block (2..1001)
id_ = context.call(id_allocation.allocate_an_id) id_ = context.call(allocate_an_id)
self.assertEqual(id_, 2) self.assertEqual(id_, 2)
# Second call from slave allocates from block (3..1001) # Second call from slave allocates from block (3..1001)
id_ = context.call(id_allocation.allocate_an_id) id_ = context.call(allocate_an_id)
self.assertEqual(id_, 3) self.assertEqual(id_, 3)
# Subsequent master allocation does not collide # Subsequent master allocation does not collide

@ -1,6 +1,7 @@
import errno import errno
import os import os
import subprocess import subprocess
import sys
import tempfile import tempfile
import time import time

@ -1,13 +1,13 @@
import unittest2 import unittest2
import mitogen.master import mitogen.select
import testlib import testlib
class AddTest(testlib.RouterMixin, testlib.TestCase): class AddTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select klass = mitogen.select.Select
def test_receiver(self): def test_receiver(self):
recv = mitogen.core.Receiver(self.router) recv = mitogen.core.Receiver(self.router)
@ -47,7 +47,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
def test_subselect_loop_direct(self): def test_subselect_loop_direct(self):
select = self.klass() select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError, exc = self.assertRaises(mitogen.select.Error,
lambda: select.add(select)) lambda: select.add(select))
self.assertEquals(str(exc), self.klass.loop_msg) self.assertEquals(str(exc), self.klass.loop_msg)
@ -58,7 +58,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
s0.add(s1) s0.add(s1)
s1.add(s2) s1.add(s2)
exc = self.assertRaises(mitogen.master.SelectError, exc = self.assertRaises(mitogen.select.Error,
lambda: s2.add(s0)) lambda: s2.add(s0))
self.assertEquals(str(exc), self.klass.loop_msg) self.assertEquals(str(exc), self.klass.loop_msg)
@ -66,7 +66,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass() select = self.klass()
recv = mitogen.core.Receiver(self.router) recv = mitogen.core.Receiver(self.router)
select.add(recv) select.add(recv)
exc = self.assertRaises(mitogen.master.SelectError, exc = self.assertRaises(mitogen.select.Error,
lambda: select.add(recv)) lambda: select.add(recv))
self.assertEquals(str(exc), self.klass.owned_msg) self.assertEquals(str(exc), self.klass.owned_msg)
@ -74,18 +74,18 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass() select = self.klass()
select2 = self.klass() select2 = self.klass()
select.add(select2) select.add(select2)
exc = self.assertRaises(mitogen.master.SelectError, exc = self.assertRaises(mitogen.select.Error,
lambda: select.add(select2)) lambda: select.add(select2))
self.assertEquals(str(exc), self.klass.owned_msg) self.assertEquals(str(exc), self.klass.owned_msg)
class RemoveTest(testlib.RouterMixin, testlib.TestCase): class RemoveTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select klass = mitogen.select.Select
def test_empty(self): def test_empty(self):
select = self.klass() select = self.klass()
recv = mitogen.core.Receiver(self.router) recv = mitogen.core.Receiver(self.router)
exc = self.assertRaises(mitogen.master.SelectError, exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(recv)) lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg) self.assertEquals(str(exc), self.klass.not_present_msg)
@ -94,7 +94,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
recv = mitogen.core.Receiver(self.router) recv = mitogen.core.Receiver(self.router)
recv2 = mitogen.core.Receiver(self.router) recv2 = mitogen.core.Receiver(self.router)
select.add(recv2) select.add(recv2)
exc = self.assertRaises(mitogen.master.SelectError, exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(recv)) lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg) self.assertEquals(str(exc), self.klass.not_present_msg)
@ -108,7 +108,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
class CloseTest(testlib.RouterMixin, testlib.TestCase): class CloseTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select klass = mitogen.select.Select
def test_empty(self): def test_empty(self):
select = self.klass() select = self.klass()
@ -147,7 +147,7 @@ class CloseTest(testlib.RouterMixin, testlib.TestCase):
class EmptyTest(testlib.RouterMixin, testlib.TestCase): class EmptyTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select klass = mitogen.select.Select
def test_no_receivers(self): def test_no_receivers(self):
select = self.klass() select = self.klass()
@ -172,7 +172,7 @@ class EmptyTest(testlib.RouterMixin, testlib.TestCase):
class IterTest(testlib.RouterMixin, testlib.TestCase): class IterTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select klass = mitogen.select.Select
def test_empty(self): def test_empty(self):
select = self.klass() select = self.klass()
@ -187,7 +187,7 @@ class IterTest(testlib.RouterMixin, testlib.TestCase):
class OneShotTest(testlib.RouterMixin, testlib.TestCase): class OneShotTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select klass = mitogen.select.Select
def test_true_removed_after_get(self): def test_true_removed_after_get(self):
recv = mitogen.core.Receiver(self.router) recv = mitogen.core.Receiver(self.router)
@ -212,17 +212,17 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
class GetTest(testlib.RouterMixin, testlib.TestCase): class GetTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select klass = mitogen.select.Select
def test_no_receivers(self): def test_no_receivers(self):
select = self.klass() select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError, exc = self.assertRaises(mitogen.select.Error,
lambda: select.get()) lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg) self.assertEquals(str(exc), self.klass.empty_msg)
def test_timeout_no_receivers(self): def test_timeout_no_receivers(self):
select = self.klass() select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError, exc = self.assertRaises(mitogen.select.Error,
lambda: select.get(timeout=1.0)) lambda: select.get(timeout=1.0))
self.assertEquals(str(exc), self.klass.empty_msg) self.assertEquals(str(exc), self.klass.empty_msg)

@ -0,0 +1,93 @@
import unittest2
import mitogen.core
import mitogen.service
import testlib
class MyService(mitogen.service.Service):
def __init__(self, router):
super(MyService, self).__init__(router)
self._counter = 0
@mitogen.service.expose(policy=mitogen.service.AllowParents())
def get_id(self):
self._counter += 1
return self._counter, id(self)
@mitogen.service.expose(policy=mitogen.service.AllowParents())
def privileged_op(self):
return 'privileged!'
@mitogen.service.expose(policy=mitogen.service.AllowAny())
def unprivileged_op(self):
return 'unprivileged!'
class MyService2(MyService):
"""
A uniquely named service that lets us test framework activation and class
activation separately.
"""
def call_service_in(context, service_name, method_name):
return context.call_service(service_name, method_name)
class ActivationTest(testlib.RouterMixin, testlib.TestCase):
def test_parent_can_activate(self):
l1 = self.router.fork()
counter, id_ = l1.call_service(MyService, 'get_id')
self.assertEquals(1, counter)
self.assertTrue(isinstance(id_, int))
def test_sibling_cannot_activate_framework(self):
l1 = self.router.fork()
l2 = self.router.fork()
exc = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id'))
self.assertTrue(mitogen.core.Router.refused_msg in exc.args[0])
def test_sibling_cannot_activate_service(self):
l1 = self.router.fork()
l2 = self.router.fork()
l1.call_service(MyService, 'get_id') # force framework activation
exc = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id'))
msg = mitogen.service.Activator.not_active_msg % (MyService2.name(),)
self.assertTrue(msg in exc.args[0])
def test_activates_only_once(self):
l1 = self.router.fork()
counter, id_ = l1.call_service(MyService, 'get_id')
counter2, id_2 = l1.call_service(MyService, 'get_id')
self.assertEquals(1, counter)
self.assertEquals(2, counter2)
self.assertEquals(id_, id_2)
class PermissionTest(testlib.RouterMixin, testlib.TestCase):
def test_sibling_unprivileged_ok(self):
l1 = self.router.fork()
l1.call_service(MyService, 'get_id')
l2 = self.router.fork()
self.assertEquals('unprivileged!',
l2.call(call_service_in, l1, MyService.name(), 'unprivileged_op'))
def test_sibling_privileged_bad(self):
l1 = self.router.fork()
l1.call_service(MyService, 'get_id')
l2 = self.router.fork()
exc = self.assertRaises(mitogen.core.CallError, lambda:
l2.call(call_service_in, l1, MyService.name(), 'privileged_op'))
msg = mitogen.service.Invoker.unauthorized_msg % (
'privileged_op',
MyService.name(),
)
self.assertTrue(msg in exc.args[0])
if __name__ == '__main__':
unittest2.main()