import json
import re
import typing
from copy import deepcopy
from decimal import Decimal
from distutils.version import StrictVersion
from functools import lru_cache
from pathlib import Path
from typing import Tuple, Union, Optional
from uuid import UUID
import funcy
from funcy import cached_property
from jsonschema import SchemaError, RefResolutionError, FormatChecker
from jsonschema.validators import Draft4Validator
from hedwig.conf import settings
from hedwig.exceptions import ValidationError
from hedwig.validators.base import HedwigBaseValidator, MetaAttributes
def _json_default(obj):
    """
    ``default`` hook for :func:`json.dumps` that serializes types the stdlib encoder does not handle.

    - :class:`~decimal.Decimal`: integral values become ``int`` (so ``Decimal('3.00')`` encodes as ``3``), other
      finite values become ``float``. Non-finite values (``NaN``, ``Infinity``) also become ``float`` so that the
      caller's ``allow_nan`` setting decides whether they are accepted, exactly as for plain floats.
    - :class:`~uuid.UUID`: converted with ``str()``, i.e. the hyphenated hex form.

    :raises TypeError: for any other type, using the same wording as the stdlib encoder.
    """
    if isinstance(obj, Decimal):
        if not obj.is_finite():
            # int() raises ValueError/OverflowError on NaN/Infinity; float() defers the decision to allow_nan
            return float(obj)
        int_val = int(obj)
        return int_val if int_val == obj else float(obj)
    if isinstance(obj, UUID):
        return str(obj)
    raise TypeError(f'Object of type {obj.__class__.__name__} is not JSON serializable')
