Porting tests to pytest (#33387)

* Porting tests to pytest

* Achievement Get: No longer need mock/generator.py
  * Now done via pytest's parametrization
  * Port safe_eval to pytest
  * Port text tests to pytest
  * Port test_set_mode_if_different to pytest

* Change conftest AnsibleModule fixtures to be more flexible
  * Move the AnsibleModules fixtures to module_utils/conftest.py for sharing
  * Testing the argspec code requires:
    * injecting both the argspec and the arguments.
    * Patching the arguments into sys.stdin at a different level

* More porting to obsolete mock/procenv.py
  * Port run_command to pytest
  * Port known_hosts tests to pytest
  * Port safe_eval to pytest
  * Port test_distribution_version.py to pytest
  * Port test_log to pytest
  * Port test__log_invocation to pytest
  * Remove unneeded import of procenv in test_postgresql

* Port test_pip to pytest style
  * As part of this, create a pytest ansiblemodule fixture in
    modules/conftest.py.  This is slightly different than the
    approach taken in module_utils because here we need to override the
    AnsibleModule that the modules will inherit from instead of one that
    we're instantiating ourselves.

* Fixup usage of parametrization in test_deprecate_warn

* Check that the pip module failed in our test
pull/33698/head
Toshio Kuratomi 7 years ago committed by GitHub
parent ed376abe42
commit cd36164239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,72 +0,0 @@
# Copyright 2016 Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from collections import Mapping
def make_method(func, args, kwargs):
def test_method(self):
func(self, *args, **kwargs)
# Format the argument string
arg_string = ', '.join(repr(a) for a in args)
kwarg_string = ', '.join('{0}={1}'.format(item[0], repr(item[1])) for item in kwargs.items())
arg_list = []
if arg_string:
arg_list.append(arg_string)
if kwarg_string:
arg_list.append(kwarg_string)
test_method.__name__ = 'test_{0}({1})'.format(func.__name__, ', '.join(arg_list))
return test_method
def add_method(func, *combined_args):
"""
Add a test case via a class decorator.
nose uses generators for this but doesn't work with unittest.TestCase
subclasses. So we have to write our own.
The first argument to this decorator is a test function. All subsequent
arguments are the arguments to create each generated test function with in
the following format:
Each set of arguments is a two-tuple. The first element is an iterable of
positional arguments. the second is a dict representing the kwargs.
"""
def wrapper(cls):
for combined_arg in combined_args:
if len(combined_arg) == 2:
args = combined_arg[0]
kwargs = combined_arg[1]
elif isinstance(combined_arg[0], Mapping):
args = []
kwargs = combined_arg[0]
else:
args = combined_arg[0]
kwargs = {}
test_method = make_method(func, args, kwargs)
setattr(cls, test_method.__name__, test_method)
return cls
return wrapper

@ -1,84 +1,55 @@
# -*- coding: utf-8 -*-
# (c) 2016, James Cammarata <jimi@sngx.net>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# (c) 2017, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import json
import sys
from units.mock.procenv import swap_stdin_and_argv
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import MagicMock
class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic__log_invocation(self):
with swap_stdin_and_argv(stdin_data=json.dumps(dict(
ANSIBLE_MODULE_ARGS=dict(foo=False, bar=[1, 2, 3], bam="bam", baz=u'baz')),
)):
from ansible.module_utils import basic
# test basic log invocation
basic._ANSIBLE_ARGS = None
am = basic.AnsibleModule(
argument_spec=dict(
foo=dict(default=True, type='bool'),
bar=dict(default=[], type='list'),
bam=dict(default="bam"),
baz=dict(default=u"baz"),
password=dict(default=True),
no_log=dict(default="you shouldn't see me", no_log=True),
),
)
am.log = MagicMock()
am._log_invocation()
# Message is generated from a dict so it will be in an unknown order.
# have to check this manually rather than with assert_called_with()
args = am.log.call_args[0]
self.assertEqual(len(args), 1)
message = args[0]
self.assertEqual(
len(message),
len('Invoked with bam=bam bar=[1, 2, 3] foo=False baz=baz no_log=NOT_LOGGING_PARAMETER password=NOT_LOGGING_PASSWORD')
)
self.assertTrue(message.startswith('Invoked with '))
self.assertIn(' bam=bam', message)
self.assertIn(' bar=[1, 2, 3]', message)
self.assertIn(' foo=False', message)
self.assertIn(' baz=baz', message)
self.assertIn(' no_log=NOT_LOGGING_PARAMETER', message)
self.assertIn(' password=NOT_LOGGING_PASSWORD', message)
kwargs = am.log.call_args[1]
self.assertEqual(
kwargs,
dict(log_args={
'foo': 'False',
'bar': '[1, 2, 3]',
'bam': 'bam',
'baz': 'baz',
'password': 'NOT_LOGGING_PASSWORD',
'no_log': 'NOT_LOGGING_PARAMETER',
})
)
import pytest
ARGS = dict(foo=False, bar=[1, 2, 3], bam="bam", baz=u'baz')
ARGUMENT_SPEC = dict(
foo=dict(default=True, type='bool'),
bar=dict(default=[], type='list'),
bam=dict(default="bam"),
baz=dict(default=u"baz"),
password=dict(default=True),
no_log=dict(default="you shouldn't see me", no_log=True),
)
@pytest.mark.parametrize('am, stdin', [(ARGUMENT_SPEC, ARGS)], indirect=['am', 'stdin'])
def test_module_utils_basic__log_invocation(am, mocker):
am.log = mocker.MagicMock()
am._log_invocation()
# Message is generated from a dict so it will be in an unknown order.
# have to check this manually rather than with assert_called_with()
args = am.log.call_args[0]
assert len(args) == 1
message = args[0]
assert len(message) == \
len('Invoked with bam=bam bar=[1, 2, 3] foo=False baz=baz no_log=NOT_LOGGING_PARAMETER password=NOT_LOGGING_PASSWORD')
assert message.startswith('Invoked with ')
assert ' bam=bam' in message
assert ' bar=[1, 2, 3]' in message
assert ' foo=False' in message
assert ' baz=baz' in message
assert ' no_log=NOT_LOGGING_PARAMETER' in message
assert ' password=NOT_LOGGING_PASSWORD' in message
kwargs = am.log.call_args[1]
assert kwargs == \
dict(log_args={
'foo': 'False',
'bar': '[1, 2, 3]',
'bam': 'bam',
'baz': 'baz',
'password': 'NOT_LOGGING_PASSWORD',
'no_log': 'NOT_LOGGING_PARAMETER',
})

@ -6,44 +6,46 @@ __metaclass__ = type
import json
from ansible.compat.tests import unittest
import pytest
from ansible.compat.tests.mock import MagicMock
from units.mock.procenv import swap_stdin_and_argv, swap_stdout
from ansible.module_utils import basic
class TestCallableTypeValidation(unittest.TestCase):
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS=dict(arg="42")))
self.stdin_swap_ctx = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap_ctx.__enter__()
MOCK_VALIDATOR_SUCCESS = MagicMock(return_value=42)
MOCK_VALIDATOR_FAIL = MagicMock(side_effect=TypeError("bad conversion"))
# Data is argspec, argument, expected
VALID_SPECS = (
({'arg': {'type': int}}, {'arg': 42}, 42),
({'arg': {'type': int}}, {'arg': '42'}, 42),
({'arg': {'type': MOCK_VALIDATOR_SUCCESS}}, {'arg': 42}, 42),
)
# since we can't use context managers and "with" without overriding run(), call them directly
self.stdout_swap_ctx = swap_stdout()
self.fake_stream = self.stdout_swap_ctx.__enter__()
INVALID_SPECS = (
({'arg': {'type': int}}, {'arg': "bad"}, "invalid literal for int() with base 10: 'bad'"),
({'arg': {'type': MOCK_VALIDATOR_FAIL}}, {'arg': "bad"}, "bad conversion"),
)
basic._ANSIBLE_ARGS = None
def tearDown(self):
# since we can't use context managers and "with" without overriding run(), call them directly to clean up
self.stdin_swap_ctx.__exit__(None, None, None)
self.stdout_swap_ctx.__exit__(None, None, None)
@pytest.mark.parametrize('argspec, expected, am, stdin', [(s[0], s[2], s[0], s[1]) for s in VALID_SPECS],
indirect=['am', 'stdin'])
def test_validator_success(am, mocker, argspec, expected):
def test_validate_success(self):
mock_validator = MagicMock(return_value=42)
m = basic.AnsibleModule(argument_spec=dict(
arg=dict(type=mock_validator)
))
type_ = argspec['arg']['type']
if isinstance(type_, MagicMock):
assert type_.called
else:
assert isinstance(am.params['arg'], type_)
assert am.params['arg'] == expected
self.assertTrue(mock_validator.called)
self.assertEqual(m.params['arg'], 42)
self.assertEqual(type(m.params['arg']), int)
def test_validate_fail(self):
mock_validator = MagicMock(side_effect=TypeError("bad conversion"))
with self.assertRaises(SystemExit) as ecm:
m = basic.AnsibleModule(argument_spec=dict(
arg=dict(type=mock_validator)
))
@pytest.mark.parametrize('argspec, expected, stdin', [(s[0], s[2], s[1]) for s in INVALID_SPECS],
indirect=['stdin'])
def test_validator_fail(stdin, capfd, argspec, expected):
with pytest.raises(SystemExit) as ecm:
m = basic.AnsibleModule(argument_spec=argspec)
self.assertIn("bad conversion", json.loads(self.fake_stream.getvalue())['msg'])
out, err = capfd.readouterr()
assert not err
assert expected in json.loads(out)['msg']
assert json.loads(out)['failed']

