Add support for URI fragments with JSON Pointer in CI scripts

And use a single implementation of resolve_references for all scripts.

Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
pull/1751/head
Kévin Commaille 2 months ago
parent fd8d5f325c
commit 7413a9ee3b
No known key found for this signature in database
GPG Key ID: 29A48C1F03620416

@ -18,6 +18,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import helpers
import sys
import json
import os
@ -48,51 +49,16 @@ except ImportError as e:
raise
def load_file(path):
print("Loading reference: %s" % path)
if not path.startswith("file://"):
raise Exception("Bad ref: %s" % (path,))
path = path[len("file://"):]
with open(path, "r") as f:
if path.endswith(".json"):
return json.load(f)
else:
# We have to assume it's YAML because some of the YAML examples
# do not have file extensions.
return yaml.safe_load(f)
def resolve_references(path, schema):
if isinstance(schema, dict):
# do $ref first
if '$ref' in schema:
value = schema['$ref']
path = os.path.abspath(os.path.join(os.path.dirname(path), value))
ref = load_file("file://" + path)
result = resolve_references(path, ref)
del schema['$ref']
else:
result = {}
for key, value in schema.items():
result[key] = resolve_references(path, value)
return result
elif isinstance(schema, list):
return [resolve_references(path, value) for value in schema]
else:
return schema
def check_example_file(examplepath, schemapath):
with open(examplepath) as f:
example = resolve_references(examplepath, json.load(f))
example = helpers.resolve_references(examplepath, json.load(f))
with open(schemapath) as f:
schema = yaml.safe_load(f)
fileurl = "file://" + os.path.abspath(schemapath)
schema["id"] = fileurl
resolver = jsonschema.RefResolver(fileurl, schema, handlers={"file": load_file})
resolver = jsonschema.RefResolver(fileurl, schema, handlers={"file": helpers.load_file_from_uri})
print ("Checking schema for: %r %r" % (examplepath, schemapath))
try:

@ -18,6 +18,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import helpers
import sys
import json
import os
@ -67,23 +68,11 @@ class SchemaDirReport:
def add(self, other_report):
self.files += other_report.files
self.errors += other_report.errors
def load_file(path):
if not path.startswith("file://"):
raise Exception(f"Bad ref: {path}")
path = path[len("file://"):]
with open(path, "r") as f:
if path.endswith(".json"):
return json.load(f)
else:
# We have to assume it's YAML because some of the YAML examples
# do not have file extensions.
return yaml.safe_load(f)
def check_example(path, schema, example):
# URI with scheme is necessary to make RefResolver work.
fileurl = "file://" + os.path.abspath(path)
resolver = jsonschema.RefResolver(fileurl, schema, handlers={"file": load_file})
resolver = jsonschema.RefResolver(fileurl, schema, handlers={"file": helpers.load_file_from_uri})
validator = jsonschema.Draft202012Validator(schema, resolver)
validator.validate(example)

@ -19,6 +19,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import helpers
import sys
import json
import os
@ -49,9 +50,7 @@ except ImportError as e:
def check_schema(filepath, example, schema):
example = resolve_references(filepath, example)
schema = resolve_references(filepath, schema)
resolver = jsonschema.RefResolver(filepath, schema, handlers={"file": load_file})
resolver = jsonschema.RefResolver(filepath, schema, handlers={"file": helpers.load_file_from_uri})
validator = jsonschema.Draft202012Validator(schema, resolver)
validator.validate(example)
@ -120,6 +119,8 @@ def check_openapi_file(filepath):
with open(filepath) as f:
openapi = yaml.safe_load(f)
openapi = helpers.resolve_references(filepath, openapi)
openapi_version = openapi.get('openapi')
if not openapi_version:
# This is not an OpenAPI file, skip.
@ -149,64 +150,6 @@ def check_openapi_file(filepath):
check_response(filepath, request, code, json_response)
def resolve_references(path, schema):
"""Recurse through a given schema until we find a $ref key. Upon doing so,
check that the referenced file exists, then load it up and check all of the
references in that file. Continue on until we've hit all dead ends.
$ref values are deleted from schemas as they are validated, to prevent
duplicate work.
"""
if isinstance(schema, dict):
# do $ref first
if '$ref' in schema:
# Pull the referenced filepath from the schema
referenced_file = schema['$ref']
# Referenced filepaths are relative, so take the current path's
# directory and append the relative, referenced path to it.
inner_path = os.path.join(os.path.dirname(path), referenced_file)
# Then convert the path (which may contiain '../') into a
# normalised, absolute path
inner_path = os.path.abspath(inner_path)
# Load the referenced file
ref = load_file("file://" + inner_path)
# Check that the references in *this* file are valid
result = resolve_references(inner_path, ref)
# They were valid, and so were the sub-references. Delete
# the reference here to ensure we don't pass over it again
# when checking other files
del schema['$ref']
else:
result = {}
for key, value in schema.items():
result[key] = resolve_references(path, value)
return result
elif isinstance(schema, list):
return [resolve_references(path, value) for value in schema]
else:
return schema
def load_file(path):
print("Loading reference: %s" % path)
if not path.startswith("file://"):
raise Exception("Bad ref: %s" % (path,))
path = path[len("file://"):]
with open(path, "r") as f:
if path.endswith(".json"):
return json.load(f)
else:
# We have to assume it's YAML because some of the YAML examples
# do not have file extensions.
return yaml.safe_load(f)
if __name__ == '__main__':
# Get the directory that this script is residing in
script_directory = os.path.dirname(os.path.realpath(__file__))

