diff --git a/mitogen/core.py b/mitogen/core.py index 334166c1..65432062 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -638,7 +638,7 @@ class Stream(BasicStream): msg.data = self._input_buf[self.HEADER_LEN:self.HEADER_LEN+msg_len] self._input_buf = self._input_buf[self.HEADER_LEN+msg_len:] - self._router._async_route(msg) + self._router._async_route(msg, self) return True def on_transmit(self, broker): @@ -929,8 +929,16 @@ class Router(object): except Exception: LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) - def _async_route(self, msg): - IOLOG.debug('%r._async_route(%r)', self, msg) + def _async_route(self, msg, stream=None): + IOLOG.debug('%r._async_route(%r, %r)', self, msg, stream) + # Perform source verification. + if stream is not None: + expected_stream = self._stream_by_id.get(msg.src_id, + self._stream_by_id.get(mitogen.parent_id)) + if stream != expected_stream: + LOG.error('%r: bad source: got %r from %r, should be from %r', + self, msg, stream, expected_stream) + if msg.dst_id == mitogen.context_id: return self._invoke(msg)