diff --git a/app/attack_log.py b/app/attack_log.py index 6f391d1..9ab730e 100644 --- a/app/attack_log.py +++ b/app/attack_log.py @@ -35,7 +35,7 @@ class AttackLog(): self.datetime_format = "%H:%M:%S.%f" - def __add_to_log__(self, item: dict): + def __add_to_log__(self, item: dict) -> None: """ internal command to add a item to the log :param item: data chunk to add @@ -48,17 +48,22 @@ class AttackLog(): return datetime.datetime.now().strftime(self.datetime_format) - def get_caldera_default_name(self, ability_id: str): + def get_caldera_default_name(self, ability_id: str) -> Optional[str]: """ Returns the default name for this ability based on a db """ + + # TODO: Add a proper database. At least an external file + data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "whoami"} if ability_id not in data: return None return data[ability_id] - def get_caldera_default_description(self, ability_id: str): + def get_caldera_default_description(self, ability_id: str) -> Optional[str]: """ Returns the default description for this ability based on a db """ + # TODO: Add a proper database. At least an external file + data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "Obtain user from current session", "697e8a432031075e47cccba24417013d": "Copy a VBS file to several startup folders", "f39161b2fa5d692ebe3972e0680a8f97": "Copy a BAT file to several startup folders", @@ -71,9 +76,11 @@ class AttackLog(): return data[ability_id] - def get_caldera_default_tactics(self, ability_id: str, ttp: Optional[str]): + def get_caldera_default_tactics(self, ability_id: str, ttp: Optional[str]) -> Optional[str]: """ Returns the default tactics for this ability based on a db """ + # TODO: Add a proper database. At least an external file + data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "System Owner/User Discovery", "f39161b2fa5d692ebe3972e0680a8f97": "Persistence", "16e6823c4656f5cd155051f5f1e5d6ad": "Persistence", @@ -98,9 +105,11 @@ class AttackLog(): return None - def get_caldera_default_tactics_id(self, ability_id: str, ttp: Optional[str]): + def get_caldera_default_tactics_id(self, ability_id: str, ttp: Optional[str]) -> Optional[str]: """ Returns the default name for this ability based on a db """ + # TODO: Add a proper database. At least an external file + data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "T1033", "f39161b2fa5d692ebe3972e0680a8f97": "TA0003", "16e6823c4656f5cd155051f5f1e5d6ad": "TA0003", @@ -125,9 +134,11 @@ class AttackLog(): return None - def get_caldera_default_situation_description(self, ability_id: str): + def get_caldera_default_situation_description(self, ability_id: str) -> Optional[str]: """ Returns the default situation description for this ability based on a db """ + # TODO: Add a proper database. At least an external file + data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": None, "697e8a432031075e47cccba24417013d": None, "f39161b2fa5d692ebe3972e0680a8f97": None, @@ -140,8 +151,13 @@ class AttackLog(): return data[ability_id] - def get_caldera_default_countermeasure(self, ability_id: str): - """ Returns the default countermeasure for this ability based on a db """ + def get_caldera_default_countermeasure(self, ability_id: str) -> Optional[str]: + """ Returns the default countermeasure for this ability based on a db + + :returns: Default countermeasure as string. Or None if not found + """ + + # TODO: Add a proper database. At least an external file data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": None, "697e8a432031075e47cccba24417013d": None, @@ -155,7 +171,7 @@ class AttackLog(): return data[ability_id] - def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs): + def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: Optional[str] = None, **kwargs: dict) -> str: """ Mark the start of a caldera attack :param source: source of the attack. Attack IP @@ -163,6 +179,7 @@ class AttackLog(): :param group: Caldera group of the targets being attacked :param ability_id: Caldera ability id of the attack :param ttp: TTP of the attack (as stated by Caldera internal settings) + :returns: logid """ timestamp = self.__get_timestamp__() @@ -198,7 +215,7 @@ class AttackLog(): # TODO: Add config # TODO: Add results - def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs): + def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs: dict) -> None: """ Mark the end of a caldera attack :param source: source of the attack. Attack IP @@ -230,12 +247,13 @@ class AttackLog(): } self.__add_to_log__(data) - def start_file_write(self, source: str, target: str, file_name: str): + def start_file_write(self, source: str, target: str, file_name: str) -> str: """ Mark the start of a file being written to the target (payload !) :param source: source of the attack. Attack IP (empty if written from controller) :param target: Target machine of the attack :param file_name: Name of the file being written + :returns: logid """ timestamp = self.__get_timestamp__() @@ -254,7 +272,7 @@ class AttackLog(): self.__add_to_log__(data) return logid - def stop_file_write(self, source: str, target: str, file_name: str, **kwargs): + def stop_file_write(self, source: str, target: str, file_name: str, **kwargs: dict) -> None: """ Mark the stop of a file being written to the target (payload !) :param source: source of the attack. Attack IP (empty if written from controller) @@ -278,12 +296,13 @@ class AttackLog(): self.__add_to_log__(data) - def start_execute_payload(self, source: str, target: str, command: str): + def start_execute_payload(self, source: str, target: str, command: str) -> str: """ Mark the start of a payload being executed :param source: source of the attack. Attack IP (empty if written from controller) :param target: Target machine of the attack :param command: + :returns: logid """ timestamp = self.__get_timestamp__() @@ -303,7 +322,7 @@ class AttackLog(): return logid - def stop_execute_payload(self, source: str, target: str, command: str, **kwargs): + def stop_execute_payload(self, source: str, target: str, command: str, **kwargs: dict) -> None: """ Mark the stop of a payload being executed :param source: source of the attack. Attack IP (empty if written from controller) @@ -324,13 +343,14 @@ class AttackLog(): } self.__add_to_log__(data) - def start_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs): + def start_kali_attack(self, source: str, target: str, attack_name: str, ttp: Optional[str] = None, **kwargs: dict) -> str: """ Mark the start of a Kali based attack :param source: source of the attack. Attack IP :param target: Target machine of the attack :param attack_name: Name of the attack. From plugin :param ttp: TTP of the attack. From plugin + :returns: logid """ timestamp = self.__get_timestamp__() @@ -359,11 +379,7 @@ class AttackLog(): return logid - # TODO: Add parameter - # TODO: Add config - # TODO: Add results - - def stop_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs): + def stop_kali_attack(self, source: str, target: str, attack_name: str, ttp: Optional[str] = None, **kwargs: dict) -> None: """ Mark the end of a Kali based attack :param source: source of the attack. Attack IP @@ -385,11 +401,12 @@ class AttackLog(): } self.__add_to_log__(data) - def start_narration(self, text: str): + def start_narration(self, text: str) -> str: """ Add some user defined narration. Can be used in plugins to describe the situation before and after the attack, ... At the moment there is no stop narration command. I do not think we need one. But I want to stick to the structure :param text: Text of the narration + :returns: logid """ timestamp = self.__get_timestamp__() @@ -404,10 +421,11 @@ class AttackLog(): self.__add_to_log__(data) return logid - def start_attack_step(self, text: str): + def start_attack_step(self, text: str) -> str: """ Mark the start of an attack step (several attacks in a chunk) :param text: description of the attack step being started + :returns: logid """ timestamp = self.__get_timestamp__() @@ -425,7 +443,7 @@ class AttackLog(): return logid - def stop_attack_step(self, text: str, **kwargs): + def stop_attack_step(self, text: str, **kwargs: dict) -> None: """ Mark the end of an attack step (several attacks in a chunk) :param text: description of the attack step being stopped @@ -440,13 +458,10 @@ class AttackLog(): } self.__add_to_log__(data) - def start_build(self, **kwargs): + def start_build(self, **kwargs: dict) -> str: """ Mark the start of a tool building/compilation process - :param source: source of the attack. Attack IP - :param target: Target machine of the attack - :param attack_name: Name of the attack. From plugin - :param ttp: TTP of the attack. From plugin + :returns: logid """ timestamp = self.__get_timestamp__() @@ -476,11 +491,7 @@ class AttackLog(): return logid - # TODO: Add parameter - # TODO: Add config - # TODO: Add results - - def stop_build(self, **kwargs): + def stop_build(self, **kwargs: dict) -> None: """ Mark the end of a tool building/compilation process :param source: source of the attack. Attack IP @@ -497,13 +508,14 @@ class AttackLog(): } self.__add_to_log__(data) - def start_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs): + def start_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs: dict) -> str: """ Mark the start of a Metasploit based attack :param source: source of the attack. Attack IP :param target: Target machine of the attack :param metasploit_command: The command to metasploit :param ttp: TTP of the attack. From plugin + :returns: logid """ timestamp = self.__get_timestamp__() @@ -544,7 +556,7 @@ class AttackLog(): return logid - def stop_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs): + def stop_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs: None) -> None: """ Mark the start of a Metasploit based attack :param source: source of the attack. Attack IP @@ -566,13 +578,14 @@ class AttackLog(): } self.__add_to_log__(data) - def start_attack_plugin(self, source: str, target: str, plugin_name: str, ttp: str = None): + def start_attack_plugin(self, source: str, target: str, plugin_name: str, ttp: str = None) -> str: """ Mark the start of an attack plugin :param source: source of the attack. Attack IP :param target: Target machine of the attack :param plugin_name: Name of the plugin :param ttp: TTP of the attack. From plugin + :returns: logid """ timestamp = self.__get_timestamp__() @@ -596,7 +609,7 @@ class AttackLog(): # TODO: Add config # TODO: Add results - def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs): + def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs: dict) -> None: """ Mark the end of an attack plugin :param source: source of the attack. Attack IP @@ -606,6 +619,10 @@ class AttackLog(): :param kwargs: *ttp*, *logid* """ + tag: Optional[str] = None + if kwargs.get("ttp", None) is not None: + tag = str(kwargs.get("ttp", None)) + data = {"timestamp": self.__get_timestamp__(), "event": "stop", "type": "attack", @@ -613,12 +630,12 @@ class AttackLog(): "source": source, "target": target, "plugin_name": plugin_name, - "hunting_tag": __mitre_fix_ttp__(kwargs.get("ttp", None)), + "hunting_tag": __mitre_fix_ttp__(tag), "logid": kwargs.get("logid", None) } self.__add_to_log__(data) - def write_json(self, filename: str): + def write_json(self, filename: str) -> None: """ Write the json data for this log :param filename: Name of the json file @@ -626,7 +643,7 @@ class AttackLog(): with open(filename, "wt") as fh: json.dump(self.get_dict(), fh) - def post_process(self): + def post_process(self) -> None: """ Post process the data before using it """ for entry in self.log: @@ -640,7 +657,7 @@ class AttackLog(): if "result" in entry: replace_entry["result"] = entry["result"] - def get_dict(self): + def get_dict(self) -> dict: """ Return logged data in dict format """ res = {"boilerplate": {"log_format_major_version": 1, # Changes on changes that breaks readers (items are modified or deleted) @@ -652,7 +669,7 @@ class AttackLog(): return res - def add_machine_info(self, machine_info: dict): + def add_machine_info(self, machine_info: dict) -> None: """ Adds a dict with machine info. One machine per call of this method """ self.machines.append(machine_info) @@ -664,7 +681,7 @@ class AttackLog(): # TODO: Return full doc - def vprint(self, text: str, verbosity: int): + def vprint(self, text: str, verbosity: int) -> None: """ verbosity based stdout printing 0: Errors only diff --git a/app/calderaapi_4.py b/app/calderaapi_4.py index 9e3b8c2..c5b4766 100644 --- a/app/calderaapi_4.py +++ b/app/calderaapi_4.py @@ -5,17 +5,18 @@ import json from pprint import pformat -from typing import Optional, Union, Annotated +from typing import Optional, Union, Annotated, Any import requests import simplejson from pydantic.dataclasses import dataclass from pydantic import conlist # pylint: disable=no-name-in-module +from app.attack_log import AttackLog +from app.config import ExperimentConfig # from app.exceptions import CalderaError # from app.interface_sfx import CommandlineColors -# TODO: Ability deserves an own class. # TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)" @dataclass @@ -68,7 +69,7 @@ class Executor: # pylint: disable=missing-class-docstring platform: str command: Optional[str] - def get(self, akey, default=None): + def get(self, akey: str, default: Any = None) -> Any: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ if akey in self.__dict__: return self.__dict__[akey] @@ -95,7 +96,7 @@ class Ability: ability_id: str privilege: Optional[str] = None - def get(self, akey, default=None): + def get(self, akey: str, default: Any = None) -> Any: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ if akey in self.__dict__: return self.__dict__[akey] @@ -108,7 +109,7 @@ class AbilityList: """ A list of exploits """ abilities: Annotated[list, conlist(Ability, min_items=1)] - def get_data(self): + def get_data(self) -> list[Ability]: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.abilities @@ -126,7 +127,7 @@ class ObfuscatorList: """ A list of obfuscators """ obfuscators: Annotated[list, conlist(Obfuscator, min_items=1)] - def get_data(self): + def get_data(self) -> list[Obfuscator]: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.obfuscators @@ -143,7 +144,7 @@ class Adversary: tags: list[str] plugin: Optional[str] = None - def get(self, akey, default=None): + def get(self, akey: str, default: Any = None) -> Any: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ if akey in self.__dict__: return self.__dict__[akey] @@ -156,7 +157,7 @@ class AdversaryList: """ A list of adversary """ adversaries: Annotated[list, conlist(Adversary, min_items=1)] - def get_data(self): + def get_data(self) -> list[Adversary]: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.adversaries @@ -177,7 +178,7 @@ class Fact: # pylint: disable=missing-class-docstring technique_id: Optional[str] = None collected_by: Optional[str] = None - def get(self, akey, default=None): + def get(self, akey: str, default: Any = None) -> Any: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ if akey in self.__dict__: return self.__dict__[akey] @@ -218,7 +219,6 @@ class Link: # pylint: disable=missing-class-docstring score: int used: list[Fact] facts: list[Fact] - agent_reported_time: str id: str # pylint: disable=invalid-name collect: str command: str @@ -226,6 +226,7 @@ class Link: # pylint: disable=missing-class-docstring relationships: list[Relationship] jitter: int deadman: bool + agent_reported_time: Optional[str] = "" @dataclass @@ -262,7 +263,7 @@ class Agent: pending_contact: str privilege: Optional[str] = None # Error, not documented - def get(self, akey, default=None): + def get(self, akey: str, default: Any = None) -> Any: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ if akey in self.__dict__: return self.__dict__[akey] @@ -275,7 +276,7 @@ class AgentList: """ A list of agents """ agents: list[Agent] - def get_data(self): + def get_data(self) -> list[Agent]: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.agents @@ -305,7 +306,7 @@ class Source: # pylint: disable=missing-class-docstring id: str # pylint: disable=invalid-name adjustments: Optional[list[Adjustment]] = None - def get(self, akey, default=None): + def get(self, akey: str, default: Any = None) -> Any: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ if akey in self.__dict__: return self.__dict__[akey] @@ -317,7 +318,7 @@ class Source: # pylint: disable=missing-class-docstring class SourceList: # pylint: disable=missing-class-docstring sources: list[Source] - def get_data(self): + def get_data(self) -> list[Source]: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.sources @@ -342,7 +343,7 @@ class PlannerList: """ A list of planners""" planners: list[Planner] - def get_data(self): + def get_data(self) -> list[Planner]: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.planners @@ -364,7 +365,7 @@ class Objective: # pylint: disable=missing-class-docstring description: str id: str # pylint: disable=invalid-name - def get(self, akey, default=None): + def get(self, akey: str, default: Any = None) -> Any: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ if akey in self.__dict__: return self.__dict__[akey] @@ -393,7 +394,7 @@ class Operation: auto_close: bool chain: Optional[list] = None - def get(self, akey, default=None): + def get(self, akey: str, default: Any = None) -> Any: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ if akey in self.__dict__: return self.__dict__[akey] @@ -406,7 +407,7 @@ class OperationList: """ A list of operations """ operations: Annotated[list, conlist(Operation)] - def get_data(self): + def get_data(self) -> list[Operation]: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.operations @@ -415,7 +416,7 @@ class OperationList: class ObjectiveList: # pylint: disable=missing-class-docstring objectives: Annotated[list, conlist(Objective)] - def get_data(self): + def get_data(self) -> list[Objective]: """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.objectives @@ -423,7 +424,7 @@ class ObjectiveList: # pylint: disable=missing-class-docstring class CalderaAPI(): """ Remote control Caldera through REST api """ - def __init__(self, server: str, attack_logger, config=None, apikey=None): + def __init__(self, server: str, attack_logger: AttackLog, config: ExperimentConfig = None, apikey: str = "ADMIN123") -> None: """ @param server: Caldera server url/ip @@ -440,7 +441,7 @@ class CalderaAPI(): else: self.apikey = apikey - def __contact_server__(self, payload, rest_path: str = "api/v2/abilities", method: str = "get"): + def __contact_server__(self, payload: Optional[dict], rest_path: str = "api/v2/abilities", method: str = "get") -> dict: """ @param payload: payload as dict to send to the server @@ -486,7 +487,7 @@ class CalderaAPI(): return res - def list_abilities(self): + def list_abilities(self) -> list[Ability]: """ Return all ablilities """ payload = None @@ -494,7 +495,7 @@ class CalderaAPI(): abilities = AbilityList(**data) return abilities.get_data() - def list_obfuscators(self): + def list_obfuscators(self) -> list[Obfuscator]: """ Return all obfuscators """ payload = None @@ -502,7 +503,7 @@ class CalderaAPI(): obfuscators = ObfuscatorList(**data) return obfuscators.get_data() - def list_adversaries(self): + def list_adversaries(self) -> list[Adversary]: """ Return all adversaries """ payload = None @@ -510,7 +511,7 @@ class CalderaAPI(): adversaries = AdversaryList(**data) return adversaries.get_data() - def list_sources(self): + def list_sources(self) -> list[Source]: """ Return all sources """ payload = None @@ -518,7 +519,7 @@ class CalderaAPI(): sources = SourceList(**data) return sources.get_data() - def list_planners(self): + def list_planners(self) -> list[Planner]: """ Return all planners """ payload = None @@ -526,7 +527,7 @@ class CalderaAPI(): planners = PlannerList(**data) return planners.get_data() - def list_operations(self): + def list_operations(self) -> list[Operation]: """ Return all operations """ payload = None @@ -534,7 +535,7 @@ class CalderaAPI(): operations = OperationList(**data) return operations.get_data() - def set_operation_state(self, operation_id: str, state: str = "running"): + def set_operation_state(self, operation_id: str, state: str = "running") -> dict: """ Executes an operation on a server @param operation_id: The operation to modify @@ -550,7 +551,7 @@ class CalderaAPI(): payload = {"state": state} return self.__contact_server__(payload, method="patch", rest_path=f"api/v2/operations/{operation_id}") - def list_agents(self): + def list_agents(self) -> list[Agent]: """ Return all agents """ payload = None @@ -558,7 +559,7 @@ class CalderaAPI(): agents = AgentList(**data) return agents.get_data() - def list_objectives(self): + def list_objectives(self) -> list[Objective]: """ Return all objectivs """ payload = None @@ -566,7 +567,7 @@ class CalderaAPI(): objectives = ObjectiveList(**data) return objectives.get_data() - def add_adversary(self, name: str, ability: str, description: str = "created automatically"): + def add_adversary(self, name: str, ability: str, description: str = "created automatically") -> dict: """ Adds a new adversary :param name: Name of the adversary @@ -587,31 +588,34 @@ class CalderaAPI(): # ], "description": description } + # TODO Check this return value data = {"agents": self.__contact_server__(payload, method="post", rest_path="api/v2/adversaries")} # agents = AgentList(**data) return data - def delete_adversary(self, adversary_id: str): + def delete_adversary(self, adversary_id: str) -> dict: """ Deletes an adversary :param adversary_id: The id of this adversary :return: """ payload = None + # TODO Check this return value data = {"agents": self.__contact_server__(payload, method="delete", rest_path=f"api/v2/adversaries/{adversary_id}")} return data - def delete_agent(self, agent_paw: str): + def delete_agent(self, agent_paw: str) -> dict: """ Deletes an agent :param agent_paw: the paw to delete :return: """ payload = None + # TODO Check this return value data = {"agents": self.__contact_server__(payload, method="delete", rest_path=f"api/v2/agents/{agent_paw}")} return data - def kill_agent(self, agent_paw: str): + def kill_agent(self, agent_paw: str) -> dict: """ Kills an agent on the target :param agent_paw: The paw identifying this agent @@ -623,7 +627,7 @@ class CalderaAPI(): data = self.__contact_server__(payload, method="patch", rest_path=f"api/v2/agents/{agent_paw}") return data - def add_operation(self, **kwargs): + def add_operation(self, **kwargs: dict) -> OperationList: """ Adds a new operation :param kwargs: @@ -632,14 +636,18 @@ class CalderaAPI(): # name, adversary_id, source_id = "basic", planner_id = "atomic", group = "", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8' - name: str = kwargs.get("name") - adversary_id: str = kwargs.get("adversary_id") - source_id: str = kwargs.get("source_id", "basic") - planner_id: str = kwargs.get("planner_id", "atomic") - group: str = kwargs.get("group", "") - state: str = kwargs.get("state", "running") - obfuscator: str = kwargs.get("obfuscator", "plain-text") - jitter: str = kwargs.get("jitter", "4/8") + if kwargs.get("adversary_id") is None: + adversary_id = None + else: + adversary_id = str(kwargs.get("adversary_id")) + + name: str = str(kwargs.get("name")) + source_id: str = str(kwargs.get("source_id", "basic")) + planner_id: str = str(kwargs.get("planner_id", "atomic")) + group: str = str(kwargs.get("group", "")) + state: str = str(kwargs.get("state", "running")) + obfuscator: str = str(kwargs.get("obfuscator", "plain-text")) + jitter: str = str(kwargs.get("jitter", "4/8")) payload = {"name": name, "group": group, @@ -657,20 +665,20 @@ class CalderaAPI(): operations = OperationList(**data) return operations - def delete_operation(self, operation_id): + def delete_operation(self, operation_id: str) -> dict: """ Deletes an operation :param operation_id: The Id of the operation to delete :return: """ - payload = {} + payload: dict = {} data = self.__contact_server__(payload, method="delete", rest_path=f"api/v2/operations/{operation_id}") return data - def view_operation_report(self, operation_id): + def view_operation_report(self, operation_id: str) -> dict: """ Views the report of a finished operation :param operation_id: The id of this operation @@ -685,7 +693,7 @@ class CalderaAPI(): return data - def get_ability(self, abid: str): + def get_ability(self, abid: str) -> list[Ability]: """" Return an ability by id @param abid: Ability id @@ -698,12 +706,12 @@ class CalderaAPI(): with open("debug_removeme.txt", "wt") as fh: fh.write(pformat(self.list_abilities())) - for ability in self.list_abilities()["abilities"]: + for ability in self.list_abilities(): if ability.get("ability_id", None) == abid or ability.get("auto_generated_guid", None) == abid: res.append(ability) return res - def pretty_print_ability(self, abi): + def pretty_print_ability(self, abi: dict) -> None: """ Pretty pritns an ability @param abi: A ability dict diff --git a/app/doc_generator.py b/app/doc_generator.py index 9f4a9d0..649f089 100644 --- a/app/doc_generator.py +++ b/app/doc_generator.py @@ -4,16 +4,18 @@ import json import os +from typing import Optional + from jinja2 import Environment, FileSystemLoader, select_autoescape class DocGenerator(): """ Generates human readable docs from attack logs """ - def __init__(self): - self.outfile = None + def __init__(self) -> None: + self.outfile: Optional[str] = None - def generate(self, jfile, outfile="tools/human_readable_documentation/source/contents.rst"): + def generate(self, jfile: str, outfile: str = "tools/human_readable_documentation/source/contents.rst") -> None: """ Generates human readable documentation out of a template. @param jfile: json attack log created by PurpleDome as data source @@ -39,12 +41,12 @@ class DocGenerator(): with open(outfile, "wt") as fh: fh.write(rendered) - def compile_documentation(self): + def compile_documentation(self) -> None: """ Compiles the documentation using make """ os.system("cd tools/human_readable_documentation ; make html; make latexpdf ") - def get_outfile_paths(self): + def get_outfile_paths(self) -> list[str]: """ Returns the path of the output file written """ return ["tools/human_readable_documentation/build/latex/purpledomesimulation.pdf"] diff --git a/app/experimentcontrol.py b/app/experimentcontrol.py index baa21b8..ad03c7d 100644 --- a/app/experimentcontrol.py +++ b/app/experimentcontrol.py @@ -13,7 +13,7 @@ from typing import Optional from app.attack_log import AttackLog from app.config import ExperimentConfig from app.interface_sfx import CommandlineColors -from app.exceptions import ServerError, CalderaError, MachineError +from app.exceptions import ServerError, CalderaError, MachineError, PluginError from app.pluginmanager import PluginManager from app.doc_generator import DocGenerator from app.calderacontrol import CalderaControl @@ -26,7 +26,7 @@ from plugins.base.attack import AttackPlugin class Experiment(): """ Class handling experiments """ - def __init__(self, configfile: str, verbosity=0): + def __init__(self, configfile: str, verbosity: int = 0) -> None: """ :param configfile: Path to the configfile to load @@ -43,7 +43,7 @@ class Experiment(): self.attack_logger = AttackLog(verbosity) self.plugin_manager = PluginManager(self.attack_logger) - def run(self, caldera_attacks: list = None): + def run(self, caldera_attacks: list = None) -> None: """ Run the experiment @@ -118,7 +118,7 @@ class Experiment(): zip_this += document_generator.get_outfile_paths() self.zip_loot(zip_this) - def run_plugin_attacks(self): + def run_plugin_attacks(self) -> None: """ Run plugin based attacks """ @@ -137,7 +137,7 @@ class Experiment(): time.sleep(self.experiment_config.get_nap_time()) self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1) - def run_caldera_attacks(self, caldera_attacks: Optional[list[str]] = None): + def run_caldera_attacks(self, caldera_attacks: Optional[list[str]] = None) -> None: """ Run caldera based attacks @@ -203,17 +203,21 @@ class Experiment(): # End of fix self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1) - def add_running_machines_to_log(self): + def add_running_machines_to_log(self) -> None: """ Add machine infos for targets and attacker to the log """ for target in self.targets: + if target is None: + raise MachineError("Target machine configured to None or whatever happened") i = target.get_machine_info() i["role"] = "target" self.attack_logger.add_machine_info(i) + if self.attacker_1 is None: + raise MachineError("Attacker machine gone") i = self.attacker_1.get_machine_info() i["role"] = "attacker" self.attack_logger.add_machine_info(i) - def wait_until_all_targets_have_caldera_implants(self, caldera_url: str, caldera_attacks: Optional[list[str]] = None): + def wait_until_all_targets_have_caldera_implants(self, caldera_url: str, caldera_attacks: Optional[list[str]] = None) -> None: """ :param caldera_attacks: a list of command line defined caldera attacks @@ -236,7 +240,7 @@ class Experiment(): time.sleep(120) # Was 30, but maybe there are timing issues running_agents = self.caldera_control.list_paws_of_running_agents() - def first_start_of_caldera_implants(self, caldera_attacks: Optional[list[str]] = None): + def first_start_of_caldera_implants(self, caldera_attacks: Optional[list[str]] = None) -> None: """ Start caldera implant on the targets :param caldera_attacks: a list of command line defined caldera attacks @@ -253,7 +257,7 @@ class Experiment(): time.sleep(20) # Wait for all the clients to contact the caldera server # TODO: Smarter wait - def install_sensor_plugins(self): + def install_sensor_plugins(self) -> None: """ Installs sensor plugins on the targets """ @@ -262,7 +266,7 @@ class Experiment(): a_target.install_sensors() a_target.start_sensors() - def install_vulnerabilities(self): + def install_vulnerabilities(self) -> None: """ Install vulnerabilities on the targets """ @@ -271,7 +275,7 @@ class Experiment(): a_target.install_vulnerabilities() a_target.start_vulnerabilities() - def start_target_machines(self, caldera_attacks: Optional[list[str]] = None): + def start_target_machines(self, caldera_attacks: Optional[list[str]] = None) -> None: """ Start target machines :param caldera_attacks: Caldera attacks as defined on the command line @@ -309,7 +313,7 @@ class Experiment(): self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1) self.targets.append(target_1) - def machine_needs_caldera(self, target, caldera_from_cmdline: Optional[list[str]] = None) -> int: + def machine_needs_caldera(self, target: Machine, caldera_from_cmdline: Optional[list[str]] = None) -> int: """ Counts the attacks and plugins needing caldera that are registered for this machine :param target: Target machine we will check the config file for assigned caldera attacks for @@ -328,7 +332,7 @@ class Experiment(): return c_cmdline + c_conffile + c_plugins - def attack(self, target, attack): + def attack(self, target: Machine, attack: str) -> None: """ Pick an attack and run it :param attack: Name of the attack to run @@ -338,20 +342,24 @@ class Experiment(): for plugin in self.plugin_manager.get_plugins(AttackPlugin, [attack]): name = plugin.get_name() + if isinstance(plugin, AttackPlugin): + self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Attack plugin {name}{CommandlineColors.ENDC}", 2) + plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name())) + plugin.set_attacker_machine(self.attacker_1) + plugin.set_sysconf({}) + plugin.set_logger(self.attack_logger) + if self.caldera_control is None: + raise CalderaError("Caldera control not initialised") + plugin.set_caldera(self.caldera_control) + plugin.connect_metasploit() + plugin.install() + + # plugin.__set_logger__(self.attack_logger) + plugin.__execute__([target]) + else: + raise PluginError("AttackPlugin is not really an AttackPlugin type") - self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Attack plugin {name}{CommandlineColors.ENDC}", 2) - plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name())) - plugin.set_attacker_machine(self.attacker_1) - plugin.set_sysconf({}) - plugin.set_logger(self.attack_logger) - plugin.set_caldera(self.caldera_control) - plugin.connect_metasploit() - plugin.install() - - # plugin.__set_logger__(self.attack_logger) - plugin.__execute__([target]) - - def zip_loot(self, zip_this: list[str]): + def zip_loot(self, zip_this: list[str]) -> None: """ Zip the loot together :param zip_this: A list of file paths to add to the zip file @@ -373,7 +381,7 @@ class Experiment(): default_name = os.path.join(self.loot_dir, "..", "most_recent.zip") shutil.copyfile(filename, default_name) - def __start_attacker(self): + def __start_attacker(self) -> None: """ Start the attacking VM """ # Preparing attacker @@ -401,6 +409,8 @@ class Experiment(): raise ServerError # self.attacker_1.set_attack_logger(self.attack_logger) - def __stop_attacker(self): + def __stop_attacker(self) -> None: """ Stop the attacking VM """ + if self.attacker_1 is None: + raise MachineError("Attacker machine not initialised") self.attacker_1.halt()