diff --git a/docs/internals.rst b/docs/internals.rst index 96f9269c..fc6206e0 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -48,46 +48,75 @@ PidfulStreamHandler :members: -Side -==== +Stream & Side +============= + +.. currentmodule:: mitogen.core +.. autoclass:: Stream + :members: .. currentmodule:: mitogen.core .. autoclass:: Side :members: -Stream -====== +Protocol +======== .. currentmodule:: mitogen.core -.. autoclass:: BasicStream +.. autoclass:: Protocol :members: -.. autoclass:: Stream +.. currentmodule:: mitogen.parent +.. autoclass:: BootstrapProtocol :members: -.. currentmodule:: mitogen.fork -.. autoclass:: Stream +.. currentmodule:: mitogen.core +.. autoclass:: DelimitedProtocol :members: -.. currentmodule:: mitogen.parent -.. autoclass:: Stream +.. currentmodule:: mitogen.core +.. autoclass:: IoLoggerProtocol :members: -.. currentmodule:: mitogen.ssh -.. autoclass:: Stream +.. currentmodule:: mitogen.core +.. autoclass:: MitogenProtocol :members: -.. currentmodule:: mitogen.sudo -.. autoclass:: Stream +.. currentmodule:: mitogen.parent +.. autoclass:: MitogenProtocol :members: .. currentmodule:: mitogen.core -.. autoclass:: IoLogger +.. autoclass:: Waker :members: -.. currentmodule:: mitogen.core -.. autoclass:: Waker + +Connection / Options +==================== + +.. currentmodule:: mitogen.fork +.. autoclass:: Options + :members: +.. autoclass:: Connection + :members: + +.. currentmodule:: mitogen.parent +.. autoclass:: Options + :members: +.. autoclass:: Connection + :members: + +.. currentmodule:: mitogen.ssh +.. autoclass:: Options + :members: +.. autoclass:: Connection + :members: + +.. currentmodule:: mitogen.sudo +.. autoclass:: Options + :members: +.. autoclass:: Connection :members: @@ -158,21 +187,9 @@ Process Helpers ======= -Blocking I/O ------------- - -These functions exist to support the blocking phase of setting up a new -context. They will eventually be replaced with asynchronous equivalents. - - -.. currentmodule:: mitogen.parent -.. autofunction:: discard_until -.. autofunction:: iter_read -.. autofunction:: write_all - Subprocess Functions ------------- +--------------------- .. currentmodule:: mitogen.parent .. autofunction:: create_child @@ -184,15 +201,15 @@ Helpers ------- .. currentmodule:: mitogen.core -.. autofunction:: to_text .. autofunction:: has_parent_authority +.. autofunction:: io_op +.. autofunction:: pipe +.. autofunction:: set_block .. autofunction:: set_cloexec .. autofunction:: set_nonblock -.. autofunction:: set_block -.. autofunction:: io_op +.. autofunction:: to_text .. currentmodule:: mitogen.parent -.. autofunction:: close_nonstandard_fds .. autofunction:: create_socketpair .. currentmodule:: mitogen.master diff --git a/mitogen/core.py b/mitogen/core.py index aca7972f..c9b1f9df 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -423,8 +423,11 @@ def listen(obj, name, func): def unlisten(obj, name, func): """ - Remove `func` from the list of functions invoked when signal `name` is + Remove `func()` from the list of functions invoked when signal `name` is fired by `obj`. + + :raises ValueError: + `func()` was not on the list. """ _signals(obj, name).remove(func) @@ -946,7 +949,7 @@ class Sender(object): Senders may be serialized, making them convenient to wire up data flows. See :meth:`mitogen.core.Receiver.to_sender` for more information. - :param Context context: + :param mitogen.core.Context context: Context to send messages to. :param int dst_handle: Destination handle to send messages to. @@ -1550,6 +1553,14 @@ class Stream(object): name = u'default' def set_protocol(self, protocol): + """ + Bind a protocol to this stream, by updating :attr:`Protocol.stream` to + refer to this stream, and updating this stream's + :attr:`Stream.protocol` to the refer to the protocol. Any prior + protocol's :attr:`Protocol.stream` is set to :data:`None`. + """ + if self.protocol: + self.protocol.stream = None self.protocol = protocol self.protocol.stream = self @@ -1622,7 +1633,11 @@ class Protocol(object): implementation to be replaced without modifying behavioural logic. """ stream_class = Stream + + #: The :class:`Stream` this protocol is currently bound to, or + #: :data:`None`. stream = None + read_size = CHUNK_SIZE @classmethod @@ -1695,9 +1710,27 @@ class DelimitedProtocol(Protocol): self.stream.protocol.on_receive(broker, self._trailer) def on_line_received(self, line): + """ + Receive a line from the stream. + + :param bytes line: + The encoded line, excluding the delimiter. + :returns: + :data:`False` to indicate this invocation modified the stream's + active protocol, and any remaining buffered data should be passed + to the new protocol's :meth:`on_receive` method. + + Any other return value is ignored. + """ pass def on_partial_line_received(self, line): + """ + Receive a trailing unterminated partial line from the stream. + + :param bytes line: + The encoded partial line. + """ pass @@ -1766,7 +1799,7 @@ class Side(object): underlying FD, preventing erroneous duplicate calls to :func:`os.close` due to duplicate :meth:`Stream.on_disconnect` calls, which would otherwise risk silently succeeding by closing an unrelated descriptor. For this reason, it - is crucial only one :class:`Side` exists per unique descriptor. + is crucial only one file object exists per unique descriptor. :param mitogen.core.Stream stream: The stream this side is associated with. @@ -1794,8 +1827,8 @@ class Side(object): self.fp = fp #: Integer file descriptor to perform IO on, or :data:`None` if #: :meth:`close` has been called. This is saved separately from the - #: file object, since fileno() cannot be called on it after it has been - #: closed. + #: file object, since :meth:`file.fileno` cannot be called on it after + #: it has been closed. self.fd = fp.fileno() #: If :data:`True`, causes presence of this side in #: :class:`Broker`'s active reader set to defer shutdown until the @@ -1822,7 +1855,7 @@ class Side(object): def close(self): """ - Call :func:`os.close` on :attr:`fd` if it is not :data:`None`, + Call :meth:`file.close` on :attr:`fp` if it is not :data:`None`, then set it to :data:`None`. """ _vv and IOLOG.debug('%r.close()', self) @@ -1841,7 +1874,7 @@ class Side(object): in a 0-sized read like a regular file. :returns: - Bytes read, or the empty to string to indicate disconnection was + Bytes read, or the empty string to indicate disconnection was detected. """ if self.closed: @@ -2024,7 +2057,7 @@ class Context(object): explicitly, as that method is deduplicating, and returns the only context instance :ref:`signals` will be raised on. - :param Router router: + :param mitogen.core.Router router: Router to emit messages through. :param int context_id: Context ID. @@ -2669,7 +2702,11 @@ class IoLoggerProtocol(DelimitedProtocol): def on_shutdown(self, broker): """ - Shut down the write end of the logging socket. + Shut down the write end of the socket, preventing any further writes to + it by this process, or subprocess that inherited it. This allows any + remaining kernel-buffered data to be drained during graceful shutdown + without the buffer continuously refilling due to some out of control + child process. """ _v and LOG.debug('%r: shutting down', self) if not IS_WSL: @@ -2681,6 +2718,9 @@ class IoLoggerProtocol(DelimitedProtocol): self.stream.transmit_side.close() def on_line_received(self, line): + """ + Decode the received line as UTF-8 and pass it to the logging framework. + """ self._log.info('%s', line.decode('utf-8', 'replace')) @@ -2881,7 +2921,7 @@ class Router(object): If :data:`False`, the handler will be unregistered after a single message has been received. - :param Context respondent: + :param mitogen.core.Context respondent: Context that messages to this handle are expected to be sent from. If specified, arranges for a dead message to be delivered to `fn` when disconnection of the context is detected. diff --git a/mitogen/parent.py b/mitogen/parent.py index f42d22e6..22e40610 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1121,10 +1121,10 @@ class RegexProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol): class BootstrapProtocol(RegexProtocol): """ - Respond to stdout of a child during bootstrap. Wait for EC0_MARKER to be - written by the first stage to indicate it can receive the bootstrap, then - await EC1_MARKER to indicate success, and :class:`MitogenProtocol` can be - enabled. + Respond to stdout of a child during bootstrap. Wait for :attr:`EC0_MARKER` + to be written by the first stage to indicate it can receive the bootstrap, + then await :attr:`EC1_MARKER` to indicate success, and + :class:`MitogenProtocol` can be enabled. """ #: Sentinel value emitted by the first stage to indicate it is ready to #: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have @@ -1951,9 +1951,9 @@ class RouteMonitor(object): RouteMonitor lives entirely on the broker thread, so its data requires no locking. - :param Router router: + :param mitogen.master.Router router: Router to install handlers on. - :param Context parent: + :param mitogen.core.Context parent: :data:`None` in the master process, or reference to the parent context we should propagate route updates towards. """ diff --git a/mitogen/select.py b/mitogen/select.py index 51aebc22..ca3c32bc 100644 --- a/mitogen/select.py +++ b/mitogen/select.py @@ -57,9 +57,7 @@ class Select(object): If `oneshot` is :data:`True`, then remove each receiver as it yields a result; since :meth:`__iter__` terminates once the final receiver is - removed, this makes it convenient to respond to calls made in parallel: - - .. code-block:: python + removed, this makes it convenient to respond to calls made in parallel:: total = 0 recvs = [c.call_async(long_running_operation) for c in contexts] @@ -98,7 +96,7 @@ class Select(object): for msg in mitogen.select.Select(selects): print(msg.unpickle()) - :class:`Select` may be used to mix inter-thread and inter-process IO: + :class:`Select` may be used to mix inter-thread and inter-process IO:: latch = mitogen.core.Latch() start_thread(latch)