docs: move most remaining docstrings back into *.py; closes #388

The remaining ones are decorators which don't seem to have an autodoc
equivlent.
issue260
David Wilson 6 years ago
parent 711aed7a4c
commit 804bacdadb

File diff suppressed because it is too large Load Diff

@ -126,7 +126,7 @@ Core Library
v0.2.4 (2018-??-??) v0.2.4 (2018-??-??)
------------------ -------------------
Mitogen for Ansible Mitogen for Ansible
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~

@ -35,162 +35,33 @@ Side Class
========== ==========
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Side
.. class:: Side (stream, fd, keep_alive=True) :members:
Represent a single side of a :py:class:`BasicStream`. This exists to allow
streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional
(e.g. UNIX socket) file descriptors to operate identically.
:param mitogen.core.Stream stream:
The stream this side is associated with.
:param int fd:
Underlying file descriptor.
:param bool keep_alive:
Value for :py:attr:`keep_alive`
During construction, the file descriptor has its :py:data:`os.O_NONBLOCK`
flag enabled using :py:func:`fcntl.fcntl`.
.. attribute:: stream
The :py:class:`Stream` for which this is a read or write side.
.. attribute:: fd
Integer file descriptor to perform IO on, or :data:`None` if
:py:meth:`close` has been called.
.. attribute:: keep_alive
If :data:`True`, causes presence of this side in :py:class:`Broker`'s
active reader set to defer shutdown until the side is disconnected.
.. method:: fileno
Return :py:attr:`fd` if it is not :data:`None`, otherwise raise
:py:class:`StreamError`. This method is implemented so that
:py:class:`Side` can be used directly by :py:func:`select.select`.
.. method:: close
Call :py:func:`os.close` on :py:attr:`fd` if it is not :data:`None`,
then set it to :data:`None`.
.. method:: read (n=CHUNK_SIZE)
Read up to `n` bytes from the file descriptor, wrapping the underlying
:py:func:`os.read` call with :py:func:`io_op` to trap common
disconnection conditions.
:py:meth:`read` always behaves as if it is reading from a regular UNIX
file; socket, pipe, and TTY disconnection errors are masked and result
in a 0-sized read just like a regular file.
:returns:
Bytes read, or the empty to string to indicate disconnection was
detected.
.. method:: write (s)
Write as much of the bytes from `s` as possible to the file descriptor,
wrapping the underlying :py:func:`os.write` call with :py:func:`io_op`
to trap common disconnection connditions.
:returns:
Number of bytes written, or :data:`None` if disconnection was
detected.
Stream Classes Stream Classes
============== ==============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: BasicStream
.. class:: BasicStream :members:
.. attribute:: receive_side
A :py:class:`Side` representing the stream's receive file descriptor.
.. attribute:: transmit_side
A :py:class:`Side` representing the stream's transmit file descriptor.
.. method:: on_disconnect (broker)
Called by :py:class:`Broker` to force disconnect the stream. The base
implementation simply closes :py:attr:`receive_side` and
:py:attr:`transmit_side` and unregisters the stream from the broker.
.. method:: on_receive (broker)
Called by :py:class:`Broker` when the stream's :py:attr:`receive_side` has
been marked readable using :py:meth:`Broker.start_receive` and the
broker has detected the associated file descriptor is ready for
reading.
Subclasses must implement this method if
:py:meth:`Broker.start_receive` is ever called on them, and the method
must call :py:meth:`on_disconect` if reading produces an empty string.
.. method:: on_transmit (broker)
Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side`
has been marked writeable using :py:meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this method if
:py:meth:`Broker._start_transmit` is ever called on them.
.. method:: on_shutdown (broker)
Called by :py:meth:`Broker.shutdown` to allow the stream time to
gracefully shutdown. The base implementation simply called
:py:meth:`on_disconnect`.
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
.. method:: pending_bytes ()
Returns the number of bytes queued for transmission on this stream.
This can be used to limit the amount of data buffered in RAM by an
otherwise unlimited consumer.
For an accurate result, this method should be called from the Broker
thread, using a wrapper like:
::
def get_pending_bytes(self, stream):
latch = mitogen.core.Latch()
self.broker.defer(
lambda: latch.put(stream.pending_bytes())
)
return latch.get()
.. currentmodule:: mitogen.fork .. currentmodule:: mitogen.fork
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
.. currentmodule:: mitogen.ssh .. currentmodule:: mitogen.ssh
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
.. currentmodule:: mitogen.sudo .. currentmodule:: mitogen.sudo
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
@ -212,6 +83,7 @@ Poller Class
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Poller .. autoclass:: Poller
:members:
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
.. autoclass:: KqueuePoller .. autoclass:: KqueuePoller
@ -256,64 +128,16 @@ ExternalContext Class
===================== =====================
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: ExternalContext
:members:
.. class:: ExternalContext
External context implementation.
.. attribute:: broker
The :py:class:`mitogen.core.Broker` instance.
.. attribute:: context
The :py:class:`mitogen.core.Context` instance.
.. attribute:: channel
The :py:class:`mitogen.core.Channel` over which
:py:data:`CALL_FUNCTION` requests are received.
.. attribute:: stdout_log
The :py:class:`mitogen.core.IoLogger` connected to ``stdout``.
.. attribute:: importer
The :py:class:`mitogen.core.Importer` instance.
.. attribute:: stdout_log
The :py:class:`IoLogger` connected to ``stdout``.
.. attribute:: stderr_log
The :py:class:`IoLogger` connected to ``stderr``.
.. method:: _dispatch_calls
Implementation for the main thread in every child context.
mitogen.master mitogen.master
============== ==============
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.parent
.. autoclass:: ProcessMonitor
.. class:: ProcessMonitor :members:
Install a :py:data:`signal.SIGCHLD` handler that generates callbacks when a
specific child process has exitted.
.. method:: add (pid, callback)
Add a callback function to be notified of the exit status of a process.
:param int pid:
Process ID to be notified of.
:param callback:
Function invoked as `callback(status)`, where `status` is the raw
exit status of the child process.
Blocking I/O Functions Blocking I/O Functions

@ -100,7 +100,7 @@ CALL_SERVICE = 110
#: * a remote receiver is disconnected or explicitly closed. #: * a remote receiver is disconnected or explicitly closed.
#: * a related message could not be delivered due to no route existing for it. #: * a related message could not be delivered due to no route existing for it.
#: * a router is being torn down, as a sentinel value to notify #: * a router is being torn down, as a sentinel value to notify
#: :py:meth:`mitogen.core.Router.add_handler` callbacks to clean up. #: :meth:`mitogen.core.Router.add_handler` callbacks to clean up.
IS_DEAD = 999 IS_DEAD = 999
try: try:
@ -187,7 +187,7 @@ class Error(Exception):
class LatchError(Error): class LatchError(Error):
"""Raised when an attempt is made to use a :py:class:`mitogen.core.Latch` """Raised when an attempt is made to use a :class:`mitogen.core.Latch`
that has been marked closed.""" that has been marked closed."""
pass pass
@ -239,7 +239,7 @@ class Kwargs(dict):
class CallError(Error): class CallError(Error):
"""Serializable :class:`Error` subclass raised when """Serializable :class:`Error` subclass raised when
:py:meth:`Context.call() <mitogen.parent.Context.call>` fails. A copy of :meth:`Context.call() <mitogen.parent.Context.call>` fails. A copy of
the traceback from the external context is appended to the exception the traceback from the external context is appended to the exception
message.""" message."""
def __init__(self, fmt=None, *args): def __init__(self, fmt=None, *args):
@ -872,6 +872,15 @@ class Receiver(object):
class Channel(Sender, Receiver): class Channel(Sender, Receiver):
"""
A channel inherits from :class:`mitogen.core.Sender` and
`mitogen.core.Receiver` to provide bidirectional functionality.
Since all handles aren't known until after both ends are constructed, for
both ends to communicate through a channel, it is necessary for one end to
retrieve the handle allocated to the other and reconfigure its own channel
to match. Currently this is a manual task.
"""
def __init__(self, router, context, dst_handle, handle=None): def __init__(self, router, context, dst_handle, handle=None):
Sender.__init__(self, context, dst_handle) Sender.__init__(self, context, dst_handle)
Receiver.__init__(self, router, handle) Receiver.__init__(self, router, handle)
@ -1160,12 +1169,35 @@ class LogHandler(logging.Handler):
class Side(object): class Side(object):
"""
Represent a single side of a :class:`BasicStream`. This exists to allow
streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional
(e.g. UNIX socket) file descriptors to operate identically.
:param mitogen.core.Stream stream:
The stream this side is associated with.
:param int fd:
Underlying file descriptor.
:param bool keep_alive:
Value for :attr:`keep_alive`
During construction, the file descriptor has its :data:`os.O_NONBLOCK` flag
enabled using :func:`fcntl.fcntl`.
"""
_fork_refs = weakref.WeakValueDictionary() _fork_refs = weakref.WeakValueDictionary()
def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False): def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False):
#: The :class:`Stream` for which this is a read or write side.
self.stream = stream self.stream = stream
#: Integer file descriptor to perform IO on, or :data:`None` if
#: :meth:`close` has been called.
self.fd = fd self.fd = fd
self.closed = False self.closed = False
#: If :data:`True`, causes presence of this side in
#: :class:`Broker`'s active reader set to defer shutdown until the
#: side is disconnected.
self.keep_alive = keep_alive self.keep_alive = keep_alive
self._fork_refs[id(self)] = self self._fork_refs[id(self)] = self
if cloexec: if cloexec:
@ -1182,12 +1214,29 @@ class Side(object):
side.close() side.close()
def close(self): def close(self):
"""
Call :func:`os.close` on :attr:`fd` if it is not :data:`None`,
then set it to :data:`None`.
"""
if not self.closed: if not self.closed:
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
self.closed = True self.closed = True
os.close(self.fd) os.close(self.fd)
def read(self, n=CHUNK_SIZE): def read(self, n=CHUNK_SIZE):
"""
Read up to `n` bytes from the file descriptor, wrapping the underlying
:func:`os.read` call with :func:`io_op` to trap common disconnection
conditions.
:meth:`read` always behaves as if it is reading from a regular UNIX
file; socket, pipe, and TTY disconnection errors are masked and result
in a 0-sized read like a regular file.
:returns:
Bytes read, or the empty to string to indicate disconnection was
detected.
"""
if self.closed: if self.closed:
# Refuse to touch the handle after closed, it may have been reused # Refuse to touch the handle after closed, it may have been reused
# by another thread. TODO: synchronize read()/write()/close(). # by another thread. TODO: synchronize read()/write()/close().
@ -1198,6 +1247,15 @@ class Side(object):
return s return s
def write(self, s): def write(self, s):
"""
Write as much of the bytes from `s` as possible to the file descriptor,
wrapping the underlying :func:`os.write` call with :func:`io_op` to
trap common disconnection connditions.
:returns:
Number of bytes written, or :data:`None` if disconnection was
detected.
"""
if self.closed or self.fd is None: if self.closed or self.fd is None:
# Refuse to touch the handle after closed, it may have been reused # Refuse to touch the handle after closed, it may have been reused
# by another thread. # by another thread.
@ -1210,10 +1268,50 @@ class Side(object):
class BasicStream(object): class BasicStream(object):
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None receive_side = None
#: A :class:`Side` representing the stream's transmit file descriptor.
transmit_side = None transmit_side = None
def on_receive(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`receive_side` has
been marked readable using :meth:`Broker.start_receive` and the broker
has detected the associated file descriptor is ready for reading.
Subclasses must implement this if :meth:`Broker.start_receive` is ever
called on them, and the method must call :meth:`on_disconect` if
reading produces an empty string.
"""
def on_transmit(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`transmit_side`
has been marked writeable using :meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this if :meth:`Broker._start_transmit` is
ever called on them.
"""
def on_shutdown(self, broker):
"""
Called by :meth:`Broker.shutdown` to allow the stream time to
gracefully shutdown. The base implementation simply called
:meth:`on_disconnect`.
"""
_v and LOG.debug('%r.on_shutdown()', self)
fire(self, 'shutdown')
self.on_disconnect(broker)
def on_disconnect(self, broker): def on_disconnect(self, broker):
"""
Called by :class:`Broker` to force disconnect the stream. The base
implementation simply closes :attr:`receive_side` and
:attr:`transmit_side` and unregisters the stream from the broker.
"""
LOG.debug('%r.on_disconnect()', self) LOG.debug('%r.on_disconnect()', self)
if self.receive_side: if self.receive_side:
broker.stop_receive(self) broker.stop_receive(self)
@ -1223,19 +1321,14 @@ class BasicStream(object):
self.transmit_side.close() self.transmit_side.close()
fire(self, 'disconnect') fire(self, 'disconnect')
def on_shutdown(self, broker):
_v and LOG.debug('%r.on_shutdown()', self)
fire(self, 'shutdown')
self.on_disconnect(broker)
class Stream(BasicStream): class Stream(BasicStream):
""" """
:py:class:`BasicStream` subclass implementing mitogen's :ref:`stream :class:`BasicStream` subclass implementing mitogen's :ref:`stream
protocol <stream-protocol>`. protocol <stream-protocol>`.
""" """
#: If not :data:`None`, :py:class:`Router` stamps this into #: If not :data:`None`, :class:`Router` stamps this into
#: :py:attr:`Message.auth_id` of every message received on this stream. #: :attr:`Message.auth_id` of every message received on this stream.
auth_id = None auth_id = None
#: If not :data:`False`, indicates the stream has :attr:`auth_id` set and #: If not :data:`False`, indicates the stream has :attr:`auth_id` set and
@ -1272,7 +1365,7 @@ class Stream(BasicStream):
def on_receive(self, broker): def on_receive(self, broker):
"""Handle the next complete message on the stream. Raise """Handle the next complete message on the stream. Raise
:py:class:`StreamError` on failure.""" :class:`StreamError` on failure."""
_vv and IOLOG.debug('%r.on_receive()', self) _vv and IOLOG.debug('%r.on_receive()', self)
buf = self.receive_side.read() buf = self.receive_side.read()
@ -1329,6 +1422,14 @@ class Stream(BasicStream):
return True return True
def pending_bytes(self): def pending_bytes(self):
"""
Return the number of bytes queued for transmission on this stream. This
can be used to limit the amount of data buffered in RAM by an otherwise
unlimited consumer.
For an accurate result, this method should be called from the Broker
thread, for example by using :meth:`Broker.defer_sync`.
"""
return self._output_buf_len return self._output_buf_len
def on_transmit(self, broker): def on_transmit(self, broker):
@ -1572,15 +1673,15 @@ class Poller(object):
class Latch(object): class Latch(object):
""" """
A latch is a :py:class:`Queue.Queue`-like object that supports mutation and A latch is a :class:`Queue.Queue`-like object that supports mutation and
waiting from multiple threads, however unlike :py:class:`Queue.Queue`, waiting from multiple threads, however unlike :class:`Queue.Queue`,
waiting threads always remain interruptible, so CTRL+C always succeeds, and waiting threads always remain interruptible, so CTRL+C always succeeds, and
waits where a timeout is set experience no wake up latency. These waits where a timeout is set experience no wake up latency. These
properties are not possible in combination using the built-in threading properties are not possible in combination using the built-in threading
primitives available in Python 2.x. primitives available in Python 2.x.
Latches implement queues using the UNIX self-pipe trick, and a per-thread Latches implement queues using the UNIX self-pipe trick, and a per-thread
:py:func:`socket.socketpair` that is lazily created the first time any :func:`socket.socketpair` that is lazily created the first time any
latch attempts to sleep on a thread, and dynamically associated with the latch attempts to sleep on a thread, and dynamically associated with the
waiting Latch only for duration of the wait. waiting Latch only for duration of the wait.
@ -1626,7 +1727,7 @@ class Latch(object):
def close(self): def close(self):
""" """
Mark the latch as closed, and cause every sleeping thread to be woken, Mark the latch as closed, and cause every sleeping thread to be woken,
with :py:class:`mitogen.core.LatchError` raised in each thread. with :class:`mitogen.core.LatchError` raised in each thread.
""" """
self._lock.acquire() self._lock.acquire()
try: try:
@ -1640,17 +1741,17 @@ class Latch(object):
def empty(self): def empty(self):
""" """
Return :py:data:`True` if calling :py:meth:`get` would block. Return :data:`True` if calling :meth:`get` would block.
As with :py:class:`Queue.Queue`, :py:data:`True` may be returned even As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and message may be posted at any moment between :meth:`empty` and
:py:meth:`get`. :meth:`get`.
As with :py:class:`Queue.Queue`, :py:data:`False` may be returned even As with :class:`Queue.Queue`, :data:`False` may be returned even
though a subsequent call to :py:meth:`get` will block, since another though a subsequent call to :meth:`get` will block, since another
waiting thread may be woken at any moment between :py:meth:`empty` and waiting thread may be woken at any moment between :meth:`empty` and
:py:meth:`get`. :meth:`get`.
""" """
return len(self._queue) == 0 return len(self._queue) == 0
@ -1683,14 +1784,14 @@ class Latch(object):
Return the next enqueued object, or sleep waiting for one. Return the next enqueued object, or sleep waiting for one.
:param float timeout: :param float timeout:
If not :py:data:`None`, specifies a timeout in seconds. If not :data:`None`, specifies a timeout in seconds.
:param bool block: :param bool block:
If :py:data:`False`, immediately raise If :data:`False`, immediately raise
:py:class:`mitogen.core.TimeoutError` if the latch is empty. :class:`mitogen.core.TimeoutError` if the latch is empty.
:raises mitogen.core.LatchError: :raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the object is no longer valid. :meth:`close` has been called, and the object is no longer valid.
:raises mitogen.core.TimeoutError: :raises mitogen.core.TimeoutError:
Timeout was reached. Timeout was reached.
@ -1771,7 +1872,7 @@ class Latch(object):
exists. exists.
:raises mitogen.core.LatchError: :raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the object is no longer valid. :meth:`close` has been called, and the object is no longer valid.
""" """
_vv and IOLOG.debug('%r.put(%r)', self, obj) _vv and IOLOG.debug('%r.put(%r)', self, obj)
self._lock.acquire() self._lock.acquire()
@ -1807,7 +1908,7 @@ class Latch(object):
class Waker(BasicStream): class Waker(BasicStream):
""" """
:py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. :class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_.
Used to wake the multiplexer when another thread needs to modify its state Used to wake the multiplexer when another thread needs to modify its state
(via a cross-thread function call). (via a cross-thread function call).
@ -1893,8 +1994,8 @@ class Waker(BasicStream):
class IoLogger(BasicStream): class IoLogger(BasicStream):
""" """
:py:class:`BasicStream` subclass that sets up redirection of a standard :class:`BasicStream` subclass that sets up redirection of a standard
UNIX file descriptor back into the Python :py:mod:`logging` package. UNIX file descriptor back into the Python :mod:`logging` package.
""" """
_buf = '' _buf = ''
@ -2126,8 +2227,8 @@ class Router(object):
return handle return handle
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Called during :py:meth:`Broker.shutdown`, informs callbacks """Called during :meth:`Broker.shutdown`, informs callbacks registered
registered with :py:meth:`add_handle_cb` the connection is dead.""" with :meth:`add_handle_cb` the connection is dead."""
_v and LOG.debug('%r.on_shutdown(%r)', self, broker) _v and LOG.debug('%r.on_shutdown(%r)', self, broker)
fire(self, 'shutdown') fire(self, 'shutdown')
for handle, (persist, fn) in self._handle_map.iteritems(): for handle, (persist, fn) in self._handle_map.iteritems():
@ -2249,14 +2350,26 @@ class Router(object):
class Broker(object): class Broker(object):
"""
Responsible for handling I/O multiplexing in a private thread.
**Note:** This is the somewhat limited core version of the Broker class
used by child contexts. The master subclass is documented below.
"""
poller_class = Poller poller_class = Poller
_waker = None _waker = None
_thread = None _thread = None
#: Seconds grace to allow :class:`streams <Stream>` to shutdown gracefully
#: before force-disconnecting them during :meth:`shutdown`.
shutdown_timeout = 3.0 shutdown_timeout = 3.0
def __init__(self, poller_class=None): def __init__(self, poller_class=None):
self._alive = True self._alive = True
self._waker = Waker(self) self._waker = Waker(self)
#: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
#: thread, or immediately if the current thread is the broker thread.
#: Safe to call from any thread.
self.defer = self._waker.defer self.defer = self._waker.defer
self.poller = self.poller_class() self.poller = self.poller_class()
self.poller.start_receive( self.poller.start_receive(
@ -2272,6 +2385,12 @@ class Broker(object):
self._waker.broker_ident = self._thread.ident self._waker.broker_ident = self._thread.ident
def start_receive(self, stream): def start_receive(self, stream):
"""
Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as
ready for reading. Safe to call from any thread. When the associated
file descriptor becomes ready for reading,
:meth:`BasicStream.on_receive` will be called.
"""
_vv and IOLOG.debug('%r.start_receive(%r)', self, stream) _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
side = stream.receive_side side = stream.receive_side
assert side and side.fd is not None assert side and side.fd is not None
@ -2279,26 +2398,47 @@ class Broker(object):
side.fd, (side, stream.on_receive)) side.fd, (side, stream.on_receive))
def stop_receive(self, stream): def stop_receive(self, stream):
"""
Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as not
ready for reading. Safe to call from any thread.
"""
_vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
self.defer(self.poller.stop_receive, stream.receive_side.fd) self.defer(self.poller.stop_receive, stream.receive_side.fd)
def _start_transmit(self, stream): def _start_transmit(self, stream):
"""
Mark the :attr:`transmit_side <Stream.transmit_side>` on `stream` as
ready for writing. Must only be called from the Broker thread. When the
associated file descriptor becomes ready for writing,
:meth:`BasicStream.on_transmit` will be called.
"""
_vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
side = stream.transmit_side side = stream.transmit_side
assert side and side.fd is not None assert side and side.fd is not None
self.poller.start_transmit(side.fd, (side, stream.on_transmit)) self.poller.start_transmit(side.fd, (side, stream.on_transmit))
def _stop_transmit(self, stream): def _stop_transmit(self, stream):
"""
Mark the :attr:`transmit_side <Stream.receive_side>` on `stream` as not
ready for writing.
"""
_vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream)
self.poller.stop_transmit(stream.transmit_side.fd) self.poller.stop_transmit(stream.transmit_side.fd)
def keep_alive(self): def keep_alive(self):
"""
Return :data:`True` if any reader's :attr:`Side.keep_alive` attribute
is :data:`True`, or any :class:`Context` is still registered that is
not the master. Used to delay shutdown while some important work is in
progress (e.g. log draining).
"""
it = (side.keep_alive for (_, (side, _)) in self.poller.readers) it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
return sum(it, 0) return sum(it, 0)
def defer_sync(self, func): def defer_sync(self, func):
""" """
Block the calling thread while `func` runs on a broker thread. Arrange for `func()` to execute on the broker thread, blocking the
current thread until a result or exception is available.
:returns: :returns:
Return value of `func()`. Return value of `func()`.
@ -2349,6 +2489,12 @@ class Broker(object):
side.stream.on_disconnect(self) side.stream.on_disconnect(self)
def _broker_main(self): def _broker_main(self):
"""
Handle events until :meth:`shutdown`. On shutdown, invoke
:meth:`Stream.on_shutdown` for every active stream, then allow up to
:attr:`shutdown_timeout` seconds for the streams to unregister
themselves before forcefully calling :meth:`Stream.on_disconnect`.
"""
try: try:
while self._alive: while self._alive:
self._loop_once() self._loop_once()
@ -2361,12 +2507,20 @@ class Broker(object):
fire(self, 'exit') fire(self, 'exit')
def shutdown(self): def shutdown(self):
"""
Request broker gracefully disconnect streams and stop. Safe to call
from any thread.
"""
_v and LOG.debug('%r.shutdown()', self) _v and LOG.debug('%r.shutdown()', self)
def _shutdown(): def _shutdown():
self._alive = False self._alive = False
self.defer(_shutdown) self.defer(_shutdown)
def join(self): def join(self):
"""
Wait for the broker to stop, expected to be called after
:meth:`shutdown`.
"""
self._thread.join() self._thread.join()
def __repr__(self): def __repr__(self):
@ -2438,6 +2592,34 @@ class Dispatcher(object):
class ExternalContext(object): class ExternalContext(object):
"""
External context implementation.
.. attribute:: broker
The :class:`mitogen.core.Broker` instance.
.. attribute:: context
The :class:`mitogen.core.Context` instance.
.. attribute:: channel
The :class:`mitogen.core.Channel` over which :data:`CALL_FUNCTION`
requests are received.
.. attribute:: stdout_log
The :class:`mitogen.core.IoLogger` connected to ``stdout``.
.. attribute:: importer
The :class:`mitogen.core.Importer` instance.
.. attribute:: stdout_log
The :class:`IoLogger` connected to ``stdout``.
.. attribute:: stderr_log
The :class:`IoLogger` connected to ``stderr``.
.. method:: _dispatch_calls
Implementation for the main thread in every child context.
"""
detached = False detached = False
def __init__(self, config): def __init__(self, config):

@ -754,6 +754,26 @@ class ModuleResponder(object):
class Broker(mitogen.core.Broker): class Broker(mitogen.core.Broker):
"""
.. note::
You may construct as many brokers as desired, and use the same broker
for multiple routers, however usually only one broker need exist.
Multiple brokers may be useful when dealing with sets of children with
differing lifetimes. For example, a subscription service where
non-payment results in termination for one customer.
:param bool install_watcher:
If :data:`True`, an additional thread is started to monitor the
lifetime of the main thread, triggering :meth:`shutdown`
automatically in case the user forgets to call it, or their code
crashed.
You should not rely on this functionality in your program, it is only
intended as a fail-safe and to simplify the API for new users. In
particular, alternative Python implementations may not be able to
support watching the main thread.
"""
shutdown_timeout = 5.0 shutdown_timeout = 5.0
_watcher = None _watcher = None
poller_class = mitogen.parent.PREFERRED_POLLER poller_class = mitogen.parent.PREFERRED_POLLER
@ -773,7 +793,32 @@ class Broker(mitogen.core.Broker):
class Router(mitogen.parent.Router): class Router(mitogen.parent.Router):
"""
Extend :class:`mitogen.core.Router` with functionality useful to masters,
and child contexts who later become masters. Currently when this class is
required, the target context's router is upgraded at runtime.
.. note::
You may construct as many routers as desired, and use the same broker
for multiple routers, however usually only one broker and router need
exist. Multiple routers may be useful when dealing with separate trust
domains, for example, manipulating infrastructure belonging to separate
customers or projects.
:param mitogen.master.Broker broker:
Broker to use. If not specified, a private :class:`Broker` is created.
"""
broker_class = Broker broker_class = Broker
#: When :data:`True`, cause the broker thread and any subsequent broker and
#: main threads existing in any child to write
#: ``/tmp/mitogen.stats.<pid>.<thread_name>.log`` containing a
#: :mod:`cProfile` dump on graceful exit. Must be set prior to construction
#: of any :class:`Broker`, e.g. via:
#:
#: .. code::
#: mitogen.master.Router.profiling = True
profiling = False profiling = False
def __init__(self, broker=None, max_message_size=None): def __init__(self, broker=None, max_message_size=None):
@ -796,6 +841,10 @@ class Router(mitogen.parent.Router):
) )
def enable_debug(self): def enable_debug(self):
"""
Cause this context and any descendant child contexts to write debug
logs to ``/tmp/mitogen.<pid>.log``.
"""
mitogen.core.enable_debug_logging() mitogen.core.enable_debug_logging()
self.debug = True self.debug = True
@ -830,6 +879,12 @@ class IdAllocator(object):
BLOCK_SIZE = 1000 BLOCK_SIZE = 1000
def allocate(self): def allocate(self):
"""
Arrange for a unique context ID to be allocated and associated with a
route leading to the active context. In masters, the ID is generated
directly, in children it is forwarded to the master via a
:data:`mitogen.core.ALLOCATE_ID` message.
"""
self.lock.acquire() self.lock.acquire()
try: try:
id_ = self.next_id id_ = self.next_id

@ -1822,6 +1822,10 @@ class Router(mitogen.core.Router):
class ProcessMonitor(object): class ProcessMonitor(object):
"""
Install a :data:`signal.SIGCHLD` handler that generates callbacks when a
specific child process has exitted. This class is obsolete, do not use.
"""
def __init__(self): def __init__(self):
# pid -> callback() # pid -> callback()
self.callback_by_pid = {} self.callback_by_pid = {}
@ -1835,6 +1839,16 @@ class ProcessMonitor(object):
del self.callback_by_pid[pid] del self.callback_by_pid[pid]
def add(self, pid, callback): def add(self, pid, callback):
"""
Add a callback function to be notified of the exit status of a process.
:param int pid:
Process ID to be notified of.
:param callback:
Function invoked as `callback(status)`, where `status` is the raw
exit status of the child process.
"""
self.callback_by_pid[pid] = callback self.callback_by_pid[pid] = callback
_instance = None _instance = None

Loading…
Cancel
Save