From 8077db876e89d89aee4e976172e90660185e7ba8 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Fri, 13 Sep 2024 15:54:59 +0530 Subject: [PATCH 01/10] Introduced data freshness with the MongoDB implementation at connection-level and at query-level --- .../core/documentstore/Collection.java | 11 ++++++++ .../TypesafeDatastoreConfigAdapter.java | 10 ++++++- .../model/config/ConnectionConfig.java | 13 +++++++-- .../config/mongo/MongoConnectionConfig.java | 14 +++++++--- .../model/options/DataFreshness.java | 7 +++++ .../model/options/QueryOptions.java | 15 +++++++++++ .../documentstore/mongo/MongoCollection.java | 8 ++++++ .../mongo/query/MongoQueryExecutor.java | 27 ++++++++++++++----- .../query/MongoReadPreferenceMapper.java | 24 +++++++++++++++++ .../postgres/PostgresCollection.java | 7 +++++ .../mongo/MongoCollectionTest.java | 2 ++ .../mongo/MongoQueryExecutorTest.java | 3 +++ 12 files changed, 128 insertions(+), 13 deletions(-) create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index d52eab6f..3e5a52e1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -7,6 +7,7 @@ import java.util.Set; import org.hypertrace.core.documentstore.expression.impl.KeyExpression; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; @@ -127,6 +128,16 @@ public interface Collection { */ CloseableIterator aggregate(final org.hypertrace.core.documentstore.query.Query query); + /** + * Query the documents conforming to the query specification. + * + * @param query The query specification + * @param queryOptions The query options + * @return {@link CloseableIterator} of matching documents + */ + CloseableIterator query( + final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions); + /** * Delete the document with the given key. * diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java index 5c5212ec..3cfd7873 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java @@ -11,6 +11,7 @@ import org.hypertrace.core.documentstore.model.config.TypesafeConfigDatastoreConfigExtractor; import org.hypertrace.core.documentstore.model.config.mongo.MongoConnectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.hypertrace.core.documentstore.model.options.DataFreshness; @Deprecated(forRemoval = true) interface TypesafeDatastoreConfigAdapter { @@ -24,7 +25,14 @@ class MongoTypesafeDatastoreConfigAdapter implements TypesafeDatastoreConfigAdap public DatastoreConfig convert(final Config config) { final MongoConnectionConfig overridingConnectionConfig = new MongoConnectionConfig( - emptyList(), null, null, "", null, null, AggregatePipelineMode.DEFAULT_ALWAYS) { + emptyList(), + null, + null, + "", + null, + null, + AggregatePipelineMode.DEFAULT_ALWAYS, + DataFreshness.REAL_TIME_FRESHNESS) { public MongoClientSettings toSettings() { final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder() diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java index 0249e7ea..c06a773d 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java @@ -18,6 +18,7 @@ import lombok.experimental.NonFinal; import org.hypertrace.core.documentstore.model.config.mongo.MongoConnectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.hypertrace.core.documentstore.model.options.DataFreshness; @Value @NonFinal @@ -30,12 +31,18 @@ public class ConnectionConfig { @NonNull String database; @Nullable ConnectionCredentials credentials; @NonNull AggregatePipelineMode aggregationPipelineMode; + @NonNull DataFreshness dataFreshness; public ConnectionConfig( @NonNull List<@NonNull Endpoint> endpoints, @NonNull String database, @Nullable ConnectionCredentials credentials) { - this(endpoints, database, credentials, AggregatePipelineMode.DEFAULT_ALWAYS); + this( + endpoints, + database, + credentials, + AggregatePipelineMode.DEFAULT_ALWAYS, + DataFreshness.REAL_TIME_FRESHNESS); } public static ConnectionConfigBuilder builder() { @@ -55,6 +62,7 @@ public static class ConnectionConfigBuilder { String replicaSet; ConnectionPoolConfig connectionPoolConfig; AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS; + DataFreshness dataFreshness = DataFreshness.REAL_TIME_FRESHNESS; public ConnectionConfigBuilder type(final DatabaseType type) { this.type = type; @@ -82,7 +90,8 @@ public ConnectionConfig build() { applicationName, replicaSet, connectionPoolConfig, - aggregationPipelineMode); + aggregationPipelineMode, + dataFreshness); case POSTGRES: return new PostgresConnectionConfig( diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java index a58eb64f..eeaf5603 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java @@ -27,6 +27,8 @@ import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig; import org.hypertrace.core.documentstore.model.config.DatabaseType; import org.hypertrace.core.documentstore.model.config.Endpoint; +import org.hypertrace.core.documentstore.model.options.DataFreshness; +import org.hypertrace.core.documentstore.mongo.query.MongoReadPreferenceMapper; @Value @NonFinal @@ -34,6 +36,9 @@ @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) public class MongoConnectionConfig extends ConnectionConfig { + private static final MongoReadPreferenceMapper mongoReadPreferenceMapper = + new MongoReadPreferenceMapper(); + @NonNull String applicationName; @Nullable String replicaSetName; @NonNull ConnectionPoolConfig connectionPoolConfig; @@ -49,12 +54,14 @@ public MongoConnectionConfig( @NonNull final String applicationName, @Nullable final String replicaSetName, @Nullable final ConnectionPoolConfig connectionPoolConfig, - AggregatePipelineMode aggregationPipelineMode) { + @NonNull final AggregatePipelineMode aggregationPipelineMode, + @NonNull final DataFreshness dataFreshness) { super( ensureAtLeastOneEndpoint(endpoints), getDatabaseOrDefault(database), getCredentialsOrDefault(credentials, database), - aggregationPipelineMode); + aggregationPipelineMode, + dataFreshness); this.applicationName = applicationName; this.replicaSetName = replicaSetName; this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig); @@ -65,7 +72,8 @@ public MongoClientSettings toSettings() { MongoClientSettings.builder() .applicationName(applicationName()) .retryWrites(true) - .retryReads(true); + .retryReads(true) + .readPreference(mongoReadPreferenceMapper.readPreferenceFor(dataFreshness())); applyClusterSettings(settingsBuilder); applyConnectionPoolSettings(settingsBuilder); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java new file mode 100644 index 00000000..f70614ec --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java @@ -0,0 +1,7 @@ +package org.hypertrace.core.documentstore.model.options; + +public enum DataFreshness { + REAL_TIME_FRESHNESS, + NEAR_REAL_TIME_FRESHNESS, + ; +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java new file mode 100644 index 00000000..e661d183 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java @@ -0,0 +1,15 @@ +package org.hypertrace.core.documentstore.model.options; + +import lombok.Builder; +import lombok.Builder.Default; +import lombok.Value; +import lombok.experimental.Accessors; + +@Value +@Builder +@Accessors(fluent = true, chain = true) +public class QueryOptions { + public static final QueryOptions DEFAULT_QUERY_OPTIONS = QueryOptions.builder().build(); + + @Default DataFreshness dataFreshness = DataFreshness.REAL_TIME_FRESHNESS; +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java index 25b56139..1a95f1e2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java @@ -61,6 +61,7 @@ import org.hypertrace.core.documentstore.Query; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; import org.hypertrace.core.documentstore.mongo.query.MongoQueryExecutor; import org.hypertrace.core.documentstore.mongo.update.MongoUpdateExecutor; @@ -550,6 +551,13 @@ public CloseableIterator aggregate( return convertToDocumentIterator(queryExecutor.aggregate(query)); } + @Override + public CloseableIterator query( + final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'query'"); + } + @Override public Optional update( final org.hypertrace.core.documentstore.query.Query query, diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java index 578039fb..4afc7e73 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java @@ -5,6 +5,7 @@ import static java.util.function.Function.identity; import static java.util.function.Predicate.not; import static java.util.stream.Collectors.toList; +import static org.hypertrace.core.documentstore.model.options.QueryOptions.DEFAULT_QUERY_OPTIONS; import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.COUNT_ALIAS; import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.getCountClause; import static org.hypertrace.core.documentstore.mongo.query.MongoPaginationHelper.applyPagination; @@ -20,6 +21,7 @@ import com.mongodb.BasicDBObject; import com.mongodb.MongoCommandException; +import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; import com.mongodb.client.AggregateIterable; @@ -38,6 +40,7 @@ import org.bson.conversions.Bson; import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.mongo.query.parser.AliasParser; import org.hypertrace.core.documentstore.mongo.query.parser.MongoFromTypeExpressionParser; import org.hypertrace.core.documentstore.mongo.query.parser.MongoGroupTypeExpressionParser; @@ -52,7 +55,6 @@ @Slf4j @AllArgsConstructor public class MongoQueryExecutor { - private static final List>> DEFAULT_AGGREGATE_PIPELINE_FUNCTIONS = List.of( @@ -116,6 +118,9 @@ public ServerAddress getServerAddress() { } }; + private final MongoReadPreferenceMapper mongoReadPreferenceMapper = + new MongoReadPreferenceMapper(); + private final com.mongodb.client.MongoCollection collection; private final ConnectionConfig connectionConfig; @@ -138,6 +143,11 @@ public MongoCursor find(final Query query) { } public MongoCursor aggregate(final Query originalQuery) { + return aggregate(originalQuery, DEFAULT_QUERY_OPTIONS); + } + + public MongoCursor aggregate( + final Query originalQuery, final QueryOptions queryOptions) { if (originalQuery.getPagination().map(Pagination::getLimit).map(ZERO::equals).orElse(false)) { log.debug("Not executing query because of a 0 limit"); return EMPTY_CURSOR; @@ -154,11 +164,13 @@ public MongoCursor aggregate(final Query originalQuery) { .filter(not(BasicDBObject::isEmpty)) .collect(toList()); - logPipeline(pipeline); + logPipeline(pipeline, queryOptions); try { + final ReadPreference readPreference = + mongoReadPreferenceMapper.readPreferenceFor(queryOptions.dataFreshness()); final AggregateIterable iterable = - collection.aggregate(pipeline).allowDiskUse(true); + collection.withReadPreference(readPreference).aggregate(pipeline).allowDiskUse(true); return iterable.cursor(); } catch (final MongoCommandException e) { log.error("Execution failed for query: {}. Aggregation Pipeline: {}", query, pipeline); @@ -177,7 +189,7 @@ public long count(final Query originalQuery) { .filter(not(BasicDBObject::isEmpty)) .collect(toList()); - logPipeline(pipeline); + logPipeline(pipeline, DEFAULT_QUERY_OPTIONS); final AggregateIterable iterable = collection.aggregate(pipeline); try (final MongoCursor cursor = iterable.cursor()) { @@ -206,11 +218,12 @@ private void logClauses( pagination); } - private void logPipeline(final List pipeline) { + private void logPipeline(final List pipeline, final QueryOptions queryOptions) { log.debug( - "MongoDB aggregation():\n Collection: {}\n Pipeline: {}", + "MongoDB aggregation():\n Collection: {}\n Pipeline: {}\n QueryOptions: {}", collection.getNamespace(), - pipeline); + pipeline, + queryOptions); } private Query transformAndLog(Query query) { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java new file mode 100644 index 00000000..6c11d8d7 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java @@ -0,0 +1,24 @@ +package org.hypertrace.core.documentstore.mongo.query; + +import static java.util.Map.entry; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REAL_TIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.REAL_TIME_FRESHNESS; + +import com.mongodb.ReadPreference; +import java.util.Map; +import java.util.Optional; +import org.hypertrace.core.documentstore.model.options.DataFreshness; + +public class MongoReadPreferenceMapper { + private static final Map DATA_FRESHNESS_TO_READ_PREFERENCE = + Map.ofEntries( + entry(REAL_TIME_FRESHNESS, ReadPreference.primary()), + entry(NEAR_REAL_TIME_FRESHNESS, ReadPreference.secondary())); + + public ReadPreference readPreferenceFor(final DataFreshness dataFreshness) { + return Optional.ofNullable(DATA_FRESHNESS_TO_READ_PREFERENCE.get(dataFreshness)) + .orElseThrow( + () -> + new UnsupportedOperationException("Unsupported data freshness: " + dataFreshness)); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index 1628ad35..790787cc 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -70,6 +70,7 @@ import org.hypertrace.core.documentstore.commons.UpdateValidator; import org.hypertrace.core.documentstore.expression.impl.KeyExpression; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; @@ -494,6 +495,12 @@ public CloseableIterator aggregate( return queryExecutor.execute(client.getConnection(), query); } + @Override + public CloseableIterator query( + final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) { + return aggregate(query); + } + @Override public Optional update( final org.hypertrace.core.documentstore.query.Query query, diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoCollectionTest.java index 1f59b9b7..75fd6981 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoCollectionTest.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.BasicDBObject; import com.mongodb.MongoNamespace; +import com.mongodb.ReadPreference; import com.mongodb.client.AggregateIterable; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCursor; @@ -85,6 +86,7 @@ public void setup() { MongoNamespace namespace = new MongoNamespace("Mongo.test_collection"); when(collection.getNamespace()).thenReturn(namespace); + when(collection.withReadPreference(any(ReadPreference.class))).thenReturn(collection); } @Test diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java index 40489a11..afdbd6c5 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java @@ -33,6 +33,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.BasicDBObject; +import com.mongodb.ReadPreference; import com.mongodb.client.AggregateIterable; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCursor; @@ -94,6 +95,7 @@ void setUp() { executor = new MongoQueryExecutor(collection, connectionConfig); when(collection.find(any(BasicDBObject.class))).thenReturn(iterable); + when(collection.withReadPreference(any(ReadPreference.class))).thenReturn(collection); when(collection.aggregate(anyList())).thenReturn(aggIterable); when(iterable.projection(any(BasicDBObject.class))).thenReturn(iterable); @@ -607,6 +609,7 @@ private void testAggregation(Query query, final String filePath) executor.aggregate(query); verify(collection).getNamespace(); + verify(collection).withReadPreference(ReadPreference.primary()); verify(collection).aggregate(pipeline); verify(aggIterable).allowDiskUse(true); verify(aggIterable).cursor(); From b3ab9a0c32f23a8dc3b76efe87d8a226a81ef346 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Fri, 13 Sep 2024 16:00:18 +0530 Subject: [PATCH 02/10] Minor fixes --- .../core/documentstore/model/options/DataFreshness.java | 1 + .../mongo/query/MongoReadPreferenceMapper.java | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java index f70614ec..03c16d3e 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java @@ -1,5 +1,6 @@ package org.hypertrace.core.documentstore.model.options; +@SuppressWarnings("UnnecessarySemicolon") public enum DataFreshness { REAL_TIME_FRESHNESS, NEAR_REAL_TIME_FRESHNESS, diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java index 6c11d8d7..8f40783a 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java @@ -1,5 +1,7 @@ package org.hypertrace.core.documentstore.mongo.query; +import static com.mongodb.ReadPreference.primary; +import static com.mongodb.ReadPreference.secondaryPreferred; import static java.util.Map.entry; import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REAL_TIME_FRESHNESS; import static org.hypertrace.core.documentstore.model.options.DataFreshness.REAL_TIME_FRESHNESS; @@ -12,8 +14,8 @@ public class MongoReadPreferenceMapper { private static final Map DATA_FRESHNESS_TO_READ_PREFERENCE = Map.ofEntries( - entry(REAL_TIME_FRESHNESS, ReadPreference.primary()), - entry(NEAR_REAL_TIME_FRESHNESS, ReadPreference.secondary())); + entry(REAL_TIME_FRESHNESS, primary()), + entry(NEAR_REAL_TIME_FRESHNESS, secondaryPreferred())); public ReadPreference readPreferenceFor(final DataFreshness dataFreshness) { return Optional.ofNullable(DATA_FRESHNESS_TO_READ_PREFERENCE.get(dataFreshness)) From 8d1037f8781affa7a49fd85b1a307acd4079787a Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Fri, 13 Sep 2024 18:33:52 +0530 Subject: [PATCH 03/10] Made proper fixes with unit tests --- .../TypesafeDatastoreConfigAdapter.java | 2 +- .../model/config/ConnectionConfig.java | 4 +- .../config/mongo/MongoConnectionConfig.java | 7 +- .../model/options/DataFreshness.java | 1 + .../model/options/QueryOptions.java | 2 +- .../MongoCollectionOptionsApplier.java | 47 +++++++ .../MongoReadPreferenceConverter.java} | 12 +- .../mongo/query/MongoQueryExecutor.java | 16 +-- .../mongo/MongoQueryExecutorTest.java | 117 +++++++++++++++++- 9 files changed, 185 insertions(+), 23 deletions(-) create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoCollectionOptionsApplier.java rename document-store/src/main/java/org/hypertrace/core/documentstore/mongo/{query/MongoReadPreferenceMapper.java => collection/MongoReadPreferenceConverter.java} (71%) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java index 3cfd7873..2ee4f942 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java @@ -32,7 +32,7 @@ public DatastoreConfig convert(final Config config) { null, null, AggregatePipelineMode.DEFAULT_ALWAYS, - DataFreshness.REAL_TIME_FRESHNESS) { + DataFreshness.SYSTEM_DEFAULT) { public MongoClientSettings toSettings() { final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder() diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java index c06a773d..213d4317 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java @@ -42,7 +42,7 @@ public ConnectionConfig( database, credentials, AggregatePipelineMode.DEFAULT_ALWAYS, - DataFreshness.REAL_TIME_FRESHNESS); + DataFreshness.SYSTEM_DEFAULT); } public static ConnectionConfigBuilder builder() { @@ -62,7 +62,7 @@ public static class ConnectionConfigBuilder { String replicaSet; ConnectionPoolConfig connectionPoolConfig; AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS; - DataFreshness dataFreshness = DataFreshness.REAL_TIME_FRESHNESS; + DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT; public ConnectionConfigBuilder type(final DatabaseType type) { this.type = type; diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java index eeaf5603..8c6ee5de 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java @@ -28,7 +28,6 @@ import org.hypertrace.core.documentstore.model.config.DatabaseType; import org.hypertrace.core.documentstore.model.config.Endpoint; import org.hypertrace.core.documentstore.model.options.DataFreshness; -import org.hypertrace.core.documentstore.mongo.query.MongoReadPreferenceMapper; @Value @NonFinal @@ -36,9 +35,6 @@ @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) public class MongoConnectionConfig extends ConnectionConfig { - private static final MongoReadPreferenceMapper mongoReadPreferenceMapper = - new MongoReadPreferenceMapper(); - @NonNull String applicationName; @Nullable String replicaSetName; @NonNull ConnectionPoolConfig connectionPoolConfig; @@ -72,8 +68,7 @@ public MongoClientSettings toSettings() { MongoClientSettings.builder() .applicationName(applicationName()) .retryWrites(true) - .retryReads(true) - .readPreference(mongoReadPreferenceMapper.readPreferenceFor(dataFreshness())); + .retryReads(true); applyClusterSettings(settingsBuilder); applyConnectionPoolSettings(settingsBuilder); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java index 03c16d3e..c07cf789 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java @@ -2,6 +2,7 @@ @SuppressWarnings("UnnecessarySemicolon") public enum DataFreshness { + SYSTEM_DEFAULT, REAL_TIME_FRESHNESS, NEAR_REAL_TIME_FRESHNESS, ; diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java index e661d183..85e70cc2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/QueryOptions.java @@ -11,5 +11,5 @@ public class QueryOptions { public static final QueryOptions DEFAULT_QUERY_OPTIONS = QueryOptions.builder().build(); - @Default DataFreshness dataFreshness = DataFreshness.REAL_TIME_FRESHNESS; + @Default DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT; } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoCollectionOptionsApplier.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoCollectionOptionsApplier.java new file mode 100644 index 00000000..85e93891 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoCollectionOptionsApplier.java @@ -0,0 +1,47 @@ +package org.hypertrace.core.documentstore.mongo.collection; + +import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT; +import static org.hypertrace.core.documentstore.mongo.collection.MongoReadPreferenceConverter.convert; + +import com.mongodb.BasicDBObject; +import com.mongodb.ReadPreference; +import com.mongodb.client.MongoCollection; +import java.util.HashMap; +import java.util.Map; +import lombok.Builder; +import lombok.Value; +import lombok.experimental.Accessors; +import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.options.QueryOptions; + +public class MongoCollectionOptionsApplier { + private final Map> collectionCache; + + public MongoCollectionOptionsApplier() { + this.collectionCache = new HashMap<>(); + } + + public MongoCollection applyOptions( + final ConnectionConfig connectionConfig, + final QueryOptions queryOptions, + final MongoCollection collection) { + final CacheKey cacheKey = + CacheKey.builder().readPreference(readPreference(connectionConfig, queryOptions)).build(); + return collectionCache.computeIfAbsent( + cacheKey, key -> collection.withReadPreference(key.readPreference())); + } + + private ReadPreference readPreference( + final ConnectionConfig connectionConfig, final QueryOptions queryOptions) { + return SYSTEM_DEFAULT.equals(queryOptions.dataFreshness()) + ? convert(connectionConfig.dataFreshness()) + : convert(queryOptions.dataFreshness()); + } + + @Value + @Builder + @Accessors(fluent = true, chain = true) + private static class CacheKey { + ReadPreference readPreference; + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java similarity index 71% rename from document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java rename to document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java index 8f40783a..26405e55 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoReadPreferenceMapper.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java @@ -1,23 +1,29 @@ -package org.hypertrace.core.documentstore.mongo.query; +package org.hypertrace.core.documentstore.mongo.collection; import static com.mongodb.ReadPreference.primary; import static com.mongodb.ReadPreference.secondaryPreferred; import static java.util.Map.entry; import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REAL_TIME_FRESHNESS; import static org.hypertrace.core.documentstore.model.options.DataFreshness.REAL_TIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT; import com.mongodb.ReadPreference; import java.util.Map; import java.util.Optional; import org.hypertrace.core.documentstore.model.options.DataFreshness; -public class MongoReadPreferenceMapper { +public class MongoReadPreferenceConverter { private static final Map DATA_FRESHNESS_TO_READ_PREFERENCE = Map.ofEntries( + entry(SYSTEM_DEFAULT, primary()), entry(REAL_TIME_FRESHNESS, primary()), entry(NEAR_REAL_TIME_FRESHNESS, secondaryPreferred())); - public ReadPreference readPreferenceFor(final DataFreshness dataFreshness) { + public static ReadPreference convert(final DataFreshness dataFreshness) { + if (dataFreshness == null) { + return primary(); + } + return Optional.ofNullable(DATA_FRESHNESS_TO_READ_PREFERENCE.get(dataFreshness)) .orElseThrow( () -> diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java index 4afc7e73..7fdaffe7 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/query/MongoQueryExecutor.java @@ -5,6 +5,7 @@ import static java.util.function.Function.identity; import static java.util.function.Predicate.not; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toUnmodifiableList; import static org.hypertrace.core.documentstore.model.options.QueryOptions.DEFAULT_QUERY_OPTIONS; import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.COUNT_ALIAS; import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.getCountClause; @@ -21,7 +22,6 @@ import com.mongodb.BasicDBObject; import com.mongodb.MongoCommandException; -import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; import com.mongodb.client.AggregateIterable; @@ -41,6 +41,7 @@ import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; import org.hypertrace.core.documentstore.model.options.QueryOptions; +import org.hypertrace.core.documentstore.mongo.collection.MongoCollectionOptionsApplier; import org.hypertrace.core.documentstore.mongo.query.parser.AliasParser; import org.hypertrace.core.documentstore.mongo.query.parser.MongoFromTypeExpressionParser; import org.hypertrace.core.documentstore.mongo.query.parser.MongoGroupTypeExpressionParser; @@ -118,9 +119,7 @@ public ServerAddress getServerAddress() { } }; - private final MongoReadPreferenceMapper mongoReadPreferenceMapper = - new MongoReadPreferenceMapper(); - + private final MongoCollectionOptionsApplier optionsApplier = new MongoCollectionOptionsApplier(); private final com.mongodb.client.MongoCollection collection; private final ConnectionConfig connectionConfig; @@ -162,15 +161,16 @@ public MongoCursor aggregate( aggregatePipeline.stream() .flatMap(function -> function.apply(query).stream()) .filter(not(BasicDBObject::isEmpty)) - .collect(toList()); + .collect(toUnmodifiableList()); logPipeline(pipeline, queryOptions); try { - final ReadPreference readPreference = - mongoReadPreferenceMapper.readPreferenceFor(queryOptions.dataFreshness()); + final com.mongodb.client.MongoCollection collectionWithOptions = + optionsApplier.applyOptions(connectionConfig, queryOptions, collection); + final AggregateIterable iterable = - collection.withReadPreference(readPreference).aggregate(pipeline).allowDiskUse(true); + collectionWithOptions.aggregate(pipeline).allowDiskUse(true); return iterable.cursor(); } catch (final MongoCommandException e) { log.error("Execution failed for query: {}. Aggregation Pipeline: {}", query, pipeline); diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java index afdbd6c5..e6dfbcf1 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java @@ -20,11 +20,13 @@ import static org.hypertrace.core.documentstore.expression.operators.RelationalOperator.NOT_IN; import static org.hypertrace.core.documentstore.expression.operators.SortOrder.ASC; import static org.hypertrace.core.documentstore.expression.operators.SortOrder.DESC; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REAL_TIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.REAL_TIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -53,6 +55,7 @@ import org.hypertrace.core.documentstore.expression.operators.SortOrder; import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.mongo.query.MongoQueryExecutor; import org.hypertrace.core.documentstore.query.Filter; import org.hypertrace.core.documentstore.query.Pagination; @@ -62,6 +65,7 @@ import org.hypertrace.core.documentstore.query.SortingSpec; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -82,6 +86,8 @@ class MongoQueryExecutorTest { @Mock private MongoCursor cursor; + @Mock private ConnectionConfig connectionConfig; + private MongoQueryExecutor executor; private static final VerificationMode NOT_INVOKED = times(0); @@ -89,7 +95,6 @@ class MongoQueryExecutorTest { @BeforeEach void setUp() { - ConnectionConfig connectionConfig = mock(ConnectionConfig.class); when(connectionConfig.aggregationPipelineMode()) .thenReturn(AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE); executor = new MongoQueryExecutor(collection, connectionConfig); @@ -603,6 +608,114 @@ public void testOptimizeSorts_sortSpecInSelection_selectionWithAlias() query, "mongo/pipeline/optimize_sorts_sort_in_selection_selection_with_alias.json"); } + @SuppressWarnings("resource") + @Nested + class MongoQueryOptionsTest { + @Test + void testDefault() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + testQueryOptions(ReadPreference.primary()); + } + + @Test + void testConnectionLevelReadPreference() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(NEAR_REAL_TIME_FRESHNESS); + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + testQueryOptions(ReadPreference.secondaryPreferred()); + } + + @Test + void testConnectionLevelReadPreferenceWithNullValue() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(null); + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + testQueryOptions(ReadPreference.primary()); + } + + @Test + void testConnectionLevelReadPreferenceWithSystemDefaultValue() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(SYSTEM_DEFAULT); + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); + + testQueryOptions(ReadPreference.primary()); + } + + @Test + void testQueryLevelReadPreferenceOverride() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(NEAR_REAL_TIME_FRESHNESS); + executor.aggregate(query, QueryOptions.builder().dataFreshness(REAL_TIME_FRESHNESS).build()); + + // Fire query twice to test the cache behaviour + executor.aggregate(query, QueryOptions.builder().dataFreshness(REAL_TIME_FRESHNESS).build()); + + testQueryOptions(ReadPreference.primary()); + } + + @Test + void testQueryLevelReadPreference() { + final Query query = + Query.builder() + .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") + .build(); + + when(connectionConfig.dataFreshness()).thenReturn(SYSTEM_DEFAULT); + executor.aggregate( + query, QueryOptions.builder().dataFreshness(NEAR_REAL_TIME_FRESHNESS).build()); + + // Fire query twice to test the cache behaviour + executor.aggregate( + query, QueryOptions.builder().dataFreshness(NEAR_REAL_TIME_FRESHNESS).build()); + + testQueryOptions(ReadPreference.secondaryPreferred()); + } + + private void testQueryOptions(final ReadPreference readPreference) { + verify(collection, times(2)).getNamespace(); + verify(collection, times(1)).withReadPreference(readPreference); + verify(collection, times(2)).aggregate(anyList()); + verify(aggIterable, times(2)).allowDiskUse(true); + verify(aggIterable, times(2)).cursor(); + } + } + private void testAggregation(Query query, final String filePath) throws IOException, URISyntaxException { List pipeline = readExpectedPipeline(filePath); From a97c11a864bd68a4937e62ad9033e94ac3480cad Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Fri, 13 Sep 2024 18:36:01 +0530 Subject: [PATCH 04/10] Renaming --- .../model/options/DataFreshness.java | 4 ++-- .../collection/MongoReadPreferenceConverter.java | 8 ++++---- .../mongo/MongoQueryExecutorTest.java | 16 ++++++++-------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java index c07cf789..d13aee76 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/DataFreshness.java @@ -3,7 +3,7 @@ @SuppressWarnings("UnnecessarySemicolon") public enum DataFreshness { SYSTEM_DEFAULT, - REAL_TIME_FRESHNESS, - NEAR_REAL_TIME_FRESHNESS, + REALTIME_FRESHNESS, + NEAR_REALTIME_FRESHNESS, ; } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java index 26405e55..b7168c98 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java @@ -3,8 +3,8 @@ import static com.mongodb.ReadPreference.primary; import static com.mongodb.ReadPreference.secondaryPreferred; import static java.util.Map.entry; -import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REAL_TIME_FRESHNESS; -import static org.hypertrace.core.documentstore.model.options.DataFreshness.REAL_TIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.REALTIME_FRESHNESS; import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT; import com.mongodb.ReadPreference; @@ -16,8 +16,8 @@ public class MongoReadPreferenceConverter { private static final Map DATA_FRESHNESS_TO_READ_PREFERENCE = Map.ofEntries( entry(SYSTEM_DEFAULT, primary()), - entry(REAL_TIME_FRESHNESS, primary()), - entry(NEAR_REAL_TIME_FRESHNESS, secondaryPreferred())); + entry(REALTIME_FRESHNESS, primary()), + entry(NEAR_REALTIME_FRESHNESS, secondaryPreferred())); public static ReadPreference convert(final DataFreshness dataFreshness) { if (dataFreshness == null) { diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java index e6dfbcf1..76d62ea7 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorTest.java @@ -20,8 +20,8 @@ import static org.hypertrace.core.documentstore.expression.operators.RelationalOperator.NOT_IN; import static org.hypertrace.core.documentstore.expression.operators.SortOrder.ASC; import static org.hypertrace.core.documentstore.expression.operators.SortOrder.DESC; -import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REAL_TIME_FRESHNESS; -import static org.hypertrace.core.documentstore.model.options.DataFreshness.REAL_TIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.REALTIME_FRESHNESS; import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -632,7 +632,7 @@ void testConnectionLevelReadPreference() { .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") .build(); - when(connectionConfig.dataFreshness()).thenReturn(NEAR_REAL_TIME_FRESHNESS); + when(connectionConfig.dataFreshness()).thenReturn(NEAR_REALTIME_FRESHNESS); executor.aggregate(query, QueryOptions.DEFAULT_QUERY_OPTIONS); // Fire query twice to test the cache behaviour @@ -680,11 +680,11 @@ void testQueryLevelReadPreferenceOverride() { .addSelection(AggregateExpression.of(COUNT, ConstantExpression.of(1)), "total") .build(); - when(connectionConfig.dataFreshness()).thenReturn(NEAR_REAL_TIME_FRESHNESS); - executor.aggregate(query, QueryOptions.builder().dataFreshness(REAL_TIME_FRESHNESS).build()); + when(connectionConfig.dataFreshness()).thenReturn(NEAR_REALTIME_FRESHNESS); + executor.aggregate(query, QueryOptions.builder().dataFreshness(REALTIME_FRESHNESS).build()); // Fire query twice to test the cache behaviour - executor.aggregate(query, QueryOptions.builder().dataFreshness(REAL_TIME_FRESHNESS).build()); + executor.aggregate(query, QueryOptions.builder().dataFreshness(REALTIME_FRESHNESS).build()); testQueryOptions(ReadPreference.primary()); } @@ -698,11 +698,11 @@ void testQueryLevelReadPreference() { when(connectionConfig.dataFreshness()).thenReturn(SYSTEM_DEFAULT); executor.aggregate( - query, QueryOptions.builder().dataFreshness(NEAR_REAL_TIME_FRESHNESS).build()); + query, QueryOptions.builder().dataFreshness(NEAR_REALTIME_FRESHNESS).build()); // Fire query twice to test the cache behaviour executor.aggregate( - query, QueryOptions.builder().dataFreshness(NEAR_REAL_TIME_FRESHNESS).build()); + query, QueryOptions.builder().dataFreshness(NEAR_REALTIME_FRESHNESS).build()); testQueryOptions(ReadPreference.secondaryPreferred()); } From 21b878364a9e535e049ee8142eeb13b58a6c62d8 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Fri, 13 Sep 2024 18:58:02 +0530 Subject: [PATCH 05/10] Add a missing implementation link --- .../hypertrace/core/documentstore/mongo/MongoCollection.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java index 1a95f1e2..9f70e996 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java @@ -554,8 +554,7 @@ public CloseableIterator aggregate( @Override public CloseableIterator query( final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'query'"); + return convertToDocumentIterator(queryExecutor.aggregate(query, queryOptions)); } @Override From 78503c2d2a1d1de9882647c4feb1899694dc7a07 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Mon, 16 Sep 2024 13:10:12 +0530 Subject: [PATCH 06/10] Test for builder and getter --- .../model/config/MongoConnectionConfigTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/MongoConnectionConfigTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/MongoConnectionConfigTest.java index cbdbcaf0..8e85f40f 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/MongoConnectionConfigTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/MongoConnectionConfigTest.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.hypertrace.core.documentstore.model.config.mongo.MongoConnectionConfig; +import org.hypertrace.core.documentstore.model.options.DataFreshness; import org.junit.jupiter.api.Test; class MongoConnectionConfigTest { @@ -167,4 +168,16 @@ void testDefaultCredentials_shouldNotSetCredentials() { final MongoClientSettings actual = mongoConnectionConfig.toSettings(); assertEquals(expected, actual); } + + @Test + void testBuildWithDataFreshnessOption() { + final MongoConnectionConfig mongoConnectionConfig = + (MongoConnectionConfig) + ConnectionConfig.builder() + .type(DatabaseType.MONGO) + .dataFreshness(DataFreshness.NEAR_REALTIME_FRESHNESS) + .build(); + + assertEquals(DataFreshness.NEAR_REALTIME_FRESHNESS, mongoConnectionConfig.dataFreshness()); + } } From 38d3b6c61f142afb78a6ec6fd6154530c09a929e Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Tue, 17 Sep 2024 12:20:50 +0530 Subject: [PATCH 07/10] Upgrade MongoDB version in integration tests --- .../org/hypertrace/core/documentstore/DocStoreQueryV1Test.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java index edddd397..b96287a8 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java @@ -132,7 +132,7 @@ public class DocStoreQueryV1Test { public static void init() throws IOException { datastoreMap = Maps.newHashMap(); mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("mongo:7.0.14")) .withExposedPorts(27017) .waitingFor(Wait.forListeningPort()); mongo.start(); From a16f4d58d509b41d1084015244d0d5ecf0d3cf98 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Tue, 17 Sep 2024 12:21:47 +0530 Subject: [PATCH 08/10] Update MongoDB version in integration tests --- .../core/documentstore/ArrayFiltersQueryIntegrationTest.java | 2 +- .../java/org/hypertrace/core/documentstore/DocStoreTest.java | 2 +- .../hypertrace/core/documentstore/mongo/MongoDocStoreTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/ArrayFiltersQueryIntegrationTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/ArrayFiltersQueryIntegrationTest.java index 3fe6c73d..d0903b17 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/ArrayFiltersQueryIntegrationTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/ArrayFiltersQueryIntegrationTest.java @@ -97,7 +97,7 @@ private static void initializeAndConnectToPostgres() { private static void initializeAndConnectToMongo() { mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("mongo:7.0.14")) .withExposedPorts(27017) .waitingFor(Wait.forListeningPort()); mongo.start(); diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java index f72d567e..e685c3b3 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java @@ -71,7 +71,7 @@ public class DocStoreTest { public static void init() { datastoreMap = Maps.newHashMap(); mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("mongo:7.0.14")) .withExposedPorts(27017) .waitingFor(Wait.forListeningPort()); mongo.start(); diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java index 5578c08d..8c60844a 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java @@ -75,7 +75,7 @@ public class MongoDocStoreTest { @BeforeAll public static void init() { mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("mongo:7.0.14")) .withExposedPorts(27017) .waitingFor(Wait.forListeningPort()); mongo.start(); From 4e0b29c52e598e7fc8274ad33b7710a93cfadc3e Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Tue, 17 Sep 2024 13:08:54 +0530 Subject: [PATCH 09/10] Upgrade dependency check version --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index a7c13079..98157ed4 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { id("org.hypertrace.ci-utils-plugin") version "0.3.0" id("org.hypertrace.publish-plugin") version "1.0.2" apply false id("org.hypertrace.code-style-plugin") version "1.1.0" apply false - id("org.owasp.dependencycheck") version "8.2.1" + id("org.owasp.dependencycheck") version "10.0.4" } subprojects { From e85b7868547d82204ea4b343fe4718197406bb7e Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Tue, 17 Sep 2024 13:16:21 +0530 Subject: [PATCH 10/10] Update version --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index 98157ed4..831a64f0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { id("org.hypertrace.ci-utils-plugin") version "0.3.0" id("org.hypertrace.publish-plugin") version "1.0.2" apply false id("org.hypertrace.code-style-plugin") version "1.1.0" apply false - id("org.owasp.dependencycheck") version "10.0.4" + id("org.owasp.dependencycheck") version "8.4.3" } subprojects {