Skip to content

Commit

Permalink
feat: add method select
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed May 31, 2024
1 parent 3944da7 commit 6502914
Show file tree
Hide file tree
Showing 24 changed files with 194 additions and 158 deletions.
2 changes: 2 additions & 0 deletions README_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ go tool pprof localhost:{port}/debug/pprof/{pprof_type}
```bash
# 发送压测请求数
n=5000000
# 请求体类型与请求方法,可选项:("echo", "echoComplex")
method="echo"
# 请求体大小
body=(1024 5120)
# 并发度
Expand Down
2 changes: 1 addition & 1 deletion generic/http/client/kitex_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type genericHTTPClient struct {
client genericclient.Client
}

func (cli *genericHTTPClient) Send(action, msg string) error {
func (cli *genericHTTPClient) Send(method, action, msg string) error {
ctx := context.Background()

url := fmt.Sprintf("http://example.com/echo/complex/%s", action)
Expand Down
2 changes: 1 addition & 1 deletion generic/json/client/kitex_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type genericJSONClient struct {
client genericclient.Client
}

func (cli *genericJSONClient) Send(action, msg string) error {
func (cli *genericJSONClient) Send(method, action, msg string) error {
ctx := context.Background()

reply, err := cli.client.GenericCall(ctx, "EchoComplex", GetJsonString(action, msg))
Expand Down
2 changes: 1 addition & 1 deletion generic/map/client/kitex_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type genericMapClient struct {
reqPool *sync.Pool
}

func (cli *genericMapClient) Send(action, msg string) error {
func (cli *genericMapClient) Send(method, action, msg string) error {
ctx := context.Background()
req := cli.reqPool.Get().(map[string]interface{})
defer cli.reqPool.Put(req)
Expand Down
2 changes: 1 addition & 1 deletion generic/ordinary/client/kitex_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type genericOrdinaryClient struct {
reqPool *sync.Pool
}

func (cli *genericOrdinaryClient) Send(action, msg string) error {
func (cli *genericOrdinaryClient) Send(method, action, msg string) error {
ctx := context.Background()
req := cli.reqPool.Get().(*echo.ComplexRequest)
defer cli.reqPool.Put(req)
Expand Down
2 changes: 1 addition & 1 deletion grpc/grpc/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type pbGrpcClient struct {
connpool *runner.Pool
}

func (cli *pbGrpcClient) Send(action, msg string) error {
func (cli *pbGrpcClient) Send(method, action, msg string) error {
ctx := context.Background()
req := cli.reqPool.Get().(*grpcg.Request)
defer cli.reqPool.Put(req)
Expand Down
2 changes: 1 addition & 1 deletion grpc/kitex/client/kitex_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type kClient struct {
reqPool *sync.Pool
}

func (cli *kClient) Send(action, msg string) error {
func (cli *kClient) Send(method, action, msg string) error {
ctx := context.Background()
req := cli.reqPool.Get().(*echo.Request)
defer cli.reqPool.Put(req)
Expand Down
2 changes: 1 addition & 1 deletion protobuf/arpc-nbio/client/arpc_nbio_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type pbArpcClient struct {
clipool *arpc.ClientPool
}

func (cli *pbArpcClient) Send(action, msg string) (err error) {
func (cli *pbArpcClient) Send(method, action, msg string) (err error) {
args := cli.reqPool.Get().(*gogo.Request)
reply := cli.respPool.Get().(*gogo.Response)
defer cli.reqPool.Put(args)
Expand Down
2 changes: 1 addition & 1 deletion protobuf/arpc/client/arpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type pbArpcClient struct {
clipool *arpc.ClientPool
}

func (cli *pbArpcClient) Send(action, msg string) (err error) {
func (cli *pbArpcClient) Send(method, action, msg string) (err error) {
args := cli.reqPool.Get().(*gogo.Request)
reply := cli.respPool.Get().(*gogo.Response)
defer cli.reqPool.Put(args)
Expand Down
2 changes: 1 addition & 1 deletion protobuf/grpc/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type pbGrpcClient struct {
connpool *runner.Pool
}

func (cli *pbGrpcClient) Send(action, msg string) error {
func (cli *pbGrpcClient) Send(method, action, msg string) error {
ctx := context.Background()
req := cli.reqPool.Get().(*grpcg.Request)
defer cli.reqPool.Put(req)
Expand Down
2 changes: 1 addition & 1 deletion protobuf/kitex-mux/client/kitex_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type pbKitexClient struct {
reqPool *sync.Pool
}

func (cli *pbKitexClient) Send(action, msg string) error {
func (cli *pbKitexClient) Send(method, action, msg string) error {
ctx := context.Background()
req := cli.reqPool.Get().(*echo.Request)
defer cli.reqPool.Put(req)
Expand Down
2 changes: 1 addition & 1 deletion protobuf/kitex/client/kitex_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type pbKitexClient struct {
reqPool *sync.Pool
}

func (cli *pbKitexClient) Send(action, msg string) error {
func (cli *pbKitexClient) Send(method, action, msg string) error {
ctx := context.Background()
req := cli.reqPool.Get().(*echo.Request)
defer cli.reqPool.Put(req)
Expand Down
2 changes: 1 addition & 1 deletion protobuf/rpcx/client/rpcx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type pbRpcxClient struct {
clipool *client.XClientPool
}

func (cli *pbRpcxClient) Send(action, msg string) (err error) {
func (cli *pbRpcxClient) Send(method, action, msg string) (err error) {
args := cli.reqPool.Get().(*gogo.Request)
reply := cli.respPool.Get().(*gogo.Response)
defer cli.reqPool.Put(args)
Expand Down
10 changes: 6 additions & 4 deletions runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
var (
address string
echoSize int
method string
total int64
concurrent int
qps int
Expand All @@ -45,7 +46,7 @@ type ClientNewer func(opt *Options) Client

type Client interface {
// Send implement client's custom RPC call logic
Send(action, msg string) (err error)
Send(method, action, msg string) (err error)
}

type Response struct {
Expand All @@ -55,6 +56,7 @@ type Response struct {

func initFlags() {
flag.StringVar(&address, "addr", "127.0.0.1:8000", "client call address")
flag.StringVar(&method, "method", "echo", "RPC method in (echo, echoComplex)")
flag.IntVar(&echoSize, "b", 1024, "echo size once")
flag.IntVar(&concurrent, "c", 100, "call concurrent")
flag.IntVar(&qps, "qps", 0, "call qps")
Expand Down Expand Up @@ -91,13 +93,13 @@ func Main(name string, newer ClientNewer) {
st := strconv.Itoa(sleepTime)
payload = fmt.Sprintf("%s,%s", st, payload[len(st)+1:])
}
handler := func() error { return cli.Send(action, payload) }
handler := func() error { return cli.Send(method, action, payload) }

// === warming ===
r.Warmup(handler, concurrent, qps, 100*1000)

// === beginning ===
if err := cli.Send(BeginAction, "empty"); err != nil {
if err := cli.Send(method, BeginAction, "empty"); err != nil {
log.Fatalf("beginning server failed: %v", err)
}
recorder := perf.NewRecorder(fmt.Sprintf("%s@Client", name))
Expand All @@ -108,7 +110,7 @@ func Main(name string, newer ClientNewer) {

// == ending ===
recorder.End()
if err := cli.Send(EndAction, "empty"); err != nil {
if err := cli.Send(method, EndAction, "empty"); err != nil {
log.Fatalf("ending server failed: %v", err)
}

Expand Down
11 changes: 5 additions & 6 deletions runner/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import (
)

const (
EchoAction = "echo"
EchoComplexAction = "echo_complex"
BeginAction = "begin"
EndAction = "end"
SleepAction = "sleep"
ReportAction = "report"
EchoAction = "echo"
BeginAction = "begin"
EndAction = "end"
SleepAction = "sleep"
ReportAction = "report"
)

func ProcessRequest(recorder *perf.Recorder, action, msg string) (retAction, retMsg string) {
Expand Down
4 changes: 2 additions & 2 deletions scripts/benchmark_thrift.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ for b in ${body[@]}; do
echo "Server [$srp] running with [$cmd_server]"

# run client
echo "Client [$crp] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd
echo "Client [$crp-$method] running with [$cmd_client]"
$cmd_client $output_dir/bin/${crp}_bencher -addr="$addr" -method=$method -b=$b -c=$c -qps=$q -n=$n --sleep=$sleep | $tee_cmd

# stop server
kill_pid_listening_on_port ${ports[i]}
Expand Down
1 change: 1 addition & 0 deletions scripts/env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ set -e

# benchmark params
n=20000000
method="echo"
body=(1024)
concurrent=(100 200 400 600 800 1000)
qps=(0)
Expand Down
2 changes: 1 addition & 1 deletion streaming/grpc/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type grpcClient struct {
reqPool *sync.Pool
}

func (cli *grpcClient) Send(action, msg string) (err error) {
func (cli *grpcClient) Send(method, action, msg string) (err error) {
req := cli.reqPool.Get().(*grpcg.Request)
defer cli.reqPool.Put(req)

Expand Down
2 changes: 1 addition & 1 deletion streaming/kitex/client/kitex_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type kClient struct {
reqPool *sync.Pool
}

func (cli *kClient) Send(action, msg string) error {
func (cli *kClient) Send(method, action, msg string) error {
req := cli.reqPool.Get().(*echo.Request)
defer cli.reqPool.Put(req)

Expand Down
131 changes: 131 additions & 0 deletions thrift/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package thrift

import (
"context"
"fmt"
"strconv"
"strings"
"sync"

"github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo"
"github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo/echoserver"
"github.com/cloudwego/kitex-benchmark/runner"
)

type kitexClient struct {
client echoserver.Client
}

func NewKitexClient(client echoserver.Client) *kitexClient {
c := new(kitexClient)
c.client = client
return c
}

func (cli *kitexClient) Send(method, action, msg string) error {
switch strings.ToLower(method) {
case "echo":
return cli.echo(action, msg)
case "echocomplex":
return cli.echoComplex(action, msg)
default:
return fmt.Errorf("unknow method: %s", method)
}
}

var echoReqPool = sync.Pool{
New: func() interface{} {
return &echo.Request{}
},
}

func (cli *kitexClient) echo(action, msg string) error {
ctx := context.Background()
req := echoReqPool.Get().(*echo.Request)
defer echoReqPool.Put(req)

req.Action = action
req.Msg = msg

reply, err := cli.client.Echo(ctx, req)
if reply != nil {
runner.ProcessResponse(reply.Action, reply.Msg)
}
return err
}

var echoComplexReqPool = sync.Pool{
New: func() interface{} {
return &echo.ComplexRequest{}
},
}

func (cli *kitexClient) echoComplex(action, msg string) error {
ctx := context.Background()
req := echoComplexReqPool.Get().(*echo.ComplexRequest)
defer echoComplexReqPool.Put(req)

// 复杂结构体下,我们需要把 msg string 分拆到 complex request 中,保证整体包大小没有太大变化下,提高字段复杂度
const complexity = 16
msgSize := len(msg)
req.Action = action

req.MsgMap = make(map[string]*echo.SubMessage, complexity)
content := msg[msgSize/4*0 : msgSize/4*1]
for idx, str := range splitString(content, complexity) {
id := int64(idx)
req.MsgMap[strconv.Itoa(idx)] = &echo.SubMessage{
Id: &id,
Value: &str,
}
}

req.SubMsgs = make([]*echo.SubMessage, complexity)
content = msg[msgSize/4*1 : msgSize/4*2]
for idx, str := range splitString(content, complexity) {
id := int64(idx)
req.SubMsgs[idx] = &echo.SubMessage{
Id: &id,
Value: &str,
}
}

req.MsgSet = make([]*echo.Message, complexity)
content = msg[msgSize/4*2 : msgSize/4*3]
for idx, str := range splitString(content, complexity) {
id := int64(idx)
req.MsgSet[idx] = &echo.Message{
Id: &id,
SubMessages: []*echo.SubMessage{
{
Id: &id,
Value: &str,
},
},
}
}

req.FlagMsg = new(echo.Message)
content = msg[msgSize/4*3 : msgSize/4*4]
req.FlagMsg = &echo.Message{
Value: &content,
}

reply, err := cli.client.EchoComplex(ctx, req)
if reply != nil {
runner.ProcessResponse(reply.Action, reply.Msg)
}
return err
}

func splitString(str string, n int) []string {
ret := make([]string, n)
if n < 0 || len(str) < n {
return ret
}
single := len(str) / n
for i := 0; i < n; i++ {
ret[i] = str[i*single : (i+1)*single]
}
return ret
}
Loading

0 comments on commit 6502914

Please sign in to comment.