Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨(models/api) add RBAC on templates linking accesses to a team name #6

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/backend/core/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,3 @@ class TemplateAdmin(admin.ModelAdmin):
"""Template admin interface declaration."""

inlines = (TemplateAccessInline,)


@admin.register(models.Team)
class TeamAdmin(admin.ModelAdmin):
"""Team admin interface declaration."""
8 changes: 5 additions & 3 deletions src/backend/core/api/serializers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Client serializers for the publish core app."""
from django.db.models import Q
from django.utils.translation import gettext_lazy as _

from rest_framework import exceptions, serializers
Expand Down Expand Up @@ -31,7 +32,7 @@ class TemplateAccessSerializer(serializers.ModelSerializer):

class Meta:
model = models.TemplateAccess
fields = ["id", "user", "role", "abilities"]
fields = ["id", "user", "team", "role", "abilities"]
read_only_fields = ["id", "abilities"]

def update(self, instance, validated_data):
Expand Down Expand Up @@ -68,6 +69,7 @@ def validate(self, attrs):

# Create
else:
teams = user.get_teams()
try:
template_id = self.context["template_id"]
except KeyError as exc:
Expand All @@ -76,8 +78,8 @@ def validate(self, attrs):
) from exc

if not models.TemplateAccess.objects.filter(
Q(user=user) | Q(team__in=teams),
template=template_id,
user=user,
role__in=[models.RoleChoices.OWNER, models.RoleChoices.ADMIN],
).exists():
raise exceptions.PermissionDenied(
Expand All @@ -87,8 +89,8 @@ def validate(self, attrs):
if (
role == models.RoleChoices.OWNER
and not models.TemplateAccess.objects.filter(
Q(user=user) | Q(team__in=teams),
template=template_id,
user=user,
role=models.RoleChoices.OWNER,
).exists()
):
Expand Down
42 changes: 31 additions & 11 deletions src/backend/core/api/viewsets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""API endpoints"""
from io import BytesIO

from django.contrib.postgres.aggregates import ArrayAgg
from django.db.models import (
OuterRef,
Q,
Expand Down Expand Up @@ -156,14 +157,22 @@ def get_queryset(self):
if not self.request.user.is_authenticated:
return models.Template.objects.filter(is_public=True)

user_role_query = models.TemplateAccess.objects.filter(
user=self.request.user, template=OuterRef("pk")
).values("role")[:1]
user = self.request.user
teams = user.get_teams()

user_roles_query = (
models.TemplateAccess.objects.filter(
Q(user=user) | Q(team__in=teams), template=OuterRef("pk")
)
.values("template")
.annotate(roles_array=ArrayAgg("role"))
.values("roles_array")
)
return (
models.Template.objects.filter(
Q(accesses__user=self.request.user) | Q(is_public=True)
Q(accesses__user=user) | Q(accesses__team__in=teams) | Q(is_public=True)
)
.annotate(user_role=Subquery(user_role_query))
.annotate(user_roles=Subquery(user_roles_query))
.distinct()
)

Expand Down Expand Up @@ -263,19 +272,30 @@ def get_queryset(self):
queryset = queryset.filter(template=self.kwargs["template_id"])

if self.action == "list":
user = self.request.user
teams = user.get_teams()

user_roles_query = (
models.TemplateAccess.objects.filter(
Q(user=user) | Q(team__in=teams),
template=self.kwargs["template_id"],
)
.values("template")
.annotate(roles_array=ArrayAgg("role"))
.values("roles_array")
)

# Limit to template access instances related to a template THAT also has
# a template access
# instance for the logged-in user (we don't want to list only the template
# access instances pointing to the logged-in user)
user_role_query = models.TemplateAccess.objects.filter(
template=self.kwargs["template_id"],
template__accesses__user=self.request.user,
).values("role")[:1]
queryset = (
queryset.filter(
template__accesses__user=self.request.user,
Q(template__accesses__user=user)
| Q(template__accesses__team__in=teams),
template=self.kwargs["template_id"],
)
.annotate(user_role=Subquery(user_role_query))
.annotate(user_roles=Subquery(user_roles_query))
.distinct()
)
return queryset
Expand Down
17 changes: 14 additions & 3 deletions src/backend/core/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ def users(self, create, extracted, **kwargs):
if create and extracted:
for item in extracted:
if isinstance(item, models.User):
TemplateAccessFactory(template=self, user=item)
UserTemplateAccessFactory(template=self, user=item)
else:
TemplateAccessFactory(template=self, user=item[0], role=item[1])
UserTemplateAccessFactory(template=self, user=item[0], role=item[1])


class TemplateAccessFactory(factory.django.DjangoModelFactory):
class UserTemplateAccessFactory(factory.django.DjangoModelFactory):
"""Create fake template user accesses for testing."""

class Meta:
Expand All @@ -55,3 +55,14 @@ class Meta:
template = factory.SubFactory(TemplateFactory)
user = factory.SubFactory(UserFactory)
role = factory.fuzzy.FuzzyChoice([r[0] for r in models.RoleChoices.choices])


class TeamTemplateAccessFactory(factory.django.DjangoModelFactory):
"""Create fake template team accesses for testing."""

class Meta:
model = models.TemplateAccess

template = factory.SubFactory(TemplateFactory)
team = factory.Sequence(lambda n: f"team{n}")
role = factory.fuzzy.FuzzyChoice([r[0] for r in models.RoleChoices.choices])
27 changes: 8 additions & 19 deletions src/backend/core/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Generated by Django 5.0.2 on 2024-02-22 20:34
# Generated by Django 5.0.2 on 2024-02-24 17:39

import django.contrib.auth.models
import django.core.validators
Expand All @@ -18,21 +18,6 @@ class Migration(migrations.Migration):
]

