Add polling logic in runner such that all actions get kicked off everywhere then polling

happens only on needed hosts, allowing some hosts to fail and drop out of the running.
pull/25/head
Michael DeHaan 13 years ago
parent 49a636d8a0
commit db7ba87111

@ -28,10 +28,11 @@ import sys
import os import os
import getpass import getpass
import shlex import shlex
import time
from optparse import OptionParser
import ansible.runner import ansible.runner
import ansible.playbook import ansible.playbook
import ansible.constants as C import ansible.constants as C
from optparse import OptionParser
from ansible.utils import * from ansible.utils import *
######################################################## ########################################################
@ -91,7 +92,7 @@ class Cli(object):
if options.ask_pass: if options.ask_pass:
sshpass = getpass.getpass(prompt="SSH password: ") sshpass = getpass.getpass(prompt="SSH password: ")
return ansible.runner.Runner( runner = ansible.runner.Runner(
module_name=options.module_name, module_name=options.module_name,
module_path=options.module_path, module_path=options.module_path,
module_args=shlex.split(options.module_args), module_args=shlex.split(options.module_args),
@ -101,15 +102,40 @@ class Cli(object):
timeout=options.timeout, timeout=options.timeout,
forks=options.forks, forks=options.forks,
background=options.seconds, background=options.seconds,
poll_interval=options.poll_interval,
async_poll_callback=async_poll_status,
pattern=pattern, pattern=pattern,
verbose=True, verbose=True,
).run() )
return (runner, runner.run())
# ----------------------------------------------
def get_polling_runner(self, old_runner, hosts, jid):
return ansible.runner.Runner(
module_name='async_status',
module_path=old_runner.module_path,
module_args=[ "jid=%s" % jid ],
remote_user=old_runner.remote_user,
remote_pass=old_runner.remote_pass,
host_list=hosts,
timeout=old_runner.timeout,
forks=old_runner.forks,
pattern='*',
verbose=True,
)
# ----------------------------------------------
def hosts_to_poll(self, results):
hosts = []
for (host, res) in results['contacted'].iteritems():
if res.get('started',False):
hosts.append(host)
return hosts
# ---------------------------------------------- # ----------------------------------------------
def output(self, results, options, args): def output(self, runner, results, options, args):
''' summarize results from Runner ''' ''' summarize results from Runner '''
if results is None: if results is None:
@ -117,6 +143,34 @@ class Cli(object):
if options.tree: if options.tree:
prepare_writeable_dir(options.tree) prepare_writeable_dir(options.tree)
# BACKGROUND POLL LOGIC when -B and -P are specified
# FIXME: refactor
if options.seconds and options.poll_interval > 0:
poll_hosts = results['contacted'].keys()
if len(poll_hosts) == 0:
exit("no jobs were launched successfully")
ahost = poll_hosts[0]
jid = results['contacted'][ahost].get('ansible_job_id', None)
if jid is None:
exit("unexpected error: unable to determine jid")
clock = options.seconds
while (clock >= 0):
polling_runner = self.get_polling_runner(runner, poll_hosts, jid)
poll_results = polling_runner.run()
if poll_results is None:
break
for (host, host_result) in poll_results['contacted'].iteritems():
# override last result with current status result for report
results['contacted'][host] = host_result
print async_poll_status(jid, host, clock, host_result)
clock = clock - options.poll_interval
time.sleep(options.poll_interval)
poll_hosts = self.hosts_to_poll(poll_results)
for (host, host_result) in results['contacted'].iteritems():
if 'started' in host_result:
results['contacted'][host] = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' }
buf = '' buf = ''
for hostname in contacted_hosts(results): for hostname in contacted_hosts(results):
msg = host_report_msg( msg = host_report_msg(
@ -139,7 +193,7 @@ class Cli(object):
if __name__ == '__main__': if __name__ == '__main__':
cli = Cli() cli = Cli()
(options, args) = cli.parse() (options, args) = cli.parse()
results = cli.run(options, args) (runner, results) = cli.run(options, args)
cli.output(results, options, args) cli.output(runner, results, options, args)

@ -68,8 +68,6 @@ class Runner(object):
basedir=None, basedir=None,
setup_cache={}, setup_cache={},
transport='paramiko', transport='paramiko',
poll_interval=None,
async_poll_callback=None,
verbose=False): verbose=False):
''' '''
@ -99,11 +97,6 @@ class Runner(object):
self.remote_user = remote_user self.remote_user = remote_user
self.remote_pass = remote_pass self.remote_pass = remote_pass
self.background = background self.background = background
self.poll_interval = poll_interval
self.async_poll_callback = async_poll_callback
if self.async_poll_callback is None:
self.async_poll_callback = async_poll_status
if basedir is None: if basedir is None:
basedir = os.getcwd() basedir = os.getcwd()
@ -363,33 +356,6 @@ class Runner(object):
result = self._execute_normal_module(conn, host, tmp) result = self._execute_normal_module(conn, host, tmp)
else: else:
result = self._execute_async_module(conn, host, tmp) result = self._execute_async_module(conn, host, tmp)
if self.poll_interval > 0:
# poll for completion
# FIXME: refactor
(host, ok, launch_result) = result
jid = launch_result.get('ansible_job_id', None)
if jid is None:
return result
if self.async_poll_callback is None:
self.async_poll_callback = async_poll_callback
self.module_name = 'async_status'
self.module_args = [ "jid=%s" % jid ]
clock = self.background
while (clock >= 0):
time.sleep(self.poll_interval)
clock -= self.poll_interval
result = self._execute_normal_module(conn, host, tmp)
(host, ok, real_result) = result
self.async_poll_callback(self, clock, self.poll_interval, ok, host, jid, real_result)
if 'finished' in real_result or 'failed' in real_result:
clock=-1
elif (clock < 0 and not 'finished' in real_result):
return [ host, False, "timer expired" ]
self._delete_remote_files(conn, tmp)
conn.close()
return result
elif self.module_name == 'copy': elif self.module_name == 'copy':
result = self._execute_copy(conn, host, tmp) result = self._execute_copy(conn, host, tmp)

