@@ -22,14 +22,21 @@ __metaclass__ = type

import multiprocessing
import os
import tempfile
import threading
import time

from collections import deque

from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.executor import action_write_locks
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.stats import AggregateStats
from ansible.module_utils.facts import Facts
from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext
-from ansible.plugins import callback_loader, strategy_loader, module_loader
+from ansible.plugins import action_loader, callback_loader, connection_loader, filter_loader, lookup_loader, module_loader, strategy_loader, test_loader
from ansible.template import Templar
from ansible.vars.hostvars import HostVars

from ansible.plugins.callback import CallbackBase
@@ -46,6 +53,23 @@ except ImportError:

__all__ = ['TaskQueueManager']


# TODO: this should probably be in the plugins/__init__.py, with
# a smarter mechanism to set all of the attributes based on
# the loaders created there
class SharedPluginLoaderObj:
    '''
    A simple object to make passing the various plugin loaders to
    the forked processes over the queue easier
    '''
    def __init__(self):
        self.action_loader = action_loader
        self.connection_loader = connection_loader
        self.filter_loader = filter_loader
        self.test_loader = test_loader
        self.lookup_loader = lookup_loader
        self.module_loader = module_loader


class TaskQueueManager:

    '''
@@ -77,6 +101,8 @@ class TaskQueueManager:
        self._run_additional_callbacks = run_additional_callbacks
        self._run_tree = run_tree

        self._iterator = None

        self._callbacks_loaded = False
        self._callback_plugins = []
        self._start_at_done = False
@@ -98,12 +124,86 @@ class TaskQueueManager:
        self._failed_hosts = dict()
        self._unreachable_hosts = dict()

        # the "queue" for the background thread to use
        self._queued_tasks = deque()
        self._queued_tasks_lock = threading.Lock()

        # the background queuing thread
        self._queue_thread = None

        self._workers = []
        self._final_q = multiprocessing.Queue()

        # A temporary file (opened pre-fork) used by connection
        # plugins for inter-process locking.
        self._connection_lockfile = tempfile.TemporaryFile()

    def _queue_thread_main(self):

        # create a dummy object with plugin loaders set as an easier
        # way to share them with the forked processes
        shared_loader_obj = SharedPluginLoaderObj()
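
        # Dispatch loop: until the TQM is terminated, scan the worker slots for
        # any whose process is unset or no longer alive, pop a queued task off
        # the deque for each free slot, and fork a new WorkerProcess to run it.
        # When every slot is busy, sleep briefly and check again.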
display.debug("queuing thread starting")
|
|
|
|
|
while not self._terminated:
|
|
|
|
|
available_workers = []
|
|
|
|
|
for idx, entry in enumerate(self._workers):
|
|
|
|
|
(worker_prc, _) = entry
|
|
|
|
|
if worker_prc is None or not worker_prc.is_alive():
|
|
|
|
|
available_workers.append(idx)
|
|
|
|
|
|
|
|
|
|
if len(available_workers) == 0:
|
|
|
|
|
time.sleep(0.01)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
for worker_idx in available_workers:
|
|
|
|
|
try:
|
|
|
|
|
self._queued_tasks_lock.acquire()
|
|
|
|
|
(host, task, task_vars, play_context) = self._queued_tasks.pop()
|
|
|
|
|
except IndexError:
|
|
|
|
|
break
|
|
|
|
|
finally:
|
|
|
|
|
self._queued_tasks_lock.release()
|
|
|
|
|
|
|
|
|
|
if task.action not in action_write_locks.action_write_locks:
|
|
|
|
|
display.debug('Creating lock for %s' % task.action)
|
|
|
|
|
action_write_locks.action_write_locks[task.action] = multiprocessing.Lock()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
worker_prc = WorkerProcess(
|
|
|
|
|
self._final_q,
|
|
|
|
|
self._iterator._play,
|
|
|
|
|
host,
|
|
|
|
|
task,
|
|
|
|
|
task_vars,
|
|
|
|
|
play_context,
|
|
|
|
|
self._loader,
|
|
|
|
|
self._variable_manager,
|
|
|
|
|
shared_loader_obj,
|
|
|
|
|
)
|
|
|
|
|
self._workers[worker_idx][0] = worker_prc
|
|
|
|
|
worker_prc.start()
|
|
|
|
|
display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(self._workers)))
|
|
|
|
|
|
|
|
|
|
except (EOFError, IOError, AssertionError) as e:
|
|
|
|
|
# most likely an abort
|
|
|
|
|
display.debug("got an error while queuing: %s" % e)
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
display.debug("queuing thread exiting")
|
|
|
|
|
|
|
|
|
|
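
    # Producer side: queue_task()/queue_multiple_tasks() only append work onto
    # the shared deque under the lock; the background queuing thread above is
    # what actually forks a WorkerProcess for each queued entry.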
    def queue_task(self, host, task, task_vars, play_context):
        self._queued_tasks_lock.acquire()
        self._queued_tasks.append((host, task, task_vars, play_context))
        self._queued_tasks_lock.release()

    def queue_multiple_tasks(self, items, play_context):
        for item in items:
            (host, task, task_vars) = item
            self._queued_tasks_lock.acquire()
            self._queued_tasks.append((host, task, task_vars, play_context))
            self._queued_tasks_lock.release()

    def _initialize_processes(self, num):
        self._workers = []
@@ -207,6 +307,10 @@ class TaskQueueManager:
        if not self._callbacks_loaded:
            self.load_callbacks()
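
        # Lazily start the background queuing thread the first time run() is
        # called; it keeps dispatching from self._queued_tasks until
        # self._terminated is set.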
        if self._queue_thread is None:
            self._queue_thread = threading.Thread(target=self._queue_thread_main)
            self._queue_thread.start()

        all_vars = self._variable_manager.get_vars(loader=self._loader, play=play)
        templar = Templar(loader=self._loader, variables=all_vars)
@@ -252,7 +356,7 @@ class TaskQueueManager:
            raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds)

        # build the iterator
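        # The iterator is stored on the TQM as self._iterator so the background
        # queuing thread can pass self._iterator._play to each WorkerProcess it
        # forks.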
-        iterator = PlayIterator(
+        self._iterator = PlayIterator(
            inventory=self._inventory,
            play=new_play,
            play_context=play_context,
@@ -267,7 +371,7 @@ class TaskQueueManager:
        # hosts so we know what failed this round.
        for host_name in self._failed_hosts.keys():
            host = self._inventory.get_host(host_name)
-            iterator.mark_host_failed(host)
+            self._iterator.mark_host_failed(host)

        self.clear_failed_hosts()
@@ -278,10 +382,10 @@ class TaskQueueManager:
            self._start_at_done = True

        # and run the play using the strategy and cleanup on way out
-        play_return = strategy.run(iterator, play_context)
+        play_return = strategy.run(self._iterator, play_context)

        # now re-save the hosts that failed from the iterator to our internal list
-        for host_name in iterator.get_failed_hosts():
+        for host_name in self._iterator.get_failed_hosts():
            self._failed_hosts[host_name] = True

        self._cleanup_processes()
@@ -294,14 +398,13 @@ class TaskQueueManager:
        self._cleanup_processes()

    def _cleanup_processes(self):
-        if hasattr(self, '_workers'):
-            for (worker_prc, rslt_q) in self._workers:
-                rslt_q.close()
-                if worker_prc and worker_prc.is_alive():
-                    try:
-                        worker_prc.terminate()
-                    except AttributeError:
-                        pass
+        for (worker_prc, rslt_q) in self._workers:
+            rslt_q.close()
+            if worker_prc and worker_prc.is_alive():
+                try:
+                    worker_prc.terminate()
+                except AttributeError:
+                    pass

    def clear_failed_hosts(self):
        self._failed_hosts = dict()