Unit tests working now

pull/3/head
Thorsten Sick 3 years ago
parent d25676032e
commit 9fc86e9a87

@ -0,0 +1,179 @@
#!/usr/bin/env python3
# A plugin to control already running vms
import os
from plugins.base.machinery import MachineryPlugin, MachineStates
from fabric import Connection
from app.exceptions import ConfigurationError, NetworkError
from invoke.exceptions import UnexpectedExit
import paramiko
import time
import socket
class RunningVMPlugin(MachineryPlugin):
# Boilerplate
name = "running_vm"
description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.c = None
self.vagrantfilepath = None
self.vagrantfile = None
def process_config(self, config):
""" Machine specific processing of configuration """
# TODO: Rename vagrantfilepath in the whole project
self.vagrantfilepath = os.path.abspath(self.config.vagrantfilepath())
self.vagrantfile = os.path.join(self.vagrantfilepath, "Vagrantfile")
if not os.path.isfile(self.vagrantfile):
raise ConfigurationError(f"Vagrantfile not existing: {self.vagrantfile}")
def create(self, reboot=True):
""" Create a machine
@param reboot: Reboot the VM during installation. Required if you want to install software
"""
return
def up(self):
""" Start a machine, create it if it does not exist """
return
def halt(self):
""" Halt a machine """
return
def destroy(self):
""" Destroy a machine """
return
def connect(self):
""" Connect to a machine """
if self.c:
return self.c
retries = 10
retry_sleep = 10
timeout = 30
while retries:
try:
if self.config.os() == "linux":
uhp = self.get_ip()
print(f"Connecting to {uhp}")
self.c = Connection(uhp, connect_timeout=timeout)
if self.config.os() == "windows":
# args = {"key_filename": self.config.ssh_keyfile() or self.v.keyfile(vm_name=self.config.vmname())}
uhp = self.get_ip()
print(f"\n\n !!!!! Connecting to {uhp} !!!!!!!!!! \n\n")
self.c = Connection(uhp, connect_timeout=timeout, user=self.config.ssh_user())
except (paramiko.ssh_exception.SSHException, socket.timeout):
print(f"Failed to connect, will retry {retries} times. Timeout: {timeout}")
retries -= 1
timeout += 10
time.sleep(retry_sleep)
else:
print(f"Connection: {self.c}")
return self.c
print("SSH network error")
raise NetworkError
def remote_run(self, cmd, disown=False):
""" Connects to the machine and runs a command there
@param disown: Send the connection into background
"""
if cmd is None:
return ""
self.connect()
cmd = cmd.strip()
print("Running VM plugin remote run: " + cmd)
print("Disown: " + str(disown))
result = None
try:
result = self.c.run(cmd, disown=disown)
print(result)
except UnexpectedExit:
return "Unexpected Exit"
if result and result.stderr:
print("Debug: Stderr: " + str(result.stderr.strip()))
if result:
return result.stdout.strip()
return ""
def put(self, src, dst):
""" Send a file to a machine
@param src: source dir
@param dst: destination
"""
self.connect()
print(f"PUT {src} -> {dst}")
res = ""
retries = 10
retry_sleep = 10
timeout = 30
while retries:
try:
res = self.c.put(src, dst)
except (paramiko.ssh_exception.SSHException, socket.timeout, UnexpectedExit):
print(f"PUT Failed to connect, will retry {retries} times. Timeout: {timeout}")
retries -= 1
timeout += 10
time.sleep(retry_sleep)
self.disconnect()
self.connect()
else:
return res
print("SSH network error on PUT command")
raise NetworkError
def get(self, src, dst):
""" Get a file to a machine
@param src: source dir
@param dst: destination
"""
self.connect()
res = ""
try:
res = self.c.get(src, dst)
except UnexpectedExit:
pass
return res
def disconnect(self):
""" Disconnect from a machine """
if self.c:
self.c.close()
self.c = None
def get_state(self):
""" Get detailed state of a machine """
return MachineStates.RUNNING
def get_ip(self):
""" Return the machine ip """
return self.config.vm_ip()

