Extended documentation

pull/33/head
Thorsten Sick 2 years ago
parent ed20a28128
commit b2fc4c81d4

@ -27,7 +27,7 @@ class AttackLog():
def __init__(self, verbosity: int = 0):
"""
@param verbosity: verbosity setting from 0 to 3 for stdout printing
:param verbosity: verbosity setting from 0 to 3 for stdout printing
"""
self.log: list[dict] = []
self.machines: list[dict] = []
@ -38,7 +38,7 @@ class AttackLog():
def __add_to_log__(self, item: dict):
""" internal command to add a item to the log
@param item: data chunk to add
:param item: data chunk to add
"""
self.log.append(item)
@ -158,11 +158,11 @@ class AttackLog():
def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs):
""" Mark the start of a caldera attack
@param source: source of the attack. Attack IP
@param paw: Caldera paw of the targets being attacked
@param group: Caldera group of the targets being attacked
@param ability_id: Caldera ability id of the attack
@param ttp: TTP of the attack (as stated by Caldera internal settings)
:param source: source of the attack. Attack IP
:param paw: Caldera paw of the targets being attacked
:param group: Caldera group of the targets being attacked
:param ability_id: Caldera ability id of the attack
:param ttp: TTP of the attack (as stated by Caldera internal settings)
"""
timestamp = self.__get_timestamp__()
@ -201,15 +201,15 @@ class AttackLog():
def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs):
""" Mark the end of a caldera attack
@param source: source of the attack. Attack IP
@param paw: Caldera oaw of the targets being attacked
@param group: Caldera group of the targets being attacked
@param ability_id: Caldera ability id of the attack
@param ttp: TTP of the attack (as stated by Caldera internal settings)
@param name: Name of the attack. Data source is Caldera internal settings
@param description: Descirption of the attack. Caldera is the source
@param obfuscator: C&C obfuscator being used
@param jitter: Jitter being used
:param source: source of the attack. Attack IP
:param paw: Caldera oaw of the targets being attacked
:param group: Caldera group of the targets being attacked
:param ability_id: Caldera ability id of the attack
:param ttp: TTP of the attack (as stated by Caldera internal settings)
:param name: Name of the attack. Data source is Caldera internal settings
:param description: Descirption of the attack. Caldera is the source
:param obfuscator: C&C obfuscator being used
:param jitter: Jitter being used
"""
data = {"timestamp": self.__get_timestamp__(),
@ -233,9 +233,9 @@ class AttackLog():
def start_file_write(self, source: str, target: str, file_name: str):
""" Mark the start of a file being written to the target (payload !)
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param file_name: Name of the file being written
:param source: source of the attack. Attack IP (empty if written from controller)
:param target: Target machine of the attack
:param file_name: Name of the file being written
"""
timestamp = self.__get_timestamp__()
@ -257,11 +257,11 @@ class AttackLog():
def stop_file_write(self, source: str, target: str, file_name: str, **kwargs):
""" Mark the stop of a file being written to the target (payload !)
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param attack_name: Name of the attack. From plugin
@param file_name: Name of the file being written
@param logid: logid of the corresponding start command
:param source: source of the attack. Attack IP (empty if written from controller)
:param target: Target machine of the attack
:param attack_name: Name of the attack. From plugin
:param file_name: Name of the file being written
:param logid: logid of the corresponding start command
kwargs: logid to link to start_file_write
"""
@ -281,9 +281,9 @@ class AttackLog():
def start_execute_payload(self, source: str, target: str, command: str):
""" Mark the start of a payload being executed
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param command:
:param source: source of the attack. Attack IP (empty if written from controller)
:param target: Target machine of the attack
:param command:
"""
timestamp = self.__get_timestamp__()
@ -306,11 +306,11 @@ class AttackLog():
def stop_execute_payload(self, source: str, target: str, command: str, **kwargs):
""" Mark the stop of a payload being executed
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param command: Name of the attack. From plugin
@param file_name: Name of the file being written
@param kwargs: logid to link to start_file_write
:param source: source of the attack. Attack IP (empty if written from controller)
:param target: Target machine of the attack
:param command: Name of the attack. From plugin
:param file_name: Name of the file being written
:param kwargs: logid to link to start_file_write
"""
data = {"timestamp": self.__get_timestamp__(),
@ -327,10 +327,10 @@ class AttackLog():
def start_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs):
""" Mark the start of a Kali based attack
@param source: source of the attack. Attack IP
@param target: Target machine of the attack
@param attack_name: Name of the attack. From plugin
@param ttp: TTP of the attack. From plugin
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param attack_name: Name of the attack. From plugin
:param ttp: TTP of the attack. From plugin
"""
timestamp = self.__get_timestamp__()
@ -366,10 +366,10 @@ class AttackLog():
def stop_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs):
""" Mark the end of a Kali based attack
@param source: source of the attack. Attack IP
@param target: Target machine of the attack
@param attack_name: Name of the attack. From plugin
@param ttp: TTP of the attack. From plugin
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param attack_name: Name of the attack. From plugin
:param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": self.__get_timestamp__(),
@ -389,7 +389,7 @@ class AttackLog():
""" Add some user defined narration. Can be used in plugins to describe the situation before and after the attack, ...
At the moment there is no stop narration command. I do not think we need one. But I want to stick to the structure
@param text: Text of the narration
:param text: Text of the narration
"""
timestamp = self.__get_timestamp__()
@ -407,7 +407,7 @@ class AttackLog():
def start_attack_step(self, text: str):
""" Mark the start of an attack step (several attacks in a chunk)
@param text: description of the attack step being started
:param text: description of the attack step being started
"""
timestamp = self.__get_timestamp__()
@ -428,7 +428,7 @@ class AttackLog():
def stop_attack_step(self, text: str, **kwargs):
""" Mark the end of an attack step (several attacks in a chunk)
@param text: description of the attack step being stopped
:param text: description of the attack step being stopped
"""
data = {"timestamp": self.__get_timestamp__(),
@ -443,10 +443,10 @@ class AttackLog():
def start_build(self, **kwargs):
""" Mark the start of a tool building/compilation process
@param source: source of the attack. Attack IP
@param target: Target machine of the attack
@param attack_name: Name of the attack. From plugin
@param ttp: TTP of the attack. From plugin
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param attack_name: Name of the attack. From plugin
:param ttp: TTP of the attack. From plugin
"""
timestamp = self.__get_timestamp__()
@ -483,10 +483,10 @@ class AttackLog():
def stop_build(self, **kwargs):
""" Mark the end of a tool building/compilation process
@param source: source of the attack. Attack IP
@param target: Target machine of the attack
@param attack_name: Name of the attack. From plugin
@param ttp: TTP of the attack. From plugin
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param attack_name: Name of the attack. From plugin
:param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": self.__get_timestamp__(),
@ -500,10 +500,10 @@ class AttackLog():
def start_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs):
""" Mark the start of a Metasploit based attack
@param source: source of the attack. Attack IP
@param target: Target machine of the attack
@param metasploit_command: The command to metasploit
@param ttp: TTP of the attack. From plugin
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param metasploit_command: The command to metasploit
:param ttp: TTP of the attack. From plugin
"""
timestamp = self.__get_timestamp__()
@ -538,10 +538,10 @@ class AttackLog():
def stop_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs):
""" Mark the start of a Metasploit based attack
@param source: source of the attack. Attack IP
@param target: Target machine of the attack
@param metasploit_command: The command to metasploit
@param ttp: TTP of the attack. From plugin
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param metasploit_command: The command to metasploit
:param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": self.__get_timestamp__(),
@ -560,10 +560,10 @@ class AttackLog():
def start_attack_plugin(self, source: str, target: str, plugin_name: str, ttp: str = None):
""" Mark the start of an attack plugin
@param source: source of the attack. Attack IP
@param target: Target machine of the attack
@param plugin_name: Name of the plugin
@param ttp: TTP of the attack. From plugin
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param plugin_name: Name of the plugin
:param ttp: TTP of the attack. From plugin
"""
timestamp = self.__get_timestamp__()
@ -590,11 +590,11 @@ class AttackLog():
def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs):
""" Mark the end of an attack plugin
@param source: source of the attack. Attack IP
@param target: Target machine of the attack
@param plugin_name: Name of the plugin
@param logid: logid of the corresponding start command
@param kwargs: *ttp*, *logid*
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param plugin_name: Name of the plugin
:param logid: logid of the corresponding start command
:param kwargs: *ttp*, *logid*
"""
data = {"timestamp": self.__get_timestamp__(),
@ -612,7 +612,7 @@ class AttackLog():
def write_json(self, filename: str):
""" Write the json data for this log
@param filename: Name of the json file
:param filename: Name of the json file
"""
with open(filename, "wt") as fh:
json.dump(self.get_dict(), fh)
@ -663,8 +663,8 @@ class AttackLog():
2: Detailed progress information
3: Debug logs, data dumps, everything
@param text: The text to print
@param verbosity: the verbosity level the text has.
:param text: The text to print
:param verbosity: the verbosity level the text has.
"""
if verbosity <= self.verbosity:

