
chore: merge back delete request when we are done processing all its splits #15968

Merged: 5 commits, Jan 28, 2025
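
In short: the compactor stores a delete request as a group of smaller per-interval requests (its "splits", sharing a RequestID and distinguished by a sequence number). This change adds a mergeShardedRequests step that runs when the DeleteRequestsManager starts up and again at the end of each MarkPhaseFinished: once every shard of a request has reached StatusProcessed, the shards are replaced in the delete store by a single merged request covering the whole time range.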
41 changes: 41 additions & 0 deletions pkg/compactor/deletion/delete_requests_manager.go
@@ -56,6 +56,10 @@ func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod

go dm.loop()

if err := dm.mergeShardedRequests(context.Background()); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}

return dm
}

@@ -83,6 +87,39 @@ func (d *DeleteRequestsManager) Stop() {
d.wg.Wait()
}

// mergeShardedRequests merges sharded delete requests back into a single request once all of their shards have been processed
func (d *DeleteRequestsManager) mergeShardedRequests(ctx context.Context) error {
deleteGroups, err := d.deleteRequestsStore.GetAllDeleteRequests(ctx)
if err != nil {
return err
}

deletesPerRequest := partitionByRequestID(deleteGroups)
deleteRequests := mergeDeletes(deletesPerRequest)
for _, req := range deleteRequests {
// skip requests that do not have an ID: the request ID may be unset in some tests, or an empty ID may point to a bug in how requests are loaded.
if req.RequestID == "" {
level.Error(util_log.Logger).Log("msg", "skipped considering request without an id for merging its shards",
"user_id", req.UserID,
"start_time", req.StartTime.Unix(),
"end_time", req.EndTime.Unix(),
"query", req.Query,
)
continue
Review comment (Collaborator): Should we log this information before continuing?
}
// nothing to do if not all shards have been processed yet, or if the request has only a single shard
if req.Status != StatusProcessed || len(deletesPerRequest[req.RequestID]) == 1 {
continue
}

if err := d.deleteRequestsStore.MergeShardedRequests(ctx, req, deletesPerRequest[req.RequestID]); err != nil {
return err
}
}

return nil
}
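
partitionByRequestID and mergeDeletes are not part of this diff. As a rough sketch of the behaviour the code above assumes, they group shards by RequestID and collapse each group into one request spanning the group's full time range, keeping StatusProcessed only when every shard is processed; the bodies below are hypothetical, only the names and call shapes come from the code above.

```go
// Hypothetical sketch of the helpers used by mergeShardedRequests; the real
// implementations live elsewhere in the deletion package.
func partitionByRequestID(reqs []DeleteRequest) map[string][]DeleteRequest {
	groups := map[string][]DeleteRequest{}
	for _, r := range reqs {
		groups[r.RequestID] = append(groups[r.RequestID], r)
	}
	return groups
}

func mergeDeletes(groups map[string][]DeleteRequest) []DeleteRequest {
	merged := make([]DeleteRequest, 0, len(groups))
	for _, shards := range groups {
		m := shards[0]
		for _, s := range shards[1:] {
			// widen the merged request to cover every shard's interval
			if s.StartTime < m.StartTime {
				m.StartTime = s.StartTime
			}
			if s.EndTime > m.EndTime {
				m.EndTime = s.EndTime
			}
			// the merged request counts as processed only if every shard is processed
			if s.Status != StatusProcessed {
				m.Status = s.Status
			}
		}
		merged = append(merged, m)
	}
	return merged
}
```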

func (d *DeleteRequestsManager) updateMetrics() error {
deleteRequests, err := d.deleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived)
if err != nil {
@@ -393,6 +430,10 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
)
d.markRequestAsProcessed(req)
}

if err := d.mergeShardedRequests(context.Background()); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}
}

func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool {
128 changes: 127 additions & 1 deletion pkg/compactor/deletion/delete_requests_manager_test.go
@@ -2,6 +2,7 @@ package deletion

import (
"context"
"path/filepath"
"strings"
"testing"
"time"
@@ -13,6 +14,8 @@ import (
"github.com/grafana/loki/v3/pkg/compactor/deletionmode"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/v3/pkg/util/filter"
)

@@ -1047,6 +1050,10 @@ func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, s
return reqs, nil
}

func (m *mockDeleteRequestsStore) GetAllDeleteRequests(_ context.Context) ([]DeleteRequest, error) {
return m.deleteRequests, nil
}

func (m *mockDeleteRequestsStore) AddDeleteRequestGroup(_ context.Context, reqs []DeleteRequest) ([]DeleteRequest, error) {
m.addReqs = reqs
if m.returnZeroDeleteRequests {
@@ -1085,13 +1092,132 @@ func (m *mockDeleteRequestsStore) UpdateStatus(_ context.Context, req DeleteRequ
return nil
}

func (m *mockDeleteRequestsStore) MergeShardedRequests(_ context.Context, requestToAdd DeleteRequest, requestsToRemove []DeleteRequest) error {
n := 0
for i := range m.deleteRequests {
shouldRemove := false
for j := range requestsToRemove {
if requestsAreEqual(m.deleteRequests[i], requestsToRemove[j]) {
shouldRemove = true
break
}
}
// keep only the requests that are not part of the shards being merged away
if !shouldRemove {
m.deleteRequests[n] = m.deleteRequests[i]
n++
}
}

m.deleteRequests = m.deleteRequests[:n]
m.deleteRequests = append(m.deleteRequests, requestToAdd)

return nil
}

func requestsAreEqual(req1, req2 DeleteRequest) bool {
if req1.UserID == req2.UserID &&
req1.Query == req2.Query &&
req1.StartTime == req2.StartTime &&
req1.EndTime == req2.EndTime &&
req1.SequenceNum == req2.SequenceNum &&
req1.Status == req2.Status {
return true
}

return false
}

func TestDeleteRequestsManager_mergeShardedRequests(t *testing.T) {
for _, tc := range []struct {
name string
reqsToAdd []DeleteRequest
shouldMarkProcessed func(DeleteRequest) bool
requestsShouldBeMerged bool
}{
{
name: "no requests in store",
},
{
name: "none of the requests are processed - should not merge",
reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
shouldMarkProcessed: func(_ DeleteRequest) bool {
return false
},
},
{
name: "not all requests are processed - should not merge",
reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
shouldMarkProcessed: func(request DeleteRequest) bool {
return request.SequenceNum%2 == 0
},
},
{
name: "all the requests are processed - should merge",
reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
shouldMarkProcessed: func(_ DeleteRequest) bool {
return true
},
requestsShouldBeMerged: true,
},
{ // build requests for 2 different users and mark all requests as processed for just one of the two
name: "merging requests from one user should not touch another user's requests",
reqsToAdd: append(
buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
buildRequests(time.Hour, `{foo="bar"}`, user2, now.Add(-24*time.Hour), now)...,
),
shouldMarkProcessed: func(request DeleteRequest) bool {
return request.UserID == user2
},
},
} {
t.Run(tc.name, func(t *testing.T) {
mgr := setupManager(t)
reqs, err := mgr.deleteRequestsStore.AddDeleteRequestGroup(context.Background(), tc.reqsToAdd)
require.NoError(t, err)
require.GreaterOrEqual(t, len(reqs), len(tc.reqsToAdd))

for _, req := range reqs {
if !tc.shouldMarkProcessed(req) {
continue
}
require.NoError(t, mgr.deleteRequestsStore.UpdateStatus(context.Background(), req, StatusProcessed))
}

inStoreReqs, err := mgr.deleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user1)
require.NoError(t, err)

require.NoError(t, mgr.mergeShardedRequests(context.Background()))
inStoreReqsAfterMerging, err := mgr.deleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user1)
require.NoError(t, err)

if tc.requestsShouldBeMerged {
require.Len(t, inStoreReqsAfterMerging, 1)
require.True(t, requestsAreEqual(inStoreReqsAfterMerging[0], DeleteRequest{
UserID: user1,
Query: tc.reqsToAdd[0].Query,
StartTime: tc.reqsToAdd[0].StartTime,
EndTime: tc.reqsToAdd[len(tc.reqsToAdd)-1].EndTime,
Status: StatusProcessed,
}))
} else {
require.Len(t, inStoreReqsAfterMerging, len(inStoreReqs))
require.Equal(t, inStoreReqs, inStoreReqsAfterMerging)
}
})
}
}

func setupManager(t *testing.T) *DeleteRequestsManager {
t.Helper()
// build the store
tempDir := t.TempDir()

workingDir := filepath.Join(tempDir, "working-dir")
objectStorePath := filepath.Join(tempDir, "object-store")

objectClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: objectStorePath,
})
require.NoError(t, err)
ds, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, ""))
require.NoError(t, err)

return NewDeleteRequestsManager(ds, time.Hour, 1, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
}
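
Note that setupManager builds the manager on top of a real delete store backed by a temporary directory (FS object client plus NewDeleteStore) rather than mockDeleteRequestsStore, so TestDeleteRequestsManager_mergeShardedRequests exercises the store's actual MergeShardedRequests path as well.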
48 changes: 33 additions & 15 deletions pkg/compactor/deletion/delete_requests_store.go
@@ -43,11 +43,13 @@ var ErrDeleteRequestNotFound = errors.New("could not find matching delete reques
type DeleteRequestsStore interface {
AddDeleteRequestGroup(ctx context.Context, req []DeleteRequest) ([]DeleteRequest, error)
GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error)
GetAllDeleteRequests(ctx context.Context) ([]DeleteRequest, error)
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
UpdateStatus(ctx context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error
GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error)
RemoveDeleteRequests(ctx context.Context, req []DeleteRequest) error
GetCacheGenerationNumber(ctx context.Context, userID string) (string, error)
MergeShardedRequests(ctx context.Context, requestToAdd DeleteRequest, requestsToRemove []DeleteRequest) error
Stop()
Name() string
}
@@ -99,6 +101,7 @@ func (ds *deleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, reqs [
results = append(results, newReq)
ds.writeDeleteRequest(newReq, writeBatch)
}
ds.updateCacheGen(reqs[0].UserID, writeBatch)

if err := ds.indexClient.BatchWrite(ctx, writeBatch); err != nil {
return nil, err
@@ -107,6 +110,18 @@ func (ds *deleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, reqs [
return results, nil
}

// MergeShardedRequests writes the merged request and removes all of its shard entries in a single batch write.
func (ds *deleteRequestsStore) MergeShardedRequests(ctx context.Context, requestToAdd DeleteRequest, requestsToRemove []DeleteRequest) error {
writeBatch := ds.indexClient.NewWriteBatch()

ds.writeDeleteRequest(requestToAdd, writeBatch)

for _, req := range requestsToRemove {
ds.removeRequest(req, writeBatch)
}

return ds.indexClient.BatchWrite(ctx, writeBatch)
}

func newRequest(req DeleteRequest, requestID []byte, createdAt model.Time, seqNumber int) (DeleteRequest, error) {
req.RequestID = string(requestID)
req.Status = StatusReceived
@@ -124,14 +139,15 @@ func (ds *deleteRequestsStore) writeDeleteRequest(req DeleteRequest, writeBatch
// Add an entry with userID, requestID, and sequence number as the range key and the status as the value, to make it easy
// to manage and look up the status. We keep the hash key free of user-specific data since we want to be able to find
// delete requests by status alone.
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(req.Status))

// Add another entry with additional details such as the creation time, the time range of the delete request, and the LogQL query in the value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(req.CreatedAt), int64(req.StartTime), int64(req.EndTime))
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue), []byte(req.Query))
}

func (ds *deleteRequestsStore) updateCacheGen(userID string, writeBatch index.WriteBatch) {
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, userID), []byte{}, generateCacheGenNumber())
}

// backwardCompatibleDeleteRequestHash generates the hash key for a delete request.
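
For orientation, writeDeleteRequest leaves two kinds of entries per request in the delete-requests table, and updateCacheGen adds one cache-generation entry per batch. Roughly, with hypothetical placeholder values (the exact encoding of userIDAndRequestID and the hash-key prefixes comes from helpers not shown in this diff):

```
hash key                                      range key                                value
<deleteRequestID>                             <userID>:<requestID>:<sequenceNum>       request status
<deleteRequestDetails>:<userID>:<requestID>   <createdAt>:<startTime>:<endTime> (hex)  LogQL query
<cacheGenNum>:<userID>                        (empty)                                  new cache generation number
```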
@@ -172,6 +188,14 @@ func (ds *deleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, st
})
}

// GetAllDeleteRequests returns all the delete requests.
func (ds *deleteRequestsStore) GetAllDeleteRequests(ctx context.Context) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, index.Query{
TableName: DeleteRequestsTableName,
HashValue: string(deleteRequestID),
})
}

// GetAllDeleteRequestsForUser returns all delete requests for a user.
func (ds *deleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, index.Query{
@@ -188,11 +212,6 @@ func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, req DeleteReque
writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus))

if newStatus == StatusProcessed {
// remove runtime filtering for deleted data
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, generateCacheGenNumber())
}

return ds.indexClient.BatchWrite(ctx, writeBatch)
}

@@ -319,20 +338,22 @@ func unmarshalDeleteRequestDetails(itr index.ReadBatchIterator, req DeleteReques
return DeleteRequest{}, nil
}

requestWithDetails.Query = string(itr.Value())

return requestWithDetails, nil
}

// RemoveDeleteRequests removes the passed delete requests
func (ds *deleteRequestsStore) RemoveDeleteRequests(ctx context.Context, reqs []DeleteRequest) error {
if len(reqs) == 0 {
return nil
}
writeBatch := ds.indexClient.NewWriteBatch()

for _, r := range reqs {
ds.removeRequest(r, writeBatch)
}
ds.updateCacheGen(reqs[0].UserID, writeBatch)

return ds.indexClient.BatchWrite(ctx, writeBatch)
}
@@ -344,9 +365,6 @@ func (ds *deleteRequestsStore) removeRequest(req DeleteRequest, writeBatch index
// Remove the other entry that holds additional details like the creation time, the time range of the delete request, and the selectors in the value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(req.CreatedAt), int64(req.StartTime), int64(req.EndTime))
writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue))

// ensure caches are invalidated
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, []byte(strconv.FormatInt(time.Now().UnixNano(), 10)))
}

func (ds *deleteRequestsStore) Name() string {
11 changes: 5 additions & 6 deletions pkg/compactor/deletion/delete_requests_store_test.go
@@ -93,13 +93,14 @@ func TestDeleteRequestsStore(t *testing.T) {
require.NoError(t, err)
compareRequests(t, tc.user2Requests, user2Requests)

// caches should not be invalidated when we mark delete request as processed
updateGenNumber, err := tc.store.GetCacheGenerationNumber(context.Background(), user1)
require.NoError(t, err)
require.Equal(t, createGenNumber, updateGenNumber)

updateGenNumber2, err := tc.store.GetCacheGenerationNumber(context.Background(), user2)
require.NoError(t, err)
require.Equal(t, createGenNumber2, updateGenNumber2)

// delete the requests from the store updated previously
var remainingRequests []DeleteRequest
@@ -159,7 +160,7 @@ func TestBatchCreateGet(t *testing.T) {
results, err := tc.store.GetDeleteRequestGroup(context.Background(), savedRequests[0].UserID, savedRequests[0].RequestID)
require.NoError(t, err)

compareRequests(t, savedRequests, results)
})

t.Run("updates a single request with a new status", func(t *testing.T) {
@@ -203,7 +204,7 @@ func compareRequests(t *testing.T, expected []DeleteRequest, actual []DeleteRequ
return actual[i].RequestID < actual[j].RequestID
})
for i, deleteRequest := range actual {
require.True(t, requestsAreEqual(expected[i], deleteRequest))
}
}

@@ -238,8 +239,6 @@ func setup(t *testing.T) *testContext {

// build the store
tempDir := t.TempDir()
//tempDir := os.TempDir()
fmt.Println(tempDir)

workingDir := filepath.Join(tempDir, "working-dir")
objectStorePath := filepath.Join(tempDir, "object-store")