Skip to content

Commit

Permalink
Merge pull request #1824 from bosch-io/bugfix/mongoReadJournal-preser…
Browse files Browse the repository at this point in the history
…ve-maxPid

preserve maxPid in journal aggregation
  • Loading branch information
alstanchev authored Nov 28, 2023
2 parents 8509077 + 8b1bfdc commit b14d757
Showing 1 changed file with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
Expand Down Expand Up @@ -896,8 +897,18 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
// sort stage 2 -- order after group stage is not defined
pipeline.add(Aggregates.sort(Sorts.ascending(S_ID)));

// Separate $group Stage to Calculate maxPid as this is not possible after filtering out DELETED snapshots
final String maxPid = "m";
final String items = "i";
pipeline.add(Aggregates.group(
new Document("_id", new BsonNull()),
Accumulators.max(maxPid, "$"+ S_ID),
Accumulators.push(items,"$$ROOT")));

// redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false
// if includeDeleted=true keeps them using "$$DESCEND"
// redacts operates recursively, so it evaluates all documents in items array which
// allows us to preserve maxPid even when all elements in the array are PRUNE-ed
pipeline.add(new Document().append("$redact", new Document()
.append("$cond", new Document()
.append("if",
Expand All @@ -906,14 +917,6 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
.append("else", includeDeleted ? "$$DESCEND" : "$$PRUNE")
)));

// group stage 2: group by max encountered pid, "push" all elements calculated in previous "redact"
final String maxPid = "m";
final String items = "i";
pipeline.add(Aggregates.group(null,
Accumulators.max(maxPid, "$" + S_ID),
Accumulators.push(items, "$$ROOT")
));

return Source.fromPublisher(snapshotStore.aggregate(pipeline)
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
)
Expand Down

0 comments on commit b14d757

Please sign in to comment.