Merge pull request #244 from dw/dmw

Dmw
pull/246/head
dw 7 years ago committed by GitHub
commit e56104ff15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -30,7 +30,7 @@ docker run \
--detach \
--publish 0.0.0.0:2201:22/tcp \
--name=target \
d2mw/mitogen-${MITOGEN_TEST_DISTRO}-test
mitogen/${MITOGEN_TEST_DISTRO}-test
echo travis_fold:end:docker_setup
@ -39,6 +39,7 @@ pip install -U ansible=="${ANSIBLE_VERSION}"
cd ${TRAVIS_BUILD_DIR}/tests/ansible
chmod go= ${TRAVIS_BUILD_DIR}/tests/data/docker/mitogen__has_sudo_pubkey.key
echo '[test-targets]' > ${TMPDIR}/hosts
echo \
target \
ansible_host=$DOCKER_HOSTNAME \
@ -59,7 +60,6 @@ echo travis_fold:end:job_setup
echo travis_fold:start:mitogen_linear
/usr/bin/time ./mitogen_ansible_playbook.sh \
all.yml \
-vvv \
-i "${TMPDIR}/hosts"
echo travis_fold:end:mitogen_linear
@ -67,6 +67,5 @@ echo travis_fold:end:mitogen_linear
echo travis_fold:start:vanilla_ansible
/usr/bin/time ./run_ansible_playbook.sh \
all.yml \
-vvv \
-i "${TMPDIR}/hosts"
echo travis_fold:end:vanilla_ansible

@ -60,7 +60,7 @@ do
--detach \
--publish 0.0.0.0:$port:22/tcp \
--name=target$i \
d2mw/mitogen-${MITOGEN_TEST_DISTRO}-test
mitogen/${MITOGEN_TEST_DISTRO}-test
echo \
target$i \

@ -356,7 +356,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
executing. We use the opportunity to grab relevant bits from the
task-specific data.
"""
self.ansible_ssh_timeout = task_vars.get('ansible_ssh_timeout')
self.ansible_ssh_timeout = task_vars.get('ansible_ssh_timeout',
C.DEFAULT_TIMEOUT)
self.python_path = task_vars.get('ansible_python_interpreter',
'/usr/bin/python')
self.mitogen_via = task_vars.get('mitogen_via')

@ -319,7 +319,6 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
connection=self._connection,
module_name=mitogen.utils.cast(module_name),
module_args=mitogen.utils.cast(module_args),
remote_tmp=mitogen.utils.cast(self._get_remote_tmp()),
task_vars=task_vars,
templar=self._templar,
env=mitogen.utils.cast(env),

@ -92,7 +92,7 @@ class Invocation(object):
target.run_module() or helpers.run_module_async() in the target context.
"""
def __init__(self, action, connection, module_name, module_args,
remote_tmp, task_vars, templar, env, wrap_async):
task_vars, templar, env, wrap_async):
#: ActionBase instance invoking the module. Required to access some
#: output postprocessing methods that don't belong in ActionBase at
#: all.
@ -104,9 +104,6 @@ class Invocation(object):
self.module_name = module_name
#: Final module arguments.
self.module_args = module_args
#: Value of 'remote_tmp' parameter, to allow target to create temporary
#: files in correct location.
self.remote_tmp = remote_tmp
#: Task variables, needed to extract ansible_*_interpreter.
self.task_vars = task_vars
#: Templar, needed to extract ansible_*_interpreter.
@ -198,7 +195,6 @@ class BinaryPlanner(Planner):
path=invocation.module_path,
args=invocation.module_args,
env=invocation.env,
remote_tmp=invocation.remote_tmp,
**kwargs
)

