Skip to content

Commit

Permalink
ratelimit specification (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
MI-cool authored Aug 10, 2023
1 parent f1e102b commit e72071e
Show file tree
Hide file tree
Showing 16 changed files with 71 additions and 1,623 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/modern-go/reflect2 v1.0.2
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1
github.com/polarismesh/specification v1.3.2
github.com/polarismesh/specification v1.4.0-alpha.5
github.com/prometheus/client_golang v1.12.2
github.com/smartystreets/goconvey v1.7.2
github.com/spaolacci/murmur3 v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polarismesh/specification v1.3.2 h1:NG8guSTi7brxEMTG39VVmRSZeS7XvacKnrpoOAVvOtU=
github.com/polarismesh/specification v1.3.2/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
github.com/polarismesh/specification v1.4.0-alpha.5 h1:1bUiGB2DdL6wlli+TfJ2EoDLcR58fAUzm9FxKLtVBE0=
github.com/polarismesh/specification v1.4.0-alpha.5/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
Expand Down
49 changes: 25 additions & 24 deletions pkg/flow/quota/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package quota
import (
"context"
"fmt"
"github.com/polarismesh/specification/source/go/api/v1/traffic_manage/ratelimiter"
"io"
"net"
"strings"
Expand All @@ -42,17 +43,17 @@ import (
// ResponseCallBack 应答回调函数
type ResponseCallBack interface {
// OnInitResponse 应答回调函数
OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, curTimeMilli int64)
OnInitResponse(counter *ratelimiter.QuotaCounter, duration time.Duration, curTimeMilli int64)
// OnReportResponse 应答回调函数
OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64)
OnReportResponse(counter *ratelimiter.QuotaLeft, duration time.Duration, curTimeMilli int64)
}

// RateLimitMsgSender 限流消息同步器
type RateLimitMsgSender interface {
// HasInitialized 是否已经初始化
HasInitialized(svcKey model.ServiceKey, labels string) bool
// SendInitRequest 发送初始化请求
SendInitRequest(request *rlimitV2.RateLimitInitRequest, callback ResponseCallBack)
SendInitRequest(request *ratelimiter.RateLimitInitRequest, callback ResponseCallBack)
// SendReportRequest 发送上报请求
SendReportRequest(request *rlimitV2.ClientRateLimitReportRequest) error
// AdjustTime 同步时间
Expand Down Expand Up @@ -116,9 +117,9 @@ type StreamCounterSet struct {
// 客户端连接
conn *grpc.ClientConn
// 限流客户端
client rlimitV2.RateLimitGRPCV2Client
client ratelimiter.RateLimitGRPCV2Client
// 消息流
serviceStream rlimitV2.RateLimitGRPCV2_ServiceClient
serviceStream ratelimiter.RateLimitGRPCV2_ServiceClient
// 已发起初始化的窗口,初始化完毕后,value为大于0的值
initialingWindows map[CounterIdentifier]*InitializeRecord
// 回调函数
Expand Down Expand Up @@ -192,7 +193,7 @@ func (s *StreamCounterSet) createConnection() (*grpc.ClientConn, error) {

// preInitCheck 初始化操作的前置检查
func (s *StreamCounterSet) preInitCheck(
counterIdentifier CounterIdentifier, callback ResponseCallBack) rlimitV2.RateLimitGRPCV2_ServiceClient {
counterIdentifier CounterIdentifier, callback ResponseCallBack) ratelimiter.RateLimitGRPCV2_ServiceClient {
s.mutex.Lock()
defer s.mutex.Unlock()
if nil == s.conn {
Expand All @@ -205,7 +206,7 @@ func (s *StreamCounterSet) preInitCheck(
s.conn = conn
}
if nil == s.client {
s.client = rlimitV2.NewRateLimitGRPCV2Client(s.conn)
s.client = ratelimiter.NewRateLimitGRPCV2Client(s.conn)
}
if nil == s.serviceStream {
selfHost := s.asyncConnector.getIPString(s.HostIdentifier.host, s.HostIdentifier.port)
Expand Down Expand Up @@ -249,7 +250,7 @@ func createHeaderContext(headers map[string]string) context.Context {
}

// SendInitRequest 发送初始化请求
func (s *StreamCounterSet) SendInitRequest(initReq *rlimitV2.RateLimitInitRequest, callback ResponseCallBack) {
func (s *StreamCounterSet) SendInitRequest(initReq *ratelimiter.RateLimitInitRequest, callback ResponseCallBack) {
counterIdentifier := CounterIdentifier{
service: initReq.GetTarget().GetService(),
namespace: initReq.GetTarget().GetNamespace(),
Expand All @@ -260,8 +261,8 @@ func (s *StreamCounterSet) SendInitRequest(initReq *rlimitV2.RateLimitInitReques
return
}
// 发起初始化
request := &rlimitV2.RateLimitRequest{
Cmd: rlimitV2.RateLimitCmd_INIT,
request := &ratelimiter.RateLimitRequest{
Cmd: ratelimiter.RateLimitCmd_INIT,
RateLimitInitRequest: initReq,
}
if log.GetNetworkLogger().IsLevelEnabled(log.DebugLog) {
Expand All @@ -275,7 +276,7 @@ func (s *StreamCounterSet) SendInitRequest(initReq *rlimitV2.RateLimitInitReques
}

// checkAndCreateClient 检查并创建客户端
func (s *StreamCounterSet) checkAndCreateClient() (rlimitV2.RateLimitGRPCV2Client, error) {
func (s *StreamCounterSet) checkAndCreateClient() (ratelimiter.RateLimitGRPCV2Client, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
curTimeMilli := model.CurrentMillisecond()
Expand All @@ -298,7 +299,7 @@ func (s *StreamCounterSet) checkAndCreateClient() (rlimitV2.RateLimitGRPCV2Clien
s.conn = conn
}
if reflect2.IsNil(s.client) {
s.client = rlimitV2.NewRateLimitGRPCV2Client(s.conn)
s.client = ratelimiter.NewRateLimitGRPCV2Client(s.conn)
}
return s.client, nil
}
Expand Down Expand Up @@ -357,7 +358,7 @@ func (s *StreamCounterSet) AdjustTime() int64 {
}
ctx, cancel := context.WithTimeout(context.Background(), s.asyncConnector.msgTimeout)
defer cancel()
timeResp, err := client.TimeAdjust(ctx, &rlimitV2.TimeAdjustRequest{})
timeResp, err := client.TimeAdjust(ctx, &ratelimiter.TimeAdjustRequest{})
atomic.StoreInt64(&s.lastSyncTimeMilli, model.CurrentMillisecond())
if err != nil {
log.GetNetworkLogger().Errorf("[RateLimit]fail to send timeAdjust message to %s:%d, key is %s, err is %v",
Expand Down Expand Up @@ -386,7 +387,7 @@ func (s *StreamCounterSet) closeConnection() {
}

// cleanup 清理stream
func (s *StreamCounterSet) cleanup(serviceStream rlimitV2.RateLimitGRPCV2_ServiceClient) {
func (s *StreamCounterSet) cleanup(serviceStream ratelimiter.RateLimitGRPCV2_ServiceClient) {
s.asyncConnector.dropStreamCounterSet(s, serviceStream)
s.closeConnection()
}
Expand All @@ -406,7 +407,7 @@ func IsSuccess(code uint32) bool {
}

// updateByInitResp 通过初始化应答来更新
func (s *StreamCounterSet) updateByInitResp(identifier CounterIdentifier, initResp *rlimitV2.RateLimitInitResponse) {
func (s *StreamCounterSet) updateByInitResp(identifier CounterIdentifier, initResp *ratelimiter.RateLimitInitResponse) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.clientKey = initResp.GetClientKey()
Expand All @@ -427,7 +428,7 @@ func (s *StreamCounterSet) updateByInitResp(identifier CounterIdentifier, initRe
}

// processInitResponse 处理初始化应答
func (s *StreamCounterSet) processInitResponse(initResp *rlimitV2.RateLimitInitResponse) bool {
func (s *StreamCounterSet) processInitResponse(initResp *ratelimiter.RateLimitInitResponse) bool {
target := initResp.GetTarget()
identifier := CounterIdentifier{
service: target.GetService(),
Expand All @@ -454,7 +455,7 @@ func (s *StreamCounterSet) processInitResponse(initResp *rlimitV2.RateLimitInitR
}

// processReportResponse 处理上报的应答
func (s *StreamCounterSet) processReportResponse(reportRsp *rlimitV2.RateLimitReportResponse) bool {
func (s *StreamCounterSet) processReportResponse(reportRsp *ratelimiter.RateLimitReportResponse) bool {
if IsSuccess(reportRsp.GetCode()) {
s.mutex.RLock()
nowMilli := model.CurrentMillisecond()
Expand All @@ -474,7 +475,7 @@ func (s *StreamCounterSet) processReportResponse(reportRsp *rlimitV2.RateLimitRe
}

// processResponse 处理应答消息
func (s *StreamCounterSet) processResponse(serviceStream rlimitV2.RateLimitGRPCV2_ServiceClient) {
func (s *StreamCounterSet) processResponse(serviceStream ratelimiter.RateLimitGRPCV2_ServiceClient) {
defer s.cleanup(serviceStream)
for {
resp, err := serviceStream.Recv()
Expand All @@ -486,7 +487,7 @@ func (s *StreamCounterSet) processResponse(serviceStream rlimitV2.RateLimitGRPCV
return
}
switch resp.Cmd {
case rlimitV2.RateLimitCmd_INIT:
case ratelimiter.RateLimitCmd_INIT:
initResp := resp.GetRateLimitInitResponse()
if log.GetNetworkLogger().IsLevelEnabled(log.DebugLog) {
initRspStr, _ := (&jsonpb.Marshaler{}).MarshalToString(initResp)
Expand All @@ -495,7 +496,7 @@ func (s *StreamCounterSet) processResponse(serviceStream rlimitV2.RateLimitGRPCV
if !s.processInitResponse(initResp) {
return
}
case rlimitV2.RateLimitCmd_ACQUIRE:
case ratelimiter.RateLimitCmd_ACQUIRE:
reportResp := resp.GetRateLimitReportResponse()
if log.GetNetworkLogger().IsLevelEnabled(log.DebugLog) {
reportRspStr, _ := (&jsonpb.Marshaler{}).MarshalToString(reportResp)
Expand Down Expand Up @@ -527,7 +528,7 @@ func (s *StreamCounterSet) SendReportRequest(clientReportReq *rlimitV2.ClientRat
if nil == record {
return fmt.Errorf("fail to find initialingWindow, identifier is %s", identifier)
}
reportReq := &rlimitV2.RateLimitReportRequest{}
reportReq := &ratelimiter.RateLimitReportRequest{}
reportReq.ClientKey = s.clientKey
// 转换系统时间
reportReq.Timestamp = clientReportReq.Timestamp
Expand All @@ -540,8 +541,8 @@ func (s *StreamCounterSet) SendReportRequest(clientReportReq *rlimitV2.ClientRat
reportReq.QuotaUses = append(reportReq.QuotaUses, sum)
}
// 发起上报调用
request := &rlimitV2.RateLimitRequest{
Cmd: rlimitV2.RateLimitCmd_ACQUIRE,
request := &ratelimiter.RateLimitRequest{
Cmd: ratelimiter.RateLimitCmd_ACQUIRE,
RateLimitReportRequest: reportReq,
}
if log.GetNetworkLogger().IsLevelEnabled(log.DebugLog) {
Expand Down Expand Up @@ -635,7 +636,7 @@ func NewAsyncRateLimitConnector(valueCtx model.ValueContext, cfg config.Configur

// dropStreamCounterSet 淘汰流管理器
func (a *asyncRateLimitConnector) dropStreamCounterSet(
streamCounterSet *StreamCounterSet, serviceStream rlimitV2.RateLimitGRPCV2_ServiceClient) {
streamCounterSet *StreamCounterSet, serviceStream ratelimiter.RateLimitGRPCV2_ServiceClient) {
a.mutex.Lock()
defer a.mutex.Unlock()
existStream := a.streams[*streamCounterSet.HostIdentifier]
Expand Down
17 changes: 9 additions & 8 deletions pkg/flow/quota/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/modern-go/reflect2"
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
slimiter "github.com/polarismesh/specification/source/go/api/v1/traffic_manage/ratelimiter"

"github.com/polarismesh/polaris-go/pkg/config"
"github.com/polarismesh/polaris-go/pkg/flow/data"
Expand Down Expand Up @@ -504,7 +505,7 @@ func (r *RateLimitWindow) Init() {
}

func (r *RateLimitWindow) buildInitTargetStr() string {
target := rlimitV2.LimitTarget{
target := slimiter.LimitTarget{
Namespace: r.SvcKey.Namespace,
Service: r.SvcKey.Service,
Labels: r.Labels,
Expand All @@ -523,19 +524,19 @@ func (r *RateLimitWindow) AsyncRateLimitConnector() AsyncRateLimitConnector {
}

// InitializeRequest 转换成限流PB初始化消息
func (r *RateLimitWindow) InitializeRequest() *rlimitV2.RateLimitInitRequest {
func (r *RateLimitWindow) InitializeRequest() *slimiter.RateLimitInitRequest {
clientID := r.Engine().GetContext().GetClientId()
initReq := &rlimitV2.RateLimitInitRequest{}
initReq := &slimiter.RateLimitInitRequest{}
initReq.ClientId = clientID
initReq.Target = &rlimitV2.LimitTarget{}
initReq.Target = &slimiter.LimitTarget{}
initReq.Target.Namespace = r.SvcKey.Namespace
initReq.Target.Service = r.SvcKey.Service
initReq.Target.Labels = r.Labels

quotaMode := rlimitV2.QuotaMode(r.Rule.GetAmountMode())
quotaMode := slimiter.QuotaMode(r.Rule.GetAmountMode())
tokenBuckets := r.trafficShapingBucket.GetAmountInfos()
for _, tokenBucket := range tokenBuckets {
quotaTotal := &rlimitV2.QuotaTotal{
quotaTotal := &slimiter.QuotaTotal{
Mode: quotaMode,
Duration: tokenBucket.ValidDuration,
MaxAmount: tokenBucket.MaxAmount,
Expand Down Expand Up @@ -567,14 +568,14 @@ func (r *RateLimitWindow) acquireRequest() *rlimitV2.ClientRateLimitReportReques
Service: r.SvcKey.Service,
Namespace: r.SvcKey.Namespace,
Labels: r.Labels,
QuotaUsed: make(map[time.Duration]*rlimitV2.QuotaSum),
QuotaUsed: make(map[time.Duration]*slimiter.QuotaSum),
}
curTimeMilli := r.toServerTimeMilli(model.CurrentMillisecond())
usageInfo := r.trafficShapingBucket.GetQuotaUsed(curTimeMilli)
reportReq.Timestamp = usageInfo.CurTimeMilli
if len(usageInfo.Passed) > 0 {
for durationMilli, passed := range usageInfo.Passed {
reportReq.QuotaUsed[time.Duration(durationMilli)*time.Millisecond] = &rlimitV2.QuotaSum{
reportReq.QuotaUsed[time.Duration(durationMilli)*time.Millisecond] = &slimiter.QuotaSum{
Used: passed,
Limited: usageInfo.Limited[durationMilli],
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/flow/quota/window_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"time"

apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
slimiter "github.com/polarismesh/specification/source/go/api/v1/traffic_manage/ratelimiter"

"github.com/polarismesh/polaris-go/pkg/log"
"github.com/polarismesh/polaris-go/pkg/model"
rlimitV2 "github.com/polarismesh/polaris-go/pkg/model/pb/metric/v2"
"github.com/polarismesh/polaris-go/pkg/plugin/ratelimiter"
)

Expand Down Expand Up @@ -79,7 +79,7 @@ func (r *RateLimitWindow) DoAsyncRemoteAcquire() error {
}

// OnInitResponse 应答回调函数
func (r *RateLimitWindow) OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, srvTimeMilli int64) {
func (r *RateLimitWindow) OnInitResponse(counter *slimiter.QuotaCounter, duration time.Duration, srvTimeMilli int64) {
r.SetStatus(Initialized)
log.GetBaseLogger().Infof("[RateLimit]window %s changed to initialized", r.uniqueKey)
r.trafficShapingBucket.OnRemoteUpdate(ratelimiter.RemoteQuotaResult{
Expand All @@ -92,7 +92,7 @@ func (r *RateLimitWindow) OnInitResponse(counter *rlimitV2.QuotaCounter, duratio
}

// OnReportResponse 应答回调函数
func (r *RateLimitWindow) OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64) {
func (r *RateLimitWindow) OnReportResponse(counter *slimiter.QuotaLeft, duration time.Duration, curTimeMilli int64) {
r.trafficShapingBucket.OnRemoteUpdate(ratelimiter.RemoteQuotaResult{
Left: counter.GetLeft(),
ClientCount: counter.GetClientCount(),
Expand Down
Loading

0 comments on commit e72071e

Please sign in to comment.