From db7ba871110288f7790b999137987b787b5acaaf Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Sun, 11 Mar 2012 20:54:54 -0400 Subject: [PATCH] Add polling logic in runner such that all actions get kicked off everywhere then polling happens only on needed hosts, allowing some hosts to fail and drop out of the running. --- bin/ansible | 70 ++++++++++++++++++++++++++++++++++++++----- lib/ansible/runner.py | 34 --------------------- lib/ansible/utils.py | 12 ++++---- library/async_status | 5 ++-- 4 files changed, 71 insertions(+), 50 deletions(-) diff --git a/bin/ansible b/bin/ansible index 8189f1c93fa..9c90b3a79ab 100755 --- a/bin/ansible +++ b/bin/ansible @@ -28,10 +28,11 @@ import sys import os import getpass import shlex +import time +from optparse import OptionParser import ansible.runner import ansible.playbook import ansible.constants as C -from optparse import OptionParser from ansible.utils import * ######################################################## @@ -91,7 +92,7 @@ class Cli(object): if options.ask_pass: sshpass = getpass.getpass(prompt="SSH password: ") - return ansible.runner.Runner( + runner = ansible.runner.Runner( module_name=options.module_name, module_path=options.module_path, module_args=shlex.split(options.module_args), @@ -101,15 +102,40 @@ class Cli(object): timeout=options.timeout, forks=options.forks, background=options.seconds, - poll_interval=options.poll_interval, - async_poll_callback=async_poll_status, pattern=pattern, verbose=True, - ).run() + ) + return (runner, runner.run()) + + + # ---------------------------------------------- + + def get_polling_runner(self, old_runner, hosts, jid): + return ansible.runner.Runner( + module_name='async_status', + module_path=old_runner.module_path, + module_args=[ "jid=%s" % jid ], + remote_user=old_runner.remote_user, + remote_pass=old_runner.remote_pass, + host_list=hosts, + timeout=old_runner.timeout, + forks=old_runner.forks, + pattern='*', + verbose=True, + ) + + # ---------------------------------------------- + + def hosts_to_poll(self, results): + hosts = [] + for (host, res) in results['contacted'].iteritems(): + if res.get('started',False): + hosts.append(host) + return hosts # ---------------------------------------------- - def output(self, results, options, args): + def output(self, runner, results, options, args): ''' summarize results from Runner ''' if results is None: @@ -117,6 +143,34 @@ class Cli(object): if options.tree: prepare_writeable_dir(options.tree) + # BACKGROUND POLL LOGIC when -B and -P are specified + # FIXME: refactor + if options.seconds and options.poll_interval > 0: + poll_hosts = results['contacted'].keys() + if len(poll_hosts) == 0: + exit("no jobs were launched successfully") + ahost = poll_hosts[0] + jid = results['contacted'][ahost].get('ansible_job_id', None) + if jid is None: + exit("unexpected error: unable to determine jid") + + clock = options.seconds + while (clock >= 0): + polling_runner = self.get_polling_runner(runner, poll_hosts, jid) + poll_results = polling_runner.run() + if poll_results is None: + break + for (host, host_result) in poll_results['contacted'].iteritems(): + # override last result with current status result for report + results['contacted'][host] = host_result + print async_poll_status(jid, host, clock, host_result) + clock = clock - options.poll_interval + time.sleep(options.poll_interval) + poll_hosts = self.hosts_to_poll(poll_results) + for (host, host_result) in results['contacted'].iteritems(): + if 'started' in host_result: + results['contacted'][host] = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' } + buf = '' for hostname in contacted_hosts(results): msg = host_report_msg( @@ -139,7 +193,7 @@ class Cli(object): if __name__ == '__main__': cli = Cli() (options, args) = cli.parse() - results = cli.run(options, args) - cli.output(results, options, args) + (runner, results) = cli.run(options, args) + cli.output(runner, results, options, args) diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index 6aa8734c044..83aee0455df 100755 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -68,8 +68,6 @@ class Runner(object): basedir=None, setup_cache={}, transport='paramiko', - poll_interval=None, - async_poll_callback=None, verbose=False): ''' @@ -99,11 +97,6 @@ class Runner(object): self.remote_user = remote_user self.remote_pass = remote_pass self.background = background - self.poll_interval = poll_interval - self.async_poll_callback = async_poll_callback - - if self.async_poll_callback is None: - self.async_poll_callback = async_poll_status if basedir is None: basedir = os.getcwd() @@ -363,33 +356,6 @@ class Runner(object): result = self._execute_normal_module(conn, host, tmp) else: result = self._execute_async_module(conn, host, tmp) - if self.poll_interval > 0: - # poll for completion - # FIXME: refactor - - (host, ok, launch_result) = result - jid = launch_result.get('ansible_job_id', None) - if jid is None: - return result - if self.async_poll_callback is None: - self.async_poll_callback = async_poll_callback - self.module_name = 'async_status' - self.module_args = [ "jid=%s" % jid ] - clock = self.background - while (clock >= 0): - time.sleep(self.poll_interval) - clock -= self.poll_interval - result = self._execute_normal_module(conn, host, tmp) - (host, ok, real_result) = result - self.async_poll_callback(self, clock, self.poll_interval, ok, host, jid, real_result) - if 'finished' in real_result or 'failed' in real_result: - clock=-1 - elif (clock < 0 and not 'finished' in real_result): - return [ host, False, "timer expired" ] - - self._delete_remote_files(conn, tmp) - conn.close() - return result elif self.module_name == 'copy': result = self._execute_copy(conn, host, tmp) diff --git a/lib/ansible/utils.py b/lib/ansible/utils.py index 295f054f1e0..d561e5f8225 100755 --- a/lib/ansible/utils.py +++ b/lib/ansible/utils.py @@ -175,12 +175,12 @@ def path_dwim(basedir, given): else: return os.path.join(basedir, given) -def async_poll_status(runner, clock, poll_interval, ok, host, jid, result): - if ok and 'finished' in result: - print " finished on %s" % (jid, host) - elif not ok or 'failed' in result: - print " FAILED on %s" % (jid, host) +def async_poll_status(jid, host, clock, result): + if 'finished' in result: + return " finished on %s" % (jid, host) + elif 'failed' in result: + return " FAILED on %s" % (jid, host) else: - print " polling on %s, %s remaining" % (jid, host, clock) + return " polling on %s, %s remaining" % (jid, host, clock) diff --git a/library/async_status b/library/async_status index 839614ca929..8959ec97b19 100755 --- a/library/async_status +++ b/library/async_status @@ -81,12 +81,13 @@ if mode == 'cleanup': data = file(log_path).read() try: data = json.loads(data) -except: +except Exception, e: if data == '': # file not written yet? That means it is running print json.dumps({ "results_file" : log_path, "ansible_job_id" : jid, + "traceback" : str(e), "started" : 1, }) else: @@ -96,7 +97,7 @@ except: "results_file" : log_path, "msg" : "Could not parse job output: %s" % data, }) - sys.exit(1) + sys.exit(0) if not data.has_key("started"): data['finished'] = 1