Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #414: reinitialize service pool lock on fork
  issue #414: reenable test.
  core: many docstring updates and an example substitute for Channel
  core: make Receiver a self-closing context manager.
  core: make Receiver.to_sender() use Router.myself().
  docs: update Changelog.
  docs: update Changelog.
issue510
David Wilson 7 years ago
commit 63e6fe3f76

@ -155,9 +155,13 @@ Enhancements
introduced in Ansible 2.7. introduced in Ansible 2.7.
* `#415 <https://github.com/dw/mitogen/issues/415>`_: the interface employed for * `#415 <https://github.com/dw/mitogen/issues/415>`_: the interface employed for
in-process queues was changed from Kqueue/epoll() to poll(), which requires in-process queues was changed from `kqueue
<https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2>`_ /
`epoll <http://man7.org/linux/man-pages/man7/epoll.7.html>`_ to
`poll() <http://man7.org/linux/man-pages/man2/poll.2.html>`_, which requires
no setup or teardown, yielding a 30% latency reduction for inter-thread no setup or teardown, yielding a 30% latency reduction for inter-thread
communication. This may manifest as a runtime improvement in many-host runs. communication and a 50% reduction in context switches. This will manifest as
a runtime improvement in many-host runs.
Fixes Fixes
@ -294,6 +298,10 @@ Core Library
was configured. This would lead to connection timeouts due to the hung was configured. This would lead to connection timeouts due to the hung
response. response.
* `#414 <https://github.com/dw/mitogen/issues/414>`_,
`#425 <https://github.com/dw/mitogen/issues/425>`_: avoid deadlock of forked
children by reinitializing the :mod:`mitogen.service` pool lock.
* `#416 <https://github.com/dw/mitogen/issues/416>`_: around 1.4KiB of memory * `#416 <https://github.com/dw/mitogen/issues/416>`_: around 1.4KiB of memory
was leaked on every RPC, due to a list of strong references keeping alive any was leaked on every RPC, due to a list of strong references keeping alive any
handler ever registered for disconnect notification. handler ever registered for disconnect notification.
@ -390,22 +398,23 @@ bug reports, testing, features and fixes in this release contributed by
`Andreas Krüger <https://github.com/woopstar>`_, `Andreas Krüger <https://github.com/woopstar>`_,
`Berend De Schouwer <https://github.com/berenddeschouwer>`_, `Berend De Schouwer <https://github.com/berenddeschouwer>`_,
`Brian Candler <https://github.com/candlerb>`_, `Brian Candler <https://github.com/candlerb>`_,
`dsgnr <https://github.com/dsgnr>`_,
`Duane Zamrok <https://github.com/dewthefifth>`_, `Duane Zamrok <https://github.com/dewthefifth>`_,
`Eric Chang <https://github.com/changchichung>`_, `Eric Chang <https://github.com/changchichung>`_,
`Gerben Meijer <https://github.com/infernix>`_, `Gerben Meijer <https://github.com/infernix>`_,
`Guy Knights <https://github.com/knightsg>`_, `Guy Knights <https://github.com/knightsg>`_,
`Jesse London <https://github.com/jesteria>`_, `Jesse London <https://github.com/jesteria>`_,
`Jiří Vávra <https://github.com/Houbovo>`_, `Jiří Vávra <https://github.com/Houbovo>`_,
`Jonathan Rosser <https://github.com/jrosser>`_,
`Johan Beisser <https://github.com/jbeisser>`_, `Johan Beisser <https://github.com/jbeisser>`_,
`Jonathan Rosser <https://github.com/jrosser>`_,
`Josh Smift <https://github.com/jbscare>`_, `Josh Smift <https://github.com/jbscare>`_,
`Mehdi <https://github.com/mehdisat7>`_, `Mehdi <https://github.com/mehdisat7>`_,
`Michael DeHaan <https://github.com/mpdehaan>`_, `Michael DeHaan <https://github.com/mpdehaan>`_,
`Mohammed Naser <https://github.com/mnaser/>`_, `Mohammed Naser <https://github.com/mnaser/>`_,
`Peter V. Saveliev <https://github.com/svinota/>`_, `Peter V. Saveliev <https://github.com/svinota/>`_,
`Stéphane <https://github.com/sboisson/>`_, `Stéphane <https://github.com/sboisson/>`_,
`@whky <https://github.com/whky/>`_,
`@syntonym <https://github.com/syntonym/>`_, `@syntonym <https://github.com/syntonym/>`_,
`@whky <https://github.com/whky/>`_,
`@yodatak <https://github.com/yodatak/>`_, and `@yodatak <https://github.com/yodatak/>`_, and
`Younès HAFRI <https://github.com/yhafri>`_. `Younès HAFRI <https://github.com/yhafri>`_.