@ -1,19 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import json
import sys
@ -27,49 +15,7 @@ from ansible.module_utils.six import PY3, string_types
from ansible.module_utils._text import to_bytes
@pytest.fixture
def stdin(mocker, request):
if isinstance(request.param, string_types):
args = request.param
elif isinstance(request.param, MutableMapping):
if 'ANSIBLE_MODULE_ARGS' not in request.param:
request.param = {'ANSIBLE_MODULE_ARGS': request.param}
args = json.dumps(request.param)
else:
raise Exception('Malformed data to the stdin pytest fixture')
real_stdin = sys.stdin
fake_stdin = BytesIO(to_bytes(args, errors='surrogate_or_strict'))
if PY3:
sys.stdin = mocker.MagicMock()
sys.stdin.buffer = fake_stdin
else:
sys.stdin = fake_stdin
yield fake_stdin
sys.stdin = real_stdin
@pytest.fixture
def am(stdin, request):
old_args = ansible.module_utils.basic._ANSIBLE_ARGS
ansible.module_utils.basic._ANSIBLE_ARGS = None
old_argv = sys.argv
sys.argv = ['ansible_unittest']
am = ansible.module_utils.basic.AnsibleModule(
argument_spec=dict(),
)
am._name = 'ansible_unittest'
yield am
ansible.module_utils.basic._ANSIBLE_ARGS = old_args
sys.argv = old_argv
@pytest.mark.parametrize('stdin', [{}], indirect=True)
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_warn(am, capfd):
am.warn('warning1')
@ -80,7 +26,7 @@ def test_warn(am, capfd):
assert json.loads(out)['warnings'] == ['warning1', 'warning2']
@pytest.mark.parametrize('stdin', [{}], indirect=True)
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_deprecate(am, capfd):
am.deprecate('deprecation1')
am.deprecate('deprecation2', '2.3')
@ -99,7 +45,7 @@ def test_deprecate(am, capfd):
]
@pytest.mark.parametrize('stdin', [{}], indirect=True)
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_deprecate_without_list(am, capfd):
with pytest.raises(SystemExit):
am.exit_json(deprecations='Simple deprecation warning')

