Merge remote-tracking branch 'origin/dmw'

- issue #411: fix check_host_keys="accept"
- issue #305: dead message if max message size exceeded
- issue #369: implement Connection.reset()
- issue #76: disconnect propagation
- log format string fixes
- various 2/3 test fixes
- large message benchmark
- centralize stub client utils in data/stubs/
- activate faulthandler in tests
- better OpenSSH 7.5+ permission denied handling
issue260
David Wilson 6 years ago
commit 9ba0561dd2

1
.gitignore vendored

@ -1,6 +1,7 @@
.coverage .coverage
.tox .tox
.venv .venv
venvs/**
**/.DS_Store **/.DS_Store
*.pyc *.pyc
*.pyd *.pyd

@ -14,7 +14,8 @@ cache:
- /home/travis/virtualenv - /home/travis/virtualenv
install: install:
- pip install -r dev_requirements.txt # |cat to disable progress bar.
- pip install -r dev_requirements.txt |cat
script: script:
- | - |

@ -34,6 +34,7 @@ with ci_lib.Fold('job_setup'):
os.chdir(TESTS_DIR) os.chdir(TESTS_DIR)
os.chmod('../data/docker/mitogen__has_sudo_pubkey.key', int('0600', 7)) os.chmod('../data/docker/mitogen__has_sudo_pubkey.key', int('0600', 7))
run("pip install -qr requirements.txt") # tests/ansible/requirements
# Don't set -U as that will upgrade Paramiko to a non-2.6 compatible version. # Don't set -U as that will upgrade Paramiko to a non-2.6 compatible version.
run("pip install -q ansible==%s", ci_lib.ANSIBLE_VERSION) run("pip install -q ansible==%s", ci_lib.ANSIBLE_VERSION)

@ -32,6 +32,17 @@ def subprocess__check_output(*popenargs, **kwargs):
if not hasattr(subprocess, 'check_output'): if not hasattr(subprocess, 'check_output'):
subprocess.check_output = subprocess__check_output subprocess.check_output = subprocess__check_output
# -----------------
# Force stdout FD 1 to be a pipe, so tools like pip don't spam progress bars.
sys.stdout = os.popen('stdbuf -oL cat', 'w', 1)
os.dup2(sys.stdout.fileno(), 1)
sys.stderr = sys.stdout
os.dup2(sys.stderr.fileno(), 2)
# ----------------- # -----------------
def _argv(s, *args): def _argv(s, *args):

@ -27,7 +27,7 @@ mkdir "$TMPDIR"
echo travis_fold:start:job_setup echo travis_fold:start:job_setup
pip install -qqqU debops==0.7.2 ansible==${ANSIBLE_VERSION} pip install -qqqU debops==0.7.2 ansible==${ANSIBLE_VERSION} |cat
debops-init "$TMPDIR/project" debops-init "$TMPDIR/project"
cd "$TMPDIR/project" cd "$TMPDIR/project"

