"""
Tests for mitogen's Router: source verification of incoming messages, handler
policies, broker crash handling, handler registration, message size limits,
dead-message generation for unroutable messages, unidirectional routing and
stream egress ID tracking.
"""

import logging
import subprocess
import time
import zlib

import unittest2

import testlib
import mitogen.core
import mitogen.master
import mitogen.parent
import mitogen.utils

try:
    import Queue
except ImportError:
    import queue as Queue


def ping():
    """Trivial call target used by the tests; always returns True."""
    return True


@mitogen.core.takes_router
def ping_context(other, router):
    """
    Call :func:`ping` in the context identified by `other.context_id`, using a
    Context object rebuilt around the router of the process this runs in.
    """
    other = mitogen.parent.Context(router, other.context_id)
    other.call(ping)


@mitogen.core.takes_router
def return_router_max_message_size(router):
    """Return the `max_message_size` of the router this function runs under."""
    return router.max_message_size


def send_n_sized_reply(sender, n):
    """Send a string of `n` spaces via `sender`, then return 123."""
    sender.send(' ' * n)
    return 123


class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase):
    """Messages with a forged auth_id or src_id must be dropped and logged."""

    def setUp(self):
        super(SourceVerifyTest, self).setUp()
        # Create some children, ping them, and store what their messages look
        # like so we can mess with them later.
        self.child1 = self.router.fork()
        self.child1_msg = self.child1.call_async(ping).get()
        self.child1_stream = self.router._stream_by_id[self.child1.context_id]

        self.child2 = self.router.fork()
        self.child2_msg = self.child2.call_async(ping).get()
        self.child2_stream = self.router._stream_by_id[self.child2.context_id]

    def test_bad_auth_id(self):
        # Deliver a message locally from child2, but using child1's stream.
        log = testlib.LogCapturer()
        log.start()

        # Used to ensure the message was dropped rather than routed after the
        # error is logged.
        recv = mitogen.core.Receiver(self.router)
        self.child2_msg.handle = recv.handle

        self.broker.defer(self.router._async_route,
                          self.child2_msg,
                          in_stream=self.child1_stream)

        # Wait for IO loop to finish everything above.
        self.sync_with_broker()

        # Ensure message wasn't forwarded.
        self.assertTrue(recv.empty())

        # Ensure error was logged.
        expect = 'bad auth_id: got %d via' % (self.child2_msg.auth_id,)
        self.assertIn(expect, log.stop())

    def test_bad_src_id(self):
        # Deliver a message locally from child2 with the correct auth_id, but
        # the wrong src_id.
        log = testlib.LogCapturer()
        log.start()

        # Used to ensure the message was dropped rather than routed after the
        # error is logged.
        recv = mitogen.core.Receiver(self.router)
        self.child2_msg.handle = recv.handle
        self.child2_msg.src_id = self.child1.context_id

        self.broker.defer(self.router._async_route,
                          self.child2_msg,
                          self.child2_stream)

        # Wait for IO loop to finish everything above.
        self.sync_with_broker()

        # Ensure message wasn't forwarded.
        self.assertTrue(recv.empty())

        # Ensure error was logged.
        expect = 'bad src_id: got %d via' % (self.child1_msg.src_id,)
        self.assertIn(expect, log.stop())


class PolicyTest(testlib.RouterMixin, testlib.TestCase):
    """Receiver policies decide whether a routed message is delivered."""

    def test_allow_any(self):
        # This guy gets everything.
        recv = mitogen.core.Receiver(self.router)
        recv.to_sender().send(123)
        self.sync_with_broker()
        self.assertFalse(recv.empty())
        self.assertEqual(123, recv.get().unpickle())

    def test_refuse_all(self):
        # Route a message to a receiver whose policy refuses everything, and
        # check the refusal is logged and reported to the reply_to handle.
        log = testlib.LogCapturer()
        log.start()

        # This guy never gets anything.
        recv = mitogen.core.Receiver(
            router=self.router,
            policy=(lambda msg, stream: False),
        )

        # This guy becomes the reply_to of our refused message.
        reply_target = mitogen.core.Receiver(self.router)

        # Send the message.
        self.router.route(
            mitogen.core.Message(
                dst_id=mitogen.context_id,
                handle=recv.handle,
                reply_to=reply_target.handle,
            )
        )

        # Wait for IO loop.
        self.sync_with_broker()

        # Verify log.
        self.assertIn(self.router.refused_msg, log.stop())

        # Verify message was not delivered.
        self.assertTrue(recv.empty())

        # Verify ChannelError received by reply_to target.
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: reply_target.get().unpickle())
        self.assertEqual(e.args[0], self.router.refused_msg)


