diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/AggregatePipelineMode.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/AggregatePipelineMode.java index 2958e24a..8bda5591 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/AggregatePipelineMode.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/AggregatePipelineMode.java @@ -2,5 +2,5 @@ public enum AggregatePipelineMode { DEFAULT_ALWAYS, - SORT_OPTIMIZED_IF_POSSIBLE + SORT_OPTIMIZED_IF_POSSIBLE, } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java index d5a1e776..00ebde33 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java @@ -10,22 +10,25 @@ import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig.ConnectionPoolConfigBuilder; import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder; import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder; +import org.hypertrace.core.documentstore.model.options.DataFreshness; @Value public class TypesafeConfigDatastoreConfigExtractor { - private static final String HOST_KEY = "host"; - private static final String PORT_KEY = "port"; - private static final String ENDPOINTS_KEY = "endpoints"; - private static final String AUTH_DB_KEY = "authDb"; - private static final String REPLICA_SET_KEY = "replicaSet"; - private static final String DATABASE_KEY = "database"; - private static final String USER_KEY = "user"; - private static final String PASSWORD_KEY = "password"; - private static final String APP_NAME_KEY = "appName"; - private static final String MAX_POOL_SIZE_KEY = "maxPoolSize"; - private static final String CONNECTION_ACCESS_TIMEOUT_KEY = "connectionAccessTimeout"; - private static final String CONNECTION_IDLE_TIME_KEY = "connectionIdleTime"; - private static final String AGGREGATION_PIPELINE_MODE = "aggregationPipelineMode"; + private static final String DEFAULT_HOST_KEY = "host"; + private static final String DEFAULT_PORT_KEY = "port"; + private static final String DEFAULT_ENDPOINTS_KEY = "endpoints"; + private static final String DEFAULT_AUTH_DB_KEY = "authDb"; + private static final String DEFAULT_REPLICA_SET_KEY = "replicaSet"; + private static final String DEFAULT_DATABASE_KEY = "database"; + private static final String DEFAULT_USER_KEY = "user"; + private static final String DEFAULT_PASSWORD_KEY = "password"; + private static final String DEFAULT_APP_NAME_KEY = "appName"; + private static final String DEFAULT_MAX_POOL_SIZE_KEY = "maxPoolSize"; + private static final String DEFAULT_CONNECTION_ACCESS_TIMEOUT_KEY = "connectionAccessTimeout"; + private static final String DEFAULT_CONNECTION_IDLE_TIME_KEY = "connectionIdleTime"; + private static final String DEFAULT_AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode"; + private static final String DEFAULT_DATA_FRESHNESS_KEY = "dataFreshness"; + private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout"; @NonNull Config config; DatastoreConfigBuilder datastoreConfigBuilder; @@ -56,19 +59,22 @@ private TypesafeConfigDatastoreConfigExtractor( this.endpointBuilder = Endpoint.builder(); final String dataStoreType = type.type(); - this.hostKey(dataStoreType + "." + HOST_KEY) - .portKey(dataStoreType + "." + PORT_KEY) - .keysForEndpoints(dataStoreType + "." + ENDPOINTS_KEY, HOST_KEY, PORT_KEY) - .authDatabaseKey(dataStoreType + "." + AUTH_DB_KEY) - .replicaSetKey(dataStoreType + "." + REPLICA_SET_KEY) - .databaseKey(dataStoreType + "." + DATABASE_KEY) - .usernameKey(dataStoreType + "." + USER_KEY) - .passwordKey(dataStoreType + "." + PASSWORD_KEY) - .applicationNameKey(APP_NAME_KEY) - .poolMaxConnectionsKey(MAX_POOL_SIZE_KEY) - .poolConnectionAccessTimeoutKey(CONNECTION_ACCESS_TIMEOUT_KEY) - .poolConnectionSurrenderTimeoutKey(CONNECTION_IDLE_TIME_KEY) - .aggregationPipelineMode(AGGREGATION_PIPELINE_MODE); + this.hostKey(dataStoreType + "." + DEFAULT_HOST_KEY) + .portKey(dataStoreType + "." + DEFAULT_PORT_KEY) + .keysForEndpoints( + dataStoreType + "." + DEFAULT_ENDPOINTS_KEY, DEFAULT_HOST_KEY, DEFAULT_PORT_KEY) + .authDatabaseKey(dataStoreType + "." + DEFAULT_AUTH_DB_KEY) + .replicaSetKey(dataStoreType + "." + DEFAULT_REPLICA_SET_KEY) + .databaseKey(dataStoreType + "." + DEFAULT_DATABASE_KEY) + .usernameKey(dataStoreType + "." + DEFAULT_USER_KEY) + .passwordKey(dataStoreType + "." + DEFAULT_PASSWORD_KEY) + .applicationNameKey(DEFAULT_APP_NAME_KEY) + .poolMaxConnectionsKey(DEFAULT_MAX_POOL_SIZE_KEY) + .poolConnectionAccessTimeoutKey(DEFAULT_CONNECTION_ACCESS_TIMEOUT_KEY) + .poolConnectionSurrenderTimeoutKey(DEFAULT_CONNECTION_IDLE_TIME_KEY) + .aggregationPipelineMode(DEFAULT_AGGREGATION_PIPELINE_MODE_KEY) + .dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY) + .queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY); } public static TypesafeConfigDatastoreConfigExtractor from( @@ -190,10 +196,24 @@ public TypesafeConfigDatastoreConfigExtractor aggregationPipelineMode(@NonNull f if (config.hasPath(key)) { connectionConfigBuilder.aggregationPipelineMode( AggregatePipelineMode.valueOf(config.getString(key))); - return this; } - connectionConfigBuilder.aggregationPipelineMode(AggregatePipelineMode.DEFAULT_ALWAYS); + return this; + } + + public TypesafeConfigDatastoreConfigExtractor dataFreshnessKey(@NonNull final String key) { + if (config.hasPath(key)) { + connectionConfigBuilder.dataFreshness(DataFreshness.valueOf(config.getString(key))); + } + + return this; + } + + public TypesafeConfigDatastoreConfigExtractor queryTimeoutKey(@NonNull final String key) { + if (config.hasPath(key)) { + connectionConfigBuilder.queryTimeout(config.getDuration(key)); + } + return this; } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java index a1d2b2aa..222dc731 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java @@ -1,6 +1,8 @@ package org.hypertrace.core.documentstore.model.config; import static java.util.Map.entry; +import static org.hypertrace.core.documentstore.model.config.AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -9,6 +11,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import org.hypertrace.core.documentstore.model.options.DataFreshness; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -27,7 +30,9 @@ class TypesafeConfigDatastoreConfigExtractorTest { private static final String MAX_CONNECTIONS_KEY = "maxConnectionsKey"; private static final String CONNECTION_ACCESS_TIMEOUT_KEY = "connectionAccessTimeout"; private static final String CONNECTION_SURRENDER_TIMEOUT_KEY = "connectionSurrenderTimeout"; - private static final String AGGREGATION_PIPELINE_MODE = "aggregationPipelineMode"; + private static final String AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode"; + private static final String DATA_FRESHNESS_KEY = "dataFreshness"; + private static final String QUERY_TIMEOUT_KEY = "queryTimeout"; private static final String host = "red.planet"; private static final String host1 = "RED_PLANET"; @@ -42,6 +47,9 @@ class TypesafeConfigDatastoreConfigExtractorTest { private static final int maxConnections = 7; private static final Duration accessTimeout = Duration.ofSeconds(67); private static final Duration surrenderTimeout = Duration.ofSeconds(56); + private static final AggregatePipelineMode aggregatePipelineMode = SORT_OPTIMIZED_IF_POSSIBLE; + private static final DataFreshness dataFreshness = NEAR_REALTIME_FRESHNESS; + private static final Duration queryTimeout = Duration.ofSeconds(45); @SuppressWarnings("ConstantConditions") @Test @@ -98,6 +106,9 @@ void testBuildMongo() { .poolMaxConnectionsKey(MAX_CONNECTIONS_KEY) .poolConnectionAccessTimeoutKey(CONNECTION_ACCESS_TIMEOUT_KEY) .poolConnectionSurrenderTimeoutKey(CONNECTION_SURRENDER_TIMEOUT_KEY) + .aggregationPipelineMode(AGGREGATION_PIPELINE_MODE_KEY) + .dataFreshnessKey(DATA_FRESHNESS_KEY) + .queryTimeoutKey(QUERY_TIMEOUT_KEY) .extract() .connectionConfig(); final ConnectionConfig expected = @@ -119,7 +130,10 @@ void testBuildMongo() { .connectionAccessTimeout(accessTimeout) .connectionSurrenderTimeout(surrenderTimeout) .build()) - .aggregationPipelineMode(AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE) + .aggregationPipelineMode(aggregatePipelineMode) + .dataFreshness(dataFreshness) + .aggregationPipelineMode(aggregatePipelineMode) + .queryTimeout(queryTimeout) .build(); assertEquals(expected, config); @@ -160,7 +174,7 @@ void testBuildPostgres() { .connectionAccessTimeout(accessTimeout) .connectionSurrenderTimeout(surrenderTimeout) .build()) - .aggregationPipelineMode(AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE) + .aggregationPipelineMode(SORT_OPTIMIZED_IF_POSSIBLE) .build(); assertEquals(expected, config); @@ -274,6 +288,9 @@ void testBuildMongoUsingDefaultKeys() { .connectionSurrenderTimeout(surrenderTimeout) .build()) .replicaSet(replicaSet) + .aggregationPipelineMode(aggregatePipelineMode) + .dataFreshness(dataFreshness) + .queryTimeout(queryTimeout) .build(); assertEquals(expected, config); @@ -318,7 +335,9 @@ private Config buildConfigMap() { entry(MAX_CONNECTIONS_KEY, maxConnections), entry(CONNECTION_ACCESS_TIMEOUT_KEY, accessTimeout), entry(CONNECTION_SURRENDER_TIMEOUT_KEY, surrenderTimeout), - entry(AGGREGATION_PIPELINE_MODE, "SORT_OPTIMIZED_IF_POSSIBLE"))); + entry(AGGREGATION_PIPELINE_MODE_KEY, SORT_OPTIMIZED_IF_POSSIBLE.name()), + entry(DATA_FRESHNESS_KEY, NEAR_REALTIME_FRESHNESS.name()), + entry(QUERY_TIMEOUT_KEY, queryTimeout))); } private Config buildConfigMapWithDefaultKeysForPostgres() { @@ -350,7 +369,10 @@ private Config buildConfigMapUsingDefaultKeysForMongo() { entry("mongo.replicaSet", replicaSet), entry("maxPoolSize", maxConnections), entry("connectionAccessTimeout", accessTimeout), - entry("connectionIdleTime", surrenderTimeout))); + entry("connectionIdleTime", surrenderTimeout), + entry("aggregationPipelineMode", aggregatePipelineMode.name()), + entry("dataFreshness", dataFreshness.name()), + entry("queryTimeout", queryTimeout))); } private Config buildPostgresConfigMapUsingEndpointsKey() {