@ -22,13 +22,14 @@ from app.calderaapi_4 import CalderaAPI
class CalderaControl(CalderaAPI):
""" Remote control Caldera through REST api """
def fetch_client(self, platform: str = "windows", file: str = "sandcat.go", target_dir: str = ".", extension: str = ""):
def fetch_client(self, platform: str = "windows", file: str = "sandcat.go", target_dir: str = ".", extension: str = "") -> str:
""" Downloads the appropriate Caldera client
@param platform: Platform to download the agent for
@param file: file to download from caldera. This defines the agent type
@param target_dir: directory to drop the new file into
@param extension: File extension to add to the downloaded file
:param platform: Platform to download the agent for
:param file: file to download from caldera. This defines the agent type
:param target_dir: directory to drop the new file into
:param extension: File extension to add to the downloaded file
:returns: filename of the client
"""
header = {"platform": platform,
"file": file}
@ -40,18 +41,23 @@ class CalderaControl(CalderaAPI):
# print(r.headers)
return filename
def list_sources_for_name(self, name: str):
""" List facts in a source pool with a specific name """
def list_sources_for_name(self, name: str) -> Optional[dict]:
""" List facts in a source pool with a specific name
:param name: The name of the source pool
:returns: The source data or None
"""
for i in self.list_sources():
if i.get("name") == name:
return i
return None
def list_facts_for_name(self, name: str):
def list_facts_for_name(self, name: str) -> dict:
""" Pretty format for facts
@param name: Name of the source ot look into
:param name: Name of the source ot look into
:returns: A pretty dict with facts
"""
source = self.list_sources_for_name(name)
@ -67,16 +73,20 @@ class CalderaControl(CalderaAPI):
}
return res
def list_paws_of_running_agents(self):
""" Returns a list of all paws of running agents """
def list_paws_of_running_agents(self) -> list[str]:
""" Returns a list of all paws of running agents
:returns: A list of running agents (or better: the list of their paws)
"""
return [i.get("paw") for i in self.list_agents()] # 2.8.1 version
# return [i.paw for i in self.list_agents()] # 4* version
# ######### Get one specific item
def get_operation(self, name: str):
def get_operation(self, name: str) -> Optional[dict]:
""" Gets an operation by name
@param name: Name of the operation to look for
:param name: Name of the operation to look for
:returns: The operation as dict
"""
for operation in self.list_operations():
@ -84,30 +94,33 @@ class CalderaControl(CalderaAPI):
return operation
return None
def get_adversary(self, name: str):
def get_adversary(self, name: str) -> Optional[dict]:
""" Gets a specific adversary by name
@param name: Name to look for
:param name: Name to look for
:returns: The adversary as dict
"""
for adversary in self.list_adversaries():
if adversary.get("name") == name:
return adversary
return None
def get_objective(self, name: str):
def get_objective(self, name: str) -> Optional[dict]:
""" Returns an objective with a given name
@param name: Name to filter for
:param name: Name to filter for
:returns: The objective as dict
"""
for objective in self.list_objectives():
if objective.get("name") == name:
return objective
return None
def get_ability(self, abid: str):
"""" Return an ability by id
def get_ability(self, abid: str) -> list[dict]:
""" Return an ability by id
@param abid: Ability id
:param abid: Ability id
:returns: a list of abilities
"""
res = []
@ -125,8 +138,9 @@ class CalderaControl(CalderaAPI):
def does_ability_support_platform(self, abid: str, platform: str) -> bool:
""" Checks if an ability supports a specific os
@param abid: ability id.
@param platform: os string to match for
:param abid: ability id.
:param platform: os string to match for
:returns: True if platform is supported
"""
# caldera knows the os-es "windows", "linux" and "darwin"
@ -148,10 +162,11 @@ class CalderaControl(CalderaAPI):
print(self.get_ability(abid))
return False
def get_operation_by_id(self, op_id: str):
def get_operation_by_id(self, op_id: str) -> list[dict]:
""" Get operation by id
@param op_id: Operation id
:param op_id: Operation id
:returns: a list of operations matching the id
"""
operations = self.list_operations()
@ -161,12 +176,13 @@ class CalderaControl(CalderaAPI):
return [an_operation]
return []
def get_linkid(self, op_id: str, paw: str, ability_id: str):
def get_linkid(self, op_id: str, paw: str, ability_id: str) -> Optional[dict]:
""" Get the id of a link identified by paw and ability_id
@param op_id: Operation id
@param paw: Paw of the agent
@param ability_id: Ability id to filter for
:param op_id: Operation id
:param paw: Paw of the agent
:param ability_id: Ability id to filter for
:returns: The ability
"""
operation = self.get_operation_by_id(op_id)
@ -181,12 +197,13 @@ class CalderaControl(CalderaAPI):
# ######### View
def view_operation_output(self, opid: str, paw: str, ability_id: str):
def view_operation_output(self, opid: str, paw: str, ability_id: str) -> Optional[dict]:
""" Gets the output of an executed ability
@param opid: Id of the operation to look for
@param paw: Paw of the agent to look up
@param ability_id: if of the ability to extract the output from
:param opid: Id of the operation to look for
:param paw: Paw of the agent to look up
:param ability_id: if of the ability to extract the output from
:returns: The output
"""
orep = self.view_operation_report(opid)
@ -228,11 +245,12 @@ class CalderaControl(CalderaAPI):
# Link, chain and stuff
def is_operation_finished(self, opid: str, debug: bool = False):
def is_operation_finished(self, opid: str, debug: bool = False) -> bool:
""" Checks if an operation finished - finished is not necessary successful !
@param opid: Operation id to check
@param debug: Additional debug output
:param opid: Operation id to check
:param debug: Additional debug output
:returns: True if the operation is finished
"""
# An operation can run several Abilities vs several targets (agents). Each one is a link in the chain (see opperation report).
# Those links can have the states:
@ -260,12 +278,13 @@ class CalderaControl(CalderaAPI):
return False
def is_operation_finished_multi(self, opid: str):
def is_operation_finished_multi(self, opid: str) -> bool:
""" Checks if an operation finished - finished is not necessary successful ! On several targets.
All links (~ abilities) on all targets must have the status 0 for this to be True.
@param opid: Operation id to check
:param opid: Operation id to check
:returns: True if the operation is finished
"""
# An operation can run several Abilities vs several targets (agents). Each one is a link in the chain (see opperation report).
# Those links can have the states:
@ -291,16 +310,16 @@ class CalderaControl(CalderaAPI):
# ######## All inclusive methods
def attack(self, paw: str = "kickme", ability_id: str = "bd527b63-9f9e-46e0-9816-b8434d2b8989",
group: str = "red", target_platform: Optional[str] = None, parameters: Optional[str] = None, **kwargs):
group: str = "red", target_platform: Optional[str] = None, parameters: Optional[str] = None, **kwargs) -> bool:
""" Attacks a system and returns results
@param paw: Paw to attack
@param group: Group to attack. Paw must be in the group
@param ability_id: Ability to run against the target
@param target_platform: Platform of the target machine. Optional. Used for quick-outs
@param parameters: Dict containing key-values of parameters to pass to the ability
:param paw: Paw to attack
:param group: Group to attack. Paw must be in the group
:param ability_id: Ability to run against the target
:param target_platform: Platform of the target machine. Optional. Used for quick-outs
:param parameters: Dict containing key-values of parameters to pass to the ability
@:return : True if the attack was executed. False if it was not. For example the target os is not supported by this attack
:returns: True if the attack was executed. False if it was not. For example the target os is not supported by this attack
"""
# Tested obfuscators (with sandcat):
@ -417,7 +436,7 @@ class CalderaControl(CalderaAPI):
def pretty_print_ability(self, abi):
""" Pretty pritns an ability
@param abi: A ability dict
:param abi: A ability dict
"""
print("""

