[stream-refactor] get caught up on internals.rst updates

pull/612/head
David Wilson 5 years ago
parent 7379144a12
commit f0782ccd42

@ -48,46 +48,75 @@ PidfulStreamHandler
:members: :members:
Side Stream & Side
==== =============
.. currentmodule:: mitogen.core
.. autoclass:: Stream
:members:
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Side .. autoclass:: Side
:members: :members:
Stream Protocol
====== ========
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: BasicStream .. autoclass:: Protocol
:members: :members:
.. autoclass:: Stream .. currentmodule:: mitogen.parent
.. autoclass:: BootstrapProtocol
:members: :members:
.. currentmodule:: mitogen.fork .. currentmodule:: mitogen.core
.. autoclass:: Stream .. autoclass:: DelimitedProtocol
:members: :members:
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.core
.. autoclass:: Stream .. autoclass:: IoLoggerProtocol
:members: :members:
.. currentmodule:: mitogen.ssh .. currentmodule:: mitogen.core
.. autoclass:: Stream .. autoclass:: MitogenProtocol
:members: :members:
.. currentmodule:: mitogen.sudo .. currentmodule:: mitogen.parent
.. autoclass:: Stream .. autoclass:: MitogenProtocol
:members: :members:
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: IoLogger .. autoclass:: Waker
:members: :members:
.. currentmodule:: mitogen.core
.. autoclass:: Waker Connection / Options
====================
.. currentmodule:: mitogen.fork
.. autoclass:: Options
:members:
.. autoclass:: Connection
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: Options
:members:
.. autoclass:: Connection
:members:
.. currentmodule:: mitogen.ssh
.. autoclass:: Options
:members:
.. autoclass:: Connection
:members:
.. currentmodule:: mitogen.sudo
.. autoclass:: Options
:members:
.. autoclass:: Connection
:members: :members:
@ -158,21 +187,9 @@ Process
Helpers Helpers
======= =======
Blocking I/O
------------
These functions exist to support the blocking phase of setting up a new
context. They will eventually be replaced with asynchronous equivalents.
.. currentmodule:: mitogen.parent
.. autofunction:: discard_until
.. autofunction:: iter_read
.. autofunction:: write_all
Subprocess Functions Subprocess Functions
------------ ---------------------
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
.. autofunction:: create_child .. autofunction:: create_child
@ -184,15 +201,15 @@ Helpers
------- -------
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autofunction:: to_text
.. autofunction:: has_parent_authority .. autofunction:: has_parent_authority
.. autofunction:: io_op
.. autofunction:: pipe
.. autofunction:: set_block
.. autofunction:: set_cloexec .. autofunction:: set_cloexec
.. autofunction:: set_nonblock .. autofunction:: set_nonblock
.. autofunction:: set_block .. autofunction:: to_text
.. autofunction:: io_op
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
.. autofunction:: close_nonstandard_fds
.. autofunction:: create_socketpair .. autofunction:: create_socketpair
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master