@ -1,98 +1,76 @@
# -*- coding: utf-8 -*-
# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright (c) 2015-2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
__metaclass__ = type
import copy
import json
import sys
from itertools import chain
from ansible.compat.tests import unittest
from ansible.module_utils import basic
from units.mock.procenv import swap_stdin_and_argv, swap_stdout
empty_invocation = {u'module_args': {}}
class TestAnsibleModuleExitJson(unittest.TestCase):
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}))
self.stdin_swap_ctx = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap_ctx.__enter__()
# since we can't use context managers and "with" without overriding run(), call them directly
self.stdout_swap_ctx = swap_stdout()
self.fake_stream = self.stdout_swap_ctx.__enter__()
basic._ANSIBLE_ARGS = None
self.module = basic.AnsibleModule(argument_spec=dict())
def tearDown(self):
# since we can't use context managers and "with" without overriding run(), call them directly to clean up
self.stdin_swap_ctx.__exit__(None, None, None)
self.stdout_swap_ctx.__exit__(None, None, None)
def test_exit_json_no_args_exits(self):
with self.assertRaises(SystemExit) as ctx:
self.module.exit_json()
if isinstance(ctx.exception, int):
# Python2.6... why does sys.exit behave this way?
self.assertEquals(ctx.exception, 0)
else:
self.assertEquals(ctx.exception.code, 0)
return_val = json.loads(self.fake_stream.getvalue())
self.assertEquals(return_val, dict(invocation=empty_invocation))
def test_exit_json_args_exits(self):
with self.assertRaises(SystemExit) as ctx:
self.module.exit_json(msg='message')
if isinstance(ctx.exception, int):
# Python2.6... why does sys.exit behave this way?
self.assertEquals(ctx.exception, 0)
else:
self.assertEquals(ctx.exception.code, 0)
return_val = json.loads(self.fake_stream.getvalue())
self.assertEquals(return_val, dict(msg="message", invocation=empty_invocation))
def test_fail_json_exits(self):
with self.assertRaises(SystemExit) as ctx:
self.module.fail_json(msg='message')
if isinstance(ctx.exception, int):
# Python2.6... why does sys.exit behave this way?
self.assertEquals(ctx.exception, 1)
else:
self.assertEquals(ctx.exception.code, 1)
return_val = json.loads(self.fake_stream.getvalue())
self.assertEquals(return_val, dict(msg="message", failed=True, invocation=empty_invocation))
def test_exit_json_proper_changed(self):
with self.assertRaises(SystemExit) as ctx:
self.module.exit_json(changed=True, msg='success')
return_val = json.loads(self.fake_stream.getvalue())
self.assertEquals(return_val, dict(changed=True, msg='success', invocation=empty_invocation))
class TestAnsibleModuleExitValuesRemoved(unittest.TestCase):
import pytest
EMPTY_INVOCATION = {u'module_args': {}}
class TestAnsibleModuleExitJson:
"""
Test that various means of calling exitJson and FailJson return the messages they've been given
"""
DATA = (
({}, {'invocation': EMPTY_INVOCATION}),
({'msg': 'message'}, {'msg': 'message', 'invocation': EMPTY_INVOCATION}),
({'msg': 'success', 'changed': True},
{'msg': 'success', 'changed': True, 'invocation': EMPTY_INVOCATION}),
({'msg': 'nochange', 'changed': False},
{'msg': 'nochange', 'changed': False, 'invocation': EMPTY_INVOCATION}),
)
# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize('args, expected, stdin', ((a, e, {}) for a, e in DATA), indirect=['stdin'])
def test_exit_json_exits(self, am, capfd, args, expected):
with pytest.raises(SystemExit) as ctx:
am.exit_json(**args)
assert ctx.value.code == 0
out, err = capfd.readouterr()
return_val = json.loads(out)
assert return_val == expected
# Fail_json is only legal if it's called with a message
# pylint bug: https://github.com/PyCQA/pylint/issues/511
@pytest.mark.parametrize('args, expected, stdin',
((a, e, {}) for a, e in DATA if 'msg' in a), # pylint: disable=undefined-variable
indirect=['stdin'])
def test_fail_json_exits(self, am, capfd, args, expected):
with pytest.raises(SystemExit) as ctx:
am.fail_json(**args)
assert ctx.value.code == 1
out, err = capfd.readouterr()
return_val = json.loads(out)
# Fail_json should add failed=True
expected['failed'] = True
assert return_val == expected
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_fail_json_no_msg(self, am):
with pytest.raises(AssertionError) as ctx:
am.fail_json()
assert ctx.value.args[0] == "implementation error -- msg to explain the error is required"
class TestAnsibleModuleExitValuesRemoved:
"""
Test that ExitJson and FailJson remove password-like values
"""
OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
dataset = (
DATA = (
(
dict(username='person', password='$ecret k3y'),
dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
@ -119,43 +97,27 @@ class TestAnsibleModuleExitValuesRemoved(unittest.TestCase):
),
)
def test_exit_json_removes_values(self):
self.maxDiff = None
for args, return_val, expected in self.dataset:
params = dict(ANSIBLE_MODULE_ARGS=args)
params = json.dumps(params)
with swap_stdin_and_argv(stdin_data=params):
with swap_stdout():
basic._ANSIBLE_ARGS = None
module = basic.AnsibleModule(
argument_spec=dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
with self.assertRaises(SystemExit) as ctx:
self.assertEquals(module.exit_json(**return_val), expected)
self.assertEquals(json.loads(sys.stdout.getvalue()), expected)
def test_fail_json_removes_values(self):
self.maxDiff = None
for args, return_val, expected in self.dataset:
expected = copy.deepcopy(expected)
expected['failed'] = True
params = dict(ANSIBLE_MODULE_ARGS=args)
params = json.dumps(params)
with swap_stdin_and_argv(stdin_data=params):
with swap_stdout():
basic._ANSIBLE_ARGS = None
module = basic.AnsibleModule(
argument_spec=dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
with self.assertRaises(SystemExit) as ctx:
self.assertEquals(module.fail_json(**return_val), expected)
self.assertEquals(json.loads(sys.stdout.getvalue()), expected)
# pylint bug: https://github.com/PyCQA/pylint/issues/511
@pytest.mark.parametrize('am, stdin, return_val, expected',
(({'username': {}, 'password': {'no_log': True}, 'token': {'no_log': True}}, s, r, e)
for s, r, e in DATA), # pylint: disable=undefined-variable
indirect=['am', 'stdin'])
def test_exit_json_removes_values(self, am, capfd, return_val, expected):
with pytest.raises(SystemExit) as ctx:
am.exit_json(**return_val)
out, err = capfd.readouterr()
assert json.loads(out) == expected
# pylint bug: https://github.com/PyCQA/pylint/issues/511
@pytest.mark.parametrize('am, stdin, return_val, expected',
(({'username': {}, 'password': {'no_log': True}, 'token': {'no_log': True}}, s, r, e)
for s, r, e in DATA), # pylint: disable=undefined-variable
indirect=['am', 'stdin'])
def test_fail_json_removes_values(self, am, capfd, return_val, expected):
expected['failed'] = True
with pytest.raises(SystemExit) as ctx:
am.fail_json(**return_val) == expected
out, err = capfd.readouterr()
assert json.loads(out) == expected

@ -1,35 +1,20 @@
# -*- coding: utf-8 -*-
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# (c) 2017, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import sys
import json
import syslog
from itertools import product
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from units.mock.procenv import swap_stdin_and_argv
import pytest
import ansible.module_utils.basic
from ansible.module_utils.six import PY3
try:
# Python 3.4+
@ -41,80 +26,47 @@ except ImportError:
pass
class TestAnsibleModuleSysLogSmokeTest(unittest.TestCase):
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
ansible.module_utils.basic._ANSIBLE_ARGS = None
self.am = ansible.module_utils.basic.AnsibleModule(
argument_spec=dict(),
)
self.am._name = 'unittest'
self.has_journal = ansible.module_utils.basic.has_journal
if self.has_journal:
# Systems with journal can still test syslog
ansible.module_utils.basic.has_journal = False
def tearDown(self):
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
ansible.module_utils.basic.has_journal = self.has_journal
class TestAnsibleModuleLogSmokeTest:
DATA = [u'Text string', u'Toshio くらとみ non-ascii test']
DATA = DATA + [d.encode('utf-8') for d in DATA]
DATA += [b'non-utf8 :\xff: test']
def test_smoketest_syslog(self):
# pylint bug: https://github.com/PyCQA/pylint/issues/511
@pytest.mark.parametrize('msg, stdin', ((m, {}) for m in DATA), indirect=['stdin']) # pylint: disable=undefined-variable
def test_smoketest_syslog(self, am, mocker, msg):
# These talk to the live daemons on the system. Need to do this to
# show that what we send doesn't cause an issue once it gets to the
# daemon. These are just smoketests to test that we don't fail.
mocker.patch('ansible.module_utils.basic.has_journal', False)
self.am.log(u'Text string')
self.am.log(u'Toshio くらとみ non-ascii test')
self.am.log(b'Byte string')
self.am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8'))
self.am.log(b'non-utf8 :\xff: test')
class TestAnsibleModuleJournaldSmokeTest(unittest.TestCase):
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
am.log(u'Text string')
am.log(u'Toshio くらとみ non-ascii test')
ansible.module_utils.basic._ANSIBLE_ARGS = None
self.am = ansible.module_utils.basic.AnsibleModule(
argument_spec=dict(),
)
self.am._name = 'unittest'
am.log(b'Byte string')
am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8'))
am.log(b'non-utf8 :\xff: test')
def tearDown(self):
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
@unittest.skipUnless(ansible.module_utils.basic.has_journal, 'python systemd bindings not installed')
def test_smoketest_journal(self):
@pytest.mark.skipif(not ansible.module_utils.basic.has_journal, reason='python systemd bindings not installed')
# pylint bug: https://github.com/PyCQA/pylint/issues/511
@pytest.mark.parametrize('msg, stdin', ((m, {}) for m in DATA), indirect=['stdin']) # pylint: disable=undefined-variable
def test_smoketest_journal(self, am, mocker, msg):
# These talk to the live daemons on the system. Need to do this to
# show that what we send doesn't cause an issue once it gets to the
# daemon. These are just smoketests to test that we don't fail.
mocker.patch('ansible.module_utils.basic.has_journal', True)
self.am.log(u'Text string')
self.am.log(u'Toshio くらとみ non-ascii test')
am.log(u'Text string')
am.log(u'Toshio くらとみ non-ascii test')
self.am.log(b'Byte string')
self.am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8'))
self.am.log(b'non-utf8 :\xff: test')
am.log(b'Byte string')
am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8'))
am.log(b'non-utf8 :\xff: test')
class TestAnsibleModuleLogSyslog(unittest.TestCase):
class TestAnsibleModuleLogSyslog:
"""Test the AnsibleModule Log Method"""
py2_output_data = {
PY2_OUTPUT_DATA = {
u'Text string': b'Text string',
u'Toshio くらとみ non-ascii test': u'Toshio くらとみ non-ascii test'.encode('utf-8'),
b'Byte string': b'Byte string',
@ -122,7 +74,7 @@ class TestAnsibleModuleLogSyslog(unittest.TestCase):
b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'.decode('utf-8', 'replace').encode('utf-8'),
}
py3_output_data = {
PY3_OUTPUT_DATA = {
u'Text string': u'Text string',
u'Toshio くらとみ non-ascii test': u'Toshio くらとみ non-ascii test',
b'Byte string': u'Byte string',
@ -130,60 +82,37 @@ class TestAnsibleModuleLogSyslog(unittest.TestCase):
b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'.decode('utf-8', 'replace')
}
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
ansible.module_utils.basic._ANSIBLE_ARGS = None
self.am = ansible.module_utils.basic.AnsibleModule(
argument_spec=dict(),
)
self.am._name = 'unittest'
self.has_journal = ansible.module_utils.basic.has_journal
if self.has_journal:
# Systems with journal can still test syslog
ansible.module_utils.basic.has_journal = False
def tearDown(self):
# teardown/reset
ansible.module_utils.basic.has_journal = self.has_journal
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
@patch('syslog.syslog', autospec=True)
def test_no_log(self, mock_func):
no_log = self.am.no_log
self.am.no_log = True
self.am.log('unittest no_log')
self.assertFalse(mock_func.called)
self.am.no_log = False
self.am.log('unittest no_log')
mock_func.assert_called_once_with(syslog.LOG_INFO, 'unittest no_log')
self.am.no_log = no_log
def test_output_matches(self):
if sys.version_info >= (3,):
output_data = self.py3_output_data
@pytest.mark.parametrize('no_log, stdin', (product((True, False), [{}])), indirect=['stdin'])
def test_no_log(self, am, mocker, no_log):
"""Test that when no_log is set, logging does not occur"""
mock_syslog = mocker.patch('syslog.syslog', autospec=True)
mocker.patch('ansible.module_utils.basic.has_journal', False)
am.no_log = no_log
am.log('unittest no_log')
if no_log:
assert not mock_syslog.called
else:
output_data = self.py2_output_data
mock_syslog.assert_called_once_with(syslog.LOG_INFO, 'unittest no_log')
# pylint bug: https://github.com/PyCQA/pylint/issues/511
@pytest.mark.parametrize('msg, param, stdin',
((m, p, {}) for m, p in
(PY3_OUTPUT_DATA.items() if PY3 else PY2_OUTPUT_DATA.items())), # pylint: disable=undefined-variable
indirect=['stdin'])
def test_output_matches(self, am, mocker, msg, param):
"""Check that log messages are sent correctly"""
mocker.patch('ansible.module_utils.basic.has_journal', False)
mock_syslog = mocker.patch('syslog.syslog', autospec=True)
for msg, param in output_data.items():
with patch('syslog.syslog', autospec=True) as mock_func:
self.am.log(msg)
mock_func.assert_called_once_with(syslog.LOG_INFO, param)
am.log(msg)
mock_syslog.assert_called_once_with(syslog.LOG_INFO, param)
class TestAnsibleModuleLogJournal(unittest.TestCase):
@pytest.mark.skipif(not ansible.module_utils.basic.has_journal, reason='python systemd bindings not installed')
class TestAnsibleModuleLogJournal:
"""Test the AnsibleModule Log Method"""
output_data = {
OUTPUT_DATA = {
u'Text string': u'Text string',
u'Toshio くらとみ non-ascii test': u'Toshio くらとみ non-ascii test',
b'Byte string': u'Byte string',
@ -191,82 +120,43 @@ class TestAnsibleModuleLogJournal(unittest.TestCase):
b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'.decode('utf-8', 'replace')
}
# overriding run lets us use context managers for setup/teardown-esque behavior
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
ansible.module_utils.basic._ANSIBLE_ARGS = None
self.am = ansible.module_utils.basic.AnsibleModule(
argument_spec=dict(),
)
self.am._name = 'unittest'
self.has_journal = ansible.module_utils.basic.has_journal
ansible.module_utils.basic.has_journal = True
self.module_patcher = None
# In case systemd-python is not installed
if not self.has_journal:
self.module_patcher = patch.dict('sys.modules', {'systemd': MagicMock(), 'systemd.journal': MagicMock()})
self.module_patcher.start()
try:
reload(ansible.module_utils.basic)
except NameError:
self._fake_out_reload(ansible.module_utils.basic)
def tearDown(self):
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
# teardown/reset
ansible.module_utils.basic.has_journal = self.has_journal
if self.module_patcher:
self.module_patcher.stop()
reload(ansible.module_utils.basic)
@patch('systemd.journal.send')
def test_no_log(self, mock_func):
no_log = self.am.no_log
self.am.no_log = True
self.am.log('unittest no_log')
self.assertFalse(mock_func.called)
self.am.no_log = False
self.am.log('unittest no_log')
self.assertEqual(mock_func.called, 1)
# Message
# call_args is a 2-tuple of (arg_list, kwarg_dict)
self.assertTrue(mock_func.call_args[0][0].endswith('unittest no_log'), msg='Message was not sent to log')
# log adds this journal field
self.assertIn('MODULE', mock_func.call_args[1])
self.assertIn('basic.py', mock_func.call_args[1]['MODULE'])
self.am.no_log = no_log
def test_output_matches(self):
for msg, param in self.output_data.items():
with patch('systemd.journal.send', autospec=True) as mock_func:
self.am.log(msg)
self.assertEqual(mock_func.call_count, 1, msg='journal.send not called exactly once')
self.assertTrue(mock_func.call_args[0][0].endswith(param))
@patch('systemd.journal.send')
def test_log_args(self, mock_func):
self.am.log('unittest log_args', log_args=dict(TEST='log unittest'))
self.assertEqual(mock_func.called, 1)
self.assertTrue(mock_func.call_args[0][0].endswith('unittest log_args'), msg='Message was not sent to log')
@pytest.mark.parametrize('no_log, stdin', (product((True, False), [{}])), indirect=['stdin'])
def test_no_log(self, am, mocker, no_log):
journal_send = mocker.patch('systemd.journal.send')
am.no_log = no_log
am.log('unittest no_log')
if no_log:
assert not journal_send.called
else:
assert journal_send.called == 1
# Message
# call_args is a 2-tuple of (arg_list, kwarg_dict)
assert journal_send.call_args[0][0].endswith('unittest no_log'), 'Message was not sent to log'
# log adds this journal field
assert 'MODULE' in journal_send.call_args[1]
assert 'basic.py' in journal_send.call_args[1]['MODULE']
# pylint bug: https://github.com/PyCQA/pylint/issues/511
@pytest.mark.parametrize('msg, param, stdin',
((m, p, {}) for m, p in OUTPUT_DATA.items()), # pylint: disable=undefined-variable
indirect=['stdin'])
def test_output_matches(self, am, mocker, msg, param):
journal_send = mocker.patch('systemd.journal.send')
am.log(msg)
assert journal_send.call_count == 1, 'journal.send not called exactly once'
assert journal_send.call_args[0][0].endswith(param)
@pytest.mark.parametrize('stdin', ({},), indirect=['stdin'])
def test_log_args(self, am, mocker):
journal_send = mocker.patch('systemd.journal.send')
am.log('unittest log_args', log_args=dict(TEST='log unittest'))
assert journal_send.called == 1
assert journal_send.call_args[0][0].endswith('unittest log_args'), 'Message was not sent to log'
# log adds this journal field
self.assertIn('MODULE', mock_func.call_args[1])
self.assertIn('basic.py', mock_func.call_args[1]['MODULE'])
assert 'MODULE' in journal_send.call_args[1]
assert 'basic.py' in journal_send.call_args[1]['MODULE']
# We added this journal field
self.assertIn('TEST', mock_func.call_args[1])
self.assertIn('log unittest', mock_func.call_args[1]['TEST'])
assert 'TEST' in journal_send.call_args[1]
assert 'log unittest' in journal_send.call_args[1]['TEST']

@ -1,38 +1,18 @@
# -*- coding: utf-8 -*-
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
__metaclass__ = type
import errno
import json
import sys
import time
from io import BytesIO, StringIO
from itertools import product
from io import BytesIO
import pytest
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import call, MagicMock, Mock, patch, sentinel
from ansible.module_utils.six import PY3
import ansible.module_utils.basic
from units.mock.procenv import swap_stdin_and_argv
from ansible.module_utils._text import to_native
class OpenBytesIO(BytesIO):
@ -45,165 +25,182 @@ class OpenBytesIO(BytesIO):
pass
class TestAnsibleModuleRunCommand(unittest.TestCase):
@pytest.fixture(autouse=True)
def run_command_mocked_env(self, mocker):
self.cmd_out = {
# os.read() is returning 'bytes', not strings
sentinel.stdout: BytesIO(),
sentinel.stderr: BytesIO(),
}
def mock_os_read(fd, nbytes):
return self.cmd_out[fd].read(nbytes)
def mock_select(rlist, wlist, xlist, timeout=1):
return (rlist, [], [])
def mock_os_chdir(path):
if path == '/inaccessible':
raise OSError(errno.EPERM, "Permission denied: '/inaccessible'")
def mock_os_abspath(path):
if path.startswith('/'):
return path
else:
return self.os.getcwd.return_value + '/' + path
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
ansible.module_utils.basic._ANSIBLE_ARGS = None
self.module = ansible.module_utils.basic.AnsibleModule(argument_spec=dict())
self.module.fail_json = MagicMock(side_effect=SystemExit)
self.os = mocker.patch('ansible.module_utils.basic.os')
self.os.path.expandvars.side_effect = lambda x: x
self.os.path.expanduser.side_effect = lambda x: x
self.os.environ = {'PATH': '/bin'}
self.os.getcwd.return_value = '/home/foo'
self.os.path.isdir.return_value = True
self.os.chdir.side_effect = mock_os_chdir
self.os.read.side_effect = mock_os_read
self.os.path.abspath.side_effect = mock_os_abspath
self.subprocess = mocker.patch('ansible.module_utils.basic.subprocess')
self.cmd = Mock()
self.cmd.returncode = 0
self.cmd.stdin = OpenBytesIO()
self.cmd.stdout.fileno.return_value = sentinel.stdout
self.cmd.stderr.fileno.return_value = sentinel.stderr
self.subprocess.Popen.return_value = self.cmd
self.select = mocker.patch('ansible.module_utils.basic.select')
self.select.select.side_effect = mock_select
yield
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
def test_list_as_args(self):
self.module.run_command(['/bin/ls', 'a', ' b', 'c '])
self.assertTrue(self.subprocess.Popen.called)
args, kwargs = self.subprocess.Popen.call_args
self.assertEqual(args, (['/bin/ls', 'a', ' b', 'c '], ))
self.assertEqual(kwargs['shell'], False)
def test_str_as_args(self):
self.module.run_command('/bin/ls a " b" "c "')
self.assertTrue(self.subprocess.Popen.called)
args, kwargs = self.subprocess.Popen.call_args
self.assertEqual(args, (['/bin/ls', 'a', ' b', 'c '], ))
self.assertEqual(kwargs['shell'], False)
def test_tuple_as_args(self):
self.assertRaises(SystemExit, self.module.run_command, ('ls', '/'))
self.assertTrue(self.module.fail_json.called)
def test_unsafe_shell(self):
self.module.run_command('ls a " b" "c "', use_unsafe_shell=True)
self.assertTrue(self.subprocess.Popen.called)
args, kwargs = self.subprocess.Popen.call_args
self.assertEqual(args, ('ls a " b" "c "', ))
self.assertEqual(kwargs['shell'], True)
def test_cwd(self):
self.os.getcwd.return_value = '/old'
self.module.run_command('/bin/ls', cwd='/new')
self.assertEqual(self.os.chdir.mock_calls,
[call('/new'), call('/old'), ])
def test_cwd_relative_path(self):
self.os.getcwd.return_value = '/old'
self.module.run_command('/bin/ls', cwd='sub-dir')
self.assertEqual(self.os.chdir.mock_calls,
[call('/old/sub-dir'), call('/old'), ])
def test_cwd_not_a_dir(self):
self.os.getcwd.return_value = '/old'
self.os.path.isdir.side_effect = lambda d: d != '/not-a-dir'
self.module.run_command('/bin/ls', cwd='/not-a-dir')
self.assertEqual(self.os.chdir.mock_calls, [call('/old'), ])
def test_cwd_inaccessible(self):
self.assertRaises(SystemExit, self.module.run_command, '/bin/ls', cwd='/inaccessible')
self.assertTrue(self.module.fail_json.called)
args, kwargs = self.module.fail_json.call_args
self.assertEqual(kwargs['rc'], errno.EPERM)
def test_prompt_bad_regex(self):
self.assertRaises(SystemExit, self.module.run_command, 'foo', prompt_regex='[pP)assword:')
self.assertTrue(self.module.fail_json.called)
def test_prompt_no_match(self):
self.cmd_out[sentinel.stdout] = BytesIO(b'hello')
(rc, _, _) = self.module.run_command('foo', prompt_regex='[pP]assword:')
self.assertEqual(rc, 0)
def test_prompt_match_wo_data(self):
self.cmd_out[sentinel.stdout] = BytesIO(b'Authentication required!\nEnter password: ')
(rc, _, _) = self.module.run_command('foo', prompt_regex=r'[pP]assword:', data=None)
self.assertEqual(rc, 257)
def test_check_rc_false(self):
self.cmd.returncode = 1
(rc, _, _) = self.module.run_command('/bin/false', check_rc=False)
self.assertEqual(rc, 1)
def test_check_rc_true(self):
self.cmd.returncode = 1
self.assertRaises(SystemExit, self.module.run_command, '/bin/false', check_rc=True)
self.assertTrue(self.module.fail_json.called)
args, kwargs = self.module.fail_json.call_args
self.assertEqual(kwargs['rc'], 1)
def test_text_stdin(self):
(rc, stdout, stderr) = self.module.run_command('/bin/foo', data='hello world')
self.assertEqual(self.cmd.stdin.getvalue(), b'hello world\n')
def test_ascii_stdout(self):
self.cmd_out[sentinel.stdout] = BytesIO(b'hello')
(rc, stdout, stderr) = self.module.run_command('/bin/cat hello.txt')
self.assertEqual(rc, 0)
@pytest.fixture
def mock_os(mocker):
def mock_os_read(fd, nbytes):
return os._cmd_out[fd].read(nbytes)
def mock_os_chdir(path):
if path == '/inaccessible':
raise OSError(errno.EPERM, "Permission denied: '/inaccessible'")
def mock_os_abspath(path):
if path.startswith('/'):
return path
else:
return os.getcwd.return_value + '/' + path
os = mocker.patch('ansible.module_utils.basic.os')
os._cmd_out = {
# os.read() is returning 'bytes', not strings
mocker.sentinel.stdout: BytesIO(),
mocker.sentinel.stderr: BytesIO(),
}
os.path.expandvars.side_effect = lambda x: x
os.path.expanduser.side_effect = lambda x: x
os.environ = {'PATH': '/bin'}
os.getcwd.return_value = '/home/foo'
os.path.isdir.return_value = True
os.chdir.side_effect = mock_os_chdir
os.read.side_effect = mock_os_read
os.path.abspath.side_effect = mock_os_abspath
yield os
@pytest.fixture
def mock_subprocess(mocker):
def mock_select(rlist, wlist, xlist, timeout=1):
return (rlist, [], [])
fake_select = mocker.patch('ansible.module_utils.basic.select')
fake_select.select.side_effect = mock_select
subprocess = mocker.patch('ansible.module_utils.basic.subprocess')
cmd = mocker.MagicMock()
cmd.returncode = 0
cmd.stdin = OpenBytesIO()
cmd.stdout.fileno.return_value = mocker.sentinel.stdout
cmd.stderr.fileno.return_value = mocker.sentinel.stderr
subprocess.Popen.return_value = cmd
yield subprocess
@pytest.fixture()
def rc_am(mocker, am, mock_os, mock_subprocess):
am.fail_json = mocker.MagicMock(side_effect=SystemExit)
am._os = mock_os
am._subprocess = mock_subprocess
yield am
class TestRunCommandArgs:
# Format is command as passed to run_command, command to Popen as list, command to Popen as string
ARGS_DATA = (
(['/bin/ls', 'a', 'b', 'c'], ['/bin/ls', 'a', 'b', 'c'], '/bin/ls a b c'),
('/bin/ls a " b" "c "', ['/bin/ls', 'a', ' b', 'c '], '/bin/ls a " b" "c "'),
)
# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize('cmd, expected, shell, stdin',
((arg, cmd_str if sh else cmd_lst, sh, {})
for (arg, cmd_lst, cmd_str), sh in product(ARGS_DATA, (True, False))),
indirect=['stdin'])
def test_args(self, cmd, expected, shell, rc_am):
rc_am.run_command(cmd, use_unsafe_shell=shell)
assert rc_am._subprocess.Popen.called
args, kwargs = rc_am._subprocess.Popen.call_args
assert args == (expected, )
assert kwargs['shell'] == shell
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_tuple_as_args(self, rc_am):
with pytest.raises(SystemExit):
rc_am.run_command(('ls', '/'))
assert rc_am.fail_json.called
class TestRunCommandCwd:
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_cwd(self, mocker, rc_am):
rc_am._os.getcwd.return_value = '/old'
rc_am.run_command('/bin/ls', cwd='/new')
assert rc_am._os.chdir.mock_calls == [mocker.call('/new'), mocker.call('/old'), ]
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_cwd_relative_path(self, mocker, rc_am):
rc_am._os.getcwd.return_value = '/old'
rc_am.run_command('/bin/ls', cwd='sub-dir')
assert rc_am._os.chdir.mock_calls == [mocker.call('/old/sub-dir'), mocker.call('/old'), ]
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_cwd_not_a_dir(self, mocker, rc_am):
rc_am._os.getcwd.return_value = '/old'
rc_am._os.path.isdir.side_effect = lambda d: d != '/not-a-dir'
rc_am.run_command('/bin/ls', cwd='/not-a-dir')
assert rc_am._os.chdir.mock_calls == [mocker.call('/old'), ]
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_cwd_inaccessible(self, rc_am):
with pytest.raises(SystemExit):
rc_am.run_command('/bin/ls', cwd='/inaccessible')
assert rc_am.fail_json.called
args, kwargs = rc_am.fail_json.call_args
assert kwargs['rc'] == errno.EPERM
class TestRunCommandPrompt:
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_prompt_bad_regex(self, rc_am):
with pytest.raises(SystemExit):
rc_am.run_command('foo', prompt_regex='[pP)assword:')
assert rc_am.fail_json.called
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_prompt_no_match(self, mocker, rc_am):
rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'hello')
(rc, _, _) = rc_am.run_command('foo', prompt_regex='[pP]assword:')
assert rc == 0
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_prompt_match_wo_data(self, mocker, rc_am):
rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'Authentication required!\nEnter password: ')
(rc, _, _) = rc_am.run_command('foo', prompt_regex=r'[pP]assword:', data=None)
assert rc == 257
class TestRunCommandRc:
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_check_rc_false(self, rc_am):
rc_am._subprocess.Popen.return_value.returncode = 1
(rc, _, _) = rc_am.run_command('/bin/false', check_rc=False)
assert rc == 1
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_check_rc_true(self, rc_am):
rc_am._subprocess.Popen.return_value.returncode = 1
with pytest.raises(SystemExit):
rc_am.run_command('/bin/false', check_rc=True)
assert rc_am.fail_json.called
args, kwargs = rc_am.fail_json.call_args
assert kwargs['rc'] == 1
class TestRunCommandOutput:
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_text_stdin(self, rc_am):
(rc, stdout, stderr) = rc_am.run_command('/bin/foo', data='hello world')
assert rc_am._subprocess.Popen.return_value.stdin.getvalue() == b'hello world\n'
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_ascii_stdout(self, mocker, rc_am):
rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'hello')
(rc, stdout, stderr) = rc_am.run_command('/bin/cat hello.txt')
assert rc == 0
# module_utils function. On py3 it returns text and py2 it returns
# bytes because it's returning native strings
if PY3:
self.assertEqual(stdout, u'hello')
else:
self.assertEqual(stdout, b'hello')
def test_utf8_output(self):
self.cmd_out[sentinel.stdout] = BytesIO(u'Žarn§'.encode('utf-8'))
self.cmd_out[sentinel.stderr] = BytesIO(u'لرئيسية'.encode('utf-8'))
(rc, stdout, stderr) = self.module.run_command('/bin/something_ugly')
self.assertEqual(rc, 0)
assert stdout == 'hello'
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
def test_utf8_output(self, mocker, rc_am):
rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(u'Žarn§'.encode('utf-8'))
rc_am._os._cmd_out[mocker.sentinel.stderr] = BytesIO(u'لرئيسية'.encode('utf-8'))
(rc, stdout, stderr) = rc_am.run_command('/bin/something_ugly')
assert rc == 0
# module_utils function. On py3 it returns text and py2 it returns
# bytes because it's returning native strings
if PY3:
self.assertEqual(stdout, u'Žarn§')
self.assertEqual(stderr, u'لرئيسية')
else:
self.assertEqual(stdout.decode('utf-8'), u'Žarn§')
self.assertEqual(stderr.decode('utf-8'), u'لرئيسية')
assert stdout == to_native(u'Žarn§')
assert stderr == to_native(u'لرئيسية')

@ -1,99 +1,70 @@
# -*- coding: utf-8 -*-
# (c) 2015-2016, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# (c) 2015-2017, Toshio Kuratomi <tkuratomi@ansible.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
__metaclass__ = type
import sys
import json
from ansible.compat.tests import unittest
from units.mock.procenv import ModuleTestCase
from units.mock.generator import add_method
from itertools import chain
import pytest
# Strings that should be converted into a typed value
VALID_STRINGS = (
[("'a'", 'a')],
[("'1'", '1')],
[("1", 1)],
[("True", True)],
[("False", False)],
[("{}", {})],
("'a'", 'a'),
("'1'", '1'),
("1", 1),
("True", True),
("False", False),
("{}", {}),
)
# Passing things that aren't strings should just return the object
NONSTRINGS = (
[({'a': 1}, {'a': 1})],
({'a': 1}, {'a': 1}),
)
# These strings are not basic types. For security, these should not be
# executed. We return the same string and get an exception for some
INVALID_STRINGS = (
[("a=1", "a=1", SyntaxError)],
[("a.foo()", "a.foo()", None)],
[("import foo", "import foo", None)],
[("__import__('foo')", "__import__('foo')", ValueError)],
("a=1", "a=1", SyntaxError),
("a.foo()", "a.foo()", None),
("import foo", "import foo", None),
("__import__('foo')", "__import__('foo')", ValueError),
)
def _check_simple_types(self, code, expected):
@pytest.mark.parametrize('code, expected, stdin',
((c, e, {}) for c, e in chain(VALID_STRINGS, NONSTRINGS)),
indirect=['stdin'])
def test_simple_types(am, code, expected):
# test some basic usage for various types
self.assertEqual(self.am.safe_eval(code), expected)
assert am.safe_eval(code) == expected
def _check_simple_types_with_exceptions(self, code, expected):
@pytest.mark.parametrize('code, expected, stdin',
((c, e, {}) for c, e in chain(VALID_STRINGS, NONSTRINGS)),
indirect=['stdin'])
def test_simple_types_with_exceptions(am, code, expected):
# Test simple types with exceptions requested
self.assertEqual(self.am.safe_eval(code, include_exceptions=True), (expected, None))
def _check_invalid_strings(self, code, expected):
self.assertEqual(self.am.safe_eval(code), expected)
def _check_invalid_strings_with_exceptions(self, code, expected, exception):
res = self.am.safe_eval("a=1", include_exceptions=True)
self.assertEqual(res[0], "a=1")
self.assertEqual(type(res[1]), SyntaxError)
@add_method(_check_simple_types, *VALID_STRINGS)
@add_method(_check_simple_types, *NONSTRINGS)
@add_method(_check_simple_types_with_exceptions, *VALID_STRINGS)
@add_method(_check_simple_types_with_exceptions, *NONSTRINGS)
@add_method(_check_invalid_strings, *[[i[0][0:-1]] for i in INVALID_STRINGS])
@add_method(_check_invalid_strings_with_exceptions, *INVALID_STRINGS)
class TestSafeEval(ModuleTestCase):
def setUp(self):
super(TestSafeEval, self).setUp()
from ansible.module_utils import basic
self.old_ansible_args = basic._ANSIBLE_ARGS
basic._ANSIBLE_ARGS = None
self.am = basic.AnsibleModule(
argument_spec=dict(),
)
def tearDown(self):
super(TestSafeEval, self).tearDown()
from ansible.module_utils import basic
basic._ANSIBLE_ARGS = self.old_ansible_args
assert am.safe_eval(code, include_exceptions=True), (expected, None)
@pytest.mark.parametrize('code, expected, stdin',
((c, e, {}) for c, e, dummy in INVALID_STRINGS),
indirect=['stdin'])
def test_invalid_strings(am, code, expected):
assert am.safe_eval(code) == expected
@pytest.mark.parametrize('code, expected, exception, stdin',
((c, e, ex, {}) for c, e, ex in INVALID_STRINGS),
indirect=['stdin'])
def test_invalid_strings_with_exceptions(am, code, expected, exception):
res = am.safe_eval(code, include_exceptions=True)
assert res[0] == expected
if exception is None:
assert res[1] == exception
else:
assert type(res[1]) == exception

@ -1,144 +1,147 @@
# -*- coding: utf-8 -*-
# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import pytest
from itertools import product
try:
import builtins
except ImportError:
import __builtin__ as builtins
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.module_utils import known_hosts
from units.mock.procenv import ModuleTestCase
from units.mock.generator import add_method
import pytest
class TestSetModeIfDifferentBase(ModuleTestCase):
SYNONYMS_0660 = (
0o660,
'0o660',
'660',
'u+rw-x,g+rw-x,o-rwx',
'u=rw,g=rw,o-rwx',
)
def setUp(self):
self.mock_stat1 = MagicMock()
self.mock_stat1.st_mode = 0o444
self.mock_stat2 = MagicMock()
self.mock_stat2.st_mode = 0o660
super(TestSetModeIfDifferentBase, self).setUp()
from ansible.module_utils import basic
self.old_ANSIBLE_ARGS = basic._ANSIBLE_ARGS
basic._ANSIBLE_ARGS = None
@pytest.fixture
def mock_stats(mocker):
mock_stat1 = mocker.MagicMock()
mock_stat1.st_mode = 0o444
mock_stat2 = mocker.MagicMock()
mock_stat2.st_mode = 0o660
yield {"before": mock_stat1, "after": mock_stat2}
self.am = basic.AnsibleModule(
argument_spec=dict(),
)
def tearDown(self):
super(TestSetModeIfDifferentBase, self).tearDown()
from ansible.module_utils import basic
basic._ANSIBLE_ARGS = self.old_ANSIBLE_ARGS
@pytest.fixture
def am_check_mode(am):
am.check_mode = True
yield am
am.check_mode = False
def _check_no_mode_given_returns_previous_changes(self, previous_changes=True):
with patch('os.lstat', side_effect=[self.mock_stat1]):
@pytest.fixture
def mock_lchmod(mocker):
m_lchmod = mocker.patch('ansible.module_utils.basic.os.lchmod', return_value=None, create=True)
yield m_lchmod
self.assertEqual(self.am.set_mode_if_different('/path/to/file', None, previous_changes), previous_changes)
@pytest.mark.parametrize('previous_changes, check_mode, stdin',
product((True, False), (True, False), ({},)),
indirect=['stdin'])
def test_no_mode_given_returns_previous_changes(am, mock_stats, mock_lchmod, mocker, previous_changes, check_mode):
am.check_mode = check_mode
mocker.patch('os.lstat', side_effect=[mock_stats['before']])
m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True)
def _check_mode_changed_to_0660(self, mode):
# Note: This is for checking that all the different ways of specifying
# 0660 mode work. It cannot be used to check that setting a mode that is
# not equivalent to 0660 works.
with patch('os.lstat', side_effect=[self.mock_stat1, self.mock_stat2, self.mock_stat2]) as m_lstat:
with patch('os.lchmod', return_value=None, create=True) as m_lchmod:
self.assertEqual(self.am.set_mode_if_different('/path/to/file', mode, False), True)
m_lchmod.assert_called_with(b'/path/to/file', 0o660)
assert am.set_mode_if_different('/path/to/file', None, previous_changes) == previous_changes
assert not m_lchmod.called
def _check_mode_unchanged_when_already_0660(self, mode):
@pytest.mark.parametrize('mode, check_mode, stdin',
product(SYNONYMS_0660, (True, False), ({},)),
indirect=['stdin'])
def test_mode_changed_to_0660(am, mock_stats, mocker, mode, check_mode):
# Note: This is for checking that all the different ways of specifying
# 0660 mode work. It cannot be used to check that setting a mode that is
# not equivalent to 0660 works.
with patch('os.lstat', side_effect=[self.mock_stat2, self.mock_stat2, self.mock_stat2]) as m_lstat:
self.assertEqual(self.am.set_mode_if_different('/path/to/file', mode, False), False)
am.check_mode = check_mode
mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after'], mock_stats['after']])
m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True)
SYNONYMS_0660 = (
[[0o660]],
[['0o660']],
[['660']],
)
assert am.set_mode_if_different('/path/to/file', mode, False)
if check_mode:
assert not m_lchmod.called
else:
m_lchmod.assert_called_with(b'/path/to/file', 0o660)
@add_method(_check_no_mode_given_returns_previous_changes, [dict(previous_changes=True)], [dict(previous_changes=False)], )
@add_method(_check_mode_changed_to_0660, *SYNONYMS_0660)
@add_method(_check_mode_unchanged_when_already_0660, *SYNONYMS_0660)
class TestSetModeIfDifferent(TestSetModeIfDifferentBase):
def test_module_utils_basic_ansible_module_set_mode_if_different(self):
with patch('os.lstat') as m:
with patch('os.lchmod', return_value=None, create=True) as m_os:
m.side_effect = [self.mock_stat1, self.mock_stat2, self.mock_stat2]
self.am._symbolic_mode_to_octal = MagicMock(side_effect=Exception)
with pytest.raises(SystemExit):
self.am.set_mode_if_different('/path/to/file', 'o+w,g+w,a-r', False)
original_hasattr = hasattr
def _hasattr(obj, name):
if obj == os and name == 'lchmod':
return False
return original_hasattr(obj, name)
# FIXME: this isn't working yet
with patch('os.lstat', side_effect=[self.mock_stat1, self.mock_stat2]):
with patch.object(builtins, 'hasattr', side_effect=_hasattr):
with patch('os.path.islink', return_value=False):
with patch('os.chmod', return_value=None) as m_chmod:
self.assertEqual(self.am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False), True)
with patch('os.lstat', side_effect=[self.mock_stat1, self.mock_stat2]):
with patch.object(builtins, 'hasattr', side_effect=_hasattr):
with patch('os.path.islink', return_value=True):
with patch('os.chmod', return_value=None) as m_chmod:
with patch('os.stat', return_value=self.mock_stat2):
self.assertEqual(self.am.set_mode_if_different('/path/to/file', 0o660, False), True)
def _check_knows_to_change_to_0660_in_check_mode(self, mode):
@pytest.mark.parametrize('mode, check_mode, stdin',
product(SYNONYMS_0660, (True, False), ({},)),
indirect=['stdin'])
def test_mode_unchanged_when_already_0660(am, mock_stats, mocker, mode, check_mode):
# Note: This is for checking that all the different ways of specifying
# 0660 mode work. It cannot be used to check that setting a mode that is
# not equivalent to 0660 works.
with patch('os.lstat', side_effect=[self.mock_stat1, self.mock_stat2, self.mock_stat2]) as m_lstat:
self.assertEqual(self.am.set_mode_if_different('/path/to/file', mode, False), True)
@add_method(_check_no_mode_given_returns_previous_changes, [dict(previous_changes=True)], [dict(previous_changes=False)],)
@add_method(_check_knows_to_change_to_0660_in_check_mode, *SYNONYMS_0660)
@add_method(_check_mode_unchanged_when_already_0660, *SYNONYMS_0660)
class TestSetModeIfDifferentWithCheckMode(TestSetModeIfDifferentBase):
def setUp(self):
super(TestSetModeIfDifferentWithCheckMode, self).setUp()
self.am.check_mode = True
def tearDown(self):
super(TestSetModeIfDifferentWithCheckMode, self).tearDown()
self.am.check_mode = False
am.check_mode = check_mode
mocker.patch('os.lstat', side_effect=[mock_stats['after'], mock_stats['after'], mock_stats['after']])
m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True)
assert not am.set_mode_if_different('/path/to/file', mode, False)
assert not m_lchmod.called
@pytest.mark.parametrize('check_mode, stdin',
product((True, False), ({},)),
indirect=['stdin'])
def test_missing_lchmod_is_not_link(am, mock_stats, mocker, check_mode):
"""Some platforms have lchmod (*BSD) others do not (Linux)"""
am.check_mode = check_mode
original_hasattr = hasattr
def _hasattr(obj, name):
if obj == os and name == 'lchmod':
return False
return original_hasattr(obj, name)
mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after']])
mocker.patch.object(builtins, 'hasattr', side_effect=_hasattr)
mocker.patch('os.path.islink', return_value=False)
m_chmod = mocker.patch('os.chmod', return_value=None)
assert am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False)
if check_mode:
assert not m_chmod.called
else:
m_chmod.assert_called_with(b'/path/to/file/no_lchmod', 0o660)
@pytest.mark.parametrize('check_mode, stdin',
product((True, False), ({},)),
indirect=['stdin'])
def test_missing_lchmod_is_link(am, mock_stats, mocker, check_mode):
"""Some platforms have lchmod (*BSD) others do not (Linux)"""
am.check_mode = check_mode
original_hasattr = hasattr
def _hasattr(obj, name):
if obj == os and name == 'lchmod':
return False
return original_hasattr(obj, name)
mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after']])
mocker.patch.object(builtins, 'hasattr', side_effect=_hasattr)
mocker.patch('os.path.islink', return_value=True)
m_chmod = mocker.patch('os.chmod', return_value=None)
mocker.patch('os.stat', return_value=mock_stats['after'])
assert am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False)
if check_mode:
assert not m_chmod.called
else:
m_chmod.assert_called_with(b'/path/to/file/no_lchmod', 0o660)