@ -20,6 +20,7 @@
import argparse
import errno
import helpers
import json
import logging
import os.path
@ -31,34 +32,6 @@ import yaml
scripts_dir = os.path.dirname(os.path.abspath(__file__))
api_dir = os.path.join(os.path.dirname(scripts_dir), "data", "api")
def resolve_references(path, schema):
if isinstance(schema, dict):
# do $ref first
if '$ref' in schema:
value = schema['$ref']
previous_path = path
path = os.path.join(os.path.dirname(path), value)
try:
with open(path, encoding="utf-8") as f:
ref = yaml.safe_load(f)
result = resolve_references(path, ref)
del schema['$ref']
path = previous_path
except FileNotFoundError:
print("Resolving {}".format(schema))
print("File not found: {}".format(path))
result = {}
else:
result = {}
for key, value in schema.items():
result[key] = resolve_references(path, value)
return result
elif isinstance(schema, list):
return [resolve_references(path, value) for value in schema]
else:
return schema
def prefix_absolute_path_references(text, base_url):
"""Adds base_url to absolute-path references.
@ -176,7 +149,7 @@ for filename in os.listdir(selected_api_dir):
print("Reading OpenAPI: %s" % filepath)
with open(filepath, "r") as f:
api = yaml.safe_load(f.read())
api = resolve_references(filepath, api)
api = helpers.resolve_references(filepath, api)
basePath = api['servers'][0]['variables']['basePath']['default']
for path, methods in api["paths"].items():

@ -0,0 +1,87 @@
#!/usr/bin/env python3
# Helpers to resolve $ref recursively in OpenAPI and JSON schemas.
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import os.path
import urllib.parse
import yaml
def resolve_references(path, schema):
"""Recurse through a given schema until we find a $ref key. Upon doing so,
check that the referenced file exists, then load it up and check all of the
references in that file. Continue on until we've hit all dead ends.
$ref values are deleted from schemas as they are validated, to prevent
duplicate work.
"""
if isinstance(schema, dict):
# do $ref first
if '$ref' in schema:
# Pull the referenced URI from the schema
ref_uri = schema['$ref']
# Join the referenced URI with the URI of the file, to resolve
# relative URIs
full_ref_uri = urllib.parse.urljoin("file://" + path, ref_uri)
# Separate the fragment.
(full_ref_uri, fragment) = urllib.parse.urldefrag(full_ref_uri)
# Load the referenced file
ref = load_file_from_uri(full_ref_uri)
if fragment:
# The fragment should be a JSON Pointer
keys = fragment.strip('/').split('/')
for key in keys:
ref = ref[key]
# Check that the references in *this* file are valid
result = resolve_references(urllib.parse.urlsplit(full_ref_uri).path, ref)
# They were valid, and so were the sub-references. Delete
# the reference here to ensure we don't pass over it again
# when checking other files
del schema['$ref']
else:
result = {}
for key, value in schema.items():
result[key] = resolve_references(path, value)
return result
elif isinstance(schema, list):
return [resolve_references(path, value) for value in schema]
else:
return schema
def load_file_from_uri(path):
"""Load a JSON or YAML file from a file:// URI.
"""
print("Loading reference: %s" % path)
if not path.startswith("file://"):
raise Exception("Bad ref: %s" % (path,))
path = path[len("file://"):]
with open(path, "r") as f:
if path.endswith(".json"):
return json.load(f)
else:
# We have to assume it's YAML because some of the YAML examples
# do not have file extensions.
return yaml.safe_load(f)
Loading…
Cancel
Save