diff --git a/docs/changelog.rst b/docs/changelog.rst index 715d6383..567ea382 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -304,6 +304,12 @@ Core Library have dead messages sent in reply to them, preventing peer contexts from hanging due to a forgotten buffered message. +* `#446 `_: given thread A calling + :meth:`mitogen.core.Receiver.close`, and thread B, C, and D sleeping in + :meth:`mitogen.core.Receiver.get`, previously only one sleeping thread would + be woken with :class:`mitogen.core.ChannelError` when the receiver was + closed. Now all threads are woken per the docstring. + * `#447 `_: duplicate attempts to invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised, ensuring accidental re-registration of service pools are reported correctly. diff --git a/mitogen/core.py b/mitogen/core.py index dad472d7..8c03145f 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -742,7 +742,8 @@ class Sender(object): self.context.send( Message.dead( reason=self.explicit_close_msg, - handle=self.dst_handle) + handle=self.dst_handle + ) ) def __repr__(self): @@ -848,7 +849,7 @@ class Receiver(object): if self.handle: self.router.del_handler(self.handle) self.handle = None - self._latch.put(Message.dead(self.closed_msg)) + self._latch.close() def empty(self): """ @@ -879,7 +880,10 @@ class Receiver(object): received, and `data` is its unpickled data part. """ _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) - msg = self._latch.get(timeout=timeout, block=block) + try: + msg = self._latch.get(timeout=timeout, block=block) + except LatchError: + raise ChannelError(self.closed_msg) if msg.is_dead and throw_dead: msg._throw_dead() return msg @@ -1881,8 +1885,17 @@ class Latch(object): though a subsequent call to :meth:`get` will block, since another waiting thread may be woken at any moment between :meth:`empty` and :meth:`get`. + + :raises LatchError: + The latch has already been marked closed. """ - return len(self._queue) == 0 + self._lock.acquire() + try: + if self.closed: + raise LatchError() + return len(self._queue) == 0 + finally: + self._lock.release() def _get_socketpair(self): """ diff --git a/mitogen/unix.py b/mitogen/unix.py index 12182a28..44139ae5 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -66,6 +66,13 @@ def make_socket_path(): class Listener(mitogen.core.BasicStream): keep_alive = True + def __repr__(self): + return '%s.%s(%r)' % ( + __name__, + self.__class__.__name__, + self.path, + ) + def __init__(self, router, path=None, backlog=100): self._router = router self.path = path or make_socket_path() diff --git a/tests/latch_test.py b/tests/latch_test.py index 5be12d73..6ae43221 100644 --- a/tests/latch_test.py +++ b/tests/latch_test.py @@ -21,6 +21,13 @@ class EmptyTest(testlib.TestCase): latch.put(None) self.assertTrue(not latch.empty()) + def test_closed_is_empty(self): + latch = self.klass() + latch.put(None) + latch.close() + self.assertRaises(mitogen.core.LatchError, + lambda: latch.empty()) + class GetTest(testlib.TestCase): klass = mitogen.core.Latch diff --git a/tests/receiver_test.py b/tests/receiver_test.py index 5dd6273a..550b4525 100644 --- a/tests/receiver_test.py +++ b/tests/receiver_test.py @@ -1,4 +1,6 @@ +import sys +import threading import unittest2 import mitogen.core @@ -36,5 +38,92 @@ class IterationTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(10, ret.get().unpickle()) +class CloseTest(testlib.RouterMixin, testlib.TestCase): + def wait(self, latch, wait_recv): + try: + latch.put(wait_recv.get()) + except Exception: + latch.put(sys.exc_info()[1]) + + def test_closes_one(self): + latch = mitogen.core.Latch() + wait_recv = mitogen.core.Receiver(self.router) + t = threading.Thread(target=lambda: self.wait(latch, wait_recv)) + t.start() + wait_recv.close() + def throw(): + raise latch.get() + t.join() + e = self.assertRaises(mitogen.core.ChannelError, throw) + self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg) + + def test_closes_all(self): + latch = mitogen.core.Latch() + wait_recv = mitogen.core.Receiver(self.router) + ts = [ + threading.Thread(target=lambda: self.wait(latch, wait_recv)) + for x in range(5) + ] + for t in ts: + t.start() + wait_recv.close() + def throw(): + raise latch.get() + for x in range(5): + e = self.assertRaises(mitogen.core.ChannelError, throw) + self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg) + for t in ts: + t.join() + + +class OnReceiveTest(testlib.RouterMixin, testlib.TestCase): + # Verify behaviour of _on_receive dead message handling. A dead message + # should unregister the receiver and wake all threads. + + def wait(self, latch, wait_recv): + try: + latch.put(wait_recv.get()) + except Exception: + latch.put(sys.exc_info()[1]) + + def test_sender_closes_one_thread(self): + latch = mitogen.core.Latch() + wait_recv = mitogen.core.Receiver(self.router) + t = threading.Thread(target=lambda: self.wait(latch, wait_recv)) + t.start() + sender = wait_recv.to_sender() + sender.close() + def throw(): + raise latch.get() + t.join() + e = self.assertRaises(mitogen.core.ChannelError, throw) + self.assertEquals(e.args[0], sender.explicit_close_msg) + + @unittest2.skip(reason=( + 'Unclear if a asingle dead message received from remote should ' + 'cause all threads to wake up.' + )) + def test_sender_closes_all_threads(self): + latch = mitogen.core.Latch() + wait_recv = mitogen.core.Receiver(self.router) + ts = [ + threading.Thread(target=lambda: self.wait(latch, wait_recv)) + for x in range(5) + ] + for t in ts: + t.start() + sender = wait_recv.to_sender() + sender.close() + def throw(): + raise latch.get() + for x in range(5): + e = self.assertRaises(mitogen.core.ChannelError, throw) + self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg) + for t in ts: + t.join() + + # TODO: what happens to a Select subscribed to the receiver in this case? + + if __name__ == '__main__': unittest2.main()