@ -0,0 +1,65 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import json
import sys
from collections import MutableMapping
from io import BytesIO
import pytest
import ansible.module_utils.basic
from ansible.module_utils.six import PY3, string_types
from ansible.module_utils._text import to_bytes
@pytest.fixture
def stdin(mocker, request):
old_args = ansible.module_utils.basic._ANSIBLE_ARGS
ansible.module_utils.basic._ANSIBLE_ARGS = None
old_argv = sys.argv
sys.argv = ['ansible_unittest']
if isinstance(request.param, string_types):
args = request.param
elif isinstance(request.param, MutableMapping):
if 'ANSIBLE_MODULE_ARGS' not in request.param:
request.param = {'ANSIBLE_MODULE_ARGS': request.param}
args = json.dumps(request.param)
else:
raise Exception('Malformed data to the stdin pytest fixture')
fake_stdin = BytesIO(to_bytes(args, errors='surrogate_or_strict'))
if PY3:
mocker.patch('ansible.module_utils.basic.sys.stdin', mocker.MagicMock())
mocker.patch('ansible.module_utils.basic.sys.stdin.buffer', fake_stdin)
else:
mocker.patch('ansible.module_utils.basic.sys.stdin', fake_stdin)
yield fake_stdin
ansible.module_utils.basic._ANSIBLE_ARGS = old_args
sys.argv = old_argv
@pytest.fixture
def am(stdin, request):
old_args = ansible.module_utils.basic._ANSIBLE_ARGS
ansible.module_utils.basic._ANSIBLE_ARGS = None
old_argv = sys.argv
sys.argv = ['ansible_unittest']
argspec = {}
if hasattr(request, 'param'):
if isinstance(request.param, dict):
argspec = request.param
am = ansible.module_utils.basic.AnsibleModule(
argument_spec=argspec,
)
am._name = 'ansible_unittest'
yield am
ansible.module_utils.basic._ANSIBLE_ARGS = old_args
sys.argv = old_argv

