diff --git a/app/experimentcontrol.py b/app/experimentcontrol.py index f13dc56..baa21b8 100644 --- a/app/experimentcontrol.py +++ b/app/experimentcontrol.py @@ -13,7 +13,7 @@ from typing import Optional from app.attack_log import AttackLog from app.config import ExperimentConfig from app.interface_sfx import CommandlineColors -from app.exceptions import ServerError +from app.exceptions import ServerError, CalderaError, MachineError from app.pluginmanager import PluginManager from app.doc_generator import DocGenerator from app.calderacontrol import CalderaControl @@ -32,12 +32,12 @@ class Experiment(): :param configfile: Path to the configfile to load :param verbosity: verbosity level between 0 and 3 """ - self.start_time: Optional[datetime] = None #: time the experiment started - self.caldera_control: Optional[CalderaControl] = None - self.loot_dir: Optional[str] = None - self.targets = [] + self.start_time: str = datetime.now().strftime("%Y_%m_%d___%H_%M_%S") #: time the experiment started. + self.caldera_control: Optional[CalderaControl] = None #: Controller for Caldera interaction + self.loot_dir: str = "loot" #: Directory to store the loot into. Will be fetched from config + self.targets: list[Machine] = [] #: A list of target machines - self.attacker_1: Optional[Machine] = None + self.attacker_1: Optional[Machine] = None #: The attacker machine self.experiment_config = ExperimentConfig(configfile) self.attack_logger = AttackLog(verbosity) @@ -51,7 +51,8 @@ class Experiment(): :return: """ self.__start_attacker() - + if self.attacker_1 is None: + raise MachineError("Attacker not initialised") caldera_url = "http://" + self.attacker_1.get_ip() + ":8888" self.caldera_control = CalderaControl(caldera_url, attack_logger=self.attack_logger, config=self.experiment_config) # self.caldera_control = CalderaControl("http://" + self.attacker_1.get_ip() + ":8888", self.attack_logger, @@ -65,7 +66,7 @@ class Experiment(): os.makedirs(self.loot_dir) # start target machines - tname = self.start_target_machines(caldera_attacks) + self.start_target_machines(caldera_attacks) # Install vulnerabilities self.install_vulnerabilities() @@ -74,11 +75,11 @@ class Experiment(): self.install_sensor_plugins() # First start of caldera implants - self.first_start_of_caldera_implants(caldera_attacks, tname) + self.first_start_of_caldera_implants(caldera_attacks) self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Contacting caldera agents on all targets ....{CommandlineColors.ENDC}", 1) # Wait until all targets are registered as Caldera targets - self.wait_until_all_targets_have_caldera_implants(caldera_attacks, caldera_url) + self.wait_until_all_targets_have_caldera_implants(caldera_url, caldera_attacks) self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1) # Add running machines to log @@ -118,6 +119,9 @@ class Experiment(): self.zip_loot(zip_this) def run_plugin_attacks(self): + """ Run plugin based attacks + + """ self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1) for target_1 in self.targets: plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os()) @@ -133,7 +137,12 @@ class Experiment(): time.sleep(self.experiment_config.get_nap_time()) self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1) - def run_caldera_attacks(self, caldera_attacks): + def run_caldera_attacks(self, caldera_attacks: Optional[list[str]] = None): + """ Run caldera based attacks + + + :param caldera_attacks: An optional list of caldera attack ids as string + """ self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1) for target_1 in self.targets: if caldera_attacks is None: @@ -146,7 +155,8 @@ class Experiment(): # TODO: Work with snapshots # TODO: If we have several targets in the same group, it is nonsense to attack each one separately. Make this smarter self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with {attack}", 2) - + if self.caldera_control is None: + raise CalderaError("Caldera control not initialised") it_worked = self.caldera_control.attack(paw=target_1.get_paw(), ability_id=attack, group=target_1.get_group(), @@ -163,6 +173,8 @@ class Experiment(): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Restarting caldera server and waiting for clients to re-connect{CommandlineColors.ENDC}", 1) + if self.attacker_1 is None: + raise MachineError("attacker not initialised") self.attacker_1.start_caldera_server() self.attack_logger.vprint( f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", @@ -170,6 +182,8 @@ class Experiment(): time.sleep(self.experiment_config.get_nap_time()) retries = 100 for target_system in self.targets: + if self.caldera_control is None: + raise CalderaError("Caldera is not initialised") if self.machine_needs_caldera(target_system, caldera_attacks) == 0: self.attack_logger.vprint(f"No caldera agent needed for: {target_system.get_paw()} ", 3) continue @@ -190,6 +204,7 @@ class Experiment(): self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1) def add_running_machines_to_log(self): + """ Add machine infos for targets and attacker to the log """ for target in self.targets: i = target.get_machine_info() i["role"] = "target" @@ -198,8 +213,15 @@ class Experiment(): i["role"] = "attacker" self.attack_logger.add_machine_info(i) - def wait_until_all_targets_have_caldera_implants(self, caldera_attacks, caldera_url): + def wait_until_all_targets_have_caldera_implants(self, caldera_url: str, caldera_attacks: Optional[list[str]] = None): + """ + + :param caldera_attacks: a list of command line defined caldera attacks + :param caldera_url: URL of the caldera server + """ for target_1 in self.targets: + if self.caldera_control is None: + raise CalderaError("Caldera is not initialised") running_agents = self.caldera_control.list_paws_of_running_agents() self.attack_logger.vprint(f"Agents currently running: {running_agents}", 2) while target_1.get_paw() not in running_agents: @@ -214,13 +236,17 @@ class Experiment(): time.sleep(120) # Was 30, but maybe there are timing issues running_agents = self.caldera_control.list_paws_of_running_agents() - def first_start_of_caldera_implants(self, caldera_attacks, tname): + def first_start_of_caldera_implants(self, caldera_attacks: Optional[list[str]] = None): + """ Start caldera implant on the targets + + :param caldera_attacks: a list of command line defined caldera attacks + """ at_least_one_caldera_started = False for target_1 in self.targets: if self.machine_needs_caldera(target_1, caldera_attacks): target_1.start_caldera_client() self.attack_logger.vprint( - f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1) + f"{CommandlineColors.OKGREEN}Initial start of caldera client: {target_1.get_name()} {CommandlineColors.ENDC}", 1) else: at_least_one_caldera_started = True if at_least_one_caldera_started: @@ -228,18 +254,28 @@ class Experiment(): # TODO: Smarter wait def install_sensor_plugins(self): + """ Installs sensor plugins on the targets + + """ for a_target in self.targets: self.attack_logger.vprint(f"Installing sensors on {a_target.get_paw()}", 2) a_target.install_sensors() a_target.start_sensors() def install_vulnerabilities(self): + """ Install vulnerabilities on the targets + + """ for a_target in self.targets: self.attack_logger.vprint(f"Installing vulnerabilities on {a_target.get_paw()}", 2) a_target.install_vulnerabilities() a_target.start_vulnerabilities() - def start_target_machines(self, caldera_attacks): + def start_target_machines(self, caldera_attacks: Optional[list[str]] = None): + """ Start target machines + + :param caldera_attacks: Caldera attacks as defined on the command line + """ for target_conf in self.experiment_config.targets(): if not target_conf.is_active(): continue @@ -249,6 +285,10 @@ class Experiment(): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}preparing target {tname} ....{CommandlineColors.ENDC}", 1) target_1 = Machine(target_conf, attack_logger=self.attack_logger) + if target_1 is None: + raise MachineError("Creating target machine failed") + if self.attacker_1 is None: + raise MachineError("Creating attacker machine failed") target_1.set_caldera_server(self.attacker_1.get_ip()) try: if not target_conf.use_existing_machine(): @@ -268,14 +308,18 @@ class Experiment(): target_1.reboot() self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1) self.targets.append(target_1) - return tname - def machine_needs_caldera(self, target, caldera_conf): - """ Counts the attacks and plugins needing caldera that are registered for this machine """ + def machine_needs_caldera(self, target, caldera_from_cmdline: Optional[list[str]] = None) -> int: + """ Counts the attacks and plugins needing caldera that are registered for this machine + + :param target: Target machine we will check the config file for assigned caldera attacks for + :param caldera_from_cmdline: Caldera attacks listed on the commandline + :returns: the number of caldera attacks planned for this machine + """ c_cmdline = 0 - if caldera_conf is not None: - c_cmdline = len(caldera_conf) + if caldera_from_cmdline is not None: + c_cmdline = len(caldera_from_cmdline) c_conffile = len(self.experiment_config.get_caldera_attacks(target.get_os())) plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target.get_os()) c_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks) @@ -307,8 +351,11 @@ class Experiment(): # plugin.__set_logger__(self.attack_logger) plugin.__execute__([target]) - def zip_loot(self, zip_this): - """ Zip the loot together """ + def zip_loot(self, zip_this: list[str]): + """ Zip the loot together + + :param zip_this: A list of file paths to add to the zip file + """ filename = os.path.join(self.loot_dir, self.start_time + ".zip") @@ -323,8 +370,8 @@ class Experiment(): zfh.write(os.path.join(self.loot_dir, "attack.json")) # For automation purpose we copy the file into a standard file name - defaultname = os.path.join(self.loot_dir, "..", "most_recent.zip") - shutil.copyfile(filename, defaultname) + default_name = os.path.join(self.loot_dir, "..", "most_recent.zip") + shutil.copyfile(filename, default_name) def __start_attacker(self): """ Start the attacking VM """