@ -427,16 +427,30 @@ def config_from_hostvars(transport, inventory_name, connection,
class CallChain(mitogen.parent.CallChain): class CallChain(mitogen.parent.CallChain):
"""
Extend :class:`mitogen.parent.CallChain` to additionally cause the
associated :class:`Connection` to be reset if a ChannelError occurs.
This only catches failures that occur while a call is pnding, it is a
stop-gap until a more general method is available to notice connection in
every situation.
"""
call_aborted_msg = ( call_aborted_msg = (
'Mitogen was disconnected from the remote environment while a call ' 'Mitogen was disconnected from the remote environment while a call '
'was in-progress. If you feel this is in error, please file a bug. ' 'was in-progress. If you feel this is in error, please file a bug. '
'Original error was: %s' 'Original error was: %s'
) )
def __init__(self, connection, context, pipelined=False):
super(CallChain, self).__init__(context, pipelined)
#: The connection to reset on CallError.
self._connection = connection
def _rethrow(self, recv): def _rethrow(self, recv):
try: try:
return recv.get().unpickle() return recv.get().unpickle()
except mitogen.core.ChannelError as e: except mitogen.core.ChannelError as e:
self._connection.reset()
raise ansible.errors.AnsibleConnectionFailure( raise ansible.errors.AnsibleConnectionFailure(
self.call_aborted_msg % (e,) self.call_aborted_msg % (e,)
) )
@ -550,7 +564,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self.host_vars = task_vars['hostvars'] self.host_vars = task_vars['hostvars']
self.delegate_to_hostname = delegate_to_hostname self.delegate_to_hostname = delegate_to_hostname
self.loader_basedir = loader_basedir self.loader_basedir = loader_basedir
self.close(new_task=True) self._reset(mode='put')
def get_task_var(self, key, default=None): def get_task_var(self, key, default=None):
if self._task_vars and key in self._task_vars: if self._task_vars and key in self._task_vars:
@ -682,7 +696,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
raise ansible.errors.AnsibleConnectionFailure(dct['msg']) raise ansible.errors.AnsibleConnectionFailure(dct['msg'])
self.context = dct['context'] self.context = dct['context']
self.chain = CallChain(self.context, pipelined=True) self.chain = CallChain(self, self.context, pipelined=True)
if self._play_context.become: if self._play_context.become:
self.login_context = dct['via'] self.login_context = dct['via']
else: else:
@ -709,6 +723,20 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self.get_chain().call_no_reply(os.mkdir, self._shell.tmpdir) self.get_chain().call_no_reply(os.mkdir, self._shell.tmpdir)
return self._shell.tmpdir return self._shell.tmpdir
def _reset_tmp_path(self):
"""
Called by _reset(); ask the remote context to delete any temporary
directory created for the action. CallChain is not used here to ensure
exception is logged by the context on failure, since the CallChain
itself is about to be destructed.
"""
if getattr(self._shell, 'tmpdir', None) is not None:
self.context.call_no_reply(
ansible_mitogen.target.prune_tree,
self._shell.tmpdir,
)
self._shell.tmpdir = None
def _connect(self): def _connect(self):
""" """
Establish a connection to the master process's UNIX listener socket, Establish a connection to the master process's UNIX listener socket,
@ -727,38 +755,53 @@ class Connection(ansible.plugins.connection.ConnectionBase):
stack = self._build_stack() stack = self._build_stack()
self._connect_stack(stack) self._connect_stack(stack)
def close(self, new_task=False): def _reset(self, mode):
""" """
Arrange for the mitogen.master.Router running in the worker to Forget everything we know about the connected context.
gracefully shut down, and wait for shutdown to complete. Safe to call
multiple times. :param str mode:
Name of ContextService method to use to discard the context, either
'put' or 'reset'.
""" """
if getattr(self._shell, 'tmpdir', None) is not None: if not self.context:
# Avoid CallChain to ensure exception is logged on failure. return
self.context.call_no_reply(
ansible_mitogen.target.prune_tree,
self._shell.tmpdir,
)
self._shell.tmpdir = None
if self.context: self._reset_tmp_path()
self.chain.reset() self.chain.reset()
self.parent.call_service( self.parent.call_service(
service_name='ansible_mitogen.services.ContextService', service_name='ansible_mitogen.services.ContextService',
method_name='put', method_name=mode,
context=self.context context=self.context
) )
self.context = None self.context = None
self.login_context = None self.login_context = None
self.init_child_result = None self.init_child_result = None
self.chain = None self.chain = None
if self.broker and not new_task:
def close(self):
"""
Arrange for the mitogen.master.Router running in the worker to
gracefully shut down, and wait for shutdown to complete. Safe to call
multiple times.
"""
self._reset(mode='put')
if self.broker:
self.broker.shutdown() self.broker.shutdown()
self.broker.join() self.broker.join()
self.broker = None self.broker = None
self.router = None self.router = None
def reset(self):
"""
Explicitly terminate the connection to the remote host. This discards
any local state we hold for the connection, returns the Connection to
the 'disconnected' state, and informs ContextService the connection is
bad somehow, and should be shut down and discarded.
"""
self._connect()
self._reset(mode='reset')
def get_chain(self, use_login=False, use_fork=False): def get_chain(self, use_login=False, use_fork=False):
""" """
Return the :class:`mitogen.parent.CallChain` to use for executing Return the :class:`mitogen.parent.CallChain` to use for executing

@ -89,6 +89,24 @@ def _get_candidate_temp_dirs():
return mitogen.utils.cast(dirs) return mitogen.utils.cast(dirs)
def key_from_dict(**kwargs):
"""
Return a unique string representation of a dict as quickly as possible.
Used to generated deduplication keys from a request.
"""
out = []
stack = [kwargs]
while stack:
obj = stack.pop()
if isinstance(obj, dict):
stack.extend(sorted(obj.items()))
elif isinstance(obj, (list, tuple)):
stack.extend(obj)
else:
out.append(str(obj))
return ''.join(out)
class Error(Exception): class Error(Exception):
pass pass
@ -113,7 +131,7 @@ class ContextService(mitogen.service.Service):
super(ContextService, self).__init__(*args, **kwargs) super(ContextService, self).__init__(*args, **kwargs)
self._lock = threading.Lock() self._lock = threading.Lock()
#: Records the :meth:`get` result dict for successful calls, returned #: Records the :meth:`get` result dict for successful calls, returned
#: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`. #: for identical subsequent calls. Keyed by :meth:`key_from_dict`.
self._response_by_key = {} self._response_by_key = {}
#: List of :class:`mitogen.core.Latch` awaiting the result for a #: List of :class:`mitogen.core.Latch` awaiting the result for a
#: particular key. #: particular key.
@ -126,8 +144,27 @@ class ContextService(mitogen.service.Service):
#: :attr:`max_interpreters` is reached, the most recently used context #: :attr:`max_interpreters` is reached, the most recently used context
#: is destroyed to make room for any additional context. #: is destroyed to make room for any additional context.
self._lru_by_via = {} self._lru_by_via = {}
#: :meth:`key_from_kwargs` result by Context. #: :func:`key_from_dict` result by Context.
self._key_by_context = {} self._key_by_context = {}
#: Mapping of Context -> parent Context
self._via_by_context = {}
@mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'context': mitogen.core.Context
})
def reset(self, context):
"""
Return a reference, forcing close and discard of the underlying
connection. Used for 'meta: reset_connection' or when some other error
is detected.
"""
LOG.debug('%r.reset(%r)', self, context)
self._lock.acquire()
try:
self._shutdown_unlocked(context)
finally:
self._lock.release()
@mitogen.service.expose(mitogen.service.AllowParents()) @mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({ @mitogen.service.arg_spec({
@ -149,29 +186,13 @@ class ContextService(mitogen.service.Service):
finally: finally:
self._lock.release() self._lock.release()
def key_from_kwargs(self, **kwargs):
"""
Generate a deduplication key from the request.
"""
out = []
stack = [kwargs]
while stack:
obj = stack.pop()
if isinstance(obj, dict):
stack.extend(sorted(obj.items()))
elif isinstance(obj, (list, tuple)):
stack.extend(obj)
else:
out.append(str(obj))
return ''.join(out)
def _produce_response(self, key, response): def _produce_response(self, key, response):
""" """
Reply to every waiting request matching a configuration key with a Reply to every waiting request matching a configuration key with a
response dictionary, deleting the list of waiters when done. response dictionary, deleting the list of waiters when done.
:param str key: :param str key:
Result of :meth:`key_from_kwargs` Result of :meth:`key_from_dict`
:param dict response: :param dict response:
Response dictionary Response dictionary
:returns: :returns:
@ -187,6 +208,19 @@ class ContextService(mitogen.service.Service):
self._lock.release() self._lock.release()
return count return count
def _forget_context_unlocked(self, context):
key = self._key_by_context.get(context)
if key is None:
LOG.debug('%r: attempt to forget unknown %r', self, context)
return
self._response_by_key.pop(key, None)
self._latches_by_key.pop(key, None)
self._key_by_context.pop(context, None)
self._refs_by_context.pop(context, None)
self._via_by_context.pop(context, None)
self._lru_by_via.pop(context, None)
def _shutdown_unlocked(self, context, lru=None, new_context=None): def _shutdown_unlocked(self, context, lru=None, new_context=None):
""" """
Arrange for `context` to be shut down, and optionally add `new_context` Arrange for `context` to be shut down, and optionally add `new_context`
@ -194,15 +228,15 @@ class ContextService(mitogen.service.Service):
""" """
LOG.info('%r._shutdown_unlocked(): shutting down %r', self, context) LOG.info('%r._shutdown_unlocked(): shutting down %r', self, context)
context.shutdown() context.shutdown()
via = self._via_by_context.get(context)
key = self._key_by_context[context] if via:
del self._response_by_key[key] lru = self._lru_by_via.get(via)
del self._refs_by_context[context] if lru:
del self._key_by_context[context] if context in lru:
if lru and context in lru: lru.remove(context)
lru.remove(context) if new_context:
if new_context: lru.append(new_context)
lru.append(new_context) self._forget_context_unlocked(context)
def _update_lru_unlocked(self, new_context, spec, via): def _update_lru_unlocked(self, new_context, spec, via):
""" """
@ -223,6 +257,7 @@ class ContextService(mitogen.service.Service):
'but they are all marked as in-use.', via) 'but they are all marked as in-use.', via)
return return
self._via_by_context[new_context] = via
self._shutdown_unlocked(context, lru=lru, new_context=new_context) self._shutdown_unlocked(context, lru=lru, new_context=new_context)
def _update_lru(self, new_context, spec, via): def _update_lru(self, new_context, spec, via):
@ -241,7 +276,6 @@ class ContextService(mitogen.service.Service):
try: try:
for context in list(self._key_by_context): for context in list(self._key_by_context):
self._shutdown_unlocked(context) self._shutdown_unlocked(context)
self._lru_by_via = {}
finally: finally:
self._lock.release() self._lock.release()
@ -256,15 +290,12 @@ class ContextService(mitogen.service.Service):
# in _latches_by_key below. # in _latches_by_key below.
self._lock.acquire() self._lock.acquire()
try: try:
for context, key in list(self._key_by_context.items()): routes = self.router.route_monitor.get_routes(stream)
if context.context_id in stream.routes: for context in list(self._key_by_context):
if context.context_id in routes:
LOG.info('Dropping %r due to disconnect of %r', LOG.info('Dropping %r due to disconnect of %r',
context, stream) context, stream)
self._response_by_key.pop(key, None) self._forget_context_unlocked(context)
self._latches_by_key.pop(key, None)
self._refs_by_context.pop(context, None)
self._lru_by_via.pop(context, None)
self._refs_by_context.pop(context, None)
finally: finally:
self._lock.release() self._lock.release()
@ -360,7 +391,7 @@ class ContextService(mitogen.service.Service):
def _wait_or_start(self, spec, via=None): def _wait_or_start(self, spec, via=None):
latch = mitogen.core.Latch() latch = mitogen.core.Latch()
key = self.key_from_kwargs(via=via, **spec) key = key_from_dict(via=via, **spec)
self._lock.acquire() self._lock.acquire()
try: try:
response = self._response_by_key.get(key) response = self._response_by_key.get(key)

@ -241,8 +241,7 @@ def is_good_temp_dir(path):
try: try:
os.chmod(tmp.name, int('0700', 8)) os.chmod(tmp.name, int('0700', 8))
except OSError as e: except OSError as e:
LOG.debug('temp dir %r unusable: %s: chmod failed: %s', LOG.debug('temp dir %r unusable: chmod failed: %s', path, e)
path, e)
return False return False
try: try:
@ -250,7 +249,7 @@ def is_good_temp_dir(path):
if not os.access(tmp.name, os.X_OK): if not os.access(tmp.name, os.X_OK):
raise OSError('filesystem appears to be mounted noexec') raise OSError('filesystem appears to be mounted noexec')
except OSError as e: except OSError as e:
LOG.debug('temp dir %r unusable: %s: %s', path, e) LOG.debug('temp dir %r unusable: %s', path, e)
return False return False
finally: finally:
tmp.close() tmp.close()

@ -1,10 +1,11 @@
-r docs/docs-requirements.txt -r docs/docs-requirements.txt
ansible==2.6.1
coverage==4.5.1 coverage==4.5.1
Django==1.6.11 # Last version supporting 2.6. Django==1.6.11 # Last version supporting 2.6.
mock==2.0.0 mock==2.0.0
pytz==2018.5 pytz==2018.5
paramiko==2.3.2 # Last 2.6-compat version. cffi==1.11.2 # Random pin to try and fix pyparser==2.18 not having effect
pycparser==2.18 # Last version supporting 2.6.
faulthandler==3.1; python_version < '3.3' # used by testlib
pytest-catchlog==1.2.2 pytest-catchlog==1.2.2
pytest==3.1.2 pytest==3.1.2
PyYAML==3.11; python_version < '2.7' PyYAML==3.11; python_version < '2.7'
@ -14,4 +15,3 @@ unittest2==1.1.0
# Fix InsecurePlatformWarning while creating py26 tox environment # Fix InsecurePlatformWarning while creating py26 tox environment
# https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings # https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
urllib3[secure]; python_version < '2.7.9' urllib3[secure]; python_version < '2.7.9'
google-api-python-client==1.6.5

@ -1227,6 +1227,14 @@ Broker Class
thread, or immediately if the current thread is the broker thread. Safe thread, or immediately if the current thread is the broker thread. Safe
to call from any thread. to call from any thread.
.. method:: defer_sync (func)
Arrange for `func()` to execute on the broker thread, blocking the
current thread until a result or exception is available.
:returns:
Call result.
.. method:: start_receive (stream) .. method:: start_receive (stream)
Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as

@ -15,6 +15,59 @@ Release Notes
</style> </style>
v0.2.4 (2018-??-??)
------------------
Mitogen for Ansible
~~~~~~~~~~~~~~~~~~~
Enhancements
^^^^^^^^^^^^
* `#369 <https://github.com/dw/mitogen/issues/369>`_: :meth:`Connection.reset`
is implemented, allowing `meta: reset_connection
<https://docs.ansible.com/ansible/latest/modules/meta_module.html>`_ to shut
down the remote interpreter as expected, and improving support for the
`reboot
<https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_
module.
Fixes
^^^^^
Core Library
~~~~~~~~~~~~
* `#76 <https://github.com/dw/mitogen/issues/76>`_: routing maintains the set
of destination context ID ever received on each stream, and when
disconnection occurs, propagates ``DEL_ROUTE`` messages downwards towards
every stream that ever communicated with a disappearing peer, rather than
simply toward parents.
Conversations between nodes in any level of the connection tree should
correctly receive ``DEL_ROUTE`` messages when a participant disconnects,
allowing receivers to be woken with :class:`mitogen.core.ChannelError` to
signal the connection has broken, even when one participant is not a parent
of the other.
* `#405 <https://github.com/dw/mitogen/issues/405>`_: if a message is rejected
due to being too large, and it has a ``reply_to`` set, a dead message is
returned to the sender. This ensures function calls exceeding the configured
maximum size crash rather than hang.
* `#411 <https://github.com/dw/mitogen/issues/411>`_: the SSH method typed
"``y``" rather than the requisite "``yes``" when `check_host_keys="accept"`
was configured. This would lead to connection timeouts due to the hung
response.
* `16ca111e <https://github.com/dw/mitogen/commit/16ca111e>`_: handle OpenSSH
7.5 permission denied prompts when ``~/.ssh/config`` rewrites are present.
* `9ec360c2 <https://github.com/dw/mitogen/commit/9ec360c2>`_: a new
:meth:`mitogen.core.Broker.defer_sync` utility function is provided.
v0.2.3 (2018-10-23) v0.2.3 (2018-10-23)
------------------- -------------------

@ -246,7 +246,7 @@ Running User Functions
---------------------- ----------------------
So far we have used the interactive interpreter to call some standard library So far we have used the interactive interpreter to call some standard library
functions, but if since source code typed at the interpreter cannot be functions, but since the source code typed at the interpreter cannot be
recovered, Mitogen is unable to execute functions defined in this way. recovered, Mitogen is unable to execute functions defined in this way.
We must therefore continue by writing our code as a script:: We must therefore continue by writing our code as a script::

@ -575,6 +575,28 @@ When ``sudo:node22a:webapp`` wants to send a message to
.. image:: images/route.png .. image:: images/route.png
Disconnect Propagation
######################
To ensure timely shutdown when a failure occurs, where some context is awaiting
a response from another context that has become disconnected,
:class:`mitogen.core.Router` additionally records the destination context ID of
every message received on a particular stream.
When ``DEL_ROUTE`` is generated locally or received on some other stream,
:class:`mitogen.parent.RouteMonitor` uses this to find every stream that ever
communicated with the route that is about to go away, and forwards the message
to each found.
The recipient ``DEL_ROUTE`` handler in turn uses the message to find any
:class:`mitogen.core.Context` in the local process corresponding to the
disappearing route, and if found, fires a ``disconnected`` event on it.
Any interested party, such as :class:`mitogen.core.Receiver`, may subscribe to
the event and use it to abort any threads that were asleep waiting for a reply
that will never arrive.
.. _source-verification: .. _source-verification:
Source Verification Source Verification

@ -1085,6 +1085,9 @@ class Stream(BasicStream):
self._output_buf = collections.deque() self._output_buf = collections.deque()
self._input_buf_len = 0 self._input_buf_len = 0
self._output_buf_len = 0 self._output_buf_len = 0
#: Routing records the dst_id of every message arriving from this
#: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.
self.egress_ids = set()
def construct(self): def construct(self):
pass pass
@ -1279,7 +1282,7 @@ def _unpickle_context(router, context_id, name):
(isinstance(name, UnicodeType) and len(name) < 100)) (isinstance(name, UnicodeType) and len(name) < 100))
): ):
raise TypeError('cannot unpickle Context: bad input') raise TypeError('cannot unpickle Context: bad input')
return router.context_class(router, context_id, name) return router.context_by_id(context_id, name=name)
class Poller(object): class Poller(object):
@ -1716,10 +1719,27 @@ class Router(object):
self._last_handle = itertools.count(1000) self._last_handle = itertools.count(1000)
#: handle -> (persistent?, func(msg)) #: handle -> (persistent?, func(msg))
self._handle_map = {} self._handle_map = {}
self.add_handler(self._on_del_route, DEL_ROUTE)
def __repr__(self): def __repr__(self):
return 'Router(%r)' % (self.broker,) return 'Router(%r)' % (self.broker,)
def _on_del_route(self, msg):
"""
Stub DEL_ROUTE handler; fires 'disconnect' events on the corresponding
member of :attr:`_context_by_id`. This handler is replaced by
:class:`mitogen.parent.RouteMonitor` in an upgraded context.
"""
LOG.error('%r._on_del_route() %r', self, msg)
if not msg.is_dead:
target_id_s, _, name = msg.data.partition(b(':'))
target_id = int(target_id_s, 10)
if target_id not in self._context_by_id:
LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg)
return
fire(self._context_by_id[target_id], 'disconnect')
def on_stream_disconnect(self, stream): def on_stream_disconnect(self, stream):
for context in self._context_by_id.values(): for context in self._context_by_id.values():
stream_ = self._stream_by_id.get(context.context_id) stream_ = self._stream_by_id.get(context.context_id)
@ -1732,6 +1752,15 @@ class Router(object):
_, (_, func, _) = self._handle_map.popitem() _, (_, func, _) = self._handle_map.popitem()
func(Message.dead()) func(Message.dead())
def context_by_id(self, context_id, via_id=None, create=True, name=None):
context = self._context_by_id.get(context_id)
if create and not context:
context = self.context_class(self, context_id, name=name)
if via_id is not None:
context.via = self.context_by_id(via_id)
self._context_by_id[context_id] = context
return context
def register(self, context, stream): def register(self, context, stream):
_v and LOG.debug('register(%r, %r)', context, stream) _v and LOG.debug('register(%r, %r)', context, stream)
self._stream_by_id[context.context_id] = stream self._stream_by_id[context.context_id] = stream
@ -1803,11 +1832,17 @@ class Router(object):
except Exception: except Exception:
LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn)
def _maybe_send_dead(self, msg):
if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead(), router=self)
def _async_route(self, msg, in_stream=None): def _async_route(self, msg, in_stream=None):
_vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)
if len(msg.data) > self.max_message_size: if len(msg.data) > self.max_message_size:
LOG.error('message too large (max %d bytes): %r', LOG.error('message too large (max %d bytes): %r',
self.max_message_size, msg) self.max_message_size, msg)
self._maybe_send_dead(msg)
return return
# Perform source verification. # Perform source verification.
@ -1829,6 +1864,9 @@ class Router(object):
if in_stream.auth_id is not None: if in_stream.auth_id is not None:
msg.auth_id = in_stream.auth_id msg.auth_id = in_stream.auth_id
# Maintain a set of IDs the source ever communicated with.
in_stream.egress_ids.add(msg.dst_id)
if msg.dst_id == mitogen.context_id: if msg.dst_id == mitogen.context_id:
return self._invoke(msg, in_stream) return self._invoke(msg, in_stream)
@ -1836,21 +1874,18 @@ class Router(object):
if out_stream is None: if out_stream is None:
out_stream = self._stream_by_id.get(mitogen.parent_id) out_stream = self._stream_by_id.get(mitogen.parent_id)
dead = False
if out_stream is None: if out_stream is None:
LOG.error('%r: no route for %r, my ID is %r', if msg.reply_to not in (0, IS_DEAD):
self, msg, mitogen.context_id) LOG.error('%r: no route for %r, my ID is %r',
dead = True self, msg, mitogen.context_id)
self._maybe_send_dead(msg)
return
if in_stream and self.unidirectional and not dead and \ if in_stream and self.unidirectional and not \
not (in_stream.is_privileged or out_stream.is_privileged): (in_stream.is_privileged or out_stream.is_privileged):
LOG.error('routing mode prevents forward of %r from %r -> %r', LOG.error('routing mode prevents forward of %r from %r -> %r',
msg, in_stream, out_stream) msg, in_stream, out_stream)
dead = True self._maybe_send_dead(msg)
if dead:
if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead(), router=self)
return return
out_stream._send(msg) out_stream._send(msg)
@ -1907,6 +1942,25 @@ class Broker(object):
it = (side.keep_alive for (_, (side, _)) in self.poller.readers) it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
return sum(it, 0) return sum(it, 0)
def defer_sync(self, func):
"""
Block the calling thread while `func` runs on a broker thread.
:returns:
Return value of `func()`.
"""
latch = Latch()
def wrapper():
try:
latch.put(func())
except Exception:
latch.put(sys.exc_info()[1])
self.defer(wrapper)
res = latch.get()
if isinstance(res, Exception):
raise res
return res
def _call(self, stream, func): def _call(self, stream, func):
try: try:
func(self) func(self)
@ -2067,11 +2121,6 @@ class ExternalContext(object):
_v and LOG.debug('%r: parent stream is gone, dying.', self) _v and LOG.debug('%r: parent stream is gone, dying.', self)
self.broker.shutdown() self.broker.shutdown()
def _sync(self, func):
latch = Latch()
self.broker.defer(lambda: latch.put(func()))
return latch.get()
def detach(self): def detach(self):
self.detached = True self.detached = True
stream = self.router.stream_by_id(mitogen.parent_id) stream = self.router.stream_by_id(mitogen.parent_id)
@ -2080,7 +2129,7 @@ class ExternalContext(object):
self.parent.send_await(Message(handle=DETACHING)) self.parent.send_await(Message(handle=DETACHING))
LOG.info('Detaching from %r; parent is %s', stream, self.parent) LOG.info('Detaching from %r; parent is %s', stream, self.parent)
for x in range(20): for x in range(20):
pending = self._sync(lambda: stream.pending_bytes()) pending = self.broker.defer_sync(lambda: stream.pending_bytes())
if not pending: if not pending:
break break
time.sleep(0.05) time.sleep(0.05)

