Merge branch 'python3'

Hooray \o/

Remaining issues:

- Two unit test races that appear related to our broken zombie process
  reaping, doesn't impact Ansible.

Closes #16.
pull/303/head
David Wilson 8 years ago
commit d493a3d7ca

@ -7,11 +7,21 @@ notifications:
email: false
language: python
cache:
- pip
- directories:
- /home/travis/virtualenv
install:
- pip install -r dev_requirements.txt
script:
- ${TRAVIS_BUILD_DIR}/.travis/${MODE}_tests.sh
services:
- docker
matrix:
include:
# Mitogen tests.
@ -27,6 +37,9 @@ matrix:
# 2.6 -> 2.6
- python: "2.6"
env: MODE=mitogen DISTRO=centos6
# 3.6 -> 2.7
- python: "3.6"
env: MODE=mitogen DISTRO=debian
# Debops tests.
# 2.4.3.0; 2.7 -> 2.7
@ -35,6 +48,9 @@ matrix:
# 2.5.5; 2.7 -> 2.7
- python: "2.7"
env: MODE=debops_common VER=2.5.5
# 2.5.5; 3.6 -> 2.7
- python: "3.6"
env: MODE=debops_common VER=2.5.5
# ansible_mitogen tests.
# 2.4.3.0; Debian; 2.7 -> 2.7
@ -55,16 +71,10 @@ matrix:
# 2.5.5; CentOS; 2.6 -> 2.6
- python: "2.6"
env: MODE=ansible VER=2.5.5 DISTRO=centos6
# 2.5.5; Debian; 3.6 -> 2.7
- python: "3.6"
env: MODE=ansible VER=2.5.5 DISTRO=centos6
# Sanity check our tests against vanilla Ansible, they should pass.
- python: "2.7"
env: MODE=ansible VER=2.5.5 DISTRO=debian STRATEGY=linear
install:
- pip install -r dev_requirements.txt
script:
- ${TRAVIS_BUILD_DIR}/.travis/${MODE}_tests.sh
services:
- docker

@ -3,7 +3,7 @@
TRAVIS_BUILD_DIR="${TRAVIS_BUILD_DIR:-`pwd`}"
TMPDIR="/tmp/ansible-tests-$$"
ANSIBLE_VERSION="${VER:-2.4.3.0}"
ANSIBLE_VERSION="${VER:-2.5.5}"
export ANSIBLE_STRATEGY="${STRATEGY:-mitogen_linear}"
DISTRO="${DISTRO:-debian}"

@ -4,7 +4,7 @@
TMPDIR="/tmp/debops-$$"
TRAVIS_BUILD_DIR="${TRAVIS_BUILD_DIR:-`pwd`}"
TARGET_COUNT="${TARGET_COUNT:-2}"
ANSIBLE_VERSION="${VER:-2.4.3.0}"
ANSIBLE_VERSION="${VER:-2.5.5}"
DISTRO=debian # Naturally DebOps only supports Debian.
export PYTHONPATH="${PYTHONPATH}:${TRAVIS_BUILD_DIR}"

