From 5c70b7202b2c8e62ba187f8d1b3b46d4454af868 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Wed, 23 Feb 2022 12:40:53 +0100 Subject: [PATCH] Make code survive a mypy scan --- app/__init__.py | 0 app/attack_log.py | 13 +++++++++++-- app/calderaapi_4.py | 14 +++++++------- app/calderacontrol.py | 27 ++++++++++++++++++++------- app/config.py | 7 ++++--- app/config_verifier.py | 6 +++--- app/exceptions.py | 4 ++++ app/metasploit.py | 2 +- app/pluginmanager.py | 15 +++++++++++---- plugins/__init__.py | 0 plugins/base/__init__.py | 0 plugins/base/attack.py | 18 ++++++++++-------- plugins/base/machinery.py | 13 +++++++++++++ plugins/base/plugin_base.py | 17 ++++++++++++++--- plugins/base/ssh_features.py | 6 +++--- 15 files changed, 101 insertions(+), 41 deletions(-) create mode 100644 app/__init__.py create mode 100644 plugins/__init__.py create mode 100644 plugins/base/__init__.py diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/attack_log.py b/app/attack_log.py index 57ff022..6f391d1 100644 --- a/app/attack_log.py +++ b/app/attack_log.py @@ -510,6 +510,15 @@ class AttackLog(): logid = timestamp + "_" + str(randint(1, 100000)) cframe = currentframe() + default_sourcefile = "" + if cframe is not None: + if cframe.f_back is not None: + default_sourcefile = getsourcefile(cframe.f_back) or "" + + default_sourceline = -1 + if cframe is not None: + if cframe.f_back is not None: + default_sourceline = cframe.f_back.f_lineno data = {"timestamp": timestamp, "timestamp_end": None, @@ -528,8 +537,8 @@ class AttackLog(): "situation_description": kwargs.get("situation_description", None), # Description for the situation this attack was run in. Set by the plugin or attacker emulation "countermeasure": kwargs.get("countermeasure", None), # Set by the attack "result": None, - "sourcefile": kwargs.get("sourcefile", getsourcefile(cframe.f_back)), - "sourceline": kwargs.get("sourceline", cframe.f_back.f_lineno) + "sourcefile": kwargs.get("sourcefile", default_sourcefile), + "sourceline": kwargs.get("sourceline", default_sourceline) } self.__add_to_log__(data) diff --git a/app/calderaapi_4.py b/app/calderaapi_4.py index 712d9d2..e3e0e80 100644 --- a/app/calderaapi_4.py +++ b/app/calderaapi_4.py @@ -5,7 +5,7 @@ import json from pprint import pformat -from typing import Optional, Union +from typing import Optional, Union, Annotated import requests import simplejson from pydantic.dataclasses import dataclass @@ -104,9 +104,9 @@ class Ability: @dataclass -class AbilityList: +class AbilityList(): """ A list of exploits """ - abilities: conlist(Ability, min_items=1) + abilities: Annotated[list, conlist(Ability, min_items=1)] def get_data(self): return self.abilities @@ -123,7 +123,7 @@ class Obfuscator: @dataclass class ObfuscatorList: """ A list of obfuscators """ - obfuscators: conlist(Obfuscator, min_items=1) + obfuscators: Annotated[list, conlist(Obfuscator, min_items=1)] def get_data(self): return self.obfuscators @@ -152,7 +152,7 @@ class Adversary: @dataclass class AdversaryList: """ A list of adversary """ - adversaries: conlist(Adversary, min_items=1) + adversaries: Annotated[list, conlist(Adversary, min_items=1)] def get_data(self): return self.adversaries @@ -396,7 +396,7 @@ class Operation: @dataclass class OperationList: - operations: conlist(Operation) + operations: Annotated[list, conlist(Operation)] def get_data(self): return self.operations @@ -404,7 +404,7 @@ class OperationList: @dataclass class ObjectiveList: - objectives: conlist(Objective) + objectives: Annotated[list, conlist(Objective)] def get_data(self): return self.objectives diff --git a/app/calderacontrol.py b/app/calderacontrol.py index ab8bf4b..df11d52 100644 --- a/app/calderacontrol.py +++ b/app/calderacontrol.py @@ -66,11 +66,14 @@ class CalderaControl(CalderaAPI): return {} res = {} - for i in source.get("facts"): - res[i.get("trait")] = {"value": i.get("value"), - "technique_id": i.get("technique_id"), - "collected_by": i.get("collected_by") - } + if source is not None: + facts = source.get("facts") + if facts is not None: + for fact in facts: + res[fact.get("trait")] = {"value": fact.get("value"), + "technique_id": fact.get("technique_id"), + "collected_by": fact.get("collected_by") + } return res def list_paws_of_running_agents(self) -> list[str]: @@ -344,7 +347,12 @@ class CalderaControl(CalderaAPI): return False self.add_adversary(adversary_name, ability_id) - adid = self.get_adversary(adversary_name).get("adversary_id") + adversary = self.get_adversary(adversary_name) + if adversary is None: + raise CalderaError("Could not get adversary") + adid = adversary.get("adversary_id", None) + if adid is None: + raise CalderaError("Could not get adversary by id") logid = self.attack_logger.start_caldera_attack(source=self.url, paw=paw, @@ -370,7 +378,12 @@ class CalderaControl(CalderaAPI): ) self.attack_logger.vprint(pformat(res), 3) - opid = self.get_operation(operation_name).get("id") + operation = self.get_operation(operation_name) + if operation is None: + raise CalderaError("Was not able to get operation") + opid = operation.get("id") + if opid is None: + raise CalderaError("Was not able to get operation. Broken ID") self.attack_logger.vprint("New operation created. OpID: " + str(opid), 3) self.set_operation_state(opid) diff --git a/app/config.py b/app/config.py index 4fda843..64a2e88 100644 --- a/app/config.py +++ b/app/config.py @@ -151,7 +151,7 @@ class ExperimentConfig(): :param configfile: The configuration file to process """ - self.raw_config: MainConfig = None + self.raw_config: Optional[MainConfig] = None self._targets: list[MachineConfig] = [] self._attackers: list[MachineConfig] = [] self.load(configfile) @@ -232,9 +232,10 @@ class ExperimentConfig(): if self.raw_config is None: raise ConfigurationError("Config file is empty") - + res = {} try: - res = self.raw_config.attack_conf[attack] + if self.raw_config.attack_conf is not None: + res = self.raw_config.attack_conf[attack] except KeyError: res = {} if res is None: diff --git a/app/config_verifier.py b/app/config_verifier.py index 3bdf90f..367af4e 100644 --- a/app/config_verifier.py +++ b/app/config_verifier.py @@ -106,7 +106,7 @@ class Target: ssh_user: Optional[str] = None ssh_password: Optional[str] = None ssh_keyfile: Optional[str] = None - vulnerabilities: list[str] = None + vulnerabilities: Optional[list[str]] = None def has_key(self, keyname): """ Checks if a key exists @@ -182,8 +182,8 @@ class Results: class MainConfig: """ Central configuration for PurpleDome """ caldera: CalderaConfig - attackers: conlist(Attacker, min_items=1) - targets: conlist(Target, min_items=1) + attackers: conlist(Attacker, min_items=1) # type: ignore + targets: conlist(Target, min_items=1) # type: ignore attacks: AttackConfig caldera_attacks: AttackList plugin_based_attacks: AttackList diff --git a/app/exceptions.py b/app/exceptions.py index 181dba1..55ce047 100644 --- a/app/exceptions.py +++ b/app/exceptions.py @@ -28,3 +28,7 @@ class MetasploitError(Exception): class RequirementError(Exception): """ Plugin requirements not fulfilled """ + + +class MachineError(Exception): + """ A virtual machine has issues""" diff --git a/app/metasploit.py b/app/metasploit.py index 5aee5d0..8a56cd2 100644 --- a/app/metasploit.py +++ b/app/metasploit.py @@ -7,7 +7,7 @@ import os import random import requests -from pymetasploit3.msfrpc import MsfRpcClient +from pymetasploit3.msfrpc import MsfRpcClient # type: ignore # from app.machinecontrol import Machine from app.attack_log import AttackLog from app.interface_sfx import CommandlineColors diff --git a/app/pluginmanager.py b/app/pluginmanager.py index 00d44df..fe86282 100644 --- a/app/pluginmanager.py +++ b/app/pluginmanager.py @@ -17,6 +17,7 @@ from plugins.base.sensor import SensorPlugin from plugins.base.vulnerability_plugin import VulnerabilityPlugin from app.interface_sfx import CommandlineColors from app.attack_log import AttackLog +from app.exceptions import PluginError # from app.interface_sfx import CommandlineColors @@ -89,8 +90,11 @@ class PluginManager(): plugins = self.get_plugins(subclass, name_filter) res = 0 for plugin in plugins: - if plugin.needs_caldera(): - res += 1 + if isinstance(plugin, AttackPlugin): + if plugin.needs_caldera(): + res += 1 + else: + raise PluginError("Wrong plugin type. Expected AttackPlugin") return res @@ -103,8 +107,11 @@ class PluginManager(): plugins = self.get_plugins(subclass, name_filter) res = 0 for plugin in plugins: - if plugin.needs_metasploit(): - res += 1 + if isinstance(plugin, AttackPlugin): + if plugin.needs_metasploit(): + res += 1 + else: + raise PluginError("Wrong plugin type. Expected AttackPlugin") return res diff --git a/plugins/__init__.py b/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/base/__init__.py b/plugins/base/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/base/attack.py b/plugins/base/attack.py index a2d49a5..da6c2ec 100644 --- a/plugins/base/attack.py +++ b/plugins/base/attack.py @@ -69,8 +69,9 @@ class AttackPlugin(BasePlugin): :returns: True if this plugin requires Caldera """ - if Requirement.CALDERA in self.requirements: - return True + if self.requirements is not None: + if Requirement.CALDERA in self.requirements: + return True return False def needs_metasploit(self) -> bool: @@ -79,8 +80,9 @@ class AttackPlugin(BasePlugin): :meta private: :returns: True if this plugin requires Metasploit """ - if Requirement.METASPLOIT in self.requirements: - return True + if self.requirements is not None: + if Requirement.METASPLOIT in self.requirements: + return True return False def connect_metasploit(self): @@ -130,7 +132,7 @@ class AttackPlugin(BasePlugin): self.vprint(f" Plugin running command {command}", 3) - res = self.attacker_machine_plugin.__call_remote_run__(command, disown=disown) + res = self.attacker_machine_plugin.remote_run(command, disown=disown) return res def targets_run_cmd(self, command: str, disown: bool = False) -> str: @@ -145,7 +147,7 @@ class AttackPlugin(BasePlugin): self.vprint(f" Plugin running command {command}", 3) - res = self.target_machine_plugin.__call_remote_run__(command, disown=disown) + res = self.target_machine_plugin.remote_run(command, disown=disown) return res def set_target_machines(self, machine: MachineryPlugin): @@ -154,7 +156,7 @@ class AttackPlugin(BasePlugin): :param machine: Machine plugin to communicate with """ - self.target_machine_plugin = machine.vm_manager + self.target_machine_plugin = machine def set_attacker_machine(self, machine: MachineryPlugin): """ Set the machine plugin class to target @@ -162,7 +164,7 @@ class AttackPlugin(BasePlugin): :param machine: Machine to communicate with """ - self.attacker_machine_plugin = machine.vm_manager + self.attacker_machine_plugin = machine def set_caldera(self, caldera: CalderaControl): """ Set the caldera control to be used for caldera attacks diff --git a/plugins/base/machinery.py b/plugins/base/machinery.py index e874b33..7130380 100644 --- a/plugins/base/machinery.py +++ b/plugins/base/machinery.py @@ -121,6 +121,19 @@ class MachineryPlugin(BasePlugin): """ raise NotImplementedError + def get_paw(self): + """ Returns the paw of the current machine """ + return self.config.caldera_paw() + + def get_group(self): + """ Returns the group of the current machine """ + return self.config.caldera_group() + + def get_os(self): + """ Returns the OS of the machine """ + + return self.config.os() + def get_playground(self): """ Path on the machine where all the attack tools will be copied to. """ diff --git a/plugins/base/plugin_base.py b/plugins/base/plugin_base.py index f7a7ec4..5566e67 100644 --- a/plugins/base/plugin_base.py +++ b/plugins/base/plugin_base.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 """ Base class for all plugin types """ -from inspect import currentframe +from inspect import currentframe, getframeinfo import os from typing import Optional import yaml from app.exceptions import PluginError # type: ignore -import app.exceptions # type: ignore +import app.exceptions # type: ignore class BasePlugin(): @@ -73,7 +73,11 @@ class BasePlugin(): """ cf = currentframe() # pylint: disable=invalid-name - return cf.f_back.filename + if cf is None: + raise PluginError("can not get current frame") + if cf.f_back is None: + raise PluginError("can not get current frame") + return getframeinfo(cf.f_back).filename def get_linenumber(self) -> int: """ Returns the current linenumber. This can be used for debugging @@ -81,6 +85,10 @@ class BasePlugin(): :returns: currently executed linenumber """ cf = currentframe() # pylint: disable=invalid-name + if cf is None: + raise PluginError("can not get current frame") + if cf.f_back is None: + raise PluginError("can not get current frame") return cf.f_back.f_lineno def get_playground(self) -> str: @@ -224,6 +232,9 @@ class BasePlugin(): :returns: The path with the plugin code """ + if self.plugin_path is None: + raise PluginError("Non existing plugin path") + return os.path.join(os.path.dirname(self.plugin_path)) def get_default_config_filename(self) -> str: diff --git a/plugins/base/ssh_features.py b/plugins/base/ssh_features.py index 144693f..79a2d25 100644 --- a/plugins/base/ssh_features.py +++ b/plugins/base/ssh_features.py @@ -5,8 +5,8 @@ import socket import time import paramiko -from fabric import Connection -from invoke.exceptions import UnexpectedExit +from fabric import Connection # type: ignore +from invoke.exceptions import UnexpectedExit # type: ignore from app.exceptions import NetworkError from plugins.base.plugin_base import BasePlugin @@ -175,7 +175,7 @@ class SSHFeatures(BasePlugin): self.vprint(f"SSH GET: No valid connection. Errors: {error.errors}", 1) do_retry = True except FileNotFoundError as error: - self.vprint(error, 0) + self.vprint(str(error), 0) break except OSError: self.vprint("SSH GET: Obscure OSError, ignoring (file should have been copied)", 1)