@ -55,6 +55,9 @@ LOG = logging.getLogger('mitogen')
IOLOG = logging . getLogger ( ' mitogen.io ' )
IOLOG . setLevel ( logging . INFO )
_v = False
_vv = False
GET_MODULE = 100
CALL_FUNCTION = 101
FORWARD_LOG = 102
@ -174,7 +177,7 @@ def io_op(func, *args):
try :
return func ( * args ) , False
except OSError , e :
IOLOG. debug ( ' io_op( %r ) -> OSError: %s ' , func , e )
_vv and IOLOG. debug ( ' io_op( %r ) -> OSError: %s ' , func , e )
if e . errno not in ( errno . EIO , errno . ECONNRESET , errno . EPIPE ) :
raise
return None , True
@ -264,7 +267,7 @@ class Message(object):
def unpickle ( self , throw = True ) :
""" Deserialize `data` into an object. """
IOLOG. debug ( ' %r .unpickle() ' , self )
_vv and IOLOG. debug ( ' %r .unpickle() ' , self )
fp = cStringIO . StringIO ( self . data )
unpickler = cPickle . Unpickler ( fp )
unpickler . find_global = self . _find_global
@ -300,7 +303,7 @@ class Sender(object):
def close ( self ) :
""" Indicate this channel is closed to the remote side. """
IOLOG. debug ( ' %r .close() ' , self )
_vv and IOLOG. debug ( ' %r .close() ' , self )
self . context . send (
Message . pickled (
_DEAD ,
@ -310,7 +313,7 @@ class Sender(object):
def put ( self , data ) :
""" Send `data` to the remote. """
IOLOG. debug ( ' %r .put( %r ..) ' , self , data [ : 100 ] )
_vv and IOLOG. debug ( ' %r .put( %r ..) ' , self , data [ : 100 ] )
self . context . send (
Message . pickled (
data ,
@ -335,7 +338,7 @@ class Receiver(object):
def _on_receive ( self , msg ) :
""" Callback from the Stream; appends data to the internal queue. """
IOLOG. debug ( ' %r ._on_receive( %r ) ' , self , msg )
_vv and IOLOG. debug ( ' %r ._on_receive( %r ) ' , self , msg )
self . _latch . put ( msg )
if self . notify :
self . notify ( self )
@ -347,7 +350,7 @@ class Receiver(object):
return self . _latch . empty ( )
def get ( self , timeout = None , block = True ) :
IOLOG. debug ( ' %r .get(timeout= %r , block= %r ) ' , self , timeout , block )
_vv and IOLOG. debug ( ' %r .get(timeout= %r , block= %r ) ' , self , timeout , block )
msg = self . _latch . get ( timeout = timeout , block = block )
#IOLOG.debug('%r.get() got %r', self, msg)
@ -419,22 +422,22 @@ class Importer(object):
fullname = fullname . rstrip ( ' . ' )
try :
pkgname , _ , _ = fullname . rpartition ( ' . ' )
LOG. debug ( ' %r .find_module( %r ) ' , self , fullname )
_v and LOG. debug ( ' %r .find_module( %r ) ' , self , fullname )
if fullname not in self . _present . get ( pkgname , ( fullname , ) ) :
LOG. debug ( ' %r : master doesn \' t know %r ' , self , fullname )
_v and LOG. debug ( ' %r : master doesn \' t know %r ' , self , fullname )
return None
pkg = sys . modules . get ( pkgname )
if pkg and getattr ( pkg , ' __loader__ ' , None ) is not self :
LOG. debug ( ' %r : %r is submodule of a package we did not load ' ,
_v and LOG. debug ( ' %r : %r is submodule of a package we did not load ' ,
self , fullname )
return None
try :
__import__ ( fullname , { } , { } , [ ' ' ] )
LOG. debug ( ' %r : %r is available locally ' , self , fullname )
_v and LOG. debug ( ' %r : %r is available locally ' , self , fullname )
except ImportError :
LOG. debug ( ' find_module( %r ) returning self ' , fullname )
_v and LOG. debug ( ' find_module( %r ) returning self ' , fullname )
return self
finally :
del _tls . running
@ -470,7 +473,7 @@ class Importer(object):
def _on_load_module ( self , msg ) :
tup = msg . unpickle ( )
fullname = tup [ 0 ]
LOG. debug ( ' Importer._on_load_module( %r ) ' , fullname )
_v and LOG. debug ( ' Importer._on_load_module( %r ) ' , fullname )
self . _lock . acquire ( )
try :
@ -489,10 +492,10 @@ class Importer(object):
if not present :
funcs = self . _callbacks . get ( fullname )
if funcs is not None :
LOG. debug ( ' _request_module( %r ): in flight ' , fullname )
_v and LOG. debug ( ' _request_module( %r ): in flight ' , fullname )
funcs . append ( callback )
else :
LOG. debug ( ' _request_module( %r ): new request ' , fullname )
_v and LOG. debug ( ' _request_module( %r ): new request ' , fullname )
self . _callbacks [ fullname ] = [ callback ]
self . _context . send ( Message ( data = fullname , handle = GET_MODULE ) )
finally :
@ -502,7 +505,7 @@ class Importer(object):
callback ( )
def load_module ( self , fullname ) :
LOG. debug ( ' Importer.load_module( %r ) ' , fullname )
_v and LOG. debug ( ' Importer.load_module( %r ) ' , fullname )
self . _load_module_hacks ( fullname )
event = threading . Event ( )
@ -573,7 +576,7 @@ class Side(object):
def close ( self ) :
if self . fd is not None :
IOLOG. debug ( ' %r .close() ' , self )
_vv and IOLOG. debug ( ' %r .close() ' , self )
os . close ( self . fd )
self . fd = None
@ -608,7 +611,7 @@ class BasicStream(object):
fire ( self , ' disconnect ' )
def on_shutdown ( self , broker ) :
LOG. debug ( ' %r .on_shutdown() ' , self )
_v and LOG. debug ( ' %r .on_shutdown() ' , self )
fire ( self , ' shutdown ' )
self . on_disconnect ( broker )
@ -638,7 +641,7 @@ class Stream(BasicStream):
def on_receive ( self , broker ) :
""" Handle the next complete message on the stream. Raise
: py : class : ` StreamError ` on failure . """
IOLOG. debug ( ' %r .on_receive() ' , self )
_vv and IOLOG. debug ( ' %r .on_receive() ' , self )
buf = self . receive_side . read ( )
if buf is None :
@ -669,7 +672,7 @@ class Stream(BasicStream):
)
if ( len ( self . _input_buf ) - self . HEADER_LEN ) < msg_len :
IOLOG. debug ( ' %r : Input too short (want %d , got %d ) ' ,
_vv and IOLOG. debug ( ' %r : Input too short (want %d , got %d ) ' ,
self , msg_len , len ( self . _input_buf ) - self . HEADER_LEN )
return False
@ -680,25 +683,25 @@ class Stream(BasicStream):
def on_transmit ( self , broker ) :
""" Transmit buffered messages. """
IOLOG. debug ( ' %r .on_transmit() ' , self )
_vv and IOLOG. debug ( ' %r .on_transmit() ' , self )
if self . _output_buf :
buf = self . _output_buf . popleft ( )
written = self . transmit_side . write ( buf )
if not written :
LOG. debug ( ' %r .on_transmit(): disconnection detected ' , self )
_v and LOG. debug ( ' %r .on_transmit(): disconnection detected ' , self )
self . on_disconnect ( broker )
return
elif written != len ( buf ) :
self . _output_buf . appendleft ( buf [ written : ] )
IOLOG. debug ( ' %r .on_transmit() -> len %d ' , self , written )
_vv and IOLOG. debug ( ' %r .on_transmit() -> len %d ' , self , written )
if not self . _output_buf :
broker . stop_transmit ( self )
def _send ( self , msg ) :
IOLOG. debug ( ' %r ._send( %r ) ' , self , msg )
_vv and IOLOG. debug ( ' %r ._send( %r ) ' , self , msg )
pkt = struct . pack ( ' >hhhLLL ' , msg . dst_id , msg . src_id , msg . auth_id ,
msg . handle , msg . reply_to or 0 , len ( msg . data )
) + msg . data
@ -716,7 +719,7 @@ class Stream(BasicStream):
def on_shutdown ( self , broker ) :
""" Override BasicStream behaviour of immediately disconnecting. """
LOG. debug ( ' %r .on_shutdown( %r ) ' , self , broker )
_v and LOG. debug ( ' %r .on_shutdown( %r ) ' , self , broker )
def accept ( self , rfd , wfd ) :
# TODO: what is this os.dup for?
@ -742,7 +745,7 @@ class Context(object):
return _unpickle_context , ( self . context_id , self . name )
def on_disconnect ( self , broker ) :
LOG. debug ( ' Parent stream is gone, dying. ' )
_v and LOG. debug ( ' Parent stream is gone, dying. ' )
fire ( self , ' disconnect ' )
broker . shutdown ( )
@ -762,7 +765,7 @@ class Context(object):
receiver = Receiver ( self . router , persist = persist , respondent = self )
msg . reply_to = receiver . handle
LOG. debug ( ' %r .send_async( %r ) ' , self , msg )
_v and LOG. debug ( ' %r .send_async( %r ) ' , self , msg )
self . send ( msg )
return receiver
@ -771,7 +774,7 @@ class Context(object):
receiver = self . send_async ( msg )
response = receiver . get ( deadline )
data = response . unpickle ( )
IOLOG. debug ( ' %r ._send_await() -> %r ' , self , data )
_vv and IOLOG. debug ( ' %r ._send_await() -> %r ' , self , data )
return data
def __repr__ ( self ) :
@ -829,7 +832,7 @@ class Latch(object):
self . lock . release ( )
def put ( self , obj ) :
IOLOG. debug ( ' %r .put( %r ) ' , self , obj )
_vv and IOLOG. debug ( ' %r .put( %r ) ' , self , obj )
self . lock . acquire ( )
try :
self . queue . append ( obj )
@ -838,7 +841,7 @@ class Latch(object):
self . _wake ( self . wake_socks . pop ( 0 ) )
finally :
self . lock . release ( )
LOG. debug ( ' put() done. woken? %s ' , woken )
_v and LOG. debug ( ' put() done. woken? %s ' , woken )
def _wake ( self , sock ) :
try :
@ -878,7 +881,7 @@ class Waker(BasicStream):
Write a byte to the self - pipe , causing the IO multiplexer to wake up .
Nothing is written if the current thread is the IO multiplexer thread .
"""
IOLOG. debug ( ' %r .wake() [fd= %r ] ' , self , self . transmit_side . fd )
_vv and IOLOG. debug ( ' %r .wake() [fd= %r ] ' , self , self . transmit_side . fd )
if threading . currentThread ( ) != self . _broker . _thread :
try :
self . transmit_side . write ( ' ' )
@ -918,13 +921,13 @@ class IoLogger(BasicStream):
def on_shutdown ( self , broker ) :
""" Shut down the write end of the logging socket. """
LOG. debug ( ' %r .on_shutdown() ' , self )
_v and LOG. debug ( ' %r .on_shutdown() ' , self )
self . _wsock . shutdown ( socket . SHUT_WR )
self . _wsock . close ( )
self . transmit_side . close ( )
def on_receive ( self , broker ) :
IOLOG. debug ( ' %r .on_receive() ' , self )
_vv and IOLOG. debug ( ' %r .on_receive() ' , self )
buf = os . read ( self . receive_side . fd , CHUNK_SIZE )
if not buf :
return self . on_disconnect ( broker )
@ -940,6 +943,11 @@ class Router(object):
self . broker = broker
listen ( broker , ' shutdown ' , self . on_broker_shutdown )
# Here seems as good a place as any.
global _v , _vv
_v = logging . getLogger ( ) . level < = logging . DEBUG
_vv = IOLOG . level < = logging . DEBUG
#: context ID -> Stream
self . _stream_by_id = { }
#: List of contexts to notify of shutdown.
@ -970,7 +978,7 @@ class Router(object):
context . on_shutdown ( self . broker )
def add_route ( self , target_id , via_id ) :
LOG. debug ( ' %r .add_route( %r , %r ) ' , self , target_id , via_id )
_v and LOG. debug ( ' %r .add_route( %r , %r ) ' , self , target_id , via_id )
try :
self . _stream_by_id [ target_id ] = self . _stream_by_id [ via_id ]
except KeyError :
@ -983,14 +991,14 @@ class Router(object):
self . add_route ( target_id , via_id )
def register ( self , context , stream ) :
LOG. debug ( ' register( %r , %r ) ' , context , stream )
_v and LOG. debug ( ' register( %r , %r ) ' , context , stream )
self . _stream_by_id [ context . context_id ] = stream
self . _context_by_id [ context . context_id ] = context
self . broker . start_receive ( stream )
def add_handler ( self , fn , handle = None , persist = True , respondent = None ) :
handle = handle or self . _last_handle . next ( )
IOLOG. debug ( ' %r .add_handler( %r , %r , %r ) ' , self , fn , handle , persist )
_vv and IOLOG. debug ( ' %r .add_handler( %r , %r , %r ) ' , self , fn , handle , persist )
self . _handle_map [ handle ] = persist , fn
if respondent :
@ -1005,10 +1013,10 @@ class Router(object):
def on_shutdown ( self , broker ) :
""" Called during :py:meth:`Broker.shutdown`, informs callbacks
registered with : py : meth : ` add_handle_cb ` the connection is dead . """
LOG. debug ( ' %r .on_shutdown( %r ) ' , self , broker )
_v and LOG. debug ( ' %r .on_shutdown( %r ) ' , self , broker )
fire ( self , ' shutdown ' )
for handle , ( persist , fn ) in self . _handle_map . iteritems ( ) :
LOG. debug ( ' %r .on_shutdown(): killing %r : %r ' , self , handle , fn )
_v and LOG. debug ( ' %r .on_shutdown(): killing %r : %r ' , self , handle , fn )
fn ( _DEAD )
def _invoke ( self , msg ) :
@ -1028,7 +1036,7 @@ class Router(object):
LOG . exception ( ' %r ._invoke( %r ): %r crashed ' , self , msg , fn )
def _async_route ( self , msg , stream = None ) :
IOLOG. debug ( ' %r ._async_route( %r , %r ) ' , self , msg , stream )
_vv and IOLOG. debug ( ' %r ._async_route( %r , %r ) ' , self , msg , stream )
# Perform source verification.
if stream is not None :
expected_stream = self . _stream_by_id . get ( msg . auth_id ,
@ -1094,7 +1102,7 @@ class Broker(object):
lst . append ( value )
def start_receive ( self , stream ) :
IOLOG. debug ( ' %r .start_receive( %r ) ' , self , stream )
_vv and IOLOG. debug ( ' %r .start_receive( %r ) ' , self , stream )
assert stream . receive_side and stream . receive_side . fd is not None
self . defer ( self . _list_add , self . _readers , stream . receive_side )
@ -1129,7 +1137,7 @@ class Broker(object):
self . shutdown ( )
def _loop_once ( self , timeout = None ) :
IOLOG. debug ( ' %r ._loop_once( %r ) ' , self , timeout )
_vv and IOLOG. debug ( ' %r ._loop_once( %r ) ' , self , timeout )
self . _run_defer ( )
#IOLOG.debug('readers = %r', self._readers)
@ -1137,11 +1145,11 @@ class Broker(object):
rsides , wsides , _ = select . select ( self . _readers , self . _writers ,
( ) , timeout )
for side in rsides :
IOLOG. debug ( ' %r : POLLIN for %r ' , self , side )
_vv and IOLOG. debug ( ' %r : POLLIN for %r ' , self , side )
self . _call ( side . stream , side . stream . on_receive )
for side in wsides :
IOLOG. debug ( ' %r : POLLOUT for %r ' , self , side )
_vv and IOLOG. debug ( ' %r : POLLOUT for %r ' , self , side )
self . _call ( side . stream , side . stream . on_transmit )
def keep_alive ( self ) :
@ -1178,7 +1186,7 @@ class Broker(object):
fire ( self , ' exit ' )
def shutdown ( self ) :
LOG. debug ( ' %r .shutdown() ' , self )
_v and LOG. debug ( ' %r .shutdown() ' , self )
self . _alive = False
self . _waker . wake ( )
@ -1198,7 +1206,7 @@ class ExternalContext(object):
os . kill ( os . getpid ( ) , signal . SIGTERM )
def _on_shutdown_msg ( self , msg ) :
LOG. debug ( ' _on_shutdown_msg( %r ) ' , msg )
_v and LOG. debug ( ' _on_shutdown_msg( %r ) ' , msg )
if msg . src_id != mitogen . parent_id :
LOG . warning ( ' Ignoring SHUTDOWN from non-parent: %r ' , msg )
return
@ -1286,7 +1294,7 @@ class ExternalContext(object):
def _dispatch_calls ( self ) :
for msg in self . channel :
data = msg . unpickle ( throw = False )
LOG. debug ( ' _dispatch_calls( %r ) ' , data )
_v and LOG. debug ( ' _dispatch_calls( %r ) ' , data )
if msg . auth_id not in mitogen . parent_ids :
LOG . warning ( ' CALL_FUNCTION from non-parent %r ' , msg . auth_id )
@ -1302,7 +1310,7 @@ class ExternalContext(object):
kwargs . setdefault ( ' router ' , self . router )
msg . reply ( fn ( * args , * * kwargs ) )
except Exception , e :
LOG. debug ( ' _dispatch_calls: %s ' , e )
_v and LOG. debug ( ' _dispatch_calls: %s ' , e )
msg . reply ( CallError ( e ) )
self . dispatch_stopped = True
@ -1320,12 +1328,12 @@ class ExternalContext(object):
self . router . register ( self . parent , self . stream )
sys . executable = os . environ . pop ( ' ARGV0 ' , sys . executable )
LOG. debug ( ' Connected to %s ; my ID is %r , PID is %r ' ,
_v and LOG. debug ( ' Connected to %s ; my ID is %r , PID is %r ' ,
self . parent , context_id , os . getpid ( ) )
LOG. debug ( ' Recovered sys.executable: %r ' , sys . executable )
_v and LOG. debug ( ' Recovered sys.executable: %r ' , sys . executable )
_profile_hook ( ' main ' , self . _dispatch_calls )
LOG. debug ( ' ExternalContext.main() normal exit ' )
_v and LOG. debug ( ' ExternalContext.main() normal exit ' )
except BaseException :
LOG . exception ( ' ExternalContext.main() crashed ' )
raise