@ -27,6 +27,8 @@
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
from __future__ import unicode_literals
import logging
import os
import shlex
@ -239,7 +241,7 @@ def config_from_play_context(transport, inventory_name, connection):
'timeout': connection._play_context.timeout,
'ansible_ssh_timeout': connection.ansible_ssh_timeout,
'ssh_args': [
term
mitogen.core.to_text(term)
for s in (
getattr(connection._play_context, 'ssh_args', ''),
getattr(connection._play_context, 'ssh_common_args', ''),
@ -249,7 +251,7 @@ def config_from_play_context(transport, inventory_name, connection):
],
'become_exe': connection._play_context.become_exe,
'sudo_args': [
term
mitogen.core.to_text(term)
for s in (
connection._play_context.sudo_flags,
connection._play_context.become_flags
@ -351,7 +353,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
def __init__(self, play_context, new_stdin, **kwargs):
assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
'Mitogen connection types may only be instantiated '
'while the "mitogen" strategy is active.'
'while the "mitogen" strategy is active.'
)
super(Connection, self).__init__(play_context, new_stdin)
@ -528,7 +530,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return self.call_async(func, *args, **kwargs).get().unpickle()
finally:
LOG.debug('Call took %d ms: %s%r', 1000 * (time.time() - t0),
func.func_name, args)
func.__name__, args)
def create_fork_child(self):
"""

@ -41,7 +41,7 @@ class Handler(logging.Handler):
"""
def __init__(self, display, normal_method):
logging.Handler.__init__(self)
self.formatter = mitogen.utils.log_get_formatter(usec=True)
self.formatter = mitogen.utils.log_get_formatter()
self.display = display
self.normal_method = normal_method

@ -27,13 +27,17 @@
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
import commands
import logging
import os
import pwd
import shutil
import traceback
try:
from shlex import quote as shlex_quote
except ImportError:
from pipes import quote as shlex_quote
from ansible.module_utils._text import to_bytes
from ansible.parsing.utils.jsonify import jsonify
@ -321,7 +325,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
ansible_mitogen.planner.Invocation(
action=self,
connection=self._connection,
module_name=mitogen.utils.cast(module_name),
module_name=mitogen.core.to_text(module_name),
module_args=mitogen.utils.cast(module_args),
task_vars=task_vars,
templar=self._templar,
@ -369,7 +373,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
if executable is None: # executable defaults to False
executable = self._play_context.executable
if executable:
cmd = executable + ' -c ' + commands.mkarg(cmd)
cmd = executable + ' -c ' + shlex_quote(cmd)
rc, stdout, stderr = self._connection.exec_command(
cmd=cmd,

@ -27,6 +27,8 @@
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
from __future__ import unicode_literals
import collections
import imp
import os
@ -89,18 +91,22 @@ def find(name, path=(), parent=None):
return parent
fp, modpath, (suffix, mode, kind) = tup
if fp:
fp.close()
if parent and modpath == parent.path:
# 'from timeout import timeout', where 'timeout' is a function but also
# the name of the module being imported.
return None
if fp:
fp.close()
if kind == imp.PKG_DIRECTORY:
modpath = os.path.join(modpath, '__init__.py')
module = Module(head, modpath, kind, parent)
if tail:
# TODO: this code is entirely wrong on Python 3.x, but works well enough
# for Ansible. We need a new find_child() that only looks in the package
# directory, never falling back to the parent search path.
if tail and kind == imp.PKG_DIRECTORY:
return find_relative(module, tail, path)
return module

@ -35,6 +35,8 @@ files/modules known missing.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import json
import logging
import os
@ -43,6 +45,7 @@ import random
from ansible.executor import module_common
import ansible.errors
import ansible.module_utils
import mitogen.core
try:
from ansible.plugins.loader import module_loader
@ -71,11 +74,11 @@ def parse_script_interpreter(source):
"""
# Linux requires first 2 bytes with no whitespace, pretty sure it's the
# same everywhere. See binfmt_script.c.
if not source.startswith('#!'):
if not source.startswith(b'#!'):
return None, None
# Find terminating newline. Assume last byte of binprm_buf if absent.
nl = source.find('\n', 0, 128)
nl = source.find(b'\n', 0, 128)
if nl == -1:
nl = min(128, len(source))
@ -83,8 +86,8 @@ def parse_script_interpreter(source):
# bits just contains the interpreter filename.
bits = source[2:nl].strip().split(None, 1)
if len(bits) == 1:
return bits[0], None
return bits[0], bits[1]
return mitogen.core.to_text(bits[0]), None
return mitogen.core.to_text(bits[0]), mitogen.core.to_text(bits[1])
class Invocation(object):
@ -176,9 +179,11 @@ class Planner(object):
# named by `runner_name`.
}
"""
kwargs.setdefault('emulate_tty', True)
kwargs.setdefault('service_context', self._inv.connection.parent)
return kwargs
new = dict((mitogen.core.UnicodeType(k), kwargs[k])
for k in kwargs)
new.setdefault('emulate_tty', True)
new.setdefault('service_context', self._inv.connection.parent)
return new
def __repr__(self):
return '%s()' % (type(self).__name__,)
@ -202,7 +207,7 @@ class BinaryPlanner(Planner):
runner_name=self.runner_name,
module=self._inv.module_name,
path=self._inv.module_path,
args=self._inv.module_args,
json_args=json.dumps(self._inv.module_args),
env=self._inv.env,
**kwargs
)
@ -264,7 +269,7 @@ class WantJsonPlanner(ScriptPlanner):
runner_name = 'WantJsonRunner'
def detect(self):
return 'WANT_JSON' in self._inv.module_source
return b'WANT_JSON' in self._inv.module_source
class NewStylePlanner(ScriptPlanner):
@ -274,9 +279,10 @@ class NewStylePlanner(ScriptPlanner):
preprocessing the module.
"""
runner_name = 'NewStyleRunner'
marker = b'from ansible.module_utils.'
def detect(self):
return 'from ansible.module_utils.' in self._inv.module_source
return self.marker in self._inv.module_source
def _get_interpreter(self):
return None, None
@ -394,7 +400,7 @@ def get_module_data(name):
path = module_loader.find_plugin(name, '')
with open(path, 'rb') as fp:
source = fp.read()
return path, source
return mitogen.core.to_text(path), source
def _propagate_deps(invocation, planner, context):

@ -46,6 +46,8 @@ import mitogen.utils
import ansible_mitogen.logging
import ansible_mitogen.services
from mitogen.core import b
LOG = logging.getLogger(__name__)
@ -103,8 +105,8 @@ class MuxProcess(object):
cls.unix_listener_path = mitogen.unix.make_socket_path()
cls.worker_sock, cls.child_sock = socket.socketpair()
mitogen.core.set_cloexec(cls.worker_sock)
mitogen.core.set_cloexec(cls.child_sock)
mitogen.core.set_cloexec(cls.worker_sock.fileno())
mitogen.core.set_cloexec(cls.child_sock.fileno())
cls.child_pid = os.fork()
ansible_mitogen.logging.setup()
@ -129,7 +131,8 @@ class MuxProcess(object):
self._setup_services()
# Let the parent know our listening socket is ready.
mitogen.core.io_op(self.child_sock.send, '1')
mitogen.core.io_op(self.child_sock.send, b('1'))
self.child_sock.send(b('1'))
# Block until the socket is closed, which happens on parent exit.
mitogen.core.io_op(self.child_sock.recv, 1)
@ -178,7 +181,7 @@ class MuxProcess(object):
self.pool.stop(join=False)
try:
os.unlink(self.listener.path)
except OSError, e:
except OSError as e:
# Prevent a shutdown race with the parent process.
if e.args[0] != errno.ENOENT:
raise

@ -36,7 +36,8 @@ how to build arguments for it, preseed related data, etc.
"""
from __future__ import absolute_import
import cStringIO
from __future__ import unicode_literals
import ctypes
import errno
import imp
@ -50,6 +51,12 @@ import types
import mitogen.core
import ansible_mitogen.target # TODO: circular import
try:
# Cannot use cStringIO as it does not support Unicode.
from StringIO import StringIO
except ImportError:
from io import StringIO
try:
from shlex import quote as shlex_quote
except ImportError:
@ -71,6 +78,7 @@ for symbol in 'res_init', '__res_init':
except AttributeError:
pass
iteritems = getattr(dict, 'iteritems', dict.items)
LOG = logging.getLogger(__name__)
@ -78,7 +86,7 @@ def utf8(s):
"""
Coerce an object to bytes if it is Unicode.
"""
if isinstance(s, unicode):
if isinstance(s, mitogen.core.UnicodeType):
s = s.encode('utf-8')
return s
@ -110,9 +118,13 @@ class Runner(object):
:param mitogen.core.Context service_context:
Context to which we should direct FileService calls. For now, always
the connection multiplexer process on the controller.
:param dict args:
:param str json_args:
Ansible module arguments. A mixture of user and internal keys created
by :meth:`ansible.plugins.action.ActionBase._execute_module`.
This is passed as a string rather than a dict in order to mimic the
implicit bytes/str conversion behaviour of a 2.x controller running
against a 3.x target.
:param dict env:
Additional environment variables to set during the run.
@ -123,16 +135,13 @@ class Runner(object):
When :data:`True`, indicate the runner should detach the context from
its parent after setup has completed successfully.
"""
def __init__(self, module, service_context, args=None, env=None,
def __init__(self, module, service_context, json_args, env=None,
econtext=None, detach=False):
if args is None:
args = {}
self.module = utf8(module)
self.module = module
self.service_context = service_context
self.econtext = econtext
self.detach = detach
self.args = args
self.args = json.loads(json_args)
self.env = env
def setup(self):
@ -236,7 +245,7 @@ class ModuleUtilsImporter(object):
def load_module(self, fullname):
path, is_pkg = self._by_fullname[fullname]
source = ansible_mitogen.target.get_small_file(self._context, path)
code = compile(source, path, 'exec')
code = compile(source, path, 'exec', 0, 1)
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__file__ = "master:%s" % (path,)
mod.__loader__ = self
@ -254,7 +263,7 @@ class TemporaryEnvironment(object):
def __init__(self, env=None):
self.original = os.environ.copy()
self.env = env or {}
os.environ.update((k, str(v)) for k, v in self.env.iteritems())
os.environ.update((k, str(v)) for k, v in iteritems(self.env))
def revert(self):
os.environ.clear()
@ -278,14 +287,11 @@ class NewStyleStdio(object):
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
self.original_stdin = sys.stdin
sys.stdout = cStringIO.StringIO()
sys.stderr = cStringIO.StringIO()
ansible.module_utils.basic._ANSIBLE_ARGS = json.dumps({
'ANSIBLE_MODULE_ARGS': args
})
sys.stdin = cStringIO.StringIO(
ansible.module_utils.basic._ANSIBLE_ARGS
)
sys.stdout = StringIO()
sys.stderr = StringIO()
encoded = json.dumps({'ANSIBLE_MODULE_ARGS': args})
ansible.module_utils.basic._ANSIBLE_ARGS = utf8(encoded)
sys.stdin = StringIO(mitogen.core.to_text(encoded))
def revert(self):
sys.stdout = self.original_stdout
@ -309,7 +315,7 @@ class ProgramRunner(Runner):
def __init__(self, path, emulate_tty=None, **kwargs):
super(ProgramRunner, self).__init__(**kwargs)
self.emulate_tty = emulate_tty
self.path = utf8(path)
self.path = path
def setup(self):
super(ProgramRunner, self).setup()
@ -368,7 +374,7 @@ class ProgramRunner(Runner):
args=self._get_program_args(),
emulate_tty=self.emulate_tty,
)
except Exception, e:
except Exception as e:
LOG.exception('While running %s', self._get_program_args())
return {
'rc': 1,
@ -378,8 +384,8 @@ class ProgramRunner(Runner):
return {
'rc': rc,
'stdout': stdout,
'stderr': stderr
'stdout': mitogen.core.to_text(stdout),
'stderr': mitogen.core.to_text(stderr),
}
@ -398,7 +404,7 @@ class ArgsFileRunner(Runner):
suffix='-args',
dir=ansible_mitogen.target.temp_dir,
)
self.args_fp.write(self._get_args_contents())
self.args_fp.write(utf8(self._get_args_contents()))
self.args_fp.flush()
reopen_readonly(self.program_fp)
@ -449,17 +455,17 @@ class ScriptRunner(ProgramRunner):
if not self.interpreter:
return s
shebang = '#!' + utf8(self.interpreter)
shebang = b'#!' + utf8(self.interpreter)
if self.interpreter_arg:
shebang += ' ' + utf8(self.interpreter_arg)
shebang += b' ' + utf8(self.interpreter_arg)
new = [shebang]
if os.path.basename(self.interpreter).startswith('python'):
new.append(self.b_ENCODING_STRING)
_, _, rest = s.partition('\n')
_, _, rest = s.partition(b'\n')
new.append(rest)
return '\n'.join(new)
return b'\n'.join(new)
class NewStyleRunner(ScriptRunner):
@ -533,15 +539,20 @@ class NewStyleRunner(ScriptRunner):
except KeyError:
return self._code_by_path.setdefault(self.path, compile(
source=self.source,
filename=self.path,
filename="master:" + self.path,
mode='exec',
dont_inherit=True,
))
if mitogen.core.PY3:
main_module_name = '__main__'
else:
main_module_name = b'__main__'
def _run(self):
code = self._get_code()
mod = types.ModuleType('__main__')
mod = types.ModuleType(self.main_module_name)
mod.__package__ = None
# Some Ansible modules use __file__ to find the Ansiballz temporary
# directory. We must provide some temporary path in __file__, but we
@ -551,25 +562,28 @@ class NewStyleRunner(ScriptRunner):
ansible_mitogen.target.temp_dir,
'ansible_module_' + os.path.basename(self.path),
)
e = None
exc = None
try:
exec code in vars(mod)
except SystemExit, e:
pass
if mitogen.core.PY3:
exec(code, vars(mod))
else:
exec('exec code in vars(mod)')
except SystemExit as e:
exc = e
return {
'rc': e[0] if e else 2,
'stdout': sys.stdout.getvalue(),
'stderr': sys.stderr.getvalue(),
'rc': exc.args[0] if exc else 2,
'stdout': mitogen.core.to_text(sys.stdout.getvalue()),
'stderr': mitogen.core.to_text(sys.stderr.getvalue()),
}
class JsonArgsRunner(ScriptRunner):
JSON_ARGS = '<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>'
JSON_ARGS = b'<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>'
def _get_args_contents(self):
return json.dumps(self.args)
return json.dumps(self.args).encode()
def _rewrite_source(self, s):
return (

@ -38,6 +38,8 @@ when a child has completed a job.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import logging
import os
import os.path
@ -53,6 +55,20 @@ import ansible_mitogen.target
LOG = logging.getLogger(__name__)
if sys.version_info[0] == 3:
def reraise(tp, value, tb):
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
else:
exec(
"def reraise(tp, value, tb=None):\n"
" raise tp, value, tb\n"
)
class Error(Exception):
pass
@ -118,7 +134,7 @@ class ContextService(mitogen.service.Service):
while stack:
obj = stack.pop()
if isinstance(obj, dict):
stack.extend(sorted(obj.iteritems()))
stack.extend(sorted(obj.items()))
elif isinstance(obj, (list, tuple)):
stack.extend(obj)
else:
@ -351,8 +367,7 @@ class ContextService(mitogen.service.Service):
try:
result = self._wait_or_start(spec, via=via).get()
if isinstance(result, tuple): # exc_info()
e1, e2, e3 = result
raise e1, e2, e3
reraise(*result)
via = result['context']
except mitogen.core.StreamError as e:
return {
@ -390,10 +405,10 @@ class ModuleDepService(mitogen.service.Service):
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'module_name': basestring,
'module_path': basestring,
'module_name': mitogen.core.UnicodeType,
'module_path': mitogen.core.FsPathTypes,
'search_path': tuple,
'builtin_path': basestring,
'builtin_path': mitogen.core.FsPathTypes,
'context': mitogen.core.Context,
})
def scan(self, module_name, module_path, search_path, builtin_path, context):

@ -32,7 +32,10 @@ for file transfer, module execution and sundry bits like changing file modes.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import errno
import functools
import grp
import json
import logging
@ -147,7 +150,7 @@ def prune_tree(path):
try:
os.unlink(path)
return
except OSError, e:
except OSError as e:
if not (os.path.isdir(path) and
e.args[0] in (errno.EPERM, errno.EISDIR)):
LOG.error('prune_tree(%r): %s', path, e)
@ -157,7 +160,7 @@ def prune_tree(path):
# Ensure write access for readonly directories. Ignore error in case
# path is on a weird filesystem (e.g. vfat).
os.chmod(path, int('0700', 8))
except OSError, e:
except OSError as e:
LOG.warning('prune_tree(%r): %s', path, e)
try:
@ -165,7 +168,7 @@ def prune_tree(path):
if name not in ('.', '..'):
prune_tree(os.path.join(path, name))
os.rmdir(path)
except OSError, e:
except OSError as e:
LOG.error('prune_tree(%r): %s', path, e)
@ -229,7 +232,7 @@ def init_child(econtext):
return {
'fork_context': _fork_parent,
'home_dir': os.path.expanduser('~'),
'home_dir': mitogen.core.to_text(os.path.expanduser('~')),
}
@ -319,12 +322,7 @@ class AsyncRunner(object):
'econtext': self.econtext,
'emulate_tty': False,
})
dct = run_module(kwargs)
if mitogen.core.PY3:
for key in 'stdout', 'stderr':
dct[key] = dct[key].decode('utf-8', 'surrogateescape')
return dct
return run_module(kwargs)
def _parse_result(self, dct):
filtered, warnings = (
@ -465,7 +463,7 @@ def exec_args(args, in_data='', chdir=None, shell=None, emulate_tty=False):
stdout, stderr = proc.communicate(in_data)
if emulate_tty:
stdout = stdout.replace('\n', '\r\n')
stdout = stdout.replace(b'\n', b'\r\n')
return proc.returncode, stdout, stderr or ''
@ -481,7 +479,7 @@ def exec_command(cmd, in_data='', chdir=None, shell=None, emulate_tty=False):
:return:
(return code, stdout bytes, stderr bytes)
"""
assert isinstance(cmd, basestring)
assert isinstance(cmd, mitogen.core.UnicodeType)
return exec_args(
args=[get_user_shell(), '-c', cmd],
in_data=in_data,
@ -576,7 +574,7 @@ def apply_mode_spec(spec, mode):
mask = CHMOD_MASKS[ch]
bits = CHMOD_BITS[ch]
cur_perm_bits = mode & mask
new_perm_bits = reduce(operator.or_, (bits[p] for p in perms), 0)
new_perm_bits = functools.reduce(operator.or_, (bits[p] for p in perms), 0)
mode &= ~mask
if op == '=':
mode |= new_perm_bits

@ -5,7 +5,6 @@
Mitogen for Ansible
===================
An extension to `Ansible`_ is included that implements connections over
Mitogen, replacing embedded shell invocations with pure-Python equivalents
invoked via highly efficient remote procedure calls to persistent interpreters
@ -19,6 +18,7 @@ will ensure soundness.
.. _Bug reports: https://goo.gl/yLKZiJ
Overview
--------
@ -53,8 +53,8 @@ Installation
------------
1. Thoroughly review the documented behavioural differences.
2. Verify Ansible 2.3/2.4/2.5 and Python 2.7 are listed in ``ansible --version``
output.
2. Verify Ansible 2.3-2.5 and Python 2.6, 2.7 or 3.6 are listed in ``ansible
--version`` output.
3. Download and extract https://github.com/dw/mitogen/archive/stable.zip
4. Modify ``ansible.cfg``:
@ -71,7 +71,7 @@ Installation
5. If targets have a restrictive ``sudoers`` file, add a rule like:
.. code-block:: plain
::
deploy = (ALL) NOPASSWD:/usr/bin/python -c*

@ -485,7 +485,7 @@ Router Class
:param str python_path:
Path to the Python interpreter to use for bootstrap. Defaults to
``python2.7``. In future this may default to ``sys.executable``.
:data:`sys.executable`. For SSH, defaults to ``python``.
:param bool debug:
If :data:`True`, arrange for debug logging (:py:meth:`enable_debug`) to
@ -848,9 +848,9 @@ Context Class
try:
# Prints output once it is received.
msg = recv.get()
print msg.unpickle()
print(msg.unpickle())
except mitogen.core.CallError, e:
print 'Call failed:', str(e)
print('Call failed:', str(e))
Asynchronous calls may be dispatched in parallel to multiple
contexts and consumed as they complete using
@ -1038,11 +1038,11 @@ Select Class
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.select.Select(recvs):
print 'Got %s from %s' % (msg, msg.receiver)
print('Got %s from %s' % (msg, msg.receiver))
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
print('Received total %s from %s receivers' % (total, len(recvs)))
:py:class:`Select` may drive a long-running scheduler:
@ -1069,7 +1069,7 @@ Select Class
]
for msg in mitogen.select.Select(selects):
print msg.unpickle()
print(msg.unpickle())
.. py:classmethod:: all (it)
@ -1323,7 +1323,7 @@ A random assortment of utility functions useful on masters and children.
OS X bundles some ancient version of the :py:mod:`six` module.
.. currentmodule:: mitogen.utils
.. function:: log_to_file (path=None, io=False, usec=False, level='INFO')
.. function:: log_to_file (path=None, io=False, level='INFO')
Install a new :py:class:`logging.Handler` writing applications logs to the
filesystem. Useful when debugging slave IO problems.
@ -1339,10 +1339,6 @@ A random assortment of utility functions useful on masters and children.
If :data:`True`, include extremely verbose IO logs in the output.
Useful for debugging hangs, less useful for debugging application code.
:parm bool usec:
If :data:`True`, include microsecond timestamps. This greatly helps
when debugging races and similar determinism issues.
:param str level:
Name of the :py:mod:`logging` package constant that is the minimum
level to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``,

@ -193,7 +193,7 @@ nested.py:
context = None
for x in range(1, 11):
print 'Connect local%d via %s' % (x, context)
print('Connect local%d via %s' % (x, context))
context = router.local(via=context, name='local%d' % x)
context.call(os.system, 'pstree -s python -s mitogen')

@ -155,10 +155,6 @@ Logging Environment Variables
produces verbose records of any IO interaction, which is useful for
debugging hangs and deadlocks.
``MITOGEN_LOG_USEC``
If present, forces microsecond-level timestamps for any call to
:py:func:`mitogen.utils.log_to_file`.
Logging Records
@ -225,13 +221,13 @@ caller until the return value is available or an exception is raised::
>>> import os
>>> # Returns the current time.
>>> print 'Time in remote context:', local.call(time.time)
>>> print('Time in remote context:', local.call(time.time))
>>> try:
... # Raises OSError.
... local.call(os.chdir, '/nonexistent')
... except mitogen.core.CallError, e:
... print 'Call failed:', str(e)
... print('Call failed:', str(e))
It is a simple wrapper around the more flexible :meth:`Context.call_async`,
which immediately returns a :class:`Receiver <mitogen.core.Receiver>` wired up
@ -242,7 +238,7 @@ is called to block the thread until the result arrives::
>>> call = local.call_async(time.time)
>>> msg = call.get()
>>> print msg.unpickle()
>>> print(msg.unpickle())
1507292737.75547
@ -259,12 +255,12 @@ We must therefore continue by writing our code as a script::
import mitogen.utils
def my_first_function():
print 'Hello from remote context!'
print('Hello from remote context!')
return 123
def main(router):
local = router.local()
print local.call(my_first_function)
print(local.call(my_first_function))
if __name__ == '__main__':
mitogen.utils.log_to_file(main)
@ -292,7 +288,7 @@ without the need for writing asynchronous code::
calls = [context.call(my_func) for context in contexts]
for msg in mitogen.select.Select(calls):
print 'Reply from %s: %s' % (recv.context, data)
print('Reply from %s: %s' % (recv.context, data))
Running Code That May Hang

@ -135,7 +135,7 @@ configuration.
# myapp/__init__.py, myapp/mypkg/__init__.py, and myapp/mypkg/mymodule.py
# are transferred automatically.
print context.call(myapp.mymodule.my_function)
print(context.call(myapp.mymodule.my_function))
As the forwarder reuses the import mechanism, it should integrate cleanly with
any tool such as `py2exe`_ that correctly implement the protocols in PEP-302,
@ -323,10 +323,10 @@ available.
total = 0
for msg in Select(c.call_async(usage, '/tmp') for c in contexts):
usage = msg.unpickle()
print 'Context %s /tmp usage: %d' % (recv.context, usage)
print('Context %s /tmp usage: %d' % (recv.context, usage))
total += usage
print 'Total /tmp usage across all contexts: %d' % (total,)
print('Total /tmp usage across all contexts: %d' % (total,))
Single File Programs
@ -361,7 +361,7 @@ usual into the slave process.
def main(broker):
if len(sys.argv) != 2:
print __doc__
print(__doc__)
sys.exit(1)
context = mitogen.ssh.connect(broker, sys.argv[1])
@ -406,14 +406,12 @@ a large fleet of machines, or to alert the parent of unexpected state changes.
Compatibility
#############
The package is written using syntax compatible all the way back to **Python
2.4** released November 2004, making it suitable for managing a fleet of
potentially ancient corporate hardware. For example Mitogen can be used out of
the box against Red Hat Enterprise Linux 5, released in 2007.
Mitogen is syntax-compatible with **Python 2.4** released November 2004, making
it suitable for managing a fleet of potentially ancient corporate hardware,
such as Red Hat Enterprise Linux 5, released in 2007.
Support for Python 3 is included using 2to3 triggered during setup.py, however
such a Mitogen install does not support communicating with older 2.x systems. A
future revision will support full cross 2/3 compatibility.
Every combination of Python 3.x/2.x parent and child should be possible,
however at present only Python 2.6, 2.7 and 3.6 are tested automatically.
Zero Dependencies

@ -17,69 +17,8 @@ Latch Class
.. currentmodule:: mitogen.core
.. class:: Latch ()
.. autoclass:: Latch ()
A latch is a :py:class:`Queue.Queue`-like object that supports mutation and
waiting from multiple threads, however unlike :py:class:`Queue.Queue`,
waiting threads always remain interruptible, so CTRL+C always succeeds, and
waits where a timeout is set experience no wake up latency. These
properties are not possible in combination using the built-in threading
primitives available in Python 2.x.
Latches implement queues using the UNIX self-pipe trick, and a per-thread
:py:func:`socket.socketpair` that is lazily created the first time any
latch attempts to sleep on a thread, and dynamically associated with the
waiting Latch only for duration of the wait.
See :ref:`waking-sleeping-threads` for further discussion.
.. method:: empty ()
Return :py:data:`True` if calling :py:meth:`get` would block.
As with :py:class:`Queue.Queue`, :py:data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and
:py:meth:`get`.
As with :py:class:`Queue.Queue`, :py:data:`False` may be returned even
though a subsequent call to :py:meth:`get` will block, since another
waiting thread may be woken at any moment between :py:meth:`empty` and
:py:meth:`get`.
.. method:: get (timeout=None, block=True)
Return the next object enqueued on this latch, or sleep waiting for
one.
:param float timeout:
If not :py:data:`None`, specifies a timeout in seconds.
:param bool block:
If :py:data:`False`, immediately raise
:py:class:`mitogen.core.TimeoutError` if the latch is empty.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the object is no longer valid.
:raises mitogen.core.TimeoutError:
Timeout was reached.
:returns:
The de-queued object.
.. method:: put (obj)
Enqueue an object on this latch, waking the first thread that is asleep
waiting for a result, if one exists.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the object is no longer valid.
.. method:: close ()
Mark the latch as closed, and cause every sleeping thread to be woken,
with :py:class:`mitogen.core.LatchError` raised in each thread.
Side Class
@ -468,7 +407,7 @@ Helper Functions
List of submodule name suffixes.
.. currentmodule:: mitogen.parent
.. currentmodule:: mitogen.minify
.. autofunction:: minimize_source (source)

@ -119,10 +119,11 @@ Reference
.. autofunction:: mitogen.service.Service
.. autoclass:: mitogen.service.Service
:members:
.. autoclass:: mitogen.service.Invoker
.. autoclass:: mitogen.service.SerializedInvoker
.. autoclass:: mitogen.service.DeduplicatingInvoker
.. autoclass:: mitogen.service.DeduplicatingService
.. autoclass:: mitogen.service.Service
:members:
.. autoclass:: mitogen.service.Pool

@ -1,9 +1,13 @@
#!/usr/bin/env python
#
# pip install fusepy
#
# This implementation could improve a /lot/, but the core library is missing
# some functionality (#213) to make that easy. Additionally it needs a set of
# Python bindings for FUSE that stupidly require use of a thread pool.
from __future__ import absolute_import, division
from __future__ import unicode_literals
import errno
import logging
@ -23,13 +27,25 @@ import os
LOG = logging.getLogger(__name__)
def to_text(p):
"""
On 3.x, fusepy returns paths as bytes.
"""
if isinstance(p, bytes):
return p.decode('utf-8')
return p
def errno_wrap(modname, func, *args):
try:
return getattr(globals()[modname], func)(*args), None
except (IOError, OSError):
LOG.exception('While running %r(**%r)', func, args)
e = sys.exc_info()[1]
return None, errno.errorcode[e.args[0]]
if e.args[0] == errno.ENOENT:
LOG.error('%r(**%r): %s', func, args, e)
else:
LOG.exception('While running %r(**%r)', func, args)
return None, to_text(errno.errorcode[e.args[0]])
def errno_call(context, func, *args):
@ -128,14 +144,17 @@ class Operations(fuse.Operations): # fuse.LoggingMixIn,
return self._context
def chmod(self, path, mode):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, os.chmod, path, mode)
def chown(self, path, uid, gid):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, os.chown, path, uid, gid)
def create(self, path, mode):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, _create, path, mode) or 0
@ -156,10 +175,12 @@ class Operations(fuse.Operations): # fuse.LoggingMixIn,
return errno_call(self._context, _stat, path)
def mkdir(self, path, mode):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, os.mkdir, path, mode)
def read(self, path, size, offset, fh):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, _read, path, size, offset)
@ -176,41 +197,52 @@ class Operations(fuse.Operations): # fuse.LoggingMixIn,
return errno_call(self._context, os.readlink, path)
def rename(self, old, new):
old = old.decode(self.encoding)
new = new.decode(self.encoding)
return errno_call(self._context, os.rename, old, new)
# TODO return self.sftp.rename(old, self.root + new)
def rmdir(self, path):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, os.rmdir, path)
def symlink(self, target, source):
target = target.decode(self.encoding)
source = source.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, os.symlink, source, target)
def truncate(self, path, length, fh=None):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, _truncate, path, length)
def unlink(self, path):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, os.unlink, path)
def utimens(self, path, times=None):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, os.utime, path, times)
def write(self, path, data, offset, fh):
path = path.decode(self.encoding)
_evil_name(path)
return errno_call(self._context, _write, path, data, offset)
if __name__ == '__main__':
@mitogen.main(log_level='DEBUG')
def main(router):
if len(sys.argv) != 3:
print('usage: %s <host> <mountpoint>' % sys.argv[0])
sys.exit(1)
ops = Operations(sys.argv[1])
mount_point = sys.argv[2]
mitogen.utils.log_to_file(level='DEBUG')
blerp = fuse.FUSE(ops, mount_point, foreground=True)
blerp = fuse.FUSE(
operations=Operations(sys.argv[1]),
mountpoint=sys.argv[2],
foreground=True,
volname='%s (Mitogen)' % (sys.argv[1],),
)

@ -182,7 +182,7 @@ def main(router):
"""
argv = sys.argv[1:]
if not len(argv):
print 'mitop: Need a list of SSH hosts to connect to.'
print('mitop: Need a list of SSH hosts to connect to.')
sys.exit(1)
delay = 2.0
@ -193,7 +193,7 @@ def main(router):
# connection, a Receiver to accept messages from the host, and finally
# start child_main() on the host to pump messages into the receiver.
for hostname in argv:
print 'Starting on', hostname
print('Starting on', hostname)
host = Host()
host.name = hostname

@ -14,4 +14,4 @@ mitogen.utils.log_to_file()
router, parent = mitogen.unix.connect('/tmp/mitosock')
with router:
print mitogen.service.call(parent, CONNECT_BY_ID, {})
print(mitogen.service.call(parent, CONNECT_BY_ID, {}))

@ -92,7 +92,7 @@ def main(log_level='INFO', profiling=False):
@mitogen.main()
def main(router):
z = router.ssh(hostname='k3')
print z.call(get_url, 'https://example.org/')
print(z.call(get_url, 'https://example.org/')))))
"""

