|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2017 Ansible Project
|
|
|
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
|
|
|
|
# Make coding more python3-ish
|
|
|
|
from __future__ import (absolute_import, division)
|
|
|
|
__metaclass__ = type
|
|
|
|
|
|
|
|
import errno
|
|
|
|
from itertools import product
|
|
|
|
from io import BytesIO
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
from ansible.module_utils._text import to_native
|
|
|
|
|
|
|
|
|
|
|
|
class OpenBytesIO(BytesIO):
    """BytesIO variant whose close() does nothing.

    Because the buffer is never really closed, its content can still be
    inspected (e.g. with getvalue()) after close() has been called on it.
    """

    def close(self):
        # Deliberately a no-op: keep the underlying buffer readable.
        return None
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def mock_os(mocker):
    """Patch ``ansible.module_utils.basic.os`` and yield the resulting mock.

    The mock is pre-configured with:

    * ``_cmd_out``: a dict mapping ``mocker.sentinel.stdout`` and
      ``mocker.sentinel.stderr`` to empty ``BytesIO`` streams; ``read`` is
      served from these streams.
    * ``environ`` set to ``{'PATH': '/bin'}`` and ``getcwd`` returning
      ``'/home/foo'``.
    * ``path.expandvars`` / ``path.expanduser`` returning their argument
      unchanged, ``path.isdir`` always true, and ``path.abspath`` joining
      relative paths onto the mocked ``getcwd`` value.
    * ``chdir`` raising ``OSError(errno.EPERM, ...)`` for ``/inaccessible``.
    """
    fake_os = mocker.patch('ansible.module_utils.basic.os')

    def _read(fd, nbytes):
        # Look up the canned stream registered for this "descriptor".
        stream = fake_os._cmd_out[fd]
        return stream.read(nbytes)

    def _chdir(path):
        # Only the magic path '/inaccessible' is refused.
        if path != '/inaccessible':
            return None
        raise OSError(errno.EPERM, "Permission denied: '/inaccessible'")

    def _abspath(path):
        # Relative paths are anchored at whatever getcwd is mocked to return.
        if not path.startswith('/'):
            return fake_os.getcwd.return_value + '/' + path
        return path

    def _unchanged(x):
        # Stand-in for expandvars/expanduser: no expansion is performed.
        return x

    # os.read() returns bytes, not text, hence BytesIO streams.
    fake_os._cmd_out = {
        mocker.sentinel.stdout: BytesIO(),
        mocker.sentinel.stderr: BytesIO(),
    }
    fake_os.environ = {'PATH': '/bin'}
    fake_os.getcwd.return_value = '/home/foo'
    fake_os.chdir.side_effect = _chdir
    fake_os.read.side_effect = _read

    fake_path = fake_os.path
    fake_path.expandvars.side_effect = _unchanged
    fake_path.expanduser.side_effect = _unchanged
    fake_path.isdir.return_value = True
    fake_path.abspath.side_effect = _abspath

    yield fake_os
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def mock_subprocess(mocker):
    """Patch ``select`` and ``subprocess`` in ``ansible.module_utils.basic``.

    ``select.select`` reports every requested read descriptor as ready.
    ``subprocess.Popen`` returns a mock process with ``returncode`` 0, an
    ``OpenBytesIO`` as ``stdin``, and ``stdout``/``stderr`` whose ``fileno()``
    return ``mocker.sentinel.stdout`` / ``mocker.sentinel.stderr``.

    Yields the patched ``subprocess`` mock.
    """
    def _all_readable(rlist, wlist, xlist, timeout=1):
        # Everything asked about for reading is ready; nothing else is.
        return (rlist, [], [])

    select_mod = mocker.patch('ansible.module_utils.basic.select')
    select_mod.select.side_effect = _all_readable

    subprocess_mod = mocker.patch('ansible.module_utils.basic.subprocess')

    # Keyword arguments to MagicMock are set as attributes on the new mock.
    process = mocker.MagicMock(returncode=0, stdin=OpenBytesIO())
    process.stdout.fileno.return_value = mocker.sentinel.stdout
    process.stderr.fileno.return_value = mocker.sentinel.stderr
    subprocess_mod.Popen.return_value = process

    yield subprocess_mod
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def rc_am(mocker, am, mock_os, mock_subprocess):
    """Yield the ``am`` fixture object prepared for ``run_command`` tests.

    ``fail_json`` is replaced by a mock that raises ``SystemExit``,
    ``get_buffer_size`` by a mock returning 900, and the patched ``os`` and
    ``subprocess`` mocks are attached as ``_os`` and ``_subprocess`` so that
    tests can reach them through the yielded object.
    """
    exiting_fail_json = mocker.MagicMock(side_effect=SystemExit)
    fixed_buffer_size = mocker.MagicMock(return_value=900)

    am.fail_json = exiting_fail_json
    am._os = mock_os
    am._subprocess = mock_subprocess
    am.get_buffer_size = fixed_buffer_size

    yield am
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunCommandArgs:
    """Check how run_command() forwards its command argument to Popen."""

    # Each entry: (command given to run_command,
    #              command expected by Popen as a list,
    #              command expected by Popen as a string)
    ARGS_DATA = (
        (['/bin/ls', 'a', 'b', 'c'], ['/bin/ls', 'a', 'b', 'c'], '/bin/ls a b c'),
        ('/bin/ls a " b" "c "', ['/bin/ls', 'a', ' b', 'c '], '/bin/ls a " b" "c "'),
    )

    # pylint bug: https://github.com/PyCQA/pylint/issues/511
    # pylint: disable=undefined-variable
    @pytest.mark.parametrize(
        'cmd, expected, shell, stdin',
        (
            (given, as_string if use_shell else as_list, use_shell, {})
            for (given, as_list, as_string), use_shell
            in product(ARGS_DATA, (True, False))
        ),
        indirect=['stdin'],
    )
    def test_args(self, cmd, expected, shell, rc_am):
        """Popen receives the string form with a shell, the list form without."""
        rc_am.run_command(cmd, use_unsafe_shell=shell)

        popen = rc_am._subprocess.Popen
        assert popen.called

        args, kwargs = popen.call_args
        assert args == (expected, )
        assert kwargs['shell'] == shell

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_tuple_as_args(self, rc_am):
        """A tuple command makes run_command() call fail_json."""
        with pytest.raises(SystemExit):
            rc_am.run_command(('ls', '/'))

        assert rc_am.fail_json.called
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunCommandCwd:
    """Check the chdir() calls run_command() makes for its ``cwd`` argument."""

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_cwd(self, mocker, rc_am):
        """An absolute cwd is entered, then the previous cwd is restored."""
        fake_os = rc_am._os
        fake_os.getcwd.return_value = '/old'

        rc_am.run_command('/bin/ls', cwd='/new')

        expected_calls = [mocker.call('/new'), mocker.call('/old')]
        assert fake_os.chdir.mock_calls == expected_calls

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_cwd_relative_path(self, mocker, rc_am):
        """A relative cwd is resolved against the current directory first."""
        fake_os = rc_am._os
        fake_os.getcwd.return_value = '/old'

        rc_am.run_command('/bin/ls', cwd='sub-dir')

        expected_calls = [mocker.call('/old/sub-dir'), mocker.call('/old')]
        assert fake_os.chdir.mock_calls == expected_calls

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_cwd_not_a_dir(self, mocker, rc_am):
        """A cwd that is not a directory is never passed to chdir()."""
        def _isdir(d):
            # Everything is a directory except the path under test.
            return d != '/not-a-dir'

        fake_os = rc_am._os
        fake_os.getcwd.return_value = '/old'
        fake_os.path.isdir.side_effect = _isdir

        rc_am.run_command('/bin/ls', cwd='/not-a-dir')

        expected_calls = [mocker.call('/old')]
        assert fake_os.chdir.mock_calls == expected_calls

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_cwd_inaccessible(self, rc_am):
        """A chdir() failure is reported via fail_json with the errno as rc."""
        with pytest.raises(SystemExit):
            rc_am.run_command('/bin/ls', cwd='/inaccessible')

        fail_json = rc_am.fail_json
        assert fail_json.called

        args, kwargs = fail_json.call_args
        assert kwargs['rc'] == errno.EPERM
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunCommandPrompt:
    """Check the ``prompt_regex`` handling of run_command()."""

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_prompt_bad_regex(self, rc_am):
        """An invalid regular expression makes run_command() call fail_json."""
        with pytest.raises(SystemExit):
            rc_am.run_command('foo', prompt_regex='[pP)assword:')

        assert rc_am.fail_json.called

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_prompt_no_match(self, mocker, rc_am):
        """Output that does not match the prompt regex leaves rc at 0."""
        streams = rc_am._os._cmd_out
        streams[mocker.sentinel.stdout] = BytesIO(b'hello')

        rc, _out, _err = rc_am.run_command('foo', prompt_regex='[pP]assword:')

        assert rc == 0

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_prompt_match_wo_data(self, mocker, rc_am):
        """A matched prompt with no data to answer it yields rc 257."""
        streams = rc_am._os._cmd_out
        streams[mocker.sentinel.stdout] = BytesIO(b'Authentication required!\nEnter password: ')

        rc, _out, _err = rc_am.run_command('foo', prompt_regex=r'[pP]assword:', data=None)

        assert rc == 257
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunCommandRc:
    """Check how run_command() treats a non-zero return code."""

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_check_rc_false(self, rc_am):
        """With check_rc=False the non-zero rc is simply returned."""
        process = rc_am._subprocess.Popen.return_value
        process.returncode = 1

        rc, _out, _err = rc_am.run_command('/bin/false', check_rc=False)

        assert rc == 1

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_check_rc_true(self, rc_am):
        """With check_rc=True a non-zero rc is passed to fail_json."""
        process = rc_am._subprocess.Popen.return_value
        process.returncode = 1

        with pytest.raises(SystemExit):
            rc_am.run_command('/bin/false', check_rc=True)

        fail_json = rc_am.fail_json
        assert fail_json.called

        args, kwargs = fail_json.call_args
        assert kwargs['rc'] == 1
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunCommandOutput:
    """Check the data run_command() writes to stdin and returns as output."""

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_text_stdin(self, rc_am):
        """Text data is written to the process stdin as bytes plus a newline."""
        _rc, _out, _err = rc_am.run_command('/bin/foo', data='hello world')

        process = rc_am._subprocess.Popen.return_value
        assert process.stdin.getvalue() == b'hello world\n'

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_ascii_stdout(self, mocker, rc_am):
        """ASCII bytes on stdout come back as the native string 'hello'."""
        streams = rc_am._os._cmd_out
        streams[mocker.sentinel.stdout] = BytesIO(b'hello')

        rc, stdout, _err = rc_am.run_command('/bin/cat hello.txt')

        assert rc == 0
        # run_command() returns native strings: text on Python 3 and bytes
        # on Python 2.
        assert stdout == 'hello'

    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
    def test_utf8_output(self, mocker, rc_am):
        """UTF-8 bytes on stdout/stderr come back as native strings."""
        out_text = u'Žarn§'
        err_text = u'لرئيسية'

        streams = rc_am._os._cmd_out
        streams[mocker.sentinel.stdout] = BytesIO(out_text.encode('utf-8'))
        streams[mocker.sentinel.stderr] = BytesIO(err_text.encode('utf-8'))

        rc, stdout, stderr = rc_am.run_command('/bin/something_ugly')

        assert rc == 0
        # run_command() returns native strings: text on Python 3 and bytes
        # on Python 2, so compare against the to_native() form.
        assert stdout == to_native(out_text)
        assert stderr == to_native(err_text)
|