mirror of https://github.com/avast/PurpleDome
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
177 lines
8.6 KiB
Python
177 lines
8.6 KiB
Python
import unittest
|
|
from app.pluginmanager import PluginManager
|
|
from app.attack_log import AttackLog
|
|
|
|
from plugins.base.sensor import SensorPlugin
|
|
from plugins.base.attack import AttackPlugin
|
|
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
|
|
from plugins.base.machinery import MachineryPlugin
|
|
|
|
# https://docs.python.org/3/library/unittest.html
|
|
|
|
|
|
class TestMachineControl(unittest.TestCase):
    """Unit tests for PluginManager.

    Covers construction, plugin discovery via ``get_plugins`` (optionally
    filtered by name), the caldera/metasploit requirement counters, and the
    ``check`` validation for sensor, attack, vulnerability and machinery
    plugins. All plugin fixtures are loaded from ``tests/plugins/``.
    """

    def setUp(self) -> None:
        """Create the AttackLog that is handed to every PluginManager under test."""
        # NOTE(review): the meaning of the 0 argument is defined by AttackLog
        # (presumably a verbosity level) - confirm against app.attack_log.
        self.attack_logger = AttackLog(0)

    def test_basic_pluginmanager_init(self):
        """ just a simple init """
        manager = PluginManager(self.attack_logger)
        self.assertIsNotNone(manager)

    def test_basic_pluginmanager_get_plugins_empty(self):
        """ just a simple getting plugins with empty directory """
        manager = PluginManager(self.attack_logger, "tests/plugins/none")
        self.assertEqual(manager.get_plugins(AttackPlugin), [])

    def test_basic_pluginmanager_get_caldera_plugin(self):
        """ just a simple getting the one caldera plugin """
        manager = PluginManager(self.attack_logger, "tests/plugins/caldera/caldera_ok.py")
        plugins = manager.get_plugins(AttackPlugin)
        # Check the count first so an empty result fails with a clear
        # assertion message instead of an IndexError on plugins[0].
        self.assertEqual(len(plugins), 1)
        self.assertEqual(plugins[0].name, "caldera_autostart_1")

    def test_basic_pluginmanager_count_caldera_plugin(self):
        """ counting caldera requirements """
        manager = PluginManager(self.attack_logger, "tests/plugins/caldera/caldera_ok.py")
        count = manager.count_caldera_requirements(AttackPlugin, None)
        self.assertEqual(count, 1)

    def test_basic_pluginmanager_count_metasploit_plugin(self):
        """ counting metasploit requirements of a caldera plugin: expects none """
        manager = PluginManager(self.attack_logger, "tests/plugins/caldera/caldera_ok.py")
        count = manager.count_metasploit_requirements(AttackPlugin, None)
        self.assertEqual(count, 0)

    def test_basic_pluginmanager_count_metasploit_plugin_2(self):
        """ counting metasploit requirements """
        manager = PluginManager(self.attack_logger, "tests/plugins/metasploit/metasploit_ok.py")
        count = manager.count_metasploit_requirements(AttackPlugin, None)
        self.assertEqual(count, 1)

    def test_basic_pluginmanager_check_ok(self):
        """ basic check for a plugin, ok """
        manager = PluginManager(self.attack_logger, "tests/plugins/metasploit/metasploit_ok.py")
        plugins = manager.get_plugins(AttackPlugin)
        issues = manager.check(plugins[0])
        self.assertEqual(issues, [])

    def test_basic_pluginmanager_check_sensor_plugin_ok(self):
        """ just a simple getting the one sensor plugin """
        manager = PluginManager(self.attack_logger, "tests/plugins/sensor/sensor_ok/*.py")
        plugins = manager.get_plugins(SensorPlugin)
        issues = manager.check(plugins[0])
        self.assertEqual(issues, [])

    def test_basic_pluginmanager_check_sensor_plugin_missing_collect(self):
        """ a sensor plugin with missing collect should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/sensor/missing_collect/*.py")
        plugins = manager.get_plugins(SensorPlugin)
        issues = manager.check(plugins[0])
        # assertRegex replaces the deprecated assertRegexpMatches alias,
        # which no longer exists in Python 3.12+.
        self.assertRegex(issues[0], r"Method 'collect' not implemented in missing_collect in .*")

    def test_basic_pluginmanager_pick_sensor_plugin_by_name(self):
        """ get a plugin by name """
        manager = PluginManager(self.attack_logger, "tests/plugins/sensor/two_sensors/*/*.py")
        plugins = manager.get_plugins(SensorPlugin, ["pick_me"])
        self.assertEqual(len(plugins), 1)
        self.assertEqual(plugins[0].get_name(), "pick_me")

    def test_basic_pluginmanager_pick_sensor_plugin_by_name_2(self):
        """ get two plugins by name """
        manager = PluginManager(self.attack_logger, "tests/plugins/sensor/two_sensors/*/*.py")
        plugins = manager.get_plugins(SensorPlugin, ["pick_me", "ignore_me"])
        self.assertEqual(len(plugins), 2)

    def test_basic_pluginmanager_pick_sensor_plugin_by_name_3(self):
        """ not finding any plugin by name """
        manager = PluginManager(self.attack_logger, "tests/plugins/sensor/two_sensors/*/*.py")
        plugins = manager.get_plugins(SensorPlugin, ["fail"])
        self.assertEqual(len(plugins), 0)

    def test_basic_pluginmanager_check_attack_plugin_missing_run(self):
        """ an attack plugin with missing run should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/attack/missing_run/*.py")
        plugins = manager.get_plugins(AttackPlugin)
        issues = manager.check(plugins[0])
        self.assertRegex(issues[0], r"Method 'run' not implemented in missing_run in .*")

    def test_basic_pluginmanager_check_vulnerability_plugin_ok(self):
        """ a vulnerability plugin ok on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/vulnerabilities/ok/*.py")
        plugins = manager.get_plugins(VulnerabilityPlugin)
        issues = manager.check(plugins[0])
        self.assertEqual(len(issues), 0)

    def test_basic_pluginmanager_check_vulnerability_plugin_missing_start(self):
        """ a vulnerability plugin with missing start should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/vulnerabilities/no_start/*.py")
        plugins = manager.get_plugins(VulnerabilityPlugin)
        issues = manager.check(plugins[0])
        self.assertRegex(issues[0], r"Method 'start' not implemented in missing_start in .*")

    def test_basic_pluginmanager_check_vulnerability_plugin_missing_stop(self):
        """ a vulnerability plugin with missing stop should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/vulnerabilities/no_stop/*.py")
        plugins = manager.get_plugins(VulnerabilityPlugin)
        issues = manager.check(plugins[0])
        self.assertRegex(issues[0], r"Method 'stop' not implemented in missing_stop in .*")

    def test_basic_pluginmanager_check_machinery_plugin_ok(self):
        """ a machinery plugin ok on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/machinery/ok/*.py")
        plugins = manager.get_plugins(MachineryPlugin)
        issues = manager.check(plugins[0])
        self.assertEqual(len(issues), 0)

    def test_basic_pluginmanager_check_machinery_plugin_missing_up(self):
        """ a machinery plugin with missing up should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/machinery/no_up/*.py")
        plugins = manager.get_plugins(MachineryPlugin)
        issues = manager.check(plugins[0])
        self.assertRegex(issues[0], r"Method 'up' not implemented in machinery_no_up in .*")

    def test_basic_pluginmanager_check_machinery_plugin_missing_state(self):
        """ a machinery plugin with missing get_state should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/machinery/no_state/*.py")
        plugins = manager.get_plugins(MachineryPlugin)
        issues = manager.check(plugins[0])
        self.assertRegex(issues[0], r"Method 'get_state' not implemented in machinery_no_state in .*")

    def test_basic_pluginmanager_check_machinery_plugin_missing_ip(self):
        """ a machinery plugin with missing get_ip should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/machinery/no_ip/*.py")
        plugins = manager.get_plugins(MachineryPlugin)
        issues = manager.check(plugins[0])
        self.assertEqual(len(issues), 1)
        self.assertRegex(issues[0], r"Method 'get_ip' not implemented in machinery_no_ip in .*")

    def test_basic_pluginmanager_check_machinery_plugin_missing_halt(self):
        """ a machinery plugin with missing halt should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/machinery/no_halt/*.py")
        plugins = manager.get_plugins(MachineryPlugin)
        issues = manager.check(plugins[0])
        self.assertEqual(len(issues), 1)
        self.assertRegex(issues[0], r"Method 'halt' not implemented in machinery_no_halt in .*")

    def test_basic_pluginmanager_check_machinery_plugin_missing_destroyt(self):
        """ a machinery plugin with missing destroy should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/machinery/no_destroy/*.py")
        plugins = manager.get_plugins(MachineryPlugin)
        issues = manager.check(plugins[0])
        self.assertEqual(len(issues), 1)
        self.assertRegex(issues[0], r"Method 'destroy' not implemented in machinery_no_destroy in .*")

    def test_basic_pluginmanager_check_machinery_plugin_missing_create(self):
        """ a machinery plugin with missing create should throw an error on check"""
        manager = PluginManager(self.attack_logger, "tests/plugins/machinery/no_create/*.py")
        plugins = manager.get_plugins(MachineryPlugin)
        issues = manager.check(plugins[0])
        self.assertEqual(len(issues), 1)
        self.assertRegex(issues[0], r"Method 'create' not implemented in machinery_no_create in .*")