@ -544,23 +544,12 @@ class ClassicWorkerModel(WorkerModel):
"""
"""
Used to clean up in unit tests .
Used to clean up in unit tests .
"""
"""
# TODO: split this up a bit.
self . on_binding_close ( )
global _classic_worker_model
self . _on_process_exit ( )
assert self . parent_sock is not None
set_worker_model ( None )
self . parent_sock . close ( )
self . parent_sock = None
self . listener_path = None
self . router = None
self . parent = None
for mux in self . _muxes :
pid , status = os . waitpid ( mux . pid , 0 )
status = mitogen . fork . _convert_exit_status ( status )
LOG . debug ( ' mux PID %d %s ' , pid ,
mitogen . parent . returncode_to_str ( status ) )
global _classic_worker_model
_classic_worker_model = None
_classic_worker_model = None
set_worker_model ( None )
def on_strategy_start ( self ) :
def on_strategy_start ( self ) :
"""
"""
@ -593,6 +582,7 @@ class ClassicWorkerModel(WorkerModel):
self . broker . join ( )
self . broker . join ( )
self . router = None
self . router = None
self . broker = None
self . broker = None
self . parent = None
self . listener_path = None
self . listener_path = None
# #420: Ansible executes "meta" actions in the top-level process,
# #420: Ansible executes "meta" actions in the top-level process,
@ -708,8 +698,8 @@ class MuxProcess(object):
max_message_size = 4096 * 1048576 ,
max_message_size = 4096 * 1048576 ,
)
)
_setup_responder ( self . router . responder )
_setup_responder ( self . router . responder )
mitogen . core . listen ( self . broker , ' shutdown ' , self . on_broker_shutdown)
mitogen . core . listen ( self . broker , ' shutdown ' , self . _ on_broker_shutdown)
mitogen . core . listen ( self . broker , ' exit ' , self . on_broker_exit)
mitogen . core . listen ( self . broker , ' exit ' , self . _ on_broker_exit)
self . listener = mitogen . unix . Listener . build_stream (
self . listener = mitogen . unix . Listener . build_stream (
router = self . router ,
router = self . router ,
path = self . path ,
path = self . path ,
@ -729,26 +719,20 @@ class MuxProcess(object):
)
)
setup_pool ( self . pool )
setup_pool ( self . pool )
def on_broker_shutdown( self ) :
def _ on_broker_shutdown( self ) :
"""
"""
Respond to broker shutdown by beginning service pool shutdown . Do no t
Respond to broker shutdown by shutting down the pool . Do not join on i t
join on the pool yet, since that would block the broker thread which
yet, since that would block the broker thread which then cannot clean
then cannot clean up pending handlers , which is required for the
up pending handlers and connections , which is required for the threads
t hreads t o exit gracefully .
t o exit gracefully .
"""
"""
# In normal operation we presently kill the process because there is
self . pool . stop ( join = False )
# not yet any way to cancel connect().
self . pool . stop ( join = self . profiling )
def on_broker_exit( self ) :
def _on_broker_exit ( self ) :
"""
"""
Respond to the broker thread about to exit by sending SIGTERM to
Respond to the broker thread about to exit by finally joining on the
ourself . In future this should gracefully join the pool , but TERM is
pool . This is safe since pools only block in connection attempts , and
fine for now .
connection attempts fail with CancelledError when broker shutdown
begins .
"""
"""
if not os . environ . get ( ' MITOGEN_PROFILING ' ) :
self . pool . join ( )
# In normal operation we presently kill the process because there is
# not yet any way to cancel connect(). When profiling, threads
# including the broker must shut down gracefully, otherwise pstats
# won't be written.
os . kill ( os . getpid ( ) , signal . SIGTERM )