mirror of https://github.com/avast/PurpleDome
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
385 lines
15 KiB
Python
385 lines
15 KiB
Python
import unittest
|
|
from unittest.mock import patch, call
|
|
from app.calderacontrol import CalderaControl
|
|
from simplejson.errors import JSONDecodeError
|
|
from app.exceptions import CalderaError
|
|
from app.attack_log import AttackLog
|
|
import pydantic
|
|
|
|
# https://docs.python.org/3/library/unittest.html
|
|
|
|
|
|
class TestExample(unittest.TestCase):
|
|
|
|
def setUp(self) -> None:
|
|
self.attack_logger = AttackLog(0)
|
|
self.cc = CalderaControl("https://localhost", attack_logger=self.attack_logger, apikey="123")
|
|
|
|
def tearDown(self) -> None:
|
|
pass
|
|
|
|
# list_operations
|
|
def test_list_operations(self):
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
try:
|
|
self.cc.list_operations()
|
|
except pydantic.error_wrappers.ValidationError:
|
|
pass
|
|
mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/operations')
|
|
|
|
# list operations gets the expected exception
|
|
def test_list_operations_with_exception(self):
|
|
with self.assertRaises(JSONDecodeError):
|
|
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
|
|
self.cc.list_operations()
|
|
|
|
# list_abilities
|
|
def test_list_abilities(self):
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
try:
|
|
self.cc.list_abilities()
|
|
except pydantic.error_wrappers.ValidationError:
|
|
pass
|
|
mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/abilities')
|
|
|
|
# list abilities gets the expected exception
|
|
def test_list_abilities_with_exception(self):
|
|
with self.assertRaises(JSONDecodeError):
|
|
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
|
|
self.cc.list_abilities()
|
|
|
|
# list_agents
|
|
def test_list_agents(self):
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
try:
|
|
self.cc.list_agents()
|
|
except pydantic.error_wrappers.ValidationError:
|
|
pass
|
|
mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/agents')
|
|
|
|
# list agents gets the expected exception
|
|
def test_list_agents_with_exception(self):
|
|
with self.assertRaises(JSONDecodeError):
|
|
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
|
|
self.cc.list_agents()
|
|
|
|
# list_adversaries
|
|
def test_list_adversaries(self):
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
try:
|
|
self.cc.list_adversaries()
|
|
except pydantic.error_wrappers.ValidationError:
|
|
pass
|
|
mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/adversaries')
|
|
|
|
# list adversaries gets the expected exception
|
|
def test_list_adversaries_with_exception(self):
|
|
with self.assertRaises(JSONDecodeError):
|
|
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
|
|
self.cc.list_adversaries()
|
|
|
|
# list_objectives
|
|
def test_list_objectives(self):
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
try:
|
|
self.cc.list_objectives()
|
|
except pydantic.error_wrappers.ValidationError:
|
|
pass
|
|
mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/objectives')
|
|
|
|
# list objectives gets the expected exception
|
|
def test_list_objectives_with_exception(self):
|
|
with self.assertRaises(JSONDecodeError):
|
|
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
|
|
self.cc.list_objectives()
|
|
|
|
# Get operation (working)
|
|
def test_get_operation(self):
|
|
a = {"name": "bar", "bar": "baz"}
|
|
with patch.object(self.cc, "list_operations", return_value=[a, {"name": "zigg", "bar": "baz"}]):
|
|
op = self.cc.get_operation("bar")
|
|
self.assertEqual(op, a)
|
|
|
|
# Get operation (no matching name)
|
|
def test_get_operation_not_available(self):
|
|
a = {"name": "bar", "bar": "baz"}
|
|
with patch.object(self.cc, "list_operations", return_value=[a, {"name": "zigg", "bar": "baz"}]):
|
|
op = self.cc.get_operation("baaaaar")
|
|
self.assertEqual(op, None)
|
|
|
|
# get_adversary
|
|
def test_get_adversary(self):
|
|
a = {"name": "bar", "bar": "baz"}
|
|
with patch.object(self.cc, "list_adversaries", return_value=[a, {"name": "zigg", "bar": "baz"}]):
|
|
op = self.cc.get_adversary("bar")
|
|
self.assertEqual(op, a)
|
|
|
|
# get_adversary (no matching name)
|
|
def test_get_adversary_not_available(self):
|
|
a = {"name": "bar", "bar": "baz"}
|
|
with patch.object(self.cc, "list_adversaries", return_value=[a, {"name": "zigg", "bar": "baz"}]):
|
|
op = self.cc.get_adversary("baaaar")
|
|
self.assertEqual(op, None)
|
|
|
|
# get_objective
|
|
def test_get_objective(self):
|
|
a = {"name": "bar", "bar": "baz"}
|
|
with patch.object(self.cc, "list_objectives", return_value=[a, {"name": "zigg", "bar": "baz"}]):
|
|
op = self.cc.get_objective("bar")
|
|
self.assertEqual(op, a)
|
|
|
|
# get_objective (no matching name)
|
|
def test_get_objective_not_available(self):
|
|
a = {"name": "bar", "bar": "baz"}
|
|
with patch.object(self.cc, "list_objectives", return_value=[a, {"name": "zigg", "bar": "baz"}]):
|
|
op = self.cc.get_objective("baaaar")
|
|
self.assertEqual(op, None)
|
|
|
|
# get_ability
|
|
def test_get_ability(self):
|
|
a = {"ability_id": "bar", "bar": "baz"}
|
|
with patch.object(self.cc, "list_abilities", return_value=[a, {"ability_id": "zigg", "bar": "baz"}]):
|
|
op = self.cc.get_ability("bar")
|
|
self.assertEqual(op, [a])
|
|
|
|
# get_ability (no matching name)
|
|
def test_get_ability_not_available(self):
|
|
a = {"ability_id": "bar", "bar": "baz"}
|
|
with patch.object(self.cc, "list_abilities", return_value=[a, {"ability_id": "zigg", "bar": "baz"}]):
|
|
op = self.cc.get_ability("baaaar")
|
|
self.assertEqual(op, [])
|
|
|
|
# get_operation_by_id
|
|
def test_get_operation_by_id(self):
|
|
opid = "FooBar"
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
try:
|
|
self.cc.get_operation_by_id(opid)
|
|
except pydantic.error_wrappers.ValidationError:
|
|
pass
|
|
mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/operations')
|
|
|
|
# get_linkid
|
|
def test_get_linkid(self):
|
|
paw = "FooBar"
|
|
ability_id = "AID"
|
|
|
|
alink = {"paw": paw,
|
|
"ability": {"ability_id": ability_id},
|
|
"id": "Getme"}
|
|
|
|
op = [{"chain": [alink]}]
|
|
|
|
with patch.object(self.cc, "get_operation_by_id", return_value=op):
|
|
res = self.cc.get_linkid("Foo", paw, ability_id)
|
|
self.assertEqual(res, "Getme")
|
|
|
|
# get missing link id
|
|
def test_get_linkid_missing(self):
|
|
paw = "FooBar"
|
|
ability_id = "AID"
|
|
|
|
alink = {"paw": paw,
|
|
"ability": {"ability_id": ability_id},
|
|
"id": "Getme"}
|
|
|
|
op = [{"chain": [alink]}]
|
|
|
|
with patch.object(self.cc, "get_operation_by_id", return_value=op):
|
|
res = self.cc.get_linkid("Foo", "Bar", ability_id)
|
|
self.assertEqual(res, None)
|
|
|
|
# view_operation_report
|
|
def test_view_operation_report(self):
|
|
opid = "FooBar"
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
self.cc.view_operation_report(opid)
|
|
mock_method.assert_called_once_with({"enable_agent_output": True}, method="post", rest_path="api/v2/operations/FooBar/report")
|
|
|
|
# get_result_by_id gets the expected exception
|
|
def test_view_operation_report_with_exception(self):
|
|
with self.assertRaises(JSONDecodeError):
|
|
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
|
|
self.cc.view_operation_report("FooBar")
|
|
|
|
# view_operation_output
|
|
def test_view_operation_output(self):
|
|
opid = "OPID_123"
|
|
paw = "PAW_123"
|
|
ability_id = "AID_123"
|
|
or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}}
|
|
with patch.object(self.cc, "view_operation_report", return_value=or_ret):
|
|
res = self.cc.view_operation_output(opid, paw, ability_id)
|
|
self.assertEqual(res, "red")
|
|
|
|
# view_operation_output paw missing
|
|
def test_view_operation_output_paw_missing(self):
|
|
opid = "OPID_123"
|
|
paw = "PAW_123"
|
|
ability_id = "AID_123"
|
|
or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}}
|
|
with self.assertRaises(CalderaError):
|
|
with patch.object(self.cc, "view_operation_report", return_value=or_ret):
|
|
self.cc.view_operation_output(opid, "missing", ability_id)
|
|
|
|
# view_operation_output ability_id missing
|
|
def test_view_operation_output_ability_id_missing(self):
|
|
opid = "OPID_123"
|
|
paw = "PAW_123"
|
|
ability_id = "AID_123"
|
|
or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}}
|
|
with patch.object(self.cc, "view_operation_report", return_value=or_ret):
|
|
res = self.cc.view_operation_output(opid, paw, "missing")
|
|
self.assertEqual(res, None)
|
|
|
|
# add_operation
|
|
def test_add_operation(self):
|
|
name = "test_name"
|
|
state = "test_state"
|
|
group = "test_group"
|
|
advid = "test_id"
|
|
|
|
exp1 = {'name': 'test_name', 'group': 'test_group', 'adversary': {'adversary_id': None}, 'auto_close': False, 'state': 'test_state', 'autonomous': 1, 'planner': {'id': 'atomic'}, 'source': {'id': 'basic'}, 'use_learning_parsers': True, 'obfuscator': 'plain-text', 'jitter': '4/8', 'visibility': '51'}
|
|
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
try:
|
|
self.cc.add_operation(name=name,
|
|
advid=advid,
|
|
group=group,
|
|
state=state)
|
|
except pydantic.error_wrappers.ValidationError:
|
|
pass
|
|
mock_method.assert_has_calls([call(exp1, method='post', rest_path='api/v2/operations')])
|
|
|
|
# add_operation defaults
|
|
def test_add_operation_defaults(self):
|
|
name = "test_name"
|
|
advid = "test_id"
|
|
|
|
exp1 = {'name': 'test_name', 'group': '', 'adversary': {'adversary_id': None}, 'auto_close': False, 'state': 'running', 'autonomous': 1, 'planner': {'id': 'atomic'}, 'source': {'id': 'basic'}, 'use_learning_parsers': True, 'obfuscator': 'plain-text', 'jitter': '4/8', 'visibility': '51'}
|
|
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
try:
|
|
self.cc.add_operation(name=name,
|
|
advid=advid)
|
|
except pydantic.error_wrappers.ValidationError:
|
|
pass
|
|
mock_method.assert_has_calls([call(exp1, method='post', rest_path='api/v2/operations')])
|
|
|
|
# add_adversary
|
|
def test_add_adversary(self):
|
|
name = "test_name"
|
|
ability = "test_ability"
|
|
description = "test_descritption"
|
|
|
|
# Caldera 4
|
|
exp_4 = {
|
|
"name": name,
|
|
"description": description,
|
|
"atomic_ordering": ["test_ability"],
|
|
#
|
|
"objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective
|
|
}
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
self.cc.add_adversary(name, ability, description)
|
|
mock_method.assert_called_once_with(exp_4, method="post", rest_path="api/v2/adversaries")
|
|
|
|
def test_add_adversary_default(self):
|
|
name = "test_name"
|
|
ability = "test_ability"
|
|
|
|
# Caldera 4
|
|
exp_4 = {
|
|
"name": name,
|
|
"description": "created automatically",
|
|
"atomic_ordering": ["test_ability"],
|
|
#
|
|
"objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective
|
|
}
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
self.cc.add_adversary(name, ability)
|
|
mock_method.assert_called_once_with(exp_4, method="post", rest_path="api/v2/adversaries")
|
|
|
|
# execute_operation
|
|
def test_execute_operation(self):
|
|
operation_id = "test_opid"
|
|
state = "paused"
|
|
|
|
exp = {"index": "operation",
|
|
"op_id": operation_id,
|
|
"state": state}
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
self.cc.set_operation_state(operation_id, state)
|
|
mock_method.assert_called_once_with(exp)
|
|
|
|
# not supported state
|
|
def test_execute_operation_not_supported(self):
|
|
operation_id = "test_opid"
|
|
state = "not supported"
|
|
|
|
with self.assertRaises(ValueError):
|
|
with patch.object(self.cc, "__contact_server__", return_value=None):
|
|
self.cc.set_operation_state(operation_id, state)
|
|
|
|
def test_execute_operation_default(self):
|
|
operation_id = "test_opid"
|
|
|
|
exp = {"index": "operation",
|
|
"op_id": operation_id,
|
|
"state": "running" # default
|
|
}
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
self.cc.set_operation_state(operation_id)
|
|
mock_method.assert_called_once_with(exp)
|
|
|
|
# delete_operation
|
|
def test_delete_operation(self):
|
|
opid = "test_opid"
|
|
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
self.cc.delete_operation(opid)
|
|
mock_method.assert_called_once_with({}, method="delete", rest_path="api/v2/operations/test_opid")
|
|
|
|
# delete_adversary
|
|
def test_delete_adversary(self):
|
|
adid = "test_adid"
|
|
|
|
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
|
|
self.cc.delete_adversary(adid)
|
|
mock_method.assert_called_once_with(None, method="delete", rest_path="api/v2/adversaries/test_adid")
|
|
|
|
# is_operation_finished
|
|
def test_is_operation_finished_true(self):
|
|
opdata = [{"state": "finished", "chain": [{"status": 0}]}]
|
|
opid = "does not matter"
|
|
|
|
with patch.object(self.cc, "get_operation_by_id", return_value=opdata):
|
|
res = self.cc.is_operation_finished(opid)
|
|
self.assertEqual(res, True)
|
|
|
|
def test_is_operation_finished_false(self):
|
|
opdata = [{"state": "running", "chain": [{"status": 1}]}]
|
|
opid = "does not matter"
|
|
|
|
with patch.object(self.cc, "get_operation_by_id", return_value=opdata):
|
|
res = self.cc.is_operation_finished(opid)
|
|
self.assertEqual(res, False)
|
|
|
|
def test_is_operation_finished_exception2(self):
|
|
opdata = []
|
|
opid = "does not matter"
|
|
|
|
with self.assertRaises(CalderaError):
|
|
with patch.object(self.cc, "get_operation_by_id", return_value=opdata):
|
|
self.cc.is_operation_finished(opid)
|
|
|
|
# TODO attack (lots of complexity !)
|
|
|
|
# TODO test fetch_client
|
|
|
|
# TODO test __contact_server__
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|