Handle authentication errors and token expiration (#83695)

Fixes #70019
pull/82246/merge
Matt Martz 3 months ago committed by GitHub
parent 5ab5f23487
commit 9b0d2decb2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,2 @@
minor_changes:
- ansible-galaxy - Handle authentication errors and token expiration

@ -21,11 +21,14 @@
from __future__ import annotations from __future__ import annotations
import base64 import base64
import os
import json import json
import os
import time
from stat import S_IRUSR, S_IWUSR from stat import S_IRUSR, S_IWUSR
from urllib.error import HTTPError
from ansible import constants as C from ansible import constants as C
from ansible.galaxy.api import GalaxyError
from ansible.galaxy.user_agent import user_agent from ansible.galaxy.user_agent import user_agent
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.yaml import yaml_dump, yaml_load from ansible.module_utils.common.yaml import yaml_dump, yaml_load
@ -57,12 +60,16 @@ class KeycloakToken(object):
self.client_id = client_id self.client_id = client_id
if self.client_id is None: if self.client_id is None:
self.client_id = 'cloud-services' self.client_id = 'cloud-services'
self._expiration = None
def _form_payload(self): def _form_payload(self):
return 'grant_type=refresh_token&client_id=%s&refresh_token=%s' % (self.client_id, return 'grant_type=refresh_token&client_id=%s&refresh_token=%s' % (self.client_id,
self.access_token) self.access_token)
def get(self): def get(self):
if self._expiration and time.time() >= self._expiration:
self._token = None
if self._token: if self._token:
return self._token return self._token
@ -76,15 +83,20 @@ class KeycloakToken(object):
# or 'azp' (Authorized party - the party to which the ID Token was issued) # or 'azp' (Authorized party - the party to which the ID Token was issued)
payload = self._form_payload() payload = self._form_payload()
resp = open_url(to_native(self.auth_url), try:
data=payload, resp = open_url(to_native(self.auth_url),
validate_certs=self.validate_certs, data=payload,
method='POST', validate_certs=self.validate_certs,
http_agent=user_agent()) method='POST',
http_agent=user_agent())
except HTTPError as e:
raise GalaxyError(e, 'Unable to get access token')
# TODO: handle auth errors data = json.load(resp)
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict')) # So that we have a buffer, expire the token in ~2/3 the given value
expires_in = data['expires_in'] // 3 * 2
self._expiration = time.time() + expires_in
# - extract 'access_token' # - extract 'access_token'
self._token = data.get('access_token') self._token = data.get('access_token')

Loading…
Cancel
Save