More caldera features supported

caldera_4
Thorsten Sick 2 years ago
parent e49e8abd45
commit 0aa5a4ed8a

@ -123,12 +123,14 @@ class Fact:
score: int
limit_count: int
relationships: list[str]
technique_id: str
collected_by: str
source: str
trait: str
links: list[str]
created: str
origin_type: Optional[str] = None
value: Optional[str] = None
technique_id: Optional[str] = None
collected_by: Optional[str] = None
@dataclass
@ -208,10 +210,98 @@ class Agent:
pending_contact: str
privilege: Optional[str] = None # Error, not documented
@dataclass
class AgentList:
agents: conlist(Agent, min_items=1)
@dataclass
class Rule:
match: str
trait: str
action: Optional[str] = None
@dataclass
class Adjustment:
offset: int
trait: str
value: str
ability_id: str
@dataclass
class Source:
name: str
plugin: str
facts: list[Fact]
rules: list[Rule]
relationships: list[Relationship]
id: str
adjustments: Optional[list[Adjustment]] = None
@dataclass
class SourceList:
sources: list[Source]
@dataclass
class Planner:
module: str
name: str
plugin: str
id: str
stopping_conditions: list[Fact]
params: dict
ignore_enforcement_module: list[str]
description: str
allow_repeatable_abilities: bool
@dataclass
class Goal:
target: str
count: int
achieved: bool
operator: str
value: str
@dataclass
class Objective:
percentage: int
name: str
goals: list[Goal]
description: str
id: str
@dataclass
class Operation:
obfuscator: str
state: str
jitter: str
autonomous: int
name: str
source: Source
adversary: Adversary
objective: Objective
host_group: list[Agent]
start: str
group: str
use_learning_parsers: bool
planner: Planner
visibility: int
id: str
auto_close: bool
@dataclass
class OperationList:
operations: conlist(Operation)
class CalderaControl():
""" Remote control Caldera through REST api """
@ -256,7 +346,16 @@ class CalderaControl():
else:
raise ValueError
try:
res = request.json()
if request.status_code == 200:
res = request.json()
# Comment: Sometimes we get a 204: succcess, but not content in response
elif request.status_code == 204:
res = {"result": "ok",
"http_status_code": 204}
else:
print(f"Status code: {request.status_code}")
res = request.json()
except simplejson.errors.JSONDecodeError as exception: # type: ignore
print("!!! Error !!!!")
print(payload)
@ -291,6 +390,24 @@ class CalderaControl():
adversaries = AdversaryList(**data)
return adversaries
def list_sources(self):
""" Return all sources """
payload = None
data = {"sources": self.__contact_server__(payload, method="get", rest_path="api/v2/sources")}
print(data)
sources = SourceList(**data)
return sources
def list_operations(self):
""" Return all operations """
payload = None
data = {"operations": self.__contact_server__(payload, method="get", rest_path="api/v2/operations")}
print(data)
operations = OperationList(**data)
return operations
def list_agents(self):
""" Return all agents """
@ -300,6 +417,67 @@ class CalderaControl():
agents = AgentList(**data)
return agents
# TODO: list_sources
# TODO: list_sources_for_name
# TODO: list_facts_for_name
# TODO: list_paws_of_running_agents
# TODO: list_objectives
# TODO: get_operation
# TODO: get_adversary
# TODO: get_source
# TODO: get_ability
# TODO: does_ability_support_platform
# TODO: get_operation_by_id
# TODO: view_operation_report
# TODO: view_operation_output
# TODO: add_sources
# TODO: add_operation
# TODO: execute_operation
# TODO: delete_operation
# TODO: delete_agent
# TODO: kill_agent
# TODO is_operation_finished
# TODO: attack
def add_adversary(self, name: str, ability: str, description: str = "created automatically"):
payload = {
# "adversary_id": "string",
"atomic_ordering": [
ability
],
"name": name,
# "plugin": "string",
"objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc', # default objective
# "tags": [
# "string"
# ],
"description": description
}
data = {"agents": self.__contact_server__(payload, method="post", rest_path="api/v2/adversaries")}
print(data)
# agents = AgentList(**data)
return data
def delete_adversary(self, adversary_id: str):
payload = None
data = {"agents": self.__contact_server__(payload, method="delete", rest_path=f"api/v2/adversaries/{adversary_id}")}
print(data)
# agents = AgentList(**data)
return data
def add_operations(self, adversary_id):
payload = {
"adversary": {"adversary_id": adversary_id},
"planner": {"id": "foo"},
"source": {"id": "foo"}
}
data = {"agents": self.__contact_server__(payload, method="post", rest_path="api/v2/operations")}
print(data)
# agents = AgentList(**data)
return data
def get_ability(self, abid: str):
"""" Return an ability by id

@ -11,6 +11,9 @@ from pprint import pprint
from app.attack_log import AttackLog
class CmdlineArgumentException(Exception):
pass
# https://caldera.readthedocs.io/en/latest/The-REST-API.html
# TODO: Check if attack is finished
@ -130,6 +133,53 @@ def adversaries(calcontrol, arguments):
for ob in advs:
print(ob)
if arguments.add:
if arguments.ability_id is None:
raise CmdlineArgumentException("Creating an adversary requires an ability id")
if arguments.name is None:
raise CmdlineArgumentException("Creating an adversary requires an adversary name")
res = calcontrol.add_adversary(arguments.name, arguments.ability_id)
if arguments.delete:
if arguments.adversary_id is None:
raise CmdlineArgumentException("Deleting an adversary requires an adversary id")
res = calcontrol.delete_adversary(arguments.adversary_id)
def sources(calcontrol, arguments):
""" Manage sources caldera control
@param calcontrol: Connection to the caldera server
@param arguments: Parser command line arguments
"""
if arguments.list:
srcs = calcontrol.list_sources().__dict__["sources"]
# ob_ids = [aid.ability_id for aid in obfuscators]
# print(ob_ids)
for ob in srcs:
print(ob)
def operations(calcontrol, arguments):
""" Manage operations caldera control
@param calcontrol: Connection to the caldera server
@param arguments: Parser command line arguments
"""
if arguments.list:
ops = calcontrol.list_operations().__dict__["operations"]
# ob_ids = [aid.ability_id for aid in obfuscators]
# print(ob_ids)
for ob in ops:
print(ob)
if arguments.add:
if arguments.adversary_id is None:
raise CmdlineArgumentException("Adding an operation requires an adversary id")
ops = calcontrol.add_operations(arguments.adversary_id)
def attack(calcontrol, arguments):
@ -185,17 +235,39 @@ def create_parser():
parser_facts = subparsers.add_parser("add_facts", help="facts")
parser_facts.set_defaults(func=add_facts)
# Sub parser to list obfuscators
# Sub parser for obfuscators
parser_obfuscators = subparsers.add_parser("obfuscators", help="obfuscators")
parser_obfuscators.set_defaults(func=obfuscators)
parser_obfuscators.add_argument("--list", default=False, action="store_true",
help="List all obfuscators")
# Sub parser to list adversaries
# Sub parser for adversaries
parser_adversaries = subparsers.add_parser("adversaries", help="adversaries")
parser_adversaries.set_defaults(func=adversaries)
parser_adversaries.add_argument("--list", default=False, action="store_true",
help="List all obfuscators")
help="List all adversaries")
parser_adversaries.add_argument("--add", default=False, action="store_true",
help="Add a new adversary")
parser_adversaries.add_argument("--ability_id", "--abid", default=None, help="Ability ID")
parser_adversaries.add_argument("--ability_name", default=None, help="Adversary name")
parser_adversaries.add_argument("--delete", default=False, action="store_true",
help="Delete adversary")
parser_adversaries.add_argument("--adversary_id", "--advid", default=None, help="Adversary ID")
# Sub parser for operations
parser_operations = subparsers.add_parser("operations", help="operations")
parser_operations.set_defaults(func=operations)
parser_operations.add_argument("--list", default=False, action="store_true",
help="List all operations")
parser_operations.add_argument("--add", default=False, action="store_true",
help="Add a new operations")
parser_operations.add_argument("--adversary_id", "--advid", default=None, help="Adversary ID")
# Sub parser for sources
parser_sources = subparsers.add_parser("sources", help="sources")
parser_sources.set_defaults(func=sources)
parser_sources.add_argument("--list", default=False, action="store_true",
help="List all sources")
# For all parsers
main_parser.add_argument("--caldera_url", help="caldera url, including port", default="http://localhost:8888/")
@ -213,5 +285,8 @@ if __name__ == "__main__":
attack_logger = AttackLog(args.verbose)
caldera_control = CalderaControl(args.caldera_url, attack_logger, config=None, apikey=args.apikey)
print("Caldera Control ready")
str(args.func(caldera_control, args))
try:
str(args.func(caldera_control, args))
except CmdlineArgumentException as ex:
parser.print_help()
print(f"\nCommandline error: {ex}")

Loading…
Cancel
Save