Add async polling logic to runner. Will add to playbook shortly, have to diagnose why paramiko

is not letting async_wrapper daemonize itself when it does work fine when directly executed.
pull/25/head
Michael DeHaan 13 years ago
parent 60a13cf540
commit 5be1a612d3

@ -52,7 +52,7 @@ class Cli(object):
parser = OptionParser(usage = 'ansible <host-pattern> [options]') parser = OptionParser(usage = 'ansible <host-pattern> [options]')
parser.add_option("-a", "--args", dest="module_args", parser.add_option("-a", "--args", dest="module_args",
help="module arguments", default=C.DEFAULT_MODULE_ARGS) help="module arguments", default=C.DEFAULT_MODULE_ARGS)
parser.add_option("-B", "--background", dest="seconds", default=0, parser.add_option("-B", "--background", dest="seconds", type='int', default=0,
help="run asynchronously, failing after X seconds") help="run asynchronously, failing after X seconds")
parser.add_option('-f','--forks', dest='forks', default=C.DEFAULT_FORKS, type='int', parser.add_option('-f','--forks', dest='forks', default=C.DEFAULT_FORKS, type='int',
help='number of parallel processes to use') help='number of parallel processes to use')
@ -66,6 +66,8 @@ class Cli(object):
help="module name to execute", default=C.DEFAULT_MODULE_NAME) help="module name to execute", default=C.DEFAULT_MODULE_NAME)
parser.add_option('-o', '--one-line', dest='one_line', action='store_true', parser.add_option('-o', '--one-line', dest='one_line', action='store_true',
help="condense output") help="condense output")
parser.add_option('-P', '--poll', default=C.DEFAULT_POLL_INTERVAL, type='int',
dest='poll_interval', help="set the poll interval if using -B")
parser.add_option('-t', '--tree', dest='tree', default=None, parser.add_option('-t', '--tree', dest='tree', default=None,
help="log output to this directory") help="log output to this directory")
parser.add_option('-T', '--timeout', default=C.DEFAULT_TIMEOUT, type='int', parser.add_option('-T', '--timeout', default=C.DEFAULT_TIMEOUT, type='int',
@ -99,6 +101,8 @@ class Cli(object):
timeout=options.timeout, timeout=options.timeout,
forks=options.forks, forks=options.forks,
background=options.seconds, background=options.seconds,
poll_interval=options.poll_interval,
async_poll_callback=async_poll_status,
pattern=pattern, pattern=pattern,
verbose=True, verbose=True,
).run() ).run()

@ -29,5 +29,8 @@ DEFAULT_PATTERN = '*'
DEFAULT_FORKS = 5 DEFAULT_FORKS = 5
DEFAULT_MODULE_ARGS = '' DEFAULT_MODULE_ARGS = ''
DEFAULT_TIMEOUT = 10 DEFAULT_TIMEOUT = 10
DEFAULT_POLL_INTERVAL = 15
DEFAULT_REMOTE_USER = 'root' DEFAULT_REMOTE_USER = 'root'
DEFAULT_REMOTE_PASS = None DEFAULT_REMOTE_PASS = None

