diff --git a/plugins/default/vm_controller/running_vm/running_vm_plugin.py b/plugins/default/vm_controller/running_vm/running_vm_plugin.py new file mode 100644 index 0000000..7c382c4 --- /dev/null +++ b/plugins/default/vm_controller/running_vm/running_vm_plugin.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python3 + +# A plugin to control already running vms + +import os +from plugins.base.machinery import MachineryPlugin, MachineStates +from fabric import Connection +from app.exceptions import ConfigurationError, NetworkError +from invoke.exceptions import UnexpectedExit +import paramiko +import time +import socket + + +class RunningVMPlugin(MachineryPlugin): + + # Boilerplate + name = "running_vm" + description = "A plugin to handle already running machines. The machine will not be started/stopped by this plugin" + + required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share + + def __init__(self): + super().__init__() + self.plugin_path = __file__ + self.c = None + self.vagrantfilepath = None + self.vagrantfile = None + + def process_config(self, config): + """ Machine specific processing of configuration """ + + # TODO: Rename vagrantfilepath in the whole project + self.vagrantfilepath = os.path.abspath(self.config.vagrantfilepath()) + self.vagrantfile = os.path.join(self.vagrantfilepath, "Vagrantfile") + if not os.path.isfile(self.vagrantfile): + raise ConfigurationError(f"Vagrantfile not existing: {self.vagrantfile}") + + def create(self, reboot=True): + """ Create a machine + + @param reboot: Reboot the VM during installation. Required if you want to install software + """ + return + + def up(self): + """ Start a machine, create it if it does not exist """ + return + + def halt(self): + """ Halt a machine """ + return + + def destroy(self): + """ Destroy a machine """ + return + + def connect(self): + """ Connect to a machine """ + + if self.c: + return self.c + + retries = 10 + retry_sleep = 10 + timeout = 30 + while retries: + try: + if self.config.os() == "linux": + uhp = self.get_ip() + print(f"Connecting to {uhp}") + self.c = Connection(uhp, connect_timeout=timeout) + + if self.config.os() == "windows": + # args = {"key_filename": self.config.ssh_keyfile() or self.v.keyfile(vm_name=self.config.vmname())} + uhp = self.get_ip() + print(f"\n\n !!!!! Connecting to {uhp} !!!!!!!!!! \n\n") + self.c = Connection(uhp, connect_timeout=timeout, user=self.config.ssh_user()) + except (paramiko.ssh_exception.SSHException, socket.timeout): + print(f"Failed to connect, will retry {retries} times. Timeout: {timeout}") + retries -= 1 + timeout += 10 + time.sleep(retry_sleep) + else: + print(f"Connection: {self.c}") + return self.c + + print("SSH network error") + raise NetworkError + + def remote_run(self, cmd, disown=False): + """ Connects to the machine and runs a command there + + @param disown: Send the connection into background + """ + + if cmd is None: + return "" + + self.connect() + cmd = cmd.strip() + + print("Running VM plugin remote run: " + cmd) + print("Disown: " + str(disown)) + result = None + try: + result = self.c.run(cmd, disown=disown) + print(result) + except UnexpectedExit: + return "Unexpected Exit" + + if result and result.stderr: + print("Debug: Stderr: " + str(result.stderr.strip())) + + if result: + return result.stdout.strip() + + return "" + + def put(self, src, dst): + """ Send a file to a machine + + @param src: source dir + @param dst: destination + """ + self.connect() + + print(f"PUT {src} -> {dst}") + + res = "" + retries = 10 + retry_sleep = 10 + timeout = 30 + while retries: + try: + res = self.c.put(src, dst) + except (paramiko.ssh_exception.SSHException, socket.timeout, UnexpectedExit): + print(f"PUT Failed to connect, will retry {retries} times. Timeout: {timeout}") + retries -= 1 + timeout += 10 + time.sleep(retry_sleep) + self.disconnect() + self.connect() + else: + return res + print("SSH network error on PUT command") + raise NetworkError + + def get(self, src, dst): + """ Get a file to a machine + + @param src: source dir + @param dst: destination + """ + self.connect() + + res = "" + try: + res = self.c.get(src, dst) + except UnexpectedExit: + pass + + return res + + def disconnect(self): + """ Disconnect from a machine """ + if self.c: + self.c.close() + self.c = None + + def get_state(self): + """ Get detailed state of a machine """ + + return MachineStates.RUNNING + + def get_ip(self): + """ Return the machine ip """ + + return self.config.vm_ip() diff --git a/plugins/default/vm_controller/vagrant/vagrant_plugin.py b/plugins/default/vm_controller/vagrant/vagrant_plugin.py new file mode 100644 index 0000000..d1532b1 --- /dev/null +++ b/plugins/default/vm_controller/vagrant/vagrant_plugin.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 + +# A plugin to control vagrant machines + +from plugins.base.machinery import MachineryPlugin, MachineStates +import subprocess +import vagrant +from fabric import Connection +import os +from app.exceptions import ConfigurationError +from invoke.exceptions import UnexpectedExit + +# Experiment with paramiko instead of fabric. Seems fabric has some issues with the "put" command to Windows. There seems no fix (just my workarounds). Maybe paramiko is better. + + +class VagrantPlugin(MachineryPlugin): + + # Boilerplate + name = "vagrant" + description = "A plugin for vagrant machines" + + required_files = [] # Files shipped with the plugin which are needed by the machine. Will be copied to the share + + def __init__(self): + super().__init__() + self.plugin_path = __file__ + self.v = None + self.c = None + self.vagrantfilepath = None + self.vagrantfile = None + + def process_config(self, config): + """ Machine specific processing of configuration """ + + self.vagrantfilepath = os.path.abspath(self.config.vagrantfilepath()) + self.vagrantfile = os.path.join(self.vagrantfilepath, "Vagrantfile") + if not os.path.isfile(self.vagrantfile): + raise ConfigurationError(f"Vagrantfile not existing: {self.vagrantfile}") + self.v = vagrant.Vagrant(root=self.vagrantfilepath) + + def create(self, reboot=True): + """ Create a machine + + @param reboot: Reboot the VM during installation. Required if you want to install software + """ + self.up() + if reboot: + self.halt() + self.up() + + def up(self): + """ Start a machine, create it if it does not exist """ + try: + self.v.up(vm_name=self.config.vmname()) + except subprocess.CalledProcessError as e: + if self.v.status(vm_name=self.config.vmname())[0].state == self.v.RUNNING: + return # Everything is fine + else: + raise e + + def halt(self): + """ Halt a machine """ + forceit = self.config.halt_needs_force() + try: + self.v.halt(vm_name=self.config.vmname(), force=forceit) + except subprocess.CalledProcessError: + self.v.halt(vm_name=self.config.vmname(), force=True) + + def destroy(self): + """ Destroy a machine """ + self.v.destroy(vm_name=self.config.vmname()) + + def connect(self): + """ Connect to a machine. If there is already a connection we keep it """ + + if self.c: + return self.c + + if self.config.os() == "linux": + uhp = self.v.user_hostname_port(vm_name=self.config.vmname()) + print(f"Connecting to {uhp}") + self.c = Connection(uhp, connect_kwargs={"key_filename": self.v.keyfile(vm_name=self.config.vmname())}) + print(self.c) + return self.c + + if self.config.os() == "windows": + args = {"key_filename": os.path.join(self.sysconf["abs_machinepath_external"], self.config.ssh_keyfile())} + uhp = self.get_ip() + print(uhp) + print(args) + print(self.config.ssh_user()) + # breakpoint() + self.c = Connection(uhp, user=self.config.ssh_user(), connect_kwargs=args) + print(self.c) + return self.c + + def remote_run(self, cmd, disown=False): + """ Connects to the machine and runs a command there + + @param disown: Send the connection into background + """ + + if cmd is None: + return "" + + self.connect() + cmd = cmd.strip() + + print("Vagrant plugin remote run: " + cmd) + print("Disown: " + str(disown)) + result = None + try: + result = self.c.run(cmd, disown=disown) + print(result) + except UnexpectedExit: + return "Unexpected Exit" + + if result and result.stderr: + print("Debug: Stderr: " + str(result.stderr.strip())) + + if result: + return result.stdout.strip() + + return "" + + def put(self, src, dst): + """ Send a file to a machine + + @param src: source dir + @param dst: destination + """ + self.connect() + + print(f"{src} -> {dst}") + + res = "" + try: + res = self.c.put(src, dst) + except UnexpectedExit: + pass + except FileNotFoundError as e: + print(e) + + return res + + def get(self, src, dst): + """ Get a file to a machine + + @param src: source dir + @param dst: destination + """ + self.connect() + + res = "" + try: + res = self.c.get(src, dst) + except UnexpectedExit: + pass + + return res + + def disconnect(self): + """ Disconnect from a machine """ + if self.c: + self.c.close() + self.c = None + + def get_state(self): + """ Get detailed state of a machine """ + + vstate = self.v.status(vm_name=self.config.vmname())[0].state + + # mapping vagrant states to PurpleDome states + mapping = {self.v.RUNNING: MachineStates.RUNNING, + self.v.NOT_CREATED: MachineStates.NOT_CREATED, + self.v.POWEROFF: MachineStates.POWEROFF, + self.v.ABORTED: MachineStates.ABORTED, + self.v.SAVED: MachineStates.SAVED, + self.v.STOPPED: MachineStates.STOPPED, + self.v.FROZEN: MachineStates.FROZEN, + self.v.SHUTOFF: MachineStates.SHUTOFF + } + + return mapping[vstate] + + def get_ip(self): + """ Return the machine ip """ + + # TODO: Create special code to extract windows IPs + + # TODO: Find a smarter way to get the ip + + # ips = [] + # cmd = "ifconfig" + # res = self.vm_manager.__call_remote_run__(cmd) + + # for line in res.split("\n"): + # m = re.match(r".*inet (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*", line) + # if m: + # print(m.group(1)) + # ips.append(m.group(1)) + + filename = os.path.join(self.sysconf["abs_machinepath_external"], "ip4.txt") + with open(filename, "rt") as fh: + return fh.readline().strip() diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..8440780 --- /dev/null +++ b/setup.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +""" Setup configuration. Required by Tox. """ + +import setuptools + + +# https://packaging.python.org/tutorials/packaging-projects/ +# https://setuptools.readthedocs.io/en/latest/ + +with open("README.md", "r") as fh: + long_description = fh.read() + +setuptools.setup( + name="purpledome-thorsten-sick", # Replace with your own username + version="0.0.1", + author="Thorsten Sick", + author_email="thorsten.sick@avast.com", + description="An attack environment to simulated malware attacks on targets", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/avast/PurpleDome", + packages=setuptools.find_packages(), + classifiers=[ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", + ], + python_requires='>=3.6', +) diff --git a/systems/Vagrantfile b/systems/Vagrantfile new file mode 100644 index 0000000..fc5a939 --- /dev/null +++ b/systems/Vagrantfile @@ -0,0 +1,288 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : + +# All Vagrant configuration is done below. The "2" in Vagrant.configure +# configures the configuration version (we support older styles for +# backwards compatibility). Please don't change it unless you know what +# you're doing. +Vagrant.configure("2") do |config| + + # Use the virtualbox provider with some common settings + config.vm.provider "virtualbox" do |v| + # Workaround Ubuntu 16.04 issue with Virtualbox where Box waits 5 minutes to start if network "cable" is not connected. + # https://github.com/chef/bento/issues/682 + #v.customize ["modifyvm", :id, "--cableconnected1", "on"] + # Change network card to PCnet-FAST III + # For NAT adapter + #v.customize ["modifyvm", :id, "--nictype1", "Am79C973"] + + end + + config.vm.define "target1" do |target1| + # The most common configuration options are documented and commented below. + # For a complete reference, please see the online documentation at + # https://docs.vagrantup.com. + + # Every Vagrant development environment requires a box. You can search for + # boxes at https://vagrantcloud.com/search. + target1.vm.box = "hashicorp/bionic64" + # target.vm.base_mac = "080027BB1475" + target1.vm.hostname = "target1" + target1.vm.define "target1" + + #target1.vm.synced_folder ".", "/vagrant" + + + # Disable automatic box update checking. If you disable this, then + # boxes will only be checked for updates when the user runs + # `vagrant box outdated`. This is not recommended. + # config.vm.box_check_update = false + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine. In the example below, + # accessing "localhost:8080" will access port 80 on the guest machine. + # NOTE: This will enable public access to the opened port + # config.vm.network "forwarded_port", guest: 80, host: 8080 + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine and only allow access + # via 127.0.0.1 to disable public access + # config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1" + + # Create a private network, which allows host-only access to the machine + # using a specific IP. + # config.vm.network "private_network", ip: "192.168.33.10" + + # Create a public network, which generally matched to bridged network. + # Bridged networks make the machine appear as another physical device on + # your network. + target1.vm.network "public_network", bridge: "enp4s0" + + # Share an additional folder to the guest VM. The first argument is + # the path on the host to the actual folder. The second argument is + # the path on the guest to mount the folder. And the optional third + # argument is a set of non-required options. + # config.vm.synced_folder "../data", "/vagrant_data" + + # Provider-specific configuration so you can fine-tune various + # backing providers for Vagrant. These expose provider-specific options. + # Example for VirtualBox: + # + # config.vm.provider "virtualbox" do |vb| + # # Display the VirtualBox GUI when booting the machine + # vb.gui = true + # + # # Customize the amount of memory on the VM: + # vb.memory = "1024" + # end + # + # View the documentation for the provider you are using for more + # information on available options. + + # Enable provisioning with a shell script. Additional provisioners such as + # Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the + # documentation for more information about their specific syntax and use. + target1.vm.provision "shell", inline: <<-SHELL + ls /vagrant + cd /vagrant/target1 + chmod +x bootstrap.sh + ./bootstrap.sh + + # Install implant + chmod +x hackme.sh + ./hackme.sh + + SHELL + end + + ######### + + # Windows target + config.vm.define "target2" do |target2| + target2.vm.box = "windows10_64" + #target2.vm.base_mac = "080027BB1475" + #target2.vm.hostname = "target2w" + #target2.vm.define "target2w" + + target2.vm.network "public_network", bridge: "enp4s0" + + target2.vm.communicator = "winssh" + target2.winssh.shell ="cmd" + + target2.vm.provider "virtualbox" do |vb2| + # # Display the VirtualBox GUI when booting the machine + vb2.gui = true + # + # # Customize the amount of memory on the VM: + # vb.memory = "1024" + end + + # Sync + target2.vm.synced_folder ".", "/vagrant", disabled: true + + # config.vm.boot_timeout + + target2.ssh.username = "PURPLEDOME" + target2.ssh.private_key_path = "target2w/id_rsa.3" + + end + + ######################## + + config.vm.define "target3" do |target3| + # The most common configuration options are documented and commented below. + # For a complete reference, please see the online documentation at + # https://docs.vagrantup.com. + + # Every Vagrant development environment requires a box. You can search for + # boxes at https://vagrantcloud.com/search. + target3.vm.box = "ubuntu/groovy64" + # target.vm.base_mac = "080027BB1475" + target3.vm.hostname = "target3" + target3.vm.define "target3" + + target3.vm.synced_folder ".", "/vagrant" + + + # Disable automatic box update checking. If you disable this, then + # boxes will only be checked for updates when the user runs + # `vagrant box outdated`. This is not recommended. + # config.vm.box_check_update = false + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine. In the example below, + # accessing "localhost:8080" will access port 80 on the guest machine. + # NOTE: This will enable public access to the opened port + # config.vm.network "forwarded_port", guest: 80, host: 8080 + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine and only allow access + # via 127.0.0.1 to disable public access + # config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1" + + # Create a private network, which allows host-only access to the machine + # using a specific IP. + # config.vm.network "private_network", ip: "192.168.33.10" + # target3.vm.network :private_network, ip: '192.168.178.163' + + # Create a public network, which generally matched to bridged network. + # Bridged networks make the machine appear as another physical device on + # your network. + target3.vm.network "public_network", bridge: "enp4s0" + + # Share an additional folder to the guest VM. The first argument is + # the path on the host to the actual folder. The second argument is + # the path on the guest to mount the folder. And the optional third + # argument is a set of non-required options. + # config.vm.synced_folder "../data", "/vagrant_data" + + # Provider-specific configuration so you can fine-tune various + # backing providers for Vagrant. These expose provider-specific options. + # Example for VirtualBox: + # + # config.vm.provider "virtualbox" do |vb| + # # Display the VirtualBox GUI when booting the machine + # vb.gui = true + # + # # Customize the amount of memory on the VM: + # vb.memory = "1024" + # end + # + # View the documentation for the provider you are using for more + # information on available options. + + # Enable provisioning with a shell script. Additional provisioners such as + # Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the + # documentation for more information about their specific syntax and use. + target3.vm.provision "shell", inline: <<-SHELL + ls /vagrant + cd /vagrant/target3 + chmod +x bootstrap.sh + ./bootstrap.sh + + # Install implant + chmod +x hackme.sh + ./hackme.sh + + SHELL + end + +##################################################### + + config.vm.define "attacker" do |attacker| + # The most common configuration options are documented and commented below. + # For a complete reference, please see the online documentation at + # https://docs.vagrantup.com. + + # Every Vagrant development environment requires a box. You can search for + # boxes at https://vagrantcloud.com/search. + + # https://app.vagrantup.com/kalilinux/boxes/rolling + attacker.vm.box = "kalilinux/rolling" + # config.vm.box_version = "2020.3.0" + #config.vm.base_mac = "080027BB1476" + attacker.vm.hostname = "attacker" + + + # Disable automatic box update checking. If you disable this, then + # boxes will only be checked for updates when the user runs + # `vagrant box outdated`. This is not recommended. + # config.vm.box_check_update = false + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine. In the example below, + # accessing "localhost:8080" will access port 80 on the guest machine. + # NOTE: This will enable public access to the opened port + # config.vm.network "forwarded_port", guest: 80, host: 8080 + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine and only allow access + # via 127.0.0.1 to disable public access + # config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1" + + # Create a private network, which allows host-only access to the machine + # using a specific IP. + # config.vm.network "private_network", ip: "192.168.33.10" + + # Create a public network, which generally matched to bridged network. + # Bridged networks make the machine appear as another physical device on + # your network. + attacker.vm.network "public_network", bridge: "enp4s0" + + # Share an additional folder to the guest VM. The first argument is + # the path on the host to the actual folder. The second argument is + # the path on the guest to mount the folder. And the optional third + # argument is a set of non-required options. + # config.vm.synced_folder "../data", "/vagrant_data" + + # Provider-specific configuration so you can fine-tune various + # backing providers for Vagrant. These expose provider-specific options. + # Example for VirtualBox: + # + #config.vm.provider "virtualbox" do |vb| + # # Display the VirtualBox GUI when booting the machine + # vb.gui = true + # + # # Customize the amount of memory on the VM: + # vb.memory = "2048" + # end + # + # View the documentation for the provider you are using for more + # information on available options. + + # Enable provisioning with a shell script. Additional provisioners such as + # Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the + # documentation for more information about their specific syntax and use. + attacker.vm.provision "shell", inline: <<-SHELL + echo "Attacker1 inline script start" + ls /vagrant + cd /vagrant/attacker1 + chmod +x bootstrap.sh + ./bootstrap.sh + echo "Attacker ready" + + SHELL + end + + +end diff --git a/systems/attacker1/Vagrantfile b/systems/attacker1/Vagrantfile new file mode 100644 index 0000000..c5e3fda --- /dev/null +++ b/systems/attacker1/Vagrantfile @@ -0,0 +1,76 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : + +# All Vagrant configuration is done below. The "2" in Vagrant.configure +# configures the configuration version (we support older styles for +# backwards compatibility). Please don't change it unless you know what +# you're doing. +Vagrant.configure("2") do |config| + # The most common configuration options are documented and commented below. + # For a complete reference, please see the online documentation at + # https://docs.vagrantup.com. + + # Every Vagrant development environment requires a box. You can search for + # boxes at https://vagrantcloud.com/search. + config.vm.box = "kalilinux/rolling" + config.vm.base_mac = "080027BB1476" + config.vm.hostname = "attacker1" + + + # Disable automatic box update checking. If you disable this, then + # boxes will only be checked for updates when the user runs + # `vagrant box outdated`. This is not recommended. + # config.vm.box_check_update = false + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine. In the example below, + # accessing "localhost:8080" will access port 80 on the guest machine. + # NOTE: This will enable public access to the opened port + # config.vm.network "forwarded_port", guest: 80, host: 8080 + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine and only allow access + # via 127.0.0.1 to disable public access + # config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1" + + # Create a private network, which allows host-only access to the machine + # using a specific IP. + # config.vm.network "private_network", ip: "192.168.33.10" + + # Create a public network, which generally matched to bridged network. + # Bridged networks make the machine appear as another physical device on + # your network. + config.vm.network "public_network", bridge: "enp4s0" + + # Share an additional folder to the guest VM. The first argument is + # the path on the host to the actual folder. The second argument is + # the path on the guest to mount the folder. And the optional third + # argument is a set of non-required options. + # config.vm.synced_folder "../data", "/vagrant_data" + + # Provider-specific configuration so you can fine-tune various + # backing providers for Vagrant. These expose provider-specific options. + # Example for VirtualBox: + # + # config.vm.provider "virtualbox" do |vb| + # # Display the VirtualBox GUI when booting the machine + # vb.gui = true + # + # # Customize the amount of memory on the VM: + # vb.memory = "1024" + # end + # + # View the documentation for the provider you are using for more + # information on available options. + + # Enable provisioning with a shell script. Additional provisioners such as + # Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the + # documentation for more information about their specific syntax and use. + config.vm.provision "shell", inline: <<-SHELL + ls /vagrant + cd /vagrant + chmod +x bootstrap.sh + ./bootstrap.sh + + SHELL +end \ No newline at end of file diff --git a/tests/data/basic.yaml b/tests/data/basic.yaml new file mode 100644 index 0000000..8607465 --- /dev/null +++ b/tests/data/basic.yaml @@ -0,0 +1,149 @@ + +### +# Caldera configuration +caldera: + ### + # API key for caldera. See caldera configuration. Default is ADMIN123 + apikey: ADMIN123 + +### +# Attacks configuration +attackers: + ### + # Configuration for the first attacker. One should normally be enough + attacker: + + ### + # Defining VM controller settings for this machine + vm_controller: + ### + # Type of the VM controller, Options are "vagrant" + type: vagrant + ### + # # path where the vagrantfile is in + vagrantfilepath: systems + + ### + # Name of machine in Vagrantfile + vm_name: attacker + + ### + # machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path + # and will be mounted internally as /vagrant/ + # If machinepoath is not set AttackX will try "vm_name" + machinepath: attacker1 + + ### + # OS of the VM guest. Options are so far "windows", "linux" + os: linux + + ### + # Do not destroy/create the machine: Set this to "yes". + use_existing_machine: yes + +### +# List of targets +targets: + ### + # Specific target + target1: + vm_controller: + type: vagrant + vagrantfilepath: systems + + vm_name: target1 + os: linux + ### + # Targets need a unique PAW name for caldera + paw: target1 + ### + # Targets need to be in a group for caldera + group: red + + machinepath: target1 + # Do not destroy/create the machine: Set this to "yes". + use_existing_machine: yes + + target2: + #root: systems/target1 + vm_controller: + type: vagrant + vagrantfilepath: systems + + vm_name: target2 + os: windows + paw: target2w + group: red + + machinepath: target2w + + # Do not destroy/create the machine: Set this to "yes". + use_existing_machine: yes + ### + # Optional setting to activate force when halting the machine. Windows guests sometime get stuck + halt_needs_force: yes + + ### + # If SSH without vagrant support is used (Windows !) we need a user name (uppercase) + ssh_user: ATTACKX + + ### + # For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required. + ssh_keyfile: id_rsa.3 + +### +# General attack config +attacks: + ### + # configure the seconds the system idles between the attacks. Makes it slower. But attack and defense logs will be simpler to match + nap_time: 5 + +### +# A list of caldera attacks to run against the targets. +caldera_attacks: + ### + # Linux specific attacks. A list of caldera ability IDs + linux: + - "bd527b63-9f9e-46e0-9816-b8434d2b8989" + ### + # Windows specific attacks. A list of caldera ability IDs + windows: + - "bd527b63-9f9e-46e0-9816-b8434d2b8989" + +### +# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra" +kali_attacks: + ### + # Linux specific attacks, a list + linux: + - hydra + ### + # Windows specific attacks, a list + windows: + - hydra + +### +# Configuration for the kali attack tools +kali_conf: + ### + # Hydra configuration + hydra: + ### + # A list of protocols to brute force against. Supported: "ssh" + protocols: + - ssh + #- ftp + #- ftps + ### + # A file containing potential user names + userfile: users.txt + ### + # A file containing potential passwords + pwdfile: passwords.txt + +### +# Settings for the results being harvested +results: + ### + # The directory the loot will be in + loot_dir: loot \ No newline at end of file diff --git a/tests/data/nap_time_missing.yaml b/tests/data/nap_time_missing.yaml new file mode 100644 index 0000000..718aada --- /dev/null +++ b/tests/data/nap_time_missing.yaml @@ -0,0 +1,142 @@ + +### +# Caldera configuration +caldera: + ### + # API key for caldera. See caldera configuration. Default is ADMIN123 + apikey: ADMIN123 + +### +# Attacks configuration +attackers: + ### + # Configuration for the first attacker. One should normally be enough + attacker: + + ### + # Defining VM controller settings for this machine + vm_controller: + ### + # Type of the VM controller, Options are "vagrant" + type: vagrant + ### + # # path where the vagrantfile is in + vagrantfilepath: systems + + ### + # Name of machine in Vagrantfile + vm_name: attacker + + ### + # machinepath is a path where the machine specific files and logs are stored. Relative to the Vagrantfile path + # and will be mounted internally as /vagrant/ + # If machinepoath is not set PurpleDome will try "vm_name" + machinepath: attacker1 + + ### + # OS of the VM guest. Options are so far "windows", "linux" + os: linux + + ### + # Do not destroy/create the machine: Set this to "yes". + use_existing_machine: yes + +### +# List of targets +targets: + ### + # Specific target + target1: + vm_controller: + type: vagrant + vagrantfilepath: systems + + vm_name: target1 + os: linux + ### + # Targets need a unique PAW name for caldera + paw: target1 + ### + # Targets need to be in a group for caldera + group: red + + machinepath: target1 + # Do not destroy/create the machine: Set this to "yes". + use_existing_machine: yes + + target2: + #root: systems/target1 + vm_controller: + type: vagrant + vagrantfilepath: systems + + vm_name: target2 + os: windows + paw: target2w + group: red + + machinepath: target2w + + # Do not destroy/create the machine: Set this to "yes". + use_existing_machine: yes + ### + # Optional setting to activate force when halting the machine. Windows guests sometime get stuck + halt_needs_force: yes + + ### + # If SSH without vagrant support is used (Windows !) we need a user name (uppercase) + ssh_user: PURPLEDOME + + ### + # For non-vagrant ssh connections a ssh keyfile stored in the machinepath is required. + ssh_keyfile: id_rsa.3 + +### +# A list of caldera attacks to run against the targets. +caldera_attacks: + ### + # Linux specific attacks. A list of caldera ability IDs + linux: + - "bd527b63-9f9e-46e0-9816-b8434d2b8989" + ### + # Windows specific attacks. A list of caldera ability IDs + windows: + - "bd527b63-9f9e-46e0-9816-b8434d2b8989" + +### +# Kali tool based attacks. Will result in kali commandline tools to be called. Currently supported are: "hydra" +kali_attacks: + ### + # Linux specific attacks, a list + linux: + - hydra + ### + # Windows specific attacks, a list + windows: + - hydra + +### +# Configuration for the kali attack tools +kali_conf: + ### + # Hydra configuration + hydra: + ### + # A list of protocols to brute force against. Supported: "ssh" + protocols: + - ssh + #- ftp + #- ftps + ### + # A file containing potential user names + userfile: users.txt + ### + # A file containing potential passwords + pwdfile: passwords.txt + +### +# Settings for the results being harvested +results: + ### + # The directory the loot will be in + loot_dir: loot diff --git a/tests/test_attack_log.py b/tests/test_attack_log.py new file mode 100644 index 0000000..e37a1e6 --- /dev/null +++ b/tests/test_attack_log.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 + +# Testing the attack log class + +import unittest +from app.attack_log import AttackLog +# from app.exceptions import ConfigurationError + +# https://docs.python.org/3/library/unittest.html + + +class TestMachineConfig(unittest.TestCase): + """ Test machine specific config """ + + def test_init(self): + """ The init is empty """ + al = AttackLog() + self.assertIsNotNone(al) + self.assertEqual(al.get_dict(), []) + + def test_caldera_attack_start(self): + """ Starting a caldera attack """ + al = AttackLog() + source = "asource" + paw = "apaw" + group = "agroup" + ability_id = "aability_id" + ttp = "1234" + name = "aname" + description = "adescription" + al.start_caldera_attack(source=source, + paw=paw, + group=group, + ability_id=ability_id, + ttp=ttp, + name=name, + description=description + ) + data = al.get_dict() + self.assertEqual(data[0]["event"], "start") + self.assertEqual(data[0]["type"], "attack") + self.assertEqual(data[0]["sub-type"], "caldera") + self.assertEqual(data[0]["source"], source) + self.assertEqual(data[0]["target_paw"], paw) + self.assertEqual(data[0]["target_group"], group) + self.assertEqual(data[0]["ability_id"], ability_id) + self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data[0]["name"], name) + self.assertEqual(data[0]["description"], description) + + def test_caldera_attack_stop(self): + """ Stopping a caldera attack """ + al = AttackLog() + source = "asource" + paw = "apaw" + group = "agroup" + ability_id = "aability_id" + ttp = "1234" + name = "aname" + description = "adescription" + al.stop_caldera_attack(source=source, + paw=paw, + group=group, + ability_id=ability_id, + ttp=ttp, + name=name, + description=description + ) + data = al.get_dict() + self.assertEqual(data[0]["event"], "stop") + self.assertEqual(data[0]["type"], "attack") + self.assertEqual(data[0]["sub-type"], "caldera") + self.assertEqual(data[0]["source"], source) + self.assertEqual(data[0]["target_paw"], paw) + self.assertEqual(data[0]["target_group"], group) + self.assertEqual(data[0]["ability_id"], ability_id) + self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + self.assertEqual(data[0]["name"], name) + self.assertEqual(data[0]["description"], description) + + def test_kali_attack_start(self): + """ Starting a kali attack """ + al = AttackLog() + source = "asource" + target = "a target" + ttp = "1234" + attack_name = "a name" + al.start_kali_attack(source=source, + target=target, + attack_name=attack_name, + ttp=ttp, + ) + data = al.get_dict() + self.assertEqual(data[0]["event"], "start") + self.assertEqual(data[0]["type"], "attack") + self.assertEqual(data[0]["sub-type"], "kali") + self.assertEqual(data[0]["source"], source) + self.assertEqual(data[0]["target"], target) + self.assertEqual(data[0]["kali_name"], attack_name) + self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) + + def test_kali_attack_stop(self): + """ Stopping a kali attack """ + al = AttackLog() + source = "asource" + target = "a target" + ttp = "1234" + attack_name = "a name" + al.stop_kali_attack(source=source, + target=target, + attack_name=attack_name, + ttp=ttp, + ) + data = al.get_dict() + self.assertEqual(data[0]["event"], "stop") + self.assertEqual(data[0]["type"], "attack") + self.assertEqual(data[0]["sub-type"], "kali") + self.assertEqual(data[0]["source"], source) + self.assertEqual(data[0]["target"], target) + self.assertEqual(data[0]["kali_name"], attack_name) + self.assertEqual(data[0]["hunting_tag"], "MITRE_" + ttp) diff --git a/tests/test_calderacontrol.py b/tests/test_calderacontrol.py new file mode 100644 index 0000000..957b7bc --- /dev/null +++ b/tests/test_calderacontrol.py @@ -0,0 +1,452 @@ +import unittest +from unittest.mock import patch +from app.calderacontrol import CalderaControl +from simplejson.errors import JSONDecodeError +from app.exceptions import CalderaError + +# https://docs.python.org/3/library/unittest.html + + +class TestExample(unittest.TestCase): + + def setUp(self) -> None: + self.cc = CalderaControl("https://localhost", apikey="123") + + def tearDown(self) -> None: + pass + + # List links sends the right commands and post + def test_list_links(self): + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.list_links("asd") + mock_method.assert_called_once_with({"index": "link", "op_id": "asd"}) + + # List links gets an Exception and does not handle it (as expected) + def test_list_links_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.list_links("asd") + + # list results sends the right commands and post + def test_list_results(self): + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.list_results("asd") + mock_method.assert_called_once_with({"index": "result", "link_id": "asd"}) + + # List results gets an Exception and does not handle it (as expected) + def test_list_results_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.list_results("asd") + + # list_operations + def test_list_operations(self): + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.list_operations() + mock_method.assert_called_once_with({"index": "operations"}) + + # list operations gets the expected exception + def test_list_operations_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.list_operations() + + # list_abilities + def test_list_abilities(self): + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.list_abilities() + mock_method.assert_called_once_with({"index": "abilities"}) + + # list abilities gets the expected exception + def test_list_abilities_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.list_abilities() + + # list_agents + def test_list_agents(self): + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.list_agents() + mock_method.assert_called_once_with({"index": "agents"}) + + # list agents gets the expected exception + def test_list_agents_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.list_agents() + + # list_adversaries + def test_list_adversaries(self): + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.list_adversaries() + mock_method.assert_called_once_with({"index": "adversaries"}) + + # list adversaries gets the expected exception + def test_list_adversaries_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.list_adversaries() + + # list_objectives + def test_list_objectives(self): + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.list_objectives() + mock_method.assert_called_once_with({"index": "objectives"}) + + # list objectives gets the expected exception + def test_list_objectives_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.list_objectives() + + # Get operation (working) + def test_get_operation(self): + a = {"name": "bar", "bar": "baz"} + with patch.object(self.cc, "list_operations", return_value=[a, {"name": "zigg", "bar": "baz"}]): + op = self.cc.get_operation("bar") + self.assertEqual(op, a) + + # Get operation (no matching name) + def test_get_operation_not_available(self): + a = {"name": "bar", "bar": "baz"} + with patch.object(self.cc, "list_operations", return_value=[a, {"name": "zigg", "bar": "baz"}]): + op = self.cc.get_operation("baaaaar") + self.assertEqual(op, None) + + # get_adversary + def test_get_adversary(self): + a = {"name": "bar", "bar": "baz"} + with patch.object(self.cc, "list_adversaries", return_value=[a, {"name": "zigg", "bar": "baz"}]): + op = self.cc.get_adversary("bar") + self.assertEqual(op, a) + + # get_adversary (no matching name) + def test_get_adversary_not_available(self): + a = {"name": "bar", "bar": "baz"} + with patch.object(self.cc, "list_adversaries", return_value=[a, {"name": "zigg", "bar": "baz"}]): + op = self.cc.get_adversary("baaaar") + self.assertEqual(op, None) + + # get_objective + def test_get_objective(self): + a = {"name": "bar", "bar": "baz"} + with patch.object(self.cc, "list_objectives", return_value=[a, {"name": "zigg", "bar": "baz"}]): + op = self.cc.get_objective("bar") + self.assertEqual(op, a) + + # get_objective (no matching name) + def test_get_objective_not_available(self): + a = {"name": "bar", "bar": "baz"} + with patch.object(self.cc, "list_objectives", return_value=[a, {"name": "zigg", "bar": "baz"}]): + op = self.cc.get_objective("baaaar") + self.assertEqual(op, None) + + # get_ability + def test_get_ability(self): + a = {"ability_id": "bar", "bar": "baz"} + with patch.object(self.cc, "list_abilities", return_value=[a, {"ability_id": "zigg", "bar": "baz"}]): + op = self.cc.get_ability("bar") + self.assertEqual(op, [a]) + + # get_ability (no matching name) + def test_get_ability_not_available(self): + a = {"ability_id": "bar", "bar": "baz"} + with patch.object(self.cc, "list_abilities", return_value=[a, {"ability_id": "zigg", "bar": "baz"}]): + op = self.cc.get_ability("baaaar") + self.assertEqual(op, []) + + # get_operation_by_id + def test_get_operation_by_id(self): + opid = "FooBar" + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.get_operation_by_id(opid) + mock_method.assert_called_once_with({"index": "operations", "id": opid}) + + # get_operation_by_id gets the expected exception + def test_get_operation_by_id_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.get_result_by_id("FooBar") + + # get_result_by_id + def test_get_result_by_id(self): + opid = "FooBar" + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.get_result_by_id(opid) + mock_method.assert_called_once_with({"index": "result", "link_id": opid}) + + # get_result_by_id gets the expected exception + def test_get_result_by_id_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.get_result_by_id("FooBar") + + # get_linkid + def test_get_linkid(self): + paw = "FooBar" + ability_id = "AID" + + alink = {"paw": paw, + "ability": {"ability_id": ability_id}, + "id": "Getme"} + + op = [{"chain": [alink]}] + + with patch.object(self.cc, "get_operation_by_id", return_value=op): + res = self.cc.get_linkid("Foo", paw, ability_id) + self.assertEqual(res, "Getme") + + # get missing link id + def test_get_linkid_missing(self): + paw = "FooBar" + ability_id = "AID" + + alink = {"paw": paw, + "ability": {"ability_id": ability_id}, + "id": "Getme"} + + op = [{"chain": [alink]}] + + with patch.object(self.cc, "get_operation_by_id", return_value=op): + res = self.cc.get_linkid("Foo", "Bar", ability_id) + self.assertEqual(res, None) + + # view_operation_report + def test_view_operation_report(self): + opid = "FooBar" + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.view_operation_report(opid) + mock_method.assert_called_once_with({"index": "operation_report", "op_id": opid, "agent_output": 1}) + + # get_result_by_id gets the expected exception + def test_view_operation_report_with_exception(self): + with self.assertRaises(JSONDecodeError): + with patch.object(self.cc, "__contact_server__", side_effect=JSONDecodeError("foo", "bar", 2)): + self.cc.view_operation_report("FooBar") + + # view_operation_output + def test_view_operation_output(self): + opid = "OPID_123" + paw = "PAW_123" + ability_id = "AID_123" + or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}} + with patch.object(self.cc, "view_operation_report", return_value=or_ret): + res = self.cc.view_operation_output(opid, paw, ability_id) + self.assertEqual(res, "red") + + # view_operation_output paw missing + def test_view_operation_output_paw_missing(self): + opid = "OPID_123" + paw = "PAW_123" + ability_id = "AID_123" + or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}} + with self.assertRaises(CalderaError): + with patch.object(self.cc, "view_operation_report", return_value=or_ret): + self.cc.view_operation_output(opid, "missing", ability_id) + + # view_operation_output ability_id missing + def test_view_operation_output_ability_id_missing(self): + opid = "OPID_123" + paw = "PAW_123" + ability_id = "AID_123" + or_ret = {"steps": {paw: {"steps": [{"ability_id": ability_id, "output": "red"}]}}} + with patch.object(self.cc, "view_operation_report", return_value=or_ret): + res = self.cc.view_operation_output(opid, paw, "missing") + self.assertEqual(res, None) + + # add_operation + def test_add_operation(self): + name = "test_name" + state = "test_state" + group = "test_group" + advid = "test_id" + + exp = {"index": "operations", + "name": name, + "state": state, + "autonomous": 1, + 'obfuscator': 'plain-text', + 'auto_close': '1', + 'jitter': '4/8', + 'source': 'Alice Filters', + 'visibility': '50', + "group": group, + "planner": "atomic", + "adversary_id": advid, + } + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.add_operation(name, advid, group, state) + mock_method.assert_called_once_with(exp, method="put") + + # add_operation defaults + def test_add_operation_defaults(self): + name = "test_name" + advid = "test_id" + + exp = {"index": "operations", + "name": name, + "state": "running", # default + "autonomous": 1, + 'obfuscator': 'plain-text', + 'auto_close': '1', + 'jitter': '4/8', + 'source': 'Alice Filters', + 'visibility': '50', + "group": "red", # default + "planner": "atomic", + "adversary_id": advid, + } + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.add_operation(name, advid) + mock_method.assert_called_once_with(exp, method="put") + + # add_adversary + def test_add_adversary(self): + name = "test_name" + ability = "test_ability" + description = "test_descritption" + + exp = {"index": "adversaries", + "name": name, + "description": description, + "atomic_ordering": [{"id": ability}], + # + "objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective + } + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.add_adversary(name, ability, description) + mock_method.assert_called_once_with(exp, method="put") + + def test_add_adversary_default(self): + name = "test_name" + ability = "test_ability" + + exp = {"index": "adversaries", + "name": name, + "description": "created automatically", + "atomic_ordering": [{"id": ability}], + # + "objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective + } + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.add_adversary(name, ability) + mock_method.assert_called_once_with(exp, method="put") + + # execute_ability + def test_execute_ability(self): + paw = "test_paw" + ability_id = "test_ability" + obfuscator = "plain-text" + + exp = {"paw": paw, + "ability_id": ability_id, + "obfuscator": obfuscator} + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.execute_ability(paw, ability_id, obfuscator) + mock_method.assert_called_once_with(exp, rest_path="plugin/access/exploit_ex") + + def test_execute_ability_default(self): + paw = "test_paw" + ability_id = "test_ability" + + exp = {"paw": paw, + "ability_id": ability_id, + "obfuscator": "plain-text"} + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.execute_ability(paw, ability_id) + mock_method.assert_called_once_with(exp, rest_path="plugin/access/exploit_ex") + + # execute_operation + def test_execute_operation(self): + operation_id = "test_opid" + state = "paused" + + exp = {"index": "operation", + "op_id": operation_id, + "state": state} + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.execute_operation(operation_id, state) + mock_method.assert_called_once_with(exp) + + # not supported state + def test_execute_operation_not_supported(self): + operation_id = "test_opid" + state = "not supported" + + with self.assertRaises(ValueError): + with patch.object(self.cc, "__contact_server__", return_value=None): + self.cc.execute_operation(operation_id, state) + + def test_execute_operation_default(self): + operation_id = "test_opid" + + exp = {"index": "operation", + "op_id": operation_id, + "state": "running" # default + } + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.execute_operation(operation_id) + mock_method.assert_called_once_with(exp) + + # delete_operation + def test_delete_operation(self): + opid = "test_opid" + + exp = {"index": "operations", + "id": opid} + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.delete_operation(opid) + mock_method.assert_called_once_with(exp, method="delete") + + # delete_adversary + def test_delete_adversary(self): + adid = "test_adid" + + exp = {"index": "adversaries", + "adversary_id": [{"adversary_id": adid}]} + with patch.object(self.cc, "__contact_server__", return_value=None) as mock_method: + self.cc.delete_adversary(adid) + mock_method.assert_called_once_with(exp, method="delete") + + # is_operation_finished + def test_is_operation_finished_true(self): + opdata = [{"state": "finished", "chain": [{"status": 0}]}] + opid = "does not matter" + + with patch.object(self.cc, "get_operation_by_id", return_value=opdata): + res = self.cc.is_operation_finished(opid) + self.assertEqual(res, True) + + def test_is_operation_finished_false(self): + opdata = [{"state": "running", "chain": [{"status": 1}]}] + opid = "does not matter" + + with patch.object(self.cc, "get_operation_by_id", return_value=opdata): + res = self.cc.is_operation_finished(opid) + self.assertEqual(res, False) + + def test_is_operation_finished_exception(self): + opdata = [{"chain": [{"statusa": 1}]}] + opid = "does not matter" + + with self.assertRaises(CalderaError): + with patch.object(self.cc, "get_operation_by_id", return_value=opdata): + self.cc.is_operation_finished(opid) + + def test_is_operation_finished_exception2(self): + opdata = [] + opid = "does not matter" + + with self.assertRaises(CalderaError): + with patch.object(self.cc, "get_operation_by_id", return_value=opdata): + self.cc.is_operation_finished(opid) + + # TODO attack (lots of complexity !) + + # TODO test fetch_client + + # TODO test __contact_server__ + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..96e4b92 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,536 @@ +#!/usr/bin/env python3 + +# Testing the central configuration loader + +import unittest +# import os +from app.config import ExperimentConfig, MachineConfig +from app.exceptions import ConfigurationError + +# https://docs.python.org/3/library/unittest.html + + +class TestMachineConfig(unittest.TestCase): + """ Test machine specific config """ + + def test_empty_init(self): + """ The init is empty """ + + with self.assertRaises(ConfigurationError): + MachineConfig(None) + + def test_basic_init(self): + """ The init is basic and working """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.raw_config["root"], "systems/attacker1") + self.assertEqual(mc.raw_config["vm_controller"]["type"], "vagrant") + + def test_missing_vm_name(self): + """ The vm name is missing """ + + with self.assertRaises(ConfigurationError): + MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }}) + + def test_use_existing_machine_is_true(self): + """ Testing use_existing:machine setting """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": True}) + self.assertEqual(mc.use_existing_machine(), True) + + def test_use_existing_machine_is_false(self): + """ Testing use_existing:machine setting """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.use_existing_machine(), False) + + def test_use_existing_machine_is_default(self): + """ Testing use_existing:machine setting """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.use_existing_machine(), False) + + def test_windows_is_valid_os(self): + """ Testing if windows is valid os """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "windows", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.os(), "windows") + + def test_windows_is_valid_os_casefix(self): + """ Testing if windows is valid os - using lowercase fix""" + mc = MachineConfig({"root": "systems/attacker1", + "os": "WINDOWS", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.os(), "windows") + + def test_linux_is_valid_os(self): + """ Testing if windows is valid os """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.os(), "linux") + + def test_missing_os(self): + """ The os is missing """ + + with self.assertRaises(ConfigurationError): + MachineConfig({"root": "systems/attacker1", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + + def test_wrong_os(self): + """ The os is wrong """ + + with self.assertRaises(ConfigurationError): + MachineConfig({"root": "systems/attacker1", + "os": "BROKEN", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + + def test_vagrant_is_valid_vmcontroller(self): + """ Testing if vagrant is valid vmcontroller """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.vmcontroller(), "vagrant") + + def test_vagrant_is_valid_vmcontroller_casefix(self): + """ Testing if vagrant is valid vmcontroller case fixxed""" + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "VAGRANT", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.vmcontroller(), "vagrant") + + def test_invalid_vmcontroller(self): + """ Testing if vagrant is valid vmcontroller case fixxed""" + with self.assertRaises(ConfigurationError): + MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "BROKEN", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + + def test_missing_vmcontroller_2(self): + """ Testing if vagrant is valid vmcontroller case fixxed""" + with self.assertRaises(ConfigurationError): + MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_name": "target1"}) + + def test_vagrant_is_valid_vmip(self): + """ Testing if vagrant is valid ip/url """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "ip": "kali", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.vm_ip(), "kali") + + def test_missing_vmip(self): + """ Testing if missing vm ip is handled""" + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1"}) + self.assertEqual(mc.vm_ip(), None) + + def test_machinepath(self): + """ Testing machinepath setting """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False, + "machinepath": "foo"}) + self.assertEqual(mc.machinepath(), "foo") + + def test_machinepath_fallback(self): + """ Testing machinepath setting fallback to vmname""" + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.machinepath(), "target1") + + def test_paw(self): + """ Testing for caldera paw """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "paw": "Bar", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.caldera_paw(), "Bar") + + def test_paw_fallback(self): + """ Testing for caldera paw fallback """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.caldera_paw(), None) + + def test_group(self): + """ Testing for caldera group """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "group": "Bar", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.caldera_group(), "Bar") + + def test_group_fallback(self): + """ Testing for caldera group fallback """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.caldera_group(), None) + + def test_ssh_keyfile(self): + """ Testing keyfile config """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "ssh_keyfile": "Bar", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.ssh_keyfile(), "Bar") + + def test_ssh_keyfile_default(self): + """ Testing keyfile config default """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.ssh_keyfile(), None) + + def test_ssh_user(self): + """ Testing ssh user config """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "ssh_user": "Bob", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.ssh_user(), "Bob") + + def test_ssh_user_default(self): + """ Testing ssh user default config """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.ssh_user(), "vagrant") + + def test_halt_needs_force_default(self): + """ Testing 'halt needs force' default config """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.halt_needs_force(), False) + + def test_halt_needs_force(self): + """ Testing 'halt needs force' config """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.halt_needs_force(), True) + + def test_vagrantfilepath(self): + """ Testing vagrantfilepath config """ + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.vagrantfilepath(), "systems") + + def test_vagrantfilepath_missing(self): + """ Testing missing vagrantfilepath config """ + + with self.assertRaises(ConfigurationError): + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target1", + "use_existing_machine": False}) + mc.vagrantfilepath() + + def test_sensors_empty(self): + """ Testing empty sensor config """ + + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.sensors(), []) + + def test_sensors_set(self): + """ Testing empty sensor config """ + + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target1", + "use_existing_machine": False, + "sensors": ["linux_idp", "test_sensor"]}) + self.assertEqual(mc.sensors(), ["linux_idp", "test_sensor"]) + + def test_vulnerabilities_empty(self): + """ Testing empty vulnerabilities config """ + + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target1", + "use_existing_machine": False}) + self.assertEqual(mc.vulnerabilities(), []) + + def test_vulnerabilities_set(self): + """ Testing empty vulnerabilities config """ + + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target1", + "use_existing_machine": False, + "vulnerabilities": ["PEBKAC", "USER"]}) + self.assertEqual(mc.vulnerabilities(), ["PEBKAC", "USER"]) + + def test_active_not_set(self): + """ machine active not set """ + + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target1", + "use_existing_machine": False, + "sensors": ["linux_idp", "test_sensor"]}) + self.assertEqual(mc.is_active(), True) + + def test_active_is_false(self): + """ machine active is set to false """ + + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target1", + "use_existing_machine": False, + "active": False, + "sensors": ["linux_idp", "test_sensor"]}) + self.assertEqual(mc.is_active(), False) + + def test_active_is_true(self): + """ machine active is set to true """ + + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "halt_needs_force": True, + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target1", + "use_existing_machine": False, + "active": True, + "sensors": ["linux_idp", "test_sensor"]}) + self.assertEqual(mc.is_active(), True) + + +class TestExperimentConfig(unittest.TestCase): + def test_missing_config_file(self): + """ Missing config file """ + with self.assertRaises(FileNotFoundError): + ExperimentConfig("tests/data/missing.yaml") + + def test_basic_loading(self): + """ Existing, basic config file, testing the values are loaded properly """ + + ex = ExperimentConfig("tests/data/basic.yaml") + self.assertEqual(ex.raw_config["caldera"]["apikey"], "ADMIN123") + self.assertEqual(ex.caldera_apikey(), "ADMIN123") + + def test_basic_loading_targets_read(self): + """ Targets in config: can be found """ + ex = ExperimentConfig("tests/data/basic.yaml") + self.assertEqual(len(ex._targets), 2) + self.assertEqual(ex._targets[0].raw_config["vm_name"], "target1") + self.assertEqual(ex._targets[0].vmname(), "target1") + self.assertEqual(ex.targets()[0].vmname(), "target1") + + def test_basic_loading_attacker_read(self): + """ Attackers in config: can be found """ + ex = ExperimentConfig("tests/data/basic.yaml") + self.assertEqual(len(ex._targets), 2) + self.assertEqual(ex._attackers[0].raw_config["vm_name"], "attacker") + self.assertEqual(ex._attackers[0].vmname(), "attacker") + self.assertEqual(ex.attackers()[0].vmname(), "attacker") + self.assertEqual(ex.attacker(0).vmname(), "attacker") + + def test_missing_kali_config(self): + """ Getting kali config for a specific attack. Attack missing """ + + ex = ExperimentConfig("tests/data/basic.yaml") + with self.assertRaises(ConfigurationError): + ex.kali_conf("BOOM") + + def test_working_kali_config(self): + """ Getting kali config for a specific attack """ + + ex = ExperimentConfig("tests/data/basic.yaml") + + data = ex.kali_conf("hydra") + self.assertEqual(data["userfile"], "users.txt") + + def test_nap_time(self): + """ nap time is set """ + + ex = ExperimentConfig("tests/data/basic.yaml") + + self.assertEqual(ex.get_nap_time(), 5) + + def test_nap_time_not_set(self): + """ nap time is not set """ + + ex = ExperimentConfig("tests/data/nap_time_missing.yaml") + + self.assertEqual(ex.get_nap_time(), 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_experimentcontrol.py b/tests/test_experimentcontrol.py new file mode 100644 index 0000000..c770f13 --- /dev/null +++ b/tests/test_experimentcontrol.py @@ -0,0 +1,30 @@ +import unittest + +# https://docs.python.org/3/library/unittest.html + + +class TestExample2(unittest.TestCase): + + def setUp(self) -> None: + pass + + def tearDown(self) -> None: + pass + + def test_upper(self): + self.assertEqual('foo'.upper(), 'FOO') + + def test_isupper(self): + self.assertTrue('FOO'.isupper()) + self.assertFalse('Foo'.isupper()) + + def test_split(self): + s = 'hello world' + self.assertEqual(s.split(), ['hello', 'world']) + # check that s.split fails when the separator is not a string + with self.assertRaises(TypeError): + s.split(2) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_machinecontrol.py b/tests/test_machinecontrol.py new file mode 100644 index 0000000..cbd37b7 --- /dev/null +++ b/tests/test_machinecontrol.py @@ -0,0 +1,259 @@ +import unittest +import os +from app.machinecontrol import Machine +from app.exceptions import ConfigurationError +from app.config import MachineConfig +from unittest.mock import patch + +# https://docs.python.org/3/library/unittest.html + + +class TestMachineControl(unittest.TestCase): + + def test_get_os_linux_machine(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3"}) + self.assertEqual(m.get_os(), "linux") + + def test_get_os_linux_machine_with_config_class(self): + mc = MachineConfig({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3"}) + m = Machine(mc) + self.assertEqual(m.get_os(), "linux") + + def test_get_os_missing(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3" + }) + + def test_get_os_not_supported(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "os": "nintendo_switch", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3"}) + + def test_get_paw_good(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "paw": "testme", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3"}) + self.assertEqual(m.get_paw(), "testme") + + def test_get_paw_missing(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3" + }) + self.assertEqual(m.get_paw(), None) + + def test_get_group_good(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "group": "testme", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3"}) + self.assertEqual(m.get_group(), "testme") + + def test_get_group_missing(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3" + }) + self.assertEqual(m.get_group(), None) + + def test_vagrantfilepath_missing(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + }, + "vm_name": "target3" + }) + + def test_vagrantfile_missing(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "non_existing", + }, + "vm_name": "target3" + }) + + def test_vagrantfile_existing(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3" + }) + self.assertIsNotNone(m) + + def test_name_missing(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + }) + + # test: auto generated, dir missing + def test_auto_generated_machinepath_with_path_missing(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "missing" + }) + + # test manual config, dir missing + def test_configured_machinepath_with_path_missing(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3", + "machinepath": "missing" + }) + + # test auto generated, dir there (external/internal dirs must work !) + def test_auto_generated_machinepath_with_good_config(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3" + }) + vagrantfilepath = os.path.abspath("systems") + ext = os.path.join(vagrantfilepath, "target3") + internal = os.path.join("/vagrant/", "target3") + + self.assertEqual(m.abs_machinepath_external, ext) + self.assertEqual(m.abs_machinepath_internal, internal) + + # test: manual config, dir there (external/internal dirs must work !) + def test_configured_machinepath_with_good_config(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "missing", + "machinepath": "target3" + }) + vagrantfilepath = os.path.abspath("systems") + ext = os.path.join(vagrantfilepath, "target3") + internal = os.path.join("/vagrant/", "target3") + + self.assertEqual(m.abs_machinepath_external, ext) + self.assertEqual(m.abs_machinepath_internal, internal) + + # vm_controller missing + def test_configured_vm_controller_missing(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "os": "linux", + "vm_name": "missing", + "machinepath": "target3" + }) + + # vm_controller wrong + def test_configured_vm_controller_wrong_type(self): + with self.assertRaises(ConfigurationError): + Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "wrong_controller", + "vagrantfilepath": "systems", + }, + "vm_name": "missing", + "machinepath": "target3" + }) + + # Create caldera start command and verify it + def test_get_linux_caldera_start_cmd(self): + m = Machine({"root": "systems/attacker1", + "os": "linux", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3", + "group": "testgroup", + "paw": "testpaw"}) + m.set_caldera_server("http://www.test.test") + with patch.object(m.vm_manager, "get_playground", return_value="/vagrant/target3"): + cmd = m.create_start_caldera_client_cmd() + self.assertEqual(cmd.strip(), "nohup /vagrant/target3/caldera_agent.sh start &".strip()) + + # Create caldera start command and verify it (windows) + def test_get_windows_caldera_start_cmd(self): + m = Machine({"root": "systems/attacker1", + "os": "windows", + "vm_controller": { + "type": "vagrant", + "vagrantfilepath": "systems", + }, + "vm_name": "target3", + "group": "testgroup", + "paw": "testpaw", + "machinepath": "target2w"}) + m.set_caldera_server("www.test.test") + cmd = m.create_start_caldera_client_cmd() + self.maxDiff = None + expected = """ +caldera_agent.bat""" + self.assertEqual(cmd.strip(), expected.strip()) + + +if __name__ == '__main__': + unittest.main() diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..18a4122 --- /dev/null +++ b/tox.ini @@ -0,0 +1,53 @@ +# tox (https://tox.readthedocs.io/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. To use it, "pip install tox" +# and then run "tox" from this directory. + +[tox] +envlist = py38 + +[flake8] +# E501 Line length. Ignored here. But still: please do not abuse the freedom +# W291 trailing whitespace +ignore = E501, + W291 + +exclude = + .git, + vm_images, + venv, + vagrant_templates, + vagrantboxes, + .vagrant + systems, + real_machines_to_build_vagrant_boxes_from, + __pycache__, + .idea, + htmlcov, + .tox, + + +max-complexity = 10 + +[testenv] +deps = -rrequirements.txt + flake8 + safety + bandit + pylint + + +commands = + # python -m unittest discover -s tests + coverage run --source=app -m unittest discover -s tests + # Gotta ignore some flake8 warnings, because they are maskerading real issues. + # Ignoring: + # C901 complex code. Reduce complexitiy. But this thing is over-reacting + # E501: line too long. Please: Still keep it short. But 80 chars is just incredibly short nowadays + flake8 --ignore C901,E501 + # Check if dependencies are vulnerable + safety check -r requirements.txt + # Check for common vulnerabilities + bandit -ll -r app/ plugins/ *.py + # Linting + # pylint *.py # currently off. Needs configuration