Made pylint happy

pull/14/head
Thorsten Sick 3 years ago
parent 97c5277062
commit 8253f54326

@ -2,27 +2,36 @@
""" Pydantic verifier for config structure """ """ Pydantic verifier for config structure """
from pydantic.dataclasses import dataclass
from pydantic import conlist
from typing import Optional
from enum import Enum from enum import Enum
from typing import Optional
from pydantic.dataclasses import dataclass
from pydantic import conlist # pylint: disable=no-name-in-module
# TODO: Move from has_key to iterators and "is in"
class OSEnum(str, Enum): class OSEnum(str, Enum):
linux = "linux" """ List of all supported OS-es """
windows = "windows" LINUX = "linux"
WINDOWS = "windows"
class VMControllerTypeEnum(str, Enum): class VMControllerTypeEnum(str, Enum):
vagrant = "vagrant" """ List of all supported controlled plugins. This is only done for VM controller plugins !
running_vm = "running_vm" I do not expect many new ones. And typos in config can be a waste of time. Let's see if I am right. """
VAGRANT = "vagrant"
RUNNING_VM = "running_vm"
@dataclass @dataclass
class CalderaConfig: class CalderaConfig:
""" Configuration for the Caldera server """
apikey: str apikey: str
def has_key(self, keyname): def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys(): if keyname in self.__dict__.keys():
return True return True
return False return False
@ -30,11 +39,15 @@ class CalderaConfig:
@dataclass @dataclass
class VMController: class VMController:
""" Configuration for the VM controller """
vm_type: VMControllerTypeEnum vm_type: VMControllerTypeEnum
vagrantfilepath: str vagrantfilepath: str
ip: Optional[str] = "" ip: Optional[str] = "" # pylint: disable=invalid-name
def has_key(self, keyname): def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys(): if keyname in self.__dict__.keys():
return True return True
return False return False
@ -47,21 +60,28 @@ class VMController:
@dataclass @dataclass
class Attacker: class Attacker:
""" Configuration for a attacker VM """
name: str name: str
vm_controller: VMController vm_controller: VMController
vm_name: str vm_name: str
nicknames: Optional[list[str]] nicknames: Optional[list[str]]
machinepath: str machinepath: str
os: OSEnum os: OSEnum # pylint: disable=invalid-name
use_existing_machine: bool = False use_existing_machine: bool = False
playground: Optional[str] = None playground: Optional[str] = None
def has_key(self, keyname): def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys(): if keyname in self.__dict__.keys():
return True return True
return False return False
def get(self, keyname, default=None): def get(self, keyname, default=None):
""" Returns the value of a specific key
Required for compatibility with DotMap which is used in Unit tests
"""
if self.has_key(keyname): if self.has_key(keyname):
return self.__dict__[keyname] return self.__dict__[keyname]
return default return default
@ -69,10 +89,11 @@ class Attacker:
@dataclass @dataclass
class Target: class Target:
""" Configuration for a target VM """
name: str name: str
vm_controller: VMController vm_controller: VMController
vm_name: str vm_name: str
os: OSEnum os: OSEnum # pylint: disable=invalid-name
paw: str paw: str
group: str group: str
machinepath: str machinepath: str
@ -88,11 +109,17 @@ class Target:
vulnerabilities: list[str] = None vulnerabilities: list[str] = None
def has_key(self, keyname): def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys(): if keyname in self.__dict__.keys():
return True return True
return False return False
def get(self, keyname, default=None): def get(self, keyname, default=None):
""" Returns the value of a specific key
Required for compatibility with DotMap which is used in Unit tests
"""
if self.has_key(keyname): if self.has_key(keyname):
return self.__dict__[keyname] return self.__dict__[keyname]
return default return default
@ -100,11 +127,15 @@ class Target:
@dataclass @dataclass
class AttackConfig: class AttackConfig:
""" Generic configuration for attacks """
caldera_obfuscator: str = "plain-text" caldera_obfuscator: str = "plain-text"
caldera_jitter: str = "4/8" caldera_jitter: str = "4/8"
nap_time: int = 5 nap_time: int = 5
def has_key(self, keyname): def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys(): if keyname in self.__dict__.keys():
return True return True
return False return False
@ -112,15 +143,22 @@ class AttackConfig:
@dataclass @dataclass
class AttackList: class AttackList:
""" A list of attacks to run. Either plugin based or caldera based """
linux: Optional[list[str]] linux: Optional[list[str]]
windows: Optional[list[str]] windows: Optional[list[str]]
def has_key(self, keyname): def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys(): if keyname in self.__dict__.keys():
return True return True
return False return False
def get(self, keyname, default=None): def get(self, keyname, default=None):
""" Returns the value of a specific key
Required for compatibility with DotMap which is used in Unit tests
"""
if self.has_key(keyname): if self.has_key(keyname):
return self.__dict__[keyname] return self.__dict__[keyname]
return default return default
@ -128,9 +166,13 @@ class AttackList:
@dataclass @dataclass
class Results: class Results:
""" What to do with the results """
loot_dir: str loot_dir: str
def has_key(self, keyname): def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys(): if keyname in self.__dict__.keys():
return True return True
return False return False
@ -138,6 +180,7 @@ class Results:
@dataclass @dataclass
class MainConfig: class MainConfig:
""" Central configuration for PurpleDome """
caldera: CalderaConfig caldera: CalderaConfig
attackers: conlist(Attacker, min_items=1) attackers: conlist(Attacker, min_items=1)
targets: conlist(Target, min_items=1) targets: conlist(Target, min_items=1)
@ -151,6 +194,9 @@ class MainConfig:
sensor_conf: Optional[dict] sensor_conf: Optional[dict]
def has_key(self, keyname): def has_key(self, keyname):
""" Checks if a key exists
Required for compatibility with DotMap which is used in Unit tests
"""
if keyname in self.__dict__.keys(): if keyname in self.__dict__.keys():
return True return True
return False return False

