Update domain serializer to improve validation accuracy

Remove extras serializers submodule
Add validation regexes to constants
This commit is contained in:
Ethan Paul 2020-02-27 00:07:19 -05:00
parent 31845ef7c5
commit 32cd33728d
4 changed files with 70 additions and 34 deletions

View File

@ -3,3 +3,15 @@
DEFAULT_CONFIG_PATH = "/etc/keyosk/conf.toml"
ENV_CONFIG_PATH = "KYSK_CONF_PATH"
REGEX_FRIENDLY_NAME = r"^([a-z][a-z0-9]+)(-[a-z0-9]+)*$"
REGEX_DOMAIN_NAME = REGEX_FRIENDLY_NAME
REGEX_DOMAIN_ACCESS_LIST_NAME = REGEX_FRIENDLY_NAME
REGEX_DOMAIN_PERMISSION_NAME = REGEX_FRIENDLY_NAME
REGEX_DOMAIN_AUDIENCE = r"^[a-z][a-z0-9]{2,9}$"
REGEX_DOMAIN_TITLE = r"^.{1,30}$"

View File

@ -2,5 +2,3 @@
from keyosk.serializers.account import AccountSerializer
from keyosk.serializers.account_acl import AccountACLSerializer
from keyosk.serializers.domain import DomainSerializer
from keyosk.serializers.domain_extras import DomainAccessListSerializer
from keyosk.serializers.domain_extras import DomainPermissionSerializer

View File

@ -1,20 +1,48 @@
from typing import Dict
from typing import List
from typing import Union
import marshmallow as msh
from keyosk import constants
from keyosk import fields as custom_fields
from keyosk.serializers.domain_extras import DomainAccessListSerializer
from keyosk.serializers.domain_extras import DomainPermissionSerializer
class DomainPermissionSerializer(msh.Schema):
""""""
name = msh.fields.String(
required=True,
validate=msh.validate.Regexp(constants.REGEX_DOMAIN_PERMISSION_NAME),
)
bitindex = msh.fields.Integer(required=True, validate=msh.validate.Range(min=0))
class DomainSerializer(msh.Schema):
"""Serializer for domain records
This serializer is meant to translate between the client-facing data format and a
data format that is ready to be used with the :class:`database.Domain` model for
access to and from the database.
.. note:: Schema fields here map 1:1 with the model fields/properties of the same
names on :class:`database.Domain`.
"""
uuid = msh.fields.UUID(required=True)
created = custom_fields.Epoch(required=True)
updated = custom_fields.Epoch(required=True)
name = msh.fields.String(required=True)
audience = msh.fields.String(required=True)
title = msh.fields.String(required=True, allow_none=True)
name = msh.fields.String(
required=True, validate=msh.validate.Regexp(constants.REGEX_DOMAIN_NAME)
)
audience = msh.fields.String(
required=True, validate=msh.validate.Regexp(constants.REGEX_DOMAIN_AUDIENCE)
)
title = msh.fields.String(
required=True,
allow_none=True,
validate=msh.validate.Regexp(constants.REGEX_DOMAIN_TITLE),
)
description = msh.fields.String(required=True, allow_none=True)
contact = msh.fields.String(required=True, allow_none=True)
enabled = msh.fields.Boolean(required=True)
@ -25,17 +53,37 @@ class DomainSerializer(msh.Schema):
required=True, data_key="enable-server-set-auth"
)
enable_refresh = msh.fields.Boolean(required=True, data_key="enable-refresh")
lifespan_access = msh.fields.Boolean(required=True, data_key="lifespan-access")
lifespan_refresh = msh.fields.Boolean(required=True, data_key="lifespan-refresh")
access_lists = msh.fields.List(
msh.fields.Nested(DomainAccessListSerializer),
lifespan_access = msh.fields.TimeDelta(required=True, data_key="lifespan-access")
lifespan_refresh = msh.fields.TimeDelta(required=True, data_key="lifespan-refresh")
access_list_names = msh.fields.List(
msh.fields.String(
validate=msh.validate.Regexp(constants.REGEX_DOMAIN_ACCESS_LIST_NAME)
),
required=True,
data_key="access-lists",
)
permissions = msh.fields.List(
msh.fields.Nested(DomainPermissionSerializer), required=True
msh.fields.Nested(DomainPermissionSerializer), required=True,
)
@msh.validates("access_list_names")
def validate_acl_names(self, data: List[str], **kwargs):
if len(data) != len(set(data)):
raise msh.ValidationError("Duplicate access list names")
@msh.validates("permissions")
def validate_permissions(self, data: List[Dict[str, Union[str, int]]], **kwargs):
names = [item["name"] for item in data]
if len(names) != len(set(names)):
raise msh.ValidationError("Duplicat permission names")
indexes = sorted([item["bitindex"] for item in data])
for index in len(indexes):
if indexes[index - 1] != (index - 1):
raise msh.ValidationError(
f"Invalid bitindexes provided: expected zero-index sequential sequence, recieved {indexes}"
)
@staticmethod
def creation_fields() -> List[str]:
return ["uuid", "created", "updated"]

View File

@ -1,22 +0,0 @@
from typing import Dict
import marshmallow as msh
class DomainAccessListSerializer(msh.Schema):
name = msh.fields.String(required=True)
@msh.pre_load
def _from_string(self, data: str, *args, **kwargs) -> Dict[str, str]:
return {"name": data}
@msh.post_dump
def _to_string(self, data, *args, **kwargs) -> str:
return data["name"]
class DomainPermissionSerializer(msh.Schema):
name = msh.fields.String(required=True)
bitindex = msh.fields.Integer(required=True, validate=msh.validate.Range(min=0))