core: many docstring updates and an example substitute for Channel

issue510
David Wilson 5 years ago
parent 84f75551a3
commit 120c667052

@ -0,0 +1,46 @@
# Wire up a ping/pong counting loop between 2 subprocesses.
from __future__ import print_function
import mitogen.core
import mitogen.select
@mitogen.core.takes_router
def ping_pong(control_sender, router):
with mitogen.core.Receiver(router) as recv:
# Tell caller how to communicate with us.
control_sender.send(recv.to_sender())
# Wait for caller to tell us how to talk back:
data_sender = recv.get().unpickle()
n = 0
while (n + 1) < 30:
n = recv.get().unpickle()
print('the number is currently', n)
data_sender.send(n + 1)
@mitogen.main()
def main(router):
# Create a receiver for control messages.
with mitogen.core.Receiver(router) as recv:
# Start ping_pong() in child 1 and fetch its sender.
c1 = router.local()
c1_call = c1.call_async(ping_pong, recv.to_sender())
c1_sender = recv.get().unpickle()
# Start ping_pong() in child 2 and fetch its sender.
c2 = router.local()
c2_call = c2.call_async(ping_pong, recv.to_sender())
c2_sender = recv.get().unpickle()
# Tell the children about each others' senders.
c1_sender.send(c2_sender)
c2_sender.send(c1_sender)
# Start the loop.
c1_sender.send(0)
# Wait for both functions to return.
mitogen.select.Select.all([c1_call, c2_call])

@ -714,7 +714,7 @@ class Message(object):
class Sender(object): class Sender(object):
""" """
Senders are used to send pickled messages to a handle in another context, Senders are used to send pickled messages to a handle in another context,
it is the inverse of :class:`mitogen.core.Sender`. it is the inverse of :class:`mitogen.core.Receiver`.
Senders may be serialized, making them convenient to wire up data flows. Senders may be serialized, making them convenient to wire up data flows.
See :meth:`mitogen.core.Receiver.to_sender` for more information. See :meth:`mitogen.core.Receiver.to_sender` for more information.
@ -785,10 +785,12 @@ class Receiver(object):
:param mitogen.core.Context respondent: :param mitogen.core.Context respondent:
Context this receiver is receiving from. If not :data:`None`, arranges Context this receiver is receiving from. If not :data:`None`, arranges
for the receiver to receive a dead message if messages can no longer be for the receiver to receive a dead message if messages can no longer be
routed to the context, due to disconnection or exit. routed to the context due to disconnection, and ignores messages that
did not originate from the respondent context.
""" """
#: If not :data:`None`, a reference to a function invoked as #: If not :data:`None`, a reference to a function invoked as
#: `notify(receiver)` when a new message is delivered to this receiver. #: `notify(receiver)` when a new message is delivered to this receiver. The
#: function is invoked on the broker thread, therefore it must not block.
#: Used by :class:`mitogen.select.Select` to implement waiting on multiple #: Used by :class:`mitogen.select.Select` to implement waiting on multiple
#: receivers. #: receivers.
notify = None notify = None
@ -830,17 +832,20 @@ class Receiver(object):
sender.send(line) sender.send(line)
sender.close() sender.close()
remote = router.ssh(hostname='mainframe') @mitogen.main()
recv = mitogen.core.Receiver(router) def main(router):
remote.call(deliver_monthly_report, recv.to_sender()) remote = router.ssh(hostname='mainframe')
for msg in recv: recv = mitogen.core.Receiver(router)
print(msg) remote.call(deliver_monthly_report, recv.to_sender())
for msg in recv:
print(msg)
""" """
return Sender(self.router.myself(), self.handle) return Sender(self.router.myself(), self.handle)
def _on_receive(self, msg): def _on_receive(self, msg):
""" """
Callback from the Stream; appends data to the internal queue. Callback registered for the handle with :class:`Router`; appends data
to the internal queue.
""" """
_vv and IOLOG.debug('%r._on_receive(%r)', self, msg) _vv and IOLOG.debug('%r._on_receive(%r)', self, msg)
self._latch.put(msg) self._latch.put(msg)
@ -878,15 +883,15 @@ class Receiver(object):
If not :data:`None`, specifies a timeout in seconds. If not :data:`None`, specifies a timeout in seconds.
:raises mitogen.core.ChannelError: :raises mitogen.core.ChannelError:
The remote end indicated the channel should be closed, or The remote end indicated the channel should be closed,
communication with its parent context was lost. communication with it was lost, or :meth:`close` was called in the
local process.
:raises mitogen.core.TimeoutError: :raises mitogen.core.TimeoutError:
Timeout was reached. Timeout was reached.
:returns: :returns:
`(msg, data)` tuple, where `msg` is the :class:`Message` that was :class:`Message` that was received.
received, and `data` is its unpickled data part.
""" """
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
try: try:
@ -915,6 +920,13 @@ class Channel(Sender, Receiver):
A channel inherits from :class:`mitogen.core.Sender` and A channel inherits from :class:`mitogen.core.Sender` and
`mitogen.core.Receiver` to provide bidirectional functionality. `mitogen.core.Receiver` to provide bidirectional functionality.
This class is incomplete and obsolete, it will be removed in Mitogen 0.3.
Channels were an early attempt at syntax sugar. It is always easier to pass
around unidirectional pairs of senders/receivers, even though the syntax is
baroque:
.. literalinclude:: ../examples/ping_pong.py
Since all handles aren't known until after both ends are constructed, for Since all handles aren't known until after both ends are constructed, for
both ends to communicate through a channel, it is necessary for one end to both ends to communicate through a channel, it is necessary for one end to
retrieve the handle allocated to the other and reconfigure its own channel retrieve the handle allocated to the other and reconfigure its own channel

Loading…
Cancel
Save