Skip to content

Commit

Permalink
chore | optimize aggregate pipelines queries to use optimized indexes…
Browse files Browse the repository at this point in the history
… for trivial use cases (#199)

chore |  optimize aggregate pipelines queries to use optimized indexes for trivial use cases
  • Loading branch information
aman-bansal authored Jun 24, 2024
1 parent 300dae9 commit 43b12e8
Show file tree
Hide file tree
Showing 25 changed files with 637 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@

@Testcontainers
public class DocStoreQueryV1Test {

private static final String COLLECTION_NAME = "myTest";
private static final String UPDATABLE_COLLECTION_NAME = "updatable_collection";

Expand All @@ -132,6 +133,7 @@ public static void init() throws IOException {
Map<String, String> mongoConfig = new HashMap<>();
mongoConfig.putIfAbsent("host", "localhost");
mongoConfig.putIfAbsent("port", mongo.getMappedPort(27017).toString());
mongoConfig.putIfAbsent("isSortOptimizedQueryEnabled", "true");
Config config = ConfigFactory.parseMap(mongoConfig);

Datastore mongoDatastore = DatastoreProvider.getDatastore("Mongo", config);
Expand Down Expand Up @@ -181,20 +183,23 @@ public static void shutdown() {
}

private static class AllProvider implements ArgumentsProvider {

@Override
public Stream<Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(Arguments.of(MONGO_STORE), Arguments.of(POSTGRES_STORE));
}
}

private static class MongoProvider implements ArgumentsProvider {

@Override
public Stream<Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(Arguments.of(MONGO_STORE));
}
}

private static class PostgresProvider implements ArgumentsProvider {

@Override
public Stream<Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(Arguments.of(POSTGRES_STORE));
Expand Down Expand Up @@ -1321,6 +1326,7 @@ public void testQueryV1AggregationFilterWithWhereClause(String dataStoreName) th

@Nested
class StartsWithOperatorTest {

@ParameterizedTest
@ArgumentsSource(AllProvider.class)
public void testWithUnnestingAndRegularFilters(final String datastoreName) throws IOException {
Expand Down Expand Up @@ -1398,6 +1404,7 @@ public void testRequirementForUsingIndex_ShouldBeCaseSensitive(final String data

@Nested
class ContainsOperatorTest {

@ParameterizedTest
@ArgumentsSource(AllProvider.class)
public void testContains(final String datastoreName) throws IOException {
Expand Down Expand Up @@ -1945,6 +1952,7 @@ public void testAtomicCreateOrReplaceAndReturn(final String datastoreName)

@Nested
class KeyFilterTest {

@ParameterizedTest
@ArgumentsSource(AllProvider.class)
public void testFindWithSingleKey(final String datastoreName) throws IOException {
Expand Down Expand Up @@ -2111,6 +2119,7 @@ public void testAggregateWithZeroLimitAndOffset(final String datastoreName) thro

@Nested
class AtomicUpdateTest {

@ParameterizedTest
@ArgumentsSource(AllProvider.class)
public void testAtomicUpdateWithFilter(final String datastoreName)
Expand Down Expand Up @@ -2399,6 +2408,7 @@ public void testAtomicUpdateDocumentWithoutSelections(final String datastoreName

@Nested
class UpdateOperatorTest {

@ParameterizedTest
@ArgumentsSource(AllProvider.class)
void testUpdateSetEmptyObject(final String datastoreName) throws IOException {
Expand Down Expand Up @@ -2901,6 +2911,7 @@ private void assertExceptionForNonNumericValues(

@Nested
class BulkUpdateTest {

@ParameterizedTest
@ArgumentsSource(AllProvider.class)
void testBulkUpdateWithFilterAndGetNoDocuments(final String datastoreName) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.typesafe.config.Config;
import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode;
import org.hypertrace.core.documentstore.model.config.DatabaseType;
import org.hypertrace.core.documentstore.model.config.DatastoreConfig;
import org.hypertrace.core.documentstore.model.config.TypesafeConfigDatastoreConfigExtractor;
Expand All @@ -13,14 +14,17 @@

@Deprecated(forRemoval = true)
interface TypesafeDatastoreConfigAdapter {

DatastoreConfig convert(final Config config);

@Deprecated(forRemoval = true)
class MongoTypesafeDatastoreConfigAdapter implements TypesafeDatastoreConfigAdapter {

@Override
public DatastoreConfig convert(final Config config) {
final MongoConnectionConfig overridingConnectionConfig =
new MongoConnectionConfig(emptyList(), null, null, "", null, null) {
new MongoConnectionConfig(
emptyList(), null, null, "", null, null, AggregatePipelineMode.DEFAULT_ALWAYS) {
public MongoClientSettings toSettings() {
final MongoClientSettings.Builder settingsBuilder =
MongoClientSettings.builder()
Expand Down Expand Up @@ -50,6 +54,7 @@ private ConnectionString toConnectionString() {

@Deprecated(forRemoval = true)
class PostgresTypesafeDatastoreConfigAdapter implements TypesafeDatastoreConfigAdapter {

@Override
public DatastoreConfig convert(final Config config) {
final PostgresConnectionConfig connectionConfig =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.hypertrace.core.documentstore.model.config;

public enum AggregatePipelineMode {
DEFAULT_ALWAYS,
SORT_OPTIMIZED_IF_POSSIBLE
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ public class ConnectionConfig {
@Singular @NonNull List<@NonNull Endpoint> endpoints;
@NonNull String database;
@Nullable ConnectionCredentials credentials;
@NonNull AggregatePipelineMode aggregationPipelineMode;

public ConnectionConfig(
@NonNull List<@NonNull Endpoint> endpoints,
@NonNull String database,
@Nullable ConnectionCredentials credentials) {
this(endpoints, database, credentials, AggregatePipelineMode.DEFAULT_ALWAYS);
}

public static ConnectionConfigBuilder builder() {
return new ConnectionConfigBuilder();
Expand All @@ -46,6 +54,7 @@ public static class ConnectionConfigBuilder {
String applicationName = DEFAULT_APP_NAME;
String replicaSet;
ConnectionPoolConfig connectionPoolConfig;
AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS;

public ConnectionConfigBuilder type(final DatabaseType type) {
this.type = type;
Expand All @@ -72,7 +81,8 @@ public ConnectionConfig build() {
credentials,
applicationName,
replicaSet,
connectionPoolConfig);
connectionPoolConfig,
aggregationPipelineMode);

case POSTGRES:
return new PostgresConnectionConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class TypesafeConfigDatastoreConfigExtractor {
private static final String MAX_POOL_SIZE_KEY = "maxPoolSize";
private static final String CONNECTION_ACCESS_TIMEOUT_KEY = "connectionAccessTimeout";
private static final String CONNECTION_IDLE_TIME_KEY = "connectionIdleTime";
private static final String AGGREGATION_PIPELINE_MODE = "aggregationPipelineMode";

@NonNull Config config;
DatastoreConfigBuilder datastoreConfigBuilder;
Expand Down Expand Up @@ -66,7 +67,8 @@ private TypesafeConfigDatastoreConfigExtractor(
.applicationNameKey(APP_NAME_KEY)
.poolMaxConnectionsKey(MAX_POOL_SIZE_KEY)
.poolConnectionAccessTimeoutKey(CONNECTION_ACCESS_TIMEOUT_KEY)
.poolConnectionSurrenderTimeoutKey(CONNECTION_IDLE_TIME_KEY);
.poolConnectionSurrenderTimeoutKey(CONNECTION_IDLE_TIME_KEY)
.aggregationPipelineMode(AGGREGATION_PIPELINE_MODE);
}

public static TypesafeConfigDatastoreConfigExtractor from(
Expand Down Expand Up @@ -184,6 +186,17 @@ public TypesafeConfigDatastoreConfigExtractor poolConnectionSurrenderTimeoutKey(
return this;
}

public TypesafeConfigDatastoreConfigExtractor aggregationPipelineMode(@NonNull final String key) {
if (config.hasPath(key)) {
connectionConfigBuilder.aggregationPipelineMode(
AggregatePipelineMode.valueOf(config.getString(key)));
return this;
}

connectionConfigBuilder.aggregationPipelineMode(AggregatePipelineMode.DEFAULT_ALWAYS);
return this;
}

public DatastoreConfig extract() {
if (connectionConfigBuilder.endpoints().isEmpty()
&& !Endpoint.builder().build().equals(endpointBuilder.build())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Value;
import lombok.experimental.Accessors;
import lombok.experimental.NonFinal;
import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode;
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.config.ConnectionCredentials;
import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig;
Expand All @@ -47,11 +48,13 @@ public MongoConnectionConfig(
@Nullable final ConnectionCredentials credentials,
@NonNull final String applicationName,
@Nullable final String replicaSetName,
@Nullable final ConnectionPoolConfig connectionPoolConfig) {
@Nullable final ConnectionPoolConfig connectionPoolConfig,
AggregatePipelineMode aggregationPipelineMode) {
super(
ensureAtLeastOneEndpoint(endpoints),
getDatabaseOrDefault(database),
getCredentialsOrDefault(credentials, database));
getCredentialsOrDefault(credentials, database),
aggregationPipelineMode);
this.applicationName = applicationName;
this.replicaSetName = replicaSetName;
this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.hypertrace.core.documentstore.Filter;
import org.hypertrace.core.documentstore.Key;
import org.hypertrace.core.documentstore.Query;
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
import org.hypertrace.core.documentstore.mongo.query.MongoQueryExecutor;
Expand Down Expand Up @@ -109,10 +110,12 @@ && allBulkWriteErrorsAreDueToDuplicateKey((MongoBulkWriteException) failure))
.withMaxRetries(MAX_RETRY_ATTEMPTS_FOR_DUPLICATE_KEY_ISSUE);
private final String IF_NULL_CLAUSE = "$ifNull";

MongoCollection(com.mongodb.client.MongoCollection<BasicDBObject> collection) {
MongoCollection(
com.mongodb.client.MongoCollection<BasicDBObject> collection,
ConnectionConfig connectionConfig) {
this.collection = collection;
this.queryExecutor = new MongoQueryExecutor(collection);
this.updateExecutor = new MongoUpdateExecutor(collection);
this.queryExecutor = new MongoQueryExecutor(collection, connectionConfig);
this.updateExecutor = new MongoUpdateExecutor(collection, connectionConfig);
}

/**
Expand Down Expand Up @@ -713,6 +716,7 @@ private CloseableIterator<Document> convertToDocumentIterator(MongoCursor<BasicD
}

static class MongoResultsIterator implements CloseableIterator<Document> {

private final MongoCursor<BasicDBObject> cursor;
private boolean closed = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
public class MongoDatastore implements Datastore {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDatastore.class);

private final ConnectionConfig connectionConfig;
private final MongoClient client;
private final MongoDatabase database;
private final DocStoreMetricProvider docStoreMetricProvider;

public MongoDatastore(final DatastoreConfig datastoreConfig) {
final ConnectionConfig connectionConfig = datastoreConfig.connectionConfig();
connectionConfig = datastoreConfig.connectionConfig();

if (!(connectionConfig instanceof MongoConnectionConfig)) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -79,7 +80,8 @@ public boolean deleteCollection(String collectionName) {

@Override
public Collection getCollection(String collectionName) {
return new MongoCollection(database.getCollection(collectionName, BasicDBObject.class));
return new MongoCollection(
database.getCollection(collectionName, BasicDBObject.class), connectionConfig);
}

@Override
Expand Down
Loading

0 comments on commit 43b12e8

Please sign in to comment.