Skip to content

Commit

Permalink
Merge branch 'main' into handle_doller_prefix_values
Browse files Browse the repository at this point in the history
  • Loading branch information
suresh-prakash authored Sep 9, 2024
2 parents a6274ec + d5362f6 commit ba4898b
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.hypertrace.core.documentstore.expression.operators.RelationalOperator;
import org.hypertrace.core.documentstore.expression.type.FilterTypeExpression;
import org.hypertrace.core.documentstore.model.options.UpdateOptions;
import org.hypertrace.core.documentstore.model.options.UpdateOptions.MissingDocumentStrategy;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue;
import org.hypertrace.core.documentstore.query.Aggregation;
Expand Down Expand Up @@ -843,6 +844,9 @@ void testAggregateWithNestedArraysAndUnnestFilters(final String dataStoreName)
final FilterTypeExpression filter =
LogicalExpression.builder()
.operator(AND)
.operand(
RelationalExpression.of(
IdentifierExpression.of("item"), NEQ, ConstantExpression.of((String) null)))
.operand(
RelationalExpression.of(
IdentifierExpression.of("sales.medium.type"),
Expand Down Expand Up @@ -2591,6 +2595,66 @@ void testUpdateWithOnlyAddUpdateOperator(final String datastoreName) throws IOEx
datastoreName, iterator_new, "query/update_operator/add_updated2.json", 9);
}

@ParameterizedTest
@ArgumentsSource(MongoProvider.class)
void testUpdateWithUpsertOptions(final String dataStoreName) throws IOException {
final Collection collection = getCollection(dataStoreName, UPDATABLE_COLLECTION_NAME);
createCollectionData("query/updatable_collection_data.json", UPDATABLE_COLLECTION_NAME);
final SubDocumentUpdate set =
SubDocumentUpdate.builder()
.subDocument("props.brand")
.operator(SET)
.subDocumentValue(SubDocumentValue.of(new JSONDocument(Map.of("value", "nike"))))
.build();
Filter filter =
Filter.builder()
.expression(
RelationalExpression.of(
IdentifierExpression.of("item"),
IN,
ConstantExpression.ofStrings(List.of("shoes"))))
.build();

final Query query = Query.builder().setFilter(filter).build();
final List<SubDocumentUpdate> updates = List.of(set);
final CloseableIterator<Document> iterator =
collection.bulkUpdate(
query,
updates,
UpdateOptions.builder()
.returnDocumentType(AFTER_UPDATE)
.missingDocumentStrategy(MissingDocumentStrategy.CREATE_USING_UPDATES)
.build());
assertDocsAndSizeEqualWithoutOrder(
dataStoreName, iterator, "query/update_operator/updated4.json", 1);
Filter filter1 =
Filter.builder()
.expression(
RelationalExpression.of(
IdentifierExpression.of("item"),
IN,
ConstantExpression.ofStrings(List.of("shirt"))))
.build();
final Query query1 = Query.builder().setFilter(filter1).build();
final SubDocumentUpdate add =
SubDocumentUpdate.builder()
.subDocument("quantity")
.operator(ADD)
.subDocumentValue(SubDocumentValue.of(1))
.build();
final List<SubDocumentUpdate> update1 = List.of(add);
final CloseableIterator<Document> iterator1 =
collection.bulkUpdate(
query1,
update1,
UpdateOptions.builder()
.returnDocumentType(AFTER_UPDATE)
.missingDocumentStrategy(MissingDocumentStrategy.CREATE_USING_UPDATES)
.build());
assertDocsAndSizeEqualWithoutOrder(
dataStoreName, iterator1, "query/update_operator/updated5.json", 1);
}

