Martin Krizek 2 weeks ago committed by GitHub
commit 299da2aa86
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,19 +1,6 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright: (c) Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
@ -44,6 +31,9 @@ class IteratingStates(IntEnum):
COMPLETE = 5
# TODO this can be probably just replaced with introducing:
# `HostState.failed: bool` as we do not need to track
# in which section the host failed
class FailedStates(IntFlag):
NONE = 0
SETUP = 1
@ -53,12 +43,12 @@ class FailedStates(IntFlag):
HANDLERS = 16 # NOTE not in use anymore
# FIXME investigate turning HostState into a dataclass
class HostState:
def __init__(self, blocks):
self._blocks = blocks[:]
self.handlers = []
self.handler_notifications = []
self.handler_notifications = {}
self.cur_block = 0
self.cur_regular_task = 0
@ -70,10 +60,7 @@ class HostState:
self.pre_flushing_run_state = None
self.update_handlers = True
self.pending_setup = False
self.tasks_child_state = None
self.rescue_child_state = None
self.always_child_state = None
self.did_rescue = False
self.child_state = None
self.did_start_at_task = False
def __repr__(self):
@ -82,8 +69,7 @@ class HostState:
def __str__(self):
return ("HOST STATE: block=%d, task=%d, rescue=%d, always=%d, handlers=%d, run_state=%s, fail_state=%s, "
"pre_flushing_run_state=%s, update_handlers=%s, pending_setup=%s, "
"tasks child state? (%s), rescue child state? (%s), always child state? (%s), "
"did rescue? %s, did start at task? %s" % (
"child state? (%s), did start at task? %s" % (
self.cur_block,
self.cur_regular_task,
self.cur_rescue_task,
@ -94,10 +80,7 @@ class HostState:
self.pre_flushing_run_state,
self.update_handlers,
self.pending_setup,
self.tasks_child_state,
self.rescue_child_state,
self.always_child_state,
self.did_rescue,
bool(self.child_state),
self.did_start_at_task,
))
@ -108,7 +91,7 @@ class HostState:
for attr in ('_blocks',
'cur_block', 'cur_regular_task', 'cur_rescue_task', 'cur_always_task', 'cur_handlers_task',
'run_state', 'fail_state', 'pre_flushing_run_state', 'update_handlers', 'pending_setup',
'tasks_child_state', 'rescue_child_state', 'always_child_state'):
'child_state'):
if getattr(self, attr) != getattr(other, attr):
return False
@ -120,7 +103,7 @@ class HostState:
def copy(self):
new_state = HostState(self._blocks)
new_state.handlers = self.handlers[:]
new_state.handler_notifications = self.handler_notifications[:]
new_state.handler_notifications = self.handler_notifications.copy()
new_state.cur_block = self.cur_block
new_state.cur_regular_task = self.cur_regular_task
new_state.cur_rescue_task = self.cur_rescue_task
@ -131,14 +114,9 @@ class HostState:
new_state.pre_flushing_run_state = self.pre_flushing_run_state
new_state.update_handlers = self.update_handlers
new_state.pending_setup = self.pending_setup
new_state.did_rescue = self.did_rescue
new_state.did_start_at_task = self.did_start_at_task
if self.tasks_child_state is not None:
new_state.tasks_child_state = self.tasks_child_state.copy()
if self.rescue_child_state is not None:
new_state.rescue_child_state = self.rescue_child_state.copy()
if self.always_child_state is not None:
new_state.always_child_state = self.always_child_state.copy()
if self.child_state is not None:
new_state.child_state = self.child_state.copy()
return new_state
@ -231,36 +209,36 @@ class PlayIterator:
self.cur_task = 0
def get_host_state(self, host):
# Since we're using the PlayIterator to carry forward failed hosts,
# in the event that a previous host was not in the current inventory
# we create a stub state for it now
"""Returns a **copy** of the HostState object for a given host,
creating a stub in case it does not exist.
"""
# NOTE This is now unused in the core codebase as we try and operate
# on HostState objects in-place for efficiency, only for when we
# peek at the next state we create a copy of the HostState
# object ourselves.
if host.name not in self._host_states:
self.set_state_for_host(host.name, HostState(blocks=[]))
return self._host_states[host.name].copy()
def get_next_task_for_host(self, host, peek=False):
display.debug("getting the next task for host %s" % host.name)
s = self.get_host_state(host)
s = self.get_state_for_host(host.name)
task = None
if s.run_state == IteratingStates.COMPLETE:
display.debug("host %s is done iterating, returning" % host.name)
return (s, None)
return s, None
(s, task) = self._get_next_task_from_state(s, host=host)
if peek:
s = s.copy()
if not peek:
self.set_state_for_host(host.name, s)
s, task = self._get_next_task_from_state(s, host=host)
display.debug("done getting next task for host %s" % host.name)
display.debug(" ^ task is: %s" % task)
display.debug(" ^ state is: %s" % s)
return (s, task)
return s, task
def _get_next_task_from_state(self, state, host):
task = None
# try and find the next task, given the current state.
@ -269,10 +247,10 @@ class PlayIterator:
# if we run past the end of the list we know we're done with
# this block
try:
block = state._blocks[state.cur_block]
block = state.get_current_block()
except IndexError:
state.run_state = IteratingStates.COMPLETE
return (state, None)
return state, None
if state.run_state == IteratingStates.SETUP:
# First, we check to see if we were pending setup. If not, this is
@ -303,55 +281,40 @@ class PlayIterator:
# the flag and move onto the next block in the list while setting
# the run state to IteratingStates.TASKS
state.pending_setup = False
state.run_state = IteratingStates.TASKS
if not state.did_start_at_task:
state.cur_block += 1
state.cur_regular_task = 0
state.cur_rescue_task = 0
state.cur_always_task = 0
state.tasks_child_state = None
state.rescue_child_state = None
state.always_child_state = None
elif state.run_state == IteratingStates.TASKS:
# clear the pending setup flag, since we're past that and it didn't fail
if state.pending_setup:
state.pending_setup = False
# First, we check for a child task state that is not failed, and if we
# have one recurse into it for the next task. If we're done with the child
# state, we clear it and drop back to getting the next task from the list.
if state.tasks_child_state:
(state.tasks_child_state, task) = self._get_next_task_from_state(state.tasks_child_state, host=host)
if self._check_failed_state(state.tasks_child_state):
if state.child_state:
state.child_state, task = self._get_next_task_from_state(state.child_state, host=host)
if self._check_failed_state(state.child_state):
# failed child state, so clear it and move into the rescue portion
state.tasks_child_state = None
state.child_state = None
self._set_failed_state(state)
else:
# get the next task recursively
if task is None or state.tasks_child_state.run_state == IteratingStates.COMPLETE:
# we're done with the child state, so clear it and continue
# back to the top of the loop to get the next task
state.tasks_child_state = None
continue
elif state.child_state.run_state == IteratingStates.COMPLETE:
# we're done with the child state, so clear it and continue
# back to the top of the loop to get the next task
state.child_state = None
else:
# First here, we check to see if we've failed anywhere down the chain
# of states we have, and if so we move onto the rescue portion. Otherwise,
# we check to see if we've moved past the end of the list of tasks. If so,
# we move into the always portion of the block, otherwise we get the next
# task from the list.
if self._check_failed_state(state):
state.run_state = IteratingStates.RESCUE
elif state.cur_regular_task >= len(block.block):
try:
task = block.block[state.cur_regular_task]
except IndexError:
state.run_state = IteratingStates.ALWAYS
else:
task = block.block[state.cur_regular_task]
# if the current task is actually a child block, create a child
# state for us to recurse into on the next pass
if isinstance(task, Block):
state.tasks_child_state = HostState(blocks=[task])
state.tasks_child_state.run_state = IteratingStates.TASKS
state.child_state = HostState(blocks=[task])
state.child_state.run_state = IteratingStates.TASKS
# since we've created the child state, clear the task
# so we can pick up the child state on the next pass
task = None
@ -360,28 +323,24 @@ class PlayIterator:
elif state.run_state == IteratingStates.RESCUE:
# The process here is identical to IteratingStates.TASKS, except instead
# we move into the always portion of the block.
if state.rescue_child_state:
(state.rescue_child_state, task) = self._get_next_task_from_state(state.rescue_child_state, host=host)
if self._check_failed_state(state.rescue_child_state):
state.rescue_child_state = None
if state.child_state:
state.child_state, task = self._get_next_task_from_state(state.child_state, host=host)
if self._check_failed_state(state.child_state):
state.child_state = None
self._set_failed_state(state)
else:
if task is None or state.rescue_child_state.run_state == IteratingStates.COMPLETE:
state.rescue_child_state = None
continue
elif state.child_state.run_state == IteratingStates.COMPLETE:
state.child_state = None
else:
if state.fail_state & FailedStates.RESCUE == FailedStates.RESCUE:
state.run_state = IteratingStates.ALWAYS
elif state.cur_rescue_task >= len(block.rescue):
if len(block.rescue) > 0:
try:
task = block.rescue[state.cur_rescue_task]
except IndexError:
if block.rescue:
state.fail_state = FailedStates.NONE
state.run_state = IteratingStates.ALWAYS
state.did_rescue = True
else:
task = block.rescue[state.cur_rescue_task]
if isinstance(task, Block):
state.rescue_child_state = HostState(blocks=[task])
state.rescue_child_state.run_state = IteratingStates.TASKS
state.child_state = HostState(blocks=[task])
state.child_state.run_state = IteratingStates.TASKS
task = None
state.cur_rescue_task += 1
@ -390,17 +349,17 @@ class PlayIterator:
# instead we either move onto the next block in the list, or we set the
# run state to IteratingStates.COMPLETE in the event of any errors, or when we
# have hit the end of the list of blocks.
if state.always_child_state:
(state.always_child_state, task) = self._get_next_task_from_state(state.always_child_state, host=host)
if self._check_failed_state(state.always_child_state):
state.always_child_state = None
if state.child_state:
state.child_state, task = self._get_next_task_from_state(state.child_state, host=host)
if self._check_failed_state(state.child_state):
state.child_state = None
self._set_failed_state(state)
else:
if task is None or state.always_child_state.run_state == IteratingStates.COMPLETE:
state.always_child_state = None
continue
elif state.child_state.run_state == IteratingStates.COMPLETE:
state.child_state = None
else:
if state.cur_always_task >= len(block.always):
try:
task = block.always[state.cur_always_task]
except IndexError:
if state.fail_state != FailedStates.NONE:
state.run_state = IteratingStates.COMPLETE
else:
@ -409,15 +368,10 @@ class PlayIterator:
state.cur_rescue_task = 0
state.cur_always_task = 0
state.run_state = IteratingStates.TASKS
state.tasks_child_state = None
state.rescue_child_state = None
state.always_child_state = None
state.did_rescue = False
else:
task = block.always[state.cur_always_task]
if isinstance(task, Block):
state.always_child_state = HostState(blocks=[task])
state.always_child_state.run_state = IteratingStates.TASKS
state.child_state = HostState(blocks=[task])
state.child_state.run_state = IteratingStates.TASKS
task = None
state.cur_always_task += 1
@ -443,173 +397,129 @@ class PlayIterator:
break
elif state.run_state == IteratingStates.COMPLETE:
return (state, None)
return state, None
# if something above set the task, break out of the loop now
if task:
break
return (state, task)
return state, task
def _set_failed_state(self, state):
if state.run_state == IteratingStates.HANDLERS:
# we are failing `meta: flush_handlers`, so just reset the state to whatever
# it was before and let `_set_failed_state` figure out the next state
state.run_state = state.pre_flushing_run_state
state.update_handlers = True
if state.run_state == IteratingStates.SETUP:
state.fail_state |= FailedStates.SETUP
state.run_state = IteratingStates.COMPLETE
elif state.run_state == IteratingStates.TASKS:
if state.tasks_child_state is not None:
state.tasks_child_state = self._set_failed_state(state.tasks_child_state)
else:
state.fail_state |= FailedStates.TASKS
if state._blocks[state.cur_block].rescue:
state.run_state = IteratingStates.RESCUE
elif state._blocks[state.cur_block].always:
state.run_state = IteratingStates.ALWAYS
else:
state.run_state = IteratingStates.COMPLETE
elif state.run_state == IteratingStates.RESCUE:
if state.rescue_child_state is not None:
state.rescue_child_state = self._set_failed_state(state.rescue_child_state)
else:
state.fail_state |= FailedStates.RESCUE
if state._blocks[state.cur_block].always:
state.run_state = IteratingStates.ALWAYS
else:
state.run_state = IteratingStates.COMPLETE
elif state.run_state == IteratingStates.ALWAYS:
if state.always_child_state is not None:
state.always_child_state = self._set_failed_state(state.always_child_state)
else:
state.fail_state |= FailedStates.ALWAYS
state.run_state = IteratingStates.COMPLETE
return state
else:
s = self.get_active_state(state)
match s.run_state:
case IteratingStates.TASKS:
s.fail_state |= FailedStates.TASKS
if s.get_current_block().rescue:
s.run_state = IteratingStates.RESCUE
elif s.get_current_block().always:
s.run_state = IteratingStates.ALWAYS
else:
s.run_state = IteratingStates.COMPLETE
case IteratingStates.RESCUE:
s.fail_state |= FailedStates.RESCUE
if s.get_current_block().always:
s.run_state = IteratingStates.ALWAYS
else:
s.run_state = IteratingStates.COMPLETE
case IteratingStates.ALWAYS:
s.fail_state |= FailedStates.ALWAYS
s.run_state = IteratingStates.COMPLETE
def mark_host_failed(self, host):
s = self.get_host_state(host)
s = self.get_state_for_host(host.name)
display.debug("marking host %s failed, current state: %s" % (host, s))
if s.run_state == IteratingStates.HANDLERS:
# we are failing `meta: flush_handlers`, so just reset the state to whatever
# it was before and let `_set_failed_state` figure out the next state
s.run_state = s.pre_flushing_run_state
s.update_handlers = True
s = self._set_failed_state(s)
self._set_failed_state(s)
display.debug("^ failed state is now: %s" % s)
self.set_state_for_host(host.name, s)
self._play._removed_hosts.append(host.name)
def get_failed_hosts(self):
return dict((host, True) for (host, state) in self._host_states.items() if self._check_failed_state(state))
def _check_failed_state(self, state):
# TODO this can be moved into HostState
if state is None:
return False
elif state.run_state == IteratingStates.RESCUE and self._check_failed_state(state.rescue_child_state):
return True
elif state.run_state == IteratingStates.ALWAYS and self._check_failed_state(state.always_child_state):
return True
elif state.fail_state != FailedStates.NONE:
if state.run_state == IteratingStates.RESCUE and state.fail_state & FailedStates.RESCUE == 0:
return False
elif state.run_state == IteratingStates.ALWAYS and state.fail_state & FailedStates.ALWAYS == 0:
return False
else:
return not (state.did_rescue and state.fail_state & FailedStates.ALWAYS == 0)
elif state.run_state == IteratingStates.TASKS and self._check_failed_state(state.tasks_child_state):
cur_block = state._blocks[state.cur_block]
if len(cur_block.rescue) > 0 and state.fail_state & FailedStates.RESCUE == 0:
return False
else:
return True
return False
return state.fail_state != FailedStates.NONE and state.run_state == IteratingStates.COMPLETE
def is_failed(self, host):
s = self.get_host_state(host)
s = self.get_state_for_host(host.name)
return self._check_failed_state(s)
def clear_host_errors(self, host):
self._clear_state_errors(self.get_state_for_host(host.name))
def _clear_state_errors(self, state: HostState) -> None:
state.fail_state = FailedStates.NONE
if state.tasks_child_state is not None:
self._clear_state_errors(state.tasks_child_state)
elif state.rescue_child_state is not None:
self._clear_state_errors(state.rescue_child_state)
elif state.always_child_state is not None:
self._clear_state_errors(state.always_child_state)
s = self.get_state_for_host(host.name)
while s is not None:
s.fail_state = FailedStates.NONE
s = s.child_state
def get_active_state(self, state):
'''
Finds the active state, recursively if necessary when there are child states.
'''
if state.run_state == IteratingStates.TASKS and state.tasks_child_state is not None:
return self.get_active_state(state.tasks_child_state)
elif state.run_state == IteratingStates.RESCUE and state.rescue_child_state is not None:
return self.get_active_state(state.rescue_child_state)
elif state.run_state == IteratingStates.ALWAYS and state.always_child_state is not None:
return self.get_active_state(state.always_child_state)
return state
# TODO this can be moved into HostState
s = state
if s.run_state in {
IteratingStates.TASKS,
IteratingStates.RESCUE,
IteratingStates.ALWAYS
}:
while s.child_state is not None:
s = s.child_state
return s
def is_any_block_rescuing(self, state):
'''
Given the current HostState state, determines if the current block, or any child blocks,
are in rescue mode.
'''
if state.run_state == IteratingStates.TASKS and state.get_current_block().rescue:
return True
if state.tasks_child_state is not None:
return self.is_any_block_rescuing(state.tasks_child_state)
if state.rescue_child_state is not None:
return self.is_any_block_rescuing(state.rescue_child_state)
if state.always_child_state is not None:
return self.is_any_block_rescuing(state.always_child_state)
return False
# TODO this can be moved into HostState
s = state
while s is not None:
if s.run_state == IteratingStates.TASKS and s.get_current_block().rescue:
return True
s = s.child_state
def _insert_tasks_into_state(self, state, task_list):
# if we've failed at all, or if the task list is empty, just return the current state
if (state.fail_state != FailedStates.NONE and state.run_state == IteratingStates.TASKS) or not task_list:
return state
if state.run_state == IteratingStates.TASKS:
if state.tasks_child_state:
state.tasks_child_state = self._insert_tasks_into_state(state.tasks_child_state, task_list)
else:
target_block = state._blocks[state.cur_block].copy()
before = target_block.block[:state.cur_regular_task]
after = target_block.block[state.cur_regular_task:]
target_block.block = before + task_list + after
state._blocks[state.cur_block] = target_block
elif state.run_state == IteratingStates.RESCUE:
if state.rescue_child_state:
state.rescue_child_state = self._insert_tasks_into_state(state.rescue_child_state, task_list)
else:
target_block = state._blocks[state.cur_block].copy()
before = target_block.rescue[:state.cur_rescue_task]
after = target_block.rescue[state.cur_rescue_task:]
target_block.rescue = before + task_list + after
state._blocks[state.cur_block] = target_block
elif state.run_state == IteratingStates.ALWAYS:
if state.always_child_state:
state.always_child_state = self._insert_tasks_into_state(state.always_child_state, task_list)
else:
target_block = state._blocks[state.cur_block].copy()
before = target_block.always[:state.cur_always_task]
after = target_block.always[state.cur_always_task:]
target_block.always = before + task_list + after
state._blocks[state.cur_block] = target_block
elif state.run_state == IteratingStates.HANDLERS:
state.handlers[state.cur_handlers_task:state.cur_handlers_task] = [h for b in task_list for h in b.block]
return state
return False
def add_tasks(self, host, task_list):
self.set_state_for_host(host.name, self._insert_tasks_into_state(self.get_host_state(host), task_list))
if not task_list:
return
s = self.get_active_state(self._host_states[host.name])
if s.run_state == IteratingStates.HANDLERS:
s.handlers[s.cur_handlers_task:s.cur_handlers_task] = [h for b in task_list for h in b.block]
else:
target_block = s.get_current_block().copy()
match s.run_state:
case IteratingStates.TASKS:
target_block.block[s.cur_regular_task:s.cur_regular_task] = task_list
case IteratingStates.RESCUE:
target_block.rescue[s.cur_rescue_task:s.cur_rescue_task] = task_list
case IteratingStates.ALWAYS:
target_block.always[s.cur_always_task:s.cur_always_task] = task_list
s._blocks[s.cur_block] = target_block
@property
def host_states(self):
return self._host_states
def get_state_for_host(self, hostname: str) -> HostState:
# Since we're using the PlayIterator to carry forward failed hosts,
# in the event that a previous host was not in the current inventory
# we create a stub state for it now
if hostname not in self._host_states:
self.set_state_for_host(hostname, HostState(blocks=[]))
return self._host_states[hostname]
def set_state_for_host(self, hostname: str, state: HostState) -> None:
@ -628,10 +538,10 @@ class PlayIterator:
self._host_states[hostname].fail_state = fail_state
def add_notification(self, hostname: str, notification: str) -> None:
# preserve order
host_state = self._host_states[hostname]
if notification not in host_state.handler_notifications:
host_state.handler_notifications.append(notification)
self._host_states[hostname].handler_notifications[notification] = ...
def clear_notification(self, hostname: str, notification: str) -> None:
self._host_states[hostname].handler_notifications.remove(notification)
try:
self._host_states[hostname].handler_notifications.pop(notification)
except KeyError as e:
raise AnsibleAssertionError(f"Failed to remove a handler notification: {e}")

