diff --git a/serveradmin/serverdb/query_committer.py b/serveradmin/serverdb/query_committer.py index 889b05b3..4c419f38 100644 --- a/serveradmin/serverdb/query_committer.py +++ b/serveradmin/serverdb/query_committer.py @@ -5,12 +5,15 @@ import logging from itertools import chain +from typing import Optional +from django.contrib.auth.models import User from django.core.exceptions import PermissionDenied, ValidationError from django.db import IntegrityError, transaction from adminapi.dataset import DatasetCommit from adminapi.request import json_encode_extra +from serveradmin.apps.models import Application from serveradmin.serverdb.models import ( Servertype, Attribute, @@ -53,9 +56,6 @@ def __init__(self, message, newer=None): def commit_query(created=[], changed=[], deleted=[], app=None, user=None): """The main function to commit queries""" - if not user: - user = app.owner - pre_commit.send_robust( commit_query, created=created, changed=changed, deleted=deleted ) @@ -310,19 +310,19 @@ def _upsert_attributes(attribute_lookup, changed, changed_servers): def _access_control( - user, app, unchanged_objects, - created_objects, changed_objects, deleted_objects, -): + user: Optional[User], app: Optional[Application], unchanged_objects: dict, + created_objects: dict, changed_objects: dict, deleted_objects: dict, +) -> None: """Enforce serveradmin ACLs - For servershell commits, ensure the user is allowed to make the requested - changes. For adminapi commits, ensure both the user and its app are - allowed to make the requested changes. + For Servershell commits, ensure the user is allowed to make the requested + changes. For adminapi commits, ensure both the app is allowed to make the + requested changes. - Serveradmin ACLs are additive. This means not all ACL must allow all the - changes a user is trying to make, but a single ACL is enough. + Serveradmin ACLs are additive. This means not all ACLs must allow all the + changes a user or app is trying to make, but a single ACL is enough. - Note: Different ACLs may be be used to permit different parts of a commit. + Note: Different ACLs may be used to permit different parts of a commit. The commit is checked per object changed in the commit. As a result at least all changes to a single object within a commit must be permissible via a single ACL. @@ -335,15 +335,19 @@ def _access_control( Returns None on success. """ + # superusers and apps can not violate permissions. + if (user and user.is_superuser) or (app and app.superuser): + return None + entities = [] - if not user.is_superuser: - entities.append( - ('user', user, list(user.access_control_groups.all())) - ) - if app and not app.superuser: - entities.append( - ('application', app, list(app.access_control_groups.all())) - ) + if app: + entities.append(('application', app, list(app.access_control_groups.all()))) + elif user: + entities.append(('user', user, list(user.access_control_groups.all()))) + else: + # This should not be possible as it means not authenticated but better + # safe than sorry. + raise PermissionDenied('Missing authentication!') # Check all objects touched by this commit for obj in chain( @@ -351,8 +355,7 @@ def _access_control( changed_objects.values(), deleted_objects.values(), ): - # Check both the user and, if applicable, the users app - # If either doesn't have the necessary rights, abort the commit + # Check app or if not present user permissions for entity_class, entity_name, groups in entities: acl_violations = { acl: _acl_violations(unchanged_objects, obj, acl) @@ -394,6 +397,12 @@ def _acl_violations(changed_objects, obj, acl): # Check wether the object matches all the attribute filters of the ACL for attribute_id, attribute_filter in acl.get_filters().items(): + # TODO: This relies on the object to have all attributes that are + # present in the attribute_filter which currently works because + # we fetch all attributes in commit_query (joined_attribtues). + # This method would be better of not relying on the caller + # making sure passing down all relevant attributes which isn't + # even documented. if not attribute_filter.matches(obj.get(attribute_id)): violations.append( 'Object is not covered by ACL "{}", Attribute "{}" ' diff --git a/serveradmin/serverdb/tests/test_acls.py b/serveradmin/serverdb/tests/test_acls.py new file mode 100644 index 00000000..7bd1d460 --- /dev/null +++ b/serveradmin/serverdb/tests/test_acls.py @@ -0,0 +1,494 @@ +from django.contrib.auth.models import User +from django.core.exceptions import PermissionDenied +from django.test import TransactionTestCase + +from serveradmin.access_control.models import AccessControlGroup +from serveradmin.apps.models import Application +from serveradmin.dataset import Query +from serveradmin.serverdb import query_committer +from serveradmin.serverdb.models import Attribute + + +class ACLTestCase(TransactionTestCase): + """Test Permissions are evaluated properly + + Some of the tests might be a redundant but since the amount of test cases + is manageable, and we really want to be sure here that the ACLs behave as + we expect them we just test most of them instead of just a few. + + """ + + fixtures = ["auth_user.json", "test_dataset.json"] + + def test_deny_if_not_authenticated(self): + with self.assertRaises(PermissionDenied) as error: + # Trying to commit without being authenticated must not be possible. + query_committer._access_control(None, None, {}, {}, {}, {}) + self.assertEqual("Missing authentication!", str(error.exception)) + + def test_permit_if_superuser_app(self): + user = User.objects.first() + app = Application.objects.create( + name="superuser test", + app_id="superuser test", + auth_token="secret", + owner=user, + location="test", + ) + + self.assertIsNone(query_committer._access_control(None, app, {}, {}, {}, {})) + + def test_permit_if_superuser(self): + user = User.objects.first() + user.is_superuser = True + + self.assertIsNone(query_committer._access_control(user, None, {}, {}, {}, {})) + + def test_deny_if_app_acl_does_not_cover_object(self): + user = User.objects.first() + app = Application.objects.create( + name="superuser test", + app_id="superuser test", + auth_token="secret", + owner=user, + location="test", + ) + acl = AccessControlGroup.objects.create(name="app test", query="servertype=vm") + acl.applications.add(app) + acl.save() + + unchanged_object = Query({"object_id": 1}, ["os", "hostname"]).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + with self.assertRaises(PermissionDenied) as error: + query_committer._access_control( + None, app, unchanged_objects, {}, changed_objects, {} + ) + self.assertEqual( + 'Insufficient access rights to object "test0" for application ' + '"superuser test": Object is not covered by ACL "app test", ' + 'Attribute "servertype" does not match the filter "\'vm\'".', + str(error.exception), + ) + + def test_deny_if_user_acl_does_not_cover_object(self): + user = User.objects.first() + user.is_superuser = False + + acl = AccessControlGroup.objects.create(name="app test", query="servertype=vm") + acl.members.add(user) + acl.save() + + unchanged_object = Query({"object_id": 1}, ["os", "hostname"]).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + with self.assertRaises(PermissionDenied) as error: + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + self.assertEqual( + 'Insufficient access rights to object "test0" for user ' + '"hannah.acker": Object is not covered by ACL "app test", ' + 'Attribute "servertype" does not match the filter "\'vm\'".', + str(error.exception), + ) + + def test_permit_if_app_acl_covers_object(self): + user = User.objects.first() + app = Application.objects.create( + name="superuser test", + app_id="superuser test", + auth_token="secret", + owner=user, + location="test", + ) + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=False + ) + acl.applications.add(app) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + self.assertIsNone( + query_committer._access_control( + None, app, unchanged_objects, {}, changed_objects, {} + ) + ) + + def test_permit_if_user_acl_covers_object(self): + user = User.objects.first() + user.is_superuser = False + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=False + ) + acl.members.add(user) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + self.assertIsNone( + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + ) + + def test_deny_if_app_acl_whitelist_does_not_list_attribute(self): + user = User.objects.first() + app = Application.objects.create( + name="superuser test", + app_id="superuser test", + auth_token="secret", + owner=user, + location="test", + ) + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=True + ) + acl.applications.add(app) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + with self.assertRaises(PermissionDenied) as error: + query_committer._access_control( + None, app, unchanged_objects, {}, changed_objects, {} + ) + self.assertEqual( + 'Insufficient access rights to object "test0" for application ' + '"superuser test": Change is not covered by ACL "app test", ' + 'Attribute "os" was modified despite not beeing whitelisted.', + str(error.exception), + ) + + def test_deny_if_user_acl_whitelist_does_not_list_attribute(self): + user = User.objects.first() + user.is_superuser = False + + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=True + ) + acl.members.add(user) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + with self.assertRaises(PermissionDenied) as error: + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + self.assertEqual( + 'Insufficient access rights to object "test0" for user ' + '"hannah.acker": Change is not covered by ACL "app test", ' + 'Attribute "os" was modified despite not beeing whitelisted.', + str(error.exception), + ) + + def test_permit_if_app_acl_whitelist_lists_attribute(self): + user = User.objects.first() + app = Application.objects.create( + name="superuser test", + app_id="superuser test", + auth_token="secret", + owner=user, + location="test", + ) + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=True + ) + acl.applications.add(app) + acl.attributes.add(Attribute.objects.get(attribute_id="os")) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + self.assertIsNone( + query_committer._access_control( + None, app, unchanged_objects, {}, changed_objects, {} + ) + ) + + def test_permit_if_user_acl_whitelist_lists_attribute(self): + user = User.objects.first() + user.is_superuser = False + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=True + ) + acl.members.add(user) + acl.attributes.add(Attribute.objects.get(attribute_id="os")) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + self.assertIsNone( + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + ) + + def test_deny_if_app_acl_blacklist_lists_attribute(self): + user = User.objects.first() + app = Application.objects.create( + name="superuser test", + app_id="superuser test", + auth_token="secret", + owner=user, + location="test", + ) + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=False + ) + acl.applications.add(app) + acl.attributes.add(Attribute.objects.get(attribute_id="os")) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + with self.assertRaises(PermissionDenied) as error: + query_committer._access_control( + None, app, unchanged_objects, {}, changed_objects, {} + ) + self.assertEqual( + 'Insufficient access rights to object "test0" for application ' + '"superuser test": Change is not covered by ACL "app test", ' + 'Attribute "os" was modified despite not beeing whitelisted.', + str(error.exception), + ) + + def test_deny_if_user_acl_blacklist_lists_attribute(self): + user = User.objects.first() + user.is_superuser = False + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=False + ) + acl.members.add(user) + acl.attributes.add(Attribute.objects.get(attribute_id="os")) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + with self.assertRaises(PermissionDenied) as error: + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + self.assertEqual( + 'Insufficient access rights to object "test0" for user ' + '"hannah.acker": Change is not covered by ACL "app test", ' + 'Attribute "os" was modified despite not beeing whitelisted.', + str(error.exception), + ) + + def test_permit_if_app_acl_blacklist_misses_attribute(self): + user = User.objects.first() + app = Application.objects.create( + name="superuser test", + app_id="superuser test", + auth_token="secret", + owner=user, + location="test", + ) + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=False + ) + acl.applications.add(app) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + self.assertIsNone( + query_committer._access_control( + None, app, unchanged_objects, {}, changed_objects, {} + ) + ) + + def test_permit_if_user_acl_blacklist_misses_attribute(self): + user = User.objects.first() + user.is_superuser = False + acl = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=False + ) + acl.members.add(user) + acl.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query({"object_id": 1}, ["os", "hostname", "servertype"]).get() + changed_object["os"] = "bookworm" + changed_objects = {changed_object["object_id"]: changed_object} + + self.assertIsNone( + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + ) + + def test_deny_if_multiple_app_acls_cover_one_object_change_set(self): + # One ACL must cover all changes made to one object. Changes to one + # object being covered by multiple ACLs is not allowed. + + user = User.objects.first() + app = Application.objects.create( + name="superuser test", + app_id="superuser test", + auth_token="secret", + owner=user, + location="test", + ) + + acl_1 = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=True + ) + acl_1.applications.add(app) + acl_1.attributes.add(Attribute.objects.get(attribute_id="os")) + acl_1.save() + + acl_2 = AccessControlGroup.objects.create( + name="app test 2", query="servertype=test0", is_whitelist=True + ) + acl_2.applications.add(app) + acl_2.attributes.add(Attribute.objects.get(attribute_id="database")) + acl_2.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "database", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query( + {"object_id": 1}, ["os", "database", "hostname", "servertype"] + ).get() + changed_object["os"] = "bookworm" + changed_object["database"] = "bingo" + changed_objects = {changed_object["object_id"]: changed_object} + + with self.assertRaises(PermissionDenied) as error: + query_committer._access_control( + None, app, unchanged_objects, {}, changed_objects, {} + ) + self.assertEqual( + 'Insufficient access rights to object "test0" for application ' + '"superuser test": Change is not covered by ACL "app test", ' + 'Attribute "database" was modified despite not beeing whitelisted.' + 'Change is not covered by ACL "app test 2", Attribute "os" was ' + "modified despite not beeing whitelisted.", + str(error.exception), + ) + + def test_deny_if_multiple_user_acls_cover_one_object_change_set(self): + # One ACL must cover all changes made to one object. Changes to one + # object being covered by multiple ACLs is not allowed. + + user = User.objects.first() + user.is_superuser = False + + acl_1 = AccessControlGroup.objects.create( + name="app test", query="servertype=test0", is_whitelist=True + ) + acl_1.members.add(user) + acl_1.attributes.add(Attribute.objects.get(attribute_id="os")) + acl_1.save() + + acl_2 = AccessControlGroup.objects.create( + name="app test 2", query="servertype=test0", is_whitelist=True + ) + acl_1.members.add(user) + acl_2.attributes.add(Attribute.objects.get(attribute_id="database")) + acl_2.save() + + unchanged_object = Query( + {"object_id": 1}, ["os", "database", "hostname", "servertype"] + ).get() + unchanged_objects = {unchanged_object["object_id"]: unchanged_object} + + changed_object = Query( + {"object_id": 1}, ["os", "database", "hostname", "servertype"] + ).get() + changed_object["os"] = "bookworm" + changed_object["database"] = "bingo" + changed_objects = {changed_object["object_id"]: changed_object} + + with self.assertRaises(PermissionDenied) as error: + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + self.assertEqual( + 'Insufficient access rights to object "test0" for user ' + '"hannah.acker": Change is not covered by ACL "app test", ' + 'Attribute "database" was modified despite not beeing ' + "whitelisted.", + str(error.exception), + )