@ -423,8 +423,11 @@ def listen(obj, name, func):
def unlisten(obj, name, func): def unlisten(obj, name, func):
""" """
Remove `func` from the list of functions invoked when signal `name` is Remove `func()` from the list of functions invoked when signal `name` is
fired by `obj`. fired by `obj`.
:raises ValueError:
`func()` was not on the list.
""" """
_signals(obj, name).remove(func) _signals(obj, name).remove(func)
@ -946,7 +949,7 @@ class Sender(object):
Senders may be serialized, making them convenient to wire up data flows. Senders may be serialized, making them convenient to wire up data flows.
See :meth:`mitogen.core.Receiver.to_sender` for more information. See :meth:`mitogen.core.Receiver.to_sender` for more information.
:param Context context: :param mitogen.core.Context context:
Context to send messages to. Context to send messages to.
:param int dst_handle: :param int dst_handle:
Destination handle to send messages to. Destination handle to send messages to.
@ -1550,6 +1553,14 @@ class Stream(object):
name = u'default' name = u'default'
def set_protocol(self, protocol): def set_protocol(self, protocol):
"""
Bind a protocol to this stream, by updating :attr:`Protocol.stream` to
refer to this stream, and updating this stream's
:attr:`Stream.protocol` to the refer to the protocol. Any prior
protocol's :attr:`Protocol.stream` is set to :data:`None`.
"""
if self.protocol:
self.protocol.stream = None
self.protocol = protocol self.protocol = protocol
self.protocol.stream = self self.protocol.stream = self
@ -1622,7 +1633,11 @@ class Protocol(object):
implementation to be replaced without modifying behavioural logic. implementation to be replaced without modifying behavioural logic.
""" """
stream_class = Stream stream_class = Stream
#: The :class:`Stream` this protocol is currently bound to, or
#: :data:`None`.
stream = None stream = None
read_size = CHUNK_SIZE read_size = CHUNK_SIZE
@classmethod @classmethod
@ -1695,9 +1710,27 @@ class DelimitedProtocol(Protocol):
self.stream.protocol.on_receive(broker, self._trailer) self.stream.protocol.on_receive(broker, self._trailer)
def on_line_received(self, line): def on_line_received(self, line):
"""
Receive a line from the stream.
:param bytes line:
The encoded line, excluding the delimiter.
:returns:
:data:`False` to indicate this invocation modified the stream's
active protocol, and any remaining buffered data should be passed
to the new protocol's :meth:`on_receive` method.
Any other return value is ignored.
"""
pass pass
def on_partial_line_received(self, line): def on_partial_line_received(self, line):
"""
Receive a trailing unterminated partial line from the stream.
:param bytes line:
The encoded partial line.
"""
pass pass
@ -1766,7 +1799,7 @@ class Side(object):
underlying FD, preventing erroneous duplicate calls to :func:`os.close` due underlying FD, preventing erroneous duplicate calls to :func:`os.close` due
to duplicate :meth:`Stream.on_disconnect` calls, which would otherwise risk to duplicate :meth:`Stream.on_disconnect` calls, which would otherwise risk
silently succeeding by closing an unrelated descriptor. For this reason, it silently succeeding by closing an unrelated descriptor. For this reason, it
is crucial only one :class:`Side` exists per unique descriptor. is crucial only one file object exists per unique descriptor.
:param mitogen.core.Stream stream: :param mitogen.core.Stream stream:
The stream this side is associated with. The stream this side is associated with.
@ -1794,8 +1827,8 @@ class Side(object):
self.fp = fp self.fp = fp
#: Integer file descriptor to perform IO on, or :data:`None` if #: Integer file descriptor to perform IO on, or :data:`None` if
#: :meth:`close` has been called. This is saved separately from the #: :meth:`close` has been called. This is saved separately from the
#: file object, since fileno() cannot be called on it after it has been #: file object, since :meth:`file.fileno` cannot be called on it after
#: closed. #: it has been closed.
self.fd = fp.fileno() self.fd = fp.fileno()
#: If :data:`True`, causes presence of this side in #: If :data:`True`, causes presence of this side in
#: :class:`Broker`'s active reader set to defer shutdown until the #: :class:`Broker`'s active reader set to defer shutdown until the
@ -1822,7 +1855,7 @@ class Side(object):
def close(self): def close(self):
""" """
Call :func:`os.close` on :attr:`fd` if it is not :data:`None`, Call :meth:`file.close` on :attr:`fp` if it is not :data:`None`,
then set it to :data:`None`. then set it to :data:`None`.
""" """
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
@ -1841,7 +1874,7 @@ class Side(object):
in a 0-sized read like a regular file. in a 0-sized read like a regular file.
:returns: :returns:
Bytes read, or the empty to string to indicate disconnection was Bytes read, or the empty string to indicate disconnection was
detected. detected.
""" """
if self.closed: if self.closed:
@ -2024,7 +2057,7 @@ class Context(object):
explicitly, as that method is deduplicating, and returns the only context explicitly, as that method is deduplicating, and returns the only context
instance :ref:`signals` will be raised on. instance :ref:`signals` will be raised on.
:param Router router: :param mitogen.core.Router router:
Router to emit messages through. Router to emit messages through.
:param int context_id: :param int context_id:
Context ID. Context ID.
@ -2669,7 +2702,11 @@ class IoLoggerProtocol(DelimitedProtocol):
def on_shutdown(self, broker): def on_shutdown(self, broker):
""" """
Shut down the write end of the logging socket. Shut down the write end of the socket, preventing any further writes to
it by this process, or subprocess that inherited it. This allows any
remaining kernel-buffered data to be drained during graceful shutdown
without the buffer continuously refilling due to some out of control
child process.
""" """
_v and LOG.debug('%r: shutting down', self) _v and LOG.debug('%r: shutting down', self)
if not IS_WSL: if not IS_WSL:
@ -2681,6 +2718,9 @@ class IoLoggerProtocol(DelimitedProtocol):
self.stream.transmit_side.close() self.stream.transmit_side.close()
def on_line_received(self, line): def on_line_received(self, line):
"""
Decode the received line as UTF-8 and pass it to the logging framework.
"""
self._log.info('%s', line.decode('utf-8', 'replace')) self._log.info('%s', line.decode('utf-8', 'replace'))
@ -2881,7 +2921,7 @@ class Router(object):
If :data:`False`, the handler will be unregistered after a single If :data:`False`, the handler will be unregistered after a single
message has been received. message has been received.
:param Context respondent: :param mitogen.core.Context respondent:
Context that messages to this handle are expected to be sent from. Context that messages to this handle are expected to be sent from.
If specified, arranges for a dead message to be delivered to `fn` If specified, arranges for a dead message to be delivered to `fn`
when disconnection of the context is detected. when disconnection of the context is detected.

