mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1193 lines
37 KiB
Python
1193 lines
37 KiB
Python
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
from mock import patch
|
|
from ansible.module_utils import basic
|
|
from ansible.modules import iptables
|
|
from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
|
|
|
|
|
|
def get_bin_path(*args, **kwargs):
    """Stand-in for ``AnsibleModule.get_bin_path`` used by the tests below.

    Accepts and ignores any arguments, and always reports the same fixed
    iptables location so the expected command lines are deterministic.
    """
    fixed_iptables_path = "/sbin/iptables"
    return fixed_iptables_path
|
|
|
|
|
|
def get_iptables_version(iptables_path, module):
    """Stand-in for ``iptables.get_iptables_version`` used by the tests below.

    Both arguments are ignored; a fixed version string is returned so the
    tests never have to execute a real binary.
    """
    fixed_version = "1.8.2"
    return fixed_version
|
|
|
|
|
|
class TestIptables(ModuleTestCase):
    """Unit tests for the ``iptables`` module with every external call mocked.

    ``get_bin_path`` and ``get_iptables_version`` are replaced in ``setUp``
    and ``run_command`` is replaced per test, so the tests only verify which
    command lines the module builds and what it reports through
    ``exit_json`` / ``fail_json``.
    """

    def setUp(self):
        """Patch binary lookup and version detection for the duration of a test."""
        super(TestIptables, self).setUp()
        self.mock_get_bin_path = patch.object(basic.AnsibleModule, 'get_bin_path', get_bin_path)
        self.mock_get_bin_path.start()
        self.addCleanup(self.mock_get_bin_path.stop)  # ensure that the patching is 'undone'
        self.mock_get_iptables_version = patch.object(iptables, 'get_iptables_version', get_iptables_version)
        self.mock_get_iptables_version.start()
        self.addCleanup(self.mock_get_iptables_version.stop)  # ensure that the patching is 'undone'

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _run_module(self, command_results):
        """Run ``iptables.main()`` with ``run_command`` mocked and expect a clean exit.

        :param command_results: ``(rc, stdout, stderr)`` tuples handed out in
            order, one per ``run_command`` call made by the module.
        :returns: ``(result, run_command)`` where ``result`` is the dict carried
            by the ``AnsibleExitJson`` exception and ``run_command`` is the mock,
            whose recorded calls stay inspectable after the patch is undone.
        """
        with patch.object(basic.AnsibleModule, 'run_command') as run_command:
            run_command.side_effect = list(command_results)
            with self.assertRaises(AnsibleExitJson) as exit_context:
                iptables.main()
        # Read the result only after the ``assertRaises`` block has closed:
        # a statement placed after ``iptables.main()`` inside that block
        # would never execute because ``main()`` always raises.
        return exit_context.exception.args[0], run_command

    def _assert_commands(self, run_command, expected_commands):
        """Assert the number of ``run_command`` calls and their command lines.

        :param run_command: the mock returned by ``_run_module``.
        :param expected_commands: one entry per expected call, in order. An
            entry is the full argv list, or ``None`` when that call's command
            line is deliberately not checked by the test.
        """
        self.assertEqual(run_command.call_count, len(expected_commands))
        for index, expected in enumerate(expected_commands):
            if expected is not None:
                self.assertEqual(run_command.call_args_list[index][0][0], expected)

    def _assert_existing_rule_probe(self, module_args, expected_command):
        """Run the module against an already-present rule and check its ``-C`` probe.

        The only mocked result has rc 0, i.e. the ``-C`` check reports that the
        rule exists. None of the callers asks for ``state: absent``, so the
        module has nothing to do: it must issue exactly that one command and
        report ``changed=False`` (the same expectation the chain-creation tests
        state for an rc 0 rule check).

        :returns: the ``run_command`` mock for any extra assertions.
        """
        set_module_args(module_args)
        result, run_command = self._run_module([(0, '', '')])
        self.assertFalse(result['changed'])
        self._assert_commands(run_command, [expected_command])
        return run_command

    # ------------------------------------------------------------------
    # Argument validation / flush
    # ------------------------------------------------------------------

    def test_without_required_parameters(self):
        """Failure must occur when all parameters are missing"""
        set_module_args({})
        with self.assertRaises(AnsibleFailJson):
            iptables.main()

    def test_flush_table_without_chain(self):
        """Test flush without chain, flush the table"""
        set_module_args({
            'flush': True,
        })

        result, run_command = self._run_module([(0, '', '')])  # successful execution, no output
        self.assertTrue(result['changed'])

        self.assertEqual(run_command.call_count, 1)
        # Only the leading arguments are pinned; whatever follows '-F' is not checked.
        self.assertEqual(run_command.call_args[0][0][:4], ['/sbin/iptables', '-t', 'filter', '-F'])

    def test_flush_table_check_true(self):
        """Test flush without parameters and check == true"""
        set_module_args({
            'flush': True,
            '_ansible_check_mode': True,
        })

        result, run_command = self._run_module([(0, '', '')])  # successful execution, no output
        self.assertTrue(result['changed'])

        # Check mode must not execute anything.
        self.assertEqual(run_command.call_count, 0)

    # TODO ADD test flush table nat
    # TODO ADD test flush with chain
    # TODO ADD test flush with chain and table nat

    # ------------------------------------------------------------------
    # Chain policy
    # ------------------------------------------------------------------

    def test_policy_table(self):
        """Test change policy of a chain"""
        set_module_args({
            'policy': 'ACCEPT',
            'chain': 'INPUT',
        })

        result, run_command = self._run_module([
            (0, 'Chain INPUT (policy DROP)\n', ''),
            (0, '', ''),
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-L', 'INPUT'],
            ['/sbin/iptables', '-t', 'filter', '-P', 'INPUT', 'ACCEPT'],
        ])

    def test_policy_table_no_change(self):
        """Test don't change policy of a chain if the policy is right"""
        set_module_args({
            'policy': 'ACCEPT',
            'chain': 'INPUT',
        })

        result, run_command = self._run_module([
            (0, 'Chain INPUT (policy ACCEPT)\n', ''),
            (0, '', ''),  # must stay unused: no '-P' call is expected
        ])
        self.assertFalse(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-L', 'INPUT'],
        ])

    def test_policy_table_changed_false(self):
        """Test policy change in check mode: reported as changed, but not applied"""
        set_module_args({
            'policy': 'ACCEPT',
            'chain': 'INPUT',
            '_ansible_check_mode': True,
        })

        result, run_command = self._run_module([
            (0, 'Chain INPUT (policy DROP)\n', ''),
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-L', 'INPUT'],
        ])

    # TODO ADD test policy without chain fail
    # TODO ADD test policy with chain don't exists
    # TODO ADD test policy with wrong choice fail

    # ------------------------------------------------------------------
    # Insert / append / remove rules
    # ------------------------------------------------------------------

    def test_insert_rule_change_false(self):
        """Test insert of a missing rule in check mode"""
        set_module_args({
            'chain': 'OUTPUT',
            'source': '1.2.3.4/32',
            'destination': '7.8.9.10/42',
            'jump': 'ACCEPT',
            'action': 'insert',
            '_ansible_check_mode': True,
        })

        result, run_command = self._run_module([
            (1, '', ''),  # check_rule_present
            (0, '', ''),  # check_chain_present
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-C', 'OUTPUT',
             '-s', '1.2.3.4/32', '-d', '7.8.9.10/42', '-j', 'ACCEPT'],
            None,  # check_chain_present command line is not pinned
        ])

    def test_insert_rule(self):
        """Test insert of a missing rule"""
        set_module_args({
            'chain': 'OUTPUT',
            'source': '1.2.3.4/32',
            'destination': '7.8.9.10/42',
            'jump': 'ACCEPT',
            'action': 'insert'
        })
        rule = ['-s', '1.2.3.4/32', '-d', '7.8.9.10/42', '-j', 'ACCEPT']

        result, run_command = self._run_module([
            (1, '', ''),  # check_rule_present
            (0, '', ''),  # check_chain_present
            (0, '', ''),
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-C', 'OUTPUT'] + rule,
            None,  # check_chain_present command line is not pinned
            ['/sbin/iptables', '-t', 'filter', '-I', 'OUTPUT'] + rule,
        ])

    def test_append_rule_check_mode(self):
        """Test append a redirection rule in check mode"""
        set_module_args({
            'chain': 'PREROUTING',
            'source': '1.2.3.4/32',
            'destination': '7.8.9.10/42',
            'jump': 'REDIRECT',
            'table': 'nat',
            'to_destination': '5.5.5.5/32',
            'protocol': 'udp',
            'destination_port': '22',
            'to_ports': '8600',
            '_ansible_check_mode': True,
        })

        result, run_command = self._run_module([
            (1, '', ''),  # check_rule_present
            (0, '', ''),  # check_chain_present
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'nat', '-C', 'PREROUTING',
             '-p', 'udp',
             '-s', '1.2.3.4/32',
             '-d', '7.8.9.10/42',
             '-j', 'REDIRECT',
             '--to-destination', '5.5.5.5/32',
             '--destination-port', '22',
             '--to-ports', '8600'],
            None,  # check_chain_present command line is not pinned
        ])

    def test_append_rule(self):
        """Test append a redirection rule"""
        set_module_args({
            'chain': 'PREROUTING',
            'source': '1.2.3.4/32',
            'destination': '7.8.9.10/42',
            'jump': 'REDIRECT',
            'table': 'nat',
            'to_destination': '5.5.5.5/32',
            'protocol': 'udp',
            'destination_port': '22',
            'to_ports': '8600'
        })
        rule = [
            '-p', 'udp',
            '-s', '1.2.3.4/32',
            '-d', '7.8.9.10/42',
            '-j', 'REDIRECT',
            '--to-destination', '5.5.5.5/32',
            '--destination-port', '22',
            '--to-ports', '8600',
        ]

        result, run_command = self._run_module([
            (1, '', ''),  # check_rule_present
            (0, '', ''),  # check_chain_present
            (0, '', ''),
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'nat', '-C', 'PREROUTING'] + rule,
            None,  # check_chain_present command line is not pinned
            ['/sbin/iptables', '-t', 'nat', '-A', 'PREROUTING'] + rule,
        ])

    def test_remove_rule(self):
        """Test removal of an existing rule"""
        set_module_args({
            'chain': 'PREROUTING',
            'source': '1.2.3.4/32',
            'destination': '7.8.9.10/42',
            'jump': 'SNAT',
            'table': 'nat',
            'to_source': '5.5.5.5/32',
            'protocol': 'udp',
            'source_port': '22',
            'to_ports': '8600',
            'state': 'absent',
            'in_interface': 'eth0',
            'out_interface': 'eth1',
            'comment': 'this is a comment'
        })
        rule = [
            '-p', 'udp',
            '-s', '1.2.3.4/32',
            '-d', '7.8.9.10/42',
            '-j', 'SNAT',
            '--to-source', '5.5.5.5/32',
            '-i', 'eth0',
            '-o', 'eth1',
            '--source-port', '22',
            '--to-ports', '8600',
            '-m', 'comment',
            '--comment', 'this is a comment',
        ]

        result, run_command = self._run_module([
            (0, '', ''),
            (0, '', ''),
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'nat', '-C', 'PREROUTING'] + rule,
            ['/sbin/iptables', '-t', 'nat', '-D', 'PREROUTING'] + rule,
        ])

    def test_remove_rule_check_mode(self):
        """Test removal of an existing rule in check mode"""
        set_module_args({
            'chain': 'PREROUTING',
            'source': '1.2.3.4/32',
            'destination': '7.8.9.10/42',
            'jump': 'SNAT',
            'table': 'nat',
            'to_source': '5.5.5.5/32',
            'protocol': 'udp',
            'source_port': '22',
            'to_ports': '8600',
            'state': 'absent',
            'in_interface': 'eth0',
            'out_interface': 'eth1',
            'comment': 'this is a comment',
            '_ansible_check_mode': True,
        })

        result, run_command = self._run_module([
            (0, '', ''),
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'nat', '-C', 'PREROUTING',
             '-p', 'udp',
             '-s', '1.2.3.4/32',
             '-d', '7.8.9.10/42',
             '-j', 'SNAT',
             '--to-source', '5.5.5.5/32',
             '-i', 'eth0',
             '-o', 'eth1',
             '--source-port', '22',
             '--to-ports', '8600',
             '-m', 'comment',
             '--comment', 'this is a comment'],
        ])

    # ------------------------------------------------------------------
    # Rule construction (rule already present, only the -C probe runs)
    # ------------------------------------------------------------------

    def test_insert_with_reject(self):
        """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
        self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'protocol': 'tcp',
                'reject_with': 'tcp-reset',
                'ip_version': 'ipv4',
            },
            ['/sbin/iptables', '-t', 'filter', '-C', 'INPUT',
             '-p', 'tcp',
             '-j', 'REJECT',
             '--reject-with', 'tcp-reset'],
        )

    def test_insert_jump_reject_with_reject(self):
        """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
        self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'protocol': 'tcp',
                'jump': 'REJECT',
                'reject_with': 'tcp-reset',
                'ip_version': 'ipv4',
            },
            ['/sbin/iptables', '-t', 'filter', '-C', 'INPUT',
             '-p', 'tcp',
             '-j', 'REJECT',
             '--reject-with', 'tcp-reset'],
        )

    def test_jump_tee_gateway_negative(self):
        """ Missing gateway when JUMP is set to TEE """
        set_module_args({
            'table': 'mangle',
            'chain': 'PREROUTING',
            'in_interface': 'eth0',
            'protocol': 'udp',
            'match': 'state',
            'jump': 'TEE',
            'ctstate': ['NEW'],
            'destination_port': '9521',
            'destination': '127.0.0.1'
        })

        with self.assertRaises(AnsibleFailJson) as e:
            iptables.main()
        failure = e.exception.args[0]
        self.assertTrue(failure['failed'])
        self.assertEqual(failure['msg'], 'jump is TEE but all of the following are missing: gateway')

    def test_jump_tee_gateway(self):
        """ Using gateway when JUMP is set to TEE """
        self._assert_existing_rule_probe(
            {
                'table': 'mangle',
                'chain': 'PREROUTING',
                'in_interface': 'eth0',
                'protocol': 'udp',
                'match': 'state',
                'jump': 'TEE',
                'ctstate': ['NEW'],
                'destination_port': '9521',
                'gateway': '192.168.10.1',
                'destination': '127.0.0.1'
            },
            ['/sbin/iptables',
             '-t', 'mangle',
             '-C', 'PREROUTING',
             '-p', 'udp',
             '-d', '127.0.0.1',
             '-m', 'state',
             '-j', 'TEE',
             '--gateway', '192.168.10.1',
             '-i', 'eth0',
             '--destination-port', '9521',
             '--state', 'NEW'],
        )

    def test_tcp_flags(self):
        """ Test various ways of inputting tcp_flags """
        tcp_flags_variants = [
            'flags=ALL flags_set="ACK,RST,SYN,FIN"',
            {
                'flags': 'ALL',
                'flags_set': 'ACK,RST,SYN,FIN'
            },
            {
                'flags': ['ALL'],
                'flags_set': ['ACK', 'RST', 'SYN', 'FIN']
            },
        ]

        # Every spelling must produce the same command line.
        for tcp_flags in tcp_flags_variants:
            self._assert_existing_rule_probe(
                {
                    'chain': 'OUTPUT',
                    'protocol': 'tcp',
                    'jump': 'DROP',
                    'tcp_flags': tcp_flags,
                },
                ['/sbin/iptables', '-t', 'filter', '-C', 'OUTPUT',
                 '-p', 'tcp',
                 '--tcp-flags', 'ALL', 'ACK,RST,SYN,FIN',
                 '-j', 'DROP'],
            )

    def test_log_level(self):
        """ Test various ways of log level flag """

        log_levels = ['0', '1', '2', '3', '4', '5', '6', '7',
                      'emerg', 'alert', 'crit', 'error', 'warning', 'notice', 'info', 'debug']

        for log_lvl in log_levels:
            self._assert_existing_rule_probe(
                {
                    'chain': 'INPUT',
                    'jump': 'LOG',
                    'log_level': log_lvl,
                    'source': '1.2.3.4/32',
                    'log_prefix': '** DROP-this_ip **'
                },
                ['/sbin/iptables',
                 '-t', 'filter',
                 '-C', 'INPUT',
                 '-s', '1.2.3.4/32',
                 '-j', 'LOG',
                 '--log-prefix', '** DROP-this_ip **',
                 '--log-level', log_lvl],
            )

    def test_iprange(self):
        """ Test iprange module with its flags src_range and dst_range """
        # Explicit 'match: iprange' together with src_range.
        self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'match': ['iprange'],
                'src_range': '192.168.1.100-192.168.1.199',
                'jump': 'ACCEPT'
            },
            ['/sbin/iptables', '-t', 'filter', '-C', 'INPUT',
             '-m', 'iprange',
             '-j', 'ACCEPT',
             '--src-range', '192.168.1.100-192.168.1.199'],
        )

        # No explicit match: '-m iprange' is expected to be added once for both ranges.
        self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'src_range': '192.168.1.100-192.168.1.199',
                'dst_range': '10.0.0.50-10.0.0.100',
                'jump': 'ACCEPT'
            },
            ['/sbin/iptables', '-t', 'filter', '-C', 'INPUT',
             '-j', 'ACCEPT',
             '-m', 'iprange',
             '--src-range', '192.168.1.100-192.168.1.199',
             '--dst-range', '10.0.0.50-10.0.0.100'],
        )

        # dst_range alone.
        self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'dst_range': '10.0.0.50-10.0.0.100',
                'jump': 'ACCEPT'
            },
            ['/sbin/iptables', '-t', 'filter', '-C', 'INPUT',
             '-j', 'ACCEPT',
             '-m', 'iprange',
             '--dst-range', '10.0.0.50-10.0.0.100'],
        )

    def test_insert_rule_with_wait(self):
        """Test that the wait option is passed as '-w' right after the chain"""
        self._assert_existing_rule_probe(
            {
                'chain': 'OUTPUT',
                'source': '1.2.3.4/32',
                'destination': '7.8.9.10/42',
                'jump': 'ACCEPT',
                'action': 'insert',
                'wait': '10'
            },
            ['/sbin/iptables', '-t', 'filter', '-C', 'OUTPUT',
             '-w', '10',
             '-s', '1.2.3.4/32',
             '-d', '7.8.9.10/42',
             '-j', 'ACCEPT'],
        )

    def test_comment_position_at_end(self):
        """Test comment position to make sure it is at the end of command"""
        run_command = self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'jump': 'ACCEPT',
                'action': 'insert',
                'ctstate': ['NEW'],
                'comment': 'this is a comment',
                '_ansible_check_mode': True,
            },
            ['/sbin/iptables', '-t', 'filter', '-C', 'INPUT',
             '-j', 'ACCEPT',
             '-m', 'conntrack',
             '--ctstate', 'NEW',
             '-m', 'comment',
             '--comment', 'this is a comment'],
        )
        # State the intent directly: the comment text is the very last argument.
        self.assertEqual(run_command.call_args[0][0][-1], 'this is a comment')

    def test_destination_ports(self):
        """ Test multiport module usage with multiple ports """
        self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'protocol': 'tcp',
                'in_interface': 'eth0',
                'source': '192.168.0.1/32',
                'destination_ports': ['80', '443', '8081:8085'],
                'jump': 'ACCEPT',
                'comment': 'this is a comment',
            },
            ['/sbin/iptables',
             '-t', 'filter',
             '-C', 'INPUT',
             '-p', 'tcp',
             '-s', '192.168.0.1/32',
             '-j', 'ACCEPT',
             '-m', 'multiport',
             '--dports', '80,443,8081:8085',
             '-i', 'eth0',
             '-m', 'comment',
             '--comment', 'this is a comment'],
        )

    def test_match_set(self):
        """ Test match_set together with match_set_flags """
        self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'protocol': 'tcp',
                'match_set': 'admin_hosts',
                'match_set_flags': 'src',
                'destination_port': '22',
                'jump': 'ACCEPT',
                'comment': 'this is a comment',
            },
            ['/sbin/iptables',
             '-t', 'filter',
             '-C', 'INPUT',
             '-p', 'tcp',
             '-j', 'ACCEPT',
             '--destination-port', '22',
             '-m', 'set',
             '--match-set', 'admin_hosts', 'src',
             '-m', 'comment',
             '--comment', 'this is a comment'],
        )

        self._assert_existing_rule_probe(
            {
                'chain': 'INPUT',
                'protocol': 'udp',
                'match_set': 'banned_hosts',
                'match_set_flags': 'src,dst',
                'jump': 'REJECT',
            },
            ['/sbin/iptables',
             '-t', 'filter',
             '-C', 'INPUT',
             '-p', 'udp',
             '-j', 'REJECT',
             '-m', 'set',
             '--match-set', 'banned_hosts', 'src,dst'],
        )

    # ------------------------------------------------------------------
    # Chain management
    # ------------------------------------------------------------------

    def test_chain_creation(self):
        """Test chain creation when absent"""
        set_module_args({
            'chain': 'FOOBAR',
            'state': 'present',
            'chain_management': True,
        })

        result, run_command = self._run_module([
            (1, '', ''),  # check_rule_present
            (1, '', ''),  # check_chain_present
            (0, '', ''),  # create_chain
            (0, '', ''),  # append_rule
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-C', 'FOOBAR'],
            ['/sbin/iptables', '-t', 'filter', '-L', 'FOOBAR'],
            ['/sbin/iptables', '-t', 'filter', '-N', 'FOOBAR'],
            ['/sbin/iptables', '-t', 'filter', '-A', 'FOOBAR'],
        ])

        # Second run: everything already exists, so nothing changes.
        result, run_command = self._run_module([
            (0, '', ''),  # check_rule_present
        ])
        self.assertFalse(result['changed'])

    def test_chain_creation_check_mode(self):
        """Test chain creation when absent (check mode)"""
        set_module_args({
            'chain': 'FOOBAR',
            'state': 'present',
            'chain_management': True,
            '_ansible_check_mode': True,
        })

        result, run_command = self._run_module([
            (1, '', ''),  # check_rule_present
            (1, '', ''),  # check_chain_present
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-C', 'FOOBAR'],
            ['/sbin/iptables', '-t', 'filter', '-L', 'FOOBAR'],
        ])

        # Second run: everything already exists, so nothing changes.
        result, run_command = self._run_module([
            (0, '', ''),  # check_rule_present
        ])
        self.assertFalse(result['changed'])

    def test_chain_deletion(self):
        """Test chain deletion when present"""
        set_module_args({
            'chain': 'FOOBAR',
            'state': 'absent',
            'chain_management': True,
        })

        result, run_command = self._run_module([
            (0, '', ''),  # check_chain_present
            (0, '', ''),  # delete_chain
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-L', 'FOOBAR'],
            ['/sbin/iptables', '-t', 'filter', '-X', 'FOOBAR'],
        ])

        # Second run: the probe fails (rc 1), so there is nothing left to delete.
        result, run_command = self._run_module([
            (1, '', ''),  # check_rule_present
        ])
        self.assertFalse(result['changed'])

    def test_chain_deletion_check_mode(self):
        """Test chain deletion when present (check mode)"""
        set_module_args({
            'chain': 'FOOBAR',
            'state': 'absent',
            'chain_management': True,
            '_ansible_check_mode': True,
        })

        result, run_command = self._run_module([
            (0, '', ''),  # check_chain_present
        ])
        self.assertTrue(result['changed'])

        self._assert_commands(run_command, [
            ['/sbin/iptables', '-t', 'filter', '-L', 'FOOBAR'],
        ])

        # Second run: the probe fails (rc 1), so there is nothing left to delete.
        result, run_command = self._run_module([
            (1, '', ''),  # check_rule_present
        ])
        self.assertFalse(result['changed'])