diff --git a/mitogen/core.py b/mitogen/core.py index 17f4482b..6c58da16 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -254,17 +254,27 @@ class Message(object): self.data = cPickle.dumps(CallError(e), protocol=2) return self - def unpickle(self): + def unpickle(self, throw=True): """Deserialize `data` into an object.""" IOLOG.debug('%r.unpickle()', self) fp = cStringIO.StringIO(self.data) unpickler = cPickle.Unpickler(fp) unpickler.find_global = self._find_global + try: - return unpickler.load() + # Must occur off the broker thread. + obj = unpickler.load() except (TypeError, ValueError), ex: raise StreamError('invalid message: %s', ex) + if throw: + if obj == _DEAD: + raise ChannelError(ChannelError.remote_msg) + if isinstance(obj, CallError): + raise obj + + return obj + def __repr__(self): return 'Message(%r, %r, %r, %r, %r, %r..%d)' % ( self.dst_id, self.src_id, self.auth_id, self.handle, @@ -358,19 +368,7 @@ class Receiver(object): if msg == _DEAD: raise ChannelError(ChannelError.local_msg) - - # Must occur off the broker thread. - data = msg.unpickle() - if data == _DEAD and self.raise_channelerror: - raise ChannelError(ChannelError.remote_msg) - - if isinstance(data, CallError): - raise data - - return msg, data - - def get_data(self, timeout=None): - return self.get(timeout)[1] + return msg def __iter__(self): while True: @@ -1230,7 +1228,8 @@ class ExternalContext(object): fp.close() def _dispatch_calls(self): - for msg, data in self.channel: + for msg in self.channel: + data = msg.unpickle(throw=False) LOG.debug('_dispatch_calls(%r)', data) if msg.src_id not in mitogen.parent_ids: LOG.warning('CALL_FUNCTION from non-parent %r', msg.src_id) diff --git a/mitogen/master.py b/mitogen/master.py index 3f79cecf..236127e9 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -564,17 +564,15 @@ class Context(mitogen.core.Context): else: klass = None - recv = self.send_async( + return self.send_async( mitogen.core.Message.pickled( (fn.__module__, klass, fn.__name__, args, kwargs), handle=mitogen.core.CALL_FUNCTION, ) ) - recv.raise_channelerror = False - return recv def call(self, fn, *args, **kwargs): - return self.call_async(fn, *args, **kwargs).get_data() + return self.call_async(fn, *args, **kwargs).get().unpickle() class Router(mitogen.parent.Router):