@ParameterizedTest
@ArgumentsSource(AllProvider.class)
void testUpdateWithAllOperatorsOnObject(final String datastoreName) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[
{
"item": "shoes",
"props": {
"brand": {
"value": "nike"
}
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
{
"item": "shirt",
"quantity": 1
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@
import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.AFTER_UPDATE;

import lombok.Builder;
import lombok.Builder.Default;
import lombok.Value;

@Value
@Builder
public class UpdateOptions {
public static UpdateOptions DEFAULT_UPDATE_OPTIONS =
UpdateOptions.builder().returnDocumentType(AFTER_UPDATE).build();
UpdateOptions.builder()
.returnDocumentType(AFTER_UPDATE)
.missingDocumentStrategy(MissingDocumentStrategy.SKIP_UPDATES)
.build();

ReturnDocumentType returnDocumentType;
@Default ReturnDocumentType returnDocumentType = AFTER_UPDATE;
@Default MissingDocumentStrategy missingDocumentStrategy = MissingDocumentStrategy.SKIP_UPDATES;

public enum MissingDocumentStrategy {
CREATE_USING_UPDATES,
SKIP_UPDATES,
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.hypertrace.core.documentstore.mongo.query.parser.filter;

import static java.util.Collections.unmodifiableMap;

import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
Expand All @@ -16,6 +19,11 @@ public Map<String, Object> parse(
final String parsedLhs = expression.getLhs().accept(context.lhsParser());
final String operator = mapping.getOperator(expression.getOperator());
final Object parsedRhs = expression.getRhs().accept(context.rhsParser());
return Map.of(parsedLhs, Map.of(operator, parsedRhs));

// using HashMap instead of Map.of() as RHS value can be null
// but Map.of() doesn't support null values
final Map<String, Object> operatorToRhsMap = new HashMap<>();
operatorToRhsMap.put(operator, parsedRhs);
return Map.of(parsedLhs, unmodifiableMap(operatorToRhsMap));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.options.ReturnDocumentType;
import org.hypertrace.core.documentstore.model.options.UpdateOptions;
import org.hypertrace.core.documentstore.model.options.UpdateOptions.MissingDocumentStrategy;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
import org.hypertrace.core.documentstore.mongo.MongoUtils;
import org.hypertrace.core.documentstore.mongo.query.MongoQueryExecutor;
Expand Down Expand Up @@ -54,6 +55,10 @@ public Optional<Document> update(
final BasicDBObject selections = getSelections(query);
final BasicDBObject sorts = getOrders(query);
final FindOneAndUpdateOptions options = new FindOneAndUpdateOptions();
options.upsert(
updateOptions
.getMissingDocumentStrategy()
.equals(MissingDocumentStrategy.CREATE_USING_UPDATES));
final ReturnDocumentType returnDocumentType = updateOptions.getReturnDocumentType();

options.returnDocument(getReturnDocument(returnDocumentType));
Expand Down Expand Up @@ -91,34 +96,47 @@ public Optional<MongoCursor<BasicDBObject>> bulkUpdate(
final BasicDBObject filter = getFilter(query, Query::getFilter);
final BasicDBObject updateObject = updateParser.buildUpdateClause(updates);
final ReturnDocumentType returnDocumentType = updateOptions.getReturnDocumentType();
final com.mongodb.client.model.UpdateOptions mongoUpdateOptions =
new com.mongodb.client.model.UpdateOptions();
mongoUpdateOptions.upsert(
updateOptions
.getMissingDocumentStrategy()
.equals(MissingDocumentStrategy.CREATE_USING_UPDATES));
final MongoCursor<BasicDBObject> cursor;

switch (returnDocumentType) {
case BEFORE_UPDATE:
cursor = queryExecutor.aggregate(query);
logAndUpdate(filter, updateObject);
logAndUpdate(filter, updateObject, mongoUpdateOptions);
return Optional.of(cursor);

case AFTER_UPDATE:
logAndUpdate(filter, updateObject);
logAndUpdate(filter, updateObject, mongoUpdateOptions);
cursor = queryExecutor.aggregate(query);
return Optional.of(cursor);

case NONE:
logAndUpdate(filter, updateObject);
logAndUpdate(filter, updateObject, mongoUpdateOptions);
return Optional.empty();

default:
throw new IOException("Unrecognized return document type: " + returnDocumentType);
}
}

private void logAndUpdate(final BasicDBObject filter, final BasicDBObject setObject)
private void logAndUpdate(
final BasicDBObject filter,
final BasicDBObject setObject,
com.mongodb.client.model.UpdateOptions updateOptions)
throws IOException {
try {
log.debug(
"Updating {} using {} with filter {}", collection.getNamespace(), setObject, filter);
collection.updateMany(filter, setObject);
"Updating {} using {} with filter {} and updateOptions: {}",
collection.getNamespace(),
setObject,
filter,
updateOptions);
collection.updateMany(filter, setObject, updateOptions);
} catch (Exception e) {
throw new IOException("Error while updating", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ class PostgresStandardRelationalFilterParser implements PostgresRelationalFilter
public String parse(
final RelationalExpression expression, final PostgresRelationalFilterContext context) {
final Object parsedLhs = expression.getLhs().accept(context.lhsParser());
final String operator = mapper.getMapping(expression.getOperator());
final Object parsedRhs = expression.getRhs().accept(context.rhsParser());
final String operator = mapper.getMapping(expression.getOperator(), parsedRhs);

context.getParamsBuilder().addObjectParam(parsedRhs);
return String.format("%s %s ?", parsedLhs, operator);
if (parsedRhs != null) {
context.getParamsBuilder().addObjectParam(parsedRhs);
return String.format("%s %s ?", parsedLhs, operator);
} else {
return String.format("%s %s NULL", parsedLhs, operator);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.hypertrace.core.documentstore.expression.operators.RelationalOperator;

Expand All @@ -24,8 +25,18 @@ class PostgresStandardRelationalOperatorMapper {
entry(GTE, ">="),
entry(LTE, "<=")));

String getMapping(final RelationalOperator operator) {
return Optional.ofNullable(mapping.get(operator))
.orElseThrow(() -> new UnsupportedOperationException("Unsupported operator: " + operator));
private static final Map<RelationalOperator, String> nullRhsOperatorMapping =
Maps.immutableEnumMap(Map.ofEntries(entry(EQ, "IS"), entry(NEQ, "IS NOT")));

String getMapping(final RelationalOperator operator, Object parsedRhs) {
if (Objects.nonNull(parsedRhs)) {
return Optional.ofNullable(mapping.get(operator))
.orElseThrow(
() -> new UnsupportedOperationException("Unsupported operator: " + operator));
} else {
return Optional.ofNullable(nullRhsOperatorMapping.get(operator))
.orElseThrow(
() -> new UnsupportedOperationException("Unsupported operator: " + operator));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode;
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.options.UpdateOptions;
import org.hypertrace.core.documentstore.model.options.UpdateOptions.MissingDocumentStrategy;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue;
import org.hypertrace.core.documentstore.query.SortingSpec;
Expand Down Expand Up @@ -310,7 +311,10 @@ void testUpdateAtomicWithFilter_getNone() throws IOException {
mongoCollection.update(
query,
List.of(dateUpdate, quantityUpdate, propsUpdate),
UpdateOptions.builder().returnDocumentType(NONE).build());
UpdateOptions.builder()
.returnDocumentType(NONE)
.missingDocumentStrategy(MissingDocumentStrategy.SKIP_UPDATES)
.build());

assertFalse(result.isPresent());

Expand Down Expand Up @@ -392,13 +396,19 @@ void testBulkUpdateWithFilter() throws IOException {
query,
List.of(dateUpdate, quantityUpdate, propsUpdate),
UpdateOptions.DEFAULT_UPDATE_OPTIONS);

final ArgumentCaptor<com.mongodb.client.model.UpdateOptions> updateOptionsArgumentCaptor =
ArgumentCaptor.forClass(com.mongodb.client.model.UpdateOptions.class);
com.mongodb.client.model.UpdateOptions mongoUpdateOption =
new com.mongodb.client.model.UpdateOptions();
mongoUpdateOption.upsert(false);
assertTrue(result.hasNext());
assertJsonEquals(
readFileFromResource("atomic_read_and_update/response.json").orElseThrow(),
result.next().toJson());

verify(collection, times(1)).updateMany(filter, setObject);
verify(collection, times(1))
.updateMany(eq(filter), eq(setObject), updateOptionsArgumentCaptor.capture());
assertEquals(updateOptionsArgumentCaptor.getValue().isUpsert(), mongoUpdateOption.isUpsert());
}

@Test
Expand All @@ -409,10 +419,21 @@ void testBulkUpdateWithFilter_getNone() throws IOException {
mongoCollection.bulkUpdate(
query,
List.of(dateUpdate, quantityUpdate, propsUpdate),
UpdateOptions.builder().returnDocumentType(NONE).build());
UpdateOptions.builder()
.returnDocumentType(NONE)
.missingDocumentStrategy(MissingDocumentStrategy.SKIP_UPDATES)
.build());

final ArgumentCaptor<com.mongodb.client.model.UpdateOptions> updateOptionsArgumentCaptor =
ArgumentCaptor.forClass(com.mongodb.client.model.UpdateOptions.class);

assertFalse(result.hasNext());
verify(collection, times(1)).updateMany(filter, setObject);
com.mongodb.client.model.UpdateOptions mongoUpdateOption =
new com.mongodb.client.model.UpdateOptions();
mongoUpdateOption.upsert(false);
verify(collection, times(1))
.updateMany(eq(filter), eq(setObject), updateOptionsArgumentCaptor.capture());
assertEquals(updateOptionsArgumentCaptor.getValue().toString(), mongoUpdateOption.toString());
}

@Test
Expand Down

0 comments on commit ba4898b

Please sign in to comment.