@ -0,0 +1,205 @@
#!/usr/bin/env python3
# A plugin to control vagrant machines
from plugins.base.machinery import MachineryPlugin, MachineStates
import subprocess
import vagrant
from fabric import Connection
import os
from app.exceptions import ConfigurationError
from invoke.exceptions import UnexpectedExit
# Experiment with paramiko instead of fabric. Seems fabric has some issues with the "put" command to Windows. There seems no fix (just my workarounds). Maybe paramiko is better.
class VagrantPlugin(MachineryPlugin):
# Boilerplate
name = "vagrant"
description = "A plugin for vagrant machines"
required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share
def __init__(self):
super().__init__()
self.plugin_path = __file__
self.v = None
self.c = None
self.vagrantfilepath = None
self.vagrantfile = None
def process_config(self, config):
""" Machine specific processing of configuration """
self.vagrantfilepath = os.path.abspath(self.config.vagrantfilepath())
self.vagrantfile = os.path.join(self.vagrantfilepath, "Vagrantfile")
if not os.path.isfile(self.vagrantfile):
raise ConfigurationError(f"Vagrantfile not existing: {self.vagrantfile}")
self.v = vagrant.Vagrant(root=self.vagrantfilepath)
def create(self, reboot=True):
""" Create a machine
@param reboot: Reboot the VM during installation. Required if you want to install software
"""
self.up()
if reboot:
self.halt()
self.up()
def up(self):
""" Start a machine, create it if it does not exist """
try:
self.v.up(vm_name=self.config.vmname())
except subprocess.CalledProcessError as e:
if self.v.status(vm_name=self.config.vmname())[0].state == self.v.RUNNING:
return # Everything is fine
else:
raise e
def halt(self):
""" Halt a machine """
forceit = self.config.halt_needs_force()
try:
self.v.halt(vm_name=self.config.vmname(), force=forceit)
except subprocess.CalledProcessError:
self.v.halt(vm_name=self.config.vmname(), force=True)
def destroy(self):
""" Destroy a machine """
self.v.destroy(vm_name=self.config.vmname())
def connect(self):
""" Connect to a machine. If there is already a connection we keep it """
if self.c:
return self.c
if self.config.os() == "linux":
uhp = self.v.user_hostname_port(vm_name=self.config.vmname())
print(f"Connecting to {uhp}")
self.c = Connection(uhp, connect_kwargs={"key_filename": self.v.keyfile(vm_name=self.config.vmname())})
print(self.c)
return self.c
if self.config.os() == "windows":
args = {"key_filename": os.path.join(self.sysconf["abs_machinepath_external"], self.config.ssh_keyfile())}
uhp = self.get_ip()
print(uhp)
print(args)
print(self.config.ssh_user())
# breakpoint()
self.c = Connection(uhp, user=self.config.ssh_user(), connect_kwargs=args)
print(self.c)
return self.c
def remote_run(self, cmd, disown=False):
""" Connects to the machine and runs a command there
@param disown: Send the connection into background
"""
if cmd is None:
return ""
self.connect()
cmd = cmd.strip()
print("Vagrant plugin remote run: " + cmd)
print("Disown: " + str(disown))
result = None
try:
result = self.c.run(cmd, disown=disown)
print(result)
except UnexpectedExit:
return "Unexpected Exit"
if result and result.stderr:
print("Debug: Stderr: " + str(result.stderr.strip()))
if result:
return result.stdout.strip()
return ""
def put(self, src, dst):
""" Send a file to a machine
@param src: source dir
@param dst: destination
"""
self.connect()
print(f"{src} -> {dst}")
res = ""
try:
res = self.c.put(src, dst)
except UnexpectedExit:
pass
except FileNotFoundError as e:
print(e)
return res
def get(self, src, dst):
""" Get a file to a machine
@param src: source dir
@param dst: destination
"""
self.connect()
res = ""
try:
res = self.c.get(src, dst)
except UnexpectedExit:
pass
return res
def disconnect(self):
""" Disconnect from a machine """
if self.c:
self.c.close()
self.c = None
def get_state(self):
""" Get detailed state of a machine """
vstate = self.v.status(vm_name=self.config.vmname())[0].state
# mapping vagrant states to PurpleDome states
mapping = {self.v.RUNNING: MachineStates.RUNNING,
self.v.NOT_CREATED: MachineStates.NOT_CREATED,
self.v.POWEROFF: MachineStates.POWEROFF,
self.v.ABORTED: MachineStates.ABORTED,
self.v.SAVED: MachineStates.SAVED,
self.v.STOPPED: MachineStates.STOPPED,
self.v.FROZEN: MachineStates.FROZEN,
self.v.SHUTOFF: MachineStates.SHUTOFF
}
return mapping[vstate]
def get_ip(self):
""" Return the machine ip """
# TODO: Create special code to extract windows IPs
# TODO: Find a smarter way to get the ip
# ips = []
# cmd = "ifconfig"
# res = self.vm_manager.__call_remote_run__(cmd)
# for line in res.split("\n"):
# m = re.match(r".*inet (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*", line)
# if m:
# print(m.group(1))
# ips.append(m.group(1))
filename = os.path.join(self.sysconf["abs_machinepath_external"], "ip4.txt")
with open(filename, "rt") as fh:
return fh.readline().strip()

@ -0,0 +1,28 @@
#!/usr/bin/env python3
""" Setup configuration. Required by Tox. """
import setuptools
# https://packaging.python.org/tutorials/packaging-projects/
# https://setuptools.readthedocs.io/en/latest/
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="purpledome-thorsten-sick", # Replace with your own username
version="0.0.1",
author="Thorsten Sick",
author_email="thorsten.sick@avast.com",
description="An attack environment to simulated malware attacks on targets",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/avast/PurpleDome",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
],
python_requires='>=3.6',
)

288
systems/Vagrantfile vendored