@ -335,6 +335,11 @@ class LogForwarder(object):
class ModuleFinder(object): class ModuleFinder(object):
"""
Given the name of a loaded module, make a best-effort attempt at finding
related modules likely needed by a child context requesting the original
module.
"""
def __init__(self): def __init__(self):
#: Import machinery is expensive, keep :py:meth`:get_module_source` #: Import machinery is expensive, keep :py:meth`:get_module_source`
#: results around. #: results around.
@ -477,7 +482,8 @@ class ModuleFinder(object):
def resolve_relpath(self, fullname, level): def resolve_relpath(self, fullname, level):
"""Given an ImportFrom AST node, guess the prefix that should be tacked """Given an ImportFrom AST node, guess the prefix that should be tacked
on to an alias name to produce a canonical name. `fullname` is the name on to an alias name to produce a canonical name. `fullname` is the name
of the module in which the ImportFrom appears.""" of the module in which the ImportFrom appears.
"""
mod = sys.modules.get(fullname, None) mod = sys.modules.get(fullname, None)
if hasattr(mod, '__path__'): if hasattr(mod, '__path__'):
fullname += '.__init__' fullname += '.__init__'
@ -499,7 +505,7 @@ class ModuleFinder(object):
def find_related_imports(self, fullname): def find_related_imports(self, fullname):
""" """
Return a list of non-stdlb modules that are directly imported by Return a list of non-stdlib modules that are directly imported by
`fullname`, plus their parents. `fullname`, plus their parents.
The list is determined by retrieving the source code of The list is determined by retrieving the source code of
@ -550,8 +556,8 @@ class ModuleFinder(object):
Return a list of non-stdlib modules that are imported directly or Return a list of non-stdlib modules that are imported directly or
indirectly by `fullname`, plus their parents. indirectly by `fullname`, plus their parents.
This method is like :py:meth:`on_disconect`, but it also recursively This method is like :py:meth:`find_related_imports`, but also
searches any modules which are imported by `fullname`. recursively searches any modules which are imported by `fullname`.
:param fullname: Fully qualified name of an _already imported_ module :param fullname: Fully qualified name of an _already imported_ module
for which source code can be retrieved for which source code can be retrieved