@ -74,7 +74,7 @@ def reopen_readonly(fp):
"""
Replace the file descriptor belonging to the file object `fp` with one
open on the same file (`fp.name`), but opened with :py:data:`os.O_RDONLY`.
This enables temporary files to be executed on Linux, which usually theows
This enables temporary files to be executed on Linux, which usually throws
``ETXTBUSY`` if any writeable handle exists pointing to a file passed to
`execve()`.
"""
@ -91,34 +91,32 @@ class Runner(object):
returned by `run()`.
Subclasses may override `_run`()` and extend `setup()` and `revert()`.
:param str module:
Name of the module to execute, e.g. "shell"
:param mitogen.core.Context service_context:
Context to which we should direct FileService calls. For now, always
the connection multiplexer process on the controller.
:param dict args:
Ansible module arguments. A strange mixture of user and internal keys
created by ActionBase._execute_module().
:param dict env:
Additional environment variables to set during the run.
"""
def __init__(self, module, remote_tmp, service_context,
emulate_tty=None, raw_params=None, args=None, env=None):
def __init__(self, module, service_context, args=None, env=None):
if args is None:
args = {}
if raw_params is not None:
args['_raw_params'] = raw_params
self.module = utf8(module)
self.remote_tmp = utf8(os.path.expanduser(remote_tmp))
self.service_context = service_context
self.emulate_tty = emulate_tty
self.raw_params = raw_params
self.args = args
self.env = env
self._temp_dir = None
def get_temp_dir(self):
if not self._temp_dir:
self._temp_dir = tempfile.mkdtemp(prefix='ansible_mitogen_')
# https://github.com/dw/mitogen/issues/239
#ansible_mitogen.target.make_temp_directory(self.remote_tmp)
return self._temp_dir
def setup(self):
"""
Prepare the current process for running a module. The base
implementation simply prepares the environment.
Prepare for running a module, including fetching necessary dependencies
from the parent, as :meth:`run` may detach prior to beginning
execution. The base implementation simply prepares the environment.
"""
self._env = TemporaryEnvironment(self.env)
@ -128,8 +126,19 @@ class Runner(object):
implementation simply restores the original environment.
"""
self._env.revert()
if self._temp_dir:
shutil.rmtree(self._temp_dir)
self._cleanup_temp()
def _cleanup_temp(self):
"""
Empty temp_dir in time for the next module invocation.
"""
for name in os.listdir(ansible_mitogen.target.temp_dir):
if name in ('.', '..'):
continue
path = os.path.join(ansible_mitogen.target.temp_dir, name)
LOG.debug('Deleting %r', path)
ansible_mitogen.target.prune_tree(path)
def _run(self):
"""
@ -175,7 +184,7 @@ class TemporaryEnvironment(object):
class TemporaryArgv(object):
def __init__(self, argv):
self.original = sys.argv[:]
sys.argv[:] = argv
sys.argv[:] = map(str, argv)
def revert(self):
sys.argv[:] = self.original
@ -206,23 +215,44 @@ class NewStyleStdio(object):
class ProgramRunner(Runner):
def __init__(self, path, **kwargs):
"""
Base class for runners that run external programs.
:param str path:
Absolute path to the program file on the master, as it can be retrieved
via :class:`ansible_mitogen.services.FileService`.
:param bool emulate_tty:
If :data:`True`, execute the program with `stdout` and `stderr` merged
into a single pipe, emulating Ansible behaviour when an SSH TTY is in
use.
"""
def __init__(self, path, emulate_tty=None, **kwargs):
super(ProgramRunner, self).__init__(**kwargs)
self.path = path
self.emulate_tty = emulate_tty
self.path = utf8(path)
def setup(self):
super(ProgramRunner, self).setup()
self._setup_program()
def _get_program_filename(self):
"""
Return the filename used for program on disk. Ansible uses the original
filename for non-Ansiballz runs, and "ansible_module_+filename for
Ansiballz runs.
"""
return os.path.basename(self.path)
program_fp = None
def _setup_program(self):
"""
Create a temporary file containing the program code. The code is
fetched via :meth:`_get_program`.
"""
self.program_fp = open(
os.path.join(self.get_temp_dir(), self.module),
'wb'
)
filename = self._get_program_filename()
path = os.path.join(ansible_mitogen.target.temp_dir, filename)
self.program_fp = open(path, 'wb')
self.program_fp.write(self._get_program())
self.program_fp.flush()
os.chmod(self.program_fp.name, int('0700', 8))
@ -248,7 +278,8 @@ class ProgramRunner(Runner):
"""
Delete the temporary program file.
"""
self.program_fp.close()
if self.program_fp:
self.program_fp.close()
super(ProgramRunner, self).revert()
def _run(self):
@ -285,7 +316,7 @@ class ArgsFileRunner(Runner):
self.args_fp = tempfile.NamedTemporaryFile(
prefix='ansible_mitogen',
suffix='-args',
dir=self.get_temp_dir(),
dir=ansible_mitogen.target.temp_dir,
)
self.args_fp.write(self._get_args_contents())
self.args_fp.flush()
@ -362,22 +393,39 @@ class NewStyleRunner(ScriptRunner):
def setup(self):
super(NewStyleRunner, self).setup()
self._stdio = NewStyleStdio(self.args)
self._argv = TemporaryArgv([self.program_fp.name])
# It is possible that not supplying the script filename will break some
# module, but this has never been a bug report. Instead act like an
# interpreter that had its script piped on stdin.
self._argv = TemporaryArgv([''])
def revert(self):
self._argv.revert()
self._stdio.revert()
super(NewStyleRunner, self).revert()
def _get_program_filename(self):
"""
See ProgramRunner._get_program_filename().
"""
return 'ansible_module_' + os.path.basename(self.path)
def _setup_args(self):
pass
def _setup_program(self):
pass
def _get_code(self):
self.source = ansible_mitogen.target.get_file(
context=self.service_context,
path=self.path,
)
try:
return self._code_by_path[self.path]
except KeyError:
return self._code_by_path.setdefault(self.path, compile(
source=ansible_mitogen.target.get_file(
context=self.service_context,
path=self.path,
),
source=self.source,
filename=self.path,
mode='exec',
dont_inherit=True,
@ -385,14 +433,21 @@ class NewStyleRunner(ScriptRunner):
def _run(self):
code = self._get_code()
mod = types.ModuleType('__main__')
mod.__file__ = self.program_fp.name
mod.__package__ = None
d = vars(mod)
# Some Ansible modules use __file__ to find the Ansiballz temporary
# directory. We must provide some temporary path in __file__, but we
# don't want to pointlessly write the module to disk when it never
# actually needs to exist. So just pass the filename as it would exist.
mod.__file__ = os.path.join(
ansible_mitogen.target.temp_dir,
'ansible_module_' + os.path.basename(self.path),
)
e = None
try:
exec code in d, d
exec code in vars(mod)
except SystemExit, e:
pass

@ -270,7 +270,7 @@ class ContextService(mitogen.service.Service):
# We don't need to wait for the result of this. Ideally we'd check its
# return value somewhere, but logs will catch a failure anyway.
context.call_async(ansible_mitogen.target.start_fork_parent)
context.call_async(ansible_mitogen.target.init_child)
if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'):
from mitogen import debug

@ -33,6 +33,7 @@ for file transfer, module execution and sundry bits like changing file modes.
from __future__ import absolute_import
import cStringIO
import errno
import grp
import json
import logging
@ -58,6 +59,10 @@ import mitogen.service
LOG = logging.getLogger(__name__)
#: Set by init_child() to the single temporary directory that will exist for
#: the duration of the process.
temp_dir = None
#: Caching of fetched file data.
_file_cache = {}
@ -191,8 +196,68 @@ def transfer_file(context, in_path, out_path, sync=False, set_owner=False):
os.utime(out_path, (metadata['atime'], metadata['mtime']))
def prune_tree(path):
"""
Like shutil.rmtree(), but log errors rather than discard them, and do not
waste multiple os.stat() calls discovering whether the object can be
deleted, just try deleting it instead.
"""
try:
os.unlink(path)
return
except OSError, e:
if not (os.path.isdir(path) and
e.args[0] in (errno.EPERM, errno.EISDIR)):
LOG.error('prune_tree(%r): %s', path, e)
return
try:
# Ensure write access for readonly directories. Ignore error in case
# path is on a weird filesystem (e.g. vfat).
os.chmod(path, int('0700', 8))
except OSError, e:
LOG.warning('prune_tree(%r): %s', path, e)
try:
for name in os.listdir(path):
if name not in ('.', '..'):
prune_tree(os.path.join(path, name))
os.rmdir(path)
except OSError, e:
LOG.error('prune_tree(%r): %s', path, e)
def _on_broker_shutdown():
"""
Respond to broker shutdown (graceful termination by parent, or loss of
connection to parent) by deleting our sole temporary directory.
"""
prune_tree(temp_dir)
@mitogen.core.takes_econtext
def reset_temp_dir(econtext):
"""
Create one temporary directory to be reused by all runner.py invocations
for the lifetime of the process. The temporary directory is changed for
each forked job, and emptied as necessary by runner.py::_cleanup_temp()
after each module invocation.
The result is that a context need only create and delete one directory
during startup and shutdown, and no further filesystem writes need occur
assuming no modules execute that create temporary files.
"""
global temp_dir
# https://github.com/dw/mitogen/issues/239
temp_dir = tempfile.mkdtemp(prefix='ansible_mitogen_')
# This must be reinstalled in forked children too, since the Broker
# instance from the parent process does not carry over to the new child.
mitogen.core.listen(econtext.broker, 'shutdown', _on_broker_shutdown)
@mitogen.core.takes_econtext
def start_fork_parent(econtext):
def init_child(econtext):
"""
Called by ContextService immediately after connection; arranges for the
(presently) spotless Python interpreter to be forked, where the newly
@ -206,12 +271,14 @@ def start_fork_parent(econtext):
global _fork_parent
mitogen.parent.upgrade_router(econtext)
_fork_parent = econtext.router.fork()
reset_temp_dir(econtext)
@mitogen.core.takes_econtext
def start_fork_child(wrap_async, kwargs, econtext):
mitogen.parent.upgrade_router(econtext)
context = econtext.router.fork()
context.call(reset_temp_dir)
if not wrap_async:
try:
return context.call(run_module, kwargs)
@ -331,7 +398,9 @@ def run_module_async(job_id, kwargs, econtext):
def make_temp_directory(base_dir):
"""
Handle creation of `base_dir` if it is absent, in addition to a unique
temporary directory within `base_dir`.
temporary directory within `base_dir`. This is the temporary directory that
becomes 'remote_tmp', not the one used by Ansiballz. It always uses the
system temporary directory.
:returns:
Newly created temporary directory.

@ -157,9 +157,16 @@ Noteworthy Differences
* Performance does not scale perfectly linearly with target count. This will
improve over time.
* Timeouts normally apply to the combined runtime of the SSH and become steps
of a task. As Mitogen treats SSH and sudo distincly, during a failure the
effective timeout may appear to double.
* SSH and ``become`` are treated distinctly when applying timeouts, and
timeouts apply up to the point when the new interpreter is ready to accept
messages. Ansible has two timeouts: ``ConnectTimeout`` for SSH, applying up
to when authentication completes, and a separate parallel timeout up to when
``become`` authentication completes.
For busy targets, Ansible may successfully execute a module where Mitogen
would fail without increasing the timeout. For sick targets, Ansible may hang
indefinitely after authentication without executing a command, for example
due to a stuck filesystem IO appearing in ``$HOME/.profile``.
New Features & Notes

@ -859,14 +859,14 @@ Router Class
Port number to connect to; default is unspecified, which causes SSH
to pick the port number.
:param str check_host_keys:
Specifies the SSH host key checking mode:
Specifies the SSH host key checking mode. Defaults to ``enforce``.
* ``ignore``: no host key checking is performed. Connections never
fail due to an unknown or changed host key.
* ``accept``: known hosts keys are checked to ensure they match,
new host keys are automatically accepted and verified in future
connections.
* ``enforce``: known host keys are checke to ensure they match,
* ``enforce``: known host keys are checked to ensure they match,
unknown hosts cause a connection failure.
:param str password:
Password to type if/when ``ssh`` requests it. If not specified and
@ -887,6 +887,16 @@ Router Class
remaining message in the otherwise uncompressed stream protocol,
such as function call arguments and return values.
:raises mitogen.ssh.PasswordError:
A password was requested but none was specified, or the specified
password was incorrect.
:raises mitogen.ssh.HostKeyError:
When `check_host_keys` is set to either ``accept``, indicates a
previously recorded key no longer matches the remote machine. When
set to ``enforce``, as above, but additionally indicates no
previously recorded key exists for the remote machine.
Context Class
=============

@ -13,7 +13,7 @@ def grep_version():
author = u'David Wilson'
copyright = u'2016, David Wilson'
copyright = u'2018, David Wilson'
exclude_patterns = ['_build']
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinxcontrib.programoutput']
html_show_sourcelink = False

@ -115,15 +115,15 @@ Logging Environment Variables
Overrides the :py:mod:`logging` package log level set by any call to
:py:func:`mitogen.utils.log_to_file`. Defaults to ``INFO``.
If set to ``IO``, equivalent to ``DEBUG`` but additionally enabled IO
logging for any call to :py:func:`mitogen.utils.log_to_file`. IO logging
produces verbose records of any IO interaction, which is useful for
debugging hangs and deadlocks.
``MITOGEN_LOG_USEC``
If present, forces microsecond-level timestamps for any call to
:py:func:`mitogen.utils.log_to_file`.
``MITOGEN_LOG_IO``
If present, forces IO logging for any call to
:py:func:`mitogen.utils.log_to_file`. IO logging produces extremely verbose
logs of any IO interaction, which is useful when debugging deadlocks.
Logging Records

@ -445,6 +445,20 @@ also listen on the following handles:
In this way, the master need never re-send a module it has already sent to
a direct descendant.
.. currentmodule:: mitogen.core
.. data:: DETACHING
Sent to inform a parent that user code has invoked
:meth:`ExternalContext.detach` to decouple the lifecycle of a directly
connected context and its subtree from the running program.
A child usually shuts down immediately if it loses its parent connection,
and parents usually terminate any related Python/SSH subprocess on
disconnection. Receiving :data:`DETACHING` informs the parent the
connection will soon drop, but the process intends to continue life
independently, and to avoid terminating the related subprocess if that
subprocess is the child itself.
Additional handles are created to receive the result of every function call
triggered by :py:meth:`call_async() <mitogen.parent.Context.call_async>`.

@ -15,7 +15,6 @@
<graph edgedefault="directed" id="G">
<data key="d0"/>
<node id="n0">
<data key="d5"/>
<data key="d6">
<y:ShapeNode>
<y:Geometry height="30.0" width="140.0" x="319.0" y="189.0"/>
@ -33,7 +32,6 @@
</data>
</node>
<node id="n1">
<data key="d5"/>
<data key="d6">
<y:ShapeNode>
<y:Geometry height="30.0" width="140.0" x="319.0" y="239.0"/>
@ -51,13 +49,12 @@
</data>
</node>
<node id="n2">
<data key="d5"/>
<data key="d6">
<y:ShapeNode>
<y:Geometry height="30.0" width="140.0" x="319.0" y="289.0"/>
<y:Geometry height="30.0" width="140.0" x="319.0" y="342.0"/>
<y:Fill color="#FFCC00" transparent="false"/>
<y:BorderStyle color="#000000" type="line" width="1.0"/>
<y:NodeLabel alignment="center" autoSizePolicy="content" fontFamily="Dialog" fontSize="12" fontStyle="plain" hasBackgroundColor="false" hasLineColor="false" height="18.1328125" modelName="custom" textColor="#000000" visible="true" width="32.48828125" x="53.755859375" y="5.93359375">sudo<y:LabelModel>
<y:NodeLabel alignment="center" autoSizePolicy="content" fontFamily="Dialog" fontSize="12" fontStyle="plain" hasBackgroundColor="false" hasLineColor="false" height="18.1328125" modelName="custom" textColor="#000000" visible="true" width="60.42578125" x="39.787109375" y="5.93359375">sudo:root<y:LabelModel>
<y:SmartNodeLabelModel distance="4.0"/>
</y:LabelModel>
<y:ModelParameter>
@ -69,13 +66,12 @@
</data>
</node>
<node id="n3">
<data key="d5"/>
<data key="d6">
<y:ShapeNode>
<y:Geometry height="30.0" width="140.0" x="319.0" y="339.0"/>
<y:Geometry height="30.0" width="140.0" x="319.0" y="392.0"/>
<y:Fill color="#FFCC00" transparent="false"/>
<y:BorderStyle color="#000000" type="line" width="1.0"/>
<y:NodeLabel alignment="center" autoSizePolicy="content" fontFamily="Dialog" fontSize="12" fontStyle="plain" hasBackgroundColor="false" hasLineColor="false" height="18.1328125" modelName="custom" textColor="#000000" visible="true" width="71.423828125" x="34.2880859375" y="5.93359375">ssh:billing0<y:LabelModel>
<y:NodeLabel alignment="center" autoSizePolicy="content" fontFamily="Dialog" fontSize="12" fontStyle="plain" hasBackgroundColor="false" hasLineColor="false" height="18.1328125" modelName="custom" textColor="#000000" visible="true" width="91.421875" x="24.2890625" y="5.93359375">docker:billing0<y:LabelModel>
<y:SmartNodeLabelModel distance="4.0"/>
</y:LabelModel>
<y:ModelParameter>
@ -87,10 +83,9 @@
</data>
</node>
<node id="n4">
<data key="d5"/>
<data key="d6">
<y:ShapeNode>
<y:Geometry height="30.0" width="140.0" x="319.0" y="389.0"/>
<y:Geometry height="30.0" width="140.0" x="319.0" y="442.0"/>
<y:Fill color="#99CC00" transparent="false"/>
<y:BorderStyle color="#000000" type="line" width="1.0"/>
<y:NodeLabel alignment="center" autoSizePolicy="content" fontFamily="Dialog" fontSize="12" fontStyle="plain" hasBackgroundColor="false" hasLineColor="false" height="18.1328125" modelName="custom" textColor="#000000" visible="true" width="131.740234375" x="4.1298828125" y="5.93359375">run-nightly-billing.py<y:LabelModel>
@ -104,8 +99,24 @@
</y:ShapeNode>
</data>
</node>
<node id="n5">
<data key="d6">
<y:ShapeNode>
<y:Geometry height="30.0" width="140.0" x="319.0" y="290.5"/>
<y:Fill color="#FFCC00" transparent="false"/>
<y:BorderStyle color="#000000" type="line" width="1.0"/>
<y:NodeLabel alignment="center" autoSizePolicy="content" fontFamily="Dialog" fontSize="12" fontStyle="plain" hasBackgroundColor="false" hasLineColor="false" height="18.1328125" modelName="custom" textColor="#000000" visible="true" width="80.728515625" x="29.6357421875" y="5.93359375">ssh:docker-a<y:LabelModel>
<y:SmartNodeLabelModel distance="4.0"/>
</y:LabelModel>
<y:ModelParameter>
<y:SmartNodeLabelModelParameter labelRatioX="0.0" labelRatioY="0.0" nodeRatioX="0.0" nodeRatioY="0.0" offsetX="0.0" offsetY="0.0" upX="0.0" upY="-1.0"/>
</y:ModelParameter>
</y:NodeLabel>
<y:Shape type="rectangle"/>
</y:ShapeNode>
</data>
</node>
<edge id="e0" source="n0" target="n1">
<data key="d9"/>
<data key="d10">
<y:PolyLineEdge>
<y:Path sx="0.0" sy="0.0" tx="0.0" ty="0.0"/>
@ -115,8 +126,7 @@
</y:PolyLineEdge>
</data>
</edge>
<edge id="e1" source="n1" target="n2">
<data key="d9"/>
<edge id="e1" source="n1" target="n5">
<data key="d10">
<y:PolyLineEdge>
<y:Path sx="0.0" sy="0.0" tx="0.0" ty="0.0"/>
@ -127,7 +137,6 @@
</data>
</edge>
<edge id="e2" source="n2" target="n3">
<data key="d9"/>
<data key="d10">
<y:PolyLineEdge>
<y:Path sx="0.0" sy="0.0" tx="0.0" ty="0.0"/>
@ -138,6 +147,16 @@
</data>
</edge>
<edge id="e3" source="n3" target="n4">
<data key="d10">
<y:PolyLineEdge>
<y:Path sx="0.0" sy="0.0" tx="0.0" ty="0.0"/>
<y:LineStyle color="#000000" type="line" width="1.0"/>
<y:Arrows source="none" target="standard"/>
<y:BendStyle smoothed="false"/>
</y:PolyLineEdge>
</data>
</edge>
<edge id="e4" source="n5" target="n2">
<data key="d9"/>
<data key="d10">
<y:PolyLineEdge>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.3 KiB

After

Width:  |  Height:  |  Size: 4.4 KiB

Before

Width:  |  Height:  |  Size: 13 KiB

After

Width:  |  Height:  |  Size: 13 KiB

@ -95,15 +95,20 @@ to your network topology**.
hostname='jump-box.mycorp.com'
)
ssh_account = router.sudo(
docker_host = router.ssh(
via=bastion_host,
hostname='docker-a.prod.mycorp.com'
)
sudo_account = router.sudo(
via=docker_host,
username='user_with_magic_ssh_key',
password='sudo password',
)
internal_box = router.ssh(
via=ssh_account,
hostname='billing0.internal.mycorp.com'
internal_box = router.docker(
via=sudo_account,
container='billing0',
)
internal_box.call(os.system, './run-nightly-billing.py')
@ -232,6 +237,32 @@ uptime')** without further need to capture or manage output.
18:17:56 I mitogen.ctx.k3: stdout: 17:37:10 up 562 days, 2:25, 5 users, load average: 1.24, 1.13, 1.14
Detached Subtrees
#################
.. image:: images/detached-subtree.png
Contexts may detach from and outlive the running program, while maintaining
communication with descendents in their subtree. This enables persistent
background tasks that reuse Mitogen features.
.. code::
@mitogen.core.takes_econtext
def become_monitoring_master(children, econtext):
kill_old_process('/var/run/mydaemon.pid')
write_pid_file('/var/run/mydaemon.pid')
econtext.detach()
while True:
for child in children:
if child.call(get_cpu_load) > 0.9:
alert_operator('Child is too busy! ' + str(child))
time.sleep(1)
dc1.call_async(become_monitoring_master, children)
Blocking Code Friendly
######################

@ -2,6 +2,11 @@
Internal API Reference
**********************
.. toctree::
:hidden:
signals
mitogen.core
============
@ -462,3 +467,9 @@ Helper Functions
:returns str:
The minimized source.
Signals
=======
:ref:`Please refer to Signals <signals>`.

@ -1,11 +1,20 @@
.. _signals:
Signals
=======
Mitogen exposes a simplistic signal mechanism to help decouple its internal
Mitogen contains a simplistic signal mechanism to help decouple its internal
components. When a signal is fired by a particular instance of a class, any
functions registered to receive it will be called back.
.. warning::
As signals execute on the Broker thread, and without exception handling,
they are generally unsafe for consumption by user code, as any bugs could
trigger crashes and hangs for which the broker is unable to forward logs,
or ensure the buggy context always shuts down on disconnect.
Functions
---------

@ -75,6 +75,7 @@ DEL_ROUTE = 104
ALLOCATE_ID = 105
SHUTDOWN = 106
LOAD_MODULE = 107
DETACHING = 108
IS_DEAD = 999
PY3 = sys.version_info > (3,)
@ -953,6 +954,7 @@ class Context(object):
raise SystemError('Cannot making blocking call on broker thread')
receiver = Receiver(self.router, persist=persist, respondent=self)
msg.dst_id = self.context_id
msg.reply_to = receiver.handle
_v and LOG.debug('%r.send_async(%r)', self, msg)
@ -1277,6 +1279,10 @@ class Router(object):
self.broker.start_receive(stream)
listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream))
def stream_by_id(self, dst_id):
return self._stream_by_id.get(dst_id,
self._stream_by_id.get(mitogen.parent_id))
def del_handler(self, handle):
del self._handle_map[handle]
@ -1501,6 +1507,8 @@ class Broker(object):
class ExternalContext(object):
detached = False
def _on_broker_shutdown(self):
self.channel.close()
@ -1514,8 +1522,34 @@ class ExternalContext(object):
self.broker.shutdown()
def _on_parent_disconnect(self):
_v and LOG.debug('%r: parent stream is gone, dying.', self)
self.broker.shutdown()
if self.detached:
mitogen.parent_ids = []
mitogen.parent_id = None
LOG.info('Detachment complete')
else:
_v and LOG.debug('%r: parent stream is gone, dying.', self)
self.broker.shutdown()
def _sync(self, func):
latch = Latch()
self.broker.defer(lambda: latch.put(func()))
return latch.get()
def detach(self):
self.detached = True
stream = self.router.stream_by_id(mitogen.parent_id)
if stream: # not double-detach()'d
os.setsid()
self.parent.send_await(Message(handle=DETACHING))
LOG.info('Detaching from %r; parent is %s', stream, self.parent)
for x in range(20):
pending = self._sync(lambda: stream.pending_bytes())
if not pending:
break
time.sleep(0.05)
if pending:
LOG.error('Stream had %d bytes after 2000ms', pending)
self.broker.defer(stream.on_disconnect, self.broker)
def _setup_master(self, max_message_size, profiling, parent_id,
context_id, in_fd, out_fd):

@ -36,6 +36,8 @@ LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
container = None
image = None
username = None

@ -81,6 +81,8 @@ def handle_child_crash():
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = True
#: Reference to the importer, if any, recovered from the parent.
importer = None

@ -36,6 +36,7 @@ LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
create_child_args = {
'merge_stdio': True
}

@ -36,6 +36,7 @@ LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
create_child_args = {
# If lxc-attach finds any of stdin, stdout, stderr connected to a TTY,
# to prevent input injection it creates a proxy pty, forcing all IO to

@ -690,6 +690,11 @@ class Router(mitogen.parent.Router):
self.responder = ModuleResponder(self)
self.log_forwarder = LogForwarder(self)
self.route_monitor = mitogen.parent.RouteMonitor(router=self)
self.add_handler( # TODO: cutpaste.
fn=self._on_detaching,
handle=mitogen.core.DETACHING,
persist=True,
)
def enable_debug(self):
mitogen.core.enable_debug_logging()

@ -599,12 +599,23 @@ class Stream(mitogen.core.Stream):
)
)
#: If :data:`True`, indicates the subprocess managed by us should not be
#: killed during graceful detachment, as it the actual process implementing
#: the child context. In all other cases, the subprocess is SSH, sudo, or a
#: similar tool that should be reminded to quit during disconnection.
child_is_immediate_subprocess = True
detached = False
_reaped = False
def _reap_child(self):
"""
Reap the child process during disconnection.
"""
if self.detached and self.child_is_immediate_subprocess:
LOG.debug('%r: immediate child is detached, won\'t reap it', self)
return
if self._reaped:
# on_disconnect() may be invoked more than once, for example, if
# there is still a pending message to be sent after the first
@ -929,10 +940,22 @@ class Router(mitogen.core.Router):
importer=importer,
)
self.route_monitor = RouteMonitor(self, parent)
self.add_handler(
fn=self._on_detaching,
handle=mitogen.core.DETACHING,
persist=True,
)
def stream_by_id(self, dst_id):
return self._stream_by_id.get(dst_id,
self._stream_by_id.get(mitogen.parent_id))
def _on_detaching(self, msg):
if msg.is_dead:
return
stream = self.stream_by_id(msg.src_id)
if stream.remote_id != msg.src_id or stream.detached:
LOG.warning('bad DETACHING received on %r: %r', stream, msg)
return
LOG.debug('%r: marking as detached', stream)
stream.detached = True
msg.reply(None)
def add_route(self, target_id, stream):
LOG.debug('%r.add_route(%r, %r)', self, target_id, stream)
@ -974,6 +997,8 @@ class Router(mitogen.core.Router):
self._context_by_id[context_id] = context
return context
connection_timeout_msg = "Connection timed out."
def _connect(self, klass, name=None, **kwargs):
context_id = self.allocate_id()
context = self.context_class(self, context_id)
@ -982,7 +1007,11 @@ class Router(mitogen.core.Router):
stream = klass(self, context_id, **kwargs)
if name is not None:
stream.name = name
stream.connect()
try:
stream.connect()
except mitogen.core.TimeoutError:
e = sys.exc_info()[1]
raise mitogen.core.StreamError(self.connection_timeout_msg)
context.name = stream.name
self.route_monitor.notice_stream(stream)
self.register(context, stream)

@ -330,7 +330,10 @@ class Pool(object):
thread.start()
self._threads.append(thread)
closed = False
def stop(self):
self.closed = True
self._select.close()
for th in self._threads:
th.join()
@ -338,7 +341,7 @@ class Pool(object):
service.on_shutdown()
def _worker_run(self):
while True:
while not self.closed:
try:
msg = self._select.get()
except (mitogen.core.ChannelError, mitogen.core.LatchError):

@ -105,6 +105,8 @@ def get_machinectl_pid(path, name):
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
container = None
username = None
kind = None

@ -59,6 +59,7 @@ class HostKeyError(mitogen.core.StreamError):
class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
python_path = 'python2.7'
#: Once connected, points to the corresponding TtyLogStream, allowing it to
@ -154,8 +155,8 @@ class Stream(mitogen.parent.Stream):
'configuration.'
)
hostkey_failed_msg = (
'check_host_keys is set to enforce, and SSH reported an unknown '
'or changed host key.'
'Host key checking is enabled, and SSH reported an unrecognized or '
'mismatching host key.'
)
def _host_key_prompt(self):

@ -46,6 +46,7 @@ class Stream(mitogen.parent.Stream):
# for hybrid_tty_create_child(), there just needs to be either a shell
# snippet or bootstrap support for fixing things up afterwards.
create_child = staticmethod(mitogen.parent.tty_create_child)
child_is_immediate_subprocess = False
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.

@ -104,6 +104,7 @@ class PasswordError(mitogen.core.StreamError):
class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.

@ -71,13 +71,14 @@ def log_to_file(path=None, io=False, usec=False, level='INFO'):
fp = sys.stderr
level = os.environ.get('MITOGEN_LOG_LEVEL', level).upper()
level = getattr(logging, level, logging.INFO)
log.setLevel(level)
io = ('MITOGEN_LOG_IO' in os.environ) or io
io = level == 'IO'
if io:
level = 'DEBUG'
logging.getLogger('mitogen.io').setLevel(level)
level = getattr(logging, level, logging.INFO)
log.setLevel(level)
handler = logging.StreamHandler(fp)
handler.formatter = log_get_formatter(usec=usec)
log.handlers.insert(0, handler)

@ -8,6 +8,9 @@ library = lib/modules
retry_files_enabled = False
forks = 50
# Required by integration/ssh/timeouts.yml
timeout = 10
# On Travis, paramiko check fails due to host key checking enabled.
host_key_checking = False

@ -3,11 +3,12 @@
# This playbook imports all tests that are known to work at present.
#
- import_playbook: action/all.yml
- import_playbook: async/all.yml
- import_playbook: become/all.yml
- import_playbook: connection_loader/all.yml
- import_playbook: context_service/all.yml
- import_playbook: playbook_semantics/all.yml
- import_playbook: remote_tmp/all.yml
- import_playbook: runner/all.yml
#- import_playbook: action/all.yml
#- import_playbook: async/all.yml
#- import_playbook: become/all.yml
#- import_playbook: connection_loader/all.yml
#- import_playbook: context_service/all.yml
#- import_playbook: playbook_semantics/all.yml
#- import_playbook: remote_tmp/all.yml
#- import_playbook: runner/all.yml
- import_playbook: ssh/all.yml

@ -14,8 +14,7 @@
vars:
ansible_become_pass: readonly_homedir_password
- debug: msg={{out}}
- name: Verify system temp directory was used.
assert:
that:
- out.argv[0].startswith("/tmp/ansible_mitogen_")
- out.__file__.startswith("/tmp/ansible_mitogen_")

@ -12,4 +12,3 @@
- import_playbook: custom_python_want_json_module.yml
- import_playbook: custom_script_interpreter.yml
- import_playbook: forking_behaviour.yml
- import_playbook: remote_tmp.yml

@ -1,5 +1,5 @@
- name: integration/runner__builtin_command_module.yml
- name: integration/runner/builtin_command_module.yml
hosts: test-targets
any_errors_fatal: true
gather_facts: true

@ -1,4 +1,4 @@
- name: integration/runner__custom_bash_old_style_module.yml
- name: integration/runner/custom_bash_old_style_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,4 +1,4 @@
- name: integration/runner__custom_bash_want_json_module.yml
- name: integration/runner/custom_bash_want_json_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,4 +1,4 @@
- name: integration/runner__custom_binary_producing_json.yml
- name: integration/runner/custom_binary_producing_json.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,4 +1,4 @@
- name: integration/runner__custom_binary_producing_junk.yml
- name: integration/runner/custom_binary_producing_junk.yml
hosts: test-targets
tasks:
- custom_binary_producing_junk:

@ -1,4 +1,4 @@
- name: integration/runner__custom_binary_single_null.yml
- name: integration/runner/custom_binary_single_null.yml
hosts: test-targets
tasks:
- custom_binary_single_null:

@ -1,4 +1,4 @@
- name: integration/runner__custom_perl_json_args_module.yml
- name: integration/runner/custom_perl_json_args_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,4 +1,4 @@
- name: integration/runner__custom_perl_want_json_module.yml
- name: integration/runner/custom_perl_want_json_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,4 +1,4 @@
- name: integration/runner__custom_python_json_args_module.yml
- name: integration/runner/custom_python_json_args_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,5 +1,5 @@
- name: integration/runner__custom_python_new_style_module.yml
- name: integration/runner/custom_python_new_style_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,4 +1,4 @@
- name: integration/runner__custom_python_new_style_module.yml
- name: integration/runner/custom_python_new_style_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,4 +1,4 @@
- name: integration/runner__custom_python_want_json_module.yml
- name: integration/runner/custom_python_want_json_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:

@ -1,18 +0,0 @@
#
# The ansible.cfg remote_tmp setting should be copied to the target and used
# when generating temporary paths created by the runner.py code executing
# remotely.
#
- name: integration/runner__remote_tmp.yml
hosts: test-targets
any_errors_fatal: true
gather_facts: true
tasks:
- bash_return_paths:
register: output
- assert:
that: output.argv0.startswith('%s/.ansible/mitogen-tests/' % ansible_user_dir)
- assert:
that: output.argv1.startswith('%s/.ansible/mitogen-tests/' % ansible_user_dir)

@ -0,0 +1 @@
- import_playbook: timeouts.yml

@ -0,0 +1,20 @@
# Ensure 'ssh' connections time out correctly.
- name: integration/ssh/timeouts.yml
hosts: test-targets
tasks:
- connection: local
command: ansible -vvv -i "{{inventory_file}}" test-targets -m custom_python_detect_environment -e ansible_user=mitogen__slow_user -e ansible_password=slow_user_password
register: out
ignore_errors: true
when: is_mitogen
- assert:
that:
- |
'"changed": false' in out.stdout
- |
'"unreachable": true' in out.stdout
- |
'"msg": "Connection timed out."' in out.stdout
when: is_mitogen

@ -6,6 +6,7 @@ required for reliable LRU tests.
import traceback
import sys
import ansible_mitogen.connection
import ansible_mitogen.services
import mitogen.service
@ -15,9 +16,10 @@ from ansible.plugins.action import ActionBase
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
if not type(self._connection).__module__.startswith('ansible_mitogen'):
if not isinstance(self._connection,
ansible_mitogen.connection.Connection):
return {
'changed': False
'skipped': True,
}
self._connection._connect()

@ -14,6 +14,7 @@ def main():
module = AnsibleModule(argument_spec={})
module.exit_json(
argv=sys.argv,
__file__=__file__,
argv_types=[str(type(s)) for s in sys.argv],
env=dict(os.environ),
cwd=os.getcwd(),

@ -11,16 +11,10 @@ def usage():
sys.stderr.write('Usage: %s <input.json>\n' % (sys.argv[0],))
sys.exit(1)
# Also must slurp in our own source code, to verify the encoding string was
# added.
with open(sys.argv[0]) as fp:
me = fp.read()
input_json = sys.stdin.read()
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"source\": [%s]," % (json.dumps(me),)
print " \"input\": [%s]" % (input_json,)
print "}"

@ -12,11 +12,6 @@ def usage():
sys.stderr.write('Usage: %s <input.json>\n' % (sys.argv[0],))
sys.exit(1)
# Also must slurp in our own source code, to verify the encoding string was
# added.
with open(sys.argv[0]) as fp:
me = fp.read()
input_json = sys.stdin.read()
print "{"
@ -27,6 +22,5 @@ print " \"__file__\": \"%s\"," % (__file__,)
# Python sets this during a regular import.
print " \"__package__\": \"%s\"," % (__package__,)
print " \"msg\": \"Here is my input\","
print " \"source\": [%s]," % (json.dumps(me),)
print " \"input\": [%s]" % (input_json,)
print "}"

@ -29,6 +29,7 @@
- require_tty
- pw_required
- require_tty_pw_required
- slow_user
when: ansible_system != 'Darwin'
- name: Create Mitogen test users
@ -52,6 +53,7 @@
- pw_required
- require_tty_pw_required
- readonly_homedir
- slow_user
when: ansible_system == 'Darwin'
- name: Create Mitogen test users
@ -88,6 +90,14 @@
- name: Readonly homedir for one account
shell: "chown -R root: ~mitogen__readonly_homedir"
- name: Slow bash profile for one account
copy:
dest: ~mitogen__slow_user/.{{item}}
src: ../data/docker/mitogen__slow_user.profile
with_items:
- bashrc
- profile
- name: Require a TTY for two accounts
lineinfile:
path: /etc/sudoers

@ -47,8 +47,10 @@ RUN \
useradd -s /bin/bash -m mitogen__require_tty && \
useradd -s /bin/bash -m mitogen__require_tty_pw_required && \
useradd -s /bin/bash -m mitogen__readonly_homedir && \
useradd -s /bin/bash -m mitogen__slow_user && \
chown -R root: ~mitogen__readonly_homedir && \
{ for i in `seq 1 21`; do useradd -s /bin/bash -m mitogen__user$i; done; } && \
{ for i in `seq 1 21`; do echo mitogen__user$i:user$i_password | chpasswd; } && \
( echo 'root:rootpassword' | chpasswd; ) && \
( echo 'mitogen__has_sudo:has_sudo_password' | chpasswd; ) && \
( echo 'mitogen__has_sudo_pubkey:has_sudo_pubkey_password' | chpasswd; ) && \
@ -58,10 +60,14 @@ RUN \
( echo 'mitogen__require_tty:require_tty_password' | chpasswd; ) && \
( echo 'mitogen__require_tty_pw_required:require_tty_pw_required_password' | chpasswd; ) && \
( echo 'mitogen__readonly_homedir:readonly_homedir_password' | chpasswd; ) && \
( echo 'mitogen__slow_user:slow_user_password' | chpasswd; ) && \
mkdir ~mitogen__has_sudo_pubkey/.ssh && \
{ echo '#!/bin/bash\nexec strace -ff -o /tmp/pywrap$$.trace python2.7 "$@"' > /usr/local/bin/pywrap; chmod +x /usr/local/bin/pywrap; }
COPY data/docker/mitogen__has_sudo_pubkey.key.pub /home/mitogen__has_sudo_pubkey/.ssh/authorized_keys
COPY data/docker/mitogen__slow_user.profile /home/mitogen__slow_user/.profile
COPY data/docker/mitogen__slow_user.profile /home/mitogen__slow_user/.bashrc
RUN \
chown -R mitogen__has_sudo_pubkey ~mitogen__has_sudo_pubkey && \
chmod -R go= ~mitogen__has_sudo_pubkey
@ -93,6 +99,6 @@ for (distro, wheel, prefix) in (('debian', 'sudo', DEBIAN_DOCKERFILE),
subprocess.check_call(sh('docker build %s -t %s -f %s',
mydir,
'd2mw/mitogen-%s-test' % (distro,),
'mitogen/%s-test' % (distro,),
dockerfile_fp.name
))

@ -0,0 +1,3 @@
# mitogen__slow_user takes forever to log in.
sleep 30

@ -175,7 +175,7 @@ class DockerizedSshDaemon(object):
def get_image(self):
if not self.image:
distro = os.environ.get('MITOGEN_TEST_DISTRO', 'debian')
self.image = 'd2mw/mitogen-%s-test' % (distro,)
self.image = 'mitogen/%s-test' % (distro,)
return self.image
def __init__(self):

Loading…
Cancel
Save