Merge pull request #42 from avast/more_unit_tests

More unit tests
pull/43/head
Thorsten Sick 2 years ago committed by GitHub
commit 32d33f015f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -26,8 +26,8 @@ pylint:
# Testing if types are used properly
mypy:
mypy --strict-optional app/
mypy --strict-optional app/ plugins/base/
# Fixing mypy file by file
stepbystep:
mypy --strict-optional plugins/base/plugin_base.py plugins/base/machinery.py app/config.py plugins/base/caldera.py plugins/base/attack.py plugins/base/sensor.py plugins/base/ssh_features.py plugins/base/vulnerability_plugin.py app/attack_log.py app/calderacontrol.py
mypy --strict-optional --disallow-untyped-defs --check-untyped-defs plugins/base/ app/

@ -510,6 +510,15 @@ class AttackLog():
logid = timestamp + "_" + str(randint(1, 100000))
cframe = currentframe()
default_sourcefile = ""
if cframe is not None:
if cframe.f_back is not None:
default_sourcefile = getsourcefile(cframe.f_back) or ""
default_sourceline = -1
if cframe is not None:
if cframe.f_back is not None:
default_sourceline = cframe.f_back.f_lineno
data = {"timestamp": timestamp,
"timestamp_end": None,
@ -528,8 +537,8 @@ class AttackLog():
"situation_description": kwargs.get("situation_description", None), # Description for the situation this attack was run in. Set by the plugin or attacker emulation
"countermeasure": kwargs.get("countermeasure", None), # Set by the attack
"result": None,
"sourcefile": kwargs.get("sourcefile", getsourcefile(cframe.f_back)),
"sourceline": kwargs.get("sourceline", cframe.f_back.f_lineno)
"sourcefile": kwargs.get("sourcefile", default_sourcefile),
"sourceline": kwargs.get("sourceline", default_sourceline)
}
self.__add_to_log__(data)

@ -5,7 +5,7 @@
import json
from pprint import pformat
from typing import Optional, Union
from typing import Optional, Union, Annotated
import requests
import simplejson
from pydantic.dataclasses import dataclass
@ -19,13 +19,13 @@ from pydantic import conlist # pylint: disable=no-name-in-module
# TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)"
@dataclass
class Variation:
class Variation: # pylint: disable=missing-class-docstring
description: str
command: str
@dataclass
class ParserConfig:
class ParserConfig: # pylint: disable=missing-class-docstring
source: str
edge: str
target: str
@ -33,27 +33,27 @@ class ParserConfig:
@dataclass
class Parser:
class Parser: # pylint: disable=missing-class-docstring
module: str
relationships: list[ParserConfig] # undocumented ! Needs improvement ! TODO
parserconfigs: Optional[list[ParserConfig]] = None
@dataclass
class Requirement:
class Requirement: # pylint: disable=missing-class-docstring
module: str
relationship_match: list[dict]
@dataclass
class AdditionalInfo:
class AdditionalInfo: # pylint: disable=missing-class-docstring
additionalProp1: Optional[str] = None # pylint: disable=invalid-name
additionalProp2: Optional[str] = None # pylint: disable=invalid-name
additionalProp3: Optional[str] = None # pylint: disable=invalid-name
@dataclass
class Executor:
class Executor: # pylint: disable=missing-class-docstring
build_target: Optional[str] # Why can this be None ?
language: Optional[str] # Why can this be None ?
payloads: list[str]
@ -106,9 +106,10 @@ class Ability:
@dataclass
class AbilityList:
""" A list of exploits """
abilities: conlist(Ability, min_items=1)
abilities: Annotated[list, conlist(Ability, min_items=1)]
def get_data(self):
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.abilities
@ -123,9 +124,10 @@ class Obfuscator:
@dataclass
class ObfuscatorList:
""" A list of obfuscators """
obfuscators: conlist(Obfuscator, min_items=1)
obfuscators: Annotated[list, conlist(Obfuscator, min_items=1)]
def get_data(self):
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.obfuscators
@ -152,14 +154,15 @@ class Adversary:
@dataclass
class AdversaryList:
""" A list of adversary """
adversaries: conlist(Adversary, min_items=1)
adversaries: Annotated[list, conlist(Adversary, min_items=1)]
def get_data(self):
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.adversaries
@dataclass
class Fact:
class Fact: # pylint: disable=missing-class-docstring
unique: str
name: str
score: int
@ -183,7 +186,7 @@ class Fact:
@dataclass
class Relationship:
class Relationship: # pylint: disable=missing-class-docstring
target: Fact
unique: str
score: int
@ -193,13 +196,13 @@ class Relationship:
@dataclass
class Visibility:
class Visibility: # pylint: disable=missing-class-docstring
score: int
adjustments: list[int]
@dataclass
class Link:
class Link: # pylint: disable=missing-class-docstring
pin: int
ability: Ability
paw: str
@ -273,18 +276,19 @@ class AgentList:
agents: list[Agent]
def get_data(self):
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.agents
@dataclass
class Rule:
class Rule: # pylint: disable=missing-class-docstring
match: str
trait: str
action: Optional[str] = None
@dataclass
class Adjustment:
class Adjustment: # pylint: disable=missing-class-docstring
offset: int
trait: str
value: str
@ -292,7 +296,7 @@ class Adjustment:
@dataclass
class Source:
class Source: # pylint: disable=missing-class-docstring
name: str
plugin: str
facts: list[Fact]
@ -310,10 +314,11 @@ class Source:
@dataclass
class SourceList:
class SourceList: # pylint: disable=missing-class-docstring
sources: list[Source]
def get_data(self):
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.sources
@ -334,14 +339,16 @@ class Planner:
@dataclass
class PlannerList:
""" A list of planners"""
planners: list[Planner]
def get_data(self):
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.planners
@dataclass
class Goal:
class Goal: # pylint: disable=missing-class-docstring
target: str
count: int
achieved: bool
@ -350,7 +357,7 @@ class Goal:
@dataclass
class Objective:
class Objective: # pylint: disable=missing-class-docstring
percentage: int
name: str
goals: list[Goal]
@ -396,17 +403,20 @@ class Operation:
@dataclass
class OperationList:
operations: conlist(Operation)
""" A list of operations """
operations: Annotated[list, conlist(Operation)]
def get_data(self):
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.operations
@dataclass
class ObjectiveList:
objectives: conlist(Objective)
class ObjectiveList: # pylint: disable=missing-class-docstring
objectives: Annotated[list, conlist(Objective)]
def get_data(self):
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.objectives

@ -66,11 +66,14 @@ class CalderaControl(CalderaAPI):
return {}
res = {}
for i in source.get("facts"):
res[i.get("trait")] = {"value": i.get("value"),
"technique_id": i.get("technique_id"),
"collected_by": i.get("collected_by")
}
if source is not None:
facts = source.get("facts")
if facts is not None:
for fact in facts:
res[fact.get("trait")] = {"value": fact.get("value"),
"technique_id": fact.get("technique_id"),
"collected_by": fact.get("collected_by")
}
return res
def list_paws_of_running_agents(self) -> list[str]:
@ -344,7 +347,12 @@ class CalderaControl(CalderaAPI):
return False
self.add_adversary(adversary_name, ability_id)
adid = self.get_adversary(adversary_name).get("adversary_id")
adversary = self.get_adversary(adversary_name)
if adversary is None:
raise CalderaError("Could not get adversary")
adid = adversary.get("adversary_id", None)
if adid is None:
raise CalderaError("Could not get adversary by id")
logid = self.attack_logger.start_caldera_attack(source=self.url,
paw=paw,
@ -370,7 +378,12 @@ class CalderaControl(CalderaAPI):
)
self.attack_logger.vprint(pformat(res), 3)
opid = self.get_operation(operation_name).get("id")
operation = self.get_operation(operation_name)
if operation is None:
raise CalderaError("Was not able to get operation")
opid = operation.get("id")
if opid is None:
raise CalderaError("Was not able to get operation. Broken ID")
self.attack_logger.vprint("New operation created. OpID: " + str(opid), 3)
self.set_operation_state(opid)

@ -151,7 +151,7 @@ class ExperimentConfig():
:param configfile: The configuration file to process
"""
self.raw_config: MainConfig = None
self.raw_config: Optional[MainConfig] = None
self._targets: list[MachineConfig] = []
self._attackers: list[MachineConfig] = []
self.load(configfile)
@ -232,9 +232,10 @@ class ExperimentConfig():
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
res = {}
try:
res = self.raw_config.attack_conf[attack]
if self.raw_config.attack_conf is not None:
res = self.raw_config.attack_conf[attack]
except KeyError:
res = {}
if res is None:

@ -106,7 +106,7 @@ class Target:
ssh_user: Optional[str] = None
ssh_password: Optional[str] = None
ssh_keyfile: Optional[str] = None
vulnerabilities: list[str] = None
vulnerabilities: Optional[list[str]] = None
def has_key(self, keyname):
""" Checks if a key exists
@ -182,8 +182,8 @@ class Results:
class MainConfig:
""" Central configuration for PurpleDome """
caldera: CalderaConfig
attackers: conlist(Attacker, min_items=1)
targets: conlist(Target, min_items=1)
attackers: conlist(Attacker, min_items=1) # type: ignore
targets: conlist(Target, min_items=1) # type: ignore
attacks: AttackConfig
caldera_attacks: AttackList
plugin_based_attacks: AttackList

@ -28,3 +28,7 @@ class MetasploitError(Exception):
class RequirementError(Exception):
""" Plugin requirements not fulfilled """
class MachineError(Exception):
""" A virtual machine has issues"""

