@ -2,6 +2,7 @@
import Queue
import cPickle
import cStringIO
import collections
import errno
import fcntl
import imp
@ -586,13 +587,13 @@ class Stream(BasicStream):
protocol < stream - protocol > ` .
"""
_input_buf = ' '
_output_buf = ' '
def __init__ ( self , router , remote_id , * * kwargs ) :
self . _router = router
self . remote_id = remote_id
self . name = ' default '
self . construct ( * * kwargs )
self . _output_buf = collections . deque ( )
def construct ( self ) :
pass
@ -643,14 +644,19 @@ class Stream(BasicStream):
def on_transmit ( self , broker ) :
""" Transmit buffered messages. """
IOLOG . debug ( ' %r .on_transmit() ' , self )
written = self . transmit_side . write ( self . _output_buf )
if written is None :
LOG . debug ( ' %r .on_transmit(): disconnection detected ' , self )
self . on_disconnect ( broker )
return
IOLOG . debug ( ' %r .on_transmit() -> len %d ' , self , written )
self . _output_buf = self . _output_buf [ written : ]
if self . _output_buf :
buf = self . _output_buf . popleft ( )
written = self . transmit_side . write ( buf )
if not written :
LOG . debug ( ' %r .on_transmit(): disconnection detected ' , self )
self . on_disconnect ( broker )
return
elif written != len ( buf ) :
self . _output_buf . appendleft ( buf [ written : ] )
IOLOG . debug ( ' %r .on_transmit() -> len %d ' , self , written )
if not self . _output_buf :
broker . stop_transmit ( self )
@ -659,7 +665,7 @@ class Stream(BasicStream):
pkt = struct . pack ( ' >hhLLL ' , msg . dst_id , msg . src_id ,
msg . handle , msg . reply_to or 0 , len ( msg . data )
) + msg . data
self . _output_buf + = pkt
self . _output_buf . append ( pkt )
self . _router . broker . start_transmit ( self )
def send ( self , msg ) :