Adding additional data to the log file

pull/12/head
Thorsten Sick 3 years ago
parent 442c89e8c6
commit a1346e0ca5

@ -29,6 +29,7 @@ class AttackLog():
@param verbosity: verbosity setting from 0 to 3 for stdout printing @param verbosity: verbosity setting from 0 to 3 for stdout printing
""" """
self.log: list[dict] = [] self.log: list[dict] = []
self.machines: dict = []
self.verbosity = verbosity self.verbosity = verbosity
# TODO. As soon as someone wants custom timestamps, make the format variable # TODO. As soon as someone wants custom timestamps, make the format variable
@ -575,7 +576,18 @@ class AttackLog():
def get_dict(self): def get_dict(self):
""" Return logged data in dict format """ """ Return logged data in dict format """
return self.log res = {"boilerplate": {"log_format_major_version": 1, # Changes on changes that breaks readers (items are modified or deleted)
"log_format_minor_version": 1 # Changes even if just new data is added
},
"system_overview": self.machines,
"attack_log": self.log
}
return res
def add_machine_info(self, machine_info):
""" Adds a dict with machine info. One machine per call of this method """
self.machines.append(machine_info)
# TODO: doc_start_environment # TODO: doc_start_environment

@ -26,9 +26,9 @@ class DocGenerator():
template = env.get_template("attack_description.rst") template = env.get_template("attack_description.rst")
with open(jfile) as fh: with open(jfile) as fh:
events = json.load(fh) attack = json.load(fh)
rendered = template.render(events=events) rendered = template.render(events=attack["attack_log"], systems=attack["system_overview"], boilerplate=attack["boilerplate"])
print(rendered) print(rendered)
with open(outfile, "wt") as fh: with open(outfile, "wt") as fh:

@ -6,6 +6,7 @@ import os
import subprocess import subprocess
import time import time
import zipfile import zipfile
import shutil
from datetime import datetime from datetime import datetime
from app.attack_log import AttackLog from app.attack_log import AttackLog
@ -108,6 +109,16 @@ class Experiment():
running_agents = self.caldera_control.list_paws_of_running_agents() running_agents = self.caldera_control.list_paws_of_running_agents()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1) self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1)
# Add running machines to log
for t in self.targets:
i = t.get_machine_info()
i["role"] = "target"
self.attack_logger.add_machine_info(i)
i = self.attacker_1.get_machine_info()
i["role"] = "attacker"
self.attack_logger.add_machine_info(i)
# Attack them # Attack them
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1) self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1)
for target_1 in self.targets: for target_1 in self.targets:
@ -153,7 +164,7 @@ class Experiment():
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1) self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1)
# Run Kali attacks # Run plugin based attacks
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1) self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running attack plugins{CommandlineColors.ENDC}", 1)
for target_1 in self.targets: for target_1 in self.targets:
plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os()) plugin_based_attacks = self.experiment_config.get_plugin_based_attacks(target_1.get_os())
@ -231,6 +242,10 @@ class Experiment():
zfh.write(os.path.join(self.lootdir, "attack.json")) zfh.write(os.path.join(self.lootdir, "attack.json"))
# For automation purpose we copy the file into a standard file name
defaultname = os.path.join(self.lootdir, "..", "most_recent.zip")
shutil.copyfile(filename, defaultname)
@staticmethod @staticmethod
def __get_results_files(root): def __get_results_files(root):
""" Yields a list of potential result files """ Yields a list of potential result files

@ -3,6 +3,7 @@
""" (Virtual) machine handling. Start, stop, create and destroy. Starting remote commands on them. """ """ (Virtual) machine handling. Start, stop, create and destroy. Starting remote commands on them. """
import os import os
import socket
import time import time
import requests import requests
@ -372,6 +373,23 @@ class Machine():
return self.vm_manager.get(src, dst) return self.vm_manager.get(src, dst)
def get_machine_info(self) -> dict:
""" Returns a dict containing machine info """
return {"name": self.get_name(),
"nicknames": self.get_nicknames(),
"playground": self.get_playground(),
"net_id": self.get_ip(),
"ip": socket.gethostbyname(self.get_ip()),
"os": self.get_os(),
"paw": self.get_paw(),
"group": self.get_group(),
"sensors": [s.name for s in self.get_sensors()],
"vulnerabilities": [v.name for v in self.get_vulnerabilities()]
}
# TODO: Caldera implant
# TODO: Metasploit implant
def install_caldera_server(self, cleanup=False, version="2.8.1"): def install_caldera_server(self, cleanup=False, version="2.8.1"):
""" Installs the caldera server on the VM """ Installs the caldera server on the VM