@ -1,117 +0,0 @@
"""Selected backports from Python stdlib collections module
"""
__all__ = [
'namedtuple',
]
from operator import itemgetter as _itemgetter
from keyword import iskeyword as _iskeyword
import sys as _sys
try:
all([])
except NameError:
def all(iterable):
for element in iterable:
if not element:
return False
return True
def namedtuple(typename, field_names, verbose=False):
"""Returns a new subclass of tuple with named fields.
>>> Point = namedtuple('Point', 'x y')
>>> Point.__doc__ # docstring for the new class
'Point(x, y)'
>>> p = Point(11, y=22) # instantiate with positional args or keywords
>>> p[0] + p[1] # indexable like a plain tuple
33
>>> x, y = p # unpack like a regular tuple
>>> x, y
(11, 22)
>>> p.x + p.y # fields also accessable by name
33
>>> d = p._asdict() # convert to a dictionary
>>> d['x']
11
>>> Point(**d) # convert from a dictionary
Point(x=11, y=22)
>>> p._replace(x=100) # _replace() is like str.replace() but targets named fields
Point(x=100, y=22)
"""
# Parse and validate the field names. Validation serves two purposes,
# generating informative error messages and preventing template injection attacks.
if isinstance(field_names, basestring):
field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas
field_names = tuple(map(str, field_names))
for name in (typename,) + field_names:
if not all(c.isalnum() or c=='_' for c in name):
raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name)
if _iskeyword(name):
raise ValueError('Type names and field names cannot be a keyword: %r' % name)
if name[0].isdigit():
raise ValueError('Type names and field names cannot start with a number: %r' % name)
seen_names = set()
for name in field_names:
if name.startswith('_'):
raise ValueError('Field names cannot start with an underscore: %r' % name)
if name in seen_names:
raise ValueError('Encountered duplicate field name: %r' % name)
seen_names.add(name)
# Create and fill-in the class template
numfields = len(field_names)
argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes
reprtxt = ', '.join('%s=%%r' % name for name in field_names)
dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names))
template = '''class %(typename)s(tuple):
'%(typename)s(%(argtxt)s)' \n
__slots__ = () \n
_fields = %(field_names)r \n
def __new__(_cls, %(argtxt)s):
return _tuple.__new__(_cls, (%(argtxt)s)) \n
@classmethod
def _make(cls, iterable, new=tuple.__new__, len=len):
'Make a new %(typename)s object from a sequence or iterable'
result = new(cls, iterable)
if len(result) != %(numfields)d:
raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result))
return result \n
def __repr__(self):
return '%(typename)s(%(reprtxt)s)' %% self \n
def _asdict(t):
'Return a new dict which maps field names to their values'
return {%(dicttxt)s} \n
def _replace(_self, **kwds):
'Return a new %(typename)s object replacing specified fields with new values'
result = _self._make(map(kwds.pop, %(field_names)r, _self))
if kwds:
raise ValueError('Got unexpected field names: %%r' %% kwds.keys())
return result \n
def __getnewargs__(self):
return tuple(self) \n\n''' % locals()
for i, name in enumerate(field_names):
template += ' %s = _property(_itemgetter(%d))\n' % (name, i)
if verbose:
print template
# Execute the template string in a temporary namespace and
# support tracing utilities by setting a value for frame.f_globals['__name__']
namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
_property=property, _tuple=tuple)
try:
exec template in namespace
except SyntaxError, e:
raise SyntaxError(e.message + ':\n' + template)
result = namespace[typename]
# For pickling to work, the __module__ variable needs to be set to the frame
# where the named tuple is created. Bypass this step in enviroments where
# sys._getframe is not defined (Jython for example).
if hasattr(_sys, '_getframe'):
result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__')
return result

@ -11,10 +11,6 @@ __all__ = [
'lru_cache',
]
try:
from collections import namedtuple
except ImportError:
from mitogen.compat.collections import namedtuple
from threading import RLock
@ -102,8 +98,6 @@ def partial(func, *args, **keywords):
### LRU Cache function decorator
################################################################################
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
class _HashedSeq(list):
""" This class guarantees that hash() will be called no more than once
per element. This is important because the lru_cache() will hash
@ -170,12 +164,12 @@ def lru_cache(maxsize=128, typed=False):
raise TypeError('Expected maxsize to be an integer or None')
def decorating_function(user_function):
wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
wrapper = _lru_cache_wrapper(user_function, maxsize, typed)
return update_wrapper(wrapper, user_function)
return decorating_function
def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
def _lru_cache_wrapper(user_function, maxsize, typed):
# Constants shared by all lru cache instances:
sentinel = object() # unique object used to signal cache misses
make_key = _make_key # build a key from the function arguments
@ -277,14 +271,6 @@ def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
lock.release()
return result
def cache_info():
"""Report cache statistics"""
lock.acquire()
try:
return _CacheInfo(hits, misses, maxsize, cache.__len__())
finally:
lock.release()
def cache_clear():
"""Clear the cache and cache statistics"""
lock.acquire()
@ -298,6 +284,5 @@ def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
finally:
lock.release()
wrapper.cache_info = cache_info
wrapper.cache_clear = cache_clear
return wrapper

@ -33,7 +33,6 @@ from token import *
import token
__all__ = [x for x in dir(token) if not x.startswith("_")]
__all__ += ["COMMENT", "tokenize", "generate_tokens", "NL", "untokenize"]
del x
del token
COMMENT = N_TOKENS
@ -150,8 +149,8 @@ class StopTokenizing(Exception): pass
def printtoken(type, token, srow_scol, erow_ecol, line): # for testing
srow, scol = srow_scol
erow, ecol = erow_ecol
print "%d,%d-%d,%d:\t%s\t%s" % \
(srow, scol, erow, ecol, tok_name[type], repr(token))
print("%d,%d-%d,%d:\t%s\t%s" % \
(srow, scol, erow, ecol, tok_name[type], repr(token)))
def tokenize(readline, tokeneater=printtoken):
"""
@ -316,7 +315,7 @@ def generate_tokens(readline):
if contstr: # continued string
if not line:
raise TokenError, ("EOF in multi-line string", strstart)
raise TokenError("EOF in multi-line string", strstart)
endmatch = endprog.match(line)
if endmatch:
pos = end = endmatch.end(0)
@ -377,7 +376,7 @@ def generate_tokens(readline):
else: # continued statement
if not line:
raise TokenError, ("EOF in multi-line statement", (lnum, 0))
raise TokenError("EOF in multi-line statement", (lnum, 0))
continued = 0
while pos < max:

