From 300cb41e2eeb0eaac4fa21779cc3022f00ed371a Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 10 Dec 2018 02:23:26 +0000 Subject: [PATCH] core: detect stream corruption. Closes #438. --- docs/changelog.rst | 3 +++ docs/howitworks.rst | 4 ++++ mitogen/core.py | 23 ++++++++++++++++++----- tests/stream_test.py | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 tests/stream_test.py diff --git a/docs/changelog.rst b/docs/changelog.rst index dbb2a035..0fd257ea 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -273,6 +273,9 @@ Core Library fail to start if :data:`sys.stdout` was opened in block buffered mode, and buffered data was pending in the parent prior to fork. +* `#438 `_: a descriptive error is + logged when stream corruption is detected. + * `#439 `_: descriptive errors are raised when attempting to invoke unsupported function types. diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 1ff0af48..5bc7b53d 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -273,6 +273,10 @@ parent and child. Integers use big endian in their encoded form. - Size - Description + * - `magic` + - 2 + - Integer 0x4d49 (``MI``), used to detect stream corruption. + * - `dst_id` - 4 - Integer target context ID. :py:class:`Router` delivers messages diff --git a/mitogen/core.py b/mitogen/core.py index 502e6f2f..a55fe93a 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1393,8 +1393,16 @@ class Stream(BasicStream): self._internal_receive(broker, buf) - HEADER_FMT = '>LLLLLL' + HEADER_FMT = '>hLLLLLL' HEADER_LEN = struct.calcsize(HEADER_FMT) + HEADER_MAGIC = 0x4d49 # 'MI' + + corrupt_msg = ( + 'Corruption detected: frame signature incorrect. This likely means ' + 'some external process is interfering with the connection. Received:' + '\n\n' + '%r' + ) def _receive_one(self, broker): if self._input_buf_len < self.HEADER_LEN: @@ -1402,12 +1410,17 @@ class Stream(BasicStream): msg = Message() msg.router = self._router - (msg.dst_id, msg.src_id, msg.auth_id, + (magic, msg.dst_id, msg.src_id, msg.auth_id, msg.handle, msg.reply_to, msg_len) = struct.unpack( self.HEADER_FMT, self._input_buf[0][:self.HEADER_LEN], ) + if magic != self.HEADER_MAGIC: + LOG.error(self.corrupt_msg, self._input_buf[0][:128]) + self.on_disconnect(broker) + return False + if msg_len > self._router.max_message_size: LOG.error('Maximum message size exceeded (got %d, max %d)', msg_len, self._router.max_message_size) @@ -1473,9 +1486,9 @@ class Stream(BasicStream): def _send(self, msg): _vv and IOLOG.debug('%r._send(%r)', self, msg) - pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id, - msg.auth_id, msg.handle, msg.reply_to or 0, - len(msg.data)) + msg.data + pkt = struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, msg.dst_id, + msg.src_id, msg.auth_id, msg.handle, + msg.reply_to or 0, len(msg.data)) + msg.data if not self._output_buf_len: self._router.broker._start_transmit(self) self._output_buf.append(pkt) diff --git a/tests/stream_test.py b/tests/stream_test.py new file mode 100644 index 00000000..d844e610 --- /dev/null +++ b/tests/stream_test.py @@ -0,0 +1,33 @@ + +import unittest2 +import mock + +import mitogen.core + +import testlib + + +class ReceiveOneTest(testlib.TestCase): + klass = mitogen.core.Stream + + def test_corruption(self): + broker = mock.Mock() + router = mock.Mock() + + stream = self.klass(router, 1) + junk = mitogen.core.b('x') * stream.HEADER_LEN + stream._input_buf = [junk] + stream._input_buf_len = len(junk) + + capture = testlib.LogCapturer() + capture.start() + ret = stream._receive_one(broker) + #self.assertEquals(1, broker.stop_receive.mock_calls) + capture.stop() + + self.assertFalse(ret) + self.assertTrue((self.klass.corrupt_msg % (junk,)) in capture.raw()) + + +if __name__ == '__main__': + unittest2.main()