You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/lib/ansible/galaxy/token.py

187 lines
6.0 KiB
Python

########################################################################
#
# (C) 2015, Chris Houseknecht <chouse@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
from __future__ import annotations
import base64
import os
import json
from stat import S_IRUSR, S_IWUSR
from ansible import constants as C
from ansible.galaxy.user_agent import user_agent
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.yaml import yaml_dump, yaml_load
from ansible.module_utils.urls import open_url
from ansible.utils.display import Display
display = Display()
class NoTokenSentinel(object):
""" Represents an ansible.cfg server with not token defined (will ignore cmdline and GALAXY_TOKEN_PATH. """
def __new__(cls, *args, **kwargs):
return cls
class KeycloakToken(object):
'''A token granted by a Keycloak server.
Like sso.redhat.com as used by cloud.redhat.com
ie Automation Hub'''
token_type = 'Bearer'
def __init__(self, access_token=None, auth_url=None, validate_certs=True, client_id=None):
self.access_token = access_token
self.auth_url = auth_url
self._token = None
self.validate_certs = validate_certs
self.client_id = client_id
if self.client_id is None:
self.client_id = 'cloud-services'
def _form_payload(self):
return 'grant_type=refresh_token&client_id=%s&refresh_token=%s' % (self.client_id,
self.access_token)
def get(self):
if self._token:
return self._token
# - build a request to POST to auth_url
# - body is form encoded
# - 'refresh_token' is the offline token stored in ansible.cfg
# - 'grant_type' is 'refresh_token'
# - 'client_id' is 'cloud-services'
# - should probably be based on the contents of the
# offline_ticket's JWT payload 'aud' (audience)
# or 'azp' (Authorized party - the party to which the ID Token was issued)
payload = self._form_payload()
resp = open_url(to_native(self.auth_url),
data=payload,
validate_certs=self.validate_certs,
method='POST',
http_agent=user_agent())
# TODO: handle auth errors
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
# - extract 'access_token'
self._token = data.get('access_token')
return self._token
def headers(self):
headers = {}
headers['Authorization'] = '%s %s' % (self.token_type, self.get())
return headers
class GalaxyToken(object):
''' Class to storing and retrieving local galaxy token '''
token_type = 'Token'
def __init__(self, token=None):
self.b_file = to_bytes(C.GALAXY_TOKEN_PATH, errors='surrogate_or_strict')
# Done so the config file is only opened when set/get/save is called
self._config = None
self._token = token
@property
def config(self):
if self._config is None:
self._config = self._read()
# Prioritise the token passed into the constructor
if self._token:
self._config['token'] = None if self._token is NoTokenSentinel else self._token
return self._config
def _read(self):
action = 'Opened'
if not os.path.isfile(self.b_file):
# token file not found, create and chmod u+rw
open(self.b_file, 'w').close()
os.chmod(self.b_file, S_IRUSR | S_IWUSR) # owner has +rw
action = 'Created'
with open(self.b_file, 'r') as f:
config = yaml_load(f)
display.vvv('%s %s' % (action, to_text(self.b_file)))
if config and not isinstance(config, dict):
display.vvv('Galaxy token file %s malformed, unable to read it' % to_text(self.b_file))
return {}
return config or {}
def set(self, token):
self._token = token
self.save()
def get(self):
return self.config.get('token', None)
def save(self):
with open(self.b_file, 'w') as f:
yaml_dump(self.config, f, default_flow_style=False)
def headers(self):
headers = {}
token = self.get()
if token:
headers['Authorization'] = '%s %s' % (self.token_type, self.get())
return headers
class BasicAuthToken(object):
token_type = 'Basic'
def __init__(self, username, password=None):
self.username = username
self.password = password
self._token = None
@staticmethod
def _encode_token(username, password):
token = "%s:%s" % (to_text(username, errors='surrogate_or_strict'),
to_text(password, errors='surrogate_or_strict', nonstring='passthru') or '')
b64_val = base64.b64encode(to_bytes(token, encoding='utf-8', errors='surrogate_or_strict'))
return to_text(b64_val)
def get(self):
if self._token:
return self._token
self._token = self._encode_token(self.username, self.password)
return self._token
def headers(self):
headers = {}
headers['Authorization'] = '%s %s' % (self.token_type, self.get())
return headers