@ -69,6 +69,7 @@ from mitogen.core import IOLOG
IS_WSL = 'Microsoft' in os.uname()[2] IS_WSL = 'Microsoft' in os.uname()[2]
itervalues = getattr(dict, 'itervalues', dict.values)
if mitogen.core.PY3: if mitogen.core.PY3:
xrange = range xrange = range
@ -543,6 +544,7 @@ def _upgrade_broker(broker):
len(old.readers), len(old.writers)) len(old.readers), len(old.writers))
@mitogen.core.takes_econtext
def upgrade_router(econtext): def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO if not isinstance(econtext.router, Router): # TODO
econtext.broker.defer(_upgrade_broker, econtext.broker) econtext.broker.defer(_upgrade_broker, econtext.broker)
@ -911,9 +913,6 @@ class Stream(mitogen.core.Stream):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs) super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core']) self.sent_modules = set(['mitogen', 'mitogen.core'])
#: List of contexts reachable via this stream; used to cleanup routes
#: during disconnection.
self.routes = set([self.remote_id])
def construct(self, max_message_size, remote_name=None, python_path=None, def construct(self, max_message_size, remote_name=None, python_path=None,
debug=False, connect_timeout=None, profiling=False, debug=False, connect_timeout=None, profiling=False,
@ -1428,45 +1427,75 @@ class RouteMonitor(object):
persist=True, persist=True,
policy=is_immediate_child, policy=is_immediate_child,
) )
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.
self._routes_by_stream = {}
def propagate(self, handle, target_id, name=None): def _send_one(self, stream, handle, target_id, name):
# self.parent is None in the master.
if not self.parent:
return
data = str(target_id) data = str(target_id)
if name: if name:
data = '%s:%s' % (target_id, mitogen.core.b(name)) data = '%s:%s' % (target_id, mitogen.core.b(name))
self.parent.send( stream.send(
mitogen.core.Message( mitogen.core.Message(
handle=handle, handle=handle,
data=data.encode('utf-8'), data=data.encode('utf-8'),
dst_id=stream.remote_id,
) )
) )
def _propagate(self, handle, target_id, name=None):
if not self.parent:
# self.parent is None in the master.
return
stream = self.router.stream_by_id(self.parent.context_id)
self._send_one(stream, handle, target_id, name)
def _child_propagate(self, handle, target_id):
"""
For DEL_ROUTE, we additionally want to broadcast the message to any
stream that has ever communicated with the disconnecting ID, so
core.py's :meth:`mitogen.core.Router._on_del_route` can turn the
message into a disconnect event.
"""
for stream in itervalues(self.router._stream_by_id):
if target_id in stream.egress_ids:
self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None)
def notice_stream(self, stream): def notice_stream(self, stream):
""" """
When this parent is responsible for a new directly connected child When this parent is responsible for a new directly connected child
stream, we're also responsible for broadcasting DEL_ROUTE upstream stream, we're also responsible for broadcasting DEL_ROUTE upstream
if/when that child disconnects. if/when that child disconnects.
""" """
self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id, self._routes_by_stream[stream] = set([stream.remote_id])
stream.name) self._propagate(mitogen.core.ADD_ROUTE, stream.remote_id,
stream.name)
mitogen.core.listen( mitogen.core.listen(
obj=stream, obj=stream,
name='disconnect', name='disconnect',
func=lambda: self._on_stream_disconnect(stream), func=lambda: self._on_stream_disconnect(stream),
) )
def get_routes(self, stream):
"""
Return the set of context IDs reachable on a stream.
:param mitogen.core.Stream stream:
:returns: set([int])
"""
return self._routes_by_stream.get(stream) or set()
def _on_stream_disconnect(self, stream): def _on_stream_disconnect(self, stream):
""" """
Respond to disconnection of a local stream by Respond to disconnection of a local stream by
""" """
LOG.debug('%r is gone; propagating DEL_ROUTE for %r', routes = self._routes_by_stream.pop(stream)
stream, stream.routes) LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes)
for target_id in stream.routes: for target_id in routes:
self.router.del_route(target_id) self.router.del_route(target_id)
self.propagate(mitogen.core.DEL_ROUTE, target_id) self._propagate(mitogen.core.DEL_ROUTE, target_id)
self._child_propagate(mitogen.core.DEL_ROUTE, target_id)
context = self.router.context_by_id(target_id, create=False) context = self.router.context_by_id(target_id, create=False)
if context: if context:
@ -1489,9 +1518,9 @@ class RouteMonitor(object):
return return
LOG.debug('Adding route to %d via %r', target_id, stream) LOG.debug('Adding route to %d via %r', target_id, stream)
stream.routes.add(target_id) self._routes_by_stream[stream].add(target_id)
self.router.add_route(target_id, stream) self.router.add_route(target_id, stream)
self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name) self._propagate(mitogen.core.ADD_ROUTE, target_id, target_name)
def _on_del_route(self, msg): def _on_del_route(self, msg):
if msg.is_dead: if msg.is_dead:
@ -1505,14 +1534,21 @@ class RouteMonitor(object):
target_id, stream, registered_stream) target_id, stream, registered_stream)
return return
LOG.debug('Deleting route to %d via %r', target_id, stream)
stream.routes.discard(target_id)
self.router.del_route(target_id)
self.propagate(mitogen.core.DEL_ROUTE, target_id)
context = self.router.context_by_id(target_id, create=False) context = self.router.context_by_id(target_id, create=False)
if context: if context:
LOG.debug('%r: Firing local disconnect for %r', self, context)
mitogen.core.fire(context, 'disconnect') mitogen.core.fire(context, 'disconnect')
LOG.debug('Deleting route to %d via %r', target_id, stream)
routes = self._routes_by_stream.get(stream)
if routes:
routes.discard(target_id)
self.router.del_route(target_id)
if stream.remote_id != mitogen.parent_id:
self._propagate(mitogen.core.DEL_ROUTE, target_id)
self._child_propagate(mitogen.core.DEL_ROUTE, target_id)
class Router(mitogen.core.Router): class Router(mitogen.core.Router):
context_class = Context context_class = Context
@ -1562,11 +1598,12 @@ class Router(mitogen.core.Router):
def del_route(self, target_id): def del_route(self, target_id):
LOG.debug('%r.del_route(%r)', self, target_id) LOG.debug('%r.del_route(%r)', self, target_id)
try: # DEL_ROUTE may be sent by a parent if it knows this context sent
del self._stream_by_id[target_id] # messages to a peer that has now disconnected, to let us raise
except KeyError: # 'disconnect' event on the appropriate Context instance. In that case,
LOG.error('%r: cant delete route to %r: no such stream', # we won't a matching _stream_by_id entry for the disappearing route,
self, target_id) # so don't raise an error for a missing key here.
self._stream_by_id.pop(target_id, None)
def get_module_blacklist(self): def get_module_blacklist(self):
if mitogen.context_id == 0: if mitogen.context_id == 0:
@ -1581,15 +1618,6 @@ class Router(mitogen.core.Router):
def allocate_id(self): def allocate_id(self):
return self.id_allocator.allocate() return self.id_allocator.allocate()
def context_by_id(self, context_id, via_id=None, create=True):
context = self._context_by_id.get(context_id)
if create and not context:
context = self.context_class(self, context_id)
if via_id is not None:
context.via = self.context_by_id(via_id)
self._context_by_id[context_id] = context
return context
connection_timeout_msg = u"Connection timed out." connection_timeout_msg = u"Connection timed out."
def _connect(self, klass, name=None, **kwargs): def _connect(self, klass, name=None, **kwargs):

