diff --git a/keyosk/serializers/__init__.py b/keyosk/serializers/__init__.py index e3b3f98..1b8d609 100644 --- a/keyosk/serializers/__init__.py +++ b/keyosk/serializers/__init__.py @@ -1,4 +1,4 @@ # pylint: disable=missing-docstring from keyosk.serializers.account import AccountSerializer -from keyosk.serializers.account_acl import AccountACLSerializer from keyosk.serializers.domain import DomainSerializer +from keyosk.serializers.scope import AccountScopeSerializer diff --git a/keyosk/serializers/account.py b/keyosk/serializers/account.py index 9d063ef..fb9cf28 100644 --- a/keyosk/serializers/account.py +++ b/keyosk/serializers/account.py @@ -1,25 +1,61 @@ -from typing import List +import datetime +from typing import Any +from typing import Dict +from typing import Union +from uuid import UUID +from uuid import uuid4 import marshmallow as msh +from playhouse import shortcuts -from keyosk import fields as custom_fields -from keyosk.serializers.account_acl import AccountACLSerializer +from keyosk._fields import Epoch +from keyosk._fields import RawMultiType +from keyosk.database import KeyoskAccount +from keyosk.serializers.scope import AccountScopeSerializer class AccountSerializer(msh.Schema): uuid = msh.fields.UUID(required=True) - created = custom_fields.Epoch(required=True) - updated = custom_fields.Epoch(required=True) + created = Epoch(required=True) + updated = Epoch(required=True) username = msh.fields.String(required=True) enabled = msh.fields.Boolean(required=True) extras = msh.fields.Dict( keys=msh.fields.String(), - values=custom_fields.RawMultiType([int, float, bool, str], allow_none=True), + values=RawMultiType([int, float, bool, str], allow_none=True), required=True, ) - acls = msh.fields.List(msh.fields.Nested(AccountACLSerializer), required=True) + scopes = msh.fields.List(msh.fields.Nested(AccountScopeSerializer), required=True) @staticmethod - def creation_fields() -> List[str]: - return ["uuid", "created", "updated"] + @msh.post_load + def _make_model(data: Dict[str, Any], **kwargs) -> KeyoskAccount: + scopes = [] + for item in data["scopes"]: + item.account_id = data["uuid"] + scopes.append(item) + data["scopes"] = scopes + return KeyoskAccount(**data) + + @staticmethod + @msh.pre_dump + def _unmake_model(data: KeyoskAccount, **kwargs) -> Dict[str, Any]: + return shortcuts.model_to_dict( + data, recurse=False, backrefs=True, extra_attrs=["extras"], + ) + + @classmethod + def update(cls, uuid: Union[str, uuid.UUID], data: Dict[str, Any]) -> KeyoskAccount: + data.update({"uuid": UUID(str(uuid))}) + loaded = cls(exclude=["created", "updated"]).load(data) + loaded.updated = datetime.datetime.utcnow() + return loaded + + @classmethod + def create(cls, data: Dict[str, Any]) -> KeyoskAccount: + data.update({"uuid": uuid4()}) + loaded = cls(exclude=["created", "updated"]).load(data) + loaded.updated = datetime.datetime.utcnow() + loaded.created = datetime.datetime.utcnow() + return loaded diff --git a/keyosk/serializers/account_acl.py b/keyosk/serializers/account_acl.py deleted file mode 100644 index 94302ca..0000000 --- a/keyosk/serializers/account_acl.py +++ /dev/null @@ -1,13 +0,0 @@ -import marshmallow as msh - - -class AccountACLSerializer(msh.Schema): - - access_list = msh.fields.String(required=True, data_key="access-list") - permission = msh.fields.String(required=True) - with_server_secret = msh.fields.Boolean( - required=True, data_key="with-server-secret" - ) - with_client_secret = msh.fields.Boolean( - required=True, data_key="with-client-secret" - ) diff --git a/keyosk/serializers/domain.py b/keyosk/serializers/domain.py index 131a7dd..ee654ba 100644 --- a/keyosk/serializers/domain.py +++ b/keyosk/serializers/domain.py @@ -1,21 +1,20 @@ +import datetime +import re +from typing import Any from typing import Dict from typing import List from typing import Union +from uuid import UUID +from uuid import uuid4 import marshmallow as msh +from playhouse import shortcuts from keyosk import constants -from keyosk import fields as custom_fields - - -class DomainPermissionSerializer(msh.Schema): - """""" - - name = msh.fields.String( - required=True, - validate=msh.validate.Regexp(constants.REGEX_DOMAIN_PERMISSION_NAME), - ) - bitindex = msh.fields.Integer(required=True, validate=msh.validate.Range(min=0)) +from keyosk._fields import Epoch +from keyosk.database import KeyoskDomain +from keyosk.database import KeyoskDomainAccessList +from keyosk.database import KeyoskDomainPermission class DomainSerializer(msh.Schema): @@ -30,8 +29,8 @@ class DomainSerializer(msh.Schema): """ uuid = msh.fields.UUID(required=True) - created = custom_fields.Epoch(required=True) - updated = custom_fields.Epoch(required=True) + created = Epoch(required=True) + updated = Epoch(required=True) name = msh.fields.String( required=True, validate=msh.validate.Regexp(constants.REGEX_DOMAIN_NAME) ) @@ -55,35 +54,102 @@ class DomainSerializer(msh.Schema): enable_refresh = msh.fields.Boolean(required=True, data_key="enable-refresh") lifespan_access = msh.fields.TimeDelta(required=True, data_key="lifespan-access") lifespan_refresh = msh.fields.TimeDelta(required=True, data_key="lifespan-refresh") - access_list_names = msh.fields.List( - msh.fields.String( - validate=msh.validate.Regexp(constants.REGEX_DOMAIN_ACCESS_LIST_NAME) - ), + access_lists = msh.fields.Method( + serialize="serialize_access_lists", + deserialize="deserialize_access_lists", required=True, data_key="access-lists", ) - permissions = msh.fields.List( - msh.fields.Nested(DomainPermissionSerializer), required=True, + permissions = msh.fields.Method( + serialize="serialize_permissions", + deserialize="deserialize_permissions", + required=True, ) - @msh.validates("access_list_names") - def validate_acl_names(self, data: List[str], **kwargs): - if len(data) != len(set(data)): - raise msh.ValidationError("Duplicate access list names") + @staticmethod + def deserialize_access_lists(value: List[str]) -> List[KeyoskDomainAccessList]: + models = [] + errors = {} + for index, item in enumerate(set(value)): + if not isinstance(item, str): + errors[index] = f"Invalid type '{type(item)}', expected 'str'" + elif not re.search(constants.REGEX_DOMAIN_ACCESS_LIST_NAME, item): + errors[ + index + ] = f"Invalid format for value '{item}', must match '{constants.REGEX_DOMAIN_ACCESS_LIST_NAME}'" + else: + models.append(KeyoskDomainAccessList(name=item)) - @msh.validates("permissions") - def validate_permissions(self, data: List[Dict[str, Union[str, int]]], **kwargs): - names = [item["name"] for item in data] - if len(names) != len(set(names)): - raise msh.ValidationError("Duplicat permission names") + if errors: + raise msh.ValidationError(errors) - indexes = sorted([item["bitindex"] for item in data]) - for index in len(indexes): - if indexes[index - 1] != (index - 1): - raise msh.ValidationError( - f"Invalid bitindexes provided: expected zero-index sequential sequence, recieved {indexes}" - ) + return models @staticmethod - def creation_fields() -> List[str]: - return ["uuid", "created", "updated"] + def serialize_access_lists(obj: Dict[Any, Any]) -> List[str]: + return [item.name for item in obj["access_lists"]] + + @staticmethod + def deserialize_permissions(value: List[str]) -> List[KeyoskDomainPermission]: + models = [] + errors = {} + for index, item in enumerate(set(value)): + if not isinstance(item, str): + errors[index] = f"Invalid type '{type(item)}', expected 'str'" + elif not re.search(constants.REGEX_DOMAIN_PERMISSION_NAME, item): + errors[ + index + ] = f"Invalid format for value '{item}', must match '{constants.REGEX_DOMAIN_PERMISSION_NAME}'" + else: + models.append(KeyoskDomainPermission(name=item)) + + if errors: + raise msh.ValidationError(errors) + + return models + + @staticmethod + def serialize_permissions(obj: Dict[Any, Any]) -> List[str]: + return [item.name for item in obj["permissions"]] + + @staticmethod + @msh.post_load + def _make_model(data: Dict[str, Any], **kwargs) -> KeyoskDomain: + acls = [] + for item in data["access_lists"]: + item.domain_id = data["uuid"] + acls.append(item) + data["access_lists"] = acls + + permissions = [] + for item in data["permissions"]: + item.domain_id = data["uuid"] + permissions.append(item) + data["permissions"] = permissions + + return KeyoskDomain(**data) + + @staticmethod + @msh.pre_dump + def _unmake_model(data: KeyoskDomain, **kwargs) -> Dict[str, Any]: + return shortcuts.model_to_dict( + data, + recurse=False, + backrefs=True, + extra_attrs=["lifespan_access", "lifespan_refresh",], + ) + + @classmethod + def update(cls, uuid: Union[str, uuid.UUID], data: Dict[str, Any]) -> KeyoskDomain: + data.update({"uuid": UUID(str(uuid))}) + loaded = cls(exclude=["created", "updated"]).load(data) + loaded.updated = datetime.datetime.utcnow() + return loaded + + @classmethod + def create(cls, data: Dict[str, Any]) -> KeyoskDomain: + data.update({"uuid": uuid4()}) + loaded = cls(exclude=["created", "updated"]).load(data) + loaded.updated = datetime.datetime.utcnow() + loaded.created = datetime.datetime.utcnow() + return loaded diff --git a/keyosk/serializers/scope.py b/keyosk/serializers/scope.py new file mode 100644 index 0000000..73a7ded --- /dev/null +++ b/keyosk/serializers/scope.py @@ -0,0 +1,29 @@ +from typing import Any +from typing import Dict + +import marshmallow as msh +from playhouse import shortcuts + +from keyosk.database import KeyoskAccountScope + + +class AccountScopeSerializer(msh.Schema): + + access_list = msh.fields.String(required=True, data_key="access-list") + permission = msh.fields.String(required=True) + with_server_secret = msh.fields.Boolean( + required=True, data_key="with-server-secret" + ) + with_client_secret = msh.fields.Boolean( + required=True, data_key="with-client-secret" + ) + + @staticmethod + @msh.post_load + def _make_model(data: Dict[str, Any], **kwargs) -> KeyoskAccountScope: + return KeyoskAccountScope(**data) + + @staticmethod + @msh.pre_dump + def _unmake_model(data: KeyoskAccountScope, **kwargs) -> Dict[str, Any]: + return shortcuts.model_to_dict(data, recurse=False, backrefs=False,)