@ -4,8 +4,9 @@
from glob import glob
import os
import re
from typing import Optional
from typing import Optional , Any
import straight . plugin # type: ignore
from straight . plugin . manager import PluginManager as StraightPluginManager # type: ignore
from plugins . base . plugin_base import BasePlugin
@ -47,7 +48,8 @@ class PluginManager():
self . base = basedir
self . attack_logger = attack_logger
def get_plugins ( self , subclass , name_filter : Optional [ list [ str ] ] = None ) - > list [ BasePlugin ] :
def get_plugins ( self , subclass : Any ,
name_filter : Optional [ list [ str ] ] = None ) - > list [ BasePlugin ] :
""" Returns a list plugins matching specified criteria
@ -58,7 +60,7 @@ class PluginManager():
res = [ ]
def get_handlers ( a_plugin ):
def get_handlers ( a_plugin : StraightPluginManager ) - > list [ BasePlugin ] :
return a_plugin . produce ( )
plugin_dirs = set ( )
@ -81,7 +83,8 @@ class PluginManager():
res . append ( plugin )
return res
def count_caldera_requirements ( self , subclass , name_filter = None ) - > int :
def count_caldera_requirements ( self , subclass : Any ,
name_filter : Optional [ list [ str ] ] = None ) - > int :
""" Count the plugins matching the filter that have caldera requirements """
# So far it only supports attack plugins. Maybe this will be extended to other plugin types later.
@ -98,7 +101,8 @@ class PluginManager():
return res
def count_metasploit_requirements ( self , subclass , name_filter = None ) - > int :
def count_metasploit_requirements ( self , subclass : Any ,
name_filter : Optional [ list [ str ] ] = None ) - > int :
""" Count the plugins matching the filter that have metasploit requirements """
# So far it only supports attack plugins. Maybe this will be extended to other plugin types later.
@ -115,18 +119,18 @@ class PluginManager():
return res
def print_list ( self ) :
def print_list ( self ) - > None :
""" Print a pretty list of all available plugins """
for section in sections :
print ( f ' \t \t { section [ " name " ] } ' )
plugins = self . get_plugins ( section [ " subclass " ] )
plugins = self . get_plugins ( section [ " subclass " ] ) # type: ignore
for plugin in plugins :
print ( f " Name: { plugin . get_name ( ) } " )
print ( f " Description: { plugin . get_description ( ) } " )
print ( " \t " )
def is_ttp_wrong ( self , ttp ):
def is_ttp_wrong ( self , ttp : Optional [ str ] ) - > bool :
""" Checks if a ttp is a valid ttp """
if ttp is None :
return True
@ -149,7 +153,7 @@ class PluginManager():
return True
def check ( self , plugin ):
def check ( self , plugin : BasePlugin ) - > list [ str ] :
""" Checks a plugin for valid implementation
: returns : A list of issues
@ -170,67 +174,69 @@ class PluginManager():
# Sensors
if issubclass ( type ( plugin ) , SensorPlugin ) :
# essential methods: collect
if plugin . collect . __func__ is SensorPlugin . collect :
if plugin . collect . __func__ is SensorPlugin . collect : # type: ignore
report = f " Method ' collect ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
# Attacks
if issubclass ( type ( plugin ) , AttackPlugin ) :
# essential methods: run
if plugin . run . __func__ is AttackPlugin . run :
if plugin . run . __func__ is AttackPlugin . run : # type: ignore
report = f " Method ' run ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if self . is_ttp_wrong ( plugin . ttp ) :
report = f " Attack plugins need a valid ttp number (either T1234, T1234.222 or ???) { plugin . get_name ( ) } uses { plugin . ttp } in { plugin . plugin_path } "
if self . is_ttp_wrong ( plugin . ttp ) : # type: ignore
report = f " Attack plugins need a valid ttp number (either T1234, T1234.222 or ???) { plugin . get_name ( ) } uses { plugin . ttp } in { plugin . plugin_path } " # type: ignore
issues . append ( report )
# Machinery
if issubclass ( type ( plugin ) , MachineryPlugin ) :
# essential methods: get_ip, get_state, up. halt, create, destroy
if plugin . get_state . __func__ is MachineryPlugin . get_state :
if plugin . get_state . __func__ is MachineryPlugin . get_state : # type: ignore
report = f " Method ' get_state ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if ( plugin . get_ip . __func__ is MachineryPlugin . get_ip ) or ( plugin . get_ip . __func__ is SSHFeatures . get_ip ) :
if ( plugin . get_ip . __func__ is MachineryPlugin . get_ip ) or ( plugin . get_ip . __func__ is SSHFeatures . get_ip ) : # type: ignore
report = f " Method ' get_ip ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . up . __func__ is MachineryPlugin . up :
if plugin . up . __func__ is MachineryPlugin . up : # type: ignore
report = f " Method ' up ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . halt . __func__ is MachineryPlugin . halt :
if plugin . halt . __func__ is MachineryPlugin . halt : # type: ignore
report = f " Method ' halt ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . create . __func__ is MachineryPlugin . create :
if plugin . create . __func__ is MachineryPlugin . create : # type: ignore
report = f " Method ' create ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . destroy . __func__ is MachineryPlugin . destroy :
if plugin . destroy . __func__ is MachineryPlugin . destroy : # type: ignore
report = f " Method ' destroy ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
# Vulnerabilities
if issubclass ( type ( plugin ) , VulnerabilityPlugin ) :
# essential methods: start, stop
if plugin . start . __func__ is VulnerabilityPlugin . start :
if plugin . start . __func__ is VulnerabilityPlugin . start : # type: ignore
report = f " Method ' start ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if plugin . stop . __func__ is VulnerabilityPlugin . stop :
if plugin . stop . __func__ is VulnerabilityPlugin . stop : # type: ignore
report = f " Method ' stop ' not implemented in { plugin . get_name ( ) } in { plugin . plugin_path } "
issues . append ( report )
if self . is_ttp_wrong ( plugin . ttp ) :
report = f " Vulnerability plugins need a valid ttp number (either T1234, T1234.222 or ???) { plugin . get_name ( ) } uses { plugin . ttp } in { plugin . plugin_path } "
if self . is_ttp_wrong ( plugin . ttp ) : # type: ignore
report = f " Vulnerability plugins need a valid ttp number (either T1234, T1234.222 or ???) { plugin . get_name ( ) } uses { plugin . ttp } in { plugin . plugin_path } " # type: ignore
issues . append ( report )
return issues
def print_check ( self ) :
def print_check ( self ) - > list [ str ] :
""" Iterates through all installed plugins and verifies them """
names = { }
cnames = { }
names : dict [ str , str ] = { }
cnames : dict [ str , object ] = { }
issues = [ ]
for section in sections :
# print(f'\t\t{section["name"]}')
plugins = self . get_plugins ( section [ " subclass " ] )
subclass = section [ " subclass " ]
plugins = self . get_plugins ( subclass ) # type: ignore
for plugin in plugins :
# print(f"Checking: {plugin.get_name()}")
@ -240,7 +246,10 @@ class PluginManager():
report = f " Name duplication: { name } is used in { names [ name ] } and { plugin . plugin_path } "
issues . append ( report )
self . attack_logger . vprint ( f " { CommandlineColors . BACKGROUND_RED } { report } { CommandlineColors . ENDC } " , 0 )
names [ name ] = plugin . plugin_path
ppath = plugin . plugin_path
if ppath is None :
raise Exception ( " A plugin has no path " )
names [ name ] = ppath
# Check for duplicate class names
name = type ( plugin ) . __name__
@ -263,7 +272,7 @@ class PluginManager():
# TODO: Add verify command to verify all plugins (or a specific one)
def print_default_config ( self , subclass_name , name ):
def print_default_config ( self , subclass_name : str , name : str ) - > None :
""" Pretty prints the default config for this plugin """
subclass = None
@ -274,6 +283,6 @@ class PluginManager():
if subclass is None :
print ( " Use proper subclass " )
plugins = self . get_plugins ( subclass , [ name ] )
plugins = self . get_plugins ( subclass , [ name ] ) # type: ignore
for plugin in plugins :
print ( plugin . get_raw_default_config ( ) )