Skip to content

Commit

Permalink
Merge branch 'main' into c/workload-identity/1
Browse files Browse the repository at this point in the history
  • Loading branch information
franzramadhan authored Mar 4, 2025
2 parents 82391db + d126c15 commit 3f6cda6
Show file tree
Hide file tree
Showing 16 changed files with 539 additions and 177 deletions.
109 changes: 109 additions & 0 deletions server/cmd/db-migrations/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package main

import (
"context"
"fmt"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// Entity is a generic interface for types that can be batch updated
type Entity interface {
GetID() primitive.ObjectID
}

// BatchUpdate processes documents from a MongoDB collection in batches and applies the given update function to each document
func BatchUpdate[T Entity](ctx context.Context, col *mongo.Collection, filter bson.M, batchSize int, fn func(item T) (T, error)) (int, error) {
opts := options.Find().
SetBatchSize(int32(batchSize)).
SetNoCursorTimeout(true)

cursor, err := col.Find(ctx, filter, opts)
if err != nil {
return 0, fmt.Errorf("failed to query collection: %w", err)
}
defer cursor.Close(ctx)

// Process documents in batches
batch := make([]mongo.WriteModel, 0, batchSize)
processed := 0
startTime := time.Now()

for cursor.Next(ctx) {
if ctx.Err() != nil {
return 0, ctx.Err()
}

var item T
if err := cursor.Decode(&item); err != nil {
return 0, fmt.Errorf("failed to decode document: %w", err)
}

// Apply the process function to the document
updatedItem, err := fn(item)
if err != nil {
return 0, fmt.Errorf("failed to process document: %w", err)
}

// Create an update model
update := mongo.NewUpdateOneModel().
SetFilter(bson.M{"_id": item.GetID()}).
SetUpdate(bson.M{"$set": updatedItem})

batch = append(batch, update)
processed++

// If we've reached the batch size, execute the bulk write
if len(batch) >= batchSize {
if err := executeBatch(ctx, col, batch); err != nil {
return 0, err
}

// Log progress
elapsed := time.Since(startTime)
rate := float64(processed) / elapsed.Seconds()
fmt.Printf("Processed %d documents (%.2f docs/sec)\n", processed, rate)

// Clear the batch
batch = batch[:0]
}
}

// Process any remaining documents in the final batch
if len(batch) > 0 {
if err := executeBatch(ctx, col, batch); err != nil {
return 0, err
}
}

// Check for any cursor errors
if err := cursor.Err(); err != nil {
return 0, fmt.Errorf("cursor error: %w", err)
}

// Log final stats
elapsed := time.Since(startTime)
rate := float64(processed) / elapsed.Seconds()
fmt.Printf("Completed processing %d documents in %v (%.2f docs/sec)\n", processed, elapsed, rate)

return processed, nil
}

// executeBatch performs the bulk write operation for a batch of updates
func executeBatch(ctx context.Context, collection *mongo.Collection, batch []mongo.WriteModel) error {
// Define options for the bulk write operation
opts := options.BulkWrite().SetOrdered(false)

// Execute the bulk write
res, err := collection.BulkWrite(ctx, batch, opts)
if err != nil {
return fmt.Errorf("bulk write failed: %w", err)
}
fmt.Printf("Bulk write result: size=%d matched=%d, modified=%d, upserted=%d\n", len(batch), res.MatchedCount, res.ModifiedCount, res.UpsertedCount)

return nil
}
140 changes: 140 additions & 0 deletions server/cmd/db-migrations/item_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package main

import (
"context"
"fmt"

"github.com/samber/lo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type ItemDocument struct {
ID string `bson:"_id,omitempty"`
Fields []ItemFieldDocument
Assets []string `bson:"assets,omitempty"`
}

func (d ItemDocument) GetID() primitive.ObjectID {
id, err := primitive.ObjectIDFromHex(d.ID)
if err != nil {
fmt.Printf("failed to parse id: %v\n", d.ID)
return primitive.NilObjectID
}
return id
}

type ItemFieldDocument struct {
F string `bson:"f,omitempty"`
V ValueDocument `bson:"v,omitempty"`
Field string `bson:"schemafield,omitempty"` // compat
ValueType string `bson:"valuetype,omitempty"` // compat
Value any `bson:"value,omitempty"` // compat
}

type ValueDocument struct {
T string `bson:"t"`
V any `bson:"v"`
}

func ItemMigration(ctx context.Context, dbURL, dbName string, wetRun bool) error {
testID := ""

client, err := mongo.Connect(ctx, options.Client().ApplyURI(dbURL))
if err != nil {
return fmt.Errorf("db: failed to init client err: %w", err)
}
col := client.Database(dbName).Collection("item")

filter := bson.M{}

if testID != "" {
filter = bson.M{
"$and": []bson.M{
{"id": testID},
filter,
},
}
count, err := col.CountDocuments(ctx, filter)
if err != nil {
return fmt.Errorf("failed to count docs: %w", err)
}
fmt.Printf("test mode: filter on item id '%s' is applyed, %d items selected.\n", testID, count)
}

if !wetRun {
fmt.Printf("dry run\n")
count, err := col.CountDocuments(ctx, filter)
if err != nil {
return fmt.Errorf("failed to count docs: %w", err)
}
fmt.Printf("%d docs will be updated\n", count)
return nil
}

_, err = BatchUpdate(ctx, col, filter, 1000, updateItem)
if err != nil {
return fmt.Errorf("failed to apply batches: %w", err)
}

fmt.Printf("done.\n")
return nil
}

func updateItem(item ItemDocument) (ItemDocument, error) {
updatedItem := ItemDocument{}
var assets []string
for _, f := range item.Fields {
if f.Field != "" {
f.F = f.Field
}
if f.ValueType != "" {
f.V.T = f.ValueType
}
if f.Value != nil {
f.V.V = f.Value
}

// value should be an array, the value is always treated as multiple in db
if f.V.V != nil {
_, ok := f.V.V.(bson.A)
if !ok {
f.V.V = bson.A{f.V.V}
}
}

// migrate old value: date to datetime
if f.V.T == "date" {
f.V.T = "datetime"
}
updatedItem.Fields = append(updatedItem.Fields, ItemFieldDocument{
F: f.F,
V: ValueDocument{
T: f.V.T,
V: f.V.V,
},
})
if f.V.T == "asset" && f.V.V != nil {
pa, ok := f.V.V.(bson.A)
if ok {
aa := lo.FilterMap(pa, func(v any, _ int) (string, bool) {
s, ok := v.(string)
if !ok {
fmt.Printf("failed to parse asset: asset=%v item=%v\n", v, item.ID)
}
return s, ok && s != ""
})
assets = append(assets, aa...)
continue
}
fmt.Printf("failed to parse assets: assets=%v item=%v\n", f.V.V, item.ID)
}
}
if len(assets) > 0 {
updatedItem.Assets = lo.Uniq(assets)
}

return updatedItem, nil
}
Loading

0 comments on commit 3f6cda6

Please sign in to comment.