class CrashTest(testlib.BrokerMixin, testlib.TestCase):
    # This is testing both Broker's ability to crash nicely, and Router's
    # ability to respond to the crash event.
    klass = mitogen.master.Router

    def _naughty(self):
        # Stand-in for Broker._loop_once that always fails.
        raise ValueError('eek')

    def test_shutdown(self):
        router = self.klass(self.broker)

        sem = mitogen.core.Latch()
        router.add_handler(sem.put)

        log = testlib.LogCapturer('mitogen')
        log.start()

        # Force a crash and ensure it wakes up.
        self.broker._loop_once = self._naughty
        self.broker.defer(lambda: None)

        # sem should have received dead message.
        self.assertTrue(sem.get().is_dead)

        # Ensure it was logged.
        expect = '_broker_main() crashed'
        self.assertIn(expect, log.stop())


class AddHandlerTest(testlib.TestCase):
    """Handler registration, removal and shutdown notification."""

    klass = mitogen.master.Router

    def test_dead_message_sent_at_shutdown(self):
        router = self.klass()
        queue = Queue.Queue()
        router.add_handler(queue.put)
        router.broker.shutdown()
        self.assertTrue(queue.get(timeout=5).is_dead)
        router.broker.join()

    def test_cannot_double_register(self):
        router = self.klass()
        try:
            router.add_handler((lambda: None), handle=1234)
            e = self.assertRaises(mitogen.core.Error,
                lambda: router.add_handler((lambda: None), handle=1234))
            self.assertEqual(router.duplicate_handle_msg, e.args[0])
            router.del_handler(1234)
        finally:
            router.broker.shutdown()
            router.broker.join()

    def test_can_reregister(self):
        router = self.klass()
        try:
            router.add_handler((lambda: None), handle=1234)
            router.del_handler(1234)
            router.add_handler((lambda: None), handle=1234)
            router.del_handler(1234)
        finally:
            router.broker.shutdown()
            router.broker.join()


class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
    """Behaviour of a router constructed with max_message_size=4096."""

    klass = mitogen.master.Router

    def test_local_exceeded(self):
        router = self.klass(broker=self.broker, max_message_size=4096)

        logs = testlib.LogCapturer()
        logs.start()

        # Send message and block for one IO loop, so _async_route can run.
        router.route(mitogen.core.Message.pickled(' '*8192))
        router.broker.defer_sync(lambda: None)

        expect = 'message too large (max 4096 bytes)'
        self.assertIn(expect, logs.stop())

    def test_local_dead_message(self):
        # Local router should generate dead message when reply_to is set.
        router = self.klass(broker=self.broker, max_message_size=4096)

        logs = testlib.LogCapturer()
        logs.start()

        expect = router.too_large_msg % (4096,)

        # Try function call. Receiver should be woken by a dead message sent by
        # router due to message size exceeded.
        child = router.fork()
        e = self.assertRaises(mitogen.core.ChannelError,
            lambda: child.call(zlib.crc32, ' '*8192))
        self.assertEqual(e.args[0], expect)

        self.assertIn(expect, logs.stop())

    def test_remote_configured(self):
        # Ensure new contexts receive a router with the same value.
        router = self.klass(broker=self.broker, max_message_size=4096)
        remote = router.fork()
        size = remote.call(return_router_max_message_size)
        self.assertEqual(size, 4096)

    def test_remote_exceeded(self):
        # Ensure an oversized message sent by a child towards a local receiver
        # shows up in the captured logs as too large.
        router = self.klass(broker=self.broker, max_message_size=4096)
        recv = mitogen.core.Receiver(router)

        logs = testlib.LogCapturer()
        logs.start()

        remote = router.fork()
        remote.call(send_n_sized_reply, recv.to_sender(), 8192)

        expect = 'message too large (max 4096 bytes)'
        self.assertIn(expect, logs.stop())


