Merge remote-tracking branch 'origin/026' into stable

* origin/026:
  docs: update Changelog for release.
  Bump version for release.
  issue #555: ansible: workaround ancient reload(sys) hack.
  issue #554: mitogen_action_script fix
  issue #554: fix Ansible 2.4 compatibility
  issue #554: don't rely on tmp_path autoremoval in test.
  issue #554: track and remove multiple make_tmp_path() calls.
  docs: update Changelog.
  docs: drastically simplify install/changelog.
  issue #552: include process identity in log messages.
  issue #550: update Changelog.
  issue #550: parent: add explanatory comment.
  issue #550: fix up TTY ioctls on WSL 2016 Anniversary Update
  docs: update Changelog.
  service: make service list optional.
  docs: update Changelog; closes #548.
  issue #548: always treat transport=smart as 'ssh' for mitogen_via=.
  docs: better intro paragraph.
  .ci: copy private key file to tempdir.
  os_fork: more doc tweaks
  os_fork: more doc tweaks
  os_fork: yet more doc tidyup
  os_fork: more doc tweaks
  os_fork: clean up docs
  .ci: import soak scripts.
  .ci: allow containers for different jobs to run simultaneously
  os_fork: python 3 fixes and tests.
  issue #535: activate Corker on 2.4 in master too.
  issue #535: update Changelog.
  issue #535: wire mitogen.os_fork into Broker and Pool.
  issue #535: parent: add create_socketpair(size=..) parameter.
  issue #535: introduce mitogen.os_fork module and Corker class.
  issue #535: docs: update Changelog
  issue #535: service: support Pool.defer() like Broker.defer()
  issue #535: core: unicode.encode() may take importer lock on 2.x
  issue #535: docs: fix up Select doc
  issue #535: docs: update Changelog.
  issue #535: core/select: support selecting from Latches.
  core: increase cookie field lengths to 64-bit; closes #545.
  tests: ensure serialization restrictions are in effect
  tests/bench: set process affinity in throughput.py.
  docs: update copyright year.
  docs: update Changelog.
  core: Make Latch.put(obj=) optional.
  docs: change 'unreleased' Changelog format and add a hint.
  docs: update Changelog; closes #542.
  issue #542: return of select poller, new selection logic
  issue #542: .ci: move some tests to Azure and enable Mac job.
  ansible: create stub __init__.py for sdist.
pull/862/head v0.2.6
David Wilson 7 years ago
commit 407307adf6

@ -0,0 +1,20 @@
parameters:
name: ''
pool: ''
sign: false
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '$(python.version)'
architecture: 'x64'
- script: .ci/prep_azure.py
displayName: "Install requirements."
- script: .ci/$(MODE)_install.py
displayName: "Install requirements."
- script: .ci/$(MODE)_tests.py
displayName: Run tests.

@ -5,79 +5,85 @@
jobs:
- job: 'MitogenTests'
- job: Mac
steps:
- template: azure-pipelines-steps.yml
pool:
vmImage: 'Ubuntu 16.04'
vmImage: macOS-10.13
strategy:
matrix:
Mitogen27Debian_27:
Mito27_27:
python.version: '2.7'
MODE: mitogen
DISTRO: debian
MitogenPy27CentOS6_26:
- job: Linux
pool:
vmImage: "Ubuntu 16.04"
steps:
- template: azure-pipelines-steps.yml
strategy:
matrix:
#
# Confirmed working
#
Mito27Debian_27:
python.version: '2.7'
MODE: mitogen
DISTRO: centos6
DISTRO: debian
#Py26CentOS7:
#MitoPy27CentOS6_26:
#python.version: '2.7'
#MODE: mitogen
#DISTRO: centos6
Mitogen36CentOS6_26:
Mito36CentOS6_26:
python.version: '3.6'
MODE: mitogen
DISTRO: centos6
DebOps_2460_27_27:
python.version: '2.7'
MODE: debops_common
VER: 2.4.6.0
DebOps_262_36_27:
python.version: '3.6'
MODE: debops_common
VER: 2.6.2
Ansible_2460_26:
python.version: '2.7'
MODE: ansible
VER: 2.4.6.0
#
#
#
Ansible_262_26:
python.version: '2.7'
MODE: ansible
VER: 2.6.2
#Py26CentOS7:
#python.version: '2.7'
#MODE: mitogen
#DISTRO: centos6
Ansible_2460_36:
python.version: '3.6'
MODE: ansible
VER: 2.4.6.0
#DebOps_2460_27_27:
#python.version: '2.7'
#MODE: debops_common
#VER: 2.4.6.0
Ansible_262_36:
python.version: '3.6'
MODE: ansible
VER: 2.6.2
#DebOps_262_36_27:
#python.version: '3.6'
#MODE: debops_common
#VER: 2.6.2
Vanilla_262_27:
python.version: '2.7'
MODE: ansible
VER: 2.6.2
DISTROS: debian
STRATEGY: linear
#Ansible_2460_26:
#python.version: '2.7'
#MODE: ansible
#VER: 2.4.6.0
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '$(python.version)'
architecture: 'x64'
#Ansible_262_26:
#python.version: '2.7'
#MODE: ansible
#VER: 2.6.2
- script: .ci/prep_azure.py
displayName: "Install requirements."
#Ansible_2460_36:
#python.version: '3.6'
#MODE: ansible
#VER: 2.4.6.0
- script: .ci/$(MODE)_install.py
displayName: "Install requirements."
#Ansible_262_36:
#python.version: '3.6'
#MODE: ansible
#VER: 2.6.2
- script: .ci/$(MODE)_tests.py
displayName: Run tests.
#Vanilla_262_27:
#python.version: '2.7'
#MODE: ansible
#VER: 2.6.2
#DISTROS: debian
#STRATEGY: linear

