mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
153 lines
3.6 KiB
Python
153 lines
3.6 KiB
Python
2 years ago
|
# -*- coding: utf-8 -*-
|
||
|
# (c) 2021 Matt Martz <matt@sivel.net>
|
||
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||
|
|
||
|
from __future__ import absolute_import, division, print_function
|
||
|
__metaclass__ = type
|
||
|
|
||
|
import gzip
|
||
|
import io
|
||
|
import sys
|
||
|
|
||
|
try:
|
||
|
from urllib.response import addinfourl
|
||
|
except ImportError:
|
||
|
from urllib import addinfourl
|
||
|
|
||
|
from ansible.module_utils.six import PY3
|
||
|
from ansible.module_utils.six.moves import http_client
|
||
|
from ansible.module_utils.urls import GzipDecodedReader, Request
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
|
||
|
def compress(data):
|
||
|
buf = io.BytesIO()
|
||
|
try:
|
||
|
f = gzip.GzipFile(fileobj=buf, mode='wb')
|
||
|
f.write(data)
|
||
|
finally:
|
||
|
f.close()
|
||
|
return buf.getvalue()
|
||
|
|
||
|
|
||
|
class Sock(io.BytesIO):
|
||
|
def makefile(self, *args, **kwds):
|
||
|
return self
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def urlopen_mock(mocker):
|
||
|
return mocker.patch('ansible.module_utils.urls.urllib_request.urlopen')
|
||
|
|
||
|
|
||
|
JSON_DATA = b'{"foo": "bar", "baz": "qux", "sandwich": "ham", "tech_level": "pickle", "pop": "corn", "ansible": "awesome"}'
|
||
|
|
||
|
|
||
|
RESP = b'''HTTP/1.1 200 OK
|
||
|
Content-Type: application/json; charset=utf-8
|
||
|
Set-Cookie: foo
|
||
|
Set-Cookie: bar
|
||
|
Content-Length: 108
|
||
|
|
||
|
%s''' % JSON_DATA
|
||
|
|
||
|
GZIP_RESP = b'''HTTP/1.1 200 OK
|
||
|
Content-Type: application/json; charset=utf-8
|
||
|
Set-Cookie: foo
|
||
|
Set-Cookie: bar
|
||
|
Content-Encoding: gzip
|
||
|
Content-Length: 100
|
||
|
|
||
|
%s''' % compress(JSON_DATA)
|
||
|
|
||
|
|
||
|
def test_Request_open_gzip(urlopen_mock):
|
||
|
h = http_client.HTTPResponse(
|
||
|
Sock(GZIP_RESP),
|
||
|
method='GET',
|
||
|
)
|
||
|
h.begin()
|
||
|
|
||
|
if PY3:
|
||
|
urlopen_mock.return_value = h
|
||
|
else:
|
||
|
urlopen_mock.return_value = addinfourl(
|
||
|
h.fp,
|
||
|
h.msg,
|
||
|
'http://ansible.com/',
|
||
|
h.status,
|
||
|
)
|
||
|
urlopen_mock.return_value.msg = h.reason
|
||
|
|
||
|
r = Request().open('GET', 'https://ansible.com/')
|
||
|
assert isinstance(r.fp, GzipDecodedReader)
|
||
|
assert r.read() == JSON_DATA
|
||
|
|
||
|
|
||
|
def test_Request_open_not_gzip(urlopen_mock):
|
||
|
h = http_client.HTTPResponse(
|
||
|
Sock(RESP),
|
||
|
method='GET',
|
||
|
)
|
||
|
h.begin()
|
||
|
|
||
|
if PY3:
|
||
|
urlopen_mock.return_value = h
|
||
|
else:
|
||
|
urlopen_mock.return_value = addinfourl(
|
||
|
h.fp,
|
||
|
h.msg,
|
||
|
'http://ansible.com/',
|
||
|
h.status,
|
||
|
)
|
||
|
urlopen_mock.return_value.msg = h.reason
|
||
|
|
||
|
r = Request().open('GET', 'https://ansible.com/')
|
||
|
assert not isinstance(r.fp, GzipDecodedReader)
|
||
|
assert r.read() == JSON_DATA
|
||
|
|
||
|
|
||
|
def test_Request_open_decompress_false(urlopen_mock):
|
||
|
h = http_client.HTTPResponse(
|
||
|
Sock(RESP),
|
||
|
method='GET',
|
||
|
)
|
||
|
h.begin()
|
||
|
|
||
|
if PY3:
|
||
|
urlopen_mock.return_value = h
|
||
|
else:
|
||
|
urlopen_mock.return_value = addinfourl(
|
||
|
h.fp,
|
||
|
h.msg,
|
||
|
'http://ansible.com/',
|
||
|
h.status,
|
||
|
)
|
||
|
urlopen_mock.return_value.msg = h.reason
|
||
|
|
||
|
r = Request().open('GET', 'https://ansible.com/', decompress=False)
|
||
|
assert not isinstance(r.fp, GzipDecodedReader)
|
||
|
assert r.read() == JSON_DATA
|
||
|
|
||
|
|
||
|
def test_GzipDecodedReader_no_gzip(monkeypatch, mocker):
|
||
|
monkeypatch.delitem(sys.modules, 'gzip')
|
||
|
monkeypatch.delitem(sys.modules, 'ansible.module_utils.urls')
|
||
|
|
||
|
orig_import = __import__
|
||
|
|
||
|
def _import(*args):
|
||
|
if args[0] == 'gzip':
|
||
|
raise ImportError
|
||
|
return orig_import(*args)
|
||
|
|
||
|
if PY3:
|
||
|
mocker.patch('builtins.__import__', _import)
|
||
|
else:
|
||
|
mocker.patch('__builtin__.__import__', _import)
|
||
|
|
||
|
mod = __import__('ansible.module_utils.urls').module_utils.urls
|
||
|
assert mod.HAS_GZIP is False
|
||
|
pytest.raises(mod.MissingModuleError, mod.GzipDecodedReader, None)
|