More MyPy enforcements

pull/44/head
Thorsten Sick 2 years ago
parent 3dd5eda374
commit c8b960f610

@ -6,7 +6,7 @@ from inspect import currentframe, getsourcefile
import json
import datetime
from random import randint
from typing import Optional
from typing import Optional, Any
def __mitre_fix_ttp__(ttp: Optional[str]) -> str:
@ -171,7 +171,7 @@ class AttackLog():
return data[ability_id]
def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: Optional[str] = None, **kwargs: dict) -> str:
def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: Optional[str] = None, **kwargs: Any) -> str:
""" Mark the start of a caldera attack
:param source: source of the attack. Attack IP
@ -215,7 +215,7 @@ class AttackLog():
# TODO: Add config
# TODO: Add results
def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs: dict) -> None:
def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs: Any) -> None:
""" Mark the end of a caldera attack
:param source: source of the attack. Attack IP
@ -609,7 +609,7 @@ class AttackLog():
# TODO: Add config
# TODO: Add results
def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs: dict) -> None:
def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs: Any) -> None:
""" Mark the end of an attack plugin
:param source: source of the attack. Attack IP

@ -624,7 +624,7 @@ class CalderaAPI():
data = self.__contact_server__(payload, method="patch", rest_path=f"api/v2/agents/{agent_paw}")
return data
def add_operation(self, **kwargs: dict) -> OperationList:
def add_operation(self, **kwargs: Any) -> OperationList:
""" Adds a new operation
:param kwargs:

