From 120c667052656af2d145baa030b645b2fd75224d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 20 Jan 2019 22:54:15 +0000 Subject: [PATCH] core: many docstring updates and an example substitute for Channel --- examples/ping_pong.py | 46 +++++++++++++++++++++++++++++++++++++++++++ mitogen/core.py | 38 +++++++++++++++++++++++------------ 2 files changed, 71 insertions(+), 13 deletions(-) create mode 100644 examples/ping_pong.py diff --git a/examples/ping_pong.py b/examples/ping_pong.py new file mode 100644 index 00000000..406b6e02 --- /dev/null +++ b/examples/ping_pong.py @@ -0,0 +1,46 @@ +# Wire up a ping/pong counting loop between 2 subprocesses. + +from __future__ import print_function +import mitogen.core +import mitogen.select + + +@mitogen.core.takes_router +def ping_pong(control_sender, router): + with mitogen.core.Receiver(router) as recv: + # Tell caller how to communicate with us. + control_sender.send(recv.to_sender()) + + # Wait for caller to tell us how to talk back: + data_sender = recv.get().unpickle() + + n = 0 + while (n + 1) < 30: + n = recv.get().unpickle() + print('the number is currently', n) + data_sender.send(n + 1) + + +@mitogen.main() +def main(router): + # Create a receiver for control messages. + with mitogen.core.Receiver(router) as recv: + # Start ping_pong() in child 1 and fetch its sender. + c1 = router.local() + c1_call = c1.call_async(ping_pong, recv.to_sender()) + c1_sender = recv.get().unpickle() + + # Start ping_pong() in child 2 and fetch its sender. + c2 = router.local() + c2_call = c2.call_async(ping_pong, recv.to_sender()) + c2_sender = recv.get().unpickle() + + # Tell the children about each others' senders. + c1_sender.send(c2_sender) + c2_sender.send(c1_sender) + + # Start the loop. + c1_sender.send(0) + + # Wait for both functions to return. + mitogen.select.Select.all([c1_call, c2_call]) diff --git a/mitogen/core.py b/mitogen/core.py index 223e17f6..893a1e1e 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -714,7 +714,7 @@ class Message(object): class Sender(object): """ Senders are used to send pickled messages to a handle in another context, - it is the inverse of :class:`mitogen.core.Sender`. + it is the inverse of :class:`mitogen.core.Receiver`. Senders may be serialized, making them convenient to wire up data flows. See :meth:`mitogen.core.Receiver.to_sender` for more information. @@ -785,10 +785,12 @@ class Receiver(object): :param mitogen.core.Context respondent: Context this receiver is receiving from. If not :data:`None`, arranges for the receiver to receive a dead message if messages can no longer be - routed to the context, due to disconnection or exit. + routed to the context due to disconnection, and ignores messages that + did not originate from the respondent context. """ #: If not :data:`None`, a reference to a function invoked as - #: `notify(receiver)` when a new message is delivered to this receiver. + #: `notify(receiver)` when a new message is delivered to this receiver. The + #: function is invoked on the broker thread, therefore it must not block. #: Used by :class:`mitogen.select.Select` to implement waiting on multiple #: receivers. notify = None @@ -830,17 +832,20 @@ class Receiver(object): sender.send(line) sender.close() - remote = router.ssh(hostname='mainframe') - recv = mitogen.core.Receiver(router) - remote.call(deliver_monthly_report, recv.to_sender()) - for msg in recv: - print(msg) + @mitogen.main() + def main(router): + remote = router.ssh(hostname='mainframe') + recv = mitogen.core.Receiver(router) + remote.call(deliver_monthly_report, recv.to_sender()) + for msg in recv: + print(msg) """ return Sender(self.router.myself(), self.handle) def _on_receive(self, msg): """ - Callback from the Stream; appends data to the internal queue. + Callback registered for the handle with :class:`Router`; appends data + to the internal queue. """ _vv and IOLOG.debug('%r._on_receive(%r)', self, msg) self._latch.put(msg) @@ -878,15 +883,15 @@ class Receiver(object): If not :data:`None`, specifies a timeout in seconds. :raises mitogen.core.ChannelError: - The remote end indicated the channel should be closed, or - communication with its parent context was lost. + The remote end indicated the channel should be closed, + communication with it was lost, or :meth:`close` was called in the + local process. :raises mitogen.core.TimeoutError: Timeout was reached. :returns: - `(msg, data)` tuple, where `msg` is the :class:`Message` that was - received, and `data` is its unpickled data part. + :class:`Message` that was received. """ _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) try: @@ -915,6 +920,13 @@ class Channel(Sender, Receiver): A channel inherits from :class:`mitogen.core.Sender` and `mitogen.core.Receiver` to provide bidirectional functionality. + This class is incomplete and obsolete, it will be removed in Mitogen 0.3. + Channels were an early attempt at syntax sugar. It is always easier to pass + around unidirectional pairs of senders/receivers, even though the syntax is + baroque: + + .. literalinclude:: ../examples/ping_pong.py + Since all handles aren't known until after both ends are constructed, for both ends to communicate through a channel, it is necessary for one end to retrieve the handle allocated to the other and reconfigure its own channel