-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2729 from bcgov/485-initial-rls-setup-part-1
485 initial rls setup - PART 1
- Loading branch information
Showing
15 changed files
with
314 additions
and
128 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import json | ||
import logging | ||
from typing import Callable, Optional | ||
from uuid import UUID | ||
from django.core.cache import cache | ||
from django.http import JsonResponse, HttpRequest, HttpResponse | ||
from django.shortcuts import get_object_or_404 | ||
from registration.constants import USER_CACHE_PREFIX | ||
from registration.models import User | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class CurrentUserMiddleware: | ||
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]): | ||
self.get_response = get_response | ||
|
||
def __call__(self, request: HttpRequest) -> HttpResponse: | ||
auth_header = request.headers.get("Authorization") | ||
if auth_header: | ||
try: | ||
user_guid = self._extract_user_guid(auth_header) | ||
user = self.get_or_cache_user(user_guid) | ||
setattr(request, "current_user", user) | ||
except ValueError as e: | ||
logger.warning(f"Invalid Authorization header: {e}", exc_info=True) | ||
return JsonResponse({"error": "Invalid Authorization header"}, status=400) | ||
except Exception as e: | ||
logger.error(f"Unexpected error in CurrentUserMiddleware: {e}", exc_info=True) | ||
|
||
return self.get_response(request) | ||
|
||
@staticmethod | ||
def _extract_user_guid(auth_header: str) -> UUID: | ||
""" | ||
Extracts and validates the user GUID from the Authorization header. | ||
""" | ||
try: | ||
user_data = json.loads(auth_header) | ||
user_guid = user_data.get("user_guid") | ||
if not user_guid: | ||
raise ValueError("Missing user_guid in Authorization header") | ||
return UUID(user_guid, version=4) | ||
except (KeyError, ValueError, TypeError) as e: | ||
raise ValueError(f"Failed to extract user GUID: {e}") | ||
|
||
@staticmethod | ||
def get_or_cache_user(user_guid: UUID) -> User: | ||
""" | ||
Retrieves the full user object from the cache or database. | ||
""" | ||
cache_key = f"{USER_CACHE_PREFIX}{user_guid}" | ||
user: Optional[User] = cache.get(cache_key) | ||
|
||
if not user: | ||
user = get_object_or_404(User, user_guid=user_guid) | ||
cache.set(cache_key, user, 300) # Cache for 5 minutes | ||
|
||
return user |
36 changes: 0 additions & 36 deletions
36
bc_obps/registration/middleware/current_user_middleware.py
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
bc_obps/registration/tests/middleware/test_current_user.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import json | ||
from unittest.mock import MagicMock | ||
from django.test import TestCase, RequestFactory | ||
from registration.middleware.current_user import CurrentUserMiddleware | ||
from registration.models import User | ||
from model_bakery import baker | ||
|
||
|
||
class TestCurrentUserMiddleware(TestCase): | ||
def setUp(self): | ||
self.factory = RequestFactory() | ||
self.middleware = CurrentUserMiddleware(lambda request: MagicMock(status_code=200)) | ||
|
||
def test_no_authorization_header(self): | ||
middleware = self.middleware | ||
request = self.factory.get('/') | ||
middleware(request) | ||
assert not hasattr(request, 'current_user') | ||
|
||
def test_invalid_authorization_header(self): | ||
middleware = self.middleware | ||
request = self.factory.get('/', HTTP_AUTHORIZATION='invalid_token') | ||
middleware(request) | ||
assert not hasattr(request, 'current_user') | ||
|
||
def test_valid_authorization_header(self): | ||
middleware = self.middleware | ||
user = baker.make(User) | ||
auth_header = {'user_guid': str(user.user_guid)} | ||
request = self.factory.get('/', HTTP_AUTHORIZATION=json.dumps(auth_header)) | ||
middleware(request) | ||
assert hasattr(request, 'current_user') | ||
assert request.current_user == user | ||
|
||
def test_user_does_not_exist(self): | ||
middleware = self.middleware | ||
auth_header = {'user_guid': 'non_existing_guid'} | ||
request = self.factory.get('/', HTTP_AUTHORIZATION=json.dumps(auth_header)) | ||
middleware(request) | ||
assert not hasattr(request, 'current_user') | ||
|
||
def test_missing_user_guid_in_authorization_header(self): | ||
middleware = self.middleware | ||
auth_header = {} | ||
request = self.factory.get('/', HTTP_AUTHORIZATION=json.dumps(auth_header)) | ||
response = middleware(request) | ||
assert response.status_code == 400 | ||
self.assertEqual(json.loads(response.content.decode())["error"], "Invalid Authorization header") | ||
|
||
def test_malformed_json_in_authorization_header(self): | ||
middleware = self.middleware | ||
request = self.factory.get('/', HTTP_AUTHORIZATION="invalid_json") | ||
response = middleware(request) | ||
assert response.status_code == 400 | ||
self.assertEqual(json.loads(response.content.decode())["error"], "Invalid Authorization header") | ||
|
||
def test_invalid_uuid_format_in_authorization_header(self): | ||
middleware = self.middleware | ||
auth_header = {"user_guid": "invalid_uuid"} | ||
request = self.factory.get('/', HTTP_AUTHORIZATION=json.dumps(auth_header)) | ||
response = middleware(request) | ||
assert response.status_code == 400 | ||
self.assertEqual(json.loads(response.content.decode())["error"], "Invalid Authorization header") |
21 changes: 21 additions & 0 deletions
21
bc_obps/registration/tests/middleware/test_kubernetes_health_check.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from unittest.mock import MagicMock | ||
from django.test import TestCase, RequestFactory | ||
from registration.middleware.kubernetes_health_check import KubernetesHealthCheckMiddleware | ||
|
||
|
||
class TestKubernetesHealthCheckMiddleware(TestCase): | ||
def setUp(self): | ||
self.factory = RequestFactory() | ||
self.middleware = KubernetesHealthCheckMiddleware(lambda request: MagicMock(status_code=200)) | ||
|
||
def test_liveness_endpoint(self): | ||
request = self.factory.get('/liveness') | ||
response = self.middleware(request) | ||
self.assertEqual(response.status_code, 200) | ||
self.assertEqual(response.content, b'OK. Server is running.') | ||
|
||
def test_readiness_endpoint(self): | ||
request = self.factory.get('/readiness') | ||
response = self.middleware(request) | ||
self.assertEqual(response.status_code, 200) | ||
self.assertEqual(response.content, b'OK. Server is ready.') |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from django.apps import AppConfig | ||
|
||
|
||
class RlsConfig(AppConfig): | ||
default_auto_field = 'django.db.models.BigAutoField' | ||
name = 'rls' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from enum import Enum | ||
|
||
|
||
class RlsRoles(Enum): | ||
INDUSTRY_USER = "industry_user" | ||
CAS_DIRECTOR = "cas_director" | ||
CAS_ADMIN = "cas_admin" | ||
CAS_ANALYST = "cas_analyst" | ||
CAS_PENDING = "cas_pending" | ||
CAS_VIEW_ONLY = "cas_view_only" | ||
ALL_ROLES = "industry_user, cas_director, cas_admin, cas_analyst, cas_pending, cas_view_only" | ||
|
||
|
||
class RlsOperations(Enum): | ||
SELECT = "select" | ||
UPDATE = "update" | ||
INSERT = "insert" | ||
DELETE = "delete" |
Empty file.
Oops, something went wrong.