diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index c043137c95f..c3094d17004 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -169,6 +169,7 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin # Set the queue on Display so calls to Display.display are proxied over the queue display.set_queue(self._final_q) + self._shared_loader_obj.set_queue(self._final_q) global current_worker current_worker = self diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 3bbf3d592e1..85359c1f176 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -66,6 +66,12 @@ class DisplaySend: self.args = args self.kwargs = kwargs +class PluginLoaderSend: + def __init__(self, method, *args, **kwargs): + self.method = method + self.args = args + self.kwargs = kwargs + @dataclass class PromptSend: @@ -106,6 +112,8 @@ class FinalQueue(multiprocessing.queues.SimpleQueue): PromptSend(**kwargs), ) + def send_plugin_loader(self, *args, **kwargs): + self.put(PluginLoaderSend(*args, **kwargs)) class AnsibleEndPlay(Exception): def __init__(self, result): diff --git a/lib/ansible/plugins/loader.py b/lib/ansible/plugins/loader.py index dc0eb891a9a..d54c2f0b92e 100644 --- a/lib/ansible/plugins/loader.py +++ b/lib/ansible/plugins/loader.py @@ -34,6 +34,7 @@ from ansible.plugins import get_plugin_class, MODULE_CACHE, PATH_CACHE, PLUGIN_P from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef from ansible.utils.collection_loader._collection_finder import _AnsibleCollectionFinder, _get_collection_metadata from ansible.utils.display import Display +from ansible.utils.multiprocessing import context as multiprocessing_context from ansible.utils.plugin_docs import add_fragments # TODO: take the packaging dep, or vendor SpecifierSet? @@ -52,6 +53,29 @@ display = Display() get_with_context_result = namedtuple('get_with_context_result', ['object', 'plugin_load_context']) +_final_q = None + + +def proxy_load(method): + + def proxyit(self, *args, **kwargs): + if sys.modules[__name__]._final_q: + # If _final_q is set, that means we are in a WorkerProcess + # and instead of displaying messages directly from the fork + # we will proxy them through the queue + sys.modules[__name__]._final_q.send_plugin_loader(method.__name__, *args, **kwargs) + return method(self, *args, **kwargs) + + return proxyit + +def set_queue(queue): + """Set the _final_q on PLuginLoader, so that we know to proxy requests over the queue + This is only needed in ansible.executor.process.worker:WorkerProcess._run + """ + if multiprocessing_context.parent_process() is None: + raise RuntimeError('queue cannot be set in parent process') + sys.modules[__name__]._final_q = queue + def get_all_plugin_loaders(): return [(name, obj) for (name, obj) in globals().items() if isinstance(obj, PluginLoader)] @@ -858,6 +882,7 @@ class PluginLoader: def get(self, name, *args, **kwargs): return self.get_with_context(name, *args, **kwargs).object + @proxy_load def get_with_context(self, name, *args, **kwargs): ''' instantiates a plugin of the given name using arguments ''' @@ -932,6 +957,7 @@ class PluginLoader: display.debug(msg) + @proxy_load def all(self, *args, **kwargs): ''' Iterate through all plugins of this type, in configured paths (no collections) @@ -1125,6 +1151,7 @@ class Jinja2Loader(PluginLoader): # FUTURE: now that the resulting plugins are closer, refactor base class method with some extra # hooks so we can avoid all the duplicated plugin metadata logic, and also cache the collection results properly here + @proxy_load def get_with_context(self, name, *args, **kwargs): # pop N/A kwargs to avoid passthrough to parent methods kwargs.pop('class_only', False) @@ -1258,6 +1285,7 @@ class Jinja2Loader(PluginLoader): return get_with_context_result(plugin, context) + @proxy_load def all(self, *args, **kwargs): kwargs.pop('_dedupe', None) path_only = kwargs.pop('path_only', False)