diff --git a/.run/SearchService.run.xml b/.run/SearchService.run.xml
index 25444065ea..34f704c771 100644
--- a/.run/SearchService.run.xml
+++ b/.run/SearchService.run.xml
@@ -15,4 +15,4 @@
-
+
\ No newline at end of file
diff --git a/deployment/helm/ditto/Chart.yaml b/deployment/helm/ditto/Chart.yaml
index 0e73c5a1ae..b21a183a8d 100644
--- a/deployment/helm/ditto/Chart.yaml
+++ b/deployment/helm/ditto/Chart.yaml
@@ -16,7 +16,7 @@ description: |
A digital twin is a virtual, cloud based, representation of his real world counterpart
(real world “Things”, e.g. devices like sensors, smart heating, connected cars, smart grids, EV charging stations etc).
type: application
-version: 3.4.4 # chart version is effectively set by release-job
+version: 3.4.5 # chart version is effectively set by release-job
appVersion: 3.4.4
keywords:
- iot-chart
diff --git a/deployment/helm/ditto/templates/thingssearch-deployment.yaml b/deployment/helm/ditto/templates/thingssearch-deployment.yaml
index aabaccdfc4..261b1256f9 100644
--- a/deployment/helm/ditto/templates/thingssearch-deployment.yaml
+++ b/deployment/helm/ditto/templates/thingssearch-deployment.yaml
@@ -136,6 +136,15 @@ spec:
{{- if .Values.global.logging.customConfigFile.enabled }}
-Dlogback.configurationFile=/opt/ditto/{{ .Values.global.logging.customConfigFile.fileName }}
{{- end }}
+ {{- if .Values.thingsSearch.config.indexedFieldsLimiting.enabled }}
+ -Dditto.extensions.caching-signal-enrichment-facade-provider=org.eclipse.ditto.thingsearch.service.persistence.write.streaming.SearchIndexingSignalEnrichmentFacadeProvider
+ {{- range $index, $value := .Values.thingsSearch.config.indexedFieldsLimiting.items }}
+ "{{ printf "%s%d%s=%s" "-Dditto.search.namespace-indexed-fields." $index ".namespace-pattern" $value.namespacePattern }}"
+ {{- range $fieldIndex, $indexedField := $value.indexedFields }}
+ "{{ printf "%s%d%s%d=%s" "-Dditto.search.namespace-indexed-fields." $index ".indexed-fields." $fieldIndex $indexedField }}"
+ {{- end }}
+ {{- end }}
+ {{- end }}
{{- range $key, $value := .Values.thingsSearch.config.operatorMetrics.customMetrics }}
"{{ printf "%s%s%s=%t" "-Dditto.search.operator-metrics.custom-metrics." $key ".enabled" $value.enabled }}"
"{{ printf "%s%s%s=%s" "-Dditto.search.operator-metrics.custom-metrics." $key ".scrape-interval" $value.scrapeInterval }}"
diff --git a/deployment/helm/ditto/values.yaml b/deployment/helm/ditto/values.yaml
index 483601d1fe..e1cfdc2763 100644
--- a/deployment/helm/ditto/values.yaml
+++ b/deployment/helm/ditto/values.yaml
@@ -1077,6 +1077,20 @@ thingsSearch:
throughput: 100
# period the throttle period
period: 30s
+ # indexedFieldsLimiting by default, Ditto indexed all fields of things in its search.
+ # However, this behavior can be customized, providing configuration to only index certain fields for specified namespaces.
+ indexedFieldsLimiting:
+ # enabled whether field index limiting should be enabled or not
+ enabled: false
+ # items contains the list of per-namespace configuration of which fields to include into to the search index
+ items:
+ # - # namespacePattern holds the namespace for which the single limiting configuration entry should apply.
+ # # Wildcards `*` (Matching any number of any character) and `?` (Matches any single character) are supported.
+ # namespacePattern: "org.eclipse.*"
+ # # indexedFields holds a list of fields that will be explicitly included in the search index
+ # indexedFields:
+ # - "attributes"
+ # - "features/included"
# operatorMetrics contains configuration for operator defined custom metrics, using a search "count" with namespaces and filter
operatorMetrics:
# enabled configures whether operator metrics should be enabled or not
diff --git a/documentation/src/main/resources/pages/ditto/installation-operating.md b/documentation/src/main/resources/pages/ditto/installation-operating.md
index a5e10ab9e9..e6fe7f6120 100644
--- a/documentation/src/main/resources/pages/ditto/installation-operating.md
+++ b/documentation/src/main/resources/pages/ditto/installation-operating.md
@@ -324,6 +324,69 @@ entities (things/policies) and no-one other:
These system properties would have to be configured for the "things" and "policies" services.
+## Limiting Indexed Fields
+
+The default behavior of Ditto is to index the complete JSON of a thing, which includes all its attributes and features. This may not be desired behavior for certain use cases:
+* Increased load on the search database, leading to performance degradation and increased database cost.
+* Only a few fields are ever used for searching.
+
+Since Ditto *3.5.0*, there is a configuration to specify, by a namespace pattern, which fields will be included in the search database.
+
+To enable this functionality, there are two new options in the `thing-search.conf` configuration:
+
+```hocon
+ditto {
+ //...
+ caching-signal-enrichment-facade-provider = org.eclipse.ditto.thingsearch.service.persistence.write.streaming.SearchIndexingSignalEnrichmentFacadeProvider
+ //...
+ search {
+ namespace-indexed-fields = [
+ {
+ namespace-pattern = "org.eclipse.test"
+ indexed-fields = [
+ "attributes",
+ "features/info/properties",
+ "features/info/other"
+ ]
+ },
+ {
+ namespace-pattern = "org.eclipse*"
+ indexed-fields = [
+ "attributes",
+ "features/info"
+ ]
+ }
+ ]
+ }
+```
+
+There is a new implementation of the caching signal enrichment facade provider that must be configured to enable this
+functionality.
+
+For each namespace pattern, only the selected fields are included in the search database. In the example above, for
+things in the "org.eclipse.test" namespace, the fields indexed in the search database will
+only be "attributes", "features/info/properties", and "features/info/other".
+Things matching the "org.eclipse*" namespace, only the "attributes" and "features/info" paths will be the only fields
+indexed in the search database.
+
+Important notes:
+* Ditto will use the namespace of the thing and match the FIRST namespace-pattern it encounters. So make sure any
+ configured namespace-patterns are unique enough to match.
+* Ditto will automatically add the system-level fields it needs to operate, so no manual configuration of these is
+ necessary.
+
+Example for configuring the same configuration via system properties for the `things-search` service:
+
+```shell
+-Dditto.search.namespace-indexed-fields.0.namespace-pattern=org.eclipse.test
+-Dditto.search.namespace-indexed-fields.0.indexed-fields.0=attributes
+-Dditto.search.namespace-indexed-fields.0.indexed-fields.1=features/info/properties
+-Dditto.search.namespace-indexed-fields.0.indexed-fields.2=features/info/other
+-Dditto.search.namespace-indexed-fields.1.namespace-pattern=org.eclipse*
+-Dditto.search.namespace-indexed-fields.1.indexed-fields.0=attributes
+-Dditto.search.namespace-indexed-fields.1.indexed-fields.1=features/info
+```
+
## Logging
Gathering logs for a running Ditto installation can be achieved by:
diff --git a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java
index 8eb226661e..0e17e8114f 100644
--- a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java
+++ b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java
@@ -28,11 +28,11 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithResource;
-import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
-import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
+import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
+import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
@@ -52,15 +52,15 @@
* Instantiated once per cluster node so that it builds up a cache across all signal enrichments on a local cluster
* node.
*/
-public final class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade {
+public class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade {
- private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory
- .getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
+ private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
private static final String CACHE_NAME_SUFFIX = "_signal_enrichment_cache";
- private final Cache extraFieldsCache;
+ protected final Cache extraFieldsCache;
- private DittoCachingSignalEnrichmentFacade(final SignalEnrichmentFacade cacheLoaderFacade,
+ protected DittoCachingSignalEnrichmentFacade(
+ final SignalEnrichmentFacade cacheLoaderFacade,
final CacheConfig cacheConfig,
final Executor cacheLoaderExecutor,
final String cacheNamePrefix) {
@@ -96,14 +96,17 @@ public CompletionStage retrieveThing(final ThingId thingId, final Li
final long minAcceptableSeqNr) {
final DittoHeaders dittoHeaders = DittoHeaders.empty();
+
+ final JsonFieldSelector fieldSelector = determineSelector(thingId.getNamespace());
+
if (minAcceptableSeqNr < 0) {
final var cacheKey =
- SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, null));
+ SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, fieldSelector));
extraFieldsCache.invalidate(cacheKey);
return doCacheLookup(cacheKey, dittoHeaders);
} else {
final var cachingParameters =
- new CachingParameters(null, events, false, minAcceptableSeqNr);
+ new CachingParameters(fieldSelector, events, false, minAcceptableSeqNr);
return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters);
}
@@ -157,9 +160,9 @@ public CompletionStage retrievePartialThing(final EntityId thingId,
.thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector));
}
- private CompletionStage doRetrievePartialThing(final EntityId thingId,
- final DittoHeaders dittoHeaders,
- final CachingParameters cachingParameters) {
+ protected CompletionStage doRetrievePartialThing(final EntityId thingId,
+ final DittoHeaders dittoHeaders,
+ final CachingParameters cachingParameters) {
final var fieldSelector = cachingParameters.fieldSelector;
final JsonFieldSelector enhancedFieldSelector = enhanceFieldSelectorWithRevision(fieldSelector);
@@ -278,8 +281,8 @@ private static DittoHeaders getLastDittoHeaders(final List extends Signal>>
}
}
- private CompletableFuture doCacheLookup(final SignalEnrichmentCacheKey cacheKey,
- final DittoHeaders dittoHeaders) {
+ protected CompletableFuture doCacheLookup(final SignalEnrichmentCacheKey cacheKey,
+ final DittoHeaders dittoHeaders) {
LOGGER.withCorrelationId(dittoHeaders).debug("Looking up cache entry for <{}>", cacheKey);
return extraFieldsCache.get(cacheKey)
@@ -446,17 +449,23 @@ private JsonObject enhanceJsonObject(final JsonObject jsonObject, final List> concernedEvents;
private final boolean invalidateCacheOnPolicyChange;
private final long minAcceptableSeqNr;
- private CachingParameters(@Nullable final JsonFieldSelector fieldSelector,
- final List> concernedEvents,
- final boolean invalidateCacheOnPolicyChange,
- final long minAcceptableSeqNr) {
+ public CachingParameters(@Nullable final JsonFieldSelector fieldSelector,
+ final List> concernedEvents,
+ final boolean invalidateCacheOnPolicyChange,
+ final long minAcceptableSeqNr) {
this.fieldSelector = fieldSelector;
this.concernedEvents = concernedEvents;
diff --git a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/SearchIndexingSignalEnrichmentFacade.java b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/SearchIndexingSignalEnrichmentFacade.java
new file mode 100644
index 0000000000..96974e5680
--- /dev/null
+++ b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/SearchIndexingSignalEnrichmentFacade.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.internal.models.signalenrichment;
+
+import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.regex.Pattern;
+
+import org.apache.pekko.japi.Pair;
+import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
+import org.eclipse.ditto.json.JsonFieldSelector;
+
+/**
+ * Extension of {@code DittoCachingSignalEnrichmentFacade} that allows a selected map of selected indexes grouped by
+ * namespace to be added to the signal enrichment cache.
+ */
+public final class SearchIndexingSignalEnrichmentFacade extends DittoCachingSignalEnrichmentFacade {
+
+ private final List> selectedIndexes;
+ private final Map selectedIndexesCache;
+
+ private SearchIndexingSignalEnrichmentFacade(
+ final List> selectedIndexes,
+ final SignalEnrichmentFacade cacheLoaderFacade,
+ final CacheConfig cacheConfig,
+ final Executor cacheLoaderExecutor,
+ final String cacheNamePrefix) {
+
+ super(cacheLoaderFacade, cacheConfig, cacheLoaderExecutor, cacheNamePrefix);
+
+ this.selectedIndexes = List.copyOf(selectedIndexes);
+ selectedIndexesCache = new HashMap<>();
+ }
+
+ /**
+ * Returns a new {@code SearchIndexingSignalEnrichmentFacade} instance.
+ *
+ * @param selectedIndexes The selected indexes to be loaded into the search context
+ * @param cacheLoaderFacade the facade whose argument-result-pairs we are caching.
+ * @param cacheConfig the cache configuration to use for the cache.
+ * @param cacheLoaderExecutor the executor to use in order to asynchronously load cache entries.
+ * @param cacheNamePrefix the prefix to use as cacheName of the cache.
+ * @throws NullPointerException if any argument is null.
+ */
+ public static SearchIndexingSignalEnrichmentFacade newInstance(
+ final List> selectedIndexes,
+ final SignalEnrichmentFacade cacheLoaderFacade,
+ final CacheConfig cacheConfig,
+ final Executor cacheLoaderExecutor,
+ final String cacheNamePrefix) {
+
+ return new SearchIndexingSignalEnrichmentFacade(
+ checkNotNull(selectedIndexes, "selectedIndexes"),
+ checkNotNull(cacheLoaderFacade, "cacheLoaderFacade"),
+ checkNotNull(cacheConfig, "cacheConfig"),
+ checkNotNull(cacheLoaderExecutor, "cacheLoaderExecutor"),
+ checkNotNull(cacheNamePrefix, "cacheNamePrefix"));
+ }
+
+ @Override
+ protected JsonFieldSelector determineSelector(final String namespace) {
+
+ if (!selectedIndexesCache.containsKey(namespace)) {
+ // We iterate through the list and return the first JsonFieldSelector that matches the namespace pattern.
+ selectedIndexes.stream()
+ .filter(pair -> pair.first().matcher(namespace).matches())
+ .findFirst()
+ .ifPresent(pair -> selectedIndexesCache.put(namespace, pair.second()));
+ }
+ return selectedIndexesCache.get(namespace);
+ }
+}
diff --git a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractCachingSignalEnrichmentFacadeTest.java b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractCachingSignalEnrichmentFacadeTest.java
new file mode 100644
index 0000000000..2d51c3cc96
--- /dev/null
+++ b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractCachingSignalEnrichmentFacadeTest.java
@@ -0,0 +1,498 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.internal.models.signalenrichment;
+
+import java.time.Duration;
+import java.util.concurrent.CompletionStage;
+
+import org.apache.pekko.actor.ActorSelection;
+import org.apache.pekko.testkit.javadsl.TestKit;
+import org.assertj.core.api.JUnitSoftAssertions;
+import org.eclipse.ditto.base.model.auth.AuthorizationContext;
+import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
+import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
+import org.eclipse.ditto.base.model.entity.metadata.Metadata;
+import org.eclipse.ditto.base.model.entity.metadata.MetadataModelFactory;
+import org.eclipse.ditto.base.model.headers.DittoHeaders;
+import org.eclipse.ditto.base.model.json.FieldType;
+import org.eclipse.ditto.base.model.signals.DittoTestSystem;
+import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
+import org.eclipse.ditto.internal.utils.cache.config.DefaultCacheConfig;
+import org.eclipse.ditto.json.JsonFactory;
+import org.eclipse.ditto.json.JsonFieldSelector;
+import org.eclipse.ditto.json.JsonObject;
+import org.eclipse.ditto.json.JsonPointer;
+import org.eclipse.ditto.json.JsonValue;
+import org.eclipse.ditto.things.model.Thing;
+import org.eclipse.ditto.things.model.ThingId;
+import org.eclipse.ditto.things.model.ThingsModelFactory;
+import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
+import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
+import org.eclipse.ditto.things.model.signals.events.AttributeDeleted;
+import org.eclipse.ditto.things.model.signals.events.ThingMerged;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Abstract base test for different {@link SignalEnrichmentFacade} implementations providing caching.
+ */
+abstract class AbstractCachingSignalEnrichmentFacadeTest extends AbstractSignalEnrichmentFacadeTest {
+
+ private static final String ISSUER_PREFIX = "test:";
+ private static final String CACHE_CONFIG_KEY = "my-cache";
+ private static final String CACHE_CONFIG = CACHE_CONFIG_KEY + """
+ {
+ maximum-size = 10
+ expire-after-create = 2m
+ }
+ """;
+
+ private static final JsonObject THING_RESPONSE_JSON = JsonObject.of("""
+ {
+ "_revision": 3,
+ "policyId": "policy:id",
+ "attributes": {"x": 5},
+ "features": {"y": {"properties": {"z": true}}},
+ "_metadata": {"attributes": {"x": {"type": "x attribute"}}}
+ }""");
+
+
+ @Rule
+ public final JUnitSoftAssertions softly = new JUnitSoftAssertions();
+
+ @Test
+ public void alreadyLoadedCacheEntryIsReused() {
+ DittoTestSystem.run(this, kit -> {
+ // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
+ final SignalEnrichmentFacade underTest =
+ createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+ final ThingId thingId = ThingId.generateRandom();
+ final String userId = ISSUER_PREFIX + "user";
+ final DittoHeaders headers = DittoHeaders.newBuilder()
+ .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
+ AuthorizationSubject.newInstance(userId)))
+ .randomCorrelationId()
+ .build();
+ final CompletionStage askResult =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, getThingEvent());
+
+ // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
+ final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId);
+ softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
+ // WHEN: response is handled so that it is also added to the cache
+ kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
+ askResult.toCompletableFuture().join();
+ softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
+
+ // WHEN: same thing is asked again with same selector for an event with one revision ahead
+ final CompletionStage askResultCached =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers,
+ getThingEvent().setRevision(getThingEvent().getRevision() + 1));
+
+ // THEN: no cache lookup should be done
+ kit.expectNoMessage(Duration.ofSeconds(1));
+ askResultCached.toCompletableFuture().join();
+ softly.assertThat(askResultCached).isCompletedWithValue(getExpectedThingJson());
+ });
+ }
+
+ @Test
+ public void alreadyLoadedCacheEntryIsReusedForMergedEvent() {
+ DittoTestSystem.run(this, kit -> {
+ // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
+ final SignalEnrichmentFacade underTest =
+ createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+ final ThingId thingId = ThingId.generateRandom();
+ final String userId = ISSUER_PREFIX + "user";
+ final DittoHeaders headers = DittoHeaders.newBuilder()
+ .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
+ AuthorizationSubject.newInstance(userId)))
+ .randomCorrelationId()
+ .build();
+ final CompletionStage askResult =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, getThingEvent());
+
+ // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
+ final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId);
+ softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
+ // WHEN: response is handled so that it is also added to the cache
+ kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
+ askResult.toCompletableFuture().join();
+ softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
+
+ // WHEN: same thing is asked again with same selector for an event with one revision ahead
+ final ThingMerged mergeAttributes = ThingMerged.of(thingId, JsonPointer.of("/attributes"),
+ JsonObject.newBuilder()
+ .set("x", 42)
+ .set("foo", "bar")
+ .build(),
+ getThingEvent().getRevision() + 1,
+ null,
+ DittoHeaders.empty(),
+ null
+ );
+ final CompletionStage askResultCached =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, mergeAttributes);
+
+ // THEN: no cache lookup should be done
+ kit.expectNoMessage(Duration.ofSeconds(1));
+ askResultCached.toCompletableFuture().join();
+ // AND: the resulting thing JSON includes the with the merge update updated value:
+ final JsonObject expectedThingJson = getExpectedThingJson().toBuilder()
+ .set("/attributes/x", 42)
+ .build();
+ softly.assertThat(askResultCached).isCompletedWithValue(expectedThingJson);
+
+ // WHEN: then the attribute "x" is modified with a merge:
+ final ThingMerged mergeAttributeX = ThingMerged.of(thingId, JsonPointer.of("/attributes/x"),
+ JsonValue.of(1337),
+ mergeAttributes.getRevision() + 1,
+ null,
+ DittoHeaders.empty(),
+ null
+ );
+ final CompletionStage askResultCached2 =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, mergeAttributeX);
+
+ // THEN: no cache lookup should be done
+ kit.expectNoMessage(Duration.ofSeconds(1));
+ askResultCached2.toCompletableFuture().join();
+ // AND: the resulting thing JSON includes the with the merge update updated value:
+ final JsonObject expectedThingJson2 = getExpectedThingJson().toBuilder()
+ .set("/attributes/x", 1337)
+ .build();
+ softly.assertThat(askResultCached2).isCompletedWithValue(expectedThingJson2);
+ });
+ }
+
+ @Test
+ public void alreadyLoadedCacheEntryIsReusedForMergedEventOnRootLevel() {
+ DittoTestSystem.run(this, kit -> {
+ // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
+ final SignalEnrichmentFacade underTest =
+ createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+ final ThingId thingId = ThingId.generateRandom();
+ final String userId = ISSUER_PREFIX + "user";
+ final DittoHeaders headers = DittoHeaders.newBuilder()
+ .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
+ AuthorizationSubject.newInstance(userId)))
+ .randomCorrelationId()
+ .build();
+ final CompletionStage askResult =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, getThingEvent());
+
+ // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
+ final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId);
+ softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
+ // WHEN: response is handled so that it is also added to the cache
+ kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
+ askResult.toCompletableFuture().join();
+ softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
+
+ // WHEN: same thing is asked again with same selector for an event with one revision ahead
+ final ThingMerged mergeAttributes = ThingMerged.of(thingId, JsonPointer.of("/"),
+ JsonObject.newBuilder()
+ .set("attributes",
+ JsonObject.newBuilder()
+ .set("x", 42)
+ .set("foo", "bar")
+ .build())
+ .build(),
+ getThingEvent().getRevision() + 1,
+ null,
+ DittoHeaders.empty(),
+ null
+ );
+ final CompletionStage askResultCached =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, mergeAttributes);
+
+ // THEN: no cache lookup should be done
+ kit.expectNoMessage(Duration.ofSeconds(1));
+ askResultCached.toCompletableFuture().join();
+ });
+ }
+
+ @Test
+ public void alreadyLoadedCacheEntryIsInvalidatedForUnexpectedEventRevision() {
+ DittoTestSystem.run(this, kit -> {
+ // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
+ final SignalEnrichmentFacade underTest =
+ createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+ final ThingId thingId = ThingId.generateRandom();
+ final DittoHeaders headers = DittoHeaders.newBuilder().randomCorrelationId().build();
+ final CompletionStage askResult =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, getThingEvent());
+
+ // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
+ final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
+ // WHEN: response is handled so that it is also added to the cache
+ kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
+ askResult.toCompletableFuture().join();
+ softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
+
+ // WHEN: same thing is asked again with same selector with event with 2 revisions ahead
+ final DittoHeaders headers2 = DittoHeaders.newBuilder().randomCorrelationId().build();
+ final CompletionStage askResultCached =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers2,
+ getThingEvent().setRevision(getThingEvent().getRevision() + 2)); // notice +2 here
+
+ // THEN: do another cache lookup after invalidation
+ final RetrieveThing retrieveThing2 = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing2.getSelectedFields())
+ .contains(actualSelectedFields(getJsonFieldSelector()));
+ final Thing thing2 = ThingsModelFactory.newThing(getThingResponseThingJson());
+ final Thing thing2WithUpdatedRev = thing2.toBuilder()
+ .setRevision(thing2.getRevision().get().increment().increment())
+ .build();
+ kit.reply(RetrieveThingResponse.of(thingId, thing2WithUpdatedRev.toJson(
+ thing2WithUpdatedRev.getImplementedSchemaVersion(), FieldType.all()), headers2));
+ askResultCached.toCompletableFuture().join();
+ softly.assertThat(askResultCached).isCompletedWithValue(getExpectedThingJson());
+ });
+ }
+
+ @Test
+ public void differentAuthSubjectsLeadToCacheRetrievals() {
+ DittoTestSystem.run(this, kit -> {
+ // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
+ final SignalEnrichmentFacade underTest =
+ createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+ final ThingId thingId = ThingId.generateRandom();
+ final String userId1 = ISSUER_PREFIX + "user1";
+ final String userId2 = ISSUER_PREFIX + "user2";
+ final DittoHeaders headers = DittoHeaders.newBuilder()
+ .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
+ AuthorizationSubject.newInstance(userId1)))
+ .randomCorrelationId()
+ .build();
+ final CompletionStage askResult =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, getThingEvent());
+
+ // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
+ final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId1);
+ softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
+ // WHEN: response is handled so that it is also added to the cache
+ kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
+ askResult.toCompletableFuture().join();
+ softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
+
+ // WHEN: same thing is asked again with same selector for an event with one revision ahead but other auth subjects
+ final DittoHeaders headers2 = headers.toBuilder()
+ .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
+ AuthorizationSubject.newInstance(ISSUER_PREFIX + "user2")
+ ))
+ .build();
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers2,
+ getThingEvent().setRevision(getThingEvent().getRevision() + 1));
+
+ // THEN: a cache lookup should be done containing the other auth subject header
+ final RetrieveThing retrieveThing2 = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing2.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId2);
+ softly.assertThat(retrieveThing2.getSelectedFields())
+ .contains(actualSelectedFields(getJsonFieldSelector()));
+ });
+ }
+
+ @Test
+ public void differentFieldSelectorsLeadToCacheRetrievals() {
+ DittoTestSystem.run(this, kit -> {
+ // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
+ final SignalEnrichmentFacade underTest =
+ createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+ final ThingId thingId = ThingId.generateRandom();
+ final String userId = ISSUER_PREFIX + "user1";
+ final DittoHeaders headers = DittoHeaders.newBuilder()
+ .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
+ AuthorizationSubject.newInstance(userId)))
+ .randomCorrelationId()
+ .build();
+ final CompletionStage askResult =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, getThingEvent());
+
+ final JsonFieldSelector selector2 = JsonFieldSelector.newInstance("attributes", "features");
+
+ // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
+ final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId);
+ softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
+ // WHEN: response is handled so that it is also added to the cache
+ kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
+ askResult.toCompletableFuture().join();
+ softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
+
+ // WHEN: same thing is asked again with different selector for an event with one revision ahead
+ underTest.retrievePartialThing(thingId, selector2, headers,
+ getThingEvent().setRevision(getThingEvent().getRevision() + 1));
+
+ // THEN: a cache lookup should be done using the other selector
+ final RetrieveThing retrieveThing2 = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing2.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId);
+ softly.assertThat(retrieveThing2.getSelectedFields()).contains(actualSelectedFields(selector2));
+ });
+ }
+
+ @Test
+ public void metadataIsUpdatedForMergedEvent() {
+ DittoTestSystem.run(this, kit -> {
+ // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
+ final SignalEnrichmentFacade underTest =
+ createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+ final ThingId thingId = ThingId.generateRandom();
+ final String userId = ISSUER_PREFIX + "user";
+ final DittoHeaders headers = DittoHeaders.newBuilder()
+ .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
+ AuthorizationSubject.newInstance(userId)))
+ .randomCorrelationId()
+ .build();
+ final CompletionStage askResult =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, getThingEvent());
+
+ // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
+ final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId);
+ softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
+ // WHEN: response is handled so that it is also added to the cache
+ kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
+ askResult.toCompletableFuture().join();
+
+ softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
+
+ // WHEN: same thing is asked again with same selector for an event with one revision ahead
+ final ThingMerged mergeAttribute = ThingMerged.of(thingId, JsonPointer.of("/attributes/x"),
+ JsonFactory.newValue(6),
+ getThingEvent().getRevision() + 1,
+ null,
+ DittoHeaders.empty(),
+ Metadata.newMetadata(JsonObject.newBuilder()
+ .set("type", "x is now y attribute")
+ .build()
+ )
+ );
+ final CompletionStage askResultCached =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, mergeAttribute);
+
+ // THEN: no cache lookup should be done
+ kit.expectNoMessage(Duration.ofSeconds(1));
+ askResultCached.toCompletableFuture().join();
+ // AND: the resulting thing JSON includes the with the merged metadata updated value:
+ final JsonObject expectedThingJson = getExpectedThingJson().toBuilder()
+ .set("attributes", JsonObject.newBuilder()
+ .set("x", 6)
+ .build())
+ .set("_metadata", JsonObject.newBuilder()
+ .set("attributes", JsonObject.newBuilder()
+ .set("x", JsonObject.newBuilder()
+ .set("type", "x is now y attribute")
+ .build())
+ .build())
+ .build())
+ .build();
+
+ softly.assertThat(askResultCached).isCompletedWithValue(expectedThingJson);
+ });
+ }
+
+ @Test
+ public void metadataIsDeletedForDeletedEvent() {
+ DittoTestSystem.run(this, kit -> {
+ // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
+ final SignalEnrichmentFacade underTest =
+ createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+ final ThingId thingId = ThingId.generateRandom();
+ final String userId = ISSUER_PREFIX + "user";
+ final DittoHeaders headers = DittoHeaders.newBuilder()
+ .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
+ AuthorizationSubject.newInstance(userId)))
+ .randomCorrelationId()
+ .build();
+ final CompletionStage askResult =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, getThingEvent());
+
+ // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
+ final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
+ softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
+ .contains(userId);
+ softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
+ // WHEN: response is handled so that it is also added to the cache
+ kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
+ askResult.toCompletableFuture().join();
+
+ softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
+
+ // WHEN: same thing is asked again with same selector for an event with one revision ahead
+ final AttributeDeleted attributeDeleted =
+ AttributeDeleted.of(thingId, JsonPointer.of("/x"),
+ getThingEvent().getRevision() + 1,
+ null,
+ DittoHeaders.empty(),
+ MetadataModelFactory.nullMetadata());
+
+ final CompletionStage askResultCached =
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, attributeDeleted);
+
+ // THEN: no cache lookup should be done
+ kit.expectNoMessage(Duration.ofSeconds(1));
+ askResultCached.toCompletableFuture().join();
+ // AND: the resulting thing JSON includes the with the merged metadata updated value:
+ final JsonObject expectedThingJson = getExpectedThingJson().toBuilder()
+ .remove("attributes")
+ .set("_metadata", JsonObject.newBuilder()
+ .set("attributes", JsonObject.newBuilder().build())
+ .build())
+ .build();
+
+ softly.assertThat(askResultCached).isCompletedWithValue(expectedThingJson);
+ });
+ }
+
+ @Override
+ protected JsonFieldSelector actualSelectedFields(final JsonFieldSelector selector) {
+ return JsonFactory.newFieldSelectorBuilder()
+ .addPointers(selector)
+ .addFieldDefinition(Thing.JsonFields.REVISION) // additionally always select the revision
+ .build();
+ }
+
+ @Override
+ protected JsonObject getThingResponseThingJson() {
+ return THING_RESPONSE_JSON;
+ }
+
+ @Override
+ protected SignalEnrichmentFacade createSignalEnrichmentFacadeUnderTest(final TestKit kit, final Duration duration) {
+ final CacheConfig cacheConfig =
+ DefaultCacheConfig.of(ConfigFactory.parseString(CACHE_CONFIG), CACHE_CONFIG_KEY);
+ final ActorSelection commandHandler = ActorSelection.apply(kit.getRef(), "");
+ final ByRoundTripSignalEnrichmentFacade cacheLoaderFacade =
+ ByRoundTripSignalEnrichmentFacade.of(commandHandler, Duration.ofSeconds(10L));
+ return createCachingSignalEnrichmentFacade(kit, cacheLoaderFacade, cacheConfig);
+ }
+
+ protected abstract CachingSignalEnrichmentFacade createCachingSignalEnrichmentFacade(TestKit kit,
+ ByRoundTripSignalEnrichmentFacade cacheLoaderFacade, CacheConfig cacheConfig);
+}
diff --git a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractSignalEnrichmentFacadeTest.java b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractSignalEnrichmentFacadeTest.java
index 5e53d16e2d..d3b81ce0db 100644
--- a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractSignalEnrichmentFacadeTest.java
+++ b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractSignalEnrichmentFacadeTest.java
@@ -19,6 +19,8 @@
import java.util.UUID;
import java.util.concurrent.CompletionStage;
+import org.apache.pekko.pattern.AskTimeoutException;
+import org.apache.pekko.testkit.javadsl.TestKit;
import org.eclipse.ditto.base.model.entity.metadata.MetadataModelFactory;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.DittoTestSystem;
@@ -34,9 +36,6 @@
import org.eclipse.ditto.things.model.signals.events.ThingDeleted;
import org.junit.Test;
-import org.apache.pekko.pattern.AskTimeoutException;
-import org.apache.pekko.testkit.javadsl.TestKit;
-
/**
* Abstract base test for different {@link SignalEnrichmentFacade} implementations.
*/
@@ -46,7 +45,7 @@ abstract class AbstractSignalEnrichmentFacadeTest {
JsonFieldSelector.newInstance("policyId", "attributes/x", "features/y/properties/z", "_metadata");
protected static final String RESULT_POLICY_ID = "policy:id";
- protected static final AttributeModified THING_EVENT = AttributeModified.of(ThingId.generateRandom(),
+ private static final AttributeModified THING_EVENT = AttributeModified.of(ThingId.generateRandom(),
JsonPointer.of("x"),
JsonValue.of(5),
3L,
@@ -65,11 +64,11 @@ public void success() {
final ThingId thingId = ThingId.of("test:thing-id");
final DittoHeaders headers = DittoHeaders.newBuilder().correlationId(UUID.randomUUID().toString()).build();
final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, THING_EVENT);
// WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
+ assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
// THEN: The result future completes with the entity of the RetrieveThingResponse
@@ -94,6 +93,14 @@ protected JsonObject getExpectedThingJson() {
return getThingResponseThingJson();
}
+ protected JsonFieldSelector getJsonFieldSelector() {
+ return SELECTOR;
+ }
+
+ protected AttributeModified getThingEvent() {
+ return THING_EVENT;
+ }
+
@Test
public void thingNotAccessible() {
DittoTestSystem.run(this, kit -> {
@@ -103,11 +110,11 @@ public void thingNotAccessible() {
final ThingId thingId = ThingId.generateRandom();
final DittoHeaders headers = DittoHeaders.newBuilder().correlationId(UUID.randomUUID().toString()).build();
final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, THING_EVENT);
// WHEN: Command handler receives expected RetrieveThing and responds with ThingNotAccessibleException
final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
+ assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
final ThingNotAccessibleException thingNotAccessibleException =
ThingNotAccessibleException.newBuilder(thingId).dittoHeaders(headers).build();
kit.reply(thingNotAccessibleException);
@@ -127,11 +134,11 @@ public void unexpectedMessage() {
final ThingId thingId = ThingId.generateRandom();
final DittoHeaders headers = DittoHeaders.newBuilder().correlationId(UUID.randomUUID().toString()).build();
final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, THING_EVENT);
// WHEN: Command handler receives expected RetrieveThing and responds with a random object
final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
+ assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
final Object randomObject = new Object();
kit.reply(randomObject);
@@ -152,11 +159,11 @@ public void timeout() {
final ThingId thingId = ThingId.generateRandom();
final DittoHeaders headers = DittoHeaders.newBuilder().correlationId(UUID.randomUUID().toString()).build();
final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, THING_EVENT);
// WHEN: Command handler does not respond
final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
+ assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
// THEN: The result future fails with an AskTimeoutException.
askResult.toCompletableFuture().exceptionally(e -> null).join();
@@ -177,9 +184,9 @@ public void enrichThingDeleted() {
// WHEN: ThingDeleted event is about to be enriched by facade
final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, thingDeleted);
+ underTest.retrievePartialThing(thingId, getJsonFieldSelector(), headers, thingDeleted);
final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
+ assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(getJsonFieldSelector()));
kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
// THEN: The result future completes with the entity of the RetrieveThingResponse
@@ -189,4 +196,5 @@ public void enrichThingDeleted() {
}
protected abstract SignalEnrichmentFacade createSignalEnrichmentFacadeUnderTest(TestKit kit, Duration duration);
+
}
diff --git a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacadeTest.java b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacadeTest.java
index 3cbc626125..8b54abe56e 100644
--- a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacadeTest.java
+++ b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacadeTest.java
@@ -12,62 +12,15 @@
*/
package org.eclipse.ditto.internal.models.signalenrichment;
-import java.time.Duration;
-import java.util.concurrent.CompletionStage;
-
-import org.assertj.core.api.JUnitSoftAssertions;
-import org.eclipse.ditto.base.model.auth.AuthorizationContext;
-import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
-import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
-import org.eclipse.ditto.base.model.entity.metadata.Metadata;
-import org.eclipse.ditto.base.model.entity.metadata.MetadataModelFactory;
-import org.eclipse.ditto.base.model.headers.DittoHeaders;
-import org.eclipse.ditto.base.model.json.FieldType;
-import org.eclipse.ditto.base.model.signals.DittoTestSystem;
+import org.apache.pekko.testkit.javadsl.TestKit;
import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
-import org.eclipse.ditto.internal.utils.cache.config.DefaultCacheConfig;
-import org.eclipse.ditto.json.JsonFactory;
-import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
-import org.eclipse.ditto.json.JsonPointer;
-import org.eclipse.ditto.json.JsonValue;
-import org.eclipse.ditto.things.model.Thing;
-import org.eclipse.ditto.things.model.ThingId;
-import org.eclipse.ditto.things.model.ThingsModelFactory;
-import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
-import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
-import org.eclipse.ditto.things.model.signals.events.AttributeDeleted;
-import org.eclipse.ditto.things.model.signals.events.ThingMerged;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.pekko.actor.ActorSelection;
-import org.apache.pekko.testkit.javadsl.TestKit;
/**
* Unit tests for {@link DittoCachingSignalEnrichmentFacade}.
*/
-public final class DittoCachingSignalEnrichmentFacadeTest extends AbstractSignalEnrichmentFacadeTest {
+public final class DittoCachingSignalEnrichmentFacadeTest extends AbstractCachingSignalEnrichmentFacadeTest {
- private static final String CACHE_CONFIG_KEY = "my-cache";
- private static final String CACHE_CONFIG = CACHE_CONFIG_KEY + """
- {
- maximum-size = 10
- expire-after-create = 2m
- }
- """;
- private static final String ISSUER_PREFIX = "test:";
-
- private static final JsonObject THING_RESPONSE_JSON = JsonObject.of("""
- {
- "_revision": 3,
- "policyId": "policy:id",
- "attributes": {"x": 5},
- "features": {"y": {"properties": {"z": true}}},
- "_metadata": {"attributes": {"x": {"type": "x attribute"}}}
- }""");
private static final JsonObject EXPECTED_THING_JSON = JsonObject.of("""
{
"policyId": "policy:id",
@@ -76,16 +29,9 @@ public final class DittoCachingSignalEnrichmentFacadeTest extends AbstractSignal
"_metadata": {"attributes": {"x": {"type": "x attribute"}}}
}""");
- @Rule
- public final JUnitSoftAssertions softly = new JUnitSoftAssertions();
-
@Override
- protected SignalEnrichmentFacade createSignalEnrichmentFacadeUnderTest(final TestKit kit, final Duration duration) {
- final CacheConfig cacheConfig =
- DefaultCacheConfig.of(ConfigFactory.parseString(CACHE_CONFIG), CACHE_CONFIG_KEY);
- final ActorSelection commandHandler = ActorSelection.apply(kit.getRef(), "");
- final ByRoundTripSignalEnrichmentFacade cacheLoaderFacade =
- ByRoundTripSignalEnrichmentFacade.of(commandHandler, Duration.ofSeconds(10L));
+ protected CachingSignalEnrichmentFacade createCachingSignalEnrichmentFacade(final TestKit kit,
+ final ByRoundTripSignalEnrichmentFacade cacheLoaderFacade, final CacheConfig cacheConfig) {
return DittoCachingSignalEnrichmentFacade.newInstance(
cacheLoaderFacade,
cacheConfig,
@@ -93,418 +39,10 @@ protected SignalEnrichmentFacade createSignalEnrichmentFacadeUnderTest(final Tes
"test");
}
- @Override
- protected JsonObject getThingResponseThingJson() {
- return THING_RESPONSE_JSON;
- }
-
@Override
protected JsonObject getExpectedThingJson() {
return EXPECTED_THING_JSON;
}
- @Override
- protected JsonFieldSelector actualSelectedFields(final JsonFieldSelector selector) {
- return JsonFactory.newFieldSelectorBuilder()
- .addPointers(selector)
- .addFieldDefinition(Thing.JsonFields.REVISION) // additionally always select the revision
- .build();
- }
-
- @Test
- public void alreadyLoadedCacheEntryIsReused() {
- DittoTestSystem.run(this, kit -> {
- // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
- final SignalEnrichmentFacade underTest =
- createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
- final ThingId thingId = ThingId.generateRandom();
- final String userId = ISSUER_PREFIX + "user";
- final DittoHeaders headers = DittoHeaders.newBuilder()
- .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
- AuthorizationSubject.newInstance(userId)))
- .randomCorrelationId()
- .build();
- final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
-
- // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
- final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId);
- softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- // WHEN: response is handled so that it is also added to the cache
- kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
- askResult.toCompletableFuture().join();
- softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
-
- // WHEN: same thing is asked again with same selector for an event with one revision ahead
- final CompletionStage askResultCached =
- underTest.retrievePartialThing(thingId, SELECTOR, headers,
- THING_EVENT.setRevision(THING_EVENT.getRevision() + 1));
-
- // THEN: no cache lookup should be done
- kit.expectNoMessage(Duration.ofSeconds(1));
- askResultCached.toCompletableFuture().join();
- softly.assertThat(askResultCached).isCompletedWithValue(getExpectedThingJson());
- });
- }
-
- @Test
- public void alreadyLoadedCacheEntryIsReusedForMergedEvent() {
- DittoTestSystem.run(this, kit -> {
- // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
- final SignalEnrichmentFacade underTest =
- createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
- final ThingId thingId = ThingId.generateRandom();
- final String userId = ISSUER_PREFIX + "user";
- final DittoHeaders headers = DittoHeaders.newBuilder()
- .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
- AuthorizationSubject.newInstance(userId)))
- .randomCorrelationId()
- .build();
- final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
-
- // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
- final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId);
- softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- // WHEN: response is handled so that it is also added to the cache
- kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
- askResult.toCompletableFuture().join();
- softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
-
- // WHEN: same thing is asked again with same selector for an event with one revision ahead
- final ThingMerged mergeAttributes = ThingMerged.of(thingId, JsonPointer.of("/attributes"),
- JsonObject.newBuilder()
- .set("x", 42)
- .set("foo", "bar")
- .build(),
- THING_EVENT.getRevision() + 1,
- null,
- DittoHeaders.empty(),
- null
- );
- final CompletionStage askResultCached =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, mergeAttributes);
-
- // THEN: no cache lookup should be done
- kit.expectNoMessage(Duration.ofSeconds(1));
- askResultCached.toCompletableFuture().join();
- // AND: the resulting thing JSON includes the with the merge update updated value:
- final JsonObject expectedThingJson = EXPECTED_THING_JSON.toBuilder()
- .set("/attributes/x", 42)
- .build();
- softly.assertThat(askResultCached).isCompletedWithValue(expectedThingJson);
-
- // WHEN: then the attribute "x" is modified with a merge:
- final ThingMerged mergeAttributeX = ThingMerged.of(thingId, JsonPointer.of("/attributes/x"),
- JsonValue.of(1337),
- mergeAttributes.getRevision() + 1,
- null,
- DittoHeaders.empty(),
- null
- );
- final CompletionStage askResultCached2 =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, mergeAttributeX);
-
- // THEN: no cache lookup should be done
- kit.expectNoMessage(Duration.ofSeconds(1));
- askResultCached2.toCompletableFuture().join();
- // AND: the resulting thing JSON includes the with the merge update updated value:
- final JsonObject expectedThingJson2 = EXPECTED_THING_JSON.toBuilder()
- .set("/attributes/x", 1337)
- .build();
- softly.assertThat(askResultCached2).isCompletedWithValue(expectedThingJson2);
- });
- }
-
- @Test
- public void alreadyLoadedCacheEntryIsReusedForMergedEventOnRootLevel() {
- DittoTestSystem.run(this, kit -> {
- // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
- final SignalEnrichmentFacade underTest =
- createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
- final ThingId thingId = ThingId.generateRandom();
- final String userId = ISSUER_PREFIX + "user";
- final DittoHeaders headers = DittoHeaders.newBuilder()
- .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
- AuthorizationSubject.newInstance(userId)))
- .randomCorrelationId()
- .build();
- final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
-
- // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
- final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId);
- softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- // WHEN: response is handled so that it is also added to the cache
- kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
- askResult.toCompletableFuture().join();
- softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
-
- // WHEN: same thing is asked again with same selector for an event with one revision ahead
- final ThingMerged mergeAttributes = ThingMerged.of(thingId, JsonPointer.of("/"),
- JsonObject.newBuilder()
- .set("attributes",
- JsonObject.newBuilder()
- .set("x", 42)
- .set("foo", "bar")
- .build())
- .build(),
- THING_EVENT.getRevision() + 1,
- null,
- DittoHeaders.empty(),
- null
- );
- final CompletionStage askResultCached =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, mergeAttributes);
-
- // THEN: no cache lookup should be done
- kit.expectNoMessage(Duration.ofSeconds(1));
- askResultCached.toCompletableFuture().join();
- });
- }
-
- @Test
- public void alreadyLoadedCacheEntryIsInvalidatedForUnexpectedEventRevision() {
- DittoTestSystem.run(this, kit -> {
- // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
- final SignalEnrichmentFacade underTest =
- createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
- final ThingId thingId = ThingId.generateRandom();
- final DittoHeaders headers = DittoHeaders.newBuilder().randomCorrelationId().build();
- final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
-
- // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
- final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- // WHEN: response is handled so that it is also added to the cache
- kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
- askResult.toCompletableFuture().join();
- softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
-
- // WHEN: same thing is asked again with same selector with event with 2 revisions ahead
- final DittoHeaders headers2 = DittoHeaders.newBuilder().randomCorrelationId().build();
- final CompletionStage askResultCached =
- underTest.retrievePartialThing(thingId, SELECTOR, headers2,
- THING_EVENT.setRevision(THING_EVENT.getRevision() + 2)); // notice +2 here
-
- // THEN: do another cache lookup after invalidation
- final RetrieveThing retrieveThing2 = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing2.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- final Thing thing2 = ThingsModelFactory.newThing(getThingResponseThingJson());
- final Thing thing2WithUpdatedRev = thing2.toBuilder()
- .setRevision(thing2.getRevision().get().increment().increment())
- .build();
- kit.reply(RetrieveThingResponse.of(thingId, thing2WithUpdatedRev.toJson(
- thing2WithUpdatedRev.getImplementedSchemaVersion(), FieldType.all()), headers2));
- askResultCached.toCompletableFuture().join();
- softly.assertThat(askResultCached).isCompletedWithValue(getExpectedThingJson());
- });
- }
-
- @Test
- public void differentAuthSubjectsLeadToCacheRetrievals() {
- DittoTestSystem.run(this, kit -> {
- // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
- final SignalEnrichmentFacade underTest =
- createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
- final ThingId thingId = ThingId.generateRandom();
- final String userId1 = ISSUER_PREFIX + "user1";
- final String userId2 = ISSUER_PREFIX + "user2";
- final DittoHeaders headers = DittoHeaders.newBuilder()
- .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
- AuthorizationSubject.newInstance(userId1)))
- .randomCorrelationId()
- .build();
- final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
-
- // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
- final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId1);
- softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- // WHEN: response is handled so that it is also added to the cache
- kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
- askResult.toCompletableFuture().join();
- softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
-
- // WHEN: same thing is asked again with same selector for an event with one revision ahead but other auth subjects
- final DittoHeaders headers2 = headers.toBuilder()
- .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
- AuthorizationSubject.newInstance(ISSUER_PREFIX + "user2")
- ))
- .build();
- underTest.retrievePartialThing(thingId, SELECTOR, headers2,
- THING_EVENT.setRevision(THING_EVENT.getRevision() + 1));
-
- // THEN: a cache lookup should be done containing the other auth subject header
- final RetrieveThing retrieveThing2 = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing2.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId2);
- softly.assertThat(retrieveThing2.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- });
- }
-
- @Test
- public void differentFieldSelectorsLeadToCacheRetrievals() {
- DittoTestSystem.run(this, kit -> {
- // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
- final SignalEnrichmentFacade underTest =
- createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
- final ThingId thingId = ThingId.generateRandom();
- final String userId = ISSUER_PREFIX + "user1";
- final DittoHeaders headers = DittoHeaders.newBuilder()
- .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
- AuthorizationSubject.newInstance(userId)))
- .randomCorrelationId()
- .build();
- final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
-
- final JsonFieldSelector selector2 = JsonFieldSelector.newInstance("attributes", "features");
-
- // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
- final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId);
- softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- // WHEN: response is handled so that it is also added to the cache
- kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
- askResult.toCompletableFuture().join();
- softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
-
- // WHEN: same thing is asked again with different selector for an event with one revision ahead
- underTest.retrievePartialThing(thingId, selector2, headers,
- THING_EVENT.setRevision(THING_EVENT.getRevision() + 1));
-
- // THEN: a cache lookup should be done using the other selector
- final RetrieveThing retrieveThing2 = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing2.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId);
- softly.assertThat(retrieveThing2.getSelectedFields()).contains(actualSelectedFields(selector2));
- });
- }
-
- @Test
- public void metadataIsUpdatedForMergedEvent() {
- DittoTestSystem.run(this, kit -> {
- // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
- final SignalEnrichmentFacade underTest =
- createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
- final ThingId thingId = ThingId.generateRandom();
- final String userId = ISSUER_PREFIX + "user";
- final DittoHeaders headers = DittoHeaders.newBuilder()
- .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
- AuthorizationSubject.newInstance(userId)))
- .randomCorrelationId()
- .build();
- final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
-
- // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
- final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId);
- softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- // WHEN: response is handled so that it is also added to the cache
- kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
- askResult.toCompletableFuture().join();
-
- softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
-
- // WHEN: same thing is asked again with same selector for an event with one revision ahead
- final ThingMerged mergeAttribute = ThingMerged.of(thingId, JsonPointer.of("/attributes/x"),
- JsonFactory.newValue(6),
- THING_EVENT.getRevision() + 1,
- null,
- DittoHeaders.empty(),
- Metadata.newMetadata(JsonObject.newBuilder()
- .set("type", "x is now y attribute")
- .build()
- )
- );
- final CompletionStage askResultCached =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, mergeAttribute);
-
- // THEN: no cache lookup should be done
- kit.expectNoMessage(Duration.ofSeconds(1));
- askResultCached.toCompletableFuture().join();
- // AND: the resulting thing JSON includes the with the merged metadata updated value:
- final JsonObject expectedThingJson = EXPECTED_THING_JSON.toBuilder()
- .set("attributes", JsonObject.newBuilder()
- .set("x", 6)
- .build())
- .set("_metadata", JsonObject.newBuilder()
- .set("attributes", JsonObject.newBuilder()
- .set("x", JsonObject.newBuilder()
- .set("type", "x is now y attribute")
- .build())
- .build())
- .build())
- .build();
-
- softly.assertThat(askResultCached).isCompletedWithValue(expectedThingJson);
- });
- }
-
- @Test
- public void metadataIsDeletedForDeletedEvent() {
- DittoTestSystem.run(this, kit -> {
- // GIVEN: SignalEnrichmentFacade.retrievePartialThing()
- final SignalEnrichmentFacade underTest =
- createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
- final ThingId thingId = ThingId.generateRandom();
- final String userId = ISSUER_PREFIX + "user";
- final DittoHeaders headers = DittoHeaders.newBuilder()
- .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED,
- AuthorizationSubject.newInstance(userId)))
- .randomCorrelationId()
- .build();
- final CompletionStage askResult =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, THING_EVENT);
-
- // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse
- final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class);
- softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds())
- .contains(userId);
- softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(SELECTOR));
- // WHEN: response is handled so that it is also added to the cache
- kit.reply(RetrieveThingResponse.of(thingId, getThingResponseThingJson(), headers));
- askResult.toCompletableFuture().join();
-
- softly.assertThat(askResult).isCompletedWithValue(getExpectedThingJson());
-
- // WHEN: same thing is asked again with same selector for an event with one revision ahead
- final AttributeDeleted attributeDeleted =
- AttributeDeleted.of(thingId, JsonPointer.of("/x"),
- THING_EVENT.getRevision() + 1,
- null,
- DittoHeaders.empty(),
- MetadataModelFactory.nullMetadata());
-
- final CompletionStage askResultCached =
- underTest.retrievePartialThing(thingId, SELECTOR, headers, attributeDeleted);
-
- // THEN: no cache lookup should be done
- kit.expectNoMessage(Duration.ofSeconds(1));
- askResultCached.toCompletableFuture().join();
- // AND: the resulting thing JSON includes the with the merged metadata updated value:
- final JsonObject expectedThingJson = EXPECTED_THING_JSON.toBuilder()
- .remove("attributes")
- .set("_metadata", JsonObject.newBuilder()
- .set("attributes", JsonObject.newBuilder().build())
- .build())
- .build();
-
- softly.assertThat(askResultCached).isCompletedWithValue(expectedThingJson);
- });
- }
}
diff --git a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/SearchIndexingSignalEnrichmentFacadeTest.java b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/SearchIndexingSignalEnrichmentFacadeTest.java
new file mode 100644
index 0000000000..f91fa362b7
--- /dev/null
+++ b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/SearchIndexingSignalEnrichmentFacadeTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.internal.models.signalenrichment;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.testkit.javadsl.TestKit;
+import org.assertj.core.api.JUnitSoftAssertions;
+import org.eclipse.ditto.base.model.common.LikeHelper;
+import org.eclipse.ditto.base.model.entity.metadata.MetadataModelFactory;
+import org.eclipse.ditto.base.model.headers.DittoHeaders;
+import org.eclipse.ditto.base.model.signals.DittoTestSystem;
+import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
+import org.eclipse.ditto.json.JsonFieldSelector;
+import org.eclipse.ditto.json.JsonObject;
+import org.eclipse.ditto.json.JsonPointer;
+import org.eclipse.ditto.json.JsonValue;
+import org.eclipse.ditto.things.model.ThingId;
+import org.eclipse.ditto.things.model.signals.events.AttributeModified;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link SearchIndexingSignalEnrichmentFacade}.
+ */
+public final class SearchIndexingSignalEnrichmentFacadeTest extends AbstractCachingSignalEnrichmentFacadeTest {
+
+
+ private static final JsonObject EXPECTED_THING_JSON = JsonObject.of("""
+ {
+ "policyId": "policy:id",
+ "attributes": {"x": 5},
+ "_metadata": {"attributes": {"x": {"type": "x attribute"}}}
+ }""");
+
+ private static final JsonFieldSelector SELECTED_INDEXES =
+ JsonFieldSelector.newInstance("policyId", "attributes/x", "_metadata");
+
+ private static final JsonFieldSelector SELECTED_INDEXES_WILDCARD_NS =
+ JsonFieldSelector.newInstance("attributes/wild");
+
+ private static final AttributeModified THING_EVENT = AttributeModified.of(
+ ThingId.generateRandom("org.eclipse.test"),
+ JsonPointer.of("x"),
+ JsonValue.of(5),
+ 3L,
+ Instant.EPOCH,
+ DittoHeaders.empty(),
+ MetadataModelFactory.newMetadataBuilder()
+ .set("type", "x attribute")
+ .build());
+
+ @Rule
+ public final JUnitSoftAssertions softly = new JUnitSoftAssertions();
+
+ @Override
+ protected CachingSignalEnrichmentFacade createCachingSignalEnrichmentFacade(final TestKit kit,
+ final ByRoundTripSignalEnrichmentFacade cacheLoaderFacade, final CacheConfig cacheConfig) {
+ return SearchIndexingSignalEnrichmentFacade.newInstance(
+ List.of(
+ Pair.create(
+ Pattern.compile(
+ Objects.requireNonNull(LikeHelper.convertToRegexSyntax("org.eclipse.test"))),
+ SELECTED_INDEXES
+ ),
+ Pair.create(
+ Pattern.compile(
+ Objects.requireNonNull(LikeHelper.convertToRegexSyntax("org.eclipse*"))),
+ SELECTED_INDEXES_WILDCARD_NS
+ )
+ ),
+ cacheLoaderFacade,
+ cacheConfig,
+ kit.getSystem().getDispatcher(),
+ "test");
+ }
+
+ @Override
+ protected JsonObject getExpectedThingJson() {
+ return EXPECTED_THING_JSON;
+ }
+
+ @Override
+ protected JsonFieldSelector getJsonFieldSelector() {
+ return SELECTED_INDEXES;
+ }
+
+ @Override
+ protected AttributeModified getThingEvent() {
+ return THING_EVENT;
+ }
+
+ @Test
+ public void determineRightSelectorForMultipleNamespacesConfigured() {
+ DittoTestSystem.run(this, kit -> {
+ final SearchIndexingSignalEnrichmentFacade underTest =
+ (SearchIndexingSignalEnrichmentFacade) createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L));
+
+ assertThat(underTest.determineSelector("org.eclipse.test"))
+ .isEqualTo(SELECTED_INDEXES);
+
+ assertThat(underTest.determineSelector("org.eclipse"))
+ .isEqualTo(SELECTED_INDEXES_WILDCARD_NS);
+
+ assertThat(underTest.determineSelector("org.eclipsefoo"))
+ .isEqualTo(SELECTED_INDEXES_WILDCARD_NS);
+
+ assertThat(underTest.determineSelector("org.eclipse.wild"))
+ .isEqualTo(SELECTED_INDEXES_WILDCARD_NS);
+ });
+ }
+
+}
diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultNamespaceSearchIndexConfig.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultNamespaceSearchIndexConfig.java
new file mode 100644
index 0000000000..37163575dd
--- /dev/null
+++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultNamespaceSearchIndexConfig.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.thingsearch.service.common.config;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
+
+import com.typesafe.config.Config;
+
+/**
+ * This class is the default implementation of the NamespaceSearchIndex config.
+ * It is instantiated for each namespace search index entry containing the namespace definition and the list of search indexes.
+ */
+public final class DefaultNamespaceSearchIndexConfig implements NamespaceSearchIndexConfig {
+
+ private final String namespacePattern;
+
+ private final List includedFields;
+
+ private DefaultNamespaceSearchIndexConfig(final ConfigWithFallback configWithFallback) {
+
+ this.namespacePattern =
+ configWithFallback.getString(NamespaceSearchIndexConfigValue.NAMESPACE_PATTERN.getConfigPath());
+
+ final List fields =
+ configWithFallback.getStringList(NamespaceSearchIndexConfigValue.INDEXED_FIELDS.getConfigPath());
+ if (!fields.isEmpty()) {
+ this.includedFields = List.copyOf(fields);
+ } else {
+ this.includedFields = List.of();
+ }
+ }
+
+ /**
+ * Returns an instance of {@code DefaultNamespaceSearchIndexConfig} based on the settings of the specified Config.
+ *
+ * @param config is supposed to provide the config for the issuer at its current level.
+ * @return the instance.
+ */
+ public static DefaultNamespaceSearchIndexConfig of(final Config config) {
+ return new DefaultNamespaceSearchIndexConfig(
+ ConfigWithFallback.newInstance(config, NamespaceSearchIndexConfigValue.values()));
+ }
+
+ @Override
+ public String getNamespacePattern() {
+ return namespacePattern;
+ }
+
+ @Override
+ public List getIndexedFields() {
+ return includedFields;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final DefaultNamespaceSearchIndexConfig that = (DefaultNamespaceSearchIndexConfig) o;
+ return Objects.equals(namespacePattern, that.namespacePattern) &&
+ includedFields.equals(that.includedFields);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(namespacePattern, includedFields);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " [" +
+ "namespacePattern=" + namespacePattern +
+ ", searchIncludeFields=" + includedFields +
+ "]";
+ }
+}
\ No newline at end of file
diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DittoSearchConfig.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DittoSearchConfig.java
index 178540fecc..05408f824d 100644
--- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DittoSearchConfig.java
+++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DittoSearchConfig.java
@@ -12,10 +12,20 @@
*/
package org.eclipse.ditto.thingsearch.service.common.config;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
@@ -40,6 +50,9 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigList;
+import com.typesafe.config.ConfigValue;
+
/**
* This class is the default implementation of {@link SearchConfig}.
@@ -60,6 +73,7 @@ public final class DittoSearchConfig implements SearchConfig, WithConfigPath {
private final MongoDbConfig mongoDbConfig;
private final SearchPersistenceConfig queryPersistenceConfig;
private final Map simpleFieldMappings;
+ private final List namespaceIndexedFields;
private final DefaultOperatorMetricsConfig operatorMetricsConfig;
private DittoSearchConfig(final ScopedConfig dittoScopedConfig) {
@@ -80,6 +94,7 @@ private DittoSearchConfig(final ScopedConfig dittoScopedConfig) {
queryPersistenceConfig = DefaultSearchPersistenceConfig.of(queryConfig);
simpleFieldMappings =
convertToMap(configWithFallback.getConfig(SearchConfigValue.SIMPLE_FIELD_MAPPINGS.getConfigPath()));
+ namespaceIndexedFields = loadNamespaceSearchIndexList(configWithFallback);
operatorMetricsConfig = DefaultOperatorMetricsConfig.of(configWithFallback);
}
@@ -100,6 +115,11 @@ public Optional getMongoHintsByNamespace() {
return Optional.ofNullable(mongoHintsByNamespace);
}
+ @Override
+ public List getNamespaceIndexedFields() {
+ return namespaceIndexedFields;
+ }
+
@Override
public UpdaterConfig getUpdaterConfig() {
return updaterConfig;
@@ -183,14 +203,15 @@ public boolean equals(final Object o) {
Objects.equals(mongoDbConfig, that.mongoDbConfig) &&
Objects.equals(queryPersistenceConfig, that.queryPersistenceConfig) &&
Objects.equals(simpleFieldMappings, that.simpleFieldMappings) &&
- Objects.equals(operatorMetricsConfig, that.operatorMetricsConfig);
+ Objects.equals(operatorMetricsConfig, that.operatorMetricsConfig) &&
+ Objects.equals(namespaceIndexedFields, that.namespaceIndexedFields);
}
@Override
public int hashCode() {
return Objects.hash(mongoHintsByNamespace, updaterConfig, dittoServiceConfig, healthCheckConfig,
indexInitializationConfig, persistenceOperationsConfig, mongoDbConfig, queryPersistenceConfig,
- simpleFieldMappings, operatorMetricsConfig);
+ simpleFieldMappings, operatorMetricsConfig, namespaceIndexedFields);
}
@Override
@@ -205,6 +226,7 @@ public String toString() {
", mongoDbConfig=" + mongoDbConfig +
", queryPersistenceConfig=" + queryPersistenceConfig +
", simpleFieldMappings=" + simpleFieldMappings +
+ ", namespaceIndexedFields=" + namespaceIndexedFields +
", operatorMetricsConfig=" + operatorMetricsConfig +
"]";
}
@@ -222,4 +244,47 @@ private static Map convertToMap(final Config config) {
.filter(entry -> entry.getValue() instanceof String)
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> (String) entry.getValue()));
}
+
+ private static List loadNamespaceSearchIndexList(final ConfigWithFallback config) {
+
+ final ConfigList namespaceIndexedFieldsConfig = config.getList(
+ SearchConfigValue.NAMESPACE_INDEXED_FIELDS.getConfigPath());
+
+ return namespaceIndexedFieldsConfig.stream().collect(NamespaceSearchIndexCollector.toNamespaceSearchIndexList());
+ }
+
+ private static class NamespaceSearchIndexCollector implements
+ Collector, List> {
+
+ private static NamespaceSearchIndexCollector toNamespaceSearchIndexList() {
+ return new NamespaceSearchIndexCollector();
+ }
+
+ @Override
+ public Supplier> supplier() {
+ return ArrayList::new;
+ }
+
+ @Override
+ public BiConsumer, ConfigValue> accumulator() {
+ return (list, entry) ->
+ list.add(DefaultNamespaceSearchIndexConfig.of(ConfigFactory.empty().withFallback(entry)));
+ }
+
+ @Override
+ public BinaryOperator> combiner() {
+ return (left, right) -> Stream.concat(left.stream(), right.stream())
+ .toList();
+ }
+
+ @Override
+ public Function, List> finisher() {
+ return List::copyOf;
+ }
+
+ @Override
+ public Set characteristics() {
+ return Collections.singleton(Characteristics.UNORDERED);
+ }
+ }
}
diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/NamespaceSearchIndexConfig.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/NamespaceSearchIndexConfig.java
new file mode 100644
index 0000000000..f5dc7b037e
--- /dev/null
+++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/NamespaceSearchIndexConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.thingsearch.service.common.config;
+
+import java.util.List;
+
+import javax.annotation.concurrent.Immutable;
+
+import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
+
+/**
+ * Provides configuration settings of the namespace-scoped search indexes.
+ * @since 3.5.0
+ */
+@Immutable
+public interface NamespaceSearchIndexConfig {
+
+ /**
+ * Returns the namespace pattern definition.
+ *
+ * @return the namespace pattern definition
+ */
+ String getNamespacePattern();
+
+ /**
+ * Returns a list of fields that will be explicitly included in the search index.
+ *
+ * @return the indexed fields.
+ */
+ List getIndexedFields();
+
+ enum NamespaceSearchIndexConfigValue implements KnownConfigValue {
+
+ /**
+ * The namespace pattern to apply the search indexed fields.
+ */
+ NAMESPACE_PATTERN("namespace-pattern", ""),
+
+ /**
+ * The list of fields that will be included in the search DB.
+ */
+ INDEXED_FIELDS("indexed-fields", List.of());
+
+ private final String configPath;
+ private final Object defaultValue;
+
+ NamespaceSearchIndexConfigValue(final String configPath, final Object defaultValue) {
+ this.configPath = configPath;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public Object getDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override
+ public String getConfigPath() {
+ return configPath;
+ }
+ }
+}
diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/SearchConfig.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/SearchConfig.java
index b8eaaf0fed..a734915262 100644
--- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/SearchConfig.java
+++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/SearchConfig.java
@@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.thingsearch.service.common.config;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -64,6 +66,14 @@ public interface SearchConfig extends ServiceSpecificConfig, WithHealthCheckConf
*/
OperatorMetricsConfig getOperatorMetricsConfig();
+ /*
+ * Returns a map of fields scoped by namespaces that will be explicitly included in the search index.
+ *
+ * @return the search projection fields.
+ * @since 3.5.0
+ */
+ List getNamespaceIndexedFields();
+
/**
* An enumeration of the known config path expressions and their associated default values for SearchConfig.
*/
@@ -90,7 +100,14 @@ enum SearchConfigValue implements KnownConfigValue {
"definition", "/definition",
"_metadata", "/_metadata"
)
- ));
+ )),
+
+ /**
+ * Any fields to include in the search index, scoped by namespace.
+ *
+ * @since 3.5.0
+ */
+ NAMESPACE_INDEXED_FIELDS("namespace-indexed-fields", Collections.emptyList());
private final String path;
private final Object defaultValue;
diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/SearchIndexingSignalEnrichmentFacadeProvider.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/SearchIndexingSignalEnrichmentFacadeProvider.java
new file mode 100644
index 0000000000..2c22c0d1c7
--- /dev/null
+++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/SearchIndexingSignalEnrichmentFacadeProvider.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.thingsearch.service.persistence.write.streaming;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.regex.Pattern;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.japi.Pair;
+import org.eclipse.ditto.base.model.common.LikeHelper;
+import org.eclipse.ditto.internal.models.signalenrichment.CachingSignalEnrichmentFacade;
+import org.eclipse.ditto.internal.models.signalenrichment.SearchIndexingSignalEnrichmentFacade;
+import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
+import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
+import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
+import org.eclipse.ditto.json.JsonFactory;
+import org.eclipse.ditto.json.JsonFieldDefinition;
+import org.eclipse.ditto.json.JsonFieldSelector;
+import org.eclipse.ditto.json.JsonParseOptions;
+import org.eclipse.ditto.json.JsonPointer;
+import org.eclipse.ditto.things.model.Thing;
+import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
+import org.eclipse.ditto.thingsearch.service.common.config.NamespaceSearchIndexConfig;
+import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;
+
+import com.typesafe.config.Config;
+
+/**
+ * Default {@link SearchIndexingSignalEnrichmentFacadeProvider} who provides a {@link org.eclipse.ditto.internal.models.signalenrichment.SearchIndexingSignalEnrichmentFacade}.
+ */
+public final class SearchIndexingSignalEnrichmentFacadeProvider implements CachingSignalEnrichmentFacadeProvider {
+
+ private static final List> REQUIRED_INDEXED_FIELDS = List.of(
+ Thing.JsonFields.ID,
+ Thing.JsonFields.POLICY_ID,
+ Thing.JsonFields.NAMESPACE,
+ Thing.JsonFields.REVISION);
+
+ /**
+ * Instantiate this provider. Called by reflection.
+ *
+ * @param actorSystem the actor system in which to load the extension.
+ * @param config the configuration for this extension.
+ */
+ @SuppressWarnings("unused")
+ public SearchIndexingSignalEnrichmentFacadeProvider(final ActorSystem actorSystem, final Config config) {
+ // No-Op but required for extension initialisation
+ }
+
+ @Override
+ public CachingSignalEnrichmentFacade getSignalEnrichmentFacade(
+ final ActorSystem actorSystem,
+ final SignalEnrichmentFacade cacheLoaderFacade,
+ final CacheConfig cacheConfig,
+ final Executor cacheLoaderExecutor,
+ final String cacheNamePrefix) {
+
+ final SearchConfig searchConfig =
+ DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config()));
+
+ // Build a map of field selectors for the enrichment facade to use to quickly look up by Thing namespace.
+ final List> namespaceAndFieldSelector = new ArrayList<>();
+
+ for (final NamespaceSearchIndexConfig namespaceConfig : searchConfig.getNamespaceIndexedFields()) {
+
+ if (!namespaceConfig.getIndexedFields().isEmpty()) {
+
+ // Ensure the constructed JsonFieldSelector has the required fields needed for the search to work.
+ final Set set = new LinkedHashSet<>();
+ set.addAll(REQUIRED_INDEXED_FIELDS.stream()
+ .map(JsonFieldDefinition::getPointer)
+ .map(JsonPointer::toString)
+ .toList());
+ set.addAll(namespaceConfig.getIndexedFields());
+
+ final List searchIncludeFields = new ArrayList<>(set);
+
+ final JsonFieldSelector indexedFields =
+ JsonFactory.newFieldSelector(searchIncludeFields, JsonParseOptions.newBuilder().build());
+
+ // Build a Pattern from the namespace value.
+ final Pattern namespacePattern = Pattern.compile(
+ Objects.requireNonNull(LikeHelper.convertToRegexSyntax(namespaceConfig.getNamespacePattern())));
+
+ namespaceAndFieldSelector.add(Pair.create(namespacePattern, indexedFields));
+ }
+ }
+
+ return SearchIndexingSignalEnrichmentFacade.newInstance(
+ namespaceAndFieldSelector,
+ cacheLoaderFacade,
+ cacheConfig,
+ cacheLoaderExecutor,
+ cacheNamePrefix);
+ }
+}
diff --git a/thingsearch/service/src/main/resources/search.conf b/thingsearch/service/src/main/resources/search.conf
index d413059d32..945f66f6f0 100755
--- a/thingsearch/service/src/main/resources/search.conf
+++ b/thingsearch/service/src/main/resources/search.conf
@@ -9,6 +9,9 @@ ditto {
search-update-observer = org.eclipse.ditto.thingsearch.service.updater.actors.DefaultSearchUpdateObserver
query-criteria-validator = org.eclipse.ditto.thingsearch.service.persistence.query.validation.DefaultQueryCriteriaValidator
caching-signal-enrichment-facade-provider = org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DittoCachingSignalEnrichmentFacadeProvider
+ # The below is an alternate implementation of the caching-signal-enrichment-facade-provider that enables
+ # only indexing selected JSON paths by namespace.
+ # caching-signal-enrichment-facade-provider = org.eclipse.ditto.thingsearch.service.persistence.write.streaming.SearchIndexingSignalEnrichmentFacadeProvider
search-update-mapper = org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper
}
@@ -32,7 +35,7 @@ ditto {
}
breaker {
- # defines ater how many failures the circuit breaker should open
+ # defines after how many failures the circuit breaker should open
maxFailures = 5
maxFailures = ${?BREAKER_MAXFAILURES}
timeout {
@@ -64,6 +67,16 @@ ditto {
enabled = ${?INDEX_INITIALIZATION_ENABLED}
}
+ # This configuration is used in conjunction with the new caching-signal-enrichment-facade-provider implementation
+ # to only index a selected array of JSON pointers scoped by namespace.
+ namespace-indexed-fields = [
+ # Example: For the namespace-pattern "org.eclipse", only the "attributes" and "features/info" will be indexed in the
+ # search database.
+ # {
+ # namespace-pattern = "org.eclipse",
+ # indexed-fields = [ "attributes", "features/info" ]
+ # }
+ ]
query {
persistence {
diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultNamespaceSearchIndexConfigTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultNamespaceSearchIndexConfigTest.java
new file mode 100644
index 0000000000..d844d58e7e
--- /dev/null
+++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultNamespaceSearchIndexConfigTest.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+
+package org.eclipse.ditto.thingsearch.service.common.config;
+
+import static org.mutabilitydetector.unittesting.AllowedReason.assumingFields;
+import static org.mutabilitydetector.unittesting.AllowedReason.provided;
+import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
+import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;
+
+import java.util.List;
+
+import org.assertj.core.api.JUnitSoftAssertions;
+import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+public final class DefaultNamespaceSearchIndexConfigTest {
+
+ @Rule
+ public final JUnitSoftAssertions softly = new JUnitSoftAssertions();
+
+ @Test
+ public void assertImmutability() {
+ assertInstancesOf(DefaultNamespaceSearchIndexConfig.class, areImmutable(),
+ provided(String.class).isAlsoImmutable(),
+ assumingFields(
+ "includedFields").areSafelyCopiedUnmodifiableCollectionsWithImmutableElements()
+ );
+ }
+
+ @Test
+ public void testHashCodeAndEquals() {
+ EqualsVerifier.forClass(DefaultNamespaceSearchIndexConfig.class)
+ .usingGetClass()
+ .verify();
+ }
+
+ @Test
+ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
+
+ final Config searchConfig = ConfigFactory.load("search-test");
+ final DittoSearchConfig underTest = DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(searchConfig));
+
+
+ softly.assertThat(underTest.getNamespaceIndexedFields()).isNotNull();
+
+ softly.assertThat(underTest.getNamespaceIndexedFields()).isEmpty();
+ }
+
+ @Test
+ public void underTestReturnsValuesOfConfigFile() {
+
+ final Config searchConfig = ConfigFactory.load("namespace-search-index-test");
+ final DittoSearchConfig underTest = DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(searchConfig));
+
+ softly.assertThat(underTest.getNamespaceIndexedFields()).isNotNull();
+
+ softly.assertThat(underTest.getNamespaceIndexedFields()).isNotEmpty();
+
+ softly.assertThat(underTest.getNamespaceIndexedFields().size()).isEqualTo(2);
+
+ NamespaceSearchIndexConfig first = underTest.getNamespaceIndexedFields().get(0);
+ NamespaceSearchIndexConfig second = underTest.getNamespaceIndexedFields().get(1);
+
+ // First config
+ softly.assertThat(first.getNamespacePattern()).isEqualTo("org.eclipse.test");
+
+ softly.assertThat(first.getIndexedFields())
+ .as(NamespaceSearchIndexConfig.NamespaceSearchIndexConfigValue.INDEXED_FIELDS.getConfigPath())
+ .isEqualTo(
+ List.of("attributes", "features/info/properties", "features/info/other"));
+
+ // Second config
+ softly.assertThat(second.getNamespacePattern()).isEqualTo("org.eclipse*");
+
+ softly.assertThat(second.getIndexedFields())
+ .as(NamespaceSearchIndexConfig.NamespaceSearchIndexConfigValue.INDEXED_FIELDS.getConfigPath())
+ .isEqualTo(
+ List.of("attributes", "features/info"));
+ }
+}
diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/starter/config/DittoSearchConfigTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/starter/config/DittoSearchConfigTest.java
index b054893eb9..2b55a526e2 100644
--- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/starter/config/DittoSearchConfigTest.java
+++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/starter/config/DittoSearchConfigTest.java
@@ -47,7 +47,8 @@ public void assertImmutability() {
DefaultMongoDbConfig.class, DefaultSearchPersistenceConfig.class,
DefaultOperatorMetricsConfig.class)
.areAlsoImmutable(),
- assumingFields("simpleFieldMappings").areSafelyCopiedUnmodifiableCollectionsWithImmutableElements());
+ assumingFields("simpleFieldMappings", "namespaceIndexedFields")
+ .areSafelyCopiedUnmodifiableCollectionsWithImmutableElements());
}
@Test
@@ -61,6 +62,7 @@ public void testHashCodeAndEquals() {
public void testQueryPersistenceConfig() {
final var config = ConfigFactory.load("search-test.conf");
final var underTest = DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(config));
+
final var queryPersistenceConfig = underTest.getQueryPersistenceConfig();
assertThat(queryPersistenceConfig.readConcern()).isEqualTo(ReadConcern.LINEARIZABLE);
assertThat(queryPersistenceConfig.readPreference()).isEqualTo(ReadPreference.NEAREST);
diff --git a/thingsearch/service/src/test/resources/namespace-search-index-test.conf b/thingsearch/service/src/test/resources/namespace-search-index-test.conf
new file mode 100644
index 0000000000..6af53203ef
--- /dev/null
+++ b/thingsearch/service/src/test/resources/namespace-search-index-test.conf
@@ -0,0 +1,30 @@
+ditto {
+ mongodb {
+ uri = "mongodb://localhost:27017/test"
+ pool {
+ max-size = 100
+ max-wait-time = 30s
+ max-wait-queue-size = 500000
+ }
+ }
+ search {
+ namespace-indexed-fields = [
+ # The list of thing paths that are included in the search index.
+ {
+ namespace-pattern = "org.eclipse.test"
+ indexed-fields = [
+ "attributes",
+ "features/info/properties",
+ "features/info/other"
+ ]
+ },
+ {
+ namespace-pattern = "org.eclipse*"
+ indexed-fields = [
+ "attributes",
+ "features/info"
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file