Unit tests ok, integration test works

caldera_4
Thorsten Sick 2 years ago
parent 896bfa8ff8
commit c5abcad563

@ -1,8 +1,8 @@
#!/usr/bin/env python3
""" Direct API to the caldera server. Not abstract simplification methods. Compatible with Caldera 2.8.1 """
import requests
import json
import requests
import simplejson

@ -5,9 +5,9 @@
import json
from pprint import pformat
from typing import Optional, Union
import requests
import simplejson
from typing import Optional, Union
from pydantic.dataclasses import dataclass
from pydantic import conlist # pylint: disable=no-name-in-module
@ -47,9 +47,9 @@ class Requirement:
@dataclass
class AdditionalInfo:
additionalProp1: Optional[str] = None
additionalProp2: Optional[str] = None
additionalProp3: Optional[str] = None
additionalProp1: Optional[str] = None # pylint: disable=invalid-name
additionalProp2: Optional[str] = None # pylint: disable=invalid-name
additionalProp3: Optional[str] = None # pylint: disable=invalid-name
@dataclass
@ -71,6 +71,7 @@ class Executor:
@dataclass
class Ability:
""" An ability is an exploit, a TTP, an attack step ...more or less... """
description: str
plugin: str
technique_name: str
@ -90,11 +91,13 @@ class Ability:
@dataclass
class AbilityList:
""" A list of exploits """
abilities: conlist(Ability, min_items=1)
@dataclass
class Obfuscator:
""" An obfuscator hides the attack by encryption/encoding """
description: str
name: str
module: Optional[str] = None # Documentation error !!!
@ -102,11 +105,13 @@ class Obfuscator:
@dataclass
class ObfuscatorList:
""" A list of obfuscators """
obfuscators: conlist(Obfuscator, min_items=1)
@dataclass
class Adversary:
""" An adversary is a defined attacker """
has_repeatable_abilities: bool
adversary_id: str
description: str
@ -119,6 +124,7 @@ class Adversary:
@dataclass
class AdversaryList:
""" A list of adversary """
adversaries: conlist(Adversary, min_items=1)
@ -173,7 +179,7 @@ class Link:
used: list[Fact]
facts: list[Fact]
agent_reported_time: str
id: str
id: str # pylint: disable=invalid-name
collect: str
command: str
cleanup: int
@ -184,6 +190,7 @@ class Link:
@dataclass
class Agent:
""" A representation of an agent on the target (agent = implant) """
paw: str
location: str
platform: str
@ -218,6 +225,7 @@ class Agent:
@dataclass
class AgentList:
""" A list of agents """
agents: conlist(Agent)
@ -243,7 +251,7 @@ class Source:
facts: list[Fact]
rules: list[Rule]
relationships: list[Relationship]
id: str
id: str # pylint: disable=invalid-name
adjustments: Optional[list[Adjustment]] = None
@ -254,9 +262,10 @@ class SourceList:
@dataclass
class Planner:
""" A logic defining the order in which attack steps are executed """
name: str
plugin: str
id: str
id: str # pylint: disable=invalid-name
stopping_conditions: list[Fact]
params: dict
description: str
@ -286,11 +295,12 @@ class Objective:
name: str
goals: list[Goal]
description: str
id: str
id: str # pylint: disable=invalid-name
@dataclass
class Operation:
""" An attack operation collecting all the relevant items (obfuscator, adversary, planner) """
obfuscator: str
state: str
jitter: str
@ -305,7 +315,7 @@ class Operation:
use_learning_parsers: bool
planner: Planner
visibility: int
id: str
id: str # pylint: disable=invalid-name
auto_close: bool
chain: Optional[list] = None
@ -533,11 +543,11 @@ class CalderaControl():
operations = OperationList(**data)
return operations
def delete_operation(self, id):
def delete_operation(self, operation_id):
payload = {}
data = self.__contact_server__(payload, method="delete", rest_path=f"api/v2/operations/{id}")
data = self.__contact_server__(payload, method="delete", rest_path=f"api/v2/operations/{operation_id}")
return data
@ -554,7 +564,7 @@ class CalderaControl():
with open("debug_removeme.txt", "wt") as fh:
fh.write(pformat(self.list_abilities()))
for ability in self.list_abilities():
for ability in self.list_abilities()["abilities"]:
if ability.get("ability_id", None) == abid or ability.get("auto_generated_guid", None) == abid:
res.append(ability)
return res