@ -1,36 +1,18 @@
# -*- coding: utf-8 -*-
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright: (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from itertools import product
# to work around basic.py reading stdin
import json
import pytest
from units.mock.procenv import swap_stdin_and_argv
# for testing
from ansible.compat.tests.mock import patch
# the module we are actually testing (sort of
# the module we are actually testing (sort of)
from ansible.module_utils.facts.system.distribution import DistributionFactCollector
# to generate the testcase data, you can use the script gen_distribution_version_testcase.py in hacking/tests
TESTSETS = [
{
@ -880,8 +862,8 @@ DISTRIB_DESCRIPTION="CoreOS 976.0.0 (Coeur Rouge)"
]
@pytest.mark.parametrize("testcase", TESTSETS, ids=lambda x: x['name'])
def test_distribution_version(testcase):
@pytest.mark.parametrize("stdin, testcase", product([{}], TESTSETS), ids=lambda x: x['name'], indirect=['stdin'])
def test_distribution_version(am, mocker, testcase):
"""tests the distribution parsing code of the Facts class
testsets have
@ -891,26 +873,10 @@ def test_distribution_version(testcase):
* all files that are not listed here are assumed to not exist at all
* the output of pythons platform.dist()
* results for the ansible variables distribution* and os_family
"""
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}))
with swap_stdin_and_argv(stdin_data=args):
basic._ANSIBLE_ARGS = None
module = basic.AnsibleModule(argument_spec=dict())
_test_one_distribution(module, testcase)
def _test_one_distribution(module, testcase):
"""run the test on one distribution testcase
* prepare some mock functions to get the testdata in
* run Facts()
* compare with the expected output
"""
# prepare some mock functions to get the testdata in
def mock_get_file_content(fname, default=None, strip=True):
"""give fake content if it exists, otherwise pretend the file is empty"""
data = default
@ -922,7 +888,7 @@ def _test_one_distribution(module, testcase):
data = data.strip()
return data
def mock_get_uname_version(module):
def mock_get_uname_version(am):
return testcase.get('uname_v', None)
def mock_file_exists(fname, allow_empty=False):
@ -942,19 +908,19 @@ def _test_one_distribution(module, testcase):
def mock_platform_version():
return testcase.get('platform.version', '')
@patch('ansible.module_utils.facts.system.distribution.get_file_content', mock_get_file_content)
@patch('ansible.module_utils.facts.system.distribution.get_uname_version', mock_get_uname_version)
@patch('ansible.module_utils.facts.system.distribution._file_exists', mock_file_exists)
@patch('platform.dist', lambda: testcase['platform.dist'])
@patch('platform.system', mock_platform_system)
@patch('platform.release', mock_platform_release)
@patch('platform.version', mock_platform_version)
def get_facts(testcase):
distro_collector = DistributionFactCollector()
res = distro_collector.collect(module)
return res
mocker.patch('ansible.module_utils.facts.system.distribution.get_file_content', mock_get_file_content)
mocker.patch('ansible.module_utils.facts.system.distribution.get_uname_version', mock_get_uname_version)
mocker.patch('ansible.module_utils.facts.system.distribution._file_exists', mock_file_exists)
mocker.patch('platform.dist', lambda: testcase['platform.dist'])
mocker.patch('platform.system', mock_platform_system)
mocker.patch('platform.release', mock_platform_release)
mocker.patch('platform.version', mock_platform_version)
# run Facts()
distro_collector = DistributionFactCollector()
generated_facts = distro_collector.collect(am)
generated_facts = get_facts(testcase)
# compare with the expected output
# testcase['result'] has a list of variables and values it expects Facts() to set
for key, val in testcase['result'].items():

