@ -10,6 +10,7 @@ import itertools
import logging
import logging
import os
import os
import select
import select
import signal
import socket
import socket
import struct
import struct
import sys
import sys
@ -28,6 +29,7 @@ CALL_FUNCTION = 101
FORWARD_LOG = 102
FORWARD_LOG = 102
ADD_ROUTE = 103
ADD_ROUTE = 103
ALLOCATE_ID = 104
ALLOCATE_ID = 104
SHUTDOWN = 105
CHUNK_SIZE = 16384
CHUNK_SIZE = 16384
@ -1071,7 +1073,8 @@ class Broker(object):
attribute is ` ` True ` ` , or any : py : class : ` Context ` is still registered
attribute is ` ` True ` ` , or any : py : class : ` Context ` is still registered
that is not the master . Used to delay shutdown while some important
that is not the master . Used to delay shutdown while some important
work is in progress ( e . g . log draining ) . """
work is in progress ( e . g . log draining ) . """
return sum ( ( side . keep_alive for side in self . _readers ) , 0 )
return ( sum ( ( side . keep_alive for side in self . _readers ) , 0 ) +
( not self . _queue . empty ( ) ) )
def _broker_main ( self ) :
def _broker_main ( self ) :
""" Handle events until :py:meth:`shutdown`. On shutdown, invoke
""" Handle events until :py:meth:`shutdown`. On shutdown, invoke
@ -1104,6 +1107,8 @@ class Broker(object):
except Exception :
except Exception :
LOG . exception ( ' _broker_main() crashed ' )
LOG . exception ( ' _broker_main() crashed ' )
fire ( self , ' exit ' )
def shutdown ( self ) :
def shutdown ( self ) :
""" Request broker gracefully disconnect streams and stop. """
""" Request broker gracefully disconnect streams and stop. """
LOG . debug ( ' %r .shutdown() ' , self )
LOG . debug ( ' %r .shutdown() ' , self )
@ -1123,11 +1128,22 @@ class ExternalContext(object):
def _on_broker_shutdown ( self ) :
def _on_broker_shutdown ( self ) :
self . channel . close ( )
self . channel . close ( )
def _on_broker_exit ( self ) :
os . kill ( os . getpid ( ) , signal . SIGTERM )
def _on_shutdown_msg ( self , msg ) :
LOG . debug ( ' _on_shutdown_msg( %r ) ' , msg )
if msg . src_id != mitogen . parent_id :
LOG . warning ( ' Ignoring SHUTDOWN from non-parent: %r ' , msg )
return
self . broker . shutdown ( )
def _setup_master ( self , profiling , parent_id , context_id , in_fd , out_fd ) :
def _setup_master ( self , profiling , parent_id , context_id , in_fd , out_fd ) :
if profiling :
if profiling :
enable_profiling ( )
enable_profiling ( )
self . broker = Broker ( )
self . broker = Broker ( )
self . router = Router ( self . broker )
self . router = Router ( self . broker )
self . router . add_handler ( self . _on_shutdown_msg , SHUTDOWN )
self . master = Context ( self . router , 0 , ' master ' )
self . master = Context ( self . router , 0 , ' master ' )
if parent_id == 0 :
if parent_id == 0 :
self . parent = self . master
self . parent = self . master
@ -1141,6 +1157,7 @@ class ExternalContext(object):
self . stream . receive_side . keep_alive = False
self . stream . receive_side . keep_alive = False
listen ( self . broker , ' shutdown ' , self . _on_broker_shutdown )
listen ( self . broker , ' shutdown ' , self . _on_broker_shutdown )
listen ( self . broker , ' exit ' , self . _on_broker_exit )
os . close ( in_fd )
os . close ( in_fd )
try :
try :
@ -1224,6 +1241,7 @@ class ExternalContext(object):
self . router . route (
self . router . route (
Message . pickled ( e , dst_id = msg . src_id , handle = msg . reply_to )
Message . pickled ( e , dst_id = msg . src_id , handle = msg . reply_to )
)
)
self . dispatch_stopped = True
def main ( self , parent_ids , context_id , debug , profiling , log_level ,
def main ( self , parent_ids , context_id , debug , profiling , log_level ,
in_fd = 100 , out_fd = 1 , core_src_fd = 101 , setup_stdio = True ) :
in_fd = 100 , out_fd = 1 , core_src_fd = 101 , setup_stdio = True ) :