@ -31,6 +31,7 @@ Functionality to allow establishing new slave contexts over an SSH connection.
""" """
import logging import logging
import re
import time import time
try: try:
@ -46,10 +47,16 @@ LOG = logging.getLogger('mitogen')
# sshpass uses 'assword' because it doesn't lowercase the input. # sshpass uses 'assword' because it doesn't lowercase the input.
PASSWORD_PROMPT = b('password') PASSWORD_PROMPT = b('password')
PERMDENIED_PROMPT = b('permission denied')
HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?') HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?')
HOSTKEY_FAIL = b('host key verification failed.') HOSTKEY_FAIL = b('host key verification failed.')
# [user@host: ] permission denied
PERMDENIED_RE = re.compile(
('(?:[^@]+@[^:]+: )?' # Absent in OpenSSH <7.5
'Permission denied').encode(),
re.I
)
DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:')) DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:'))
@ -258,7 +265,7 @@ class Stream(mitogen.parent.Stream):
def _host_key_prompt(self): def _host_key_prompt(self):
if self.check_host_keys == 'accept': if self.check_host_keys == 'accept':
LOG.debug('%r: accepting host key', self) LOG.debug('%r: accepting host key', self)
self.tty_stream.transmit_side.write(b('y\n')) self.tty_stream.transmit_side.write(b('yes\n'))
return return
# _host_key_prompt() should never be reached with ignore or enforce # _host_key_prompt() should never be reached with ignore or enforce
@ -289,11 +296,7 @@ class Stream(mitogen.parent.Stream):
self._host_key_prompt() self._host_key_prompt()
elif HOSTKEY_FAIL in buf.lower(): elif HOSTKEY_FAIL in buf.lower():
raise HostKeyError(self.hostkey_failed_msg) raise HostKeyError(self.hostkey_failed_msg)
elif buf.lower().startswith(( elif PERMDENIED_RE.match(buf):
PERMDENIED_PROMPT,
b("%s@%s: " % (self.username, self.hostname))
+ PERMDENIED_PROMPT,
)):
# issue #271: work around conflict with user shell reporting # issue #271: work around conflict with user shell reporting
# 'permission denied' e.g. during chdir($HOME) by only matching # 'permission denied' e.g. during chdir($HOME) by only matching
# it at the start of the line. # it at the start of the line.

@ -52,7 +52,7 @@ def is_path_dead(path):
s.connect(path) s.connect(path)
except socket.error: except socket.error:
e = sys.exc_info()[1] e = sys.exc_info()[1]
return e[0] in (errno.ECONNREFUSED, errno.ENOENT) return e.args[0] in (errno.ECONNREFUSED, errno.ENOENT)
return False return False
@ -82,7 +82,7 @@ class Listener(mitogen.core.BasicStream):
sock.setblocking(True) sock.setblocking(True)
try: try:
pid, = struct.unpack('>L', sock.recv(4)) pid, = struct.unpack('>L', sock.recv(4))
except socket.error: except (struct.error, socket.error):
LOG.error('%r: failed to read remote identity: %s', LOG.error('%r: failed to read remote identity: %s',
self, sys.exc_info()[1]) self, sys.exc_info()[1])
return return

@ -0,0 +1,28 @@
#!/usr/bin/env python
"""
Print shell environment exports adding ARA plugins to the list of plugins
from ansible.cfg in the CWD.
"""
import os
import ara.setup
import ansible.constants as C
os.chdir(os.path.dirname(__file__))
print('export ANSIBLE_ACTION_PLUGINS=%s:%s' % (
':'.join(C.DEFAULT_ACTION_PLUGIN_PATH),
ara.setup.action_plugins,
))
print('export ANSIBLE_CALLBACK_PLUGINS=%s:%s' % (
':'.join(C.DEFAULT_CALLBACK_PLUGIN_PATH),
ara.setup.callback_plugins,
))
print('export ANSIBLE_LIBRARY=%s:%s' % (
':'.join(C.DEFAULT_MODULE_PATH),
ara.setup.library,
))

@ -0,0 +1,25 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: Create file tree
connection: local
shell: >
mkdir -p /tmp/filetree.in;
for i in `seq -f /tmp/filetree.in/%g 1 100`; do echo $RANDOM > $i; done;
- name: Delete remote file tree
file: path=/tmp/filetree.out state=absent
when: 0
- file:
state: directory
path: /tmp/filetree.out
- name: Trigger nasty process pileup
copy:
src: "{{item.src}}"
dest: "/tmp/filetree.out/{{item.path}}"
with_filetree: /tmp/filetree.in
when: item.state == 'file'

@ -0,0 +1,25 @@
z
[z-x10]
z-[01:10]
[z-x20]
z-[01:20]
[z-x50]
z-[01:50]
[z-x100]
z-[001:100]
[z-x200]
z-[001:200]
[z-x300]
z-[001:300]
[z-x400]
z-[001:400]
[z-x500]
z-[001:500]

@ -1,5 +1,8 @@
--- ---
- import_playbook: disconnect_during_module.yml
- import_playbook: disconnect_resets_connection.yml
- import_playbook: exec_command.yml - import_playbook: exec_command.yml
- import_playbook: put_small_file.yml
- import_playbook: put_large_file.yml - import_playbook: put_large_file.yml
- import_playbook: put_small_file.yml
- import_playbook: reset.yml

