issue #533: update routing to account for DEL_ROUTE propagation race

new-serialization
David Wilson 5 years ago
parent 3d72cf82e3
commit bcca47df3c

@ -198,6 +198,18 @@ Core Library
`closed` flag, preventing historical bugs where a double close could destroy
descriptors belonging to unrelated streams.
* `#533 <https://github.com/dw/mitogen/issues/533>`_: routing accounts for
a race between a parent sending a message to a child via an intermediary,
where the child had recently disconnected, and ``DEL_ROUTE`` propagating from
the intermediary to the parent, informing it that the child no longer exists.
This condition is detected at the intermediary and a dead message is returned
to the parent.
Previously since the intermediary had already removed its route for the
child, the *route messages upwards* rule would be triggered, causing the
message (with a privileged ``src_id``/``auth_id``) to be sent upstream,
resulting in a ``bad auth_id`` log message and a hang.
* `#586 <https://github.com/dw/mitogen/issues/586>`_: fix import of
:mod:`__main__` on later versions of Python 3 when running from the
interactive console.

@ -3272,34 +3272,48 @@ class Router(object):
))
return
# Perform source verification.
parent_stream = self._stream_by_id.get(mitogen.parent_id)
src_stream = self._stream_by_id.get(msg.src_id, parent_stream)
# When the ingress stream is known, verify the message was received on
# the same as the stream we would expect to receive messages from the
# src_id and auth_id. This is like Reverse Path Filtering in IP, and
# ensures messages from a privileged context cannot be spoofed by a
# child.
if in_stream:
parent = self._stream_by_id.get(mitogen.parent_id)
expect = self._stream_by_id.get(msg.auth_id, parent)
if in_stream != expect:
auth_stream = self._stream_by_id.get(msg.auth_id, parent_stream)
if in_stream != auth_stream:
LOG.error('%r: bad auth_id: got %r via %r, not %r: %r',
self, msg.auth_id, in_stream, expect, msg)
self, msg.auth_id, in_stream, auth_stream, msg)
return
if msg.src_id != msg.auth_id:
expect = self._stream_by_id.get(msg.src_id, parent)
if in_stream != expect:
if msg.src_id != msg.auth_id and in_stream != src_stream:
LOG.error('%r: bad src_id: got %r via %r, not %r: %r',
self, msg.src_id, in_stream, expect, msg)
self, msg.src_id, in_stream, src_stream, msg)
return
# If the stream's MitogenProtocol has auth_id set, copy it to the
# message. This allows subtrees to become privileged by stamping a
# parent's context ID. It is used by mitogen.unix to mark client
# streams (like Ansible WorkerProcess) as having the same rights as
# the parent.
if in_stream.protocol.auth_id is not None:
msg.auth_id = in_stream.protocol.auth_id
# Maintain a set of IDs the source ever communicated with.
# Record the IDs the source ever communicated with.
in_stream.protocol.egress_ids.add(msg.dst_id)
if msg.dst_id == mitogen.context_id:
return self._invoke(msg, in_stream)
out_stream = self._stream_by_id.get(msg.dst_id)
if out_stream is None:
out_stream = self._stream_by_id.get(mitogen.parent_id)
if (not out_stream) and (parent_stream != src_stream or not in_stream):
# No downstream route exists. The message could be from a child or
# ourselves for a parent, in which case we must forward it
# upstream, or it could be from a parent for a dead child, in which
# case its src_id/auth_id would fail verification if returned to
# the parent, so in that case reply with a dead message instead.
out_stream = parent_stream
if out_stream is None:
self._maybe_send_dead(True, msg, self.no_route_msg,

@ -76,6 +76,34 @@ class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase):
expect = 'bad auth_id: got %r via' % (self.child2_msg.auth_id,)
self.assertTrue(expect in log.stop())
def test_parent_unaware_of_disconnect(self):
# Parent -> Child A -> Child B. B disconnects concurrent to Parent
# sending message. Parent does not yet know B has disconnected, A
# receives message from Parent with Parent's auth_id, for a stream that
# no longer exists.
c1 = self.router.local()
strm = self.router.stream_by_id(c1.context_id)
recv = mitogen.core.Receiver(self.router)
self.broker.defer(lambda:
strm.protocol._send(
mitogen.core.Message(
dst_id=1234, # nonexistent child
handle=1234,
src_id=mitogen.context_id,
reply_to=recv.handle,
)
)
)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get().unpickle()
)
self.assertEquals(e.args[0], self.router.no_route_msg % (
1234,
c1.context_id,
))
def test_bad_src_id(self):
# Deliver a message locally from child2 with the correct auth_id, but
# the wrong src_id.

Loading…
Cancel
Save