Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  tests: add new compression parameter to mitogen_get_stack results
  tests: disable affinity_test on Travis :/
  issue #508: fix responder stats test due to new smaller parent.py.
  issue #508: tests: skip minify_test Py2.4/2.5 for profiler.py.
  tests: fix fallout from 36fb318adf5c56e729296c3efce84f4dd75ced4e
  issue #520: add AIX auth failure string to su.
  tests: move affinity_test to Ansible tests.
  core: cProfile is not available in 2.4.
  issue #505: docs: add new detail graph for one scenario.
  docs: update and re-record profile graphs in docs; closes #505
  service: fix PushFileService exception
  tests: pad out localhost-*
  service: start pool shutdown on broker shutdown.
  master: .encode() needed for Py3.
  ansible: stash PID files in CWD if requested for debugging.
  issue #508: master: minify_safe_re must be bytes for Py3.
  bench: tidy up and cpu-pin some more files.
  tests: add localhost-x100
  ansible: double the default pool size.
  ansible: raise error with correct exception type.
  issue #508: master: minify all Mitogen/ansible_mitogen sources.
  parent: PartialZlib docstrings.
  ansible: hacky parser to alow bools to be specified on command line
  parent: pre-cache bootstrap if possible.
  docs: update Changelog.
  ansible: add mitogen_ssh_compression variable.
  service: PushFileService never recorded a file as sent.
  parent: synchronize get_core_source()
  service: use correct profile aggregation name.
  SyntaxError.
  ansible: don't pin controller if <4 cores.
  tests: make soak testing work reliably on vanilla.
  docs: changelog tidyups.
  ansible: document and make affinity stuff portable to non-Linux
  ansible: fix affinity.py test failure on 2 cores.
  ansible: preheat PluginLoader caches before fork.
  tests: make mitogen_shutdown_all be run_once by default.
  docs: update Changelog.
  ansible: use Poller for WorkerProcess; closes #491.
  ansible: new multiplexer/workers configuration
  docs: update Changelog.
  docs: update Changelog.
  ansible: pin connection multiplexer to a single core
  utils: pad out reset_affinity() and integrate with detach_popen()
  utils: import reset_affinity() function.
  master: set Router.profiling if MITOGEN_PROFILING variable present.
  parent: don't kill children when profiling is active.
  ansible: hook strategy and worker processes into profiler
  profiler: import from linear2 branch
  core: tidy up existing profiling code and support MITOGEN_PROFILE_FMT
  issue #260: redundant if statement.
  ansible: ensure MuxProcess MITOGEN_PROFILING results reach disk.
  ansible/bench: make end= configurable.
  master: cache sent/forwarded module names
pull/564/head
David Wilson 5 years ago
commit 1345f05627

@ -0,0 +1,241 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
As Mitogen separates asynchronous IO out to a broker thread, communication
necessarily involves context switching and waking that thread. When application
threads and the broker share a CPU, this can be almost invisibly fast - around
25 microseconds for a full A->B->A round-trip.
However when threads are scheduled on different CPUs, round-trip delays
regularly vary wildly, and easily into milliseconds. Many contributing factors
exist, not least scenarios like:
1. A is preempted immediately after waking B, but before releasing the GIL.
2. B wakes from IO wait only to immediately enter futex wait.
3. A may wait 10ms or more for another timeslice, as the scheduler on its CPU
runs threads unrelated to its transaction (i.e. not B), wake only to release
its GIL, before entering IO sleep waiting for a reply from B, which cannot
exist yet.
4. B wakes, acquires GIL, performs work, and sends reply to A, causing it to
wake. B is preempted before releasing GIL.
5. A wakes from IO wait only to immediately enter futex wait.
6. B may wait 10ms or more for another timeslice, wake only to release its GIL,
before sleeping again.
7. A wakes, acquires GIL, finally receives reply.
Per above if we are unlucky, on an even moderately busy machine it is possible
to lose milliseconds just in scheduling delay, and the effect is compounded
when pairs of threads in process A are communicating with pairs of threads in
process B using the same scheme, such as when Ansible WorkerProcess is
communicating with ContextService in the connection multiplexer. In the worst
case it could involve 4 threads working in lockstep spread across 4 busy CPUs.
Since multithreading in Python is essentially useless except for waiting on IO
due to the presence of the GIL, at least in Ansible there is no good reason for
threads in the same process to run on distinct CPUs - they always operate in
lockstep due to the GIL, and are thus vulnerable to issues like above.
Linux lacks any natural API to describe what we want, it only permits
individual threads to be constrained to run on specific CPUs, and for that
constraint to be inherited by new threads and forks of the constrained thread.
This module therefore implements a CPU pinning policy for Ansible processes,
providing methods that should be called early in any new process, either to
rebalance which CPU it is pinned to, or in the case of subprocesses, to remove
the pinning entirely. It is likely to require ongoing tweaking, since pinning
necessarily involves preventing the scheduler from making load balancing
decisions.
"""
import ctypes
import mmap
import multiprocessing
import os
import struct
import mitogen.parent
try:
_libc = ctypes.CDLL(None, use_errno=True)
_strerror = _libc.strerror
_strerror.restype = ctypes.c_char_p
_pthread_mutex_init = _libc.pthread_mutex_init
_pthread_mutex_lock = _libc.pthread_mutex_lock
_pthread_mutex_unlock = _libc.pthread_mutex_unlock
_sched_setaffinity = _libc.sched_setaffinity
except (OSError, AttributeError):
_libc = None
_strerror = None
_pthread_mutex_init = None
_pthread_mutex_lock = None
_pthread_mutex_unlock = None
_sched_setaffinity = None
class pthread_mutex_t(ctypes.Structure):
"""
Wrap pthread_mutex_t to allow storing a lock in shared memory.
"""
_fields_ = [
('data', ctypes.c_uint8 * 512),
]
def init(self):
if _pthread_mutex_init(self.data, 0):
raise Exception(_strerror(ctypes.get_errno()))
def acquire(self):
if _pthread_mutex_lock(self.data):
raise Exception(_strerror(ctypes.get_errno()))
def release(self):
if _pthread_mutex_unlock(self.data):
raise Exception(_strerror(ctypes.get_errno()))
class State(ctypes.Structure):
"""
Contents of shared memory segment. This allows :meth:`Manager.assign` to be
called from any child, since affinity assignment must happen from within
the context of the new child process.
"""
_fields_ = [
('lock', pthread_mutex_t),
('counter', ctypes.c_uint8),
]
class Policy(object):
"""
Process affinity policy.
"""
def assign_controller(self):
"""
Assign the Ansible top-level policy to this process.
"""
def assign_muxprocess(self):
"""
Assign the MuxProcess policy to this process.
"""
def assign_worker(self):
"""
Assign the WorkerProcess policy to this process.
"""
def assign_subprocess(self):
"""
Assign the helper subprocess policy to this process.
"""
class LinuxPolicy(Policy):
"""
:class:`Policy` for Linux machines. The scheme here was tested on an
otherwise idle 16 thread machine.
- The connection multiplexer is pinned to CPU 0.
- The Ansible top-level (strategy) is pinned to CPU 1.
- WorkerProcesses are pinned sequentually to 2..N, wrapping around when no
more CPUs exist.
- Children such as SSH may be scheduled on any CPU except 0/1.
If the machine has less than 4 cores available, the top-level and workers
are pinned between CPU 2..N, i.e. no CPU is reserved for the top-level
process.
This could at least be improved by having workers pinned to independent
cores, before reusing the second hyperthread of an existing core.
A hook is installed that causes :meth:`reset` to run in the child of any
process created with :func:`mitogen.parent.detach_popen`, ensuring
CPU-intensive children like SSH are not forced to share the same core as
the (otherwise potentially very busy) parent.
"""
def __init__(self):
self.mem = mmap.mmap(-1, 4096)
self.state = State.from_buffer(self.mem)
self.state.lock.init()
if self._cpu_count() < 4:
self._reserve_mask = 3
self._reserve_shift = 2
self._reserve_controller = True
else:
self._reserve_mask = 1
self._reserve_shift = 1
self._reserve_controller = False
def _set_affinity(self, mask):
mitogen.parent._preexec_hook = self._clear
s = struct.pack('L', mask)
_sched_setaffinity(os.getpid(), len(s), s)
def _cpu_count(self):
return multiprocessing.cpu_count()
def _balance(self):
self.state.lock.acquire()
try:
n = self.state.counter
self.state.counter += 1
finally:
self.state.lock.release()
self._set_cpu(self._reserve_shift + (
(n % max(1, (self._cpu_count() - self._reserve_shift)))
))
def _set_cpu(self, cpu):
self._set_affinity(1 << cpu)
def _clear(self):
self._set_affinity(0xffffffff & ~self._reserve_mask)
def assign_controller(self):
if self._reserve_controller:
self._set_cpu(1)
else:
self._balance()
def assign_muxprocess(self):
self._set_cpu(0)
def assign_worker(self):
self._balance()
def assign_subprocess(self):
self._clear()
if _sched_setaffinity is not None:
policy = LinuxPolicy()
else:
policy = Policy()

@ -44,6 +44,7 @@ import ansible.errors
import ansible.plugins.connection
import ansible.utils.shlex
import mitogen.core
import mitogen.fork
import mitogen.unix
import mitogen.utils
@ -69,6 +70,27 @@ def optional_int(value):
return None
def convert_bool(obj):
if isinstance(obj, bool):
return obj
if str(obj).lower() in ('no', 'false', '0'):
return False
if str(obj).lower() not in ('yes', 'true', '1'):
raise ansible.errors.AnsibleConnectionFailure(
'expected yes/no/true/false/0/1, got %r' % (obj,)
)
return True
def default(value, default):
"""
Return `default` is `value` is :data:`None`, otherwise return `value`.
"""
if value is None:
return default
return value
def _connect_local(spec):
"""
Return ContextService arguments for a local connection.
@ -102,6 +124,9 @@ def _connect_ssh(spec):
'check_host_keys': check_host_keys,
'hostname': spec.remote_addr(),
'username': spec.remote_user(),
'compression': convert_bool(
default(spec.mitogen_ssh_compression(), True)
),
'password': spec.password(),
'port': spec.port(),
'python_path': spec.python_path(),
@ -340,12 +365,21 @@ CONNECTION_METHOD = {
}
class Broker(mitogen.master.Broker):
"""
WorkerProcess maintains at most 2 file descriptors, therefore does not need
the exuberant syscall expense of EpollPoller, so override it and restore
the poll() poller.
"""
poller_class = mitogen.core.Poller
class CallChain(mitogen.parent.CallChain):
"""
Extend :class:`mitogen.parent.CallChain` to additionally cause the
associated :class:`Connection` to be reset if a ChannelError occurs.
This only catches failures that occur while a call is pnding, it is a
This only catches failures that occur while a call is pending, it is a
stop-gap until a more general method is available to notice connection in
every situation.
"""

