diff --git a/docs/changelog.rst b/docs/changelog.rst index c4ef4b41..3c7350b5 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -155,9 +155,13 @@ Enhancements introduced in Ansible 2.7. * `#415 `_: the interface employed for - in-process queues was changed from Kqueue/epoll() to poll(), which requires + in-process queues was changed from `kqueue + `_ / + `epoll `_ to + `poll() `_, which requires no setup or teardown, yielding a 30% latency reduction for inter-thread - communication. This may manifest as a runtime improvement in many-host runs. + communication and a 50% reduction in context switches. This will manifest as + a runtime improvement in many-host runs. Fixes @@ -294,6 +298,10 @@ Core Library was configured. This would lead to connection timeouts due to the hung response. +* `#414 `_, + `#425 `_: avoid deadlock of forked + children by reinitializing the :mod:`mitogen.service` pool lock. + * `#416 `_: around 1.4KiB of memory was leaked on every RPC, due to a list of strong references keeping alive any handler ever registered for disconnect notification. @@ -390,22 +398,23 @@ bug reports, testing, features and fixes in this release contributed by `Andreas Krüger `_, `Berend De Schouwer `_, `Brian Candler `_, +`dsgnr `_, `Duane Zamrok `_, `Eric Chang `_, `Gerben Meijer `_, `Guy Knights `_, `Jesse London `_, `Jiří Vávra `_, -`Jonathan Rosser `_, `Johan Beisser `_, +`Jonathan Rosser `_, `Josh Smift `_, `Mehdi `_, `Michael DeHaan `_, `Mohammed Naser `_, `Peter V. Saveliev `_, `Stéphane `_, -`@whky `_, `@syntonym `_, +`@whky `_, `@yodatak `_, and `Younès HAFRI `_. diff --git a/examples/ping_pong.py b/examples/ping_pong.py new file mode 100644 index 00000000..406b6e02 --- /dev/null +++ b/examples/ping_pong.py @@ -0,0 +1,46 @@ +# Wire up a ping/pong counting loop between 2 subprocesses. + +from __future__ import print_function +import mitogen.core +import mitogen.select + + +@mitogen.core.takes_router +def ping_pong(control_sender, router): + with mitogen.core.Receiver(router) as recv: + # Tell caller how to communicate with us. + control_sender.send(recv.to_sender()) + + # Wait for caller to tell us how to talk back: + data_sender = recv.get().unpickle() + + n = 0 + while (n + 1) < 30: + n = recv.get().unpickle() + print('the number is currently', n) + data_sender.send(n + 1) + + +@mitogen.main() +def main(router): + # Create a receiver for control messages. + with mitogen.core.Receiver(router) as recv: + # Start ping_pong() in child 1 and fetch its sender. + c1 = router.local() + c1_call = c1.call_async(ping_pong, recv.to_sender()) + c1_sender = recv.get().unpickle() + + # Start ping_pong() in child 2 and fetch its sender. + c2 = router.local() + c2_call = c2.call_async(ping_pong, recv.to_sender()) + c2_sender = recv.get().unpickle() + + # Tell the children about each others' senders. + c1_sender.send(c2_sender) + c2_sender.send(c1_sender) + + # Start the loop. + c1_sender.send(0) + + # Wait for both functions to return. + mitogen.select.Select.all([c1_call, c2_call]) diff --git a/mitogen/core.py b/mitogen/core.py index c0a93514..893a1e1e 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -714,7 +714,7 @@ class Message(object): class Sender(object): """ Senders are used to send pickled messages to a handle in another context, - it is the inverse of :class:`mitogen.core.Sender`. + it is the inverse of :class:`mitogen.core.Receiver`. Senders may be serialized, making them convenient to wire up data flows. See :meth:`mitogen.core.Receiver.to_sender` for more information. @@ -785,10 +785,12 @@ class Receiver(object): :param mitogen.core.Context respondent: Context this receiver is receiving from. If not :data:`None`, arranges for the receiver to receive a dead message if messages can no longer be - routed to the context, due to disconnection or exit. + routed to the context due to disconnection, and ignores messages that + did not originate from the respondent context. """ #: If not :data:`None`, a reference to a function invoked as - #: `notify(receiver)` when a new message is delivered to this receiver. + #: `notify(receiver)` when a new message is delivered to this receiver. The + #: function is invoked on the broker thread, therefore it must not block. #: Used by :class:`mitogen.select.Select` to implement waiting on multiple #: receivers. notify = None @@ -813,6 +815,12 @@ class Receiver(object): def __repr__(self): return 'Receiver(%r, %r)' % (self.router, self.handle) + def __enter__(self): + return self + + def __exit__(self, _1, _2, _3): + self.close() + def to_sender(self): """ Return a :class:`Sender` configured to deliver messages to this @@ -824,18 +832,20 @@ class Receiver(object): sender.send(line) sender.close() - remote = router.ssh(hostname='mainframe') - recv = mitogen.core.Receiver(router) - remote.call(deliver_monthly_report, recv.to_sender()) - for msg in recv: - print(msg) + @mitogen.main() + def main(router): + remote = router.ssh(hostname='mainframe') + recv = mitogen.core.Receiver(router) + remote.call(deliver_monthly_report, recv.to_sender()) + for msg in recv: + print(msg) """ - context = Context(self.router, mitogen.context_id) - return Sender(context, self.handle) + return Sender(self.router.myself(), self.handle) def _on_receive(self, msg): """ - Callback from the Stream; appends data to the internal queue. + Callback registered for the handle with :class:`Router`; appends data + to the internal queue. """ _vv and IOLOG.debug('%r._on_receive(%r)', self, msg) self._latch.put(msg) @@ -873,15 +883,15 @@ class Receiver(object): If not :data:`None`, specifies a timeout in seconds. :raises mitogen.core.ChannelError: - The remote end indicated the channel should be closed, or - communication with its parent context was lost. + The remote end indicated the channel should be closed, + communication with it was lost, or :meth:`close` was called in the + local process. :raises mitogen.core.TimeoutError: Timeout was reached. :returns: - `(msg, data)` tuple, where `msg` is the :class:`Message` that was - received, and `data` is its unpickled data part. + :class:`Message` that was received. """ _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) try: @@ -910,6 +920,13 @@ class Channel(Sender, Receiver): A channel inherits from :class:`mitogen.core.Sender` and `mitogen.core.Receiver` to provide bidirectional functionality. + This class is incomplete and obsolete, it will be removed in Mitogen 0.3. + Channels were an early attempt at syntax sugar. It is always easier to pass + around unidirectional pairs of senders/receivers, even though the syntax is + baroque: + + .. literalinclude:: ../examples/ping_pong.py + Since all handles aren't known until after both ends are constructed, for both ends to communicate through a channel, it is necessary for one end to retrieve the handle allocated to the other and reconfigure its own channel diff --git a/mitogen/fork.py b/mitogen/fork.py index 3e3a98a9..8636bd13 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -85,6 +85,10 @@ def on_fork(): mitogen.core.Latch._on_fork() mitogen.core.Side._on_fork() + mitogen__service = sys.modules.get('mitogen.service') + if mitogen__service: + mitogen__service._pool_lock = threading.Lock() + def handle_child_crash(): """ diff --git a/tests/ansible/integration/async/all.yml b/tests/ansible/integration/async/all.yml index 1a5b20ef..61d2d35c 100644 --- a/tests/ansible/integration/async/all.yml +++ b/tests/ansible/integration/async/all.yml @@ -1,4 +1,4 @@ -# - import_playbook: multiple_items_loop.yml +- import_playbook: multiple_items_loop.yml - import_playbook: result_binary_producing_json.yml - import_playbook: result_binary_producing_junk.yml - import_playbook: result_shell_echo_hi.yml diff --git a/tests/receiver_test.py b/tests/receiver_test.py index 8942ba29..72b0007e 100644 --- a/tests/receiver_test.py +++ b/tests/receiver_test.py @@ -146,5 +146,14 @@ class OnReceiveTest(testlib.RouterMixin, testlib.TestCase): # TODO: what happens to a Select subscribed to the receiver in this case? +class ToSenderTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.core.Receiver + + def test_returned_context(self): + myself = self.router.myself() + recv = self.klass(self.router) + self.assertEquals(myself, recv.to_sender().context) + + if __name__ == '__main__': unittest2.main() diff --git a/tests/router_test.py b/tests/router_test.py index 839692df..351d7ace 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -212,6 +212,14 @@ class AddHandlerTest(testlib.TestCase): router.broker.join() +class MyselfTest(testlib.RouterMixin, testlib.TestCase): + def test_myself(self): + myself = self.router.myself() + self.assertEquals(myself.context_id, mitogen.context_id) + # TODO: context should know its own name too. + self.assertEquals(myself.name, 'self') + + class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase): klass = mitogen.master.Router