diff --git a/app/config.py b/app/config.py index 9e95ca8..43d6641 100644 --- a/app/config.py +++ b/app/config.py @@ -4,6 +4,7 @@ from typing import Optional import yaml +from app.config_verifier import MainConfig from app.exceptions import ConfigurationError @@ -18,7 +19,7 @@ from app.exceptions import ConfigurationError class MachineConfig(): """ Sub config for a specific machine""" - def __init__(self, machinedata: dict): + def __init__(self, machinedata): """ Init machine control config @param machinedata: dict containing machine data @@ -27,53 +28,46 @@ class MachineConfig(): raise ConfigurationError self.raw_config = machinedata - self.verify() - - def verify(self): - """ Verify essential data is present """ - try: - self.vmname() - operating_system = self.os() - vmcontroller = self.vmcontroller() - except KeyError as exception: - raise ConfigurationError from exception - - if operating_system not in ["linux", "windows"]: - raise ConfigurationError - - # TODO: Verify with plugins - if vmcontroller not in ["vagrant", "running_vm"]: - raise ConfigurationError def vmname(self) -> str: """ Returns the vmname """ - return self.raw_config["vm_name"] + return self.raw_config.vm_name def get_nicknames(self) -> list[str]: """ Gets the nicknames """ - if "nicknames" in self.raw_config: - return self.raw_config["nicknames"] or [] + if self.raw_config.has_key("nicknames"): + return self.raw_config.nicknames or [] return [] def vmcontroller(self) -> str: """ Returns the vm controller. lowercase """ - return self.raw_config["vm_controller"]["type"].lower() + if not self.raw_config.has_key("vm_controller"): + raise ConfigurationError + + return self.raw_config.vm_controller.vm_type.lower() def vm_ip(self) -> str: """ Return the configured ip/domain name (whatever is needed to reach the machine). Returns None if missing """ + + if not self.raw_config.has_key("vm_controller"): + return self.vmname() + + if not self.raw_config.vm_controller.has_key("ip"): + return self.vmname() + try: - return self.raw_config["vm_controller"]["ip"] + return self.raw_config.vm_controller.ip except KeyError: return self.vmname() def os(self) -> str: # pylint: disable=invalid-name """ returns the os. lowercase """ - return self.raw_config["os"].lower() + return self.raw_config.os.lower() def use_existing_machine(self) -> bool: """ Returns if we want to use the existing machine """ @@ -83,7 +77,10 @@ class MachineConfig(): def machinepath(self) -> str: """ Returns the machine path. If not configured it will fall back to the vm_name """ - return self.raw_config.get("machinepath", self.vmname()) + if self.raw_config.has_key("machinepath"): + return self.raw_config.machinepath + + return self.vmname() def get_playground(self) -> Optional[str]: """ Returns the machine specific playground where all the implants and tools will be installed """ @@ -169,22 +166,25 @@ class ExperimentConfig(): """ with open(configfile) as fh: - self.raw_config = yaml.safe_load(fh) + data = yaml.safe_load(fh) - if self.raw_config is None: + if data is None: raise ConfigurationError("Config file is empty") + self.raw_config = MainConfig(**data) + # Process targets - if self.raw_config["targets"] is None: + if self.raw_config.targets is None: raise ConfigurationError("Config file does not specify targets") - for target in self.raw_config["targets"]: - self._targets.append(MachineConfig(self.raw_config["targets"][target])) + + for target in self.raw_config.targets: + self._targets.append(MachineConfig(target)) # Process attackers - if self.raw_config["attackers"] is None: + if self.raw_config.attackers is None: raise ConfigurationError("Config file does not specify attackers") - for attacker in self.raw_config["attackers"]: - self._attackers.append(MachineConfig(self.raw_config["attackers"][attacker])) + for attacker in self.raw_config.attackers: + self._attackers.append(MachineConfig(attacker)) def targets(self) -> list[MachineConfig]: """ Return config for targets as MachineConfig objects """ @@ -210,7 +210,7 @@ class ExperimentConfig(): if self.raw_config is None: raise ConfigurationError("Config file is empty") - return self.raw_config["caldera"]["apikey"] + return self.raw_config.caldera.apikey def loot_dir(self) -> str: """ Returns the loot dir """ @@ -218,10 +218,8 @@ class ExperimentConfig(): if self.raw_config is None: raise ConfigurationError("Config file is empty") - if "results" not in self.raw_config or self.raw_config["results"] is None: - raise ConfigurationError("results missing in configuration") try: - res = self.raw_config["results"]["loot_dir"] + res = self.raw_config.results.loot_dir except KeyError as error: raise ConfigurationError("results/loot_dir not properly set in configuration") from error return res @@ -234,10 +232,9 @@ class ExperimentConfig(): if self.raw_config is None: raise ConfigurationError("Config file is empty") - if self.raw_config["attack_conf"] is None: - raise ConfigurationError("Config file missing attacks") + try: - res = self.raw_config["attack_conf"][attack] + res = self.raw_config.attack_conf[attack] except KeyError: res = {} if res is None: @@ -252,10 +249,9 @@ class ExperimentConfig(): raise ConfigurationError("Config file is empty") try: - res = self.raw_config["caldera_conf"]["obfuscator"] + return self.raw_config.attacks.caldera_obfuscator except KeyError: return "plain-text" - return res def get_caldera_jitter(self) -> str: """ Get the caldera configuration. In this case: Jitter. Will default to 4/8 """ @@ -264,10 +260,9 @@ class ExperimentConfig(): raise ConfigurationError("Config file is empty") try: - res = self.raw_config["caldera_conf"]["jitter"] + return self.raw_config.attacks.caldera_jitter except KeyError: return "4/8" - return res def get_plugin_based_attacks(self, for_os: str) -> list[str]: """ Get the configured kali attacks to run for a specific OS @@ -278,11 +273,11 @@ class ExperimentConfig(): if self.raw_config is None: raise ConfigurationError("Config file is empty") - if "plugin_based_attacks" not in self.raw_config: + if not self.raw_config.has_key("plugin_based_attacks"): return [] - if for_os not in self.raw_config["plugin_based_attacks"]: + if not self.raw_config.plugin_based_attacks.has_key(for_os): return [] - res = self.raw_config["plugin_based_attacks"][for_os] + res = self.raw_config.plugin_based_attacks.get(for_os) if res is None: return [] return res @@ -296,11 +291,11 @@ class ExperimentConfig(): if self.raw_config is None: raise ConfigurationError("Config file is empty") - if "caldera_attacks" not in self.raw_config: + if not self.raw_config.has_key("caldera_attacks"): return [] - if for_os not in self.raw_config["caldera_attacks"]: + if not self.raw_config.caldera_attacks.has_key(for_os): return [] - res = self.raw_config["caldera_attacks"][for_os] + res = self.raw_config.caldera_attacks.get(for_os) if res is None: return [] return res @@ -325,11 +320,9 @@ class ExperimentConfig(): if self.raw_config is None: raise ConfigurationError("Config file is empty") - if "sensors" not in self.raw_config: - return {} - if self.raw_config["sensors"] is None: # Better for unit tests that way. + if self.raw_config.sensor_conf is None: # Better for unit tests that way. return {} - if name in self.raw_config["sensors"]: - return self.raw_config["sensors"][name] + if name in self.raw_config.sensor_conf: + return self.raw_config.sensor_conf[name] return {} diff --git a/app/config_verifier.py b/app/config_verifier.py index 76b0cdd..60622a6 100644 --- a/app/config_verifier.py +++ b/app/config_verifier.py @@ -3,7 +3,7 @@ """ Pydantic verifier for config structure """ from pydantic.dataclasses import dataclass -from pydantic import conlist, BaseModel +from pydantic import conlist from typing import Literal, Optional, TypedDict, Union from enum import Enum @@ -13,26 +13,48 @@ class OSEnum(str, Enum): windows = "windows" +class VMControllerTypeEnum(str, Enum): + vagrant = "vagrant" + running_vm = "running_vm" + + @dataclass class CalderaConfig: apikey: str + def has_key(self, keyname): + if keyname in self.__dict__.keys(): + return True + return False + + +@dataclass +class VMController: + vm_type: VMControllerTypeEnum + vagrantfilepath: str + ip: Optional[str] = "" + + def has_key(self, keyname): + if keyname in self.__dict__.keys(): + return True + return False + @dataclass class Attacker: name: str - vm_controller: dict + vm_controller: VMController vm_name: str + nicknames: Optional[list[str]] machinepath: str os: OSEnum use_existing_machine: bool = False + def has_key(self, keyname): + if keyname in self.__dict__.keys(): + return True + return False -@dataclass -class VMController: - type: str - vagrantfilepath: str - ip: Optional[str] = "" @dataclass class Target: @@ -44,6 +66,7 @@ class Target: group: str machinepath: str sensors: Optional[list[str]] + nicknames: Optional[list[str]] active: bool = True use_existing_machine: bool = False playground: Optional[str] = None @@ -53,12 +76,22 @@ class Target: ssh_keyfile: Optional[str] = None vulnerabilities: list[str] = None + def has_key(self, keyname): + if keyname in self.__dict__.keys(): + return True + return False + @dataclass class AttackConfig: - nap_time: int - caldera_obfuscator: str - caldera_jitter: str + caldera_obfuscator: str = "plain-text" + caldera_jitter: str = "4/8" + nap_time: int = 5 + + def has_key(self, keyname): + if keyname in self.__dict__.keys(): + return True + return False @dataclass @@ -66,14 +99,29 @@ class AttackList: linux: Optional[list[str]] windows: Optional[list[str]] + def has_key(self, keyname): + if keyname in self.__dict__.keys(): + return True + return False + + def get(self, keyname, default=None): + if self.has_key(keyname): + return self.__dict__[keyname] + return default + @dataclass class Results: loot_dir: str + def has_key(self, keyname): + if keyname in self.__dict__.keys(): + return True + return False + @dataclass -class MainConfig(): +class MainConfig: caldera: CalderaConfig attackers: conlist(Attacker, min_items=1) targets: conlist(Target, min_items=1) @@ -83,8 +131,13 @@ class MainConfig(): results: Results # Free form configuration for plugins - attack_conf: dict - sensor_conf: dict + attack_conf: Optional[dict] + sensor_conf: Optional[dict] + + def has_key(self, keyname): + if keyname in self.__dict__.keys(): + return True + return False # TODO: Check for name duplication \ No newline at end of file diff --git a/doc/source/conf.py b/doc/source/conf.py index 5ea90ab..74535ef 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -41,6 +41,11 @@ extensions += ['sphinx_pyreverse'] extensions += ['sphinxcontrib.autoyaml'] autoyaml_level = 5 +# Pydantic plugin for sphinx. Another way to generate config documentation +# extensions += ['sphinx-pydantic'] +# This has bugs and is not properly maintained +# But would be awesome: https://sphinx-pydantic.readthedocs.io/en/latest/ + # Properly display command line behaviour https://pypi.org/project/sphinxcontrib.asciinema/ # https://github.com/divi255/sphinxcontrib.asciinema/issues/11 extensions += ['sphinxcontrib.asciinema'] diff --git a/pydantic_test.py b/pydantic_test.py index eabd546..5f7bf74 100755 --- a/pydantic_test.py +++ b/pydantic_test.py @@ -25,3 +25,5 @@ if __name__ == "__main__": r = load(arguments.filename) print(r) print(r.caldera.apikey) + # print(r.blarg) + print(dir(r.__dict__)) diff --git a/requirements.txt b/requirements.txt index 0fe9dd6..ddf586e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,13 +11,14 @@ pymetasploit3==1.0.3 pylint==2.9.3 flask==2.0.2 pydantic==1.8.2 +dotmap==1.3.25 # Sphinx stuff sphinx-argparse==0.2.5 sphinxcontrib-autoyaml==0.6.1 sphinx-pyreverse==0.0.13 sphinxcontrib.asciinema==0.3.2 -sphinx-pydantic +# sphinx-pydantic # This one has issues that must be fixed upstream first # Mypy stuff mypy==0.910 diff --git a/tests/data/attacker_has_empty_nicknames.yaml b/tests/data/attacker_has_empty_nicknames.yaml index 4585861..7d4952d 100644 --- a/tests/data/attacker_has_empty_nicknames.yaml +++ b/tests/data/attacker_has_empty_nicknames.yaml @@ -11,14 +11,14 @@ caldera: attackers: ### # Configuration for the first attacker. One should normally be enough - attacker: + - name: attacker ### # Defining VM controller settings for this machine vm_controller: ### # Type of the VM controller, Options are "vagrant" - type: vagrant + vm_type: vagrant ### # # path where the vagrantfile is in vagrantfilepath: systems @@ -49,9 +49,9 @@ attackers: targets: ### # Specific target - target1: + - name: target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target1 @@ -74,13 +74,20 @@ targets: # Do not destroy/create the machine: Set this to "yes". use_existing_machine: yes - target2: + ### Sensors to run on this machine + sensors: + # - windows_osquery + + - name: target2 #root: systems/target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target2 + + nicknames: + os: windows paw: target2w group: red @@ -101,6 +108,10 @@ targets: # For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required. ssh_keyfile: id_rsa.3 + ### Sensors to run on this machine + sensors: + # - windows_osquery + ### # General attack config attacks: @@ -108,6 +119,15 @@ attacks: # configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match nap_time: 5 + ### + # The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators: + # plain-text, base64, base64jumble, caesar, base64noPadding, steganography + caldera_obfuscator: plain-text + + ### + # Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8 + caldera_jitter: 4/8 + ### # A list of caldera attacks to run against the targets. caldera_attacks: @@ -160,7 +180,7 @@ results: ### # General sensor config config -sensors: +sensor_conf: ### # Windows sensor plugin configuration windows_sensor: diff --git a/tests/data/attacks_half.yaml b/tests/data/attacks_half.yaml index 1676f2b..9af3dd7 100644 --- a/tests/data/attacks_half.yaml +++ b/tests/data/attacks_half.yaml @@ -11,14 +11,14 @@ caldera: attackers: ### # Configuration for the first attacker. One should normally be enough - attacker: + - name: attacker ### # Defining VM controller settings for this machine vm_controller: ### # Type of the VM controller, Options are "vagrant" - type: vagrant + vm_type: vagrant ### # # path where the vagrantfile is in vagrantfilepath: systems @@ -27,6 +27,8 @@ attackers: # Name of machine in Vagrantfile vm_name: attacker + nicknames: + ### # machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path # and will be mounted internally as /vagrant/ @@ -46,12 +48,13 @@ attackers: targets: ### # Specific target - target1: + - name: target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target1 + nicknames: os: linux ### # Targets need a unique PAW name for caldera @@ -64,13 +67,16 @@ targets: # Do not destroy/create the machine: Set this to "yes". use_existing_machine: yes - target2: + sensors: + + - name: target2 #root: systems/target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target2 + nicknames: os: windows paw: target2w group: red @@ -91,6 +97,8 @@ targets: # For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required. ssh_keyfile: id_rsa.3 + sensors: + ### # A list of caldera attacks to run against the targets. caldera_attacks: @@ -105,7 +113,24 @@ caldera_attacks: #- "foo" #- "bar" +sensor_conf: + + +### +# General attack config +attacks: + ### + # configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match + nap_time: 5 + ### + # The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators: + # plain-text, base64, base64jumble, caesar, base64noPadding, steganography + caldera_obfuscator: plain-text + + ### + # Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8 + caldera_jitter: 4/8 ## A bug in production was triggered by this half config. Adding a unit test ### diff --git a/tests/data/attacks_perfect.yaml b/tests/data/attacks_perfect.yaml index bb3978c..b45e636 100644 --- a/tests/data/attacks_perfect.yaml +++ b/tests/data/attacks_perfect.yaml @@ -11,14 +11,14 @@ caldera: attackers: ### # Configuration for the first attacker. One should normally be enough - attacker: + - name: attacker ### # Defining VM controller settings for this machine vm_controller: ### # Type of the VM controller, Options are "vagrant" - type: vagrant + vm_type: vagrant ### # # path where the vagrantfile is in vagrantfilepath: systems @@ -27,6 +27,8 @@ attackers: # Name of machine in Vagrantfile vm_name: attacker + nicknames: + ### # machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path # and will be mounted internally as /vagrant/ @@ -46,12 +48,15 @@ attackers: targets: ### # Specific target - target1: + - name: target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target1 + + nicknames: + os: linux ### # Targets need a unique PAW name for caldera @@ -64,13 +69,17 @@ targets: # Do not destroy/create the machine: Set this to "yes". use_existing_machine: yes - target2: + sensors: + + - name: target2 #root: systems/target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target2 + nicknames: + os: windows paw: target2w group: red @@ -91,17 +100,8 @@ targets: # For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required. ssh_keyfile: id_rsa.3 -### -# Configuration for caldera -caldera_conf: - ### - # The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators: - # plain-text, base64, base64jumble, caesar, base64noPadding, steganography - obfuscator: foo-bar + sensors: - ### - # Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8 - jitter: 08/15 ### # A list of caldera attacks to run against the targets. @@ -131,6 +131,16 @@ plugin_based_attacks: - medusa - skylla +attacks: + ### + # The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators: + # plain-text, base64, base64jumble, caesar, base64noPadding, steganography + caldera_obfuscator: foo-bar + + ### + # Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8 + caldera_jitter: 08/15 + ### # Configuration for the plugin based attack tools attack_conf: @@ -150,6 +160,8 @@ attack_conf: # A file containing potential passwords pwdfile: passwords.txt +sensor_conf: + ### # Settings for the results being harvested results: diff --git a/tests/data/basic.yaml b/tests/data/basic.yaml index f292ebb..32e26bf 100644 --- a/tests/data/basic.yaml +++ b/tests/data/basic.yaml @@ -11,14 +11,14 @@ caldera: attackers: ### # Configuration for the first attacker. One should normally be enough - attacker: + - name: attacker ### # Defining VM controller settings for this machine vm_controller: ### # Type of the VM controller, Options are "vagrant" - type: vagrant + vm_type: vagrant ### # # path where the vagrantfile is in vagrantfilepath: systems @@ -27,6 +27,8 @@ attackers: # Name of machine in Vagrantfile vm_name: attacker + nicknames: + ### # machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path # and will be mounted internally as /vagrant/ @@ -46,12 +48,19 @@ attackers: targets: ### # Specific target - target1: + - name: target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target1 + + # Used for tests + nicknames: + # - 1 + # - 2 + # - 3 + os: linux ### # Targets need a unique PAW name for caldera @@ -64,13 +73,23 @@ targets: # Do not destroy/create the machine: Set this to "yes". use_existing_machine: yes - target2: + ### Sensors to run on this machine + sensors: + # - windows_osquery + + - name: target2 #root: systems/target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target2 + # Used for tests + nicknames: + - a + - b + - c + os: windows paw: target2w group: red @@ -91,6 +110,10 @@ targets: # For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required. ssh_keyfile: id_rsa.3 + ### Sensors to run on this machine + sensors: + # - windows_osquery + ### # General attack config attacks: @@ -98,6 +121,15 @@ attacks: # configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match nap_time: 5 + ### + # The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators: + # plain-text, base64, base64jumble, caesar, base64noPadding, steganography + caldera_obfuscator: plain-text + + ### + # Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8 + caldera_jitter: 4/8 + ### # A list of caldera attacks to run against the targets. caldera_attacks: @@ -150,7 +182,7 @@ results: ### # General sensor config config -sensors: +sensor_conf: ### # Windows sensor plugin configuration windows_sensor: diff --git a/tests/data/basic_empty_sensor.yaml b/tests/data/basic_empty_sensor.yaml index 27bc8ca..b71d938 100644 --- a/tests/data/basic_empty_sensor.yaml +++ b/tests/data/basic_empty_sensor.yaml @@ -11,14 +11,14 @@ caldera: attackers: ### # Configuration for the first attacker. One should normally be enough - attacker: + - name: attacker ### # Defining VM controller settings for this machine vm_controller: ### # Type of the VM controller, Options are "vagrant" - type: vagrant + vm_type: vagrant ### # # path where the vagrantfile is in vagrantfilepath: systems @@ -27,6 +27,8 @@ attackers: # Name of machine in Vagrantfile vm_name: attacker + nicknames: + ### # machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path # and will be mounted internally as /vagrant/ @@ -46,12 +48,13 @@ attackers: targets: ### # Specific target - target1: + - name: target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target1 + nicknames: os: linux ### # Targets need a unique PAW name for caldera @@ -64,13 +67,16 @@ targets: # Do not destroy/create the machine: Set this to "yes". use_existing_machine: yes - target2: + sensors: + + - name: target2 #root: systems/target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target2 + nicknames: os: windows paw: target2w group: red @@ -91,6 +97,8 @@ targets: # For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required. ssh_keyfile: id_rsa.3 + sensors: + ### # General attack config attacks: @@ -150,6 +158,4 @@ results: ### # General sensor config config -sensors: - -foo: \ No newline at end of file +sensor_conf: diff --git a/tests/data/partial.yaml b/tests/data/partial.yaml index 5dd41a7..d6013a1 100644 --- a/tests/data/partial.yaml +++ b/tests/data/partial.yaml @@ -11,14 +11,14 @@ caldera: attackers: ### # Configuration for the first attacker. One should normally be enough - attacker: + - name: attacker ### # Defining VM controller settings for this machine vm_controller: ### # Type of the VM controller, Options are "vagrant" - type: vagrant + vm_type: vagrant ### # # path where the vagrantfile is in vagrantfilepath: systems @@ -27,6 +27,8 @@ attackers: # Name of machine in Vagrantfile vm_name: attacker + nicknames: + ### # machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path # and will be mounted internally as /vagrant/ @@ -46,12 +48,13 @@ attackers: targets: ### # Specific target - target1: + - name: target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target1 + nicknames: os: linux ### # Targets need a unique PAW name for caldera @@ -64,13 +67,17 @@ targets: # Do not destroy/create the machine: Set this to "yes". use_existing_machine: yes - target2: + sensors: + + - name: target2 #root: systems/target1 vm_controller: - type: vagrant + vm_type: vagrant vagrantfilepath: systems vm_name: target2 + nicknames: + os: windows paw: target2w group: red @@ -91,6 +98,8 @@ targets: # For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required. ssh_keyfile: id_rsa.3 + sensors: + ### # General attack config attacks: @@ -99,9 +108,7 @@ attacks: nap_time: 5 -## Broken caldera conf -caldera_conf: - foo: bar +sensor_conf: ### # A list of caldera attacks to run against the targets. diff --git a/tests/test_config.py b/tests/test_config.py index 9d56e0f..0c52171 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -6,6 +6,7 @@ import unittest # import os from app.config import ExperimentConfig, MachineConfig from app.exceptions import ConfigurationError +from dotmap import DotMap # https://docs.python.org/3/library/unittest.html @@ -21,481 +22,429 @@ class TestMachineConfig(unittest.TestCase): def test_basic_init(self): """ The init is basic and working """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target1"}) + "vm_name": "target1"})) self.assertEqual(mc.raw_config["root"], "systems/attacker1") - self.assertEqual(mc.raw_config["vm_controller"]["type"], "vagrant") - - def test_missing_vm_name(self): - """ The vm name is missing """ - - with self.assertRaises(ConfigurationError): - MachineConfig({"root": "systems/attacker1", - "os": "linux", - "vm_controller": { - "type": "vagrant", - "vagrantfilepath": "systems", - }}) + self.assertEqual(mc.raw_config.vm_controller.vm_type, "vagrant") def test_use_existing_machine_is_true(self): """ Testing use_existing:machine setting """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": True}) + "use_existing_machine": True})) self.assertEqual(mc.use_existing_machine(), True) def test_use_existing_machine_is_false(self): """ Testing use_existing:machine setting """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.use_existing_machine(), False) def test_use_existing_machine_is_default(self): """ Testing use_existing:machine setting """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target1"}) + "vm_name": "target1"})) self.assertEqual(mc.use_existing_machine(), False) def test_windows_is_valid_os(self): """ Testing if windows is valid os """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "windows", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target1"}) + "vm_name": "target1"})) self.assertEqual(mc.os(), "windows") def test_windows_is_valid_os_casefix(self): """ Testing if windows is valid os - using lowercase fix""" - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "WINDOWS", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target1"}) + "vm_name": "target1"})) self.assertEqual(mc.os(), "windows") def test_linux_is_valid_os(self): """ Testing if windows is valid os """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target1"}) + "vm_name": "target1"})) self.assertEqual(mc.os(), "linux") - def test_missing_os(self): - """ The os is missing """ - - with self.assertRaises(ConfigurationError): - MachineConfig({"root": "systems/attacker1", - "vm_controller": { - "type": "vagrant", - "vagrantfilepath": "systems", - }, - "vm_name": "target1"}) - - def test_wrong_os(self): - """ The os is wrong """ - - with self.assertRaises(ConfigurationError): - MachineConfig({"root": "systems/attacker1", - "os": "BROKEN", - "vm_controller": { - "type": "vagrant", - "vagrantfilepath": "systems", - }, - "vm_name": "target1"}) - def test_vagrant_is_valid_vmcontroller(self): """ Testing if vagrant is valid vmcontroller """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target1"}) + "vm_name": "target1"})) self.assertEqual(mc.vmcontroller(), "vagrant") def test_vagrant_is_valid_vmcontroller_casefix(self): """ Testing if vagrant is valid vmcontroller case fixxed""" - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "VAGRANT", + "vm_type": "VAGRANT", "vagrantfilepath": "systems", }, - "vm_name": "target1"}) + "vm_name": "target1"})) self.assertEqual(mc.vmcontroller(), "vagrant") - def test_invalid_vmcontroller(self): - """ Testing if vagrant is valid vmcontroller case fixxed""" - with self.assertRaises(ConfigurationError): - MachineConfig({"root": "systems/attacker1", - "os": "linux", - "vm_controller": { - "type": "BROKEN", - "vagrantfilepath": "systems", - }, - "vm_name": "target1"}) - - def test_missing_vmcontroller_2(self): - """ Testing if vagrant is valid vmcontroller case fixxed""" - with self.assertRaises(ConfigurationError): - MachineConfig({"root": "systems/attacker1", - "os": "linux", - "vm_name": "target1"}) - def test_vagrant_is_valid_vmip(self): """ Testing if vagrant is valid ip/url """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "ip": "kali", "vagrantfilepath": "systems", }, - "vm_name": "target1"}) + "vm_name": "target1"})) self.assertEqual(mc.vm_ip(), "kali") def test_missing_vmip(self): """ Testing if missing vm ip is handled""" vm_name = "target1" - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": vm_name}) + "vm_name": vm_name})) self.assertEqual(mc.vm_ip(), vm_name) def test_machinepath(self): """ Testing machinepath setting """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", "use_existing_machine": False, - "machinepath": "foo"}) + "machinepath": "foo"})) self.assertEqual(mc.machinepath(), "foo") def test_machinepath_fallback(self): """ Testing machinepath setting fallback to vmname""" - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.machinepath(), "target1") def test_paw(self): """ Testing for caldera paw """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "paw": "Bar", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.caldera_paw(), "Bar") def test_paw_fallback(self): """ Testing for caldera paw fallback """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.caldera_paw(), None) def test_group(self): """ Testing for caldera group """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "group": "Bar", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.caldera_group(), "Bar") def test_group_fallback(self): """ Testing for caldera group fallback """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.caldera_group(), None) def test_ssh_keyfile(self): """ Testing keyfile config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "ssh_keyfile": "Bar", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.ssh_keyfile(), "Bar") def test_ssh_keyfile_default(self): """ Testing keyfile config default """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.ssh_keyfile(), None) def test_ssh_user(self): """ Testing ssh user config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "ssh_user": "Bob", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.ssh_user(), "Bob") def test_ssh_user_default(self): """ Testing ssh user default config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.ssh_user(), "vagrant") def test_ssh_password(self): """ Testing ssh password config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "ssh_user": "Bob", "ssh_password": "Ross", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.ssh_password(), "Ross") def test_ssh_password_default(self): """ Testing ssh password default config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertIsNone(mc.ssh_password()) def test_halt_needs_force_default(self): """ Testing 'halt needs force' default config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.halt_needs_force(), False) def test_halt_needs_force(self): """ Testing 'halt needs force' config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.halt_needs_force(), True) def test_vagrantfilepath(self): """ Testing vagrantfilepath config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.vagrantfilepath(), "systems") def test_vagrantfilepath_missing(self): """ Testing missing vagrantfilepath config """ with self.assertRaises(ConfigurationError): - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False}))) mc.vagrantfilepath() def test_sensors_empty(self): """ Testing empty sensor config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.sensors(), []) def test_sensors_set(self): """ Testing empty sensor config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target1", "use_existing_machine": False, - "sensors": ["linux_idp", "test_sensor"]}) + "sensors": ["linux_idp", "test_sensor"]})) self.assertEqual(mc.sensors(), ["linux_idp", "test_sensor"]) def test_vulnerabilities_empty(self): """ Testing empty vulnerabilities config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target1", - "use_existing_machine": False}) + "use_existing_machine": False})) self.assertEqual(mc.vulnerabilities(), []) def test_vulnerabilities_set(self): """ Testing empty vulnerabilities config """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target1", "use_existing_machine": False, - "vulnerabilities": ["PEBKAC", "USER"]}) + "vulnerabilities": ["PEBKAC", "USER"]})) self.assertEqual(mc.vulnerabilities(), ["PEBKAC", "USER"]) def test_active_not_set(self): """ machine active not set """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target1", "use_existing_machine": False, - "sensors": ["linux_idp", "test_sensor"]}) + "sensors": ["linux_idp", "test_sensor"]})) self.assertEqual(mc.is_active(), True) def test_active_is_false(self): """ machine active is set to false """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target1", "use_existing_machine": False, "active": False, - "sensors": ["linux_idp", "test_sensor"]}) + "sensors": ["linux_idp", "test_sensor"]})) self.assertEqual(mc.is_active(), False) def test_active_is_true(self): """ machine active is set to true """ - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "halt_needs_force": True, "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target1", "use_existing_machine": False, "active": True, - "sensors": ["linux_idp", "test_sensor"]}) + "sensors": ["linux_idp", "test_sensor"]})) self.assertEqual(mc.is_active(), True) @@ -509,7 +458,7 @@ class TestExperimentConfig(unittest.TestCase): """ Existing, basic config file, testing the values are loaded properly """ ex = ExperimentConfig("tests/data/basic.yaml") - self.assertEqual(ex.raw_config["caldera"]["apikey"], "ADMIN123") + self.assertEqual(ex.raw_config.caldera.apikey, "ADMIN123") self.assertEqual(ex.caldera_apikey(), "ADMIN123") def test_broken_apikey(self): @@ -528,22 +477,6 @@ class TestExperimentConfig(unittest.TestCase): with self.assertRaises(ConfigurationError): e.loot_dir() - def test_broken_attack_conf(self): - """ Test with partially empty config file """ - - e = ExperimentConfig("tests/data/basic.yaml") - e.raw_config = None - with self.assertRaises(ConfigurationError): - e.attack_conf("hydra") - - def test_broken_attack_conf_2(self): - """ Test with partially empty config file """ - - e = ExperimentConfig("tests/data/basic.yaml") - e.raw_config["attack_conf"] = None - with self.assertRaises(ConfigurationError): - e.attack_conf("hydra") - def test_broken_caldera_obfuscator_conf(self): """ Test with partially empty config file """ @@ -598,53 +531,12 @@ class TestExperimentConfig(unittest.TestCase): ex = ExperimentConfig("tests/data/basic.yaml") self.assertEqual(ex.loot_dir(), "loot") - def test_missing_loot_dir(self): - """ Test with missing loot dir """ - - with self.assertRaises(ConfigurationError): - ExperimentConfig("tests/data/basic_loot_missing.yaml") - def test_empty_config(self): """ Test with empty config file """ with self.assertRaises(ConfigurationError): ExperimentConfig("tests/data/empty.yaml") - def test_empty_targets(self): - """ Test with empty targets in file """ - - with self.assertRaises(ConfigurationError): - ExperimentConfig("tests/data/empty_targets.yaml") - - def test_empty_attackers(self): - """ Test with empty attackers in file """ - - with self.assertRaises(ConfigurationError): - ExperimentConfig("tests/data/empty_attackers.yaml") - - def test_missing_results(self): - """ Test with missing results """ - - with self.assertRaises(ConfigurationError): - ExperimentConfig("tests/data/basic_results_missing.yaml") - - def test_basic_loading_targets_read(self): - """ Targets in config: can be found """ - ex = ExperimentConfig("tests/data/basic.yaml") - self.assertEqual(len(ex._targets), 2) - self.assertEqual(ex._targets[0].raw_config["vm_name"], "target1") - self.assertEqual(ex._targets[0].vmname(), "target1") - self.assertEqual(ex.targets()[0].vmname(), "target1") - - def test_basic_loading_attacker_read(self): - """ Attackers in config: can be found """ - ex = ExperimentConfig("tests/data/basic.yaml") - self.assertEqual(len(ex._targets), 2) - self.assertEqual(ex._attackers[0].raw_config["vm_name"], "attacker") - self.assertEqual(ex._attackers[0].vmname(), "attacker") - self.assertEqual(ex.attackers()[0].vmname(), "attacker") - self.assertEqual(ex.attacker(0).vmname(), "attacker") - def test_nicknames_missing(self): """ Test when the machine nicknames are non existing """ ex = ExperimentConfig("tests/data/basic.yaml") @@ -658,7 +550,7 @@ class TestExperimentConfig(unittest.TestCase): def test_nicknames_present(self): """ Test when the machine nicknames are there """ ex = ExperimentConfig("tests/data/attacker_has_empty_nicknames.yaml") - self.assertEqual(ex._targets[0].get_nicknames(), [1, 2, 3]) + self.assertEqual(ex._targets[0].get_nicknames(), ["1", "2", "3"]) def test_missing_attack_config(self): """ Getting attack config for a specific attack. Attack missing """ @@ -674,14 +566,6 @@ class TestExperimentConfig(unittest.TestCase): data = ex.attack_conf("hydra") self.assertEqual(data["userfile"], "users.txt") - def test_attack_config_missing_attack_data(self): - """ Getting attack config for a specific attack: Missing """ - - ex = ExperimentConfig("tests/data/attacks_missing.yaml") - - data = ex.attack_conf("missing") - self.assertEqual(data, {}) - def test_missing_caldera_config_obfuscator(self): """ A config file with no caldera config at all """ @@ -718,27 +602,6 @@ class TestExperimentConfig(unittest.TestCase): ex = ExperimentConfig("tests/data/attacks_perfect.yaml") self.assertEqual(ex.get_caldera_jitter(), "08/15") - def test_nap_time(self): - """ nap time is set """ - - ex = ExperimentConfig("tests/data/basic.yaml") - - self.assertEqual(ex.get_nap_time(), 5) - - def test_nap_time_not_set(self): - """ nap time is not set """ - - ex = ExperimentConfig("tests/data/nap_time_missing.yaml") - - self.assertEqual(ex.get_nap_time(), 0) - - def test_kali_attacks_missing(self): - """ kali attacks entry fully missing from config """ - - ex = ExperimentConfig("tests/data/attacks_missing.yaml") - - self.assertEqual(ex.get_plugin_based_attacks("linux"), []) - def test_kali_attacks_empty(self): """ zero entries in kali attacks list """ @@ -760,13 +623,6 @@ class TestExperimentConfig(unittest.TestCase): self.assertEqual(ex.get_plugin_based_attacks("windows"), ["hydra", "medusa", "skylla"]) - def test_caldera_attacks_missing(self): - """ caldera attacks entry fully missing from config """ - - ex = ExperimentConfig("tests/data/attacks_missing.yaml") - - self.assertEqual(ex.get_caldera_attacks("linux"), []) - def test_kali_attacks_half(self): """ kali attacks entry partially missing from config """ @@ -818,13 +674,6 @@ class TestExperimentConfig(unittest.TestCase): self.assertEqual(ex.get_sensor_config("missing_windows_sensor"), {}) - def test_basic_sensor_entry_missing(self): - """ Test global configuration for a specific and missing sensor entry""" - - ex = ExperimentConfig("tests/data/attacks_missing.yaml") - - self.assertEqual(ex.get_sensor_config("windows_sensor"), {}) - def test_basic_sensor_entry_empty(self): """ Test global configuration for a specific and empty sensor entry""" diff --git a/tests/test_machinecontrol.py b/tests/test_machinecontrol.py index d215e42..e7d9cab 100644 --- a/tests/test_machinecontrol.py +++ b/tests/test_machinecontrol.py @@ -1,5 +1,6 @@ import unittest import os +from dotmap import DotMap from app.machinecontrol import Machine from app.exceptions import ConfigurationError from app.config import MachineConfig @@ -15,167 +16,137 @@ class TestMachineControl(unittest.TestCase): self.attack_logger = AttackLog(0) def test_get_os_linux_machine(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target3"}, self.attack_logger) + "vm_name": "target3"}), self.attack_logger) self.assertEqual(m.get_os(), "linux") def test_get_os_linux_machine_with_config_class(self): - mc = MachineConfig({"root": "systems/attacker1", + mc = MachineConfig(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target3"}) + "vm_name": "target3"})) m = Machine(mc, self.attack_logger) self.assertEqual(m.get_os(), "linux") - def test_get_os_missing(self): - with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", - "vm_controller": { - "type": "vagrant", - "vagrantfilepath": "systems", - }, - "vm_name": "target3" - }, self.attack_logger) - - def test_get_os_not_supported(self): - with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", - "os": "nintendo_switch", - "vm_controller": { - "type": "vagrant", - "vagrantfilepath": "systems", - }, - "vm_name": "target3"}, self.attack_logger) - def test_get_paw_good(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "paw": "testme", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target3"}, self.attack_logger) + "vm_name": "target3"}), self.attack_logger) self.assertEqual(m.get_paw(), "testme") def test_get_paw_missing(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target3" - }, self.attack_logger) + }), self.attack_logger) self.assertEqual(m.get_paw(), None) def test_get_group_good(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "group": "testme", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, - "vm_name": "target3"}, self.attack_logger) + "vm_name": "target3"}), self.attack_logger) self.assertEqual(m.get_group(), "testme") def test_get_group_missing(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target3" - }, self.attack_logger) + }), self.attack_logger) self.assertEqual(m.get_group(), None) def test_vagrantfilepath_missing(self): with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", + Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", }, "vm_name": "target3" - }, self.attack_logger) + }), self.attack_logger) def test_vagrantfile_missing(self): with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", + Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "non_existing", }, "vm_name": "target3" - }, self.attack_logger) + }), self.attack_logger) def test_vagrantfile_existing(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target3" - }, self.attack_logger) + }), self.attack_logger) self.assertIsNotNone(m) - def test_name_missing(self): - with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", - "os": "linux", - "vm_controller": { - "type": "vagrant", - "vagrantfilepath": "systems", - }, - }, self.attack_logger) - # test: auto generated, dir missing def test_auto_generated_machinepath_with_path_missing(self): with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", + Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "missing" - }, self.attack_logger) + }), self.attack_logger) # test manual config, dir missing def test_configured_machinepath_with_path_missing(self): with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", + Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target3", "machinepath": "missing" - }, self.attack_logger) + }), self.attack_logger) # test auto generated, dir there (external/internal dirs must work !) def test_auto_generated_machinepath_with_good_config(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target3" - }, self.attack_logger) + }), self.attack_logger) vagrantfilepath = os.path.abspath("systems") ext = os.path.join(vagrantfilepath, "target3") internal = os.path.join("/vagrant/", "target3") @@ -185,15 +156,15 @@ class TestMachineControl(unittest.TestCase): # test: manual config, dir there (external/internal dirs must work !) def test_configured_machinepath_with_good_config(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "missing", "machinepath": "target3" - }, self.attack_logger) + }), self.attack_logger) vagrantfilepath = os.path.abspath("systems") ext = os.path.join(vagrantfilepath, "target3") internal = os.path.join("/vagrant/", "target3") @@ -204,36 +175,23 @@ class TestMachineControl(unittest.TestCase): # vm_controller missing def test_configured_vm_controller_missing(self): with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", - "os": "linux", - "vm_name": "missing", - "machinepath": "target3" - }, self.attack_logger) - - # vm_controller wrong - def test_configured_vm_controller_wrong_type(self): - with self.assertRaises(ConfigurationError): - Machine({"root": "systems/attacker1", + Machine(DotMap({"root": "systems/attacker1", "os": "linux", - "vm_controller": { - "type": "wrong_controller", - "vagrantfilepath": "systems", - }, "vm_name": "missing", "machinepath": "target3" - }, self.attack_logger) + }), self.attack_logger) # Create caldera start command and verify it def test_get_linux_caldera_start_cmd(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "linux", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target3", "group": "testgroup", - "paw": "testpaw"}, self.attack_logger) + "paw": "testpaw"}), self.attack_logger) m.set_caldera_server("http://www.test.test") with patch.object(m.vm_manager, "get_playground", return_value="/vagrant/target3"): cmd = m.create_start_caldera_client_cmd() @@ -241,16 +199,16 @@ class TestMachineControl(unittest.TestCase): # Create caldera start command and verify it (windows) def test_get_windows_caldera_start_cmd(self): - m = Machine({"root": "systems/attacker1", + m = Machine(DotMap({"root": "systems/attacker1", "os": "windows", "vm_controller": { - "type": "vagrant", + "vm_type": "vagrant", "vagrantfilepath": "systems", }, "vm_name": "target3", "group": "testgroup", "paw": "testpaw", - "machinepath": "target2w"}, self.attack_logger) + "machinepath": "target2w"}), self.attack_logger) m.set_caldera_server("www.test.test") cmd = m.create_start_caldera_client_cmd() self.maxDiff = None diff --git a/tox.ini b/tox.ini index 09b0859..8558e0c 100644 --- a/tox.ini +++ b/tox.ini @@ -30,7 +30,7 @@ exclude = max-complexity = 10 [testenv] -deps = -rrequirements.txt +deps = -r requirements.txt flake8 safety bandit