@ -21,16 +21,6 @@ from app.calderaapi_2 import CalderaAPI
class CalderaControl(CalderaAPI):
""" Remote control Caldera through REST api """
def __init__(self, server: str, attack_logger, config=None, apikey=None):
"""
@param server: Caldera server url/ip
@param attack_logger: The attack logger to use
@param config: The configuration
"""
super().__init__(server, attack_logger, config, apikey)
def fetch_client(self, platform: str = "windows", file: str = "sandcat.go", target_dir: str = ".", extension: str = ""):
""" Downloads the appropriate Caldera client
@ -157,12 +147,12 @@ class CalderaControl(CalderaAPI):
@param op_id: Operation id
"""
ops = self.list_operations()
operations = self.list_operations()
if ops is not None:
for op in ops:
if op["id"] == op_id:
return [op]
if operations is not None:
for an_operation in operations:
if an_operation["id"] == op_id:
return [an_operation]
return []
def get_linkid(self, op_id: str, paw: str, ability_id: str):

@ -3,16 +3,17 @@
""" A command line tool to control a caldera server """
import argparse
from pprint import pprint
# from app.calderacontrol import CalderaControl
from app.calderaapi_4 import CalderaControl
from pprint import pprint
from app.attack_log import AttackLog
class CmdlineArgumentException(Exception):
pass
""" An error in the user supplied command line """
# https://caldera.readthedocs.io/en/latest/The-REST-API.html
@ -30,8 +31,7 @@ def agents(calcontrol, arguments): # pylint: disable=unused-argument
"""
if arguments.list:
agents = calcontrol.list_agents().__dict__["agents"]
print(agents)
print(calcontrol.list_agents().__dict__["agents"])
if arguments.delete:
print(calcontrol.delete_agent(arguments.paw))
if arguments.kill:
@ -82,8 +82,8 @@ def list_abilities(calcontrol, arguments):
for abi in abilities:
for executor in abi.executors:
for parser in executor.parsers:
pprint(parser.relationships)
for a_parser in executor.parsers:
pprint(a_parser.relationships)
def obfuscators(calcontrol, arguments):
@ -98,8 +98,8 @@ def obfuscators(calcontrol, arguments):
# ob_ids = [aid.ability_id for aid in obfuscators]
# print(ob_ids)
for ob in obfs:
print(ob)
for obfuscator in obfs:
print(obfuscator)
def objectives(calcontrol, arguments):
@ -110,12 +110,8 @@ def objectives(calcontrol, arguments):
"""
if arguments.list:
objectives = calcontrol.list_objectives().__dict__["objectives"]
# ob_ids = [aid.ability_id for aid in objectives]
# print(ob_ids)
for ob in objectives:
print(ob)
for objective in calcontrol.list_objectives().__dict__["objectives"]:
print(objective)
def adversaries(calcontrol, arguments):
@ -126,12 +122,8 @@ def adversaries(calcontrol, arguments):
"""
if arguments.list:
advs = calcontrol.list_adversaries().__dict__["adversaries"]
# ob_ids = [aid.ability_id for aid in obfuscators]
# print(ob_ids)
for ob in advs:
print(ob)
for adversary in calcontrol.list_adversaries().__dict__["adversaries"]:
print(adversary)
if arguments.add:
if arguments.ability_id is None:
raise CmdlineArgumentException("Creating an adversary requires an ability id")
@ -152,12 +144,8 @@ def sources(calcontrol, arguments):
"""
if arguments.list:
srcs = calcontrol.list_sources().__dict__["sources"]
# ob_ids = [aid.ability_id for aid in obfuscators]
# print(ob_ids)
for ob in srcs:
print(ob)
for a_source in calcontrol.list_sources().__dict__["sources"]:
print(a_source)
def planners(calcontrol, arguments):
@ -168,12 +156,8 @@ def planners(calcontrol, arguments):
"""
if arguments.list:
plns = calcontrol.list_planners().__dict__["planners"]
# ob_ids = [aid.ability_id for aid in obfuscators]
# print(ob_ids)
for ob in plns:
print(ob)
for a_planner in calcontrol.list_planners().__dict__["planners"]:
print(a_planner)
def operations(calcontrol, arguments):
@ -184,12 +168,8 @@ def operations(calcontrol, arguments):
"""
if arguments.list:
ops = calcontrol.list_operations().__dict__["operations"]
# ob_ids = [aid.ability_id for aid in obfuscators]
# print(ob_ids)
for ob in ops:
print(ob)
for an_operation in calcontrol.list_operations().__dict__["operations"]:
print(an_operation)
if arguments.add:
if arguments.adversary_id is None:

Loading…
Cancel
Save