@ -0,0 +1,288 @@
# -*- mode: ruby -*-
# vi: set ft=ruby :
# All Vagrant configuration is done below. The "2" in Vagrant.configure
# configures the configuration version (we support older styles for
# backwards compatibility). Please don't change it unless you know what
# you're doing.
Vagrant.configure("2") do |config|
# Use the virtualbox provider with some common settings
config.vm.provider "virtualbox" do |v|
# Workaround Ubuntu 16.04 issue with Virtualbox where Box waits 5 minutes to start if network "cable" is not connected.
# https://github.com/chef/bento/issues/682
#v.customize ["modifyvm", :id, "--cableconnected1", "on"]
# Change network card to PCnet-FAST III
# For NAT adapter
#v.customize ["modifyvm", :id, "--nictype1", "Am79C973"]
end
config.vm.define "target1" do |target1|
# The most common configuration options are documented and commented below.
# For a complete reference, please see the online documentation at
# https://docs.vagrantup.com.
# Every Vagrant development environment requires a box. You can search for
# boxes at https://vagrantcloud.com/search.
target1.vm.box = "hashicorp/bionic64"
# target.vm.base_mac = "080027BB1475"
target1.vm.hostname = "target1"
target1.vm.define "target1"
#target1.vm.synced_folder ".", "/vagrant"
# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs
# `vagrant box outdated`. This is not recommended.
# config.vm.box_check_update = false
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine. In the example below,
# accessing "localhost:8080" will access port 80 on the guest machine.
# NOTE: This will enable public access to the opened port
# config.vm.network "forwarded_port", guest: 80, host: 8080
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine and only allow access
# via 127.0.0.1 to disable public access
# config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1"
# Create a private network, which allows host-only access to the machine
# using a specific IP.
# config.vm.network "private_network", ip: "192.168.33.10"
# Create a public network, which generally matched to bridged network.
# Bridged networks make the machine appear as another physical device on
# your network.
target1.vm.network "public_network", bridge: "enp4s0"
# Share an additional folder to the guest VM. The first argument is
# the path on the host to the actual folder. The second argument is
# the path on the guest to mount the folder. And the optional third
# argument is a set of non-required options.
# config.vm.synced_folder "../data", "/vagrant_data"
# Provider-specific configuration so you can fine-tune various
# backing providers for Vagrant. These expose provider-specific options.
# Example for VirtualBox:
#
# config.vm.provider "virtualbox" do |vb|
# # Display the VirtualBox GUI when booting the machine
# vb.gui = true
#
# # Customize the amount of memory on the VM:
# vb.memory = "1024"
# end
#
# View the documentation for the provider you are using for more
# information on available options.
# Enable provisioning with a shell script. Additional provisioners such as
# Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the
# documentation for more information about their specific syntax and use.
target1.vm.provision "shell", inline: <<-SHELL
ls /vagrant
cd /vagrant/target1
chmod +x bootstrap.sh
./bootstrap.sh
# Install implant
chmod +x hackme.sh
./hackme.sh
SHELL
end
#########
# Windows target
config.vm.define "target2" do |target2|
target2.vm.box = "windows10_64"
#target2.vm.base_mac = "080027BB1475"
#target2.vm.hostname = "target2w"
#target2.vm.define "target2w"
target2.vm.network "public_network", bridge: "enp4s0"
target2.vm.communicator = "winssh"
target2.winssh.shell ="cmd"
target2.vm.provider "virtualbox" do |vb2|
# # Display the VirtualBox GUI when booting the machine
vb2.gui = true
#
# # Customize the amount of memory on the VM:
# vb.memory = "1024"
end
# Sync
target2.vm.synced_folder ".", "/vagrant", disabled: true
# config.vm.boot_timeout
target2.ssh.username = "PURPLEDOME"
target2.ssh.private_key_path = "target2w/id_rsa.3"
end
########################
config.vm.define "target3" do |target3|
# The most common configuration options are documented and commented below.
# For a complete reference, please see the online documentation at
# https://docs.vagrantup.com.
# Every Vagrant development environment requires a box. You can search for
# boxes at https://vagrantcloud.com/search.
target3.vm.box = "ubuntu/groovy64"
# target.vm.base_mac = "080027BB1475"
target3.vm.hostname = "target3"
target3.vm.define "target3"
target3.vm.synced_folder ".", "/vagrant"
# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs
# `vagrant box outdated`. This is not recommended.
# config.vm.box_check_update = false
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine. In the example below,
# accessing "localhost:8080" will access port 80 on the guest machine.
# NOTE: This will enable public access to the opened port
# config.vm.network "forwarded_port", guest: 80, host: 8080
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine and only allow access
# via 127.0.0.1 to disable public access
# config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1"
# Create a private network, which allows host-only access to the machine
# using a specific IP.
# config.vm.network "private_network", ip: "192.168.33.10"
# target3.vm.network :private_network, ip: '192.168.178.163'
# Create a public network, which generally matched to bridged network.
# Bridged networks make the machine appear as another physical device on
# your network.
target3.vm.network "public_network", bridge: "enp4s0"
# Share an additional folder to the guest VM. The first argument is
# the path on the host to the actual folder. The second argument is
# the path on the guest to mount the folder. And the optional third
# argument is a set of non-required options.
# config.vm.synced_folder "../data", "/vagrant_data"
# Provider-specific configuration so you can fine-tune various
# backing providers for Vagrant. These expose provider-specific options.
# Example for VirtualBox:
#
# config.vm.provider "virtualbox" do |vb|
# # Display the VirtualBox GUI when booting the machine
# vb.gui = true
#
# # Customize the amount of memory on the VM:
# vb.memory = "1024"
# end
#
# View the documentation for the provider you are using for more
# information on available options.
# Enable provisioning with a shell script. Additional provisioners such as
# Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the
# documentation for more information about their specific syntax and use.
target3.vm.provision "shell", inline: <<-SHELL
ls /vagrant
cd /vagrant/target3
chmod +x bootstrap.sh
./bootstrap.sh
# Install implant
chmod +x hackme.sh
./hackme.sh
SHELL
end
#####################################################
config.vm.define "attacker" do |attacker|
# The most common configuration options are documented and commented below.
# For a complete reference, please see the online documentation at
# https://docs.vagrantup.com.
# Every Vagrant development environment requires a box. You can search for
# boxes at https://vagrantcloud.com/search.
# https://app.vagrantup.com/kalilinux/boxes/rolling
attacker.vm.box = "kalilinux/rolling"
# config.vm.box_version = "2020.3.0"
#config.vm.base_mac = "080027BB1476"
attacker.vm.hostname = "attacker"
# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs
# `vagrant box outdated`. This is not recommended.
# config.vm.box_check_update = false
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine. In the example below,
# accessing "localhost:8080" will access port 80 on the guest machine.
# NOTE: This will enable public access to the opened port
# config.vm.network "forwarded_port", guest: 80, host: 8080
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine and only allow access
# via 127.0.0.1 to disable public access
# config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1"
# Create a private network, which allows host-only access to the machine
# using a specific IP.
# config.vm.network "private_network", ip: "192.168.33.10"
# Create a public network, which generally matched to bridged network.
# Bridged networks make the machine appear as another physical device on
# your network.
attacker.vm.network "public_network", bridge: "enp4s0"
# Share an additional folder to the guest VM. The first argument is
# the path on the host to the actual folder. The second argument is
# the path on the guest to mount the folder. And the optional third
# argument is a set of non-required options.
# config.vm.synced_folder "../data", "/vagrant_data"
# Provider-specific configuration so you can fine-tune various
# backing providers for Vagrant. These expose provider-specific options.
# Example for VirtualBox:
#
#config.vm.provider "virtualbox" do |vb|
# # Display the VirtualBox GUI when booting the machine
# vb.gui = true
#
# # Customize the amount of memory on the VM:
# vb.memory = "2048"
# end
#
# View the documentation for the provider you are using for more
# information on available options.
# Enable provisioning with a shell script. Additional provisioners such as
# Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the
# documentation for more information about their specific syntax and use.
attacker.vm.provision "shell", inline: <<-SHELL
echo "Attacker1 inline script start"
ls /vagrant
cd /vagrant/attacker1
chmod +x bootstrap.sh
./bootstrap.sh
echo "Attacker ready"
SHELL
end
end

@ -0,0 +1,76 @@
# -*- mode: ruby -*-
# vi: set ft=ruby :
# All Vagrant configuration is done below. The "2" in Vagrant.configure
# configures the configuration version (we support older styles for
# backwards compatibility). Please don't change it unless you know what
# you're doing.
Vagrant.configure("2") do |config|
# The most common configuration options are documented and commented below.
# For a complete reference, please see the online documentation at
# https://docs.vagrantup.com.
# Every Vagrant development environment requires a box. You can search for
# boxes at https://vagrantcloud.com/search.
config.vm.box = "kalilinux/rolling"
config.vm.base_mac = "080027BB1476"
config.vm.hostname = "attacker1"
# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs
# `vagrant box outdated`. This is not recommended.
# config.vm.box_check_update = false
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine. In the example below,
# accessing "localhost:8080" will access port 80 on the guest machine.
# NOTE: This will enable public access to the opened port
# config.vm.network "forwarded_port", guest: 80, host: 8080
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine and only allow access
# via 127.0.0.1 to disable public access
# config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1"
# Create a private network, which allows host-only access to the machine
# using a specific IP.
# config.vm.network "private_network", ip: "192.168.33.10"
# Create a public network, which generally matched to bridged network.
# Bridged networks make the machine appear as another physical device on
# your network.
config.vm.network "public_network", bridge: "enp4s0"
# Share an additional folder to the guest VM. The first argument is
# the path on the host to the actual folder. The second argument is
# the path on the guest to mount the folder. And the optional third
# argument is a set of non-required options.
# config.vm.synced_folder "../data", "/vagrant_data"
# Provider-specific configuration so you can fine-tune various
# backing providers for Vagrant. These expose provider-specific options.
# Example for VirtualBox:
#
# config.vm.provider "virtualbox" do |vb|
# # Display the VirtualBox GUI when booting the machine
# vb.gui = true
#
# # Customize the amount of memory on the VM:
# vb.memory = "1024"
# end
#
# View the documentation for the provider you are using for more
# information on available options.
# Enable provisioning with a shell script. Additional provisioners such as
# Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the
# documentation for more information about their specific syntax and use.
config.vm.provision "shell", inline: <<-SHELL
ls /vagrant
cd /vagrant
chmod +x bootstrap.sh
./bootstrap.sh
SHELL
end

