try to proxy copy of pl loading

pull/81854/head
Brian Coca 1 year ago
parent d8f791d88c
commit dabae80ca9

@ -169,6 +169,7 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
# Set the queue on Display so calls to Display.display are proxied over the queue # Set the queue on Display so calls to Display.display are proxied over the queue
display.set_queue(self._final_q) display.set_queue(self._final_q)
self._shared_loader_obj.set_queue(self._final_q)
global current_worker global current_worker
current_worker = self current_worker = self

@ -66,6 +66,12 @@ class DisplaySend:
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
class PluginLoaderSend:
def __init__(self, method, *args, **kwargs):
self.method = method
self.args = args
self.kwargs = kwargs
@dataclass @dataclass
class PromptSend: class PromptSend:
@ -106,6 +112,8 @@ class FinalQueue(multiprocessing.queues.SimpleQueue):
PromptSend(**kwargs), PromptSend(**kwargs),
) )
def send_plugin_loader(self, *args, **kwargs):
self.put(PluginLoaderSend(*args, **kwargs))
class AnsibleEndPlay(Exception): class AnsibleEndPlay(Exception):
def __init__(self, result): def __init__(self, result):

@ -34,6 +34,7 @@ from ansible.plugins import get_plugin_class, MODULE_CACHE, PATH_CACHE, PLUGIN_P
from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef
from ansible.utils.collection_loader._collection_finder import _AnsibleCollectionFinder, _get_collection_metadata from ansible.utils.collection_loader._collection_finder import _AnsibleCollectionFinder, _get_collection_metadata
from ansible.utils.display import Display from ansible.utils.display import Display
from ansible.utils.multiprocessing import context as multiprocessing_context
from ansible.utils.plugin_docs import add_fragments from ansible.utils.plugin_docs import add_fragments
# TODO: take the packaging dep, or vendor SpecifierSet? # TODO: take the packaging dep, or vendor SpecifierSet?
@ -52,6 +53,29 @@ display = Display()
get_with_context_result = namedtuple('get_with_context_result', ['object', 'plugin_load_context']) get_with_context_result = namedtuple('get_with_context_result', ['object', 'plugin_load_context'])
_final_q = None
def proxy_load(method):
def proxyit(self, *args, **kwargs):
if sys.modules[__name__]._final_q:
# If _final_q is set, that means we are in a WorkerProcess
# and instead of displaying messages directly from the fork
# we will proxy them through the queue
sys.modules[__name__]._final_q.send_plugin_loader(method.__name__, *args, **kwargs)
return method(self, *args, **kwargs)
return proxyit
def set_queue(queue):
"""Set the _final_q on PLuginLoader, so that we know to proxy requests over the queue
This is only needed in ansible.executor.process.worker:WorkerProcess._run
"""
if multiprocessing_context.parent_process() is None:
raise RuntimeError('queue cannot be set in parent process')
sys.modules[__name__]._final_q = queue
def get_all_plugin_loaders(): def get_all_plugin_loaders():
return [(name, obj) for (name, obj) in globals().items() if isinstance(obj, PluginLoader)] return [(name, obj) for (name, obj) in globals().items() if isinstance(obj, PluginLoader)]
@ -858,6 +882,7 @@ class PluginLoader:
def get(self, name, *args, **kwargs): def get(self, name, *args, **kwargs):
return self.get_with_context(name, *args, **kwargs).object return self.get_with_context(name, *args, **kwargs).object
@proxy_load
def get_with_context(self, name, *args, **kwargs): def get_with_context(self, name, *args, **kwargs):
''' instantiates a plugin of the given name using arguments ''' ''' instantiates a plugin of the given name using arguments '''
@ -932,6 +957,7 @@ class PluginLoader:
display.debug(msg) display.debug(msg)
@proxy_load
def all(self, *args, **kwargs): def all(self, *args, **kwargs):
''' '''
Iterate through all plugins of this type, in configured paths (no collections) Iterate through all plugins of this type, in configured paths (no collections)
@ -1125,6 +1151,7 @@ class Jinja2Loader(PluginLoader):
# FUTURE: now that the resulting plugins are closer, refactor base class method with some extra # FUTURE: now that the resulting plugins are closer, refactor base class method with some extra
# hooks so we can avoid all the duplicated plugin metadata logic, and also cache the collection results properly here # hooks so we can avoid all the duplicated plugin metadata logic, and also cache the collection results properly here
@proxy_load
def get_with_context(self, name, *args, **kwargs): def get_with_context(self, name, *args, **kwargs):
# pop N/A kwargs to avoid passthrough to parent methods # pop N/A kwargs to avoid passthrough to parent methods
kwargs.pop('class_only', False) kwargs.pop('class_only', False)
@ -1258,6 +1285,7 @@ class Jinja2Loader(PluginLoader):
return get_with_context_result(plugin, context) return get_with_context_result(plugin, context)
@proxy_load
def all(self, *args, **kwargs): def all(self, *args, **kwargs):
kwargs.pop('_dedupe', None) kwargs.pop('_dedupe', None)
path_only = kwargs.pop('path_only', False) path_only = kwargs.pop('path_only', False)

Loading…
Cancel
Save