ansible: use PushFileService for module deps.

planner.py:
  * Rather than grant FileService access to a file for children, use
    PushFileService to trigger deduplicating send of the file through
    the hierarchy immediately.
  * Send the complete list of Ansible module imports to the target so
    runner.py knows which files and scripts must be loaded via
    PushFileService prior to detaching.

runner.py:
  * Teach NewStyleRunner to use the full module map to block until
    everything is loaded prior to detach().

target.py:
  * Delete old _get_file(), replace get_file() with get_small_file()
    which uses PushFileService instead.

Closes #186
pull/262/head
David Wilson 7 years ago
parent 7d4f4b205f
commit 569c12a2d6

@ -182,9 +182,10 @@ class BinaryPlanner(Planner):
def _grant_file_service_access(self, invocation): def _grant_file_service_access(self, invocation):
invocation.connection.parent.call_service( invocation.connection.parent.call_service(
service_name='mitogen.service.FileService', service_name='mitogen.service.PushFileService',
method_name='register', method_name='propagate_to',
path=invocation.module_path, path=invocation.module_path,
context=invocation.connection.context,
) )
def plan(self, invocation, **kwargs): def plan(self, invocation, **kwargs):
@ -295,7 +296,7 @@ class NewStylePlanner(ScriptPlanner):
if os.path.isdir(path) if os.path.isdir(path)
) )
def get_module_utils(self, invocation): def get_module_map(self, invocation):
return invocation.connection.parent.call_service( return invocation.connection.parent.call_service(
service_name='ansible_mitogen.services.ModuleDepService', service_name='ansible_mitogen.services.ModuleDepService',
method_name='scan', method_name='scan',
@ -308,11 +309,14 @@ class NewStylePlanner(ScriptPlanner):
) )
def plan(self, invocation): def plan(self, invocation):
module_utils = self.get_module_utils(invocation) module_map = self.get_module_map(invocation)
return super(NewStylePlanner, self).plan( return super(NewStylePlanner, self).plan(
invocation, invocation,
module_utils=module_utils, module_map=module_map,
should_fork=(self.get_should_fork(invocation) or bool(module_utils)), should_fork=(
self.get_should_fork(invocation) or
len(module_map['custom']) > 0
)
) )

