Update serializers to work with new database models

This commit is contained in:
Ethan Paul 2020-03-08 23:14:17 -04:00
parent 8de4da92ef
commit ad7a4ea278
5 changed files with 177 additions and 59 deletions

View File

@ -1,4 +1,4 @@
# pylint: disable=missing-docstring
from keyosk.serializers.account import AccountSerializer
from keyosk.serializers.account_acl import AccountACLSerializer
from keyosk.serializers.domain import DomainSerializer
from keyosk.serializers.scope import AccountScopeSerializer

View File

@ -1,25 +1,61 @@
from typing import List
import datetime
from typing import Any
from typing import Dict
from typing import Union
from uuid import UUID
from uuid import uuid4
import marshmallow as msh
from playhouse import shortcuts
from keyosk import fields as custom_fields
from keyosk.serializers.account_acl import AccountACLSerializer
from keyosk._fields import Epoch
from keyosk._fields import RawMultiType
from keyosk.database import KeyoskAccount
from keyosk.serializers.scope import AccountScopeSerializer
class AccountSerializer(msh.Schema):
uuid = msh.fields.UUID(required=True)
created = custom_fields.Epoch(required=True)
updated = custom_fields.Epoch(required=True)
created = Epoch(required=True)
updated = Epoch(required=True)
username = msh.fields.String(required=True)
enabled = msh.fields.Boolean(required=True)
extras = msh.fields.Dict(
keys=msh.fields.String(),
values=custom_fields.RawMultiType([int, float, bool, str], allow_none=True),
values=RawMultiType([int, float, bool, str], allow_none=True),
required=True,
)
acls = msh.fields.List(msh.fields.Nested(AccountACLSerializer), required=True)
scopes = msh.fields.List(msh.fields.Nested(AccountScopeSerializer), required=True)
@staticmethod
def creation_fields() -> List[str]:
return ["uuid", "created", "updated"]
@msh.post_load
def _make_model(data: Dict[str, Any], **kwargs) -> KeyoskAccount:
scopes = []
for item in data["scopes"]:
item.account_id = data["uuid"]
scopes.append(item)
data["scopes"] = scopes
return KeyoskAccount(**data)
@staticmethod
@msh.pre_dump
def _unmake_model(data: KeyoskAccount, **kwargs) -> Dict[str, Any]:
return shortcuts.model_to_dict(
data, recurse=False, backrefs=True, extra_attrs=["extras"],
)
@classmethod
def update(cls, uuid: Union[str, uuid.UUID], data: Dict[str, Any]) -> KeyoskAccount:
data.update({"uuid": UUID(str(uuid))})
loaded = cls(exclude=["created", "updated"]).load(data)
loaded.updated = datetime.datetime.utcnow()
return loaded
@classmethod
def create(cls, data: Dict[str, Any]) -> KeyoskAccount:
data.update({"uuid": uuid4()})
loaded = cls(exclude=["created", "updated"]).load(data)
loaded.updated = datetime.datetime.utcnow()
loaded.created = datetime.datetime.utcnow()
return loaded

View File

@ -1,13 +0,0 @@
import marshmallow as msh
class AccountACLSerializer(msh.Schema):
access_list = msh.fields.String(required=True, data_key="access-list")
permission = msh.fields.String(required=True)
with_server_secret = msh.fields.Boolean(
required=True, data_key="with-server-secret"
)
with_client_secret = msh.fields.Boolean(
required=True, data_key="with-client-secret"
)

View File

