diff --git a/tests/parent_test.py b/tests/parent_test.py index 797845df..0bd8079c 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -31,8 +31,25 @@ def wait_for_child(pid, timeout=1.0): @mitogen.core.takes_econtext -def call_func_in_sibling(ctx, econtext): - ctx.call(time.sleep, 99999) +def call_func_in_sibling(ctx, econtext, sync_sender): + recv = ctx.call_async(time.sleep, 99999) + sync_sender.send(None) + recv.get().unpickle() + + +def wait_for_empty_output_queue(sync_recv, context): + # wait for sender to submit their RPC. Since the RPC is sent first, the + # message sent to this sender cannot arrive until we've routed the RPC. + sync_recv.get() + + router = context.router + broker = router.broker + while True: + # Now wait for the RPC to exit the output queue. + stream = router.stream_by_id(context.context_id) + if broker.defer_sync(lambda: stream.pending_bytes()) == 0: + return + time.sleep(0.1) class GetDefaultRemoteNameTest(testlib.TestCase): @@ -353,12 +370,17 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase): self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id c1.call(mitogen.parent.upgrade_router) - recv = c1.call_async(call_func_in_sibling, c2) + sync_recv = mitogen.core.Receiver(self.router) + recv = c1.call_async(call_func_in_sibling, c2, + sync_sender=sync_recv.to_sender()) + + wait_for_empty_output_queue(sync_recv, c2) c2.shutdown(wait=True) + e = self.assertRaises(mitogen.core.CallError, lambda: recv.get().unpickle()) s = 'mitogen.core.ChannelError: ' + self.router.respondent_disconnect_msg - self.assertTrue(e.args[0].startswith(s)) + self.assertTrue(e.args[0].startswith(s), str(e)) def test_far_sibling_disconnected(self): # God mode: child of child notices child of child of parent has @@ -373,8 +395,13 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase): self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id c11.call(mitogen.parent.upgrade_router) - recv = c11.call_async(call_func_in_sibling, c22) + sync_recv = mitogen.core.Receiver(self.router) + recv = c11.call_async(call_func_in_sibling, c22, + sync_sender=sync_recv.to_sender()) + + wait_for_empty_output_queue(sync_recv, c22) c22.shutdown(wait=True) + e = self.assertRaises(mitogen.core.CallError, lambda: recv.get().unpickle()) s = 'mitogen.core.ChannelError: ' + self.router.respondent_disconnect_msg