From fabce9810445f3bc11e9cf1b38d9b69a58cd5099 Mon Sep 17 00:00:00 2001 From: Ken Evensen Date: Wed, 23 May 2018 09:24:54 -0400 Subject: [PATCH] Pamd++ (#35709) * Cleaner, more pythonic, shorter, easier to maintain * Added validation --- lib/ansible/modules/system/pamd.py | 842 ++++++++++++++----------- test/units/modules/system/test_pamd.py | 528 +++++++++------- 2 files changed, 771 insertions(+), 599 deletions(-) diff --git a/lib/ansible/modules/system/pamd.py b/lib/ansible/modules/system/pamd.py index 9289ec3f603..56a805dc407 100644 --- a/lib/ansible/modules/system/pamd.py +++ b/lib/ansible/modules/system/pamd.py @@ -1,6 +1,6 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -# (c) 2017, Kenneth D. Evensen +# (c) 2017, Kenneth D. Evensen # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) @@ -89,6 +89,14 @@ options: default: /etc/pam.d/ description: - This is the path to the PAM service files + backup: + description: + - Create a backup file including the timestamp information so you can + get the original file back if you somehow clobbered it incorrectly. + type: bool + default: 'no' + version_added: '2.6' + """ EXAMPLES = """ @@ -157,7 +165,8 @@ EXAMPLES = """ - name: Remove specific arguments from a rule pamd: name: system-auth - type: session control='[success=1 default=ignore]' + type: session + control: '[success=1 default=ignore]' module_path: pam_succeed_if.so module_arguments: crond,quiet state: args_absent @@ -221,13 +230,14 @@ change_count: returned: success version_added: 2.4 new_rule: - description: The changes to the rule + description: The changes to the rule. This was available in Ansible version 2.4 and 2.5. It was removed in 2.6. type: string sample: None None None sha512 shadow try_first_pass use_authtok returned: success version_added: 2.4 updated_rule_(n): - description: The rule(s) that was/were changed + description: The rule(s) that was/were changed. This is only available in + Ansible version 2.4 and was removed in 2.5. type: string sample: - password sufficient pam_unix.so sha512 shadow try_first_pass @@ -250,346 +260,418 @@ dest: returned: success type: string sample: "/etc/pam.d/system-auth" +backupdest: + description: + - "The file name of the the backup file, if created." + returned: success + type: string + version_added: 2.6 ... ''' from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils._text import to_native import os import re -import time +from tempfile import NamedTemporaryFile +from datetime import datetime + + +RULE_REGEX = re.compile(r"""(?Pauth|account|session|password)\s+ + (?P\[.*\]|\S*)\s+ + (?P\S*)\s? + (?P.*)""", re.X) + + +class PamdLine(object): + + def __init__(self, line): + self.line = line + self.prev = None + self.next = None + + # Method to check if a rule matches the type, control and path. + def matches(self, rule_type, rule_control, rule_path, rule_args=None): + return False + + def __str__(self): + return str(self.line) + + +class PamdComment(PamdLine): + + def __init__(self, line): + super(PamdComment, self).__init__(line) + + @property + def is_valid(self): + if self.line.startswith('#'): + return True + return False + + +class PamdInclude(PamdLine): + def __init__(self, line): + super(PamdInclude, self).__init__(line) + @property + def is_valid(self): + if self.line.startswith('@include'): + return True + return False -# The PamdRule class encapsulates a rule in a pam.d service -class PamdRule(object): - def __init__(self, rule_type, - rule_control, rule_module_path, - rule_module_args=None): +class PamdRule(PamdLine): + valid_types = ['account', 'auth', 'password', 'session'] + valid_simple_controls = ['required', 'requisite', 'sufficicent', 'optional', 'include', 'substack'] + valid_control_values = ['success', 'open_err', 'symbol_err', 'service_err', 'system_err', 'buf_err', + 'perm_denied', 'auth_err', 'cred_insufficient', 'authinfo_unavail', 'user_unknown', + 'maxtries', 'new_authtok_reqd', 'acct_expired', 'session_err', 'cred_unavail', + 'cred_expired', 'cred_err', 'no_module_data', 'conv_err', 'authtok_err', + 'authtok_recover_err', 'authtok_lock_busy', 'authtok_disable_aging', 'try_again', + 'ignore', 'abort', 'authtok_expired', 'module_unknown', 'bad_item', 'conv_again', + 'incomplete', 'default'] + valid_control_actions = ['ignore', 'bad', 'die', 'ok', 'done', 'reset'] + + def __init__(self, rule_type, rule_control, rule_path, rule_args=None): + self._control = None + self._args = None self.rule_type = rule_type self.rule_control = rule_control - self.rule_module_path = rule_module_path - try: - if (rule_module_args is not None and - type(rule_module_args) is list): - self.rule_module_args = rule_module_args - elif (rule_module_args is not None and - type(rule_module_args) is str): - self.rule_module_args = rule_module_args.split() - except AttributeError: - self.rule_module_args = [] + self.rule_path = rule_path + self.rule_args = rule_args + + # Method to check if a rule matches the type, control and path. + def matches(self, rule_type, rule_control, rule_path, rule_args=None): + if (rule_type == self.rule_type and + rule_control == self.rule_control and + rule_path == self.rule_path): + return True + return False @classmethod - def rulefromstring(cls, stringline): - pattern = None - - rule_type = '' - rule_control = '' - rule_module_path = '' - rule_module_args = '' - complicated = False - - if '[' in stringline: - pattern = re.compile( - r"""([\-A-Za-z0-9_]+)\s* # Rule Type - \[([A-Za-z0-9_=\s]+)\]\s* # Rule Control - ([A-Za-z0-9/_\-\.]+)\s* # Rule Path - ([A-Za-z0-9,_=<>\-\s\./]*)""", # Rule Args - re.X) - complicated = True + def rule_from_string(cls, line): + match = RULE_REGEX.search(line) + return cls(match.group('rule_type'), match.group('control'), match.group('path'), match.group('args')) + + def __str__(self): + if self.rule_args: + return '{0: <11}{1} {2} {3}'.format(self.rule_type, self.rule_control, self.rule_path, ' '.join(self.rule_args)) + return '{0: <11}{1} {2}'.format(self.rule_type, self.rule_control, self.rule_path) + + @property + def rule_control(self): + if isinstance(self._control, list): + return '[' + ' '.join(self._control) + ']' + return self._control + + @rule_control.setter + def rule_control(self, control): + if control.startswith('['): + control = control.replace(' = ', '=').replace('[', '').replace(']', '') + self._control = control.split(' ') else: - pattern = re.compile( - r"""([@\-A-Za-z0-9_]+)\s* # Rule Type - ([A-Za-z0-9_\-]+)\s* # Rule Control - ([A-Za-z0-9/_\-\.]*)\s* # Rule Path - ([A-Za-z0-9,_=<>\-\s\./]*)""", # Rule Args - re.X) - - result = pattern.match(stringline) - rule_type = result.group(1) - if complicated: - rule_control = '[' + result.group(2) + ']' + self._control = control + + @property + def rule_args(self): + if not self._args: + return [] + return self._args + + @rule_args.setter + def rule_args(self, args): + if isinstance(args, str): + args = args.replace(" = ", "=") + self._args = args.split(" ") else: - rule_control = result.group(2) - rule_module_path = result.group(3) - if result.group(4) is not None: - rule_module_args = result.group(4) + self._args = args - return cls(rule_type, rule_control, rule_module_path, rule_module_args) + @property + def line(self): + return str(self) - def get_module_args_as_string(self): + @classmethod + def is_action_unsigned_int(cls, string_num): + number = 0 try: - if self.rule_module_args is not None: - return ' '.join(self.rule_module_args) - except AttributeError: - pass - return '' + number = int(string_num) + except ValueError: + return False + + if number >= 0: + return True + return False + + def validate(self): + # Validate the rule type + if self.rule_type not in PamdRule.valid_types: + return False, "Rule type, " + self.rule_type + ", is not valid in rule " + self.line + # Validate the rule control + if isinstance(self.rule_control, str) and self.rule_control not in PamdRule.valid_simple_controls: + return False, "Rule control, " + self.rule_control + ", is not valid in rule " + self.line + elif isinstance(self.rule_control, list): + for control in self.rule_control: + value, action = control.split("=") + if value not in PamdRule.valid_control_values: + return False, "Rule control value, " + value + ", is not valid in rule " + self.line + if action not in PamdRule.valid_control_actions and not PamdRule.is_action_unsigned_int(action): + return False, "Rule control action, " + action + ", is not valid in rule " + self.line + + # TODO: Validate path + + return True, "Rule is valid " + self.line + + +# PamdService encapsulates an entire service and contains one or more rules. It seems the best way is to do this +# as a doubly linked list. +class PamdService(object): - def __str__(self): - return "%-10s\t%s\t%s %s" % (self.rule_type, - self.rule_control, - self.rule_module_path, - self.get_module_args_as_string()) + def __init__(self, content): + self._head = None + self._tail = None + for line in content.splitlines(): + if line.lstrip().startswith('#'): + pamd_line = PamdComment(line) + elif line.lstrip().startswith('@include'): + pamd_line = PamdInclude(line) + elif line == '': + pamd_line = PamdLine(line) + else: + pamd_line = PamdRule.rule_from_string(line) + + self.append(pamd_line) + + def append(self, pamd_line): + if self._head is None: + self._head = self._tail = pamd_line + else: + pamd_line.prev = self._tail + pamd_line.next = None + self._tail.next = pamd_line + self._tail = pamd_line + + def remove(self, rule_type, rule_control, rule_path): + current_line = self._head + changed = 0 + + while current_line is not None: + if current_line.matches(rule_type, rule_control, rule_path): + if current_line.prev is not None: + current_line.prev.next = current_line.next + current_line.next.prev = current_line.prev + else: + self._head = current_line.next + current_line.next.prev = None + changed += 1 + + current_line = current_line.next + return changed + + def get(self, rule_type, rule_control, rule_path): + lines = [] + current_line = self._head + while current_line is not None: + + if isinstance(current_line, PamdRule) and current_line.matches(rule_type, rule_control, rule_path): + lines.append(current_line) + + current_line = current_line.next + + return lines + + def has_rule(self, rule_type, rule_control, rule_path): + if self.get(rule_type, rule_control, rule_path): + return True + return False + + def update_rule(self, rule_type, rule_control, rule_path, + new_type=None, new_control=None, new_path=None, new_args=None): + # Get a list of rules we want to change + rules_to_find = self.get(rule_type, rule_control, rule_path) + + for current_rule in rules_to_find: + if new_type: + current_rule.rule_type = new_type + if new_control: + current_rule.rule_control = new_control + if new_path: + current_rule.rule_path = new_path + if new_args: + if isinstance(new_args, str): + new_args = new_args.replace(" = ", "=") + new_args = new_args.split(' ') + current_rule.rule_args = new_args + + return len(rules_to_find) + + def insert_before(self, rule_type, rule_control, rule_path, + new_type=None, new_control=None, new_path=None, new_args=None): + # Get a list of rules we want to change + rules_to_find = self.get(rule_type, rule_control, rule_path) + changed = 0 + # There are two cases to consider. + # 1. The new rule doesn't exist before the existing rule + # 2. The new rule exists + + for current_rule in rules_to_find: + # Create a new rule + new_rule = PamdRule(new_type, new_control, new_path, new_args) + # First we'll get the previous rule. + previous_rule = current_rule.prev + + # Next we may have to loop backwards if the previous line is a comment. If it + # is, we'll get the previous "rule's" previous. + while previous_rule is not None and isinstance(previous_rule, PamdComment): + previous_rule = previous_rule.prev + # Next we'll see if the previous rule matches what we are trying to insert. + if previous_rule is not None and not previous_rule.matches(new_type, new_control, new_path): + # First set the original previous rule's next to the new_rule + previous_rule.next = new_rule + # Second, set the new_rule's previous to the original previous + new_rule.prev = previous_rule + # Third, set the new rule's next to the current rule + new_rule.next = current_rule + # Fourth, set the current rule's previous to the new_rule + current_rule.prev = new_rule + + changed += 1 + + # Handle the case where it is the first rule in the list. + elif previous_rule is None: + # This is the case where the current rule is not only the first rule + # but the first line as well. So we set the head to the new rule + if current_rule.prev is None: + self._head = new_rule + # This case would occur if the previous line was a comment. + else: + current_rule.prev.next = new_rule + new_rule.prev = current_rule.prev + new_rule.next = current_rule + current_rule.prev = new_rule + changed += 1 + + return changed + + def insert_after(self, rule_type, rule_control, rule_path, + new_type=None, new_control=None, new_path=None, new_args=None): + # Get a list of rules we want to change + rules_to_find = self.get(rule_type, rule_control, rule_path) + changed = 0 + # There are two cases to consider. + # 1. The new rule doesn't exist after the existing rule + # 2. The new rule exists + for current_rule in rules_to_find: + # First we'll get the next rule. + next_rule = current_rule.next + # Next we may have to loop forwards if the next line is a comment. If it + # is, we'll get the next "rule's" next. + while next_rule is not None and isinstance(next_rule, PamdComment): + next_rule = next_rule.next + + # First we create a new rule + new_rule = PamdRule(new_type, new_control, new_path, new_args) + if next_rule is not None and not next_rule.matches(new_type, new_control, new_path): + # If the previous rule doesn't match we'll insert our new rule. + + # Second set the original next rule's previuous to the new_rule + next_rule.prev = new_rule + # Third, set the new_rule's next to the original next rule + new_rule.next = next_rule + # Fourth, set the new rule's previous to the current rule + new_rule.prev = current_rule + # Fifth, set the current rule's next to the new_rule + current_rule.next = new_rule + + changed += 1 + + # This is the case where the current_rule is the last in the list + elif next_rule is None: + new_rule.prev = self._tail + new_rule.next = None + self._tail.next = new_rule + self._tail = new_rule + + current_rule.next = new_rule + changed += 1 + + return changed + + def add_module_arguments(self, rule_type, rule_control, rule_path, args_to_add): + # Get a list of rules we want to change + rules_to_find = self.get(rule_type, rule_control, rule_path) + + changed = 0 + + for current_rule in rules_to_find: + if isinstance(args_to_add, str): + args_to_add = args_to_add.replace(" = ", "=") + args_to_add = args_to_add.split(' ') + if not args_to_add: + args_to_add = [] + # Create a list of new args that aren't already present + new_args = [arg for arg in args_to_add if arg not in current_rule.rule_args] + # If there aren't any new args to add, we'll move on to the next rule + if not new_args: + continue + + current_rule.rule_args = current_rule.rule_args + new_args + + changed += 1 + + return changed + + def remove_module_arguments(self, rule_type, rule_control, rule_path, args_to_remove): + # Get a list of rules we want to change + rules_to_find = self.get(rule_type, rule_control, rule_path) + + changed = 0 + + for current_rule in rules_to_find: + if isinstance(args_to_remove, str): + args_to_remove = args_to_remove.replace(" = ", "=") + args_to_remove = args_to_remove.split(' ') + if not args_to_remove: + args_to_remove = [] + + # Let's check to see if there are any args to remove by finding the intersection + # of the rule's current args and the args_to_remove lists + if not list(set(current_rule.rule_args) & set(args_to_remove)): + continue + + # There are args to remove, so we create a list of new_args absent the args + # to remove. + current_rule.rule_args = [arg for arg in current_rule.rule_args if arg not in args_to_remove] + + changed += 1 + + return changed + + def validate(self): + current_line = self._head + + while current_line is not None: + if not current_line.is_valid()[0]: + return current_line.is_valid() + current_line = current_line.next + return True, "Module is valid" + def __str__(self): + lines = [] + current_line = self._head -# PamdService encapsulates an entire service and contains one or more rules -class PamdService(object): + while current_line is not None: + lines.append(str(current_line)) + current_line = current_line.next - def __init__(self, ansible=None): - - if ansible is not None: - self.check = ansible.check_mode - self.check = False - self.ansible = ansible - self.preamble = [] - self.rules = [] - self.fname = None - if ansible is not None: - self.path = self.ansible.params["path"] - self.name = self.ansible.params["name"] - - def load_rules_from_file(self): - self.fname = os.path.join(self.path, self.name) - stringline = '' - try: - for line in open(self.fname, 'r'): - stringline += line.rstrip().lstrip() - stringline += '\n' - self.load_rules_from_string(stringline.replace("\\\n", "")) - - except IOError as e: - self.ansible.fail_json(msg='Unable to open/read PAM module \ - file %s with error %s. And line %s' % - (self.fname, to_native(e), stringline)) - - def load_rules_from_string(self, stringvalue): - for line in stringvalue.splitlines(): - stringline = line.rstrip() - if line.startswith('#') and not line.isspace(): - self.preamble.append(line.rstrip()) - elif (not line.startswith('#') and - not line.isspace() and - len(line) != 0): - try: - self.ansible.log(msg="Creating rule from string %s" % stringline) - except AttributeError: - pass - self.rules.append(PamdRule.rulefromstring(stringline)) - - def write(self): - if self.fname is None: - self.fname = self.path + "/" + self.name - # If the file is a symbollic link, we'll write to the source. - pamd_file = os.path.realpath(self.fname) - temp_file = "/tmp/" + self.name + "_" + time.strftime("%y%m%d%H%M%S") - try: - f = open(temp_file, 'w') - f.write(str(self)) - f.close() - except IOError: - self.ansible.fail_json(msg='Unable to create temporary \ - file %s' % self.temp_file) + if lines[1].startswith("# Updated by Ansible"): + lines.pop(1) - self.ansible.atomic_move(temp_file, pamd_file) + lines.insert(1, "# Updated by Ansible - " + datetime.now().isoformat()) - def __str__(self): - stringvalue = '' - previous_rule = None - for amble in self.preamble: - stringvalue += amble - stringvalue += '\n' - - for rule in self.rules: - if (previous_rule is not None and - (previous_rule.rule_type.replace('-', '') != - rule.rule_type.replace('-', ''))): - stringvalue += '\n' - stringvalue += str(rule).rstrip() - stringvalue += '\n' - previous_rule = rule - - if stringvalue.endswith('\n'): - stringvalue = stringvalue[:-1] - - return stringvalue - - -def update_rule(service, old_rule, new_rule): - - changed = False - change_count = 0 - result = {'action': 'update_rule'} - - for rule in service.rules: - if (old_rule.rule_type == rule.rule_type and - old_rule.rule_control == rule.rule_control and - old_rule.rule_module_path == rule.rule_module_path): - - if (new_rule.rule_type is not None and - new_rule.rule_type != rule.rule_type): - rule.rule_type = new_rule.rule_type - changed = True - if (new_rule.rule_control is not None and - new_rule.rule_control != rule.rule_control): - rule.rule_control = new_rule.rule_control - changed = True - if (new_rule.rule_module_path is not None and - new_rule.rule_module_path != rule.rule_module_path): - rule.rule_module_path = new_rule.rule_module_path - changed = True - try: - if (new_rule.rule_module_args is not None and - new_rule.get_module_args_as_string() != - rule.get_module_args_as_string()): - rule.rule_module_args = new_rule.rule_module_args - changed = True - except AttributeError: - pass - if changed: - result['updated_rule_' + str(change_count)] = str(rule) - result['new_rule'] = str(new_rule) - - change_count += 1 - - result['change_count'] = change_count - return changed, result - - -def insert_before_rule(service, old_rule, new_rule): - index = 0 - change_count = 0 - result = {'action': - 'insert_before_rule'} - changed = False - for rule in service.rules: - if (old_rule.rule_type == rule.rule_type and - old_rule.rule_control == rule.rule_control and - old_rule.rule_module_path == rule.rule_module_path): - if index == 0: - service.rules.insert(0, new_rule) - changed = True - elif (new_rule.rule_type != service.rules[index - 1].rule_type or - new_rule.rule_control != - service.rules[index - 1].rule_control or - new_rule.rule_module_path != - service.rules[index - 1].rule_module_path): - service.rules.insert(index, new_rule) - changed = True - if changed: - result['new_rule'] = str(new_rule) - result['before_rule_' + str(change_count)] = str(rule) - change_count += 1 - index += 1 - result['change_count'] = change_count - return changed, result - - -def insert_after_rule(service, old_rule, new_rule): - index = 0 - change_count = 0 - result = {'action': 'insert_after_rule'} - changed = False - for rule in service.rules: - if (old_rule.rule_type == rule.rule_type and - old_rule.rule_control == rule.rule_control and - old_rule.rule_module_path == rule.rule_module_path): - if (index == len(service.rules) - 1): - service.rules.insert(len(service.rules), new_rule) - changed = True - elif (new_rule.rule_type != service.rules[index + 1].rule_type or - new_rule.rule_control != - service.rules[index + 1].rule_control or - new_rule.rule_module_path != - service.rules[index + 1].rule_module_path): - service.rules.insert(index + 1, new_rule) - changed = True - if changed: - result['new_rule'] = str(new_rule) - result['after_rule_' + str(change_count)] = str(rule) - change_count += 1 - index += 1 - - result['change_count'] = change_count - return changed, result - - -def remove_module_arguments(service, old_rule, module_args): - result = {'action': 'args_absent'} - changed = False - change_count = 0 - - for rule in service.rules: - if (old_rule.rule_type == rule.rule_type and - old_rule.rule_control == rule.rule_control and - old_rule.rule_module_path == rule.rule_module_path): - for arg_to_remove in module_args: - for arg in rule.rule_module_args: - if arg == arg_to_remove: - rule.rule_module_args.remove(arg) - changed = True - result['removed_arg_' + str(change_count)] = arg - result['from_rule_' + str(change_count)] = str(rule) - change_count += 1 - - result['change_count'] = change_count - return changed, result - - -def add_module_arguments(service, old_rule, module_args): - result = {'action': 'args_present'} - changed = False - change_count = 0 - - for rule in service.rules: - if (old_rule.rule_type == rule.rule_type and - old_rule.rule_control == rule.rule_control and - old_rule.rule_module_path == rule.rule_module_path): - for arg_to_add in module_args: - if "=" in arg_to_add: - pre_string = arg_to_add[:arg_to_add.index('=') + 1] - indicies = [i for i, arg - in enumerate(rule.rule_module_args) - if arg.startswith(pre_string)] - if len(indicies) == 0: - rule.rule_module_args.append(arg_to_add) - changed = True - result['added_arg_' + str(change_count)] = arg_to_add - result['to_rule_' + str(change_count)] = str(rule) - change_count += 1 - else: - for i in indicies: - if rule.rule_module_args[i] != arg_to_add: - rule.rule_module_args[i] = arg_to_add - changed = True - result['updated_arg_' + - str(change_count)] = arg_to_add - result['in_rule_' + - str(change_count)] = str(rule) - change_count += 1 - elif arg_to_add not in rule.rule_module_args: - rule.rule_module_args.append(arg_to_add) - changed = True - result['added_arg_' + str(change_count)] = arg_to_add - result['to_rule_' + str(change_count)] = str(rule) - change_count += 1 - result['change_count'] = change_count - return changed, result - - -def remove_rule(service, old_rule): - result = {'action': 'absent'} - changed = False - change_count = 0 - for rule in service.rules: - if (old_rule.rule_type == rule.rule_type and - old_rule.rule_control == rule.rule_control and - old_rule.rule_module_path == rule.rule_module_path): - service.rules.remove(rule) - changed = True - return changed, result + return '\n'.join(lines) def main(): @@ -611,7 +693,8 @@ def main(): state=dict(required=False, default="updated", choices=['before', 'after', 'updated', 'args_absent', 'args_present', 'absent']), - path=dict(required=False, default='/etc/pam.d', type='str') + path=dict(required=False, default='/etc/pam.d', type='str'), + backup=dict(default=False, type='bool') ), supports_check_mode=True, required_if=[ @@ -626,66 +709,77 @@ def main(): ] ) + content = str() + fname = os.path.join(module.params["path"], module.params["name"]) + backupdest = "" + # Open the file and read the content or fail + try: + with open(fname, 'r') as service_file_obj: + content = service_file_obj.read() + except IOError as e: + # If unable to read the file, fail out + module.fail_json(msg='Unable to open/read PAM module \ + file %s with error %s.' % + (fname, str(e))) + + # Assuming we didnt fail, create the service + service = PamdService(content) + # Set the action + action = module.params['state'] + + # Take action + if action == 'updated': + changes = service.update_rule(module.params['type'], module.params['control'], module.params['module_path'], + module.params['new_type'], module.params['new_control'], module.params['new_module_path'], + module.params['module_arguments']) + elif action == 'before': + changes = service.insert_before(module.params['type'], module.params['control'], module.params['module_path'], + module.params['new_type'], module.params['new_control'], module.params['new_module_path'], + module.params['module_arguments']) + elif action == 'after': + changes = service.insert_after(module.params['type'], module.params['control'], module.params['module_path'], + module.params['new_type'], module.params['new_control'], module.params['new_module_path'], + module.params['module_arguments']) + elif action == 'args_absent': + changes = service.remove_module_arguments(module.params['type'], module.params['control'], module.params['module_path'], + module.params['module_arguments']) + elif action == 'args_present': + changes = service.add_module_arguments(module.params['type'], module.params['control'], module.params['module_path'], + module.params['module_arguments']) + elif action == 'absent': + changes = service.remove(module.params['type'], module.params['control'], module.params['module_path']) + + valid, msg = service.validate() + + # If the module is not valid (meaning one of the rules is invalid), we will fail + if not valid: + module.fail_json(msg=msg) + + # If not check mode and something changed, backup the original if necessary then write out the file or fail + if not module.check_mode and changes > 0: + pamd_file = os.path.realpath(fname) + # First, create a backup if desired. + if module.params['backup']: + backupdest = module.backup_local(fname) + print("BACKUP DEST", backupdest) + try: + temp_file = NamedTemporaryFile(mode='w') + with open(temp_file.name, 'w') as fd: + fd.write(str(service)) - service = module.params['name'] - old_type = module.params['type'] - old_control = module.params['control'] - old_module_path = module.params['module_path'] - - new_type = module.params['new_type'] - new_control = module.params['new_control'] - new_module_path = module.params['new_module_path'] - - module_arguments = module.params['module_arguments'] - state = module.params['state'] - - path = module.params['path'] - - pamd = PamdService(module) - pamd.load_rules_from_file() - - old_rule = PamdRule(old_type, - old_control, - old_module_path) - new_rule = PamdRule(new_type, - new_control, - new_module_path, - module_arguments) - - if state == 'updated': - change, result = update_rule(pamd, - old_rule, - new_rule) - elif state == 'before': - change, result = insert_before_rule(pamd, - old_rule, - new_rule) - elif state == 'after': - change, result = insert_after_rule(pamd, - old_rule, - new_rule) - elif state == 'args_absent': - change, result = remove_module_arguments(pamd, - old_rule, - module_arguments) - elif state == 'args_present': - change, result = add_module_arguments(pamd, - old_rule, - module_arguments) - elif state == 'absent': - change, result = remove_rule(pamd, - old_rule) - - if not module.check_mode and change: - pamd.write() - - facts = {} - facts['pamd'] = {'changed': change, 'result': result} + except IOError: + module.fail_json(msg='Unable to create temporary \ + file %s' % temp_file) - module.params['dest'] = pamd.fname + module.atomic_move(temp_file.name, pamd_file) - module.exit_json(changed=change, ansible_facts=facts) + facts = {} + facts['pamd'] = {'changed': changes > 0, + 'change_count': changes, + 'action': action, + 'backupdest': backupdest} + module.exit_json(changed=changes > 0, ansible_facts=facts) if __name__ == '__main__': main() diff --git a/test/units/modules/system/test_pamd.py b/test/units/modules/system/test_pamd.py index 653402a2448..6a7141c69dd 100644 --- a/test/units/modules/system/test_pamd.py +++ b/test/units/modules/system/test_pamd.py @@ -1,92 +1,98 @@ +from __future__ import (absolute_import, division, print_function) from ansible.compat.tests import unittest + from ansible.modules.system.pamd import PamdRule +from ansible.modules.system.pamd import PamdLine +from ansible.modules.system.pamd import PamdComment +from ansible.modules.system.pamd import PamdInclude from ansible.modules.system.pamd import PamdService -from ansible.modules.system.pamd import update_rule -from ansible.modules.system.pamd import insert_before_rule -from ansible.modules.system.pamd import insert_after_rule -from ansible.modules.system.pamd import remove_module_arguments -from ansible.modules.system.pamd import add_module_arguments -from ansible.modules.system.pamd import remove_rule -import re + +class PamdLineTestCase(unittest.TestCase): + + def setUp(self): + self.pamd_line = PamdLine("This is a test") + + def test_line(self): + self.assertEqual("This is a test", str(self.pamd_line)) + + def test_matches(self): + self.assertFalse(self.pamd_line.matches("test", "matches", "foo", "bar")) + + +class PamdIncludeTestCase(unittest.TestCase): + + def setUp(self): + self.good_include = PamdInclude("@include foobar") + self.bad_include = PamdInclude("include foobar") + + def test_line(self): + self.assertEqual("@include foobar", str(self.good_include)) + + def test_matches(self): + self.assertFalse(self.good_include.matches("something", "something", "dark", "side")) + + def test_valid(self): + self.assertTrue(self.good_include.is_valid) + self.assertFalse(self.bad_include.is_valid) + + +class PamdCommentTestCase(unittest.TestCase): + + def setUp(self): + self.good_comment = PamdComment("# This is a test comment") + self.bad_comment = PamdComment("This is a bad test comment") + + def test_line(self): + self.assertEqual("# This is a test comment", str(self.good_comment)) + + def test_matches(self): + self.assertFalse(self.good_comment.matches("test", "matches", "foo", "bar")) + + def test_valid(self): + self.assertTrue(self.good_comment.is_valid) + self.assertFalse(self.bad_comment.is_valid) class PamdRuleTestCase(unittest.TestCase): + def setUp(self): + self.rule = PamdRule('account', 'optional', 'pam_keyinit.so', 'revoke') + + def test_type(self): + self.assertEqual(self.rule.rule_type, 'account') + + def test_control(self): + self.assertEqual(self.rule.rule_control, 'optional') + self.assertEqual(self.rule._control, 'optional') + + def test_path(self): + self.assertEqual(self.rule.rule_path, 'pam_keyinit.so') + + def test_args(self): + self.assertEqual(self.rule.rule_args, ['revoke']) + + def test_valid(self): + self.assertTrue(self.rule.validate()[0]) + + +class PamdRuleBadValidationTestCase(unittest.TestCase): + def setUp(self): + self.bad_type = PamdRule('foobar', 'optional', 'pam_keyinit.so', 'revoke') + self.bad_control_simple = PamdRule('account', 'foobar', 'pam_keyinit.so', 'revoke') + self.bad_control_value = PamdRule('account', '[foobar=1 default=ignore]', 'pam_keyinit.so', 'revoke') + self.bad_control_action = PamdRule('account', '[success=1 default=foobar]', 'pam_keyinit.so', 'revoke') - def test_simple(self): - simple = "auth required pam_env.so".rstrip() - module = PamdRule.rulefromstring(stringline=simple) - module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) - self.assertEqual(simple, module_string.rstrip()) - self.assertEqual('', module.get_module_args_as_string()) - - def test_simple_more(self): - simple = "auth required pam_tally2.so deny=5 onerr=fail".rstrip() - module = PamdRule.rulefromstring(stringline=simple) - module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) - self.assertEqual(simple, module_string.rstrip()) - self.assertEqual('deny=5 onerr=fail', - module.get_module_args_as_string()) - - def test_complicated_rule(self): - complicated = "-auth [default=1 success=ok] pam_localuser.so".rstrip() - module = PamdRule.rulefromstring(stringline=complicated) - module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) - self.assertEqual(complicated, module_string.rstrip()) - self.assertEqual('', module.get_module_args_as_string()) - - def test_more_complicated_rule(self): - complicated = "auth" - complicated += " [success=done ignore=ignore default=die]" - complicated += " pam_unix.so" - complicated += " try_first_pass".rstrip() - module = PamdRule.rulefromstring(stringline=complicated) - module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) - self.assertEqual(complicated, module_string.rstrip()) - self.assertEqual('try_first_pass', module.get_module_args_as_string()) - - def test_rule_with_arg(self): - line = "account optional pam_echo.so file=/etc/lockout.txt" - module = PamdRule.rulefromstring(stringline=line) - self.assertEqual(module.rule_type, 'account') - self.assertEqual(module.rule_control, 'optional') - self.assertEqual(module.rule_module_path, 'pam_echo.so') - self.assertEqual(module.rule_module_args, ['file=/etc/lockout.txt']) - - def test_rule_with_args(self): - line = "account optional pam_echo.so file1=/etc/lockout1.txt file2=/etc/lockout2.txt" - module = PamdRule.rulefromstring(stringline=line) - self.assertEqual(module.rule_type, 'account') - self.assertEqual(module.rule_control, 'optional') - self.assertEqual(module.rule_module_path, 'pam_echo.so') - self.assertEqual(module.rule_module_args, ['file1=/etc/lockout1.txt', 'file2=/etc/lockout2.txt']) - - def test_less_than_in_args(self): - rule = "auth requisite pam_succeed_if.so uid >= 1025 quiet_success" - module = PamdRule.rulefromstring(stringline=rule) - module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) - self.assertEqual(rule, module_string.rstrip()) - self.assertEqual('uid >= 1025 quiet_success', module.get_module_args_as_string()) - - def test_trailing_comma(self): - rule = "account required pam_access.so listsep=," - module = PamdRule.rulefromstring(stringline=rule) - module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) - self.assertEqual(rule, module_string.rstrip()) - - def test_slash_in_args(self): - rule = "auth sufficient /lib64/security/pam_duo.so".rstrip() - module = PamdRule.rulefromstring(stringline=rule) - module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) - self.assertEqual(rule, module_string.rstrip()) - self.assertEqual('', module.get_module_args_as_string()) - - def test_slash_in_args_more(self): - rule = "auth [success=1 default=ignore] /lib64/security/pam_duo.so".rstrip() - module = PamdRule.rulefromstring(stringline=rule) - module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) - self.assertEqual(rule, module_string.rstrip()) - self.assertEqual('', module.get_module_args_as_string()) + def test_validate_bad_type(self): + self.assertFalse(self.bad_type.validate()[0]) + + def test_validate_bad_control_simple(self): + self.assertFalse(self.bad_control_simple.validate()[0]) + + def test_validate_bad_control_value(self): + self.assertFalse(self.bad_control_value.validate()[0]) + + def test_validate_bad_control_action(self): + self.assertFalse(self.bad_control_action.validate()[0]) class PamdServiceTestCase(unittest.TestCase): @@ -94,182 +100,254 @@ class PamdServiceTestCase(unittest.TestCase): self.system_auth_string = """#%PAM-1.0 # This file is auto-generated. # User changes will be destroyed the next time authconfig is run. -auth \trequired\tpam_env.so -auth \tsufficient\tpam_unix.so nullok try_first_pass -auth \trequisite\tpam_succeed_if.so uid -auth \trequired\tpam_deny.so -auth \tsufficient\tpam_rootok.so - -account \trequired\tpam_unix.so -account \tsufficient\tpam_localuser.so -account \tsufficient\tpam_succeed_if.so uid +@include common-auth +@include common-account +@include common-session +auth required pam_env.so +auth sufficient pam_unix.so nullok try_first_pass +auth requisite pam_succeed_if.so uid +auth required pam_deny.so +# Test comment +auth sufficient pam_rootok.so + +account required pam_unix.so +account sufficient pam_localuser.so +account sufficient pam_succeed_if.so uid account [success=1 default=ignore] \ -\t\t\t\tpam_succeed_if.so user = vagrant use_uid quiet -account \trequired\tpam_permit.so -account \trequired\tpam_access.so listsep=, -session \tinclude\tsystem-auth - -password \trequisite\tpam_pwquality.so try_first_pass local_users_only retry=3 authtok_type= -password \tsufficient\tpam_unix.so sha512 shadow nullok try_first_pass use_authtok -password \trequired\tpam_deny.so - -session \toptional\tpam_keyinit.so revoke -session \trequired\tpam_limits.so --session \toptional\tpam_systemd.so -session \t[success=1 default=ignore]\tpam_succeed_if.so service in crond quiet use_uid -session \t[success=1 test=me default=ignore]\tpam_succeed_if.so service in crond quiet use_uid -session \trequired\tpam_unix.so -@include \tcommon-auth -@include \tcommon-account -@include \tcommon-session""" - - self.pamd = PamdService() - self.pamd.load_rules_from_string(self.system_auth_string) + pam_succeed_if.so user = vagrant use_uid quiet +account required pam_permit.so +account required pam_access.so listsep=, +session include system-auth + +password requisite pam_pwquality.so try_first_pass local_users_only retry=3 authtok_type= +password sufficient pam_unix.so sha512 shadow nullok try_first_pass use_authtok +password required pam_deny.so + +session optional pam_keyinit.so revoke +session required pam_limits.so +-session optional pam_systemd.so +session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid +session [success=1 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid +session required pam_unix.so""" + + self.simple_system_auth_string = """#%PAM-1.0 + auth required pam_env.so +""" + + self.pamd = PamdService(self.system_auth_string) + + def test_properly_parsed(self): + num_lines = len(self.system_auth_string.splitlines()) + 1 + num_lines_processed = len(str(self.pamd).splitlines()) + self.assertEqual(num_lines, num_lines_processed) + def test_has_rule(self): + self.assertTrue(self.pamd.has_rule('account', 'required', 'pam_permit.so')) + self.assertTrue(self.pamd.has_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so')) + + def test_doesnt_have_rule(self): + self.assertFalse(self.pamd.has_rule('account', 'requisite', 'pam_permit.so')) + + # Test Update def test_update_rule_type(self): - old_rule = PamdRule.rulefromstring('auth required pam_env.so') - new_rule = PamdRule.rulefromstring('session required pam_env.so') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.update_rule('session', 'optional', 'pam_keyinit.so', new_type='account')) + self.assertTrue(self.pamd.has_rule('account', 'optional', 'pam_keyinit.so')) + test_rule = PamdRule('account', 'optional', 'pam_keyinit.so', 'revoke') + self.assertIn(str(test_rule), str(self.pamd)) + + def test_update_rule_that_doesnt_exist(self): + self.assertFalse(self.pamd.update_rule('blah', 'blah', 'blah', new_type='account')) + self.assertFalse(self.pamd.has_rule('blah', 'blah', 'blah')) + test_rule = PamdRule('blah', 'blah', 'blah', 'account') + self.assertNotIn(str(test_rule), str(self.pamd)) + + def test_update_rule_type_two(self): + self.assertTrue(self.pamd.update_rule('session', '[success=1 default=ignore]', 'pam_succeed_if.so', new_type='account')) + self.assertTrue(self.pamd.has_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so')) + test_rule = PamdRule('account', '[success=1 default=ignore]', 'pam_succeed_if.so') + self.assertIn(str(test_rule), str(self.pamd)) def test_update_rule_control_simple(self): - old_rule = PamdRule.rulefromstring('auth required pam_env.so') - new_rule = PamdRule.rulefromstring('auth sufficent pam_env.so') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.update_rule('session', 'optional', 'pam_keyinit.so', new_control='required')) + self.assertTrue(self.pamd.has_rule('session', 'required', 'pam_keyinit.so')) + test_rule = PamdRule('session', 'required', 'pam_keyinit.so') + self.assertIn(str(test_rule), str(self.pamd)) def test_update_rule_control_complex(self): - old_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid') - new_rule = PamdRule.rulefromstring('session [success=2 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.update_rule('session', + '[success=1 default=ignore]', + 'pam_succeed_if.so', + new_control='[success=2 test=me default=ignore]')) + self.assertTrue(self.pamd.has_rule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so')) + test_rule = PamdRule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so') + self.assertIn(str(test_rule), str(self.pamd)) def test_update_rule_control_more_complex(self): - old_rule = PamdRule.rulefromstring('session [success=1 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid') - new_rule = PamdRule.rulefromstring('session [success=2 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + + self.assertTrue(self.pamd.update_rule('session', + '[success=1 test=me default=ignore]', + 'pam_succeed_if.so', + new_control='[success=2 test=me default=ignore]')) + self.assertTrue(self.pamd.has_rule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so')) + test_rule = PamdRule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so') + self.assertIn(str(test_rule), str(self.pamd)) def test_update_rule_module_path(self): - old_rule = PamdRule.rulefromstring('auth required pam_env.so') - new_rule = PamdRule.rulefromstring('session required pam_limits.so') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', new_path='pam_limits.so')) + self.assertTrue(self.pamd.has_rule('auth', 'required', 'pam_limits.so')) def test_update_rule_module_path_slash(self): - old_rule = PamdRule.rulefromstring('auth required pam_env.so') - new_rule = PamdRule.rulefromstring('auth required /lib64/security/pam_duo.so') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', new_path='/lib64/security/pam_duo.so')) + self.assertTrue(self.pamd.has_rule('auth', 'required', '/lib64/security/pam_duo.so')) def test_update_rule_module_args(self): - old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') - new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so uid uid') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so', new_args='uid uid')) + test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'uid uid') + self.assertIn(str(test_rule), str(self.pamd)) + + test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'nullok try_first_pass') + self.assertNotIn(str(test_rule), str(self.pamd)) def test_update_first_three(self): - old_rule = PamdRule.rulefromstring('auth required pam_env.so') - new_rule = PamdRule.rulefromstring('one two three') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', + new_type='one', new_control='two', new_path='three')) + self.assertTrue(self.pamd.has_rule('one', 'two', 'three')) def test_update_first_three_with_module_args(self): - old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') - new_rule = PamdRule.rulefromstring('one two three') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so', + new_type='one', new_control='two', new_path='three')) + self.assertTrue(self.pamd.has_rule('one', 'two', 'three')) + test_rule = PamdRule('one', 'two', 'three') + self.assertIn(str(test_rule), str(self.pamd)) + self.assertIn(str(test_rule), str(self.pamd)) def test_update_all_four(self): - old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') - new_rule = PamdRule.rulefromstring('one two three four five') - update_rule(self.pamd, old_rule, new_rule) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) - + self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so', + new_type='one', new_control='two', new_path='three', + new_args='four five')) + test_rule = PamdRule('one', 'two', 'three', 'four five') + self.assertIn(str(test_rule), str(self.pamd)) + + test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'nullok try_first_pass') + self.assertNotIn(str(test_rule), str(self.pamd)) + + def test_update_rule_with_slash(self): + self.assertTrue(self.pamd.update_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so', + new_type='session', new_path='pam_access.so')) + test_rule = PamdRule('session', '[success=1 default=ignore]', 'pam_access.so') + self.assertIn(str(test_rule), str(self.pamd)) + + # Insert Before def test_insert_before_rule(self): - old_rule = PamdRule.rulefromstring('account required pam_unix.so') - new_rule = PamdRule.rulefromstring('account required pam_permit.so') - insert_before_rule(self.pamd, old_rule, new_rule) - line_to_test = str(new_rule).rstrip() - line_to_test += '\n' - line_to_test += str(old_rule).rstrip() - self.assertIn(line_to_test, str(self.pamd)) + count = self.pamd.insert_before('account', 'required', 'pam_access.so', + new_type='account', new_control='required', new_path='pam_limits.so') + self.assertEqual(count, 1) + + rules = self.pamd.get("account", "required", "pam_access.so") + for current_rule in rules: + self.assertTrue(current_rule.prev.matches("account", "required", "pam_limits.so")) + + def test_insert_before_rule_where_rule_doesnt_exist(self): + + count = self.pamd.insert_before('account', 'sufficient', 'pam_access.so', + new_type='account', new_control='required', new_path='pam_limits.so') + self.assertFalse(count) + + def test_insert_before_rule_with_args(self): + self.assertTrue(self.pamd.insert_before('account', 'required', 'pam_access.so', + new_type='account', new_control='required', new_path='pam_limits.so', + new_args='uid')) + + rules = self.pamd.get("account", "required", "pam_access.so") + for current_rule in rules: + self.assertTrue(current_rule.prev.matches("account", "required", "pam_limits.so", 'uid')) + + def test_insert_before_rule_test_duplicates(self): + self.assertTrue(self.pamd.insert_before('account', 'required', 'pam_access.so', + new_type='account', new_control='required', new_path='pam_limits.so')) + + self.pamd.insert_before('account', 'required', 'pam_access.so', + new_type='account', new_control='required', new_path='pam_limits.so') + + rules = self.pamd.get("account", "required", "pam_access.so") + for current_rule in rules: + previous_rule = current_rule.prev + self.assertTrue(previous_rule.matches("account", "required", "pam_limits.so")) + self.assertFalse(previous_rule.prev.matches("account", "required", "pam_limits.so")) + + def test_insert_before_first_rule(self): + self.assertTrue(self.pamd.insert_before('auth', 'required', 'pam_env.so', + new_type='account', new_control='required', new_path='pam_limits.so')) + + def test_insert_before_first_rule_simple(self): + simple_service = PamdService(self.simple_system_auth_string) + self.assertTrue(simple_service.insert_before('auth', 'required', 'pam_env.so', + new_type='account', new_control='required', new_path='pam_limits.so')) + + # Insert After def test_insert_after_rule(self): - old_rule = PamdRule.rulefromstring('account required pam_unix.so') - new_rule = PamdRule.rulefromstring('account required pam_permit.so arg1 arg2 arg3') - insert_after_rule(self.pamd, old_rule, new_rule) - line_to_test = str(old_rule).rstrip() - line_to_test += '\n' - line_to_test += str(new_rule).rstrip() - self.assertIn(line_to_test, str(self.pamd)) - - def test_insert_after_rule_another(self): - old_rule = PamdRule.rulefromstring('auth sufficient pam_rootok.so') - new_rule = PamdRule.rulefromstring('auth required pam_wheel.so use_id') - insert_after_rule(self.pamd, old_rule, new_rule) - line_to_test = str(old_rule).rstrip() - line_to_test += '\n' - line_to_test += str(new_rule).rstrip() - self.assertIn(line_to_test, str(self.pamd)) + self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_unix.so', + new_type='account', new_control='required', new_path='pam_permit.so')) + rules = self.pamd.get("account", "required", "pam_unix.so") + for current_rule in rules: + self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so")) + + def test_insert_after_rule_with_args(self): + self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_access.so', + new_type='account', new_control='required', new_path='pam_permit.so', + new_args='uid')) + rules = self.pamd.get("account", "required", "pam_access.so") + for current_rule in rules: + self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid")) + + def test_insert_after_test_duplicates(self): + self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_access.so', + new_type='account', new_control='required', new_path='pam_permit.so', + new_args='uid')) + self.assertFalse(self.pamd.insert_after('account', 'required', 'pam_access.so', + new_type='account', new_control='required', new_path='pam_permit.so', + new_args='uid')) + + rules = self.pamd.get("account", "required", "pam_access.so") + for current_rule in rules: + self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid")) + self.assertFalse(current_rule.next.next.matches("account", "required", "pam_permit.so", "uid")) def test_insert_after_rule_last_rule(self): - old_rule = PamdRule.rulefromstring('session required pam_unix.so') - new_rule = PamdRule.rulefromstring('session required pam_permit.so arg1 arg2 arg3') - insert_after_rule(self.pamd, old_rule, new_rule) - line_to_test = str(old_rule).rstrip() - line_to_test += '\n' - line_to_test += str(new_rule).rstrip() - self.assertIn(line_to_test, str(self.pamd)) - + self.assertTrue(self.pamd.insert_after('session', 'required', 'pam_unix.so', + new_type='account', new_control='required', new_path='pam_permit.so', + new_args='uid')) + rules = self.pamd.get("session", "required", "pam_unix.so") + for current_rule in rules: + self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid")) + + # Remove Module Arguments def test_remove_module_arguments_one(self): - old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') - new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so try_first_pass') - args_to_remove = ['nullok'] - remove_module_arguments(self.pamd, old_rule, args_to_remove) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.remove_module_arguments('auth', 'sufficient', 'pam_unix.so', 'nullok')) + + def test_remove_module_arguments_one_list(self): + self.assertTrue(self.pamd.remove_module_arguments('auth', 'sufficient', 'pam_unix.so', ['nullok'])) def test_remove_module_arguments_two(self): - old_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid') - new_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so in quiet use_uid') - args_to_remove = ['service', 'crond'] - remove_module_arguments(self.pamd, old_rule, args_to_remove) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.remove_module_arguments('session', '[success=1 default=ignore]', 'pam_succeed_if.so', 'service crond')) + + def test_remove_module_arguments_two_list(self): + self.assertTrue(self.pamd.remove_module_arguments('session', '[success=1 default=ignore]', 'pam_succeed_if.so', ['service', 'crond'])) + + def test_remove_module_arguments_where_none_existed(self): + self.assertTrue(self.pamd.add_module_arguments('session', 'required', 'pam_limits.so', 'arg1 arg2= arg3=arg3')) def test_add_module_arguments_where_none_existed(self): - old_rule = PamdRule.rulefromstring('account required pam_unix.so') - new_rule = PamdRule.rulefromstring('account required pam_unix.so arg1 arg2= arg3=arg3') - args_to_add = ['arg1', 'arg2=', 'arg3=arg3'] - add_module_arguments(self.pamd, old_rule, args_to_add) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.add_module_arguments('account', 'required', 'pam_unix.so', 'arg1 arg2= arg3=arg3')) + + def test_add_module_arguments_where_none_existed_list(self): + self.assertTrue(self.pamd.add_module_arguments('account', 'required', 'pam_unix.so', ['arg1', 'arg2=', 'arg3=arg3'])) def test_add_module_arguments_where_some_existed(self): - old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') - new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass arg1 arg2= arg3=arg3') - args_to_add = ['arg1', 'arg2=', 'arg3=arg3'] - add_module_arguments(self.pamd, old_rule, args_to_add) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.add_module_arguments('auth', 'sufficient', 'pam_unix.so', 'arg1 arg2= arg3=arg3')) def test_remove_rule(self): - old_rule = PamdRule.rulefromstring('account required pam_unix.so') - remove_rule(self.pamd, old_rule) - self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) - - def test_add_module_args_with_string(self): - old_rule = PamdRule.rulefromstring("account required pam_access.so") - new_rule = PamdRule.rulefromstring("account required pam_access.so listsep=,") - args_to_add = ['listsep=,'] - add_module_arguments(self.pamd, old_rule, args_to_add) - self.assertIn(str(new_rule).rstrip(), str(self.pamd)) + self.assertTrue(self.pamd.remove('account', 'required', 'pam_unix.so')) + test_rule = PamdRule('account', 'required', 'pam_unix.so') + self.assertNotIn(str(test_rule), str(self.pamd))