core: dead messages have optional body, use it everywhere; closes #387.

issue260
David Wilson 6 years ago
parent fdcd6810e1
commit cf97932fad

@ -594,10 +594,11 @@ class Message(object):
return self.reply_to == IS_DEAD return self.reply_to == IS_DEAD
@classmethod @classmethod
def dead(cls, **kwargs): def dead(cls, reason=None, **kwargs):
""" """
Syntax helper to construct a dead message. Syntax helper to construct a dead message.
""" """
kwargs['data'] = (reason or u'').encode()
return cls(reply_to=IS_DEAD, **kwargs) return cls(reply_to=IS_DEAD, **kwargs)
@classmethod @classmethod
@ -645,6 +646,14 @@ class Message(object):
else: else:
UNPICKLER_KWARGS = {} UNPICKLER_KWARGS = {}
def _throw_dead(self):
if len(self.data):
raise ChannelError(self.data.decode(errors='replace'))
elif self.src_id == mitogen.context_id:
raise ChannelError(ChannelError.local_msg)
else:
raise ChannelError(ChannelError.remote_msg)
def unpickle(self, throw=True, throw_dead=True): def unpickle(self, throw=True, throw_dead=True):
""" """
Unpickle :attr:`data`, optionally raising any exceptions present. Unpickle :attr:`data`, optionally raising any exceptions present.
@ -660,7 +669,7 @@ class Message(object):
""" """
_vv and IOLOG.debug('%r.unpickle()', self) _vv and IOLOG.debug('%r.unpickle()', self)
if throw_dead and self.is_dead: if throw_dead and self.is_dead:
raise ChannelError(ChannelError.remote_msg) self._throw_dead()
obj = self._unpickled obj = self._unpickled
if obj is Message._unpickled: if obj is Message._unpickled:
@ -811,6 +820,8 @@ class Receiver(object):
if self.notify: if self.notify:
self.notify(self) self.notify(self)
closed_msg = 'the Receiver has been closed'
def close(self): def close(self):
""" """
Unregister the receiver's handle from its associated router, and cause Unregister the receiver's handle from its associated router, and cause
@ -820,7 +831,7 @@ class Receiver(object):
if self.handle: if self.handle:
self.router.del_handler(self.handle) self.router.del_handler(self.handle)
self.handle = None self.handle = None
self._latch.put(Message.dead()) self._latch.put(Message.dead(self.closed_msg))
def empty(self): def empty(self):
""" """
@ -853,10 +864,7 @@ class Receiver(object):
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
msg = self._latch.get(timeout=timeout, block=block) msg = self._latch.get(timeout=timeout, block=block)
if msg.is_dead and throw_dead: if msg.is_dead and throw_dead:
if msg.src_id == mitogen.context_id: msg._throw_dead()
raise ChannelError(ChannelError.local_msg)
else:
raise ChannelError(ChannelError.remote_msg)
return msg return msg
def __iter__(self): def __iter__(self):
@ -2117,10 +2125,12 @@ class Router(object):
del self._stream_by_id[context.context_id] del self._stream_by_id[context.context_id]
context.on_disconnect() context.on_disconnect()
broker_exit_msg = 'Broker has exitted'
def _on_broker_exit(self): def _on_broker_exit(self):
while self._handle_map: while self._handle_map:
_, (_, func, _, _) = self._handle_map.popitem() _, (_, func, _, _) = self._handle_map.popitem()
func(Message.dead()) func(Message.dead(self.broker_exit_msg))
def context_by_id(self, context_id, via_id=None, create=True, name=None): def context_by_id(self, context_id, via_id=None, create=True, name=None):
""" """
@ -2230,10 +2240,21 @@ class Router(object):
return handle return handle
refused_msg = 'refused by policy'
invalid_handle_msg = 'invalid handle'
too_large_msg = 'message too large (max %d bytes)'
respondent_disconnect_msg = 'the respondent Context has disconnected'
broker_shutdown_msg = 'Broker is shutting down'
no_route_msg = 'no route to %r, my ID is %r'
unidirectional_msg = (
'routing mode prevents forward of message from context %d via '
'context %d'
)
def _on_respondent_disconnect(self, context): def _on_respondent_disconnect(self, context):
for handle in self._handles_by_respondent.pop(context, ()): for handle in self._handles_by_respondent.pop(context, ()):
_, fn, _, _ = self._handle_map[handle] _, fn, _, _ = self._handle_map[handle]
fn(Message.dead()) fn(Message.dead(self.respondent_disconnect_msg))
del self._handle_map[handle] del self._handle_map[handle]
def on_shutdown(self, broker): def on_shutdown(self, broker):
@ -2243,37 +2264,30 @@ class Router(object):
fire(self, 'shutdown') fire(self, 'shutdown')
for handle, (persist, fn) in self._handle_map.iteritems(): for handle, (persist, fn) in self._handle_map.iteritems():
_v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) _v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(Message.dead()) fn(Message.dead(self.broker_shutdown_msg))
def _maybe_send_dead(self, msg): def _maybe_send_dead(self, msg, reason, *args):
if args:
reason %= args
LOG.debug('%r: %r is dead: %r', self, msg, reason)
if msg.reply_to and not msg.is_dead: if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead(), router=self) msg.reply(Message.dead(reason=reason), router=self)
refused_msg = 'Refused by policy.'
def _invoke(self, msg, stream): def _invoke(self, msg, stream):
# IOLOG.debug('%r._invoke(%r)', self, msg) # IOLOG.debug('%r._invoke(%r)', self, msg)
try: try:
persist, fn, policy, respondent = self._handle_map[msg.handle] persist, fn, policy, respondent = self._handle_map[msg.handle]
except KeyError: except KeyError:
LOG.error('%r: invalid handle: %r', self, msg) self._maybe_send_dead(msg, reason=self.invalid_handle_msg)
self._maybe_send_dead(msg)
return return
if respondent and not (msg.is_dead or if respondent and not (msg.is_dead or
msg.src_id == respondent.context_id): msg.src_id == respondent.context_id):
LOG.error('%r: reply from unexpected context: %r', self, msg) self._maybe_send_dead(msg, 'reply from unexpected context')
self._maybe_send_dead(msg)
return return
if policy and not policy(msg, stream): if policy and not policy(msg, stream):
LOG.error('%r: policy refused message: %r', self, msg) self._maybe_send_dead(msg, self.refused_msg)
if msg.reply_to:
self.route(Message.pickled(
CallError(self.refused_msg),
dst_id=msg.src_id,
handle=msg.reply_to
))
return return
if not persist: if not persist:
@ -2301,9 +2315,9 @@ class Router(object):
_vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)
if len(msg.data) > self.max_message_size: if len(msg.data) > self.max_message_size:
LOG.error('message too large (max %d bytes): %r', self._maybe_send_dead(msg, self.too_large_msg % (
self.max_message_size, msg) self.max_message_size,
self._maybe_send_dead(msg) ))
return return
# Perform source verification. # Perform source verification.
@ -2336,17 +2350,14 @@ class Router(object):
out_stream = self._stream_by_id.get(mitogen.parent_id) out_stream = self._stream_by_id.get(mitogen.parent_id)
if out_stream is None: if out_stream is None:
if msg.reply_to not in (0, IS_DEAD): self._maybe_send_dead(msg, self.no_route_msg,
LOG.error('%r: no route for %r, my ID is %r', msg.dst_id, mitogen.context_id)
self, msg, mitogen.context_id)
self._maybe_send_dead(msg)
return return
if in_stream and self.unidirectional and not \ if in_stream and self.unidirectional and not \
(in_stream.is_privileged or out_stream.is_privileged): (in_stream.is_privileged or out_stream.is_privileged):
LOG.error('routing mode prevents forward of %r from %r -> %r', self._maybe_send_dead(msg, self.unidirectional_msg,
msg, in_stream, out_stream) in_stream.remote_id, out_stream.remote_id)
self._maybe_send_dead(msg)
return return
out_stream._send(msg) out_stream._send(msg)