@ -165,7 +165,7 @@ def debug_closure(func):
)
# We don't know the host yet, copy the previous states, for lookup after we process new results
prev_host_states = iterator.host_states.copy()
prev_host_states = {hostname: state.copy() for hostname, state in iterator.host_states.items()}
results = func(self, iterator, one_pass=one_pass, max_passes=max_passes)
_processed_results = []
@ -176,11 +176,11 @@ def debug_closure(func):
_queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None)
task_vars = _queued_task_args['task_vars']
play_context = _queued_task_args['play_context']
# Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state
# Try to grab the previous host state, if it doesn't exist use get_state_for_host to generate an empty state
try:
prev_host_state = prev_host_states[host.name]
except KeyError:
prev_host_state = iterator.get_host_state(host)
prev_host_state = iterator.get_state_for_host(host.name)
while result.needs_debugger(globally_enabled=self.debugger_active):
next_action = NextAction()
@ -311,7 +311,8 @@ class StrategyBase:
# execute one more pass through the iterator without peeking, to
# make sure that all of the hosts are advanced to their final task.
# This should be safe, as everything should be IteratingStates.COMPLETE by
# this point, though the strategy may not advance the hosts itself.
# this point, though the strategy may not advance the hosts itself as it
# may just end operating once peeking at the next state shows IteratingStates.COMPLETE.
for host in self._hosts_cache:
if host not in self._tqm._unreachable_hosts:
@ -597,8 +598,7 @@ class StrategyBase:
role_ran = True
ignore_errors = original_task.ignore_errors
if not ignore_errors:
# save the current state before failing it for later inspection
state_when_failed = iterator.get_state_for_host(original_host.name)
rescued = iterator.is_any_block_rescuing(iterator.get_state_for_host(original_host.name))
display.debug("marking %s as failed" % original_host.name)
if original_task.run_once:
# if we're using run_once, we have to fail every host here
@ -608,15 +608,13 @@ class StrategyBase:
else:
iterator.mark_host_failed(original_host)
state, dummy = iterator.get_next_task_for_host(original_host, peek=True)
if iterator.is_failed(original_host) and state and state.run_state == IteratingStates.COMPLETE:
if iterator.is_failed(original_host):
self._tqm._failed_hosts[original_host.name] = True
# if we're iterating on the rescue portion of a block then
# we save the failed task in a special var for use
# within the rescue/always
if iterator.is_any_block_rescuing(state_when_failed):
if rescued:
self._tqm._stats.increment('rescued', original_host.name)
iterator._play._removed_hosts.remove(original_host.name)
self._variable_manager.set_nonpersistent_facts(

@ -73,9 +73,9 @@ class StrategyModule(StrategyBase):
if not state_task_per_host:
return [(h, None) for h in hosts]
if self._in_handlers and not any(filter(
lambda rs: rs == IteratingStates.HANDLERS,
(s.run_state for s, dummy in state_task_per_host.values()))
if self._in_handlers and all(
s.run_state != IteratingStates.HANDLERS
for s, dummy in state_task_per_host.values()
):
self._in_handlers = False

@ -20,7 +20,7 @@ from __future__ import annotations
import unittest
from unittest.mock import patch, MagicMock
from ansible.executor.play_iterator import HostState, PlayIterator, IteratingStates, FailedStates
from ansible.executor.play_iterator import HostState, PlayIterator
from ansible.playbook import Playbook
from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import init_plugin_loader
@ -375,88 +375,3 @@ class TestPlayIterator(unittest.TestCase):
# end of iteration
(host_state, task) = itr.get_next_task_for_host(hosts[0])
self.assertIsNone(task)
def test_play_iterator_add_tasks(self):
fake_loader = DictDataLoader({
'test_play.yml': """
- hosts: all
gather_facts: no
tasks:
- debug: msg="dummy task"
""",
})
mock_var_manager = MagicMock()
mock_var_manager._fact_cache = dict()
mock_var_manager.get_vars.return_value = dict()
p = Playbook.load('test_play.yml', loader=fake_loader, variable_manager=mock_var_manager)
hosts = []
for i in range(0, 10):
host = MagicMock()
host.name = host.get_name.return_value = 'host%02d' % i
hosts.append(host)
inventory = MagicMock()
inventory.get_hosts.return_value = hosts
inventory.filter_hosts.return_value = hosts
play_context = PlayContext(play=p._entries[0])
itr = PlayIterator(
inventory=inventory,
play=p._entries[0],
play_context=play_context,
variable_manager=mock_var_manager,
all_vars=dict(),
)
# test the high-level add_tasks() method
s = HostState(blocks=[0, 1, 2])
itr._insert_tasks_into_state = MagicMock(return_value=s)
itr.add_tasks(hosts[0], [MagicMock(), MagicMock(), MagicMock()])
self.assertEqual(itr._host_states[hosts[0].name], s)
# now actually test the lower-level method that does the work
itr = PlayIterator(
inventory=inventory,
play=p._entries[0],
play_context=play_context,
variable_manager=mock_var_manager,
all_vars=dict(),
)
# iterate past first task
dummy, task = itr.get_next_task_for_host(hosts[0])
while (task and task.action != 'debug'):
dummy, task = itr.get_next_task_for_host(hosts[0])
self.assertIsNotNone(task, 'iterated past end of play while looking for place to insert tasks')
# get the current host state and copy it so we can mutate it
s = itr.get_host_state(hosts[0])
s_copy = s.copy()
# assert with an empty task list, or if we're in a failed state, we simply return the state as-is
res_state = itr._insert_tasks_into_state(s_copy, task_list=[])
self.assertEqual(res_state, s_copy)
s_copy.fail_state = FailedStates.TASKS
res_state = itr._insert_tasks_into_state(s_copy, task_list=[MagicMock()])
self.assertEqual(res_state, s_copy)
# but if we've failed with a rescue/always block
mock_task = MagicMock()
s_copy.run_state = IteratingStates.RESCUE
res_state = itr._insert_tasks_into_state(s_copy, task_list=[mock_task])
self.assertEqual(res_state, s_copy)
self.assertIn(mock_task, res_state._blocks[res_state.cur_block].rescue)
itr.set_state_for_host(hosts[0].name, res_state)
(next_state, next_task) = itr.get_next_task_for_host(hosts[0], peek=True)
self.assertEqual(next_task, mock_task)
itr.set_state_for_host(hosts[0].name, s)
# test a regular insertion
s_copy = s.copy()
res_state = itr._insert_tasks_into_state(s_copy, task_list=[MagicMock()])

Loading…
Cancel
Save