#!/usr/bin/env python3
""" Manage plugins """
from glob import glob
import os
import re
from typing import Optional , Any
import straight . plugin # type: ignore
from straight . plugin . manager import PluginManager as StraightPluginManager # type: ignore
from plugins . base . plugin_base import BasePlugin
from plugins . base . attack import AttackPlugin
from plugins . base . machinery import MachineryPlugin
from plugins . base . ssh_features import SSHFeatures
from plugins . base . sensor import SensorPlugin
from plugins . base . vulnerability_plugin import VulnerabilityPlugin
from app . interface_sfx import CommandlineColors
from app . attack_log import AttackLog
from app . exceptions import PluginError
# from app.interface_sfx import CommandlineColors
sections = [ { " name " : " Vulnerabilities " ,
" subclass " : VulnerabilityPlugin } ,
{ " name " : " Machinery " ,
" subclass " : MachineryPlugin } ,
{ " name " : " Attack " ,
" subclass " : AttackPlugin } ,
{ " name " : " Sensors " ,
" subclass " : SensorPlugin } ,
]
class PluginManager ( ) :
""" Manage plugins """
def __init__ ( self , attack_logger : AttackLog , basedir : Optional [ str ] = None ) :
"""
@param attack_logger : The attack logger to use
@param basedir : optional base directory for plugins . A glob
"""
if basedir is None :
self . base = " plugins/**/*.py "
else :
self . base = basedir
self . attack_logger = attack_logger
def get_plugins ( self , subclass : Any ,
name_filter : Optional [ list [ str ] ] = None ) - > list [ BasePlugin ] :
""" Returns a list plugins matching specified criteria
: param subclass : The subclass to use to filter plugins . Currently : AttackPlugin , MachineryPlugin , SensorPlugin , VulnerabilityPlugin
: param name_filter : an optional list of names to select the plugins by
: return : A list of instantiated plugins
"""
res = [ ]
def get_handlers ( a_plugin : StraightPluginManager ) - > list [ BasePlugin ] :
return a_plugin . produce ( )
plugin_dirs = set ( )
for a_glob in glob ( self . base , recursive = True ) :
plugin_dirs . add ( os . path . dirname ( a_glob ) )
for a_dir in plugin_dirs :
plugins = straight . plugin . load ( a_dir , subclasses = subclass )
handlers = get_handlers ( plugins )
for plugin in handlers :
plugin . set_logger ( self . attack_logger )
if name_filter is None :
res . append ( plugin )
else :
names = set ( plugin . get_names ( ) )
intersection = names . intersection ( name_filter )
if len ( intersection ) :
res . append ( plugin )
return res
def count_caldera_requirements ( self , subclass : Any ,
name_filter : Optional [ list [ str ] ] = None ) - > int :
""" Count the plugins matching the filter that have caldera requirements """
# So far it only supports attack plugins. Maybe this will be extended to other plugin types later.
assert subclass == AttackPlugin
plugins = self . get_plugins ( subclass , name_filter )
res = 0
for plugin in plugins :
if isinstance ( plugin , AttackPlugin ) :
if plugin . needs_caldera ( ) :
res + = 1
else :
raise PluginError ( " Wrong plugin type. Expected AttackPlugin " )
return res
def count_metasploit_requirements ( self , subclass : Any ,
name_filter : Optional [ list [ str ] ] = None ) - > int :
""" Count the plugins matching the filter that have metasploit requirements """
# So far it only supports attack plugins. Maybe this will be extended to other plugin types later.
assert subclass == AttackPlugin
plugins = self . get_plugins ( subclass , name_filter )
res = 0
for plugin in plugins :
if isinstance ( plugin , AttackPlugin ) :
if plugin . needs_metasploit ( ) :
res + = 1
else :
raise PluginError ( " Wrong plugin type. Expected AttackPlugin " )
return res
def print_list ( self ) - > None :
""" Print a pretty list of all available plugins """
for section in sections :
print ( f ' \t \t { section [ " name " ] } ' )
plugins = self . get_plugins ( section [ " subclass " ] ) # type: ignore
for plugin in plugins :
print ( f " Name: { plugin . get_name ( ) } " )
print ( f " Description: { plugin . get_description ( ) } " )
print ( " \t " )
def is_ttp_wrong ( self , ttp : Optional [ str ] ) - > bool :
""" Checks if a ttp is a valid ttp """
if ttp is None :
return True
# Short: T1234
if re . match ( " ^T \\ d {4} $ " , ttp ) :
return False
# Detailed: T1234.123
if re . match ( " ^T \\ d {4} \\ . \\ d {3} $ " , ttp ) :
return False
# Unkown: ???
if ttp == " ??? " :
return False
# Multiple TTPs in this attack
if ttp == " multiple " :
return False
return True
def check ( self , plugin : BasePlugin ) - > list [ str ] :
""" Checks a plugin for valid implementation
: returns : A list of issues
"""
issues = [ ]
# Base functionality for all plugin types
if plugin . name is None :
report = f " No name for plugin: in { plugin . plugin_path } "
issues . append ( report )
if plugin . description is None :
report = f " No description in plugin: { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
# Sensors
if issubclass ( type ( plugin ) , SensorPlugin ) :
# essential methods: collect
if plugin . collect . __func__ is SensorPlugin . collect : # type: ignore
report = f " Method ' collect ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
# Attacks
if issubclass ( type ( plugin ) , AttackPlugin ) :
# essential methods: run
if plugin . run . __func__ is AttackPlugin . run : # type: ignore
report = f " Method ' run ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if self . is_ttp_wrong ( plugin . ttp ) : # type: ignore
report = f " Attack plugins need a valid ttp number (either T1234, T1234.222 or ???) { plugin . get_name ( ) } uses { plugin . ttp } in { plugin . plugin_path } " # type: ignore
issues . append ( report )
# Machinery
if issubclass ( type ( plugin ) , MachineryPlugin ) :
# essential methods: get_ip, get_state, up. halt, create, destroy
if plugin . get_state . __func__ is MachineryPlugin . get_state : # type: ignore
report = f " Method ' get_state ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if ( plugin . get_ip . __func__ is MachineryPlugin . get_ip ) or ( plugin . get_ip . __func__ is SSHFeatures . get_ip ) : # type: ignore
report = f " Method ' get_ip ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . up . __func__ is MachineryPlugin . up : # type: ignore
report = f " Method ' up ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . halt . __func__ is MachineryPlugin . halt : # type: ignore
report = f " Method ' halt ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . create . __func__ is MachineryPlugin . create : # type: ignore
report = f " Method ' create ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . destroy . __func__ is MachineryPlugin . destroy : # type: ignore
report = f " Method ' destroy ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
# Vulnerabilities
if issubclass ( type ( plugin ) , VulnerabilityPlugin ) :
# essential methods: start, stop
if plugin . start . __func__ is VulnerabilityPlugin . start : # type: ignore
report = f " Method ' start ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . stop . __func__ is VulnerabilityPlugin . stop : # type: ignore
report = f " Method ' stop ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if self . is_ttp_wrong ( plugin . ttp ) : # type: ignore
report = f " Vulnerability plugins need a valid ttp number (either T1234, T1234.222 or ???) { plugin . get_name ( ) } uses { plugin . ttp } in { plugin . plugin_path } " # type: ignore
issues . append ( report )
return issues
def print_check ( self ) - > list [ str ] :
""" Iterates through all installed plugins and verifies them """
names : dict [ str , str ] = { }
cnames : dict [ str , object ] = { }
issues = [ ]
for section in sections :
# print(f'\t\t{section["name"]}')
subclass = section [ " subclass " ]
plugins = self . get_plugins ( subclass ) # type: ignore
for plugin in plugins :
# print(f"Checking: {plugin.get_name()}")
# Check for duplicate names
name = plugin . get_name ( )
if name in names :
report = f " Name duplication: { name } is used in { names [ name ] } and { plugin . plugin_path } "
issues . append ( report )
self . attack_logger . vprint ( f " { CommandlineColors . BACKGROUND_RED } { report } { CommandlineColors . ENDC } " , 0 )
ppath = plugin . plugin_path
if ppath is None :
raise Exception ( " A plugin has no path " )
names [ name ] = ppath
# Check for duplicate class names
name = type ( plugin ) . __name__
if name in cnames :
report = f " Class name duplication: { name } is used in { cnames [ name ] } and { plugin . plugin_path } "
issues . append ( report )
self . attack_logger . vprint ( f " { CommandlineColors . BACKGROUND_RED } { report } { CommandlineColors . ENDC } " , 0 )
cnames [ name ] = type ( plugin )
# Deep checks
results = self . check ( plugin )
if len ( results ) > 0 :
for result in results :
print ( f " * Issue: { result } " )
issues . append ( result )
self . attack_logger . vprint ( f " { CommandlineColors . BACKGROUND_RED } { result } { CommandlineColors . ENDC } " , 1 )
return issues
# TODO: Add verify command to verify all plugins (or a specific one)
def print_default_config ( self , subclass_name : str , name : str ) - > None :
""" Pretty prints the default config for this plugin """
subclass = None
for section in sections :
if section [ " name " ] == subclass_name :
subclass = section [ " subclass " ]
if subclass is None :
print ( " Use proper subclass " )
plugins = self . get_plugins ( subclass , [ name ] ) # type: ignore
for plugin in plugins :
print ( plugin . get_raw_default_config ( ) )