@ -175,12 +175,12 @@ def path_dwim(basedir, given):
else: else:
return os.path.join(basedir, given) return os.path.join(basedir, given)
def async_poll_status(runner, clock, poll_interval, ok, host, jid, result): def async_poll_status(jid, host, clock, result):
if ok and 'finished' in result: if 'finished' in result:
print "<job %s> finished on %s" % (jid, host) return "<job %s> finished on %s" % (jid, host)
elif not ok or 'failed' in result: elif 'failed' in result:
print "<job %s> FAILED on %s" % (jid, host) return "<job %s> FAILED on %s" % (jid, host)
else: else:
print "<job %s> polling on %s, %s remaining" % (jid, host, clock) return "<job %s> polling on %s, %s remaining" % (jid, host, clock)

@ -81,12 +81,13 @@ if mode == 'cleanup':
data = file(log_path).read() data = file(log_path).read()
try: try:
data = json.loads(data) data = json.loads(data)
except: except Exception, e:
if data == '': if data == '':
# file not written yet? That means it is running # file not written yet? That means it is running
print json.dumps({ print json.dumps({
"results_file" : log_path, "results_file" : log_path,
"ansible_job_id" : jid, "ansible_job_id" : jid,
"traceback" : str(e),
"started" : 1, "started" : 1,
}) })
else: else:
@ -96,7 +97,7 @@ except:
"results_file" : log_path, "results_file" : log_path,
"msg" : "Could not parse job output: %s" % data, "msg" : "Could not parse job output: %s" % data,
}) })
sys.exit(1) sys.exit(0)
if not data.has_key("started"): if not data.has_key("started"):
data['finished'] = 1 data['finished'] = 1

Loading…
Cancel
Save