|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
# Helpers to resolve $ref recursively in OpenAPI and JSON schemas.
|
|
|
|
|
|
|
|
# Copyright 2016 OpenMarket Ltd
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
|
|
import json
|
|
|
|
import os
|
|
|
|
import os.path
|
|
|
|
import referencing
|
|
|
|
import urllib.parse
|
|
|
|
import yaml
|
|
|
|
|
|
|
|
def resolve_references(path, schema):
|
|
|
|
"""Recurse through a given schema until we find a $ref key. Upon doing so,
|
|
|
|
check that the referenced file exists, then load it up and check all of the
|
|
|
|
references in that file. Continue on until we've hit all dead ends.
|
|
|
|
|
|
|
|
$ref values are deleted from schemas as they are validated, to prevent
|
|
|
|
duplicate work.
|
|
|
|
"""
|
|
|
|
if isinstance(schema, dict):
|
|
|
|
# do $ref first
|
|
|
|
if '$ref' in schema:
|
|
|
|
# Pull the referenced URI from the schema
|
|
|
|
ref_uri = schema['$ref']
|
|
|
|
|
|
|
|
# Join the referenced URI with the URI of the file, to resolve
|
|
|
|
# relative URIs
|
|
|
|
full_ref_uri = urllib.parse.urljoin("file://" + path, ref_uri)
|
|
|
|
|
|
|
|
# Separate the fragment.
|
|
|
|
(full_ref_uri, fragment) = urllib.parse.urldefrag(full_ref_uri)
|
|
|
|
|
|
|
|
# Load the referenced file
|
|
|
|
ref = load_file_from_uri(full_ref_uri)
|
|
|
|
|
|
|
|
if fragment:
|
|
|
|
# The fragment should be a JSON Pointer
|
|
|
|
keys = fragment.strip('/').split('/')
|
|
|
|
for key in keys:
|
|
|
|
ref = ref[key]
|
|
|
|
|
|
|
|
# Check that the references in *this* file are valid
|
|
|
|
result = resolve_references(urllib.parse.urlsplit(full_ref_uri).path, ref)
|
|
|
|
|
|
|
|
# They were valid, and so were the sub-references. Delete
|
|
|
|
# the reference here to ensure we don't pass over it again
|
|
|
|
# when checking other files
|
|
|
|
del schema['$ref']
|
|
|
|
else:
|
|
|
|
result = {}
|
|
|
|
|
|
|
|
for key, value in schema.items():
|
|
|
|
result[key] = resolve_references(path, value)
|
|
|
|
return result
|
|
|
|
elif isinstance(schema, list):
|
|
|
|
return [resolve_references(path, value) for value in schema]
|
|
|
|
else:
|
|
|
|
return schema
|
|
|
|
|
|
|
|
|
|
|
|
def load_file_from_uri(path):
|
|
|
|
"""Load a JSON or YAML file from a file:// URI.
|
|
|
|
"""
|
|
|
|
print("Loading reference: %s" % path)
|
|
|
|
if not path.startswith("file://"):
|
|
|
|
raise Exception("Bad ref: %s" % (path,))
|
|
|
|
path = path[len("file://"):]
|
|
|
|
with open(path, "r") as f:
|
|
|
|
if path.endswith(".json"):
|
|
|
|
return json.load(f)
|
|
|
|
else:
|
|
|
|
# We have to assume it's YAML because some of the YAML examples
|
|
|
|
# do not have file extensions.
|
|
|
|
return yaml.safe_load(f)
|
|
|
|
|
|
|
|
def load_resource_from_uri(path):
|
|
|
|
"""Load a JSON or YAML JSON Schema, as a `referencing.Resource` object, from
|
|
|
|
a file:// URI.
|
|
|
|
"""
|
|
|
|
contents = load_file_from_uri(path)
|
|
|
|
resource = referencing.Resource(
|
|
|
|
contents=contents,
|
|
|
|
specification=referencing.jsonschema.DRAFT202012
|
|
|
|
)
|
|
|
|
return resource
|