Merge pull request #1432 from moreati/issue485-fileservice

mitogen: Add explicit Message encoding, send FileService data as raw binary
master
Alex Willmer 2 days ago committed by GitHub
commit f2498689df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -23,6 +23,13 @@ In progress (unreleased)
* :gh:issue:`1430` :mod:`mitogen`: Pickle :data:`mitogen.core.GET_RESOURCE`
parameters directly as textual strings (rather than ASCII in byte strings)
* :gh:issue:`1430` :mod:`mitogen`: Explicitly mark messages known to carry
pickled data, using :data:`mitogen.core.Message.ENC_PKL`. This repurposes
the magic field as a content encoding enumeration.
* :gh:issue:`1430` :mod:`mitogen`: Add explicit binary Message encoding,
marked using :data:`mitogen.core.Message.ENC_BIN`.
* :gh:issue:`1430` :mod:`mitogen`: Send :class:`mitogen.service.FileService`
content raw, without pickle encoding
v0.3.38 (2026-01-23)

@ -801,6 +801,9 @@ class Message(object):
:class:`mitogen.core.Router` for ingress messages, and helper methods for
deserialization and generating replies.
"""
ENCS = frozenset(range(0x4d49, 0x4d49+3))
ENC_MGC, ENC_PKL, ENC_BIN = sorted(ENCS)
#: Integer target context ID. :class:`Router` delivers messages locally
#: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise
#: they are routed up or downstream.
@ -827,6 +830,11 @@ class Message(object):
#: Raw message data bytes.
data = b('')
#: Encoding of payload in :attr:`data`, one of the ``ENC_*`` constants.
#: :attr:`ENC_MGC` is an implicit, legacy value. New features &
#: :ref:`standard-handles` should explicitly declare an encoding.
enc = ENC_MGC
_unpickled = object()
#: The :class:`Router` responsible for routing the message. This is
@ -839,7 +847,7 @@ class Message(object):
HEADER_FMT = '>hLLLLLL'
HEADER_LEN = struct.calcsize(HEADER_FMT)
HEADER_MAGIC = 0x4d49 # 'MI'
HEADER_MAGIC = ENC_MGC
def __init__(self, **kwargs):
"""
@ -850,10 +858,12 @@ class Message(object):
self.auth_id = mitogen.context_id
vars(self).update(kwargs)
assert isinstance(self.data, BytesType), 'Message data is not Bytes'
if self.enc not in self.ENCS:
raise ValueError('Invalid enc: %r' % (self.enc,))
def pack(self):
return (
struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, self.dst_id,
struct.pack(self.HEADER_FMT, self.enc, self.dst_id,
self.src_id, self.auth_id, self.handle,
self.reply_to or 0, len(self.data))
+ self.data
@ -911,6 +921,12 @@ class Message(object):
kwargs['data'], _ = _codecs.utf_8_encode(reason or u'')
return cls(reply_to=IS_DEAD, **kwargs)
@classmethod
def encoded(cls, obj, enc, **kwargs):
if enc == cls.ENC_PKL: return cls.pickled(obj, **kwargs)
if enc == cls.ENC_BIN: return cls(data=obj, enc=enc, **kwargs)
raise ValueError('Invalid explicit enc: %r' % (enc,))
@classmethod
def pickled(cls, obj, **kwargs):
"""
@ -920,7 +936,7 @@ class Message(object):
:returns:
The new message.
"""
self = cls(**kwargs)
self = cls(enc=cls.ENC_PKL, **kwargs)
try:
self.data = pickle__dumps(obj, protocol=2)
except pickle.PicklingError:
@ -965,6 +981,11 @@ class Message(object):
else:
raise ChannelError(ChannelError.remote_msg)
def decode(self, throw=True, throw_dead=True):
if self.enc == self.ENC_PKL: return self.unpickle(throw, throw_dead)
if self.enc == self.ENC_BIN: return self.data
raise ValueError('Invalid explicit enc: %r' % (self.enc,))
def unpickle(self, throw=True, throw_dead=True):
"""
Unpickle :attr:`data`, optionally raising any exceptions present.
@ -979,6 +1000,10 @@ class Message(object):
The `is_dead` field was set.
"""
_vv and IOLOG.debug('%r.unpickle()', self)
if self.enc not in (self.ENC_MGC, self.ENC_PKL):
raise ValueError(
'Message %r is not pickled, invalid enc=%r', self, self.enc,
)
if throw_dead and self.is_dead:
self._throw_dead()
@ -1034,12 +1059,12 @@ class Sender(object):
self.context = context
self.dst_handle = dst_handle
def send(self, data):
def send(self, data, enc=Message.ENC_PKL):
"""
Send `data` to the remote end.
"""
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(Message.pickled(data, handle=self.dst_handle))
_vv and IOLOG.debug('%r.send(%*r.., enc=%s)', self, 100, data, enc)
self.context.send(Message.encoded(data, enc, handle=self.dst_handle))
explicit_close_msg = 'Sender was explicitly closed'
@ -2386,13 +2411,13 @@ class MitogenProtocol(Protocol):
msg = Message()
msg.router = self._router
(magic, msg.dst_id, msg.src_id, msg.auth_id,
(msg.enc, msg.dst_id, msg.src_id, msg.auth_id,
msg.handle, msg.reply_to, msg_len) = struct.unpack(
Message.HEADER_FMT,
self._input_buf[0][:Message.HEADER_LEN],
)
if magic != Message.HEADER_MAGIC:
if msg.enc not in Message.ENCS:
LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048])
self.stream.on_disconnect(broker)
return False

