Fixing first batch of file for stricter mypy

pull/44/head
Thorsten Sick 2 years ago
parent 32d33f015f
commit 7a718f79f3

@ -35,7 +35,7 @@ class AttackLog():
self.datetime_format = "%H:%M:%S.%f"
def __add_to_log__(self, item: dict):
def __add_to_log__(self, item: dict) -> None:
""" internal command to add a item to the log
:param item: data chunk to add
@ -48,17 +48,22 @@ class AttackLog():
return datetime.datetime.now().strftime(self.datetime_format)
def get_caldera_default_name(self, ability_id: str):
def get_caldera_default_name(self, ability_id: str) -> Optional[str]:
""" Returns the default name for this ability based on a db """
# TODO: Add a proper database. At least an external file
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "whoami"}
if ability_id not in data:
return None
return data[ability_id]
def get_caldera_default_description(self, ability_id: str):
def get_caldera_default_description(self, ability_id: str) -> Optional[str]:
""" Returns the default description for this ability based on a db """
# TODO: Add a proper database. At least an external file
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "Obtain user from current session",
"697e8a432031075e47cccba24417013d": "Copy a VBS file to several startup folders",
"f39161b2fa5d692ebe3972e0680a8f97": "Copy a BAT file to several startup folders",
@ -71,9 +76,11 @@ class AttackLog():
return data[ability_id]
def get_caldera_default_tactics(self, ability_id: str, ttp: Optional[str]):
def get_caldera_default_tactics(self, ability_id: str, ttp: Optional[str]) -> Optional[str]:
""" Returns the default tactics for this ability based on a db """
# TODO: Add a proper database. At least an external file
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "System Owner/User Discovery",
"f39161b2fa5d692ebe3972e0680a8f97": "Persistence",
"16e6823c4656f5cd155051f5f1e5d6ad": "Persistence",
@ -98,9 +105,11 @@ class AttackLog():
return None
def get_caldera_default_tactics_id(self, ability_id: str, ttp: Optional[str]):
def get_caldera_default_tactics_id(self, ability_id: str, ttp: Optional[str]) -> Optional[str]:
""" Returns the default name for this ability based on a db """
# TODO: Add a proper database. At least an external file
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": "T1033",
"f39161b2fa5d692ebe3972e0680a8f97": "TA0003",
"16e6823c4656f5cd155051f5f1e5d6ad": "TA0003",
@ -125,9 +134,11 @@ class AttackLog():
return None
def get_caldera_default_situation_description(self, ability_id: str):
def get_caldera_default_situation_description(self, ability_id: str) -> Optional[str]:
""" Returns the default situation description for this ability based on a db """
# TODO: Add a proper database. At least an external file
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": None,
"697e8a432031075e47cccba24417013d": None,
"f39161b2fa5d692ebe3972e0680a8f97": None,
@ -140,8 +151,13 @@ class AttackLog():
return data[ability_id]
def get_caldera_default_countermeasure(self, ability_id: str):
""" Returns the default countermeasure for this ability based on a db """
def get_caldera_default_countermeasure(self, ability_id: str) -> Optional[str]:
""" Returns the default countermeasure for this ability based on a db
:returns: Default countermeasure as string. Or None if not found
"""
# TODO: Add a proper database. At least an external file
data = {"bd527b63-9f9e-46e0-9816-b8434d2b8989": None,
"697e8a432031075e47cccba24417013d": None,
@ -155,7 +171,7 @@ class AttackLog():
return data[ability_id]
def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs):
def start_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: Optional[str] = None, **kwargs: dict) -> str:
""" Mark the start of a caldera attack
:param source: source of the attack. Attack IP
@ -163,6 +179,7 @@ class AttackLog():
:param group: Caldera group of the targets being attacked
:param ability_id: Caldera ability id of the attack
:param ttp: TTP of the attack (as stated by Caldera internal settings)
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -198,7 +215,7 @@ class AttackLog():
# TODO: Add config
# TODO: Add results
def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs):
def stop_caldera_attack(self, source: str, paw: str, group: str, ability_id: str, ttp: str = None, **kwargs: dict) -> None:
""" Mark the end of a caldera attack
:param source: source of the attack. Attack IP
@ -230,12 +247,13 @@ class AttackLog():
}
self.__add_to_log__(data)
def start_file_write(self, source: str, target: str, file_name: str):
def start_file_write(self, source: str, target: str, file_name: str) -> str:
""" Mark the start of a file being written to the target (payload !)
:param source: source of the attack. Attack IP (empty if written from controller)
:param target: Target machine of the attack
:param file_name: Name of the file being written
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -254,7 +272,7 @@ class AttackLog():
self.__add_to_log__(data)
return logid
def stop_file_write(self, source: str, target: str, file_name: str, **kwargs):
def stop_file_write(self, source: str, target: str, file_name: str, **kwargs: dict) -> None:
""" Mark the stop of a file being written to the target (payload !)
:param source: source of the attack. Attack IP (empty if written from controller)
@ -278,12 +296,13 @@ class AttackLog():
self.__add_to_log__(data)
def start_execute_payload(self, source: str, target: str, command: str):
def start_execute_payload(self, source: str, target: str, command: str) -> str:
""" Mark the start of a payload being executed
:param source: source of the attack. Attack IP (empty if written from controller)
:param target: Target machine of the attack
:param command:
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -303,7 +322,7 @@ class AttackLog():
return logid
def stop_execute_payload(self, source: str, target: str, command: str, **kwargs):
def stop_execute_payload(self, source: str, target: str, command: str, **kwargs: dict) -> None:
""" Mark the stop of a payload being executed
:param source: source of the attack. Attack IP (empty if written from controller)
@ -324,13 +343,14 @@ class AttackLog():
}
self.__add_to_log__(data)
def start_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs):
def start_kali_attack(self, source: str, target: str, attack_name: str, ttp: Optional[str] = None, **kwargs: dict) -> str:
""" Mark the start of a Kali based attack
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param attack_name: Name of the attack. From plugin
:param ttp: TTP of the attack. From plugin
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -359,11 +379,7 @@ class AttackLog():
return logid
# TODO: Add parameter
# TODO: Add config
# TODO: Add results
def stop_kali_attack(self, source: str, target: str, attack_name: str, ttp: str = None, **kwargs):
def stop_kali_attack(self, source: str, target: str, attack_name: str, ttp: Optional[str] = None, **kwargs: dict) -> None:
""" Mark the end of a Kali based attack
:param source: source of the attack. Attack IP
@ -385,11 +401,12 @@ class AttackLog():
}
self.__add_to_log__(data)
def start_narration(self, text: str):
def start_narration(self, text: str) -> str:
""" Add some user defined narration. Can be used in plugins to describe the situation before and after the attack, ...
At the moment there is no stop narration command. I do not think we need one. But I want to stick to the structure
:param text: Text of the narration
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -404,10 +421,11 @@ class AttackLog():
self.__add_to_log__(data)
return logid
def start_attack_step(self, text: str):
def start_attack_step(self, text: str) -> str:
""" Mark the start of an attack step (several attacks in a chunk)
:param text: description of the attack step being started
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -425,7 +443,7 @@ class AttackLog():
return logid
def stop_attack_step(self, text: str, **kwargs):
def stop_attack_step(self, text: str, **kwargs: dict) -> None:
""" Mark the end of an attack step (several attacks in a chunk)
:param text: description of the attack step being stopped
@ -440,13 +458,10 @@ class AttackLog():
}
self.__add_to_log__(data)
def start_build(self, **kwargs):
def start_build(self, **kwargs: dict) -> str:
""" Mark the start of a tool building/compilation process
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param attack_name: Name of the attack. From plugin
:param ttp: TTP of the attack. From plugin
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -476,11 +491,7 @@ class AttackLog():
return logid
# TODO: Add parameter
# TODO: Add config
# TODO: Add results
def stop_build(self, **kwargs):
def stop_build(self, **kwargs: dict) -> None:
""" Mark the end of a tool building/compilation process
:param source: source of the attack. Attack IP
@ -497,13 +508,14 @@ class AttackLog():
}
self.__add_to_log__(data)
def start_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs):
def start_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs: dict) -> str:
""" Mark the start of a Metasploit based attack
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param metasploit_command: The command to metasploit
:param ttp: TTP of the attack. From plugin
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -544,7 +556,7 @@ class AttackLog():
return logid
def stop_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs):
def stop_metasploit_attack(self, source: str, target: str, metasploit_command: str, ttp: str = None, **kwargs: None) -> None:
""" Mark the start of a Metasploit based attack
:param source: source of the attack. Attack IP
@ -566,13 +578,14 @@ class AttackLog():
}
self.__add_to_log__(data)
def start_attack_plugin(self, source: str, target: str, plugin_name: str, ttp: str = None):
def start_attack_plugin(self, source: str, target: str, plugin_name: str, ttp: str = None) -> str:
""" Mark the start of an attack plugin
:param source: source of the attack. Attack IP
:param target: Target machine of the attack
:param plugin_name: Name of the plugin
:param ttp: TTP of the attack. From plugin
:returns: logid
"""
timestamp = self.__get_timestamp__()
@ -596,7 +609,7 @@ class AttackLog():
# TODO: Add config
# TODO: Add results
def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs):
def stop_attack_plugin(self, source: str, target: str, plugin_name: str, **kwargs: dict) -> None:
""" Mark the end of an attack plugin
:param source: source of the attack. Attack IP
@ -606,6 +619,10 @@ class AttackLog():
:param kwargs: *ttp*, *logid*
"""
tag: Optional[str] = None
if kwargs.get("ttp", None) is not None:
tag = str(kwargs.get("ttp", None))
data = {"timestamp": self.__get_timestamp__(),
"event": "stop",
"type": "attack",
@ -613,12 +630,12 @@ class AttackLog():
"source": source,
"target": target,
"plugin_name": plugin_name,
"hunting_tag": __mitre_fix_ttp__(kwargs.get("ttp", None)),
"hunting_tag": __mitre_fix_ttp__(tag),
"logid": kwargs.get("logid", None)
}
self.__add_to_log__(data)
def write_json(self, filename: str):
def write_json(self, filename: str) -> None:
""" Write the json data for this log
:param filename: Name of the json file
@ -626,7 +643,7 @@ class AttackLog():
with open(filename, "wt") as fh:
json.dump(self.get_dict(), fh)
def post_process(self):
def post_process(self) -> None:
""" Post process the data before using it """
for entry in self.log:
@ -640,7 +657,7 @@ class AttackLog():
if "result" in entry:
replace_entry["result"] = entry["result"]
def get_dict(self):
def get_dict(self) -> dict:
""" Return logged data in dict format """
res = {"boilerplate": {"log_format_major_version": 1, # Changes on changes that breaks readers (items are modified or deleted)
@ -652,7 +669,7 @@ class AttackLog():
return res
def add_machine_info(self, machine_info: dict):
def add_machine_info(self, machine_info: dict) -> None:
""" Adds a dict with machine info. One machine per call of this method """
self.machines.append(machine_info)
@ -664,7 +681,7 @@ class AttackLog():
# TODO: Return full doc
def vprint(self, text: str, verbosity: int):
def vprint(self, text: str, verbosity: int) -> None:
""" verbosity based stdout printing
0: Errors only

