Unit tests work. Still some PEP errors and no integration checks

pull/14/head
Thorsten Sick 3 years ago
parent 5dfd9d6a12
commit 32af5cf956

@ -4,6 +4,7 @@
from typing import Optional
import yaml
from app.config_verifier import MainConfig
from app.exceptions import ConfigurationError
@ -18,7 +19,7 @@ from app.exceptions import ConfigurationError
class MachineConfig():
""" Sub config for a specific machine"""
def __init__(self, machinedata: dict):
def __init__(self, machinedata):
""" Init machine control config
@param machinedata: dict containing machine data
@ -27,53 +28,46 @@ class MachineConfig():
raise ConfigurationError
self.raw_config = machinedata
self.verify()
def verify(self):
""" Verify essential data is present """
try:
self.vmname()
operating_system = self.os()
vmcontroller = self.vmcontroller()
except KeyError as exception:
raise ConfigurationError from exception
if operating_system not in ["linux", "windows"]:
raise ConfigurationError
# TODO: Verify with plugins
if vmcontroller not in ["vagrant", "running_vm"]:
raise ConfigurationError
def vmname(self) -> str:
""" Returns the vmname """
return self.raw_config["vm_name"]
return self.raw_config.vm_name
def get_nicknames(self) -> list[str]:
""" Gets the nicknames """
if "nicknames" in self.raw_config:
return self.raw_config["nicknames"] or []
if self.raw_config.has_key("nicknames"):
return self.raw_config.nicknames or []
return []
def vmcontroller(self) -> str:
""" Returns the vm controller. lowercase """
return self.raw_config["vm_controller"]["type"].lower()
if not self.raw_config.has_key("vm_controller"):
raise ConfigurationError
return self.raw_config.vm_controller.vm_type.lower()
def vm_ip(self) -> str:
""" Return the configured ip/domain name (whatever is needed to reach the machine). Returns None if missing """
if not self.raw_config.has_key("vm_controller"):
return self.vmname()
if not self.raw_config.vm_controller.has_key("ip"):
return self.vmname()
try:
return self.raw_config["vm_controller"]["ip"]
return self.raw_config.vm_controller.ip
except KeyError:
return self.vmname()
def os(self) -> str: # pylint: disable=invalid-name
""" returns the os. lowercase """
return self.raw_config["os"].lower()
return self.raw_config.os.lower()
def use_existing_machine(self) -> bool:
""" Returns if we want to use the existing machine """
@ -83,7 +77,10 @@ class MachineConfig():
def machinepath(self) -> str:
""" Returns the machine path. If not configured it will fall back to the vm_name """
return self.raw_config.get("machinepath", self.vmname())
if self.raw_config.has_key("machinepath"):
return self.raw_config.machinepath
return self.vmname()
def get_playground(self) -> Optional[str]:
""" Returns the machine specific playground where all the implants and tools will be installed """
@ -169,22 +166,25 @@ class ExperimentConfig():
"""
with open(configfile) as fh:
self.raw_config = yaml.safe_load(fh)
data = yaml.safe_load(fh)
if self.raw_config is None:
if data is None:
raise ConfigurationError("Config file is empty")
self.raw_config = MainConfig(**data)
# Process targets
if self.raw_config["targets"] is None:
if self.raw_config.targets is None:
raise ConfigurationError("Config file does not specify targets")
for target in self.raw_config["targets"]:
self._targets.append(MachineConfig(self.raw_config["targets"][target]))
for target in self.raw_config.targets:
self._targets.append(MachineConfig(target))
# Process attackers
if self.raw_config["attackers"] is None:
if self.raw_config.attackers is None:
raise ConfigurationError("Config file does not specify attackers")
for attacker in self.raw_config["attackers"]:
self._attackers.append(MachineConfig(self.raw_config["attackers"][attacker]))
for attacker in self.raw_config.attackers:
self._attackers.append(MachineConfig(attacker))
def targets(self) -> list[MachineConfig]:
""" Return config for targets as MachineConfig objects """
@ -210,7 +210,7 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
return self.raw_config["caldera"]["apikey"]
return self.raw_config.caldera.apikey
def loot_dir(self) -> str:
""" Returns the loot dir """
@ -218,10 +218,8 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "results" not in self.raw_config or self.raw_config["results"] is None:
raise ConfigurationError("results missing in configuration")
try:
res = self.raw_config["results"]["loot_dir"]
res = self.raw_config.results.loot_dir
except KeyError as error:
raise ConfigurationError("results/loot_dir not properly set in configuration") from error
return res
@ -234,10 +232,9 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if self.raw_config["attack_conf"] is None:
raise ConfigurationError("Config file missing attacks")
try:
res = self.raw_config["attack_conf"][attack]
res = self.raw_config.attack_conf[attack]
except KeyError:
res = {}
if res is None:
@ -252,10 +249,9 @@ class ExperimentConfig():
raise ConfigurationError("Config file is empty")
try:
res = self.raw_config["caldera_conf"]["obfuscator"]
return self.raw_config.attacks.caldera_obfuscator
except KeyError:
return "plain-text"
return res
def get_caldera_jitter(self) -> str:
""" Get the caldera configuration. In this case: Jitter. Will default to 4/8 """
@ -264,10 +260,9 @@ class ExperimentConfig():
raise ConfigurationError("Config file is empty")
try:
res = self.raw_config["caldera_conf"]["jitter"]
return self.raw_config.attacks.caldera_jitter
except KeyError:
return "4/8"
return res
def get_plugin_based_attacks(self, for_os: str) -> list[str]:
""" Get the configured kali attacks to run for a specific OS
@ -278,11 +273,11 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "plugin_based_attacks" not in self.raw_config:
if not self.raw_config.has_key("plugin_based_attacks"):
return []
if for_os not in self.raw_config["plugin_based_attacks"]:
if not self.raw_config.plugin_based_attacks.has_key(for_os):
return []
res = self.raw_config["plugin_based_attacks"][for_os]
res = self.raw_config.plugin_based_attacks.get(for_os)
if res is None:
return []
return res
@ -296,11 +291,11 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "caldera_attacks" not in self.raw_config:
if not self.raw_config.has_key("caldera_attacks"):
return []
if for_os not in self.raw_config["caldera_attacks"]:
if not self.raw_config.caldera_attacks.has_key(for_os):
return []
res = self.raw_config["caldera_attacks"][for_os]
res = self.raw_config.caldera_attacks.get(for_os)
if res is None:
return []
return res
@ -325,11 +320,9 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "sensors" not in self.raw_config:
return {}
if self.raw_config["sensors"] is None: # Better for unit tests that way.
if self.raw_config.sensor_conf is None: # Better for unit tests that way.
return {}
if name in self.raw_config["sensors"]:
return self.raw_config["sensors"][name]
if name in self.raw_config.sensor_conf:
return self.raw_config.sensor_conf[name]
return {}

@ -3,7 +3,7 @@
""" Pydantic verifier for config structure """
from pydantic.dataclasses import dataclass
from pydantic import conlist, BaseModel
from pydantic import conlist
from typing import Literal, Optional, TypedDict, Union
from enum import Enum
@ -13,26 +13,48 @@ class OSEnum(str, Enum):
windows = "windows"
class VMControllerTypeEnum(str, Enum):
vagrant = "vagrant"
running_vm = "running_vm"
@dataclass
class CalderaConfig:
apikey: str
def has_key(self, keyname):
if keyname in self.__dict__.keys():
return True
return False
@dataclass
class VMController:
vm_type: VMControllerTypeEnum
vagrantfilepath: str
ip: Optional[str] = ""
def has_key(self, keyname):
if keyname in self.__dict__.keys():
return True
return False
@dataclass
class Attacker:
name: str
vm_controller: dict
vm_controller: VMController
vm_name: str
nicknames: Optional[list[str]]
machinepath: str
os: OSEnum
use_existing_machine: bool = False
def has_key(self, keyname):
if keyname in self.__dict__.keys():
return True
return False
@dataclass
class VMController:
type: str
vagrantfilepath: str
ip: Optional[str] = ""
@dataclass
class Target:
@ -44,6 +66,7 @@ class Target:
group: str
machinepath: str
sensors: Optional[list[str]]
nicknames: Optional[list[str]]
active: bool = True
use_existing_machine: bool = False
playground: Optional[str] = None
@ -53,12 +76,22 @@ class Target:
ssh_keyfile: Optional[str] = None
vulnerabilities: list[str] = None
def has_key(self, keyname):
if keyname in self.__dict__.keys():
return True
return False
@dataclass
class AttackConfig:
nap_time: int
caldera_obfuscator: str
caldera_jitter: str
caldera_obfuscator: str = "plain-text"
caldera_jitter: str = "4/8"
nap_time: int = 5
def has_key(self, keyname):
if keyname in self.__dict__.keys():
return True
return False
@dataclass
@ -66,14 +99,29 @@ class AttackList:
linux: Optional[list[str]]
windows: Optional[list[str]]
def has_key(self, keyname):
if keyname in self.__dict__.keys():
return True
return False
def get(self, keyname, default=None):
if self.has_key(keyname):
return self.__dict__[keyname]
return default
@dataclass
class Results:
loot_dir: str
def has_key(self, keyname):
if keyname in self.__dict__.keys():
return True
return False
@dataclass
class MainConfig():
class MainConfig:
caldera: CalderaConfig
attackers: conlist(Attacker, min_items=1)
targets: conlist(Target, min_items=1)
@ -83,8 +131,13 @@ class MainConfig():
results: Results
# Free form configuration for plugins
attack_conf: dict
sensor_conf: dict
attack_conf: Optional[dict]
sensor_conf: Optional[dict]
def has_key(self, keyname):
if keyname in self.__dict__.keys():
return True
return False
# TODO: Check for name duplication

@ -41,6 +41,11 @@ extensions += ['sphinx_pyreverse']
extensions += ['sphinxcontrib.autoyaml']
autoyaml_level = 5
# Pydantic plugin for sphinx. Another way to generate config documentation
# extensions += ['sphinx-pydantic']
# This has bugs and is not properly maintained
# But would be awesome: https://sphinx-pydantic.readthedocs.io/en/latest/
# Properly display command line behaviour https://pypi.org/project/sphinxcontrib.asciinema/
# https://github.com/divi255/sphinxcontrib.asciinema/issues/11
extensions += ['sphinxcontrib.asciinema']

@ -25,3 +25,5 @@ if __name__ == "__main__":
r = load(arguments.filename)
print(r)
print(r.caldera.apikey)
# print(r.blarg)
print(dir(r.__dict__))

@ -11,13 +11,14 @@ pymetasploit3==1.0.3
pylint==2.9.3
flask==2.0.2
pydantic==1.8.2
dotmap==1.3.25
# Sphinx stuff
sphinx-argparse==0.2.5
sphinxcontrib-autoyaml==0.6.1
sphinx-pyreverse==0.0.13
sphinxcontrib.asciinema==0.3.2
sphinx-pydantic
# sphinx-pydantic # This one has issues that must be fixed upstream first
# Mypy stuff
mypy==0.910

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -49,9 +49,9 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
@ -74,13 +74,20 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
### Sensors to run on this machine
sensors:
# - windows_osquery
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -101,6 +108,10 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
### Sensors to run on this machine
sensors:
# - windows_osquery
###
# General attack config
attacks:
@ -108,6 +119,15 @@ attacks:
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
caldera_obfuscator: plain-text
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
caldera_jitter: 4/8
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
@ -160,7 +180,7 @@ results:
###
# General sensor config config
sensors:
sensor_conf:
###
# Windows sensor plugin configuration
windows_sensor:

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,13 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
nicknames:
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +67,16 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
sensors:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -91,6 +97,8 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
sensors:
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
@ -105,7 +113,24 @@ caldera_attacks:
#- "foo"
#- "bar"
sensor_conf:
###
# General attack config
attacks:
###
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
caldera_obfuscator: plain-text
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
caldera_jitter: 4/8
## A bug in production was triggered by this half config. Adding a unit test
###

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,15 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
nicknames:
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +69,17 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
sensors:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -91,17 +100,8 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
###
# Configuration for caldera
caldera_conf:
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
obfuscator: foo-bar
sensors:
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
jitter: 08/15
###
# A list of caldera attacks to run against the targets.
@ -131,6 +131,16 @@ plugin_based_attacks:
- medusa
- skylla
attacks:
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
caldera_obfuscator: foo-bar
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
caldera_jitter: 08/15
###
# Configuration for the plugin based attack tools
attack_conf:
@ -150,6 +160,8 @@ attack_conf:
# A file containing potential passwords
pwdfile: passwords.txt
sensor_conf:
###
# Settings for the results being harvested
results:

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,19 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
# Used for tests
nicknames:
# - 1
# - 2
# - 3
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +73,23 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
### Sensors to run on this machine
sensors:
# - windows_osquery
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
# Used for tests
nicknames:
- a
- b
- c
os: windows
paw: target2w
group: red
@ -91,6 +110,10 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
### Sensors to run on this machine
sensors:
# - windows_osquery
###
# General attack config
attacks:
@ -98,6 +121,15 @@ attacks:
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# The obfuscator to use between the implant and the server. Not all obfuscators are supported by all implants. Existing obfuscators:
# plain-text, base64, base64jumble, caesar, base64noPadding, steganography
caldera_obfuscator: plain-text
###
# Jitter settings for the implant. it is min/max seconds. The first number has to be smaller. Default is 4/8
caldera_jitter: 4/8
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
@ -150,7 +182,7 @@ results:
###
# General sensor config config
sensors:
sensor_conf:
###
# Windows sensor plugin configuration
windows_sensor:

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,13 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
nicknames:
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +67,16 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
sensors:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -91,6 +97,8 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
sensors:
###
# General attack config
attacks:
@ -150,6 +158,4 @@ results:
###
# General sensor config config
sensors:
foo:
sensor_conf:

@ -11,14 +11,14 @@ caldera:
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
- name: attacker
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
vm_type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
@ -27,6 +27,8 @@ attackers:
# Name of machine in Vagrantfile
vm_name: attacker
nicknames:
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
@ -46,12 +48,13 @@ attackers:
targets:
###
# Specific target
target1:
- name: target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target1
nicknames:
os: linux
###
# Targets need a unique PAW name for caldera
@ -64,13 +67,17 @@ targets:
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
sensors:
- name: target2
#root: systems/target1
vm_controller:
type: vagrant
vm_type: vagrant
vagrantfilepath: systems
vm_name: target2
nicknames:
os: windows
paw: target2w
group: red
@ -91,6 +98,8 @@ targets:
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
sensors:
###
# General attack config
attacks:
@ -99,9 +108,7 @@ attacks:
nap_time: 5
## Broken caldera conf
caldera_conf:
foo: bar
sensor_conf:
###
# A list of caldera attacks to run against the targets.

@ -6,6 +6,7 @@ import unittest
# import os
from app.config import ExperimentConfig, MachineConfig
from app.exceptions import ConfigurationError
from dotmap import DotMap
# https://docs.python.org/3/library/unittest.html
@ -21,481 +22,429 @@ class TestMachineConfig(unittest.TestCase):
def test_basic_init(self):
""" The init is basic and working """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
"vm_name": "target1"}))
self.assertEqual(mc.raw_config["root"], "systems/attacker1")
self.assertEqual(mc.raw_config["vm_controller"]["type"], "vagrant")
def test_missing_vm_name(self):
""" The vm name is missing """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
}})
self.assertEqual(mc.raw_config.vm_controller.vm_type, "vagrant")
def test_use_existing_machine_is_true(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": True})
"use_existing_machine": True}))
self.assertEqual(mc.use_existing_machine(), True)
def test_use_existing_machine_is_false(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.use_existing_machine(), False)
def test_use_existing_machine_is_default(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
"vm_name": "target1"}))
self.assertEqual(mc.use_existing_machine(), False)
def test_windows_is_valid_os(self):
""" Testing if windows is valid os """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "windows",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
"vm_name": "target1"}))
self.assertEqual(mc.os(), "windows")
def test_windows_is_valid_os_casefix(self):
""" Testing if windows is valid os - using lowercase fix"""
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "WINDOWS",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
"vm_name": "target1"}))
self.assertEqual(mc.os(), "windows")
def test_linux_is_valid_os(self):
""" Testing if windows is valid os """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
"vm_name": "target1"}))
self.assertEqual(mc.os(), "linux")
def test_missing_os(self):
""" The os is missing """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_wrong_os(self):
""" The os is wrong """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "BROKEN",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_vagrant_is_valid_vmcontroller(self):
""" Testing if vagrant is valid vmcontroller """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
"vm_name": "target1"}))
self.assertEqual(mc.vmcontroller(), "vagrant")
def test_vagrant_is_valid_vmcontroller_casefix(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "VAGRANT",
"vm_type": "VAGRANT",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
"vm_name": "target1"}))
self.assertEqual(mc.vmcontroller(), "vagrant")
def test_invalid_vmcontroller(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "BROKEN",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_missing_vmcontroller_2(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_name": "target1"})
def test_vagrant_is_valid_vmip(self):
""" Testing if vagrant is valid ip/url """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"ip": "kali",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
"vm_name": "target1"}))
self.assertEqual(mc.vm_ip(), "kali")
def test_missing_vmip(self):
""" Testing if missing vm ip is handled"""
vm_name = "target1"
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": vm_name})
"vm_name": vm_name}))
self.assertEqual(mc.vm_ip(), vm_name)
def test_machinepath(self):
""" Testing machinepath setting """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False,
"machinepath": "foo"})
"machinepath": "foo"}))
self.assertEqual(mc.machinepath(), "foo")
def test_machinepath_fallback(self):
""" Testing machinepath setting fallback to vmname"""
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.machinepath(), "target1")
def test_paw(self):
""" Testing for caldera paw """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"paw": "Bar",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.caldera_paw(), "Bar")
def test_paw_fallback(self):
""" Testing for caldera paw fallback """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.caldera_paw(), None)
def test_group(self):
""" Testing for caldera group """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"group": "Bar",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.caldera_group(), "Bar")
def test_group_fallback(self):
""" Testing for caldera group fallback """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.caldera_group(), None)
def test_ssh_keyfile(self):
""" Testing keyfile config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"ssh_keyfile": "Bar",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.ssh_keyfile(), "Bar")
def test_ssh_keyfile_default(self):
""" Testing keyfile config default """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.ssh_keyfile(), None)
def test_ssh_user(self):
""" Testing ssh user config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"ssh_user": "Bob",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.ssh_user(), "Bob")
def test_ssh_user_default(self):
""" Testing ssh user default config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.ssh_user(), "vagrant")
def test_ssh_password(self):
""" Testing ssh password config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"ssh_user": "Bob",
"ssh_password": "Ross",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.ssh_password(), "Ross")
def test_ssh_password_default(self):
""" Testing ssh password default config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertIsNone(mc.ssh_password())
def test_halt_needs_force_default(self):
""" Testing 'halt needs force' default config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.halt_needs_force(), False)
def test_halt_needs_force(self):
""" Testing 'halt needs force' config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.halt_needs_force(), True)
def test_vagrantfilepath(self):
""" Testing vagrantfilepath config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.vagrantfilepath(), "systems")
def test_vagrantfilepath_missing(self):
""" Testing missing vagrantfilepath config """
with self.assertRaises(ConfigurationError):
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False})))
mc.vagrantfilepath()
def test_sensors_empty(self):
""" Testing empty sensor config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.sensors(), [])
def test_sensors_set(self):
""" Testing empty sensor config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"sensors": ["linux_idp", "test_sensor"]})
"sensors": ["linux_idp", "test_sensor"]}))
self.assertEqual(mc.sensors(), ["linux_idp", "test_sensor"])
def test_vulnerabilities_empty(self):
""" Testing empty vulnerabilities config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
"use_existing_machine": False}))
self.assertEqual(mc.vulnerabilities(), [])
def test_vulnerabilities_set(self):
""" Testing empty vulnerabilities config """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"vulnerabilities": ["PEBKAC", "USER"]})
"vulnerabilities": ["PEBKAC", "USER"]}))
self.assertEqual(mc.vulnerabilities(), ["PEBKAC", "USER"])
def test_active_not_set(self):
""" machine active not set """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"sensors": ["linux_idp", "test_sensor"]})
"sensors": ["linux_idp", "test_sensor"]}))
self.assertEqual(mc.is_active(), True)
def test_active_is_false(self):
""" machine active is set to false """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"active": False,
"sensors": ["linux_idp", "test_sensor"]})
"sensors": ["linux_idp", "test_sensor"]}))
self.assertEqual(mc.is_active(), False)
def test_active_is_true(self):
""" machine active is set to true """
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"active": True,
"sensors": ["linux_idp", "test_sensor"]})
"sensors": ["linux_idp", "test_sensor"]}))
self.assertEqual(mc.is_active(), True)
@ -509,7 +458,7 @@ class TestExperimentConfig(unittest.TestCase):
""" Existing, basic config file, testing the values are loaded properly """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.raw_config["caldera"]["apikey"], "ADMIN123")
self.assertEqual(ex.raw_config.caldera.apikey, "ADMIN123")
self.assertEqual(ex.caldera_apikey(), "ADMIN123")
def test_broken_apikey(self):
@ -528,22 +477,6 @@ class TestExperimentConfig(unittest.TestCase):
with self.assertRaises(ConfigurationError):
e.loot_dir()
def test_broken_attack_conf(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config = None
with self.assertRaises(ConfigurationError):
e.attack_conf("hydra")
def test_broken_attack_conf_2(self):
""" Test with partially empty config file """
e = ExperimentConfig("tests/data/basic.yaml")
e.raw_config["attack_conf"] = None
with self.assertRaises(ConfigurationError):
e.attack_conf("hydra")
def test_broken_caldera_obfuscator_conf(self):
""" Test with partially empty config file """
@ -598,53 +531,12 @@ class TestExperimentConfig(unittest.TestCase):
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.loot_dir(), "loot")
def test_missing_loot_dir(self):
""" Test with missing loot dir """
with self.assertRaises(ConfigurationError):
ExperimentConfig("tests/data/basic_loot_missing.yaml")
def test_empty_config(self):
""" Test with empty config file """
with self.assertRaises(ConfigurationError):
ExperimentConfig("tests/data/empty.yaml")
def test_empty_targets(self):
""" Test with empty targets in file """
with self.assertRaises(ConfigurationError):
ExperimentConfig("tests/data/empty_targets.yaml")
def test_empty_attackers(self):
""" Test with empty attackers in file """
with self.assertRaises(ConfigurationError):
ExperimentConfig("tests/data/empty_attackers.yaml")
def test_missing_results(self):
""" Test with missing results """
with self.assertRaises(ConfigurationError):
ExperimentConfig("tests/data/basic_results_missing.yaml")
def test_basic_loading_targets_read(self):
""" Targets in config: can be found """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(len(ex._targets), 2)
self.assertEqual(ex._targets[0].raw_config["vm_name"], "target1")
self.assertEqual(ex._targets[0].vmname(), "target1")
self.assertEqual(ex.targets()[0].vmname(), "target1")
def test_basic_loading_attacker_read(self):
""" Attackers in config: can be found """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(len(ex._targets), 2)
self.assertEqual(ex._attackers[0].raw_config["vm_name"], "attacker")
self.assertEqual(ex._attackers[0].vmname(), "attacker")
self.assertEqual(ex.attackers()[0].vmname(), "attacker")
self.assertEqual(ex.attacker(0).vmname(), "attacker")
def test_nicknames_missing(self):
""" Test when the machine nicknames are non existing """
ex = ExperimentConfig("tests/data/basic.yaml")
@ -658,7 +550,7 @@ class TestExperimentConfig(unittest.TestCase):
def test_nicknames_present(self):
""" Test when the machine nicknames are there """
ex = ExperimentConfig("tests/data/attacker_has_empty_nicknames.yaml")
self.assertEqual(ex._targets[0].get_nicknames(), [1, 2, 3])
self.assertEqual(ex._targets[0].get_nicknames(), ["1", "2", "3"])
def test_missing_attack_config(self):
""" Getting attack config for a specific attack. Attack missing """
@ -674,14 +566,6 @@ class TestExperimentConfig(unittest.TestCase):
data = ex.attack_conf("hydra")
self.assertEqual(data["userfile"], "users.txt")
def test_attack_config_missing_attack_data(self):
""" Getting attack config for a specific attack: Missing """
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
data = ex.attack_conf("missing")
self.assertEqual(data, {})
def test_missing_caldera_config_obfuscator(self):
""" A config file with no caldera config at all """
@ -718,27 +602,6 @@ class TestExperimentConfig(unittest.TestCase):
ex = ExperimentConfig("tests/data/attacks_perfect.yaml")
self.assertEqual(ex.get_caldera_jitter(), "08/15")
def test_nap_time(self):
""" nap time is set """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.get_nap_time(), 5)
def test_nap_time_not_set(self):
""" nap time is not set """
ex = ExperimentConfig("tests/data/nap_time_missing.yaml")
self.assertEqual(ex.get_nap_time(), 0)
def test_kali_attacks_missing(self):
""" kali attacks entry fully missing from config """
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
self.assertEqual(ex.get_plugin_based_attacks("linux"), [])
def test_kali_attacks_empty(self):
""" zero entries in kali attacks list """
@ -760,13 +623,6 @@ class TestExperimentConfig(unittest.TestCase):
self.assertEqual(ex.get_plugin_based_attacks("windows"), ["hydra", "medusa", "skylla"])
def test_caldera_attacks_missing(self):
""" caldera attacks entry fully missing from config """
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
self.assertEqual(ex.get_caldera_attacks("linux"), [])
def test_kali_attacks_half(self):
""" kali attacks entry partially missing from config """
@ -818,13 +674,6 @@ class TestExperimentConfig(unittest.TestCase):
self.assertEqual(ex.get_sensor_config("missing_windows_sensor"), {})
def test_basic_sensor_entry_missing(self):
""" Test global configuration for a specific and missing sensor entry"""
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
self.assertEqual(ex.get_sensor_config("windows_sensor"), {})
def test_basic_sensor_entry_empty(self):
""" Test global configuration for a specific and empty sensor entry"""

@ -1,5 +1,6 @@
import unittest
import os
from dotmap import DotMap
from app.machinecontrol import Machine
from app.exceptions import ConfigurationError
from app.config import MachineConfig
@ -15,167 +16,137 @@ class TestMachineControl(unittest.TestCase):
self.attack_logger = AttackLog(0)
def test_get_os_linux_machine(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}, self.attack_logger)
"vm_name": "target3"}), self.attack_logger)
self.assertEqual(m.get_os(), "linux")
def test_get_os_linux_machine_with_config_class(self):
mc = MachineConfig({"root": "systems/attacker1",
mc = MachineConfig(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
"vm_name": "target3"}))
m = Machine(mc, self.attack_logger)
self.assertEqual(m.get_os(), "linux")
def test_get_os_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
def test_get_os_not_supported(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "nintendo_switch",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}, self.attack_logger)
def test_get_paw_good(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"paw": "testme",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}, self.attack_logger)
"vm_name": "target3"}), self.attack_logger)
self.assertEqual(m.get_paw(), "testme")
def test_get_paw_missing(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
}), self.attack_logger)
self.assertEqual(m.get_paw(), None)
def test_get_group_good(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"group": "testme",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"}, self.attack_logger)
"vm_name": "target3"}), self.attack_logger)
self.assertEqual(m.get_group(), "testme")
def test_get_group_missing(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
}), self.attack_logger)
self.assertEqual(m.get_group(), None)
def test_vagrantfilepath_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
},
"vm_name": "target3"
}, self.attack_logger)
}), self.attack_logger)
def test_vagrantfile_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "non_existing",
},
"vm_name": "target3"
}, self.attack_logger)
}), self.attack_logger)
def test_vagrantfile_existing(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
}), self.attack_logger)
self.assertIsNotNone(m)
def test_name_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
}, self.attack_logger)
# test: auto generated, dir missing
def test_auto_generated_machinepath_with_path_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "missing"
}, self.attack_logger)
}), self.attack_logger)
# test manual config, dir missing
def test_configured_machinepath_with_path_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"machinepath": "missing"
}, self.attack_logger)
}), self.attack_logger)
# test auto generated, dir there (external/internal dirs must work !)
def test_auto_generated_machinepath_with_good_config(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
}, self.attack_logger)
}), self.attack_logger)
vagrantfilepath = os.path.abspath("systems")
ext = os.path.join(vagrantfilepath, "target3")
internal = os.path.join("/vagrant/", "target3")
@ -185,15 +156,15 @@ class TestMachineControl(unittest.TestCase):
# test: manual config, dir there (external/internal dirs must work !)
def test_configured_machinepath_with_good_config(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "missing",
"machinepath": "target3"
}, self.attack_logger)
}), self.attack_logger)
vagrantfilepath = os.path.abspath("systems")
ext = os.path.join(vagrantfilepath, "target3")
internal = os.path.join("/vagrant/", "target3")
@ -204,36 +175,23 @@ class TestMachineControl(unittest.TestCase):
# vm_controller missing
def test_configured_vm_controller_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_name": "missing",
"machinepath": "target3"
}, self.attack_logger)
# vm_controller wrong
def test_configured_vm_controller_wrong_type(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "wrong_controller",
"vagrantfilepath": "systems",
},
"vm_name": "missing",
"machinepath": "target3"
}, self.attack_logger)
}), self.attack_logger)
# Create caldera start command and verify it
def test_get_linux_caldera_start_cmd(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw"}, self.attack_logger)
"paw": "testpaw"}), self.attack_logger)
m.set_caldera_server("http://www.test.test")
with patch.object(m.vm_manager, "get_playground", return_value="/vagrant/target3"):
cmd = m.create_start_caldera_client_cmd()
@ -241,16 +199,16 @@ class TestMachineControl(unittest.TestCase):
# Create caldera start command and verify it (windows)
def test_get_windows_caldera_start_cmd(self):
m = Machine({"root": "systems/attacker1",
m = Machine(DotMap({"root": "systems/attacker1",
"os": "windows",
"vm_controller": {
"type": "vagrant",
"vm_type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw",
"machinepath": "target2w"}, self.attack_logger)
"machinepath": "target2w"}), self.attack_logger)
m.set_caldera_server("www.test.test")
cmd = m.create_start_caldera_client_cmd()
self.maxDiff = None

@ -30,7 +30,7 @@ exclude =
max-complexity = 10
[testenv]
deps = -rrequirements.txt
deps = -r requirements.txt
flake8
safety
bandit

Loading…
Cancel
Save