From cb73c44084c4b01f520ace8531f7de2fc02d11fe Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 22 Apr 2018 02:48:06 +0100 Subject: [PATCH] ansible: implement streaming in Connection.put_file(). This is the function the copy module uses. --- ansible_mitogen/connection.py | 25 +++++++++++++---- ansible_mitogen/target.py | 52 +++++++++++++++++++++++++++++++---- 2 files changed, 66 insertions(+), 11 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 4c5cad21..69dec732 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -41,7 +41,7 @@ import mitogen.utils import ansible_mitogen.target import ansible_mitogen.process -from ansible_mitogen.services import ContextService +import ansible_mitogen.services LOG = logging.getLogger(__name__) @@ -160,7 +160,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): def _wrap_connect(self, on_error, kwargs): dct = mitogen.service.call( context=self.parent, - handle=ContextService.handle, + handle=ansible_mitogen.services.ContextService.handle, method='get', kwargs=mitogen.utils.cast(kwargs), ) @@ -300,7 +300,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): if context: mitogen.service.call( context=self.parent, - handle=ContextService.handle, + handle=ansible_mitogen.services.ContextService.handle, method='put', kwargs={ 'context': context @@ -398,12 +398,25 @@ class Connection(ansible.plugins.connection.ConnectionBase): def put_file(self, in_path, out_path): """ - Implement put_file() by caling the corresponding - ansible_mitogen.target function in the target. + Implement put_file() by streamily transferring the file via + FileService. :param str in_path: Local filesystem path to read. :param str out_path: Remote filesystem path to write. """ - self.put_data(out_path, ansible_mitogen.target.read_path(in_path)) + mitogen.service.call( + context=self.parent, + handle=ansible_mitogen.services.FileService.handle, + method='register', + kwargs={ + 'path': mitogen.utils.cast(in_path) + } + ) + self.call( + ansible_mitogen.target.transfer_file, + context=self.parent, + in_path=in_path, + out_path=out_path + ) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 4edc0b60..f04a776f 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -66,7 +66,22 @@ _fork_parent = None def _get_file(context, path, out_fp): - LOG.debug('get_file(): fetching %r from %r', path, context) + """ + Streamily download a file from the connection multiplexer process in the + controller. + + :param mitogen.core.Context context: + Reference to the context hosting the FileService that will be used to + fetch the file. + :param bytes in_path: + FileService registered name of the input file. + :param bytes out_path: + Name of the output path on the local disk. + :returns: + :data:`True` on success, or :data:`False` if the transfer was + interrupted and the output should be discarded. + """ + LOG.debug('_get_file(): fetching %r from %r', path, context) recv = mitogen.core.Receiver(router=context.router) size = mitogen.service.call( context=context, @@ -85,12 +100,11 @@ def _get_file(context, path, out_fp): if out_fp.tell() != size: LOG.error('get_file(%r): receiver was closed early, controller ' - 'is likely shutting down. Truncating output file.', path) - out_fp.truncate(0) + 'is likely shutting down.', path) LOG.debug('target.get_file(): fetched %d bytes of %r from %r', size, path, context) - return size + return out_fp.tell() == size def get_file(context, path): @@ -109,11 +123,39 @@ def get_file(context, path): """ if path not in _file_cache: io = cStringIO.StringIO() - _get_file(context, path, io) + if not _get_file(context, path, io): + raise IOError('transfer of %r was interrupted.' % (path,)) _file_cache[path] = io.getvalue() return _file_cache[path] +def transfer_file(context, in_path, out_path): + """ + Streamily download a file from the connection multiplexer process in the + controller. + + :param mitogen.core.Context context: + Reference to the context hosting the FileService that will be used to + fetch the file. + :param bytes in_path: + FileService registered name of the input file. + :param bytes out_path: + Name of the output path on the local disk. + """ + fp = open(out_path+'.tmp', 'wb', mitogen.core.CHUNK_SIZE) + try: + try: + if not _get_file(context, in_path, fp): + raise IOError('transfer of %r was interrupted.' % (in_path,)) + except Exception: + os.unlink(fp.name) + raise + finally: + fp.close() + + os.rename(out_path + '.tmp', out_path) + + @mitogen.core.takes_econtext def start_fork_parent(econtext): """