From 219a202a824de8f40f79f2aabbf6531666ed2140 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 28 Apr 2018 20:11:03 +0100 Subject: [PATCH 1/5] issue #226: ansible: file transfer improvements * put_data() supports setting mode and times. * put_file() refuses to copy non-regular files (sockets, FIFOs). * put_file() saves one RTT for <32KiB files by using put_data() and embedding file content in argument list. * FileService returns dict with size/mode/owner/group/mtime/atime. * FileService refuses to copy non-regular files. * transfer_file() preserves file mode. * transfer_file() preserves atime/mtime. * transfer_file() optionally preserves ownership. * transfer_file() optionally calls fsync(). * transfer_file() uses unique temporary file name to avoid conflicting with parallel transfers. * transfer_file() ensures temporary file is deleted on any error. * write_path() writes to a temporary file and deletes it on failure. * write_path() uses unique temporary file name to avoid conflicting with parallel transfers. * write_path() supports setting symbolic owner/group. * write_path() optionally calls fsync(). * write_path() supports setting symbolic mode/mtime/atime. 
Closes #226, #227, #229 --- ansible_mitogen/connection.py | 26 ++++++++- ansible_mitogen/services.py | 46 ++++++++++++---- ansible_mitogen/target.py | 100 ++++++++++++++++++++++++++++------ 3 files changed, 143 insertions(+), 29 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 761eb8ea..eee0ac0e 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -30,6 +30,7 @@ from __future__ import absolute_import import logging import os import shlex +import stat import sys import time @@ -470,7 +471,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): mitogen.utils.cast(in_path)) ansible_mitogen.target.write_path(out_path, output) - def put_data(self, out_path, data): + def put_data(self, out_path, data, mode=None, utimes=None): """ Implement put_file() by caling the corresponding ansible_mitogen.target function in the target. @@ -482,7 +483,9 @@ class Connection(ansible.plugins.connection.ConnectionBase): """ self.call(ansible_mitogen.target.write_path, mitogen.utils.cast(out_path), - mitogen.utils.cast(data)) + mitogen.utils.cast(data), + mode=mode, + utimes=utimes) def put_file(self, in_path, out_path): """ @@ -494,6 +497,25 @@ class Connection(ansible.plugins.connection.ConnectionBase): :param str out_path: Remote filesystem path to write. """ + st = os.stat(in_path) + if not stat.S_ISREG(st.st_mode): + raise IOError('%r is not a regular file.' % (in_path,)) + + # If the file is sufficiently small, just ship it in the argument list + # rather than introducing an extra RTT for the child to request it from + # FileService. + if st.st_size <= 32768: + fp = open(in_path, 'rb') + try: + s = fp.read(32769) + finally: + fp.close() + + # Ensure file was not growing during call. 
+ if len(s) == st.st_size: + return self.put_data(out_path, s, mode=st.st_mode, + utimes=(st.st_atime, st.st_mtime)) + mitogen.service.call( context=self.parent, handle=ansible_mitogen.services.FileService.handle, diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 57f19ebd..c3157bf0 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -38,11 +38,12 @@ when a child has completed a job. """ from __future__ import absolute_import -import hashlib +import grp import logging import os import os.path -import pprint +import pwd +import stat import sys import threading import zlib @@ -435,7 +436,7 @@ class FileService(mitogen.service.Service): def __init__(self, router): super(FileService, self).__init__(router) #: Mapping of registered path -> file size. - self._size_by_path = {} + self._metadata_by_path = {} #: Queue used to communicate from service to scheduler thread. self._queue = mitogen.core.Latch() #: Mapping of Stream->[(Sender, file object)]. @@ -545,6 +546,12 @@ class FileService(mitogen.service.Service): sender.close() fp.close() + def _name_or_none(self, func, n, attr): + try: + return getattr(func(n), attr) + except KeyError: + return None + @mitogen.service.expose(policy=mitogen.service.AllowParents()) @mitogen.service.arg_spec({ 'path': basestring @@ -557,9 +564,22 @@ class FileService(mitogen.service.Service): :param str path: File path. """ - if path not in self._size_by_path: - LOG.debug('%r: registering %r', self, path) - self._size_by_path[path] = os.path.getsize(path) + if path in self._metadata_by_path: + return + + st = os.stat(path) + if not stat.S_ISREG(st.st_mode): + raise IOError('%r is not a regular file.' 
% (in_path,)) + + LOG.debug('%r: registering %r', self, path) + self._metadata_by_path[path] = { + 'size': st.st_size, + 'mode': st.st_mode, + 'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'), + 'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'), + 'mtime': st.st_mtime, + 'atime': st.st_atime, + } @mitogen.service.expose(policy=mitogen.service.AllowAny()) @mitogen.service.arg_spec({ @@ -575,15 +595,21 @@ class FileService(mitogen.service.Service): :param mitogen.core.Sender sender: Sender to receive file data. :returns: - File size. The target can decide whether to keep the file in RAM or - disk based on the return value. + Dict containing the file metadata: + + * ``size``: File size in bytes. + * ``mode``: Integer file mode. + * ``owner``: Owner account name on host machine. + * ``group``: Owner group name on host machine. + * ``mtime``: Floating point modification time. + * ``atime``: Floating point access time. :raises mitogen.core.CallError: The path was not registered. """ - if path not in self._size_by_path: + if path not in self._metadata_by_path: raise mitogen.core.CallError(self.unregistered_msg) LOG.debug('Serving %r', path) fp = open(path, 'rb', mitogen.core.CHUNK_SIZE) self._queue.put((sender, fp)) - return self._size_by_path[path] + return self._metadata_by_path[path] diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index ee779546..4b66745f 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -33,6 +33,7 @@ for file transfer, module execution and sundry bits like changing file modes. 
from __future__ import absolute_import import cStringIO +import grp import json import logging import operator @@ -85,7 +86,7 @@ def _get_file(context, path, out_fp): LOG.debug('_get_file(): fetching %r from %r', path, context) t0 = time.time() recv = mitogen.core.Receiver(router=context.router) - size = mitogen.service.call( + metadata = mitogen.service.call( context=context, handle=ansible_mitogen.services.FileService.handle, method='fetch', @@ -100,13 +101,14 @@ def _get_file(context, path, out_fp): LOG.debug('_get_file(%r): received %d bytes', path, len(s)) out_fp.write(s) - if out_fp.tell() != size: + ok = out_fp.tell() == metadata['size'] + if not ok: LOG.error('get_file(%r): receiver was closed early, controller ' 'is likely shutting down.', path) LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', - size, path, context, 1000*(time.time() - t0)) - return out_fp.tell() == size + metadata['size'], path, context, 1000*(time.time() - t0)) + return ok, metadata def get_file(context, path): @@ -125,13 +127,14 @@ def get_file(context, path): """ if path not in _file_cache: io = cStringIO.StringIO() - if not _get_file(context, path, io): + ok, metadata = _get_file(context, path, io) + if not ok: raise IOError('transfer of %r was interrupted.' % (path,)) _file_cache[path] = io.getvalue() return _file_cache[path] -def transfer_file(context, in_path, out_path): +def transfer_file(context, in_path, out_path, sync=False, set_owner=False): """ Streamily download a file from the connection multiplexer process in the controller. @@ -143,19 +146,42 @@ def transfer_file(context, in_path, out_path): FileService registered name of the input file. :param bytes out_path: Name of the output path on the local disk. - """ - fp = open(out_path+'.tmp', 'wb', mitogen.core.CHUNK_SIZE) + :param bool sync: + If :data:`True`, ensure the file content and metadata are fully on disk + before renaming the temporary file over the existing file. 
This should + ensure in the case of system crash, either the entire old or new file + are visible post-reboot. + :param bool set_owner: + If :data:`True`, look up the metadata username and group on the local + system and set the file owner using :func:`os.fchown`. + """ + out_path = os.path.abspath(out_path) + fd, tmp_path = tempfile.mkstemp(suffix='.tmp', + prefix='.ansible_mitogen_transfer-', + dir=os.path.dirname(out_path)) + fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE) + LOG.debug('transfer_file(out_path=%r) tempory file: %s', out_path, tmp_path) + try: try: - if not _get_file(context, in_path, fp): + os.fchmod(tmp_path, metadata['mode']) + if set_owner: + set_fd_owner(fp.fileno(), metadata['owner'], metadata['group']) + + ok, metadata = _get_file(context, in_path, fp) + if not ok: raise IOError('transfer of %r was interrupted.' % (in_path,)) - except Exception: - os.unlink(fp.name) - raise - finally: - fp.close() + finally: + fp.close() + + if sync: + os.fsync(fp.fileno()) + os.rename(tmp_path, out_path) + except: + os.unlink(tmp_path) + raise - os.rename(out_path + '.tmp', out_path) + os.utime(out_path, (metadata['atime'], metadata['mtime'])) @mitogen.core.takes_econtext @@ -392,11 +418,51 @@ def read_path(path): return open(path, 'rb').read() -def write_path(path, s): +def set_fd_owner(fd, owner, group=None): + if owner: + uid = pwd.getpwnam(owner).pw_uid + else: + uid = os.geteuid() + + if group: + gid = grp.getgrnam(group).gr_gid + else: + gid = os.getegid() + + os.fchown(fd, (uid, gid)) + + +def write_path(path, s, owner=None, group=None, mode=None, + utimes=None, sync=False): """ Writes bytes `s` to a filesystem `path`. 
""" - open(path, 'wb').write(s) + path = os.path.abspath(path) + fd, tmp_path = tempfile.mkstemp(suffix='.tmp', + prefix='.ansible_mitogen_transfer-', + dir=os.path.dirname(path)) + fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE) + LOG.debug('write_path(path=%r) tempory file: %s', path, tmp_path) + + try: + try: + if mode: + os.fchmod(tmp_path, mode) + if owner or group: + set_fd_owner(fp.fileno(), owner, group) + fp.write(s) + finally: + fp.close() + + if sync: + os.fsync(fp.fileno()) + os.rename(tmp_path, out_path) + except: + os.unlink(tmp_path) + raise + + if utimes: + os.utime(out_path, utimes) CHMOD_CLAUSE_PAT = re.compile(r'([uoga]*)([+\-=])([ugo]|[rwx]*)') From 58d8f60f570badb59de0030bcabeb86ae6cca552 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 28 Apr 2018 21:36:51 +0100 Subject: [PATCH 2/5] docs: better connection type docs --- docs/ansible.rst | 94 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/docs/ansible.rst b/docs/ansible.rst index 7131df31..71607d7a 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -122,8 +122,8 @@ Noteworthy Differences `lxc `_, `lxd `_, and `ssh `_ - connection types are available, with more planned. File bugs to register - interest. + built-in connection types are supported, along with a Mitogen-specific + ``setns`` container type. File bugs to register interest in more. * Local commands execute in a reuseable interpreter created identically to interpreters on targets. Presently one interpreter per ``become_user`` @@ -436,27 +436,23 @@ equivalent semantics. This allows: grew support for Paramiko. -Supported Variables -------------------- +Connection Types +---------------- -Matching Ansible's model, variables are treated on a per-task basis, causing +Matching Ansible, connection variables are treated on a per-task basis, causing establishment of additional reuseable interpreters as necessary to match the configuration of each task. 
-SSH -~~~ +Docker +~~~~~~ -This list will grow as more missing pieces are discovered. +Behaves like `docker +`_ except +connection delegation is supported. -* ``ansible_ssh_timeout`` -* ``ansible_host``, ``ansible_ssh_host`` -* ``ansible_user``, ``ansible_ssh_user`` -* ``ansible_port``, ``ssh_port`` -* ``ansible_ssh_executable``, ``ssh_executable`` -* ``ansible_ssh_private_key_file`` -* ``ansible_ssh_pass``, ``ansible_password`` (default: assume passwordless) -* ``ssh_args``, ``ssh_common_args``, ``ssh_extra_args`` +* ``ansible_host``: Name of Docker container (default: inventory hostname). +* ``ansible_user``: Name of user within the container to execute as. Sudo @@ -470,30 +466,76 @@ Sudo * ansible.cfg: ``timeout`` -Docker -~~~~~~ +Setns +~~~~~ -* ``ansible_host``: Name of Docker container (default: inventory hostname). -* ``ansible_user``: Name of user within the container to execute as. +The ``setns`` method connects to Linux containers via `setns(2) +`_. Unlike ``docker`` and ``lxc`` the +namespace transition is handled directly, ensuring optimal throughput to the +child. This is necessary for ``machinectl`` where only PTY channels are +supported. + +Utility programs must still be installed to discover the PID of the container's +root process. + +* ``mitogen_container_kind``: one of ``docker``, ``lxc`` or ``machinectl``. +* ``ansible_host``: Name of container as it is known to the corresponding tool + (default: inventory hostname). +* ``mitogen_docker_path``: path to Docker if not available on the system path. +* ``mitogen_lxc_info_path``: path to ``lxc-info`` command if not available as + ``/usr/bin/lxc-info``. +* ``mitogen_machinectl_path``: path to ``machinectl`` command if not available + as ``/bin/machinectl``. + + +SSH +~~~ + +Behaves like `ssh +`_ except +connection delegation is supported. 
+ +* ``ansible_ssh_timeout`` +* ``ansible_host``, ``ansible_ssh_host`` +* ``ansible_user``, ``ansible_ssh_user`` +* ``ansible_port``, ``ssh_port`` +* ``ansible_ssh_executable``, ``ssh_executable`` +* ``ansible_ssh_private_key_file`` +* ``ansible_ssh_pass``, ``ansible_password`` (default: assume passwordless) +* ``ssh_args``, ``ssh_common_args``, ``ssh_extra_args`` FreeBSD Jails ~~~~~~~~~~~~~ +Behaves like `jail +`_ except +connection delegation is supported. + * ``ansible_host``: Name of jail (default: inventory hostname). * ``ansible_user``: Name of user within the jail to execute as. +Local +~~~~~ + +Behaves like `local +`_ except +connection delegation is supported. + +* ``ansible_python_interpreter`` + + LXC ~~~ -Both ``lxc`` and ``lxd`` connection plug-ins are hijacked, however the -resulting implementation always uses the ``lxc-attach`` command line tool -rather than the LXC Python bindings, as is usual with the Ansible ``lxd`` -plug-in. +Behaves like `lxc +`_ and `lxd +`_ except +connection delegation is supported, and the ``lxc-attach`` tool is always used +rather than the LXC Python bindings, as is usual with the ``lxc`` method. -Consequently the ``lxc-attach`` command is required to be available on the host -machine. +The ``lxc-attach`` command must be available on the host machine. * ``ansible_host``: Name of LXC container (default: inventory hostname). 
From 780b63520f2f252be8199e891026671f839be0f6 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 28 Apr 2018 21:56:23 +0100 Subject: [PATCH 3/5] issue #226: don't attempt to fchmod() a pathname --- ansible_mitogen/target.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 4b66745f..98615b0e 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -164,7 +164,7 @@ def transfer_file(context, in_path, out_path, sync=False, set_owner=False): try: try: - os.fchmod(tmp_path, metadata['mode']) + os.fchmod(fp.fileno(), metadata['mode']) if set_owner: set_fd_owner(fp.fileno(), metadata['owner'], metadata['group']) @@ -447,7 +447,7 @@ def write_path(path, s, owner=None, group=None, mode=None, try: try: if mode: - os.fchmod(tmp_path, mode) + os.fchmod(fp.fileno(), mode) if owner or group: set_fd_owner(fp.fileno(), owner, group) fp.write(s) From 6edb3f165d72f1c5ee40b23d642c6ab9404e4ebd Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 28 Apr 2018 22:08:35 +0100 Subject: [PATCH 4/5] ansible: avoid a race during shutdown. --- ansible_mitogen/process.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 8febea90..f5dc7be5 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -27,6 +27,7 @@ # POSSIBILITY OF SUCH DAMAGE. from __future__ import absolute_import +import errno import logging import os import socket @@ -167,4 +168,9 @@ class MuxProcess(object): happen explicitly, but Ansible provides no hook to allow it. """ self.pool.stop() - os.unlink(self.listener.path) + try: + os.unlink(self.listener.path) + except OSError, e: + # Prevent a shutdown race with the parent process. 
+ if e.args[0] != errno.ENOENT: + raise From 003f30b5a9e13e46ccf06db9680f5515e4050874 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 28 Apr 2018 22:09:07 +0100 Subject: [PATCH 5/5] issue #226: test fixes. --- ansible_mitogen/target.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 98615b0e..99bb5ec4 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -46,7 +46,6 @@ import subprocess import tempfile import time import traceback -import zlib import ansible.module_utils.json_utils import ansible_mitogen.runner @@ -107,7 +106,7 @@ def _get_file(context, path, out_fp): 'is likely shutting down.', path) LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', - metadata['size'], path, context, 1000*(time.time() - t0)) + metadata['size'], path, context, 1000 * (time.time() - t0)) return ok, metadata @@ -160,24 +159,24 @@ def transfer_file(context, in_path, out_path, sync=False, set_owner=False): prefix='.ansible_mitogen_transfer-', dir=os.path.dirname(out_path)) fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE) - LOG.debug('transfer_file(out_path=%r) tempory file: %s', out_path, tmp_path) + LOG.debug('transfer_file(%r) tempory file: %s', out_path, tmp_path) try: try: - os.fchmod(fp.fileno(), metadata['mode']) - if set_owner: - set_fd_owner(fp.fileno(), metadata['owner'], metadata['group']) - ok, metadata = _get_file(context, in_path, fp) if not ok: raise IOError('transfer of %r was interrupted.' 
% (in_path,)) + + os.fchmod(fp.fileno(), metadata['mode']) + if set_owner: + set_fd_owner(fp.fileno(), metadata['owner'], metadata['group']) finally: fp.close() if sync: os.fsync(fp.fileno()) os.rename(tmp_path, out_path) - except: + except BaseException: os.unlink(tmp_path) raise @@ -456,13 +455,13 @@ def write_path(path, s, owner=None, group=None, mode=None, if sync: os.fsync(fp.fileno()) - os.rename(tmp_path, out_path) - except: + os.rename(tmp_path, path) + except BaseException: os.unlink(tmp_path) raise if utimes: - os.utime(out_path, utimes) + os.utime(path, utimes) CHMOD_CLAUSE_PAT = re.compile(r'([uoga]*)([+\-=])([ugo]|[rwx]*)')