From 5be1a612d3061003918098f96255021b21ac8497 Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Sun, 11 Mar 2012 18:40:35 -0400 Subject: [PATCH] Add async polling logic to runner. Will add to playbook shortly, have to diagnose why paramiko is not letting async_wrapper daemonize itself when it does work fine when directly executed. --- bin/ansible | 6 +++++- lib/ansible/constants.py | 3 +++ lib/ansible/runner.py | 39 ++++++++++++++++++++++++++++++++++----- lib/ansible/utils.py | 11 ++++++++++- library/async_wrapper | 4 ++-- 5 files changed, 54 insertions(+), 9 deletions(-) diff --git a/bin/ansible b/bin/ansible index 3098e5295ed..8189f1c93fa 100755 --- a/bin/ansible +++ b/bin/ansible @@ -52,7 +52,7 @@ class Cli(object): parser = OptionParser(usage = 'ansible [options]') parser.add_option("-a", "--args", dest="module_args", help="module arguments", default=C.DEFAULT_MODULE_ARGS) - parser.add_option("-B", "--background", dest="seconds", default=0, + parser.add_option("-B", "--background", dest="seconds", type='int', default=0, help="run asynchronously, failing after X seconds") parser.add_option('-f','--forks', dest='forks', default=C.DEFAULT_FORKS, type='int', help='number of parallel processes to use') @@ -66,6 +66,8 @@ class Cli(object): help="module name to execute", default=C.DEFAULT_MODULE_NAME) parser.add_option('-o', '--one-line', dest='one_line', action='store_true', help="condense output") + parser.add_option('-P', '--poll', default=C.DEFAULT_POLL_INTERVAL, type='int', + dest='poll_interval', help="set the poll interval if using -B") parser.add_option('-t', '--tree', dest='tree', default=None, help="log output to this directory") parser.add_option('-T', '--timeout', default=C.DEFAULT_TIMEOUT, type='int', @@ -99,6 +101,8 @@ class Cli(object): timeout=options.timeout, forks=options.forks, background=options.seconds, + poll_interval=options.poll_interval, + async_poll_callback=async_poll_status, pattern=pattern, verbose=True, ).run() diff --git a/lib/ansible/constants.py b/lib/ansible/constants.py index 3004d1a104b..fa0e1a511ef 100644 --- a/lib/ansible/constants.py +++ b/lib/ansible/constants.py @@ -29,5 +29,8 @@ DEFAULT_PATTERN = '*' DEFAULT_FORKS = 5 DEFAULT_MODULE_ARGS = '' DEFAULT_TIMEOUT = 10 +DEFAULT_POLL_INTERVAL = 15 DEFAULT_REMOTE_USER = 'root' DEFAULT_REMOTE_PASS = None + + diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index 0eed2be9a29..4eba3311a08 100755 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -67,6 +67,8 @@ class Runner(object): basedir=None, setup_cache={}, transport='paramiko', + poll_interval=None, + async_poll_callback=None, verbose=False): ''' @@ -95,7 +97,12 @@ class Runner(object): self.verbose = verbose self.remote_user = remote_user self.remote_pass = remote_pass - self.background = background + self.background = background + self.poll_interval = poll_interval + self.async_poll_callback = async_poll_callback + + if self.async_poll_callback is None: + self.async_poll_callback = async_poll_status if basedir is None: basedir = os.getcwd() @@ -231,9 +238,9 @@ class Runner(object): template = jinja2.Template(args) args = template.render(inject_vars) + cmd = "%s %s" % (remote_module_path, args) result = self._exec_command(conn, cmd) - self._delete_remote_files(conn, [ tmp ]) return result def _execute_normal_module(self, conn, host, tmp): @@ -256,7 +263,6 @@ class Runner(object): except: var_result = {} - self._delete_remote_files(conn, tmp) return self._return_from_module(conn, host, result) def _execute_async_module(self, conn, host, tmp): @@ -301,7 +307,6 @@ class Runner(object): # run the copy module args = [ "src=%s" % tmp_src, "dest=%s" % dest ] result = self._execute_module(conn, tmp, module, args) - self._delete_remote_files(conn, tmp_path) return self._return_from_module(conn, host, result) def _execute_template(self, conn, host, tmp): @@ -331,7 +336,6 @@ class Runner(object): # run the template module args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ] result = self._execute_module(conn, tmp, template_module, args) - self._delete_remote_files(conn, [ tpath ]) return self._return_from_module(conn, host, result) @@ -358,6 +362,30 @@ class Runner(object): result = self._execute_normal_module(conn, host, tmp) else: result = self._execute_async_module(conn, host, tmp) + if self.poll_interval > 0: + # poll for completion + # FIXME: refactor + + (host, ok, launch_result) = result + jid = launch_result.get('ansible_job_id', None) + if jid is None: + return result + if self.async_poll_callback is None: + self.async_poll_callback = async_poll_callback + self.module_name = 'async_status' + self.module_args = [ "jid=%s" % jid ] + clock = self.background + while (clock >= 0): + clock -= self.poll_interval + result = self._execute_normal_module(conn, host, tmp) + (host, ok, real_result) = result + self.async_poll_callback(self, clock, self.poll_interval, ok, host, jid, real_result) + if 'finished' in real_result or 'failed' in real_result: + clock=-1 + self._delete_remote_files(conn, tmp) + conn.close() + return result + elif self.module_name == 'copy': result = self._execute_copy(conn, host, tmp) elif self.module_name == 'template': @@ -366,6 +394,7 @@ class Runner(object): # this would be a coding error in THIS module # shouldn't occur raise Exception("???") + self._delete_remote_files(conn, tmp) conn.close() diff --git a/lib/ansible/utils.py b/lib/ansible/utils.py index 0b7b9b2d3b1..a48573c43e9 100755 --- a/lib/ansible/utils.py +++ b/lib/ansible/utils.py @@ -167,6 +167,7 @@ def prepare_writeable_dir(tree): exit("Cannot write to path %s" % tree) def path_dwim(basedir, given): + ''' make relative paths work like folks expect ''' if given.startswith("/"): return given elif given.startswith("~/"): @@ -174,4 +175,12 @@ def path_dwim(basedir, given): else: return os.path.join(basedir, given) - +def async_poll_status(runner, clock, poll_interval, ok, host, jid, result): + if ok and 'finished' in result: + print " finished on %s, %s" % (jid, host, result) + elif not ok or 'failed' in result: + print " FAILED on %s, %s" % (jid, host, result) + else: + print " polling on %s, %s remaining" % (jid, host, clock) + + diff --git a/library/async_wrapper b/library/async_wrapper index fefa4bb81f4..8371a6e5c77 100755 --- a/library/async_wrapper +++ b/library/async_wrapper @@ -67,7 +67,7 @@ def _run_command(wrapped_cmd, jid, log_path): try: cmd = shlex.split(wrapped_cmd) script = subprocess.Popen(cmd, shell=False, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = script.communicate() result = json.loads(out) @@ -94,7 +94,7 @@ def _run_command(wrapped_cmd, jid, log_path): pid = os.fork() if pid == 0: - # "RETURNING SUCCESS IN UNO" + "RETURNING SUCCESS IN UNO" print json.dumps({ "started" : 1, "ansible_job_id" : jid }) sys.exit(0) else: