Merge remote-tracking branch 'origin/dmw'

pull/372/head
David Wilson 6 years ago
commit 52a121d4aa

@ -31,7 +31,6 @@ from __future__ import unicode_literals
import logging
import os
import shlex
import stat
import time
@ -333,11 +332,16 @@ def config_from_play_context(transport, inventory_name, connection):
'become_pass': connection._play_context.become_pass,
'password': connection._play_context.password,
'port': connection._play_context.port,
'python_path': parse_python_path(connection.python_path),
'python_path': parse_python_path(
connection.get_task_var('ansible_python_interpreter',
default='/usr/bin/python')
),
'private_key_file': connection._play_context.private_key_file,
'ssh_executable': connection._play_context.ssh_executable,
'timeout': connection._play_context.timeout,
'ansible_ssh_timeout': connection.ansible_ssh_timeout,
'ansible_ssh_timeout':
connection.get_task_var('ansible_ssh_timeout',
default=C.DEFAULT_TIMEOUT),
'ssh_args': [
mitogen.core.to_text(term)
for s in (
@ -356,12 +360,18 @@ def config_from_play_context(transport, inventory_name, connection):
)
for term in ansible.utils.shlex.shlex_split(s or '')
],
'mitogen_via': connection.mitogen_via,
'mitogen_kind': connection.mitogen_kind,
'mitogen_docker_path': connection.mitogen_docker_path,
'mitogen_lxc_info_path': connection.mitogen_lxc_info_path,
'mitogen_machinectl_path': connection.mitogen_machinectl_path,
'mitogen_ssh_debug_level': connection.mitogen_ssh_debug_level,
'mitogen_via':
connection.get_task_var('mitogen_via'),
'mitogen_kind':
connection.get_task_var('mitogen_kind'),
'mitogen_docker_path':
connection.get_task_var('mitogen_docker_path'),
'mitogen_lxc_info_path':
connection.get_task_var('mitogen_lxc_info_path'),
'mitogen_machinectl_path':
connection.get_task_var('mitogen_machinectl_path'),
'mitogen_ssh_debug_level':
connection.get_task_var('mitogen_ssh_debug_level'),
}
@ -393,6 +403,34 @@ def config_from_hostvars(transport, inventory_name, connection,
})
class CallChain(mitogen.parent.CallChain):
call_aborted_msg = (
'Mitogen was disconnected from the remote environment while a call '
'was in-progress. If you feel this is in error, please file a bug. '
'Original error was: %s'
)
def _rethrow(self, recv):
try:
return recv.get().unpickle()
except mitogen.core.ChannelError as e:
raise ansible.errors.AnsibleConnectionFailure(
self.call_aborted_msg % (e,)
)
def call(self, func, *args, **kwargs):
"""
Like :meth:`mitogen.parent.CallChain.call`, but log timings.
"""
t0 = time.time()
try:
recv = self.call_async(func, *args, **kwargs)
return self._rethrow(recv)
finally:
LOG.debug('Call took %d ms: %r', 1000 * (time.time() - t0),
mitogen.parent.CallSpec(func, args, kwargs))
class Connection(ansible.plugins.connection.ConnectionBase):
#: mitogen.master.Broker for this worker.
broker = None
@ -408,50 +446,47 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: reached via become.
context = None
#: mitogen.parent.Context for the login account on the target. This is
#: always the login account, even when become=True.
#: Context for the login account on the target. This is always the login
#: account, even when become=True.
login_context = None
#: mitogen.parent.Context connected to the fork parent process in the
#: target user account.
fork_context = None
#: Only sudo, su, and doas are supported for now.
become_methods = ['sudo', 'su', 'doas']
#: Dict containing init_child() return vaue as recorded at startup by
#: ContextService. Contains:
#:
#: fork_context: Context connected to the fork parent : process in the
#: target account.
#: home_dir: Target context's home directory.
#: temp_dir: A writeable temporary directory managed by the
#: target, automatically destroyed at shutdown.
init_child_result = None
#: A private temporary directory destroyed during :meth:`close`, or
#: automatically during shutdown if :meth:`close` failed or was never
#: called.
temp_dir = None
#: A :class:`mitogen.parent.CallChain` to use for calls made to the target
#: account, to ensure subsequent calls fail if pipelined directory creation
#: or file transfer fails. This eliminates roundtrips when a call is likely
#: to succeed, and ensures subsequent actions will fail with the original
#: exception if the pipelined call failed.
chain = None
#
# Note: any of the attributes below may be :data:`None` if the connection
# plugin was constructed directly by a non-cooperative action, such as in
# the case of the synchronize module.
#
#: Set to 'ansible_python_interpreter' by on_action_run().
python_path = None
#: Set to 'ansible_ssh_timeout' by on_action_run().
ansible_ssh_timeout = None
#: Set to 'mitogen_via' by on_action_run().
mitogen_via = None
#: Set to 'mitogen_kind' by on_action_run().
mitogen_kind = None
#: Set to 'mitogen_docker_path' by on_action_run().
mitogen_docker_path = None
#: Set to 'mitogen_lxc_info_path' by on_action_run().
mitogen_lxc_info_path = None
#: Set to 'mitogen_lxc_info_path' by on_action_run().
mitogen_machinectl_path = None
#: Set to 'mitogen_ssh_debug_level' by on_action_run().
mitogen_ssh_debug_level = None
#: Set to 'inventory_hostname' by on_action_run().
#: Set to the host name as it appears in inventory by on_action_run().
inventory_hostname = None
#: Set to task_vars by on_action_run().
_task_vars = None
#: Set to 'hostvars' by on_action_run()
host_vars = None
@ -463,12 +498,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: matching vanilla Ansible behaviour.
loader_basedir = None
#: Set after connection to the target context's home directory.
home_dir = None
#: Set after connection to the target context's home directory.
_temp_dir = None
def __init__(self, play_context, new_stdin, **kwargs):
assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
'Mitogen connection types may only be instantiated '
@ -500,26 +529,22 @@ class Connection(ansible.plugins.connection.ConnectionBase):
:param str loader_basedir:
Loader base directory; see :attr:`loader_basedir`.
"""
self.ansible_ssh_timeout = task_vars.get('ansible_ssh_timeout',
C.DEFAULT_TIMEOUT)
self.python_path = task_vars.get('ansible_python_interpreter',
'/usr/bin/python')
self.mitogen_via = task_vars.get('mitogen_via')
self.mitogen_kind = task_vars.get('mitogen_kind')
self.mitogen_docker_path = task_vars.get('mitogen_docker_path')
self.mitogen_lxc_info_path = task_vars.get('mitogen_lxc_info_path')
self.mitogen_machinectl_path = task_vars.get('mitogen_machinectl_path')
self.mitogen_ssh_debug_level = task_vars.get('mitogen_ssh_debug_level')
self.inventory_hostname = task_vars['inventory_hostname']
self._task_vars = task_vars
self.host_vars = task_vars['hostvars']
self.delegate_to_hostname = delegate_to_hostname
self.loader_basedir = loader_basedir
self.close(new_task=True)
def get_task_var(self, key, default=None):
if self._task_vars and key in self._task_vars:
return self._task_vars[key]
return default
@property
def homedir(self):
self._connect()
return self.home_dir
return self.init_child_result['home_dir']
@property
def connected(self):
@ -535,7 +560,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
if isinstance(via_vars, jinja2.runtime.Undefined):
raise ansible.errors.AnsibleConnectionFailure(
self.unknown_via_msg % (
self.mitogen_via,
via_spec,
inventory_name,
)
)
@ -602,7 +627,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
transport=self._play_context.connection,
inventory_name=self.delegate_to_hostname,
connection=self,
hostvars=self.host_vars[self._play_context.delegate_to],
hostvars=self.host_vars[self.delegate_to_hostname],
become_user=(self._play_context.become_user
if self._play_context.become
else None),
@ -641,18 +666,22 @@ class Connection(ansible.plugins.connection.ConnectionBase):
raise ansible.errors.AnsibleConnectionFailure(dct['msg'])
self.context = dct['context']
self.chain = CallChain(self.context, pipelined=True)
if self._play_context.become:
self.login_context = dct['via']
else:
self.login_context = self.context
self.fork_context = dct['init_child_result']['fork_context']
self.home_dir = dct['init_child_result']['home_dir']
self._temp_dir = dct['init_child_result']['temp_dir']
self.init_child_result = dct['init_child_result']
def get_temp_dir(self):
self._connect()
return self._temp_dir
def _init_temp_dir(self):
"""
"""
self.temp_dir = os.path.join(
self.init_child_result['temp_dir'],
'worker-%d-%x' % (os.getpid(), id(self))
)
self.get_chain().call_no_reply(os.mkdir, self.temp_dir)
def _connect(self):
"""
@ -671,6 +700,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self._connect_broker()
stack = self._build_stack()
self._connect_stack(stack)
self._init_temp_dir()
def close(self, new_task=False):
"""
@ -679,70 +709,45 @@ class Connection(ansible.plugins.connection.ConnectionBase):
multiple times.
"""
if self.context:
self.chain.reset()
# No pipelining to ensure exception is logged on failure.
self.context.call_no_reply(ansible_mitogen.target.prune_tree,
self.temp_dir)
self.parent.call_service(
service_name='ansible_mitogen.services.ContextService',
method_name='put',
context=self.context
)
self.temp_dir = None
self.context = None
self.fork_context = None
self.login_context = None
self.init_child_result = None
self.chain = None
if self.broker and not new_task:
self.broker.shutdown()
self.broker.join()
self.broker = None
self.router = None
def call_async(self, func, *args, **kwargs):
def get_chain(self, use_login=False, use_fork=False):
"""
Start a function call to the target.
:param bool use_login_context:
If present and :data:`True`, send the call to the login account
context rather than the optional become user context.
:param bool no_reply:
If present and :data:`True`, send the call with no ``reply_to``
header, causing the context to execute it entirely asynchronously,
and to log any exception thrown. This allows avoiding a roundtrip
in places where the outcome of a call is highly likely to succeed,
and subsequent actions will fail regardless with a meaningful
exception if the no_reply call failed.
Return the :class:`mitogen.parent.CallChain` to use for executing
function calls.
:returns:
:class:`mitogen.core.Receiver` that receives the function call result.
:param bool use_login:
If :data:`True`, always return the chain for the login account
rather than any active become user.
:param bool use_fork:
If :data:`True`, return the chain for the fork parent.
:returns mitogen.parent.CallChain:
"""
self._connect()
if kwargs.pop('use_login_context', None):
call_context = self.login_context
else:
call_context = self.context
if kwargs.pop('no_reply', None):
return call_context.call_no_reply(func, *args, **kwargs)
else:
return call_context.call_async(func, *args, **kwargs)
def call(self, func, *args, **kwargs):
"""
Start and wait for completion of a function call in the target.
:raises mitogen.core.CallError:
The function call failed.
:returns:
Function return value.
"""
t0 = time.time()
try:
recv = self.call_async(func, *args, **kwargs)
if recv is None: # no_reply=True
return None
return recv.get().unpickle()
finally:
LOG.debug('Call took %d ms: %r', 1000 * (time.time() - t0),
mitogen.parent.CallSpec(func, args, kwargs))
if use_login:
return self.login_context.default_call_chain
if use_fork:
return self.init_child_result['fork_context'].default_call_chain
return self.chain
def create_fork_child(self):
"""
@ -753,7 +758,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
:returns:
mitogen.core.Context of the new child.
"""
return self.call(ansible_mitogen.target.create_fork_child)
return self.get_chain(use_fork=True).call(
ansible_mitogen.target.create_fork_child
)
def get_default_cwd(self):
"""
@ -806,35 +813,33 @@ class Connection(ansible.plugins.connection.ConnectionBase):
:param str out_path:
Local filesystem path to write.
"""
output = self.call(ansible_mitogen.target.read_path,
mitogen.utils.cast(in_path))
output = self.get_chain().call(
ansible_mitogen.target.read_path,
mitogen.utils.cast(in_path),
)
ansible_mitogen.target.write_path(out_path, output)
def put_data(self, out_path, data, mode=None, utimes=None):
"""
Implement put_file() by caling the corresponding ansible_mitogen.target
function in the target, transferring small files inline.
function in the target, transferring small files inline. This is
pipelined and will return immediately; failed transfers are reported as
exceptions in subsequent functon calls.
:param str out_path:
Remote filesystem path to write.
:param byte data:
File contents to put.
"""
# no_reply=True here avoids a roundrip that 99% of the time will report
# a successful response. If the file transfer fails, the target context
# will dump an exception into the logging framework, which will appear
# on console, and the missing file will cause the subsequent task step
# to fail regardless. This is safe since CALL_FUNCTION is presently
# single-threaded for each target, so subsequent steps cannot execute
# until the transfer RPC has completed.
self.call(ansible_mitogen.target.write_path,
self.get_chain().call_no_reply(
ansible_mitogen.target.write_path,
mitogen.utils.cast(out_path),
mitogen.core.Blob(data),
mode=mode,
utimes=utimes,
no_reply=True)
)
#: Maximum size of a small file before switching to streaming file
#: Maximum size of a small file before switching to streaming
#: transfer. This should really be the same as
#: mitogen.services.FileService.IO_SIZE, however the message format has
#: slightly more overhead, so just randomly subtract 4KiB.

