#!/usr/bin/env python3 """ Manage plugins """ from glob import glob import os import re from typing import Optional, Any import straight.plugin # type: ignore from straight.plugin.manager import PluginManager as StraightPluginManager # type: ignore from plugins.base.plugin_base import BasePlugin from plugins.base.attack import AttackPlugin from plugins.base.machinery import MachineryPlugin from plugins.base.ssh_features import SSHFeatures from plugins.base.sensor import SensorPlugin from plugins.base.vulnerability_plugin import VulnerabilityPlugin from app.interface_sfx import CommandlineColors from app.attack_log import AttackLog from app.exceptions import PluginError # from app.interface_sfx import CommandlineColors sections = [{"name": "Vulnerabilities", "subclass": VulnerabilityPlugin}, {"name": "Machinery", "subclass": MachineryPlugin}, {"name": "Attack", "subclass": AttackPlugin}, {"name": "Sensors", "subclass": SensorPlugin}, ] class PluginManager(): """ Manage plugins """ def __init__(self, attack_logger: AttackLog, basedir: Optional[str] = None): """ @param attack_logger: The attack logger to use @param basedir: optional base directory for plugins. A glob """ if basedir is None: self.base = "plugins/**/*.py" else: self.base = basedir self.attack_logger = attack_logger def get_plugins(self, subclass: Any, name_filter: Optional[list[str]] = None) -> list[BasePlugin]: """ Returns a list plugins matching specified criteria :param subclass: The subclass to use to filter plugins. Currently: AttackPlugin, MachineryPlugin, SensorPlugin, VulnerabilityPlugin :param name_filter: an optional list of names to select the plugins by :return: A list of instantiated plugins """ res = [] def get_handlers(a_plugin: StraightPluginManager) -> list[BasePlugin]: return a_plugin.produce() plugin_dirs = set() for a_glob in glob(self.base, recursive=True): plugin_dirs.add(os.path.dirname(a_glob)) for a_dir in plugin_dirs: plugins = straight.plugin.load(a_dir, subclasses=subclass) handlers = get_handlers(plugins) for plugin in handlers: plugin.set_logger(self.attack_logger) if name_filter is None: res.append(plugin) else: names = set(plugin.get_names()) intersection = names.intersection(name_filter) if len(intersection): res.append(plugin) return res def count_caldera_requirements(self, subclass: Any, name_filter: Optional[list[str]] = None) -> int: """ Count the plugins matching the filter that have caldera requirements """ # So far it only supports attack plugins. Maybe this will be extended to other plugin types later. assert subclass == AttackPlugin plugins = self.get_plugins(subclass, name_filter) res = 0 for plugin in plugins: if isinstance(plugin, AttackPlugin): if plugin.needs_caldera(): res += 1 else: raise PluginError("Wrong plugin type. Expected AttackPlugin") return res def count_metasploit_requirements(self, subclass: Any, name_filter: Optional[list[str]] = None) -> int: """ Count the plugins matching the filter that have metasploit requirements """ # So far it only supports attack plugins. Maybe this will be extended to other plugin types later. assert subclass == AttackPlugin plugins = self.get_plugins(subclass, name_filter) res = 0 for plugin in plugins: if isinstance(plugin, AttackPlugin): if plugin.needs_metasploit(): res += 1 else: raise PluginError("Wrong plugin type. Expected AttackPlugin") return res def print_list(self) -> None: """ Print a pretty list of all available plugins """ for section in sections: print(f'\t\t{section["name"]}') plugins = self.get_plugins(section["subclass"]) # type: ignore for plugin in plugins: print(f"Name: {plugin.get_name()}") print(f"Description: {plugin.get_description()}") print("\t") def is_ttp_wrong(self, ttp: Optional[str]) -> bool: """ Checks if a ttp is a valid ttp """ if ttp is None: return True # Short: T1234 if re.match("^T\\d{4}$", ttp): return False # Detailed: T1234.123 if re.match("^T\\d{4}\\.\\d{3}$", ttp): return False # Unkown: ??? if ttp == "???": return False # Multiple TTPs in this attack if ttp == "multiple": return False return True def check(self, plugin: BasePlugin) -> list[str]: """ Checks a plugin for valid implementation :returns: A list of issues """ issues = [] # Base functionality for all plugin types if plugin.name is None: report = f"No name for plugin: in {plugin.plugin_path}" issues.append(report) if plugin.description is None: report = f"No description in plugin: {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) # Sensors if issubclass(type(plugin), SensorPlugin): # essential methods: collect if plugin.collect.__func__ is SensorPlugin.collect: # type: ignore report = f"Method 'collect' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) # Attacks if issubclass(type(plugin), AttackPlugin): # essential methods: run if plugin.run.__func__ is AttackPlugin.run: # type: ignore report = f"Method 'run' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) if self.is_ttp_wrong(plugin.ttp): # type: ignore report = f"Attack plugins need a valid ttp number (either T1234, T1234.222 or ???) {plugin.get_name()} uses {plugin.ttp} in {plugin.plugin_path}" # type: ignore issues.append(report) # Machinery if issubclass(type(plugin), MachineryPlugin): # essential methods: get_ip, get_state, up. halt, create, destroy if plugin.get_state.__func__ is MachineryPlugin.get_state: # type: ignore report = f"Method 'get_state' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) if (plugin.get_ip.__func__ is MachineryPlugin.get_ip) or (plugin.get_ip.__func__ is SSHFeatures.get_ip): # type: ignore report = f"Method 'get_ip' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) if plugin.up.__func__ is MachineryPlugin.up: # type: ignore report = f"Method 'up' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) if plugin.halt.__func__ is MachineryPlugin.halt: # type: ignore report = f"Method 'halt' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) if plugin.create.__func__ is MachineryPlugin.create: # type: ignore report = f"Method 'create' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) if plugin.destroy.__func__ is MachineryPlugin.destroy: # type: ignore report = f"Method 'destroy' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) # Vulnerabilities if issubclass(type(plugin), VulnerabilityPlugin): # essential methods: start, stop if plugin.start.__func__ is VulnerabilityPlugin.start: # type: ignore report = f"Method 'start' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) if plugin.stop.__func__ is VulnerabilityPlugin.stop: # type: ignore report = f"Method 'stop' not implemented in {plugin.get_name()} in {plugin.plugin_path}" issues.append(report) if self.is_ttp_wrong(plugin.ttp): # type: ignore report = f"Vulnerability plugins need a valid ttp number (either T1234, T1234.222 or ???) {plugin.get_name()} uses {plugin.ttp} in {plugin.plugin_path}" # type: ignore issues.append(report) return issues def print_check(self) -> list[str]: """ Iterates through all installed plugins and verifies them """ names: dict[str, str] = {} cnames: dict[str, object] = {} issues = [] for section in sections: # print(f'\t\t{section["name"]}') subclass = section["subclass"] plugins = self.get_plugins(subclass) # type: ignore for plugin in plugins: # print(f"Checking: {plugin.get_name()}") # Check for duplicate names name = plugin.get_name() if name in names: report = f"Name duplication: {name} is used in {names[name]} and {plugin.plugin_path}" issues.append(report) self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{report}{CommandlineColors.ENDC}", 0) ppath = plugin.plugin_path if ppath is None: raise Exception("A plugin has no path") names[name] = ppath # Check for duplicate class names name = type(plugin).__name__ if name in cnames: report = f"Class name duplication: {name} is used in {cnames[name]} and {plugin.plugin_path}" issues.append(report) self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{report}{CommandlineColors.ENDC}", 0) cnames[name] = type(plugin) # Deep checks results = self.check(plugin) if len(results) > 0: for result in results: print(f"* Issue: {result}") issues.append(result) self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{result}{CommandlineColors.ENDC}", 1) return issues # TODO: Add verify command to verify all plugins (or a specific one) def print_default_config(self, subclass_name: str, name: str) -> None: """ Pretty prints the default config for this plugin """ subclass = None for section in sections: if section["name"] == subclass_name: subclass = section["subclass"] if subclass is None: print("Use proper subclass") plugins = self.get_plugins(subclass, [name]) # type: ignore for plugin in plugins: print(plugin.get_raw_default_config())