Type checking

pull/12/head
Thorsten Sick 3 years ago
parent f6a4902596
commit 8d9e1b025b

@ -19,4 +19,8 @@ pylint:
pylint --rcfile=pylint.rc *.py app/*.py plugins/base/*.py
mypy:
mypy --strict-optional app/
mypy --strict-optional app/
# Fixing mypy file by file
stepbystep:
mypy --strict-optional plugins/base/plugin_base.py plugins/base/machinery.py app/config.py plugins/base/caldera.py plugins/base/attack.py plugins/base/sensor.py plugins/base/ssh_features.py plugins/base/vulnerability_plugin.py

@ -17,7 +17,7 @@ from app.exceptions import ConfigurationError
class MachineConfig():
""" Sub config for a specific machine"""
def __init__(self, machinedata):
def __init__(self, machinedata: dict):
""" Init machine control config
@param machinedata: dict containing machine data
@ -147,21 +147,21 @@ class MachineConfig():
class ExperimentConfig():
""" Configuration class for a whole experiments """
def __init__(self, configfile):
def __init__(self, configfile: str):
""" Init the config, process the file
@param configfile: The configuration file to process
"""
self.raw_config = None
self._targets = []
self._attackers = []
self._targets: list[MachineConfig] = []
self._attackers: list[MachineConfig] = []
self.load(configfile)
# Test essential data that is a hard requirement. Should throw errors if anything is wrong
self.loot_dir()
def load(self, configfile):
def load(self, configfile: str):
""" Loads the configuration file
@param configfile: The configuration file to process
@ -170,11 +170,18 @@ class ExperimentConfig():
with open(configfile) as fh:
self.raw_config = yaml.safe_load(fh)
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
# Process targets
if self.raw_config["targets"] is None:
raise ConfigurationError("Config file does not specify targets")
for target in self.raw_config["targets"]:
self._targets.append(MachineConfig(self.raw_config["targets"][target]))
# Process attackers
if self.raw_config["attackers"] is None:
raise ConfigurationError("Config file does not specify attackers")
for attacker in self.raw_config["attackers"]:
self._attackers.append(MachineConfig(self.raw_config["attackers"][attacker]))
@ -188,7 +195,7 @@ class ExperimentConfig():
return self._attackers
def attacker(self, mid) -> MachineConfig:
def attacker(self, mid: int) -> MachineConfig:
""" Return config for attacker as MachineConfig objects
@param mid: id of the attacker, 0 is main attacker
@ -212,12 +219,16 @@ class ExperimentConfig():
raise ConfigurationError("results/loot_dir not properly set in configuration") from error
return res
def attack_conf(self, attack):
def attack_conf(self, attack: str):
""" Get kali config for a specific kali attack
@param attack: Name of the attack to look up config for
"""
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if self.raw_config["attack_conf"] is None:
raise ConfigurationError("Config file missing attacks")
try:
res = self.raw_config["attack_conf"][attack]
except KeyError:
@ -245,12 +256,14 @@ class ExperimentConfig():
return "4/8"
return res
def get_plugin_based_attacks(self, for_os):
def get_plugin_based_attacks(self, for_os: str):
""" Get the configured kali attacks to run for a specific OS
@param for_os: The os to query the registered attacks for
"""
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "plugin_based_attacks" not in self.raw_config:
return []
if for_os not in self.raw_config["plugin_based_attacks"]:
@ -260,12 +273,14 @@ class ExperimentConfig():
return []
return res
def get_caldera_attacks(self, for_os):
def get_caldera_attacks(self, for_os: str):
""" Get the configured caldera attacks to run for a specific OS
@param for_os: The os to query the registered attacks for
"""
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "caldera_attacks" not in self.raw_config:
return []
if for_os not in self.raw_config["caldera_attacks"]:
@ -283,11 +298,14 @@ class ExperimentConfig():
except KeyError:
return 0
def get_sensor_config(self, name):
def get_sensor_config(self, name: str):
""" Return the config for a specific sensor
@param name: name of the sensor
"""
if self.raw_config is None:
raise ConfigurationError("Config file is empty")
if "sensors" not in self.raw_config:
return {}
if self.raw_config["sensors"] is None: # Better for unit tests that way.

@ -6,34 +6,36 @@ from plugins.base.plugin_base import BasePlugin
from app.exceptions import PluginError, ConfigurationError
from app.calderacontrol import CalderaControl
# from app.metasploit import MSFVenom, Metasploit
from typing import Optional
from machinery import MachineryPlugin
class AttackPlugin(BasePlugin):
""" Class to execute a command on a kali system targeting another system """
# Boilerplate
name = None
description = None
ttp = None
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
required_files = [] # Better use the other required_files features
required_files_attacker = [] # a list of files to automatically install to the attacker
required_files_target = [] # a list of files to automatically copy to the targets
required_files: list[str] = [] # Better use the other required_files features
required_files_attacker: list[str] = [] # a list of files to automatically install to the attacker
required_files_target: list[str] = [] # a list of files to automatically copy to the targets
# TODO: parse results
def __init__(self):
super().__init__()
self.conf = {} # Plugin specific configuration
self.conf: dict = {} # Plugin specific configuration
# self.sysconf = {} # System configuration. common for all plugins
self.attacker_machine_plugin = None # The machine plugin referencing the attacker. The Kali machine should be the perfect candidate
self.target_machine_plugin = None # The machine plugin referencing the target
self.caldera = None # The Caldera connection object
self.targets = None
self.metasploit_password = "password"
self.metasploit_user = "user"
self.metasploit_password: str = "password"
self.metasploit_user: str = "user"
self.metasploit = None
def copy_to_attacker_and_defender(self):
@ -50,7 +52,7 @@ class AttackPlugin(BasePlugin):
""" Cleanup afterwards """
pass # pylint: disable=unnecessary-pass
def attacker_run_cmd(self, command, disown=False):
def attacker_run_cmd(self, command: str, disown: bool = False) -> str:
""" Execute a command on the attacker
@param command: Command to execute
@ -65,7 +67,7 @@ class AttackPlugin(BasePlugin):
res = self.attacker_machine_plugin.__call_remote_run__(command, disown=disown)
return res
def targets_run_cmd(self, command, disown=False):
def targets_run_cmd(self, command: str, disown: bool = False) -> str:
""" Execute a command on the target
@param command: Command to execute
@ -80,7 +82,7 @@ class AttackPlugin(BasePlugin):
res = self.target_machine_plugin.__call_remote_run__(command, disown=disown)
return res
def set_target_machines(self, machine):
def set_target_machines(self, machine: MachineryPlugin):
""" Set the machine to target
@param machine: Machine plugin to communicate with
@ -88,7 +90,7 @@ class AttackPlugin(BasePlugin):
self.target_machine_plugin = machine.vm_manager
def set_attacker_machine(self, machine):
def set_attacker_machine(self, machine: MachineryPlugin):
""" Set the machine plugin class to target
@param machine: Machine to communicate with
@ -103,11 +105,11 @@ class AttackPlugin(BasePlugin):
"""
self.caldera = caldera
def caldera_attack(self, target, ability_id, parameters=None, **kwargs):
def caldera_attack(self, target: MachineryPlugin, ability_id: str, parameters=None, **kwargs):
""" Attack a single target using caldera
@param target: Target machine object
@param ability_id: Ability if od caldera ability to run
@param ability_id: Ability or caldera ability to run
@param parameters: parameters to pass to the ability
"""
@ -130,7 +132,7 @@ class AttackPlugin(BasePlugin):
return self.attacker_machine_plugin.get_playground()
def run(self, targets):
def run(self, targets: list[str]):
""" Run the command
@param targets: A list of targets, ip addresses will do
@ -172,7 +174,7 @@ class AttackPlugin(BasePlugin):
raise NotImplementedError
def get_target_by_name(self, name):
def get_target_by_name(self, name: str):
""" Returns a target machine out of the target pool by matching the name
If there is no matching name it will look into the "nicknames" list of the machine config

@ -6,38 +6,39 @@ You only gotta write a plugin if you want some special features
"""
from plugins.base.plugin_base import BasePlugin
from typing import Optional
class CalderaPlugin(BasePlugin):
""" Class to execute a command on a caldera system targeting another system """
# Boilerplate
name = None
description = None
ttp = None
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
required_files = []
required_files: list[str] = []
# TODO: parse results
def __init__(self):
super().__init__()
self.conf = {} # Plugin specific configuration
self.conf: dict = {} # Plugin specific configuration
# self.sysconf = {} # System configuration. common for all plugins
def teardown(self):
""" Cleanup afterwards """
pass # pylint: disable=unnecessary-pass
def run(self, targets):
def run(self, targets: list[str]):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
raise NotImplementedError
def __execute__(self, targets):
def __execute__(self, targets: list[str]) -> str:
""" Execute the plugin. This is called by the code
@param targets: A list of targets, ip addresses will do

@ -7,6 +7,7 @@ import os
from app.config import MachineConfig
from app.interface_sfx import CommandlineColors
from plugins.base.plugin_base import BasePlugin
from typing import Optional
class MachineStates(Enum):
@ -26,9 +27,9 @@ class MachineryPlugin(BasePlugin):
""" Class to control virtual machines, vagrant, .... """
# Boilerplate
name = None
name: Optional[str] = None
required_files = []
required_files: list[str] = []
###############
# This is stuff you might want to implement
@ -38,7 +39,7 @@ class MachineryPlugin(BasePlugin):
self.connection = None # Connection
self.config = None
def create(self, reboot=True):
def create(self, reboot: bool = True):
""" Create a machine
@param reboot: Optionally reboot the machine after creation
@ -61,7 +62,7 @@ class MachineryPlugin(BasePlugin):
""" Connect to a machine """
raise NotImplementedError
def remote_run(self, cmd, disown=False):
def remote_run(self, cmd: str, disown: bool = False):
""" Connects to the machine and runs a command there
@ -75,7 +76,7 @@ class MachineryPlugin(BasePlugin):
""" Disconnect from a machine """
raise NotImplementedError
def put(self, src, dst):
def put(self, src: str, dst: str):
""" Send a file to a machine
@param src: source dir
@ -83,7 +84,7 @@ class MachineryPlugin(BasePlugin):
"""
raise NotImplementedError
def get(self, src, dst):
def get(self, src: str, dst: str):
""" Get a file to a machine
@param src: source dir
@ -108,8 +109,11 @@ class MachineryPlugin(BasePlugin):
return self.config.get_playground()
def get_vm_name(self):
""" Get the specific name of the machine """
def get_vm_name(self) -> str:
""" Get the specific name of the machine
@returns: the machine name
"""
return self.config.vmname()
@ -140,7 +144,7 @@ class MachineryPlugin(BasePlugin):
self.config = config
self.process_config(config.raw_config)
def __call_remote_run__(self, cmd, disown=False):
def __call_remote_run__(self, cmd: str, disown: bool = False):
""" Simplifies connect and run
@param cmd: Command to run as shell command
@ -164,7 +168,7 @@ class MachineryPlugin(BasePlugin):
self.up()
def __call_create__(self, reboot=True):
def __call_create__(self, reboot: bool = True):
""" Create a VM
@param reboot: Reboot the VM during installation. Required if you want to install software

@ -19,7 +19,7 @@ class BasePlugin():
def __init__(self) -> None:
# self.machine = None
self.plugin_path = None
self.plugin_path: Optional[str] = None
self.machine_plugin = None
# self.sysconf = {}
self.conf: dict = {}
@ -82,7 +82,7 @@ class BasePlugin():
# self.sysconf["abs_machinepath_external"] = config["abs_machinepath_external"]
self.load_default_config()
def process_config(self, config):
def process_config(self, config: dict):
""" process config and use defaults if stuff is missing
@param config: The config dict
@ -92,7 +92,7 @@ class BasePlugin():
self.conf = {**self.conf, **config}
def copy_to_machine(self, filename):
def copy_to_machine(self, filename: str):
""" Copies a file shipped with the plugin to the machine share folder
@param filename: File from the plugin folder to copy to the machine share.
@ -100,12 +100,17 @@ class BasePlugin():
if self.machine_plugin is not None:
self.machine_plugin.put(filename, self.machine_plugin.get_playground())
else:
raise PluginError("Missing machine")
def get_from_machine(self, src, dst):
def get_from_machine(self, src: str, dst: str):
""" Get a file from the machine """
self.machine_plugin.get(src, dst) # nosec
if self.machine_plugin is not None:
self.machine_plugin.get(src, dst) # nosec
else:
raise PluginError("Missing machine")
def run_cmd(self, command, disown=False):
def run_cmd(self, command: str, disown: bool = False):
""" Execute a command on the vm using the connection
@param command: Command to execute

@ -3,6 +3,7 @@
import os
from plugins.base.plugin_base import BasePlugin
from typing import Optional
class SensorPlugin(BasePlugin):
@ -12,28 +13,28 @@ class SensorPlugin(BasePlugin):
"""
# Boilerplate
name = None
name: Optional[str] = None
required_files = []
required_files: list[str] = []
def __init__(self):
super().__init__() # pylint:disable=useless-super-delegation
self.debugit = False
def prime(self): # pylint: disable=no-self-use
def prime(self) -> bool: # pylint: disable=no-self-use
""" prime sets hard core configs in the target. You can use it to call everything that permanently alters the OS by settings.
If your prime function returns True the machine will be rebooted after prime-ing it. This is very likely what you want. Only use prime if install is not sufficient.
"""
return False
def install(self): # pylint: disable=no-self-use
def install(self) -> bool: # pylint: disable=no-self-use
""" Install the sensor. Executed on the target. Take the sensor from the share and (maybe) copy it to its destination. Do some setup
"""
return True
def start(self, disown=None): # pylint: disable=unused-argument, no-self-use
def start(self, disown=None) -> bool: # pylint: disable=unused-argument, no-self-use
""" Start the sensor. The connection to the client is disowned here. = Sent to background. This keeps the process running.
@param disown: Send async into background
@ -41,22 +42,22 @@ class SensorPlugin(BasePlugin):
return True
def stop(self): # pylint: disable=no-self-use
def stop(self) -> bool: # pylint: disable=no-self-use
""" Stop the sensor """
return True
def __call_collect__(self, machine_path):
def __call_collect__(self, machine_path: str):
""" Generate the data collect command
@param machine_path: Machine specific path to collect data into
"""
path = os.path.join(machine_path, "sensors", self.name)
path = os.path.join(machine_path, "sensors", self.name) # type: ignore
os.makedirs(path)
return self.collect(path)
def collect(self, path) -> []:
def collect(self, path: str) -> list[str]:
""" Collect data from sensor. Copy it from sensor collection dir on target OS to the share
@param path: The path to copy the data into

@ -62,7 +62,7 @@ class SSHFeatures(BasePlugin):
self.vprint("SSH network error", 0)
raise NetworkError
def remote_run(self, cmd, disown=False):
def remote_run(self, cmd: str, disown: bool = False):
""" Connects to the machine and runs a command there
@param cmd: The command to execute
@ -109,7 +109,7 @@ class SSHFeatures(BasePlugin):
return ""
def put(self, src, dst):
def put(self, src: str, dst: str):
""" Send a file to a machine
@param src: source dir
@ -148,7 +148,7 @@ class SSHFeatures(BasePlugin):
self.vprint("SSH network error on PUT command", 0)
raise NetworkError
def get(self, src, dst):
def get(self, src: str, dst: str):
""" Get a file to a machine
@param src: source dir

@ -3,19 +3,19 @@
""" This is a specific plugin type that installs a vulnerability into a VM. This can be a vulnerable application or a configuration setting """
from plugins.base.plugin_base import BasePlugin
from typing import Optional
class VulnerabilityPlugin(BasePlugin):
""" A plugin that installs a vulnerable application or does vulnerable configuration changes on the target VM
"""
# Boilerplate
name = None
description = None
ttp = None
name: Optional[str] = None
description: Optional[str] = None
ttp: Optional[str] = None
references = None
required_files = []
required_files: list[str] = []
def __init__(self):
super().__init__() # pylint:disable=useless-super-delegation

@ -18,4 +18,5 @@ pylint
mypy
types-PyYAML
types-requests
types-simplejson
types-simplejson
types-paramiko

Loading…
Cancel
Save