You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ansible/test/lib/ansible_test/_internal/thread.py

83 lines
2.5 KiB
Python

"""Python threading tools."""
from __future__ import annotations
import collections.abc as c
import contextlib
import functools
import sys
import threading
import queue
import typing as t
# Type variable bound to any callable, so a decorator annotated with it reports the same callable type it was given.
TCallable = t.TypeVar('TCallable', bound=t.Callable[..., t.Any])
class WrappedThread(threading.Thread):
    """
    Wrapper around Thread which captures results and exceptions.

    The action is executed by run() once start() has been called. Its return value, or the exception it raised,
    is handed to the thread(s) calling wait_for_result().
    """

    def __init__(self, action: c.Callable[[], t.Any]) -> None:
        """Prepare a thread which will call `action` with no arguments when started."""
        super().__init__()
        # Holds a single `(result, exc_info)` tuple once the action has finished; `exc_info` is None on success.
        self._result: queue.Queue[t.Any] = queue.Queue()
        self.action = action
        # Updated with the action's return value by a successful wait_for_result() call.
        self.result = None

    def run(self) -> None:
        """
        Run action and capture results or exception.
        Do not override. Do not call directly. Executed by the start() method.
        """
        # We truly want to catch anything that the worker thread might do including call sys.exit.
        # Therefore, we catch *everything* (a bare except also covers SystemExit and KeyboardInterrupt).
        # noinspection PyBroadException
        try:
            self._result.put((self.action(), None))
        # pylint: disable=locally-disabled, bare-except
        except:  # noqa
            self._result.put((None, sys.exc_info()))

    def wait_for_result(self) -> t.Any:
        """
        Wait for the action to finish and return its result or raise its exception.

        Blocks until run() has recorded an outcome. This may be called more than once, and from more than
        one thread; every call reports the same outcome.
        """
        outcome = self._result.get()

        # Put the outcome straight back so later (or concurrent) callers see it too,
        # instead of blocking forever on an empty queue.
        self._result.put(outcome)

        result, exception = outcome

        if exception:
            # `exception` is the sys.exc_info() triple captured in run(); re-raise with the worker's traceback.
            raise exception[1].with_traceback(exception[2])

        self.result = result

        return result
def mutex(func: TCallable) -> TCallable:
    """
    Decorator which serializes calls to the decorated function.

    Each decorated function gets its own lock, created once at decoration time.
    The lock is held for the duration of every call and released when the call returns or raises.
    """
    guard = threading.Lock()

    @functools.wraps(func)
    def exclusive(*args, **kwargs):
        """Invoke `func` while holding its dedicated lock."""
        with guard:
            outcome = func(*args, **kwargs)

        return outcome

    return exclusive  # type: ignore[return-value]  # requires https://www.python.org/dev/peps/pep-0612/ support
# Guards every lookup and insertion performed on the registry below.
__named_lock = threading.Lock()
# Registry of locks keyed by name; entries are added on first use and never removed.
__named_locks: dict[str, threading.Lock] = {}


@contextlib.contextmanager
def named_lock(name: str) -> c.Iterator[bool]:
    """
    Context manager which holds the threading.Lock registered under `name` for the duration of the block.
    A lock is created for a name the first time that name is requested and is kept from then on.
    Yields True when this call created the lock for `name`, otherwise False.
    """
    with __named_lock:
        lock_instance = __named_locks.get(name)
        first = not lock_instance

        if first:
            lock_instance = threading.Lock()
            __named_locks[name] = lock_instance

    # The registry lock is released before acquiring the named lock, so holders of different names do not block each other.
    with lock_instance:
        yield first