ansible: implement streaming in Connection.put_file().

This is the function the copy module uses.
pull/209/head
David Wilson 7 years ago
parent 29087018c7
commit cb73c44084

@ -41,7 +41,7 @@ import mitogen.utils
import ansible_mitogen.target import ansible_mitogen.target
import ansible_mitogen.process import ansible_mitogen.process
from ansible_mitogen.services import ContextService import ansible_mitogen.services
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -160,7 +160,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
def _wrap_connect(self, on_error, kwargs): def _wrap_connect(self, on_error, kwargs):
dct = mitogen.service.call( dct = mitogen.service.call(
context=self.parent, context=self.parent,
handle=ContextService.handle, handle=ansible_mitogen.services.ContextService.handle,
method='get', method='get',
kwargs=mitogen.utils.cast(kwargs), kwargs=mitogen.utils.cast(kwargs),
) )
@ -300,7 +300,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
if context: if context:
mitogen.service.call( mitogen.service.call(
context=self.parent, context=self.parent,
handle=ContextService.handle, handle=ansible_mitogen.services.ContextService.handle,
method='put', method='put',
kwargs={ kwargs={
'context': context 'context': context
@ -398,12 +398,25 @@ class Connection(ansible.plugins.connection.ConnectionBase):
def put_file(self, in_path, out_path): def put_file(self, in_path, out_path):
""" """
Implement put_file() by caling the corresponding Implement put_file() by streamily transferring the file via
ansible_mitogen.target function in the target. FileService.
:param str in_path: :param str in_path:
Local filesystem path to read. Local filesystem path to read.
:param str out_path: :param str out_path:
Remote filesystem path to write. Remote filesystem path to write.
""" """
self.put_data(out_path, ansible_mitogen.target.read_path(in_path)) mitogen.service.call(
context=self.parent,
handle=ansible_mitogen.services.FileService.handle,
method='register',
kwargs={
'path': mitogen.utils.cast(in_path)
}
)
self.call(
ansible_mitogen.target.transfer_file,
context=self.parent,
in_path=in_path,
out_path=out_path
)

@ -66,7 +66,22 @@ _fork_parent = None
def _get_file(context, path, out_fp): def _get_file(context, path, out_fp):
LOG.debug('get_file(): fetching %r from %r', path, context) """
Streamily download a file from the connection multiplexer process in the
controller.
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used to
fetch the file.
:param bytes in_path:
FileService registered name of the input file.
:param bytes out_path:
Name of the output path on the local disk.
:returns:
:data:`True` on success, or :data:`False` if the transfer was
interrupted and the output should be discarded.
"""
LOG.debug('_get_file(): fetching %r from %r', path, context)
recv = mitogen.core.Receiver(router=context.router) recv = mitogen.core.Receiver(router=context.router)
size = mitogen.service.call( size = mitogen.service.call(
context=context, context=context,
@ -85,12 +100,11 @@ def _get_file(context, path, out_fp):
if out_fp.tell() != size: if out_fp.tell() != size:
LOG.error('get_file(%r): receiver was closed early, controller ' LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down. Truncating output file.', path) 'is likely shutting down.', path)
out_fp.truncate(0)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r', LOG.debug('target.get_file(): fetched %d bytes of %r from %r',
size, path, context) size, path, context)
return size return out_fp.tell() == size
def get_file(context, path): def get_file(context, path):
@ -109,11 +123,39 @@ def get_file(context, path):
""" """
if path not in _file_cache: if path not in _file_cache:
io = cStringIO.StringIO() io = cStringIO.StringIO()
_get_file(context, path, io) if not _get_file(context, path, io):
raise IOError('transfer of %r was interrupted.' % (path,))
_file_cache[path] = io.getvalue() _file_cache[path] = io.getvalue()
return _file_cache[path] return _file_cache[path]
def transfer_file(context, in_path, out_path):
"""
Streamily download a file from the connection multiplexer process in the
controller.
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used to
fetch the file.
:param bytes in_path:
FileService registered name of the input file.
:param bytes out_path:
Name of the output path on the local disk.
"""
fp = open(out_path+'.tmp', 'wb', mitogen.core.CHUNK_SIZE)
try:
try:
if not _get_file(context, in_path, fp):
raise IOError('transfer of %r was interrupted.' % (in_path,))
except Exception:
os.unlink(fp.name)
raise
finally:
fp.close()
os.rename(out_path + '.tmp', out_path)
@mitogen.core.takes_econtext @mitogen.core.takes_econtext
def start_fork_parent(econtext): def start_fork_parent(econtext):
""" """

Loading…
Cancel
Save