@ -115,15 +115,6 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
)
return super(ActionModuleMixin, self).run(tmp, task_vars)
def call(self, func, *args, **kwargs):
"""
Arrange for a Python function to be called in the target context, which
should be some function from the standard library or
ansible_mitogen.target module. This junction point exists mainly as a
nice place to insert print statements during debugging.
"""
return self._connection.call(func, *args, **kwargs)
COMMAND_RESULT = {
'rc': 0,
'stdout': '',
@ -164,7 +155,10 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
target user account.
"""
LOG.debug('_remote_file_exists(%r)', path)
return self.call(os.path.exists, mitogen.utils.cast(path))
return self._connection.get_chain().call(
os.path.exists,
mitogen.utils.cast(path)
)
def _configure_module(self, module_name, module_args, task_vars=None):
"""
@ -182,12 +176,13 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
def _make_tmp_path(self, remote_user=None):
"""
Return the temporary directory created by the persistent interpreter at
startup.
Return the directory created by the Connection instance during
connection.
"""
LOG.debug('_make_tmp_path(remote_user=%r)', remote_user)
self._connection._connect()
# _make_tmp_path() is basically a global stashed away as Shell.tmpdir.
self._connection._shell.tmpdir = self._connection.get_temp_dir()
self._connection._shell.tmpdir = self._connection.temp_dir
LOG.debug('Temporary directory: %r', self._connection._shell.tmpdir)
self._cleanup_remote_tmp = True
return self._connection._shell.tmpdir
@ -241,7 +236,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
LOG.debug('_remote_chmod(%r, mode=%r, sudoable=%r)',
paths, mode, sudoable)
return self.fake_shell(lambda: mitogen.select.Select.all(
self._connection.call_async(
self._connection.get_chain().call_async(
ansible_mitogen.target.set_file_mode, path, mode
)
for path in paths
@ -254,9 +249,9 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
"""
LOG.debug('_remote_chown(%r, user=%r, sudoable=%r)',
paths, user, sudoable)
ent = self.call(pwd.getpwnam, user)
ent = self._connection.get_chain().call(pwd.getpwnam, user)
return self.fake_shell(lambda: mitogen.select.Select.all(
self._connection.call_async(
self._connection.get_chain().call_async(
os.chown, path, ent.pw_uid, ent.pw_gid
)
for path in paths
@ -284,8 +279,10 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
# ~/.ansible -> /home/dmw/.ansible
return os.path.join(self._connection.homedir, path[2:])
# ~root/.ansible -> /root/.ansible
return self.call(os.path.expanduser, mitogen.utils.cast(path),
use_login_context=not sudoable)
return self._connection.get_chain(login=(not sudoable)).call(
os.path.expanduser,
mitogen.utils.cast(path),
)
def get_task_timeout_secs(self):
"""
@ -322,7 +319,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
self._connection._connect()
if ansible.__version__ > '2.5':
module_args['_ansible_tmpdir'] = self._connection.get_temp_dir()
module_args['_ansible_tmpdir'] = self._connection.temp_dir
return ansible_mitogen.planner.invoke(
ansible_mitogen.planner.Invocation(

@ -149,6 +149,7 @@ class Planner(object):
"""
new = dict((mitogen.core.UnicodeType(k), kwargs[k])
for k in kwargs)
new.setdefault('temp_dir', self._inv.connection.temp_dir)
new.setdefault('cwd', self._inv.connection.get_default_cwd())
new.setdefault('extra_env', self._inv.connection.get_default_env())
new.setdefault('emulate_tty', True)
@ -478,7 +479,7 @@ def invoke(invocation):
response = _invoke_forked_task(invocation, planner)
else:
_propagate_deps(invocation, planner, invocation.connection.context)
response = invocation.connection.call(
response = invocation.connection.get_chain().call(
ansible_mitogen.target.run_module,
kwargs=planner.get_kwargs(),
)

@ -36,6 +36,11 @@ import socket
import sys
import time
try:
import faulthandler
except ImportError:
faulthandler = None
import mitogen
import mitogen.core
import mitogen.debug
@ -45,6 +50,7 @@ import mitogen.service
import mitogen.unix
import mitogen.utils
import ansible.constants as C
import ansible_mitogen.logging
import ansible_mitogen.services
@ -70,6 +76,38 @@ def clean_shutdown(sock):
sock.recv(1)
def getenv_int(key, default=0):
"""
Get an integer-valued environment variable `key`, if it exists and parses
as an integer, otherwise return `default`.
"""
try:
return int(os.environ.get(key, str(default)))
except ValueError:
return default
def setup_gil():
"""
Set extremely long GIL release interval to let threads naturally progress
through CPU-heavy sequences without forcing the wake of another thread that
may contend trying to run the same CPU-heavy code. For the new-style work,
this drops runtime ~33% and involuntary context switches by >80%,
essentially making threads cooperatively scheduled.
"""
try:
# Python 2.
sys.setcheckinterval(100000)
except AttributeError:
pass
try:
# Python 3.
sys.setswitchinterval(10)
except AttributeError:
pass
class MuxProcess(object):
"""
Implement a subprocess forked from the Ansible top-level, as a safe place
@ -127,6 +165,10 @@ class MuxProcess(object):
if cls.worker_sock is not None:
return
if faulthandler is not None:
faulthandler.enable()
setup_gil()
cls.unix_listener_path = mitogen.unix.make_socket_path()
cls.worker_sock, cls.child_sock = socket.socketpair()
atexit.register(lambda: clean_shutdown(cls.worker_sock))
@ -164,6 +206,15 @@ class MuxProcess(object):
# Block until the socket is closed, which happens on parent exit.
mitogen.core.io_op(self.child_sock.recv, 1)
def _enable_router_debug(self):
if 'MITOGEN_ROUTER_DEBUG' in os.environ:
self.router.enable_debug()
def _enable_stack_dumps(self):
secs = getenv_int('MITOGEN_DUMP_THREAD_STACKS', default=0)
if secs:
mitogen.debug.dump_to_logger(secs=secs)
def _setup_master(self):
"""
Construct a Router, Broker, and mitogen.unix listener
@ -176,11 +227,10 @@ class MuxProcess(object):
self.listener = mitogen.unix.Listener(
router=self.router,
path=self.unix_listener_path,
backlog=C.DEFAULT_FORKS,
)
if 'MITOGEN_ROUTER_DEBUG' in os.environ:
self.router.enable_debug()
if 'MITOGEN_DUMP_THREAD_STACKS' in os.environ:
mitogen.debug.dump_to_logger()
self._enable_router_debug()
self._enable_stack_dumps()
def _setup_services(self):
"""
@ -195,7 +245,7 @@ class MuxProcess(object):
ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.ModuleDepService(self.router),
],
size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')),
size=getenv_int('MITOGEN_POOL_SIZE', default=16),
)
LOG.debug('Service pool configured: size=%d', self.pool.size)

@ -66,9 +66,6 @@ except ImportError:
# Prevent accidental import of an Ansible module from hanging on stdin read.
import ansible.module_utils.basic
ansible.module_utils.basic._ANSIBLE_ARGS = '{}'
ansible.module_utils.basic.get_module_path = lambda: (
ansible_mitogen.target.temp_dir
)
# For tasks that modify /etc/resolv.conf, non-Debian derivative glibcs cache
# resolv.conf at startup and never implicitly reload it. Cope with that via an
@ -245,13 +242,15 @@ class Runner(object):
When :data:`True`, indicate the runner should detach the context from
its parent after setup has completed successfully.
"""
def __init__(self, module, service_context, json_args, extra_env=None,
cwd=None, env=None, econtext=None, detach=False):
def __init__(self, module, service_context, json_args, temp_dir,
extra_env=None, cwd=None, env=None, econtext=None,
detach=False):
self.module = module
self.service_context = service_context
self.econtext = econtext
self.detach = detach
self.args = json.loads(json_args)
self.temp_dir = temp_dir
self.extra_env = extra_env
self.env = env
self.cwd = cwd
@ -292,33 +291,6 @@ class Runner(object):
implementation simply restores the original environment.
"""
self._env.revert()
self._try_cleanup_temp()
def _cleanup_temp(self):
"""
Empty temp_dir in time for the next module invocation.
"""
for name in os.listdir(ansible_mitogen.target.temp_dir):
if name in ('.', '..'):
continue
path = os.path.join(ansible_mitogen.target.temp_dir, name)
LOG.debug('Deleting %r', path)
ansible_mitogen.target.prune_tree(path)
def _try_cleanup_temp(self):
"""
During broker shutdown triggered by async task timeout or loss of
connection to the parent, it is possible for prune_tree() in
target.py::_on_broker_shutdown() to run before _cleanup_temp(), so skip
cleanup if the directory or a file disappears from beneath us.
"""
try:
self._cleanup_temp()
except (IOError, OSError) as e:
if e.args[0] == errno.ENOENT:
return
raise
def _run(self):
"""
@ -431,7 +403,8 @@ class NewStyleStdio(object):
"""
Patch ansible.module_utils.basic argument globals.
"""
def __init__(self, args):
def __init__(self, args, temp_dir):
self.temp_dir = temp_dir
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
self.original_stdin = sys.stdin
@ -441,7 +414,15 @@ class NewStyleStdio(object):
ansible.module_utils.basic._ANSIBLE_ARGS = utf8(encoded)
sys.stdin = StringIO(mitogen.core.to_text(encoded))
self.original_get_path = getattr(ansible.module_utils.basic,
'get_module_path', None)
ansible.module_utils.basic.get_module_path = self._get_path
def _get_path(self):
return self.temp_dir
def revert(self):
ansible.module_utils.basic.get_module_path = self.original_get_path
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
sys.stdin = self.original_stdin
@ -485,7 +466,7 @@ class ProgramRunner(Runner):
fetched via :meth:`_get_program`.
"""
filename = self._get_program_filename()
path = os.path.join(ansible_mitogen.target.temp_dir, filename)
path = os.path.join(self.temp_dir, filename)
self.program_fp = open(path, 'wb')
self.program_fp.write(self._get_program())
self.program_fp.flush()
@ -565,7 +546,7 @@ class ArgsFileRunner(Runner):
self.args_fp = tempfile.NamedTemporaryFile(
prefix='ansible_mitogen',
suffix='-args',
dir=ansible_mitogen.target.temp_dir,
dir=self.temp_dir,
)
self.args_fp.write(utf8(self._get_args_contents()))
self.args_fp.flush()
@ -680,7 +661,7 @@ class NewStyleRunner(ScriptRunner):
def setup(self):
super(NewStyleRunner, self).setup()
self._stdio = NewStyleStdio(self.args)
self._stdio = NewStyleStdio(self.args, self.temp_dir)
# It is possible that not supplying the script filename will break some
# module, but this has never been a bug report. Instead act like an
# interpreter that had its script piped on stdin.
@ -758,7 +739,7 @@ class NewStyleRunner(ScriptRunner):
# don't want to pointlessly write the module to disk when it never
# actually needs to exist. So just pass the filename as it would exist.
mod.__file__ = os.path.join(
ansible_mitogen.target.temp_dir,
self.temp_dir,
'ansible_module_' + os.path.basename(self.path),
)

@ -269,8 +269,7 @@ class ContextService(mitogen.service.Service):
)
def _send_module_forwards(self, context):
for fullname in self.ALWAYS_PRELOAD:
self.router.responder.forward_module(context, fullname)
self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD)
_candidate_temp_dirs = None
@ -380,6 +379,12 @@ class ContextService(mitogen.service.Service):
return latch
disconnect_msg = (
'Channel was disconnected while connection attempt was in progress; '
'this may be caused by an abnormal Ansible exit, or due to an '
'unreliable target.'
)
@mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'stack': list
@ -407,6 +412,13 @@ class ContextService(mitogen.service.Service):
if isinstance(result, tuple): # exc_info()
reraise(*result)
via = result['context']
except mitogen.core.ChannelError:
return {
'context': None,
'init_child_result': None,
'method_name': spec['method'],
'msg': self.disconnect_msg,
}
except mitogen.core.StreamError as e:
return {
'context': None,

@ -464,9 +464,10 @@ filesystem with ``noexec`` disabled:
8. ``/usr/tmp``
9. Current working directory
As the directory is created once at startup, and its content is managed by code
running remotely, no additional network roundtrips are required to manage it
for each task requiring temporary storage.
The directory is created once at startup, and subdirectories are automatically
created and destroyed for every new task. Management of subdirectories happens
on the controller, but management of the parent directory happens entirely on
the target.
.. _ansible_process_env:
@ -832,25 +833,43 @@ except connection delegation is supported.
Debugging
---------
Diagnostics and use of the :py:mod:`logging` package output on the target
machine are usually discarded. With Mitogen, all of this is captured and
returned to the controller, where it can be viewed as desired with ``-vvv``.
Basic high level logs are produced with ``-vvv``, with logging of all IO on the
controller with ``-vvvv`` or higher.
Although use of standard IO and the logging package on the target is forwarded
to the controller, it is not possible to receive IO activity logs, as the
process of receiving those logs would would itself generate IO activity. To
receive a complete trace of every process on every machine, file-based logging
is necessary. File-based logging can be enabled by setting
``MITOGEN_ROUTER_DEBUG=1`` in your environment.
When file-based logging is enabled, one file per context will be created on the
local machine and every target machine, as ``/tmp/mitogen.<pid>.log``.
If you are experiencing a hang, ``MITOGEN_DUMP_THREAD_STACKS=1`` causes every
process on every machine to dump every thread stack into the logging framework
every 5 seconds.
Diagnostics and :py:mod:`logging` package output on targets are usually
discarded. With Mitogen, these are captured and forwarded to the controller
where they can be viewed with ``-vvv``. Basic high level logs are produced with
``-vvv``, with logging of all IO on the controller with ``-vvvv`` or higher.
While uncaptured standard IO and the logging package on targets is forwarded,
it is not possible to receive IO activity logs, as the forwarding process would
would itself generate additional IO.
To receive a complete trace of every process on every machine, file-based
logging is necessary. File-based logging can be enabled by setting
``MITOGEN_ROUTER_DEBUG=1`` in your environment. When file-based logging is
enabled, one file per context will be created on the local machine and every
target machine, as ``/tmp/mitogen.<pid>.log``.
Diagnosing Hangs
~~~~~~~~~~~~~~~~
If you encounter a hang, the ``MITOGEN_DUMP_THREAD_STACKS=<secs>`` environment
variable arranges for each process on each machine to dump each thread stack
into the logging framework every `secs` seconds, which is visible when running
with ``-vvv``.
However, certain controller hangs may render ``MITOGEN_DUMP_THREAD_STACKS``
ineffective, or occur too infrequently for interactive reproduction. In these
cases `faulthandler <https://faulthandler.readthedocs.io/>`_ may be used:
1. For Python 2, ``pip install faulthandler``. This is unnecessary on Python 3.
2. Once the hang occurs, observe the process tree using ``pstree`` or ``ps
--forest``.
3. The most likely process to be hung is the connection multiplexer, which can
easily be identified as the parent of all SSH client processes.
4. Send ``kill -SEGV <pid>`` to the multiplexer PID, causing it to print all
thread stacks.
5. `File a bug <https://github.com/dw/mitogen/issues/new/>`_ including a copy
of the stacks, along with a description of the last task executing prior to
the hang.
Getting Help

@ -35,9 +35,9 @@ mitogen.core
Decorator that marks a function or class method to automatically receive a
kwarg named `econtext`, referencing the
:py:class:`mitogen.core.ExternalContext` active in the context in which the
:class:`mitogen.core.ExternalContext` active in the context in which the
function is being invoked in. The decorator is only meaningful when the
function is invoked via :py:data:`CALL_FUNCTION
function is invoked via :data:`CALL_FUNCTION
<mitogen.core.CALL_FUNCTION>`.
When the function is invoked directly, `econtext` must still be passed to
@ -47,10 +47,10 @@ mitogen.core
.. decorator:: takes_router
Decorator that marks a function or class method to automatically receive a
kwarg named `router`, referencing the :py:class:`mitogen.core.Router`
kwarg named `router`, referencing the :class:`mitogen.core.Router`
active in the context in which the function is being invoked in. The
decorator is only meaningful when the function is invoked via
:py:data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>`.
:data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>`.
When the function is invoked directly, `router` must still be passed to it
explicitly.
@ -94,18 +94,18 @@ Message Class
.. attribute:: router
The :py:class:`mitogen.core.Router` responsible for routing the
message. This is :py:data:`None` for locally originated messages.
The :class:`mitogen.core.Router` responsible for routing the
message. This is :data:`None` for locally originated messages.
.. attribute:: receiver
The :py:class:`mitogen.core.Receiver` over which the message was last
received. Part of the :py:class:`mitogen.select.Select` interface.
Defaults to :py:data:`None`.
The :class:`mitogen.core.Receiver` over which the message was last
received. Part of the :class:`mitogen.select.Select` interface.
Defaults to :data:`None`.
.. attribute:: dst_id
Integer target context ID. :py:class:`mitogen.core.Router` delivers
Integer target context ID. :class:`mitogen.core.Router` delivers
messages locally when their :attr:`dst_id` matches
:data:`mitogen.context_id`, otherwise they are routed up or downstream.
@ -117,12 +117,12 @@ Message Class
.. attribute:: auth_id
The context ID under whose authority the message is acting. See
:py:ref:`source-verification`.
:ref:`source-verification`.
.. attribute:: handle
Integer target handle in the destination context. This is one of the
:py:ref:`standard-handles`, or a dynamically generated handle used to
:ref:`standard-handles`, or a dynamically generated handle used to
receive a one-time reply, such as the return value of a function call.
.. attribute:: reply_to
@ -143,12 +143,12 @@ Message Class
.. py:method:: __init__ (\**kwargs)
Construct a message from from the supplied `kwargs`. :py:attr:`src_id`
and :py:attr:`auth_id` are always set to :py:data:`mitogen.context_id`.
Construct a message from from the supplied `kwargs`. :attr:`src_id`
and :attr:`auth_id` are always set to :data:`mitogen.context_id`.
.. py:classmethod:: pickled (obj, \**kwargs)
Construct a pickled message, setting :py:attr:`data` to the
Construct a pickled message, setting :attr:`data` to the
serialization of `obj`, and setting remaining fields using `kwargs`.
:returns:
@ -156,10 +156,10 @@ Message Class
.. method:: unpickle (throw=True)
Unpickle :py:attr:`data`, optionally raising any exceptions present.
Unpickle :attr:`data`, optionally raising any exceptions present.
:param bool throw:
If :py:data:`True`, raise exceptions, otherwise it is the caller's
If :data:`True`, raise exceptions, otherwise it is the caller's
responsibility.
:raises mitogen.core.CallError:
@ -169,8 +169,8 @@ Message Class
.. method:: reply (obj, router=None, \**kwargs)
Compose a reply to this message and send it using :py:attr:`router`, or
`router` is :py:attr:`router` is :data:`None`.
Compose a reply to this message and send it using :attr:`router`, or
`router` is :attr:`router` is :data:`None`.
:param obj:
Either a :class:`Message`, or an object to be serialized in order
@ -190,8 +190,8 @@ Router Class
.. class:: Router
Route messages between parent and child contexts, and invoke handlers
defined on our parent context. :py:meth:`Router.route() <route>` straddles
the :py:class:`Broker <mitogen.core.Broker>` and user threads, it is safe
defined on our parent context. :meth:`Router.route() <route>` straddles
the :class:`Broker <mitogen.core.Broker>` and user threads, it is safe
to call anywhere.
**Note:** This is the somewhat limited core version of the Router class
@ -217,7 +217,7 @@ Router Class
.. method:: stream_by_id (dst_id)
Return the :py:class:`mitogen.core.Stream` that should be used to
Return the :class:`mitogen.core.Stream` that should be used to
communicate with `dst_id`. If a specific route for `dst_id` is not
known, a reference to the parent context's stream is returned.
@ -260,24 +260,24 @@ Router Class
:param function policy:
Function invoked as `policy(msg, stream)` where `msg` is a
:py:class:`mitogen.core.Message` about to be delivered, and
`stream` is the :py:class:`mitogen.core.Stream` on which it was
received. The function must return :py:data:`True`, otherwise an
:class:`mitogen.core.Message` about to be delivered, and
`stream` is the :class:`mitogen.core.Stream` on which it was
received. The function must return :data:`True`, otherwise an
error is logged and delivery is refused.
Two built-in policy functions exist:
* :py:func:`mitogen.core.has_parent_authority`: requires the
* :func:`mitogen.core.has_parent_authority`: requires the
message arrived from a parent context, or a context acting with a
parent context's authority (``auth_id``).
* :py:func:`mitogen.parent.is_immediate_child`: requires the
* :func:`mitogen.parent.is_immediate_child`: requires the
message arrived from an immediately connected child, for use in
messaging patterns where either something becomes buggy or
insecure by permitting indirect upstream communication.
In case of refusal, and the message's ``reply_to`` field is
nonzero, a :py:class:`mitogen.core.CallError` is delivered to the
nonzero, a :class:`mitogen.core.CallError` is delivered to the
sender indicating refusal occurred.
:return:
@ -297,7 +297,7 @@ Router Class
destination is the local context, then arrange for it to be dispatched
using the local handlers.
This is a lower overhead version of :py:meth:`route` that may only be
This is a lower overhead version of :meth:`route` that may only be
called from the I/O multiplexer thread.
:param mitogen.core.Stream stream:
@ -308,11 +308,11 @@ Router Class
.. method:: route(msg)
Arrange for the :py:class:`Message` `msg` to be delivered to its
Arrange for the :class:`Message` `msg` to be delivered to its
destination using any relevant downstream context, or if none is found,
by forwarding the message upstream towards the master context. If `msg`
is destined for the local context, it is dispatched using the handles
registered with :py:meth:`add_handler`.
registered with :meth:`add_handler`.
This may be called from any thread.
@ -321,7 +321,7 @@ Router Class
.. class:: Router (broker=None)
Extend :py:class:`mitogen.core.Router` with functionality useful to
Extend :class:`mitogen.core.Router` with functionality useful to
masters, and child contexts who later become masters. Currently when this
class is required, the target context's router is upgraded at runtime.
@ -334,16 +334,16 @@ Router Class
customers or projects.
:param mitogen.master.Broker broker:
:py:class:`Broker` instance to use. If not specified, a private
:py:class:`Broker` is created.
:class:`Broker` instance to use. If not specified, a private
:class:`Broker` is created.
.. attribute:: profiling
When :data:`True`, cause the broker thread and any subsequent broker
and main threads existing in any child to write
``/tmp/mitogen.stats.<pid>.<thread_name>.log`` containing a
:py:mod:`cProfile` dump on graceful exit. Must be set prior to
construction of any :py:class:`Broker`, e.g. via:
:mod:`cProfile` dump on graceful exit. Must be set prior to
construction of any :class:`Broker`, e.g. via:
.. code::
@ -378,7 +378,7 @@ Router Class
and router, and responds to function calls identically to children
created using other methods.
For long-lived processes, :py:meth:`local` is always better as it
For long-lived processes, :meth:`local` is always better as it
guarantees a pristine interpreter state that inherited little from the
parent. Forking should only be used in performance-sensitive scenarios
where short-lived children must be spawned to isolate potentially buggy
@ -420,10 +420,10 @@ Router Class
immediate copy-on-write to large portions of the process heap.
* Locks held in the parent causing random deadlocks in the child, such
as when another thread emits a log entry via the :py:mod:`logging`
package concurrent to another thread calling :py:meth:`fork`.
as when another thread emits a log entry via the :mod:`logging`
package concurrent to another thread calling :meth:`fork`.
* Objects existing in Thread-Local Storage of every non-:py:meth:`fork`
* Objects existing in Thread-Local Storage of every non-:meth:`fork`
thread becoming permanently inaccessible, and never having their
object destructors called, including TLS usage by native extension
code, triggering many new variants of all the issues above.
@ -434,16 +434,16 @@ Router Class
case, children continually reuse the same state due to repeatedly
forking from a static parent.
:py:meth:`fork` cleans up Mitogen-internal objects, in addition to
locks held by the :py:mod:`logging` package, reseeds
:py:func:`random.random`, and the OpenSSL PRNG via
:py:func:`ssl.RAND_add`, but only if the :py:mod:`ssl` module is
:meth:`fork` cleans up Mitogen-internal objects, in addition to
locks held by the :mod:`logging` package, reseeds
:func:`random.random`, and the OpenSSL PRNG via
:func:`ssl.RAND_add`, but only if the :mod:`ssl` module is
already loaded. You must arrange for your program's state, including
any third party packages in use, to be cleaned up by specifying an
`on_fork` function.
The associated stream implementation is
:py:class:`mitogen.fork.Stream`.
:class:`mitogen.fork.Stream`.
:param function on_fork:
Function invoked as `on_fork()` from within the child process. This
@ -459,19 +459,19 @@ Router Class
serialization.
:param Context via:
Same as the `via` parameter for :py:meth:`local`.
Same as the `via` parameter for :meth:`local`.
:param bool debug:
Same as the `debug` parameter for :py:meth:`local`.
Same as the `debug` parameter for :meth:`local`.
:param bool profiling:
Same as the `profiling` parameter for :py:meth:`local`.
Same as the `profiling` parameter for :meth:`local`.
.. method:: local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None)
Construct a context on the local machine as a subprocess of the current
process. The associated stream implementation is
:py:class:`mitogen.master.Stream`.
:class:`mitogen.master.Stream`.
:param str remote_name:
The ``argv[0]`` suffix for the new process. If `remote_name` is
@ -493,9 +493,9 @@ Router Class
another tool, such as ``["/usr/bin/env", "python"]``.
:param bool debug:
If :data:`True`, arrange for debug logging (:py:meth:`enable_debug`) to
If :data:`True`, arrange for debug logging (:meth:`enable_debug`) to
be enabled in the new context. Automatically :data:`True` when
:py:meth:`enable_debug` has been called, but may be used
:meth:`enable_debug` has been called, but may be used
selectively otherwise.
:param bool unidirectional:
@ -510,14 +510,14 @@ Router Class
healthy. Defaults to 30 seconds.
:param bool profiling:
If :data:`True`, arrange for profiling (:py:data:`profiling`) to be
If :data:`True`, arrange for profiling (:data:`profiling`) to be
enabled in the new context. Automatically :data:`True` when
:py:data:`profiling` is :data:`True`, but may be used selectively
:data:`profiling` is :data:`True`, but may be used selectively
otherwise.
:param mitogen.core.Context via:
If not :data:`None`, arrange for construction to occur via RPCs
made to the context `via`, and for :py:data:`ADD_ROUTE
made to the context `via`, and for :data:`ADD_ROUTE
<mitogen.core.ADD_ROUTE>` messages to be generated as appropriate.
.. code-block:: python
@ -528,28 +528,28 @@ Router Class
# Use the SSH connection to create a sudo connection.
remote_root = router.sudo(username='root', via=remote_machine)
.. method:: dos (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs)
.. method:: doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs)
Construct a context on the local machine over a ``su`` invocation. The
``su`` process is started in a newly allocated pseudo-terminal, and
supports typing interactive passwords.
Construct a context on the local machine over a ``doas`` invocation.
The ``doas`` process is started in a newly allocated pseudo-terminal,
and supports typing interactive passwords.
Accepts all parameters accepted by :py:meth:`local`, in addition to:
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str username:
Username to use, defaults to ``root``.
:param str password:
The account password to use if requested.
:param str su_path:
Filename or complete path to the ``su`` binary. ``PATH`` will be
searched if given as a filename. Defaults to ``su``.
:param str doas_path:
Filename or complete path to the ``doas`` binary. ``PATH`` will be
searched if given as a filename. Defaults to ``doas``.
:param bytes password_prompt:
A string that indicates ``doas`` is requesting a password. Defaults
to ``Password:``.
:param list incorrect_prompts:
List of bytestrings indicating the password is incorrect. Defaults
to `(b"doas: authentication failed")`.
:raises mitogen.su.PasswordError:
:raises mitogen.doas.PasswordError:
A password was requested but none was provided, the supplied
password was incorrect, or the target account did not exist.
@ -559,7 +559,7 @@ Router Class
temporary new Docker container using the ``docker`` program. One of
`container` or `image` must be specified.
Accepts all parameters accepted by :py:meth:`local`, in addition to:
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str container:
Existing container to connect to. Defaults to :data:`None`.
@ -578,7 +578,7 @@ Router Class
Construct a context on the local machine within a FreeBSD jail using
the ``jexec`` program.
Accepts all parameters accepted by :py:meth:`local`, in addition to:
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str container:
Existing container to connect to. Defaults to :data:`None`.
@ -594,7 +594,7 @@ Router Class
Construct a context on the local machine within an LXC classic
container using the ``lxc-attach`` program.
Accepts all parameters accepted by :py:meth:`local`, in addition to:
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str container:
Existing container to connect to. Defaults to :data:`None`.
@ -608,7 +608,7 @@ Router Class
Construct a context on the local machine within a LXD container using
the ``lxc`` program.
Accepts all parameters accepted by :py:meth:`local`, in addition to:
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str container:
Existing container to connect to. Defaults to :data:`None`.
@ -616,7 +616,7 @@ Router Class
Filename or complete path to the ``lxc`` binary. ``PATH`` will be
searched if given as a filename. Defaults to ``lxc``.
.. method:: setns (container, kind, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs)
.. method:: setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs)
Construct a context in the style of :meth:`local`, but change the
active Linux process namespaces via calls to `setns(1)` before
@ -633,6 +633,9 @@ Router Class
Container to connect to.
:param str kind:
One of ``docker``, ``lxc``, ``lxd`` or ``machinectl``.
:param str username:
Username within the container to :func:`setuid` to. Defaults to
``root``.
:param str docker_path:
Filename or complete path to the Docker binary. ``PATH`` will be
searched if given as a filename. Defaults to ``docker``.
@ -653,7 +656,7 @@ Router Class
``su`` process is started in a newly allocated pseudo-terminal, and
supports typing interactive passwords.
Accepts all parameters accepted by :py:meth:`local`, in addition to:
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str username:
Username to pass to ``su``, defaults to ``root``.
@ -680,7 +683,7 @@ Router Class
The ``sudo`` process is started in a newly allocated pseudo-terminal,
and supports typing interactive passwords.
Accepts all parameters accepted by :py:meth:`local`, in addition to:
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str username:
Username to pass to sudo as the ``-u`` parameter, defaults to
@ -691,28 +694,33 @@ Router Class
:param str password:
The password to use if/when sudo requests it. Depending on the sudo
configuration, this is either the current account password or the
target account password. :py:class:`mitogen.sudo.PasswordError`
target account password. :class:`mitogen.sudo.PasswordError`
will be raised if sudo requests a password but none is provided.
:param bool set_home:
If :py:data:`True`, request ``sudo`` set the ``HOME`` environment
If :data:`True`, request ``sudo`` set the ``HOME`` environment
variable to match the target UNIX account.
:param bool preserve_env:
If :py:data:`True`, request ``sudo`` to preserve the environment of
If :data:`True`, request ``sudo`` to preserve the environment of
the parent process.
:param list sudo_args:
Arguments in the style of :py:data:`sys.argv` that would normally
Arguments in the style of :data:`sys.argv` that would normally
be passed to ``sudo``. The arguments are parsed in-process to set
equivalent parameters. Re-parsing ensures unsupported options cause
:py:class:`mitogen.core.StreamError` to be raised, and that
:class:`mitogen.core.StreamError` to be raised, and that
attributes of the stream match the actual behaviour of ``sudo``.
.. method:: ssh (hostname, username=None, ssh_path=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs)
Construct a remote context over a ``ssh`` invocation. The ``ssh``
process is started in a newly allocated pseudo-terminal, and supports
typing interactive passwords.
Construct a remote context over an OpenSSH ``ssh`` invocation.
The ``ssh`` process is started in a newly allocated pseudo-terminal to
support typing interactive passwords and responding to prompts, if a
password is specified, or `check_host_keys=accept`. In other scenarios,
``BatchMode`` is enabled and no PTY is allocated. For many-target
configurations, both options should be avoided as most systems have a
conservative limit on the number of pseudo-terminals that may exist.
Accepts all parameters accepted by :py:meth:`local`, in addition to:
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str username:
The SSH username; default is unspecified, which causes SSH to pick
@ -734,7 +742,7 @@ Router Class
unknown hosts cause a connection failure.
:param str password:
Password to type if/when ``ssh`` requests it. If not specified and
a password is requested, :py:class:`mitogen.ssh.PasswordError` is
a password is requested, :class:`mitogen.ssh.PasswordError` is
raised.
:param str identity_file:
Path to an SSH private key file to use for authentication. Default
@ -752,12 +760,12 @@ Router Class
present in ``~/.ssh``. This ensures authentication attempts only
occur using the supplied password or SSH key.
:param bool compression:
If :py:data:`True`, enable ``ssh`` compression support. Compression
If :data:`True`, enable ``ssh`` compression support. Compression
has a minimal effect on the size of modules transmitted, as they
are already compressed, however it has a large effect on every
remaining message in the otherwise uncompressed stream protocol,
such as function call arguments and return values.
:parama int ssh_debug_level:
:param int ssh_debug_level:
Optional integer `0..3` indicating the SSH client debug level.
:raises mitogen.ssh.PasswordError:
A password was requested but none was specified, or the specified
@ -805,12 +813,12 @@ Context Class
The message.
:returns:
:py:class:`mitogen.core.Receiver` configured to receive any replies
:class:`mitogen.core.Receiver` configured to receive any replies
sent to the message's `reply_to` handle.
.. method:: send_await (msg, deadline=None)
As with :py:meth:`send_async`, but expect a single reply
As with :meth:`send_async`, but expect a single reply
(`persist=False`) delivered within `deadline` seconds.
:param mitogen.core.Message msg:
@ -825,12 +833,21 @@ Context Class
.. currentmodule:: mitogen.parent
.. autoclass:: CallChain
:members:
.. class:: Context
Extend :py:class:`mitogen.core.Router` with functionality useful to
Extend :class:`mitogen.core.Router` with functionality useful to
masters, and child contexts who later become parents. Currently when this
class is required, the target context's router is upgraded at runtime.
.. attribute:: default_call_chain
A :class:`CallChain` instance constructed by default, with pipelining
disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply`
use this instance.
.. method:: shutdown (wait=False)
Arrange for the context to receive a ``SHUTDOWN`` message, triggering
@ -840,7 +857,7 @@ Context Class
terminate a hung context using this method. This will be fixed shortly.
:param bool wait:
If :py:data:`True`, block the calling thread until the context has
If :data:`True`, block the calling thread until the context has
completely terminated.
:returns:
If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch`
@ -850,77 +867,15 @@ Context Class
.. method:: call_async (fn, \*args, \*\*kwargs)
Arrange for the context's ``CALL_FUNCTION`` handle to receive a
message that causes `fn(\*args, \**kwargs)` to be invoked on the
context's main thread.
:param fn:
A free function in module scope or a class method of a class
directly reachable from module scope:
.. code-block:: python
# mymodule.py
def my_func():
"""A free function reachable as mymodule.my_func"""
class MyClass:
@classmethod
def my_classmethod(cls):
"""Reachable as mymodule.MyClass.my_classmethod"""
def my_instancemethod(self):
"""Unreachable: requires a class instance!"""
class MyEmbeddedClass:
@classmethod
def my_classmethod(cls):
"""Not directly reachable from module scope!"""
:param tuple args:
Function arguments, if any. See :ref:`serialization-rules` for
permitted types.
:param dict kwargs:
Function keyword arguments, if any. See :ref:`serialization-rules`
for permitted types.
:returns:
:py:class:`mitogen.core.Receiver` configured to receive the result
of the invocation:
.. code-block:: python
recv = context.call_async(os.check_output, 'ls /tmp/')
try:
# Prints output once it is received.
msg = recv.get()
print(msg.unpickle())
except mitogen.core.CallError, e:
print('Call failed:', str(e))
Asynchronous calls may be dispatched in parallel to multiple
contexts and consumed as they complete using
:py:class:`mitogen.select.Select`.
See :meth:`CallChain.call_async`.
.. method:: call (fn, \*args, \*\*kwargs)
Equivalent to :py:meth:`call_async(fn, \*args, \**kwargs).get().unpickle()
<call_async>`.
:returns:
The function's return value.
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
See :meth:`CallChain.call`.
.. method:: call_no_reply (fn, \*args, \*\*kwargs)
Send a function call, but expect no return value. If the call fails,
the full exception will be logged to the target context's logging framework.
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
See :meth:`CallChain.call_no_reply`.
Receiver Class
@ -932,7 +887,7 @@ Receiver Class
Receivers are used to wait for pickled responses from another context to be
sent to a handle registered in this context. A receiver may be single-use
(as in the case of :py:meth:`mitogen.parent.Context.call_async`) or
(as in the case of :meth:`mitogen.parent.Context.call_async`) or
multiple use.
:param mitogen.core.Router router:
@ -956,12 +911,12 @@ Receiver Class
If not :data:`None`, a reference to a function invoked as
`notify(receiver)` when a new message is delivered to this receiver.
Used by :py:class:`mitogen.select.Select` to implement waiting on
Used by :class:`mitogen.select.Select` to implement waiting on
multiple receivers.
.. py:method:: to_sender ()
Return a :py:class:`mitogen.core.Sender` configured to deliver messages
Return a :class:`mitogen.core.Sender` configured to deliver messages
to this receiver. Since a Sender can be serialized, this makes it
convenient to pass `(context_id, handle)` pairs around::
@ -978,15 +933,15 @@ Receiver Class
.. py:method:: empty ()
Return :data:`True` if calling :py:meth:`get` would block.
Return :data:`True` if calling :meth:`get` would block.
As with :py:class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and
:py:meth:`get`.
As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :meth:`empty` and
:meth:`get`.
:py:meth:`empty` is only useful to avoid a race while installing
:py:attr:`notify`:
:meth:`empty` is only useful to avoid a race while installing
:attr:`notify`:
.. code-block:: python
@ -1000,8 +955,8 @@ Receiver Class
.. py:method:: close ()
Cause :py:class:`mitogen.core.ChannelError` to be raised in any thread
waiting in :py:meth:`get` on this receiver.
Cause :class:`mitogen.core.ChannelError` to be raised in any thread
waiting in :meth:`get` on this receiver.
.. py:method:: get (timeout=None)
@ -1019,17 +974,17 @@ Receiver Class
:returns:
`(msg, data)` tuple, where `msg` is the
:py:class:`mitogen.core.Message` that was received, and `data` is
:class:`mitogen.core.Message` that was received, and `data` is
its unpickled data part.
.. py:method:: get_data (timeout=None)
Like :py:meth:`get`, except only return the data part.
Like :meth:`get`, except only return the data part.
.. py:method:: __iter__ ()
Block and yield `(msg, data)` pairs delivered to this receiver until
:py:class:`mitogen.core.ChannelError` is raised.
:class:`mitogen.core.ChannelError` is raised.
Sender Class
@ -1040,10 +995,10 @@ Sender Class
.. class:: Sender (context, dst_handle)
Senders are used to send pickled messages to a handle in another context,
it is the inverse of :py:class:`mitogen.core.Sender`.
it is the inverse of :class:`mitogen.core.Sender`.
Senders may be serialized, making them convenient to wire up data flows.
See :py:meth:`mitogen.core.Receiver.to_sender` for more information.
See :meth:`mitogen.core.Receiver.to_sender` for more information.
:param mitogen.core.Context context:
Context to send messages to.
@ -1052,7 +1007,7 @@ Sender Class
.. py:method:: close ()
Send a dead message to the remote end, causing :py:meth:`ChannelError`
Send a dead message to the remote end, causing :meth:`ChannelError`
to be raised in any waiting thread.
.. py:method:: send (data)
@ -1071,11 +1026,11 @@ Select Class
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:py:class:`mitogen.core.Receiver` or :py:class:`mitogen.select.Select`
:class:`mitogen.core.Receiver` or :class:`mitogen.select.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :py:meth:`__iter__` terminates once the final receiver is
result; since :meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
@ -1090,7 +1045,7 @@ Select Class
# Iteration ends when last Receiver yields a result.
print('Received total %s from %s receivers' % (total, len(recvs)))
:py:class:`Select` may drive a long-running scheduler:
:class:`Select` may drive a long-running scheduler:
.. code-block:: python
@ -1101,7 +1056,7 @@ Select Class
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:py:class:`Select` may be nested:
:class:`Select` may be nested:
.. code-block:: python
@ -1119,11 +1074,11 @@ Select Class
.. py:classmethod:: all (it)
Take an iterable of receivers and retrieve a :py:class:`Message` from
Take an iterable of receivers and retrieve a :class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch :py:class:`Context.call_async`
This is sugar for handling batch :class:`Context.call_async`
invocations:
.. code-block:: python
@ -1139,32 +1094,32 @@ Select Class
.. code-block:: python
sum(context.call_async(get_disk_usage).get().unpickle()
for context in contexts)
recvs = [c.call_async(get_disk_usage) for c in contexts]
sum(recv.get().unpickle() for recv in recvs)
Result processing happens concurrently to new results arriving, so
:py:meth:`all` should always be faster.
Result processing happens in the order results arrive, rather than the
order requests were issued, so :meth:`all` should always be faster.
.. py:method:: get (timeout=None, block=True)
Fetch the next available value from any receiver, or raise
:py:class:`mitogen.core.TimeoutError` if no value is available within
:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :py:attr:`receiver
On success, the message's :attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:param bool block:
If :py:data:`False`, immediately raise
:py:class:`mitogen.core.TimeoutError` if the select is empty.
If :data:`False`, immediately raise
:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:py:class:`mitogen.core.Message`
:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the underlying latch is no
:meth:`close` has been called, and the underlying latch is no
longer valid.
.. py:method:: __bool__ ()
@ -1175,8 +1130,8 @@ Select Class
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :py:meth:`get` to be woken with
:py:class:`mitogen.core.LatchError`.
sleeping in :meth:`get` to be woken with
:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
@ -1184,35 +1139,35 @@ Select Class
.. py:method:: empty ()
Return :data:`True` if calling :py:meth:`get` would block.
Return :data:`True` if calling :meth:`get` would block.
As with :py:class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and
:py:meth:`get`.
As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :meth:`empty` and
:meth:`get`.
:py:meth:`empty` may return :data:`False` even when :py:meth:`get`
:meth:`empty` may return :data:`False` even when :meth:`get`
would block if another thread has drained a receiver added to this
select. This can be avoided by only consuming each receiver from a
single thread.
.. py:method:: __iter__ (self)
Yield the result of :py:meth:`get` until no receivers remain in the
Yield the result of :meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :py:meth:`remove`.
explicitly removed via :meth:`remove`.
.. py:method:: add (recv)
Add the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` to the select.
Add the :class:`mitogen.core.Receiver` or
:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :py:meth:`remove`, then it will
still be returned by a subsequent :py:meth:`get`. This may change in a
Remove the :class:`mitogen.core.Receiver` or
:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :meth:`remove`, then it will
still be returned by a subsequent :meth:`get`. This may change in a
future version.
@ -1223,7 +1178,7 @@ Channel Class
.. class:: Channel (router, context, dst_handle, handle=None)
A channel inherits from :py:class:`mitogen.core.Sender` and
A channel inherits from :class:`mitogen.core.Sender` and
`mitogen.core.Receiver` to provide bidirectional functionality.
Since all handles aren't known until after both ends are constructed, for
@ -1245,8 +1200,8 @@ Broker Class
.. attribute:: shutdown_timeout = 3.0
Seconds grace to allow :py:class:`streams <Stream>` to shutdown
gracefully before force-disconnecting them during :py:meth:`shutdown`.
Seconds grace to allow :class:`streams <Stream>` to shutdown
gracefully before force-disconnecting them during :meth:`shutdown`.
.. method:: defer (func, \*args, \*kwargs)
@ -1256,26 +1211,26 @@ Broker Class
.. method:: start_receive (stream)
Mark the :py:attr:`receive_side <Stream.receive_side>` on `stream` as
Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as
ready for reading. Safe to call from any thread. When the associated
file descriptor becomes ready for reading,
:py:meth:`BasicStream.on_receive` will be called.
:meth:`BasicStream.on_receive` will be called.
.. method:: stop_receive (stream)
Mark the :py:attr:`receive_side <Stream.receive_side>` on `stream` as
Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as
not ready for reading. Safe to call from any thread.
.. method:: _start_transmit (stream)
Mark the :py:attr:`transmit_side <Stream.transmit_side>` on `stream` as
Mark the :attr:`transmit_side <Stream.transmit_side>` on `stream` as
ready for writing. Must only be called from the Broker thread. When the
associated file descriptor becomes ready for writing,
:py:meth:`BasicStream.on_transmit` will be called.
:meth:`BasicStream.on_transmit` will be called.
.. method:: stop_receive (stream)
Mark the :py:attr:`transmit_side <Stream.receive_side>` on `stream` as
Mark the :attr:`transmit_side <Stream.receive_side>` on `stream` as
not ready for writing. Safe to call from any thread.
.. method:: shutdown
@ -1285,12 +1240,12 @@ Broker Class
.. method:: join
Wait for the broker to stop, expected to be called after
:py:meth:`shutdown`.
:meth:`shutdown`.
.. method:: keep_alive
Return :data:`True` if any reader's :py:attr:`Side.keep_alive`
attribute is :data:`True`, or any :py:class:`Context` is still
Return :data:`True` if any reader's :attr:`Side.keep_alive`
attribute is :data:`True`, or any :class:`Context` is still
registered that is not the master. Used to delay shutdown while some
important work is in progress (e.g. log draining).
@ -1298,11 +1253,11 @@ Broker Class
.. method:: _broker_main
Handle events until :py:meth:`shutdown`. On shutdown, invoke
:py:meth:`Stream.on_shutdown` for every active stream, then allow up to
:py:attr:`shutdown_timeout` seconds for the streams to unregister
Handle events until :meth:`shutdown`. On shutdown, invoke
:meth:`Stream.on_shutdown` for every active stream, then allow up to
:attr:`shutdown_timeout` seconds for the streams to unregister
themselves before forcefully calling
:py:meth:`Stream.on_disconnect`.
:meth:`Stream.on_disconnect`.
.. currentmodule:: mitogen.master
@ -1318,7 +1273,7 @@ Broker Class
:param bool install_watcher:
If :data:`True`, an additional thread is started to monitor the
lifetime of the main thread, triggering :py:meth:`shutdown`
lifetime of the main thread, triggering :meth:`shutdown`
automatically in case the user forgets to call it, or their code
crashed.
@ -1329,8 +1284,8 @@ Broker Class
.. attribute:: shutdown_timeout = 5.0
Seconds grace to allow :py:class:`streams <Stream>` to shutdown
gracefully before force-disconnecting them during :py:meth:`shutdown`.
Seconds grace to allow :class:`streams <Stream>` to shutdown
gracefully before force-disconnecting them during :meth:`shutdown`.
Utility Functions
@ -1346,7 +1301,7 @@ A random assortment of utility functions useful on masters and children.
Many tools love to subclass built-in types in order to implement useful
functionality, such as annotating the safety of a Unicode string, or adding
additional methods to a dict. However, cPickle loves to preserve those
subtypes during serialization, resulting in CallError during :py:meth:`call
subtypes during serialization, resulting in CallError during :meth:`call
<mitogen.parent.Context.call>` in the target when it tries to deserialize
the data.
@ -1366,12 +1321,12 @@ A random assortment of utility functions useful on masters and children.
Remove all entries mentioning ``site-packages`` or ``Extras`` from the
system path. Used primarily for testing on OS X within a virtualenv, where
OS X bundles some ancient version of the :py:mod:`six` module.
OS X bundles some ancient version of the :mod:`six` module.
.. currentmodule:: mitogen.utils
.. function:: log_to_file (path=None, io=False, level='INFO')
Install a new :py:class:`logging.Handler` writing applications logs to the
Install a new :class:`logging.Handler` writing applications logs to the
filesystem. Useful when debugging slave IO problems.
Parameters to this function may be overridden at runtime using environment
@ -1379,14 +1334,14 @@ A random assortment of utility functions useful on masters and children.
:param str path:
If not :data:`None`, a filesystem path to write logs to. Otherwise,
logs are written to :py:data:`sys.stderr`.
logs are written to :data:`sys.stderr`.
:param bool io:
If :data:`True`, include extremely verbose IO logs in the output.
Useful for debugging hangs, less useful for debugging application code.
:param str level:
Name of the :py:mod:`logging` package constant that is the minimum
Name of the :mod:`logging` package constant that is the minimum
level to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``,
and ``ERROR``.
@ -1394,7 +1349,7 @@ A random assortment of utility functions useful on masters and children.
.. function:: run_with_router(func, \*args, \**kwargs)
Arrange for `func(router, \*args, \**kwargs)` to run with a temporary
:py:class:`mitogen.master.Router`, ensuring the Router and Broker are
:class:`mitogen.master.Router`, ensuring the Router and Broker are
correctly shut down during normal or exceptional return.
:returns:
@ -1403,7 +1358,7 @@ A random assortment of utility functions useful on masters and children.
.. currentmodule:: mitogen.utils
.. decorator:: with_router
Decorator version of :py:func:`run_with_router`. Example:
Decorator version of :func:`run_with_router`. Example:
.. code-block:: python

@ -24,8 +24,7 @@ Mitogen for Ansible
Enhancements
^^^^^^^^^^^^
* `#315 <https://github.com/dw/mitogen/pull/315>`_: Ansible 2.6 is now
supported.
* `#315 <https://github.com/dw/mitogen/pull/315>`_: Ansible 2.6 is supported.
* `#321 <https://github.com/dw/mitogen/issues/321>`_,
`#336 <https://github.com/dw/mitogen/issues/336>`_: temporary file handling
@ -59,6 +58,14 @@ Enhancements
synchronization, wasting significant runtime in the connection multiplexer.
In one case work was reduced by 95%, which may manifest as faster runs.
* `5189408e <https://github.com/dw/mitogen/commit/5189408e>`_: threads are
cooperatively scheduled, minimizing `GIL
<https://en.wikipedia.org/wiki/Global_interpreter_lock>`_ contention, and
reducing context switching by around 90%. This manifests as an overall
improvement, but is easily noticeable on short many-target runs, where
startup overhead dominates runtime.
Fixes
^^^^^
@ -78,10 +85,10 @@ Fixes
would fail due to fallout from the Python 3 port and related tests being
disabled.
* `#331 <https://github.com/dw/mitogen/issues/331>`_: fixed known issue: the
connection multiplexer subprocess always exits before the main Ansible
process, ensuring logs generated by it do not overwrite the user's prompt
when ``-vvv`` is enabled.
* `#331 <https://github.com/dw/mitogen/issues/331>`_: the connection
multiplexer subprocess always exits before the main Ansible process, ensuring
logs generated by it do not overwrite the user's prompt when ``-vvv`` is
enabled.
* `#332 <https://github.com/dw/mitogen/issues/332>`_: support a new
:func:`sys.excepthook`-based module exit mechanism added in Ansible 2.6.
@ -101,24 +108,44 @@ Fixes
yes`` option is no longer supplied to OpenSSH by default, better matching
Ansible's behaviour.
* `#355 <https://github.com/dw/mitogen/issues/355>`_: tasks configured to run
in an isolated forked subprocess were forked from the wrong parent context.
This meant built-in modules overridden via a custom ``module_utils`` search
path may not have had any effect.
* A missing check caused an exception traceback to appear when using the
``ansible`` command-line tool with a missing or misspelled module name.
* Ansible since >=2.7 began importing ``__main__`` from
``ansible.module_utils.basic``, causing an error during execution, due to the
controller being configured to refuse network imports outside the
* Ansible since >=2.7 began importing :mod:`__main__` from
:mod:`ansible.module_utils.basic`, causing an error during execution, due to
the controller being configured to refuse network imports outside the
``ansible.*`` namespace. Update the target implementation to construct a stub
``__main__`` module to satisfy the otherwise seemingly vestigial import.
:mod:`__main__` module to satisfy the otherwise seemingly vestigial import.
Core Library
~~~~~~~~~~~~
* A new :class:`mitogen.parent.CallChain` class abstracts safe pipelining of
related function calls to a target context, cancelling the chain if an
exception occurs.
* `#305 <https://github.com/dw/mitogen/issues/305>`_: fix a long-standing minor
race relating to the logging framework, where *no route for Message..*
would frequently appear during startup.
* `#313 <https://github.com/dw/mitogen/issues/313>`_:
:meth:`mitogen.parent.Context.call` was documented as capable of accepting
static methods. While possible on Python 2.x the result is very ugly, and in
every case it should be trivially possible to replace with a class method.
The API docs were updated to remove mention of static methods.
static methods. While possible on Python 2.x the result is ugly, and in every
case it should be trivial to replace with a classmethod. The documentation
was fixed.
* `#337 <https://github.com/dw/mitogen/issues/337>`_: to avoid a scaling
limitation, SSH no longer allocates a PTY for every OpenSSH client. PTYs are
only allocated if a password is supplied, or when `host_key_checking=accept`.
This is since Linux has a default of 4096 PTYs (``kernel.pty.max``), while OS
X has a default of 127 and an absolute maximum of 999
(``kern.tty.ptmx_max``).
* `#339 <https://github.com/dw/mitogen/issues/339>`_: the LXD connection method
was erroneously executing LXC Classic commands.
@ -126,8 +153,17 @@ Core Library
* `#345 <https://github.com/dw/mitogen/issues/345>`_: the SSH connection method
allows optionally disabling ``IdentitiesOnly yes``.
* Add a :func:`mitogen.fork.on_fork` function to allow non-Mitogen managed
process forks to clean up Mitogen resources in the forked chlid.
* `af2ded66 <https://github.com/dw/mitogen/commit/af2ded66>`_: add
:func:`mitogen.fork.on_fork` to allow non-Mitogen managed process forks to
clean up Mitogen resources in the child.
* `d6784242 <https://github.com/dw/mitogen/commit/d6784242>`_: the setns method
always resets ``HOME``, ``SHELL``, ``LOGNAME`` and ``USER`` environment
variables to an account in the target container, defaulting to ``root``.
* `830966bf <https://github.com/dw/mitogen/commit/830966bf>`_: the UNIX
listener no longer crashes if the peer process disappears in the middle of
connection setup.
Thanks!
@ -137,13 +173,18 @@ Mitogen would not be possible without the support of users. A huge thanks for
the bug reports in this release contributed by
`Alex Russu <https://github.com/alexrussu>`_,
`atoom <https://github.com/atoom>`_,
`Berend De Schouwer <https://github.com/berenddeschouwer>`_,
`Dan Quackenbush <https://github.com/danquack>`_,
`dsgnr <https://github.com/dsgnr>`_,
`Jesse London <https://github.com/jesteria>`_,
`Jonathan Rosser <https://github.com/jrosser>`_,
`Josh Smift <https://github.com/jbscare>`_,
`Luca Nunzi <https://github.com/0xlc>`_,
`nikitakazantsev12 <https://github.com/nikitakazantsev12>`_,
`Pateek Jain <https://github.com/prateekj201>`_,
`Peter V. Saveliev <https://github.com/svinota>`_,
`Pierre-Henry Muller <https://github.com/pierrehenrymuller>`_,
`Pierre-Louis Bonicoli <https://github.com/jesteria>`_,
`Prateek Jain <https://github.com/prateekj201>`_,
`Rick Box <https://github.com/boxrick>`_, and
`Timo Beckers <https://github.com/ti-mo>`_.
@ -324,10 +365,6 @@ Mitogen for Ansible
- initech_app
- y2k_fix
* When running with ``-vvv``, log messages such as *mitogen: Router(Broker(0x7f5a48921590)): no route
for Message(..., 102, ...), my ID is ...* may be visible. These are due to a
minor race while initializing logging and can be ignored.
.. * When running with ``-vvv``, log messages will be printed to the console
*after* the Ansible run completes, as connection multiplexer shutdown only
begins after Ansible exits. This is due to a lack of suitable shutdown hook

@ -373,11 +373,9 @@ Children listen on the following handles:
.. currentmodule:: mitogen.core
.. data:: CALL_FUNCTION
Receives `(mod_name, class_name, func_name, args, kwargs)`
5-tuples from
:py:meth:`call_async() <mitogen.parent.Context.call_async>`,
imports ``mod_name``, then attempts to execute
`class_name.func_name(\*args, \**kwargs)`.
Receives `(chain_id, mod_name, class_name, func_name, args, kwargs)`
6-tuples from :class:`mitogen.parent.CallChain`, imports ``mod_name``, then
attempts to execute `class_name.func_name(\*args, \**kwargs)`.
When this channel is closed (by way of receiving a dead message), the
child's main thread begins graceful shutdown of its own :py:class:`Broker`

@ -950,6 +950,16 @@ class LogHandler(logging.Handler):
logging.Handler.__init__(self)
self.context = context
self.local = threading.local()
self._buffer = []
def uncork(self):
self._send = self.context.send
for msg in self._buffer:
self._send(msg)
self._buffer = None
def _send(self, msg):
self._buffer.append(msg)
def emit(self, rec):
if rec.name == 'mitogen.io' or \
@ -963,7 +973,7 @@ class LogHandler(logging.Handler):
if isinstance(encoded, UnicodeType):
# Logging package emits both :(
encoded = encoded.encode('utf-8')
self.context.send(Message(data=encoded, handle=FORWARD_LOG))
self._send(Message(data=encoded, handle=FORWARD_LOG))
finally:
self.local.in_emit = False
@ -1939,15 +1949,76 @@ class Broker(object):
return 'Broker(%#x)' % (id(self),)
class Dispatcher(object):
def __init__(self, econtext):
self.econtext = econtext
#: Chain ID -> CallError if prior call failed.
self._error_by_chain_id = {}
self.recv = Receiver(router=econtext.router,
handle=CALL_FUNCTION,
policy=has_parent_authority)
listen(econtext.broker, 'shutdown', self.recv.close)
@classmethod
@takes_econtext
def forget_chain(cls, chain_id, econtext):
econtext.dispatcher._error_by_chain_id.pop(chain_id, None)
def _parse_request(self, msg):
data = msg.unpickle(throw=False)
_v and LOG.debug('_dispatch_one(%r)', data)
chain_id, modname, klass, func, args, kwargs = data
obj = import_module(modname)
if klass:
obj = getattr(obj, klass)
fn = getattr(obj, func)
if getattr(fn, 'mitogen_takes_econtext', None):
kwargs.setdefault('econtext', self.econtext)
if getattr(fn, 'mitogen_takes_router', None):
kwargs.setdefault('router', self.econtext.router)
return chain_id, fn, args, kwargs
def _dispatch_one(self, msg):
try:
chain_id, fn, args, kwargs = self._parse_request(msg)
except Exception:
return None, CallError(sys.exc_info()[1])
if chain_id in self._error_by_chain_id:
return chain_id, self._error_by_chain_id[chain_id]
try:
return chain_id, fn(*args, **kwargs)
except Exception:
e = CallError(sys.exc_info()[1])
if chain_id is not None:
self._error_by_chain_id[chain_id] = e
return chain_id, e
def _dispatch_calls(self):
for msg in self.recv:
chain_id, ret = self._dispatch_one(msg)
_v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret)
if msg.reply_to:
msg.reply(ret)
elif isinstance(ret, CallError) and chain_id is None:
LOG.error('No-reply function call failed: %s', ret)
def run(self):
if self.econtext.config.get('on_start'):
self.econtext.config['on_start'](self)
_profile_hook('main', self._dispatch_calls)
class ExternalContext(object):
detached = False
def __init__(self, config):
self.config = config
def _on_broker_shutdown(self):
self.recv.close()
def _on_broker_exit(self):
if not self.config['profiling']:
os.kill(os.getpid(), signal.SIGTERM)
@ -2031,16 +2102,12 @@ class ExternalContext(object):
in_fd = self.config.get('in_fd', 100)
out_fd = self.config.get('out_fd', 1)
self.recv = Receiver(router=self.router,
handle=CALL_FUNCTION,
policy=has_parent_authority)
self.stream = Stream(self.router, parent_id)
self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd)
self.stream.receive_side.keep_alive = False
listen(self.stream, 'disconnect', self._on_parent_disconnect)
listen(self.broker, 'shutdown', self._on_broker_shutdown)
listen(self.broker, 'exit', self._on_broker_exit)
os.close(in_fd)
@ -2052,9 +2119,10 @@ class ExternalContext(object):
pass # No first stage exists (e.g. fakessh)
def _setup_logging(self):
self.log_handler = LogHandler(self.master)
root = logging.getLogger()
root.setLevel(self.config['log_level'])
root.handlers = [LogHandler(self.master)]
root.handlers = [self.log_handler]
if self.config['debug']:
enable_debug_logging()
@ -2137,40 +2205,6 @@ class ExternalContext(object):
# Reopen with line buffering.
sys.stdout = os.fdopen(1, 'w', 1)
def _dispatch_one(self, msg):
data = msg.unpickle(throw=False)
_v and LOG.debug('_dispatch_calls(%r)', data)
modname, klass, func, args, kwargs = data
obj = import_module(modname)
if klass:
obj = getattr(obj, klass)
fn = getattr(obj, func)
if getattr(fn, 'mitogen_takes_econtext', None):
kwargs.setdefault('econtext', self)
if getattr(fn, 'mitogen_takes_router', None):
kwargs.setdefault('router', self.router)
return fn(*args, **kwargs)
def _dispatch_calls(self):
if self.config.get('on_start'):
self.config['on_start'](self)
for msg in self.recv:
try:
ret = self._dispatch_one(msg)
_v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret)
if msg.reply_to:
msg.reply(ret)
except Exception:
e = sys.exc_info()[1]
if msg.reply_to:
_v and LOG.debug('_dispatch_calls: %s', e)
msg.reply(CallError(e))
else:
LOG.exception('_dispatch_calls: %r', msg)
self.dispatch_stopped = True
def main(self):
self._setup_master()
try:
@ -2184,14 +2218,16 @@ class ExternalContext(object):
if self.config.get('setup_stdio', True):
self._setup_stdio()
self.dispatcher = Dispatcher(self)
self.router.register(self.parent, self.stream)
self.log_handler.uncork()
sys.executable = os.environ.pop('ARGV0', sys.executable)
_v and LOG.debug('Connected to %s; my ID is %r, PID is %r',
self.parent, mitogen.context_id, os.getpid())
_v and LOG.debug('Recovered sys.executable: %r', sys.executable)
_profile_hook('main', self._dispatch_calls)
self.dispatcher.run()
_v and LOG.debug('ExternalContext.main() normal exit')
except KeyboardInterrupt:
LOG.debug('KeyboardInterrupt received, exiting gracefully.')

