remove main_q for simplicity.

main_q is not used anywhere in the codebase.

It is created in TaskQueueManager._initialize_processes, bundled with rslt_q
into TaskQueueManger._workers, later unwrapped in StrategyBase but not used.
This queue is closed in TaskQueueManger._cleanup_processes.

Historically, it is passed as a init parameter into WorkerProcess,
introduced in 62d7956, but this behavior is changed in 120b9a7.

Signed-off-by: 夏恺(Xia Kai) <xiaket@gmail.com>
pull/14793/head
夏恺(Xia Kai) 9 years ago
parent 8f1303c81a
commit b33074b703

@ -65,7 +65,7 @@ class ResultProcess(multiprocessing.Process):
result = None result = None
starting_point = self._cur_worker starting_point = self._cur_worker
while True: while True:
(worker_prc, main_q, rslt_q) = self._workers[self._cur_worker] (worker_prc, rslt_q) = self._workers[self._cur_worker]
self._cur_worker += 1 self._cur_worker += 1
if self._cur_worker >= len(self._workers): if self._cur_worker >= len(self._workers):
self._cur_worker = 0 self._cur_worker = 0

@ -99,9 +99,8 @@ class TaskQueueManager:
self._workers = [] self._workers = []
for i in range(num): for i in range(num):
main_q = multiprocessing.Queue()
rslt_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue()
self._workers.append([None, main_q, rslt_q]) self._workers.append([None, rslt_q])
self._result_prc = ResultProcess(self._final_q, self._workers) self._result_prc = ResultProcess(self._final_q, self._workers)
self._result_prc.start() self._result_prc.start()
@ -249,9 +248,8 @@ class TaskQueueManager:
if self._result_prc: if self._result_prc:
self._result_prc.terminate() self._result_prc.terminate()
for (worker_prc, main_q, rslt_q) in self._workers: for (worker_prc, rslt_q) in self._workers:
rslt_q.close() rslt_q.close()
main_q.close()
if worker_prc and worker_prc.is_alive(): if worker_prc and worker_prc.is_alive():
try: try:
worker_prc.terminate() worker_prc.terminate()

@ -153,7 +153,7 @@ class StrategyBase:
queued = False queued = False
while True: while True:
(worker_prc, main_q, rslt_q) = self._workers[self._cur_worker] (worker_prc, rslt_q) = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive(): if worker_prc is None or not worker_prc.is_alive():
worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc self._workers[self._cur_worker][0] = worker_prc

Loading…
Cancel
Save