Skip to content

Commit

Permalink
feature/mx 1744 expand accepted models of sink to rule items (#377)
Browse files Browse the repository at this point in the history
### Changes

- BREAKING: move ItemsContainer and PaginatedItemsContainer to
mex.common.models
- BREAKING: replace post_extracted_items with ingest and allow
AnyRuleSetResponses
- allow AnyRuleSetResponses as arguments to sinks
- BREAKING: sinks now yield the models they loaded, instead of just
their identifiers

---------

Co-authored-by: Nicolas Drebenstedt <[email protected]>
Co-authored-by: Nicolas Drebenstedt <[email protected]>
  • Loading branch information
3 people authored Feb 6, 2025
1 parent 24e6516 commit 8a34283
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 98 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changes

- BREAKING: move ItemsContainer and PaginatedItemsContainer to mex.common.models
- BREAKING: replace post_extracted_items with ingest and allow AnyRuleSetResponses
- allow AnyRuleSetResponses as arguments to sinks
- BREAKING: sinks now yield the models they loaded, instead of just their identifiers

### Deprecated

### Removed
Expand Down
27 changes: 16 additions & 11 deletions mex/common/backend_api/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
from requests.exceptions import HTTPError

from mex.common.backend_api.models import (
IdentifiersResponse,
ItemsContainer,
MergedModelTypeAdapter,
PaginatedItemsContainer,
RuleSetResponseTypeAdapter,
)
from mex.common.connector import HTTPConnector
Expand All @@ -16,6 +13,8 @@
AnyPreviewModel,
AnyRuleSetRequest,
AnyRuleSetResponse,
ItemsContainer,
PaginatedItemsContainer,
)
from mex.common.settings import BaseSettings

Expand All @@ -40,28 +39,34 @@ def _set_url(self) -> None:
settings = BaseSettings.get()
self.url = urljoin(str(settings.backend_api_url), self.API_VERSION)