@ -0,0 +1,19 @@
# issue 352: test ability to notice disconnection during a module invocation.
---
- name: integration/connection/disconnect_during_module.yml
hosts: test-targets localhost
gather_facts: no
any_errors_fatal: false
tasks:
- run_once: true # don't run against localhost
shell: |
kill -9 $PPID
register: out
ignore_errors: true
- assert:
that:
- out.msg.startswith('Mitogen was disconnected from the remote environment while a call was in-progress.')
- meta: clear_host_errors

@ -0,0 +1,44 @@
# issue 370: Connection should reset to 'disconnected' state when disconnect
# detected
#
# Previously the 'Mitogen was disconnected' error would fail the first task,
# but the Connection instance would still think it still had a valid
# connection.
#
# See also disconnect_during_module.yml
---
- name: integration/connection/disconnect_resets_connection.yml
hosts: test-targets
gather_facts: no
any_errors_fatal: true
tasks:
- mitogen_action_script:
script: |
import sys
from ansible.errors import AnsibleConnectionFailure
assert not self._connection.connected, \
"Connection was not initially disconnected."
self._low_level_execute_command('echo')
assert self._connection.connected, \
"Connection was not connected after good command."
try:
self._low_level_execute_command('kill -9 $PPID')
assert 0, 'AnsibleConnectionFailure was not raised'
except AnsibleConnectionFailure:
e = sys.exc_info()[1]
assert str(e).startswith('Mitogen was disconnected')
assert not self._connection.connected, \
"Connection did not reset."
try:
self._low_level_execute_command('kill -9 $PPID')
assert 0, 'AnsibleConnectionFailure was not raised'
except AnsibleConnectionFailure:
e = sys.exc_info()[1]
assert str(e).startswith('Mitogen was disconnected')

@ -0,0 +1,38 @@
# issue #369: Connection.reset() should cause destruction of the remote
# interpreter and any children.
---
- name: integration/connection/reset.yml
hosts: test-targets
tasks:
- when: is_mitogen
block:
- custom_python_detect_environment:
register: out
- custom_python_detect_environment:
become: true
register: out_become
- meta: reset_connection
- custom_python_detect_environment:
register: out2
- custom_python_detect_environment:
register: out_become2
- assert:
that:
# Interpreter PID has changed.
- out.pid != out2.pid
# SSH PID has changed.
- out.ppid != out2.ppid
# Interpreter PID has changed.
- out_become.pid != out_become2.pid
# sudo PID has changed.
- out_become.ppid != out_become2.ppid

@ -0,0 +1,28 @@
# I am an Ansible action plug-in. I run the script provided in the parameter in
# the context of the action.
import sys
from ansible.plugins.action import ActionBase
def execute(s, gbls, lcls):
if sys.version_info > (3,):
exec(s, gbls, lcls)
else:
exec('exec s in gbls, lcls')
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
super(ActionModule, self).run(tmp=tmp, task_vars=task_vars)
lcls = {
'self': self,
'result': {}
}
execute(self._task.args['script'], globals(), lcls)
return lcls['result']
if __name__ == '__main__':
main()

@ -0,0 +1,2 @@
paramiko==2.3.2 # Last 2.6-compat version.
google-api-python-client==1.6.5

@ -28,10 +28,10 @@ class ApplyModeSpecTest(unittest2.TestCase):
def test_simple(self): def test_simple(self):
spec = 'u+rwx,go=x' spec = 'u+rwx,go=x'
self.assertEquals(0711, self.func(spec, 0)) self.assertEquals(int('0711', 8), self.func(spec, 0))
spec = 'g-rw' spec = 'g-rw'
self.assertEquals(0717, self.func(spec, 0777)) self.assertEquals(int('0717', 8), self.func(spec, int('0777', 8)))
class IsGoodTempDirTest(unittest2.TestCase): class IsGoodTempDirTest(unittest2.TestCase):

@ -0,0 +1,5 @@
# tests/bench/
Various manually executed scripts to aid benchmarking, or trigger old
performance problems.

@ -0,0 +1,28 @@
# Verify _receive_one() quadratic behaviour fixed.
import subprocess
import time
import socket
import mitogen
@mitogen.main()
def main(router):
c = router.fork()
n = 1048576 * 127
s = ' ' * n
print('bytes in %.2fMiB string...' % (n/1048576.0),)
t0 = time.time()
for x in range(10):
tt0 = time.time()
assert n == c.call(len, s)
print('took %dms' % (1000 * (time.time() - tt0),))
t1 = time.time()
print('total %dms / %dms avg / %.2fMiB/sec' % (
1000 * (t1 - t0),
(1000 * (t1 - t0)) / (x + 1),
((n * (x + 1)) / (t1 - t0)) / 1048576.0,
))

@ -12,6 +12,6 @@ def do_nothing():
def main(router): def main(router):
f = router.fork() f = router.fork()
t0 = time.time() t0 = time.time()
for x in xrange(10000): for x in range(1000):
f.call(do_nothing) f.call(do_nothing)
print '++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec' print '++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec'

@ -0,0 +1,23 @@
"""
Measure latency of local service RPC.
"""
import time
import mitogen.service
import mitogen
class MyService(mitogen.service.Service):
@mitogen.service.expose(policy=mitogen.service.AllowParents())
def ping(self):
return 'pong'
@mitogen.main()
def main(router):
f = router.fork()
t0 = time.time()
for x in range(1000):
f.call_service(service_name=MyService, method_name='ping')
print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec')

@ -0,0 +1,32 @@
import threading
import unittest2
import testlib
import mitogen.core
class DeferSyncTest(testlib.TestCase):
klass = mitogen.core.Broker
def test_okay(self):
broker = self.klass()
try:
th = broker.defer_sync(lambda: threading.currentThread())
self.assertEquals(th, broker._thread)
finally:
broker.shutdown()
def test_exception(self):
broker = self.klass()
try:
self.assertRaises(ValueError,
broker.defer_sync, lambda: int('dave'))
finally:
broker.shutdown()
if __name__ == '__main__':
unittest2.main()

@ -103,7 +103,8 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
def test_accepts_returns_context(self): def test_accepts_returns_context(self):
context = self.local.call(func_returns_arg, self.local) context = self.local.call(func_returns_arg, self.local)
self.assertIsNot(context, self.local) # Unpickling now deduplicates Context instances.
self.assertIs(context, self.local)
self.assertEqual(context.context_id, self.local.context_id) self.assertEqual(context.context_id, self.local.context_id)
self.assertEqual(context.name, self.local.name) self.assertEqual(context.name, self.local.name)
@ -119,12 +120,12 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
lambda: recv.get().unpickle()) lambda: recv.get().unpickle())
class ChainTest(testlib.RouterMixin, testlib.TestCase): class CallChainTest(testlib.RouterMixin, testlib.TestCase):
# Verify mitogen_chain functionality. # Verify mitogen_chain functionality.
klass = mitogen.parent.CallChain klass = mitogen.parent.CallChain
def setUp(self): def setUp(self):
super(ChainTest, self).setUp() super(CallChainTest, self).setUp()
self.local = self.router.fork() self.local = self.router.fork()
def test_subsequent_calls_produce_same_error(self): def test_subsequent_calls_produce_same_error(self):

@ -0,0 +1,12 @@
import logging
import mitogen.master
def foo():
pass
logging.basicConfig(level=logging.INFO)
router = mitogen.master.Router()
l = router.local()
l.call(foo)

@ -0,0 +1,5 @@
# tests/data/stubs/
Dummy implementations of various third party tools that just spawn local Python
interpreters. Used to roughly test the tools' associated Mitogen classes.

@ -0,0 +1,7 @@
#!/usr/bin/env python
import sys
import os
os.environ['ORIGINAL_ARGV'] = repr(sys.argv)
os.execv(sys.executable, sys.argv[sys.argv.index('-c') - 1:])

@ -15,6 +15,9 @@ Are you sure you want to continue connecting (yes/no)?
HOST_KEY_STRICT_MSG = """Host key verification failed.\n""" HOST_KEY_STRICT_MSG = """Host key verification failed.\n"""
PERMDENIED_CLASSIC_MSG = 'Permission denied (publickey,password)\n'
PERMDENIED_75_MSG = 'chicken@nandos.com: permission denied (publickey,password)\n'
def tty(msg): def tty(msg):
fp = open('/dev/tty', 'wb', 0) fp = open('/dev/tty', 'wb', 0)
@ -37,13 +40,23 @@ def confirm(msg):
fp.close() fp.close()
if os.getenv('FAKESSH_MODE') == 'ask': mode = os.getenv('STUBSSH_MODE')
assert 'y\n' == confirm(HOST_KEY_ASK_MSG)
if mode == 'ask':
assert 'yes\n' == confirm(HOST_KEY_ASK_MSG)
if os.getenv('FAKESSH_MODE') == 'strict': elif mode == 'strict':
stderr(HOST_KEY_STRICT_MSG) stderr(HOST_KEY_STRICT_MSG)
sys.exit(255) sys.exit(255)
elif mode == 'permdenied_classic':
stderr(PERMDENIED_CLASSIC_MSG)
sys.exit(255)
elif mode == 'permdenied_75':
stderr(PERMDENIED_75_MSG)
sys.exit(255)
# #
# Set an env var if stderr was a TTY to make ssh_test tests easier to write. # Set an env var if stderr was a TTY to make ssh_test tests easier to write.