@ -974,14 +974,8 @@ class FileService(Service):
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Message.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
# Therefore subtract encoding overhead and Message header size.
IO_SIZE = mitogen.core.CHUNK_SIZE - mitogen.core.Message.HEADER_LEN
def _schedule_pending_unlocked(self, state):
"""
@ -997,7 +991,7 @@ class FileService(Service):
s = fp.read(self.IO_SIZE)
if s:
state.unacked += len(s)
sender.send(mitogen.core.Blob(s))
sender.send(s, mitogen.core.Message.ENC_BIN)
else:
# File is done. Cause the target's receive loop to exit by
# closing the sender, close the file, and remove the job entry.
@ -1145,8 +1139,8 @@ class FileService(Service):
)
received_bytes = 0
for chunk in recv:
s = chunk.unpickle()
for msg in recv:
s = msg.decode()
LOG.debug('get_file(%r): received %d bytes', path, len(s))
context.call_service_async(
service_name=cls.name(),

@ -59,9 +59,14 @@ class ConstructorTest(testlib.TestCase):
self.assertEqual(m.data, b('asdf'))
self.assertIsInstance(m.data, mitogen.core.BytesType)
def test_data_hates_unicode(self):
def test_enc(self):
self.assertEqual(self.klass().enc, self.klass.ENC_MGC)
self.assertEqual(self.klass(enc=self.klass.ENC_PKL).enc, self.klass.ENC_PKL)
def test_invalid_args(self):
self.assertRaises(Exception,
lambda: self.klass(data=u'asdf'))
self.assertRaises(ValueError, lambda: self.klass(enc=42))
class PackTest(testlib.TestCase):
@ -75,10 +80,10 @@ class PackTest(testlib.TestCase):
s = self.klass(dst_id=123, handle=123).pack()
self.assertEqual(len(s), self.klass.HEADER_LEN)
def test_magic(self):
def test_enc(self):
s = self.klass(dst_id=123, handle=123).pack()
magic, = struct.unpack('>h', s[:2])
self.assertEqual(self.klass.HEADER_MAGIC, magic)
enc, = struct.unpack('>h', s[:2])
self.assertEqual(self.klass.ENC_MGC, enc)
def test_dst_id(self):
s = self.klass(dst_id=123, handle=123).pack()
@ -158,13 +163,32 @@ class EvilObject(object):
pass
class EncodedTest(testlib.TestCase):
klass = mitogen.core.Message
def test_ctor(self):
msg = self.klass.encoded(42, self.klass.ENC_PKL)
self.assertEqual(self.klass.ENC_PKL, msg.enc)
msg = self.klass.encoded(b'abc', self.klass.ENC_BIN)
self.assertEqual(b'abc', msg.data)
self.assertEqual(self.klass.ENC_BIN, msg.enc)
def test_invalid_args(self):
self.assertRaises(ValueError, lambda: self.klass.encoded(42, enc=self.klass.ENC_MGC))
self.assertRaises(ValueError, lambda: self.klass.encoded(b('abc'), enc=self.klass.ENC_MGC))
self.assertRaises(Exception, lambda: self.klass.encoded(42, enc=self.klass.ENC_BIN))
self.assertRaises(Exception, lambda: self.klass.encoded(u'abc', enc=self.klass.ENC_BIN))
class PickledTest(testlib.TestCase):
# getting_started.html#rpc-serialization-rules
klass = mitogen.core.Message
def roundtrip(self, v, router=None):
msg = self.klass.pickled(v)
self.assertEqual(self.klass.ENC_PKL, msg.enc)
msg2 = self.klass(data=msg.data)
self.assertEqual(self.klass.ENC_MGC, msg2.enc)
msg2.router = router
return msg2.unpickle()
@ -362,6 +386,11 @@ class UnpickleTest(testlib.TestCase):
m = self.klass.pickled('derp', reply_to=mitogen.core.IS_DEAD)
self.assertEqual('derp', m.unpickle(throw_dead=False))
def test_invalid_enc(self):
msg = self.klass.pickled(42)
msg.enc = self.klass.ENC_BIN
self.assertRaises(ValueError, msg.unpickle)
class UnpickleCompatTest(testlib.TestCase):
# try weird variations of pickles from different Python versions.

Loading…
Cancel
Save