@ -56,6 +56,7 @@ import ansible_mitogen.logging
import ansible_mitogen.services
from mitogen.core import b
import ansible_mitogen.affinity
LOG = logging.getLogger(__name__)
@ -93,6 +94,24 @@ def getenv_int(key, default=0):
return default
def save_pid(name):
"""
When debugging and profiling, it is very annoying to poke through the
process list to discover the currently running Ansible and MuxProcess IDs,
especially when trying to catch an issue during early startup. So here, if
a magic environment variable set, stash them in hidden files in the CWD::
alias muxpid="cat .ansible-mux.pid"
alias anspid="cat .ansible-controller.pid"
gdb -p $(muxpid)
perf top -p $(anspid)
"""
if os.environ.get('MITOGEN_SAVE_PIDS'):
with open('.ansible-%s.pid' % (name,), 'w') as fp:
fp.write(str(os.getpid()))
class MuxProcess(object):
"""
Implement a subprocess forked from the Ansible top-level, as a safe place
@ -163,7 +182,8 @@ class MuxProcess(object):
mitogen.core.set_cloexec(cls.worker_sock.fileno())
mitogen.core.set_cloexec(cls.child_sock.fileno())
if os.environ.get('MITOGEN_PROFILING'):
cls.profiling = os.environ.get('MITOGEN_PROFILING') is not None
if cls.profiling:
mitogen.core.enable_profiling()
cls.original_env = dict(os.environ)
@ -171,10 +191,14 @@ class MuxProcess(object):
if _init_logging:
ansible_mitogen.logging.setup()
if cls.child_pid:
save_pid('controller')
ansible_mitogen.affinity.policy.assign_controller()
cls.child_sock.close()
cls.child_sock = None
mitogen.core.io_op(cls.worker_sock.recv, 1)
else:
save_pid('mux')
ansible_mitogen.affinity.policy.assign_muxprocess()
cls.worker_sock.close()
cls.worker_sock = None
self = cls()
@ -270,7 +294,7 @@ class MuxProcess(object):
ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.ModuleDepService(self.router),
],
size=getenv_int('MITOGEN_POOL_SIZE', default=16),
size=getenv_int('MITOGEN_POOL_SIZE', default=32),
)
LOG.debug('Service pool configured: size=%d', self.pool.size)
@ -281,7 +305,9 @@ class MuxProcess(object):
then cannot clean up pending handlers, which is required for the
threads to exit gracefully.
"""
self.pool.stop(join=False)
# In normal operation we presently kill the process because there is
# not yet any way to cancel connect().
self.pool.stop(join=self.profiling)
def on_broker_exit(self):
"""
@ -289,10 +315,9 @@ class MuxProcess(object):
ourself. In future this should gracefully join the pool, but TERM is
fine for now.
"""
if os.environ.get('MITOGEN_PROFILING'):
# TODO: avoid killing pool threads before they have written their
# .pstats. Really shouldn't be using kill() here at all, but hard
# to guarantee services can always be unblocked during shutdown.
time.sleep(1)
os.kill(os.getpid(), signal.SIGTERM)
if not self.profiling:
# In normal operation we presently kill the process because there is
# not yet any way to cancel connect(). When profiling, threads
# including the broker must shut down gracefully, otherwise pstats
# won't be written.
os.kill(os.getpid(), signal.SIGTERM)

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
These classes implement execution for each style of Ansible module. They are
instantiated in the target context by way of target.py::run_module().

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
Classes in this file define Mitogen 'services' that run (initially) within the
connection multiplexer process that is forked off the top-level controller