def post_extracted_items(
def ingest(
self,
extracted_items: list[AnyExtractedModel],
) -> IdentifiersResponse:
"""Post extracted items to the backend in bulk.
models_or_rule_sets: list[AnyExtractedModel | AnyRuleSetResponse],
) -> list[AnyExtractedModel | AnyRuleSetResponse]:
"""Post extracted models or rule-sets to the backend in bulk.
Args:
extracted_items: Extracted items to post
models_or_rule_sets: Extracted models or rule-sets to ingest
Raises:
HTTPError: If post was not accepted, crashes or times out
Returns:
Response model from the endpoint
List of extracted models or rule-sets from the endpoint
"""
response = self.request(
method="POST",
endpoint="ingest",
payload=ItemsContainer[AnyExtractedModel](items=extracted_items),
payload=ItemsContainer[AnyExtractedModel | AnyRuleSetResponse](
items=models_or_rule_sets
),
timeout=self.INGEST_TIMEOUT,
)
return IdentifiersResponse.model_validate(response)
return (
ItemsContainer[AnyExtractedModel | AnyRuleSetResponse]
.model_validate(response)
.items
)

def fetch_extracted_items(
self,
Expand Down
27 changes: 2 additions & 25 deletions mex/common/backend_api/models.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,8 @@
from typing import Annotated, Generic, TypeVar
from typing import Annotated

from pydantic import Field, TypeAdapter

from mex.common.models import AnyMergedModel, AnyRuleSetResponse, BaseModel
from mex.common.types import Identifier

T = TypeVar("T")


class ItemsContainer(BaseModel, Generic[T]):
"""Generic container that contains items."""

items: list[T]


class PaginatedItemsContainer(BaseModel, Generic[T]):
"""Generic container that contains items and has a total item count."""

items: list[T]
total: int


class IdentifiersResponse(BaseModel):
"""Response models for a list of identifiers."""

identifiers: list[Identifier]

from mex.common.models import AnyMergedModel, AnyRuleSetResponse

MergedModelTypeAdapter: TypeAdapter[AnyMergedModel] = TypeAdapter(
Annotated[AnyMergedModel, Field(discriminator="entityType")]
Expand Down
2 changes: 1 addition & 1 deletion mex/common/identity/backend_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from functools import cache

from mex.common.backend_api.connector import BackendApiConnector
from mex.common.backend_api.models import ItemsContainer
from mex.common.identity.base import BaseProvider
from mex.common.identity.models import Identity
from mex.common.models import ItemsContainer
from mex.common.types import Identifier, MergedPrimarySourceIdentifier


Expand Down
6 changes: 6 additions & 0 deletions mex/common/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@
PreviewActivity,
SubtractiveActivity,
)
from mex.common.models.base.container import (
ItemsContainer,
PaginatedItemsContainer,
)
from mex.common.models.base.extracted_data import ExtractedData
from mex.common.models.base.filter import BaseFilter, FilterField, FilterRule
from mex.common.models.base.mapping import BaseMapping, MappingField, MappingRule
Expand Down Expand Up @@ -353,6 +357,7 @@
"ExtractedVariableGroup",
"FilterField",
"FilterRule",
"ItemsContainer",
"MappingField",
"MappingRule",
"MergedAccessPlatform",
Expand All @@ -377,6 +382,7 @@
"OrganizationalUnitMapping",
"OrganizationalUnitRuleSetRequest",
"OrganizationalUnitRuleSetResponse",
"PaginatedItemsContainer",
"PersonFilter",
"PersonMapping",
"PersonRuleSetRequest",
Expand Down
18 changes: 18 additions & 0 deletions mex/common/models/base/container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Generic, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class ItemsContainer(BaseModel, Generic[T]):
"""Generic container that contains items."""

# Envelope model for a plain list of items of one type `T`.
# Parametrize it at the call site, e.g. `ItemsContainer[SomeModel](items=[...])`
# to build a request payload, or
# `ItemsContainer[SomeModel].model_validate(data).items` to validate a
# response and unwrap the list.
items: list[T]


class PaginatedItemsContainer(BaseModel, Generic[T]):
"""Generic container that contains items and has a total item count."""

# The items carried by this container; element type is the type argument `T`.
items: list[T]
# Total item count reported alongside `items`.
# NOTE(review): presumably the count across all pages rather than
# `len(items)` — confirm against the backend's pagination contract.
total: int
20 changes: 9 additions & 11 deletions mex/common/sinks/backend_api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from collections.abc import Generator, Iterable
from typing import cast

from mex.common.backend_api.connector import BackendApiConnector
from mex.common.logging import logger
from mex.common.models import AnyExtractedModel
from mex.common.models import AnyExtractedModel, AnyRuleSetResponse
from mex.common.sinks.base import BaseSink
from mex.common.types import AnyExtractedIdentifier
from mex.common.utils import grouper


Expand All @@ -16,21 +14,21 @@ class BackendApiSink(BaseSink):

def load(
self,
models: Iterable[AnyExtractedModel],
) -> Generator[AnyExtractedIdentifier, None, None]:
"""Load models to the Backend API using bulk insertion.
models_or_rule_sets: Iterable[AnyExtractedModel | AnyRuleSetResponse],
) -> Generator[AnyExtractedModel | AnyRuleSetResponse, None, None]:
"""Load extracted models or rule-sets to the Backend API using bulk insertion.
Args:
models: Iterable of extracted models
models_or_rule_sets: Iterable of extracted models or rule-sets
Returns:
Generator for identifiers of posted models
Generator for posted models
"""
total_count = 0
connector = BackendApiConnector.get()
for chunk in grouper(self.CHUNK_SIZE, models):
for chunk in grouper(self.CHUNK_SIZE, models_or_rule_sets):
model_list = [model for model in chunk if model is not None]
response = connector.post_extracted_items(model_list)
connector.ingest(model_list)
total_count += len(model_list)
yield from cast(list[AnyExtractedIdentifier], response.identifiers)
yield from model_list
logger.info("%s - written %s models", type(self).__name__, total_count)
14 changes: 8 additions & 6 deletions mex/common/sinks/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from abc import abstractmethod
from collections.abc import Iterable
from collections.abc import Generator, Iterable

from mex.common.connector import BaseConnector
from mex.common.models import AnyExtractedModel
from mex.common.types import Identifier
from mex.common.models import AnyExtractedModel, AnyRuleSetResponse


class BaseSink(BaseConnector):
Expand All @@ -17,7 +16,10 @@ def close(self) -> None:

@abstractmethod
def load(
self, models: Iterable[AnyExtractedModel]
) -> Iterable[Identifier]: # pragma: no cover
"""Iteratively load models to a destination and yield their identifiers."""
self,
models_or_rule_sets: Iterable[AnyExtractedModel | AnyRuleSetResponse],
) -> Generator[
AnyExtractedModel | AnyRuleSetResponse, None, None
]: # pragma: no cover
"""Load extracted models or rule-sets to a destination and yield them."""
...
25 changes: 12 additions & 13 deletions mex/common/sinks/ndjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
from collections.abc import Generator, Iterable
from contextlib import ExitStack
from pathlib import Path
from typing import IO, Any
from typing import IO, Any, TypeVar

from mex.common.logging import logger
from mex.common.models import AnyExtractedModel
from mex.common.models.base.model import BaseModel
from mex.common.settings import BaseSettings
from mex.common.sinks.base import BaseSink
from mex.common.transform import MExEncoder
from mex.common.types import AnyExtractedIdentifier
from mex.common.utils import grouper

T = TypeVar("T", bound=BaseModel)


class NdjsonSink(BaseSink):
"""Sink to load models into new-line delimited JSON files."""
Expand All @@ -27,17 +28,14 @@ def __init__(self) -> None:
def close(self) -> None:
"""Nothing to close, since load already closes all file handles."""

