diff --git a/docs/changelog.rst b/docs/changelog.rst
index 8707871b..e232afe5 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -21,7 +21,15 @@ v0.2.10 (unreleased)
To avail of fixes in an unreleased version, please download a ZIP file
`directly from GitHub `_.
-*(no changes)*
+* A custom codec replaces the :mod:`pickle` module, eliminating a source of
+ worries, many compatibility and security hacks, and bringing achievable file
+ transfer throughput on Python 3 in line with Python 2 (approx 800 MiB/s).
+
+ The serializer increases bootstrap size by 400 bytes and is around 5-10x
+ slower on complex messages, requiring 62 μsec to encode a typical Ansible
+ module RPC compared to 12 μsec in Mitogen 0.2.9, while RPC size is reduced by
+ around 10%.
+
v0.2.9 (2019-11-02)
diff --git a/docs/getting_started.rst b/docs/getting_started.rst
index 12056c55..b452c40d 100644
--- a/docs/getting_started.rst
+++ b/docs/getting_started.rst
@@ -344,8 +344,9 @@ remote procedure calls:
* :func:`bytes` (:class:`str` on Python 2.x)
* :class:`dict`
* :class:`int`
-* :func:`list`
* :class:`long`
+* :class:`float`
+* :func:`list`
* :func:`tuple`
* :func:`unicode` (:class:`str` on Python 3.x)
diff --git a/docs/howitworks.rst b/docs/howitworks.rst
index 05c097e5..3f658dbe 100644
--- a/docs/howitworks.rst
+++ b/docs/howitworks.rst
@@ -145,10 +145,9 @@ such that :py:mod:`cPickle` correctly serializes instance module names.
Once a synthetic :py:mod:`mitogen` package and :py:mod:`mitogen.core` module
have been generated, the bootstrap **deletes** `sys.modules['__main__']`, so
-that any attempt to import it (by :py:mod:`cPickle`) will cause the import to
-be satisfied by fetching the master's actual :mod:`__main__` module. This is
-necessary to allow master programs to be written as a self-contained Python
-script.
+that any attempt to import it will cause the import to be satisfied by fetching
+the master's actual :mod:`__main__` module. This is necessary to allow master
+programs to be written as a self-contained Python script.
Reaping The First Stage
@@ -312,7 +311,7 @@ parent and child. Integers use big endian in their encoded form.
* - `data`
- n/a
- - Message data, which may be raw or pickled.
+ - Message data, which may be raw or serialized.
@@ -496,24 +495,22 @@ Additional handles are created to receive the result of every function call
triggered by :py:meth:`call_async() `.
-Use of Pickle
+Serialization
#############
-The current implementation uses the Python :py:mod:`cPickle` module, with a
-restrictive class whitelist to prevent triggering undesirable code execution.
-The primary reason for using :py:mod:`cPickle` is that it is computationally
-efficient, and avoids including a potentially large body of serialization code
-in the bootstrap.
-
-The pickler will instantiate only built-in types and one of 3 constructor
-functions, to support unpickling :py:class:`CallError
-`, :py:class:`mitogen.core.Sender`,and
-:py:class:`Context `.
-
-The choice of Pickle is one area to be revisited later. All accounts suggest it
-cannot be used securely, however few of those accounts appear to be expert, and
-none mention any additional attacks that would not be prevented by using a
-restrictive class whitelist.
+Previous releases used the native :py:mod:`cPickle` module with various
+security restrictions, however this became impractical due to `compatibility
+kludges `_ introduced in Python 3 that
+cause excess CPU usage and an input-dependent variable length encoding for
+8-bit data. Python 2-compatible pickles produced by Python 3 therefore became
+unsuitable for high bandwidth use, such as in file transfer.
+
+Mitogen 0.2.10 introduced a built-in serializer, primarily aiming for minimal
+code and CPU footprint. Compared to pickle it is much slower for complex
+messages and cannot encode cyclical data structures, but type errors can now be
+detected during encoding rather than decoding, many compatibility hacks are
+removed, decoded messages always have size proportional to their encoding, and
+behaviour is identical on every Python version.
The IO Multiplexer
diff --git a/mitogen/core.py b/mitogen/core.py
index 4dd44925..76195a49 100644
--- a/mitogen/core.py
+++ b/mitogen/core.py
@@ -34,6 +34,9 @@ non-essential code in order to reduce its size, since it is also serves as the
bootstrap implementation sent to every new slave context.
"""
+# Direct encodings.* imports below are to allow decoding strings with the
+# import lock held: bytes.decode() invokes the importer!
+
import binascii
import collections
import encodings.latin_1
@@ -44,7 +47,6 @@ import itertools
import linecache
import logging
import os
-import pickle as py_pickle
import pstats
import signal
import socket
@@ -75,11 +77,6 @@ try:
except ImportError:
import threading as thread
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
-
try:
from cStringIO import StringIO as BytesIO
except ImportError:
@@ -95,6 +92,12 @@ try:
except NameError:
ModuleNotFoundError = ImportError
+# Where the xrange builtin does not exist (NameError), alias it to range so
+# the serializer below can use a single name for its counted loops.
+try:
+ xrange
+except NameError:
+ xrange = range
+
+
# TODO: usage of 'import' after setting __name__, but before fixing up
# sys.modules generates a warning. This happens when profiling = True.
warnings.filterwarnings('ignore',
@@ -151,12 +154,14 @@ if PY3:
FsPathTypes = (str,)
BufferType = lambda buf, start: memoryview(buf)[start:]
long = int
+ iteritems = dict.items
else:
b = str
BytesType = str
FsPathTypes = (str, unicode)
BufferType = buffer
UnicodeType = unicode
+ iteritems = dict.iteritems
AnyTextType = (BytesType, UnicodeType)
@@ -248,9 +253,6 @@ class Blob(BytesType):
def __repr__(self):
return '[blob: %d bytes]' % len(self)
- def __reduce__(self):
- return (Blob, (BytesType(self),))
-
class Secret(UnicodeType):
"""
@@ -265,9 +267,6 @@ class Secret(UnicodeType):
def __str__(self):
return UnicodeType(self)
- def __reduce__(self):
- return (Secret, (UnicodeType(self),))
-
class Kwargs(dict):
"""
@@ -296,9 +295,6 @@ class Kwargs(dict):
def __repr__(self):
return 'Kwargs(%s)' % (dict.__repr__(self),)
- def __reduce__(self):
- return (Kwargs, (dict(self),))
-
class CallError(Error):
"""
@@ -319,15 +315,6 @@ class CallError(Error):
fmt += ''.join(traceback.format_tb(tb))
Error.__init__(self, fmt)
- def __reduce__(self):
- return (_unpickle_call_error, (self.args[0],))
-
-
-def _unpickle_call_error(s):
- if not (type(s) is UnicodeType and len(s) < 10000):
- raise TypeError('cannot unpickle CallError: bad input')
- return CallError(s)
-
class ChannelError(Error):
"""
@@ -705,54 +692,6 @@ def iter_split(buf, delim, func):
return buf[start:], cont
-class Py24Pickler(py_pickle.Pickler):
- """
- Exceptions were classic classes until Python 2.5. Sadly for 2.4, cPickle
- offers little control over how a classic instance is pickled. Therefore 2.4
- uses a pure-Python pickler, so CallError can be made to look as it does on
- newer Pythons.
-
- This mess will go away once proper serialization exists.
- """
- @classmethod
- def dumps(cls, obj, protocol):
- bio = BytesIO()
- self = cls(bio, protocol=protocol)
- self.dump(obj)
- return bio.getvalue()
-
- def save_exc_inst(self, obj):
- if isinstance(obj, CallError):
- func, args = obj.__reduce__()
- self.save(func)
- self.save(args)
- self.write(py_pickle.REDUCE)
- else:
- py_pickle.Pickler.save_inst(self, obj)
-
- if PY24:
- dispatch = py_pickle.Pickler.dispatch.copy()
- dispatch[py_pickle.InstanceType] = save_exc_inst
-
-
-if PY3:
- # In 3.x Unpickler is a class exposing find_class as an overridable, but it
- # cannot be overridden without subclassing.
- class _Unpickler(pickle.Unpickler):
- def find_class(self, module, func):
- return self.find_global(module, func)
- pickle__dumps = pickle.dumps
-elif PY24:
- # On Python 2.4, we must use a pure-Python pickler.
- pickle__dumps = Py24Pickler.dumps
- _Unpickler = pickle.Unpickler
-else:
- pickle__dumps = pickle.dumps
- # In 2.x Unpickler is a function exposing a writeable find_global
- # attribute.
- _Unpickler = pickle.Unpickler
-
-
class Message(object):
"""
Messages are the fundamental unit of communication, comprising fields from
@@ -818,40 +757,6 @@ class Message(object):
+ self.data
)
- def _unpickle_context(self, context_id, name):
- return _unpickle_context(context_id, name, router=self.router)
-
- def _unpickle_sender(self, context_id, dst_handle):
- return _unpickle_sender(self.router, context_id, dst_handle)
-
- def _unpickle_bytes(self, s, encoding):
- s, n = LATIN1_CODEC.encode(s)
- return s
-
- def _find_global(self, module, func):
- """
- Return the class implementing `module_name.class_name` or raise
- `StreamError` if the module is not whitelisted.
- """
- if module == __name__:
- if func == '_unpickle_call_error' or func == 'CallError':
- return _unpickle_call_error
- elif func == '_unpickle_sender':
- return self._unpickle_sender
- elif func == '_unpickle_context':
- return self._unpickle_context
- elif func == 'Blob':
- return Blob
- elif func == 'Secret':
- return Secret
- elif func == 'Kwargs':
- return Kwargs
- elif module == '_codecs' and func == 'encode':
- return self._unpickle_bytes
- elif module == '__builtin__' and func == 'bytes':
- return BytesType
- raise StreamError('cannot unpickle %r/%r', module, func)
-
@property
def is_dead(self):
"""
@@ -880,11 +785,11 @@ class Message(object):
The new message.
"""
self = cls(**kwargs)
- try:
- self.data = pickle__dumps(obj, protocol=2)
- except pickle.PicklingError:
- e = sys.exc_info()[1]
- self.data = pickle__dumps(CallError(e), protocol=2)
+ #try:
+ self.data = encode(obj)
+ #except pickle.PicklingError:
+ #e = sys.exc_info()[1]
+ #self.data = encode(CallError(e))
return self
def reply(self, msg, router=None, **kwargs):
@@ -911,11 +816,6 @@ class Message(object):
LOG.debug('dropping reply to message with no return address: %r',
msg)
- if PY3:
- UNPICKLER_KWARGS = {'encoding': 'bytes'}
- else:
- UNPICKLER_KWARGS = {}
-
def _throw_dead(self):
if len(self.data):
raise ChannelError(self.data.decode('utf-8', 'replace'))
@@ -943,13 +843,9 @@ class Message(object):
obj = self._unpickled
if obj is Message._unpickled:
- fp = BytesIO(self.data)
- unpickler = _Unpickler(fp, **self.UNPICKLER_KWARGS)
- unpickler.find_global = self._find_global
try:
- # Must occur off the broker thread.
try:
- obj = unpickler.load()
+ obj = decode(self.data, self.router)
except:
LOG.error('raw pickle was: %r', self.data)
raise
@@ -1013,17 +909,6 @@ class Sender(object):
def __repr__(self):
return 'Sender(%r, %r)' % (self.context, self.dst_handle)
- def __reduce__(self):
- return _unpickle_sender, (self.context.context_id, self.dst_handle)
-
-
-def _unpickle_sender(router, context_id, dst_handle):
- if not (isinstance(router, Router) and
- isinstance(context_id, (int, long)) and context_id >= 0 and
- isinstance(dst_handle, (int, long)) and dst_handle > 0):
- raise TypeError('cannot unpickle Sender: bad input or missing router')
- return Sender(Context(router, context_id), dst_handle)
-
class Receiver(object):
"""
@@ -2216,9 +2101,6 @@ class Context(object):
if name:
self.name = to_text(name)
- def __reduce__(self):
- return _unpickle_context, (self.context_id, self.name)
-
def on_disconnect(self):
_v and LOG.debug('%r: disconnecting', self)
fire(self, 'disconnect')
@@ -2299,16 +2181,199 @@ class Context(object):
return 'Context(%s, %r)' % (self.context_id, self.name)
-def _unpickle_context(context_id, name, router=None):
- if not (isinstance(context_id, (int, long)) and context_id >= 0 and (
- (name is None) or
- (isinstance(name, UnicodeType) and len(name) < 100))
- ):
- raise TypeError('cannot unpickle Context: bad input')
+#
+# Serializer.
+#
+
+# Opcodes: each KIND_* constant is a single byte whose value is its position in
+# this tuple (0..19). The position is therefore part of the wire format; new
+# kinds must only ever be appended.
+(
+ KIND_TRUE, KIND_FALSE, KIND_NONE, KIND_NEG_32, KIND_NEG_64, KIND_POS_32,
+ KIND_POS_64, KIND_DOUBLE, KIND_UTF8, KIND_BYTES, KIND_LIST, KIND_TUPLE,
+ KIND_SET, KIND_DICT, KIND_BLOB, KIND_SECRET, KIND_ERROR, KIND_CONTEXT,
+ KIND_SENDER, KIND_KWARGS,
+) = (
+ chr(n).encode()
+ for n in range(20)
+)
+
+# Fixed-width big endian ('>') packers and unpackers: unsigned 32-bit ('I'),
+# unsigned 64-bit ('Q') and IEEE 754 double ('d').
+_pack_u32 = lambda n: struct.pack('>I', n)
+_pack_u64 = lambda n: struct.pack('>Q', n)
+_pack_d64 = lambda n: struct.pack('>d', n)
+_unpack_u32 = lambda s: struct.unpack('>I', s)[0]
+_unpack_u64 = lambda s: struct.unpack('>Q', s)[0]
+_unpack_d64 = lambda s: struct.unpack('>d', s)[0]
+# Indexed by a bool: _bools[False] is KIND_FALSE, _bools[True] is KIND_TRUE.
+_bools = (KIND_FALSE, KIND_TRUE)
+
+
+def _decode_s(read, _):
+    """
+    Decode a length-prefixed byte string: a big endian unsigned 32-bit length
+    followed by that many bytes.
+
+    :param read:
+        Callable taking a byte count and returning up to that many bytes.
+    :param _:
+        Unused; present so every decoder shares the `(read, router)` signature.
+    :returns:
+        The raw bytes, undecoded.
+    :raises ValueError:
+        The input ended before the length header or the payload was complete.
+    """
+    header = read(4)
+    # Check the header explicitly: unpacking a short header would otherwise
+    # surface as struct.error rather than the ValueError used for a short body.
+    if len(header) != 4:
+        raise ValueError('cannot decode: truncated input')
+    length = _unpack_u32(header)
+    encoded = read(length)
+    if len(encoded) != length:
+        raise ValueError('cannot decode: truncated input')
+    return encoded
+
+
+def _decode_dict(read, router):
+    """
+    Decode a mapping: an unsigned 32-bit pair count followed by that many
+    key/value pairs, each key immediately preceding its value.
+
+    :param read:
+        Callable taking a byte count and returning up to that many bytes.
+    :param router:
+        Passed through unchanged to the decoders of nested values.
+    :returns:
+        A plain :class:`dict`; a repeated key keeps the value decoded last.
+    """
+    pair_count = _unpack_u32(read(4))
+    decoded = {}
+    for _ in xrange(pair_count):
+        # The key must be read from the stream before its value.
+        key = _decode(read, router)
+        decoded[key] = _decode(read, router)
+    return decoded
+
+
+def _decode_context(read, router):
+    """
+    Decode a Context reference, encoded as an unsigned 32-bit context ID.
+
+    When `router` is a :class:`Router` the context is resolved through
+    :meth:`Router.context_by_id`, exactly as the context of a Sender is, so the
+    router decides which object represents that ID. Otherwise a bare
+    :class:`Context` bound to whatever was passed (normally :data:`None`) is
+    returned.
+    """
+    context_id = _unpack_u32(read(4))
+    if isinstance(router, Router):
+        return router.context_by_id(context_id)
+    return Context(router, context_id)
+
+
+def _decode_sender(read, router):
+    """
+    Decode a Sender, encoded as an unsigned 32-bit context ID followed by an
+    unsigned 32-bit destination handle.
+
+    :raises TypeError:
+        No router was supplied; a Sender cannot be built without one.
+    """
+    context_id = _unpack_u32(read(4))
+    dst_handle = _unpack_u32(read(4))
+    if router is None:
+        raise TypeError('cannot decode Sender: missing router')
+    return Sender(router.context_by_id(context_id), dst_handle)
+
+
+# Maps each opcode byte to a decoder called as `decoder(read, router)`, where
+# `read(n)` returns up to `n` bytes from the message and `router` is passed
+# down to any nested value that needs it.
+_DECODE_MAP = {
+    KIND_TRUE: lambda read, _: True,
+    KIND_FALSE: lambda read, _: False,
+    KIND_NONE: lambda read, _: None,
+    # Integers are stored as a sign (held in the opcode) plus a magnitude.
+    KIND_NEG_32: lambda read, _: -_unpack_u32(read(4)),
+    KIND_NEG_64: lambda read, _: -_unpack_u64(read(8)),
+    KIND_POS_32: lambda read, _: _unpack_u32(read(4)),
+    KIND_POS_64: lambda read, _: _unpack_u64(read(8)),
+    KIND_DOUBLE: lambda read, _: _unpack_d64(read(8)),
+    KIND_UTF8: lambda read, router: _decode_s(read, router).decode('utf-8'),
+    KIND_BYTES: _decode_s,
+    # Sequences are an element count followed by that many encoded elements.
+    KIND_LIST: lambda read, router: [
+        _decode(read, router)
+        for _ in xrange(_unpack_u32(read(4)))
+    ],
+    KIND_TUPLE: lambda read, router: tuple(
+        _decode(read, router)
+        for _ in xrange(_unpack_u32(read(4)))
+    ),
+    KIND_SET: lambda read, router: set(
+        _decode(read, router)
+        for _ in xrange(_unpack_u32(read(4)))
+    ),
+    KIND_DICT: lambda read, router: _decode_dict(read, router),
+    KIND_BLOB: lambda read, router: Blob(_decode_s(read, router)),
+    KIND_SECRET: lambda read, router: Secret(
+        _decode_s(read, router).decode('utf-8')
+    ),
+    KIND_ERROR: lambda read, router: CallError(
+        _decode_s(read, router).decode('utf-8')
+    ),
+    KIND_CONTEXT: _decode_context,
+    KIND_SENDER: _decode_sender,
+    KIND_KWARGS: lambda read, router: Kwargs(_decode_dict(read, router)),
+}
+
+
+def _encode_int(w, o):
+    """
+    Encode an integer as a sign-carrying opcode followed by its magnitude, using
+    4 bytes when the magnitude fits in 32 bits and 8 bytes otherwise.
+
+    :param w:
+        Callable accepting one chunk of encoded bytes.
+    :param o:
+        Integer in the range -(2**64-1)..(2**64-1) inclusive.
+    :raises ValueError:
+        `o` lies outside the encodable range. The message quotes `o` as it was
+        passed in, including its sign.
+    """
+    if o < 0:
+        # Keep `o` intact so the error below reports the caller's value rather
+        # than its negation.
+        magnitude = -o
+        if magnitude <= 2**32-1:
+            w(KIND_NEG_32)
+            w(_pack_u32(magnitude))
+        elif magnitude <= 2**64-1:
+            w(KIND_NEG_64)
+            w(_pack_u64(magnitude))
+        else:
+            raise ValueError('cannot encode %r: exceeds -(2**64-1)' % (o,))
+    elif o <= 2**32-1:
+        w(KIND_POS_32)
+        w(_pack_u32(o))
+    elif o <= 2**64-1:
+        w(KIND_POS_64)
+        w(_pack_u64(o))
+    else:
+        raise ValueError('cannot encode %r: exceeds 2**64-1' % (o,))
+
+
+def _encode_double(w, o):
+    """
+    Encode a float as the KIND_DOUBLE opcode followed by its 8-byte big endian
+    double representation.
+
+    :param w:
+        Callable accepting one chunk of encoded bytes.
+    :param o:
+        Value to pack as a double.
+    """
+    w(KIND_DOUBLE)
+    packed = _pack_d64(o)
+    w(packed)
+
+
+def _encode_s(w, o, kind=KIND_BYTES):
+    """
+    Encode a byte string as an opcode, an unsigned 32-bit length, and the bytes
+    themselves.
+
+    :param w:
+        Callable accepting one chunk of encoded bytes.
+    :param o:
+        Byte string to emit; text must already be encoded by the caller.
+    :param kind:
+        Opcode to emit, allowing the same layout to serve several types.
+    :raises ValueError:
+        `o` is longer than the 2**32-1 bytes a 32-bit length can describe.
+    """
+    l = len(o)
+    # The largest length an unsigned 32-bit field can hold is 2**32-1; a length
+    # of exactly 2**32 must be rejected here rather than fail inside struct.
+    if l > 2**32-1:
+        raise ValueError('cannot encode: size %r exceeds 2**32-1' % (l,))
+    w(kind)
+    w(_pack_u32(l))
+    w(o)
+
+
+def _encode_list(w, o, kind=KIND_LIST, size=-1):
+    """
+    Encode an iterable as an opcode, an unsigned 32-bit count, and then every
+    element in iteration order.
+
+    :param w:
+        Callable accepting one chunk of encoded bytes.
+    :param o:
+        Iterable of elements to encode.
+    :param kind:
+        Opcode to emit, allowing the same layout to serve several containers.
+    :param size:
+        Count to write in the header; -1 means use `len(o)`. Callers passing an
+        iterable without a length must supply it.
+    """
+    w(kind)
+    count = size
+    if count == -1:
+        count = len(o)
+    w(_pack_u32(count))
+    for item in o:
+        _encode(w, item)
- if isinstance(router, Router):
- return router.context_by_id(context_id, name=name)
- return Context(None, context_id, name) # For plain Jane pickle.
+
+def _encode_dict(w, o, kind=KIND_DICT):
+    """
+    Encode a mapping as an opcode, an unsigned 32-bit pair count, and then each
+    key immediately followed by its value.
+
+    :param w:
+        Callable accepting one chunk of encoded bytes.
+    :param o:
+        Mapping to encode.
+    :param kind:
+        Opcode to emit, allowing the same layout to serve dict-like types.
+    """
+    pairs = iteritems(o)
+
+    def flattened():
+        # Yields key, value, key, value, ... as one flat stream.
+        for pair in pairs:
+            for member in pair:
+                yield member
+
+    # The header carries the number of pairs, not the number of flattened
+    # members, so the count is passed explicitly.
+    _encode_list(w, flattened(), kind, len(o))
+
+
+def _encode_context(w, o):
+    """
+    Encode a Context as the KIND_CONTEXT opcode followed by its context ID as
+    an unsigned 32-bit integer. No other attribute of the context is written.
+    """
+    w(KIND_CONTEXT)
+    context_id = o.context_id
+    w(_pack_u32(context_id))
+
+
+def _encode_sender(w, o):
+    """
+    Encode a Sender as the KIND_SENDER opcode followed by two unsigned 32-bit
+    integers: the ID of its context, then its destination handle.
+    """
+    w(KIND_SENDER)
+    context_id = o.context.context_id
+    w(_pack_u32(context_id))
+    dst_handle = o.dst_handle
+    w(_pack_u32(dst_handle))
+
+
+# Maps an exact type to an encoder called as `encoder(w, o)`, where `w` accepts
+# one chunk of encoded bytes. _encode() adds entries for subclasses it meets.
+_ENCODE_MAP = {
+    bool: lambda w, o: w(_bools[o]),
+    type(None): lambda w, o: w(KIND_NONE),
+    int: _encode_int,
+    long: _encode_int,
+    float: _encode_double,
+    BytesType: _encode_s,
+    UnicodeType: lambda w, o: _encode_s(w, o.encode('utf-8'), KIND_UTF8),
+    Blob: lambda w, o: _encode_s(w, o, KIND_BLOB),
+    Secret: lambda w, o: _encode_s(w, o.encode('utf-8'), KIND_SECRET),
+    # Take the message from args[0] rather than str(o): on Python 2, str() of an
+    # exception holding non-ASCII text raises UnicodeEncodeError.
+    CallError: lambda w, o: _encode_s(
+        w,
+        UnicodeType(o.args[0]).encode('utf-8'),
+        KIND_ERROR,
+    ),
+    Context: _encode_context,
+    Sender: _encode_sender,
+    list: _encode_list,
+    tuple: lambda w, o: _encode_list(w, o, KIND_TUPLE),
+    set: lambda w, o: _encode_list(w, o, KIND_SET),
+    dict: lambda w, o: _encode_dict(w, o, KIND_DICT),
+    Kwargs: lambda w, o: _encode_dict(w, o, KIND_KWARGS),
+}
+
+
+def _encode(w, o):
+    """
+    Encode one object by dispatching on its class.
+
+    Exact classes are looked up directly in `_ENCODE_MAP`. An instance of an
+    unregistered subclass is encoded as the most specific registered type it
+    derives from, and that choice is cached in `_ENCODE_MAP` for next time.
+
+    :param w:
+        Callable accepting one chunk of encoded bytes.
+    :param o:
+        Object to encode.
+    :raises TypeError:
+        No registered type matches `o`.
+    """
+    cls = o.__class__
+    # Only the lookup is guarded: a KeyError raised while an encoder runs must
+    # propagate instead of being mistaken for an unregistered type.
+    try:
+        encoder = _ENCODE_MAP[cls]
+    except KeyError:
+        best = None
+        for key in _ENCODE_MAP:
+            # Prefer the most derived match, so that e.g. a Kwargs subclass is
+            # not encoded as a plain dict merely because dict was seen first.
+            if isinstance(o, key) and (best is None or issubclass(key, best)):
+                best = key
+        if best is None:
+            raise TypeError('cannot serialize ' + str(cls))
+        # The map is updated only after iteration has finished.
+        encoder = _ENCODE_MAP[cls] = _ENCODE_MAP[best]
+    return encoder(w, o)
+
+
+def encode(o):
+    """
+    Serialize `o` to a byte string.
+
+    :param o:
+        Object to serialize.
+    :returns:
+        The concatenation of every chunk emitted while encoding `o`.
+    """
+    chunks = []
+    write = chunks.append
+    _encode(write, o)
+    return b('').join(chunks)
+
+
+def _decode(read, router):
+    """
+    Decode one value: read a single opcode byte and hand the rest of the work
+    to the matching entry in `_DECODE_MAP`.
+
+    :param read:
+        Callable taking a byte count and returning up to that many bytes.
+    :param router:
+        Passed through unchanged to the selected decoder.
+    :raises ValueError:
+        The input was exhausted, or the opcode is not recognised.
+    """
+    op = read(1)
+    if not op:
+        raise ValueError('cannot decode: truncated input')
+
+    # Look the decoder up separately from calling it, so a KeyError raised
+    # inside a decoder is not misreported as an unknown opcode.
+    decoder = _DECODE_MAP.get(op)
+    if decoder is None:
+        raise ValueError('cannot decode: unknown opcode %r' % (ord(op),))
+    return decoder(read, router)
+
+
+def decode(s, router=None):
+    """
+    Deserialize the first value found in the byte string `s`.
+
+    :param s:
+        Byte string to decode. Only one top-level value is decoded; anything
+        following it is not examined.
+    :param router:
+        Passed through to the decoders of values that need one.
+    :returns:
+        The decoded object.
+    """
+    stream = BytesIO(s)
+    return _decode(stream.read, router)
class Poller(object):
@@ -3634,7 +3699,6 @@ class Dispatcher(object):
self._service_recv.notify = None
self.recv.close()
-
@classmethod
@takes_econtext
def forget_chain(cls, chain_id, econtext):
diff --git a/mitogen/utils.py b/mitogen/utils.py
index b1347d02..d8d546a8 100644
--- a/mitogen/utils.py
+++ b/mitogen/utils.py
@@ -194,33 +194,23 @@ PASSTHROUGH = (
def cast(obj):
"""
- Many tools love to subclass built-in types in order to implement useful
- functionality, such as annotating the safety of a Unicode string, or adding
- additional methods to a dict. However, cPickle loves to preserve those
- subtypes during serialization, resulting in CallError during :meth:`call
- ` in the target when it tries to deserialize
- the data.
+ Produce a copy of the object graph `obj` with any subclasses of types
+ supported for serialization replaced with the original type.
- This function walks the object graph `obj`, producing a copy with any
- custom sub-types removed. The functionality is not default since the
- resulting walk may be computationally expensive given a large enough graph.
+ Many tools use subclasses to annotate the safety of a string, or add
+ additional methods to a dict. Since Mitogen 0.2.9 and earlier used pickle,
+ which preserved those subtypes, errors could occur during deserialization
+ as custom types were not safe to unpickle.
- See :ref:`serialization-rules` for a list of supported types.
+ Mitogen 0.2.10 introduced a custom encoding that automatically strips
+ subclasses. This function therefore is only maintained for compatibility,
+ and simply returns the original object.
+
+ See :ref:`serialization-rules` for supported types.
:param obj:
Object to undecorate.
:returns:
Undecorated object.
"""
- if isinstance(obj, dict):
- return dict((cast(k), cast(v)) for k, v in iteritems(obj))
- if isinstance(obj, (list, tuple)):
- return [cast(v) for v in obj]
- if isinstance(obj, PASSTHROUGH):
- return obj
- if isinstance(obj, mitogen.core.UnicodeType):
- return mitogen.core.UnicodeType(obj)
- if isinstance(obj, mitogen.core.BytesType):
- return mitogen.core.BytesType(obj)
-
- raise TypeError("Cannot serialize: %r: %r" % (type(obj), obj))
+ return obj
diff --git a/tests/serialization_test.py b/tests/serialization_test.py
new file mode 100644
index 00000000..fdf06f58
--- /dev/null
+++ b/tests/serialization_test.py
@@ -0,0 +1,40 @@
+
+from mitogen.core import encode
+from mitogen.core import decode
+
+from mitogen.core import *
+
+# Singletons are a bare opcode and must round-trip by identity.
+assert KIND_TRUE == encode(True)
+assert True is decode(encode(True))
+
+assert KIND_FALSE == encode(False)
+assert False is decode(encode(False))
+
+assert KIND_NONE == encode(None)
+assert None is decode(encode(None))
+
+# Integers: the extremes of the 32-bit and 64-bit encodings, both signs.
+assert -(2**32-1) == decode(encode(-(2**32-1)))
+assert (2**32-1) == decode(encode(2**32-1))
+assert -(2**64-1) == decode(encode(-(2**64-1)))
+assert (2**64-1) == decode(encode(2**64-1))
+
+# Integers one past either extreme cannot be represented.
+for too_big in (2**64, -(2**64)):
+    try:
+        encode(too_big)
+    except ValueError:
+        pass
+    else:
+        raise AssertionError('encode(%r) should have failed' % (too_big,))
+
+# Floats; both values are exactly representable as a double.
+assert 1.5 == decode(encode(1.5))
+assert -1.5 == decode(encode(-1.5))
+
+assert u'\N{snowman}' == decode(encode(u'\N{snowman}'))
+assert b'snowman' == decode(encode(b'snowman'))
+assert [] == decode(encode([]))
+assert [False, True] == decode(encode([False, True]))
+assert (False, True) == decode(encode((False, True)))
+assert set([False, True]) == decode(encode(set([False, True])))
+assert {'a': 0, 'b': 1} == decode(encode({'a': 0, 'b': 1}))
+
+assert type(decode(encode(Blob(b('dave'))))) is Blob
+assert type(decode(encode(Secret('dave')))) is Secret
+
+assert type(decode(encode(Kwargs({})))) is Kwargs
+assert Kwargs({'a': 1}) == decode(encode(Kwargs({'a': 1})))
+
+assert 1 == decode(encode(Context(None, 1))).context_id
+
+# Named broker/router rather than b/r: the star import above supplies the b()
+# helper used earlier in this file, which must not be shadowed.
+broker = Broker()
+router = Router(broker)
+try:
+    sender = Sender(Context(router, 1), 1234)
+    assert 1234 == decode(encode(sender), router).dst_handle
+finally:
+    broker.shutdown()