From b25b42e02803fee036be82d75173768af50f2b1d Mon Sep 17 00:00:00 2001 From: Alex Willmer Date: Mon, 26 Jan 2026 17:55:24 +0000 Subject: [PATCH 1/3] mitogen: Explicitly mark pickled messages, re-uses magic header field --- docs/changelog.rst | 3 +++ mitogen/core.py | 20 +++++++++++++++----- tests/message_test.py | 15 +++++++++++---- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 9ff1ff82..16444487 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -23,6 +23,9 @@ In progress (unreleased) * :gh:issue:`1430` :mod:`mitogen`: Pickle :data:`mitogen.core.GET_RESOURCE` parameters directly as textual strings (rather than ASCII in byte strings) +* :gh:issue:`1430` :mod:`mitogen`: Explicitly mark messages known to carry + pickled data, using :data:`mitogen.core.Message.ENC_PKL`. This repurposes + the magic field as a content encoding enumeration. v0.3.38 (2026-01-23) diff --git a/mitogen/core.py b/mitogen/core.py index 2295b62a..27b0a544 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -801,6 +801,9 @@ class Message(object): :class:`mitogen.core.Router` for ingress messages, and helper methods for deserialization and generating replies. """ + ENCS = frozenset(range(0x4d49, 0x4d49+2)) + ENC_MGC, ENC_PKL = sorted(ENCS) + #: Integer target context ID. :class:`Router` delivers messages locally #: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise #: they are routed up or downstream. @@ -827,6 +830,11 @@ class Message(object): #: Raw message data bytes. data = b('') + #: Encoding of payload in :attr:`data`, one of the ``ENC_*`` constants. + #: :attr:`ENC_MGC` is an implicit, legacy value. New features & + #: :ref:`standard-handles` should explicitly declare an encoding. + enc = ENC_MGC + _unpickled = object() #: The :class:`Router` responsible for routing the message. This is @@ -839,7 +847,7 @@ class Message(object): HEADER_FMT = '>hLLLLLL' HEADER_LEN = struct.calcsize(HEADER_FMT) - HEADER_MAGIC = 0x4d49 # 'MI' + HEADER_MAGIC = ENC_MGC def __init__(self, **kwargs): """ @@ -850,10 +858,12 @@ class Message(object): self.auth_id = mitogen.context_id vars(self).update(kwargs) assert isinstance(self.data, BytesType), 'Message data is not Bytes' + if self.enc not in self.ENCS: + raise ValueError('Invalid enc: %r' % (self.enc,)) def pack(self): return ( - struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, self.dst_id, + struct.pack(self.HEADER_FMT, self.enc, self.dst_id, self.src_id, self.auth_id, self.handle, self.reply_to or 0, len(self.data)) + self.data @@ -920,7 +930,7 @@ class Message(object): :returns: The new message. """ - self = cls(**kwargs) + self = cls(enc=cls.ENC_PKL, **kwargs) try: self.data = pickle__dumps(obj, protocol=2) except pickle.PicklingError: @@ -2386,13 +2396,13 @@ class MitogenProtocol(Protocol): msg = Message() msg.router = self._router - (magic, msg.dst_id, msg.src_id, msg.auth_id, + (msg.enc, msg.dst_id, msg.src_id, msg.auth_id, msg.handle, msg.reply_to, msg_len) = struct.unpack( Message.HEADER_FMT, self._input_buf[0][:Message.HEADER_LEN], ) - if magic != Message.HEADER_MAGIC: + if msg.enc not in Message.ENCS: LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048]) self.stream.on_disconnect(broker) return False diff --git a/tests/message_test.py b/tests/message_test.py index fc50a327..27421466 100644 --- a/tests/message_test.py +++ b/tests/message_test.py @@ -59,9 +59,14 @@ class ConstructorTest(testlib.TestCase): self.assertEqual(m.data, b('asdf')) self.assertIsInstance(m.data, mitogen.core.BytesType) - def test_data_hates_unicode(self): + def test_enc(self): + self.assertEqual(self.klass().enc, self.klass.ENC_MGC) + self.assertEqual(self.klass(enc=self.klass.ENC_PKL).enc, self.klass.ENC_PKL) + + def test_invalid_args(self): self.assertRaises(Exception, lambda: self.klass(data=u'asdf')) + self.assertRaises(ValueError, lambda: self.klass(enc=42)) class PackTest(testlib.TestCase): @@ -75,10 +80,10 @@ class PackTest(testlib.TestCase): s = self.klass(dst_id=123, handle=123).pack() self.assertEqual(len(s), self.klass.HEADER_LEN) - def test_magic(self): + def test_enc(self): s = self.klass(dst_id=123, handle=123).pack() - magic, = struct.unpack('>h', s[:2]) - self.assertEqual(self.klass.HEADER_MAGIC, magic) + enc, = struct.unpack('>h', s[:2]) + self.assertEqual(self.klass.ENC_MGC, enc) def test_dst_id(self): s = self.klass(dst_id=123, handle=123).pack() @@ -164,7 +169,9 @@ class PickledTest(testlib.TestCase): def roundtrip(self, v, router=None): msg = self.klass.pickled(v) + self.assertEqual(self.klass.ENC_PKL, msg.enc) msg2 = self.klass(data=msg.data) + self.assertEqual(self.klass.ENC_MGC, msg2.enc) msg2.router = router return msg2.unpickle() From 2de7bf58a608ebcf629c37dd6743d78ada05e40a Mon Sep 17 00:00:00 2001 From: Alex Willmer Date: Mon, 26 Jan 2026 18:36:41 +0000 Subject: [PATCH 2/3] mitogen: Add explicit binary Message encoding, marked using ENC_BIN --- docs/changelog.rst | 2 ++ mitogen/core.py | 14 ++++++++++++-- tests/message_test.py | 22 ++++++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 16444487..7feaca9c 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -26,6 +26,8 @@ In progress (unreleased) * :gh:issue:`1430` :mod:`mitogen`: Explicitly mark messages known to carry pickled data, using :data:`mitogen.core.Message.ENC_PKL`. This repurposes the magic field as a content encoding enumeration. +* :gh:issue:`1430` :mod:`mitogen`: Add explicit binary Message encoding, + marked using :data:`mitogen.core.Message.ENC_BIN`. v0.3.38 (2026-01-23) diff --git a/mitogen/core.py b/mitogen/core.py index 27b0a544..d97c0c39 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -801,8 +801,8 @@ class Message(object): :class:`mitogen.core.Router` for ingress messages, and helper methods for deserialization and generating replies. """ - ENCS = frozenset(range(0x4d49, 0x4d49+2)) - ENC_MGC, ENC_PKL = sorted(ENCS) + ENCS = frozenset(range(0x4d49, 0x4d49+3)) + ENC_MGC, ENC_PKL, ENC_BIN = sorted(ENCS) #: Integer target context ID. :class:`Router` delivers messages locally #: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise @@ -921,6 +921,12 @@ class Message(object): kwargs['data'], _ = _codecs.utf_8_encode(reason or u'') return cls(reply_to=IS_DEAD, **kwargs) + @classmethod + def encoded(cls, obj, enc, **kwargs): + if enc == cls.ENC_PKL: return cls.pickled(obj, **kwargs) + if enc == cls.ENC_BIN: return cls(data=obj, enc=enc, **kwargs) + raise ValueError('Invalid explicit enc: %r' % (enc,)) + @classmethod def pickled(cls, obj, **kwargs): """ @@ -989,6 +995,10 @@ class Message(object): The `is_dead` field was set. """ _vv and IOLOG.debug('%r.unpickle()', self) + if self.enc not in (self.ENC_MGC, self.ENC_PKL): + raise ValueError( + 'Message %r is not pickled, invalid enc=%r', self, self.enc, + ) if throw_dead and self.is_dead: self._throw_dead() diff --git a/tests/message_test.py b/tests/message_test.py index 27421466..1c37cc9c 100644 --- a/tests/message_test.py +++ b/tests/message_test.py @@ -163,6 +163,23 @@ class EvilObject(object): pass +class EncodedTest(testlib.TestCase): + klass = mitogen.core.Message + def test_ctor(self): + msg = self.klass.encoded(42, self.klass.ENC_PKL) + self.assertEqual(self.klass.ENC_PKL, msg.enc) + + msg = self.klass.encoded(b'abc', self.klass.ENC_BIN) + self.assertEqual(b'abc', msg.data) + self.assertEqual(self.klass.ENC_BIN, msg.enc) + + def test_invalid_args(self): + self.assertRaises(ValueError, lambda: self.klass.encoded(42, enc=self.klass.ENC_MGC)) + self.assertRaises(ValueError, lambda: self.klass.encoded(b('abc'), enc=self.klass.ENC_MGC)) + self.assertRaises(Exception, lambda: self.klass.encoded(42, enc=self.klass.ENC_BIN)) + self.assertRaises(Exception, lambda: self.klass.encoded(u'abc', enc=self.klass.ENC_BIN)) + + class PickledTest(testlib.TestCase): # getting_started.html#rpc-serialization-rules klass = mitogen.core.Message @@ -369,6 +386,11 @@ class UnpickleTest(testlib.TestCase): m = self.klass.pickled('derp', reply_to=mitogen.core.IS_DEAD) self.assertEqual('derp', m.unpickle(throw_dead=False)) + def test_invalid_enc(self): + msg = self.klass.pickled(42) + msg.enc = self.klass.ENC_BIN + self.assertRaises(ValueError, msg.unpickle) + class UnpickleCompatTest(testlib.TestCase): # try weird variations of pickles from different Python versions. From 1402328e3e639d64f8b34c8f8d6c269e168a083a Mon Sep 17 00:00:00 2001 From: Alex Willmer Date: Mon, 26 Jan 2026 22:13:59 +0000 Subject: [PATCH 3/3] mitogen: Send FileService content as ENC_BIN This should speed up file transfer greatly on Python 3.x, because it avoids the compatibility shim for byte strings in Pickle protocol 2. --- docs/changelog.rst | 2 ++ mitogen/core.py | 11 ++++++++--- mitogen/service.py | 16 +++++----------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 7feaca9c..cc6b6a83 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -28,6 +28,8 @@ In progress (unreleased) the magic field as a content encoding enumeration. * :gh:issue:`1430` :mod:`mitogen`: Add explicit binary Message encoding, marked using :data:`mitogen.core.Message.ENC_BIN`. +* :gh:issue:`1430` :mod:`mitogen`: Send :class:`mitogen.service.FileService` + content raw, without pickle encoding v0.3.38 (2026-01-23) diff --git a/mitogen/core.py b/mitogen/core.py index d97c0c39..5fd44b42 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -981,6 +981,11 @@ class Message(object): else: raise ChannelError(ChannelError.remote_msg) + def decode(self, throw=True, throw_dead=True): + if self.enc == self.ENC_PKL: return self.unpickle(throw, throw_dead) + if self.enc == self.ENC_BIN: return self.data + raise ValueError('Invalid explicit enc: %r' % (self.enc,)) + def unpickle(self, throw=True, throw_dead=True): """ Unpickle :attr:`data`, optionally raising any exceptions present. @@ -1054,12 +1059,12 @@ class Sender(object): self.context = context self.dst_handle = dst_handle - def send(self, data): + def send(self, data, enc=Message.ENC_PKL): """ Send `data` to the remote end. """ - _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) - self.context.send(Message.pickled(data, handle=self.dst_handle)) + _vv and IOLOG.debug('%r.send(%*r.., enc=%s)', self, 100, data, enc) + self.context.send(Message.encoded(data, enc, handle=self.dst_handle)) explicit_close_msg = 'Sender was explicitly closed' diff --git a/mitogen/service.py b/mitogen/service.py index b852f44d..d27f75f6 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -974,14 +974,8 @@ class FileService(Service): # The IO loop pumps 128KiB chunks. An ideal message is a multiple of this, # odd-sized messages waste one tiny write() per message on the trailer. - # Therefore subtract 10 bytes pickle overhead + 24 bytes header. - IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Message.HEADER_LEN + ( - len( - mitogen.core.Message.pickled( - mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE) - ).data - ) - mitogen.core.CHUNK_SIZE - )) + # Therefore subtract encoding overhead and Message header size. + IO_SIZE = mitogen.core.CHUNK_SIZE - mitogen.core.Message.HEADER_LEN def _schedule_pending_unlocked(self, state): """ @@ -997,7 +991,7 @@ class FileService(Service): s = fp.read(self.IO_SIZE) if s: state.unacked += len(s) - sender.send(mitogen.core.Blob(s)) + sender.send(s, mitogen.core.Message.ENC_BIN) else: # File is done. Cause the target's receive loop to exit by # closing the sender, close the file, and remove the job entry. @@ -1145,8 +1139,8 @@ class FileService(Service): ) received_bytes = 0 - for chunk in recv: - s = chunk.unpickle() + for msg in recv: + s = msg.decode() LOG.debug('get_file(%r): received %d bytes', path, len(s)) context.call_service_async( service_name=cls.name(),