@ -33,6 +33,7 @@ bootstrap implementation sent to every new slave context.
"""
import collections
import encodings.latin_1
import errno
import fcntl
import imp
@ -54,9 +55,9 @@ import zlib
select = __import__('select')
try:
import cPickle
import cPickle as pickle
except ImportError:
import pickle as cPickle
import pickle
try:
from cStringIO import StringIO as BytesIO
@ -72,6 +73,8 @@ LOG = logging.getLogger('mitogen')
IOLOG = logging.getLogger('mitogen.io')
IOLOG.setLevel(logging.INFO)
LATIN1_CODEC = encodings.latin_1.Codec()
_v = False
_vv = False
@ -95,14 +98,24 @@ except NameError:
PY3 = sys.version_info > (3,)
if PY3:
b = lambda s: s.encode('latin-1')
b = str.encode
BytesType = bytes
UnicodeType = unicode
UnicodeType = str
FsPathTypes = (str,)
BufferType = lambda buf, start: memoryview(buf)[start:]
long = int
else:
b = str
BytesType = str
FsPathTypes = (str, unicode)
BufferType = buffer
UnicodeType = unicode
AnyTextType = (BytesType, UnicodeType)
if sys.version_info < (2, 5):
next = lambda it: it.next()
CHUNK_SIZE = 131072
_tls = threading.local()
@ -121,6 +134,8 @@ class Error(Exception):
def __init__(self, fmt=None, *args):
if args:
fmt %= args
if fmt and not isinstance(fmt, UnicodeType):
fmt = fmt.decode('utf-8')
Exception.__init__(self, fmt)
@ -140,13 +155,31 @@ class Secret(UnicodeType):
def __repr__(self):
return '[secret]'
def __str__(self):
return UnicodeType(self)
if not PY3:
# TODO: what is this needed for in 2.x?
def __str__(self):
return UnicodeType(self)
def __reduce__(self):
return (Secret, (UnicodeType(self),))
class Kwargs(dict):
if PY3:
def __init__(self, dct):
for k, v in dct.items():
if type(k) is bytes:
self[k.decode()] = v
else:
self[k] = v
def __repr__(self):
return 'Kwargs(%s)' % (dict.__repr__(self),)
def __reduce__(self):
return (Kwargs, (dict(self),))
class CallError(Error):
def __init__(self, fmt=None, *args):
if not isinstance(fmt, BaseException):
@ -162,11 +195,11 @@ class CallError(Error):
Error.__init__(self, fmt)
def __reduce__(self):
return (_unpickle_call_error, (self[0],))
return (_unpickle_call_error, (self.args[0],))
def _unpickle_call_error(s):
if not (type(s) is str and len(s) < 10000):
if not (type(s) is UnicodeType and len(s) < 10000):
raise TypeError('cannot unpickle CallError: bad input')
inst = CallError.__new__(CallError)
Exception.__init__(inst, s)
@ -186,6 +219,14 @@ class TimeoutError(Error):
pass
def to_text(o):
if isinstance(o, UnicodeType):
return UnicodeType(o)
if isinstance(o, BytesType):
return o.decode('utf-8')
return UnicodeType(o)
def has_parent_authority(msg, _stream=None):
return (msg.auth_id == mitogen.context_id or
msg.auth_id in mitogen.parent_ids)
@ -247,9 +288,9 @@ def io_op(func, *args):
except (select.error, OSError, IOError):
e = sys.exc_info()[1]
_vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
if e[0] == errno.EINTR:
if e.args[0] == errno.EINTR:
continue
if e[0] in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
if e.args[0] in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
return None, True
raise
@ -326,13 +367,25 @@ def import_module(modname):
return __import__(modname, None, None, [''])
if PY3:
# In 3.x Unpickler is a class exposing find_class as an overridable, but it
# cannot be overridden without subclassing.
class _Unpickler(pickle.Unpickler):
def find_class(self, module, func):
return self.find_global(module, func)
else:
# In 2.x Unpickler is a function exposing a writeable find_global
# attribute.
_Unpickler = pickle.Unpickler
class Message(object):
dst_id = None
src_id = None
auth_id = None
handle = None
reply_to = None
data = ''
data = b('')
_unpickled = object()
router = None
@ -342,7 +395,7 @@ class Message(object):
self.src_id = mitogen.context_id
self.auth_id = mitogen.context_id
vars(self).update(kwargs)
assert isinstance(self.data, str)
assert isinstance(self.data, BytesType)
def _unpickle_context(self, context_id, name):
return _unpickle_context(self.router, context_id, name)
@ -350,6 +403,10 @@ class Message(object):
def _unpickle_sender(self, context_id, dst_handle):
return _unpickle_sender(self.router, context_id, dst_handle)
def _unpickle_bytes(self, s, encoding):
s, n = LATIN1_CODEC.encode(s)
return s
def _find_global(self, module, func):
"""Return the class implementing `module_name.class_name` or raise
`StreamError` if the module is not whitelisted."""
@ -364,6 +421,12 @@ class Message(object):
return Blob
elif func == 'Secret':
return Secret
elif func == 'Kwargs':
return Kwargs
elif module == '_codecs' and func == 'encode':
return self._unpickle_bytes
elif module == '__builtin__' and func == 'bytes':
return BytesType
raise StreamError('cannot unpickle %r/%r', module, func)
@property
@ -378,10 +441,10 @@ class Message(object):
def pickled(cls, obj, **kwargs):
self = cls(**kwargs)
try:
self.data = cPickle.dumps(obj, protocol=2)
except cPickle.PicklingError:
self.data = pickle.dumps(obj, protocol=2)
except pickle.PicklingError:
e = sys.exc_info()[1]
self.data = cPickle.dumps(CallError(e), protocol=2)
self.data = pickle.dumps(CallError(e), protocol=2)
return self
def reply(self, msg, router=None, **kwargs):
@ -395,6 +458,11 @@ class Message(object):
else:
LOG.debug('Message.reply(): discarding due to zero handle: %r', msg)
if PY3:
UNPICKLER_KWARGS = {'encoding': 'bytes'}
else:
UNPICKLER_KWARGS = {}
def unpickle(self, throw=True, throw_dead=True):
"""Deserialize `data` into an object."""
_vv and IOLOG.debug('%r.unpickle()', self)
@ -404,12 +472,8 @@ class Message(object):
obj = self._unpickled
if obj is Message._unpickled:
fp = BytesIO(self.data)
unpickler = cPickle.Unpickler(fp)
try:
unpickler.find_global = self._find_global
except AttributeError:
unpickler.find_class = self._find_global
unpickler = _Unpickler(fp, **self.UNPICKLER_KWARGS)
unpickler.find_global = self._find_global
try:
# Must occur off the broker thread.
obj = unpickler.load()
@ -513,12 +577,10 @@ class Receiver(object):
def __iter__(self):
while True:
try:
msg = self.get()
msg.unpickle() # Cause .remote_msg to be thrown.
yield msg
except ChannelError:
msg = self.get(throw_dead=False)
if msg.is_dead:
return
yield msg
class Channel(Sender, Receiver):
@ -573,6 +635,8 @@ class Importer(object):
# but very unlikely to trigger a bug report.
'org',
]
if PY3:
self.blacklist += ['cStringIO']
# Presence of an entry in this map indicates in-flight GET_MODULE.
self._callbacks = {}
@ -619,22 +683,18 @@ class Importer(object):
return None
_tls.running = True
# TODO: hack: this is papering over a bug elsewhere.
fullname = fullname.rstrip('.')
try:
_v and LOG.debug('%r.find_module(%r)', self, fullname)
pkgname, dot, _ = fullname.rpartition('.')
_vv and IOLOG.debug('%r.find_module(%r)', self, fullname)
suffix = fullname[len(pkgname+dot):]
if suffix not in self._present.get(pkgname, (suffix,)):
_v and LOG.debug('%r: master doesn\'t know %r', self, fullname)
pkg = sys.modules.get(pkgname)
if pkgname and getattr(pkg, '__loader__', None) is not self:
LOG.debug('%r: %r is submodule of a package we did not load',
self, fullname)
return None
pkg = sys.modules.get(pkgname)
if pkg and getattr(pkg, '__loader__', None) is not self:
_vv and IOLOG.debug(
'%r: %r is submodule of a package we did not load',
self, fullname
)
suffix = fullname[len(pkgname+dot):]
if pkgname and suffix not in self._present.get(pkgname, ()):
LOG.debug('%r: master doesn\'t know %r', self, fullname)
return None
# #114: explicitly whitelisted prefixes override any
@ -707,7 +767,9 @@ class Importer(object):
else:
_v and LOG.debug('_request_module(%r): new request', fullname)
self._callbacks[fullname] = [callback]
self._context.send(Message(data=fullname, handle=GET_MODULE))
self._context.send(
Message(data=b(fullname), handle=GET_MODULE)
)
finally:
self._lock.release()
@ -715,9 +777,8 @@ class Importer(object):
callback()
def load_module(self, fullname):
fullname = to_text(fullname)
_v and LOG.debug('Importer.load_module(%r)', fullname)
# TODO: hack: this is papering over a bug elsewhere.
fullname = fullname.rstrip('.')
self._refuse_imports(fullname)
event = threading.Event()
@ -739,14 +800,12 @@ class Importer(object):
else:
mod.__package__ = fullname.rpartition('.')[0] or None
# TODO: monster hack: work around modules now being imported as their
# actual name, so when Ansible "apt.py" tries to "import apt", it gets
# itself. Instead force absolute imports during compilation.
flags = 0
if fullname.startswith('ansible'):
flags = 0x4000
if mod.__package__ and not PY3:
# 2.x requires __package__ to be exactly a string.
mod.__package__ = mod.__package__.encode()
source = self.get_source(fullname)
code = compile(source, mod.__file__, 'exec', flags, True)
code = compile(source, mod.__file__, 'exec', 0, 1)
if PY3:
exec(code, vars(mod))
else:
@ -755,11 +814,14 @@ class Importer(object):
def get_filename(self, fullname):
if fullname in self._cache:
return 'master:' + self._cache[fullname][2]
return u'master:' + self._cache[fullname][2]
def get_source(self, fullname):
if fullname in self._cache:
return zlib.decompress(self._cache[fullname][3])
source = zlib.decompress(self._cache[fullname][3])
if PY3:
return to_text(source)
return source
class LogHandler(logging.Handler):
@ -777,7 +839,7 @@ class LogHandler(logging.Handler):
try:
msg = self.format(rec)
encoded = '%s\x00%s\x00%s' % (rec.name, rec.levelno, msg)
if isinstance(encoded, unicode):
if isinstance(encoded, UnicodeType):
# Logging package emits both :(
encoded = encoded.encode('utf-8')
self.context.send(Message(data=encoded, handle=FORWARD_LOG))
@ -809,17 +871,23 @@ class Side(object):
def close(self):
if not self.closed:
_vv and IOLOG.debug('%r.close()', self)
os.close(self.fd)
self.closed = True
os.close(self.fd)
def read(self, n=CHUNK_SIZE):
if self.closed:
# Refuse to touch the handle after closed, it may have been reused
# by another thread. TODO: synchronize read()/write()/close().
return b('')
s, disconnected = io_op(os.read, self.fd, n)
if disconnected:
return ''
return b('')
return s
def write(self, s):
if self.fd is None:
if self.closed or self.fd is None:
# Refuse to touch the handle after closed, it may have been reused
# by another thread.
return None
written, disconnected = io_op(os.write, self.fd, s)
@ -865,7 +933,7 @@ class Stream(BasicStream):
def __init__(self, router, remote_id, **kwargs):
self._router = router
self.remote_id = remote_id
self.name = 'default'
self.name = u'default'
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.construct(**kwargs)
self._input_buf = collections.deque()
@ -935,7 +1003,7 @@ class Stream(BasicStream):
prev_start = start
start = 0
msg.data = ''.join(bits)
msg.data = b('').join(bits)
self._input_buf.appendleft(buf[prev_start+len(bit):])
self._input_buf_len -= total_len
self._router._async_route(msg, self)
@ -956,7 +1024,7 @@ class Stream(BasicStream):
self.on_disconnect(broker)
return
elif written != len(buf):
self._output_buf.appendleft(buffer(buf, written))
self._output_buf.appendleft(BufferType(buf, written))
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
self._output_buf_len -= written
@ -1002,7 +1070,10 @@ class Context(object):
self.name = name
def __reduce__(self):
return _unpickle_context, (self.context_id, self.name)
name = self.name
if name and not isinstance(name, UnicodeType):
name = UnicodeType(name, 'utf-8')
return _unpickle_context, (self.context_id, name)
def on_disconnect(self):
_v and LOG.debug('%r.on_disconnect()', self)
@ -1023,9 +1094,11 @@ class Context(object):
def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('%r.call_service_async(%r, %r, %r)',
self, service_name, method_name, kwargs)
if not isinstance(service_name, basestring):
if isinstance(service_name, BytesType):
service_name = service_name.encode('utf-8')
elif not isinstance(service_name, UnicodeType):
service_name = service_name.name() # Service.name()
tup = (service_name, method_name, kwargs)
tup = (service_name, to_text(method_name), Kwargs(kwargs))
msg = Message.pickled(tup, handle=CALL_SERVICE)
return self.send_async(msg)
@ -1055,7 +1128,7 @@ def _unpickle_context(router, context_id, name):
if not (isinstance(router, Router) and
isinstance(context_id, (int, long)) and context_id >= 0 and (
(name is None) or
(isinstance(name, basestring) and len(name) < 100))
(isinstance(name, UnicodeType) and len(name) < 100))
):
raise TypeError('cannot unpickle Context: bad input')
return router.context_class(router, context_id, name)
@ -1110,48 +1183,133 @@ class Poller(object):
class Latch(object):
"""
A latch is a :py:class:`Queue.Queue`-like object that supports mutation and
waiting from multiple threads, however unlike :py:class:`Queue.Queue`,
waiting threads always remain interruptible, so CTRL+C always succeeds, and
waits where a timeout is set experience no wake up latency. These
properties are not possible in combination using the built-in threading
primitives available in Python 2.x.
Latches implement queues using the UNIX self-pipe trick, and a per-thread
:py:func:`socket.socketpair` that is lazily created the first time any
latch attempts to sleep on a thread, and dynamically associated with the
waiting Latch only for duration of the wait.
See :ref:`waking-sleeping-threads` for further discussion.
"""
poller_class = Poller
closed = False
_waking = 0
_sockets = []
_allsockets = []
# The _cls_ prefixes here are to make it crystal clear in the code which
# state mutation isn't covered by :attr:`_lock`.
#: List of reusable :func:`socket.socketpair` tuples. The list is from
#: multiple threads, the only safe operations are `append()` and `pop()`.
_cls_idle_socketpairs = []
#: List of every socket object that must be closed by :meth:`_on_fork`.
#: Inherited descriptors cannot be reused, as the duplicated handles
#: reference the same underlying kernel-side sockets still in use by
#: the parent process.
_cls_all_sockets = []
def __init__(self):
self.closed = False
self._lock = threading.Lock()
#: List of unconsumed enqueued items.
self._queue = []
#: List of `(wsock, cookie)` awaiting an element, where `wsock` is the
#: socketpair's write side, and `cookie` is the string to write.
self._sleeping = []
#: Number of elements of :attr:`_sleeping` that have already been
#: woken, and have a corresponding element index from :attr:`_queue`
#: assigned to them.
self._waking = 0
@classmethod
def _on_fork(cls):
cls._sockets = []
while cls._allsockets:
cls._allsockets.pop().close()
"""
Clean up any files belonging to the parent process after a fork.
"""
cls._cls_idle_socketpairs = []
while cls._cls_all_sockets:
cls._cls_all_sockets.pop().close()
def close(self):
"""
Mark the latch as closed, and cause every sleeping thread to be woken,
with :py:class:`mitogen.core.LatchError` raised in each thread.
"""
self._lock.acquire()
try:
self.closed = True
while self._waking < len(self._sleeping):
self._wake(self._sleeping[self._waking])
wsock, cookie = self._sleeping[self._waking]
self._wake(wsock, cookie)
self._waking += 1
finally:
self._lock.release()
def empty(self):
"""
Return :py:data:`True` if calling :py:meth:`get` would block.
As with :py:class:`Queue.Queue`, :py:data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and
:py:meth:`get`.
As with :py:class:`Queue.Queue`, :py:data:`False` may be returned even
though a subsequent call to :py:meth:`get` will block, since another
waiting thread may be woken at any moment between :py:meth:`empty` and
:py:meth:`get`.
"""
return len(self._queue) == 0
def _tls_init(self):
# pop() must be atomic, which is true for GIL-equipped interpreters.
def _get_socketpair(self):
"""
Return an unused socketpair, creating one if none exist.
"""
try:
return self._sockets.pop()
return self._cls_idle_socketpairs.pop() # pop() must be atomic
except IndexError:
rsock, wsock = socket.socketpair()
set_cloexec(rsock.fileno())
set_cloexec(wsock.fileno())
self._allsockets.extend((rsock, wsock))
self._cls_all_sockets.extend((rsock, wsock))
return rsock, wsock
COOKIE_SIZE = 33
def _make_cookie(self):
"""
Return a 33-byte string encoding the ID of the instance and the current
thread. This disambiguates legitimate wake-ups, accidental writes to
the FD, and buggy internal FD sharing.
"""
ident = threading.currentThread().ident
return b(u'%016x-%016x' % (int(id(self)), ident))
def get(self, timeout=None, block=True):
"""
Return the next enqueued object, or sleep waiting for one.
:param float timeout:
If not :py:data:`None`, specifies a timeout in seconds.
:param bool block:
If :py:data:`False`, immediately raise
:py:class:`mitogen.core.TimeoutError` if the latch is empty.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the object is no longer valid.
:raises mitogen.core.TimeoutError:
Timeout was reached.
:returns:
The de-queued object.
"""
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)',
self, timeout, block)
self._lock.acquire()
@ -1164,39 +1322,54 @@ class Latch(object):
return self._queue.pop(i)
if not block:
raise TimeoutError()
rsock, wsock = self._tls_init()
self._sleeping.append(wsock)
rsock, wsock = self._get_socketpair()
cookie = self._make_cookie()
self._sleeping.append((wsock, cookie))
finally:
self._lock.release()
poller = self.poller_class()
poller.start_receive(rsock.fileno())
try:
return self._get_sleep(poller, timeout, block, rsock, wsock)
return self._get_sleep(poller, timeout, block, rsock, wsock, cookie)
finally:
poller.close()
def _get_sleep(self, poller, timeout, block, rsock, wsock):
_vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)',
self, timeout, block)
def _get_sleep(self, poller, timeout, block, rsock, wsock, cookie):
"""
When a result is not immediately available, sleep waiting for
:meth:`put` to write a byte to our socket pair.
"""
_vv and IOLOG.debug(
'%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)',
self, timeout, block, rsock.fileno(), wsock.fileno()
)
e = None
woken = None
try:
list(poller.poll(timeout))
woken = list(poller.poll(timeout))
except Exception:
e = sys.exc_info()[1]
self._lock.acquire()
try:
i = self._sleeping.index(wsock)
i = self._sleeping.index((wsock, cookie))
del self._sleeping[i]
self._sockets.append((rsock, wsock))
if i >= self._waking:
if not woken:
raise e or TimeoutError()
got_cookie = rsock.recv(self.COOKIE_SIZE)
self._cls_idle_socketpairs.append((rsock, wsock))
assert cookie == got_cookie, (
"Cookie incorrect; got %r, expected %r" \
% (got_cookie, cookie)
)
assert i < self._waking, (
"Cookie correct, but no queue element assigned."
)
self._waking -= 1
if rsock.recv(2) != '\x7f':
raise LatchError('internal error: received >1 wakeups')
if e:
raise e
if self.closed:
raise LatchError()
_vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[i])
@ -1205,6 +1378,13 @@ class Latch(object):
self._lock.release()
def put(self, obj):
"""
Enqueue an object, waking the first thread waiting for a result, if one
exists.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the object is no longer valid.
"""
_vv and IOLOG.debug('%r.put(%r)', self, obj)
self._lock.acquire()
try:
@ -1213,20 +1393,20 @@ class Latch(object):
self._queue.append(obj)
if self._waking < len(self._sleeping):
sock = self._sleeping[self._waking]
wsock, cookie = self._sleeping[self._waking]
self._waking += 1
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, sock.fileno())
self._wake(sock)
self, wsock.fileno())
self._wake(wsock, cookie)
finally:
self._lock.release()
def _wake(self, sock):
def _wake(self, wsock, cookie):
try:
os.write(sock.fileno(), '\x7f')
os.write(wsock.fileno(), cookie)
except OSError:
e = sys.exc_info()[1]
if e[0] != errno.EBADF:
if e.args[0] != errno.EBADF:
raise
def __repr__(self):
@ -1319,7 +1499,7 @@ class Waker(BasicStream):
self.transmit_side.write(b(' '))
except OSError:
e = sys.exc_info()[1]
if e[0] != errno.EBADF:
if e.args[0] != errno.EBADF:
raise
@ -1363,7 +1543,7 @@ class IoLogger(BasicStream):
if not buf:
return self.on_disconnect(broker)
self._buf += buf
self._buf += buf.decode('latin1')
self._log_lines()
@ -1420,7 +1600,7 @@ class Router(object):
def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None):
handle = handle or self._last_handle.next()
handle = handle or next(self._last_handle)
_vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
if respondent:
@ -1761,12 +1941,11 @@ class ExternalContext(object):
else:
core_src_fd = self.config.get('core_src_fd', 101)
if core_src_fd:
fp = os.fdopen(core_src_fd, 'r', 1)
fp = os.fdopen(core_src_fd, 'rb', 1)
try:
core_size = int(fp.readline())
core_src = fp.read(core_size)
core_src = fp.read()
# Strip "ExternalContext.main()" call from last line.
core_src = '\n'.join(core_src.splitlines()[:-1])
core_src = b('\n').join(core_src.splitlines()[:-1])
finally:
fp.close()
else:

@ -59,7 +59,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'docker.' + (self.container or self.image)
self.name = u'docker.' + (self.container or self.image)
def get_boot_command(self):
args = ['--interactive']

@ -299,7 +299,7 @@ def exit():
def die(msg, *args):
if args:
msg %= args
print msg
sys.stderr.write('%s\n' % (msg,))
exit()
@ -419,12 +419,12 @@ def run(dest, router, args, deadline=None, econtext=None):
context_id = router.allocate_id()
fakessh = mitogen.parent.Context(router, context_id)
fakessh.name = 'fakessh.%d' % (context_id,)
fakessh.name = u'fakessh.%d' % (context_id,)
sock1, sock2 = socket.socketpair()
stream = mitogen.core.Stream(router, context_id)
stream.name = 'fakessh'
stream.name = u'fakessh'
stream.accept(sock1.fileno(), sock1.fileno())
router.register(fakessh, stream)
@ -445,7 +445,7 @@ def run(dest, router, args, deadline=None, econtext=None):
finally:
fp.close()
os.chmod(ssh_path, 0755)
os.chmod(ssh_path, int('0755', 8))
env = os.environ.copy()
env.update({
'PATH': '%s:%s' % (tmp_path, env.get('PATH', '')),

@ -45,7 +45,7 @@ def fixup_prngs():
Add 256 bits of /dev/urandom to OpenSSL's PRNG in the child, and re-seed
the random package with the same data.
"""
s = os.urandom(256 / 8)
s = os.urandom(256 // 8)
random.seed(s)
if 'ssl' in sys.modules:
sys.modules['ssl'].RAND_add(s, 75.0)
@ -112,7 +112,7 @@ class Stream(mitogen.parent.Stream):
if isinstance(responder, mitogen.parent.ModuleForwarder):
self.importer = responder.importer
name_prefix = 'fork'
name_prefix = u'fork'
def start_child(self):
parentfp, childfp = mitogen.parent.create_socketpair()

@ -54,7 +54,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'jail.' + self.container
self.name = u'jail.' + self.container
def get_boot_command(self):
bits = [self.jexec_path]

@ -56,7 +56,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'lxc.' + self.container
self.name = u'lxc.' + self.container
def get_boot_command(self):
bits = [

@ -56,8 +56,14 @@ import mitogen
import mitogen.core
import mitogen.minify
import mitogen.parent
from mitogen.core import IOLOG
from mitogen.core import b
from mitogen.core import to_text
from mitogen.core import LOG
from mitogen.core import IOLOG
imap = getattr(itertools, 'imap', map)
izip = getattr(itertools, 'izip', zip)
RLOG = logging.getLogger('mitogen.ctx')
@ -79,7 +85,7 @@ def _stdlib_paths():
def get_child_modules(path):
it = pkgutil.iter_modules([os.path.dirname(path)])
return [name for _, name, _ in it]
return [to_text(name) for _, name, _ in it]
def get_core_source():
@ -99,6 +105,33 @@ LOAD_CONST = dis.opname.index('LOAD_CONST')
IMPORT_NAME = dis.opname.index('IMPORT_NAME')
if sys.version_info < (3, 0):
def iter_opcodes(co):
# Yield `(op, oparg)` tuples from the code object `co`.
ordit = imap(ord, co.co_code)
nextb = ordit.next
return ((c, (None
if c < dis.HAVE_ARGUMENT else
(nextb() | (nextb() << 8))))
for c in ordit)
elif sys.version_info < (3, 6):
def iter_opcodes(co):
# Yield `(op, oparg)` tuples from the code object `co`.
ordit = iter(co.co_code)
nextb = ordit.__next__
return ((c, (None
if c < dis.HAVE_ARGUMENT else
(nextb() | (nextb() << 8))))
for c in ordit)
else:
def iter_opcodes(co):
# Yield `(op, oparg)` tuples from the code object `co`.
ordit = iter(co.co_code)
nextb = ordit.__next__
# https://github.com/abarnert/cpython/blob/c095a32f/Python/wordcode.md
return ((c, nextb()) for c in ordit)
def scan_code_imports(co):
"""Given a code object `co`, scan its bytecode yielding any
``IMPORT_NAME`` and associated prior ``LOAD_CONST`` instructions
@ -114,15 +147,7 @@ def scan_code_imports(co):
* `namelist`: for `ImportFrom`, the list of names to be imported from
`modname`.
"""
# Yield `(op, oparg)` tuples from the code object `co`.
ordit = itertools.imap(ord, co.co_code)
nextb = ordit.next
opit = ((c, (None
if c < dis.HAVE_ARGUMENT else
(nextb() | (nextb() << 8))))
for c in ordit)
opit = iter_opcodes(co)
opit, opit2, opit3 = itertools.tee(opit, 3)
try:
next(opit2)
@ -131,7 +156,7 @@ def scan_code_imports(co):
except StopIteration:
return
for oparg1, oparg2, (op3, arg3) in itertools.izip(opit, opit2, opit3):
for oparg1, oparg2, (op3, arg3) in izip(opit, opit2, opit3):
if op3 == IMPORT_NAME:
op2, arg2 = oparg2
op1, arg1 = oparg1
@ -227,7 +252,7 @@ class LogForwarder(object):
name = '%s.%s' % (RLOG.name, context.name)
self._cache[msg.src_id] = logger = logging.getLogger(name)
name, level_s, s = msg.data.split('\x00', 2)
name, level_s, s = msg.data.decode('latin1').split('\x00', 2)
logger.log(int(level_s), '%s: %s', name, s, extra={
'mitogen_message': s,
'mitogen_context': self._router.context_by_id(msg.src_id),
@ -283,9 +308,10 @@ class ModuleFinder(object):
resembles a Python script. For now we simply verify the file contains
ASCII text.
"""
fp = open(path, 'r')
fp = open(path, 'rb')
try:
return not set(fp.read(512)).difference(string.printable)
sample = fp.read(512).decode('latin-1')
return not set(sample).difference(string.printable)
finally:
fp.close()
@ -305,9 +331,18 @@ class ModuleFinder(object):
def _get_module_via_pkgutil(self, fullname):
"""Attempt to fetch source code via pkgutil. In an ideal world, this
would be the only required implementation of get_module()."""
loader = pkgutil.find_loader(fullname)
IOLOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r',
fullname, loader)
try:
# Pre-'import spec' this returned None, in Python3.6 it raises
# ImportError.
loader = pkgutil.find_loader(fullname)
except ImportError:
e = sys.exc_info()[1]
LOG.debug('%r._get_module_via_pkgutil(%r): %s',
self, fullname, e)
return None
IOLOG.debug('%r._get_module_via_pkgutil(%r) -> %r',
self, fullname, loader)
if not loader:
return
@ -315,16 +350,34 @@ class ModuleFinder(object):
path = self._py_filename(loader.get_filename(fullname))
source = loader.get_source(fullname)
is_pkg = loader.is_package(fullname)
except AttributeError:
except (AttributeError, ImportError):
# - Per PEP-302, get_source() and is_package() are optional,
# calling them may throw AttributeError.
# - get_filename() may throw ImportError if pkgutil.find_loader()
# picks a "parent" package's loader for some crap that's been
# stuffed in sys.modules, for example in the case of urllib3:
# "loader for urllib3.contrib.pyopenssl cannot handle
# requests.packages.urllib3.contrib.pyopenssl"
e = sys.exc_info()[1]
LOG.debug('%r: loading %r using %r failed: %s',
self, fullname, loader)
return
if path is not None and source is not None:
return path, source, is_pkg
if path is None or source is None:
return
if isinstance(source, mitogen.core.UnicodeType):
# get_source() returns "string" according to PEP-302, which was
# reinterpreted for Python 3 to mean a Unicode string.
source = source.encode('utf-8')
return path, source, is_pkg
def _get_module_via_sys_modules(self, fullname):
"""Attempt to fetch source code via sys.modules. This is specifically
to support __main__, but it may catch a few more cases."""
module = sys.modules.get(fullname)
LOG.debug('_get_module_via_sys_modules(%r) -> %r', fullname, module)
if not isinstance(module, types.ModuleType):
LOG.debug('sys.modules[%r] absent or not a regular module',
fullname)
@ -344,6 +397,11 @@ class ModuleFinder(object):
raise
source = '\n'
if isinstance(source, mitogen.core.UnicodeType):
# get_source() returns "string" according to PEP-302, which was
# reinterpreted for Python 3 to mean a Unicode string.
source = source.encode('utf-8')
return path, source, is_pkg
get_module_methods = [_get_module_via_pkgutil,
@ -388,7 +446,7 @@ class ModuleFinder(object):
# This would be an ImportError in real code.
return ''
return '.'.join(bits[:-level])
return '.'.join(bits[:-level]) + '.'
def generate_parent_names(self, fullname):
while '.' in fullname:
@ -423,7 +481,7 @@ class ModuleFinder(object):
modnames = [modname, '%s.%s' % (fullname, modname)]
else:
modnames = [
'%s.%s' % (self.resolve_relpath(fullname, level), modname)
'%s%s' % (self.resolve_relpath(fullname, level), modname)
]
maybe_names.extend(modnames)
@ -439,7 +497,7 @@ class ModuleFinder(object):
for name in maybe_names
if sys.modules.get(name) is not None
and not is_stdlib_name(name)
and 'six.moves' not in name # TODO: crap
and u'six.moves' not in name # TODO: crap
)
))
@ -483,7 +541,7 @@ class ModuleResponder(object):
def __repr__(self):
return 'ModuleResponder(%r)' % (self._router,)
MAIN_RE = re.compile(r'^if\s+__name__\s*==\s*.__main__.\s*:', re.M)
MAIN_RE = re.compile(b(r'^if\s+__name__\s*==\s*.__main__.\s*:'), re.M)
def whitelist_prefix(self, fullname):
if self.whitelist == ['']:
@ -502,6 +560,9 @@ class ModuleResponder(object):
return src[:match.start()]
return src
def _make_negative_response(self, fullname):
return (fullname, None, None, None, ())
def _build_tuple(self, fullname):
if mitogen.core.is_blacklisted_import(self, fullname):
raise ImportError('blacklisted')
@ -512,13 +573,10 @@ class ModuleResponder(object):
path, source, is_pkg = self._finder.get_module_source(fullname)
if source is None:
LOG.error('_build_tuple(%r): could not locate source', fullname)
tup = fullname, None, None, None, ()
tup = self._make_negative_response(fullname)
self._cache[fullname] = tup
return tup
if source is None:
raise ImportError('could not find %r' % (fullname,))
if is_pkg:
pkg_present = get_child_modules(path)
LOG.debug('_build_tuple(%r, %r) -> %r',
@ -528,14 +586,20 @@ class ModuleResponder(object):
if fullname == '__main__':
source = self.neutralize_main(source)
compressed = zlib.compress(source, 9)
compressed = mitogen.core.Blob(zlib.compress(source, 9))
related = [
name
to_text(name)
for name in self._finder.find_related(fullname)
if not mitogen.core.is_blacklisted_import(self, name)
]
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related
tup = fullname, pkg_present, path, compressed, related
tup = (
to_text(fullname),
pkg_present,
to_text(path),
compressed,
related
)
self._cache[fullname] = tup
return tup
@ -563,6 +627,14 @@ class ModuleResponder(object):
def _send_module_and_related(self, stream, fullname):
try:
tup = self._build_tuple(fullname)
if tup[2] and is_stdlib_path(tup[2]):
# Prevent loading of 2.x<->3.x stdlib modules! This costs one
# RTT per hit, so a client-side solution is also required.
LOG.warning('%r: refusing to serve stdlib module %r',
self, fullname)
self._send_module_load_failed(stream, fullname)
return
for name in tup[4]: # related
parent, _, _ = name.partition('.')
if parent != fullname and parent not in stream.sent_modules:
@ -581,7 +653,7 @@ class ModuleResponder(object):
LOG.debug('%r._on_get_module(%r)', self, msg.data)
stream = self._router.stream_by_id(msg.src_id)
fullname = msg.data
fullname = msg.data.decode()
if fullname in stream.sent_modules:
LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream)
@ -592,7 +664,7 @@ class ModuleResponder(object):
if stream.remote_id != context.context_id:
stream.send(
mitogen.core.Message(
data='%s\x00%s' % (context.context_id, fullname),
data=b('%s\x00%s' % (context.context_id, fullname)),
handle=mitogen.core.FORWARD_MODULE,
dst_id=stream.remote_id,
)

@ -28,11 +28,12 @@
import sys
try:
from cStringIO import StringIO as BytesIO
from io import StringIO
except ImportError:
from io import BytesIO
from StringIO import StringIO
import mitogen.core
if sys.version_info < (2, 7, 11):
from mitogen.compat import tokenize
@ -49,7 +50,9 @@ except ImportError:
def minimize_source(source):
"""Remove most comments and docstrings from Python source code.
"""
tokens = tokenize.generate_tokens(BytesIO(source).readline)
if not isinstance(source, mitogen.core.UnicodeType):
source = source.decode('utf-8')
tokens = tokenize.generate_tokens(StringIO(source).readline)
tokens = strip_comments(tokens)
tokens = strip_docstrings(tokens)
tokens = reindent(tokens)

@ -32,6 +32,7 @@ sent to any child context that is due to become a parent, due to recursive
connection.
"""
import codecs
import errno
import fcntl
import getpass
@ -52,11 +53,25 @@ import zlib
# Absolute imports for <2.5.
select = __import__('select')
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
try:
from functools import lru_cache
except ImportError:
from mitogen.compat.functools import lru_cache
import mitogen.core
from mitogen.core import b
from mitogen.core import LOG
from mitogen.core import IOLOG
if mitogen.core.PY3:
xrange = range
try:
SC_OPEN_MAX = os.sysconf('SC_OPEN_MAX')
except:
@ -281,9 +296,13 @@ def write_all(fd, s, deadline=None):
if timeout == 0:
raise mitogen.core.TimeoutError('write timed out')
if mitogen.core.PY3:
window = memoryview(s)[written:]
else:
window = buffer(s, written)
for fd in poller.poll(timeout):
n, disconnected = mitogen.core.io_op(
os.write, fd, buffer(s, written))
n, disconnected = mitogen.core.io_op(os.write, fd, window)
if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
@ -320,8 +339,8 @@ def iter_read(fds, deadline=None):
if not poller.readers:
raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],)
u'EOF on stream; last 300 bytes received: %r' %
(b('').join(bits)[-300:].decode('latin1'),)
)
raise mitogen.core.TimeoutError('read timed out')
@ -380,14 +399,18 @@ def upgrade_router(econtext):
def make_call_msg(fn, *args, **kwargs):
if isinstance(fn, types.MethodType) and \
isinstance(fn.im_self, (type, types.ClassType)):
klass = fn.im_self.__name__
klass = mitogen.core.to_text(fn.im_self.__name__)
else:
klass = None
return mitogen.core.Message.pickled(
(fn.__module__, klass, fn.__name__, args, kwargs),
handle=mitogen.core.CALL_FUNCTION,
tup = (
mitogen.core.to_text(fn.__module__),
klass,
mitogen.core.to_text(fn.__name__),
args,
mitogen.core.Kwargs(kwargs)
)
return mitogen.core.Message.pickled(tup, handle=mitogen.core.CALL_FUNCTION)
def stream_by_method_name(name):
@ -395,9 +418,9 @@ def stream_by_method_name(name):
Given the name of a Mitogen connection method, import its implementation
module and return its Stream subclass.
"""
if name == 'local':
name = 'parent'
module = mitogen.core.import_module('mitogen.' + name)
if name == u'local':
name = u'parent'
module = mitogen.core.import_module(u'mitogen.' + name)
return module.Stream
@ -412,22 +435,25 @@ def _proxy_connect(name, method_name, kwargs, econtext):
)
except mitogen.core.StreamError:
return {
'id': None,
'name': None,
'msg': 'error occurred on host %s: %s' % (
u'id': None,
u'name': None,
u'msg': 'error occurred on host %s: %s' % (
socket.gethostname(),
sys.exc_info()[1],
),
}
return {
'id': context.context_id,
'name': context.name,
'msg': None,
u'id': context.context_id,
u'name': context.name,
u'msg': None,
}
class Argv(object):
"""
Wrapper to defer argv formatting when debug logging is disabled.
"""
def __init__(self, argv):
self.argv = argv
@ -444,6 +470,38 @@ class Argv(object):
return ' '.join(map(self.escape, self.argv))
class CallSpec(object):
"""
Wrapper to defer call argument formatting when debug logging is disabled.
"""
def __init__(self, func, args, kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
def _get_name(self):
return u'%s.%s' % (self.func.__module__,
self.func.__name__)
def _get_args(self):
return u', '.join(repr(a) for a in self.args)
def _get_kwargs(self):
s = u''
if self.kwargs:
s = u', '.join('%s=%r' % (k, v) for k, v in self.kwargs.items())
if self.args:
s = u', ' + s
return s
def __repr__(self):
return '%s(%s%s)' % (
self._get_name(),
self._get_args(),
self._get_kwargs(),
)
class KqueuePoller(mitogen.core.Poller):
_repr = 'KqueuePoller()'
@ -636,7 +694,7 @@ class TtyLogStream(mitogen.core.BasicStream):
if not buf:
return self.on_disconnect(broker)
self.buf += buf
self.buf += buf.decode('utf-8', 'replace')
while '\n' in self.buf:
lines = self.buf.split('\n')
self.buf = lines[-1]
@ -649,7 +707,7 @@ class Stream(mitogen.core.Stream):
Base for streams capable of starting new slaves.
"""
#: The path to the remote Python interpreter.
python_path = 'python'
python_path = sys.executable
#: Maximum time to wait for a connection attempt.
connect_timeout = 30.0
@ -767,13 +825,20 @@ class Stream(mitogen.core.Stream):
# file descriptor 0 as 100, creates a pipe, then execs a new interpreter
# with a custom argv.
# * Optimized for minimum byte count after minification & compression.
# * 'CONTEXT_NAME', 'PREAMBLE_COMPRESSED_LEN', and 'PREAMBLE_LEN' are
# substituted with their respective values.
# * 'CONTEXT_NAME' and 'PREAMBLE_COMPRESSED_LEN' are substituted with
# their respective values.
# * CONTEXT_NAME must be prefixed with the name of the Python binary in
# order to allow virtualenvs to detect their install prefix.
# * For Darwin, OS X installs a craptacular argv0-introspecting Python
# version switcher as /usr/bin/python. Override attempts to call it
# with an explicit call to python2.7
#
# Locals:
# R: read side of interpreter stdin.
# W: write side of interpreter stdin.
# r: read side of core_src FD.
# w: write side of core_src FD.
# C: the decompressed core source.
@staticmethod
def _first_stage():
R,W=os.pipe()
@ -790,11 +855,15 @@ class Stream(mitogen.core.Stream):
sys.executable += sys.version[:3]
os.environ['ARGV0']=sys.executable
os.execl(sys.executable,sys.executable+'(mitogen:CONTEXT_NAME)')
os.write(1,'MITO000\n')
os.write(1,'MITO000\n'.encode())
C=_(os.fdopen(0,'rb').read(PREAMBLE_COMPRESSED_LEN),'zip')
os.fdopen(W,'w',0).write(C)
os.fdopen(w,'w',0).write('PREAMBLE_LEN\n'+C)
os.write(1,'MITO001\n')
fp=os.fdopen(W,'wb',0)
fp.write(C)
fp.close()
fp=os.fdopen(w,'wb',0)
fp.write(C)
fp.close()
os.write(1,'MITO001\n'.encode())
def get_boot_command(self):
source = inspect.getsource(self._first_stage)
@ -804,9 +873,8 @@ class Stream(mitogen.core.Stream):
preamble_compressed = self.get_preamble()
source = source.replace('PREAMBLE_COMPRESSED_LEN',
str(len(preamble_compressed)))
source = source.replace('PREAMBLE_LEN',
str(len(zlib.decompress(preamble_compressed))))
encoded = zlib.compress(source, 9).encode('base64').replace('\n', '')
compressed = zlib.compress(source.encode(), 9)
encoded = codecs.encode(compressed, 'base64').replace(b('\n'), b(''))
# We can't use bytes.decode() in 3.x since it was restricted to always
# return unicode, so codecs.decode() is used instead. In 3.x
# codecs.decode() requires a bytes object. Since we must be compatible
@ -815,7 +883,7 @@ class Stream(mitogen.core.Stream):
return [
self.python_path, '-c',
'import codecs,os,sys;_=codecs.decode;'
'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded,)
'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded.decode(),)
]
def get_econtext_config(self):
@ -840,11 +908,11 @@ class Stream(mitogen.core.Stream):
source += '\nExternalContext(%r).main()\n' % (
self.get_econtext_config(),
)
return zlib.compress(source, 9)
return zlib.compress(source.encode('utf-8'), 9)
create_child = staticmethod(create_child)
create_child_args = {}
name_prefix = 'local'
name_prefix = u'local'
def start_child(self):
args = self.get_boot_command()
@ -858,7 +926,7 @@ class Stream(mitogen.core.Stream):
def connect(self):
LOG.debug('%r.connect()', self)
self.pid, fd, extra_fd = self.start_child()
self.name = '%s.%s' % (self.name_prefix, self.pid)
self.name = u'%s.%s' % (self.name_prefix, self.pid)
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r',
@ -871,16 +939,18 @@ class Stream(mitogen.core.Stream):
raise
#: For ssh.py, this must be at least max(len('password'), len('debug1:'))
EC0_MARKER = 'MITO000\n'
EC1_MARKER = 'MITO001\n'
EC0_MARKER = mitogen.core.b('MITO000\n')
EC1_MARKER = mitogen.core.b('MITO001\n')
def _ec0_received(self):
LOG.debug('%r._ec0_received()', self)
write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, 'MITO001\n', self.connect_deadline)
discard_until(self.receive_side.fd, self.EC1_MARKER,
self.connect_deadline)
def _connect_bootstrap(self, extra_fd):
discard_until(self.receive_side.fd, 'MITO000\n', self.connect_deadline)
discard_until(self.receive_side.fd, self.EC0_MARKER,
self.connect_deadline)
self._ec0_received()
@ -919,8 +989,7 @@ class Context(mitogen.core.Context):
return hash((self.router, self.context_id))
def call_async(self, fn, *args, **kwargs):
LOG.debug('%r.call_async(%r, *%r, **%r)',
self, fn, args, kwargs)
LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs))
return self.send_async(make_call_msg(fn, *args, **kwargs))
def call(self, fn, *args, **kwargs):
@ -976,7 +1045,7 @@ class RouteMonitor(object):
self.parent.send(
mitogen.core.Message(
handle=handle,
data=data,
data=data.encode('utf-8'),
)
)
@ -986,7 +1055,8 @@ class RouteMonitor(object):
stream, we're also responsible for broadcasting DEL_ROUTE upstream
if/when that child disconnects.
"""
self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id, stream.name)
self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id,
stream.name)
mitogen.core.listen(
obj=stream,
name='disconnect',
@ -1011,7 +1081,8 @@ class RouteMonitor(object):
if msg.is_dead:
return
target_id_s, _, target_name = msg.data.partition(':')
target_id_s, _, target_name = msg.data.partition(b(':'))
target_name = target_name.decode()
target_id = int(target_id_s)
self.router.context_by_id(target_id).name = target_name
stream = self.router.stream_by_id(msg.auth_id)
@ -1124,7 +1195,7 @@ class Router(mitogen.core.Router):
self._context_by_id[context_id] = context
return context
connection_timeout_msg = "Connection timed out."
connection_timeout_msg = u"Connection timed out."
def _connect(self, klass, name=None, **kwargs):
context_id = self.allocate_id()
@ -1145,11 +1216,11 @@ class Router(mitogen.core.Router):
def connect(self, method_name, name=None, **kwargs):
klass = stream_by_method_name(method_name)
kwargs.setdefault('debug', self.debug)
kwargs.setdefault('profiling', self.profiling)
kwargs.setdefault('unidirectional', self.unidirectional)
kwargs.setdefault(u'debug', self.debug)
kwargs.setdefault(u'profiling', self.profiling)
kwargs.setdefault(u'unidirectional', self.unidirectional)
via = kwargs.pop('via', None)
via = kwargs.pop(u'via', None)
if via is not None:
return self.proxy_connect(via, method_name, name=name, **kwargs)
return self._connect(klass, name=name, **kwargs)
@ -1158,43 +1229,43 @@ class Router(mitogen.core.Router):
resp = via_context.call(_proxy_connect,
name=name,
method_name=method_name,
kwargs=kwargs
kwargs=mitogen.core.Kwargs(kwargs),
)
if resp['msg'] is not None:
raise mitogen.core.StreamError(resp['msg'])
name = '%s.%s' % (via_context.name, resp['name'])
name = u'%s.%s' % (via_context.name, resp['name'])
context = self.context_class(self, resp['id'], name=name)
context.via = via_context
self._context_by_id[context.context_id] = context
return context
def docker(self, **kwargs):
return self.connect('docker', **kwargs)
return self.connect(u'docker', **kwargs)
def fork(self, **kwargs):
return self.connect('fork', **kwargs)
return self.connect(u'fork', **kwargs)
def jail(self, **kwargs):
return self.connect('jail', **kwargs)
return self.connect(u'jail', **kwargs)
def local(self, **kwargs):
return self.connect('local', **kwargs)
return self.connect(u'local', **kwargs)
def lxc(self, **kwargs):
return self.connect('lxc', **kwargs)
return self.connect(u'lxc', **kwargs)
def setns(self, **kwargs):
return self.connect('setns', **kwargs)
return self.connect(u'setns', **kwargs)
def su(self, **kwargs):
return self.connect('su', **kwargs)
return self.connect(u'su', **kwargs)
def sudo(self, **kwargs):
return self.connect('sudo', **kwargs)
return self.connect(u'sudo', **kwargs)
def ssh(self, **kwargs):
return self.connect('ssh', **kwargs)
return self.connect(u'ssh', **kwargs)
class ProcessMonitor(object):
@ -1251,7 +1322,8 @@ class ModuleForwarder(object):
if msg.is_dead:
return
context_id_s, _, fullname = msg.data.partition('\x00')
context_id_s, _, fullname = msg.data.partition(b('\x00'))
fullname = mitogen.core.to_text(fullname)
context_id = int(context_id_s)
stream = self.router.stream_by_id(context_id)
if stream.remote_id == mitogen.parent_id:
@ -1279,9 +1351,17 @@ class ModuleForwarder(object):
if msg.is_dead:
return
fullname = msg.data
self.importer._request_module(fullname,
lambda: self._on_cache_callback(msg, fullname)
fullname = msg.data.decode('utf-8')
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)
def _send_one_module(self, msg, tup):
self.router._async_route(
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _on_cache_callback(self, msg, fullname):

@ -38,6 +38,7 @@ import time
import mitogen.core
import mitogen.select
from mitogen.core import b
from mitogen.core import LOG
@ -48,6 +49,14 @@ _pool_pid = None
_pool_lock = threading.Lock()
if mitogen.core.PY3:
def func_code(func):
return func.__code__
else:
def func_code(func):
return func.func_code
@mitogen.core.takes_router
def get_or_create_pool(size=None, router=None):
global _pool
@ -221,7 +230,7 @@ class Invoker(object):
def _invoke(self, method_name, kwargs, msg):
method = getattr(self.service, method_name)
if 'msg' in method.func_code.co_varnames:
if 'msg' in func_code(method).co_varnames:
kwargs['msg'] = msg # TODO: hack
no_reply = getattr(method, 'mitogen_service__no_reply', False)
@ -371,7 +380,7 @@ class Service(object):
@classmethod
def name(cls):
return '%s.%s' % (cls.__module__, cls.__name__)
return u'%s.%s' % (cls.__module__, cls.__name__)
def __init__(self, router):
self.router = router
@ -468,7 +477,7 @@ class Pool(object):
def join(self):
for th in self._threads:
th.join()
for invoker in self._invoker_by_name.itervalues():
for invoker in self._invoker_by_name.values():
invoker.service.on_shutdown()
def get_invoker(self, name, msg):
@ -492,15 +501,17 @@ class Pool(object):
tup = msg.unpickle(throw=False)
if not (isinstance(tup, tuple) and
len(tup) == 3 and
isinstance(tup[0], basestring) and
isinstance(tup[1], basestring) and
isinstance(tup[0], mitogen.core.AnyTextType) and
isinstance(tup[1], mitogen.core.AnyTextType) and
isinstance(tup[2], dict)):
raise mitogen.core.CallError('Invalid message format.')
def _on_service_call(self, recv, msg):
self._validate(msg)
service_name, method_name, kwargs = msg.unpickle()
service_name = None
method_name = None
try:
self._validate(msg)
service_name, method_name, kwargs = msg.unpickle()
invoker = self.get_invoker(service_name, msg)
return invoker.invoke(method_name, kwargs, msg)
except mitogen.core.CallError:
@ -628,7 +639,7 @@ class PushFileService(Service):
@expose(policy=AllowParents())
@arg_spec({
'context': mitogen.core.Context,
'path': basestring,
'path': mitogen.core.FsPathTypes,
})
def propagate_to(self, context, path):
LOG.debug('%r.propagate_to(%r, %r)', self, context, path)
@ -651,7 +662,7 @@ class PushFileService(Service):
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
'path': basestring,
'path': mitogen.core.FsPathTypes,
'data': mitogen.core.Blob,
'context': mitogen.core.Context,
})
@ -667,7 +678,7 @@ class PushFileService(Service):
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
'path': basestring,
'path': mitogen.core.FsPathTypes,
'context': mitogen.core.Context,
})
def forward(self, path, context):
@ -752,7 +763,7 @@ class FileService(Service):
@expose(policy=AllowParents())
@arg_spec({
'path': basestring,
'path': mitogen.core.FsPathTypes,
})
def register(self, path):
"""
@ -767,7 +778,7 @@ class FileService(Service):
st = os.stat(path)
if not stat.S_ISREG(st.st_mode):
raise IOError('%r is not a regular file.' % (in_path,))
raise IOError('%r is not a regular file.' % (path,))
LOG.debug('%r: registering %r', self, path)
self._metadata_by_path[path] = {
@ -801,7 +812,7 @@ class FileService(Service):
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE)
mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
@ -831,7 +842,7 @@ class FileService(Service):
@expose(policy=AllowAny())
@no_reply()
@arg_spec({
'path': basestring,
'path': mitogen.core.FsPathTypes,
'sender': mitogen.core.Sender,
})
def fetch(self, path, sender, msg):
@ -909,7 +920,7 @@ class FileService(Service):
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used
to fetch the file.
:param bytes in_path:
:param bytes path:
FileService registered name of the input file.
:param bytes out_path:
Name of the output path on the local disk.

