Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for Coriolis api #272

Merged
merged 1 commit into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions coriolis/api/middleware/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,33 @@


class CoriolisKeystoneContext(wsgi.Middleware):
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
def _get_project_id(self, req):
if 'X_TENANT_ID' in req.headers:
# This is the new header since Keystone went to ID/Name
return req.headers['X_TENANT_ID']
elif 'X_TENANT' in req.headers:
# This is for legacy compatibility
return req.headers['X_TENANT']
else:
raise webob.exc.HTTPBadRequest(
explanation=_("No 'X_TENANT_ID' or 'X_TENANT' passed."))

def _get_user(self, req):
user = req.headers.get('X_USER')
user = req.headers.get('X_USER_ID', user)
if user is None:
LOG.debug("Neither X_USER_ID nor X_USER found in request")
return webob.exc.HTTPUnauthorized()
raise webob.exc.HTTPUnauthorized(
explanation=_("Neither X_USER_ID nor X_USER found in request"))
return user

@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
user = self._get_user(req)

project_id = self._get_project_id(req)

# get the roles
roles = [r.strip() for r in req.headers.get('X_ROLE', '').split(',')]
if 'X_TENANT_ID' in req.headers:
# This is the new header since Keystone went to ID/Name
project_id = req.headers['X_TENANT_ID']
else:
# This is for legacy compatibility
project_id = req.headers['X_TENANT']

project_name = req.headers.get('X_TENANT_NAME')
project_domain_name = req.headers.get('X-Project-Domain-Name')
Expand Down
Empty file added coriolis/tests/api/__init__.py
Empty file.
Empty file.
286 changes: 286 additions & 0 deletions coriolis/tests/api/middleware/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
# Copyright 2022 Cloudbase Solutions Srl
# All Rights Reserved.

from unittest import mock

from oslo_middleware import request_id
import webob

from coriolis.api.middleware import auth
from coriolis.api import wsgi
from coriolis import context
from coriolis.tests import test_base


class CoriolisKeystoneContextTestCase(test_base.CoriolisBaseTestCase):
"""Test suite for the Coriolis api middleware auth."""

def setUp(self):
super(CoriolisKeystoneContextTestCase, self).setUp()
self.auth = auth.CoriolisKeystoneContext(wsgi.Middleware)

@mock.patch.object(context, "RequestContext")
@mock.patch.object(auth.CoriolisKeystoneContext, "_get_project_id")
@mock.patch.object(auth.CoriolisKeystoneContext, "_get_user")
def test__call__(
self,
mock_get_user,
mock__get_project_id,
mock_request_context
):
req_mock = mock.Mock()

expected_roles = ['1', '2', '3']
expected_project_name = 'mock_project_name'
expected_project_domain_name = 'mock_project_domain_name'
expected_user_domain_name = 'mock_user_domain_name'
expected_auth_token = 'mock_token123'
expected_remote_address = 'mock_addr'
expected_service_catalog = {"catalog1": ["service1", "service2"]}
expected_req_id = 'mock_req_id'

req_mock.remote_addr = expected_remote_address
req_mock.environ = {
request_id.ENV_REQUEST_ID: expected_req_id
}

req_mock.headers = {
'X_ROLE': '1,2,3',
'X_TENANT_NAME': expected_project_name,
'X-Project-Domain-Name': expected_project_domain_name,
'X-User-Domain-Name': expected_user_domain_name,
'X_AUTH_TOKEN': expected_auth_token,
'X_SERVICE_CATALOG':
str(expected_service_catalog).replace("'", '"'),
}

result = self.auth(req_mock)

self.assertEqual(
self.auth.application,
result
)

mock_get_user.assert_called_once_with(req_mock)
mock__get_project_id.assert_called_once_with(req_mock)
mock_request_context.assert_called_once_with(
mock_get_user.return_value,
mock__get_project_id.return_value,
project_name=expected_project_name,
project_domain_name=expected_project_domain_name,
user_domain_name=expected_user_domain_name,
roles=expected_roles,
auth_token=expected_auth_token,
remote_address=expected_remote_address,
service_catalog=expected_service_catalog,
request_id=expected_req_id
)

self.assertEqual(
req_mock.environ['coriolis.context'],
mock_request_context.return_value
)

@mock.patch.object(context, "RequestContext")
@mock.patch.object(auth.CoriolisKeystoneContext, "_get_project_id")
@mock.patch.object(auth.CoriolisKeystoneContext, "_get_user")
def test__call__req_id_is_bytes(
self,
mock_get_user,
mock__get_project_id,
mock_request_context
):
req_mock = mock.Mock()

