Remove unused rslt_q, rename the one queue to final_q everywhere (#43894)

* Remove unused rslt_q, rename the one queue to final_q everywhere

* Test update
pull/44092/head
Matt Martz 6 years ago committed by GitHub
parent 5a49567c71
commit 17c89d1ffa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -57,11 +57,11 @@ class WorkerProcess(multiprocessing.Process):
for reading later. for reading later.
''' '''
def __init__(self, rslt_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
super(WorkerProcess, self).__init__() super(WorkerProcess, self).__init__()
# takes a task queue manager as the sole param: # takes a task queue manager as the sole param:
self._rslt_q = rslt_q self._final_q = final_q
self._task_vars = task_vars self._task_vars = task_vars
self._host = host self._host = host
self._task = task self._task = task
@ -115,7 +115,7 @@ class WorkerProcess(multiprocessing.Process):
self._new_stdin, self._new_stdin,
self._loader, self._loader,
self._shared_loader_obj, self._shared_loader_obj,
self._rslt_q self._final_q
).run() ).run()
display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid)) display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid))
@ -130,7 +130,7 @@ class WorkerProcess(multiprocessing.Process):
# put the result on the result queue # put the result on the result queue
display.debug("sending task result for task %s" % self._task._uuid) display.debug("sending task result for task %s" % self._task._uuid)
self._rslt_q.put(task_result) self._final_q.put(task_result)
display.debug("done sending task result for task %s" % self._task._uuid) display.debug("done sending task result for task %s" % self._task._uuid)
except AnsibleConnectionFailure: except AnsibleConnectionFailure:
@ -142,7 +142,7 @@ class WorkerProcess(multiprocessing.Process):
dict(unreachable=True), dict(unreachable=True),
task_fields=self._task.dump_attrs(), task_fields=self._task.dump_attrs(),
) )
self._rslt_q.put(task_result, block=False) self._final_q.put(task_result, block=False)
except Exception as e: except Exception as e:
if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound): if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
@ -155,7 +155,7 @@ class WorkerProcess(multiprocessing.Process):
dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''), dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''),
task_fields=self._task.dump_attrs(), task_fields=self._task.dump_attrs(),
) )
self._rslt_q.put(task_result, block=False) self._final_q.put(task_result, block=False)
except: except:
display.debug(u"WORKER EXCEPTION: %s" % to_text(e)) display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc())) display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc()))

