@ -7,14 +7,15 @@ from ansible.module_utils import basic
from ansible . module_utils . _text import to_bytes
from ansible . module_utils . k8s . common import K8sAnsibleMixin
from ansible . module_utils . k8s . raw import KubernetesRawModule
from ansible . module_utils . kubevirt import KubeVirtRawModule
from ansible . modules . cloud . kubevirt import kubevirt_vm as mymodule
openshiftdynamic = pytest . importorskip ( " openshift.dynamic " , minversion = " 0.6.2 " )
helpexceptions = pytest . importorskip ( " openshift.helper.exceptions " , minversion = " 0.6.2 " )
openshiftdynamic = pytest . importorskip ( " openshift.dynamic " )
helpexceptions = pytest . importorskip ( " openshift.helper.exceptions " )
KIND = ' VirtulMachine '
RESOURCE_DEFAULT_ARGS = { ' api_version ' : ' v1 ' , ' group ' : ' kubevirt.io ' ,
RESOURCE_DEFAULT_ARGS = { ' api_version ' : ' v1 alpha3 ' , ' group ' : ' kubevirt.io ' ,
' prefix ' : ' apis ' , ' namespaced ' : True }
@ -56,7 +57,7 @@ def fail_json(*args, **kwargs):
@pytest.fixture ( autouse = True )
def setup_mixtures ( self , monkeypatch ) :
def setup_mixtures ( monkeypatch ) :
monkeypatch . setattr (
KubernetesRawModule , " exit_json " , exit_json )
monkeypatch . setattr (
@ -72,12 +73,14 @@ def setup_mixtures(self, monkeypatch):
K8sAnsibleMixin . get_api_client = MagicMock ( )
K8sAnsibleMixin . get_api_client . return_value = None
K8sAnsibleMixin . find_resource = MagicMock ( )
KubeVirtRawModule . find_supported_resource = MagicMock ( )
def test_vm_multus_creation ( self ) :
def test_vm_multus_creation ( ) :
# Desired state:
args = dict (
state = ' present ' , name = ' testvm ' ,
namespace = ' vms ' , api_version = ' v1 ' ,
namespace = ' vms ' ,
interfaces = [
{ ' bridge ' : { } , ' name ' : ' default ' , ' network ' : { ' pod ' : { } } } ,
{ ' bridge ' : { } , ' name ' : ' mynet ' , ' network ' : { ' multus ' : { ' networkName ' : ' mynet ' } } } ,
@ -86,68 +89,35 @@ def test_vm_multus_creation(self):
)
set_module_args ( args )
# State as "returned" by the "k8s cluster":
openshiftdynamic . Resource . get . return_value = None
resource_args = dict ( kind = KIND , * * RESOURCE_DEFAULT_ARGS )
K 8sAnsibleMixin. fin d_resource. return_value = openshiftdynamic . Resource ( * * resource_args )
K ubeVirtRawModule. find_supporte d_resource. return_value = openshiftdynamic . Resource ( * * resource_args )
# Actual test:
with pytest . raises ( AnsibleExitJson ) as result :
mymodule . KubeVirtVM ( ) . execute_module ( )
assert result . value [ ' changed ' ]
assert result . value [ ' result' ] [ ' method' ] == ' create '
assert result . value [ ' method' ] == ' create '
@pytest.mark.parametrize ( " _wait " , ( False , True ) )
def test_resource_absent ( self , _wait ) :
def test_resource_absent ( _wait ) :
# Desired state:
args = dict (
state = ' absent ' , name = ' testvmi ' ,
namespace = ' vms ' , api_version = ' v1 ' ,
namespace = ' vms ' ,
wait = _wait ,
)
set_module_args ( args )
# State as "returned" by the "k8s cluster":
openshiftdynamic . Resource . get . return_value = None
resource_args = dict ( kind = KIND , * * RESOURCE_DEFAULT_ARGS )
K 8sAnsibleMixin. fin d_resource. return_value = openshiftdynamic . Resource ( * * resource_args )
K ubeVirtRawModule. find_supporte d_resource. return_value = openshiftdynamic . Resource ( * * resource_args )
# Actual test:
with pytest . raises ( AnsibleExitJson ) as result :
mymodule . KubeVirtVM ( ) . execute_module ( )
assert result . value [ ' result ' ] [ ' method ' ] == ' delete '
@patch ( ' openshift.watch.Watch ' )
def test_stream_creation ( self , mock_watch ) :
# Desired state:
args = dict (
state = ' running ' , name = ' testvmi ' , namespace = ' vms ' ,
api_version = ' v1 ' , wait = True ,
)
set_module_args ( args )
# Actual test:
mock_watch . side_effect = helpexceptions . KubernetesException ( " Test " , value = 42 )
with pytest . raises ( AnsibleFailJson ) :
mymodule . KubeVirtVM ( ) . execute_module ( )
def test_simple_merge_dicts ( self ) :
dict1 = { ' labels ' : { ' label1 ' : ' value ' } }
dict2 = { ' labels ' : { ' label2 ' : ' value ' } }
dict3 = json . dumps ( { ' labels ' : { ' label1 ' : ' value ' , ' label2 ' : ' value ' } } , sort_keys = True )
assert dict3 == json . dumps ( mymodule . KubeVirtVM . merge_dicts ( dict1 , dict2 ) , sort_keys = True )
def test_simple_multi_merge_dicts ( self ) :
dict1 = { ' labels ' : { ' label1 ' : ' value ' , ' label3 ' : ' value ' } }
dict2 = { ' labels ' : { ' label2 ' : ' value ' } }
dict3 = json . dumps ( { ' labels ' : { ' label1 ' : ' value ' , ' label2 ' : ' value ' , ' label3 ' : ' value ' } } , sort_keys = True )
assert dict3 == json . dumps ( mymodule . KubeVirtVM . merge_dicts ( dict1 , dict2 ) , sort_keys = True )
def test_double_nested_merge_dicts ( self ) :
dict1 = { ' metadata ' : { ' labels ' : { ' label1 ' : ' value ' , ' label3 ' : ' value ' } } }
dict2 = { ' metadata ' : { ' labels ' : { ' label2 ' : ' value ' } } }
dict3 = json . dumps ( { ' metadata ' : { ' labels ' : { ' label1 ' : ' value ' , ' label2 ' : ' value ' , ' label3 ' : ' value ' } } } , sort_keys = True )
assert dict3 == json . dumps ( mymodule . KubeVirtVM . merge_dicts ( dict1 , dict2 ) , sort_keys = True )
assert result . value [ ' method ' ] == ' delete '
assert not result . value [ ' kubevirt_vm ' ]