From 86e19cd8c842c5996725032392e92f16aac545ff Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Mon, 12 Mar 2012 20:53:10 -0400 Subject: [PATCH] This adds async poll support to playbooks. See examples. Some more testing due + docs but this is more or less a mirror of what /bin/ansible does. It also has a 'fire and forget' mode if the poll interval is left off or set to 0. --- bin/ansible-playbook | 6 +++ examples/playbook.yml | 4 ++ lib/ansible/connection.py | 10 +++-- lib/ansible/playbook.py | 91 +++++++++++++++++++++++++++++++++++++-- lib/ansible/runner.py | 16 ++++++- 5 files changed, 119 insertions(+), 8 deletions(-) diff --git a/bin/ansible-playbook b/bin/ansible-playbook index 08d56117f6c..c91a17bac58 100755 --- a/bin/ansible-playbook +++ b/bin/ansible-playbook @@ -53,6 +53,12 @@ class PlaybookCallbacks(object): def on_play_start(self, pattern): print "PLAY [%s] ****************************\n" % pattern + def on_async_confused(self, msg): + print msg + + def on_async_poll(self, jid, host, clock, host_result): + print async_poll_status(jid, host, clock, host_result) + def main(args): ''' run ansible-playbook operations ''' diff --git a/examples/playbook.yml b/examples/playbook.yml index 2f40598e2d8..9c73273fff9 100644 --- a/examples/playbook.yml +++ b/examples/playbook.yml @@ -5,6 +5,10 @@ http_port: 80 max_clients: 200 tasks: + - name: simulate long running op (15 sec), wait for up to 45, poll every 5 + action: command /bin/sleep 15 + async: 45 + poll: 5 - include: base.yml favcolor=blue - name: write the foo config file using vars set above action: template src=foo.j2 dest=/etc/some_random_foo.conf diff --git a/lib/ansible/connection.py b/lib/ansible/connection.py index 8f57f61aeec..997a401255e 100755 --- a/lib/ansible/connection.py +++ b/lib/ansible/connection.py @@ -20,6 +20,7 @@ import paramiko import exceptions +import os ################################################ @@ -84,15 +85,18 @@ class ParamikoConnection(object): def exec_command(self, cmd): ''' run a command on the remote host ''' - stdin, stdout, stderr = self.ssh.exec_command(cmd) return (stdin, stdout, stderr) def put_file(self, in_path, out_path): ''' transfer a file from local to remote ''' - + if not os.path.exists(in_path): + raise AnsibleConnectionException("file or module does not exist: %s" % in_path) sftp = self.ssh.open_sftp() - sftp.put(in_path, out_path) + try: + sftp.put(in_path, out_path) + except IOError: + raise AnsibleConnectionException("failed to transfer file to %s" % out_path) sftp.close() def close(self): diff --git a/lib/ansible/playbook.py b/lib/ansible/playbook.py index 8e1888a670f..fd4a8c3c94f 100755 --- a/lib/ansible/playbook.py +++ b/lib/ansible/playbook.py @@ -24,6 +24,7 @@ import yaml import shlex import os import jinja2 +import time # used to transfer variables to Runner SETUP_CACHE={ } @@ -167,9 +168,82 @@ class PlayBook(object): new_hosts.append(x) return new_hosts - def _run_module(self, pattern, module, args, hosts, remote_user): + def hosts_to_poll(self, results): + ''' which hosts need more polling? ''' + hosts = [] + for (host, res) in results['contacted'].iteritems(): + # FIXME: make polling pattern in /bin/ansible match + # move to common function in utils + if not 'finished' in res and 'started' in res: + hosts.append(host) + return hosts + + + def _async_poll(self, runner, async_seconds, async_poll_interval): + ''' launch an async job, if poll_interval is set, wait for completion ''' + + # TODO: refactor this function + runner.background = async_seconds + results = runner.run() + + if async_poll_interval <= 0: + # if not polling, playbook requested fire and forget + # trust the user wanted that and return immediately + return results + + poll_hosts = results['contacted'].keys() + if len(poll_hosts) == 0: + # no hosts launched ok, return that. + return results + ahost = poll_hosts[0] + jid = results['contacted'][ahost].get('ansible_job_id', None) + if jid is None: + # FIXME this really shouldn't happen. consider marking hosts failed + # and looking for jid in other host. + self.callbacks.on_async_confused("unexpected error: unable to determine jid") + return results + + clock = async_seconds + runner.hosts = self.hosts_to_poll(results) + poll_results = results + while (clock >= 0): + runner.hosts = poll_hosts + # FIXME: make a "get_async_runner" method like in /bin/ansible + # loop until polling duration complete + runner.module_args = [ "jid=%s" % jid ] + runner.module_name = 'async_status' + # FIXME: make it such that if you say 'async_status' you + # can't background that op! + runner.background = 0 + runner.pattern = '*' + runner.hosts = self.hosts_to_poll(poll_results) + poll_results = runner.run() + if len(runner.hosts) == 0: + break + if poll_results is None: + break + for (host, host_result) in poll_results['contacted'].iteritems(): + # override last result with current status result for report + results['contacted'][host] = host_result + # output if requested + self.callbacks.on_async_poll(jid, host, clock, host_result) + # run down the clock + clock = clock - async_poll_interval + time.sleep(async_poll_interval) + # do not have to poll the completed hosts, smaller list + runner.hosts = self.hosts_to_poll(poll_results) + # mark any hosts that are still listed as started as failed + # since these likely got killed by async_wrapper + for (host, host_result) in results['contacted'].iteritems(): + if 'started' in host_result: + results['contacted'][host] = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' } + return results + + def _run_module(self, pattern, module, args, hosts, remote_user, + async_seconds, async_poll_interval): + ''' run a particular module step in a playbook ''' - return ansible.runner.Runner( + runner = ansible.runner.Runner( pattern=pattern, module_name=module, module_args=args, @@ -181,7 +255,12 @@ class PlayBook(object): remote_user=remote_user, setup_cache=SETUP_CACHE, basedir=self.basedir - ).run() + ) + + if async_seconds == 0: + return runner.run() + else: + return self._async_poll(runner, async_seconds, async_poll_interval) def _run_task(self, pattern=None, task=None, host_list=None, remote_user=None, handlers=None, conditional=False): @@ -203,6 +282,9 @@ class PlayBook(object): # load the module name and parameters from the task entry name = task['name'] action = task['action'] + async_seconds = int(task.get('async', 0)) # not async by default + async_poll_interval = int(task.get('poll', 30)) # default poll = 30 seconds + # comment = task.get('comment', '') tokens = shlex.split(action) @@ -219,7 +301,8 @@ class PlayBook(object): # load up an appropriate ansible runner to # run the task in parallel results = self._run_module(pattern, module_name, - module_args, host_list, remote_user) + module_args, host_list, remote_user, + async_seconds, async_poll_interval) # if no hosts are matched, carry on, unlike /bin/ansible # which would warn you about this diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index 83aee0455df..3b44b51bf8d 100755 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -18,6 +18,11 @@ ################################################ +# FIXME: need to add global error handling around +# executor_hook mapping all exceptions into failures +# with the traceback converted into a string and +# if the exception is typed, a *nice* string + try: import json except ImportError: @@ -373,8 +378,8 @@ class Runner(object): def remote_log(self, conn, msg): ''' this is the function we use to log things ''' + # FIXME: TODO: make this optional as it's executed a lot stdin, stdout, stderr = conn.exec_command('/usr/bin/logger -t ansible -p auth.info "%s"' % msg) - # TODO: maybe make that optional def _exec_command(self, conn, cmd): ''' execute a command string over SSH, return the output ''' @@ -392,9 +397,18 @@ class Runner(object): def _copy_module(self, conn, tmp, module): ''' transfer a module over SFTP, does not run it ''' + if module.startswith("/"): + # user probably did "/bin/foo" instead of "command /bin/foo" in a playbook + # or tried "-m /bin/foo" instead of "a /bin/foo" + # FIXME: type this exception + raise Exception("%s is not a module" % module) in_path = os.path.expanduser( os.path.join(self.module_path, module) ) + if not os.path.exists(in_path): + # FIXME: type this exception + raise Exception("module not found: %s" % in_path) + out_path = tmp + module conn.put_file(in_path, out_path) return out_path