Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: merge back delete request when we are done processing all its splits #15968

Merged
merged 5 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions cmd/dataobj-inspect/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/opentracing-contrib/go-grpc v0.1.0 // indirect
github.com/opentracing-contrib/go-grpc v0.1.1 // indirect
github.com/opentracing-contrib/go-stdlib v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
Expand All @@ -109,10 +109,10 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect
go.etcd.io/etcd/client/v3 v3.5.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/pdata v1.23.0 // indirect
go.opentelemetry.io/otel v1.33.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/otel/trace v1.33.0 // indirect
go.opentelemetry.io/collector/pdata v1.24.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
Expand All @@ -127,10 +127,10 @@ require (
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.28.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250102185135-69823020774d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/grpc v1.69.4 // indirect
google.golang.org/protobuf v1.36.3 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/grpc v1.70.0 // indirect
google.golang.org/protobuf v1.36.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
9 changes: 9 additions & 0 deletions cmd/dataobj-inspect/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/opentracing-contrib/go-grpc v0.1.0 h1:9JHDtQXv6UL0tFF8KJB/4ApJgeOcaHp1h07d0PJjESc=
github.com/opentracing-contrib/go-grpc v0.1.0/go.mod h1:i3/jx/TvJZ/HKidtT4XGIi/NosUEpzS9xjVJctbKZzI=
github.com/opentracing-contrib/go-grpc v0.1.1/go.mod h1:Nu6sz+4zzgxXu8rvKfnwjBEmHsuhTigxRwV2RhELrS8=
github.com/opentracing-contrib/go-stdlib v1.1.0 h1:cZBWc4pA4e65tqTJddbflK435S0tDImj6c9BMvkdUH0=
github.com/opentracing-contrib/go-stdlib v1.1.0/go.mod h1:S0p+X9p6dcBkoMTL+Qq2VOvxKs9ys5PpYWXWqlCS0bQ=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
Expand Down Expand Up @@ -463,18 +464,22 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/collector/pdata v1.23.0 h1:tEk0dkfB8RdSukoOMfEa8duB938gfZowdfRkrJxGDrw=
go.opentelemetry.io/collector/pdata v1.23.0/go.mod h1:I2jggpBMiO8A+7TXhzNpcJZkJtvi1cU0iVNIi+6bc+o=
go.opentelemetry.io/collector/pdata v1.24.0/go.mod h1:cf3/W9E/uIvPS4MR26SnMFJhraUCattzzM6qusuONuc=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q=
go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw=
go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ=
go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM=
go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM=
go.opentelemetry.io/otel/sdk/metric v1.33.0 h1:Gs5VK9/WUJhNXZgn8MR6ITatvAmKeIuCtNbsP3JkNqU=
go.opentelemetry.io/otel/sdk/metric v1.33.0/go.mod h1:dL5ykHZmm1B1nVRk9dDjChwDmt81MjVp3gLkQRwKf/Q=
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
Expand Down Expand Up @@ -634,8 +639,10 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto/googleapis/api v0.0.0-20250102185135-69823020774d h1:H8tOf8XM88HvKqLTxe755haY6r1fqqzLbEnfrmLXlSA=
google.golang.org/genproto/googleapis/api v0.0.0-20250102185135-69823020774d/go.mod h1:2v7Z7gP2ZUOGsaFyxATQSRoBnKygqVq2Cwnvom7QiqY=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand All @@ -645,6 +652,7 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand All @@ -658,6 +666,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
35 changes: 35 additions & 0 deletions pkg/compactor/deletion/delete_requests_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeri

go dm.loop()

if err := dm.mergeShardedRequests(context.Background()); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}

return dm
}

Expand Down Expand Up @@ -83,6 +87,33 @@ func (d *DeleteRequestsManager) Stop() {
d.wg.Wait()
}

