You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
PurpleDome/app/calderaapi_2.py

283 lines
11 KiB
Python

#!/usr/bin/env python3
""" Direct API to the caldera server. Not abstract simplification methods. Compatible with Caldera 2.8.1 """
import json
from typing import Optional, Any
import requests
import simplejson
from app.attack_log import AttackLog
from app.config import ExperimentConfig
from app.exceptions import ConfigurationError
class CalderaAPI:
""" API to Caldera 2.8.1 """
def __init__(self, server: str, attack_logger: AttackLog, config: Optional[ExperimentConfig] = None, apikey: str = None) -> None:
"""
@param server: Caldera server url/ip
@param attack_logger: The attack logger to use
@param config: The configuration
"""
# print(server)
self.url = server if server.endswith("/") else server + "/"
self.attack_logger = attack_logger
self.config = config
self.apikey: str = ""
if self.config is not None:
self.apikey = self.config.caldera_apikey()
else:
if apikey is None:
raise ConfigurationError("No APIKEY configured")
self.apikey = apikey
def __contact_server__(self, payload, rest_path: str = "api/rest", method: str = "post") -> Any:
"""
@param payload: payload as dict to send to the server
@param rest_path: specific path for this rest api
@param method: http method to use
"""
url = self.url + rest_path
header = {"KEY": self.apikey,
"Content-Type": "application/json"}
if method.lower() == "post":
request = requests.post(url, headers=header, data=json.dumps(payload))
elif method.lower() == "put":
request = requests.put(url, headers=header, data=json.dumps(payload))
elif method.lower() == "get":
request = requests.get(url, headers=header, data=json.dumps(payload))
elif method.lower() == "delete":
request = requests.delete(url, headers=header, data=json.dumps(payload))
else:
raise ValueError
try:
res = request.json()
except simplejson.errors.JSONDecodeError as exception: # type: ignore
print("!!! Error !!!!")
print(payload)
print(request.text)
print("!!! Error !!!!")
raise exception
return res
def list_operations(self) -> Any:
""" Return operations """
payload = {"index": "operations"}
return self.__contact_server__(payload)
def list_abilities(self):
""" Return all ablilities """
# curl -H 'KEY: ADMIN123' http://192.168.178.102:8888/api/rest -H 'Content-Type: application/json' -d '{"index":"abilities"}'
payload = {"index": "abilities"}
return self.__contact_server__(payload)
def list_agents(self):
""" List running agents
"""
# TODO: Add filters for specific platforms/executors : , platform_filter=None, executor_filter=None as parameters
# curl -H 'KEY: ADMIN123' http://192.168.178.102:8888/api/rest -H 'Content-Type: application/json' -d '{"index":"agents"}'
payload = {"index": "agents"}
agents = self.__contact_server__(payload)
return agents
def list_sources(self):
""" List stored facts
"""
# TODO: Add filters for specific platforms/executors : , platform_filter=None, executor_filter=None as parameters
# curl -H 'KEY: ADMIN123' http://192.168.178.102:8888/api/rest -H 'Content-Type: application/json' -d '{"index":"agents"}'
payload = {"index": "sources"}
facts = self.__contact_server__(payload)
return facts
def list_adversaries(self):
""" List registered adversaries """
# curl -H 'KEY: ADMIN123' http://192.168.178.102:8888/api/rest -H 'Content-Type: application/json' -d '{"index":"adversaries"}'
payload = {"index": "adversaries"}
return self.__contact_server__(payload)
def list_objectives(self):
""" List registered objectives """
# curl -H 'KEY: ADMIN123' http://192.168.178.102:8888/api/rest -H 'Content-Type: application/json' -d '{"index":"objectives"}'
payload = {"index": "objectives"}
return self.__contact_server__(payload)
def add_sources(self, name: str, parameters):
""" Adds a data source and seeds it with facts """
payload = {"index": "sources",
"name": name,
# "id": "123456-1234-1234-1234-12345678",
"rules": [],
"relationships": []
}
facts = []
if parameters is not None:
for key, value in parameters.items():
facts.append({"trait": key, "value": value})
# TODO: We need something better than a dict here as payload to have strong typing
payload["facts"] = facts # type: ignore
print(payload)
return self.__contact_server__(payload, method="put")
def add_operation(self, **kwargs):
""" Adds a new operation
@param name: Name of the operation
@param advid: Adversary id
@param group: agent group to attack
@param state: state to initially set
@param obfuscator: obfuscator to use for the attack
@param jitter: jitter to use for the attack
@param parameters: parameters to pass to the ability
"""
# name: str, advid: str, group: str = "red", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8', parameters=None
name: str = kwargs.get("name")
advid: str = kwargs.get("adversary_id")
group: str = kwargs.get("group", "red")
state: str = kwargs.get("state", "running")
obfuscator: str = kwargs.get("obfuscator", "plain-text")
jitter: str = kwargs.get("jitter", "4/8")
parameters = kwargs.get("parameters", None)
# Add operation: curl -X PUT -H "KEY:$KEY" http://127.0.0.1:8888/api/rest -d '{"index":"operations","name":"testoperation1"}'
# observed from GUI sniffing: PUT {'name': 'schnuffel2', 'group': 'red', 'adversary_id': '0f4c3c67-845e-49a0-927e-90ed33c044e0', 'state': 'running', 'planner': 'atomic', 'autonomous': '1', 'obfuscator': 'plain-text', 'auto_close': '1', 'jitter': '4/8', 'source': 'Alice Filters', 'visibility': '50'}
sources_name = "source_" + name
self.add_sources(sources_name, parameters)
# To verify:
# print(self.get_source(sources_name))
payload = {"index": "operations",
"name": name,
"state": state,
"autonomous": 1,
'obfuscator': obfuscator,
'auto_close': '1',
'jitter': jitter,
'source': sources_name,
'visibility': '50',
"group": group,
#
"planner": "atomic",
"adversary_id": advid,
}
return self.__contact_server__(payload, method="put")
def view_operation_report(self, opid: str):
""" views the operation report
@param opid: Operation id to look for
"""
# let postData = selectedOperationId ? {'index':'operation_report', 'op_id': selectedOperationId, 'agent_output': Number(agentOutput)} : null;
# checking it (from snifffing protocol at the server): POST {'id': 539687}
payload = {"index": "operation_report",
"op_id": opid,
'agent_output': 1
}
return self.__contact_server__(payload)
def set_operation_state(self, operation_id: str, state: str = "running"):
""" Executes an operation on a server
@param operation_id: The operation to modify
@param state: The state to set this operation into
"""
# TODO: Change state of an operation: curl -X POST -H "KEY:ADMIN123" http://localhost:8888/api/rest -d '{"index":"operation", "op_id":123, "state":"finished"}'
# curl -X POST -H "KEY:ADMIN123" http://localhost:8888/api/rest -d '{"index":"operation", "op_id":123, "state":"finished"}'
if state not in ["running", "finished", "paused", "run_one_link", "cleanup"]:
raise ValueError
payload = {"index": "operation",
"op_id": operation_id,
"state": state}
return self.__contact_server__(payload)
def add_adversary(self, name: str, ability: str, description: str = "created automatically"):
""" Adds a new adversary
@param name: Name of the adversary
@param ability: One ability for this adversary
@param description: Description of this adversary
"""
# Add operation: curl -X PUT -H "KEY:$KEY" http://127.0.0.1:8888/api/rest -d '{"index":"operations","name":"testoperation1"}'
# Sniffed from gui:
# Rest core: PUT adversaries {'name': 'removeme', 'description': 'description', 'atomic_ordering': [{'id': 'bd527b63-9f9e-46e0-9816-b8434d2b8989'}], 'id': '558932cb-3ac6-43d2-b821-2db0fa8ad469', 'objective': ''}
# Returns: [{'name': 'removeme', 'adversary_id': '558932cb-3ac6-43d2-b821-2db0fa8ad469', 'description': 'description', 'tags': [], 'atomic_ordering': ['bd527b63-9f9e-46e0-9816-b8434d2b8989'], 'objective': '495a9828-cab1-44dd-a0ca-66e58177d8cc'}]
payload = {"index": "adversaries",
"name": name,
"description": description,
"atomic_ordering": [{"id": ability}],
#
"objective": '495a9828-cab1-44dd-a0ca-66e58177d8cc' # default objective
# "objective": ''
}
return self.__contact_server__(payload, method="put")
# curl -X DELETE http://localhost:8888/api/rest -d '{"index":"operations","id":"$operation_id"}'
def delete_operation(self, opid: str):
""" Delete operation by id
@param opid: Operation id
"""
payload = {"index": "operations",
"id": opid}
return self.__contact_server__(payload, method="delete")
def delete_adversary(self, adid: str):
""" Delete adversary by id
@param adid: Adversary id
"""
payload = {"index": "adversaries",
"adversary_id": [{"adversary_id": adid}]}
return self.__contact_server__(payload, method="delete")
def delete_agent(self, paw: str):
""" Delete a specific agent from the kali db. implant may still be running and reconnect
@param paw: The Id of the agent to delete
"""
payload = {"index": "adversaries",
"paw": paw}
return self.__contact_server__(payload, method="delete")
def kill_agent(self, paw: str):
""" Send a message to an agent to kill itself
@param paw: The Id of the agent to delete
"""
payload = {"index": "agents",
"paw": paw,
"watchdog": 1,
"sleep_min": 3,
"sleep_max": 3}
return self.__contact_server__(payload, method="put")