@ -0,0 +1,149 @@
###
# Caldera configuration
caldera:
###
# API key for caldera. See caldera configuration. Default is ADMIN123
apikey: ADMIN123
###
# Attacks configuration
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
###
# Name of machine in Vagrantfile
vm_name: attacker
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
# If machinepoath is not set AttackX will try "vm_name"
machinepath: attacker1
###
# OS of the VM guest. Options are so far "windows", "linux"
os: linux
###
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
###
# List of targets
targets:
###
# Specific target
target1:
vm_controller:
type: vagrant
vagrantfilepath: systems
vm_name: target1
os: linux
###
# Targets need a unique PAW name for caldera
paw: target1
###
# Targets need to be in a group for caldera
group: red
machinepath: target1
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
#root: systems/target1
vm_controller:
type: vagrant
vagrantfilepath: systems
vm_name: target2
os: windows
paw: target2w
group: red
machinepath: target2w
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
###
# Optional setting to activate force when halting the machine. Windows guests sometime get stuck
halt_needs_force: yes
###
# If SSH without vagrant support is used (Windows !) we need a user name (uppercase)
ssh_user: ATTACKX
###
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
###
# General attack config
attacks:
###
# configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match
nap_time: 5
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
###
# Linux specific attacks. A list of caldera ability IDs
linux:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Windows specific attacks. A list of caldera ability IDs
windows:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
###
# Linux specific attacks, a list
linux:
- hydra
###
# Windows specific attacks, a list
windows:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
###
# Hydra configuration
hydra:
###
# A list of protocols to brute force against. Supported: "ssh"
protocols:
- ssh
#- ftp
#- ftps
###
# A file containing potential user names
userfile: users.txt
###
# A file containing potential passwords
pwdfile: passwords.txt
###
# Settings for the results being harvested
results:
###
# The directory the loot will be in
loot_dir: loot

@ -0,0 +1,142 @@
###
# Caldera configuration
caldera:
###
# API key for caldera. See caldera configuration. Default is ADMIN123
apikey: ADMIN123
###
# Attacks configuration
attackers:
###
# Configuration for the first attacker. One should normally be enough
attacker:
###
# Defining VM controller settings for this machine
vm_controller:
###
# Type of the VM controller, Options are "vagrant"
type: vagrant
###
# # path where the vagrantfile is in
vagrantfilepath: systems
###
# Name of machine in Vagrantfile
vm_name: attacker
###
# machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path
# and will be mounted internally as /vagrant/<name>
# If machinepoath is not set PurpleDome will try "vm_name"
machinepath: attacker1
###
# OS of the VM guest. Options are so far "windows", "linux"
os: linux
###
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
###
# List of targets
targets:
###
# Specific target
target1:
vm_controller:
type: vagrant
vagrantfilepath: systems
vm_name: target1
os: linux
###
# Targets need a unique PAW name for caldera
paw: target1
###
# Targets need to be in a group for caldera
group: red
machinepath: target1
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
target2:
#root: systems/target1
vm_controller:
type: vagrant
vagrantfilepath: systems
vm_name: target2
os: windows
paw: target2w
group: red
machinepath: target2w
# Do not destroy/create the machine: Set this to "yes".
use_existing_machine: yes
###
# Optional setting to activate force when halting the machine. Windows guests sometime get stuck
halt_needs_force: yes
###
# If SSH without vagrant support is used (Windows !) we need a user name (uppercase)
ssh_user: PURPLEDOME
###
# For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required.
ssh_keyfile: id_rsa.3
###
# A list of caldera attacks to run against the targets.
caldera_attacks:
###
# Linux specific attacks. A list of caldera ability IDs
linux:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Windows specific attacks. A list of caldera ability IDs
windows:
- "bd527b63-9f9e-46e0-9816-b8434d2b8989"
###
# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra"
kali_attacks:
###
# Linux specific attacks, a list
linux:
- hydra
###
# Windows specific attacks, a list
windows:
- hydra
###
# Configuration for the kali attack tools
kali_conf:
###
# Hydra configuration
hydra:
###
# A list of protocols to brute force against. Supported: "ssh"
protocols:
- ssh
#- ftp
#- ftps
###
# A file containing potential user names
userfile: users.txt
###
# A file containing potential passwords
pwdfile: passwords.txt
###
# Settings for the results being harvested
results:
###
# The directory the loot will be in
loot_dir: loot