@ -0,0 +1,28 @@
import os
import mitogen
import unittest2
import testlib
class ConstructorTest(testlib.RouterMixin, unittest2.TestCase):
def test_okay(self):
docker_path = testlib.data_path('stubs/docker.py')
context = self.router.docker(
container='container_name',
docker_path=docker_path,
)
stream = self.router.stream_by_id(context.context_id)
argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV'))
self.assertEquals(argv[0], docker_path)
self.assertEquals(argv[1], 'exec')
self.assertEquals(argv[2], '--interactive')
self.assertEquals(argv[3], 'container_name')
self.assertEquals(argv[4], stream.python_path)
if __name__ == '__main__':
unittest2.main()

@ -11,9 +11,9 @@ def has_subseq(seq, subseq):
return any(seq[x:x+len(subseq)] == subseq for x in range(0, len(seq))) return any(seq[x:x+len(subseq)] == subseq for x in range(0, len(seq)))
class FakeLxcAttachTest(testlib.RouterMixin, unittest2.TestCase): class ConstructorTest(testlib.RouterMixin, unittest2.TestCase):
def test_okay(self): def test_okay(self):
lxc_attach_path = testlib.data_path('fake_lxc_attach.py') lxc_attach_path = testlib.data_path('stubs/lxc-attach.py')
context = self.router.lxc( context = self.router.lxc(
container='container_name', container='container_name',
lxc_attach_path=lxc_attach_path, lxc_attach_path=lxc_attach_path,

@ -7,9 +7,9 @@ import unittest2
import testlib import testlib
class FakeLxcTest(testlib.RouterMixin, unittest2.TestCase): class ConstructorTest(testlib.RouterMixin, unittest2.TestCase):
def test_okay(self): def test_okay(self):
lxc_path = testlib.data_path('fake_lxc.py') lxc_path = testlib.data_path('stubs/lxc.py')
context = self.router.lxd( context = self.router.lxd(
container='container_name', container='container_name',
lxc_path=lxc_path, lxc_path=lxc_path,

@ -30,6 +30,11 @@ def wait_for_child(pid, timeout=1.0):
assert False, "wait_for_child() timed out" assert False, "wait_for_child() timed out"
@mitogen.core.takes_econtext
def call_func_in_sibling(ctx, econtext):
ctx.call(time.sleep, 99999)
class GetDefaultRemoteNameTest(testlib.TestCase): class GetDefaultRemoteNameTest(testlib.TestCase):
func = staticmethod(mitogen.parent.get_default_remote_name) func = staticmethod(mitogen.parent.get_default_remote_name)
@ -74,7 +79,7 @@ class WstatusToStrTest(testlib.TestCase):
(pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0) (pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0)
self.assertEquals( self.assertEquals(
self.func(status), self.func(status),
'exited due to signal %s (SIGKILL)' % (signal.SIGKILL,) 'exited due to signal %s (SIGKILL)' % (int(signal.SIGKILL),)
) )
# can't test SIGSTOP without POSIX sessions rabbithole # can't test SIGSTOP without POSIX sessions rabbithole
@ -298,5 +303,78 @@ class WriteAllTest(unittest2.TestCase):
proc.terminate() proc.terminate()
class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
def test_child_disconnected(self):
# Easy mode: process notices its own directly connected child is
# disconnected.
c1 = self.router.fork()
recv = c1.call_async(time.sleep, 9999)
c1.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
def test_indirect_child_disconnected(self):
# Achievement unlocked: process notices an indirectly connected child
# is disconnected.
c1 = self.router.fork()
c2 = self.router.fork(via=c1)
recv = c2.call_async(time.sleep, 9999)
c2.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
def test_indirect_child_intermediary_disconnected(self):
# Battlefield promotion: process notices indirect child disconnected
# due to an intermediary child disconnecting.
c1 = self.router.fork()
c2 = self.router.fork(via=c1)
recv = c2.call_async(time.sleep, 9999)
c1.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
def test_near_sibling_disconnected(self):
# Hard mode: child notices sibling connected to same parent has
# disconnected.
c1 = self.router.fork()
c2 = self.router.fork()
# Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id
c1.call(mitogen.parent.upgrade_router)
recv = c1.call_async(call_func_in_sibling, c2)
c2.shutdown(wait=True)
e = self.assertRaises(mitogen.core.CallError,
lambda: recv.get().unpickle())
self.assertTrue(e.args[0].startswith(
'mitogen.core.ChannelError: Channel closed by local end.'
))
def test_far_sibling_disconnected(self):
# God mode: child of child notices child of child of parent has
# disconnected.
c1 = self.router.fork()
c11 = self.router.fork(via=c1)
c2 = self.router.fork()
c22 = self.router.fork(via=c2)
# Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id
c11.call(mitogen.parent.upgrade_router)
recv = c11.call_async(call_func_in_sibling, c22)
c22.shutdown(wait=True)
e = self.assertRaises(mitogen.core.CallError,
lambda: recv.get().unpickle())
self.assertTrue(e.args[0].startswith(
'mitogen.core.ChannelError: Channel closed by local end.'
))
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

@ -1,6 +1,7 @@
import logging import logging
import subprocess import subprocess
import time import time
import zlib
import unittest2 import unittest2
@ -189,20 +190,35 @@ class AddHandlerTest(unittest2.TestCase):
self.assertTrue(queue.get(timeout=5).is_dead) self.assertTrue(queue.get(timeout=5).is_dead)
class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase): class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
klass = mitogen.master.Router klass = mitogen.master.Router
def test_local_exceeded(self): def test_local_exceeded(self):
router = self.klass(broker=self.broker, max_message_size=4096) router = self.klass(broker=self.broker, max_message_size=4096)
recv = mitogen.core.Receiver(router)
logs = testlib.LogCapturer() logs = testlib.LogCapturer()
logs.start() logs.start()
sem = mitogen.core.Latch() # Send message and block for one IO loop, so _async_route can run.
router.route(mitogen.core.Message.pickled(' '*8192)) router.route(mitogen.core.Message.pickled(' '*8192))
router.broker.defer(sem.put, ' ') # wlil always run after _async_route router.broker.defer_sync(lambda: None)
sem.get()
expect = 'message too large (max 4096 bytes)'
self.assertTrue(expect in logs.stop())
def test_local_dead_message(self):
# Local router should generate dead message when reply_to is set.
router = self.klass(broker=self.broker, max_message_size=4096)
logs = testlib.LogCapturer()
logs.start()
# Try function call. Receiver should be woken by a dead message sent by
# router due to message size exceeded.
child = router.fork()
e = self.assertRaises(mitogen.core.ChannelError,
lambda: child.call(zlib.crc32, ' '*8192))
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
expect = 'message too large (max 4096 bytes)' expect = 'message too large (max 4096 bytes)'
self.assertTrue(expect in logs.stop()) self.assertTrue(expect in logs.stop())
@ -314,5 +330,16 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
self.assertTrue('policy refused message: ' in logs.stop()) self.assertTrue('policy refused message: ' in logs.stop())
class EgressIdsTest(testlib.RouterMixin, testlib.TestCase):
def test_egress_ids_populated(self):
# Ensure Stream.egress_ids is populated on message reception.
c1 = self.router.fork()
stream = self.router.stream_by_id(c1.context_id)
self.assertEquals(set(), stream.egress_ids)
c1.call(time.sleep, 0)
self.assertEquals(set([mitogen.context_id]), stream.egress_ids)
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

@ -1,5 +1,6 @@
import os import os
import sys import sys
import tempfile
import mitogen import mitogen
import mitogen.ssh import mitogen.ssh
@ -11,19 +12,36 @@ import testlib
import plain_old_module import plain_old_module
class FakeSshTest(testlib.RouterMixin, unittest2.TestCase): class StubSshMixin(testlib.RouterMixin):
"""
Mix-in that provides :meth:`stub_ssh` executing the stub 'ssh.py'.
"""
def stub_ssh(self, STUBSSH_MODE=None, **kwargs):
os.environ['STUBSSH_MODE'] = str(STUBSSH_MODE)
try:
return self.router.ssh(
hostname='hostname',
username='mitogen__has_sudo',
ssh_path=testlib.data_path('stubs/ssh.py'),
**kwargs
)
finally:
del os.environ['STUBSSH_MODE']
class ConstructorTest(testlib.RouterMixin, unittest2.TestCase):
def test_okay(self): def test_okay(self):
context = self.router.ssh( context = self.router.ssh(
hostname='hostname', hostname='hostname',
username='mitogen__has_sudo', username='mitogen__has_sudo',
ssh_path=testlib.data_path('fakessh.py'), ssh_path=testlib.data_path('stubs/ssh.py'),
) )
#context.call(mitogen.utils.log_to_file, '/tmp/log') #context.call(mitogen.utils.log_to_file, '/tmp/log')
#context.call(mitogen.utils.disable_site_packages) #context.call(mitogen.utils.disable_site_packages)
self.assertEquals(3, context.call(plain_old_module.add, 1, 2)) self.assertEquals(3, context.call(plain_old_module.add, 1, 2))
class SshTest(testlib.DockerMixin, unittest2.TestCase): class SshTest(testlib.DockerMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream stream_class = mitogen.ssh.Stream
def test_stream_name(self): def test_stream_name(self):
@ -105,6 +123,47 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
context.call(plain_old_module.get_sentinel_value), context.call(plain_old_module.get_sentinel_value),
) )
def test_enforce_unknown_host_key(self):
fp = tempfile.NamedTemporaryFile()
try:
e = self.assertRaises(mitogen.ssh.HostKeyError,
lambda: self.docker_ssh(
username='mitogen__has_sudo_pubkey',
password='has_sudo_password',
ssh_args=['-o', 'UserKnownHostsFile ' + fp.name],
check_host_keys='enforce',
)
)
self.assertEquals(e.args[0], mitogen.ssh.Stream.hostkey_failed_msg)
finally:
fp.close()
def test_accept_enforce_host_keys(self):
fp = tempfile.NamedTemporaryFile()
try:
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_args=['-o', 'UserKnownHostsFile ' + fp.name],
check_host_keys='accept',
)
context.shutdown(wait=True)
fp.seek(0)
# Lame test, but we're about to use enforce mode anyway, which
# verifies the file contents.
self.assertTrue(len(fp.read()) > 0)
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_args=['-o', 'UserKnownHostsFile ' + fp.name],
check_host_keys='enforce',
)
context.shutdown(wait=True)
finally:
fp.close()
class BannerTest(testlib.DockerMixin, unittest2.TestCase): class BannerTest(testlib.DockerMixin, unittest2.TestCase):
# Verify the ability to disambiguate random spam appearing in the SSHd's # Verify the ability to disambiguate random spam appearing in the SSHd's
@ -124,39 +183,37 @@ class BannerTest(testlib.DockerMixin, unittest2.TestCase):
self.assertEquals(name, context.name) self.assertEquals(name, context.name)
class RequirePtyTest(testlib.DockerMixin, testlib.TestCase): class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream def test_classic_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.stub_ssh(STUBSSH_MODE='permdenied_classic'))
def fake_ssh(self, FAKESSH_MODE=None, **kwargs): def test_openssh_75_prompt(self):
os.environ['FAKESSH_MODE'] = str(FAKESSH_MODE) self.assertRaises(mitogen.ssh.PasswordError,
try: lambda: self.stub_ssh(STUBSSH_MODE='permdenied_75'))
return self.router.ssh(
hostname='hostname',
username='mitogen__has_sudo', class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase):
ssh_path=testlib.data_path('fakessh.py'), stream_class = mitogen.ssh.Stream
**kwargs
)
finally:
del os.environ['FAKESSH_MODE']
def test_check_host_keys_accept(self): def test_check_host_keys_accept(self):
# required=true, host_key_checking=accept # required=true, host_key_checking=accept
context = self.fake_ssh(FAKESSH_MODE='ask', check_host_keys='accept') context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept')
self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY')) self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_check_host_keys_enforce(self): def test_check_host_keys_enforce(self):
# required=false, host_key_checking=enforce # required=false, host_key_checking=enforce
context = self.fake_ssh(check_host_keys='enforce') context = self.stub_ssh(check_host_keys='enforce')
self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY')) self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_check_host_keys_ignore(self): def test_check_host_keys_ignore(self):
# required=false, host_key_checking=ignore # required=false, host_key_checking=ignore
context = self.fake_ssh(check_host_keys='ignore') context = self.stub_ssh(check_host_keys='ignore')
self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY')) self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_password_present(self): def test_password_present(self):
# required=true, password is not None # required=true, password is not None
context = self.fake_ssh(check_host_keys='ignore', password='willick') context = self.stub_ssh(check_host_keys='ignore', password='willick')
self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY')) self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY'))

