Skip to content

Commit

Permalink
enhance: allow deleting data when the disk quota is exhausted (milvus-io#37134)
Browse files Browse the repository at this point in the history

- issue: milvus-io#37133

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Oct 25, 2024
1 parent ff0b7ea commit 1cc9cb4
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 14 deletions.
55 changes: 44 additions & 11 deletions internal/rootcoord/quota_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ var dqlRateTypes = typeutil.NewSet(
internalpb.RateType_DQLQuery,
)

// LimiterRange selects which rate limiters updateLimiter should modify:
// every rate type belonging to the given scope and operation type, minus
// any listed in ExcludeRateTypes.
type LimiterRange struct {
	RateScope internalpb.RateScope // cluster / database / collection / partition level
	OpType    opType               // operation group (e.g. dml, dql) used to derive the rate-type set
	// ExcludeRateTypes is subtracted from the derived set via Set.Complement;
	// nil is treated as the empty set (nothing excluded).
	ExcludeRateTypes typeutil.Set[internalpb.RateType]
}

// QuotaCenter manages the quota and limitations of the whole cluster,
// it receives metrics info from DataNodes, QueryNodes and Proxies, and
// notifies Proxies to limit rate of requests from clients or reject
Expand Down Expand Up @@ -223,18 +229,19 @@ func initLimiter(limiterFunc func(internalpb.RateType) *ratelimitutil.Limiter, r
return rateLimiters
}

func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limiter, rateScope internalpb.RateScope, opType opType) {
func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limiter, limiterRange *LimiterRange) {
if node == nil {
log.Warn("update limiter failed, node is nil", zap.Any("rateScope", rateScope), zap.Any("opType", opType))
log.Warn("update limiter failed, node is nil", zap.Any("rateScope", limiterRange.RateScope), zap.Any("opType", limiterRange.OpType))
return
}
limiters := node.GetLimiters()
getRateTypes(rateScope, opType).Range(func(rt internalpb.RateType) bool {
getRateTypes(limiterRange.RateScope, limiterRange.OpType).
Complement(limiterRange.ExcludeRateTypes).Range(func(rt internalpb.RateType) bool {
originLimiter, ok := limiters.Get(rt)
if !ok {
log.Warn("update limiter failed, limiter not found",
zap.Any("rateScope", rateScope),
zap.Any("opType", opType),
zap.Any("rateScope", limiterRange.RateScope),
zap.Any("opType", limiterRange.OpType),
zap.Any("rateType", rt))
return true
}
Expand Down Expand Up @@ -552,9 +559,17 @@ func (q *QuotaCenter) collectMetrics() error {
// forceDenyWriting sets dml rates to 0 to reject all dml requests.
func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error {
log := log.Ctx(context.TODO()).WithRateGroup("quotaCenter.forceDenyWriting", 1.0, 60.0)
var excludeRange typeutil.Set[internalpb.RateType]
if errorCode == commonpb.ErrorCode_DiskQuotaExhausted {
excludeRange = typeutil.NewSet(internalpb.RateType_DMLDelete)
}
if cluster {
clusterLimiters := q.rateLimiter.GetRootLimiters()
updateLimiter(clusterLimiters, GetEarliestLimiter(), internalpb.RateScope_Cluster, dml)
updateLimiter(clusterLimiters, GetEarliestLimiter(), &LimiterRange{
RateScope: internalpb.RateScope_Cluster,
OpType: dml,
ExcludeRateTypes: excludeRange,
})
clusterLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
}

Expand All @@ -564,7 +579,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo
log.Warn("db limiter not found of db ID", zap.Int64("dbID", dbID))
continue
}
updateLimiter(dbLimiters, GetEarliestLimiter(), internalpb.RateScope_Database, dml)
updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{
RateScope: internalpb.RateScope_Database,
OpType: dml,
ExcludeRateTypes: excludeRange,
})
dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
}

Expand All @@ -581,7 +600,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo
zap.Int64("collectionID", collectionID))
continue
}
updateLimiter(collectionLimiter, GetEarliestLimiter(), internalpb.RateScope_Collection, dml)
updateLimiter(collectionLimiter, GetEarliestLimiter(), &LimiterRange{
RateScope: internalpb.RateScope_Collection,
OpType: dml,
ExcludeRateTypes: excludeRange,
})
collectionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
}

Expand All @@ -600,7 +623,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo
zap.Int64("partitionID", partitionID))
continue
}
updateLimiter(partitionLimiter, GetEarliestLimiter(), internalpb.RateScope_Partition, dml)
updateLimiter(partitionLimiter, GetEarliestLimiter(), &LimiterRange{
RateScope: internalpb.RateScope_Partition,
OpType: dml,
ExcludeRateTypes: excludeRange,
})
partitionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
}
}
Expand All @@ -624,7 +651,10 @@ func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode, cluster boo
for dbID, collectionIDToPartIDs := range q.readableCollections {
for collectionID := range collectionIDToPartIDs {
collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collectionID)
updateLimiter(collectionLimiter, GetEarliestLimiter(), internalpb.RateScope_Collection, dql)
updateLimiter(collectionLimiter, GetEarliestLimiter(), &LimiterRange{
RateScope: internalpb.RateScope_Collection,
OpType: dql,
})
collectionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToRead, errorCode)
collectionIDs = append(collectionIDs, collectionID)
}
Expand All @@ -642,7 +672,10 @@ func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode, cluster boo
log.Warn("db limiter not found of db ID", zap.Int64("dbID", dbID))
continue
}
updateLimiter(dbLimiters, GetEarliestLimiter(), internalpb.RateScope_Database, dql)
updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{
RateScope: internalpb.RateScope_Database,
OpType: dql,
})
dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToRead, errorCode)
mlog.RatedWarn(10, "QuotaCenter force to deny reading",
zap.Int64s("dbIDs", dbIDs),
Expand Down
71 changes: 68 additions & 3 deletions internal/rootcoord/quota_center_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,65 @@ func TestQuotaCenter(t *testing.T) {
}
})

