|
|
|
|
@ -801,6 +801,9 @@ class Message(object):
|
|
|
|
|
:class:`mitogen.core.Router` for ingress messages, and helper methods for
|
|
|
|
|
deserialization and generating replies.
|
|
|
|
|
"""
|
|
|
|
|
ENCS = frozenset(range(0x4d49, 0x4d49+2))
|
|
|
|
|
ENC_MGC, ENC_PKL = sorted(ENCS)
|
|
|
|
|
|
|
|
|
|
#: Integer target context ID. :class:`Router` delivers messages locally
|
|
|
|
|
#: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise
|
|
|
|
|
#: they are routed up or downstream.
|
|
|
|
|
@ -827,6 +830,11 @@ class Message(object):
|
|
|
|
|
#: Raw message data bytes.
|
|
|
|
|
data = b('')
|
|
|
|
|
|
|
|
|
|
#: Encoding of payload in :attr:`data`, one of the ``ENC_*`` constants.
|
|
|
|
|
#: :attr:`ENC_MGC` is an implicit, legacy value. New features &
|
|
|
|
|
#: :ref:`standard-handles` should explicitly declare an encoding.
|
|
|
|
|
enc = ENC_MGC
|
|
|
|
|
|
|
|
|
|
_unpickled = object()
|
|
|
|
|
|
|
|
|
|
#: The :class:`Router` responsible for routing the message. This is
|
|
|
|
|
@ -839,7 +847,7 @@ class Message(object):
|
|
|
|
|
|
|
|
|
|
HEADER_FMT = '>hLLLLLL'
|
|
|
|
|
HEADER_LEN = struct.calcsize(HEADER_FMT)
|
|
|
|
|
HEADER_MAGIC = 0x4d49 # 'MI'
|
|
|
|
|
HEADER_MAGIC = ENC_MGC
|
|
|
|
|
|
|
|
|
|
def __init__(self, **kwargs):
|
|
|
|
|
"""
|
|
|
|
|
@ -850,10 +858,12 @@ class Message(object):
|
|
|
|
|
self.auth_id = mitogen.context_id
|
|
|
|
|
vars(self).update(kwargs)
|
|
|
|
|
assert isinstance(self.data, BytesType), 'Message data is not Bytes'
|
|
|
|
|
if self.enc not in self.ENCS:
|
|
|
|
|
raise ValueError('Invalid enc: %r' % (self.enc,))
|
|
|
|
|
|
|
|
|
|
def pack(self):
|
|
|
|
|
return (
|
|
|
|
|
struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, self.dst_id,
|
|
|
|
|
struct.pack(self.HEADER_FMT, self.enc, self.dst_id,
|
|
|
|
|
self.src_id, self.auth_id, self.handle,
|
|
|
|
|
self.reply_to or 0, len(self.data))
|
|
|
|
|
+ self.data
|
|
|
|
|
@ -920,7 +930,7 @@ class Message(object):
|
|
|
|
|
:returns:
|
|
|
|
|
The new message.
|
|
|
|
|
"""
|
|
|
|
|
self = cls(**kwargs)
|
|
|
|
|
self = cls(enc=cls.ENC_PKL, **kwargs)
|
|
|
|
|
try:
|
|
|
|
|
self.data = pickle__dumps(obj, protocol=2)
|
|
|
|
|
except pickle.PicklingError:
|
|
|
|
|
@ -2386,13 +2396,13 @@ class MitogenProtocol(Protocol):
|
|
|
|
|
|
|
|
|
|
msg = Message()
|
|
|
|
|
msg.router = self._router
|
|
|
|
|
(magic, msg.dst_id, msg.src_id, msg.auth_id,
|
|
|
|
|
(msg.enc, msg.dst_id, msg.src_id, msg.auth_id,
|
|
|
|
|
msg.handle, msg.reply_to, msg_len) = struct.unpack(
|
|
|
|
|
Message.HEADER_FMT,
|
|
|
|
|
self._input_buf[0][:Message.HEADER_LEN],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if magic != Message.HEADER_MAGIC:
|
|
|
|
|
if msg.enc not in Message.ENCS:
|
|
|
|
|
LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048])
|
|
|
|
|
self.stream.on_disconnect(broker)
|
|
|
|
|
return False
|
|
|
|
|
|