From 624a8dd1d9b7e4c2463d320122ea8a01ac4d1f65 Mon Sep 17 00:00:00 2001 From: Toshio Kuratomi Date: Sun, 1 Feb 2015 17:05:37 -0800 Subject: [PATCH] Replace large if-elif-else blocks with a dict-dispatcher --- lib/ansible/runner/filter_plugins/ipaddr.py | 530 ++++++++++++-------- 1 file changed, 308 insertions(+), 222 deletions(-) diff --git a/lib/ansible/runner/filter_plugins/ipaddr.py b/lib/ansible/runner/filter_plugins/ipaddr.py index d0408d6e7b4..bf8398fc79d 100644 --- a/lib/ansible/runner/filter_plugins/ipaddr.py +++ b/lib/ansible/runner/filter_plugins/ipaddr.py @@ -28,16 +28,287 @@ else: mac_linux.word_fmt = '%.2x' +# ---- IP address and network query helpers ---- + +def _empty_ipaddr_query(v, vtype): + # We don't have any query to process, so just check what type the user + # expects, and return the IP address in a correct format + if v: + if vtype == 'address': + return str(v.ip) + elif vtype == 'network': + return str(v) + +def _6to4_query(v, vtype, value): + if v.version == 4: + + if v.size == 1: + ipconv = str(v.ip) + elif v.size > 1: + if v.ip != v.network: + ipconv = str(v.ip) + else: + ipconv = False + + if ipaddr(ipconv, 'public'): + numbers = list(map(int, ipconv.split('.'))) + + try: + return '2002:{:02x}{:02x}:{:02x}{:02x}::1/48'.format(*numbers) + except: + return False + + elif v.version == 6: + if vtype == 'address': + if ipaddr(str(v), '2002::/16'): + return value + elif vtype == 'network': + if v.ip != v.network: + if ipaddr(str(v.ip), '2002::/16'): + return value + else: + return False + +def _ip_query(v): + if v.size == 1: + return str(v.ip) + if v.size > 1: + if v.ip != v.network: + return str(v.ip) + +def _gateway_query(v): + if v.size > 1: + if v.ip != v.network: + return str(v.ip) + '/' + str(v.prefixlen) + +def _bool_ipaddr_query(v): + if v: + return True + +def _broadcast_query(v): + if v.size > 1: + return str(v.broadcast) + +def _cidr_query(v): + return str(v) + +def _cidr_lookup_query(v, iplist, value): + try: + if v in iplist: + return value + except: + return False + +def _host_query(v): + if v.size == 1: + return str(v) + elif v.size > 1: + if v.ip != v.network: + return str(v.ip) + '/' + str(v.prefixlen) + +def _hostmask_query(v): + return str(v.hostmask) + +def _int_query(v, vtype): + if vtype == 'address': + return int(v.ip) + elif vtype == 'network': + return str(int(v.ip)) + '/' + str(int(v.prefixlen)) + +def _ipv4_query(v, value): + if v.version == 6: + try: + return str(v.ipv4()) + except: + return False + else: + return value + +def _ipv6_query(v, value): + if v.version == 4: + return str(v.ipv6()) + else: + return value + +def _link_local_query(v, value): + v_ip = netaddr.IPAddress(str(v.ip)) + if v.version == 4: + if ipaddr(str(v_ip), '169.254.0.0/24'): + return value + + elif v.version == 6: + if ipaddr(str(v_ip), 'fe80::/10'): + return value + +def _loopback_query(v, value): + v_ip = netaddr.IPAddress(str(v.ip)) + if v_ip.is_loopback(): + return value + +def _multicast_query(v, value): + if v.is_multicast(): + return value + +def _net_query(v): + if v.size > 1: + if v.ip == v.network: + return str(v.network) + '/' + str(v.prefixlen) + +def _netmask_query(v): + if v.size > 1: + return str(v.netmask) + +def _network_query(v): + if v.size > 1: + return str(v.network) + +def _prefix_query(v): + return int(v.prefixlen) + +def _private_query(v, value): + if v.is_private(): + return value + +def _public_query(v, value): + v_ip = netaddr.IPAddress(str(v.ip)) + if v_ip.is_unicast() and not v_ip.is_private() and \ + not v_ip.is_loopback() and not v_ip.is_netmask() and \ + not v_ip.is_hostmask(): + return value + +def _revdns_query(v): + v_ip = netaddr.IPAddress(str(v.ip)) + return v_ip.reverse_dns + +def _size_query(v): + return v.size + +def _subnet_query(v): + return str(v.cidr) + +def _type_query(v): + if v.size == 1: + return 'address' + if v.size > 1: + if v.ip != v.network: + return 'address' + else: + return 'network' + +def _unicast_query(v, value): + if v.is_unicast(): + return value + +def _version_query(v): + return v.version + +def _wrap_query(v, vtype, value): + if v.version == 6: + if vtype == 'address': + return '[' + str(v.ip) + ']' + elif vtype == 'network': + return '[' + str(v.ip) + ']/' + str(v.prefixlen) + else: + return value + + +# ---- HWaddr query helpers ---- +def _bare_query(v): + v.dialect = netaddr.mac_bare + return str(v) + +def _bool_hwaddr_query(v): + if v: + return True + +def _cisco_query(v): + v.dialect = netaddr.mac_cisco + return str(v) + +def _empty_hwaddr_query(v, value): + if v: + return value + +def _linux_query(v): + v.dialect = mac_linux + return str(v) + +def _postgresql_query(v): + v.dialect = netaddr.mac_pgsql + return str(v) + +def _unix_query(v): + v.dialect = netaddr.mac_unix + return str(v) + +def _win_query(v): + v.dialect = netaddr.mac_eui48 + return str(v) + + # ---- IP address and network filters ---- def ipaddr(value, query = '', version = False, alias = 'ipaddr'): ''' Check if string is an IP address or network and filter it ''' - query_types = [ 'type', 'bool', 'int', 'version', 'size', 'address', 'ip', 'host', \ - 'network', 'subnet', 'prefix', 'broadcast', 'netmask', 'hostmask', \ - 'unicast', 'multicast', 'private', 'public', 'loopback', 'lo', \ - 'revdns', 'wrap', 'ipv6', 'v6', 'ipv4', 'v4', 'cidr', 'net', \ - 'hostnet', 'router', 'gateway', 'gw', 'host/prefix', 'address/prefix' ] + query_func_extra_args = { + '': ('vtype',), + '6to4': ('vtype', 'value'), + 'cidr_lookup': ('iplist', 'value'), + 'int': ('vtype',), + 'ipv4': ('value',), + 'ipv6': ('value',), + 'link-local': ('value',), + 'loopback': ('value',), + 'lo': ('value',), + 'multicast': ('value',), + 'private': ('value',), + 'public': ('value',), + 'unicast': ('value',), + 'wrap': ('vtype', 'value'), + } + query_func_map = { + '': _empty_ipaddr_query, + '6to4': _6to4_query, + 'address': _ip_query, + 'address/prefix': _gateway_query, + 'bool': _bool_ipaddr_query, + 'broadcast': _broadcast_query, + 'cidr': _cidr_query, + 'cidr_lookup': _cidr_lookup_query, + 'gateway': _gateway_query, + 'gw': _gateway_query, + 'host': _host_query, + 'host/prefix': _gateway_query, + 'hostmask': _hostmask_query, + 'hostnet': _gateway_query, + 'int': _int_query, + 'ip': _ip_query, + 'ipv4': _ipv4_query, + 'ipv6': _ipv6_query, + 'link-local': _link_local_query, + 'lo': _loopback_query, + 'loopback': _loopback_query, + 'multicast': _multicast_query, + 'net': _net_query, + 'netmask': _netmask_query, + 'network': _network_query, + 'prefix': _prefix_query, + 'private': _private_query, + 'public': _public_query, + 'revdns': _revdns_query, + 'router': _gateway_query, + 'size': _size_query, + 'subnet': _subnet_query, + 'type': _type_query, + 'unicast': _unicast_query, + 'v4': _ipv4_query, + 'v6': _ipv6_query, + 'version': _version_query, + 'wrap': _wrap_query, + } + + vtype = None if not value: return False @@ -136,13 +407,12 @@ def ipaddr(value, query = '', version = False, alias = 'ipaddr'): value = str(v) vtype = 'network' - v_ip = netaddr.IPAddress(str(v.ip)) - # We have a query string but it's not in the known query types. Check if # that string is a valid subnet, if so, we can check later if given IP # address/network is inside that specific subnet try: - if query and query not in query_types and ipaddr(query, 'network'): + ### ?? 6to4 and link-local were True here before. Should they still? + if query and (query not in query_func_map or query == 'cidr_lookup') and ipaddr(query, 'network'): iplist = netaddr.IPSet([netaddr.IPNetwork(query)]) query = 'cidr_lookup' except: @@ -154,185 +424,12 @@ def ipaddr(value, query = '', version = False, alias = 'ipaddr'): if version and v.version != version: return False - # We don't have any query to process, so just check what type the user - # expects, and return the IP address in a correct format - if not query: - if v: - if vtype == 'address': - return str(v.ip) - elif vtype == 'network': - return str(v) - - elif query == 'type': - if v.size == 1: - return 'address' - if v.size > 1: - if v.ip != v.network: - return 'address' - else: - return 'network' - - elif query == 'bool': - if v: - return True - - elif query == 'int': - if vtype == 'address': - return int(v.ip) - elif vtype == 'network': - return str(int(v.ip)) + '/' + str(int(v.prefixlen)) - - elif query == 'version': - return v.version - - elif query == 'size': - return v.size - - elif query in [ 'address', 'ip' ]: - if v.size == 1: - return str(v.ip) - if v.size > 1: - if v.ip != v.network: - return str(v.ip) - - elif query == 'host': - if v.size == 1: - return str(v) - elif v.size > 1: - if v.ip != v.network: - return str(v.ip) + '/' + str(v.prefixlen) - - elif query == 'net': - if v.size > 1: - if v.ip == v.network: - return str(v.network) + '/' + str(v.prefixlen) - - elif query in [ 'hostnet', 'router', 'gateway', 'gw', 'host/prefix', 'address/prefix' ]: - if v.size > 1: - if v.ip != v.network: - return str(v.ip) + '/' + str(v.prefixlen) - - elif query == 'network': - if v.size > 1: - return str(v.network) - - elif query == 'subnet': - return str(v.cidr) - - elif query == 'cidr': - return str(v) - - elif query == 'prefix': - return int(v.prefixlen) - - elif query == 'broadcast': - if v.size > 1: - return str(v.broadcast) - - elif query == 'netmask': - if v.size > 1: - return str(v.netmask) - - elif query == 'hostmask': - return str(v.hostmask) - - elif query == 'unicast': - if v.is_unicast(): - return value - - elif query == 'multicast': - if v.is_multicast(): - return value - - elif query == 'link-local': - if v.version == 4: - if ipaddr(str(v_ip), '169.254.0.0/24'): - return value - - elif v.version == 6: - if ipaddr(str(v_ip), 'fe80::/10'): - return value - - elif query == 'private': - if v.is_private(): - return value - - elif query == 'public': - if v_ip.is_unicast() and not v_ip.is_private() and \ - not v_ip.is_loopback() and not v_ip.is_netmask() and \ - not v_ip.is_hostmask(): - return value - - elif query in [ 'loopback', 'lo' ]: - if v_ip.is_loopback(): - return value - - elif query == 'revdns': - return v_ip.reverse_dns - - elif query == 'wrap': - if v.version == 6: - if vtype == 'address': - return '[' + str(v.ip) + ']' - elif vtype == 'network': - return '[' + str(v.ip) + ']/' + str(v.prefixlen) - else: - return value - - elif query in [ 'ipv6', 'v6' ]: - if v.version == 4: - return str(v.ipv6()) - else: - return value - - elif query in [ 'ipv4', 'v4' ]: - if v.version == 6: - try: - return str(v.ipv4()) - except: - return False - else: - return value - - elif query == '6to4': - - if v.version == 4: - - if v.size == 1: - ipconv = str(v.ip) - elif v.size > 1: - if v.ip != v.network: - ipconv = str(v.ip) - else: - ipconv = False - - if ipaddr(ipconv, 'public'): - numbers = list(map(int, ipconv.split('.'))) - - try: - return '2002:{:02x}{:02x}:{:02x}{:02x}::1/48'.format(*numbers) - except: - return False - - elif v.version == 6: - if vtype == 'address': - if ipaddr(str(v), '2002::/16'): - return value - elif vtype == 'network': - if v.ip != v.network: - if ipaddr(str(v.ip), '2002::/16'): - return value - else: - return False - - elif query == 'cidr_lookup': - try: - if v in iplist: - return value - except: - return False - - else: + extras = [] + for arg in query_func_extra_args.get(query, tuple()): + extras.append(locals()[arg]) + try: + return query_func_map[query](v, *extras) + except KeyError: try: float(query) if v.size == 1: @@ -462,45 +559,35 @@ def ipsubnet(value, query = '', index = 'x'): def hwaddr(value, query = '', alias = 'hwaddr'): ''' Check if string is a HW/MAC address and filter it ''' + query_func_extra_args = { + '': ('value',), + } + query_func_map = { + '': _empty_hwaddr_query, + 'bare': _bare_query, + 'bool': _bool_hwaddr_query, + 'cisco': _cisco_query, + 'eui48': _win_query, + 'linux': _linux_query, + 'pgsql': _postgresql_query, + 'postgresql': _postgresql_query, + 'psql': _postgresql_query, + 'unix': _unix_query, + 'win': _win_query, + } + try: v = netaddr.EUI(value) except: - if query and query not in [ 'bool' ]: + if query and query != 'bool': raise errors.AnsibleFilterError(alias + ': not a hardware address: %s' % value) - if not query: - if v: - return value - - elif query == 'bool': - if v: - return True - - elif query in [ 'win', 'eui48' ]: - v.dialect = netaddr.mac_eui48 - return str(v) - - elif query == 'unix': - v.dialect = netaddr.mac_unix - return str(v) - - elif query in [ 'pgsql', 'postgresql', 'psql' ]: - v.dialect = netaddr.mac_pgsql - return str(v) - - elif query == 'cisco': - v.dialect = netaddr.mac_cisco - return str(v) - - elif query == 'bare': - v.dialect = netaddr.mac_bare - return str(v) - - elif query == 'linux': - v.dialect = mac_linux - return str(v) - - else: + extras = [] + for arg in query_func_extra_args.get(query, tuple()): + extras.append(locals()[arg]) + try: + return query_func_map[query](v, *extras) + except KeyError: raise errors.AnsibleFilterError(alias + ': unknown filter type: %s' % query) return False @@ -508,7 +595,6 @@ def hwaddr(value, query = '', alias = 'hwaddr'): def macaddr(value, query = ''): return hwaddr(value, query, alias = 'macaddr') - def _need_netaddr(*args, **kwargs): raise errors.AnsibleFilterError('python-netaddr package is not installed')