diff --git a/go.mod b/go.mod index 15a33de..625fad4 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/cloudwego/fastpb v0.0.2 github.com/cloudwego/kitex v0.4.3 github.com/gogo/protobuf v1.3.2 + github.com/juju/ratelimit v1.0.1 github.com/lesismal/arpc v1.2.4 github.com/lesismal/nbio v1.1.23-0.20210815145206-b106d99bce56 github.com/montanaflynn/stats v0.6.6 diff --git a/runner/counter.go b/runner/counter.go index fea4fcf..8a3c925 100644 --- a/runner/counter.go +++ b/runner/counter.go @@ -74,11 +74,11 @@ func (c *Counter) Report(title string, totalns int64, concurrent int, total int6 var result string if tp999/1000 < 1 { - result = fmt.Sprintf("[%s]: TPS: %.2f, TP99: %.2fus, TP999: %.2fus (b=%d Byte, c=%d, n=%d)", - title, tps, tp99/1000, tp999/1000, echoSize, concurrent, total) + result = fmt.Sprintf("[%s]: TPS: %.2f, TP99: %.2fus, TP999: %.2fus (b=%d Byte, c=%d, qps=%d, n=%d)", + title, tps, tp99/1000, tp999/1000, echoSize, concurrent, qps, total) } else { - result = fmt.Sprintf("[%s]: TPS: %.2f, TP99: %.2fms, TP999: %.2fms (b=%d Byte, c=%d, n=%d)", - title, tps, tp99/1000000, tp999/1000000, echoSize, concurrent, total) + result = fmt.Sprintf("[%s]: TPS: %.2f, TP99: %.2fms, TP999: %.2fms (b=%d Byte, c=%d, qps=%d, n=%d)", + title, tps, tp99/1000000, tp999/1000000, echoSize, concurrent, qps, total) } logInfo(result) return nil diff --git a/runner/main.go b/runner/main.go index 8f09c4f..82005f2 100644 --- a/runner/main.go +++ b/runner/main.go @@ -30,6 +30,7 @@ var ( echoSize int total int64 concurrent int + qps int poolSize int sleepTime int ) @@ -55,6 +56,7 @@ func initFlags() { flag.StringVar(&address, "addr", "127.0.0.1:8000", "client call address") flag.IntVar(&echoSize, "b", 1024, "echo size once") flag.IntVar(&concurrent, "c", 100, "call concurrent") + flag.IntVar(&qps, "qps", 0, "call qps") flag.Int64Var(&total, "n", 1024*100, "call total nums") flag.IntVar(&poolSize, "pool", 10, "conn poll size") flag.IntVar(&sleepTime, "sleep", 0, "sleep time for every request handler") @@ -91,7 +93,7 @@ func Main(name string, newer ClientNewer) { handler := func() error { return cli.Echo(action, payload) } // === warming === - r.Warmup(handler, concurrent, 100*1000) + r.Warmup(handler, concurrent, qps, 100*1000) // === beginning === if err := cli.Echo(BeginAction, "empty"); err != nil { @@ -101,7 +103,7 @@ func Main(name string, newer ClientNewer) { recorder.Begin() // === benching === - r.Run(name, handler, concurrent, total, echoSize, sleepTime) + r.Run(name, handler, concurrent, qps, total, echoSize, sleepTime) // == ending === recorder.End() diff --git a/runner/runner.go b/runner/runner.go index c29fa56..cb4e944 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -23,6 +23,7 @@ import ( "github.com/cloudwego/kitex/pkg/kerrors" "github.com/cloudwego/kitex/pkg/klog" + "github.com/juju/ratelimit" ) // 为了流量更均匀, 时间间隔设置为 10ms @@ -33,21 +34,29 @@ type RunOnce func() error type Runner struct { counter *Counter // 计数器 - timer *Timer // 计时器 + timer Timer // 计时器 } func NewRunner() *Runner { r := &Runner{ counter: NewCounter(), - timer: NewTimer(time.Microsecond), + } + if qps == 0 { + r.timer = NewTimer(time.Microsecond) + } else { + r.timer = NewTimer(0) } return r } -func (r *Runner) benching(onceFn RunOnce, concurrent int, total int64) { +func (r *Runner) benching(onceFn RunOnce, concurrent, qps int, total int64) { var wg sync.WaitGroup wg.Add(concurrent) r.counter.Reset(total) + var qpsLimiter *ratelimit.Bucket + if qps > 0 { + qpsLimiter = ratelimit.NewBucketWithRate(float64(qps), int64(concurrent)) + } for i := 0; i < concurrent; i++ { go func() { defer wg.Done() @@ -56,6 +65,9 @@ func (r *Runner) benching(onceFn RunOnce, concurrent int, total int64) { if idx >= total { return } + if qpsLimiter != nil { + qpsLimiter.Wait(1) + } begin := r.timer.Now() err := onceFn() end := r.timer.Now() @@ -75,19 +87,19 @@ func (r *Runner) benching(onceFn RunOnce, concurrent int, total int64) { r.counter.Total = total } -func (r *Runner) Warmup(onceFn RunOnce, concurrent int, total int64) { - r.benching(onceFn, concurrent, total) +func (r *Runner) Warmup(onceFn RunOnce, concurrent, qps int, total int64) { + r.benching(onceFn, concurrent, qps, total) } // 并发测试 -func (r *Runner) Run(title string, onceFn RunOnce, concurrent int, total int64, echoSize, sleepTime int) { +func (r *Runner) Run(title string, onceFn RunOnce, concurrent, qps int, total int64, echoSize, sleepTime int) { logInfo( - "%s start benching [%s], concurrent: %d, total: %d, sleep: %d", - "["+title+"]", time.Now().String(), concurrent, total, sleepTime, + "%s start benching [%s], concurrent: %d, qps: %d, total: %d, sleep: %d", + "["+title+"]", time.Now().String(), concurrent, qps, total, sleepTime, ) start := r.timer.Now() - r.benching(onceFn, concurrent, total) + r.benching(onceFn, concurrent, qps, total) stop := r.timer.Now() r.counter.Report(title, stop-start, concurrent, total, echoSize) } diff --git a/runner/timer.go b/runner/timer.go index c5f942d..4d4dab0 100644 --- a/runner/timer.go +++ b/runner/timer.go @@ -22,14 +22,23 @@ import ( "time" ) -func NewTimer(window time.Duration) *Timer { - t := &Timer{window: window} +type Timer interface { + Now() int64 +} + +// NewTimer returns a Timer. +// window=0 means using the native timer. +func NewTimer(window time.Duration) Timer { + if window == 0 { + return &nativeTimer{} + } + t := &timer{window: window} t.refresh() return t } // 全局 Timer, 共享时间周期, 并在到期时执行回调 -type Timer struct { +type timer struct { sync.Once now int64 window time.Duration @@ -37,7 +46,7 @@ type Timer struct { } // refresh time -func (t *Timer) refresh() { +func (t *timer) refresh() { t.Do(func() { atomic.StoreInt64(&t.now, time.Now().UnixNano()) go func() { @@ -48,11 +57,17 @@ func (t *Timer) refresh() { }) } -func (t *Timer) Window() time.Duration { +func (t *timer) Window() time.Duration { return t.window } // Timer 为共享计时器, 减少系统时间调用 -func (t *Timer) Now() int64 { +func (t *timer) Now() int64 { return atomic.LoadInt64(&t.now) } + +type nativeTimer struct{} + +func (*nativeTimer) Now() int64 { + return time.Now().UnixNano() +} diff --git a/scripts/benchmark_generic.sh b/scripts/benchmark_generic.sh index ca3088a..c25eb7d 100755 --- a/scripts/benchmark_generic.sh +++ b/scripts/benchmark_generic.sh @@ -17,23 +17,25 @@ echo "Build finished." # benchmark for b in ${body[@]}; do for c in ${concurrent[@]}; do - for ((i = 0; i < ${#srepo[@]}; i++)); do - srp=${srepo[i]} - crp=${crepo[i]} - addr="127.0.0.1:${ports[i]}" - kill_pid_listening_on_port ${ports[i]} - # server start - echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail" - nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & - sleep 1 - echo "Server [$srp] running with [$cmd_server]" + for q in ${qps[@]}; do + for ((i = 0; i < ${#srepo[@]}; i++)); do + srp=${srepo[i]} + crp=${crepo[i]} + addr="127.0.0.1:${ports[i]}" + kill_pid_listening_on_port ${ports[i]} + # server start + echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail" + nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & + sleep 1 + echo "Server [$srp] running with [$cmd_server]" - # run client - echo "Client [$crp] running with [$cmd_client]" - $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + # run client + echo "Client [$crp] running with [$cmd_client]" + $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd - # stop server - kill_pid_listening_on_port ${ports[i]} + # stop server + kill_pid_listening_on_port ${ports[i]} + done done done done diff --git a/scripts/benchmark_grpc.sh b/scripts/benchmark_grpc.sh index a600e0c..f095580 100755 --- a/scripts/benchmark_grpc.sh +++ b/scripts/benchmark_grpc.sh @@ -16,23 +16,25 @@ echo "Build finished." # benchmark for b in ${body[@]}; do for c in ${concurrent[@]}; do - for ((i = 0; i < ${#srepo[@]}; i++)); do - srp=${srepo[i]} - crp=${crepo[i]} - addr="127.0.0.1:${ports[i]}" - kill_pid_listening_on_port ${ports[i]} - # server start - echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail." - nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & - sleep 1 - echo "Server [$srp] running with [$cmd_server]" + for q in ${qps[@]}; do + for ((i = 0; i < ${#srepo[@]}; i++)); do + srp=${srepo[i]} + crp=${crepo[i]} + addr="127.0.0.1:${ports[i]}" + kill_pid_listening_on_port ${ports[i]} + # server start + echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail." + nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & + sleep 1 + echo "Server [$srp] running with [$cmd_server]" - # run client - echo "Client [$crp] running with [$cmd_client]" - $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + # run client + echo "Client [$crp] running with [$cmd_client]" + $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd - # stop server - kill_pid_listening_on_port ${ports[i]} + # stop server + kill_pid_listening_on_port ${ports[i]} + done done done done diff --git a/scripts/benchmark_pb.sh b/scripts/benchmark_pb.sh index b7a086f..936cf2d 100755 --- a/scripts/benchmark_pb.sh +++ b/scripts/benchmark_pb.sh @@ -17,23 +17,25 @@ echo "Build finished." # benchmark for b in ${body[@]}; do for c in ${concurrent[@]}; do - for ((i = 0; i < ${#srepo[@]}; i++)); do - srp=${srepo[i]} - crp=${crepo[i]} - addr="127.0.0.1:${ports[i]}" - kill_pid_listening_on_port ${ports[i]} - # server start - echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail." - nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & - sleep 1 - echo "Server [$srp] running with [$cmd_server]" + for q in ${qps[@]}; do + for ((i = 0; i < ${#srepo[@]}; i++)); do + srp=${srepo[i]} + crp=${crepo[i]} + addr="127.0.0.1:${ports[i]}" + kill_pid_listening_on_port ${ports[i]} + # server start + echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail." + nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & + sleep 1 + echo "Server [$srp] running with [$cmd_server]" - # run client - echo "Client [$crp] running with [$cmd_client]" - $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + # run client + echo "Client [$crp] running with [$cmd_client]" + $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd - # stop server - kill_pid_listening_on_port ${ports[i]} + # stop server + kill_pid_listening_on_port ${ports[i]} + done done done done diff --git a/scripts/benchmark_streaming.sh b/scripts/benchmark_streaming.sh index bc97236..1ac5373 100755 --- a/scripts/benchmark_streaming.sh +++ b/scripts/benchmark_streaming.sh @@ -17,23 +17,25 @@ echo "Build finished." # benchmark for b in ${body[@]}; do for c in ${concurrent[@]}; do - for ((i = 0; i < ${#srepo[@]}; i++)); do - srp=${srepo[i]} - crp=${crepo[i]} - addr="127.0.0.1:${ports[i]}" - kill_pid_listening_on_port ${ports[i]} - # server start - echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail." - nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & - sleep 1 - echo "Server [$srp] running with [$cmd_server]" + for q in ${qps[@]}; do + for ((i = 0; i < ${#srepo[@]}; i++)); do + srp=${srepo[i]} + crp=${crepo[i]} + addr="127.0.0.1:${ports[i]}" + kill_pid_listening_on_port ${ports[i]} + # server start + echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail." + nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & + sleep 1 + echo "Server [$srp] running with [$cmd_server]" - # run client - echo "Client [$crp] running with [$cmd_client]" - $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + # run client + echo "Client [$crp] running with [$cmd_client]" + $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd - # stop server - kill_pid_listening_on_port ${ports[i]} + # stop server + kill_pid_listening_on_port ${ports[i]} + done done done done diff --git a/scripts/benchmark_thrift.sh b/scripts/benchmark_thrift.sh index 6915c43..a4180d2 100755 --- a/scripts/benchmark_thrift.sh +++ b/scripts/benchmark_thrift.sh @@ -17,23 +17,25 @@ echo "Build finished." # benchmark for b in ${body[@]}; do for c in ${concurrent[@]}; do - for ((i = 0; i < ${#srepo[@]}; i++)); do - srp=${srepo[i]} - crp=${crepo[i]} - addr="127.0.0.1:${ports[i]}" - kill_pid_listening_on_port ${ports[i]} - # server start - echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail" - nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & - sleep 1 - echo "Server [$srp] running with [$cmd_server]" + for q in ${qps[@]}; do + for ((i = 0; i < ${#srepo[@]}; i++)); do + srp=${srepo[i]} + crp=${crepo[i]} + addr="127.0.0.1:${ports[i]}" + kill_pid_listening_on_port ${ports[i]} + # server start + echo "Starting server [$srp], if failed please check [output/log/nohup.log] for detail" + nohup $cmd_server $output_dir/bin/${srp}_reciever >> $output_dir/log/nohup.log 2>&1 & + sleep 1 + echo "Server [$srp] running with [$cmd_server]" - # run client - echo "Client [$crp] running with [$cmd_client]" - $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + # run client + echo "Client [$crp] running with [$cmd_client]" + $cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd - # stop server - kill_pid_listening_on_port ${ports[i]} + # stop server + kill_pid_listening_on_port ${ports[i]} + done done done done diff --git a/scripts/env.sh b/scripts/env.sh index f3253ea..a8edd35 100755 --- a/scripts/env.sh +++ b/scripts/env.sh @@ -5,6 +5,7 @@ set -e n=20000000 body=(1024) concurrent=(100 200 400 600 800 1000) +qps=(0) sleep=0 CURDIR=$(cd $(dirname $0); pwd) diff --git a/scripts/reports/diff.py b/scripts/reports/diff.py index cd91cb7..e4e57ba 100644 --- a/scripts/reports/diff.py +++ b/scripts/reports/diff.py @@ -3,8 +3,8 @@ import sys '''CSV Format Example -Kind,Concurrency,Data Size,TPS,P99,P999 -[GRPC],100,1024,101152.29,3.36,5.30 +Kind,Concurrency,Data Size,TPS,P99,P999,Server_CPU,Client_CPU +[GRPC],100,1024,101152.29,3.36,5.30,188.04,423.07 ''' @@ -23,7 +23,7 @@ class bcolors: def diff(from_csv, to_csv): from_reader = list(csv.reader(open(from_csv))) to_reader = csv.reader(open(to_csv)) - title = ['Kind', 'Concurrency', 'Data Size', 'QPS', 'P99', 'P999'] + title = ['Kind', 'Concurrency', 'Data Size', 'QPS', 'P99', 'P999', 'Client CPU', 'Server CPU'] results = [] for line_num, line in enumerate(to_reader): @@ -35,6 +35,8 @@ def diff(from_csv, to_csv): result.append(diff_cell(from_reader[line_num][3], line[3])) # tps result.append(diff_cell(from_reader[line_num][4], line[4])) # p99 result.append(diff_cell(from_reader[line_num][5], line[5])) # p999 + result.append(diff_cell(from_reader[line_num][6], line[6])) # Server CPU + result.append(diff_cell(from_reader[line_num][7], line[7])) # Client CPU results.append(result) diff --git a/scripts/reports/to_csv.sh b/scripts/reports/to_csv.sh index 92edd60..4550e0a 100755 --- a/scripts/reports/to_csv.sh +++ b/scripts/reports/to_csv.sh @@ -2,4 +2,10 @@ # csv # usage: to_csv.sh xxx.log -grep TPS "$1" | awk -F '[ :,]+' '{split($6,a,"m");split($8,b,"m");print $2","substr($11,3)","substr($9,4)","$4","a[1]","b[1]}' +output_dir=$(dirname "$1") +# Kind,Concurrency,Data_Size,TPS,P99,P999,Server_CPU,Client_CPU +grep TPS "$1" | awk '{print $2" "$11" "$9" "$4" "$6" "$8}' | awk '{gsub(/[:c=,(b=msAVG%]/, "")} 1' > $output_dir/tps.out +grep '@Server' "$1" | grep CPU | awk '{print $14}' | awk '{gsub(/[:c=,(b=msAVG%]/, "")} 1' > $output_dir/server.out +grep '@Client' "$1" | grep CPU | awk '{print $14}' | awk '{gsub(/[:c=,(b=msAVG%]/, "")} 1' > $output_dir/client.out +# combine each line, replace space by comma +awk '{ lines[FNR] = lines[FNR] $0 " " } END { for (i=1; i<=FNR; i++) print lines[i] }' $output_dir/tps.out $output_dir/server.out $output_dir/client.out | awk '{ print substr($0, 1, length($0)-1) }' | awk '{gsub(" ", ",")} 1' diff --git a/scripts/run_grpc_clients.sh b/scripts/run_grpc_clients.sh index 60238fa..dff1a4f 100755 --- a/scripts/run_grpc_clients.sh +++ b/scripts/run_grpc_clients.sh @@ -12,13 +12,15 @@ source $CURDIR/build_grpc.sh # benchmark for b in ${body[@]}; do for c in ${concurrent[@]}; do - for ((i = 0; i < ${#repo[@]}; i++)); do - rp=${repo[i]} - addr="${ip}:${ports[i]}" + for q in ${qps[@]}; do + for ((i = 0; i < ${#repo[@]}; i++)); do + rp=${repo[i]} + addr="${ip}:${ports[i]}" - # run client - echo "Client [$rp] running with [$taskset_client]" - $cmd_client $output_dir/bin/${rp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + # run client + echo "Client [$rp] running with [$taskset_client]" + $cmd_client $output_dir/bin/${rp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd + done done done done diff --git a/scripts/run_pb_clients.sh b/scripts/run_pb_clients.sh index 2d6253d..ee0dc3e 100755 --- a/scripts/run_pb_clients.sh +++ b/scripts/run_pb_clients.sh @@ -12,13 +12,15 @@ source $CURDIR/build_pb.sh # benchmark for b in ${body[@]}; do for c in ${concurrent[@]}; do - for ((i = 0; i < ${#repo[@]}; i++)); do - rp=${repo[i]} - addr="${ip}:${ports[i]}" + for q in ${qps[@]}; do + for ((i = 0; i < ${#repo[@]}; i++)); do + rp=${repo[i]} + addr="${ip}:${ports[i]}" - # run client - echo "Client [$rp] running with [$taskset_client]" - $cmd_client $output_dir/bin/${rp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + # run client + echo "Client [$rp] running with [$taskset_client]" + $cmd_client $output_dir/bin/${rp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd + done done done done diff --git a/scripts/run_thrift_clients.sh b/scripts/run_thrift_clients.sh index 81f1ca1..899af04 100755 --- a/scripts/run_thrift_clients.sh +++ b/scripts/run_thrift_clients.sh @@ -12,16 +12,18 @@ source $CURDIR/build_thrift.sh # benchmark for b in ${body[@]}; do for c in ${concurrent[@]}; do - for ((i = 0; i < ${#repo[@]}; i++)); do - rp=${repo[i]} - addr="${ip}:${ports[i]}" + for q in ${qps[@]}; do + for ((i = 0; i < ${#repo[@]}; i++)); do + rp=${repo[i]} + addr="${ip}:${ports[i]}" - # run client - echo "Client [$rp] running with [$cmd_client]" - $cmd_client $output_dir/bin/${rp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + # run client + echo "Client [$rp] running with [$cmd_client]" + $cmd_client $output_dir/bin/${rp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd - echo "client $rp running with $cmd_client" - $cmd_client $output_dir/bin/${rp}_bencher -addr="$addr" -b=$b -c=$c -n=$n --sleep=$sleep | $tee_cmd + echo "client $rp running with $cmd_client" + $cmd_client $output_dir/bin/${rp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd + done done done done