@ -695,8 +695,6 @@ class Stream(BasicStream):
: py : class : ` BasicStream ` subclass implementing mitogen ' s :ref:`stream
: py : class : ` BasicStream ` subclass implementing mitogen ' s :ref:`stream
protocol < stream - protocol > ` .
protocol < stream - protocol > ` .
"""
"""
_input_buf = ' '
#: If not ``None``, :py:class:`Router` stamps this into
#: If not ``None``, :py:class:`Router` stamps this into
#: :py:attr:`Message.auth_id` of every message received on this stream.
#: :py:attr:`Message.auth_id` of every message received on this stream.
auth_id = None
auth_id = None
@ -707,6 +705,8 @@ class Stream(BasicStream):
self . name = ' default '
self . name = ' default '
self . sent_modules = set ( )
self . sent_modules = set ( )
self . construct ( * * kwargs )
self . construct ( * * kwargs )
self . _input_buf = collections . deque ( )
self . _input_buf_len = 0
self . _output_buf = collections . deque ( )
self . _output_buf = collections . deque ( )
def construct ( self ) :
def construct ( self ) :
@ -718,40 +718,55 @@ class Stream(BasicStream):
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
buf = self . receive_side . read ( )
buf = self . receive_side . read ( )
if buf is None :
if buf :
buf = ' '
if self . _input_buf and self . _input_buf_len < 128 :
self . _input_buf [ 0 ] + = buf
self . _input_buf + = buf
else :
self . _input_buf . append ( buf )
self . _input_buf_len + = len ( buf )
while self . _receive_one ( broker ) :
while self . _receive_one ( broker ) :
pass
pass
else :
if not buf :
return self . on_disconnect ( broker )
return self . on_disconnect ( broker )
HEADER_FMT = ' >hhhLLL '
HEADER_FMT = ' >hhhLLL '
HEADER_LEN = struct . calcsize ( HEADER_FMT )
HEADER_LEN = struct . calcsize ( HEADER_FMT )
def _receive_one ( self , broker ) :
def _receive_one ( self , broker ) :
if len ( self . _input_buf ) < self . HEADER_LEN :
if self . _input_buf_len < self . HEADER_LEN :
return False
return False
msg = Message ( )
msg = Message ( )
# To support unpickling Contexts.
msg . router = self . _router
msg . router = self . _router
( msg . dst_id , msg . src_id , msg . auth_id ,
( msg . dst_id , msg . src_id , msg . auth_id ,
msg . handle , msg . reply_to , msg_len ) = struct . unpack (
msg . handle , msg . reply_to , msg_len ) = struct . unpack (
self . HEADER_FMT ,
self . HEADER_FMT ,
self . _input_buf [ : self . HEADER_LEN ]
self . _input_buf [ 0 ] [ : self . HEADER_LEN ] ,
)
)
if ( len ( self . _input_buf ) - self . HEADER_LEN ) < msg_len :
if ( self . _input_buf_len - self . HEADER_LEN ) < msg_len :
_vv and IOLOG . debug ( ' %r : Input too short (want %d , got %d ) ' ,
_vv and IOLOG . debug (
self , msg_len , len ( self . _input_buf ) - self . HEADER_LEN )
' %r : Input too short (want %d , got %d ) ' ,
self , msg_len , self . _input_buf_len - self . HEADER_LEN
)
return False
return False
msg . data = self . _input_buf [ self . HEADER_LEN : self . HEADER_LEN + msg_len ]
start = self . HEADER_LEN
self . _input_buf = self . _input_buf [ self . HEADER_LEN + msg_len : ]
prev_start = start
remain = msg_len + start
bits = [ ]
while remain :
buf = self . _input_buf . popleft ( )
bit = buf [ start : remain ]
bits . append ( bit )
remain - = len ( bit ) + start
prev_start = start
start = 0
msg . data = ' ' . join ( bits )
self . _input_buf . appendleft ( buf [ prev_start + len ( bit ) : ] )
self . _input_buf_len - = self . HEADER_LEN + msg_len
self . _router . _async_route ( msg , self )
self . _router . _async_route ( msg , self )
return True
return True