Skip to content

Commit

Permalink
Introduce configurability of the new parameters (#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
suresh-prakash authored Sep 26, 2024
1 parent 0dc1bec commit 9b4eb6e
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

public enum AggregatePipelineMode {
DEFAULT_ALWAYS,
SORT_OPTIMIZED_IF_POSSIBLE
SORT_OPTIMIZED_IF_POSSIBLE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,25 @@
import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig.ConnectionPoolConfigBuilder;
import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder;
import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder;
import org.hypertrace.core.documentstore.model.options.DataFreshness;

@Value
public class TypesafeConfigDatastoreConfigExtractor {
private static final String HOST_KEY = "host";
private static final String PORT_KEY = "port";
private static final String ENDPOINTS_KEY = "endpoints";
private static final String AUTH_DB_KEY = "authDb";
private static final String REPLICA_SET_KEY = "replicaSet";
private static final String DATABASE_KEY = "database";
private static final String USER_KEY = "user";
private static final String PASSWORD_KEY = "password";
private static final String APP_NAME_KEY = "appName";
private static final String MAX_POOL_SIZE_KEY = "maxPoolSize";
private static final String CONNECTION_ACCESS_TIMEOUT_KEY = "connectionAccessTimeout";
private static final String CONNECTION_IDLE_TIME_KEY = "connectionIdleTime";
private static final String AGGREGATION_PIPELINE_MODE = "aggregationPipelineMode";
private static final String DEFAULT_HOST_KEY = "host";
private static final String DEFAULT_PORT_KEY = "port";
private static final String DEFAULT_ENDPOINTS_KEY = "endpoints";
private static final String DEFAULT_AUTH_DB_KEY = "authDb";
private static final String DEFAULT_REPLICA_SET_KEY = "replicaSet";
private static final String DEFAULT_DATABASE_KEY = "database";
private static final String DEFAULT_USER_KEY = "user";
private static final String DEFAULT_PASSWORD_KEY = "password";
private static final String DEFAULT_APP_NAME_KEY = "appName";
private static final String DEFAULT_MAX_POOL_SIZE_KEY = "maxPoolSize";
private static final String DEFAULT_CONNECTION_ACCESS_TIMEOUT_KEY = "connectionAccessTimeout";
private static final String DEFAULT_CONNECTION_IDLE_TIME_KEY = "connectionIdleTime";
private static final String DEFAULT_AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode";
private static final String DEFAULT_DATA_FRESHNESS_KEY = "dataFreshness";
private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout";

@NonNull Config config;
DatastoreConfigBuilder datastoreConfigBuilder;
Expand Down Expand Up @@ -56,19 +59,22 @@ private TypesafeConfigDatastoreConfigExtractor(
this.endpointBuilder = Endpoint.builder();

final String dataStoreType = type.type();
this.hostKey(dataStoreType + "." + HOST_KEY)
.portKey(dataStoreType + "." + PORT_KEY)
.keysForEndpoints(dataStoreType + "." + ENDPOINTS_KEY, HOST_KEY, PORT_KEY)
.authDatabaseKey(dataStoreType + "." + AUTH_DB_KEY)
.replicaSetKey(dataStoreType + "." + REPLICA_SET_KEY)
.databaseKey(dataStoreType + "." + DATABASE_KEY)
.usernameKey(dataStoreType + "." + USER_KEY)
.passwordKey(dataStoreType + "." + PASSWORD_KEY)
.applicationNameKey(APP_NAME_KEY)
.poolMaxConnectionsKey(MAX_POOL_SIZE_KEY)
.poolConnectionAccessTimeoutKey(CONNECTION_ACCESS_TIMEOUT_KEY)
.poolConnectionSurrenderTimeoutKey(CONNECTION_IDLE_TIME_KEY)
.aggregationPipelineMode(AGGREGATION_PIPELINE_MODE);
this.hostKey(dataStoreType + "." + DEFAULT_HOST_KEY)
.portKey(dataStoreType + "." + DEFAULT_PORT_KEY)
.keysForEndpoints(
dataStoreType + "." + DEFAULT_ENDPOINTS_KEY, DEFAULT_HOST_KEY, DEFAULT_PORT_KEY)
.authDatabaseKey(dataStoreType + "." + DEFAULT_AUTH_DB_KEY)
.replicaSetKey(dataStoreType + "." + DEFAULT_REPLICA_SET_KEY)
.databaseKey(dataStoreType + "." + DEFAULT_DATABASE_KEY)
.usernameKey(dataStoreType + "." + DEFAULT_USER_KEY)
.passwordKey(dataStoreType + "." + DEFAULT_PASSWORD_KEY)
.applicationNameKey(DEFAULT_APP_NAME_KEY)
.poolMaxConnectionsKey(DEFAULT_MAX_POOL_SIZE_KEY)
.poolConnectionAccessTimeoutKey(DEFAULT_CONNECTION_ACCESS_TIMEOUT_KEY)
.poolConnectionSurrenderTimeoutKey(DEFAULT_CONNECTION_IDLE_TIME_KEY)
.aggregationPipelineMode(DEFAULT_AGGREGATION_PIPELINE_MODE_KEY)
.dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY)
.queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY);
}

public static TypesafeConfigDatastoreConfigExtractor from(
Expand Down Expand Up @@ -190,10 +196,24 @@ public TypesafeConfigDatastoreConfigExtractor aggregationPipelineMode(@NonNull f
if (config.hasPath(key)) {
connectionConfigBuilder.aggregationPipelineMode(
AggregatePipelineMode.valueOf(config.getString(key)));
return this;
}

connectionConfigBuilder.aggregationPipelineMode(AggregatePipelineMode.DEFAULT_ALWAYS);
return this;
}

public TypesafeConfigDatastoreConfigExtractor dataFreshnessKey(@NonNull final String key) {
if (config.hasPath(key)) {
connectionConfigBuilder.dataFreshness(DataFreshness.valueOf(config.getString(key)));
}

return this;
}

public TypesafeConfigDatastoreConfigExtractor queryTimeoutKey(@NonNull final String key) {
if (config.hasPath(key)) {
connectionConfigBuilder.queryTimeout(config.getDuration(key));
}

return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.hypertrace.core.documentstore.model.config;

import static java.util.Map.entry;
import static org.hypertrace.core.documentstore.model.config.AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE;
import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand All @@ -9,6 +11,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.hypertrace.core.documentstore.model.options.DataFreshness;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -27,7 +30,9 @@ class TypesafeConfigDatastoreConfigExtractorTest {
private static final String MAX_CONNECTIONS_KEY = "maxConnectionsKey";
private static final String CONNECTION_ACCESS_TIMEOUT_KEY = "connectionAccessTimeout";
private static final String CONNECTION_SURRENDER_TIMEOUT_KEY = "connectionSurrenderTimeout";
private static final String AGGREGATION_PIPELINE_MODE = "aggregationPipelineMode";
private static final String AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode";
private static final String DATA_FRESHNESS_KEY = "dataFreshness";
private static final String QUERY_TIMEOUT_KEY = "queryTimeout";

private static final String host = "red.planet";
private static final String host1 = "RED_PLANET";
Expand All @@ -42,6 +47,9 @@ class TypesafeConfigDatastoreConfigExtractorTest {
private static final int maxConnections = 7;
private static final Duration accessTimeout = Duration.ofSeconds(67);
private static final Duration surrenderTimeout = Duration.ofSeconds(56);
private static final AggregatePipelineMode aggregatePipelineMode = SORT_OPTIMIZED_IF_POSSIBLE;
private static final DataFreshness dataFreshness = NEAR_REALTIME_FRESHNESS;
private static final Duration queryTimeout = Duration.ofSeconds(45);

@SuppressWarnings("ConstantConditions")
@Test
Expand Down Expand Up @@ -98,6 +106,9 @@ void testBuildMongo() {
.poolMaxConnectionsKey(MAX_CONNECTIONS_KEY)
.poolConnectionAccessTimeoutKey(CONNECTION_ACCESS_TIMEOUT_KEY)
.poolConnectionSurrenderTimeoutKey(CONNECTION_SURRENDER_TIMEOUT_KEY)
.aggregationPipelineMode(AGGREGATION_PIPELINE_MODE_KEY)
.dataFreshnessKey(DATA_FRESHNESS_KEY)
.queryTimeoutKey(QUERY_TIMEOUT_KEY)
.extract()
.connectionConfig();
final ConnectionConfig expected =
Expand All @@ -119,7 +130,10 @@ void testBuildMongo() {
.connectionAccessTimeout(accessTimeout)
.connectionSurrenderTimeout(surrenderTimeout)
.build())
.aggregationPipelineMode(AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE)
.aggregationPipelineMode(aggregatePipelineMode)
.dataFreshness(dataFreshness)
.aggregationPipelineMode(aggregatePipelineMode)
.queryTimeout(queryTimeout)
.build();

assertEquals(expected, config);
Expand Down Expand Up @@ -160,7 +174,7 @@ void testBuildPostgres() {
.connectionAccessTimeout(accessTimeout)
.connectionSurrenderTimeout(surrenderTimeout)
.build())
.aggregationPipelineMode(AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE)
.aggregationPipelineMode(SORT_OPTIMIZED_IF_POSSIBLE)
.build();

assertEquals(expected, config);
Expand Down Expand Up @@ -274,6 +288,9 @@ void testBuildMongoUsingDefaultKeys() {
.connectionSurrenderTimeout(surrenderTimeout)
.build())
.replicaSet(replicaSet)
.aggregationPipelineMode(aggregatePipelineMode)
.dataFreshness(dataFreshness)
.queryTimeout(queryTimeout)
.build();

assertEquals(expected, config);
Expand Down Expand Up @@ -318,7 +335,9 @@ private Config buildConfigMap() {
entry(MAX_CONNECTIONS_KEY, maxConnections),
entry(CONNECTION_ACCESS_TIMEOUT_KEY, accessTimeout),
entry(CONNECTION_SURRENDER_TIMEOUT_KEY, surrenderTimeout),
entry(AGGREGATION_PIPELINE_MODE, "SORT_OPTIMIZED_IF_POSSIBLE")));
entry(AGGREGATION_PIPELINE_MODE_KEY, SORT_OPTIMIZED_IF_POSSIBLE.name()),
entry(DATA_FRESHNESS_KEY, NEAR_REALTIME_FRESHNESS.name()),
entry(QUERY_TIMEOUT_KEY, queryTimeout)));
}

private Config buildConfigMapWithDefaultKeysForPostgres() {
Expand Down Expand Up @@ -350,7 +369,10 @@ private Config buildConfigMapUsingDefaultKeysForMongo() {
entry("mongo.replicaSet", replicaSet),
entry("maxPoolSize", maxConnections),
entry("connectionAccessTimeout", accessTimeout),
entry("connectionIdleTime", surrenderTimeout)));
entry("connectionIdleTime", surrenderTimeout),
entry("aggregationPipelineMode", aggregatePipelineMode.name()),
entry("dataFreshness", dataFreshness.name()),
entry("queryTimeout", queryTimeout)));
}

private Config buildPostgresConfigMapUsingEndpointsKey() {
Expand Down

0 comments on commit 9b4eb6e

Please sign in to comment.