From 388649df9780c71d62ef8f990c723c7d9a13a9ae Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 19 Jan 2019 16:08:44 +0000 Subject: [PATCH] core: Receiver.close() now wakes all threads; closes #446. --- docs/changelog.rst | 6 ++++++ mitogen/core.py | 7 +++++-- tests/receiver_test.py | 40 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 715d6383..567ea382 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -304,6 +304,12 @@ Core Library have dead messages sent in reply to them, preventing peer contexts from hanging due to a forgotten buffered message. +* `#446 `_: given thread A calling + :meth:`mitogen.core.Receiver.close`, and thread B, C, and D sleeping in + :meth:`mitogen.core.Receiver.get`, previously only one sleeping thread would + be woken with :class:`mitogen.core.ChannelError` when the receiver was + closed. Now all threads are woken per the docstring. + * `#447 `_: duplicate attempts to invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised, ensuring accidental re-registration of service pools are reported correctly. diff --git a/mitogen/core.py b/mitogen/core.py index dad472d7..fb182e3c 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -848,7 +848,7 @@ class Receiver(object): if self.handle: self.router.del_handler(self.handle) self.handle = None - self._latch.put(Message.dead(self.closed_msg)) + self._latch.close() def empty(self): """ @@ -879,7 +879,10 @@ class Receiver(object): received, and `data` is its unpickled data part. """ _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) - msg = self._latch.get(timeout=timeout, block=block) + try: + msg = self._latch.get(timeout=timeout, block=block) + except LatchError: + raise ChannelError(self.closed_msg) if msg.is_dead and throw_dead: msg._throw_dead() return msg diff --git a/tests/receiver_test.py b/tests/receiver_test.py index 5dd6273a..75f61b40 100644 --- a/tests/receiver_test.py +++ b/tests/receiver_test.py @@ -1,4 +1,6 @@ +import sys +import threading import unittest2 import mitogen.core @@ -36,5 +38,43 @@ class IterationTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(10, ret.get().unpickle()) +class CloseTest(testlib.RouterMixin, testlib.TestCase): + def wait(self, latch, wait_recv): + try: + latch.put(wait_recv.get()) + except Exception: + latch.put(sys.exc_info()[1]) + + def test_closes_one(self): + latch = mitogen.core.Latch() + wait_recv = mitogen.core.Receiver(self.router) + t = threading.Thread(target=lambda: self.wait(latch, wait_recv)) + t.start() + wait_recv.close() + def throw(): + raise latch.get() + t.join() + e = self.assertRaises(mitogen.core.ChannelError, throw) + self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg) + + def test_closes_all(self): + latch = mitogen.core.Latch() + wait_recv = mitogen.core.Receiver(self.router) + ts = [ + threading.Thread(target=lambda: self.wait(latch, wait_recv)) + for x in range(5) + ] + for t in ts: + t.start() + wait_recv.close() + def throw(): + raise latch.get() + for x in range(5): + e = self.assertRaises(mitogen.core.ChannelError, throw) + self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg) + for t in ts: + t.join() + + if __name__ == '__main__': unittest2.main()