From 848af1a65da31566add384bfb324007fa0636cc1 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Mon, 21 Feb 2022 17:17:00 +0100 Subject: [PATCH 1/7] Starting Metasploit unit tests --- app/metasploit.py | 15 ++++++---- tests/test_metasploit.py | 65 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/app/metasploit.py b/app/metasploit.py index d3da2d1..5aee5d0 100644 --- a/app/metasploit.py +++ b/app/metasploit.py @@ -28,9 +28,9 @@ class Metasploit(): :param kwargs: Relevant ones: uri, port, server, username """ - self.password = password - self.attack_logger = attack_logger - self.username = kwargs.get("username", None) + self.password: str = password + self.attack_logger: AttackLog = attack_logger + self.username: str = kwargs.get("username", None) self.kwargs = kwargs self.client = None @@ -65,12 +65,15 @@ class Metasploit(): print(res) return res + def __msfrpcd_cmd__(self): + return f"killall msfrpcd; nohup msfrpcd -P {self.password} -U {self.username} -S &" + def start_msfrpcd(self): """ Starts the msfrpcs on the attacker. Metasploit must alredy be installed there ! """ - cmd = f"killall msfrpcd; nohup msfrpcd -P {self.password} -U {self.username} -S &" + # cmd = f"killall msfrpcd; nohup msfrpcd -P {self.password} -U {self.username} -S &" - self.attacker.remote_run(cmd, disown=True) + self.attacker.remote_run(self.__msfrpcd_cmd__(), disown=True) # print("msfrpcd started") # breakpoint() time.sleep(3) @@ -97,7 +100,7 @@ class Metasploit(): self.start_msfrpcd() time.sleep(sleeptime) sleeptime += 5 - print("Failed getting connection to msfrpcd. Retries left: {retries}") + print(f"Failed getting connection to msfrpcd. Retries left: {retries}") retries -= 1 if self.client is None: diff --git a/tests/test_metasploit.py b/tests/test_metasploit.py index e69de29..9a3fce8 100644 --- a/tests/test_metasploit.py +++ b/tests/test_metasploit.py @@ -0,0 +1,65 @@ +import unittest +from unittest.mock import patch +from app.metasploit import Metasploit +from app.attack_log import AttackLog +from pymetasploit3.msfrpc import MsfRpcClient +import requests +from app.exceptions import ServerError +import time + +# https://docs.python.org/3/library/unittest.html + + +class FakeAttacker(): + + def __init__(self): + pass + + def remote_run(self, cmd, disown): + pass + + def get_ip(self): + return "66.55.44.33" + + +class TestMetasploit(unittest.TestCase): + + def setUp(self) -> None: + with patch.object(time, "sleep") as _: + self.attack_logger = AttackLog(0) + + def test_basic_init(self): + with patch.object(time, "sleep") as _: + m = Metasploit("FooBar", self.attack_logger) + self.assertEqual(m.password, "FooBar") + self.assertEqual(m.attack_logger, self.attack_logger) + + def test_msfrpcd_cmd(self): + attacker = FakeAttacker() + with patch.object(time, "sleep") as _: + m = Metasploit("FooBar", self.attack_logger, attacker=attacker, username="Pennywise") + self.assertEqual(m.__msfrpcd_cmd__(), "killall msfrpcd; nohup msfrpcd -P FooBar -U Pennywise -S &") + + def test_get_client_simple(self): + attacker = FakeAttacker() + with patch.object(time, "sleep") as _: + m = Metasploit("FooBar", self.attack_logger, attacker=attacker, username="Pennywise") + m.client = "Foo" + self.assertEqual(m.get_client(), "Foo") + + def test_get_client_success(self): + attacker = FakeAttacker() + with patch.object(time, "sleep") as _: + m = Metasploit("FooBar", self.attack_logger, attacker=attacker, username="Pennywise") + with patch.object(MsfRpcClient, "__init__", return_value=None) as mock_method: + m.get_client() + mock_method.assert_called_once_with("FooBar", attacker=attacker, username="Pennywise", server="66.55.44.33") + + def test_get_client_retries(self): + attacker = FakeAttacker() + with patch.object(time, "sleep") as _: + m = Metasploit("FooBar", self.attack_logger, attacker=attacker, username="Pennywise") + with self.assertRaises(ServerError): + with patch.object(MsfRpcClient, "__init__", side_effect=requests.exceptions.ConnectionError()) as mock_method: + m.get_client() + mock_method.assert_called_with("FooBar", attacker=attacker, username="Pennywise", server="66.55.44.33") From 51e75b8b223162030ff0870d01126f10fbbfa1d0 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Tue, 22 Feb 2022 10:21:00 +0100 Subject: [PATCH 2/7] Refactoring round 1 for experimentcontrol. Tested and works --- app/experimentcontrol.py | 281 +++++++++++++++++++++++---------------- experiment_control.py | 6 +- 2 files changed, 169 insertions(+), 118 deletions(-) diff --git a/app/experimentcontrol.py b/app/experimentcontrol.py index 495793e..f13dc56 100644 --- a/app/experimentcontrol.py +++ b/app/experimentcontrol.py @@ -26,21 +26,32 @@ from plugins.base.attack import AttackPlugin class Experiment(): """ Class handling experiments """ - def __init__(self, configfile: str, verbosity=0, caldera_attacks: list = None): + def __init__(self, configfile: str, verbosity=0): """ :param configfile: Path to the configfile to load :param verbosity: verbosity level between 0 and 3 - :param caldera_attacks: an optional argument to override caldera attacks in the config file and run just this one caldera attack. A list of caldera ID """ + self.start_time: Optional[datetime] = None #: time the experiment started + self.caldera_control: Optional[CalderaControl] = None + self.loot_dir: Optional[str] = None + self.targets = [] + self.attacker_1: Optional[Machine] = None self.experiment_config = ExperimentConfig(configfile) self.attack_logger = AttackLog(verbosity) self.plugin_manager = PluginManager(self.attack_logger) + + def run(self, caldera_attacks: list = None): + """ + Run the experiment + + :param caldera_attacks: an optional argument to override caldera attacks in the config file and run just this one caldera attack. A list of caldera ID + :return: + """ self.__start_attacker() - if self.attacker_1 is None: - raise ServerError + caldera_url = "http://" + self.attacker_1.get_ip() + ":8888" self.caldera_control = CalderaControl(caldera_url, attack_logger=self.attack_logger, config=self.experiment_config) # self.caldera_control = CalderaControl("http://" + self.attacker_1.get_ip() + ":8888", self.attack_logger, @@ -49,92 +60,80 @@ class Experiment(): self.attack_logger.vprint(self.caldera_control.kill_all_agents(), 3) self.attack_logger.vprint(self.caldera_control.delete_all_agents(), 3) - self.starttime = datetime.now().strftime("%Y_%m_%d___%H_%M_%S") - self.lootdir = os.path.join(self.experiment_config.loot_dir(), self.starttime) - os.makedirs(self.lootdir) + self.start_time = datetime.now().strftime("%Y_%m_%d___%H_%M_%S") + self.loot_dir = os.path.join(self.experiment_config.loot_dir(), self.start_time) + os.makedirs(self.loot_dir) - self.targets = [] # start target machines - for target_conf in self.experiment_config.targets(): - if not target_conf.is_active(): - continue - - tname = target_conf.vmname() - - self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}preparing target {tname} ....{CommandlineColors.ENDC}", 1) - target_1 = Machine(target_conf, attack_logger=self.attack_logger) - target_1.set_caldera_server(self.attacker_1.get_ip()) - try: - if not target_conf.use_existing_machine(): - target_1.destroy() - except subprocess.CalledProcessError: - # Maybe the machine just does not exist yet - pass - if self.machine_needs_caldera(target_1, caldera_attacks): - target_1.install_caldera_service() - target_1.up() - target_1.reboot() # Kernel changes on system creation require a reboot - needs_reboot = target_1.prime_vulnerabilities() - needs_reboot |= target_1.prime_sensors() - if needs_reboot: - self.attack_logger.vprint( - f"{CommandlineColors.OKBLUE}rebooting target {tname} ....{CommandlineColors.ENDC}", 1) - target_1.reboot() - self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1) - self.targets.append(target_1) + tname = self.start_target_machines(caldera_attacks) # Install vulnerabilities - for a_target in self.targets: - self.attack_logger.vprint(f"Installing vulnerabilities on {a_target.get_paw()}", 2) - a_target.install_vulnerabilities() - a_target.start_vulnerabilities() + self.install_vulnerabilities() # Install sensor plugins - for a_target in self.targets: - self.attack_logger.vprint(f"Installing sensors on {a_target.get_paw()}", 2) - a_target.install_sensors() - a_target.start_sensors() + self.install_sensor_plugins() # First start of caldera implants - at_least_one_caldera_started = False - for target_1 in self.targets: - if self.machine_needs_caldera(target_1, caldera_attacks): - target_1.start_caldera_client() - self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1) - else: - at_least_one_caldera_started = True - if at_least_one_caldera_started: - time.sleep(20) # Wait for all the clients to contact the caldera server - # TODO: Smarter wait + self.first_start_of_caldera_implants(caldera_attacks, tname) self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Contacting caldera agents on all targets ....{CommandlineColors.ENDC}", 1) # Wait until all targets are registered as Caldera targets - for target_1 in self.targets: - running_agents = self.caldera_control.list_paws_of_running_agents() - self.attack_logger.vprint(f"Agents currently running: {running_agents}", 2) - while target_1.get_paw() not in running_agents: - if self.machine_needs_caldera(target_1, caldera_attacks) == 0: - self.attack_logger.vprint(f"No caldera agent needed for: {target_1.get_paw()} ", 3) - break - self.attack_logger.vprint(f"Connecting to caldera {caldera_url}, running agents are: {running_agents}", 3) - self.attack_logger.vprint(f"Missing agent: {target_1.get_paw()} ...", 3) - target_1.start_caldera_client() - self.attack_logger.vprint(f"Restarted caldera agent: {target_1.get_paw()} ...", 3) - time.sleep(120) # Was 30, but maybe there are timing issues - running_agents = self.caldera_control.list_paws_of_running_agents() + self.wait_until_all_targets_have_caldera_implants(caldera_attacks, caldera_url) self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1) # Add running machines to log - for target in self.targets: - i = target.get_machine_info() - i["role"] = "target" - self.attack_logger.add_machine_info(i) - - i = self.attacker_1.get_machine_info() - i["role"] = "attacker" - self.attack_logger.add_machine_info(i) + self.add_running_machines_to_log() # Attack them + self.run_caldera_attacks(caldera_attacks) + + # Run plugin based attacks + self.run_plugin_attacks() + + # Stop sensor plugins + # Collect data + zip_this = [] + for a_target in self.targets: + a_target.stop_sensors() + zip_this += a_target.collect_sensors(self.loot_dir) + + # Uninstall vulnerabilities + for a_target in self.targets: + self.attack_logger.vprint(f"{CommandlineColors.OKBLUE} Uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1) + a_target.stop_vulnerabilities() + self.attack_logger.vprint(f"{CommandlineColors.OKGREEN} Done uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1) + + # Stop target machines + for target_1 in self.targets: + target_1.halt() + self.__stop_attacker() + + self.attack_logger.post_process() + attack_log_file_path = os.path.join(self.loot_dir, "attack.json") + self.attack_logger.write_json(attack_log_file_path) + document_generator = DocGenerator() + document_generator.generate(attack_log_file_path) + document_generator.compile_documentation() + zip_this += document_generator.get_outfile_paths() + self.zip_loot(zip_this) + + def run_plugin_attacks(self): + self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1) + for target_1 in self.targets: + plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os()) + metasploit_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks) + print(f"Plugins needing metasploit for {target_1.get_paw()} : {metasploit_plugins}") + for attack in plugin_based_attacks: + # TODO: Work with snapshots + self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with attack: {attack}", 1) + + self.attack(target_1, attack) + self.attack_logger.vprint( + f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", 3) + time.sleep(self.experiment_config.get_nap_time()) + self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1) + + def run_caldera_attacks(self, caldera_attacks): self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1) for target_1 in self.targets: if caldera_attacks is None: @@ -161,9 +160,13 @@ class Experiment(): # Fix: Caldera sometimes gets stuck. This is why we better re-start the caldera server and wait till all the implants re-connected # Reason: In some scenarios we keep the infra up for hours or days. No re-creation like intended. This can cause Caldera to hick up if it_worked: - self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Restarting caldera server and waiting for clients to re-connect{CommandlineColors.ENDC}", 1) + self.attack_logger.vprint( + f"{CommandlineColors.OKBLUE}Restarting caldera server and waiting for clients to re-connect{CommandlineColors.ENDC}", + 1) self.attacker_1.start_caldera_server() - self.attack_logger.vprint(f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", 2) + self.attack_logger.vprint( + f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", + 2) time.sleep(self.experiment_config.get_nap_time()) retries = 100 for target_system in self.targets: @@ -176,56 +179,96 @@ class Experiment(): time.sleep(1) running_agents = self.caldera_control.list_paws_of_running_agents() retries -= 1 - self.attack_logger.vprint(f"Waiting for clients to re-connect ({retries}, {running_agents}) ", 3) + self.attack_logger.vprint( + f"Waiting for clients to re-connect ({retries}, {running_agents}) ", 3) if retries <= 0: raise ServerError - self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Restarted caldera server clients re-connected{CommandlineColors.ENDC}", 1) + self.attack_logger.vprint( + f"{CommandlineColors.OKGREEN}Restarted caldera server clients re-connected{CommandlineColors.ENDC}", + 1) # End of fix - self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1) - # Run plugin based attacks - self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1) - for target_1 in self.targets: - plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os()) - metasploit_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks) - print(f"Plugins needing metasploit for {target_1.get_paw()} : {metasploit_plugins}") - for attack in plugin_based_attacks: - # TODO: Work with snapshots - self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with attack: {attack}", 1) + def add_running_machines_to_log(self): + for target in self.targets: + i = target.get_machine_info() + i["role"] = "target" + self.attack_logger.add_machine_info(i) + i = self.attacker_1.get_machine_info() + i["role"] = "attacker" + self.attack_logger.add_machine_info(i) - self.attack(target_1, attack) - self.attack_logger.vprint(f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", 3) - time.sleep(self.experiment_config.get_nap_time()) + def wait_until_all_targets_have_caldera_implants(self, caldera_attacks, caldera_url): + for target_1 in self.targets: + running_agents = self.caldera_control.list_paws_of_running_agents() + self.attack_logger.vprint(f"Agents currently running: {running_agents}", 2) + while target_1.get_paw() not in running_agents: + if self.machine_needs_caldera(target_1, caldera_attacks) == 0: + self.attack_logger.vprint(f"No caldera agent needed for: {target_1.get_paw()} ", 3) + break + self.attack_logger.vprint(f"Connecting to caldera {caldera_url}, running agents are: {running_agents}", + 3) + self.attack_logger.vprint(f"Missing agent: {target_1.get_paw()} ...", 3) + target_1.start_caldera_client() + self.attack_logger.vprint(f"Restarted caldera agent: {target_1.get_paw()} ...", 3) + time.sleep(120) # Was 30, but maybe there are timing issues + running_agents = self.caldera_control.list_paws_of_running_agents() - self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1) + def first_start_of_caldera_implants(self, caldera_attacks, tname): + at_least_one_caldera_started = False + for target_1 in self.targets: + if self.machine_needs_caldera(target_1, caldera_attacks): + target_1.start_caldera_client() + self.attack_logger.vprint( + f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1) + else: + at_least_one_caldera_started = True + if at_least_one_caldera_started: + time.sleep(20) # Wait for all the clients to contact the caldera server + # TODO: Smarter wait - # Stop sensor plugins - # Collect data - zip_this = [] + def install_sensor_plugins(self): for a_target in self.targets: - a_target.stop_sensors() - zip_this += a_target.collect_sensors(self.lootdir) + self.attack_logger.vprint(f"Installing sensors on {a_target.get_paw()}", 2) + a_target.install_sensors() + a_target.start_sensors() - # Uninstall vulnerabilities + def install_vulnerabilities(self): for a_target in self.targets: - self.attack_logger.vprint(f"{CommandlineColors.OKBLUE} Uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1) - a_target.stop_vulnerabilities() - self.attack_logger.vprint(f"{CommandlineColors.OKGREEN} Done uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1) + self.attack_logger.vprint(f"Installing vulnerabilities on {a_target.get_paw()}", 2) + a_target.install_vulnerabilities() + a_target.start_vulnerabilities() - # Stop target machines - for target_1 in self.targets: - target_1.halt() - self.__stop_attacker() + def start_target_machines(self, caldera_attacks): + for target_conf in self.experiment_config.targets(): + if not target_conf.is_active(): + continue - self.attack_logger.post_process() - attack_log_file_path = os.path.join(self.lootdir, "attack.json") - self.attack_logger.write_json(attack_log_file_path) - document_generator = DocGenerator() - document_generator.generate(attack_log_file_path) - document_generator.compile_documentation() - zip_this += document_generator.get_outfile_paths() - self.zip_loot(zip_this) + tname = target_conf.vmname() + + self.attack_logger.vprint( + f"{CommandlineColors.OKBLUE}preparing target {tname} ....{CommandlineColors.ENDC}", 1) + target_1 = Machine(target_conf, attack_logger=self.attack_logger) + target_1.set_caldera_server(self.attacker_1.get_ip()) + try: + if not target_conf.use_existing_machine(): + target_1.destroy() + except subprocess.CalledProcessError: + # Maybe the machine just does not exist yet + pass + if self.machine_needs_caldera(target_1, caldera_attacks): + target_1.install_caldera_service() + target_1.up() + target_1.reboot() # Kernel changes on system creation require a reboot + needs_reboot = target_1.prime_vulnerabilities() + needs_reboot |= target_1.prime_sensors() + if needs_reboot: + self.attack_logger.vprint( + f"{CommandlineColors.OKBLUE}rebooting target {tname} ....{CommandlineColors.ENDC}", 1) + target_1.reboot() + self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1) + self.targets.append(target_1) + return tname def machine_needs_caldera(self, target, caldera_conf): """ Counts the attacks and plugins needing caldera that are registered for this machine """ @@ -267,7 +310,7 @@ class Experiment(): def zip_loot(self, zip_this): """ Zip the loot together """ - filename = os.path.join(self.lootdir, self.starttime + ".zip") + filename = os.path.join(self.loot_dir, self.start_time + ".zip") self.attack_logger.vprint(f"Creating zip file {filename}", 1) @@ -277,10 +320,10 @@ class Experiment(): self.attack_logger.vprint(a_file, 2) zfh.write(a_file) - zfh.write(os.path.join(self.lootdir, "attack.json")) + zfh.write(os.path.join(self.loot_dir, "attack.json")) # For automation purpose we copy the file into a standard file name - defaultname = os.path.join(self.lootdir, "..", "most_recent.zip") + defaultname = os.path.join(self.loot_dir, "..", "most_recent.zip") shutil.copyfile(filename, defaultname) def __start_attacker(self): @@ -289,6 +332,9 @@ class Experiment(): # Preparing attacker self.attacker_1 = Machine(self.experiment_config.attacker(0).raw_config, attack_logger=self.attack_logger) + if self.attacker_1 is None: + raise ServerError + if not self.experiment_config.attacker(0).use_existing_machine(): try: self.attacker_1.destroy() @@ -303,6 +349,9 @@ class Experiment(): self.attacker_1.install_caldera_server(cleanup=False) self.attacker_1.start_caldera_server() + + if self.attacker_1 is None: + raise ServerError # self.attacker_1.set_attack_logger(self.attack_logger) def __stop_attacker(self): diff --git a/experiment_control.py b/experiment_control.py index 2003655..c355019 100755 --- a/experiment_control.py +++ b/experiment_control.py @@ -27,13 +27,15 @@ def run(args): for line in fh: line = line.strip() print(f"Running calder attack {line}") - Experiment(args.configfile, args.verbose, [line]) + exp = Experiment(args.configfile, args.verbose) + exp.run([line]) else: caldera_attack = None if args.caldera_attack: caldera_attack = [args.caldera_attack] - Experiment(args.configfile, args.verbose, caldera_attack) + exp = Experiment(args.configfile, args.verbose) + exp.run(caldera_attack) def create_parser(): From 5c70b7202b2c8e62ba187f8d1b3b46d4454af868 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Wed, 23 Feb 2022 12:40:53 +0100 Subject: [PATCH 3/7] Make code survive a mypy scan --- app/__init__.py | 0 app/attack_log.py | 13 +++++++++++-- app/calderaapi_4.py | 14 +++++++------- app/calderacontrol.py | 27 ++++++++++++++++++++------- app/config.py | 7 ++++--- app/config_verifier.py | 6 +++--- app/exceptions.py | 4 ++++ app/metasploit.py | 2 +- app/pluginmanager.py | 15 +++++++++++---- plugins/__init__.py | 0 plugins/base/__init__.py | 0 plugins/base/attack.py | 18 ++++++++++-------- plugins/base/machinery.py | 13 +++++++++++++ plugins/base/plugin_base.py | 17 ++++++++++++++--- plugins/base/ssh_features.py | 6 +++--- 15 files changed, 101 insertions(+), 41 deletions(-) create mode 100644 app/__init__.py create mode 100644 plugins/__init__.py create mode 100644 plugins/base/__init__.py diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/attack_log.py b/app/attack_log.py index 57ff022..6f391d1 100644 --- a/app/attack_log.py +++ b/app/attack_log.py @@ -510,6 +510,15 @@ class AttackLog(): logid = timestamp + "_" + str(randint(1, 100000)) cframe = currentframe() + default_sourcefile = "" + if cframe is not None: + if cframe.f_back is not None: + default_sourcefile = getsourcefile(cframe.f_back) or "" + + default_sourceline = -1 + if cframe is not None: + if cframe.f_back is not None: + default_sourceline = cframe.f_back.f_lineno data = {"timestamp": timestamp, "timestamp_end": None, @@ -528,8 +537,8 @@ class AttackLog(): "situation_description": kwargs.get("situation_description", None), # Description for the situation this attack was run in. Set by the plugin or attacker emulation "countermeasure": kwargs.get("countermeasure", None), # Set by the attack "result": None, - "sourcefile": kwargs.get("sourcefile", getsourcefile(cframe.f_back)), - "sourceline": kwargs.get("sourceline", cframe.f_back.f_lineno) + "sourcefile": kwargs.get("sourcefile", default_sourcefile), + "sourceline": kwargs.get("sourceline", default_sourceline) } self.__add_to_log__(data) diff --git a/app/calderaapi_4.py b/app/calderaapi_4.py index 712d9d2..e3e0e80 100644 --- a/app/calderaapi_4.py +++ b/app/calderaapi_4.py @@ -5,7 +5,7 @@ import json from pprint import pformat -from typing import Optional, Union +from typing import Optional, Union, Annotated import requests import simplejson from pydantic.dataclasses import dataclass @@ -104,9 +104,9 @@ class Ability: @dataclass -class AbilityList: +class AbilityList(): """ A list of exploits """ - abilities: conlist(Ability, min_items=1) + abilities: Annotated[list, conlist(Ability, min_items=1)] def get_data(self): return self.abilities @@ -123,7 +123,7 @@ class Obfuscator: @dataclass class ObfuscatorList: """ A list of obfuscators """ - obfuscators: conlist(Obfuscator, min_items=1) + obfuscators: Annotated[list, conlist(Obfuscator, min_items=1)] def get_data(self): return self.obfuscators @@ -152,7 +152,7 @@ class Adversary: @dataclass class AdversaryList: """ A list of adversary """ - adversaries: conlist(Adversary, min_items=1) + adversaries: Annotated[list, conlist(Adversary, min_items=1)] def get_data(self): return self.adversaries @@ -396,7 +396,7 @@ class Operation: @dataclass class OperationList: - operations: conlist(Operation) + operations: Annotated[list, conlist(Operation)] def get_data(self): return self.operations @@ -404,7 +404,7 @@ class OperationList: @dataclass class ObjectiveList: - objectives: conlist(Objective) + objectives: Annotated[list, conlist(Objective)] def get_data(self): return self.objectives diff --git a/app/calderacontrol.py b/app/calderacontrol.py index ab8bf4b..df11d52 100644 --- a/app/calderacontrol.py +++ b/app/calderacontrol.py @@ -66,11 +66,14 @@ class CalderaControl(CalderaAPI): return {} res = {} - for i in source.get("facts"): - res[i.get("trait")] = {"value": i.get("value"), - "technique_id": i.get("technique_id"), - "collected_by": i.get("collected_by") - } + if source is not None: + facts = source.get("facts") + if facts is not None: + for fact in facts: + res[fact.get("trait")] = {"value": fact.get("value"), + "technique_id": fact.get("technique_id"), + "collected_by": fact.get("collected_by") + } return res def list_paws_of_running_agents(self) -> list[str]: @@ -344,7 +347,12 @@ class CalderaControl(CalderaAPI): return False self.add_adversary(adversary_name, ability_id) - adid = self.get_adversary(adversary_name).get("adversary_id") + adversary = self.get_adversary(adversary_name) + if adversary is None: + raise CalderaError("Could not get adversary") + adid = adversary.get("adversary_id", None) + if adid is None: + raise CalderaError("Could not get adversary by id") logid = self.attack_logger.start_caldera_attack(source=self.url, paw=paw, @@ -370,7 +378,12 @@ class CalderaControl(CalderaAPI): ) self.attack_logger.vprint(pformat(res), 3) - opid = self.get_operation(operation_name).get("id") + operation = self.get_operation(operation_name) + if operation is None: + raise CalderaError("Was not able to get operation") + opid = operation.get("id") + if opid is None: + raise CalderaError("Was not able to get operation. Broken ID") self.attack_logger.vprint("New operation created. OpID: " + str(opid), 3) self.set_operation_state(opid) diff --git a/app/config.py b/app/config.py index 4fda843..64a2e88 100644 --- a/app/config.py +++ b/app/config.py @@ -151,7 +151,7 @@ class ExperimentConfig(): :param configfile: The configuration file to process """ - self.raw_config: MainConfig = None + self.raw_config: Optional[MainConfig] = None self._targets: list[MachineConfig] = [] self._attackers: list[MachineConfig] = [] self.load(configfile) @@ -232,9 +232,10 @@ class ExperimentConfig(): if self.raw_config is None: raise ConfigurationError("Config file is empty") - + res = {} try: - res = self.raw_config.attack_conf[attack] + if self.raw_config.attack_conf is not None: + res = self.raw_config.attack_conf[attack] except KeyError: res = {} if res is None: diff --git a/app/config_verifier.py b/app/config_verifier.py index 3bdf90f..367af4e 100644 --- a/app/config_verifier.py +++ b/app/config_verifier.py @@ -106,7 +106,7 @@ class Target: ssh_user: Optional[str] = None ssh_password: Optional[str] = None ssh_keyfile: Optional[str] = None - vulnerabilities: list[str] = None + vulnerabilities: Optional[list[str]] = None def has_key(self, keyname): """ Checks if a key exists @@ -182,8 +182,8 @@ class Results: class MainConfig: """ Central configuration for PurpleDome """ caldera: CalderaConfig - attackers: conlist(Attacker, min_items=1) - targets: conlist(Target, min_items=1) + attackers: conlist(Attacker, min_items=1) # type: ignore + targets: conlist(Target, min_items=1) # type: ignore attacks: AttackConfig caldera_attacks: AttackList plugin_based_attacks: AttackList diff --git a/app/exceptions.py b/app/exceptions.py index 181dba1..55ce047 100644 --- a/app/exceptions.py +++ b/app/exceptions.py @@ -28,3 +28,7 @@ class MetasploitError(Exception): class RequirementError(Exception): """ Plugin requirements not fulfilled """ + + +class MachineError(Exception): + """ A virtual machine has issues""" diff --git a/app/metasploit.py b/app/metasploit.py index 5aee5d0..8a56cd2 100644 --- a/app/metasploit.py +++ b/app/metasploit.py @@ -7,7 +7,7 @@ import os import random import requests -from pymetasploit3.msfrpc import MsfRpcClient +from pymetasploit3.msfrpc import MsfRpcClient # type: ignore # from app.machinecontrol import Machine from app.attack_log import AttackLog from app.interface_sfx import CommandlineColors diff --git a/app/pluginmanager.py b/app/pluginmanager.py index 00d44df..fe86282 100644 --- a/app/pluginmanager.py +++ b/app/pluginmanager.py @@ -17,6 +17,7 @@ from plugins.base.sensor import SensorPlugin from plugins.base.vulnerability_plugin import VulnerabilityPlugin from app.interface_sfx import CommandlineColors from app.attack_log import AttackLog +from app.exceptions import PluginError # from app.interface_sfx import CommandlineColors @@ -89,8 +90,11 @@ class PluginManager(): plugins = self.get_plugins(subclass, name_filter) res = 0 for plugin in plugins: - if plugin.needs_caldera(): - res += 1 + if isinstance(plugin, AttackPlugin): + if plugin.needs_caldera(): + res += 1 + else: + raise PluginError("Wrong plugin type. Expected AttackPlugin") return res @@ -103,8 +107,11 @@ class PluginManager(): plugins = self.get_plugins(subclass, name_filter) res = 0 for plugin in plugins: - if plugin.needs_metasploit(): - res += 1 + if isinstance(plugin, AttackPlugin): + if plugin.needs_metasploit(): + res += 1 + else: + raise PluginError("Wrong plugin type. Expected AttackPlugin") return res diff --git a/plugins/__init__.py b/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/base/__init__.py b/plugins/base/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/base/attack.py b/plugins/base/attack.py index a2d49a5..da6c2ec 100644 --- a/plugins/base/attack.py +++ b/plugins/base/attack.py @@ -69,8 +69,9 @@ class AttackPlugin(BasePlugin): :returns: True if this plugin requires Caldera """ - if Requirement.CALDERA in self.requirements: - return True + if self.requirements is not None: + if Requirement.CALDERA in self.requirements: + return True return False def needs_metasploit(self) -> bool: @@ -79,8 +80,9 @@ class AttackPlugin(BasePlugin): :meta private: :returns: True if this plugin requires Metasploit """ - if Requirement.METASPLOIT in self.requirements: - return True + if self.requirements is not None: + if Requirement.METASPLOIT in self.requirements: + return True return False def connect_metasploit(self): @@ -130,7 +132,7 @@ class AttackPlugin(BasePlugin): self.vprint(f" Plugin running command {command}", 3) - res = self.attacker_machine_plugin.__call_remote_run__(command, disown=disown) + res = self.attacker_machine_plugin.remote_run(command, disown=disown) return res def targets_run_cmd(self, command: str, disown: bool = False) -> str: @@ -145,7 +147,7 @@ class AttackPlugin(BasePlugin): self.vprint(f" Plugin running command {command}", 3) - res = self.target_machine_plugin.__call_remote_run__(command, disown=disown) + res = self.target_machine_plugin.remote_run(command, disown=disown) return res def set_target_machines(self, machine: MachineryPlugin): @@ -154,7 +156,7 @@ class AttackPlugin(BasePlugin): :param machine: Machine plugin to communicate with """ - self.target_machine_plugin = machine.vm_manager + self.target_machine_plugin = machine def set_attacker_machine(self, machine: MachineryPlugin): """ Set the machine plugin class to target @@ -162,7 +164,7 @@ class AttackPlugin(BasePlugin): :param machine: Machine to communicate with """ - self.attacker_machine_plugin = machine.vm_manager + self.attacker_machine_plugin = machine def set_caldera(self, caldera: CalderaControl): """ Set the caldera control to be used for caldera attacks diff --git a/plugins/base/machinery.py b/plugins/base/machinery.py index e874b33..7130380 100644 --- a/plugins/base/machinery.py +++ b/plugins/base/machinery.py @@ -121,6 +121,19 @@ class MachineryPlugin(BasePlugin): """ raise NotImplementedError + def get_paw(self): + """ Returns the paw of the current machine """ + return self.config.caldera_paw() + + def get_group(self): + """ Returns the group of the current machine """ + return self.config.caldera_group() + + def get_os(self): + """ Returns the OS of the machine """ + + return self.config.os() + def get_playground(self): """ Path on the machine where all the attack tools will be copied to. """ diff --git a/plugins/base/plugin_base.py b/plugins/base/plugin_base.py index f7a7ec4..5566e67 100644 --- a/plugins/base/plugin_base.py +++ b/plugins/base/plugin_base.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 """ Base class for all plugin types """ -from inspect import currentframe +from inspect import currentframe, getframeinfo import os from typing import Optional import yaml from app.exceptions import PluginError # type: ignore -import app.exceptions # type: ignore +import app.exceptions # type: ignore class BasePlugin(): @@ -73,7 +73,11 @@ class BasePlugin(): """ cf = currentframe() # pylint: disable=invalid-name - return cf.f_back.filename + if cf is None: + raise PluginError("can not get current frame") + if cf.f_back is None: + raise PluginError("can not get current frame") + return getframeinfo(cf.f_back).filename def get_linenumber(self) -> int: """ Returns the current linenumber. This can be used for debugging @@ -81,6 +85,10 @@ class BasePlugin(): :returns: currently executed linenumber """ cf = currentframe() # pylint: disable=invalid-name + if cf is None: + raise PluginError("can not get current frame") + if cf.f_back is None: + raise PluginError("can not get current frame") return cf.f_back.f_lineno def get_playground(self) -> str: @@ -224,6 +232,9 @@ class BasePlugin(): :returns: The path with the plugin code """ + if self.plugin_path is None: + raise PluginError("Non existing plugin path") + return os.path.join(os.path.dirname(self.plugin_path)) def get_default_config_filename(self) -> str: diff --git a/plugins/base/ssh_features.py b/plugins/base/ssh_features.py index 144693f..79a2d25 100644 --- a/plugins/base/ssh_features.py +++ b/plugins/base/ssh_features.py @@ -5,8 +5,8 @@ import socket import time import paramiko -from fabric import Connection -from invoke.exceptions import UnexpectedExit +from fabric import Connection # type: ignore +from invoke.exceptions import UnexpectedExit # type: ignore from app.exceptions import NetworkError from plugins.base.plugin_base import BasePlugin @@ -175,7 +175,7 @@ class SSHFeatures(BasePlugin): self.vprint(f"SSH GET: No valid connection. Errors: {error.errors}", 1) do_retry = True except FileNotFoundError as error: - self.vprint(error, 0) + self.vprint(str(error), 0) break except OSError: self.vprint("SSH GET: Obscure OSError, ignoring (file should have been copied)", 1) From ca6fb401fdf387fbbecba91fe35af7c59c0c9b3f Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Wed, 23 Feb 2022 12:41:22 +0100 Subject: [PATCH 4/7] Split experimentcontrol init function --- app/experimentcontrol.py | 97 +++++++++++++++++++++++++++++----------- 1 file changed, 72 insertions(+), 25 deletions(-) diff --git a/app/experimentcontrol.py b/app/experimentcontrol.py index f13dc56..baa21b8 100644 --- a/app/experimentcontrol.py +++ b/app/experimentcontrol.py @@ -13,7 +13,7 @@ from typing import Optional from app.attack_log import AttackLog from app.config import ExperimentConfig from app.interface_sfx import CommandlineColors -from app.exceptions import ServerError +from app.exceptions import ServerError, CalderaError, MachineError from app.pluginmanager import PluginManager from app.doc_generator import DocGenerator from app.calderacontrol import CalderaControl @@ -32,12 +32,12 @@ class Experiment(): :param configfile: Path to the configfile to load :param verbosity: verbosity level between 0 and 3 """ - self.start_time: Optional[datetime] = None #: time the experiment started - self.caldera_control: Optional[CalderaControl] = None - self.loot_dir: Optional[str] = None - self.targets = [] + self.start_time: str = datetime.now().strftime("%Y_%m_%d___%H_%M_%S") #: time the experiment started. + self.caldera_control: Optional[CalderaControl] = None #: Controller for Caldera interaction + self.loot_dir: str = "loot" #: Directory to store the loot into. Will be fetched from config + self.targets: list[Machine] = [] #: A list of target machines - self.attacker_1: Optional[Machine] = None + self.attacker_1: Optional[Machine] = None #: The attacker machine self.experiment_config = ExperimentConfig(configfile) self.attack_logger = AttackLog(verbosity) @@ -51,7 +51,8 @@ class Experiment(): :return: """ self.__start_attacker() - + if self.attacker_1 is None: + raise MachineError("Attacker not initialised") caldera_url = "http://" + self.attacker_1.get_ip() + ":8888" self.caldera_control = CalderaControl(caldera_url, attack_logger=self.attack_logger, config=self.experiment_config) # self.caldera_control = CalderaControl("http://" + self.attacker_1.get_ip() + ":8888", self.attack_logger, @@ -65,7 +66,7 @@ class Experiment(): os.makedirs(self.loot_dir) # start target machines - tname = self.start_target_machines(caldera_attacks) + self.start_target_machines(caldera_attacks) # Install vulnerabilities self.install_vulnerabilities() @@ -74,11 +75,11 @@ class Experiment(): self.install_sensor_plugins() # First start of caldera implants - self.first_start_of_caldera_implants(caldera_attacks, tname) + self.first_start_of_caldera_implants(caldera_attacks) self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Contacting caldera agents on all targets ....{CommandlineColors.ENDC}", 1) # Wait until all targets are registered as Caldera targets - self.wait_until_all_targets_have_caldera_implants(caldera_attacks, caldera_url) + self.wait_until_all_targets_have_caldera_implants(caldera_url, caldera_attacks) self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1) # Add running machines to log @@ -118,6 +119,9 @@ class Experiment(): self.zip_loot(zip_this) def run_plugin_attacks(self): + """ Run plugin based attacks + + """ self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1) for target_1 in self.targets: plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os()) @@ -133,7 +137,12 @@ class Experiment(): time.sleep(self.experiment_config.get_nap_time()) self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1) - def run_caldera_attacks(self, caldera_attacks): + def run_caldera_attacks(self, caldera_attacks: Optional[list[str]] = None): + """ Run caldera based attacks + + + :param caldera_attacks: An optional list of caldera attack ids as string + """ self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1) for target_1 in self.targets: if caldera_attacks is None: @@ -146,7 +155,8 @@ class Experiment(): # TODO: Work with snapshots # TODO: If we have several targets in the same group, it is nonsense to attack each one separately. Make this smarter self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with {attack}", 2) - + if self.caldera_control is None: + raise CalderaError("Caldera control not initialised") it_worked = self.caldera_control.attack(paw=target_1.get_paw(), ability_id=attack, group=target_1.get_group(), @@ -163,6 +173,8 @@ class Experiment(): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}Restarting caldera server and waiting for clients to re-connect{CommandlineColors.ENDC}", 1) + if self.attacker_1 is None: + raise MachineError("attacker not initialised") self.attacker_1.start_caldera_server() self.attack_logger.vprint( f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", @@ -170,6 +182,8 @@ class Experiment(): time.sleep(self.experiment_config.get_nap_time()) retries = 100 for target_system in self.targets: + if self.caldera_control is None: + raise CalderaError("Caldera is not initialised") if self.machine_needs_caldera(target_system, caldera_attacks) == 0: self.attack_logger.vprint(f"No caldera agent needed for: {target_system.get_paw()} ", 3) continue @@ -190,6 +204,7 @@ class Experiment(): self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1) def add_running_machines_to_log(self): + """ Add machine infos for targets and attacker to the log """ for target in self.targets: i = target.get_machine_info() i["role"] = "target" @@ -198,8 +213,15 @@ class Experiment(): i["role"] = "attacker" self.attack_logger.add_machine_info(i) - def wait_until_all_targets_have_caldera_implants(self, caldera_attacks, caldera_url): + def wait_until_all_targets_have_caldera_implants(self, caldera_url: str, caldera_attacks: Optional[list[str]] = None): + """ + + :param caldera_attacks: a list of command line defined caldera attacks + :param caldera_url: URL of the caldera server + """ for target_1 in self.targets: + if self.caldera_control is None: + raise CalderaError("Caldera is not initialised") running_agents = self.caldera_control.list_paws_of_running_agents() self.attack_logger.vprint(f"Agents currently running: {running_agents}", 2) while target_1.get_paw() not in running_agents: @@ -214,13 +236,17 @@ class Experiment(): time.sleep(120) # Was 30, but maybe there are timing issues running_agents = self.caldera_control.list_paws_of_running_agents() - def first_start_of_caldera_implants(self, caldera_attacks, tname): + def first_start_of_caldera_implants(self, caldera_attacks: Optional[list[str]] = None): + """ Start caldera implant on the targets + + :param caldera_attacks: a list of command line defined caldera attacks + """ at_least_one_caldera_started = False for target_1 in self.targets: if self.machine_needs_caldera(target_1, caldera_attacks): target_1.start_caldera_client() self.attack_logger.vprint( - f"{CommandlineColors.OKGREEN}Initial start of caldera client: {tname} {CommandlineColors.ENDC}", 1) + f"{CommandlineColors.OKGREEN}Initial start of caldera client: {target_1.get_name()} {CommandlineColors.ENDC}", 1) else: at_least_one_caldera_started = True if at_least_one_caldera_started: @@ -228,18 +254,28 @@ class Experiment(): # TODO: Smarter wait def install_sensor_plugins(self): + """ Installs sensor plugins on the targets + + """ for a_target in self.targets: self.attack_logger.vprint(f"Installing sensors on {a_target.get_paw()}", 2) a_target.install_sensors() a_target.start_sensors() def install_vulnerabilities(self): + """ Install vulnerabilities on the targets + + """ for a_target in self.targets: self.attack_logger.vprint(f"Installing vulnerabilities on {a_target.get_paw()}", 2) a_target.install_vulnerabilities() a_target.start_vulnerabilities() - def start_target_machines(self, caldera_attacks): + def start_target_machines(self, caldera_attacks: Optional[list[str]] = None): + """ Start target machines + + :param caldera_attacks: Caldera attacks as defined on the command line + """ for target_conf in self.experiment_config.targets(): if not target_conf.is_active(): continue @@ -249,6 +285,10 @@ class Experiment(): self.attack_logger.vprint( f"{CommandlineColors.OKBLUE}preparing target {tname} ....{CommandlineColors.ENDC}", 1) target_1 = Machine(target_conf, attack_logger=self.attack_logger) + if target_1 is None: + raise MachineError("Creating target machine failed") + if self.attacker_1 is None: + raise MachineError("Creating attacker machine failed") target_1.set_caldera_server(self.attacker_1.get_ip()) try: if not target_conf.use_existing_machine(): @@ -268,14 +308,18 @@ class Experiment(): target_1.reboot() self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1) self.targets.append(target_1) - return tname - def machine_needs_caldera(self, target, caldera_conf): - """ Counts the attacks and plugins needing caldera that are registered for this machine """ + def machine_needs_caldera(self, target, caldera_from_cmdline: Optional[list[str]] = None) -> int: + """ Counts the attacks and plugins needing caldera that are registered for this machine + + :param target: Target machine we will check the config file for assigned caldera attacks for + :param caldera_from_cmdline: Caldera attacks listed on the commandline + :returns: the number of caldera attacks planned for this machine + """ c_cmdline = 0 - if caldera_conf is not None: - c_cmdline = len(caldera_conf) + if caldera_from_cmdline is not None: + c_cmdline = len(caldera_from_cmdline) c_conffile = len(self.experiment_config.get_caldera_attacks(target.get_os())) plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target.get_os()) c_plugins = self.plugin_manager.count_caldera_requirements(AttackPlugin, plugin_based_attacks) @@ -307,8 +351,11 @@ class Experiment(): # plugin.__set_logger__(self.attack_logger) plugin.__execute__([target]) - def zip_loot(self, zip_this): - """ Zip the loot together """ + def zip_loot(self, zip_this: list[str]): + """ Zip the loot together + + :param zip_this: A list of file paths to add to the zip file + """ filename = os.path.join(self.loot_dir, self.start_time + ".zip") @@ -323,8 +370,8 @@ class Experiment(): zfh.write(os.path.join(self.loot_dir, "attack.json")) # For automation purpose we copy the file into a standard file name - defaultname = os.path.join(self.loot_dir, "..", "most_recent.zip") - shutil.copyfile(filename, defaultname) + default_name = os.path.join(self.loot_dir, "..", "most_recent.zip") + shutil.copyfile(filename, default_name) def __start_attacker(self): """ Start the attacking VM """ From 1f75d6cfc6d53645cd05156b457112f943ce8758 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Wed, 23 Feb 2022 12:41:49 +0100 Subject: [PATCH 5/7] More agressive mypy --- Makefile | 4 ++-- mypy.ini | 13 +++++++++++++ requirements.txt | 2 +- tox.ini | 3 +++ 4 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 mypy.ini diff --git a/Makefile b/Makefile index 2906aeb..e570b69 100644 --- a/Makefile +++ b/Makefile @@ -26,8 +26,8 @@ pylint: # Testing if types are used properly mypy: - mypy --strict-optional app/ + mypy --strict-optional app/ plugins/base/ # Fixing mypy file by file stepbystep: - mypy --strict-optional plugins/base/plugin_base.py plugins/base/machinery.py app/config.py plugins/base/caldera.py plugins/base/attack.py plugins/base/sensor.py plugins/base/ssh_features.py plugins/base/vulnerability_plugin.py app/attack_log.py app/calderacontrol.py \ No newline at end of file + mypy --strict-optional --disallow-untyped-defs --check-untyped-defs plugins/base/ app/ \ No newline at end of file diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..94dcf19 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,13 @@ +# My own Mypy configuration + +# Global settings +[mypy] +warn_unused_configs = True +mypy_path = $MYPY_CONFIG_FILE_DIR:$MYPY_CONFIG_FILE_DIR/app:$MYPY_CONFIG_FILE_DIR/plugins/base + + +# Setting for the main app +[mypy-app.*] + +# Setting for the plugins +[mypy-plugins.base.*] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index b1aa4c4..74c66d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,7 +22,7 @@ sphinx-revealjs # sphinx-pydantic # This one has issues that must be fixed upstream first # Mypy stuff -mypy==0.910 +mypy==0.931 types-PyYAML==5.4.6 types-requests==2.25.6 types-simplejson==3.17.0 diff --git a/tox.ini b/tox.ini index fba54b8..b0429f1 100644 --- a/tox.ini +++ b/tox.ini @@ -36,6 +36,7 @@ deps = -r requirements.txt bandit pylint argcomplete + mypy commands = @@ -54,3 +55,5 @@ commands = # Linting # pylint *.py # currently off. Needs configuration python3 ./plugin_manager.py check + # mypy checks + mypy --strict-optional app/ plugins/base/ \ No newline at end of file From 2b6b00fd8ee43c925c530863075119003c1465b3 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Wed, 23 Feb 2022 13:51:28 +0100 Subject: [PATCH 6/7] pylint fix for caldera 4 api. For some classes I just do not know the exact Caldera definition --- app/calderaapi_4.py | 46 +++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/app/calderaapi_4.py b/app/calderaapi_4.py index e3e0e80..9e3b8c2 100644 --- a/app/calderaapi_4.py +++ b/app/calderaapi_4.py @@ -19,13 +19,13 @@ from pydantic import conlist # pylint: disable=no-name-in-module # TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)" @dataclass -class Variation: +class Variation: # pylint: disable=missing-class-docstring description: str command: str @dataclass -class ParserConfig: +class ParserConfig: # pylint: disable=missing-class-docstring source: str edge: str target: str @@ -33,27 +33,27 @@ class ParserConfig: @dataclass -class Parser: +class Parser: # pylint: disable=missing-class-docstring module: str relationships: list[ParserConfig] # undocumented ! Needs improvement ! TODO parserconfigs: Optional[list[ParserConfig]] = None @dataclass -class Requirement: +class Requirement: # pylint: disable=missing-class-docstring module: str relationship_match: list[dict] @dataclass -class AdditionalInfo: +class AdditionalInfo: # pylint: disable=missing-class-docstring additionalProp1: Optional[str] = None # pylint: disable=invalid-name additionalProp2: Optional[str] = None # pylint: disable=invalid-name additionalProp3: Optional[str] = None # pylint: disable=invalid-name @dataclass -class Executor: +class Executor: # pylint: disable=missing-class-docstring build_target: Optional[str] # Why can this be None ? language: Optional[str] # Why can this be None ? payloads: list[str] @@ -104,11 +104,12 @@ class Ability: @dataclass -class AbilityList(): +class AbilityList: """ A list of exploits """ abilities: Annotated[list, conlist(Ability, min_items=1)] def get_data(self): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.abilities @@ -126,6 +127,7 @@ class ObfuscatorList: obfuscators: Annotated[list, conlist(Obfuscator, min_items=1)] def get_data(self): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.obfuscators @@ -155,11 +157,12 @@ class AdversaryList: adversaries: Annotated[list, conlist(Adversary, min_items=1)] def get_data(self): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.adversaries @dataclass -class Fact: +class Fact: # pylint: disable=missing-class-docstring unique: str name: str score: int @@ -183,7 +186,7 @@ class Fact: @dataclass -class Relationship: +class Relationship: # pylint: disable=missing-class-docstring target: Fact unique: str score: int @@ -193,13 +196,13 @@ class Relationship: @dataclass -class Visibility: +class Visibility: # pylint: disable=missing-class-docstring score: int adjustments: list[int] @dataclass -class Link: +class Link: # pylint: disable=missing-class-docstring pin: int ability: Ability paw: str @@ -273,18 +276,19 @@ class AgentList: agents: list[Agent] def get_data(self): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.agents @dataclass -class Rule: +class Rule: # pylint: disable=missing-class-docstring match: str trait: str action: Optional[str] = None @dataclass -class Adjustment: +class Adjustment: # pylint: disable=missing-class-docstring offset: int trait: str value: str @@ -292,7 +296,7 @@ class Adjustment: @dataclass -class Source: +class Source: # pylint: disable=missing-class-docstring name: str plugin: str facts: list[Fact] @@ -310,10 +314,11 @@ class Source: @dataclass -class SourceList: +class SourceList: # pylint: disable=missing-class-docstring sources: list[Source] def get_data(self): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.sources @@ -334,14 +339,16 @@ class Planner: @dataclass class PlannerList: + """ A list of planners""" planners: list[Planner] def get_data(self): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.planners @dataclass -class Goal: +class Goal: # pylint: disable=missing-class-docstring target: str count: int achieved: bool @@ -350,7 +357,7 @@ class Goal: @dataclass -class Objective: +class Objective: # pylint: disable=missing-class-docstring percentage: int name: str goals: list[Goal] @@ -396,17 +403,20 @@ class Operation: @dataclass class OperationList: + """ A list of operations """ operations: Annotated[list, conlist(Operation)] def get_data(self): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.operations @dataclass -class ObjectiveList: +class ObjectiveList: # pylint: disable=missing-class-docstring objectives: Annotated[list, conlist(Objective)] def get_data(self): + """ Get a specific element out of the internal data representation, behaves like the well know 'get' """ return self.objectives From 76317d0ebb0bfc95dc4c40837e00b80abd2fc03e Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Wed, 23 Feb 2022 13:51:57 +0100 Subject: [PATCH 7/7] Activating pylint for code checks --- tox.ini | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tox.ini b/tox.ini index b0429f1..273ad41 100644 --- a/tox.ini +++ b/tox.ini @@ -52,8 +52,9 @@ commands = safety check -r requirements.txt # Check for common vulnerabilities bandit -ll -r app/ plugins/ *.py - # Linting - # pylint *.py # currently off. Needs configuration + # pylint check (linter, basic checks only) + pylint --rcfile=pylint.rc app/ plugins/base/ caldera_control.py doc_generator.py experiment_control.py machine_control.py metasploit_control.py plugin_manager.py + # Own plugin checker python3 ./plugin_manager.py check - # mypy checks + # mypy checks (type checker) mypy --strict-optional app/ plugins/base/ \ No newline at end of file