"""Contains all the units for the spec.""" from batesian.units import Units import inspect import json import os import re import subprocess import urllib import yaml def get_json_schema_object_fields(obj, enforce_title=False): # Algorithm: # f.e. property => add field info (if field is object then recurse) if obj.get("type") != "object": raise Exception( "get_json_schema_object_fields: Object %s isn't an object." % obj ) if enforce_title and not obj.get("title"): raise Exception( "get_json_schema_object_fields: Nested object %s doesn't have a title." % obj ) required_keys = obj.get("required") if not required_keys: required_keys = [] fields = { "title": obj.get("title"), "rows": [] } tables = [fields] props = obj.get("properties", obj.get("patternProperties")) parents = obj.get("allOf") if not props and not parents: raise Exception( "Object %s has no properties or parents." % obj ) if not props: # parents only return [{ "title": obj["title"], "parent": parents[0]["$ref"], "no-table": True }] for key_name in sorted(props): value_type = None required = key_name in required_keys desc = props[key_name].get("description", "") if props[key_name]["type"] == "object": if props[key_name].get("additionalProperties"): # not "really" an object, just a KV store value_type = ( "{string: %s}" % props[key_name]["additionalProperties"]["type"] ) else: nested_object = get_json_schema_object_fields( props[key_name], enforce_title=True ) value_type = "{%s}" % nested_object[0]["title"] if not nested_object[0].get("no-table"): tables += nested_object elif props[key_name]["type"] == "array": # if the items of the array are objects then recurse if props[key_name]["items"]["type"] == "object": nested_object = get_json_schema_object_fields( props[key_name]["items"], enforce_title=True ) value_type = "[%s]" % nested_object[0]["title"] tables += nested_object else: value_type = "[%s]" % props[key_name]["items"]["type"] else: value_type = props[key_name]["type"] if props[key_name].get("enum"): if len(props[key_name].get("enum")) > 1: value_type = "enum" desc += ( " One of: %s" % json.dumps(props[key_name]["enum"]) ) else: desc += ( " Must be '%s'." % props[key_name]["enum"][0] ) fields["rows"].append({ "key": key_name, "type": value_type, "required": required, "desc": desc, "req_str": "**Required.** " if required else "" }) return tables class MatrixUnits(Units): def _load_swagger_meta(self, api, group_name): endpoints = [] for path in api["paths"]: for method in api["paths"][path]: single_api = api["paths"][path][method] full_path = api.get("basePath", "") + path endpoint = { "title": single_api.get("summary", ""), "desc": single_api.get("description", single_api.get("summary", "")), "method": method.upper(), "path": full_path, "requires_auth": "security" in single_api, "rate_limited": 429 in single_api.get("responses", {}), "req_params": [], "res_tables": [], "example": { "req": "", "responses": [], "good_response": "" } } self.log(".o.O.o. Endpoint: %s %s" % (method, path)) for param in single_api.get("parameters", []): # description desc = param.get("description", "") if param.get("required"): desc = "**Required.** " + desc # assign value expected for this param val_type = param.get("type") # integer/string refType = Units.prop(param, "schema/$ref/") # Error,Event schemaFmt = Units.prop(param, "schema/format") # bytes e.g. uploads if not val_type and refType: val_type = refType # TODO: Resolve to human-readable. if not val_type and schemaFmt: val_type = schemaFmt if val_type: endpoint["req_params"].append({ "key": param["name"], "loc": param["in"], "type": val_type, "desc": desc }) continue # If we're here, either the param has no value or it is an # object which we haven't $reffed (so probably just a json # object with some keys; we'll add entries f.e one) if "schema" not in param: raise Exception( "API endpoint group=%s path=%s method=%s param=%s"+ " has no valid parameter value." % ( group_name, path, method, param ) ) if Units.prop(param, "schema/type") != "object": raise Exception( ("API endpoint group=%s path=%s method=%s defines a"+ " param with a schema which isn't an object. Array?") % (group_name, path, method) ) # loop top-level json keys json_body = Units.prop(param, "schema/properties") for key in json_body: endpoint["req_params"].append({ "key": key, "loc": "JSON body", "type": json_body[key]["type"], "desc": json_body[key]["description"] }) # endfor[param] # group params by location to ease templating endpoint["req_param_by_loc"] = { # path: [...], query: [...], body: [...] } for p in endpoint["req_params"]: if p["loc"] not in endpoint["req_param_by_loc"]: endpoint["req_param_by_loc"][p["loc"]] = [] endpoint["req_param_by_loc"][p["loc"]].append(p) good_response = None for code, res in single_api.get("responses", {}).items(): if not good_response and code == 200: good_response = res description = res.get("description", "") example = res.get("examples", {}).get("application/json", "") if description and example: endpoint["example"]["responses"].append({ "code": code, "description": description, "example": example, }) # form example request if it has one. It "has one" if all params # have either "x-example" or a "schema" with an "example". params_missing_examples = [ p for p in single_api.get("parameters", []) if ( "x-example" not in p and not Units.prop(p, "schema/example") ) ] if len(params_missing_examples) == 0: path_template = api.get("basePath", "") + path qps = {} body = "" for param in single_api.get("parameters", []): if param["in"] == "path": path_template = path_template.replace( "{%s}" % param["name"], urllib.quote( param["x-example"] ) ) elif param["in"] == "body": body = param["schema"]["example"] elif param["in"] == "query": qps[param["name"]] = param["x-example"] query_string = "" if len(qps) == 0 else "?"+urllib.urlencode(qps) endpoint["example"]["req"] = "%s %s%s\n%s" % ( method.upper(), path_template, query_string, body ) else: self.log( "The following parameters are missing examples :( \n %s" % [ p["name"] for p in params_missing_examples ] ) # add response params if this API has any. if good_response: res_type = Units.prop(good_response, "schema/type") if res_type and res_type not in ["object", "array"]: # response is a raw string or something like that endpoint["res_tables"].append({ "title": None, "rows": [{ "key": good_response["schema"].get("name", ""), "type": res_type, "desc": res.get("description", "") }] }) elif res_type and Units.prop(good_response, "schema/properties"): # response is an object: schema = good_response["schema"] res_tables = get_json_schema_object_fields(schema) for table in res_tables: if "no-table" not in table: endpoint["res_tables"].append(table) endpoints.append(endpoint) aliases = single_api.get("x-alias", None) if aliases: alias_link = aliases["canonical-link"] for alias in aliases["aliases"]: endpoints.append({ "method": method.upper(), "path": alias, "alias_for_path": full_path, "alias_link": alias_link }) return { "base": api.get("basePath"), "group": group_name, "endpoints": endpoints, } def load_swagger_apis(self): path = "../api/client-server/v1" apis = {} for filename in os.listdir(path): if not filename.endswith(".yaml"): continue self.log("Reading swagger API: %s" % filename) with open(os.path.join(path, filename), "r") as f: # strip .yaml group_name = filename[:-5] api = yaml.load(f.read()) api["__meta"] = self._load_swagger_meta(api, group_name) apis[group_name] = api return apis def load_common_event_fields(self): path = "../event-schemas/schema/v1/core-event-schema" event_types = {} for (root, dirs, files) in os.walk(path): for filename in files: if not filename.endswith(".json"): continue event_type = filename[:-5] # strip the ".json" filepath = os.path.join(root, filename) with open(filepath) as f: try: event_info = json.load(f) except Exception as e: raise ValueError( "Error reading file %r" % (filepath,), e ) if "event" not in event_type: continue # filter ImageInfo and co table = { "title": event_info["title"], "desc": event_info["description"], "rows": [] } for prop in sorted(event_info["properties"]): row = { "key": prop, "type": event_info["properties"][prop]["type"], "desc": event_info["properties"][prop].get("description","") } table["rows"].append(row) event_types[event_type] = table return event_types def load_event_examples(self): path = "../event-schemas/examples/v1" examples = {} for filename in os.listdir(path): if not filename.startswith("m."): continue with open(os.path.join(path, filename), "r") as f: examples[filename] = json.loads(f.read()) if filename == "m.room.message#m.text": examples["m.room.message"] = examples[filename] return examples def load_event_schemas(self): path = "../event-schemas/schema/v1" schemata = {} for filename in os.listdir(path): if not filename.startswith("m."): continue self.log("Reading %s" % os.path.join(path, filename)) with open(os.path.join(path, filename), "r") as f: json_schema = json.loads(f.read()) schema = { "typeof": None, "typeof_info": "", "type": None, "title": None, "desc": None, "msgtype": None, "content_fields": [ # { # title: "