From c3534ad0e8e9b12e39fd12a3635302ade63d8cf5 Mon Sep 17 00:00:00 2001 From: Zhuowei Wang Date: Tue, 12 Nov 2024 17:08:14 +0800 Subject: [PATCH] using new api --- .../kitex_gen/echo/streamserver/client.go | 17 +++++++------ .../kitex_gen/echo/streamserver/server.go | 25 +++++++++++++------ .../echo/streamserver/streamserver.go | 13 +++++----- go.mod | 8 +++--- go.sum | 16 ++++++------ streaming/kitex_tts_lconn/client/client.go | 5 ++-- streaming/kitex_tts_lconn/main.go | 7 ++---- streaming/kitex_tts_mux/client/client.go | 9 ++++--- streaming/kitex_tts_mux/main.go | 7 ++---- 9 files changed, 58 insertions(+), 49 deletions(-) diff --git a/codec/thrift/kitex_gen/echo/streamserver/client.go b/codec/thrift/kitex_gen/echo/streamserver/client.go index 0ab5c1c..4fa8173 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/client.go +++ b/codec/thrift/kitex_gen/echo/streamserver/client.go @@ -4,6 +4,7 @@ package streamserver import ( "context" + echo "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" client "github.com/cloudwego/kitex/client" "github.com/cloudwego/kitex/client/streamxclient" @@ -14,23 +15,23 @@ import ( ) type Client interface { - Echo(ctx context.Context, callOptions ...streamxcallopt.CallOption) (stream streamx.BidiStreamingClient[echo.Request, echo.Response], err error) + Echo(ctx context.Context, callOptions ...streamxcallopt.CallOption) (context.Context, streamx.BidiStreamingClient[echo.Request, echo.Response], error) } -func NewClient(destService string, opts ...streamxclient.Option) (Client, error) { - var options []streamxclient.Option - options = append(options, streamxclient.WithDestService(destService)) +func NewClient(destService string, opts ...client.Option) (Client, error) { + var options []client.Option + options = append(options, client.WithDestService(destService)) cp, err := ttstream.NewClientProvider(ServiceInfo) if err != nil { return nil, err } options = append(options, streamxclient.WithProvider(cp)) options = append(options, opts...) - cli, err := streamxclient.NewClient(ServiceInfo, options...) + cli, err := client.NewClient(ServiceInfo, options...) if err != nil { return nil, err } - kc := &kClient{streamer: cli, caller: cli.(client.Client)} + kc := &kClient{streamer: cli.(client.StreamX), caller: cli.(client.Client)} return kc, nil } @@ -38,10 +39,10 @@ var _ Client = (*kClient)(nil) type kClient struct { caller client.Client - streamer streamxclient.Client + streamer client.StreamX } -func (c *kClient) Echo(ctx context.Context, callOptions ...streamxcallopt.CallOption) (stream streamx.BidiStreamingClient[echo.Request, echo.Response], err error) { +func (c *kClient) Echo(ctx context.Context, callOptions ...streamxcallopt.CallOption) (context.Context, streamx.BidiStreamingClient[echo.Request, echo.Response], error) { return streamxclient.InvokeStream[echo.Request, echo.Response]( ctx, c.streamer, serviceinfo.StreamingBidirectional, "Echo", nil, nil, callOptions...) } diff --git a/codec/thrift/kitex_gen/echo/streamserver/server.go b/codec/thrift/kitex_gen/echo/streamserver/server.go index 98f956a..2ddeb66 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/server.go +++ b/codec/thrift/kitex_gen/echo/streamserver/server.go @@ -3,6 +3,7 @@ package streamserver import ( "context" + echo "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" "github.com/cloudwego/kitex/pkg/streamx" "github.com/cloudwego/kitex/pkg/streamx/provider/ttstream" @@ -15,13 +16,21 @@ type Server interface { } func RegisterService(svr server.Server, handler Server, opts ...server.RegisterOption) error { - sp, err := ttstream.NewServerProvider(ServiceInfo) - if err != nil { - return err - } - nopts := []server.RegisterOption{ - streamxserver.WithProvider(sp), + return svr.RegisterService(ServiceInfo, handler, opts...) +} + +func NewServer(handler Server, opts ...server.Option) server.Server { + var options []server.Option + + sp, _ := ttstream.NewServerProvider(ServiceInfo) + options = append(options, streamxserver.WithProvider(sp)) + + options = append(options, opts...) + options = append(options, server.WithCompatibleMiddlewareForUnary()) + + svr := server.NewServer(options...) + if err := svr.RegisterService(ServiceInfo, handler); err != nil { + panic(err) } - nopts = append(nopts, opts...) - return svr.RegisterService(ServiceInfo, handler, nopts...) + return svr } diff --git a/codec/thrift/kitex_gen/echo/streamserver/streamserver.go b/codec/thrift/kitex_gen/echo/streamserver/streamserver.go index 1c5d1af..8891dd0 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/streamserver.go +++ b/codec/thrift/kitex_gen/echo/streamserver/streamserver.go @@ -15,17 +15,18 @@ var ServiceInfo = &serviceinfo.ServiceInfo{ Methods: map[string]serviceinfo.MethodInfo{ "Echo": serviceinfo.NewMethodInfo( func(ctx context.Context, handler, reqArgs, resArgs interface{}) error { - return streamxserver.InvokeStream[echo.Request, echo.Response]( - ctx, serviceinfo.StreamingBidirectional, handler.(streamx.StreamHandler), reqArgs.(streamx.StreamReqArgs), resArgs.(streamx.StreamResArgs)) + return streamxserver.InvokeBidiStreamHandler[echo.Request, echo.Response]( + ctx, reqArgs.(streamx.StreamReqArgs), resArgs.(streamx.StreamResArgs), + func(ctx context.Context, stream streamx.BidiStreamingServer[echo.Request, echo.Response]) error { + return handler.(Server).Echo(ctx, stream) + }, + ) }, nil, nil, false, serviceinfo.WithStreamingMode(serviceinfo.StreamingBidirectional), + serviceinfo.WithMethodExtra("streamx", "true"), ), }, - Extra: map[string]interface{}{ - "streaming": true, - "streamx": true, - }, } diff --git a/go.mod b/go.mod index 3d2f2ae..a685401 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/cloudfoundry/gosigar v1.3.3 github.com/cloudwego/fastpb v0.0.5 github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682 - github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9 + github.com/cloudwego/kitex v0.11.3-0.20241112064056-c0dad5a05e03 github.com/gogo/protobuf v1.3.2 github.com/juju/ratelimit v1.0.1 github.com/lesismal/arpc v1.2.4 @@ -29,13 +29,13 @@ require ( github.com/cheekybits/genny v1.0.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/configmanager v0.2.2 // indirect - github.com/cloudwego/dynamicgo v0.4.0 // indirect + github.com/cloudwego/dynamicgo v0.4.4 // indirect github.com/cloudwego/frugal v0.2.0 // indirect github.com/cloudwego/iasm v0.2.0 // indirect - github.com/cloudwego/localsession v0.0.2 // indirect + github.com/cloudwego/localsession v0.1.1 // indirect github.com/cloudwego/netpoll v0.6.5-0.20240911104114-8a1f5597a920 // indirect github.com/cloudwego/runtimex v0.1.0 // indirect - github.com/cloudwego/thriftgo v0.3.17 // indirect + github.com/cloudwego/thriftgo v0.3.18 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-jump v0.0.0-20170409065014-e1f439676b57 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect diff --git a/go.sum b/go.sum index c6d5bc6..72e6b08 100644 --- a/go.sum +++ b/go.sum @@ -61,8 +61,8 @@ github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/ github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/configmanager v0.2.2 h1:sVrJB8gWYTlPV2OS3wcgJSO9F2/9Zbkmcm1Z7jempOU= github.com/cloudwego/configmanager v0.2.2/go.mod h1:ppiyU+5TPLonE8qMVi/pFQk2eL3Q4P7d4hbiNJn6jwI= -github.com/cloudwego/dynamicgo v0.4.0 h1:wQqNRNiSQaLkbcn3sfpEJGZsz3xf8Il4P/3DcENsrFI= -github.com/cloudwego/dynamicgo v0.4.0/go.mod h1:zgWk2oz56EyH790LJSxrTz1j01GJBO964jJQ/y7qjJc= +github.com/cloudwego/dynamicgo v0.4.4 h1:RuHhjy44Ajy2PLjrwOhI9EY874t9srhgwd/rkKTUKfQ= +github.com/cloudwego/dynamicgo v0.4.4/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY= github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZU= github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk= github.com/cloudwego/frugal v0.2.0 h1:0ETSzQYoYqVvdl7EKjqJ9aJnDoG6TzvNKV3PMQiQTS8= @@ -71,16 +71,16 @@ github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682 h1:hj/AhlEngERp5 github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= -github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9 h1:E0NJGsawD+Jpv9VL5Nvr4vgThpNqIti4vzFRbEzvHRw= -github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9/go.mod h1:xavkyMxJxzdHCLuSDk9r51V6z11eQVAL4UDow3DH0kM= -github.com/cloudwego/localsession v0.0.2 h1:N9/IDtCPj1fCL9bCTP+DbXx3f40YjVYWcwkJG0YhQkY= -github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8= +github.com/cloudwego/kitex v0.11.3-0.20241112064056-c0dad5a05e03 h1:MlgUHc+caxiEUeFz0xSrb04tQI7WcpxDPv8yPMXi19g= +github.com/cloudwego/kitex v0.11.3-0.20241112064056-c0dad5a05e03/go.mod h1:/beJ9Uxjzxv+tOu8JR404PMG7pObWoTR3UtYeytXjug= +github.com/cloudwego/localsession v0.1.1 h1:tbK7laDVrYfFDXoBXo4uCGMAxU4qmz2dDm8d4BGBnDo= +github.com/cloudwego/localsession v0.1.1/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8= github.com/cloudwego/netpoll v0.6.5-0.20240911104114-8a1f5597a920 h1:WT7vsDDb+ammyB7XLmNSS4vKGpPvM2JDl6h34Jj7mY4= github.com/cloudwego/netpoll v0.6.5-0.20240911104114-8a1f5597a920/go.mod h1:BtM+GjKTdwKoC8IOzD08/+8eEn2gYoiNLipFca6BVXQ= github.com/cloudwego/runtimex v0.1.0 h1:HG+WxWoj5/CDChDZ7D99ROwvSMkuNXAqt6hnhTTZDiI= github.com/cloudwego/runtimex v0.1.0/go.mod h1:23vL/HGV0W8nSCHbe084AgEBdDV4rvXenEUMnUNvUd8= -github.com/cloudwego/thriftgo v0.3.17 h1:k0iQe2jEAN1WhPsXWvatwHzoxObUSX2Nw5NqdnywS8k= -github.com/cloudwego/thriftgo v0.3.17/go.mod h1:AdLEJJVGW/ZJYvkkYAZf5SaJH+pA3OyC801WSwqcBwI= +github.com/cloudwego/thriftgo v0.3.18 h1:gnr1vz7G3RbwwCK9AMKHZf63VYGa7ene6WbI9VrBJSw= +github.com/cloudwego/thriftgo v0.3.18/go.mod h1:AdLEJJVGW/ZJYvkkYAZf5SaJH+pA3OyC801WSwqcBwI= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= diff --git a/streaming/kitex_tts_lconn/client/client.go b/streaming/kitex_tts_lconn/client/client.go index 6aaff54..38d3720 100644 --- a/streaming/kitex_tts_lconn/client/client.go +++ b/streaming/kitex_tts_lconn/client/client.go @@ -27,6 +27,7 @@ import ( "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo/streamserver" "github.com/cloudwego/kitex-benchmark/runner" + "github.com/cloudwego/kitex/client" "github.com/cloudwego/kitex/client/streamxclient" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/streamx" @@ -42,7 +43,7 @@ func NewKClient(opt *runner.Options) runner.Client { ) c, err := streamserver.NewClient( "test.echo.kitex", - streamxclient.WithHostPorts(opt.Address), + client.WithHostPorts(opt.Address), streamxclient.WithProvider(cp), ) if err != nil { @@ -53,7 +54,7 @@ func NewKClient(opt *runner.Options) runner.Client { streampool: &sync.Pool{ New: func() interface{} { ctx := metainfo.WithValue(context.Background(), "header", "hello") - stream, err := c.Echo(ctx) + _, stream, err := c.Echo(ctx) if err != nil { log.Printf("client new stream failed: %v", err) return nil diff --git a/streaming/kitex_tts_lconn/main.go b/streaming/kitex_tts_lconn/main.go index 1481e66..af8e16c 100644 --- a/streaming/kitex_tts_lconn/main.go +++ b/streaming/kitex_tts_lconn/main.go @@ -73,13 +73,10 @@ func main() { perf.ServeMonitor(fmt.Sprintf(":%d", port+10000)) }() - svr := server.NewServer( + svr := streamserver.NewServer( + new(StreamServerImpl), server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port}), ) - err := streamserver.RegisterService(svr, new(StreamServerImpl)) - if err != nil { - panic(err) - } if err := svr.Run(); err != nil { log.Println(err.Error()) } diff --git a/streaming/kitex_tts_mux/client/client.go b/streaming/kitex_tts_mux/client/client.go index b82088d..2db573a 100644 --- a/streaming/kitex_tts_mux/client/client.go +++ b/streaming/kitex_tts_mux/client/client.go @@ -27,7 +27,7 @@ import ( "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo/streamserver" "github.com/cloudwego/kitex-benchmark/runner" - "github.com/cloudwego/kitex/client/streamxclient" + "github.com/cloudwego/kitex/client" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/streamx" ) @@ -35,7 +35,10 @@ import ( func NewKClient(opt *runner.Options) runner.Client { klog.SetLevel(klog.LevelWarn) - c, err := streamserver.NewClient("test.echo.kitex", streamxclient.WithHostPorts(opt.Address)) + c, err := streamserver.NewClient( + "test.echo.kitex", + client.WithHostPorts(opt.Address), + ) if err != nil { log.Fatal(err) } @@ -44,7 +47,7 @@ func NewKClient(opt *runner.Options) runner.Client { streampool: &sync.Pool{ New: func() interface{} { ctx := metainfo.WithValue(context.Background(), "header", "hello") - stream, err := c.Echo(ctx) + _, stream, err := c.Echo(ctx) if err != nil { log.Printf("client new stream failed: %v", err) return nil diff --git a/streaming/kitex_tts_mux/main.go b/streaming/kitex_tts_mux/main.go index f8729db..e931a7b 100644 --- a/streaming/kitex_tts_mux/main.go +++ b/streaming/kitex_tts_mux/main.go @@ -73,13 +73,10 @@ func main() { perf.ServeMonitor(fmt.Sprintf(":%d", port+10000)) }() - svr := server.NewServer( + svr := streamserver.NewServer( + new(StreamServerImpl), server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port}), ) - err := streamserver.RegisterService(svr, new(StreamServerImpl)) - if err != nil { - panic(err) - } if err := svr.Run(); err != nil { log.Println(err.Error()) }