@ -1121,10 +1121,10 @@ class RegexProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
class BootstrapProtocol(RegexProtocol): class BootstrapProtocol(RegexProtocol):
""" """
Respond to stdout of a child during bootstrap. Wait for EC0_MARKER to be Respond to stdout of a child during bootstrap. Wait for :attr:`EC0_MARKER`
written by the first stage to indicate it can receive the bootstrap, then to be written by the first stage to indicate it can receive the bootstrap,
await EC1_MARKER to indicate success, and :class:`MitogenProtocol` can be then await :attr:`EC1_MARKER` to indicate success, and
enabled. :class:`MitogenProtocol` can be enabled.
""" """
#: Sentinel value emitted by the first stage to indicate it is ready to #: Sentinel value emitted by the first stage to indicate it is ready to
#: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have #: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have
@ -1951,9 +1951,9 @@ class RouteMonitor(object):
RouteMonitor lives entirely on the broker thread, so its data requires no RouteMonitor lives entirely on the broker thread, so its data requires no
locking. locking.
:param Router router: :param mitogen.master.Router router:
Router to install handlers on. Router to install handlers on.
:param Context parent: :param mitogen.core.Context parent:
:data:`None` in the master process, or reference to the parent context :data:`None` in the master process, or reference to the parent context
we should propagate route updates towards. we should propagate route updates towards.
""" """

@ -57,9 +57,7 @@ class Select(object):
If `oneshot` is :data:`True`, then remove each receiver as it yields a If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :meth:`__iter__` terminates once the final receiver is result; since :meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel: removed, this makes it convenient to respond to calls made in parallel::
.. code-block:: python
total = 0 total = 0
recvs = [c.call_async(long_running_operation) for c in contexts] recvs = [c.call_async(long_running_operation) for c in contexts]
@ -98,7 +96,7 @@ class Select(object):
for msg in mitogen.select.Select(selects): for msg in mitogen.select.Select(selects):
print(msg.unpickle()) print(msg.unpickle())
:class:`Select` may be used to mix inter-thread and inter-process IO: :class:`Select` may be used to mix inter-thread and inter-process IO::
latch = mitogen.core.Latch() latch = mitogen.core.Latch()
start_thread(latch) start_thread(latch)

Loading…
Cancel
Save