expected_roles = ['1', '2', '3']
expected_project_name = 'mock_project_name'
expected_project_domain_name = 'mock_project_domain_name'
expected_user_domain_name = 'mock_user_domain_name'
expected_auth_token = 'mock_token123'
expected_remote_address = 'mock_addr'
expected_service_catalog = {"catalog1": ["service1", "service2"]}
Dany9966 marked this conversation as resolved.
Show resolved Hide resolved
expected_req_id = 'mock_req_id'

req_mock.remote_addr = expected_remote_address
req_mock.environ = {
request_id.ENV_REQUEST_ID: expected_req_id.encode()
Dany9966 marked this conversation as resolved.
Show resolved Hide resolved
}

req_mock.headers = {
'X_ROLE': '1,2,3',
'X_TENANT_NAME': expected_project_name,
'X-Project-Domain-Name': expected_project_domain_name,
'X-User-Domain-Name': expected_user_domain_name,
'X_AUTH_TOKEN': expected_auth_token,
'X_SERVICE_CATALOG':
str(expected_service_catalog).replace("'", '"'),
}

result = self.auth(req_mock)

self.assertEqual(
self.auth.application,
result
)

mock_get_user.assert_called_once_with(req_mock)
mock__get_project_id.assert_called_once_with(req_mock)
mock_request_context.assert_called_once_with(
mock_get_user.return_value,
mock__get_project_id.return_value,
project_name=expected_project_name,
project_domain_name=expected_project_domain_name,
user_domain_name=expected_user_domain_name,
roles=expected_roles,
auth_token=expected_auth_token,
remote_address=expected_remote_address,
service_catalog=expected_service_catalog,
request_id=expected_req_id
)

self.assertEqual(
req_mock.environ['coriolis.context'],
mock_request_context.return_value
)

@mock.patch.object(auth.CoriolisKeystoneContext, "_get_project_id")
@mock.patch.object(auth.CoriolisKeystoneContext, "_get_user")
def test__call__invalid_service_catalog(
self,
mock_get_user,
mock__get_project_id,
):
req_mock = mock.Mock()

invalid_service_catalog = "mock_invalid_service_catalog"

req_mock.headers = {
'X_SERVICE_CATALOG': invalid_service_catalog,
}

self.assertRaises(
webob.exc.HTTPInternalServerError,
self.auth,
req_mock,
)

@mock.patch.object(auth.CoriolisKeystoneContext, "_get_user")
def test__call__no_headers(
self,
mock_get_user,
):
req_mock = mock.Mock()
mock_get_user.side_effect = webob.exc.HTTPUnauthorized()

req_mock.headers = {}

self.assertRaises(
webob.exc.HTTPUnauthorized,
self.auth,
req_mock
)

def test_get_project_id_tenant_id(self):
req_mock = mock.Mock()

expected_result = 'mock_tenant'

req_mock.headers = {
'X_TENANT_ID': expected_result,
}

result = self.auth._get_project_id(req_mock)

self.assertEqual(
expected_result,
result
)

def test_get_project_id_tenant(self):
req_mock = mock.Mock()

expected_result = 'mock_tenant'

req_mock.headers = {
'X_TENANT': expected_result,
}

result = self.auth._get_project_id(req_mock)

self.assertEqual(
expected_result,
result
)

def test_get_project_id_no_tenant(self):
Dany9966 marked this conversation as resolved.
Show resolved Hide resolved
req_mock = mock.Mock()

req_mock.headers = {}

self.assertRaises(
webob.exc.HTTPBadRequest,
self.auth._get_project_id,
req_mock
)

def test_get_user(self):
req_mock = mock.Mock()

mock_user = 'mock_user'

expected_result = 'mock_user_id'

req_mock.headers = {
'X_USER': mock_user,
'X_USER_ID': expected_result,
}

result = self.auth._get_user(req_mock)

self.assertEqual(
expected_result,
result
)

def test_get_user_has_user(self):
req_mock = mock.Mock()

expected_result = 'mock_user'

req_mock.headers = {
'X_USER': expected_result,
}

result = self.auth._get_user(req_mock)

self.assertEqual(
expected_result,
result
)

def test_get_user_has_user_id(self):
req_mock = mock.Mock()

expected_result = 'mock_user_id'

req_mock.headers = {
'X_USER_ID': expected_result,
}

result = self.auth._get_user(req_mock)

self.assertEqual(
expected_result,
result
)

def test_get_user_none(self):
req_mock = mock.Mock()

req_mock.headers = {}

self.assertRaises(
webob.exc.HTTPUnauthorized,
self.auth._get_user,
req_mock
)
Empty file.
Empty file.