Unit test and code cleanup

pull/3/head
Thorsten Sick 3 years ago
parent f4cdde6776
commit d88a1625a9

@ -12,7 +12,7 @@ import simplejson
from app.exceptions import CalderaError
from app.interface_sfx import CommandlineColors
from app.attack_log import AttackLog
from pprint import pprint
from pprint import pprint, pformat
# TODO: Ability deserves an own class.
@ -519,18 +519,17 @@ class CalderaControl():
self.attack_logger.vprint(f"New adversary generated. ID: {adid}, ability: {ability_id} group: {group}", 2)
res = self.add_operation(operation_name, advid=adid, group=group)
# print("Add operation: ")
# pprint(res)
self.attack_logger.vprint(pformat(res), 3)
opid = self.get_operation(operation_name)["id"]
self.attack_logger.vprint("New operation created. OpID: " + str(opid), 3)
self.execute_operation(opid)
self.attack_logger.vprint("Execute operation",3 )
self.attack_logger.vprint("Execute operation", 3)
retries = 30
ability_name = self.get_ability(ability_id)[0]["name"]
ability_description = self.get_ability(ability_id)[0]["description"]
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Executed attack operation{CommandlineColors.ENDC}",1)
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Executed attack operation{CommandlineColors.ENDC}", 1)
self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_BLUE} PAW: {paw} Group: {group} Ability: {ability_id} {CommandlineColors.ENDC}", 1)
self.attack_logger.vprint(f"{CommandlineColors.BACKGROUND_BLUE} {ability_name}: {ability_description} {CommandlineColors.ENDC}", 1)
while not self.is_operation_finished(opid) and retries > 0:

@ -65,18 +65,18 @@ class Experiment():
needs_reboot = target_1.prime_sensors()
if needs_reboot:
target_1.reboot()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}",1)
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1)
self.targets.append(target_1)
# Install vulnerabilities
for a_target in self.targets:
self.attack_logger.vprint(f"Installing vulnerabilities on {a_target.get_paw()}",2)
self.attack_logger.vprint(f"Installing vulnerabilities on {a_target.get_paw()}", 2)
a_target.install_vulnerabilities()
a_target.start_vulnerabilities()
# Install sensor plugins
for a_target in self.targets:
self.attack_logger.vprint(f"Installing sensors on {a_target.get_paw()}",2)
self.attack_logger.vprint(f"Installing sensors on {a_target.get_paw()}", 2)
a_target.install_sensors()
a_target.start_sensors()
@ -101,7 +101,7 @@ class Experiment():
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Caldera agents reached{CommandlineColors.ENDC}", 1)
# Attack them
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}",1 )
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Caldera attacks{CommandlineColors.ENDC}", 1)
for target_1 in self.targets:
# Run caldera attacks
caldera_attacks = self.experiment_control.get_caldera_attacks(target_1.get_os())
@ -163,8 +163,7 @@ class Experiment():
for a_target in self.targets:
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE} Uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1)
a_target.stop_vulnerabilities()
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN} Done uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}",
1)
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN} Done uninstalling vulnerabilities on {a_target.get_paw()} {CommandlineColors.ENDC}", 1)
# Stop target machines
for target_1 in self.targets:

@ -275,10 +275,10 @@ class Machine():
os.mkdir(machine_specific_path)
for plugin in self.get_sensors():
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Collecting sensor: {plugin.get_name()}{CommandlineColors.ENDC}",2 )
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Collecting sensor: {plugin.get_name()}{CommandlineColors.ENDC}", 2)
plugin.set_machine_plugin(self.vm_manager)
plugin.__call_collect__(machine_specific_path)
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Collected sensor: {plugin.get_name()}{CommandlineColors.ENDC}",2 )
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Collected sensor: {plugin.get_name()}{CommandlineColors.ENDC}", 2)
############
@ -291,7 +291,7 @@ class Machine():
for plugin in self.plugin_manager.get_plugins(VulnerabilityPlugin, self.config.vulnerabilities()):
name = plugin.get_name()
self.attack_logger.vprint(f"Configured vulnerabilities: {self.config.vulnerabilities()}",3 )
self.attack_logger.vprint(f"Configured vulnerabilities: {self.config.vulnerabilities()}", 3)
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Installing vulnerability: {name}{CommandlineColors.ENDC}", 2)
syscon = {"abs_machinepath_internal": self.abs_machinepath_internal,
"abs_machinepath_external": self.abs_machinepath_external}
@ -313,7 +313,7 @@ class Machine():
"""
for plugin in self.get_vulnerabilities():
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Activating vulnerability: {plugin.get_name()}{CommandlineColors.ENDC}",2 )
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Activating vulnerability: {plugin.get_name()}{CommandlineColors.ENDC}", 2)
plugin.set_machine_plugin(self.vm_manager)
plugin.start()

@ -4,7 +4,6 @@
from plugins.base.sensor import SensorPlugin
import os
import time
from jinja2 import Environment, FileSystemLoader, select_autoescape
@ -15,7 +14,7 @@ class LinuxFilebeatPlugin(SensorPlugin):
required_files = ["filebeat.conf",
"filebeat.yml",
]
]
def __init__(self):
super().__init__()
@ -58,7 +57,7 @@ class LinuxFilebeatPlugin(SensorPlugin):
self.vprint("Installing Linux filebeat sensor", 3)
self.run_cmd(f"sudo wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -")
self.run_cmd("sudo wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -")
self.run_cmd('sudo echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list')
self.run_cmd("sudo apt update")
self.run_cmd("sudo apt -y install default-jre")
@ -100,4 +99,4 @@ class LinuxFilebeatPlugin(SensorPlugin):
""" Collect sensor data """
pg = self.get_playground()
self.get_from_machine(f"{pg}/filebeat.json", os.path.join(path, "filebeat.json")) # nosec
self.get_from_machine(f"{pg}/filebeat.json", os.path.join(path, "filebeat.json")) # nosec

@ -3,6 +3,7 @@ from unittest.mock import patch
from app.calderacontrol import CalderaControl
from simplejson.errors import JSONDecodeError
from app.exceptions import CalderaError
from app.attack_log import AttackLog
# https://docs.python.org/3/library/unittest.html
@ -10,7 +11,8 @@ from app.exceptions import CalderaError
class TestExample(unittest.TestCase):
def setUp(self) -> None:
self.cc = CalderaControl("https://localhost", apikey="123")
self.attack_logger = AttackLog(0)
self.cc = CalderaControl("https://localhost", attack_logger=self.attack_logger, apikey="123")
def tearDown(self) -> None:
pass

@ -4,12 +4,16 @@ from app.machinecontrol import Machine
from app.exceptions import ConfigurationError
from app.config import MachineConfig
from unittest.mock import patch
from app.attack_log import AttackLog
# https://docs.python.org/3/library/unittest.html
class TestMachineControl(unittest.TestCase):
def setUp(self) -> None:
self.attack_logger = AttackLog(0)
def test_get_os_linux_machine(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
@ -17,7 +21,7 @@ class TestMachineControl(unittest.TestCase):
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
"vm_name": "target3"}, self.attack_logger)
self.assertEqual(m.get_os(), "linux")
def test_get_os_linux_machine_with_config_class(self):
@ -28,7 +32,7 @@ class TestMachineControl(unittest.TestCase):
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
m = Machine(mc)
m = Machine(mc, self.attack_logger)
self.assertEqual(m.get_os(), "linux")
def test_get_os_missing(self):
@ -39,7 +43,7 @@ class TestMachineControl(unittest.TestCase):
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
}, self.attack_logger)
def test_get_os_not_supported(self):
with self.assertRaises(ConfigurationError):
@ -49,7 +53,7 @@ class TestMachineControl(unittest.TestCase):
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
"vm_name": "target3"}, self.attack_logger)
def test_get_paw_good(self):
m = Machine({"root": "systems/attacker1",
@ -59,7 +63,7 @@ class TestMachineControl(unittest.TestCase):
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
"vm_name": "target3"}, self.attack_logger)
self.assertEqual(m.get_paw(), "testme")
def test_get_paw_missing(self):
@ -70,7 +74,7 @@ class TestMachineControl(unittest.TestCase):
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
}, self.attack_logger)
self.assertEqual(m.get_paw(), None)
def test_get_group_good(self):
@ -81,7 +85,7 @@ class TestMachineControl(unittest.TestCase):
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
"vm_name": "target3"}, self.attack_logger)
self.assertEqual(m.get_group(), "testme")
def test_get_group_missing(self):
@ -92,7 +96,7 @@ class TestMachineControl(unittest.TestCase):
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
}, self.attack_logger)
self.assertEqual(m.get_group(), None)
def test_vagrantfilepath_missing(self):
@ -103,7 +107,7 @@ class TestMachineControl(unittest.TestCase):
"type": "vagrant",
},
"vm_name": "target3"
})
}, self.attack_logger)
def test_vagrantfile_missing(self):
with self.assertRaises(ConfigurationError):
@ -114,7 +118,7 @@ class TestMachineControl(unittest.TestCase):
"vagrantfilepath": "non_existing",
},
"vm_name": "target3"
})
}, self.attack_logger)
def test_vagrantfile_existing(self):
m = Machine({"root": "systems/attacker1",
@ -124,7 +128,7 @@ class TestMachineControl(unittest.TestCase):
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
}, self.attack_logger)
self.assertIsNotNone(m)
def test_name_missing(self):
@ -135,7 +139,7 @@ class TestMachineControl(unittest.TestCase):
"type": "vagrant",
"vagrantfilepath": "systems",
},
})
}, self.attack_logger)
# test: auto generated, dir missing
def test_auto_generated_machinepath_with_path_missing(self):
@ -147,7 +151,7 @@ class TestMachineControl(unittest.TestCase):
"vagrantfilepath": "systems",
},
"vm_name": "missing"
})
}, self.attack_logger)
# test manual config, dir missing
def test_configured_machinepath_with_path_missing(self):
@ -160,7 +164,7 @@ class TestMachineControl(unittest.TestCase):
},
"vm_name": "target3",
"machinepath": "missing"
})
}, self.attack_logger)
# test auto generated, dir there (external/internal dirs must work !)
def test_auto_generated_machinepath_with_good_config(self):
@ -171,7 +175,7 @@ class TestMachineControl(unittest.TestCase):
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
}, self.attack_logger)
vagrantfilepath = os.path.abspath("systems")
ext = os.path.join(vagrantfilepath, "target3")
internal = os.path.join("/vagrant/", "target3")
@ -189,7 +193,7 @@ class TestMachineControl(unittest.TestCase):
},
"vm_name": "missing",
"machinepath": "target3"
})
}, self.attack_logger)
vagrantfilepath = os.path.abspath("systems")
ext = os.path.join(vagrantfilepath, "target3")
internal = os.path.join("/vagrant/", "target3")
@ -204,7 +208,7 @@ class TestMachineControl(unittest.TestCase):
"os": "linux",
"vm_name": "missing",
"machinepath": "target3"
})
}, self.attack_logger)
# vm_controller wrong
def test_configured_vm_controller_wrong_type(self):
@ -217,7 +221,7 @@ class TestMachineControl(unittest.TestCase):
},
"vm_name": "missing",
"machinepath": "target3"
})
}, self.attack_logger)
# Create caldera start command and verify it
def test_get_linux_caldera_start_cmd(self):
@ -229,7 +233,7 @@ class TestMachineControl(unittest.TestCase):
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw"})
"paw": "testpaw"}, self.attack_logger)
m.set_caldera_server("http://www.test.test")
with patch.object(m.vm_manager, "get_playground", return_value="/vagrant/target3"):
cmd = m.create_start_caldera_client_cmd()
@ -246,7 +250,7 @@ class TestMachineControl(unittest.TestCase):
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw",
"machinepath": "target2w"})
"machinepath": "target2w"}, self.attack_logger)
m.set_caldera_server("www.test.test")
cmd = m.create_start_caldera_client_cmd()
self.maxDiff = None

Loading…
Cancel
Save