From 0aa5a4ed8af8454659bf53bd45fbc53fe1ca1daa Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Thu, 20 Jan 2022 17:07:53 +0100 Subject: [PATCH] More caldera features supported --- app/calderacontrol_4.py | 184 +++++++++++++++++++++++++++++++++++++++- caldera_control.py | 85 +++++++++++++++++-- 2 files changed, 261 insertions(+), 8 deletions(-) diff --git a/app/calderacontrol_4.py b/app/calderacontrol_4.py index 0eb7c2f..920ef4b 100644 --- a/app/calderacontrol_4.py +++ b/app/calderacontrol_4.py @@ -123,12 +123,14 @@ class Fact: score: int limit_count: int relationships: list[str] - technique_id: str - collected_by: str source: str trait: str links: list[str] created: str + origin_type: Optional[str] = None + value: Optional[str] = None + technique_id: Optional[str] = None + collected_by: Optional[str] = None @dataclass @@ -208,10 +210,98 @@ class Agent: pending_contact: str privilege: Optional[str] = None # Error, not documented + @dataclass class AgentList: agents: conlist(Agent, min_items=1) + +@dataclass +class Rule: + match: str + trait: str + action: Optional[str] = None + + +@dataclass +class Adjustment: + offset: int + trait: str + value: str + ability_id: str + + +@dataclass +class Source: + name: str + plugin: str + facts: list[Fact] + rules: list[Rule] + relationships: list[Relationship] + id: str + adjustments: Optional[list[Adjustment]] = None + + +@dataclass +class SourceList: + sources: list[Source] + + +@dataclass +class Planner: + module: str + name: str + plugin: str + id: str + stopping_conditions: list[Fact] + params: dict + ignore_enforcement_module: list[str] + description: str + allow_repeatable_abilities: bool + + +@dataclass +class Goal: + target: str + count: int + achieved: bool + operator: str + value: str + + +@dataclass +class Objective: + percentage: int + name: str + goals: list[Goal] + description: str + id: str + + +@dataclass +class Operation: + obfuscator: str + state: str + jitter: str + autonomous: int + name: str + source: Source + adversary: Adversary + objective: Objective + host_group: list[Agent] + start: str + group: str + use_learning_parsers: bool + planner: Planner + visibility: int + id: str + auto_close: bool + + +@dataclass +class OperationList: + operations: conlist(Operation) + class CalderaControl(): """ Remote control Caldera through REST api """ @@ -256,7 +346,16 @@ class CalderaControl(): else: raise ValueError try: - res = request.json() + if request.status_code == 200: + res = request.json() + # Comment: Sometimes we get a 204: succcess, but not content in response + elif request.status_code == 204: + res = {"result": "ok", + "http_status_code": 204} + else: + print(f"Status code: {request.status_code}") + res = request.json() + except simplejson.errors.JSONDecodeError as exception: # type: ignore print("!!! Error !!!!") print(payload) @@ -291,6 +390,24 @@ class CalderaControl(): adversaries = AdversaryList(**data) return adversaries + def list_sources(self): + """ Return all sources """ + + payload = None + data = {"sources": self.__contact_server__(payload, method="get", rest_path="api/v2/sources")} + print(data) + sources = SourceList(**data) + return sources + + def list_operations(self): + """ Return all operations """ + + payload = None + data = {"operations": self.__contact_server__(payload, method="get", rest_path="api/v2/operations")} + print(data) + operations = OperationList(**data) + return operations + def list_agents(self): """ Return all agents """ @@ -300,6 +417,67 @@ class CalderaControl(): agents = AgentList(**data) return agents + # TODO: list_sources + # TODO: list_sources_for_name + # TODO: list_facts_for_name + # TODO: list_paws_of_running_agents + # TODO: list_objectives + # TODO: get_operation + # TODO: get_adversary + # TODO: get_source + # TODO: get_ability + # TODO: does_ability_support_platform + # TODO: get_operation_by_id + # TODO: view_operation_report + # TODO: view_operation_output + # TODO: add_sources + # TODO: add_operation + # TODO: execute_operation + # TODO: delete_operation + # TODO: delete_agent + # TODO: kill_agent + + # TODO is_operation_finished + # TODO: attack + + + def add_adversary(self, name: str, ability: str, description: str = "created automatically"): + payload = { + # "adversary_id": "string", + "atomic_ordering": [ + ability + ], + "name": name, + # "plugin": "string", + "objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc', # default objective + # "tags": [ + # "string" + # ], + "description": description + } + data = {"agents": self.__contact_server__(payload, method="post", rest_path="api/v2/adversaries")} + print(data) + # agents = AgentList(**data) + return data + + def delete_adversary(self, adversary_id: str): + payload = None + data = {"agents": self.__contact_server__(payload, method="delete", rest_path=f"api/v2/adversaries/{adversary_id}")} + print(data) + # agents = AgentList(**data) + return data + + def add_operations(self, adversary_id): + payload = { + "adversary": {"adversary_id": adversary_id}, + "planner": {"id": "foo"}, + "source": {"id": "foo"} + } + data = {"agents": self.__contact_server__(payload, method="post", rest_path="api/v2/operations")} + print(data) + # agents = AgentList(**data) + return data + def get_ability(self, abid: str): """" Return an ability by id diff --git a/caldera_control.py b/caldera_control.py index 7deeea8..a36da13 100755 --- a/caldera_control.py +++ b/caldera_control.py @@ -11,6 +11,9 @@ from pprint import pprint from app.attack_log import AttackLog +class CmdlineArgumentException(Exception): + pass + # https://caldera.readthedocs.io/en/latest/The-REST-API.html # TODO: Check if attack is finished @@ -130,6 +133,53 @@ def adversaries(calcontrol, arguments): for ob in advs: print(ob) + if arguments.add: + if arguments.ability_id is None: + raise CmdlineArgumentException("Creating an adversary requires an ability id") + if arguments.name is None: + raise CmdlineArgumentException("Creating an adversary requires an adversary name") + res = calcontrol.add_adversary(arguments.name, arguments.ability_id) + if arguments.delete: + if arguments.adversary_id is None: + raise CmdlineArgumentException("Deleting an adversary requires an adversary id") + res = calcontrol.delete_adversary(arguments.adversary_id) + + +def sources(calcontrol, arguments): + """ Manage sources caldera control + + @param calcontrol: Connection to the caldera server + @param arguments: Parser command line arguments + """ + + if arguments.list: + srcs = calcontrol.list_sources().__dict__["sources"] + # ob_ids = [aid.ability_id for aid in obfuscators] + # print(ob_ids) + + for ob in srcs: + print(ob) + + +def operations(calcontrol, arguments): + """ Manage operations caldera control + + @param calcontrol: Connection to the caldera server + @param arguments: Parser command line arguments + """ + + if arguments.list: + ops = calcontrol.list_operations().__dict__["operations"] + # ob_ids = [aid.ability_id for aid in obfuscators] + # print(ob_ids) + + for ob in ops: + print(ob) + + if arguments.add: + if arguments.adversary_id is None: + raise CmdlineArgumentException("Adding an operation requires an adversary id") + ops = calcontrol.add_operations(arguments.adversary_id) def attack(calcontrol, arguments): @@ -185,17 +235,39 @@ def create_parser(): parser_facts = subparsers.add_parser("add_facts", help="facts") parser_facts.set_defaults(func=add_facts) - # Sub parser to list obfuscators + # Sub parser for obfuscators parser_obfuscators = subparsers.add_parser("obfuscators", help="obfuscators") parser_obfuscators.set_defaults(func=obfuscators) parser_obfuscators.add_argument("--list", default=False, action="store_true", help="List all obfuscators") - # Sub parser to list adversaries + # Sub parser for adversaries parser_adversaries = subparsers.add_parser("adversaries", help="adversaries") parser_adversaries.set_defaults(func=adversaries) parser_adversaries.add_argument("--list", default=False, action="store_true", - help="List all obfuscators") + help="List all adversaries") + parser_adversaries.add_argument("--add", default=False, action="store_true", + help="Add a new adversary") + parser_adversaries.add_argument("--ability_id", "--abid", default=None, help="Ability ID") + parser_adversaries.add_argument("--ability_name", default=None, help="Adversary name") + parser_adversaries.add_argument("--delete", default=False, action="store_true", + help="Delete adversary") + parser_adversaries.add_argument("--adversary_id", "--advid", default=None, help="Adversary ID") + + # Sub parser for operations + parser_operations = subparsers.add_parser("operations", help="operations") + parser_operations.set_defaults(func=operations) + parser_operations.add_argument("--list", default=False, action="store_true", + help="List all operations") + parser_operations.add_argument("--add", default=False, action="store_true", + help="Add a new operations") + parser_operations.add_argument("--adversary_id", "--advid", default=None, help="Adversary ID") + + # Sub parser for sources + parser_sources = subparsers.add_parser("sources", help="sources") + parser_sources.set_defaults(func=sources) + parser_sources.add_argument("--list", default=False, action="store_true", + help="List all sources") # For all parsers main_parser.add_argument("--caldera_url", help="caldera url, including port", default="http://localhost:8888/") @@ -213,5 +285,8 @@ if __name__ == "__main__": attack_logger = AttackLog(args.verbose) caldera_control = CalderaControl(args.caldera_url, attack_logger, config=None, apikey=args.apikey) print("Caldera Control ready") - - str(args.func(caldera_control, args)) + try: + str(args.func(caldera_control, args)) + except CmdlineArgumentException as ex: + parser.print_help() + print(f"\nCommandline error: {ex}")