issue #615: route a dead message to recipients when no reply is expected

new-serialization
David Wilson 5 years ago
parent 151b490890
commit 3f5ff17c8c

@ -2112,8 +2112,8 @@ class MitogenProtocol(Protocol):
return False return False
if msg_len > self._router.max_message_size: if msg_len > self._router.max_message_size:
LOG.error('Maximum message size exceeded (got %d, max %d)', LOG.error('%r: Maximum message size exceeded (got %d, max %d)',
msg_len, self._router.max_message_size) self, msg_len, self._router.max_message_size)
self.stream.on_disconnect(broker) self.stream.on_disconnect(broker)
return False return False
@ -3191,28 +3191,55 @@ class Router(object):
fn(Message.dead(self.respondent_disconnect_msg)) fn(Message.dead(self.respondent_disconnect_msg))
del self._handle_map[handle] del self._handle_map[handle]
def _maybe_send_dead(self, msg, reason, *args): def _maybe_send_dead(self, unreachable, msg, reason, *args):
"""
Send a dead message to either the original sender or the intended
recipient of `msg`, if the original sender was expecting a reply
(because its `reply_to` was set), otherwise assume the message is a
reply of some sort, and send the dead message to the original
destination.
:param bool unreachable:
If :data:`True`, the recipient is known to be dead or routing
failed due to a security precaution, so don't attempt to fallback
to sending the dead message to the recipient if the original sender
did not include a reply address.
:param mitogen.core.Message msg:
Message that triggered the dead message.
:param str reason:
Human-readable error reason.
:param tuple args:
Elements to interpolate with `reason`.
"""
if args: if args:
reason %= args reason %= args
LOG.debug('%r: %r is dead: %r', self, msg, reason) LOG.debug('%r: %r is dead: %r', self, msg, reason)
if msg.reply_to and not msg.is_dead: if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead(reason=reason), router=self) msg.reply(Message.dead(reason=reason), router=self)
elif not unreachable:
self._async_route(
Message.dead(
dst_id=msg.dst_id,
handle=msg.handle,
reason=reason,
)
)
def _invoke(self, msg, stream): def _invoke(self, msg, stream):
# IOLOG.debug('%r._invoke(%r)', self, msg) # IOLOG.debug('%r._invoke(%r)', self, msg)
try: try:
persist, fn, policy, respondent = self._handle_map[msg.handle] persist, fn, policy, respondent = self._handle_map[msg.handle]
except KeyError: except KeyError:
self._maybe_send_dead(msg, reason=self.invalid_handle_msg) self._maybe_send_dead(True, msg, reason=self.invalid_handle_msg)
return return
if respondent and not (msg.is_dead or if respondent and not (msg.is_dead or
msg.src_id == respondent.context_id): msg.src_id == respondent.context_id):
self._maybe_send_dead(msg, 'reply from unexpected context') self._maybe_send_dead(True, msg, 'reply from unexpected context')
return return
if policy and not policy(msg, stream): if policy and not policy(msg, stream):
self._maybe_send_dead(msg, self.refused_msg) self._maybe_send_dead(True, msg, self.refused_msg)
return return
if not persist: if not persist:
@ -3240,7 +3267,7 @@ class Router(object):
_vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)
if len(msg.data) > self.max_message_size: if len(msg.data) > self.max_message_size:
self._maybe_send_dead(msg, self.too_large_msg % ( self._maybe_send_dead(False, msg, self.too_large_msg % (
self.max_message_size, self.max_message_size,
)) ))
return return
@ -3275,14 +3302,14 @@ class Router(object):
out_stream = self._stream_by_id.get(mitogen.parent_id) out_stream = self._stream_by_id.get(mitogen.parent_id)
if out_stream is None: if out_stream is None:
self._maybe_send_dead(msg, self.no_route_msg, self._maybe_send_dead(True, msg, self.no_route_msg,
msg.dst_id, mitogen.context_id) msg.dst_id, mitogen.context_id)
return return
if in_stream and self.unidirectional and not \ if in_stream and self.unidirectional and not \
(in_stream.protocol.is_privileged or (in_stream.protocol.is_privileged or
out_stream.protocol.is_privileged): out_stream.protocol.is_privileged):
self._maybe_send_dead(msg, self.unidirectional_msg, self._maybe_send_dead(True, msg, self.unidirectional_msg,
in_stream.protocol.remote_id, in_stream.protocol.remote_id,
out_stream.protocol.remote_id, out_stream.protocol.remote_id,
mitogen.context_id) mitogen.context_id)

@ -11,6 +11,7 @@ import mitogen.core
import mitogen.master import mitogen.master
import mitogen.parent import mitogen.parent
import mitogen.utils import mitogen.utils
from mitogen.core import b
try: try:
import Queue import Queue
@ -258,6 +259,23 @@ class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
self.assertTrue(expect in logs.stop()) self.assertTrue(expect in logs.stop())
def test_remote_dead_message(self):
# Router should send dead message to original recipient when reply_to
# is unset.
router = self.klass(broker=self.broker, max_message_size=4096)
# Try function call. Receiver should be woken by a dead message sent by
# router due to message size exceeded.
child = router.local()
recv = mitogen.core.Receiver(router)
recv.to_sender().send(b('x') * 4097)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get().unpickle()
)
expect = router.too_large_msg % (4096,)
self.assertEquals(e.args[0], expect)
def test_remote_configured(self): def test_remote_configured(self):
router = self.klass(broker=self.broker, max_message_size=64*1024) router = self.klass(broker=self.broker, max_message_size=64*1024)
remote = router.local() remote = router.local()
@ -510,7 +528,7 @@ class ShutdownTest(testlib.RouterMixin, testlib.TestCase):
mitogen.context_id, mitogen.context_id,
)) ))
def test_disconnet_all(self): def test_disconnect_all(self):
l1 = self.router.local() l1 = self.router.local()
l2 = self.router.local() l2 = self.router.local()

Loading…
Cancel
Save