class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
    """Undeliverable messages must be answered with a dead message."""

    def test_invalid_handle_returns_dead(self):
        # Verify sending a message to an invalid handle yields a dead message
        # from the target context.
        l1 = self.router.fork()
        recv = l1.send_async(mitogen.core.Message(handle=999))
        msg = recv.get(throw_dead=False)
        self.assertEqual(msg.is_dead, True)
        self.assertEqual(msg.src_id, l1.context_id)
        self.assertEqual(msg.data, self.router.invalid_handle_msg.encode())

        # Same again, but let get() raise for the dead message.
        recv = l1.send_async(mitogen.core.Message(handle=999))
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: recv.get())
        self.assertEqual(e.args[0], self.router.invalid_handle_msg)

    def test_totally_invalid_context_returns_dead(self):
        recv = mitogen.core.Receiver(self.router)
        msg = mitogen.core.Message(
            dst_id=1234,
            handle=1234,
            reply_to=recv.handle,
        )
        self.router.route(msg)
        rmsg = recv.get(throw_dead=False)
        self.assertEqual(rmsg.is_dead, True)
        self.assertEqual(rmsg.src_id, mitogen.context_id)
        self.assertEqual(rmsg.data, (self.router.no_route_msg % (
            1234,
            mitogen.context_id,
        )).encode())

        # Same again, but let get() raise for the dead message.
        self.router.route(msg)
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: recv.get())
        self.assertEqual(e.args[0], (self.router.no_route_msg % (
            1234,
            mitogen.context_id,
        )))

    def test_previously_alive_context_returns_dead(self):
        l1 = self.router.fork()
        l1.shutdown(wait=True)
        recv = mitogen.core.Receiver(self.router)
        msg = mitogen.core.Message(
            dst_id=l1.context_id,
            handle=mitogen.core.CALL_FUNCTION,
            reply_to=recv.handle,
        )
        self.router.route(msg)
        rmsg = recv.get(throw_dead=False)
        self.assertEqual(rmsg.is_dead, True)
        self.assertEqual(rmsg.src_id, mitogen.context_id)
        self.assertEqual(rmsg.data, (self.router.no_route_msg % (
            l1.context_id,
            mitogen.context_id,
        )).encode())

        # Same again, but let get() raise for the dead message.
        self.router.route(msg)
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: recv.get())
        self.assertEqual(e.args[0], self.router.no_route_msg % (
            l1.context_id,
            mitogen.context_id,
        ))


class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
    """Behaviour of sibling-to-sibling calls when router.unidirectional is set."""

    def test_siblings_cant_talk(self):
        self.router.unidirectional = True
        l1 = self.router.fork()
        l2 = self.router.fork()
        logs = testlib.LogCapturer()
        logs.start()
        e = self.assertRaises(mitogen.core.CallError,
                              lambda: l2.call(ping_context, l1))

        msg = self.router.unidirectional_msg % (
            l2.context_id,
            l1.context_id,
        )
        self.assertIn(msg, str(e))
        self.assertIn('routing mode prevents forward of ', logs.stop())

    def test_auth_id_can_talk(self):
        self.router.unidirectional = True
        # One stream has auth_id stamped to that of the master, so it should be
        # treated like a parent.
        l1 = self.router.fork()
        l1s = self.router.stream_by_id(l1.context_id)
        l1s.auth_id = mitogen.context_id
        l1s.is_privileged = True

        l2 = self.router.fork()
        e = self.assertRaises(mitogen.core.CallError,
                              lambda: l2.call(ping_context, l1))

        msg = 'mitogen.core.ChannelError: %s' % (self.router.refused_msg,)
        self.assertTrue(str(e).startswith(msg))


class EgressIdsTest(testlib.RouterMixin, testlib.TestCase):
    """Stream.egress_ids bookkeeping."""

    def test_egress_ids_populated(self):
        # Ensure Stream.egress_ids is populated on message reception.
        c1 = self.router.fork()
        stream = self.router.stream_by_id(c1.context_id)
        self.assertEqual(set(), stream.egress_ids)

        c1.call(time.sleep, 0)
        self.assertEqual(set([mitogen.context_id]), stream.egress_ids)


if __name__ == '__main__':
    unittest2.main()