def load(
self,
models: Iterable[AnyExtractedModel],
) -> Generator[AnyExtractedIdentifier, None, None]:
"""Write models into a new-line delimited JSON file.
def load(self, models: Iterable[T]) -> Generator[T, None, None]:
"""Write any models into a new-line delimited JSON file.
Args:
models: Iterable of extracted models to write
models: Iterable of any kind of models
Returns:
Generator for identifiers of written models
Generator for the loaded models
"""
file_handles: dict[str, IO[Any]] = {}
total_count = 0
Expand All @@ -59,7 +57,8 @@ def load(
class_name,
file_name.as_posix(),
)
fh.write(f"{json.dumps(model, sort_keys=True, cls=MExEncoder)}\n")
dumped_json = json.dumps(model, sort_keys=True, cls=MExEncoder)
fh.write(f"{dumped_json}\n")
total_count += 1
yield model.identifier
logger.info("%s - written %s models", type(self).__name__, total_count)
yield model
logger.info("%s - written %s items", type(self).__name__, total_count)
12 changes: 6 additions & 6 deletions mex/common/sinks/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
from itertools import tee
from typing import Final

from mex.common.models import AnyExtractedModel
from mex.common.models import AnyExtractedModel, AnyRuleSetResponse
from mex.common.settings import BaseSettings
from mex.common.sinks.backend_api import BackendApiSink
from mex.common.sinks.base import BaseSink
from mex.common.sinks.ndjson import NdjsonSink
from mex.common.types import Identifier, Sink
from mex.common.types import Sink

_SINK_REGISTRY: Final[dict[Sink, type["BaseSink"]]] = {}

Expand Down Expand Up @@ -37,11 +37,11 @@ def close(self) -> None:

def load(
self,
models: Iterable[AnyExtractedModel],
) -> Generator[Identifier, None, None]:
"""Load models to multiple sinks simultaneously."""
models_or_rule_sets: Iterable[AnyExtractedModel | AnyRuleSetResponse],
) -> Generator[AnyExtractedModel | AnyRuleSetResponse, None, None]:
"""Load models or rule-sets to multiple sinks simultaneously."""
for sink, model_gen in zip(
self._sinks, tee(models, len(self._sinks)), strict=True
self._sinks, tee(models_or_rule_sets, len(self._sinks)), strict=True
):
yield from sink.load(model_gen)

Expand Down
11 changes: 6 additions & 5 deletions tests/backend_api/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from requests.exceptions import HTTPError

from mex.common.backend_api.connector import BackendApiConnector
from mex.common.backend_api.models import ItemsContainer, PaginatedItemsContainer
from mex.common.models import (
AnyExtractedModel,
AnyPreviewModel,
ExtractedPerson,
ItemsContainer,
MergedPerson,
PaginatedItemsContainer,
PersonRuleSetRequest,
PersonRuleSetResponse,
PreviewPerson,
Expand All @@ -24,16 +25,16 @@ def test_set_authentication_mocked() -> None:
assert connector.session.headers["X-API-Key"] == "dummy_write_key"


def test_post_extracted_items_mocked(
def test_ingest_mocked(
mocked_backend: MagicMock, extracted_person: ExtractedPerson
) -> None:
mocked_return = {"identifiers": [extracted_person.identifier]}
mocked_return = {"items": [extracted_person]}
mocked_backend.return_value.json.return_value = mocked_return

connector = BackendApiConnector.get()
response = connector.post_extracted_items([extracted_person])
response = connector.ingest([extracted_person])

assert response.identifiers == [extracted_person.identifier]
assert response == [extracted_person]
assert mocked_backend.call_args == call(
"POST",
"http://localhost:8080/v0/ingest",
Expand Down
17 changes: 7 additions & 10 deletions tests/sinks/test_backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from pytest import MonkeyPatch

from mex.common.backend_api.connector import BackendApiConnector
from mex.common.backend_api.models import IdentifiersResponse
from mex.common.models import ExtractedPerson
from mex.common.models import ExtractedPerson, ItemsContainer
from mex.common.sinks.backend_api import BackendApiSink


Expand All @@ -16,13 +15,11 @@ def __init__(self: BackendApiConnector) -> None:

monkeypatch.setattr(BackendApiConnector, "__init__", __init__)

response = IdentifiersResponse(identifiers=[extracted_person.identifier])
post_extracted_items = Mock(return_value=response)
monkeypatch.setattr(
BackendApiConnector, "post_extracted_items", post_extracted_items
)
response = ItemsContainer[ExtractedPerson](items=[extracted_person])
ingest = Mock(return_value=response)
monkeypatch.setattr(BackendApiConnector, "ingest", ingest)

sink = BackendApiSink.get()
model_ids = list(sink.load([extracted_person]))
assert model_ids == response.identifiers
post_extracted_items.assert_called_once_with([extracted_person])
models_or_rule_sets = list(sink.load([extracted_person]))
assert models_or_rule_sets == [extracted_person]
ingest.assert_called_once_with([extracted_person])
Loading

0 comments on commit 8a34283

Please sign in to comment.