@ -90,7 +90,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
self.broker.defer(stream.on_disconnect, self.broker) self.broker.defer(stream.on_disconnect, self.broker)
exc = self.assertRaises(mitogen.core.ChannelError, exc = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()) lambda: recv.get())
self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(exc.args[0], self.router.respondent_disconnect_msg)
def test_aborted_on_local_broker_shutdown(self): def test_aborted_on_local_broker_shutdown(self):
stream = self.router._stream_by_id[self.local.context_id] stream = self.router._stream_by_id[self.local.context_id]
@ -99,7 +99,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
self.broker.shutdown() self.broker.shutdown()
exc = self.assertRaises(mitogen.core.ChannelError, exc = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()) lambda: recv.get())
self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(exc.args[0], self.router.respondent_disconnect_msg)
def test_accepts_returns_context(self): def test_accepts_returns_context(self):
context = self.local.call(func_returns_arg, self.local) context = self.local.call(func_returns_arg, self.local)

@ -319,7 +319,7 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
c1.shutdown(wait=True) c1.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError, e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()) lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(e.args[0], self.router.respondent_disconnect_msg)
def test_indirect_child_disconnected(self): def test_indirect_child_disconnected(self):
# Achievement unlocked: process notices an indirectly connected child # Achievement unlocked: process notices an indirectly connected child
@ -330,7 +330,7 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
c2.shutdown(wait=True) c2.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError, e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()) lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(e.args[0], self.router.respondent_disconnect_msg)
def test_indirect_child_intermediary_disconnected(self): def test_indirect_child_intermediary_disconnected(self):
# Battlefield promotion: process notices indirect child disconnected # Battlefield promotion: process notices indirect child disconnected
@ -341,7 +341,7 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
c1.shutdown(wait=True) c1.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError, e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()) lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(e.args[0], self.router.respondent_disconnect_msg)
def test_near_sibling_disconnected(self): def test_near_sibling_disconnected(self):
# Hard mode: child notices sibling connected to same parent has # Hard mode: child notices sibling connected to same parent has
@ -357,9 +357,8 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
c2.shutdown(wait=True) c2.shutdown(wait=True)
e = self.assertRaises(mitogen.core.CallError, e = self.assertRaises(mitogen.core.CallError,
lambda: recv.get().unpickle()) lambda: recv.get().unpickle())
self.assertTrue(e.args[0].startswith( s = 'mitogen.core.ChannelError: ' + self.router.respondent_disconnect_msg
'mitogen.core.ChannelError: Channel closed by local end.' self.assertTrue(e.args[0].startswith(s))
))
def test_far_sibling_disconnected(self): def test_far_sibling_disconnected(self):
# God mode: child of child notices child of child of parent has # God mode: child of child notices child of child of parent has
@ -378,9 +377,8 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
c22.shutdown(wait=True) c22.shutdown(wait=True)
e = self.assertRaises(mitogen.core.CallError, e = self.assertRaises(mitogen.core.CallError,
lambda: recv.get().unpickle()) lambda: recv.get().unpickle())
self.assertTrue(e.args[0].startswith( s = 'mitogen.core.ChannelError: ' + self.router.respondent_disconnect_msg
'mitogen.core.ChannelError: Channel closed by local end.' self.assertTrue(e.args[0].startswith(s))
))
if __name__ == '__main__': if __name__ == '__main__':