@ -183,15 +183,16 @@ def install_handler():
signal.signal(signal.SIGUSR2, _handler)
def _logging_main():
def _logging_main(secs):
while True:
time.sleep(5)
time.sleep(secs)
LOG.info('PERIODIC THREAD DUMP\n\n%s', get_snapshot())
def dump_to_logger():
def dump_to_logger(secs=5):
th = threading.Thread(
target=_logging_main,
kwargs={'secs': secs},
name='mitogen.debug.dump_to_logger',
)
th.setDaemon(True)

@ -45,8 +45,8 @@ class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
#: Once connected, points to the corresponding DiagLogStream, allowing it
#: to be disconnected at the same time this stream is being torn down.
tty_stream = None
username = 'root'
@ -89,7 +89,7 @@ class Stream(mitogen.parent.Stream):
password_required_msg = 'doas password is required'
def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.TtyLogStream(extra_fd, self)
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self)
password_sent = False
it = mitogen.parent.iter_read(

@ -371,7 +371,7 @@ class ModuleFinder(object):
# requests.packages.urllib3.contrib.pyopenssl"
e = sys.exc_info()[1]
LOG.debug('%r: loading %r using %r failed: %s',
self, fullname, loader)
self, fullname, loader, e)
return
if path is None or source is None:
@ -681,8 +681,7 @@ class ModuleResponder(object):
)
)
def _forward_module(self, context, fullname):
IOLOG.debug('%r._forward_module(%r, %r)', self, context, fullname)
def _forward_one_module(self, context, fullname):
path = []
while fullname:
path.append(fullname)
@ -693,8 +692,13 @@ class ModuleResponder(object):
self._send_module_and_related(stream, fullname)
self._send_forward_module(stream, context, fullname)
def forward_module(self, context, fullname):
self._router.broker.defer(self._forward_module, context, fullname)
def _forward_modules(self, context, fullnames):
IOLOG.debug('%r._forward_modules(%r, %r)', self, context, fullnames)
for fullname in fullnames:
self._forward_one_module(context, fullname)
def forward_modules(self, context, fullnames):
self._router.broker.defer(self._forward_modules, context, fullnames)
class Broker(mitogen.core.Broker):

