Merge pull request #231 from dw/dmw

Many file transfer improvements, docs improvements.
pull/246/head
dw 7 years ago committed by GitHub
commit b99b4eb048
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -30,6 +30,7 @@ from __future__ import absolute_import
import logging import logging
import os import os
import shlex import shlex
import stat
import sys import sys
import time import time
@ -470,7 +471,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
mitogen.utils.cast(in_path)) mitogen.utils.cast(in_path))
ansible_mitogen.target.write_path(out_path, output) ansible_mitogen.target.write_path(out_path, output)
def put_data(self, out_path, data): def put_data(self, out_path, data, mode=None, utimes=None):
""" """
Implement put_file() by caling the corresponding Implement put_file() by caling the corresponding
ansible_mitogen.target function in the target. ansible_mitogen.target function in the target.
@ -482,7 +483,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
""" """
self.call(ansible_mitogen.target.write_path, self.call(ansible_mitogen.target.write_path,
mitogen.utils.cast(out_path), mitogen.utils.cast(out_path),
mitogen.utils.cast(data)) mitogen.utils.cast(data),
mode=mode,
utimes=utimes)
def put_file(self, in_path, out_path): def put_file(self, in_path, out_path):
""" """
@ -494,6 +497,25 @@ class Connection(ansible.plugins.connection.ConnectionBase):
:param str out_path: :param str out_path:
Remote filesystem path to write. Remote filesystem path to write.
""" """
st = os.stat(in_path)
if not stat.S_ISREG(st.st_mode):
raise IOError('%r is not a regular file.' % (in_path,))
# If the file is sufficiently small, just ship it in the argument list
# rather than introducing an extra RTT for the child to request it from
# FileService.
if st.st_size <= 32768:
fp = open(in_path, 'rb')
try:
s = fp.read(32769)
finally:
fp.close()
# Ensure file was not growing during call.
if len(s) == st.st_size:
return self.put_data(out_path, s, mode=st.st_mode,
utimes=(st.st_atime, st.st_mtime))
mitogen.service.call( mitogen.service.call(
context=self.parent, context=self.parent,
handle=ansible_mitogen.services.FileService.handle, handle=ansible_mitogen.services.FileService.handle,

@ -27,6 +27,7 @@
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import from __future__ import absolute_import
import errno
import logging import logging
import os import os
import socket import socket
@ -167,4 +168,9 @@ class MuxProcess(object):
happen explicitly, but Ansible provides no hook to allow it. happen explicitly, but Ansible provides no hook to allow it.
""" """
self.pool.stop() self.pool.stop()
os.unlink(self.listener.path) try:
os.unlink(self.listener.path)
except OSError, e:
# Prevent a shutdown race with the parent process.
if e.args[0] != errno.ENOENT:
raise

@ -38,11 +38,12 @@ when a child has completed a job.
""" """
from __future__ import absolute_import from __future__ import absolute_import
import hashlib import grp
import logging import logging
import os import os
import os.path import os.path
import pprint import pwd
import stat
import sys import sys
import threading import threading
import zlib import zlib
@ -435,7 +436,7 @@ class FileService(mitogen.service.Service):
def __init__(self, router): def __init__(self, router):
super(FileService, self).__init__(router) super(FileService, self).__init__(router)
#: Mapping of registered path -> file size. #: Mapping of registered path -> file size.
self._size_by_path = {} self._metadata_by_path = {}
#: Queue used to communicate from service to scheduler thread. #: Queue used to communicate from service to scheduler thread.
self._queue = mitogen.core.Latch() self._queue = mitogen.core.Latch()
#: Mapping of Stream->[(Sender, file object)]. #: Mapping of Stream->[(Sender, file object)].
@ -545,6 +546,12 @@ class FileService(mitogen.service.Service):
sender.close() sender.close()
fp.close() fp.close()
def _name_or_none(self, func, n, attr):
try:
return getattr(func(n), attr)
except KeyError:
return None
@mitogen.service.expose(policy=mitogen.service.AllowParents()) @mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({ @mitogen.service.arg_spec({
'path': basestring 'path': basestring
@ -557,9 +564,22 @@ class FileService(mitogen.service.Service):
:param str path: :param str path:
File path. File path.
""" """
if path not in self._size_by_path: if path in self._metadata_by_path:
LOG.debug('%r: registering %r', self, path) return
self._size_by_path[path] = os.path.getsize(path)
st = os.stat(path)
if not stat.S_ISREG(st.st_mode):
raise IOError('%r is not a regular file.' % (in_path,))
LOG.debug('%r: registering %r', self, path)
self._metadata_by_path[path] = {
'size': st.st_size,
'mode': st.st_mode,
'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'),
'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'),
'mtime': st.st_mtime,
'atime': st.st_atime,
}
@mitogen.service.expose(policy=mitogen.service.AllowAny()) @mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.arg_spec({ @mitogen.service.arg_spec({
@ -575,15 +595,21 @@ class FileService(mitogen.service.Service):
:param mitogen.core.Sender sender: :param mitogen.core.Sender sender:
Sender to receive file data. Sender to receive file data.
:returns: :returns:
File size. The target can decide whether to keep the file in RAM or Dict containing the file metadata:
disk based on the return value.
* ``size``: File size in bytes.
* ``mode``: Integer file mode.
* ``owner``: Owner account name on host machine.
* ``group``: Owner group name on host machine.
* ``mtime``: Floating point modification time.
* ``ctime``: Floating point change time.
:raises mitogen.core.CallError: :raises mitogen.core.CallError:
The path was not registered. The path was not registered.
""" """
if path not in self._size_by_path: if path not in self._metadata_by_path:
raise mitogen.core.CallError(self.unregistered_msg) raise mitogen.core.CallError(self.unregistered_msg)
LOG.debug('Serving %r', path) LOG.debug('Serving %r', path)
fp = open(path, 'rb', mitogen.core.CHUNK_SIZE) fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
self._queue.put((sender, fp)) self._queue.put((sender, fp))
return self._size_by_path[path] return self._metadata_by_path[path]

@ -33,6 +33,7 @@ for file transfer, module execution and sundry bits like changing file modes.
from __future__ import absolute_import from __future__ import absolute_import
import cStringIO import cStringIO
import grp
import json import json
import logging import logging
import operator import operator
@ -45,7 +46,6 @@ import subprocess
import tempfile import tempfile
import time import time
import traceback import traceback
import zlib
import ansible.module_utils.json_utils import ansible.module_utils.json_utils
import ansible_mitogen.runner import ansible_mitogen.runner
@ -85,7 +85,7 @@ def _get_file(context, path, out_fp):
LOG.debug('_get_file(): fetching %r from %r', path, context) LOG.debug('_get_file(): fetching %r from %r', path, context)
t0 = time.time() t0 = time.time()
recv = mitogen.core.Receiver(router=context.router) recv = mitogen.core.Receiver(router=context.router)
size = mitogen.service.call( metadata = mitogen.service.call(
context=context, context=context,
handle=ansible_mitogen.services.FileService.handle, handle=ansible_mitogen.services.FileService.handle,
method='fetch', method='fetch',
@ -100,13 +100,14 @@ def _get_file(context, path, out_fp):
LOG.debug('_get_file(%r): received %d bytes', path, len(s)) LOG.debug('_get_file(%r): received %d bytes', path, len(s))
out_fp.write(s) out_fp.write(s)
if out_fp.tell() != size: ok = out_fp.tell() == metadata['size']
if not ok:
LOG.error('get_file(%r): receiver was closed early, controller ' LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down.', path) 'is likely shutting down.', path)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
size, path, context, 1000*(time.time() - t0)) metadata['size'], path, context, 1000 * (time.time() - t0))
return out_fp.tell() == size return ok, metadata
def get_file(context, path): def get_file(context, path):
@ -125,13 +126,14 @@ def get_file(context, path):
""" """
if path not in _file_cache: if path not in _file_cache:
io = cStringIO.StringIO() io = cStringIO.StringIO()
if not _get_file(context, path, io): ok, metadata = _get_file(context, path, io)
if not ok:
raise IOError('transfer of %r was interrupted.' % (path,)) raise IOError('transfer of %r was interrupted.' % (path,))
_file_cache[path] = io.getvalue() _file_cache[path] = io.getvalue()
return _file_cache[path] return _file_cache[path]
def transfer_file(context, in_path, out_path): def transfer_file(context, in_path, out_path, sync=False, set_owner=False):
""" """
Streamily download a file from the connection multiplexer process in the Streamily download a file from the connection multiplexer process in the
controller. controller.
@ -143,19 +145,42 @@ def transfer_file(context, in_path, out_path):
FileService registered name of the input file. FileService registered name of the input file.
:param bytes out_path: :param bytes out_path:
Name of the output path on the local disk. Name of the output path on the local disk.
""" :param bool sync:
fp = open(out_path+'.tmp', 'wb', mitogen.core.CHUNK_SIZE) If :data:`True`, ensure the file content and metadat are fully on disk
before renaming the temporary file over the existing file. This should
ensure in the case of system crash, either the entire old or new file
are visible post-reboot.
:param bool set_owner:
If :data:`True`, look up the metadata username and group on the local
system and file the file owner using :func:`os.fchmod`.
"""
out_path = os.path.abspath(out_path)
fd, tmp_path = tempfile.mkstemp(suffix='.tmp',
prefix='.ansible_mitogen_transfer-',
dir=os.path.dirname(out_path))
fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
LOG.debug('transfer_file(%r) tempory file: %s', out_path, tmp_path)
try: try:
try: try:
if not _get_file(context, in_path, fp): ok, metadata = _get_file(context, in_path, fp)
if not ok:
raise IOError('transfer of %r was interrupted.' % (in_path,)) raise IOError('transfer of %r was interrupted.' % (in_path,))
except Exception:
os.unlink(fp.name)
raise
finally:
fp.close()
os.rename(out_path + '.tmp', out_path) os.fchmod(fp.fileno(), metadata['mode'])
if set_owner:
set_fd_owner(fp.fileno(), metadata['owner'], metadata['group'])
finally:
fp.close()
if sync:
os.fsync(fp.fileno())
os.rename(tmp_path, out_path)
except BaseException:
os.unlink(tmp_path)
raise
os.utime(out_path, (metadata['atime'], metadata['mtime']))
@mitogen.core.takes_econtext @mitogen.core.takes_econtext
@ -392,11 +417,51 @@ def read_path(path):
return open(path, 'rb').read() return open(path, 'rb').read()
def write_path(path, s): def set_fd_owner(fd, owner, group=None):
if owner:
uid = pwd.getpwnam(owner).pw_uid
else:
uid = os.geteuid()
if group:
gid = grp.getgrnam(group).gr_gid
else:
gid = os.getegid()
os.fchown(fd, (uid, gid))
def write_path(path, s, owner=None, group=None, mode=None,
utimes=None, sync=False):
""" """
Writes bytes `s` to a filesystem `path`. Writes bytes `s` to a filesystem `path`.
""" """
open(path, 'wb').write(s) path = os.path.abspath(path)
fd, tmp_path = tempfile.mkstemp(suffix='.tmp',
prefix='.ansible_mitogen_transfer-',
dir=os.path.dirname(path))
fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
LOG.debug('write_path(path=%r) tempory file: %s', path, tmp_path)
try:
try:
if mode:
os.fchmod(fp.fileno(), mode)
if owner or group:
set_fd_owner(fp.fileno(), owner, group)
fp.write(s)
finally:
fp.close()
if sync:
os.fsync(fp.fileno())
os.rename(tmp_path, path)
except BaseException:
os.unlink(tmp_path)
raise
if utimes:
os.utime(path, utimes)
CHMOD_CLAUSE_PAT = re.compile(r'([uoga]*)([+\-=])([ugo]|[rwx]*)') CHMOD_CLAUSE_PAT = re.compile(r'([uoga]*)([+\-=])([ugo]|[rwx]*)')

@ -122,8 +122,8 @@ Noteworthy Differences
`lxc <https://docs.ansible.com/ansible/2.5/plugins/connection/lxc.html>`_, `lxc <https://docs.ansible.com/ansible/2.5/plugins/connection/lxc.html>`_,
`lxd <https://docs.ansible.com/ansible/2.5/plugins/connection/lxd.html>`_, `lxd <https://docs.ansible.com/ansible/2.5/plugins/connection/lxd.html>`_,
and `ssh <https://docs.ansible.com/ansible/2.5/plugins/connection/ssh.html>`_ and `ssh <https://docs.ansible.com/ansible/2.5/plugins/connection/ssh.html>`_
connection types are available, with more planned. File bugs to register built-in connection types are supported, along with a Mitogen-specific
interest. ``setns`` container type. File bugs to register interest in more.
* Local commands execute in a reuseable interpreter created identically to * Local commands execute in a reuseable interpreter created identically to
interpreters on targets. Presently one interpreter per ``become_user`` interpreters on targets. Presently one interpreter per ``become_user``
@ -436,27 +436,23 @@ equivalent semantics. This allows:
grew support for Paramiko. grew support for Paramiko.
Supported Variables Connection Types
------------------- ----------------
Matching Ansible's model, variables are treated on a per-task basis, causing Matching Ansible, connection variables are treated on a per-task basis, causing
establishment of additional reuseable interpreters as necessary to match the establishment of additional reuseable interpreters as necessary to match the
configuration of each task. configuration of each task.
SSH Docker
~~~ ~~~~~~
This list will grow as more missing pieces are discovered. Behaves like `docker
<https://docs.ansible.com/ansible/2.5/plugins/connection/docker.html>`_ except
connection delegation is supported.
* ``ansible_ssh_timeout`` * ``ansible_host``: Name of Docker container (default: inventory hostname).
* ``ansible_host``, ``ansible_ssh_host`` * ``ansible_user``: Name of user within the container to execute as.
* ``ansible_user``, ``ansible_ssh_user``
* ``ansible_port``, ``ssh_port``
* ``ansible_ssh_executable``, ``ssh_executable``
* ``ansible_ssh_private_key_file``
* ``ansible_ssh_pass``, ``ansible_password`` (default: assume passwordless)
* ``ssh_args``, ``ssh_common_args``, ``ssh_extra_args``
Sudo Sudo
@ -470,30 +466,76 @@ Sudo
* ansible.cfg: ``timeout`` * ansible.cfg: ``timeout``
Docker Setns
~~~~~~ ~~~~~
* ``ansible_host``: Name of Docker container (default: inventory hostname). The ``setns`` method connects to Linux containers via `setns(2)
* ``ansible_user``: Name of user within the container to execute as. <https://linux.die.net/man/2/setns>`_. Unlike ``docker`` and ``lxc`` the
namespace transition is handled directly, ensuring optimal throughput to the
child. This is necessary for ``machinectl`` where only PTY channels are
supported.
Utility programs must still be installed to discover the PID of the container's
root process.
* ``mitogen_container_kind``: one of ``docker``, ``lxc`` or ``machinectl``.
* ``ansible_host``: Name of container as it is known to the corresponding tool
(default: inventory hostname).
* ``mitogen_docker_path``: path to Docker if not available on the system path.
* ``mitogen_lxc_info_path``: path to ``lxc-info`` command if not available as
``/usr/bin/lxc-info``.
* ``mitogen_machinectl_path``: path to ``machinectl`` command if not available
as ``/bin/machinectl``.
SSH
~~~
Behaves like `ssh
<https://docs.ansible.com/ansible/2.5/plugins/connection/ssh.html>`_ except
connection delegation is supported.
* ``ansible_ssh_timeout``
* ``ansible_host``, ``ansible_ssh_host``
* ``ansible_user``, ``ansible_ssh_user``
* ``ansible_port``, ``ssh_port``
* ``ansible_ssh_executable``, ``ssh_executable``
* ``ansible_ssh_private_key_file``
* ``ansible_ssh_pass``, ``ansible_password`` (default: assume passwordless)
* ``ssh_args``, ``ssh_common_args``, ``ssh_extra_args``
FreeBSD Jails FreeBSD Jails
~~~~~~~~~~~~~ ~~~~~~~~~~~~~
Behaves like `jail
<https://docs.ansible.com/ansible/2.5/plugins/connection/jail.html>`_ except
connection delegation is supported.
* ``ansible_host``: Name of jail (default: inventory hostname). * ``ansible_host``: Name of jail (default: inventory hostname).
* ``ansible_user``: Name of user within the jail to execute as. * ``ansible_user``: Name of user within the jail to execute as.
Local
~~~~~
Behaves like `local
<https://docs.ansible.com/ansible/2.5/plugins/connection/local.html>`_ except
connection delegation is supported.
* ``ansible_python_interpreter``
LXC LXC
~~~ ~~~
Both ``lxc`` and ``lxd`` connection plug-ins are hijacked, however the Behaves like `lxc
resulting implementation always uses the ``lxc-attach`` command line tool <https://docs.ansible.com/ansible/2.5/plugins/connection/lxc.html>`_ and `lxd
rather than the LXC Python bindings, as is usual with the Ansible ``lxd`` <https://docs.ansible.com/ansible/2.5/plugins/connection/lxd.html>`_ except
plug-in. conncetion delegation is supported, and the ``lxc-attach`` tool is always used
rather than the LXC Python bindings, as is usual with the ``lxc`` method.
Consequently the ``lxc-attach`` command is required to be available on the host The ``lxc-attach`` command must be available on the host machine.
machine.
* ``ansible_host``: Name of LXC container (default: inventory hostname). * ``ansible_host``: Name of LXC container (default: inventory hostname).

Loading…
Cancel
Save