Initial Python 3.x port work.

* ansible: use unicode_literals everywhere since it only needs to be
  compatible back to 2.6.
* compat/collections.py: delete this entirely and rip out the parts of
  functools that require it.
* Introduce serializable Kwargs dict subclass that translates keys to
  Unicode on instantiation.
* enable_debug_logging() must set _v/_vv globals.
* cStringIO does not exist in 3.x.
* Treat IOLogger and LogForwarder input as latin-1.
* Avoid ResourceWarnings in first stage by explicitly closing fps.
* Fix preamble_size.py syntax errors.
pull/295/head
David Wilson 7 years ago
parent cb595b30d4
commit 410016ff47

@ -29,52 +29,52 @@ matrix:
- python: "2.7"
env: MODE=mitogen DISTRO=debian
# 2.7 -> 2.6
- python: "2.7"
env: MODE=mitogen DISTRO=centos6
#- python: "2.7"
#env: MODE=mitogen DISTRO=centos6
# 2.6 -> 2.7
- python: "2.6"
env: MODE=mitogen DISTRO=centos7
#- python: "2.6"
#env: MODE=mitogen DISTRO=centos7
# 2.6 -> 2.6
- python: "2.6"
env: MODE=mitogen DISTRO=centos6
#- python: "2.6"
#env: MODE=mitogen DISTRO=centos6
# 3.6 -> 2.7
- python: "3.6"
env: MODE=mitogen DISTRO=debian
# Debops tests.
# 2.4.3.0; 2.7 -> 2.7
- python: "2.7"
env: MODE=debops_common VER=2.4.3.0
#- python: "2.7"
#env: MODE=debops_common VER=2.4.3.0
# 2.5.5; 2.7 -> 2.7
- python: "2.7"
env: MODE=debops_common VER=2.5.5
#- python: "2.7"
#env: MODE=debops_common VER=2.5.5
# 2.5.5; 3.6 -> 2.7
- python: "3.6"
env: MODE=debops_common VER=2.5.5
#- python: "3.6"
#env: MODE=debops_common VER=2.5.5
# ansible_mitogen tests.
# 2.4.3.0; Debian; 2.7 -> 2.7
- python: "2.7"
env: MODE=ansible VER=2.4.3.0 DISTRO=debian
#- python: "2.7"
#env: MODE=ansible VER=2.4.3.0 DISTRO=debian
# 2.5.5; Debian; 2.7 -> 2.7
- python: "2.7"
env: MODE=ansible VER=2.5.5 DISTRO=debian
# 2.5.5; CentOS; 2.7 -> 2.7
- python: "2.7"
env: MODE=ansible VER=2.5.5 DISTRO=centos7
#- python: "2.7"
#env: MODE=ansible VER=2.5.5 DISTRO=centos7
# 2.5.5; CentOS; 2.7 -> 2.6
- python: "2.7"
env: MODE=ansible VER=2.5.5 DISTRO=centos6
#- python: "2.7"
#env: MODE=ansible VER=2.5.5 DISTRO=centos6
# 2.5.5; CentOS; 2.6 -> 2.7
- python: "2.6"
env: MODE=ansible VER=2.5.5 DISTRO=centos7
#- python: "2.6"
#env: MODE=ansible VER=2.5.5 DISTRO=centos7
# 2.5.5; CentOS; 2.6 -> 2.6
- python: "2.6"
env: MODE=ansible VER=2.5.5 DISTRO=centos6
#- python: "2.6"
#env: MODE=ansible VER=2.5.5 DISTRO=centos6
# 2.5.5; Debian; 3.6 -> 2.7
- python: "3.6"
env: MODE=ansible VER=2.5.5 DISTRO=centos6
# Sanity check our tests against vanilla Ansible, they should pass.
- python: "2.7"
env: MODE=ansible VER=2.5.5 DISTRO=debian STRATEGY=linear
#- python: "2.7"
#env: MODE=ansible VER=2.5.5 DISTRO=debian STRATEGY=linear

@ -27,6 +27,8 @@
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
from __future__ import unicode_literals
import logging
import os
import shlex
@ -239,7 +241,7 @@ def config_from_play_context(transport, inventory_name, connection):
'timeout': connection._play_context.timeout,
'ansible_ssh_timeout': connection.ansible_ssh_timeout,
'ssh_args': [
term
mitogen.core.to_text(term)
for s in (
getattr(connection._play_context, 'ssh_args', ''),
getattr(connection._play_context, 'ssh_common_args', ''),
@ -249,7 +251,7 @@ def config_from_play_context(transport, inventory_name, connection):
],
'become_exe': connection._play_context.become_exe,
'sudo_args': [
term
mitogen.core.to_text(term)
for s in (
connection._play_context.sudo_flags,
connection._play_context.become_flags
@ -528,7 +530,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return self.call_async(func, *args, **kwargs).get().unpickle()
finally:
LOG.debug('Call took %d ms: %s%r', 1000 * (time.time() - t0),
func.func_name, args)
func.__name__, args)
def create_fork_child(self):
"""

@ -27,13 +27,17 @@
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
import commands
import logging
import os
import pwd
import shutil
import traceback
try:
from shlex import quote as shlex_quote
except ImportError:
from pipes import quote as shlex_quote
from ansible.module_utils._text import to_bytes
from ansible.parsing.utils.jsonify import jsonify
@ -321,7 +325,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
ansible_mitogen.planner.Invocation(
action=self,
connection=self._connection,
module_name=mitogen.utils.cast(module_name),
module_name=mitogen.core.to_text(module_name),
module_args=mitogen.utils.cast(module_args),
task_vars=task_vars,
templar=self._templar,
@ -369,7 +373,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
if executable is None: # executable defaults to False
executable = self._play_context.executable
if executable:
cmd = executable + ' -c ' + commands.mkarg(cmd)
cmd = executable + ' -c ' + shlex_quote(cmd)
rc, stdout, stderr = self._connection.exec_command(
cmd=cmd,

@ -27,6 +27,8 @@
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
from __future__ import unicode_literals
import collections
import imp
import os

@ -35,6 +35,8 @@ files/modules known missing.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import json
import logging
import os
@ -43,6 +45,7 @@ import random
from ansible.executor import module_common
import ansible.errors
import ansible.module_utils
import mitogen.core
try:
from ansible.plugins.loader import module_loader
@ -71,11 +74,11 @@ def parse_script_interpreter(source):
"""
# Linux requires first 2 bytes with no whitespace, pretty sure it's the
# same everywhere. See binfmt_script.c.
if not source.startswith('#!'):
if not source.startswith(b'#!'):
return None, None
# Find terminating newline. Assume last byte of binprm_buf if absent.
nl = source.find('\n', 0, 128)
nl = source.find(b'\n', 0, 128)
if nl == -1:
nl = min(128, len(source))
@ -83,8 +86,8 @@ def parse_script_interpreter(source):
# bits just contains the interpreter filename.
bits = source[2:nl].strip().split(None, 1)
if len(bits) == 1:
return bits[0], None
return bits[0], bits[1]
return mitogen.core.to_text(bits[0]), None
return mitogen.core.to_text(bits[0]), mitogen.core.to_text(bits[1])
class Invocation(object):
@ -176,9 +179,11 @@ class Planner(object):
# named by `runner_name`.
}
"""
kwargs.setdefault('emulate_tty', True)
kwargs.setdefault('service_context', self._inv.connection.parent)
return kwargs
new = dict((mitogen.core.UnicodeType(k), kwargs[k])
for k in kwargs)
new.setdefault('emulate_tty', True)
new.setdefault('service_context', self._inv.connection.parent)
return new
def __repr__(self):
return '%s()' % (type(self).__name__,)
@ -202,7 +207,7 @@ class BinaryPlanner(Planner):
runner_name=self.runner_name,
module=self._inv.module_name,
path=self._inv.module_path,
args=self._inv.module_args,
json_args=json.dumps(self._inv.module_args),
env=self._inv.env,
**kwargs
)
@ -264,7 +269,7 @@ class WantJsonPlanner(ScriptPlanner):
runner_name = 'WantJsonRunner'
def detect(self):
return 'WANT_JSON' in self._inv.module_source
return b'WANT_JSON' in self._inv.module_source
class NewStylePlanner(ScriptPlanner):
@ -274,9 +279,10 @@ class NewStylePlanner(ScriptPlanner):
preprocessing the module.
"""
runner_name = 'NewStyleRunner'
marker = b'from ansible.module_utils.'
def detect(self):
return 'from ansible.module_utils.' in self._inv.module_source
return self.marker in self._inv.module_source
def _get_interpreter(self):
return None, None
@ -394,7 +400,7 @@ def get_module_data(name):
path = module_loader.find_plugin(name, '')
with open(path, 'rb') as fp:
source = fp.read()
return path, source
return mitogen.core.to_text(path), source
def _propagate_deps(invocation, planner, context):

@ -46,6 +46,8 @@ import mitogen.utils
import ansible_mitogen.logging
import ansible_mitogen.services
from mitogen.core import b
LOG = logging.getLogger(__name__)
@ -103,8 +105,8 @@ class MuxProcess(object):
cls.unix_listener_path = mitogen.unix.make_socket_path()
cls.worker_sock, cls.child_sock = socket.socketpair()
mitogen.core.set_cloexec(cls.worker_sock)
mitogen.core.set_cloexec(cls.child_sock)
mitogen.core.set_cloexec(cls.worker_sock.fileno())
mitogen.core.set_cloexec(cls.child_sock.fileno())
cls.child_pid = os.fork()
ansible_mitogen.logging.setup()
@ -129,7 +131,8 @@ class MuxProcess(object):
self._setup_services()
# Let the parent know our listening socket is ready.
mitogen.core.io_op(self.child_sock.send, '1')
mitogen.core.io_op(self.child_sock.send, b('1'))
self.child_sock.send(b('1'))
# Block until the socket is closed, which happens on parent exit.
mitogen.core.io_op(self.child_sock.recv, 1)
@ -178,7 +181,7 @@ class MuxProcess(object):
self.pool.stop(join=False)
try:
os.unlink(self.listener.path)
except OSError, e:
except OSError as e:
# Prevent a shutdown race with the parent process.
if e.args[0] != errno.ENOENT:
raise

@ -36,7 +36,8 @@ how to build arguments for it, preseed related data, etc.
"""
from __future__ import absolute_import
import cStringIO
from __future__ import unicode_literals
import ctypes
import errno
import imp
@ -50,6 +51,11 @@ import types
import mitogen.core
import ansible_mitogen.target # TODO: circular import
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
try:
from shlex import quote as shlex_quote
except ImportError:
@ -71,6 +77,7 @@ for symbol in 'res_init', '__res_init':
except AttributeError:
pass
iteritems = getattr(dict, 'iteritems', dict.items)
LOG = logging.getLogger(__name__)
@ -78,7 +85,7 @@ def utf8(s):
"""
Coerce an object to bytes if it is Unicode.
"""
if isinstance(s, unicode):
if isinstance(s, mitogen.core.UnicodeType):
s = s.encode('utf-8')
return s
@ -110,9 +117,13 @@ class Runner(object):
:param mitogen.core.Context service_context:
Context to which we should direct FileService calls. For now, always
the connection multiplexer process on the controller.
:param dict args:
:param str json_args:
Ansible module arguments. A mixture of user and internal keys created
by :meth:`ansible.plugins.action.ActionBase._execute_module`.
This is passed as a string rather than a dict in order to mimic the
implicit bytes/str conversion behaviour of a 2.x controller running
against a 3.x target.
:param dict env:
Additional environment variables to set during the run.
@ -123,16 +134,13 @@ class Runner(object):
When :data:`True`, indicate the runner should detach the context from
its parent after setup has completed successfully.
"""
def __init__(self, module, service_context, args=None, env=None,
def __init__(self, module, service_context, json_args, env=None,
econtext=None, detach=False):
if args is None:
args = {}
self.module = utf8(module)
self.module = module
self.service_context = service_context
self.econtext = econtext
self.detach = detach
self.args = args
self.args = json.loads(json_args)
self.env = env
def setup(self):
@ -236,7 +244,7 @@ class ModuleUtilsImporter(object):
def load_module(self, fullname):
path, is_pkg = self._by_fullname[fullname]
source = ansible_mitogen.target.get_small_file(self._context, path)
code = compile(source, path, 'exec')
code = compile(source, path, 'exec', 0, 1)
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__file__ = "master:%s" % (path,)
mod.__loader__ = self
@ -254,7 +262,7 @@ class TemporaryEnvironment(object):
def __init__(self, env=None):
self.original = os.environ.copy()
self.env = env or {}
os.environ.update((k, str(v)) for k, v in self.env.iteritems())
os.environ.update((k, str(v)) for k, v in iteritems(self.env))
def revert(self):
os.environ.clear()
@ -278,14 +286,11 @@ class NewStyleStdio(object):
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
self.original_stdin = sys.stdin
sys.stdout = cStringIO.StringIO()
sys.stderr = cStringIO.StringIO()
ansible.module_utils.basic._ANSIBLE_ARGS = json.dumps({
'ANSIBLE_MODULE_ARGS': args
})
sys.stdin = cStringIO.StringIO(
ansible.module_utils.basic._ANSIBLE_ARGS
)
sys.stdout = StringIO()
sys.stderr = StringIO()
encoded = json.dumps({'ANSIBLE_MODULE_ARGS': args})
ansible.module_utils.basic._ANSIBLE_ARGS = utf8(encoded)
sys.stdin = StringIO(mitogen.core.to_text(encoded))
def revert(self):
sys.stdout = self.original_stdout
@ -309,7 +314,7 @@ class ProgramRunner(Runner):
def __init__(self, path, emulate_tty=None, **kwargs):
super(ProgramRunner, self).__init__(**kwargs)
self.emulate_tty = emulate_tty
self.path = utf8(path)
self.path = path
def setup(self):
super(ProgramRunner, self).setup()
@ -368,7 +373,7 @@ class ProgramRunner(Runner):
args=self._get_program_args(),
emulate_tty=self.emulate_tty,
)
except Exception, e:
except Exception as e:
LOG.exception('While running %s', self._get_program_args())
return {
'rc': 1,
@ -378,8 +383,8 @@ class ProgramRunner(Runner):
return {
'rc': rc,
'stdout': stdout,
'stderr': stderr
'stdout': mitogen.core.to_text(stdout),
'stderr': mitogen.core.to_text(stderr),
}
@ -398,7 +403,7 @@ class ArgsFileRunner(Runner):
suffix='-args',
dir=ansible_mitogen.target.temp_dir,
)
self.args_fp.write(self._get_args_contents())
self.args_fp.write(utf8(self._get_args_contents()))
self.args_fp.flush()
reopen_readonly(self.program_fp)
@ -449,17 +454,17 @@ class ScriptRunner(ProgramRunner):
if not self.interpreter:
return s
shebang = '#!' + utf8(self.interpreter)
shebang = b'#!' + utf8(self.interpreter)
if self.interpreter_arg:
shebang += ' ' + utf8(self.interpreter_arg)
shebang += b' ' + utf8(self.interpreter_arg)
new = [shebang]
if os.path.basename(self.interpreter).startswith('python'):
new.append(self.b_ENCODING_STRING)
_, _, rest = s.partition('\n')
_, _, rest = s.partition(b'\n')
new.append(rest)
return '\n'.join(new)
return b'\n'.join(new)
class NewStyleRunner(ScriptRunner):
@ -533,15 +538,20 @@ class NewStyleRunner(ScriptRunner):
except KeyError:
return self._code_by_path.setdefault(self.path, compile(
source=self.source,
filename=self.path,
filename="master:" + self.path,
mode='exec',
dont_inherit=True,
))
if mitogen.core.PY3:
main_module_name = '__main__'
else:
main_module_name = b'__main__'
def _run(self):
code = self._get_code()
mod = types.ModuleType('__main__')
mod = types.ModuleType(self.main_module_name)
mod.__package__ = None
# Some Ansible modules use __file__ to find the Ansiballz temporary
# directory. We must provide some temporary path in __file__, but we
@ -551,25 +561,28 @@ class NewStyleRunner(ScriptRunner):
ansible_mitogen.target.temp_dir,
'ansible_module_' + os.path.basename(self.path),
)
e = None
exc = None
try:
exec code in vars(mod)
except SystemExit, e:
pass
if mitogen.core.PY3:
exec(code, vars(mod))
else:
exec('exec code in vars(mod)')
except SystemExit as e:
exc = e
return {
'rc': e[0] if e else 2,
'stdout': sys.stdout.getvalue(),
'stderr': sys.stderr.getvalue(),
'rc': exc.args[0] if exc else 2,
'stdout': mitogen.core.to_text(sys.stdout.getvalue()),
'stderr': mitogen.core.to_text(sys.stderr.getvalue()),
}
class JsonArgsRunner(ScriptRunner):
JSON_ARGS = '<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>'
JSON_ARGS = b'<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>'
def _get_args_contents(self):
return json.dumps(self.args)
return json.dumps(self.args).encode()
def _rewrite_source(self, s):
return (

@ -38,6 +38,8 @@ when a child has completed a job.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import logging
import os
import os.path
@ -53,6 +55,20 @@ import ansible_mitogen.target
LOG = logging.getLogger(__name__)
if sys.version_info[0] == 3:
def reraise(tp, value, tb):
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
else:
exec(
"def reraise(tp, value, tb=None):\n"
" raise tp, value, tb\n"
)
class Error(Exception):
pass
@ -118,7 +134,7 @@ class ContextService(mitogen.service.Service):
while stack:
obj = stack.pop()
if isinstance(obj, dict):
stack.extend(sorted(obj.iteritems()))
stack.extend(sorted(obj.items()))
elif isinstance(obj, (list, tuple)):
stack.extend(obj)
else:
@ -351,8 +367,7 @@ class ContextService(mitogen.service.Service):
try:
result = self._wait_or_start(spec, via=via).get()
if isinstance(result, tuple): # exc_info()
e1, e2, e3 = result
raise e1, e2, e3
reraise(*result)
via = result['context']
except mitogen.core.StreamError as e:
return {
@ -390,10 +405,10 @@ class ModuleDepService(mitogen.service.Service):
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'module_name': basestring,
'module_path': basestring,
'module_name': mitogen.core.UnicodeType,
'module_path': mitogen.core.FsPathTypes,
'search_path': tuple,
'builtin_path': basestring,
'builtin_path': mitogen.core.FsPathTypes,
'context': mitogen.core.Context,
})
def scan(self, module_name, module_path, search_path, builtin_path, context):