@ -22,7 +22,7 @@ class MachineConfig():
def __init__(self, machinedata):
""" Init machine control config
@param machinedata: dict containing machine data
:param machinedata: dict containing machine data
"""
if machinedata is None:
raise ConfigurationError
@ -148,7 +148,7 @@ class ExperimentConfig():
def __init__(self, configfile: str):
""" Init the config, process the file
@param configfile: The configuration file to process
:param configfile: The configuration file to process
"""
self.raw_config: MainConfig = None
@ -162,7 +162,7 @@ class ExperimentConfig():
def load(self, configfile: str):
""" Loads the configuration file
@param configfile: The configuration file to process
:param configfile: The configuration file to process
"""
with open(configfile) as fh:
@ -199,7 +199,7 @@ class ExperimentConfig():
def attacker(self, mid: int) -> MachineConfig:
""" Return config for attacker as MachineConfig objects
@param mid: id of the attacker, 0 is main attacker
:param mid: id of the attacker, 0 is main attacker
"""
return self.attackers()[mid]
@ -227,7 +227,7 @@ class ExperimentConfig():
def attack_conf(self, attack: str) -> dict:
""" Get kali config for a specific kali attack
@param attack: Name of the attack to look up config for
:param attack: Name of the attack to look up config for
"""
if self.raw_config is None:
@ -267,7 +267,7 @@ class ExperimentConfig():
def get_plugin_based_attacks(self, for_os: str) -> list[str]:
""" Get the configured kali attacks to run for a specific OS
@param for_os: The os to query the registered attacks for
:param for_os: The os to query the registered attacks for
"""
if self.raw_config is None:
@ -285,7 +285,7 @@ class ExperimentConfig():
def get_caldera_attacks(self, for_os: str) -> list:
""" Get the configured caldera attacks to run for a specific OS
@param for_os: The os to query the registered attacks for
:param for_os: The os to query the registered attacks for
"""
if self.raw_config is None:
@ -314,7 +314,7 @@ class ExperimentConfig():
def get_sensor_config(self, name: str) -> dict:
""" Return the config for a specific sensor
@param name: name of the sensor
:param name: name of the sensor
"""
if self.raw_config is None:

@ -26,12 +26,12 @@ from plugins.base.attack import AttackPlugin
class Experiment():
""" Class handling experiments """
def __init__(self, configfile, verbosity=0, caldera_attacks: list = None):
def __init__(self, configfile: str, verbosity=0, caldera_attacks: list = None):
"""
@param configfile: Path to the configfile to load
@param verbosity: verbosity level between 0 and 3
@param caldera_attacks: an optional argument to override caldera attacks in the config file and run just this one caldera attack. A list of caldera ID
:param configfile: Path to the configfile to load
:param verbosity: verbosity level between 0 and 3
:param caldera_attacks: an optional argument to override caldera attacks in the config file and run just this one caldera attack. A list of caldera ID
"""
self.attacker_1: Optional[Machine] = None
@ -244,9 +244,9 @@ class Experiment():
def attack(self, target, attack):
""" Pick an attack and run it
@param attack: Name of the attack to run
@param target: IP address of the target
@returns: The output of the cmdline attacking tool
:param attack: Name of the attack to run
:param target: IP address of the target
:returns: The output of the cmdline attacking tool
"""
for plugin in self.plugin_manager.get_plugins(AttackPlugin, [attack]):
@ -283,43 +283,6 @@ class Experiment():
defaultname = os.path.join(self.lootdir, "..", "most_recent.zip")
shutil.copyfile(filename, defaultname)
# @staticmethod
# def __get_results_files(root):
# """ Yields a list of potential result files
# @param root: Root dir of the machine to collect data from
# """
# # TODO: Properly implement. Get proper root parameter
# total = [os.path.join(root, "logstash", "filebeat.json")]
# for a_file in total:
# if os.path.exists(a_file):
# yield a_file
# def __clean_result_files(self, root):
# """ Deletes result files
# @param root: Root dir of the machine to collect data from
# """
# TODO: Properly implement. Get proper root parameter
# for a_file in self.__get_results_files(root):
# os.remove(a_file)
# def __collect_loot(self, root):
# """ Collect results into loot dir
# @param root: Root dir of the machine to collect data from
# """
# try:
# os.makedirs(os.path.abspath(self.experiment_config.loot_dir()))
# except FileExistsError:
# pass
# for a_file in self.__get_results_files(root):
# self.attack_logger.vprint("Copy {} {}".format(a_file, os.path.abspath(self.experiment_config.loot_dir())), 3)
def __start_attacker(self):
""" Start the attacking VM """

@ -24,9 +24,9 @@ class Machine():
def __init__(self, config, attack_logger, calderakey="ADMIN123",):
"""
@param config: The machine configuration as dict
@param attack_logger: The attack logger to use
@param calderakey: Key to the caldera controller
:param config: The machine configuration as dict
:param attack_logger: The attack logger to use
:param calderakey: Key to the caldera controller
"""
self.vm_manager = None
@ -94,7 +94,7 @@ class Machine():
def create(self, reboot=True):
""" Create a VM
@param reboot: Reboot the VM during installation. Required if you want to install software
:param reboot: Reboot the VM during installation. Required if you want to install software
"""
self.vm_manager.__call_create__(reboot)
@ -143,8 +143,8 @@ class Machine():
def remote_run(self, cmd, disown=False):
""" Simplifies connect and run
@param cmd: Command to run as shell command
@param disown: run in background
:param cmd: Command to run as shell command
:param disown: run in background
"""
return self.vm_manager.__call_remote_run__(cmd, disown)
@ -247,7 +247,7 @@ class Machine():
A machine can have several sensors running. Those are defined in a list in the config. This collects the data from the sensors
@param lootdir: Fresh created directory for loot
:param lootdir: Fresh created directory for loot
"""
machine_specific_path = os.path.join(lootdir, self.config.vmname())
@ -336,39 +336,39 @@ class Machine():
############
def get_ip(self):
def get_ip(self) -> str:
""" Returns the IP of the main ethernet interface of this machine """
# TODO: Find a smarter way to get the ip
return self.vm_manager.get_ip()
def get_name(self):
def get_name(self) -> str:
""" Returns the machine name """
return self.config.vmname()
def get_nicknames(self):
def get_nicknames(self) -> list[str]:
""" Returns the machine name """
return self.config.get_nicknames()
def get_playground(self):
def get_playground(self) -> str:
""" Return this machine's playground """
return self.vm_manager.get_playground()
def get_machine_path_external(self):
def get_machine_path_external(self) -> str:
""" Returns the external path for this machine """
return self.vm_manager.get_machine_path_external()
def put(self, src, dst):
def put(self, src: str, dst: str):
""" Send a file to the machine """
return self.vm_manager.put(src, dst)
def get(self, src, dst):
def get(self, src: str, dst: str):
""" Get a file from a machine """
return self.vm_manager.get(src, dst)
@ -390,12 +390,12 @@ class Machine():
# TODO: Caldera implant
# TODO: Metasploit implant
# options for version: 4.0.0-alpha.2 2.8.1
# options for version: "4.0.0-alpha.2" and "2.8.1"
def install_caldera_server(self, cleanup=False, version="4.0.0-alpha.2"):
""" Installs the caldera server on the VM
@param cleanup: Remove the old caldera version. Slow but reduces side effects
@param version: Caldera version to use. Check Caldera git for potential branches to use
:param cleanup: Remove the old caldera version. Slow but reduces side effects
:param version: Caldera version to use. Check Caldera git for potential branches to use
"""
# https://github.com/mitre/caldera.git
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Installing Caldera server {CommandlineColors.ENDC}", 1)
@ -413,7 +413,7 @@ class Machine():
def wait_for_caldera_server(self, timeout=6):
""" Ping caldera server. return as soon as it is responding
@param timeout: timeout in seconds
:param timeout: timeout in seconds
"""
for i in range(timeout):
time.sleep(10)
@ -575,7 +575,7 @@ START {playground}{filename} -server {url} -group {self.config.caldera_group()}
def set_attack_logger(self, attack_logger):
""" Configure the attack logger for this server
@param attack_logger: The attack logger to set
:param attack_logger: The attack logger to set
"""
self.attack_logger = attack_logger

@ -46,10 +46,10 @@ class Metasploit():
def start_exploit_stub_for_external_payload(self, payload='linux/x64/meterpreter_reverse_tcp', exploit='exploit/multi/handler', lhost=None):
""" Start a metasploit handler and wait for external payload to connect
@param payload: The payload being used in the implant
@param exploit: Normally the generic handler. Overwrite it if you feel lucky
@param lhost: the ip of the attack host. Use this to use the attacker ip as seen from the controller.
@:returns: res, which contains "job_id" and "uuid"
:param payload: The payload being used in the implant
:param exploit: Normally the generic handler. Overwrite it if you feel lucky
:param lhost: the ip of the attack host. Use this to use the attacker ip as seen from the controller.
:returns: res, which contains "job_id" and "uuid"
"""
exp = self.get_client().modules.use('exploit', exploit)
# print(exploit.description)
@ -118,7 +118,7 @@ class Metasploit():
def get_sid(self, session_number=0):
""" Get the first session between hacked target and the metasploit server
@param session_number: number of the session to get
:param session_number: number of the session to get
"""
self.wait_for_session()
@ -128,7 +128,7 @@ class Metasploit():
def get_sid_to(self, target):
""" Get the session to a specified target
@param target: a target machine to find in the session list
:param target: a target machine to find in the session list
"""
print(f"Sessions: {self.get_client().sessions.list}")
@ -156,10 +156,10 @@ class Metasploit():
def meterpreter_execute(self, cmds: list[str], session_number: int, delay=0) -> list[str]:
""" Executes commands on the meterpreter, returns results read from shell
@param cmds: commands to execute, a list
@param session_number: session number
@param delay: optional delay between calling the command and expecting a result
@:return: the string results
:param cmds: commands to execute, a list
:param session_number: session number
:param delay: optional delay between calling the command and expecting a result
:return: the string results
"""
shell = self.client.sessions.session(self.get_sid(session_number))
@ -173,10 +173,10 @@ class Metasploit():
def meterpreter_execute_on(self, cmds: list[str], target, delay=0) -> list[str]:
""" Executes commands on the meterpreter, returns results read from shell
@param cmds: commands to execute, a list
@param target: target machine
@param delay: optional delay between calling the command and expecting a result
@:return: the string results
:param cmds: commands to execute, a list
:param target: target machine
:param delay: optional delay between calling the command and expecting a result
:return: the string results
"""
session_id = self.get_sid_to(target)
@ -198,7 +198,10 @@ class Metasploit():
return res
def smart_infect(self, target, **kwargs):
""" Checks if a target already has a meterpreter session open. Will deploy a payload if not """
""" Checks if a target already has a meterpreter session open. Will deploy a payload if not.
:param target: Infect the target
"""
# TODO Smart_infect should detect the platform of the target and pick the proper parameters based on that
@ -310,7 +313,6 @@ class MSFVenom():
def generate_and_deploy(self, **kwargs):
""" Will generate the payload and directly deploy it to the target
:return:
"""
self.generate_payload(**kwargs)
@ -362,8 +364,9 @@ class MetasploitInstant(Metasploit):
"""
def parse_ps(self, ps_output):
def parse_ps(self, ps_output) -> list[dict]:
""" Parses the data from ps
:param ps_output: Metasploit ps output
:return: A list of dicts
"""
@ -398,9 +401,9 @@ class MetasploitInstant(Metasploit):
def filter_ps_results(self, data, user=None, name=None, arch=None):
""" Filter the process lists for certain
@param user: The user to filter for.
@param name: The process name to filter for (executable name)
@param arch: The architecture to select. 'x64' is one option
:param user: The user to filter for.
:param name: The process name to filter for (executable name)
:param arch: The architecture to select. 'x64' is one option
"""
res = data
@ -448,9 +451,9 @@ class MetasploitInstant(Metasploit):
def migrate(self, target, user=None, name=None, arch=None):
""" Migrate to a process matching certain criteria
@param user: The user to filter for.
@param name: The process name to filter for (executable name)
@param arch: The architecture to select. 'x64' is one option
:param user: The user to filter for.
:param name: The process name to filter for (executable name)
:param arch: The architecture to select. 'x64' is one option
"""
ttp = "T1055"
@ -525,8 +528,8 @@ class MetasploitInstant(Metasploit):
def nslookup(self, target, target2, **kwargs):
""" Do a nslookup discovery on the target
@param target: Command runs here
@param target2: This one is looked up
:param target: Command runs here
:param target2: This one is looked up
"""
command = f"execute -f nslookup.exe -H -i -a '{target2.get_ip()}'"
@ -563,8 +566,8 @@ class MetasploitInstant(Metasploit):
def getsystem(self, target, variant=0, **kwargs):
""" Do a network discovery on the target
@param target: Target to attack
@param variant: Variant of getsystem to use. 0 is auto, max is 3
:param target: Target to attack
:param variant: Variant of getsystem to use. 0 is auto, max is 3
"""
command = "getsystem"
@ -688,7 +691,7 @@ Do screen grabbing to collect data on target
"winlogon.exe" will monitor user logins. "explorer.exe" during the session.
@param monitoring_time: Seconds the keylogger is running
:param monitoring_time: Seconds the keylogger is running
"""
command = "keyscan_start"
@ -801,8 +804,8 @@ Get basic system information
def upload(self, target, src, dst, **kwargs):
""" Upload file from metasploit controller to target
@param src: source file name on metasploit controller
@param dst: destination file name on target machine
:param src: source file name on metasploit controller
:param dst: destination file name on target machine
"""
command = f"upload {src} '{dst}' "
@ -841,8 +844,8 @@ Uploading new files to the target. Can be config files, tools, implants, ...
def kiwi(self, target, variant="creds_all", **kwargs):
""" Kiwi is the modern equivalent to mimikatz
@param target: target being attacked
@param variant: kiwi command being used
:param target: target being attacked
:param variant: kiwi command being used
"""
ttp = "t1003"

@ -145,7 +145,7 @@ class PluginManager():
def check(self, plugin):
""" Checks a plugin for valid implementation
@returns: A list of issues
:returns: A list of issues
"""
issues = []

@ -31,6 +31,10 @@ author = 'Thorsten Sick'
# ones.
extensions = ['sphinx.ext.autodoc']
autodoc_default_options = {
'member-order': 'bysource',
}
# Sphinx argparse https://sphinx-argparse.readthedocs.io/en/latest/install.html
extensions += ['sphinxarg.ext']

@ -7,11 +7,11 @@ Attack features of PurpleDome can be extended using a plugin system. Those attac
An example plugin is in the file *hydra_plugin.py*. It contains a plugin class that **MUST** be based on the *AttackPlugin* class.
::
.. attention::
Important: This projects goal is to improve defense. Adding any attack must be done with this goal. To guarantee that:
This projects goal is to improve defense. Adding any attack must be done with this goal in mind. To guarantee that:
* Only add attacks that are already in the wild
* Only add attacks that are already used by malware and attackers
* Link to blog posts describing this attack
* Maybe already drop some ideas how to detect and block
* Or even add code to detect and block it
@ -21,8 +21,6 @@ Usage
To create a new plugin, start a sub-folder in *plugins*. The python file in there must contain a class that inherits from *AttackPlugin*.
There is an example plugin *hydra.py* that you can use as template.
Boilerplate
-----------
@ -33,7 +31,9 @@ The boilerplate contains some basics:
* ttp: The TTP number of this kali attack. See https://attack.mitre.org/ "???" if it is unknown "multiple" if it covers several TTPs
* references. A list of urls to blog posts or similar describing the attack
* required_files: A list. If you ship files with your plugin, listing them here will cause them to be installed on plugin init.
Better than using required_files is to use:
* required_files_attacker: required files to send to the attacker
* required_files_target: required files to send to the target
@ -49,4 +49,6 @@ The plugin class
================
.. autoclass:: plugins.base.attack.AttackPlugin
:members:
:members:
:member-order: bysource
:show-inheritance:

@ -0,0 +1,10 @@
*****************
Plugin base class
*****************
.. autoclass:: plugins.base.plugin_base.BasePlugin
:members:
:member-order: bysource

@ -1,6 +1,6 @@
===========================
Extending the documentation
===========================
=================================
Contributing to the documentation
=================================
The documentation is Sphinx based.

@ -2,18 +2,24 @@
Extending
*********
Modules
=======
I recommend to start contributing code by using the plugin system. But beyond that there is much more you can do.
Code code
=========
Several core module create the system. They are in the *app* folder
* experimentcontrol: Control experiments. This is the central control for everything
* calderacontrol: remote control for Caldera using the Caldera REST API
* calderaapi_2: Direct REST Api to caldera 2.* (deprecated)
* calderaapi_4: Direct REST Api to caldera 4.* (Caldera 4 is alpha)
* calderacontrol: Remote control for Caldera with convenience methods
* metasploit: Metasploit control. Simplifies the basic attack step so they can be used from plugins
* machinecontrol: Create/start and stop VMs. Will call the machinery plugin
* pluginmanager: Plugin manager tasks. Has methods to verify plugin quality as well
* config: Reading and processing configuration files
* attacklog: Logging attack steps and output to stdio
* config_verifier: Verifyies the configuration
* attack_log: Logging attack steps and output to stdio
* doc_generator: Generates human readable documents from attack logs
@ -24,7 +30,9 @@ CalderaControl
Class for Caldera communication
.. autoclass:: app.calderacontrol.CalderaControl
:members:
:members:
:member-order: bysource
:show-inheritance:
----------
Metasploit
@ -33,7 +41,9 @@ Metasploit
Class for Metasploit automation
.. autoclass:: app.metasploit.Metasploit
:members:
:members:
:member-order: bysource
:show-inheritance:
-----------------
MetasploitInstant
@ -42,7 +52,9 @@ MetasploitInstant
Extends. In addition to the communication features from the superclass Metasploit it simplifies basic commands.
.. autoclass:: app.metasploit.MetasploitInstant
:members:
:members:
:member-order: bysource
:show-inheritance:
--------
MSFVenom
@ -51,7 +63,9 @@ MSFVenom
Class for MSFVenom automation
.. autoclass:: app.metasploit.MSFVenom
:members:
:members:
:member-order: bysource
:show-inheritance:
--------------
MachineControl
@ -60,7 +74,9 @@ MachineControl
Class controlling a machine
.. autoclass:: app.machinecontrol.Machine
:members:
:members:
:member-order: bysource
:show-inheritance:
-----------------
ExperimentControl
@ -69,7 +85,9 @@ ExperimentControl
Class controlling the experiment
.. autoclass:: app.experimentcontrol.Experiment
:members:
:members:
:member-order: bysource
:show-inheritance:
------
Config
@ -78,10 +96,14 @@ Config
Internal configuration handling. Currently there are two classes. One for the whole experiment configuration. The second one for machine specific configuration.
.. autoclass:: app.config.ExperimentConfig
:members:
:members:
:member-order: bysource
:show-inheritance:
.. autoclass:: app.config.MachineConfig
:members:
:members:
:member-order: bysource
:show-inheritance:
-------------
PluginManager
@ -90,7 +112,9 @@ PluginManager
Managing plugins
.. autoclass:: app.pluginmanager.PluginManager
:members:
:members:
:member-order: bysource
:show-inheritance:
---------
AttackLog
@ -99,4 +123,6 @@ AttackLog
Attack specific logging
.. autoclass:: app.attack_log.AttackLog
:members:
:members:
:member-order: bysource
:show-inheritance:

@ -4,17 +4,15 @@ Sensor plugins
To experiment with different sensors installed on the targets there is the sensor plugin. It contains a plugin class that **MUST** be based on the *SensorPlugin* class.
The main goal of PurpleDome is to study sensor technology, which data they can collect and how to create an accurate picture of what happens during an attack. So this can be one of the most important plugin classes to extend.
The main goal of PurpleDome is to study sensor technology: what data can be collected and how to create an accurate picture of what happens during an attack. So this can be one of the most important plugin classes to extend.
Usage
=====
To create a new plugin, start a sub-folder in plugins. The python file in there must contain a class that inherits from *SensorPlugin*.
If the plugin is activated for a specific machine specific methods will be called to interact with the target:
If the plugin is activated for a specific machine specific methods will be called to interact with the target. Especially:
* prime: Easly installation steps, can trigger a reboot of the machine by returning True
* install: Normal, simple installation. No reboot
* start: Start the sensor
* stop: Stop the sensor
* collect: Collect results
@ -30,9 +28,26 @@ The boilerplate contains some basics:
Additionally you can set *self.debugit* to True. This will run the sensor on execution in gdb and make the call blocking. So you can debug your sensor.
Method: collect
---------------
This is the essential method you will have to implement. It will collect the data produced by the sensor and make it available to be stored in the zip-results
Method: start
-------------
Also an important method. Will be called before the attack to start the sensor. You will have to implement this. *But* if your collect method just collects log files from the system that are already generated you can also skip that.
Method: stop
------------
Will stop the sensor just prior to calling collect. There could be scenarios where you do not need it.
The sensor plugin class
=======================
.. autoclass:: plugins.base.sensor.SensorPlugin
:members:
:members:
:member-order: bysource
:show-inheritance:

@ -2,7 +2,7 @@
VM Controller plugins
*********************
The experiment being run handles the machines. Those machines can be targets or attacker machines. Different types of machine controllers are covered by those plugins.
The experiment being run handles the machines. Those machines can be targets or attacker machines. Different types of machine controllers are covered by plugins of the type "MachineryPlugin".
A VM plugin handles several things:
@ -32,42 +32,44 @@ The boilerplate contains some basics:
* description. A human readable description for this plugin.
* required_files: A list. If you ship files with your plugin, listing them here will cause them to be installed on plugin init.
Some relevant methods that must be implemented (even if they will not contain code) are:
up
--
There are two sets of commands to implement for machines
Starts the machine
Basic handling:
create
------
* up
* create
* halt
* destroy
* get_state
* get_ip
Creates the machine. Vagrant for example can create machines based on config files.
Communication:
halt
----
* connect
* remote_run
* disconnect
* put
* get
Stop the machine
The communication commands are already implemented in *ssh_features.py* and you can use them they way the vagrant_plugin.py is doing. At least as long as you want to use SSH to communicate (recommended !).
destroy
-------
Destroy the machine
The machinery plugin
====================
get_state
---------
Get the machines state. The class MachineStates contains potential return values
get_ip
------
For a full list of methods read on:
Get the ip of the machine. If the machine is registered at the system resolver (/etc/hosts, dns, ...) a machine name would also be a valid response. As long as the network layer can reach it, everything is fine.
.. autoclass:: plugins.base.machinery.MachineryPlugin
:members:
:member-order: bysource
:show-inheritance:
The plugin class
================
For a full list of methods read on:
The SSH mixin
=============
.. autoclass:: plugins.base.machinery.MachineryPlugin
:members:
.. autoclass:: plugins.base.ssh_features.SSHFeatures
:members:
:member-order: bysource
:show-inheritance:

@ -2,9 +2,15 @@
Vulnerability plugins
*********************
For an attack leave attack traces on a machine it should be vulnerable. Services should run. Old application be installed, users with weak passwords added to the system. You get the idea.
For an attack to leave attack traces the target machie machine should be vulnerable. This means:
For you as a user to be flexible there is a vulnerability plugin type that (surprise !) adds vulnerabilities to targets.
* Services run
* Old and unpatched application are installed
* Users with weak passwords are added to the system
You get the idea.
To get your systems vulnerable there is a vulnerability plugin type that adds vulnerabilities to targets.
This plugin type allows you to punch some holes into the protection of a machine. Which vulnerability plugins are loaded for a specific target is defined in the configuration file. Feel free to weaken the defenses.
@ -24,25 +30,23 @@ The boilerplate contains some basics:
* description: A human readable description for this plugin.
* ttp: The TTP number linked to this vulnerability. See https://attack.mitre.org/ as a hint which TTP this vulnerability could be related to. If you do not know the TTP, use "???"
* references: A list of urls to blog posts or similar describing the vulnerability
* required_files: If you ship files with your plugin, listing them here will cause them to be installed on plugin init.
Method: install (optional)
--------------------------
* required_files: If you ship files with your plugin: listing them here will cause them to be installed on plugin init.
*start* starts the vulnerability on the target. *install* is called before that. If you have to setup anything in the plugin space (and not on the target) do it here.
Method: start
-------------
Starts the vulnerability on the machine. The most important method you can use here is "self.run_cmd" and execute a shell command.
Starts the vulnerability on the machine. The most important method you can use here is "self.run_cmd" and execute a shell command. This must be implemented in your plugin.
Method: stop
------------
Undo the changes after the attacks ran. If the machine is re-used (and not re-built or run from a snapshot) this will make it simpler for the user to run more experiments on slightly modified systems.
Undo the changes after the attacks ran. If the machine is re-used (and not re-built or run from a snapshot) this will make it simpler for the user to run more experiments on slightly modified systems. This must be implemented in your plugin. Even if is just an empty method that does nothing.
The plugin class
================
.. autoclass:: plugins.base.vulnerability_plugin.VulnerabilityPlugin
:members:
:members:
:member-order: bysource
:show-inheritance:

@ -31,6 +31,8 @@ Welcome to the Purple Dome documentation
extending/vm_controller_plugins
extending/base_plugins
extending/extending
extending/documentation

@ -22,18 +22,16 @@ class AttackPlugin(BasePlugin):
""" Class to execute a command on a kali system targeting another system """
# Boilerplate
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
# name: Optional[str] = None
# description: Optional[str] = None
ttp: Optional[str] = None #: TTP of this attack. Or ??? if unknown
references = None # A list of urls or other references
required_files: list[str] = [] # Better use the other required_files features
required_files_attacker: list[str] = [] # a list of files to automatically install to the attacker
required_files_target: list[str] = [] # a list of files to automatically copy to the targets
required_files_attacker: list[str] = [] #: A list of files to automatically install to the attacker
required_files_target: list[str] = [] #: A list of files to automatically copy to the targets
requirements: Optional[list[Requirement]] = [] # Requirements to run this plugin
# TODO: parse results
requirements: Optional[list[Requirement]] = [] #: Requirements to run this plugin, Available are METASPLOIT and CALDERA at the moment
def __init__(self):
super().__init__()
@ -48,20 +46,48 @@ class AttackPlugin(BasePlugin):
self.metasploit_user: str = "user"
self.metasploit = None
def run(self, targets: list[str]):
""" The attack is ran here. This method **must be implemented**
@param targets: A list of targets, ip addresses will do
"""
raise NotImplementedError
def install(self): # pylint: disable=no-self-use
""" Install and setup requirements for the attack
This step is *optional*
"""
return None
def needs_caldera(self) -> bool:
""" Returns True if this plugin has Caldera in the requirements """
""" Returns True if this plugin has Caldera in the requirements
:meta private:
:returns: True if this plugin requires Caldera
"""
if Requirement.CALDERA in self.requirements:
return True
return False
def needs_metasploit(self) -> bool:
""" Returns True if this plugin has Metasploit in the requirements """
""" Returns True if this plugin has Metasploit in the requirements
:meta private:
:returns: True if this plugin requires Metasploit
"""
if Requirement.METASPLOIT in self.requirements:
return True
return False
def connect_metasploit(self):
""" Inits metasploit """
""" Inits metasploit
:meta private:
"""
if self.needs_metasploit():
self.metasploit = MetasploitInstant(self.metasploit_password,
@ -71,7 +97,11 @@ class AttackPlugin(BasePlugin):
# If metasploit requirements are not set, self.metasploit stay None and using metasploit from a plugin not having the requirements will trigger an exception
def copy_to_attacker_and_defender(self):
""" Copy attacker/defender specific files to the machines. Called by setup, do not call it yourself. template processing happens before """
""" Copy attacker/defender specific files to the machines. Called by setup, do not call it yourself. template processing happens before
:meta private:
"""
for a_file in self.required_files_attacker:
src = os.path.join(os.path.dirname(self.plugin_path), a_file)
@ -81,14 +111,18 @@ class AttackPlugin(BasePlugin):
# TODO: add target(s)
def teardown(self):
""" Cleanup afterwards """
""" Cleanup afterwards
This is an *optional* method which is called after the attack. If you want to do some cleanup in your plugin, implement it.
"""
pass # pylint: disable=unnecessary-pass
def attacker_run_cmd(self, command: str, disown: bool = False) -> str:
""" Execute a command on the attacker
@param command: Command to execute
@param disown: Run in background
:param command: Command to execute
:param disown: Run in background
"""
if self.attacker_machine_plugin is None:
@ -102,8 +136,8 @@ class AttackPlugin(BasePlugin):
def targets_run_cmd(self, command: str, disown: bool = False) -> str:
""" Execute a command on the target
@param command: Command to execute
@param disown: Run in background
:param command: Command to execute
:param disown: Run in background
"""
if self.target_machine_plugin is None:
@ -117,7 +151,7 @@ class AttackPlugin(BasePlugin):
def set_target_machines(self, machine: MachineryPlugin):
""" Set the machine to target
@param machine: Machine plugin to communicate with
:param machine: Machine plugin to communicate with
"""
self.target_machine_plugin = machine.vm_manager
@ -125,7 +159,7 @@ class AttackPlugin(BasePlugin):
def set_attacker_machine(self, machine: MachineryPlugin):
""" Set the machine plugin class to target
@param machine: Machine to communicate with
:param machine: Machine to communicate with
"""
self.attacker_machine_plugin = machine.vm_manager
@ -142,9 +176,9 @@ class AttackPlugin(BasePlugin):
def caldera_attack(self, target: MachineryPlugin, ability_id: str, parameters=None, **kwargs):
""" Attack a single target using caldera
@param target: Target machine object
@param ability_id: Ability or caldera ability to run
@param parameters: parameters to pass to the ability
:param target: Target machine object
:param ability_id: Ability or caldera ability to run
:param parameters: parameters to pass to the ability
"""
if not self.needs_caldera():
@ -158,10 +192,12 @@ class AttackPlugin(BasePlugin):
**kwargs
)
def get_attacker_playground(self):
def get_attacker_playground(self) -> str:
""" Returns the attacker machine specific playground
Which is the folder on the machine where we run our tasks in
This is the folder on the machine where we run our tasks in
:returns: playground on the attacker (path, str)
"""
if self.attacker_machine_plugin is None:
@ -169,22 +205,11 @@ class AttackPlugin(BasePlugin):
return self.attacker_machine_plugin.get_playground()
def run(self, targets: list[str]):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
raise NotImplementedError
def install(self): # pylint: disable=no-self-use
""" Install and setup requirements for the attack
"""
return None
def __execute__(self, targets):
""" Execute the plugin. This is called by the code
:meta private:
@param targets: A list of targets => machines
"""
@ -198,14 +223,22 @@ class AttackPlugin(BasePlugin):
return res
def get_ttp(self):
""" Returns the ttp of the plugin, please set in boilerplate """
""" Returns the ttp of the plugin, please set in boilerplate
:meta private:
"""
if self.ttp:
return self.ttp
raise NotImplementedError
def get_references(self):
""" Returns the references of the plugin, please set in boilerplate """
""" Returns the references of the plugin, please set in boilerplate
:meta private:
"""
if self.references:
return self.references

@ -1,66 +0,0 @@
#!/usr/bin/env python3
""" Base class for Caldera plugins
Special for this plugin class: If there is no plugin matching a specified attack-id the system can fallback to default handling.
You only gotta write a plugin if you want some special features
"""
from typing import Optional
from plugins.base.plugin_base import BasePlugin
class CalderaPlugin(BasePlugin):
""" Class to execute a command on a caldera system targeting another system """
# Boilerplate
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
required_files: list[str] = []
# TODO: parse results
def __init__(self):
super().__init__()
self.conf: dict = {} # Plugin specific configuration
# self.sysconf = {} # System configuration. common for all plugins
def teardown(self):
""" Cleanup afterwards """
pass # pylint: disable=unnecessary-pass
def run(self, targets: list[str]):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
raise NotImplementedError
def __execute__(self, targets: list[str]) -> str:
""" Execute the plugin. This is called by the code
@param targets: A list of targets, ip addresses will do
"""
self.setup()
self.attack_logger.start_kali_attack(self.machine_plugin.config.vmname(), targets, self.name, ttp=self.get_ttp())
res = self.run(targets)
self.teardown()
self.attack_logger.stop_kali_attack(self.machine_plugin.config.vmname(), targets, self.name, ttp=self.get_ttp())
return res
def get_ttp(self):
""" Returns the ttp of the plugin, please set in boilerplate """
if self.ttp:
return self.ttp
raise NotImplementedError
def get_references(self):
""" Returns the references of the plugin, please set in boilerplate """
if self.references:
return self.references
raise NotImplementedError

