import sys import threading import unittest import mitogen.core import testlib def yield_stuff_then_die(sender): for x in range(5): sender.send(x) sender.close() return 10 class ConstructorTest(testlib.RouterMixin, testlib.TestCase): def test_handle(self): recv = mitogen.core.Receiver(self.router) self.assertIsInstance(recv.handle, int) self.assertGreater(recv.handle, 100) self.router.route( mitogen.core.Message.pickled( 'hi', dst_id=0, handle=recv.handle, ) ) self.assertEqual('hi', recv.get().unpickle()) class IterationTest(testlib.RouterMixin, testlib.TestCase): def test_dead_stops_iteration(self): recv = mitogen.core.Receiver(self.router) fork = self.router.local() ret = fork.call_async(yield_stuff_then_die, recv.to_sender()) self.assertEqual(list(range(5)), list(m.unpickle() for m in recv)) self.assertEqual(10, ret.get().unpickle()) def iter_and_put(self, recv, latch): try: for msg in recv: latch.put(msg) except Exception: latch.put(sys.exc_info()[1]) def test_close_stops_iteration(self): recv = mitogen.core.Receiver(self.router) latch = mitogen.core.Latch() t = threading.Thread( target=self.iter_and_put, args=(recv, latch), ) t.start() t.join(0.1) recv.close() t.join() self.assertTrue(latch.empty()) class CloseTest(testlib.RouterMixin, testlib.TestCase): def wait(self, latch, wait_recv): try: latch.put(wait_recv.get()) except Exception: latch.put(sys.exc_info()[1]) def test_closes_one(self): latch = mitogen.core.Latch() wait_recv = mitogen.core.Receiver(self.router) t = threading.Thread(target=lambda: self.wait(latch, wait_recv)) t.start() wait_recv.close() def throw(): raise latch.get() t.join() e = self.assertRaises(mitogen.core.ChannelError, throw) self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg) def test_closes_all(self): latch = mitogen.core.Latch() wait_recv = mitogen.core.Receiver(self.router) ts = [ threading.Thread(target=lambda: self.wait(latch, wait_recv)) for x in range(5) ] for t in ts: t.start() wait_recv.close() def throw(): raise latch.get() for x in range(5): e = self.assertRaises(mitogen.core.ChannelError, throw) self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg) for t in ts: t.join() class OnReceiveTest(testlib.RouterMixin, testlib.TestCase): # Verify behaviour of _on_receive dead message handling. A dead message # should unregister the receiver and wake all threads. def wait(self, latch, wait_recv): try: latch.put(wait_recv.get()) except Exception: latch.put(sys.exc_info()[1]) def test_sender_closes_one_thread(self): latch = mitogen.core.Latch() wait_recv = mitogen.core.Receiver(self.router) t = threading.Thread(target=lambda: self.wait(latch, wait_recv)) t.start() sender = wait_recv.to_sender() sender.close() def throw(): raise latch.get() t.join() e = self.assertRaises(mitogen.core.ChannelError, throw) self.assertEqual(e.args[0], sender.explicit_close_msg) @unittest.skip(reason=( 'Unclear if a asingle dead message received from remote should ' 'cause all threads to wake up.' )) def test_sender_closes_all_threads(self): latch = mitogen.core.Latch() wait_recv = mitogen.core.Receiver(self.router) ts = [ threading.Thread(target=lambda: self.wait(latch, wait_recv)) for x in range(5) ] for t in ts: t.start() sender = wait_recv.to_sender() sender.close() def throw(): raise latch.get() for x in range(5): e = self.assertRaises(mitogen.core.ChannelError, throw) self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg) for t in ts: t.join() # TODO: what happens to a Select subscribed to the receiver in this case? class ToSenderTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.core.Receiver def test_returned_context(self): myself = self.router.myself() recv = self.klass(self.router) self.assertEqual(myself, recv.to_sender().context)