@ -32,6 +32,8 @@ for file transfer, module execution and sundry bits like changing file modes.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import errno
import grp
import json
@ -147,7 +149,7 @@ def prune_tree(path):
try:
os.unlink(path)
return
except OSError, e:
except OSError as e:
if not (os.path.isdir(path) and
e.args[0] in (errno.EPERM, errno.EISDIR)):
LOG.error('prune_tree(%r): %s', path, e)
@ -157,7 +159,7 @@ def prune_tree(path):
# Ensure write access for readonly directories. Ignore error in case
# path is on a weird filesystem (e.g. vfat).
os.chmod(path, int('0700', 8))
except OSError, e:
except OSError as e:
LOG.warning('prune_tree(%r): %s', path, e)
try:
@ -165,7 +167,7 @@ def prune_tree(path):
if name not in ('.', '..'):
prune_tree(os.path.join(path, name))
os.rmdir(path)
except OSError, e:
except OSError as e:
LOG.error('prune_tree(%r): %s', path, e)
@ -229,7 +231,7 @@ def init_child(econtext):
return {
'fork_context': _fork_parent,
'home_dir': os.path.expanduser('~'),
'home_dir': mitogen.core.to_text(os.path.expanduser('~')),
}
@ -319,12 +321,7 @@ class AsyncRunner(object):
'econtext': self.econtext,
'emulate_tty': False,
})
dct = run_module(kwargs)
if mitogen.core.PY3:
for key in 'stdout', 'stderr':
dct[key] = dct[key].decode('utf-8', 'surrogateescape')
return dct
return run_module(kwargs)
def _parse_result(self, dct):
filtered, warnings = (
@ -465,7 +462,7 @@ def exec_args(args, in_data='', chdir=None, shell=None, emulate_tty=False):
stdout, stderr = proc.communicate(in_data)
if emulate_tty:
stdout = stdout.replace('\n', '\r\n')
stdout = stdout.replace(b'\n', b'\r\n')
return proc.returncode, stdout, stderr or ''
@ -481,7 +478,7 @@ def exec_command(cmd, in_data='', chdir=None, shell=None, emulate_tty=False):
:return:
(return code, stdout bytes, stderr bytes)
"""
assert isinstance(cmd, basestring)
assert isinstance(cmd, mitogen.core.UnicodeType)
return exec_args(
args=[get_user_shell(), '-c', cmd],
in_data=in_data,

@ -848,9 +848,9 @@ Context Class
try:
# Prints output once it is received.
msg = recv.get()
print msg.unpickle()
print(msg.unpickle())
except mitogen.core.CallError, e:
print 'Call failed:', str(e)
print('Call failed:', str(e))
Asynchronous calls may be dispatched in parallel to multiple
contexts and consumed as they complete using
@ -1038,11 +1038,11 @@ Select Class
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.select.Select(recvs):
print 'Got %s from %s' % (msg, msg.receiver)
print('Got %s from %s' % (msg, msg.receiver))
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
print('Received total %s from %s receivers' % (total, len(recvs)))
:py:class:`Select` may drive a long-running scheduler:
@ -1069,7 +1069,7 @@ Select Class
]
for msg in mitogen.select.Select(selects):
print msg.unpickle()
print(msg.unpickle())
.. py:classmethod:: all (it)

@ -193,7 +193,7 @@ nested.py:
context = None
for x in range(1, 11):
print 'Connect local%d via %s' % (x, context)
print('Connect local%d via %s' % (x, context))
context = router.local(via=context, name='local%d' % x)
context.call(os.system, 'pstree -s python -s mitogen')

@ -225,13 +225,13 @@ caller until the return value is available or an exception is raised::
>>> import os
>>> # Returns the current time.
>>> print 'Time in remote context:', local.call(time.time)
>>> print('Time in remote context:', local.call(time.time))
>>> try:
... # Raises OSError.
... local.call(os.chdir, '/nonexistent')
... except mitogen.core.CallError, e:
... print 'Call failed:', str(e)
... print('Call failed:', str(e))
It is a simple wrapper around the more flexible :meth:`Context.call_async`,
which immediately returns a :class:`Receiver <mitogen.core.Receiver>` wired up
@ -242,7 +242,7 @@ is called to block the thread until the result arrives::
>>> call = local.call_async(time.time)
>>> msg = call.get()
>>> print msg.unpickle()
>>> print(msg.unpickle())
1507292737.75547
@ -259,12 +259,12 @@ We must therefore continue by writing our code as a script::
import mitogen.utils
def my_first_function():
print 'Hello from remote context!'
print('Hello from remote context!')
return 123
def main(router):
local = router.local()
print local.call(my_first_function)
print(local.call(my_first_function))
if __name__ == '__main__':
mitogen.utils.log_to_file(main)
@ -292,7 +292,7 @@ without the need for writing asynchronous code::
calls = [context.call(my_func) for context in contexts]
for msg in mitogen.select.Select(calls):
print 'Reply from %s: %s' % (recv.context, data)
print('Reply from %s: %s' % (recv.context, data))
Running Code That May Hang

@ -135,7 +135,7 @@ configuration.
# myapp/__init__.py, myapp/mypkg/__init__.py, and myapp/mypkg/mymodule.py
# are transferred automatically.
print context.call(myapp.mymodule.my_function)
print(context.call(myapp.mymodule.my_function))
As the forwarder reuses the import mechanism, it should integrate cleanly with
any tool such as `py2exe`_ that correctly implement the protocols in PEP-302,
@ -323,10 +323,10 @@ available.
total = 0
for msg in Select(c.call_async(usage, '/tmp') for c in contexts):
usage = msg.unpickle()
print 'Context %s /tmp usage: %d' % (recv.context, usage)
print('Context %s /tmp usage: %d' % (recv.context, usage))
total += usage
print 'Total /tmp usage across all contexts: %d' % (total,)
print('Total /tmp usage across all contexts: %d' % (total,))
Single File Programs
@ -361,7 +361,7 @@ usual into the slave process.
def main(broker):
if len(sys.argv) != 2:
print __doc__
print(__doc__)
sys.exit(1)
context = mitogen.ssh.connect(broker, sys.argv[1])

@ -182,7 +182,7 @@ def main(router):
"""
argv = sys.argv[1:]
if not len(argv):
print 'mitop: Need a list of SSH hosts to connect to.'
print('mitop: Need a list of SSH hosts to connect to.')
sys.exit(1)
delay = 2.0
@ -193,7 +193,7 @@ def main(router):
# connection, a Receiver to accept messages from the host, and finally
# start child_main() on the host to pump messages into the receiver.
for hostname in argv:
print 'Starting on', hostname
print('Starting on', hostname)
host = Host()
host.name = hostname

@ -14,4 +14,4 @@ mitogen.utils.log_to_file()
router, parent = mitogen.unix.connect('/tmp/mitosock')
with router:
print mitogen.service.call(parent, CONNECT_BY_ID, {})
print(mitogen.service.call(parent, CONNECT_BY_ID, {}))

@ -92,7 +92,7 @@ def main(log_level='INFO', profiling=False):
@mitogen.main()
def main(router):
z = router.ssh(hostname='k3')
print z.call(get_url, 'https://example.org/')
print(z.call(get_url, 'https://example.org/')))))
"""

@ -1,117 +0,0 @@
"""Selected backports from Python stdlib collections module
"""
__all__ = [
'namedtuple',
]
from operator import itemgetter as _itemgetter
from keyword import iskeyword as _iskeyword
import sys as _sys
try:
all([])
except NameError:
def all(iterable):
for element in iterable:
if not element:
return False
return True
def namedtuple(typename, field_names, verbose=False):
"""Returns a new subclass of tuple with named fields.
>>> Point = namedtuple('Point', 'x y')
>>> Point.__doc__ # docstring for the new class
'Point(x, y)'
>>> p = Point(11, y=22) # instantiate with positional args or keywords
>>> p[0] + p[1] # indexable like a plain tuple
33
>>> x, y = p # unpack like a regular tuple
>>> x, y
(11, 22)
>>> p.x + p.y # fields also accessable by name
33
>>> d = p._asdict() # convert to a dictionary
>>> d['x']
11
>>> Point(**d) # convert from a dictionary
Point(x=11, y=22)
>>> p._replace(x=100) # _replace() is like str.replace() but targets named fields
Point(x=100, y=22)
"""
# Parse and validate the field names. Validation serves two purposes,
# generating informative error messages and preventing template injection attacks.
if isinstance(field_names, basestring):
field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas
field_names = tuple(map(str, field_names))
for name in (typename,) + field_names:
if not all(c.isalnum() or c=='_' for c in name):
raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name)
if _iskeyword(name):
raise ValueError('Type names and field names cannot be a keyword: %r' % name)
if name[0].isdigit():
raise ValueError('Type names and field names cannot start with a number: %r' % name)
seen_names = set()
for name in field_names:
if name.startswith('_'):
raise ValueError('Field names cannot start with an underscore: %r' % name)
if name in seen_names:
raise ValueError('Encountered duplicate field name: %r' % name)
seen_names.add(name)
# Create and fill-in the class template
numfields = len(field_names)
argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes
reprtxt = ', '.join('%s=%%r' % name for name in field_names)
dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names))
template = '''class %(typename)s(tuple):
'%(typename)s(%(argtxt)s)' \n
__slots__ = () \n
_fields = %(field_names)r \n
def __new__(_cls, %(argtxt)s):
return _tuple.__new__(_cls, (%(argtxt)s)) \n
@classmethod
def _make(cls, iterable, new=tuple.__new__, len=len):
'Make a new %(typename)s object from a sequence or iterable'
result = new(cls, iterable)
if len(result) != %(numfields)d:
raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result))
return result \n
def __repr__(self):
return '%(typename)s(%(reprtxt)s)' %% self \n
def _asdict(t):
'Return a new dict which maps field names to their values'
return {%(dicttxt)s} \n
def _replace(_self, **kwds):
'Return a new %(typename)s object replacing specified fields with new values'
result = _self._make(map(kwds.pop, %(field_names)r, _self))
if kwds:
raise ValueError('Got unexpected field names: %%r' %% kwds.keys())
return result \n
def __getnewargs__(self):
return tuple(self) \n\n''' % locals()
for i, name in enumerate(field_names):
template += ' %s = _property(_itemgetter(%d))\n' % (name, i)
if verbose:
print template
# Execute the template string in a temporary namespace and
# support tracing utilities by setting a value for frame.f_globals['__name__']
namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
_property=property, _tuple=tuple)
try:
exec template in namespace
except SyntaxError, e:
raise SyntaxError(e.message + ':\n' + template)
result = namespace[typename]
# For pickling to work, the __module__ variable needs to be set to the frame
# where the named tuple is created. Bypass this step in enviroments where
# sys._getframe is not defined (Jython for example).
if hasattr(_sys, '_getframe'):
result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__')
return result

@ -11,10 +11,6 @@ __all__ = [
'lru_cache',
]
try:
from collections import namedtuple
except ImportError:
from mitogen.compat.collections import namedtuple
from threading import RLock
@ -102,8 +98,6 @@ def partial(func, *args, **keywords):
### LRU Cache function decorator
################################################################################
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
class _HashedSeq(list):
""" This class guarantees that hash() will be called no more than once
per element. This is important because the lru_cache() will hash
@ -170,12 +164,12 @@ def lru_cache(maxsize=128, typed=False):
raise TypeError('Expected maxsize to be an integer or None')
def decorating_function(user_function):
wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
wrapper = _lru_cache_wrapper(user_function, maxsize, typed)
return update_wrapper(wrapper, user_function)
return decorating_function
def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
def _lru_cache_wrapper(user_function, maxsize, typed):
# Constants shared by all lru cache instances:
sentinel = object() # unique object used to signal cache misses
make_key = _make_key # build a key from the function arguments
@ -277,14 +271,6 @@ def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
lock.release()
return result
def cache_info():
"""Report cache statistics"""
lock.acquire()
try:
return _CacheInfo(hits, misses, maxsize, cache.__len__())
finally:
lock.release()
def cache_clear():
"""Clear the cache and cache statistics"""
lock.acquire()
@ -298,6 +284,5 @@ def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
finally:
lock.release()
wrapper.cache_info = cache_info
wrapper.cache_clear = cache_clear
return wrapper