@ -1,8 +1,38 @@
Attack Attack
====== ======
Target systems Boilerplate
-------------- -----------
PurpleDome, attack-log version: {{ boilerplate.log_format_major_version }}.{{ boilerplate.log_format_minor_version }}
Systems
-------
{% for s in systems %}
{{ s.role }}:{{ s.name }}
~~~~~~~~~~~~
IP: {{ s.ip }}
OS: {{ s.os }}
Paw: {{ s.paw }}
Group: {{ s.group }}
Sensors:
{% for sensor in s.sensors %}
* {{ sensor }}
{% endfor %} {# sensors #}
Vulnerabilities:
{% for vulnerability in s.vulnerabilities %}
* {{ vulnerability }}
{% endfor %} {# vulnerabilities #}
{% endfor %} {# systems #}
Attack steps Attack steps
------------ ------------

@ -18,7 +18,11 @@ class TestMachineConfig(unittest.TestCase):
""" The init is empty """ """ The init is empty """
al = AttackLog() al = AttackLog()
self.assertIsNotNone(al) self.assertIsNotNone(al)
self.assertEqual(al.get_dict(), [])
default = {"boilerplate": {'log_format_major_version': 1, 'log_format_minor_version': 1},
"system_overview": [],
"attack_log": []}
self.assertEqual(al.get_dict(), default)
def test_caldera_attack_start(self): def test_caldera_attack_start(self):
""" Starting a caldera attack """ """ Starting a caldera attack """
@ -39,16 +43,16 @@ class TestMachineConfig(unittest.TestCase):
description=description description=description
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "attack") self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "caldera") self.assertEqual(data["attack_log"][0]["sub_type"], "caldera")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target_paw"], paw) self.assertEqual(data["attack_log"][0]["target_paw"], paw)
self.assertEqual(data[0]["target_group"], group) self.assertEqual(data["attack_log"][0]["target_group"], group)
self.assertEqual(data[0]["ability_id"], ability_id) self.assertEqual(data["attack_log"][0]["ability_id"], ability_id)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data[0]["name"], name) self.assertEqual(data["attack_log"][0]["name"], name)
self.assertEqual(data[0]["description"], description) self.assertEqual(data["attack_log"][0]["description"], description)
def test_caldera_attack_stop(self): def test_caldera_attack_stop(self):
""" Stopping a caldera attack """ """ Stopping a caldera attack """
@ -69,16 +73,16 @@ class TestMachineConfig(unittest.TestCase):
description=description description=description
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "stop") self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack") self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "caldera") self.assertEqual(data["attack_log"][0]["sub_type"], "caldera")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target_paw"], paw) self.assertEqual(data["attack_log"][0]["target_paw"], paw)
self.assertEqual(data[0]["target_group"], group) self.assertEqual(data["attack_log"][0]["target_group"], group)
self.assertEqual(data[0]["ability_id"], ability_id) self.assertEqual(data["attack_log"][0]["ability_id"], ability_id)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data[0]["name"], name) self.assertEqual(data["attack_log"][0]["name"], name)
self.assertEqual(data[0]["description"], description) self.assertEqual(data["attack_log"][0]["description"], description)
def test_kali_attack_start(self): def test_kali_attack_start(self):
""" Starting a kali attack """ """ Starting a kali attack """
@ -93,13 +97,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp, ttp=ttp,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "attack") self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "kali") self.assertEqual(data["attack_log"][0]["sub_type"], "kali")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["kali_name"], attack_name) self.assertEqual(data["attack_log"][0]["kali_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_kali_attack_stop(self): def test_kali_attack_stop(self):
""" Stopping a kali attack """ """ Stopping a kali attack """
@ -114,13 +118,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp, ttp=ttp,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "stop") self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack") self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "kali") self.assertEqual(data["attack_log"][0]["sub_type"], "kali")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["kali_name"], attack_name) self.assertEqual(data["attack_log"][0]["kali_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_narration_start(self): def test_narration_start(self):
""" Starting a narration """ """ Starting a narration """
@ -130,10 +134,10 @@ class TestMachineConfig(unittest.TestCase):
al.start_narration(text al.start_narration(text
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "narration") self.assertEqual(data["attack_log"][0]["type"], "narration")
self.assertEqual(data[0]["sub_type"], "user defined narration") self.assertEqual(data["attack_log"][0]["sub_type"], "user defined narration")
self.assertEqual(data[0]["text"], text) self.assertEqual(data["attack_log"][0]["text"], text)
def test_build_start(self): def test_build_start(self):
""" Starting a build """ """ Starting a build """
@ -167,21 +171,21 @@ class TestMachineConfig(unittest.TestCase):
comment=comment comment=comment
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "build") self.assertEqual(data["attack_log"][0]["type"], "build")
self.assertEqual(data[0]["dl_uri"], dl_uri) self.assertEqual(data["attack_log"][0]["dl_uri"], dl_uri)
self.assertEqual(data[0]["dl_uris"], dl_uris) self.assertEqual(data["attack_log"][0]["dl_uris"], dl_uris)
self.assertEqual(data[0]["payload"], payload) self.assertEqual(data["attack_log"][0]["payload"], payload)
self.assertEqual(data[0]["platform"], platform) self.assertEqual(data["attack_log"][0]["platform"], platform)
self.assertEqual(data[0]["architecture"], architecture) self.assertEqual(data["attack_log"][0]["architecture"], architecture)
self.assertEqual(data[0]["lhost"], lhost) self.assertEqual(data["attack_log"][0]["lhost"], lhost)
self.assertEqual(data[0]["lport"], lport) self.assertEqual(data["attack_log"][0]["lport"], lport)
self.assertEqual(data[0]["filename"], filename) self.assertEqual(data["attack_log"][0]["filename"], filename)
self.assertEqual(data[0]["encoding"], encoding) self.assertEqual(data["attack_log"][0]["encoding"], encoding)
self.assertEqual(data[0]["encoded_filename"], encoded_filename) self.assertEqual(data["attack_log"][0]["encoded_filename"], encoded_filename)
self.assertEqual(data[0]["sRDI_conversion"], sRDI_conversion) self.assertEqual(data["attack_log"][0]["sRDI_conversion"], sRDI_conversion)
self.assertEqual(data[0]["for_step"], for_step) self.assertEqual(data["attack_log"][0]["for_step"], for_step)
self.assertEqual(data[0]["comment"], comment) self.assertEqual(data["attack_log"][0]["comment"], comment)
def test_build_start_default(self): def test_build_start_default(self):
""" Starting a build default values""" """ Starting a build default values"""
@ -189,21 +193,21 @@ class TestMachineConfig(unittest.TestCase):
al.start_build() al.start_build()
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "build") self.assertEqual(data["attack_log"][0]["type"], "build")
self.assertEqual(data[0]["dl_uri"], None) self.assertEqual(data["attack_log"][0]["dl_uri"], None)
self.assertEqual(data[0]["dl_uris"], None) self.assertEqual(data["attack_log"][0]["dl_uris"], None)
self.assertEqual(data[0]["payload"], None) self.assertEqual(data["attack_log"][0]["payload"], None)
self.assertEqual(data[0]["platform"], None) self.assertEqual(data["attack_log"][0]["platform"], None)
self.assertEqual(data[0]["architecture"], None) self.assertEqual(data["attack_log"][0]["architecture"], None)
self.assertEqual(data[0]["lhost"], None) self.assertEqual(data["attack_log"][0]["lhost"], None)
self.assertEqual(data[0]["lport"], None) self.assertEqual(data["attack_log"][0]["lport"], None)
self.assertEqual(data[0]["filename"], None) self.assertEqual(data["attack_log"][0]["filename"], None)
self.assertEqual(data[0]["encoding"], None) self.assertEqual(data["attack_log"][0]["encoding"], None)
self.assertEqual(data[0]["encoded_filename"], None) self.assertEqual(data["attack_log"][0]["encoded_filename"], None)
self.assertEqual(data[0]["sRDI_conversion"], False) self.assertEqual(data["attack_log"][0]["sRDI_conversion"], False)
self.assertEqual(data[0]["for_step"], None) self.assertEqual(data["attack_log"][0]["for_step"], None)
self.assertEqual(data[0]["comment"], None) self.assertEqual(data["attack_log"][0]["comment"], None)
def test_build_stop(self): def test_build_stop(self):
""" Stopping a build """ """ Stopping a build """
@ -212,9 +216,9 @@ class TestMachineConfig(unittest.TestCase):
al.stop_build(logid=logid) al.stop_build(logid=logid)
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "stop") self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data[0]["type"], "build") self.assertEqual(data["attack_log"][0]["type"], "build")
self.assertEqual(data[0]["logid"], logid) self.assertEqual(data["attack_log"][0]["logid"], logid)
def test_metasploit_attack_start(self): def test_metasploit_attack_start(self):
""" Starting a metasploit attack """ """ Starting a metasploit attack """
@ -229,13 +233,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp, ttp=ttp,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "attack") self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "metasploit") self.assertEqual(data["attack_log"][0]["sub_type"], "metasploit")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["metasploit_command"], attack_name) self.assertEqual(data["attack_log"][0]["metasploit_command"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_metasploit_attack_stop(self): def test_metasploit_attack_stop(self):
""" Stopping a metasploit attack """ """ Stopping a metasploit attack """
@ -250,13 +254,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp, ttp=ttp,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "stop") self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack") self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "metasploit") self.assertEqual(data["attack_log"][0]["sub_type"], "metasploit")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["metasploit_command"], attack_name) self.assertEqual(data["attack_log"][0]["metasploit_command"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_attack_plugin_start(self): def test_attack_plugin_start(self):
""" Starting a attack plugin """ """ Starting a attack plugin """
@ -271,13 +275,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp, ttp=ttp,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "attack") self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "attack_plugin") self.assertEqual(data["attack_log"][0]["sub_type"], "attack_plugin")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["plugin_name"], attack_name) self.assertEqual(data["attack_log"][0]["plugin_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_attack_plugin_stop(self): def test_attack_plugin_stop(self):
""" Stopping a attack plugin""" """ Stopping a attack plugin"""
@ -292,13 +296,13 @@ class TestMachineConfig(unittest.TestCase):
ttp=ttp, ttp=ttp,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "stop") self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack") self.assertEqual(data["attack_log"][0]["type"], "attack")
self.assertEqual(data[0]["sub_type"], "attack_plugin") self.assertEqual(data["attack_log"][0]["sub_type"], "attack_plugin")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["plugin_name"], attack_name) self.assertEqual(data["attack_log"][0]["plugin_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) self.assertEqual(data["attack_log"][0]["hunting_tag"], "MITRE_" + ttp)
def test_file_write_start(self): def test_file_write_start(self):
""" Starting a file write """ """ Starting a file write """
@ -311,12 +315,12 @@ class TestMachineConfig(unittest.TestCase):
file_name=file_name, file_name=file_name,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "dropping_file") self.assertEqual(data["attack_log"][0]["type"], "dropping_file")
self.assertEqual(data[0]["sub_type"], "by PurpleDome") self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["file_name"], file_name) self.assertEqual(data["attack_log"][0]["file_name"], file_name)
def test_file_write_stop(self): def test_file_write_stop(self):
""" Stopping a file write """ """ Stopping a file write """
@ -329,12 +333,12 @@ class TestMachineConfig(unittest.TestCase):
file_name=file_name, file_name=file_name,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "stop") self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data[0]["type"], "dropping_file") self.assertEqual(data["attack_log"][0]["type"], "dropping_file")
self.assertEqual(data[0]["sub_type"], "by PurpleDome") self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["file_name"], file_name) self.assertEqual(data["attack_log"][0]["file_name"], file_name)
def test_execute_payload_start(self): def test_execute_payload_start(self):
""" Starting a execute payload """ """ Starting a execute payload """
@ -347,12 +351,12 @@ class TestMachineConfig(unittest.TestCase):
command=command, command=command,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "start") self.assertEqual(data["attack_log"][0]["event"], "start")
self.assertEqual(data[0]["type"], "execute_payload") self.assertEqual(data["attack_log"][0]["type"], "execute_payload")
self.assertEqual(data[0]["sub_type"], "by PurpleDome") self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["command"], command) self.assertEqual(data["attack_log"][0]["command"], command)
def test_execute_payload_stop(self): def test_execute_payload_stop(self):
""" Stopping a execute payload """ """ Stopping a execute payload """
@ -365,12 +369,12 @@ class TestMachineConfig(unittest.TestCase):
command=command, command=command,
) )
data = al.get_dict() data = al.get_dict()
self.assertEqual(data[0]["event"], "stop") self.assertEqual(data["attack_log"][0]["event"], "stop")
self.assertEqual(data[0]["type"], "execute_payload") self.assertEqual(data["attack_log"][0]["type"], "execute_payload")
self.assertEqual(data[0]["sub_type"], "by PurpleDome") self.assertEqual(data["attack_log"][0]["sub_type"], "by PurpleDome")
self.assertEqual(data[0]["source"], source) self.assertEqual(data["attack_log"][0]["source"], source)
self.assertEqual(data[0]["target"], target) self.assertEqual(data["attack_log"][0]["target"], target)
self.assertEqual(data[0]["command"], command) self.assertEqual(data["attack_log"][0]["command"], command)
def test_mitre_fix_ttp_is_none(self): def test_mitre_fix_ttp_is_none(self):
""" Testing the mitre ttp fix for ttp being none """ """ Testing the mitre ttp fix for ttp being none """

Loading…
Cancel
Save