core: docstring tidyups

issue510
David Wilson 6 years ago
parent 81a68223d4
commit 1eb08fb5c5

@ -592,7 +592,9 @@ class Message(object):
def is_dead(self): def is_dead(self):
""" """
:data:`True` if :attr:`reply_to` is set to the magic value :data:`True` if :attr:`reply_to` is set to the magic value
:data:`IS_DEAD`, indicating the sender considers the channel dead. :data:`IS_DEAD`, indicating the sender considers the channel dead. Dead
messages can be raised in a variety of circumstances, see
:data:`IS_DEAD` for more information.
""" """
return self.reply_to == IS_DEAD return self.reply_to == IS_DEAD
@ -2010,18 +2012,26 @@ class Waker(BasicStream):
if e.args[0] != errno.EBADF: if e.args[0] != errno.EBADF:
raise raise
dead_msg = ( broker_shutdown_msg = (
"An attempt was made to enqueue a message with a Broker that has " "An attempt was made to enqueue a message with a Broker that has "
"already begun shutting down. If you are receiving this message, it " "already begun shutting down. It is likely your program called "
"is likely your program indirectly called Broker.shutdown() too early." "Broker.shutdown() too early."
) )
def defer(self, func, *args, **kwargs): def defer(self, func, *args, **kwargs):
"""
Arrange for `func()` to execute on the broker thread. This function
returns immediately without waiting the result of `func()`. Use
:meth:`defer_sync` to block until a result is available.
:raises mitogen.core.Error:
:meth:`defer` was called after :class:`Broker` has begun shutdown.
"""
if threading.currentThread().ident == self.broker_ident: if threading.currentThread().ident == self.broker_ident:
_vv and IOLOG.debug('%r.defer() [immediate]', self) _vv and IOLOG.debug('%r.defer() [immediate]', self)
return func(*args, **kwargs) return func(*args, **kwargs)
if not self._broker._alive: if not self._broker._alive:
raise Error(self.dead_msg) raise Error(self.broker_shutdown_msg)
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd)
self._lock.acquire() self._lock.acquire()
@ -2088,8 +2098,7 @@ class Router(object):
""" """
Route messages between contexts, and invoke local handlers for messages Route messages between contexts, and invoke local handlers for messages
addressed to this context. :meth:`Router.route() <route>` straddles the addressed to this context. :meth:`Router.route() <route>` straddles the
:class:`Broker <mitogen.core.Broker>` and user threads, it is safe to call :class:`Broker` thread and user threads, it is safe to call anywhere.
anywhere.
**Note:** This is the somewhat limited core version of the Router class **Note:** This is the somewhat limited core version of the Router class
used by child contexts. The master subclass is documented below this one. used by child contexts. The master subclass is documented below this one.
@ -2225,9 +2234,10 @@ class Router(object):
def add_handler(self, fn, handle=None, persist=True, def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None): policy=None, respondent=None):
""" """
Invoke `fn(msg)` for each Message sent to `handle` from this context. Invoke `fn(msg)` on the :class:`Broker` thread for each Message sent to
Unregister after one invocation if `persist` is :data:`False`. If `handle` from this context. Unregister after one invocation if
`handle` is :data:`None`, a new handle is allocated and returned. `persist` is :data:`False`. If `handle` is :data:`None`, a new handle
is allocated and returned.
:param int handle: :param int handle:
If not :data:`None`, an explicit handle to register, usually one of If not :data:`None`, an explicit handle to register, usually one of
@ -2350,7 +2360,7 @@ class Router(object):
using the local handlers. using the local handlers.
This is a lower overhead version of :meth:`route` that may only be This is a lower overhead version of :meth:`route` that may only be
called from the I/O multiplexer thread. called from the :class:`Broker` thread.
:param Stream in_stream: :param Stream in_stream:
If not :data:`None`, the stream the message arrived on. Used for If not :data:`None`, the stream the message arrived on. Used for
@ -2508,7 +2518,7 @@ class Broker(object):
def defer_sync(self, func): def defer_sync(self, func):
""" """
Arrange for `func()` to execute on the broker thread, blocking the Arrange for `func()` to execute on :class:`Broker` thread, blocking the
current thread until a result or exception is available. current thread until a result or exception is available.
:returns: :returns:
@ -2527,6 +2537,10 @@ class Broker(object):
return res return res
def _call(self, stream, func): def _call(self, stream, func):
"""
Call `func(self)`, catching any exception that might occur, logging it,
and force-disconnecting the related `stream`.
"""
try: try:
func(self) func(self)
except Exception: except Exception:
@ -2534,14 +2548,25 @@ class Broker(object):
stream.on_disconnect(self) stream.on_disconnect(self)
def _loop_once(self, timeout=None): def _loop_once(self, timeout=None):
"""
Execute a single :class:`Poller` wait, dispatching any IO events that
caused the wait to complete.
:param float timeout:
If not :data:`None`, maximum time in seconds to wait for events.
"""
_vv and IOLOG.debug('%r._loop_once(%r, %r)', _vv and IOLOG.debug('%r._loop_once(%r, %r)',
self, timeout, self.poller) self, timeout, self.poller)
#IOLOG.debug('readers =\n%s', pformat(self.poller.readers)) #IOLOG.debug('readers =\n%s', pformat(self.poller.readers))
#IOLOG.debug('writers =\n%s', pformat(self.poller.writers)) #IOLOG.debug('writers =\n%s', pformat(self.poller.writers))
for (side, func) in self.poller.poll(timeout): for side, func in self.poller.poll(timeout):
self._call(side.stream, func) self._call(side.stream, func)
def _broker_exit(self): def _broker_exit(self):
"""
Forcefully call :meth:`Stream.on_disconnect` on any streams that failed
to shut down gracefully, then discard the :class:`Poller`.
"""
for _, (side, _) in self.poller.readers + self.poller.writers: for _, (side, _) in self.poller.readers + self.poller.writers:
LOG.error('_broker_main() force disconnecting %r', side) LOG.error('_broker_main() force disconnecting %r', side)
side.stream.on_disconnect(self) side.stream.on_disconnect(self)
@ -2549,6 +2574,12 @@ class Broker(object):
self.poller.close() self.poller.close()
def _broker_shutdown(self): def _broker_shutdown(self):
"""
Invoke :meth:`Stream.on_shutdown` for every active stream, then allow
up to :attr:`shutdown_timeout` seconds for the streams to unregister
themselves, logging an error if any did not unregister during the grace
period.
"""
for _, (side, _) in self.poller.readers + self.poller.writers: for _, (side, _) in self.poller.readers + self.poller.writers:
self._call(side.stream, side.stream.on_shutdown) self._call(side.stream, side.stream.on_shutdown)
@ -2564,10 +2595,8 @@ class Broker(object):
def _broker_main(self): def _broker_main(self):
""" """
Handle events until :meth:`shutdown`. On shutdown, invoke Broker thread main function. Dispatches IO events until
:meth:`Stream.on_shutdown` for every active stream, then allow up to :meth:`shutdown` is called.
:attr:`shutdown_timeout` seconds for the streams to unregister
themselves before forcefully calling :meth:`Stream.on_disconnect`.
""" """
try: try:
while self._alive: while self._alive:
@ -2603,6 +2632,16 @@ class Broker(object):
class Dispatcher(object): class Dispatcher(object):
"""
Implementation of the :data:`CALL_FUNCTION` handle for a child context.
Listens on the child's main thread for messages sent by
:class:`mitogen.parent.CallChain` and dispatches the function calls they
describe.
If a :class:`mitogen.parent.CallChain` sending a message is in pipelined
mode, any exception that occurs is recorded, and causes all subsequent
calls with the same `chain_id` to fail with the same exception.
"""
def __init__(self, econtext): def __init__(self, econtext):
self.econtext = econtext self.econtext = econtext
#: Chain ID -> CallError if prior call failed. #: Chain ID -> CallError if prior call failed.

@ -43,7 +43,7 @@ class DeferTest(testlib.TestCase):
e = self.assertRaises(mitogen.core.Error, e = self.assertRaises(mitogen.core.Error,
lambda: broker.defer(lambda: latch.put(123))) lambda: broker.defer(lambda: latch.put(123)))
self.assertEquals(e.args[0], mitogen.core.Waker.dead_msg) self.assertEquals(e.args[0], mitogen.core.Waker.broker_shutdown_msg)
class DeferSyncTest(testlib.TestCase): class DeferSyncTest(testlib.TestCase):

Loading…
Cancel
Save