Skip to content

Commit

Permalink
SCKAN-323 feat: Add permission check to notes
Browse files Browse the repository at this point in the history
  • Loading branch information
afonsobspinto committed Oct 24, 2024
1 parent d9607e6 commit 6586e23
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 39 deletions.
65 changes: 64 additions & 1 deletion backend/composer/api/permissions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from rest_framework import permissions
from composer.enums import CSState
from rest_framework.exceptions import PermissionDenied
from composer.models import ConnectivityStatement, Sentence



# Permission Checks: Only staff users can update a Connectivity Statement when it is in state exported
Expand Down Expand Up @@ -47,4 +50,64 @@ def has_object_permission(self, request, view, obj):
return True

# Write permissions are only allowed to the owner of the related ConnectivityStatement
return obj.connectivity_statement.owner == request.user
return obj.connectivity_statement.owner == request.user


class IsSentenceOrStatementOwnerOrSystemUserOrReadOnly(permissions.BasePermission):
"""
Custom permission to allow:
- System user to bypass all checks.
- Only the owner of a sentence or connectivity statement can create a note.
"""

def has_permission(self, request, view):
# Allow system user to bypass all checks
if request.user.username == 'system' and request.user.is_staff:
return True

# Allow read-only access (GET, HEAD, OPTIONS)
if request.method in permissions.SAFE_METHODS:
return True

# For POST (create), PUT, PATCH (update), or DELETE, check ownership
return self.check_ownership(request)

def has_object_permission(self, request, view, obj):
# Allow system user to bypass all checks
if request.user.username == 'system' and request.user.is_staff:
return True

# Allow read-only access (GET, HEAD, OPTIONS)
if request.method in permissions.SAFE_METHODS:
return True

# Check ownership for unsafe methods (PUT, PATCH, DELETE)
return self.check_ownership(request)


def check_ownership(self, request):
"""
Helper method to check ownership of sentence or connectivity statement.
Raises PermissionDenied if the user is not the owner.
"""
sentence_id = request.data.get('sentence_id')
connectivity_statement_id = request.data.get('connectivity_statement_id')

# Check ownership for sentence_id
if sentence_id:
try:
sentence = Sentence.objects.get(id=sentence_id)
except Sentence.DoesNotExist:
raise PermissionDenied("Invalid sentence_id.")
if sentence.owner != request.user:
raise PermissionDenied("You are not the owner of this sentence.")

# Check ownership for connectivity_statement_id
if connectivity_statement_id:
try:
connectivity_statement = ConnectivityStatement.objects.get(id=connectivity_statement_id)
except ConnectivityStatement.DoesNotExist:
raise PermissionDenied("Invalid connectivity_statement_id.")
if connectivity_statement.owner != request.user:
raise PermissionDenied("You are not the owner of this connectivity statement.")
return True
105 changes: 67 additions & 38 deletions backend/composer/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
AnatomicalEntityFilter,
NoteFilter,
ViaFilter,
SpecieFilter, DestinationFilter,
SpecieFilter,
DestinationFilter,
)
from .serializers import (
AnatomicalEntitySerializer,
Expand All @@ -38,10 +39,17 @@
TagSerializer,
ViaSerializer,
ProvenanceSerializer,
SexSerializer, ConnectivityStatementUpdateSerializer, DestinationSerializer, BaseConnectivityStatementSerializer,
SexSerializer,
ConnectivityStatementUpdateSerializer,
DestinationSerializer,
BaseConnectivityStatementSerializer,
)
from .permissions import (
IsSentenceOrStatementOwnerOrSystemUserOrReadOnly,
IsStaffUserIfExportedStateInConnectivityStatement,
IsOwnerOrAssignOwnerOrCreateOrReadOnly,
IsOwnerOfConnectivityStatementOrReadOnly,
)
from .permissions import IsStaffUserIfExportedStateInConnectivityStatement, IsOwnerOrAssignOwnerOrCreateOrReadOnly, \
IsOwnerOfConnectivityStatementOrReadOnly
from ..models import (
AnatomicalEntity,
Phenotype,
Expand All @@ -54,22 +62,27 @@
Tag,
Via,
Provenance,
Sex, Destination, GraphRenderingState,
Sex,
Destination,
GraphRenderingState,
)


# Mixins
class AssignOwnerMixin(viewsets.GenericViewSet):
@action(detail=True, methods=['patch'], permission_classes=[permissions.IsAuthenticated])
@action(
detail=True, methods=["patch"], permission_classes=[permissions.IsAuthenticated]
)
def assign_owner(self, request, pk=None):
instance = self.get_object()
instance.assign_owner(request)
serializer = self.get_serializer(instance)
return Response(serializer.data)

class TagMixin(
viewsets.GenericViewSet,
):

class TagMixin(viewsets.GenericViewSet):
permission_classes = [IsOwnerOrAssignOwnerOrCreateOrReadOnly]

@extend_schema(
parameters=[
OpenApiParameter(
Expand Down Expand Up @@ -142,7 +155,11 @@ def add_provenance(self, request, pk=None, uri=None):
],
request=None,
)
@action(detail=True, methods=["delete"], url_path="del_provenance/(?P<provenance_id>\d+)")
@action(
detail=True,
methods=["delete"],
url_path="del_provenance/(?P<provenance_id>\d+)",
)
def del_provenance(self, request, pk=None, provenance_id=None):
count, deleted = Provenance.objects.filter(
id=provenance_id, connectivity_statement_id=pk
Expand Down Expand Up @@ -211,8 +228,10 @@ def clone_statement(self, request, pk=None, statement_id=None):
instance.origins = None
instance.save()
instance.species.add(*self.get_object().species.all())
provenances = (Provenance(connectivity_statement=instance, uri=provenance.uri) for provenance in
self.get_object().provenance_set.all())
provenances = (
Provenance(connectivity_statement=instance, uri=provenance.uri)
for provenance in self.get_object().provenance_set.all()
)
Provenance.objects.bulk_create(provenances)
return Response(self.get_serializer(instance).data)

Expand All @@ -227,23 +246,20 @@ class ModelRetrieveViewSet(
# mixins.DestroyModelMixin,
# mixins.ListModelMixin,
viewsets.GenericViewSet,
):
...
): ...


class ModelCreateRetrieveViewSet(
ModelRetrieveViewSet,
mixins.CreateModelMixin,
mixins.ListModelMixin,
):
...
): ...


