This adds async poll support to playbooks. See examples. Some more testing due + docs

but this is more or less a mirror of what /bin/ansible does.  It also has a 'fire and
forget' mode if the poll interval is left off or set to 0.
pull/25/merge
Michael DeHaan 13 years ago
parent 32484f2156
commit 86e19cd8c8

@ -53,6 +53,12 @@ class PlaybookCallbacks(object):
def on_play_start(self, pattern): def on_play_start(self, pattern):
print "PLAY [%s] ****************************\n" % pattern print "PLAY [%s] ****************************\n" % pattern
def on_async_confused(self, msg):
print msg
def on_async_poll(self, jid, host, clock, host_result):
print async_poll_status(jid, host, clock, host_result)
def main(args): def main(args):
''' run ansible-playbook operations ''' ''' run ansible-playbook operations '''

@ -5,6 +5,10 @@
http_port: 80 http_port: 80
max_clients: 200 max_clients: 200
tasks: tasks:
- name: simulate long running op (15 sec), wait for up to 45, poll every 5
action: command /bin/sleep 15
async: 45
poll: 5
- include: base.yml favcolor=blue - include: base.yml favcolor=blue
- name: write the foo config file using vars set above - name: write the foo config file using vars set above
action: template src=foo.j2 dest=/etc/some_random_foo.conf action: template src=foo.j2 dest=/etc/some_random_foo.conf

@ -20,6 +20,7 @@
import paramiko import paramiko
import exceptions import exceptions
import os
################################################ ################################################
@ -84,15 +85,18 @@ class ParamikoConnection(object):
def exec_command(self, cmd): def exec_command(self, cmd):
''' run a command on the remote host ''' ''' run a command on the remote host '''
stdin, stdout, stderr = self.ssh.exec_command(cmd) stdin, stdout, stderr = self.ssh.exec_command(cmd)
return (stdin, stdout, stderr) return (stdin, stdout, stderr)
def put_file(self, in_path, out_path): def put_file(self, in_path, out_path):
''' transfer a file from local to remote ''' ''' transfer a file from local to remote '''
if not os.path.exists(in_path):
raise AnsibleConnectionException("file or module does not exist: %s" % in_path)
sftp = self.ssh.open_sftp() sftp = self.ssh.open_sftp()
sftp.put(in_path, out_path) try:
sftp.put(in_path, out_path)
except IOError:
raise AnsibleConnectionException("failed to transfer file to %s" % out_path)
sftp.close() sftp.close()
def close(self): def close(self):

@ -24,6 +24,7 @@ import yaml
import shlex import shlex
import os import os
import jinja2 import jinja2
import time
# used to transfer variables to Runner # used to transfer variables to Runner
SETUP_CACHE={ } SETUP_CACHE={ }
@ -167,9 +168,82 @@ class PlayBook(object):
new_hosts.append(x) new_hosts.append(x)
return new_hosts return new_hosts
def _run_module(self, pattern, module, args, hosts, remote_user): def hosts_to_poll(self, results):
''' which hosts need more polling? '''
hosts = []
for (host, res) in results['contacted'].iteritems():
# FIXME: make polling pattern in /bin/ansible match
# move to common function in utils
if not 'finished' in res and 'started' in res:
hosts.append(host)
return hosts
def _async_poll(self, runner, async_seconds, async_poll_interval):
''' launch an async job, if poll_interval is set, wait for completion '''
# TODO: refactor this function
runner.background = async_seconds
results = runner.run()
if async_poll_interval <= 0:
# if not polling, playbook requested fire and forget
# trust the user wanted that and return immediately
return results
poll_hosts = results['contacted'].keys()
if len(poll_hosts) == 0:
# no hosts launched ok, return that.
return results
ahost = poll_hosts[0]
jid = results['contacted'][ahost].get('ansible_job_id', None)
if jid is None:
# FIXME this really shouldn't happen. consider marking hosts failed
# and looking for jid in other host.
self.callbacks.on_async_confused("unexpected error: unable to determine jid")
return results
clock = async_seconds
runner.hosts = self.hosts_to_poll(results)
poll_results = results
while (clock >= 0):
runner.hosts = poll_hosts
# FIXME: make a "get_async_runner" method like in /bin/ansible
# loop until polling duration complete
runner.module_args = [ "jid=%s" % jid ]
runner.module_name = 'async_status'
# FIXME: make it such that if you say 'async_status' you
# can't background that op!
runner.background = 0
runner.pattern = '*'
runner.hosts = self.hosts_to_poll(poll_results)
poll_results = runner.run()
if len(runner.hosts) == 0:
break
if poll_results is None:
break
for (host, host_result) in poll_results['contacted'].iteritems():
# override last result with current status result for report
results['contacted'][host] = host_result
# output if requested
self.callbacks.on_async_poll(jid, host, clock, host_result)
# run down the clock
clock = clock - async_poll_interval
time.sleep(async_poll_interval)
# do not have to poll the completed hosts, smaller list
runner.hosts = self.hosts_to_poll(poll_results)
# mark any hosts that are still listed as started as failed
# since these likely got killed by async_wrapper
for (host, host_result) in results['contacted'].iteritems():
if 'started' in host_result:
results['contacted'][host] = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' }
return results
def _run_module(self, pattern, module, args, hosts, remote_user,
async_seconds, async_poll_interval):
''' run a particular module step in a playbook ''' ''' run a particular module step in a playbook '''
return ansible.runner.Runner( runner = ansible.runner.Runner(
pattern=pattern, pattern=pattern,
module_name=module, module_name=module,
module_args=args, module_args=args,
@ -181,7 +255,12 @@ class PlayBook(object):
remote_user=remote_user, remote_user=remote_user,
setup_cache=SETUP_CACHE, setup_cache=SETUP_CACHE,
basedir=self.basedir basedir=self.basedir
).run() )
if async_seconds == 0:
return runner.run()
else:
return self._async_poll(runner, async_seconds, async_poll_interval)
def _run_task(self, pattern=None, task=None, host_list=None, def _run_task(self, pattern=None, task=None, host_list=None,
remote_user=None, handlers=None, conditional=False): remote_user=None, handlers=None, conditional=False):
@ -203,6 +282,9 @@ class PlayBook(object):
# load the module name and parameters from the task entry # load the module name and parameters from the task entry
name = task['name'] name = task['name']
action = task['action'] action = task['action']
async_seconds = int(task.get('async', 0)) # not async by default
async_poll_interval = int(task.get('poll', 30)) # default poll = 30 seconds
# comment = task.get('comment', '') # comment = task.get('comment', '')
tokens = shlex.split(action) tokens = shlex.split(action)
@ -219,7 +301,8 @@ class PlayBook(object):
# load up an appropriate ansible runner to # load up an appropriate ansible runner to
# run the task in parallel # run the task in parallel
results = self._run_module(pattern, module_name, results = self._run_module(pattern, module_name,
module_args, host_list, remote_user) module_args, host_list, remote_user,
async_seconds, async_poll_interval)
# if no hosts are matched, carry on, unlike /bin/ansible # if no hosts are matched, carry on, unlike /bin/ansible
# which would warn you about this # which would warn you about this

@ -18,6 +18,11 @@
################################################ ################################################
# FIXME: need to add global error handling around
# executor_hook mapping all exceptions into failures
# with the traceback converted into a string and
# if the exception is typed, a *nice* string
try: try:
import json import json
except ImportError: except ImportError:
@ -373,8 +378,8 @@ class Runner(object):
def remote_log(self, conn, msg): def remote_log(self, conn, msg):
''' this is the function we use to log things ''' ''' this is the function we use to log things '''
# FIXME: TODO: make this optional as it's executed a lot
stdin, stdout, stderr = conn.exec_command('/usr/bin/logger -t ansible -p auth.info "%s"' % msg) stdin, stdout, stderr = conn.exec_command('/usr/bin/logger -t ansible -p auth.info "%s"' % msg)
# TODO: maybe make that optional
def _exec_command(self, conn, cmd): def _exec_command(self, conn, cmd):
''' execute a command string over SSH, return the output ''' ''' execute a command string over SSH, return the output '''
@ -392,9 +397,18 @@ class Runner(object):
def _copy_module(self, conn, tmp, module): def _copy_module(self, conn, tmp, module):
''' transfer a module over SFTP, does not run it ''' ''' transfer a module over SFTP, does not run it '''
if module.startswith("/"):
# user probably did "/bin/foo" instead of "command /bin/foo" in a playbook
# or tried "-m /bin/foo" instead of "a /bin/foo"
# FIXME: type this exception
raise Exception("%s is not a module" % module)
in_path = os.path.expanduser( in_path = os.path.expanduser(
os.path.join(self.module_path, module) os.path.join(self.module_path, module)
) )
if not os.path.exists(in_path):
# FIXME: type this exception
raise Exception("module not found: %s" % in_path)
out_path = tmp + module out_path = tmp + module
conn.put_file(in_path, out_path) conn.put_file(in_path, out_path)
return out_path return out_path

Loading…
Cancel
Save