@ -67,6 +67,8 @@ class Runner(object):
basedir=None, basedir=None,
setup_cache={}, setup_cache={},
transport='paramiko', transport='paramiko',
poll_interval=None,
async_poll_callback=None,
verbose=False): verbose=False):
''' '''
@ -95,7 +97,12 @@ class Runner(object):
self.verbose = verbose self.verbose = verbose
self.remote_user = remote_user self.remote_user = remote_user
self.remote_pass = remote_pass self.remote_pass = remote_pass
self.background = background self.background = background
self.poll_interval = poll_interval
self.async_poll_callback = async_poll_callback
if self.async_poll_callback is None:
self.async_poll_callback = async_poll_status
if basedir is None: if basedir is None:
basedir = os.getcwd() basedir = os.getcwd()
@ -231,9 +238,9 @@ class Runner(object):
template = jinja2.Template(args) template = jinja2.Template(args)
args = template.render(inject_vars) args = template.render(inject_vars)
cmd = "%s %s" % (remote_module_path, args) cmd = "%s %s" % (remote_module_path, args)
result = self._exec_command(conn, cmd) result = self._exec_command(conn, cmd)
self._delete_remote_files(conn, [ tmp ])
return result return result
def _execute_normal_module(self, conn, host, tmp): def _execute_normal_module(self, conn, host, tmp):
@ -256,7 +263,6 @@ class Runner(object):
except: except:
var_result = {} var_result = {}
self._delete_remote_files(conn, tmp)
return self._return_from_module(conn, host, result) return self._return_from_module(conn, host, result)
def _execute_async_module(self, conn, host, tmp): def _execute_async_module(self, conn, host, tmp):
@ -301,7 +307,6 @@ class Runner(object):
# run the copy module # run the copy module
args = [ "src=%s" % tmp_src, "dest=%s" % dest ] args = [ "src=%s" % tmp_src, "dest=%s" % dest ]
result = self._execute_module(conn, tmp, module, args) result = self._execute_module(conn, tmp, module, args)
self._delete_remote_files(conn, tmp_path)
return self._return_from_module(conn, host, result) return self._return_from_module(conn, host, result)
def _execute_template(self, conn, host, tmp): def _execute_template(self, conn, host, tmp):
@ -331,7 +336,6 @@ class Runner(object):
# run the template module # run the template module
args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ] args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ]
result = self._execute_module(conn, tmp, template_module, args) result = self._execute_module(conn, tmp, template_module, args)
self._delete_remote_files(conn, [ tpath ])
return self._return_from_module(conn, host, result) return self._return_from_module(conn, host, result)
@ -358,6 +362,30 @@ class Runner(object):
result = self._execute_normal_module(conn, host, tmp) result = self._execute_normal_module(conn, host, tmp)
else: else:
result = self._execute_async_module(conn, host, tmp) result = self._execute_async_module(conn, host, tmp)
if self.poll_interval > 0:
# poll for completion
# FIXME: refactor
(host, ok, launch_result) = result
jid = launch_result.get('ansible_job_id', None)
if jid is None:
return result
if self.async_poll_callback is None:
self.async_poll_callback = async_poll_callback
self.module_name = 'async_status'
self.module_args = [ "jid=%s" % jid ]
clock = self.background
while (clock >= 0):
clock -= self.poll_interval
result = self._execute_normal_module(conn, host, tmp)
(host, ok, real_result) = result
self.async_poll_callback(self, clock, self.poll_interval, ok, host, jid, real_result)
if 'finished' in real_result or 'failed' in real_result:
clock=-1
self._delete_remote_files(conn, tmp)
conn.close()
return result
elif self.module_name == 'copy': elif self.module_name == 'copy':
result = self._execute_copy(conn, host, tmp) result = self._execute_copy(conn, host, tmp)
elif self.module_name == 'template': elif self.module_name == 'template':
@ -366,6 +394,7 @@ class Runner(object):
# this would be a coding error in THIS module # this would be a coding error in THIS module
# shouldn't occur # shouldn't occur
raise Exception("???") raise Exception("???")
self._delete_remote_files(conn, tmp) self._delete_remote_files(conn, tmp)
conn.close() conn.close()

@ -167,6 +167,7 @@ def prepare_writeable_dir(tree):
exit("Cannot write to path %s" % tree) exit("Cannot write to path %s" % tree)
def path_dwim(basedir, given): def path_dwim(basedir, given):
''' make relative paths work like folks expect '''
if given.startswith("/"): if given.startswith("/"):
return given return given
elif given.startswith("~/"): elif given.startswith("~/"):
@ -174,4 +175,12 @@ def path_dwim(basedir, given):
else: else:
return os.path.join(basedir, given) return os.path.join(basedir, given)
def async_poll_status(runner, clock, poll_interval, ok, host, jid, result):
if ok and 'finished' in result:
print "<job %s> finished on %s, %s" % (jid, host, result)
elif not ok or 'failed' in result:
print "<job %s> FAILED on %s, %s" % (jid, host, result)
else:
print "<job %s> polling on %s, %s remaining" % (jid, host, clock)

@ -67,7 +67,7 @@ def _run_command(wrapped_cmd, jid, log_path):
try: try:
cmd = shlex.split(wrapped_cmd) cmd = shlex.split(wrapped_cmd)
script = subprocess.Popen(cmd, shell=False, script = subprocess.Popen(cmd, shell=False,
stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = script.communicate() out, err = script.communicate()
result = json.loads(out) result = json.loads(out)
@ -94,7 +94,7 @@ def _run_command(wrapped_cmd, jid, log_path):
pid = os.fork() pid = os.fork()
if pid == 0: if pid == 0:
# "RETURNING SUCCESS IN UNO" "RETURNING SUCCESS IN UNO"
print json.dumps({ "started" : 1, "ansible_job_id" : jid }) print json.dumps({ "started" : 1, "ansible_job_id" : jid })
sys.exit(0) sys.exit(0)
else: else:

Loading…
Cancel
Save