|
|
|
@ -281,6 +281,7 @@ class Message(object):
|
|
|
|
|
handle = None
|
|
|
|
|
reply_to = None
|
|
|
|
|
data = ''
|
|
|
|
|
_unpickled = object()
|
|
|
|
|
|
|
|
|
|
router = None
|
|
|
|
|
receiver = None
|
|
|
|
@ -324,15 +325,18 @@ class Message(object):
|
|
|
|
|
def unpickle(self, throw=True, throw_dead=True):
|
|
|
|
|
"""Deserialize `data` into an object."""
|
|
|
|
|
_vv and IOLOG.debug('%r.unpickle()', self)
|
|
|
|
|
fp = cStringIO.StringIO(self.data)
|
|
|
|
|
unpickler = cPickle.Unpickler(fp)
|
|
|
|
|
unpickler.find_global = self._find_global
|
|
|
|
|
obj = self._unpickled
|
|
|
|
|
if obj is Message._unpickled:
|
|
|
|
|
fp = cStringIO.StringIO(self.data)
|
|
|
|
|
unpickler = cPickle.Unpickler(fp)
|
|
|
|
|
unpickler.find_global = self._find_global
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Must occur off the broker thread.
|
|
|
|
|
obj = unpickler.load()
|
|
|
|
|
except (TypeError, ValueError), ex:
|
|
|
|
|
raise StreamError('invalid message: %s', ex)
|
|
|
|
|
try:
|
|
|
|
|
# Must occur off the broker thread.
|
|
|
|
|
obj = unpickler.load()
|
|
|
|
|
self._unpickled = obj
|
|
|
|
|
except (TypeError, ValueError), ex:
|
|
|
|
|
raise StreamError('invalid message: %s', ex)
|
|
|
|
|
|
|
|
|
|
if throw:
|
|
|
|
|
if obj == _DEAD and throw_dead:
|
|
|
|
|