Merge remote-tracking branch 'origin/dmw'

new-serialization
David Wilson 5 years ago
commit e70c316eef

@ -264,7 +264,7 @@ def start_containers(containers):
"docker rm -f %(name)s || true" % container,
"docker run "
"--rm "
"--cpuset-cpus 0,1 "
# "--cpuset-cpus 0,1 "
"--detach "
"--privileged "
"--cap-add=SYS_PTRACE "

@ -7,7 +7,7 @@ batches = [
'docker pull %s' % (ci_lib.image_for_distro(ci_lib.DISTRO),),
],
[
'sudo tar -C / -jxvf tests/data/ubuntu-python-2.4.6.tar.bz2',
'curl https://dw.github.io/mitogen/binaries/ubuntu-python-2.4.6.tar.bz2 | sudo tar -C / -jxv',
]
]

@ -265,9 +265,19 @@ class LinuxPolicy(FixedPolicy):
mask >>= 64
return mitogen.core.b('').join(chunks)
def _get_thread_ids(self):
    """
    Return the list of thread IDs (TIDs) for every thread of the current
    process, by listing ``/proc/self/task``.

    If ``/proc`` is unavailable or unreadable, fall back to a single-element
    list containing the process PID, so the caller can still apply an
    affinity mask to at least the main thread.
    """
    try:
        ents = os.listdir('/proc/self/task')
    except OSError:
        # /proc missing or restricted; degrade gracefully to the PID alone.
        LOG.debug('cannot fetch thread IDs for current process')
        return [os.getpid()]
    # Entries of /proc/self/task are TIDs; skip any non-numeric entry.
    return [int(s) for s in ents if s.isdigit()]
def _set_cpu_mask(self, mask):
    """
    Apply the CPU affinity bitmask `mask` to the current process.

    sched_setaffinity() operates on a single thread (TID), so the mask is
    applied to every thread returned by :meth:`_get_thread_ids`, not just
    the calling thread.
    """
    s = self._mask_to_bytes(mask)
    # NOTE(review): the bare getpid() call looks redundant with the loop
    # below, since the TID list normally includes the main thread — confirm.
    _sched_setaffinity(os.getpid(), len(s), s)
    for tid in self._get_thread_ids():
        _sched_setaffinity(tid, len(s), s)
if _sched_setaffinity is not None:

@ -791,7 +791,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
inventory_name, stack = self._build_stack()
worker_model = ansible_mitogen.process.get_worker_model()
self.binding = worker_model.get_binding(inventory_name)
self.binding = worker_model.get_binding(
mitogen.utils.cast(inventory_name)
)
self._connect_stack(stack)
def _put_connection(self):

@ -55,3 +55,8 @@ except ImportError: # Ansible <2.4
from ansible.plugins import module_utils_loader
from ansible.plugins import shell_loader
from ansible.plugins import strategy_loader
# These are original, unwrapped implementations
action_loader__get = action_loader.get
connection_loader__get = connection_loader.get

@ -107,8 +107,9 @@ def setup():
l_mitogen = logging.getLogger('mitogen')
l_mitogen_io = logging.getLogger('mitogen.io')
l_ansible_mitogen = logging.getLogger('ansible_mitogen')
l_operon = logging.getLogger('operon')
for logger in l_mitogen, l_mitogen_io, l_ansible_mitogen:
for logger in l_mitogen, l_mitogen_io, l_ansible_mitogen, l_operon:
logger.handlers = [Handler(display.vvv)]
logger.propagate = False