@ -0,0 +1,121 @@
#!/usr/bin/env python3
# Testing the attack log class
import unittest
from app.attack_log import AttackLog
# from app.exceptions import ConfigurationError
# https://docs.python.org/3/library/unittest.html
class TestMachineConfig(unittest.TestCase):
""" Test machine specific config """
def test_init(self):
""" The init is empty """
al = AttackLog()
self.assertIsNotNone(al)
self.assertEqual(al.get_dict(), [])
def test_caldera_attack_start(self):
""" Starting a caldera attack """
al = AttackLog()
source = "asource"
paw = "apaw"
group = "agroup"
ability_id = "aability_id"
ttp = "1234"
name = "aname"
description = "adescription"
al.start_caldera_attack(source=source,
paw=paw,
group=group,
ability_id=ability_id,
ttp=ttp,
name=name,
description=description
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub-type"], "caldera")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target_paw"], paw)
self.assertEqual(data[0]["target_group"], group)
self.assertEqual(data[0]["ability_id"], ability_id)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data[0]["name"], name)
self.assertEqual(data[0]["description"], description)
def test_caldera_attack_stop(self):
""" Stopping a caldera attack """
al = AttackLog()
source = "asource"
paw = "apaw"
group = "agroup"
ability_id = "aability_id"
ttp = "1234"
name = "aname"
description = "adescription"
al.stop_caldera_attack(source=source,
paw=paw,
group=group,
ability_id=ability_id,
ttp=ttp,
name=name,
description=description
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub-type"], "caldera")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target_paw"], paw)
self.assertEqual(data[0]["target_group"], group)
self.assertEqual(data[0]["ability_id"], ability_id)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
self.assertEqual(data[0]["name"], name)
self.assertEqual(data[0]["description"], description)
def test_kali_attack_start(self):
""" Starting a kali attack """
al = AttackLog()
source = "asource"
target = "a target"
ttp = "1234"
attack_name = "a name"
al.start_kali_attack(source=source,
target=target,
attack_name=attack_name,
ttp=ttp,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "start")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub-type"], "kali")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["kali_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)
def test_kali_attack_stop(self):
""" Stopping a kali attack """
al = AttackLog()
source = "asource"
target = "a target"
ttp = "1234"
attack_name = "a name"
al.stop_kali_attack(source=source,
target=target,
attack_name=attack_name,
ttp=ttp,
)
data = al.get_dict()
self.assertEqual(data[0]["event"], "stop")
self.assertEqual(data[0]["type"], "attack")
self.assertEqual(data[0]["sub-type"], "kali")
self.assertEqual(data[0]["source"], source)
self.assertEqual(data[0]["target"], target)
self.assertEqual(data[0]["kali_name"], attack_name)
self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp)

