From e49e8abd45cf604d8179940e1a8a9cd7a54324f7 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Tue, 18 Jan 2022 16:35:32 +0100 Subject: [PATCH] First bunch of Caldera 4 API calls --- app/calderacontrol_4.py | 335 ++++++++++++++++++++++++++++++++++++++++ caldera_control.py | 84 ++++++++-- 2 files changed, 406 insertions(+), 13 deletions(-) create mode 100644 app/calderacontrol_4.py mode change 100644 => 100755 caldera_control.py diff --git a/app/calderacontrol_4.py b/app/calderacontrol_4.py new file mode 100644 index 0000000..0eb7c2f --- /dev/null +++ b/app/calderacontrol_4.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python3 + +""" Remote control a caldera 4 server. Starting compatible to the old control 2.8 calderacontrol. Maybe it will stop being compatible if refactoring is an option """ + +import json +import os +import time + +from pprint import pprint, pformat +from typing import Optional +import requests +import simplejson +from typing import Optional +from pydantic.dataclasses import dataclass +from pydantic import conlist, constr # pylint: disable=no-name-in-module + +from app.exceptions import CalderaError +from app.interface_sfx import CommandlineColors + + +# TODO: Ability deserves an own class. +# TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)" + +@dataclass +class Variation: + description: str + command: str + +@dataclass +class ParserConfig: + source: str + edge: str + target: str + custom_parser_vals: dict # undocumented ! Needs improvement ! TODO + +@dataclass +class Parser: + module: str + relationships: list[ParserConfig] # undocumented ! Needs improvement ! TODO + parserconfigs: Optional[list[ParserConfig]] = None + +@dataclass +class Requirement: + module: str + relationship_match: list[dict] + +@dataclass +class AdditionalInfo: + additionalProp1: Optional[str] = None + additionalProp2: Optional[str] = None + additionalProp3: Optional[str] = None + +@dataclass +class Executor: + build_target: Optional[str] # Why can this be None ? + language: Optional[str] # Why can this be None ? + payloads: list[str] + variations: list[Variation] + additional_info: Optional[AdditionalInfo] + parsers: list[Parser] + cleanup: list[str] + name: str + timeout: int + code: Optional[str] # Why can this be None ? + uploads: list[str] + platform: str + command: Optional[str] + +@dataclass +class Ability: + description: str + plugin: str + technique_name: str + requirements: list[Requirement] + additional_info: AdditionalInfo + singleton: bool + buckets: list[str] + access: dict + executors: list[Executor] + name: str + technique_id: str + tactic: str + repeatable: str + ability_id: str + privilege: Optional[str] = None + + +@dataclass +class AbilityList: + abilities: conlist(Ability, min_items=1) + + +@dataclass +class Obfuscator: + description: str + name: str + module: Optional[str] = None # Documentation error !!! + +@dataclass +class ObfuscatorList: + obfuscators: conlist(Obfuscator, min_items=1) + +@dataclass +class Adversary: + has_repeatable_abilities: bool + adversary_id: str + description: str + name: str + atomic_ordering: list[str] + objective: str + plugin: str + tags: list[str] + +@dataclass +class AdversaryList: + adversaries: conlist(Adversary, min_items=1) + + +@dataclass +class Fact: + unique: str + name: str + score: int + limit_count: int + relationships: list[str] + technique_id: str + collected_by: str + source: str + trait: str + links: list[str] + created: str + + +@dataclass +class Relationship: + target: Fact + unique: str + score: int + edge: str + origin: str + source: Fact + + +@dataclass +class Visibility: + score: int + adjustments: list[int] + + +@dataclass +class Link: + pin: int + ability: Ability + paw: str + status: int + finish: str + decide: str + output: str + visibility: Visibility + pid: str + host: str + executor: Executor + unique: str + score: int + used: list[Fact] + facts: list[Fact] + agent_reported_time: str + id: str + collect: str + command: str + cleanup: int + relationships: list[Relationship] + jitter: int + deadman: bool + + + +@dataclass +class Agent: + paw: str + location: str + platform: str + last_seen: str # Error in document + host_ip_addrs: list[str] + group: str + architecture: str + pid: int + server: str + trusted: bool + username: str + host: str + ppid: int + created: str + links: list[Link] + sleep_max: int + exe_name: str + display_name: str + sleep_min: int + contact: str + deadman_enabled: bool + proxy_receivers: AdditionalInfo + origin_link_id: str + executors: list[str] + watchdog: int + proxy_chain: list[list[str]] + available_contacts: list[str] + upstream_dest: str + pending_contact: str + privilege: Optional[str] = None # Error, not documented + +@dataclass +class AgentList: + agents: conlist(Agent, min_items=1) + +class CalderaControl(): + """ Remote control Caldera through REST api """ + + def __init__(self, server: str, attack_logger, config=None, apikey=None): + """ + + @param server: Caldera server url/ip + @param attack_logger: The attack logger to use + @param config: The configuration + """ + # print(server) + self.url = server if server.endswith("/") else server + "/" + self.attack_logger = attack_logger + + self.config = config + + if self.config: + self.apikey = self.config.caldera_apikey() + else: + self.apikey = apikey + + def __contact_server__(self, payload, rest_path: str = "api/v2/abilities", method: str = "get"): + """ + + @param payload: payload as dict to send to the server + @param rest_path: specific path for this rest api + @param method: http method to use + """ + url = self.url + rest_path + header = {"KEY": "ADMIN123", + "accept": "application/json"} + if method.lower() == "post": + request = requests.post(url, headers=header, data=json.dumps(payload)) + elif method.lower() == "put": + request = requests.put(url, headers=header, data=json.dumps(payload)) + elif method.lower() == "get": + request = requests.get(url, headers=header, data=json.dumps(payload)) + elif method.lower() == "head": + request = requests.head(url, headers=header, data=json.dumps(payload)) + elif method.lower() == "delete": + request = requests.delete(url, headers=header, data=json.dumps(payload)) + else: + raise ValueError + try: + res = request.json() + except simplejson.errors.JSONDecodeError as exception: # type: ignore + print("!!! Error !!!!") + print(payload) + print(request.text) + print("!!! Error !!!!") + raise exception + + return res + + def list_abilities(self): + """ Return all ablilities """ + + payload = None + data = {"abilities": self.__contact_server__(payload, method="get", rest_path="api/v2/abilities")} + abilities = AbilityList(**data) + return abilities + + def list_obfuscators(self): + """ Return all obfuscators """ + + payload = None + data = {"obfuscators": self.__contact_server__(payload, method="get", rest_path="api/v2/obfuscators")} + obfuscators = ObfuscatorList(**data) + return obfuscators + + def list_adversaries(self): + """ Return all adversaries """ + + payload = None + data = {"adversaries": self.__contact_server__(payload, method="get", rest_path="api/v2/adversaries")} + print(data) + adversaries = AdversaryList(**data) + return adversaries + + def list_agents(self): + """ Return all agents """ + + payload = None + data = {"agents": self.__contact_server__(payload, method="get", rest_path="api/v2/agents")} + print(data) + agents = AgentList(**data) + return agents + + def get_ability(self, abid: str): + """" Return an ability by id + + @param abid: Ability id + """ + + res = [] + + print(f"Number of abilities: {len(self.list_abilities())}") + + with open("debug_removeme.txt", "wt") as fh: + fh.write(pformat(self.list_abilities())) + + for ability in self.list_abilities(): + if ability.get("ability_id", None) == abid or ability.get("auto_generated_guid", None) == abid: + res.append(ability) + return res + + def pretty_print_ability(self, abi): + """ Pretty pritns an ability + + @param abi: A ability dict + """ + + print(""" + TTP: {technique_id} + Technique name: {technique_name} + Tactic: {tactic} + Name: {name} + ID: {ability_id} + Description: {description} + + """.format(**abi)) \ No newline at end of file diff --git a/caldera_control.py b/caldera_control.py old mode 100644 new mode 100755 index fd9474a..7deeea8 --- a/caldera_control.py +++ b/caldera_control.py @@ -4,7 +4,10 @@ import argparse -from app.calderacontrol import CalderaControl +#from app.calderacontrol import CalderaControl +from app.calderacontrol_4 import CalderaControl +from pprint import pprint + from app.attack_log import AttackLog @@ -15,13 +18,17 @@ from app.attack_log import AttackLog # TODO: Get results of a specific attack # Arpgparse handling -def list_agents(calcontrol, arguments): # pylint: disable=unused-argument - """ Call list agents in caldera control +def agents(calcontrol, arguments): # pylint: disable=unused-argument + """ Agents in caldera control @param calcontrol: Connection to the caldera server @param arguments: Parser command line arguments """ - print(f"Running agents: {calcontrol.list_agents()}") + + if arguments.list: + for agent in calcontrol.list_agents().__dict__["agents"]: + print(agent) + def list_facts(calcontrol, arguments): # pylint: disable=unused-argument @@ -77,14 +84,52 @@ def list_abilities(calcontrol, arguments): @param arguments: Parser command line arguments """ - abilities = arguments.ability_ids + abilities = calcontrol.list_abilities().__dict__["abilities"] + + if arguments.list: + abi_ids = [aid.ability_id for aid in abilities] + print(abi_ids) + + for abi in abilities: + for executor in abi.executors: + for parser in executor.parsers: + pprint(parser.relationships) - if arguments.all: - abilities = [aid["ability_id"] for aid in calcontrol.list_abilities()] + #for aid in abilities: + # for ability in calcontrol.get_ability(aid): + # calcontrol.pretty_print_ability(ability) + + +def obfuscators(calcontrol, arguments): + """ Manage obfuscators caldera control + + @param calcontrol: Connection to the caldera server + @param arguments: Parser command line arguments + """ - for aid in abilities: - for ability in calcontrol.get_ability(aid): - calcontrol.pretty_print_ability(ability) + if arguments.list: + obfs = calcontrol.list_obfuscators().__dict__["obfuscators"] + # ob_ids = [aid.ability_id for aid in obfuscators] + # print(ob_ids) + + for ob in obfs: + print(ob) + + +def adversaries(calcontrol, arguments): + """ Manage adversaries caldera control + + @param calcontrol: Connection to the caldera server + @param arguments: Parser command line arguments + """ + + if arguments.list: + advs = calcontrol.list_adversaries().__dict__["adversaries"] + # ob_ids = [aid.ability_id for aid in obfuscators] + # print(ob_ids) + + for ob in advs: + print(ob) def attack(calcontrol, arguments): @@ -122,11 +167,12 @@ def create_parser(): parser_abilities.set_defaults(func=list_abilities) parser_abilities.add_argument("--ability_ids", default=[], nargs="+", help="The abilities to look up. One or more ids") - parser_abilities.add_argument("--all", default=False, action="store_true", + parser_abilities.add_argument("--list", default=False, action="store_true", help="List all abilities") parser_agents = subparsers.add_parser("agents", help="agents") - parser_agents.set_defaults(func=list_agents) + parser_agents.set_defaults(func=agents) + parser_agents.add_argument("--list", default=False, action="store_true", help="List all agents") parser_delete_agents = subparsers.add_parser("delete_agents", help="agents") parser_delete_agents.add_argument("--paw", default=None, help="PAW to delete. if not set it will delete all agents") @@ -139,8 +185,20 @@ def create_parser(): parser_facts = subparsers.add_parser("add_facts", help="facts") parser_facts.set_defaults(func=add_facts) + # Sub parser to list obfuscators + parser_obfuscators = subparsers.add_parser("obfuscators", help="obfuscators") + parser_obfuscators.set_defaults(func=obfuscators) + parser_obfuscators.add_argument("--list", default=False, action="store_true", + help="List all obfuscators") + + # Sub parser to list adversaries + parser_adversaries = subparsers.add_parser("adversaries", help="adversaries") + parser_adversaries.set_defaults(func=adversaries) + parser_adversaries.add_argument("--list", default=False, action="store_true", + help="List all obfuscators") + # For all parsers - main_parser.add_argument("--caldera_url", help="caldera url, including port", default="http://192.168.178.125:8888/") + main_parser.add_argument("--caldera_url", help="caldera url, including port", default="http://localhost:8888/") main_parser.add_argument("--apikey", help="caldera api key", default="ADMIN123") return main_parser