@ -279,18 +279,18 @@ class Experiment():
defaultname = os.path.join(self.lootdir, "..", "most_recent.zip") defaultname = os.path.join(self.lootdir, "..", "most_recent.zip")
shutil.copyfile(filename, defaultname) shutil.copyfile(filename, defaultname)
@staticmethod # @staticmethod
def __get_results_files(root): # def __get_results_files(root):
""" Yields a list of potential result files # """ Yields a list of potential result files
@param root: Root dir of the machine to collect data from # @param root: Root dir of the machine to collect data from
""" # """
# TODO: Properly implement. Get proper root parameter # # TODO: Properly implement. Get proper root parameter
total = [os.path.join(root, "logstash", "filebeat.json")] # total = [os.path.join(root, "logstash", "filebeat.json")]
for a_file in total: # for a_file in total:
if os.path.exists(a_file): # if os.path.exists(a_file):
yield a_file # yield a_file
# def __clean_result_files(self, root): # def __clean_result_files(self, root):
# """ Deletes result files # """ Deletes result files

@ -54,7 +54,7 @@ class Metasploit():
exp = self.get_client().modules.use('exploit', exploit) exp = self.get_client().modules.use('exploit', exploit)
# print(exploit.description) # print(exploit.description)
# print(exploit.missing_required) # print(exploit.missing_required)
pl = self.get_client().modules.use('payload', payload) pl = self.get_client().modules.use('payload', payload) # pylint: disable=invalid-name
# print(payload.description) # print(payload.description)
# print(payload.missing_required) # print(payload.missing_required)
if lhost is None: if lhost is None:
@ -208,7 +208,7 @@ class Metasploit():
if payload_type is None: if payload_type is None:
raise MetasploitError("Payload not defined") raise MetasploitError("Payload not defined")
try: try:
ip = socket.gethostbyname(self.attacker.get_ip()) ip = socket.gethostbyname(self.attacker.get_ip()) # pylint: disable=invalid-name
self.start_exploit_stub_for_external_payload(payload_type, lhost=kwargs.get("lhost", ip)) self.start_exploit_stub_for_external_payload(payload_type, lhost=kwargs.get("lhost", ip))
self.wait_for_session(2) self.wait_for_session(2)
except MetasploitError: except MetasploitError:
@ -459,7 +459,7 @@ class MetasploitInstant(Metasploit):
description = "Migrating to another process can escalate privileges, move the meterpreter to a long running process or evade detection. For that the Meterpreter stub is injected into another process and the new stub then connects to the Metasploit server instead of the old one." description = "Migrating to another process can escalate privileges, move the meterpreter to a long running process or evade detection. For that the Meterpreter stub is injected into another process and the new stub then connects to the Metasploit server instead of the old one."
process_list = self.ps_process_discovery(target) process_list = self.ps_process_discovery(target)
ps = self.parse_ps(process_list[0]) ps = self.parse_ps(process_list[0]) # pylint: disable=invalid-name
filtered_list = self.filter_ps_results(ps, user, name, arch) filtered_list = self.filter_ps_results(ps, user, name, arch)
if len(filtered_list) == 0: if len(filtered_list) == 0:

@ -265,8 +265,7 @@ class PluginManager():
if section["name"] == subclass_name: if section["name"] == subclass_name:
subclass = section["subclass"] subclass = section["subclass"]
if subclass is None: if subclass is None:
print("Use proper subclass. Available subclasses are: ") print("Use proper subclass")
"\n- ".join(list(sections["name"]))
plugins = self.get_plugins(subclass, [name]) plugins = self.get_plugins(subclass, [name])
for plugin in plugins: for plugin in plugins:

@ -29,12 +29,12 @@ class BasePlugin():
def get_filename(self): def get_filename(self):
""" Returns the current filename. """ """ Returns the current filename. """
cf = currentframe() cf = currentframe() # pylint: disable=invalid-name
return cf.f_back.filename return cf.f_back.filename
def get_linenumber(self): def get_linenumber(self):
""" Returns the current linenumber. """ """ Returns the current linenumber. """
cf = currentframe() cf = currentframe() # pylint: disable=invalid-name
return cf.f_back.f_lineno return cf.f_back.f_lineno
def get_playground(self): def get_playground(self):

@ -167,7 +167,7 @@ class SSHFeatures(BasePlugin):
do_retry = False do_retry = False
try: try:
res = self.connection.get(src, dst) res = self.connection.get(src, dst)
except (paramiko.ssh_exception.NoValidConnectionsError, UnexpectedExit) as error: except (UnexpectedExit) as error:
if retry <= 0: if retry <= 0:
raise NetworkError from error raise NetworkError from error
do_retry = True do_retry = True

@ -1,11 +1,16 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" A command line tool to verify PurpleDome configuration files """
import argparse import argparse
from pprint import pprint
import sys
import yaml import yaml
from app.config_verifier import MainConfig from app.config_verifier import MainConfig
def load(filename): def load(filename):
""" Loads the config file and feeds it into the built in verifier """
with open(filename) as fh: with open(filename) as fh:
data = yaml.safe_load(fh) data = yaml.safe_load(fh)
return MainConfig(**data) return MainConfig(**data)
@ -22,8 +27,13 @@ def create_parser():
if __name__ == "__main__": if __name__ == "__main__":
arguments = create_parser().parse_args() arguments = create_parser().parse_args()
r = load(arguments.filename) try:
print(r) r = load(arguments.filename)
print(r.caldera.apikey) except TypeError as e:
# print(r.blarg) print("Config file has error(s):")
print(dir(r.__dict__)) print(e)
sys.exit(1)
print("Loaded successfully: ")
pprint(r)
sys.exit(0)

@ -140,7 +140,27 @@ disable=print-statement,
exception-escape, exception-escape,
comprehension-escape, comprehension-escape,
line-too-long, line-too-long,
fixme fixme,
# Added: no-self-use: We do not want stray external functions. And decorating it is not worth the mess
no-self-use,
# too-many-arguments: Must be fixed sooner or later in a refactoring. Not now
too-many-arguments,
# too-many-public-methods: Must be fixed sooner or later in a refactoring. Not now
too-many-public-methods,
# too-many-locals: Must be fixed sooner or later in a refactoring. Not now
too-many-locals,
# too-many-instance-attributes: Must be fixed sooner or later in a refactoring. Not now
too-many-instance-attributes,
# too-many-branches: Must be fixed sooner or later in a refactoring. Not now
too-many-branches,
# too-many-statements: Must be fixed sooner or later in a refactoring. Not now
too-many-statements,
# too-many-nested-blocks: Must be fixed sooner or later in a refactoring. Not now
too-many-nested-blocks,
# duplicate lines in files. Removed from the standard RC file. Will be used in a stricter one for manual code reviews
R0801
# Enable the message, report, category or checker with the given id(s). You can # Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option # either give multiple identifier separated by comma (,) or put this option

Loading…
Cancel
Save