Type checking 2

pull/12/head
Thorsten Sick 3 years ago
parent 8d9e1b025b
commit 442c89e8c6

@ -23,4 +23,4 @@ mypy:
# Fixing mypy file by file
stepbystep:
mypy --strict-optional plugins/base/plugin_base.py plugins/base/machinery.py app/config.py plugins/base/caldera.py plugins/base/attack.py plugins/base/sensor.py plugins/base/ssh_features.py plugins/base/vulnerability_plugin.py
mypy --strict-optional plugins/base/plugin_base.py plugins/base/machinery.py app/config.py plugins/base/caldera.py plugins/base/attack.py plugins/base/sensor.py plugins/base/ssh_features.py plugins/base/vulnerability_plugin.py app/attack_log.py app/calderacontrol.py

@ -13,6 +13,8 @@ import simplejson
from app.exceptions import CalderaError
from app.interface_sfx import CommandlineColors
from typing import Optional
# TODO: Ability deserves an own class.
# TODO: Support all Caldera agents: "Sandcat (GoLang)","Elasticat (Blue Python/ Elasticsearch)","Manx (Reverse Shell TCP)","Ragdoll (Python/HTML)"
@ -20,7 +22,7 @@ from app.interface_sfx import CommandlineColors
class CalderaControl():
""" Remote control Caldera through REST api """
def __init__(self, server, attack_logger, config=None, apikey=None):
def __init__(self, server: str, attack_logger, config=None, apikey=None):
"""
@param server: Caldera server url/ip
@ -38,7 +40,7 @@ class CalderaControl():
else:
self.apikey = apikey
def fetch_client(self, platform="windows", file="sandcat.go", target_dir=".", extension=""):
def fetch_client(self, platform: str = "windows", file: str = "sandcat.go", target_dir: str = ".", extension: str = ""):
""" Downloads the appropriate Caldera client
@param platform: Platform to download the agent for
@ -56,7 +58,7 @@ class CalderaControl():
# print(r.headers)
return filename
def __contact_server__(self, payload, rest_path="api/rest", method="post"):
def __contact_server__(self, payload, rest_path: str = "api/rest", method: str = "post"):
"""
@param payload: payload as dict to send to the server
@ -78,7 +80,7 @@ class CalderaControl():
raise ValueError
try:
res = request.json()
except simplejson.errors.JSONDecodeError as exception:
except simplejson.errors.JSONDecodeError as exception: # type: ignore
print("!!! Error !!!!")
print(payload)
print(request.text)
@ -88,7 +90,7 @@ class CalderaControl():
return res
# ############## List
def list_links(self, opid):
def list_links(self, opid: str):
""" List links associated with an operation
@param opid: operation id to list links for
@ -98,7 +100,7 @@ class CalderaControl():
"op_id": opid}
return self.__contact_server__(payload)
def list_results(self, linkid):
def list_results(self, linkid: str):
""" List results for a link
@param linkid: ID of the link
@ -143,7 +145,7 @@ class CalderaControl():
facts = self.__contact_server__(payload)
return facts
def list_sources_for_name(self, name):
def list_sources_for_name(self, name: str):
""" List facts in a source pool with a specific name """
for i in self.list_sources():
@ -151,7 +153,7 @@ class CalderaControl():
return i
return None
def list_facts_for_name(self, name):
def list_facts_for_name(self, name: str):
""" Pretty format for facts
@param name: Name of the source ot look into
@ -188,7 +190,7 @@ class CalderaControl():
# ######### Get one specific item
def get_operation(self, name):
def get_operation(self, name: str):
""" Gets an operation by name
@param name: Name of the operation to look for
@ -199,7 +201,7 @@ class CalderaControl():
return operation
return None
def get_adversary(self, name):
def get_adversary(self, name: str):
""" Gets a specific adversary by name
@param name: Name to look for
@ -209,7 +211,7 @@ class CalderaControl():
return adversary
return None
def get_objective(self, name):
def get_objective(self, name: str):
""" Returns an objective with a given name
@param name: Name to filter for
@ -221,7 +223,7 @@ class CalderaControl():
# ######### Get by id
def get_source(self, source_name):
def get_source(self, source_name: str):
""" Retrieves data source and detailed facts
@param: The name of the source
@ -231,7 +233,7 @@ class CalderaControl():
"name": source_name}
return self.__contact_server__(payload)
def get_ability(self, abid):
def get_ability(self, abid: str):
"""" Return an ability by id
@param abid: Ability id
@ -258,7 +260,7 @@ class CalderaControl():
return True
return False
def get_operation_by_id(self, op_id):
def get_operation_by_id(self, op_id: str):
""" Get operation by id
@param op_id: Operation id
@ -267,7 +269,7 @@ class CalderaControl():
"id": op_id}
return self.__contact_server__(payload)
def get_result_by_id(self, linkid):
def get_result_by_id(self, linkid: str):
""" Get the result from a link id
@param linkid: link id
@ -276,7 +278,7 @@ class CalderaControl():
"link_id": linkid}
return self.__contact_server__(payload)
def get_linkid(self, op_id, paw, ability_id):
def get_linkid(self, op_id: str, paw: str, ability_id: str):
""" Get the id of a link identified by paw and ability_id
@param op_id: Operation id
@ -296,7 +298,7 @@ class CalderaControl():
# ######### View
def view_operation_report(self, opid):
def view_operation_report(self, opid: str):
""" views the operation report
@param opid: Operation id to look for
@ -310,7 +312,7 @@ class CalderaControl():
}
return self.__contact_server__(payload)
def view_operation_output(self, opid, paw, ability_id):
def view_operation_output(self, opid: str, paw: str, ability_id: str):
""" Gets the output of an executed ability
@param opid: Id of the operation to look for
@ -336,7 +338,7 @@ class CalderaControl():
# ######### Add
def add_sources(self, name, parameters):
def add_sources(self, name: str, parameters):
""" Adds a data source and seeds it with facts """
payload = {"index": "sources",
@ -350,12 +352,14 @@ class CalderaControl():
if parameters is not None:
for key, value in parameters.items():
facts.append({"trait": key, "value": value})
payload["facts"] = facts
# TODO: We need something better than a dict here as payload to have strong typing
payload["facts"] = facts # type: ignore
print(payload)
return self.__contact_server__(payload, method="put")
def add_operation(self, name, advid, group="red", state="running", obfuscator="plain-text", jitter='4/8', parameters=None):
def add_operation(self, name: str, advid: str, group: str = "red", state: str = "running", obfuscator: str = "plain-text", jitter: str = '4/8', parameters=None):
""" Adds a new operation
@param name: Name of the operation
@ -393,7 +397,7 @@ class CalderaControl():
return self.__contact_server__(payload, method="put")
def add_adversary(self, name, ability, description="created automatically"):
def add_adversary(self, name: str, ability: str, description: str = "created automatically"):
""" Adds a new adversary
@param name: Name of the adversary
@ -421,7 +425,7 @@ class CalderaControl():
# TODO View the abilities a given agent could execute. curl -H "key:$API_KEY" -X POST localhost:8888/plugin/access/abilities -d '{"paw":"$PAW"}'
def execute_ability(self, paw, ability_id, obfuscator="plain-text", parameters=None):
def execute_ability(self, paw: str, ability_id: str, obfuscator: str = "plain-text", parameters=None):
""" Executes an ability on a target. This happens outside of the scop of an operation. You will get no result of the ability back
@param paw: Paw of the target
@ -441,13 +445,15 @@ class CalderaControl():
if parameters is not None:
for key, value in parameters.items():
facts.append({"trait": key, "value": value})
payload["facts"] = facts
print(payload)
# TODO. We need something better than a dict here for strong typing
payload["facts"] = facts # type: ignore
# print(payload)
return self.__contact_server__(payload, rest_path="plugin/access/exploit_ex")
def execute_operation(self, operation_id, state="running"):
def execute_operation(self, operation_id: str, state: str = "running"):
""" Executes an operation on a server
@param operation_id: The operation to modify
@ -468,7 +474,7 @@ class CalderaControl():
# ######### Delete
# curl -X DELETE http://localhost:8888/api/rest -d '{"index":"operations","id":"$operation_id"}'
def delete_operation(self, opid):
def delete_operation(self, opid: str):
""" Delete operation by id
@param opid: Operation id
@ -477,7 +483,7 @@ class CalderaControl():
"id": opid}
return self.__contact_server__(payload, method="delete")
def delete_adversary(self, adid):
def delete_adversary(self, adid: str):
""" Delete adversary by id
@param adid: Adversary id
@ -486,7 +492,7 @@ class CalderaControl():
"adversary_id": [{"adversary_id": adid}]}
return self.__contact_server__(payload, method="delete")
def delete_agent(self, paw):
def delete_agent(self, paw: str):
""" Delete a specific agent from the kali db. implant may still be running and reconnect
@param paw: The Id of the agent to delete
@ -495,7 +501,7 @@ class CalderaControl():
"paw": paw}
return self.__contact_server__(payload, method="delete")
def kill_agent(self, paw):
def kill_agent(self, paw: str):
""" Send a message to an agent to kill itself
@param paw: The Id of the agent to delete
@ -529,7 +535,7 @@ class CalderaControl():
# Link, chain and stuff
def is_operation_finished(self, opid, debug=False):
def is_operation_finished(self, opid: str, debug: bool = False):
""" Checks if an operation finished - finished is not necessary successful !
@param opid: Operation id to check
@ -559,7 +565,7 @@ class CalderaControl():
return False
def is_operation_finished_multi(self, opid):
def is_operation_finished_multi(self, opid: str):
""" Checks if an operation finished - finished is not necessary successful ! On several targets.
All links (~ abilities) on all targets must have the status 0 for this to be True.
@ -589,7 +595,8 @@ class CalderaControl():
# ######## All inclusive methods
def attack(self, paw="kickme", ability_id="bd527b63-9f9e-46e0-9816-b8434d2b8989", group="red", target_platform=None, parameters=None, **kwargs):
def attack(self, paw: str = "kickme", ability_id: str = "bd527b63-9f9e-46e0-9816-b8434d2b8989",
group: str = "red", target_platform: Optional[str] = None, parameters: Optional[str] = None, **kwargs):
""" Attacks a system and returns results
@param paw: Paw to attack

@ -7,7 +7,7 @@ from app.exceptions import PluginError, ConfigurationError
from app.calderacontrol import CalderaControl
# from app.metasploit import MSFVenom, Metasploit
from typing import Optional
from machinery import MachineryPlugin
from plugins.base.machinery import MachineryPlugin
class AttackPlugin(BasePlugin):

@ -5,6 +5,7 @@
from plugins.base.plugin_base import BasePlugin
from typing import Optional
class VulnerabilityPlugin(BasePlugin):
""" A plugin that installs a vulnerable application or does vulnerable configuration changes on the target VM
"""

Loading…
Cancel
Save