
Introduced data freshness with the MongoDB implementation at connection-level and at query-level #209

Merged (11 commits) on Sep 19, 2024
build.gradle.kts (1 addition, 1 deletion)
@@ -6,7 +6,7 @@ plugins {
id("org.hypertrace.ci-utils-plugin") version "0.3.0"
id("org.hypertrace.publish-plugin") version "1.0.2" apply false
id("org.hypertrace.code-style-plugin") version "1.1.0" apply false
-  id("org.owasp.dependencycheck") version "8.2.1"
+  id("org.owasp.dependencycheck") version "8.4.3"
Contributor: Any reason for not moving to the latest version, 10.0.4?

Contributor Author: I tried that version. It's failing (mostly due to Java version incompatibility).

}

subprojects {
@@ -97,7 +97,7 @@ private static void initializeAndConnectToPostgres() {

private static void initializeAndConnectToMongo() {
mongo =
-        new GenericContainer<>(DockerImageName.parse("mongo:4.4.0"))
+        new GenericContainer<>(DockerImageName.parse("mongo:7.0.14"))
.withExposedPorts(27017)
.waitingFor(Wait.forListeningPort());
mongo.start();
@@ -132,7 +132,7 @@ public class DocStoreQueryV1Test {
public static void init() throws IOException {
datastoreMap = Maps.newHashMap();
mongo =
-        new GenericContainer<>(DockerImageName.parse("mongo:4.4.0"))
+        new GenericContainer<>(DockerImageName.parse("mongo:7.0.14"))
.withExposedPorts(27017)
.waitingFor(Wait.forListeningPort());
mongo.start();
@@ -71,7 +71,7 @@ public class DocStoreTest {
public static void init() {
datastoreMap = Maps.newHashMap();
mongo =
-        new GenericContainer<>(DockerImageName.parse("mongo:4.4.0"))
+        new GenericContainer<>(DockerImageName.parse("mongo:7.0.14"))
Contributor: Is the MongoDB version 7.0.14? Is this the same version that has been deployed?

.withExposedPorts(27017)
.waitingFor(Wait.forListeningPort());
mongo.start();
@@ -75,7 +75,7 @@ public class MongoDocStoreTest {
@BeforeAll
public static void init() {
mongo =
-        new GenericContainer<>(DockerImageName.parse("mongo:4.4.0"))
+        new GenericContainer<>(DockerImageName.parse("mongo:7.0.14"))
.withExposedPorts(27017)
.waitingFor(Wait.forListeningPort());
mongo.start();
@@ -7,6 +7,7 @@
import java.util.Set;
import org.hypertrace.core.documentstore.expression.impl.KeyExpression;
import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException;
import org.hypertrace.core.documentstore.model.options.QueryOptions;
import org.hypertrace.core.documentstore.model.options.UpdateOptions;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;

@@ -127,6 +128,16 @@ public interface Collection {
*/
CloseableIterator<Document> aggregate(final org.hypertrace.core.documentstore.query.Query query);

/**
* Query the documents conforming to the query specification.
*
* @param query The query specification
* @param queryOptions The query options
* @return {@link CloseableIterator} of matching documents
*/
CloseableIterator<Document> query(
Contributor: Q: Is the existing query API deprecated?

Contributor Author (@suresh-prakash, Sep 18, 2024): I don't think that's required. If clients want to query with the default configurations, the older method can be used.

final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions);
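As the thread above notes, the legacy single-argument `query` keeps working alongside the new overload. A minimal, driver-free sketch of that delegation pattern (the types here are simplified stand-ins, not the real document-store API):

```java
import java.util.Iterator;
import java.util.List;

public class QueryApiSketch {
  enum DataFreshness { SYSTEM_DEFAULT, REALTIME_FRESHNESS, NEAR_REALTIME_FRESHNESS }

  // Simplified stand-in for QueryOptions with a SYSTEM_DEFAULT freshness default.
  record QueryOptions(DataFreshness dataFreshness) {
    static final QueryOptions DEFAULT_QUERY_OPTIONS =
        new QueryOptions(DataFreshness.SYSTEM_DEFAULT);
  }

  interface Collection {
    Iterator<String> query(String query, QueryOptions options);

    // Legacy entry point: delegates to the new overload with the default options,
    // which is why deprecating it is unnecessary.
    default Iterator<String> query(String query) {
      return query(query, QueryOptions.DEFAULT_QUERY_OPTIONS);
    }
  }

  public static void main(String[] args) {
    // Toy implementation that just echoes the query and the chosen freshness.
    Collection c = (q, o) -> List.of(q + "/" + o.dataFreshness()).iterator();
    System.out.println(c.query("find").next());
    System.out.println(
        c.query("find", new QueryOptions(DataFreshness.NEAR_REALTIME_FRESHNESS)).next());
  }
}
```

Callers that care about freshness pass `QueryOptions` explicitly; everyone else stays on the old method unchanged.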

/**
* Delete the document with the given key.
*
@@ -11,6 +11,7 @@
import org.hypertrace.core.documentstore.model.config.TypesafeConfigDatastoreConfigExtractor;
import org.hypertrace.core.documentstore.model.config.mongo.MongoConnectionConfig;
import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig;
import org.hypertrace.core.documentstore.model.options.DataFreshness;

@Deprecated(forRemoval = true)
interface TypesafeDatastoreConfigAdapter {
@@ -24,7 +25,14 @@ class MongoTypesafeDatastoreConfigAdapter implements TypesafeDatastoreConfigAdapter {
public DatastoreConfig convert(final Config config) {
final MongoConnectionConfig overridingConnectionConfig =
new MongoConnectionConfig(
-            emptyList(), null, null, "", null, null, AggregatePipelineMode.DEFAULT_ALWAYS) {
+            emptyList(),
+            null,
+            null,
+            "",
+            null,
+            null,
+            AggregatePipelineMode.DEFAULT_ALWAYS,
+            DataFreshness.SYSTEM_DEFAULT) {
public MongoClientSettings toSettings() {
final MongoClientSettings.Builder settingsBuilder =
MongoClientSettings.builder()
@@ -18,6 +18,7 @@
import lombok.experimental.NonFinal;
import org.hypertrace.core.documentstore.model.config.mongo.MongoConnectionConfig;
import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig;
import org.hypertrace.core.documentstore.model.options.DataFreshness;

@Value
@NonFinal
@@ -30,12 +31,18 @@ public class ConnectionConfig {
@NonNull String database;
@Nullable ConnectionCredentials credentials;
@NonNull AggregatePipelineMode aggregationPipelineMode;
@NonNull DataFreshness dataFreshness;

public ConnectionConfig(
@NonNull List<@NonNull Endpoint> endpoints,
@NonNull String database,
@Nullable ConnectionCredentials credentials) {
-    this(endpoints, database, credentials, AggregatePipelineMode.DEFAULT_ALWAYS);
+    this(
+        endpoints,
+        database,
+        credentials,
+        AggregatePipelineMode.DEFAULT_ALWAYS,
+        DataFreshness.SYSTEM_DEFAULT);
}

public static ConnectionConfigBuilder builder() {
@@ -55,6 +62,7 @@ public static class ConnectionConfigBuilder {
String replicaSet;
ConnectionPoolConfig connectionPoolConfig;
AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS;
DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;

public ConnectionConfigBuilder type(final DatabaseType type) {
this.type = type;
@@ -82,7 +90,8 @@ public ConnectionConfig build() {
applicationName,
replicaSet,
connectionPoolConfig,
-            aggregationPipelineMode);
+            aggregationPipelineMode,
+            dataFreshness);

case POSTGRES:
return new PostgresConnectionConfig(
@@ -27,6 +27,7 @@
import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig;
import org.hypertrace.core.documentstore.model.config.DatabaseType;
import org.hypertrace.core.documentstore.model.config.Endpoint;
import org.hypertrace.core.documentstore.model.options.DataFreshness;

@Value
@NonFinal
@@ -49,12 +50,14 @@ public MongoConnectionConfig(
@NonNull final String applicationName,
@Nullable final String replicaSetName,
@Nullable final ConnectionPoolConfig connectionPoolConfig,
-      AggregatePipelineMode aggregationPipelineMode) {
+      @NonNull final AggregatePipelineMode aggregationPipelineMode,
+      @NonNull final DataFreshness dataFreshness) {
super(
ensureAtLeastOneEndpoint(endpoints),
getDatabaseOrDefault(database),
getCredentialsOrDefault(credentials, database),
-        aggregationPipelineMode);
+        aggregationPipelineMode,
+        dataFreshness);
this.applicationName = applicationName;
this.replicaSetName = replicaSetName;
this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig);
@@ -0,0 +1,9 @@
package org.hypertrace.core.documentstore.model.options;

@SuppressWarnings("UnnecessarySemicolon")
public enum DataFreshness {
SYSTEM_DEFAULT,
REALTIME_FRESHNESS,
Contributor: Q: What does the freshness mean?

Contributor Author: Freshness means the liveliness of the data. REALTIME_FRESHNESS means we want the most up-to-date data; NEAR_REALTIME_FRESHNESS means a little bit of stale data is acceptable.

Contributor: Here, NEAR_REALTIME_FRESHNESS means fetching data from a secondary, right?

Contributor Author: Yes. Basically, "SECONDARY" is preferred. In case "SECONDARY" is not available, it'll fetch from "PRIMARY".

NEAR_REALTIME_FRESHNESS,
;
}
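The thread above pins down the intended semantics: SYSTEM_DEFAULT and REALTIME_FRESHNESS read from the primary, while NEAR_REALTIME_FRESHNESS prefers a secondary. A driver-free sketch of that mapping, using plain strings in place of `com.mongodb.ReadPreference` values:

```java
import java.util.Map;

public class FreshnessMappingSketch {
  enum DataFreshness { SYSTEM_DEFAULT, REALTIME_FRESHNESS, NEAR_REALTIME_FRESHNESS }

  // "primary" / "secondaryPreferred" stand in for the MongoDB driver's
  // ReadPreference.primary() and ReadPreference.secondaryPreferred().
  static final Map<DataFreshness, String> TO_READ_PREFERENCE =
      Map.of(
          DataFreshness.SYSTEM_DEFAULT, "primary",
          DataFreshness.REALTIME_FRESHNESS, "primary",
          DataFreshness.NEAR_REALTIME_FRESHNESS, "secondaryPreferred");

  static String convert(DataFreshness freshness) {
    // A null freshness falls back to the safest choice: read from the primary.
    return freshness == null ? "primary" : TO_READ_PREFERENCE.get(freshness);
  }

  public static void main(String[] args) {
    System.out.println(convert(DataFreshness.NEAR_REALTIME_FRESHNESS)); // secondaryPreferred
  }
}
```

This mirrors the mapping that the PR's `MongoReadPreferenceConverter` implements against the real driver types.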
@@ -0,0 +1,15 @@
package org.hypertrace.core.documentstore.model.options;

import lombok.Builder;
import lombok.Builder.Default;
import lombok.Value;
import lombok.experimental.Accessors;

@Value
@Builder
@Accessors(fluent = true, chain = true)
public class QueryOptions {
public static final QueryOptions DEFAULT_QUERY_OPTIONS = QueryOptions.builder().build();

@Default DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;
}
@@ -61,6 +61,7 @@
import org.hypertrace.core.documentstore.Query;
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException;
import org.hypertrace.core.documentstore.model.options.QueryOptions;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
import org.hypertrace.core.documentstore.mongo.query.MongoQueryExecutor;
import org.hypertrace.core.documentstore.mongo.update.MongoUpdateExecutor;
@@ -550,6 +551,12 @@
return convertToDocumentIterator(queryExecutor.aggregate(query));
}

@Override
public CloseableIterator<Document> query(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
return convertToDocumentIterator(queryExecutor.aggregate(query, queryOptions));

Codecov warning (document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java#L557): Added line #L557 was not covered by tests.
}

@Override
public Optional<Document> update(
final org.hypertrace.core.documentstore.query.Query query,
@@ -0,0 +1,47 @@
package org.hypertrace.core.documentstore.mongo.collection;

import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT;
import static org.hypertrace.core.documentstore.mongo.collection.MongoReadPreferenceConverter.convert;

import com.mongodb.BasicDBObject;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoCollection;
import java.util.HashMap;
import java.util.Map;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.Accessors;
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.options.QueryOptions;

public class MongoCollectionOptionsApplier {
private final Map<CacheKey, MongoCollection<BasicDBObject>> collectionCache;

public MongoCollectionOptionsApplier() {
this.collectionCache = new HashMap<>();
}

public MongoCollection<BasicDBObject> applyOptions(
final ConnectionConfig connectionConfig,
final QueryOptions queryOptions,
final MongoCollection<BasicDBObject> collection) {
final CacheKey cacheKey =
CacheKey.builder().readPreference(readPreference(connectionConfig, queryOptions)).build();
return collectionCache.computeIfAbsent(
cacheKey, key -> collection.withReadPreference(key.readPreference()));
Contributor: How is the CacheKey associated with the collection? Since ReadPreference can be either primary or secondary, does this mean that this.collectionCache contains a maximum of 2 values?

Contributor Author: Yes.
}

private ReadPreference readPreference(
final ConnectionConfig connectionConfig, final QueryOptions queryOptions) {
return SYSTEM_DEFAULT.equals(queryOptions.dataFreshness())
? convert(connectionConfig.dataFreshness())
: convert(queryOptions.dataFreshness());
}

@Value
@Builder
@Accessors(fluent = true, chain = true)
private static class CacheKey {
ReadPreference readPreference;
}
}
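As confirmed in the thread, the cache key carries only the read preference, so the cache holds at most one collection view per distinct preference (two in practice). A simplified, driver-free sketch of that `computeIfAbsent` pattern, with strings standing in for `ReadPreference` and the `MongoCollection` views:

```java
import java.util.HashMap;
import java.util.Map;

public class CollectionCacheSketch {
  // Keyed by read preference; strings stand in for MongoCollection views.
  private final Map<String, String> collectionCache = new HashMap<>();

  // Builds (and caches) a view of the collection bound to the given read preference.
  String applyOptions(String readPreference, String collection) {
    return collectionCache.computeIfAbsent(
        readPreference, pref -> collection + "[readPreference=" + pref + "]");
  }

  int cacheSize() {
    return collectionCache.size();
  }

  public static void main(String[] args) {
    CollectionCacheSketch applier = new CollectionCacheSketch();
    applier.applyOptions("primary", "orders");
    applier.applyOptions("secondaryPreferred", "orders");
    applier.applyOptions("primary", "orders"); // cache hit, no new entry
    System.out.println(applier.cacheSize()); // 2
  }
}
```

Repeated queries with the same freshness therefore reuse one wrapped collection rather than re-wrapping on every call.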
@@ -0,0 +1,32 @@
package org.hypertrace.core.documentstore.mongo.collection;

import static com.mongodb.ReadPreference.primary;
import static com.mongodb.ReadPreference.secondaryPreferred;
import static java.util.Map.entry;
import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS;
import static org.hypertrace.core.documentstore.model.options.DataFreshness.REALTIME_FRESHNESS;
import static org.hypertrace.core.documentstore.model.options.DataFreshness.SYSTEM_DEFAULT;

import com.mongodb.ReadPreference;
import java.util.Map;
import java.util.Optional;
import org.hypertrace.core.documentstore.model.options.DataFreshness;

public class MongoReadPreferenceConverter {

Codecov warning (document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java#L15): Added line #L15 was not covered by tests.
private static final Map<DataFreshness, ReadPreference> DATA_FRESHNESS_TO_READ_PREFERENCE =
Map.ofEntries(
entry(SYSTEM_DEFAULT, primary()),
entry(REALTIME_FRESHNESS, primary()),
entry(NEAR_REALTIME_FRESHNESS, secondaryPreferred()));

public static ReadPreference convert(final DataFreshness dataFreshness) {
if (dataFreshness == null) {
return primary();
}

return Optional.ofNullable(DATA_FRESHNESS_TO_READ_PREFERENCE.get(dataFreshness))
.orElseThrow(
() ->
new UnsupportedOperationException("Unsupported data freshness: " + dataFreshness));

Codecov warning (document-store/src/main/java/org/hypertrace/core/documentstore/mongo/collection/MongoReadPreferenceConverter.java#L30): Added line #L30 was not covered by tests.
}
}
@@ -5,6 +5,8 @@
import static java.util.function.Function.identity;
import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.hypertrace.core.documentstore.model.options.QueryOptions.DEFAULT_QUERY_OPTIONS;
import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.COUNT_ALIAS;
import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.getCountClause;
import static org.hypertrace.core.documentstore.mongo.query.MongoPaginationHelper.applyPagination;
@@ -38,6 +40,8 @@
import org.bson.conversions.Bson;
import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode;
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.options.QueryOptions;
import org.hypertrace.core.documentstore.mongo.collection.MongoCollectionOptionsApplier;
import org.hypertrace.core.documentstore.mongo.query.parser.AliasParser;
import org.hypertrace.core.documentstore.mongo.query.parser.MongoFromTypeExpressionParser;
import org.hypertrace.core.documentstore.mongo.query.parser.MongoGroupTypeExpressionParser;
@@ -52,7 +56,6 @@
@Slf4j
@AllArgsConstructor
public class MongoQueryExecutor {

private static final List<Function<Query, Collection<BasicDBObject>>>
DEFAULT_AGGREGATE_PIPELINE_FUNCTIONS =
List.of(
@@ -116,6 +119,7 @@ public ServerAddress getServerAddress() {
}
};

private final MongoCollectionOptionsApplier optionsApplier = new MongoCollectionOptionsApplier();
private final com.mongodb.client.MongoCollection<BasicDBObject> collection;
private final ConnectionConfig connectionConfig;

@@ -138,6 +142,11 @@ public MongoCursor<BasicDBObject> find(final Query query) {
}

public MongoCursor<BasicDBObject> aggregate(final Query originalQuery) {
return aggregate(originalQuery, DEFAULT_QUERY_OPTIONS);
}

public MongoCursor<BasicDBObject> aggregate(
final Query originalQuery, final QueryOptions queryOptions) {
if (originalQuery.getPagination().map(Pagination::getLimit).map(ZERO::equals).orElse(false)) {
log.debug("Not executing query because of a 0 limit");
return EMPTY_CURSOR;
@@ -152,13 +161,16 @@ public MongoCursor<BasicDBObject> aggregate(final Query originalQuery) {
aggregatePipeline.stream()
.flatMap(function -> function.apply(query).stream())
.filter(not(BasicDBObject::isEmpty))
-            .collect(toList());
+            .collect(toUnmodifiableList());

-    logPipeline(pipeline);
+    logPipeline(pipeline, queryOptions);

try {
final com.mongodb.client.MongoCollection<BasicDBObject> collectionWithOptions =
optionsApplier.applyOptions(connectionConfig, queryOptions, collection);

final AggregateIterable<BasicDBObject> iterable =
-          collection.aggregate(pipeline).allowDiskUse(true);
+          collectionWithOptions.aggregate(pipeline).allowDiskUse(true);
return iterable.cursor();
} catch (final MongoCommandException e) {
log.error("Execution failed for query: {}. Aggregation Pipeline: {}", query, pipeline);
@@ -177,7 +189,7 @@ public long count(final Query originalQuery) {
.filter(not(BasicDBObject::isEmpty))
.collect(toList());

-    logPipeline(pipeline);
+    logPipeline(pipeline, DEFAULT_QUERY_OPTIONS);
final AggregateIterable<BasicDBObject> iterable = collection.aggregate(pipeline);

try (final MongoCursor<BasicDBObject> cursor = iterable.cursor()) {
@@ -206,11 +218,12 @@ private void logClauses(
pagination);
}

-  private void logPipeline(final List<BasicDBObject> pipeline) {
+  private void logPipeline(final List<BasicDBObject> pipeline, final QueryOptions queryOptions) {
log.debug(
-        "MongoDB aggregation():\n Collection: {}\n Pipeline: {}",
+        "MongoDB aggregation():\n Collection: {}\n Pipeline: {}\n QueryOptions: {}",
         collection.getNamespace(),
-        pipeline);
+        pipeline,
+        queryOptions);
}

private Query transformAndLog(Query query) {