diff --git a/v2/ansible/executor/play_iterator.py b/v2/ansible/executor/play_iterator.py index d6fe3750955..38bebb21132 100644 --- a/v2/ansible/executor/play_iterator.py +++ b/v2/ansible/executor/play_iterator.py @@ -88,18 +88,11 @@ class PlayIterator: FAILED_ALWAYS = 8 def __init__(self, inventory, play): - # FIXME: should we save the post_validated play from below here instead? self._play = play - # post validate the play, as we need some fields to be finalized now - # so that we can use them to setup the iterator properly - all_vars = inventory._variable_manager.get_vars(loader=inventory._loader, play=play) - new_play = play.copy() - new_play.post_validate(all_vars, fail_on_undefined=False) - - self._blocks = new_play.compile() + self._blocks = self._play.compile() self._host_states = {} - for host in inventory.get_hosts(new_play.hosts): + for host in inventory.get_hosts(self._play.hosts): self._host_states[host.name] = HostState(blocks=self._blocks) def get_host_state(self, host): diff --git a/v2/ansible/executor/playbook_executor.py b/v2/ansible/executor/playbook_executor.py index 9f02cddddb6..6504fddfc82 100644 --- a/v2/ansible/executor/playbook_executor.py +++ b/v2/ansible/executor/playbook_executor.py @@ -124,7 +124,7 @@ class PlaybookExecutor: break if result != 0: - raise AnsibleError("Play failed!: %d" % result) + break i = i + 1 # per play @@ -138,7 +138,6 @@ class PlaybookExecutor: if self._tqm is not None: self._cleanup() - #TODO: move to callback # FIXME: this stat summary stuff should be cleaned up and moved # to a new method, if it even belongs here... self._display.banner("PLAY RECAP") diff --git a/v2/ansible/executor/task_queue_manager.py b/v2/ansible/executor/task_queue_manager.py index 28904676eb2..d0354786da9 100644 --- a/v2/ansible/executor/task_queue_manager.py +++ b/v2/ansible/executor/task_queue_manager.py @@ -123,7 +123,8 @@ class TaskQueueManager: # FIXME: there is a block compile helper for this... handler_list = [] for handler_block in handlers: - handler_list.extend(handler_block.compile()) + for handler in handler_block.block: + handler_list.append(handler) # then initalize it with the handler names from the handler list for handler in handler_list: @@ -138,23 +139,28 @@ class TaskQueueManager: are done with the current task). ''' - connection_info = ConnectionInformation(play, self._options) + all_vars = self._variable_manager.get_vars(loader=self._loader, play=play) + + new_play = play.copy() + new_play.post_validate(all_vars, fail_on_undefined=False) + + connection_info = ConnectionInformation(new_play, self._options) for callback_plugin in self._callback_plugins: if hasattr(callback_plugin, 'set_connection_info'): callback_plugin.set_connection_info(connection_info) - self.send_callback('v2_playbook_on_play_start', play) + self.send_callback('v2_playbook_on_play_start', new_play) # initialize the shared dictionary containing the notified handlers - self._initialize_notified_handlers(play.handlers) + self._initialize_notified_handlers(new_play.handlers) # load the specified strategy (or the default linear one) - strategy = strategy_loader.get(play.strategy, self) + strategy = strategy_loader.get(new_play.strategy, self) if strategy is None: - raise AnsibleError("Invalid play strategy specified: %s" % play.strategy, obj=play._ds) + raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds) # build the iterator - iterator = PlayIterator(inventory=self._inventory, play=play) + iterator = PlayIterator(inventory=self._inventory, play=new_play) # and run the play using the strategy return strategy.run(iterator, connection_info) diff --git a/v2/ansible/plugins/action/assemble.py b/v2/ansible/plugins/action/assemble.py index b1bdc06c6d3..638d4b92bb5 100644 --- a/v2/ansible/plugins/action/assemble.py +++ b/v2/ansible/plugins/action/assemble.py @@ -90,7 +90,7 @@ class ActionModule(ActionBase): src = self._loader.path_dwim_relative(self._task._role._role_path, 'files', src) else: # the source is local, so expand it here - src = os.path.expanduser(src) + src = self._loader.path_dwim(os.path.expanduser(src)) _re = None if regexp is not None: diff --git a/v2/ansible/plugins/strategies/__init__.py b/v2/ansible/plugins/strategies/__init__.py index 59c0b9b84ee..afbc373f4f3 100644 --- a/v2/ansible/plugins/strategies/__init__.py +++ b/v2/ansible/plugins/strategies/__init__.py @@ -390,7 +390,6 @@ class StrategyBase: # of handlers based on the notified list for handler_block in iterator._play.handlers: - debug("handlers are: %s" % handlers) # FIXME: handlers need to support the rescue/always portions of blocks too, # but this may take some work in the iterator and gets tricky when # we consider the ability of meta tasks to flush handlers diff --git a/v2/ansible/plugins/strategies/free.py b/v2/ansible/plugins/strategies/free.py index 6aab495fec3..4fd8a132018 100644 --- a/v2/ansible/plugins/strategies/free.py +++ b/v2/ansible/plugins/strategies/free.py @@ -22,6 +22,7 @@ __metaclass__ = type import time from ansible.plugins.strategies import StrategyBase +from ansible.utils.debug import debug class StrategyModule(StrategyBase): @@ -42,66 +43,106 @@ class StrategyModule(StrategyBase): # the last host to be given a task last_host = 0 + result = True + work_to_do = True while work_to_do and not self._tqm._terminated: - hosts_left = self.get_hosts_remaining() + hosts_left = self.get_hosts_remaining(iterator._play) if len(hosts_left) == 0: - self._callback.playbook_on_no_hosts_remaining() + self._tqm.send_callback('v2_playbook_on_no_hosts_remaining') + result = False break - # using .qsize() is a best estimate anyway, due to the - # multiprocessing/threading concerns (per the python docs) - if 1: #if self._job_queue.qsize() < len(hosts_left): - - work_to_do = False # assume we have no more work to do - starting_host = last_host # save current position so we know when we've - # looped back around and need to break - - # try and find an unblocked host with a task to run - while True: - host = hosts_left[last_host] - host_name = host.get_name() - - # peek at the next task for the host, to see if there's - # anything to do do for this host - if host_name not in self._tqm._failed_hosts and host_name not in self._tqm._unreachable_hosts and iterator.get_next_task_for_host(host, peek=True): - - # FIXME: check task tags, etc. here as we do in linear - # FIXME: handle meta tasks here, which will require a tweak - # to run_handlers so that only the handlers on this host - # are flushed and not all - - # set the flag so the outer loop knows we've still found - # some work which needs to be done - work_to_do = True - - # check to see if this host is blocked (still executing a previous task) - if not host_name in self._blocked_hosts: - # pop the task, mark the host blocked, and queue it - self._blocked_hosts[host_name] = True - task = iterator.get_next_task_for_host(host) - #self._callback.playbook_on_task_start(task.get_name(), False) - self._queue_task(iterator._play, host, task, connection_info) - - # move on to the next host and make sure we - # haven't gone past the end of our hosts list - last_host += 1 - if last_host > len(hosts_left) - 1: - last_host = 0 - - # if we've looped around back to the start, break out - if last_host == starting_host: - break + work_to_do = False # assume we have no more work to do + starting_host = last_host # save current position so we know when we've + # looped back around and need to break + + # try and find an unblocked host with a task to run + host_results = [] + while True: + host = hosts_left[last_host] + debug("next free host: %s" % host) + host_name = host.get_name() + + # peek at the next task for the host, to see if there's + # anything to do do for this host + (state, task) = iterator.get_next_task_for_host(host, peek=True) + debug("free host state: %s" % state) + debug("free host task: %s" % task) + if host_name not in self._tqm._failed_hosts and host_name not in self._tqm._unreachable_hosts and task: + + # set the flag so the outer loop knows we've still found + # some work which needs to be done + work_to_do = True + + debug("this host has work to do") + + # check to see if this host is blocked (still executing a previous task) + if not host_name in self._blocked_hosts: + # pop the task, mark the host blocked, and queue it + self._blocked_hosts[host_name] = True + (state, task) = iterator.get_next_task_for_host(host) + + debug("getting variables") + task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) + debug("done getting variables") + + # check to see if this task should be skipped, due to it being a member of a + # role which has already run (and whether that role allows duplicate execution) + if task._role and task._role.has_run(): + # If there is no metadata, the default behavior is to not allow duplicates, + # if there is metadata, check to see if the allow_duplicates flag was set to true + if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates: + debug("'%s' skipped because role has already run" % task) + continue + + if not task.evaluate_tags(connection_info.only_tags, connection_info.skip_tags, task_vars) and task.action != 'setup': + debug("'%s' failed tag evaluation" % task) + continue + + if task.action == 'meta': + # meta tasks store their args in the _raw_params field of args, + # since they do not use k=v pairs, so get that + meta_action = task.args.get('_raw_params') + if meta_action == 'noop': + # FIXME: issue a callback for the noop here? + continue + elif meta_action == 'flush_handlers': + # FIXME: in the 'free' mode, flushing handlers should result in + # only those handlers notified for the host doing the flush + self.run_handlers(iterator, connection_info) + else: + raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds) + + self._blocked_hosts[host_name] = False + else: + self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) + self._queue_task(host, task, task_vars, connection_info) + + # move on to the next host and make sure we + # haven't gone past the end of our hosts list + last_host += 1 + if last_host > len(hosts_left) - 1: + last_host = 0 + + # if we've looped around back to the start, break out + if last_host == starting_host: + break + + results = self._process_pending_results(iterator) + host_results.extend(results) # pause briefly so we don't spin lock time.sleep(0.05) try: - self._wait_for_pending_results() - except: + results = self._wait_on_pending_results(iterator) + host_results.extend(results) + except Exception, e: # FIXME: ctrl+c can cause some failures here, so catch them # with the appropriate error type + print("wtf: %s" % e) pass # run the base class run() method, which executes the cleanup function diff --git a/v2/samples/test_free.yml b/v2/samples/test_free.yml new file mode 100644 index 00000000000..d5f8bcaac94 --- /dev/null +++ b/v2/samples/test_free.yml @@ -0,0 +1,10 @@ +- hosts: all + strategy: free + gather_facts: no + tasks: + - debug: msg="all hosts should print this" + - pause: seconds=5 + when: inventory_hostname == 'l2' + - pause: seconds=10 + when: inventory_hostname == 'l3' + - debug: msg="and we're done" diff --git a/v2/samples/test_pb.yml b/v2/samples/test_pb.yml index 3912d4566b2..ab5b7ab2954 100644 --- a/v2/samples/test_pb.yml +++ b/v2/samples/test_pb.yml @@ -1,12 +1,7 @@ # will use linear strategy by default -- hosts: - - "{{hosts|default('all')}}" - #- ubuntu1404 - #- awxlocal - connection: ssh +- hosts: "{{hosts|default('all')}}" #gather_facts: false - #strategy: free - #serial: 3 + strategy: "{{strategy|default('linear')}}" vars: play_var: foo test_dict: @@ -15,14 +10,9 @@ vars_files: - testing/vars.yml tasks: - - block: - - debug: var=ansible_nodename - when: ansible_nodename == "ubuntu1404" - block: - debug: msg="in block for {{inventory_hostname}} ({{ansible_nodename}}), group_var is {{group_var}}, host var is {{host_var}}" notify: foo - - debug: msg="test dictionary is {{test_dict}}" - when: asdf is defined - command: hostname register: hostname_result - debug: msg="registered result is {{hostname_result.stdout}}" @@ -31,26 +21,18 @@ sudo_user: testing - assemble: src=./testing/ dest=/tmp/output.txt remote_src=no - copy: content="hello world\n" dest=/tmp/copy_content.out mode=600 - - command: /bin/false - retries: "{{num_retries|default(5)}}" - delay: 1 - - debug: msg="you shouldn't see me" + #- command: /bin/false + # retries: "{{num_retries|default(5)}}" + # delay: 1 + #- debug: msg="you shouldn't see me" rescue: - debug: msg="this is the rescue" - command: /bin/false - debug: msg="you should not see this rescue message" always: - debug: msg="this is the always block, it should always be seen" - - command: /bin/false - - debug: msg="you should not see this always message" - - #- debug: msg="linear task 01" - #- debug: msg="linear task 02" - #- debug: msg="linear task 03" - # with_items: - # - a - # - b - # - c + #- command: /bin/false + #- debug: msg="you should not see this always message" handlers: - name: foo @@ -58,13 +40,3 @@ - name: bar debug: msg="this is the bar handler, you should not see this" -#- hosts: all -# connection: local -# strategy: free -# tasks: -# - ping: -# - command: /bin/false -# - debug: msg="free task 01" -# - debug: msg="free task 02" -# - debug: msg="free task 03" -