Update tests to use new database schema layout

This commit is contained in:
Ethan Paul 2020-03-09 00:14:29 -04:00
parent ce20868b49
commit e5b4de799c
5 changed files with 87 additions and 157 deletions

View File

@ -11,4 +11,4 @@ class KeyoskDomainAccessList(KeyoskBaseModel):
domain = peewee.ForeignKeyField( domain = peewee.ForeignKeyField(
KeyoskDomain, null=False, on_delete="CASCADE", backref="access_lists" KeyoskDomain, null=False, on_delete="CASCADE", backref="access_lists"
) )
name = peewee.CharField(null=False) name = peewee.CharField(null=False, unique=True)

View File

@ -7,11 +7,11 @@ import pytest
from keyosk import config from keyosk import config
from keyosk import database from keyosk import database
from keyosk.database import Account from keyosk.database import KeyoskAccount
from keyosk.database import AccountACLEntry from keyosk.database import KeyoskAccountScope
from keyosk.database import Domain from keyosk.database import KeyoskDomain
from keyosk.database import DomainAccessList from keyosk.database import KeyoskDomainAccessList
from keyosk.database import DomainPermission from keyosk.database import KeyoskDomainPermission
@contextlib.contextmanager @contextlib.contextmanager
@ -42,7 +42,7 @@ def demo_database(request, tmp_path_factory):
tmp_path = _pytest.tmpdir._mk_tmp(request, tmp_path_factory) tmp_path = _pytest.tmpdir._mk_tmp(request, tmp_path_factory)
accounts = [ accounts = [
Account( KeyoskAccount(
username="lskywalker", username="lskywalker",
encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash( encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash(
"xWingLuvr4evA" "xWingLuvr4evA"
@ -55,7 +55,7 @@ def demo_database(request, tmp_path_factory):
"jedi": True, "jedi": True,
}, },
), ),
Account( KeyoskAccount(
username="dvader", username="dvader",
encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash( encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash(
"nobodyKnowsIKilledAllTheYounglings" "nobodyKnowsIKilledAllTheYounglings"
@ -68,14 +68,14 @@ def demo_database(request, tmp_path_factory):
"jedi": False, "jedi": False,
}, },
), ),
Account( KeyoskAccount(
username="hsolo", username="hsolo",
encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash("landosux"), encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash("landosux"),
encrypted_server_set_secret=passlib.hash.pbkdf2_sha512.hash("12ab34cd"), encrypted_server_set_secret=passlib.hash.pbkdf2_sha512.hash("12ab34cd"),
enabled=True, enabled=True,
extras={"full-name": "Han Solo", "homeworld": "Corellia", "jedi": False,}, extras={"full-name": "Han Solo", "homeworld": "Corellia", "jedi": False,},
), ),
Account( KeyoskAccount(
username="deusexmachina", username="deusexmachina",
encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash( encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash(
"whenyouneedsomethingtosavetheday:whoyagonnacall" "whenyouneedsomethingtosavetheday:whoyagonnacall"
@ -90,14 +90,14 @@ def demo_database(request, tmp_path_factory):
"species": None, "species": None,
}, },
), ),
Account( KeyoskAccount(
username="jack.oneill@airforce.gov", username="jack.oneill@airforce.gov",
encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash("topgun"), encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash("topgun"),
encrypted_server_set_secret=passlib.hash.pbkdf2_sha512.hash("987654321"), encrypted_server_set_secret=passlib.hash.pbkdf2_sha512.hash("987654321"),
enabled=True, enabled=True,
extras={"rank": "colonel", "species": "human",}, extras={"rank": "colonel", "species": "human",},
), ),
Account( KeyoskAccount(
username="tealc@airforce.gov", username="tealc@airforce.gov",
encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash( encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash(
"yourloginpassword" "yourloginpassword"
@ -106,7 +106,7 @@ def demo_database(request, tmp_path_factory):
enabled=True, enabled=True,
extras={"rank": None, "species": "jaffa"}, extras={"rank": None, "species": "jaffa"},
), ),
Account( KeyoskAccount(
username="jonas.quinn@airforce.gov", username="jonas.quinn@airforce.gov",
encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash( encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash(
"d7409ed1dd0a485b8e09f7147ad0e3ab" "d7409ed1dd0a485b8e09f7147ad0e3ab"
@ -118,7 +118,7 @@ def demo_database(request, tmp_path_factory):
] ]
domains = [ domains = [
Domain( KeyoskDomain(
name="star-wars", name="star-wars",
audience="stwr", audience="stwr",
title="Star Wars (by Disney)", title="Star Wars (by Disney)",
@ -131,7 +131,7 @@ def demo_database(request, tmp_path_factory):
lifespan_access=datetime.timedelta(minutes=30), lifespan_access=datetime.timedelta(minutes=30),
lifespan_refresh=datetime.timedelta(days=30), lifespan_refresh=datetime.timedelta(days=30),
), ),
Domain( KeyoskDomain(
name="stargate", name="stargate",
audience="sg1", audience="sg1",
title="Stargate SG-1", title="Stargate SG-1",
@ -148,132 +148,136 @@ def demo_database(request, tmp_path_factory):
with sqlite_database(tmp_path): with sqlite_database(tmp_path):
with database.interface.atomic(): with database.interface.atomic():
Account.bulk_create(accounts) KeyoskAccount.bulk_create(accounts)
Domain.bulk_create(domains) KeyoskDomain.bulk_create(domains)
starwars = Domain.get(Domain.name == "star-wars") starwars = KeyoskDomain.get(KeyoskDomain.name == "star-wars")
stargate = Domain.get(Domain.name == "stargate") stargate = KeyoskDomain.get(KeyoskDomain.name == "stargate")
access_lists = [ access_lists = [
DomainAccessList(name="imperial-star-destroyer", domain=starwars), KeyoskDomainAccessList(name="imperial-star-destroyer", domain=starwars),
DomainAccessList(name="millenium-falcon", domain=starwars), KeyoskDomainAccessList(name="millenium-falcon", domain=starwars),
DomainAccessList(name="x-wing", domain=starwars), KeyoskDomainAccessList(name="x-wing", domain=starwars),
DomainAccessList(name="nebulon-b", domain=starwars), KeyoskDomainAccessList(name="nebulon-b", domain=starwars),
DomainAccessList(name="p90", domain=stargate), KeyoskDomainAccessList(name="p90", domain=stargate),
DomainAccessList(name="staff-weapon", domain=stargate), KeyoskDomainAccessList(name="staff-weapon", domain=stargate),
DomainAccessList(name="zatniktel", domain=stargate), KeyoskDomainAccessList(name="zatniktel", domain=stargate),
] ]
permissions = [ permissions = [
DomainPermission(name="access", bitindex=0, domain=starwars), KeyoskDomainPermission(name="access", bitindex=0, domain=starwars),
DomainPermission(name="fly", bitindex=1, domain=starwars), KeyoskDomainPermission(name="fly", bitindex=1, domain=starwars),
DomainPermission(name="attack", bitindex=2, domain=starwars), KeyoskDomainPermission(name="attack", bitindex=2, domain=starwars),
DomainPermission(name="own", bitindex=0, domain=stargate), KeyoskDomainPermission(name="own", bitindex=0, domain=stargate),
DomainPermission(name="fire", bitindex=1, domain=stargate), KeyoskDomainPermission(name="fire", bitindex=1, domain=stargate),
DomainPermission(name="reload", bitindex=2, domain=stargate), KeyoskDomainPermission(name="reload", bitindex=2, domain=stargate),
DomainPermission(name="repair", bitindex=3, domain=stargate), KeyoskDomainPermission(name="repair", bitindex=3, domain=stargate),
] ]
with database.interface.atomic(): with database.interface.atomic():
DomainAccessList.bulk_create(access_lists) KeyoskDomainAccessList.bulk_create(access_lists)
DomainPermission.bulk_create(permissions) KeyoskDomainPermission.bulk_create(permissions)
deusexmachina = Account.get(Account.username == "deusexmachina") deusexmachina = KeyoskAccount.get(KeyoskAccount.username == "deusexmachina")
lskywalker = Account.get(Account.username == "lskywalker") lskywalker = KeyoskAccount.get(KeyoskAccount.username == "lskywalker")
jackoneill = Account.get(Account.username == "jack.oneill@airforce.gov") jackoneill = KeyoskAccount.get(
KeyoskAccount.username == "jack.oneill@airforce.gov"
sw_isd = DomainAccessList.get(
DomainAccessList.name == "imperial-star-destroyer"
) )
sg_zatniktel = DomainAccessList.get(DomainAccessList.name == "zatniktel")
sw_access = DomainPermission.get(DomainPermission.name == "access") sw_isd = KeyoskDomainAccessList.get(
sw_fly = DomainPermission.get(DomainPermission.name == "fly") KeyoskDomainAccessList.name == "imperial-star-destroyer"
sw_attack = DomainPermission.get(DomainPermission.name == "attack") )
sg_own = DomainPermission.get(DomainPermission.name == "own") sg_zatniktel = KeyoskDomainAccessList.get(
sg_fire = DomainPermission.get(DomainPermission.name == "fire") KeyoskDomainAccessList.name == "zatniktel"
sg_reload = DomainPermission.get(DomainPermission.name == "reload") )
sg_repair = DomainPermission.get(DomainPermission.name == "repair")
sw_access = KeyoskDomainPermission.get(KeyoskDomainPermission.name == "access")
sw_fly = KeyoskDomainPermission.get(KeyoskDomainPermission.name == "fly")
sw_attack = KeyoskDomainPermission.get(KeyoskDomainPermission.name == "attack")
sg_own = KeyoskDomainPermission.get(KeyoskDomainPermission.name == "own")
sg_fire = KeyoskDomainPermission.get(KeyoskDomainPermission.name == "fire")
sg_reload = KeyoskDomainPermission.get(KeyoskDomainPermission.name == "reload")
sg_repair = KeyoskDomainPermission.get(KeyoskDomainPermission.name == "repair")
acls = [ acls = [
AccountACLEntry( KeyoskAccountScope(
account=deusexmachina, account=deusexmachina,
access_list=sw_isd, access_list=sw_isd,
permission=sw_access, permission=sw_access,
with_server_secret=True, with_server_secret=True,
with_client_secret=False, with_client_secret=False,
), ),
AccountACLEntry( KeyoskAccountScope(
account=deusexmachina, account=deusexmachina,
access_list=sw_isd, access_list=sw_isd,
permission=sw_fly, permission=sw_fly,
with_server_secret=True, with_server_secret=True,
with_client_secret=False, with_client_secret=False,
), ),
AccountACLEntry( KeyoskAccountScope(
account=deusexmachina, account=deusexmachina,
access_list=sw_isd, access_list=sw_isd,
permission=sw_attack, permission=sw_attack,
with_server_secret=True, with_server_secret=True,
with_client_secret=False, with_client_secret=False,
), ),
AccountACLEntry( KeyoskAccountScope(
account=deusexmachina, account=deusexmachina,
access_list=sg_zatniktel, access_list=sg_zatniktel,
permission=sg_own, permission=sg_own,
with_server_secret=True, with_server_secret=True,
with_client_secret=False, with_client_secret=False,
), ),
AccountACLEntry( KeyoskAccountScope(
account=deusexmachina, account=deusexmachina,
access_list=sg_zatniktel, access_list=sg_zatniktel,
permission=sg_fire, permission=sg_fire,
with_server_secret=True, with_server_secret=True,
with_client_secret=False, with_client_secret=False,
), ),
AccountACLEntry( KeyoskAccountScope(
account=deusexmachina, account=deusexmachina,
access_list=sg_zatniktel, access_list=sg_zatniktel,
permission=sg_reload, permission=sg_reload,
with_server_secret=True, with_server_secret=True,
with_client_secret=False, with_client_secret=False,
), ),
AccountACLEntry( KeyoskAccountScope(
account=deusexmachina, account=deusexmachina,
access_list=sg_zatniktel, access_list=sg_zatniktel,
permission=sg_repair, permission=sg_repair,
with_server_secret=True, with_server_secret=True,
with_client_secret=False, with_client_secret=False,
), ),
AccountACLEntry( KeyoskAccountScope(
account=lskywalker, account=lskywalker,
access_list=sw_isd, access_list=sw_isd,
permission=sw_attack, permission=sw_attack,
with_server_secret=True, with_server_secret=True,
with_client_secret=True, with_client_secret=True,
), ),
AccountACLEntry( KeyoskAccountScope(
account=lskywalker, account=lskywalker,
access_list=sw_isd, access_list=sw_isd,
permission=sw_access, permission=sw_access,
with_server_secret=True, with_server_secret=True,
with_client_secret=False, with_client_secret=False,
), ),
AccountACLEntry( KeyoskAccountScope(
account=jackoneill, account=jackoneill,
access_list=sg_zatniktel, access_list=sg_zatniktel,
permission=sg_fire, permission=sg_fire,
with_server_secret=True, with_server_secret=True,
with_client_secret=True, with_client_secret=True,
), ),
AccountACLEntry( KeyoskAccountScope(
account=jackoneill, account=jackoneill,
access_list=sg_zatniktel, access_list=sg_zatniktel,
permission=sg_reload, permission=sg_reload,
with_server_secret=True, with_server_secret=True,
with_client_secret=True, with_client_secret=True,
), ),
AccountACLEntry( KeyoskAccountScope(
account=jackoneill, account=jackoneill,
access_list=sg_zatniktel, access_list=sg_zatniktel,
permission=sg_repair, permission=sg_repair,
@ -283,6 +287,6 @@ def demo_database(request, tmp_path_factory):
] ]
with database.interface.atomic(): with database.interface.atomic():
AccountACLEntry.bulk_create(acls) KeyoskAccountScope.bulk_create(acls)
yield yield

View File

@ -8,31 +8,16 @@ from fixtures import demo_database
from keyosk import database from keyosk import database
def test_meta():
for key in database.Account.dict_keys():
assert hasattr(database.Account, key)
attr = getattr(database.Account, key)
if key in database.Account.foreign_ref():
assert isinstance(attr, peewee.ForeignKeyField)
else:
assert not isinstance(attr, peewee.ForeignKeyField)
if key in database.Account.foreign_backref():
assert isinstance(attr, peewee.BackrefAccessor)
else:
assert not isinstance(attr, peewee.BackrefAccessor)
def test_formatting(demo_database): def test_formatting(demo_database):
for account in database.Account.select(): for account in database.KeyoskAccount.select():
assert list(dict(account).keys()) == database.Account.dict_keys()
assert str(account.uuid) in str(account) assert str(account.uuid) in str(account)
assert account.username in str(account) assert account.username in str(account)
def test_extras(demo_database): def test_extras(demo_database):
account = database.Account.get(database.Account.username == "lskywalker") account = database.KeyoskAccount.get(
database.KeyoskAccount.username == "lskywalker"
)
new_extras = {"foo": "bar", "fizz": "buzz", "baz": False, "blop": 1234.567} new_extras = {"foo": "bar", "fizz": "buzz", "baz": False, "blop": 1234.567}
@ -40,34 +25,14 @@ def test_extras(demo_database):
with database.interface.atomic(): with database.interface.atomic():
account.save() account.save()
account = database.Account.get(database.Account.username == "lskywalker") account = database.KeyoskAccount.get(
database.KeyoskAccount.username == "lskywalker"
)
assert account.extras == new_extras assert account.extras == new_extras
def test_crypto(demo_database):
account = database.Account.get(
database.Account.username == "jack.oneill@airforce.gov"
)
account.update_client_set_secret("oneillWithTwoLs")
with database.interface.atomic():
account.save()
account = database.Account.get(
database.Account.username == "jack.oneill@airforce.gov"
)
assert account.verify_client_set_secret("oneillWithTwoLs")
new_autopass = account.update_server_set_secret()
with database.interface.atomic():
account.save()
account = database.Account.get(
database.Account.username == "jack.oneill@airforce.gov"
)
assert account.verify_server_set_secret(new_autopass)
def test_unique(demo_database): def test_unique(demo_database):
new_base = database.Account( new_base = database.KeyoskAccount(
username="garbage", username="garbage",
encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash("garbage"), encrypted_client_set_secret=passlib.hash.pbkdf2_sha512.hash("garbage"),
encrypted_server_set_secret=passlib.hash.pbkdf2_sha512.hash("garbage"), encrypted_server_set_secret=passlib.hash.pbkdf2_sha512.hash("garbage"),
@ -75,7 +40,7 @@ def test_unique(demo_database):
extras={"gar": "bage"}, extras={"gar": "bage"},
) )
vader = database.Account.get(database.Account.username == "dvader") vader = database.KeyoskAccount.get(database.KeyoskAccount.username == "dvader")
unique = ["username"] unique = ["username"]
nonunique = ["extras"] nonunique = ["extras"]
@ -91,13 +56,13 @@ def test_unique(demo_database):
# create gives me that integrity error I'm after # create gives me that integrity error I'm after
with pytest.raises(peewee.IntegrityError): with pytest.raises(peewee.IntegrityError):
with database.interface.atomic(): with database.interface.atomic():
database.Account.bulk_create([new]) database.KeyoskAccount.bulk_create([new])
for item in nonunique: for item in nonunique:
new = copy.deepcopy(new_base) new = copy.deepcopy(new_base)
setattr(new, item, getattr(vader, item)) setattr(new, item, getattr(vader, item))
with database.interface.atomic(): with database.interface.atomic():
database.Account.bulk_create([new]) database.KeyoskAccount.bulk_create([new])
with database.interface.atomic(): with database.interface.atomic():
new.delete_instance() new.delete_instance()

View File

@ -1,12 +0,0 @@
import json
import peewee
from fixtures import demo_database
from keyosk import database
def test_formatting(demo_database):
for acl in database.AccountACLEntry.select():
assert dict(acl) == json.loads(json.dumps(dict(acl)))
assert str(acl.uuid) not in str(acl)

View File

@ -8,41 +8,14 @@ from fixtures import demo_database
from keyosk import database from keyosk import database
def test_meta():
models = [database.Domain, database.DomainAccessList, database.DomainPermission]
for model in models:
for key in model.dict_keys():
assert hasattr(model, key)
attr = getattr(model, key)
if key in model.foreign_ref():
assert isinstance(attr, peewee.ForeignKeyField)
else:
assert not isinstance(attr, peewee.ForeignKeyField)
if key in model.foreign_backref():
assert isinstance(attr, peewee.BackrefAccessor)
else:
assert not isinstance(attr, peewee.BackrefAccessor)
def test_formatting(demo_database): def test_formatting(demo_database):
for domain in database.Domain.select(): for domain in database.KeyoskDomain.select():
assert list(dict(domain).keys()) == database.Domain.dict_keys()
assert str(domain.uuid) in str(domain) assert str(domain.uuid) in str(domain)
assert domain.name in str(domain) assert domain.name in str(domain)
for permission in database.DomainPermission.select():
assert list(dict(permission).keys()) == database.DomainPermission.dict_keys()
assert str(permission.uuid) not in str(permission)
for access_list in database.DomainAccessList.select():
assert list(dict(access_list).keys()) == database.DomainAccessList.dict_keys()
assert str(access_list.uuid) not in str(access_list)
def test_unique(demo_database): def test_unique(demo_database):
new_base = database.Domain( new_base = database.KeyoskDomain(
name="garbage", name="garbage",
audience="garbage", audience="garbage",
title="garbage", title="garbage",
@ -56,7 +29,7 @@ def test_unique(demo_database):
lifespan_refresh=datetime.timedelta(days=30), lifespan_refresh=datetime.timedelta(days=30),
) )
starwars = database.Domain.get(database.Domain.name == "star-wars") starwars = database.KeyoskDomain.get(database.KeyoskDomain.name == "star-wars")
unique = ["name", "audience"] unique = ["name", "audience"]
nonunique = ["title", "description", "contact"] nonunique = ["title", "description", "contact"]
@ -66,26 +39,26 @@ def test_unique(demo_database):
setattr(new, item, getattr(starwars, item)) setattr(new, item, getattr(starwars, item))
with pytest.raises(peewee.IntegrityError): with pytest.raises(peewee.IntegrityError):
with database.interface.atomic(): with database.interface.atomic():
database.Domain.bulk_create([new]) database.KeyoskDomain.bulk_create([new])
for item in nonunique: for item in nonunique:
new = copy.deepcopy(new_base) new = copy.deepcopy(new_base)
setattr(new, item, getattr(starwars, item)) setattr(new, item, getattr(starwars, item))
with database.interface.atomic(): with database.interface.atomic():
database.Domain.bulk_create([new]) database.KeyoskDomain.bulk_create([new])
with database.interface.atomic(): with database.interface.atomic():
new.delete_instance() new.delete_instance()
def test_unique_access_lists(demo_database): def test_unique_access_lists(demo_database):
new_base = database.DomainAccessList( new_base = database.KeyoskDomainAccessList(
name="imperial-star-destroyer", name="imperial-star-destroyer",
domain=database.Domain.get(database.Domain.name == "star-wars"), domain=database.KeyoskDomain.get(database.KeyoskDomain.name == "star-wars"),
) )
isd = database.DomainAccessList.get( isd = database.KeyoskDomainAccessList.get(
database.DomainAccessList.name == "imperial-star-destroyer" database.KeyoskDomainAccessList.name == "imperial-star-destroyer"
) )
unique = ["name"] unique = ["name"]
@ -95,4 +68,4 @@ def test_unique_access_lists(demo_database):
setattr(new, item, getattr(isd, item)) setattr(new, item, getattr(isd, item))
with pytest.raises(peewee.IntegrityError): with pytest.raises(peewee.IntegrityError):
with database.interface.atomic(): with database.interface.atomic():
database.DomainAccessList.bulk_create([new]) database.KeyoskDomainAccessList.bulk_create([new])