diff --git a/mitogen/core.py b/mitogen/core.py index 45a7f379..f9099e9a 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2112,8 +2112,8 @@ class MitogenProtocol(Protocol): return False if msg_len > self._router.max_message_size: - LOG.error('Maximum message size exceeded (got %d, max %d)', - msg_len, self._router.max_message_size) + LOG.error('%r: Maximum message size exceeded (got %d, max %d)', + self, msg_len, self._router.max_message_size) self.stream.on_disconnect(broker) return False @@ -3191,28 +3191,55 @@ class Router(object): fn(Message.dead(self.respondent_disconnect_msg)) del self._handle_map[handle] - def _maybe_send_dead(self, msg, reason, *args): + def _maybe_send_dead(self, unreachable, msg, reason, *args): + """ + Send a dead message to either the original sender or the intended + recipient of `msg`, if the original sender was expecting a reply + (because its `reply_to` was set), otherwise assume the message is a + reply of some sort, and send the dead message to the original + destination. + + :param bool unreachable: + If :data:`True`, the recipient is known to be dead or routing + failed due to a security precaution, so don't attempt to fallback + to sending the dead message to the recipient if the original sender + did not include a reply address. + :param mitogen.core.Message msg: + Message that triggered the dead message. + :param str reason: + Human-readable error reason. + :param tuple args: + Elements to interpolate with `reason`. + """ if args: reason %= args LOG.debug('%r: %r is dead: %r', self, msg, reason) if msg.reply_to and not msg.is_dead: msg.reply(Message.dead(reason=reason), router=self) + elif not unreachable: + self._async_route( + Message.dead( + dst_id=msg.dst_id, + handle=msg.handle, + reason=reason, + ) + ) def _invoke(self, msg, stream): # IOLOG.debug('%r._invoke(%r)', self, msg) try: persist, fn, policy, respondent = self._handle_map[msg.handle] except KeyError: - self._maybe_send_dead(msg, reason=self.invalid_handle_msg) + self._maybe_send_dead(True, msg, reason=self.invalid_handle_msg) return if respondent and not (msg.is_dead or msg.src_id == respondent.context_id): - self._maybe_send_dead(msg, 'reply from unexpected context') + self._maybe_send_dead(True, msg, 'reply from unexpected context') return if policy and not policy(msg, stream): - self._maybe_send_dead(msg, self.refused_msg) + self._maybe_send_dead(True, msg, self.refused_msg) return if not persist: @@ -3240,7 +3267,7 @@ class Router(object): _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) if len(msg.data) > self.max_message_size: - self._maybe_send_dead(msg, self.too_large_msg % ( + self._maybe_send_dead(False, msg, self.too_large_msg % ( self.max_message_size, )) return @@ -3275,14 +3302,14 @@ class Router(object): out_stream = self._stream_by_id.get(mitogen.parent_id) if out_stream is None: - self._maybe_send_dead(msg, self.no_route_msg, + self._maybe_send_dead(True, msg, self.no_route_msg, msg.dst_id, mitogen.context_id) return if in_stream and self.unidirectional and not \ (in_stream.protocol.is_privileged or out_stream.protocol.is_privileged): - self._maybe_send_dead(msg, self.unidirectional_msg, + self._maybe_send_dead(True, msg, self.unidirectional_msg, in_stream.protocol.remote_id, out_stream.protocol.remote_id, mitogen.context_id) diff --git a/tests/router_test.py b/tests/router_test.py index 1cde016d..ef3fc4d5 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -11,6 +11,7 @@ import mitogen.core import mitogen.master import mitogen.parent import mitogen.utils +from mitogen.core import b try: import Queue @@ -258,6 +259,23 @@ class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase): self.assertTrue(expect in logs.stop()) + def test_remote_dead_message(self): + # Router should send dead message to original recipient when reply_to + # is unset. + router = self.klass(broker=self.broker, max_message_size=4096) + + # Try function call. Receiver should be woken by a dead message sent by + # router due to message size exceeded. + child = router.local() + recv = mitogen.core.Receiver(router) + + recv.to_sender().send(b('x') * 4097) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get().unpickle() + ) + expect = router.too_large_msg % (4096,) + self.assertEquals(e.args[0], expect) + def test_remote_configured(self): router = self.klass(broker=self.broker, max_message_size=64*1024) remote = router.local() @@ -510,7 +528,7 @@ class ShutdownTest(testlib.RouterMixin, testlib.TestCase): mitogen.context_id, )) - def test_disconnet_all(self): + def test_disconnect_all(self): l1 = self.router.local() l2 = self.router.local()