@ -42,6 +42,7 @@ from ansible.module_utils.six.moves import queue as Queue
from ansible . module_utils . six import iteritems , itervalues , string_types
from ansible . module_utils . six import iteritems , itervalues , string_types
from ansible . module_utils . _text import to_text
from ansible . module_utils . _text import to_text
from ansible . module_utils . connection import Connection , ConnectionError
from ansible . module_utils . connection import Connection , ConnectionError
from ansible . playbook . handler import Handler
from ansible . playbook . helpers import load_list_of_blocks
from ansible . playbook . helpers import load_list_of_blocks
from ansible . playbook . included_file import IncludedFile
from ansible . playbook . included_file import IncludedFile
from ansible . playbook . task_include import TaskInclude
from ansible . playbook . task_include import TaskInclude
@ -85,7 +86,13 @@ def results_thread_main(strategy):
break
break
else :
else :
strategy . _results_lock . acquire ( )
strategy . _results_lock . acquire ( )
strategy . _results . append ( result )
# only handlers have the listen attr, so this must be a handler
# we split up the results into two queues here to make sure
# handler and regular result processing don't cross wires
if ' listen ' in result . _task_fields :
strategy . _handler_results . append ( result )
else :
strategy . _results . append ( result )
strategy . _results_lock . release ( )
strategy . _results_lock . release ( )
except ( IOError , EOFError ) :
except ( IOError , EOFError ) :
break
break
@ -96,7 +103,7 @@ def results_thread_main(strategy):
def debug_closure ( func ) :
def debug_closure ( func ) :
""" Closure to wrap ``StrategyBase._process_pending_results`` and invoke the task debugger """
""" Closure to wrap ``StrategyBase._process_pending_results`` and invoke the task debugger """
@functools.wraps ( func )
@functools.wraps ( func )
def inner ( self , iterator , one_pass = False , max_passes = None ):
def inner ( self , iterator , one_pass = False , max_passes = None , do_handlers = False ):
status_to_stats_map = (
status_to_stats_map = (
( ' is_failed ' , ' failures ' ) ,
( ' is_failed ' , ' failures ' ) ,
( ' is_unreachable ' , ' dark ' ) ,
( ' is_unreachable ' , ' dark ' ) ,
@ -107,7 +114,7 @@ def debug_closure(func):
# We don't know the host yet, copy the previous states, for lookup after we process new results
# We don't know the host yet, copy the previous states, for lookup after we process new results
prev_host_states = iterator . _host_states . copy ( )
prev_host_states = iterator . _host_states . copy ( )
results = func ( self , iterator , one_pass = one_pass , max_passes = max_passes )
results = func ( self , iterator , one_pass = one_pass , max_passes = max_passes , do_handlers = do_handlers )
_processed_results = [ ]
_processed_results = [ ]
for result in results :
for result in results :
@ -187,6 +194,7 @@ class StrategyBase:
# internal counters
# internal counters
self . _pending_results = 0
self . _pending_results = 0
self . _pending_handler_results = 0
self . _cur_worker = 0
self . _cur_worker = 0
# this dictionary is used to keep track of hosts that have
# this dictionary is used to keep track of hosts that have
@ -198,6 +206,7 @@ class StrategyBase:
self . _flushed_hosts = dict ( )
self . _flushed_hosts = dict ( )
self . _results = deque ( )
self . _results = deque ( )
self . _handler_results = deque ( )
self . _results_lock = threading . Condition ( threading . Lock ( ) )
self . _results_lock = threading . Condition ( threading . Lock ( ) )
# create the result processing thread for reading results in the background
# create the result processing thread for reading results in the background
@ -377,7 +386,10 @@ class StrategyBase:
elif self . _cur_worker == starting_worker :
elif self . _cur_worker == starting_worker :
time . sleep ( 0.0001 )
time . sleep ( 0.0001 )
self . _pending_results + = 1
if isinstance ( task , Handler ) :
self . _pending_handler_results + = 1
else :
self . _pending_results + = 1
except ( EOFError , IOError , AssertionError ) as e :
except ( EOFError , IOError , AssertionError ) as e :
# most likely an abort
# most likely an abort
display . debug ( " got an error while queuing: %s " % e )
display . debug ( " got an error while queuing: %s " % e )
@ -424,7 +436,7 @@ class StrategyBase:
_set_host_facts ( target_host , always_facts )
_set_host_facts ( target_host , always_facts )
@debug_closure
@debug_closure
def _process_pending_results ( self , iterator , one_pass = False , max_passes = None ):
def _process_pending_results ( self , iterator , one_pass = False , max_passes = None , do_handlers = False ):
'''
'''
Reads results off the final queue and takes appropriate action
Reads results off the final queue and takes appropriate action
based on the result ( executing callbacks , updating state , etc . ) .
based on the result ( executing callbacks , updating state , etc . ) .
@ -480,7 +492,10 @@ class StrategyBase:
while True :
while True :
try :
try :
self . _results_lock . acquire ( )
self . _results_lock . acquire ( )
task_result = self . _results . popleft ( )
if do_handlers :
task_result = self . _handler_results . popleft ( )
else :
task_result = self . _results . popleft ( )
except IndexError :
except IndexError :
break
break
finally :
finally :
@ -699,7 +714,10 @@ class StrategyBase:
# finally, send the ok for this task
# finally, send the ok for this task
self . _tqm . send_callback ( ' v2_runner_on_ok ' , task_result )
self . _tqm . send_callback ( ' v2_runner_on_ok ' , task_result )
self . _pending_results - = 1
if do_handlers :
self . _pending_handler_results - = 1
else :
self . _pending_results - = 1
if original_host . name in self . _blocked_hosts :
if original_host . name in self . _blocked_hosts :
del self . _blocked_hosts [ original_host . name ]
del self . _blocked_hosts [ original_host . name ]
@ -731,19 +749,19 @@ class StrategyBase:
handler_results = 0
handler_results = 0
display . debug ( " waiting for handler results... " )
display . debug ( " waiting for handler results... " )
while ( self . _pending_ results > 0 and
while ( self . _pending_ handler_ results > 0 and
handler_results < len ( notified_hosts ) and
handler_results < len ( notified_hosts ) and
not self . _tqm . _terminated ) :
not self . _tqm . _terminated ) :
if self . _tqm . has_dead_workers ( ) :
if self . _tqm . has_dead_workers ( ) :
raise AnsibleError ( " A worker was found in a dead state " )
raise AnsibleError ( " A worker was found in a dead state " )
results = self . _process_pending_results ( iterator )
results = self . _process_pending_results ( iterator , do_handlers = True )
ret_results . extend ( results )
ret_results . extend ( results )
handler_results + = len ( [
handler_results + = len ( [
r . _host for r in results if r . _host in notified_hosts and
r . _host for r in results if r . _host in notified_hosts and
r . task_name == handler . name ] )
r . task_name == handler . name ] )
if self . _pending_ results > 0 :
if self . _pending_ handler_ results > 0 :
time . sleep ( C . DEFAULT_INTERNAL_POLL_INTERVAL )
time . sleep ( C . DEFAULT_INTERNAL_POLL_INTERVAL )
display . debug ( " no more pending handlers, returning what we have " )
display . debug ( " no more pending handlers, returning what we have " )