@ -151,7 +151,8 @@ class Stream(mitogen.parent.Stream):
os.readlink(nspath + name) != os.readlink(selfpath + name)
)
]
except Exception, e:
except Exception:
e = sys.exc_info()[1]
raise Error(str(e))
os.chdir('/proc/%s/root' % (self.leader_pid,))
@ -214,4 +215,4 @@ class Stream(mitogen.parent.Stream):
LOG.debug('Leader PID for %s container %r: %d',
self.kind, self.container, self.leader_pid)
super(Stream, self).connect()
self.name = 'setns.' + self.container
self.name = u'setns.' + self.container

@ -39,18 +39,19 @@ except ImportError:
from pipes import quote as shlex_quote
import mitogen.parent
from mitogen.core import b
LOG = logging.getLogger('mitogen')
# sshpass uses 'assword' because it doesn't lowercase the input.
PASSWORD_PROMPT = 'password'
PERMDENIED_PROMPT = 'permission denied'
HOSTKEY_REQ_PROMPT = 'are you sure you want to continue connecting (yes/no)?'
HOSTKEY_FAIL = 'host key verification failed.'
PASSWORD_PROMPT = b('password')
PERMDENIED_PROMPT = b('permission denied')
HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?')
HOSTKEY_FAIL = b('host key verification failed.')
DEBUG_PREFIXES = ('debug1:', 'debug2:', 'debug3:')
DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:'))
def filter_debug(stream, it):
@ -62,7 +63,7 @@ def filter_debug(stream, it):
lines such as the password prompt.
"""
state = 'start_of_line'
buf = ''
buf = b('')
for chunk in it:
buf += chunk
while buf:
@ -78,13 +79,13 @@ def filter_debug(stream, it):
else:
state = 'in_plain'
elif state == 'in_debug':
if '\n' not in buf:
if b('\n') not in buf:
break
line, _, buf = buf.partition('\n')
line, _, buf = buf.partition(b('\n'))
LOG.debug('%r: %s', stream, line.rstrip())
state = 'start_of_line'
elif state == 'in_plain':
line, nl, buf = buf.partition('\n')
line, nl, buf = buf.partition(b('\n'))
yield line + nl
if nl:
state = 'start_of_line'
@ -102,6 +103,10 @@ class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
#: Default to whatever is available as 'python' on the remote machine,
#: overriding sys.executable use.
python_path = 'python'
#: Number of -v invocations to pass on command line.
ssh_debug_level = 0
@ -189,9 +194,9 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'ssh.' + self.hostname
self.name = u'ssh.' + mitogen.core.to_text(self.hostname)
if self.port:
self.name += ':%s' % (self.port,)
self.name += u':%s' % (self.port,)
auth_incorrect_msg = 'SSH authentication is incorrect'
password_incorrect_msg = 'SSH password is incorrect'
@ -249,7 +254,7 @@ class Stream(mitogen.parent.Stream):
if self.password is None:
raise PasswordError(self.password_required_msg)
LOG.debug('%r: sending password', self)
self.tty_stream.transmit_side.write(self.password + '\n')
self.tty_stream.transmit_side.write((self.password + '\n').encode())
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')

@ -31,6 +31,7 @@ import os
import mitogen.core
import mitogen.parent
from mitogen.core import b
LOG = logging.getLogger(__name__)
@ -54,11 +55,11 @@ class Stream(mitogen.parent.Stream):
username = 'root'
password = None
su_path = 'su'
password_prompt = 'password:'
password_prompt = b('password:')
incorrect_prompts = (
'su: sorry', # BSD
'su: authentication failure', # Linux
'su: incorrect password', # CentOS 6
b('su: sorry'), # BSD
b('su: authentication failure'), # Linux
b('su: incorrect password'), # CentOS 6
)
def construct(self, username=None, password=None, su_path=None,
@ -77,7 +78,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'su.' + self.username
self.name = u'su.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
super(Stream, self).on_disconnect(broker)
@ -110,6 +111,8 @@ class Stream(mitogen.parent.Stream):
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.transmit_side.write(self.password + '\n')
self.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')

@ -33,10 +33,11 @@ import time
import mitogen.core
import mitogen.parent
from mitogen.core import b
LOG = logging.getLogger(__name__)
PASSWORD_PROMPT = 'password'
PASSWORD_PROMPT = b('password')
SUDO_OPTIONS = [
#(False, 'bool', '--askpass', '-A')
#(False, 'str', '--auth-type', '-a')
@ -135,7 +136,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'sudo.' + self.username
self.name = u'sudo.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
@ -176,7 +177,8 @@ class Stream(mitogen.parent.Stream):
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.tty_stream.transmit_side.write(self.password + '\n')
self.tty_stream.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')

@ -73,7 +73,7 @@ class Listener(mitogen.core.BasicStream):
os.unlink(self.path)
self._sock.bind(self.path)
os.chmod(self.path, 0600)
os.chmod(self.path, int('0600', 8))
self._sock.listen(backlog)
self.receive_side = mitogen.core.Side(self, self._sock.fileno())
router.broker.start_receive(self)
@ -87,7 +87,7 @@ class Listener(mitogen.core.BasicStream):
context = mitogen.parent.Context(self._router, context_id)
stream = mitogen.core.Stream(self._router, context_id)
stream.accept(sock.fileno(), sock.fileno())
stream.name = 'unix_client.%d' % (pid,)
stream.name = u'unix_client.%d' % (pid,)
stream.auth_id = mitogen.context_id
stream.is_privileged = True
self._router.register(context, stream)
@ -111,7 +111,7 @@ def connect(path, broker=None):
router = mitogen.master.Router(broker=broker)
stream = mitogen.core.Stream(router, remote_id)
stream.accept(sock.fileno(), sock.fileno())
stream.name = 'unix_listener.%d' % (pid,)
stream.name = u'unix_listener.%d' % (pid,)
context = mitogen.parent.Context(router, remote_id)
router.register(context, stream)

@ -39,6 +39,11 @@ import mitogen.master
LOG = logging.getLogger('mitogen')
iteritems = getattr(dict, 'iteritems', dict.items)
if mitogen.core.PY3:
iteritems = dict.items
else:
iteritems = dict.iteritems
def disable_site_packages():
for entry in sys.path[:]:
@ -51,19 +56,15 @@ def _formatTime(record, datefmt=None):
return dt.strftime(datefmt)
def log_get_formatter(usec=False):
usec = ('MITOGEN_LOG_USEC' in os.environ) or usec
datefmt = '%H:%M:%S'
if usec:
datefmt += '.%f'
def log_get_formatter():
datefmt = '%H:%M:%S.%f'
fmt = '%(asctime)s %(levelname).1s %(name)s: %(message)s'
formatter = logging.Formatter(fmt, datefmt)
formatter.formatTime = _formatTime
return formatter
def log_to_file(path=None, io=False, usec=False, level='INFO'):
def log_to_file(path=None, io=False, level='INFO'):
log = logging.getLogger('')
if path:
fp = open(path, 'w', 1)
@ -80,8 +81,15 @@ def log_to_file(path=None, io=False, usec=False, level='INFO'):
level = getattr(logging, level, logging.INFO)
log.setLevel(level)
# Prevent accidental duplicate log_to_file() calls from generating
# duplicate output.
for handler_ in reversed(log.handlers):
if getattr(handler_, 'is_mitogen', None):
log.handlers.remove(handler_)
handler = logging.StreamHandler(fp)
handler.formatter = log_get_formatter(usec=usec)
handler.is_mitogen = True
handler.formatter = log_get_formatter()
log.handlers.insert(0, handler)
@ -98,7 +106,10 @@ def run_with_router(func, *args, **kwargs):
def with_router(func):
def wrapper(*args, **kwargs):
return run_with_router(func, *args, **kwargs)
wrapper.func_name = func.func_name
if mitogen.core.PY3:
wrapper.func_name = func.__name__
else:
wrapper.func_name = func.func_name
return wrapper
@ -118,9 +129,9 @@ def cast(obj):
return [cast(v) for v in obj]
if isinstance(obj, PASSTHROUGH):
return obj
if isinstance(obj, unicode):
return unicode(obj)
if isinstance(obj, str):
return str(obj)
if isinstance(obj, mitogen.core.UnicodeType):
return mitogen.core.UnicodeType(obj)
if isinstance(obj, mitogen.core.BytesType):
return mitogen.core.BytesType(obj)
raise TypeError("Cannot serialize: %r: %r" % (type(obj), obj))

@ -19,11 +19,11 @@ router = mitogen.master.Router()
context = mitogen.parent.Context(router, 0)
stream = mitogen.ssh.Stream(router, 0, max_message_size=0, hostname='foo')
print 'SSH command size: %s' % (len(' '.join(stream.get_boot_command())),)
print 'Preamble size: %s (%.2fKiB)' % (
print('SSH command size: %s' % (len(' '.join(stream.get_boot_command())),))
print('Preamble size: %s (%.2fKiB)' % (
len(stream.get_preamble()),
len(stream.get_preamble()) / 1024.0,
)
))
print(
' '

@ -1,4 +1,10 @@
#/bin/sh
#/usr/bin/env bash
echo '----- ulimits -----'
ulimit -a
echo '-------------------'
echo
set -o errexit
set -o nounset
set -o pipefail

@ -48,7 +48,6 @@ setup(
license = 'New BSD',
url = 'https://github.com/dw/mitogen/',
packages = find_packages(exclude=['tests', 'examples']),
use_2to3=True,
zip_safe = False,
classifiers = [
'Development Status :: 3 - Alpha',
@ -61,7 +60,7 @@ setup(
'Programming Language :: Python :: 2.5',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 2 :: Only',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: Implementation :: CPython',
'Topic :: System :: Distributed Computing',
'Topic :: System :: Systems Administration',

@ -55,8 +55,8 @@ for suffix in suffixes:
tofile='mitogen-output.txt',
))
if diff:
print '++ differ! suffix: %r' % (suffix,)
print('++ differ! suffix: %r' % (suffix,))
for line in diff:
print line
print(line)
print
print

@ -23,7 +23,7 @@
- assert:
that: |
out.content.decode('base64') == '{"I am JSON": true}'
out.content|b64decode == '{"I am JSON": true}'
# Ensure it handles strings.
@ -41,7 +41,7 @@
- assert:
that:
out.content.decode('base64') == 'I am text.'
out.content|b64decode == 'I am text.'
- file:
path: /tmp/transfer-data

@ -4,9 +4,9 @@ import json
from ansible.module_utils.basic import path
def main():
print json.dumps({
print(json.dumps({
'path': path()
})
}))
if __name__ == '__main__':
main()

@ -21,4 +21,4 @@
- assert:
that:
- out.argv_types == ["<type 'str'>"]
- out.argv_types_correct

@ -1,6 +1,8 @@
from __future__ import unicode_literals
import io
from ansible.module_utils import six
try:
from ansible.plugins import callback_loader
except ImportError:
@ -25,15 +27,17 @@ def printi(tio, obj, key=None, indent=0):
write(']')
elif isinstance(obj, dict):
write('{')
for key2, obj2 in sorted(obj.iteritems()):
for key2, obj2 in sorted(six.iteritems(obj)):
if not (key2.startswith('_ansible_') or
key2.endswith('_lines')):
printi(tio, obj2, key=key2, indent=indent+1)
key = None
write('}')
elif isinstance(obj, basestring):
if isinstance(obj, str):
obj = obj.decode('utf-8', 'replace')
elif isinstance(obj, six.text_type):
for line in obj.splitlines():
write('%s', line.rstrip('\r\n'))
elif isinstance(obj, six.binary_type):
obj = obj.decode('utf-8', 'replace')
for line in obj.splitlines():
write('%s', line.rstrip('\r\n'))
else:
@ -47,7 +51,7 @@ class CallbackModule(DefaultModule):
try:
tio = io.StringIO()
printi(tio, result)
return tio.getvalue().encode('ascii', 'replace')
return tio.getvalue() #.encode('ascii', 'replace')
except:
import traceback
traceback.print_exc()

@ -3,6 +3,7 @@
# interpreter I run within.
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils import six
import os
import pwd
@ -16,7 +17,7 @@ def main():
python_version=sys.version[:3],
argv=sys.argv,
__file__=__file__,
argv_types=[str(type(s)) for s in sys.argv],
argv_types_correct=all(type(s) is str for s in sys.argv),
env=dict(os.environ),
cwd=os.getcwd(),
python_path=sys.path,

@ -6,8 +6,8 @@ import sys
json_arguments = """<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>"""
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"input\": [%s]" % (json_arguments,)
print "}"
print("{")
print(" \"changed\": false,")
print(" \"msg\": \"Here is my input\",")
print(" \"input\": [%s]" % (json_arguments,))
print("}")

@ -13,8 +13,8 @@ def usage():
input_json = sys.stdin.read()
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"input\": [%s]" % (input_json,)
print "}"
print("{")
print(" \"changed\": false,")
print(" \"msg\": \"Here is my input\",")
print(" \"input\": [%s]" % (input_json,))
print("}")

@ -14,13 +14,13 @@ def usage():
input_json = sys.stdin.read()
print "{"
print " \"changed\": false,"
print("{")
print(" \"changed\": false,")
# v2.5.1. apt.py started depending on this.
# https://github.com/dw/mitogen/issues/210
print " \"__file__\": \"%s\"," % (__file__,)
print(" \"__file__\": \"%s\"," % (__file__,))
# Python sets this during a regular import.
print " \"__package__\": \"%s\"," % (__package__,)
print " \"msg\": \"Here is my input\","
print " \"input\": [%s]" % (input_json,)
print "}"
print(" \"__package__\": \"%s\"," % (__package__,))
print(" \"msg\": \"Here is my input\",")
print(" \"input\": [%s]" % (input_json,))
print("}")

@ -25,9 +25,9 @@ try:
except IOError:
usage()
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"source\": [%s]," % (json.dumps(me),)
print " \"input\": [%s]" % (input_json,)
print "}"
print("{")
print(" \"changed\": false,")
print(" \"msg\": \"Here is my input\",")
print(" \"source\": [%s]," % (json.dumps(me),))
print(" \"input\": [%s]" % (input_json,))
print("}")

@ -9,7 +9,7 @@ def main():
module = AnsibleModule(argument_spec={'name': {'type': 'str'}})
try:
module.exit_json(addr=socket.gethostbyname(module.params['name']))
except socket.error, e:
except socket.error as e:
module.fail_json(msg=str(e))
if __name__ == '__main__':

@ -10,13 +10,13 @@ import time
@mitogen.main() #(log_level='DEBUG')
def main(router):
for x in xrange(1000):
for x in range(1000):
t = time.time()
f = router.local()# debug=True)
tt = time.time()
print x, 1000 * (tt - t)
print(x, 1000 * (tt - t))
print f
print 'EEK', f.call(socket.gethostname)
print 'MY PID', os.getpid()
print 'EEKERY', f.call(os.getpid)
print(f)
print('EEK', f.call(socket.gethostname))
print('MY PID', os.getpid())
print('EEKERY', f.call(os.getpid))

@ -6,12 +6,12 @@ import time
times = []
for x in xrange(5):
for x in range(5):
t0 = time.time()
os.spawnvp(os.P_WAIT, sys.argv[1], sys.argv[1:])
t = time.time() - t0
times.append(t)
print '+++', t
print('+++', t)
print 'all:', times
print 'min %s max %s diff %s' % (min(times), max(times), (max(times) - min(times)))
print('all:', times)
print('min %s max %s diff %s' % (min(times), max(times), (max(times) - min(times))))

@ -1,40 +1,47 @@
import os
import pickle
import sys
import unittest2
import mitogen.core
import testlib
import plain_old_module
class ConstructorTest(unittest2.TestCase):
klass = mitogen.core.CallError
def test_string_noargs(self):
e = self.klass('%s%s')
self.assertEquals(e[0], '%s%s')
self.assertEquals(e.args[0], '%s%s')
def test_string_args(self):
e = self.klass('%s%s', 1, 1)
self.assertEquals(e[0], '11')
self.assertEquals(e.args[0], '11')
def test_from_exc(self):
ve = ValueError('eek')
ve = plain_old_module.MyError('eek')
e = self.klass(ve)
self.assertEquals(e[0], 'exceptions.ValueError: eek')
self.assertEquals(e.args[0], 'plain_old_module.MyError: eek')
def test_form_base_exc(self):
ve = SystemExit('eek')
e = self.klass(ve)
self.assertEquals(e[0], 'exceptions.SystemExit: eek')
self.assertEquals(e.args[0],
# varies across 2/3.
'%s.%s: eek' % (type(ve).__module__, type(ve).__name__))
def test_from_exc_tb(self):
try:
raise ValueError('eek')
except ValueError, ve:
raise plain_old_module.MyError('eek')
except plain_old_module.MyError:
ve = sys.exc_info()[1]
e = self.klass(ve)
self.assertTrue(e[0].startswith('exceptions.ValueError: eek'))
self.assertTrue('test_from_exc_tb' in e[0])
self.assertTrue(e.args[0].startswith('plain_old_module.MyError: eek'))
self.assertTrue('test_from_exc_tb' in e.args[0])
class PickleTest(unittest2.TestCase):
@ -43,28 +50,29 @@ class PickleTest(unittest2.TestCase):
def test_string_noargs(self):
e = self.klass('%s%s')
e2 = pickle.loads(pickle.dumps(e))
self.assertEquals(e2[0], '%s%s')
self.assertEquals(e2.args[0], '%s%s')
def test_string_args(self):
e = self.klass('%s%s', 1, 1)
e2 = pickle.loads(pickle.dumps(e))
self.assertEquals(e2[0], '11')
self.assertEquals(e2.args[0], '11')
def test_from_exc(self):
ve = ValueError('eek')
ve = plain_old_module.MyError('eek')
e = self.klass(ve)
e2 = pickle.loads(pickle.dumps(e))
self.assertEquals(e2[0], 'exceptions.ValueError: eek')
self.assertEquals(e2.args[0], 'plain_old_module.MyError: eek')
def test_from_exc_tb(self):
try:
raise ValueError('eek')
except ValueError, ve:
raise plain_old_module.MyError('eek')
except plain_old_module.MyError:
ve = sys.exc_info()[1]
e = self.klass(ve)
e2 = pickle.loads(pickle.dumps(e))
self.assertTrue(e2[0].startswith('exceptions.ValueError: eek'))
self.assertTrue('test_from_exc_tb' in e2[0])
self.assertTrue(e2.args[0].startswith('plain_old_module.MyError: eek'))
self.assertTrue('test_from_exc_tb' in e2.args[0])
if __name__ == '__main__':

@ -7,6 +7,7 @@ import mitogen.core
import mitogen.master
import testlib
import plain_old_module
class CrazyType(object):
@ -18,7 +19,7 @@ def function_that_adds_numbers(x, y):
def function_that_fails():
raise ValueError('exception text')
raise plain_old_module.MyError('exception text')
def func_with_bad_return_value():
@ -49,7 +50,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
s = str(exc)
etype, _, s = s.partition(': ')
self.assertEqual(etype, 'exceptions.ValueError')
self.assertEqual(etype, 'plain_old_module.MyError')
msg, _, s = s.partition('\n')
self.assertEqual(msg, 'exception text')
@ -61,7 +62,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
exc = self.assertRaises(mitogen.core.StreamError,
lambda: self.local.call(func_with_bad_return_value))
self.assertEquals(
exc[0],
exc.args[0],
"cannot unpickle '%s'/'CrazyType'" % (__name__,),
)
@ -72,7 +73,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
self.broker.defer(stream.on_disconnect, self.broker)
exc = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(exc[0], mitogen.core.ChannelError.local_msg)
self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg)
def test_aborted_on_local_broker_shutdown(self):
stream = self.router._stream_by_id[self.local.context_id]
@ -81,7 +82,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
self.broker.shutdown()
exc = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(exc[0], mitogen.core.ChannelError.local_msg)
self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg)
def test_accepts_returns_context(self):
context = self.local.call(func_accepts_returns_context, self.local)

@ -3,4 +3,4 @@ import sys
def say_hi():
print 'hi'
print('hi')

@ -6,9 +6,13 @@ fiddlery.
import math
class MyError(Exception):
pass
def get_sentinel_value():
# Some proof we're even talking to the mitogen-test Docker image
return open('/etc/sentinel').read()
return open('/etc/sentinel').read().decode()
def add(x, y):

@ -12,4 +12,4 @@ def repr_stuff():
@mitogen.main()
def main(router):
context = router.local()
print context.call(repr_stuff)
print(context.call(repr_stuff))

@ -30,8 +30,8 @@ import os.path
try:
import six as _system_six
print('six_brokenpkg: using system six:', _system_six)
except ImportError, e:
print('six_brokenpkg: no system six available', e)
except ImportError:
print('six_brokenpkg: no system six available')
_system_six = None
if _system_six:

@ -4,6 +4,8 @@ import subprocess
import unittest2
import mitogen.parent
from mitogen.core import b
import testlib
@ -36,7 +38,7 @@ class CommandLineTest(testlib.RouterMixin, testlib.TestCase):
stdout, stderr = proc.communicate()
self.assertEquals(0, proc.returncode)
self.assertEquals(mitogen.parent.Stream.EC0_MARKER, stdout)
self.assertIn("Error -5 while decompressing data: incomplete or truncated stream", stderr)
self.assertIn(b("Error -5 while decompressing data: incomplete or truncated stream"), stderr)
if __name__ == '__main__':

@ -19,8 +19,12 @@ PLATFORM_TO_PATH = {
('darwin', True): '/usr/lib/libssl.dylib',
('linux2', False): '/usr/lib/libssl.so',
('linux2', True): '/usr/lib/x86_64-linux-gnu/libssl.so',
# Python 2.6
('linux3', False): '/usr/lib/libssl.so',
('linux3', True): '/usr/lib/x86_64-linux-gnu/libssl.so',
# Python 3
('linux', False): '/usr/lib/libssl.so',
('linux', True): '/usr/lib/x86_64-linux-gnu/libssl.so',
}
c_ssl = ctypes.CDLL(PLATFORM_TO_PATH[sys.platform, IS_64BIT])

@ -11,6 +11,7 @@ import unittest2
import mitogen.core
import mitogen.utils
from mitogen.core import b
import testlib
@ -45,7 +46,7 @@ class ImporterMixin(testlib.RouterMixin):
class LoadModuleTest(ImporterMixin, testlib.TestCase):
data = zlib.compress("data = 1\n\n")
data = zlib.compress(b("data = 1\n\n"))
path = 'fake_module.py'
modname = 'fake_module'
@ -83,7 +84,7 @@ class LoadModuleTest(ImporterMixin, testlib.TestCase):
class LoadSubmoduleTest(ImporterMixin, testlib.TestCase):
data = zlib.compress("data = 1\n\n")
data = zlib.compress(b("data = 1\n\n"))
path = 'fake_module.py'
modname = 'mypkg.fake_module'
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related
@ -96,7 +97,7 @@ class LoadSubmoduleTest(ImporterMixin, testlib.TestCase):
class LoadModulePackageTest(ImporterMixin, testlib.TestCase):
data = zlib.compress("func = lambda: 1\n\n")
data = zlib.compress(b("func = lambda: 1\n\n"))
path = 'fake_pkg/__init__.py'
modname = 'fake_pkg'
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related
@ -117,7 +118,8 @@ class LoadModulePackageTest(ImporterMixin, testlib.TestCase):
self.set_get_module_response(self.response)
mod = self.importer.load_module(self.modname)
source = mod.__loader__.get_source(self.modname)
self.assertEquals(source, zlib.decompress(self.data))
self.assertEquals(source,
mitogen.core.to_text(zlib.decompress(self.data)))
def test_module_loader_set(self):
self.set_get_module_response(self.response)

@ -1,4 +1,5 @@
import sys
import threading
import unittest2
@ -66,7 +67,8 @@ class ThreadedGetTest(testlib.TestCase):
def _worker(self, func):
try:
self.results.append(func())
except Exception, e:
except Exception:
e = sys.exc_info()[1]
self.results.append(None)
self.excs.append(e)
@ -89,12 +91,12 @@ class ThreadedGetTest(testlib.TestCase):
def test_five_threads(self):
latch = self.klass()
for x in xrange(5):
for x in range(5):
self.start_one(lambda: latch.get(timeout=3.0))
for x in xrange(5):
for x in range(5):
latch.put(x)
self.join()
self.assertEquals(sorted(self.results), range(5))
self.assertEquals(sorted(self.results), list(range(5)))
self.assertEquals(self.excs, [])
@ -171,7 +173,8 @@ class ThreadedCloseTest(testlib.TestCase):
def _worker(self, func):
try:
self.results.append(func())
except Exception, e:
except Exception:
e = sys.exc_info()[1]
self.results.append(None)
self.excs.append(e)
@ -195,7 +198,7 @@ class ThreadedCloseTest(testlib.TestCase):
def test_five_threads(self):
latch = self.klass()
for x in xrange(5):
for x in range(5):
self.start_one(lambda: latch.get(timeout=3.0))
latch.close()
self.join()

@ -9,15 +9,22 @@ import mitogen.master
class ScanCodeImportsTest(unittest2.TestCase):
func = staticmethod(mitogen.master.scan_code_imports)
if mitogen.core.PY3:
level = 0
else:
level = -1
SIMPLE_EXPECT = [
(level, 'inspect', ()),
(level, 'unittest2', ()),
(level, 'testlib', ()),
(level, 'mitogen.master', ()),
]
def test_simple(self):
source_path = inspect.getsourcefile(ScanCodeImportsTest)
co = compile(open(source_path).read(), source_path, 'exec')
self.assertEquals(list(self.func(co)), [
(-1, 'inspect', ()),
(-1, 'unittest2', ()),
(-1, 'testlib', ()),
(-1, 'mitogen.master', ()),
])
self.assertEquals(list(self.func(co)), self.SIMPLE_EXPECT)
if __name__ == '__main__':

@ -60,14 +60,14 @@ class GetModuleViaPkgutilTest(testlib.TestCase):
path, src, is_pkg = self.call('module_finder_testmod')
self.assertEquals(path,
testlib.data_path('module_finder_testmod/__init__.py'))
self.assertEquals('', src)
self.assertEquals(mitogen.core.b(''), src)
self.assertTrue(is_pkg)
def test_empty_source_module(self):
path, src, is_pkg = self.call('module_finder_testmod.empty_mod')
self.assertEquals(path,
testlib.data_path('module_finder_testmod/empty_mod.py'))
self.assertEquals('', src)
self.assertEquals(mitogen.core.b(''), src)
self.assertFalse(is_pkg)
def test_regular_mod(self):
@ -75,7 +75,8 @@ class GetModuleViaPkgutilTest(testlib.TestCase):
path, src, is_pkg = self.call('module_finder_testmod.regular_mod')
self.assertEquals(path,
testlib.data_path('module_finder_testmod/regular_mod.py'))
self.assertEquals(src, inspect.getsource(regular_mod))
self.assertEquals(mitogen.core.to_text(src),
inspect.getsource(regular_mod))
self.assertFalse(is_pkg)
@ -89,7 +90,7 @@ class GetModuleViaSysModulesTest(testlib.TestCase):
import __main__
path, src, is_pkg = self.call('__main__')
self.assertEquals(path, __main__.__file__)
self.assertEquals(src, open(path).read())
self.assertEquals(src, open(path, 'rb').read())
self.assertFalse(is_pkg)
def test_dylib_fails(self):
@ -119,7 +120,7 @@ class ResolveRelPathTest(testlib.TestCase):
self.assertEquals('', self.call('email.utils', 0))
def test_rel1(self):
self.assertEquals('email', self.call('email.utils', 1))
self.assertEquals('email.', self.call('email.utils', 1))
def test_rel2(self):
self.assertEquals('', self.call('email.utils', 2))
@ -128,7 +129,35 @@ class ResolveRelPathTest(testlib.TestCase):
self.assertEquals('', self.call('email.utils', 3))
class FindRelatedImportsTest(testlib.TestCase):
class DjangoMixin(object):
WEBPROJECT_PATH = testlib.data_path('webproject')
# TODO: rip out Django and replace with a static tree of weird imports that
# don't depend on .. Django! The hack below is because the version of
# Django we need to test against 2.6 doesn't actually run on 3.6. But we
# don't care, we just need to be able to import it.
#
# File "django/utils/html_parser.py", line 12, in <module>
# AttributeError: module 'html.parser' has no attribute 'HTMLParseError'
#
import pkg_resources._vendor.six
from django.utils.six.moves import html_parser as _html_parser
_html_parser.HTMLParseError = Exception
@classmethod
def setUpClass(cls):
super(DjangoMixin, cls).setUpClass()
sys.path.append(cls.WEBPROJECT_PATH)
os.environ['DJANGO_SETTINGS_MODULE'] = 'webproject.settings'
@classmethod
def tearDownClass(cls):
sys.path.remove(cls.WEBPROJECT_PATH)
del os.environ['DJANGO_SETTINGS_MODULE']
super(DjangoMixin, cls).tearDownClass()
class FindRelatedImportsTest(DjangoMixin, testlib.TestCase):
klass = mitogen.master.ModuleFinder
def call(self, fullname):
@ -179,7 +208,7 @@ class FindRelatedImportsTest(testlib.TestCase):
])
class FindRelatedTest(testlib.TestCase):
class FindRelatedTest(DjangoMixin, testlib.TestCase):
klass = mitogen.master.ModuleFinder
def call(self, fullname):
@ -187,15 +216,15 @@ class FindRelatedTest(testlib.TestCase):
SIMPLE_EXPECT = set([
'mitogen',
'mitogen.compat',
'mitogen.compat.collections',
'mitogen.compat.functools',
'mitogen.core',
'mitogen.master',
'mitogen.minify',
'mitogen.parent',
])
if sys.version_info < (3, 2):
SIMPLE_EXPECT.add('mitogen.compat')
SIMPLE_EXPECT.add('mitogen.compat.functools')
if sys.version_info < (2, 7):
SIMPLE_EXPECT.add('mitogen.compat.tokenize')
@ -205,27 +234,13 @@ class FindRelatedTest(testlib.TestCase):
self.assertEquals(set(related), self.SIMPLE_EXPECT)
class DjangoFindRelatedTest(testlib.TestCase):
class DjangoFindRelatedTest(DjangoMixin, testlib.TestCase):
klass = mitogen.master.ModuleFinder
maxDiff = None
def call(self, fullname):
return self.klass().find_related(fullname)
WEBPROJECT_PATH = testlib.data_path('webproject')
@classmethod
def setUpClass(cls):
super(DjangoFindRelatedTest, cls).setUpClass()
sys.path.append(cls.WEBPROJECT_PATH)
os.environ['DJANGO_SETTINGS_MODULE'] = 'webproject.settings'
@classmethod
def tearDownClass(cls):
sys.path.remove(cls.WEBPROJECT_PATH)
del os.environ['DJANGO_SETTINGS_MODULE']
super(DjangoFindRelatedTest, cls).tearDownClass()
def test_django_db(self):
import django.db
related = self.call('django.db')
@ -250,6 +265,9 @@ class DjangoFindRelatedTest(testlib.TestCase):
])
def test_django_db_models(self):
if sys.version_info >= (3, 0):
raise unittest2.SkipTest('broken due to ancient vendored six.py')
import django.db.models
related = self.call('django.db.models')
self.assertEquals(related, [
@ -289,25 +307,45 @@ class DjangoFindRelatedTest(testlib.TestCase):
'django.db.models.related',
'django.db.models.signals',
'django.db.models.sql',
'django.db.models.sql.aggregates',
'django.db.models.sql.constants',
'django.db.models.sql.datastructures',
'django.db.models.sql.expressions',
'django.db.models.sql.query',
'django.db.models.sql.subqueries',
'django.db.models.sql.where',
'django.db.transaction',
'django.db.utils',
'django.dispatch',
'django.dispatch.dispatcher',
'django.dispatch.saferef',
'django.forms',
'django.forms.fields',
'django.forms.forms',
'django.forms.formsets',
'django.forms.models',
'django.forms.util',
'django.forms.widgets',
'django.utils',
'django.utils._os',
'django.utils.crypto',
'django.utils.datastructures',
'django.utils.dateformat',
'django.utils.dateparse',
'django.utils.dates',
'django.utils.datetime_safe',
'django.utils.decorators',
'django.utils.deprecation',
'django.utils.encoding',
'django.utils.formats',
'django.utils.functional',
'django.utils.html',
'django.utils.html_parser',
'django.utils.importlib',
'django.utils.ipv6',
'django.utils.itercompat',
'django.utils.module_loading',
'django.utils.numberformat',
'django.utils.safestring',
'django.utils.six',
'django.utils.text',
@ -316,6 +354,10 @@ class DjangoFindRelatedTest(testlib.TestCase):
'django.utils.tree',
'django.utils.tzinfo',
'pkg_resources',
'pkg_resources.extern',
'pkg_resources.extern.appdirs',
'pkg_resources.extern.packaging',
'pkg_resources.extern.six',
'pytz',
'pytz.exceptions',
'pytz.tzfile',

@ -9,7 +9,6 @@ class NestedTest(testlib.RouterMixin, testlib.TestCase):
def test_nested(self):
context = None
for x in range(1, 11):
#print 'Connect local%d via %s' % (x, context)
context = self.router.local(via=context, name='local%d' % x)
pid = context.call(os.getpid)

@ -57,7 +57,8 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
connect_timeout=3,
)
)
self.assertEquals(e.args[0], "EOF on stream; last 300 bytes received: ''")
prefix = "EOF on stream; last 300 bytes received: "
self.assertTrue(e.args[0].startswith(prefix))
def test_via_eof(self):
# Verify FD leakage does not keep failed process open.
@ -69,7 +70,8 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
connect_timeout=3,
)
)
self.assertTrue("EOF on stream; last 300 bytes received: ''" in e.args[0])
s = "EOF on stream; last 300 bytes received: "
self.assertTrue(s in e.args[0])
def test_direct_enoent(self):
e = self.assertRaises(mitogen.core.StreamError,
@ -78,7 +80,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
connect_timeout=3,
)
)
prefix = 'Child start failed: [Errno 2] No such file or directory.'
prefix = 'Child start failed: [Errno 2] No such file or directory'
self.assertTrue(e.args[0].startswith(prefix))
def test_via_enoent(self):
@ -90,7 +92,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
connect_timeout=3,
)
)
s = 'Child start failed: [Errno 2] No such file or directory.'
s = 'Child start failed: [Errno 2] No such file or directory'
self.assertTrue(s in e.args[0])
@ -123,12 +125,12 @@ class TtyCreateChildTest(unittest2.TestCase):
])
deadline = time.time() + 5.0
for line in mitogen.parent.iter_read([fd], deadline):
self.assertEquals('hi\n', line)
self.assertEquals(mitogen.core.b('hi\n'), line)
break
waited_pid, status = os.waitpid(pid, 0)
self.assertEquals(pid, waited_pid)
self.assertEquals(0, status)
self.assertEquals('', tf.read())
self.assertEquals(mitogen.core.b(''), tf.read())
finally:
tf.close()
@ -194,7 +196,7 @@ class WriteAllTest(unittest2.TestCase):
mitogen.core.set_nonblock(proc.stdin.fileno())
return proc
ten_ms_chunk = ('x' * 65535)
ten_ms_chunk = (mitogen.core.b('x') * 65535)
def test_no_deadline(self):
proc = self.make_proc()

@ -6,7 +6,7 @@ import testlib
def yield_stuff_then_die(sender):
for x in xrange(5):
for x in range(5):
sender.send(x)
sender.close()
return 10

@ -30,7 +30,7 @@ class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase):
# Ensure a program composed of a single script can be imported
# successfully.
args = [sys.executable, testlib.data_path('self_contained_program.py')]
output = testlib.subprocess__check_output(args)
output = testlib.subprocess__check_output(args).decode()
self.assertEquals(output, "['__main__', 50]\n")
@ -45,7 +45,7 @@ class BrokenModulesTest(unittest2.TestCase):
router.stream_by_id = lambda n: stream
msg = mitogen.core.Message(
data='non_existent_module',
data=mitogen.core.b('non_existent_module'),
reply_to=50,
)
msg.router = router
@ -74,7 +74,7 @@ class BrokenModulesTest(unittest2.TestCase):
router.stream_by_id = lambda n: stream
msg = mitogen.core.Message(
data='six_brokenpkg._six',
data=mitogen.core.b('six_brokenpkg._six'),
reply_to=50,
)
msg.router = router

@ -1,5 +1,3 @@
import Queue
import StringIO
import logging
import subprocess
import time
@ -11,6 +9,11 @@ import mitogen.master
import mitogen.parent
import mitogen.utils
try:
import Queue
except ImportError:
import queue as Queue
def ping():
return True
@ -142,7 +145,7 @@ class PolicyTest(testlib.RouterMixin, testlib.TestCase):
# Verify CallError received by reply_to target.
e = self.assertRaises(mitogen.core.CallError,
lambda: reply_target.get().unpickle())
self.assertEquals(e[0], self.router.refused_msg)
self.assertEquals(e.args[0], self.router.refused_msg)
class CrashTest(testlib.BrokerMixin, unittest2.TestCase):

@ -0,0 +1,37 @@
try:
from io import StringIO
from io import BytesIO
except ImportError:
from StringIO import StringIO as StringIO
from StringIO import StringIO as BytesIO
import unittest2
import mitogen.core
from mitogen.core import b
def roundtrip(v):
msg = mitogen.core.Message.pickled(v)
return mitogen.core.Message(data=msg.data).unpickle()
class BlobTest(unittest2.TestCase):
klass = mitogen.core.Blob
# Python 3 pickle protocol 2 does weird stuff depending on whether an empty
# or nonempty bytes is being serialized. For non-empty, it yields a
# _codecs.encode() call. For empty, it yields a bytes() call.
def test_nonempty_bytes(self):
v = mitogen.core.Blob(b('dave'))
self.assertEquals(b('dave'), roundtrip(v))
def test_empty_bytes(self):
v = mitogen.core.Blob(b(''))
self.assertEquals(b(''), roundtrip(v))
if __name__ == '__main__':
unittest2.main()

@ -83,7 +83,7 @@ class PermissionTest(testlib.RouterMixin, testlib.TestCase):
exc = self.assertRaises(mitogen.core.CallError, lambda:
l2.call(call_service_in, l1, MyService.name(), 'privileged_op'))
msg = mitogen.service.Invoker.unauthorized_msg % (
'privileged_op',
u'privileged_op',
MyService.name(),
)
self.assertTrue(msg in exc.args[0])

@ -6,4 +6,4 @@ daemon from the environment.
"""
import testlib
print testlib.get_docker_host()
print(testlib.get_docker_host())