@ -1,135 +1,119 @@
# -*- coding: utf-8 -*-
# (c) 2015, Michael Scherer <mscherer@redhat.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import os.path
import ansible.module_utils.basic
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import Mock, patch
import pytest
from ansible.module_utils import known_hosts
from units.mock.procenv import swap_stdin_and_argv
class TestAnsibleModuleKnownHosts(unittest.TestCase):
urls = {
'ssh://one.example.org/example.git': {
'is_ssh_url': True,
'get_fqdn': 'one.example.org',
'add_host_key_cmd': " -t rsa one.example.org",
'port': None,
},
'ssh+git://two.example.org/example.git': {
'is_ssh_url': True,
'get_fqdn': 'two.example.org',
'add_host_key_cmd': " -t rsa two.example.org",
'port': None,
},
'rsync://three.example.org/user/example.git': {
'is_ssh_url': False,
'get_fqdn': 'three.example.org',
'add_host_key_cmd': None, # not called for non-ssh urls
'port': None,
},
'git@four.example.org:user/example.git': {
'is_ssh_url': True,
'get_fqdn': 'four.example.org',
'add_host_key_cmd': " -t rsa four.example.org",
'port': None,
},
'git+ssh://five.example.org/example.git': {
'is_ssh_url': True,
'get_fqdn': 'five.example.org',
'add_host_key_cmd': " -t rsa five.example.org",
'port': None,
},
'ssh://six.example.org:21/example.org': {
# ssh on FTP Port?
'is_ssh_url': True,
'get_fqdn': 'six.example.org',
'add_host_key_cmd': " -t rsa -p 21 six.example.org",
'port': '21',
},
'ssh://[2001:DB8::abcd:abcd]/example.git': {
'is_ssh_url': True,
'get_fqdn': '[2001:DB8::abcd:abcd]',
'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]",
'port': None,
},
'ssh://[2001:DB8::abcd:abcd]:22/example.git': {
'is_ssh_url': True,
'get_fqdn': '[2001:DB8::abcd:abcd]',
'add_host_key_cmd': " -t rsa -p 22 [2001:DB8::abcd:abcd]",
'port': '22',
},
'username@[2001:DB8::abcd:abcd]/example.git': {
'is_ssh_url': True,
'get_fqdn': '[2001:DB8::abcd:abcd]',
'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]",
'port': None,
},
'username@[2001:DB8::abcd:abcd]:path/example.git': {
'is_ssh_url': True,
'get_fqdn': '[2001:DB8::abcd:abcd]',
'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]",
'port': None,
},
'ssh://internal.git.server:7999/repos/repo.git': {
'is_ssh_url': True,
'get_fqdn': 'internal.git.server',
'add_host_key_cmd': " -t rsa -p 7999 internal.git.server",
'port': '7999',
},
}
def test_is_ssh_url(self):
for u in self.urls:
self.assertEqual(known_hosts.is_ssh_url(u), self.urls[u]['is_ssh_url'])
def test_get_fqdn_and_port(self):
for u in self.urls:
self.assertEqual(known_hosts.get_fqdn_and_port(u), (self.urls[u]['get_fqdn'], self.urls[u]['port']))
def test_add_host_key(self):
# Copied
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
with swap_stdin_and_argv(stdin_data=args):
ansible.module_utils.basic._ANSIBLE_ARGS = None
self.module = ansible.module_utils.basic.AnsibleModule(argument_spec=dict())
get_bin_path = Mock()
get_bin_path.return_value = keyscan_cmd = "/custom/path/ssh-keyscan"
self.module.get_bin_path = get_bin_path
run_command = Mock()
run_command.return_value = (0, "Needs output, otherwise thinks ssh-keyscan timed out'", "")
self.module.run_command = run_command
append_to_file = Mock()
append_to_file.return_value = (None,)
self.module.append_to_file = append_to_file
with patch('os.path.isdir', return_value=True):
with patch('os.path.exists', return_value=True):
for u in self.urls:
if self.urls[u]['is_ssh_url']:
known_hosts.add_host_key(self.module, self.urls[u]['get_fqdn'], port=self.urls[u]['port'])
run_command.assert_called_with(keyscan_cmd + self.urls[u]['add_host_key_cmd'])
URLS = {
'ssh://one.example.org/example.git': {
'is_ssh_url': True,
'get_fqdn': 'one.example.org',
'add_host_key_cmd': " -t rsa one.example.org",
'port': None,
},
'ssh+git://two.example.org/example.git': {
'is_ssh_url': True,
'get_fqdn': 'two.example.org',
'add_host_key_cmd': " -t rsa two.example.org",
'port': None,
},
'rsync://three.example.org/user/example.git': {
'is_ssh_url': False,
'get_fqdn': 'three.example.org',
'add_host_key_cmd': None, # not called for non-ssh urls
'port': None,
},
'git@four.example.org:user/example.git': {
'is_ssh_url': True,
'get_fqdn': 'four.example.org',
'add_host_key_cmd': " -t rsa four.example.org",
'port': None,
},
'git+ssh://five.example.org/example.git': {
'is_ssh_url': True,
'get_fqdn': 'five.example.org',
'add_host_key_cmd': " -t rsa five.example.org",
'port': None,
},
'ssh://six.example.org:21/example.org': {
# ssh on FTP Port?
'is_ssh_url': True,
'get_fqdn': 'six.example.org',
'add_host_key_cmd': " -t rsa -p 21 six.example.org",
'port': '21',
},
'ssh://[2001:DB8::abcd:abcd]/example.git': {
'is_ssh_url': True,
'get_fqdn': '[2001:DB8::abcd:abcd]',
'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]",
'port': None,
},
'ssh://[2001:DB8::abcd:abcd]:22/example.git': {
'is_ssh_url': True,
'get_fqdn': '[2001:DB8::abcd:abcd]',
'add_host_key_cmd': " -t rsa -p 22 [2001:DB8::abcd:abcd]",
'port': '22',
},
'username@[2001:DB8::abcd:abcd]/example.git': {
'is_ssh_url': True,
'get_fqdn': '[2001:DB8::abcd:abcd]',
'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]",
'port': None,
},
'username@[2001:DB8::abcd:abcd]:path/example.git': {
'is_ssh_url': True,
'get_fqdn': '[2001:DB8::abcd:abcd]',
'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]",
'port': None,
},
'ssh://internal.git.server:7999/repos/repo.git': {
'is_ssh_url': True,
'get_fqdn': 'internal.git.server',
'add_host_key_cmd': " -t rsa -p 7999 internal.git.server",
'port': '7999',
},
}
@pytest.mark.parametrize('url, is_ssh_url', ((k, v['is_ssh_url']) for k, v in URLS.items()))
def test_is_ssh_url(url, is_ssh_url):
assert known_hosts.is_ssh_url(url) == is_ssh_url
@pytest.mark.parametrize('url, fqdn, port', ((k, v['get_fqdn'], v['port']) for k, v in URLS.items()))
def test_get_fqdn_and_port(url, fqdn, port):
assert known_hosts.get_fqdn_and_port(url) == (fqdn, port)
@pytest.mark.parametrize('fqdn, port, add_host_key_cmd, stdin',
((v['get_fqdn'], v['port'], v['add_host_key_cmd'], {})
for v in URLS.values() if v['is_ssh_url']),
indirect=['stdin'])
def test_add_host_key(am, mocker, fqdn, port, add_host_key_cmd):
get_bin_path = mocker.MagicMock()
get_bin_path.return_value = keyscan_cmd = "/custom/path/ssh-keyscan"
am.get_bin_path = get_bin_path
run_command = mocker.MagicMock()
run_command.return_value = (0, "Needs output, otherwise thinks ssh-keyscan timed out'", "")
am.run_command = run_command
append_to_file = mocker.MagicMock()
append_to_file.return_value = (None,)
am.append_to_file = append_to_file
mocker.patch('os.path.isdir', return_value=True)
mocker.patch('os.path.exists', return_value=True)
known_hosts.add_host_key(am, fqdn, port=port)
run_command.assert_called_with(keyscan_cmd + add_host_key_cmd)

