From 8253f543266166042a7a874d0c1f7cb54920d9a3 Mon Sep 17 00:00:00 2001 From: Thorsten Sick Date: Thu, 11 Nov 2021 16:08:11 +0100 Subject: [PATCH] Made pylint happy --- app/config_verifier.py | 66 ++++++++++++++++++++++++++++++------ app/experimentcontrol.py | 24 ++++++------- app/metasploit.py | 6 ++-- app/pluginmanager.py | 3 +- plugins/base/plugin_base.py | 4 +-- plugins/base/ssh_features.py | 2 +- pydantic_test.py | 20 ++++++++--- pylint.rc | 22 +++++++++++- 8 files changed, 111 insertions(+), 36 deletions(-) diff --git a/app/config_verifier.py b/app/config_verifier.py index 1292892..3bdf90f 100644 --- a/app/config_verifier.py +++ b/app/config_verifier.py @@ -2,27 +2,36 @@ """ Pydantic verifier for config structure """ -from pydantic.dataclasses import dataclass -from pydantic import conlist -from typing import Optional from enum import Enum +from typing import Optional +from pydantic.dataclasses import dataclass +from pydantic import conlist # pylint: disable=no-name-in-module + +# TODO: Move from has_key to iterators and "is in" class OSEnum(str, Enum): - linux = "linux" - windows = "windows" + """ List of all supported OS-es """ + LINUX = "linux" + WINDOWS = "windows" class VMControllerTypeEnum(str, Enum): - vagrant = "vagrant" - running_vm = "running_vm" + """ List of all supported controlled plugins. This is only done for VM controller plugins ! + I do not expect many new ones. And typos in config can be a waste of time. Let's see if I am right. """ + VAGRANT = "vagrant" + RUNNING_VM = "running_vm" @dataclass class CalderaConfig: + """ Configuration for the Caldera server """ apikey: str def has_key(self, keyname): + """ Checks if a key exists + Required for compatibility with DotMap which is used in Unit tests + """ if keyname in self.__dict__.keys(): return True return False @@ -30,11 +39,15 @@ class CalderaConfig: @dataclass class VMController: + """ Configuration for the VM controller """ vm_type: VMControllerTypeEnum vagrantfilepath: str - ip: Optional[str] = "" + ip: Optional[str] = "" # pylint: disable=invalid-name def has_key(self, keyname): + """ Checks if a key exists + Required for compatibility with DotMap which is used in Unit tests + """ if keyname in self.__dict__.keys(): return True return False @@ -47,21 +60,28 @@ class VMController: @dataclass class Attacker: + """ Configuration for a attacker VM """ name: str vm_controller: VMController vm_name: str nicknames: Optional[list[str]] machinepath: str - os: OSEnum + os: OSEnum # pylint: disable=invalid-name use_existing_machine: bool = False playground: Optional[str] = None def has_key(self, keyname): + """ Checks if a key exists + Required for compatibility with DotMap which is used in Unit tests + """ if keyname in self.__dict__.keys(): return True return False def get(self, keyname, default=None): + """ Returns the value of a specific key + Required for compatibility with DotMap which is used in Unit tests + """ if self.has_key(keyname): return self.__dict__[keyname] return default @@ -69,10 +89,11 @@ class Attacker: @dataclass class Target: + """ Configuration for a target VM """ name: str vm_controller: VMController vm_name: str - os: OSEnum + os: OSEnum # pylint: disable=invalid-name paw: str group: str machinepath: str @@ -88,11 +109,17 @@ class Target: vulnerabilities: list[str] = None def has_key(self, keyname): + """ Checks if a key exists + Required for compatibility with DotMap which is used in Unit tests + """ if keyname in self.__dict__.keys(): return True return False def get(self, keyname, default=None): + """ Returns the value of a specific key + Required for compatibility with DotMap which is used in Unit tests + """ if self.has_key(keyname): return self.__dict__[keyname] return default @@ -100,11 +127,15 @@ class Target: @dataclass class AttackConfig: + """ Generic configuration for attacks """ caldera_obfuscator: str = "plain-text" caldera_jitter: str = "4/8" nap_time: int = 5 def has_key(self, keyname): + """ Checks if a key exists + Required for compatibility with DotMap which is used in Unit tests + """ if keyname in self.__dict__.keys(): return True return False @@ -112,15 +143,22 @@ class AttackConfig: @dataclass class AttackList: + """ A list of attacks to run. Either plugin based or caldera based """ linux: Optional[list[str]] windows: Optional[list[str]] def has_key(self, keyname): + """ Checks if a key exists + Required for compatibility with DotMap which is used in Unit tests + """ if keyname in self.__dict__.keys(): return True return False def get(self, keyname, default=None): + """ Returns the value of a specific key + Required for compatibility with DotMap which is used in Unit tests + """ if self.has_key(keyname): return self.__dict__[keyname] return default @@ -128,9 +166,13 @@ class AttackList: @dataclass class Results: + """ What to do with the results """ loot_dir: str def has_key(self, keyname): + """ Checks if a key exists + Required for compatibility with DotMap which is used in Unit tests + """ if keyname in self.__dict__.keys(): return True return False @@ -138,6 +180,7 @@ class Results: @dataclass class MainConfig: + """ Central configuration for PurpleDome """ caldera: CalderaConfig attackers: conlist(Attacker, min_items=1) targets: conlist(Target, min_items=1) @@ -151,6 +194,9 @@ class MainConfig: sensor_conf: Optional[dict] def has_key(self, keyname): + """ Checks if a key exists + Required for compatibility with DotMap which is used in Unit tests + """ if keyname in self.__dict__.keys(): return True return False diff --git a/app/experimentcontrol.py b/app/experimentcontrol.py index 571c2d5..811db63 100644 --- a/app/experimentcontrol.py +++ b/app/experimentcontrol.py @@ -279,18 +279,18 @@ class Experiment(): defaultname = os.path.join(self.lootdir, "..", "most_recent.zip") shutil.copyfile(filename, defaultname) - @staticmethod - def __get_results_files(root): - """ Yields a list of potential result files - - @param root: Root dir of the machine to collect data from - """ - # TODO: Properly implement. Get proper root parameter - - total = [os.path.join(root, "logstash", "filebeat.json")] - for a_file in total: - if os.path.exists(a_file): - yield a_file + # @staticmethod + # def __get_results_files(root): + # """ Yields a list of potential result files + + # @param root: Root dir of the machine to collect data from + # """ + # # TODO: Properly implement. Get proper root parameter + + # total = [os.path.join(root, "logstash", "filebeat.json")] + # for a_file in total: + # if os.path.exists(a_file): + # yield a_file # def __clean_result_files(self, root): # """ Deletes result files diff --git a/app/metasploit.py b/app/metasploit.py index 98bea06..cb323c1 100644 --- a/app/metasploit.py +++ b/app/metasploit.py @@ -54,7 +54,7 @@ class Metasploit(): exp = self.get_client().modules.use('exploit', exploit) # print(exploit.description) # print(exploit.missing_required) - pl = self.get_client().modules.use('payload', payload) + pl = self.get_client().modules.use('payload', payload) # pylint: disable=invalid-name # print(payload.description) # print(payload.missing_required) if lhost is None: @@ -208,7 +208,7 @@ class Metasploit(): if payload_type is None: raise MetasploitError("Payload not defined") try: - ip = socket.gethostbyname(self.attacker.get_ip()) + ip = socket.gethostbyname(self.attacker.get_ip()) # pylint: disable=invalid-name self.start_exploit_stub_for_external_payload(payload_type, lhost=kwargs.get("lhost", ip)) self.wait_for_session(2) except MetasploitError: @@ -459,7 +459,7 @@ class MetasploitInstant(Metasploit): description = "Migrating to another process can escalate privileges, move the meterpreter to a long running process or evade detection. For that the Meterpreter stub is injected into another process and the new stub then connects to the Metasploit server instead of the old one." process_list = self.ps_process_discovery(target) - ps = self.parse_ps(process_list[0]) + ps = self.parse_ps(process_list[0]) # pylint: disable=invalid-name filtered_list = self.filter_ps_results(ps, user, name, arch) if len(filtered_list) == 0: diff --git a/app/pluginmanager.py b/app/pluginmanager.py index bc101f7..b783d43 100644 --- a/app/pluginmanager.py +++ b/app/pluginmanager.py @@ -265,8 +265,7 @@ class PluginManager(): if section["name"] == subclass_name: subclass = section["subclass"] if subclass is None: - print("Use proper subclass. Available subclasses are: ") - "\n- ".join(list(sections["name"])) + print("Use proper subclass") plugins = self.get_plugins(subclass, [name]) for plugin in plugins: diff --git a/plugins/base/plugin_base.py b/plugins/base/plugin_base.py index 049a560..3ebcc9d 100644 --- a/plugins/base/plugin_base.py +++ b/plugins/base/plugin_base.py @@ -29,12 +29,12 @@ class BasePlugin(): def get_filename(self): """ Returns the current filename. """ - cf = currentframe() + cf = currentframe() # pylint: disable=invalid-name return cf.f_back.filename def get_linenumber(self): """ Returns the current linenumber. """ - cf = currentframe() + cf = currentframe() # pylint: disable=invalid-name return cf.f_back.f_lineno def get_playground(self): diff --git a/plugins/base/ssh_features.py b/plugins/base/ssh_features.py index 949682e..bf4789c 100644 --- a/plugins/base/ssh_features.py +++ b/plugins/base/ssh_features.py @@ -167,7 +167,7 @@ class SSHFeatures(BasePlugin): do_retry = False try: res = self.connection.get(src, dst) - except (paramiko.ssh_exception.NoValidConnectionsError, UnexpectedExit) as error: + except (UnexpectedExit) as error: if retry <= 0: raise NetworkError from error do_retry = True diff --git a/pydantic_test.py b/pydantic_test.py index 5f7bf74..83729ad 100755 --- a/pydantic_test.py +++ b/pydantic_test.py @@ -1,11 +1,16 @@ #!/usr/bin/env python3 +""" A command line tool to verify PurpleDome configuration files """ + import argparse +from pprint import pprint +import sys import yaml from app.config_verifier import MainConfig def load(filename): + """ Loads the config file and feeds it into the built in verifier """ with open(filename) as fh: data = yaml.safe_load(fh) return MainConfig(**data) @@ -22,8 +27,13 @@ def create_parser(): if __name__ == "__main__": arguments = create_parser().parse_args() - r = load(arguments.filename) - print(r) - print(r.caldera.apikey) - # print(r.blarg) - print(dir(r.__dict__)) + try: + r = load(arguments.filename) + except TypeError as e: + print("Config file has error(s):") + print(e) + sys.exit(1) + print("Loaded successfully: ") + pprint(r) + + sys.exit(0) diff --git a/pylint.rc b/pylint.rc index f4a5b93..6e884db 100644 --- a/pylint.rc +++ b/pylint.rc @@ -140,7 +140,27 @@ disable=print-statement, exception-escape, comprehension-escape, line-too-long, - fixme + fixme, +# Added: no-self-use: We do not want stray external functions. And decorating it is not worth the mess + no-self-use, +# too-many-arguments: Must be fixed sooner or later in a refactoring. Not now + too-many-arguments, +# too-many-public-methods: Must be fixed sooner or later in a refactoring. Not now + too-many-public-methods, +# too-many-locals: Must be fixed sooner or later in a refactoring. Not now + too-many-locals, +# too-many-instance-attributes: Must be fixed sooner or later in a refactoring. Not now + too-many-instance-attributes, +# too-many-branches: Must be fixed sooner or later in a refactoring. Not now + too-many-branches, +# too-many-statements: Must be fixed sooner or later in a refactoring. Not now + too-many-statements, +# too-many-nested-blocks: Must be fixed sooner or later in a refactoring. Not now + too-many-nested-blocks, +# duplicate lines in files. Removed from the standard RC file. Will be used in a stricter one for manual code reviews + R0801 + + # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option