@ -1,21 +1,20 @@
import datetime
import re
from typing import Any
from typing import Dict
from typing import List
from typing import Union
from uuid import UUID
from uuid import uuid4
import marshmallow as msh
from playhouse import shortcuts
from keyosk import constants
from keyosk import fields as custom_fields
class DomainPermissionSerializer(msh.Schema):
""""""
name = msh.fields.String(
required=True,
validate=msh.validate.Regexp(constants.REGEX_DOMAIN_PERMISSION_NAME),
)
bitindex = msh.fields.Integer(required=True, validate=msh.validate.Range(min=0))
from keyosk._fields import Epoch
from keyosk.database import KeyoskDomain
from keyosk.database import KeyoskDomainAccessList
from keyosk.database import KeyoskDomainPermission
class DomainSerializer(msh.Schema):
@ -30,8 +29,8 @@ class DomainSerializer(msh.Schema):
"""
uuid = msh.fields.UUID(required=True)
created = custom_fields.Epoch(required=True)
updated = custom_fields.Epoch(required=True)
created = Epoch(required=True)
updated = Epoch(required=True)
name = msh.fields.String(
required=True, validate=msh.validate.Regexp(constants.REGEX_DOMAIN_NAME)
)
@ -55,35 +54,102 @@ class DomainSerializer(msh.Schema):
enable_refresh = msh.fields.Boolean(required=True, data_key="enable-refresh")
lifespan_access = msh.fields.TimeDelta(required=True, data_key="lifespan-access")
lifespan_refresh = msh.fields.TimeDelta(required=True, data_key="lifespan-refresh")
access_list_names = msh.fields.List(
msh.fields.String(
validate=msh.validate.Regexp(constants.REGEX_DOMAIN_ACCESS_LIST_NAME)
),
access_lists = msh.fields.Method(
serialize="serialize_access_lists",
deserialize="deserialize_access_lists",
required=True,
data_key="access-lists",
)
permissions = msh.fields.List(
msh.fields.Nested(DomainPermissionSerializer), required=True,
permissions = msh.fields.Method(
serialize="serialize_permissions",
deserialize="deserialize_permissions",
required=True,
)
@msh.validates("access_list_names")
def validate_acl_names(self, data: List[str], **kwargs):
if len(data) != len(set(data)):
raise msh.ValidationError("Duplicate access list names")
@staticmethod
def deserialize_access_lists(value: List[str]) -> List[KeyoskDomainAccessList]:
models = []
errors = {}
for index, item in enumerate(set(value)):
if not isinstance(item, str):
errors[index] = f"Invalid type '{type(item)}', expected 'str'"
elif not re.search(constants.REGEX_DOMAIN_ACCESS_LIST_NAME, item):
errors[
index
] = f"Invalid format for value '{item}', must match '{constants.REGEX_DOMAIN_ACCESS_LIST_NAME}'"
else:
models.append(KeyoskDomainAccessList(name=item))
@msh.validates("permissions")
def validate_permissions(self, data: List[Dict[str, Union[str, int]]], **kwargs):
names = [item["name"] for item in data]
if len(names) != len(set(names)):
raise msh.ValidationError("Duplicat permission names")
if errors:
raise msh.ValidationError(errors)
indexes = sorted([item["bitindex"] for item in data])
for index in len(indexes):
if indexes[index - 1] != (index - 1):
raise msh.ValidationError(
f"Invalid bitindexes provided: expected zero-index sequential sequence, recieved {indexes}"
)
return models
@staticmethod
def creation_fields() -> List[str]:
return ["uuid", "created", "updated"]
def serialize_access_lists(obj: Dict[Any, Any]) -> List[str]:
return [item.name for item in obj["access_lists"]]
@staticmethod
def deserialize_permissions(value: List[str]) -> List[KeyoskDomainPermission]:
models = []
errors = {}
for index, item in enumerate(set(value)):
if not isinstance(item, str):
errors[index] = f"Invalid type '{type(item)}', expected 'str'"
elif not re.search(constants.REGEX_DOMAIN_PERMISSION_NAME, item):
errors[
index
] = f"Invalid format for value '{item}', must match '{constants.REGEX_DOMAIN_PERMISSION_NAME}'"
else:
models.append(KeyoskDomainPermission(name=item))
if errors:
raise msh.ValidationError(errors)
return models
@staticmethod
def serialize_permissions(obj: Dict[Any, Any]) -> List[str]:
return [item.name for item in obj["permissions"]]
@staticmethod
@msh.post_load
def _make_model(data: Dict[str, Any], **kwargs) -> KeyoskDomain:
acls = []
for item in data["access_lists"]:
item.domain_id = data["uuid"]
acls.append(item)
data["access_lists"] = acls
permissions = []
for item in data["permissions"]:
item.domain_id = data["uuid"]
permissions.append(item)
data["permissions"] = permissions
return KeyoskDomain(**data)
@staticmethod
@msh.pre_dump
def _unmake_model(data: KeyoskDomain, **kwargs) -> Dict[str, Any]:
return shortcuts.model_to_dict(
data,
recurse=False,
backrefs=True,
extra_attrs=["lifespan_access", "lifespan_refresh",],
)
@classmethod
def update(cls, uuid: Union[str, uuid.UUID], data: Dict[str, Any]) -> KeyoskDomain:
data.update({"uuid": UUID(str(uuid))})
loaded = cls(exclude=["created", "updated"]).load(data)
loaded.updated = datetime.datetime.utcnow()
return loaded
@classmethod
def create(cls, data: Dict[str, Any]) -> KeyoskDomain:
data.update({"uuid": uuid4()})
loaded = cls(exclude=["created", "updated"]).load(data)
loaded.updated = datetime.datetime.utcnow()
loaded.created = datetime.datetime.utcnow()
return loaded

View File

@ -0,0 +1,29 @@
from typing import Any
from typing import Dict
import marshmallow as msh
from playhouse import shortcuts
from keyosk.database import KeyoskAccountScope
class AccountScopeSerializer(msh.Schema):
access_list = msh.fields.String(required=True, data_key="access-list")
permission = msh.fields.String(required=True)
with_server_secret = msh.fields.Boolean(
required=True, data_key="with-server-secret"
)
with_client_secret = msh.fields.Boolean(
required=True, data_key="with-client-secret"
)
@staticmethod
@msh.post_load
def _make_model(data: Dict[str, Any], **kwargs) -> KeyoskAccountScope:
return KeyoskAccountScope(**data)
@staticmethod
@msh.pre_dump
def _unmake_model(data: KeyoskAccountScope, **kwargs) -> Dict[str, Any]:
return shortcuts.model_to_dict(data, recurse=False, backrefs=False,)