|
|
|
@ -217,6 +217,7 @@ def enable_profiling():
|
|
|
|
|
class Message(object):
|
|
|
|
|
dst_id = None
|
|
|
|
|
src_id = None
|
|
|
|
|
auth_id = None
|
|
|
|
|
handle = None
|
|
|
|
|
reply_to = None
|
|
|
|
|
data = ''
|
|
|
|
@ -225,6 +226,7 @@ class Message(object):
|
|
|
|
|
|
|
|
|
|
def __init__(self, **kwargs):
|
|
|
|
|
self.src_id = mitogen.context_id
|
|
|
|
|
self.auth_id = mitogen.context_id
|
|
|
|
|
vars(self).update(kwargs)
|
|
|
|
|
|
|
|
|
|
def _unpickle_context(self, context_id, name):
|
|
|
|
@ -264,9 +266,9 @@ class Message(object):
|
|
|
|
|
raise StreamError('invalid message: %s', ex)
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return 'Message(%r, %r, %r, %r, %r..%d)' % (
|
|
|
|
|
self.dst_id, self.src_id, self.handle, self.reply_to,
|
|
|
|
|
(self.data or '')[:50], len(self.data)
|
|
|
|
|
return 'Message(%r, %r, %r, %r, %r, %r..%d)' % (
|
|
|
|
|
self.dst_id, self.src_id, self.auth_id, self.handle,
|
|
|
|
|
self.reply_to, (self.data or '')[:50], len(self.data)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -307,8 +309,6 @@ def _queue_interruptible_get(queue, timeout=None, block=True):
|
|
|
|
|
if timeout is not None:
|
|
|
|
|
timeout += time.time()
|
|
|
|
|
|
|
|
|
|
LOG.info('timeout = %r, block = %r', timeout, block)
|
|
|
|
|
|
|
|
|
|
msg = None
|
|
|
|
|
while msg is None and (timeout is None or timeout > time.time()):
|
|
|
|
|
try:
|
|
|
|
@ -407,6 +407,7 @@ class Importer(object):
|
|
|
|
|
'mitogen.compat.pkgutil',
|
|
|
|
|
'mitogen.fakessh',
|
|
|
|
|
'mitogen.master',
|
|
|
|
|
'mitogen.parent',
|
|
|
|
|
'mitogen.ssh',
|
|
|
|
|
'mitogen.sudo',
|
|
|
|
|
'mitogen.utils',
|
|
|
|
@ -635,6 +636,7 @@ class Stream(BasicStream):
|
|
|
|
|
protocol <stream-protocol>`.
|
|
|
|
|
"""
|
|
|
|
|
_input_buf = ''
|
|
|
|
|
auth_id = None
|
|
|
|
|
|
|
|
|
|
def __init__(self, router, remote_id, **kwargs):
|
|
|
|
|
self._router = router
|
|
|
|
@ -663,7 +665,7 @@ class Stream(BasicStream):
|
|
|
|
|
if not buf:
|
|
|
|
|
return self.on_disconnect(broker)
|
|
|
|
|
|
|
|
|
|
HEADER_FMT = '>hhLLL'
|
|
|
|
|
HEADER_FMT = '>hhhLLL'
|
|
|
|
|
HEADER_LEN = struct.calcsize(HEADER_FMT)
|
|
|
|
|
|
|
|
|
|
def _receive_one(self, broker):
|
|
|
|
@ -674,7 +676,7 @@ class Stream(BasicStream):
|
|
|
|
|
# To support unpickling Contexts.
|
|
|
|
|
msg.router = self._router
|
|
|
|
|
|
|
|
|
|
(msg.dst_id, msg.src_id,
|
|
|
|
|
(msg.dst_id, msg.src_id, msg.auth_id,
|
|
|
|
|
msg.handle, msg.reply_to, msg_len) = struct.unpack(
|
|
|
|
|
self.HEADER_FMT,
|
|
|
|
|
self._input_buf[:self.HEADER_LEN]
|
|
|
|
@ -711,7 +713,7 @@ class Stream(BasicStream):
|
|
|
|
|
|
|
|
|
|
def _send(self, msg):
|
|
|
|
|
IOLOG.debug('%r._send(%r)', self, msg)
|
|
|
|
|
pkt = struct.pack('>hhLLL', msg.dst_id, msg.src_id,
|
|
|
|
|
pkt = struct.pack('>hhhLLL', msg.dst_id, msg.src_id, msg.auth_id,
|
|
|
|
|
msg.handle, msg.reply_to or 0, len(msg.data)
|
|
|
|
|
) + msg.data
|
|
|
|
|
self._output_buf.append(pkt)
|
|
|
|
@ -765,8 +767,6 @@ class Context(object):
|
|
|
|
|
"""send `obj` to `handle`, and tell the broker we have output. May
|
|
|
|
|
be called from any thread."""
|
|
|
|
|
msg.dst_id = self.context_id
|
|
|
|
|
if msg.src_id is None:
|
|
|
|
|
msg.src_id = mitogen.context_id
|
|
|
|
|
self.router.route(msg)
|
|
|
|
|
|
|
|
|
|
def send_async(self, msg, persist=False):
|
|
|
|
@ -979,10 +979,10 @@ class Router(object):
|
|
|
|
|
IOLOG.debug('%r._async_route(%r, %r)', self, msg, stream)
|
|
|
|
|
# Perform source verification.
|
|
|
|
|
if stream is not None:
|
|
|
|
|
expected_stream = self._stream_by_id.get(msg.src_id,
|
|
|
|
|
expected_stream = self._stream_by_id.get(msg.auth_id,
|
|
|
|
|
self._stream_by_id.get(mitogen.parent_id))
|
|
|
|
|
if stream != expected_stream:
|
|
|
|
|
LOG.error('%r: bad source: got %r from %r, should be from %r',
|
|
|
|
|
LOG.error('%r: bad source: got auth ID %r from %r, should be from %r',
|
|
|
|
|
self, msg, stream, expected_stream)
|
|
|
|
|
|
|
|
|
|
if msg.dst_id == mitogen.context_id:
|
|
|
|
|