@ -9,11 +9,11 @@ from pprint import pprint, pformat
from typing import Optional
import requests
from app.exceptions import CalderaError
from app.exceptions import CalderaError, ConfigurationError
from app.interface_sfx import CommandlineColors
# from app.calderaapi_2 import CalderaAPI
from app.calderaapi_4 import CalderaAPI
from app.calderaapi_4 import CalderaAPI, Operation, Source, Adversary, Objective, Ability
# TODO: Ability deserves an own class.
@ -41,7 +41,7 @@ class CalderaControl(CalderaAPI):
# print(r.headers)
return filename
def list_sources_for_name(self, name: str) -> Optional[dict]:
def list_sources_for_name(self, name: str) -> Optional[Source]:
""" List facts in a source pool with a specific name
:param name: The name of the source pool
@ -85,7 +85,7 @@ class CalderaControl(CalderaAPI):
# return [i.paw for i in self.list_agents()] # 4* version
# ######### Get one specific item
def get_operation(self, name: str) -> Optional[dict]:
def get_operation(self, name: str) -> Optional[Operation]:
""" Gets an operation by name
:param name: Name of the operation to look for
@ -97,7 +97,7 @@ class CalderaControl(CalderaAPI):
return operation
return None
def get_adversary(self, name: str) -> Optional[dict]:
def get_adversary(self, name: str) -> Optional[Adversary]:
""" Gets a specific adversary by name
:param name: Name to look for
@ -108,7 +108,7 @@ class CalderaControl(CalderaAPI):
return adversary
return None
def get_objective(self, name: str) -> Optional[dict]:
def get_objective(self, name: str) -> Optional[Objective]:
""" Returns an objective with a given name
:param name: Name to filter for
@ -119,7 +119,7 @@ class CalderaControl(CalderaAPI):
return objective
return None
def get_ability(self, abid: str) -> list[dict]:
def get_ability(self, abid: str) -> list[Ability]:
""" Return an ability by id
:param abid: Ability id
@ -165,7 +165,7 @@ class CalderaControl(CalderaAPI):
print(self.get_ability(abid))
return False
def get_operation_by_id(self, op_id: str) -> list[dict]:
def get_operation_by_id(self, op_id: str) -> list[Operation]:
""" Get operation by id
:param op_id: Operation id
@ -190,11 +190,15 @@ class CalderaControl(CalderaAPI):
operation = self.get_operation_by_id(op_id)
# print("Check for: {} {}".format(paw, ability_id))
for alink in operation[0]["chain"]:
if len(operation) == 0:
return None
if operation[0].chain is None:
return None
for alink in operation[0].chain:
# print("Lookup: PAW: {} Ability: {}".format(alink["paw"], alink["ability"]["ability_id"]))
# print("In: " + str(alink))
if alink["paw"] == paw and alink["ability"]["ability_id"] == ability_id:
return alink["id"]
if alink.paw == paw and alink.ability.ability_id == ability_id:
return alink.id
return None
@ -217,7 +221,7 @@ class CalderaControl(CalderaAPI):
print(f"Could not find {paw} in {orep['steps']}")
raise CalderaError
# print("oprep: " + str(orep))
for a_step in orep.get("steps").get(paw).get("steps"):
for a_step in orep.get("steps").get(paw).get("steps"): # type: ignore
if a_step.get("ability_id") == ability_id:
try:
return a_step.get("output")
@ -299,12 +303,12 @@ class CalderaControl(CalderaAPI):
# Plus: 0 as "finished"
#
operation = self.get_operation_by_id(opid)
operation: list[Operation] = self.get_operation_by_id(opid)
# print(f"Operation data {operation}")
try:
for host_group in operation[0]["host_group"]:
for alink in host_group["links"]:
if alink["status"] != 0:
for host_group in operation[0].host_group:
for alink in host_group.links:
if alink.status != 0:
return False
except Exception as exception:
raise CalderaError from exception
@ -313,7 +317,7 @@ class CalderaControl(CalderaAPI):
# ######## All inclusive methods
def attack(self, paw: str = "kickme", ability_id: str = "bd527b63-9f9e-46e0-9816-b8434d2b8989",
group: str = "red", target_platform: Optional[str] = None, parameters: Optional[str] = None, **kwargs) -> bool:
group: str = "red", target_platform: Optional[str] = None, parameters: Optional[dict] = None, **kwargs) -> bool:
""" Attacks a system and returns results
:param paw: Paw to attack
@ -332,8 +336,10 @@ class CalderaControl(CalderaAPI):
# caesar: failed
# base64noPadding: worked
# steganopgraphy: ?
obfuscator = self.config.get_caldera_obfuscator()
jitter = self.config.get_caldera_jitter()
if self.config is None:
raise ConfigurationError("No Config")
obfuscator: str = self.config.get_caldera_obfuscator()
jitter: str = self.config.get_caldera_jitter()
adversary_name = "generated_adv__" + str(time.time())
operation_name = "testoperation__" + str(time.time())
@ -426,7 +432,7 @@ class CalderaControl(CalderaAPI):
self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_GREEN} Output: {outp} {CommandlineColors.ENDC}", 2)
pprint(output)
self.attack_logger.vprint(self.list_facts_for_name("source_" + operation_name), 2)
self.attack_logger.vprint(str(self.list_facts_for_name("source_" + operation_name)), 2)
# ######## Cleanup
self.set_operation_state(opid, "cleanup")

@ -357,7 +357,9 @@ class Experiment():
plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name()))
if self.attacker_1 is None:
raise PluginError("Attacker not properly configured")
plugin.set_attacker_machine(self.attacker_1)
if self.attacker_1.vm_manager is None:
raise PluginError("Attacker not properly configured")
plugin.set_attacker_machine(self.attacker_1.vm_manager)
plugin.set_sysconf({})
plugin.set_logger(self.attack_logger)
if self.caldera_control is None:

@ -23,7 +23,7 @@ from app.interface_sfx import CommandlineColors
class Metasploit():
""" Metasploit class for basic Metasploit wrapping """
def __init__(self, password: str, attack_logger: AttackLog, **kwargs: dict) -> None:
def __init__(self, password: str, attack_logger: AttackLog, **kwargs: Any) -> None:
"""
:param password: password for the msfrpcd
@ -170,6 +170,11 @@ class Metasploit():
:return: the string results
"""
if self.client is None:
raise MetasploitError("No client")
if self.client.sessions is None:
raise MetasploitError("No sessions")
shell = self.client.sessions.session(self.get_sid(session_number))
res = []
for cmd in cmds:
@ -187,6 +192,11 @@ class Metasploit():
:return: the string results
"""
if self.client is None:
raise MetasploitError("No client")
if self.client.sessions is None:
raise MetasploitError("No sessions")
session_id = self.get_sid_to(target)
# print(f"Session ID: {session_id}")
shell = self.client.sessions.session(session_id)

@ -4,7 +4,9 @@
[mypy]
warn_unused_configs = True
mypy_path = $MYPY_CONFIG_FILE_DIR:$MYPY_CONFIG_FILE_DIR/app:$MYPY_CONFIG_FILE_DIR/plugins/base
# disallow_untyped_defs = True # Activate that as soon as refactoring and "make stepbystep" works
# check_untyped_defs = True # Activate that as soon as refactoring and "make stepbystep" works
exclude = app/calderaapi_2.py
# Setting for the main app
[mypy-app.*]

