diff --git a/lib/ansible/module_utils/_internal/__init__.py b/lib/ansible/module_utils/_internal/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/lib/ansible/module_utils/_internal/_concurrent/__init__.py b/lib/ansible/module_utils/_internal/_concurrent/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/lib/ansible/module_utils/_internal/_concurrent/_daemon_threading.py b/lib/ansible/module_utils/_internal/_concurrent/_daemon_threading.py new file mode 100644 index 00000000000..0b32a062fed --- /dev/null +++ b/lib/ansible/module_utils/_internal/_concurrent/_daemon_threading.py @@ -0,0 +1,28 @@ +"""Proxy stdlib threading module that only supports non-joinable daemon threads.""" +# NB: all new local module attrs are _ prefixed to ensure an identical public attribute surface area to the module we're proxying + +from __future__ import annotations as _annotations + +import threading as _threading +import typing as _t + + +class _DaemonThread(_threading.Thread): + """ + Daemon-only Thread subclass; prevents running threads of this type from blocking interpreter shutdown and process exit. + The join() method is a no-op. + """ + + def __init__(self, *args, daemon: bool | None = None, **kwargs) -> None: + super().__init__(*args, daemon=daemon or True, **kwargs) + + def join(self, timeout=None) -> None: + """ThreadPoolExecutor's atexit handler joins all queue threads before allowing shutdown; prevent them from blocking.""" + + +Thread = _DaemonThread # shadow the real Thread attr with our _DaemonThread + + +def __getattr__(name: str) -> _t.Any: + """Delegate anything not defined locally to the real `threading` module.""" + return getattr(_threading, name) diff --git a/lib/ansible/module_utils/_internal/_concurrent/_futures.py b/lib/ansible/module_utils/_internal/_concurrent/_futures.py new file mode 100644 index 00000000000..2ca493f6873 --- /dev/null +++ b/lib/ansible/module_utils/_internal/_concurrent/_futures.py @@ -0,0 +1,21 @@ +"""Utilities for concurrent code execution using futures.""" + +from __future__ import annotations + +import concurrent.futures +import types + +from . import _daemon_threading + + +class DaemonThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): + """ThreadPoolExecutor subclass that creates non-joinable daemon threads for non-blocking pool and process shutdown with abandoned threads.""" + + atc = concurrent.futures.ThreadPoolExecutor._adjust_thread_count + + # clone the base class `_adjust_thread_count` method with a copy of its globals dict + _adjust_thread_count = types.FunctionType(atc.__code__, atc.__globals__.copy(), name=atc.__name__, argdefs=atc.__defaults__, closure=atc.__closure__) + # patch the method closure's `threading` module import to use our daemon-only thread factory instead + _adjust_thread_count.__globals__.update(threading=_daemon_threading) + + del atc # don't expose this as a class attribute diff --git a/test/units/module_utils/_internal/__init__.py b/test/units/module_utils/_internal/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/units/module_utils/_internal/_concurrent/__init__.py b/test/units/module_utils/_internal/_concurrent/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/units/module_utils/_internal/_concurrent/test_daemon_threading.py b/test/units/module_utils/_internal/_concurrent/test_daemon_threading.py new file mode 100644 index 00000000000..4140fae1aea --- /dev/null +++ b/test/units/module_utils/_internal/_concurrent/test_daemon_threading.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +import threading + +from ansible.module_utils._internal._concurrent import _daemon_threading + + +def test_daemon_thread_getattr() -> None: + """Ensure that the threading module proxy delegates properly to the real module.""" + assert _daemon_threading.current_thread is threading.current_thread + + +def test_daemon_threading_thread_override() -> None: + """Ensure that the proxy module's Thread attribute is different from the real module's.""" + assert _daemon_threading.Thread is not threading.Thread diff --git a/test/units/module_utils/_internal/_concurrent/test_futures.py b/test/units/module_utils/_internal/_concurrent/test_futures.py new file mode 100644 index 00000000000..71e032da27c --- /dev/null +++ b/test/units/module_utils/_internal/_concurrent/test_futures.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import concurrent.futures as _cf +import subprocess +import sys +import time + +import pytest + +from ansible.module_utils._internal._concurrent import _futures + + +def test_daemon_thread_pool_nonblocking_cm_exit() -> None: + """Ensure that the ThreadPoolExecutor context manager exit is not blocked by in-flight tasks.""" + with _futures.DaemonThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(time.sleep, 5) + + with pytest.raises(_cf.TimeoutError): # deprecated: description='aliased to stdlib TimeoutError in 3.11' python_version='3.10' + future.result(timeout=1) + + assert future.running() # ensure the future is still going (ie, we didn't have to wait for it to return) + + +_task_success_msg = "work completed" +_process_success_msg = "exit success" +_timeout_sec = 3 +_sleep_time_sec = _timeout_sec * 2 + + +def test_blocking_shutdown() -> None: + """Run with the DaemonThreadPoolExecutor patch disabled to verify that shutdown is blocked by in-flight tasks.""" + with pytest.raises(subprocess.TimeoutExpired): + subprocess.run(args=[sys.executable, __file__, 'block'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True, timeout=_timeout_sec) + + +def test_non_blocking_shutdown() -> None: + """Run with the DaemonThreadPoolExecutor patch enabled to verify that shutdown is not blocked by in-flight tasks.""" + cp = subprocess.run(args=[sys.executable, __file__, ''], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True, timeout=_timeout_sec) + + assert _task_success_msg in cp.stdout + assert _process_success_msg in cp.stdout + + +def _run_blocking_exit_test(use_patched: bool) -> None: # pragma: nocover + """Helper for external process integration test.""" + tpe_type = _futures.DaemonThreadPoolExecutor if use_patched else _cf.ThreadPoolExecutor + + with tpe_type(max_workers=2) as tp: + fs_non_blocking = tp.submit(lambda: print(_task_success_msg)) + assert [tp.submit(time.sleep, _sleep_time_sec) for _idx in range(4)] # not a pointless statement + fs_non_blocking.result(timeout=1) + + print(_process_success_msg) + + +def main() -> None: # pragma: nocover + """Used by test_(non)blocking_shutdown as a script-style run.""" + _run_blocking_exit_test(sys.argv[1] != 'block') + + +if __name__ == '__main__': # pragma: nocover + main()