operations = [
migrations.CreateModel(
name='Team',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
('name', models.CharField(max_length=100, unique=True)),
],
options={
'verbose_name': 'Team',
'verbose_name_plural': 'Teams',
'db_table': 'publish_role',
'ordering': ('name',),
},
),
migrations.CreateModel(
name='Template',
fields=[
Expand Down Expand Up @@ -87,8 +72,8 @@ class Migration(migrations.Migration):
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
('team', models.CharField(blank=True, max_length=100)),
('role', models.CharField(choices=[('member', 'Member'), ('administrator', 'Administrator'), ('owner', 'Owner')], default='member', max_length=20)),
('team', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='accesses', to='core.team')),
('template', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='accesses', to='core.template')),
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='accesses', to=settings.AUTH_USER_MODEL)),
],
Expand All @@ -100,10 +85,14 @@ class Migration(migrations.Migration):
),
migrations.AddConstraint(
model_name='templateaccess',
constraint=models.UniqueConstraint(fields=('user', 'template'), name='unique_template_user', violation_error_message='This user is already in this template.'),
constraint=models.UniqueConstraint(condition=models.Q(('user__isnull', False)), fields=('user', 'template'), name='unique_template_user', violation_error_message='This user is already in this template.'),
),
migrations.AddConstraint(
model_name='templateaccess',
constraint=models.UniqueConstraint(condition=models.Q(('team__gt', '')), fields=('team', 'template'), name='unique_template_team', violation_error_message='This team is already in this template.'),
),
migrations.AddConstraint(
model_name='templateaccess',
constraint=models.UniqueConstraint(fields=('team', 'template'), name='unique_template_team', violation_error_message='This team is already in this template.'),
constraint=models.CheckConstraint(check=models.Q(models.Q(('team', ''), ('user__isnull', False)), models.Q(('team__gt', ''), ('user__isnull', True)), _connector='OR'), name='check_either_user_or_team', violation_error_message='Either user or team must be set, not both.'),
),
]
80 changes: 40 additions & 40 deletions src/backend/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,12 @@ def email_user(self, subject, message, from_email=None, **kwargs):
raise ValueError("User has no email address.")
mail.send_mail(subject, message, from_email, [self.email], **kwargs)


class Team(BaseModel):
"""Team used for role based access control when matched with templates in OIDC tokens."""

name = models.CharField(max_length=100, unique=True)

class Meta:
db_table = "publish_role"
ordering = ("name",)
verbose_name = _("Team")
verbose_name_plural = _("Teams")

def __str__(self):
return self.name
def get_teams(self):
"""
Get list of teams in which the user is, as a list of strings.
Must be cached if retrieved remotely.
"""
return []