@ -28,12 +28,17 @@
from __future__ import absolute_import
import os
import signal
import threading
import mitogen.core
import ansible_mitogen.affinity
import ansible_mitogen.loaders
import ansible_mitogen.mixins
import ansible_mitogen.process
import ansible.executor.process.worker
def _patch_awx_callback():
"""
@ -93,6 +98,22 @@ def wrap_connection_loader__get(name, *args, **kwargs):
return connection_loader__get(name, *args, **kwargs)
def wrap_worker__run(*args, **kwargs):
"""
While the strategy is active, rewrite connection_loader.get() calls for
some transports into requests for a compatible Mitogen transport.
"""
# Ignore parent's attempts to murder us when we still need to write
# profiling output.
if mitogen.core._profile_hook.__name__ != '_profile_hook':
signal.signal(signal.SIGTERM, signal.SIG_IGN)
ansible_mitogen.affinity.policy.assign_worker()
return mitogen.core._profile_hook('WorkerProcess',
lambda: worker__run(*args, **kwargs)
)
class StrategyMixin(object):
"""
This mix-in enhances any built-in strategy by arranging for various Mitogen
@ -167,12 +188,17 @@ class StrategyMixin(object):
connection_loader__get = ansible_mitogen.loaders.connection_loader.get
ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get
global worker__run
worker__run = ansible.executor.process.worker.WorkerProcess.run
ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run
def _remove_wrappers(self):
"""
Uninstall the PluginLoader monkey patches.
"""
ansible_mitogen.loaders.action_loader.get = action_loader__get
ansible_mitogen.loaders.connection_loader.get = connection_loader__get
ansible.executor.process.worker.WorkerProcess.run = worker__run
def _add_plugin_paths(self):
"""
@ -187,15 +213,44 @@ class StrategyMixin(object):
os.path.join(base_dir, 'action')
)
def _queue_task(self, host, task, task_vars, play_context):
"""
Many PluginLoader caches are defective as they are only populated in
the ephemeral WorkerProcess. Touch each plug-in path before forking to
ensure all workers receive a hot cache.
"""
ansible_mitogen.loaders.module_loader.find_plugin(
name=task.action,
mod_type='',
)
ansible_mitogen.loaders.connection_loader.get(
name=play_context.connection,
class_only=True,
)
ansible_mitogen.loaders.action_loader.get(
name=task.action,
class_only=True,
)
return super(StrategyMixin, self)._queue_task(
host=host,
task=task,
task_vars=task_vars,
play_context=play_context,
)
def run(self, iterator, play_context, result=0):
"""
Arrange for a mitogen.master.Router to be available for the duration of
the strategy's real run() method.
"""
ansible_mitogen.process.MuxProcess.start()
run = super(StrategyMixin, self).run
self._add_plugin_paths()
self._install_wrappers()
try:
return super(StrategyMixin, self).run(iterator, play_context)
return mitogen.core._profile_hook('Strategy',
lambda: run(iterator, play_context)
)
finally:
self._remove_wrappers()

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
Helper functions intended to be executed on the target. These are entrypoints
for file transfer, module execution and sundry bits like changing file modes.

@ -273,6 +273,12 @@ class Spec(with_metaclass(abc.ABCMeta, object)):
The SSH debug level.
"""
@abc.abstractmethod
def mitogen_ssh_compression(self):
"""
Whether SSH compression is enabled.
"""
@abc.abstractmethod
def extra_args(self):
"""
@ -398,6 +404,9 @@ class PlayContextSpec(Spec):
def mitogen_ssh_debug_level(self):
return self._connection.get_task_var('mitogen_ssh_debug_level')
def mitogen_ssh_compression(self):
return self._connection.get_task_var('mitogen_ssh_compression')
def extra_args(self):
return self._connection.get_extra_args()
@ -577,5 +586,8 @@ class MitogenViaSpec(Spec):
def mitogen_ssh_debug_level(self):
return self._host_vars.get('mitogen_ssh_debug_level')
def mitogen_ssh_compression(self):
return self._host_vars.get('mitogen_ssh_compression')
def extra_args(self):
return [] # TODO

@ -179,7 +179,7 @@ Noteworthy Differences
practice, and light web searches failed to reveal many examples of them.
* Ansible permits up to ``forks`` connections to be setup in parallel, whereas
in Mitogen this is handled by a fixed-size thread pool. Up to 16 connections
in Mitogen this is handled by a fixed-size thread pool. Up to 32 connections
may be established in parallel by default, this can be modified by setting
the ``MITOGEN_POOL_SIZE`` environment variable.
@ -901,6 +901,10 @@ except connection delegation is supported.
* ``ssh_args``, ``ssh_common_args``, ``ssh_extra_args``
* ``mitogen_ssh_debug_level``: integer between `0..3` indicating the SSH client
debug level. Ansible must also be run with '-vvv' to view the output.
* ``mitogen_ssh_compression``: :data:`True` to enable SSH compression,
otherwise :data:`False`. This will change to off by default in a future
release. If you are targetting many hosts on a fast network, please consider
disabling SSH compression.
Debugging
@ -1164,35 +1168,103 @@ FreeNode IRC network.
Sample Profiles
---------------
Local VM connection
~~~~~~~~~~~~~~~~~~~
The summaries below may be reproduced using data and scripts maintained in the
`pcaps branch <https://github.com/dw/mitogen/tree/pcaps/>`_. Traces were
recorded using Ansible 2.5.14.
Trivial Loop: Local Host
~~~~~~~~~~~~~~~~~~~~~~~~
This demonstrates Mitogen vs. SSH pipelining to the local machine running
`bench/loop-100-items.yml
<https://github.com/dw/mitogen/blob/master/tests/ansible/bench/loop-100-items.yml>`_,
executing a simple command 100 times. Most Ansible controller overhead is
isolated, characterizing just module executor and connection layer performance.
Mitogen requires **63x less bandwidth and 5.9x less time**.
.. image:: images/ansible/pcaps/loop-100-items-local.svg
Unlike in SSH pipelining where payloads are sent as a single compressed block,
by default Mitogen enables SSH compression for its uncompressed RPC data. In
many-host scenarios it may be desirable to disable compression. This has
negligible impact on footprint, since program code is separately compressed and
sent only once. Compression also benefits SSH pipelining, but the presence of
large precompressed per-task payloads may present a more significant CPU burden
during many-host runs.
.. image:: images/ansible/pcaps/loop-100-items-local-detail.svg
In a detailed trace, improved interaction with the host machine is visible. In
this playbook because no forks were required to start SSH clients from the
worker process executing the loop, the worker's memory was never marked
read-only, thus avoiding a major hidden performance problem - the page fault
rate is more than halved.
File Transfer: UK to France
~~~~~~~~~~~~~~~~~~~~~~~~~~~
This demonstrates Mitogen vs. connection pipelining to a local VM executing
``bench/loop-100-items.yml``, which simply executes ``hostname`` 100 times.
Mitogen requires **43x less bandwidth and 6.5x less time**.
`This playbook
<https://github.com/dw/mitogen/blob/master/tests/ansible/regression/issue_140__thread_pileup.yml>`_
was used to compare file transfer performance over a ~26 ms link. It uses the
``with_filetree`` loop syntax to copy a directory of 1,000 0-byte files to the
target.
.. raw:: html
<style>
.nojunk td,
.nojunk th { padding: 4px; font-size: 90%; text-align: right !important; }
table.docutils col {
width: auto !important;
}
</style>
.. csv-table::
:header: , Secs, CPU Secs, Sent, Received, Roundtrips
:class: nojunk
:align: right
Mitogen, 98.54, 43.04, "815 KiB", "447 KiB", 3.79
SSH Pipelining, "1,483.54", 329.37, "99,539 KiB", "6,870 KiB", 57.01
*Roundtrips* is the approximate number of network roundtrips required to
describe the runtime that was consumed. Due to Mitogen's built-in file transfer
support, continuous reinitialization of an external `scp`/`sftp` client is
avoided, permitting large ``with_filetree`` copies to become practical without
any special casing within the playbook or the Ansible implementation.
DebOps: UK to India
~~~~~~~~~~~~~~~~~~~
.. image:: images/ansible/run_hostname_100_times_mito.svg
.. image:: images/ansible/run_hostname_100_times_plain.svg
This is an all-green run of 246 tasks from the `DebOps
<https://docs.debops.org/en/master/>`_ 0.7.2 `common.yml
<https://github.com/debops/debops-playbooks/blob/master/playbooks/common.yml>`_
playbook over a ~370 ms link between the UK and India. The playbook touches a
wide variety of modules, many featuring unavoidable waits for slow computation
on the target.
More tasks of a wider variety are featured than previously, placing strain on
Mitogen's module loading and in-memory caching. By running over a long-distance
connection, it highlights behaviour of the connection layer in the presence of
high latency.
Kathmandu to Paris
~~~~~~~~~~~~~~~~~~
Mitogen requires **14.5x less bandwidth and 4x less time**.
This is a full Django application playbook over a ~180ms link between Kathmandu
and Paris. Aside from large pauses where the host performs useful work, the
high latency of this link means Mitogen only manages a 1.7x speedup.
.. image:: images/ansible/pcaps/debops-uk-india.svg
Many early roundtrips are due to inefficiencies in Mitogen's importer that will
be fixed over time, however the majority, comprising at least 10 seconds, are
due to idling while the host's previous result and next command are in-flight
on the network.
The initial extension lays groundwork for exciting structural changes to the
execution model: a future version will tackle latency head-on by delegating
some control flow to the target host, melding the performance and scalability
benefits of pull-based operation with the management simplicity of push-based
operation.
Django App: UK to India
~~~~~~~~~~~~~~~~~~~~~~~
.. image:: images/ansible/costapp.png
This short playbook features only 23 steps executed over the same ~370 ms link
as previously, with many steps running unavoidably expensive tasks like
building C++ code, and compiling static web site assets.
Despite the small margin for optimization, Mitogen still manages **6.2x less
bandwidth and 1.8x less time**.
.. image:: images/ansible/pcaps/costapp-uk-india.svg

@ -131,6 +131,10 @@ v0.2.4 (2018-??-??)
Mitogen for Ansible
~~~~~~~~~~~~~~~~~~~
This release includes a huge variety of important fixes and new optimizations.
It is 35% faster than 0.2.3 on a synthetic 64 target run that places heavy load
on the connection multiplexer.
Enhancements
^^^^^^^^^^^^
@ -138,7 +142,7 @@ Enhancements
`#351 <https://github.com/dw/mitogen/issues/351>`_,
`#352 <https://github.com/dw/mitogen/issues/352>`_: disconnect propagation
has improved, allowing Ansible to cancel waits for responses from abruptly
disconnected targets. This ensures a task will gracefully fail rather than
disconnected targets. This ensures a task will reliably fail rather than
hang, for example on network failure or EC2 instance maintenance.
* `#369 <https://github.com/dw/mitogen/issues/369>`_,
@ -154,20 +158,6 @@ Enhancements
``mitogen_host_pinned`` strategy wraps the ``host_pinned`` strategy
introduced in Ansible 2.7.
* `#412 <https://github.com/dw/mitogen/issues/412>`_: to simplify diagnosing
issues with connection configuration, Mitogen ships with a
``mitogen_get_stack`` action that is automatically added to the action
plug-in path. See :ref:`mitogen-get-stack` for more information.
* `#415 <https://github.com/dw/mitogen/issues/415>`_,
`#493 <https://github.com/dw/mitogen/issues/493>`_: the interface employed
for in-process queues changed from `kqueue
<https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2>`_ / `epoll
<http://man7.org/linux/man-pages/man7/epoll.7.html>`_ to `poll()
<http://man7.org/linux/man-pages/man2/poll.2.html>`_, which requires no setup
or teardown, yielding a 38% latency reduction for inter-thread communication.
This may manifest as a runtime improvement in many-host runs.
* `#477 <https://github.com/dw/mitogen/issues/477>`_: Python 2.4 is fully
supported by the core library and tested automatically, in any parent/child
combination of 2.4, 2.6, 2.7 and 3.6 interpreters.
@ -178,6 +168,50 @@ Enhancements
managed with Mitogen. The ``simplejson`` package need not be installed on
such targets, as is usually required by Ansible.
* `#412 <https://github.com/dw/mitogen/issues/412>`_: to simplify diagnosing
connection configuration problems, Mitogen ships a ``mitogen_get_stack``
action that is automatically added to the action plug-in path. See
:ref:`mitogen-get-stack` for more information.
* `152effc2 <https://github.com/dw/mitogen/commit/152effc2>`_,
`bd4b04ae <https://github.com/dw/mitogen/commit/bd4b04ae>`_: a CPU affinity
policy was added for Linux controllers, reducing latency and SMP overhead on
hot paths exercised for every task. This yielded a 19% speedup in a 64-target
job composed of many short tasks, and should easily be visible as a runtime
improvement in many-host runs.
* `2b44d598 <https://github.com/dw/mitogen/commit/2b44d598>`_: work around a
defective caching mechanism by pre-heating it before spawning workers. This
saves 40% runtime on a synthetic repetitive task.
* `0979422a <https://github.com/dw/mitogen/commit/0979422a>`_: an expensive
dependency scanning step was redundantly invoked for every task,
bottlenecking the connection multiplexer.
* `eaa990a97 <https://github.com/dw/mitogen/commit/eaa990a97>`_: a new
``mitogen_ssh_compression`` variable is supported, allowing Mitogen's default
SSH compression to be disabled. SSH compression is a large contributor to CPU
usage in many-target runs, and severely limits file transfer. On a `"shell:
hostname"` task repeated 500 times, Mitogen requires around 800 bytes per
task with compression, rising to 3 KiB without. File transfer throughput
rises from ~25MiB/s when enabled to ~200MiB/s when disabled.
* `#260 <https://github.com/dw/mitogen/issues/260>`_,
`a18a083c <https://github.com/dw/mitogen/commit/a18a083c>`_: brokers no
longer wait for readiness indication to transmit, and instead assume
transmission will succeed. As this is usually true, one loop iteration and
two poller reconfigurations are avoided, yielding a significant reduction in
interprocess round-trip latency.
* `#415 <https://github.com/dw/mitogen/issues/415>`_,
`#491 <https://github.com/dw/mitogen/issues/491>`_,
`#493 <https://github.com/dw/mitogen/issues/493>`_: the interface employed
for in-process queues changed from `kqueue
<https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2>`_ / `epoll
<http://man7.org/linux/man-pages/man7/epoll.7.html>`_ to `poll()
<http://man7.org/linux/man-pages/man2/poll.2.html>`_, which requires no setup
or teardown, yielding a 38% latency reduction for inter-thread communication.
Fixes
^^^^^
@ -279,6 +313,10 @@ Fixes
when starting async tasks, where it was possible for the controller to
observe no status file on disk before the task had a chance to write one.
* `2c7af9f04 <https://github.com/dw/mitogen/commit/2c7af9f04>`_: Ansible
modules were repeatedly re-transferred. The bug was hidden by the previously
mandatorily enabled SSH compression.
Core Library
~~~~~~~~~~~~

Before

Width:  |  Height:  |  Size: 8.0 KiB

After

Width:  |  Height:  |  Size: 8.0 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 60 KiB

@ -1,3 +1,3 @@
*.pcapng filter=lfs diff=lfs merge=lfs -text
**pcap** filter=lfs diff=lfs merge=lfs -text
run_hostname_100_times_mito.pcap.gz filter=lfs diff=lfs merge=lfs -text
run_hostname_100_times_vanilla.pcap.gz filter=lfs diff=lfs merge=lfs -text

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 26 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 33 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 48 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 36 KiB

@ -0,0 +1,16 @@
import sys
# Add viewBox attr to SVGs lacking it, so IE scales properly.
import lxml.etree
import glob
for name in sys.argv[1:]: # glob.glob('*/*.svg'): #+ glob.glob('images/ansible/*.svg'):
doc = lxml.etree.parse(open(name))
svg = doc.getroot()
for elem in svg.cssselect('[stroke-width]'):
if elem.attrib['stroke-width'] < '2':
elem.attrib['stroke-width'] = '2'
open(name, 'w').write(lxml.etree.tostring(svg, xml_declaration=True, encoding='UTF-8'))

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 84 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 99 KiB

@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:6d9b4d4ff263003bd16e44c265783e7c1deff19950e453e3adeb8a6ab5052081
size 175120

@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:7a993832501b7948a38c2e8ced3467d1cb04279a57ddd6afc735ed96ec509c08
size 7623337

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
On the Mitogen master, this is imported from ``mitogen/__init__.py`` as would
be expected. On the slave, it is built dynamically during startup.
@ -57,7 +59,12 @@ parent_id = None
parent_ids = []
def main(log_level='INFO', profiling=False):
import os
_default_profiling = os.environ.get('MITOGEN_PROFILING') is not None
del os
def main(log_level='INFO', profiling=_default_profiling):
"""
Convenience decorator primarily useful for writing discardable test
scripts.
@ -106,7 +113,7 @@ def main(log_level='INFO', profiling=False):
mitogen.master.Router.profiling = profiling
utils.log_to_file(level=log_level)
return mitogen.core._profile_hook(
'main',
'app.main',
utils.run_with_router,
func,
)

