@ -120,9 +120,9 @@ def debug_closure(func):
for result in results :
for result in results :
task = result . _task
task = result . _task
host = result . _host
host = result . _host
_queue _task_args = self . _queue _task_args. pop ( ' %s %s ' % ( host . name , task . _uuid ) )
_queue d _task_args = self . _queue d_task_cache. pop ( ( host . name , task . _uuid ) , None )
task_vars = _queue _task_args[ ' task_vars ' ]
task_vars = _queue d _task_args[ ' task_vars ' ]
play_context = _queue _task_args[ ' play_context ' ]
play_context = _queue d _task_args[ ' play_context ' ]
# Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state
# Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state
try :
try :
prev_host_state = prev_host_states [ host . name ]
prev_host_state = prev_host_states [ host . name ]
@ -179,7 +179,11 @@ class StrategyBase:
self . _final_q = tqm . _final_q
self . _final_q = tqm . _final_q
self . _step = getattr ( tqm . _options , ' step ' , False )
self . _step = getattr ( tqm . _options , ' step ' , False )
self . _diff = getattr ( tqm . _options , ' diff ' , False )
self . _diff = getattr ( tqm . _options , ' diff ' , False )
self . _queue_task_args = { }
# the task cache is a dictionary of tuples of (host.name, task._uuid)
# used to find the original task object of in-flight tasks and to store
# the task args/vars and play context info used to queue the task.
self . _queued_task_cache = { }
# Backwards compat: self._display isn't really needed, just import the global display and use that.
# Backwards compat: self._display isn't really needed, just import the global display and use that.
self . _display = display
self . _display = display
@ -270,13 +274,6 @@ class StrategyBase:
def _queue_task ( self , host , task , task_vars , play_context ) :
def _queue_task ( self , host , task , task_vars , play_context ) :
''' handles queueing the task up to be sent to a worker '''
''' handles queueing the task up to be sent to a worker '''
self . _queue_task_args [ ' %s %s ' % ( host . name , task . _uuid ) ] = {
' host ' : host ,
' task ' : task ,
' task_vars ' : task_vars ,
' play_context ' : play_context
}
display . debug ( " entering _queue_task() for %s / %s " % ( host . name , task . action ) )
display . debug ( " entering _queue_task() for %s / %s " % ( host . name , task . action ) )
# Add a write lock for tasks.
# Add a write lock for tasks.
@ -306,6 +303,13 @@ class StrategyBase:
while True :
while True :
( worker_prc , rslt_q ) = self . _workers [ self . _cur_worker ]
( worker_prc , rslt_q ) = self . _workers [ self . _cur_worker ]
if worker_prc is None or not worker_prc . is_alive ( ) :
if worker_prc is None or not worker_prc . is_alive ( ) :
self . _queued_task_cache [ ( host . name , task . _uuid ) ] = {
' host ' : host ,
' task ' : task ,
' task_vars ' : task_vars ,
' play_context ' : play_context
}
worker_prc = WorkerProcess ( self . _final_q , task_vars , host , task , play_context , self . _loader , self . _variable_manager , shared_loader_obj )
worker_prc = WorkerProcess ( self . _final_q , task_vars , host , task , play_context , self . _loader , self . _variable_manager , shared_loader_obj )
self . _workers [ self . _cur_worker ] [ 0 ] = worker_prc
self . _workers [ self . _cur_worker ] [ 0 ] = worker_prc
worker_prc . start ( )
worker_prc . start ( )
@ -425,7 +429,8 @@ class StrategyBase:
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
original_host = get_original_host ( task_result . _host )
original_host = get_original_host ( task_result . _host )
found_task = iterator . get_original_task ( original_host , task_result . _task )
queue_cache_entry = ( original_host . name , task_result . _task )
found_task = self . _queued_task_cache . get ( queue_cache_entry ) [ ' task ' ]
original_task = found_task . copy ( exclude_parent = True , exclude_tasks = True )
original_task = found_task . copy ( exclude_parent = True , exclude_tasks = True )
original_task . _parent = found_task . _parent
original_task . _parent = found_task . _parent
original_task . from_attrs ( task_result . _task_fields )
original_task . from_attrs ( task_result . _task_fields )
@ -854,8 +859,6 @@ class StrategyBase:
host_results = [ ]
host_results = [ ]
for host in notified_hosts :
for host in notified_hosts :
if not handler . has_triggered ( host ) and ( not iterator . is_failed ( host ) or play_context . force_handlers ) :
if not handler . has_triggered ( host ) and ( not iterator . is_failed ( host ) or play_context . force_handlers ) :
if handler . _uuid not in iterator . _task_uuid_cache :
iterator . _task_uuid_cache [ handler . _uuid ] = handler
task_vars = self . _variable_manager . get_vars ( play = iterator . _play , host = host , task = handler )
task_vars = self . _variable_manager . get_vars ( play = iterator . _play , host = host , task = handler )
self . add_tqm_variables ( task_vars , play = iterator . _play )
self . add_tqm_variables ( task_vars , play = iterator . _play )
self . _queue_task ( host , handler , task_vars , play_context )
self . _queue_task ( host , handler , task_vars , play_context )