* Cleaner, more pythonic, shorter, easier to maintain

* Added validation
pull/40610/head
Ken Evensen 6 years ago committed by Adam Miller
parent a3cfe0d72f
commit fabce98104

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# (c) 2017, Kenneth D. Evensen <kevensen@redhat.com> # (c) 2017, Kenneth D. Evensen <kdevensen@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
@ -89,6 +89,14 @@ options:
default: /etc/pam.d/ default: /etc/pam.d/
description: description:
- This is the path to the PAM service files - This is the path to the PAM service files
backup:
description:
- Create a backup file including the timestamp information so you can
get the original file back if you somehow clobbered it incorrectly.
type: bool
default: 'no'
version_added: '2.6'
""" """
EXAMPLES = """ EXAMPLES = """
@ -157,7 +165,8 @@ EXAMPLES = """
- name: Remove specific arguments from a rule - name: Remove specific arguments from a rule
pamd: pamd:
name: system-auth name: system-auth
type: session control='[success=1 default=ignore]' type: session
control: '[success=1 default=ignore]'
module_path: pam_succeed_if.so module_path: pam_succeed_if.so
module_arguments: crond,quiet module_arguments: crond,quiet
state: args_absent state: args_absent
@ -221,13 +230,14 @@ change_count:
returned: success returned: success
version_added: 2.4 version_added: 2.4
new_rule: new_rule:
description: The changes to the rule description: The changes to the rule. This was available in Ansible version 2.4 and 2.5. It was removed in 2.6.
type: string type: string
sample: None None None sha512 shadow try_first_pass use_authtok sample: None None None sha512 shadow try_first_pass use_authtok
returned: success returned: success
version_added: 2.4 version_added: 2.4
updated_rule_(n): updated_rule_(n):
description: The rule(s) that was/were changed description: The rule(s) that was/were changed. This is only available in
Ansible version 2.4 and was removed in 2.5.
type: string type: string
sample: sample:
- password sufficient pam_unix.so sha512 shadow try_first_pass - password sufficient pam_unix.so sha512 shadow try_first_pass
@ -250,346 +260,418 @@ dest:
returned: success returned: success
type: string type: string
sample: "/etc/pam.d/system-auth" sample: "/etc/pam.d/system-auth"
backupdest:
description:
- "The file name of the the backup file, if created."
returned: success
type: string
version_added: 2.6
... ...
''' '''
from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native
import os import os
import re import re
import time from tempfile import NamedTemporaryFile
from datetime import datetime
RULE_REGEX = re.compile(r"""(?P<rule_type>auth|account|session|password)\s+
(?P<control>\[.*\]|\S*)\s+
(?P<path>\S*)\s?
(?P<args>.*)""", re.X)
class PamdLine(object):
def __init__(self, line):
self.line = line
self.prev = None
self.next = None
# Method to check if a rule matches the type, control and path.
def matches(self, rule_type, rule_control, rule_path, rule_args=None):
return False
def __str__(self):
return str(self.line)
class PamdComment(PamdLine):
def __init__(self, line):
super(PamdComment, self).__init__(line)
@property
def is_valid(self):
if self.line.startswith('#'):
return True
return False
class PamdInclude(PamdLine):
def __init__(self, line):
super(PamdInclude, self).__init__(line)
@property
def is_valid(self):
if self.line.startswith('@include'):
return True
return False
# The PamdRule class encapsulates a rule in a pam.d service
class PamdRule(object):
def __init__(self, rule_type, class PamdRule(PamdLine):
rule_control, rule_module_path,
rule_module_args=None):
valid_types = ['account', 'auth', 'password', 'session']
valid_simple_controls = ['required', 'requisite', 'sufficicent', 'optional', 'include', 'substack']
valid_control_values = ['success', 'open_err', 'symbol_err', 'service_err', 'system_err', 'buf_err',
'perm_denied', 'auth_err', 'cred_insufficient', 'authinfo_unavail', 'user_unknown',
'maxtries', 'new_authtok_reqd', 'acct_expired', 'session_err', 'cred_unavail',
'cred_expired', 'cred_err', 'no_module_data', 'conv_err', 'authtok_err',
'authtok_recover_err', 'authtok_lock_busy', 'authtok_disable_aging', 'try_again',
'ignore', 'abort', 'authtok_expired', 'module_unknown', 'bad_item', 'conv_again',
'incomplete', 'default']
valid_control_actions = ['ignore', 'bad', 'die', 'ok', 'done', 'reset']
def __init__(self, rule_type, rule_control, rule_path, rule_args=None):
self._control = None
self._args = None
self.rule_type = rule_type self.rule_type = rule_type
self.rule_control = rule_control self.rule_control = rule_control
self.rule_module_path = rule_module_path
try: self.rule_path = rule_path
if (rule_module_args is not None and self.rule_args = rule_args
type(rule_module_args) is list):
self.rule_module_args = rule_module_args # Method to check if a rule matches the type, control and path.
elif (rule_module_args is not None and def matches(self, rule_type, rule_control, rule_path, rule_args=None):
type(rule_module_args) is str): if (rule_type == self.rule_type and
self.rule_module_args = rule_module_args.split() rule_control == self.rule_control and
except AttributeError: rule_path == self.rule_path):
self.rule_module_args = [] return True
return False
@classmethod @classmethod
def rulefromstring(cls, stringline): def rule_from_string(cls, line):
pattern = None match = RULE_REGEX.search(line)
return cls(match.group('rule_type'), match.group('control'), match.group('path'), match.group('args'))
rule_type = ''
rule_control = '' def __str__(self):
rule_module_path = '' if self.rule_args:
rule_module_args = '' return '{0: <11}{1} {2} {3}'.format(self.rule_type, self.rule_control, self.rule_path, ' '.join(self.rule_args))
complicated = False return '{0: <11}{1} {2}'.format(self.rule_type, self.rule_control, self.rule_path)
if '[' in stringline: @property
pattern = re.compile( def rule_control(self):
r"""([\-A-Za-z0-9_]+)\s* # Rule Type if isinstance(self._control, list):
\[([A-Za-z0-9_=\s]+)\]\s* # Rule Control return '[' + ' '.join(self._control) + ']'
([A-Za-z0-9/_\-\.]+)\s* # Rule Path return self._control
([A-Za-z0-9,_=<>\-\s\./]*)""", # Rule Args
re.X) @rule_control.setter
complicated = True def rule_control(self, control):
if control.startswith('['):
control = control.replace(' = ', '=').replace('[', '').replace(']', '')
self._control = control.split(' ')
else: else:
pattern = re.compile( self._control = control
r"""([@\-A-Za-z0-9_]+)\s* # Rule Type
([A-Za-z0-9_\-]+)\s* # Rule Control @property
([A-Za-z0-9/_\-\.]*)\s* # Rule Path def rule_args(self):
([A-Za-z0-9,_=<>\-\s\./]*)""", # Rule Args if not self._args:
re.X) return []
return self._args
result = pattern.match(stringline)
rule_type = result.group(1) @rule_args.setter
if complicated: def rule_args(self, args):
rule_control = '[' + result.group(2) + ']' if isinstance(args, str):
args = args.replace(" = ", "=")
self._args = args.split(" ")
else: else:
rule_control = result.group(2) self._args = args
rule_module_path = result.group(3)
if result.group(4) is not None:
rule_module_args = result.group(4)
return cls(rule_type, rule_control, rule_module_path, rule_module_args) @property
def line(self):
return str(self)
def get_module_args_as_string(self): @classmethod
def is_action_unsigned_int(cls, string_num):
number = 0
try: try:
if self.rule_module_args is not None: number = int(string_num)
return ' '.join(self.rule_module_args) except ValueError:
except AttributeError: return False
pass
return '' if number >= 0:
return True
return False
def validate(self):
# Validate the rule type
if self.rule_type not in PamdRule.valid_types:
return False, "Rule type, " + self.rule_type + ", is not valid in rule " + self.line
# Validate the rule control
if isinstance(self.rule_control, str) and self.rule_control not in PamdRule.valid_simple_controls:
return False, "Rule control, " + self.rule_control + ", is not valid in rule " + self.line
elif isinstance(self.rule_control, list):
for control in self.rule_control:
value, action = control.split("=")
if value not in PamdRule.valid_control_values:
return False, "Rule control value, " + value + ", is not valid in rule " + self.line
if action not in PamdRule.valid_control_actions and not PamdRule.is_action_unsigned_int(action):
return False, "Rule control action, " + action + ", is not valid in rule " + self.line
# TODO: Validate path
return True, "Rule is valid " + self.line
# PamdService encapsulates an entire service and contains one or more rules. It seems the best way is to do this
# as a doubly linked list.
class PamdService(object):
def __str__(self): def __init__(self, content):
return "%-10s\t%s\t%s %s" % (self.rule_type, self._head = None
self.rule_control, self._tail = None
self.rule_module_path, for line in content.splitlines():
self.get_module_args_as_string()) if line.lstrip().startswith('#'):
pamd_line = PamdComment(line)
elif line.lstrip().startswith('@include'):
pamd_line = PamdInclude(line)
elif line == '':
pamd_line = PamdLine(line)
else:
pamd_line = PamdRule.rule_from_string(line)
self.append(pamd_line)
def append(self, pamd_line):
if self._head is None:
self._head = self._tail = pamd_line
else:
pamd_line.prev = self._tail
pamd_line.next = None
self._tail.next = pamd_line
self._tail = pamd_line
def remove(self, rule_type, rule_control, rule_path):
current_line = self._head
changed = 0
while current_line is not None:
if current_line.matches(rule_type, rule_control, rule_path):
if current_line.prev is not None:
current_line.prev.next = current_line.next
current_line.next.prev = current_line.prev
else:
self._head = current_line.next
current_line.next.prev = None
changed += 1
current_line = current_line.next
return changed
def get(self, rule_type, rule_control, rule_path):
lines = []
current_line = self._head
while current_line is not None:
if isinstance(current_line, PamdRule) and current_line.matches(rule_type, rule_control, rule_path):
lines.append(current_line)
current_line = current_line.next
return lines
def has_rule(self, rule_type, rule_control, rule_path):
if self.get(rule_type, rule_control, rule_path):
return True
return False
def update_rule(self, rule_type, rule_control, rule_path,
new_type=None, new_control=None, new_path=None, new_args=None):
# Get a list of rules we want to change
rules_to_find = self.get(rule_type, rule_control, rule_path)
for current_rule in rules_to_find:
if new_type:
current_rule.rule_type = new_type
if new_control:
current_rule.rule_control = new_control
if new_path:
current_rule.rule_path = new_path
if new_args:
if isinstance(new_args, str):
new_args = new_args.replace(" = ", "=")
new_args = new_args.split(' ')
current_rule.rule_args = new_args
return len(rules_to_find)
def insert_before(self, rule_type, rule_control, rule_path,
new_type=None, new_control=None, new_path=None, new_args=None):
# Get a list of rules we want to change
rules_to_find = self.get(rule_type, rule_control, rule_path)
changed = 0
# There are two cases to consider.
# 1. The new rule doesn't exist before the existing rule
# 2. The new rule exists
for current_rule in rules_to_find:
# Create a new rule
new_rule = PamdRule(new_type, new_control, new_path, new_args)
# First we'll get the previous rule.
previous_rule = current_rule.prev
# Next we may have to loop backwards if the previous line is a comment. If it
# is, we'll get the previous "rule's" previous.
while previous_rule is not None and isinstance(previous_rule, PamdComment):
previous_rule = previous_rule.prev
# Next we'll see if the previous rule matches what we are trying to insert.
if previous_rule is not None and not previous_rule.matches(new_type, new_control, new_path):
# First set the original previous rule's next to the new_rule
previous_rule.next = new_rule
# Second, set the new_rule's previous to the original previous
new_rule.prev = previous_rule
# Third, set the new rule's next to the current rule
new_rule.next = current_rule
# Fourth, set the current rule's previous to the new_rule
current_rule.prev = new_rule
changed += 1
# Handle the case where it is the first rule in the list.
elif previous_rule is None:
# This is the case where the current rule is not only the first rule
# but the first line as well. So we set the head to the new rule
if current_rule.prev is None:
self._head = new_rule
# This case would occur if the previous line was a comment.
else:
current_rule.prev.next = new_rule
new_rule.prev = current_rule.prev
new_rule.next = current_rule
current_rule.prev = new_rule
changed += 1
return changed
def insert_after(self, rule_type, rule_control, rule_path,
new_type=None, new_control=None, new_path=None, new_args=None):
# Get a list of rules we want to change
rules_to_find = self.get(rule_type, rule_control, rule_path)
changed = 0
# There are two cases to consider.
# 1. The new rule doesn't exist after the existing rule
# 2. The new rule exists
for current_rule in rules_to_find:
# First we'll get the next rule.
next_rule = current_rule.next
# Next we may have to loop forwards if the next line is a comment. If it
# is, we'll get the next "rule's" next.
while next_rule is not None and isinstance(next_rule, PamdComment):
next_rule = next_rule.next
# First we create a new rule
new_rule = PamdRule(new_type, new_control, new_path, new_args)
if next_rule is not None and not next_rule.matches(new_type, new_control, new_path):
# If the previous rule doesn't match we'll insert our new rule.
# Second set the original next rule's previuous to the new_rule
next_rule.prev = new_rule
# Third, set the new_rule's next to the original next rule
new_rule.next = next_rule
# Fourth, set the new rule's previous to the current rule
new_rule.prev = current_rule
# Fifth, set the current rule's next to the new_rule
current_rule.next = new_rule
changed += 1
# This is the case where the current_rule is the last in the list
elif next_rule is None:
new_rule.prev = self._tail
new_rule.next = None
self._tail.next = new_rule
self._tail = new_rule
current_rule.next = new_rule
changed += 1
return changed
def add_module_arguments(self, rule_type, rule_control, rule_path, args_to_add):
# Get a list of rules we want to change
rules_to_find = self.get(rule_type, rule_control, rule_path)
changed = 0
for current_rule in rules_to_find:
if isinstance(args_to_add, str):
args_to_add = args_to_add.replace(" = ", "=")
args_to_add = args_to_add.split(' ')
if not args_to_add:
args_to_add = []
# Create a list of new args that aren't already present
new_args = [arg for arg in args_to_add if arg not in current_rule.rule_args]
# If there aren't any new args to add, we'll move on to the next rule
if not new_args:
continue
current_rule.rule_args = current_rule.rule_args + new_args
changed += 1
return changed
def remove_module_arguments(self, rule_type, rule_control, rule_path, args_to_remove):
# Get a list of rules we want to change
rules_to_find = self.get(rule_type, rule_control, rule_path)
changed = 0
for current_rule in rules_to_find:
if isinstance(args_to_remove, str):
args_to_remove = args_to_remove.replace(" = ", "=")
args_to_remove = args_to_remove.split(' ')
if not args_to_remove:
args_to_remove = []
# Let's check to see if there are any args to remove by finding the intersection
# of the rule's current args and the args_to_remove lists
if not list(set(current_rule.rule_args) & set(args_to_remove)):
continue
# There are args to remove, so we create a list of new_args absent the args
# to remove.
current_rule.rule_args = [arg for arg in current_rule.rule_args if arg not in args_to_remove]
changed += 1
return changed
def validate(self):
current_line = self._head
while current_line is not None:
if not current_line.is_valid()[0]:
return current_line.is_valid()
current_line = current_line.next
return True, "Module is valid"
def __str__(self):
lines = []
current_line = self._head
# PamdService encapsulates an entire service and contains one or more rules while current_line is not None:
class PamdService(object): lines.append(str(current_line))
current_line = current_line.next
def __init__(self, ansible=None): if lines[1].startswith("# Updated by Ansible"):
lines.pop(1)
if ansible is not None:
self.check = ansible.check_mode
self.check = False
self.ansible = ansible
self.preamble = []
self.rules = []
self.fname = None
if ansible is not None:
self.path = self.ansible.params["path"]
self.name = self.ansible.params["name"]
def load_rules_from_file(self):
self.fname = os.path.join(self.path, self.name)
stringline = ''
try:
for line in open(self.fname, 'r'):
stringline += line.rstrip().lstrip()
stringline += '\n'
self.load_rules_from_string(stringline.replace("\\\n", ""))
except IOError as e:
self.ansible.fail_json(msg='Unable to open/read PAM module \
file %s with error %s. And line %s' %
(self.fname, to_native(e), stringline))
def load_rules_from_string(self, stringvalue):
for line in stringvalue.splitlines():
stringline = line.rstrip()
if line.startswith('#') and not line.isspace():
self.preamble.append(line.rstrip())
elif (not line.startswith('#') and
not line.isspace() and
len(line) != 0):
try:
self.ansible.log(msg="Creating rule from string %s" % stringline)
except AttributeError:
pass
self.rules.append(PamdRule.rulefromstring(stringline))
def write(self):
if self.fname is None:
self.fname = self.path + "/" + self.name
# If the file is a symbollic link, we'll write to the source.
pamd_file = os.path.realpath(self.fname)
temp_file = "/tmp/" + self.name + "_" + time.strftime("%y%m%d%H%M%S")
try:
f = open(temp_file, 'w')
f.write(str(self))
f.close()
except IOError:
self.ansible.fail_json(msg='Unable to create temporary \
file %s' % self.temp_file)
self.ansible.atomic_move(temp_file, pamd_file) lines.insert(1, "# Updated by Ansible - " + datetime.now().isoformat())
def __str__(self): return '\n'.join(lines)
stringvalue = ''
previous_rule = None
for amble in self.preamble:
stringvalue += amble
stringvalue += '\n'
for rule in self.rules:
if (previous_rule is not None and
(previous_rule.rule_type.replace('-', '') !=
rule.rule_type.replace('-', ''))):
stringvalue += '\n'
stringvalue += str(rule).rstrip()
stringvalue += '\n'
previous_rule = rule
if stringvalue.endswith('\n'):
stringvalue = stringvalue[:-1]
return stringvalue
def update_rule(service, old_rule, new_rule):
changed = False
change_count = 0
result = {'action': 'update_rule'}
for rule in service.rules:
if (old_rule.rule_type == rule.rule_type and
old_rule.rule_control == rule.rule_control and
old_rule.rule_module_path == rule.rule_module_path):
if (new_rule.rule_type is not None and
new_rule.rule_type != rule.rule_type):
rule.rule_type = new_rule.rule_type
changed = True
if (new_rule.rule_control is not None and
new_rule.rule_control != rule.rule_control):
rule.rule_control = new_rule.rule_control
changed = True
if (new_rule.rule_module_path is not None and
new_rule.rule_module_path != rule.rule_module_path):
rule.rule_module_path = new_rule.rule_module_path
changed = True
try:
if (new_rule.rule_module_args is not None and
new_rule.get_module_args_as_string() !=
rule.get_module_args_as_string()):
rule.rule_module_args = new_rule.rule_module_args
changed = True
except AttributeError:
pass
if changed:
result['updated_rule_' + str(change_count)] = str(rule)
result['new_rule'] = str(new_rule)
change_count += 1
result['change_count'] = change_count
return changed, result
def insert_before_rule(service, old_rule, new_rule):
index = 0
change_count = 0
result = {'action':
'insert_before_rule'}
changed = False
for rule in service.rules:
if (old_rule.rule_type == rule.rule_type and
old_rule.rule_control == rule.rule_control and
old_rule.rule_module_path == rule.rule_module_path):
if index == 0:
service.rules.insert(0, new_rule)
changed = True
elif (new_rule.rule_type != service.rules[index - 1].rule_type or
new_rule.rule_control !=
service.rules[index - 1].rule_control or
new_rule.rule_module_path !=
service.rules[index - 1].rule_module_path):
service.rules.insert(index, new_rule)
changed = True
if changed:
result['new_rule'] = str(new_rule)
result['before_rule_' + str(change_count)] = str(rule)
change_count += 1
index += 1
result['change_count'] = change_count
return changed, result
def insert_after_rule(service, old_rule, new_rule):
index = 0
change_count = 0
result = {'action': 'insert_after_rule'}
changed = False
for rule in service.rules:
if (old_rule.rule_type == rule.rule_type and
old_rule.rule_control == rule.rule_control and
old_rule.rule_module_path == rule.rule_module_path):
if (index == len(service.rules) - 1):
service.rules.insert(len(service.rules), new_rule)
changed = True
elif (new_rule.rule_type != service.rules[index + 1].rule_type or
new_rule.rule_control !=
service.rules[index + 1].rule_control or
new_rule.rule_module_path !=
service.rules[index + 1].rule_module_path):
service.rules.insert(index + 1, new_rule)
changed = True
if changed:
result['new_rule'] = str(new_rule)
result['after_rule_' + str(change_count)] = str(rule)
change_count += 1
index += 1
result['change_count'] = change_count
return changed, result
def remove_module_arguments(service, old_rule, module_args):
result = {'action': 'args_absent'}
changed = False
change_count = 0
for rule in service.rules:
if (old_rule.rule_type == rule.rule_type and
old_rule.rule_control == rule.rule_control and
old_rule.rule_module_path == rule.rule_module_path):
for arg_to_remove in module_args:
for arg in rule.rule_module_args:
if arg == arg_to_remove:
rule.rule_module_args.remove(arg)
changed = True
result['removed_arg_' + str(change_count)] = arg
result['from_rule_' + str(change_count)] = str(rule)
change_count += 1
result['change_count'] = change_count
return changed, result
def add_module_arguments(service, old_rule, module_args):
result = {'action': 'args_present'}
changed = False
change_count = 0
for rule in service.rules:
if (old_rule.rule_type == rule.rule_type and
old_rule.rule_control == rule.rule_control and
old_rule.rule_module_path == rule.rule_module_path):
for arg_to_add in module_args:
if "=" in arg_to_add:
pre_string = arg_to_add[:arg_to_add.index('=') + 1]
indicies = [i for i, arg
in enumerate(rule.rule_module_args)
if arg.startswith(pre_string)]
if len(indicies) == 0:
rule.rule_module_args.append(arg_to_add)
changed = True
result['added_arg_' + str(change_count)] = arg_to_add
result['to_rule_' + str(change_count)] = str(rule)
change_count += 1
else:
for i in indicies:
if rule.rule_module_args[i] != arg_to_add:
rule.rule_module_args[i] = arg_to_add
changed = True
result['updated_arg_' +
str(change_count)] = arg_to_add
result['in_rule_' +
str(change_count)] = str(rule)
change_count += 1
elif arg_to_add not in rule.rule_module_args:
rule.rule_module_args.append(arg_to_add)
changed = True
result['added_arg_' + str(change_count)] = arg_to_add
result['to_rule_' + str(change_count)] = str(rule)
change_count += 1
result['change_count'] = change_count
return changed, result
def remove_rule(service, old_rule):
result = {'action': 'absent'}
changed = False
change_count = 0
for rule in service.rules:
if (old_rule.rule_type == rule.rule_type and
old_rule.rule_control == rule.rule_control and
old_rule.rule_module_path == rule.rule_module_path):
service.rules.remove(rule)
changed = True
return changed, result
def main(): def main():
@ -611,7 +693,8 @@ def main():
state=dict(required=False, default="updated", state=dict(required=False, default="updated",
choices=['before', 'after', 'updated', choices=['before', 'after', 'updated',
'args_absent', 'args_present', 'absent']), 'args_absent', 'args_present', 'absent']),
path=dict(required=False, default='/etc/pam.d', type='str') path=dict(required=False, default='/etc/pam.d', type='str'),
backup=dict(default=False, type='bool')
), ),
supports_check_mode=True, supports_check_mode=True,
required_if=[ required_if=[
@ -626,66 +709,77 @@ def main():
] ]
) )
content = str()
fname = os.path.join(module.params["path"], module.params["name"])
backupdest = ""
# Open the file and read the content or fail
try:
with open(fname, 'r') as service_file_obj:
content = service_file_obj.read()
except IOError as e:
# If unable to read the file, fail out
module.fail_json(msg='Unable to open/read PAM module \
file %s with error %s.' %
(fname, str(e)))
# Assuming we didnt fail, create the service
service = PamdService(content)
# Set the action
action = module.params['state']
# Take action
if action == 'updated':
changes = service.update_rule(module.params['type'], module.params['control'], module.params['module_path'],
module.params['new_type'], module.params['new_control'], module.params['new_module_path'],
module.params['module_arguments'])
elif action == 'before':
changes = service.insert_before(module.params['type'], module.params['control'], module.params['module_path'],
module.params['new_type'], module.params['new_control'], module.params['new_module_path'],
module.params['module_arguments'])
elif action == 'after':
changes = service.insert_after(module.params['type'], module.params['control'], module.params['module_path'],
module.params['new_type'], module.params['new_control'], module.params['new_module_path'],
module.params['module_arguments'])
elif action == 'args_absent':
changes = service.remove_module_arguments(module.params['type'], module.params['control'], module.params['module_path'],
module.params['module_arguments'])
elif action == 'args_present':
changes = service.add_module_arguments(module.params['type'], module.params['control'], module.params['module_path'],
module.params['module_arguments'])
elif action == 'absent':
changes = service.remove(module.params['type'], module.params['control'], module.params['module_path'])
valid, msg = service.validate()
# If the module is not valid (meaning one of the rules is invalid), we will fail
if not valid:
module.fail_json(msg=msg)
# If not check mode and something changed, backup the original if necessary then write out the file or fail
if not module.check_mode and changes > 0:
pamd_file = os.path.realpath(fname)
# First, create a backup if desired.
if module.params['backup']:
backupdest = module.backup_local(fname)
print("BACKUP DEST", backupdest)
try:
temp_file = NamedTemporaryFile(mode='w')
with open(temp_file.name, 'w') as fd:
fd.write(str(service))
service = module.params['name'] except IOError:
old_type = module.params['type'] module.fail_json(msg='Unable to create temporary \
old_control = module.params['control'] file %s' % temp_file)
old_module_path = module.params['module_path']
new_type = module.params['new_type']
new_control = module.params['new_control']
new_module_path = module.params['new_module_path']
module_arguments = module.params['module_arguments']
state = module.params['state']
path = module.params['path']
pamd = PamdService(module)
pamd.load_rules_from_file()
old_rule = PamdRule(old_type,
old_control,
old_module_path)
new_rule = PamdRule(new_type,
new_control,
new_module_path,
module_arguments)
if state == 'updated':
change, result = update_rule(pamd,
old_rule,
new_rule)
elif state == 'before':
change, result = insert_before_rule(pamd,
old_rule,
new_rule)
elif state == 'after':
change, result = insert_after_rule(pamd,
old_rule,
new_rule)
elif state == 'args_absent':
change, result = remove_module_arguments(pamd,
old_rule,
module_arguments)
elif state == 'args_present':
change, result = add_module_arguments(pamd,
old_rule,
module_arguments)
elif state == 'absent':
change, result = remove_rule(pamd,
old_rule)
if not module.check_mode and change:
pamd.write()
facts = {}
facts['pamd'] = {'changed': change, 'result': result}
module.params['dest'] = pamd.fname module.atomic_move(temp_file.name, pamd_file)
module.exit_json(changed=change, ansible_facts=facts) facts = {}
facts['pamd'] = {'changed': changes > 0,
'change_count': changes,
'action': action,
'backupdest': backupdest}
module.exit_json(changed=changes > 0, ansible_facts=facts)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

@ -1,92 +1,98 @@
from __future__ import (absolute_import, division, print_function)
from ansible.compat.tests import unittest from ansible.compat.tests import unittest
from ansible.modules.system.pamd import PamdRule from ansible.modules.system.pamd import PamdRule
from ansible.modules.system.pamd import PamdLine
from ansible.modules.system.pamd import PamdComment
from ansible.modules.system.pamd import PamdInclude
from ansible.modules.system.pamd import PamdService from ansible.modules.system.pamd import PamdService
from ansible.modules.system.pamd import update_rule
from ansible.modules.system.pamd import insert_before_rule
from ansible.modules.system.pamd import insert_after_rule
from ansible.modules.system.pamd import remove_module_arguments
from ansible.modules.system.pamd import add_module_arguments
from ansible.modules.system.pamd import remove_rule
import re
class PamdLineTestCase(unittest.TestCase):
def setUp(self):
self.pamd_line = PamdLine("This is a test")
def test_line(self):
self.assertEqual("This is a test", str(self.pamd_line))
def test_matches(self):
self.assertFalse(self.pamd_line.matches("test", "matches", "foo", "bar"))
class PamdIncludeTestCase(unittest.TestCase):
def setUp(self):
self.good_include = PamdInclude("@include foobar")
self.bad_include = PamdInclude("include foobar")
def test_line(self):
self.assertEqual("@include foobar", str(self.good_include))
def test_matches(self):
self.assertFalse(self.good_include.matches("something", "something", "dark", "side"))
def test_valid(self):
self.assertTrue(self.good_include.is_valid)
self.assertFalse(self.bad_include.is_valid)
class PamdCommentTestCase(unittest.TestCase):
def setUp(self):
self.good_comment = PamdComment("# This is a test comment")
self.bad_comment = PamdComment("This is a bad test comment")
def test_line(self):
self.assertEqual("# This is a test comment", str(self.good_comment))
def test_matches(self):
self.assertFalse(self.good_comment.matches("test", "matches", "foo", "bar"))
def test_valid(self):
self.assertTrue(self.good_comment.is_valid)
self.assertFalse(self.bad_comment.is_valid)
class PamdRuleTestCase(unittest.TestCase): class PamdRuleTestCase(unittest.TestCase):
def setUp(self):
self.rule = PamdRule('account', 'optional', 'pam_keyinit.so', 'revoke')
def test_type(self):
self.assertEqual(self.rule.rule_type, 'account')
def test_control(self):
self.assertEqual(self.rule.rule_control, 'optional')
self.assertEqual(self.rule._control, 'optional')
def test_path(self):
self.assertEqual(self.rule.rule_path, 'pam_keyinit.so')
def test_args(self):
self.assertEqual(self.rule.rule_args, ['revoke'])
def test_valid(self):
self.assertTrue(self.rule.validate()[0])
class PamdRuleBadValidationTestCase(unittest.TestCase):
def setUp(self):
self.bad_type = PamdRule('foobar', 'optional', 'pam_keyinit.so', 'revoke')
self.bad_control_simple = PamdRule('account', 'foobar', 'pam_keyinit.so', 'revoke')
self.bad_control_value = PamdRule('account', '[foobar=1 default=ignore]', 'pam_keyinit.so', 'revoke')
self.bad_control_action = PamdRule('account', '[success=1 default=foobar]', 'pam_keyinit.so', 'revoke')
def test_simple(self): def test_validate_bad_type(self):
simple = "auth required pam_env.so".rstrip() self.assertFalse(self.bad_type.validate()[0])
module = PamdRule.rulefromstring(stringline=simple)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) def test_validate_bad_control_simple(self):
self.assertEqual(simple, module_string.rstrip()) self.assertFalse(self.bad_control_simple.validate()[0])
self.assertEqual('', module.get_module_args_as_string())
def test_validate_bad_control_value(self):
def test_simple_more(self): self.assertFalse(self.bad_control_value.validate()[0])
simple = "auth required pam_tally2.so deny=5 onerr=fail".rstrip()
module = PamdRule.rulefromstring(stringline=simple) def test_validate_bad_control_action(self):
module_string = re.sub(' +', ' ', str(module).replace('\t', ' ')) self.assertFalse(self.bad_control_action.validate()[0])
self.assertEqual(simple, module_string.rstrip())
self.assertEqual('deny=5 onerr=fail',
module.get_module_args_as_string())
def test_complicated_rule(self):
complicated = "-auth [default=1 success=ok] pam_localuser.so".rstrip()
module = PamdRule.rulefromstring(stringline=complicated)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(complicated, module_string.rstrip())
self.assertEqual('', module.get_module_args_as_string())
def test_more_complicated_rule(self):
complicated = "auth"
complicated += " [success=done ignore=ignore default=die]"
complicated += " pam_unix.so"
complicated += " try_first_pass".rstrip()
module = PamdRule.rulefromstring(stringline=complicated)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(complicated, module_string.rstrip())
self.assertEqual('try_first_pass', module.get_module_args_as_string())
def test_rule_with_arg(self):
line = "account optional pam_echo.so file=/etc/lockout.txt"
module = PamdRule.rulefromstring(stringline=line)
self.assertEqual(module.rule_type, 'account')
self.assertEqual(module.rule_control, 'optional')
self.assertEqual(module.rule_module_path, 'pam_echo.so')
self.assertEqual(module.rule_module_args, ['file=/etc/lockout.txt'])
def test_rule_with_args(self):
line = "account optional pam_echo.so file1=/etc/lockout1.txt file2=/etc/lockout2.txt"
module = PamdRule.rulefromstring(stringline=line)
self.assertEqual(module.rule_type, 'account')
self.assertEqual(module.rule_control, 'optional')
self.assertEqual(module.rule_module_path, 'pam_echo.so')
self.assertEqual(module.rule_module_args, ['file1=/etc/lockout1.txt', 'file2=/etc/lockout2.txt'])
def test_less_than_in_args(self):
rule = "auth requisite pam_succeed_if.so uid >= 1025 quiet_success"
module = PamdRule.rulefromstring(stringline=rule)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(rule, module_string.rstrip())
self.assertEqual('uid >= 1025 quiet_success', module.get_module_args_as_string())
def test_trailing_comma(self):
rule = "account required pam_access.so listsep=,"
module = PamdRule.rulefromstring(stringline=rule)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(rule, module_string.rstrip())
def test_slash_in_args(self):
rule = "auth sufficient /lib64/security/pam_duo.so".rstrip()
module = PamdRule.rulefromstring(stringline=rule)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(rule, module_string.rstrip())
self.assertEqual('', module.get_module_args_as_string())
def test_slash_in_args_more(self):
rule = "auth [success=1 default=ignore] /lib64/security/pam_duo.so".rstrip()
module = PamdRule.rulefromstring(stringline=rule)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(rule, module_string.rstrip())
self.assertEqual('', module.get_module_args_as_string())
class PamdServiceTestCase(unittest.TestCase): class PamdServiceTestCase(unittest.TestCase):
@ -94,182 +100,254 @@ class PamdServiceTestCase(unittest.TestCase):
self.system_auth_string = """#%PAM-1.0 self.system_auth_string = """#%PAM-1.0
# This file is auto-generated. # This file is auto-generated.
# User changes will be destroyed the next time authconfig is run. # User changes will be destroyed the next time authconfig is run.
auth \trequired\tpam_env.so @include common-auth
auth \tsufficient\tpam_unix.so nullok try_first_pass @include common-account
auth \trequisite\tpam_succeed_if.so uid @include common-session
auth \trequired\tpam_deny.so auth required pam_env.so
auth \tsufficient\tpam_rootok.so auth sufficient pam_unix.so nullok try_first_pass
auth requisite pam_succeed_if.so uid
account \trequired\tpam_unix.so auth required pam_deny.so
account \tsufficient\tpam_localuser.so # Test comment
account \tsufficient\tpam_succeed_if.so uid auth sufficient pam_rootok.so
account required pam_unix.so
account sufficient pam_localuser.so
account sufficient pam_succeed_if.so uid
account [success=1 default=ignore] \ account [success=1 default=ignore] \
\t\t\t\tpam_succeed_if.so user = vagrant use_uid quiet pam_succeed_if.so user = vagrant use_uid quiet
account \trequired\tpam_permit.so account required pam_permit.so
account \trequired\tpam_access.so listsep=, account required pam_access.so listsep=,
session \tinclude\tsystem-auth session include system-auth
password \trequisite\tpam_pwquality.so try_first_pass local_users_only retry=3 authtok_type= password requisite pam_pwquality.so try_first_pass local_users_only retry=3 authtok_type=
password \tsufficient\tpam_unix.so sha512 shadow nullok try_first_pass use_authtok password sufficient pam_unix.so sha512 shadow nullok try_first_pass use_authtok
password \trequired\tpam_deny.so password required pam_deny.so
session \toptional\tpam_keyinit.so revoke session optional pam_keyinit.so revoke
session \trequired\tpam_limits.so session required pam_limits.so
-session \toptional\tpam_systemd.so -session optional pam_systemd.so
session \t[success=1 default=ignore]\tpam_succeed_if.so service in crond quiet use_uid session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid
session \t[success=1 test=me default=ignore]\tpam_succeed_if.so service in crond quiet use_uid session [success=1 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid
session \trequired\tpam_unix.so session required pam_unix.so"""
@include \tcommon-auth
@include \tcommon-account self.simple_system_auth_string = """#%PAM-1.0
@include \tcommon-session""" auth required pam_env.so
"""
self.pamd = PamdService()
self.pamd.load_rules_from_string(self.system_auth_string) self.pamd = PamdService(self.system_auth_string)
def test_properly_parsed(self):
num_lines = len(self.system_auth_string.splitlines()) + 1
num_lines_processed = len(str(self.pamd).splitlines())
self.assertEqual(num_lines, num_lines_processed)
def test_has_rule(self):
self.assertTrue(self.pamd.has_rule('account', 'required', 'pam_permit.so'))
self.assertTrue(self.pamd.has_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so'))
def test_doesnt_have_rule(self):
self.assertFalse(self.pamd.has_rule('account', 'requisite', 'pam_permit.so'))
# Test Update
def test_update_rule_type(self): def test_update_rule_type(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so') self.assertTrue(self.pamd.update_rule('session', 'optional', 'pam_keyinit.so', new_type='account'))
new_rule = PamdRule.rulefromstring('session required pam_env.so') self.assertTrue(self.pamd.has_rule('account', 'optional', 'pam_keyinit.so'))
update_rule(self.pamd, old_rule, new_rule) test_rule = PamdRule('account', 'optional', 'pam_keyinit.so', 'revoke')
self.assertIn(str(new_rule).rstrip(), str(self.pamd)) self.assertIn(str(test_rule), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_that_doesnt_exist(self):
self.assertFalse(self.pamd.update_rule('blah', 'blah', 'blah', new_type='account'))
self.assertFalse(self.pamd.has_rule('blah', 'blah', 'blah'))
test_rule = PamdRule('blah', 'blah', 'blah', 'account')
self.assertNotIn(str(test_rule), str(self.pamd))
def test_update_rule_type_two(self):
self.assertTrue(self.pamd.update_rule('session', '[success=1 default=ignore]', 'pam_succeed_if.so', new_type='account'))
self.assertTrue(self.pamd.has_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so'))
test_rule = PamdRule('account', '[success=1 default=ignore]', 'pam_succeed_if.so')
self.assertIn(str(test_rule), str(self.pamd))
def test_update_rule_control_simple(self): def test_update_rule_control_simple(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so') self.assertTrue(self.pamd.update_rule('session', 'optional', 'pam_keyinit.so', new_control='required'))
new_rule = PamdRule.rulefromstring('auth sufficent pam_env.so') self.assertTrue(self.pamd.has_rule('session', 'required', 'pam_keyinit.so'))
update_rule(self.pamd, old_rule, new_rule) test_rule = PamdRule('session', 'required', 'pam_keyinit.so')
self.assertIn(str(new_rule).rstrip(), str(self.pamd)) self.assertIn(str(test_rule), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_control_complex(self): def test_update_rule_control_complex(self):
old_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid') self.assertTrue(self.pamd.update_rule('session',
new_rule = PamdRule.rulefromstring('session [success=2 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid') '[success=1 default=ignore]',
update_rule(self.pamd, old_rule, new_rule) 'pam_succeed_if.so',
self.assertIn(str(new_rule).rstrip(), str(self.pamd)) new_control='[success=2 test=me default=ignore]'))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) self.assertTrue(self.pamd.has_rule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so'))
test_rule = PamdRule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so')
self.assertIn(str(test_rule), str(self.pamd))
def test_update_rule_control_more_complex(self): def test_update_rule_control_more_complex(self):
old_rule = PamdRule.rulefromstring('session [success=1 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid')
new_rule = PamdRule.rulefromstring('session [success=2 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid') self.assertTrue(self.pamd.update_rule('session',
update_rule(self.pamd, old_rule, new_rule) '[success=1 test=me default=ignore]',
self.assertIn(str(new_rule).rstrip(), str(self.pamd)) 'pam_succeed_if.so',
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) new_control='[success=2 test=me default=ignore]'))
self.assertTrue(self.pamd.has_rule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so'))
test_rule = PamdRule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so')
self.assertIn(str(test_rule), str(self.pamd))
def test_update_rule_module_path(self): def test_update_rule_module_path(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so') self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', new_path='pam_limits.so'))
new_rule = PamdRule.rulefromstring('session required pam_limits.so') self.assertTrue(self.pamd.has_rule('auth', 'required', 'pam_limits.so'))
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_module_path_slash(self): def test_update_rule_module_path_slash(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so') self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', new_path='/lib64/security/pam_duo.so'))
new_rule = PamdRule.rulefromstring('auth required /lib64/security/pam_duo.so') self.assertTrue(self.pamd.has_rule('auth', 'required', '/lib64/security/pam_duo.so'))
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_module_args(self): def test_update_rule_module_args(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so', new_args='uid uid'))
new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so uid uid') test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'uid uid')
update_rule(self.pamd, old_rule, new_rule) self.assertIn(str(test_rule), str(self.pamd))
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'nullok try_first_pass')
self.assertNotIn(str(test_rule), str(self.pamd))
def test_update_first_three(self): def test_update_first_three(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so') self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so',
new_rule = PamdRule.rulefromstring('one two three') new_type='one', new_control='two', new_path='three'))
update_rule(self.pamd, old_rule, new_rule) self.assertTrue(self.pamd.has_rule('one', 'two', 'three'))
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_first_three_with_module_args(self): def test_update_first_three_with_module_args(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so',
new_rule = PamdRule.rulefromstring('one two three') new_type='one', new_control='two', new_path='three'))
update_rule(self.pamd, old_rule, new_rule) self.assertTrue(self.pamd.has_rule('one', 'two', 'three'))
self.assertIn(str(new_rule).rstrip(), str(self.pamd)) test_rule = PamdRule('one', 'two', 'three')
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) self.assertIn(str(test_rule), str(self.pamd))
self.assertIn(str(test_rule), str(self.pamd))
def test_update_all_four(self): def test_update_all_four(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so',
new_rule = PamdRule.rulefromstring('one two three four five') new_type='one', new_control='two', new_path='three',
update_rule(self.pamd, old_rule, new_rule) new_args='four five'))
self.assertIn(str(new_rule).rstrip(), str(self.pamd)) test_rule = PamdRule('one', 'two', 'three', 'four five')
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) self.assertIn(str(test_rule), str(self.pamd))
test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'nullok try_first_pass')
self.assertNotIn(str(test_rule), str(self.pamd))
def test_update_rule_with_slash(self):
self.assertTrue(self.pamd.update_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so',
new_type='session', new_path='pam_access.so'))
test_rule = PamdRule('session', '[success=1 default=ignore]', 'pam_access.so')
self.assertIn(str(test_rule), str(self.pamd))
# Insert Before
def test_insert_before_rule(self): def test_insert_before_rule(self):
old_rule = PamdRule.rulefromstring('account required pam_unix.so')
new_rule = PamdRule.rulefromstring('account required pam_permit.so')
insert_before_rule(self.pamd, old_rule, new_rule)
line_to_test = str(new_rule).rstrip()
line_to_test += '\n'
line_to_test += str(old_rule).rstrip()
self.assertIn(line_to_test, str(self.pamd))
count = self.pamd.insert_before('account', 'required', 'pam_access.so',
new_type='account', new_control='required', new_path='pam_limits.so')
self.assertEqual(count, 1)
rules = self.pamd.get("account", "required", "pam_access.so")
for current_rule in rules:
self.assertTrue(current_rule.prev.matches("account", "required", "pam_limits.so"))
def test_insert_before_rule_where_rule_doesnt_exist(self):
count = self.pamd.insert_before('account', 'sufficient', 'pam_access.so',
new_type='account', new_control='required', new_path='pam_limits.so')
self.assertFalse(count)
def test_insert_before_rule_with_args(self):
self.assertTrue(self.pamd.insert_before('account', 'required', 'pam_access.so',
new_type='account', new_control='required', new_path='pam_limits.so',
new_args='uid'))
rules = self.pamd.get("account", "required", "pam_access.so")
for current_rule in rules:
self.assertTrue(current_rule.prev.matches("account", "required", "pam_limits.so", 'uid'))
def test_insert_before_rule_test_duplicates(self):
self.assertTrue(self.pamd.insert_before('account', 'required', 'pam_access.so',
new_type='account', new_control='required', new_path='pam_limits.so'))
self.pamd.insert_before('account', 'required', 'pam_access.so',
new_type='account', new_control='required', new_path='pam_limits.so')
rules = self.pamd.get("account", "required", "pam_access.so")
for current_rule in rules:
previous_rule = current_rule.prev
self.assertTrue(previous_rule.matches("account", "required", "pam_limits.so"))
self.assertFalse(previous_rule.prev.matches("account", "required", "pam_limits.so"))
def test_insert_before_first_rule(self):
self.assertTrue(self.pamd.insert_before('auth', 'required', 'pam_env.so',
new_type='account', new_control='required', new_path='pam_limits.so'))
def test_insert_before_first_rule_simple(self):
simple_service = PamdService(self.simple_system_auth_string)
self.assertTrue(simple_service.insert_before('auth', 'required', 'pam_env.so',
new_type='account', new_control='required', new_path='pam_limits.so'))
# Insert After
def test_insert_after_rule(self): def test_insert_after_rule(self):
old_rule = PamdRule.rulefromstring('account required pam_unix.so') self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_unix.so',
new_rule = PamdRule.rulefromstring('account required pam_permit.so arg1 arg2 arg3') new_type='account', new_control='required', new_path='pam_permit.so'))
insert_after_rule(self.pamd, old_rule, new_rule) rules = self.pamd.get("account", "required", "pam_unix.so")
line_to_test = str(old_rule).rstrip() for current_rule in rules:
line_to_test += '\n' self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so"))
line_to_test += str(new_rule).rstrip()
self.assertIn(line_to_test, str(self.pamd)) def test_insert_after_rule_with_args(self):
self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_access.so',
def test_insert_after_rule_another(self): new_type='account', new_control='required', new_path='pam_permit.so',
old_rule = PamdRule.rulefromstring('auth sufficient pam_rootok.so') new_args='uid'))
new_rule = PamdRule.rulefromstring('auth required pam_wheel.so use_id') rules = self.pamd.get("account", "required", "pam_access.so")
insert_after_rule(self.pamd, old_rule, new_rule) for current_rule in rules:
line_to_test = str(old_rule).rstrip() self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid"))
line_to_test += '\n'
line_to_test += str(new_rule).rstrip() def test_insert_after_test_duplicates(self):
self.assertIn(line_to_test, str(self.pamd)) self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_access.so',
new_type='account', new_control='required', new_path='pam_permit.so',
new_args='uid'))
self.assertFalse(self.pamd.insert_after('account', 'required', 'pam_access.so',
new_type='account', new_control='required', new_path='pam_permit.so',
new_args='uid'))
rules = self.pamd.get("account", "required", "pam_access.so")
for current_rule in rules:
self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid"))
self.assertFalse(current_rule.next.next.matches("account", "required", "pam_permit.so", "uid"))
def test_insert_after_rule_last_rule(self): def test_insert_after_rule_last_rule(self):
old_rule = PamdRule.rulefromstring('session required pam_unix.so') self.assertTrue(self.pamd.insert_after('session', 'required', 'pam_unix.so',
new_rule = PamdRule.rulefromstring('session required pam_permit.so arg1 arg2 arg3') new_type='account', new_control='required', new_path='pam_permit.so',
insert_after_rule(self.pamd, old_rule, new_rule) new_args='uid'))
line_to_test = str(old_rule).rstrip() rules = self.pamd.get("session", "required", "pam_unix.so")
line_to_test += '\n' for current_rule in rules:
line_to_test += str(new_rule).rstrip() self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid"))
self.assertIn(line_to_test, str(self.pamd))
# Remove Module Arguments
def test_remove_module_arguments_one(self): def test_remove_module_arguments_one(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') self.assertTrue(self.pamd.remove_module_arguments('auth', 'sufficient', 'pam_unix.so', 'nullok'))
new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so try_first_pass')
args_to_remove = ['nullok'] def test_remove_module_arguments_one_list(self):
remove_module_arguments(self.pamd, old_rule, args_to_remove) self.assertTrue(self.pamd.remove_module_arguments('auth', 'sufficient', 'pam_unix.so', ['nullok']))
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_remove_module_arguments_two(self): def test_remove_module_arguments_two(self):
old_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid') self.assertTrue(self.pamd.remove_module_arguments('session', '[success=1 default=ignore]', 'pam_succeed_if.so', 'service crond'))
new_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so in quiet use_uid')
args_to_remove = ['service', 'crond'] def test_remove_module_arguments_two_list(self):
remove_module_arguments(self.pamd, old_rule, args_to_remove) self.assertTrue(self.pamd.remove_module_arguments('session', '[success=1 default=ignore]', 'pam_succeed_if.so', ['service', 'crond']))
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) def test_remove_module_arguments_where_none_existed(self):
self.assertTrue(self.pamd.add_module_arguments('session', 'required', 'pam_limits.so', 'arg1 arg2= arg3=arg3'))
def test_add_module_arguments_where_none_existed(self): def test_add_module_arguments_where_none_existed(self):
old_rule = PamdRule.rulefromstring('account required pam_unix.so') self.assertTrue(self.pamd.add_module_arguments('account', 'required', 'pam_unix.so', 'arg1 arg2= arg3=arg3'))
new_rule = PamdRule.rulefromstring('account required pam_unix.so arg1 arg2= arg3=arg3')
args_to_add = ['arg1', 'arg2=', 'arg3=arg3'] def test_add_module_arguments_where_none_existed_list(self):
add_module_arguments(self.pamd, old_rule, args_to_add) self.assertTrue(self.pamd.add_module_arguments('account', 'required', 'pam_unix.so', ['arg1', 'arg2=', 'arg3=arg3']))
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
def test_add_module_arguments_where_some_existed(self): def test_add_module_arguments_where_some_existed(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass') self.assertTrue(self.pamd.add_module_arguments('auth', 'sufficient', 'pam_unix.so', 'arg1 arg2= arg3=arg3'))
new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass arg1 arg2= arg3=arg3')
args_to_add = ['arg1', 'arg2=', 'arg3=arg3']
add_module_arguments(self.pamd, old_rule, args_to_add)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
def test_remove_rule(self): def test_remove_rule(self):
old_rule = PamdRule.rulefromstring('account required pam_unix.so') self.assertTrue(self.pamd.remove('account', 'required', 'pam_unix.so'))
remove_rule(self.pamd, old_rule) test_rule = PamdRule('account', 'required', 'pam_unix.so')
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd)) self.assertNotIn(str(test_rule), str(self.pamd))
def test_add_module_args_with_string(self):
old_rule = PamdRule.rulefromstring("account required pam_access.so")
new_rule = PamdRule.rulefromstring("account required pam_access.so listsep=,")
args_to_add = ['listsep=,']
add_module_arguments(self.pamd, old_rule, args_to_add)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))

Loading…
Cancel
Save