@ -78,6 +78,13 @@ try:
except:
SC_OPEN_MAX = 1024
OPENPTY_MSG = (
"Failed to create a PTY: %s. It is likely the maximum number of PTYs has "
"been reached. Consider increasing the 'kern.tty.ptmx_max' sysctl on OS "
"X, the 'kernel.pty.max' sysctl on Linux, or modifying your configuration "
"to avoid PTY use."
)
def get_log_level():
return (LOG.level or logging.getLogger().level or logging.INFO)
@ -198,7 +205,7 @@ def detach_popen(*args, **kwargs):
return proc.pid
def create_child(args, merge_stdio=False, preexec_fn=None):
def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None):
"""
Create a child process whose stdin/stdout is connected to a socket.
@ -209,8 +216,13 @@ def create_child(args, merge_stdio=False, preexec_fn=None):
socketpair, rather than inherited from the parent process. This may be
necessary to ensure that not TTY is connected to any stdio handle, for
instance when using LXC.
:param bool stderr_pipe:
If :data:`True` and `merge_stdio` is :data:`False`, arrange for
`stderr` to be connected to a separate pipe, to allow any ongoing debug
logs generated by e.g. SSH to be outpu as the session progresses,
without interfering with `stdout`.
:returns:
`(pid, socket_obj, :data:`None`)`
`(pid, socket_obj, :data:`None` or pipe_fd)`
"""
parentfp, childfp = create_socketpair()
# When running under a monkey patches-enabled gevent, the socket module
@ -219,10 +231,15 @@ def create_child(args, merge_stdio=False, preexec_fn=None):
# O_NONBLOCK from Python's future stdin fd.
mitogen.core.set_block(childfp.fileno())
stderr_r = None
extra = {}
if merge_stdio:
extra = {'stderr': childfp}
else:
extra = {}
elif stderr_pipe:
stderr_r, stderr_w = os.pipe()
mitogen.core.set_cloexec(stderr_r)
mitogen.core.set_cloexec(stderr_w)
extra = {'stderr': stderr_w}
pid = detach_popen(
args=args,
@ -232,6 +249,8 @@ def create_child(args, merge_stdio=False, preexec_fn=None):
preexec_fn=preexec_fn,
**extra
)
if stderr_pipe:
os.close(stderr_w)
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno())
@ -239,7 +258,7 @@ def create_child(args, merge_stdio=False, preexec_fn=None):
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
pid, fd, os.getpid(), Argv(args))
return pid, fd, None
return pid, fd, stderr_r
def _acquire_controlling_tty():
@ -254,6 +273,22 @@ def _acquire_controlling_tty():
fcntl.ioctl(2, termios.TIOCSCTTY)
def openpty():
"""
Call :func:`os.openpty`, raising a descriptive error if the call fails.
:raises mitogen.core.StreamError:
Creating a PTY failed.
:returns:
See :func`os.openpty`.
"""
try:
return os.openpty()
except OSError:
e = sys.exc_info()[1]
raise mitogen.core.StreamError(OPENPTY_MSG, e)
def tty_create_child(args):
"""
Return a file descriptor connected to the master end of a pseudo-terminal,
@ -268,7 +303,7 @@ def tty_create_child(args):
:returns:
`(pid, tty_fd, None)`
"""
master_fd, slave_fd = os.openpty()
master_fd, slave_fd = openpty()
mitogen.core.set_block(slave_fd)
disable_echo(master_fd)
disable_echo(slave_fd)
@ -300,7 +335,7 @@ def hybrid_tty_create_child(args):
:returns:
`(pid, socketpair_fd, tty_fd)`
"""
master_fd, slave_fd = os.openpty()
master_fd, slave_fd = openpty()
parentfp, childfp = create_socketpair()
mitogen.core.set_block(slave_fd)
@ -488,22 +523,6 @@ def upgrade_router(econtext):
)
def make_call_msg(fn, *args, **kwargs):
if inspect.ismethod(fn) and inspect.isclass(fn.__self__):
klass = mitogen.core.to_text(fn.__self__.__name__)
else:
klass = None
tup = (
mitogen.core.to_text(fn.__module__),
klass,
mitogen.core.to_text(fn.__name__),
args,
mitogen.core.Kwargs(kwargs)
)
return mitogen.core.Message.pickled(tup, handle=mitogen.core.CALL_FUNCTION)
def stream_by_method_name(name):
"""
Given the name of a Mitogen connection method, import its implementation
@ -769,7 +788,7 @@ PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
mitogen.core.Latch.poller_class = PREFERRED_POLLER
class TtyLogStream(mitogen.core.BasicStream):
class DiagLogStream(mitogen.core.BasicStream):
"""
For "hybrid TTY/socketpair" mode, after a connection has been setup, a
spare TTY file descriptor will exist that cannot be closed, and to which
@ -779,18 +798,21 @@ class TtyLogStream(mitogen.core.BasicStream):
termination signal to any processes whose controlling TTY is the TTY that
has been closed.
TtyLogStream takes over this descriptor and creates corresponding log
DiagLogStream takes over this descriptor and creates corresponding log
messages for anything written to it.
"""
def __init__(self, tty_fd, stream):
self.receive_side = mitogen.core.Side(self, tty_fd)
def __init__(self, fd, stream):
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = self.receive_side
self.stream = stream
self.buf = ''
def __repr__(self):
return 'mitogen.parent.TtyLogStream(%r)' % (self.stream.name,)
return 'mitogen.parent.DiagLogStream(fd=%r, %r)' % (
self.receive_side.fd,
self.stream.name,
)
def on_receive(self, broker):
"""
@ -1099,9 +1121,214 @@ class ChildIdAllocator(object):
return self.allocate()
class CallChain(object):
"""
Deliver :data:`mitogen.core.CALL_FUNCTION` messages to a target context,
optionally threading related calls so an exception in an earlier call
cancels subsequent calls.
:param mitogen.core.Context context:
Target context.
:param bool pipelined:
Enable pipelining.
:meth:`call`, :meth:`call_no_reply` and :meth:`call_async`
normally issue calls and produce responses with no memory of prior
exceptions. If a call made with :meth:`call_no_reply` fails, the exception
is logged to the target context's logging framework.
**Pipelining**
When pipelining is enabled, if an exception occurs during a call,
subsequent calls made by the same :class:`CallChain` fail with the same
exception, including those already in-flight on the network, and no further
calls execute until :meth:`reset` is invoked.
No exception is logged for calls made with :meth:`call_no_reply`, instead
it is saved and reported as the result of subsequent :meth:`call` or
:meth:`call_async` calls.
Sequences of asynchronous calls can be made without wasting network
round-trips to discover if prior calls succeed, and chains originating from
multiple unrelated source contexts may overlap concurrently at a target
context without interference. In this example, 4 calls complete in one
round-trip::
chain = mitogen.parent.CallChain(context, pipelined=True)
chain.call_no_reply(os.mkdir, '/tmp/foo')
# If previous mkdir() failed, this never runs:
chain.call_no_reply(os.mkdir, '/tmp/foo/bar')
# If either mkdir() failed, this never runs, and the exception is
# asynchronously delivered to the receiver.
recv = chain.call_async(subprocess.check_output, '/tmp/foo')
# If anything so far failed, this never runs, and raises the exception.
chain.call(do_something)
# If this code was executed, the exception would also be raised.
if recv.get().unpickle() == 'baz':
pass
When pipelining is enabled, :meth:`reset` must be invoked to ensure any
exception is discarded, otherwise unbounded memory usage is possible in
long-running programs. The context manager protocol is supported to ensure
:meth:`reset` is always invoked::
with mitogen.parent.CallChain(context, pipelined=True) as chain:
chain.call_no_reply(...)
chain.call_no_reply(...)
chain.call_no_reply(...)
chain.call(...)
# chain.reset() automatically invoked.
"""
def __init__(self, context, pipelined=False):
self.context = context
if pipelined:
self.chain_id = self.make_chain_id()
else:
self.chain_id = None
@classmethod
def make_chain_id(cls):
return '%s-%s-%x-%x' % (
socket.gethostname(),
os.getpid(),
threading.currentThread().ident,
int(1e6 * time.time()),
)
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self.context)
def __enter__(self):
return self
def __exit__(self, _1, _2, _3):
self.reset()
def reset(self):
"""
Instruct the target to forget any related exception.
"""
if not self.chain_id:
return
saved, self.chain_id = self.chain_id, None
try:
self.call_no_reply(mitogen.core.Dispatcher.forget_chain, saved)
finally:
self.chain_id = saved
def make_msg(self, fn, *args, **kwargs):
if inspect.ismethod(fn) and inspect.isclass(fn.__self__):
klass = mitogen.core.to_text(fn.__self__.__name__)
else:
klass = None
tup = (
self.chain_id,
mitogen.core.to_text(fn.__module__),
klass,
mitogen.core.to_text(fn.__name__),
args,
mitogen.core.Kwargs(kwargs)
)
return mitogen.core.Message.pickled(tup,
handle=mitogen.core.CALL_FUNCTION)
def call_no_reply(self, fn, *args, **kwargs):
"""
Like :meth:`call_async`, but do not wait for a return value, and inform
the target context no reply is expected. If the call fails and
pipelining is disabled, the exception will be logged to the target
context's logging framework.
"""
LOG.debug('%r.call_no_reply(): %r', self, CallSpec(fn, args, kwargs))
self.context.send(self.make_msg(fn, *args, **kwargs))
def call_async(self, fn, *args, **kwargs):
"""
Arrange for `fn(\*args, \**kwargs)` to be invoked on the context's main
thread.
:param fn:
A free function in module scope or a class method of a class
directly reachable from module scope:
.. code-block:: python
# mymodule.py
def my_func():
'''A free function reachable as mymodule.my_func'''
class MyClass:
@classmethod
def my_classmethod(cls):
'''Reachable as mymodule.MyClass.my_classmethod'''
def my_instancemethod(self):
'''Unreachable: requires a class instance!'''
class MyEmbeddedClass:
@classmethod
def my_classmethod(cls):
'''Not directly reachable from module scope!'''
:param tuple args:
Function arguments, if any. See :ref:`serialization-rules` for
permitted types.
:param dict kwargs:
Function keyword arguments, if any. See :ref:`serialization-rules`
for permitted types.
:returns:
:class:`mitogen.core.Receiver` configured to receive the result of
the invocation:
.. code-block:: python
recv = context.call_async(os.check_output, 'ls /tmp/')
try:
# Prints output once it is received.
msg = recv.get()
print(msg.unpickle())
except mitogen.core.CallError, e:
print('Call failed:', str(e))
Asynchronous calls may be dispatched in parallel to multiple
contexts and consumed as they complete using
:class:`mitogen.select.Select`.
"""
LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs))
return self.context.send_async(self.make_msg(fn, *args, **kwargs))
def call(self, fn, *args, **kwargs):
"""
Like :meth:`call_async`, but block until the return value is available.
Equivalent to::
call_async(fn, *args, **kwargs).get().unpickle()
:returns:
The function's return value.
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
"""
receiver = self.call_async(fn, *args, **kwargs)
return receiver.get().unpickle(throw_dead=False)
class Context(mitogen.core.Context):
call_chain_class = CallChain
via = None
def __init__(self, *args, **kwargs):
super(Context, self).__init__(*args, **kwargs)
self.default_call_chain = self.call_chain_class(self)
def __eq__(self, other):
return (isinstance(other, mitogen.core.Context) and
(other.context_id == self.context_id) and
@ -1111,17 +1338,13 @@ class Context(mitogen.core.Context):
return hash((self.router, self.context_id))
def call_async(self, fn, *args, **kwargs):
LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs))
return self.send_async(make_call_msg(fn, *args, **kwargs))
return self.default_call_chain.call_async(fn, *args, **kwargs)
def call(self, fn, *args, **kwargs):
receiver = self.call_async(fn, *args, **kwargs)
return receiver.get().unpickle(throw_dead=False)
return self.default_call_chain.call(fn, *args, **kwargs)
def call_no_reply(self, fn, *args, **kwargs):
LOG.debug('%r.call_no_reply(%r, *%r, **%r)',
self, fn, args, kwargs)
self.send(make_call_msg(fn, *args, **kwargs))
self.default_call_chain.call_no_reply(fn, *args, **kwargs)
def shutdown(self, wait=False):
LOG.debug('%r.shutdown() sending SHUTDOWN', self)

@ -636,8 +636,7 @@ class PushFileService(Service):
"""
for path in paths:
self.propagate_to(context, path)
for fullname in modules:
self.router.responder.forward_module(context, fullname)
self.router.responder.forward_modules(context, modules)
@expose(policy=AllowParents())
@arg_spec({

@ -118,7 +118,7 @@ class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
container = None
username = None
username = 'root'
kind = None
python_path = 'python'
docker_path = 'docker'
@ -184,7 +184,6 @@ class Stream(mitogen.parent.Stream):
except AttributeError:
pass
if self.username:
try:
os.setgroups([grent.gr_gid
for grent in grp.getgrall()

@ -111,7 +111,6 @@ class HostKeyError(mitogen.core.StreamError):
class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
#: Default to whatever is available as 'python' on the remote machine,
@ -121,8 +120,8 @@ class Stream(mitogen.parent.Stream):
#: Number of -v invocations to pass on command line.
ssh_debug_level = 0
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
#: If batch_mode=False, points to the corresponding DiagLogStream, allowing
#: it to be disconnected at the same time this stream is being torn down.
tty_stream = None
#: The path to the SSH binary.
@ -165,7 +164,32 @@ class Stream(mitogen.parent.Stream):
if ssh_debug_level:
self.ssh_debug_level = ssh_debug_level
self._init_create_child()
def _requires_pty(self):
"""
Return :data:`True` if the configuration requires a PTY to be
allocated. This is only true if we must interactively accept host keys,
or type a password.
"""
return (self.check_host_keys == 'accept' or
self.password is not None)
def _init_create_child(self):
"""
Initialize the base class :attr:`create_child` and
:attr:`create_child_args` according to whether we need a PTY or not.
"""
if self._requires_pty():
self.create_child = mitogen.parent.hybrid_tty_create_child
else:
self.create_child = mitogen.parent.create_child
self.create_child_args = {
'stderr_pipe': True,
}
def on_disconnect(self, broker):
if self.tty_stream is not None:
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
@ -193,6 +217,8 @@ class Stream(mitogen.parent.Stream):
'-o', 'ServerAliveInterval %s' % (self.keepalive_interval,),
'-o', 'ServerAliveCountMax %s' % (self.keepalive_count,),
]
if not self._requires_pty():
bits += ['-o', 'BatchMode yes']
if self.check_host_keys == 'enforce':
bits += ['-o', 'StrictHostKeyChecking yes']
if self.check_host_keys == 'accept':
@ -240,19 +266,23 @@ class Stream(mitogen.parent.Stream):
# with ours.
raise HostKeyError(self.hostkey_config_msg)
def _ec0_received(self):
if self.tty_stream is not None:
self._router.broker.start_receive(self.tty_stream)
return super(Stream, self)._ec0_received()
def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.TtyLogStream(extra_fd, self)
fds = [self.receive_side.fd]
if extra_fd is not None:
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self)
fds.append(extra_fd)
password_sent = False
it = mitogen.parent.iter_read(
fds=[self.receive_side.fd, extra_fd],
deadline=self.connect_deadline
)
it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline)
password_sent = False
for buf, partial in filter_debug(self, it):
LOG.debug('%r: received %r', self, buf)
if buf.endswith(self.EC0_MARKER):
self._router.broker.start_receive(self.tty_stream)
self._ec0_received()
return
elif HOSTKEY_REQ_PROMPT in buf.lower():
@ -265,6 +295,9 @@ class Stream(mitogen.parent.Stream):
# it at the start of the line.
if self.password is not None and password_sent:
raise PasswordError(self.password_incorrect_msg)
elif 'password' in buf and self.password is None:
# Permission denied (password,pubkey)
raise PasswordError(self.password_required_msg)
else:
raise PasswordError(self.auth_incorrect_msg)
elif partial and PASSWORD_PROMPT in buf.lower():

@ -49,7 +49,7 @@ class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.tty_create_child)
child_is_immediate_subprocess = False
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: Once connected, points to the corresponding DiagLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
username = 'root'

