From 82e6a91c3f33e3e12969a5ee8a9b36f477680b5a Mon Sep 17 00:00:00 2001 From: Ivan Bojer Date: Fri, 2 Feb 2018 04:38:27 -0800 Subject: [PATCH] new module panos_query_rules (#35203) * new module --- CHANGELOG.md | 2 + .../network/panos/panos_query_rules.py | 511 ++++++++++++++++++ 2 files changed, 513 insertions(+) create mode 100644 lib/ansible/modules/network/panos/panos_query_rules.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4df306d463c..0604d4f9060 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -323,6 +323,8 @@ See [Porting Guide](http://docs.ansible.com/ansible/devel/porting_guides.html) f * onyx_pfc_interface * onyx_protocol * onyx_vlan +- panos + * panos_query_rules - radware * vdirect_commit * vdirect_runnable diff --git a/lib/ansible/modules/network/panos/panos_query_rules.py b/lib/ansible/modules/network/panos/panos_query_rules.py new file mode 100644 index 00000000000..c52636c414a --- /dev/null +++ b/lib/ansible/modules/network/panos/panos_query_rules.py @@ -0,0 +1,511 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Ansible module to manage PaloAltoNetworks Firewall +# (c) 2016, techbizdev +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . +# limitations under the License. + +ANSIBLE_METADATA = {'metadata_version': '1.1', + 'status': ['preview'], + 'supported_by': 'community'} + +DOCUMENTATION = ''' +--- +module: panos_query_rules +short_description: PANOS module that allows search for security rules in PANW NGFW devices. +description: > + - Security policies allow you to enforce rules and take action, and can be as general or specific as needed. The + policy rules are compared against the incoming traffic in sequence, and because the first rule that matches the + traffic is applied, the more specific rules must precede the more general ones. +author: "Bob Hagen (@rnh556)" +version_added: "2.5" +requirements: + - pan-python can be obtained from PyPi U(https://pypi.python.org/pypi/pan-python) + - pandevice can be obtained from PyPi U(https://pypi.python.org/pypi/pandevice) + - xmltodict can be obtains from PyPi U(https://pypi.python.org/pypi/xmltodict) +notes: + - Checkmode is not supported. + - Panorama is supported. +options: + ip_address: + description: + - IP address (or hostname) of PAN-OS firewall or Panorama management console being queried. + required: true + username: + description: + - Username credentials to use for authentication. + required: false + default: "admin" + password: + description: + - Password credentials to use for authentication. + required: true + api_key: + description: + - API key that can be used instead of I(username)/I(password) credentials. + application: + description: + - Name of the application or application group to be queried. + required: false + default: None + source_zone: + description: + - Name of the source security zone to be queried. + required: false + default: None + source_ip: + description: + - The source IP address to be queried. + required: false + default: None + source_port: + description: + - The source port to be queried. + required: false + default: None + destination_zone: + description: + - Name of the destination security zone to be queried. + required: false + default: None + destination_ip: + description: + - The destination IP address to be queried. + required: false + default: None + destination_port: + description: + - The destination port to be queried. + required: false + default: None + protocol: + description: + - The protocol used to be queried. Must be either I(tcp) or I(udp). + required: false + default: None + tag_name: + description: + - Name of the rule tag to be queried. + required: false + default: None + devicegroup: + description: + - The Panorama device group in which to conduct the query. + required: false + default: None +''' + +EXAMPLES = ''' +- name: search for rules with tcp/3306 + panos_query_rules: + ip_address: '{{ ip_address }}' + username: '{{ username }}' + password: '{{ password }}' + source_zone: 'DevNet' + destination_zone: 'DevVPC' + destination_port: '3306' + protocol: 'tcp' + +- name: search devicegroup for inbound rules to dmz host + panos_query_rules: + ip_address: '{{ ip_address }}' + api_key: '{{ api_key }}' + destination_zone: 'DMZ' + destination_ip: '10.100.42.18' + address: 'DeviceGroupA' + +- name: search for rules containing a specified rule tag + panos_query_rules: + ip_address: '{{ ip_address }}' + username: '{{ username }}' + password: '{{ password }}' + tag_name: 'ProjectX' +''' + +RETURN = ''' +# Default return values +''' + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.basic import get_exception + +try: + import pan.xapi + from pan.xapi import PanXapiError + import pandevice + from pandevice import base + from pandevice import firewall + from pandevice import panorama + from pandevice import objects + from pandevice import policies + import ipaddress + import xmltodict + import json + HAS_LIB = True +except ImportError: + HAS_LIB = False + + +def get_devicegroup(device, devicegroup): + dg_list = device.refresh_devices() + for group in dg_list: + if isinstance(group, pandevice.panorama.DeviceGroup): + if group.name == devicegroup: + return group + return False + + +def get_rulebase(device, devicegroup): + # Build the rulebase + if isinstance(device, firewall.Firewall): + rulebase = policies.Rulebase() + device.add(rulebase) + elif isinstance(device, panorama.Panorama): + dg = panorama.DeviceGroup(devicegroup) + device.add(dg) + rulebase = policies.PreRulebase() + dg.add(rulebase) + else: + return False + policies.SecurityRule.refreshall(rulebase) + return rulebase + + +def get_object(device, dev_group, obj_name): + # Search global address objects + match = device.find(obj_name, objects.AddressObject) + if match: + return match + + # Search global address groups + match = device.find(obj_name, objects.AddressGroup) + if match: + return match + + # Search Panorama device group + if isinstance(device, pandevice.panorama.Panorama): + # Search device group address objects + match = dev_group.find(obj_name, objects.AddressObject) + if match: + return match + + # Search device group address groups + match = dev_group.find(obj_name, objects.AddressGroup) + if match: + return match + return False + + +def addr_in_obj(addr, obj): + ip = ipaddress.ip_address(addr) + # Process address objects + if isinstance(obj, objects.AddressObject): + if obj.type == 'ip-netmask': + net = ipaddress.ip_network(obj.value) + if ip in net: + return True + if obj.type == 'ip-range': + ip_range = obj.value.split('-') + lower = ipaddress.ip_address(ip_range[0]) + upper = ipaddress.ip_address(ip_range[1]) + if lower < ip < upper: + return True + return False + + +def get_services(device, dev_group, svc_list, obj_list): + for svc in svc_list: + + # Search global address objects + global_obj_match = device.find(svc, objects.ServiceObject) + if global_obj_match: + obj_list.append(global_obj_match) + + # Search global address groups + global_grp_match = device.find(svc, objects.ServiceGroup) + if global_grp_match: + get_services(device, dev_group, global_grp_match.value, obj_list) + + # Search Panorama device group + if isinstance(device, pandevice.panorama.Panorama): + + # Search device group address objects + dg_obj_match = dev_group.find(svc, objects.ServiceObject) + if dg_obj_match: + obj_list.append(dg_obj_match) + + # Search device group address groups + dg_grp_match = dev_group.find(svc, objects.ServiceGroup) + if dg_grp_match: + get_services(device, dev_group, dg_grp_match.value, obj_list) + + return obj_list + + +def port_in_svc(orientation, port, protocol, obj): + # Process address objects + if orientation is 'source': + for x in obj.source_port.split(','): + if '-' in x: + port_range = x.split('-') + lower = int(port_range[0]) + upper = int(port_range[1]) + if (lower <= int(port) <= upper) and (obj.protocol == protocol): + return True + else: + if port == x and obj.protocol == protocol: + return True + elif orientation is 'destination': + for x in obj.destination_port.split(','): + if '-' in x: + port_range = x.split('-') + lower = int(port_range[0]) + upper = int(port_range[1]) + if (lower <= int(port) <= upper) and (obj.protocol == protocol): + return True + else: + if port == x and obj.protocol == protocol: + return True + return False + + +def get_tag(device, dev_group, tag_name): + # Search global address objects + match = device.find(tag_name, objects.Tag) + if match: + return match + # Search Panorama device group + if isinstance(device, panorama.Panorama): + # Search device group address objects + match = dev_group.find(tag_name, objects.Tag) + if match: + return match + return False + + +def main(): + argument_spec = dict( + ip_address=dict(required=True), + password=dict(no_log=True), + username=dict(default='admin'), + api_key=dict(no_log=True), + application=dict(default=None), + source_zone=dict(default=None), + destination_zone=dict(default=None), + source_ip=dict(default=None), + destination_ip=dict(default=None), + source_port=dict(default=None), + destination_port=dict(default=None), + protocol=dict(default=None, choices=['tcp', 'udp']), + tag_name=dict(default=None), + devicegroup=dict(default=None) + ) + module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=False, + required_one_of=[['api_key', 'password']] + ) + if not HAS_LIB: + module.fail_json(msg='Missing required libraries.') + + ip_address = module.params["ip_address"] + password = module.params["password"] + username = module.params['username'] + api_key = module.params['api_key'] + application = module.params['application'] + source_zone = module.params['source_zone'] + source_ip = module.params['source_ip'] + source_port = module.params['source_port'] + destination_zone = module.params['destination_zone'] + destination_ip = module.params['destination_ip'] + destination_port = module.params['destination_port'] + protocol = module.params['protocol'] + tag_name = module.params['tag_name'] + devicegroup = module.params['devicegroup'] + + # Create the device with the appropriate pandevice type + device = base.PanDevice.create_from_device(ip_address, username, password, api_key=api_key) + + # Grab the global objects + objects.AddressObject.refreshall(device) + objects.AddressGroup.refreshall(device) + objects.ServiceObject.refreshall(device) + objects.ServiceGroup.refreshall(device) + objects.Tag.refreshall(device) + + # If Panorama, validate the devicegroup and grab the devicegroup objects + dev_group = None + if devicegroup and isinstance(device, panorama.Panorama): + dev_group = get_devicegroup(device, devicegroup) + if dev_group: + device.add(dev_group) + objects.AddressObject.refreshall(dev_group) + objects.AddressGroup.refreshall(dev_group) + objects.ServiceObject.refreshall(dev_group) + objects.ServiceGroup.refreshall(dev_group) + objects.Tag.refreshall(dev_group) + else: + module.fail_json( + failed=1, + msg='\'%s\' device group not found in Panorama. Is the name correct?' % devicegroup + ) + + # Build the rulebase and produce list + rulebase = get_rulebase(device, dev_group) + rulelist = rulebase.children + hitbase = policies.Rulebase() + loose_match = True + + # Process each rule + for rule in rulelist: + hitlist = [] + + if source_zone: + source_zone_match = False + if loose_match and 'any' in rule.fromzone: + source_zone_match = True + else: + for object_string in rule.fromzone: + if object_string == source_zone: + source_zone_match = True + hitlist.append(source_zone_match) + + if destination_zone: + destination_zone_match = False + if loose_match and 'any' in rule.tozone: + destination_zone_match = True + else: + for object_string in rule.tozone: + if object_string == destination_zone: + destination_zone_match = True + hitlist.append(destination_zone_match) + + if source_ip: + source_ip_match = False + if loose_match and 'any' in rule.source: + source_ip_match = True + else: + for object_string in rule.source: + # Get a valid AddressObject or AddressGroup + obj = get_object(device, dev_group, object_string) + # Otherwise the object_string is not an object and should be handled differently + if obj is False: + if '-' in object_string: + obj = ipaddress.ip_address(source_ip) + source_range = object_string.split('-') + source_lower = ipaddress.ip_address(source_range[0]) + source_upper = ipaddress.ip_address(source_range[1]) + if source_lower <= obj <= source_upper: + source_ip_match = True + else: + if source_ip == object_string: + source_ip_match = True + if isinstance(obj, objects.AddressObject) and addr_in_obj(source_ip, obj): + source_ip_match = True + elif isinstance(obj, objects.AddressGroup) and obj.static_value: + for member_string in obj.static_value: + member = get_object(device, dev_group, member_string) + if addr_in_obj(source_ip, member): + source_ip_match = True + hitlist.append(source_ip_match) + + if destination_ip: + destination_ip_match = False + if loose_match and 'any' in rule.destination: + destination_ip_match = True + else: + for object_string in rule.destination: + # Get a valid AddressObject or AddressGroup + obj = get_object(device, dev_group, object_string) + # Otherwise the object_string is not an object and should be handled differently + if obj is False: + if '-' in object_string: + obj = ipaddress.ip_address(destination_ip) + destination_range = object_string.split('-') + destination_lower = ipaddress.ip_address(destination_range[0]) + destination_upper = ipaddress.ip_address(destination_range[1]) + if destination_lower <= obj <= destination_upper: + destination_ip_match = True + else: + if destination_ip == object_string: + destination_ip_match = True + if isinstance(obj, objects.AddressObject) and addr_in_obj(destination_ip, obj): + destination_ip_match = True + elif isinstance(obj, objects.AddressGroup) and obj.static_value: + for member_string in obj.static_value: + member = get_object(device, dev_group, member_string) + if addr_in_obj(destination_ip, member): + destination_ip_match = True + hitlist.append(destination_ip_match) + + if source_port: + source_port_match = False + orientation = 'source' + if loose_match and (rule.service[0] == 'any'): + source_port_match = True + elif rule.service[0] == 'application-default': + source_port_match = False # Fix this once apps are supported + else: + service_list = [] + service_list = get_services(device, dev_group, rule.service, service_list) + for obj in service_list: + if port_in_svc(orientation, source_port, protocol, obj): + source_port_match = True + break + hitlist.append(source_port_match) + + if destination_port: + destination_port_match = False + orientation = 'destination' + if loose_match and (rule.service[0] == 'any'): + destination_port_match = True + elif rule.service[0] == 'application-default': + destination_port_match = False # Fix this once apps are supported + else: + service_list = [] + service_list = get_services(device, dev_group, rule.service, service_list) + for obj in service_list: + if port_in_svc(orientation, destination_port, protocol, obj): + destination_port_match = True + break + hitlist.append(destination_port_match) + + if tag_name: + tag_match = False + if rule.tag: + for object_string in rule.tag: + obj = get_tag(device, dev_group, object_string) + if obj and (obj.name == tag_name): + tag_match = True + hitlist.append(tag_match) + + # Add to hit rulebase + if False not in hitlist: + hitbase.add(rule) + + # Dump the hit rulebase + if hitbase.children: + output_string = xmltodict.parse(hitbase.element_str()) + module.exit_json( + stdout_lines=json.dumps(output_string, indent=2), + msg='%s of %s rules matched' % (hitbase.children.__len__(), rulebase.children.__len__()) + ) + else: + module.fail_json(msg='No matching rules found.') + + +if __name__ == '__main__': + main()