From 85d66b9a0c87849c7a83099141e5b2b4731abbe5 Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Fri, 7 Feb 2014 16:52:17 -0500 Subject: [PATCH] This patch makes Ansible reuse fork allocation between seperate instantations of the runner API, therefore the overhead of recreating forks between tasks in a playbook is avoided. The fork pool will be regenerated when a second play comes along and needs more hosts. --- lib/ansible/runner/__init__.py | 68 +++++++++++----------------------- 1 file changed, 21 insertions(+), 47 deletions(-) diff --git a/lib/ansible/runner/__init__.py b/lib/ansible/runner/__init__.py index 528dfb42fff..9160a0b5928 100644 --- a/lib/ansible/runner/__init__.py +++ b/lib/ansible/runner/__init__.py @@ -49,6 +49,7 @@ from ansible.module_common import ModuleReplacer module_replacer = ModuleReplacer(strip_comments=False) +NEED_ATFORK=False HAS_ATFORK=True try: from Crypto.Random import atfork @@ -60,30 +61,28 @@ multiprocessing_runner = None OUTPUT_LOCKFILE = tempfile.TemporaryFile() PROCESS_LOCKFILE = tempfile.TemporaryFile() +from foon import Foon + +FOON = Foon() + ################################################ -def _executor_hook(job_queue, result_queue, new_stdin): +class KeyboardInterruptError(Exception): + pass + +def _executor_hook(params): + + (host, my_stdin) = params # attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17 # this function also not present in CentOS 6 - if HAS_ATFORK: + if HAS_ATFORK and NEED_ATFORK: atfork() - signal.signal(signal.SIGINT, signal.SIG_IGN) - while not job_queue.empty(): - try: - host = job_queue.get(block=False) - return_data = multiprocessing_runner._executor(host, new_stdin) - result_queue.put(return_data) - - if 'LEGACY_TEMPLATE_WARNING' in return_data.flags: - # pass data back up across the multiprocessing fork boundary - template.Flags.LEGACY_TEMPLATE_WARNING = True - - except Queue.Empty: - pass - except: - traceback.print_exc() + try: + return multiprocessing_runner._executor(host, my_stdin) + except KeyboardInterrupt: + raise KeyboardInterruptError() class HostVars(dict): ''' A special view of setup_cache that adds values from the inventory when needed. ''' @@ -209,6 +208,9 @@ class Runner(object): else: self.transport = "ssh" + if self.transport == "paramiko": + global NEED_ATFORK + NEED_ATFORK=True # misc housekeeping if subset and self.inventory._subset is None: @@ -1056,39 +1058,11 @@ class Runner(object): # ***************************************************** - def _parallel_exec(self, hosts): ''' handles mulitprocessing when more than 1 fork is required ''' - manager = multiprocessing.Manager() - job_queue = manager.Queue() - for host in hosts: - job_queue.put(host) - result_queue = manager.Queue() - - workers = [] - for i in range(self.forks): - new_stdin = os.fdopen(os.dup(sys.stdin.fileno())) - prc = multiprocessing.Process(target=_executor_hook, - args=(job_queue, result_queue, new_stdin)) - prc.start() - workers.append(prc) - - try: - for worker in workers: - worker.join() - except KeyboardInterrupt: - for worker in workers: - worker.terminate() - worker.join() - - results = [] - try: - while not result_queue.empty(): - results.append(result_queue.get(block=False)) - except socket.error: - raise errors.AnsibleError("") - return results + FOON.set_size(self.forks) + return FOON.map(_executor_hook, hosts) # *****************************************************