@ -107,7 +107,7 @@ class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: Once connected, points to the corresponding DiagLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
tty_stream = None
@ -165,7 +165,7 @@ class Stream(mitogen.parent.Stream):
password_required_msg = 'sudo password is required'
def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.TtyLogStream(extra_fd, self)
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self)
password_sent = False
it = mitogen.parent.iter_read(

@ -63,7 +63,7 @@ def make_socket_path():
class Listener(mitogen.core.BasicStream):
keep_alive = True
def __init__(self, router, path=None, backlog=30):
def __init__(self, router, path=None, backlog=100):
self._router = router
self.path = path or make_socket_path()
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@ -78,21 +78,38 @@ class Listener(mitogen.core.BasicStream):
self.receive_side = mitogen.core.Side(self, self._sock.fileno())
router.broker.start_receive(self)
def on_receive(self, broker):
sock, _ = self._sock.accept()
def _accept_client(self, sock):
sock.setblocking(True)
try:
pid, = struct.unpack('>L', sock.recv(4))
except socket.error:
LOG.error('%r: failed to read remote identity: %s',
self, sys.exc_info()[1])
return
context_id = self._router.id_allocator.allocate()
context = mitogen.parent.Context(self._router, context_id)
stream = mitogen.core.Stream(self._router, context_id)
stream.accept(sock.fileno(), sock.fileno())
stream.name = u'unix_client.%d' % (pid,)
stream.auth_id = mitogen.context_id
stream.is_privileged = True
self._router.register(context, stream)
try:
sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
os.getpid()))
except socket.error:
LOG.error('%r: failed to assign identity to PID %d: %s',
self, pid, sys.exc_info()[1])
return
stream.accept(sock.fileno(), sock.fileno())
self._router.register(context, stream)
def on_receive(self, broker):
sock, _ = self._sock.accept()
try:
self._accept_client(sock)
finally:
sock.close()