@ -4,7 +4,7 @@
from enum import Enum
import os
from typing import Optional
# from typing import Optional
from app.config import MachineConfig
from app.interface_sfx import CommandlineColors
from plugins.base.plugin_base import BasePlugin
@ -27,9 +27,9 @@ class MachineryPlugin(BasePlugin):
""" Class to control virtual machines, vagrant, .... """
# Boilerplate
name: Optional[str] = None
# name: Optional[str] = None
required_files: list[str] = []
# required_files: list[str] = []
###############
# This is stuff you might want to implement
@ -42,7 +42,7 @@ class MachineryPlugin(BasePlugin):
def create(self, reboot: bool = True):
""" Create a machine
@param reboot: Optionally reboot the machine after creation
@param reboot: Reboot the machine after creation
"""
raise NotImplementedError
@ -59,36 +59,49 @@ class MachineryPlugin(BasePlugin):
raise NotImplementedError
def connect(self):
""" Connect to a machine """
""" Connect to a machine
If you want to use SSH, check out the class SSHFeatures, it is already implemented there
"""
raise NotImplementedError
def remote_run(self, cmd: str, disown: bool = False):
""" Connects to the machine and runs a command there
If you want to use SSH, check out the class SSHFeatures, it is already implemented there
@param cmd: command to run int he machine's shell
@param disown: Send the connection into background
:param cmd: command to run int he machine's shell
:param disown: Send the connection into background
"""
raise NotImplementedError
def disconnect(self):
""" Disconnect from a machine """
""" Disconnect from a machine
If you want to use SSH, check out the class SSHFeatures, it is already implemented there
"""
raise NotImplementedError
def put(self, src: str, dst: str):
""" Send a file to a machine
@param src: source dir
@param dst: destination
If you want to use SSH, check out the class SSHFeatures, it is already implemented there
:param src: source dir
:param dst: destination
"""
raise NotImplementedError
def get(self, src: str, dst: str):
""" Get a file to a machine
@param src: source dir
@param dst: destination
If you want to use SSH, check out the class SSHFeatures, it is already implemented there
:param src: source dir
:param dst: destination
"""
raise NotImplementedError
@ -96,12 +109,16 @@ class MachineryPlugin(BasePlugin):
""" Returns if the machine is running """
return self.get_state() == MachineStates.RUNNING
def get_state(self):
def get_state(self) -> MachineStates:
""" Get detailed state of a machine """
raise NotImplementedError
def get_ip(self):
""" Return the IP of the machine. If there are several it should be the one accepting ssh or similar. If a resolver is running, a domain is also ok. """
def get_ip(self) -> str:
""" Return the IP of the machine.
If there are several it should be the one accepting ssh or similar. If a resolver is running, a machine name is also ok as return value.
:returns: machine name or ip. Some handle we can use to get a network connection to this machine
"""
raise NotImplementedError
def get_playground(self):

