Simplify the API, make Broker optional and auto-shutdown on main thread exit.

wip-fakessh-exit-status
David Wilson 7 years ago
parent f077b5b092
commit f1f36cec35

@ -351,7 +351,7 @@ Router Class
.. currentmodule:: mitogen.master
.. class:: Router
.. class:: Router (broker=None)
Extend :py:class:`mitogen.core.Router` with functionality useful to
masters, and child contexts who later become masters. Currently when this
@ -365,6 +365,10 @@ Router Class
domains, for example, manipulating infrastructure belonging to separate
customers or projects.
:param mitogen.master.Broker broker:
:py:class:`Broker` instance to use. If not specified, a private
:py:class:`Broker` is created.
.. data:: profiling
When enabled, causes the broker thread and any subsequent broker and
@ -828,7 +832,7 @@ Broker Class
.. currentmodule:: mitogen.master
.. class:: Broker
.. class:: Broker (install_watcher=True)
.. note::
@ -838,6 +842,16 @@ Broker Class
differing lifetimes. For example, a subscription service where
non-payment results in termination for one customer.
:param bool install_watcher:
If ``True``, an additional thread is started to monitor the lifetime of
the main thread, triggering :py:meth:`shutdown` automatically in case
the user forgets to call it, or their code crashed.
You should not rely on this functionality in your program, it is only
intended as a fail-safe and to simplify the API for new users. In
particular, alternative Python implementations may not be able to
support watching the main thread.
.. attribute:: shutdown_timeout = 5.0
Seconds grace to allow :py:class:`streams <Stream>` to shutdown

@ -246,6 +246,18 @@ def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'),
co.co_consts[arg2] or ())
def join_thread_async(target_thread, on_join):
"""Start a thread that waits for another thread to shutdown, before
invoking `on_join()`. In CPython it seems possible to use this method to
ensure a non-main thread is signalled when the main thread has exitted,
using yet another thread as a proxy."""
def _watch():
target_thread.join()
on_join()
thread = threading.Thread(target=_watch)
thread.start()
class SelectError(mitogen.core.Error):
pass
@ -791,6 +803,11 @@ class Stream(mitogen.core.Stream):
class Broker(mitogen.core.Broker):
shutdown_timeout = 5.0
def __init__(self, install_watcher=True):
if install_watcher:
join_thread_async(threading.currentThread(), self.shutdown)
super(Broker, self).__init__()
class Context(mitogen.core.Context):
via = None
@ -914,12 +931,15 @@ class ChildIdAllocator(object):
class Router(mitogen.core.Router):
broker_class = Broker
debug = False
profiling = False
def __init__(self, *args, **kwargs):
super(Router, self).__init__(*args, **kwargs)
def __init__(self, broker=None):
if broker is None:
broker = self.broker_class()
super(Router, self).__init__(broker)
self.id_allocator = IdAllocator(self)
self.responder = ModuleResponder(self)
self.log_forwarder = LogForwarder(self)

Loading…
Cancel
Save