diff --git a/docs/api.rst b/docs/api.rst index 5f91cd83..9c09b0ec 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -351,7 +351,7 @@ Router Class .. currentmodule:: mitogen.master -.. class:: Router +.. class:: Router (broker=None) Extend :py:class:`mitogen.core.Router` with functionality useful to masters, and child contexts who later become masters. Currently when this @@ -365,6 +365,10 @@ Router Class domains, for example, manipulating infrastructure belonging to separate customers or projects. + :param mitogen.master.Broker broker: + :py:class:`Broker` instance to use. If not specified, a private + :py:class:`Broker` is created. + .. data:: profiling When enabled, causes the broker thread and any subsequent broker and @@ -828,7 +832,7 @@ Broker Class .. currentmodule:: mitogen.master -.. class:: Broker +.. class:: Broker (install_watcher=True) .. note:: @@ -838,6 +842,16 @@ Broker Class differing lifetimes. For example, a subscription service where non-payment results in termination for one customer. + :param bool install_watcher: + If ``True``, an additional thread is started to monitor the lifetime of + the main thread, triggering :py:meth:`shutdown` automatically in case + the user forgets to call it, or their code crashed. + + You should not rely on this functionality in your program, it is only + intended as a fail-safe and to simplify the API for new users. In + particular, alternative Python implementations may not be able to + support watching the main thread. + .. attribute:: shutdown_timeout = 5.0 Seconds grace to allow :py:class:`streams ` to shutdown diff --git a/mitogen/master.py b/mitogen/master.py index ca676299..d74b9179 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -246,6 +246,18 @@ def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'), co.co_consts[arg2] or ()) +def join_thread_async(target_thread, on_join): + """Start a thread that waits for another thread to shutdown, before + invoking `on_join()`. In CPython it seems possible to use this method to + ensure a non-main thread is signalled when the main thread has exitted, + using yet another thread as a proxy.""" + def _watch(): + target_thread.join() + on_join() + thread = threading.Thread(target=_watch) + thread.start() + + class SelectError(mitogen.core.Error): pass @@ -791,6 +803,11 @@ class Stream(mitogen.core.Stream): class Broker(mitogen.core.Broker): shutdown_timeout = 5.0 + def __init__(self, install_watcher=True): + if install_watcher: + join_thread_async(threading.currentThread(), self.shutdown) + super(Broker, self).__init__() + class Context(mitogen.core.Context): via = None @@ -914,12 +931,15 @@ class ChildIdAllocator(object): class Router(mitogen.core.Router): + broker_class = Broker debug = False profiling = False - def __init__(self, *args, **kwargs): - super(Router, self).__init__(*args, **kwargs) + def __init__(self, broker=None): + if broker is None: + broker = self.broker_class() + super(Router, self).__init__(broker) self.id_allocator = IdAllocator(self) self.responder = ModuleResponder(self) self.log_forwarder = LogForwarder(self)