Use the previously proven multiprocessing logic, as the simplified version did not have the
same Ctrl-C handling and may be subject to race issues. Still avoids passing the Runner to
each worker process, and performance still seems good.
pull/1648/head
Michael DeHaan 12 years ago
parent 03f303fba3
commit e04dab904a

@ -58,13 +58,22 @@ multiprocessing_runner = None
################################################ ################################################
def _executor_hook(host): def _executor_hook(job_queue, result_queue):
# attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17 # attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17
# this function also not present in CentOS 6 # this function also not present in CentOS 6
if HAS_ATFORK: if HAS_ATFORK:
atfork() atfork()
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
return multiprocessing_runner._executor(host) while not job_queue.empty():
try:
host = job_queue.get(block=False)
result_queue.put(multiprocessing_runner._executor(host))
except Queue.Empty:
pass
except:
traceback.print_exc()
class HostVars(dict): class HostVars(dict):
''' A special view of setup_cache that adds values from the inventory when needed. ''' ''' A special view of setup_cache that adds values from the inventory when needed. '''
@ -560,20 +569,38 @@ class Runner(object):
# ***************************************************** # *****************************************************
def _parallel_exec(self, hosts): def _parallel_exec(self, hosts):
''' handles mulitprocessing when more than 1 fork is required ''' ''' handles mulitprocessing when more than 1 fork is required '''
# experiment for 0.9, we may revert this if it causes manager = multiprocessing.Manager()
# problems -- used to cause problems when Runner was a passed job_queue = manager.Queue()
# argument but may not be anymore. for host in hosts:
job_queue.put(host)
result_queue = manager.Queue()
workers = []
for i in range(self.forks):
prc = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue))
prc.start()
workers.append(prc)
p = multiprocessing.Pool(self.forks)
try: try:
result = p.map(_executor_hook, hosts) for worker in workers:
worker.join()
except KeyboardInterrupt: except KeyboardInterrupt:
p.terminate() for worker in workers:
raise errors.AnsibleError("Interrupted") worker.terminate()
return result worker.join()
results = []
try:
while not result_queue.empty():
results.append(result_queue.get(block=False))
except socket.error:
raise errors.AnsibleError("<interrupted>")
return results
# ***************************************************** # *****************************************************

@ -132,7 +132,6 @@ class TestRunner(unittest.TestCase):
result = self._run('command', [ "/usr/bin/this_does_not_exist", "splat" ]) result = self._run('command', [ "/usr/bin/this_does_not_exist", "splat" ])
assert 'msg' in result assert 'msg' in result
assert 'failed' in result assert 'failed' in result
assert 'rc' not in result
result = self._run('shell', [ "/bin/echo", "$HOME" ]) result = self._run('shell', [ "/bin/echo", "$HOME" ])
assert 'failed' not in result assert 'failed' not in result

Loading…
Cancel
Save