From c8b960f610c38b653d68c224b2d5d97f1bc449d2 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Fri, 4 Mar 2022 07:57:52 +0100 Subject: [PATCH] More MyPy enforcements --- app/attack_log.py | 8 ++-- app/calderaapi_4.py | 2 +- app/calderacontrol.py | 46 +++++++++++-------- app/experimentcontrol.py | 4 +- app/metasploit.py | 12 ++++- mypy.ini | 4 +- plugins/base/attack.py | 88 ++++++++++++++++++++++++------------ tests/test_calderacontrol.py | 7 +-- 8 files changed, 112 insertions(+), 59 deletions(-) diff --git a/app/attack_log.py b/app/attack_log.py index 9ab730e..994c376 100644 --- a/app/attack_log.py +++ b/app/attack_log.py @@ -6,7 +6,7 @@ from inspect import currentframe, getsourcefile import json import datetime from random import randint -from typing import Optional +from typing import Optional, Any def __mitre_fix_ttp__(ttp: Optional[str]) -> str: @@ -171,7 +171,7 @@ class AttackLog(): return data[ability_id] - def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: Optional[str] = None, **kwargs: dict) -> str: + def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: Optional[str] = None, **kwargs: Any) -> str: """ Mark the start of a caldera attack :param source: source of the attack. Attack IP @@ -215,7 +215,7 @@ class AttackLog(): # TODO: Add config # TODO: Add results - def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs: dict) -> None: + def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs: Any) -> None: """ Mark the end of a caldera attack :param source: source of the attack. Attack IP @@ -609,7 +609,7 @@ class AttackLog(): # TODO: Add config # TODO: Add results - def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs: dict) -> None: + def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs: Any) -> None: """ Mark the end of an attack plugin :param source: source of the attack. Attack IP diff --git a/app/calderaapi_4.py b/app/calderaapi_4.py index ffab8be..71126cc 100644 --- a/app/calderaapi_4.py +++ b/app/calderaapi_4.py @@ -624,7 +624,7 @@ class CalderaAPI(): data = self.__contact_server__(payload, method="patch", rest_path=f"api/v2/agents/{agent_paw}") return data - def add_operation(self, **kwargs: dict) -> OperationList: + def add_operation(self, **kwargs: Any) -> OperationList: """ Adds a new operation :param kwargs: diff --git a/app/calderacontrol.py b/app/calderacontrol.py index df11d52..b2b0c49 100644 --- a/app/calderacontrol.py +++ b/app/calderacontrol.py @@ -9,11 +9,11 @@ from pprint import pprint, pformat from typing import Optional import requests -from app.exceptions import CalderaError +from app.exceptions import CalderaError, ConfigurationError from app.interface_sfx import CommandlineColors # from app.calderaapi_2 import CalderaAPI -from app.calderaapi_4 import CalderaAPI +from app.calderaapi_4 import CalderaAPI, Operation, Source, Adversary, Objective, Ability # TODO: Ability deserves an own class. @@ -41,7 +41,7 @@ class CalderaControl(CalderaAPI): # print(r.headers) return filename - def list_sources_for_name(self, name: str) -> Optional[dict]: + def list_sources_for_name(self, name: str) -> Optional[Source]: """ List facts in a source pool with a specific name :param name: The name of the source pool @@ -85,7 +85,7 @@ class CalderaControl(CalderaAPI): # return [i.paw for i in self.list_agents()] # 4* version # ######### Get one specific item - def get_operation(self, name: str) -> Optional[dict]: + def get_operation(self, name: str) -> Optional[Operation]: """ Gets an operation by name :param name: Name of the operation to look for @@ -97,7 +97,7 @@ class CalderaControl(CalderaAPI): return operation return None - def get_adversary(self, name: str) -> Optional[dict]: + def get_adversary(self, name: str) -> Optional[Adversary]: """ Gets a specific adversary by name :param name: Name to look for @@ -108,7 +108,7 @@ class CalderaControl(CalderaAPI): return adversary return None - def get_objective(self, name: str) -> Optional[dict]: + def get_objective(self, name: str) -> Optional[Objective]: """ Returns an objective with a given name :param name: Name to filter for @@ -119,7 +119,7 @@ class CalderaControl(CalderaAPI): return objective return None - def get_ability(self, abid: str) -> list[dict]: + def get_ability(self, abid: str) -> list[Ability]: """ Return an ability by id :param abid: Ability id @@ -165,7 +165,7 @@ class CalderaControl(CalderaAPI): print(self.get_ability(abid)) return False - def get_operation_by_id(self, op_id: str) -> list[dict]: + def get_operation_by_id(self, op_id: str) -> list[Operation]: """ Get operation by id :param op_id: Operation id @@ -190,11 +190,15 @@ class CalderaControl(CalderaAPI): operation = self.get_operation_by_id(op_id) # print("Check for: {} {}".format(paw, ability_id)) - for alink in operation[0]["chain"]: + if len(operation) == 0: + return None + if operation[0].chain is None: + return None + for alink in operation[0].chain: # print("Lookup: PAW: {} Ability: {}".format(alink["paw"], alink["ability"]["ability_id"])) # print("In: " + str(alink)) - if alink["paw"] == paw and alink["ability"]["ability_id"] == ability_id: - return alink["id"] + if alink.paw == paw and alink.ability.ability_id == ability_id: + return alink.id return None @@ -217,7 +221,7 @@ class CalderaControl(CalderaAPI): print(f"Could not find {paw} in {orep['steps']}") raise CalderaError # print("oprep: " + str(orep)) - for a_step in orep.get("steps").get(paw).get("steps"): + for a_step in orep.get("steps").get(paw).get("steps"): # type: ignore if a_step.get("ability_id") == ability_id: try: return a_step.get("output") @@ -299,12 +303,12 @@ class CalderaControl(CalderaAPI): # Plus: 0 as "finished" # - operation = self.get_operation_by_id(opid) + operation: list[Operation] = self.get_operation_by_id(opid) # print(f"Operation data {operation}") try: - for host_group in operation[0]["host_group"]: - for alink in host_group["links"]: - if alink["status"] != 0: + for host_group in operation[0].host_group: + for alink in host_group.links: + if alink.status != 0: return False except Exception as exception: raise CalderaError from exception @@ -313,7 +317,7 @@ class CalderaControl(CalderaAPI): # ######## All inclusive methods def attack(self, paw: str = "kickme", ability_id: str = "bd527b63-9f9e-46e0-9816-b8434d2b8989", - group: str = "red", target_platform: Optional[str] = None, parameters: Optional[str] = None, **kwargs) -> bool: + group: str = "red", target_platform: Optional[str] = None, parameters: Optional[dict] = None, **kwargs) -> bool: """ Attacks a system and returns results :param paw: Paw to attack @@ -332,8 +336,10 @@ class CalderaControl(CalderaAPI): # caesar: failed # base64noPadding: worked # steganopgraphy: ? - obfuscator = self.config.get_caldera_obfuscator() - jitter = self.config.get_caldera_jitter() + if self.config is None: + raise ConfigurationError("No Config") + obfuscator: str = self.config.get_caldera_obfuscator() + jitter: str = self.config.get_caldera_jitter() adversary_name = "generated_adv__" + str(time.time()) operation_name = "testoperation__" + str(time.time()) @@ -426,7 +432,7 @@ class CalderaControl(CalderaAPI): self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_GREEN} Output: {outp} {CommandlineColors.ENDC}", 2) pprint(output) - self.attack_logger.vprint(self.list_facts_for_name("source_" + operation_name), 2) + self.attack_logger.vprint(str(self.list_facts_for_name("source_" + operation_name)), 2) # ######## Cleanup self.set_operation_state(opid, "cleanup") diff --git a/app/experimentcontrol.py b/app/experimentcontrol.py index c29e14a..ab7b8da 100644 --- a/app/experimentcontrol.py +++ b/app/experimentcontrol.py @@ -357,7 +357,9 @@ class Experiment(): plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name())) if self.attacker_1 is None: raise PluginError("Attacker not properly configured") - plugin.set_attacker_machine(self.attacker_1) + if self.attacker_1.vm_manager is None: + raise PluginError("Attacker not properly configured") + plugin.set_attacker_machine(self.attacker_1.vm_manager) plugin.set_sysconf({}) plugin.set_logger(self.attack_logger) if self.caldera_control is None: diff --git a/app/metasploit.py b/app/metasploit.py index d785526..ec19729 100644 --- a/app/metasploit.py +++ b/app/metasploit.py @@ -23,7 +23,7 @@ from app.interface_sfx import CommandlineColors class Metasploit(): """ Metasploit class for basic Metasploit wrapping """ - def __init__(self, password: str, attack_logger: AttackLog, **kwargs: dict) -> None: + def __init__(self, password: str, attack_logger: AttackLog, **kwargs: Any) -> None: """ :param password: password for the msfrpcd @@ -170,6 +170,11 @@ class Metasploit(): :return: the string results """ + if self.client is None: + raise MetasploitError("No client") + if self.client.sessions is None: + raise MetasploitError("No sessions") + shell = self.client.sessions.session(self.get_sid(session_number)) res = [] for cmd in cmds: @@ -187,6 +192,11 @@ class Metasploit(): :return: the string results """ + if self.client is None: + raise MetasploitError("No client") + if self.client.sessions is None: + raise MetasploitError("No sessions") + session_id = self.get_sid_to(target) # print(f"Session ID: {session_id}") shell = self.client.sessions.session(session_id) diff --git a/mypy.ini b/mypy.ini index 94dcf19..da06686 100644 --- a/mypy.ini +++ b/mypy.ini @@ -4,7 +4,9 @@ [mypy] warn_unused_configs = True mypy_path = $MYPY_CONFIG_FILE_DIR:$MYPY_CONFIG_FILE_DIR/app:$MYPY_CONFIG_FILE_DIR/plugins/base - +# disallow_untyped_defs = True # Activate that as soon as refactoring and "make stepbystep" works +# check_untyped_defs = True # Activate that as soon as refactoring and "make stepbystep" works +exclude = app/calderaapi_2.py # Setting for the main app [mypy-app.*] diff --git a/plugins/base/attack.py b/plugins/base/attack.py index da6c2ec..6386d67 100644 --- a/plugins/base/attack.py +++ b/plugins/base/attack.py @@ -3,13 +3,14 @@ from enum import Enum import os -from typing import Optional +from typing import Optional, Any from app.calderacontrol import CalderaControl from app.exceptions import PluginError, ConfigurationError, RequirementError from app.metasploit import MetasploitInstant from plugins.base.machinery import MachineryPlugin from plugins.base.plugin_base import BasePlugin +# from app.machinecontrol import Machine class Requirement(Enum): @@ -25,7 +26,7 @@ class AttackPlugin(BasePlugin): # name: Optional[str] = None # description: Optional[str] = None ttp: Optional[str] = None #: TTP of this attack. Or ??? if unknown - references = None # A list of urls or other references + references: list[str] = [] #: A list of urls or other references required_files: list[str] = [] # Better use the other required_files features required_files_attacker: list[str] = [] #: A list of files to automatically install to the attacker @@ -33,27 +34,28 @@ class AttackPlugin(BasePlugin): requirements: Optional[list[Requirement]] = [] #: Requirements to run this plugin, Available are METASPLOIT and CALDERA at the moment - def __init__(self): + def __init__(self) -> None: super().__init__() self.conf: dict = {} # Plugin specific configuration # self.sysconf = {} # System configuration. common for all plugins - self.attacker_machine_plugin = None # The machine plugin referencing the attacker. The Kali machine should be the perfect candidate - self.target_machine_plugin = None # The machine plugin referencing the target - self.caldera = None # The Caldera connection object - self.targets = None + self.attacker_machine_plugin: Optional[MachineryPlugin] = None # The machine plugin referencing the attacker. The Kali machine should be the perfect candidate + self.target_machine_plugin: Optional[MachineryPlugin] = None # The machine plugin referencing the target + self.caldera: Optional[CalderaControl] = None # The Caldera connection object + self.targets: list[Any] = [] self.metasploit_password: str = "password" self.metasploit_user: str = "user" - self.metasploit = None + self.metasploit: Optional[MetasploitInstant] = None - def run(self, targets: list[str]): + def run(self, targets: list[str]) -> str: """ The attack is ran here. This method **must be implemented** - @param targets: A list of targets, ip addresses will do + :param targets: A list of targets, ip addresses will do + :return: The result as a string """ raise NotImplementedError - def install(self): # pylint: disable=no-self-use + def install(self) -> None: # pylint: disable=no-self-use """ Install and setup requirements for the attack This step is *optional* @@ -85,12 +87,15 @@ class AttackPlugin(BasePlugin): return True return False - def connect_metasploit(self): + def connect_metasploit(self) -> None: """ Inits metasploit :meta private: """ + if self.attack_logger is None: + raise PluginError("Attack logger is required") + if self.needs_metasploit(): self.metasploit = MetasploitInstant(self.metasploit_password, attack_logger=self.attack_logger, @@ -98,13 +103,19 @@ class AttackPlugin(BasePlugin): username=self.metasploit_user) # If metasploit requirements are not set, self.metasploit stay None and using metasploit from a plugin not having the requirements will trigger an exception - def copy_to_attacker_and_defender(self): + def copy_to_attacker_and_defender(self) -> None: """ Copy attacker/defender specific files to the machines. Called by setup, do not call it yourself. template processing happens before :meta private: """ + if self.plugin_path is None: + raise PluginError("Path for plugin not set") + + if self.attacker_machine_plugin is None: + raise PluginError("Attacker machine not registered") + for a_file in self.required_files_attacker: src = os.path.join(os.path.dirname(self.plugin_path), a_file) self.vprint(src, 3) @@ -112,7 +123,7 @@ class AttackPlugin(BasePlugin): # TODO: add target(s) - def teardown(self): + def teardown(self) -> None: """ Cleanup afterwards This is an *optional* method which is called after the attack. If you want to do some cleanup in your plugin, implement it. @@ -150,7 +161,7 @@ class AttackPlugin(BasePlugin): res = self.target_machine_plugin.remote_run(command, disown=disown) return res - def set_target_machines(self, machine: MachineryPlugin): + def set_target_machines(self, machine: MachineryPlugin) -> None: """ Set the machine to target :param machine: Machine plugin to communicate with @@ -158,7 +169,7 @@ class AttackPlugin(BasePlugin): self.target_machine_plugin = machine - def set_attacker_machine(self, machine: MachineryPlugin): + def set_attacker_machine(self, machine: MachineryPlugin) -> None: """ Set the machine plugin class to target :param machine: Machine to communicate with @@ -166,7 +177,7 @@ class AttackPlugin(BasePlugin): self.attacker_machine_plugin = machine - def set_caldera(self, caldera: CalderaControl): + def set_caldera(self, caldera: CalderaControl) -> None: """ Set the caldera control to be used for caldera attacks @param caldera: The caldera object to connect through @@ -175,7 +186,7 @@ class AttackPlugin(BasePlugin): if self.needs_caldera(): self.caldera = caldera - def caldera_attack(self, target: MachineryPlugin, ability_id: str, parameters=None, **kwargs): + def caldera_attack(self, target: MachineryPlugin, ability_id: str, parameters: Optional[dict] = None, **kwargs) -> None: """ Attack a single target using caldera :param target: Target machine object @@ -186,6 +197,9 @@ class AttackPlugin(BasePlugin): if not self.needs_caldera(): raise RequirementError("Caldera not in requirements") + if self.caldera is None: + raise PluginError("Caldera not configured") + self.caldera.attack(paw=target.get_paw(), ability_id=ability_id, group=target.get_group(), @@ -194,7 +208,7 @@ class AttackPlugin(BasePlugin): **kwargs ) - def get_attacker_playground(self) -> str: + def get_attacker_playground(self) -> Optional[str]: """ Returns the attacker machine specific playground This is the folder on the machine where we run our tasks in @@ -205,26 +219,39 @@ class AttackPlugin(BasePlugin): if self.attacker_machine_plugin is None: raise PluginError("Attacker machine not configured.") - return self.attacker_machine_plugin.get_playground() + playground = self.attacker_machine_plugin.get_playground() + + return playground - def __execute__(self, targets): + def __execute__(self, targets: list[Any]) -> str: """ Execute the plugin. This is called by the code :meta private: - @param targets: A list of targets => machines + :param targets: A list of targets => machines (and it would be smarter to use MachineryPlugin instead of machine) """ + # TODO: Use MachineryPlugin instead of Machine + + if self.attack_logger is None: + raise PluginError("Attack logger not defined") + if self.name is None: + raise PluginError("Plugin has no name") + if self.attacker_machine_plugin is None: + raise PluginError("No attacker machine plugin present") + if self.attacker_machine_plugin.config is None: + raise PluginError("Configuration broken") + self.targets = targets - ips = [tgt.get_ip() for tgt in targets] + target_ip = targets[0].get_ip() self.setup() - self.attack_logger.start_attack_plugin(self.attacker_machine_plugin.config.vmname(), ips, self.name, ttp=self.get_ttp()) + self.attack_logger.start_attack_plugin(self.attacker_machine_plugin.config.vmname(), target_ip, self.name, ttp=self.get_ttp()) res = self.run(targets) self.teardown() - self.attack_logger.stop_attack_plugin(self.attacker_machine_plugin.config.vmname(), ips, self.name, ttp=self.get_ttp()) + self.attack_logger.stop_attack_plugin(self.attacker_machine_plugin.config.vmname(), target_ip, self.name, ttp=self.get_ttp()) return res - def get_ttp(self): + def get_ttp(self) -> str: """ Returns the ttp of the plugin, please set in boilerplate :meta private: @@ -235,7 +262,7 @@ class AttackPlugin(BasePlugin): raise NotImplementedError - def get_references(self): + def get_references(self) -> list[str]: """ Returns the references of the plugin, please set in boilerplate :meta private: @@ -246,7 +273,7 @@ class AttackPlugin(BasePlugin): raise NotImplementedError - def get_target_by_name(self, name: str): + def get_target_by_name(self, name: str) -> Any: """ Returns a target machine out of the target pool by matching the name If there is no matching name it will look into the "nicknames" list of the machine config @@ -254,6 +281,11 @@ class AttackPlugin(BasePlugin): @returns: the machine """ + # TODO: Current return is Machine, but refactoring should replace it with MachineryPlugin + + if self.targets is None: + raise PluginError("No targets available") + for target in self.targets: if target.get_name() == name: return target diff --git a/tests/test_calderacontrol.py b/tests/test_calderacontrol.py index 91dd59d..215f6a6 100644 --- a/tests/test_calderacontrol.py +++ b/tests/test_calderacontrol.py @@ -1,10 +1,11 @@ import unittest from unittest.mock import patch, call from app.calderacontrol import CalderaControl -from simplejson.errors import JSONDecodeError +from simplejson.errors import JSONDecodeError # type: ignore from app.exceptions import CalderaError from app.attack_log import AttackLog import pydantic +from dotmap import DotMap # type: ignore # https://docs.python.org/3/library/unittest.html @@ -168,7 +169,7 @@ class TestExample(unittest.TestCase): "ability": {"ability_id": ability_id}, "id": "Getme"} - op = [{"chain": [alink]}] + op = [DotMap({"chain": [alink]})] with patch.object(self.cc, "get_operation_by_id", return_value=op): res = self.cc.get_linkid("Foo", paw, ability_id) @@ -183,7 +184,7 @@ class TestExample(unittest.TestCase): "ability": {"ability_id": ability_id}, "id": "Getme"} - op = [{"chain": [alink]}] + op = [DotMap({"chain": [alink]})] with patch.object(self.cc, "get_operation_by_id", return_value=op): res = self.cc.get_linkid("Foo", "Bar", ability_id)