@@ -31,6 +31,7 @@ from jinja2.exceptions import UndefinedError
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
from ansible.inventory.group import Group
@@ -138,38 +139,29 @@ class StrategyBase:

        display.debug("entering _queue_task() for %s/%s" % (host, task))

        task_vars['hostvars'] = self._tqm.hostvars
        # and then queue the new task
        display.debug("%s - putting task (%s) in queue" % (host, task))
        try:
            display.debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers)))

            (worker_prc, main_q, rslt_q) = self._workers[self._cur_worker]
            self._cur_worker += 1
            if self._cur_worker >= len(self._workers):
                self._cur_worker = 0

            # create a dummy object with plugin loaders set as an easier
            # way to share them with the forked processes
            shared_loader_obj = SharedPluginLoaderObj()

            # compress (and convert) the data if so configured, which can
            # help a lot when the variable dictionary is huge. We pop the
            # hostvars out of the task variables right now, due to the fact
            # that they're not JSON serializable
            compressed_vars = False
            if C.DEFAULT_VAR_COMPRESSION_LEVEL > 0:
                zip_vars = zlib.compress(json.dumps(task_vars), C.DEFAULT_VAR_COMPRESSION_LEVEL)
                compressed_vars = True
                # we're done with the original dict now, so delete it to
                # try and reclaim some memory space, which is helpful if the
                # data contained in the dict is very large
                del task_vars
            else:
                zip_vars = task_vars  # noqa (pyflakes false positive because task_vars is deleted in the conditional above)

            # and queue the task
            main_q.put((host, task, self._loader.get_basedir(), zip_vars, compressed_vars, play_context, shared_loader_obj))
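
For reference, the compression branch above only zlib-compresses the JSON-serialized variable dictionary before it is handed over via main_q. A minimal, self-contained sketch of that round trip (the unpack side and all names below are illustrative assumptions, not code from this change):

    import json
    import zlib

    COMPRESSION_LEVEL = 5  # stands in for C.DEFAULT_VAR_COMPRESSION_LEVEL

    def pack_vars(task_vars, level=COMPRESSION_LEVEL):
        # serialize to JSON, then compress; returns (payload, compressed_flag)
        if level > 0:
            return zlib.compress(json.dumps(task_vars).encode("utf-8"), level), True
        return task_vars, False

    def unpack_vars(payload, compressed):
        # mirror of pack_vars, roughly what a receiving worker would do
        if compressed:
            return json.loads(zlib.decompress(payload).decode("utf-8"))
        return payload

    vars_in = {"inventory_hostname": "web01", "http_port": 8080}
    payload, flag = pack_vars(vars_in)
    assert unpack_vars(payload, flag) == vars_in
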
            while True:
                (worker_prc, main_q, rslt_q) = self._workers[self._cur_worker]
                if worker_prc is None or not worker_prc.is_alive():
                    worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
                    self._workers[self._cur_worker][0] = worker_prc
                    worker_prc.start()
                    break
                self._cur_worker += 1
                if self._cur_worker >= len(self._workers):
                    self._cur_worker = 0
                    time.sleep(0.0001)

            del task_vars
            self._pending_results += 1
        except (EOFError, IOError, AssertionError) as e:
            # most likely an abort
@@ -177,7 +169,7 @@ class StrategyBase:
            return
        display.debug("exiting _queue_task() for %s/%s" % (host, task))

    def _process_pending_results(self, iterator):
    def _process_pending_results(self, iterator, one_pass=False):
        '''
        Reads results off the final queue and takes appropriate action
        based on the result (executing callbacks, updating state, etc.).
@@ -247,13 +239,11 @@
                    new_host_info = result_item.get('add_host', dict())

                    self._add_host(new_host_info, iterator)
                    self._tqm._hostvars_manager.hostvars().set_inventory(self._inventory)
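                    # illustrative only, not part of this change: the 'add_host' payload handed to
                    # _add_host() is assumed to look roughly like
                    #     {'host_name': 'web02', 'groups': ['webservers'], 'host_vars': {'ansible_host': '10.0.0.12'}}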

                elif result[0] == 'add_group':
                    host = result[1]
                    result_item = result[2]
                    self._add_group(host, result_item)
                    self._tqm._hostvars_manager.hostvars().set_inventory(self._inventory)

                elif result[0] == 'notify_handler':
                    task_result = result[1]
@@ -283,7 +273,6 @@

                    for target_host in host_list:
                        self._variable_manager.set_nonpersistent_facts(target_host, {var_name: var_value})
                        self._tqm._hostvars_manager.hostvars().set_nonpersistent_facts(target_host, {var_name: var_value})

                elif result[0] in ('set_host_var', 'set_host_facts'):
                    host = result[1]
@@ -316,21 +305,22 @@

                        for target_host in host_list:
                            self._variable_manager.set_host_variable(target_host, var_name, var_value)
                            self._tqm._hostvars_manager.hostvars().set_host_variable(target_host, var_name, var_value)
                    elif result[0] == 'set_host_facts':
                        facts = result[4]
                        if task.action == 'set_fact':
                            self._variable_manager.set_nonpersistent_facts(actual_host, facts)
                            self._tqm._hostvars_manager.hostvars().set_nonpersistent_facts(actual_host, facts)
                        else:
                            self._variable_manager.set_host_facts(actual_host, facts)
                            self._tqm._hostvars_manager.hostvars().set_host_facts(actual_host, facts)

                else:
                    raise AnsibleError("unknown result message received: %s" % result[0])

            except Queue.Empty:
                time.sleep(0.0001)

            if one_pass:
                break

        return ret_results
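
The new one_pass flag lets a caller sweep the results queue once and return immediately instead of looping until everything outstanding has come back. A self-contained toy illustrating that calling pattern (the queue, the pending counter, and all names below are assumptions for illustration, not this module's real API):

    import queue
    import time

    def process_pending_results(final_q, pending, one_pass=False):
        # Drain whatever is currently sitting on the results queue. With
        # one_pass=True the caller gets back after a single sweep, so a
        # strategy can interleave queuing new tasks with collecting
        # finished ones instead of blocking here.
        results = []
        while pending[0] > 0:
            try:
                results.append(final_q.get(block=False))
                pending[0] -= 1
            except queue.Empty:
                time.sleep(0.0001)

            if one_pass:
                break
        return results

    if __name__ == "__main__":
        q, pending = queue.Queue(), [3]
        q.put("result-1")
        print(process_pending_results(q, pending, one_pass=True))   # -> ['result-1']
        q.put("result-2")
        q.put("result-3")
        print(process_pending_results(q, pending))                  # -> ['result-2', 'result-3']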

    def _wait_on_pending_results(self, iterator):