@ -1,288 +0,0 @@
# encoding: utf-8
"""Selected backports from Python stdlib functools module
"""
# Written by Nick Coghlan <ncoghlan at gmail.com>,
# Raymond Hettinger <python at rcn.com>,
# and Łukasz Langa <lukasz at langa.pl>.
# Copyright (C) 2006-2013 Python Software Foundation.
__all__ = [
'update_wrapper', 'wraps', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES',
'lru_cache',
]
from threading import RLock
################################################################################
### update_wrapper() and wraps() decorator
################################################################################
# update_wrapper() and wraps() are tools to help write
# wrapper functions that can handle naive introspection
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__qualname__', '__doc__',
'__annotations__')
WRAPPER_UPDATES = ('__dict__',)
def update_wrapper(wrapper,
wrapped,
assigned = WRAPPER_ASSIGNMENTS,
updated = WRAPPER_UPDATES):
"""Update a wrapper function to look like the wrapped function
wrapper is the function to be updated
wrapped is the original function
assigned is a tuple naming the attributes assigned directly
from the wrapped function to the wrapper function (defaults to
functools.WRAPPER_ASSIGNMENTS)
updated is a tuple naming the attributes of the wrapper that
are updated with the corresponding attribute from the wrapped
function (defaults to functools.WRAPPER_UPDATES)
"""
for attr in assigned:
try:
value = getattr(wrapped, attr)
except AttributeError:
pass
else:
setattr(wrapper, attr, value)
for attr in updated:
getattr(wrapper, attr).update(getattr(wrapped, attr, {}))
# Issue #17482: set __wrapped__ last so we don't inadvertently copy it
# from the wrapped function when updating __dict__
wrapper.__wrapped__ = wrapped
# Return the wrapper so this can be used as a decorator via partial()
return wrapper
def wraps(wrapped,
assigned = WRAPPER_ASSIGNMENTS,
updated = WRAPPER_UPDATES):
"""Decorator factory to apply update_wrapper() to a wrapper function
Returns a decorator that invokes update_wrapper() with the decorated
function as the wrapper argument and the arguments to wraps() as the
remaining arguments. Default arguments are as for update_wrapper().
This is a convenience function to simplify applying partial() to
update_wrapper().
"""
return partial(update_wrapper, wrapped=wrapped,
assigned=assigned, updated=updated)
################################################################################
### partial() argument application
################################################################################
# Purely functional, no descriptor behaviour
def partial(func, *args, **keywords):
"""New function with partial application of the given arguments
and keywords.
"""
if hasattr(func, 'func'):
args = func.args + args
tmpkw = func.keywords.copy()
tmpkw.update(keywords)
keywords = tmpkw
del tmpkw
func = func.func
def newfunc(*fargs, **fkeywords):
newkeywords = keywords.copy()
newkeywords.update(fkeywords)
return func(*(args + fargs), **newkeywords)
newfunc.func = func
newfunc.args = args
newfunc.keywords = keywords
return newfunc
################################################################################
### LRU Cache function decorator
################################################################################
class _HashedSeq(list):
""" This class guarantees that hash() will be called no more than once
per element. This is important because the lru_cache() will hash
the key multiple times on a cache miss.
"""
__slots__ = 'hashvalue'
def __init__(self, tup, hash=hash):
self[:] = tup
self.hashvalue = hash(tup)
def __hash__(self):
return self.hashvalue
def _make_key(args, kwds, typed,
kwd_mark = (object(),),
fasttypes = set([int, str, frozenset, type(None)]),
sorted=sorted, tuple=tuple, type=type, len=len):
"""Make a cache key from optionally typed positional and keyword arguments
The key is constructed in a way that is flat as possible rather than
as a nested structure that would take more memory.
If there is only a single argument and its data type is known to cache
its hash value, then that argument is returned without a wrapper. This
saves space and improves lookup speed.
"""
key = args
if kwds:
sorted_items = sorted(kwds.items())
key += kwd_mark
for item in sorted_items:
key += item
if typed:
key += tuple(type(v) for v in args)
if kwds:
key += tuple(type(v) for k, v in sorted_items)
elif len(key) == 1 and type(key[0]) in fasttypes:
return key[0]
return _HashedSeq(key)
def lru_cache(maxsize=128, typed=False):
"""Least-recently-used cache decorator.
If *maxsize* is set to None, the LRU features are disabled and the cache
can grow without bound.
If *typed* is True, arguments of different types will be cached separately.
For example, f(3.0) and f(3) will be treated as distinct calls with
distinct results.
Arguments to the cached function must be hashable.
View the cache statistics named tuple (hits, misses, maxsize, currsize)
with f.cache_info(). Clear the cache and statistics with f.cache_clear().
Access the underlying function with f.__wrapped__.
See: http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
"""
# Users should only access the lru_cache through its public API:
# cache_info, cache_clear, and f.__wrapped__
# The internals of the lru_cache are encapsulated for thread safety and
# to allow the implementation to change (including a possible C version).
# Early detection of an erroneous call to @lru_cache without any arguments
# resulting in the inner function being passed to maxsize instead of an
# integer or None.
if maxsize is not None and not isinstance(maxsize, int):
raise TypeError('Expected maxsize to be an integer or None')
def decorating_function(user_function):
wrapper = _lru_cache_wrapper(user_function, maxsize, typed)
return update_wrapper(wrapper, user_function)
return decorating_function
def _lru_cache_wrapper(user_function, maxsize, typed):
# Constants shared by all lru cache instances:
sentinel = object() # unique object used to signal cache misses
make_key = _make_key # build a key from the function arguments
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 # names for the link fields
cache = {}
cache_get = cache.get # bound method to lookup a key or return None
lock = RLock() # because linkedlist updates aren't threadsafe
root = [] # root of the circular doubly linked list
root[:] = [root, root, None, None] # initialize by pointing to self
hits_misses_full_root = [0, 0, False, root]
HITS,MISSES,FULL,ROOT = 0, 1, 2, 3
if maxsize == 0:
def wrapper(*args, **kwds):
# No caching -- just a statistics update after a successful call
result = user_function(*args, **kwds)
hits_misses_full_root[MISSES] += 1
return result
elif maxsize is None:
def wrapper(*args, **kwds):
# Simple caching without ordering or size limit
key = make_key(args, kwds, typed)
result = cache_get(key, sentinel)
if result is not sentinel:
hits_misses_full_root[HITS] += 1
return result
result = user_function(*args, **kwds)
cache[key] = result
hits_misses_full_root[MISSES] += 1
return result
else:
def wrapper(*args, **kwds):
# Size limited caching that tracks accesses by recency
key = make_key(args, kwds, typed)
lock.acquire()
try:
link = cache_get(key)
if link is not None:
# Move the link to the front of the circular queue
root = hits_misses_full_root[ROOT]
link_prev, link_next, _key, result = link
link_prev[NEXT] = link_next
link_next[PREV] = link_prev
last = root[PREV]
last[NEXT] = root[PREV] = link
link[PREV] = last
link[NEXT] = root
hits_misses_full_root[HITS] += 1
return result
finally:
lock.release()
result = user_function(*args, **kwds)
lock.acquire()
try:
if key in cache:
# Getting here means that this same key was added to the
# cache while the lock was released. Since the link
# update is already done, we need only return the
# computed result and update the count of misses.
pass
elif hits_misses_full_root[FULL]:
# Use the old root to store the new key and result.
oldroot = root = hits_misses_full_root[ROOT]
oldroot[KEY] = key
oldroot[RESULT] = result
# Empty the oldest link and make it the new root.
# Keep a reference to the old key and old result to
# prevent their ref counts from going to zero during the
# update. That will prevent potentially arbitrary object
# clean-up code (i.e. __del__) from running while we're
# still adjusting the links.
root = hits_misses_full_root[ROOT] = oldroot[NEXT]
oldkey = root[KEY]
oldresult = root[RESULT]
root[KEY] = root[RESULT] = None
# Now update the cache dictionary.
del cache[oldkey]
# Save the potentially reentrant cache[key] assignment
# for last, after the root and links have been put in
# a consistent state.
cache[key] = oldroot
else:
# Put result in a new link at the front of the queue.
root = hits_misses_full_root[ROOT]
last = root[PREV]
link = [last, root, key, result]
last[NEXT] = root[PREV] = cache[key] = link
# Use the __len__() method instead of the len() function
# which could potentially be wrapped in an lru_cache itself.
hits_misses_full_root[FULL] = (cache.__len__() >= maxsize)
hits_misses_full_root[MISSES]
finally:
lock.release()
return result
def cache_clear():
"""Clear the cache and cache statistics"""
lock.acquire()
try:
cache.clear()
root = hits_misses_full_root[ROOT]
root[:] = [root, root, None, None]
hits_misses_full[HITS] = 0
hits_misses_full[MISSES] = 0
hits_misses_full[FULL] = False
finally:
lock.release()
wrapper.cache_clear = cache_clear
return wrapper