@ -0,0 +1,46 @@
# Wire up a ping/pong counting loop between 2 subprocesses.
from __future__ import print_function
import mitogen.core
import mitogen.select
@mitogen.core.takes_router
def ping_pong(control_sender, router):
with mitogen.core.Receiver(router) as recv:
# Tell caller how to communicate with us.
control_sender.send(recv.to_sender())
# Wait for caller to tell us how to talk back:
data_sender = recv.get().unpickle()
n = 0
while (n + 1) < 30:
n = recv.get().unpickle()
print('the number is currently', n)
data_sender.send(n + 1)
@mitogen.main()
def main(router):
# Create a receiver for control messages.
with mitogen.core.Receiver(router) as recv:
# Start ping_pong() in child 1 and fetch its sender.
c1 = router.local()
c1_call = c1.call_async(ping_pong, recv.to_sender())
c1_sender = recv.get().unpickle()
# Start ping_pong() in child 2 and fetch its sender.
c2 = router.local()
c2_call = c2.call_async(ping_pong, recv.to_sender())
c2_sender = recv.get().unpickle()
# Tell the children about each others' senders.
c1_sender.send(c2_sender)
c2_sender.send(c1_sender)
# Start the loop.
c1_sender.send(0)
# Wait for both functions to return.
mitogen.select.Select.all([c1_call, c2_call])