@ -12,10 +12,10 @@ import app.exceptions # type: ignore
class BasePlugin():
""" Base class for plugins """
required_files: list[str] = [] # a list of files shipped with the plugin to be installed
name: str = "" # The name of the plugin
alternative_names: list[str] = [] # The is an optional list of alternative names
description: Optional[str] = None # The description of this plugin
required_files: list[str] = [] #: a list of files shipped with the plugin to be installed
name: Optional[str] = None #: The name of the plugin
alternative_names: list[str] = [] #: An optional list of alternative names
description: Optional[str] = None #: The description of this plugin
def __init__(self) -> None:
# self.machine = None
@ -27,20 +27,68 @@ class BasePlugin():
self.default_config_name = "default_config.yaml"
def get_filename(self):
""" Returns the current filename. """
def run_cmd(self, command: str, disown: bool = False):
""" Execute a command on the vm using the connection
:param command: Command to execute
:param disown: Run in background
"""
if self.machine_plugin is None:
raise PluginError("machine to run command on is not registered")
self.vprint(f" Plugin running command {command}", 3)
res = self.machine_plugin.__call_remote_run__(command, disown=disown)
return res
def copy_to_machine(self, filename: str):
""" Copies a file shipped with the plugin to the machine share folder
:param filename: File from the plugin folder to copy to the machine share.
"""
if self.machine_plugin is not None:
self.machine_plugin.put(filename, self.machine_plugin.get_playground())
else:
raise PluginError("Missing machine")
def get_from_machine(self, src: str, dst: str):
""" Get a file from the machine
:param src: source file name on the machine
:param dst: destination filename on the host
"""
if self.machine_plugin is not None:
self.machine_plugin.get(src, dst) # nosec
else:
raise PluginError("Missing machine")
def get_filename(self) -> str:
""" Returns the current filename. This can be used for debugging
:meta private:
:returns: Filename of currenty executed py file
"""
cf = currentframe() # pylint: disable=invalid-name
return cf.f_back.filename
def get_linenumber(self):
""" Returns the current linenumber. """
def get_linenumber(self) -> int:
""" Returns the current linenumber. This can be used for debugging
:returns: currently executed linenumber
"""
cf = currentframe() # pylint: disable=invalid-name
return cf.f_back.f_lineno
def get_playground(self):
""" Returns the machine specific playground
def get_playground(self) -> str:
""" Returns the machine specific playground path name
This is the folder on the machine where we run our tasks in
Which is the folder on the machine where we run our tasks in
:returns: playground path on the target machine
"""
if self.machine_plugin is None:
@ -49,21 +97,38 @@ class BasePlugin():
return self.machine_plugin.get_playground()
def set_logger(self, attack_logger):
""" Set the attack logger for this machine """
""" Set the attack logger for this machine
:meta private:
:param attack_logger: Attack logger object
"""
self.attack_logger = attack_logger
def process_templates(self): # pylint: disable=no-self-use
""" A method you can optionally implement to transfer your jinja2 templates into the files yo want to send to the target. See 'required_files' """
""" A method you can optionally implement to transfer your jinja2 templates into the files yo want to send to the target. See 'required_files'
:meta private:
"""
return
def copy_to_attacker_and_defender(self): # pylint: disable=no-self-use
""" Copy attacker/defender specific files to the machines """
""" Copy attacker/defender specific files to the machines
:meta private:
"""
return
def setup(self):
""" Prepare everything for the plugin """
""" Prepare everything for the plugin
:meta private:
"""
self.process_templates()
@ -77,7 +142,9 @@ class BasePlugin():
def set_machine_plugin(self, machine_plugin):
""" Set the machine plugin class to communicate with
@param machine_plugin: Machine plugin to communicate with
:meta private:
:param machine_plugin: Machine plugin to communicate with
"""
self.machine_plugin = machine_plugin
@ -85,7 +152,9 @@ class BasePlugin():
def set_sysconf(self, config): # pylint:disable=unused-argument
""" Set system config
@param config: A dict with system configuration relevant for all plugins
:meta private:
:param config: A dict with system configuration relevant for all plugins
"""
# self.sysconf["abs_machinepath_internal"] = config["abs_machinepath_internal"]
@ -95,55 +164,34 @@ class BasePlugin():
def process_config(self, config: dict):
""" process config and use defaults if stuff is missing
@param config: The config dict
:meta private:
:param config: The config dict
"""
# TODO: Move to python 3.9 syntax z = x | y
self.conf = {**self.conf, **config}
def copy_to_machine(self, filename: str):
""" Copies a file shipped with the plugin to the machine share folder
@param filename: File from the plugin folder to copy to the machine share.
"""
if self.machine_plugin is not None:
self.machine_plugin.put(filename, self.machine_plugin.get_playground())
else:
raise PluginError("Missing machine")
def get_from_machine(self, src: str, dst: str):
""" Get a file from the machine """
if self.machine_plugin is not None:
self.machine_plugin.get(src, dst) # nosec
else:
raise PluginError("Missing machine")
def run_cmd(self, command: str, disown: bool = False):
""" Execute a command on the vm using the connection
@param command: Command to execute
@param disown: Run in background
"""
if self.machine_plugin is None:
raise PluginError("machine to run command on is not registered")
self.vprint(f" Plugin running command {command}", 3)
def get_name(self) -> str:
""" Returns the name of the plugin
res = self.machine_plugin.__call_remote_run__(command, disown=disown)
return res
This method checks the boilerplate for the name
def get_name(self):
""" Returns the name of the plugin, please set in boilerplate """
:returns: The plugin name
"""
if self.name:
return self.name
raise NotImplementedError
def get_names(self) -> list[str]:
""" Adds the name of the plugin to the alternative names and returns the list """
""" Returns a list of names and nicknames for a plugin.
Please set that in the boilerplate
:returns: A list of potential names
"""
res = set()
@ -158,25 +206,42 @@ class BasePlugin():
raise NotImplementedError
def get_description(self):
""" Returns the description of the plugin, please set in boilerplate """
def get_description(self) -> str:
""" Returns the description of the plugin, please set it in boilerplate
:returns: The description of the plugin
"""
if self.description:
return self.description
raise NotImplementedError
def get_plugin_path(self):
""" Returns the path the plugin file(s) are stored in """
def get_plugin_path(self) -> str:
""" Returns the path the plugin file(s) are stored in
:meta private:
:returns: The path with the plugin code
"""
return os.path.join(os.path.dirname(self.plugin_path))
def get_default_config_filename(self):
""" Generate the default filename of the default configuration file """
def get_default_config_filename(self) -> str:
""" Generate the default filename of the default configuration file
:meta private:
:returns: The filename of the default config
"""
return os.path.join(self.get_plugin_path(), self.default_config_name)
def get_raw_default_config(self):
""" Returns the default config as string. Usable as an example and for documentation """
def get_raw_default_config(self) -> str:
""" Returns the default config as string. Usable as an example and for documentation
:meta private:
"""
if os.path.isfile(self.get_default_config_filename()):
with open(self.get_default_config_filename(), "rt") as fh:
@ -185,7 +250,11 @@ class BasePlugin():
return f"# The plugin {self.get_name()} does not support configuration"
def load_default_config(self):
""" Reads and returns the default config as dict """
""" Reads and returns the default config as dict
:meta private:
"""
filename = self.get_default_config_filename()
@ -202,12 +271,22 @@ class BasePlugin():
def get_config_section_name(self) -> str:
""" Returns the name for the config sub-section to use for this plugin.
Defaults to the name of the plugin. This method should be overwritten if it gets more complicated """
:meta private:
Defaults to the name of the plugin. This method should be overwritten if it gets more complicated
:returns: The name of the config section
"""
return self.get_name()
def main_path(self) -> str: # pylint:disable=no-self-use
""" Returns the main path of the Purple Dome installation """
""" Returns the main path of the Purple Dome installation
:meta private:
:returns: the main path
"""
app_dir = os.path.dirname(app.exceptions.__file__)
return os.path.split(app_dir)[0]
@ -220,8 +299,8 @@ class BasePlugin():
2: Detailed progress information
3: Debug logs, data dumps, everything
@param text: The text to print
@param verbosity: the verbosity level the text has.
:param text: The text to print
:param verbosity: the verbosity level the text has.
"""
if self.attack_logger is not None:
self.attack_logger.vprint(text, verbosity)

@ -2,7 +2,7 @@
""" A base plugin class for sensors. Anything installed on the target to collect system information and identify the attack """
import os
from typing import Optional
# from typing import Optional
from plugins.base.plugin_base import BasePlugin
@ -13,54 +13,60 @@ class SensorPlugin(BasePlugin):
"""
# Boilerplate
name: Optional[str] = None
# name: Optional[str] = None
required_files: list[str] = []
# required_files: list[str] = []
def __init__(self):
super().__init__() # pylint:disable=useless-super-delegation
self.debugit = False
def prime(self) -> bool: # pylint: disable=no-self-use
""" prime sets hard core configs in the target. You can use it to call everything that permanently alters the OS by settings.
If your prime function returns True the machine will be rebooted after prime-ing it. This is very likely what you want. Only use prime if install is not sufficient.
def start(self) -> bool: # pylint: disable=unused-argument, no-self-use
""" Start the sensor. This is *optional* if your sensor is 'just collecting default OS logs' or something similar
:returns: Currently, the return value is ignored. Set to True.
"""
return False
return True
def install(self) -> bool: # pylint: disable=no-self-use
""" Install the sensor. Executed on the target. Take the sensor from the share and (maybe) copy it to its destination. Do some setup
def stop(self) -> bool: # pylint: disable=no-self-use
""" Stop the sensor. This is *optional* if you do not have to stop the sensor before collecting
:returns: Currently, the return value is ignored. Set to True.
"""
return True
def start(self, disown=None) -> bool: # pylint: disable=unused-argument, no-self-use
""" Start the sensor. The connection to the client is disowned here. = Sent to background. This keeps the process running.
def collect(self, path: str) -> list[str]:
""" Collect data from the sensor. Copy it from sensor collection dir on target OS to the share.
This step is essential: We want the data.
@param disown: Send async into background
:param path: The path to copy the data into
:returns: A list of files to put into the loot zip
"""
raise NotImplementedError
return True
def prime(self) -> bool: # pylint: disable=no-self-use
""" prime sets hard core configs in the target. You can use it to call everything that permanently alters the OS by settings.
If your prime function returns True the machine will be rebooted after prime-ing it. This is very likely what you want. Only use prime if install is not sufficient.
"""
def stop(self) -> bool: # pylint: disable=no-self-use
""" Stop the sensor """
return False
def install(self) -> bool: # pylint: disable=no-self-use
""" Install the sensor. Executed on the target. Take the sensor from the share and (maybe) copy it to its destination. Do some setup
"""
return True
def __call_collect__(self, machine_path: str):
""" Generate the data collect command
:meta private:
@param machine_path: Machine specific path to collect data into
"""
path = os.path.join(machine_path, "sensors", self.name) # type: ignore
os.makedirs(path)
return self.collect(path)
def collect(self, path: str) -> list[str]:
""" Collect data from sensor. Copy it from sensor collection dir on target OS to the share
@param path: The path to copy the data into
@returns: A list of files to put into the loot zip
"""
raise NotImplementedError

@ -65,8 +65,8 @@ class SSHFeatures(BasePlugin):
def remote_run(self, cmd: str, disown: bool = False):
""" Connects to the machine and runs a command there
@param cmd: The command to execute
@param disown: Send the connection into background
:param cmd: The command to execute
:param disown: Send the connection into background
"""
if cmd is None:
@ -109,8 +109,8 @@ class SSHFeatures(BasePlugin):
def put(self, src: str, dst: str):
""" Send a file to a machine
@param src: source dir
@param dst: destination
:param src: source dir
:param dst: destination
"""
self.connect()
@ -153,8 +153,8 @@ class SSHFeatures(BasePlugin):
def get(self, src: str, dst: str):
""" Get a file to a machine
@param src: source dir
@param dst: destination
:param src: source dir
:param dst: destination
"""
self.connect()
res = None

@ -11,53 +11,71 @@ class VulnerabilityPlugin(BasePlugin):
"""
# Boilerplate
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
# name: Optional[str] = None
# description: Optional[str] = None
ttp: Optional[str] = None #: The TTP of this vulnerability
references: Optional[list[str]] = None #: References (links) to external sources
required_files: list[str] = []
# required_files: list[str] = []
def __init__(self):
super().__init__() # pylint:disable=useless-super-delegation
self.debugit = False
def prime(self):
""" Early install. Can reboot the machine if it returns True after installation. """
def start(self):
""" Starts the vulnerability on the machine. The most important method you can use here is "self.run_cmd" and execute a shell command.
This must be implemented by the plugin."""
# It is ok if install is empty. But this function here is the core. So implement it !
raise NotImplementedError
def stop(self):
""" Modifying the target machine and remove the vulnerability after the attacks ran.
This must be implemented by the plugin.
"""
# Must be implemented. If you want to leave a mess create an empty function and be honest :-)
raise NotImplementedError
def prime(self) -> bool:
""" *Optional* Early install phase.
Use this if install is not sufficient. This method is called int he first install phase and can reboot the tagret machine.
:return: True to reboot the machine after installation. False is the default
"""
return False
def install(self, machine_plugin=None):
""" This is setting up everything up to the point where the machine itself would be modified. But system
modification is done by start
""" *Optional* This installs the vulnerability.
If the modification is very small, you can also just do that during start.
This method is executed in the second install phase. It can **not** reboot the machine. Using install is preferred to using *prime*
@param machine_plugin: Optional: you can already set the machine to use
:param machine_plugin: Optional: you can already set the machine to use
"""
if machine_plugin:
self.machine_plugin = machine_plugin
def start(self):
""" Modifying the target machine and add the vulnerability """
# It is ok if install is empty. But this function here is the core. So implement it !
raise NotImplementedError
def stop(self):
""" Modifying the target machine and remove the vulnerability """
def get_ttp(self):
""" Returns the ttp of the plugin, please set in boilerplate
# Must be implemented. If you want to leave a mess create an empty function and be honest :-)
raise NotImplementedError
:meta private:
def get_ttp(self):
""" Returns the ttp of the plugin, please set in boilerplate """
"""
if self.ttp:
return self.ttp
raise NotImplementedError
def get_references(self):
""" Returns the references of the plugin, please set in boilerplate """
""" Returns the references of the plugin, please set in boilerplate
:meta private:
"""
if self.references:
return self.references

Loading…
Cancel
Save