138 lines
4.2 KiB
Python
138 lines
4.2 KiB
Python
"""Functions that help us generate and use info.json files.
|
|
"""
|
|
import json
|
|
import hjson
|
|
import jsonschema
|
|
from collections.abc import Mapping
|
|
from functools import lru_cache
|
|
from typing import OrderedDict
|
|
from pathlib import Path
|
|
|
|
from milc import cli
|
|
|
|
|
|
def _dict_raise_on_duplicates(ordered_pairs):
|
|
"""Reject duplicate keys."""
|
|
d = {}
|
|
for k, v in ordered_pairs:
|
|
if k in d:
|
|
raise ValueError("duplicate key: %r" % (k,))
|
|
else:
|
|
d[k] = v
|
|
return d
|
|
|
|
|
|
def json_load(json_file, strict=True):
|
|
"""Load a json file from disk.
|
|
|
|
Note: file must be a Path object.
|
|
"""
|
|
try:
|
|
# Get the IO Stream for Path objects
|
|
# Not necessary if the data is provided via stdin
|
|
if isinstance(json_file, Path):
|
|
json_file = json_file.open(encoding='utf-8')
|
|
return hjson.load(json_file, object_pairs_hook=_dict_raise_on_duplicates if strict else None)
|
|
|
|
except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
|
|
cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
|
|
exit(1)
|
|
except Exception as e:
|
|
cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
|
|
exit(1)
|
|
|
|
|
|
@lru_cache(maxsize=0)
|
|
def load_jsonschema(schema_name):
|
|
"""Read a jsonschema file from disk.
|
|
"""
|
|
if Path(schema_name).exists():
|
|
return json_load(schema_name)
|
|
|
|
schema_path = Path(f'data/schemas/{schema_name}.jsonschema')
|
|
|
|
if not schema_path.exists():
|
|
schema_path = Path('data/schemas/false.jsonschema')
|
|
|
|
return json_load(schema_path)
|
|
|
|
|
|
@lru_cache(maxsize=0)
|
|
def compile_schema_store():
|
|
"""Compile all our schemas into a schema store.
|
|
"""
|
|
schema_store = {}
|
|
|
|
for schema_file in Path('data/schemas').glob('*.jsonschema'):
|
|
schema_data = load_jsonschema(schema_file)
|
|
if not isinstance(schema_data, dict):
|
|
cli.log.debug('Skipping schema file %s', schema_file)
|
|
continue
|
|
schema_store[schema_data['$id']] = schema_data
|
|
|
|
return schema_store
|
|
|
|
|
|
@lru_cache(maxsize=0)
|
|
def create_validator(schema):
|
|
"""Creates a validator for the given schema id.
|
|
"""
|
|
schema_store = compile_schema_store()
|
|
resolver = jsonschema.RefResolver.from_schema(schema_store[schema], store=schema_store)
|
|
|
|
return jsonschema.Draft202012Validator(schema_store[schema], resolver=resolver).validate
|
|
|
|
|
|
def validate(data, schema):
|
|
"""Validates data against a schema.
|
|
"""
|
|
validator = create_validator(schema)
|
|
|
|
return validator(data)
|
|
|
|
|
|
def deep_update(origdict, newdict):
|
|
"""Update a dictionary in place, recursing to do a depth-first deep copy.
|
|
"""
|
|
for key, value in newdict.items():
|
|
if isinstance(value, Mapping):
|
|
origdict[key] = deep_update(origdict.get(key, {}), value)
|
|
|
|
else:
|
|
origdict[key] = value
|
|
|
|
return origdict
|
|
|
|
|
|
def merge_ordered_dicts(dicts):
|
|
"""Merges nested OrderedDict objects resulting from reading a hjson file.
|
|
Later input dicts overrides earlier dicts for plain values.
|
|
If any value is "!delete!", the existing value will be removed from its parent.
|
|
Arrays will be appended. If the first entry of an array is "!reset!", the contents of the array will be cleared and replaced with RHS.
|
|
Dictionaries will be recursively merged. If any entry is "!reset!", the contents of the dictionary will be cleared and replaced with RHS.
|
|
"""
|
|
result = OrderedDict()
|
|
|
|
def add_entry(target, k, v):
|
|
if k in target and isinstance(v, (OrderedDict, dict)):
|
|
if "!reset!" in v:
|
|
target[k] = v
|
|
else:
|
|
target[k] = merge_ordered_dicts([target[k], v])
|
|
if "!reset!" in target[k]:
|
|
del target[k]["!reset!"]
|
|
elif k in target and isinstance(v, list):
|
|
if v[0] == '!reset!':
|
|
target[k] = v[1:]
|
|
else:
|
|
target[k] = target[k] + v
|
|
elif v == "!delete!" and isinstance(target, (OrderedDict, dict)):
|
|
del target[k]
|
|
else:
|
|
target[k] = v
|
|
|
|
for d in dicts:
|
|
for (k, v) in d.items():
|
|
add_entry(result, k, v)
|
|
|
|
return result
|