diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py
index 761eb8ea..eee0ac0e 100644
--- a/ansible_mitogen/connection.py
+++ b/ansible_mitogen/connection.py
@@ -30,6 +30,7 @@ from __future__ import absolute_import
 import logging
 import os
 import shlex
+import stat
 import sys
 import time
 
@@ -470,7 +471,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
                            mitogen.utils.cast(in_path))
         ansible_mitogen.target.write_path(out_path, output)
 
-    def put_data(self, out_path, data):
+    def put_data(self, out_path, data, mode=None, utimes=None):
         """
         Implement put_file() by caling the corresponding
         ansible_mitogen.target function in the target.
@@ -482,7 +483,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
         """
         self.call(ansible_mitogen.target.write_path,
                   mitogen.utils.cast(out_path),
-                  mitogen.utils.cast(data))
+                  mitogen.utils.cast(data),
+                  mode=mode,
+                  utimes=utimes)
 
     def put_file(self, in_path, out_path):
         """
@@ -494,6 +497,25 @@ class Connection(ansible.plugins.connection.ConnectionBase):
         :param str out_path:
             Remote filesystem path to write.
         """
+        st = os.stat(in_path)
+        if not stat.S_ISREG(st.st_mode):
+            raise IOError('%r is not a regular file.' % (in_path,))
+
+        # If the file is sufficiently small, just ship it in the argument list
+        # rather than introducing an extra RTT for the child to request it from
+        # FileService.
+        if st.st_size <= 32768:
+            fp = open(in_path, 'rb')
+            try:
+                s = fp.read(32769)
+            finally:
+                fp.close()
+
+            # Ensure file was not growing during call.
+            if len(s) == st.st_size:
+                return self.put_data(out_path, s, mode=st.st_mode,
+                                     utimes=(st.st_atime, st.st_mtime))
+
         mitogen.service.call(
             context=self.parent,
             handle=ansible_mitogen.services.FileService.handle,
diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index 57f19ebd..c3157bf0 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -38,11 +38,12 @@ when a child has completed a job.
 """
 
 from __future__ import absolute_import
-import hashlib
+import grp
 import logging
 import os
 import os.path
-import pprint
+import pwd
+import stat
 import sys
 import threading
 import zlib
@@ -435,7 +436,7 @@ class FileService(mitogen.service.Service):
     def __init__(self, router):
         super(FileService, self).__init__(router)
-        #: Mapping of registered path -> file size.
-        self._size_by_path = {}
+        #: Mapping of registered path -> file metadata dict.
+        self._metadata_by_path = {}
         #: Queue used to communicate from service to scheduler thread.
         self._queue = mitogen.core.Latch()
         #: Mapping of Stream->[(Sender, file object)].
@@ -545,6 +546,16 @@ class FileService(mitogen.service.Service):
             sender.close()
             fp.close()
 
+    def _name_or_none(self, func, n, attr):
+        """
+        Return attribute `attr` of the entry returned by `func(n)`, or
+        :data:`None` if the lookup raises :class:`KeyError`.
+        """
+        try:
+            return getattr(func(n), attr)
+        except KeyError:
+            return None
+
     @mitogen.service.expose(policy=mitogen.service.AllowParents())
     @mitogen.service.arg_spec({
         'path': basestring
@@ -557,9 +568,22 @@ class FileService(mitogen.service.Service):
         :param str path:
             File path.
         """
-        if path not in self._size_by_path:
-            LOG.debug('%r: registering %r', self, path)
-            self._size_by_path[path] = os.path.getsize(path)
+        if path in self._metadata_by_path:
+            return
+
+        st = os.stat(path)
+        if not stat.S_ISREG(st.st_mode):
+            raise IOError('%r is not a regular file.' % (path,))
+
+        LOG.debug('%r: registering %r', self, path)
+        self._metadata_by_path[path] = {
+            'size': st.st_size,
+            'mode': st.st_mode,
+            'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'),
+            'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'),
+            'mtime': st.st_mtime,
+            'atime': st.st_atime,
+        }
 
     @mitogen.service.expose(policy=mitogen.service.AllowAny())
     @mitogen.service.arg_spec({
@@ -575,15 +599,21 @@ class FileService(mitogen.service.Service):
         :param mitogen.core.Sender sender:
             Sender to receive file data.
         :returns:
-            File size. The target can decide whether to keep the file in RAM or
-            disk based on the return value.
+            Dict containing the file metadata:
+
+            * ``size``: File size in bytes.
+            * ``mode``: Integer file mode.
+            * ``owner``: Owner account name on host machine.
+            * ``group``: Owner group name on host machine.
+            * ``mtime``: Floating point modification time.
+            * ``atime``: Floating point access time.
         :raises mitogen.core.CallError:
             The path was not registered.
         """
-        if path not in self._size_by_path:
+        if path not in self._metadata_by_path:
             raise mitogen.core.CallError(self.unregistered_msg)
 
         LOG.debug('Serving %r', path)
         fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
         self._queue.put((sender, fp))
-        return self._size_by_path[path]
+        return self._metadata_by_path[path]
diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py
index ee779546..4b66745f 100644
--- a/ansible_mitogen/target.py
+++ b/ansible_mitogen/target.py
@@ -33,6 +33,7 @@ for file transfer, module execution and sundry bits like changing file modes.
 
 from __future__ import absolute_import
 import cStringIO
+import grp
 import json
 import logging
 import operator
@@ -85,7 +86,7 @@ def _get_file(context, path, out_fp):
     LOG.debug('_get_file(): fetching %r from %r', path, context)
     t0 = time.time()
     recv = mitogen.core.Receiver(router=context.router)
-    size = mitogen.service.call(
+    metadata = mitogen.service.call(
         context=context,
         handle=ansible_mitogen.services.FileService.handle,
         method='fetch',
@@ -100,13 +101,14 @@ def _get_file(context, path, out_fp):
         LOG.debug('_get_file(%r): received %d bytes', path, len(s))
         out_fp.write(s)
 
-    if out_fp.tell() != size:
+    ok = out_fp.tell() == metadata['size']
+    if not ok:
         LOG.error('get_file(%r): receiver was closed early, controller '
                   'is likely shutting down.', path)
 
     LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
-              size, path, context, 1000*(time.time() - t0))
-    return out_fp.tell() == size
+              metadata['size'], path, context, 1000*(time.time() - t0))
+    return ok, metadata
 
 
 def get_file(context, path):
@@ -125,13 +127,14 @@ def get_file(context, path):
     """
     if path not in _file_cache:
         io = cStringIO.StringIO()
-        if not _get_file(context, path, io):
+        ok, metadata = _get_file(context, path, io)
+        if not ok:
             raise IOError('transfer of %r was interrupted.' % (path,))
         _file_cache[path] = io.getvalue()
     return _file_cache[path]
 
 
-def transfer_file(context, in_path, out_path):
+def transfer_file(context, in_path, out_path, sync=False, set_owner=False):
     """
     Streamily download a file from the connection multiplexer process in the
     controller.
@@ -143,19 +146,46 @@ def transfer_file(context, in_path, out_path):
         FileService registered name of the input file.
     :param bytes out_path:
         Name of the output path on the local disk.
-    """
-    fp = open(out_path+'.tmp', 'wb', mitogen.core.CHUNK_SIZE)
+    :param bool sync:
+        If :data:`True`, ensure the file content and metadata are fully on disk
+        before renaming the temporary file over the existing file. This should
+        ensure in the case of system crash, either the entire old or new file
+        are visible post-reboot.
+    :param bool set_owner:
+        If :data:`True`, look up the metadata username and group on the local
+        system and set the file owner using :func:`os.fchown`.
+    """
+    out_path = os.path.abspath(out_path)
+    fd, tmp_path = tempfile.mkstemp(suffix='.tmp',
+                                    prefix='.ansible_mitogen_transfer-',
+                                    dir=os.path.dirname(out_path))
+    fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
+    LOG.debug('transfer_file(out_path=%r) tempory file: %s', out_path, tmp_path)
+    try:
         try:
-            if not _get_file(context, in_path, fp):
+            ok, metadata = _get_file(context, in_path, fp)
+            if not ok:
                 raise IOError('transfer of %r was interrupted.'
                               % (in_path,))
+
+            # Metadata only exists once the fetch has returned, and the
+            # f*() calls need the still-open descriptor, not the path.
+            os.fchmod(fp.fileno(), metadata['mode'])
+            if set_owner:
+                set_fd_owner(fp.fileno(), metadata['owner'], metadata['group'])
+            if sync:
+                # fsync() only covers data already handed to the kernel.
+                fp.flush()
+                os.fsync(fp.fileno())
-        except Exception:
-            os.unlink(fp.name)
-            raise
-    finally:
-        fp.close()
+        finally:
+            fp.close()
+
+        os.rename(tmp_path, out_path)
+    except:
+        os.unlink(tmp_path)
+        raise
 
-    os.rename(out_path + '.tmp', out_path)
+    os.utime(out_path, (metadata['atime'], metadata['mtime']))
 
 
 @mitogen.core.takes_econtext
@@ -392,11 +422,63 @@ def read_path(path):
     return open(path, 'rb').read()
 
 
-def write_path(path, s):
+def set_fd_owner(fd, owner, group=None):
+    """
+    Change the ownership of the file open on descriptor `fd`.
+
+    :param int fd:
+        Open file descriptor.
+    :param str owner:
+        Account name to look up, or a false value for the effective UID.
+    :param str group:
+        Group name to look up, or a false value for the effective GID.
+    """
+    if owner:
+        uid = pwd.getpwnam(owner).pw_uid
+    else:
+        uid = os.geteuid()
+
+    if group:
+        gid = grp.getgrnam(group).gr_gid
+    else:
+        gid = os.getegid()
+
+    os.fchown(fd, uid, gid)
+
+
+def write_path(path, s, owner=None, group=None, mode=None,
+               utimes=None, sync=False):
     """
     Writes bytes `s` to a filesystem `path`.
     """
-    open(path, 'wb').write(s)
+    path = os.path.abspath(path)
+    fd, tmp_path = tempfile.mkstemp(suffix='.tmp',
+                                    prefix='.ansible_mitogen_transfer-',
+                                    dir=os.path.dirname(path))
+    fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
+    LOG.debug('write_path(path=%r) tempory file: %s', path, tmp_path)
+
+    try:
+        try:
+            if mode:
+                os.fchmod(fp.fileno(), mode)
+            if owner or group:
+                set_fd_owner(fp.fileno(), owner, group)
+            fp.write(s)
+            if sync:
+                # fsync() only covers data already handed to the kernel.
+                fp.flush()
+                os.fsync(fp.fileno())
+        finally:
+            fp.close()
+
+        os.rename(tmp_path, path)
+    except:
+        os.unlink(tmp_path)
+        raise
+
+    if utimes:
+        os.utime(path, utimes)
 
 
 CHMOD_CLAUSE_PAT = re.compile(r'([uoga]*)([+\-=])([ugo]|[rwx]*)')