@ -33,6 +33,7 @@ bootstrap implementation sent to every new slave context.
"""
import collections
import encodings.latin_1
import errno
import fcntl
import imp
@ -54,9 +55,9 @@ import zlib
select = __import__('select')
try:
import cPickle
import cPickle as pickle
except ImportError:
import pickle as cPickle
import pickle
try:
from cStringIO import StringIO as BytesIO
@ -72,6 +73,8 @@ LOG = logging.getLogger('mitogen')
IOLOG = logging.getLogger('mitogen.io')
IOLOG.setLevel(logging.INFO)
LATIN1_CODEC = encodings.latin_1.Codec()
_v = False
_vv = False
@ -95,14 +98,24 @@ except NameError:
PY3 = sys.version_info > (3,)
if PY3:
b = lambda s: s.encode('latin-1')
b = str.encode
BytesType = bytes
UnicodeType = unicode
UnicodeType = str
FsPathTypes = (str,)
BufferType = lambda buf, start: memoryview(buf)[start:]
long = int
else:
b = str
BytesType = str
FsPathTypes = (str, unicode)
BufferType = buffer
UnicodeType = unicode
AnyTextType = (BytesType, UnicodeType)
if sys.version_info < (2, 5):
next = lambda it: it.next()
CHUNK_SIZE = 131072
_tls = threading.local()
@ -121,6 +134,8 @@ class Error(Exception):
def __init__(self, fmt=None, *args):
if args:
fmt %= args
if fmt and not isinstance(fmt, UnicodeType):
fmt = fmt.decode('utf-8')
Exception.__init__(self, fmt)
@ -140,6 +155,8 @@ class Secret(UnicodeType):
def __repr__(self):
return '[secret]'
if not PY3:
# TODO: what is this needed for in 2.x?
def __str__(self):
return UnicodeType(self)
@ -147,6 +164,22 @@ class Secret(UnicodeType):
return (Secret, (UnicodeType(self),))
class Kwargs(dict):
if PY3:
def __init__(self, dct):
for k, v in dct.items():
if type(k) is bytes:
self[k.decode()] = v
else:
self[k] = v
def __repr__(self):
return 'Kwargs(%s)' % (dict.__repr__(self),)
def __reduce__(self):
return (Kwargs, (dict(self),))
class CallError(Error):
def __init__(self, fmt=None, *args):
if not isinstance(fmt, BaseException):
@ -162,11 +195,11 @@ class CallError(Error):
Error.__init__(self, fmt)
def __reduce__(self):
return (_unpickle_call_error, (self[0],))
return (_unpickle_call_error, (self.args[0],))
def _unpickle_call_error(s):
if not (type(s) is str and len(s) < 10000):
if not (type(s) is UnicodeType and len(s) < 10000):
raise TypeError('cannot unpickle CallError: bad input')
inst = CallError.__new__(CallError)
Exception.__init__(inst, s)
@ -186,6 +219,14 @@ class TimeoutError(Error):
pass
def to_text(o):
if isinstance(o, UnicodeType):
return UnicodeType(o)
if isinstance(o, BytesType):
return o.decode('utf-8')
return UnicodeType(o)
def has_parent_authority(msg, _stream=None):
return (msg.auth_id == mitogen.context_id or
msg.auth_id in mitogen.parent_ids)
@ -247,9 +288,9 @@ def io_op(func, *args):
except (select.error, OSError, IOError):
e = sys.exc_info()[1]
_vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
if e[0] == errno.EINTR:
if e.args[0] == errno.EINTR:
continue
if e[0] in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
if e.args[0] in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
return None, True
raise
@ -326,13 +367,25 @@ def import_module(modname):
return __import__(modname, None, None, [''])
if PY3:
# In 3.x Unpickler is a class exposing find_class as an overridable, but it
# cannot be overridden without subclassing.
class _Unpickler(pickle.Unpickler):
def find_class(self, module, func):
return self.find_global(module, func)
else:
# In 2.x Unpickler is a function exposing a writeable find_global
# attribute.
_Unpickler = pickle.Unpickler
class Message(object):
dst_id = None
src_id = None
auth_id = None
handle = None
reply_to = None
data = ''
data = b('')
_unpickled = object()
router = None
@ -342,7 +395,7 @@ class Message(object):
self.src_id = mitogen.context_id
self.auth_id = mitogen.context_id
vars(self).update(kwargs)
assert isinstance(self.data, str)
assert isinstance(self.data, BytesType)
def _unpickle_context(self, context_id, name):
return _unpickle_context(self.router, context_id, name)
@ -350,6 +403,10 @@ class Message(object):
def _unpickle_sender(self, context_id, dst_handle):
return _unpickle_sender(self.router, context_id, dst_handle)
def _unpickle_bytes(self, s, encoding):
s, n = LATIN1_CODEC.encode(s)
return s
def _find_global(self, module, func):
"""Return the class implementing `module_name.class_name` or raise
`StreamError` if the module is not whitelisted."""
@ -364,6 +421,10 @@ class Message(object):
return Blob
elif func == 'Secret':
return Secret
elif func == 'Kwargs':
return Kwargs
elif module == '_codecs' and func == 'encode':
return self._unpickle_bytes
raise StreamError('cannot unpickle %r/%r', module, func)
@property
@ -378,10 +439,10 @@ class Message(object):
def pickled(cls, obj, **kwargs):
self = cls(**kwargs)
try:
self.data = cPickle.dumps(obj, protocol=2)
except cPickle.PicklingError:
self.data = pickle.dumps(obj, protocol=2)
except pickle.PicklingError:
e = sys.exc_info()[1]
self.data = cPickle.dumps(CallError(e), protocol=2)
self.data = pickle.dumps(CallError(e), protocol=2)
return self
def reply(self, msg, router=None, **kwargs):
@ -395,6 +456,11 @@ class Message(object):
else:
LOG.debug('Message.reply(): discarding due to zero handle: %r', msg)
if PY3:
UNPICKLER_KWARGS = {'encoding': 'bytes'}
else:
UNPICKLER_KWARGS = {}
def unpickle(self, throw=True, throw_dead=True):
"""Deserialize `data` into an object."""
_vv and IOLOG.debug('%r.unpickle()', self)
@ -404,12 +470,8 @@ class Message(object):
obj = self._unpickled
if obj is Message._unpickled:
fp = BytesIO(self.data)
unpickler = cPickle.Unpickler(fp)
try:
unpickler = _Unpickler(fp, **self.UNPICKLER_KWARGS)
unpickler.find_global = self._find_global
except AttributeError:
unpickler.find_class = self._find_global
try:
# Must occur off the broker thread.
obj = unpickler.load()
@ -513,12 +575,10 @@ class Receiver(object):
def __iter__(self):
while True:
try:
msg = self.get()
msg.unpickle() # Cause .remote_msg to be thrown.
yield msg
except ChannelError:
if msg.is_dead:
return
yield msg
class Channel(Sender, Receiver):
@ -573,6 +633,8 @@ class Importer(object):
# but very unlikely to trigger a bug report.
'org',
]
if PY3:
self.blacklist += ['cStringIO']
# Presence of an entry in this map indicates in-flight GET_MODULE.
self._callbacks = {}
@ -619,8 +681,6 @@ class Importer(object):
return None
_tls.running = True
# TODO: hack: this is papering over a bug elsewhere.
fullname = fullname.rstrip('.')
try:
pkgname, dot, _ = fullname.rpartition('.')
_vv and IOLOG.debug('%r.find_module(%r)', self, fullname)
@ -707,7 +767,9 @@ class Importer(object):
else:
_v and LOG.debug('_request_module(%r): new request', fullname)
self._callbacks[fullname] = [callback]
self._context.send(Message(data=fullname, handle=GET_MODULE))
self._context.send(
Message(data=b(fullname), handle=GET_MODULE)
)
finally:
self._lock.release()
@ -715,9 +777,8 @@ class Importer(object):
callback()
def load_module(self, fullname):
fullname = to_text(fullname)
_v and LOG.debug('Importer.load_module(%r)', fullname)
# TODO: hack: this is papering over a bug elsewhere.
fullname = fullname.rstrip('.')
self._refuse_imports(fullname)
event = threading.Event()
@ -739,14 +800,12 @@ class Importer(object):
else:
mod.__package__ = fullname.rpartition('.')[0] or None
# TODO: monster hack: work around modules now being imported as their
# actual name, so when Ansible "apt.py" tries to "import apt", it gets
# itself. Instead force absolute imports during compilation.
flags = 0
if fullname.startswith('ansible'):
flags = 0x4000
if mod.__package__ and not PY3:
# 2.x requires __package__ to be exactly a string.
mod.__package__ = mod.__package__.encode()
source = self.get_source(fullname)
code = compile(source, mod.__file__, 'exec', flags, True)
code = compile(source, mod.__file__, 'exec', 0, 1)
if PY3:
exec(code, vars(mod))
else:
@ -755,11 +814,14 @@ class Importer(object):
def get_filename(self, fullname):
if fullname in self._cache:
return 'master:' + self._cache[fullname][2]
return u'master:' + self._cache[fullname][2]
def get_source(self, fullname):
if fullname in self._cache:
return zlib.decompress(self._cache[fullname][3])
source = zlib.decompress(self._cache[fullname][3])
if PY3:
return to_text(source)
return source
class LogHandler(logging.Handler):
@ -777,7 +839,7 @@ class LogHandler(logging.Handler):
try:
msg = self.format(rec)
encoded = '%s\x00%s\x00%s' % (rec.name, rec.levelno, msg)
if isinstance(encoded, unicode):
if isinstance(encoded, UnicodeType):
# Logging package emits both :(
encoded = encoded.encode('utf-8')
self.context.send(Message(data=encoded, handle=FORWARD_LOG))
@ -865,7 +927,7 @@ class Stream(BasicStream):
def __init__(self, router, remote_id, **kwargs):
self._router = router
self.remote_id = remote_id
self.name = 'default'
self.name = u'default'
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.construct(**kwargs)
self._input_buf = collections.deque()
@ -935,7 +997,7 @@ class Stream(BasicStream):
prev_start = start
start = 0
msg.data = ''.join(bits)
msg.data = b('').join(bits)
self._input_buf.appendleft(buf[prev_start+len(bit):])
self._input_buf_len -= total_len
self._router._async_route(msg, self)
@ -956,7 +1018,7 @@ class Stream(BasicStream):
self.on_disconnect(broker)
return
elif written != len(buf):
self._output_buf.appendleft(buffer(buf, written))
self._output_buf.appendleft(BufferType(buf, written))
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
self._output_buf_len -= written
@ -1002,7 +1064,10 @@ class Context(object):
self.name = name
def __reduce__(self):
return _unpickle_context, (self.context_id, self.name)
name = self.name
if name and not isinstance(name, UnicodeType):
name = UnicodeType(name, 'utf-8')
return _unpickle_context, (self.context_id, name)
def on_disconnect(self):
_v and LOG.debug('%r.on_disconnect()', self)
@ -1023,9 +1088,11 @@ class Context(object):
def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('%r.call_service_async(%r, %r, %r)',
self, service_name, method_name, kwargs)
if not isinstance(service_name, basestring):
if isinstance(service_name, BytesType):
service_name = service_name.encode('utf-8')
elif not isinstance(service_name, UnicodeType):
service_name = service_name.name() # Service.name()
tup = (service_name, method_name, kwargs)
tup = (service_name, to_text(method_name), Kwargs(kwargs))
msg = Message.pickled(tup, handle=CALL_SERVICE)
return self.send_async(msg)
@ -1055,7 +1122,7 @@ def _unpickle_context(router, context_id, name):
if not (isinstance(router, Router) and
isinstance(context_id, (int, long)) and context_id >= 0 and (
(name is None) or
(isinstance(name, basestring) and len(name) < 100))
(isinstance(name, UnicodeType) and len(name) < 100))
):
raise TypeError('cannot unpickle Context: bad input')
return router.context_class(router, context_id, name)
@ -1193,7 +1260,7 @@ class Latch(object):
if i >= self._waking:
raise e or TimeoutError()
self._waking -= 1
if rsock.recv(2) != '\x7f':
if rsock.recv(2) != b('\x7f'):
raise LatchError('internal error: received >1 wakeups')
if e:
raise e
@ -1223,10 +1290,10 @@ class Latch(object):
def _wake(self, sock):
try:
os.write(sock.fileno(), '\x7f')
os.write(sock.fileno(), b('\x7f'))
except OSError:
e = sys.exc_info()[1]
if e[0] != errno.EBADF:
if e.args[0] != errno.EBADF:
raise
def __repr__(self):
@ -1319,7 +1386,7 @@ class Waker(BasicStream):
self.transmit_side.write(b(' '))
except OSError:
e = sys.exc_info()[1]
if e[0] != errno.EBADF:
if e.args[0] != errno.EBADF:
raise
@ -1363,7 +1430,7 @@ class IoLogger(BasicStream):
if not buf:
return self.on_disconnect(broker)
self._buf += buf
self._buf += buf.decode('latin1')
self._log_lines()
@ -1420,7 +1487,7 @@ class Router(object):
def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None):
handle = handle or self._last_handle.next()
handle = handle or next(self._last_handle)
_vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
if respondent:
@ -1761,12 +1828,12 @@ class ExternalContext(object):
else:
core_src_fd = self.config.get('core_src_fd', 101)
if core_src_fd:
fp = os.fdopen(core_src_fd, 'r', 1)
fp = os.fdopen(core_src_fd, 'rb', 1)
try:
core_size = int(fp.readline())
core_src = fp.read(core_size)
# Strip "ExternalContext.main()" call from last line.
core_src = '\n'.join(core_src.splitlines()[:-1])
core_src = b('\n').join(core_src.splitlines()[:-1])
finally:
fp.close()
else:

@ -59,7 +59,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'docker.' + (self.container or self.image)
self.name = u'docker.' + (self.container or self.image)
def get_boot_command(self):
args = ['--interactive']

@ -299,7 +299,7 @@ def exit():
def die(msg, *args):
if args:
msg %= args
print msg
sys.stderr.write('%s\n' % (msg,))
exit()
@ -419,12 +419,12 @@ def run(dest, router, args, deadline=None, econtext=None):
context_id = router.allocate_id()
fakessh = mitogen.parent.Context(router, context_id)
fakessh.name = 'fakessh.%d' % (context_id,)
fakessh.name = u'fakessh.%d' % (context_id,)
sock1, sock2 = socket.socketpair()
stream = mitogen.core.Stream(router, context_id)
stream.name = 'fakessh'
stream.name = u'fakessh'
stream.accept(sock1.fileno(), sock1.fileno())
router.register(fakessh, stream)
@ -445,7 +445,7 @@ def run(dest, router, args, deadline=None, econtext=None):
finally:
fp.close()
os.chmod(ssh_path, 0755)
os.chmod(ssh_path, int('0755', 8))
env = os.environ.copy()
env.update({
'PATH': '%s:%s' % (tmp_path, env.get('PATH', '')),

@ -45,7 +45,7 @@ def fixup_prngs():
Add 256 bits of /dev/urandom to OpenSSL's PRNG in the child, and re-seed
the random package with the same data.
"""
s = os.urandom(256 / 8)
s = os.urandom(256 // 8)
random.seed(s)
if 'ssl' in sys.modules:
sys.modules['ssl'].RAND_add(s, 75.0)
@ -112,7 +112,7 @@ class Stream(mitogen.parent.Stream):
if isinstance(responder, mitogen.parent.ModuleForwarder):
self.importer = responder.importer
name_prefix = 'fork'
name_prefix = u'fork'
def start_child(self):
parentfp, childfp = mitogen.parent.create_socketpair()

@ -54,7 +54,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'jail.' + self.container
self.name = u'jail.' + self.container
def get_boot_command(self):
bits = [self.jexec_path]

@ -56,7 +56,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'lxc.' + self.container
self.name = u'lxc.' + self.container
def get_boot_command(self):
bits = [

@ -56,8 +56,14 @@ import mitogen
import mitogen.core
import mitogen.minify
import mitogen.parent
from mitogen.core import IOLOG
from mitogen.core import b
from mitogen.core import to_text
from mitogen.core import LOG
from mitogen.core import IOLOG
imap = getattr(itertools, 'imap', map)
izip = getattr(itertools, 'izip', zip)
RLOG = logging.getLogger('mitogen.ctx')
@ -79,7 +85,7 @@ def _stdlib_paths():
def get_child_modules(path):
it = pkgutil.iter_modules([os.path.dirname(path)])
return [name for _, name, _ in it]
return [to_text(name) for _, name, _ in it]
def get_core_source():
@ -115,7 +121,11 @@ def scan_code_imports(co):
`modname`.
"""
# Yield `(op, oparg)` tuples from the code object `co`.
ordit = itertools.imap(ord, co.co_code)
if mitogen.core.PY3:
ordit = iter(co.co_code)
nextb = ordit.__next__
else:
ordit = imap(ord, co.co_code)
nextb = ordit.next
opit = ((c, (None
@ -131,7 +141,7 @@ def scan_code_imports(co):
except StopIteration:
return
for oparg1, oparg2, (op3, arg3) in itertools.izip(opit, opit2, opit3):
for oparg1, oparg2, (op3, arg3) in izip(opit, opit2, opit3):
if op3 == IMPORT_NAME:
op2, arg2 = oparg2
op1, arg1 = oparg1
@ -227,7 +237,7 @@ class LogForwarder(object):
name = '%s.%s' % (RLOG.name, context.name)
self._cache[msg.src_id] = logger = logging.getLogger(name)
name, level_s, s = msg.data.split('\x00', 2)
name, level_s, s = msg.data.decode('latin1').split('\x00', 2)
logger.log(int(level_s), '%s: %s', name, s, extra={
'mitogen_message': s,
'mitogen_context': self._router.context_by_id(msg.src_id),
@ -305,9 +315,18 @@ class ModuleFinder(object):
def _get_module_via_pkgutil(self, fullname):
"""Attempt to fetch source code via pkgutil. In an ideal world, this
would be the only required implementation of get_module()."""
try:
# Pre-'import spec' this returned None, in Python3.6 it raises
# ImportError.
loader = pkgutil.find_loader(fullname)
IOLOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r',
fullname, loader)
except ImportError:
e = sys.exc_info()[1]
LOG.debug('%r._get_module_via_pkgutil(%r): %s',
self, fullname, e)
return None
IOLOG.debug('%r._get_module_via_pkgutil(%r) -> %r',
self, fullname, loader)
if not loader:
return
@ -318,13 +337,21 @@ class ModuleFinder(object):
except AttributeError:
return
if path is not None and source is not None:
if path is None or source is None:
return
if isinstance(source, mitogen.core.UnicodeType):
# get_source() returns "string" according to PEP-302, which was
# reinterpreted for Python 3 to mean a Unicode string.
source = source.encode('utf-8')
return path, source, is_pkg
def _get_module_via_sys_modules(self, fullname):
"""Attempt to fetch source code via sys.modules. This is specifically
to support __main__, but it may catch a few more cases."""
module = sys.modules.get(fullname)
LOG.debug('_get_module_via_sys_modules(%r) -> %r', fullname, module)
if not isinstance(module, types.ModuleType):
LOG.debug('sys.modules[%r] absent or not a regular module',
fullname)
@ -344,6 +371,11 @@ class ModuleFinder(object):
raise
source = '\n'
if isinstance(source, mitogen.core.UnicodeType):
# get_source() returns "string" according to PEP-302, which was
# reinterpreted for Python 3 to mean a Unicode string.
source = source.encode('utf-8')
return path, source, is_pkg
get_module_methods = [_get_module_via_pkgutil,
@ -483,7 +515,7 @@ class ModuleResponder(object):
def __repr__(self):
return 'ModuleResponder(%r)' % (self._router,)
MAIN_RE = re.compile(r'^if\s+__name__\s*==\s*.__main__.\s*:', re.M)
MAIN_RE = re.compile(b(r'^if\s+__name__\s*==\s*.__main__.\s*:'), re.M)
def whitelist_prefix(self, fullname):
if self.whitelist == ['']:
@ -502,6 +534,9 @@ class ModuleResponder(object):
return src[:match.start()]
return src
def _make_negative_response(self, fullname):
return (fullname, None, None, None, ())
def _build_tuple(self, fullname):
if mitogen.core.is_blacklisted_import(self, fullname):
raise ImportError('blacklisted')
@ -512,13 +547,10 @@ class ModuleResponder(object):
path, source, is_pkg = self._finder.get_module_source(fullname)
if source is None:
LOG.error('_build_tuple(%r): could not locate source', fullname)
tup = fullname, None, None, None, ()
tup = self._make_negative_response(fullname)
self._cache[fullname] = tup
return tup
if source is None:
raise ImportError('could not find %r' % (fullname,))
if is_pkg:
pkg_present = get_child_modules(path)
LOG.debug('_build_tuple(%r, %r) -> %r',
@ -528,14 +560,20 @@ class ModuleResponder(object):
if fullname == '__main__':
source = self.neutralize_main(source)
compressed = zlib.compress(source, 9)
compressed = mitogen.core.Blob(zlib.compress(source, 9))
related = [
name
to_text(name)
for name in self._finder.find_related(fullname)
if not mitogen.core.is_blacklisted_import(self, name)
]
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related
tup = fullname, pkg_present, path, compressed, related
tup = (
to_text(fullname),
pkg_present,
to_text(path),
compressed,
related
)
self._cache[fullname] = tup
return tup
@ -563,6 +601,14 @@ class ModuleResponder(object):
def _send_module_and_related(self, stream, fullname):
try:
tup = self._build_tuple(fullname)
if tup[2] and is_stdlib_path(tup[2]):
# Prevent loading of 2.x<->3.x stdlib modules! This costs one
# RTT per hit, so a client-side solution is also required.
LOG.warning('%r: refusing to serve stdlib module %r',
self, fullname)
self._send_module_load_failed(stream, fullname)
return
for name in tup[4]: # related
parent, _, _ = name.partition('.')
if parent != fullname and parent not in stream.sent_modules:
@ -581,7 +627,7 @@ class ModuleResponder(object):
LOG.debug('%r._on_get_module(%r)', self, msg.data)
stream = self._router.stream_by_id(msg.src_id)
fullname = msg.data
fullname = msg.data.decode()
if fullname in stream.sent_modules:
LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream)
@ -592,7 +638,7 @@ class ModuleResponder(object):
if stream.remote_id != context.context_id:
stream.send(
mitogen.core.Message(
data='%s\x00%s' % (context.context_id, fullname),
data=b('%s\x00%s' % (context.context_id, fullname)),
handle=mitogen.core.FORWARD_MODULE,
dst_id=stream.remote_id,
)

@ -28,11 +28,12 @@
import sys
try:
from cStringIO import StringIO as BytesIO
from cStringIO import StringIO
except ImportError:
from io import BytesIO
from io import StringIO
import mitogen.core
if sys.version_info < (2, 7, 11):
from mitogen.compat import tokenize
@ -49,7 +50,9 @@ except ImportError:
def minimize_source(source):
"""Remove most comments and docstrings from Python source code.
"""
tokens = tokenize.generate_tokens(BytesIO(source).readline)
if not isinstance(source, mitogen.core.UnicodeType):
source = source.decode('utf-8')
tokens = tokenize.generate_tokens(StringIO(source).readline)
tokens = strip_comments(tokens)
tokens = strip_docstrings(tokens)
tokens = reindent(tokens)

@ -32,6 +32,7 @@ sent to any child context that is due to become a parent, due to recursive
connection.
"""
import codecs
import errno
import fcntl
import getpass
@ -52,11 +53,25 @@ import zlib
# Absolute imports for <2.5.
select = __import__('select')
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
try:
from functools import lru_cache
except ImportError:
from mitogen.compat.functools import lru_cache
import mitogen.core
from mitogen.core import b
from mitogen.core import LOG
from mitogen.core import IOLOG
if mitogen.core.PY3:
xrange = range
try:
SC_OPEN_MAX = os.sysconf('SC_OPEN_MAX')
except:
@ -281,9 +296,13 @@ def write_all(fd, s, deadline=None):
if timeout == 0:
raise mitogen.core.TimeoutError('write timed out')
if mitogen.core.PY3:
window = memoryview(s)[written:]
else:
window = buffer(s, written)
for fd in poller.poll(timeout):
n, disconnected = mitogen.core.io_op(
os.write, fd, buffer(s, written))
n, disconnected = mitogen.core.io_op(os.write, fd, window)
if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
@ -321,7 +340,7 @@ def iter_read(fds, deadline=None):
if not poller.readers:
raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],)
(b('').join(bits)[-300:],)
)
raise mitogen.core.TimeoutError('read timed out')
@ -380,14 +399,18 @@ def upgrade_router(econtext):
def make_call_msg(fn, *args, **kwargs):
if isinstance(fn, types.MethodType) and \
isinstance(fn.im_self, (type, types.ClassType)):
klass = fn.im_self.__name__
klass = mitogen.core.to_text(fn.im_self.__name__)
else:
klass = None
return mitogen.core.Message.pickled(
(fn.__module__, klass, fn.__name__, args, kwargs),
handle=mitogen.core.CALL_FUNCTION,
tup = (
mitogen.core.to_text(fn.__module__),
klass,
mitogen.core.to_text(fn.__name__),
args,
mitogen.core.Kwargs(kwargs)
)
return mitogen.core.Message.pickled(tup, handle=mitogen.core.CALL_FUNCTION)
def stream_by_method_name(name):
@ -395,9 +418,9 @@ def stream_by_method_name(name):
Given the name of a Mitogen connection method, import its implementation
module and return its Stream subclass.
"""
if name == 'local':
name = 'parent'
module = mitogen.core.import_module('mitogen.' + name)
if name == u'local':
name = u'parent'
module = mitogen.core.import_module(u'mitogen.' + name)
return module.Stream
@ -412,18 +435,18 @@ def _proxy_connect(name, method_name, kwargs, econtext):
)
except mitogen.core.StreamError:
return {
'id': None,
'name': None,
'msg': 'error occurred on host %s: %s' % (
u'id': None,
u'name': None,
u'msg': 'error occurred on host %s: %s' % (
socket.gethostname(),
sys.exc_info()[1],
),
}
return {
'id': context.context_id,
'name': context.name,
'msg': None,
u'id': context.context_id,
u'name': context.name,
u'msg': None,
}
@ -790,11 +813,15 @@ class Stream(mitogen.core.Stream):
sys.executable += sys.version[:3]
os.environ['ARGV0']=sys.executable
os.execl(sys.executable,sys.executable+'(mitogen:CONTEXT_NAME)')
os.write(1,'MITO000\n')
os.write(1,'MITO000\n'.encode())
C=_(os.fdopen(0,'rb').read(PREAMBLE_COMPRESSED_LEN),'zip')
os.fdopen(W,'w',0).write(C)
os.fdopen(w,'w',0).write('PREAMBLE_LEN\n'+C)
os.write(1,'MITO001\n')
wfp=os.fdopen(W,'wb',0)
wfp.write(C)
Wfp=os.fdopen(w,'wb',0)
Wfp.write('PREAMBLE_LEN\n'.encode()+C)
wfp.close()
Wfp.close()
os.write(1,'MITO001\n'.encode())
def get_boot_command(self):
source = inspect.getsource(self._first_stage)
@ -806,7 +833,8 @@ class Stream(mitogen.core.Stream):
str(len(preamble_compressed)))
source = source.replace('PREAMBLE_LEN',
str(len(zlib.decompress(preamble_compressed))))
encoded = zlib.compress(source, 9).encode('base64').replace('\n', '')
compressed = zlib.compress(source.encode(), 9)
encoded = codecs.encode(compressed, 'base64').replace(b('\n'), b(''))
# We can't use bytes.decode() in 3.x since it was restricted to always
# return unicode, so codecs.decode() is used instead. In 3.x
# codecs.decode() requires a bytes object. Since we must be compatible
@ -815,7 +843,7 @@ class Stream(mitogen.core.Stream):
return [
self.python_path, '-c',
'import codecs,os,sys;_=codecs.decode;'
'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded,)
'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded.decode(),)
]
def get_econtext_config(self):
@ -840,11 +868,11 @@ class Stream(mitogen.core.Stream):
source += '\nExternalContext(%r).main()\n' % (
self.get_econtext_config(),
)
return zlib.compress(source, 9)
return zlib.compress(source.encode('utf-8'), 9)
create_child = staticmethod(create_child)
create_child_args = {}
name_prefix = 'local'
name_prefix = u'local'
def start_child(self):
args = self.get_boot_command()
@ -858,7 +886,7 @@ class Stream(mitogen.core.Stream):
def connect(self):
LOG.debug('%r.connect()', self)
self.pid, fd, extra_fd = self.start_child()
self.name = '%s.%s' % (self.name_prefix, self.pid)
self.name = u'%s.%s' % (self.name_prefix, self.pid)
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r',
@ -871,16 +899,18 @@ class Stream(mitogen.core.Stream):
raise
#: For ssh.py, this must be at least max(len('password'), len('debug1:'))
EC0_MARKER = 'MITO000\n'
EC1_MARKER = 'MITO001\n'
EC0_MARKER = mitogen.core.b('MITO000\n')
EC1_MARKER = mitogen.core.b('MITO001\n')
def _ec0_received(self):
LOG.debug('%r._ec0_received()', self)
write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, 'MITO001\n', self.connect_deadline)
discard_until(self.receive_side.fd, self.EC1_MARKER,
self.connect_deadline)
def _connect_bootstrap(self, extra_fd):
discard_until(self.receive_side.fd, 'MITO000\n', self.connect_deadline)
discard_until(self.receive_side.fd, self.EC0_MARKER,
self.connect_deadline)
self._ec0_received()
@ -976,7 +1006,7 @@ class RouteMonitor(object):
self.parent.send(
mitogen.core.Message(
handle=handle,
data=data,
data=data.encode('utf-8'),
)
)
@ -986,7 +1016,8 @@ class RouteMonitor(object):
stream, we're also responsible for broadcasting DEL_ROUTE upstream
if/when that child disconnects.
"""
self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id, stream.name)
self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id,
stream.name)
mitogen.core.listen(
obj=stream,
name='disconnect',
@ -1011,7 +1042,8 @@ class RouteMonitor(object):
if msg.is_dead:
return
target_id_s, _, target_name = msg.data.partition(':')
target_id_s, _, target_name = msg.data.partition(b(':'))
target_name = target_name.decode()
target_id = int(target_id_s)
self.router.context_by_id(target_id).name = target_name
stream = self.router.stream_by_id(msg.auth_id)
@ -1124,7 +1156,7 @@ class Router(mitogen.core.Router):
self._context_by_id[context_id] = context
return context
connection_timeout_msg = "Connection timed out."
connection_timeout_msg = u"Connection timed out."
def _connect(self, klass, name=None, **kwargs):
context_id = self.allocate_id()
@ -1145,11 +1177,11 @@ class Router(mitogen.core.Router):
def connect(self, method_name, name=None, **kwargs):
klass = stream_by_method_name(method_name)
kwargs.setdefault('debug', self.debug)
kwargs.setdefault('profiling', self.profiling)
kwargs.setdefault('unidirectional', self.unidirectional)
kwargs.setdefault(u'debug', self.debug)
kwargs.setdefault(u'profiling', self.profiling)
kwargs.setdefault(u'unidirectional', self.unidirectional)
via = kwargs.pop('via', None)
via = kwargs.pop(u'via', None)
if via is not None:
return self.proxy_connect(via, method_name, name=name, **kwargs)
return self._connect(klass, name=name, **kwargs)
@ -1158,43 +1190,43 @@ class Router(mitogen.core.Router):
resp = via_context.call(_proxy_connect,
name=name,
method_name=method_name,
kwargs=kwargs
kwargs=mitogen.core.Kwargs(kwargs),
)
if resp['msg'] is not None:
raise mitogen.core.StreamError(resp['msg'])
name = '%s.%s' % (via_context.name, resp['name'])
name = u'%s.%s' % (via_context.name, resp['name'])
context = self.context_class(self, resp['id'], name=name)
context.via = via_context
self._context_by_id[context.context_id] = context
return context
def docker(self, **kwargs):
return self.connect('docker', **kwargs)
return self.connect(u'docker', **kwargs)
def fork(self, **kwargs):
return self.connect('fork', **kwargs)
return self.connect(u'fork', **kwargs)
def jail(self, **kwargs):
return self.connect('jail', **kwargs)
return self.connect(u'jail', **kwargs)
def local(self, **kwargs):
return self.connect('local', **kwargs)
return self.connect(u'local', **kwargs)
def lxc(self, **kwargs):
return self.connect('lxc', **kwargs)
return self.connect(u'lxc', **kwargs)
def setns(self, **kwargs):
return self.connect('setns', **kwargs)
return self.connect(u'setns', **kwargs)
def su(self, **kwargs):
return self.connect('su', **kwargs)
return self.connect(u'su', **kwargs)
def sudo(self, **kwargs):
return self.connect('sudo', **kwargs)
return self.connect(u'sudo', **kwargs)
def ssh(self, **kwargs):
return self.connect('ssh', **kwargs)
return self.connect(u'ssh', **kwargs)
class ProcessMonitor(object):
@ -1251,7 +1283,8 @@ class ModuleForwarder(object):
if msg.is_dead:
return
context_id_s, _, fullname = msg.data.partition('\x00')
context_id_s, _, fullname = msg.data.partition(b('\x00'))
fullname = mitogen.core.to_text(fullname)
context_id = int(context_id_s)
stream = self.router.stream_by_id(context_id)
if stream.remote_id == mitogen.parent_id:
@ -1279,9 +1312,17 @@ class ModuleForwarder(object):
if msg.is_dead:
return
fullname = msg.data
self.importer._request_module(fullname,
lambda: self._on_cache_callback(msg, fullname)
fullname = msg.data.decode('utf-8')
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)
def _send_one_module(self, msg, tup):
self.router._async_route(
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _on_cache_callback(self, msg, fullname):

@ -38,6 +38,7 @@ import time
import mitogen.core
import mitogen.select
from mitogen.core import b
from mitogen.core import LOG
@ -48,6 +49,14 @@ _pool_pid = None
_pool_lock = threading.Lock()
if mitogen.core.PY3:
def func_code(func):
return func.__code__
else:
def func_code(func):
return func.func_code
@mitogen.core.takes_router
def get_or_create_pool(size=None, router=None):
global _pool
@ -221,7 +230,7 @@ class Invoker(object):
def _invoke(self, method_name, kwargs, msg):
method = getattr(self.service, method_name)
if 'msg' in method.func_code.co_varnames:
if 'msg' in func_code(method).co_varnames:
kwargs['msg'] = msg # TODO: hack
no_reply = getattr(method, 'mitogen_service__no_reply', False)
@ -371,7 +380,7 @@ class Service(object):
@classmethod
def name(cls):
return '%s.%s' % (cls.__module__, cls.__name__)
return u'%s.%s' % (cls.__module__, cls.__name__)
def __init__(self, router):
self.router = router
@ -468,7 +477,7 @@ class Pool(object):
def join(self):
for th in self._threads:
th.join()
for invoker in self._invoker_by_name.itervalues():
for invoker in self._invoker_by_name.values():
invoker.service.on_shutdown()
def get_invoker(self, name, msg):
@ -490,17 +499,21 @@ class Pool(object):
def _validate(self, msg):
tup = msg.unpickle(throw=False)
LOG.debug('_validate(): %r', tup)
LOG.debug('_validate(): %r', mitogen.core.PY3)
LOG.debug('_validate(): %r', list(map(type, tup)))
LOG.debug('_validate(): UnicodeType=%r', mitogen.core.UnicodeType)
if not (isinstance(tup, tuple) and
len(tup) == 3 and
isinstance(tup[0], basestring) and
isinstance(tup[1], basestring) and
isinstance(tup[0], mitogen.core.AnyTextType) and
isinstance(tup[1], mitogen.core.AnyTextType) and
isinstance(tup[2], dict)):
raise mitogen.core.CallError('Invalid message format.')
def _on_service_call(self, recv, msg):
try:
self._validate(msg)
service_name, method_name, kwargs = msg.unpickle()
try:
invoker = self.get_invoker(service_name, msg)
return invoker.invoke(method_name, kwargs, msg)
except mitogen.core.CallError:
@ -628,7 +641,7 @@ class PushFileService(Service):
@expose(policy=AllowParents())
@arg_spec({
'context': mitogen.core.Context,
'path': basestring,
'path': mitogen.core.FsPathTypes,
})
def propagate_to(self, context, path):
LOG.debug('%r.propagate_to(%r, %r)', self, context, path)
@ -651,7 +664,7 @@ class PushFileService(Service):
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
'path': basestring,
'path': mitogen.core.FsPathTypes,
'data': mitogen.core.Blob,
'context': mitogen.core.Context,
})
@ -667,7 +680,7 @@ class PushFileService(Service):
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
'path': basestring,
'path': mitogen.core.FsPathTypes,
'context': mitogen.core.Context,
})
def forward(self, path, context):
@ -752,7 +765,7 @@ class FileService(Service):
@expose(policy=AllowParents())
@arg_spec({
'path': basestring,
'path': mitogen.core.FsPathTypes,
})
def register(self, path):
"""
@ -801,7 +814,7 @@ class FileService(Service):
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE)
mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
@ -831,7 +844,7 @@ class FileService(Service):
@expose(policy=AllowAny())
@no_reply()
@arg_spec({
'path': basestring,
'path': mitogen.core.FsPathTypes,
'sender': mitogen.core.Sender,
})
def fetch(self, path, sender, msg):

@ -214,4 +214,4 @@ class Stream(mitogen.parent.Stream):
LOG.debug('Leader PID for %s container %r: %d',
self.kind, self.container, self.leader_pid)
super(Stream, self).connect()
self.name = 'setns.' + self.container
self.name = u'setns.' + self.container

@ -39,18 +39,19 @@ except ImportError:
from pipes import quote as shlex_quote
import mitogen.parent
from mitogen.core import b
LOG = logging.getLogger('mitogen')
# sshpass uses 'assword' because it doesn't lowercase the input.
PASSWORD_PROMPT = 'password'
PERMDENIED_PROMPT = 'permission denied'
HOSTKEY_REQ_PROMPT = 'are you sure you want to continue connecting (yes/no)?'
HOSTKEY_FAIL = 'host key verification failed.'
PASSWORD_PROMPT = b('password')
PERMDENIED_PROMPT = b('permission denied')
HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?')
HOSTKEY_FAIL = b('host key verification failed.')
DEBUG_PREFIXES = ('debug1:', 'debug2:', 'debug3:')
DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:'))
def filter_debug(stream, it):
@ -62,7 +63,7 @@ def filter_debug(stream, it):
lines such as the password prompt.
"""
state = 'start_of_line'
buf = ''
buf = b('')
for chunk in it:
buf += chunk
while buf:
@ -78,13 +79,13 @@ def filter_debug(stream, it):
else:
state = 'in_plain'
elif state == 'in_debug':
if '\n' not in buf:
if b('\n') not in buf:
break
line, _, buf = buf.partition('\n')
line, _, buf = buf.partition(b('\n'))
LOG.debug('%r: %s', stream, line.rstrip())
state = 'start_of_line'
elif state == 'in_plain':
line, nl, buf = buf.partition('\n')
line, nl, buf = buf.partition(b('\n'))
yield line + nl
if nl:
state = 'start_of_line'
@ -189,9 +190,9 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'ssh.' + self.hostname
self.name = u'ssh.' + mitogen.core.to_text(self.hostname)
if self.port:
self.name += ':%s' % (self.port,)
self.name += u':%s' % (self.port,)
auth_incorrect_msg = 'SSH authentication is incorrect'
password_incorrect_msg = 'SSH password is incorrect'
@ -249,7 +250,7 @@ class Stream(mitogen.parent.Stream):
if self.password is None:
raise PasswordError(self.password_required_msg)
LOG.debug('%r: sending password', self)
self.tty_stream.transmit_side.write(self.password + '\n')
self.tty_stream.transmit_side.write((self.password + '\n').encode())
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')

