@ -832,10 +832,6 @@ class Stream(BasicStream):
be called from any thread . """
self . _router . broker . defer ( self . _send , msg )
def on_disconnect ( self , broker ) :
super ( Stream , self ) . on_disconnect ( broker )
self . _router . on_disconnect ( self , broker )
def on_shutdown ( self , broker ) :
""" Override BasicStream behaviour of immediately disconnecting. """
_v and LOG . debug ( ' %r .on_shutdown( %r ) ' , self , broker )
@ -862,9 +858,8 @@ class Context(object):
return _unpickle_context , ( self . context_id , self . name )
def on_disconnect ( self , broker ) :
_v and LOG . debug ( ' Parent stream is gone, dying. ' )
_v and LOG . debug ( ' %r .on_disconnect() ' , self )
fire ( self , ' disconnect ' )
broker . shutdown ( )
def on_shutdown ( self , broker ) :
pass
@ -1118,8 +1113,7 @@ class Router(object):
def __repr__ ( self ) :
return ' Router( %r ) ' % ( self . broker , )
def on_disconnect ( self , stream , broker ) :
""" Invoked by Stream.on_disconnect(). """
def on_stream_disconnect ( self , stream ) :
for context in self . _context_by_id . itervalues ( ) :
stream_ = self . _stream_by_id . get ( context . context_id )
if stream_ is stream :
@ -1138,6 +1132,7 @@ class Router(object):
self . _stream_by_id [ context . context_id ] = stream
self . _context_by_id [ context . context_id ] = context
self . broker . start_receive ( stream )
listen ( stream , ' disconnect ' , lambda : self . on_stream_disconnect ( stream ) )
def add_handler ( self , fn , handle = None , persist = True , respondent = None ) :
handle = handle or self . _last_handle . next ( )
@ -1359,6 +1354,10 @@ class ExternalContext(object):
return
self . broker . shutdown ( )
def _on_parent_disconnect ( self ) :
_v and LOG . debug ( ' %r : parent stream is gone, dying. ' , self )
self . broker . shutdown ( )
def _setup_master ( self , profiling , parent_id , context_id , in_fd , out_fd ) :
self . profiling = profiling
if profiling :
@ -1378,6 +1377,7 @@ class ExternalContext(object):
self . stream . accept ( in_fd , out_fd )
self . stream . receive_side . keep_alive = False
listen ( self . stream , ' disconnect ' , self . _on_parent_disconnect )
listen ( self . broker , ' shutdown ' , self . _on_broker_shutdown )
listen ( self . broker , ' exit ' , self . _on_broker_exit )