diff --git a/examples/ratelimit/consumer/main.go b/examples/ratelimit/consumer/main.go index ed31d402..1c29ca5f 100644 --- a/examples/ratelimit/consumer/main.go +++ b/examples/ratelimit/consumer/main.go @@ -96,7 +96,7 @@ func (svr *PolarisConsumer) runWebServer() { if err != nil { log.Printf("[errot] send request to %s:%d fail : %s", instance.GetHost(), instance.GetPort(), err) rw.WriteHeader(http.StatusInternalServerError) - _, _ = rw.Write([]byte(fmt.Sprintf("[errot] send request to %s:%d fail : %s", instance.GetHost(), instance.GetPort(), err))) + _, _ = rw.Write([]byte(fmt.Sprintf("[error] send request to %s:%d fail : %s", instance.GetHost(), instance.GetPort(), err))) time.Sleep(time.Millisecond * time.Duration(rand.Intn(10))) delay := time.Since(start) diff --git a/examples/ratelimit/provider/main.go b/examples/ratelimit/provider/main.go index 5267a6ec..4d61cbbe 100644 --- a/examples/ratelimit/provider/main.go +++ b/examples/ratelimit/provider/main.go @@ -20,6 +20,8 @@ package main import ( "flag" "fmt" + "github.com/polarismesh/polaris-go" + "github.com/polarismesh/polaris-go/pkg/model" "log" "net" "net/http" @@ -28,9 +30,6 @@ import ( "strings" "syscall" "time" - - "github.com/polarismesh/polaris-go" - "github.com/polarismesh/polaris-go/pkg/model" ) var ( @@ -81,22 +80,16 @@ func (svr *PolarisProvider) runWebServer() { quotaReq.SetNamespace(namespace) quotaReq.SetService(service) - log.Printf("[info] get quota req : ns=%s, svc=%s, method=%v, labels=%v", - quotaReq.GetNamespace(), quotaReq.GetService(), quotaReq.GetMethod(), quotaReq.GetLabels()) - start := time.Now() + //log.Printf("[info] get quota req : ns=%s, svc=%s, method=%v, labels=%v", + // quotaReq.GetNamespace(), quotaReq.GetService(), quotaReq.GetMethod(), quotaReq.GetLabels()) + //start := time.Now() resp, err := svr.limiter.GetQuota(quotaReq) if err != nil { rw.WriteHeader(http.StatusInternalServerError) _, _ = rw.Write([]byte(fmt.Sprintf("[error] fail to GetQuota, err is %v", err))) return } - log.Printf("[info] %s get quota resp : code=%d, info=%s", time.Since(start).String(), resp.Get().Code, resp.Get().Info) - - if err != nil { - rw.WriteHeader(http.StatusInternalServerError) - _, _ = rw.Write([]byte(fmt.Sprintf("[error] fail to GetQuota, err is %v", err))) - return - } + //log.Printf("[info] %s get quota resp : code=%d, info=%s", time.Since(start).String(), resp.Get().Code, resp.Get().Info) if resp.Get().Code != model.QuotaResultOk { rw.WriteHeader(http.StatusTooManyRequests) @@ -106,6 +99,10 @@ func (svr *PolarisProvider) runWebServer() { rw.WriteHeader(http.StatusOK) _, _ = rw.Write([]byte(fmt.Sprintf("Hello, I'm RateLimitEchoServer Provider, My host : %s:%d", svr.host, svr.port))) + + // 模拟远端处理耗时 + time.Sleep(time.Millisecond * 500) + resp.Release() }) ln, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) @@ -117,7 +114,9 @@ func (svr *PolarisProvider) runWebServer() { go func() { log.Printf("[INFO] start http server, listen port is %v", svr.port) - if err := http.Serve(ln, nil); err != nil { + s := &http.Server{} + s.SetKeepAlivesEnabled(false) + if err := s.Serve(ln); err != nil { log.Fatalf("[ERROR]fail to run webServer, err is %v", err) } }() diff --git a/go.mod b/go.mod index 60242425..01acd169 100644 --- a/go.mod +++ b/go.mod @@ -21,9 +21,10 @@ require ( github.com/pkg/errors v0.9.1 github.com/polarismesh/specification v1.4.1 github.com/prometheus/client_golang v1.12.2 + github.com/shirou/gopsutil/v3 v3.23.8 github.com/smartystreets/goconvey v1.7.2 github.com/spaolacci/murmur3 v1.1.0 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.21.0 google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a // indirect google.golang.org/grpc v1.51.0 diff --git a/go.sum b/go.sum index 0524add4..e103ecfe 100644 --- a/go.sum +++ b/go.sum @@ -229,6 +229,8 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -292,8 +294,9 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= -github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -360,6 +363,8 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= @@ -382,6 +387,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/polarismesh/specification v1.4.1 h1:lTZqeyUhhWuKyr6NDKBwmUrNfcUDvKLxWT/uOq71T5A= github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= @@ -406,6 +413,12 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/shirou/gopsutil/v3 v3.23.8 h1:xnATPiybo6GgdRoC4YoGnxXZFRc3dqQTGi73oLvvBrE= +github.com/shirou/gopsutil/v3 v3.23.8/go.mod h1:7hmCaBn+2ZwaZOr6jmPBZDfawwMGuo1id3C6aM8EDqQ= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -428,14 +441,20 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -595,6 +614,7 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -618,6 +638,7 @@ golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -654,9 +675,11 @@ golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220906165534-d0df966e6959 h1:qSa+Hg9oBe6UJXrznE+yYvW51V9UbyIj/nj/KpDigo8= golang.org/x/sys v0.0.0-20220906165534-d0df966e6959/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/config/default.go b/pkg/config/default.go index b7b71de1..645a2e22 100644 --- a/pkg/config/default.go +++ b/pkg/config/default.go @@ -211,6 +211,8 @@ const ( DefaultUniformRateLimiter = "unirate" // DefaultWarmUpWaitLimiter 默认限流插件,预热匀速. DefaultWarmUpWaitLimiter = "warmup-wait" + // DefaultBBRRateLimiter 默认的 CPU 自适应限流器. 使用 BBR 算法 + DefaultBBRRateLimiter = "bbr" // SubscribeLocalChannel 默认订阅事件处理插件. SubscribeLocalChannel = "subscribeLocalChannel" diff --git a/pkg/flow/quota/assist.go b/pkg/flow/quota/assist.go index 895fe7a2..74b2fb71 100644 --- a/pkg/flow/quota/assist.go +++ b/pkg/flow/quota/assist.go @@ -256,19 +256,35 @@ func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest return model.QuotaFutureWithResponse(resp), nil } var maxWaitMs int64 = 0 + var releaseFuncs = make([]model.ReleaseFunc, 0, len(windows)) for _, window := range windows { window.Init() quotaResult := window.AllocateQuota(commonRequest) + if quotaResult == nil { + continue + } + for _, releaseFunc := range quotaResult.ReleaseFuncs { + if releaseFunc != nil { + releaseFuncs = append(releaseFuncs, releaseFunc) + } + } + // 触发限流,提前返回 if quotaResult.Code == model.QuotaResultLimited { + // 先释放资源 + for i := range releaseFuncs { + releaseFuncs[i]() + } return model.QuotaFutureWithResponse(quotaResult), nil } + // 未触发限流,记录令牌桶的最大排队时间 if quotaResult.WaitMs > maxWaitMs { maxWaitMs = quotaResult.WaitMs } } return model.QuotaFutureWithResponse(&model.QuotaResponse{ - Code: model.QuotaResultOk, - WaitMs: maxWaitMs, + Code: model.QuotaResultOk, + WaitMs: maxWaitMs, + ReleaseFuncs: releaseFuncs, }), nil } diff --git a/pkg/model/quota.go b/pkg/model/quota.go index 2901f000..b0289885 100644 --- a/pkg/model/quota.go +++ b/pkg/model/quota.go @@ -156,6 +156,8 @@ const ( QuotaResultLimited QuotaResultCode = -1 ) +type ReleaseFunc func() + // QuotaResponse 配额查询应答. type QuotaResponse struct { // 配额分配的返回码 @@ -164,6 +166,8 @@ type QuotaResponse struct { Info string // 需要等待的时间段 WaitMs int64 + // 释放资源函数 + ReleaseFuncs []ReleaseFunc } // QuotaFutureImpl 异步获取配额的future. @@ -180,7 +184,10 @@ func QuotaFutureWithResponse(resp *QuotaResponse) *QuotaFutureImpl { deadlineCtx, cancel = context.WithTimeout(context.Background(), time.Duration(resp.WaitMs)*time.Millisecond) } return &QuotaFutureImpl{ - resp: resp, deadlineCtx: deadlineCtx, cancel: cancel} + resp: resp, + deadlineCtx: deadlineCtx, + cancel: cancel, + } } // Done 分配是否结束. @@ -204,8 +211,13 @@ func (q *QuotaFutureImpl) Get() *QuotaResponse { return q.resp } -// Release 释放资源,仅用于并发数限流的场景. +// Release 释放资源,仅用于并发数限流/CPU限流场景 func (q *QuotaFutureImpl) Release() { + if q.resp != nil { + for i := range q.resp.ReleaseFuncs { + q.resp.ReleaseFuncs[i]() + } + } } const ( diff --git a/pkg/plugin/register/plugins.go b/pkg/plugin/register/plugins.go index 06468775..30d0b673 100644 --- a/pkg/plugin/register/plugins.go +++ b/pkg/plugin/register/plugins.go @@ -45,6 +45,7 @@ import ( _ "github.com/polarismesh/polaris-go/plugin/location" _ "github.com/polarismesh/polaris-go/plugin/logger/zaplog" _ "github.com/polarismesh/polaris-go/plugin/metrics/prometheus" + _ "github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr" _ "github.com/polarismesh/polaris-go/plugin/ratelimiter/reject" _ "github.com/polarismesh/polaris-go/plugin/ratelimiter/unirate" _ "github.com/polarismesh/polaris-go/plugin/serverconnector/grpc" diff --git a/plugin/ratelimiter/bbr/README.md b/plugin/ratelimiter/bbr/README.md new file mode 100644 index 00000000..c9ca9d07 --- /dev/null +++ b/plugin/ratelimiter/bbr/README.md @@ -0,0 +1,57 @@ +# BBR +BBR CPU 自适应限流组件参考了 [TCP BBR](https://en.wikipedia.org/wiki/TCP_congestion_control#TCP_BBR) 的思想,以及 [阿里 Sentinel](https://github.com/alibaba/Sentinel/wiki/系统自适应限流) 的算法。 + +传统的限流思路为:超过一定负载就拦截流量进入,负载恢复就放开流量,这样做有延迟性,最终是按照果来调节因,无法取得良好效果。 + +BBR 的思路为:根据应用的请求处理时间、请求成功数、最大并发数这些指标,计算当前应用能承载的最大并发请求量,再对比当前系统并发量,判断是否应当拦截本次流量,即所谓"自适应"。 + +BBR 的源码实现可参考: +- [乔卓越 - 深入理解云原生下自适应限流技术原理与应用](https://mp.weixin.qq.com/s?__biz=MzA4ODg0NDkzOA==&mid=2247493581&idx=1&sn=62feb928915eaeb9082b58737829cf19&chksm=90215828a756d13e36cf77fe980f90810236296e5289259e085ccda7a539979c0716b5c8a601&scene=126&sessionid=1634901423&key=92c891f823e779d59fb069c1f73467971e77ea57597e6305cd46077c5115f63362682acb7d71e10dce269b227d3823d11ef9e5ce4116448fda19babccca00938bb97159b2d212d3a739c461a317a413867734e4ff39439da6d669943638ebb44fb5d44b939ae294b9b2eb42fa68fe939e1e4b21d8d806bf0299ecfea6bfa0c80&ascene=0&uin=MTg4NzU0NzUzNw%3D%3D&devicetype=Windows+10+x64&version=63040026&lang=zh_CN&exportkey=AyWZkGTg8xpxVEKYWHzdFvE%3D&pass_ticket=aPS1JJrPDslKIxzL8eyKwCG9loYdUIDyJU6iO22glE0yHlC3foSNMEFaklAFVWTj&wx_header=0&fontgear=2#) +- [yuemoxi - 从kratos分析BBR限流源码实现](https://juejin.cn/post/7004848252109455368) + + + +# 插件设计 +本插件将 BBR 限流器适配成 `QuotaBucket` 接口(主要实现 `GetQuota` 判断限流方法),以及 `ServiceRateLimiter` 接口(实现 `InitQuota` 初始化方法)。 + +由于 CPU 使用率指标为实例单机指标,因此 CPU 限流只适用于单机限流,不适用于分布式限流,未实现分布式限流器需要实现的接口。 + + +## 初始化 InitQuota +kratos - BBR 初始化需要三个入参: +``` +CPUThreshold: CPU使用率阈值,超过该阈值时,根据应用过去能承受的负载判断是否拦截流量 +window: 窗口采样时长,控制采样多久的数据 +bucket: 桶数,BBR 会把 window 分成多个 bucket,沿时间轴向前滑动。如 window=1s, bucket=10 时,整个滑动窗口用来保存最近 1s 的采样数据,每个小的桶用来保存 100ms 的采样数据。当时间流动之后,过期的桶会自动被新桶的数据覆盖掉 +``` +这三个入参,从 `apitraffic.Rule` 结构体中解析,直接使用了结构体中的 `MaxAmount`、`ValidDuration`、`Precision` 字段 + + +## 判断限流 GetQuota +调用了 BBR 的 `Allow()` 方法 + +其内部执行 `shouldDrop()` 方法,其执行流程如下: + +![img.jpg](img.jpg) + +流程中比较关键的一步是计算应用可承受的最大请求量,由下列方法计算: +```go +func (l *BBR) maxInFlight() int64 { + return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5) +} +``` +- `maxPass * bucketPerSecond / 1000` 为每毫秒处理的请求数 +- `l.minRT()` 为 单个采样窗口中最小的响应时间 +- 0.5为向上取整 +- 当CPU利用率过载时,就需要通过上述预期公式进行干预。在服务运行期间持续统计当前服务的请求数,即 `inFlight`,通过在滑动窗口内的所有buckets中比较得出最多请求完成数 `maxPass`,以及最小的耗时 `minRT`,相乘就得出了预期的最佳请求数 `maxInFlight`。 +- `maxInFlight` 表示系统能同时处理的最多请求数,这个水位是一个平衡点,保持该水位可以最大化系统的处理能力,超过该水位则会导致请求堆积。 +- 通过 `inFlight` 与 `maxInFlight` 对比,如果前者大于后者那么就已经过载,进而拒绝后续到来的请求防止服务过载。 + +## 代码结构 +```go +├── core BBR核心算法实现 +├── cpu CPU使用率采集相关实现 +├── window 滑动窗口相关实现 +├── bucket.go 实现 `QuotaBucket` 接口 +├── plugin.go 实现 `Plugin` 接口 +``` diff --git a/plugin/ratelimiter/bbr/bucket.go b/plugin/ratelimiter/bbr/bucket.go new file mode 100644 index 00000000..5676b5c2 --- /dev/null +++ b/plugin/ratelimiter/bbr/bucket.go @@ -0,0 +1,117 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package bbr + +import ( + "github.com/polarismesh/polaris-go/pkg/model" + "github.com/polarismesh/polaris-go/pkg/plugin/ratelimiter" + "github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/core" + "sort" + + apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage" +) + +var ( + denyResp = &model.QuotaResponse{ + Code: model.QuotaResultLimited, + } +) + +// BBRQuotaBucket 实现 BBRQuotaBucket 接口的结构体 +type BBRQuotaBucket struct { + BBR *core.BBR +} + +// GetQuota 获取限额 +func (b *BBRQuotaBucket) GetQuota(_ int64, _ uint32) *model.QuotaResponse { + release, allow := b.BBR.Allow() + if allow { + return &model.QuotaResponse{ + Code: model.QuotaResultOk, + ReleaseFuncs: []model.ReleaseFunc{release}, + } + } + return denyResp +} + +// Release 释放配额(仅对于并发数限流有用) +func (l *BBRQuotaBucket) Release() { + +} + +// OnRemoteUpdate 远端更新的时候通知。CPU限流为单机限流策略,不实现该函数 +func (b *BBRQuotaBucket) OnRemoteUpdate(_ ratelimiter.RemoteQuotaResult) { + +} + +// GetQuotaUsed 返回本地限流信息用于上报 +func (b *BBRQuotaBucket) GetQuotaUsed(_ int64) ratelimiter.UsageInfo { + return ratelimiter.UsageInfo{} +} + +// GetAmountInfos 获取规则的限流阈值信息,用于与服务端pb交互 +func (b *BBRQuotaBucket) GetAmountInfos() []ratelimiter.AmountInfo { + return nil +} + +// createBBRPlugin 初始化 +func createBBRPlugin(rule *apitraffic.Rule) *BBRQuotaBucket { + options := make([]core.Option, 0) + + if amounts := rule.GetAmounts(); len(amounts) > 0 { + // 如果有多条规则: + // 1. 先按CPU阈值比较,阈值小的生效 + // 2. 阈值相同时,按时间窗口比较,窗口小的生效 + // 3. 窗口也相同时,按精度比较,精度大的生效(polaris-server 做了校验,不会出现窗口相同的情况。这里也可以不用判断) + sort.Slice(amounts, func(i, j int) bool { + a, b := amounts[i], amounts[j] + threshold1, threshold2 := a.GetMaxAmount().GetValue(), b.GetMaxAmount().GetValue() + window1, window2 := a.GetValidDuration().AsDuration(), b.GetValidDuration().AsDuration() + precision1, precision2 := a.GetPrecision().GetValue(), b.GetPrecision().GetValue() + + if threshold1 == threshold2 { + if window1 == window2 { + return precision1 > precision2 + } + return window1 < window2 + } + return threshold1 < threshold2 + }) + + amount := amounts[0] + + // CPU使用率阈值,默认80% + if threshold := amount.GetMaxAmount().GetValue(); threshold > 0 { + // bbr 的参数为 800‰ 的形式,需要从 rule 中的百分号转到千分号,因此这里乘10 + options = append(options, core.WithCPUThreshold(int64(threshold*10))) + } + // 统计时间窗口,默认 10s + if window := amount.GetValidDuration().AsDuration(); window > 0 { + options = append(options, core.WithWindow(window)) + } + // 观测时间窗口内 计数桶 的个数(控制滑动窗口精度),默认100个 + // 如 window=1s, bucket=10 时,整个滑动窗口用来保存最近 1s 的采样数据,每个小的桶用来保存 100ms 的采样数据。当时间流动之后,过期的桶会自动被新桶的数据覆盖掉 + if precision := amount.GetPrecision().GetValue(); precision > 0 { + options = append(options, core.WithBucket(int(precision))) + } + } + + return &BBRQuotaBucket{ + BBR: core.NewLimiter(options...), + } +} diff --git a/plugin/ratelimiter/bbr/config.go b/plugin/ratelimiter/bbr/config.go new file mode 100644 index 00000000..aa56e9ae --- /dev/null +++ b/plugin/ratelimiter/bbr/config.go @@ -0,0 +1,29 @@ +package bbr + +import ( + "fmt" + "time" +) + +type Config struct { + CPUSampleInterval time.Duration // CPU使用率采样间隔 + Decay int // 加权平均系数 +} + +// SetDefault 设置默认值 +func (c *Config) SetDefault() { + if c.CPUSampleInterval <= 0 { + c.CPUSampleInterval = time.Millisecond * 500 + } +} + +// Verify 校验配置值 +func (c *Config) Verify() error { + if c.CPUSampleInterval <= 0 { + return fmt.Errorf("Invalid CPUSampleInterval") + } + if c.Decay < 0 || c.Decay > 100 { + return fmt.Errorf("Invalid Decay") + } + return nil +} diff --git a/plugin/ratelimiter/bbr/core/bbr.go b/plugin/ratelimiter/bbr/core/bbr.go new file mode 100644 index 00000000..63c36eae --- /dev/null +++ b/plugin/ratelimiter/bbr/core/bbr.go @@ -0,0 +1,346 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package core + +import ( + "github.com/polarismesh/polaris-go/pkg/log" + "github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/cpu" + "github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/window" + "math" + "runtime" + "sync/atomic" + "time" +) + +var ( + gCPU int64 // 当前 CPU 使用率 + decay = 0.95 // 加权平均系数 + ticker = time.NewTicker(time.Millisecond * 500) // 定时任务定时器 +) + +type ( + cpuGetter func() int64 + + // Option function for bbr limiter + Option func(*options) +) + +// GetEMACPUUsage 获取 CPU 使用率(百分数,加权平均值) +func GetEMACPUUsage() float64 { + return float64(atomic.LoadInt64(&gCPU)) / 10 +} + +// SetDecay 设置加权平均系数 +func SetDecay(decayInt int) { + log.GetBaseLogger().Infof("bbr limiter decay is set to: %v", decayInt) + decay = float64(decayInt) / 100 +} + +// SetInterval 设置定时任务间隔 +func SetInterval(interval time.Duration) { + log.GetBaseLogger().Infof("bbr ticker interval is set to: %v", interval) + ticker.Reset(interval) +} + +// CollectCPUStat 定期更新 CPU 平均使用率 +// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay) +func CollectCPUStat() { + defer func() { + ticker.Stop() + if r := recover(); r != nil { + buf := make([]byte, 1<<18) + n := runtime.Stack(buf, false) + log.GetBaseLogger().Errorf("bbr limiter panic recovered: %v.\nruntime stack: %s", r, buf[0:n]) + } + }() + + // EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863 + for range ticker.C { + stat := &cpu.Stat{} + cpu.ReadStat(stat) + stat.Usage = min(stat.Usage, 1000) + prevCPU := atomic.LoadInt64(&gCPU) + curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay)) + atomic.StoreInt64(&gCPU, curCPU) + } +} + +func min(l, r uint64) uint64 { + if l < r { + return l + } + return r +} + +// Stat contains the metrics snapshot of bbr. +type Stat struct { + CPU int64 + InFlight int64 + MaxInFlight int64 + MinRt int64 + MaxPass int64 +} + +// counterCache is used to cache maxPASS and minRt result. +// Value of current bucket is not counted in real time. +// Cache time is equal to a bucket duration. +type counterCache struct { + val int64 + time time.Time +} + +// options of bbr limiter. +type options struct { + // WindowSize defines time duration per window + Window time.Duration + // BucketNum defines bucket number for each window + Bucket int + // CPUThreshold + CPUThreshold int64 + // CPUQuota + CPUQuota float64 +} + +// WithWindow with window size. +func WithWindow(d time.Duration) Option { + return func(o *options) { + o.Window = d + } +} + +// WithBucket with bucket ize. +func WithBucket(b int) Option { + return func(o *options) { + o.Bucket = b + } +} + +// WithCPUThreshold with cpu threshold; +func WithCPUThreshold(threshold int64) Option { + return func(o *options) { + o.CPUThreshold = threshold + } +} + +// WithCPUQuota with real cpu quota(if it can not collect from process correct); +func WithCPUQuota(quota float64) Option { + return func(o *options) { + o.CPUQuota = quota + } +} + +// BBR implements bbr-like limiter. +// It is inspired by sentinel. +// https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81 +type BBR struct { + cpu cpuGetter + passStat window.RollingCounter + rtStat window.RollingCounter + inFlight int64 + bucketPerSecond int64 + bucketDuration time.Duration + + // prevDropTime defines previous start drop since initTime + prevDropTime atomic.Value + maxPASSCache atomic.Value + minRtCache atomic.Value + + opts options +} + +// NewLimiter returns a bbr limiter +func NewLimiter(opts ...Option) *BBR { + opt := options{ + Window: time.Second * 10, + Bucket: 100, + CPUThreshold: 800, + } + for _, o := range opts { + o(&opt) + } + + bucketDuration := opt.Window / time.Duration(opt.Bucket) + passStat := window.NewRollingCounter(window.RollingCounterOpts{Size: opt.Bucket, BucketDuration: bucketDuration}) + rtStat := window.NewRollingCounter(window.RollingCounterOpts{Size: opt.Bucket, BucketDuration: bucketDuration}) + + limiter := &BBR{ + opts: opt, + passStat: passStat, + rtStat: rtStat, + bucketDuration: bucketDuration, + bucketPerSecond: int64(time.Second / bucketDuration), + cpu: func() int64 { return atomic.LoadInt64(&gCPU) }, + } + + if opt.CPUQuota != 0 { + // if cpuQuota is set, use new cpuGetter,Calculate the real CPU value based on the number of CPUs and Quota. + limiter.cpu = func() int64 { + return int64(float64(atomic.LoadInt64(&gCPU)) * float64(runtime.NumCPU()) / opt.CPUQuota) + } + } + + return limiter +} + +func (l *BBR) maxPASS() int64 { + passCache := l.maxPASSCache.Load() + if passCache != nil { + ps := passCache.(*counterCache) + if l.timespan(ps.time) < 1 { + return ps.val + } + } + rawMaxPass := int64(l.passStat.Reduce(func(iterator window.Iterator) float64 { + var result = 1.0 + for i := 1; iterator.Next() && i < l.opts.Bucket; i++ { + bucket := iterator.Bucket() + count := 0.0 + for _, p := range bucket.Points { + count += p + } + result = math.Max(result, count) + } + return result + })) + l.maxPASSCache.Store(&counterCache{ + val: rawMaxPass, + time: time.Now(), + }) + return rawMaxPass +} + +// timespan returns the passed bucket count +// since lastTime, if it is one bucket duration earlier than +// the last recorded time, it will return the BucketNum. +func (l *BBR) timespan(lastTime time.Time) int { + v := int(time.Since(lastTime) / l.bucketDuration) + if v > -1 { + return v + } + return l.opts.Bucket +} + +func (l *BBR) minRT() int64 { + rtCache := l.minRtCache.Load() + if rtCache != nil { + rc := rtCache.(*counterCache) + if l.timespan(rc.time) < 1 { + return rc.val + } + } + rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator window.Iterator) float64 { + var result = math.MaxFloat64 + for i := 1; iterator.Next() && i < l.opts.Bucket; i++ { + bucket := iterator.Bucket() + if len(bucket.Points) == 0 { + continue + } + total := 0.0 + for _, p := range bucket.Points { + total += p + } + avg := total / float64(bucket.Count) + result = math.Min(result, avg) + } + return result + }))) + if rawMinRT <= 0 { + rawMinRT = 1 + } + l.minRtCache.Store(&counterCache{ + val: rawMinRT, + time: time.Now(), + }) + return rawMinRT +} + +// maxInFlight 每毫秒能同时处理的最多请求数 +// maxPass * bucketPerSecond / 1000 为每毫秒处理的请求数 +// l.minRT() 为 单个采样窗口中最小的响应时间 +// 0.5为向上取整 +func (l *BBR) maxInFlight() int64 { + return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5) +} + +// shouldDrop 是否应当抛弃本次请求 +// 在服务运行期间持续统计当前服务的并发处理请求数,即 inFlight,通过在滑动窗口内的所有buckets中比较得出最多请求完成数 maxPass,以及最小的耗时 minRT,相乘就得出了预期的最佳请求数 maxInFlight。 +// maxInFlight 表示系统能同时处理的最多请求数,这个水位是一个平衡点,保持该水位可以最大化系统的处理能力,超过该水位则会导致请求堆积。 +// 通过 inFlight 与 maxInFlight 对比,如果前者大于后者那么就已经过载,进而拒绝后续到来的请求防止服务过载。 +func (l *BBR) shouldDrop() bool { + now := time.Duration(time.Now().UnixNano()) + if l.cpu() < l.opts.CPUThreshold { + // current cpu payload below the threshold + prevDropTime, _ := l.prevDropTime.Load().(time.Duration) + if prevDropTime == 0 { + // haven't start drop, + // accept current request + return false + } + if now-prevDropTime <= time.Second { + // just start drop one second ago, + // check current inflight count + inFlight := atomic.LoadInt64(&l.inFlight) + return inFlight > 1 && inFlight > l.maxInFlight() + } + l.prevDropTime.Store(time.Duration(0)) + return false + } + // current cpu payload exceeds the threshold + inFlight := atomic.LoadInt64(&l.inFlight) + drop := inFlight > 1 && inFlight > l.maxInFlight() + if drop { + prevDrop, _ := l.prevDropTime.Load().(time.Duration) + if prevDrop != 0 { + // already started drop, return directly + return drop + } + // store start drop time + l.prevDropTime.Store(now) + } + return drop +} + +// Stat tasks a snapshot of the bbr limiter. +func (l *BBR) Stat() Stat { + return Stat{ + CPU: l.cpu(), + MinRt: l.minRT(), + MaxPass: l.maxPASS(), + MaxInFlight: l.maxInFlight(), + InFlight: atomic.LoadInt64(&l.inFlight), + } +} + +// Allow checks all inbound traffic. +// Once overload is detected, it raises limit.ErrLimitExceed error. +func (l *BBR) Allow() (func(), bool) { + if l.shouldDrop() { + return nil, false + } + atomic.AddInt64(&l.inFlight, 1) + start := time.Now().UnixNano() + ms := float64(time.Millisecond) + return func() { + //nolint + if rt := int64(math.Ceil(float64(time.Now().UnixNano()-start)) / ms); rt > 0 { + l.rtStat.Add(rt) + } + atomic.AddInt64(&l.inFlight, -1) + l.passStat.Add(1) + }, true +} diff --git a/plugin/ratelimiter/bbr/core/bbr_test.go b/plugin/ratelimiter/bbr/core/bbr_test.go new file mode 100644 index 00000000..fa1aa941 --- /dev/null +++ b/plugin/ratelimiter/bbr/core/bbr_test.go @@ -0,0 +1,302 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package core + +import ( + "github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/window" + "math" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var ( + windowSizeTest = time.Second + bucketNumTest = 10 + cpuThresholdTest = int64(800) + + optsForTest = []Option{ + WithWindow(windowSizeTest), + WithBucket(bucketNumTest), + WithCPUThreshold(cpuThresholdTest), + } +) + +func warmup(bbr *BBR, count int) { + for i := 0; i < count; i++ { + done, allow := bbr.Allow() + time.Sleep(time.Millisecond * 1) + if allow { + done() + } + } +} + +func forceAllow(bbr *BBR) { + inflight := bbr.inFlight + bbr.inFlight = bbr.maxPASS() - 1 + done, allow := bbr.Allow() + if allow { + done() + } + bbr.inFlight = inflight +} + +func TestBBR(t *testing.T) { + limiter := NewLimiter( + WithWindow(5*time.Second), + WithBucket(50), + WithCPUThreshold(100)) + var wg sync.WaitGroup + var drop int64 + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 300; i++ { + done, allow := limiter.Allow() + if allow { + atomic.AddInt64(&drop, 1) + } else { + count := rand.Intn(100) + time.Sleep(time.Millisecond * time.Duration(count)) + done() + } + } + }() + } + wg.Wait() + t.Log("drop: ", drop) +} + +func TestBBRMaxPass(t *testing.T) { + bucketDuration := windowSizeTest / time.Duration(bucketNumTest) + bbr := NewLimiter(optsForTest...) + for i := 1; i <= 10; i++ { + bbr.passStat.Add(int64(i * 100)) + time.Sleep(bucketDuration) + } + assert.Equal(t, int64(1000), bbr.maxPASS()) + + // default max pass is equal to 1. + bbr = NewLimiter(optsForTest...) + assert.Equal(t, int64(1), bbr.maxPASS()) +} + +func TestBBRMaxPassWithCache(t *testing.T) { + bucketDuration := windowSizeTest / time.Duration(bucketNumTest) + bbr := NewLimiter(optsForTest...) + // witch cache, value of latest bucket is not counted instantly. + // after a bucket duration time, this bucket will be fully counted. + bbr.passStat.Add(int64(50)) + time.Sleep(bucketDuration / 2) + assert.Equal(t, int64(1), bbr.maxPASS()) + + bbr.passStat.Add(int64(50)) + time.Sleep(bucketDuration / 2) + assert.Equal(t, int64(1), bbr.maxPASS()) + + bbr.passStat.Add(int64(1)) + time.Sleep(bucketDuration) + assert.Equal(t, int64(100), bbr.maxPASS()) +} + +func TestBBRMinRt(t *testing.T) { + bucketDuration := windowSizeTest / time.Duration(bucketNumTest) + bbr := NewLimiter(optsForTest...) + for i := 0; i < 10; i++ { + for j := i*10 + 1; j <= i*10+10; j++ { + bbr.rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration) + } + } + assert.Equal(t, int64(6), bbr.minRT()) + + // default max min rt is equal to maxFloat64. + bbr = NewLimiter(optsForTest...) + bbr.rtStat = window.NewRollingCounter(window.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + assert.Equal(t, int64(math.MaxInt64), bbr.minRT()) +} + +func TestBBRMinRtWithCache(t *testing.T) { + bucketDuration := windowSizeTest / time.Duration(bucketNumTest) + bbr := NewLimiter(optsForTest...) + for i := 0; i < 10; i++ { + for j := i*10 + 1; j <= i*10+5; j++ { + bbr.rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration / 2) + } + _ = bbr.minRT() + for j := i*10 + 6; j <= i*10+10; j++ { + bbr.rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration / 2) + } + } + assert.Equal(t, int64(6), bbr.minRT()) +} + +func TestBBRMaxQps(t *testing.T) { + bbr := NewLimiter(optsForTest...) + bucketDuration := windowSizeTest / time.Duration(bucketNumTest) + passStat := window.NewRollingCounter(window.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := window.NewRollingCounter(window.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + for i := 0; i < 10; i++ { + passStat.Add(int64((i + 2) * 100)) + for j := i*10 + 1; j <= i*10+10; j++ { + rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration) + } + } + bbr.passStat = passStat + bbr.rtStat = rtStat + assert.Equal(t, int64(60), bbr.maxInFlight()) +} + +func TestBBRShouldDrop(t *testing.T) { + var cpu int64 + bbr := NewLimiter(optsForTest...) + bbr.cpu = func() int64 { + return cpu + } + bucketDuration := windowSizeTest / time.Duration(bucketNumTest) + passStat := window.NewRollingCounter(window.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := window.NewRollingCounter(window.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + for i := 0; i < 10; i++ { + passStat.Add(int64((i + 1) * 100)) + for j := i*10 + 1; j <= i*10+10; j++ { + rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration) + } + } + bbr.passStat = passStat + bbr.rtStat = rtStat + // cpu >= 800, inflight < maxQps + cpu = 800 + bbr.inFlight = 50 + assert.Equal(t, false, bbr.shouldDrop()) + + // cpu >= 800, inflight > maxQps + cpu = 800 + bbr.inFlight = 80 + assert.Equal(t, true, bbr.shouldDrop()) + + // cpu < 800, inflight > maxQps, cold duration + cpu = 700 + bbr.inFlight = 80 + assert.Equal(t, true, bbr.shouldDrop()) + + // cpu < 800, inflight > maxQps + time.Sleep(2 * time.Second) + cpu = 700 + bbr.inFlight = 80 + assert.Equal(t, false, bbr.shouldDrop()) +} + +func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { + bbr := NewLimiter(optsForTest...) + bbr.cpu = func() int64 { + return 500 + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + done, allow := bbr.Allow() + if allow { + done() + } + } +} + +func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { + bbr := NewLimiter(optsForTest...) + bbr.cpu = func() int64 { + return 900 + } + bbr.inFlight = 1 + b.ResetTimer() + for i := 0; i <= b.N; i++ { + if i%10000 == 0 { + maxFlight := bbr.maxInFlight() + if maxFlight != 0 { + bbr.inFlight = rand.Int63n(bbr.maxInFlight() * 2) + } + } + done, allow := bbr.Allow() + if allow { + done() + } + } +} + +func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { + bbr := NewLimiter(optsForTest...) + bbr.cpu = func() int64 { + return 500 + } + warmup(bbr, 10000) + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + } +} + +func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) { + bbr := NewLimiter(optsForTest...) + bbr.cpu = func() int64 { + return 900 + } + warmup(bbr, 10000) + bbr.inFlight = 1000 + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + if i%10000 == 0 { + forceAllow(bbr) + } + } +} + +func BenchmarkBBRShouldDropUnderUnstableLoad(b *testing.B) { + bbr := NewLimiter(optsForTest...) + bbr.cpu = func() int64 { + return 500 + } + warmup(bbr, 10000) + bbr.prevDropTime.Store(time.Now().UnixNano()) + bbr.inFlight = 1000 + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + if i%100000 == 0 { + forceAllow(bbr) + } + } +} diff --git a/plugin/ratelimiter/bbr/cpu/cgroup.go b/plugin/ratelimiter/bbr/cpu/cgroup.go new file mode 100644 index 00000000..a4eb426f --- /dev/null +++ b/plugin/ratelimiter/bbr/cpu/cgroup.go @@ -0,0 +1,143 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package cpu + +import ( + "bufio" + "fmt" + "io" + "os" + "path" + "strconv" + "strings" +) + +const cgroupRootDir = "/sys/fs/cgroup" + +// cgroup Linux cgroup +type cgroup struct { + cgroupSet map[string]string +} + +// CPUCFSQuotaUs cpu.cfs_quota_us +func (c *cgroup) CPUCFSQuotaUs() (int64, error) { + data, err := readFile(path.Join(c.cgroupSet["cpu"], "cpu.cfs_quota_us")) + if err != nil { + return 0, err + } + return strconv.ParseInt(data, 10, 64) +} + +// CPUCFSPeriodUs cpu.cfs_period_us +func (c *cgroup) CPUCFSPeriodUs() (uint64, error) { + data, err := readFile(path.Join(c.cgroupSet["cpu"], "cpu.cfs_period_us")) + if err != nil { + return 0, err + } + return parseUint(data) +} + +// CPUAcctUsage cpuacct.usage +func (c *cgroup) CPUAcctUsage() (uint64, error) { + data, err := readFile(path.Join(c.cgroupSet["cpuacct"], "cpuacct.usage")) + if err != nil { + return 0, err + } + return parseUint(data) +} + +// CPUAcctUsagePerCPU cpuacct.usage_percpu +func (c *cgroup) CPUAcctUsagePerCPU() ([]uint64, error) { + data, err := readFile(path.Join(c.cgroupSet["cpuacct"], "cpuacct.usage_percpu")) + if err != nil { + return nil, err + } + var usage []uint64 + for _, v := range strings.Fields(string(data)) { + var u uint64 + if u, err = parseUint(v); err != nil { + return nil, err + } + // fix possible_cpu:https://www.ibm.com/support/knowledgecenter/en/linuxonibm/com.ibm.linux.z.lgdd/lgdd_r_posscpusparm.html + if u != 0 { + usage = append(usage, u) + } + } + return usage, nil +} + +// CPUSetCPUs cpuset.cpus +func (c *cgroup) CPUSetCPUs() ([]uint64, error) { + data, err := readFile(path.Join(c.cgroupSet["cpuset"], "cpuset.cpus")) + if err != nil { + return nil, err + } + cpus, err := ParseUintList(data) + if err != nil { + return nil, err + } + sets := make([]uint64, 0) + for k := range cpus { + sets = append(sets, uint64(k)) + } + return sets, nil +} + +// currentcGroup get current process cgroup +func currentcGroup() (*cgroup, error) { + pid := os.Getpid() + cgroupFile := fmt.Sprintf("/proc/%d/cgroup", pid) + cgroupSet := make(map[string]string) + fp, err := os.Open(cgroupFile) + if err != nil { + return nil, err + } + defer fp.Close() + buf := bufio.NewReader(fp) + for { + line, err := buf.ReadString('\n') + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + col := strings.Split(strings.TrimSpace(line), ":") + if len(col) != 3 { + return nil, fmt.Errorf("invalid cgroup format %s", line) + } + dir := col[2] + // When dir is not equal to /, it must be in docker + if dir != "/" { + cgroupSet[col[1]] = path.Join(cgroupRootDir, col[1]) + if strings.Contains(col[1], ",") { + for _, k := range strings.Split(col[1], ",") { + cgroupSet[k] = path.Join(cgroupRootDir, k) + } + } + } else { + cgroupSet[col[1]] = path.Join(cgroupRootDir, col[1], col[2]) + if strings.Contains(col[1], ",") { + for _, k := range strings.Split(col[1], ",") { + cgroupSet[k] = path.Join(cgroupRootDir, k, col[2]) + } + } + } + } + return &cgroup{cgroupSet: cgroupSet}, nil +} diff --git a/plugin/ratelimiter/bbr/cpu/cgroup_cpu.go b/plugin/ratelimiter/bbr/cpu/cgroup_cpu.go new file mode 100644 index 00000000..f4185c1b --- /dev/null +++ b/plugin/ratelimiter/bbr/cpu/cgroup_cpu.go @@ -0,0 +1,259 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package cpu + +import ( + "bufio" + "errors" + "os" + "strconv" + "strings" + + pscpu "github.com/shirou/gopsutil/v3/cpu" +) + +var _ CPU = (*cgroupCPU)(nil) + +type cgroupCPU struct { + frequency uint64 + quota float64 + cores uint64 + + preSystem uint64 + preTotal uint64 +} + +func newCgroupCPU() (cpu *cgroupCPU, err error) { + cores, err := pscpu.Counts(true) + if err != nil || cores == 0 { + var cpus []uint64 + cpus, err = perCPUUsage() + if err != nil { + return nil, err + } + cores = len(cpus) + } + + sets, err := cpuSets() + if err != nil { + return + } + quota := float64(len(sets)) + cq, err := cpuQuota() + if err == nil && cq != -1 { + var period uint64 + if period, err = cpuPeriod(); err != nil { + return + } + limit := float64(cq) / float64(period) + if limit < quota { + quota = limit + } + } + maxFreq := cpuMaxFreq() + + preSystem, err := systemCPUUsage() + if err != nil { + return + } + preTotal, err := totalCPUUsage() + if err != nil { + return + } + cpu = &cgroupCPU{ + frequency: maxFreq, + quota: quota, + cores: uint64(cores), + preSystem: preSystem, + preTotal: preTotal, + } + return +} + +func (cpu *cgroupCPU) Usage() (u uint64, err error) { + var ( + total uint64 + system uint64 + ) + total, err = totalCPUUsage() + if err != nil { + return + } + system, err = systemCPUUsage() + if err != nil { + return + } + if system != cpu.preSystem { + u = uint64(float64((total-cpu.preTotal)*cpu.cores*1e3) / (float64(system-cpu.preSystem) * cpu.quota)) + } + cpu.preSystem = system + cpu.preTotal = total + return +} + +func (cpu *cgroupCPU) Info() Info { + return Info{ + Frequency: cpu.frequency, + Quota: cpu.quota, + } +} + +const nanoSecondsPerSecond = 1e9 + +// ErrNoCFSLimit is no quota limit +var ErrNoCFSLimit = errors.New("no quota limit") + +var clockTicksPerSecond = uint64(getClockTicks()) + +// systemCPUUsage returns the host system's cpu usage in +// nanoseconds. An error is returned if the format of the underlying +// file does not match. +// +// Uses /proc/stat defined by POSIX. Looks for the cpu +// statistics line and then sums up the first seven fields +// provided. See man 5 proc for details on specific field +// information. +func systemCPUUsage() (usage uint64, err error) { + var ( + line string + f *os.File + ) + if f, err = os.Open("/proc/stat"); err != nil { + return + } + bufReader := bufio.NewReaderSize(nil, 128) + defer func() { + bufReader.Reset(nil) + f.Close() + }() + bufReader.Reset(f) + for err == nil { + if line, err = bufReader.ReadString('\n'); err != nil { + return + } + parts := strings.Fields(line) + switch parts[0] { + case "cpu": + if len(parts) < 8 { + err = errors.New("bad format of cpu stats") + return + } + var totalClockTicks uint64 + for _, i := range parts[1:8] { + var v uint64 + if v, err = strconv.ParseUint(i, 10, 64); err != nil { + return + } + totalClockTicks += v + } + usage = (totalClockTicks * nanoSecondsPerSecond) / clockTicksPerSecond + return + } + } + err = errors.New("bad stats format") + return +} + +func totalCPUUsage() (usage uint64, err error) { + var cg *cgroup + if cg, err = currentcGroup(); err != nil { + return + } + return cg.CPUAcctUsage() +} + +func perCPUUsage() (usage []uint64, err error) { + var cg *cgroup + if cg, err = currentcGroup(); err != nil { + return + } + return cg.CPUAcctUsagePerCPU() +} + +func cpuSets() (sets []uint64, err error) { + var cg *cgroup + if cg, err = currentcGroup(); err != nil { + return + } + return cg.CPUSetCPUs() +} + +func cpuQuota() (quota int64, err error) { + var cg *cgroup + if cg, err = currentcGroup(); err != nil { + return + } + return cg.CPUCFSQuotaUs() +} + +func cpuPeriod() (peroid uint64, err error) { + var cg *cgroup + if cg, err = currentcGroup(); err != nil { + return + } + return cg.CPUCFSPeriodUs() +} + +func cpuFreq() uint64 { + lines, err := readLines("/proc/cpuinfo") + if err != nil { + return 0 + } + for _, line := range lines { + fields := strings.Split(line, ":") + if len(fields) < 2 { + continue + } + key := strings.TrimSpace(fields[0]) + value := strings.TrimSpace(fields[1]) + if key == "cpu MHz" || key == "clock" { + // treat this as the fallback value, thus we ignore error + if t, err := strconv.ParseFloat(strings.Replace(value, "MHz", "", 1), 64); err == nil { + return uint64(t * 1000.0 * 1000.0) + } + } + } + return 0 +} + +func cpuMaxFreq() uint64 { + feq := cpuFreq() + data, err := readFile("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") + if err != nil { + return feq + } + // override the max freq from /proc/cpuinfo + cfeq, err := parseUint(data) + if err == nil { + feq = cfeq + } + return feq +} + +// getClockTicks get the OS's ticks per second +func getClockTicks() int { + // TODO figure out a better alternative for platforms where we're missing cgo + // + // TODO Windows. This could be implemented using Win32 QueryPerformanceFrequency(). + // https://msdn.microsoft.com/en-us/library/windows/desktop/ms644905(v=vs.85).aspx + // + // An example of its usage can be found here. + // https://msdn.microsoft.com/en-us/library/windows/desktop/dn553408(v=vs.85).aspx + + return 100 +} diff --git a/plugin/ratelimiter/bbr/cpu/psutil_cpu.go b/plugin/ratelimiter/bbr/cpu/psutil_cpu.go new file mode 100644 index 00000000..d23a91ff --- /dev/null +++ b/plugin/ratelimiter/bbr/cpu/psutil_cpu.go @@ -0,0 +1,64 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package cpu + +import ( + "time" + + "github.com/shirou/gopsutil/v3/cpu" +) + +var _ CPU = (*psutilCPU)(nil) + +type psutilCPU struct { + interval time.Duration +} + +func newPsutilCPU(interval time.Duration) (cpu *psutilCPU, err error) { + cpu = &psutilCPU{interval: interval} + _, err = cpu.Usage() + if err != nil { + return + } + return +} + +func (ps *psutilCPU) Usage() (u uint64, err error) { + var percents []float64 + percents, err = cpu.Percent(ps.interval, false) + if err == nil { + u = uint64(percents[0] * 10) + } + return +} + +func (ps *psutilCPU) Info() (info Info) { + stats, err := cpu.Info() + if err != nil { + return + } + cores, err := cpu.Counts(true) + if err != nil { + return + } + info = Info{ + Frequency: uint64(stats[0].Mhz), + Quota: float64(cores), + } + return +} diff --git a/plugin/ratelimiter/bbr/cpu/stat.go b/plugin/ratelimiter/bbr/cpu/stat.go new file mode 100644 index 00000000..3fb2ac07 --- /dev/null +++ b/plugin/ratelimiter/bbr/cpu/stat.go @@ -0,0 +1,94 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package cpu + +import ( + "fmt" + "github.com/polarismesh/polaris-go/pkg/log" + "sync/atomic" + "time" +) + +const ( + interval time.Duration = time.Millisecond * 100 +) + +var ( + stats CPU + usage uint64 +) + +// CPU is cpu stat usage. +type CPU interface { + Usage() (u uint64, e error) + Info() Info +} + +func Init() error { + var err error + stats, err = newCgroupCPU() + if err != nil { + log.GetBaseLogger().Warnf("cgroup cpu init failed(%s), switch to psutil cpu", err.Error()) + stats, err = newPsutilCPU(interval) + if err != nil { + return fmt.Errorf("cgroup cpu init failed. err: %w", err) + } + } + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range ticker.C { + u, err := stats.Usage() + if err == nil && u != 0 { + atomic.StoreUint64(&usage, u) + } + } + }() + return nil +} + +// Stat cpu stat. +type Stat struct { + Usage uint64 // cpu use ratio. +} + +// Info cpu info. +type Info struct { + Frequency uint64 + Quota float64 +} + +// ReadStat read cpu stat. +func ReadStat(stat *Stat) { + stat.Usage = atomic.LoadUint64(&usage) +} + +// GetInfo get cpu info. +func GetInfo() Info { + return stats.Info() +} + +// GetCPUUsage 获取瞬时 CPU 使用率(百分数) +func GetCPUUsage() float64 { + return float64(atomic.LoadUint64(&usage)) / 10 +} + +// GetCPUUsageInt 获取瞬时 CPU 使用率(百分数,整数) +func GetCPUUsageInt() int { + return int(atomic.LoadUint64(&usage) / 10) +} diff --git a/plugin/ratelimiter/bbr/cpu/stat_test.go b/plugin/ratelimiter/bbr/cpu/stat_test.go new file mode 100644 index 00000000..70e79c9a --- /dev/null +++ b/plugin/ratelimiter/bbr/cpu/stat_test.go @@ -0,0 +1,38 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package cpu + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStat(t *testing.T) { + time.Sleep(time.Second * 3) + + var s Stat + var i Info + ReadStat(&s) + i = GetInfo() + + assert.NotZero(t, s.Usage) + assert.NotZero(t, i.Frequency) + assert.NotZero(t, i.Quota) +} diff --git a/plugin/ratelimiter/bbr/cpu/utils.go b/plugin/ratelimiter/bbr/cpu/utils.go new file mode 100644 index 00000000..ad18e6ea --- /dev/null +++ b/plugin/ratelimiter/bbr/cpu/utils.go @@ -0,0 +1,138 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package cpu + +import ( + "bufio" + "fmt" + "io/ioutil" + "os" + "strconv" + "strings" +) + +func readFile(path string) (string, error) { + contents, err := ioutil.ReadFile(path) + if err != nil { + return "", err + } + return strings.TrimSpace(string(contents)), nil +} + +func parseUint(s string) (uint64, error) { + v, err := strconv.ParseUint(s, 10, 64) + if err != nil { + intValue, intErr := strconv.ParseInt(s, 10, 64) + // 1. Handle negative values greater than MinInt64 (and) + // 2. Handle negative values lesser than MinInt64 + if intErr == nil && intValue < 0 { + return 0, nil + } else if intErr != nil && + intErr.(*strconv.NumError).Err == strconv.ErrRange && + intValue < 0 { + return 0, nil + } + return 0, err + } + return v, nil +} + +// ParseUintList parses and validates the specified string as the value +// found in some cgroup file (e.g. cpuset.cpus, cpuset.mems), which could be +// one of the formats below. Note that duplicates are actually allowed in the +// input string. It returns a map[int]bool with available elements from val +// set to true. +// Supported formats: +// 7 +// 1-6 +// 0,3-4,7,8-10 +// 0-0,0,1-7 +// 03,1-3 <- this is gonna get parsed as [1,2,3] +// 3,2,1 +// 0-2,3,1 +func ParseUintList(val string) (map[int]bool, error) { + if val == "" { + return map[int]bool{}, nil + } + + availableInts := make(map[int]bool) + split := strings.Split(val, ",") + errInvalidFormat := fmt.Errorf("os/stat: invalid format: %s", val) + for _, r := range split { + if !strings.Contains(r, "-") { + v, err := strconv.Atoi(r) + if err != nil { + return nil, errInvalidFormat + } + availableInts[v] = true + } else { + split := strings.SplitN(r, "-", 2) + min, err := strconv.Atoi(split[0]) + if err != nil { + return nil, errInvalidFormat + } + max, err := strconv.Atoi(split[1]) + if err != nil { + return nil, errInvalidFormat + } + if max < min { + return nil, errInvalidFormat + } + for i := min; i <= max; i++ { + availableInts[i] = true + } + } + } + return availableInts, nil +} + +// readLines reads contents from a file and splits them by new lines. +// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1). +func readLines(filename string) ([]string, error) { + return readLinesOffsetN(filename, 0, -1) +} + +// readLinesOffsetN reads contents from file and splits them by new line. +// The offset tells at which line number to start. +// The count determines the number of lines to read (starting from offset): +// +// n >= 0: at most n lines +// n < 0: whole file +func readLinesOffsetN(filename string, offset uint, n int) ([]string, error) { + f, err := os.Open(filename) + if err != nil { + return []string{""}, err + } + defer f.Close() + + var ret []string + + r := bufio.NewReader(f) + for i := 0; i < n+int(offset) || n < 0; i++ { + line, err := r.ReadString('\n') + if err != nil { + break + } + if i < int(offset) { + continue + } + ret = append(ret, strings.Trim(line, "\n")) + } + + return ret, nil +} diff --git a/plugin/ratelimiter/bbr/img.jpg b/plugin/ratelimiter/bbr/img.jpg new file mode 100644 index 00000000..1fb4d841 Binary files /dev/null and b/plugin/ratelimiter/bbr/img.jpg differ diff --git a/plugin/ratelimiter/bbr/plugin.go b/plugin/ratelimiter/bbr/plugin.go new file mode 100644 index 00000000..a7c44251 --- /dev/null +++ b/plugin/ratelimiter/bbr/plugin.go @@ -0,0 +1,83 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package bbr + +import ( + "github.com/polarismesh/polaris-go/pkg/config" + "github.com/polarismesh/polaris-go/pkg/model" + "github.com/polarismesh/polaris-go/pkg/plugin" + "github.com/polarismesh/polaris-go/pkg/plugin/common" + "github.com/polarismesh/polaris-go/pkg/plugin/ratelimiter" + "github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/core" + "github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/cpu" +) + +// BBRPlugin 基于 CPU BBR 策略的限流控制器 +type BBRPlugin struct { + *plugin.PluginBase + cfg *Config +} + +// Type 插件类型,这里是算 limiter 的一种 +func (g *BBRPlugin) Type() common.Type { + return common.TypeRateLimiter +} + +// Name 插件名,一个类型下插件名唯一 +func (g *BBRPlugin) Name() string { + return config.DefaultBBRRateLimiter +} + +// Init 初始化插件 +func (g *BBRPlugin) Init(ctx *plugin.InitContext) error { + if err := cpu.Init(); err != nil { + return err + } + + g.PluginBase = plugin.NewPluginBase(ctx) + cfgValue := ctx.Config.GetProvider().GetRateLimit().GetPluginConfig(g.Name()) + if cfgValue != nil { + g.cfg = cfgValue.(*Config) + core.SetDecay(g.cfg.Decay) + core.SetInterval(g.cfg.CPUSampleInterval) + } + + go core.CollectCPUStat() + return nil +} + +// Destroy 销毁插件,可用于释放资源 +func (g *BBRPlugin) Destroy() error { + return nil +} + +// IsEnable 配置是否打开标记 +func (g *BBRPlugin) IsEnable(cfg config.Configuration) bool { + return cfg.GetGlobal().GetSystem().GetMode() != model.ModeWithAgent +} + +// InitQuota 初始化并创建限流窗口 +// 主流程会在首次调用,以及规则对象变更的时候,调用该方法 +func (g *BBRPlugin) InitQuota(criteria *ratelimiter.InitCriteria) ratelimiter.QuotaBucket { + return createBBRPlugin(criteria.DstRule) +} + +// init 注册插件 +func init() { + plugin.RegisterConfigurablePlugin(&BBRPlugin{}, &Config{}) +} diff --git a/plugin/ratelimiter/bbr/window/counter.go b/plugin/ratelimiter/bbr/window/counter.go new file mode 100644 index 00000000..a624ba7c --- /dev/null +++ b/plugin/ratelimiter/bbr/window/counter.go @@ -0,0 +1,115 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package window + +import ( + "fmt" + "time" +) + +// Metric is a sample interface. +// Implementations of Metrics in metric package are Counter, Gauge, +// PointGauge, RollingCounter and RollingGauge. +type Metric interface { + // Add adds the given value to the counter. + Add(int64) + // Value gets the current value. + // If the metric's type is PointGauge, RollingCounter, RollingGauge, + // it returns the sum value within the window. + Value() int64 +} + +// Aggregation contains some common aggregation function. +// Each aggregation can compute summary statistics of window. +type Aggregation interface { + // Min finds the min value within the window. + Min() float64 + // Max finds the max value within the window. + Max() float64 + // Avg computes average value within the window. + Avg() float64 + // Sum computes sum value within the window. + Sum() float64 +} + +// RollingCounter represents a ring window based on time duration. +// e.g. [[1], [3], [5]] +type RollingCounter interface { + Metric + Aggregation + + Timespan() int + // Reduce applies the reduction function to all buckets within the window. + Reduce(func(Iterator) float64) float64 +} + +// RollingCounterOpts contains the arguments for creating RollingCounter. +type RollingCounterOpts struct { + Size int + BucketDuration time.Duration +} + +type rollingCounter struct { + policy *RollingPolicy +} + +// NewRollingCounter creates a new RollingCounter bases on RollingCounterOpts. +func NewRollingCounter(opts RollingCounterOpts) RollingCounter { + window := NewWindow(Options{Size: opts.Size}) + policy := NewRollingPolicy(window, RollingPolicyOpts{BucketDuration: opts.BucketDuration}) + return &rollingCounter{ + policy: policy, + } +} + +func (r *rollingCounter) Add(val int64) { + if val < 0 { + panic(fmt.Errorf("stat/metric: cannot decrease in value. val: %d", val)) + } + r.policy.Add(float64(val)) +} + +func (r *rollingCounter) Reduce(f func(Iterator) float64) float64 { + return r.policy.Reduce(f) +} + +func (r *rollingCounter) Avg() float64 { + return r.policy.Reduce(Avg) +} + +func (r *rollingCounter) Min() float64 { + return r.policy.Reduce(Min) +} + +func (r *rollingCounter) Max() float64 { + return r.policy.Reduce(Max) +} + +func (r *rollingCounter) Sum() float64 { + return r.policy.Reduce(Sum) +} + +func (r *rollingCounter) Value() int64 { + return int64(r.Sum()) +} + +func (r *rollingCounter) Timespan() int { + r.policy.mu.RLock() + defer r.policy.mu.RUnlock() + return r.policy.timespan() +} diff --git a/plugin/ratelimiter/bbr/window/counter_test.go b/plugin/ratelimiter/bbr/window/counter_test.go new file mode 100644 index 00000000..1311e229 --- /dev/null +++ b/plugin/ratelimiter/bbr/window/counter_test.go @@ -0,0 +1,173 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package window + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRollingCounterAdd(t *testing.T) { + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + listBuckets := func() [][]float64 { + buckets := make([][]float64, 0) + r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + buckets = append(buckets, bucket.Points) + } + return 0.0 + }) + return buckets + } + assert.Equal(t, [][]float64{{}, {}, {}}, listBuckets()) + r.Add(1) + assert.Equal(t, [][]float64{{}, {}, {1}}, listBuckets()) + time.Sleep(time.Second) + r.Add(2) + r.Add(3) + assert.Equal(t, [][]float64{{}, {1}, {5}}, listBuckets()) + time.Sleep(time.Second) + r.Add(4) + r.Add(5) + r.Add(6) + assert.Equal(t, [][]float64{{1}, {5}, {15}}, listBuckets()) + time.Sleep(time.Second) + r.Add(7) + assert.Equal(t, [][]float64{{5}, {15}, {7}}, listBuckets()) +} + +func TestRollingCounterReduce(t *testing.T) { + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + for x := 0; x < size; x = x + 1 { + for i := 0; i <= x; i++ { + r.Add(1) + } + if x < size-1 { + time.Sleep(bucketDuration) + } + } + var result = r.Reduce(func(iterator Iterator) float64 { + var result float64 + for iterator.Next() { + bucket := iterator.Bucket() + result += bucket.Points[0] + } + return result + }) + if result != 6.0 { + t.Fatalf("Validate sum of points. result: %f", result) + } +} + +func TestRollingCounterDataRace(t *testing.T) { + size := 3 + bucketDuration := time.Millisecond * 10 + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + var stop = make(chan bool) + go func() { + for { + select { + case <-stop: + return + default: + r.Add(1) + time.Sleep(time.Millisecond * 5) + } + } + }() + go func() { + for { + select { + case <-stop: + return + default: + _ = r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + for range bucket.Points { + continue + } + } + return 0 + }) + } + } + }() + time.Sleep(time.Second * 3) + close(stop) +} + +func BenchmarkRollingCounterIncr(b *testing.B) { + size := 3 + bucketDuration := time.Millisecond * 100 + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + b.ResetTimer() + for i := 0; i <= b.N; i++ { + r.Add(1) + } +} + +func BenchmarkRollingCounterReduce(b *testing.B) { + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + for i := 0; i <= 10; i++ { + r.Add(1) + time.Sleep(time.Millisecond * 500) + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + var _ = r.Reduce(func(i Iterator) float64 { + var result float64 + for i.Next() { + bucket := i.Bucket() + if len(bucket.Points) != 0 { + result += bucket.Points[0] + } + } + return result + }) + } +} diff --git a/plugin/ratelimiter/bbr/window/iterator.go b/plugin/ratelimiter/bbr/window/iterator.go new file mode 100644 index 00000000..a804aa91 --- /dev/null +++ b/plugin/ratelimiter/bbr/window/iterator.go @@ -0,0 +1,43 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package window + +import "fmt" + +// Iterator iterates the buckets within the window. +type Iterator struct { + count int + iteratedCount int + cur *Bucket +} + +// Next returns true util all of the buckets has been iterated. +func (i *Iterator) Next() bool { + return i.count != i.iteratedCount +} + +// Bucket gets current bucket. +func (i *Iterator) Bucket() Bucket { + if !(i.Next()) { + panic(fmt.Errorf("stat/metric: iteration out of range iteratedCount: %d count: %d", i.iteratedCount, i.count)) + } + bucket := *i.cur + i.iteratedCount++ + i.cur = i.cur.Next() + return bucket +} diff --git a/plugin/ratelimiter/bbr/window/policy.go b/plugin/ratelimiter/bbr/window/policy.go new file mode 100644 index 00000000..45b4dee1 --- /dev/null +++ b/plugin/ratelimiter/bbr/window/policy.go @@ -0,0 +1,114 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package window + +import ( + "sync" + "time" +) + +// RollingPolicy is a policy for ring window based on time duration. +// RollingPolicy moves bucket offset with time duration. +// e.g. If the last point is appended one bucket duration ago, +// RollingPolicy will increment current offset. +type RollingPolicy struct { + mu sync.RWMutex + size int + window *Window + offset int + + bucketDuration time.Duration + lastAppendTime time.Time +} + +// RollingPolicyOpts contains the arguments for creating RollingPolicy. +type RollingPolicyOpts struct { + BucketDuration time.Duration +} + +// NewRollingPolicy creates a new RollingPolicy based on the given window and RollingPolicyOpts. +func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy { + return &RollingPolicy{ + window: window, + size: window.Size(), + offset: 0, + + bucketDuration: opts.BucketDuration, + lastAppendTime: time.Now(), + } +} + +// timespan returns passed bucket number since lastAppendTime, +// if it is one bucket duration earlier than the last recorded +// time, it will return the size. +func (r *RollingPolicy) timespan() int { + v := int(time.Since(r.lastAppendTime) / r.bucketDuration) + if v > -1 { // maybe time backwards + return v + } + return r.size +} + +// apply applies function f with value val on +// current offset bucket, expired bucket will be reset +func (r *RollingPolicy) apply(f func(offset int, val float64), val float64) { + r.mu.Lock() + defer r.mu.Unlock() + + // calculate current offset + timespan := r.timespan() + oriTimespan := timespan + if timespan > 0 { + start := (r.offset + 1) % r.size + end := (r.offset + timespan) % r.size + if timespan > r.size { + timespan = r.size + } + // reset the expired buckets + r.window.ResetBuckets(start, timespan) + r.offset = end + r.lastAppendTime = r.lastAppendTime.Add(time.Duration(oriTimespan * int(r.bucketDuration))) + } + f(r.offset, val) +} + +// Append appends the given points to the window. +func (r *RollingPolicy) Append(val float64) { + r.apply(r.window.Append, val) +} + +// Add adds the given value to the latest point within bucket. +func (r *RollingPolicy) Add(val float64) { + r.apply(r.window.Add, val) +} + +// Reduce applies the reduction function to all buckets within the window. +func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) { + r.mu.RLock() + defer r.mu.RUnlock() + + timespan := r.timespan() + if count := r.size - timespan; count > 0 { + offset := r.offset + timespan + 1 + if offset >= r.size { + offset = offset - r.size + } + val = f(r.window.Iterator(offset, count)) + } + return val +} diff --git a/plugin/ratelimiter/bbr/window/policy_test.go b/plugin/ratelimiter/bbr/window/policy_test.go new file mode 100644 index 00000000..22cbc132 --- /dev/null +++ b/plugin/ratelimiter/bbr/window/policy_test.go @@ -0,0 +1,123 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package window + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func GetRollingPolicy() *RollingPolicy { + w := NewWindow(Options{Size: 3}) + return NewRollingPolicy(w, RollingPolicyOpts{BucketDuration: 100 * time.Millisecond}) +} + +func TestRollingPolicy_Add(t *testing.T) { + // test func timespan return real span + tests := []struct { + timeSleep []int + offset []int + points []int + }{ + { + timeSleep: []int{150, 51}, + offset: []int{1, 2}, + points: []int{1, 1}, + }, + { + timeSleep: []int{94, 250}, + offset: []int{0, 0}, + points: []int{1, 1}, + }, + { + timeSleep: []int{150, 300, 600}, + offset: []int{1, 1, 1}, + points: []int{1, 1, 1}, + }, + } + + for _, test := range tests { + t.Run("test policy add", func(t *testing.T) { + var totalTs, lastOffset int + timeSleep := test.timeSleep + policy := GetRollingPolicy() + for i, n := range timeSleep { + totalTs += n + time.Sleep(time.Duration(n) * time.Millisecond) + policy.Add(float64(test.points[i])) + offset, points := test.offset[i], test.points[i] + + assert.Equal(t, points, int(policy.window.buckets[offset].Points[0]), + fmt.Sprintf("error, time since last append: %vms, last offset: %v", totalTs, lastOffset)) + lastOffset = offset + } + }) + } +} + +func TestRollingPolicy_AddWithTimespan(t *testing.T) { + t.Run("timespan < bucket number", func(t *testing.T) { + policy := GetRollingPolicy() + // bucket 0 + policy.Add(0) + // bucket 1 + time.Sleep(101 * time.Millisecond) + policy.Add(1) + // bucket 2 + time.Sleep(101 * time.Millisecond) + policy.Add(2) + // bucket 1 + time.Sleep(201 * time.Millisecond) + policy.Add(4) + + for _, bkt := range policy.window.buckets { + t.Logf("%+v", bkt) + } + + assert.Equal(t, 0, len(policy.window.buckets[0].Points)) + assert.Equal(t, 4, int(policy.window.buckets[1].Points[0])) + assert.Equal(t, 2, int(policy.window.buckets[2].Points[0])) + }) + + t.Run("timespan > bucket number", func(t *testing.T) { + policy := GetRollingPolicy() + + // bucket 0 + policy.Add(0) + // bucket 1 + time.Sleep(101 * time.Millisecond) + policy.Add(1) + // bucket 2 + time.Sleep(101 * time.Millisecond) + policy.Add(2) + // bucket 1 + time.Sleep(501 * time.Millisecond) + policy.Add(4) + + for _, bkt := range policy.window.buckets { + t.Logf("%+v", bkt) + } + + assert.Equal(t, 0, len(policy.window.buckets[0].Points)) + assert.Equal(t, 4, int(policy.window.buckets[1].Points[0])) + assert.Equal(t, 0, len(policy.window.buckets[2].Points)) + }) +} diff --git a/plugin/ratelimiter/bbr/window/reduce.go b/plugin/ratelimiter/bbr/window/reduce.go new file mode 100644 index 00000000..014accee --- /dev/null +++ b/plugin/ratelimiter/bbr/window/reduce.go @@ -0,0 +1,94 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package window + +// Sum the values within the window. +func Sum(iterator Iterator) float64 { + var result = 0.0 + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + result = result + p + } + } + return result +} + +// Avg the values within the window. +func Avg(iterator Iterator) float64 { + var result = 0.0 + var count = 0.0 + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + result = result + p + count = count + 1 + } + } + return result / count +} + +// Min the values within the window. +func Min(iterator Iterator) float64 { + var result = 0.0 + var started = false + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + if !started { + result = p + started = true + continue + } + if p < result { + result = p + } + } + } + return result +} + +// Max the values within the window. +func Max(iterator Iterator) float64 { + var result = 0.0 + var started = false + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + if !started { + result = p + started = true + continue + } + if p > result { + result = p + } + } + } + return result +} + +// Count sums the count value within the window. +func Count(iterator Iterator) float64 { + var result int64 + for iterator.Next() { + bucket := iterator.Bucket() + result += bucket.Count + } + return float64(result) +} diff --git a/plugin/ratelimiter/bbr/window/window.go b/plugin/ratelimiter/bbr/window/window.go new file mode 100644 index 00000000..fc245f91 --- /dev/null +++ b/plugin/ratelimiter/bbr/window/window.go @@ -0,0 +1,125 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package window + +// Bucket contains multiple float64 points. +type Bucket struct { + Points []float64 + Count int64 + next *Bucket +} + +// Append appends the given value to the bucket. +func (b *Bucket) Append(val float64) { + b.Points = append(b.Points, val) + b.Count++ +} + +// Add adds the given value to the point. +func (b *Bucket) Add(offset int, val float64) { + b.Points[offset] += val + b.Count++ +} + +// Reset empties the bucket. +func (b *Bucket) Reset() { + b.Points = b.Points[:0] + b.Count = 0 +} + +// Next returns the next bucket. +func (b *Bucket) Next() *Bucket { + return b.next +} + +// Window contains multiple buckets. +type Window struct { + buckets []Bucket + size int +} + +// Options contains the arguments for creating Window. +type Options struct { + Size int +} + +// NewWindow creates a new Window based on WindowOpts. +func NewWindow(opts Options) *Window { + buckets := make([]Bucket, opts.Size) + for offset := range buckets { + buckets[offset].Points = make([]float64, 0) + nextOffset := offset + 1 + if nextOffset == opts.Size { + nextOffset = 0 + } + buckets[offset].next = &buckets[nextOffset] + } + return &Window{buckets: buckets, size: opts.Size} +} + +// ResetWindow empties all buckets within the window. +func (w *Window) ResetWindow() { + for offset := range w.buckets { + w.ResetBucket(offset) + } +} + +// ResetBucket empties the bucket based on the given offset. +func (w *Window) ResetBucket(offset int) { + w.buckets[offset%w.size].Reset() +} + +// ResetBuckets empties the buckets based on the given offsets. +func (w *Window) ResetBuckets(offset int, count int) { + for i := 0; i < count; i++ { + w.ResetBucket(offset + i) + } +} + +// Append appends the given value to the bucket where index equals the given offset. +func (w *Window) Append(offset int, val float64) { + w.buckets[offset%w.size].Append(val) +} + +// Add adds the given value to the latest point within bucket where index equals the given offset. +func (w *Window) Add(offset int, val float64) { + offset %= w.size + if w.buckets[offset].Count == 0 { + w.buckets[offset].Append(val) + return + } + w.buckets[offset].Add(0, val) +} + +// Bucket returns the bucket where index equals the given offset. +func (w *Window) Bucket(offset int) Bucket { + return w.buckets[offset%w.size] +} + +// Size returns the size of the window. +func (w *Window) Size() int { + return w.size +} + +// Iterator returns the count number buckets iterator from offset. +func (w *Window) Iterator(offset int, count int) Iterator { + return Iterator{ + count: count, + cur: &w.buckets[offset%w.size], + } +} diff --git a/plugin/ratelimiter/bbr/window/window_test.go b/plugin/ratelimiter/bbr/window/window_test.go new file mode 100644 index 00000000..87f1a77e --- /dev/null +++ b/plugin/ratelimiter/bbr/window/window_test.go @@ -0,0 +1,85 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package window + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWindowResetWindow(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetWindow() + for i := 0; i < opts.Size; i++ { + assert.Equal(t, len(window.Bucket(i).Points), 0) + } +} + +func TestWindowResetBucket(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetBucket(1) + assert.Equal(t, len(window.Bucket(1).Points), 0) + assert.Equal(t, window.Bucket(0).Points[0], float64(1.0)) + assert.Equal(t, window.Bucket(2).Points[0], float64(1.0)) +} + +func TestWindowResetBuckets(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetBuckets(0, 3) + for i := 0; i < opts.Size; i++ { + assert.Equal(t, len(window.Bucket(i).Points), 0) + } +} + +func TestWindowAppend(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + for i := 0; i < opts.Size; i++ { + assert.Equal(t, window.Bucket(i).Points[0], float64(1.0)) + } +} + +func TestWindowAdd(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + window.Append(0, 1.0) + window.Add(0, 1.0) + assert.Equal(t, window.Bucket(0).Points[0], float64(2.0)) +} + +func TestWindowSize(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + assert.Equal(t, window.Size(), 3) +}