Fixing multiple v2 bugs

pull/10622/head
James Cammarata 10 years ago
parent 4bc79a746a
commit e82ba723e2

@ -88,18 +88,11 @@ class PlayIterator:
FAILED_ALWAYS = 8
def __init__(self, inventory, play):
# FIXME: should we save the post_validated play from below here instead?
self._play = play
# post validate the play, as we need some fields to be finalized now
# so that we can use them to setup the iterator properly
all_vars = inventory._variable_manager.get_vars(loader=inventory._loader, play=play)
new_play = play.copy()
new_play.post_validate(all_vars, fail_on_undefined=False)
self._blocks = new_play.compile()
self._blocks = self._play.compile()
self._host_states = {}
for host in inventory.get_hosts(new_play.hosts):
for host in inventory.get_hosts(self._play.hosts):
self._host_states[host.name] = HostState(blocks=self._blocks)
def get_host_state(self, host):

@ -124,7 +124,7 @@ class PlaybookExecutor:
break
if result != 0:
raise AnsibleError("Play failed!: %d" % result)
break
i = i + 1 # per play
@ -138,7 +138,6 @@ class PlaybookExecutor:
if self._tqm is not None:
self._cleanup()
#TODO: move to callback
# FIXME: this stat summary stuff should be cleaned up and moved
# to a new method, if it even belongs here...
self._display.banner("PLAY RECAP")

