mirror of https://github.com/ansible/ansible.git
Add Shippable request signing to ansible-test. (#69526)
parent
6fffb0607b
commit
e7c2eb519b
@ -0,0 +1,2 @@
|
||||
minor_changes:
|
||||
- ansible-test - Added support for Ansible Core CI request signing for Shippable.
|
@ -0,0 +1,31 @@
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
from .util import common_auth_test
|
||||
|
||||
|
||||
def test_auth():
|
||||
# noinspection PyProtectedMember
|
||||
from ansible_test._internal.ci.shippable import (
|
||||
ShippableAuthHelper,
|
||||
)
|
||||
|
||||
class TestShippableAuthHelper(ShippableAuthHelper):
|
||||
def __init__(self):
|
||||
self.public_key_pem = None
|
||||
self.private_key_pem = None
|
||||
|
||||
def publish_public_key(self, public_key_pem):
|
||||
# avoid publishing key
|
||||
self.public_key_pem = public_key_pem
|
||||
|
||||
def initialize_private_key(self):
|
||||
# cache in memory instead of on disk
|
||||
if not self.private_key_pem:
|
||||
self.private_key_pem = self.generate_private_key()
|
||||
|
||||
return self.private_key_pem
|
||||
|
||||
auth = TestShippableAuthHelper()
|
||||
|
||||
common_auth_test(auth)
|
@ -0,0 +1,53 @@
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
import base64
|
||||
import json
|
||||
import re
|
||||
|
||||
|
||||
def common_auth_test(auth):
|
||||
private_key_pem = auth.initialize_private_key()
|
||||
public_key_pem = auth.public_key_pem
|
||||
|
||||
extract_pem_key(private_key_pem, private=True)
|
||||
extract_pem_key(public_key_pem, private=False)
|
||||
|
||||
request = dict(hello='World')
|
||||
auth.sign_request(request)
|
||||
|
||||
verify_signature(request, public_key_pem)
|
||||
|
||||
|
||||
def extract_pem_key(value, private):
|
||||
assert isinstance(value, type(u''))
|
||||
|
||||
key_type = '(EC )?PRIVATE' if private else 'PUBLIC'
|
||||
pattern = r'^-----BEGIN ' + key_type + r' KEY-----\n(?P<key>.*?)\n-----END ' + key_type + r' KEY-----\n$'
|
||||
match = re.search(pattern, value, flags=re.DOTALL)
|
||||
|
||||
assert match, 'key "%s" does not match pattern "%s"' % (value, pattern)
|
||||
|
||||
base64.b64decode(match.group('key')) # make sure the key can be decoded
|
||||
|
||||
|
||||
def verify_signature(request, public_key_pem):
|
||||
signature = request.pop('signature')
|
||||
payload_bytes = json.dumps(request, sort_keys=True).encode()
|
||||
|
||||
assert isinstance(signature, type(u''))
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
||||
|
||||
public_key = load_pem_public_key(public_key_pem.encode(), default_backend())
|
||||
|
||||
verifier = public_key.verifier(
|
||||
base64.b64decode(signature.encode()),
|
||||
ec.ECDSA(hashes.SHA256()),
|
||||
)
|
||||
|
||||
verifier.update(payload_bytes)
|
||||
verifier.verify()
|
@ -0,0 +1,14 @@
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
import os
|
||||
import pytest
|
||||
import sys
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope='session')
|
||||
def ansible_test():
|
||||
"""Make ansible_test available on sys.path for unit testing ansible-test."""
|
||||
test_lib = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'lib')
|
||||
sys.path.insert(0, test_lib)
|
Loading…
Reference in New Issue