diff --git a/mitogen/master.py b/mitogen/master.py index 5201d16d..ea7fd4ff 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -117,16 +117,46 @@ def scan_code_imports(co): co.co_consts[arg2] or ()) +_join_lock = threading.Lock() +_join_process_id = None +_join_callbacks_by_target = {} +_join_thread_by_target = {} + + +def _join_thread_reset(): + """If we have forked since the watch dictionaries were initialized, all + that has is garbage, so clear it.""" + global _join_process_id + + if os.getpid() != _join_process_id: + _join_process_id = os.getpid() + _join_callbacks_by_target.clear() + _join_thread_by_target.clear() + + def join_thread_async(target_thread, on_join): """Start a thread that waits for another thread to shutdown, before invoking `on_join()`. In CPython it seems possible to use this method to ensure a non-main thread is signalled when the main thread has exitted, using yet another thread as a proxy.""" + def _watch(): target_thread.join() - on_join() - thread = threading.Thread(target=_watch) - thread.start() + for on_join in _join_callbacks_by_target[target_thread]: + on_join() + + _join_lock.acquire() + try: + _join_thread_reset() + _join_callbacks_by_target.setdefault(target_thread, []).append(on_join) + if target_thread not in _join_thread_by_target: + _join_thread_by_target[target_thread] = threading.Thread( + name='mitogen.master.join_thread_async', + target=_watch, + ) + _join_thread_by_target[target_thread].start() + finally: + _join_lock.release() class SelectError(mitogen.core.Error):