Skip to content

Commit

Permalink
metricbeat/module/mongodb/replstatus: Update getOpTimestamp in `rep…
Browse files Browse the repository at this point in the history
…lstatus` to fix sort and temp files generation issue (#37688)

* Update getOpTimestamp implementation
  • Loading branch information
ritalwar authored Jan 25, 2024
1 parent b09ac16 commit 77abcf3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add a `/inputs/` route to the HTTP monitoring endpoint that exposes metrics for each metricset instance. {pull}36971[36971]
- Add linux IO metrics to system/process {pull}37213[37213]
- Add new memory/cgroup metrics to Kibana module {pull}37232[37232]
- Update `getOpTimestamp` in `replstatus` to fix sort and temp files generation issue in mongodb. {pull}37688[37688]

*Osquerybeat*

Expand Down
47 changes: 24 additions & 23 deletions metricbeat/module/mongodb/replstatus/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"context"
"errors"
"fmt"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type oplogInfo struct {
Expand Down Expand Up @@ -71,14 +70,9 @@ func getReplicationInfo(client *mongo.Client) (*oplogInfo, error) {
}

// get first and last items in the oplog
firstTs, err := getOpTimestamp(collection, "$natural")
firstTs, lastTs, err := getOpTimestamp(collection)
if err != nil {
return nil, fmt.Errorf("could not get first operation timestamp in op log: %w", err)
}

lastTs, err := getOpTimestamp(collection, "-$natural")
if err != nil {
return nil, fmt.Errorf("could not get last operation timestamp in op log: %w", err)
return nil, fmt.Errorf("could not get operation timestamp in op log: %w", err)
}

diff := lastTs - firstTs
Expand All @@ -92,28 +86,35 @@ func getReplicationInfo(client *mongo.Client) (*oplogInfo, error) {
}, nil
}

func getOpTimestamp(collection *mongo.Collection, sort string) (uint32, error) {
opt := options.Find().SetSort(bson.D{{Key: sort, Value: 1}})
cursor, err := collection.Find(context.Background(), bson.D{}, opt)
if err != nil {
return 0, fmt.Errorf("could not get cursor on collection '%s': %w", collection.Name(), err)
func getOpTimestamp(collection *mongo.Collection) (uint32, uint32, error) {

// Find both first and last timestamps using $min and $max
pipeline := bson.A{
bson.M{"$group": bson.M{"_id": 1, "minTS": bson.M{"$min": "$ts"}, "maxTS": bson.M{"$max": "$ts"}}},
}

if !cursor.Next(context.Background()) {
return 0, errors.New("objects not found in local.oplog.rs")
cursor, err := collection.Aggregate(context.Background(), pipeline)
if err != nil {
return 0, 0, fmt.Errorf("could not get operation timestamps in op log: %w", err)
}
defer cursor.Close(context.Background())

var opTime map[string]interface{}
if err = cursor.Decode(&opTime); err != nil {
return 0, fmt.Errorf("error decoding response: %w", err)
var result struct {
MinTS time.Time `bson:"minTS"`
MaxTS time.Time `bson:"maxTS"`
}

ts, ok := opTime["ts"].(primitive.Timestamp)
if !ok {
return 0, errors.New("an expected timestamp was not found")
if !cursor.Next(context.Background()) {
return 0, 0, errors.New("no documents found in op log")
}
if err := cursor.Decode(&result); err != nil {
return 0, 0, fmt.Errorf("error decoding response for timestamps: %w", err)
}

minTS := uint32(result.MinTS.Unix())
maxTS := uint32(result.MaxTS.Unix())

return ts.T, nil
return minTS, maxTS, nil
}

func contains(s []string, x string) bool {
Expand Down

0 comments on commit 77abcf3

Please sign in to comment.