Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #615: ensure 4GB max_message_size is configured for task workers.
  issue #615: update Changelog.
  issue #615: route a dead message to recipients when no reply is expected
  issue #615: fetch_file() might be called with AnsibleUnicode.
  issue #615: redirect 'fetch' action to 'mitogen_fetch'.
  issue #615: extricate slurp brainwrong from mitogen_fetch
  issue #615: ansible: import Ansible fetch.py action plug-in
  issue #533: include object identity of Stream in repr()
  docs: lots more changelog
  issue #595: add buildah to docs and changelog.
  docs: a few more internals.rst additions
new-serialization
David Wilson 5 years ago
commit e701fae41d

@ -953,7 +953,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self._connect()
ansible_mitogen.target.transfer_file(
context=self.context,
in_path=in_path,
# in_path may be AnsibleUnicode
in_path=mitogen.utils.cast(in_path),
out_path=out_path
)
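For context, a minimal sketch of why the cast matters (the AnsibleUnicode stand-in below is hypothetical): Ansible's templating layer hands the plug-in str subclasses that Mitogen's serializer refuses to pickle, and mitogen.utils.cast() converts them back to plain built-in types before they cross the connection.

```python
# Rough sketch, not part of the change: demonstrates that cast() strips
# str subclasses (such as AnsibleUnicode) down to the plain built-in type
# so the value can be serialized across the connection.
import mitogen.utils

class FakeAnsibleUnicode(str):
    """Stand-in for Ansible's AnsibleUnicode str subclass."""

in_path = FakeAnsibleUnicode('/tmp/example-file')
plain = mitogen.utils.cast(in_path)
assert type(plain) is str          # subclass information is discarded
```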

@ -0,0 +1,167 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ansible.module_utils._text import to_bytes
from ansible.module_utils.six import string_types
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.plugins.action import ActionBase
from ansible.utils.hashing import checksum, md5, secure_hash
from ansible.utils.path import makedirs_safe
REMOTE_CHECKSUM_ERRORS = {
'0': "unable to calculate the checksum of the remote file",
'1': "the remote file does not exist",
'2': "no read permission on remote file",
'3': "remote file is a directory, fetch cannot work on directories",
'4': "python isn't present on the system. Unable to compute checksum",
'5': "stdlib json was not found on the remote machine. Only the raw module can work without those installed",
}
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
''' handler for fetch operations '''
if task_vars is None:
task_vars = dict()
result = super(ActionModule, self).run(tmp, task_vars)
del tmp # tmp no longer has any effect
try:
if self._play_context.check_mode:
result['skipped'] = True
result['msg'] = 'check mode not (yet) supported for this module'
return result
source = self._task.args.get('src', None)
dest = self._task.args.get('dest', None)
flat = boolean(self._task.args.get('flat'), strict=False)
fail_on_missing = boolean(self._task.args.get('fail_on_missing', True), strict=False)
validate_checksum = boolean(self._task.args.get('validate_checksum', True), strict=False)
# validate source and dest are strings FIXME: use basic.py and module specs
if not isinstance(source, string_types):
result['msg'] = "Invalid type supplied for source option, it must be a string"
if not isinstance(dest, string_types):
result['msg'] = "Invalid type supplied for dest option, it must be a string"
if source is None or dest is None:
result['msg'] = "src and dest are required"
if result.get('msg'):
result['failed'] = True
return result
source = self._connection._shell.join_path(source)
source = self._remote_expand_user(source)
# calculate checksum for the remote file, don't bother if using
# become as slurp will be used. Force remote_checksum to follow
# symlinks because fetch always follows symlinks
remote_checksum = self._remote_checksum(source, all_vars=task_vars, follow=True)
# calculate the destination name
if os.path.sep not in self._connection._shell.join_path('a', ''):
source = self._connection._shell._unquote(source)
source_local = source.replace('\\', '/')
else:
source_local = source
dest = os.path.expanduser(dest)
if flat:
if os.path.isdir(to_bytes(dest, errors='surrogate_or_strict')) and not dest.endswith(os.sep):
result['msg'] = "dest is an existing directory, use a trailing slash if you want to fetch src into that directory"
result['file'] = dest
result['failed'] = True
return result
if dest.endswith(os.sep):
# if the path ends with "/", we'll use the source filename as the
# destination filename
base = os.path.basename(source_local)
dest = os.path.join(dest, base)
if not dest.startswith("/"):
# if dest does not start with "/", we'll assume a relative path
dest = self._loader.path_dwim(dest)
else:
# files are saved in dest dir, with a subdir for each host, then the filename
if 'inventory_hostname' in task_vars:
target_name = task_vars['inventory_hostname']
else:
target_name = self._play_context.remote_addr
dest = "%s/%s/%s" % (self._loader.path_dwim(dest), target_name, source_local)
dest = dest.replace("//", "/")
if remote_checksum in REMOTE_CHECKSUM_ERRORS:
result['changed'] = False
result['file'] = source
result['msg'] = REMOTE_CHECKSUM_ERRORS[remote_checksum]
# Historically, these don't fail because you may want to transfer
# a log file that possibly MAY exist but keep going to fetch other
# log files. Today, this is better achieved by adding
# ignore_errors or failed_when to the task. Control the behaviour
# via fail_on_missing
if fail_on_missing:
result['failed'] = True
del result['changed']
else:
result['msg'] += ", not transferring, ignored"
return result
# calculate checksum for the local file
local_checksum = checksum(dest)
if remote_checksum != local_checksum:
# create the containing directories, if needed
makedirs_safe(os.path.dirname(dest))
# fetch the file and check for changes
self._connection.fetch_file(source, dest)
new_checksum = secure_hash(dest)
# For backwards compatibility. We'll return None on FIPS enabled systems
try:
new_md5 = md5(dest)
except ValueError:
new_md5 = None
if validate_checksum and new_checksum != remote_checksum:
result.update(dict(failed=True, md5sum=new_md5,
msg="checksum mismatch", file=source, dest=dest, remote_md5sum=None,
checksum=new_checksum, remote_checksum=remote_checksum))
else:
result.update({'changed': True, 'md5sum': new_md5, 'dest': dest,
'remote_md5sum': None, 'checksum': new_checksum,
'remote_checksum': remote_checksum})
else:
# For backwards compatibility. We'll return None on FIPS enabled systems
try:
local_md5 = md5(dest)
except ValueError:
local_md5 = None
result.update(dict(changed=False, md5sum=local_md5, file=source, dest=dest, checksum=local_checksum))
finally:
self._remove_tmp_path(self._connection._shell.tmpdir)
return result

@ -72,6 +72,8 @@ ANSIBLE_PKG_OVERRIDE = (
u"__author__ = %r\n"
)
MAX_MESSAGE_SIZE = 4096 * 1048576
worker_model_msg = (
'Mitogen connection types may only be instantiated when one of the '
'"mitogen_*" or "operon_*" strategies are active.'
@ -502,6 +504,7 @@ class ClassicWorkerModel(WorkerModel):
# with_items loops.
raise ansible.errors.AnsibleError(shutting_down_msg % (e,))
self.router.max_message_size = MAX_MESSAGE_SIZE
self.listener_path = path
def _on_process_exit(self):
@ -692,7 +695,7 @@ class MuxProcess(object):
self.broker = mitogen.master.Broker(install_watcher=False)
self.router = mitogen.master.Router(
broker=self.broker,
max_message_size=4096 * 1048576,
max_message_size=MAX_MESSAGE_SIZE,
)
_setup_responder(self.router.responder)
mitogen.core.listen(self.broker, 'shutdown', self._on_broker_shutdown)
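As a rough illustration of why the shared limit matters (this mirrors the new test_remote_dead_message unit test added later in this commit, with a deliberately tiny limit): a message exceeding a router's max_message_size is dropped, and the waiting receiver now gets a dead message instead of blocking forever.

```python
# Hedged sketch mirroring the new test: a payload larger than
# max_message_size is refused by the router, which then sends a dead
# message to the intended recipient; reading it raises ChannelError.
import mitogen.core
import mitogen.master

broker = mitogen.master.Broker()
router = mitogen.master.Router(broker=broker, max_message_size=4096)
try:
    recv = mitogen.core.Receiver(router)
    recv.to_sender().send(b'x' * 4097)        # exceeds the 4 KiB limit
    try:
        recv.get().unpickle()
    except mitogen.core.ChannelError as e:
        print(e)                               # descriptive "too large" reason
finally:
    broker.shutdown()
    broker.join()
```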

@ -127,6 +127,8 @@ def wrap_action_loader__get(name, *args, **kwargs):
action plugins outside the Ansible tree.
"""
get_kwargs = {'class_only': True}
if name in ('fetch',):
name = 'mitogen_' + name
if ansible.__version__ >= '2.8':
get_kwargs['collection_list'] = kwargs.pop('collection_list', None)

@ -175,7 +175,8 @@ Noteworthy Differences
your_ssh_username = (ALL) NOPASSWD:/usr/bin/python -c*
* The `docker <https://docs.ansible.com/ansible/2.6/plugins/connection/docker.html>`_,
* The `buildah <https://docs.ansible.com/ansible/latest/plugins/connection/buildah.html>`_,
`docker <https://docs.ansible.com/ansible/2.6/plugins/connection/docker.html>`_,
`jail <https://docs.ansible.com/ansible/2.6/plugins/connection/jail.html>`_,
`kubectl <https://docs.ansible.com/ansible/2.6/plugins/connection/kubectl.html>`_,
`local <https://docs.ansible.com/ansible/2.6/plugins/connection/local.html>`_,
@ -722,6 +723,19 @@ establishment of additional reuseable interpreters as necessary to match the
configuration of each task.
.. _method-buildah:
Buildah
~~~~~~~
Like `buildah
<https://docs.ansible.com/ansible/2.6/plugins/connection/buildah.html>`_ except
connection delegation is supported.
* ``ansible_host``: Name of Buildah container (default: inventory hostname).
* ``ansible_user``: Name of user within the container to execute as.
.. _doas:
Doas

@ -87,6 +87,20 @@ Router Class
Connection Methods
==================
.. currentmodule:: mitogen.parent
.. method:: Router.buildah (container=None, buildah_path=None, username=None, \**kwargs)
Construct a context on the local machine over a ``buildah`` invocation.
Accepts all parameters accepted by :meth:`local`, in addition to:
:param str container:
The name of the Buildah container to connect to.
:param str buildah_path:
Filename or complete path to the ``buildah`` binary. ``PATH`` will be
searched if given as a filename. Defaults to ``buildah``.
:param str username:
Username to use, defaults to unset.
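A rough usage sketch (the container name is hypothetical, and an existing Buildah working container on the local machine is assumed):

```python
# Hedged example: run a function inside an existing Buildah working
# container via the new connection method. The container name is made up.
import os

import mitogen.master

broker = mitogen.master.Broker()
router = mitogen.master.Router(broker)
try:
    ctx = router.buildah(container='fedora-working-container')
    print(ctx.call(os.getuid))                # executes inside the container
finally:
    broker.shutdown()
    broker.join()
```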
.. currentmodule:: mitogen.parent
.. method:: Router.fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None)

@ -32,25 +32,32 @@ Enhancements
<https://docs.ansible.com/ansible/latest/reference_appendices/interpreter_discovery.html>`_
are not yet handled.
* The ``MITOGEN_CPU_COUNT`` environment variable shards the connection
multiplexer into per-CPU workers. This improves throughput for large runs
especially involving file transfer, and is a prerequisite for future
in-process SSH support. One multiplexer starts by default, to match existing
behaviour.
* `Operon <https://networkgenomics.com/operon/>`_ no longer requires a custom
installation, both Operon and Ansible are supported by a unified release.
* `#419 <https://github.com/dw/mitogen/issues/419>`_,
`#470 <https://github.com/dw/mitogen/issues/470>`_, file descriptor usage
during large runs is halved, as it is no longer necessary to manage read and
write sides distinctly in order to work around a design limitation.
write sides distinctly in order to work around a design problem.
* `#419 <https://github.com/dw/mitogen/issues/419>`_: almost all connection
setup happens on one thread, reducing GIL contention and context switching
early in a run.
setup happens on one thread, reducing contention and context switching early
in a run.
* `#419 <https://github.com/dw/mitogen/issues/419>`_: Connection setup is
pipelined, eliminating several network round-trips. Most infrastructure is in
place to support future removal of the final round-trip between a target
fully booting and receiving its first function call.
better pipelined, eliminating some network round-trips. Most infrastructure
is in place to support future removal of the final round-trips between a
target fully booting and receiving function calls.
* `#595 <https://github.com/dw/mitogen/pull/595>`_: the
:meth:`Router.buildah() <mitogen.parent.Router.buildah>` connection method is
available to manipulate `Buildah <https://buildah.io/>`_ containers, and is
exposed to Ansible as the ``buildah`` transport.
* The ``MITOGEN_CPU_COUNT`` environment variable shards the connection
multiplexer into per-CPU workers. This may improve throughput for runs
involving large file transfers, and is required for future in-process SSH
support. One multiplexer starts by default, to match existing behaviour.
* `d6faff06 <https://github.com/dw/mitogen/commit/d6faff06>`_,
`807cbef9 <https://github.com/dw/mitogen/commit/807cbef9>`_,
@ -193,6 +200,14 @@ Core Library
* `#612 <https://github.com/dw/mitogen/issues/612>`_: fix various errors
introduced by stream refactoring.
* `#615 <https://github.com/dw/mitogen/issues/615>`_: when routing fails to
deliver a message for a reason other than that the sender cannot or should not
reach the recipient, and no reply-to address is present on the message, a dead
message is now sent to the original recipient instead. This ensures a
descriptive message is delivered to a thread sleeping on the reply to a
function call, for example when the reply is dropped for exceeding the
maximum configured message size.
* `a5536c35 <https://github.com/dw/mitogen/commit/a5536c35>`_: avoid quadratic
buffer management when logging lines received from a child's redirected
standard IO.
@ -230,6 +245,7 @@ bug reports, testing, features and fixes in this release contributed by
`El Mehdi CHAOUKI <https://github.com/elmchaouki>`_,
`Florent Dutheil <https://github.com/fdutheil>`_,
`James Hogarth <https://github.com/hogarthj>`_,
`Jordan Webb <https://github.com/jordemort>`_,
`Marc Hartmayer <https://github.com/marc1006>`_,
`Nigel Metheringham <https://github.com/nigelm>`_,
`Orion Poplawski <https://github.com/opoplawski>`_,

@ -65,6 +65,10 @@ Stream, Side & Protocol
.. autoclass:: Stream
:members:
.. currentmodule:: mitogen.core
.. autoclass:: BufferedWriter
:members:
.. currentmodule:: mitogen.core
.. autoclass:: Side
:members:
@ -81,6 +85,10 @@ Stream, Side & Protocol
.. autoclass:: DelimitedProtocol
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: LogProtocol
:members:
.. currentmodule:: mitogen.core
.. autoclass:: IoLoggerProtocol
:members:

@ -1681,7 +1681,7 @@ class Stream(object):
self.transmit_side = Side(self, wfp)
def __repr__(self):
return "<Stream %s>" % (self.name,)
return "<Stream %s #%04x>" % (self.name, id(self) & 0xffff,)
def on_receive(self, broker):
"""
@ -2112,8 +2112,8 @@ class MitogenProtocol(Protocol):
return False
if msg_len > self._router.max_message_size:
LOG.error('Maximum message size exceeded (got %d, max %d)',
msg_len, self._router.max_message_size)
LOG.error('%r: Maximum message size exceeded (got %d, max %d)',
self, msg_len, self._router.max_message_size)
self.stream.on_disconnect(broker)
return False
@ -2727,9 +2727,9 @@ class Latch(object):
class Waker(Protocol):
"""
:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_.
Used to wake the multiplexer when another thread needs to modify its state
(via a cross-thread function call).
:class:`Protocol` implementing the `UNIX self-pipe trick`_. Used to wake
:class:`Broker` when another thread needs to modify its state, by enqueuing
a function call to run on the :class:`Broker` thread.
.. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
"""
@ -3191,28 +3191,55 @@ class Router(object):
fn(Message.dead(self.respondent_disconnect_msg))
del self._handle_map[handle]
def _maybe_send_dead(self, msg, reason, *args):
def _maybe_send_dead(self, unreachable, msg, reason, *args):
"""
Send a dead message to the original sender of `msg` if it was expecting a
reply (because its `reply_to` was set); otherwise assume `msg` is itself a
reply of some sort, and send the dead message to its intended recipient.
:param bool unreachable:
If :data:`True`, the recipient is known to be dead or routing failed
due to a security precaution, so do not attempt to fall back to sending
the dead message to the recipient when the original sender did not
include a reply address.
:param mitogen.core.Message msg:
Message that triggered the dead message.
:param str reason:
Human-readable error reason.
:param tuple args:
Elements to interpolate with `reason`.
"""
if args:
reason %= args
LOG.debug('%r: %r is dead: %r', self, msg, reason)
if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead(reason=reason), router=self)
elif not unreachable:
self._async_route(
Message.dead(
dst_id=msg.dst_id,
handle=msg.handle,
reason=reason,
)
)
def _invoke(self, msg, stream):
# IOLOG.debug('%r._invoke(%r)', self, msg)
try:
persist, fn, policy, respondent = self._handle_map[msg.handle]
except KeyError:
self._maybe_send_dead(msg, reason=self.invalid_handle_msg)
self._maybe_send_dead(True, msg, reason=self.invalid_handle_msg)
return
if respondent and not (msg.is_dead or
msg.src_id == respondent.context_id):
self._maybe_send_dead(msg, 'reply from unexpected context')
self._maybe_send_dead(True, msg, 'reply from unexpected context')
return
if policy and not policy(msg, stream):
self._maybe_send_dead(msg, self.refused_msg)
self._maybe_send_dead(True, msg, self.refused_msg)
return
if not persist:
@ -3240,7 +3267,7 @@ class Router(object):
_vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)
if len(msg.data) > self.max_message_size:
self._maybe_send_dead(msg, self.too_large_msg % (
self._maybe_send_dead(False, msg, self.too_large_msg % (
self.max_message_size,
))
return
@ -3275,14 +3302,14 @@ class Router(object):
out_stream = self._stream_by_id.get(mitogen.parent_id)
if out_stream is None:
self._maybe_send_dead(msg, self.no_route_msg,
self._maybe_send_dead(True, msg, self.no_route_msg,
msg.dst_id, mitogen.context_id)
return
if in_stream and self.unidirectional and not \
(in_stream.protocol.is_privileged or
out_stream.protocol.is_privileged):
self._maybe_send_dead(msg, self.unidirectional_msg,
self._maybe_send_dead(True, msg, self.unidirectional_msg,
in_stream.protocol.remote_id,
out_stream.protocol.remote_id,
mitogen.context_id)

@ -1250,6 +1250,9 @@ class LogProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
written to it.
"""
def on_line_received(self, line):
"""
Read a line, decode it as UTF-8, and log it.
"""
super(LogProtocol, self).on_line_received(line)
LOG.info(u'%s: %s', self.stream.name, line.decode('utf-8', 'replace'))

@ -11,6 +11,7 @@ import mitogen.core
import mitogen.master
import mitogen.parent
import mitogen.utils
from mitogen.core import b
try:
import Queue
@ -258,6 +259,23 @@ class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
self.assertTrue(expect in logs.stop())
def test_remote_dead_message(self):
# Router should send dead message to original recipient when reply_to
# is unset.
router = self.klass(broker=self.broker, max_message_size=4096)
# Simulate an oversized reply: the receiver should be woken by a dead
# message sent by the router because the maximum message size was exceeded.
child = router.local()
recv = mitogen.core.Receiver(router)
recv.to_sender().send(b('x') * 4097)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get().unpickle()
)
expect = router.too_large_msg % (4096,)
self.assertEquals(e.args[0], expect)
def test_remote_configured(self):
router = self.klass(broker=self.broker, max_message_size=64*1024)
remote = router.local()
@ -510,7 +528,7 @@ class ShutdownTest(testlib.RouterMixin, testlib.TestCase):
mitogen.context_id,
))
def test_disconnet_all(self):
def test_disconnect_all(self):
l1 = self.router.local()
l2 = self.router.local()
