Laying the groundwork for async mode, async status script still needs to be done, and async_wrapper

still needs to daemonize.  Then, once done, playbook can be taught how to poll async within the
timing window.
pull/3/head
Michael DeHaan 13 years ago
parent 1ed212513c
commit eaa7714ff8

@ -52,6 +52,8 @@ class Cli(object):
parser = OptionParser(usage = 'ansible <host-pattern> [options]') parser = OptionParser(usage = 'ansible <host-pattern> [options]')
parser.add_option("-a", "--args", dest="module_args", parser.add_option("-a", "--args", dest="module_args",
help="module arguments", default=C.DEFAULT_MODULE_ARGS) help="module arguments", default=C.DEFAULT_MODULE_ARGS)
parser.add_option("-B", "--background", dest="seconds", default=0,
help="run asynchronously, failing after X seconds")
parser.add_option('-f','--forks', dest='forks', default=C.DEFAULT_FORKS, type='int', parser.add_option('-f','--forks', dest='forks', default=C.DEFAULT_FORKS, type='int',
help='number of parallel processes to use') help='number of parallel processes to use')
parser.add_option("-i", "--inventory-file", dest="inventory", parser.add_option("-i", "--inventory-file", dest="inventory",
@ -96,6 +98,7 @@ class Cli(object):
host_list=options.inventory, host_list=options.inventory,
timeout=options.timeout, timeout=options.timeout,
forks=options.forks, forks=options.forks,
background=options.seconds,
pattern=pattern, pattern=pattern,
verbose=True, verbose=True,
).run() ).run()

