mirror of https://github.com/ansible/ansible.git
Remove unused unit test files.
parent
9126ac53e3
commit
7e8dae5790
@ -1,118 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
#
|
|
||||||
# Copyright: (c) 2019, Bojan Vitnik <bvitnik@mainstream.rs>
|
|
||||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
||||||
|
|
||||||
from __future__ import absolute_import, division, print_function
|
|
||||||
__metaclass__ = type
|
|
||||||
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import importlib
|
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from .FakeAnsibleModule import FakeAnsibleModule
|
|
||||||
from ansible.module_utils import six
|
|
||||||
from mock import MagicMock
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def fake_ansible_module(request):
|
|
||||||
"""Returns fake AnsibleModule with fake module params."""
|
|
||||||
if hasattr(request, 'param'):
|
|
||||||
return FakeAnsibleModule(request.param)
|
|
||||||
else:
|
|
||||||
params = {
|
|
||||||
"hostname": "somehost",
|
|
||||||
"username": "someuser",
|
|
||||||
"password": "somepwd",
|
|
||||||
"validate_certs": True,
|
|
||||||
}
|
|
||||||
|
|
||||||
return FakeAnsibleModule(params)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def XenAPI():
|
|
||||||
"""Imports and returns fake XenAPI module."""
|
|
||||||
|
|
||||||
# Import of fake XenAPI module is wrapped by fixture so that it does not
|
|
||||||
# affect other unit tests which could potentialy also use XenAPI module.
|
|
||||||
|
|
||||||
# First we use importlib.import_module() to import the module and assign
|
|
||||||
# it to a local symbol.
|
|
||||||
fake_xenapi = importlib.import_module('units.module_utils.xenserver.FakeXenAPI')
|
|
||||||
|
|
||||||
# Now we populate Python module cache with imported fake module using the
|
|
||||||
# original module name (XenAPI). That way, any 'import XenAPI' statement
|
|
||||||
# will just load already imported fake module from the cache.
|
|
||||||
sys.modules['XenAPI'] = fake_xenapi
|
|
||||||
|
|
||||||
return fake_xenapi
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def xenserver(XenAPI):
|
|
||||||
"""Imports and returns xenserver module util."""
|
|
||||||
|
|
||||||
# Since we are wrapping fake XenAPI module inside a fixture, all modules
|
|
||||||
# that depend on it have to be imported inside a test function. To make
|
|
||||||
# this easier to handle and remove some code repetition, we wrap the import
|
|
||||||
# of xenserver module util with a fixture.
|
|
||||||
from ansible.module_utils import xenserver
|
|
||||||
|
|
||||||
return xenserver
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_xenapi_failure(XenAPI, mocker):
|
|
||||||
"""
|
|
||||||
Returns mock object that raises XenAPI.Failure on any XenAPI
|
|
||||||
method call.
|
|
||||||
"""
|
|
||||||
fake_error_msg = "Fake XAPI method call error!"
|
|
||||||
|
|
||||||
# We need to use our MagicMock based class that passes side_effect to its
|
|
||||||
# children because calls to xenapi methods can generate an arbitrary
|
|
||||||
# hierarchy of mock objects. Any such object when called should use the
|
|
||||||
# same side_effect as its parent mock object.
|
|
||||||
class MagicMockSideEffect(MagicMock):
|
|
||||||
def _get_child_mock(self, **kw):
|
|
||||||
child_mock = super(MagicMockSideEffect, self)._get_child_mock(**kw)
|
|
||||||
child_mock.side_effect = self.side_effect
|
|
||||||
return child_mock
|
|
||||||
|
|
||||||
mocked_xenapi = mocker.patch.object(XenAPI.Session, 'xenapi', new=MagicMockSideEffect(), create=True)
|
|
||||||
mocked_xenapi.side_effect = XenAPI.Failure(fake_error_msg)
|
|
||||||
|
|
||||||
return mocked_xenapi, fake_error_msg
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def fixture_data_from_file(request):
|
|
||||||
"""Loads fixture data from files."""
|
|
||||||
if not hasattr(request, 'param'):
|
|
||||||
return {}
|
|
||||||
|
|
||||||
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
|
|
||||||
fixture_data = {}
|
|
||||||
|
|
||||||
if isinstance(request.param, six.string_types):
|
|
||||||
request.param = [request.param]
|
|
||||||
|
|
||||||
for fixture_name in request.param:
|
|
||||||
path = os.path.join(fixture_path, fixture_name)
|
|
||||||
|
|
||||||
with open(path) as f:
|
|
||||||
data = f.read()
|
|
||||||
|
|
||||||
try:
|
|
||||||
data = json.loads(data)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
fixture_data[fixture_name] = data
|
|
||||||
|
|
||||||
return fixture_data
|
|
@ -1,80 +0,0 @@
|
|||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def api_key(monkeypatch):
|
|
||||||
monkeypatch.setenv('LINODE_API_KEY', 'foobar')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def auth(monkeypatch):
|
|
||||||
def patched_test_echo(dummy):
|
|
||||||
return []
|
|
||||||
monkeypatch.setattr('linode.api.Api.test_echo', patched_test_echo)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def access_token(monkeypatch):
|
|
||||||
monkeypatch.setenv('LINODE_ACCESS_TOKEN', 'barfoo')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def no_access_token_in_env(monkeypatch):
|
|
||||||
try:
|
|
||||||
monkeypatch.delenv('LINODE_ACCESS_TOKEN')
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def default_args():
|
|
||||||
return {'state': 'present', 'label': 'foo'}
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_linode():
|
|
||||||
class Linode():
|
|
||||||
def delete(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _raw_json(self):
|
|
||||||
return {
|
|
||||||
"alerts": {
|
|
||||||
"cpu": 90,
|
|
||||||
"io": 10000,
|
|
||||||
"network_in": 10,
|
|
||||||
"network_out": 10,
|
|
||||||
"transfer_quota": 80
|
|
||||||
},
|
|
||||||
"backups": {
|
|
||||||
"enabled": False,
|
|
||||||
"schedule": {
|
|
||||||
"day": None,
|
|
||||||
"window": None,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"created": "2018-09-26T08:12:33",
|
|
||||||
"group": "Foobar Group",
|
|
||||||
"hypervisor": "kvm",
|
|
||||||
"id": 10480444,
|
|
||||||
"image": "linode/centos7",
|
|
||||||
"ipv4": [
|
|
||||||
"130.132.285.233"
|
|
||||||
],
|
|
||||||
"ipv6": "2a82:7e00::h03c:46ff:fe04:5cd2/64",
|
|
||||||
"label": "lin-foo",
|
|
||||||
"region": "eu-west",
|
|
||||||
"specs": {
|
|
||||||
"disk": 25600,
|
|
||||||
"memory": 1024,
|
|
||||||
"transfer": 1000,
|
|
||||||
"vcpus": 1
|
|
||||||
},
|
|
||||||
"status": "running",
|
|
||||||
"tags": [],
|
|
||||||
"type": "g6-nanode-1",
|
|
||||||
"updated": "2018-09-26T10:10:14",
|
|
||||||
"watchdog_enabled": True
|
|
||||||
}
|
|
||||||
return Linode()
|
|
@ -1,75 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
#
|
|
||||||
# Copyright: (c) 2019, Bojan Vitnik <bvitnik@mainstream.rs>
|
|
||||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
||||||
|
|
||||||
from __future__ import absolute_import, division, print_function
|
|
||||||
__metaclass__ = type
|
|
||||||
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import importlib
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from .FakeAnsibleModule import FakeAnsibleModule
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def fake_ansible_module(request):
|
|
||||||
"""Returns fake AnsibleModule with fake module params."""
|
|
||||||
if hasattr(request, 'param'):
|
|
||||||
return FakeAnsibleModule(request.param)
|
|
||||||
else:
|
|
||||||
params = {
|
|
||||||
"hostname": "somehost",
|
|
||||||
"username": "someuser",
|
|
||||||
"password": "somepwd",
|
|
||||||
"validate_certs": True,
|
|
||||||
}
|
|
||||||
|
|
||||||
return FakeAnsibleModule(params)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def XenAPI():
|
|
||||||
"""Imports and returns fake XenAPI module."""
|
|
||||||
|
|
||||||
# Import of fake XenAPI module is wrapped by fixture so that it does not
|
|
||||||
# affect other unit tests which could potentialy also use XenAPI module.
|
|
||||||
|
|
||||||
# First we use importlib.import_module() to import the module and assign
|
|
||||||
# it to a local symbol.
|
|
||||||
fake_xenapi = importlib.import_module('units.modules.cloud.xenserver.FakeXenAPI')
|
|
||||||
|
|
||||||
# Now we populate Python module cache with imported fake module using the
|
|
||||||
# original module name (XenAPI). That way, any 'import XenAPI' statement
|
|
||||||
# will just load already imported fake module from the cache.
|
|
||||||
sys.modules['XenAPI'] = fake_xenapi
|
|
||||||
|
|
||||||
return fake_xenapi
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def xenserver_guest_info(XenAPI):
|
|
||||||
"""Imports and returns xenserver_guest_info module."""
|
|
||||||
|
|
||||||
# Since we are wrapping fake XenAPI module inside a fixture, all modules
|
|
||||||
# that depend on it have to be imported inside a test function. To make
|
|
||||||
# this easier to handle and remove some code repetition, we wrap the import
|
|
||||||
# of xenserver_guest_info module with a fixture.
|
|
||||||
from ansible.modules.cloud.xenserver import xenserver_guest_info
|
|
||||||
|
|
||||||
return xenserver_guest_info
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def xenserver_guest_powerstate(XenAPI):
|
|
||||||
"""Imports and returns xenserver_guest_powerstate module."""
|
|
||||||
|
|
||||||
# Since we are wrapping fake XenAPI module inside a fixture, all modules
|
|
||||||
# that depend on it have to be imported inside a test function. To make
|
|
||||||
# this easier to handle and remove some code repetition, we wrap the import
|
|
||||||
# of xenserver_guest_powerstate module with a fixture.
|
|
||||||
from ansible.modules.cloud.xenserver import xenserver_guest_powerstate
|
|
||||||
|
|
||||||
return xenserver_guest_powerstate
|
|
@ -1,29 +0,0 @@
|
|||||||
from units.compat.mock import patch
|
|
||||||
from ansible.module_utils.six.moves import xmlrpc_client
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
def get_method_name(request_body):
|
|
||||||
return xmlrpc_client.loads(request_body)[1]
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_request(request, mocker):
|
|
||||||
responses = request.getfixturevalue('testcase')['calls']
|
|
||||||
module_name = request.module.TESTED_MODULE
|
|
||||||
|
|
||||||
def transport_request(host, handler, request_body, verbose=0):
|
|
||||||
"""Fake request"""
|
|
||||||
method_name = get_method_name(request_body)
|
|
||||||
excepted_name, response = responses.pop(0)
|
|
||||||
if method_name == excepted_name:
|
|
||||||
if isinstance(response, Exception):
|
|
||||||
raise response
|
|
||||||
else:
|
|
||||||
return response
|
|
||||||
else:
|
|
||||||
raise Exception('Expected call: %r, called with: %r' % (excepted_name, method_name))
|
|
||||||
|
|
||||||
target = '{0}.xmlrpc_client.Transport.request'.format(module_name)
|
|
||||||
mocker.patch(target, side_effect=transport_request)
|
|
@ -1,23 +0,0 @@
|
|||||||
# Copyright (c) 2016-2017 Hewlett Packard Enterprise Development LP
|
|
||||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from mock import Mock, patch
|
|
||||||
from oneview_module_loader import ONEVIEW_MODULE_UTILS_PATH
|
|
||||||
from hpOneView.oneview_client import OneViewClient
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_ov_client():
|
|
||||||
patcher_json_file = patch.object(OneViewClient, 'from_json_file')
|
|
||||||
client = patcher_json_file.start()
|
|
||||||
return client.return_value
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_ansible_module():
|
|
||||||
patcher_ansible = patch(ONEVIEW_MODULE_UTILS_PATH + '.AnsibleModule')
|
|
||||||
patcher_ansible = patcher_ansible.start()
|
|
||||||
ansible_module = Mock()
|
|
||||||
patcher_ansible.return_value = ansible_module
|
|
||||||
return ansible_module
|
|
@ -1,213 +0,0 @@
|
|||||||
from __future__ import absolute_import, division, print_function
|
|
||||||
__metaclass__ = type
|
|
||||||
|
|
||||||
import errno
|
|
||||||
import os
|
|
||||||
import time
|
|
||||||
import mock
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
boto3 = pytest.importorskip("boto3")
|
|
||||||
botocore = pytest.importorskip("botocore")
|
|
||||||
placebo = pytest.importorskip("placebo")
|
|
||||||
|
|
||||||
"""
|
|
||||||
Using Placebo to test modules using boto3:
|
|
||||||
|
|
||||||
This is an example test, using the placeboify fixture to test that a module
|
|
||||||
will fail if resources it depends on don't exist.
|
|
||||||
|
|
||||||
> from placebo_fixtures import placeboify, scratch_vpc
|
|
||||||
>
|
|
||||||
> def test_create_with_nonexistent_launch_config(placeboify):
|
|
||||||
> connection = placeboify.client('autoscaling')
|
|
||||||
> module = FakeModule('test-asg-created', None, min_size=0, max_size=0, desired_capacity=0)
|
|
||||||
> with pytest.raises(FailJSON) as excinfo:
|
|
||||||
> asg_module.create_autoscaling_group(connection, module)
|
|
||||||
> .... asserts based on module state/exceptions ....
|
|
||||||
|
|
||||||
In more advanced cases, use unrecorded resource fixtures to fill in ARNs/IDs of
|
|
||||||
things modules depend on, such as:
|
|
||||||
|
|
||||||
> def test_create_in_vpc(placeboify, scratch_vpc):
|
|
||||||
> connection = placeboify.client('autoscaling')
|
|
||||||
> module = FakeModule(name='test-asg-created',
|
|
||||||
> min_size=0, max_size=0, desired_capacity=0,
|
|
||||||
> availability_zones=[s['az'] for s in scratch_vpc['subnets']],
|
|
||||||
> vpc_zone_identifier=[s['id'] for s in scratch_vpc['subnets']],
|
|
||||||
> )
|
|
||||||
> ..... so on and so forth ....
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def placeboify(request, monkeypatch):
|
|
||||||
"""This fixture puts a recording/replaying harness around `boto3_conn`
|
|
||||||
|
|
||||||
Placeboify patches the `boto3_conn` function in ec2 module_utils to return
|
|
||||||
a boto3 session that in recording or replaying mode, depending on the
|
|
||||||
PLACEBO_RECORD environment variable. Unset PLACEBO_RECORD (the common case
|
|
||||||
for just running tests) will put placebo in replay mode, set PLACEBO_RECORD
|
|
||||||
to any value to turn off replay & operate on real AWS resources.
|
|
||||||
|
|
||||||
The recorded sessions are stored in the test file's directory, under the
|
|
||||||
namespace `placebo_recordings/{testfile name}/{test function name}` to
|
|
||||||
distinguish them.
|
|
||||||
"""
|
|
||||||
session = boto3.Session(region_name='us-west-2')
|
|
||||||
|
|
||||||
recordings_path = os.path.join(
|
|
||||||
request.fspath.dirname,
|
|
||||||
'placebo_recordings',
|
|
||||||
request.fspath.basename.replace('.py', ''),
|
|
||||||
request.function.__name__
|
|
||||||
# remove the test_ prefix from the function & file name
|
|
||||||
).replace('test_', '')
|
|
||||||
|
|
||||||
if not os.getenv('PLACEBO_RECORD'):
|
|
||||||
if not os.path.isdir(recordings_path):
|
|
||||||
raise NotImplementedError('Missing Placebo recordings in directory: %s' % recordings_path)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
# make sure the directory for placebo test recordings is available
|
|
||||||
os.makedirs(recordings_path)
|
|
||||||
except OSError as e:
|
|
||||||
if e.errno != errno.EEXIST:
|
|
||||||
raise
|
|
||||||
|
|
||||||
pill = placebo.attach(session, data_path=recordings_path)
|
|
||||||
if os.getenv('PLACEBO_RECORD'):
|
|
||||||
pill.record()
|
|
||||||
else:
|
|
||||||
pill.playback()
|
|
||||||
|
|
||||||
def boto3_middleman_connection(module, conn_type, resource, region='us-west-2', **kwargs):
|
|
||||||
if conn_type != 'client':
|
|
||||||
# TODO support resource-based connections
|
|
||||||
raise ValueError('Mocker only supports client, not %s' % conn_type)
|
|
||||||
return session.client(resource, region_name=region)
|
|
||||||
|
|
||||||
import ansible.module_utils.ec2
|
|
||||||
monkeypatch.setattr(
|
|
||||||
ansible.module_utils.ec2,
|
|
||||||
'boto3_conn',
|
|
||||||
boto3_middleman_connection,
|
|
||||||
)
|
|
||||||
yield session
|
|
||||||
|
|
||||||
# tear down
|
|
||||||
pill.stop()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
|
||||||
def basic_launch_config():
|
|
||||||
"""Create an EC2 launch config whose creation *is not* recorded and return its name
|
|
||||||
|
|
||||||
This fixture is module-scoped, since launch configs are immutable and this
|
|
||||||
can be reused for many tests.
|
|
||||||
"""
|
|
||||||
if not os.getenv('PLACEBO_RECORD'):
|
|
||||||
yield 'pytest_basic_lc'
|
|
||||||
return
|
|
||||||
|
|
||||||
# use a *non recording* session to make the launch config
|
|
||||||
# since that's a prereq of the ec2_asg module, and isn't what
|
|
||||||
# we're testing.
|
|
||||||
asg = boto3.client('autoscaling')
|
|
||||||
asg.create_launch_configuration(
|
|
||||||
LaunchConfigurationName='pytest_basic_lc',
|
|
||||||
ImageId='ami-9be6f38c', # Amazon Linux 2016.09 us-east-1 AMI, can be any valid AMI
|
|
||||||
SecurityGroups=[],
|
|
||||||
UserData='#!/bin/bash\necho hello world',
|
|
||||||
InstanceType='t2.micro',
|
|
||||||
InstanceMonitoring={'Enabled': False},
|
|
||||||
AssociatePublicIpAddress=True
|
|
||||||
)
|
|
||||||
|
|
||||||
yield 'pytest_basic_lc'
|
|
||||||
|
|
||||||
try:
|
|
||||||
asg.delete_launch_configuration(LaunchConfigurationName='pytest_basic_lc')
|
|
||||||
except botocore.exceptions.ClientError as e:
|
|
||||||
if 'not found' in e.message:
|
|
||||||
return
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
|
||||||
def scratch_vpc():
|
|
||||||
if not os.getenv('PLACEBO_RECORD'):
|
|
||||||
yield {
|
|
||||||
'vpc_id': 'vpc-123456',
|
|
||||||
'cidr_range': '10.0.0.0/16',
|
|
||||||
'subnets': [
|
|
||||||
{
|
|
||||||
'id': 'subnet-123456',
|
|
||||||
'az': 'us-east-1d',
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'id': 'subnet-654321',
|
|
||||||
'az': 'us-east-1e',
|
|
||||||
},
|
|
||||||
]
|
|
||||||
}
|
|
||||||
return
|
|
||||||
|
|
||||||
# use a *non recording* session to make the base VPC and subnets
|
|
||||||
ec2 = boto3.client('ec2')
|
|
||||||
vpc_resp = ec2.create_vpc(
|
|
||||||
CidrBlock='10.0.0.0/16',
|
|
||||||
AmazonProvidedIpv6CidrBlock=False,
|
|
||||||
)
|
|
||||||
subnets = (
|
|
||||||
ec2.create_subnet(
|
|
||||||
VpcId=vpc_resp['Vpc']['VpcId'],
|
|
||||||
CidrBlock='10.0.0.0/24',
|
|
||||||
),
|
|
||||||
ec2.create_subnet(
|
|
||||||
VpcId=vpc_resp['Vpc']['VpcId'],
|
|
||||||
CidrBlock='10.0.1.0/24',
|
|
||||||
)
|
|
||||||
)
|
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
yield {
|
|
||||||
'vpc_id': vpc_resp['Vpc']['VpcId'],
|
|
||||||
'cidr_range': '10.0.0.0/16',
|
|
||||||
'subnets': [
|
|
||||||
{
|
|
||||||
'id': s['Subnet']['SubnetId'],
|
|
||||||
'az': s['Subnet']['AvailabilityZone'],
|
|
||||||
} for s in subnets
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
for s in subnets:
|
|
||||||
try:
|
|
||||||
ec2.delete_subnet(SubnetId=s['Subnet']['SubnetId'])
|
|
||||||
except botocore.exceptions.ClientError as e:
|
|
||||||
if 'not found' in e.message:
|
|
||||||
continue
|
|
||||||
raise
|
|
||||||
ec2.delete_vpc(VpcId=vpc_resp['Vpc']['VpcId'])
|
|
||||||
except botocore.exceptions.ClientError as e:
|
|
||||||
if 'not found' in e.message:
|
|
||||||
return
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
|
||||||
def maybe_sleep():
|
|
||||||
"""If placebo is reading saved sessions, make sleep always take 0 seconds.
|
|
||||||
|
|
||||||
AWS modules often perform polling or retries, but when using recorded
|
|
||||||
sessions there's no reason to wait. We can still exercise retry and other
|
|
||||||
code paths without waiting for wall-clock time to pass."""
|
|
||||||
if not os.getenv('PLACEBO_RECORD'):
|
|
||||||
p = mock.patch('time.sleep', return_value=None)
|
|
||||||
p.start()
|
|
||||||
yield
|
|
||||||
p.stop()
|
|
||||||
else:
|
|
||||||
yield
|
|
@ -1,70 +0,0 @@
|
|||||||
import pytest
|
|
||||||
|
|
||||||
from units.compat.mock import MagicMock
|
|
||||||
|
|
||||||
from ansible.module_utils.k8s.common import K8sAnsibleMixin
|
|
||||||
from ansible.module_utils.k8s.raw import KubernetesRawModule
|
|
||||||
from ansible.module_utils.kubevirt import KubeVirtRawModule
|
|
||||||
|
|
||||||
import openshift.dynamic
|
|
||||||
|
|
||||||
RESOURCE_DEFAULT_ARGS = {'api_version': 'v1alpha3', 'group': 'kubevirt.io',
|
|
||||||
'prefix': 'apis', 'namespaced': True}
|
|
||||||
|
|
||||||
|
|
||||||
class AnsibleExitJson(Exception):
|
|
||||||
"""Exception class to be raised by module.exit_json and caught
|
|
||||||
by the test case"""
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
for k in kwargs:
|
|
||||||
setattr(self, k, kwargs[k])
|
|
||||||
|
|
||||||
def __getitem__(self, attr):
|
|
||||||
return getattr(self, attr)
|
|
||||||
|
|
||||||
|
|
||||||
class AnsibleFailJson(Exception):
|
|
||||||
"""Exception class to be raised by module.fail_json and caught
|
|
||||||
by the test case"""
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
for k in kwargs:
|
|
||||||
setattr(self, k, kwargs[k])
|
|
||||||
|
|
||||||
def __getitem__(self, attr):
|
|
||||||
return getattr(self, attr)
|
|
||||||
|
|
||||||
|
|
||||||
def exit_json(*args, **kwargs):
|
|
||||||
kwargs['success'] = True
|
|
||||||
if 'changed' not in kwargs:
|
|
||||||
kwargs['changed'] = False
|
|
||||||
raise AnsibleExitJson(**kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def fail_json(*args, **kwargs):
|
|
||||||
kwargs['success'] = False
|
|
||||||
raise AnsibleFailJson(**kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def base_fixture(monkeypatch):
|
|
||||||
monkeypatch.setattr(
|
|
||||||
KubernetesRawModule, "exit_json", exit_json)
|
|
||||||
monkeypatch.setattr(
|
|
||||||
KubernetesRawModule, "fail_json", fail_json)
|
|
||||||
# Create mock methods in Resource directly, otherwise dyn client
|
|
||||||
# tries binding those to corresponding methods in DynamicClient
|
|
||||||
# (with partial()), which is more problematic to intercept
|
|
||||||
openshift.dynamic.Resource.get = MagicMock()
|
|
||||||
openshift.dynamic.Resource.create = MagicMock()
|
|
||||||
openshift.dynamic.Resource.delete = MagicMock()
|
|
||||||
openshift.dynamic.Resource.patch = MagicMock()
|
|
||||||
openshift.dynamic.Resource.search = MagicMock()
|
|
||||||
openshift.dynamic.Resource.watch = MagicMock()
|
|
||||||
# Globally mock some methods, since all tests will use this
|
|
||||||
KubernetesRawModule.patch_resource = MagicMock()
|
|
||||||
KubernetesRawModule.patch_resource.return_value = ({}, None)
|
|
||||||
K8sAnsibleMixin.get_api_client = MagicMock()
|
|
||||||
K8sAnsibleMixin.get_api_client.return_value = None
|
|
||||||
K8sAnsibleMixin.find_resource = MagicMock()
|
|
||||||
KubeVirtRawModule.find_supported_resource = MagicMock()
|
|
Loading…
Reference in New Issue