You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/test/units/executor/test_task_executor.py

487 lines
17 KiB
Python

# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.executor.task_executor import TaskExecutor
from ansible.playbook.play_context import PlayContext
Ansible Config part2 (#27448) * Ansible Config part2 - made dump_me nicer, added note this is not prod - moved internal key removal function to vars - carry tracebacks in errors we can now show tracebacks for plugins on vvv - show inventory plugin tracebacks on vvv - minor fixes to cg groups plugin - draft config from plugin docs - made search path warning 'saner' (top level dirs only) - correctly display config entries and others - removed unneeded code - commented out some conn plugin specific from base.yml - also deprecated sudo/su - updated ssh conn docs - shared get option method for connection plugins - note about needing eval for defaults - tailored yaml ext - updated strategy entry - for connection pliugins, options load on plugin load - allow for long types in definitions - better display in ansible-doc - cleaned up/updated source docs and base.yml - added many descriptions - deprecated include toggles as include is - draft backwards compat get_config - fixes to ansible-config, added --only-changed - some code reoorg - small license headers - show default in doc type - pushed module utils details to 5vs - work w/o config file - PEPE ATE! - moved loader to it's own file - fixed rhn_register test - fixed boto requirement in make tests - I ate Pepe - fixed dynamic eval of defaults - better doc code skip ipaddr filter tests when missing netaddr removed devnull string from config better becoem resolution * killed extra space with extreeme prejudice cause its an affront against all that is holy that 2 spaces touch each other! shippable timing out on some images, but merging as it passes most
7 years ago
from ansible.plugins.loader import action_loader, lookup_loader
from ansible.parsing.yaml.objects import AnsibleUnicode
from units.mock.loader import DictDataLoader
class TestTaskExecutor(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_task_executor_init(self):
fake_loader = DictDataLoader({})
mock_host = MagicMock()
mock_task = MagicMock()
mock_play_context = MagicMock()
mock_shared_loader = MagicMock()
new_stdin = None
job_vars = dict()
mock_queue = MagicMock()
te = TaskExecutor(
host=mock_host,
task=mock_task,
job_vars=job_vars,
play_context=mock_play_context,
new_stdin=new_stdin,
loader=fake_loader,
shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue,
)
def test_task_executor_run(self):
fake_loader = DictDataLoader({})
mock_host = MagicMock()
mock_task = MagicMock()
mock_task._role._role_path = '/path/to/role/foo'
mock_play_context = MagicMock()
mock_shared_loader = MagicMock()
mock_queue = MagicMock()
new_stdin = None
job_vars = dict()
te = TaskExecutor(
host=mock_host,
task=mock_task,
job_vars=job_vars,
play_context=mock_play_context,
new_stdin=new_stdin,
loader=fake_loader,
shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue,
)
te._get_loop_items = MagicMock(return_value=None)
te._execute = MagicMock(return_value=dict())
res = te.run()
te._get_loop_items = MagicMock(return_value=[])
res = te.run()
te._get_loop_items = MagicMock(return_value=['a', 'b', 'c'])
te._run_loop = MagicMock(return_value=[dict(item='a', changed=True), dict(item='b', failed=True), dict(item='c')])
res = te.run()
te._get_loop_items = MagicMock(side_effect=AnsibleError(""))
res = te.run()
self.assertIn("failed", res)
def test_task_executor_get_loop_items(self):
fake_loader = DictDataLoader({})
mock_host = MagicMock()
mock_task = MagicMock()
mock_task.loop = 'items'
mock_task.loop_args = ['a', 'b', 'c']
mock_play_context = MagicMock()
mock_shared_loader = MagicMock()
mock_shared_loader.lookup_loader = lookup_loader
new_stdin = None
job_vars = dict()
mock_queue = MagicMock()
te = TaskExecutor(
host=mock_host,
task=mock_task,
job_vars=job_vars,
play_context=mock_play_context,
new_stdin=new_stdin,
loader=fake_loader,
shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue,
)
items = te._get_loop_items()
self.assertEqual(items, ['a', 'b', 'c'])
def test_task_executor_run_loop(self):
items = ['a', 'b', 'c']
fake_loader = DictDataLoader({})
mock_host = MagicMock()
def _copy(exclude_parent=False, exclude_tasks=False):
new_item = MagicMock()
return new_item
mock_task = MagicMock()
mock_task.copy.side_effect = _copy
mock_play_context = MagicMock()
mock_shared_loader = MagicMock()
mock_queue = MagicMock()
new_stdin = None
job_vars = dict()
te = TaskExecutor(
host=mock_host,
task=mock_task,
job_vars=job_vars,
play_context=mock_play_context,
new_stdin=new_stdin,
loader=fake_loader,
shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue,
)
def _execute(variables):
return dict(item=variables.get('item'))
te._squash_items = MagicMock(return_value=items)
te._execute = MagicMock(side_effect=_execute)
res = te._run_loop(items)
self.assertEqual(len(res), 3)
def test_task_executor_squash_items(self):
items = ['a', 'b', 'c']
fake_loader = DictDataLoader({})
mock_host = MagicMock()
loop_var = 'item'
def _evaluate_conditional(templar, variables):
item = variables.get(loop_var)
if item == 'b':
return False
return True
mock_task = MagicMock()
mock_task.evaluate_conditional.side_effect = _evaluate_conditional
mock_play_context = MagicMock()
mock_shared_loader = None
mock_queue = MagicMock()
new_stdin = None
job_vars = dict(pkg_mgr='yum')
te = TaskExecutor(
host=mock_host,
task=mock_task,
job_vars=job_vars,
play_context=mock_play_context,
new_stdin=new_stdin,
loader=fake_loader,
shared_loader_obj=mock_shared_loader,
rslt_q=mock_queue,
)
# No replacement
mock_task.action = 'yum'
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, ['a', 'b', 'c'])
self.assertIsInstance(mock_task.args, MagicMock)
mock_task.action = 'foo'
mock_task.args = {'name': '{{item}}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, ['a', 'b', 'c'])
self.assertEqual(mock_task.args, {'name': '{{item}}'})
mock_task.action = 'yum'
mock_task.args = {'name': 'static'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, ['a', 'b', 'c'])
self.assertEqual(mock_task.args, {'name': 'static'})
mock_task.action = 'yum'
mock_task.args = {'name': '{{pkg_mgr}}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, ['a', 'b', 'c'])
self.assertEqual(mock_task.args, {'name': '{{pkg_mgr}}'})
mock_task.action = '{{unknown}}'
mock_task.args = {'name': '{{item}}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, ['a', 'b', 'c'])
self.assertEqual(mock_task.args, {'name': '{{item}}'})
# Could do something like this to recover from bad deps in a package
job_vars = dict(pkg_mgr='yum', packages=['a', 'b'])
items = ['absent', 'latest']
mock_task.action = 'yum'
mock_task.args = {'name': '{{ packages }}', 'state': '{{ item }}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, items)
self.assertEqual(mock_task.args, {'name': '{{ packages }}', 'state': '{{ item }}'})
# Maybe should raise an error in this case. The user would have to specify:
# - yum: name="{{ packages[item] }}"
# with_items:
# - ['a', 'b']
# - ['foo', 'bar']
# you can't use a list as a dict key so that would probably throw
# an error later. If so, we can throw it now instead.
# Squashing in this case would not be intuitive as the user is being
# explicit in using each list entry as a key.
job_vars = dict(pkg_mgr='yum', packages={"a": "foo", "b": "bar", "foo": "baz", "bar": "quux"})
items = [['a', 'b'], ['foo', 'bar']]
mock_task.action = 'yum'
mock_task.args = {'name': '{{ packages[item] }}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, items)
self.assertEqual(mock_task.args, {'name': '{{ packages[item] }}'})
# Replaces
items = ['a', 'b', 'c']
mock_task.action = 'yum'
mock_task.args = {'name': '{{item}}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, [['a', 'c']])
self.assertEqual(mock_task.args, {'name': ['a', 'c']})
mock_task.action = '{{pkg_mgr}}'
mock_task.args = {'name': '{{item}}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
self.assertEqual(new_items, [['a', 'c']])
self.assertEqual(mock_task.args, {'name': ['a', 'c']})
# New loop_var
mock_task.action = 'yum'
mock_task.args = {'name': '{{a_loop_var_item}}'}
mock_task.loop_control = {'loop_var': 'a_loop_var_item'}
loop_var = 'a_loop_var_item'
new_items = te._squash_items(items=items, loop_var='a_loop_var_item', variables=job_vars)
self.assertEqual(new_items, [['a', 'c']])
self.assertEqual(mock_task.args, {'name': ['a', 'c']})
loop_var = 'item'
#
# These are presently not optimized but could be in the future.
# Expected output if they were optimized is given as a comment
# Please move these to a different section if they are optimized
#
# Squashing lists
job_vars = dict(pkg_mgr='yum')
items = [['a', 'b'], ['foo', 'bar']]
mock_task.action = 'yum'
mock_task.args = {'name': '{{ item }}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
# self.assertEqual(new_items, [['a', 'b', 'foo', 'bar']])
# self.assertEqual(mock_task.args, {'name': ['a', 'b', 'foo', 'bar']})
self.assertEqual(new_items, items)
self.assertEqual(mock_task.args, {'name': '{{ item }}'})
# Retrieving from a dict
items = ['a', 'b', 'foo']
mock_task.action = 'yum'
mock_task.args = {'name': '{{ packages[item] }}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
# self.assertEqual(new_items, [['foo', 'baz']])
# self.assertEqual(mock_task.args, {'name': ['foo', 'baz']})
self.assertEqual(new_items, items)
self.assertEqual(mock_task.args, {'name': '{{ packages[item] }}'})
# Another way to retrieve from a dict
job_vars = dict(pkg_mgr='yum')
items = [{'package': 'foo'}, {'package': 'bar'}]
mock_task.action = 'yum'
mock_task.args = {'name': '{{ item["package"] }}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
# self.assertEqual(new_items, [['foo', 'bar']])
# self.assertEqual(mock_task.args, {'name': ['foo', 'bar']})
self.assertEqual(new_items, items)
self.assertEqual(mock_task.args, {'name': '{{ item["package"] }}'})
items = [
dict(name='a', state='present'),
dict(name='b', state='present'),
dict(name='c', state='present'),
]
mock_task.action = 'yum'
mock_task.args = {'name': '{{item.name}}', 'state': '{{item.state}}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
# self.assertEqual(new_items, [dict(name=['a', 'b', 'c'], state='present')])
# self.assertEqual(mock_task.args, {'name': ['a', 'b', 'c'], 'state': 'present'})
self.assertEqual(new_items, items)
self.assertEqual(mock_task.args, {'name': '{{item.name}}', 'state': '{{item.state}}'})
items = [
dict(name='a', state='present'),
dict(name='b', state='present'),
dict(name='c', state='absent'),
]
mock_task.action = 'yum'
mock_task.args = {'name': '{{item.name}}', 'state': '{{item.state}}'}
new_items = te._squash_items(items=items, loop_var='item', variables=job_vars)
# self.assertEqual(new_items, [dict(name=['a', 'b'], state='present'),
# dict(name='c', state='absent')])
# self.assertEqual(mock_task.args, {'name': '{{item.name}}', 'state': '{{item.state}}'})
self.assertEqual(new_items, items)
self.assertEqual(mock_task.args, {'name': '{{item.name}}', 'state': '{{item.state}}'})
def test_task_executor_execute(self):
fake_loader = DictDataLoader({})
mock_host = MagicMock()
mock_task = MagicMock()
mock_task.args = dict()
mock_task.retries = 0
mock_task.delay = -1
mock_task.register = 'foo'
mock_task.until = None
mock_task.changed_when = None
mock_task.failed_when = None
mock_task.post_validate.return_value = None
# mock_task.async cannot be left unset, because on Python 3 MagicMock()
# > 0 raises a TypeError There are two reasons for using the value 1
# here: on Python 2 comparing MagicMock() > 0 returns True, and the
# other reason is that if I specify 0 here, the test fails. ;)
mock_task.async = 1
mock_task.poll = 0
mock_play_context = MagicMock()
mock_play_context.post_validate.return_value = None
mock_play_context.update_vars.return_value = None
mock_connection = MagicMock()
mock_connection.set_host_overrides.return_value = None
mock_connection._connect.return_value = None
mock_action = MagicMock()
mock_queue = MagicMock()
shared_loader = None
new_stdin = None
job_vars = dict(omit="XXXXXXXXXXXXXXXXXXX")
te = TaskExecutor(
host=mock_host,
task=mock_task,
job_vars=job_vars,
play_context=mock_play_context,
new_stdin=new_stdin,
loader=fake_loader,
shared_loader_obj=shared_loader,
rslt_q=mock_queue,
)
te._get_connection = MagicMock(return_value=mock_connection)
te._get_action_handler = MagicMock(return_value=mock_action)
mock_action.run.return_value = dict(ansible_facts=dict())
res = te._execute()
mock_task.changed_when = MagicMock(return_value=AnsibleUnicode("1 == 1"))
res = te._execute()
mock_task.changed_when = None
mock_task.failed_when = MagicMock(return_value=AnsibleUnicode("1 == 1"))
res = te._execute()
mock_task.failed_when = None
mock_task.evaluate_conditional.return_value = False
res = te._execute()
mock_task.evaluate_conditional.return_value = True
mock_task.args = dict(_raw_params='foo.yml', a='foo', b='bar')
mock_task.action = 'include'
res = te._execute()
def test_task_executor_poll_async_result(self):
fake_loader = DictDataLoader({})
mock_host = MagicMock()
mock_task = MagicMock()
mock_task.async = 0.1
mock_task.poll = 0.05
mock_play_context = MagicMock()
mock_connection = MagicMock()
mock_action = MagicMock()
mock_queue = MagicMock()
shared_loader = MagicMock()
shared_loader.action_loader = action_loader
new_stdin = None
job_vars = dict(omit="XXXXXXXXXXXXXXXXXXX")
te = TaskExecutor(
host=mock_host,
task=mock_task,
job_vars=job_vars,
play_context=mock_play_context,
new_stdin=new_stdin,
loader=fake_loader,
shared_loader_obj=shared_loader,
rslt_q=mock_queue,
)
te._connection = MagicMock()
def _get(*args, **kwargs):
mock_action = MagicMock()
mock_action.run.return_value = dict(stdout='')
return mock_action
# testing with some bad values in the result passed to poll async,
# and with a bad value returned from the mock action
with patch.object(action_loader, 'get', _get):
mock_templar = MagicMock()
res = te._poll_async_result(result=dict(), templar=mock_templar)
self.assertIn('failed', res)
res = te._poll_async_result(result=dict(ansible_job_id=1), templar=mock_templar)
self.assertIn('failed', res)
def _get(*args, **kwargs):
mock_action = MagicMock()
mock_action.run.return_value = dict(finished=1)
return mock_action
# now testing with good values
with patch.object(action_loader, 'get', _get):
mock_templar = MagicMock()
res = te._poll_async_result(result=dict(ansible_job_id=1), templar=mock_templar)
self.assertEqual(res, dict(finished=1))