@ -36,13 +36,16 @@ FORWARD_LOG = 102
class Error ( Exception ) :
""" Raised when a problem occurs with a context ."""
""" Base for all exceptions raised by this module ."""
def __init__ ( self , fmt , * args ) :
Exception . __init__ ( self , fmt % args )
class CallError ( Error ) :
""" Raised when .call() fails. """
""" Raised when :py:meth:`Context.call() <econtext.master.Context.call>`
fails . A copy of the traceback from the external context is appended to the
exception message .
"""
def __init__ ( self , e ) :
name = ' %s . %s ' % ( type ( e ) . __module__ , type ( e ) . __name__ )
tb = sys . exc_info ( ) [ 2 ]
@ -73,7 +76,7 @@ class Dead(object):
return ' <Dead> '
#: Sentinel value used to represent Channel disconnection.
#: Sentinel value used to represent :py:class:` Channel` disconnection.
_DEAD = Dead ( )
@ -221,19 +224,34 @@ class LogHandler(logging.Handler):
class Side ( object ) :
def __init__ ( self , stream , fd ) :
"""
Represent a single side of a : py : class : ` BasicStream ` . This exists to allow
streams implemented using unidirectional ( e . g . UNIX pipe ) and bidirectional
( e . g . UNIX socket ) file descriptors to operate identically .
"""
def __init__ ( self , stream , fd , keep_alive = False ) :
#: The :py:class:`Stream` for which this is a read or write side.
self . stream = stream
#: Integer file descriptor to perform IO on.
self . fd = fd
#: If ``True``, causes presence of this side in :py:class:`Broker`'s
#: active reader set to defer shutdown until the side is disconnected.
self . keep_alive = keep_alive
def __repr__ ( self ) :
return ' <Side of %r fd %s > ' % ( self . stream , self . fd )
def fileno ( self ) :
""" Return :py:attr:`fd` if it is not ``None``, otherwise raise
` ` StreamError ` ` . This method is implemented so that : py : class : ` Side `
can be used directly by : py : func : ` select . select ` . """
if self . fd is None :
raise StreamError ( ' %r .fileno() called but no FD set ' , self )
return self . fd
def close ( self ) :
""" Call :py:func:`os.close` on :py:attr:`fd` if it is not ``None``,
then set it to ` ` None ` ` . """
if self . fd is not None :
IOLOG . debug ( ' %r .close() ' , self )
os . close ( self . fd )
@ -241,36 +259,70 @@ class Side(object):
class BasicStream ( object ) :
read_side = None
write_side = None
"""
. . method : : on_disconnect ( broker )
Called by : py : class : ` Broker ` to force disconnect the stream . The base
implementation simply closes : py : attr : ` receive_side ` and
: py : attr : ` transmit_side ` and unregisters the stream from the broker .
. . method : : on_receive ( broker )
Called by : py : class : ` Broker ` when the stream ' s :py:attr:`receive_side` has
been marked readable using : py : meth : ` Broker . start_receive ` and the
broker has detected the associated file descriptor is ready for
reading .
Subclasses must implement this method if
: py : meth : ` Broker . start_receive ` is ever called on them , and the method
must call : py : meth : ` on_disconect ` if reading produces an empty string .
. . method : : on_transmit ( broker )
Called by : py : class : ` Broker ` when the stream ' s :py:attr:`transmit_side`
has been marked writeable using : py : meth : ` Broker . start_transmit ` and
the broker has detected the associated file descriptor is ready for
writing .
def on_disconnect ( self ) :
""" Close our associated descriptors. """
Subclasses must implement this method if
: py : meth : ` Broker . start_transmit ` is ever called on them .
. . method : : on_shutdown ( broker )
Called by : py : meth : ` Broker . shutdown ` to allow the stream time to
gracefully shutdown . The base implementation simply called
: py : meth : ` on_disconnect ` .
"""
#: A :py:class:`Side` representing the stream's receive file descriptor.
receive_side = None
#: A :py:class:`Side` representing the stream's transmit file descriptor.
transmit_side = None
def on_disconnect ( self , broker ) :
LOG . debug ( ' %r .on_disconnect() ' , self )
self . read_side . close ( )
self . write_side . close ( )
broker . stop_receive ( self )
broker . stop_transmit ( self )
self . receive_side . close ( )
self . transmit_side . close ( )
def on_shutdown ( self ) :
""" Disconnect gracefully. Base implementation calls on_disconnect(). """
def on_shutdown ( self , broker ) :
LOG . debug ( ' %r .on_shutdown() ' , self )
self . on_disconnect ( )
def has_output ( self ) :
return False
self . on_disconnect ( broker )
class Stream ( BasicStream ) :
"""
Initialize a new Stream instance .
: param context : Context to communicate with .
: py : class : ` BasicStream ` subclass implementing econtext ' s :ref:`stream
protocol < stream - protocol > ` .
"""
_input_buf = ' '
_output_buf = ' '
def __init__ ( self , context ) :
self . _context = context
self . _lock = threading . Lock ( )
self . _rhmac = hmac . new ( context . key , digestmod = sha )
self . _whmac = self . _rhmac . copy ( )
@ -285,18 +337,18 @@ class Stream(BasicStream):
unpickler . find_global = self . _find_global
return unpickler . load ( )
def on_receive ( self ):
def on_receive ( self , broker ):
""" Handle the next complete message on the stream. Raise
StreamError or IOError on failure . """
: py : class : ` StreamError ` on failure . """
IOLOG . debug ( ' %r .on_receive() ' , self )
buf = os . read ( self . re ad _side. fd , 4096 )
buf = os . read ( self . re ceive _side. fd , 4096 )
self . _input_buf + = buf
while self . _receive_one ( ) :
pass
if not buf :
return self . on_disconnect ( )
return self . on_disconnect ( broker )
def _receive_one ( self ) :
if len ( self . _input_buf ) < 24 :
@ -340,51 +392,41 @@ class Stream(BasicStream):
except Exception :
LOG . debug ( ' %r ._invoke( %r , %r ): %r crashed ' , self , handle , data , fn )
def on_transmit ( self ):
def on_transmit ( self , broker ):
""" Transmit buffered messages. """
IOLOG . debug ( ' %r .on_transmit() ' , self )
written = os . write ( self . write_side . fd , self . _output_buf [ : 4096 ] )
self . _lock . acquire ( )
try :
self . _output_buf = self . _output_buf [ written : ]
finally :
self . _lock . release ( )
if ( not self . _output_buf ) and not self . _context . broker . graceful_count :
self . on_disconnect ( )
def has_output ( self ) :
return bool ( self . _output_buf )
written = os . write ( self . transmit_side . fd , self . _output_buf [ : 4096 ] )
self . _output_buf = self . _output_buf [ written : ]
if not self . _output_buf :
broker . stop_transmit ( self )
def enqueue ( self , handle , obj ) :
""" Enqueue `obj` to `handle`, and tell the broker we have output. """
IOLOG . debug ( ' %r .enqueue( %r , %r ) ' , self , handle , obj )
def _enqueue ( self , handle , obj ) :
IOLOG . debug ( ' %r ._enqueue( %r , %r ) ' , self , handle , obj )
encoded = cPickle . dumps ( ( handle , obj ) , protocol = 2 )
msg = struct . pack ( ' >L ' , len ( encoded ) ) + encoded
self . _lock . acquire ( )
try :
self . _whmac . update ( msg )
self . _output_buf + = self . _whmac . digest ( ) + msg
finally :
self . _lock . release ( )
self . _context . broker . update_stream ( self )
self . _whmac . update ( msg )
self . _output_buf + = self . _whmac . digest ( ) + msg
self . _context . broker . start_transmit ( self )
def on_disconnect( self ) :
super ( Stream , self ) . on_disconnect ( )
if self . _context . stream is self :
self . _context . on_disconnect( )
def enqueue ( self , handle , obj ) :
""" Enqueue `obj` to `handle`, and tell the broker we have output. May
be called from any thread . """
self . _context . broker. on_thread ( self . _enqueue , handle , obj )
for handle , ( persist , fn ) in self . _context . _handle_map . iteritems ( ) :
LOG . debug ( ' %r .on_disconnect(): killing %r : %r ' , self , handle , fn )
fn ( _DEAD )
def on_disconnect ( self , broker ) :
super ( Stream , self ) . on_disconnect ( broker )
if self . _context . stream is self :
self . _context . on_disconnect ( broker )
def on_shutdown ( self ):
def on_shutdown ( self , broker ):
""" Override BasicStream behaviour of immediately disconnecting. """
LOG . debug ( ' %r .on_shutdown( %r ) ' , self , broker )
def accept ( self , rfd , wfd ) :
self . re ad _side = Side ( self , os . dup ( rfd ) )
self . write _side = Side ( self , os . dup ( wfd ) )
set_cloexec ( self . re ad _side. fd )
set_cloexec ( self . write _side. fd )
self . re ceive _side = Side ( self , os . dup ( rfd ) )
self . transmit _side = Side ( self , os . dup ( wfd ) )
set_cloexec ( self . re ceive _side. fd )
set_cloexec ( self . transmit _side. fd )
self . _context . stream = self
def connect ( self ) :
@ -392,13 +434,13 @@ class Stream(BasicStream):
Context . """
LOG . debug ( ' %r .connect() ' , self )
sock = socket . socket ( socket . AF_INET , socket . SOCK_STREAM )
self . re ad _side = Side ( self , sock . fileno ( ) )
self . write _side = Side ( self , sock . fileno ( ) )
self . re ceive _side = Side ( self , sock . fileno ( ) )
self . transmit _side = Side ( self , sock . fileno ( ) )
sock . connect ( self . _context . parent_addr )
self . enqueue ( 0 , self . _context . name )
def __repr__ ( self ) :
return ' %s ( <context= %r > )' % ( self . __class__ . __name__ , self . _context )
return ' %s ( %r )' % ( self . __class__ . __name__ , self . _context )
class Context ( object ) :
@ -419,13 +461,17 @@ class Context(object):
self . _last_handle = itertools . count ( 1000 )
self . _handle_map = { }
def on_shutdown ( self ) :
""" Slave does nothing, _broker_main() will shutdown its streams. """
def on_shutdown ( self , broker ) :
""" Called during :py:meth:`Broker.shutdown`, informs callbacks
registered with : py : meth : ` add_handle_cb ` the connection is dead . """
for handle , ( persist , fn ) in self . _handle_map . iteritems ( ) :
LOG . debug ( ' %r .on_disconnect(): killing %r : %r ' , self , handle , fn )
fn ( _DEAD )
def on_disconnect ( self ) :
def on_disconnect ( self , broker ):
self . stream = None
LOG . debug ( ' Parent stream is gone, dying. ' )
self . broker . shutdown ( )
broker . shutdown ( )
def alloc_handle ( self ) :
""" Allocate a handle. """
@ -456,7 +502,7 @@ class Context(object):
try :
data = queue . get ( True , deadline )
except Queue . Empty :
self . stream. on_disconnect ( )
self . broker. on_thread ( self . stream . on_disconnect , self . broker )
raise TimeoutError ( ' deadline exceeded. ' )
if data == _DEAD :
@ -476,19 +522,20 @@ class Waker(BasicStream):
rfd , wfd = os . pipe ( )
set_cloexec ( rfd )
set_cloexec ( wfd )
self . re ad _side = Side ( self , rfd )
self . write _side = Side ( self , wfd )
broker . update_stream ( self )
self . re ceive _side = Side ( self , rfd )
self . transmit _side = Side ( self , wfd )
broker . start_receive ( self )
def __repr__ ( self ) :
return ' <Waker> '
def wake ( self ) :
if self . write_side . fd :
os . write ( self . write_side . fd , ' ' )
if threading . currentThread ( ) != self . _broker . _thread and \
self . transmit_side . fd :
os . write ( self . transmit_side . fd , ' ' )
def on_receive ( self ):
os . read ( self . re ad _side. fd , 1 )
def on_receive ( self , broker ):
os . read ( self . re ceive _side. fd , 1 )
class IoLogger ( BasicStream ) :
@ -498,37 +545,35 @@ class IoLogger(BasicStream):
self . _broker = broker
self . _name = name
self . _log = logging . getLogger ( name )
self . _rsock , self . _wsock = socket . socketpair ( )
self . _rsock , self . _wsock = socket . socketpair ( )
os . dup2 ( self . _wsock . fileno ( ) , dest_fd )
set_cloexec ( self . _rsock . fileno ( ) )
set_cloexec ( self . _wsock . fileno ( ) )
self . read_side = Side ( self , self . _rsock . fileno ( ) )
self . write_side = Side ( self , dest_fd )
broker . graceful_count + = 1
self . _broker . update_stream ( self )
self . receive_side = Side ( self , self . _rsock . fileno ( ) , keep_alive = True )
self . transmit_side = Side ( self , dest_fd )
self . _broker . start_receive ( self )
def __repr__ ( self ) :
return ' <IoLogger %s fd %d >' % ( self . _name , self . read_side . fd )
return ' <IoLogger %s >' % ( self . _name , )
def _log_lines ( self ) :
while self . _buf . find ( ' \n ' ) != - 1 :
line , _ , self . _buf = self . _buf . partition ( ' \n ' )
self . _log . info ( ' %s ' , line . rstrip ( ' \n ' ) )
def on_shutdown ( self ):
def on_shutdown ( self , broker ):
LOG . debug ( ' %r .on_shutdown() ' , self )
self . _wsock . shutdown ( socket . SHUT_WR )
self . _wsock . close ( )
self . transmit_side . close ( )
def on_receive ( self ):
def on_receive ( self , broker ):
LOG . debug ( ' %r .on_receive() ' , self )
buf = os . read ( self . re ad _side. fd , 4096 )
buf = os . read ( self . re ceive _side. fd , 4096 )
if not buf :
LOG . debug ( ' %r decrement graceful_count ' , self )
self . _broker . graceful_count - = 1
return self . on_disconnect ( )
return self . on_disconnect ( broker )
self . _buf + = buf
self . _log_lines ( )
@ -536,15 +581,19 @@ class IoLogger(BasicStream):
class Broker ( object ) :
"""
Broker: r esponsible for tracking contexts , associated streams , and I / O
R esponsible for tracking contexts , their associated streams and I / O
multiplexing .
"""
_waker = None
graceful_count = 0
graceful_timeout = 3.0
_thread = None
#: Seconds grace to allow :py:class:`Streams <Stream>` to shutdown
#: gracefully before force-disconnecting them during :py:meth:`shutdown`.
shutdown_timeout = 3.0
def __init__ ( self ) :
self . _alive = True
self . _queue = Queue . Queue ( )
self . _contexts = { }
self . _readers = set ( )
self . _writers = set ( )
@ -553,77 +602,110 @@ class Broker(object):
name = ' econtext-broker ' )
self . _thread . start ( )
def _update_stream ( self , stream ) :
IOLOG . debug ( ' _update_stream( %r ) ' , stream )
if stream . read_side . fd is not None :
self . _readers . add ( stream . read_side )
else :
self . _readers . discard ( stream . read_side )
if stream . write_side . fd is not None and stream . has_output ( ) :
self . _writers . add ( stream . write_side )
def on_thread ( self , func , * args , * * kwargs ) :
if threading . currentThread ( ) == self . _thread :
func ( * args , * * kwargs )
else :
self . _writers . discard ( stream . write_side )
def update_stream ( self , stream ) :
self . _update_stream ( stream )
if self . _waker :
self . _waker . wake ( )
self . _queue . put ( ( func , args , kwargs ) )
if self . _waker :
self . _waker . wake ( )
def start_receive ( self , stream ) :
""" Mark the :py:attr:`receive_side <Stream.receive_side>` on `stream` as
ready for reading . May be called from any thread . When the associated
file descriptor becomes ready for reading ,
: py : meth : ` BasicStream . on_transmit ` will be called . """
IOLOG . debug ( ' %r .start_receive( %r ) ' , self , stream )
self . on_thread ( self . _readers . add , stream . receive_side )
def stop_receive ( self , stream ) :
IOLOG . debug ( ' %r .stop_receive( %r ) ' , self , stream )
self . on_thread ( self . _readers . discard , stream . receive_side )
def start_transmit ( self , stream ) :
IOLOG . debug ( ' %r .start_transmit( %r ) ' , self , stream )
self . on_thread ( self . _writers . add , stream . transmit_side )
def stop_transmit ( self , stream ) :
IOLOG . debug ( ' %r .stop_transmit( %r ) ' , self , stream )
self . on_thread ( self . _writers . discard , stream . transmit_side )
def register ( self , context ) :
""" Put a context under control of this broker. """
""" Register `context` with this broker. Registration simply calls
: py : meth : ` start_receive ` on the context ' s :py:class:`Stream`, and records
a reference to it so that : py : meth : ` Context . on_shutdown ` can be
called during : py : meth : ` shutdown ` . """
LOG . debug ( ' %r .register( %r ) -> r= %r w= %r ' , self , context ,
context . stream . read_side ,
context . stream . write_side )
self . update_stream ( context . stream )
context . stream . re ceive _side,
context . stream . transmit _side)
self . start_receive ( context . stream )
self . _contexts [ context . name ] = context
return context
def _call_and_update ( self , stream , func ) :
def _call ( self , stream , func ) :
try :
func ( )
func ( self )
except Exception :
LOG . exception ( ' %r crashed ' , stream )
stream . on_disconnect ( )
self . _update_stream ( stream )
stream . on_disconnect ( self )
def _loop_once ( self , timeout = None ) :
IOLOG . debug ( ' %r ._loop_once( %r ) ' , self , timeout )
#IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers])
#IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers])
while not self . _queue . empty ( ) :
func , args , kwargs = self . _queue . get ( )
func ( * args , * * kwargs )
#IOLOG.debug('readers = %r', self._readers)
#IOLOG.debug('writers = %r', self._writers)
rsides , wsides , _ = select . select ( self . _readers , self . _writers ,
( ) , timeout )
for side in rsides :
IOLOG . debug ( ' %r : POLLIN for %r ' , self , side . stream )
self . _call_and_update ( side . stream , side . stream . on_receive )
self . _call ( side . stream , side . stream . on_receive )
for side in wsides :
IOLOG . debug ( ' %r : POLLOUT for %r ' , self , side . stream )
self . _call_and_update ( side . stream , side . stream . on_transmit )
self . _call ( side . stream , side . stream . on_transmit )
def keep_alive ( self ) :
""" Return ``True`` if any reader ' s :py:attr:`Side.keep_alive`
attribute is ` ` True ` ` , or any : py : class : ` Context ` is still registered
that is not the master . Used to delay shutdown while some important
work is in progress ( e . g . log draining ) . """
return any ( c . stream and c . name != ' master '
for c in self . _contexts . itervalues ( ) ) or \
any ( side . keep_alive for side in self . _readers )
def _broker_main ( self ) :
""" Handle events until shutdown(). """
""" Handle events until :py:meth:`shutdown`. On shutdown, invoke
: py : meth : ` Stream . on_shutdown ` for every active stream , then allow up to
: py : attr : ` shutdown_timeout ` seconds for the streams to unregister
themselves before forcefully calling
: py : meth : ` Stream . on_disconnect ` . """
try :
while self . _alive :
self . _loop_once ( )
for side in self . _readers | self . _writers :
self . _call_and_update ( side . stream , side . stream . on_shutdown )
self . _call ( side . stream , side . stream . on_shutdown )
deadline = time . time ( ) + self . shutdown_timeout
while self . keep_alive ( ) and time . time ( ) < deadline :
self . _loop_once ( max ( 0 , deadline - time . time ( ) ) )
deadline = time . time ( ) + self . graceful_timeout
while ( ( self . _readers or self . _writers ) and
( self . graceful_count or time . time ( ) < deadline ) ) :
self . _loop_once ( 1.0 )
if self . keep_alive ( ) :
LOG . error ( ' %r : some streams did not close gracefully. '
' The most likely cause for this is one or '
' more child processes still connected to '
' ou stdout/stderr pipes. ' , self )
for context in self . _contexts . itervalues ( ) :
stream = context . stream
if stream :
stream . on_disconnect ( )
self . _update_stream ( stream )
context . on_shutdown ( self )
for side in self . _readers | self . _writers :
LOG . error ( ' _broker_main() force disconnecting %r ' , side )
side . stream . on_disconnect ( )
side . stream . on_disconnect ( self )
except Exception :
LOG . exception ( ' _broker_main() crashed ' )
@ -634,7 +716,8 @@ class Broker(object):
self . _waker . wake ( )
def join ( self ) :
""" Wait for the broker to stop. """
""" Wait for the broker to stop, expected to be called after
: py : meth : ` shutdown ` . """
self . _thread . join ( )
def __repr__ ( self ) :