@ -5,17 +5,18 @@
import json
from pprint import pformat
from typing import Optional, Union, Annotated
from typing import Optional, Union, Annotated, Any
import requests
import simplejson
from pydantic.dataclasses import dataclass
from pydantic import conlist # pylint: disable=no-name-in-module
from app.attack_log import AttackLog
from app.config import ExperimentConfig
# from app.exceptions import CalderaError
# from app.interface_sfx import CommandlineColors
# TODO: Ability deserves an own class.
# TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)"
@dataclass
@ -68,7 +69,7 @@ class Executor: # pylint: disable=missing-class-docstring
platform: str
command: Optional[str]
def get(self, akey, default=None):
def get(self, akey: str, default: Any = None) -> Any:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
if akey in self.__dict__:
return self.__dict__[akey]
@ -95,7 +96,7 @@ class Ability:
ability_id: str
privilege: Optional[str] = None
def get(self, akey, default=None):
def get(self, akey: str, default: Any = None) -> Any:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
if akey in self.__dict__:
return self.__dict__[akey]
@ -108,7 +109,7 @@ class AbilityList:
""" A list of exploits """
abilities: Annotated[list, conlist(Ability, min_items=1)]
def get_data(self):
def get_data(self) -> list[Ability]:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.abilities
@ -126,7 +127,7 @@ class ObfuscatorList:
""" A list of obfuscators """
obfuscators: Annotated[list, conlist(Obfuscator, min_items=1)]
def get_data(self):
def get_data(self) -> list[Obfuscator]:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.obfuscators
@ -143,7 +144,7 @@ class Adversary:
tags: list[str]
plugin: Optional[str] = None
def get(self, akey, default=None):
def get(self, akey: str, default: Any = None) -> Any:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
if akey in self.__dict__:
return self.__dict__[akey]
@ -156,7 +157,7 @@ class AdversaryList:
""" A list of adversary """
adversaries: Annotated[list, conlist(Adversary, min_items=1)]
def get_data(self):
def get_data(self) -> list[Adversary]:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.adversaries
@ -177,7 +178,7 @@ class Fact: # pylint: disable=missing-class-docstring
technique_id: Optional[str] = None
collected_by: Optional[str] = None
def get(self, akey, default=None):
def get(self, akey: str, default: Any = None) -> Any:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
if akey in self.__dict__:
return self.__dict__[akey]
@ -218,7 +219,6 @@ class Link: # pylint: disable=missing-class-docstring
score: int
used: list[Fact]
facts: list[Fact]
agent_reported_time: str
id: str # pylint: disable=invalid-name
collect: str
command: str
@ -226,6 +226,7 @@ class Link: # pylint: disable=missing-class-docstring
relationships: list[Relationship]
jitter: int
deadman: bool
agent_reported_time: Optional[str] = ""
@dataclass
@ -262,7 +263,7 @@ class Agent:
pending_contact: str
privilege: Optional[str] = None # Error, not documented
def get(self, akey, default=None):
def get(self, akey: str, default: Any = None) -> Any:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
if akey in self.__dict__:
return self.__dict__[akey]
@ -275,7 +276,7 @@ class AgentList:
""" A list of agents """
agents: list[Agent]
def get_data(self):
def get_data(self) -> list[Agent]:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.agents
@ -305,7 +306,7 @@ class Source: # pylint: disable=missing-class-docstring
id: str # pylint: disable=invalid-name
adjustments: Optional[list[Adjustment]] = None
def get(self, akey, default=None):
def get(self, akey: str, default: Any = None) -> Any:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
if akey in self.__dict__:
return self.__dict__[akey]
@ -317,7 +318,7 @@ class Source: # pylint: disable=missing-class-docstring
class SourceList: # pylint: disable=missing-class-docstring
sources: list[Source]
def get_data(self):
def get_data(self) -> list[Source]:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.sources
@ -342,7 +343,7 @@ class PlannerList:
""" A list of planners"""
planners: list[Planner]
def get_data(self):
def get_data(self) -> list[Planner]:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.planners
@ -364,7 +365,7 @@ class Objective: # pylint: disable=missing-class-docstring
description: str
id: str # pylint: disable=invalid-name
def get(self, akey, default=None):
def get(self, akey: str, default: Any = None) -> Any:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
if akey in self.__dict__:
return self.__dict__[akey]
@ -393,7 +394,7 @@ class Operation:
auto_close: bool
chain: Optional[list] = None
def get(self, akey, default=None):
def get(self, akey: str, default: Any = None) -> Any:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
if akey in self.__dict__:
return self.__dict__[akey]
@ -406,7 +407,7 @@ class OperationList:
""" A list of operations """
operations: Annotated[list, conlist(Operation)]
def get_data(self):
def get_data(self) -> list[Operation]:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.operations
@ -415,7 +416,7 @@ class OperationList:
class ObjectiveList: # pylint: disable=missing-class-docstring
objectives: Annotated[list, conlist(Objective)]
def get_data(self):
def get_data(self) -> list[Objective]:
""" Get a specific element out of the internal data representation, behaves like the well know 'get' """
return self.objectives
@ -423,7 +424,7 @@ class ObjectiveList: # pylint: disable=missing-class-docstring
class CalderaAPI():
""" Remote control Caldera through REST api """
def __init__(self, server: str, attack_logger, config=None, apikey=None):
def __init__(self, server: str, attack_logger: AttackLog, config: ExperimentConfig = None, apikey: str = "ADMIN123") -> None:
"""
@param server: Caldera server url/ip
@ -440,7 +441,7 @@ class CalderaAPI():
else:
self.apikey = apikey
def __contact_server__(self, payload, rest_path: str = "api/v2/abilities", method: str = "get"):
def __contact_server__(self, payload: Optional[dict], rest_path: str = "api/v2/abilities", method: str = "get") -> dict:
"""
@param payload: payload as dict to send to the server
@ -486,7 +487,7 @@ class CalderaAPI():
return res
def list_abilities(self):
def list_abilities(self) -> list[Ability]:
""" Return all ablilities """
payload = None
@ -494,7 +495,7 @@ class CalderaAPI():
abilities = AbilityList(**data)
return abilities.get_data()
def list_obfuscators(self):
def list_obfuscators(self) -> list[Obfuscator]:
""" Return all obfuscators """
payload = None
@ -502,7 +503,7 @@ class CalderaAPI():
obfuscators = ObfuscatorList(**data)
return obfuscators.get_data()
def list_adversaries(self):
def list_adversaries(self) -> list[Adversary]:
""" Return all adversaries """
payload = None
@ -510,7 +511,7 @@ class CalderaAPI():
adversaries = AdversaryList(**data)
return adversaries.get_data()
def list_sources(self):
def list_sources(self) -> list[Source]:
""" Return all sources """
payload = None
@ -518,7 +519,7 @@ class CalderaAPI():
sources = SourceList(**data)
return sources.get_data()
def list_planners(self):
def list_planners(self) -> list[Planner]:
""" Return all planners """
payload = None
@ -526,7 +527,7 @@ class CalderaAPI():
planners = PlannerList(**data)
return planners.get_data()
def list_operations(self):
def list_operations(self) -> list[Operation]:
""" Return all operations """
payload = None
@ -534,7 +535,7 @@ class CalderaAPI():
operations = OperationList(**data)
return operations.get_data()
def set_operation_state(self, operation_id: str, state: str = "running"):
def set_operation_state(self, operation_id: str, state: str = "running") -> dict:
""" Executes an operation on a server
@param operation_id: The operation to modify
@ -550,7 +551,7 @@ class CalderaAPI():
payload = {"state": state}
return self.__contact_server__(payload, method="patch", rest_path=f"api/v2/operations/{operation_id}")
def list_agents(self):
def list_agents(self) -> list[Agent]:
""" Return all agents """
payload = None
@ -558,7 +559,7 @@ class CalderaAPI():
agents = AgentList(**data)
return agents.get_data()
def list_objectives(self):
def list_objectives(self) -> list[Objective]:
""" Return all objectivs """
payload = None
@ -566,7 +567,7 @@ class CalderaAPI():
objectives = ObjectiveList(**data)
return objectives.get_data()
def add_adversary(self, name: str, ability: str, description: str = "created automatically"):
def add_adversary(self, name: str, ability: str, description: str = "created automatically") -> dict:
""" Adds a new adversary
:param name: Name of the adversary
@ -587,31 +588,34 @@ class CalderaAPI():
# ],
"description": description
}
# TODO Check this return value
data = {"agents": self.__contact_server__(payload, method="post", rest_path="api/v2/adversaries")}
# agents = AgentList(**data)
return data
def delete_adversary(self, adversary_id: str):
def delete_adversary(self, adversary_id: str) -> dict:
""" Deletes an adversary
:param adversary_id: The id of this adversary
:return:
"""
payload = None
# TODO Check this return value
data = {"agents": self.__contact_server__(payload, method="delete", rest_path=f"api/v2/adversaries/{adversary_id}")}
return data
def delete_agent(self, agent_paw: str):
def delete_agent(self, agent_paw: str) -> dict:
""" Deletes an agent
:param agent_paw: the paw to delete
:return:
"""
payload = None
# TODO Check this return value
data = {"agents": self.__contact_server__(payload, method="delete", rest_path=f"api/v2/agents/{agent_paw}")}
return data
def kill_agent(self, agent_paw: str):
def kill_agent(self, agent_paw: str) -> dict:
""" Kills an agent on the target
:param agent_paw: The paw identifying this agent
@ -623,7 +627,7 @@ class CalderaAPI():
data = self.__contact_server__(payload, method="patch", rest_path=f"api/v2/agents/{agent_paw}")
return data
def add_operation(self, **kwargs):
def add_operation(self, **kwargs: dict) -> OperationList:
""" Adds a new operation
:param kwargs:
@ -632,14 +636,18 @@ class CalderaAPI():
# name, adversary_id, source_id = "basic", planner_id = "atomic", group = "", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8'
name: str = kwargs.get("name")
adversary_id: str = kwargs.get("adversary_id")
source_id: str = kwargs.get("source_id", "basic")
planner_id: str = kwargs.get("planner_id", "atomic")
group: str = kwargs.get("group", "")
state: str = kwargs.get("state", "running")
obfuscator: str = kwargs.get("obfuscator", "plain-text")
jitter: str = kwargs.get("jitter", "4/8")
if kwargs.get("adversary_id") is None:
adversary_id = None
else:
adversary_id = str(kwargs.get("adversary_id"))
name: str = str(kwargs.get("name"))
source_id: str = str(kwargs.get("source_id", "basic"))
planner_id: str = str(kwargs.get("planner_id", "atomic"))
group: str = str(kwargs.get("group", ""))
state: str = str(kwargs.get("state", "running"))
obfuscator: str = str(kwargs.get("obfuscator", "plain-text"))
jitter: str = str(kwargs.get("jitter", "4/8"))
payload = {"name": name,
"group": group,
@ -657,20 +665,20 @@ class CalderaAPI():
operations = OperationList(**data)
return operations
def delete_operation(self, operation_id):
def delete_operation(self, operation_id: str) -> dict:
""" Deletes an operation
:param operation_id: The Id of the operation to delete
:return:
"""
payload = {}
payload: dict = {}
data = self.__contact_server__(payload, method="delete", rest_path=f"api/v2/operations/{operation_id}")
return data
def view_operation_report(self, operation_id):
def view_operation_report(self, operation_id: str) -> dict:
""" Views the report of a finished operation
:param operation_id: The id of this operation
@ -685,7 +693,7 @@ class CalderaAPI():
return data
def get_ability(self, abid: str):
def get_ability(self, abid: str) -> list[Ability]:
"""" Return an ability by id
@param abid: Ability id
@ -698,12 +706,12 @@ class CalderaAPI():
with open("debug_removeme.txt", "wt") as fh:
fh.write(pformat(self.list_abilities()))
for ability in self.list_abilities()["abilities"]:
for ability in self.list_abilities():
if ability.get("ability_id", None) == abid or ability.get("auto_generated_guid", None) == abid:
res.append(ability)
return res
def pretty_print_ability(self, abi):
def pretty_print_ability(self, abi: dict) -> None:
""" Pretty pritns an ability
@param abi: A ability dict

@ -4,16 +4,18 @@
import json
import os
from typing import Optional
from jinja2 import Environment, FileSystemLoader, select_autoescape
class DocGenerator():
""" Generates human readable docs from attack logs """
def __init__(self):
self.outfile = None
def __init__(self) -> None:
self.outfile: Optional[str] = None
def generate(self, jfile, outfile="tools/human_readable_documentation/source/contents.rst"):
def generate(self, jfile: str, outfile: str = "tools/human_readable_documentation/source/contents.rst") -> None:
""" Generates human readable documentation out of a template.
@param jfile: json attack log created by PurpleDome as data source
@ -39,12 +41,12 @@ class DocGenerator():
with open(outfile, "wt") as fh:
fh.write(rendered)
def compile_documentation(self):
def compile_documentation(self) -> None:
""" Compiles the documentation using make """
os.system("cd tools/human_readable_documentation ; make html; make latexpdf ")
def get_outfile_paths(self):
def get_outfile_paths(self) -> list[str]:
""" Returns the path of the output file written """
return ["tools/human_readable_documentation/build/latex/purpledomesimulation.pdf"]

@ -13,7 +13,7 @@ from typing import Optional
from app.attack_log import AttackLog
from app.config import ExperimentConfig
from app.interface_sfx import CommandlineColors
from app.exceptions import ServerError, CalderaError, MachineError
from app.exceptions import ServerError, CalderaError, MachineError, PluginError
from app.pluginmanager import PluginManager
from app.doc_generator import DocGenerator
from app.calderacontrol import CalderaControl
@ -26,7 +26,7 @@ from plugins.base.attack import AttackPlugin
class Experiment():
""" Class handling experiments """
def __init__(self, configfile: str, verbosity=0):
def __init__(self, configfile: str, verbosity: int = 0) -> None:
"""
:param configfile: Path to the configfile to load
@ -43,7 +43,7 @@ class Experiment():
self.attack_logger = AttackLog(verbosity)
self.plugin_manager = PluginManager(self.attack_logger)
def run(self, caldera_attacks: list = None):
def run(self, caldera_attacks: list = None) -> None:
"""
Run the experiment
@ -118,7 +118,7 @@ class Experiment():
zip_this += document_generator.get_outfile_paths()
self.zip_loot(zip_this)
def run_plugin_attacks(self):
def run_plugin_attacks(self) -> None:
""" Run plugin based attacks
"""
@ -137,7 +137,7 @@ class Experiment():
time.sleep(self.experiment_config.get_nap_time())
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished attack plugins{CommandlineColors.ENDC}", 1)
def run_caldera_attacks(self, caldera_attacks: Optional[list[str]] = None):
def run_caldera_attacks(self, caldera_attacks: Optional[list[str]] = None) -> None:
""" Run caldera based attacks
@ -203,17 +203,21 @@ class Experiment():
# End of fix
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Finished Caldera attacks{CommandlineColors.ENDC}", 1)
def add_running_machines_to_log(self):
def add_running_machines_to_log(self) -> None:
""" Add machine infos for targets and attacker to the log """
for target in self.targets:
if target is None:
raise MachineError("Target machine configured to None or whatever happened")
i = target.get_machine_info()
i["role"] = "target"
self.attack_logger.add_machine_info(i)
if self.attacker_1 is None:
raise MachineError("Attacker machine gone")
i = self.attacker_1.get_machine_info()
i["role"] = "attacker"
self.attack_logger.add_machine_info(i)
def wait_until_all_targets_have_caldera_implants(self, caldera_url: str, caldera_attacks: Optional[list[str]] = None):
def wait_until_all_targets_have_caldera_implants(self, caldera_url: str, caldera_attacks: Optional[list[str]] = None) -> None:
"""
:param caldera_attacks: a list of command line defined caldera attacks
@ -236,7 +240,7 @@ class Experiment():
time.sleep(120) # Was 30, but maybe there are timing issues
running_agents = self.caldera_control.list_paws_of_running_agents()
def first_start_of_caldera_implants(self, caldera_attacks: Optional[list[str]] = None):
def first_start_of_caldera_implants(self, caldera_attacks: Optional[list[str]] = None) -> None:
""" Start caldera implant on the targets
:param caldera_attacks: a list of command line defined caldera attacks
@ -253,7 +257,7 @@ class Experiment():
time.sleep(20) # Wait for all the clients to contact the caldera server
# TODO: Smarter wait
def install_sensor_plugins(self):
def install_sensor_plugins(self) -> None:
""" Installs sensor plugins on the targets
"""
@ -262,7 +266,7 @@ class Experiment():
a_target.install_sensors()
a_target.start_sensors()
def install_vulnerabilities(self):
def install_vulnerabilities(self) -> None:
""" Install vulnerabilities on the targets
"""
@ -271,7 +275,7 @@ class Experiment():
a_target.install_vulnerabilities()
a_target.start_vulnerabilities()
def start_target_machines(self, caldera_attacks: Optional[list[str]] = None):
def start_target_machines(self, caldera_attacks: Optional[list[str]] = None) -> None:
""" Start target machines
:param caldera_attacks: Caldera attacks as defined on the command line
@ -309,7 +313,7 @@ class Experiment():
self.attack_logger.vprint(f"{CommandlineColors.OKGREEN}Target is up: {tname} {CommandlineColors.ENDC}", 1)
self.targets.append(target_1)
def machine_needs_caldera(self, target, caldera_from_cmdline: Optional[list[str]] = None) -> int:
def machine_needs_caldera(self, target: Machine, caldera_from_cmdline: Optional[list[str]] = None) -> int:
""" Counts the attacks and plugins needing caldera that are registered for this machine
:param target: Target machine we will check the config file for assigned caldera attacks for
@ -328,7 +332,7 @@ class Experiment():
return c_cmdline + c_conffile + c_plugins
def attack(self, target, attack):
def attack(self, target: Machine, attack: str) -> None:
""" Pick an attack and run it
:param attack: Name of the attack to run
@ -338,20 +342,24 @@ class Experiment():
for plugin in self.plugin_manager.get_plugins(AttackPlugin, [attack]):
name = plugin.get_name()
if isinstance(plugin, AttackPlugin):
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Attack plugin {name}{CommandlineColors.ENDC}", 2)
plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name()))
plugin.set_attacker_machine(self.attacker_1)
plugin.set_sysconf({})
plugin.set_logger(self.attack_logger)
if self.caldera_control is None:
raise CalderaError("Caldera control not initialised")
plugin.set_caldera(self.caldera_control)
plugin.connect_metasploit()
plugin.install()
# plugin.__set_logger__(self.attack_logger)
plugin.__execute__([target])
else:
raise PluginError("AttackPlugin is not really an AttackPlugin type")
self.attack_logger.vprint(f"{CommandlineColors.OKBLUE}Running Attack plugin {name}{CommandlineColors.ENDC}", 2)
plugin.process_config(self.experiment_config.attack_conf(plugin.get_config_section_name()))
plugin.set_attacker_machine(self.attacker_1)
plugin.set_sysconf({})
plugin.set_logger(self.attack_logger)
plugin.set_caldera(self.caldera_control)
plugin.connect_metasploit()
plugin.install()
# plugin.__set_logger__(self.attack_logger)
plugin.__execute__([target])
def zip_loot(self, zip_this: list[str]):
def zip_loot(self, zip_this: list[str]) -> None:
""" Zip the loot together
:param zip_this: A list of file paths to add to the zip file
@ -373,7 +381,7 @@ class Experiment():
default_name = os.path.join(self.loot_dir, "..", "most_recent.zip")
shutil.copyfile(filename, default_name)
def __start_attacker(self):
def __start_attacker(self) -> None:
""" Start the attacking VM """
# Preparing attacker
@ -401,6 +409,8 @@ class Experiment():
raise ServerError
# self.attacker_1.set_attack_logger(self.attack_logger)
def __stop_attacker(self):
def __stop_attacker(self) -> None:
""" Stop the attacking VM """
if self.attacker_1 is None:
raise MachineError("Attacker machine not initialised")
self.attacker_1.halt()

Loading…
Cancel
Save