class ModelNoDeleteViewSet(
ModelCreateRetrieveViewSet,
mixins.UpdateModelMixin,
):
...
): ...


class AnatomicalEntityViewSet(viewsets.ReadOnlyModelViewSet):
Expand Down Expand Up @@ -275,6 +291,7 @@ class ProjectionPhenotypeViewSet(viewsets.ReadOnlyModelViewSet):
"""
Projection Phenotype
"""

queryset = ProjectionPhenotype.objects.all()
serializer_class = ProjectionPhenotypeSerializer
permission_classes = [
Expand Down Expand Up @@ -302,7 +319,7 @@ class NoteViewSet(viewsets.ModelViewSet):
queryset = Note.objects.all()
serializer_class = NoteSerializer
permission_classes = [
permissions.IsAuthenticatedOrReadOnly,
IsSentenceOrStatementOwnerOrSystemUserOrReadOnly
]
filterset_class = NoteFilter

Expand Down Expand Up @@ -330,7 +347,7 @@ class ConnectivityStatementViewSet(
service = ConnectivityStatementStateService

def get_serializer_class(self):
if self.action == 'list':
if self.action == "list":
return BaseConnectivityStatementSerializer
return ConnectivityStatementSerializer

Expand All @@ -341,52 +358,65 @@ def get_queryset(self):

def handle_graph_rendering_state(self, instance, graph_rendering_state_data, user):
if graph_rendering_state_data:
if hasattr(instance, 'graph_rendering_state') and instance.graph_rendering_state is not None:
if (
hasattr(instance, "graph_rendering_state")
and instance.graph_rendering_state is not None
):
# Update the existing graph state
instance.graph_rendering_state.serialized_graph = graph_rendering_state_data.get(
'serialized_graph', instance.graph_rendering_state.serialized_graph)
instance.graph_rendering_state.serialized_graph = (
graph_rendering_state_data.get(
"serialized_graph",
instance.graph_rendering_state.serialized_graph,
)
)
instance.graph_rendering_state.saved_by = user
instance.graph_rendering_state.save()
else:
# Create a new graph state if none exists
GraphRenderingState.objects.create(
connectivity_statement=instance,
serialized_graph=graph_rendering_state_data.get('serialized_graph', {}),
saved_by=user
serialized_graph=graph_rendering_state_data.get(
"serialized_graph", {}
),
saved_by=user,
)

@extend_schema(
methods=['PUT'],
methods=["PUT"],
request=ConnectivityStatementUpdateSerializer,
responses={200: ConnectivityStatementSerializer}
responses={200: ConnectivityStatementSerializer},
)
def update(self, request, *args, **kwargs):
origin_ids = request.data.pop('origins', None)
graph_rendering_state_data = request.data.pop('graph_rendering_state', None)
origin_ids = request.data.pop("origins", None)
graph_rendering_state_data = request.data.pop("graph_rendering_state", None)

response = super().update(request, *args, **kwargs)

if response.status_code == status.HTTP_200_OK:
instance = self.get_object()
self.handle_graph_rendering_state(instance, graph_rendering_state_data, request.user)
self.handle_graph_rendering_state(
instance, graph_rendering_state_data, request.user
)
if origin_ids is not None:
instance.set_origins(origin_ids)

return response

@extend_schema(
methods=['PATCH'],
methods=["PATCH"],
request=ConnectivityStatementUpdateSerializer,
responses={200: ConnectivityStatementSerializer}
responses={200: ConnectivityStatementSerializer},
)
def partial_update(self, request, *args, **kwargs):
graph_rendering_state_data = request.data.pop('graph_rendering_state', None)
graph_rendering_state_data = request.data.pop("graph_rendering_state", None)

response = super().partial_update(request, *args, **kwargs)

if response.status_code == status.HTTP_200_OK:
instance = self.get_object()
self.handle_graph_rendering_state(instance, graph_rendering_state_data, request.user)
self.handle_graph_rendering_state(
instance, graph_rendering_state_data, request.user
)

return response

Expand All @@ -398,6 +428,7 @@ class KnowledgeStatementViewSet(
"""
KnowledgeStatement that only allows GET to get the list of ConnectivityStatements
"""

model = ConnectivityStatement
queryset = ConnectivityStatement.objects.exported()
serializer_class = KnowledgeStatementSerializer
Expand All @@ -409,7 +440,7 @@ class KnowledgeStatementViewSet(

@property
def allowed_methods(self):
return ['GET']
return ["GET"]

def get_serializer_class(self):
return KnowledgeStatementSerializer
Expand Down Expand Up @@ -508,9 +539,7 @@ class DestinationViewSet(viewsets.ModelViewSet):

queryset = Destination.objects.all()
serializer_class = DestinationSerializer
permission_classes = [
IsOwnerOfConnectivityStatementOrReadOnly
]
permission_classes = [IsOwnerOfConnectivityStatementOrReadOnly]
filterset_class = DestinationFilter


Expand Down

0 comments on commit 6586e23

Please sign in to comment.