From 1cc9cb49ad15298b6c783b540b6537a21588df80 Mon Sep 17 00:00:00 2001 From: SimFG Date: Fri, 25 Oct 2024 16:47:29 +0800 Subject: [PATCH] enhance: allow to delete data when disk quota exhausted (#37134) - issue: #37133 Signed-off-by: SimFG --- internal/rootcoord/quota_center.go | 55 +++++++++++++++---- internal/rootcoord/quota_center_test.go | 71 +++++++++++++++++++++++-- pkg/util/typeutil/set.go | 3 ++ 3 files changed, 115 insertions(+), 14 deletions(-) diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 70d239f5310ed..cca525f8fbbea 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -109,6 +109,12 @@ var dqlRateTypes = typeutil.NewSet( internalpb.RateType_DQLQuery, ) +type LimiterRange struct { + RateScope internalpb.RateScope + OpType opType + ExcludeRateTypes typeutil.Set[internalpb.RateType] +} + // QuotaCenter manages the quota and limitations of the whole cluster, // it receives metrics info from DataNodes, QueryNodes and Proxies, and // notifies Proxies to limit rate of requests from clients or reject @@ -223,18 +229,19 @@ func initLimiter(limiterFunc func(internalpb.RateType) *ratelimitutil.Limiter, r return rateLimiters } -func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limiter, rateScope internalpb.RateScope, opType opType) { +func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limiter, limiterRange *LimiterRange) { if node == nil { - log.Warn("update limiter failed, node is nil", zap.Any("rateScope", rateScope), zap.Any("opType", opType)) + log.Warn("update limiter failed, node is nil", zap.Any("rateScope", limiterRange.RateScope), zap.Any("opType", limiterRange.OpType)) return } limiters := node.GetLimiters() - getRateTypes(rateScope, opType).Range(func(rt internalpb.RateType) bool { + getRateTypes(limiterRange.RateScope, limiterRange.OpType). + Complement(limiterRange.ExcludeRateTypes).Range(func(rt internalpb.RateType) bool { originLimiter, ok := limiters.Get(rt) if !ok { log.Warn("update limiter failed, limiter not found", - zap.Any("rateScope", rateScope), - zap.Any("opType", opType), + zap.Any("rateScope", limiterRange.RateScope), + zap.Any("opType", limiterRange.OpType), zap.Any("rateType", rt)) return true } @@ -552,9 +559,17 @@ func (q *QuotaCenter) collectMetrics() error { // forceDenyWriting sets dml rates to 0 to reject all dml requests. func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error { log := log.Ctx(context.TODO()).WithRateGroup("quotaCenter.forceDenyWriting", 1.0, 60.0) + var excludeRange typeutil.Set[internalpb.RateType] + if errorCode == commonpb.ErrorCode_DiskQuotaExhausted { + excludeRange = typeutil.NewSet(internalpb.RateType_DMLDelete) + } if cluster { clusterLimiters := q.rateLimiter.GetRootLimiters() - updateLimiter(clusterLimiters, GetEarliestLimiter(), internalpb.RateScope_Cluster, dml) + updateLimiter(clusterLimiters, GetEarliestLimiter(), &LimiterRange{ + RateScope: internalpb.RateScope_Cluster, + OpType: dml, + ExcludeRateTypes: excludeRange, + }) clusterLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode) } @@ -564,7 +579,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo log.Warn("db limiter not found of db ID", zap.Int64("dbID", dbID)) continue } - updateLimiter(dbLimiters, GetEarliestLimiter(), internalpb.RateScope_Database, dml) + updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{ + RateScope: internalpb.RateScope_Database, + OpType: dml, + ExcludeRateTypes: excludeRange, + }) dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode) } @@ -581,7 +600,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo zap.Int64("collectionID", collectionID)) continue } - updateLimiter(collectionLimiter, GetEarliestLimiter(), internalpb.RateScope_Collection, dml) + updateLimiter(collectionLimiter, GetEarliestLimiter(), &LimiterRange{ + RateScope: internalpb.RateScope_Collection, + OpType: dml, + ExcludeRateTypes: excludeRange, + }) collectionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode) } @@ -600,7 +623,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo zap.Int64("partitionID", partitionID)) continue } - updateLimiter(partitionLimiter, GetEarliestLimiter(), internalpb.RateScope_Partition, dml) + updateLimiter(partitionLimiter, GetEarliestLimiter(), &LimiterRange{ + RateScope: internalpb.RateScope_Partition, + OpType: dml, + ExcludeRateTypes: excludeRange, + }) partitionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode) } } @@ -624,7 +651,10 @@ func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode, cluster boo for dbID, collectionIDToPartIDs := range q.readableCollections { for collectionID := range collectionIDToPartIDs { collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collectionID) - updateLimiter(collectionLimiter, GetEarliestLimiter(), internalpb.RateScope_Collection, dql) + updateLimiter(collectionLimiter, GetEarliestLimiter(), &LimiterRange{ + RateScope: internalpb.RateScope_Collection, + OpType: dql, + }) collectionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToRead, errorCode) collectionIDs = append(collectionIDs, collectionID) } @@ -642,7 +672,10 @@ func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode, cluster boo log.Warn("db limiter not found of db ID", zap.Int64("dbID", dbID)) continue } - updateLimiter(dbLimiters, GetEarliestLimiter(), internalpb.RateScope_Database, dql) + updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{ + RateScope: internalpb.RateScope_Database, + OpType: dql, + }) dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToRead, errorCode) mlog.RatedWarn(10, "QuotaCenter force to deny reading", zap.Int64s("dbIDs", dbIDs), diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index e21c6b300ccef..3ad5653769d3f 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -393,6 +393,65 @@ func TestQuotaCenter(t *testing.T) { } }) + t.Run("disk quota exhausted", func(t *testing.T) { + qc := mocks.NewMockQueryCoordClient(t) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT(). + GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything). + Return(nil, merr.ErrCollectionNotFound). + Maybe() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + quotaCenter.collectionIDToDBID = typeutil.NewConcurrentMap[int64, int64]() + quotaCenter.collectionIDToDBID.Insert(1, 0) + quotaCenter.collectionIDToDBID.Insert(2, 0) + + quotaCenter.writableCollections = map[int64]map[int64][]int64{ + 0: collectionIDToPartitionIDs, + } + quotaCenter.writableCollections[0][1] = append(quotaCenter.writableCollections[0][1], 1000) + + err := quotaCenter.resetAllCurrentRates() + assert.NoError(t, err) + + updateLimit := func(node *interalratelimitutil.RateLimiterNode, rateType internalpb.RateType, limit int64) { + limiter, ok := node.GetLimiters().Get(rateType) + if !ok { + return + } + limiter.SetLimit(Limit(limit)) + } + assertLimit := func(node *interalratelimitutil.RateLimiterNode, rateType internalpb.RateType, expectValue int64) { + limiter, ok := node.GetLimiters().Get(rateType) + if !ok { + assert.FailNow(t, "limiter not found") + return + } + assert.EqualValues(t, expectValue, limiter.Limit()) + } + + updateLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLInsert, 10) + updateLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLDelete, 9) + updateLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLInsert, 10) + updateLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLDelete, 9) + updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLInsert, 10) + updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLDelete, 9) + updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLInsert, 10) + updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLDelete, 9) + + err = quotaCenter.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, []int64{0}, []int64{1}, nil) + assert.NoError(t, err) + + assertLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLInsert, 0) + assertLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLDelete, 9) + assertLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLInsert, 0) + assertLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLDelete, 9) + assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLInsert, 0) + assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLDelete, 9) + assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLInsert, 10) + assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLDelete, 9) + }) + t.Run("test calculateRates", func(t *testing.T) { forceBak := Params.QuotaConfig.ForceDenyWriting.GetValue() paramtable.Get().Save(Params.QuotaConfig.ForceDenyWriting.Key, "false") @@ -854,7 +913,7 @@ func TestQuotaCenter(t *testing.T) { b, _ := limiters.Get(internalpb.RateType_DMLUpsert) assert.Equal(t, Limit(0), b.Limit()) c, _ := limiters.Get(internalpb.RateType_DMLDelete) - assert.Equal(t, Limit(0), c.Limit()) + assert.NotEqual(t, Limit(0), c.Limit()) } } } @@ -1473,14 +1532,20 @@ func TestQuotaCenterSuite(t *testing.T) { func TestUpdateLimiter(t *testing.T) { t.Run("nil node", func(t *testing.T) { - updateLimiter(nil, nil, internalpb.RateScope_Database, dql) + updateLimiter(nil, nil, &LimiterRange{ + RateScope: internalpb.RateScope_Collection, + OpType: dql, + }) }) t.Run("normal op", func(t *testing.T) { node := interalratelimitutil.NewRateLimiterNode(internalpb.RateScope_Collection) node.GetLimiters().Insert(internalpb.RateType_DQLSearch, ratelimitutil.NewLimiter(5, 5)) newLimit := ratelimitutil.NewLimiter(10, 10) - updateLimiter(node, newLimit, internalpb.RateScope_Collection, dql) + updateLimiter(node, newLimit, &LimiterRange{ + RateScope: internalpb.RateScope_Collection, + OpType: dql, + }) searchLimit, _ := node.GetLimiters().Get(internalpb.RateType_DQLSearch) assert.Equal(t, Limit(10), searchLimit.Limit()) diff --git a/pkg/util/typeutil/set.go b/pkg/util/typeutil/set.go index ea7e145aa525b..a76fcb507e2f4 100644 --- a/pkg/util/typeutil/set.go +++ b/pkg/util/typeutil/set.go @@ -67,6 +67,9 @@ func (set Set[T]) Union(other Set[T]) Set[T] { // Complement returns the complement with the given set func (set Set[T]) Complement(other Set[T]) Set[T] { + if other == nil { + return set + } ret := NewSet(set.Collect()...) ret.Remove(other.Collect()...) return ret