mirror of
https://github.com/enpaul/keyosk.git
synced 2025-01-13 18:43:32 +00:00
Refactor database models to better leverage foreign key relationships
This commit is contained in:
parent
d3306120d5
commit
440d6e68f4
@ -27,20 +27,20 @@ from keyosk import config
|
||||
from keyosk import datatypes
|
||||
from keyosk.database._shared import INTERFACE as interface
|
||||
from keyosk.database._shared import KeyoskBaseModel
|
||||
from keyosk.database.account import Account
|
||||
from keyosk.database.account_acl import AccountACLEntry
|
||||
from keyosk.database.domain import Domain
|
||||
from keyosk.database.domain import DomainAccessList
|
||||
from keyosk.database.domain import DomainPermission
|
||||
from keyosk.database.account import KeyoskAccount
|
||||
from keyosk.database.account import KeyoskAccountScope
|
||||
from keyosk.database.domain import KeyoskDomain
|
||||
from keyosk.database.domain import KeyoskDomainAccessList
|
||||
from keyosk.database.domain import KeyoskDomainPermission
|
||||
from keyosk.database.token import Token
|
||||
|
||||
|
||||
MODELS: List[Type[KeyoskBaseModel]] = [
|
||||
Account,
|
||||
DomainAccessList,
|
||||
DomainPermission,
|
||||
Domain,
|
||||
AccountACLEntry,
|
||||
KeyoskAccount,
|
||||
KeyoskDomain,
|
||||
KeyoskDomainAccessList,
|
||||
KeyoskDomainPermission,
|
||||
KeyoskAccountScope,
|
||||
Token,
|
||||
]
|
||||
|
||||
|
@ -12,10 +12,6 @@ separate modules, and for somewhat arbitrary reasons the base model was put here
|
||||
init function kept in init.
|
||||
"""
|
||||
import uuid
|
||||
from typing import Any
|
||||
from typing import Generator
|
||||
from typing import List
|
||||
from typing import Tuple
|
||||
|
||||
import peewee
|
||||
|
||||
@ -41,41 +37,5 @@ class KeyoskBaseModel(peewee.Model):
|
||||
null=False, unique=True, primary_key=True, default=uuid.uuid4
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def dict_keys() -> List[str]:
|
||||
"""Return tuple of attribute names that should be included in the dict form of the model
|
||||
Inteneded to be used in a dictionary comprehension; see the :meth:`__iter__` method for
|
||||
usage example.
|
||||
"""
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def foreign_ref() -> List[str]:
|
||||
"""Return tuple of attribute names that point to foreign key references on the model
|
||||
Intended for usage when recursively converting models into dictionaries ahead of
|
||||
serialization; see the :meth:`__iter__` method for usage example.
|
||||
|
||||
.. warning:: Foreign keys should only be included here when their attribute appears in the
|
||||
tuple returned from :meth:`dict_keys`
|
||||
"""
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def foreign_backref() -> List[str]:
|
||||
"""Return tuple of attribute names that point to foreign backreferences on the model
|
||||
Inteneded for usage when recursively converting models into dictionaries ahead of
|
||||
serialization; see the :meth:`__iter__` method for usage example.
|
||||
|
||||
.. warning:: Foreign keys should only be included here when their attribute appears in the
|
||||
tuple returned from :meth:`dict_keys`
|
||||
"""
|
||||
return []
|
||||
|
||||
def __iter__(self) -> Generator[Tuple[str, Any], None, None]:
|
||||
for key in self.dict_keys():
|
||||
if key in self.foreign_ref():
|
||||
yield key, dict(getattr(self, key))
|
||||
elif key in self.foreign_backref():
|
||||
yield key, [dict(item) for item in getattr(self, key)]
|
||||
else:
|
||||
yield key, getattr(self, key)
|
||||
def save_recursive(self, force_insert: bool = False):
|
||||
pass
|
||||
|
@ -1,17 +1,16 @@
|
||||
"""Authentication account model definition"""
|
||||
import datetime
|
||||
import json
|
||||
import secrets
|
||||
from typing import List
|
||||
|
||||
import passlib.hash
|
||||
import peewee
|
||||
|
||||
from keyosk.database._shared import KeyoskBaseModel
|
||||
from keyosk.database.domain import KeyoskDomainAccessList
|
||||
from keyosk.database.domain import KeyoskDomainPermission
|
||||
from keyosk.datatypes import Extras
|
||||
|
||||
|
||||
class Account(KeyoskBaseModel):
|
||||
class KeyoskAccount(KeyoskBaseModel):
|
||||
"""Authentication account storage model
|
||||
|
||||
:attribute created: Datetime indicating when the account was first created
|
||||
@ -47,52 +46,44 @@ class Account(KeyoskBaseModel):
|
||||
"""Set the extras dictionary"""
|
||||
self._extras = json.dumps(value)
|
||||
|
||||
def verify_client_set_secret(self, value: str) -> bool:
|
||||
"""Verify the client set secret matches a value
|
||||
|
||||
:param value: The string to check matches the client-set-secret
|
||||
:returns: Boolean indicating whether the provided value matches the encrypted
|
||||
secret
|
||||
"""
|
||||
return passlib.hash.pbkdf2_sha512.verify(
|
||||
value, self.encrypted_client_set_secret
|
||||
)
|
||||
|
||||
def verify_server_set_secret(self, value: str) -> bool:
|
||||
"""Verify the server set secret matches a value
|
||||
|
||||
:param value: The string to check matches the server-set-secret
|
||||
:returns: Boolean indicating whether the provided value matches the encrypted
|
||||
secret
|
||||
"""
|
||||
return passlib.hash.pbkdf2_sha512.verify(
|
||||
value, self.encrypted_server_set_secret
|
||||
)
|
||||
|
||||
def update_client_set_secret(self, value: str) -> None:
|
||||
"""Update the client set secret
|
||||
|
||||
:param value: The string to set the encrypted client-set-secret to
|
||||
"""
|
||||
self.encrypted_client_set_secret = passlib.hash.pbkdf2_sha512.hash(value)
|
||||
|
||||
def update_server_set_secret(self, length: int = 42) -> str:
|
||||
"""Update the server set secret
|
||||
|
||||
:param length: Optional length of the generated token
|
||||
:returns: The new value of the server set secret
|
||||
"""
|
||||
value = secrets.token_urlsafe(length)
|
||||
self.encrypted_server_set_secret = passlib.hash.pbkdf2_sha512.hash(value)
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def dict_keys() -> List[str]:
|
||||
return ["uuid", "created", "updated", "username", "enabled", "extras", "acls"]
|
||||
|
||||
@staticmethod
|
||||
def foreign_backref() -> List[str]:
|
||||
return ["acls"]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Account '{self.username}' ({self.uuid})"
|
||||
|
||||
|
||||
class KeyoskAccountScope(KeyoskBaseModel):
|
||||
"""Access control list entry model definition
|
||||
|
||||
:attribute account: Account the ACL entry applies to
|
||||
:attribute access_list: The access list the entry is for
|
||||
:attribute permission: The permission the entry is for
|
||||
:attribute with_server_secret: Whether the permission should be applied when the
|
||||
account authenticates with the account's
|
||||
server-set-secret
|
||||
:attribute with_client_secret: Whether the permission should be applied when the
|
||||
account authenticates with the account's
|
||||
client-set-secret
|
||||
|
||||
.. note:: Since permissions are by definition boolean, there is no need to store a
|
||||
value parameter with an ACL entry: if an entry exists for a given account
|
||||
for a given access list with a given permission, then that permission is
|
||||
granted on that access list to that account; similarly, if one does not
|
||||
exist then it is not granted.
|
||||
"""
|
||||
|
||||
class Meta: # pylint: disable=missing-docstring,too-few-public-methods
|
||||
table_name = "account_scope"
|
||||
|
||||
account = peewee.ForeignKeyField(
|
||||
KeyoskAccount, null=False, on_delete="CASCADE", backref="scopes"
|
||||
)
|
||||
access_list = peewee.ForeignKeyField(
|
||||
KeyoskDomainAccessList, null=False, on_delete="CASCADE"
|
||||
)
|
||||
permission = peewee.ForeignKeyField(
|
||||
KeyoskDomainPermission, null=False, on_delete="CASCADE"
|
||||
)
|
||||
with_server_secret = peewee.BooleanField(null=False)
|
||||
with_client_secret = peewee.BooleanField(null=False)
|
||||
|
||||
def __str__(self):
|
||||
return f"ACL {self.permission.name}@{self.access_list.name} (scope:{'+'.join([item for item in ['server' if self.with_server_secret else '', 'client' if self.with_client_secret else ''] if item])})"
|
||||
|
@ -1,52 +0,0 @@
|
||||
"""Account access control list entry model definition
|
||||
|
||||
Access Control Lists (ACLs) are entities that can have permissions assigned to them
|
||||
under certain conditions. Permissions are the possible permissions that can be granted-
|
||||
or not granted- to an ACL. An entry in an ACL comprises the ACL identifier, the
|
||||
permission to grant, and the identity that should be granted the permission.
|
||||
"""
|
||||
import peewee
|
||||
|
||||
from keyosk.database._shared import KeyoskBaseModel
|
||||
from keyosk.database.account import Account
|
||||
from keyosk.database.domain import DomainAccessList
|
||||
from keyosk.database.domain import DomainPermission
|
||||
|
||||
|
||||
class AccountACLEntry(KeyoskBaseModel):
|
||||
"""Access control list entry model definition
|
||||
|
||||
:attribute account: Account the ACL entry applies to
|
||||
:attribute access_list: The access list the entry is for
|
||||
:attribute permission: The permission the entry is for
|
||||
:attribute with_server_secret: Whether the permission should be applied when the
|
||||
account authenticates with the account's
|
||||
server-set-secret
|
||||
:attribute with_client_secret: Whether the permission should be applied when the
|
||||
account authenticates with the account's
|
||||
client-set-secret
|
||||
|
||||
.. note:: Since permissions are by definition boolean, there is no need to store a
|
||||
value parameter with an ACL entry: if an entry exists for a given account
|
||||
for a given access list with a given permission, then that permission is
|
||||
granted on that access list to that account; similarly, if one does not
|
||||
exist then it is not granted.
|
||||
"""
|
||||
|
||||
class Meta: # pylint: disable=missing-docstring,too-few-public-methods
|
||||
table_name = "account_acl"
|
||||
|
||||
account = peewee.ForeignKeyField(Account, backref="acls")
|
||||
access_list = peewee.ForeignKeyField(DomainAccessList)
|
||||
permission = peewee.ForeignKeyField(DomainPermission)
|
||||
with_server_secret = peewee.BooleanField(null=False)
|
||||
with_client_secret = peewee.BooleanField(null=False)
|
||||
|
||||
def __iter__(self):
|
||||
yield "access_list", self.access_list.name
|
||||
yield "permission", self.permission.name
|
||||
yield "with_server_secret", self.with_server_secret
|
||||
yield "with_client_secret", self.with_client_secret
|
||||
|
||||
def __str__(self):
|
||||
return f"ACL {self.permission.name}@{self.access_list.name} (scope:{'+'.join([item for item in ['server' if self.with_server_secret else '', 'client' if self.with_client_secret else ''] if item])})"
|
@ -1,13 +1,12 @@
|
||||
"""Authentication domain model definition"""
|
||||
import datetime
|
||||
from typing import List
|
||||
|
||||
import peewee
|
||||
|
||||
from keyosk.database._shared import KeyoskBaseModel
|
||||
|
||||
|
||||
class Domain(KeyoskBaseModel):
|
||||
class KeyoskDomain(KeyoskBaseModel):
|
||||
"""Authentication domain storage model
|
||||
|
||||
:attribute created: Datetime indicating when the domain was first created
|
||||
@ -53,11 +52,6 @@ class Domain(KeyoskBaseModel):
|
||||
_lifespan_access = peewee.IntegerField(null=False)
|
||||
_lifespan_refresh = peewee.IntegerField(null=False)
|
||||
|
||||
@property
|
||||
def access_list_names(self) -> List[str]:
|
||||
"""Return the list of access list names"""
|
||||
return [item.name for item in self.access_lists]
|
||||
|
||||
@property
|
||||
def lifespan_access(self) -> datetime.timedelta:
|
||||
"""Return the access lifespan as a timedelta"""
|
||||
@ -78,75 +72,25 @@ class Domain(KeyoskBaseModel):
|
||||
"""Set the refresh lifespan as an integer from a timedelta"""
|
||||
self._lifespan_refresh = int(value.total_seconds())
|
||||
|
||||
@staticmethod
|
||||
def dict_keys() -> List[str]:
|
||||
return [
|
||||
"uuid",
|
||||
"created",
|
||||
"updated",
|
||||
"name",
|
||||
"audience",
|
||||
"title",
|
||||
"description",
|
||||
"contact",
|
||||
"enabled",
|
||||
"enable_client_set_auth",
|
||||
"enable_server_set_auth",
|
||||
"enable_refresh",
|
||||
"lifespan_access",
|
||||
"lifespan_refresh",
|
||||
"access_list_names",
|
||||
"permissions",
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def foreign_backref() -> List[str]:
|
||||
return ["permissions"]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Domain '{self.name}' ({self.uuid})"
|
||||
|
||||
|
||||
class DomainAccessList(KeyoskBaseModel):
|
||||
"""Access list name model definition
|
||||
class KeyoskDomainAccessList(KeyoskBaseModel):
|
||||
class Meta: # pylint: disable=too-few-public-methods,missing-docstring
|
||||
table_name = "domain_access_list"
|
||||
|
||||
:attribute name: Name of the access control list
|
||||
:attribute domain: Authentication domain the ACL applies to
|
||||
"""
|
||||
|
||||
class Meta: # pylint: disable=missing-docstring,too-few-public-methods
|
||||
table_name = "domain_acl"
|
||||
|
||||
name = peewee.CharField(null=False, unique=True)
|
||||
domain = peewee.ForeignKeyField(Domain, backref="access_lists")
|
||||
|
||||
@staticmethod
|
||||
def dict_keys() -> List[str]:
|
||||
return ["name"]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
domain = peewee.ForeignKeyField(
|
||||
KeyoskDomain, null=False, on_delete="CASCADE", backref="access_lists"
|
||||
)
|
||||
name = peewee.CharField(null=False)
|
||||
|
||||
|
||||
class DomainPermission(KeyoskBaseModel):
|
||||
"""Permission name model definition
|
||||
|
||||
:attribute name: Name of the permission
|
||||
:attribute bitindex: Index in the generated bitmask that indicates this permission;
|
||||
zero-indexed
|
||||
:attribute domain: Authentication domain the permission should apply to the ACLs of
|
||||
"""
|
||||
|
||||
class Meta: # pylint: disable=missing-docstring,too-few-public-methods
|
||||
class KeyoskDomainPermission(KeyoskBaseModel):
|
||||
class Meta: # pylint: disable=too-few-public-methods,missing-docstring
|
||||
table_name = "domain_permission"
|
||||
|
||||
domain = peewee.ForeignKeyField(
|
||||
KeyoskDomain, null=False, on_delete="CASCADE", backref="permissions"
|
||||
)
|
||||
name = peewee.CharField(null=False)
|
||||
bitindex = peewee.IntegerField(null=False)
|
||||
domain = peewee.ForeignKeyField(Domain, backref="permissions")
|
||||
|
||||
@staticmethod
|
||||
def dict_keys() -> List[str]:
|
||||
return ["name", "bitindex"]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
@ -6,8 +6,8 @@ import peewee
|
||||
|
||||
from keyosk import datatypes
|
||||
from keyosk.database._shared import KeyoskBaseModel
|
||||
from keyosk.database.account import Account
|
||||
from keyosk.database.domain import Domain
|
||||
from keyosk.database.account import KeyoskAccount
|
||||
from keyosk.database.domain import KeyoskDomain
|
||||
|
||||
|
||||
class Token(KeyoskBaseModel):
|
||||
@ -33,22 +33,28 @@ class Token(KeyoskBaseModel):
|
||||
class Meta: # pylint: disable=missing-docstring,too-few-public-methods
|
||||
table_name = "token"
|
||||
|
||||
account = peewee.ForeignKeyField(Account, backref="tokens", null=True)
|
||||
domain = peewee.ForeignKeyField(Domain, backref="tokens", null=True)
|
||||
account = peewee.ForeignKeyField(
|
||||
KeyoskAccount, backref="tokens", null=True, on_delete="SET NULL"
|
||||
)
|
||||
domain = peewee.ForeignKeyField(
|
||||
KeyoskDomain, backref="tokens", null=True, on_delete="SET NULL"
|
||||
)
|
||||
issuer = peewee.CharField(null=False)
|
||||
issued = peewee.DateTimeField(null=False, default=datetime.datetime.utcnow)
|
||||
expires = peewee.DateTimeField(null=False)
|
||||
revoked = peewee.BooleanField(null=False)
|
||||
refresh = peewee.CharField(null=True)
|
||||
refresh_expires = peewee.DateTimeField(null=True)
|
||||
_claims = peewee.CharField(null=False)
|
||||
hash_publickey = peewee.CharField(null=False)
|
||||
hash_blacklist = peewee.CharField(null=False)
|
||||
_scopes = peewee.CharField(null=False, default="[]")
|
||||
|
||||
@property
|
||||
def claims(self) -> datatypes.TokenClaims:
|
||||
def scopes(self) -> datatypes.TokenClaims:
|
||||
"""Return the claims dictionary"""
|
||||
return json.loads(self._claims)
|
||||
return json.loads(self._scopes)
|
||||
|
||||
@claims.setter
|
||||
def claims(self, value: datatypes.TokenClaims):
|
||||
@scopes.setter
|
||||
def scopes(self, value: datatypes.TokenClaims):
|
||||
"""Set the claims dictionary"""
|
||||
self._claims = json.dumps(value)
|
||||
self._scopes = json.dumps(value)
|
||||
|
Loading…
Reference in New Issue
Block a user