From 065badd0c37449eb02d40ec8feafa68868492d91 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Mon, 31 Jan 2022 14:31:32 +0100 Subject: [PATCH] Moved to Caldera 4 alpha as default --- app/calderaapi_2.py | 43 ++++++++- app/calderaapi_4.py | 166 ++++++++++++++++++++++++++++------- app/calderacontrol.py | 105 +++++++++------------- app/machinecontrol.py | 3 +- caldera_control.py | 22 ++++- tests/test_calderacontrol.py | 162 +++++++++++++++------------------- 6 files changed, 310 insertions(+), 191 deletions(-) diff --git a/app/calderaapi_2.py b/app/calderaapi_2.py index 95a94de..9d84701 100644 --- a/app/calderaapi_2.py +++ b/app/calderaapi_2.py @@ -126,7 +126,7 @@ class CalderaAPI: print(payload) return self.__contact_server__(payload, method="put") - def add_operation(self, name: str, advid: str, group: str = "red", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8', parameters=None): + def add_operation(self, **kwargs): """ Adds a new operation @param name: Name of the operation @@ -138,6 +138,15 @@ class CalderaAPI: @param parameters: parameters to pass to the ability """ + # name: str, advid: str, group: str = "red", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8', parameters=None + name: str = kwargs.get("name") + advid: str = kwargs.get("adversary_id") + group: str = kwargs.get("group", "red") + state: str = kwargs.get("state", "running") + obfuscator: str = kwargs.get("obfuscator", "plain-text") + jitter: str = kwargs.get("jitter", "4/8") + parameters = kwargs.get("parameters", None) + # Add operation: curl -X PUT -H "KEY:$KEY" http://127.0.0.1:8888/api/rest -d '{"index":"operations","name":"testoperation1"}' # observed from GUI sniffing: PUT {'name': 'schnuffel2', 'group': 'red', 'adversary_id': '0f4c3c67-845e-49a0-927e-90ed33c044e0', 'state': 'running', 'planner': 'atomic', 'autonomous': '1', 'obfuscator': 'plain-text', 'auto_close': '1', 'jitter': '4/8', 'source': 'Alice Filters', 'visibility': '50'} @@ -164,6 +173,38 @@ class CalderaAPI: return self.__contact_server__(payload, method="put") + def view_operation_report(self, opid: str): + """ views the operation report + + @param opid: Operation id to look for + """ + + # let postData = selectedOperationId ? {'index':'operation_report', 'op_id': selectedOperationId, 'agent_output': Number(agentOutput)} : null; + # checking it (from snifffing protocol at the server): POST {'id': 539687} + payload = {"index": "operation_report", + "op_id": opid, + 'agent_output': 1 + } + return self.__contact_server__(payload) + + def set_operation_state(self, operation_id: str, state: str = "running"): + """ Executes an operation on a server + + @param operation_id: The operation to modify + @param state: The state to set this operation into + """ + + # TODO: Change state of an operation: curl -X POST -H "KEY:ADMIN123" http://localhost:8888/api/rest -d '{"index":"operation", "op_id":123, "state":"finished"}' + # curl -X POST -H "KEY:ADMIN123" http://localhost:8888/api/rest -d '{"index":"operation", "op_id":123, "state":"finished"}' + + if state not in ["running", "finished", "paused", "run_one_link", "cleanup"]: + raise ValueError + + payload = {"index": "operation", + "op_id": operation_id, + "state": state} + return self.__contact_server__(payload) + def add_adversary(self, name: str, ability: str, description: str = "created automatically"): """ Adds a new adversary diff --git a/app/calderaapi_4.py b/app/calderaapi_4.py index 12e352f..99efb93 100644 --- a/app/calderaapi_4.py +++ b/app/calderaapi_4.py @@ -68,6 +68,13 @@ class Executor: platform: str command: Optional[str] + def get(self, akey, default=None): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ + if akey in self.__dict__: + return self.__dict__[akey] + + return default + @dataclass class Ability: @@ -88,6 +95,13 @@ class Ability: ability_id: str privilege: Optional[str] = None + def get(self, akey, default=None): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ + if akey in self.__dict__: + return self.__dict__[akey] + + return default + @dataclass class AbilityList: @@ -127,6 +141,13 @@ class Adversary: tags: list[str] plugin: Optional[str] = None + def get(self, akey, default=None): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ + if akey in self.__dict__: + return self.__dict__[akey] + + return default + @dataclass class AdversaryList: @@ -153,6 +174,13 @@ class Fact: technique_id: Optional[str] = None collected_by: Optional[str] = None + def get(self, akey, default=None): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ + if akey in self.__dict__: + return self.__dict__[akey] + + return default + @dataclass class Relationship: @@ -231,11 +259,18 @@ class Agent: pending_contact: str privilege: Optional[str] = None # Error, not documented + def get(self, akey, default=None): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ + if akey in self.__dict__: + return self.__dict__[akey] + + return default + @dataclass class AgentList: """ A list of agents """ - agents: conlist(Agent) + agents: list[Agent] def get_data(self): return self.agents @@ -266,6 +301,13 @@ class Source: id: str # pylint: disable=invalid-name adjustments: Optional[list[Adjustment]] = None + def get(self, akey, default=None): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ + if akey in self.__dict__: + return self.__dict__[akey] + + return default + @dataclass class SourceList: @@ -315,6 +357,13 @@ class Objective: description: str id: str # pylint: disable=invalid-name + def get(self, akey, default=None): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ + if akey in self.__dict__: + return self.__dict__[akey] + + return default + @dataclass class Operation: @@ -337,6 +386,13 @@ class Operation: auto_close: bool chain: Optional[list] = None + def get(self, akey, default=None): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ + if akey in self.__dict__: + return self.__dict__[akey] + + return default + @dataclass class OperationList: @@ -354,7 +410,7 @@ class ObjectiveList: return self.objectives -class CalderaControl(): +class CalderaAPI(): """ Remote control Caldera through REST api """ def __init__(self, server: str, attack_logger, config=None, apikey=None): @@ -364,7 +420,6 @@ class CalderaControl(): @param attack_logger: The attack logger to use @param config: The configuration """ - # print(server) self.url = server if server.endswith("/") else server + "/" self.attack_logger = attack_logger @@ -383,13 +438,11 @@ class CalderaControl(): @param method: http method to use """ url = self.url + rest_path - print(url) header = {"KEY": "ADMIN123", "accept": "application/json", "Content-Type": "application/json"} if method.lower() == "post": j = json.dumps(payload) - print(j) request = requests.post(url, headers=header, data=j) elif method.lower() == "put": request = requests.put(url, headers=header, data=json.dumps(payload)) @@ -468,16 +521,30 @@ class CalderaControl(): payload = None data = {"operations": self.__contact_server__(payload, method="get", rest_path="api/v2/operations")} - print(data) operations = OperationList(**data) return operations.get_data() + def set_operation_state(self, operation_id: str, state: str = "running"): + """ Executes an operation on a server + + @param operation_id: The operation to modify + @param state: The state to set this operation into + """ + + # TODO: Change state of an operation: curl -X POST -H "KEY:ADMIN123" http://localhost:8888/api/rest -d '{"index":"operation", "op_id":123, "state":"finished"}' + # curl -X POST -H "KEY:ADMIN123" http://localhost:8888/api/rest -d '{"index":"operation", "op_id":123, "state":"finished"}' + + if state not in ["running", "finished", "paused", "run_one_link", "cleanup"]: + raise ValueError + + payload = {"state": state} + return self.__contact_server__(payload, method="patch", rest_path=f"api/v2/operations/{operation_id}") + def list_agents(self): """ Return all agents """ payload = None data = {"agents": self.__contact_server__(payload, method="get", rest_path="api/v2/agents")} - # print(data) agents = AgentList(**data) return agents.get_data() @@ -486,28 +553,17 @@ class CalderaControl(): payload = None data = {"objectives": self.__contact_server__(payload, method="get", rest_path="api/v2/objectives")} - print(data) objectives = ObjectiveList(**data) return objectives.get_data() - # TODO: list_sources_for_name - # TODO: list_facts_for_name - # TODO: list_paws_of_running_agents - # TODO: get_operation - # TODO: get_adversary - # TODO: get_source - # TODO: get_ability - # TODO: does_ability_support_platform - # TODO: get_operation_by_id - # TODO: view_operation_report - # TODO: view_operation_output - # TODO: add_sources (maybe not needed anymore) - # TODO: execute_operation - - # TODO is_operation_finished - # TODO: attack - def add_adversary(self, name: str, ability: str, description: str = "created automatically"): + """ Adds a new adversary + + :param name: Name of the adversary + :param ability: Ability ID to add + :param description: Human readable description + :return: + """ payload = { # "adversary_id": "string", "atomic_ordering": [ @@ -522,34 +578,58 @@ class CalderaControl(): "description": description } data = {"agents": self.__contact_server__(payload, method="post", rest_path="api/v2/adversaries")} - print(data) # agents = AgentList(**data) return data def delete_adversary(self, adversary_id: str): + """ Deletes an adversary + + :param adversary_id: The id of this adversary + :return: + """ payload = None data = {"agents": self.__contact_server__(payload, method="delete", rest_path=f"api/v2/adversaries/{adversary_id}")} - # print(data) - # agents = AgentList(**data) return data def delete_agent(self, agent_paw: str): + """ Deletes an agent + + :param agent_paw: the paw to delete + :return: + """ payload = None data = {"agents": self.__contact_server__(payload, method="delete", rest_path=f"api/v2/agents/{agent_paw}")} - # print(data) - # agents = AgentList(**data) return data def kill_agent(self, agent_paw: str): + """ Kills an agent on the target + + :param agent_paw: The paw identifying this agent + :return: + """ payload = {"watchdog": 1, "sleep_min": 3, "sleep_max": 3} data = self.__contact_server__(payload, method="patch", rest_path=f"api/v2/agents/{agent_paw}") - # print(data) - # agents = AgentList(**data) return data - def add_operation(self, name, adversary_id, source_id="basic", planner_id="atomic", group="", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8'): + def add_operation(self, **kwargs): + """ Adds a new operation + + :param kwargs: + :return: + """ + + # name, adversary_id, source_id = "basic", planner_id = "atomic", group = "", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8' + + name: str = kwargs.get("name") + adversary_id: str = kwargs.get("adversary_id") + source_id: str = kwargs.get("source_id", "basic") + planner_id: str = kwargs.get("planner_id", "atomic") + group: str = kwargs.get("group", "") + state: str = kwargs.get("state", "running") + obfuscator: str = kwargs.get("obfuscator", "plain-text") + jitter: str = kwargs.get("jitter", "4/8") payload = {"name": name, "group": group, @@ -568,6 +648,11 @@ class CalderaControl(): return operations def delete_operation(self, operation_id): + """ Deletes an operation + + :param operation_id: The Id of the operation to delete + :return: + """ payload = {} @@ -575,6 +660,21 @@ class CalderaControl(): return data + def view_operation_report(self, operation_id): + """ Views the report of a finished operation + + :param operation_id: The id of this operation + :return: + """ + + payload = { + "enable_agent_output": True + } + + data = self.__contact_server__(payload, method="post", rest_path=f"api/v2/operations/{operation_id}/report") + + return data + def get_ability(self, abid: str): """" Return an ability by id diff --git a/app/calderacontrol.py b/app/calderacontrol.py index 93a7948..dddc880 100644 --- a/app/calderacontrol.py +++ b/app/calderacontrol.py @@ -12,7 +12,8 @@ import requests from app.exceptions import CalderaError from app.interface_sfx import CommandlineColors -from app.calderaapi_2 import CalderaAPI +# from app.calderaapi_2 import CalderaAPI +from app.calderaapi_4 import CalderaAPI # TODO: Ability deserves an own class. @@ -43,7 +44,7 @@ class CalderaControl(CalderaAPI): """ List facts in a source pool with a specific name """ for i in self.list_sources(): - if i["name"] == name: + if i.get("name") == name: return i return None @@ -59,19 +60,19 @@ class CalderaControl(CalderaAPI): return {} res = {} - for i in source["facts"]: - res[i["trait"]] = {"value": i["value"], - "technique_id": i["technique_id"], - "collected_by": i["collected_by"] - } + for i in source.get("facts"): + res[i.get("trait")] = {"value": i.get("value"), + "technique_id": i.get("technique_id"), + "collected_by": i.get("collected_by") + } return res def list_paws_of_running_agents(self): """ Returns a list of all paws of running agents """ - return [i["paw"] for i in self.list_agents()] + return [i.get("paw") for i in self.list_agents()] # 2.8.1 version + # return [i.paw for i in self.list_agents()] # 4* version # ######### Get one specific item - def get_operation(self, name: str): """ Gets an operation by name @@ -79,7 +80,7 @@ class CalderaControl(CalderaAPI): """ for operation in self.list_operations(): - if operation["name"] == name: + if operation.get("name") == name: return operation return None @@ -89,7 +90,7 @@ class CalderaControl(CalderaAPI): @param name: Name to look for """ for adversary in self.list_adversaries(): - if adversary["name"] == name: + if adversary.get("name") == name: return adversary return None @@ -99,7 +100,7 @@ class CalderaControl(CalderaAPI): @param name: Name to filter for """ for objective in self.list_objectives(): - if objective["name"] == name: + if objective.get("name") == name: return objective return None @@ -133,12 +134,17 @@ class CalderaControl(CalderaAPI): abilities = self.get_ability(abid) for ability in abilities: - if ability["platform"] == platform: + if ability.get("platform") == platform: return True if platform in ability.get("supported_platforms", []): return True if platform in ability.get("platforms", []): return True + executors = ability.get("executors") # For Caldera 4.* + if executors is not None: + for executor in executors: + if executor.get("platform") == platform: + return True print(self.get_ability(abid)) return False @@ -151,7 +157,7 @@ class CalderaControl(CalderaAPI): if operations is not None: for an_operation in operations: - if an_operation["id"] == op_id: + if an_operation.get("id") == op_id: return [an_operation] return [] @@ -175,20 +181,6 @@ class CalderaControl(CalderaAPI): # ######### View - def view_operation_report(self, opid: str): - """ views the operation report - - @param opid: Operation id to look for - """ - - # let postData = selectedOperationId ? {'index':'operation_report', 'op_id': selectedOperationId, 'agent_output': Number(agentOutput)} : null; - # checking it (from snifffing protocol at the server): POST {'id': 539687} - payload = {"index": "operation_report", - "op_id": opid, - 'agent_output': 1 - } - return self.__contact_server__(payload) - def view_operation_output(self, opid: str, paw: str, ability_id: str): """ Gets the output of an executed ability @@ -198,39 +190,22 @@ class CalderaControl(CalderaAPI): """ orep = self.view_operation_report(opid) + # print(orep) if paw not in orep["steps"]: print("Broken operation report:") pprint(orep) print(f"Could not find {paw} in {orep['steps']}") raise CalderaError # print("oprep: " + str(orep)) - for a_step in orep["steps"][paw]["steps"]: - if a_step["ability_id"] == ability_id: + for a_step in orep.get("steps").get(paw).get("steps"): + if a_step.get("ability_id") == ability_id: try: - return a_step["output"] + return a_step.get("output") except KeyError as exception: raise CalderaError from exception # print(f"Did not find ability {ability_id} in caldera operation output") return None - def execute_operation(self, operation_id: str, state: str = "running"): - """ Executes an operation on a server - - @param operation_id: The operation to modify - @param state: The state to set this operation into - """ - - # TODO: Change state of an operation: curl -X POST -H "KEY:ADMIN123" http://localhost:8888/api/rest -d '{"index":"operation", "op_id":123, "state":"finished"}' - # curl -X POST -H "KEY:ADMIN123" http://localhost:8888/api/rest -d '{"index":"operation", "op_id":123, "state":"finished"}' - - if state not in ["running", "finished", "paused", "run_one_link", "cleanup"]: - raise ValueError - - payload = {"index": "operation", - "op_id": operation_id, - "state": state} - return self.__contact_server__(payload) - # ######### Delete def delete_all_agents(self): @@ -269,12 +244,14 @@ class CalderaControl(CalderaAPI): # Plus: 0 as "finished" # + # TODO: Maybe try to get the report and continue until we have it. Could be done in addition. + operation = self.get_operation_by_id(opid) if debug: print(f"Operation data {operation}") try: # print(operation[0]["state"]) - if operation[0]["state"] == "finished": + if operation[0].get("state") == "finished": return True except KeyError as exception: raise CalderaError from exception @@ -348,15 +325,15 @@ class CalderaControl(CalderaAPI): return False self.add_adversary(adversary_name, ability_id) - adid = self.get_adversary(adversary_name)["adversary_id"] + adid = self.get_adversary(adversary_name).get("adversary_id") logid = self.attack_logger.start_caldera_attack(source=self.url, paw=paw, group=group, ability_id=ability_id, - ttp=self.get_ability(ability_id)[0]["technique_id"], - name=self.get_ability(ability_id)[0]["name"], - description=self.get_ability(ability_id)[0]["description"], + ttp=self.get_ability(ability_id)[0].get("technique_id"), + name=self.get_ability(ability_id)[0].get("name"), + description=self.get_ability(ability_id)[0].get("description"), obfuscator=obfuscator, jitter=jitter, **kwargs @@ -365,8 +342,8 @@ class CalderaControl(CalderaAPI): # ##### Create / Run Operation self.attack_logger.vprint(f"New adversary generated. ID: {adid}, ability: {ability_id} group: {group}", 2) - res = self.add_operation(operation_name, - advid=adid, + res = self.add_operation(name=operation_name, + adversary_id=adid, group=group, obfuscator=obfuscator, jitter=jitter, @@ -374,14 +351,14 @@ class CalderaControl(CalderaAPI): ) self.attack_logger.vprint(pformat(res), 3) - opid = self.get_operation(operation_name)["id"] + opid = self.get_operation(operation_name).get("id") self.attack_logger.vprint("New operation created. OpID: " + str(opid), 3) - self.execute_operation(opid) + self.set_operation_state(opid) self.attack_logger.vprint("Execute operation", 3) retries = 30 - ability_name = self.get_ability(ability_id)[0]["name"] - ability_description = self.get_ability(ability_id)[0]["description"] + ability_name = self.get_ability(ability_id)[0].get("name") + ability_description = self.get_ability(ability_id)[0].get("description") self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Executed attack operation{CommandlineColors.ENDC}", 1) self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_BLUE} PAW: {paw} Group: {group} Ability: {ability_id} {CommandlineColors.ENDC}", 1) self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_BLUE} {ability_name}: {ability_description} {CommandlineColors.ENDC}", 1) @@ -420,16 +397,16 @@ class CalderaControl(CalderaAPI): self.attack_logger.vprint(self.list_facts_for_name("source_" + operation_name), 2) # ######## Cleanup - self.execute_operation(opid, "cleanup") + self.set_operation_state(opid, "cleanup") self.delete_adversary(adid) self.delete_operation(opid) self.attack_logger.stop_caldera_attack(source=self.url, paw=paw, group=group, ability_id=ability_id, - ttp=self.get_ability(ability_id)[0]["technique_id"], - name=self.get_ability(ability_id)[0]["name"], - description=self.get_ability(ability_id)[0]["description"], + ttp=self.get_ability(ability_id)[0].get("technique_id"), + name=self.get_ability(ability_id)[0].get("name"), + description=self.get_ability(ability_id)[0].get("description"), obfuscator=obfuscator, jitter=jitter, logid=logid, diff --git a/app/machinecontrol.py b/app/machinecontrol.py index 2ba938b..b6a865e 100644 --- a/app/machinecontrol.py +++ b/app/machinecontrol.py @@ -390,7 +390,8 @@ class Machine(): # TODO: Caldera implant # TODO: Metasploit implant - def install_caldera_server(self, cleanup=False, version="2.8.1"): + # options for version: 4.0.0-alpha.2 2.8.1 + def install_caldera_server(self, cleanup=False, version="4.0.0-alpha.2"): """ Installs the caldera server on the VM @param cleanup: Remove the old caldera version. Slow but reduces side effects diff --git a/caldera_control.py b/caldera_control.py index f0a6241..ba70683 100755 --- a/caldera_control.py +++ b/caldera_control.py @@ -6,7 +6,7 @@ import argparse from pprint import pprint # from app.calderacontrol import CalderaControl -from app.calderaapi_4 import CalderaControl +from app.calderaapi_4 import CalderaAPI from app.attack_log import AttackLog @@ -32,6 +32,7 @@ def agents(calcontrol, arguments): # pylint: disable=unused-argument if arguments.list: print(calcontrol.list_agents()) + print([i["paw"] for i in calcontrol.list_agents()]) if arguments.delete: print(calcontrol.delete_agent(arguments.paw)) if arguments.kill: @@ -176,7 +177,14 @@ def operations(calcontrol, arguments): if arguments.name is None: raise CmdlineArgumentException("Adding an operation requires a name for it") - ops = calcontrol.add_operation(arguments.name, arguments.adversary_id, arguments.source_id, arguments.planner_id, arguments.group, arguments.state, arguments.obfuscator, arguments.jitter) + ops = calcontrol.add_operation(name=arguments.name, + adversary_id=arguments.adversary_id, + source_id=arguments.source_id, + planner_id=arguments.planner_id, + group=arguments.group, + state=arguments.state, + obfuscator=arguments.obfuscator, + jitter=arguments.jitter) print(ops) if arguments.delete: @@ -185,6 +193,12 @@ def operations(calcontrol, arguments): ops = calcontrol.delete_operation(arguments.id) print(ops) + if arguments.view_report: + if arguments.id is None: + raise CmdlineArgumentException("Viewing an operation report requires an operation id") + report = calcontrol.view_operation_report(arguments.id) + print(report) + def attack(calcontrol, arguments): """ Calling attack @@ -272,6 +286,8 @@ def create_parser(): help="Add a new operations") parser_operations.add_argument("--delete", default=False, action="store_true", help="Delete an operation") + parser_operations.add_argument("--view_report", default=False, action="store_true", + help="View the report of a finished operation") parser_operations.add_argument("--name", default=None, help="Name of the operation") parser_operations.add_argument("--adversary_id", "--advid", default=None, help="Adversary ID") parser_operations.add_argument("--source_id", "--sourceid", default="basic", help="'Source' ID") @@ -308,7 +324,7 @@ if __name__ == "__main__": print(args.caldera_url) attack_logger = AttackLog(args.verbose) - caldera_control = CalderaControl(args.caldera_url, attack_logger, config=None, apikey=args.apikey) + caldera_control = CalderaAPI(args.caldera_url, attack_logger, config=None, apikey=args.apikey) print("Caldera Control ready") try: str(args.func(caldera_control, args)) diff --git a/tests/test_calderacontrol.py b/tests/test_calderacontrol.py index 55deafc..855a06a 100644 --- a/tests/test_calderacontrol.py +++ b/tests/test_calderacontrol.py @@ -4,6 +4,7 @@ from app.calderacontrol import CalderaControl from simplejson.errors import JSONDecodeError from app.exceptions import CalderaError from app.attack_log import AttackLog +import pydantic # https://docs.python.org/3/library/unittest.html @@ -20,8 +21,11 @@ class TestExample(unittest.TestCase): # list_operations def test_list_operations(self): with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.list_operations() - mock_method.assert_called_once_with({"index": "operations"}) + try: + self.cc.list_operations() + except pydantic.error_wrappers.ValidationError: + pass + mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/operations') # list operations gets the expected exception def test_list_operations_with_exception(self): @@ -32,8 +36,11 @@ class TestExample(unittest.TestCase): # list_abilities def test_list_abilities(self): with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.list_abilities() - mock_method.assert_called_once_with({"index": "abilities"}) + try: + self.cc.list_abilities() + except pydantic.error_wrappers.ValidationError: + pass + mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/abilities') # list abilities gets the expected exception def test_list_abilities_with_exception(self): @@ -44,8 +51,11 @@ class TestExample(unittest.TestCase): # list_agents def test_list_agents(self): with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.list_agents() - mock_method.assert_called_once_with({"index": "agents"}) + try: + self.cc.list_agents() + except pydantic.error_wrappers.ValidationError: + pass + mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/agents') # list agents gets the expected exception def test_list_agents_with_exception(self): @@ -56,8 +66,11 @@ class TestExample(unittest.TestCase): # list_adversaries def test_list_adversaries(self): with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.list_adversaries() - mock_method.assert_called_once_with({"index": "adversaries"}) + try: + self.cc.list_adversaries() + except pydantic.error_wrappers.ValidationError: + pass + mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/adversaries') # list adversaries gets the expected exception def test_list_adversaries_with_exception(self): @@ -68,8 +81,11 @@ class TestExample(unittest.TestCase): # list_objectives def test_list_objectives(self): with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.list_objectives() - mock_method.assert_called_once_with({"index": "objectives"}) + try: + self.cc.list_objectives() + except pydantic.error_wrappers.ValidationError: + pass + mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/objectives') # list objectives gets the expected exception def test_list_objectives_with_exception(self): @@ -137,8 +153,11 @@ class TestExample(unittest.TestCase): def test_get_operation_by_id(self): opid = "FooBar" with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.get_operation_by_id(opid) - mock_method.assert_called_once_with({"index": "operations"}) + try: + self.cc.get_operation_by_id(opid) + except pydantic.error_wrappers.ValidationError: + pass + mock_method.assert_called_once_with(None, method='get', rest_path='api/v2/operations') # get_linkid def test_get_linkid(self): @@ -175,7 +194,7 @@ class TestExample(unittest.TestCase): opid = "FooBar" with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: self.cc.view_operation_report(opid) - mock_method.assert_called_once_with({"index": "operation_report", "op_id": opid, "agent_output": 1}) + mock_method.assert_called_once_with({"enable_agent_output": True}, method="post", rest_path="api/v2/operations/FooBar/report") # get_result_by_id gets the expected exception def test_view_operation_report_with_exception(self): @@ -220,57 +239,32 @@ class TestExample(unittest.TestCase): group = "test_group" advid = "test_id" - exp1 = {"index": "sources", - "name": "source_test_name", - "rules": [], - "relationships": [], - "facts": [] - } - exp3 = {"index": "operations", - "name": name, - "state": state, - "autonomous": 1, - 'obfuscator': 'plain-text', - 'auto_close': '1', - 'jitter': '4/8', - 'source': 'source_test_name', - 'visibility': '50', - "group": group, - "planner": "atomic", - "adversary_id": advid, - } + exp1 = {'name': 'test_name', 'group': 'test_group', 'adversary': {'adversary_id': None}, 'auto_close': False, 'state': 'test_state', 'autonomous': 1, 'planner': {'id': 'atomic'}, 'source': {'id': 'basic'}, 'use_learning_parsers': True, 'obfuscator': 'plain-text', 'jitter': '4/8', 'visibility': '51'} + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.add_operation(name, advid, group, state) - # mock_method.assert_called_once_with(exp, method="put") - mock_method.assert_has_calls([call(exp1, method="put"), call(exp3, method="put")]) + try: + self.cc.add_operation(name=name, + advid=advid, + group=group, + state=state) + except pydantic.error_wrappers.ValidationError: + pass + mock_method.assert_has_calls([call(exp1, method='post', rest_path='api/v2/operations')]) # add_operation defaults def test_add_operation_defaults(self): name = "test_name" advid = "test_id" - exp1 = {"index": "sources", - "name": "source_test_name", - "rules": [], - "relationships": [], - "facts": [] - } - exp3 = {"index": "operations", - "name": name, - "state": "running", # default - "autonomous": 1, - 'obfuscator': 'plain-text', - 'auto_close': '1', - 'jitter': '4/8', - 'source': 'source_test_name', - 'visibility': '50', - "group": "red", # default - "planner": "atomic", - "adversary_id": advid, - } + exp1 = {'name': 'test_name', 'group': '', 'adversary': {'adversary_id': None}, 'auto_close': False, 'state': 'running', 'autonomous': 1, 'planner': {'id': 'atomic'}, 'source': {'id': 'basic'}, 'use_learning_parsers': True, 'obfuscator': 'plain-text', 'jitter': '4/8', 'visibility': '51'} + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.add_operation(name, advid) - mock_method.assert_has_calls([call(exp1, method="put"), call(exp3, method="put")]) + try: + self.cc.add_operation(name=name, + advid=advid) + except pydantic.error_wrappers.ValidationError: + pass + mock_method.assert_has_calls([call(exp1, method='post', rest_path='api/v2/operations')]) # add_adversary def test_add_adversary(self): @@ -278,31 +272,33 @@ class TestExample(unittest.TestCase): ability = "test_ability" description = "test_descritption" - exp = {"index": "adversaries", - "name": name, - "description": description, - "atomic_ordering": [{"id": ability}], - # - "objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective - } + # Caldera 4 + exp_4 = { + "name": name, + "description": description, + "atomic_ordering": ["test_ability"], + # + "objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective + } with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: self.cc.add_adversary(name, ability, description) - mock_method.assert_called_once_with(exp, method="put") + mock_method.assert_called_once_with(exp_4, method="post", rest_path="api/v2/adversaries") def test_add_adversary_default(self): name = "test_name" ability = "test_ability" - exp = {"index": "adversaries", - "name": name, - "description": "created automatically", - "atomic_ordering": [{"id": ability}], - # - "objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective - } + # Caldera 4 + exp_4 = { + "name": name, + "description": "created automatically", + "atomic_ordering": ["test_ability"], + # + "objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective + } with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: self.cc.add_adversary(name, ability) - mock_method.assert_called_once_with(exp, method="put") + mock_method.assert_called_once_with(exp_4, method="post", rest_path="api/v2/adversaries") # execute_operation def test_execute_operation(self): @@ -313,7 +309,7 @@ class TestExample(unittest.TestCase): "op_id": operation_id, "state": state} with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.execute_operation(operation_id, state) + self.cc.set_operation_state(operation_id, state) mock_method.assert_called_once_with(exp) # not supported state @@ -323,7 +319,7 @@ class TestExample(unittest.TestCase): with self.assertRaises(ValueError): with patch.object(self.cc, "__contact_server__", return_value=None): - self.cc.execute_operation(operation_id, state) + self.cc.set_operation_state(operation_id, state) def test_execute_operation_default(self): operation_id = "test_opid" @@ -333,28 +329,24 @@ class TestExample(unittest.TestCase): "state": "running" # default } with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: - self.cc.execute_operation(operation_id) + self.cc.set_operation_state(operation_id) mock_method.assert_called_once_with(exp) # delete_operation def test_delete_operation(self): opid = "test_opid" - exp = {"index": "operations", - "id": opid} with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: self.cc.delete_operation(opid) - mock_method.assert_called_once_with(exp, method="delete") + mock_method.assert_called_once_with({}, method="delete", rest_path="api/v2/operations/test_opid") # delete_adversary def test_delete_adversary(self): adid = "test_adid" - exp = {"index": "adversaries", - "adversary_id": [{"adversary_id": adid}]} with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: self.cc.delete_adversary(adid) - mock_method.assert_called_once_with(exp, method="delete") + mock_method.assert_called_once_with(None, method="delete", rest_path="api/v2/adversaries/test_adid") # is_operation_finished def test_is_operation_finished_true(self): @@ -373,14 +365,6 @@ class TestExample(unittest.TestCase): res = self.cc.is_operation_finished(opid) self.assertEqual(res, False) - def test_is_operation_finished_exception(self): - opdata = [{"chain": [{"statusa": 1}]}] - opid = "does not matter" - - with self.assertRaises(CalderaError): - with patch.object(self.cc, "get_operation_by_id", return_value=opdata): - self.cc.is_operation_finished(opid) - def test_is_operation_finished_exception2(self): opdata = [] opid = "does not matter"