@ -1015,8 +1015,8 @@ class Broker(object):
def __init__ ( self ) :
def __init__ ( self ) :
self . _alive = True
self . _alive = True
self . _queue = Queue . Queue ( )
self . _queue = Queue . Queue ( )
self . _readers = set ( )
self . _readers = [ ]
self . _writers = set ( )
self . _writers = [ ]
self . _waker = Waker ( self )
self . _waker = Waker ( self )
self . start_receive ( self . _waker )
self . start_receive ( self . _waker )
self . _thread = threading . Thread (
self . _thread = threading . Thread (
@ -1033,23 +1033,33 @@ class Broker(object):
self . _queue . put ( ( func , args , kwargs ) )
self . _queue . put ( ( func , args , kwargs ) )
self . _waker . wake ( )
self . _waker . wake ( )
def _list_discard ( self , lst , value ) :
try :
lst . remove ( value )
except ValueError :
pass
def _list_add ( self , lst , value ) :
if value not in lst :
lst . append ( value )
def start_receive ( self , stream ) :
def start_receive ( self , stream ) :
IOLOG . debug ( ' %r .start_receive( %r ) ' , self , stream )
IOLOG . debug ( ' %r .start_receive( %r ) ' , self , stream )
assert stream . receive_side and stream . receive_side . fd is not None
assert stream . receive_side and stream . receive_side . fd is not None
self . defer ( self . _readers . add , stream . receive_side )
self . defer ( self . _ list_add, self . _readers , stream . receive_side )
def stop_receive ( self , stream ) :
def stop_receive ( self , stream ) :
IOLOG . debug ( ' %r .stop_receive( %r ) ' , self , stream )
IOLOG . debug ( ' %r .stop_receive( %r ) ' , self , stream )
self . defer ( self . _readers . discard , stream . receive_side )
self . defer ( self . _ list_discard, self . _readers , stream . receive_side )
def start_transmit ( self , stream ) :
def start_transmit ( self , stream ) :
IOLOG . debug ( ' %r .start_transmit( %r ) ' , self , stream )
IOLOG . debug ( ' %r .start_transmit( %r ) ' , self , stream )
assert stream . transmit_side and stream . transmit_side . fd is not None
assert stream . transmit_side and stream . transmit_side . fd is not None
self . defer ( self . _ writers. add , stream . transmit_side )
self . defer ( self . _ list_add, self . _writers , stream . transmit_side )
def stop_transmit ( self , stream ) :
def stop_transmit ( self , stream ) :
IOLOG . debug ( ' %r .stop_transmit( %r ) ' , self , stream )
IOLOG . debug ( ' %r .stop_transmit( %r ) ' , self , stream )
self . defer ( self . _ writers. discard , stream . transmit_side )
self . defer ( self . _ list_discard, self . _writers , stream . transmit_side )
def _call ( self , stream , func ) :
def _call ( self , stream , func ) :
try :
try :
@ -1096,7 +1106,7 @@ class Broker(object):
self . _run_defer ( )
self . _run_defer ( )
fire ( self , ' shutdown ' )
fire ( self , ' shutdown ' )
for side in self . _readers | self . _writers :
for side in set ( self . _readers ) . union ( self . _writers ) :
self . _call ( side . stream , side . stream . on_shutdown )
self . _call ( side . stream , side . stream . on_shutdown )
deadline = time . time ( ) + self . shutdown_timeout
deadline = time . time ( ) + self . shutdown_timeout
@ -1109,7 +1119,7 @@ class Broker(object):
' more child processes still connected to '
' more child processes still connected to '
' our stdout/stderr pipes. ' , self )
' our stdout/stderr pipes. ' , self )
for side in self . _readers | self . _writers :
for side in set ( self . _readers ) . union ( self . _writers ) :
LOG . error ( ' _broker_main() force disconnecting %r ' , side )
LOG . error ( ' _broker_main() force disconnecting %r ' , side )
side . stream . on_disconnect ( self )
side . stream . on_disconnect ( self )
except Exception :
except Exception :