Skip to content

Commit

Permalink
feat: qps limiter (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie authored May 28, 2024
1 parent 068acc7 commit e507cd4
Show file tree
Hide file tree
Showing 16 changed files with 175 additions and 120 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/cloudwego/fastpb v0.0.2
github.com/cloudwego/kitex v0.4.3
github.com/gogo/protobuf v1.3.2
github.com/juju/ratelimit v1.0.1
github.com/lesismal/arpc v1.2.4
github.com/lesismal/nbio v1.1.23-0.20210815145206-b106d99bce56
github.com/montanaflynn/stats v0.6.6
Expand Down
8 changes: 4 additions & 4 deletions runner/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ func (c *Counter) Report(title string, totalns int64, concurrent int, total int6

var result string
if tp999/1000 < 1 {
result = fmt.Sprintf("[%s]: TPS: %.2f, TP99: %.2fus, TP999: %.2fus (b=%d Byte, c=%d, n=%d)",
title, tps, tp99/1000, tp999/1000, echoSize, concurrent, total)
result = fmt.Sprintf("[%s]: TPS: %.2f, TP99: %.2fus, TP999: %.2fus (b=%d Byte, c=%d, qps=%d, n=%d)",
title, tps, tp99/1000, tp999/1000, echoSize, concurrent, qps, total)
} else {
result = fmt.Sprintf("[%s]: TPS: %.2f, TP99: %.2fms, TP999: %.2fms (b=%d Byte, c=%d, n=%d)",
title, tps, tp99/1000000, tp999/1000000, echoSize, concurrent, total)
result = fmt.Sprintf("[%s]: TPS: %.2f, TP99: %.2fms, TP999: %.2fms (b=%d Byte, c=%d, qps=%d, n=%d)",
title, tps, tp99/1000000, tp999/1000000, echoSize, concurrent, qps, total)
}
logInfo(result)
return nil
Expand Down
6 changes: 4 additions & 2 deletions runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
echoSize int
total int64
concurrent int
qps int
poolSize int
sleepTime int
)
Expand All @@ -55,6 +56,7 @@ func initFlags() {
flag.StringVar(&address, "addr", "127.0.0.1:8000", "client call address")
flag.IntVar(&echoSize, "b", 1024, "echo size once")
flag.IntVar(&concurrent, "c", 100, "call concurrent")
flag.IntVar(&qps, "qps", 0, "call qps")
flag.Int64Var(&total, "n", 1024*100, "call total nums")
flag.IntVar(&poolSize, "pool", 10, "conn poll size")
flag.IntVar(&sleepTime, "sleep", 0, "sleep time for every request handler")
Expand Down Expand Up @@ -91,7 +93,7 @@ func Main(name string, newer ClientNewer) {
handler := func() error { return cli.Echo(action, payload) }

// === warming ===
r.Warmup(handler, concurrent, 100*1000)
r.Warmup(handler, concurrent, qps, 100*1000)

// === beginning ===
if err := cli.Echo(BeginAction, "empty"); err != nil {
Expand All @@ -101,7 +103,7 @@ func Main(name string, newer ClientNewer) {
recorder.Begin()

// === benching ===
r.Run(name, handler, concurrent, total, echoSize, sleepTime)
r.Run(name, handler, concurrent, qps, total, echoSize, sleepTime)

// == ending ===
recorder.End()
Expand Down
30 changes: 21 additions & 9 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/juju/ratelimit"
)

// 为了流量更均匀, 时间间隔设置为 10ms
Expand All @@ -33,21 +34,29 @@ type RunOnce func() error

type Runner struct {
counter *Counter // 计数器
timer *Timer // 计时器
timer Timer // 计时器
}

func NewRunner() *Runner {
r := &Runner{
counter: NewCounter(),
timer: NewTimer(time.Microsecond),
}
if qps == 0 {
r.timer = NewTimer(time.Microsecond)
} else {
r.timer = NewTimer(0)
}
return r
}

func (r *Runner) benching(onceFn RunOnce, concurrent int, total int64) {
func (r *Runner) benching(onceFn RunOnce, concurrent, qps int, total int64) {
var wg sync.WaitGroup
wg.Add(concurrent)
r.counter.Reset(total)
var qpsLimiter *ratelimit.Bucket
if qps > 0 {
qpsLimiter = ratelimit.NewBucketWithRate(float64(qps), int64(concurrent))
}
for i := 0; i < concurrent; i++ {
go func() {
defer wg.Done()
Expand All @@ -56,6 +65,9 @@ func (r *Runner) benching(onceFn RunOnce, concurrent int, total int64) {
if idx >= total {
return
}
if qpsLimiter != nil {
qpsLimiter.Wait(1)
}
begin := r.timer.Now()
err := onceFn()
end := r.timer.Now()
Expand All @@ -75,19 +87,19 @@ func (r *Runner) benching(onceFn RunOnce, concurrent int, total int64) {
r.counter.Total = total
}

func (r *Runner) Warmup(onceFn RunOnce, concurrent int, total int64) {
r.benching(onceFn, concurrent, total)
func (r *Runner) Warmup(onceFn RunOnce, concurrent, qps int, total int64) {
r.benching(onceFn, concurrent, qps, total)
}

// 并发测试
func (r *Runner) Run(title string, onceFn RunOnce, concurrent int, total int64, echoSize, sleepTime int) {
func (r *Runner) Run(title string, onceFn RunOnce, concurrent, qps int, total int64, echoSize, sleepTime int) {
logInfo(
"%s start benching [%s], concurrent: %d, total: %d, sleep: %d",
"["+title+"]", time.Now().String(), concurrent, total, sleepTime,
"%s start benching [%s], concurrent: %d, qps: %d, total: %d, sleep: %d",
"["+title+"]", time.Now().String(), concurrent, qps, total, sleepTime,
)

start := r.timer.Now()
r.benching(onceFn, concurrent, total)
r.benching(onceFn, concurrent, qps, total)
stop := r.timer.Now()
r.counter.Report(title, stop-start, concurrent, total, echoSize)
}
27 changes: 21 additions & 6 deletions runner/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,31 @@ import (
"time"
)

func NewTimer(window time.Duration) *Timer {
t := &Timer{window: window}
type Timer interface {
Now() int64
}

// NewTimer returns a Timer.
// window=0 means using the native timer.
func NewTimer(window time.Duration) Timer {
if window == 0 {
return &nativeTimer{}
}
t := &timer{window: window}
t.refresh()
return t
}

// 全局 Timer, 共享时间周期, 并在到期时执行回调
type Timer struct {
type timer struct {
sync.Once
now int64
window time.Duration
notify []func(now time.Time)
}

// refresh time
func (t *Timer) refresh() {
func (t *timer) refresh() {
t.Do(func() {
atomic.StoreInt64(&t.now, time.Now().UnixNano())
go func() {
Expand All @@ -48,11 +57,17 @@ func (t *Timer) refresh() {
})
}

func (t *Timer) Window() time.Duration {
func (t *timer) Window() time.Duration {
return t.window
}

// Timer 为共享计时器, 减少系统时间调用
func (t *Timer) Now() int64 {
func (t *timer) Now() int64 {
return atomic.LoadInt64(&t.now)
}

type nativeTimer struct{}

func (*nativeTimer) Now() int64 {
return time.Now().UnixNano()
}
32 changes: 17 additions & 15 deletions scripts/benchmark_generic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@ echo "Build finished."
# benchmark
for b in ${body[@]}; do
for c in ${concurrent[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail"
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"
for q in ${qps[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail"
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"

# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd
# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd

# stop server
kill_pid_listening_on_port ${ports[i]}
# stop server
kill_pid_listening_on_port ${ports[i]}
done
done
done
done
Expand Down
32 changes: 17 additions & 15 deletions scripts/benchmark_grpc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,25 @@ echo "Build finished."
# benchmark
for b in ${body[@]}; do
for c in ${concurrent[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail."
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"
for q in ${qps[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail."
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"

# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd
# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd

# stop server
kill_pid_listening_on_port ${ports[i]}
# stop server
kill_pid_listening_on_port ${ports[i]}
done
done
done
done
Expand Down
32 changes: 17 additions & 15 deletions scripts/benchmark_pb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@ echo "Build finished."
# benchmark
for b in ${body[@]}; do
for c in ${concurrent[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail."
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"
for q in ${qps[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail."
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"

# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd
# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd

# stop server
kill_pid_listening_on_port ${ports[i]}
# stop server
kill_pid_listening_on_port ${ports[i]}
done
done
done
done
Expand Down
32 changes: 17 additions & 15 deletions scripts/benchmark_streaming.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@ echo "Build finished."
# benchmark
for b in ${body[@]}; do
for c in ${concurrent[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail."
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"
for q in ${qps[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail."
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"

# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd
# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd

# stop server
kill_pid_listening_on_port ${ports[i]}
# stop server
kill_pid_listening_on_port ${ports[i]}
done
done
done
done
Expand Down
32 changes: 17 additions & 15 deletions scripts/benchmark_thrift.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@ echo "Build finished."
# benchmark
for b in ${body[@]}; do
for c in ${concurrent[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail"
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"
for q in ${qps[@]}; do
for ((i = 0; i < ${#srepo[@]}; i++)); do
srp=${srepo[i]}
crp=${crepo[i]}
addr="127.0.0.1:${ports[i]}"
kill_pid_listening_on_port ${ports[i]}
# server start
echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail"
nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 &
sleep 1
echo "Server [$srp] running with [$cmd_server]"

# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd
# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd

# stop server
kill_pid_listening_on_port ${ports[i]}
# stop server
kill_pid_listening_on_port ${ports[i]}
done
done
done
done
Expand Down
Loading

0 comments on commit e507cd4

Please sign in to comment.