@ -0,0 +1,452 @@
import unittest
from unittest.mock import patch
from app.calderacontrol import CalderaControl
from simplejson.errors import JSONDecodeError
from app.exceptions import CalderaError
# https://docs.python.org/3/library/unittest.html
class TestExample(unittest.TestCase):
def setUp(self) -> None:
self.cc = CalderaControl("https://localhost", apikey="123")
def tearDown(self) -> None:
pass
# List links sends the right commands and post
def test_list_links(self):
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.list_links("asd")
mock_method.assert_called_once_with({"index": "link", "op_id": "asd"})
# List links gets an Exception and does not handle it (as expected)
def test_list_links_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.list_links("asd")
# list results sends the right commands and post
def test_list_results(self):
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.list_results("asd")
mock_method.assert_called_once_with({"index": "result", "link_id": "asd"})
# List results gets an Exception and does not handle it (as expected)
def test_list_results_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.list_results("asd")
# list_operations
def test_list_operations(self):
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.list_operations()
mock_method.assert_called_once_with({"index": "operations"})
# list operations gets the expected exception
def test_list_operations_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.list_operations()
# list_abilities
def test_list_abilities(self):
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.list_abilities()
mock_method.assert_called_once_with({"index": "abilities"})
# list abilities gets the expected exception
def test_list_abilities_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.list_abilities()
# list_agents
def test_list_agents(self):
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.list_agents()
mock_method.assert_called_once_with({"index": "agents"})
# list agents gets the expected exception
def test_list_agents_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.list_agents()
# list_adversaries
def test_list_adversaries(self):
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.list_adversaries()
mock_method.assert_called_once_with({"index": "adversaries"})
# list adversaries gets the expected exception
def test_list_adversaries_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.list_adversaries()
# list_objectives
def test_list_objectives(self):
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.list_objectives()
mock_method.assert_called_once_with({"index": "objectives"})
# list objectives gets the expected exception
def test_list_objectives_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.list_objectives()
# Get operation (working)
def test_get_operation(self):
a = {"name": "bar", "bar": "baz"}
with patch.object(self.cc, "list_operations", return_value=[a, {"name": "zigg", "bar": "baz"}]):
op = self.cc.get_operation("bar")
self.assertEqual(op, a)
# Get operation (no matching name)
def test_get_operation_not_available(self):
a = {"name": "bar", "bar": "baz"}
with patch.object(self.cc, "list_operations", return_value=[a, {"name": "zigg", "bar": "baz"}]):
op = self.cc.get_operation("baaaaar")
self.assertEqual(op, None)
# get_adversary
def test_get_adversary(self):
a = {"name": "bar", "bar": "baz"}
with patch.object(self.cc, "list_adversaries", return_value=[a, {"name": "zigg", "bar": "baz"}]):
op = self.cc.get_adversary("bar")
self.assertEqual(op, a)
# get_adversary (no matching name)
def test_get_adversary_not_available(self):
a = {"name": "bar", "bar": "baz"}
with patch.object(self.cc, "list_adversaries", return_value=[a, {"name": "zigg", "bar": "baz"}]):
op = self.cc.get_adversary("baaaar")
self.assertEqual(op, None)
# get_objective
def test_get_objective(self):
a = {"name": "bar", "bar": "baz"}
with patch.object(self.cc, "list_objectives", return_value=[a, {"name": "zigg", "bar": "baz"}]):
op = self.cc.get_objective("bar")
self.assertEqual(op, a)
# get_objective (no matching name)
def test_get_objective_not_available(self):
a = {"name": "bar", "bar": "baz"}
with patch.object(self.cc, "list_objectives", return_value=[a, {"name": "zigg", "bar": "baz"}]):
op = self.cc.get_objective("baaaar")
self.assertEqual(op, None)
# get_ability
def test_get_ability(self):
a = {"ability_id": "bar", "bar": "baz"}
with patch.object(self.cc, "list_abilities", return_value=[a, {"ability_id": "zigg", "bar": "baz"}]):
op = self.cc.get_ability("bar")
self.assertEqual(op, [a])
# get_ability (no matching name)
def test_get_ability_not_available(self):
a = {"ability_id": "bar", "bar": "baz"}
with patch.object(self.cc, "list_abilities", return_value=[a, {"ability_id": "zigg", "bar": "baz"}]):
op = self.cc.get_ability("baaaar")
self.assertEqual(op, [])
# get_operation_by_id
def test_get_operation_by_id(self):
opid = "FooBar"
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.get_operation_by_id(opid)
mock_method.assert_called_once_with({"index": "operations", "id": opid})
# get_operation_by_id gets the expected exception
def test_get_operation_by_id_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.get_result_by_id("FooBar")
# get_result_by_id
def test_get_result_by_id(self):
opid = "FooBar"
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.get_result_by_id(opid)
mock_method.assert_called_once_with({"index": "result", "link_id": opid})
# get_result_by_id gets the expected exception
def test_get_result_by_id_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.get_result_by_id("FooBar")
# get_linkid
def test_get_linkid(self):
paw = "FooBar"
ability_id = "AID"
alink = {"paw": paw,
"ability": {"ability_id": ability_id},
"id": "Getme"}
op = [{"chain": [alink]}]
with patch.object(self.cc, "get_operation_by_id", return_value=op):
res = self.cc.get_linkid("Foo", paw, ability_id)
self.assertEqual(res, "Getme")
# get missing link id
def test_get_linkid_missing(self):
paw = "FooBar"
ability_id = "AID"
alink = {"paw": paw,
"ability": {"ability_id": ability_id},
"id": "Getme"}
op = [{"chain": [alink]}]
with patch.object(self.cc, "get_operation_by_id", return_value=op):
res = self.cc.get_linkid("Foo", "Bar", ability_id)
self.assertEqual(res, None)
# view_operation_report
def test_view_operation_report(self):
opid = "FooBar"
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.view_operation_report(opid)
mock_method.assert_called_once_with({"index": "operation_report", "op_id": opid, "agent_output": 1})
# get_result_by_id gets the expected exception
def test_view_operation_report_with_exception(self):
with self.assertRaises(JSONDecodeError):
with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)):
self.cc.view_operation_report("FooBar")
# view_operation_output
def test_view_operation_output(self):
opid = "OPID_123"
paw = "PAW_123"
ability_id = "AID_123"
or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}}
with patch.object(self.cc, "view_operation_report", return_value=or_ret):
res = self.cc.view_operation_output(opid, paw, ability_id)
self.assertEqual(res, "red")
# view_operation_output paw missing
def test_view_operation_output_paw_missing(self):
opid = "OPID_123"
paw = "PAW_123"
ability_id = "AID_123"
or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}}
with self.assertRaises(CalderaError):
with patch.object(self.cc, "view_operation_report", return_value=or_ret):
self.cc.view_operation_output(opid, "missing", ability_id)
# view_operation_output ability_id missing
def test_view_operation_output_ability_id_missing(self):
opid = "OPID_123"
paw = "PAW_123"
ability_id = "AID_123"
or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}}
with patch.object(self.cc, "view_operation_report", return_value=or_ret):
res = self.cc.view_operation_output(opid, paw, "missing")
self.assertEqual(res, None)
# add_operation
def test_add_operation(self):
name = "test_name"
state = "test_state"
group = "test_group"
advid = "test_id"
exp = {"index": "operations",
"name": name,
"state": state,
"autonomous": 1,
'obfuscator': 'plain-text',
'auto_close': '1',
'jitter': '4/8',
'source': 'Alice Filters',
'visibility': '50',
"group": group,
"planner": "atomic",
"adversary_id": advid,
}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.add_operation(name, advid, group, state)
mock_method.assert_called_once_with(exp, method="put")
# add_operation defaults
def test_add_operation_defaults(self):
name = "test_name"
advid = "test_id"
exp = {"index": "operations",
"name": name,
"state": "running", # default
"autonomous": 1,
'obfuscator': 'plain-text',
'auto_close': '1',
'jitter': '4/8',
'source': 'Alice Filters',
'visibility': '50',
"group": "red", # default
"planner": "atomic",
"adversary_id": advid,
}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.add_operation(name, advid)
mock_method.assert_called_once_with(exp, method="put")
# add_adversary
def test_add_adversary(self):
name = "test_name"
ability = "test_ability"
description = "test_descritption"
exp = {"index": "adversaries",
"name": name,
"description": description,
"atomic_ordering": [{"id": ability}],
#
"objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective
}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.add_adversary(name, ability, description)
mock_method.assert_called_once_with(exp, method="put")
def test_add_adversary_default(self):
name = "test_name"
ability = "test_ability"
exp = {"index": "adversaries",
"name": name,
"description": "created automatically",
"atomic_ordering": [{"id": ability}],
#
"objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective
}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.add_adversary(name, ability)
mock_method.assert_called_once_with(exp, method="put")
# execute_ability
def test_execute_ability(self):
paw = "test_paw"
ability_id = "test_ability"
obfuscator = "plain-text"
exp = {"paw": paw,
"ability_id": ability_id,
"obfuscator": obfuscator}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.execute_ability(paw, ability_id, obfuscator)
mock_method.assert_called_once_with(exp, rest_path="plugin/access/exploit_ex")
def test_execute_ability_default(self):
paw = "test_paw"
ability_id = "test_ability"
exp = {"paw": paw,
"ability_id": ability_id,
"obfuscator": "plain-text"}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.execute_ability(paw, ability_id)
mock_method.assert_called_once_with(exp, rest_path="plugin/access/exploit_ex")
# execute_operation
def test_execute_operation(self):
operation_id = "test_opid"
state = "paused"
exp = {"index": "operation",
"op_id": operation_id,
"state": state}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.execute_operation(operation_id, state)
mock_method.assert_called_once_with(exp)
# not supported state
def test_execute_operation_not_supported(self):
operation_id = "test_opid"
state = "not supported"
with self.assertRaises(ValueError):
with patch.object(self.cc, "__contact_server__", return_value=None):
self.cc.execute_operation(operation_id, state)
def test_execute_operation_default(self):
operation_id = "test_opid"
exp = {"index": "operation",
"op_id": operation_id,
"state": "running" # default
}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.execute_operation(operation_id)
mock_method.assert_called_once_with(exp)
# delete_operation
def test_delete_operation(self):
opid = "test_opid"
exp = {"index": "operations",
"id": opid}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.delete_operation(opid)
mock_method.assert_called_once_with(exp, method="delete")
# delete_adversary
def test_delete_adversary(self):
adid = "test_adid"
exp = {"index": "adversaries",
"adversary_id": [{"adversary_id": adid}]}
with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method:
self.cc.delete_adversary(adid)
mock_method.assert_called_once_with(exp, method="delete")
# is_operation_finished
def test_is_operation_finished_true(self):
opdata = [{"state": "finished", "chain": [{"status": 0}]}]
opid = "does not matter"
with patch.object(self.cc, "get_operation_by_id", return_value=opdata):
res = self.cc.is_operation_finished(opid)
self.assertEqual(res, True)
def test_is_operation_finished_false(self):
opdata = [{"state": "running", "chain": [{"status": 1}]}]
opid = "does not matter"
with patch.object(self.cc, "get_operation_by_id", return_value=opdata):
res = self.cc.is_operation_finished(opid)
self.assertEqual(res, False)
def test_is_operation_finished_exception(self):
opdata = [{"chain": [{"statusa": 1}]}]
opid = "does not matter"
with self.assertRaises(CalderaError):
with patch.object(self.cc, "get_operation_by_id", return_value=opdata):
self.cc.is_operation_finished(opid)
def test_is_operation_finished_exception2(self):
opdata = []
opid = "does not matter"
with self.assertRaises(CalderaError):
with patch.object(self.cc, "get_operation_by_id", return_value=opdata):
self.cc.is_operation_finished(opid)
# TODO attack (lots of complexity !)
# TODO test fetch_client
# TODO test __contact_server__
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,536 @@
#!/usr/bin/env python3
# Testing the central configuration loader
import unittest
# import os
from app.config import ExperimentConfig, MachineConfig
from app.exceptions import ConfigurationError
# https://docs.python.org/3/library/unittest.html
class TestMachineConfig(unittest.TestCase):
""" Test machine specific config """
def test_empty_init(self):
""" The init is empty """
with self.assertRaises(ConfigurationError):
MachineConfig(None)
def test_basic_init(self):
""" The init is basic and working """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.raw_config["root"], "systems/attacker1")
self.assertEqual(mc.raw_config["vm_controller"]["type"], "vagrant")
def test_missing_vm_name(self):
""" The vm name is missing """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
}})
def test_use_existing_machine_is_true(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": True})
self.assertEqual(mc.use_existing_machine(), True)
def test_use_existing_machine_is_false(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.use_existing_machine(), False)
def test_use_existing_machine_is_default(self):
""" Testing use_existing:machine setting """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.use_existing_machine(), False)
def test_windows_is_valid_os(self):
""" Testing if windows is valid os """
mc = MachineConfig({"root": "systems/attacker1",
"os": "windows",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.os(), "windows")
def test_windows_is_valid_os_casefix(self):
""" Testing if windows is valid os - using lowercase fix"""
mc = MachineConfig({"root": "systems/attacker1",
"os": "WINDOWS",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.os(), "windows")
def test_linux_is_valid_os(self):
""" Testing if windows is valid os """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.os(), "linux")
def test_missing_os(self):
""" The os is missing """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_wrong_os(self):
""" The os is wrong """
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "BROKEN",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_vagrant_is_valid_vmcontroller(self):
""" Testing if vagrant is valid vmcontroller """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.vmcontroller(), "vagrant")
def test_vagrant_is_valid_vmcontroller_casefix(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "VAGRANT",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.vmcontroller(), "vagrant")
def test_invalid_vmcontroller(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "BROKEN",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
def test_missing_vmcontroller_2(self):
""" Testing if vagrant is valid vmcontroller case fixxed"""
with self.assertRaises(ConfigurationError):
MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_name": "target1"})
def test_vagrant_is_valid_vmip(self):
""" Testing if vagrant is valid ip/url """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"ip": "kali",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.vm_ip(), "kali")
def test_missing_vmip(self):
""" Testing if missing vm ip is handled"""
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1"})
self.assertEqual(mc.vm_ip(), None)
def test_machinepath(self):
""" Testing machinepath setting """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False,
"machinepath": "foo"})
self.assertEqual(mc.machinepath(), "foo")
def test_machinepath_fallback(self):
""" Testing machinepath setting fallback to vmname"""
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.machinepath(), "target1")
def test_paw(self):
""" Testing for caldera paw """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"paw": "Bar",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.caldera_paw(), "Bar")
def test_paw_fallback(self):
""" Testing for caldera paw fallback """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.caldera_paw(), None)
def test_group(self):
""" Testing for caldera group """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"group": "Bar",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.caldera_group(), "Bar")
def test_group_fallback(self):
""" Testing for caldera group fallback """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.caldera_group(), None)
def test_ssh_keyfile(self):
""" Testing keyfile config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"ssh_keyfile": "Bar",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.ssh_keyfile(), "Bar")
def test_ssh_keyfile_default(self):
""" Testing keyfile config default """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.ssh_keyfile(), None)
def test_ssh_user(self):
""" Testing ssh user config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"ssh_user": "Bob",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.ssh_user(), "Bob")
def test_ssh_user_default(self):
""" Testing ssh user default config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.ssh_user(), "vagrant")
def test_halt_needs_force_default(self):
""" Testing 'halt needs force' default config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.halt_needs_force(), False)
def test_halt_needs_force(self):
""" Testing 'halt needs force' config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.halt_needs_force(), True)
def test_vagrantfilepath(self):
""" Testing vagrantfilepath config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.vagrantfilepath(), "systems")
def test_vagrantfilepath_missing(self):
""" Testing missing vagrantfilepath config """
with self.assertRaises(ConfigurationError):
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
mc.vagrantfilepath()
def test_sensors_empty(self):
""" Testing empty sensor config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.sensors(), [])
def test_sensors_set(self):
""" Testing empty sensor config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"sensors": ["linux_idp", "test_sensor"]})
self.assertEqual(mc.sensors(), ["linux_idp", "test_sensor"])
def test_vulnerabilities_empty(self):
""" Testing empty vulnerabilities config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False})
self.assertEqual(mc.vulnerabilities(), [])
def test_vulnerabilities_set(self):
""" Testing empty vulnerabilities config """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"vulnerabilities": ["PEBKAC", "USER"]})
self.assertEqual(mc.vulnerabilities(), ["PEBKAC", "USER"])
def test_active_not_set(self):
""" machine active not set """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"sensors": ["linux_idp", "test_sensor"]})
self.assertEqual(mc.is_active(), True)
def test_active_is_false(self):
""" machine active is set to false """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"active": False,
"sensors": ["linux_idp", "test_sensor"]})
self.assertEqual(mc.is_active(), False)
def test_active_is_true(self):
""" machine active is set to true """
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"halt_needs_force": True,
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target1",
"use_existing_machine": False,
"active": True,
"sensors": ["linux_idp", "test_sensor"]})
self.assertEqual(mc.is_active(), True)
class TestExperimentConfig(unittest.TestCase):
def test_missing_config_file(self):
""" Missing config file """
with self.assertRaises(FileNotFoundError):
ExperimentConfig("tests/data/missing.yaml")
def test_basic_loading(self):
""" Existing, basic config file, testing the values are loaded properly """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.raw_config["caldera"]["apikey"], "ADMIN123")
self.assertEqual(ex.caldera_apikey(), "ADMIN123")
def test_basic_loading_targets_read(self):
""" Targets in config: can be found """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(len(ex._targets), 2)
self.assertEqual(ex._targets[0].raw_config["vm_name"], "target1")
self.assertEqual(ex._targets[0].vmname(), "target1")
self.assertEqual(ex.targets()[0].vmname(), "target1")
def test_basic_loading_attacker_read(self):
""" Attackers in config: can be found """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(len(ex._targets), 2)
self.assertEqual(ex._attackers[0].raw_config["vm_name"], "attacker")
self.assertEqual(ex._attackers[0].vmname(), "attacker")
self.assertEqual(ex.attackers()[0].vmname(), "attacker")
self.assertEqual(ex.attacker(0).vmname(), "attacker")
def test_missing_kali_config(self):
""" Getting kali config for a specific attack. Attack missing """
ex = ExperimentConfig("tests/data/basic.yaml")
with self.assertRaises(ConfigurationError):
ex.kali_conf("BOOM")
def test_working_kali_config(self):
""" Getting kali config for a specific attack """
ex = ExperimentConfig("tests/data/basic.yaml")
data = ex.kali_conf("hydra")
self.assertEqual(data["userfile"], "users.txt")
def test_nap_time(self):
""" nap time is set """
ex = ExperimentConfig("tests/data/basic.yaml")
self.assertEqual(ex.get_nap_time(), 5)
def test_nap_time_not_set(self):
""" nap time is not set """
ex = ExperimentConfig("tests/data/nap_time_missing.yaml")
self.assertEqual(ex.get_nap_time(), 0)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,30 @@
import unittest
# https://docs.python.org/3/library/unittest.html
class TestExample2(unittest.TestCase):
def setUp(self) -> None:
pass
def tearDown(self) -> None:
pass
def test_upper(self):
self.assertEqual('foo'.upper(), 'FOO')
def test_isupper(self):
self.assertTrue('FOO'.isupper())
self.assertFalse('Foo'.isupper())
def test_split(self):
s = 'hello world'
self.assertEqual(s.split(), ['hello', 'world'])
# check that s.split fails when the separator is not a string
with self.assertRaises(TypeError):
s.split(2)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,259 @@
import unittest
import os
from app.machinecontrol import Machine
from app.exceptions import ConfigurationError
from app.config import MachineConfig
from unittest.mock import patch
# https://docs.python.org/3/library/unittest.html
class TestMachineControl(unittest.TestCase):
def test_get_os_linux_machine(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
self.assertEqual(m.get_os(), "linux")
def test_get_os_linux_machine_with_config_class(self):
mc = MachineConfig({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
m = Machine(mc)
self.assertEqual(m.get_os(), "linux")
def test_get_os_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
def test_get_os_not_supported(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "nintendo_switch",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
def test_get_paw_good(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"paw": "testme",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
self.assertEqual(m.get_paw(), "testme")
def test_get_paw_missing(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
self.assertEqual(m.get_paw(), None)
def test_get_group_good(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"group": "testme",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"})
self.assertEqual(m.get_group(), "testme")
def test_get_group_missing(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
self.assertEqual(m.get_group(), None)
def test_vagrantfilepath_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
},
"vm_name": "target3"
})
def test_vagrantfile_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "non_existing",
},
"vm_name": "target3"
})
def test_vagrantfile_existing(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
self.assertIsNotNone(m)
def test_name_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
})
# test: auto generated, dir missing
def test_auto_generated_machinepath_with_path_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "missing"
})
# test manual config, dir missing
def test_configured_machinepath_with_path_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"machinepath": "missing"
})
# test auto generated, dir there (external/internal dirs must work !)
def test_auto_generated_machinepath_with_good_config(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3"
})
vagrantfilepath = os.path.abspath("systems")
ext = os.path.join(vagrantfilepath, "target3")
internal = os.path.join("/vagrant/", "target3")
self.assertEqual(m.abs_machinepath_external, ext)
self.assertEqual(m.abs_machinepath_internal, internal)
# test: manual config, dir there (external/internal dirs must work !)
def test_configured_machinepath_with_good_config(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "missing",
"machinepath": "target3"
})
vagrantfilepath = os.path.abspath("systems")
ext = os.path.join(vagrantfilepath, "target3")
internal = os.path.join("/vagrant/", "target3")
self.assertEqual(m.abs_machinepath_external, ext)
self.assertEqual(m.abs_machinepath_internal, internal)
# vm_controller missing
def test_configured_vm_controller_missing(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_name": "missing",
"machinepath": "target3"
})
# vm_controller wrong
def test_configured_vm_controller_wrong_type(self):
with self.assertRaises(ConfigurationError):
Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "wrong_controller",
"vagrantfilepath": "systems",
},
"vm_name": "missing",
"machinepath": "target3"
})
# Create caldera start command and verify it
def test_get_linux_caldera_start_cmd(self):
m = Machine({"root": "systems/attacker1",
"os": "linux",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw"})
m.set_caldera_server("http://www.test.test")
with patch.object(m.vm_manager, "get_playground", return_value="/vagrant/target3"):
cmd = m.create_start_caldera_client_cmd()
self.assertEqual(cmd.strip(), "nohup /vagrant/target3/caldera_agent.sh start &".strip())
# Create caldera start command and verify it (windows)
def test_get_windows_caldera_start_cmd(self):
m = Machine({"root": "systems/attacker1",
"os": "windows",
"vm_controller": {
"type": "vagrant",
"vagrantfilepath": "systems",
},
"vm_name": "target3",
"group": "testgroup",
"paw": "testpaw",
"machinepath": "target2w"})
m.set_caldera_server("www.test.test")
cmd = m.create_start_caldera_client_cmd()
self.maxDiff = None
expected = """
caldera_agent.bat"""
self.assertEqual(cmd.strip(), expected.strip())
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,53 @@
# tox (https://tox.readthedocs.io/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py38
[flake8]
# E501 Line length. Ignored here. But still: please do not abuse the freedom
# W291 trailing whitespace
ignore = E501,
W291
exclude =
.git,
vm_images,
venv,
vagrant_templates,
vagrantboxes,
.vagrant
systems,
real_machines_to_build_vagrant_boxes_from,
__pycache__,
.idea,
htmlcov,
.tox,
max-complexity = 10
[testenv]
deps = -rrequirements.txt
flake8
safety
bandit
pylint
commands =
# python -m unittest discover -s tests
coverage run --source=app -m unittest discover -s tests
# Gotta ignore some flake8 warnings, because they are maskerading real issues.
# Ignoring:
# C901 complex code. Reduce complexitiy. But this thing is over-reacting
# E501: line too long. Please: Still keep it short. But 80 chars is just incredibly short nowadays
flake8 --ignore C901,E501
# Check if dependencies are vulnerable
safety check -r requirements.txt
# Check for common vulnerabilities
bandit -ll -r app/ plugins/ *.py
# Linting
# pylint *.py # currently off. Needs configuration
Loading…
Cancel
Save