From f9369c71686a8c3aa1796087717c419cc4c1e6ef Mon Sep 17 00:00:00 2001 From: Giovanni Savarese Date: Fri, 2 Feb 2024 11:06:29 +0100 Subject: [PATCH] Update crud and openstack tests --- src/crud.py | 2 +- src/providers/openstack.py | 67 +-- tests/conftest.py | 463 ++++++++++++++++- tests/crud/test_crud.py | 237 +++++---- tests/providers/openstack/test_connection.py | 107 +--- tests/providers/openstack/test_flavors.py | 136 ++--- tests/providers/openstack/test_images.py | 188 +++---- tests/providers/openstack/test_networks.py | 240 ++++----- tests/providers/openstack/test_projects.py | 45 +- tests/providers/openstack/test_quotas.py | 105 +--- tests/providers/openstack/test_resources.py | 165 +++--- tests/providers/openstack/test_services.py | 504 +++++++------------ tests/providers/test_get_proj_res.py | 5 - tests/schemas/test_limits.py | 34 -- tests/schemas/test_region.py | 9 +- 15 files changed, 1153 insertions(+), 1154 deletions(-) diff --git a/src/crud.py b/src/crud.py index dc5164a..0772222 100644 --- a/src/crud.py +++ b/src/crud.py @@ -65,7 +65,7 @@ def create(self, *, data: ProviderCreateExtended) -> ProviderReadExtended: logger.debug(f"Message: {resp.text}") resp.raise_for_status() - def remove(self, *, item: ProviderReadExtended) -> None: + def remove(self, *, item: ProviderRead) -> None: """Remove item.""" logger.info(f"Removing Provider={item.name}.") logger.debug(f"Url={self.single_url.format(uid=item.uid)}") diff --git a/src/providers/openstack.py b/src/providers/openstack.py index 7e715d9..57872a9 100644 --- a/src/providers/openstack.py +++ b/src/providers/openstack.py @@ -123,11 +123,12 @@ def get_images( logger.debug(f"Image received data={image!r}") is_public = True projects = [] - if image.visibility in ["private", "shared"]: + if image.visibility == "private": projects = [image.owner_id] is_public = False - if image.visibility == "shared": + elif image.visibility == "shared": projects = get_image_projects(conn, image=image, projects=projects) + is_public = False data = image.to_dict() data["uuid"] = data.pop("id") # Openstack image object does not have `description` field @@ -207,7 +208,6 @@ def get_block_storage_service( conn: Connection, *, per_user_limits: Optional[BlockStorageQuotaBase], - project_id: str, ) -> Optional[BlockStorageServiceCreateExtended]: """Retrieve project's block storage service. @@ -231,7 +231,8 @@ def get_block_storage_service( if per_user_limits: block_storage_service.quotas.append( BlockStorageQuotaCreateExtended( - **per_user_limits.dict(exclude_none=True), project=project_id + **per_user_limits.dict(exclude_none=True), + project=conn.current_project_id, ) ) return block_storage_service @@ -241,7 +242,6 @@ def get_compute_service( conn: Connection, *, per_user_limits: Optional[ComputeQuotaBase], - project_id: str, tags: List[str], ) -> Optional[ComputeServiceCreateExtended]: """Create region's compute service. @@ -266,7 +266,8 @@ def get_compute_service( if per_user_limits: compute_service.quotas.append( ComputeQuotaCreateExtended( - **per_user_limits.dict(exclude_none=True), project=project_id + **per_user_limits.dict(exclude_none=True), + project=conn.current_project_id, ) ) return compute_service @@ -276,7 +277,6 @@ def get_network_service( conn: Connection, *, per_user_limits: Optional[NetworkQuotaBase], - project_id: str, tags: List[str], default_private_net: Optional[str], default_public_net: Optional[str], @@ -305,7 +305,8 @@ def get_network_service( if per_user_limits: network_service.quotas.append( NetworkQuotaCreateExtended( - **per_user_limits.dict(exclude_none=True), project=project_id + **per_user_limits.dict(exclude_none=True), + project=conn.current_project_id, ) ) return network_service @@ -318,7 +319,7 @@ def connect_to_provider( project_id: str, region_name: str, token: str, -) -> Optional[Connection]: +) -> Connection: """Connect to Openstack provider""" logger.info( f"Connecting through IDP {idp.endpoint} to openstack " @@ -326,22 +327,16 @@ def connect_to_provider( f"Accessing with project ID: {project_id}" ) auth_type = "v3oidcaccesstoken" - try: - conn = connect( - auth_url=provider_conf.auth_url, - auth_type=auth_type, - identity_provider=idp.relationship.idp_name, - protocol=idp.relationship.protocol, - access_token=token, - project_id=project_id, - region_name=region_name, - timeout=TIMEOUT, - ) - except (ConnectFailure, Unauthorized, NoMatchingPlugin, NotFound) as e: - logger.error(e) - return None - logger.info("Connected.") - return conn + return connect( + auth_url=provider_conf.auth_url, + auth_type=auth_type, + identity_provider=idp.relationship.idp_name, + protocol=idp.relationship.protocol, + access_token=token, + project_id=project_id, + region_name=region_name, + timeout=TIMEOUT, + ) def get_data_from_openstack( @@ -367,8 +362,6 @@ def get_data_from_openstack( region_name=region_name, token=token, ) - if not conn: - return None try: # Create project entity @@ -376,14 +369,11 @@ def get_data_from_openstack( # Retrieve provider services (block_storage, compute, identity and network) block_storage_service = get_block_storage_service( - conn, - per_user_limits=project_conf.per_user_limits.block_storage, - project_id=project_conf.id, + conn, per_user_limits=project_conf.per_user_limits.block_storage ) compute_service = get_compute_service( conn, per_user_limits=project_conf.per_user_limits.compute, - project_id=project_conf.id, tags=provider_conf.image_tags, ) identity_service = IdentityServiceCreate( @@ -393,22 +383,19 @@ def get_data_from_openstack( network_service = get_network_service( conn, per_user_limits=project_conf.per_user_limits.network, - project_id=project_conf.id, tags=provider_conf.network_tags, default_private_net=project_conf.default_private_net, default_public_net=project_conf.default_public_net, proxy=project_conf.private_net_proxy, ) - - conn.close() - logger.info("Connection closed") - except ConnectFailure: - logger.error("Connection closed unexpectedly.") - return None - except Unauthorized: - logger.error("Unauthorized to read project info.") + except (ConnectFailure, Unauthorized, NoMatchingPlugin, NotFound) as e: + logger.error(e) return None + # TODO Check if the closing action can raise an exception + conn.close() + logger.info("Connection closed") + return ( project, block_storage_service, diff --git a/tests/conftest.py b/tests/conftest.py index 1847c20..f41360c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,17 +1,34 @@ import os -from typing import Union +from random import getrandbits, randint +from typing import Any, Dict, List, Tuple, Union from uuid import uuid4 import pytest +from app.location.schemas import LocationBase from app.provider.schemas_extended import ( + AuthMethodCreate, BlockStorageServiceCreateExtended, ComputeServiceCreateExtended, + IdentityProviderCreateExtended, IdentityServiceCreate, NetworkServiceCreateExtended, ProjectCreate, ProviderCreateExtended, + ProviderRead, + ProviderReadExtended, + SLACreateExtended, + UserGroupCreateExtended, ) -from pytest_cases import parametrize +from app.quota.schemas import BlockStorageQuotaBase, ComputeQuotaBase, NetworkQuotaBase +from openstack.block_storage.v3.quota_set import ( + QuotaSet as OpenstackBlockStorageQuotaSet, +) +from openstack.compute.v2.flavor import Flavor as OpenstackFlavor +from openstack.compute.v2.quota_set import QuotaSet as OpenstackComputeQuotaSet +from openstack.image.v2.image import Image as OpenstackImage +from openstack.network.v2.network import Network as OpenstackNetwork +from openstack.network.v2.quota import Quota as OpenstackNetworkQuota +from pytest_cases import case, parametrize, parametrize_with_cases from src.config import APIVersions, Settings, URLs from src.crud import CRUD @@ -28,10 +45,14 @@ Region, ) from src.utils import get_read_write_headers +from tests.providers.openstack.utils import random_image_status, random_network_status from tests.schemas.utils import ( random_block_storage_service_name, random_compute_service_name, + random_country, + random_float, random_identity_service_name, + random_image_os_type, random_ip, random_lower_string, random_network_service_name, @@ -190,6 +211,66 @@ def site_config(issuer: Issuer) -> SiteConfig: return SiteConfig(trusted_idps=[issuer]) +@pytest.fixture +def configurations( + identity_provider_create: IdentityProviderCreateExtended, + openstack_provider: Openstack, + project: Project, +) -> Tuple[IdentityProviderCreateExtended, Openstack, Project]: + project.sla = identity_provider_create.user_groups[0].sla.doc_uuid + openstack_provider.identity_providers[ + 0 + ].endpoint = identity_provider_create.endpoint + openstack_provider.identity_providers[ + 0 + ].idp_name = identity_provider_create.relationship.idp_name + openstack_provider.identity_providers[ + 0 + ].protocol = identity_provider_create.relationship.protocol + return identity_provider_create, openstack_provider, project + + +# Federation-Registry Base Items + + +@pytest.fixture +def location() -> LocationBase: + """Fixture with an LocationBase without projects.""" + return LocationBase(site=random_lower_string(), country=random_country()) + + +@pytest.fixture +def block_storage_quota() -> BlockStorageQuotaBase: + """Fixture with a BlockStorageQuotaBase.""" + return BlockStorageQuotaBase( + gigabytes=randint(0, 100), + per_volume_gigabytes=randint(0, 100), + volumes=randint(1, 100), + ) + + +@pytest.fixture +def compute_quota() -> ComputeQuotaBase: + """Fixture with a ComputeQuotaBase.""" + return ComputeQuotaBase( + cores=randint(0, 100), + instances=randint(0, 100), + ram=randint(1, 100), + ) + + +@pytest.fixture +def network_quota() -> NetworkQuotaBase: + """Fixture with a NetworkQuotaBase.""" + return NetworkQuotaBase( + public_ips=randint(0, 100), + networks=randint(0, 100), + ports=randint(1, 100), + security_groups=randint(1, 100), + security_group_rules=randint(1, 100), + ) + + # Federation-Registry Creation Items @@ -201,6 +282,18 @@ def provider_create() -> ProviderCreateExtended: ) +@pytest.fixture +def provider_read(provider_create: ProviderCreateExtended) -> ProviderRead: + return ProviderRead(uid=uuid4(), **provider_create.dict()) + + +@pytest.fixture +def provider_read_extended( + provider_create: ProviderCreateExtended, +) -> ProviderReadExtended: + return ProviderReadExtended(uid=uuid4(), **provider_create.dict()) + + @pytest.fixture def project_create() -> ProjectCreate: """Fixture with a ProjectCreate.""" @@ -267,3 +360,369 @@ def service_create( IdentityServiceCreate and NetworkServiceCreateExtended. """ return s + + +@pytest.fixture +def sla_create(sla: SLA, project: Project) -> SLACreateExtended: + """Fixture with an SLACreateExtended.""" + return SLACreateExtended(**sla.dict(), project=project.id) + + +@pytest.fixture +def user_group_create(sla_create: SLACreateExtended) -> UserGroupCreateExtended: + """Fixture with a UserGroupCreateExtended.""" + return UserGroupCreateExtended(name=random_lower_string(), sla=sla_create) + + +@pytest.fixture +def auth_method_create() -> AuthMethodCreate: + return AuthMethodCreate( + idp_name=random_lower_string(), protocol=random_lower_string() + ) + + +@pytest.fixture +def identity_provider_create( + auth_method_create: AuthMethodCreate, user_group_create: UserGroupCreateExtended +) -> IdentityProviderCreateExtended: + """Fixture with an IdentityProviderCreateExtended.""" + return IdentityProviderCreateExtended( + user_groups=[user_group_create], + endpoint=random_url(), + group_claim=random_lower_string(), + relationship=auth_method_create, + ) + + +# Openstack specific + + +class CaseDefaultAttr: + @parametrize(value=[True, False]) + def case_is_default(self, value: bool) -> bool: + return value + + +class CaseTags: + def case_single_valid_tag(self) -> List[str]: + return ["one"] + + @parametrize(case=[0, 1]) + def case_single_invalid_tag(self, case: int) -> List[str]: + return ["two"] if case else ["one-two"] + + def case_at_least_one_valid_tag(self) -> List[str]: + return ["one", "two"] + + +def openstack_network_dict() -> Dict[str, Any]: + """Dict with network minimal data.""" + return { + "id": uuid4().hex, + "name": random_lower_string(), + "status": "active", + "project_id": uuid4().hex, + "is_router_external": getrandbits(1), + "is_shared": False, + "mtu": randint(1, 100), + } + + +@pytest.fixture +def openstack_network_base() -> OpenstackNetwork: + """Fixture with network.""" + return OpenstackNetwork(**openstack_network_dict()) + + +@pytest.fixture +def openstack_network_disabled() -> OpenstackNetwork: + """Fixture with disabled network.""" + d = openstack_network_dict() + d["status"] = random_network_status(exclude=["active"]) + return OpenstackNetwork(**d) + + +@pytest.fixture +def openstack_network_with_desc() -> OpenstackNetwork: + """Fixture with network with specified description.""" + d = openstack_network_dict() + d["description"] = random_lower_string() + return OpenstackNetwork(**d) + + +@pytest.fixture +def openstack_network_shared() -> OpenstackNetwork: + """Fixture with shared network.""" + d = openstack_network_dict() + d["is_shared"] = True + return OpenstackNetwork(**d) + + +@pytest.fixture +@parametrize_with_cases("tags", cases=CaseTags) +def openstack_network_with_tags(tags: List[str]) -> OpenstackNetwork: + """Fixture with network with specified tags.""" + d = openstack_network_dict() + d["tags"] = tags + return OpenstackNetwork(**d) + + +@pytest.fixture +@parametrize(i=[openstack_network_base, openstack_network_shared]) +def openstack_priv_pub_network(i: OpenstackNetwork) -> OpenstackNetwork: + """Fixtures union.""" + return i + + +@pytest.fixture +@parametrize( + i=[ + openstack_priv_pub_network, + openstack_network_disabled, + openstack_network_with_desc, + openstack_network_with_tags, + ] +) +def openstack_network(i: OpenstackNetwork) -> OpenstackNetwork: + """Fixtures union.""" + return i + + +@pytest.fixture +@parametrize(n=[openstack_network_base, openstack_network_shared]) +@parametrize_with_cases("is_default", cases=CaseDefaultAttr) +def openstack_default_network( + n: OpenstackNetwork, is_default: bool +) -> OpenstackNetwork: + """Shared and not shared networks with/without is_default.""" + n.is_default = is_default + return n + + +class CaseVisibility: + @case(tags=["private"]) + @parametrize(visibility=["shared", "private"]) + def case_priv_visibility(self, visibility: str) -> str: + return visibility + + @case(tags=["public"]) + @parametrize(visibility=["public", "community"]) + def case_pub_visibility(self, visibility: str) -> str: + return visibility + + +def openstack_image_dict() -> Dict[str, Any]: + """Dict with image minimal data.""" + return { + "id": uuid4().hex, + "name": random_lower_string(), + "status": "active", + "owner_id": uuid4().hex, + "os_type": random_image_os_type(), + "os_distro": random_lower_string(), + "os_version": random_lower_string(), + "architecture": random_lower_string(), + "kernel_id": random_lower_string(), + "visibility": "public", + } + + +@pytest.fixture +def openstack_image_disabled() -> OpenstackImage: + """Fixture with disabled image.""" + d = openstack_image_dict() + d["status"] = random_image_status(exclude=["active"]) + return OpenstackImage(**d) + + +@pytest.fixture +@parametrize_with_cases("visibility", cases=CaseVisibility, has_tag="public") +def openstack_image_public(visibility: str) -> OpenstackImage: + """Fixture with image with specified visibility.""" + d = openstack_image_dict() + d["visibility"] = visibility + return OpenstackImage(**d) + + +@pytest.fixture +@parametrize_with_cases("tags", cases=CaseTags) +def openstack_image_with_tags(tags: List[str]) -> OpenstackImage: + """Fixture with image with specified tags.""" + d = openstack_image_dict() + d["tags"] = tags + return OpenstackImage(**d) + + +@pytest.fixture +@parametrize_with_cases("visibility", cases=CaseVisibility, has_tag="private") +def openstack_image_private(visibility: str) -> OpenstackImage: + """Fixture with private image.""" + d = openstack_image_dict() + d["visibility"] = visibility + return OpenstackImage(**d) + + +@pytest.fixture +@parametrize( + i=[openstack_image_disabled, openstack_image_public, openstack_image_with_tags] +) +def openstack_image(i: OpenstackImage) -> OpenstackImage: + """Fixtures union.""" + return i + + +class CaseExtraSpecs: + @parametrize(second_attr=["gpu_model", "gpu_vendor"]) + def case_gpu(self, second_attr: str) -> Dict[str, Any]: + return {"gpu_number": randint(1, 100), second_attr: random_lower_string()} + + def case_infiniband(self) -> Dict[str, Any]: + return {"infiniband": getrandbits(1)} + + def case_local_storage(self) -> Dict[str, Any]: + return {"aggregate_instance_extra_specs:local_storage": random_lower_string()} + + +def openstack_flavor_dict() -> Dict[str, Any]: + """Dict with flavor minimal data.""" + return { + "name": random_lower_string(), + "disk": randint(0, 100), + "is_public": getrandbits(1), + "ram": randint(0, 100), + "vcpus": randint(0, 100), + "swap": randint(0, 100), + "ephemeral": randint(0, 100), + "is_disabled": False, + "rxtx_factor": random_float(0, 100), + "extra_specs": {}, + } + + +@pytest.fixture +def openstack_flavor_base() -> OpenstackFlavor: + """Fixture with disabled flavor.""" + return OpenstackFlavor(**openstack_flavor_dict()) + + +@pytest.fixture +def openstack_flavor_disabled() -> OpenstackFlavor: + """Fixture with disabled flavor.""" + d = openstack_flavor_dict() + d["is_disabled"] = True + return OpenstackFlavor(**d) + + +@pytest.fixture +def openstack_flavor_with_desc() -> OpenstackFlavor: + """Fixture with a flavor with description.""" + d = openstack_flavor_dict() + d["description"] = random_lower_string() + return OpenstackFlavor(**d) + + +@pytest.fixture +def openstack_flavor_private() -> OpenstackFlavor: + """Fixture with private flavor.""" + d = openstack_flavor_dict() + d["is_public"] = False + return OpenstackFlavor(**d) + + +@pytest.fixture +@parametrize_with_cases("extra_specs", cases=CaseExtraSpecs) +def openstack_flavor_with_extra_specs(extra_specs: Dict[str, Any]) -> OpenstackFlavor: + """Fixture with a flavor with extra specs.""" + d = openstack_flavor_dict() + d["extra_specs"] = extra_specs + return OpenstackFlavor(**d) + + +@pytest.fixture +@parametrize( + f=[ + openstack_flavor_base, + openstack_flavor_disabled, + openstack_flavor_with_extra_specs, + openstack_flavor_private, + openstack_flavor_with_desc, + ] +) +def openstack_flavor(f: OpenstackFlavor) -> OpenstackFlavor: + """Fixtures union.""" + return f + + +def openstack_block_storage_quotas_dict() -> Dict[str, int]: + """Dict with the block storage quotas attributes.""" + return { + "backup_gigabytes": randint(0, 100), + "backups": randint(0, 100), + "gigabytes": randint(0, 100), + "groups": randint(0, 100), + "per_volume_gigabytes": randint(0, 100), + "snapshots": randint(0, 100), + "volumes": randint(0, 100), + } + + +def openstack_compute_quotas_dict() -> Dict[str, int]: + """Dict with the compute quotas attributes.""" + return { + "cores": randint(0, 100), + "fixed_ips": randint(0, 100), + "floating_ips": randint(0, 100), + "injected_file_content_bytes": randint(0, 100), + "injected_file_path_bytes": randint(0, 100), + "injected_files": randint(0, 100), + "instances": randint(0, 100), + "key_pairs": randint(0, 100), + "metadata_items": randint(0, 100), + "networks": randint(0, 100), + "ram": randint(0, 100), + "security_group_rules": randint(0, 100), + "security_groups": randint(0, 100), + "server_groups": randint(0, 100), + "server_group_members": randint(0, 100), + "force": False, + } + + +def openstack_network_quotas_dict() -> Dict[str, int]: + """Dict with the network quotas attributes.""" + return { + "check_limit": False, + "floating_ips": randint(0, 100), + "health_monitors": randint(0, 100), + "listeners": randint(0, 100), + "load_balancers": randint(0, 100), + "l7_policies": randint(0, 100), + "networks": randint(0, 100), + "pools": randint(0, 100), + "ports": randint(0, 100), + # "project_id": ?, + "rbac_policies": randint(0, 100), + "routers": randint(0, 100), + "subnets": randint(0, 100), + "subnet_pools": randint(0, 100), + "security_group_rules": randint(0, 100), + "security_groups": randint(0, 100), + } + + +@pytest.fixture +def openstack_block_storage_quotas() -> OpenstackBlockStorageQuotaSet: + """Fixture with the block storage quotas.""" + return OpenstackBlockStorageQuotaSet(**openstack_block_storage_quotas_dict()) + + +@pytest.fixture +def openstack_compute_quotas() -> OpenstackComputeQuotaSet: + """Fixture with the compute quotas.""" + return OpenstackComputeQuotaSet(**openstack_compute_quotas_dict()) + + +@pytest.fixture +def openstack_network_quotas() -> OpenstackNetworkQuota: + """Fixture with the network quotas.""" + return OpenstackNetworkQuota(**openstack_network_quotas_dict()) diff --git a/tests/crud/test_crud.py b/tests/crud/test_crud.py index b6ab188..da6930f 100644 --- a/tests/crud/test_crud.py +++ b/tests/crud/test_crud.py @@ -1,6 +1,7 @@ -from typing import Type, Union +import copy +import os +from typing import Any, Dict, List, Type, Union from unittest.mock import Mock, patch -from uuid import uuid4 import pytest from app.provider.schemas_extended import ( @@ -15,7 +16,7 @@ from src.crud import CRUD from src.utils import get_read_write_headers -from tests.schemas.utils import random_lower_string, random_provider_type, random_url +from tests.schemas.utils import random_lower_string, random_url class CaseConnException: @@ -42,8 +43,7 @@ class CaseErrorCode: status.HTTP_404_NOT_FOUND, status.HTTP_405_METHOD_NOT_ALLOWED, status.HTTP_408_REQUEST_TIMEOUT, - status.HTTP_409_CONFLICT, - status.HTTP_410_GONE, + status.HTTP_422_UNPROCESSABLE_ENTITY, status.HTTP_500_INTERNAL_SERVER_ERROR, ] ) @@ -51,147 +51,178 @@ def case_error_code(self, code: int) -> int: return code +class CaseRespReadProviders: + def case_empty_list(self) -> List[ProviderRead]: + return [] + + def case_providers(self, provider_read: ProviderRead) -> List[ProviderRead]: + return [provider_read] + + class CaseCRUDCreation: @parametrize(attr=["url", "read_headers", "write_headers"]) def case_missing_attr(self, attr: str) -> str: return attr -def execute_request(*, crud: CRUD, request: str) -> None: +def execute_request( + *, + crud: CRUD, + request: str, + provider_create: ProviderCreateExtended, + provider_read: ProviderRead, +) -> None: if request == "get": crud.read() elif request == "post": - data = ProviderCreateExtended( - name=random_lower_string(), type=random_provider_type() - ) - crud.create(data=data) + crud.create(data=provider_create) elif request == "delete": - item = ProviderRead( - uid=uuid4(), name=random_lower_string(), type=random_provider_type() - ) - crud.remove(item=item) + crud.remove(item=provider_read) elif request == "put": - old_data = ProviderCreateExtended( - name=random_lower_string(), type=random_provider_type() - ) - item = ProviderReadExtended(uid=uuid4(), **old_data.dict()) - new_data = ProviderCreateExtended( - name=random_lower_string(), type=random_provider_type() - ) - crud.update(new_data=new_data, old_data=item) + new_data = copy.deepcopy(provider_create) + new_data.name = random_lower_string() + crud.update(new_data=new_data, old_data=provider_read) -def test_crud_class() -> None: +def crud_dict() -> Dict[str, Any]: + """Dict with CRUD minimal attributes.""" read_header, write_header = get_read_write_headers(token=random_lower_string()) - CRUD(url=random_url(), read_headers=read_header, write_headers=write_header) + return { + "url": random_url(), + "read_headers": read_header, + "write_headers": write_header, + } + + +def test_crud_class() -> None: + """Valid CRUD schema.""" + d = crud_dict() + crud = CRUD(**d) + assert crud.single_url == os.path.join(d.get("url"), "{uid}") + assert crud.multi_url == d.get("url") + assert crud.read_headers == d.get("read_headers") + assert crud.write_headers == d.get("write_headers") @parametrize_with_cases("missing_attr", cases=CaseCRUDCreation) def test_invalid_crud_class(missing_attr: str) -> None: - url = None if missing_attr == "url" else random_url() - read_header, write_header = get_read_write_headers(token=random_lower_string()) - if missing_attr == "read_headers": - read_header = None - if missing_attr == "write_headers": - write_header = None + """Invalid CRUD schema. + + Missing required attributes. + """ + d = crud_dict() + d[missing_attr] = None with pytest.raises(ValueError): - CRUD(url=url, read_headers=read_header, write_headers=write_header) + CRUD(**d) -@patch("src.crud.requests.get") -def test_read(mock_get: Mock, crud: CRUD) -> None: - providers = [ - ProviderRead( - uid=uuid4(), name=random_lower_string(), type=random_provider_type() +@parametrize_with_cases("error_code", cases=CaseErrorCode) +@parametrize_with_cases("request", cases=CaseRequest) +def test_fail_operation( + crud: CRUD, + provider_create: ProviderCreateExtended, + provider_read: ProviderReadExtended, + request: str, + error_code: int, +) -> None: + """Endpoint responds with error codes.""" + with patch(f"src.crud.requests.{request}") as mock_req: + mock_req.return_value.status_code = error_code + execute_request( + crud=crud, + request=request, + provider_create=provider_create, + provider_read=provider_read, ) - ] - mock_get.return_value.status_code = 200 + mock_req.return_value.raise_for_status.assert_called() + + +@parametrize_with_cases("exception", cases=CaseConnException) +@parametrize_with_cases("request", cases=CaseRequest) +def test_read_no_connection( + crud: CRUD, + provider_create: ProviderCreateExtended, + provider_read: ProviderReadExtended, + request: str, + exception: Union[Type[ConnectionError], Type[ReadTimeout]], +) -> None: + """Connection error: no connection or read timeout.""" + with patch(f"src.crud.requests.{request}") as mock_req: + mock_req.side_effect = exception() + with pytest.raises(exception): + execute_request( + crud=crud, + request=request, + provider_create=provider_create, + provider_read=provider_read, + ) + + +@patch("src.crud.requests.get") +@parametrize_with_cases("providers", cases=CaseRespReadProviders) +def test_read(mock_get: Mock, crud: CRUD, providers: List[ProviderRead]) -> None: + mock_get.return_value.status_code = status.HTTP_200_OK mock_get.return_value.json.return_value = jsonable_encoder(providers) resp_body = crud.read() mock_get.return_value.raise_for_status.assert_not_called() assert len(providers) == len(resp_body) - assert isinstance(providers[0], ProviderRead) - assert providers[0].uid == resp_body[0].uid + if len(providers) == 1: + assert isinstance(providers[0], ProviderRead) @patch("src.crud.requests.post") -def test_create(mock_post: Mock, crud: CRUD) -> None: - data = ProviderCreateExtended( - name=random_lower_string(), type=random_provider_type() - ) - item = ProviderReadExtended(uid=uuid4(), **data.dict()) - mock_post.return_value.status_code = 201 - mock_post.return_value.json.return_value = jsonable_encoder(item) - resp_body = crud.create(data=data) - +def test_create( + mock_post: Mock, + crud: CRUD, + provider_create: ProviderCreateExtended, + provider_read_extended: ProviderReadExtended, +) -> None: + mock_post.return_value.status_code = status.HTTP_201_CREATED + mock_post.return_value.json.return_value = jsonable_encoder(provider_read_extended) + resp_body = crud.create(data=provider_create) mock_post.return_value.raise_for_status.assert_not_called() assert isinstance(resp_body, ProviderReadExtended) -@patch("src.crud.requests.delete") -def test_remove(mock_delete: Mock, crud: CRUD) -> None: - item = ProviderRead( - uid=uuid4(), name=random_lower_string(), type=random_provider_type() - ) - mock_delete.return_value.status_code = 204 - resp_body = crud.remove(item=item) - +@patch( + "src.crud.requests.delete", + return_value=Mock(status_code=status.HTTP_204_NO_CONTENT), +) +def test_remove(mock_delete: Mock, crud: CRUD, provider_read: ProviderRead) -> None: + resp_body = crud.remove(item=provider_read) mock_delete.return_value.raise_for_status.assert_not_called() assert not resp_body @patch("src.crud.requests.put") -def test_update(mock_put: Mock, crud: CRUD) -> None: - old_data = ProviderCreateExtended( - name=random_lower_string(), type=random_provider_type() - ) - old_item = ProviderReadExtended(uid=uuid4(), **old_data.dict()) - new_data = ProviderCreateExtended( - name=random_lower_string(), type=random_provider_type() +def test_update( + mock_put: Mock, + crud: CRUD, + provider_create: ProviderCreateExtended, + provider_read: ProviderRead, +) -> None: + new_create_data = copy.deepcopy(provider_create) + new_create_data.name = random_lower_string() + new_read_data = ProviderReadExtended( + **new_create_data.dict(), uid=provider_read.uid ) - new_item = ProviderReadExtended(uid=uuid4(), **new_data.dict()) - mock_put.return_value.status_code = 200 - mock_put.return_value.json.return_value = jsonable_encoder(new_item) - resp_body = crud.update(new_data=new_data, old_data=old_item) - + mock_put.return_value.status_code = status.HTTP_200_OK + mock_put.return_value.json.return_value = jsonable_encoder(new_read_data) + resp_body = crud.update(new_data=new_create_data, old_data=provider_read) mock_put.return_value.raise_for_status.assert_not_called() assert isinstance(resp_body, ProviderReadExtended) -@patch("src.crud.requests.put") -def test_update_no_changes(mock_put: Mock, crud: CRUD) -> None: - old_data = ProviderCreateExtended( - name=random_lower_string(), type=random_provider_type() - ) - old_item = ProviderReadExtended(uid=uuid4(), **old_data.dict()) - new_data = ProviderCreateExtended( - name=random_lower_string(), type=random_provider_type() - ) - new_item = ProviderReadExtended(uid=uuid4(), **new_data.dict()) - mock_put.return_value.status_code = 304 - mock_put.return_value.json.return_value = jsonable_encoder(new_item) - resp_body = crud.update(new_data=new_data, old_data=old_item) - +@patch( + "src.crud.requests.put", return_value=Mock(status_code=status.HTTP_304_NOT_MODIFIED) +) +def test_update_no_changes( + mock_put: Mock, + crud: CRUD, + provider_create: ProviderCreateExtended, + provider_read: ProviderRead, +) -> None: + resp_body = crud.update(new_data=provider_create, old_data=provider_read) mock_put.return_value.raise_for_status.assert_not_called() assert not resp_body - - -@parametrize_with_cases("error_code", cases=CaseErrorCode) -@parametrize_with_cases("request", cases=CaseRequest) -def test_fail_operation(request: str, error_code: int, crud: CRUD) -> None: - with patch(f"src.crud.requests.{request}") as mock_req: - mock_req.return_value.status_code = error_code - execute_request(crud=crud, request=request) - mock_req.return_value.raise_for_status.assert_called() - - -@parametrize_with_cases("exception", cases=CaseConnException) -@parametrize_with_cases("request", cases=CaseRequest) -def test_read_no_connection( - request: str, exception: Union[Type[ConnectionError], Type[ReadTimeout]], crud: CRUD -) -> None: - with patch(f"src.crud.requests.{request}") as mock_req: - mock_req.side_effect = exception() - with pytest.raises(exception): - execute_request(crud=crud, request=request) diff --git a/tests/providers/openstack/test_connection.py b/tests/providers/openstack/test_connection.py index 2b63fce..f88cd87 100644 --- a/tests/providers/openstack/test_connection.py +++ b/tests/providers/openstack/test_connection.py @@ -1,109 +1,20 @@ -from typing import Tuple, Union -from unittest.mock import Mock, patch -from uuid import uuid4 +from typing import Tuple -import pytest -from app.provider.schemas_extended import ( - AuthMethodCreate, - IdentityProviderCreateExtended, - SLACreateExtended, - UserGroupCreateExtended, -) -from keystoneauth1.exceptions.auth_plugins import NoMatchingPlugin -from keystoneauth1.exceptions.connection import ConnectFailure -from keystoneauth1.exceptions.http import NotFound, Unauthorized -from pytest_cases import parametrize, parametrize_with_cases +from app.provider.schemas_extended import IdentityProviderCreateExtended -from src.models.identity_provider import Issuer -from src.models.provider import AuthMethod, Openstack, Project +from src.models.provider import Openstack, Project from src.providers.openstack import connect_to_provider -from tests.schemas.utils import random_lower_string, random_start_end_dates, random_url +from tests.schemas.utils import random_lower_string -@parametrize( - exception=[ - "invalid_url", - "expired_token", - "wrong_auth_type", - "wrong_idp_name", - "wrong_protocol", - "invalid_project_id", - "timeout", - ] -) -def case_exception( - exception: str, -) -> Union[ConnectFailure, Unauthorized, NoMatchingPlugin, NotFound]: - if exception == "invalid_url" or exception == "timeout": - return ConnectFailure() - elif ( - exception == "expired_token" - or exception == "wrong_protocol" - or exception == "invalid_project_id" - ): - return Unauthorized() - elif exception == "wrong_auth_type": - return NoMatchingPlugin("fake") - elif exception == "wrong_idp_name": - return NotFound() - - -@pytest.fixture -def configurations() -> ( - Tuple[Openstack, IdentityProviderCreateExtended, Project, str, str] -): - project_id = uuid4() - start_date, end_date = random_start_end_dates() - sla = SLACreateExtended( - doc_uuid=uuid4(), start_date=start_date, end_date=end_date, project=project_id - ) - user_group = UserGroupCreateExtended(name=random_lower_string(), sla=sla) - relationship = AuthMethodCreate( - idp_name=random_lower_string(), protocol=random_lower_string() - ) - issuer = IdentityProviderCreateExtended( - endpoint=random_url(), - group_claim=random_lower_string(), - relationship=relationship, - user_groups=[user_group], - ) - trusted_idp = AuthMethod( - endpoint=issuer.endpoint, - name=relationship.idp_name, - protocol=relationship.protocol, - ) - project = Project(id=project_id, sla=sla.doc_uuid) - provider_conf = Openstack( - name=random_lower_string(), - auth_url=random_url(), - identity_providers=[trusted_idp], - projects=[project], - ) +def test_connection( + configurations: Tuple[IdentityProviderCreateExtended, Openstack, Project], +) -> None: + """Connection creation always succeeds, it is its usage that may fail.""" + (idp, provider_conf, project) = configurations region_name = random_lower_string() token = random_lower_string() - return provider_conf, issuer, project, region_name, token - - -@patch("src.providers.openstack.connect") -@parametrize_with_cases("exception", cases=".") -def test_fail_connection( - mock_func: Mock, - exception: Union[ConnectFailure, Unauthorized, NoMatchingPlugin, NotFound], - configurations: Tuple[Openstack, Issuer, Project, str], -) -> None: - mock_func.side_effect = exception - (provider_conf, idp, project, region_name, token) = configurations - assert not connect_to_provider( - provider_conf=provider_conf, - idp=idp, - project_id=project.id, - region_name=region_name, - token=token, - ) - -def test_connection(configurations: Tuple[Openstack, Issuer, Project, str]) -> None: - (provider_conf, idp, project, region_name, token) = configurations conn = connect_to_provider( provider_conf=provider_conf, idp=idp, diff --git a/tests/providers/openstack/test_flavors.py b/tests/providers/openstack/test_flavors.py index 7226e84..51230a9 100644 --- a/tests/providers/openstack/test_flavors.py +++ b/tests/providers/openstack/test_flavors.py @@ -1,108 +1,29 @@ -from random import getrandbits, randint -from typing import Any, Dict, List +from typing import Dict, List from unittest.mock import Mock, patch from uuid import uuid4 -import pytest from openstack.compute.v2.flavor import Flavor -from pytest_cases import case, parametrize, parametrize_with_cases from src.providers.openstack import get_flavors -from tests.schemas.utils import random_float, random_lower_string -def flavor_data() -> Dict[str, Any]: - return { - "name": random_lower_string(), - "disk": randint(0, 100), - "is_public": getrandbits(1), - "ram": randint(0, 100), - "vcpus": randint(0, 100), - "swap": randint(0, 100), - "ephemeral": randint(0, 100), - "is_disabled": False, - "rxtx_factor": random_float(0, 100), - "extra_specs": {}, - } - - -@case(tags=["public"]) -@parametrize(public=[True, False]) -def case_public(public: bool) -> bool: - return public - - -@case(tags=["extra_specs"]) -@parametrize( - extra_specs=[ - "gpu_number", - "aggregate_instance_extra_specs:local_storage", - "infiniband", - ] -) -def case_extra_specs(extra_specs: str) -> Dict[str, Any]: - if extra_specs == "gpu_number": - return { - "gpu_number": randint(1, 100), - "gpu_model": random_lower_string(), - "gpu_vendor": random_lower_string(), - } - if extra_specs == "infiniband": - return {extra_specs: getrandbits(1)} - if extra_specs == "aggregate_instance_extra_specs:local_storage": - return {extra_specs: random_lower_string()} - - -@pytest.fixture -def flavor_disabled() -> Flavor: - d = flavor_data() - d["is_disabled"] = True - return Flavor(**d) - - -@pytest.fixture -def flavor_with_desc() -> Flavor: - d = flavor_data() - d["description"] = random_lower_string() - return Flavor(**d) - - -@pytest.fixture -@parametrize_with_cases("public", cases=".", has_tag="public") -def flavor_no_extra_specs(public: bool) -> Flavor: - d = flavor_data() - d["is_public"] = public - return Flavor(**d) - - -@pytest.fixture -@parametrize_with_cases("extra_specs", cases=".", has_tag="extra_specs") -def flavor_with_extra_specs(extra_specs: Dict[str, Any]) -> Flavor: - d = flavor_data() - d["extra_specs"] = extra_specs - return Flavor(**d) - - -@pytest.fixture -@parametrize( - f=[ - flavor_disabled, - flavor_with_extra_specs, - flavor_no_extra_specs, - flavor_with_desc, - ] -) -def flavor(f: Flavor) -> Flavor: - return f +def get_allowed_project_ids(*args, **kwargs) -> List[Dict[str, str]]: + return [{"tenant_id": uuid4().hex}] @patch("src.providers.openstack.Connection.compute") @patch("src.providers.openstack.Connection") -def test_retrieve_flavors(mock_conn: Mock, mock_compute: Mock, flavor: Flavor) -> None: - def get_allowed_project_ids(*args, **kwargs) -> List[Dict[str, str]]: - return [{"tenant_id": uuid4().hex}] +def test_retrieve_flavors( + mock_conn: Mock, mock_compute: Mock, openstack_flavor: Flavor +) -> None: + """Successful retrieval of a flavors. - flavors = list(filter(lambda x: not x.is_disabled, [flavor])) + Retrieve only active flavors. + + Flavors retrieval fail is not tested here. It is tested where the exception is + caught: get_data_from_openstack function. + """ + flavors = list(filter(lambda x: not x.is_disabled, [openstack_flavor])) mock_compute.flavors.return_value = flavors mock_compute.get_flavor_access.side_effect = get_allowed_project_ids mock_conn.compute = mock_compute @@ -111,22 +32,25 @@ def get_allowed_project_ids(*args, **kwargs) -> List[Dict[str, str]]: assert len(data) == len(flavors) if len(data) > 0: item = data[0] - assert item.description == (flavor.description if flavor.description else "") - assert item.uuid == flavor.id - assert item.name == flavor.name - assert item.disk == flavor.disk - assert item.is_public == flavor.is_public - assert item.ram == flavor.ram - assert item.vcpus == flavor.vcpus - assert item.swap == flavor.swap - assert item.ephemeral == flavor.ephemeral - assert item.gpus == flavor.extra_specs.get("gpu_number", 0) - assert item.gpu_model == flavor.extra_specs.get("gpu_model") - assert item.gpu_vendor == flavor.extra_specs.get("gpu_vendor") - assert item.local_storage == flavor.extra_specs.get( + if openstack_flavor.description: + assert item.description == openstack_flavor.description + else: + assert item.description == "" + assert item.uuid == openstack_flavor.id + assert item.name == openstack_flavor.name + assert item.disk == openstack_flavor.disk + assert item.is_public == openstack_flavor.is_public + assert item.ram == openstack_flavor.ram + assert item.vcpus == openstack_flavor.vcpus + assert item.swap == openstack_flavor.swap + assert item.ephemeral == openstack_flavor.ephemeral + assert item.gpus == openstack_flavor.extra_specs.get("gpu_number", 0) + assert item.gpu_model == openstack_flavor.extra_specs.get("gpu_model") + assert item.gpu_vendor == openstack_flavor.extra_specs.get("gpu_vendor") + assert item.local_storage == openstack_flavor.extra_specs.get( "aggregate_instance_extra_specs:local_storage" ) - assert item.infiniband == flavor.extra_specs.get("infiniband", False) + assert item.infiniband == openstack_flavor.extra_specs.get("infiniband", False) if item.is_public: assert len(item.projects) == 0 else: diff --git a/tests/providers/openstack/test_images.py b/tests/providers/openstack/test_images.py index 83e886f..af696dc 100644 --- a/tests/providers/openstack/test_images.py +++ b/tests/providers/openstack/test_images.py @@ -1,107 +1,68 @@ -from typing import Any, Dict, List, Optional +from typing import List, Optional from unittest.mock import Mock, patch from uuid import uuid4 -import pytest from openstack.image.v2.image import Image from openstack.image.v2.member import Member from pytest_cases import case, parametrize, parametrize_with_cases from src.providers.openstack import get_images -from tests.providers.openstack.utils import random_image_status, random_image_visibility -from tests.schemas.utils import random_image_os_type, random_lower_string -# `shared` has a separate case -@case(tags=["visibility"]) -@parametrize(visibility=["public", "private", "community"]) -def case_visibility(visibility: bool) -> bool: - return visibility - - -@case(tags=["tags"]) -@parametrize(tags=range(4)) -def case_tags(tags: int) -> List[str]: - if tags == 0: +class CaseTags: + def case_single_valid_tag(self) -> List[str]: return ["one"] - if tags == 1: - return ["two"] - if tags == 2: - return ["one", "two"] - if tags == 3: - return ["one-two"] - - -@case(tags=["no_tags"]) -@parametrize(empty_list=[True, False]) -def case_empty_tag_list(empty_list: bool) -> Optional[List]: - return [] if empty_list else None - -@case(tags=["acceptance_status"]) -@parametrize(acceptance_status=["accepted", "rejected", "pending"]) -def case_acceptance_status(acceptance_status: bool) -> bool: - return acceptance_status + @parametrize(case=[0, 1]) + def case_single_invalid_tag(self, case: int) -> List[str]: + return ["two"] if case else ["one-two"] - -def image_data() -> Dict[str, Any]: - return { - "id": uuid4().hex, - "name": random_lower_string(), - "status": "active", - "owner": uuid4().hex, - "os_type": random_image_os_type(), - "os_distro": random_lower_string(), - "os_version": random_lower_string(), - "architecture": random_lower_string(), - "kernel_id": random_lower_string(), - "visibility": random_image_visibility(), - } - - -@pytest.fixture -def image_disabled() -> Image: - d = image_data() - d["status"] = random_image_status(exclude=["active"]) - return Image(**d) + def case_at_least_one_valid_tag(self) -> List[str]: + return ["one", "two"] -@pytest.fixture -@parametrize_with_cases("visibility", cases=".", has_tag="visibility") -def image_visible(visibility: str) -> Image: - d = image_data() - d["visibility"] = visibility - return Image(**d) +class CaseTagList: + @case(tags=["empty"]) + def case_empty_tag_list(self) -> Optional[List]: + return [] + @case(tags=["empty"]) + def case_no_list(self) -> Optional[List]: + return None -@pytest.fixture -@parametrize_with_cases("tags", cases=".", has_tag="tags") -def image_with_tags(tags: List[str]) -> Image: - d = image_data() - d["tags"] = tags - return Image(**d) + @case(tags=["full"]) + def case_list(self) -> Optional[List]: + return ["one"] -@pytest.fixture -def image_shared() -> Image: - d = image_data() - d["visibility"] = "shared" - return Image(**d) +class CaseAcceptStatus: + @parametrize(acceptance_status=["accepted", "rejected", "pending"]) + def case_acceptance_status(self, acceptance_status: bool) -> bool: + return acceptance_status -@pytest.fixture -@parametrize(i=[image_disabled, image_visible]) -def image(i: Image) -> Image: - return i +def filter_images(image: Image, tags: Optional[List[str]]) -> bool: + valid_tag = tags is None or len(tags) == 0 + if not valid_tag: + valid_tag = len(set(image.tags).intersection(set(tags))) > 0 + return image.status == "active" and valid_tag @patch("src.providers.openstack.Connection.image") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("tags", cases=".", has_tag="no_tags") -def test_retrieve_images( - mock_conn: Mock, mock_image: Mock, image: Image, tags: Optional[List] +@parametrize_with_cases("tags", cases=CaseTagList) +def test_retrieve_public_images( + mock_conn: Mock, mock_image: Mock, openstack_image: Image, tags: Optional[List[str]] ) -> None: - images = list(filter(lambda x: x.status == "active", [image])) + """Successful retrieval of an Image. + + Retrieve only active images and with the tags contained in the target tags list. + If the target tags list is empty or None, all active images are valid ones. + + Images retrieval fail is not tested here. It is tested where the exception is + caught: get_data_from_openstack function. + """ + images = list(filter(lambda x: filter_images(x, tags), [openstack_image])) mock_image.images.return_value = images mock_conn.image = mock_image data = get_images(mock_conn, tags=tags) @@ -110,66 +71,53 @@ def test_retrieve_images( if len(data) > 0: item = data[0] assert item.description == "" - assert item.uuid == image.id - assert item.name == image.name - assert item.os_type == image.os_type - assert item.os_distro == image.os_distro - assert item.os_version == image.os_version - assert item.architecture == image.architecture - assert item.kernel_id == image.kernel_id + assert item.uuid == openstack_image.id + assert item.name == openstack_image.name + assert item.os_type == openstack_image.os_type + assert item.os_distro == openstack_image.os_distro + assert item.os_version == openstack_image.os_version + assert item.architecture == openstack_image.architecture + assert item.kernel_id == openstack_image.kernel_id assert not item.cuda_support assert not item.gpu_driver - assert item.tags == image.tags - if image.visibility in ["private", "shared"]: - assert not item.is_public - else: - assert item.is_public - if item.is_public: - assert len(item.projects) == 0 - else: - assert len(item.projects) == len([image.owner_id]) + assert item.tags == openstack_image.tags + assert item.is_public + assert len(item.projects) == 0 @patch("src.providers.openstack.Connection.image") @patch("src.providers.openstack.Connection") -def test_retrieve_images_with_tags( - mock_conn: Mock, mock_image: Mock, image_with_tags: Image +@parametrize_with_cases("acceptance_status", cases=CaseAcceptStatus) +def test_retrieve_private_images( + mock_conn: Mock, + mock_image: Mock, + openstack_image_private: Image, + acceptance_status: str, ) -> None: - target_tags = ["one"] - images = list( - filter(lambda x: set(x.tags).intersection(set(target_tags)), [image_with_tags]) - ) - mock_image.images.return_value = images - mock_conn.image = mock_image - data = get_images(mock_conn, tags=target_tags) - assert len(data) == len(images) + """Successful retrieval of an Image with a specified visibility. + Check that the is_public flag is correctly set to False and projects list is + correct. + """ -@patch("src.providers.openstack.Connection.image") -@patch("src.providers.openstack.Connection") -@parametrize_with_cases("acceptance_status", cases=".", has_tag="acceptance_status") -def test_retrieve_images_with_shared_visibility( - mock_conn: Mock, mock_image: Mock, image_shared: Image, acceptance_status: str -) -> None: def get_allowed_members(*args, **kwargs) -> List[Member]: return [ - Member(status="accepted", id=image_shared.owner_id), + Member(status="accepted", id=openstack_image_private.owner_id), Member(status=acceptance_status, id=uuid4().hex), ] - images = [image_shared] + images = [openstack_image_private] mock_image.images.return_value = images mock_image.members.side_effect = get_allowed_members mock_conn.image = mock_image data = get_images(mock_conn) assert len(data) == len(images) - if len(data) > 0: - item = data[0] - assert not item.is_public - allowed_members = filter( - lambda x: x.status == "accepted", get_allowed_members() + assert not data[0].is_public + if openstack_image_private.visibility == "private": + allowed_project_ids = [openstack_image_private.owner_id] + elif openstack_image_private.visibility == "shared": + allowed_project_ids = list( + filter(lambda x: x.status == "accepted", get_allowed_members()) ) - allowed_project_ids = set([i.id for i in allowed_members]) - allowed_project_ids.add(image_shared.owner_id) - assert len(item.projects) == len(allowed_project_ids) + assert len(data[0].projects) == len(allowed_project_ids) diff --git a/tests/providers/openstack/test_networks.py b/tests/providers/openstack/test_networks.py index 21442ff..698854b 100644 --- a/tests/providers/openstack/test_networks.py +++ b/tests/providers/openstack/test_networks.py @@ -1,191 +1,151 @@ -from random import getrandbits, randint -from typing import Any, Dict, List, Optional +from typing import List, Optional from unittest.mock import Mock, PropertyMock, patch from uuid import uuid4 -import pytest from openstack.network.v2.network import Network from pytest_cases import case, parametrize, parametrize_with_cases from src.models.provider import PrivateNetProxy from src.providers.openstack import get_networks -from tests.providers.openstack.utils import random_network_status -from tests.schemas.utils import random_ip, random_lower_string -@case(tags=["is_shared"]) -@parametrize(is_shared=[True, False]) -def case_is_shared(is_shared: bool) -> bool: - return is_shared +class CaseTagList: + @case(tags=["empty"]) + def case_empty_tag_list(self) -> Optional[List]: + return [] + @case(tags=["empty"]) + def case_no_list(self) -> Optional[List]: + return None -@case(tags=["tags"]) -@parametrize(tags=range(4)) -def case_tags(tags: int) -> List[str]: - if tags == 0: + @case(tags=["full"]) + def case_list(self) -> Optional[List]: return ["one"] - if tags == 1: - return ["two"] - if tags == 2: - return ["one", "two"] - if tags == 3: - return ["one-two"] -@case(tags=["no_tags"]) -@parametrize(empty_list=[True, False]) -def case_empty_tag_list(empty_list: bool) -> Optional[List]: - return [] if empty_list else None +class CaseDefaultNet: + @parametrize(default_net=["private", "public"]) + def case_default_attr(self, default_net: str) -> str: + return default_net -@case(tags=["is_default"]) -@parametrize(default_attr=["private", "public", "network"]) -def case_default_attr(default_attr: bool) -> bool: - return default_attr - - -def network_data() -> Dict[str, Any]: - return { - "id": uuid4().hex, - "name": random_lower_string(), - "status": "active", - "project_id": uuid4().hex, - "is_default": False, - "is_router_external": getrandbits(1), - "is_shared": getrandbits(1), - "mtu": randint(1, 100), - } - - -@pytest.fixture -def network_base() -> Network: - return Network(**network_data()) - - -@pytest.fixture -def network_disabled() -> Network: - d = network_data() - d["status"] = random_network_status(exclude=["active"]) - return Network(**d) - - -@pytest.fixture -def network_with_desc() -> Network: - d = network_data() - d["description"] = random_lower_string() - return Network(**d) - - -@pytest.fixture -@parametrize_with_cases("is_shared", cases=".", has_tag="is_shared") -def network_shared(is_shared: bool) -> Network: - d = network_data() - d["is_shared"] = is_shared - return Network(**d) - - -@pytest.fixture -@parametrize_with_cases("tags", cases=".", has_tag="tags") -def network_with_tags(tags: List[str]) -> Network: - d = network_data() - d["tags"] = tags - return Network(**d) - - -@pytest.fixture -@parametrize(i=[network_disabled, network_shared, network_with_desc]) -def network(i: Network) -> Network: - return i +def filter_networks(network: Network, tags: Optional[List[str]]) -> bool: + valid_tag = tags is None or len(tags) == 0 + if not valid_tag: + valid_tag = len(set(network.tags).intersection(set(tags))) > 0 + return network.status == "active" and valid_tag @patch("src.providers.openstack.Connection.network") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("tags", cases=".", has_tag="no_tags") +@parametrize_with_cases("tags", cases=CaseTagList) def test_retrieve_networks( - mock_conn: Mock, mock_network: Mock, network: Network, tags: Optional[List] + mock_conn: Mock, + mock_network: Mock, + openstack_network: Network, + tags: Optional[List[str]], ) -> None: - networks = list(filter(lambda x: x.status == "active", [network])) + """Successful retrieval of a Network. + + Retrieve only active networks and with the tags contained in the target tags list. + If the target tags list is empty or None, all active networks are valid ones. + + Networks retrieval fail is not tested here. It is tested where the exception is + caught: get_data_from_openstack function. + """ + networks = list(filter(lambda x: filter_networks(x, tags), [openstack_network])) mock_network.networks.return_value = networks mock_conn.network = mock_network - type(mock_conn).current_project_id = PropertyMock(return_value=network.project_id) + type(mock_conn).current_project_id = PropertyMock( + return_value=openstack_network.project_id + ) data = get_networks(mock_conn, tags=tags) assert len(data) == len(networks) if len(data) > 0: item = data[0] - assert item.description == (network.description if network.description else "") - assert item.uuid == network.id - assert item.name == network.name - assert item.is_shared == network.is_shared - assert item.is_router_external == network.is_router_external - assert item.is_default == network.is_default - assert item.mtu == network.mtu + if openstack_network.description: + assert item.description == openstack_network.description + else: + assert item.description == "" + assert item.uuid == openstack_network.id + assert item.name == openstack_network.name + assert item.is_shared == openstack_network.is_shared + assert item.is_router_external == openstack_network.is_router_external + assert item.is_default == bool(openstack_network.is_default) + assert item.mtu == openstack_network.mtu assert not item.proxy_ip assert not item.proxy_user - assert item.tags == network.tags + assert item.tags == openstack_network.tags if item.is_shared: assert not item.project else: assert item.project - assert item.project == network.project_id + assert item.project == openstack_network.project_id @patch("src.providers.openstack.Connection.network") @patch("src.providers.openstack.Connection") -def test_retrieve_networks_with_tags( - mock_conn: Mock, mock_network: Mock, network_with_tags: Network +def test_not_owned_private_net( + mock_conn: Mock, mock_network: Mock, openstack_network_base: Network ) -> None: - target_tags = ["one"] - networks = list( - filter( - lambda x: set(x.tags).intersection(set(target_tags)), [network_with_tags] - ) - ) + """Networks owned by another project are not returned.""" + networks = [openstack_network_base] mock_network.networks.return_value = networks mock_conn.network = mock_network - type(mock_conn).current_project_id = PropertyMock( - return_value=network_with_tags.project_id - ) - data = get_networks(mock_conn, tags=target_tags) - assert len(data) == len(networks) + type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) + data = get_networks(mock_conn) + assert len(data) == 0 @patch("src.providers.openstack.Connection.network") @patch("src.providers.openstack.Connection") def test_retrieve_networks_with_proxy( - mock_conn: Mock, mock_network: Mock, network_base: Network + mock_conn: Mock, + mock_network: Mock, + openstack_network_base: Network, + net_proxy: PrivateNetProxy, ) -> None: - networks = [network_base] + """Test retrieving networks with proxy ip and user. + + The network does not have proxy ip and user. This function attaches them to the + network. + """ + networks = [openstack_network_base] mock_network.networks.return_value = networks mock_conn.network = mock_network type(mock_conn).current_project_id = PropertyMock( - return_value=network_base.project_id + return_value=openstack_network_base.project_id ) - proxy = PrivateNetProxy(ip=random_ip(), user=random_lower_string()) - data = get_networks(mock_conn, proxy=proxy) + data = get_networks(mock_conn, proxy=net_proxy) assert len(data) == len(networks) - if len(data) > 0: - item = data[0] - assert item.proxy_ip == str(proxy.ip) - assert item.proxy_user == proxy.user + assert data[0].proxy_ip == str(net_proxy.ip) + assert data[0].proxy_user == net_proxy.user @patch("src.providers.openstack.Connection.network") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("default_attr", cases=".", has_tag="is_default") +@parametrize_with_cases("default_net", cases=CaseDefaultNet) def test_retrieve_networks_with_default_net( - mock_conn: Mock, mock_network: Mock, network_shared: Network, default_attr: str + mock_conn: Mock, + mock_network: Mock, + openstack_default_network: Network, + default_net: str, ) -> None: - default_private_net = network_shared.name if default_attr == "private" else None - default_public_net = network_shared.name if default_attr == "public" else None - network_shared.is_default = default_attr == "network" - - networks = [network_shared] + """Test how the is_default attribute in NetworkCreateExtended is built.""" + default_private_net = None + default_public_net = None + if default_net == "private": + default_private_net = openstack_default_network.name + if default_net == "public": + default_public_net = openstack_default_network.name + + networks = [openstack_default_network] mock_network.networks.return_value = networks mock_conn.network = mock_network type(mock_conn).current_project_id = PropertyMock( - return_value=network_shared.project_id + return_value=openstack_default_network.project_id ) data = get_networks( mock_conn, @@ -196,25 +156,17 @@ def test_retrieve_networks_with_default_net( assert len(data) == len(networks) if len(data) > 0: item = data[0] + public_default_net_match_shared_net = ( + openstack_default_network.is_shared + and default_public_net == openstack_default_network.name + ) + private_default_net_match_not_shared_net = ( + not openstack_default_network.is_shared + and default_private_net == openstack_default_network.name + ) + net_marked_as_default = openstack_default_network.is_default assert item.is_default == ( - (network_shared.is_shared and default_public_net == network_shared.name) - or ( - not network_shared.is_shared - and default_private_net == network_shared.name - ) - or network_shared.is_default + public_default_net_match_shared_net + or private_default_net_match_not_shared_net + or net_marked_as_default ) - - -@patch("src.providers.openstack.Connection.network") -@patch("src.providers.openstack.Connection") -def test_not_owned_private_net( - mock_conn: Mock, mock_network: Mock, network_base: Network -) -> None: - network_base.is_shared = False - networks = [network_base] - mock_network.networks.return_value = networks - mock_conn.network = mock_network - type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) - data = get_networks(mock_conn) - assert len(data) == 0 diff --git a/tests/providers/openstack/test_projects.py b/tests/providers/openstack/test_projects.py index 7698125..10971a9 100644 --- a/tests/providers/openstack/test_projects.py +++ b/tests/providers/openstack/test_projects.py @@ -2,7 +2,6 @@ from uuid import uuid4 import pytest -from keystoneauth1.exceptions.http import Unauthorized from openstack.identity.v3.project import Project from pytest_cases import parametrize, parametrize_with_cases @@ -10,14 +9,16 @@ from tests.schemas.utils import random_lower_string -@parametrize(with_desc=[True, False]) -def case_with_desc(with_desc: bool) -> bool: - return with_desc +class CaseWithDesc: + @parametrize(with_desc=[True, False]) + def case_with_desc(self, with_desc: bool) -> bool: + return with_desc @pytest.fixture -@parametrize_with_cases("with_desc", cases=".") -def project(with_desc) -> Project: +@parametrize_with_cases("with_desc", cases=CaseWithDesc) +def openstack_project(with_desc) -> Project: + """Fixture with an Openstack project.""" d = {"id": uuid4().hex, "name": random_lower_string()} if with_desc: d["description"] = random_lower_string() @@ -27,24 +28,20 @@ def project(with_desc) -> Project: @patch("src.providers.openstack.Connection.identity") @patch("src.providers.openstack.Connection") def test_retrieve_project( - mock_conn: Mock, mock_identity: Mock, project: Project + mock_conn: Mock, mock_identity: Mock, openstack_project: Project ) -> None: - mock_identity.get_project.return_value = project - mock_conn.identity = mock_identity - type(mock_conn).current_project_id = PropertyMock(return_value=project.id) - item = get_project(mock_conn) - assert item.description == (project.description if project.description else "") - assert item.uuid == project.id - assert item.name == project.name + """Successful retrieval of a project. - -@patch("src.providers.openstack.Connection.identity") -@patch("src.providers.openstack.Connection") -def test_fail_retrieve_project( - mock_conn: Mock, mock_identity: Mock, project: Project -) -> None: - mock_identity.get_project.side_effect = Unauthorized() + Project retrieval fail is not tested here. It is tested where the exception is + caught: get_data_from_openstack function. + """ + mock_identity.get_project.return_value = openstack_project mock_conn.identity = mock_identity - type(mock_conn).current_project_id = PropertyMock(return_value=project.id) - with pytest.raises(Unauthorized): - get_project(mock_conn) + type(mock_conn).current_project_id = PropertyMock(return_value=openstack_project.id) + item = get_project(mock_conn) + if openstack_project.description: + assert item.description == openstack_project.description + else: + assert item.description == "" + assert item.uuid == openstack_project.id + assert item.name == openstack_project.name diff --git a/tests/providers/openstack/test_quotas.py b/tests/providers/openstack/test_quotas.py index 4075b6c..c790033 100644 --- a/tests/providers/openstack/test_quotas.py +++ b/tests/providers/openstack/test_quotas.py @@ -1,9 +1,6 @@ -from random import randint -from typing import Dict from unittest.mock import Mock, PropertyMock, patch from uuid import uuid4 -import pytest from app.quota.enum import QuotaType from openstack.block_storage.v3.quota_set import QuotaSet as BlockStorageQuotaSet from openstack.compute.v2.quota_set import QuotaSet as ComputeQuotaSet @@ -16,116 +13,66 @@ ) -@pytest.fixture -def block_storage_quotas() -> Dict[str, int]: - return { - "backup_gigabytes": randint(0, 100), - "backups": randint(0, 100), - "gigabytes": randint(0, 100), - "groups": randint(0, 100), - "per_volume_gigabytes": randint(0, 100), - "snapshots": randint(0, 100), - "volumes": randint(0, 100), - } - - -@pytest.fixture -def compute_quotas() -> Dict[str, int]: - return { - "cores": randint(0, 100), - "fixed_ips": randint(0, 100), - "floating_ips": randint(0, 100), - "injected_file_content_bytes": randint(0, 100), - "injected_file_path_bytes": randint(0, 100), - "injected_files": randint(0, 100), - "instances": randint(0, 100), - "key_pairs": randint(0, 100), - "metadata_items": randint(0, 100), - "networks": randint(0, 100), - "ram": randint(0, 100), - "security_group_rules": randint(0, 100), - "security_groups": randint(0, 100), - "server_groups": randint(0, 100), - "server_group_members": randint(0, 100), - "force": False, - } - - -@pytest.fixture -def network_quotas() -> Dict[str, int]: - return { - "check_limit": False, - "floating_ips": randint(0, 100), - "health_monitors": randint(0, 100), - "listeners": randint(0, 100), - "load_balancers": randint(0, 100), - "l7_policies": randint(0, 100), - "networks": randint(0, 100), - "pools": randint(0, 100), - "ports": randint(0, 100), - # "project_id": ?, - "rbac_policies": randint(0, 100), - "routers": randint(0, 100), - "subnets": randint(0, 100), - "subnet_pools": randint(0, 100), - "security_group_rules": randint(0, 100), - "security_groups": randint(0, 100), - } - - @patch("src.providers.openstack.Connection.block_storage") @patch("src.providers.openstack.Connection") def test_retrieve_block_storage_quotas( - mock_conn: Mock, mock_block_storage: Mock, block_storage_quotas: Dict[str, int] + mock_conn: Mock, + mock_block_storage: Mock, + openstack_block_storage_quotas: BlockStorageQuotaSet, ) -> None: - mock_block_storage.get_quota_set.return_value = BlockStorageQuotaSet( - **block_storage_quotas - ) + """Retrieve a block storage quota.""" + mock_block_storage.get_quota_set.return_value = openstack_block_storage_quotas mock_conn.block_storage = mock_block_storage project_id = uuid4().hex type(mock_conn).current_project_id = PropertyMock(return_value=project_id) data = get_block_storage_quotas(mock_conn) assert data.type == QuotaType.BLOCK_STORAGE.value assert not data.per_user - assert data.gigabytes == block_storage_quotas.get("gigabytes") - assert data.per_volume_gigabytes == block_storage_quotas.get("per_volume_gigabytes") - assert data.volumes == block_storage_quotas.get("volumes") + assert data.gigabytes == openstack_block_storage_quotas.get("gigabytes") + assert data.per_volume_gigabytes == openstack_block_storage_quotas.get( + "per_volume_gigabytes" + ) + assert data.volumes == openstack_block_storage_quotas.get("volumes") assert data.project == project_id @patch("src.providers.openstack.Connection.compute") @patch("src.providers.openstack.Connection") def test_retrieve_compute_quotas( - mock_conn: Mock, mock_compute: Mock, compute_quotas: Dict[str, int] + mock_conn: Mock, mock_compute: Mock, openstack_compute_quotas: ComputeQuotaSet ) -> None: - mock_compute.get_quota_set.return_value = ComputeQuotaSet(**compute_quotas) + """Retrieve a compute quota.""" + mock_compute.get_quota_set.return_value = openstack_compute_quotas mock_conn.compute = mock_compute project_id = uuid4().hex type(mock_conn).current_project_id = PropertyMock(return_value=project_id) data = get_compute_quotas(mock_conn) assert data.type == QuotaType.COMPUTE.value assert not data.per_user - assert data.cores == compute_quotas.get("cores") - assert data.instances == compute_quotas.get("instances") - assert data.ram == compute_quotas.get("ram") + assert data.cores == openstack_compute_quotas.get("cores") + assert data.instances == openstack_compute_quotas.get("instances") + assert data.ram == openstack_compute_quotas.get("ram") assert data.project == project_id @patch("src.providers.openstack.Connection.network") @patch("src.providers.openstack.Connection") def test_retrieve_network_quotas( - mock_conn: Mock, mock_network: Mock, network_quotas: Dict[str, int] + mock_conn: Mock, mock_network: Mock, openstack_network_quotas: NetworkQuota ) -> None: - mock_network.get_quota.return_value = NetworkQuota(**network_quotas) + """Retrieve a network quota.""" + mock_network.get_quota.return_value = openstack_network_quotas mock_conn.network = mock_network project_id = uuid4().hex type(mock_conn).current_project_id = PropertyMock(return_value=project_id) data = get_network_quotas(mock_conn) assert data.type == QuotaType.NETWORK.value assert not data.per_user - assert data.ports == network_quotas.get("ports") - assert data.networks == network_quotas.get("networks") - assert data.public_ips == network_quotas.get("floating_ips") - assert data.security_groups == network_quotas.get("security_groups") - assert data.security_group_rules == network_quotas.get("security_group_rules") + assert data.ports == openstack_network_quotas.get("ports") + assert data.networks == openstack_network_quotas.get("networks") + assert data.public_ips == openstack_network_quotas.get("floating_ips") + assert data.security_groups == openstack_network_quotas.get("security_groups") + assert data.security_group_rules == openstack_network_quotas.get( + "security_group_rules" + ) assert data.project == project_id diff --git a/tests/providers/openstack/test_resources.py b/tests/providers/openstack/test_resources.py index 8f5848f..c6b942a 100644 --- a/tests/providers/openstack/test_resources.py +++ b/tests/providers/openstack/test_resources.py @@ -1,100 +1,91 @@ -from typing import Tuple +from typing import Tuple, Union from unittest.mock import Mock, patch -from uuid import uuid4 -import pytest from app.provider.schemas_extended import ( - AuthMethodCreate, BlockStorageServiceCreateExtended, ComputeServiceCreateExtended, IdentityProviderCreateExtended, NetworkServiceCreateExtended, ProjectCreate, - SLACreateExtended, - UserGroupCreateExtended, ) from app.service.enum import ( BlockStorageServiceName, ComputeServiceName, NetworkServiceName, ) +from keystoneauth1.exceptions.auth_plugins import NoMatchingPlugin from keystoneauth1.exceptions.connection import ConnectFailure -from pytest_cases import case, parametrize, parametrize_with_cases +from keystoneauth1.exceptions.http import NotFound, Unauthorized +from pytest_cases import parametrize, parametrize_with_cases -from src.models.identity_provider import Issuer -from src.models.provider import AuthMethod, Openstack, Project +from src.models.provider import Openstack, Project from src.providers.openstack import get_data_from_openstack -from tests.schemas.utils import random_lower_string, random_start_end_dates, random_url +from tests.schemas.utils import random_lower_string -@case(tags=["connection"]) -@parametrize(item=["connection", "project", "block_storage", "compute", "network"]) -def case_connection(item: str) -> str: - return item +class CaseConnException: + def case_connect_failure(self) -> ConnectFailure: + """The connection raises a ConnectFailure exception. + Happens when the auth_url is invalid or during a timeout. + """ + return ConnectFailure() -@case(tags=["item"]) -@parametrize(item=["block_storage", "compute", "network"]) -def case_absent_item(item: str) -> str: - return item + def case_unauthorized(self) -> Unauthorized: + """The connection raises an Unauthorized exception. + Happens when the token expires, the protocol is wrong or the project_id is + invalid. + """ + return Unauthorized() + + def case_no_matching_plugin(self) -> NoMatchingPlugin: + """The connection raises an NoMatchingPlugin exception. + + Happens when the auth method is not a valid one. + """ + return NoMatchingPlugin(random_lower_string()) + + def case_idp_not_found(self) -> NotFound: + """The connection raises an NotFound exception. + + Happens when the identity provider url is wrong. + """ + return NotFound() -@pytest.fixture -def configurations() -> ( - Tuple[Openstack, IdentityProviderCreateExtended, Project, str, str] -): - project_id = uuid4() - start_date, end_date = random_start_end_dates() - sla = SLACreateExtended( - doc_uuid=uuid4(), start_date=start_date, end_date=end_date, project=project_id - ) - user_group = UserGroupCreateExtended(name=random_lower_string(), sla=sla) - relationship = AuthMethodCreate( - idp_name=random_lower_string(), protocol=random_lower_string() - ) - issuer = IdentityProviderCreateExtended( - endpoint=random_url(), - group_claim=random_lower_string(), - relationship=relationship, - user_groups=[user_group], - ) - trusted_idp = AuthMethod( - endpoint=issuer.endpoint, - name=relationship.idp_name, - protocol=relationship.protocol, - ) - project = Project(id=project_id, sla=sla.doc_uuid) - provider_conf = Openstack( - name=random_lower_string(), - auth_url=random_url(), - identity_providers=[trusted_idp], - projects=[project], - ) - region_name = random_lower_string() - token = random_lower_string() - return provider_conf, issuer, project, region_name, token + +class CaseFailPoint: + @parametrize(call=["project", "block_storage", "compute", "network"]) + def case_lost_connection(self, call: str) -> str: + return call + + +class CaseAbsentService: + @parametrize(service=["block_storage", "compute", "network"]) + def case_absent(self, service: str) -> str: + return service @patch("src.providers.openstack.get_network_service") @patch("src.providers.openstack.get_compute_service") @patch("src.providers.openstack.get_block_storage_service") @patch("src.providers.openstack.get_project") -@patch("src.providers.openstack.connect_to_provider") -@parametrize_with_cases("fail_point", cases=".", has_tag="connection") -def test_no_connection( - mock_conn: Mock, +@parametrize_with_cases("exception", cases=CaseConnException) +@parametrize_with_cases("fail_point", cases=CaseFailPoint) +def test_connection_error( mock_project: Mock, mock_block_storage_service: Mock, mock_compute_service: Mock, mock_network_service: Mock, - configurations: Tuple[Openstack, Issuer, Project, str], + configurations: Tuple[IdentityProviderCreateExtended, Openstack, Project], project_create: ProjectCreate, block_storage_service_create: BlockStorageServiceCreateExtended, compute_service_create: ComputeServiceCreateExtended, network_service_create: NetworkServiceCreateExtended, fail_point: str, + exception: Union[ConnectFailure, Unauthorized, NoMatchingPlugin, NotFound], ) -> None: - """Test no initial connection and connection loss during procedure""" + """Test no initial connection or connection loss during procedure""" block_storage_service_create.name = BlockStorageServiceName.OPENSTACK_CINDER compute_service_create.name = ComputeServiceName.OPENSTACK_NOVA network_service_create.name = NetworkServiceName.OPENSTACK_NEUTRON @@ -104,18 +95,18 @@ def test_no_connection( mock_compute_service.return_value = compute_service_create mock_network_service.return_value = network_service_create - if fail_point == "connection": - mock_conn.return_value = None - elif fail_point == "project": - mock_project.side_effect = ConnectFailure() + if fail_point == "project": + mock_project.side_effect = exception elif fail_point == "block_storage": - mock_block_storage_service.side_effect = ConnectFailure() + mock_block_storage_service.side_effect = exception elif fail_point == "compute": - mock_compute_service.side_effect = ConnectFailure() + mock_compute_service.side_effect = exception elif fail_point == "network": - mock_network_service.side_effect = ConnectFailure() + mock_network_service.side_effect = exception - (provider_conf, issuer, project_conf, region_name, token) = configurations + (issuer, provider_conf, project_conf) = configurations + region_name = random_lower_string() + token = random_lower_string() resp = get_data_from_openstack( provider_conf=provider_conf, project_conf=project_conf, @@ -124,26 +115,49 @@ def test_no_connection( token=token, ) assert not resp + if fail_point == "project": + mock_project.assert_called() + mock_block_storage_service.assert_not_called() + mock_compute_service.assert_not_called() + mock_network_service.assert_not_called() + elif fail_point == "block_storage": + mock_project.assert_called() + mock_block_storage_service.assert_called() + mock_compute_service.assert_not_called() + mock_network_service.assert_not_called() + elif fail_point == "compute": + mock_project.assert_called() + mock_block_storage_service.assert_called() + mock_compute_service.assert_called() + mock_network_service.assert_not_called() + elif fail_point == "network": + mock_project.assert_called() + mock_block_storage_service.assert_called() + mock_compute_service.assert_called() + mock_network_service.assert_called() @patch("src.providers.openstack.get_network_service") @patch("src.providers.openstack.get_compute_service") @patch("src.providers.openstack.get_block_storage_service") @patch("src.providers.openstack.get_project") -@parametrize_with_cases("absent", cases=".", has_tag="item") +@parametrize_with_cases("absent", cases=CaseAbsentService) def test_retrieve_resources( mock_project: Mock, mock_block_storage_service: Mock, mock_compute_service: Mock, mock_network_service: Mock, - configurations: Tuple[Openstack, Issuer, Project, str], + configurations: Tuple[IdentityProviderCreateExtended, Openstack, Project], project_create: ProjectCreate, block_storage_service_create: BlockStorageServiceCreateExtended, compute_service_create: ComputeServiceCreateExtended, network_service_create: NetworkServiceCreateExtended, absent: str, ) -> None: - (provider_conf, issuer, project_conf, region_name, token) = configurations + """One of the services does not exist on the specified region.""" + (issuer, provider_conf, project_conf) = configurations + region_name = random_lower_string() + token = random_lower_string() project_create.uuid = project_conf.id block_storage_service_create.name = BlockStorageServiceName.OPENSTACK_CINDER @@ -179,10 +193,13 @@ def test_retrieve_resources( ) = resp assert proj assert identity_service - assert ( - not block_storage_service - if absent == "block_storage" - else block_storage_service - ) + if absent == "block_storage": + assert not block_storage_service + else: + assert block_storage_service assert not compute_service if absent == "compute" else compute_service assert not network_service if absent == "network" else network_service + mock_project.assert_called() + mock_block_storage_service.assert_called() + mock_compute_service.assert_called() + mock_network_service.assert_called() diff --git a/tests/providers/openstack/test_services.py b/tests/providers/openstack/test_services.py index c0ead4b..5ce6d07 100644 --- a/tests/providers/openstack/test_services.py +++ b/tests/providers/openstack/test_services.py @@ -1,10 +1,8 @@ import os -from random import getrandbits, randint -from typing import Dict, List, Optional +from typing import List, Optional from unittest.mock import Mock, PropertyMock, patch from uuid import uuid4 -import pytest from app.quota.schemas import BlockStorageQuotaBase, ComputeQuotaBase, NetworkQuotaBase from app.service.enum import ( BlockStorageServiceName, @@ -27,275 +25,182 @@ get_compute_service, get_network_service, ) -from tests.providers.openstack.utils import random_image_visibility -from tests.schemas.utils import ( - random_float, - random_image_os_type, - random_ip, - random_lower_string, - random_url, -) - +from tests.providers.openstack.test_flavors import get_allowed_project_ids +from tests.providers.openstack.test_images import filter_images +from tests.providers.openstack.test_networks import filter_networks +from tests.schemas.utils import random_url -@case(tags=["endpoint_resp"]) -@parametrize(raise_exc=[True, False]) -def case_raise_exc(raise_exc: bool) -> bool: - return raise_exc +class CaseServiceType: + @parametrize(type=["block_storage", "compute", "network"]) + def case_service(self, type: str) -> str: + return type -@case(tags=["user_quotas"]) -@parametrize(attr=[True, False]) -def case_with_limits(attr: bool) -> bool: - return attr +class CaseEndpointResp: + def case_raise_exc(self) -> EndpointNotFound: + return EndpointNotFound() -@case(tags=["tags"]) -@parametrize(attr=range(3)) -def case_tags(attr: int) -> Optional[List[str]]: - if attr == 0: + def case_none(self) -> None: return None - if attr == 1: - return [] - if attr == 2: - return ["none"] - - -@case(tags=["network"]) -@parametrize(attr=["priv_net", "pub_net", "proxy"]) -def case_attr(attr: str) -> str: - return attr - - -@pytest.fixture -def block_storage_provider_quota() -> BlockStorageQuotaSet: - return BlockStorageQuotaSet( - backup_gigabytes=randint(0, 100), - backups=randint(0, 100), - gigabytes=randint(0, 100), - groups=randint(0, 100), - per_volume_gigabytes=randint(0, 100), - snapshots=randint(0, 100), - volumes=randint(0, 100), - ) - - -@pytest.fixture -def compute_provider_quota() -> ComputeQuotaSet: - return ComputeQuotaSet( - cores=randint(0, 100), - fixed_ips=randint(0, 100), - floating_ips=randint(0, 100), - injected_file_content_bytes=randint(0, 100), - injected_file_path_bytes=randint(0, 100), - injected_files=randint(0, 100), - instances=randint(0, 100), - key_pairs=randint(0, 100), - metadata_items=randint(0, 100), - networks=randint(0, 100), - ram=randint(0, 100), - security_group_rules=randint(0, 100), - security_groups=randint(0, 100), - server_groups=randint(0, 100), - server_group_members=randint(0, 100), - force=False, - ) - - -@pytest.fixture -def network_provider_quota() -> NetworkQuota: - return NetworkQuota( - check_limit=False, - floating_ips=randint(0, 100), - health_monitors=randint(0, 100), - listeners=randint(0, 100), - load_balancers=randint(0, 100), - l7_policies=randint(0, 100), - networks=randint(0, 100), - pools=randint(0, 100), - ports=randint(0, 100), - # project_id= ?, - rbac_policies=randint(0, 100), - routers=randint(0, 100), - subnets=randint(0, 100), - subnet_pools=randint(0, 100), - security_group_rules=randint(0, 100), - security_groups=randint(0, 100), - ) - - -@pytest.fixture -def block_storage_user_quota() -> BlockStorageQuotaBase: - return BlockStorageQuotaBase( - per_user=True, - gigabytes=randint(0, 100), - per_volume_gigabytes=randint(0, 100), - volumes=randint(1, 100), - ) - - -@pytest.fixture -def compute_user_quota() -> ComputeQuotaBase: - return ComputeQuotaBase( - per_user=True, - cores=randint(0, 100), - instances=randint(0, 100), - ram=randint(1, 100), - ) -@pytest.fixture -def network_user_quota() -> NetworkQuotaBase: - return NetworkQuotaBase( - per_user=True, - public_ips=randint(0, 100), - networks=randint(0, 100), - ports=randint(1, 100), - security_groups=randint(1, 100), - security_group_rules=randint(1, 100), - ) - - -@pytest.fixture -def flavor() -> Flavor: - return Flavor( - name=random_lower_string(), - description=random_lower_string(), - disk=randint(0, 100), - is_public=getrandbits(1), - ram=randint(0, 100), - vcpus=randint(0, 100), - swap=randint(0, 100), - ephemeral=randint(0, 100), - is_disabled=False, - rxtx_factor=random_float(0, 100), - extra_specs={}, - ) +class CaseUserQuota: + @case(tags=["block_storage"]) + @parametrize(presence=[True, False]) + def case_block_storage_quota( + self, presence: bool, block_storage_quota: BlockStorageQuotaBase + ) -> Optional[BlockStorageQuotaBase]: + if presence: + block_storage_quota.per_user = True + return block_storage_quota + return None + @case(tags=["compute"]) + @parametrize(presence=[True, False]) + def case_compute_quota( + self, presence: bool, compute_quota: ComputeQuotaBase + ) -> Optional[ComputeQuotaBase]: + if presence: + compute_quota.per_user = True + return compute_quota + return None -@pytest.fixture -def image() -> Image: - return Image( - id=uuid4().hex, - name=random_lower_string(), - status="active", - owner=uuid4().hex, - os_type=random_image_os_type(), - os_distro=random_lower_string(), - os_version=random_lower_string(), - architecture=random_lower_string(), - kernel_id=random_lower_string(), - visibility=random_image_visibility(), - tags=["one"], - ) + @case(tags=["network"]) + @parametrize(presence=[True, False]) + def case_network_quota( + self, presence: bool, network_quota: NetworkQuotaBase + ) -> Optional[NetworkQuotaBase]: + if presence: + network_quota.per_user = True + return network_quota + return None -@pytest.fixture -def network() -> Network: - return Network( - id=uuid4().hex, - description=random_lower_string(), - name=random_lower_string(), - status="active", - project_id=uuid4().hex, - is_default=False, - is_router_external=getrandbits(1), - is_shared=getrandbits(1), - mtu=randint(1, 100), - ) +class CaseTagList: + @case(tags=["empty"]) + def case_empty_tag_list(self) -> Optional[List]: + return [] + @case(tags=["empty"]) + def case_no_list(self) -> Optional[List]: + return None -@pytest.fixture -def proxy() -> PrivateNetProxy: - return PrivateNetProxy(ip=random_ip(), user=random_lower_string()) + @case(tags=["full"]) + def case_list(self) -> Optional[List]: + return ["one"] @patch("src.providers.openstack.Connection") -@parametrize_with_cases("raised_err", cases=".", has_tag="endpoint_resp") -def test_no_block_storage_service(mock_conn: Mock, raised_err: bool) -> None: - with patch( - "src.providers.openstack.Connection.block_storage" - ) as mock_block_storage: - if raised_err: - mock_block_storage.get_endpoint.side_effect = EndpointNotFound() +@parametrize_with_cases("resp", cases=CaseEndpointResp) +@parametrize_with_cases("service", cases=CaseServiceType) +def test_no_block_storage_service( + mock_conn: Mock, resp: Optional[EndpointNotFound], service: str +) -> None: + """If the endpoint is not found or the service response is None, return None.""" + with patch(f"src.providers.openstack.Connection.{service}") as mock_srv: + if resp: + mock_srv.get_endpoint.side_effect = resp else: - mock_block_storage.get_endpoint.return_value = None - mock_conn.block_storage = mock_block_storage - assert not get_block_storage_service( - mock_conn, per_user_limits=None, project_id=uuid4().hex - ) + mock_srv.get_endpoint.return_value = resp + mock_conn.__setattr__(service, mock_srv) + type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) + if service == "block_storage": + assert not get_block_storage_service(mock_conn, per_user_limits=None) + elif service == "compute": + assert not get_compute_service(mock_conn, per_user_limits=None, tags=[]) + elif service == "network": + assert not get_network_service( + mock_conn, + per_user_limits=None, + tags=[], + default_private_net=None, + default_public_net=None, + proxy=None, + ) @patch("src.providers.openstack.Connection.block_storage") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("with_limits", cases=".", has_tag="user_quotas") +@parametrize_with_cases("user_quota", cases=CaseUserQuota, has_tag="block_storage") def test_retrieve_block_storage_service( mock_conn: Mock, mock_block_storage: Mock, - block_storage_provider_quota: BlockStorageQuotaSet, - block_storage_user_quota: BlockStorageQuotaBase, - with_limits: bool, + openstack_block_storage_quotas: BlockStorageQuotaSet, + user_quota: Optional[BlockStorageQuotaBase], ) -> None: + """Check quotas in the returned service.""" endpoint = random_url() mock_block_storage.get_endpoint.return_value = os.path.join(endpoint, uuid4().hex) - mock_block_storage.get_quota_set.return_value = block_storage_provider_quota + mock_block_storage.get_quota_set.return_value = openstack_block_storage_quotas mock_conn.block_storage = mock_block_storage type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) - item = get_block_storage_service( - mock_conn, - per_user_limits=block_storage_user_quota if with_limits else None, - project_id=uuid4().hex, - ) + item = get_block_storage_service(mock_conn, per_user_limits=user_quota) assert item.description == "" assert item.endpoint == endpoint assert item.type == ServiceType.BLOCK_STORAGE.value assert item.name == BlockStorageServiceName.OPENSTACK_CINDER.value - assert len(item.quotas) == 2 if with_limits else 1 - if with_limits: + assert len(item.quotas) == 2 if user_quota else 1 + if user_quota: assert not item.quotas[0].per_user assert item.quotas[1].per_user -@patch("src.providers.openstack.Connection") -@parametrize_with_cases("raised_err", cases=".", has_tag="endpoint_resp") -def test_no_compute_service(mock_conn: Mock, raised_err: bool) -> None: - with patch("src.providers.openstack.Connection.compute") as mock_compute: - if raised_err: - mock_compute.get_endpoint.side_effect = EndpointNotFound() - else: - mock_compute.get_endpoint.return_value = None - mock_conn.compute = mock_compute - assert not get_compute_service( - mock_conn, per_user_limits=None, project_id=uuid4().hex, tags=[] - ) - - @patch("src.providers.openstack.Connection.compute") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("with_limits", cases=".", has_tag="user_quotas") +@parametrize_with_cases("user_quota", cases=CaseUserQuota, has_tag="compute") def test_retrieve_compute_service( mock_conn: Mock, mock_compute: Mock, - compute_provider_quota: ComputeQuotaSet, - compute_user_quota: ComputeQuotaBase, - with_limits: bool, + openstack_compute_quotas: ComputeQuotaSet, + user_quota: Optional[ComputeQuotaBase], ) -> None: + """Check quotas in the returned service.""" endpoint = random_url() mock_compute.get_endpoint.return_value = endpoint - mock_compute.get_quota_set.return_value = compute_provider_quota + mock_compute.get_quota_set.return_value = openstack_compute_quotas mock_conn.compute = mock_compute type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) - item = get_compute_service( + item = get_compute_service(mock_conn, per_user_limits=user_quota, tags=[]) + assert item.description == "" + assert item.endpoint == endpoint + assert item.type == ServiceType.COMPUTE.value + assert item.name == ComputeServiceName.OPENSTACK_NOVA.value + assert len(item.quotas) == 2 if user_quota else 1 + if user_quota: + assert not item.quotas[0].per_user + assert item.quotas[1].per_user + + +@patch("src.providers.openstack.Connection.network") +@patch("src.providers.openstack.Connection") +@parametrize_with_cases("user_quota", cases=CaseUserQuota, has_tag="network") +def test_retrieve_network_service( + mock_conn: Mock, + mock_network: Mock, + openstack_network_quotas: NetworkQuota, + user_quota: Optional[NetworkQuotaBase], +) -> None: + """Check quotas in the returned service.""" + endpoint = random_url() + mock_network.get_endpoint.return_value = endpoint + mock_network.get_quota_set.return_value = openstack_network_quotas + mock_conn.network = mock_network + type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) + item = get_network_service( mock_conn, - per_user_limits=compute_user_quota if with_limits else None, - project_id=uuid4().hex, + per_user_limits=user_quota, tags=[], + default_private_net=None, + default_public_net=None, + proxy=None, ) assert item.description == "" assert item.endpoint == endpoint - assert item.type == ServiceType.COMPUTE.value - assert item.name == ComputeServiceName.OPENSTACK_NOVA.value - assert len(item.quotas) == 2 if with_limits else 1 - if with_limits: + assert item.type == ServiceType.NETWORK.value + assert item.name == NetworkServiceName.OPENSTACK_NEUTRON.value + assert len(item.quotas) == 2 if user_quota else 1 + if user_quota: assert not item.quotas[0].per_user assert item.quotas[1].per_user @@ -303,175 +208,142 @@ def test_retrieve_compute_service( @patch("src.providers.openstack.Connection.compute") @patch("src.providers.openstack.Connection") def test_retrieve_compute_service_with_flavors( - mock_conn: Mock, mock_compute: Mock, flavor: Flavor + mock_conn: Mock, mock_compute: Mock, openstack_flavor_base: Flavor ) -> None: - def get_allowed_project_ids(*args, **kwargs) -> List[Dict[str, str]]: - return [{"tenant_id": uuid4().hex}] - + """Check flavors in the returned service.""" endpoint = random_url() - flavors = [flavor] + flavors = [openstack_flavor_base] mock_compute.get_endpoint.return_value = endpoint mock_compute.flavors.return_value = flavors mock_compute.get_flavor_access.side_effect = get_allowed_project_ids mock_conn.compute = mock_compute type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) - item = get_compute_service( - mock_conn, - per_user_limits=None, - project_id=uuid4().hex, - tags=[], - ) + item = get_compute_service(mock_conn, per_user_limits=None, tags=[]) assert len(item.flavors) == 1 @patch("src.providers.openstack.Connection.image") @patch("src.providers.openstack.Connection.compute") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("tags", cases=".", has_tag="tags") +@parametrize_with_cases("tags", cases=CaseTagList) def test_retrieve_compute_service_with_images( mock_conn: Mock, mock_compute: Mock, mock_image: Mock, - image: Image, + openstack_image_with_tags: Image, tags: Optional[List[str]], ) -> None: + """Check images in the returned service. + + Check also that the tags filter is working properly. + """ endpoint = random_url() - images = list( - filter(lambda x: set(x.tags).intersection(set(tags if tags else [])), [image]) - ) + images = list(filter(lambda x: filter_images(x, tags), [openstack_image_with_tags])) mock_compute.get_endpoint.return_value = endpoint mock_conn.compute = mock_compute mock_image.images.return_value = images mock_conn.image = mock_image - type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) - item = get_compute_service( - mock_conn, - per_user_limits=None, - project_id=uuid4().hex, - tags=tags, + type(mock_conn).current_project_id = PropertyMock( + return_value=openstack_image_with_tags.owner_id ) + item = get_compute_service(mock_conn, per_user_limits=None, tags=tags) assert len(item.images) == len(images) -@patch("src.providers.openstack.Connection") -@parametrize_with_cases("raised_err", cases=".", has_tag="endpoint_resp") -def test_no_network_service(mock_conn: Mock, raised_err: bool) -> None: - with patch("src.providers.openstack.Connection.network") as mock_network: - if raised_err: - mock_network.get_endpoint.side_effect = EndpointNotFound() - else: - mock_network.get_endpoint.return_value = None - mock_conn.network = mock_network - assert not get_network_service( - mock_conn, - per_user_limits=None, - project_id=uuid4().hex, - tags=[], - default_private_net=None, - default_public_net=None, - proxy=None, - ) - - @patch("src.providers.openstack.Connection.network") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("with_limits", cases=".", has_tag="user_quotas") -def test_retrieve_network_service( +@parametrize_with_cases("tags", cases=CaseTagList) +def test_retrieve_network_service_with_networks( mock_conn: Mock, mock_network: Mock, - network_provider_quota: NetworkQuota, - network_user_quota: NetworkQuotaBase, - with_limits: bool, + openstack_network_with_tags: Network, + tags: Optional[List[str]], ) -> None: + """Check networks in the returned service. + + Check also that the tags filter is working properly. + """ endpoint = random_url() + networks = list( + filter(lambda x: filter_networks(x, tags), [openstack_network_with_tags]) + ) mock_network.get_endpoint.return_value = endpoint - mock_network.get_quota_set.return_value = network_provider_quota + mock_network.networks.return_value = networks mock_conn.network = mock_network - type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) + type(mock_conn).current_project_id = PropertyMock( + return_value=openstack_network_with_tags.project_id + ) item = get_network_service( mock_conn, - per_user_limits=network_user_quota if with_limits else None, - project_id=uuid4().hex, - tags=[], + per_user_limits=None, + tags=tags, default_private_net=None, default_public_net=None, proxy=None, ) - assert item.description == "" - assert item.endpoint == endpoint - assert item.type == ServiceType.NETWORK.value - assert item.name == NetworkServiceName.OPENSTACK_NEUTRON.value - assert len(item.quotas) == 2 if with_limits else 1 - if with_limits: - assert not item.quotas[0].per_user - assert item.quotas[1].per_user + assert len(item.networks) == len(networks) @patch("src.providers.openstack.Connection.network") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("tags", cases=".", has_tag="tags") -def test_retrieve_network_service_with_networks( - mock_conn: Mock, mock_network: Mock, network: Network, tags: Optional[List[str]] +def test_retrieve_network_service_default_net( + mock_conn: Mock, + mock_network: Mock, + openstack_priv_pub_network: Network, ) -> None: + """Check networks in the returned service, passing default net attributes.""" endpoint = random_url() - networks = list( - filter(lambda x: set(x.tags).intersection(set(tags if tags else [])), [network]) - ) + if openstack_priv_pub_network.is_shared: + default_private_net = None + default_public_net = openstack_priv_pub_network.name + else: + default_private_net = openstack_priv_pub_network.name + default_public_net = None + networks = [openstack_priv_pub_network] mock_network.get_endpoint.return_value = endpoint mock_network.networks.return_value = networks mock_conn.network = mock_network - type(mock_conn).current_project_id = PropertyMock(return_value=uuid4().hex) + type(mock_conn).current_project_id = PropertyMock( + return_value=openstack_priv_pub_network.project_id + ) item = get_network_service( mock_conn, per_user_limits=None, - project_id=uuid4().hex, - tags=tags, - default_private_net=None, - default_public_net=None, + tags=[], + default_private_net=default_private_net, + default_public_net=default_public_net, proxy=None, ) assert len(item.networks) == len(networks) + assert item.networks[0].is_default @patch("src.providers.openstack.Connection.network") @patch("src.providers.openstack.Connection") -@parametrize_with_cases("attr", cases=".", has_tag="network") -def test_retrieve_network_service_additional_attrs( +def test_retrieve_network_service_proxy( mock_conn: Mock, mock_network: Mock, - network: Network, - proxy: PrivateNetProxy, - attr: str, + openstack_network_base: Network, + net_proxy: PrivateNetProxy, ) -> None: + """Check networks in the returned service, passing proxy attributes.""" endpoint = random_url() - if attr == "priv_net": - network.is_default = False - network.is_shared = False - elif attr == "pub_net": - network.is_default = False - network.is_shared = True - networks = [network] - + networks = [openstack_network_base] mock_network.get_endpoint.return_value = endpoint mock_network.networks.return_value = networks mock_conn.network = mock_network - type(mock_conn).current_project_id = PropertyMock(return_value=network.project_id) - + type(mock_conn).current_project_id = PropertyMock( + return_value=openstack_network_base.project_id + ) item = get_network_service( mock_conn, per_user_limits=None, - project_id=uuid4().hex, tags=[], - default_private_net=network.name if attr == "priv_net" else None, - default_public_net=network.name if attr == "pub_net" else None, - proxy=proxy if attr == "proxy" else None, + default_private_net=None, + default_public_net=None, + proxy=net_proxy, ) assert len(item.networks) == len(networks) - if attr == "priv_net": - assert item.networks[0].is_default - elif attr == "pub_net": - assert item.networks[0].is_default - elif attr == "proxy": - assert item.networks[0].proxy_ip == str(proxy.ip) - assert item.networks[0].proxy_user == proxy.user + assert item.networks[0].proxy_ip == str(net_proxy.ip) + assert item.networks[0].proxy_user == net_proxy.user diff --git a/tests/providers/test_get_proj_res.py b/tests/providers/test_get_proj_res.py index e64ad8b..989cc4f 100644 --- a/tests/providers/test_get_proj_res.py +++ b/tests/providers/test_get_proj_res.py @@ -123,7 +123,6 @@ def test_no_matching_idp_when_retrieving_project_resources( @parametrize_with_cases("provider_type", cases=CaseProvider, has_tag="type") def test_no_conn_when_retrieving_project_resources( - caplog: pytest.LogCaptureFixture, provider_type: str, issuer: Issuer, openstack_provider: Openstack, @@ -144,7 +143,3 @@ def test_no_conn_when_retrieving_project_resources( issuers=[issuer], ) assert not resp - - if provider_type == ProviderType.OS: - msg = "Connection closed unexpectedly." - assert caplog.text.strip("\n").endswith(msg) diff --git a/tests/schemas/test_limits.py b/tests/schemas/test_limits.py index b614484..cb48968 100644 --- a/tests/schemas/test_limits.py +++ b/tests/schemas/test_limits.py @@ -1,7 +1,5 @@ -from random import randint from typing import Literal, Tuple, Union -import pytest from app.quota.enum import QuotaType from app.quota.schemas import BlockStorageQuotaBase, ComputeQuotaBase, NetworkQuotaBase from pytest_cases import parametrize_with_cases @@ -9,38 +7,6 @@ from src.models.provider import Limits -@pytest.fixture -def block_storage_quota() -> BlockStorageQuotaBase: - """Fixture with a BlockStorageQuotaBase.""" - return BlockStorageQuotaBase( - gigabytes=randint(0, 100), - per_volume_gigabytes=randint(0, 100), - volumes=randint(1, 100), - ) - - -@pytest.fixture -def compute_quota() -> ComputeQuotaBase: - """Fixture with a ComputeQuotaBase.""" - return ComputeQuotaBase( - cores=randint(0, 100), - instances=randint(0, 100), - ram=randint(1, 100), - ) - - -@pytest.fixture -def network_quota() -> NetworkQuotaBase: - """Fixture with a NetworkQuotaBase.""" - return NetworkQuotaBase( - public_ips=randint(0, 100), - networks=randint(0, 100), - ports=randint(1, 100), - security_groups=randint(1, 100), - security_group_rules=randint(1, 100), - ) - - class CaseQuotaType: def case_block_storage_quota( self, block_storage_quota: BlockStorageQuotaBase diff --git a/tests/schemas/test_region.py b/tests/schemas/test_region.py index a181a92..e5aa50d 100644 --- a/tests/schemas/test_region.py +++ b/tests/schemas/test_region.py @@ -1,17 +1,10 @@ from typing import Optional -import pytest from app.location.schemas import LocationBase from pytest_cases import parametrize, parametrize_with_cases from src.models.provider import Region -from tests.schemas.utils import random_country, random_lower_string - - -@pytest.fixture -def location() -> LocationBase: - """Fixture with an LocationBase without projects.""" - return LocationBase(site=random_lower_string(), country=random_country()) +from tests.schemas.utils import random_lower_string class CaseLocation: