@ -1945,17 +1945,14 @@ class Waker(BasicStream):
def on_receive ( self , broker ) :
def on_receive ( self , broker ) :
"""
"""
Drain the pipe and fire callbacks . Reading multiple bytes is safe since
Drain the pipe and fire callbacks . Since : attr : ` _deferred ` is
new bytes corresponding to future . defer ( ) calls are written only after
synchronized , : meth : ` defer ` and : meth : ` on_receive ` can conspire to
. defer ( ) takes _lock : either a byte we read corresponds to something
ensure only one byte needs to be pending regardless of queue length .
already on the queue by the time we take _lock , or a byte remains
buffered , causing another wake up , because it was written after we
released _lock .
"""
"""
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
self . receive_side . read ( 128 )
self . _lock . acquire ( )
self . _lock . acquire ( )
try :
try :
self . receive_side . read ( 1 )
deferred = self . _deferred
deferred = self . _deferred
self . _deferred = [ ]
self . _deferred = [ ]
finally :
finally :
@ -1969,6 +1966,18 @@ class Waker(BasicStream):
func , args , kwargs )
func , args , kwargs )
self . _broker . shutdown ( )
self . _broker . shutdown ( )
def _wake ( self ) :
"""
Wake the multiplexer by writing a byte . If Broker is midway through
teardown , the FD may already be closed , so ignore EBADF .
"""
try :
self . transmit_side . write ( b ( ' ' ) )
except OSError :
e = sys . exc_info ( ) [ 1 ]
if e . args [ 0 ] != errno . EBADF :
raise
def defer ( self , func , * args , * * kwargs ) :
def defer ( self , func , * args , * * kwargs ) :
if threading . currentThread ( ) . ident == self . broker_ident :
if threading . currentThread ( ) . ident == self . broker_ident :
_vv and IOLOG . debug ( ' %r .defer() [immediate] ' , self )
_vv and IOLOG . debug ( ' %r .defer() [immediate] ' , self )
@ -1977,20 +1986,12 @@ class Waker(BasicStream):
_vv and IOLOG . debug ( ' %r .defer() [fd= %r ] ' , self , self . transmit_side . fd )
_vv and IOLOG . debug ( ' %r .defer() [fd= %r ] ' , self , self . transmit_side . fd )
self . _lock . acquire ( )
self . _lock . acquire ( )
try :
try :
if not self . _deferred :
self . _wake ( )
self . _deferred . append ( ( func , args , kwargs ) )
self . _deferred . append ( ( func , args , kwargs ) )
finally :
finally :
self . _lock . release ( )
self . _lock . release ( )
# Wake the multiplexer by writing a byte. If the broker is in the midst
# of tearing itself down, the waker fd may already have been closed, so
# ignore EBADF here.
try :
self . transmit_side . write ( b ( ' ' ) )
except OSError :
e = sys . exc_info ( ) [ 1 ]
if e . args [ 0 ] != errno . EBADF :
raise
class IoLogger ( BasicStream ) :
class IoLogger ( BasicStream ) :
"""
"""