class Template(BaseModel):
Expand Down Expand Up @@ -213,21 +205,25 @@ def get_abilities(self, user):
Compute and return abilities for a given user on the template.
"""
# Compute user role
role = None
roles = []
lebaudantoine marked this conversation as resolved.
Show resolved Hide resolved
if user.is_authenticated:
try:
role = self.user_role
roles = self.user_roles or []
except AttributeError:
teams = user.get_teams()
try:
role = self.accesses.filter(user=user).values("role")[0]["role"]
roles = self.accesses.filter(
models.Q(user=user) | models.Q(team__in=teams)
).values_list("role", flat=True)
sampaccoud marked this conversation as resolved.
Show resolved Hide resolved
except (TemplateAccess.DoesNotExist, IndexError):
role = None

is_owner_or_admin = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
can_get = self.is_public or role is not None
roles = []
is_owner_or_admin = bool(
set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
)
can_get = self.is_public or bool(roles)

return {
"destroy": role == RoleChoices.OWNER,
"destroy": RoleChoices.OWNER in roles,
"generate_document": can_get,
"manage_accesses": is_owner_or_admin,
"update": is_owner_or_admin,
Expand All @@ -250,13 +246,7 @@ class TemplateAccess(BaseModel):
null=True,
blank=True,
)
team = models.ForeignKey(
Team,
on_delete=models.CASCADE,
related_name="accesses",
null=True,
blank=True,
)
team = models.CharField(max_length=100, blank=True)
role = models.CharField(
max_length=20, choices=RoleChoices.choices, default=RoleChoices.MEMBER
)
Expand All @@ -268,14 +258,22 @@ class Meta:
constraints = [
models.UniqueConstraint(
fields=["user", "template"],
condition=models.Q(user__isnull=False), # Exclude null users
name="unique_template_user",
violation_error_message=_("This user is already in this template."),
),
models.UniqueConstraint(
fields=["team", "template"],
condition=models.Q(team__gt=""), # Exclude empty string teams
name="unique_template_team",
violation_error_message=_("This team is already in this template."),
),
models.CheckConstraint(
check=models.Q(user__isnull=False, team="")
| models.Q(user__isnull=True, team__gt=""),
name="check_either_user_or_team",
violation_error_message=_("Either user or team must be set, not both."),
),
]

def __str__(self):
Expand All @@ -287,32 +285,34 @@ def get_abilities(self, user):
the current state of the object.
"""
is_template_owner_or_admin = False
role = None

roles = []
if user.is_authenticated:
teams = user.get_teams()
try:
role = self.user_role
roles = self.user_roles or []
except AttributeError:
try:
role = self._meta.model.objects.filter(
roles = self._meta.model.objects.filter(
models.Q(user=user) | models.Q(team__in=teams),
template=self.template_id,
user=user,
).values("role")[0]["role"]
).values_list("role", flat=True)
except (self._meta.model.DoesNotExist, IndexError):
role = None

is_template_owner_or_admin = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
roles = []

is_template_owner_or_admin = bool(
set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
)
if self.role == RoleChoices.OWNER:
can_delete = (
role == RoleChoices.OWNER
RoleChoices.OWNER in roles
and self.template.accesses.filter(role=RoleChoices.OWNER).count() > 1
)
set_role_to = [RoleChoices.ADMIN, RoleChoices.MEMBER] if can_delete else []
else:
can_delete = is_template_owner_or_admin
set_role_to = []
if role == RoleChoices.OWNER:
if RoleChoices.OWNER in roles:
set_role_to.append(RoleChoices.OWNER)
if is_template_owner_or_admin:
set_role_to.extend([RoleChoices.ADMIN, RoleChoices.MEMBER])
Expand All @@ -326,6 +326,6 @@ def get_abilities(self, user):
return {
"destroy": can_delete,
"update": bool(set_role_to),
"retrieve": bool(role),
"retrieve": bool(roles),
lebaudantoine marked this conversation as resolved.
Show resolved Hide resolved
"set_role_to": set_role_to,
}
15 changes: 15 additions & 0 deletions src/backend/core/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Fixtures for tests in the publish core application"""
from unittest import mock

import pytest

USER = "user"
TEAM = "team"
VIA = [USER, TEAM]


@pytest.fixture
def mock_user_get_teams():
"""Mock for the "get_teams" method on the User model."""
with mock.patch("core.models.User.get_teams") as mock_get_teams:
yield mock_get_teams
Loading
Loading