@ -13,7 +13,7 @@ from typing import Optional
from app.attack_log import AttackLog
from app.config import ExperimentConfig
from app.interface_sfx import CommandlineColors
from app.exceptions import ServerError
from app.exceptions import ServerError, CalderaError, MachineError
from app.pluginmanager import PluginManager
from app.doc_generator import DocGenerator
from app.calderacontrol import CalderaControl
@ -26,21 +26,33 @@ from plugins.base.attack import AttackPlugin
class Experiment():
""" Class handling experiments """
def __init__(self, configfile: str, verbosity=0, caldera_attacks: list = None):
def __init__(self, configfile: str, verbosity=0):
"""
:param configfile: Path to the configfile to load
:param verbosity: verbosity level between 0 and 3
:param caldera_attacks: an optional argument to override caldera attacks in the config file and run just this one caldera attack. A list of caldera ID
"""
self.attacker_1: Optional[Machine] = None
self.start_time: str = datetime.now().strftime("%Y_%m_%d___%H_%M_%S") #: time the experiment started.
self.caldera_control: Optional[CalderaControl] = None #: Controller for Caldera interaction
self.loot_dir: str = "loot" #: Directory to store the loot into. Will be fetched from config
self.targets: list[Machine] = [] #: A list of target machines
self.attacker_1: Optional[Machine] = None #: The attacker machine
self.experiment_config = ExperimentConfig(configfile)
self.attack_logger = AttackLog(verbosity)
self.plugin_manager = PluginManager(self.attack_logger)
def run(self, caldera_attacks: list = None):
"""
Run the experiment
:param caldera_attacks: an optional argument to override caldera attacks in the config file and run just this one caldera attack. A list of caldera ID
:return:
"""
self.__start_attacker()
if self.attacker_1 is None:
raise ServerError
raise MachineError("Attacker not initialised")
caldera_url = "http://" + self.attacker_1.get_ip() + ":8888"
self.caldera_control = CalderaControl(caldera_url, attack_logger=self.attack_logger, config=self.experiment_config)
# self.caldera_control = CalderaControl("http://" + self.attacker_1.get_ip() + ":8888", self.attack_logger,
@ -49,92 +61,88 @@ class Experiment():
self.attack_logger.vprint(self.caldera_control.kill_all_agents(), 3)
self.attack_logger.vprint(self.caldera_control.delete_all_agents(), 3)
self.starttime = datetime.now().strftime("%Y_%m_%d___%H_%M_%S")
self.lootdir = os.path.join(self.experiment_config.loot_dir(), self.starttime)
os.makedirs(self.lootdir)
self.start_time = datetime.now().strftime("%Y_%m_%d___%H_%M_%S")
self.loot_dir = os.path.join(self.experiment_config.loot_dir(), self.start_time)
os.makedirs(self.loot_dir)
self.targets = []
# start target machines
for target_conf in self.experiment_config.targets():
if not target_conf.is_active():
continue
tname = target_conf.vmname()
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}preparing target {tname} ....{CommandlineColors.ENDC}", 1)
target_1 = Machine(target_conf, attack_logger=self.attack_logger)
target_1.set_caldera_server(self.attacker_1.get_ip())
try:
if not target_conf.use_existing_machine():
target_1.destroy()
except subprocess.CalledProcessError:
# Maybe the machine just does not exist yet
pass
if self.machine_needs_caldera(target_1, caldera_attacks):
target_1.install_caldera_service()
target_1.up()
target_1.reboot() # Kernel changes on system creation require a reboot
needs_reboot = target_1.prime_vulnerabilities()
needs_reboot |= target_1.prime_sensors()
if needs_reboot:
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}rebooting target {tname} ....{CommandlineColors.ENDC}", 1)
target_1.reboot()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1)
self.targets.append(target_1)
self.start_target_machines(caldera_attacks)
# Install vulnerabilities
for a_target in self.targets:
self.attack_logger.vprint(f"Installing vulnerabilities on {a_target.get_paw()}", 2)
a_target.install_vulnerabilities()
a_target.start_vulnerabilities()
self.install_vulnerabilities()
# Install sensor plugins
for a_target in self.targets:
self.attack_logger.vprint(f"Installing sensors on {a_target.get_paw()}", 2)
a_target.install_sensors()
a_target.start_sensors()
self.install_sensor_plugins()
# First start of caldera implants
at_least_one_caldera_started = False
for target_1 in self.targets:
if self.machine_needs_caldera(target_1, caldera_attacks):
target_1.start_caldera_client()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1)
else:
at_least_one_caldera_started = True
if at_least_one_caldera_started:
time.sleep(20) # Wait for all the clients to contact the caldera server
# TODO: Smarter wait
self.first_start_of_caldera_implants(caldera_attacks)
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Contacting caldera agents on all targets ....{CommandlineColors.ENDC}", 1)
# Wait until all targets are registered as Caldera targets
for target_1 in self.targets:
running_agents = self.caldera_control.list_paws_of_running_agents()
self.attack_logger.vprint(f"Agents currently running: {running_agents}", 2)
while target_1.get_paw() not in running_agents:
if self.machine_needs_caldera(target_1, caldera_attacks) == 0:
self.attack_logger.vprint(f"No caldera agent needed for: {target_1.get_paw()} ", 3)
break
self.attack_logger.vprint(f"Connecting to caldera {caldera_url}, running agents are: {running_agents}", 3)
self.attack_logger.vprint(f"Missing agent: {target_1.get_paw()} ...", 3)
target_1.start_caldera_client()
self.attack_logger.vprint(f"Restarted caldera agent: {target_1.get_paw()} ...", 3)
time.sleep(120) # Was 30, but maybe there are timing issues
running_agents = self.caldera_control.list_paws_of_running_agents()
self.wait_until_all_targets_have_caldera_implants(caldera_url, caldera_attacks)
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1)
# Add running machines to log
for target in self.targets:
i = target.get_machine_info()
i["role"] = "target"
self.attack_logger.add_machine_info(i)
i = self.attacker_1.get_machine_info()
i["role"] = "attacker"
self.attack_logger.add_machine_info(i)
self.add_running_machines_to_log()
# Attack them
self.run_caldera_attacks(caldera_attacks)
# Run plugin based attacks
self.run_plugin_attacks()
# Stop sensor plugins
# Collect data
zip_this = []
for a_target in self.targets:
a_target.stop_sensors()
zip_this += a_target.collect_sensors(self.loot_dir)
# Uninstall vulnerabilities
for a_target in self.targets:
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE} Uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1)
a_target.stop_vulnerabilities()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN} Done uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1)
# Stop target machines
for target_1 in self.targets:
target_1.halt()
self.__stop_attacker()
self.attack_logger.post_process()
attack_log_file_path = os.path.join(self.loot_dir, "attack.json")
self.attack_logger.write_json(attack_log_file_path)
document_generator = DocGenerator()
document_generator.generate(attack_log_file_path)
document_generator.compile_documentation()
zip_this += document_generator.get_outfile_paths()
self.zip_loot(zip_this)
def run_plugin_attacks(self):
""" Run plugin based attacks
"""
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1)
for target_1 in self.targets:
plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os())
metasploit_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks)
print(f"Plugins needing metasploit for {target_1.get_paw()} : {metasploit_plugins}")
for attack in plugin_based_attacks:
# TODO: Work with snapshots
self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with attack: {attack}", 1)
self.attack(target_1, attack)
self.attack_logger.vprint(
f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", 3)
time.sleep(self.experiment_config.get_nap_time())
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1)
def run_caldera_attacks(self, caldera_attacks: Optional[list[str]] = None):
""" Run caldera based attacks
:param caldera_attacks: An optional list of caldera attack ids as string
"""
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1)
for target_1 in self.targets:
if caldera_attacks is None:
@ -147,7 +155,8 @@ class Experiment():
# TODO: Work with snapshots
# TODO: If we have several targets in the same group, it is nonsense to attack each one separately. Make this smarter
self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with {attack}", 2)
if self.caldera_control is None:
raise CalderaError("Caldera control not initialised")
it_worked = self.caldera_control.attack(paw=target_1.get_paw(),
ability_id=attack,
group=target_1.get_group(),
@ -161,12 +170,20 @@ class Experiment():
# Fix: Caldera sometimes gets stuck. This is why we better re-start the caldera server and wait till all the implants re-connected
# Reason: In some scenarios we keep the infra up for hours or days. No re-creation like intended. This can cause Caldera to hick up
if it_worked:
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Restarting caldera server and waiting for clients to re-connect{CommandlineColors.ENDC}", 1)
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Restarting caldera server and waiting for clients to re-connect{CommandlineColors.ENDC}",
1)
if self.attacker_1 is None:
raise MachineError("attacker not initialised")
self.attacker_1.start_caldera_server()
self.attack_logger.vprint(f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", 2)
self.attack_logger.vprint(
f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}",
2)
time.sleep(self.experiment_config.get_nap_time())
retries = 100
for target_system in self.targets:
if self.caldera_control is None:
raise CalderaError("Caldera is not initialised")
if self.machine_needs_caldera(target_system, caldera_attacks) == 0:
self.attack_logger.vprint(f"No caldera agent needed for: {target_system.get_paw()} ", 3)
continue
@ -176,63 +193,133 @@ class Experiment():
time.sleep(1)
running_agents = self.caldera_control.list_paws_of_running_agents()
retries -= 1
self.attack_logger.vprint(f"Waiting for clients to re-connect ({retries}, {running_agents}) ", 3)
self.attack_logger.vprint(
f"Waiting for clients to re-connect ({retries}, {running_agents}) ", 3)
if retries <= 0:
raise ServerError
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Restarted caldera server clients re-connected{CommandlineColors.ENDC}", 1)
self.attack_logger.vprint(
f"{CommandlineColors.OKGREEN}Restarted caldera server clients re-connected{CommandlineColors.ENDC}",
1)
# End of fix
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1)
# Run plugin based attacks
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1)
def add_running_machines_to_log(self):
""" Add machine infos for targets and attacker to the log """
for target in self.targets:
i = target.get_machine_info()
i["role"] = "target"
self.attack_logger.add_machine_info(i)
i = self.attacker_1.get_machine_info()
i["role"] = "attacker"
self.attack_logger.add_machine_info(i)
def wait_until_all_targets_have_caldera_implants(self, caldera_url: str, caldera_attacks: Optional[list[str]] = None):
"""
:param caldera_attacks: a list of command line defined caldera attacks
:param caldera_url: URL of the caldera server
"""
for target_1 in self.targets:
plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os())
metasploit_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks)
print(f"Plugins needing metasploit for {target_1.get_paw()} : {metasploit_plugins}")
for attack in plugin_based_attacks:
# TODO: Work with snapshots
self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with attack: {attack}", 1)
if self.caldera_control is None:
raise CalderaError("Caldera is not initialised")
running_agents = self.caldera_control.list_paws_of_running_agents()
self.attack_logger.vprint(f"Agents currently running: {running_agents}", 2)
while target_1.get_paw() not in running_agents:
if self.machine_needs_caldera(target_1, caldera_attacks) == 0:
self.attack_logger.vprint(f"No caldera agent needed for: {target_1.get_paw()} ", 3)
break
self.attack_logger.vprint(f"Connecting to caldera {caldera_url}, running agents are: {running_agents}",
3)
self.attack_logger.vprint(f"Missing agent: {target_1.get_paw()} ...", 3)
target_1.start_caldera_client()
self.attack_logger.vprint(f"Restarted caldera agent: {target_1.get_paw()} ...", 3)
time.sleep(120) # Was 30, but maybe there are timing issues
running_agents = self.caldera_control.list_paws_of_running_agents()
self.attack(target_1, attack)
self.attack_logger.vprint(f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", 3)
time.sleep(self.experiment_config.get_nap_time())
def first_start_of_caldera_implants(self, caldera_attacks: Optional[list[str]] = None):
""" Start caldera implant on the targets
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1)
:param caldera_attacks: a list of command line defined caldera attacks
"""
at_least_one_caldera_started = False
for target_1 in self.targets:
if self.machine_needs_caldera(target_1, caldera_attacks):
target_1.start_caldera_client()
self.attack_logger.vprint(
f"{CommandlineColors.OKGREEN}Initial start of caldera client: {target_1.get_name()} {CommandlineColors.ENDC}", 1)
else:
at_least_one_caldera_started = True
if at_least_one_caldera_started:
time.sleep(20) # Wait for all the clients to contact the caldera server
# TODO: Smarter wait
# Stop sensor plugins
# Collect data
zip_this = []
def install_sensor_plugins(self):
""" Installs sensor plugins on the targets
"""
for a_target in self.targets:
a_target.stop_sensors()
zip_this += a_target.collect_sensors(self.lootdir)
self.attack_logger.vprint(f"Installing sensors on {a_target.get_paw()}", 2)
a_target.install_sensors()
a_target.start_sensors()
# Uninstall vulnerabilities
def install_vulnerabilities(self):
""" Install vulnerabilities on the targets
"""
for a_target in self.targets:
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE} Uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1)
a_target.stop_vulnerabilities()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN} Done uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1)
self.attack_logger.vprint(f"Installing vulnerabilities on {a_target.get_paw()}", 2)
a_target.install_vulnerabilities()
a_target.start_vulnerabilities()
# Stop target machines
for target_1 in self.targets:
target_1.halt()
self.__stop_attacker()
def start_target_machines(self, caldera_attacks: Optional[list[str]] = None):
""" Start target machines
self.attack_logger.post_process()
attack_log_file_path = os.path.join(self.lootdir, "attack.json")
self.attack_logger.write_json(attack_log_file_path)
document_generator = DocGenerator()
document_generator.generate(attack_log_file_path)
document_generator.compile_documentation()
zip_this += document_generator.get_outfile_paths()
self.zip_loot(zip_this)
:param caldera_attacks: Caldera attacks as defined on the command line
"""
for target_conf in self.experiment_config.targets():
if not target_conf.is_active():
continue
tname = target_conf.vmname()
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}preparing target {tname} ....{CommandlineColors.ENDC}", 1)
target_1 = Machine(target_conf, attack_logger=self.attack_logger)
if target_1 is None:
raise MachineError("Creating target machine failed")
if self.attacker_1 is None:
raise MachineError("Creating attacker machine failed")
target_1.set_caldera_server(self.attacker_1.get_ip())
try:
if not target_conf.use_existing_machine():
target_1.destroy()
except subprocess.CalledProcessError:
# Maybe the machine just does not exist yet
pass
if self.machine_needs_caldera(target_1, caldera_attacks):
target_1.install_caldera_service()
target_1.up()
target_1.reboot() # Kernel changes on system creation require a reboot
needs_reboot = target_1.prime_vulnerabilities()
needs_reboot |= target_1.prime_sensors()
if needs_reboot:
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}rebooting target {tname} ....{CommandlineColors.ENDC}", 1)
target_1.reboot()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1)
self.targets.append(target_1)
def machine_needs_caldera(self, target, caldera_conf):
""" Counts the attacks and plugins needing caldera that are registered for this machine """
def machine_needs_caldera(self, target, caldera_from_cmdline: Optional[list[str]] = None) -> int:
""" Counts the attacks and plugins needing caldera that are registered for this machine
:param target: Target machine we will check the config file for assigned caldera attacks for
:param caldera_from_cmdline: Caldera attacks listed on the commandline
:returns: the number of caldera attacks planned for this machine
"""
c_cmdline = 0
if caldera_conf is not None:
c_cmdline = len(caldera_conf)
if caldera_from_cmdline is not None:
c_cmdline = len(caldera_from_cmdline)
c_conffile = len(self.experiment_config.get_caldera_attacks(target.get_os()))
plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target.get_os())
c_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks)
@ -264,10 +351,13 @@ class Experiment():
# plugin.__set_logger__(self.attack_logger)
plugin.__execute__([target])
def zip_loot(self, zip_this):
""" Zip the loot together """
def zip_loot(self, zip_this: list[str]):
""" Zip the loot together
:param zip_this: A list of file paths to add to the zip file
"""
filename = os.path.join(self.lootdir, self.starttime + ".zip")
filename = os.path.join(self.loot_dir, self.start_time + ".zip")
self.attack_logger.vprint(f"Creating zip file {filename}", 1)
@ -277,11 +367,11 @@ class Experiment():
self.attack_logger.vprint(a_file, 2)
zfh.write(a_file)
zfh.write(os.path.join(self.lootdir, "attack.json"))
zfh.write(os.path.join(self.loot_dir, "attack.json"))
# For automation purpose we copy the file into a standard file name
defaultname = os.path.join(self.lootdir, "..", "most_recent.zip")
shutil.copyfile(filename, defaultname)
default_name = os.path.join(self.loot_dir, "..", "most_recent.zip")
shutil.copyfile(filename, default_name)
def __start_attacker(self):
""" Start the attacking VM """
@ -289,6 +379,9 @@ class Experiment():
# Preparing attacker
self.attacker_1 = Machine(self.experiment_config.attacker(0).raw_config, attack_logger=self.attack_logger)
if self.attacker_1 is None:
raise ServerError
if not self.experiment_config.attacker(0).use_existing_machine():
try:
self.attacker_1.destroy()
@ -303,6 +396,9 @@ class Experiment():
self.attacker_1.install_caldera_server(cleanup=False)
self.attacker_1.start_caldera_server()
if self.attacker_1 is None:
raise ServerError
# self.attacker_1.set_attack_logger(self.attack_logger)
def __stop_attacker(self):

