@ -638,7 +638,7 @@ class Stream(BasicStream):
msg . data = self . _input_buf [ self . HEADER_LEN : self . HEADER_LEN + msg_len ]
msg . data = self . _input_buf [ self . HEADER_LEN : self . HEADER_LEN + msg_len ]
self . _input_buf = self . _input_buf [ self . HEADER_LEN + msg_len : ]
self . _input_buf = self . _input_buf [ self . HEADER_LEN + msg_len : ]
self . _router . _async_route ( msg )
self . _router . _async_route ( msg , self )
return True
return True
def on_transmit ( self , broker ) :
def on_transmit ( self , broker ) :
@ -929,8 +929,16 @@ class Router(object):
except Exception :
except Exception :
LOG . exception ( ' %r ._invoke( %r ): %r crashed ' , self , msg , fn )
LOG . exception ( ' %r ._invoke( %r ): %r crashed ' , self , msg , fn )
def _async_route ( self , msg ) :
def _async_route ( self , msg , stream = None ) :
IOLOG . debug ( ' %r ._async_route( %r ) ' , self , msg )
IOLOG . debug ( ' %r ._async_route( %r , %r ) ' , self , msg , stream )
# Perform source verification.
if stream is not None :
expected_stream = self . _stream_by_id . get ( msg . src_id ,
self . _stream_by_id . get ( mitogen . parent_id ) )
if stream != expected_stream :
LOG . error ( ' %r : bad source: got %r from %r , should be from %r ' ,
self , msg , stream , expected_stream )
if msg . dst_id == mitogen . context_id :
if msg . dst_id == mitogen . context_id :
return self . _invoke ( msg )
return self . _invoke ( msg )