Enable logging to any domain in the check point machine (#63976)

* enable using any domain in the check point machine

* Update checkpoint.py

* trying to checge `test_chrckpoint` according to `test_ftd` in order to pass the tests

* Update test_checkpoint.py
pull/64436/head
chkp-orso 5 years ago committed by Sumit Jaiswal
parent 48e132b9c3
commit 3a0c6454a9

@ -14,6 +14,13 @@ description:
- This HttpApi plugin provides methods to connect to Checkpoint - This HttpApi plugin provides methods to connect to Checkpoint
devices over a HTTP(S)-based api. devices over a HTTP(S)-based api.
version_added: "2.8" version_added: "2.8"
options:
domain:
type: str
description:
- Specifies the domain of the Check Point device
vars:
- name: ansible_checkpoint_domain
""" """
import json import json
@ -32,7 +39,11 @@ BASE_HEADERS = {
class HttpApi(HttpApiBase): class HttpApi(HttpApiBase):
def login(self, username, password): def login(self, username, password):
if username and password: if username and password:
payload = {'user': username, 'password': password} cp_domain = self.get_option('domain')
if cp_domain:
payload = {'user': username, 'password': password, 'domain': cp_domain}
else:
payload = {'user': username, 'password': password}
url = '/web_api/login' url = '/web_api/login'
response, response_data = self.send_request(url, payload) response, response_data = self.send_request(url, payload)
else: else:

@ -20,6 +20,15 @@ EXPECTED_BASE_HEADERS = {
class FakeCheckpointHttpApiPlugin(HttpApi): class FakeCheckpointHttpApiPlugin(HttpApi):
def __init__(self, conn): def __init__(self, conn):
super(FakeCheckpointHttpApiPlugin, self).__init__(conn) super(FakeCheckpointHttpApiPlugin, self).__init__(conn)
self.hostvars = {
'domain': None
}
def get_option(self, var):
return self.hostvars[var]
def set_option(self, var, val):
self.hostvars[var] = val
class TestCheckpointHttpApi(unittest.TestCase): class TestCheckpointHttpApi(unittest.TestCase):
@ -52,6 +61,19 @@ class TestCheckpointHttpApi(unittest.TestCase):
assert resp == (500, {'errorMessage': 'ERROR'}) assert resp == (500, {'errorMessage': 'ERROR'})
def test_login_to_global_domain(self):
temp_domain = self.checkpoint_plugin.hostvars['domain']
self.checkpoint_plugin.hostvars['domain'] = 'test_domain'
self.connection_mock.send.return_value = self._connection_response(
{'sid': 'SID', 'uid': 'UID'}
)
self.checkpoint_plugin.login('USERNAME', 'PASSWORD')
self.connection_mock.send.assert_called_once_with('/web_api/login', mock.ANY, headers=mock.ANY,
method=mock.ANY)
self.checkpoint_plugin.hostvars['domain'] = temp_domain
@staticmethod @staticmethod
def _connection_response(response, status=200): def _connection_response(response, status=200):
response_mock = mock.Mock() response_mock = mock.Mock()

Loading…
Cancel
Save