VyOS: firewall_rules module added (#65680)

* firewall_rules module added

Signed-off-by: rohitthakur2590 <rohitthakur2590@outlook.com>

* state fact gathering code

Signed-off-by: rohitthakur2590 <rohitthakur2590@outlook.com>

* facts and operation code added

Signed-off-by: rohitthakur2590 <rohitthakur2590@outlook.com>

* unit tests added

Signed-off-by: rohitthakur2590 <rohitthakur2590@outlook.com>

* sanity fixes

Signed-off-by: rohitthakur2590 <rohitthakur2590@outlook.com>

* sanity fixes

Signed-off-by: rohitthakur2590 <rohitthakur2590@outlook.com>

* sanity fixes

Signed-off-by: rohitthakur2590 <rohitthakur2590@outlook.com>

* delete opr updated

Signed-off-by: rohitthakur2590 <rohitthakur2590@outlook.com>
pull/67483/head
Rohit 5 years ago committed by GitHub
parent eff51c0122
commit e76630c4e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,318 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#############################################
# WARNING #
#############################################
#
# This file is auto generated by the resource
# module builder playbook.
#
# Do not edit this file manually.
#
# Changes to this file will be over written
# by the resource module builder.
#
# Changes should be made in the model used to
# generate this file or in the resource module
# builder template.
#
#############################################
"""
The arg spec for the vyos_firewall_rules module
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
class Firewall_rulesArgs(object): # pylint: disable=R0903
"""The arg spec for the vyos_firewall_rules module
"""
def __init__(self, **kwargs):
pass
argument_spec = {
'config': {
'elements': 'dict',
'options': {
'afi': {
'choices': ['ipv4', 'ipv6'],
'required': True,
'type': 'str'
},
'rule_sets': {
'elements': 'dict',
'options': {
'default_action': {
'choices': ['drop', 'reject', 'accept'],
'type': 'str'
},
'description': {
'type': 'str'
},
'enable_default_log': {
'type': 'bool'
},
'name': {
'type': 'str'
},
'rules': {
'elements': 'dict',
'options': {
'action': {
'choices':
['drop', 'reject', 'accept', 'inspect'],
'type':
'str'
},
'description': {
'type': 'str'
},
'destination': {
'options': {
'address': {
'type': 'str'
},
'group': {
'options': {
'address_group': {
'type': 'str'
},
'network_group': {
'type': 'str'
},
'port_group': {
'type': 'str'
}
},
'type': 'dict'
},
'port': {
'type': 'str'
}
},
'type': 'dict'
},
'disabled': {
'type': 'bool'
},
'fragment': {
'choices':
['match-frag', 'match-non-frag'],
'type': 'str'
},
'icmp': {
'options': {
'code': {
'type': 'int'
},
'type': {
'type': 'int'
},
'type_name': {
'choices': [
'any', 'echo-reply',
'destination-unreachable',
'network-unreachable',
'host-unreachable',
'protocol-unreachable',
'port-unreachable',
'fragmentation-needed',
'source-route-failed',
'network-unknown',
'host-unknown',
'network-prohibited',
'host-prohibited',
'TOS-network-unreachable',
'TOS-host-unreachable',
'communication-prohibited',
'host-precedence-violation',
'precedence-cutoff',
'source-quench', 'redirect',
'network-redirect',
'host-redirect',
'TOS-network-redirect',
'TOS-host-redirect',
'echo-request',
'router-advertisement',
'router-solicitation',
'time-exceeded',
'ttl-zero-during-transit',
'ttl-zero-during-reassembly',
'parameter-problem',
'ip-header-bad',
'required-option-missing',
'timestamp-request',
'timestamp-reply',
'address-mask-request',
'address-mask-reply', 'ping',
'pong', 'ttl-exceeded'
],
'type':
'str'
}
},
'type': 'dict'
},
'ipsec': {
'choices': ['match-ipsec', 'match-none'],
'type': 'str'
},
'limit': {
'options': {
'burst': {
'type': 'int'
},
'rate': {
'options': {
'number': {
'type': 'int'
},
'unit': {
'type': 'str'
}
},
'type': 'dict'
}
},
'type': 'dict'
},
'number': {
'required': True,
'type': 'int'
},
'p2p': {
'elements': 'dict',
'options': {
'application': {
'choices': [
'all', 'applejuice',
'bittorrent', 'directconnect',
'edonkey', 'gnutella', 'kazaa'
],
'type':
'str'
}
},
'type': 'list'
},
'protocol': {
'type': 'str'
},
'recent': {
'options': {
'count': {
'type': 'int'
},
'time': {
'type': 'int'
}
},
'type': 'dict'
},
'source': {
'options': {
'address': {
'type': 'str'
},
'group': {
'options': {
'address_group': {
'type': 'str'
},
'network_group': {
'type': 'str'
},
'port_group': {
'type': 'str'
}
},
'type': 'dict'
},
'mac_address': {
'type': 'str'
},
'port': {
'type': 'str'
}
},
'type': 'dict'
},
'state': {
'options': {
'established': {
'type': 'bool'
},
'invalid': {
'type': 'bool'
},
'new': {
'type': 'bool'
},
'related': {
'type': 'bool'
}
},
'type': 'dict'
},
'tcp': {
'options': {
'flags': {
'type': 'str'
}
},
'type': 'dict'
},
'time': {
'options': {
'monthdays': {
'type': 'str'
},
'startdate': {
'type': 'str'
},
'starttime': {
'type': 'str'
},
'stopdate': {
'type': 'str'
},
'stoptime': {
'type': 'str'
},
'utc': {
'type': 'bool'
},
'weekdays': {
'type': 'str'
}
},
'type': 'dict'
}
},
'type': 'list'
}
},
'type': 'list'
}
},
'type': 'list'
},
'running_config': {
'type': 'str'
},
'state': {
'choices': [
'merged', 'replaced', 'overridden', 'deleted', 'gathered',
'rendered', 'parsed'
],
'default':
'merged',
'type':
'str'
}
} # pylint: disable=C0301

@ -0,0 +1,706 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The vyos_firewall_rules class
It is in this file where the current configuration (as dict)
is compared to the provided configuration (as dict) and the command set
necessary to bring the current configuration to it's desired end-state is
created
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from copy import deepcopy
from ansible.module_utils.network.common.cfg.base import ConfigBase
from ansible.module_utils.network.common.utils import to_list, dict_diff, remove_empties
from ansible.module_utils.network.vyos.facts.facts import Facts
from ansible.module_utils.six import iteritems
from ansible.module_utils.network.vyos.utils.utils import list_diff_want_only
class Firewall_rules(ConfigBase):
"""
The vyos_firewall_rules class
"""
gather_subset = [
'!all',
'!min',
]
gather_network_resources = [
'firewall_rules',
]
def __init__(self, module):
super(Firewall_rules, self).__init__(module)
def get_firewall_rules_facts(self, data=None):
""" Get the 'facts' (the current configuration)
:rtype: A dictionary
:returns: The current configuration as a dictionary
"""
facts, _warnings = Facts(self._module).get_facts(self.gather_subset, self.gather_network_resources, data=data)
firewall_rules_facts = facts['ansible_network_resources'].get('firewall_rules')
if not firewall_rules_facts:
return []
return firewall_rules_facts
def execute_module(self):
""" Execute the module
:rtype: A dictionary
:returns: The result from module execution
"""
result = {'changed': False}
warnings = list()
commands = list()
if self.state in self.ACTION_STATES:
existing_firewall_rules_facts = self.get_firewall_rules_facts()
else:
existing_firewall_rules_facts = []
if self.state in self.ACTION_STATES or self.state == 'rendered':
commands.extend(self.set_config(existing_firewall_rules_facts))
if commands and self.state in self.ACTION_STATES:
if not self._module.check_mode:
self._connection.edit_config(commands)
result['changed'] = True
if self.state in self.ACTION_STATES:
result['commands'] = commands
if self.state in self.ACTION_STATES or self.state == 'gathered':
changed_firewall_rules_facts = self.get_firewall_rules_facts()
elif self.state == 'rendered':
result['rendered'] = commands
elif self.state == 'parsed':
running_config = self._module.params['running_config']
if not running_config:
self._module.fail_json(
msg="value of running_config parameter must not be empty for state parsed"
)
result['parsed'] = self.get_firewall_rules_facts(data=running_config)
else:
changed_firewall_rules_facts = []
if self.state in self.ACTION_STATES:
result['before'] = existing_firewall_rules_facts
if result['changed']:
result['after'] = changed_firewall_rules_facts
elif self.state == 'gathered':
result['gathered'] = changed_firewall_rules_facts
result['warnings'] = warnings
return result
def set_config(self, existing_firewall_rules_facts):
""" Collect the configuration from the args passed to the module,
collect the current configuration (as a dict from facts)
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
want = self._module.params['config']
have = existing_firewall_rules_facts
resp = self.set_state(want, have)
return to_list(resp)
def set_state(self, w, h):
""" Select the appropriate function based on the state provided
:param want: the desired configuration as a dictionary
:param have: the current configuration as a dictionary
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
if self.state in ('merged', 'replaced', 'overridden', 'rendered') and not w:
self._module.fail_json(msg='value of config parameter must not be empty for state {0}'.format(self.state))
if self.state == 'overridden':
commands.extend(self._state_overridden(w, h))
elif self.state == 'deleted':
commands.extend(self._state_deleted(w, h))
elif w:
if self.state == 'merged' or self.state == 'rendered':
commands.extend(self._state_merged(w, h))
elif self.state == 'replaced':
commands.extend(self._state_replaced(w, h))
return commands
def _state_replaced(self, want, have):
""" The command generator when state is replaced
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
if have:
for h in have:
r_sets = self._get_r_sets(h)
for rs in r_sets:
w = self.search_r_sets_in_have(want, rs['name'], 'r_list')
commands.extend(self._add_r_sets(h['afi'], rs, w, opr=False))
commands.extend(self._state_merged(want, have))
return commands
def _state_overridden(self, want, have):
""" The command generator when state is overridden
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
if have:
for h in have:
r_sets = self._get_r_sets(h)
for rs in r_sets:
w = self.search_r_sets_in_have(want, rs['name'], 'r_list')
if not w:
commands.append(self._compute_command(h['afi'], rs['name'], remove=True))
else:
commands.extend(self._add_r_sets(h['afi'], rs, w, opr=False))
commands.extend(self._state_merged(want, have))
return commands
def _state_merged(self, want, have):
""" The command generator when state is merged
:rtype: A list
:returns: the commands necessary to merge the provided into
the current configuration
"""
commands = []
for w in want:
r_sets = self._get_r_sets(w)
for rs in r_sets:
h = self.search_r_sets_in_have(have, rs['name'], 'r_list')
commands.extend(self._add_r_sets(w['afi'], rs, h))
return commands
def _state_deleted(self, want, have):
""" The command generator when state is deleted
:rtype: A list
:returns: the commands necessary to remove the current configuration
of the provided objects
"""
commands = []
if want:
for w in want:
r_sets = self._get_r_sets(w)
if r_sets:
for rs in r_sets:
h = self.search_r_sets_in_have(have, rs['name'], 'r_list')
if h:
w_rules = rs.get('rules') or []
h_rules = h.get('rules') or []
if w_rules and h_rules:
for rule in w_rules:
if self.search_r_sets_in_have(h_rules, rule['number'], 'rules'):
commands.append(self._add_r_base_attrib(w['afi'], rs['name'], 'number', rule, opr=False))
else:
commands.append(self._compute_command(w['afi'], h['name'], remove=True))
elif have:
for h in have:
if h['afi'] == w['afi']:
commands.append(self._compute_command(w['afi'], remove=True))
elif have:
for h in have:
r_sets = self._get_r_sets(h)
if r_sets:
commands.append(self._compute_command(afi=h['afi'], remove=True))
return commands
def _add_r_sets(self, afi, want, have, opr=True):
"""
This function forms the set/delete commands based on the 'opr' type
for rule-sets attributes.
:param afi: address type.
:param want: desired config.
:param have: target config.
:param opr: True/False.
:return: generated commands list.
"""
commands = []
l_set = ('description',
'default_action',
'enable_default_log')
h_rs = {}
h_rules = {}
w_rs = deepcopy(remove_empties(want))
w_rules = w_rs.pop('rules', None)
if have:
h_rs = deepcopy(remove_empties(have))
h_rules = h_rs.pop('rules', None)
if w_rs:
for key, val in iteritems(w_rs):
if opr and key in l_set and not (h_rs and self._is_w_same(w_rs, h_rs, key)):
if key == 'enable_default_log':
if val and (not h_rs or key not in h_rs or not h_rs[key]):
commands.append(self._add_rs_base_attrib(afi, want['name'], key, w_rs))
else:
commands.append(self._add_rs_base_attrib(afi, want['name'], key, w_rs))
elif not opr and key in l_set:
if key == 'enable_default_log' and val and h_rs and (key not in h_rs or not h_rs[key]):
commands.append(self._add_rs_base_attrib(afi, want['name'], key, w_rs, opr))
elif not (h_rs and self._in_target(h_rs, key)):
commands.append(self._add_rs_base_attrib(afi, want['name'], key, w_rs, opr))
commands.extend(self._add_rules(afi, want['name'], w_rules, h_rules, opr))
if h_rules:
have['rules'] = h_rules
if w_rules:
want['rules'] = w_rules
return commands
def _add_rules(self, afi, name, w_rules, h_rules, opr=True):
"""
This function forms the set/delete commands based on the 'opr' type
for rules attributes.
:param want: desired config.
:param have: target config.
:return: generated commands list.
"""
commands = []
l_set = ('ipsec',
'action',
'number',
'protocol',
'fragment',
'disabled',
'description')
if w_rules:
for w in w_rules:
cmd = self._compute_command(afi, name, w['number'], opr=opr)
h = self.search_r_sets_in_have(h_rules, w['number'], type='rules')
for key, val in iteritems(w):
if val:
if opr and key in l_set and not (h and self._is_w_same(w, h, key)):
if key == 'disabled':
if not (not val and (not h or key not in h or not h[key])):
commands.append(self._add_r_base_attrib(afi, name, key, w))
else:
commands.append(self._add_r_base_attrib(afi, name, key, w))
elif not opr:
if key == 'number' and self._is_del(l_set, h):
commands.append(self._add_r_base_attrib(afi, name, key, w, opr=opr))
continue
elif key == 'disabled' and val and h and (key not in h or not h[key]):
commands.append(self._add_r_base_attrib(afi, name, key, w, opr=opr))
elif key in l_set and not (h and self._in_target(h, key)) and not self._is_del(l_set, h):
commands.append(self._add_r_base_attrib(afi, name, key, w, opr=opr))
elif key == 'p2p':
commands.extend(self._add_p2p(key, w, h, cmd, opr))
elif key == 'tcp':
commands.extend(self._add_tcp(key, w, h, cmd, opr))
elif key == 'time':
commands.extend(self._add_time(key, w, h, cmd, opr))
elif key == 'icmp':
commands.extend(self._add_icmp(key, w, h, cmd, opr))
elif key == 'state':
commands.extend(self._add_state(key, w, h, cmd, opr))
elif key == 'limit':
commands.extend(self._add_limit(key, w, h, cmd, opr))
elif key == 'recent':
commands.extend(self._add_recent(key, w, h, cmd, opr))
elif key == 'destination' or key == 'source':
commands.extend(self._add_src_or_dest(key, w, h, cmd, opr))
return commands
def _add_p2p(self, attr, w, h, cmd, opr):
"""
This function forms the set/delete commands based on the 'opr' type
for p2p applications attributes.
:param want: desired config.
:param have: target config.
:return: generated commands list.
"""
commands = []
have = []
if w:
want = w.get(attr) or []
if h:
have = h.get(attr) or []
if want:
if opr:
applications = list_diff_want_only(want, have)
for app in applications:
commands.append(cmd + (' ' + attr + ' ' + app['application']))
elif not opr and have:
applications = list_diff_want_only(want, have)
for app in applications:
commands.append(cmd + (' ' + attr + ' ' + app['application']))
return commands
def _add_state(self, attr, w, h, cmd, opr):
"""
This function forms the command for 'state' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
h_state = {}
commands = []
l_set = ('new',
'invalid',
'related',
'established')
if w[attr]:
if h and attr in h.keys():
h_state = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if opr and item in l_set and not (h_state and self._is_w_same(w[attr], h_state, item)):
commands.append(cmd + (' ' + attr + ' ' + item + ' ' + self._bool_to_str(val)))
elif not opr and item in l_set and not (h_state and self._in_target(h_state, item)):
commands.append(cmd + (' ' + attr + ' ' + item))
return commands
def _add_recent(self, attr, w, h, cmd, opr):
"""
This function forms the command for 'recent' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_recent = {}
l_set = ('count', 'time')
if w[attr]:
if h and attr in h.keys():
h_recent = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if opr and item in l_set and not (h_recent and self._is_w_same(w[attr], h_recent, item)):
commands.append(cmd + (' ' + attr + ' ' + item + ' ' + str(val)))
elif not opr and item in l_set and not (h_recent and self._in_target(h_recent, item)):
commands.append(cmd + (' ' + attr + ' ' + item))
return commands
def _add_icmp(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'icmp' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_icmp = {}
l_set = ('code', 'type', 'type_name')
if w[attr]:
if h and attr in h.keys():
h_icmp = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if opr and item in l_set and not (h_icmp and self._is_w_same(w[attr], h_icmp, item)):
if item == 'type_name':
if 'ipv6-name' in cmd:
commands.append(cmd + (' ' + 'icmpv6' + ' ' + 'type' + ' ' + val))
else:
commands.append(cmd + (' ' + attr + ' ' + item.replace("_", "-") + ' ' + val))
else:
commands.append(cmd + (' ' + attr + ' ' + item + ' ' + str(val)))
elif not opr and item in l_set and not (h_icmp and self._in_target(h_icmp, item)):
commands.append(cmd + (' ' + attr + ' ' + item))
return commands
def _add_time(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'time' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_time = {}
l_set = ('utc',
'stopdate',
'stoptime',
'weekdays',
'monthdays',
'startdate',
'starttime')
if w[attr]:
if h and attr in h.keys():
h_time = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if opr and item in l_set and not (h_time and self._is_w_same(w[attr], h_time, item)):
if item == 'utc':
if not (not val and (not h_time or item not in h_time)):
commands.append(cmd + (' ' + attr + ' ' + item))
else:
commands.append(cmd + (' ' + attr + ' ' + item + ' ' + val))
elif not opr and item in l_set and not (h_time and self._is_w_same(w[attr], h_time, item)):
commands.append(cmd + (' ' + attr + ' ' + item))
return commands
def _add_tcp(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'tcp' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
h_tcp = {}
commands = []
if w[attr]:
key = 'flags'
flags = w[attr].get(key) or {}
if flags:
if h and key in h[attr].keys():
h_tcp = h[attr].get(key) or {}
if flags:
if opr and not (h_tcp and self._is_w_same(w[attr], h[attr], key)):
commands.append(cmd + (' ' + attr + ' ' + key + ' ' + flags))
if not opr and not (h_tcp and self._is_w_same(w[attr], h[attr], key)):
commands.append(cmd + (' ' + attr + ' ' + key + ' ' + flags))
return commands
def _add_limit(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'limit' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
h_limit = {}
commands = []
if w[attr]:
key = 'burst'
if opr and key in w[attr].keys() and not (h and attr in h.keys() and self._is_w_same(w[attr], h[attr], key)):
commands.append(cmd + (' ' + attr + ' ' + key + ' ' + str(w[attr].get(key))))
elif not opr and key in w[attr].keys() and not (h and attr in h.keys() and self._in_target(h[attr], key)):
commands.append(cmd + (' ' + attr + ' ' + key + ' ' + str(w[attr].get(key))))
key = 'rate'
rate = w[attr].get(key) or {}
if rate:
if h and key in h[attr].keys():
h_limit = h[attr].get(key) or {}
if 'unit' in rate and 'number' in rate:
if opr and not (h_limit and self._is_w_same(rate, h_limit, 'unit') and self.is_w_same(rate, h_limit, 'number')):
commands.append(cmd + (' ' + attr + ' ' + key + ' ' + str(rate['number']) + '/' + rate['unit']))
if not opr and not (h_limit and self._is_w_same(rate, h_limit, 'unit') and self._is_w_same(rate, h_limit, 'number')):
commands.append(cmd + (' ' + attr + ' ' + key))
return commands
def _add_src_or_dest(self, attr, w, h, cmd, opr=True):
"""
This function forms the commands for 'src/dest' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_group = {}
g_set = ('port_group',
'address_group',
'network_group')
if w[attr]:
keys = ('address', 'mac_address', 'port')
for key in keys:
if opr and key in w[attr].keys() and not (h and attr in h.keys() and self._is_w_same(w[attr], h[attr], key)):
commands.append(cmd + (' ' + attr + ' ' + key.replace("_", "-") + ' ' + w[attr].get(key)))
elif not opr and key in w[attr].keys() and not (h and attr in h.keys() and self._in_target(h[attr], key)):
commands.append(cmd + (' ' + attr + ' ' + key))
key = 'group'
group = w[attr].get(key) or {}
if group:
if h and key in h[attr].keys():
h_group = h[attr].get(key) or {}
for item, val in iteritems(group):
if val:
if opr and item in g_set and not (h_group and self._is_w_same(group, h_group, item)):
commands.append(cmd + (' ' + attr + ' ' + key + ' ' + item.replace("_", "-") + ' ' + val))
elif not opr and item in g_set and not (h_group and self._in_target(h_group, item)):
commands.append(cmd + (' ' + attr + ' ' + key + ' ' + item.replace("_", "-")))
return commands
def search_r_sets_in_have(self, have, w_name, type='rule_sets'):
"""
This function returns the rule-set/rule if it is present in target config.
:param have: target config.
:param w_name: rule-set name.
:param type: rule_sets/rule/r_list.
:return: rule-set/rule.
"""
if have:
key = 'name'
if type == 'rules':
key = 'number'
for r in have:
if r[key] == w_name:
return r
elif type == 'r_list':
for h in have:
r_sets = self._get_r_sets(h)
for rs in r_sets:
if rs[key] == w_name:
return rs
else:
for rs in have:
if rs[key] == w_name:
return rs
return None
def _get_r_sets(self, item, type='rule_sets'):
"""
This function returns the list of rule-sets/rules.
:param item: config dictionary.
:param type: rule_sets/rule/r_list.
:return: list of rule-sets/rules.
"""
rs_list = []
r_sets = item[type]
if r_sets:
for rs in r_sets:
rs_list.append(rs)
return rs_list
def _compute_command(self, afi, name=None, number=None, attrib=None, value=None, remove=False, opr=True):
"""
This function construct the add/delete command based on passed attributes.
:param afi: address type.
:param name: rule-set name.
:param number: rule-number.
:param attrib: attribute name.
:param value: value.
:param remove: True if delete command needed to be construct.
:param opr: opeeration flag.
:return: generated command.
"""
if remove or not opr:
cmd = 'delete firewall ' + self._get_fw_type(afi)
else:
cmd = 'set firewall ' + self._get_fw_type(afi)
if name:
cmd += (' ' + name)
if number:
cmd += (' rule ' + str(number))
if attrib:
cmd += (' ' + attrib.replace("_", "-"))
if value and opr and attrib != 'enable_default_log' and attrib != 'disabled':
cmd += (" '" + str(value) + "'")
return cmd
def _add_r_base_attrib(self, afi, name, attr, rule, opr=True):
"""
This function forms the command for 'rules' attributes which doesn't
have further sub attributes.
:param afi: address type.
:param name: rule-set name
:param attrib: attribute name
:param rule: rule config dictionary.
:param opr: True/False.
:return: generated command.
"""
if attr == 'number':
command = self._compute_command(
afi=afi, name=name, number=rule['number'], opr=opr
)
else:
command = self._compute_command(
afi=afi, name=name, number=rule['number'], attrib=attr, value=rule[attr], opr=opr
)
return command
def _add_rs_base_attrib(self, afi, name, attrib, rule, opr=True):
"""
This function forms the command for 'rule-sets' attributes which doesn't
have further sub attributes.
:param afi: address type.
:param name: rule-set name
:param attrib: attribute name
:param rule: rule config dictionary.
:param opr: True/False.
:return: generated command.
"""
command = self._compute_command(afi=afi, name=name, attrib=attrib, value=rule[attrib], opr=opr)
return command
def _bool_to_str(self, val):
"""
This function converts the bool value into string.
:param val: bool value.
:return: enable/disable.
"""
return 'enable' if val else 'disable'
def _get_fw_type(self, afi):
"""
This function returns the firewall rule-set type based on IP address.
:param afi: address type
:return: rule-set type.
"""
return 'ipv6-name' if afi == 'ipv6' else 'name'
def _is_del(self, l_set, h, key='number'):
"""
This function checks whether rule needs to be deleted based on
the rule number.
:param l_set: attribute set.
:param h: target config.
:param key: number.
:return: True/False.
"""
return key in l_set and not (h and self._in_target(h, key))
def _is_w_same(self, w, h, key):
"""
This function checks whether the key value is same in base and
target config dictionary.
:param w: base config.
:param h: target config.
:param key:attribute name.
:return: True/False.
"""
return True if h and key in h and h[key] == w[key] else False
def _in_target(self, h, key):
"""
This function checks whether the target nexist and key present in target config.
:param h: target config.
:param key: attribute name.
:return: True/False.
"""
return True if h and key in h else False
def _is_base_attrib(self, key):
"""
This function checks whether key is present in predefined
based attribute set.
:param key:
:return: True/False.
"""
r_set = ('p2p',
'ipsec',
'action',
'fragment',
'protocol',
'disabled',
'description',
'mac_address',
'default_action',
'enable_default_log')
return True if key in r_set else False

@ -14,6 +14,7 @@ from ansible.module_utils.network.vyos.facts.l3_interfaces.l3_interfaces import
from ansible.module_utils.network.vyos.facts.lag_interfaces.lag_interfaces import Lag_interfacesFacts
from ansible.module_utils.network.vyos.facts.lldp_global.lldp_global import Lldp_globalFacts
from ansible.module_utils.network.vyos.facts.lldp_interfaces.lldp_interfaces import Lldp_interfacesFacts
from ansible.module_utils.network.vyos.facts.firewall_rules.firewall_rules import Firewall_rulesFacts
from ansible.module_utils.network.vyos.facts.legacy.base import Default, Neighbors, Config
@ -27,7 +28,8 @@ FACT_RESOURCE_SUBSETS = dict(
l3_interfaces=L3_interfacesFacts,
lag_interfaces=Lag_interfacesFacts,
lldp_global=Lldp_globalFacts,
lldp_interfaces=Lldp_interfacesFacts
lldp_interfaces=Lldp_interfacesFacts,
firewall_rules=Firewall_rulesFacts
)

@ -0,0 +1,348 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The vyos firewall_rules fact class
It is in this file the configuration is collected from the device
for a given resource, parsed, and the facts tree is populated
based on the configuration.
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from re import findall, search, M
from copy import deepcopy
from ansible.module_utils.network.common import utils
from ansible.module_utils.network.vyos.argspec.firewall_rules.firewall_rules import Firewall_rulesArgs
class Firewall_rulesFacts(object):
""" The vyos firewall_rules fact class
"""
def __init__(self, module, subspec='config', options='options'):
self._module = module
self.argument_spec = Firewall_rulesArgs.argument_spec
spec = deepcopy(self.argument_spec)
if subspec:
if options:
facts_argument_spec = spec[subspec][options]
else:
facts_argument_spec = spec[subspec]
else:
facts_argument_spec = spec
self.generated_spec = utils.generate_dict(facts_argument_spec)
def get_device_data(self, connection):
return connection.get_config()
def populate_facts(self, connection, ansible_facts, data=None):
""" Populate the facts for firewall_rules
:param connection: the device connection
:param ansible_facts: Facts dictionary
:param data: previously collected conf
:rtype: dictionary
:returns: facts
"""
if not data:
# typically data is populated from the current device configuration
# data = connection.get('show running-config | section ^interface')
# using mock data instead
data = self.get_device_data(connection)
# split the config into instances of the resource
objs = []
v6_rules = findall(r'^set firewall ipv6-name (?:\'*)(\S+)(?:\'*)', data, M)
v4_rules = findall(r'^set firewall name (?:\'*)(\S+)(?:\'*)', data, M)
if v6_rules:
config = self.get_rules(data, v6_rules, type='ipv6')
if config:
config = utils.remove_empties(config)
objs.append(config)
if v4_rules:
config = self.get_rules(data, v4_rules, type='ipv4')
if config:
config = utils.remove_empties(config)
objs.append(config)
ansible_facts['ansible_network_resources'].pop('firewall_rules', None)
facts = {}
if objs:
facts['firewall_rules'] = []
params = utils.validate_config(self.argument_spec, {'config': objs})
for cfg in params['config']:
facts['firewall_rules'].append(utils.remove_empties(cfg))
ansible_facts['ansible_network_resources'].update(facts)
return ansible_facts
def get_rules(self, data, rules, type):
"""
This function performs following:
- Form regex to fetch 'rule-sets' specific config from data.
- Form the rule-set list based on ip address.
:param data: configuration.
:param rules: list of rule-sets.
:param type: ip address type.
:return: generated rule-sets configuration.
"""
r_v4 = []
r_v6 = []
for r in set(rules):
rule_regex = r' %s .+$' % r.strip("'")
cfg = findall(rule_regex, data, M)
fr = self.render_config(cfg, r.strip("'"))
fr['name'] = r.strip("'")
if type == 'ipv6':
r_v6.append(fr)
else:
r_v4.append(fr)
if r_v4:
config = {'afi': 'ipv4', 'rule_sets': r_v4}
if r_v6:
config = {'afi': 'ipv6', 'rule_sets': r_v6}
return config
def render_config(self, conf, match):
"""
Render config as dictionary structure and delete keys
from spec for null values
:param spec: The facts tree, generated from the argspec
:param conf: The configuration
:rtype: dictionary
:returns: The generated config
"""
conf = '\n'.join(filter(lambda x: x, conf))
a_lst = ['description', 'default_action', 'enable_default_log']
config = self.parse_attr(conf, a_lst, match)
if not config:
config = {}
config['rules'] = self.parse_rules_lst(conf)
return config
def parse_rules_lst(self, conf):
"""
This function forms the regex to fetch the 'rules' with in
'rule-sets'
:param conf: configuration data.
:return: generated rule list configuration.
"""
r_lst = []
rules = findall(r'rule (?:\'*)(\d+)(?:\'*)', conf, M)
if rules:
rules_lst = []
for r in set(rules):
r_regex = r' %s .+$' % r
cfg = '\n'.join(findall(r_regex, conf, M))
obj = self.parse_rules(cfg)
obj['number'] = int(r)
if obj:
rules_lst.append(obj)
r_lst = sorted(rules_lst, key=lambda i: i['number'])
return r_lst
def parse_rules(self, conf):
"""
This function triggers the parsing of 'rule' attributes.
a_lst is a list having rule attributes which doesn't
have further sub attributes.
:param conf: configuration
:return: generated rule configuration dictionary.
"""
a_lst = ['ipsec', 'action', 'protocol', 'fragment', 'disabled', 'description']
rule = self.parse_attr(conf, a_lst)
r_sub = {'p2p': self.parse_p2p(conf),
'tcp': self.parse_tcp(conf, 'tcp'),
'icmp': self.parse_icmp(conf, 'icmp'),
'time': self.parse_time(conf, 'time'),
'limit': self.parse_limit(conf, 'limit'),
'state': self.parse_state(conf, 'state'),
'recent': self.parse_recent(conf, 'recent'),
'source': self.parse_src_or_dest(conf, 'source'),
'destination': self.parse_src_or_dest(conf, 'destination')}
rule.update(r_sub)
return rule
def parse_p2p(self, conf):
"""
This function forms the regex to fetch the 'p2p' with in
'rules'
:param conf: configuration data.
:return: generated rule list configuration.
"""
a_lst = []
applications = findall(r'p2p (?:\'*)(\d+)(?:\'*)', conf, M)
if applications:
app_lst = []
for r in set(applications):
obj = {'application': r.strip("'")}
app_lst.append(obj)
a_lst = sorted(app_lst, key=lambda i: i['application'])
return a_lst
def parse_src_or_dest(self, conf, attrib=None):
"""
This function triggers the parsing of 'source or
destination' attributes.
:param conf: configuration.
:param attrib:'source/destination'.
:return:generated source/destination configuration dictionary.
"""
a_lst = ['port', 'address', 'mac_address']
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
cfg_dict['group'] = self.parse_group(conf, attrib + ' group')
return cfg_dict
def parse_recent(self, conf, attrib=None):
"""
This function triggers the parsing of 'recent' attributes
:param conf: configuration.
:param attrib: 'recent'.
:return: generated config dictionary.
"""
a_lst = ['time', 'count']
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_tcp(self, conf, attrib=None):
"""
This function triggers the parsing of 'tcp' attributes.
:param conf: configuration.
:param attrib: 'tcp'.
:return: generated config dictionary.
"""
cfg_dict = self.parse_attr(conf, ['flags'], match=attrib)
return cfg_dict
def parse_time(self, conf, attrib=None):
"""
This function triggers the parsing of 'time' attributes.
:param conf: configuration.
:param attrib: 'time'.
:return: generated config dictionary.
"""
a_lst = ['stopdate', 'stoptime', 'weekdays', 'monthdays', 'startdate', 'starttime']
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_state(self, conf, attrib=None):
"""
This function triggers the parsing of 'state' attributes.
:param conf: configuration
:param attrib: 'state'.
:return: generated config dictionary.
"""
a_lst = ['new', 'invalid', 'related', 'established']
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_group(self, conf, attrib=None):
"""
This function triggers the parsing of 'group' attributes.
:param conf: configuration.
:param attrib: 'group'.
:return: generated config dictionary.
"""
a_lst = ['port_group', 'address_group', 'network_group']
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_icmp(self, conf, attrib=None):
"""
This function triggers the parsing of 'icmp' attributes.
:param conf: configuration to be parsed.
:param attrib: 'icmp'.
:return: generated config dictionary.
"""
a_lst = ['code', 'type', 'type_name']
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_limit(self, conf, attrib=None):
"""
This function triggers the parsing of 'limit' attributes.
:param conf: configuration to be parsed.
:param attrib: 'limit'
:return: generated config dictionary.
"""
cfg_dict = self.parse_attr(conf, ['burst'], match=attrib)
cfg_dict['rate'] = self.parse_rate(conf, 'rate')
return cfg_dict
def parse_rate(self, conf, attrib=None):
"""
This function triggers the parsing of 'rate' attributes.
:param conf: configuration.
:param attrib: 'rate'
:return: generated config dictionary.
"""
a_lst = ['unit', 'number']
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_attr(self, conf, attr_list, match=None):
"""
This function peforms the following:
- Form the regex to fetch the required attribute config.
- Type cast the output in desired format.
:param conf: configuration.
:param attr_list: list of attributes.
:param match: parent node/attribute name.
:return: generated config dictionary.
"""
config = {}
for attrib in attr_list:
regex = self.map_regex(attrib)
if match:
regex = match + ' ' + regex
if conf:
if self.is_bool(attrib):
out = conf.find(attrib.replace("_", "-"))
dis = conf.find(attrib.replace("_", "-") + " 'disable'")
if out >= 1:
if dis >= 1:
config[attrib] = False
else:
config[attrib] = True
else:
out = search(r'^.*' + regex + ' (.+)', conf, M)
if out:
val = out.group(1).strip("'")
if self.is_num(attrib):
val = int(val)
config[attrib] = val
return config
def map_regex(self, attrib):
"""
- This function construct the regex string.
- replace the underscore with hyphen.
:param attrib: attribute
:return: regex string
"""
regex = attrib.replace("_", "-")
if attrib == 'disabled':
regex = 'disable'
return regex
def is_bool(self, attrib):
"""
This function looks for the attribute in predefined bool type set.
:param attrib: attribute.
:return: True/False
"""
bool_set = ('new', 'invalid', 'related', 'disabled', 'established', 'enable_default_log')
return True if attrib in bool_set else False
def is_num(self, attrib):
"""
This function looks for the attribute in predefined integer type set.
:param attrib: attribute.
:return: True/false.
"""
num_set = ('time', 'code', 'type', 'count', 'burst', 'number')
return True if attrib in num_set else False

@ -52,7 +52,7 @@ options:
can also be used with an initial C(M(!)) to specify that a
specific subset should not be collected.
Valid subsets are 'all', 'interfaces', 'l3_interfaces', 'lag_interfaces',
'lldp_global', 'lldp_interfaces'.
'lldp_global', 'lldp_interfaces', 'firewall_rules'.
required: false
version_added: "2.9"
"""

File diff suppressed because it is too large Load Diff

@ -0,0 +1,3 @@
---
testcase: "[^_].*"
test_items: []

@ -0,0 +1,3 @@
---
dependencies:
- prepare_vyos_tests

@ -0,0 +1,19 @@
---
- name: Collect all cli test cases
find:
paths: "{{ role_path }}/tests/cli"
patterns: "{{ testcase }}.yaml"
use_regex: true
register: test_cases
delegate_to: localhost
- name: Set test_items
set_fact: test_items="{{ test_cases.files | map(attribute='path') | list }}"
- name: Run test case (connection=network_cli)
include: "{{ test_case_to_run }}"
vars:
ansible_connection: network_cli
with_items: "{{ test_items }}"
loop_control:
loop_var: test_case_to_run

@ -0,0 +1,2 @@
---
- {include: cli.yaml, tags: ['cli']}

@ -0,0 +1,25 @@
set firewall group address-group 'inbound'
set firewall ipv6-name UPLINK default-action 'accept'
set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
set firewall ipv6-name UPLINK rule 1 action 'accept'
set firewall ipv6-name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
set firewall ipv6-name UPLINK rule 1 ipsec 'match-ipsec'
set firewall ipv6-name UPLINK rule 2 action 'accept'
set firewall ipv6-name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
set firewall ipv6-name UPLINK rule 2 ipsec 'match-ipsec'
set firewall name INBOUND default-action 'accept'
set firewall name INBOUND description 'IPv4 INBOUND rule set'
set firewall name INBOUND rule 101 action 'accept'
set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
set firewall name INBOUND rule 101 ipsec 'match-ipsec'
set firewall name INBOUND rule 102 action 'reject'
set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
set firewall name INBOUND rule 102 ipsec 'match-ipsec'
set firewall name INBOUND rule 103 action 'accept'
set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
set firewall name INBOUND rule 103 destination group address-group 'inbound'
set firewall name INBOUND rule 103 source address '192.0.2.0'
set firewall name INBOUND rule 103 state established 'enable'
set firewall name INBOUND rule 103 state invalid 'disable'
set firewall name INBOUND rule 103 state new 'disable'
set firewall name INBOUND rule 103 state related 'enable'

@ -0,0 +1,31 @@
---
- name: Setup
cli_config:
config: "{{ lines }}"
vars:
lines: |
set firewall group address-group 'inbound'
set firewall ipv6-name UPLINK default-action 'accept'
set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
set firewall ipv6-name UPLINK rule 1 action 'accept'
set firewall ipv6-name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
set firewall ipv6-name UPLINK rule 1 ipsec 'match-ipsec'
set firewall ipv6-name UPLINK rule 2 action 'accept'
set firewall ipv6-name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
set firewall ipv6-name UPLINK rule 2 ipsec 'match-ipsec'
set firewall name INBOUND default-action 'accept'
set firewall name INBOUND description 'IPv4 INBOUND rule set'
set firewall name INBOUND rule 101 action 'accept'
set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
set firewall name INBOUND rule 101 ipsec 'match-ipsec'
set firewall name INBOUND rule 102 action 'reject'
set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
set firewall name INBOUND rule 102 ipsec 'match-ipsec'
set firewall name INBOUND rule 103 action 'accept'
set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
set firewall name INBOUND rule 103 destination group address-group 'inbound'
set firewall name INBOUND rule 103 source address '192.0.2.0'
set firewall name INBOUND rule 103 state established 'enable'
set firewall name INBOUND rule 103 state invalid 'disable'
set firewall name INBOUND rule 103 state new 'disable'
set firewall name INBOUND rule 103 state related 'enable'

@ -0,0 +1,8 @@
---
- name: Remove Config
cli_config:
config: "{{ lines }}"
vars:
lines: |
delete firewall ipv6-name
delete firewall name

@ -0,0 +1,50 @@
---
- debug:
msg: "Start vyos_firewall_rules deleted integration tests ansible_connection={{ ansible_connection }}"
- include_tasks: _populate.yaml
- block:
- name: Delete firewall rule set.
vyos_firewall_rules: &deleted_rs
config:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
state: deleted
register: result
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that the correct set of commands were generated
assert:
that:
- "{{ deleted_rs['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that the after dicts were correctly generated
assert:
that:
- "{{ deleted_rs['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Delete attributes of given interfaces (IDEMPOTENT)
vyos_firewall_rules: *deleted_rs
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result.changed == false"
- "result.commands|length == 0"
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ deleted_rs['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,46 @@
---
- debug:
msg: "Start vyos_firewall_rules deleted integration tests ansible_connection={{ ansible_connection }}"
- include_tasks: _populate.yaml
- block:
- name: Delete firewall rule.
vyos_firewall_rules: &deleted_afi
config:
- afi: 'ipv6'
- afi: 'ipv4'
state: deleted
register: result
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that the correct set of commands were generated
assert:
that:
- "{{ deleted_afi_all['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that the after dicts were correctly generated
assert:
that:
- "{{ deleted_afi_all['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Delete attributes of given interfaces (IDEMPOTENT)
vyos_firewall_rules: *deleted_afi
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result.changed == false"
- "result.commands|length == 0"
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ deleted_afi_all['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,44 @@
---
- debug:
msg: "Start vyos_firewall_rules deleted integration tests ansible_connection={{ ansible_connection }}"
- include_tasks: _populate.yaml
- block:
- name: Delete all the firewall rules.
vyos_firewall_rules: &deleted_all
config:
state: deleted
register: result
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that the correct set of commands were generated
assert:
that:
- "{{ deleted_afi_all['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that the after dicts were correctly generated
assert:
that:
- "{{ deleted_afi_all['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Delete attributes of given interfaces (IDEMPOTENT)
vyos_firewall_rules: *deleted_all
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result.changed == false"
- "result.commands|length == 0"
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ deleted_afi_all['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,49 @@
---
- debug:
msg: "Start vyos_firewall_rules deleted integration tests ansible_connection={{ ansible_connection }}"
- include_tasks: _populate.yaml
- block:
- name: Delete firewall rule.
vyos_firewall_rules: &deleted_r
config:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
rules:
- number: 1
state: deleted
register: result
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that the correct set of commands were generated
assert:
that:
- "{{ deleted_r['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that the after dicts were correctly generated
assert:
that:
- "{{ deleted_r['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Delete attributes of given interfaces (IDEMPOTENT)
vyos_firewall_rules: *deleted_r
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result.changed == false"
- "result.commands|length == 0"
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ deleted_r['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,58 @@
---
- debug:
msg: "START vyos_firewall_rules empty_config integration tests on connection={{ ansible_connection }}"
- name: Merged with empty config should give appropriate error message
vyos_firewall_rules:
config:
state: merged
register: result
ignore_errors: true
- assert:
that:
- result.msg == 'value of config parameter must not be empty for state merged'
- name: Replaced with empty config should give appropriate error message
vyos_firewall_rules:
config:
state: replaced
register: result
ignore_errors: true
- assert:
that:
- result.msg == 'value of config parameter must not be empty for state replaced'
- name: Overridden with empty config should give appropriate error message
vyos_firewall_rules:
config:
state: overridden
register: result
ignore_errors: true
- assert:
that:
- result.msg == 'value of config parameter must not be empty for state overridden'
- name: Parsed with empty running_config should give appropriate error message
vyos_firewall_rules:
running_config:
state: parsed
register: result
ignore_errors: true
- assert:
that:
- result.msg == 'value of running_config parameter must not be empty for state parsed'
- name: Rendered with empty config should give appropriate error message
vyos_firewall_rules:
config:
state: rendered
register: result
ignore_errors: true
- assert:
that:
- result.msg == 'value of config parameter must not be empty for state rendered'

@ -0,0 +1,31 @@
---
- debug:
msg: "START vyos_firewall_rules gathered integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate.yaml
- block:
- name: Merge the provided configuration with the exisiting running configuration
vyos_firewall_rules: &gathered
config:
state: gathered
register: result
- name: Assert that gathered dicts was correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['gathered']) |length == 0 }}"
- name: Gather the existing running configuration (IDEMPOTENT)
vyos_firewall_rules: *gathered
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result['changed'] == false"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,87 @@
---
- debug:
msg: "START vyos_firewall_rules merged integration tests on connection={{ ansible_connection }}"
- include_tasks: _populate.yaml
- include_tasks: _remove_config.yaml
- block:
- name: Merge the provided configuration with the exisiting running configuration
vyos_firewall_rules: &merged
config:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
rules:
- number: 1
action: 'accept'
description: 'Fwipv6-Rule 1 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 2
action: 'accept'
description: 'Fwipv6-Rule 2 is configured by Ansible'
ipsec: 'match-ipsec'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 102
action: 'reject'
description: 'Rule 102 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 103
action: 'accept'
description: 'Rule 103 is configured by Ansible'
destination:
group:
address_group: 'inbound'
source:
address: '192.0.2.0'
state:
established: true
new: false
invalid: false
related: true
state: merged
register: result
- name: Assert that before dicts were correctly generated
assert:
that: "{{ merged['before'] | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that correct set of commands were generated
assert:
that:
- "{{ merged['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that after dicts was correctly generated
assert:
that:
- "{{ merged['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Merge the provided configuration with the existing running configuration (IDEMPOTENT)
vyos_firewall_rules: *merged
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result['changed'] == false"
- name: Assert that before dicts were correctly generated
assert:
that:
- "{{ merged['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,60 @@
---
- debug:
msg: "START vyos_firewall_rules overridden integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate.yaml
- block:
- name: Overrides all device configuration with provided configuration
vyos_firewall_rules: &overridden
config:
- afi: 'ipv4'
rule_sets:
- name: 'Downlink'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 501
action: 'accept'
description: 'Rule 501 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 502
action: 'reject'
description: 'Rule 502 is configured by Ansible'
ipsec: 'match-ipsec'
state: overridden
register: result
- name: Assert that before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that correct commands were generated
assert:
that:
- "{{ overridden['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that after dicts were correctly generated
assert:
that:
- "{{ overridden['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Overrides all device configuration with provided configurations (IDEMPOTENT)
vyos_firewall_rules: *overridden
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result['changed'] == false"
- name: Assert that before dicts were correctly generated
assert:
that:
- "{{ overridden['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,39 @@
---
- debug:
msg: "START vyos_firewall_rules parsed integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate.yaml
- block:
- name: Gather firewall_rules facts
vyos_facts:
gather_subset:
- default
gather_network_resources:
- firewall_rules
register: firewall_rules_facts
- name: Provide the running configuration for parsing (config to be parsed)
vyos_firewall_rules: &parsed
running_config:
"{{ lookup('file', '_parsed_config.cfg') }}"
state: parsed
register: result
- name: Assert that correct parsing done
assert:
that: "{{ ansible_facts['network_resources']['firewall_rules'] | symmetric_difference(result['parsed']) |length == 0 }}"
- name: Gather the existing running configuration (IDEMPOTENT)
vyos_firewall_rules: *parsed
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result['changed'] == false"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,63 @@
---
- debug:
msg: "START vyos_firewall_rules rendered integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate.yaml
- block:
- name: Structure provided configuration into device specific commands
vyos_firewall_rules: &rendered
config:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 102
action: 'reject'
description: 'Rule 102 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 103
action: 'accept'
description: 'Rule 103 is configured by Ansible'
destination:
group:
address_group: 'inbound'
source:
address: '192.0.2.0'
state:
established: true
new: false
invalid: false
related: true
state: rendered
register: result
- name: Assert that correct set of commands were generated
assert:
that:
- "{{ rendered['commands'] | symmetric_difference(result['rendered']) |length == 0 }}"
- name: Structure provided configuration into device specific commands (IDEMPOTENT)
vyos_firewall_rules: *rendered
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result['changed'] == false"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,65 @@
---
- debug:
msg: "START vyos_firewall_rules replaced integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate.yaml
- block:
- name: Replace device configurations of listed firewall rules with provided configurations
vyos_firewall_rules: &replaced
config:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 104
action: 'reject'
description: 'Rule 104 is configured by Ansible'
ipsec: 'match-none'
state: replaced
register: result
- name: Assert that correct set of commands were generated
assert:
that:
- "{{ replaced['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that before dicts are correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that after dict is correctly generated
assert:
that:
- "{{ replaced['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Replace device configurations of listed firewall rules with provided configurarions (IDEMPOTENT)
vyos_firewall_rules: *replaced
register: result
- name: Assert that task was idempotent
assert:
that:
- "result['changed'] == false"
- name: Assert that before dict is correctly generated
assert:
that:
- "{{ replaced['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,87 @@
---
- debug:
msg: "START vyos_firewall_rules round trip integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- block:
- name: Apply the provided configuration (base config)
vyos_firewall_rules:
config:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
rules:
- number: 1
action: 'accept'
description: 'Fwipv6-Rule 1 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 2
action: 'accept'
description: 'Fwipv6-Rule 2 is configured by Ansible'
ipsec: 'match-ipsec'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 102
action: 'reject'
description: 'Rule 102 is configured by Ansible'
ipsec: 'match-ipsec'
state: merged
register: base_config
- name: Gather firewall_rules facts
vyos_facts:
gather_subset:
- default
gather_network_resources:
- firewall_rules
- name: Apply the provided configuration (config to be reverted)
vyos_firewall_rules:
config:
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 103
action: 'accept'
description: 'Rule 103 is configured by Ansible'
source:
address: '192.0.2.0'
state:
established: true
new: false
invalid: false
related: true
state: merged
register: result
- name: Assert that changes were applied
assert:
that: "{{ round_trip['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Revert back to base config using facts round trip
vyos_firewall_rules:
config: "{{ ansible_facts['network_resources']['firewall_rules'] }}"
state: overridden
register: revert
- name: Assert that config was reverted
assert:
that: "{{ base_config['after'] | symmetric_difference(revert['after']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml

@ -0,0 +1,327 @@
---
merged:
before: []
commands:
- "set firewall ipv6-name UPLINK default-action 'accept'"
- "set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'"
- "set firewall ipv6-name UPLINK rule 1 action 'accept'"
- "set firewall ipv6-name UPLINK rule 1"
- "set firewall ipv6-name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'"
- "set firewall ipv6-name UPLINK rule 1 ipsec 'match-ipsec'"
- "set firewall ipv6-name UPLINK rule 2 action 'accept'"
- "set firewall ipv6-name UPLINK rule 2"
- "set firewall ipv6-name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'"
- "set firewall ipv6-name UPLINK rule 2 ipsec 'match-ipsec'"
- "set firewall name INBOUND default-action 'accept'"
- "set firewall name INBOUND description 'IPv4 INBOUND rule set'"
- "set firewall name INBOUND rule 101 action 'accept'"
- "set firewall name INBOUND rule 101"
- "set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'"
- "set firewall name INBOUND rule 101 ipsec 'match-ipsec'"
- "set firewall name INBOUND rule 102 action 'reject'"
- "set firewall name INBOUND rule 102"
- "set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'"
- "set firewall name INBOUND rule 102 ipsec 'match-ipsec'"
- "set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'"
- "set firewall name INBOUND rule 103 destination group address-group inbound"
- "set firewall name INBOUND rule 103"
- "set firewall name INBOUND rule 103 source address 192.0.2.0"
- "set firewall name INBOUND rule 103 state established enable"
- "set firewall name INBOUND rule 103 state related enable"
- "set firewall name INBOUND rule 103 state invalid disable"
- "set firewall name INBOUND rule 103 state new disable"
- "set firewall name INBOUND rule 103 action 'accept'"
after:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
rules:
- number: 1
action: 'accept'
description: 'Fwipv6-Rule 1 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 2
action: 'accept'
description: 'Fwipv6-Rule 2 is configured by Ansible'
ipsec: 'match-ipsec'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 102
action: 'reject'
description: 'Rule 102 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 103
action: 'accept'
description: 'Rule 103 is configured by Ansible'
destination:
group:
address_group: 'inbound'
source:
address: '192.0.2.0'
state:
established: true
new: false
invalid: false
related: true
populate:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
rules:
- number: 1
action: 'accept'
description: 'Fwipv6-Rule 1 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 2
action: 'accept'
description: 'Fwipv6-Rule 2 is configured by Ansible'
ipsec: 'match-ipsec'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 102
action: 'reject'
description: 'Rule 102 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 103
action: 'accept'
description: 'Rule 103 is configured by Ansible'
destination:
group:
address_group: 'inbound'
source:
address: '192.0.2.0'
state:
established: true
new: false
invalid: false
related: true
replaced:
commands:
- "delete firewall ipv6-name UPLINK rule 1"
- "delete firewall ipv6-name UPLINK rule 2"
- "delete firewall name INBOUND rule 102"
- "delete firewall name INBOUND rule 103"
- "set firewall name INBOUND rule 104 action 'reject'"
- "set firewall name INBOUND rule 104 description 'Rule 104 is configured by Ansible'"
- "set firewall name INBOUND rule 104"
- "set firewall name INBOUND rule 104 ipsec 'match-none'"
after:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 104
action: 'reject'
description: 'Rule 104 is configured by Ansible'
ipsec: 'match-none'
overridden:
before:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 104
action: 'reject'
description: 'Rule 104 is configured by Ansible'
ipsec: 'match-none'
commands:
- "delete firewall ipv6-name UPLINK"
- "delete firewall name INBOUND"
- "set firewall name Downlink default-action 'accept'"
- "set firewall name Downlink description 'IPv4 INBOUND rule set'"
- "set firewall name Downlink rule 501 action 'accept'"
- "set firewall name Downlink rule 501"
- "set firewall name Downlink rule 501 description 'Rule 501 is configured by Ansible'"
- "set firewall name Downlink rule 501 ipsec 'match-ipsec'"
- "set firewall name Downlink rule 502 action 'reject'"
- "set firewall name Downlink rule 502"
- "set firewall name Downlink rule 502 description 'Rule 502 is configured by Ansible'"
- "set firewall name Downlink rule 502 ipsec 'match-ipsec'"
after:
- afi: 'ipv4'
rule_sets:
- name: 'Downlink'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 501
action: 'accept'
description: 'Rule 501 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 502
action: 'reject'
description: 'Rule 502 is configured by Ansible'
ipsec: 'match-ipsec'
rendered:
commands:
- "set firewall ipv6-name UPLINK default-action 'accept'"
- "set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'"
- "set firewall name INBOUND default-action 'accept'"
- "set firewall name INBOUND description 'IPv4 INBOUND rule set'"
- "set firewall name INBOUND rule 101 action 'accept'"
- "set firewall name INBOUND rule 101"
- "set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'"
- "set firewall name INBOUND rule 101 ipsec 'match-ipsec'"
- "set firewall name INBOUND rule 102 action 'reject'"
- "set firewall name INBOUND rule 102"
- "set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'"
- "set firewall name INBOUND rule 102 ipsec 'match-ipsec'"
- "set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'"
- "set firewall name INBOUND rule 103 destination group address-group inbound"
- "set firewall name INBOUND rule 103"
- "set firewall name INBOUND rule 103 source address 192.0.2.0"
- "set firewall name INBOUND rule 103 state established enable"
- "set firewall name INBOUND rule 103 state related enable"
- "set firewall name INBOUND rule 103 state invalid disable"
- "set firewall name INBOUND rule 103 state new disable"
- "set firewall name INBOUND rule 103 action 'accept'"
deleted_rs:
commands:
- "delete firewall ipv6-name UPLINK"
- "delete firewall name INBOUND"
after: []
deleted_afi_all:
commands:
- "delete firewall ipv6-name"
- "delete firewall name"
after: []
deleted_r:
commands:
- "delete firewall ipv6-name UPLINK rule 1"
after:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
rules:
- number: 2
action: 'accept'
description: 'Fwipv6-Rule 2 is configured by Ansible'
ipsec: 'match-ipsec'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 102
action: 'reject'
description: 'Rule 102 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 103
action: 'accept'
description: 'Rule 103 is configured by Ansible'
destination:
group:
address_group: 'inbound'
source:
address: '192.0.2.0'
state:
established: true
new: false
invalid: false
related: true
round_trip:
after:
- afi: 'ipv6'
rule_sets:
- name: 'UPLINK'
description: 'This is ipv6 specific rule-set'
default_action: 'accept'
rules:
- number: 1
action: 'accept'
description: 'Fwipv6-Rule 1 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 2
action: 'accept'
description: 'Fwipv6-Rule 2 is configured by Ansible'
ipsec: 'match-ipsec'
- afi: 'ipv4'
rule_sets:
- name: 'INBOUND'
description: 'IPv4 INBOUND rule set'
default_action: 'accept'
rules:
- number: 101
action: 'accept'
description: 'Rule 101 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 102
action: 'reject'
description: 'Rule 102 is configured by Ansible'
ipsec: 'match-ipsec'
- number: 103
action: 'accept'
description: 'Rule 103 is configured by Ansible'
source:
address: '192.0.2.0'
state:
established: true
new: false
invalid: false
related: true

@ -0,0 +1,13 @@
set firewall name V4-INGRESS default-action 'accept'
set firewall ipv6-name V6-INGRESS default-action 'accept'
set firewall name V4-INGRESS description 'This is IPv4 V4-INGRESS rule set'
set firewall name V4-INGRESS enable-default-log
set firewall name V4-INGRESS rule 101 protocol 'icmp'
set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'
set firewall name V4-INGRESS rule 101 fragment 'match-frag'
set firewall name V4-INGRESS rule 101
set firewall name V4-INGRESS rule 101 disabled
set firewall name V4-INGRESS rule 101 action 'accept'
set firewall name V4-INGRESS rule 101 ipsec 'match-ipsec'
set firewall name V4-EGRESS default-action 'reject'
set firewall ipv6-name V6-EGRESS default-action 'reject'

@ -0,0 +1,827 @@
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from units.compat.mock import patch, MagicMock
from ansible.modules.network.vyos import vyos_firewall_rules
from units.modules.utils import set_module_args
from .vyos_module import TestVyosModule, load_fixture
class TestVyosFirewallRulesModule(TestVyosModule):
module = vyos_firewall_rules
def setUp(self):
super(TestVyosFirewallRulesModule, self).setUp()
self.mock_get_config = patch(
'ansible.module_utils.network.common.network.Config.get_config')
self.get_config = self.mock_get_config.start()
self.mock_load_config = patch(
'ansible.module_utils.network.common.network.Config.load_config')
self.load_config = self.mock_load_config.start()
self.mock_get_resource_connection_config = patch(
'ansible.module_utils.network.common.cfg.base.get_resource_connection'
)
self.get_resource_connection_config = self.mock_get_resource_connection_config.start(
)
self.mock_get_resource_connection_facts = patch(
'ansible.module_utils.network.common.facts.facts.get_resource_connection'
)
self.get_resource_connection_facts = self.mock_get_resource_connection_facts.start(
)
self.mock_execute_show_command = patch(
'ansible.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data'
)
self.mock_execute_show_command = patch(
'ansible.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data'
)
self.execute_show_command = self.mock_execute_show_command.start()
def tearDown(self):
super(TestVyosFirewallRulesModule, self).tearDown()
self.mock_get_resource_connection_config.stop()
self.mock_get_resource_connection_facts.stop()
self.mock_get_config.stop()
self.mock_load_config.stop()
self.mock_execute_show_command.stop()
def load_fixtures(self, commands=None):
def load_from_file(*args, **kwargs):
return load_fixture('vyos_firewall_rules_config.cfg')
self.execute_show_command.side_effect = load_from_file
def test_vyos_firewall_rule_set_01_merged(self):
set_module_args(
dict(config=[
dict(afi='ipv6',
rule_sets=[
dict(name='V6-INBOUND',
description='This is IPv6 INBOUND rule set',
default_action='reject',
enable_default_log=True,
rules=[]),
dict(name='V6-OUTBOUND',
description='This is IPv6 OUTBOUND rule set',
default_action='accept',
enable_default_log=False,
rules=[])
]),
dict(afi='ipv4',
rule_sets=[
dict(name='V4-INBOUND',
description='This is IPv4 INBOUND rule set',
default_action='reject',
enable_default_log=True,
rules=[]),
dict(name='V4-OUTBOUND',
description='This is IPv4 OUTBOUND rule set',
default_action='accept',
enable_default_log=False,
rules=[])
])
],
state="merged"))
commands = [
"set firewall ipv6-name V6-INBOUND default-action 'reject'",
"set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'",
'set firewall ipv6-name V6-INBOUND enable-default-log',
"set firewall ipv6-name V6-OUTBOUND default-action 'accept'",
"set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'",
"set firewall name V4-INBOUND default-action 'reject'",
"set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'",
'set firewall name V4-INBOUND enable-default-log',
"set firewall name V4-OUTBOUND default-action 'accept'",
"set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'"
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_rule_set_02_merged(self):
set_module_args(
dict(config=[
dict(afi='ipv6',
rule_sets=[
dict(name='V6-INBOUND',
description='This is IPv6 INBOUND rule set',
default_action='reject',
enable_default_log=True,
rules=[]),
dict(name='V6-OUTBOUND',
description='This is IPv6 OUTBOUND rule set',
default_action='accept',
enable_default_log=False,
rules=[])
]),
dict(afi='ipv4',
rule_sets=[
dict(name='V4-INBOUND',
description='This is IPv4 INBOUND rule set',
default_action='reject',
enable_default_log=True,
rules=[]),
dict(name='V4-OUTBOUND',
description='This is IPv4 OUTBOUND rule set',
default_action='accept',
enable_default_log=False,
rules=[])
])
],
state="merged"))
commands = [
"set firewall ipv6-name V6-INBOUND default-action 'reject'",
"set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'",
'set firewall ipv6-name V6-INBOUND enable-default-log',
"set firewall ipv6-name V6-OUTBOUND default-action 'accept'",
"set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'",
"set firewall name V4-INBOUND default-action 'reject'",
"set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'",
'set firewall name V4-INBOUND enable-default-log',
"set firewall name V4-OUTBOUND default-action 'accept'",
"set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'"
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_01(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='INBOUND',
description='This is IPv4 INBOUND rule set',
default_action='accept',
enable_default_log=True,
rules=[
dict(number='101',
action='accept',
description='Rule 101 is configured by Ansible',
ipsec='match-ipsec',
protocol='icmp',
fragment='match-frag',
disabled=True)
]),
])
],
state="merged"))
commands = [
"set firewall name INBOUND default-action 'accept'",
"set firewall name INBOUND description 'This is IPv4 INBOUND rule set'",
'set firewall name INBOUND enable-default-log',
"set firewall name INBOUND rule 101 protocol 'icmp'",
"set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
"set firewall name INBOUND rule 101 fragment 'match-frag'",
'set firewall name INBOUND rule 101',
'set firewall name INBOUND rule 101 disabled',
"set firewall name INBOUND rule 101 action 'accept'",
"set firewall name INBOUND rule 101 ipsec 'match-ipsec'"
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_02(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
protocol='tcp',
source=dict(
address='192.0.2.0',
mac_address='38:00:25:19:76:0c',
port=2127),
destination=dict(address='192.0.1.0',
port=2124),
limit=dict(burst=10,
rate=dict(number=20,
unit='second')),
recent=dict(count=10, time=20),
state=dict(established=True,
related=True,
invalid=True,
new=True))
]),
])
],
state="merged"))
commands = [
"set firewall name INBOUND rule 101 protocol 'tcp'",
'set firewall name INBOUND rule 101 destination address 192.0.1.0',
'set firewall name INBOUND rule 101 destination port 2124',
'set firewall name INBOUND rule 101',
'set firewall name INBOUND rule 101 source address 192.0.2.0',
'set firewall name INBOUND rule 101 source mac-address 38:00:25:19:76:0c',
'set firewall name INBOUND rule 101 source port 2127',
'set firewall name INBOUND rule 101 state new enable',
'set firewall name INBOUND rule 101 state invalid enable',
'set firewall name INBOUND rule 101 state related enable',
'set firewall name INBOUND rule 101 state established enable',
'set firewall name INBOUND rule 101 limit burst 10',
'set firewall name INBOUND rule 101 limit rate 20/second',
'set firewall name INBOUND rule 101 recent count 10',
'set firewall name INBOUND rule 101 recent time 20',
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_03(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
destination=dict(group=dict(
address_group='OUT-ADDR-GROUP',
network_group='OUT-NET-GROUP',
port_group='OUT-PORT-GROUP')),
source=dict(group=dict(
address_group='IN-ADDR-GROUP',
network_group='IN-NET-GROUP',
port_group='IN-PORT-GROUP')))
]),
])
],
state="merged"))
commands = [
'set firewall name INBOUND rule 101 source group address-group IN-ADDR-GROUP',
'set firewall name INBOUND rule 101 source group network-group IN-NET-GROUP',
'set firewall name INBOUND rule 101 source group port-group IN-PORT-GROUP',
'set firewall name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP',
'set firewall name INBOUND rule 101 destination group network-group OUT-NET-GROUP',
'set firewall name INBOUND rule 101 destination group port-group OUT-PORT-GROUP',
'set firewall name INBOUND rule 101'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_04(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
time=dict(monthdays='2',
startdate='2020-01-24',
starttime='13:20:00',
stopdate='2020-01-28',
stoptime='13:30:00',
weekdays='!Sat,Sun',
utc=True),
tcp=dict(flags='ALL'))
]),
])
],
state="merged"))
commands = [
'set firewall name INBOUND rule 101',
'set firewall name INBOUND rule 101 tcp flags ALL',
'set firewall name INBOUND rule 101 time utc',
'set firewall name INBOUND rule 101 time monthdays 2',
'set firewall name INBOUND rule 101 time startdate 2020-01-24',
'set firewall name INBOUND rule 101 time stopdate 2020-01-28',
'set firewall name INBOUND rule 101 time weekdays !Sat,Sun',
'set firewall name INBOUND rule 101 time stoptime 13:30:00',
'set firewall name INBOUND rule 101 time starttime 13:20:00',
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_01(self):
set_module_args(
dict(config=[
dict(afi='ipv6',
rule_sets=[
dict(name='INBOUND',
description='This is IPv6 INBOUND rule set',
default_action='accept',
enable_default_log=True,
rules=[
dict(number='101',
action='accept',
description='Rule 101 is configured by Ansible',
ipsec='match-ipsec',
protocol='icmp',
disabled=True)
]),
])
],
state="merged"))
commands = [
"set firewall ipv6-name INBOUND default-action 'accept'",
"set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set'",
'set firewall ipv6-name INBOUND enable-default-log',
"set firewall ipv6-name INBOUND rule 101 protocol 'icmp'",
"set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
'set firewall ipv6-name INBOUND rule 101',
'set firewall ipv6-name INBOUND rule 101 disabled',
"set firewall ipv6-name INBOUND rule 101 action 'accept'",
"set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'"
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_02(self):
set_module_args(
dict(config=[
dict(afi='ipv6',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
protocol='tcp',
source=dict(
address='2001:db8::12',
mac_address='38:00:25:19:76:0c',
port=2127),
destination=dict(address='2001:db8::11',
port=2124),
limit=dict(burst=10,
rate=dict(number=20,
unit='second')),
recent=dict(count=10, time=20),
state=dict(established=True,
related=True,
invalid=True,
new=True))
]),
])
],
state="merged"))
commands = [
"set firewall ipv6-name INBOUND rule 101 protocol 'tcp'",
'set firewall ipv6-name INBOUND rule 101 destination address 2001:db8::11',
'set firewall ipv6-name INBOUND rule 101 destination port 2124',
'set firewall ipv6-name INBOUND rule 101',
'set firewall ipv6-name INBOUND rule 101 source address 2001:db8::12',
'set firewall ipv6-name INBOUND rule 101 source mac-address 38:00:25:19:76:0c',
'set firewall ipv6-name INBOUND rule 101 source port 2127',
'set firewall ipv6-name INBOUND rule 101 state new enable',
'set firewall ipv6-name INBOUND rule 101 state invalid enable',
'set firewall ipv6-name INBOUND rule 101 state related enable',
'set firewall ipv6-name INBOUND rule 101 state established enable',
'set firewall ipv6-name INBOUND rule 101 limit burst 10',
'set firewall ipv6-name INBOUND rule 101 recent count 10',
'set firewall ipv6-name INBOUND rule 101 recent time 20',
'set firewall ipv6-name INBOUND rule 101 limit rate 20/second'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_03(self):
set_module_args(
dict(config=[
dict(afi='ipv6',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
destination=dict(group=dict(
address_group='OUT-ADDR-GROUP',
network_group='OUT-NET-GROUP',
port_group='OUT-PORT-GROUP')),
source=dict(group=dict(
address_group='IN-ADDR-GROUP',
network_group='IN-NET-GROUP',
port_group='IN-PORT-GROUP')))
]),
])
],
state="merged"))
commands = [
'set firewall ipv6-name INBOUND rule 101 source group address-group IN-ADDR-GROUP',
'set firewall ipv6-name INBOUND rule 101 source group network-group IN-NET-GROUP',
'set firewall ipv6-name INBOUND rule 101 source group port-group IN-PORT-GROUP',
'set firewall ipv6-name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP',
'set firewall ipv6-name INBOUND rule 101 destination group network-group OUT-NET-GROUP',
'set firewall ipv6-name INBOUND rule 101 destination group port-group OUT-PORT-GROUP',
'set firewall ipv6-name INBOUND rule 101'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_04(self):
set_module_args(
dict(config=[
dict(afi='ipv6',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
time=dict(monthdays='2',
startdate='2020-01-24',
starttime='13:20:00',
stopdate='2020-01-28',
stoptime='13:30:00',
weekdays='!Sat,Sun',
utc=True),
tcp=dict(flags='ALL'))
]),
])
],
state="merged"))
commands = [
'set firewall ipv6-name INBOUND rule 101',
'set firewall ipv6-name INBOUND rule 101 tcp flags ALL',
'set firewall ipv6-name INBOUND rule 101 time utc',
'set firewall ipv6-name INBOUND rule 101 time monthdays 2',
'set firewall ipv6-name INBOUND rule 101 time startdate 2020-01-24',
'set firewall ipv6-name INBOUND rule 101 time stopdate 2020-01-28',
'set firewall ipv6-name INBOUND rule 101 time weekdays !Sat,Sun',
'set firewall ipv6-name INBOUND rule 101 time stoptime 13:30:00',
'set firewall ipv6-name INBOUND rule 101 time starttime 13:20:00'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_icmp_01(self):
set_module_args(
dict(config=[
dict(afi='ipv6',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
protocol='icmp',
icmp=dict(type_name='port-unreachable'))
]),
])
],
state="merged"))
commands = [
'set firewall ipv6-name INBOUND rule 101 icmpv6 type port-unreachable',
"set firewall ipv6-name INBOUND rule 101 protocol 'icmp'",
'set firewall ipv6-name INBOUND rule 101'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_01(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
protocol='icmp',
icmp=dict(type=1, code=1))
]),
])
],
state="merged"))
commands = [
'set firewall name INBOUND rule 101 icmp type 1',
'set firewall name INBOUND rule 101 icmp code 1',
"set firewall name INBOUND rule 101 protocol 'icmp'",
'set firewall name INBOUND rule 101'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_02(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='INBOUND',
rules=[
dict(number='101',
protocol='icmp',
icmp=dict(type_name='echo-request'))
]),
])
],
state="merged"))
commands = [
'set firewall name INBOUND rule 101 icmp type-name echo-request',
"set firewall name INBOUND rule 101 protocol 'icmp'",
'set firewall name INBOUND rule 101'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_del_01(self):
set_module_args(
dict(config=[
dict(afi='ipv4', rule_sets=[
dict(name='V4-INGRESS'),
])
],
state="deleted"))
commands = ['delete firewall name V4-INGRESS']
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_02(self):
set_module_args(
dict(config=[
dict(afi='ipv4', rule_sets=[
dict(name='V4-INGRESS'),
]),
dict(afi='ipv6', rule_sets=[
dict(name='V6-INGRESS'),
])
],
state="deleted"))
commands = [
'delete firewall name V4-INGRESS',
'delete firewall ipv6-name V6-INGRESS'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_03(self):
set_module_args(dict(config=[], state="deleted"))
commands = ['delete firewall name', 'delete firewall ipv6-name']
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_04(self):
set_module_args(
dict(config=[
dict(afi='ipv4', rule_sets=[
dict(name='V4-ING'),
]),
dict(afi='ipv6', rule_sets=[
dict(name='V6-ING'),
])
],
state="deleted"))
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_rep_01(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='V4-INGRESS',
description='This is IPv4 INGRESS rule set',
default_action='accept',
enable_default_log=True,
rules=[
dict(number='101',
action='reject',
description='Rule 101 is configured by Ansible RM',
ipsec='match-ipsec',
protocol='tcp',
fragment='match-frag',
disabled=False),
dict(number='102',
action='accept',
description='Rule 102 is configured by Ansible RM',
protocol='icmp',
disabled=True)
]),
]),
dict(afi='ipv6',
rule_sets=[
dict(name='V6-INGRESS',
default_action='accept',
description='This rule-set is configured by Ansible RM'),
dict(name='V6-EGRESS',
default_action='reject',
description='This rule-set is configured by Ansible RM')
])
],
state="replaced"))
commands = [
'delete firewall name V4-INGRESS rule 101 disabled',
'delete firewall name V4-EGRESS default-action',
"set firewall name V4-INGRESS description 'This is IPv4 INGRESS rule set'",
"set firewall name V4-INGRESS rule 101 protocol 'tcp'",
"set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible RM'",
"set firewall name V4-INGRESS rule 101 action 'reject'",
'set firewall name V4-INGRESS rule 102 disabled',
"set firewall name V4-INGRESS rule 102 action 'accept'",
"set firewall name V4-INGRESS rule 102 protocol 'icmp'",
"set firewall name V4-INGRESS rule 102 description 'Rule 102 is configured by Ansible RM'",
'set firewall name V4-INGRESS rule 102',
"set firewall ipv6-name V6-INGRESS description 'This rule-set is configured by Ansible RM'",
"set firewall ipv6-name V6-EGRESS description 'This rule-set is configured by Ansible RM'"
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_rule_rep_02(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='V4-INGRESS',
description='This is IPv4 V4-INGRESS rule set',
default_action='accept',
enable_default_log=False,
rules=[
dict(number='101',
action='accept',
description='Rule 101 is configured by Ansible',
ipsec='match-ipsec',
protocol='icmp',
fragment='match-frag',
disabled=True),
]),
]),
dict(afi='ipv6',
rule_sets=[
dict(
name='V6-INGRESS',
default_action='accept',
),
dict(
name='V6-EGRESS',
default_action='reject',
)
])
],
state="replaced"))
commands = [
'delete firewall name V4-INGRESS enable-default-log',
'delete firewall name V4-EGRESS default-action'
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_01(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='V4-INGRESS',
description='This is IPv4 V4-INGRESS rule set',
default_action='accept',
enable_default_log=True,
rules=[
dict(number='101',
action='accept',
description='Rule 101 is configured by Ansible',
ipsec='match-ipsec',
protocol='icmp',
fragment='match-frag',
disabled=True)
]),
dict(
name='V4-EGRESS',
default_action='reject',
),
]),
dict(afi='ipv6',
rule_sets=[
dict(
name='V6-INGRESS',
default_action='accept',
),
dict(
name='V6-EGRESS',
default_action='reject',
)
])
],
state="replaced"))
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_mer_idem_01(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='V4-INGRESS',
description='This is IPv4 V4-INGRESS rule set',
default_action='accept',
enable_default_log=True,
rules=[
dict(number='101',
action='accept',
description='Rule 101 is configured by Ansible',
ipsec='match-ipsec',
protocol='icmp',
fragment='match-frag',
disabled=True)
]),
dict(
name='V4-EGRESS',
default_action='reject',
),
]),
dict(afi='ipv6',
rule_sets=[
dict(
name='V6-INGRESS',
default_action='accept',
),
dict(
name='V6-EGRESS',
default_action='reject',
)
])
],
state="merged"))
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_ovr_01(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='V4-IN',
description='This is IPv4 INGRESS rule set',
default_action='accept',
enable_default_log=True,
rules=[
dict(number='1',
action='reject',
description='Rule 1 is configured by Ansible RM',
ipsec='match-ipsec',
protocol='tcp',
fragment='match-frag',
disabled=False),
dict(number='2',
action='accept',
description='Rule 102 is configured by Ansible RM',
protocol='icmp',
disabled=True)
]),
]),
dict(afi='ipv6',
rule_sets=[
dict(name='V6-IN',
default_action='accept',
description='This rule-set is configured by Ansible RM'),
dict(name='V6-EG',
default_action='reject',
description='This rule-set is configured by Ansible RM')
])
],
state="overridden"))
commands = [
'delete firewall ipv6-name V6-INGRESS',
'delete firewall ipv6-name V6-EGRESS',
'delete firewall name V4-INGRESS',
'delete firewall name V4-EGRESS',
"set firewall name V4-IN default-action 'accept'",
"set firewall name V4-IN description 'This is IPv4 INGRESS rule set'",
'set firewall name V4-IN enable-default-log',
"set firewall name V4-IN rule 1 protocol 'tcp'",
"set firewall name V4-IN rule 1 description 'Rule 1 is configured by Ansible RM'",
"set firewall name V4-IN rule 1 fragment 'match-frag'",
'set firewall name V4-IN rule 1',
"set firewall name V4-IN rule 1 action 'reject'",
"set firewall name V4-IN rule 1 ipsec 'match-ipsec'",
'set firewall name V4-IN rule 2 disabled',
"set firewall name V4-IN rule 2 action 'accept'",
"set firewall name V4-IN rule 2 protocol 'icmp'",
"set firewall name V4-IN rule 2 description 'Rule 102 is configured by Ansible RM'",
'set firewall name V4-IN rule 2',
"set firewall ipv6-name V6-IN default-action 'accept'",
"set firewall ipv6-name V6-IN description 'This rule-set is configured by Ansible RM'",
"set firewall ipv6-name V6-EG default-action 'reject'",
"set firewall ipv6-name V6-EG description 'This rule-set is configured by Ansible RM'"
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_rule_ovr_idem_01(self):
set_module_args(
dict(config=[
dict(afi='ipv4',
rule_sets=[
dict(name='V4-INGRESS',
description='This is IPv4 V4-INGRESS rule set',
default_action='accept',
enable_default_log=True,
rules=[
dict(number='101',
action='accept',
description='Rule 101 is configured by Ansible',
ipsec='match-ipsec',
protocol='icmp',
fragment='match-frag',
disabled=True)
]),
dict(
name='V4-EGRESS',
default_action='reject',
),
]),
dict(afi='ipv6',
rule_sets=[
dict(
name='V6-INGRESS',
default_action='accept',
),
dict(
name='V6-EGRESS',
default_action='reject',
)
])
],
state="overridden"))
self.execute_module(changed=False, commands=[])
Loading…
Cancel
Save