|
|
@ -766,14 +766,14 @@ class Stream(BasicStream):
|
|
|
|
|
|
|
|
|
|
|
|
msg = Message()
|
|
|
|
msg = Message()
|
|
|
|
msg.router = self._router
|
|
|
|
msg.router = self._router
|
|
|
|
|
|
|
|
|
|
|
|
(msg.dst_id, msg.src_id, msg.auth_id,
|
|
|
|
(msg.dst_id, msg.src_id, msg.auth_id,
|
|
|
|
msg.handle, msg.reply_to, msg_len) = struct.unpack(
|
|
|
|
msg.handle, msg.reply_to, msg_len) = struct.unpack(
|
|
|
|
self.HEADER_FMT,
|
|
|
|
self.HEADER_FMT,
|
|
|
|
self._input_buf[0][:self.HEADER_LEN],
|
|
|
|
self._input_buf[0][:self.HEADER_LEN],
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if (self._input_buf_len - self.HEADER_LEN) < msg_len:
|
|
|
|
total_len = msg_len + self.HEADER_LEN
|
|
|
|
|
|
|
|
if self._input_buf_len < total_len:
|
|
|
|
_vv and IOLOG.debug(
|
|
|
|
_vv and IOLOG.debug(
|
|
|
|
'%r: Input too short (want %d, got %d)',
|
|
|
|
'%r: Input too short (want %d, got %d)',
|
|
|
|
self, msg_len, self._input_buf_len - self.HEADER_LEN
|
|
|
|
self, msg_len, self._input_buf_len - self.HEADER_LEN
|
|
|
@ -782,7 +782,7 @@ class Stream(BasicStream):
|
|
|
|
|
|
|
|
|
|
|
|
start = self.HEADER_LEN
|
|
|
|
start = self.HEADER_LEN
|
|
|
|
prev_start = start
|
|
|
|
prev_start = start
|
|
|
|
remain = msg_len + start
|
|
|
|
remain = total_len
|
|
|
|
bits = []
|
|
|
|
bits = []
|
|
|
|
while remain:
|
|
|
|
while remain:
|
|
|
|
buf = self._input_buf.popleft()
|
|
|
|
buf = self._input_buf.popleft()
|
|
|
@ -794,7 +794,7 @@ class Stream(BasicStream):
|
|
|
|
|
|
|
|
|
|
|
|
msg.data = ''.join(bits)
|
|
|
|
msg.data = ''.join(bits)
|
|
|
|
self._input_buf.appendleft(buf[prev_start+len(bit):])
|
|
|
|
self._input_buf.appendleft(buf[prev_start+len(bit):])
|
|
|
|
self._input_buf_len -= self.HEADER_LEN + msg_len
|
|
|
|
self._input_buf_len -= total_len
|
|
|
|
self._router._async_route(msg, self)
|
|
|
|
self._router._async_route(msg, self)
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|