|
|
|
@ -71,14 +71,12 @@ class SharedPluginLoaderObj:
|
|
|
|
|
|
|
|
|
|
_sentinel = object()
|
|
|
|
|
def results_thread_main(strategy):
|
|
|
|
|
#print("RESULT THREAD STARTING: %s" % threading.current_thread())
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
result = strategy._final_q.get()
|
|
|
|
|
if type(result) == object:
|
|
|
|
|
break
|
|
|
|
|
else:
|
|
|
|
|
#print("result in thread is: %s" % result._result)
|
|
|
|
|
strategy._results_lock.acquire()
|
|
|
|
|
strategy._results.append(result)
|
|
|
|
|
strategy._results_lock.release()
|
|
|
|
@ -86,7 +84,6 @@ def results_thread_main(strategy):
|
|
|
|
|
break
|
|
|
|
|
except Queue.Empty:
|
|
|
|
|
pass
|
|
|
|
|
#print("RESULT THREAD EXITED: %s" % threading.current_thread())
|
|
|
|
|
|
|
|
|
|
class StrategyBase:
|
|
|
|
|
|
|
|
|
@ -121,7 +118,7 @@ class StrategyBase:
|
|
|
|
|
self._results = deque()
|
|
|
|
|
self._results_lock = threading.Condition(threading.Lock())
|
|
|
|
|
|
|
|
|
|
#print("creating thread for strategy %s" % id(self))
|
|
|
|
|
# create the result processing thread for reading results in the background
|
|
|
|
|
self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
|
|
|
|
|
self._results_thread.daemon = True
|
|
|
|
|
self._results_thread.start()
|
|
|
|
@ -316,7 +313,6 @@ class StrategyBase:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if original_task.register:
|
|
|
|
|
#print("^ REGISTERING RESULT %s" % original_task.register)
|
|
|
|
|
if original_task.run_once:
|
|
|
|
|
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
|
|
|
|
|
else:
|
|
|
|
@ -533,6 +529,8 @@ class StrategyBase:
|
|
|
|
|
|
|
|
|
|
results = self._process_pending_results(iterator)
|
|
|
|
|
ret_results.extend(results)
|
|
|
|
|
if self._pending_results > 0:
|
|
|
|
|
time.sleep(0.001)
|
|
|
|
|
|
|
|
|
|
display.debug("no more pending results, returning what we have")
|
|
|
|
|
|
|
|
|
|