@ -55,6 +55,11 @@ import ansible_mitogen.planner
import ansible_mitogen.target
from ansible.module_utils._text import to_text
try:
from ansible.utils.unsafe_proxy import wrap_var
except ImportError:
from ansible.vars.unsafe_proxy import wrap_var
LOG = logging.getLogger(__name__)
@ -306,7 +311,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
except AttributeError:
return getattr(self._task, 'async')
def _temp_file_gibberish(self, module_args, wrap_async):
def _set_temp_file_args(self, module_args, wrap_async):
# Ansible>2.5 module_utils reuses the action's temporary directory if
# one exists. Older versions error if this key is present.
if ansible.__version__ > '2.5':
@ -343,7 +348,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
self._update_module_args(module_name, module_args, task_vars)
env = {}
self._compute_environment_string(env)
self._temp_file_gibberish(module_args, wrap_async)
self._set_temp_file_args(module_args, wrap_async)
self._connection._connect()
result = ansible_mitogen.planner.invoke(
@ -365,7 +370,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
# on _execute_module().
self._remove_tmp_path(tmp)
return result
return wrap_var(result)
def _postprocess_response(self, result):
"""

@ -45,6 +45,7 @@ import random
from ansible.executor import module_common
import ansible.errors
import ansible.module_utils
import ansible.release
import mitogen.core
import mitogen.select
@ -58,6 +59,8 @@ NO_METHOD_MSG = 'Mitogen: no invocation method found for: '
NO_INTERPRETER_MSG = 'module (%s) is missing interpreter line'
NO_MODULE_MSG = 'The module %s was not found in configured module paths.'
_planner_by_path = {}
class Invocation(object):
"""
@ -92,7 +95,12 @@ class Invocation(object):
self.module_path = None
#: Initially ``None``, but set by :func:`invoke`. The raw source or
#: binary contents of the module.
self.module_source = None
self._module_source = None
def get_module_source(self):
    """
    Return the raw source or binary contents of the module, reading the
    file at :attr:`module_path` lazily and caching the result so the file
    is read at most once per invocation.
    """
    if self._module_source is None:
        self._module_source = read_file(self.module_path)
    return self._module_source
def __repr__(self):
return 'Invocation(module_name=%s)' % (self.module_name,)
@ -107,7 +115,8 @@ class Planner(object):
def __init__(self, invocation):
self._inv = invocation
def detect(self):
@classmethod
def detect(cls, path, source):
"""
Return true if the supplied `invocation` matches the module type
implemented by this planner.
@ -171,8 +180,9 @@ class BinaryPlanner(Planner):
"""
runner_name = 'BinaryRunner'
def detect(self):
return module_common._is_binary(self._inv.module_source)
@classmethod
def detect(cls, path, source):
return module_common._is_binary(source)
def get_push_files(self):
return [mitogen.core.to_text(self._inv.module_path)]
@ -218,7 +228,7 @@ class ScriptPlanner(BinaryPlanner):
def _get_interpreter(self):
path, arg = ansible_mitogen.parsing.parse_hashbang(
self._inv.module_source
self._inv.get_module_source()
)
if path is None:
raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % (
@ -247,8 +257,9 @@ class JsonArgsPlanner(ScriptPlanner):
"""
runner_name = 'JsonArgsRunner'
def detect(self):
return module_common.REPLACER_JSONARGS in self._inv.module_source
@classmethod
def detect(cls, path, source):
return module_common.REPLACER_JSONARGS in source
class WantJsonPlanner(ScriptPlanner):
@ -265,8 +276,9 @@ class WantJsonPlanner(ScriptPlanner):
"""
runner_name = 'WantJsonRunner'
def detect(self):
return b'WANT_JSON' in self._inv.module_source
@classmethod
def detect(cls, path, source):
return b'WANT_JSON' in source
class NewStylePlanner(ScriptPlanner):
@ -278,8 +290,9 @@ class NewStylePlanner(ScriptPlanner):
runner_name = 'NewStyleRunner'
marker = b'from ansible.module_utils.'
def detect(self):
return self.marker in self._inv.module_source
@classmethod
def detect(cls, path, source):
return cls.marker in source
def _get_interpreter(self):
return None, None
@ -323,7 +336,6 @@ class NewStylePlanner(ScriptPlanner):
for path in ansible_mitogen.loaders.module_utils_loader._get_paths(
subdirs=False
)
if os.path.isdir(path)
)
_module_map = None
@ -347,6 +359,10 @@ class NewStylePlanner(ScriptPlanner):
def get_kwargs(self):
return super(NewStylePlanner, self).get_kwargs(
module_map=self.get_module_map(),
py_module_name=py_modname_from_path(
self._inv.module_name,
self._inv.module_path,
),
)
@ -376,14 +392,16 @@ class ReplacerPlanner(NewStylePlanner):
"""
runner_name = 'ReplacerRunner'
def detect(self):
return module_common.REPLACER in self._inv.module_source
@classmethod
def detect(cls, path, source):
return module_common.REPLACER in source
class OldStylePlanner(ScriptPlanner):
runner_name = 'OldStyleRunner'
def detect(self):
@classmethod
def detect(cls, path, source):
# Everything else.
return True
@ -398,14 +416,54 @@ _planners = [
]
def get_module_data(name):
path = ansible_mitogen.loaders.module_loader.find_plugin(name, '')
if path is None:
raise ansible.errors.AnsibleError(NO_MODULE_MSG % (name,))
try:
_get_ansible_module_fqn = module_common._get_ansible_module_fqn
except AttributeError:
_get_ansible_module_fqn = None
with open(path, 'rb') as fp:
source = fp.read()
return mitogen.core.to_text(path), source
def py_modname_from_path(name, path):
    """
    Fetch the logical name of a new-style module as it might appear in
    :data:`sys.modules` of the target's Python interpreter.

    * For Ansible <2.7, this is an unpackaged module named like
      "ansible_module_%s".
    * For Ansible <2.9, this is an unpackaged module named like
      "ansible.modules.%s"
    * Since Ansible 2.9, modules appearing within a package have the original
      package hierarchy approximated on the target, enabling relative imports
      to function correctly. For example, "ansible.modules.system.setup".
    """
    # 2.9+: prefer Ansible's own helper when present. It raises ValueError
    # for paths outside the known module tree, in which case we fall through
    # to the legacy naming schemes below.
    if _get_ansible_module_fqn:
        try:
            return _get_ansible_module_fqn(path)
        except ValueError:
            pass

    if ansible.__version__ < '2.7':
        return 'ansible_module_' + name
    return 'ansible.modules.' + name
def read_file(path):
    """
    Read and return the entire contents of the file at `path` as bytes.

    Reads via raw :func:`os.open`/:func:`os.read` in 64 KiB chunks,
    joining them once at the end to avoid quadratic concatenation. The
    descriptor is closed even if a read fails.

    :param str path:
        Filesystem path to read.
    :returns:
        File contents as a byte string.
    :raises OSError:
        If the file cannot be opened or read.
    """
    fd = os.open(path, os.O_RDONLY)
    try:
        bits = []
        # Fixed: removed a dead `chunk = True` assignment that was never
        # read before the loop reassigned it.
        while True:
            chunk = os.read(fd, 65536)
            if not chunk:
                break
            bits.append(chunk)
    finally:
        os.close(fd)

    return mitogen.core.b('').join(bits)
def _propagate_deps(invocation, planner, context):
@ -466,14 +524,12 @@ def _invoke_isolated_task(invocation, planner):
context.shutdown()
def _get_planner(invocation):
def _get_planner(name, path, source):
for klass in _planners:
planner = klass(invocation)
if planner.detect():
LOG.debug('%r accepted %r (filename %r)', planner,
invocation.module_name, invocation.module_path)
return planner
LOG.debug('%r rejected %r', planner, invocation.module_name)
if klass.detect(path, source):
LOG.debug('%r accepted %r (filename %r)', klass, name, path)
return klass
LOG.debug('%r rejected %r', klass, name)
raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))
@ -488,10 +544,24 @@ def invoke(invocation):
:raises ansible.errors.AnsibleError:
Unrecognized/unsupported module type.
"""
(invocation.module_path,
invocation.module_source) = get_module_data(invocation.module_name)
planner = _get_planner(invocation)
path = ansible_mitogen.loaders.module_loader.find_plugin(
invocation.module_name,
'',
)
if path is None:
raise ansible.errors.AnsibleError(NO_MODULE_MSG % (
invocation.module_name,
))
invocation.module_path = mitogen.core.to_text(path)
if invocation.module_path not in _planner_by_path:
_planner_by_path[invocation.module_path] = _get_planner(
invocation.module_name,
invocation.module_path,
invocation.get_module_source()
)
planner = _planner_by_path[invocation.module_path](invocation)
if invocation.wrap_async:
response = _invoke_async_task(invocation, planner)
elif planner.should_fork():

@ -31,11 +31,6 @@ from __future__ import absolute_import
import os.path
import sys
try:
from ansible.plugins.connection import kubectl
except ImportError:
kubectl = None
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.six import iteritems
@ -47,6 +42,19 @@ except ImportError:
del base_dir
import ansible_mitogen.connection
import ansible_mitogen.loaders
_class = ansible_mitogen.loaders.connection_loader__get(
'kubectl',
class_only=True,
)
if _class:
kubectl = sys.modules[_class.__module__]
del _class
else:
kubectl = None
class Connection(ansible_mitogen.connection.Connection):

@ -42,21 +42,23 @@ DOCUMENTATION = """
options:
"""
import ansible.plugins.connection.ssh
try:
import ansible_mitogen.connection
import ansible_mitogen
except ImportError:
base_dir = os.path.dirname(__file__)
sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..')))
del base_dir
import ansible_mitogen.connection
import ansible_mitogen.loaders
class Connection(ansible_mitogen.connection.Connection):
transport = 'ssh'
vanilla_class = ansible.plugins.connection.ssh.Connection
vanilla_class = ansible_mitogen.loaders.connection_loader__get(
'ssh',
class_only=True,
)
@staticmethod
def _create_control_path(*args, **kwargs):

@ -803,9 +803,10 @@ class NewStyleRunner(ScriptRunner):
#: path => new-style module bytecode.
_code_by_path = {}
def __init__(self, module_map, **kwargs):
def __init__(self, module_map, py_module_name, **kwargs):
super(NewStyleRunner, self).__init__(**kwargs)
self.module_map = module_map
self.py_module_name = py_module_name
def _setup_imports(self):
"""
@ -942,9 +943,22 @@ class NewStyleRunner(ScriptRunner):
self._handle_magic_exception(mod, sys.exc_info()[1])
raise
def _get_module_package(self):
    """
    Since Ansible 2.9 __package__ must be set in accordance with an
    approximation of the original package hierarchy, so that relative
    imports function correctly.

    :returns:
        Parent package name derived from :attr:`py_module_name`, or
        :data:`None` when the name contains no dot (top-level module).
        On Python 2 the name is encoded to bytes, matching the native
        ``__package__`` string type there.
    """
    pkg, sep, modname = str_rpartition(self.py_module_name, '.')
    if not sep:
        # No package component: leave __package__ unset.
        return None
    if mitogen.core.PY3:
        return pkg
    return pkg.encode()
def _run(self):
mod = types.ModuleType(self.main_module_name)
mod.__package__ = None
mod.__package__ = self._get_module_package()
# Some Ansible modules use __file__ to find the Ansiballz temporary
# directory. We must provide some temporary path in __file__, but we
# don't want to pointlessly write the module to disk when it never

@ -347,7 +347,8 @@ class ContextService(mitogen.service.Service):
)
def _send_module_forwards(self, context):
self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD)
if hasattr(self.router.responder, 'forward_modules'):
self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD)
_candidate_temp_dirs = None

@ -27,6 +27,7 @@
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
import distutils.version
import os
import signal
import threading
@ -52,8 +53,8 @@ except ImportError:
Sentinel = None
ANSIBLE_VERSION_MIN = '2.3'
ANSIBLE_VERSION_MAX = '2.8'
ANSIBLE_VERSION_MIN = (2, 3)
ANSIBLE_VERSION_MAX = (2, 9)
NEW_VERSION_MSG = (
"Your Ansible version (%s) is too recent. The most recent version\n"
"supported by Mitogen for Ansible is %s.x. Please check the Mitogen\n"
@ -76,13 +77,15 @@ def _assert_supported_release():
an unsupported Ansible release.
"""
v = ansible.__version__
if not isinstance(v, tuple):
v = tuple(distutils.version.LooseVersion(v).version)
if v[:len(ANSIBLE_VERSION_MIN)] < ANSIBLE_VERSION_MIN:
if v[:2] < ANSIBLE_VERSION_MIN:
raise ansible.errors.AnsibleError(
OLD_VERSION_MSG % (v, ANSIBLE_VERSION_MIN)
)
if v[:len(ANSIBLE_VERSION_MAX)] > ANSIBLE_VERSION_MAX:
if v[:2] > ANSIBLE_VERSION_MAX:
raise ansible.errors.AnsibleError(
NEW_VERSION_MSG % (ansible.__version__, ANSIBLE_VERSION_MAX)
)
@ -132,7 +135,7 @@ def wrap_action_loader__get(name, *args, **kwargs):
if ansible.__version__ >= '2.8':
get_kwargs['collection_list'] = kwargs.pop('collection_list', None)
klass = action_loader__get(name, **get_kwargs)
klass = ansible_mitogen.loaders.action_loader__get(name, **get_kwargs)
if klass:
bases = (ansible_mitogen.mixins.ActionModuleMixin, klass)
adorned_klass = type(str(name), bases, {})
@ -141,15 +144,29 @@ def wrap_action_loader__get(name, *args, **kwargs):
return adorned_klass(*args, **kwargs)
REDIRECTED_CONNECTION_PLUGINS = (
'buildah',
'docker',
'kubectl',
'jail',
'local',
'lxc',
'lxd',
'machinectl',
'setns',
'ssh',
)
def wrap_connection_loader__get(name, *args, **kwargs):
"""
While a Mitogen strategy is active, rewrite connection_loader.get() calls
for some transports into requests for a compatible Mitogen transport.
"""
if name in ('buildah', 'docker', 'kubectl', 'jail', 'local',
'lxc', 'lxd', 'machinectl', 'setns', 'ssh'):
if name in REDIRECTED_CONNECTION_PLUGINS:
name = 'mitogen_' + name
return connection_loader__get(name, *args, **kwargs)
return ansible_mitogen.loaders.connection_loader__get(name, *args, **kwargs)
def wrap_worker__run(self):
@ -199,12 +216,7 @@ class AnsibleWrappers(object):
Install our PluginLoader monkey patches and update global variables
with references to the real functions.
"""
global action_loader__get
action_loader__get = ansible_mitogen.loaders.action_loader.get
ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get
global connection_loader__get
connection_loader__get = ansible_mitogen.loaders.connection_loader.get
ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get
global worker__run
@ -215,8 +227,12 @@ class AnsibleWrappers(object):
"""
Uninstall the PluginLoader monkey patches.
"""
ansible_mitogen.loaders.action_loader.get = action_loader__get
ansible_mitogen.loaders.connection_loader.get = connection_loader__get
ansible_mitogen.loaders.action_loader.get = (
ansible_mitogen.loaders.action_loader__get
)
ansible_mitogen.loaders.connection_loader.get = (
ansible_mitogen.loaders.connection_loader__get
)
ansible.executor.process.worker.WorkerProcess.run = worker__run
def install(self):

@ -7,6 +7,7 @@
{# Alabaster ships a completely useless custom.css, suppress it. #}
{%- block extrahead %}
<meta name="referrer" content="strict-origin">
<meta name="google-site-verification" content="oq5hNxRYo25tcfjfs3l6pPxfNgY3JzDYSpskc9q4TYI" />
<meta name="viewport" content="width=device-width, initial-scale=0.9, maximum-scale=0.9" />
{% endblock %}

@ -85,10 +85,15 @@ Installation
Get notified of new releases and important fixes.
<p>
<input type="email" placeholder="E-mail Address" name="email" style="font-size: 105%;">
<input type="email" placeholder="E-mail Address" name="email" style="font-size: 105%;"><br>
<input name="captcha_1" placeholder="Captcha" style="width: 10ex;">
<img class="captcha-image">
<a class="captcha-refresh" href="#">&#x21bb</a>
<button type="submit" style="font-size: 105%;">
Subscribe
</button>
</p>
<div id="emailthanks" style="display:none">
@ -1380,6 +1385,7 @@ bandwidth and 1.8x less time**.
page_id: "operon",
urls: {
save_email: "https://networkgenomics.com/save-email/",
save_email_captcha: "https://networkgenomics.com/save-email/captcha/",
}
}
};

@ -15,12 +15,20 @@ Release Notes
</style>
v0.2.9 (unreleased)
-------------------
v0.2.10 (unreleased)
--------------------
To avail of fixes in an unreleased version, please download a ZIP file
`directly from GitHub <https://github.com/dw/mitogen/>`_.
*(no changes)*
v0.2.9 (2019-11-02)
-------------------
This release contains minimal fixes beyond those required for Ansible 2.9.
* :gh:issue:`633`: :ans:mod:`meta: reset_connection <meta>` could fail to reset
a connection when ``become: true`` was set on the playbook.
@ -30,7 +38,7 @@ Thanks!
Mitogen would not be possible without the support of users. A huge thanks for
bug reports, testing, features and fixes in this release contributed by
`Can Ozokur httpe://github.com/canozokur/>`_,
`Can Ozokur <https://github.com/canozokur/>`_.
v0.2.8 (2019-08-18)
@ -44,10 +52,9 @@ Enhancements
~~~~~~~~~~~~
* :gh:issue:`556`,
:gh:issue:`587`: Ansible 2.8 is supported. `Become plugins
<https://docs.ansible.com/ansible/latest/plugins/become.html>`_ and
`interpreter discovery
<https://docs.ansible.com/ansible/latest/reference_appendices/interpreter_discovery.html>`_
:gh:issue:`587`: Ansible 2.8 is supported.
`Become plugins <https://docs.ansible.com/ansible/latest/plugins/become.html>`_ (:gh:issue:`631`) and
`interpreter discovery <https://docs.ansible.com/ansible/latest/reference_appendices/interpreter_discovery.html>`_ (:gh:issue:`630`)
are not yet handled.
* :gh:issue:`419`, :gh:issue:`470`: file descriptor usage is approximately
@ -67,9 +74,10 @@ Enhancements
is exposed to Ansible as the :ans:conn:`buildah`.
* :gh:issue:`615`: a modified :ans:mod:`fetch` implements streaming transfer
even when ``become`` is active, avoiding excess CPU usage and memory spikes,
and improving performance. A copy of two 512 MiB files drops from 47 seconds
to 7 seconds, with peak memory usage dropping from 10.7 GiB to 64.8 MiB.
even when ``become`` is active, avoiding excess CPU and memory spikes, and
improving performance. A representative copy of two 512 MiB files drops from
55.7 seconds to 6.3 seconds, with peak memory usage dropping from 10.7 GiB to
64.8 MiB. [#i615]_
* `Operon <https://networkgenomics.com/operon/>`_ no longer requires a custom
library installation, both Ansible and Operon are supported by a single
@ -96,8 +104,7 @@ Mitogen for Ansible
a broken heuristic in common SELinux policies that prevents inheriting
:linux:man7:`unix` sockets across privilege domains.
* `#467 <httpe://github.com/dw/mitogen/issues/467>`_: an incompatibility
running Mitogen under `Molecule
* :gh:issue:`467`: an incompatibility running Mitogen under `Molecule
<https://molecule.readthedocs.io/en/stable/>`_ was resolved.
* :gh:issue:`547`, :gh:issue:`598`: fix a deadlock during initialization of
@ -139,7 +146,7 @@ Mitogen for Ansible
encoding.
* :gh:issue:`602`: connection configuration is more accurately inferred for
:ans:mod:`meta: reset_connection <meta>` the :ans:mod:`synchronize`, and for
:ans:mod:`meta: reset_connection <meta>`, the :ans:mod:`synchronize`, and for
any action plug-ins that establish additional connections.
* :gh:issue:`598`, :gh:issue:`605`: fix a deadlock managing a shared counter
@ -147,15 +154,15 @@ Mitogen for Ansible
* :gh:issue:`615`: streaming is implemented for the :ans:mod:`fetch` and other
actions that transfer files from targets to the controller. Previously files
delivered were sent in one message, requiring them to fit in RAM and be
smaller than an internal message size sanity check. Transfers from controller
to targets have been streaming since 0.2.0.
were sent in one message, requiring them to fit in RAM and be smaller than an
internal message size sanity check. Transfers from controller to targets have
been streaming since 0.2.0.
* :gh:commit:`7ae926b3`: the :ans:mod:`lineinfile` leaks writable temporary
file descriptors since Ansible 2.7.0. When :ans:mod:`~lineinfile` created or
modified a script, and that script was later executed, the execution could
fail with "*text file busy*". Temporary descriptors are now tracked and
cleaned up on exit for all modules.
* :gh:commit:`7ae926b3`: the :ans:mod:`lineinfile` leaked writable temporary
file descriptors between Ansible 2.7.0 and 2.8.2. When :ans:mod:`~lineinfile`
created or modified a script, and that script was later executed, the
execution could fail with "*text file busy*". Temporary descriptors are now
tracked and cleaned up on exit for all modules.
Core Library
@ -265,7 +272,7 @@ Core Library
unidirectional routing, where contexts may optionally only communicate with
parents and never siblings (so that air-gapped networks cannot be
unintentionally bridged) was not inherited when a child was initiated
directly from an another child. This did not effect Ansible, since the
directly from another child. This did not affect Ansible, since the
controller initiates any new child used for routing, only forked tasks are
initiated by children.
@ -305,6 +312,13 @@ bug reports, testing, features and fixes in this release contributed by
`@tho86 <https://github.com/tho86>`_.
.. rubric:: Footnotes
.. [#i615] Peak RSS of controller and target as measured with ``/usr/bin/time
-v ansible-playbook -c local`` using the reproduction supplied in
:gh:issue:`615`.
v0.2.7 (2019-05-19)
-------------------

@ -334,6 +334,14 @@ These signals are used internally by Mitogen.
- Fired when :class:`mitogen.parent.Reaper` detects subprocess has fully
exited.
* - :py:class:`mitogen.core.Broker`
- ``before_shutdown``
- Fired after Broker.shutdown() is called, but before ``shutdown`` event
fires. This can be used to trigger any behaviour that relies on the
process remaining intact, as processing of ``shutdown`` races with any
parent sending the child a signal because it is not shutting down in
reasonable time.
* - :py:class:`mitogen.core.Broker`
- ``shutdown``
- Fired after Broker.shutdown() is called.

@ -503,7 +503,7 @@ def set_cloexec(fd):
:func:`mitogen.fork.on_fork`.
"""
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
assert fd > 2
assert fd > 2, 'fd %r <= 2' % (fd,)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
@ -808,7 +808,7 @@ class Message(object):
self.src_id = mitogen.context_id
self.auth_id = mitogen.context_id
vars(self).update(kwargs)
assert isinstance(self.data, BytesType)
assert isinstance(self.data, BytesType), 'Message data is not Bytes'
def pack(self):
return (
@ -1834,7 +1834,8 @@ class DelimitedProtocol(Protocol):
if cont:
self.on_partial_line_received(self._trailer)
else:
assert stream.protocol is not self
assert stream.protocol is not self, \
'stream protocol is no longer %r' % (self,)
stream.protocol.on_receive(broker, self._trailer)
def on_line_received(self, line):
@ -2046,6 +2047,10 @@ class MitogenProtocol(Protocol):
#: :data:`mitogen.parent_ids`.
is_privileged = False
#: Invoked as `on_message(stream, msg)` each message received from the
#: peer.
on_message = None
def __init__(self, router, remote_id, auth_id=None,
local_id=None, parent_ids=None):
self._router = router
@ -2245,12 +2250,12 @@ class Context(object):
return receiver
def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('calling service %s.%s of %r, args: %r',
service_name, method_name, self, kwargs)
if isinstance(service_name, BytesType):
service_name = service_name.encode('utf-8')
elif not isinstance(service_name, UnicodeType):
service_name = service_name.name() # Service.name()
_v and LOG.debug('calling service %s.%s of %r, args: %r',
service_name, method_name, self, kwargs)
tup = (service_name, to_text(method_name), Kwargs(kwargs))
msg = Message.pickled(tup, handle=CALL_SERVICE)
return self.send_async(msg)
@ -2575,6 +2580,7 @@ class Latch(object):
return self._cls_idle_socketpairs.pop() # pop() must be atomic
except IndexError:
rsock, wsock = socket.socketpair()
rsock.setblocking(False)
set_cloexec(rsock.fileno())
set_cloexec(wsock.fileno())
self._cls_all_sockets.extend((rsock, wsock))
@ -2649,9 +2655,8 @@ class Latch(object):
)
e = None
woken = None
try:
woken = list(poller.poll(timeout))
list(poller.poll(timeout))
except Exception:
e = sys.exc_info()[1]
@ -2659,11 +2664,19 @@ class Latch(object):
try:
i = self._sleeping.index((wsock, cookie))
del self._sleeping[i]
if not woken:
raise e or TimeoutError()
got_cookie = rsock.recv(self.COOKIE_SIZE)
try:
got_cookie = rsock.recv(self.COOKIE_SIZE)
except socket.error:
e2 = sys.exc_info()[1]
if e2.args[0] == errno.EAGAIN:
e = TimeoutError()
else:
e = e2
self._cls_idle_socketpairs.append((rsock, wsock))
if e:
raise e
assert cookie == got_cookie, (
"Cookie incorrect; got %r, expected %r" \
@ -2744,8 +2757,7 @@ class Waker(Protocol):
def __init__(self, broker):
self._broker = broker
self._lock = threading.Lock()
self._deferred = []
self._deferred = collections.deque()
def __repr__(self):
return 'Waker(fd=%r/%r)' % (
@ -2758,11 +2770,7 @@ class Waker(Protocol):
"""
Prevent immediate Broker shutdown while deferred functions remain.
"""
self._lock.acquire()
try:
return len(self._deferred)
finally:
self._lock.release()
return len(self._deferred)
def on_receive(self, broker, buf):
"""
@ -2771,14 +2779,12 @@ class Waker(Protocol):
ensure only one byte needs to be pending regardless of queue length.
"""
_vv and IOLOG.debug('%r.on_receive()', self)
self._lock.acquire()
try:
deferred = self._deferred
self._deferred = []
finally:
self._lock.release()
while True:
try:
func, args, kwargs = self._deferred.popleft()
except IndexError:
return
for func, args, kwargs in deferred:
try:
func(*args, **kwargs)
except Exception:
@ -2795,7 +2801,7 @@ class Waker(Protocol):
self.stream.transmit_side.write(b(' '))
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EBADF:
if e.args[0] in (errno.EBADF, errno.EWOULDBLOCK):
raise
broker_shutdown_msg = (
@ -2821,15 +2827,8 @@ class Waker(Protocol):
_vv and IOLOG.debug('%r.defer() [fd=%r]', self,
self.stream.transmit_side.fd)
self._lock.acquire()
try:
should_wake = not self._deferred
self._deferred.append((func, args, kwargs))
finally:
self._lock.release()
if should_wake:
self._wake()
self._deferred.append((func, args, kwargs))
self._wake()
class IoLoggerProtocol(DelimitedProtocol):
@ -3299,6 +3298,8 @@ class Router(object):
# the parent.
if in_stream.protocol.auth_id is not None:
msg.auth_id = in_stream.protocol.auth_id
if in_stream.protocol.on_message is not None:
in_stream.protocol.on_message(in_stream, msg)
# Record the IDs the source ever communicated with.
in_stream.protocol.egress_ids.add(msg.dst_id)
@ -3548,6 +3549,7 @@ class Broker(object):
while self._alive:
self._loop_once()
fire(self, 'before_shutdown')
fire(self, 'shutdown')
self._broker_shutdown()
except Exception:
@ -3625,7 +3627,13 @@ class Dispatcher(object):
policy=has_parent_authority,
)
self._service_recv.notify = self._on_call_service
listen(econtext.broker, 'shutdown', self.recv.close)
listen(econtext.broker, 'shutdown', self._on_broker_shutdown)
def _on_broker_shutdown(self):
if self._service_recv.notify == self._on_call_service:
self._service_recv.notify = None
self.recv.close()
@classmethod
@takes_econtext
@ -3987,4 +3995,3 @@ class ExternalContext(object):
raise
finally:
self.broker.shutdown()
self.broker.join()

@ -79,16 +79,8 @@ def reset_logging_framework():
logging._lock = threading.RLock()
# The root logger does not appear in the loggerDict.
for name in [None] + list(logging.Logger.manager.loggerDict):
for handler in logging.getLogger(name).handlers:
handler.createLock()
root = logging.getLogger()
root.handlers = [
handler
for handler in root.handlers
if not isinstance(handler, mitogen.core.LogHandler)
]
logging.Logger.manager.loggerDict = {}
logging.getLogger().handlers = []
def on_fork():
@ -245,6 +237,8 @@ class Connection(mitogen.parent.Connection):
if childfp.fileno() not in (0, 1, 100):
childfp.close()
mitogen.core.IOLOG.setLevel(logging.INFO)
try:
try:
mitogen.core.ExternalContext(self.get_econtext_config()).main()

@ -1680,6 +1680,7 @@ class Connection(object):
try:
self.proc = self.start_child()
except Exception:
LOG.debug('failed to start child', exc_info=True)
self._fail_connection(sys.exc_info()[1])
return
@ -2230,7 +2231,7 @@ class RouteMonitor(object):
target_name = target_name.decode()
target_id = int(target_id_s)
self.router.context_by_id(target_id).name = target_name
stream = self.router.stream_by_id(msg.auth_id)
stream = self.router.stream_by_id(msg.src_id)
current = self.router.stream_by_id(target_id)
if current and current.protocol.remote_id != mitogen.parent_id:
self._log.error('Cannot add duplicate route to %r via %r, '
@ -2258,7 +2259,7 @@ class RouteMonitor(object):
if registered_stream is None:
return
stream = self.router.stream_by_id(msg.auth_id)
stream = self.router.stream_by_id(msg.src_id)
if registered_stream != stream:
self._log.error('received DEL_ROUTE for %d from %r, expected %r',
target_id, stream, registered_stream)

@ -324,13 +324,13 @@ class Select(object):
if not self._receivers:
raise Error(self.empty_msg)
event = Event()
while True:
recv = self._latch.get(timeout=timeout, block=block)
try:
if isinstance(recv, Select):
event = recv.get_event(block=False)
else:
event = Event()
event.source = recv
event.data = recv.get(block=False)
if self._oneshot:

@ -1,3 +1,4 @@
lib/modules/custom_binary_producing_junk
lib/modules/custom_binary_producing_json
hosts/*.local
gcloud

@ -2,6 +2,7 @@
inventory = hosts
gathering = explicit
strategy_plugins = ../../ansible_mitogen/plugins/strategy
inventory_plugins = lib/inventory
action_plugins = lib/action
callback_plugins = lib/callback
stdout_callback = nice_stdout

@ -1,2 +0,0 @@
terraform.tfstate*
.terraform

@ -1,3 +0,0 @@
default:
terraform fmt

@ -1,6 +0,0 @@
# Command line.
````
time LANG=C LC_ALL=C ANSIBLE_STRATEGY=mitogen MITOGEN_GCLOUD_GROUP=debops_all_hosts debops common
```

@ -1,8 +0,0 @@
[defaults]
strategy_plugins = ../../../ansible_mitogen/plugins/strategy
strategy = mitogen
inventory = hosts
retry_files_enabled = False
host_key_checking = False
callback_plugins = ../lib/callback
stdout_callback = nice_stdout

@ -1,159 +0,0 @@
- hosts: all
become: true
tasks:
- apt: name={{item}} state=installed
with_items:
- openvpn
- tcpdump
- python-pip
- python-virtualenv
- strace
- libldap2-dev
- linux-perf
- libsasl2-dev
- build-essential
- git
- rsync
- file:
path: /etc/openvpn
state: directory
- copy:
dest: /etc/openvpn/secret
mode: '0600'
content: |
-----BEGIN OpenVPN Static key V1-----
f94005e4206828e281eb397aefd69b37
ebe6cd39057d5641c5d8dd539cd07651
557d94d0077852bd8f92b68bef927169
c5f0e42ac962a2cbbed35e107ffa0e71
1a2607c6bcd919ec5846917b20eb6684
c7505152815d6ed7b4420714777a3d4a
8edb27ca81971cba7a1e88fe3936e13b
85e9be6706a30cd1334836ed0f08e899
78942329a330392dff42e4570731ac24
9330358aaa6828c07ecb41fb9c498a89
1e0435c5a45bfed390cd2104073634ef
b00f9fae1d3c49ef5de51854103edac9
5ff39c9dfc66ae270510b2ffa74d87d2
9d4b3844b1e1473237bc6dc78fb03e2e
643ce58e667a532efceec7177367fb37
a16379a51e0a8c8e3ec00a59952b79d4
-----END OpenVPN Static key V1-----
- copy:
dest: /etc/openvpn/k3.conf
content: |
remote k3.botanicus.net
dev tun
ifconfig 10.18.0.1 10.18.0.2
secret secret
- shell: systemctl enable openvpn@k3.service
- shell: systemctl start openvpn@k3.service
- lineinfile:
line: "{{item}}"
path: /etc/sysctl.conf
register: sysctl_conf
with_items:
- "net.ipv4.ip_forward=1"
- "kernel.perf_event_paranoid=-1"
- shell: /sbin/sysctl -p
when: sysctl_conf.changed
- copy:
dest: /etc/rc.local
mode: "0744"
content: |
#!/bin/bash
iptables -t nat -F;
iptables -t nat -X;
iptables -t nat -A POSTROUTING -j MASQUERADE;
- shell: systemctl daemon-reload
- shell: systemctl enable rc-local
- shell: systemctl start rc-local
# Prepare the controller for interactive development: SSH/git identity,
# source checkouts of Mitogen and Ansible, and a virtualenv with DebOps.
- hosts: all
  vars:
    # Propagate the local workstation's git identity to the remote account.
    git_username: '{{ lookup("pipe", "git config --global user.name") }}'
    git_email: '{{ lookup("pipe", "git config --global user.email") }}'
  tasks:
    - copy:
        src: ~/.ssh/id_gitlab
        dest: ~/.ssh/id_gitlab
        # Quoted so YAML does not reinterpret the value as an octal integer
        # (consistent with the '0600' used elsewhere in this playbook).
        mode: '0600'
    - template:
        dest: ~/.ssh/config
        src: ssh_config.j2
    - shell: "rsync -a ~/.ssh {{inventory_hostname}}:"
      connection: local
    # BUG FIX: the two settings were previously swapped -- user.email was
    # assigned from git_username and user.name from git_email.
    - shell: |
        git config --global user.email "{{git_email}}"
        git config --global user.name "{{git_username}}"
      name: set_git_config
    - git:
        dest: ~/mitogen
        repo: https://github.com/dw/mitogen.git
        version: dmw
    - git:
        dest: ~/ansible
        repo: https://github.com/ansible/ansible.git
        #version: dmw
    # Dev requirements first, then editable installs of both checkouts.
    - pip:
        virtualenv: ~/venv
        requirements: ~/mitogen/dev_requirements.txt
    - pip:
        virtualenv: ~/venv
        editable: true
        name: ~/mitogen
    - pip:
        virtualenv: ~/venv
        editable: true
        name: ~/ansible
    - pip:
        virtualenv: ~/venv
        name: debops
    # Activate the virtualenv for interactive logins.
    - lineinfile:
        line: "source $HOME/venv/bin/activate"
        path: ~/.profile
    - name: debops-init
      shell: ~/venv/bin/debops-init ~/prj
      args:
        creates: ~/prj
    # Tiny DH parameters: this is a throwaway load-test project, speed over
    # security.
    - name: grpvars
      copy:
        dest: "{{ansible_user_dir}}/prj/ansible/inventory/group_vars/all/dhparam.yml"
        content: |
          ---
          dhparam__bits: [ '256' ]
    - blockinfile:
        path: ~/prj/.debops.cfg
        insertafter: '\[ansible defaults\]'
        block: |
          strategy_plugins = {{ansible_user_dir}}/mitogen/ansible_mitogen/plugins/strategy
          forks = 50
          host_key_checking = False
    - file:
        path: ~/prj/ansible/inventory/gcloud.py
        state: link
        src: ~/mitogen/tests/ansible/lib/inventory/gcloud.py

@ -1,2 +0,0 @@
[controller]
c

@ -1,149 +0,0 @@
# Sizing knobs: node-count scales the worker pool; preemptible and big trade
# cost against capacity.
variable "node-count" {
    default = 0
}
variable "preemptible" {
    default = true
}
variable "big" {
    default = false
}
# All resources live in one GCP project/zone.
provider "google" {
    project = "mitogen-load-testing"
    region = "europe-west1"
    zone = "europe-west1-d"
}
# The Ansible controller VM. can_ip_forward lets it act as the gateway for
# the worker nodes (see the google_compute_route resource below).
resource "google_compute_instance" "controller" {
    name = "ansible-controller"
    # Large machine only when var.big is set; tiny custom shape otherwise.
    machine_type = "${var.big ? "n1-highcpu-32" : "custom-1-1024"}"
    allow_stopping_for_update = true
    can_ip_forward = true
    boot_disk {
        initialize_params {
            image = "debian-cloud/debian-9"
        }
    }
    # Preemptible keeps cost down; no automatic restart after preemption.
    scheduling {
        preemptible = true
        automatic_restart = false
    }
    # Empty access_config block still allocates an ephemeral external IP.
    network_interface {
        subnetwork = "${google_compute_subnetwork.loadtest-subnet.self_link}"
        access_config = {}
    }
    # After boot: refresh known_hosts for the new external IP, rewrite the
    # local SSH config alias, then provision the VM with controller.yml.
    provisioner "local-exec" {
        command = <<-EOF
            ip=${google_compute_instance.controller.network_interface.0.access_config.0.nat_ip};
            ssh-keygen -R $ip;
            ssh-keyscan $ip >> ~/.ssh/known_hosts;
            sed -ri -e "s/.*CONTROLLER_IP_HERE.*/ Hostname $ip/" ~/.ssh/config;
            ansible-playbook -i $ip, controller.yml
        EOF
    }
}
# Dedicated VPC with a single manually managed subnet for the load test.
resource "google_compute_network" "loadtest" {
    name = "loadtest"
    auto_create_subnetworks = false
}
resource "google_compute_subnetwork" "loadtest-subnet" {
    name = "loadtest-subnet"
    ip_cidr_range = "10.19.0.0/16"
    network = "${google_compute_network.loadtest.id}"
}
# Wide-open firewall in both directions -- acceptable only because this VPC
# hosts throwaway load-test machines.
resource "google_compute_firewall" "allow-all-in" {
    name = "allow-all-in"
    network = "${google_compute_network.loadtest.name}"
    direction = "INGRESS"
    allow {
        protocol = "all"
    }
}
resource "google_compute_firewall" "allow-all-out" {
    name = "allow-all-out"
    network = "${google_compute_network.loadtest.name}"
    direction = "EGRESS"
    allow {
        protocol = "all"
    }
}
# Default route for instances tagged "node": send all egress via the
# controller instance, which forwards/NATs on their behalf.
resource "google_compute_route" "route-nodes-via-controller" {
    name = "route-nodes-via-controller"
    dest_range = "0.0.0.0/0"
    network = "${google_compute_network.loadtest.name}"
    next_hop_instance = "${google_compute_instance.controller.self_link}"
    next_hop_instance_zone = "${google_compute_instance.controller.zone}"
    priority = 800
    tags = ["node"]
}
# Template for worker nodes: tagged "node" so the route above applies, no
# external IP (subnetwork-only interface).
resource "google_compute_instance_template" "node" {
    name = "node"
    tags = ["node"]
    machine_type = "custom-1-1024"
    scheduling {
        preemptible = "${var.preemptible}"
        automatic_restart = false
    }
    disk {
        source_image = "debian-cloud/debian-9"
        auto_delete = true
        boot = true
    }
    network_interface {
        subnetwork = "${google_compute_subnetwork.loadtest-subnet.self_link}"
    }
}
#
# Compute Engine tops out at 1000 VMs per group
#
# Hence four identical managers, each running a quarter of var.node-count.
resource "google_compute_instance_group_manager" "nodes-a" {
    name = "nodes-a"
    base_instance_name = "node"
    instance_template = "${google_compute_instance_template.node.self_link}"
    target_size = "${var.node-count / 4}"
}
resource "google_compute_instance_group_manager" "nodes-b" {
    name = "nodes-b"
    base_instance_name = "node"
    instance_template = "${google_compute_instance_template.node.self_link}"
    target_size = "${var.node-count / 4}"
}
resource "google_compute_instance_group_manager" "nodes-c" {
    name = "nodes-c"
    base_instance_name = "node"
    instance_template = "${google_compute_instance_template.node.self_link}"
    target_size = "${var.node-count / 4}"
}
resource "google_compute_instance_group_manager" "nodes-d" {
    name = "nodes-d"
    base_instance_name = "node"
    instance_template = "${google_compute_instance_template.node.self_link}"
    target_size = "${var.node-count / 4}"
}

@ -1 +0,0 @@
google-api-python-client==1.6.5

@ -1,19 +0,0 @@
[defaults]
# Static hosts file plus the dynamic inventory scripts from the Mitogen tree.
inventory = hosts,~/mitogen/tests/ansible/lib/inventory
# Skip automatic fact gathering; plays opt in with gather_facts where needed.
gathering = explicit
# Plugin paths point into the Mitogen checkout so its test plugins are used.
strategy_plugins = ~/mitogen/ansible_mitogen/plugins/strategy
action_plugins = ~/mitogen/tests/ansible/lib/action
callback_plugins = ~/mitogen/tests/ansible/lib/callback
stdout_callback = nice_stdout
vars_plugins = ~/mitogen/tests/ansible/lib/vars
library = ~/mitogen/tests/ansible/lib/modules
retry_files_enabled = False
forks = 50
# Run everything through the Mitogen-accelerated linear strategy.
strategy = mitogen_linear
# Load-test hosts are ephemeral; host keys change constantly.
host_key_checking = False
[ssh_connection]
ssh_args = -o ForwardAgent=yes -o ControlMaster=auto -o ControlPersist=60s
pipelining = True

@ -1,6 +0,0 @@
# Let "localhost-*" inventory aliases all resolve to the local machine.
Host localhost-*
    Hostname localhost
# Use the dedicated deploy key copied to ~/.ssh/id_gitlab for GitLab.
Host gitlab.com
    IdentityFile ~/.ssh/id_gitlab

@ -1,6 +1,6 @@
- name: integration/runner/missing_module.yml
hosts: test-targets
hosts: test-targets[0]
connection: local
tasks:
- connection: local

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import os
import io
import os
import sys
from ansible import constants as C
from ansible.module_utils import six
@ -15,6 +16,27 @@ try:
except KeyError:
pprint = None
DefaultModule = callback_loader.get('default', class_only=True)
DOCUMENTATION = '''
callback: nice_stdout
type: stdout
options:
check_mode_markers:
name: Show markers when running in check mode
description:
- "Toggle to control displaying markers when running in check mode. The markers are C(DRY RUN)
at the beggining and ending of playbook execution (when calling C(ansible-playbook --check))
and C(CHECK MODE) as a suffix at every play and task that is run in check mode."
type: bool
default: no
version_added: 2.9
env:
- name: ANSIBLE_CHECK_MODE_MARKERS
ini:
- key: check_mode_markers
section: defaults
'''
def printi(tio, obj, key=None, indent=0):
def write(s, *args):
@ -51,8 +73,6 @@ def printi(tio, obj, key=None, indent=0):
write('%r', obj)
DefaultModule = callback_loader.get('default', class_only=True)
class CallbackModule(DefaultModule):
def _dump_results(self, result, *args, **kwargs):
try:

@ -1,49 +0,0 @@
#!/usr/bin/env python
"""
Ansible dynamic inventory script listing running GCE load-test nodes.

Prints a JSON inventory with one group named after $MITOGEN_GCLOUD_GROUP
containing the internal IPs of every RUNNING instance whose name starts
with "node-", plus size-capped subgroups ("<group>-1", "<group>-10", ...)
for running tests against only a prefix of the fleet.
"""

import json
import os
import sys

# Fast path: inventory disabled, or Ansible asked for per-host vars
# (--host). Emit an empty JSON object and exit before importing the slow,
# possibly absent Google client library.
if (not os.environ.get('MITOGEN_GCLOUD_GROUP')) or any('--host' in s for s in sys.argv):
    sys.stdout.write('{}')
    sys.exit(0)

import googleapiclient.discovery


def main():
    project = 'mitogen-load-testing'
    zone = 'europe-west1-d'
    prefix = 'node-'

    client = googleapiclient.discovery.build('compute', 'v1')
    resp = client.instances().list(project=project, zone=zone).execute()

    ips = []
    # BUG FIX: the API omits 'items' entirely when the zone has no
    # instances; resp['items'] used to raise KeyError in that case.
    for inst in resp.get('items', ()):
        if inst['status'] == 'RUNNING' and inst['name'].startswith(prefix):
            ips.extend(
                # Internal IPs only: nodes have no external address, the
                # controller routes for them.
                #bytes(config['natIP'])
                bytes(interface['networkIP'])
                for interface in inst['networkInterfaces']
                #for config in interface['accessConfigs']
            )

    # Diagnostics go to stderr so stdout stays valid JSON for Ansible.
    sys.stderr.write('Addresses: %s\n' % (ips,))
    gname = os.environ['MITOGEN_GCLOUD_GROUP']
    groups = {
        gname: {
            'hosts': ips
        }
    }

    # Convenience subgroups containing only the first N addresses.
    for i in 1, 10, 20, 50, 100:
        groups['%s-%s' % (gname, i)] = {
            'hosts': ips[:i]
        }

    sys.stdout.write(json.dumps(groups, indent=4))


if __name__ == '__main__':
    main()

@ -6,6 +6,8 @@
any_errors_fatal: true
gather_facts: true
tasks:
- meta: end_play
when: ansible_version.full < '2.6'
# Copy the naughty 'ansible' into place.
- copy:
@ -13,7 +15,7 @@
src: ansible.py
# Restart the connection.
- mitogen_shutdown_all:
- meta: reset_connection
- custom_python_detect_environment:
register: env

@ -1 +0,0 @@
*.tar.bz2 filter=lfs diff=lfs merge=lfs -text

@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:123ddbd9055745d37e8f14bf1c8352541ff4d500e6daa4aa3165e604fb7e8b6a
size 6176131

@ -193,7 +193,7 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase):
sem = mitogen.core.Latch()
router.add_handler(sem.put)
log = testlib.LogCapturer('mitogen')
log = testlib.LogCapturer()
log.start()
# Force a crash and ensure it wakes up.

@ -54,6 +54,15 @@ if faulthandler is not None:
faulthandler.enable()
#
# Temporary hack: Operon changed logging somewhat, and this broke LogCapturer /
# log_handler_test.
#
mitogen.core.LOG.propagate = True
def get_fd_count():
"""
Return the number of FDs open by this process.

Loading…
Cancel
Save