|
|
|
@ -5,6 +5,7 @@
|
|
|
|
|
import yaml
|
|
|
|
|
|
|
|
|
|
from app.exceptions import ConfigurationError
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# So the config being read is distributed into several files and they will have different formats (yaml, CACAO)
|
|
|
|
@ -44,12 +45,12 @@ class MachineConfig():
|
|
|
|
|
if vmcontroller not in ["vagrant", "running_vm"]:
|
|
|
|
|
raise ConfigurationError
|
|
|
|
|
|
|
|
|
|
def vmname(self):
|
|
|
|
|
def vmname(self) -> str:
|
|
|
|
|
""" Returns the vmname """
|
|
|
|
|
|
|
|
|
|
return self.raw_config["vm_name"]
|
|
|
|
|
|
|
|
|
|
def get_nicknames(self):
|
|
|
|
|
def get_nicknames(self) -> list[str]:
|
|
|
|
|
""" Gets the nicknames """
|
|
|
|
|
|
|
|
|
|
if "nicknames" in self.raw_config:
|
|
|
|
@ -57,88 +58,88 @@ class MachineConfig():
|
|
|
|
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
def vmcontroller(self):
|
|
|
|
|
def vmcontroller(self) -> str:
|
|
|
|
|
""" Returns the vm controller. lowercase """
|
|
|
|
|
|
|
|
|
|
return self.raw_config["vm_controller"]["type"].lower()
|
|
|
|
|
|
|
|
|
|
def vm_ip(self):
|
|
|
|
|
def vm_ip(self) -> str:
|
|
|
|
|
""" Return the configured ip/domain name (whatever is needed to reach the machine). Returns None if missing """
|
|
|
|
|
try:
|
|
|
|
|
return self.raw_config["vm_controller"]["ip"]
|
|
|
|
|
except KeyError:
|
|
|
|
|
return self.vmname()
|
|
|
|
|
|
|
|
|
|
def os(self): # pylint: disable=invalid-name
|
|
|
|
|
def os(self) -> str: # pylint: disable=invalid-name
|
|
|
|
|
""" returns the os. lowercase """
|
|
|
|
|
|
|
|
|
|
return self.raw_config["os"].lower()
|
|
|
|
|
|
|
|
|
|
def use_existing_machine(self):
|
|
|
|
|
def use_existing_machine(self) -> bool:
|
|
|
|
|
""" Returns if we want to use the existing machine """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("use_existing_machine", False)
|
|
|
|
|
|
|
|
|
|
def machinepath(self):
|
|
|
|
|
def machinepath(self) -> str:
|
|
|
|
|
""" Returns the machine path. If not configured it will fall back to the vm_name """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("machinepath", self.vmname())
|
|
|
|
|
|
|
|
|
|
def get_playground(self):
|
|
|
|
|
def get_playground(self) -> Optional[str]:
|
|
|
|
|
""" Returns the machine specific playground where all the implants and tools will be installed """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("playground", None)
|
|
|
|
|
|
|
|
|
|
def caldera_paw(self):
|
|
|
|
|
def caldera_paw(self) -> Optional[str]:
|
|
|
|
|
""" Returns the paw (caldera id) of the machine """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("paw", None)
|
|
|
|
|
|
|
|
|
|
def caldera_group(self):
|
|
|
|
|
def caldera_group(self) -> Optional[str]:
|
|
|
|
|
""" Returns the group (caldera group id) of the machine """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("group", None)
|
|
|
|
|
|
|
|
|
|
def ssh_keyfile(self):
|
|
|
|
|
def ssh_keyfile(self) -> Optional[str]:
|
|
|
|
|
""" Returns the configured SSH keyfile """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("ssh_keyfile", None)
|
|
|
|
|
|
|
|
|
|
def ssh_user(self):
|
|
|
|
|
def ssh_user(self) -> str:
|
|
|
|
|
""" Returns configured ssh user or "vagrant" as default """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("ssh_user", "vagrant")
|
|
|
|
|
|
|
|
|
|
def ssh_password(self):
|
|
|
|
|
def ssh_password(self) -> Optional[str]:
|
|
|
|
|
""" Returns configured ssh password or None as default """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("ssh_password", None)
|
|
|
|
|
|
|
|
|
|
def halt_needs_force(self):
|
|
|
|
|
def halt_needs_force(self) -> bool:
|
|
|
|
|
""" Returns if halting the machine needs force False as default """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("halt_needs_force", False)
|
|
|
|
|
|
|
|
|
|
def vagrantfilepath(self):
|
|
|
|
|
def vagrantfilepath(self) -> str:
|
|
|
|
|
""" Vagrant specific config: The vagrant file path """
|
|
|
|
|
|
|
|
|
|
if "vagrantfilepath" not in self.raw_config["vm_controller"]:
|
|
|
|
|
raise ConfigurationError("Vagrantfilepath missing")
|
|
|
|
|
return self.raw_config["vm_controller"]["vagrantfilepath"]
|
|
|
|
|
|
|
|
|
|
def sensors(self):
|
|
|
|
|
def sensors(self) -> list[str]:
|
|
|
|
|
""" Return a list of sensors configured for this machine """
|
|
|
|
|
if "sensors" in self.raw_config:
|
|
|
|
|
return self.raw_config["sensors"] or []
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
def vulnerabilities(self):
|
|
|
|
|
def vulnerabilities(self) -> list[str]:
|
|
|
|
|
""" Return a list of vulnerabilities configured for this machine """
|
|
|
|
|
if "vulnerabilities" in self.raw_config:
|
|
|
|
|
return self.raw_config["vulnerabilities"] or []
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
def is_active(self):
|
|
|
|
|
def is_active(self) -> bool:
|
|
|
|
|
""" Returns if this machine is set to active. Default is true """
|
|
|
|
|
|
|
|
|
|
return self.raw_config.get("active", True)
|
|
|
|
@ -153,7 +154,7 @@ class ExperimentConfig():
|
|
|
|
|
@param configfile: The configuration file to process
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
self.raw_config = None
|
|
|
|
|
self.raw_config: Optional[dict] = None
|
|
|
|
|
self._targets: list[MachineConfig] = []
|
|
|
|
|
self._attackers: list[MachineConfig] = []
|
|
|
|
|
self.load(configfile)
|
|
|
|
@ -203,14 +204,20 @@ class ExperimentConfig():
|
|
|
|
|
|
|
|
|
|
return self.attackers()[mid]
|
|
|
|
|
|
|
|
|
|
def caldera_apikey(self):
|
|
|
|
|
def caldera_apikey(self) -> str:
|
|
|
|
|
""" Returns the caldera apikey """
|
|
|
|
|
|
|
|
|
|
if self.raw_config is None:
|
|
|
|
|
raise ConfigurationError("Config file is empty")
|
|
|
|
|
|
|
|
|
|
return self.raw_config["caldera"]["apikey"]
|
|
|
|
|
|
|
|
|
|
def loot_dir(self):
|
|
|
|
|
def loot_dir(self) -> str:
|
|
|
|
|
""" Returns the loot dir """
|
|
|
|
|
|
|
|
|
|
if self.raw_config is None:
|
|
|
|
|
raise ConfigurationError("Config file is empty")
|
|
|
|
|
|
|
|
|
|
if "results" not in self.raw_config or self.raw_config["results"] is None:
|
|
|
|
|
raise ConfigurationError("results missing in configuration")
|
|
|
|
|
try:
|
|
|
|
@ -219,7 +226,7 @@ class ExperimentConfig():
|
|
|
|
|
raise ConfigurationError("results/loot_dir not properly set in configuration") from error
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
def attack_conf(self, attack: str):
|
|
|
|
|
def attack_conf(self, attack: str) -> dict:
|
|
|
|
|
""" Get kali config for a specific kali attack
|
|
|
|
|
|
|
|
|
|
@param attack: Name of the attack to look up config for
|
|
|
|
@ -238,25 +245,31 @@ class ExperimentConfig():
|
|
|
|
|
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
def get_caldera_obfuscator(self):
|
|
|
|
|
def get_caldera_obfuscator(self) -> str:
|
|
|
|
|
""" Get the caldera configuration. In this case: The obfuscator. Will default to plain-text """
|
|
|
|
|
|
|
|
|
|
if self.raw_config is None:
|
|
|
|
|
raise ConfigurationError("Config file is empty")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
res = self.raw_config["caldera_conf"]["obfuscator"]
|
|
|
|
|
except KeyError:
|
|
|
|
|
return "plain-text"
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
def get_caldera_jitter(self):
|
|
|
|
|
def get_caldera_jitter(self) -> str:
|
|
|
|
|
""" Get the caldera configuration. In this case: Jitter. Will default to 4/8 """
|
|
|
|
|
|
|
|
|
|
if self.raw_config is None:
|
|
|
|
|
raise ConfigurationError("Config file is empty")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
res = self.raw_config["caldera_conf"]["jitter"]
|
|
|
|
|
except KeyError:
|
|
|
|
|
return "4/8"
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
def get_plugin_based_attacks(self, for_os: str):
|
|
|
|
|
def get_plugin_based_attacks(self, for_os: str) -> list[str]:
|
|
|
|
|
""" Get the configured kali attacks to run for a specific OS
|
|
|
|
|
|
|
|
|
|
@param for_os: The os to query the registered attacks for
|
|
|
|
@ -264,6 +277,7 @@ class ExperimentConfig():
|
|
|
|
|
|
|
|
|
|
if self.raw_config is None:
|
|
|
|
|
raise ConfigurationError("Config file is empty")
|
|
|
|
|
|
|
|
|
|
if "plugin_based_attacks" not in self.raw_config:
|
|
|
|
|
return []
|
|
|
|
|
if for_os not in self.raw_config["plugin_based_attacks"]:
|
|
|
|
@ -273,7 +287,7 @@ class ExperimentConfig():
|
|
|
|
|
return []
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
def get_caldera_attacks(self, for_os: str):
|
|
|
|
|
def get_caldera_attacks(self, for_os: str) -> list:
|
|
|
|
|
""" Get the configured caldera attacks to run for a specific OS
|
|
|
|
|
|
|
|
|
|
@param for_os: The os to query the registered attacks for
|
|
|
|
@ -281,6 +295,7 @@ class ExperimentConfig():
|
|
|
|
|
|
|
|
|
|
if self.raw_config is None:
|
|
|
|
|
raise ConfigurationError("Config file is empty")
|
|
|
|
|
|
|
|
|
|
if "caldera_attacks" not in self.raw_config:
|
|
|
|
|
return []
|
|
|
|
|
if for_os not in self.raw_config["caldera_attacks"]:
|
|
|
|
@ -290,15 +305,18 @@ class ExperimentConfig():
|
|
|
|
|
return []
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
def get_nap_time(self):
|
|
|
|
|
def get_nap_time(self) -> int:
|
|
|
|
|
""" Returns the attackers nap time between attack steps """
|
|
|
|
|
|
|
|
|
|
if self.raw_config is None:
|
|
|
|
|
raise ConfigurationError("Config file is empty")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
return self.raw_config["attacks"]["nap_time"]
|
|
|
|
|
return int(self.raw_config["attacks"]["nap_time"])
|
|
|
|
|
except KeyError:
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
def get_sensor_config(self, name: str):
|
|
|
|
|
def get_sensor_config(self, name: str) -> dict:
|
|
|
|
|
""" Return the config for a specific sensor
|
|
|
|
|
|
|
|
|
|
@param name: name of the sensor
|
|
|
|
@ -306,6 +324,7 @@ class ExperimentConfig():
|
|
|
|
|
|
|
|
|
|
if self.raw_config is None:
|
|
|
|
|
raise ConfigurationError("Config file is empty")
|
|
|
|
|
|
|
|
|
|
if "sensors" not in self.raw_config:
|
|
|
|
|
return {}
|
|
|
|
|
if self.raw_config["sensors"] is None: # Better for unit tests that way.
|
|
|
|
|