Fix casing/underscore convention in method name, split polling logic away from runner.

pull/603/head
Michael DeHaan 13 years ago
parent 6d580aea02
commit db1d5b154a

@ -101,7 +101,7 @@ class Cli(object):
if options.seconds: if options.seconds:
print "background launch...\n\n" print "background launch...\n\n"
results, poller = runner.runAsync(options.seconds) results, poller = runner.run_async(options.seconds)
results = self.poll_while_needed(poller, options) results = self.poll_while_needed(poller, options)
else: else:
results = runner.run() results = runner.run()

@ -295,7 +295,7 @@ class PlayBook(object):
if async_seconds == 0: if async_seconds == 0:
results = runner.run() results = runner.run()
else: else:
results, poller = runner.runAsync(async_seconds) results, poller = runner.run_async(async_seconds)
self.stats.compute(results) self.stats.compute(results)
if async_poll_interval > 0: if async_poll_interval > 0:
# if not polling, playbook requested fire and forget # if not polling, playbook requested fire and forget

@ -0,0 +1,99 @@
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import time
class AsyncPoller(object):
""" Manage asynchronous jobs. """
def __init__(self, results, runner):
self.runner = runner
self.results = { 'contacted': {}, 'dark': {}}
self.hosts_to_poll = []
self.completed = False
# Get job id and which hosts to poll again in the future
jid = None
for (host, res) in results['contacted'].iteritems():
if res.get('started', False):
self.hosts_to_poll.append(host)
jid = res.get('ansible_job_id', None)
else:
self.results['contacted'][host] = res
for (host, res) in results['dark'].iteritems():
self.results['dark'][host] = res
if jid is None:
raise errors.AnsibleError("unexpected error: unable to determine jid")
if len(self.hosts_to_poll)==0:
raise errors.AnsibleErrot("unexpected error: no hosts to poll")
self.jid = jid
def poll(self):
""" Poll the job status.
Returns the changes in this iteration."""
self.runner.module_name = 'async_status'
self.runner.module_args = "jid=%s" % self.jid
self.runner.pattern = "*"
self.runner.background = 0
self.runner.inventory.restrict_to(self.hosts_to_poll)
results = self.runner.run()
self.runner.inventory.lift_restriction()
hosts = []
poll_results = { 'contacted': {}, 'dark': {}, 'polled': {}}
for (host, res) in results['contacted'].iteritems():
if res.get('started',False):
hosts.append(host)
poll_results['polled'][host] = res
else:
self.results['contacted'][host] = res
poll_results['contacted'][host] = res
if 'failed' in res:
self.runner.callbacks.on_async_failed(host, res, self.jid)
else:
self.runner.callbacks.on_async_ok(host, res, self.jid)
for (host, res) in results['dark'].iteritems():
self.results['dark'][host] = res
poll_results['dark'][host] = res
self.runner.callbacks.on_async_failed(host, res, self.jid)
self.hosts_to_poll = hosts
if len(hosts)==0:
self.completed = True
return poll_results
def wait(self, seconds, poll_interval):
""" Wait a certain time for job completion, check status every poll_interval. """
clock = seconds - poll_interval
while (clock >= 0 and not self.completed):
time.sleep(poll_interval)
poll_results = self.poll()
for (host, res) in poll_results['polled'].iteritems():
if res.get('started'):
self.runner.callbacks.on_async_poll(host, res, self.jid, clock)
clock = clock - poll_interval
return self.results

@ -36,6 +36,7 @@ import ansible.connection
import ansible.inventory import ansible.inventory
from ansible import utils from ansible import utils
from ansible import errors from ansible import errors
from ansible import poller
from ansible import callbacks as ans_callbacks from ansible import callbacks as ans_callbacks
HAS_ATFORK=True HAS_ATFORK=True
@ -682,12 +683,9 @@ class Runner(object):
else: else:
out=stdout out=stdout
# sudo mode paramiko doesn't capture stderr, so not relaying here either... # sudo mode paramiko doesn't capture stderr, so not relaying here either...
return out return out
# ***************************************************** # *****************************************************
def _get_tmp_path(self, conn): def _get_tmp_path(self, conn):
@ -798,12 +796,12 @@ class Runner(object):
results = [ self._executor(h[1]) for h in hosts ] results = [ self._executor(h[1]) for h in hosts ]
return self._partition_results(results) return self._partition_results(results)
def runAsync(self, time_limit): def run_async(self, time_limit):
''' Run this module asynchronously and return a poller. ''' ''' Run this module asynchronously and return a poller. '''
self.background = time_limit self.background = time_limit
results = self.run() results = self.run()
return results, AsyncPoller(results, self) return results, poller.AsyncPoller(results, self)
class AsyncPoller(object): class AsyncPoller(object):
""" Manage asynchronous jobs. """ """ Manage asynchronous jobs. """

Loading…
Cancel
Save