issue #186: move target._get_file into mitogen.service

For lack of a better place to keep the client function, make it a
classmethod of FileService itself for now.

The old _get_file() is removed in a subsequent commit.
pull/262/head
David Wilson 6 years ago
parent a3b747af1b
commit 76beea6554

@ -53,6 +53,7 @@ import ansible_mitogen.runner
import mitogen.core
import mitogen.fork
import mitogen.parent
import mitogen.service
LOG = logging.getLogger(__name__)
@ -168,7 +169,11 @@ def transfer_file(context, in_path, out_path, sync=False, set_owner=False):
try:
try:
ok, metadata = _get_file(context, in_path, fp)
ok, metadata = mitogen.service.FileService.get(
context=context,
path=in_path,
out_fp=fp,
)
if not ok:
raise IOError('transfer of %r was interrupted.' % (in_path,))

@ -669,9 +669,8 @@ class PushFileService(Service):
class FileService(Service):
"""
Streaming file server, used to serve small files and huge files alike.
Paths must be registered by a trusted context before they will be served to
a child.
Streaming file server, used to serve small and huge files alike. Paths must
be registered by a trusted context before they will be served to a child.
Transfers are divided among the physical streams that connect external
contexts, ensuring each stream never has excessive data buffered in RAM,
@ -889,3 +888,49 @@ class FileService(Service):
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
@classmethod
def get(cls, context, path, out_fp):
"""
Streamily download a file from the connection multiplexer process in
the controller.
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used
to fetch the file.
:param bytes in_path:
FileService registered name of the input file.
:param bytes out_path:
Name of the output path on the local disk.
:returns:
:data:`True` on success, or :data:`False` if the transfer was
interrupted and the output should be discarded.
"""
LOG.debug('get_file(): fetching %r from %r', path, context)
t0 = time.time()
recv = mitogen.core.Receiver(router=context.router)
metadata = context.call_service(
service_name=cls.name(),
method_name='fetch',
path=path,
sender=recv.to_sender(),
)
for chunk in recv:
s = chunk.unpickle()
LOG.debug('get_file(%r): received %d bytes', path, len(s))
context.call_service_async(
service_name=cls.name(),
method_name='acknowledge',
size=len(s),
).close()
out_fp.write(s)
ok = out_fp.tell() == metadata['size']
if not ok:
LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down.', path)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
metadata['size'], path, context, 1000 * (time.time() - t0))
return ok, metadata

Loading…
Cancel
Save