Add hetzner_firewall_info module. (#65421)

pull/65730/head
Felix Fontein 5 years ago committed by Abhijit Menon-Sen
parent c59e061cff
commit 95c85b395c

@ -0,0 +1,230 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# (c) 2019 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = r'''
---
module: hetzner_firewall_info
version_added: "2.10"
short_description: Manage Hetzner's dedicated server firewall
author:
- Felix Fontein (@felixfontein)
description:
- Manage Hetzner's dedicated server firewall.
seealso:
- name: Firewall documentation
description: Hetzner's documentation on the stateless firewall for dedicated servers
link: https://wiki.hetzner.de/index.php/Robot_Firewall/en
- module: hetzner_firewall
description: Configure firewall.
extends_documentation_fragment:
- hetzner
options:
server_ip:
description: The server's main IP address.
type: str
required: yes
wait_for_configured:
description:
- Whether to wait until the firewall has been successfully configured before
determining what to do, and before returning from the module.
- The API returns status C(in progress) when the firewall is currently
being configured. If this happens, the module will try again until
the status changes to C(active) or C(disabled).
- Please note that there is a request limit. If you have to do multiple
updates, it can be better to disable waiting, and regularly use
M(hetzner_firewall_info) to query status.
type: bool
default: yes
wait_delay:
description:
- Delay to wait (in seconds) before checking again whether the firewall has
been configured.
type: int
default: 10
timeout:
description:
- Timeout (in seconds) for waiting for firewall to be configured.
type: int
default: 180
'''
EXAMPLES = r'''
- name: Get firewall configuration for server with main IP 1.2.3.4
hetzner_firewall_info:
hetzner_user: foo
hetzner_password: bar
server_ip: 1.2.3.4
register: result
- debug:
msg: "{{ result.firewall }}"
'''
RETURN = r'''
firewall:
description:
- The firewall configuration.
type: dict
returned: success
contains:
port:
description:
- Switch port of firewall.
- C(main) or C(kvm).
type: str
sample: main
server_ip:
description:
- Server's main IP address.
type: str
sample: 1.2.3.4
server_number:
description:
- Hetzner's internal server number.
type: int
sample: 12345
status:
description:
- Status of the firewall.
- C(active) or C(disabled).
- Will be C(in process) if the firewall is currently updated, and
I(wait_for_configured) is set to C(no) or I(timeout) to a too small value.
type: str
sample: active
whitelist_hos:
description:
- Whether Hetzner services have access.
type: bool
sample: true
rules:
description:
- Firewall rules.
type: dict
contains:
input:
description:
- Input firewall rules.
type: list
elements: dict
contains:
name:
description:
- Name of the firewall rule.
type: str
sample: Allow HTTP access to server
ip_version:
description:
- Internet protocol version.
type: str
sample: ipv4
dst_ip:
description:
- Destination IP address or subnet address.
- CIDR notation.
type: str
sample: 1.2.3.4/32
dst_port:
description:
- Destination port or port range.
type: str
sample: "443"
src_ip:
description:
- Source IP address or subnet address.
- CIDR notation.
type: str
sample: null
src_port:
description:
- Source port or port range.
type: str
sample: null
protocol:
description:
- Protocol above IP layer
type: str
sample: tcp
tcp_flags:
description:
- TCP flags or logical combination of flags.
type: str
sample: null
action:
description:
- Action if rule matches.
- C(accept) or C(discard).
type: str
sample: accept
'''
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.hetzner import (
HETZNER_DEFAULT_ARGUMENT_SPEC,
BASE_URL,
fetch_url_json,
fetch_url_json_with_retries,
CheckDoneTimeoutException,
)
def firewall_configured(result, error):
return result['firewall']['status'] != 'in process'
def main():
argument_spec = dict(
server_ip=dict(type='str', required=True),
wait_for_configured=dict(type='bool', default=True),
wait_delay=dict(type='int', default=10),
timeout=dict(type='int', default=180),
)
argument_spec.update(HETZNER_DEFAULT_ARGUMENT_SPEC)
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True,
)
server_ip = module.params['server_ip']
# https://robot.your-server.de/doc/webservice/en.html#get-firewall-server-ip
url = "{0}/firewall/{1}".format(BASE_URL, server_ip)
if module.params['wait_for_configured']:
try:
result, error = fetch_url_json_with_retries(
module,
url,
check_done_callback=firewall_configured,
check_done_delay=module.params['wait_delay'],
check_done_timeout=module.params['timeout'],
)
except CheckDoneTimeoutException as dummy:
module.fail_json(msg='Timeout while waiting for firewall to be configured.')
else:
result, error = fetch_url_json(module, url)
firewall = result['firewall']
if not firewall.get('rules'):
firewall['rules'] = dict()
for ruleset in ['input']:
firewall['rules'][ruleset] = []
module.exit_json(
changed=False,
firewall=firewall,
)
if __name__ == '__main__':
main()

@ -169,7 +169,7 @@ class ModuleFailException(Exception):
self.kwargs = kwargs
def run_module(mocker, arguments, fetch_url):
def run_module(mocker, module, arguments, fetch_url):
def exit_json(module, **kwargs):
module._return_formatted(kwargs)
raise ModuleExitException(kwargs)
@ -183,21 +183,21 @@ def run_module(mocker, arguments, fetch_url):
mocker.patch('ansible.modules.net_tools.hetzner_firewall.AnsibleModule.exit_json', exit_json)
mocker.patch('ansible.modules.net_tools.hetzner_firewall.AnsibleModule.fail_json', fail_json)
set_module_args(arguments)
hetzner_firewall.main()
module.main()
def run_module_success(mocker, arguments, fetch_url_calls):
def run_module_success(mocker, module, arguments, fetch_url_calls):
fetch_url = FetchUrlProxy(fetch_url_calls or [])
with pytest.raises(ModuleExitException) as e:
run_module(mocker, arguments, fetch_url)
run_module(mocker, module, arguments, fetch_url)
fetch_url.assert_is_done()
return e.value.kwargs
def run_module_failed(mocker, arguments, fetch_url_calls):
def run_module_failed(mocker, module, arguments, fetch_url_calls):
fetch_url = FetchUrlProxy(fetch_url_calls or [])
with pytest.raises(ModuleFailException) as e:
run_module(mocker, arguments, fetch_url)
run_module(mocker, module, arguments, fetch_url)
fetch_url.assert_is_done()
return e.value.kwargs
@ -210,7 +210,7 @@ def run_module_failed(mocker, arguments, fetch_url_calls):
def test_absent_idempotency(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -240,7 +240,7 @@ def test_absent_idempotency(mocker):
def test_absent_changed(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -285,7 +285,7 @@ def test_absent_changed(mocker):
def test_present_idempotency(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -315,7 +315,7 @@ def test_present_idempotency(mocker):
def test_present_changed(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -363,7 +363,7 @@ def test_present_changed(mocker):
def test_absent_idempotency_check(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -394,7 +394,7 @@ def test_absent_idempotency_check(mocker):
def test_absent_changed_check(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -425,7 +425,7 @@ def test_absent_changed_check(mocker):
def test_present_idempotency_check(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -456,7 +456,7 @@ def test_present_idempotency_check(mocker):
def test_present_changed_check(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -490,7 +490,7 @@ def test_present_changed_check(mocker):
def test_port_idempotency(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -522,7 +522,7 @@ def test_port_idempotency(mocker):
def test_port_changed(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -572,7 +572,7 @@ def test_port_changed(mocker):
def test_whitelist_hos_idempotency(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -604,7 +604,7 @@ def test_whitelist_hos_idempotency(mocker):
def test_whitelist_hos_changed(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -654,7 +654,7 @@ def test_whitelist_hos_changed(mocker):
def test_wait_get(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -699,7 +699,7 @@ def test_wait_get(mocker):
def test_wait_get_timeout(mocker):
result = run_module_failed(mocker, {
result = run_module_failed(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -740,7 +740,7 @@ def test_wait_get_timeout(mocker):
def test_nowait_get(mocker):
result = run_module_failed(mocker, {
result = run_module_failed(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -769,7 +769,7 @@ def test_nowait_get(mocker):
def test_wait_update(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -828,7 +828,7 @@ def test_wait_update(mocker):
def test_wait_update_timeout(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -889,7 +889,7 @@ def test_wait_update_timeout(mocker):
def test_nowait_update(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -936,7 +936,7 @@ def test_nowait_update(mocker):
# Idempotency checks: different amount of input rules
def test_input_rule_len_change_0_1(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -1012,7 +1012,7 @@ def test_input_rule_len_change_0_1(mocker):
def test_input_rule_len_change_1_0(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -1073,7 +1073,7 @@ def test_input_rule_len_change_1_0(mocker):
def test_input_rule_len_change_1_2(mocker):
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -1278,7 +1278,7 @@ def test_input_rule_value_change(mocker, parameter, before, after):
after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter))
calls.append(after_call)
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
@ -1384,7 +1384,7 @@ def test_input_rule_ip_normalization(mocker, ip_version, parameter, before_norma
after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after_normalized)
calls.append(after_call)
result = run_module_success(mocker, {
result = run_module_success(mocker, hetzner_firewall, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',

@ -0,0 +1,239 @@
# (c) 2019 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible.module_utils.hetzner import BASE_URL
from ansible.modules.net_tools import hetzner_firewall_info
from .test_hetzner_firewall import FetchUrlCall, run_module_success, run_module_failed
# Tests for state (absent and present)
def test_absent(mocker):
result = run_module_success(mocker, hetzner_firewall_info, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
}, [
FetchUrlCall('GET', 200)
.result_json({
'firewall': {
'server_ip': '1.2.3.4',
'server_number': 1,
'status': 'disabled',
'whitelist_hos': False,
'port': 'main',
'rules': {
'input': [],
},
},
})
.expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
])
assert result['changed'] is False
assert result['firewall']['status'] == 'disabled'
assert result['firewall']['server_ip'] == '1.2.3.4'
assert result['firewall']['server_number'] == 1
def test_present(mocker):
result = run_module_success(mocker, hetzner_firewall_info, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
}, [
FetchUrlCall('GET', 200)
.result_json({
'firewall': {
'server_ip': '1.2.3.4',
'server_number': 1,
'status': 'active',
'whitelist_hos': False,
'port': 'main',
'rules': {
'input': [],
},
},
})
.expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
])
assert result['changed'] is False
assert result['firewall']['status'] == 'active'
assert result['firewall']['server_ip'] == '1.2.3.4'
assert result['firewall']['server_number'] == 1
assert len(result['firewall']['rules']['input']) == 0
def test_present_w_rules(mocker):
result = run_module_success(mocker, hetzner_firewall_info, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
}, [
FetchUrlCall('GET', 200)
.result_json({
'firewall': {
'server_ip': '1.2.3.4',
'server_number': 1,
'status': 'active',
'whitelist_hos': False,
'port': 'main',
'rules': {
'input': [
{
'name': 'Accept HTTPS traffic',
'ip_version': 'ipv4',
'dst_ip': None,
'dst_port': '443',
'src_ip': None,
'src_port': None,
'protocol': 'tcp',
'tcp_flags': None,
'action': 'accept',
},
{
'name': None,
'ip_version': 'ipv4',
'dst_ip': None,
'dst_port': None,
'src_ip': None,
'src_port': None,
'protocol': None,
'tcp_flags': None,
'action': 'discard',
}
],
},
},
})
.expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
])
assert result['changed'] is False
assert result['firewall']['status'] == 'active'
assert result['firewall']['server_ip'] == '1.2.3.4'
assert result['firewall']['server_number'] == 1
assert len(result['firewall']['rules']['input']) == 2
assert result['firewall']['rules']['input'][0]['name'] == 'Accept HTTPS traffic'
assert result['firewall']['rules']['input'][0]['dst_port'] == '443'
assert result['firewall']['rules']['input'][0]['action'] == 'accept'
assert result['firewall']['rules']['input'][1]['dst_port'] is None
assert result['firewall']['rules']['input'][1]['action'] == 'discard'
# Tests for wait_for_configured in getting status
def test_wait_get(mocker):
result = run_module_success(mocker, hetzner_firewall_info, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
'wait_for_configured': True,
}, [
FetchUrlCall('GET', 200)
.result_json({
'firewall': {
'server_ip': '1.2.3.4',
'server_number': 1,
'status': 'in process',
'whitelist_hos': False,
'port': 'main',
'rules': {
'input': [],
},
},
})
.expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
FetchUrlCall('GET', 200)
.result_json({
'firewall': {
'server_ip': '1.2.3.4',
'server_number': 1,
'status': 'active',
'whitelist_hos': False,
'port': 'main',
'rules': {
'input': [],
},
},
})
.expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
])
assert result['changed'] is False
assert result['firewall']['status'] == 'active'
assert result['firewall']['server_ip'] == '1.2.3.4'
assert result['firewall']['server_number'] == 1
def test_wait_get_timeout(mocker):
result = run_module_failed(mocker, hetzner_firewall_info, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
'wait_for_configured': True,
'timeout': 0,
}, [
FetchUrlCall('GET', 200)
.result_json({
'firewall': {
'server_ip': '1.2.3.4',
'server_number': 1,
'status': 'in process',
'whitelist_hos': False,
'port': 'main',
'rules': {
'input': [],
},
},
})
.expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
FetchUrlCall('GET', 200)
.result_json({
'firewall': {
'server_ip': '1.2.3.4',
'server_number': 1,
'status': 'in process',
'whitelist_hos': False,
'port': 'main',
'rules': {
'input': [],
},
},
})
.expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
])
assert result['msg'] == 'Timeout while waiting for firewall to be configured.'
def test_nowait_get(mocker):
result = run_module_success(mocker, hetzner_firewall_info, {
'hetzner_user': '',
'hetzner_password': '',
'server_ip': '1.2.3.4',
'wait_for_configured': False,
}, [
FetchUrlCall('GET', 200)
.result_json({
'firewall': {
'server_ip': '1.2.3.4',
'server_number': 1,
'status': 'in process',
'whitelist_hos': False,
'port': 'main',
'rules': {
'input': [],
},
},
})
.expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
])
assert result['changed'] is False
assert result['firewall']['status'] == 'in process'
assert result['firewall']['server_ip'] == '1.2.3.4'
assert result['firewall']['server_number'] == 1
Loading…
Cancel
Save