|
|
|
@ -1036,6 +1036,16 @@ class Stream(BasicStream):
|
|
|
|
|
def construct(self):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def _internal_receive(self, broker, buf):
|
|
|
|
|
if self._input_buf and self._input_buf_len < 128:
|
|
|
|
|
self._input_buf[0] += buf
|
|
|
|
|
else:
|
|
|
|
|
self._input_buf.append(buf)
|
|
|
|
|
|
|
|
|
|
self._input_buf_len += len(buf)
|
|
|
|
|
while self._receive_one(broker):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def on_receive(self, broker):
|
|
|
|
|
"""Handle the next complete message on the stream. Raise
|
|
|
|
|
:py:class:`StreamError` on failure."""
|
|
|
|
@ -1045,14 +1055,7 @@ class Stream(BasicStream):
|
|
|
|
|
if not buf:
|
|
|
|
|
return self.on_disconnect(broker)
|
|
|
|
|
|
|
|
|
|
if self._input_buf and self._input_buf_len < 128:
|
|
|
|
|
self._input_buf[0] += buf
|
|
|
|
|
else:
|
|
|
|
|
self._input_buf.append(buf)
|
|
|
|
|
|
|
|
|
|
self._input_buf_len += len(buf)
|
|
|
|
|
while self._receive_one(broker):
|
|
|
|
|
pass
|
|
|
|
|
self._internal_receive(broker, buf)
|
|
|
|
|
|
|
|
|
|
HEADER_FMT = '>LLLLLL'
|
|
|
|
|
HEADER_LEN = struct.calcsize(HEADER_FMT)
|
|
|
|
|