@ -1,2 +1,3 @@
lib/modules/custom_binary_producing_junk
lib/modules/custom_binary_producing_json
hosts/*.local

@ -9,7 +9,10 @@ vars_plugins = lib/vars
library = lib/modules
module_utils = lib/module_utils
retry_files_enabled = False
forks = 50
display_args_to_stdout = True
forks = 100
no_target_syslog = True
# Required by integration/ssh/timeouts.yml
timeout = 10

@ -1,51 +0,0 @@
# vim: syntax=dosini
[connection-delegation-test]
cd-bastion
cd-rack11 mitogen_via=ssh-user@cd-bastion
cd-rack11a mitogen_via=root@cd-rack11
cd-rack11a-docker mitogen_via=docker-admin@cd-rack11a ansible_connection=docker
[connection-delegation-cycle]
# Create cycle with Docker container.
cdc-bastion mitogen_via=cdc-rack11a-docker
cdc-rack11 mitogen_via=ssh-user@cdc-bastion
cdc-rack11a mitogen_via=root@cdc-rack11
cdc-rack11a-docker mitogen_via=docker-admin@cdc-rack11a ansible_connection=docker
[conn-delegation]
cd-user1 ansible_user=mitogen__user1 ansible_connection=mitogen_sudo mitogen_via=target
# Connection delegation scenarios. It's impossible to connection to them, but
# you can inspect the would-be config via "mitogen_get_stack" action.
[cd-no-connect]
# Normal inventory host, no aliasing.
cd-normal ansible_connection=mitogen_doas ansible_user=normal-user
# Inventory host that is really a different host.
cd-alias ansible_connection=ssh ansible_user=alias-user ansible_host=alias-host
# Via one normal host.
cd-normal-normal mitogen_via=cd-normal
# Via one aliased host.
cd-normal-alias mitogen_via=cd-alias
# newuser@host via host with explicit username.
cd-newuser-normal-normal mitogen_via=cd-normal ansible_user=newuser-normal-normal-user
# doas:newuser via host.
cd-newuser-doas-normal mitogen_via=cd-normal ansible_connection=mitogen_doas ansible_user=newuser-doas-normal-user
# Connection Delegation issue #340 reproduction.
# Path to jails is SSH to H -> mitogen_sudo to root -> jail to J
[issue340]
# 'target' plays the role of the normal host machine H.
# 'mitogen__sudo1' plays the role of root@H via mitogen_sudo.
# 'mitogen__user1' plays the role of root@J via mitogen__user1.
# 'mitogen__user2' plays the role of E, the delgate_to target for certs.
i340-root ansible_user=mitogen__sudo1 ansible_connection=mitogen_sudo mitogen_via=target
i340-jail ansible_user=mitogen__user1 ansible_connection=mitogen_sudo mitogen_via=i340-root
i340-certs ansible_user=mitogen__user2 ansible_connection=mitogen_sudo mitogen_via=target

@ -5,6 +5,24 @@
git_email: '{{ lookup("pipe", "git config --global user.email") }}'
tasks:
- lineinfile:
line: "{{item}}"
path: /etc/sysctl.conf
register: sysctl_conf
become: true
with_items:
- "net.ipv4.ip_forward=1"
- "kernel.perf_event_paranoid=-1"
- copy:
src: ~/.ssh/id_gitlab
dest: ~/.ssh/id_gitlab
mode: 0600
- template:
dest: ~/.ssh/config
src: ssh_config.j2
- lineinfile:
line: "net.ipv4.ip_forward=1"
path: /etc/sysctl.conf

@ -1,2 +1,2 @@
[controller]
35.206.145.240
c

@ -0,0 +1,19 @@
[defaults]
inventory = hosts,~/mitogen/tests/ansible/lib/inventory
gathering = explicit
strategy_plugins = ~/mitogen/ansible_mitogen/plugins/strategy
action_plugins = ~/mitogen/tests/ansible/lib/action
callback_plugins = ~/mitogen/tests/ansible/lib/callback
stdout_callback = nice_stdout
vars_plugins = ~/mitogen/tests/ansible/lib/vars
library = ~/mitogen/tests/ansible/lib/modules
retry_files_enabled = False
forks = 50
strategy = mitogen_linear
host_key_checking = False
[ssh_connection]
ssh_args = -o ForwardAgent=yes -o ControlMaster=auto -o ControlPersist=60s
pipelining = True

@ -0,0 +1,6 @@
Host localhost-*
Hostname localhost
Host gitlab.com
IdentityFile ~/.ssh/id_gitlab

@ -1,100 +0,0 @@
mydeb9-1 ansible_connection=docker
mydeb9-2 ansible_connection=docker
mydeb9-3 ansible_connection=docker
mydeb9-4 ansible_connection=docker
mydeb9-5 ansible_connection=docker
mydeb9-6 ansible_connection=docker
mydeb9-7 ansible_connection=docker
mydeb9-8 ansible_connection=docker
mydeb9-9 ansible_connection=docker
mydeb9-10 ansible_connection=docker
mydeb9-11 ansible_connection=docker
mydeb9-12 ansible_connection=docker
mydeb9-13 ansible_connection=docker
mydeb9-14 ansible_connection=docker
mydeb9-15 ansible_connection=docker
mydeb9-16 ansible_connection=docker
mydeb9-17 ansible_connection=docker
mydeb9-18 ansible_connection=docker
mydeb9-19 ansible_connection=docker
mydeb9-20 ansible_connection=docker
mydeb9-21 ansible_connection=docker
mydeb9-22 ansible_connection=docker
mydeb9-23 ansible_connection=docker
mydeb9-24 ansible_connection=docker
mydeb9-25 ansible_connection=docker
mydeb9-26 ansible_connection=docker
mydeb9-27 ansible_connection=docker
mydeb9-28 ansible_connection=docker
mydeb9-29 ansible_connection=docker
mydeb9-30 ansible_connection=docker
mydeb9-31 ansible_connection=docker
mydeb9-32 ansible_connection=docker
mydeb9-33 ansible_connection=docker
mydeb9-34 ansible_connection=docker
mydeb9-35 ansible_connection=docker
mydeb9-36 ansible_connection=docker
mydeb9-37 ansible_connection=docker
mydeb9-38 ansible_connection=docker
mydeb9-39 ansible_connection=docker
mydeb9-40 ansible_connection=docker
mydeb9-41 ansible_connection=docker
mydeb9-42 ansible_connection=docker
mydeb9-43 ansible_connection=docker
mydeb9-44 ansible_connection=docker
mydeb9-45 ansible_connection=docker
mydeb9-46 ansible_connection=docker
mydeb9-47 ansible_connection=docker
mydeb9-48 ansible_connection=docker
mydeb9-49 ansible_connection=docker
mydeb9-50 ansible_connection=docker
mydeb9-51 ansible_connection=docker
mydeb9-52 ansible_connection=docker
mydeb9-53 ansible_connection=docker
mydeb9-54 ansible_connection=docker
mydeb9-55 ansible_connection=docker
mydeb9-56 ansible_connection=docker
mydeb9-57 ansible_connection=docker
mydeb9-58 ansible_connection=docker
mydeb9-59 ansible_connection=docker
mydeb9-60 ansible_connection=docker
mydeb9-61 ansible_connection=docker
mydeb9-62 ansible_connection=docker
mydeb9-63 ansible_connection=docker
mydeb9-64 ansible_connection=docker
mydeb9-65 ansible_connection=docker
mydeb9-66 ansible_connection=docker
mydeb9-67 ansible_connection=docker
mydeb9-68 ansible_connection=docker
mydeb9-69 ansible_connection=docker
mydeb9-70 ansible_connection=docker
mydeb9-71 ansible_connection=docker
mydeb9-72 ansible_connection=docker
mydeb9-73 ansible_connection=docker
mydeb9-74 ansible_connection=docker
mydeb9-75 ansible_connection=docker
mydeb9-76 ansible_connection=docker
mydeb9-77 ansible_connection=docker
mydeb9-78 ansible_connection=docker
mydeb9-79 ansible_connection=docker
mydeb9-80 ansible_connection=docker
mydeb9-81 ansible_connection=docker
mydeb9-82 ansible_connection=docker
mydeb9-83 ansible_connection=docker
mydeb9-84 ansible_connection=docker
mydeb9-85 ansible_connection=docker
mydeb9-86 ansible_connection=docker
mydeb9-87 ansible_connection=docker
mydeb9-88 ansible_connection=docker
mydeb9-89 ansible_connection=docker
mydeb9-90 ansible_connection=docker
mydeb9-91 ansible_connection=docker
mydeb9-92 ansible_connection=docker
mydeb9-93 ansible_connection=docker
mydeb9-94 ansible_connection=docker
mydeb9-95 ansible_connection=docker
mydeb9-96 ansible_connection=docker
mydeb9-97 ansible_connection=docker
mydeb9-98 ansible_connection=docker
mydeb9-99 ansible_connection=docker
mydeb9-100 ansible_connection=docker

@ -0,0 +1,37 @@
# vim: syntax=dosini
[connection-delegation-test]
cd-bastion
cd-rack11 mitogen_via=ssh-user@cd-bastion
cd-rack11a mitogen_via=root@cd-rack11
cd-rack11a-docker mitogen_via=docker-admin@cd-rack11a ansible_connection=docker
[connection-delegation-cycle]
# Create cycle with Docker container.
cdc-bastion mitogen_via=cdc-rack11a-docker
cdc-rack11 mitogen_via=ssh-user@cdc-bastion
cdc-rack11a mitogen_via=root@cdc-rack11
cdc-rack11a-docker mitogen_via=docker-admin@cdc-rack11a ansible_connection=docker
[conn-delegation]
cd-user1 ansible_user=mitogen__user1 ansible_connection=mitogen_sudo mitogen_via=target
# Connection delegation scenarios. It's impossible to connection to them, but
# you can inspect the would-be config via "mitogen_get_stack" action.
[cd-no-connect]
# Normal inventory host, no aliasing.
cd-normal ansible_connection=mitogen_doas ansible_user=normal-user
# Inventory host that is really a different host.
cd-alias ansible_connection=ssh ansible_user=alias-user ansible_host=alias-host
# Via one normal host.
cd-normal-normal mitogen_via=cd-normal
# Via one aliased host.
cd-normal-alias mitogen_via=cd-alias
# newuser@host via host with explicit username.
cd-newuser-normal-normal mitogen_via=cd-normal ansible_user=newuser-normal-normal-user
# doas:newuser via host.
cd-newuser-doas-normal mitogen_via=cd-normal ansible_connection=mitogen_doas ansible_user=newuser-doas-normal-user

@ -0,0 +1,12 @@
[connection-delegation-test]
cd-bastion
cd-rack11 mitogen_via=ssh-user@cd-bastion
cd-rack11a mitogen_via=root@cd-rack11
cd-rack11a-docker mitogen_via=docker-admin@cd-rack11a ansible_connection=docker
[connection-delegation-cycle]
# Create cycle with Docker container.
cdc-bastion mitogen_via=cdc-rack11a-docker
cdc-rack11 mitogen_via=ssh-user@cdc-bastion
cdc-rack11a mitogen_via=root@cdc-rack11
cdc-rack11a-docker mitogen_via=docker-admin@cdc-rack11a ansible_connection=docker

@ -0,0 +1,4 @@
---
ansible_connection: setns
mitogen_kind: lxc

@ -0,0 +1,12 @@
# Connection Delegation issue #340 reproduction.
# Path to jails is SSH to H -> mitogen_sudo to root -> jail to J
[issue340]
# 'target' plays the role of the normal host machine H.
# 'mitogen__sudo1' plays the role of root@H via mitogen_sudo.
# 'mitogen__user1' plays the role of root@J via mitogen__user1.
# 'mitogen__user2' plays the role of E, the delgate_to target for certs.
i340-root ansible_user=mitogen__sudo1 ansible_connection=mitogen_sudo mitogen_via=target
i340-jail ansible_user=mitogen__user1 ansible_connection=mitogen_sudo mitogen_via=i340-root
i340-certs ansible_user=mitogen__user2 ansible_connection=mitogen_sudo mitogen_via=target

@ -0,0 +1,25 @@
k3
[k3-x10]
k3-[01:10]
[k3-x20]
k3-[01:20]
[k3-x50]
k3-[01:50]
[k3-x100]
k3-[001:100]
[k3-x200]
k3-[001:200]
[k3-x300]
k3-[001:300]
[k3-x400]
k3-[001:400]
[k3-x500]
k3-[001:500]

@ -1,2 +1,8 @@
[test-targets]
localhost
target ansible_host=localhost
[test-targets]
target
[localhost-x10]
localhost-[01:10]

@ -0,0 +1,10 @@
nessy
[nessy-x10]
nessy-[00:10]
[nessy-x20]
nessy-[00:20]
[nessy-x50]
nessy-[00:50]

@ -0,0 +1,9 @@
# integration/delegation/delegate_to_container.yml
# Patterned after openstack-ansible/all_containers.yml
osa-host-machine ansible_host=172.29.236.100
[osa-all-containers]
osa-container-1 container_tech=lxc
osa-container-2 container_tech=lxc
osa-container-3 container_tech=lxc

@ -49,7 +49,7 @@
#
- file:
path: /tmp/weird-mode
path: /tmp/weird-mode.out
state: absent
- name: Create local test file.
@ -61,10 +61,10 @@
- copy:
src: "/tmp/weird-mode"
dest: "/tmp/weird-mode"
dest: "/tmp/weird-mode.out"
- stat:
path: "/tmp/weird-mode"
path: "/tmp/weird-mode.out"
register: out
- assert:
that:

@ -1,2 +1,4 @@
- import_playbook: delegate_to_template.yml
- import_playbook: osa_container_standalone.yml
- import_playbook: osa_delegate_to_self.yml
- import_playbook: stack_construction.yml

@ -18,6 +18,30 @@
- assert:
that: |
out.result == [
{
'kwargs': {
'check_host_keys': 'ignore',
'connect_timeout': 10,
'hostname': 'alias-host',
'identities_only': False,
'identity_file': None,
'password': None,
'port': None,
'python_path': None,
'ssh_args': [
'-o',
'ForwardAgent=yes',
'-o',
'ControlMaster=auto',
'-o',
'ControlPersist=60s',
],
'ssh_debug_level': None,
'ssh_path': 'ssh',
'username': 'alias-user',
},
'method': 'ssh',
},
{
'kwargs': {
'check_host_keys': 'ignore',
@ -41,5 +65,5 @@
'username': None,
},
'method': 'ssh',
},
}
]

@ -0,0 +1,29 @@
# Verify one OSA-style container has the correct config.
- name: integration/delegation/container_standalone.yml
hosts: dtc-container-1
gather_facts: false
tasks:
- meta: end_play
when: not is_mitogen
- mitogen_get_stack:
register: out
- debug: msg={{out}}
- assert:
that: |
out.result == [
{
'kwargs': {
'container': 'dtc-container-1',
'docker_path': None,
'kind': 'lxc',
'lxc_info_path': None,
'machinectl_path': None,
'python_path': ['/usr/bin/python'],
'username': None,
},
'method': 'setns',
},
]

@ -0,0 +1,31 @@
# OSA: Verify delegating the connection back to the container succeeds.
- name: integration/delegation/osa_delegate_to_self.yml
hosts: osa-container-1
vars:
target: osa-container-1
gather_facts: false
tasks:
- meta: end_play
when: not is_mitogen
- mitogen_get_stack:
delegate_to: "{{target}}"
register: out
- assert:
that: |
out.result == [
{
'kwargs': {
'container': 'osa-container-1',
'docker_path': None,
'kind': 'lxc',
'lxc_info_path': None,
'machinectl_path': None,
'python_path': None,
'username': None,
},
'method': 'setns',
},
]

@ -14,5 +14,7 @@
- import_playbook: custom_script_interpreter.yml
- import_playbook: environment_isolation.yml
- import_playbook: etc_environment.yml
- import_playbook: forking_behaviour.yml
- import_playbook: forking_active.yml
- import_playbook: forking_inactive.yml
- import_playbook: forking_correct_parent.yml
- import_playbook: missing_module.yml

@ -1,27 +1,13 @@
- name: integration/runner/forking_behaviour.yml
- name: integration/runner/forking_active.yml
hosts: test-targets
any_errors_fatal: true
tasks:
# Verify non-async jobs run in-process.
# Verify mitogen_task_isolation=fork triggers forking.
- name: get process ID.
- name: get regular process ID.
custom_python_detect_environment:
register: sync_proc1
when: is_mitogen
- name: get process ID again.
custom_python_detect_environment:
register: sync_proc2
when: is_mitogen
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
when: is_mitogen
# Verify mitogen_task_isolation=fork triggers forking.
- name: get force-forked process ID.
custom_python_detect_environment:
@ -42,3 +28,4 @@
- fork_proc1.pid != sync_proc1.pid
- fork_proc1.pid != fork_proc2.pid
when: is_mitogen

@ -0,0 +1,26 @@
- name: integration/runner/forking_correct_parent.yml
hosts: test-targets
any_errors_fatal: true
tasks:
# Verify mitogen_task_isolation=fork forks from "virginal fork parent", not
# shared interpreter.
- name: get regular process ID.
custom_python_detect_environment:
register: regular_proc
when: is_mitogen
- name: get force-forked process ID again.
custom_python_detect_environment:
register: fork_proc
vars:
mitogen_task_isolation: fork
when: is_mitogen
- assert:
that:
- fork_proc.pid != regular_proc.pid
- fork_proc.ppid != regular_proc.pid
when: is_mitogen

@ -0,0 +1,23 @@
# Verify non-async jobs run in-process.
- name: integration/runner/forking_inactive.yml
hosts: test-targets
any_errors_fatal: true
tasks:
- name: get process ID.
custom_python_detect_environment:
register: sync_proc1
when: is_mitogen
- name: get process ID again.
custom_python_detect_environment:
register: sync_proc2
when: is_mitogen
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
when: is_mitogen

@ -1,6 +1,8 @@
from __future__ import unicode_literals
import os
import io
from ansible import constants as C
from ansible.module_utils import six
try:
@ -8,6 +10,11 @@ try:
except ImportError:
from ansible.plugins.loader import callback_loader
try:
pprint = __import__(os.environ['NICE_STDOUT_PPRINT'])
except KeyError:
pprint = None
def printi(tio, obj, key=None, indent=0):
def write(s, *args):
@ -50,9 +57,43 @@ class CallbackModule(DefaultModule):
def _dump_results(self, result, *args, **kwargs):
try:
tio = io.StringIO()
if pprint:
pprint.pprint(result, stream=tio)
else:
printi(tio, result)
return tio.getvalue() #.encode('ascii', 'replace')
except:
import traceback
traceback.print_exc()
raise
def v2_runner_on_failed(self, result, ignore_errors=False):
delegated_vars = result._result.get('_ansible_delegated_vars')
self._clean_results(result._result, result._task.action)
if self._play.strategy == 'free' and self._last_task_banner != result._task._uuid:
self._print_task_banner(result._task)
self._handle_exception(result._result)
self._handle_warnings(result._result)
if result._task.loop and 'results' in result._result:
return
if delegated_vars:
msg = "[%s -> %s]: FAILED! => %s" % (
result._host.get_name(),
delegated_vars['ansible_host'],
self._dump_results(result._result),
)
else:
msg = "[%s]: FAILED! => %s" % (
result._host.get_name(),
self._dump_results(result._result),
)
s = "fatal: %s: %s" % (
result._task.get_path() or '(dynamic task)',
msg,
)
self._display.display(s, color=C.COLOR_ERROR)

@ -14,7 +14,7 @@ import googleapiclient.discovery
def main():
project = 'mitogen-load-testing'
zone = 'europe-west1-d'
group_name = 'target'
group_name = 'micro-debian9'
client = googleapiclient.discovery.build('compute', 'v1')
resp = client.instances().list(project=project, zone=zone).execute()

@ -16,7 +16,7 @@
creates: /tmp/filetree.in
- name: Delete remote file tree
shell: rm -rf /tmp/filetree.out
file: path=/tmp/filetree.out state=absent
- file:
state: directory
@ -26,6 +26,5 @@
copy:
src: "{{item.src}}"
dest: "/tmp/filetree.out/{{item.path}}"
with_filetree:
- /tmp/filetree.in
with_filetree: /tmp/filetree.in
when: item.state == 'file'

@ -4,6 +4,7 @@ import time
import unittest2
import mitogen.core
import mitogen.parent
import mitogen.master
import testlib
@ -18,15 +19,15 @@ def function_that_adds_numbers(x, y):
return x + y
def function_that_fails():
raise plain_old_module.MyError('exception text')
def function_that_fails(s=''):
raise plain_old_module.MyError('exception text'+s)
def func_with_bad_return_value():
return CrazyType()
def func_accepts_returns_context(context):
def func_returns_arg(context):
return context
@ -101,7 +102,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg)
def test_accepts_returns_context(self):
context = self.local.call(func_accepts_returns_context, self.local)
context = self.local.call(func_returns_arg, self.local)
self.assertIsNot(context, self.local)
self.assertEqual(context.context_id, self.local.context_id)
self.assertEqual(context.name, self.local.name)
@ -118,5 +119,40 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
lambda: recv.get().unpickle())
class ChainTest(testlib.RouterMixin, testlib.TestCase):
# Verify mitogen_chain functionality.
klass = mitogen.parent.CallChain
def setUp(self):
super(ChainTest, self).setUp()
self.local = self.router.fork()
def test_subsequent_calls_produce_same_error(self):
chain = self.klass(self.local, pipelined=True)
self.assertEquals('xx', chain.call(func_returns_arg, 'xx'))
chain.call_no_reply(function_that_fails, 'x1')
e1 = self.assertRaises(mitogen.core.CallError,
lambda: chain.call(function_that_fails, 'x2'))
e2 = self.assertRaises(mitogen.core.CallError,
lambda: chain.call(func_returns_arg, 'x3'))
self.assertEquals(str(e1), str(e2))
def test_unrelated_overlapping_failed_chains(self):
c1 = self.klass(self.local, pipelined=True)
c2 = self.klass(self.local, pipelined=True)
c1.call_no_reply(function_that_fails, 'c1')
self.assertEquals('yes', c2.call(func_returns_arg, 'yes'))
self.assertRaises(mitogen.core.CallError,
lambda: c1.call(func_returns_arg, 'yes'))
def test_reset(self):
c1 = self.klass(self.local, pipelined=True)
c1.call_no_reply(function_that_fails, 'x1')
e1 = self.assertRaises(mitogen.core.CallError,
lambda: c1.call(function_that_fails, 'x2'))
c1.reset()
self.assertEquals('x3', c1.call(func_returns_arg, 'x3'))
if __name__ == '__main__':
unittest2.main()

@ -6,6 +6,52 @@ import shlex
import subprocess
import sys
HOST_KEY_ASK_MSG = """
The authenticity of host '[91.121.165.123]:9122 ([91.121.165.123]:9122)' can't be established.
ECDSA key fingerprint is SHA256:JvfPvazZzQ9/CUdKN7tiYlNZtDRdEgDsYVIzOgPrsR4.
Are you sure you want to continue connecting (yes/no)?
""".strip('\n')
HOST_KEY_STRICT_MSG = """Host key verification failed.\n"""
def tty(msg):
fp = open('/dev/tty', 'w', 0)
fp.write(msg)
fp.close()
def stderr(msg):
fp = open('/dev/stderr', 'w', 0)
fp.write(msg)
fp.close()
def confirm(msg):
tty(msg)
fp = open('/dev/tty', 'r', 0)
try:
return fp.readline()
finally:
fp.close()
if os.getenv('FAKESSH_MODE') == 'ask':
assert 'y\n' == confirm(HOST_KEY_ASK_MSG)
if os.getenv('FAKESSH_MODE') == 'strict':
stderr(HOST_KEY_STRICT_MSG)
sys.exit(255)
#
# Set an env var if stderr was a TTY to make ssh_test tests easier to write.
#
if os.isatty(2):
os.environ['STDERR_WAS_TTY'] = '1'
parser = optparse.OptionParser()
parser.add_option('--user', '-l', action='store')
parser.add_option('-o', dest='options', action='append')

@ -121,6 +121,25 @@ class ContextTest(testlib.RouterMixin, unittest2.TestCase):
self.assertRaises(OSError, lambda: os.kill(pid, 0))
class OpenPtyTest(testlib.TestCase):
func = staticmethod(mitogen.parent.openpty)
def test_pty_returned(self):
master_fd, slave_fd = self.func()
self.assertTrue(isinstance(master_fd, int))
self.assertTrue(isinstance(slave_fd, int))
os.close(master_fd)
os.close(slave_fd)
@mock.patch('os.openpty')
def test_max_reached(self, openpty):
openpty.side_effect = OSError(errno.ENXIO)
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.func())
msg = mitogen.parent.OPENPTY_MSG % (openpty.side_effect,)
self.assertEquals(e.args[0], msg)
class TtyCreateChildTest(unittest2.TestCase):
func = staticmethod(mitogen.parent.tty_create_child)

@ -1,3 +1,4 @@
import os
import sys
import mitogen
@ -123,6 +124,41 @@ class BannerTest(testlib.DockerMixin, unittest2.TestCase):
self.assertEquals(name, context.name)
class RequirePtyTest(testlib.DockerMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream
def fake_ssh(self, FAKESSH_MODE=None, **kwargs):
os.environ['FAKESSH_MODE'] = str(FAKESSH_MODE)
try:
return self.router.ssh(
hostname='hostname',
username='mitogen__has_sudo',
ssh_path=testlib.data_path('fakessh.py'),
**kwargs
)
finally:
del os.environ['FAKESSH_MODE']
def test_check_host_keys_accept(self):
# required=true, host_key_checking=accept
context = self.fake_ssh(FAKESSH_MODE='ask', check_host_keys='accept')
self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_check_host_keys_enforce(self):
# required=false, host_key_checking=enforce
context = self.fake_ssh(check_host_keys='enforce')
self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_check_host_keys_ignore(self):
# required=false, host_key_checking=ignore
context = self.fake_ssh(check_host_keys='ignore')
self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_password_present(self):
# required=true, password is not None
context = self.fake_ssh(check_host_keys='ignore', password='willick')
self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY'))
if __name__ == '__main__':
unittest2.main()

Loading…
Cancel
Save