// mergeShardedRequests merges the sharded requests back to a single request when we are done with processing all the shards
func (d *DeleteRequestsManager) mergeShardedRequests(ctx context.Context) error {
deleteGroups, err := d.deleteRequestsStore.GetAllDeleteRequests(context.Background())
if err != nil {
return err
}

deletesPerRequest := partitionByRequestID(deleteGroups)
deleteRequests := mergeDeletes(deletesPerRequest)
for _, req := range deleteRequests {
// do not consider requests which do not have an id. Request ID won't be set in some tests or there is a bug in our code for loading requests.
if req.RequestID == "" {
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log this information before continuing?

}
// do not do anything if we are not done with processing all the shards or the number of shards is 1
if req.Status != StatusProcessed || len(deletesPerRequest[req.RequestID]) == 1 {
continue
}

if err := d.deleteRequestsStore.MergeShardedRequests(ctx, req, deletesPerRequest[req.RequestID]); err != nil {
return err
}
}

return nil
}

func (d *DeleteRequestsManager) updateMetrics() error {
deleteRequests, err := d.deleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived)
if err != nil {
Expand Down Expand Up @@ -393,6 +424,10 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
)
d.markRequestAsProcessed(req)
}

if err := d.mergeShardedRequests(context.Background()); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}
}