// Verifies that forceDenyWriting with ErrorCode_DiskQuotaExhausted zeroes
// DML insert limits but leaves DML delete limits untouched, so clients can
// still delete data to reclaim disk space (issue milvus-io#37133).
t.Run("disk quota exhausted", func(t *testing.T) {
	qc := mocks.NewMockQueryCoordClient(t)
	meta := mockrootcoord.NewIMetaTable(t)
	meta.EXPECT().
		GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).
		Return(nil, merr.ErrCollectionNotFound).
		Maybe()

	// Two collections (IDs 1 and 2) registered under database 0.
	quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
	quotaCenter.collectionIDToDBID = typeutil.NewConcurrentMap[int64, int64]()
	quotaCenter.collectionIDToDBID.Insert(1, 0)
	quotaCenter.collectionIDToDBID.Insert(2, 0)

	quotaCenter.writableCollections = map[int64]map[int64][]int64{
		0: collectionIDToPartitionIDs,
	}
	quotaCenter.writableCollections[0][1] = append(quotaCenter.writableCollections[0][1], 1000)

	err := quotaCenter.resetAllCurrentRates()
	assert.NoError(t, err)

	// updateLimit seeds a known limit on a node; absent limiters are skipped.
	updateLimit := func(node *interalratelimitutil.RateLimiterNode, rateType internalpb.RateType, limit int64) {
		limiter, ok := node.GetLimiters().Get(rateType)
		if !ok {
			return
		}
		limiter.SetLimit(Limit(limit))
	}
	// assertLimit fails when the limiter is absent or its limit differs.
	assertLimit := func(node *interalratelimitutil.RateLimiterNode, rateType internalpb.RateType, expectValue int64) {
		limiter, ok := node.GetLimiters().Get(rateType)
		if !ok {
			assert.FailNow(t, "limiter not found")
			return
		}
		assert.EqualValues(t, expectValue, limiter.Limit())
	}

	// Seed distinguishable limits at every level: insert=10, delete=9.
	updateLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLInsert, 10)
	updateLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLDelete, 9)
	updateLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLInsert, 10)
	updateLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLDelete, 9)
	updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLInsert, 10)
	updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLDelete, 9)
	updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLInsert, 10)
	updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLDelete, 9)

	// Deny writing cluster-wide, for db 0, and for collection 1 only,
	// with the disk-quota error code that excludes DMLDelete.
	err = quotaCenter.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, []int64{0}, []int64{1}, nil)
	assert.NoError(t, err)

	// Inserts are zeroed at every targeted level; deletes keep their limit (9).
	// Collection 2 was not targeted, so its insert limit stays 10.
	assertLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLInsert, 0)
	assertLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLDelete, 9)
	assertLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLInsert, 0)
	assertLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLDelete, 9)
	assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLInsert, 0)
	assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLDelete, 9)
	assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLInsert, 10)
	assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLDelete, 9)
})

t.Run("test calculateRates", func(t *testing.T) {
forceBak := Params.QuotaConfig.ForceDenyWriting.GetValue()
paramtable.Get().Save(Params.QuotaConfig.ForceDenyWriting.Key, "false")
Expand Down Expand Up @@ -854,7 +913,7 @@ func TestQuotaCenter(t *testing.T) {
b, _ := limiters.Get(internalpb.RateType_DMLUpsert)
assert.Equal(t, Limit(0), b.Limit())
c, _ := limiters.Get(internalpb.RateType_DMLDelete)
assert.Equal(t, Limit(0), c.Limit())
assert.NotEqual(t, Limit(0), c.Limit())
}
}
}
Expand Down Expand Up @@ -1473,14 +1532,20 @@ func TestQuotaCenterSuite(t *testing.T) {

func TestUpdateLimiter(t *testing.T) {
t.Run("nil node", func(t *testing.T) {
updateLimiter(nil, nil, internalpb.RateScope_Database, dql)
updateLimiter(nil, nil, &LimiterRange{
RateScope: internalpb.RateScope_Collection,
OpType: dql,
})
})

t.Run("normal op", func(t *testing.T) {
node := interalratelimitutil.NewRateLimiterNode(internalpb.RateScope_Collection)
node.GetLimiters().Insert(internalpb.RateType_DQLSearch, ratelimitutil.NewLimiter(5, 5))
newLimit := ratelimitutil.NewLimiter(10, 10)
updateLimiter(node, newLimit, internalpb.RateScope_Collection, dql)
updateLimiter(node, newLimit, &LimiterRange{
RateScope: internalpb.RateScope_Collection,
OpType: dql,
})

searchLimit, _ := node.GetLimiters().Get(internalpb.RateType_DQLSearch)
assert.Equal(t, Limit(10), searchLimit.Limit())
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/typeutil/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (set Set[T]) Union(other Set[T]) Set[T] {

// Complement returns the complement with the given set
// (the elements of set that are not in other).
// NOTE(review): when other is nil the receiver itself is returned
// (aliased), while the non-nil path returns a fresh copy — callers
// that mutate the result observe different behavior in the two cases;
// confirm this asymmetry is intended.
func (set Set[T]) Complement(other Set[T]) Set[T] {
	// Nothing to subtract: skip the copy and return the receiver as-is.
	if other == nil {
		return set
	}
	ret := NewSet(set.Collect()...)
	ret.Remove(other.Collect()...)
	return ret
Expand Down

0 comments on commit 1cc9cb4

Please sign in to comment.