@ -714,7 +714,7 @@ class Message(object):
class Sender(object): class Sender(object):
""" """
Senders are used to send pickled messages to a handle in another context, Senders are used to send pickled messages to a handle in another context,
it is the inverse of :class:`mitogen.core.Sender`. it is the inverse of :class:`mitogen.core.Receiver`.
Senders may be serialized, making them convenient to wire up data flows. Senders may be serialized, making them convenient to wire up data flows.
See :meth:`mitogen.core.Receiver.to_sender` for more information. See :meth:`mitogen.core.Receiver.to_sender` for more information.
@ -785,10 +785,12 @@ class Receiver(object):
:param mitogen.core.Context respondent: :param mitogen.core.Context respondent:
Context this receiver is receiving from. If not :data:`None`, arranges Context this receiver is receiving from. If not :data:`None`, arranges
for the receiver to receive a dead message if messages can no longer be for the receiver to receive a dead message if messages can no longer be
routed to the context, due to disconnection or exit. routed to the context due to disconnection, and ignores messages that
did not originate from the respondent context.
""" """
#: If not :data:`None`, a reference to a function invoked as #: If not :data:`None`, a reference to a function invoked as
#: `notify(receiver)` when a new message is delivered to this receiver. #: `notify(receiver)` when a new message is delivered to this receiver. The
#: function is invoked on the broker thread, therefore it must not block.
#: Used by :class:`mitogen.select.Select` to implement waiting on multiple #: Used by :class:`mitogen.select.Select` to implement waiting on multiple
#: receivers. #: receivers.
notify = None notify = None
@ -813,6 +815,12 @@ class Receiver(object):
def __repr__(self): def __repr__(self):
return 'Receiver(%r, %r)' % (self.router, self.handle) return 'Receiver(%r, %r)' % (self.router, self.handle)
def __enter__(self):
return self
def __exit__(self, _1, _2, _3):
self.close()
def to_sender(self): def to_sender(self):
""" """
Return a :class:`Sender` configured to deliver messages to this Return a :class:`Sender` configured to deliver messages to this
@ -824,18 +832,20 @@ class Receiver(object):
sender.send(line) sender.send(line)
sender.close() sender.close()
@mitogen.main()
def main(router):
remote = router.ssh(hostname='mainframe') remote = router.ssh(hostname='mainframe')
recv = mitogen.core.Receiver(router) recv = mitogen.core.Receiver(router)
remote.call(deliver_monthly_report, recv.to_sender()) remote.call(deliver_monthly_report, recv.to_sender())
for msg in recv: for msg in recv:
print(msg) print(msg)
""" """
context = Context(self.router, mitogen.context_id) return Sender(self.router.myself(), self.handle)
return Sender(context, self.handle)
def _on_receive(self, msg): def _on_receive(self, msg):
""" """
Callback from the Stream; appends data to the internal queue. Callback registered for the handle with :class:`Router`; appends data
to the internal queue.
""" """
_vv and IOLOG.debug('%r._on_receive(%r)', self, msg) _vv and IOLOG.debug('%r._on_receive(%r)', self, msg)
self._latch.put(msg) self._latch.put(msg)
@ -873,15 +883,15 @@ class Receiver(object):
If not :data:`None`, specifies a timeout in seconds. If not :data:`None`, specifies a timeout in seconds.
:raises mitogen.core.ChannelError: :raises mitogen.core.ChannelError:
The remote end indicated the channel should be closed, or The remote end indicated the channel should be closed,
communication with its parent context was lost. communication with it was lost, or :meth:`close` was called in the
local process.
:raises mitogen.core.TimeoutError: :raises mitogen.core.TimeoutError:
Timeout was reached. Timeout was reached.
:returns: :returns:
`(msg, data)` tuple, where `msg` is the :class:`Message` that was :class:`Message` that was received.
received, and `data` is its unpickled data part.
""" """
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
try: try:
@ -910,6 +920,13 @@ class Channel(Sender, Receiver):
A channel inherits from :class:`mitogen.core.Sender` and A channel inherits from :class:`mitogen.core.Sender` and
`mitogen.core.Receiver` to provide bidirectional functionality. `mitogen.core.Receiver` to provide bidirectional functionality.
This class is incomplete and obsolete, it will be removed in Mitogen 0.3.
Channels were an early attempt at syntax sugar. It is always easier to pass
around unidirectional pairs of senders/receivers, even though the syntax is
baroque:
.. literalinclude:: ../examples/ping_pong.py
Since all handles aren't known until after both ends are constructed, for Since all handles aren't known until after both ends are constructed, for
both ends to communicate through a channel, it is necessary for one end to both ends to communicate through a channel, it is necessary for one end to
retrieve the handle allocated to the other and reconfigure its own channel retrieve the handle allocated to the other and reconfigure its own channel

@ -85,6 +85,10 @@ def on_fork():
mitogen.core.Latch._on_fork() mitogen.core.Latch._on_fork()
mitogen.core.Side._on_fork() mitogen.core.Side._on_fork()
mitogen__service = sys.modules.get('mitogen.service')
if mitogen__service:
mitogen__service._pool_lock = threading.Lock()
def handle_child_crash(): def handle_child_crash():
""" """

@ -1,4 +1,4 @@
# - import_playbook: multiple_items_loop.yml - import_playbook: multiple_items_loop.yml
- import_playbook: result_binary_producing_json.yml - import_playbook: result_binary_producing_json.yml
- import_playbook: result_binary_producing_junk.yml - import_playbook: result_binary_producing_junk.yml
- import_playbook: result_shell_echo_hi.yml - import_playbook: result_shell_echo_hi.yml

@ -146,5 +146,14 @@ class OnReceiveTest(testlib.RouterMixin, testlib.TestCase):
# TODO: what happens to a Select subscribed to the receiver in this case? # TODO: what happens to a Select subscribed to the receiver in this case?
class ToSenderTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.core.Receiver
def test_returned_context(self):
myself = self.router.myself()
recv = self.klass(self.router)
self.assertEquals(myself, recv.to_sender().context)
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

@ -212,6 +212,14 @@ class AddHandlerTest(testlib.TestCase):
router.broker.join() router.broker.join()
class MyselfTest(testlib.RouterMixin, testlib.TestCase):
def test_myself(self):
myself = self.router.myself()
self.assertEquals(myself.context_id, mitogen.context_id)
# TODO: context should know its own name too.
self.assertEquals(myself.name, 'self')
class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase): class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
klass = mitogen.master.Router klass = mitogen.master.Router

Loading…
Cancel
Save