@ -68,7 +68,7 @@ class TaskExecutor:
# the module # the module
SQUASH_ACTIONS = frozenset(C.DEFAULT_SQUASH_ACTIONS) SQUASH_ACTIONS = frozenset(C.DEFAULT_SQUASH_ACTIONS)
def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, rslt_q): def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q):
self._host = host self._host = host
self._task = task self._task = task
self._job_vars = job_vars self._job_vars = job_vars
@ -77,7 +77,7 @@ class TaskExecutor:
self._loader = loader self._loader = loader
self._shared_loader_obj = shared_loader_obj self._shared_loader_obj = shared_loader_obj
self._connection = None self._connection = None
self._rslt_q = rslt_q self._final_q = final_q
self._loop_eval_error = None self._loop_eval_error = None
self._task.squash() self._task.squash()
@ -348,7 +348,7 @@ class TaskExecutor:
# gets templated here unlike rest of loop_control fields, depends on loop_var above # gets templated here unlike rest of loop_control fields, depends on loop_var above
res['_ansible_item_label'] = templar.template(label, cache=False) res['_ansible_item_label'] = templar.template(label, cache=False)
self._rslt_q.put( self._final_q.put(
TaskResult( TaskResult(
self._host.name, self._host.name,
self._task._uuid, self._task._uuid,
@ -669,7 +669,7 @@ class TaskExecutor:
result['_ansible_retry'] = True result['_ansible_retry'] = True
result['retries'] = retries result['retries'] = retries
display.debug('Retrying task, attempt %d of %d' % (attempt, retries)) display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()), block=False) self._final_q.put(TaskResult(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()), block=False)
time.sleep(delay) time.sleep(delay)
else: else:
if retries > 1: if retries > 1:

@ -114,8 +114,7 @@ class TaskQueueManager:
self._workers = [] self._workers = []
for i in range(num): for i in range(num):
rslt_q = multiprocessing.Queue() self._workers.append(None)
self._workers.append([None, rslt_q])
def _initialize_notified_handlers(self, play): def _initialize_notified_handlers(self, play):
''' '''
@ -307,8 +306,7 @@ class TaskQueueManager:
def _cleanup_processes(self): def _cleanup_processes(self):
if hasattr(self, '_workers'): if hasattr(self, '_workers'):
for (worker_prc, rslt_q) in self._workers: for worker_prc in self._workers:
rslt_q.close()
if worker_prc and worker_prc.is_alive(): if worker_prc and worker_prc.is_alive():
try: try:
worker_prc.terminate() worker_prc.terminate()
@ -340,8 +338,8 @@ class TaskQueueManager:
defunct = False defunct = False
for (idx, x) in enumerate(self._workers): for (idx, x) in enumerate(self._workers):
if hasattr(x[0], 'exitcode'): if hasattr(x, 'exitcode'):
if x[0].exitcode in [-9, -11, -15]: if x.exitcode in [-9, -11, -15]:
defunct = True defunct = True
return defunct return defunct

@ -302,7 +302,7 @@ class StrategyBase:
queued = False queued = False
starting_worker = self._cur_worker starting_worker = self._cur_worker
while True: while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker] worker_prc = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive(): if worker_prc is None or not worker_prc.is_alive():
self._queued_task_cache[(host.name, task._uuid)] = { self._queued_task_cache[(host.name, task._uuid)] = {
'host': host, 'host': host,
@ -312,7 +312,7 @@ class StrategyBase:
} }
worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc self._workers[self._cur_worker] = worker_prc
worker_prc.start() worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers))) display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
queued = True queued = True

@ -55,7 +55,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin=new_stdin, new_stdin=new_stdin,
loader=fake_loader, loader=fake_loader,
shared_loader_obj=mock_shared_loader, shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue, final_q=mock_queue,
) )
def test_task_executor_run(self): def test_task_executor_run(self):
@ -82,7 +82,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin=new_stdin, new_stdin=new_stdin,
loader=fake_loader, loader=fake_loader,
shared_loader_obj=mock_shared_loader, shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue, final_q=mock_queue,
) )
te._get_loop_items = MagicMock(return_value=None) te._get_loop_items = MagicMock(return_value=None)
@ -126,7 +126,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin=new_stdin, new_stdin=new_stdin,
loader=fake_loader, loader=fake_loader,
shared_loader_obj=mock_shared_loader, shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue, final_q=mock_queue,
) )
items = te._get_loop_items() items = te._get_loop_items()
@ -162,7 +162,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin=new_stdin, new_stdin=new_stdin,
loader=fake_loader, loader=fake_loader,
shared_loader_obj=mock_shared_loader, shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue, final_q=mock_queue,
) )
def _execute(variables): def _execute(variables):
@ -208,7 +208,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin=new_stdin, new_stdin=new_stdin,
loader=fake_loader, loader=fake_loader,
shared_loader_obj=mock_shared_loader, shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue, final_q=mock_queue,
) )
# No replacement # No replacement
@ -400,7 +400,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin=new_stdin, new_stdin=new_stdin,
loader=fake_loader, loader=fake_loader,
shared_loader_obj=shared_loader, shared_loader_obj=shared_loader,
rslt_q=mock_queue, final_q=mock_queue,
) )
te._get_connection = MagicMock(return_value=mock_connection) te._get_connection = MagicMock(return_value=mock_connection)
@ -455,7 +455,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin=new_stdin, new_stdin=new_stdin,
loader=fake_loader, loader=fake_loader,
shared_loader_obj=shared_loader, shared_loader_obj=shared_loader,
rslt_q=mock_queue, final_q=mock_queue,
) )
te._connection = MagicMock() te._connection = MagicMock()

Loading…
Cancel
Save