func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool {
Expand Down
128 changes: 127 additions & 1 deletion pkg/compactor/deletion/delete_requests_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package deletion

import (
"context"
"path/filepath"
"strings"
"testing"
"time"
Expand All @@ -13,6 +14,8 @@ import (
"github.com/grafana/loki/v3/pkg/compactor/deletionmode"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/v3/pkg/util/filter"
)

Expand Down Expand Up @@ -1047,6 +1050,10 @@ func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, s
return reqs, nil
}

// GetAllDeleteRequests returns every delete request currently held by the mock store.
// The returned slice is the store's own slice (not a copy) and the error is always nil.
func (m *mockDeleteRequestsStore) GetAllDeleteRequests(_ context.Context) ([]DeleteRequest, error) {
	all := m.deleteRequests
	return all, nil
}

func (m *mockDeleteRequestsStore) AddDeleteRequestGroup(_ context.Context, reqs []DeleteRequest) ([]DeleteRequest, error) {
m.addReqs = reqs
if m.returnZeroDeleteRequests {
Expand Down Expand Up @@ -1085,13 +1092,132 @@ func (m *mockDeleteRequestsStore) UpdateStatus(_ context.Context, req DeleteRequ
return nil
}

// MergeShardedRequests replaces the given shards in the mock store with a single merged request.
//
// Every stored request that is equal (per requestsAreEqual) to ANY entry of requestsToRemove
// is dropped, the remaining requests keep their relative order, and requestToAdd is appended
// at the end. With an empty requestsToRemove nothing is dropped. It always returns nil.
func (m *mockDeleteRequestsStore) MergeShardedRequests(_ context.Context, requestToAdd DeleteRequest, requestsToRemove []DeleteRequest) error {
	// Build a fresh slice instead of filtering in place: callers may still hold (or pass in
	// as requestsToRemove) slices sharing the old backing array.
	kept := make([]DeleteRequest, 0, len(m.deleteRequests)+1)
	for _, existing := range m.deleteRequests {
		shouldRemove := false
		for _, shard := range requestsToRemove {
			if requestsAreEqual(existing, shard) {
				shouldRemove = true
				break
			}
		}
		if !shouldRemove {
			kept = append(kept, existing)
		}
	}

	m.deleteRequests = append(kept, requestToAdd)

	return nil
}

func requestsAreEqual(req1, req2 DeleteRequest) bool {
if req1.UserID == req2.UserID &&
req1.Query == req2.Query &&
req1.StartTime == req2.StartTime &&
req1.EndTime == req2.EndTime {
req1.EndTime == req2.EndTime &&
req1.SequenceNum == req2.SequenceNum &&
req1.Status == req2.Status {
return true
}

return false
}

// TestDeleteRequestsManager_mergeShardedRequests adds groups of delete requests to a real
// delete store, marks a per-case subset of them as processed and then runs
// mergeShardedRequests. It asserts on user1's requests only: either they collapse into a
// single processed request spanning the whole original range, or they are left untouched.
func TestDeleteRequestsManager_mergeShardedRequests(t *testing.T) {
	type testCase struct {
		name                   string
		reqsToAdd              []DeleteRequest
		shouldMarkProcessed    func(DeleteRequest) bool
		requestsShouldBeMerged bool
	}

	testCases := []testCase{
		{
			name: "no requests in store",
		},
		{
			name:      "none of the requests are processed - should not merge",
			reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
			shouldMarkProcessed: func(_ DeleteRequest) bool {
				return false
			},
		},
		{
			name:      "not all requests are processed - should not merge",
			reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
			shouldMarkProcessed: func(request DeleteRequest) bool {
				return request.SequenceNum%2 == 0
			},
		},
		{
			name:      "all the requests are processed - should merge",
			reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
			shouldMarkProcessed: func(_ DeleteRequest) bool {
				return true
			},
			requestsShouldBeMerged: true,
		},
		{
			// requests are built for 2 different users but only user2's are marked as processed,
			// so user1's requests (the ones asserted on below) must stay as they are
			name: "merging requests from one user should not touch another users requests",
			reqsToAdd: append(
				buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
				buildRequests(time.Hour, `{foo="bar"}`, user2, now.Add(-24*time.Hour), now)...,
			),
			shouldMarkProcessed: func(request DeleteRequest) bool {
				return request.UserID == user2
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()
			mgr := setupManager(t)
			store := mgr.deleteRequestsStore

			added, err := store.AddDeleteRequestGroup(ctx, tc.reqsToAdd)
			require.NoError(t, err)
			require.GreaterOrEqual(t, len(added), len(tc.reqsToAdd))

			// flip the status of the requests selected by the test case
			for _, req := range added {
				if tc.shouldMarkProcessed(req) {
					require.NoError(t, store.UpdateStatus(ctx, req, StatusProcessed))
				}
			}

			before, err := store.GetAllDeleteRequestsForUser(ctx, user1)
			require.NoError(t, err)

			require.NoError(t, mgr.mergeShardedRequests(ctx))
			after, err := store.GetAllDeleteRequestsForUser(ctx, user1)
			require.NoError(t, err)

			if !tc.requestsShouldBeMerged {
				// nothing should have changed for user1
				require.Len(t, after, len(before))
				require.Equal(t, before, after)
				return
			}

			// all the shards should have collapsed into one processed request covering the full range
			require.Len(t, after, 1)
			firstShard := tc.reqsToAdd[0]
			lastShard := tc.reqsToAdd[len(tc.reqsToAdd)-1]
			require.True(t, requestsAreEqual(after[0], DeleteRequest{
				UserID:    user1,
				Query:     firstShard.Query,
				StartTime: firstShard.StartTime,
				EndTime:   lastShard.EndTime,
				Status:    StatusProcessed,
			}))
		})
	}
}

// setupManager builds a DeleteRequestsManager for tests. The manager is backed by a delete
// store created with NewDeleteStore, whose working directory and filesystem object store
// both live under the test's temporary directory. Any setup failure fails the test.
func setupManager(t *testing.T) *DeleteRequestsManager {
	t.Helper()

	root := t.TempDir()

	// filesystem-backed object client rooted at <tmp>/object-store
	fsClient, err := local.NewFSObjectClient(local.FSConfig{
		Directory: filepath.Join(root, "object-store"),
	})
	require.NoError(t, err)

	// delete store working out of <tmp>/working-dir
	indexClient := storage.NewIndexStorageClient(fsClient, "")
	deleteStore, err := NewDeleteStore(filepath.Join(root, "working-dir"), indexClient)
	require.NoError(t, err)

	limits := &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}
	return NewDeleteRequestsManager(deleteStore, time.Hour, 1, limits, nil)
}
Loading
Loading