diff --git a/lib/ansible/module_utils/hetzner.py b/lib/ansible/module_utils/hetzner.py index 87de79a5a5a..2bc3d1666a5 100644 --- a/lib/ansible/module_utils/hetzner.py +++ b/lib/ansible/module_utils/hetzner.py @@ -17,6 +17,9 @@ __metaclass__ = type from ansible.module_utils.urls import fetch_url from ansible.module_utils.six.moves.urllib.parse import urlencode +import time + + HETZNER_DEFAULT_ARGUMENT_SPEC = dict( hetzner_user=dict(type='str', required=True), hetzner_password=dict(type='str', required=True, no_log=True), @@ -57,6 +60,40 @@ def fetch_url_json(module, url, method='GET', timeout=10, data=None, headers=Non module.fail_json(msg='Cannot decode content retrieved from {0}'.format(url)) +class CheckDoneTimeoutException(Exception): + def __init__(self, result, error): + super(CheckDoneTimeoutException, self).__init__() + self.result = result + self.error = error + + +def fetch_url_json_with_retries(module, url, check_done_callback, check_done_delay=10, check_done_timeout=180, skip_first=False, **kwargs): + ''' + Make general request to Hetzner's JSON robot API, with retries until a condition is satisfied. + + The condition is tested by calling ``check_done_callback(result, error)``. If it is not satisfied, + it will be retried with delays ``check_done_delay`` (in seconds) until a total timeout of + ``check_done_timeout`` (in seconds) since the time the first request is started is reached. + + If ``skip_first`` is specified, will assume that a first call has already been made and will + directly start with waiting. + ''' + start_time = time.time() + if not skip_first: + result, error = fetch_url_json(module, url, **kwargs) + if check_done_callback(result, error): + return result, error + while True: + elapsed = (time.time() - start_time) + left_time = check_done_timeout - elapsed + time.sleep(max(min(check_done_delay, left_time), 0)) + result, error = fetch_url_json(module, url, **kwargs) + if check_done_callback(result, error): + return result, error + if left_time < check_done_delay: + raise CheckDoneTimeoutException(result, error) + + # ##################################################################################### # ## FAILOVER IP ###################################################################### diff --git a/lib/ansible/modules/net_tools/hetzner_firewall.py b/lib/ansible/modules/net_tools/hetzner_firewall.py new file mode 100644 index 00000000000..4b9847c4720 --- /dev/null +++ b/lib/ansible/modules/net_tools/hetzner_firewall.py @@ -0,0 +1,513 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# (c) 2019 Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +ANSIBLE_METADATA = {'metadata_version': '1.1', + 'status': ['preview'], + 'supported_by': 'community'} + + +DOCUMENTATION = r''' +--- +module: hetzner_firewall +version_added: "2.10" +short_description: Manage Hetzner's dedicated server firewall +author: + - Felix Fontein (@felixfontein) +description: + - Manage Hetzner's dedicated server firewall. + - Note that idempotency check for TCP flags simply compares strings and doesn't + try to interpret the rules. This might change in the future. +seealso: + - name: Firewall documentation + description: Hetzner's documentation on the stateless firewall for dedicated servers + link: https://wiki.hetzner.de/index.php/Robot_Firewall/en + - module: hetzner_firewall_info + description: Retrieve information on firewall configuration. +extends_documentation_fragment: + - hetzner +options: + server_ip: + description: The server's main IP address. + required: yes + type: str + port: + description: + - Switch port of firewall. + type: str + choices: [ main, kvm ] + default: main + state: + description: + - Status of the firewall. + - Firewall is active if state is C(present), and disabled if state is C(absent). + type: str + default: present + choices: [ present, absent ] + whitelist_hos: + description: + - Whether Hetzner services have access. + type: bool + rules: + description: + - Firewall rules. + type: dict + suboptions: + input: + description: + - Input firewall rules. + type: list + elements: dict + suboptions: + name: + description: + - Name of the firewall rule. + type: str + ip_version: + description: + - Internet protocol version. + - Note that currently, only IPv4 is supported by Hetzner. + required: yes + type: str + choices: [ ipv4, ipv6 ] + dst_ip: + description: + - Destination IP address or subnet address. + - CIDR notation. + type: str + dst_port: + description: + - Destination port or port range. + type: str + src_ip: + description: + - Source IP address or subnet address. + - CIDR notation. + type: str + src_port: + description: + - Source port or port range. + type: str + protocol: + description: + - Protocol above IP layer + type: str + tcp_flags: + description: + - TCP flags or logical combination of flags. + - Flags supported by Hetzner are C(syn), C(fin), C(rst), C(psh) and C(urg). + - They can be combined with C(|) (logical or) and C(&) (logical and). + - See U(the documentation,https://wiki.hetzner.de/index.php/Robot_Firewall/en#Parameter) + for more information. + type: str + action: + description: + - Action if rule matches. + required: yes + type: str + choices: [ accept, discard ] + update_timeout: + description: + - Timeout to use when configuring the firewall. + - Note that the API call returns before the firewall has been + successfully set up. + type: int + default: 30 + wait_for_configured: + description: + - Whether to wait until the firewall has been successfully configured before + determining what to do, and before returning from the module. + - The API returns status C(in progress) when the firewall is currently + being configured. If this happens, the module will try again until + the status changes to C(active) or C(disabled). + - Please note that there is a request limit. If you have to do multiple + updates, it can be better to disable waiting, and regularly use + M(hetzner_firewall_info) to query status. + type: bool + default: yes + wait_delay: + description: + - Delay to wait (in seconds) before checking again whether the firewall has + been configured. + type: int + default: 10 + timeout: + description: + - Timeout (in seconds) for waiting for firewall to be configured. + type: int + default: 180 +''' + +EXAMPLES = r''' +- name: Configure firewall for server with main IP 1.2.3.4 + hetzner_firewall: + hetzner_user: foo + hetzner_password: bar + server_ip: 1.2.3.4 + status: active + whitelist_hos: yes + rules: + input: + - name: Allow everything to ports 20-23 from 4.3.2.1/24 + ip_version: ipv4 + src_ip: 4.3.2.1/24 + dst_port: '20-23' + action: accept + - name: Allow everything to port 443 + ip_version: ipv4 + dst_port: '443' + action: accept + - name: Drop everything else + ip_version: ipv4 + action: discard + register: result + +- debug: + msg: "{{ result }}" +''' + +RETURN = r''' +firewall: + description: + - The firewall configuration. + type: dict + returned: success + contains: + port: + description: + - Switch port of firewall. + - C(main) or C(kvm). + type: str + sample: main + server_ip: + description: + - Server's main IP address. + type: str + sample: 1.2.3.4 + server_number: + description: + - Hetzner's internal server number. + type: int + sample: 12345 + status: + description: + - Status of the firewall. + - C(active) or C(disabled). + - Will be C(in process) if the firewall is currently updated, and + I(wait_for_configured) is set to C(no) or I(timeout) to a too small value. + type: str + sample: active + whitelist_hos: + description: + - Whether Hetzner services have access. + type: bool + sample: true + rules: + description: + - Firewall rules. + type: dict + contains: + input: + description: + - Input firewall rules. + type: list + elements: dict + contains: + name: + description: + - Name of the firewall rule. + type: str + sample: Allow HTTP access to server + ip_version: + description: + - Internet protocol version. + type: str + sample: ipv4 + dst_ip: + description: + - Destination IP address or subnet address. + - CIDR notation. + type: str + sample: 1.2.3.4/32 + dst_port: + description: + - Destination port or port range. + type: str + sample: "443" + src_ip: + description: + - Source IP address or subnet address. + - CIDR notation. + type: str + sample: null + src_port: + description: + - Source port or port range. + type: str + sample: null + protocol: + description: + - Protocol above IP layer + type: str + sample: tcp + tcp_flags: + description: + - TCP flags or logical combination of flags. + type: str + sample: null + action: + description: + - Action if rule matches. + - C(accept) or C(discard). + type: str + sample: accept +''' + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.compat import ipaddress as compat_ipaddress +from ansible.module_utils.hetzner import ( + HETZNER_DEFAULT_ARGUMENT_SPEC, + BASE_URL, + fetch_url_json, + fetch_url_json_with_retries, + CheckDoneTimeoutException, +) +from ansible.module_utils.six.moves.urllib.parse import urlencode +from ansible.module_utils._text import to_native, to_text + + +RULE_OPTION_NAMES = [ + 'name', 'ip_version', 'dst_ip', 'dst_port', 'src_ip', 'src_port', + 'protocol', 'tcp_flags', 'action', +] + +RULES = ['input'] + + +def restrict_dict(dictionary, fields): + result = dict() + for k, v in dictionary.items(): + if k in fields: + result[k] = v + return result + + +def restrict_firewall_config(config): + result = restrict_dict(config, ['port', 'status', 'whitelist_hos']) + result['rules'] = dict() + for ruleset in RULES: + result['rules'][ruleset] = [ + restrict_dict(rule, RULE_OPTION_NAMES) + for rule in config['rules'].get(ruleset) or [] + ] + return result + + +def update(before, after, params, name): + bv = before.get(name) + after[name] = bv + changed = False + pv = params[name] + if pv is not None: + changed = pv != bv + if changed: + after[name] = pv + return changed + + +def normalize_ip(ip, ip_version): + if ip is None: + return ip + if '/' in ip: + ip, range = ip.split('/') + else: + ip, range = ip, '' + ip_addr = to_native(compat_ipaddress.ip_address(to_text(ip)).compressed) + if range == '': + range = '32' if ip_version.lower() == 'ipv4' else '128' + return ip_addr + '/' + range + + +def update_rules(before, after, params, ruleset): + before_rules = before['rules'][ruleset] + after_rules = after['rules'][ruleset] + params_rules = params['rules'][ruleset] + changed = len(before_rules) != len(params_rules) + for no, rule in enumerate(params_rules): + rule['src_ip'] = normalize_ip(rule['src_ip'], rule['ip_version']) + rule['dst_ip'] = normalize_ip(rule['dst_ip'], rule['ip_version']) + if no < len(before_rules): + before_rule = before_rules[no] + before_rule['src_ip'] = normalize_ip(before_rule['src_ip'], before_rule['ip_version']) + before_rule['dst_ip'] = normalize_ip(before_rule['dst_ip'], before_rule['ip_version']) + if before_rule != rule: + changed = True + after_rules.append(rule) + return changed + + +def encode_rule(output, rulename, input): + for i, rule in enumerate(input['rules'][rulename]): + for k, v in rule.items(): + if v is not None: + output['rules[{0}][{1}][{2}]'.format(rulename, i, k)] = v + + +def create_default_rules_object(): + rules = dict() + for ruleset in RULES: + rules[ruleset] = [] + return rules + + +def firewall_configured(result, error): + return result['firewall']['status'] != 'in process' + + +def main(): + argument_spec = dict( + server_ip=dict(type='str', required=True), + port=dict(type='str', default='main', choices=['main', 'kvm']), + state=dict(type='str', default='present', choices=['present', 'absent']), + whitelist_hos=dict(type='bool'), + rules=dict(type='dict', options=dict( + input=dict(type='list', elements='dict', options=dict( + name=dict(type='str'), + ip_version=dict(type='str', required=True, choices=['ipv4', 'ipv6']), + dst_ip=dict(type='str'), + dst_port=dict(type='str'), + src_ip=dict(type='str'), + src_port=dict(type='str'), + protocol=dict(type='str'), + tcp_flags=dict(type='str'), + action=dict(type='str', required=True, choices=['accept', 'discard']), + )), + )), + update_timeout=dict(type='int', default=30), + wait_for_configured=dict(type='bool', default=True), + wait_delay=dict(type='int', default=10), + timeout=dict(type='int', default=180), + ) + argument_spec.update(HETZNER_DEFAULT_ARGUMENT_SPEC) + module = AnsibleModule( + argument_spec=argument_spec, + supports_check_mode=True, + ) + + # Sanitize input + module.params['status'] = 'active' if (module.params['state'] == 'present') else 'disabled' + if module.params['rules'] is None: + module.params['rules'] = {} + if module.params['rules'].get('input') is None: + module.params['rules']['input'] = [] + + server_ip = module.params['server_ip'] + + # https://robot.your-server.de/doc/webservice/en.html#get-firewall-server-ip + url = "{0}/firewall/{1}".format(BASE_URL, server_ip) + if module.params['wait_for_configured']: + try: + result, error = fetch_url_json_with_retries( + module, + url, + check_done_callback=firewall_configured, + check_done_delay=module.params['wait_delay'], + check_done_timeout=module.params['timeout'], + ) + except CheckDoneTimeoutException as dummy: + module.fail_json(msg='Timeout while waiting for firewall to be configured.') + else: + result, error = fetch_url_json(module, url) + if not firewall_configured(result, error): + module.fail_json(msg='Firewall configuration cannot be read as it is not configured.') + + full_before = result['firewall'] + if not full_before.get('rules'): + full_before['rules'] = create_default_rules_object() + before = restrict_firewall_config(full_before) + + # Build wanted (after) state and compare + after = dict(before) + changed = False + changed |= update(before, after, module.params, 'port') + changed |= update(before, after, module.params, 'status') + changed |= update(before, after, module.params, 'whitelist_hos') + after['rules'] = create_default_rules_object() + if module.params['status'] == 'active': + for ruleset in RULES: + changed |= update_rules(before, after, module.params, ruleset) + + # Update if different + construct_result = True + construct_status = None + if changed and not module.check_mode: + # https://robot.your-server.de/doc/webservice/en.html#post-firewall-server-ip + url = "{0}/firewall/{1}".format(BASE_URL, server_ip) + headers = {"Content-type": "application/x-www-form-urlencoded"} + data = dict(after) + data['whitelist_hos'] = str(data['whitelist_hos']).lower() + del data['rules'] + for ruleset in RULES: + encode_rule(data, ruleset, after) + result, error = fetch_url_json( + module, + url, + method='POST', + timeout=module.params['update_timeout'], + data=urlencode(data), + headers=headers, + ) + if module.params['wait_for_configured'] and not firewall_configured(result, error): + try: + result, error = fetch_url_json_with_retries( + module, + url, + check_done_callback=firewall_configured, + check_done_delay=module.params['wait_delay'], + check_done_timeout=module.params['timeout'], + skip_first=True, + ) + except CheckDoneTimeoutException as e: + result, error = e.result, e.error + module.warn('Timeout while waiting for firewall to be configured.') + + full_after = result['firewall'] + if not full_after.get('rules'): + full_after['rules'] = create_default_rules_object() + construct_status = full_after['status'] + if construct_status != 'in process': + # Only use result if configuration is done, so that diff will be ok + after = restrict_firewall_config(full_after) + construct_result = False + + if construct_result: + # Construct result (used for check mode, and configuration still in process) + full_after = dict(full_before) + for k, v in after.items(): + if k != 'rules': + full_after[k] = after[k] + if construct_status is not None: + # We want 'in process' here + full_after['status'] = construct_status + full_after['rules'] = dict() + for ruleset in RULES: + full_after['rules'][ruleset] = after['rules'][ruleset] + + module.exit_json( + changed=changed, + diff=dict( + before=before, + after=after, + ), + firewall=full_after, + ) + + +if __name__ == '__main__': + main() diff --git a/test/units/modules/net_tools/test_hetzner_firewall.py b/test/units/modules/net_tools/test_hetzner_firewall.py new file mode 100644 index 00000000000..baccca97a9e --- /dev/null +++ b/test/units/modules/net_tools/test_hetzner_firewall.py @@ -0,0 +1,1405 @@ +# (c) 2019 Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import pytest + +from ansible.module_utils.hetzner import BASE_URL +from ansible.modules.net_tools import hetzner_firewall + + +# ########################################################## +# ## General test framework + +import json + +from mock import MagicMock +from units.modules.utils import set_module_args +from ansible.module_utils.six.moves.urllib.parse import parse_qs + + +class FetchUrlCall: + def __init__(self, method, status): + assert method == method.upper(), \ + 'HTTP method names are case-sensitive and should be upper-case (RFCs 7230 and 7231)' + self.method = method + self.status = status + self.body = None + self.headers = {} + self.error_data = {} + self.expected_url = None + self.expected_headers = {} + self.form_parse = False + self.form_present = set() + self.form_values = {} + self.form_values_one = {} + + def result(self, body): + self.body = body + assert self.error_data.get('body') is None, 'Error body must not be given' + return self + + def result_str(self, str_body): + return self.result(str_body.encode('utf-8')) + + def result_json(self, json_body): + return self.result(json.dumps(json_body).encode('utf-8')) + + def result_error(self, msg, body=None): + self.error_data['msg'] = msg + if body is not None: + self.error_data['body'] = body + assert self.body is None, 'Result must not be given if error body is provided' + return self + + def expect_url(self, url): + self.expected_url = url + return self + + def return_header(self, name, value): + assert value is not None + self.headers[name] = value + return self + + def expect_header(self, name, value): + self.expected_headers[name] = value + return self + + def expect_header_unset(self, name): + self.expected_headers[name] = None + return self + + def expect_form_present(self, key): + self.form_parse = True + self.form_present.append(key) + return self + + def expect_form_value(self, key, value): + self.form_parse = True + self.form_values[key] = [value] + return self + + def expect_form_value_absent(self, key): + self.form_parse = True + self.form_values[key] = [] + return self + + def expect_form_value_one_of(self, key, value): + self.form_parse = True + if key not in self.form_values_subset: + self.form_values_subset[key] = set() + self.form_values_subset[key].add(value) + return self + + +class FetchUrlProxy: + def __init__(self, calls): + self.calls = calls + self.index = 0 + + def _validate_form(self, call, data): + form = {} + if data is not None: + form = parse_qs(data, keep_blank_values=True) + for k in call.form_present: + assert k in form + for k, v in call.form_values.items(): + if len(v) == 0: + assert k not in form + else: + assert form[k] == v + for k, v in call.form_values_one.items(): + assert v <= set(form[k]) + + def _validate_headers(self, call, headers): + given_headers = {} + if headers is not None: + for k, v in headers.items(): + given_headers[k.lower()] = v + for k, v in call.expected_headers: + if v is None: + assert k.lower() not in given_headers, \ + 'Header "{0}" specified for fetch_url call, but should not be'.format(k) + else: + assert given_headers.get(k.lower()) == v, \ + 'Header "{0}" specified for fetch_url call, but with wrong value'.format(k) + + def __call__(self, module, url, data=None, headers=None, method=None, + use_proxy=True, force=False, last_mod_time=None, timeout=10, + use_gssapi=False, unix_socket=None, ca_path=None, cookies=None): + assert self.index < len(self.calls), 'Got more fetch_url calls than expected' + call = self.calls[self.index] + self.index += 1 + + # Validate call + assert method == call.method + if call.expected_url is not None: + assert url == call.expected_url, \ + 'Exepected URL does not match for fetch_url call' + if call.expected_headers: + self._validate_headers(call, headers) + if call.form_parse: + self._validate_form(call, data) + + # Compose result + info = dict(status=call.status) + for k, v in call.headers.items(): + info[k.lower()] = v + info.update(call.error_data) + res = object() + if call.body is not None: + res = MagicMock() + res.read = MagicMock(return_value=call.body) + return (res, info) + + def assert_is_done(self): + assert self.index == len(self.calls), 'Got less fetch_url calls than expected' + + +class ModuleExitException(Exception): + def __init__(self, kwargs): + self.kwargs = kwargs + + +class ModuleFailException(Exception): + def __init__(self, kwargs): + self.kwargs = kwargs + + +def run_module(mocker, arguments, fetch_url): + def exit_json(module, **kwargs): + module._return_formatted(kwargs) + raise ModuleExitException(kwargs) + + def fail_json(module, **kwargs): + module._return_formatted(kwargs) + raise ModuleFailException(kwargs) + + mocker.patch('ansible.module_utils.hetzner.fetch_url', fetch_url) + mocker.patch('ansible.module_utils.hetzner.time.sleep', lambda duration: None) + mocker.patch('ansible.modules.net_tools.hetzner_firewall.AnsibleModule.exit_json', exit_json) + mocker.patch('ansible.modules.net_tools.hetzner_firewall.AnsibleModule.fail_json', fail_json) + set_module_args(arguments) + hetzner_firewall.main() + + +def run_module_success(mocker, arguments, fetch_url_calls): + fetch_url = FetchUrlProxy(fetch_url_calls or []) + with pytest.raises(ModuleExitException) as e: + run_module(mocker, arguments, fetch_url) + fetch_url.assert_is_done() + return e.value.kwargs + + +def run_module_failed(mocker, arguments, fetch_url_calls): + fetch_url = FetchUrlProxy(fetch_url_calls or []) + with pytest.raises(ModuleFailException) as e: + run_module(mocker, arguments, fetch_url) + fetch_url.assert_is_done() + return e.value.kwargs + + +# ########################################################## +# ## Hetzner firewall tests + + +# Tests for state (absent and present) + + +def test_absent_idempotency(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +def test_absent_changed(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'disabled'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +def test_present_idempotency(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +def test_present_changed(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +# Tests for state (absent and present) with check mode + + +def test_absent_idempotency_check(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +def test_absent_changed_check(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +def test_present_idempotency_check(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +def test_present_changed_check(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +# Tests for port + + +def test_port_idempotency(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'port': 'main', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['port'] == 'main' + assert result['diff']['after']['port'] == 'main' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['port'] == 'main' + + +def test_port_changed(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'port': 'main', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'kvm', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('port', 'main'), + ]) + assert result['changed'] is True + assert result['diff']['before']['port'] == 'kvm' + assert result['diff']['after']['port'] == 'main' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['port'] == 'main' + + +# Tests for whitelist_hos + + +def test_whitelist_hos_idempotency(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'whitelist_hos': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['whitelist_hos'] is True + assert result['diff']['after']['whitelist_hos'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['whitelist_hos'] is True + + +def test_whitelist_hos_changed(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'whitelist_hos': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('whitelist_hos', 'true'), + ]) + assert result['changed'] is True + assert result['diff']['before']['whitelist_hos'] is False + assert result['diff']['after']['whitelist_hos'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['whitelist_hos'] is True + + +# Tests for wait_for_configured in getting status + + +def test_wait_get(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +def test_wait_get_timeout(mocker): + result = run_module_failed(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + 'timeout': 0, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'Timeout while waiting for firewall to be configured.' + + +def test_nowait_get(mocker): + result = run_module_failed(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': False, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'Firewall configuration cannot be read as it is not configured.' + + +# Tests for wait_for_configured in setting status + + +def test_wait_update(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'wait_for_configured': True, + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +def test_wait_update_timeout(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + 'timeout': 0, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'in process' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert 'Timeout while waiting for firewall to be configured.' in result['warnings'] + + +def test_nowait_update(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': False, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'in process' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + +# Idempotency checks: different amount of input rules + +def test_input_rule_len_change_0_1(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [ + { + 'ip_version': 'ipv4', + 'action': 'discard', + }, + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][0][name]') + .expect_form_value('rules[input][0][ip_version]', 'ipv4') + .expect_form_value_absent('rules[input][0][dst_ip]') + .expect_form_value_absent('rules[input][0][dst_port]') + .expect_form_value_absent('rules[input][0][src_ip]') + .expect_form_value_absent('rules[input][0][src_port]') + .expect_form_value_absent('rules[input][0][protocol]') + .expect_form_value_absent('rules[input][0][tcp_flags]') + .expect_form_value('rules[input][0][action]', 'discard') + .expect_form_value_absent('rules[input][1][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert len(result['diff']['after']['rules']['input']) == 1 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + + +def test_input_rule_len_change_1_0(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][0][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['input']) == 0 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 0 + + +def test_input_rule_len_change_1_2(mocker): + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [ + { + 'ip_version': 'ipv4', + 'dst_port': 80, + 'protocol': 'tcp', + 'action': 'accept', + }, + { + 'ip_version': 'ipv4', + 'action': 'discard', + }, + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': '80', + 'src_ip': None, + 'src_port': None, + 'protocol': 'tcp', + 'tcp_flags': None, + 'action': 'accept', + }, + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value('rules[input][0][action]', 'accept') + .expect_form_value('rules[input][1][action]', 'discard') + .expect_form_value_absent('rules[input][2][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['input']) == 2 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 2 + + +# Idempotency checks: change one value + + +def create_params(parameter, *values): + assert len(values) > 1 + result = [] + for i in range(1, len(values)): + result.append((parameter, values[i - 1], values[i])) + return result + + +def flatten(list_of_lists): + result = [] + for l in list_of_lists: + result.extend(l) + return result + + +@pytest.mark.parametrize("parameter, before, after", flatten([ + create_params('name', None, '', 'Test', 'Test', 'foo', '', None), + create_params('ip_version', 'ipv4', 'ipv4', 'ipv6', 'ipv6'), + create_params('dst_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None), + create_params('dst_port', None, '80', '80-443', '80-443', None), + create_params('src_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None), + create_params('src_port', None, '80', '80-443', '80-443', None), + create_params('protocol', None, 'tcp', 'tcp', 'udp', 'udp', None), + create_params('tcp_flags', None, 'syn', 'syn|fin', 'syn|fin', 'syn&fin', '', None), + create_params('action', 'accept', 'accept', 'discard', 'discard'), +])) +def test_input_rule_value_change(mocker, parameter, before, after): + input_call = { + 'ip_version': 'ipv4', + 'action': 'discard', + } + input_before = { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + input_after = { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + if after is not None: + input_call[parameter] = after + input_before[parameter] = before + input_after[parameter] = after + + calls = [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [input_before], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ] + + changed = (before != after) + if changed: + after_call = ( + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [input_after], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][1][action]') + ) + if parameter != 'ip_version': + after_call.expect_form_value('rules[input][0][ip_version]', 'ipv4') + if parameter != 'action': + after_call.expect_form_value('rules[input][0][action]', 'discard') + if after is not None: + after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after) + else: + after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter)) + calls.append(after_call) + + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [input_call], + }, + }, calls) + assert result['changed'] == changed + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['input']) == 1 + assert result['diff']['before']['rules']['input'][0][parameter] == before + assert result['diff']['after']['rules']['input'][0][parameter] == after + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + assert result['firewall']['rules']['input'][0][parameter] == after + + +# Idempotency checks: IP address normalization + + +@pytest.mark.parametrize("ip_version, parameter, before_normalized, after_normalized, after", [ + ('ipv4', 'src_ip', '1.2.3.4/32', '1.2.3.4/32', '1.2.3.4'), + ('ipv6', 'src_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3::4'), + ('ipv6', 'dst_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3:0::4'), + ('ipv6', 'dst_ip', '::/0', '::/0', '0:0::0/0'), +]) +def test_input_rule_ip_normalization(mocker, ip_version, parameter, before_normalized, after_normalized, after): + assert ip_version in ('ipv4', 'ipv6') + assert parameter in ('src_ip', 'dst_ip') + input_call = { + 'ip_version': ip_version, + 'action': 'discard', + } + input_before = { + 'name': None, + 'ip_version': ip_version, + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + input_after = { + 'name': None, + 'ip_version': ip_version, + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + if after is not None: + input_call[parameter] = after + input_before[parameter] = before_normalized + input_after[parameter] = after_normalized + + calls = [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [input_before], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ] + + changed = (before_normalized != after_normalized) + if changed: + after_call = ( + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [input_after], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][1][action]') + ) + after_call.expect_form_value('rules[input][0][ip_version]', ip_version) + after_call.expect_form_value('rules[input][0][action]', 'discard') + after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after_normalized) + calls.append(after_call) + + result = run_module_success(mocker, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [input_call], + }, + }, calls) + assert result['changed'] == changed + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['input']) == 1 + assert result['diff']['before']['rules']['input'][0][parameter] == before_normalized + assert result['diff']['after']['rules']['input'][0][parameter] == after_normalized + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + assert result['firewall']['rules']['input'][0][parameter] == after_normalized