@ -3,13 +3,14 @@
from enum import Enum
import os
from typing import Optional
from typing import Optional, Any
from app.calderacontrol import CalderaControl
from app.exceptions import PluginError, ConfigurationError, RequirementError
from app.metasploit import MetasploitInstant
from plugins.base.machinery import MachineryPlugin
from plugins.base.plugin_base import BasePlugin
# from app.machinecontrol import Machine
class Requirement(Enum):
@ -25,7 +26,7 @@ class AttackPlugin(BasePlugin):
# name: Optional[str] = None
# description: Optional[str] = None
ttp: Optional[str] = None #: TTP of this attack. Or ??? if unknown
references = None # A list of urls or other references
references: list[str] = [] #: A list of urls or other references
required_files: list[str] = [] # Better use the other required_files features
required_files_attacker: list[str] = [] #: A list of files to automatically install to the attacker
@ -33,27 +34,28 @@ class AttackPlugin(BasePlugin):
requirements: Optional[list[Requirement]] = [] #: Requirements to run this plugin, Available are METASPLOIT and CALDERA at the moment
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.conf: dict = {} # Plugin specific configuration
# self.sysconf = {} # System configuration. common for all plugins
self.attacker_machine_plugin = None # The machine plugin referencing the attacker. The Kali machine should be the perfect candidate
self.target_machine_plugin = None # The machine plugin referencing the target
self.caldera = None # The Caldera connection object
self.targets = None
self.attacker_machine_plugin: Optional[MachineryPlugin] = None # The machine plugin referencing the attacker. The Kali machine should be the perfect candidate
self.target_machine_plugin: Optional[MachineryPlugin] = None # The machine plugin referencing the target
self.caldera: Optional[CalderaControl] = None # The Caldera connection object
self.targets: list[Any] = []
self.metasploit_password: str = "password"
self.metasploit_user: str = "user"
self.metasploit = None
self.metasploit: Optional[MetasploitInstant] = None
def run(self, targets: list[str]):
def run(self, targets: list[str]) -> str:
""" The attack is ran here. This method **must be implemented**
@param targets: A list of targets, ip addresses will do
:param targets: A list of targets, ip addresses will do
:return: The result as a string
"""
raise NotImplementedError
def install(self): # pylint: disable=no-self-use
def install(self) -> None: # pylint: disable=no-self-use
""" Install and setup requirements for the attack
This step is *optional*
@ -85,12 +87,15 @@ class AttackPlugin(BasePlugin):
return True
return False
def connect_metasploit(self):
def connect_metasploit(self) -> None:
""" Inits metasploit
:meta private:
"""
if self.attack_logger is None:
raise PluginError("Attack logger is required")
if self.needs_metasploit():
self.metasploit = MetasploitInstant(self.metasploit_password,
attack_logger=self.attack_logger,
@ -98,13 +103,19 @@ class AttackPlugin(BasePlugin):
username=self.metasploit_user)
# If metasploit requirements are not set, self.metasploit stay None and using metasploit from a plugin not having the requirements will trigger an exception
def copy_to_attacker_and_defender(self):
def copy_to_attacker_and_defender(self) -> None:
""" Copy attacker/defender specific files to the machines. Called by setup, do not call it yourself. template processing happens before
:meta private:
"""
if self.plugin_path is None:
raise PluginError("Path for plugin not set")
if self.attacker_machine_plugin is None:
raise PluginError("Attacker machine not registered")
for a_file in self.required_files_attacker:
src = os.path.join(os.path.dirname(self.plugin_path), a_file)
self.vprint(src, 3)
@ -112,7 +123,7 @@ class AttackPlugin(BasePlugin):
# TODO: add target(s)
def teardown(self):
def teardown(self) -> None:
""" Cleanup afterwards
This is an *optional* method which is called after the attack. If you want to do some cleanup in your plugin, implement it.
@ -150,7 +161,7 @@ class AttackPlugin(BasePlugin):
res = self.target_machine_plugin.remote_run(command, disown=disown)
return res
def set_target_machines(self, machine: MachineryPlugin):
def set_target_machines(self, machine: MachineryPlugin) -> None:
""" Set the machine to target
:param machine: Machine plugin to communicate with
@ -158,7 +169,7 @@ class AttackPlugin(BasePlugin):
self.target_machine_plugin = machine
def set_attacker_machine(self, machine: MachineryPlugin):
def set_attacker_machine(self, machine: MachineryPlugin) -> None:
""" Set the machine plugin class to target
:param machine: Machine to communicate with
@ -166,7 +177,7 @@ class AttackPlugin(BasePlugin):
self.attacker_machine_plugin = machine
def set_caldera(self, caldera: CalderaControl):
def set_caldera(self, caldera: CalderaControl) -> None:
""" Set the caldera control to be used for caldera attacks
@param caldera: The caldera object to connect through
@ -175,7 +186,7 @@ class AttackPlugin(BasePlugin):
if self.needs_caldera():
self.caldera = caldera
def caldera_attack(self, target: MachineryPlugin, ability_id: str, parameters=None, **kwargs):
def caldera_attack(self, target: MachineryPlugin, ability_id: str, parameters: Optional[dict] = None, **kwargs) -> None:
""" Attack a single target using caldera
:param target: Target machine object
@ -186,6 +197,9 @@ class AttackPlugin(BasePlugin):
if not self.needs_caldera():
raise RequirementError("Caldera not in requirements")
if self.caldera is None:
raise PluginError("Caldera not configured")
self.caldera.attack(paw=target.get_paw(),
ability_id=ability_id,
group=target.get_group(),
@ -194,7 +208,7 @@ class AttackPlugin(BasePlugin):
**kwargs
)
def get_attacker_playground(self) -> str:
def get_attacker_playground(self) -> Optional[str]:
""" Returns the attacker machine specific playground
This is the folder on the machine where we run our tasks in
@ -205,26 +219,39 @@ class AttackPlugin(BasePlugin):
if self.attacker_machine_plugin is None:
raise PluginError("Attacker machine not configured.")
return self.attacker_machine_plugin.get_playground()
playground = self.attacker_machine_plugin.get_playground()
return playground
def __execute__(self, targets):
def __execute__(self, targets: list[Any]) -> str:
""" Execute the plugin. This is called by the code
:meta private:
@param targets: A list of targets => machines
:param targets: A list of targets => machines (and it would be smarter to use MachineryPlugin instead of machine)
"""
# TODO: Use MachineryPlugin instead of Machine
if self.attack_logger is None:
raise PluginError("Attack logger not defined")
if self.name is None:
raise PluginError("Plugin has no name")
if self.attacker_machine_plugin is None:
raise PluginError("No attacker machine plugin present")
if self.attacker_machine_plugin.config is None:
raise PluginError("Configuration broken")
self.targets = targets
ips = [tgt.get_ip() for tgt in targets]
target_ip = targets[0].get_ip()
self.setup()
self.attack_logger.start_attack_plugin(self.attacker_machine_plugin.config.vmname(), ips, self.name, ttp=self.get_ttp())
self.attack_logger.start_attack_plugin(self.attacker_machine_plugin.config.vmname(), target_ip, self.name, ttp=self.get_ttp())
res = self.run(targets)
self.teardown()
self.attack_logger.stop_attack_plugin(self.attacker_machine_plugin.config.vmname(), ips, self.name, ttp=self.get_ttp())
self.attack_logger.stop_attack_plugin(self.attacker_machine_plugin.config.vmname(), target_ip, self.name, ttp=self.get_ttp())
return res
def get_ttp(self):
def get_ttp(self) -> str:
""" Returns the ttp of the plugin, please set in boilerplate
:meta private:
@ -235,7 +262,7 @@ class AttackPlugin(BasePlugin):
raise NotImplementedError
def get_references(self):
def get_references(self) -> list[str]:
""" Returns the references of the plugin, please set in boilerplate
:meta private:
@ -246,7 +273,7 @@ class AttackPlugin(BasePlugin):
raise NotImplementedError
def get_target_by_name(self, name: str):
def get_target_by_name(self, name: str) -> Any:
""" Returns a target machine out of the target pool by matching the name
If there is no matching name it will look into the "nicknames" list of the machine config
@ -254,6 +281,11 @@ class AttackPlugin(BasePlugin):
@returns: the machine
"""
# TODO: Current return is Machine, but refactoring should replace it with MachineryPlugin
if self.targets is None:
raise PluginError("No targets available")
for target in self.targets:
if target.get_name() == name:
return target

@ -1,10 +1,11 @@
import unittest
from unittest.mock import patch, call
from app.calderacontrol import CalderaControl
from simplejson.errors import JSONDecodeError
from simplejson.errors import JSONDecodeError # type: ignore
from app.exceptions import CalderaError
from app.attack_log import AttackLog
import pydantic
from dotmap import DotMap # type: ignore
# https://docs.python.org/3/library/unittest.html
@ -168,7 +169,7 @@ class TestExample(unittest.TestCase):
"ability": {"ability_id": ability_id},
"id": "Getme"}
op = [{"chain": [alink]}]
op = [DotMap({"chain": [alink]})]
with patch.object(self.cc, "get_operation_by_id", return_value=op):
res = self.cc.get_linkid("Foo", paw, ability_id)
@ -183,7 +184,7 @@ class TestExample(unittest.TestCase):
"ability": {"ability_id": ability_id},
"id": "Getme"}
op = [{"chain": [alink]}]
op = [DotMap({"chain": [alink]})]
with patch.object(self.cc, "get_operation_by_id", return_value=op):
res = self.cc.get_linkid("Foo", "Bar", ability_id)

Loading…
Cancel
Save