mitogen: Send FileService content as ENC_BIN

This should speed up file transfer greatly on Python 3.x, because it avoids
the compatibility shim for byte strings in Pickle protocol 2.
pull/1432/head
Alex Willmer 2 days ago
parent 2de7bf58a6
commit 1402328e3e

@ -28,6 +28,8 @@ In progress (unreleased)
the magic field as a content encoding enumeration.
* :gh:issue:`1430` :mod:`mitogen`: Add explicit binary Message encoding,
marked using :data:`mitogen.core.Message.ENC_BIN`.
* :gh:issue:`1430` :mod:`mitogen`: Send :class:`mitogen.service.FileService`
content raw, without pickle encoding
v0.3.38 (2026-01-23)

@ -981,6 +981,11 @@ class Message(object):
else:
raise ChannelError(ChannelError.remote_msg)
def decode(self, throw=True, throw_dead=True):
if self.enc == self.ENC_PKL: return self.unpickle(throw, throw_dead)
if self.enc == self.ENC_BIN: return self.data
raise ValueError('Invalid explicit enc: %r' % (self.enc,))
def unpickle(self, throw=True, throw_dead=True):
"""
Unpickle :attr:`data`, optionally raising any exceptions present.
@ -1054,12 +1059,12 @@ class Sender(object):
self.context = context
self.dst_handle = dst_handle
def send(self, data):
def send(self, data, enc=Message.ENC_PKL):
"""
Send `data` to the remote end.
"""
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(Message.pickled(data, handle=self.dst_handle))
_vv and IOLOG.debug('%r.send(%*r.., enc=%s)', self, 100, data, enc)
self.context.send(Message.encoded(data, enc, handle=self.dst_handle))
explicit_close_msg = 'Sender was explicitly closed'

@ -974,14 +974,8 @@ class FileService(Service):
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Message.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
# Therefore subtract encoding overhead and Message header size.
IO_SIZE = mitogen.core.CHUNK_SIZE - mitogen.core.Message.HEADER_LEN
def _schedule_pending_unlocked(self, state):
"""
@ -997,7 +991,7 @@ class FileService(Service):
s = fp.read(self.IO_SIZE)
if s:
state.unacked += len(s)
sender.send(mitogen.core.Blob(s))
sender.send(s, mitogen.core.Message.ENC_BIN)
else:
# File is done. Cause the target's receive loop to exit by
# closing the sender, close the file, and remove the job entry.
@ -1145,8 +1139,8 @@ class FileService(Service):
)
received_bytes = 0
for chunk in recv:
s = chunk.unpickle()
for msg in recv:
s = msg.decode()
LOG.debug('get_file(%r): received %d bytes', path, len(s))
context.call_service_async(
service_name=cls.name(),

Loading…
Cancel
Save