@ -45,7 +45,7 @@ from ansible.module_utils.connection import Connection, ConnectionError
from ansible . playbook . helpers import load_list_of_blocks
from ansible . playbook . included_file import IncludedFile
from ansible . playbook . task_include import TaskInclude
from ansible . plugins . loader import action_loader , connection_loader , filter_loader , lookup_loader , module_loader , test _loader
from ansible . plugins import loader as plugin _loader
from ansible . template import Templar
from ansible . utils . display import Display
from ansible . utils . vars import combine_vars
@ -60,21 +60,12 @@ class StrategySentinel:
pass
# TODO: this should probably be in the plugins/__init__.py, with
# a smarter mechanism to set all of the attributes based on
# the loaders created there
class SharedPluginLoaderObj :
def SharedPluginLoaderObj ( ) :
''' This only exists for backwards compat, do not use.
'''
A simple object to make pass the various plugin loaders to
the forked processes over the queue easier
'''
def __init__ ( self ) :
self . action_loader = action_loader
self . connection_loader = connection_loader
self . filter_loader = filter_loader
self . test_loader = test_loader
self . lookup_loader = lookup_loader
self . module_loader = module_loader
display . deprecated ( ' SharedPluginLoaderObj is deprecated, please directly use ansible.plugins.loader ' ,
version = ' 2.11 ' )
return plugin_loader
_sentinel = StrategySentinel ( )
@ -207,8 +198,29 @@ class StrategyBase:
# play completion
self . _active_connections = dict ( )
# Caches for get_host calls, to avoid calling excessively
# These values should be set at the top of the ``run`` method of each
# strategy plugin. Use ``_set_hosts_cache`` to set these values
self . _hosts_cache = [ ]
self . _hosts_cache_all = [ ]
self . debugger_active = C . ENABLE_TASK_DEBUGGER
def _set_hosts_cache ( self , play , refresh = True ) :
""" Responsible for setting _hosts_cache and _hosts_cache_all
See comment in ` ` __init__ ` ` for the purpose of these caches
"""
if not refresh and all ( ( self . _hosts_cache , self . _hosts_cache_all ) ) :
return
if Templar ( None ) . is_template ( play . hosts ) :
_pattern = ' all '
else :
_pattern = play . hosts or ' all '
self . _hosts_cache_all = [ h . name for h in self . _inventory . get_hosts ( pattern = _pattern , ignore_restrictions = True ) ]
self . _hosts_cache = [ h . name for h in self . _inventory . get_hosts ( play . hosts , order = play . order ) ]
def cleanup ( self ) :
# close active persistent connections
for sock in itervalues ( self . _active_connections ) :
@ -227,8 +239,12 @@ class StrategyBase:
# This should be safe, as everything should be ITERATING_COMPLETE by
# this point, though the strategy may not advance the hosts itself.
inv_hosts = self . _inventory . get_hosts ( iterator . _play . hosts , order = iterator . _play . order )
[ iterator . get_next_task_for_host ( host ) for host in inv_hosts if host . name not in self . _tqm . _unreachable_hosts ]
for host in self . _hosts_cache :
if host not in self . _tqm . _unreachable_hosts :
try :
iterator . get_next_task_for_host ( self . _inventory . hosts [ host ] )
except KeyError :
iterator . get_next_task_for_host ( self . _inventory . get_host ( host ) )
# save the failed/unreachable hosts, as the run_handlers()
# method will clear that information during its execution
@ -258,19 +274,21 @@ class StrategyBase:
return self . _tqm . RUN_OK
def get_hosts_remaining ( self , play ) :
return [ host for host in self . _inventory . get_hosts ( play . hosts )
if host . name not in self . _tqm . _failed_hosts and host . name not in self . _tqm . _unreachable_hosts ]
self . _set_hosts_cache ( play , refresh = False )
ignore = set ( self . _tqm . _failed_hosts ) . union ( self . _tqm . _unreachable_hosts )
return [ host for host in self . _hosts_cache if host not in ignore ]
def get_failed_hosts ( self , play ) :
return [ host for host in self . _inventory . get_hosts ( play . hosts ) if host . name in self . _tqm . _failed_hosts ]
self . _set_hosts_cache ( play , refresh = False )
return [ host for host in self . _hosts_cache if host in self . _tqm . _failed_hosts ]
def add_tqm_variables ( self , vars , play ) :
'''
Base class method to add extra variables / information to the list of task
vars sent through the executor engine regarding the task queue manager state .
'''
vars [ ' ansible_current_hosts ' ] = [ h . name for h in self . get_hosts_remaining ( play ) ]
vars [ ' ansible_failed_hosts ' ] = [ h . name for h in self . get_failed_hosts ( play ) ]
vars [ ' ansible_current_hosts ' ] = self . get_hosts_remaining ( play )
vars [ ' ansible_failed_hosts ' ] = self . get_failed_hosts ( play )
def _queue_task ( self , host , task , task_vars , play_context ) :
''' handles queueing the task up to be sent to a worker '''
@ -294,11 +312,6 @@ class StrategyBase:
# and then queue the new task
try :
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj ( )
queued = False
starting_worker = self . _cur_worker
while True :
@ -311,7 +324,7 @@ class StrategyBase:
' play_context ' : play_context
}
worker_prc = WorkerProcess ( self . _final_q , task_vars , host , task , play_context , self . _loader , self . _variable_manager , shared_loader_obj )
worker_prc = WorkerProcess ( self . _final_q , task_vars , host , task , play_context , self . _loader , self . _variable_manager , plugin_loader )
self . _workers [ self . _cur_worker ] = worker_prc
self . _tqm . send_callback ( ' v2_runner_on_start ' , host , task )
worker_prc . start ( )
@ -334,24 +347,19 @@ class StrategyBase:
def get_task_hosts ( self , iterator , task_host , task ) :
if task . run_once :
host_list = [ host for host in self . _ inventory. get_hosts ( iterator . _play . hosts ) if host . name not in self . _tqm . _unreachable_hosts ]
host_list = [ host for host in self . _ hosts_cache if host not in self . _tqm . _unreachable_hosts ]
else :
host_list = [ task_host ]
host_list = [ task_host . name ]
return host_list
def get_delegated_hosts ( self , result , task ) :
host_name = result . get ( ' _ansible_delegated_vars ' , { } ) . get ( ' ansible_delegated_host ' , None )
if host_name is not None :
actual_host = self . _inventory . get_host ( host_name )
if actual_host is None :
actual_host = Host ( name = host_name )
else :
actual_host = Host ( name = task . delegate_to )
return [ actual_host ]
return [ host_name or task . delegate_to ]
def get_handler_templar ( self , handler_task , iterator ) :
handler_vars = self . _variable_manager . get_vars ( play = iterator . _play , task = handler_task )
handler_vars = self . _variable_manager . get_vars ( play = iterator . _play , task = handler_task ,
_hosts = self . _hosts_cache ,
_hosts_all = self . _hosts_cache_all )
return Templar ( loader = self . _loader , variables = handler_vars )
@debug_closure
@ -703,6 +711,7 @@ class StrategyBase:
# Check if host in inventory, add if not
if host_name not in self . _inventory . hosts :
self . _inventory . add_host ( host_name , ' all ' )
self . _hosts_cache_all . append ( host_name )
new_host = self . _inventory . hosts . get ( host_name )
# Set/update the vars for this host
@ -882,7 +891,7 @@ class StrategyBase:
bypass_host_loop = False
try :
action = action_loader. get ( handler . action , class_only = True )
action = plugin_loader. action_loader. get ( handler . action , class_only = True )
if getattr ( action , ' BYPASS_HOST_LOOP ' , False ) :
bypass_host_loop = True
except KeyError :
@ -893,7 +902,8 @@ class StrategyBase:
host_results = [ ]
for host in notified_hosts :
if not iterator . is_failed ( host ) or iterator . _play . force_handlers :
task_vars = self . _variable_manager . get_vars ( play = iterator . _play , host = host , task = handler )
task_vars = self . _variable_manager . get_vars ( play = iterator . _play , host = host , task = handler ,
_hosts = self . _hosts_cache , _hosts_all = self . _hosts_cache_all )
self . add_tqm_variables ( task_vars , play = iterator . _play )
templar = Templar ( loader = self . _loader , variables = task_vars )
if not handler . cached_name :
@ -993,7 +1003,8 @@ class StrategyBase:
meta_action = task . args . get ( ' _raw_params ' )
def _evaluate_conditional ( h ) :
all_vars = self . _variable_manager . get_vars ( play = iterator . _play , host = h , task = task )
all_vars = self . _variable_manager . get_vars ( play = iterator . _play , host = h , task = task ,
_hosts = self . _hosts_cache , _hosts_all = self . _hosts_cache_all )
templar = Templar ( loader = self . _loader , variables = all_vars )
return task . evaluate_conditional ( templar , all_vars )
@ -1015,6 +1026,7 @@ class StrategyBase:
if task . when :
self . _cond_not_supported_warn ( meta_action )
self . _inventory . refresh_inventory ( )
self . _set_hosts_cache ( iterator . _play )
msg = " inventory successfully refreshed "
elif meta_action == ' clear_facts ' :
if _evaluate_conditional ( target_host ) :
@ -1047,7 +1059,8 @@ class StrategyBase:
skipped = True
msg = " end_host conditional evaluated to false, continuing execution for %s " % target_host . name
elif meta_action == ' reset_connection ' :
all_vars = self . _variable_manager . get_vars ( play = iterator . _play , host = target_host , task = task )
all_vars = self . _variable_manager . get_vars ( play = iterator . _play , host = target_host , task = task ,
_hosts = self . _hosts_cache , _hosts_all = self . _hosts_cache_all )
templar = Templar ( loader = self . _loader , variables = all_vars )
# apply the given task's information to the connection info,
@ -1075,7 +1088,7 @@ class StrategyBase:
connection = Connection ( self . _active_connections [ target_host ] )
del self . _active_connections [ target_host ]
else :
connection = connection_loader. get ( play_context . connection , play_context , os . devnull )
connection = plugin_loader. connection_loader. get ( play_context . connection , play_context , os . devnull )
play_context . set_attributes_from_plugin ( connection )
if connection :
@ -1104,9 +1117,12 @@ class StrategyBase:
''' returns list of available hosts for this iterator by filtering out unreachables '''
hosts_left = [ ]
for host in self . _inventory . get_hosts ( iterator . _play . hosts , order = iterator . _play . order ) :
if host . name not in self . _tqm . _unreachable_hosts :
hosts_left . append ( host )
for host in self . _hosts_cache :
if host not in self . _tqm . _unreachable_hosts :
try :
hosts_left . append ( self . _inventory . hosts [ host ] )
except KeyError :
hosts_left . append ( self . _inventory . get_host ( host ) )
return hosts_left
def update_active_connections ( self , results ) :