diff --git a/invenio_rdm_records/ext.py b/invenio_rdm_records/ext.py index 0892b1d63e..906624332f 100644 --- a/invenio_rdm_records/ext.py +++ b/invenio_rdm_records/ext.py @@ -26,6 +26,8 @@ RDMCommunityRecordsResource, RDMCommunityRecordsResourceConfig, RDMDraftFilesResourceConfig, + RDMGrantsAccessResource, + RDMGrantUserAccessResourceConfig, RDMParentGrantsResource, RDMParentGrantsResourceConfig, RDMParentRecordLinksResource, @@ -223,6 +225,11 @@ def init_resource(self, app): config=RDMParentGrantsResourceConfig.build(app), ) + self.grant_user_access_resource = RDMGrantsAccessResource( + service=self.records_service, + config=RDMGrantUserAccessResourceConfig.build(app), + ) + # Record's communities self.record_communities_resource = RDMRecordCommunitiesResource( service=self.record_communities_service, diff --git a/invenio_rdm_records/resources/__init__.py b/invenio_rdm_records/resources/__init__.py index 6793ede577..de7dc93f84 100644 --- a/invenio_rdm_records/resources/__init__.py +++ b/invenio_rdm_records/resources/__init__.py @@ -12,6 +12,7 @@ IIIFResourceConfig, RDMCommunityRecordsResourceConfig, RDMDraftFilesResourceConfig, + RDMGrantUserAccessResourceConfig, RDMParentGrantsResourceConfig, RDMParentRecordLinksResourceConfig, RDMRecordCommunitiesResourceConfig, @@ -22,6 +23,7 @@ from .resources import ( IIIFResource, RDMCommunityRecordsResource, + RDMGrantsAccessResource, RDMParentGrantsResource, RDMParentRecordLinksResource, RDMRecordRequestsResource, @@ -35,7 +37,9 @@ "RDMCommunityRecordsResourceConfig", "RDMDraftFilesResourceConfig", "RDMParentGrantsResource", + "RDMGrantsAccessResource", "RDMParentGrantsResourceConfig", + "RDMGrantUserAccessResourceConfig", "RDMParentRecordLinksResource", "RDMParentRecordLinksResourceConfig", "RDMRecordCommunitiesResourceConfig", diff --git a/invenio_rdm_records/resources/config.py b/invenio_rdm_records/resources/config.py index ac7bd2a223..6fdfeee6c4 100644 --- a/invenio_rdm_records/resources/config.py +++ b/invenio_rdm_records/resources/config.py @@ -34,6 +34,7 @@ from ..services.errors import ( AccessRequestExistsError, + GrantExistsError, InvalidAccessRestrictions, RecordDeletedException, ReviewExistsError, @@ -357,7 +358,23 @@ class RDMDraftMediaFilesResourceConfig(FileResourceConfig, ConfiguratorMixin): { LookupError: create_error_handler( HTTPJSONException(code=404, description="No grant found with the given ID.") - ) + ), + GrantExistsError: create_error_handler( + lambda e: HTTPJSONException( + code=400, + description=e.description, + ) + ), + } +) + +user_access_error_handlers = RecordResourceConfig.error_handlers.copy() + +user_access_error_handlers.update( + { + LookupError: create_error_handler( + HTTPJSONException(code=404, description="No grant found by given user id.") + ), } ) @@ -399,8 +416,8 @@ class RDMParentGrantsResourceConfig(RecordResourceConfig, ConfiguratorMixin): url_prefix = "/records//access" routes = { - "list": "/users", - "item": "/users/", + "list": "/grants", + "item": "/grants/", } links_config = {} @@ -421,6 +438,37 @@ class RDMParentGrantsResourceConfig(RecordResourceConfig, ConfiguratorMixin): error_handlers = grants_error_handlers +class RDMGrantUserAccessResourceConfig(RecordResourceConfig, ConfiguratorMixin): + """Record grants user access resource configuration.""" + + blueprint_name = "record_user_access" + + url_prefix = "/records//access" + + routes = { + "item": "/users/", + "list": "/users", + } + + links_config = {} + + request_view_args = { + "pid_value": ma.fields.Str(), + "subject_id": ma.fields.Str(), # user id + } + + grant_subject_type = "user" + + response_handlers = { + "application/vnd.inveniordm.v1+json": RecordResourceConfig.response_handlers[ + "application/json" + ], + **RecordResourceConfig.response_handlers, + } + + error_handlers = user_access_error_handlers + + # # Community's records # diff --git a/invenio_rdm_records/resources/resources.py b/invenio_rdm_records/resources/resources.py index 53121355f9..bc06423db4 100644 --- a/invenio_rdm_records/resources/resources.py +++ b/invenio_rdm_records/resources/resources.py @@ -628,6 +628,80 @@ def search(self): return items.to_dict(), 200 +class RDMGrantsAccessResource(RecordResource): + """Users and groups grant access resource.""" + + def create_url_rules(self): + """Create the URL rules for the record resource.""" + + def p(route_name): + """Prefix a route with the URL prefix.""" + return f"{self.config.url_prefix}{self.config.routes[route_name]}" + + return [ + route("GET", p("item"), self.read), + route("DELETE", p("item"), self.delete), + route("GET", p("list"), self.search), + route("PATCH", p("item"), self.partial_update), + ] + + @request_extra_args + @request_view_args + @response_handler() + def read(self): + """Read an access grant for a record by subject.""" + item = self.service.access.read_grant_by_subject( + identity=g.identity, + id_=resource_requestctx.view_args["pid_value"], + subject_id=resource_requestctx.view_args["subject_id"], + subject_type=self.config.grant_subject_type, + expand=resource_requestctx.args.get("expand", False), + ) + + return item.to_dict(), 200 + + @request_view_args + def delete(self): + """Delete an access grant for a record by subject.""" + self.service.access.delete_grant_by_subject( + identity=g.identity, + id_=resource_requestctx.view_args["pid_value"], + subject_id=resource_requestctx.view_args["subject_id"], + subject_type=self.config.grant_subject_type, + ) + return "", 204 + + @request_extra_args + @request_search_args + @request_view_args + @response_handler(many=True) + def search(self): + """List access grants for a record by subject type.""" + items = self.service.access.read_all_grants_by_subject( + identity=g.identity, + id_=resource_requestctx.view_args["pid_value"], + subject_type=self.config.grant_subject_type, + expand=resource_requestctx.args.get("expand", False), + ) + return items.to_dict(), 200 + + @request_extra_args + @request_view_args + @request_data + @response_handler() + def partial_update(self): + """Patch access grant for a record by subject.""" + item = self.service.access.update_grant_by_subject( + identity=g.identity, + id_=resource_requestctx.view_args["pid_value"], + subject_id=resource_requestctx.view_args["subject_id"], + subject_type=self.config.grant_subject_type, + data=resource_requestctx.data, + expand=resource_requestctx.args.get("expand", False), + ) + return item.to_dict(), 200 + + # # Community's records # diff --git a/invenio_rdm_records/services/access/service.py b/invenio_rdm_records/services/access/service.py index c11736985e..23b7c4a78c 100644 --- a/invenio_rdm_records/services/access/service.py +++ b/invenio_rdm_records/services/access/service.py @@ -34,7 +34,7 @@ from ...requests.access import AccessRequestToken, GuestAccessRequest, UserAccessRequest from ...secret_links.errors import InvalidPermissionLevelError -from ..errors import AccessRequestExistsError +from ..errors import AccessRequestExistsError, GrantExistsError from ..results import GrantSubjectExpandableField @@ -365,6 +365,13 @@ def create_grant(self, identity, id_, data, expand=False, uow=None): data, context={"identity": identity}, raise_errors=True ) + for grant in parent.access.grants: + if ( + grant.subject_id == data["subject"]["id"] + and grant.subject_type == data["subject"]["type"] + ): + raise GrantExistsError() + # Creation grant = parent.access.grants.create( subject_type=data["subject"]["type"], @@ -756,3 +763,139 @@ def update_access_settings( record, links_tpl=self.links_item_tpl, ) + + # TODO: rework the whole service and move these to a separate one: + # https://github.com/inveniosoftware/invenio-rdm-records/issues/1685 + def read_grant_by_subject( + self, identity, id_, subject_id, subject_type, expand=False + ): + """Read a specific access grant of a record by subject.""" + record, parent = self.get_parent_and_record_or_draft(id_) + + # Permissions + self.require_permission(identity, "manage", record=record) + + result = None + for grant in parent.access.grants: + if grant.subject_id == subject_id and grant.subject_type == subject_type: + result = grant + + if not result: + raise LookupError(subject_id) + + return self.grant_result_item( + self, + identity, + result, + expand=expand, + ) + + def read_all_grants_by_subject(self, identity, id_, subject_type, expand=False): + """Read access grants of a record (resp. its parent) by subject type.""" + record, parent = self.get_parent_and_record_or_draft(id_) + + # Permissions + self.require_permission(identity, "manage", record=record) + + user_grants = [] + for grant in parent.access.grants: + if grant.subject_type == subject_type: + user_grants.append(grant) + + # Fetching + return self.grant_result_list( + service=self, + identity=identity, + results=user_grants, + expand=expand, + ) + + @unit_of_work() + def update_grant_by_subject( + self, + identity, + id_, + subject_id, + subject_type, + data, + expand=False, + uow=None, + ): + """Update access grant for a record (resp. its parent) by subject.""" + record, parent = self.get_parent_and_record_or_draft(id_) + + # Permissions + self.require_permission(identity, "manage", record=record) + + # Fetching (required for parts of the validation) + grant_index = None + for grant in parent.access.grants: + if grant.subject_id == subject_id and grant.subject_type == subject_type: + grant_index = parent.access.grants.index(grant) + + if grant_index is None: + raise LookupError(subject_id) + + old_grant = parent.access.grants[grant_index] + data = { + "permission": data.get("permission", old_grant.permission), + "subject": { + "type": data.get("subject", {}).get("type", old_grant.subject_type), + "id": data.get("subject", {}).get("id", old_grant.subject_id), + }, + "origin": data.get("origin", old_grant.origin), + } + + # Validation + data, __ = self.schema_grant.load( + data, context={"identity": identity}, raise_errors=True + ) + + # Update + try: + new_grant = parent.access.grants.grant_cls.create( + origin=data["origin"], + permission=data["permission"], + subject_type=data["subject"]["type"], + subject_id=data["subject"]["id"], + resolve_subject=True, + ) + except LookupError: + raise ValidationError( + _("Could not find the specified subject."), field_name="subject.id" + ) + + parent.access.grants[grant_index] = new_grant + + uow.register(ParentRecordCommitOp(parent, indexer_context=dict(service=self))) + + return self.grant_result_item( + self, + identity, + new_grant, + expand=expand, + ) + + @unit_of_work() + def delete_grant_by_subject( + self, identity, id_, subject_id, subject_type, uow=None + ): + """Delete an access grant for a record by subject.""" + record, parent = self.get_parent_and_record_or_draft(id_) + + # Permissions + self.require_permission(identity, "manage", record=record) + + # Deletion + result = None + for grant in parent.access.grants: + if grant.subject_id == subject_id and grant.subject_type == subject_type: + result = grant + parent.access.grants.remove(grant) + + if not result: + raise LookupError(subject_id) + + uow.register(ParentRecordCommitOp(parent, indexer_context=dict(service=self))) + + return True diff --git a/invenio_rdm_records/services/config.py b/invenio_rdm_records/services/config.py index e9298c8413..3cfbd88416 100644 --- a/invenio_rdm_records/services/config.py +++ b/invenio_rdm_records/services/config.py @@ -428,6 +428,7 @@ class RDMRecordServiceConfig(RecordServiceConfig, ConfiguratorMixin): ), "versions": RecordLink("{+api}/records/{id}/versions"), "access_links": RecordLink("{+api}/records/{id}/access/links"), + "access_grants": RecordLink("{+api}/records/{id}/access/grants"), "access_users": RecordLink("{+api}/records/{id}/access/users"), "access_request": RecordLink("{+api}/records/{id}/access/request"), "access": RecordLink("{+api}/records/{id}/access"), diff --git a/invenio_rdm_records/services/errors.py b/invenio_rdm_records/services/errors.py index f34af33b82..5f08c920ef 100644 --- a/invenio_rdm_records/services/errors.py +++ b/invenio_rdm_records/services/errors.py @@ -15,6 +15,12 @@ class RDMRecordsException(Exception): """Base exception for RDMRecords errors.""" +class GrantExistsError(RDMRecordsException): + """Exception raised when trying to create a grant that already exists for user/role.""" + + description = _("Grant for this user/role already exists within this record.") + + class RecordDeletedException(RDMRecordsException): """Exception denoting that the record was deleted.""" diff --git a/invenio_rdm_records/services/permissions.py b/invenio_rdm_records/services/permissions.py index 7d1ec78090..bebb79e05e 100644 --- a/invenio_rdm_records/services/permissions.py +++ b/invenio_rdm_records/services/permissions.py @@ -64,6 +64,7 @@ class RDMRecordPermissionPolicy(RecordPermissionPolicy): can_manage = [ RecordOwners(), RecordCommunitiesAction("curate"), + AccessGrant("manage"), SystemProcess(), ] can_curate = can_manage + [AccessGrant("edit"), SecretLinks("edit")] diff --git a/invenio_rdm_records/services/schemas/parent/access.py b/invenio_rdm_records/services/schemas/parent/access.py index b957438143..ac0dd3b1dd 100644 --- a/invenio_rdm_records/services/schemas/parent/access.py +++ b/invenio_rdm_records/services/schemas/parent/access.py @@ -61,7 +61,7 @@ class SecretLink(Schema): class Agent(Schema): """An agent schema.""" - user = fields.Integer(required=True) + user = fields.String(required=True) class AccessSettingsSchema(Schema): diff --git a/invenio_rdm_records/services/services.py b/invenio_rdm_records/services/services.py index 1cebe079d9..b433a68142 100644 --- a/invenio_rdm_records/services/services.py +++ b/invenio_rdm_records/services/services.py @@ -86,6 +86,7 @@ def expandable_fields(self): return [ EntityResolverExpandableField("parent.review.receiver"), ParentCommunitiesExpandableField("parent.communities.default"), + EntityResolverExpandableField("parent.access.owned_by"), ] @property diff --git a/invenio_rdm_records/views.py b/invenio_rdm_records/views.py index b68ab7baca..e91ef4f471 100644 --- a/invenio_rdm_records/views.py +++ b/invenio_rdm_records/views.py @@ -80,6 +80,12 @@ def create_parent_grants_bp(app): return ext.parent_grants_resource.as_blueprint() +def create_grant_user_access_bp(app): + """Create grant user access blueprint.""" + ext = app.extensions["invenio-rdm-records"] + return ext.grant_user_access_resource.as_blueprint() + + def create_pid_resolver_resource_bp(app): """Create pid resource blueprint.""" ext = app.extensions["invenio-rdm-records"] diff --git a/setup.cfg b/setup.cfg index 4ff8799533..9fc6559cf4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -86,6 +86,7 @@ invenio_base.api_blueprints = invenio_rdm_records_ext = invenio_rdm_records.views:blueprint invenio_rdm_records_parent_links = invenio_rdm_records.views:create_parent_record_links_bp invenio_rdm_records_parent_grants = invenio_rdm_records.views:create_parent_grants_bp + invenio_rdm_records_user_access = invenio_rdm_records.views:create_grant_user_access_bp invenio_rdm_records_record_files = invenio_rdm_records.views:create_record_files_bp invenio_rdm_records_record_media_files = invenio_rdm_records.views:create_record_media_files_bp invenio_rdm_community_records = invenio_rdm_records.views:create_community_records_bp diff --git a/tests/resources/test_resources_user_access.py b/tests/resources/test_resources_user_access.py new file mode 100644 index 0000000000..493620f096 --- /dev/null +++ b/tests/resources/test_resources_user_access.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-RDM-Records is free software; you can redistribute it and/or modify +# it under the terms of the MIT License; see LICENSE file for more details. + +"""Tests for RDMGrantsAccessResource.""" +import json + + +def test_read_by_subject_found( + running_app, client_with_login, minimal_record, headers, community_owner +): + """Test read grant by user id.""" + # create record + client = client_with_login + response = client.post("/records", data=json.dumps(minimal_record), headers=headers) + assert response.status_code == 201 + recid = response.json["id"] + user_id = community_owner.id + + # create grant + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + response = client.post( + f"/records/{recid}/access/grants", json=grant_payload, headers=headers + ) + assert response.status_code == 201 + + # read grant + response = client.get(f"/records/{recid}/access/users/{user_id}", headers=headers) + assert response.status_code == 200 + assert response.json["permission"] == "preview" + assert response.json["subject"]["id"] == user_id + assert response.json["subject"]["type"] == "user" + + +def test_read_by_subject_not_found( + running_app, client_with_login, minimal_record, headers, community_owner +): + """Test read grant by user id. Not found.""" + # create record + client = client_with_login + response = client.post("/records", data=json.dumps(minimal_record), headers=headers) + assert response.status_code == 201 + recid = response.json["id"] + user_id = community_owner.id + + # create grant + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + response = client.post( + f"/records/{recid}/access/grants", json=grant_payload, headers=headers + ) + assert response.status_code == 201 + + # read grant with different user id + response = client.get(f"/records/{recid}/access/users/3", headers=headers) + assert response.status_code == 404 + assert response.json["message"] == "No grant found by given user id." + + +def test_delete_by_subject_found( + running_app, client_with_login, minimal_record, headers, community_owner +): + """Test delete grant by user id.""" + # create record + client = client_with_login + response = client.post("/records", data=json.dumps(minimal_record), headers=headers) + assert response.status_code == 201 + recid = response.json["id"] + user_id = community_owner.id + + # create grant + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + response = client.post( + f"/records/{recid}/access/grants", json=grant_payload, headers=headers + ) + assert response.status_code == 201 + + # read grant + response = client.get(f"/records/{recid}/access/users/{user_id}", headers=headers) + assert response.status_code == 200 + + # delete grant + response = client.delete( + f"/records/{recid}/access/users/{user_id}", headers=headers + ) + assert response.status_code == 204 + + # read grant + response = client.get(f"/records/{recid}/access/users/{user_id}", headers=headers) + assert response.status_code == 404 + assert response.json["message"] == "No grant found by given user id." + + +def test_delete_by_subject_id_not_found( + running_app, client_with_login, minimal_record, headers +): + """Test delete grant by user id. Not found.""" + # create record + client = client_with_login + response = client.post("/records", data=json.dumps(minimal_record), headers=headers) + assert response.status_code == 201 + recid = response.json["id"] + + # delete grant + response = client.delete(f"/records/{recid}/access/users/2", headers=headers) + assert response.status_code == 404 + assert response.json["message"] == "No grant found by given user id." + + +def test_search_grants_by_subject( + running_app, client_with_login, minimal_record, headers, community_owner, curator +): + """Test get all user grants for record.""" + # create record + client = client_with_login + response = client.post("/records", data=json.dumps(minimal_record), headers=headers) + assert response.status_code == 201 + recid = response.json["id"] + + user_id = community_owner.id + user_id2 = curator.id + + # create user grants + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "edit", + } + grant_payload2 = { + "subject": {"type": "user", "id": user_id2}, + "permission": "preview", + } + response = client.post( + f"/records/{recid}/access/grants", json=grant_payload2, headers=headers + ) + assert response.status_code == 201 + + response = client.post( + f"/records/{recid}/access/grants", json=grant_payload, headers=headers + ) + assert response.status_code == 201 + + # search user grants + response = client.get(f"/records/{recid}/access/users", headers=headers) + assert response.status_code == 200 + assert response.json["hits"]["total"] == 2 + assert response.json["hits"]["hits"][0]["subject"]["type"] == "user" + assert response.json["hits"]["hits"][0]["subject"]["id"] == user_id2 + assert response.json["hits"]["hits"][1]["subject"]["type"] == "user" + assert response.json["hits"]["hits"][1]["subject"]["id"] == user_id + + +def test_search_grants_by_subject_not_found( + running_app, client_with_login, minimal_record, headers +): + """Test get all user grants for record. Not found.""" + # create record + client = client_with_login + response = client.post("/records", data=json.dumps(minimal_record), headers=headers) + assert response.status_code == 201 + recid = response.json["id"] + + # search user grants + response = client.get(f"/records/{recid}/access/users", headers=headers) + assert response.status_code == 200 + assert response.json["hits"]["total"] == 0 + + +def test_patch_grants_by_subject( + running_app, client_with_login, minimal_record, headers, community_owner +): + """Test partial update of user grants for record.""" + # create record + client = client_with_login + response = client.post("/records", data=json.dumps(minimal_record), headers=headers) + assert response.status_code == 201 + recid = response.json["id"] + user_id = community_owner.id + + # create user grants + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + "origin": "origin", + } + response = client.post( + f"/records/{recid}/access/grants", json=grant_payload, headers=headers + ) + assert response.status_code == 201 + + # update grant + payload = { + "permission": "manage", + } + response = client.patch( + f"/records/{recid}/access/users/{user_id}", json=payload, headers=headers + ) + assert response.status_code == 200 + + # read grant + response = client.get(f"/records/{recid}/access/users/{user_id}", headers=headers) + assert response.status_code == 200 + assert response.json["permission"] == "manage" + + +def test_patch_grants_by_subject_not_found( + running_app, client_with_login, minimal_record, headers +): + """Test partial update of user grants for record. Not found""" + # create record + client = client_with_login + response = client.post("/records", data=json.dumps(minimal_record), headers=headers) + assert response.status_code == 201 + recid = response.json["id"] + + # update grant + payload = { + "permission": "manage", + } + response = client.patch( + f"/records/{recid}/access/users/55", json=payload, headers=headers + ) + assert response.status_code == 404 + assert response.json["message"] == "No grant found by given user id." diff --git a/tests/resources/test_serialized_links.py b/tests/resources/test_serialized_links.py index 08d8b1fa89..0f36a39aaf 100644 --- a/tests/resources/test_serialized_links.py +++ b/tests/resources/test_serialized_links.py @@ -74,6 +74,7 @@ def test_draft_links(client, draft_json, minimal_record, headers): "requests": f"https://127.0.0.1:5000/api/records/{pid_value}/requests", # noqa "access": f"https://127.0.0.1:5000/api/records/{pid_value}/access", "access_request": f"https://127.0.0.1:5000/api/records/{pid_value}/access/request", + "access_grants": f"https://127.0.0.1:5000/api/records/{pid_value}/access/grants", "access_users": f"https://127.0.0.1:5000/api/records/{pid_value}/access/users", } assert expected_links == created_draft_links == read_draft_links @@ -114,6 +115,7 @@ def test_record_links(client, published_json, headers): "requests": f"https://127.0.0.1:5000/api/records/{pid_value}/requests", # noqa "access": f"https://127.0.0.1:5000/api/records/{pid_value}/access", "access_request": f"https://127.0.0.1:5000/api/records/{pid_value}/access/request", + "access_grants": f"https://127.0.0.1:5000/api/records/{pid_value}/access/grants", "access_users": f"https://127.0.0.1:5000/api/records/{pid_value}/access/users", } assert expected_links == published_record_links == read_record_links diff --git a/tests/services/test_service_access.py b/tests/services/test_service_access.py new file mode 100644 index 0000000000..cc66b5dc04 --- /dev/null +++ b/tests/services/test_service_access.py @@ -0,0 +1,470 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-RDM-Records is free software; you can redistribute it and/or modify +# it under the terms of the MIT License; see LICENSE file for more details. + +"""Tests for RecordAccessService.""" +import pytest +from invenio_records_resources.services.errors import PermissionDeniedError + +from invenio_rdm_records.proxies import current_rdm_records +from invenio_rdm_records.services.errors import GrantExistsError + + +def test_cant_create_multiple_grants_for_same_user(running_app, minimal_record, users): + """Test that grant for the same record can not be created more than 1 time for a certain user.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create grant + user_id = str(users[0].id) + access_service = records_service.access + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + + grant = access_service.create_grant(superuser_identity, record.id, grant_payload) + assert grant.to_dict() == { + "permission": "preview", + "subject": {"id": user_id, "type": "user"}, + "origin": None, + } + + # try to create again + with pytest.raises(GrantExistsError): + access_service.create_grant(superuser_identity, record.id, grant_payload) + + +def test_cant_create_multiple_grants_for_same_role(running_app, minimal_record, roles): + """Test that grant for the same record can not be created more than 1 time for a certain role.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create grant + access_service = records_service.access + grant_payload = {"subject": {"type": "role", "id": "test"}, "permission": "preview"} + + grant = access_service.create_grant(superuser_identity, record.id, grant_payload) + assert grant.to_dict() == { + "permission": "preview", + "subject": {"id": "test", "type": "role"}, + "origin": None, + } + + # try to create again + with pytest.raises(GrantExistsError): + access_service.create_grant(superuser_identity, record.id, grant_payload) + + +def test_read_grant_by_subjectid_found(running_app, minimal_record, users): + """Test read grant by user id.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create grant + subject_type = "user" + access_service = records_service.access + user_id = str(users[0].id) + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # read grant + grant = access_service.read_grant_by_subject( + superuser_identity, record.id, user_id, subject_type + ) + assert grant.to_dict() == { + "permission": "preview", + "subject": {"id": user_id, "type": "user"}, + "origin": None, + } + + +def test_read_grant_by_subjectid_not_found(running_app, minimal_record, users): + """Test read grant by user id. Not found.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create grant for user + subject_type = "user" + user_id = str(users[0].id) + access_service = records_service.access + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # try to read grant for different user + access_service = records_service.access + with pytest.raises(LookupError): + access_service.read_grant_by_subject( + superuser_identity, record.id, "10000000", subject_type + ) + + +def test_read_grant_by_subjectid_no_grants(running_app, minimal_record, users): + """Test read grant by user id. No grants.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # try to read grant + subject_type = "user" + user_id = str(users[0].id) + access_service = records_service.access + with pytest.raises(LookupError): + access_service.read_grant_by_subject( + superuser_identity, record.id, user_id, subject_type + ) + + +def test_read_grant_by_subjectid_provide_role(running_app, minimal_record, roles): + """Test read grant by user id while searching by role""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create grant for role "test" + subject_type = "user" + access_service = records_service.access + grant_payload = {"subject": {"type": "role", "id": "test"}, "permission": "preview"} + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # try to read grant by role "test" + access_service = records_service.access + with pytest.raises(LookupError): + access_service.read_grant_by_subject( + superuser_identity, record.id, "test", subject_type + ) + + +def test_delete_grant_by_subjectid_found(running_app, minimal_record, users): + """Test delete grant by user id.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create grant + subject_type = "user" + access_service = records_service.access + user_id = str(users[0].id) + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # delete grant + grant = access_service.delete_grant_by_subject( + superuser_identity, record.id, user_id, subject_type + ) + assert grant is True + + # try to read grant + with pytest.raises(LookupError): + access_service.read_grant_by_subject( + superuser_identity, record.id, user_id, subject_type + ) + + +def test_delete_grant_by_subjectid_not_found(running_app, minimal_record, users): + """Test delete grant by user id. Not found.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create grant for user + subject_type = "user" + user_id = str(users[0].id) + access_service = records_service.access + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # try to delete grant for different user + access_service = records_service.access + with pytest.raises(LookupError): + access_service.delete_grant_by_subject( + superuser_identity, record.id, "10000000", subject_type + ) + + +def test_delete_grant_by_subjectid_no_grants(running_app, minimal_record, users): + """Test delete grant by user id. No grants.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # try to read grant + subject_type = "user" + access_service = records_service.access + with pytest.raises(LookupError): + access_service.delete_grant_by_subject( + superuser_identity, record.id, str(users[0].id), subject_type + ) + + +def test_delete_grant_by_subjectid_provide_role(running_app, minimal_record, roles): + """Test delete grant by user id while searching by role""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create grant for role "test" + subject_type = "user" + access_service = records_service.access + grant_payload = {"subject": {"type": "role", "id": "test"}, "permission": "preview"} + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # try to read grant by role "test" + access_service = records_service.access + with pytest.raises(LookupError): + access_service.delete_grant_by_subject( + superuser_identity, record.id, "test", subject_type + ) + + +def test_read_all_grants_by_subject_found(running_app, minimal_record, users, roles): + """Test read user grants.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create user grants + subject_type = "user" + access_service = records_service.access + user_id = str(users[0].id) + user_id2 = str(users[1].id) + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + } + grant_payload2 = { + "subject": {"type": "user", "id": user_id2}, + "permission": "preview", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + access_service.create_grant(superuser_identity, record.id, grant_payload2) + + # create role grants + grant_payload = { + "subject": {"type": "role", "id": "test"}, + "permission": "preview", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # search user grants + grants = access_service.read_all_grants_by_subject( + superuser_identity, record.id, subject_type + ) + assert grants.to_dict()["hits"]["total"] == 2 + assert grants.to_dict()["hits"]["hits"][0]["subject"]["type"] == "user" + assert grants.to_dict()["hits"]["hits"][0]["subject"]["id"] == user_id + assert grants.to_dict()["hits"]["hits"][1]["subject"]["type"] == "user" + assert grants.to_dict()["hits"]["hits"][1]["subject"]["id"] == user_id2 + + +def test_read_all_grants_by_subject_not_found( + running_app, minimal_record, users, roles +): + """Test read user grants. Not found.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + access_service = records_service.access + + # create role grants + grant_payload = { + "subject": {"type": "role", "id": "test"}, + "permission": "preview", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # search user grants + subject_type = "user" + grants = access_service.read_all_grants_by_subject( + superuser_identity, record.id, subject_type + ) + assert grants.to_dict()["hits"]["total"] == 0 + + +def test_update_grant_by_subject_found(running_app, minimal_record, users): + """Test partial update of user grant.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create user grant + subject_type = "user" + access_service = records_service.access + user_id = str(users[0].id) + grant_payload = { + "subject": {"type": "user", "id": user_id}, + "permission": "preview", + "origin": "origin", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # update user grant + payload = {"permission": "manage"} + result = access_service.update_grant_by_subject( + superuser_identity, + id_=record.id, + subject_id=user_id, + subject_type=subject_type, + data=payload, + ) + assert result.to_dict()["permission"] == "manage" + + # read grant + grant = access_service.read_grant_by_subject( + superuser_identity, record.id, user_id, subject_type + ) + assert grant.to_dict()["permission"] == "manage" + + +def test_update_grant_by_subject_not_found(running_app, minimal_record, roles): + """Test partial update of user grant. Not found.""" + # create record + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(superuser_identity, minimal_record) + record = records_service.publish(superuser_identity, draft.id) + + # create role grant + access_service = records_service.access + grant_payload = { + "subject": {"type": "role", "id": "test"}, + "permission": "preview", + "origin": "origin", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # update user grant + payload = {"permission": "manage"} + subject_type = "user" + with pytest.raises(LookupError): + access_service.update_grant_by_subject( + superuser_identity, + id_=record.id, + subject_id="test", + subject_type=subject_type, + data=payload, + ) + + +def test_update_grant_by_subject_permissions( + running_app, minimal_record, uploader, users, test_user +): + """Test permissions when you partially update a user grant.""" + # create record (owner - uploader) + records_service = current_rdm_records.records_service + draft = records_service.create(uploader.identity, minimal_record) + record = records_service.publish(uploader.identity, draft.id) + + # grant manage access to user + subject_type = "user" + access_service = records_service.access + user_with_grant = test_user + user_with_grant_id = user_with_grant.id + grant_payload = { + "subject": {"type": subject_type, "id": user_with_grant_id}, + "permission": "manage", + "origin": "origin", + } + access_service.create_grant( + running_app.superuser_identity, record.id, grant_payload + ) + + # assert that user can manage record - they can create a new version + res = records_service.new_version(user_with_grant.identity, id_=record.id) + assert res.data["versions"]["index"] == 2 + + # update user grant - view access + payload = {"permission": "view"} + result = access_service.update_grant_by_subject( + uploader.identity, + id_=record.id, + subject_id=user_with_grant_id, + subject_type=subject_type, + data=payload, + ) + assert result.to_dict()["permission"] == "view" + + # assert that now user can not create a new version + with pytest.raises(PermissionDeniedError): + records_service.new_version(user_with_grant.identity, id_=record.id) + + +def test_delete_grant_by_subject_permissions( + running_app, minimal_record, uploader, users, test_user +): + """Test permissions when you delete a user grant.""" + # create record (owner - uploader) + superuser_identity = running_app.superuser_identity + records_service = current_rdm_records.records_service + draft = records_service.create(uploader.identity, minimal_record) + record = records_service.publish(uploader.identity, draft.id) + + # grant manage access to user + subject_type = "user" + access_service = records_service.access + user_with_grant = test_user + user_with_grant_id = user_with_grant.id + grant_payload = { + "subject": {"type": subject_type, "id": user_with_grant_id}, + "permission": "manage", + "origin": "origin", + } + access_service.create_grant(superuser_identity, record.id, grant_payload) + + # assert that user can manage record - they can create a new version + res = records_service.new_version(user_with_grant.identity, id_=record.id) + assert res.data["versions"]["index"] == 2 + + # delete user grant + access_service.delete_grant_by_subject( + superuser_identity, record.id, user_with_grant_id, subject_type + ) + + # assert that now user can not create a new version + with pytest.raises(PermissionDeniedError): + records_service.new_version(user_with_grant.identity, id_=record.id)