@ -43,6 +43,18 @@ if not hasattr(subprocess, 'check_output'):
subprocess.check_output = subprocess__check_output
# ------------------
def have_apt():
proc = subprocess.Popen('apt --help >/dev/null 2>/dev/null', shell=True)
return proc.wait() == 0
def have_docker():
proc = subprocess.Popen('docker info >/dev/null 2>/dev/null', shell=True)
return proc.wait() == 0
# -----------------
# Force stdout FD 1 to be a pipe, so tools like pip don't spam progress bars.
@ -134,6 +146,17 @@ TARGET_COUNT = int(os.environ.get('TARGET_COUNT', '2'))
BASE_PORT = 2200
TMP = TempDir().path
# We copy this out of the way to avoid random stuff modifying perms in the Git
# tree (like git pull).
src_key_file = os.path.join(GIT_ROOT,
'tests/data/docker/mitogen__has_sudo_pubkey.key')
key_file = os.path.join(TMP,
'mitogen__has_sudo_pubkey.key')
shutil.copyfile(src_key_file, key_file)
os.chmod(key_file, int('0600', 8))
os.environ['PYTHONDONTWRITEBYTECODE'] = 'x'
os.environ['PYTHONPATH'] = '%s:%s' % (
os.environ.get('PYTHONPATH', ''),
@ -153,7 +176,7 @@ def image_for_distro(distro):
return 'mitogen/%s-test' % (distro.partition('-')[0],)
def make_containers():
def make_containers(name_prefix='', port_offset=0):
docker_hostname = get_docker_hostname()
firstbit = lambda s: (s+'-').split('-')[0]
secondbit = lambda s: (s+'-').split('-')[1]
@ -171,9 +194,9 @@ def make_containers():
for x in range(count):
lst.append({
"distro": firstbit(distro),
"name": "target-%s-%s" % (distro, i),
"name": name_prefix + ("target-%s-%s" % (distro, i)),
"hostname": docker_hostname,
"port": BASE_PORT + i,
"port": BASE_PORT + i + port_offset,
"python_path": (
'/usr/bin/python3'
if secondbit(distro) == 'py3'
@ -195,6 +218,8 @@ def start_containers(containers):
"docker run "
"--rm "
"--detach "
"--privileged "
"--cap-add=SYS_PTRACE "
"--publish 0.0.0.0:%(port)s:22/tcp "
"--hostname=%(name)s "
"--name=%(name)s "

@ -2,6 +2,7 @@
from __future__ import print_function
import os
import shutil
import ci_lib
@ -10,17 +11,13 @@ import ci_lib
ci_lib.DISTROS = ['debian'] * ci_lib.TARGET_COUNT
project_dir = os.path.join(ci_lib.TMP, 'project')
key_file = os.path.join(
ci_lib.GIT_ROOT,
'tests/data/docker/mitogen__has_sudo_pubkey.key',
)
vars_path = 'ansible/inventory/group_vars/debops_all_hosts.yml'
inventory_path = 'ansible/inventory/hosts'
docker_hostname = ci_lib.get_docker_hostname()
with ci_lib.Fold('docker_setup'):
containers = ci_lib.make_containers()
containers = ci_lib.make_containers(port_offset=500, name_prefix='debops-')
ci_lib.start_containers(containers)
@ -36,7 +33,6 @@ with ci_lib.Fold('job_setup'):
% (ci_lib.GIT_ROOT,)
)
ci_lib.run('chmod go= %s', key_file)
with open(vars_path, 'w') as fp:
fp.write(
"ansible_python_interpreter: /usr/bin/python2.7\n"
@ -47,7 +43,7 @@ with ci_lib.Fold('job_setup'):
"\n"
# Speed up slow DH generation.
"dhparam__bits: ['128', '64']\n"
% (key_file,)
% (ci_lib.key_file,)
)
with open(inventory_path, 'a') as fp:

@ -6,10 +6,12 @@ batches = [
[
'pip install "pycparser<2.19" "idna<2.7"',
'pip install -r tests/requirements.txt',
],
[
'docker pull %s' % (ci_lib.image_for_distro(ci_lib.DISTRO),),
]
]
if ci_lib.have_docker():
batches.append([
'docker pull %s' % (ci_lib.image_for_distro(ci_lib.DISTRO),),
])
ci_lib.run_batches(batches)

@ -11,4 +11,7 @@ os.environ.update({
'SKIP_ANSIBLE': '1',
})
if not ci_lib.have_docker():
os.environ['SKIP_DOCKER_TESTS'] = '1'
ci_lib.run('./run_tests -v')

@ -1,22 +1,30 @@
#!/usr/bin/env python
import os
import sys
import ci_lib
batches = []
batches.append([
'echo force-unsafe-io | sudo tee /etc/dpkg/dpkg.cfg.d/nosync',
'sudo add-apt-repository ppa:deadsnakes/ppa',
'sudo apt-get update',
'sudo apt-get -y install python2.6 python2.6-dev libsasl2-dev libldap2-dev',
])
batches.append([
'pip install -r dev_requirements.txt',
])
batches.extend(
['docker pull %s' % (ci_lib.image_for_distro(distro),)]
for distro in ci_lib.DISTROS
)
if ci_lib.have_apt():
batches.append([
'echo force-unsafe-io | sudo tee /etc/dpkg/dpkg.cfg.d/nosync',
'sudo add-apt-repository ppa:deadsnakes/ppa',
'sudo apt-get update',
'sudo apt-get -y install python2.6 python2.6-dev libsasl2-dev libldap2-dev',
])
#batches.append([
#'pip install -r dev_requirements.txt',
#])
if ci_lib.have_docker():
batches.extend(
['docker pull %s' % (ci_lib.image_for_distro(distro),)]
for distro in ci_lib.DISTROS
)
ci_lib.run_batches(batches)

@ -0,0 +1,16 @@
#!/bin/bash
export NOCOVERAGE=1
# Make Docker containers once.
/usr/bin/time -v ./.ci/debops_common_tests.py "$@" || break
export KEEP=1
i=0
while :
do
i=$((i + 1))
/usr/bin/time -v ./.ci/debops_common_tests.py "$@" || break
done
echo $i

@ -0,0 +1,17 @@
#!/bin/bash
export NOCOVERAGE=1
export DISTROS="debian*4"
# Make Docker containers once.
/usr/bin/time -v ./.ci/ansible_tests.py "$@"
export KEEP=1
i=0
while :
do
i=$((i + 1))
/usr/bin/time -v ./.ci/ansible_tests.py "$@" || break
done
echo $i

@ -0,0 +1,12 @@
#!/bin/bash
export NOCOVERAGE=1
i=0
while :
do
i=$((i + 1))
/usr/bin/time -v ./.ci/mitogen_py24_tests.py "$@" || break
done
echo $i

@ -27,9 +27,7 @@ matrix:
# 2.4 -> 2.4
- language: c
env: MODE=mitogen_py24 DISTRO=centos5
# 2.7 -> 2.7
- python: "2.7"
env: MODE=mitogen DISTRO=debian
# 2.7 -> 2.7 -- moved to Azure
# 2.7 -> 2.6
#- python: "2.7"
#env: MODE=mitogen DISTRO=centos6
@ -39,9 +37,7 @@ matrix:
# 2.6 -> 3.5
- python: "2.6"
env: MODE=mitogen DISTRO=debian-py3
# 3.6 -> 2.6
- python: "3.6"
env: MODE=mitogen DISTRO=centos6
# 3.6 -> 2.6 -- moved to Azure
# Debops tests.
# 2.4.6.0; 2.7 -> 2.7

@ -33,7 +33,6 @@ import errno
import logging
import os
import pprint
import random
import stat
import sys
import time
@ -356,6 +355,7 @@ CONNECTION_METHOD = {
'machinectl': _connect_machinectl,
'setns': _connect_setns,
'ssh': _connect_ssh,
'smart': _connect_ssh, # issue #548.
'su': _connect_su,
'sudo': _connect_sudo,
'doas': _connect_doas,
@ -702,35 +702,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self._connect()
return self.init_child_result['good_temp_dir']
def _generate_tmp_path(self):
return os.path.join(
self.get_good_temp_dir(),
'ansible_mitogen_action_%016x' % (
random.getrandbits(8*8),
)
)
def _make_tmp_path(self):
assert getattr(self._shell, 'tmpdir', None) is None
self._shell.tmpdir = self._generate_tmp_path()
LOG.debug('Temporary directory: %r', self._shell.tmpdir)
self.get_chain().call_no_reply(os.mkdir, self._shell.tmpdir)
return self._shell.tmpdir
def _reset_tmp_path(self):
"""
Called by _mitogen_reset(); ask the remote context to delete any
temporary directory created for the action. CallChain is not used here
to ensure exception is logged by the context on failure, since the
CallChain itself is about to be destructed.
"""
if getattr(self._shell, 'tmpdir', None) is not None:
self.context.call_no_reply(
ansible_mitogen.target.prune_tree,
self._shell.tmpdir,
)
self._shell.tmpdir = None
def _connect(self):
"""
Establish a connection to the master process's UNIX listener socket,
@ -762,7 +733,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
if not self.context:
return
self._reset_tmp_path()
self.chain.reset()
self.parent.call_service(
service_name='ansible_mitogen.services.ContextService',

@ -40,6 +40,25 @@ except ImportError:
display = Display()
#: The process name set via :func:`set_process_name`.
_process_name = None
#: The PID of the process that last called :func:`set_process_name`, so its
#: value can be ignored in unknown fork children.
_process_pid = None
def set_process_name(name):
"""
Set a name to adorn log messages with.
"""
global _process_name
_process_name = name
global _process_pid
_process_pid = os.getpid()
class Handler(logging.Handler):
"""
Use Mitogen's log format, but send the result to a Display method.
@ -65,7 +84,12 @@ class Handler(logging.Handler):
if mitogen_name in self.NOISY_LOGGERS and record.levelno >= logging.WARNING:
record.levelno = logging.DEBUG
s = '[pid %d] %s' % (os.getpid(), self.format(record))
if _process_pid == os.getpid():
process_name = _process_name
else:
process_name = '?'
s = '[%-4s %d] %s' % (process_name, os.getpid(), self.format(record))
if record.levelno >= logging.ERROR:
display.error(s, wrap_text=False)
elif record.levelno >= logging.WARNING:

@ -30,6 +30,7 @@ from __future__ import absolute_import
import logging
import os
import pwd
import random
import traceback
try:
@ -173,26 +174,48 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
"""
assert False, "_is_pipelining_enabled() should never be called."
def _generate_tmp_path(self):
return os.path.join(
self._connection.get_good_temp_dir(),
'ansible_mitogen_action_%016x' % (
random.getrandbits(8*8),
)
)
def _generate_tmp_path(self):
return os.path.join(
self._connection.get_good_temp_dir(),
'ansible_mitogen_action_%016x' % (
random.getrandbits(8*8),
)
)
def _make_tmp_path(self, remote_user=None):
"""
Return the directory created by the Connection instance during
connection.
Create a temporary subdirectory as a child of the temporary directory
managed by the remote interpreter.
"""
LOG.debug('_make_tmp_path(remote_user=%r)', remote_user)
return self._connection._make_tmp_path()
path = self._generate_tmp_path()
LOG.debug('Temporary directory: %r', path)
self._connection.get_chain().call_no_reply(os.mkdir, path)
self._connection._shell.tmpdir = path
return path
def _remove_tmp_path(self, tmp_path):
"""
Stub out the base implementation's invocation of rm -rf, replacing it
with nothing, as the persistent interpreter automatically cleans up
after itself without introducing roundtrips.
Replace the base implementation's invocation of rm -rf, replacing it
with a pipelined call to :func:`ansible_mitogen.target.prune_tree`.
"""
# The actual removal is pipelined by Connection.close().
LOG.debug('_remove_tmp_path(%r)', tmp_path)
# Upstream _remove_tmp_path resets shell.tmpdir here, however
# connection.py uses that as the sole location of the temporary
# directory, if one exists.
# self._connection._shell.tmpdir = None
if tmp_path is None and ansible.__version__ > '2.6':
tmp_path = self._connection._shell.tmpdir # 06f73ad578d
if tmp_path is not None:
self._connection.get_chain().call_no_reply(
ansible_mitogen.target.prune_tree,
tmp_path,
)
self._connection._shell.tmpdir = None
def _transfer_data(self, remote_path, data):
"""
@ -331,7 +354,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
self._temp_file_gibberish(module_args, wrap_async)
self._connection._connect()
return ansible_mitogen.planner.invoke(
result = ansible_mitogen.planner.invoke(
ansible_mitogen.planner.Invocation(
action=self,
connection=self._connection,
@ -345,6 +368,14 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
)
)
if ansible.__version__ < '2.5' and delete_remote_tmp and \
getattr(self._connection._shell, 'tmpdir', None) is not None:
# Built-in actions expected tmpdir to be cleaned up automatically
# on _execute_module().
self._remove_tmp_path(self._connection._shell.tmpdir)
return result
def _postprocess_response(self, result):
"""
Apply fixups mimicking ActionBase._execute_module(); this is copied

@ -185,19 +185,21 @@ class MuxProcess(object):
cls.profiling = os.environ.get('MITOGEN_PROFILING') is not None
if cls.profiling:
mitogen.core.enable_profiling()
if _init_logging:
ansible_mitogen.logging.setup()
cls.original_env = dict(os.environ)
cls.child_pid = os.fork()
if _init_logging:
ansible_mitogen.logging.setup()
if cls.child_pid:
save_pid('controller')
ansible_mitogen.logging.set_process_name('top')
ansible_mitogen.affinity.policy.assign_controller()
cls.child_sock.close()
cls.child_sock = None
mitogen.core.io_op(cls.worker_sock.recv, 1)
else:
save_pid('mux')
ansible_mitogen.logging.set_process_name('mux')
ansible_mitogen.affinity.policy.assign_muxprocess()
cls.worker_sock.close()
cls.worker_sock = None

@ -40,6 +40,7 @@ import atexit
import codecs
import imp
import os
import re
import shlex
import shutil
import sys
@ -806,11 +807,20 @@ class NewStyleRunner(ScriptRunner):
def _setup_args(self):
pass
# issue #555: in old times it was considered good form to reload sys and
# change the default encoding. This hack was removed from Ansible long ago,
# but not before permeating into many third party modules.
PREHISTORIC_HACK_RE = re.compile(
b(r'reload\s*\(\s*sys\s*\)\s*'
r'sys\s*\.\s*setdefaultencoding\([^)]+\)')
)
def _setup_program(self):
self.source = ansible_mitogen.target.get_small_file(
source = ansible_mitogen.target.get_small_file(
context=self.service_context,
path=self.path,
)
self.source = self.PREHISTORIC_HACK_RE.sub(b(''), source)
def _get_code(self):
try:

@ -108,6 +108,7 @@ def wrap_worker__run(*args, **kwargs):
if mitogen.core._profile_hook.__name__ != '_profile_hook':
signal.signal(signal.SIGTERM, signal.SIG_IGN)
ansible_mitogen.logging.set_process_name('task')
ansible_mitogen.affinity.policy.assign_worker()
return mitogen.core._profile_hook('WorkerProcess',
lambda: worker__run(*args, **kwargs)

@ -260,14 +260,6 @@ def prune_tree(path):
LOG.error('prune_tree(%r): %s', path, e)
def _on_broker_shutdown():
"""
Respond to broker shutdown (graceful termination by parent, or loss of
connection to parent) by deleting our sole temporary directory.
"""
prune_tree(temp_dir)
def is_good_temp_dir(path):
"""
Return :data:`True` if `path` can be used as a temporary directory, logging

@ -5,13 +5,14 @@ Mitogen for Ansible
.. image:: images/ansible/ansible_mitogen.svg
:class: mitogen-right-180 mitogen-logo-wrap
An extension to `Ansible`_ is included that implements connections over
Mitogen, replacing embedded shell invocations with pure-Python equivalents
invoked via highly efficient remote procedure calls to persistent interpreters
tunnelled over SSH. No changes are required to target hosts.
**Mitogen for Ansible** is a completely redesigned UNIX connection layer and
module runtime for `Ansible`_. Requiring minimal configuration changes, it
updates Ansible's slow and wasteful shell-centic implementation with
pure-Python equivalents, invoked via highly efficient remote procedure calls to
persistent interpreters tunnelled over SSH. No changes are required to target
hosts.
The extension is stable and real-world use is encouraged. `Bug reports`_ are
welcome: Ansible is huge, and only wide testing will ensure soundness.
The extension is considered stable and real-world use is encouraged.
.. _Ansible: https://www.ansible.com/
@ -56,7 +57,7 @@ write files.
Installation
------------
1. Thoroughly review :ref:`noteworthy_differences` and :ref:`known_issues`.
1. Review :ref:`noteworthy_differences`.
2. Download and extract |mitogen_url|.
3. Modify ``ansible.cfg``:
@ -142,13 +143,29 @@ Testimonials
Noteworthy Differences
----------------------
* Ansible 2.3-2.7 are supported along with Python 2.6, 2.7 or 3.6. Verify your
installation is running one of these versions by checking ``ansible
* Ansible 2.3-2.7 are supported along with Python 2.6, 2.7, 3.6 and 3.7. Verify
your installation is running one of these versions by checking ``ansible
--version`` output.
* The Ansible ``raw`` action executes as a regular Mitogen connection,
precluding its use for installing Python on a target. This will be addressed
soon.
* The ``raw`` action executes as a regular Mitogen connection, which requires
Python on the target, precluding its use for installing Python. This will be
addressed in a future release. For now, simply mix Mitogen and vanilla
Ansible strategies:
.. code-block:: yaml
- hosts: web-servers
strategy: linear
tasks:
- name: Install Python if necessary.
raw: test -e /usr/bin/python || apt install -y python-minimal
- hosts: web-servers
strategy: mitogen_linear
roles:
- nginx
- initech_app
- y2k_fix
* The ``doas``, ``su`` and ``sudo`` become methods are available. File bugs to
register interest in more.
@ -165,43 +182,70 @@ Noteworthy Differences
:ref:`mitogen_su <su>`, :ref:`mitogen_sudo <sudo>`, and :ref:`setns <setns>`
types. File bugs to register interest in others.
* Local commands execute in a reuseable interpreter created identically to
interpreters on targets. Presently one interpreter per ``become_user``
exists, and so only one local action may execute simultaneously.
* Actions are single-threaded for each `(host, user account)` combination,
including actions that execute on the local machine. Playbooks may experience
slowdown compared to vanilla Ansible if they employ long-running
``local_action`` or ``delegate_to`` tasks delegating many target hosts to a
single machine and user account.
Ansible usually permits up to ``forks`` simultaneous local actions. Any
long-running local actions that execute for every target will experience
artificial serialization, causing slowdown equivalent to `task_duration *
num_targets`. This will be fixed soon.
num_targets`. This will be addressed soon.
* "Module Replacer" style modules are not supported. These rarely appear in
practice, and light web searches failed to reveal many examples of them.
* The Ansible 2.7 `reboot
<https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_ module
may require a ``pre_reboot_delay`` on systemd hosts, as insufficient time
exists for the reboot command's exit status to be reported before necessary
processes are torn down.
* On OS X when a SSH password is specified and the default connection type of
``smart`` is used, Ansible may select the Paramiko plug-in rather than
Mitogen. If you specify a password on OS X, ensure ``connection: ssh``
appears in your playbook, ``ansible.cfg``, or as ``-c ssh`` on the
command-line.
* Ansible permits up to ``forks`` connections to be setup in parallel, whereas
in Mitogen this is handled by a fixed-size thread pool. Up to 32 connections
may be established in parallel by default, this can be modified by setting
the ``MITOGEN_POOL_SIZE`` environment variable.
* The ``ansible_python_interpreter`` variable is parsed using a restrictive
:mod:`shell-like <shlex>` syntax, permitting values such as ``/usr/bin/env
FOO=bar python``, which occur in practice. Ansible `documents this
<https://docs.ansible.com/ansible/latest/user_guide/intro_inventory.html#ansible-python-interpreter>`_
as an absolute path, however the implementation passes it unquoted through
the shell, permitting arbitrary code to be injected.
* Performance does not scale linearly with target count. This will improve over
* Performance does not scale cleanly with target count. This will improve over
time.
* SSH and ``become`` are treated distinctly when applying timeouts, and
timeouts apply up to the point when the new interpreter is ready to accept
messages. Ansible has two timeouts: ``ConnectTimeout`` for SSH, applying up
to when authentication completes, and a separate parallel timeout up to when
``become`` authentication completes.
For busy targets, Ansible may successfully execute a module where Mitogen
would fail without increasing the timeout. For sick targets, Ansible may hang
indefinitely after authentication without executing a command, for example
due to a stuck filesystem IO appearing in ``$HOME/.profile``.
* Performance on Python 3 is significantly worse than on Python 2. While this
has not yet been investigated, at least some of the regression appears to be
part of the core library, and should therefore be straightforward to fix as
part of 0.2.x.
..
* SSH and ``become`` are treated distinctly when applying timeouts, and
timeouts apply up to the point when the new interpreter is ready to accept
messages. Ansible has two timeouts: ``ConnectTimeout`` for SSH, applying up
to when authentication completes, and a separate parallel timeout up to
when ``become`` authentication completes.
For busy targets, Ansible may successfully execute a module where Mitogen
would fail without increasing the timeout. For sick targets, Ansible may
hang indefinitely after authentication without executing a command, for
example due to a stuck filesystem IO appearing in ``$HOME/.profile``.
..
* "Module Replacer" style modules are not supported. These rarely appear in
practice, and light web searches failed to reveal many examples of them.
..
* The ``ansible_python_interpreter`` variable is parsed using a restrictive
:mod:`shell-like <shlex>` syntax, permitting values such as ``/usr/bin/env
FOO=bar python``, which occur in practice. Ansible `documents this
<https://docs.ansible.com/ansible/latest/user_guide/intro_inventory.html#ansible-python-interpreter>`_
as an absolute path, however the implementation passes it unquoted through
the shell, permitting arbitrary code to be injected.
..
* Configurations will break that rely on the `hashbang argument splitting
behaviour <https://github.com/ansible/ansible/issues/15635>`_ of the
``ansible_python_interpreter`` setting, contrary to the Ansible
documentation. This will be addressed in a future 0.2 release.
New Features & Notes
@ -252,8 +296,8 @@ container.
``ansible_password``, or ``ansible_become_pass`` inventory variables.
* Automatic tunnelling of SSH-dependent actions, such as the
``synchronize`` module, is not yet supported. This will be added in the
0.3 series.
``synchronize`` module, is not yet supported. This will be addressed in a
future release.
To enable connection delegation, set ``mitogen_via=<inventory name>`` on the
command line, or as host and group variables.

@ -579,6 +579,10 @@ Select Class
.. module:: mitogen.select
.. currentmodule:: mitogen.select
.. autoclass:: Event
:members:
.. autoclass:: Select
:members:
@ -605,6 +609,14 @@ Broker Class
:members:
Fork Safety
===========
.. currentmodule:: mitogen.os_fork
.. autoclass:: Corker
:members:
Utility Functions
=================

@ -15,114 +15,87 @@ Release Notes
</style>
.. _known_issues:
v0.2.7 (unreleased)
-------------------
Known Issues
------------
To avail of fixes in an unreleased version, please download a ZIP file
`directly from GitHub <https://github.com/dw/mitogen/>`_.
Mitogen For Ansible
~~~~~~~~~~~~~~~~~~~
Fixes
~~~~~
*(none yet)*
v0.2.6 (2019-03-06)
-------------------
Fixes
~~~~~
* The Ansible 2.7 `reboot
<https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_ module
may require a ``pre_reboot_delay`` on systemd hosts, as insufficient time
exists for the reboot command's exit status to be reported before necessary
processes are torn down.
* On OS X when a SSH password is specified and the default connection type of
``smart`` is used, Ansible may select the Paramiko plug-in rather than
Mitogen. If you specify a password on OS X, ensure ``connection: ssh``
appears in your playbook, ``ansible.cfg``, or as ``-c ssh`` on the
command-line.
* The ``raw`` action executes as a regular Mitogen connection, which requires
Python on the target, precluding its use for installing Python. This will be
addressed in a future 0.2 release. For now, simply mix Mitogen and vanilla
Ansible strategies in your playbook:
.. code-block:: yaml
- hosts: web-servers
strategy: linear
tasks:
- name: Install Python if necessary.
raw: test -e /usr/bin/python || apt install -y python-minimal
- hosts: web-servers
strategy: mitogen_linear
roles:
- nginx
- initech_app
- y2k_fix
.. * When running with ``-vvv``, log messages will be printed to the console
*after* the Ansible run completes, as connection multiplexer shutdown only
begins after Ansible exits. This is due to a lack of suitable shutdown hook
in Ansible, and is fairly harmless, albeit cosmetically annoying. A future
release may include a solution.
.. * Configurations will break that rely on the `hashbang argument splitting
behaviour <https://github.com/ansible/ansible/issues/15635>`_ of the
``ansible_python_interpreter`` setting, contrary to the Ansible
documentation. This will be addressed in a future 0.2 release.
* Performance does not scale linearly with target count. This requires
significant additional work, as major bottlenecks exist in the surrounding
Ansible code. Performance-related bug reports for any scenario remain
welcome with open arms.
* Performance on Python 3 is significantly worse than on Python 2. While this
has not yet been investigated, at least some of the regression appears to be
part of the core library, and should therefore be straightforward to fix as
part of 0.2.x.
* *Module Replacer* style Ansible modules are not supported.
* Actions are single-threaded for each `(host, user account)` combination,
including actions that execute on the local machine. Playbooks may experience
slowdown compared to vanilla Ansible if they employ long-running
``local_action`` or ``delegate_to`` tasks delegating many target hosts to a
single machine and user account.
* Connection Delegation remains in preview and has bugs around how it infers
connections. Connection establishment will remain single-threaded for the 0.2
series, however connection inference bugs will be addressed in a future 0.2
release.
* Connection Delegation does not support automatic tunnelling of SSH-dependent
actions, such as the ``synchronize`` module. This will be addressed in the
0.3 series.
* `#542 <https://github.com/dw/mitogen/issues/542>`_: some versions of OS X
ship a default Python that does not support :func:`select.poll`. Restore the
0.2.3 behaviour of defaulting to Kqueue in this case, but still prefer
:func:`select.poll` if it is available.
* `#545 <https://github.com/dw/mitogen/issues/545>`_: an optimization
introduced in `#493 <https://github.com/dw/mitogen/issues/493>`_ caused a
64-bit integer to be assigned to a 32-bit field on ARM 32-bit targets,
causing runs to fail.
* `#548 <https://github.com/dw/mitogen/issues/548>`_: `mitogen_via=` could fail
when the selected transport was set to ``smart``.
* `#550 <https://github.com/dw/mitogen/issues/550>`_: avoid some broken
TTY-related `ioctl()` calls on Windows Subsystem for Linux 2016 Anniversary
Update.
* `#554 <https://github.com/dw/mitogen/issues/554>`_: third party Ansible
action plug-ins that invoked :func:`_make_tmp_path` repeatedly could trigger
an assertion failure.
* `#555 <https://github.com/dw/mitogen/issues/555>`_: work around an old idiom
that reloaded :mod:`sys` in order to change the interpreter's default encoding.
* `ffae0355 <https://github.com/dw/mitogen/commit/ffae0355>`_: needless
information was removed from the documentation and installation procedure.
Core Library
~~~~~~~~~~~~
* Serialization is still based on :mod:`pickle`. While there is high confidence
remote code execution is impossible in Mitogen's configuration, an untrusted
context may at least trigger disproportionately high memory usage injecting
small messages (*"billion laughs attack"*). Replacement is an important
future priority, but not critical for an initial release.
* `#535 <https://github.com/dw/mitogen/issues/535>`_: to support function calls
on a service pool from another thread, :class:`mitogen.select.Select`
additionally permits waiting on :class:`mitogen.core.Latch`.
* Child processes are not reliably reaped, leading to a pileup of zombie
processes when a program makes many short-lived connections in a single
invocation. This does not impact Mitogen for Ansible, however it limits the
usefulness of the core library. A future 0.2 release will address it.
* `#535 <https://github.com/dw/mitogen/issues/535>`_:
:class:`mitogen.service.Pool.defer` allows any function to be enqueued for
the thread pool from another thread.
* Some races remain around :class:`mitogen.core.Broker <Broker>` destruction,
disconnection and corresponding file descriptor closure. These are only
problematic in situations where child process reaping is also problematic.
* `#535 <https://github.com/dw/mitogen/issues/535>`_: a new
:mod:`mitogen.os_fork` module provides a :func:`os.fork` wrapper that pauses
thread activity during fork. On Python<2.6, :class:`mitogen.core.Broker` and
:class:`mitogen.service.Pool` automatically record their existence so that a
:func:`os.fork` monkey-patch can automatically pause them for any attempt to
start a subprocess.
* The `fakessh` component does not shut down correctly and requires flow
control added to the design. While minimal fixes are possible, due to the
absence of flow control the original design is functionally incomplete.
* `ca63c26e <https://github.com/dw/mitogen/commit/ca63c26e>`_:
:meth:`mitogen.core.Latch.put`'s `obj` argument was made optional.
* The multi-threaded :ref:`service` remains in a state of design flux and
should be considered obsolete, despite heavy use in Mitogen for Ansible. A
future replacement may be integrated more tightly with, or entirely replace
the RPC dispatcher on the main thread.
* Documentation is in a state of disrepair. This will be improved over the 0.2
series.
Thanks!
~~~~~~~
Mitogen would not be possible without the support of users. A huge thanks for
bug reports, testing, features and fixes in this release contributed by
`Fabian Arrotin <https://github.com/arrfab>`_,
`Giles Westwood <https://github.com/gilesw>`_,
`Matt Layman <https://github.com/mblayman>`_,
`Percy Grunwald <https://github.com/percygrunwald>`_,
`Petr Enkov <https://github.com/enkov>`_,
`Tony Finch <https://github.com/fanf2>`_,
`@elbunda <https://github.com/elbunda>`_, and
`@zyphermonkey <https://github.com/zyphermonkey>`_.
v0.2.5 (2019-02-14)

@ -6,7 +6,7 @@ import mitogen
VERSION = '%s.%s.%s' % mitogen.__version__
author = u'David Wilson'
copyright = u'2018, David Wilson'
copyright = u'2019, David Wilson'
exclude_patterns = ['_build']
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinxcontrib.programoutput']
html_show_sourcelink = False

@ -35,7 +35,7 @@ be expected. On the slave, it is built dynamically during startup.
#: Library version as a tuple.
__version__ = (0, 2, 5)
__version__ = (0, 2, 6)
#: This is :data:`False` in slave contexts. Previously it was used to prevent

@ -103,6 +103,9 @@ IOLOG = logging.getLogger('mitogen.io')
IOLOG.setLevel(logging.INFO)
LATIN1_CODEC = encodings.latin_1.Codec()
# str.encode() may take import lock. Deadlock possible if broker calls
# .encode() on behalf of thread currently waiting for module.
UTF8_CODEC = encodings.latin_1.Codec()
_v = False
_vv = False
@ -137,7 +140,6 @@ try:
except NameError:
BaseException = Exception
IS_WSL = 'Microsoft' in os.uname()[2]
PY24 = sys.version_info < (2, 5)
PY3 = sys.version_info > (3,)
if PY3:
@ -161,6 +163,14 @@ try:
except NameError:
next = lambda it: it.next()
# #550: prehistoric WSL did not advertise itself in uname output.
try:
fp = open('/proc/sys/kernel/osrelease')
IS_WSL = 'Microsoft' in fp.read()
fp.close()
except IOError:
IS_WSL = False
#: Default size for calls to :meth:`Side.read` or :meth:`Side.write`, and the
#: size of buffers configured by :func:`mitogen.parent.create_socketpair`. This
@ -271,9 +281,8 @@ class Kwargs(dict):
def __init__(self, dct):
for k, v in dct.iteritems():
if type(k) is unicode:
self[k.encode()] = v
else:
self[k] = v
k, _ = UTF8_CODEC.encode(k)
self[k] = v
def __repr__(self):
return 'Kwargs(%s)' % (dict.__repr__(self),)
@ -735,7 +744,7 @@ class Message(object):
"""
Syntax helper to construct a dead message.
"""
kwargs['data'] = (reason or u'').encode()
kwargs['data'], _ = UTF8_CODEC.encode(reason or u'')
return cls(reply_to=IS_DEAD, **kwargs)
@classmethod
@ -1092,6 +1101,7 @@ class Importer(object):
'lxd',
'master',
'minify',
'os_fork',
'parent',
'select',
'service',
@ -1332,7 +1342,7 @@ class Importer(object):
if mod.__package__ and not PY3:
# 2.x requires __package__ to be exactly a string.
mod.__package__ = mod.__package__.encode()
mod.__package__, _ = UTF8_CODEC.encode(mod.__package__)
source = self.get_source(fullname)
try:
@ -1912,6 +1922,8 @@ class Poller(object):
Pollers may only be used by one thread at a time.
"""
SUPPORTED = True
# This changed from select() to poll() in Mitogen 0.2.4. Since poll() has
# no upper FD limit, it is suitable for use with Latch, which must handle
# FDs larger than select's limit during many-host runs. We want this
@ -1928,11 +1940,16 @@ class Poller(object):
def __init__(self):
self._rfds = {}
self._wfds = {}
self._pollobj = select.poll()
def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self))
def _update(self, fd):
"""
Required by PollPoller subclass.
"""
pass
@property
def readers(self):
"""
@ -1955,20 +1972,6 @@ class Poller(object):
"""
pass
_readmask = select.POLLIN | select.POLLHUP
# TODO: no proof we dont need writemask too
def _update(self, fd):
mask = (((fd in self._rfds) and self._readmask) |
((fd in self._wfds) and select.POLLOUT))
if mask:
self._pollobj.register(fd, mask)
else:
try:
self._pollobj.unregister(fd)
except KeyError:
pass
def start_receive(self, fd, data=None):
"""
Cause :meth:`poll` to yield `data` when `fd` is readable.
@ -2004,22 +2007,27 @@ class Poller(object):
self._update(fd)
def _poll(self, timeout):
(rfds, wfds, _), _ = io_op(select.select,
self._rfds,
self._wfds,
(), timeout
)
for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if timeout:
timeout *= 1000
events, _ = io_op(self._pollobj.poll, timeout)
for fd, event in events:
if event & self._readmask:
_vv and IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if event & select.POLLOUT:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
def poll(self, timeout=None):
"""
Block the calling thread until one or more FDs are ready for IO.
@ -2053,6 +2061,8 @@ class Latch(object):
"""
poller_class = Poller
notify = None
# The _cls_ prefixes here are to make it crystal clear in the code which
# state mutation isn't covered by :attr:`_lock`.
@ -2142,7 +2152,7 @@ class Latch(object):
return rsock, wsock
COOKIE_MAGIC, = struct.unpack('L', b('LTCH') * (struct.calcsize('L')//4))
COOKIE_FMT = 'Llll'
COOKIE_FMT = '>Qqqq' # #545: id() and get_ident() may exceed long on armhfp.
COOKIE_SIZE = struct.calcsize(COOKIE_FMT)
def _make_cookie(self):
@ -2242,11 +2252,14 @@ class Latch(object):
finally:
self._lock.release()
def put(self, obj):
def put(self, obj=None):
"""
Enqueue an object, waking the first thread waiting for a result, if one
exists.
:param obj:
Object to enqueue. Defaults to :data:`None` as a convenience when
using :class:`Latch` only for synchronization.
:raises mitogen.core.LatchError:
:meth:`close` has been called, and the object is no longer valid.
"""
@ -2263,6 +2276,8 @@ class Latch(object):
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, wsock.fileno())
self._wake(wsock, cookie)
elif self.notify:
self.notify(self)
finally:
self._lock.release()
@ -2832,7 +2847,7 @@ class Broker(object):
#: before force-disconnecting them during :meth:`shutdown`.
shutdown_timeout = 3.0
def __init__(self, poller_class=None):
def __init__(self, poller_class=None, activate_compat=True):
self._alive = True
self._exitted = False
self._waker = Waker(self)
@ -2850,6 +2865,19 @@ class Broker(object):
name='mitogen.broker'
)
self._thread.start()
if activate_compat:
self._py24_25_compat()
def _py24_25_compat(self):
"""
Python 2.4/2.5 have grave difficulties with threads/fork. We
mandatorily quiesce all running threads during fork using a
monkey-patch there.
"""
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner.
os_fork = import_module('mitogen.os_fork')
mitogen.os_fork._notice_broker_or_pool(self)
def start_receive(self, stream):
"""
@ -2995,6 +3023,7 @@ class Broker(object):
except Exception:
LOG.exception('_broker_main() crashed')
self._alive = False # Ensure _alive is consistent on crash.
self._exitted = True
self._broker_exit()
@ -3198,7 +3227,7 @@ class ExternalContext(object):
Router.max_message_size = self.config['max_message_size']
if self.config['profiling']:
enable_profiling()
self.broker = Broker()
self.broker = Broker(activate_compat=False)
self.router = Router(self.broker)
self.router.debug = self.config.get('debug', False)
self.router.undirectional = self.config['unidirectional']
@ -3367,6 +3396,7 @@ class ExternalContext(object):
socket.gethostname())
_v and LOG.debug('Recovered sys.executable: %r', sys.executable)
self.broker._py24_25_compat()
self.dispatcher.run()
_v and LOG.debug('ExternalContext.main() normal exit')
except KeyboardInterrupt:

@ -0,0 +1,183 @@
# Copyright 2019, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
Support for operating in a mixed threading/forking environment.
"""
import os
import socket
import sys
import weakref
import mitogen.core
# List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this
# list and mitogen.service registers its Pool too.
_brokers = weakref.WeakKeyDictionary()
_pools = weakref.WeakKeyDictionary()
def _notice_broker_or_pool(obj):
"""
Used by :mod:`mitogen.core` and :mod:`mitogen.service` to automatically
register every broker and pool on Python 2.4/2.5.
"""
if isinstance(obj, mitogen.core.Broker):
_brokers[obj] = True
else:
_pools[obj] = True
def wrap_os__fork():
corker = Corker(
brokers=list(_brokers),
pools=list(_pools),
)
try:
corker.cork()
return os__fork()
finally:
corker.uncork()
# If Python 2.4/2.5 where threading state is not fixed up, subprocess.Popen()
# may still deadlock due to the broker thread. In this case, pause os.fork() so
# that all active threads are paused during fork.
if sys.version_info < (2, 6):
os__fork = os.fork
os.fork = wrap_os__fork
class Corker(object):
"""
Arrange for :class:`mitogen.core.Broker` and optionally
:class:`mitogen.service.Pool` to be temporarily "corked" while fork
operations may occur.
In a mixed threading/forking environment, it is critical no threads are
active at the moment of fork, as they could hold mutexes whose state is
unrecoverably snapshotted in the locked state in the fork child, causing
deadlocks at random future moments.
To ensure a target thread has all locks dropped, it is made to write a
large string to a socket with a small buffer that has :data:`os.O_NONBLOCK`
disabled. CPython will drop the GIL and enter the ``write()`` system call,
where it will block until the socket buffer is drained, or the write side
is closed.
:class:`mitogen.core.Poller` is used to ensure the thread really has
blocked outside any Python locks, by checking if the socket buffer has
started to fill.
Since this necessarily involves posting a message to every existent thread
and verifying acknowledgement, it will never be a fast operation.
This does not yet handle the case of corking being initiated from within a
thread that is also a cork target.
:param brokers:
Sequence of :class:`mitogen.core.Broker` instances to cork.
:param pools:
Sequence of :class:`mitogen.core.Pool` instances to cork.
"""
def __init__(self, brokers=(), pools=()):
self.brokers = brokers
self.pools = pools
def _do_cork(self, s, wsock):
try:
try:
while True:
# at least EINTR is possible. Do our best to keep handling
# outside the GIL in this case using sendall().
wsock.sendall(s)
except socket.error:
pass
finally:
wsock.close()
def _cork_one(self, s, obj):
"""
Construct a socketpair, saving one side of it, and passing the other to
`obj` to be written to by one of its threads.
"""
rsock, wsock = mitogen.parent.create_socketpair(size=4096)
mitogen.core.set_cloexec(rsock.fileno())
mitogen.core.set_cloexec(wsock.fileno())
mitogen.core.set_block(wsock) # gevent
self._rsocks.append(rsock)
obj.defer(self._do_cork, s, wsock)
def _verify_one(self, rsock):
"""
Pause until the socket `rsock` indicates readability, due to
:meth:`_do_cork` triggering a blocking write on another thread.
"""
poller = mitogen.core.Poller()
poller.start_receive(rsock.fileno())
try:
while True:
for fd in poller.poll():
return
finally:
poller.close()
def cork(self):
"""
Arrange for any associated brokers and pools to be paused with no locks
held. This will not return until each thread acknowledges it has ceased
execution.
"""
s = mitogen.core.b('CORK') * ((128 // 4) * 1024)
self._rsocks = []
# Pools must be paused first, as existing work may require the
# participation of a broker in order to complete.
for pool in self.pools:
if not pool.closed:
for x in range(pool.size):
self._cork_one(s, pool)
for broker in self.brokers:
if broker._alive:
self._cork_one(s, broker)
# Pause until we can detect every thread has entered write().
for rsock in self._rsocks:
self._verify_one(rsock)
def uncork(self):
"""
Arrange for paused threads to resume operation.
"""
for rsock in self._rsocks:
rsock.close()

@ -253,7 +253,7 @@ def close_nonstandard_fds():
pass
def create_socketpair():
def create_socketpair(size=None):
"""
Create a :func:`socket.socketpair` to use for use as a child process's UNIX
stdio channels. As socket pairs are bidirectional, they are economical on
@ -265,10 +265,10 @@ def create_socketpair():
parentfp, childfp = socket.socketpair()
parentfp.setsockopt(socket.SOL_SOCKET,
socket.SO_SNDBUF,
mitogen.core.CHUNK_SIZE)
size or mitogen.core.CHUNK_SIZE)
childfp.setsockopt(socket.SOL_SOCKET,
socket.SO_RCVBUF,
mitogen.core.CHUNK_SIZE)
size or mitogen.core.CHUNK_SIZE)
return parentfp, childfp
@ -371,11 +371,12 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None):
def _acquire_controlling_tty():
os.setsid()
if sys.platform == 'linux2':
if sys.platform in ('linux', 'linux2'):
# On Linux, the controlling tty becomes the first tty opened by a
# process lacking any prior tty.
os.close(os.open(os.ttyname(2), os.O_RDWR))
if hasattr(termios, 'TIOCSCTTY'):
if hasattr(termios, 'TIOCSCTTY') and not mitogen.core.IS_WSL:
# #550: prehistoric WSL does not like TIOCSCTTY.
# On BSD an explicit ioctl is required. For some inexplicable reason,
# Python 2.6 on Travis also requires it.
fcntl.ioctl(2, termios.TIOCSCTTY)
@ -890,10 +891,58 @@ class CallSpec(object):
)
class PollPoller(mitogen.core.Poller):
"""
Poller based on the POSIX poll(2) interface. Not available on some versions
of OS X, otherwise it is the preferred poller for small FD counts.
"""
SUPPORTED = hasattr(select, 'poll')
_repr = 'PollPoller()'
def __init__(self):
super(PollPoller, self).__init__()
self._pollobj = select.poll()
# TODO: no proof we dont need writemask too
_readmask = (
getattr(select, 'POLLIN', 0) |
getattr(select, 'POLLHUP', 0)
)
def _update(self, fd):
mask = (((fd in self._rfds) and self._readmask) |
((fd in self._wfds) and select.POLLOUT))
if mask:
self._pollobj.register(fd, mask)
else:
try:
self._pollobj.unregister(fd)
except KeyError:
pass
def _poll(self, timeout):
if timeout:
timeout *= 1000
events, _ = mitogen.core.io_op(self._pollobj.poll, timeout)
for fd, event in events:
if event & self._readmask:
IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if event & select.POLLOUT:
IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
class KqueuePoller(mitogen.core.Poller):
"""
Poller based on the FreeBSD/Darwin kqueue(2) interface.
"""
SUPPORTED = hasattr(select, 'kqueue')
_repr = 'KqueuePoller()'
def __init__(self):
@ -971,6 +1020,7 @@ class EpollPoller(mitogen.core.Poller):
"""
Poller based on the Linux epoll(2) interface.
"""
SUPPORTED = hasattr(select, 'epoll')
_repr = 'EpollPoller()'
def __init__(self):
@ -1041,20 +1091,18 @@ class EpollPoller(mitogen.core.Poller):
yield data
if sys.version_info < (2, 6):
# 2.4 and 2.5 only had select.select() and select.poll().
POLLER_BY_SYSNAME = {}
else:
POLLER_BY_SYSNAME = {
'Darwin': KqueuePoller,
'FreeBSD': KqueuePoller,
'Linux': EpollPoller,
}
# 2.4 and 2.5 only had select.select() and select.poll().
for _klass in mitogen.core.Poller, PollPoller, KqueuePoller, EpollPoller:
if _klass.SUPPORTED:
PREFERRED_POLLER = _klass
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
os.uname()[0],
mitogen.core.Poller,
)
# For apps that start threads dynamically, it's possible Latch will also get
# very high-numbered wait fds when there are many connections, and so select()
# becomes useless there too. So swap in our favourite poller.
if PollPoller.SUPPORTED:
mitogen.core.Latch.poller_class = PollPoller
else:
mitogen.core.Latch.poller_class = PREFERRED_POLLER
class DiagLogStream(mitogen.core.BasicStream):

@ -35,12 +35,25 @@ class Error(mitogen.core.Error):
pass
class Event(object):
"""
Represents one selected event.
"""
#: The first Receiver or Latch the event traversed.
source = None
#: The :class:`mitogen.core.Message` delivered to a receiver, or the object
#: posted to a latch.
data = None
class Select(object):
"""
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances
and returns the first value posted to any receiver or select.
:class:`receivers <mitogen.core.Receiver>`,
:class:`channels <mitogen.core.Channel>`,
:class:`latches <mitogen.core.Latch>`, and
:class:`sub-selects <Select>`.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :meth:`__iter__` terminates once the final receiver is
@ -84,6 +97,19 @@ class Select(object):
for msg in mitogen.select.Select(selects):
print(msg.unpickle())
:class:`Select` may be used to mix inter-thread and inter-process IO:
latch = mitogen.core.Latch()
start_thread(latch)
recv = remote_host.call_async(os.getuid)
sel = Select([latch, recv])
event = sel.get_event()
if event.source is latch:
# woken by a local thread
else:
# woken by function call result
"""
notify = None
@ -145,14 +171,29 @@ class Select(object):
def __exit__(self, e_type, e_val, e_tb):
self.close()
def __iter__(self):
def iter_data(self):
"""
Yield the result of :meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
Yield :attr:`Event.data` until no receivers remain in the select,
either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :meth:`remove`.
:meth:`__iter__` is an alias for :meth:`iter_data`, allowing loops
like::
for msg in Select([recv1, recv2]):
print msg.unpickle()
"""
while self._receivers:
yield self.get()
yield self.get_event().data
__iter__ = iter_data
def iter_events(self):
"""
Yield :class:`Event` instances until no receivers remain in the select.
"""
while self._receivers:
yield self.get_event()
loop_msg = 'Adding this Select instance would create a Select cycle'
@ -170,8 +211,8 @@ class Select(object):
def add(self, recv):
"""
Add the :class:`mitogen.core.Receiver` or :class:`Select` `recv` to the
select.
Add a :class:`mitogen.core.Receiver`, :class:`Select` or
:class:`mitogen.core.Latch` to the select.
:raises mitogen.select.Error:
An attempt was made to add a :class:`Select` to which this select
@ -193,10 +234,9 @@ class Select(object):
def remove(self, recv):
"""
Remove the :class:`mitogen.core.Receiver` or :class:`Select` `recv`
from the select. Note that if the receiver has notified prior to
:meth:`remove`, it will still be returned by a subsequent :meth:`get`.
This may change in a future version.
Remove an object from from the select. Note that if the receiver has
notified prior to :meth:`remove`, it will still be returned by a
subsequent :meth:`get`. This may change in a future version.
"""
try:
if recv.notify != self._put:
@ -240,7 +280,14 @@ class Select(object):
def get(self, timeout=None, block=True):
"""
Fetch the next available value from any receiver, or raise
Call `get_event(timeout, block)` returning :attr:`Event.data` of the
first available event.
"""
return self.get_event(timeout, block).data
def get_event(self, timeout=None, block=True):
"""
Fetch the next available :class:`Event` from any source, or raise
:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
@ -253,7 +300,7 @@ class Select(object):
If :data:`False`, immediately raise
:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:class:`mitogen.core.Message`
:class:`Event`.
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
@ -263,14 +310,21 @@ class Select(object):
if not self._receivers:
raise Error(self.empty_msg)
event = Event()
while True:
recv = self._latch.get(timeout=timeout, block=block)
try:
msg = recv.get(block=False)
if isinstance(recv, Select):
event = recv.get_event(block=False)
else:
event.source = recv
event.data = recv.get(block=False)
if self._oneshot:
self.remove(recv)
msg.receiver = recv
return msg
if isinstance(recv, mitogen.core.Receiver):
# Remove in 0.3.x.
event.data.receiver = recv
return event
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another

@ -411,10 +411,12 @@ class Service(object):
def __repr__(self):
return '%s()' % (self.__class__.__name__,)
def on_message(self, recv, msg):
def on_message(self, event):
"""
Called when a message arrives on any of :attr:`select`'s registered
receivers.
:param mitogen.select.Event event:
"""
pass
@ -449,9 +451,10 @@ class Pool(object):
"""
activator_class = Activator
def __init__(self, router, services, size=1, overwrite=False):
def __init__(self, router, services=(), size=1, overwrite=False):
self.router = router
self._activator = self.activator_class()
self._ipc_latch = mitogen.core.Latch()
self._receiver = mitogen.core.Receiver(
router=router,
handle=mitogen.core.CALL_SERVICE,
@ -460,13 +463,18 @@ class Pool(object):
self._select = mitogen.select.Select(oneshot=False)
self._select.add(self._receiver)
self._select.add(self._ipc_latch)
#: Serialize service construction.
self._lock = threading.Lock()
self._func_by_recv = {self._receiver: self._on_service_call}
self._func_by_source = {
self._receiver: self._on_service_call,
self._ipc_latch: self._on_ipc_latch,
}
self._invoker_by_name = {}
for service in services:
self.add(service)
self._py_24_25_compat()
self._threads = []
for x in range(size):
name = 'mitogen.service.Pool.%x.worker-%d' % (id(self), x,)
@ -480,6 +488,13 @@ class Pool(object):
LOG.debug('%r: initialized', self)
def _py_24_25_compat(self):
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner sending mitogen.fork
# to all mitogen.service importers.
os_fork = mitogen.core.import_module('mitogen.os_fork')
os_fork._notice_broker_or_pool(self)
@property
def size(self):
return len(self._threads)
@ -488,10 +503,10 @@ class Pool(object):
name = service.name()
if name in self._invoker_by_name:
raise Error('service named %r already registered' % (name,))
assert service.select not in self._func_by_recv
assert service.select not in self._func_by_source
invoker = service.invoker_class(service=service)
self._invoker_by_name[name] = invoker
self._func_by_recv[service.select] = service.on_message
self._func_by_source[service.select] = service.on_message
closed = False
@ -534,7 +549,18 @@ class Pool(object):
isinstance(tup[2], dict)):
raise mitogen.core.CallError('Invalid message format.')
def _on_service_call(self, recv, msg):
def defer(self, func, *args, **kwargs):
"""
Arrange for `func(*args, **kwargs)` to be invoked in the context of a
service pool thread.
"""
self._ipc_latch.put(lambda: func(*args, **kwargs))
def _on_ipc_latch(self, event):
event.data()
def _on_service_call(self, event):
msg = event.data
service_name = None
method_name = None
try:
@ -555,17 +581,17 @@ class Pool(object):
def _worker_run(self):
while not self.closed:
try:
msg = self._select.get()
event = self._select.get_event()
except (mitogen.core.ChannelError, mitogen.core.LatchError):
e = sys.exc_info()[1]
LOG.debug('%r: channel or latch closed, exitting: %s', self, e)
return
func = self._func_by_recv[msg.receiver]
func = self._func_by_source[event.source]
try:
func(msg.receiver, msg)
func(event)
except Exception:
LOG.exception('While handling %r using %r', msg, func)
LOG.exception('While handling %r using %r', event.data, func)
def _worker_main(self):
try:

@ -5,6 +5,7 @@
# tansport()
tc-transport-unset
tc-transport-local ansible_connection=local
tc-transport-smart ansible_connection=smart
# python_path()
tc-python-path-unset

@ -2,6 +2,7 @@
- include: fixup_perms2__copy.yml
- include: low_level_execute_command.yml
- include: make_tmp_path.yml
- include: make_tmp_path__double.yml
- include: remote_expand_user.yml
- include: remote_file_exists.yml
- include: remove_tmp_path.yml

@ -19,22 +19,26 @@
#
- name: "Find regular temp path"
action_passthrough:
method: _make_tmp_path
mitogen_action_script:
script: |
path = self._make_tmp_path()
self._remove_tmp_path(path)
register: tmp_path
- name: "Find regular temp path (new task)"
action_passthrough:
method: _make_tmp_path
mitogen_action_script:
script: |
path = self._make_tmp_path()
self._remove_tmp_path(path)
register: tmp_path2
- name: "Find good temp path"
set_fact:
good_temp_path: "{{tmp_path.result|dirname}}"
good_temp_path: "{{tmp_path.path|dirname}}"
- name: "Find good temp path (new task)"
set_fact:
good_temp_path2: "{{tmp_path2.result|dirname}}"
good_temp_path2: "{{tmp_path2.path|dirname}}"
- name: "Verify common base path for both tasks"
assert:
@ -44,7 +48,7 @@
- name: "Verify different subdir for both tasks"
assert:
that:
- tmp_path.result != tmp_path2.result
- tmp_path.path != tmp_path2.path
#
# Verify subdirectory removal.
@ -52,12 +56,12 @@
- name: Stat temp path
stat:
path: "{{tmp_path.result}}"
path: "{{tmp_path.path}}"
register: stat1
- name: Stat temp path (new task)
stat:
path: "{{tmp_path2.result}}"
path: "{{tmp_path2.path}}"
register: stat2
- name: "Verify neither subdir exists any more"
@ -108,14 +112,17 @@
- name: "Find root temp path"
become: true
action_passthrough:
method: _make_tmp_path
mitogen_action_script:
script: |
path = self._make_tmp_path()
self._remove_tmp_path(path)
register: tmp_path_root
- name: "Verify root temp path differs from regular path"
assert:
that:
- tmp_path2.result != tmp_path_root.result
- tmp_path2.path != tmp_path_root.path
- tmp_path2.path|dirname != tmp_path_root.path|dirname
#
# readonly homedir

@ -0,0 +1,20 @@
# issue #554: double calls to make_tmp_path() fail with assertion error. Ensure
# they succeed and are cleaned up correctly.
- hosts: target
tasks:
- mitogen_action_script:
script: |
result['t1'] = self._make_tmp_path()
result['t2'] = self._make_tmp_path()
assert result['t1'] != result['t2']
assert self._remote_file_exists(result['t1'])
assert self._remote_file_exists(result['t2'])
self._remove_tmp_path(result['t1'])
self._remove_tmp_path(result['t2'])
register: out
- mitogen_action_script:
script: |
assert not self._remote_file_exists("{{ out.t1 }}")
assert not self._remote_file_exists("{{ out.t2 }}")

@ -6,9 +6,6 @@
hosts: test-targets
any_errors_fatal: true
tasks:
- meta: end_play
when: not is_mitogen
#
# Use the copy module to cause a temporary directory to be created, and
# return a result with a 'src' attribute pointing into that directory.

@ -12,6 +12,7 @@
- include: custom_python_json_args_module.yml
- include: custom_python_new_style_missing_interpreter.yml
- include: custom_python_new_style_module.yml
- include: custom_python_prehistoric_module.yml
- include: custom_python_want_json_module.yml
- include: custom_script_interpreter.yml
- include: environment_isolation.yml

@ -0,0 +1,10 @@
# issue #555
- name: integration/runner/custom_python_prehistoric_module.yml
hosts: test-targets
any_errors_fatal: true
tasks:
- custom_python_prehistoric_module:
register: out
- assert: that=out.ok

@ -8,3 +8,4 @@
- include: remote_addr.yml
- include: remote_user.yml
- include: transport.yml
- include: transport__smart.yml

@ -0,0 +1,49 @@
# issue #548
# Each case is followed by mitogen_via= case to test hostvars method.
# When no ansible_connection= is set, transport comes via ansible.cfg ("smart"
# is parsed away to either paramiko or ssh).
- name: integration/transport_config/transport.yml
hosts: tc-transport-smart
tasks:
- include: ../_mitogen_only.yml
- {mitogen_get_stack: {}, register: out}
- assert_equal:
left: out.result[0].method
right: "ssh"
- hosts: tc-transport-local
vars: {mitogen_via: tc-transport-smart}
tasks:
- include: ../_mitogen_only.yml
- {mitogen_get_stack: {}, register: out}
- assert_equal:
left: out.result[0].method
right: "ssh"
- assert_equal:
left: out.result[1].method
right: "local"
# ansible_connection=local
- hosts: tc-transport-local
tasks:
- include: ../_mitogen_only.yml
- {mitogen_get_stack: {}, register: out}
- assert_equal:
left: out.result[0].method
right: "local"
- hosts: tc-transport-smart
vars: {mitogen_via: tc-transport-local}
tasks:
- include: ../_mitogen_only.yml
- {mitogen_get_stack: {}, register: out}
- assert_equal:
left: out.result[0].method
right: "local"
- assert_equal:
left: out.result[1].method
right: "ssh"

@ -5,6 +5,21 @@ import sys
from ansible.plugins.action import ActionBase
try:
long
except NameError:
long = int
try:
unicode
except NameError:
unicode = str
try:
bytes
except NameError:
bytes = str
def execute(s, gbls, lcls):
if sys.version_info > (3,):
@ -16,12 +31,25 @@ def execute(s, gbls, lcls):
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
super(ActionModule, self).run(tmp=tmp, task_vars=task_vars)
lcls = {
'self': self,
'result': {}
}
lcls = {}
# Capture keys to remove later, including any crap Python adds.
execute('pass', globals(), lcls)
lcls['self'] = self
# Old mitogen_action_script used explicit result dict.
lcls['result'] = lcls
pre_keys = list(lcls)
execute(self._task.args['script'], globals(), lcls)
return lcls['result']
for key in pre_keys:
del lcls[key]
for key in list(lcls):
if not isinstance(lcls[key],
(unicode, bytes, int, long, dict, list, tuple,
bool)):
del lcls[key]
return lcls
if __name__ == '__main__':

@ -0,0 +1,23 @@
#!/usr/bin/python
# issue #555: I'm a module that cutpastes an old hack.
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.basic import get_module_path
from ansible.module_utils import six
import os
import pwd
import socket
import sys
import sys
reload(sys)
sys.setdefaultencoding('utf8')
def main():
module = AnsibleModule(argument_spec={})
module.exit_json(ok=True)
if __name__ == '__main__':
main()

@ -17,6 +17,10 @@ class NullFixedPolicy(ansible_mitogen.affinity.FixedPolicy):
self.mask = mask
@unittest2.skipIf(
reason='Linux only',
condition=(not os.uname()[0] == 'Linux')
)
class FixedPolicyTest(testlib.TestCase):
klass = NullFixedPolicy

@ -9,6 +9,7 @@ import time
import mitogen
import mitogen.service
import ansible_mitogen.affinity
def prepare():
@ -45,6 +46,8 @@ def run_test(router, fp, s, context):
@mitogen.main()
def main(router):
ansible_mitogen.affinity.policy.assign_muxprocess()
bigfile = tempfile.NamedTemporaryFile()
fill_with_random(bigfile, 1048576*512)

@ -1,4 +1,6 @@
import sys
import unittest2
import mitogen.service
@ -32,10 +34,15 @@ class FetchTest(testlib.RouterMixin, testlib.TestCase):
expect = service.unregistered_msg % ('/etc/shadow',)
self.assertTrue(expect in e.args[0])
if sys.platform == 'darwin':
ROOT_GROUP = 'wheel'
else:
ROOT_GROUP = 'root'
def _validate_response(self, resp):
self.assertTrue(isinstance(resp, dict))
self.assertEquals('root', resp['owner'])
self.assertEquals('root', resp['group'])
self.assertEquals(self.ROOT_GROUP, resp['group'])
self.assertTrue(isinstance(resp['mode'], int))
self.assertTrue(isinstance(resp['mtime'], float))
self.assertTrue(isinstance(resp['atime'], float))

@ -0,0 +1,52 @@
import testlib
import unittest2
import mitogen.os_fork
import mitogen.service
class CorkTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.os_fork.Corker
def ping(self, latch):
latch.put('pong')
def test_cork_broker(self):
latch = mitogen.core.Latch()
self.broker.defer(self.ping, latch)
self.assertEquals('pong', latch.get())
corker = self.klass(brokers=(self.broker,))
corker.cork()
latch = mitogen.core.Latch()
self.broker.defer(self.ping, latch)
self.assertRaises(mitogen.core.TimeoutError,
lambda: latch.get(timeout=0.5))
corker.uncork()
self.assertEquals('pong', latch.get())
def test_cork_pool(self):
pool = mitogen.service.Pool(self.router, services=(), size=4)
try:
latch = mitogen.core.Latch()
pool.defer(self.ping, latch)
self.assertEquals('pong', latch.get())
corker = self.klass(pools=(pool,))
corker.cork()
latch = mitogen.core.Latch()
pool.defer(self.ping, latch)
self.assertRaises(mitogen.core.TimeoutError,
lambda: latch.get(timeout=0.5))
corker.uncork()
self.assertEquals('pong', latch.get())
finally:
pool.stop(join=True)
if __name__ == '__main__':
unittest2.main()

@ -401,16 +401,25 @@ class SelectTest(AllMixin, testlib.TestCase):
klass = mitogen.core.Poller
SelectTest = unittest2.skipIf(
condition=not hasattr(select, 'select'),
condition=(not SelectTest.klass.SUPPORTED),
reason='select.select() not supported'
)(SelectTest)
class PollTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.PollPoller
PollTest = unittest2.skipIf(
condition=(not PollTest.klass.SUPPORTED),
reason='select.poll() not supported'
)(PollTest)
class KqueueTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.KqueuePoller
KqueueTest = unittest2.skipIf(
condition=not hasattr(select, 'kqueue'),
condition=(not KqueueTest.klass.SUPPORTED),
reason='select.kqueue() not supported'
)(KqueueTest)
@ -419,7 +428,7 @@ class EpollTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.EpollPoller
EpollTest = unittest2.skipIf(
condition=not hasattr(select, 'epoll'),
condition=(not EpollTest.klass.SUPPORTED),
reason='select.epoll() not supported'
)(EpollTest)

@ -74,8 +74,9 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
context = self.router.local()
self.assertEquals(256, context.call(plain_old_module.pow, 2, 8))
self.assertEquals(1, self.router.responder.get_module_count)
self.assertEquals(1, self.router.responder.good_load_module_count)
os_fork = int(sys.version_info < (2, 6)) # mitogen.os_fork
self.assertEquals(1+os_fork, self.router.responder.get_module_count)
self.assertEquals(1+os_fork, self.router.responder.good_load_module_count)
self.assertLess(300, self.router.responder.good_load_module_size)
def test_simple_pkg(self):
@ -84,8 +85,9 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
context = self.router.local()
self.assertEquals(3,
context.call(simple_pkg.a.subtract_one_add_two, 2))
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(3, self.router.responder.good_load_module_count)
os_fork = int(sys.version_info < (2, 6)) # mitogen.os_fork
self.assertEquals(2+os_fork, self.router.responder.get_module_count)
self.assertEquals(3+os_fork, self.router.responder.good_load_module_count)
self.assertEquals(0, self.router.responder.bad_load_module_count)
self.assertLess(450, self.router.responder.good_load_module_size)
@ -185,9 +187,10 @@ class ForwardTest(testlib.RouterMixin, testlib.TestCase):
c1 = self.router.local()
c2 = self.router.local(via=c1)
os_fork = int(sys.version_info < (2, 6))
self.assertEquals(256, c2.call(plain_old_module.pow, 2, 8))
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(2, self.router.responder.good_load_module_count)
self.assertEquals(2+os_fork, self.router.responder.get_module_count)
self.assertEquals(2+os_fork, self.router.responder.good_load_module_count)
self.assertLess(10000, self.router.responder.good_load_module_size)
self.assertGreater(40000, self.router.responder.good_load_module_size)

@ -174,6 +174,8 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase):
expect = '_broker_main() crashed'
self.assertTrue(expect in log.stop())
self.broker.join()
class AddHandlerTest(testlib.TestCase):
klass = mitogen.master.Router

@ -9,6 +9,18 @@ import testlib
class BoolTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_latch(self):
latch = mitogen.core.Latch() # oneshot
select = self.klass()
self.assertFalse(select)
select.add(latch)
self.assertTrue(select)
latch.put(123)
self.assertTrue(select)
self.assertEquals(123, select.get())
self.assertFalse(select)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router) # oneshot
select = self.klass()
@ -25,6 +37,14 @@ class BoolTest(testlib.RouterMixin, testlib.TestCase):
class AddTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_latch(self):
latch = mitogen.core.Latch()
select = self.klass()
select.add(latch)
self.assertEquals(1, len(select._receivers))
self.assertEquals(latch, select._receivers[0])
self.assertEquals(select._put, latch.notify)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass()
@ -98,14 +118,14 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
class RemoveTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_empty(self):
def test_receiver_empty(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_absent(self):
def test_receiver_absent(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
recv2 = mitogen.core.Receiver(self.router)
@ -114,7 +134,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_present(self):
def test_receiver_present(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
select.add(recv)
@ -122,6 +142,30 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, recv.notify)
def test_latch_empty(self):
select = self.klass()
latch = mitogen.core.Latch()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(latch))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_latch_absent(self):
select = self.klass()
latch = mitogen.core.Latch()
latch2 = mitogen.core.Latch()
select.add(latch2)
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(latch))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_latch_present(self):
select = self.klass()
latch = mitogen.core.Latch()
select.add(latch)
select.remove(latch)
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
class CloseTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
@ -130,6 +174,18 @@ class CloseTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass()
select.close() # No effect.
def test_one_latch(self):
select = self.klass()
latch = mitogen.core.Latch()
select.add(latch)
self.assertEquals(1, len(select._receivers))
self.assertEquals(select._put, latch.notify)
select.close()
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
def test_one_receiver(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
@ -174,18 +230,35 @@ class EmptyTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass([recv])
self.assertTrue(select.empty())
def test_nonempty_before_add(self):
def test_nonempty_receiver_before_add(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
self.assertFalse(select.empty())
def test_nonempty_after_add(self):
def test_nonempty__receiver_after_add(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
recv._on_receive(mitogen.core.Message.pickled('123'))
self.assertFalse(select.empty())
def test_empty_latch(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertTrue(select.empty())
def test_nonempty_latch_before_add(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertFalse(select.empty())
def test_nonempty__latch_after_add(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertFalse(select.empty())
class IterTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
@ -194,18 +267,24 @@ class IterTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass()
self.assertEquals([], list(select))
def test_nonempty(self):
def test_nonempty_receiver(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
self.assertEquals([msg], list(select))
def test_nonempty_latch(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals([123], list(select))
class OneShotTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_true_removed_after_get(self):
def test_true_receiver_removed_after_get(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
@ -215,7 +294,7 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, recv.notify)
def test_false_persists_after_get(self):
def test_false_receiver_persists_after_get(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv], oneshot=False)
msg = mitogen.core.Message.pickled('123')
@ -226,8 +305,26 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(recv, select._receivers[0])
self.assertEquals(select._put, recv.notify)
def test_true_latch_removed_after_get(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals(123, select.get())
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
class GetTest(testlib.RouterMixin, testlib.TestCase):
def test_false_latch_persists_after_get(self):
latch = mitogen.core.Latch()
select = self.klass([latch], oneshot=False)
latch.put(123)
self.assertEquals(123, select.get())
self.assertEquals(1, len(select._receivers))
self.assertEquals(latch, select._receivers[0])
self.assertEquals(select._put, latch.notify)
class GetReceiverTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_no_receivers(self):
@ -285,5 +382,79 @@ class GetTest(testlib.RouterMixin, testlib.TestCase):
lambda: select.get(timeout=0.0))
class GetLatchTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_no_latches(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg)
def test_timeout_no_receivers(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get(timeout=1.0))
self.assertEquals(str(exc), self.klass.empty_msg)
def test_zero_timeout(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
def test_timeout(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.1))
def test_nonempty_before_add(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertEquals(123, select.get())
def test_nonempty_after_add(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals(123, latch.get())
def test_drained_by_other_thread(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertEquals(123, latch.get())
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
class GetEventTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_empty(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg)
def test_latch(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
event = select.get_event()
self.assertEquals(latch, event.source)
self.assertEquals(123, event.data)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
event = select.get_event()
self.assertEquals(recv, event.source)
self.assertEquals('123', event.data.unpickle())
if __name__ == '__main__':
unittest2.main()

@ -8,11 +8,23 @@ from mitogen.core import b
import testlib
class EvilObject(object):
pass
def roundtrip(v):
msg = mitogen.core.Message.pickled(v)
return mitogen.core.Message(data=msg.data).unpickle()
class EvilObjectTest(testlib.TestCase):
def test_deserialization_fails(self):
msg = mitogen.core.Message.pickled(EvilObject())
e = self.assertRaises(mitogen.core.StreamError,
lambda: msg.unpickle()
)
class BlobTest(testlib.TestCase):
klass = mitogen.core.Blob

@ -432,6 +432,7 @@ class BrokerMixin(object):
if not self.broker_shutdown:
self.broker.shutdown()
self.broker.join()
del self.broker
super(BrokerMixin, self).tearDown()
def sync_with_broker(self):
@ -445,11 +446,17 @@ class RouterMixin(BrokerMixin):
super(RouterMixin, self).setUp()
self.router = self.router_class(self.broker)
def tearDown(self):
del self.router
super(RouterMixin, self).tearDown()
class DockerMixin(RouterMixin):
@classmethod
def setUpClass(cls):
super(DockerMixin, cls).setUpClass()
if os.environ.get('SKIP_DOCKER_TESTS'):
raise unittest2.SkipTest('SKIP_DOCKER_TESTS is set')
cls.dockerized_ssh = DockerizedSshDaemon()
cls.dockerized_ssh.wait_for_sshd()

Loading…
Cancel
Save