@ -14,6 +14,11 @@ import mitogen.core
import mitogen.master import mitogen.master
import mitogen.utils import mitogen.utils
try:
import faulthandler
except ImportError:
pass
try: try:
import urlparse import urlparse
except ImportError: except ImportError:
@ -32,6 +37,9 @@ sys.path.append(DATA_DIR)
if mitogen.is_master: if mitogen.is_master:
mitogen.utils.log_to_file() mitogen.utils.log_to_file()
if faulthandler is not None:
faulthandler.enable()
def data_path(suffix): def data_path(suffix):
path = os.path.join(DATA_DIR, suffix) path = os.path.join(DATA_DIR, suffix)
@ -160,12 +168,12 @@ def sync_with_broker(broker, timeout=10.0):
class CaptureStreamHandler(logging.StreamHandler): class CaptureStreamHandler(logging.StreamHandler):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(CaptureStreamHandler, self).__init__(*args, **kwargs) logging.StreamHandler.__init__(self, *args, **kwargs)
self.msgs = [] self.msgs = []
def emit(self, msg): def emit(self, msg):
self.msgs.append(msg) self.msgs.append(msg)
return super(CaptureStreamHandler, self).emit(msg) logging.StreamHandler.emit(self, msg)
class LogCapturer(object): class LogCapturer(object):

@ -0,0 +1,119 @@
import os
import socket
import sys
import time
import unittest2
import mitogen
import mitogen.fork
import mitogen.master
import mitogen.service
import mitogen.unix
import testlib
class MyService(mitogen.service.Service):
def __init__(self, latch, **kwargs):
super(MyService, self).__init__(**kwargs)
# used to wake up main thread once client has made its request
self.latch = latch
@mitogen.service.expose(policy=mitogen.service.AllowParents())
def ping(self, msg):
self.latch.put(None)
return {
'src_id': msg.src_id,
'auth_id': msg.auth_id,
}
class IsPathDeadTest(unittest2.TestCase):
func = staticmethod(mitogen.unix.is_path_dead)
path = '/tmp/stale-socket'
def test_does_not_exist(self):
self.assertTrue(self.func('/tmp/does-not-exist'))
def make_socket(self):
if os.path.exists(self.path):
os.unlink(self.path)
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind(self.path)
return s
def test_conn_refused(self):
s = self.make_socket()
s.close()
self.assertTrue(self.func(self.path))
def test_is_alive(self):
s = self.make_socket()
s.listen(5)
self.assertFalse(self.func(self.path))
s.close()
os.unlink(self.path)
class ListenerTest(testlib.RouterMixin, unittest2.TestCase):
klass = mitogen.unix.Listener
def test_constructor_basic(self):
listener = self.klass(router=self.router)
self.assertFalse(mitogen.unix.is_path_dead(listener.path))
os.unlink(listener.path)
class ClientTest(unittest2.TestCase):
klass = mitogen.unix.Listener
def _try_connect(self, path):
# give server a chance to setup listener
for x in range(10):
try:
return mitogen.unix.connect(path)
except socket.error:
if x == 9:
raise
time.sleep(0.1)
def _test_simple_client(self, path):
router, context = self._try_connect(path)
self.assertEquals(0, context.context_id)
self.assertEquals(1, mitogen.context_id)
self.assertEquals(0, mitogen.parent_id)
resp = context.call_service(service_name=MyService, method_name='ping')
self.assertEquals(mitogen.context_id, resp['src_id'])
self.assertEquals(0, resp['auth_id'])
def _test_simple_server(self, path):
router = mitogen.master.Router()
latch = mitogen.core.Latch()
try:
try:
listener = self.klass(path=path, router=router)
pool = mitogen.service.Pool(router=router, services=[
MyService(latch=latch, router=router),
])
latch.get()
# give broker a chance to deliver service resopnse
time.sleep(0.1)
finally:
pool.shutdown()
router.broker.shutdown()
finally:
os._exit(0)
def test_simple(self):
path = mitogen.unix.make_socket_path()
if os.fork():
self._test_simple_client(path)
else:
mitogen.fork.on_fork()
self._test_simple_server(path)
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save