@ -155,14 +155,16 @@ class MuxProcess(object):
arriving from worker processes. arriving from worker processes.
""" """
file_service = mitogen.service.FileService(router=self.router) file_service = mitogen.service.FileService(router=self.router)
push_file_service = mitogen.service.PushFileService(router=self.router)
self.pool = mitogen.service.Pool( self.pool = mitogen.service.Pool(
router=self.router, router=self.router,
services=[ services=[
file_service, file_service,
push_file_service,
ansible_mitogen.services.ContextService(self.router), ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.ModuleDepService( ansible_mitogen.services.ModuleDepService(
router=self.router, router=self.router,
file_service=file_service, push_file_service=push_file_service,
), ),
], ],
size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')),

@ -47,6 +47,7 @@ import sys
import tempfile import tempfile
import types import types
import mitogen.core
import ansible_mitogen.target # TODO: circular import import ansible_mitogen.target # TODO: circular import
try: try:
@ -213,7 +214,7 @@ class ModuleUtilsImporter(object):
def load_module(self, fullname): def load_module(self, fullname):
path, is_pkg = self._by_fullname[fullname] path, is_pkg = self._by_fullname[fullname]
source = ansible_mitogen.target.get_file(self._context, path) source = ansible_mitogen.target.get_small_file(self._context, path)
code = compile(source, path, 'exec') code = compile(source, path, 'exec')
mod = sys.modules.setdefault(fullname, imp.new_module(fullname)) mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__file__ = "master:%s" % (path,) mod.__file__ = "master:%s" % (path,)
@ -320,7 +321,7 @@ class ProgramRunner(Runner):
""" """
Fetch the module binary from the master if necessary. Fetch the module binary from the master if necessary.
""" """
return ansible_mitogen.target.get_file( return ansible_mitogen.target.get_small_file(
context=self.service_context, context=self.service_context,
path=self.path, path=self.path,
) )
@ -448,12 +449,30 @@ class NewStyleRunner(ScriptRunner):
#: path => new-style module bytecode. #: path => new-style module bytecode.
_code_by_path = {} _code_by_path = {}
def __init__(self, module_utils, **kwargs): def __init__(self, module_map, **kwargs):
super(NewStyleRunner, self).__init__(**kwargs) super(NewStyleRunner, self).__init__(**kwargs)
self.module_utils = module_utils self.module_map = module_map
def _setup_imports(self):
"""
Ensure the local importer has loaded every module needed by the Ansible
module before setup() completes, but before detach() is called in an
asynchronous task.
The master automatically streams modules towards us concurrent to the
runner invocation, however there is no public API to synchronize on the
completion of those preloads. Instead simply reuse the importer's
synchronization mechanism by importing everything the module will need
prior to detaching.
"""
for fullname, _, _ in self.module_map['custom']:
mitogen.core.import_module(fullname)
for fullname in self.module_map['builtin']:
mitogen.core.import_module(fullname)
def setup(self): def setup(self):
super(NewStyleRunner, self).setup() super(NewStyleRunner, self).setup()
self._stdio = NewStyleStdio(self.args) self._stdio = NewStyleStdio(self.args)
# It is possible that not supplying the script filename will break some # It is possible that not supplying the script filename will break some
# module, but this has never been a bug report. Instead act like an # module, but this has never been a bug report. Instead act like an
@ -461,8 +480,9 @@ class NewStyleRunner(ScriptRunner):
self._argv = TemporaryArgv(['']) self._argv = TemporaryArgv([''])
self._importer = ModuleUtilsImporter( self._importer = ModuleUtilsImporter(
context=self.service_context, context=self.service_context,
module_utils=self.module_utils, module_utils=self.module_map['custom'],
) )
self._setup_imports()
if libc__res_init: if libc__res_init:
libc__res_init() libc__res_init()
@ -484,7 +504,7 @@ class NewStyleRunner(ScriptRunner):
pass pass
def _get_code(self): def _get_code(self):
self.source = ansible_mitogen.target.get_file( self.source = ansible_mitogen.target.get_small_file(
context=self.service_context, context=self.service_context,
path=self.path, path=self.path,
) )

@ -369,9 +369,9 @@ class ModuleDepService(mitogen.service.Service):
Scan a new-style module and produce a cached mapping of module_utils names Scan a new-style module and produce a cached mapping of module_utils names
to their resolved filesystem paths. to their resolved filesystem paths.
""" """
def __init__(self, file_service, **kwargs): def __init__(self, push_file_service, **kwargs):
super(ModuleDepService, self).__init__(**kwargs) super(ModuleDepService, self).__init__(**kwargs)
self._file_service = file_service self._push_file_service = push_file_service
self._cache = {} self._cache = {}
def _get_builtin_names(self, builtin_path, resolved): def _get_builtin_names(self, builtin_path, resolved):
@ -414,10 +414,17 @@ class ModuleDepService(mitogen.service.Service):
# Grant FileService access to paths in here to avoid another 2 IPCs # Grant FileService access to paths in here to avoid another 2 IPCs
# from WorkerProcess. # from WorkerProcess.
self._file_service.register(path=module_path) self._push_file_service.propagate_to(
path=module_path,
context=context,
)
for fullname, path, is_pkg in custom: for fullname, path, is_pkg in custom:
self._file_service.register(path=path) self._push_file_service.propagate_to(
path=path,
context=context,
)
for name in self._cache[key]['builtin']: for name in self._cache[key]['builtin']:
self.router.responder.forward_module(context, name) self.router.responder.forward_module(context, name)
return self._cache[key]['custom'] return self._cache[key]

@ -62,61 +62,12 @@ LOG = logging.getLogger(__name__)
#: the duration of the process. #: the duration of the process.
temp_dir = None temp_dir = None
#: Caching of fetched file data.
_file_cache = {}
#: Initialized to an econtext.parent.Context pointing at a pristine fork of #: Initialized to an econtext.parent.Context pointing at a pristine fork of
#: the target Python interpreter before it executes any code or imports. #: the target Python interpreter before it executes any code or imports.
_fork_parent = None _fork_parent = None
def _get_file(context, path, out_fp): def get_small_file(context, path):
"""
Streamily download a file from the connection multiplexer process in the
controller.
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used to
fetch the file.
:param bytes in_path:
FileService registered name of the input file.
:param bytes out_path:
Name of the output path on the local disk.
:returns:
:data:`True` on success, or :data:`False` if the transfer was
interrupted and the output should be discarded.
"""
LOG.debug('_get_file(): fetching %r from %r', path, context)
t0 = time.time()
recv = mitogen.core.Receiver(router=context.router)
metadata = context.call_service(
service_name='mitogen.service.FileService',
method_name='fetch',
path=path,
sender=recv.to_sender(),
)
for chunk in recv:
s = chunk.unpickle()
LOG.debug('_get_file(%r): received %d bytes', path, len(s))
context.call_service_async(
service_name='mitogen.service.FileService',
method_name='acknowledge',
size=len(s),
).close()
out_fp.write(s)
ok = out_fp.tell() == metadata['size']
if not ok:
LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down.', path)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
metadata['size'], path, context, 1000 * (time.time() - t0))
return ok, metadata
def get_file(context, path):
""" """
Basic in-memory caching module fetcher. This generates an one roundtrip for Basic in-memory caching module fetcher. This generates an one roundtrip for
every previously unseen file, so it is only a temporary solution. every previously unseen file, so it is only a temporary solution.
@ -130,13 +81,9 @@ def get_file(context, path):
:returns: :returns:
Bytestring file data. Bytestring file data.
""" """
if path not in _file_cache: pool = mitogen.service.get_or_create_pool()
io = cStringIO.StringIO() service = pool.get_service('mitogen.service.PushFileService')
ok, metadata = _get_file(context, path, io) return service.get(path)
if not ok:
raise IOError('transfer of %r was interrupted.' % (path,))
_file_cache[path] = io.getvalue()
return _file_cache[path]
def transfer_file(context, in_path, out_path, sync=False, set_owner=False): def transfer_file(context, in_path, out_path, sync=False, set_owner=False):

Loading…
Cancel
Save