From 8a342834d3a9b317a7382f11e40393c2fbdc739b Mon Sep 17 00:00:00 2001 From: rababerladuseladim Date: Thu, 6 Feb 2025 12:03:53 +0100 Subject: [PATCH] feature/mx 1744 expand accepted models of sink to rule items (#377) ### Changes - BREAKING: move ItemsContainer and PaginatedItemsContainer to mex.common.models - BREAKING: replace post_extracted_items with ingest and allow AnyRuleSetResponses - allow AnyRuleSetResponses as arguments to sinks - BREAKING: sinks now yield the models they loaded, instead of just their identifiers --------- Co-authored-by: Nicolas Drebenstedt <897972+cutoffthetop@users.noreply.github.com> Co-authored-by: Nicolas Drebenstedt --- CHANGELOG.md | 5 +++++ mex/common/backend_api/connector.py | 27 ++++++++++++++++----------- mex/common/backend_api/models.py | 27 ++------------------------- mex/common/identity/backend_api.py | 2 +- mex/common/models/__init__.py | 6 ++++++ mex/common/models/base/container.py | 18 ++++++++++++++++++ mex/common/sinks/backend_api.py | 20 +++++++++----------- mex/common/sinks/base.py | 14 ++++++++------ mex/common/sinks/ndjson.py | 25 ++++++++++++------------- mex/common/sinks/registry.py | 12 ++++++------ tests/backend_api/test_connector.py | 11 ++++++----- tests/sinks/test_backend_api.py | 17 +++++++---------- tests/sinks/test_ndjson.py | 26 ++++++++++++++++---------- 13 files changed, 112 insertions(+), 98 deletions(-) create mode 100644 mex/common/models/base/container.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b7dbacd..1d5eed7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes +- BREAKING: move ItemsContainer and PaginatedItemsContainer to mex.common.models +- BREAKING: replace post_extracted_items with ingest and allow AnyRuleSetResponses +- allow AnyRuleSetResponses as arguments to sinks +- BREAKING: sinks now yield the models they loaded, instead of just their identifiers + ### Deprecated ### Removed diff --git a/mex/common/backend_api/connector.py b/mex/common/backend_api/connector.py index 8f5bb32a..aa99c660 100644 --- a/mex/common/backend_api/connector.py +++ b/mex/common/backend_api/connector.py @@ -3,10 +3,7 @@ from requests.exceptions import HTTPError from mex.common.backend_api.models import ( - IdentifiersResponse, - ItemsContainer, MergedModelTypeAdapter, - PaginatedItemsContainer, RuleSetResponseTypeAdapter, ) from mex.common.connector import HTTPConnector @@ -16,6 +13,8 @@ AnyPreviewModel, AnyRuleSetRequest, AnyRuleSetResponse, + ItemsContainer, + PaginatedItemsContainer, ) from mex.common.settings import BaseSettings @@ -40,28 +39,34 @@ def _set_url(self) -> None: settings = BaseSettings.get() self.url = urljoin(str(settings.backend_api_url), self.API_VERSION) - def post_extracted_items( + def ingest( self, - extracted_items: list[AnyExtractedModel], - ) -> IdentifiersResponse: - """Post extracted items to the backend in bulk. + models_or_rule_sets: list[AnyExtractedModel | AnyRuleSetResponse], + ) -> list[AnyExtractedModel | AnyRuleSetResponse]: + """Post extracted models or rule-sets to the backend in bulk. Args: - extracted_items: Extracted items to post + models_or_rule_sets: Extracted models or rule-sets to ingest Raises: HTTPError: If post was not accepted, crashes or times out Returns: - Response model from the endpoint + List of extracted models or rule-sets from the endpoint """ response = self.request( method="POST", endpoint="ingest", - payload=ItemsContainer[AnyExtractedModel](items=extracted_items), + payload=ItemsContainer[AnyExtractedModel | AnyRuleSetResponse]( + items=models_or_rule_sets + ), timeout=self.INGEST_TIMEOUT, ) - return IdentifiersResponse.model_validate(response) + return ( + ItemsContainer[AnyExtractedModel | AnyRuleSetResponse] + .model_validate(response) + .items + ) def fetch_extracted_items( self, diff --git a/mex/common/backend_api/models.py b/mex/common/backend_api/models.py index 63593bb3..c72708d4 100644 --- a/mex/common/backend_api/models.py +++ b/mex/common/backend_api/models.py @@ -1,31 +1,8 @@ -from typing import Annotated, Generic, TypeVar +from typing import Annotated from pydantic import Field, TypeAdapter -from mex.common.models import AnyMergedModel, AnyRuleSetResponse, BaseModel -from mex.common.types import Identifier - -T = TypeVar("T") - - -class ItemsContainer(BaseModel, Generic[T]): - """Generic container that contains items.""" - - items: list[T] - - -class PaginatedItemsContainer(BaseModel, Generic[T]): - """Generic container that contains items and has a total item count.""" - - items: list[T] - total: int - - -class IdentifiersResponse(BaseModel): - """Response models for a list of identifiers.""" - - identifiers: list[Identifier] - +from mex.common.models import AnyMergedModel, AnyRuleSetResponse MergedModelTypeAdapter: TypeAdapter[AnyMergedModel] = TypeAdapter( Annotated[AnyMergedModel, Field(discriminator="entityType")] diff --git a/mex/common/identity/backend_api.py b/mex/common/identity/backend_api.py index d1dcb757..852bb242 100644 --- a/mex/common/identity/backend_api.py +++ b/mex/common/identity/backend_api.py @@ -1,9 +1,9 @@ from functools import cache from mex.common.backend_api.connector import BackendApiConnector -from mex.common.backend_api.models import ItemsContainer from mex.common.identity.base import BaseProvider from mex.common.identity.models import Identity +from mex.common.models import ItemsContainer from mex.common.types import Identifier, MergedPrimarySourceIdentifier diff --git a/mex/common/models/__init__.py b/mex/common/models/__init__.py index 5884efc0..9ac4413b 100644 --- a/mex/common/models/__init__.py +++ b/mex/common/models/__init__.py @@ -98,6 +98,10 @@ PreviewActivity, SubtractiveActivity, ) +from mex.common.models.base.container import ( + ItemsContainer, + PaginatedItemsContainer, +) from mex.common.models.base.extracted_data import ExtractedData from mex.common.models.base.filter import BaseFilter, FilterField, FilterRule from mex.common.models.base.mapping import BaseMapping, MappingField, MappingRule @@ -353,6 +357,7 @@ "ExtractedVariableGroup", "FilterField", "FilterRule", + "ItemsContainer", "MappingField", "MappingRule", "MergedAccessPlatform", @@ -377,6 +382,7 @@ "OrganizationalUnitMapping", "OrganizationalUnitRuleSetRequest", "OrganizationalUnitRuleSetResponse", + "PaginatedItemsContainer", "PersonFilter", "PersonMapping", "PersonRuleSetRequest", diff --git a/mex/common/models/base/container.py b/mex/common/models/base/container.py new file mode 100644 index 00000000..81d429b6 --- /dev/null +++ b/mex/common/models/base/container.py @@ -0,0 +1,18 @@ +from typing import Generic, TypeVar + +from pydantic import BaseModel + +T = TypeVar("T") + + +class ItemsContainer(BaseModel, Generic[T]): + """Generic container that contains items.""" + + items: list[T] + + +class PaginatedItemsContainer(BaseModel, Generic[T]): + """Generic container that contains items and has a total item count.""" + + items: list[T] + total: int diff --git a/mex/common/sinks/backend_api.py b/mex/common/sinks/backend_api.py index b5fa2e34..0711e6ec 100644 --- a/mex/common/sinks/backend_api.py +++ b/mex/common/sinks/backend_api.py @@ -1,11 +1,9 @@ from collections.abc import Generator, Iterable -from typing import cast from mex.common.backend_api.connector import BackendApiConnector from mex.common.logging import logger -from mex.common.models import AnyExtractedModel +from mex.common.models import AnyExtractedModel, AnyRuleSetResponse from mex.common.sinks.base import BaseSink -from mex.common.types import AnyExtractedIdentifier from mex.common.utils import grouper @@ -16,21 +14,21 @@ class BackendApiSink(BaseSink): def load( self, - models: Iterable[AnyExtractedModel], - ) -> Generator[AnyExtractedIdentifier, None, None]: - """Load models to the Backend API using bulk insertion. + models_or_rule_sets: Iterable[AnyExtractedModel | AnyRuleSetResponse], + ) -> Generator[AnyExtractedModel | AnyRuleSetResponse, None, None]: + """Load extracted models or rule-sets to the Backend API using bulk insertion. Args: - models: Iterable of extracted models + models_or_rule_sets: Iterable of extracted models or rule-sets Returns: - Generator for identifiers of posted models + Generator for posted models """ total_count = 0 connector = BackendApiConnector.get() - for chunk in grouper(self.CHUNK_SIZE, models): + for chunk in grouper(self.CHUNK_SIZE, models_or_rule_sets): model_list = [model for model in chunk if model is not None] - response = connector.post_extracted_items(model_list) + connector.ingest(model_list) total_count += len(model_list) - yield from cast(list[AnyExtractedIdentifier], response.identifiers) + yield from model_list logger.info("%s - written %s models", type(self).__name__, total_count) diff --git a/mex/common/sinks/base.py b/mex/common/sinks/base.py index 673002a8..751f3d94 100644 --- a/mex/common/sinks/base.py +++ b/mex/common/sinks/base.py @@ -1,9 +1,8 @@ from abc import abstractmethod -from collections.abc import Iterable +from collections.abc import Generator, Iterable from mex.common.connector import BaseConnector -from mex.common.models import AnyExtractedModel -from mex.common.types import Identifier +from mex.common.models import AnyExtractedModel, AnyRuleSetResponse class BaseSink(BaseConnector): @@ -17,7 +16,10 @@ def close(self) -> None: @abstractmethod def load( - self, models: Iterable[AnyExtractedModel] - ) -> Iterable[Identifier]: # pragma: no cover - """Iteratively load models to a destination and yield their identifiers.""" + self, + models_or_rule_sets: Iterable[AnyExtractedModel | AnyRuleSetResponse], + ) -> Generator[ + AnyExtractedModel | AnyRuleSetResponse, None, None + ]: # pragma: no cover + """Load extracted models or rule-sets to a destination and yield them.""" ... diff --git a/mex/common/sinks/ndjson.py b/mex/common/sinks/ndjson.py index 1bb4cf58..a4231915 100644 --- a/mex/common/sinks/ndjson.py +++ b/mex/common/sinks/ndjson.py @@ -2,16 +2,17 @@ from collections.abc import Generator, Iterable from contextlib import ExitStack from pathlib import Path -from typing import IO, Any +from typing import IO, Any, TypeVar from mex.common.logging import logger -from mex.common.models import AnyExtractedModel +from mex.common.models.base.model import BaseModel from mex.common.settings import BaseSettings from mex.common.sinks.base import BaseSink from mex.common.transform import MExEncoder -from mex.common.types import AnyExtractedIdentifier from mex.common.utils import grouper +T = TypeVar("T", bound=BaseModel) + class NdjsonSink(BaseSink): """Sink to load models into new-line delimited JSON files.""" @@ -27,17 +28,14 @@ def __init__(self) -> None: def close(self) -> None: """Nothing to close, since load already closes all file handles.""" - def load( - self, - models: Iterable[AnyExtractedModel], - ) -> Generator[AnyExtractedIdentifier, None, None]: - """Write models into a new-line delimited JSON file. + def load(self, models: Iterable[T]) -> Generator[T, None, None]: + """Write any models into a new-line delimited JSON file. Args: - models: Iterable of extracted models to write + models: Iterable of any kind of models Returns: - Generator for identifiers of written models + Generator for the loaded models """ file_handles: dict[str, IO[Any]] = {} total_count = 0 @@ -59,7 +57,8 @@ def load( class_name, file_name.as_posix(), ) - fh.write(f"{json.dumps(model, sort_keys=True, cls=MExEncoder)}\n") + dumped_json = json.dumps(model, sort_keys=True, cls=MExEncoder) + fh.write(f"{dumped_json}\n") total_count += 1 - yield model.identifier - logger.info("%s - written %s models", type(self).__name__, total_count) + yield model + logger.info("%s - written %s items", type(self).__name__, total_count) diff --git a/mex/common/sinks/registry.py b/mex/common/sinks/registry.py index d61d35cd..7770c79b 100644 --- a/mex/common/sinks/registry.py +++ b/mex/common/sinks/registry.py @@ -2,12 +2,12 @@ from itertools import tee from typing import Final -from mex.common.models import AnyExtractedModel +from mex.common.models import AnyExtractedModel, AnyRuleSetResponse from mex.common.settings import BaseSettings from mex.common.sinks.backend_api import BackendApiSink from mex.common.sinks.base import BaseSink from mex.common.sinks.ndjson import NdjsonSink -from mex.common.types import Identifier, Sink +from mex.common.types import Sink _SINK_REGISTRY: Final[dict[Sink, type["BaseSink"]]] = {} @@ -37,11 +37,11 @@ def close(self) -> None: def load( self, - models: Iterable[AnyExtractedModel], - ) -> Generator[Identifier, None, None]: - """Load models to multiple sinks simultaneously.""" + models_or_rule_sets: Iterable[AnyExtractedModel | AnyRuleSetResponse], + ) -> Generator[AnyExtractedModel | AnyRuleSetResponse, None, None]: + """Load models or rule-sets to multiple sinks simultaneously.""" for sink, model_gen in zip( - self._sinks, tee(models, len(self._sinks)), strict=True + self._sinks, tee(models_or_rule_sets, len(self._sinks)), strict=True ): yield from sink.load(model_gen) diff --git a/tests/backend_api/test_connector.py b/tests/backend_api/test_connector.py index ae11aad7..76c19183 100644 --- a/tests/backend_api/test_connector.py +++ b/tests/backend_api/test_connector.py @@ -5,12 +5,13 @@ from requests.exceptions import HTTPError from mex.common.backend_api.connector import BackendApiConnector -from mex.common.backend_api.models import ItemsContainer, PaginatedItemsContainer from mex.common.models import ( AnyExtractedModel, AnyPreviewModel, ExtractedPerson, + ItemsContainer, MergedPerson, + PaginatedItemsContainer, PersonRuleSetRequest, PersonRuleSetResponse, PreviewPerson, @@ -24,16 +25,16 @@ def test_set_authentication_mocked() -> None: assert connector.session.headers["X-API-Key"] == "dummy_write_key" -def test_post_extracted_items_mocked( +def test_ingest_mocked( mocked_backend: MagicMock, extracted_person: ExtractedPerson ) -> None: - mocked_return = {"identifiers": [extracted_person.identifier]} + mocked_return = {"items": [extracted_person]} mocked_backend.return_value.json.return_value = mocked_return connector = BackendApiConnector.get() - response = connector.post_extracted_items([extracted_person]) + response = connector.ingest([extracted_person]) - assert response.identifiers == [extracted_person.identifier] + assert response == [extracted_person] assert mocked_backend.call_args == call( "POST", "http://localhost:8080/v0/ingest", diff --git a/tests/sinks/test_backend_api.py b/tests/sinks/test_backend_api.py index d8015184..fd6bd762 100644 --- a/tests/sinks/test_backend_api.py +++ b/tests/sinks/test_backend_api.py @@ -3,8 +3,7 @@ from pytest import MonkeyPatch from mex.common.backend_api.connector import BackendApiConnector -from mex.common.backend_api.models import IdentifiersResponse -from mex.common.models import ExtractedPerson +from mex.common.models import ExtractedPerson, ItemsContainer from mex.common.sinks.backend_api import BackendApiSink @@ -16,13 +15,11 @@ def __init__(self: BackendApiConnector) -> None: monkeypatch.setattr(BackendApiConnector, "__init__", __init__) - response = IdentifiersResponse(identifiers=[extracted_person.identifier]) - post_extracted_items = Mock(return_value=response) - monkeypatch.setattr( - BackendApiConnector, "post_extracted_items", post_extracted_items - ) + response = ItemsContainer[ExtractedPerson](items=[extracted_person]) + ingest = Mock(return_value=response) + monkeypatch.setattr(BackendApiConnector, "ingest", ingest) sink = BackendApiSink.get() - model_ids = list(sink.load([extracted_person])) - assert model_ids == response.identifiers - post_extracted_items.assert_called_once_with([extracted_person]) + models_or_rule_sets = list(sink.load([extracted_person])) + assert models_or_rule_sets == [extracted_person] + ingest.assert_called_once_with([extracted_person]) diff --git a/tests/sinks/test_ndjson.py b/tests/sinks/test_ndjson.py index 07ec4f6e..79934d0d 100644 --- a/tests/sinks/test_ndjson.py +++ b/tests/sinks/test_ndjson.py @@ -3,7 +3,7 @@ from pydantic import UUID4 -from mex.common.models import ExtractedData +from mex.common.models import BaseModel from mex.common.settings import BaseSettings from mex.common.sinks.ndjson import NdjsonSink from mex.common.types import Identifier, TemporalEntity @@ -13,7 +13,7 @@ class DummyEnum(Enum): NAME = "value" -class ExtractedThing(ExtractedData): +class Thing(BaseModel): identifier: Identifier str_attr: str enum_attr: DummyEnum | None = None @@ -25,15 +25,21 @@ def test_sink_load() -> None: settings = BaseSettings.get() test_models = [ - ExtractedThing.model_construct(identifier="1", str_attr="foo"), - ExtractedThing.model_construct( - identifier="2", str_attr="bar", enum_attr=DummyEnum.NAME + Thing(identifier=Identifier.generate(seed=1), str_attr="foo"), + Thing( + identifier=Identifier.generate(seed=2), + str_attr="bar", + enum_attr=DummyEnum.NAME, ), - ExtractedThing.model_construct( - identifier="3", str_attr="baz", uuid_attr=UUID(int=42, version=4) + Thing( + identifier=Identifier.generate(seed=3), + str_attr="baz", + uuid_attr=UUID(int=42, version=4), ), - ExtractedThing.model_construct( - identifier="4", str_attr="dat", ts_attr=TemporalEntity(2000, 1, 1) + Thing( + identifier=Identifier.generate(seed=4), + str_attr="dat", + ts_attr=TemporalEntity(2000, 1, 1), ), ] @@ -41,7 +47,7 @@ def test_sink_load() -> None: ids = list(sink.load(test_models)) assert len(ids) - with open(settings.work_dir / "ExtractedThing.ndjson") as handle: + with open(settings.work_dir / "Thing.ndjson") as handle: output = handle.read() expected = """\