OpenSlides/docs/datavalidator/check_json.py

669 lines
23 KiB
Python
Raw Normal View History

import json
import re
import sys
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
import fastjsonschema
import yaml
MODELS_YML_PATH = "../../docs/models.yml"
DEFAULT_FILES = [
"../../docker/initial-data.json",
"../../docs/example-data.json",
]
SCHEMA = fastjsonschema.compile(
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Schema for initial and example data.",
"type": "object",
"patternProperties": {
"^[a-z_]+$": {
"type": "array",
"items": {
"type": "object",
"properties": {"id": {"type": "number"}},
"required": ["id"],
},
}
},
"additionalProperties": False,
}
)
class CheckException(Exception):
pass
def check_string(value: Any) -> bool:
return value is None or isinstance(value, str)
2021-02-16 10:18:09 +01:00
color_regex = re.compile("^#[0-9a-f]{6}$")
2021-02-16 10:18:09 +01:00
def check_color(value: Any) -> bool:
return value is None or bool(isinstance(value, str) and color_regex.match(value))
2021-02-16 10:18:09 +01:00
def check_number(value: Any) -> bool:
return value is None or type(value) == int
2021-02-25 10:58:51 +01:00
def check_float(value: Any) -> bool:
return value is None or type(value) in (int, float)
2021-02-25 10:58:51 +01:00
def check_boolean(value: Any) -> bool:
return value is None or value is False or value is True
def check_string_list(value: Any) -> bool:
return check_x_list(value, check_string)
def check_number_list(value: Any) -> bool:
return check_x_list(value, check_number)
def check_x_list(value: Any, fn: Callable) -> bool:
if value is None:
return True
if not isinstance(value, list):
return False
return all([fn(sv) for sv in value])
def check_decimal(value: Any) -> bool:
if value is None:
return True
if isinstance(value, str):
pattern = r"^-?(\d|[1-9]\d+)\.\d{6}$"
if re.match(pattern, value):
return True
return False
def check_json(value: Any, root: bool = True) -> bool:
if value is None:
return True
if not root and (isinstance(value, int) or isinstance(value, str)):
return True
if isinstance(value, list):
return all(check_json(x, root=False) for x in value)
if isinstance(value, dict):
return all(check_json(x, root=False) for x in value.values())
return False
checker_mapping = {
"string": check_string,
"HTMLStrict": check_string,
"HTMLPermissive": check_string,
"generic-relation": check_string,
"number": check_number,
"timestamp": check_number,
"relation": check_number,
"float": check_float,
"boolean": check_boolean,
"string[]": check_string_list,
"generic-relation-list": check_string_list,
"number[]": check_number_list,
"relation-list": check_number_list,
"decimal(6)": check_decimal,
"color": check_color,
"JSON": check_json,
}
class Checker:
def __init__(self, data: Dict[str, List[Any]], is_import: bool = False) -> None:
self.data = data
with open(MODELS_YML_PATH, "rb") as x:
models_yml = x.read()
models_yml = models_yml.replace(" yes:".encode(), ' "yes":'.encode())
models_yml = models_yml.replace(" no:".encode(), ' "no":'.encode())
self.models = yaml.safe_load(models_yml)
if is_import:
self.modify_models_for_import()
self.errors: List[str] = []
self.template_prefixes: Dict[
str, Dict[str, Tuple[str, int, int]]
] = defaultdict(dict)
self.generate_template_prefixes()
def modify_models_for_import(self) -> None:
collection_allowlist = (
"user",
"meeting",
"group",
"personal_note",
"tag",
"agenda_item",
"list_of_speakers",
"speaker",
"topic",
"motion",
"motion_submitter",
"motion_comment",
"motion_comment_section",
"motion_category",
"motion_block",
"motion_change_recommendation",
"motion_state",
"motion_workflow",
"motion_statute_paragraph",
"poll",
"option",
"vote",
"assignment",
"assignment_candidate",
"mediafile",
"projector",
"projection",
"projector_message",
"projector_countdown",
"chat_group",
)
for collection in list(self.models.keys()):
if collection not in collection_allowlist:
del self.models[collection]
self.models["mediafile"]["blob"] = "string"
def generate_template_prefixes(self) -> None:
for collection in self.models.keys():
for field in self.models[collection]:
if not self.is_template_field(field):
continue
parts = field.split("$")
prefix = parts[0]
suffix = parts[1]
if prefix in self.template_prefixes[collection]:
raise ValueError(
f"the template prefix {prefix} is not unique within {collection}"
)
self.template_prefixes[collection][prefix] = (
field,
len(prefix),
len(suffix),
)
def is_template_field(self, field: str) -> bool:
return "$_" in field or field.endswith("$")
def is_structured_field(self, field: str) -> bool:
return "$" in field and not self.is_template_field(field)
def is_normal_field(self, field: str) -> bool:
return "$" not in field
def is_calculated_field(self, field: str) -> bool:
return field == "content"
def make_structured(self, field: str, replacement: Any) -> str:
if type(replacement) not in (str, int):
raise CheckException(
f"Invalid type {type(replacement)} for the replacement of field {field}"
)
parts = field.split("$")
return parts[0] + "$" + str(replacement) + parts[1]
def to_template_field(
self, collection: str, structured_field: str
) -> Tuple[str, str]:
"""Returns template_field, replacement"""
parts = structured_field.split("$")
descriptor = self.template_prefixes[collection].get(parts[0])
if not descriptor:
raise CheckException(
f"Unknown template field for prefix {parts[0]} in collection {collection}"
)
return (
descriptor[0],
structured_field[descriptor[1] + 1 : len(structured_field) - descriptor[2]],
)
def run_check(self) -> None:
self.check_json()
self.check_collections()
for collection, models in self.data.items():
for model in models:
self.check_model(collection, model)
if self.errors:
errors = [f"\t{error}" for error in self.errors]
raise CheckException("\n".join(errors))
def check_json(self) -> None:
try:
SCHEMA(self.data)
except fastjsonschema.exceptions.JsonSchemaException as e:
raise CheckException(f"JSON does not match schema: {str(e)}")
def check_collections(self) -> None:
c1 = set(self.data.keys())
c2 = set(self.models.keys())
if c1 != c2:
err = "Collections in JSON file do not match with models.yml."
if c2 - c1:
err += f" Missing collections: {', '.join(c2-c1)}."
if c1 - c2:
err += f" Invalid collections: {', '.join(c1-c2)}."
raise CheckException(err)
def check_model(self, collection: str, model: Dict[str, Any]) -> None:
errors = self.check_normal_fields(model, collection)
if not errors:
errors = self.check_template_fields(model, collection)
if not errors:
self.check_types(model, collection)
self.check_relations(model, collection)
def check_normal_fields(self, model: Dict[str, Any], collection: str) -> bool:
model_fields = set(
x
for x in model.keys()
if self.is_normal_field(x) or self.is_template_field(x)
)
collection_fields = set(
x
for x in self.models[collection].keys()
if self.is_normal_field(x) or self.is_template_field(x)
)
calculated_fields = set(
x
for x in self.models[collection].keys()
if self.is_calculated_field(x)
)
errors = False
if collection_fields - model_fields - calculated_fields:
error = f"{collection}/{model['id']}: Missing fields {', '.join(collection_fields - model_fields)}"
self.errors.append(error)
errors = True
if model_fields - collection_fields:
error = f"{collection}/{model['id']}: Invalid fields {', '.join(model_fields - collection_fields)}"
self.errors.append(error)
errors = True
return errors
def check_template_fields(self, model: Dict[str, Any], collection: str) -> bool:
"""
Only checks that for each replacement a structured field exists and
not too many structured fields. Does not check the content.
Returns True on errors.
"""
errors = False
for template_field in self.models[collection].keys():
if not self.is_template_field(template_field):
continue
field_error = False
replacements = model[template_field]
if not isinstance(replacements, list):
self.errors.append(
f"{collection}/{model['id']}/{template_field}: Replacements for the template field must be a list"
)
field_error = True
continue
for replacement in replacements:
if not isinstance(replacement, str):
self.errors.append(
f"{collection}/{model['id']}/{template_field}: Each replacement for the template field must be a string"
)
field_error = True
if field_error:
error = True
continue
replacement_collection = None
field_description = self.models[collection][template_field]
if isinstance(field_description, dict):
replacement_collection = field_description.get("replacement_collection")
for replacement in replacements:
structured_field = self.make_structured(template_field, replacement)
if structured_field not in model:
self.errors.append(
f"{collection}/{model['id']}/{template_field}: Missing {structured_field} since it is given as a replacement"
)
errors = True
if replacement_collection:
try:
as_id = int(replacement)
except (TypeError, ValueError):
self.errors.append(
f"{collection}/{model['id']}/{template_field}: Replacement {replacement} is not an integer"
)
if not self.find_model(replacement_collection, as_id):
self.errors.append(
f"{collection}/{model['id']}/{template_field}: Replacement {replacement} does not exist as a model of collection {replacement_collection}"
)
for field in model.keys():
if self.is_structured_field(field):
try:
_template_field, _replacement = self.to_template_field(
collection, field
)
if (
template_field == _template_field
and _replacement not in model[template_field]
):
self.errors.append(
f"{collection}/{model['id']}/{field}: Invalid structured field. Missing replacement {_replacement} in {template_field}"
)
errors = True
except CheckException as e:
self.errors.append(
f"{collection}/{model['id']}/{field} error: " + str(e)
)
errors = True
return errors
def check_types(self, model: Dict[str, Any], collection: str) -> None:
for field in model.keys():
if self.is_template_field(field):
continue
field_type = self.get_type_from_collection(field, collection)
enum = self.get_enum_from_collection_field(field, collection)
checker = checker_mapping.get(field_type)
if checker is None:
raise NotImplementedError(
f"TODO implement check for field type {field_type}"
)
if not checker(model[field]):
error = f"{collection}/{model['id']}/{field}: Type error: Type is not {field_type}"
self.errors.append(error)
if enum and model[field] not in enum:
error = f"{collection}/{model['id']}/{field}: Value error: Value {model[field]} is not a valid enum value"
self.errors.append(error)
def get_type_from_collection(self, field: str, collection: str) -> str:
if self.is_structured_field(field):
field, _ = self.to_template_field(collection, field)
field_description = self.models[collection][field]
if isinstance(field_description, dict):
if field_description["type"] == "template":
if isinstance(field_description["fields"], dict):
return field_description["fields"]["type"]
return field_description["fields"]
return field_description["type"]
return field_description
field_description = self.get_field_description(field, collection)
if field_description:
return field_description["type"]
return self.models[collection][field]
def get_enum_from_collection_field(
self, field: str, collection: str
) -> Optional[Set[str]]:
if self.is_structured_field(field):
field, _ = self.to_template_field(collection, field)
field_description = self.models[collection][field]
if isinstance(field_description, dict):
if field_description["type"] == "template":
if isinstance(field_description["fields"], dict):
field_description = field_description["fields"]
else:
return None
if "enum" in field_description:
enum = set(field_description["enum"])
if not field_description.get("required", False):
enum.add(None)
return enum
return None
def check_relations(self, model: Dict[str, Any], collection: str) -> None:
for field in model.keys():
try:
self.check_relation(model, collection, field)
except CheckException as e:
self.errors.append(
f"{collection}/{model['id']}/{field} error: " + str(e)
)
def check_relation(
self, model: Dict[str, Any], collection: str, field: str
) -> None:
if self.is_template_field(field):
return
field_type = self.get_type_from_collection(field, collection)
basemsg = f"{collection}/{model['id']}/{field}: Relation Error: "
replacement = None
if self.is_structured_field(field):
_, replacement = self.to_template_field(collection, field)
if field_type == "relation":
foreign_id = model[field]
if foreign_id is None:
return
foreign_collection, foreign_field = self.get_to(field, collection)
self.check_reverse_relation(
collection,
model["id"],
model,
foreign_collection,
foreign_id,
foreign_field,
basemsg,
replacement,
)
elif field_type == "relation-list":
foreign_ids = model[field]
if foreign_ids is None:
return
foreign_collection, foreign_field = self.get_to(field, collection)
for foreign_id in foreign_ids:
self.check_reverse_relation(
collection,
model["id"],
model,
foreign_collection,
foreign_id,
foreign_field,
basemsg,
replacement,
)
elif field_type == "generic-relation" and model[field] is not None:
foreign_collection, foreign_id = self.split_fqid(model[field])
foreign_field = self.get_to_generic_case(
collection, field, foreign_collection
)
self.check_reverse_relation(
collection,
model["id"],
model,
foreign_collection,
foreign_id,
foreign_field,
basemsg,
replacement,
)
elif field_type == "generic-relation-list" and model[field] is not None:
for fqid in model[field]:
foreign_collection, foreign_id = self.split_fqid(fqid)
foreign_field = self.get_to_generic_case(
collection, field, foreign_collection
)
self.check_reverse_relation(
collection,
model["id"],
model,
foreign_collection,
foreign_id,
foreign_field,
basemsg,
replacement,
)
def get_to(self, field: str, collection: str) -> Tuple[str, str]:
if self.is_structured_field(field):
field, _ = self.to_template_field(collection, field)
field_description = self.models[collection][field]
if field_description["type"] == "template":
to = field_description["fields"]["to"]
else:
to = field_description["to"]
return to.split("/")
def find_model(self, collection: str, id: int) -> Optional[Dict[str, Any]]:
c = self.data.get(collection, [])
for model in c:
if model["id"] == id:
return model
return None
def check_reverse_relation(
self,
collection: str,
id: int,
model: Dict[str, Any],
foreign_collection: str,
foreign_id: int,
foreign_field: str,
basemsg: str,
replacement: Optional[str],
) -> None:
actual_foreign_field = foreign_field
if self.is_template_field(foreign_field):
if replacement:
actual_foreign_field = self.make_structured(foreign_field, replacement)
else:
replacement_collection = self.models[foreign_collection][foreign_field][
"replacement_collection"
]
replacement = model.get(f"{replacement_collection}_id")
if not replacement:
self.errors.append(
f"{basemsg} points to {foreign_collection}/{foreign_id}/{foreign_field},"
f" but there is no replacement for {replacement_collection}"
)
actual_foreign_field = self.make_structured(foreign_field, replacement)
foreign_model = self.find_model(foreign_collection, foreign_id)
foreign_value = (
foreign_model.get(actual_foreign_field)
if foreign_model is not None
else None
)
foreign_field_type = self.get_type_from_collection(
foreign_field, foreign_collection
)
fqid = f"{collection}/{id}"
error = False
if foreign_field_type == "relation":
error = foreign_value != id
elif foreign_field_type == "relation-list":
error = not foreign_value or id not in foreign_value
elif foreign_field_type == "generic-relation":
error = foreign_value != fqid
elif foreign_field_type == "generic-relation-list":
error = not foreign_value or fqid not in foreign_value
else:
raise NotImplementedError()
if error:
self.errors.append(
f"{basemsg} points to {foreign_collection}/{foreign_id}/{actual_foreign_field},"
" but the reverse relation for is corrupt"
)
def split_fqid(self, fqid: str) -> Tuple[str, int]:
try:
collection, _id = fqid.split("/")
id = int(_id)
if collection not in self.models.keys():
raise CheckException(f"Fqid {fqid} has an invalid collection")
return collection, id
except (ValueError, AttributeError):
raise CheckException(f"Fqid {fqid} is malformed")
def split_collectionfield(self, collectionfield: str) -> Tuple[str, str]:
collection, field = collectionfield.split("/")
if collection not in self.models.keys():
raise CheckException(
f"Collectionfield {collectionfield} has an invalid collection"
)
if (
field not in self.models[collection]
): # Note: this has to be adopted when supporting template fields
raise CheckException(
f"Collectionfield {collectionfield} has an invalid field"
)
return collection, field
def get_to_generic_case(
self, collection: str, field: str, foreign_collection: str
) -> str:
"""Returns all reverse relations as collectionfields"""
to = self.models[collection][field]["to"]
if isinstance(to, dict):
if foreign_collection not in to["collections"]:
raise CheckException(
f"The collection {foreign_collection} is not supported "
"as a reverse relation in {collection}/{field}"
)
return to["field"]
for cf in to:
c, f = self.split_collectionfield(cf)
if c == foreign_collection:
return f
raise CheckException(
f"The collection {foreign_collection} is not supported as a reverse relation in {collection}/{field}"
)
def main() -> int:
files = sys.argv[1:]
if not files:
files = DEFAULT_FILES
is_import = "--import" in files
if is_import:
files = [x for x in files if x != "--import"]
failed = False
for f in files:
with open(f) as data:
try:
Checker(json.load(data), is_import=is_import).run_check()
except CheckException as e:
print(f"Check for {f} failed:\n", e)
failed = True
else:
print(f"Check for {f} successful.")
return 1 if failed else 0
if __name__ == "__main__":
sys.exit(main())