Matt Martz 2 weeks ago committed by GitHub
commit 7e44ecf42a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,2 @@
minor_changes:
- ansible-galaxy - Add support for Keycloak service accounts

@ -76,6 +76,7 @@ SERVER_DEF = [
('api_version', False, 'int'),
('validate_certs', False, 'bool'),
('client_id', False, 'str'),
('client_secret', False, 'str'),
('timeout', False, 'int'),
]
@ -662,6 +663,7 @@ class GalaxyCLI(CLI):
# it doesn't need to be passed as kwarg to GalaxyApi, same for others we pop here
auth_url = server_options.pop('auth_url')
client_id = server_options.pop('client_id')
client_secret = server_options.pop('client_secret')
token_val = server_options['token'] or NoTokenSentinel
username = server_options['username']
api_version = server_options.pop('api_version')
@ -687,15 +689,17 @@ class GalaxyCLI(CLI):
if username:
server_options['token'] = BasicAuthToken(username, server_options['password'])
else:
if token_val:
if auth_url:
server_options['token'] = KeycloakToken(access_token=token_val,
auth_url=auth_url,
validate_certs=validate_certs,
client_id=client_id)
else:
# The galaxy v1 / github / django / 'Token'
server_options['token'] = GalaxyToken(token=token_val)
if auth_url:
server_options['token'] = KeycloakToken(
access_token=token_val,
auth_url=auth_url,
validate_certs=validate_certs,
client_id=client_id,
client_secret=client_secret,
)
elif token_val:
# The galaxy v1 / github / django / 'Token'
server_options['token'] = GalaxyToken(token=token_val)
server_options.update(galaxy_options)
config_servers.append(GalaxyAPI(

@ -21,11 +21,15 @@
from __future__ import annotations
import base64
import os
import json
import os
import time
from stat import S_IRUSR, S_IWUSR
from urllib.error import HTTPError
from urllib.parse import urlencode
from ansible import constants as C
from ansible.galaxy.api import GalaxyError
from ansible.galaxy.user_agent import user_agent
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.yaml import yaml_dump, yaml_load
@ -49,7 +53,7 @@ class KeycloakToken(object):
token_type = 'Bearer'
def __init__(self, access_token=None, auth_url=None, validate_certs=True, client_id=None):
def __init__(self, access_token=None, auth_url=None, validate_certs=True, client_id=None, client_secret=None):
self.access_token = access_token
self.auth_url = auth_url
self._token = None
@ -57,37 +61,49 @@ class KeycloakToken(object):
self.client_id = client_id
if self.client_id is None:
self.client_id = 'cloud-services'
self.client_secret = client_secret
self._expiration = None
def _form_payload(self):
return 'grant_type=refresh_token&client_id=%s&refresh_token=%s' % (self.client_id,
self.access_token)
payload = {
'client_id': self.client_id,
}
if self.client_secret:
payload['client_secret'] = self.client_secret
payload['scope'] = 'openid api.iam.service_accounts'
payload['grant_type'] = 'client_credentials'
else:
payload['refresh_token'] = self.access_token
payload['grant_type'] = 'refresh_token'
return urlencode(payload)
def get(self):
if self._expiration and time.time() >= self._expiration:
self._token = None
if self._token:
return self._token
# - build a request to POST to auth_url
# - body is form encoded
# - 'refresh_token' is the offline token stored in ansible.cfg
# - 'grant_type' is 'refresh_token'
# - 'client_id' is 'cloud-services'
# - should probably be based on the contents of the
# offline_ticket's JWT payload 'aud' (audience)
# or 'azp' (Authorized party - the party to which the ID Token was issued)
payload = self._form_payload()
resp = open_url(to_native(self.auth_url),
data=payload,
validate_certs=self.validate_certs,
method='POST',
http_agent=user_agent())
try:
resp = open_url(to_native(self.auth_url),
data=payload,
validate_certs=self.validate_certs,
method='POST',
http_agent=user_agent())
except HTTPError as e:
raise GalaxyError(e, 'Unable to get access token')
# TODO: handle auth errors
data = json.load(resp)
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
# So that we have a buffer, expire the token in ~2/3 the given value
expires_in = data['expires_in'] // 3 * 2
self._expiration = time.time() + expires_in
# - extract 'access_token'
self._token = data.get('access_token')
if token_type := data.get('token_type'):
self.token_type = token_type
return self._token

Loading…
Cancel
Save