@ -30,6 +30,7 @@ import os
import ansible.constants as C import ansible.constants as C
import Queue import Queue
import paramiko import paramiko
import random
################################################ ################################################
@ -56,6 +57,7 @@ class Runner(object):
pattern=C.DEFAULT_PATTERN, pattern=C.DEFAULT_PATTERN,
remote_user=C.DEFAULT_REMOTE_USER, remote_user=C.DEFAULT_REMOTE_USER,
remote_pass=C.DEFAULT_REMOTE_PASS, remote_pass=C.DEFAULT_REMOTE_PASS,
background=0,
verbose=False): verbose=False):
''' '''
@ -68,6 +70,7 @@ class Runner(object):
forks -------- how parallel should we be? 1 is extra debuggable. forks -------- how parallel should we be? 1 is extra debuggable.
remote_user -- who to login as (default root) remote_user -- who to login as (default root)
remote_pass -- provide only if you don't want to use keys or ssh-agent remote_pass -- provide only if you don't want to use keys or ssh-agent
background --- if non 0, run async, failing after X seconds, -1 == infinite
''' '''
# save input values # save input values
@ -82,9 +85,14 @@ class Runner(object):
self.verbose = verbose self.verbose = verbose
self.remote_user = remote_user self.remote_user = remote_user
self.remote_pass = remote_pass self.remote_pass = remote_pass
self.background = background
# hosts in each group name in the inventory file # hosts in each group name in the inventory file
self._tmp_paths = {} self._tmp_paths = {}
random.seed()
self.generated_jid = str(random.randint(0, 999999999999))
@classmethod @classmethod
def parse_hosts(cls, host_list): def parse_hosts(cls, host_list):
''' '''
@ -162,7 +170,11 @@ class Runner(object):
def _delete_remote_files(self, conn, files): def _delete_remote_files(self, conn, files):
''' deletes one or more remote files ''' ''' deletes one or more remote files '''
if type(files) == str:
files = [ files ]
for filename in files: for filename in files:
if not filename.startswith('/tmp/'):
raise Exception("not going to happen")
self._exec_command(conn, "rm -rf %s" % filename) self._exec_command(conn, "rm -rf %s" % filename)
def _transfer_file(self, conn, source, dest): def _transfer_file(self, conn, source, dest):
@ -172,20 +184,21 @@ class Runner(object):
sftp.put(source, dest) sftp.put(source, dest)
sftp.close() sftp.close()
def _transfer_module(self, conn, tmp): def _transfer_module(self, conn, tmp, module):
''' '''
transfers a module file to the remote side to execute it, transfers a module file to the remote side to execute it,
but does not execute it yet but does not execute it yet
''' '''
outpath = self._copy_module(conn, tmp) outpath = self._copy_module(conn, tmp, module)
self._exec_command(conn, "chmod +x %s" % outpath) self._exec_command(conn, "chmod +x %s" % outpath)
return outpath return outpath
def _execute_module(self, conn, outpath, tmp): def _execute_module(self, conn, tmp, remote_module_path, module_args):
''' '''
runs a module that has already been transferred runs a module that has already been transferred
''' '''
cmd = self._command(outpath) args = " ".join(module_args)
cmd = "%s %s" % (remote_module_path, args)
result = self._exec_command(conn, cmd) result = self._exec_command(conn, cmd)
self._delete_remote_files(conn, [ tmp ]) self._delete_remote_files(conn, [ tmp ])
return result return result
@ -195,11 +208,23 @@ class Runner(object):
transfer & execute a module that is not 'copy' or 'template' transfer & execute a module that is not 'copy' or 'template'
because those require extra work. because those require extra work.
''' '''
module = self._transfer_module(conn, tmp) module = self._transfer_module(conn, tmp, self.module_name)
result = self._execute_module(conn, module, tmp) result = self._execute_module(conn, tmp, module, self.module_args)
self._delete_remote_files(conn, tmp) self._delete_remote_files(conn, tmp)
result = self._return_from_module(conn, host, result) return self._return_from_module(conn, host, result)
return result
def _execute_async_module(self, conn, host, tmp):
'''
transfer the given module name, plus the async module
and then run the async module wrapping the other module
'''
async = self._transfer_module(conn, tmp, 'async_wrapper')
module = self._transfer_module(conn, tmp, self.module_name)
new_args = []
new_args = [ self.generated_jid, module ]
new_args.extend(self.module_args)
result = self._execute_module(conn, tmp, async, new_args)
return self._return_from_module(conn, host, result)
def _parse_kv(self, args): def _parse_kv(self, args):
''' helper function to convert a string of key/value items to a dict ''' ''' helper function to convert a string of key/value items to a dict '''
@ -225,11 +250,11 @@ class Runner(object):
# install the copy module # install the copy module
self.module_name = 'copy' self.module_name = 'copy'
module = self._transfer_module(conn) module = self._transfer_module(conn, tmp, 'copy')
# run the copy module # run the copy module
self.module_args = [ "src=%s" % tmp_src, "dest=%s" % dest ] args = [ "src=%s" % tmp_src, "dest=%s" % dest ]
result = self._execute_module(conn, module, tmp) result = self._execute_module(conn, tmp, module, args)
self._delete_remote_files(conn, tmp_path) self._delete_remote_files(conn, tmp_path)
return self._return_from_module(conn, host, result) return self._return_from_module(conn, host, result)
@ -253,8 +278,8 @@ class Runner(object):
module = self._transfer_module(conn, tmp) module = self._transfer_module(conn, tmp)
# run the template module # run the template module
self.module_args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ] args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ]
result = self._execute_module(conn, module, tmp) result = self._execute_module(conn, tmp, module, args)
self._delete_remote_files(conn, [ tpath ]) self._delete_remote_files(conn, [ tpath ])
return self._return_from_module(conn, host, result) return self._return_from_module(conn, host, result)
@ -278,7 +303,10 @@ class Runner(object):
tmp = self._get_tmp_path(conn) tmp = self._get_tmp_path(conn)
result = None result = None
if self.module_name not in [ 'copy', 'template' ]: if self.module_name not in [ 'copy', 'template' ]:
result = self._execute_normal_module(conn, host, tmp) if self.background == 0:
result = self._execute_normal_module(conn, host, tmp)
else:
result = self._execute_async_module(conn, host, tmp)
elif self.module_name == 'copy': elif self.module_name == 'copy':
result = self._execute_copy(conn, host, tmp) result = self._execute_copy(conn, host, tmp)
elif self.module_name == 'template': elif self.module_name == 'template':
@ -291,12 +319,6 @@ class Runner(object):
conn.close() conn.close()
return result return result
def _command(self, outpath):
''' form up a command string for running over SSH '''
cmd = "%s %s" % (outpath, " ".join(self.module_args))
return cmd
def remote_log(self, conn, msg): def remote_log(self, conn, msg):
''' this is the function we use to log things ''' ''' this is the function we use to log things '''
stdin, stdout, stderr = conn.exec_command('/usr/bin/logger -t ansible -p auth.info %r' % msg) stdin, stdout, stderr = conn.exec_command('/usr/bin/logger -t ansible -p auth.info %r' % msg)
@ -315,12 +337,12 @@ class Runner(object):
result = self._exec_command(conn, "mktemp -d /tmp/ansible.XXXXXX") result = self._exec_command(conn, "mktemp -d /tmp/ansible.XXXXXX")
return result.split("\n")[0] + '/' return result.split("\n")[0] + '/'
def _copy_module(self, conn, tmp): def _copy_module(self, conn, tmp, module):
''' transfer a module over SFTP, does not run it ''' ''' transfer a module over SFTP, does not run it '''
in_path = os.path.expanduser( in_path = os.path.expanduser(
os.path.join(self.module_path, self.module_name) os.path.join(self.module_path, module)
) )
out_path = tmp + self.module_name out_path = tmp + module
sftp = conn.open_sftp() sftp = conn.open_sftp()
sftp.put(in_path, out_path) sftp.put(in_path, out_path)
sftp.close() sftp.close()
@ -359,7 +381,6 @@ class Runner(object):
for worker in workers: for worker in workers:
worker.join() worker.join()
except KeyboardInterrupt: except KeyboardInterrupt:
print 'parent received ctrl-c'
for worker in workers: for worker in workers:
worker.terminate() worker.terminate()
worker.join() worker.join()

