|
|
|
@ -75,6 +75,7 @@ DEL_ROUTE = 104
|
|
|
|
|
ALLOCATE_ID = 105
|
|
|
|
|
SHUTDOWN = 106
|
|
|
|
|
LOAD_MODULE = 107
|
|
|
|
|
IS_DEAD = 999
|
|
|
|
|
|
|
|
|
|
PY3 = sys.version_info > (3,)
|
|
|
|
|
if PY3:
|
|
|
|
@ -146,30 +147,6 @@ class TimeoutError(Error):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Dead(object):
|
|
|
|
|
def __hash__(self):
|
|
|
|
|
return hash(Dead)
|
|
|
|
|
|
|
|
|
|
def __eq__(self, other):
|
|
|
|
|
return type(other) is Dead
|
|
|
|
|
|
|
|
|
|
def __ne__(self, other):
|
|
|
|
|
return type(other) is not Dead
|
|
|
|
|
|
|
|
|
|
def __reduce__(self):
|
|
|
|
|
return (_unpickle_dead, ())
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return '<Dead>'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _unpickle_dead():
|
|
|
|
|
return _DEAD
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_DEAD = Dead()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def has_parent_authority(msg, _stream=None):
|
|
|
|
|
return (msg.auth_id == mitogen.context_id or
|
|
|
|
|
msg.auth_id in mitogen.parent_ids)
|
|
|
|
@ -328,8 +305,6 @@ class Message(object):
|
|
|
|
|
if module == __name__:
|
|
|
|
|
if func == '_unpickle_call_error':
|
|
|
|
|
return _unpickle_call_error
|
|
|
|
|
elif func == '_unpickle_dead':
|
|
|
|
|
return _unpickle_dead
|
|
|
|
|
elif func == '_unpickle_sender':
|
|
|
|
|
return self._unpickle_sender
|
|
|
|
|
elif func == '_unpickle_context':
|
|
|
|
@ -337,6 +312,14 @@ class Message(object):
|
|
|
|
|
|
|
|
|
|
raise StreamError('cannot unpickle %r/%r', module, func)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def is_dead(self):
|
|
|
|
|
return self.reply_to == IS_DEAD
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def dead(cls, **kwargs):
|
|
|
|
|
return cls(reply_to=IS_DEAD, **kwargs)
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def pickled(cls, obj, **kwargs):
|
|
|
|
|
self = cls(**kwargs)
|
|
|
|
@ -347,15 +330,20 @@ class Message(object):
|
|
|
|
|
self.data = cPickle.dumps(CallError(e), protocol=2)
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
def reply(self, obj, **kwargs):
|
|
|
|
|
kwargs.setdefault('handle', self.reply_to)
|
|
|
|
|
self.router.route(
|
|
|
|
|
self.pickled(obj, dst_id=self.src_id, **kwargs)
|
|
|
|
|
)
|
|
|
|
|
def reply(self, msg, router=None, **kwargs):
|
|
|
|
|
if not isinstance(msg, Message):
|
|
|
|
|
msg = Message.pickled(msg)
|
|
|
|
|
msg.dst_id = self.src_id
|
|
|
|
|
msg.handle = self.reply_to
|
|
|
|
|
vars(msg).update(kwargs)
|
|
|
|
|
(self.router or router).route(msg)
|
|
|
|
|
|
|
|
|
|
def unpickle(self, throw=True, throw_dead=True):
|
|
|
|
|
"""Deserialize `data` into an object."""
|
|
|
|
|
_vv and IOLOG.debug('%r.unpickle()', self)
|
|
|
|
|
if throw_dead and self.is_dead:
|
|
|
|
|
raise ChannelError(ChannelError.remote_msg)
|
|
|
|
|
|
|
|
|
|
obj = self._unpickled
|
|
|
|
|
if obj is Message._unpickled:
|
|
|
|
|
fp = BytesIO(self.data)
|
|
|
|
@ -374,8 +362,6 @@ class Message(object):
|
|
|
|
|
raise StreamError('invalid message: %s', e)
|
|
|
|
|
|
|
|
|
|
if throw:
|
|
|
|
|
if obj == _DEAD and throw_dead:
|
|
|
|
|
raise ChannelError(ChannelError.remote_msg)
|
|
|
|
|
if isinstance(obj, CallError):
|
|
|
|
|
raise obj
|
|
|
|
|
|
|
|
|
@ -402,22 +388,12 @@ class Sender(object):
|
|
|
|
|
def close(self):
|
|
|
|
|
"""Indicate this channel is closed to the remote side."""
|
|
|
|
|
_vv and IOLOG.debug('%r.close()', self)
|
|
|
|
|
self.context.send(
|
|
|
|
|
Message.pickled(
|
|
|
|
|
_DEAD,
|
|
|
|
|
handle=self.dst_handle
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
self.context.send(Message.dead(handle=self.dst_handle))
|
|
|
|
|
|
|
|
|
|
def send(self, data):
|
|
|
|
|
"""Send `data` to the remote."""
|
|
|
|
|
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
|
|
|
|
|
self.context.send(
|
|
|
|
|
Message.pickled(
|
|
|
|
|
data,
|
|
|
|
|
handle=self.dst_handle
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
self.context.send(Message.pickled(data, handle=self.dst_handle))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _unpickle_sender(router, context_id, dst_handle):
|
|
|
|
@ -460,7 +436,7 @@ class Receiver(object):
|
|
|
|
|
self.notify(self)
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
self._latch.put(_DEAD)
|
|
|
|
|
self._latch.put(Message.dead())
|
|
|
|
|
|
|
|
|
|
def empty(self):
|
|
|
|
|
return self._latch.empty()
|
|
|
|
@ -468,10 +444,11 @@ class Receiver(object):
|
|
|
|
|
def get(self, timeout=None, block=True):
|
|
|
|
|
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
|
|
|
|
|
msg = self._latch.get(timeout=timeout, block=block)
|
|
|
|
|
#IOLOG.debug('%r.get() got %r', self, msg)
|
|
|
|
|
|
|
|
|
|
if msg == _DEAD:
|
|
|
|
|
raise ChannelError(ChannelError.local_msg)
|
|
|
|
|
if msg.is_dead:
|
|
|
|
|
if msg.src_id == mitogen.context_id:
|
|
|
|
|
raise ChannelError(ChannelError.local_msg)
|
|
|
|
|
else:
|
|
|
|
|
raise ChannelError(ChannelError.remote_msg)
|
|
|
|
|
return msg
|
|
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
@ -630,7 +607,7 @@ class Importer(object):
|
|
|
|
|
os.environ['PBR_VERSION'] = '0.0.0'
|
|
|
|
|
|
|
|
|
|
def _on_load_module(self, msg):
|
|
|
|
|
if msg == _DEAD:
|
|
|
|
|
if msg.is_dead:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
tup = msg.unpickle()
|
|
|
|
@ -1275,7 +1252,7 @@ class Router(object):
|
|
|
|
|
def _on_broker_exit(self):
|
|
|
|
|
while self._handle_map:
|
|
|
|
|
_, (_, func, _) = self._handle_map.popitem()
|
|
|
|
|
func(_DEAD)
|
|
|
|
|
func(Message.dead())
|
|
|
|
|
|
|
|
|
|
def register(self, context, stream):
|
|
|
|
|
_v and LOG.debug('register(%r, %r)', context, stream)
|
|
|
|
@ -1295,7 +1272,7 @@ class Router(object):
|
|
|
|
|
return msg.src_id == respondent.context_id
|
|
|
|
|
def on_disconnect():
|
|
|
|
|
if handle in self._handle_map:
|
|
|
|
|
fn(_DEAD)
|
|
|
|
|
fn(Message.dead())
|
|
|
|
|
del self._handle_map[handle]
|
|
|
|
|
listen(respondent, 'disconnect', on_disconnect)
|
|
|
|
|
|
|
|
|
@ -1309,7 +1286,7 @@ class Router(object):
|
|
|
|
|
fire(self, 'shutdown')
|
|
|
|
|
for handle, (persist, fn) in self._handle_map.iteritems():
|
|
|
|
|
_v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
|
|
|
|
|
fn(_DEAD)
|
|
|
|
|
fn(Message.dead())
|
|
|
|
|
|
|
|
|
|
refused_msg = 'Refused by policy.'
|
|
|
|
|
|
|
|
|
@ -1319,6 +1296,8 @@ class Router(object):
|
|
|
|
|
persist, fn, policy = self._handle_map[msg.handle]
|
|
|
|
|
except KeyError:
|
|
|
|
|
LOG.error('%r: invalid handle: %r', self, msg)
|
|
|
|
|
if msg.reply_to and not msg.is_dead:
|
|
|
|
|
msg.reply(Message.dead())
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if policy and not policy(msg, stream):
|
|
|
|
@ -1375,6 +1354,8 @@ class Router(object):
|
|
|
|
|
if stream is None:
|
|
|
|
|
LOG.error('%r: no route for %r, my ID is %r',
|
|
|
|
|
self, msg, mitogen.context_id)
|
|
|
|
|
if msg.reply_to and not msg.is_dead:
|
|
|
|
|
msg.reply(Message.dead(), router=self)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
stream._send(msg)
|
|
|
|
@ -1510,7 +1491,7 @@ class ExternalContext(object):
|
|
|
|
|
|
|
|
|
|
def _on_shutdown_msg(self, msg):
|
|
|
|
|
_v and LOG.debug('_on_shutdown_msg(%r)', msg)
|
|
|
|
|
if msg != _DEAD:
|
|
|
|
|
if not msg.is_dead:
|
|
|
|
|
self.broker.shutdown()
|
|
|
|
|
|
|
|
|
|
def _on_parent_disconnect(self):
|
|
|
|
|