@ -137,14 +137,13 @@ class PolicyTest(testlib.RouterMixin, testlib.TestCase):
self.sync_with_broker() self.sync_with_broker()
# Verify log. # Verify log.
expect = '%r: policy refused message: ' % (self.router,) self.assertTrue(self.router.refused_msg in log.stop())
self.assertTrue(expect in log.stop())
# Verify message was not delivered. # Verify message was not delivered.
self.assertTrue(recv.empty()) self.assertTrue(recv.empty())
# Verify CallError received by reply_to target. # Verify CallError received by reply_to target.
e = self.assertRaises(mitogen.core.CallError, e = self.assertRaises(mitogen.core.ChannelError,
lambda: reply_target.get().unpickle()) lambda: reply_target.get().unpickle())
self.assertEquals(e.args[0], self.router.refused_msg) self.assertEquals(e.args[0], self.router.refused_msg)
@ -212,14 +211,15 @@ class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
logs = testlib.LogCapturer() logs = testlib.LogCapturer()
logs.start() logs.start()
expect = router.too_large_msg % (4096,)
# Try function call. Receiver should be woken by a dead message sent by # Try function call. Receiver should be woken by a dead message sent by
# router due to message size exceeded. # router due to message size exceeded.
child = router.fork() child = router.fork()
e = self.assertRaises(mitogen.core.ChannelError, e = self.assertRaises(mitogen.core.ChannelError,
lambda: child.call(zlib.crc32, ' '*8192)) lambda: child.call(zlib.crc32, ' '*8192))
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(e.args[0], expect)
expect = 'message too large (max 4096 bytes)'
self.assertTrue(expect in logs.stop()) self.assertTrue(expect in logs.stop())
def test_remote_configured(self): def test_remote_configured(self):
@ -252,11 +252,12 @@ class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
msg = recv.get(throw_dead=False) msg = recv.get(throw_dead=False)
self.assertEquals(msg.is_dead, True) self.assertEquals(msg.is_dead, True)
self.assertEquals(msg.src_id, l1.context_id) self.assertEquals(msg.src_id, l1.context_id)
self.assertEquals(msg.data, self.router.invalid_handle_msg.encode())
recv = l1.send_async(mitogen.core.Message(handle=999)) recv = l1.send_async(mitogen.core.Message(handle=999))
e = self.assertRaises(mitogen.core.ChannelError, e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()) lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.remote_msg) self.assertEquals(e.args[0], self.router.invalid_handle_msg)
def test_totally_invalid_context_returns_dead(self): def test_totally_invalid_context_returns_dead(self):
recv = mitogen.core.Receiver(self.router) recv = mitogen.core.Receiver(self.router)
@ -269,11 +270,18 @@ class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
rmsg = recv.get(throw_dead=False) rmsg = recv.get(throw_dead=False)
self.assertEquals(rmsg.is_dead, True) self.assertEquals(rmsg.is_dead, True)
self.assertEquals(rmsg.src_id, mitogen.context_id) self.assertEquals(rmsg.src_id, mitogen.context_id)
self.assertEquals(rmsg.data, (self.router.no_route_msg % (
1234,
mitogen.context_id,
)).encode())
self.router.route(msg) self.router.route(msg)
e = self.assertRaises(mitogen.core.ChannelError, e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()) lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(e.args[0], (self.router.no_route_msg % (
1234,
mitogen.context_id,
)))
def test_previously_alive_context_returns_dead(self): def test_previously_alive_context_returns_dead(self):
l1 = self.router.fork() l1 = self.router.fork()
@ -288,11 +296,18 @@ class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
rmsg = recv.get(throw_dead=False) rmsg = recv.get(throw_dead=False)
self.assertEquals(rmsg.is_dead, True) self.assertEquals(rmsg.is_dead, True)
self.assertEquals(rmsg.src_id, mitogen.context_id) self.assertEquals(rmsg.src_id, mitogen.context_id)
self.assertEquals(rmsg.data, (self.router.no_route_msg % (
l1.context_id,
mitogen.context_id,
)).encode())
self.router.route(msg) self.router.route(msg)
e = self.assertRaises(mitogen.core.ChannelError, e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()) lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(e.args[0], self.router.no_route_msg % (
l1.context_id,
mitogen.context_id,
))
class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase): class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
@ -305,7 +320,10 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.CallError, e = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(ping_context, l1)) lambda: l2.call(ping_context, l1))
msg = 'mitogen.core.ChannelError: Channel closed by remote end.' msg = self.router.unidirectional_msg % (
l2.context_id,
l1.context_id,
)
self.assertTrue(msg in str(e)) self.assertTrue(msg in str(e))
self.assertTrue('routing mode prevents forward of ' in logs.stop()) self.assertTrue('routing mode prevents forward of ' in logs.stop())
@ -319,14 +337,11 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
l1s.is_privileged = True l1s.is_privileged = True
l2 = self.router.fork() l2 = self.router.fork()
logs = testlib.LogCapturer()
logs.start()
e = self.assertRaises(mitogen.core.CallError, e = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(ping_context, l1)) lambda: l2.call(ping_context, l1))
msg = 'mitogen.core.CallError: Refused by policy.' msg = 'mitogen.core.ChannelError: %s' % (self.router.refused_msg,)
self.assertTrue(msg in str(e)) self.assertTrue(str(e).startswith(msg))
self.assertTrue('policy refused message: ' in logs.stop())
class EgressIdsTest(testlib.RouterMixin, testlib.TestCase): class EgressIdsTest(testlib.RouterMixin, testlib.TestCase):

Loading…
Cancel
Save