diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py
index 06a152b2..2dd3bfa9 100644
--- a/ansible_mitogen/connection.py
+++ b/ansible_mitogen/connection.py
@@ -953,7 +953,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
         self._connect()
         ansible_mitogen.target.transfer_file(
             context=self.context,
-            in_path=in_path,
+            # in_path may be AnsibleUnicode
+            in_path=mitogen.utils.cast(in_path),
             out_path=out_path
         )
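The ``cast()`` call above exists because Ansible hands the connection plugin
``AnsibleUnicode``/``AnsibleUnsafeText`` values (subclasses of the built-in
string types) that Mitogen's serializer refuses. A minimal sketch of the
conversion, using a locally defined stand-in class rather than Ansible's real
type::

    import mitogen.utils

    class AnsibleUnicode(str):
        # Stand-in for the str subclass Ansible uses for templated values;
        # the real class lives in Ansible, not in this sketch.
        pass

    in_path = AnsibleUnicode('/tmp/example.txt')

    # cast() recursively converts str/bytes/list/dict subclasses back to the
    # plain built-in types so the value can be serialized for the target.
    plain = mitogen.utils.cast(in_path)
    print(type(in_path).__name__, '->', type(plain).__name__)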
diff --git a/ansible_mitogen/plugins/action/mitogen_fetch.py b/ansible_mitogen/plugins/action/mitogen_fetch.py
new file mode 100644
index 00000000..ffa737e5
--- /dev/null
+++ b/ansible_mitogen/plugins/action/mitogen_fetch.py
@@ -0,0 +1,167 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+
+from ansible.module_utils._text import to_bytes
+from ansible.module_utils.six import string_types
+from ansible.module_utils.parsing.convert_bool import boolean
+from ansible.plugins.action import ActionBase
+from ansible.utils.hashing import checksum, md5, secure_hash
+from ansible.utils.path import makedirs_safe
+
+
+REMOTE_CHECKSUM_ERRORS = {
+    '0': "unable to calculate the checksum of the remote file",
+    '1': "the remote file does not exist",
+    '2': "no read permission on remote file",
+    '3': "remote file is a directory, fetch cannot work on directories",
+    '4': "python isn't present on the system. Unable to compute checksum",
+    '5': "stdlib json was not found on the remote machine. Only the raw module can work without those installed",
+}
+
+
+class ActionModule(ActionBase):
+
+    def run(self, tmp=None, task_vars=None):
+        ''' handler for fetch operations '''
+        if task_vars is None:
+            task_vars = dict()
+
+        result = super(ActionModule, self).run(tmp, task_vars)
+        del tmp  # tmp no longer has any effect
+
+        try:
+            if self._play_context.check_mode:
+                result['skipped'] = True
+                result['msg'] = 'check mode not (yet) supported for this module'
+                return result
+
+            source = self._task.args.get('src', None)
+            dest = self._task.args.get('dest', None)
+            flat = boolean(self._task.args.get('flat'), strict=False)
+            fail_on_missing = boolean(self._task.args.get('fail_on_missing', True), strict=False)
+            validate_checksum = boolean(self._task.args.get('validate_checksum', True), strict=False)
+
+            # validate source and dest are strings FIXME: use basic.py and module specs
+            if not isinstance(source, string_types):
+                result['msg'] = "Invalid type supplied for source option, it must be a string"
+
+            if not isinstance(dest, string_types):
+                result['msg'] = "Invalid type supplied for dest option, it must be a string"
+
+            if source is None or dest is None:
+                result['msg'] = "src and dest are required"
+
+            if result.get('msg'):
+                result['failed'] = True
+                return result
+
+            source = self._connection._shell.join_path(source)
+            source = self._remote_expand_user(source)
+
+            # calculate checksum for the remote file, don't bother if using
+            # become as slurp will be used. Force remote_checksum to follow
+            # symlinks because fetch always follows symlinks
+            remote_checksum = self._remote_checksum(source, all_vars=task_vars, follow=True)
+
+            # calculate the destination name
+            if os.path.sep not in self._connection._shell.join_path('a', ''):
+                source = self._connection._shell._unquote(source)
+                source_local = source.replace('\\', '/')
+            else:
+                source_local = source
+
+            dest = os.path.expanduser(dest)
+            if flat:
+                if os.path.isdir(to_bytes(dest, errors='surrogate_or_strict')) and not dest.endswith(os.sep):
+                    result['msg'] = "dest is an existing directory, use a trailing slash if you want to fetch src into that directory"
+                    result['file'] = dest
+                    result['failed'] = True
+                    return result
+                if dest.endswith(os.sep):
+                    # if the path ends with "/", we'll use the source filename as the
+                    # destination filename
+                    base = os.path.basename(source_local)
+                    dest = os.path.join(dest, base)
+                if not dest.startswith("/"):
+                    # if dest does not start with "/", we'll assume a relative path
+                    dest = self._loader.path_dwim(dest)
+            else:
+                # files are saved in dest dir, with a subdir for each host, then the filename
+                if 'inventory_hostname' in task_vars:
+                    target_name = task_vars['inventory_hostname']
+                else:
+                    target_name = self._play_context.remote_addr
+                dest = "%s/%s/%s" % (self._loader.path_dwim(dest), target_name, source_local)
+
+            dest = dest.replace("//", "/")
+
+            if remote_checksum in REMOTE_CHECKSUM_ERRORS:
+                result['changed'] = False
+                result['file'] = source
+                result['msg'] = REMOTE_CHECKSUM_ERRORS[remote_checksum]
+                # Historically, these don't fail because you may want to transfer
+                # a log file that possibly MAY exist but keep going to fetch other
+                # log files. Today, this is better achieved by adding
+                # ignore_errors or failed_when to the task. Control the behaviour
+                # via fail_on_missing
+                if fail_on_missing:
+                    result['failed'] = True
+                    del result['changed']
+                else:
+                    result['msg'] += ", not transferring, ignored"
+                return result
+
+            # calculate checksum for the local file
+            local_checksum = checksum(dest)
+
+            if remote_checksum != local_checksum:
+                # create the containing directories, if needed
+                makedirs_safe(os.path.dirname(dest))
+
+                # fetch the file and check for changes
+                self._connection.fetch_file(source, dest)
+                new_checksum = secure_hash(dest)
+                # For backwards compatibility. We'll return None on FIPS enabled systems
+                try:
+                    new_md5 = md5(dest)
+                except ValueError:
+                    new_md5 = None
+
+                if validate_checksum and new_checksum != remote_checksum:
+                    result.update(dict(failed=True, md5sum=new_md5,
+                                       msg="checksum mismatch", file=source, dest=dest, remote_md5sum=None,
+                                       checksum=new_checksum, remote_checksum=remote_checksum))
+                else:
+                    result.update({'changed': True, 'md5sum': new_md5, 'dest': dest,
+                                   'remote_md5sum': None, 'checksum': new_checksum,
+                                   'remote_checksum': remote_checksum})
+            else:
+                # For backwards compatibility. We'll return None on FIPS enabled systems
+                try:
+                    local_md5 = md5(dest)
+                except ValueError:
+                    local_md5 = None
+                result.update(dict(changed=False, md5sum=local_md5, file=source, dest=dest, checksum=local_checksum))
+
+        finally:
+            self._remove_tmp_path(self._connection._shell.tmpdir)
+
+        return result
diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py
index 503e9bb7..10d55fdf 100644
--- a/ansible_mitogen/process.py
+++ b/ansible_mitogen/process.py
@@ -72,6 +72,8 @@ ANSIBLE_PKG_OVERRIDE = (
     u"__author__ = %r\n"
 )
 
+MAX_MESSAGE_SIZE = 4096 * 1048576
+
 worker_model_msg = (
     'Mitogen connection types may only be instantiated when one of the '
     '"mitogen_*" or "operon_*" strategies are active.'
 )
@@ -502,6 +504,7 @@ class ClassicWorkerModel(WorkerModel):
             # with_items loops.
             raise ansible.errors.AnsibleError(shutting_down_msg % (e,))
 
+        self.router.max_message_size = MAX_MESSAGE_SIZE
         self.listener_path = path
 
     def _on_process_exit(self):
@@ -692,7 +695,7 @@ class MuxProcess(object):
         self.broker = mitogen.master.Broker(install_watcher=False)
         self.router = mitogen.master.Router(
             broker=self.broker,
-            max_message_size=4096 * 1048576,
+            max_message_size=MAX_MESSAGE_SIZE,
         )
         _setup_responder(self.router.responder)
         mitogen.core.listen(self.broker, 'shutdown', self._on_broker_shutdown)
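The new ``MAX_MESSAGE_SIZE`` constant simply names the 4 GiB limit that was
previously inlined in ``MuxProcess``, and the ``ClassicWorkerModel`` change
applies the same limit to the router used by worker processes. A minimal
sketch of how the setting is consumed, using only the public
``mitogen.master`` API::

    import mitogen.master

    MAX_MESSAGE_SIZE = 4096 * 1048576  # 4 GiB, matching the constant above

    broker = mitogen.master.Broker()
    try:
        router = mitogen.master.Router(
            broker=broker,
            max_message_size=MAX_MESSAGE_SIZE,
        )
        # An existing router can also be aligned after construction, as
        # ClassicWorkerModel now does:
        router.max_message_size = MAX_MESSAGE_SIZE
    finally:
        broker.shutdown()
        broker.join()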
diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py
index 755b9113..8f093999 100644
--- a/ansible_mitogen/strategy.py
+++ b/ansible_mitogen/strategy.py
@@ -127,6 +127,8 @@ def wrap_action_loader__get(name, *args, **kwargs):
     action plugins outside the Ansible tree.
     """
     get_kwargs = {'class_only': True}
+    if name in ('fetch',):
+        name = 'mitogen_' + name
    if ansible.__version__ >= '2.8':
         get_kwargs['collection_list'] = kwargs.pop('collection_list', None)
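For context, ``wrap_action_loader__get()`` is installed in place of Ansible's
action-loader ``get()`` while a Mitogen strategy is active, so stock ``fetch``
tasks are served by the bundled ``mitogen_fetch`` plugin added above. A
simplified sketch of that wrapping pattern (illustrative, not the project's
exact install/uninstall code)::

    import ansible.plugins.loader

    action_loader__get = ansible.plugins.loader.action_loader.get

    def wrapped_get(name, *args, **kwargs):
        # Redirect the stock action to the Mitogen-aware implementation.
        if name in ('fetch',):
            name = 'mitogen_' + name
        return action_loader__get(name, *args, **kwargs)

    # Installed when the strategy starts, restored when the run completes.
    ansible.plugins.loader.action_loader.get = wrapped_get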
diff --git a/docs/ansible_detailed.rst b/docs/ansible_detailed.rst
index c3bbad59..fba7a86a 100644
--- a/docs/ansible_detailed.rst
+++ b/docs/ansible_detailed.rst
@@ -175,7 +175,8 @@ Noteworthy Differences
 
       your_ssh_username = (ALL) NOPASSWD:/usr/bin/python -c*
 
-* The `docker `_,
+* The `buildah `_,
+  `docker `_,
   `jail `_,
   `kubectl `_,
   `local `_,
@@ -722,6 +723,19 @@
 establishment of additional reuseable interpreters as necessary to match the
 configuration of each task.
 
+.. _method-buildah:
+
+Buildah
+~~~~~~~
+
+Like `buildah `_ except
+connection delegation is supported.
+
+* ``ansible_host``: Name of Buildah container (default: inventory hostname).
+* ``ansible_user``: Name of user within the container to execute as.
+
+
 .. _doas:
 
 Doas
diff --git a/docs/api.rst b/docs/api.rst
index 09aa8582..7ab3274e 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -87,6 +87,20 @@ Router Class
 Connection Methods
 ==================
 
+.. currentmodule:: mitogen.parent
+.. method:: Router.buildah (container=None, buildah_path=None, username=None, \**kwargs)
+
+    Construct a context on the local machine over a ``buildah`` invocation.
+    Accepts all parameters accepted by :meth:`local`, in addition to:
+
+    :param str container:
+        The name of the Buildah container to connect to.
+    :param str buildah_path:
+        Filename or complete path to the ``buildah`` binary. ``PATH`` will be
+        searched if given as a filename. Defaults to ``buildah``.
+    :param str username:
+        Username to use, defaults to unset.
+
 .. currentmodule:: mitogen.parent
 .. method:: Router.fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None)
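A short usage sketch of the new connection method documented above, assuming a
local working container already exists (the container name below is a
placeholder created with ``buildah from``)::

    import os

    import mitogen.master

    broker = mitogen.master.Broker()
    router = mitogen.master.Router(broker=broker)
    try:
        # Roughly equivalent to running a Python interpreter inside the
        # container via `buildah run`.
        context = router.buildah(container='my-working-container')
        print(context.call(os.getuid))
    finally:
        broker.shutdown()
        broker.join()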
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 77c311c0..7f0ff234 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -32,25 +32,32 @@ Enhancements
   `_ are not yet handled.
 
-* The ``MITOGEN_CPU_COUNT`` environment variable shards the connection
-  multiplexer into per-CPU workers. This improves throughput for large runs
-  especially involving file transfer, and is a prerequisite for future
-  in-process SSH support. One multiplexer starts by default, to match existing
-  behaviour.
+* `Operon `_ no longer requires a custom
+  installation; both Operon and Ansible are supported by a unified release.
 
 * `#419 `_,
   `#470 `_, file descriptor usage
   during large runs is halved, as it is no longer necessary to manage read and
-  write sides distinctly in order to work around a design limitation.
+  write sides distinctly in order to work around a design problem.
 
 * `#419 `_: almost all connection
-  setup happens on one thread, reducing GIL contention and context switching
-  early in a run.
+  setup happens on one thread, reducing contention and context switching early
+  in a run.
 
 * `#419 `_: Connection setup is
-  pipelined, eliminating several network round-trips. Most infrastructure is in
-  place to support future removal of the final round-trip between a target
-  fully booting and receiving its first function call.
+  better pipelined, eliminating some network round-trips. Most infrastructure
+  is in place to support future removal of the final round-trips between a
+  target fully booting and receiving function calls.
+
+* `#595 `_: the
+  :meth:`Router.buildah() ` connection method is
+  available to manipulate `Buildah `_ containers, and is
+  exposed to Ansible as the ``buildah`` transport.
+
+* The ``MITOGEN_CPU_COUNT`` environment variable shards the connection
+  multiplexer into per-CPU workers. This may improve throughput for runs
+  involving large file transfers, and is required for future in-process SSH
+  support. One multiplexer starts by default, to match existing behaviour.
 
 * `d6faff06 `_,
   `807cbef9 `_,
@@ -193,6 +200,14 @@ Core Library
 * `#612 `_: fix various errors introduced
   by stream refactoring.
 
+* `#615 `_: when routing fails to
+  deliver a message for a reason other than the sender being unable or not
+  permitted to reach the recipient, and no reply-to address is present on the
+  message, a dead message is now sent to the original recipient instead. This
+  ensures a descriptive message is delivered to a thread sleeping on the reply
+  to a function call, where the reply might be dropped due to exceeding the
+  maximum configured message size.
+
 * `a5536c35 `_: avoid quadratic
   buffer management when logging lines received from a child's redirected
   standard IO.
@@ -230,6 +245,7 @@ bug reports, testing, features and fixes in this release contributed by
 `El Mehdi CHAOUKI `_,
 `Florent Dutheil `_,
 `James Hogarth `_,
+`Jordan Webb `_,
 `Marc Hartmayer `_,
 `Nigel Metheringham `_,
 `Orion Poplawski `_,
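A condensed sketch of the behaviour the #615 entry describes: when an
oversized message is dropped by the router and carries no reply-to address,
the would-be recipient now receives a dead message and raises an error instead
of sleeping forever. Sizes here are illustrative; the snippet mirrors the new
test added to ``tests/router_test.py`` further down::

    import mitogen.core
    import mitogen.master

    broker = mitogen.master.Broker()
    router = mitogen.master.Router(broker=broker, max_message_size=4096)
    try:
        recv = mitogen.core.Receiver(router)
        # Larger than max_message_size: the router drops the message and
        # routes a dead message to the recipient, since no reply_to is set.
        recv.to_sender().send(b'x' * 8192)
        try:
            recv.get().unpickle()
        except mitogen.core.ChannelError as e:
            print(e)  # descriptive "message too large" reason
    finally:
        broker.shutdown()
        broker.join()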
diff --git a/docs/internals.rst b/docs/internals.rst
index 40ea33df..c3247be0 100644
--- a/docs/internals.rst
+++ b/docs/internals.rst
@@ -65,6 +65,10 @@ Stream, Side & Protocol
 .. autoclass:: Stream
    :members:
 
+.. currentmodule:: mitogen.core
+.. autoclass:: BufferedWriter
+   :members:
+
 .. currentmodule:: mitogen.core
 .. autoclass:: Side
    :members:
@@ -81,6 +85,10 @@ Stream, Side & Protocol
 .. autoclass:: DelimitedProtocol
    :members:
 
+.. currentmodule:: mitogen.parent
+.. autoclass:: LogProtocol
+   :members:
+
 .. currentmodule:: mitogen.core
 .. autoclass:: IoLoggerProtocol
    :members:
diff --git a/mitogen/core.py b/mitogen/core.py
index 2f52ebe4..f9099e9a 100644
--- a/mitogen/core.py
+++ b/mitogen/core.py
@@ -1681,7 +1681,7 @@ class Stream(object):
         self.transmit_side = Side(self, wfp)
 
     def __repr__(self):
-        return "<Stream %s>" % (self.name,)
+        return "<Stream %s #%04x>" % (self.name, id(self) & 0xffff,)
 
     def on_receive(self, broker):
         """
@@ -2112,8 +2112,8 @@ class MitogenProtocol(Protocol):
             return False
 
         if msg_len > self._router.max_message_size:
-            LOG.error('Maximum message size exceeded (got %d, max %d)',
-                      msg_len, self._router.max_message_size)
+            LOG.error('%r: Maximum message size exceeded (got %d, max %d)',
+                      self, msg_len, self._router.max_message_size)
             self.stream.on_disconnect(broker)
             return False
 
@@ -2727,9 +2727,9 @@ class Latch(object):
 
 class Waker(Protocol):
     """
-    :class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_.
-    Used to wake the multiplexer when another thread needs to modify its state
-    (via a cross-thread function call).
+    :class:`Protocol` implementing the `UNIX self-pipe trick`_. Used to wake
+    :class:`Broker` when another thread needs to modify its state, by
+    enqueuing a function call to run on the :class:`Broker` thread.
 
     .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
     """
@@ -3191,28 +3191,55 @@ class Router(object):
                 fn(Message.dead(self.respondent_disconnect_msg))
             del self._handle_map[handle]
 
-    def _maybe_send_dead(self, msg, reason, *args):
+    def _maybe_send_dead(self, unreachable, msg, reason, *args):
+        """
+        Send a dead message back to the sender of `msg` if the sender was
+        expecting a reply (its `reply_to` is set); otherwise assume `msg` is
+        itself a reply of some sort, and send the dead message onward to its
+        intended recipient.
+
+        :param bool unreachable:
+            If :data:`True`, the recipient is known to be dead or routing
+            failed due to a security precaution, so don't attempt to fall
+            back to sending the dead message to the recipient if the original
+            sender did not include a reply address.
+        :param mitogen.core.Message msg:
+            Message that triggered the dead message.
+        :param str reason:
+            Human-readable error reason.
+        :param tuple args:
+            Elements to interpolate with `reason`.
+        """
         if args:
             reason %= args
         LOG.debug('%r: %r is dead: %r', self, msg, reason)
         if msg.reply_to and not msg.is_dead:
             msg.reply(Message.dead(reason=reason), router=self)
+        elif not unreachable:
+            self._async_route(
+                Message.dead(
+                    dst_id=msg.dst_id,
+                    handle=msg.handle,
+                    reason=reason,
+                )
+            )
 
     def _invoke(self, msg, stream):
         # IOLOG.debug('%r._invoke(%r)', self, msg)
         try:
             persist, fn, policy, respondent = self._handle_map[msg.handle]
         except KeyError:
-            self._maybe_send_dead(msg, reason=self.invalid_handle_msg)
+            self._maybe_send_dead(True, msg, reason=self.invalid_handle_msg)
             return
 
         if respondent and not (msg.is_dead or
                                msg.src_id == respondent.context_id):
-            self._maybe_send_dead(msg, 'reply from unexpected context')
+            self._maybe_send_dead(True, msg, 'reply from unexpected context')
             return
 
         if policy and not policy(msg, stream):
-            self._maybe_send_dead(msg, self.refused_msg)
+            self._maybe_send_dead(True, msg, self.refused_msg)
             return
 
         if not persist:
@@ -3240,7 +3267,7 @@ class Router(object):
         _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)
 
         if len(msg.data) > self.max_message_size:
-            self._maybe_send_dead(msg, self.too_large_msg % (
+            self._maybe_send_dead(False, msg, self.too_large_msg % (
                 self.max_message_size,
             ))
             return
@@ -3275,14 +3302,14 @@ class Router(object):
             out_stream = self._stream_by_id.get(mitogen.parent_id)
 
         if out_stream is None:
-            self._maybe_send_dead(msg, self.no_route_msg,
+            self._maybe_send_dead(True, msg, self.no_route_msg,
                                   msg.dst_id, mitogen.context_id)
             return
 
         if in_stream and self.unidirectional and not \
                 (in_stream.protocol.is_privileged or
                  out_stream.protocol.is_privileged):
-            self._maybe_send_dead(msg, self.unidirectional_msg,
+            self._maybe_send_dead(True, msg, self.unidirectional_msg,
                                   in_stream.protocol.remote_id,
                                   out_stream.protocol.remote_id,
                                   mitogen.context_id)
diff --git a/mitogen/parent.py b/mitogen/parent.py
index bc4bfc0d..82b4a7d1 100644
--- a/mitogen/parent.py
+++ b/mitogen/parent.py
@@ -1250,6 +1250,9 @@ class LogProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
     written to it.
     """
     def on_line_received(self, line):
+        """
+        Read a line, decode it as UTF-8, and log it.
+        """
         super(LogProtocol, self).on_line_received(line)
         LOG.info(u'%s: %s', self.stream.name, line.decode('utf-8', 'replace'))
diff --git a/tests/router_test.py b/tests/router_test.py
index 1cde016d..ef3fc4d5 100644
--- a/tests/router_test.py
+++ b/tests/router_test.py
@@ -11,6 +11,7 @@
 import mitogen.core
 import mitogen.master
 import mitogen.parent
 import mitogen.utils
+from mitogen.core import b
 
 try:
     import Queue
@@ -258,6 +259,23 @@ class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
 
         self.assertTrue(expect in logs.stop())
 
+    def test_remote_dead_message(self):
+        # Router should send dead message to original recipient when reply_to
+        # is unset.
+        router = self.klass(broker=self.broker, max_message_size=4096)
+
+        # Try function call. Receiver should be woken by a dead message sent by
+        # router due to message size exceeded.
+        child = router.local()
+        recv = mitogen.core.Receiver(router)
+
+        recv.to_sender().send(b('x') * 4097)
+        e = self.assertRaises(mitogen.core.ChannelError,
+            lambda: recv.get().unpickle()
+        )
+        expect = router.too_large_msg % (4096,)
+        self.assertEquals(e.args[0], expect)
+
     def test_remote_configured(self):
         router = self.klass(broker=self.broker, max_message_size=64*1024)
         remote = router.local()
@@ -510,7 +528,7 @@ class ShutdownTest(testlib.RouterMixin, testlib.TestCase):
             mitogen.context_id,
         ))
 
-    def test_disconnet_all(self):
+    def test_disconnect_all(self):
         l1 = self.router.local()
         l2 = self.router.local()