@ -7,7 +7,7 @@ import os
import random
import requests
from pymetasploit3.msfrpc import MsfRpcClient
from pymetasploit3.msfrpc import MsfRpcClient # type: ignore
# from app.machinecontrol import Machine
from app.attack_log import AttackLog
from app.interface_sfx import CommandlineColors
@ -28,9 +28,9 @@ class Metasploit():
:param kwargs: Relevant ones: uri, port, server, username
"""
self.password = password
self.attack_logger = attack_logger
self.username = kwargs.get("username", None)
self.password: str = password
self.attack_logger: AttackLog = attack_logger
self.username: str = kwargs.get("username", None)
self.kwargs = kwargs
self.client = None
@ -65,12 +65,15 @@ class Metasploit():
print(res)
return res
def __msfrpcd_cmd__(self):
return f"killall msfrpcd; nohup msfrpcd -P {self.password} -U {self.username} -S &"
def start_msfrpcd(self):
""" Starts the msfrpcs on the attacker. Metasploit must alredy be installed there ! """
cmd = f"killall msfrpcd; nohup msfrpcd -P {self.password} -U {self.username} -S &"
# cmd = f"killall msfrpcd; nohup msfrpcd -P {self.password} -U {self.username} -S &"
self.attacker.remote_run(cmd, disown=True)
self.attacker.remote_run(self.__msfrpcd_cmd__(), disown=True)
# print("msfrpcd started")
# breakpoint()
time.sleep(3)
@ -97,7 +100,7 @@ class Metasploit():
self.start_msfrpcd()
time.sleep(sleeptime)
sleeptime += 5
print("Failed getting connection to msfrpcd. Retries left: {retries}")
print(f"Failed getting connection to msfrpcd. Retries left: {retries}")
retries -= 1
if self.client is None:

@ -17,6 +17,7 @@ from plugins.base.sensor import SensorPlugin
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
from app.interface_sfx import CommandlineColors
from app.attack_log import AttackLog
from app.exceptions import PluginError
# from app.interface_sfx import CommandlineColors
@ -89,8 +90,11 @@ class PluginManager():
plugins = self.get_plugins(subclass, name_filter)
res = 0
for plugin in plugins:
if plugin.needs_caldera():
res += 1
if isinstance(plugin, AttackPlugin):
if plugin.needs_caldera():
res += 1
else:
raise PluginError("Wrong plugin type. Expected AttackPlugin")
return res
@ -103,8 +107,11 @@ class PluginManager():
plugins = self.get_plugins(subclass, name_filter)
res = 0
for plugin in plugins:
if plugin.needs_metasploit():
res += 1
if isinstance(plugin, AttackPlugin):
if plugin.needs_metasploit():
res += 1
else:
raise PluginError("Wrong plugin type. Expected AttackPlugin")
return res

@ -27,13 +27,15 @@ def run(args):
for line in fh:
line = line.strip()
print(f"Running calder attack {line}")
Experiment(args.configfile, args.verbose, [line])
exp = Experiment(args.configfile, args.verbose)
exp.run([line])
else:
caldera_attack = None
if args.caldera_attack:
caldera_attack = [args.caldera_attack]
Experiment(args.configfile, args.verbose, caldera_attack)
exp = Experiment(args.configfile, args.verbose)
exp.run(caldera_attack)
def create_parser():

@ -0,0 +1,13 @@
# My own Mypy configuration
# Global settings
[mypy]
warn_unused_configs = True
mypy_path = $MYPY_CONFIG_FILE_DIR:$MYPY_CONFIG_FILE_DIR/app:$MYPY_CONFIG_FILE_DIR/plugins/base
# Setting for the main app
[mypy-app.*]
# Setting for the plugins
[mypy-plugins.base.*]

@ -69,8 +69,9 @@ class AttackPlugin(BasePlugin):
:returns: True if this plugin requires Caldera
"""
if Requirement.CALDERA in self.requirements:
return True
if self.requirements is not None:
if Requirement.CALDERA in self.requirements:
return True
return False
def needs_metasploit(self) -> bool:
@ -79,8 +80,9 @@ class AttackPlugin(BasePlugin):
:meta private:
:returns: True if this plugin requires Metasploit
"""
if Requirement.METASPLOIT in self.requirements:
return True
if self.requirements is not None:
if Requirement.METASPLOIT in self.requirements:
return True
return False
def connect_metasploit(self):
@ -130,7 +132,7 @@ class AttackPlugin(BasePlugin):
self.vprint(f" Plugin running command {command}", 3)
res = self.attacker_machine_plugin.__call_remote_run__(command, disown=disown)
res = self.attacker_machine_plugin.remote_run(command, disown=disown)
return res
def targets_run_cmd(self, command: str, disown: bool = False) -> str:
@ -145,7 +147,7 @@ class AttackPlugin(BasePlugin):
self.vprint(f" Plugin running command {command}", 3)
res = self.target_machine_plugin.__call_remote_run__(command, disown=disown)
res = self.target_machine_plugin.remote_run(command, disown=disown)
return res
def set_target_machines(self, machine: MachineryPlugin):
@ -154,7 +156,7 @@ class AttackPlugin(BasePlugin):
:param machine: Machine plugin to communicate with
"""
self.target_machine_plugin = machine.vm_manager
self.target_machine_plugin = machine
def set_attacker_machine(self, machine: MachineryPlugin):
""" Set the machine plugin class to target
@ -162,7 +164,7 @@ class AttackPlugin(BasePlugin):
:param machine: Machine to communicate with
"""
self.attacker_machine_plugin = machine.vm_manager
self.attacker_machine_plugin = machine
def set_caldera(self, caldera: CalderaControl):
""" Set the caldera control to be used for caldera attacks

@ -121,6 +121,19 @@ class MachineryPlugin(BasePlugin):
"""
raise NotImplementedError
def get_paw(self):
""" Returns the paw of the current machine """
return self.config.caldera_paw()
def get_group(self):
""" Returns the group of the current machine """
return self.config.caldera_group()
def get_os(self):
""" Returns the OS of the machine """
return self.config.os()
def get_playground(self):
""" Path on the machine where all the attack tools will be copied to. """

