Add DaemonThreadPoolExecutor impl (#83880)

* Add DaemonThreadPoolExecutor impl
* Provide a simple parallel execution method with the ability to abandon timed-out operations that won't block threadpool/process shutdown, and without a dependency on /dev/shm (as multiprocessing Thread/Process pools have).
* Create module_utils/_internal to ensure that this is clearly not supported for public consumption.
pull/83887/head
Matt Davis 3 months ago committed by GitHub
parent 1a4644ff15
commit 24e5b0d4fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,28 @@
"""Proxy stdlib threading module that only supports non-joinable daemon threads."""
# NB: all new local module attrs are _ prefixed to ensure an identical public attribute surface area to the module we're proxying
from __future__ import annotations as _annotations
import threading as _threading
import typing as _t
class _DaemonThread(_threading.Thread):
"""
Daemon-only Thread subclass; prevents running threads of this type from blocking interpreter shutdown and process exit.
The join() method is a no-op.
"""
def __init__(self, *args, daemon: bool | None = None, **kwargs) -> None:
super().__init__(*args, daemon=daemon or True, **kwargs)
def join(self, timeout=None) -> None:
"""ThreadPoolExecutor's atexit handler joins all queue threads before allowing shutdown; prevent them from blocking."""
Thread = _DaemonThread # shadow the real Thread attr with our _DaemonThread
def __getattr__(name: str) -> _t.Any:
"""Delegate anything not defined locally to the real `threading` module."""
return getattr(_threading, name)

@ -0,0 +1,21 @@
"""Utilities for concurrent code execution using futures."""
from __future__ import annotations
import concurrent.futures
import types
from . import _daemon_threading
class DaemonThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"""ThreadPoolExecutor subclass that creates non-joinable daemon threads for non-blocking pool and process shutdown with abandoned threads."""
atc = concurrent.futures.ThreadPoolExecutor._adjust_thread_count
# clone the base class `_adjust_thread_count` method with a copy of its globals dict
_adjust_thread_count = types.FunctionType(atc.__code__, atc.__globals__.copy(), name=atc.__name__, argdefs=atc.__defaults__, closure=atc.__closure__)
# patch the method closure's `threading` module import to use our daemon-only thread factory instead
_adjust_thread_count.__globals__.update(threading=_daemon_threading)
del atc # don't expose this as a class attribute

@ -0,0 +1,15 @@
from __future__ import annotations
import threading
from ansible.module_utils._internal._concurrent import _daemon_threading
def test_daemon_thread_getattr() -> None:
"""Ensure that the threading module proxy delegates properly to the real module."""
assert _daemon_threading.current_thread is threading.current_thread
def test_daemon_threading_thread_override() -> None:
"""Ensure that the proxy module's Thread attribute is different from the real module's."""
assert _daemon_threading.Thread is not threading.Thread

@ -0,0 +1,62 @@
from __future__ import annotations
import concurrent.futures as _cf
import subprocess
import sys
import time
import pytest
from ansible.module_utils._internal._concurrent import _futures
def test_daemon_thread_pool_nonblocking_cm_exit() -> None:
"""Ensure that the ThreadPoolExecutor context manager exit is not blocked by in-flight tasks."""
with _futures.DaemonThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 5)
with pytest.raises(_cf.TimeoutError): # deprecated: description='aliased to stdlib TimeoutError in 3.11' python_version='3.10'
future.result(timeout=1)
assert future.running() # ensure the future is still going (ie, we didn't have to wait for it to return)
_task_success_msg = "work completed"
_process_success_msg = "exit success"
_timeout_sec = 3
_sleep_time_sec = _timeout_sec * 2
def test_blocking_shutdown() -> None:
"""Run with the DaemonThreadPoolExecutor patch disabled to verify that shutdown is blocked by in-flight tasks."""
with pytest.raises(subprocess.TimeoutExpired):
subprocess.run(args=[sys.executable, __file__, 'block'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True, timeout=_timeout_sec)
def test_non_blocking_shutdown() -> None:
"""Run with the DaemonThreadPoolExecutor patch enabled to verify that shutdown is not blocked by in-flight tasks."""
cp = subprocess.run(args=[sys.executable, __file__, ''], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True, timeout=_timeout_sec)
assert _task_success_msg in cp.stdout
assert _process_success_msg in cp.stdout
def _run_blocking_exit_test(use_patched: bool) -> None: # pragma: nocover
"""Helper for external process integration test."""
tpe_type = _futures.DaemonThreadPoolExecutor if use_patched else _cf.ThreadPoolExecutor
with tpe_type(max_workers=2) as tp:
fs_non_blocking = tp.submit(lambda: print(_task_success_msg))
assert [tp.submit(time.sleep, _sleep_time_sec) for _idx in range(4)] # not a pointless statement
fs_non_blocking.result(timeout=1)
print(_process_success_msg)
def main() -> None: # pragma: nocover
"""Used by test_(non)blocking_shutdown as a script-style run."""
_run_blocking_exit_test(sys.argv[1] != 'block')
if __name__ == '__main__': # pragma: nocover
main()
Loading…
Cancel
Save