class JSONSchemaValidator(HedwigBaseValidator):
    """
    Hedwig validator for JSON-encoded messages whose ``data`` is validated with a Draft 4 JSON-schema.

    The app schema is either passed to the constructor or loaded from the file named by setting
    ``HEDWIG_JSONSCHEMA_FILE``. When transport message attributes are not used, the message envelope is validated
    against ``jsonschema_container_schema.json``, which is loaded from this module's directory.
    """

    checker = FormatChecker()
    """
    FormatChecker that checks for `format` JSON-schema field. This may be customized by an app by overriding setting
    `HEDWIG_DATA_VALIDATOR_CLASS` and defining more format checkers.
    """

    schema: dict
    """
    The schema to validate data against - supplied by app
    """

    # uuid separated by hyphens:
    _human_uuid_re = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")

    # version keys in the app schema look like "<major>.*"; group 1 captures the major version
    _version_pattern_re = re.compile(r"^([0-9]+)\.\*$")

    _validator: Draft4Validator
    _container_validator: Draft4Validator

    # NOTE(review): StrictVersion comes from distutils, which was removed in Python 3.12 — TODO plan a replacement
    FORMAT_VERSIONS = [StrictVersion('1.0')]
    '''
    Here are the schema definitions:

    Version 1.0:
    {
        "format_version": "1.0",
        "schema": "https://github.com/cloudchacho/hedwig-python/schema#/schemas/trip.created/1.0",
        "id": "b1328174-a21c-43d3-b303-964dfcc76efc",
        "metadata": {
            "timestamp": 1460868253255,
            "publisher": "myapp",
            "headers": {
                ...
            }
        },
        "data": {
            ...
        }
    }

    All the top-level fields (other than `metadata`) are required to be non-empty. `metadata` field is expected to
    be present, but may be empty. All fields in `metadata` are optional. `data` is validated using `schema`.
    '''

    def __init__(self, schema: typing.Optional[dict] = None) -> None:
        """
        :param schema: the app's JSON-schema. When ``None``, it is read from the file named by setting
            ``HEDWIG_JSONSCHEMA_FILE``.
        :raises SchemaError: if the schema fails Hedwig's structural checks (see ``_check_schema``).
        """
        # the container schema describes the message envelope and ships alongside this module
        container_schema_filepath = Path(__file__).resolve().parent / 'jsonschema_container_schema.json'
        # JSON text is UTF-8 (RFC 8259); don't depend on the platform's locale encoding
        with open(container_schema_filepath, encoding='utf-8') as f:
            container_schema = json.load(f)
        self._container_validator = Draft4Validator(container_schema)

        if schema is None:
            # automatically load schema
            schema_filepath = settings.HEDWIG_JSONSCHEMA_FILE
            with open(schema_filepath, encoding='utf-8') as f:
                schema = json.load(f)

        self._check_schema(schema)

        self.schema = schema
        self._validator = Draft4Validator(schema, format_checker=self.checker)

        # schema encoding, eg: hedwig.automatic.com/schema#/schemas/trip.created/1.0
        schema_fmt = f'{self.schema_root}#/schemas/{{message_type}}/{{message_version}}'
        # captures the last two path segments: message type and message version
        schema_re = re.compile(r'([^/]+)/([^/]+)$')

        super().__init__(
            schema_fmt,
            schema_re,
            StrictVersion('1.0'),
        )

    @cached_property
    def schema_root(self) -> str:
        """The app schema's top-level ``id`` value; every message's ``schema`` must start with it."""
        return self.schema['id']

    def _extract_data_helper(
        self, message_payload: Union[str, bytes], attributes: dict, use_transport_message_attributes: bool
    ) -> Tuple[MetaAttributes, dict]:
        """
        Parse a JSON payload into its meta attributes and data.

        :param message_payload: JSON text (only ``str`` is accepted, enforced by an ``assert``).
        :param attributes: transport message attributes; only read when ``use_transport_message_attributes``.
        :param use_transport_message_attributes: when ``False`` the payload is the full envelope (validated
            against the container schema); when ``True`` the payload is the bare data and metadata comes from
            ``attributes``.
        :raises ValidationError: on invalid JSON, an envelope that fails the container schema, or (transport
            attributes path) an unexpected format version.
        """
        assert isinstance(message_payload, str)

        try:
            payload = json.loads(message_payload)
        except ValueError as e:
            raise ValidationError('not a valid JSON') from e

        if not use_transport_message_attributes:
            errors = list(self._container_validator.iter_errors(payload))
            if errors:
                raise ValidationError(errors)

            data = payload['data']
            # NOTE(review): the FORMAT_VERSIONS docstring says metadata fields are optional, but these lookups
            # raise KeyError when absent — presumably the container schema requires them; confirm.
            meta_attrs = MetaAttributes(
                payload['metadata']['timestamp'],
                payload['metadata']['publisher'],
                payload['metadata']['headers'],
                payload['id'],
                payload['schema'],
                payload['format_version'],
            )
        else:
            data = payload
            meta_attrs = self._decode_meta_attributes(attributes)
            if meta_attrs.format_version != self._current_format_version:
                raise ValidationError(f"Invalid format version: {meta_attrs.format_version}")

        return meta_attrs, data

    def _extract_data(
        self, message_payload: Union[str, bytes], attributes: dict, use_transport_attributes: bool
    ) -> Tuple[MetaAttributes, dict]:
        """Extract meta attributes and data from a transport message; see ``_extract_data_helper``."""
        return self._extract_data_helper(
            message_payload,
            attributes,
            use_transport_message_attributes=use_transport_attributes,
        )

    def _extract_data_firehose(self, line: str) -> Tuple[MetaAttributes, dict]:
        """Extract meta attributes and data from one firehose line, which always holds the full envelope."""
        return self._extract_data_helper(line, {}, use_transport_message_attributes=False)

    def _schema(self, message_type: str, major_version: int) -> dict:
        """
        Resolve the sub-schema for ``message_type`` at ``<major_version>.*`` inside the app schema.

        Successful lookups are memoized per instance. This replaces a method-level ``lru_cache``, which would
        have kept every validator instance alive for the cache's lifetime. Failed lookups are not cached.

        :raises ValidationError: if the schema has no definition at that pointer.
        """
        cache = self.__dict__.setdefault('_schema_cache', {})
        key = (message_type, major_version)
        if key in cache:
            return cache[key]

        # self._schema_fmt is presumably stored by HedwigBaseValidator from the format passed to super().__init__
        schema_ptr = self._schema_fmt.format(message_type=message_type, message_version=f"{major_version}.*")
        try:
            _, schema = self._validator.resolver.resolve(schema_ptr)
        except RefResolutionError:
            raise ValidationError(f'Definition not found in schema: {schema_ptr}')

        cache[key] = schema
        return schema

    def _verify_known_minor_version(self, message_type: str, full_version: StrictVersion):
        """
        Reject messages whose minor version is newer than the ``x-version`` declared for their major version.

        :raises ValidationError: if the message's minor version exceeds the schema's known minor version.
        """
        schema = self._schema(message_type, full_version.version[0])
        schema_full_version = StrictVersion(schema["x-version"])

        if schema_full_version.version[1] < full_version.version[1]:
            raise ValidationError(
                f'Unknown minor version: {full_version.version[1]}, last known minor version: '
                f'{schema_full_version.version[1]}'
            )

    def _decode_data(
        self,
        meta_attrs: MetaAttributes,
        message_type: str,
        full_version: StrictVersion,
        data: dict,
    ) -> dict:
        """
        Validate ``data`` against the sub-schema for ``message_type`` and its major version, then return it as-is.

        :raises ValidationError: if the message schema is outside ``schema_root``, the sub-schema is missing, or
            ``data`` fails validation.
        """
        if not meta_attrs.schema.startswith(self.schema_root):
            raise ValidationError(f'message schema must start with "{self.schema_root}"')

        schema = self._schema(message_type, full_version.version[0])

        errors = list(self._validator.iter_errors(data, schema))
        if errors:
            raise ValidationError(errors)

        return data

    def _encode_data(self, data: dict) -> dict:
        """Return ``data`` unchanged; JSON serialization happens when the payload is encoded."""
        assert isinstance(data, dict)
        # will get encoded in _encode_payload
        return data

    def _encode_payload_helper(
        self,
        meta_attrs: MetaAttributes,
        data: dict,
        use_transport_message_attributes: bool,
    ) -> Tuple[str, dict]:
        """
        Serialize a message to compact JSON.

        :returns: ``(payload_json, message_attributes)``. Without transport attributes, the payload is the full
            envelope and the attributes are a deep copy of the headers. With transport attributes, the payload is
            the bare data and the attributes are the encoded meta attributes.
        """
        if not use_transport_message_attributes:
            payload = {
                'format_version': str(self._current_format_version),
                'schema': meta_attrs.schema,
                'id': meta_attrs.id,
                'metadata': {
                    'timestamp': meta_attrs.timestamp,
                    'publisher': meta_attrs.publisher,
                    'headers': meta_attrs.headers,
                },
                'data': data,
            }
            # copy so later mutation of the returned attributes can't affect meta_attrs.headers
            msg_attrs = deepcopy(meta_attrs.headers)
        else:
            payload = data
            msg_attrs = self._encode_meta_attributes(meta_attrs)

        return (
            json.dumps(payload, default=_json_default, allow_nan=False, separators=(',', ':'), indent=None),
            msg_attrs,
        )

    def _encode_payload(
        self, meta_attrs: MetaAttributes, data: dict, use_transport_attributes: bool
    ) -> Tuple[str, dict]:
        """Encode a message for transport; see ``_encode_payload_helper``."""
        return self._encode_payload_helper(meta_attrs, data, use_transport_attributes)

    def _encode_payload_firehose(
        self, message_type: str, version: StrictVersion, meta_attrs: MetaAttributes, data: dict
    ) -> str:
        """Encode a message as one firehose line: always the full JSON envelope (type/version args are unused)."""
        return self._encode_payload_helper(meta_attrs, data, use_transport_message_attributes=False)[0]

    @classmethod
    def _check_schema(cls, schema: dict) -> None:
        """
        Check Hedwig's structural rules on an app schema.

        The rules are:

        - non-empty ``schemas``;
        - each message type maps to a non-empty dict keyed by ``<major>.*``;
        - each definition is a non-empty dict with an ``x-version`` whose major version matches its key;
        - every ``(message_type, version_pattern)`` in ``HEDWIG_MESSAGE_ROUTING`` / ``HEDWIG_CALLBACKS`` is present.

        All problems are collected before raising.

        :raises SchemaError: listing every problem found.
        """
        msg_types_found = {k: False for k in funcy.chain(settings.HEDWIG_MESSAGE_ROUTING, settings.HEDWIG_CALLBACKS)}
        # custom validation for Hedwig - TODO ideally should just be represented in json-schema file as meta schema,
        # however jsonschema lib doesn't support draft 06 which is what's really required here
        errors = []
        if not schema.get('schemas'):
            errors.append("Invalid schema file: expected key 'schemas' with non-empty value")
        else:
            for msg_type, versions in schema['schemas'].items():
                if not isinstance(versions, dict) or not versions:
                    errors.append(f"Invalid definition for: '{msg_type}', value must contain a dict of valid versions")
                    continue
                for version_pattern, definition in versions.items():
                    m = cls._version_pattern_re.match(version_pattern)
                    major_version: Optional[int]
                    if not m:
                        errors.append(f"Invalid version '{version_pattern}' for: '{msg_type}'")
                        major_version = None
                    else:
                        major_version = int(m.group(1))

                    if (msg_type, version_pattern) in msg_types_found:
                        msg_types_found[(msg_type, version_pattern)] = True

                    # `or`, not `and`: a non-dict definition must be rejected here, otherwise the
                    # x-version lookups below fail with TypeError instead of reporting a schema error
                    if not isinstance(definition, dict) or not definition:
                        errors.append(f"Invalid schema for: '{msg_type}' '{version_pattern}'")
                        continue

                    if 'x-version' not in definition:
                        errors.append(f"Invalid schema for: '{msg_type}' '{version_pattern}': missing x-version")
                        continue

                    try:
                        full_version = StrictVersion(definition['x-version'])
                        # explicit None check so major version 0 ("0.*") is verified too
                        if major_version is not None and full_version.version[0] != major_version:
                            errors.append(
                                f"Invalid full version: '{full_version}' for: '{msg_type}' '{version_pattern}'"
                            )
                    except ValueError:
                        errors.append(
                            f"Invalid full version: '{definition['x-version']}' for: '{msg_type}' "
                            f"'{version_pattern}'"
                        )

        for (msg_type, version_pattern), found in msg_types_found.items():
            if not found:
                errors.append(f"Schema not found for '{msg_type}' v{version_pattern}")

        if errors:
            raise SchemaError(str(errors))

    # Register on `checker` itself: FormatChecker() snapshots the class-level registry when constructed, so a
    # cls_checks registration made after `checker` was created is invisible to it. cls_checks is kept so other
    # FormatChecker instances created later still see the format, as before.
    @staticmethod
    @checker.checks('human-uuid')
    @FormatChecker.cls_checks('human-uuid')
    def _check_human_uuid(instance):
        """
        Format check for ``human-uuid``: a lowercase, hyphen-separated UUID string.

        Non-strings fail. ``fullmatch`` is used because ``$`` would also accept a trailing newline.
        """
        if not isinstance(instance, str):
            return False
        return JSONSchemaValidator._human_uuid_re.fullmatch(instance) is not None