From 74cf9c3c962961fab97c9ef5a7e21ad1f245ad98 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 4 Oct 2018 19:15:59 +0000 Subject: [PATCH] master: document ThreadWatcher --- mitogen/master.py | 56 ++++++++++++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/mitogen/master.py b/mitogen/master.py index 0a434a94..22ea0fc1 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -179,24 +179,35 @@ def scan_code_imports(co): class ThreadWatcher(object): """ - Manage threads that waits for nother threads to shutdown, before invoking - `on_join()`. In CPython it seems possible to use this method to ensure a - non-main thread is signalled when the main thread has exitted, using yet - another thread as a proxy. + Manage threads that wait for another thread to shut down, before invoking + `on_join()` for each associated ThreadWatcher. + + In CPython it seems possible to use this method to ensure a non-main thread + is signalled when the main thread has exited, using a third thread as a + proxy. """ - _lock = threading.Lock() - _pid = None - _instances_by_target = {} - _thread_by_target = {} + #: Protects remaining _cls_* members. + _cls_lock = threading.Lock() + + #: PID of the process that last modified the class data. If the PID + #: changes, it means the thread watch dict refers to threads that no longer + #: exist in the current process (since it forked), and so must be reset. + _cls_pid = None + + #: Map watched Thread -> list of ThreadWatcher instances. + _cls_instances_by_target = {} + + #: Map watched Thread -> watcher Thread for each watched thread. + _cls_thread_by_target = {} @classmethod def _reset(cls): """If we have forked since the watch dictionaries were initialized, all that has is garbage, so clear it.""" - if os.getpid() != cls._pid: - cls._pid = os.getpid() - cls._instances_by_target.clear() - cls._thread_by_target.clear() + if os.getpid() != cls._cls_pid: + cls._cls_pid = os.getpid() + cls._cls_instances_by_target.clear() + cls._cls_thread_by_target.clear() def __init__(self, target, on_join): self.target = target @@ -205,33 +216,34 @@ class ThreadWatcher(object): @classmethod def _watch(cls, target): target.join() - for watcher in cls._instances_by_target[target]: + for watcher in cls._cls_instances_by_target[target]: watcher.on_join() def install(self): - self._lock.acquire() + self._cls_lock.acquire() try: self._reset() - self._instances_by_target.setdefault(self.target, []).append(self) - if self.target not in self._thread_by_target: - self._thread_by_target[self.target] = threading.Thread( + lst = self._cls_instances_by_target.setdefault(self.target, []) + lst.append(self) + if self.target not in self._cls_thread_by_target: + self._cls_thread_by_target[self.target] = threading.Thread( name='mitogen.master.join_thread_async', target=self._watch, args=(self.target,) ) - self._thread_by_target[self.target].start() + self._cls_thread_by_target[self.target].start() finally: - self._lock.release() + self._cls_lock.release() def remove(self): - self._lock.acquire() + self._cls_lock.acquire() try: self._reset() - lst = self._instances_by_target.get(self.target, []) + lst = self._cls_instances_by_target.get(self.target, []) if self in lst: lst.remove(self) finally: - self._lock.release() + self._cls_lock.release() @classmethod def watch(cls, target, on_join):