@ -1,5 +1,7 @@
"""Utilities to support packages."""
# !mitogen: minify_safe
# NOTE: This module must remain compatible with Python 2.3, as it is shared
# by setuptools for distribution with Python 2.3 and up.

@ -22,6 +22,8 @@ are the same, except instead of generating tokens, tokeneater is a callback
function to which the 5 fields described above are passed as 5 arguments,
each time a new token is found."""
# !mitogen: minify_safe
__author__ = 'Ka-Ping Yee <ping@lfw.org>'
__credits__ = ('GvR, ESR, Tim Peters, Thomas Wouters, Fred Drake, '
'Skip Montanaro, Raymond Hettinger')

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
This module implements most package functionality, but remains separate from
non-essential code in order to reduce its size, since it is also serves as the
@ -40,8 +42,9 @@ import fcntl
import itertools
import linecache
import logging
import pickle as py_pickle
import os
import pickle as py_pickle
import pstats
import signal
import socket
import struct
@ -60,6 +63,11 @@ import imp
# Absolute imports for <2.5.
select = __import__('select')
try:
import cProfile
except ImportError:
cProfile = None
try:
import thread
except ImportError:
@ -528,27 +536,48 @@ def enable_debug_logging():
_profile_hook = lambda name, func, *args: func(*args)
_profile_fmt = os.environ.get(
'MITOGEN_PROFILE_FMT',
'/tmp/mitogen.stats.%(pid)s.%(identity)s.%(now)s.%(ext)s',
)
def enable_profiling():
global _profile_hook
import cProfile
import pstats
def _profile_hook(name, func, *args):
profiler = cProfile.Profile()
profiler.enable()
def _profile_hook(name, func, *args):
"""
Call `func(*args)` and return its result. This function is replaced by
:func:`_real_profile_hook` when :func:`enable_profiling` is called. This
interface is obsolete and will be replaced by a signals-based integration
later on.
"""
return func(*args)
def _real_profile_hook(name, func, *args):
profiler = cProfile.Profile()
profiler.enable()
try:
return func(*args)
finally:
path = _profile_fmt % {
'now': int(1e6 * time.time()),
'identity': name,
'pid': os.getpid(),
'ext': '%s'
}
profiler.dump_stats(path % ('pstats',))
profiler.create_stats()
fp = open(path % ('log',), 'w')
try:
return func(*args)
stats = pstats.Stats(profiler, stream=fp)
stats.sort_stats('cumulative')
stats.print_stats()
finally:
profiler.dump_stats('/tmp/mitogen.stats.%d.%s.pstat' % (os.getpid(), name))
profiler.create_stats()
fp = open('/tmp/mitogen.stats.%d.%s.log' % (os.getpid(), name), 'w')
try:
stats = pstats.Stats(profiler, stream=fp)
stats.sort_stats('cumulative')
stats.print_stats()
finally:
fp.close()
fp.close()
def enable_profiling(econtext=None):
global _profile_hook
_profile_hook = _real_profile_hook
def import_module(modname):
@ -1684,11 +1713,9 @@ class Stream(BasicStream):
msg.reply_to or 0, len(msg.data)) + msg.data
if not self._output_buf_len:
# Modifying epoll/Kqueue state is expensive, as is needless broker
# loop iterations. Rather than wait for writeability, simply
# attempt to write immediately, and only fall back to
# start_transmit()/on_transmit() if an error occurred or the socket
# buffer was full.
# Modifying epoll/Kqueue state is expensive, as are needless broker
# loops. Rather than wait for writeability, just write immediately,
# and fall back to the broker loop on error or full buffer.
try:
n = self.transmit_side.write(pkt)
if n:
@ -1698,7 +1725,6 @@ class Stream(BasicStream):
except OSError:
pass
if not self._output_buf_len:
self._router.broker._start_transmit(self)
self._output_buf.append(pkt)
self._output_buf_len += len(pkt)
@ -2816,7 +2842,7 @@ class Broker(object):
)
self._thread = threading.Thread(
target=self._broker_main,
name='mitogen-broker'
name='mitogen.broker'
)
self._thread.start()
@ -2968,7 +2994,7 @@ class Broker(object):
self._broker_exit()
def _broker_main(self):
_profile_hook('mitogen-broker', self._do_broker_main)
_profile_hook('mitogen.broker', self._do_broker_main)
fire(self, 'exit')
def shutdown(self):
@ -3064,7 +3090,7 @@ class Dispatcher(object):
if self.econtext.config.get('on_start'):
self.econtext.config['on_start'](self.econtext)
_profile_hook('main', self._dispatch_calls)
_profile_hook('mitogen.child_main', self._dispatch_calls)
class ExternalContext(object):

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
Basic signal handler for dumping thread stacks.
"""

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import mitogen.core

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import mitogen.core

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
:mod:`mitogen.fakessh` is a stream implementation that starts a subprocess with
its environment modified such that ``PATH`` searches for `ssh` return a Mitogen

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import os
import random

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import mitogen.core

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import mitogen.core

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import mitogen.core

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import mitogen.core

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
This module implements functionality required by master processes, such as
starting new contexts via SSH. Its size is also restricted, since it must
@ -140,7 +142,7 @@ def get_child_modules(path):
return [to_text(name) for _, name, _ in it]
def get_core_source():
def _get_core_source():
"""
Master version of parent.get_core_source().
"""
@ -150,7 +152,7 @@ def get_core_source():
if mitogen.is_master:
# TODO: find a less surprising way of installing this.
mitogen.parent.get_core_source = get_core_source
mitogen.parent._get_core_source = _get_core_source
LOAD_CONST = dis.opname.index('LOAD_CONST')
@ -707,10 +709,15 @@ class ModuleResponder(object):
self.blacklist = []
self.whitelist = ['']
#: Context -> set([fullname, ..])
self._forwarded_by_context = {}
#: Number of GET_MODULE messages received.
self.get_module_count = 0
#: Total time spent in uncached GET_MODULE.
self.get_module_secs = 0.0
#: Total time spent minifying modules.
self.minify_secs = 0.0
#: Number of successful LOAD_MODULE messages sent.
self.good_load_module_count = 0
#: Total bytes in successful LOAD_MODULE payloads.
@ -767,6 +774,8 @@ class ModuleResponder(object):
def _make_negative_response(self, fullname):
return (fullname, None, None, None, ())
minify_safe_re = re.compile(b(r'\s+#\s*!mitogen:\s*minify_safe'))
def _build_tuple(self, fullname):
if fullname in self._cache:
return self._cache[fullname]
@ -792,6 +801,12 @@ class ModuleResponder(object):
self._cache[fullname] = tup
return tup
if self.minify_safe_re.search(source):
# If the module contains a magic marker, it's safe to minify.
t0 = time.time()
source = mitogen.minify.minimize_source(source).encode('utf-8')
self.minify_secs += time.time() - t0
if is_pkg:
pkg_present = get_child_modules(path)
LOG.debug('_build_tuple(%r, %r) -> %r',
@ -820,13 +835,14 @@ class ModuleResponder(object):
def _send_load_module(self, stream, fullname):
if fullname not in stream.sent_modules:
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
tup = self._build_tuple(fullname)
msg = mitogen.core.Message.pickled(
tup,
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
LOG.debug('%s: sending module %s (%.2f KiB)',
stream.name, fullname, len(msg.data) / 1024.0)
self._router._async_route(msg)
stream.sent_modules.add(fullname)
if tup[2] is not None:
@ -846,6 +862,9 @@ class ModuleResponder(object):
)
def _send_module_and_related(self, stream, fullname):
if fullname in stream.sent_modules:
return
try:
tup = self._build_tuple(fullname)
for name in tup[4]: # related
@ -889,6 +908,14 @@ class ModuleResponder(object):
)
def _forward_one_module(self, context, fullname):
forwarded = self._forwarded_by_context.get(context)
if forwarded is None:
forwarded = set()
self._forwarded_by_context[context] = forwarded
if fullname in forwarded:
return
path = []
while fullname:
path.append(fullname)
@ -984,7 +1011,7 @@ class Router(mitogen.parent.Router):
#: of any :class:`Broker`, e.g. via::
#:
#: mitogen.master.Router.profiling = True
profiling = False
profiling = os.environ.get('MITOGEN_PROFILING') is not None
def __init__(self, broker=None, max_message_size=None):
if broker is None:
@ -1009,6 +1036,7 @@ class Router(mitogen.parent.Router):
super(Router, self)._on_broker_exit()
dct = self.get_stats()
dct['self'] = self
dct['minify_ms'] = 1000 * dct['minify_secs']
dct['get_module_ms'] = 1000 * dct['get_module_secs']
dct['good_load_module_size_kb'] = dct['good_load_module_size'] / 1024.0
dct['good_load_module_size_avg'] = (
@ -1022,7 +1050,8 @@ class Router(mitogen.parent.Router):
'%(self)r: stats: '
'%(get_module_count)d module requests in '
'%(get_module_ms)d ms, '
'%(good_load_module_count)d sent, '
'%(good_load_module_count)d sent '
'(%(minify_ms)d ms minify time), '
'%(bad_load_module_count)d negative responses. '
'Sent %(good_load_module_size_kb).01f kb total, '
'%(good_load_module_size_avg).01f kb avg.'
@ -1047,6 +1076,8 @@ class Router(mitogen.parent.Router):
:data:`mitogen.core.LOAD_MODULE` message payloads.
* `bad_load_module_count`: Integer count of negative
:data:`mitogen.core.LOAD_MODULE` messages sent.
* `minify_secs`: CPU seconds spent minifying modules marked
minify-safe.
"""
return {
'get_module_count': self.responder.get_module_count,
@ -1054,6 +1085,7 @@ class Router(mitogen.parent.Router):
'good_load_module_count': self.responder.good_load_module_count,
'good_load_module_size': self.responder.good_load_module_size,
'bad_load_module_count': self.responder.bad_load_module_count,
'minify_secs': self.responder.minify_secs,
}
def enable_debug(self):

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import sys
try:
@ -40,13 +42,7 @@ if sys.version_info < (2, 7, 11):
else:
import tokenize
try:
from functools import lru_cache
except ImportError:
from mitogen.compat.functools import lru_cache
@lru_cache()
def minimize_source(source):
"""Remove comments and docstrings from Python `source`, preserving line
numbers and syntax of empty blocks.

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
This module defines functionality common to master and parent processes. It is
sent to any child context that is due to become a parent, due to recursive
@ -115,6 +117,11 @@ def _ioctl_cast(n):
return n
# If not :data:`None`, called prior to exec() of any new child process. Used by
# :func:`mitogen.utils.reset_affinity` to allow the child to be freely
# scheduled.
_preexec_hook = None
# Get PTY number; asm-generic/ioctls.h
LINUX_TIOCGPTN = _ioctl_cast(2147767344)
@ -150,7 +157,11 @@ def get_sys_executable():
return '/usr/bin/python'
def get_core_source():
_core_source_lock = threading.Lock()
_core_source_partial = None
def _get_core_source():
"""
In non-masters, simply fetch the cached mitogen.core source code via the
import mechanism. In masters, this function is replaced with a version that
@ -159,6 +170,26 @@ def get_core_source():
return inspect.getsource(mitogen.core)
def get_core_source_partial():
"""
_get_core_source() is expensive, even with @lru_cache in minify.py, threads
can enter it simultaneously causing severe slowdowns.
"""
global _core_source_partial
if _core_source_partial is None:
_core_source_lock.acquire()
try:
if _core_source_partial is None:
_core_source_partial = PartialZlib(
_get_core_source().encode('utf-8')
)
finally:
_core_source_lock.release()
return _core_source_partial
def get_default_remote_name():
"""
Return the default name appearing in argv[0] of remote machines.
@ -262,7 +293,13 @@ def detach_popen(**kwargs):
# handling, without tying the surrounding code into managing a Popen
# object, which isn't possible for at least :mod:`mitogen.fork`. This
# should be replaced by a swappable helper class in a future version.
proc = subprocess.Popen(**kwargs)
real_preexec_fn = kwargs.pop('preexec_fn', None)
def preexec_fn():
if _preexec_hook:
_preexec_hook()
if real_preexec_fn:
real_preexec_fn()
proc = subprocess.Popen(preexec_fn=preexec_fn, **kwargs)
proc._child_created = False
return proc.pid
@ -419,11 +456,11 @@ def tty_create_child(args):
`(pid, tty_fd, None)`
"""
master_fd, slave_fd = openpty()
mitogen.core.set_block(slave_fd)
disable_echo(master_fd)
disable_echo(slave_fd)
try:
mitogen.core.set_block(slave_fd)
disable_echo(master_fd)
disable_echo(slave_fd)
pid = detach_popen(
args=args,
stdin=slave_fd,
@ -456,27 +493,30 @@ def hybrid_tty_create_child(args):
`(pid, socketpair_fd, tty_fd)`
"""
master_fd, slave_fd = openpty()
parentfp, childfp = create_socketpair()
mitogen.core.set_block(slave_fd)
mitogen.core.set_block(childfp)
disable_echo(master_fd)
disable_echo(slave_fd)
try:
pid = detach_popen(
args=args,
stdin=childfp,
stdout=childfp,
stderr=slave_fd,
preexec_fn=_acquire_controlling_tty,
close_fds=True,
)
disable_echo(master_fd)
disable_echo(slave_fd)
mitogen.core.set_block(slave_fd)
parentfp, childfp = create_socketpair()
try:
mitogen.core.set_block(childfp)
pid = detach_popen(
args=args,
stdin=childfp,
stdout=childfp,
stderr=slave_fd,
preexec_fn=_acquire_controlling_tty,
close_fds=True,
)
except Exception:
parentfp.close()
childfp.close()
raise
except Exception:
os.close(master_fd)
os.close(slave_fd)
parentfp.close()
childfp.close()
raise
os.close(slave_fd)
@ -536,6 +576,43 @@ def write_all(fd, s, deadline=None):
poller.close()
class PartialZlib(object):
"""
Because the mitogen.core source has a line appended to it during bootstrap,
it must be recompressed for each connection. This is not a problem for a
small number of connections, but it amounts to 30 seconds CPU time by the
time 500 targets are in use.
For that reason, build a compressor containing mitogen.core and flush as
much of it as possible into an initial buffer. Then to append the custom
line, clone the compressor and compress just that line.
A full compression costs ~6ms on a modern machine, this method costs ~35
usec.
"""
def __init__(self, s):
self.s = s
if sys.version_info > (2, 5):
self._compressor = zlib.compressobj(9)
self._out = self._compressor.compress(s)
self._out += self._compressor.flush(zlib.Z_SYNC_FLUSH)
else:
self._compressor = None
def append(self, s):
"""
Append the bytestring `s` to the compressor state and return the
final compressed output.
"""
if self._compressor is None:
return zlib.compress(self.s + s, 9)
else:
compressor = self._compressor.copy()
out = self._out
out += compressor.compress(s)
return out + compressor.flush()
class IteratingRead(object):
def __init__(self, fds, deadline=None):
self.deadline = deadline
@ -1147,15 +1224,16 @@ class Stream(mitogen.core.Stream):
LOG.debug('%r: PID %d %s', self, pid, wstatus_to_str(status))
return
# For processes like sudo we cannot actually send sudo a signal,
# because it is setuid, so this is best-effort only.
LOG.debug('%r: child process still alive, sending SIGTERM', self)
try:
os.kill(self.pid, signal.SIGTERM)
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EPERM:
raise
if not self._router.profiling:
# For processes like sudo we cannot actually send sudo a signal,
# because it is setuid, so this is best-effort only.
LOG.debug('%r: child process still alive, sending SIGTERM', self)
try:
os.kill(self.pid, signal.SIGTERM)
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EPERM:
raise
def on_disconnect(self, broker):
super(Stream, self).on_disconnect(broker)
@ -1263,11 +1341,12 @@ class Stream(mitogen.core.Stream):
}
def get_preamble(self):
source = get_core_source()
source += '\nExternalContext(%r).main()\n' % (
self.get_econtext_config(),
suffix = (
'\nExternalContext(%r).main()\n' %\
(self.get_econtext_config(),)
)
return zlib.compress(source.encode('utf-8'), 9)
partial = get_core_source_partial()
return partial.append(suffix.encode('utf-8'))
def start_child(self):
args = self.get_boot_command()

@ -0,0 +1,166 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""mitogen.profiler
Record and report cProfile statistics from a run. Creates one aggregated
output file, one aggregate containing only workers, and one for the
top-level process.
Usage:
mitogen.profiler record <dest_path> <tool> [args ..]
mitogen.profiler report <dest_path> [sort_mode]
mitogen.profiler stat <sort_mode> <tool> [args ..]
Mode:
record: Record a trace.
report: Report on a previously recorded trace.
stat: Record and report in a single step.
Where:
dest_path: Filesystem prefix to write .pstats files to.
sort_mode: Sorting mode; defaults to "cumulative". See:
https://docs.python.org/2/library/profile.html#pstats.Stats.sort_stats
Example:
mitogen.profiler record /tmp/mypatch ansible-playbook foo.yml
mitogen.profiler dump /tmp/mypatch-worker.pstats
"""
from __future__ import print_function
import os
import pstats
import cProfile
import shutil
import subprocess
import sys
import tempfile
import time
import mitogen.core
def try_merge(stats, path):
try:
stats.add(path)
return True
except Exception as e:
print('Failed. Race? Will retry. %s' % (e,))
return False
def merge_stats(outpath, inpaths):
first, rest = inpaths[0], inpaths[1:]
for x in range(5):
try:
stats = pstats.Stats(first)
except EOFError:
time.sleep(0.2)
continue
print("Writing %r..." % (outpath,))
for path in rest:
#print("Merging %r into %r.." % (os.path.basename(path), outpath))
for x in range(5):
if try_merge(stats, path):
break
time.sleep(0.2)
stats.dump_stats(outpath)
def generate_stats(outpath, tmpdir):
print('Generating stats..')
all_paths = []
paths_by_ident = {}
for name in os.listdir(tmpdir):
if name.endswith('-dump.pstats'):
ident, _, pid = name.partition('-')
path = os.path.join(tmpdir, name)
all_paths.append(path)
paths_by_ident.setdefault(ident, []).append(path)
merge_stats('%s-all.pstat' % (outpath,), all_paths)
for ident, paths in paths_by_ident.items():
merge_stats('%s-%s.pstat' % (outpath, ident), paths)
def do_record(tmpdir, path, *args):
env = os.environ.copy()
fmt = '%(identity)s-%(pid)s.%(now)s-dump.%(ext)s'
env['MITOGEN_PROFILING'] = '1'
env['MITOGEN_PROFILE_FMT'] = os.path.join(tmpdir, fmt)
rc = subprocess.call(args, env=env)
generate_stats(path, tmpdir)
return rc
def do_report(tmpdir, path, sort='cumulative'):
stats = pstats.Stats(path).sort_stats(sort)
stats.print_stats(100)
def do_stat(tmpdir, sort, *args):
valid_sorts = pstats.Stats.sort_arg_dict_default
if sort not in valid_sorts:
sys.stderr.write('Invalid sort %r, must be one of %s\n' %
(sort, ', '.join(sorted(valid_sorts))))
sys.exit(1)
outfile = os.path.join(tmpdir, 'combined')
do_record(tmpdir, outfile, *args)
aggs = ('app.main', 'mitogen.broker', 'mitogen.child_main',
'mitogen.service.pool', 'Strategy', 'WorkerProcess',
'all')
for agg in aggs:
path = '%s-%s.pstat' % (outfile, agg)
if os.path.exists(path):
print()
print()
print('------ Aggregation %r ------' % (agg,))
print()
do_report(tmpdir, path, sort)
print()
def main():
if len(sys.argv) < 2 or sys.argv[1] not in ('record', 'report', 'stat'):
sys.stderr.write(__doc__)
sys.exit(1)
func = globals()['do_' + sys.argv[1]]
tmpdir = tempfile.mkdtemp(prefix='mitogen.profiler')
try:
sys.exit(func(tmpdir, *sys.argv[2:]) or 0)
finally:
shutil.rmtree(tmpdir)
if __name__ == '__main__':
main()

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import mitogen.core

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import grp
import os
import os.path
@ -80,6 +82,10 @@ def get_or_create_pool(size=None, router=None):
if _pool_pid != os.getpid():
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE,
overwrite=True)
# In case of Broker shutdown crash, Pool can cause 'zombie'
# processes.
mitogen.core.listen(router.broker, 'shutdown',
lambda: _pool.stop(join=False))
_pool_pid = os.getpid()
return _pool
finally:
@ -467,7 +473,7 @@ class Pool(object):
thread = threading.Thread(
name=name,
target=mitogen.core._profile_hook,
args=(name, self._worker_main),
args=('mitogen.service.pool', self._worker_main),
)
thread.start()
self._threads.append(thread)
@ -634,7 +640,7 @@ class PushFileService(Service):
path=path,
context=context
).close()
else:
elif path not in sent:
child.call_service_async(
service_name=self.name(),
method_name='store_and_forward',
@ -642,6 +648,7 @@ class PushFileService(Service):
data=self._cache[path],
context=context
).close()
sent.add(path)
@expose(policy=AllowParents())
@arg_spec({
@ -708,7 +715,7 @@ class PushFileService(Service):
if path not in self._cache:
LOG.error('%r: %r is not in local cache', self, path)
return
self._forward(path, context)
self._forward(context, path)
class FileService(Service):

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import ctypes
import grp
import logging

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
Functionality to allow establishing new slave contexts over an SSH connection.
"""

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import mitogen.core
@ -64,6 +66,7 @@ class Stream(mitogen.parent.Stream):
b('su: sorry'), # BSD
b('su: authentication failure'), # Linux
b('su: incorrect password'), # CentOS 6
b('authentication is denied'), # AIX
)
def construct(self, username=None, password=None, su_path=None,

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import base64
import logging
import optparse

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
Permit connection of additional contexts that may act with the authority of
this context. For now, the UNIX socket is always mode 0600, i.e. can only be

@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import datetime
import logging
import os
@ -34,6 +36,7 @@ import sys
import mitogen
import mitogen.core
import mitogen.master
import mitogen.parent
LOG = logging.getLogger('mitogen')

@ -28,5 +28,5 @@ timeout = 10
host_key_checking = False
[ssh_connection]
ssh_args = -o ForwardAgent=yes -o ControlMaster=auto -o ControlPersist=60s
ssh_args = -o UserKnownHostsFile=/dev/null -o ForwardAgent=yes -o ControlMaster=auto -o ControlPersist=60s
pipelining = True

@ -7,4 +7,4 @@
- hosts: all
tasks:
- command: hostname
with_sequence: start=1 end=100
with_sequence: start=1 end="{{end|default(100)}}"

@ -11,4 +11,4 @@
mode: 0755
content:
Hello from {{item}}
with_sequence: start=1 end=20
with_sequence: start=1 end={{end|default(20)}}

@ -7,4 +7,31 @@ localhost
# This is only used for manual testing.
[localhost-x10]
localhost-[01:10]
localhost-[001:010]
[localhost-x20]
localhost-[001:020]
[localhost-x30]
localhost-[001:030]
[localhost-x40]
localhost-[001:040]
[localhost-x50]
localhost-[001:050]
[localhost-x60]
localhost-[001:060]
[localhost-x70]
localhost-[001:070]
[localhost-x80]
localhost-[001:080]
[localhost-x90]
localhost-[001:090]
[localhost-x100]
localhost-[001:100]

@ -32,6 +32,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'alias-host',
'identities_only': False,
@ -40,6 +41,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',
@ -56,6 +59,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'cd-normal-alias',
'identities_only': False,
@ -64,6 +68,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',

@ -65,6 +65,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'alias-host',
'identities_only': False,
@ -73,6 +74,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',
@ -102,6 +105,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'alias-host',
'identities_only': False,
@ -110,6 +114,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',
@ -149,6 +155,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'cd-normal-normal',
'identities_only': False,
@ -157,6 +164,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',
@ -186,6 +195,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'alias-host',
'identities_only': False,
@ -194,6 +204,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',
@ -210,6 +222,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'cd-normal-alias',
'identities_only': False,
@ -218,6 +231,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',
@ -257,6 +272,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'cd-newuser-normal-normal',
'identities_only': False,
@ -265,6 +281,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',
@ -295,6 +313,7 @@
{
'kwargs': {
'check_host_keys': 'ignore',
'compression': True,
'connect_timeout': 10,
'hostname': 'alias-host',
'identities_only': False,
@ -303,6 +322,8 @@
'port': null,
'python_path': null,
'ssh_args': [
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'ForwardAgent=yes',
'-o',

@ -12,6 +12,9 @@ from ansible.plugins.action import ActionBase
class ActionModule(ActionBase):
# Running this for every host is pointless.
BYPASS_HOST_LOOP = True
def run(self, tmp=None, task_vars=None):
if not isinstance(self._connection,
ansible_mitogen.connection.Connection):

@ -0,0 +1,63 @@
import multiprocessing
import os
import tempfile
import mock
import unittest2
import testlib
import mitogen.parent
import ansible_mitogen.affinity
@unittest2.skipIf(
reason='Linux/SMP only',
condition=(not (
os.uname()[0] == 'Linux' and
multiprocessing.cpu_count() >= 4
))
)
class LinuxPolicyTest(testlib.TestCase):
klass = ansible_mitogen.affinity.LinuxPolicy
def setUp(self):
self.policy = self.klass()
def _get_cpus(self, path='/proc/self/status'):
fp = open(path)
try:
for line in fp:
if line.startswith('Cpus_allowed'):
return int(line.split()[1], 16)
finally:
fp.close()
def test_set_clear(self):
before = self._get_cpus()
self.policy._set_cpu(3)
self.assertEquals(self._get_cpus(), 1 << 3)
self.policy._clear()
self.assertEquals(self._get_cpus(), before)
def test_clear_on_popen(self):
tf = tempfile.NamedTemporaryFile()
try:
before = self._get_cpus()
self.policy._set_cpu(3)
my_cpu = self._get_cpus()
pid = mitogen.parent.detach_popen(
args=['cp', '/proc/self/status', tf.name]
)
os.waitpid(pid, 0)
his_cpu = self._get_cpus(tf.name)
self.assertNotEquals(my_cpu, his_cpu)
self.policy._clear()
finally:
tf.close()
if __name__ == '__main__':
unittest2.main()

@ -7,8 +7,10 @@ import time
import mitogen
import mitogen.utils
import ansible_mitogen.affinity
mitogen.utils.setup_gil()
ansible_mitogen.affinity.policy.assign_worker()
X = 20000

@ -6,7 +6,19 @@
#
[ ! "$1" ] && exit 1
sudo tcpdump -w $1-out.cap -s 0 host k1.botanicus.net &
date +%s.%N > $1-task-clock.csv
perf stat -x, -I 25 -e task-clock --append -o $1-task-clock.csv ansible-playbook run_hostname_100_times.yml
name="$1"; shift
sudo tcpdump -i any -w $name-net.pcap -s 66 port 22 or port 9122 &
sleep 0.5
perf stat -x, -I 100 \
-e branches \
-e instructions \
-e task-clock \
-e context-switches \
-e page-faults \
-e cpu-migrations \
-o $name-perf.csv "$@"
pkill -f ssh:; sleep 0.1
sudo pkill -f tcpdump

@ -2,19 +2,23 @@
Measure latency of .local() setup.
"""
import mitogen
import time
import mitogen
import mitogen.utils
import ansible_mitogen.affinity
mitogen.utils.setup_gil()
#ansible_mitogen.affinity.policy.assign_worker()
@mitogen.main()
def main(router):
for x in range(1000):
t0=time.time()
for x in range(100):
t = time.time()
f = router.local()# debug=True)
tt = time.time()
print(x, 1000 * (tt - t))
print(f)
print('EEK', f.call(socket.gethostname))
print('MY PID', os.getpid())
print('EEKERY', f.call(os.getpid))
print('%.03f ms' % (1000 * (time.time() - t0) / (1.0 + x)))

@ -6,8 +6,10 @@ import time
import mitogen
import mitogen.utils
import ansible_mitogen.affinity
mitogen.utils.setup_gil()
ansible_mitogen.affinity.policy.assign_worker()
try:
xrange

@ -1,6 +1,7 @@
import codecs
import glob
import pprint
import sys
import unittest2
@ -92,8 +93,15 @@ class MitogenCoreTest(testlib.TestCase):
'mwords': mwords,
})
PY_24_25_SKIP = [
# cProfile unsupported on 2.4, 2.6+ syntax is fine here.
'mitogen/profiler.py',
]
def test_minify_all(self):
for name in glob.glob('mitogen/*.py'):
if name in self.PY_24_25_SKIP and sys.version_info < (2, 6):
continue
original = self.read_source(name)
try:
minified = self.func(original)

@ -220,12 +220,11 @@ class FindRelatedTest(testlib.TestCase):
u'mitogen.parent',
])
if sys.version_info < (3, 2):
SIMPLE_EXPECT.add('mitogen.compat')
SIMPLE_EXPECT.add('mitogen.compat.functools')
if sys.version_info < (2, 7):
SIMPLE_EXPECT.add('mitogen.compat')
SIMPLE_EXPECT.add('mitogen.compat.tokenize')
if sys.version_info < (2, 6):
SIMPLE_EXPECT.add('mitogen.compat')
SIMPLE_EXPECT.add('mitogen.compat.pkgutil')
def test_simple(self):

@ -175,7 +175,8 @@ class ForwardTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(256, c2.call(plain_old_module.pow, 2, 8))
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(2, self.router.responder.good_load_module_count)
self.assertLess(20000, self.router.responder.good_load_module_size)
self.assertLess(10000, self.router.responder.good_load_module_size)
self.assertGreater(40000, self.router.responder.good_load_module_size)
class BlacklistTest(testlib.TestCase):

@ -1,8 +1,13 @@
#!/usr/bin/env python
import os
import tempfile
import unittest2
import mock
import mitogen.core
import mitogen.parent
import mitogen.master
import mitogen.utils
from mitogen.core import b

Loading…
Cancel
Save