@ -1,13 +1,13 @@
import json
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import sys
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six.moves import builtins
from ansible.module_utils._text import to_native
from units.mock.procenv import swap_stdin_and_argv
import pprint

@ -1,34 +1,23 @@
# -*- coding: utf-8 -*-
# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
__metaclass__ = type
from ansible.compat.tests import unittest
from ansible.module_utils.six import PY3
from units.mock.generator import add_method
import itertools
import pytest
from ansible.module_utils.six import PY3
# Internal API while this is still being developed. Eventually move to
# module_utils.text
# module_utils.common.text
from ansible.module_utils._text import to_text, to_bytes, to_native
# Format: byte representation, text representation, encoding of byte representation
VALID_STRINGS = (
(b'abcde', u'abcde', 'ascii'),
@ -40,29 +29,25 @@ VALID_STRINGS = (
)
def _check_to_text(self, in_string, encoding, expected):
@pytest.mark.parametrize('in_string, encoding, expected',
itertools.chain(((d[0], d[2], d[1]) for d in VALID_STRINGS),
((d[1], d[2], d[1]) for d in VALID_STRINGS)))
def test_to_text(in_string, encoding, expected):
"""test happy path of decoding to text"""
self.assertEqual(to_text(in_string, encoding), expected)
assert to_text(in_string, encoding) == expected
def _check_to_bytes(self, in_string, encoding, expected):
@pytest.mark.parametrize('in_string, encoding, expected',
itertools.chain(((d[0], d[2], d[0]) for d in VALID_STRINGS),
((d[1], d[2], d[0]) for d in VALID_STRINGS)))
def test_to_bytes(in_string, encoding, expected):
"""test happy path of encoding to bytes"""
self.assertEqual(to_bytes(in_string, encoding), expected)
assert to_bytes(in_string, encoding) == expected
def _check_to_native(self, in_string, encoding, py2_expected, py3_expected):
@pytest.mark.parametrize('in_string, encoding, expected',
itertools.chain(((d[0], d[2], d[1] if PY3 else d[0]) for d in VALID_STRINGS),
((d[1], d[2], d[1] if PY3 else d[0]) for d in VALID_STRINGS)))
def test_to_native(in_string, encoding, expected):
"""test happy path of encoding to native strings"""
if PY3:
self.assertEqual(to_native(in_string, encoding), py3_expected)
else:
self.assertEqual(to_native(in_string, encoding), py2_expected)
@add_method(_check_to_text, [(i[0], i[2], i[1]) for i in VALID_STRINGS])
@add_method(_check_to_text, [(i[1], i[2], i[1]) for i in VALID_STRINGS])
@add_method(_check_to_bytes, [(i[0], i[2], i[0]) for i in VALID_STRINGS])
@add_method(_check_to_bytes, [(i[1], i[2], i[0]) for i in VALID_STRINGS])
@add_method(_check_to_native, [(i[0], i[2], i[0], i[1]) for i in VALID_STRINGS])
@add_method(_check_to_native, [(i[1], i[2], i[0], i[1]) for i in VALID_STRINGS])
class TestModuleUtilsText(unittest.TestCase):
pass
assert to_native(in_string, encoding) == expected

@ -0,0 +1,24 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import json
from collections import MutableMapping
import pytest
from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_bytes
@pytest.fixture
def patch_ansible_module(request, mocker):
if isinstance(request.param, string_types):
args = request.param
elif isinstance(request.param, MutableMapping):
if 'ANSIBLE_MODULE_ARGS' not in request.param:
request.param = {'ANSIBLE_MODULE_ARGS': request.param}
args = json.dumps(request.param)
else:
raise Exception('Malformed data to the patch_ansible_module pytest fixture')
mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args))

@ -1,23 +1,24 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import json
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch
from ansible.module_utils import basic
import pytest
from ansible.modules.packaging.language import pip
from units.modules.utils import set_module_args, AnsibleFailJson, ModuleTestCase
pytestmark = pytest.mark.usefixtures('patch_ansible_module')
class TestPip(ModuleTestCase):
def setUp(self):
super(TestPip, self).setUp()
@patch.object(basic.AnsibleModule, 'get_bin_path')
def test_failure_when_pip_absent(self, mock_get_bin_path):
@pytest.mark.parametrize('patch_ansible_module', [{'name': 'six'}], indirect=['patch_ansible_module'])
def test_failure_when_pip_absent(mocker, capfd):
get_bin_path = mocker.patch('ansible.module_utils.basic.AnsibleModule.get_bin_path')
get_bin_path.return_value = None
mock_get_bin_path.return_value = None
with pytest.raises(SystemExit):
pip.main()
with self.assertRaises(AnsibleFailJson):
set_module_args({'name': 'six'})
pip.main()
out, err = capfd.readouterr()
results = json.loads(out)
assert results['failed']
assert 'pip needs to be installed' in results['msg']

Loading…
Cancel
Save