@ -1,12 +1,12 @@
#!/usr/bin/env python3
""" Base class for all plugin types """
from inspect import currentframe
from inspect import currentframe, getframeinfo
import os
from typing import Optional
import yaml
from app.exceptions import PluginError # type: ignore
import app.exceptions # type: ignore
import app.exceptions # type: ignore
class BasePlugin():
@ -73,7 +73,11 @@ class BasePlugin():
"""
cf = currentframe() # pylint: disable=invalid-name
return cf.f_back.filename
if cf is None:
raise PluginError("can not get current frame")
if cf.f_back is None:
raise PluginError("can not get current frame")
return getframeinfo(cf.f_back).filename
def get_linenumber(self) -> int:
""" Returns the current linenumber. This can be used for debugging
@ -81,6 +85,10 @@ class BasePlugin():
:returns: currently executed linenumber
"""
cf = currentframe() # pylint: disable=invalid-name
if cf is None:
raise PluginError("can not get current frame")
if cf.f_back is None:
raise PluginError("can not get current frame")
return cf.f_back.f_lineno
def get_playground(self) -> str:
@ -224,6 +232,9 @@ class BasePlugin():
:returns: The path with the plugin code
"""
if self.plugin_path is None:
raise PluginError("Non existing plugin path")
return os.path.join(os.path.dirname(self.plugin_path))
def get_default_config_filename(self) -> str:

@ -5,8 +5,8 @@ import socket
import time
import paramiko
from fabric import Connection
from invoke.exceptions import UnexpectedExit
from fabric import Connection # type: ignore
from invoke.exceptions import UnexpectedExit # type: ignore
from app.exceptions import NetworkError
from plugins.base.plugin_base import BasePlugin
@ -175,7 +175,7 @@ class SSHFeatures(BasePlugin):
self.vprint(f"SSH GET: No valid connection. Errors: {error.errors}", 1)
do_retry = True
except FileNotFoundError as error:
self.vprint(error, 0)
self.vprint(str(error), 0)
break
except OSError:
self.vprint("SSH GET: Obscure OSError, ignoring (file should have been copied)", 1)

@ -22,7 +22,7 @@ sphinx-revealjs
# sphinx-pydantic # This one has issues that must be fixed upstream first
# Mypy stuff
mypy==0.910
mypy==0.931
types-PyYAML==5.4.6
types-requests==2.25.6
types-simplejson==3.17.0

@ -0,0 +1,65 @@
import unittest
from unittest.mock import patch
from app.metasploit import Metasploit
from app.attack_log import AttackLog
from pymetasploit3.msfrpc import MsfRpcClient
import requests
from app.exceptions import ServerError
import time
# https://docs.python.org/3/library/unittest.html
class FakeAttacker():
def __init__(self):
pass
def remote_run(self, cmd, disown):
pass
def get_ip(self):
return "66.55.44.33"
class TestMetasploit(unittest.TestCase):
def setUp(self) -> None:
with patch.object(time, "sleep") as _:
self.attack_logger = AttackLog(0)
def test_basic_init(self):
with patch.object(time, "sleep") as _:
m = Metasploit("FooBar", self.attack_logger)
self.assertEqual(m.password, "FooBar")
self.assertEqual(m.attack_logger, self.attack_logger)
def test_msfrpcd_cmd(self):
attacker = FakeAttacker()
with patch.object(time, "sleep") as _:
m = Metasploit("FooBar", self.attack_logger, attacker=attacker, username="Pennywise")
self.assertEqual(m.__msfrpcd_cmd__(), "killall msfrpcd; nohup msfrpcd -P FooBar -U Pennywise -S &")
def test_get_client_simple(self):
attacker = FakeAttacker()
with patch.object(time, "sleep") as _:
m = Metasploit("FooBar", self.attack_logger, attacker=attacker, username="Pennywise")
m.client = "Foo"
self.assertEqual(m.get_client(), "Foo")
def test_get_client_success(self):
attacker = FakeAttacker()
with patch.object(time, "sleep") as _:
m = Metasploit("FooBar", self.attack_logger, attacker=attacker, username="Pennywise")
with patch.object(MsfRpcClient, "__init__", return_value=None) as mock_method:
m.get_client()
mock_method.assert_called_once_with("FooBar", attacker=attacker, username="Pennywise", server="66.55.44.33")
def test_get_client_retries(self):
attacker = FakeAttacker()
with patch.object(time, "sleep") as _:
m = Metasploit("FooBar", self.attack_logger, attacker=attacker, username="Pennywise")
with self.assertRaises(ServerError):
with patch.object(MsfRpcClient, "__init__", side_effect=requests.exceptions.ConnectionError()) as mock_method:
m.get_client()
mock_method.assert_called_with("FooBar", attacker=attacker, username="Pennywise", server="66.55.44.33")

@ -36,6 +36,7 @@ deps = -r requirements.txt
bandit
pylint
argcomplete
mypy
commands =
@ -51,6 +52,9 @@ commands =
safety check -r requirements.txt
# Check for common vulnerabilities
bandit -ll -r app/ plugins/ *.py
# Linting
# pylint *.py # currently off. Needs configuration
# pylint check (linter, basic checks only)
pylint --rcfile=pylint.rc app/ plugins/base/ caldera_control.py doc_generator.py experiment_control.py machine_control.py metasploit_control.py plugin_manager.py
# Own plugin checker
python3 ./plugin_manager.py check
# mypy checks (type checker)
mypy --strict-optional app/ plugins/base/
Loading…
Cancel
Save