@ -0,0 +1,96 @@
#!/usr/bin/python
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>, and others
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
try:
import json
except ImportError:
import simplejson as json
import shlex
import os
import subprocess
import sys
import datetime
import traceback
if len(sys.argv) < 3:
print json.dumps({
"failed" : True,
"msg" : "usage: async_wrapper <jid> <module_script> <args>. Humans, do not call directly!"
})
sys.exit(1)
jid = sys.argv[1]
wrapped_module = sys.argv[2]
args = sys.argv[3:]
cmd = "%s %s" % (wrapped_module, " ".join(args))
# setup logging directory
logdir = os.path.expanduser("~/.ansible_async")
log_path = os.path.join(logdir, jid)
if not os.path.exists(logdir):
try:
os.makedirs(logdir)
except:
print json.dumps({
"failed" : 1,
"msg" : "could not create: %s" % logdir
})
def _run_command(wrapped_cmd, jid, log_path):
logfile = open(log_path, "w+")
logfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid }))
result = {}
try:
cmd = shlex.split(wrapped_cmd)
subprocess.call("/usr/bin/logger %s" % wrapped_cmd, shell=True)
script = subprocess.Popen(cmd, shell=False,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = script.communicate()
result = json.loads(out)
except (OSError, IOError), e:
result = {
"failed": 1,
"msg": str(e),
}
except:
result = {
"failed" : 1,
"msg" : traceback.format_exc()
}
result['ansible_job_id'] = jid
logfile = open(log_path, "w+")
logfile.write(json.dumps(result))
logfile.close()
# TEMPORARY:
print json.dumps(result)
# TODO: daemonize this with time limits
# TODO: might be nice to keep timing data, eventually...
_run_command(cmd, jid, log_path)

@ -22,4 +22,4 @@ try:
except ImportError: except ImportError:
import simplejson as json import simplejson as json
print json.dumps(1) print json.dumps({ "ping" : "pong" })

Loading…
Cancel
Save