|
|
|
@ -1024,11 +1024,11 @@ class Receiver(object):
|
|
|
|
|
routed to the context due to disconnection, and ignores messages that
|
|
|
|
|
did not originate from the respondent context.
|
|
|
|
|
"""
|
|
|
|
|
#: If not :data:`None`, a reference to a function invoked as
|
|
|
|
|
#: `notify(receiver)` when a new message is delivered to this receiver. The
|
|
|
|
|
#: function is invoked on the broker thread, therefore it must not block.
|
|
|
|
|
#: Used by :class:`mitogen.select.Select` to implement waiting on multiple
|
|
|
|
|
#: receivers.
|
|
|
|
|
#: If not :data:`None`, a function invoked as `notify(receiver)` after a
|
|
|
|
|
#: message has been received. The function is invoked on :class:`Broker`
|
|
|
|
|
#: thread, therefore it must not block. Used by
|
|
|
|
|
#: :class:`mitogen.select.Select` to efficiently implement waiting on
|
|
|
|
|
#: multiple event sources.
|
|
|
|
|
notify = None
|
|
|
|
|
|
|
|
|
|
raise_channelerror = True
|
|
|
|
@ -1513,6 +1513,22 @@ class Importer(object):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LogHandler(logging.Handler):
|
|
|
|
|
"""
|
|
|
|
|
A :class:`logging.Handler` subclass that arranges for :data:`FORWARD_LOG`
|
|
|
|
|
messages to be sent to a parent context in response to logging messages
|
|
|
|
|
generated by the current context. This is installed by default in child
|
|
|
|
|
contexts during bootstrap, so that :mod:`logging` events can be viewed and
|
|
|
|
|
managed centrally in the master process.
|
|
|
|
|
|
|
|
|
|
The handler is initially *corked* after construction, such that it buffers
|
|
|
|
|
messages until :meth:`uncork` is called. This allows logging to be
|
|
|
|
|
installed prior to communication with the target being available, and
|
|
|
|
|
avoids any possible race where early log messages might be dropped.
|
|
|
|
|
|
|
|
|
|
:param mitogen.core.Context context:
|
|
|
|
|
The context to send log messages towards. At present this is always
|
|
|
|
|
the master process.
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self, context):
|
|
|
|
|
logging.Handler.__init__(self)
|
|
|
|
|
self.context = context
|
|
|
|
@ -1549,6 +1565,9 @@ class LogHandler(logging.Handler):
|
|
|
|
|
self._buffer_lock.release()
|
|
|
|
|
|
|
|
|
|
def emit(self, rec):
|
|
|
|
|
"""
|
|
|
|
|
Send a :data:`FORWARD_LOG` message towards the target context.
|
|
|
|
|
"""
|
|
|
|
|
if rec.name == 'mitogen.io' or \
|
|
|
|
|
getattr(self.local, 'in_emit', False):
|
|
|
|
|
return
|
|
|
|
@ -1566,6 +1585,30 @@ class LogHandler(logging.Handler):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Stream(object):
|
|
|
|
|
"""
|
|
|
|
|
A :class:`Stream` is one readable and optionally one writeable file
|
|
|
|
|
descriptor (represented by :class:`Side`) aggregated alongside an
|
|
|
|
|
associated :class:`Protocol` that knows how to respond to IO readiness
|
|
|
|
|
events for those descriptors.
|
|
|
|
|
|
|
|
|
|
Streams are registered with :class:`Broker`, and callbacks are invoked on
|
|
|
|
|
the broker thread in response to IO activity. When registered using
|
|
|
|
|
:meth:`Broker.start_receive` or :meth:`Broker._start_transmit`, the broker
|
|
|
|
|
may call any of :meth:`on_receive`, :meth:`on_transmit`,
|
|
|
|
|
:meth:`on_shutdown` or :meth:`on_disconnect`.
|
|
|
|
|
|
|
|
|
|
It is expected that the :class:`Protocol` associated with a stream will
|
|
|
|
|
change over its life. For example during connection setup, the initial
|
|
|
|
|
protocol may be :class:`mitogen.parent.BootstrapProtocol` that knows how to
|
|
|
|
|
enter SSH and sudo passwords and transmit the :mod:`mitogen.core` source to
|
|
|
|
|
the target, before handing off to :class:`MitogenProtocol` when the target
|
|
|
|
|
process is initialized.
|
|
|
|
|
|
|
|
|
|
Streams connecting to children are in turn aggregated by
|
|
|
|
|
:class:`mitogen.parent.Connection`, which contains additional logic for
|
|
|
|
|
managing any child process, and a reference to any separate ``stderr``
|
|
|
|
|
:class:`Stream` connected to that process.
|
|
|
|
|
"""
|
|
|
|
|
#: A :class:`Side` representing the stream's receive file descriptor.
|
|
|
|
|
receive_side = None
|
|
|
|
|
|
|
|
|
@ -1578,14 +1621,16 @@ class Stream(object):
|
|
|
|
|
#: In parents, the :class:`mitogen.parent.Connection` instance.
|
|
|
|
|
conn = None
|
|
|
|
|
|
|
|
|
|
#: The stream name. This is used in the :meth:`__repr__` output in any log
|
|
|
|
|
#: messages, it may be any descriptive string.
|
|
|
|
|
name = u'default'
|
|
|
|
|
|
|
|
|
|
def set_protocol(self, protocol):
|
|
|
|
|
"""
|
|
|
|
|
Bind a protocol to this stream, by updating :attr:`Protocol.stream` to
|
|
|
|
|
refer to this stream, and updating this stream's
|
|
|
|
|
:attr:`Stream.protocol` to the refer to the protocol. Any prior
|
|
|
|
|
protocol's :attr:`Protocol.stream` is set to :data:`None`.
|
|
|
|
|
Bind a :class:`Protocol` to this stream, by updating
|
|
|
|
|
:attr:`Protocol.stream` to refer to this stream, and updating this
|
|
|
|
|
stream's :attr:`Stream.protocol` to the refer to the protocol. Any
|
|
|
|
|
prior protocol's :attr:`Protocol.stream` is set to :data:`None`.
|
|
|
|
|
"""
|
|
|
|
|
if self.protocol:
|
|
|
|
|
self.protocol.stream = None
|
|
|
|
@ -1593,6 +1638,21 @@ class Stream(object):
|
|
|
|
|
self.protocol.stream = self
|
|
|
|
|
|
|
|
|
|
def accept(self, rfp, wfp):
|
|
|
|
|
"""
|
|
|
|
|
Attach a pair of file objects to :attr:`receive_side` and
|
|
|
|
|
:attr:`transmit_side`, after wrapping them in :class:`Side` instances.
|
|
|
|
|
:class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec`
|
|
|
|
|
on the underlying file descriptors during construction.
|
|
|
|
|
|
|
|
|
|
The same file object may be used for both sides. The default
|
|
|
|
|
:meth:`on_disconnect` is handles the possibility that only one
|
|
|
|
|
descriptor may need to be closed.
|
|
|
|
|
|
|
|
|
|
:param file rfp:
|
|
|
|
|
The file object to receive from.
|
|
|
|
|
:param file wfp:
|
|
|
|
|
The file object to transmit to.
|
|
|
|
|
"""
|
|
|
|
|
self.receive_side = Side(self, rfp)
|
|
|
|
|
self.transmit_side = Side(self, wfp)
|
|
|
|
|
|
|
|
|
@ -1601,13 +1661,17 @@ class Stream(object):
|
|
|
|
|
|
|
|
|
|
def on_receive(self, broker):
|
|
|
|
|
"""
|
|
|
|
|
Called by :class:`Broker` when the stream's :attr:`receive_side` has
|
|
|
|
|
Invoked by :class:`Broker` when the stream's :attr:`receive_side` has
|
|
|
|
|
been marked readable using :meth:`Broker.start_receive` and the broker
|
|
|
|
|
has detected the associated file descriptor is ready for reading.
|
|
|
|
|
|
|
|
|
|
Subclasses must implement this if :meth:`Broker.start_receive` is ever
|
|
|
|
|
called on them, and the method must call :meth:`on_disconect` if
|
|
|
|
|
reading produces an empty string.
|
|
|
|
|
Subclasses must implement this if they are registered using
|
|
|
|
|
:meth:`Broker.start_receive`, and the method must invoke
|
|
|
|
|
:meth:`on_disconnect` if reading produces an empty string.
|
|
|
|
|
|
|
|
|
|
The default implementation reads :attr:`Protocol.read_size` bytes and
|
|
|
|
|
passes the resulting bytestring to :meth:`Protocol.on_receive`. If the
|
|
|
|
|
bytestring is 0 bytes, invokes :meth:`on_disconnect` instead.
|
|
|
|
|
"""
|
|
|
|
|
buf = self.receive_side.read(self.protocol.read_size)
|
|
|
|
|
if not buf:
|
|
|
|
@ -1618,30 +1682,39 @@ class Stream(object):
|
|
|
|
|
|
|
|
|
|
def on_transmit(self, broker):
|
|
|
|
|
"""
|
|
|
|
|
Called by :class:`Broker` when the stream's :attr:`transmit_side`
|
|
|
|
|
has been marked writeable using :meth:`Broker._start_transmit` and
|
|
|
|
|
the broker has detected the associated file descriptor is ready for
|
|
|
|
|
Invoked by :class:`Broker` when the stream's :attr:`transmit_side` has
|
|
|
|
|
been marked writeable using :meth:`Broker._start_transmit` and the
|
|
|
|
|
broker has detected the associated file descriptor is ready for
|
|
|
|
|
writing.
|
|
|
|
|
|
|
|
|
|
Subclasses must implement this if :meth:`Broker._start_transmit` is
|
|
|
|
|
ever called on them.
|
|
|
|
|
Subclasses must implement they are ever registerd with
|
|
|
|
|
:meth:`Broker._start_transmit`.
|
|
|
|
|
|
|
|
|
|
The default implementation invokes :meth:`Protocol.on_transmit`.
|
|
|
|
|
"""
|
|
|
|
|
self.protocol.on_transmit(broker)
|
|
|
|
|
|
|
|
|
|
def on_shutdown(self, broker):
|
|
|
|
|
"""
|
|
|
|
|
Called by :meth:`Broker.shutdown` to allow the stream time to
|
|
|
|
|
gracefully shutdown. The base implementation simply called
|
|
|
|
|
:meth:`on_disconnect`.
|
|
|
|
|
Invoked by :meth:`Broker.shutdown` to allow the stream time to
|
|
|
|
|
gracefully shutdown.
|
|
|
|
|
|
|
|
|
|
The default implementation emits a ``shutdown`` signal before
|
|
|
|
|
invoking :meth:`on_disconnect`.
|
|
|
|
|
"""
|
|
|
|
|
fire(self, 'shutdown')
|
|
|
|
|
self.protocol.on_shutdown(broker)
|
|
|
|
|
|
|
|
|
|
def on_disconnect(self, broker):
|
|
|
|
|
"""
|
|
|
|
|
Called by :class:`Broker` to force disconnect the stream. The base
|
|
|
|
|
implementation simply closes :attr:`receive_side` and
|
|
|
|
|
:attr:`transmit_side` and unregisters the stream from the broker.
|
|
|
|
|
Invoked by :class:`Broker` to force disconnect the stream during
|
|
|
|
|
shutdown, invoked by the default :meth:`on_shutdown` implementation,
|
|
|
|
|
and usually invoked by any subclass :meth:`on_receive` implementation
|
|
|
|
|
in response to a 0-byte read.
|
|
|
|
|
|
|
|
|
|
The base implementation fires a ``disconnect`` event, then closes
|
|
|
|
|
:attr:`receive_side` and :attr:`transmit_side` after unregistering the
|
|
|
|
|
stream from the broker.
|
|
|
|
|
"""
|
|
|
|
|
fire(self, 'disconnect')
|
|
|
|
|
self.protocol.on_disconnect(broker)
|
|
|
|
@ -1666,6 +1739,8 @@ class Protocol(object):
|
|
|
|
|
#: :data:`None`.
|
|
|
|
|
stream = None
|
|
|
|
|
|
|
|
|
|
#: The size of the read buffer used by :class:`Stream` when this is the
|
|
|
|
|
#: active protocol for the stream.
|
|
|
|
|
read_size = CHUNK_SIZE
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
@ -2369,8 +2444,18 @@ class Latch(object):
|
|
|
|
|
|
|
|
|
|
See :ref:`waking-sleeping-threads` for further discussion.
|
|
|
|
|
"""
|
|
|
|
|
#: The :class:`Poller` implementation to use for waiting. Since the poller
|
|
|
|
|
#: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller`
|
|
|
|
|
#: if it is available, or :class:`mitogen.core.Poller` otherwise, since
|
|
|
|
|
#: these implementations require no system calls to create, configure or
|
|
|
|
|
#: destroy.
|
|
|
|
|
poller_class = Poller
|
|
|
|
|
|
|
|
|
|
#: If not :data:`None`, a function invoked as `notify(latch)` after a
|
|
|
|
|
#: successful call to :meth:`put`. The function is invoked on the
|
|
|
|
|
#: :meth:`put` caller's thread, which may be the :class:`Broker` thread,
|
|
|
|
|
#: therefore it must not block. Used by :class:`mitogen.select.Select` to
|
|
|
|
|
#: efficiently implement waiting on multiple event sources.
|
|
|
|
|
notify = None
|
|
|
|
|
|
|
|
|
|
# The _cls_ prefixes here are to make it crystal clear in the code which
|
|
|
|
@ -2725,15 +2810,22 @@ class Waker(Protocol):
|
|
|
|
|
|
|
|
|
|
class IoLoggerProtocol(DelimitedProtocol):
|
|
|
|
|
"""
|
|
|
|
|
Handle redirection of standard IO into the :mod:`logging` package.
|
|
|
|
|
Attached to one end of a socket pair whose other end overwrites one of the
|
|
|
|
|
standard ``stdout`` or ``stderr`` file descriptors in a child context.
|
|
|
|
|
Received data is split up into lines, decoded as UTF-8 and logged to the
|
|
|
|
|
:mod:`logging` package as either the ``stdout`` or ``stderr`` logger.
|
|
|
|
|
|
|
|
|
|
Logging in child contexts is in turn forwarded to the master process using
|
|
|
|
|
:class:`LogHandler`.
|
|
|
|
|
"""
|
|
|
|
|
@classmethod
|
|
|
|
|
def build_stream(cls, name, dest_fd):
|
|
|
|
|
"""
|
|
|
|
|
Even though the descriptor `dest_fd` will hold the opposite end of the
|
|
|
|
|
socket open, we must keep a separate dup() of it (i.e. wsock) in case
|
|
|
|
|
some code decides to overwrite `dest_fd` later, which would thus break
|
|
|
|
|
:meth:`on_shutdown`.
|
|
|
|
|
Even though the file descriptor `dest_fd` will hold the opposite end of
|
|
|
|
|
the socket open, we must keep a separate dup() of it (i.e. wsock) in
|
|
|
|
|
case some code decides to overwrite `dest_fd` later, which would
|
|
|
|
|
prevent break :meth:`on_shutdown` from calling :meth:`shutdown()
|
|
|
|
|
<socket.socket.shutdown>` on it.
|
|
|
|
|
"""
|
|
|
|
|
rsock, wsock = socket.socketpair()
|
|
|
|
|
os.dup2(wsock.fileno(), dest_fd)
|
|
|
|
|