@ -31,6 +31,7 @@ import os
import mitogen.core
import mitogen.parent
from mitogen.core import b
LOG = logging.getLogger(__name__)
@ -54,11 +55,11 @@ class Stream(mitogen.parent.Stream):
username = 'root'
password = None
su_path = 'su'
password_prompt = 'password:'
password_prompt = b('password:')
incorrect_prompts = (
'su: sorry', # BSD
'su: authentication failure', # Linux
'su: incorrect password', # CentOS 6
b('su: sorry'), # BSD
b('su: authentication failure'), # Linux
b('su: incorrect password'), # CentOS 6
)
def construct(self, username=None, password=None, su_path=None,
@ -77,7 +78,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'su.' + self.username
self.name = u'su.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
super(Stream, self).on_disconnect(broker)
@ -110,6 +111,8 @@ class Stream(mitogen.parent.Stream):
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.transmit_side.write(self.password + '\n')
self.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')

@ -33,10 +33,11 @@ import time
import mitogen.core
import mitogen.parent
from mitogen.core import b
LOG = logging.getLogger(__name__)
PASSWORD_PROMPT = 'password'
PASSWORD_PROMPT = b('password')
SUDO_OPTIONS = [
#(False, 'bool', '--askpass', '-A')
#(False, 'str', '--auth-type', '-a')
@ -135,7 +136,7 @@ class Stream(mitogen.parent.Stream):
def connect(self):
super(Stream, self).connect()
self.name = 'sudo.' + self.username
self.name = u'sudo.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
@ -176,7 +177,8 @@ class Stream(mitogen.parent.Stream):
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.tty_stream.transmit_side.write(self.password + '\n')
self.tty_stream.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')

@ -73,7 +73,7 @@ class Listener(mitogen.core.BasicStream):
os.unlink(self.path)
self._sock.bind(self.path)
os.chmod(self.path, 0600)
os.chmod(self.path, int('0600', 8))
self._sock.listen(backlog)
self.receive_side = mitogen.core.Side(self, self._sock.fileno())
router.broker.start_receive(self)
@ -87,7 +87,7 @@ class Listener(mitogen.core.BasicStream):
context = mitogen.parent.Context(self._router, context_id)
stream = mitogen.core.Stream(self._router, context_id)
stream.accept(sock.fileno(), sock.fileno())
stream.name = 'unix_client.%d' % (pid,)
stream.name = u'unix_client.%d' % (pid,)
stream.auth_id = mitogen.context_id
stream.is_privileged = True
self._router.register(context, stream)
@ -111,7 +111,7 @@ def connect(path, broker=None):
router = mitogen.master.Router(broker=broker)
stream = mitogen.core.Stream(router, remote_id)
stream.accept(sock.fileno(), sock.fileno())
stream.name = 'unix_listener.%d' % (pid,)
stream.name = u'unix_listener.%d' % (pid,)
context = mitogen.parent.Context(router, remote_id)
router.register(context, stream)

@ -39,6 +39,11 @@ import mitogen.master
LOG = logging.getLogger('mitogen')
iteritems = getattr(dict, 'iteritems', dict.items)
if mitogen.core.PY3:
iteritems = dict.items
else:
iteritems = dict.iteritems
def disable_site_packages():
for entry in sys.path[:]:
@ -118,9 +123,9 @@ def cast(obj):
return [cast(v) for v in obj]
if isinstance(obj, PASSTHROUGH):
return obj
if isinstance(obj, unicode):
return unicode(obj)
if isinstance(obj, str):
return str(obj)
if isinstance(obj, mitogen.core.UnicodeType):
return mitogen.core.UnicodeType(obj)
if isinstance(obj, mitogen.core.BytesType):
return mitogen.core.BytesType(obj)
raise TypeError("Cannot serialize: %r: %r" % (type(obj), obj))

@ -19,11 +19,11 @@ router = mitogen.master.Router()
context = mitogen.parent.Context(router, 0)
stream = mitogen.ssh.Stream(router, 0, max_message_size=0, hostname='foo')
print 'SSH command size: %s' % (len(' '.join(stream.get_boot_command())),)
print 'Preamble size: %s (%.2fKiB)' % (
print('SSH command size: %s' % (len(' '.join(stream.get_boot_command())),))
print('Preamble size: %s (%.2fKiB)' % (
len(stream.get_preamble()),
len(stream.get_preamble()) / 1024.0,
)
))
print(
' '

@ -48,7 +48,6 @@ setup(
license = 'New BSD',
url = 'https://github.com/dw/mitogen/',
packages = find_packages(exclude=['tests', 'examples']),
use_2to3=True,
zip_safe = False,
classifiers = [
'Development Status :: 3 - Alpha',

@ -55,8 +55,8 @@ for suffix in suffixes:
tofile='mitogen-output.txt',
))
if diff:
print '++ differ! suffix: %r' % (suffix,)
print('++ differ! suffix: %r' % (suffix,))
for line in diff:
print line
print(line)
print
print

@ -23,7 +23,7 @@
- assert:
that: |
out.content.decode('base64') == '{"I am JSON": true}'
out.content|b64decode == '{"I am JSON": true}'
# Ensure it handles strings.
@ -41,7 +41,7 @@
- assert:
that:
out.content.decode('base64') == 'I am text.'
out.content|b64decode == 'I am text.'
- file:
path: /tmp/transfer-data

@ -4,9 +4,9 @@ import json
from ansible.module_utils.basic import path
def main():
print json.dumps({
print(json.dumps({
'path': path()
})
}))
if __name__ == '__main__':
main()

@ -21,4 +21,4 @@
- assert:
that:
- out.argv_types == ["<type 'str'>"]
- out.argv_types_correct

@ -1,6 +1,8 @@
from __future__ import unicode_literals
import io
from ansible.module_utils import six
try:
from ansible.plugins import callback_loader
except ImportError:
@ -25,14 +27,16 @@ def printi(tio, obj, key=None, indent=0):
write(']')
elif isinstance(obj, dict):
write('{')
for key2, obj2 in sorted(obj.iteritems()):
for key2, obj2 in sorted(six.iteritems(obj)):
if not (key2.startswith('_ansible_') or
key2.endswith('_lines')):
printi(tio, obj2, key=key2, indent=indent+1)
key = None
write('}')
elif isinstance(obj, basestring):
if isinstance(obj, str):
elif isinstance(obj, six.text_type):
for line in obj.splitlines():
write('%s', line.rstrip('\r\n'))
elif isinstance(obj, six.binary_type):
obj = obj.decode('utf-8', 'replace')
for line in obj.splitlines():
write('%s', line.rstrip('\r\n'))
@ -47,7 +51,7 @@ class CallbackModule(DefaultModule):
try:
tio = io.StringIO()
printi(tio, result)
return tio.getvalue().encode('ascii', 'replace')
return tio.getvalue() #.encode('ascii', 'replace')
except:
import traceback
traceback.print_exc()

@ -3,6 +3,7 @@
# interpreter I run within.
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils import six
import os
import pwd
@ -16,7 +17,7 @@ def main():
python_version=sys.version[:3],
argv=sys.argv,
__file__=__file__,
argv_types=[str(type(s)) for s in sys.argv],
argv_types_correct=all(type(s) is str for s in sys.argv),
env=dict(os.environ),
cwd=os.getcwd(),
python_path=sys.path,

@ -6,8 +6,8 @@ import sys
json_arguments = """<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>"""
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"input\": [%s]" % (json_arguments,)
print "}"
print("{")
print(" \"changed\": false,")
print(" \"msg\": \"Here is my input\",")
print(" \"input\": [%s]" % (json_arguments,))
print("}")

@ -13,8 +13,8 @@ def usage():
input_json = sys.stdin.read()
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"input\": [%s]" % (input_json,)
print "}"
print("{")
print(" \"changed\": false,")
print(" \"msg\": \"Here is my input\",")
print(" \"input\": [%s]" % (input_json,))
print("}")

@ -14,13 +14,13 @@ def usage():
input_json = sys.stdin.read()
print "{"
print " \"changed\": false,"
print("{")
print(" \"changed\": false,")
# v2.5.1. apt.py started depending on this.
# https://github.com/dw/mitogen/issues/210
print " \"__file__\": \"%s\"," % (__file__,)
print(" \"__file__\": \"%s\"," % (__file__,))
# Python sets this during a regular import.
print " \"__package__\": \"%s\"," % (__package__,)
print " \"msg\": \"Here is my input\","
print " \"input\": [%s]" % (input_json,)
print "}"
print(" \"__package__\": \"%s\"," % (__package__,))
print(" \"msg\": \"Here is my input\",")
print(" \"input\": [%s]" % (input_json,))
print("}")

@ -25,9 +25,9 @@ try:
except IOError:
usage()
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"source\": [%s]," % (json.dumps(me),)
print " \"input\": [%s]" % (input_json,)
print "}"
print("{")
print(" \"changed\": false,")
print(" \"msg\": \"Here is my input\",")
print(" \"source\": [%s]," % (json.dumps(me),))
print(" \"input\": [%s]" % (input_json,))
print("}")

@ -10,13 +10,13 @@ import time
@mitogen.main() #(log_level='DEBUG')
def main(router):
for x in xrange(1000):
for x in range(1000):
t = time.time()
f = router.local()# debug=True)
tt = time.time()
print x, 1000 * (tt - t)
print(x, 1000 * (tt - t))
print f
print 'EEK', f.call(socket.gethostname)
print 'MY PID', os.getpid()
print 'EEKERY', f.call(os.getpid)
print(f)
print('EEK', f.call(socket.gethostname))
print('MY PID', os.getpid())
print('EEKERY', f.call(os.getpid))

@ -6,12 +6,12 @@ import time
times = []
for x in xrange(5):
for x in range(5):
t0 = time.time()
os.spawnvp(os.P_WAIT, sys.argv[1], sys.argv[1:])
t = time.time() - t0
times.append(t)
print '+++', t
print('+++', t)
print 'all:', times
print 'min %s max %s diff %s' % (min(times), max(times), (max(times) - min(times)))
print('all:', times)
print('min %s max %s diff %s' % (min(times), max(times), (max(times) - min(times))))

@ -1,26 +1,31 @@
import os
import pickle
import sys
import unittest2
import mitogen.core
class MyError(Exception):
pass
class ConstructorTest(unittest2.TestCase):
klass = mitogen.core.CallError
def test_string_noargs(self):
e = self.klass('%s%s')
self.assertEquals(e[0], '%s%s')
self.assertEquals(e.args[0], '%s%s')
def test_string_args(self):
e = self.klass('%s%s', 1, 1)
self.assertEquals(e[0], '11')
self.assertEquals(e.args[0], '11')
def test_from_exc(self):
ve = ValueError('eek')
ve = MyError('eek')
e = self.klass(ve)
self.assertEquals(e[0], 'exceptions.ValueError: eek')
self.assertEquals(e.args[0], '__main__.MyError: eek')
def test_form_base_exc(self):
ve = SystemExit('eek')
@ -29,12 +34,13 @@ class ConstructorTest(unittest2.TestCase):
def test_from_exc_tb(self):
try:
raise ValueError('eek')
except ValueError, ve:
raise MyError('eek')
except MyError:
ve = sys.exc_info()[1]
e = self.klass(ve)
self.assertTrue(e[0].startswith('exceptions.ValueError: eek'))
self.assertTrue('test_from_exc_tb' in e[0])
self.assertTrue(e.args[0].startswith('__main__.MyError: eek'))
self.assertTrue('test_from_exc_tb' in e.args[0])
class PickleTest(unittest2.TestCase):
@ -43,28 +49,29 @@ class PickleTest(unittest2.TestCase):
def test_string_noargs(self):
e = self.klass('%s%s')
e2 = pickle.loads(pickle.dumps(e))
self.assertEquals(e2[0], '%s%s')
self.assertEquals(e2.args[0], '%s%s')
def test_string_args(self):
e = self.klass('%s%s', 1, 1)
e2 = pickle.loads(pickle.dumps(e))
self.assertEquals(e2[0], '11')
self.assertEquals(e2.args[0], '11')
def test_from_exc(self):
ve = ValueError('eek')
ve = MyError('eek')
e = self.klass(ve)
e2 = pickle.loads(pickle.dumps(e))
self.assertEquals(e2[0], 'exceptions.ValueError: eek')
self.assertEquals(e2.args[0], '__main__.MyError: eek')
def test_from_exc_tb(self):
try:
raise ValueError('eek')
except ValueError, ve:
raise MyError('eek')
except MyError:
ve = sys.exc_info()[1]
e = self.klass(ve)
e2 = pickle.loads(pickle.dumps(e))
self.assertTrue(e2[0].startswith('exceptions.ValueError: eek'))
self.assertTrue('test_from_exc_tb' in e2[0])
self.assertTrue(e2.args[0].startswith('__main__.MyError: eek'))
self.assertTrue('test_from_exc_tb' in e2.args[0])
if __name__ == '__main__':

@ -9,6 +9,10 @@ import mitogen.master
import testlib
class MyError(Exception):
pass
class CrazyType(object):
pass
@ -18,7 +22,7 @@ def function_that_adds_numbers(x, y):
def function_that_fails():
raise ValueError('exception text')
raise MyError('exception text')
def func_with_bad_return_value():
@ -49,7 +53,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
s = str(exc)
etype, _, s = s.partition(': ')
self.assertEqual(etype, 'exceptions.ValueError')
self.assertEqual(etype, '__main__.MyError')
msg, _, s = s.partition('\n')
self.assertEqual(msg, 'exception text')
@ -61,7 +65,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
exc = self.assertRaises(mitogen.core.StreamError,
lambda: self.local.call(func_with_bad_return_value))
self.assertEquals(
exc[0],
exc.args[0],
"cannot unpickle '%s'/'CrazyType'" % (__name__,),
)
@ -72,7 +76,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
self.broker.defer(stream.on_disconnect, self.broker)
exc = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(exc[0], mitogen.core.ChannelError.local_msg)
self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg)
def test_aborted_on_local_broker_shutdown(self):
stream = self.router._stream_by_id[self.local.context_id]
@ -81,7 +85,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
self.broker.shutdown()
exc = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(exc[0], mitogen.core.ChannelError.local_msg)
self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg)
def test_accepts_returns_context(self):
context = self.local.call(func_accepts_returns_context, self.local)

@ -3,4 +3,4 @@ import sys
def say_hi():
print 'hi'
print('hi')

@ -12,4 +12,4 @@ def repr_stuff():
@mitogen.main()
def main(router):
context = router.local()
print context.call(repr_stuff)
print(context.call(repr_stuff))

@ -30,8 +30,8 @@ import os.path
try:
import six as _system_six
print('six_brokenpkg: using system six:', _system_six)
except ImportError, e:
print('six_brokenpkg: no system six available', e)
except ImportError:
print('six_brokenpkg: no system six available')
_system_six = None
if _system_six:

@ -4,6 +4,8 @@ import subprocess
import unittest2
import mitogen.parent
from mitogen.core import b
import testlib
@ -36,7 +38,7 @@ class CommandLineTest(testlib.RouterMixin, testlib.TestCase):
stdout, stderr = proc.communicate()
self.assertEquals(0, proc.returncode)
self.assertEquals(mitogen.parent.Stream.EC0_MARKER, stdout)
self.assertIn("Error -5 while decompressing data: incomplete or truncated stream", stderr)
self.assertIn(b("Error -5 while decompressing data: incomplete or truncated stream"), stderr)
if __name__ == '__main__':

@ -19,8 +19,12 @@ PLATFORM_TO_PATH = {
('darwin', True): '/usr/lib/libssl.dylib',
('linux2', False): '/usr/lib/libssl.so',
('linux2', True): '/usr/lib/x86_64-linux-gnu/libssl.so',
# Python 2.6
('linux3', False): '/usr/lib/libssl.so',
('linux3', True): '/usr/lib/x86_64-linux-gnu/libssl.so',
# Python 3
('linux', False): '/usr/lib/libssl.so',
('linux', True): '/usr/lib/x86_64-linux-gnu/libssl.so',
}
c_ssl = ctypes.CDLL(PLATFORM_TO_PATH[sys.platform, IS_64BIT])

@ -15,7 +15,7 @@ def allocate_an_id(econtext):
class SlaveTest(testlib.RouterMixin, testlib.TestCase):
def test_slave_allocates_id(self):
context = self.router.local()
context = self.router.local(python_path='/users/dmw/src/cpython/build/bin/python2')
# Master's allocator named the context 1.
self.assertEquals(1, context.context_id)

@ -11,6 +11,7 @@ import unittest2
import mitogen.core
import mitogen.utils
from mitogen.core import b
import testlib
@ -45,7 +46,7 @@ class ImporterMixin(testlib.RouterMixin):
class LoadModuleTest(ImporterMixin, testlib.TestCase):
data = zlib.compress("data = 1\n\n")
data = zlib.compress(b("data = 1\n\n"))
path = 'fake_module.py'
modname = 'fake_module'
@ -83,7 +84,7 @@ class LoadModuleTest(ImporterMixin, testlib.TestCase):
class LoadSubmoduleTest(ImporterMixin, testlib.TestCase):
data = zlib.compress("data = 1\n\n")
data = zlib.compress(b("data = 1\n\n"))
path = 'fake_module.py'
modname = 'mypkg.fake_module'
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related
@ -96,7 +97,7 @@ class LoadSubmoduleTest(ImporterMixin, testlib.TestCase):
class LoadModulePackageTest(ImporterMixin, testlib.TestCase):
data = zlib.compress("func = lambda: 1\n\n")
data = zlib.compress(b("func = lambda: 1\n\n"))
path = 'fake_pkg/__init__.py'
modname = 'fake_pkg'
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related

@ -1,4 +1,5 @@
import sys
import threading
import unittest2
@ -66,7 +67,8 @@ class ThreadedGetTest(testlib.TestCase):
def _worker(self, func):
try:
self.results.append(func())
except Exception, e:
except Exception:
e = sys.exc_info()[1]
self.results.append(None)
self.excs.append(e)
@ -89,12 +91,12 @@ class ThreadedGetTest(testlib.TestCase):
def test_five_threads(self):
latch = self.klass()
for x in xrange(5):
for x in range(5):
self.start_one(lambda: latch.get(timeout=3.0))
for x in xrange(5):
for x in range(5):
latch.put(x)
self.join()
self.assertEquals(sorted(self.results), range(5))
self.assertEquals(sorted(self.results), list(range(5)))
self.assertEquals(self.excs, [])
@ -171,7 +173,8 @@ class ThreadedCloseTest(testlib.TestCase):
def _worker(self, func):
try:
self.results.append(func())
except Exception, e:
except Exception:
e = sys.exc_info()[1]
self.results.append(None)
self.excs.append(e)
@ -195,7 +198,7 @@ class ThreadedCloseTest(testlib.TestCase):
def test_five_threads(self):
latch = self.klass()
for x in xrange(5):
for x in range(5):
self.start_one(lambda: latch.get(timeout=3.0))
latch.close()
self.join()

@ -188,7 +188,6 @@ class FindRelatedTest(testlib.TestCase):
SIMPLE_EXPECT = set([
'mitogen',
'mitogen.compat',
'mitogen.compat.collections',
'mitogen.compat.functools',
'mitogen.core',
'mitogen.master',

@ -9,7 +9,6 @@ class NestedTest(testlib.RouterMixin, testlib.TestCase):
def test_nested(self):
context = None
for x in range(1, 11):
#print 'Connect local%d via %s' % (x, context)
context = self.router.local(via=context, name='local%d' % x)
pid = context.call(os.getpid)

@ -6,7 +6,7 @@ import testlib
def yield_stuff_then_die(sender):
for x in xrange(5):
for x in range(5):
sender.send(x)
sender.close()
return 10

@ -1,4 +1,3 @@
import Queue
import StringIO
import logging
import subprocess
@ -11,6 +10,11 @@ import mitogen.master
import mitogen.parent
import mitogen.utils
try:
import Queue
except ImportError:
import queue as Queue
def ping():
return True

@ -6,4 +6,4 @@ daemon from the environment.
"""
import testlib
print testlib.get_docker_host()
print(testlib.get_docker_host())

@ -26,10 +26,10 @@ def cons():
try:
while 1:
g = l.get()
print 'got=%s consumed=%s produced=%s crash=%s' % (g, consumed, produced, crash)
print('got=%s consumed=%s produced=%s crash=%s' % (g, consumed, produced, crash))
consumed += 1
time.sleep(g)
for x in xrange(int(g * 1000)):
for x in range(int(g * 1000)):
pass
except:
crash += 1

@ -1,3 +1,5 @@
import sys
import mitogen
import mitogen.ssh
import mitogen.utils
@ -53,8 +55,8 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
username='mitogen__has_sudo',
)
assert 0, 'exception not thrown'
except mitogen.ssh.PasswordError, e:
pass
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e[0], self.stream_class.password_required_msg)
@ -65,8 +67,8 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
password='badpw',
)
assert 0, 'exception not thrown'
except mitogen.ssh.PasswordError, e:
pass
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e[0], self.stream_class.password_incorrect_msg)
@ -87,8 +89,8 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
username='mitogen__has_sudo_pubkey',
)
assert 0, 'exception not thrown'
except mitogen.ssh.PasswordError, e:
pass
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e[0], self.stream_class.password_required_msg)

@ -1,5 +1,4 @@
import StringIO
import logging
import os
import random
@ -8,7 +7,6 @@ import socket
import subprocess
import sys
import time
import urlparse
import unittest2
@ -16,6 +14,16 @@ import mitogen.core
import mitogen.master
import mitogen.utils
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
sys.path.append(DATA_DIR)
@ -28,7 +36,7 @@ def data_path(suffix):
path = os.path.join(DATA_DIR, suffix)
if path.endswith('.key'):
# SSH is funny about private key permissions.
os.chmod(path, 0600)
os.chmod(path, int('0600', 8))
return path
@ -82,7 +90,7 @@ def wait_for_port(
return
sock.settimeout(receive_timeout)
data = ''
data = mitogen.core.b('')
found = False
while time.time() < end:
try:
@ -97,21 +105,25 @@ def wait_for_port(
break
data += resp
if re.search(pattern, data):
if re.search(mitogen.core.b(pattern), data):
found = True
break
try:
sock.shutdown(socket.SHUT_RDWR)
except socket.error, e:
# On Mac OS X - a BSD variant - the above code only succeeds if the operating system thinks that the
# socket is still open when shutdown() is invoked. If Python is too slow and the FIN packet arrives
# before that statement can be reached, then OS X kills the sock.shutdown() statement with:
except socket.error:
e = sys.exc_info()[1]
# On Mac OS X - a BSD variant - the above code only succeeds if the
# operating system thinks that the socket is still open when
# shutdown() is invoked. If Python is too slow and the FIN packet
# arrives before that statement can be reached, then OS X kills the
# sock.shutdown() statement with:
#
# socket.error: [Errno 57] Socket is not connected
#
# Protect shutdown() with a try...except that catches the socket.error, test to make sure Errno is
# right, and ignore it if Errno matches.
# Protect shutdown() with a try...except that catches the
# socket.error, test to make sure Errno is right, and ignore it if
# Errno matches.
if e.errno == 57:
pass
else:
@ -147,7 +159,7 @@ def sync_with_broker(broker, timeout=10.0):
class LogCapturer(object):
def __init__(self, name=None):
self.sio = StringIO.StringIO()
self.sio = StringIO()
self.logger = logging.getLogger(name)
self.handler = logging.StreamHandler(self.sio)
self.old_propagate = self.logger.propagate
@ -169,9 +181,11 @@ class TestCase(unittest2.TestCase):
raised. Can't use context manager because tests must run on Python2.4"""
try:
func(*args, **kwargs)
except exc, e:
except exc:
e = sys.exc_info()[1]
return e
except BaseException, e:
except BaseException:
e = sys.exc_info()[1]
assert 0, '%r raised %r, not %r' % (func, e, exc)
assert 0, '%r did not raise %r' % (func, exc)

@ -0,0 +1,33 @@
import logging
import time
import unittest2
import mitogen.core
import mitogen.master
import testlib
def roundtrip(*args):
return args
class TwoThreeCompatTest(testlib.RouterMixin, testlib.TestCase):
if mitogen.core.PY3:
python_path = 'python2'
else:
python_path = 'python3'
def test_succeeds(self):
self.router.enable_debug()
spare = self.router.fork()
target = self.router.local(python_path=self.python_path)
spare2, = target.call(roundtrip, spare)
self.assertEquals(spare.context_id, spare2.context_id)
self.assertEquals(spare.name, spare2.name)
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save