From b2c36de4029616ef00734478268bb110f64ebb83 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Tue, 13 Jul 2021 11:06:04 +0200 Subject: [PATCH] Automated tests for plugins --- app/pluginmanager.py | 105 ++++++++++++++++++++++++++++++++++++++++++- plugin_manager.py | 25 ++++++++++- tox.ini | 1 + 3 files changed, 128 insertions(+), 3 deletions(-) mode change 100644 => 100755 plugin_manager.py diff --git a/app/pluginmanager.py b/app/pluginmanager.py index 74fdfe9..722e656 100644 --- a/app/pluginmanager.py +++ b/app/pluginmanager.py @@ -10,13 +10,14 @@ from plugins.base.attack import AttackPlugin from plugins.base.machinery import MachineryPlugin from plugins.base.sensor import SensorPlugin from plugins.base.vulnerability_plugin import VulnerabilityPlugin +from app.interface_sfx import CommandlineColors # from app.interface_sfx import CommandlineColors sections = [{"name": "Vulnerabilities", "subclass": VulnerabilityPlugin}, {"name": "Machinery", "subclass": MachineryPlugin}, - {"name": "Kali", + {"name": "Attack", "subclass": AttackPlugin}, {"name": "Sensors", "subclass": SensorPlugin}, @@ -79,6 +80,108 @@ class PluginManager(): print(f"Description: {plugin.get_description()}") print("\t") + def check(self, plugin): + """ Checks a plugin for valid implementation + + @returns: A list of issues + """ + + issues = [] + + # Sensors + if issubclass(type(plugin), SensorPlugin): + # essential methods: collect + if plugin.collect.__func__ is SensorPlugin.collect: + report = f"Method 'collect' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + + # Attacks + if issubclass(type(plugin), AttackPlugin): + # essential methods: run + if plugin.run.__func__ is AttackPlugin.run: + report = f"Method 'run' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + + # Machinery + if issubclass(type(plugin), MachineryPlugin): + # essential methods: get_ip, get_state, up. halt, create, destroy + if plugin.get_state.__func__ is MachineryPlugin.get_state: + report = f"Method 'get_state' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + if plugin.get_ip.__func__ is MachineryPlugin.get_ip: + report = f"Method 'get_ip' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + if plugin.up.__func__ is MachineryPlugin.up: + report = f"Method 'up' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + if plugin.halt.__func__ is MachineryPlugin.halt: + report = f"Method 'halt' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + if plugin.create.__func__ is MachineryPlugin.create: + report = f"Method 'create' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + if plugin.destroy.__func__ is MachineryPlugin.destroy: + report = f"Method 'destroy' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + + # Vulnerabilities + if issubclass(type(plugin), VulnerabilityPlugin): + # essential methods: start, stop + if plugin.start.__func__ is VulnerabilityPlugin.start: + report = f"Method 'start' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + if plugin.stop.__func__ is VulnerabilityPlugin.stop: + report = f"Method 'stop' not implemented in {plugin.get_name()} in {plugin.plugin_path}" + issues.append(report) + + return issues + + def print_check(self): + """ Iterates through all installed plugins and verifies them """ + + # TODO: Identical name + # TODO: identical class name + + names = {} + cnames = {} + + issues = [] + for section in sections: + # print(f'\t\t{section["name"]}') + plugins = self.get_plugins(section["subclass"]) + + for plugin in plugins: + # print(f"Checking: {plugin.get_name()}") + # Check for duplicate names + name = plugin.get_name() + if name in names: + report = f"Name duplication: {name} is used in {names[name]} and {plugin.plugin_path}" + issues.append(report) + self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{report}{CommandlineColors.ENDC}", 0) + names[name] = plugin.plugin_path + + # Check for duplicate class names + name = type(plugin).__name__ + if name in cnames: + report = f"Class name duplication: {name} is used in {cnames[name]} and {plugin.plugin_path}" + issues.append(report) + self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{report}{CommandlineColors.ENDC}", 0) + cnames[name] = type(plugin) + + # Deep checks + + result = self.check(plugin) + + for r in result: + print(f"* Issue: {r}") + if len(result): + for r in result: + issues.append(r) + self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{r}{CommandlineColors.ENDC}", 1) + return issues + + # TODO: Add verify command to verify all plugins (or a specific one) + def print_default_config(self, subclass_name, name): subclass = None diff --git a/plugin_manager.py b/plugin_manager.py old mode 100644 new mode 100755 index e45bdcf..f1d3e2c --- a/plugin_manager.py +++ b/plugin_manager.py @@ -2,6 +2,8 @@ """ Managing plugins """ import argparse +import sys + from app.pluginmanager import PluginManager from app.attack_log import AttackLog @@ -12,6 +14,20 @@ def list_plugins(arguments): attack_logger = AttackLog(arguments.verbose) p = PluginManager(attack_logger) p.print_list() + return 0 + + +def check_plugins(arguments): + """ Check plugins for validity """ + + attack_logger = AttackLog(arguments.verbose) + p = PluginManager(attack_logger) + res = p.print_check() + if len(res): + print("*************************************") + print("Some issues in plugins were found: ") + print("\n".join(res)) + return len(res) def get_default_config(arguments): @@ -29,11 +45,15 @@ def create_parser(): main_parser.add_argument('--verbose', '-v', action='count', default=0) subparsers = main_parser.add_subparsers(help="sub-commands") - # Sub parser for machine creation + # Sub parser for plugin list parser_list = subparsers.add_parser("list", help="list plugins") parser_list.set_defaults(func=list_plugins) # parser_list.add_argument("--configfile", default="experiment.yaml", help="Config file to create from") + # Sub parser for plugin check + parser_list = subparsers.add_parser("check", help="check plugin implementation") + parser_list.set_defaults(func=check_plugins) + parser_default_config = subparsers.add_parser("raw_config", help="print raw default config of the given plugin") parser_default_config.set_defaults(func=get_default_config) parser_default_config.add_argument("subclass_name", help="name of the subclass") @@ -49,4 +69,5 @@ if __name__ == "__main__": args = parser.parse_args() - args.func(args) + exval = args.func(args) + sys.exit(exval) diff --git a/tox.ini b/tox.ini index ef5891f..09b0859 100644 --- a/tox.ini +++ b/tox.ini @@ -51,3 +51,4 @@ commands = bandit -ll -r app/ plugins/ *.py # Linting # pylint *.py # currently off. Needs configuration + python3 ./plugin_manager.py check