diff --git a/build.gradle.kts b/build.gradle.kts index a7c13079..831a64f0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { id("org.hypertrace.ci-utils-plugin") version "0.3.0" id("org.hypertrace.publish-plugin") version "1.0.2" apply false id("org.hypertrace.code-style-plugin") version "1.1.0" apply false - id("org.owasp.dependencycheck") version "8.2.1" + id("org.owasp.dependencycheck") version "8.4.3" } subprojects { diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/ArrayFiltersQueryIntegrationTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/ArrayFiltersQueryIntegrationTest.java index 3fe6c73d..d0903b17 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/ArrayFiltersQueryIntegrationTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/ArrayFiltersQueryIntegrationTest.java @@ -97,7 +97,7 @@ private static void initializeAndConnectToPostgres() { private static void initializeAndConnectToMongo() { mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("mongo:7.0.14")) .withExposedPorts(27017) .waitingFor(Wait.forListeningPort()); mongo.start(); diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java index edddd397..b96287a8 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java @@ -132,7 +132,7 @@ public class DocStoreQueryV1Test { public static void init() throws IOException { datastoreMap = Maps.newHashMap(); mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("mongo:7.0.14")) .withExposedPorts(27017) .waitingFor(Wait.forListeningPort()); mongo.start(); diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java index f72d567e..e685c3b3 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java @@ -71,7 +71,7 @@ public class DocStoreTest { public static void init() { datastoreMap = Maps.newHashMap(); mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("mongo:7.0.14")) .withExposedPorts(27017) .waitingFor(Wait.forListeningPort()); mongo.start(); diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java index 5578c08d..8c60844a 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java @@ -75,7 +75,7 @@ public class MongoDocStoreTest { @BeforeAll public static void init() { mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("mongo:7.0.14")) .withExposedPorts(27017) .waitingFor(Wait.forListeningPort()); mongo.start(); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index d52eab6f..3e5a52e1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -7,6 +7,7 @@ import java.util.Set; import org.hypertrace.core.documentstore.expression.impl.KeyExpression; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; @@ -127,6 +128,16 @@ public interface Collection { */ CloseableIterator aggregate(final org.hypertrace.core.documentstore.query.Query query); + /** + * Query the documents conforming to the query specification. + * + * @param query The query specification + * @param queryOptions The query options + * @return {@link CloseableIterator} of matching documents + */ + CloseableIterator query( + final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions); + /** * Delete the document with the given key. * diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java index 5c5212ec..2ee4f942 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java @@ -11,6 +11,7 @@ import org.hypertrace.core.documentstore.model.config.TypesafeConfigDatastoreConfigExtractor; import org.hypertrace.core.documentstore.model.config.mongo.MongoConnectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.hypertrace.core.documentstore.model.options.DataFreshness; @Deprecated(forRemoval = true) interface TypesafeDatastoreConfigAdapter { @@ -24,7 +25,14 @@ class MongoTypesafeDatastoreConfigAdapter implements TypesafeDatastoreConfigAdap public DatastoreConfig convert(final Config config) { final MongoConnectionConfig overridingConnectionConfig = new MongoConnectionConfig( - emptyList(), null, null, "", null, null, AggregatePipelineMode.DEFAULT_ALWAYS) { + emptyList(), + null, + null, + "", + null, + null, + AggregatePipelineMode.DEFAULT_ALWAYS, + DataFreshness.SYSTEM_DEFAULT) { public MongoClientSettings toSettings() { final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder() diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java index 0249e7ea..213d4317 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java @@ -18,6 +18,7 @@ import lombok.experimental.NonFinal; import org.hypertrace.core.documentstore.model.config.mongo.MongoConnectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.hypertrace.core.documentstore.model.options.DataFreshness; @Value @NonFinal @@ -30,12 +31,18 @@ public class ConnectionConfig { @NonNull String database; @Nullable ConnectionCredentials credentials; @NonNull AggregatePipelineMode aggregationPipelineMode; + @NonNull DataFreshness dataFreshness; public ConnectionConfig( @NonNull List<@NonNull Endpoint> endpoints, @NonNull String database, @Nullable ConnectionCredentials credentials) { - this(endpoints, database, credentials, AggregatePipelineMode.DEFAULT_ALWAYS); + this( + endpoints, + database, + credentials, + AggregatePipelineMode.DEFAULT_ALWAYS, + DataFreshness.SYSTEM_DEFAULT); } public static ConnectionConfigBuilder builder() { @@ -55,6 +62,7 @@ public static class ConnectionConfigBuilder { String replicaSet; ConnectionPoolConfig connectionPoolConfig; AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS; + DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT; public ConnectionConfigBuilder type(final DatabaseType type) { this.type = type; @@ -82,7 +90,8 @@ public ConnectionConfig build() { applicationName, replicaSet, connectionPoolConfig, - aggregationPipelineMode); + aggregationPipelineMode, + dataFreshness); case POSTGRES: return new PostgresConnectionConfig( diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java index a58eb64f..8c6ee5de 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java @@ -27,6 +27,7 @@ import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig; import org.hypertrace.core.documentstore.model.config.DatabaseType; import org.hypertrace.core.documentstore.model.config.Endpoint; +import org.hypertrace.core.documentstore.model.options.DataFreshness; @Value @NonFinal @@ -49,12 +50,14 @@ public MongoConnectionConfig( @NonNull final String applicationName, @Nullable final String replicaSetName, @Nullable final ConnectionPoolConfig connectionPoolConfig, - AggregatePipelineMode aggregationPipelineMode) { + @NonNull final AggregatePipelineMode aggregationPipelineMode, + @NonNull final DataFreshness dataFreshness) { super( ensureAtLeastOneEndpoint(endpoints), getDatabaseOrDefault(database), getCredentialsOrDefault(credentials, database), - aggregationPipelineMode); + aggregationPipelineMode, + dataFreshness); this.applicationName = applicationName; this.replicaSetName = replicaSetName; this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java new file mode 100644 index 00000000..d13aee76 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java @@ -0,0 +1,9 @@ +package org.hypertrace.core.documentstore.model.options; + +@SuppressWarnings("UnnecessarySemicolon") +public enum DataFreshness { + SYSTEM_DEFAULT, + REALTIME_FRESHNESS, + NEAR_REALTIME_FRESHNESS, + ; +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java new file mode 100644 index 00000000..85e70cc2 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java @@ -0,0 +1,15 @@ +package org.hypertrace.core.documentstore.model.options; + +import lombok.Builder; +import lombok.Builder.Default; +import lombok.Value; +import lombok.experimental.Accessors; + +@Value +@Builder +@Accessors(fluent = true, chain = true) +public class QueryOptions { + public static final QueryOptions DEFAULT_QUERY_OPTIONS = QueryOptions.builder().build(); + + @Default DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT; +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java index 25b56139..9f70e996 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java @@ -61,6 +61,7 @@ import org.hypertrace.core.documentstore.Query; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; import org.hypertrace.core.documentstore.mongo.query.MongoQueryExecutor; import org.hypertrace.core.documentstore.mongo.update.MongoUpdateExecutor; @@ -550,6 +551,12 @@ public CloseableIterator aggregate( return convertToDocumentIterator(queryExecutor.aggregate(query)); } + @Override + public CloseableIterator query( + final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) { + return convertToDocumentIterator(queryExecutor.aggregate(query, queryOptions)); + } + @Override public Optional update( final org.hypertrace.core.documentstore.query.Query query, diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoCollectionOptionsApplier.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoCollectionOptionsApplier.java new file mode 100644 index 00000000..85e93891 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoCollectionOptionsApplier.java @@ -0,0 +1,47 @@ +package org.hypertrace.core.documentstore.mongo.collection; + +import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT; +import static org.hypertrace.core.documentstore.mongo.collection.MongoReadPreferenceConverter.convert; + +import com.mongodb.BasicDBObject; +import com.mongodb.ReadPreference; +import com.mongodb.client.MongoCollection; +import java.util.HashMap; +import java.util.Map; +import lombok.Builder; +import lombok.Value; +import lombok.experimental.Accessors; +import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.options.QueryOptions; + +public class MongoCollectionOptionsApplier { + private final Map> collectionCache; + + public MongoCollectionOptionsApplier() { + this.collectionCache = new HashMap<>(); + } + + public MongoCollection applyOptions( + final ConnectionConfig connectionConfig, + final QueryOptions queryOptions, + final MongoCollection collection) { + final CacheKey cacheKey = + CacheKey.builder().readPreference(readPreference(connectionConfig, queryOptions)).build(); + return collectionCache.computeIfAbsent( + cacheKey, key -> collection.withReadPreference(key.readPreference())); + } + + private ReadPreference readPreference( + final ConnectionConfig connectionConfig, final QueryOptions queryOptions) { + return SYSTEM_DEFAULT.equals(queryOptions.dataFreshness()) + ? convert(connectionConfig.dataFreshness()) + : convert(queryOptions.dataFreshness()); + } + + @Value + @Builder + @Accessors(fluent = true, chain = true) + private static class CacheKey { + ReadPreference readPreference; + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java new file mode 100644 index 00000000..b7168c98 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java @@ -0,0 +1,32 @@ +package org.hypertrace.core.documentstore.mongo.collection; + +import static com.mongodb.ReadPreference.primary; +import static com.mongodb.ReadPreference.secondaryPreferred; +import static java.util.Map.entry; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.REALTIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT; + +import com.mongodb.ReadPreference; +import java.util.Map; +import java.util.Optional; +import org.hypertrace.core.documentstore.model.options.DataFreshness; + +public class MongoReadPreferenceConverter { + private static final Map DATA_FRESHNESS_TO_READ_PREFERENCE = + Map.ofEntries( + entry(SYSTEM_DEFAULT, primary()), + entry(REALTIME_FRESHNESS, primary()), + entry(NEAR_REALTIME_FRESHNESS, secondaryPreferred())); + + public static ReadPreference convert(final DataFreshness dataFreshness) { + if (dataFreshness == null) { + return primary(); + } + + return Optional.ofNullable(DATA_FRESHNESS_TO_READ_PREFERENCE.get(dataFreshness)) + .orElseThrow( + () -> + new UnsupportedOperationException("Unsupported data freshness: " + dataFreshness)); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java index 578039fb..7fdaffe7 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java @@ -5,6 +5,8 @@ import static java.util.function.Function.identity; import static java.util.function.Predicate.not; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toUnmodifiableList; +import static org.hypertrace.core.documentstore.model.options.QueryOptions.DEFAULT_QUERY_OPTIONS; import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.COUNT_ALIAS; import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.getCountClause; import static org.hypertrace.core.documentstore.mongo.query.MongoPaginationHelper.applyPagination; @@ -38,6 +40,8 @@ import org.bson.conversions.Bson; import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.options.QueryOptions; +import org.hypertrace.core.documentstore.mongo.collection.MongoCollectionOptionsApplier; import org.hypertrace.core.documentstore.mongo.query.parser.AliasParser; import org.hypertrace.core.documentstore.mongo.query.parser.MongoFromTypeExpressionParser; import org.hypertrace.core.documentstore.mongo.query.parser.MongoGroupTypeExpressionParser; @@ -52,7 +56,6 @@ @Slf4j @AllArgsConstructor public class MongoQueryExecutor { - private static final List>> DEFAULT_AGGREGATE_PIPELINE_FUNCTIONS = List.of( @@ -116,6 +119,7 @@ public ServerAddress getServerAddress() { } }; + private final MongoCollectionOptionsApplier optionsApplier = new MongoCollectionOptionsApplier(); private final com.mongodb.client.MongoCollection collection; private final ConnectionConfig connectionConfig; @@ -138,6 +142,11 @@ public MongoCursor find(final Query query) { } public MongoCursor aggregate(final Query originalQuery) { + return aggregate(originalQuery, DEFAULT_QUERY_OPTIONS); + } + + public MongoCursor aggregate( + final Query originalQuery, final QueryOptions queryOptions) { if (originalQuery.getPagination().map(Pagination::getLimit).map(ZERO::equals).orElse(false)) { log.debug("Not executing query because of a 0 limit"); return EMPTY_CURSOR; @@ -152,13 +161,16 @@ public MongoCursor aggregate(final Query originalQuery) { aggregatePipeline.stream() .flatMap(function -> function.apply(query).stream()) .filter(not(BasicDBObject::isEmpty)) - .collect(toList()); + .collect(toUnmodifiableList()); - logPipeline(pipeline); + logPipeline(pipeline, queryOptions); try { + final com.mongodb.client.MongoCollection collectionWithOptions = + optionsApplier.applyOptions(connectionConfig, queryOptions, collection); + final AggregateIterable iterable = - collection.aggregate(pipeline).allowDiskUse(true); + collectionWithOptions.aggregate(pipeline).allowDiskUse(true); return iterable.cursor(); } catch (final MongoCommandException e) { log.error("Execution failed for query: {}. Aggregation Pipeline: {}", query, pipeline); @@ -177,7 +189,7 @@ public long count(final Query originalQuery) { .filter(not(BasicDBObject::isEmpty)) .collect(toList()); - logPipeline(pipeline); + logPipeline(pipeline, DEFAULT_QUERY_OPTIONS); final AggregateIterable iterable = collection.aggregate(pipeline); try (final MongoCursor cursor = iterable.cursor()) { @@ -206,11 +218,12 @@ private void logClauses( pagination); } - private void logPipeline(final List pipeline) { + private void logPipeline(final List pipeline, final QueryOptions queryOptions) { log.debug( - "MongoDB aggregation():\n Collection: {}\n Pipeline: {}", + "MongoDB aggregation():\n Collection: {}\n Pipeline: {}\n QueryOptions: {}", collection.getNamespace(), - pipeline); + pipeline, + queryOptions); } private Query transformAndLog(Query query) { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index 1628ad35..790787cc 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -70,6 +70,7 @@ import org.hypertrace.core.documentstore.commons.UpdateValidator; import org.hypertrace.core.documentstore.expression.impl.KeyExpression; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; @@ -494,6 +495,12 @@ public CloseableIterator aggregate( return queryExecutor.execute(client.getConnection(), query); } + @Override + public CloseableIterator query( + final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) { + return aggregate(query); + } + @Override public Optional update( final org.hypertrace.core.documentstore.query.Query query, diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/MongoConnectionConfigTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/MongoConnectionConfigTest.java index cbdbcaf0..8e85f40f 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/MongoConnectionConfigTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/MongoConnectionConfigTest.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.hypertrace.core.documentstore.model.config.mongo.MongoConnectionConfig; +import org.hypertrace.core.documentstore.model.options.DataFreshness; import org.junit.jupiter.api.Test; class MongoConnectionConfigTest { @@ -167,4 +168,16 @@ void testDefaultCredentials_shouldNotSetCredentials() { final MongoClientSettings actual = mongoConnectionConfig.toSettings(); assertEquals(expected, actual); } + + @Test + void testBuildWithDataFreshnessOption() { + final MongoConnectionConfig mongoConnectionConfig = + (MongoConnectionConfig) + ConnectionConfig.builder() + .type(DatabaseType.MONGO) + .dataFreshness(DataFreshness.NEAR_REALTIME_FRESHNESS) + .build(); + + assertEquals(DataFreshness.NEAR_REALTIME_FRESHNESS, mongoConnectionConfig.dataFreshness()); + } } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoCollectionTest.java index 1f59b9b7..75fd6981 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoCollectionTest.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.BasicDBObject; import com.mongodb.MongoNamespace; +import com.mongodb.ReadPreference; import com.mongodb.client.AggregateIterable; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCursor; @@ -85,6 +86,7 @@ public void setup() { MongoNamespace namespace = new MongoNamespace("Mongo.test_collection"); when(collection.getNamespace()).thenReturn(namespace); + when(collection.withReadPreference(any(ReadPreference.class))).thenReturn(collection); } @Test diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java index 40489a11..76d62ea7 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java @@ -20,11 +20,13 @@ import static org.hypertrace.core.documentstore.expression.operators.RelationalOperator.NOT_IN; import static org.hypertrace.core.documentstore.expression.operators.SortOrder.ASC; import static org.hypertrace.core.documentstore.expression.operators.SortOrder.DESC; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.REALTIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -33,6 +35,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.BasicDBObject; +import com.mongodb.ReadPreference; import com.mongodb.client.AggregateIterable; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCursor; @@ -52,6 +55,7 @@ import org.hypertrace.core.documentstore.expression.operators.SortOrder; import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.mongo.query.MongoQueryExecutor; import org.hypertrace.core.documentstore.query.Filter; import org.hypertrace.core.documentstore.query.Pagination; @@ -61,6 +65,7 @@ import org.hypertrace.core.documentstore.query.SortingSpec; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -81,6 +86,8 @@ class MongoQueryExecutorTest { @Mock private MongoCursor cursor; + @Mock private ConnectionConfig connectionConfig; + private MongoQueryExecutor executor; private static final VerificationMode NOT_INVOKED = times(0); @@ -88,12 +95,12 @@ class MongoQueryExecutorTest { @BeforeEach void setUp() { - ConnectionConfig connectionConfig = mock(ConnectionConfig.class); when(connectionConfig.aggregationPipelineMode()) .thenReturn(AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE); executor = new MongoQueryExecutor(collection, connectionConfig); when(collection.find(any(BasicDBObject.class))).thenReturn(iterable); + when(collection.withReadPreference(any(ReadPreference.class))).thenReturn(collection); when(collection.aggregate(anyList())).thenReturn(aggIterable); when(iterable.projection(any(BasicDBObject.class))).thenReturn(iterable); @@ -601,12 +608,121 @@ public void testOptimizeSorts_sortSpecInSelection_selectionWithAlias() query, "mongo/pipeline/optimize_sorts_sort_in_selection_selection_with_alias.json"); } + @SuppressWarnings("resource") + @Nested + class MongoQueryOptionsTest { + @Test + void testDefault() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + testQueryOptions(ReadPreference.primary()); + } + + @Test + void testConnectionLevelReadPreference() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(NEAR_REALTIME_FRESHNESS); + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + testQueryOptions(ReadPreference.secondaryPreferred()); + } + + @Test + void testConnectionLevelReadPreferenceWithNullValue() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(null); + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + testQueryOptions(ReadPreference.primary()); + } + + @Test + void testConnectionLevelReadPreferenceWithSystemDefaultValue() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(SYSTEM_DEFAULT); + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + testQueryOptions(ReadPreference.primary()); + } + + @Test + void testQueryLevelReadPreferenceOverride() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(NEAR_REALTIME_FRESHNESS); + executor.aggregate(query, QueryOptions.builder().dataFreshness(REALTIME_FRESHNESS).build()); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.builder().dataFreshness(REALTIME_FRESHNESS).build()); + + testQueryOptions(ReadPreference.primary()); + } + + @Test + void testQueryLevelReadPreference() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(SYSTEM_DEFAULT); + executor.aggregate( + query, QueryOptions.builder().dataFreshness(NEAR_REALTIME_FRESHNESS).build()); + + // Fire query twice to test the cache behaviour + executor.aggregate( + query, QueryOptions.builder().dataFreshness(NEAR_REALTIME_FRESHNESS).build()); + + testQueryOptions(ReadPreference.secondaryPreferred()); + } + + private void testQueryOptions(final ReadPreference readPreference) { + verify(collection, times(2)).getNamespace(); + verify(collection, times(1)).withReadPreference(readPreference); + verify(collection, times(2)).aggregate(anyList()); + verify(aggIterable, times(2)).allowDiskUse(true); + verify(aggIterable, times(2)).cursor(); + } + } + private void testAggregation(Query query, final String filePath) throws IOException, URISyntaxException { List pipeline = readExpectedPipeline(filePath); executor.aggregate(query); verify(collection).getNamespace(); + verify(collection).withReadPreference(ReadPreference.primary()); verify(collection).aggregate(pipeline); verify(aggIterable).allowDiskUse(true); verify(aggIterable).cursor();