diff --git a/mitogen/core.py b/mitogen/core.py index 0c738eb1..25816e18 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -592,7 +592,9 @@ class Message(object): def is_dead(self): """ :data:`True` if :attr:`reply_to` is set to the magic value - :data:`IS_DEAD`, indicating the sender considers the channel dead. + :data:`IS_DEAD`, indicating the sender considers the channel dead. Dead + messages can be raised in a variety of circumstances, see + :data:`IS_DEAD` for more information. """ return self.reply_to == IS_DEAD @@ -2010,18 +2012,26 @@ class Waker(BasicStream): if e.args[0] != errno.EBADF: raise - dead_msg = ( + broker_shutdown_msg = ( "An attempt was made to enqueue a message with a Broker that has " - "already begun shutting down. If you are receiving this message, it " - "is likely your program indirectly called Broker.shutdown() too early." + "already begun shutting down. It is likely your program called " + "Broker.shutdown() too early." ) def defer(self, func, *args, **kwargs): + """ + Arrange for `func()` to execute on the broker thread. This function + returns immediately without waiting the result of `func()`. Use + :meth:`defer_sync` to block until a result is available. + + :raises mitogen.core.Error: + :meth:`defer` was called after :class:`Broker` has begun shutdown. + """ if threading.currentThread().ident == self.broker_ident: _vv and IOLOG.debug('%r.defer() [immediate]', self) return func(*args, **kwargs) if not self._broker._alive: - raise Error(self.dead_msg) + raise Error(self.broker_shutdown_msg) _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) self._lock.acquire() @@ -2088,8 +2098,7 @@ class Router(object): """ Route messages between contexts, and invoke local handlers for messages addressed to this context. :meth:`Router.route() ` straddles the - :class:`Broker ` and user threads, it is safe to call - anywhere. + :class:`Broker` thread and user threads, it is safe to call anywhere. **Note:** This is the somewhat limited core version of the Router class used by child contexts. The master subclass is documented below this one. @@ -2225,9 +2234,10 @@ class Router(object): def add_handler(self, fn, handle=None, persist=True, policy=None, respondent=None): """ - Invoke `fn(msg)` for each Message sent to `handle` from this context. - Unregister after one invocation if `persist` is :data:`False`. If - `handle` is :data:`None`, a new handle is allocated and returned. + Invoke `fn(msg)` on the :class:`Broker` thread for each Message sent to + `handle` from this context. Unregister after one invocation if + `persist` is :data:`False`. If `handle` is :data:`None`, a new handle + is allocated and returned. :param int handle: If not :data:`None`, an explicit handle to register, usually one of @@ -2350,7 +2360,7 @@ class Router(object): using the local handlers. This is a lower overhead version of :meth:`route` that may only be - called from the I/O multiplexer thread. + called from the :class:`Broker` thread. :param Stream in_stream: If not :data:`None`, the stream the message arrived on. Used for @@ -2508,7 +2518,7 @@ class Broker(object): def defer_sync(self, func): """ - Arrange for `func()` to execute on the broker thread, blocking the + Arrange for `func()` to execute on :class:`Broker` thread, blocking the current thread until a result or exception is available. :returns: @@ -2527,6 +2537,10 @@ class Broker(object): return res def _call(self, stream, func): + """ + Call `func(self)`, catching any exception that might occur, logging it, + and force-disconnecting the related `stream`. + """ try: func(self) except Exception: @@ -2534,14 +2548,25 @@ class Broker(object): stream.on_disconnect(self) def _loop_once(self, timeout=None): + """ + Execute a single :class:`Poller` wait, dispatching any IO events that + caused the wait to complete. + + :param float timeout: + If not :data:`None`, maximum time in seconds to wait for events. + """ _vv and IOLOG.debug('%r._loop_once(%r, %r)', self, timeout, self.poller) #IOLOG.debug('readers =\n%s', pformat(self.poller.readers)) #IOLOG.debug('writers =\n%s', pformat(self.poller.writers)) - for (side, func) in self.poller.poll(timeout): + for side, func in self.poller.poll(timeout): self._call(side.stream, func) def _broker_exit(self): + """ + Forcefully call :meth:`Stream.on_disconnect` on any streams that failed + to shut down gracefully, then discard the :class:`Poller`. + """ for _, (side, _) in self.poller.readers + self.poller.writers: LOG.error('_broker_main() force disconnecting %r', side) side.stream.on_disconnect(self) @@ -2549,6 +2574,12 @@ class Broker(object): self.poller.close() def _broker_shutdown(self): + """ + Invoke :meth:`Stream.on_shutdown` for every active stream, then allow + up to :attr:`shutdown_timeout` seconds for the streams to unregister + themselves, logging an error if any did not unregister during the grace + period. + """ for _, (side, _) in self.poller.readers + self.poller.writers: self._call(side.stream, side.stream.on_shutdown) @@ -2564,10 +2595,8 @@ class Broker(object): def _broker_main(self): """ - Handle events until :meth:`shutdown`. On shutdown, invoke - :meth:`Stream.on_shutdown` for every active stream, then allow up to - :attr:`shutdown_timeout` seconds for the streams to unregister - themselves before forcefully calling :meth:`Stream.on_disconnect`. + Broker thread main function. Dispatches IO events until + :meth:`shutdown` is called. """ try: while self._alive: @@ -2603,6 +2632,16 @@ class Broker(object): class Dispatcher(object): + """ + Implementation of the :data:`CALL_FUNCTION` handle for a child context. + Listens on the child's main thread for messages sent by + :class:`mitogen.parent.CallChain` and dispatches the function calls they + describe. + + If a :class:`mitogen.parent.CallChain` sending a message is in pipelined + mode, any exception that occurs is recorded, and causes all subsequent + calls with the same `chain_id` to fail with the same exception. + """ def __init__(self, econtext): self.econtext = econtext #: Chain ID -> CallError if prior call failed. diff --git a/tests/broker_test.py b/tests/broker_test.py index 0a9e51a2..23839a54 100644 --- a/tests/broker_test.py +++ b/tests/broker_test.py @@ -43,7 +43,7 @@ class DeferTest(testlib.TestCase): e = self.assertRaises(mitogen.core.Error, lambda: broker.defer(lambda: latch.put(123))) - self.assertEquals(e.args[0], mitogen.core.Waker.dead_msg) + self.assertEquals(e.args[0], mitogen.core.Waker.broker_shutdown_msg) class DeferSyncTest(testlib.TestCase):