@ -123,7 +123,8 @@ class TaskQueueManager:
# FIXME: there is a block compile helper for this...
handler_list = []
for handler_block in handlers:
handler_list.extend(handler_block.compile())
for handler in handler_block.block:
handler_list.append(handler)
# then initalize it with the handler names from the handler list
for handler in handler_list:
@ -138,23 +139,28 @@ class TaskQueueManager:
are done with the current task).
'''
connection_info = ConnectionInformation(play, self._options)
all_vars = self._variable_manager.get_vars(loader=self._loader, play=play)
new_play = play.copy()
new_play.post_validate(all_vars, fail_on_undefined=False)
connection_info = ConnectionInformation(new_play, self._options)
for callback_plugin in self._callback_plugins:
if hasattr(callback_plugin, 'set_connection_info'):
callback_plugin.set_connection_info(connection_info)
self.send_callback('v2_playbook_on_play_start', play)
self.send_callback('v2_playbook_on_play_start', new_play)
# initialize the shared dictionary containing the notified handlers
self._initialize_notified_handlers(play.handlers)
self._initialize_notified_handlers(new_play.handlers)
# load the specified strategy (or the default linear one)
strategy = strategy_loader.get(play.strategy, self)
strategy = strategy_loader.get(new_play.strategy, self)
if strategy is None:
raise AnsibleError("Invalid play strategy specified: %s" % play.strategy, obj=play._ds)
raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds)
# build the iterator
iterator = PlayIterator(inventory=self._inventory, play=play)
iterator = PlayIterator(inventory=self._inventory, play=new_play)
# and run the play using the strategy
return strategy.run(iterator, connection_info)

@ -90,7 +90,7 @@ class ActionModule(ActionBase):
src = self._loader.path_dwim_relative(self._task._role._role_path, 'files', src)
else:
# the source is local, so expand it here
src = os.path.expanduser(src)
src = self._loader.path_dwim(os.path.expanduser(src))
_re = None
if regexp is not None:

@ -390,7 +390,6 @@ class StrategyBase:
# of handlers based on the notified list
for handler_block in iterator._play.handlers:
debug("handlers are: %s" % handlers)
# FIXME: handlers need to support the rescue/always portions of blocks too,
# but this may take some work in the iterator and gets tricky when
# we consider the ability of meta tasks to flush handlers

@ -22,6 +22,7 @@ __metaclass__ = type
import time
from ansible.plugins.strategies import StrategyBase
from ansible.utils.debug import debug
class StrategyModule(StrategyBase):
@ -42,66 +43,106 @@ class StrategyModule(StrategyBase):
# the last host to be given a task
last_host = 0
result = True
work_to_do = True
while work_to_do and not self._tqm._terminated:
hosts_left = self.get_hosts_remaining()
hosts_left = self.get_hosts_remaining(iterator._play)
if len(hosts_left) == 0:
self._callback.playbook_on_no_hosts_remaining()
self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
result = False
break
# using .qsize() is a best estimate anyway, due to the
# multiprocessing/threading concerns (per the python docs)
if 1: #if self._job_queue.qsize() < len(hosts_left):
work_to_do = False # assume we have no more work to do
starting_host = last_host # save current position so we know when we've
# looped back around and need to break
# try and find an unblocked host with a task to run
while True:
host = hosts_left[last_host]
host_name = host.get_name()
# peek at the next task for the host, to see if there's
# anything to do do for this host
if host_name not in self._tqm._failed_hosts and host_name not in self._tqm._unreachable_hosts and iterator.get_next_task_for_host(host, peek=True):
# FIXME: check task tags, etc. here as we do in linear
# FIXME: handle meta tasks here, which will require a tweak
# to run_handlers so that only the handlers on this host
# are flushed and not all
# set the flag so the outer loop knows we've still found
# some work which needs to be done
work_to_do = True
# check to see if this host is blocked (still executing a previous task)
if not host_name in self._blocked_hosts:
# pop the task, mark the host blocked, and queue it
self._blocked_hosts[host_name] = True
task = iterator.get_next_task_for_host(host)
#self._callback.playbook_on_task_start(task.get_name(), False)
self._queue_task(iterator._play, host, task, connection_info)
# move on to the next host and make sure we
# haven't gone past the end of our hosts list
last_host += 1
if last_host > len(hosts_left) - 1:
last_host = 0
# if we've looped around back to the start, break out
if last_host == starting_host:
break
work_to_do = False # assume we have no more work to do
starting_host = last_host # save current position so we know when we've
# looped back around and need to break
# try and find an unblocked host with a task to run
host_results = []
while True:
host = hosts_left[last_host]
debug("next free host: %s" % host)
host_name = host.get_name()
# peek at the next task for the host, to see if there's
# anything to do do for this host
(state, task) = iterator.get_next_task_for_host(host, peek=True)
debug("free host state: %s" % state)
debug("free host task: %s" % task)
if host_name not in self._tqm._failed_hosts and host_name not in self._tqm._unreachable_hosts and task:
# set the flag so the outer loop knows we've still found
# some work which needs to be done
work_to_do = True
debug("this host has work to do")
# check to see if this host is blocked (still executing a previous task)
if not host_name in self._blocked_hosts:
# pop the task, mark the host blocked, and queue it
self._blocked_hosts[host_name] = True
(state, task) = iterator.get_next_task_for_host(host)
debug("getting variables")
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
debug("done getting variables")
# check to see if this task should be skipped, due to it being a member of a
# role which has already run (and whether that role allows duplicate execution)
if task._role and task._role.has_run():
# If there is no metadata, the default behavior is to not allow duplicates,
# if there is metadata, check to see if the allow_duplicates flag was set to true
if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates:
debug("'%s' skipped because role has already run" % task)
continue
if not task.evaluate_tags(connection_info.only_tags, connection_info.skip_tags, task_vars) and task.action != 'setup':
debug("'%s' failed tag evaluation" % task)
continue
if task.action == 'meta':
# meta tasks store their args in the _raw_params field of args,
# since they do not use k=v pairs, so get that
meta_action = task.args.get('_raw_params')
if meta_action == 'noop':
# FIXME: issue a callback for the noop here?
continue
elif meta_action == 'flush_handlers':
# FIXME: in the 'free' mode, flushing handlers should result in
# only those handlers notified for the host doing the flush
self.run_handlers(iterator, connection_info)
else:
raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds)
self._blocked_hosts[host_name] = False
else:
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
self._queue_task(host, task, task_vars, connection_info)
# move on to the next host and make sure we
# haven't gone past the end of our hosts list
last_host += 1
if last_host > len(hosts_left) - 1:
last_host = 0
# if we've looped around back to the start, break out
if last_host == starting_host:
break
results = self._process_pending_results(iterator)
host_results.extend(results)
# pause briefly so we don't spin lock
time.sleep(0.05)
try:
self._wait_for_pending_results()
except:
results = self._wait_on_pending_results(iterator)
host_results.extend(results)
except Exception, e:
# FIXME: ctrl+c can cause some failures here, so catch them
# with the appropriate error type
print("wtf: %s" % e)
pass
# run the base class run() method, which executes the cleanup function

@ -0,0 +1,10 @@
- hosts: all
strategy: free
gather_facts: no
tasks:
- debug: msg="all hosts should print this"
- pause: seconds=5
when: inventory_hostname == 'l2'
- pause: seconds=10
when: inventory_hostname == 'l3'
- debug: msg="and we're done"

@ -1,12 +1,7 @@
# will use linear strategy by default
- hosts:
- "{{hosts|default('all')}}"
#- ubuntu1404
#- awxlocal
connection: ssh
- hosts: "{{hosts|default('all')}}"
#gather_facts: false
#strategy: free
#serial: 3
strategy: "{{strategy|default('linear')}}"
vars:
play_var: foo
test_dict:
@ -15,14 +10,9 @@
vars_files:
- testing/vars.yml
tasks:
- block:
- debug: var=ansible_nodename
when: ansible_nodename == "ubuntu1404"
- block:
- debug: msg="in block for {{inventory_hostname}} ({{ansible_nodename}}), group_var is {{group_var}}, host var is {{host_var}}"
notify: foo
- debug: msg="test dictionary is {{test_dict}}"
when: asdf is defined
- command: hostname
register: hostname_result
- debug: msg="registered result is {{hostname_result.stdout}}"
@ -31,26 +21,18 @@
sudo_user: testing
- assemble: src=./testing/ dest=/tmp/output.txt remote_src=no
- copy: content="hello world\n" dest=/tmp/copy_content.out mode=600
- command: /bin/false
retries: "{{num_retries|default(5)}}"
delay: 1
- debug: msg="you shouldn't see me"
#- command: /bin/false
# retries: "{{num_retries|default(5)}}"
# delay: 1
#- debug: msg="you shouldn't see me"
rescue:
- debug: msg="this is the rescue"
- command: /bin/false
- debug: msg="you should not see this rescue message"
always:
- debug: msg="this is the always block, it should always be seen"
- command: /bin/false
- debug: msg="you should not see this always message"
#- debug: msg="linear task 01"
#- debug: msg="linear task 02"
#- debug: msg="linear task 03"
# with_items:
# - a
# - b
# - c
#- command: /bin/false
#- debug: msg="you should not see this always message"
handlers:
- name: foo
@ -58,13 +40,3 @@
- name: bar
debug: msg="this is the bar handler, you should not see this"
#- hosts: all
# connection: local
# strategy: free
# tasks:
# - ping:
# - command: /bin/false
# - debug: msg="free task 01"
# - debug: msg="free task 02"
# - debug: msg="free task 03"

Loading…
Cancel
Save