@ -0,0 +1,35 @@
#!/usr/bin/env python
"""
Put the machine's CPUs under pressure to increase the likelihood of scheduling
weirdness. Useful for exposing otherwise difficult to hit races in the library.
"""
import ctypes
import multiprocessing
import os
import time
LIBC = ctypes.CDLL('libc.so.6')
sched_yield = LIBC.sched_yield
def burn():
while 1:
a, b, c = os.urandom(3)
n = int(((ord(a) << 16) |
(ord(b) << 8) |
(ord(c) << 0)) / 1.6)
print(n)
for x in xrange(n): pass
sched_yield()
mul = 1.5
count = int(mul * multiprocessing.cpu_count())
print count
procs = [multiprocessing.Process(target=burn)
for _ in range(count)]
for i, proc in enumerate(procs):
print([i])
proc.start()

@ -26,10 +26,10 @@ def cons():
try:
while 1:
g = l.get()
print 'got=%s consumed=%s produced=%s crash=%s' % (g, consumed, produced, crash)
print('got=%s consumed=%s produced=%s crash=%s' % (g, consumed, produced, crash))
consumed += 1
time.sleep(g)
for x in xrange(int(g * 1000)):
for x in range(int(g * 1000)):
pass
except:
crash += 1

@ -1,3 +1,5 @@
import sys
import mitogen
import mitogen.ssh
import mitogen.utils
@ -53,10 +55,10 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
username='mitogen__has_sudo',
)
assert 0, 'exception not thrown'
except mitogen.ssh.PasswordError, e:
pass
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e[0], self.stream_class.password_required_msg)
self.assertEqual(e.args[0], self.stream_class.password_required_msg)
def test_password_incorrect(self):
try:
@ -65,10 +67,10 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
password='badpw',
)
assert 0, 'exception not thrown'
except mitogen.ssh.PasswordError, e:
pass
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e[0], self.stream_class.password_incorrect_msg)
self.assertEqual(e.args[0], self.stream_class.password_incorrect_msg)
def test_password_specified(self):
context = self.docker_ssh(
@ -87,10 +89,10 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
username='mitogen__has_sudo_pubkey',
)
assert 0, 'exception not thrown'
except mitogen.ssh.PasswordError, e:
pass
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e[0], self.stream_class.password_required_msg)
self.assertEqual(e.args[0], self.stream_class.password_required_msg)
def test_pubkey_specified(self):
context = self.docker_ssh(

@ -1,5 +1,4 @@
import StringIO
import logging
import os
import random
@ -8,7 +7,6 @@ import socket
import subprocess
import sys
import time
import urlparse
import unittest2
@ -16,7 +14,18 @@ import mitogen.core
import mitogen.master
import mitogen.utils
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
LOG = logging.getLogger(__name__)
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
sys.path.append(DATA_DIR)
@ -28,7 +37,7 @@ def data_path(suffix):
path = os.path.join(DATA_DIR, suffix)
if path.endswith('.key'):
# SSH is funny about private key permissions.
os.chmod(path, 0600)
os.chmod(path, int('0600', 8))
return path
@ -82,7 +91,7 @@ def wait_for_port(
return
sock.settimeout(receive_timeout)
data = ''
data = mitogen.core.b('')
found = False
while time.time() < end:
try:
@ -97,21 +106,25 @@ def wait_for_port(
break
data += resp
if re.search(pattern, data):
if re.search(mitogen.core.b(pattern), data):
found = True
break
try:
sock.shutdown(socket.SHUT_RDWR)
except socket.error, e:
# On Mac OS X - a BSD variant - the above code only succeeds if the operating system thinks that the
# socket is still open when shutdown() is invoked. If Python is too slow and the FIN packet arrives
# before that statement can be reached, then OS X kills the sock.shutdown() statement with:
except socket.error:
e = sys.exc_info()[1]
# On Mac OS X - a BSD variant - the above code only succeeds if the
# operating system thinks that the socket is still open when
# shutdown() is invoked. If Python is too slow and the FIN packet
# arrives before that statement can be reached, then OS X kills the
# sock.shutdown() statement with:
#
# socket.error: [Errno 57] Socket is not connected
#
# Protect shutdown() with a try...except that catches the socket.error, test to make sure Errno is
# right, and ignore it if Errno matches.
# Protect shutdown() with a try...except that catches the
# socket.error, test to make sure Errno is right, and ignore it if
# Errno matches.
if e.errno == 57:
pass
else:
@ -147,7 +160,7 @@ def sync_with_broker(broker, timeout=10.0):
class LogCapturer(object):
def __init__(self, name=None):
self.sio = StringIO.StringIO()
self.sio = StringIO()
self.logger = logging.getLogger(name)
self.handler = logging.StreamHandler(self.sio)
self.old_propagate = self.logger.propagate
@ -169,9 +182,12 @@ class TestCase(unittest2.TestCase):
raised. Can't use context manager because tests must run on Python2.4"""
try:
func(*args, **kwargs)
except exc, e:
except exc:
e = sys.exc_info()[1]
return e
except BaseException, e:
except BaseException:
LOG.exception('Original exception')
e = sys.exc_info()[1]
assert 0, '%r raised %r, not %r' % (func, e, exc)
assert 0, '%r did not raise %r' % (func, exc)
@ -200,7 +216,7 @@ class DockerizedSshDaemon(object):
def _get_container_port(self):
s = subprocess__check_output(['docker', 'port', self.container_name])
for line in s.splitlines():
for line in s.decode().splitlines():
dport, proto, baddr, bport = self.PORT_RE.match(line).groups()
if dport == '22' and proto == 'tcp':
self.port = int(bport)
@ -277,7 +293,7 @@ class DockerMixin(RouterMixin):
kwargs.setdefault('hostname', self.dockerized_ssh.host)
kwargs.setdefault('port', self.dockerized_ssh.port)
kwargs.setdefault('check_host_keys', 'ignore')
kwargs.setdefault('ssh_debug_level', '3')
kwargs.setdefault('ssh_debug_level', 3)
return self.router.ssh(**kwargs)
def docker_ssh_any(self, **kwargs):

@ -0,0 +1,32 @@
import logging
import time
import unittest2
import mitogen.core
import mitogen.master
import testlib
def roundtrip(*args):
return args
class TwoThreeCompatTest(testlib.RouterMixin, testlib.TestCase):
if mitogen.core.PY3:
python_path = 'python2'
else:
python_path = 'python3'
def test_succeeds(self):
spare = self.router.fork()
target = self.router.local(python_path=self.python_path)
spare2, = target.call(roundtrip, spare)
self.assertEquals(spare.context_id, spare2.context_id)
self.assertEquals(spare.name, spare2.name)
if __name__ == '__main__':
unittest2.main()

@ -1,16 +1,22 @@
import cStringIO
try:
from io import StringIO
from io import BytesIO
except ImportError:
from StringIO import StringIO as StringIO
from StringIO import StringIO as BytesIO
import unittest2
import mitogen.core
from mitogen.core import b
class BlobTest(unittest2.TestCase):
klass = mitogen.core.Blob
def make(self):
return self.klass('x' * 128)
return self.klass(b('x') * 128)
def test_repr(self):
blob = self.make()
@ -18,14 +24,14 @@ class BlobTest(unittest2.TestCase):
def test_decays_on_constructor(self):
blob = self.make()
self.assertEquals('x'*128, mitogen.core.BytesType(blob))
self.assertEquals(b('x')*128, mitogen.core.BytesType(blob))
def test_decays_on_write(self):
blob = self.make()
io = cStringIO.StringIO()
io = BytesIO()
io.write(blob)
self.assertEquals(128, io.tell())
self.assertEquals('x'*128, io.getvalue())
self.assertEquals(b('x')*128, io.getvalue())
def test_message_roundtrip(self):
blob = self.make()
@ -53,7 +59,7 @@ class SecretTest(unittest2.TestCase):
def test_decays_on_write(self):
secret = self.make()
io = cStringIO.StringIO()
io = StringIO()
io.write(secret)
self.assertEquals(8, io.tell())
self.assertEquals('password', io.getvalue())
@ -64,8 +70,8 @@ class SecretTest(unittest2.TestCase):
secret2 = msg.unpickle()
self.assertEquals(type(secret), type(secret2))
self.assertEquals(repr(secret), repr(secret2))
self.assertEquals(mitogen.core.BytesType(secret),
mitogen.core.BytesType(secret2))
self.assertEquals(mitogen.core.b(secret),
mitogen.core.b(secret2))
if __name__ == '__main__':

Loading…
Cancel
Save