From 4e3bb0529a74a8b8feba10b85e8828f72bf6ac57 Mon Sep 17 00:00:00 2001 From: Zhuowei Wang Date: Thu, 21 Nov 2024 15:24:58 +0800 Subject: [PATCH] update gen code --- codec/thrift/gen_code.sh | 2 +- codec/thrift/kitex_gen/echo/echo.go | 14 +- .../kitex_gen/echo/echoserver/client.go | 2 +- .../kitex_gen/echo/echoserver/echoserver.go | 4 +- .../kitex_gen/echo/echoserver/server.go | 2 +- codec/thrift/kitex_gen/echo/k-echo.go | 50 +++---- .../kitex_gen/echo/streamserver/client.go | 41 +++--- .../kitex_gen/echo/streamserver/server.go | 23 ++- .../echo/streamserver/streamserver.go | 136 +++++++++++++++--- go.mod | 9 +- go.sum | 17 ++- streaming/kitex_tts_lconn/client/client.go | 2 +- streaming/kitex_tts_lconn/main.go | 10 +- streaming/kitex_tts_mux/client/client.go | 2 +- streaming/kitex_tts_mux/main.go | 10 +- 15 files changed, 217 insertions(+), 107 deletions(-) diff --git a/codec/thrift/gen_code.sh b/codec/thrift/gen_code.sh index 6daf9b4..7c170df 100755 --- a/codec/thrift/gen_code.sh +++ b/codec/thrift/gen_code.sh @@ -1 +1 @@ -kitex -stream-v2 -module github.com/cloudwego/kitex-benchmark ./echo.thrift +kitex -streamx -module github.com/cloudwego/kitex-benchmark ./echo.thrift diff --git a/codec/thrift/kitex_gen/echo/echo.go b/codec/thrift/kitex_gen/echo/echo.go index 2f089df..f29a959 100644 --- a/codec/thrift/kitex_gen/echo/echo.go +++ b/codec/thrift/kitex_gen/echo/echo.go @@ -1,4 +1,4 @@ -// Code generated by thriftgo (0.3.17). DO NOT EDIT. +// Code generated by thriftgo (0.3.18). DO NOT EDIT. package echo @@ -144,6 +144,7 @@ func (p *Request) ReadField2(iprot thrift.TProtocol) error { } func (p *Request) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("Request"); err != nil { goto WriteStructBeginError @@ -381,6 +382,7 @@ func (p *Response) ReadField2(iprot thrift.TProtocol) error { } func (p *Response) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("Response"); err != nil { goto WriteStructBeginError @@ -621,6 +623,7 @@ func (p *SubMessage) ReadField2(iprot thrift.TProtocol) error { } func (p *SubMessage) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("SubMessage"); err != nil { goto WriteStructBeginError @@ -924,6 +927,7 @@ func (p *Message) ReadField3(iprot thrift.TProtocol) error { } func (p *Message) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("Message"); err != nil { goto WriteStructBeginError @@ -1449,6 +1453,7 @@ func (p *ComplexRequest) ReadField7(iprot thrift.TProtocol) error { } func (p *ComplexRequest) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("ComplexRequest"); err != nil { goto WriteStructBeginError @@ -2097,6 +2102,7 @@ func (p *ComplexResponse) ReadField6(iprot thrift.TProtocol) error { } func (p *ComplexResponse) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("ComplexResponse"); err != nil { goto WriteStructBeginError @@ -2498,6 +2504,7 @@ func (p *EchoServerEchoArgs) ReadField1(iprot thrift.TProtocol) error { } func (p *EchoServerEchoArgs) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("Echo_args"); err != nil { goto WriteStructBeginError @@ -2667,6 +2674,7 @@ func (p *EchoServerEchoResult) ReadField0(iprot thrift.TProtocol) error { } func (p *EchoServerEchoResult) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("Echo_result"); err != nil { goto WriteStructBeginError @@ -2838,6 +2846,7 @@ func (p *EchoServerEchoComplexArgs) ReadField1(iprot thrift.TProtocol) error { } func (p *EchoServerEchoComplexArgs) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("EchoComplex_args"); err != nil { goto WriteStructBeginError @@ -3007,6 +3016,7 @@ func (p *EchoServerEchoComplexResult) ReadField0(iprot thrift.TProtocol) error { } func (p *EchoServerEchoComplexResult) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("EchoComplex_result"); err != nil { goto WriteStructBeginError @@ -3178,6 +3188,7 @@ func (p *StreamServerEchoArgs) ReadField1(iprot thrift.TProtocol) error { } func (p *StreamServerEchoArgs) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("Echo_args"); err != nil { goto WriteStructBeginError @@ -3347,6 +3358,7 @@ func (p *StreamServerEchoResult) ReadField0(iprot thrift.TProtocol) error { } func (p *StreamServerEchoResult) Write(oprot thrift.TProtocol) (err error) { + var fieldId int16 if err = oprot.WriteStructBegin("Echo_result"); err != nil { goto WriteStructBeginError diff --git a/codec/thrift/kitex_gen/echo/echoserver/client.go b/codec/thrift/kitex_gen/echo/echoserver/client.go index 736266d..12d19ac 100644 --- a/codec/thrift/kitex_gen/echo/echoserver/client.go +++ b/codec/thrift/kitex_gen/echo/echoserver/client.go @@ -1,4 +1,4 @@ -// Code generated by Kitex v0.11.0. DO NOT EDIT. +// Code generated by Kitex v0.11.3. DO NOT EDIT. package echoserver diff --git a/codec/thrift/kitex_gen/echo/echoserver/echoserver.go b/codec/thrift/kitex_gen/echo/echoserver/echoserver.go index 8a7486e..9b12fc0 100644 --- a/codec/thrift/kitex_gen/echo/echoserver/echoserver.go +++ b/codec/thrift/kitex_gen/echo/echoserver/echoserver.go @@ -1,4 +1,4 @@ -// Code generated by Kitex v0.11.0. DO NOT EDIT. +// Code generated by Kitex v0.11.3. DO NOT EDIT. package echoserver @@ -87,7 +87,7 @@ func newServiceInfo(hasStreaming bool, keepStreamingMethods bool, keepNonStreami HandlerType: handlerType, Methods: methods, PayloadCodec: kitex.Thrift, - KiteXGenVersion: "v0.11.0", + KiteXGenVersion: "v0.11.3", Extra: extra, } return svcInfo diff --git a/codec/thrift/kitex_gen/echo/echoserver/server.go b/codec/thrift/kitex_gen/echo/echoserver/server.go index df7063b..c95efce 100644 --- a/codec/thrift/kitex_gen/echo/echoserver/server.go +++ b/codec/thrift/kitex_gen/echo/echoserver/server.go @@ -1,4 +1,4 @@ -// Code generated by Kitex v0.11.0. DO NOT EDIT. +// Code generated by Kitex v0.11.3. DO NOT EDIT. package echoserver import ( diff --git a/codec/thrift/kitex_gen/echo/k-echo.go b/codec/thrift/kitex_gen/echo/k-echo.go index faca8bc..dfb2711 100644 --- a/codec/thrift/kitex_gen/echo/k-echo.go +++ b/codec/thrift/kitex_gen/echo/k-echo.go @@ -1,4 +1,4 @@ -// Code generated by Kitex v0.11.0. DO NOT EDIT. +// Code generated by Kitex v0.11.3. DO NOT EDIT. package echo @@ -21,6 +21,7 @@ var ( ) func (p *Request) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -125,9 +126,8 @@ func (p *Request) FastReadField2(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *Request) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *Request) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -179,6 +179,7 @@ func (p *Request) field2Length() int { } func (p *Response) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -283,9 +284,8 @@ func (p *Response) FastReadField2(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *Response) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *Response) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -337,6 +337,7 @@ func (p *Response) field2Length() int { } func (p *SubMessage) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -426,9 +427,8 @@ func (p *SubMessage) FastReadField2(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *SubMessage) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *SubMessage) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -488,6 +488,7 @@ func (p *SubMessage) field2Length() int { } func (p *Message) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -616,9 +617,8 @@ func (p *Message) FastReadField3(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *Message) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *Message) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -709,6 +709,7 @@ func (p *Message) field3Length() int { } func (p *ComplexRequest) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -1013,9 +1014,8 @@ func (p *ComplexRequest) FastReadField7(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *ComplexRequest) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *ComplexRequest) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -1217,6 +1217,7 @@ func (p *ComplexRequest) field7Length() int { } func (p *ComplexResponse) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -1493,9 +1494,8 @@ func (p *ComplexResponse) FastReadField6(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *ComplexResponse) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *ComplexResponse) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -1677,6 +1677,7 @@ func (p *ComplexResponse) field6Length() int { } func (p *EchoServerEchoArgs) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -1736,9 +1737,8 @@ func (p *EchoServerEchoArgs) FastReadField1(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *EchoServerEchoArgs) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *EchoServerEchoArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -1774,6 +1774,7 @@ func (p *EchoServerEchoArgs) field1Length() int { } func (p *EchoServerEchoResult) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -1833,9 +1834,8 @@ func (p *EchoServerEchoResult) FastReadField0(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *EchoServerEchoResult) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *EchoServerEchoResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -1875,6 +1875,7 @@ func (p *EchoServerEchoResult) field0Length() int { } func (p *EchoServerEchoComplexArgs) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -1934,9 +1935,8 @@ func (p *EchoServerEchoComplexArgs) FastReadField1(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *EchoServerEchoComplexArgs) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *EchoServerEchoComplexArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -1972,6 +1972,7 @@ func (p *EchoServerEchoComplexArgs) field1Length() int { } func (p *EchoServerEchoComplexResult) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -2031,9 +2032,8 @@ func (p *EchoServerEchoComplexResult) FastReadField0(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *EchoServerEchoComplexResult) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *EchoServerEchoComplexResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -2073,6 +2073,7 @@ func (p *EchoServerEchoComplexResult) field0Length() int { } func (p *StreamServerEchoArgs) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -2132,9 +2133,8 @@ func (p *StreamServerEchoArgs) FastReadField1(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *StreamServerEchoArgs) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *StreamServerEchoArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { @@ -2170,6 +2170,7 @@ func (p *StreamServerEchoArgs) field1Length() int { } func (p *StreamServerEchoResult) FastRead(buf []byte) (int, error) { + var err error var offset int var l int @@ -2229,9 +2230,8 @@ func (p *StreamServerEchoResult) FastReadField0(buf []byte) (int, error) { return offset, nil } -// for compatibility func (p *StreamServerEchoResult) FastWrite(buf []byte) int { - return 0 + return p.FastWriteNocopy(buf, nil) } func (p *StreamServerEchoResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { diff --git a/codec/thrift/kitex_gen/echo/streamserver/client.go b/codec/thrift/kitex_gen/echo/streamserver/client.go index 4fa8173..3ae78e3 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/client.go +++ b/codec/thrift/kitex_gen/echo/streamserver/client.go @@ -1,48 +1,49 @@ -// Code generated by Kitex v0.11.0. DO NOT EDIT. +// Code generated by Kitex v0.11.3. DO NOT EDIT. package streamserver import ( "context" - echo "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" client "github.com/cloudwego/kitex/client" - "github.com/cloudwego/kitex/client/streamxclient" "github.com/cloudwego/kitex/client/streamxclient/streamxcallopt" - "github.com/cloudwego/kitex/pkg/serviceinfo" "github.com/cloudwego/kitex/pkg/streamx" - "github.com/cloudwego/kitex/pkg/streamx/provider/ttstream" ) +// Client is designed to provide IDL-compatible methods with call-option parameter for kitex framework. type Client interface { Echo(ctx context.Context, callOptions ...streamxcallopt.CallOption) (context.Context, streamx.BidiStreamingClient[echo.Request, echo.Response], error) } +// NewClient creates a client for the service defined in IDL. func NewClient(destService string, opts ...client.Option) (Client, error) { var options []client.Option options = append(options, client.WithDestService(destService)) - cp, err := ttstream.NewClientProvider(ServiceInfo) - if err != nil { - return nil, err - } - options = append(options, streamxclient.WithProvider(cp)) + options = append(options, opts...) - cli, err := client.NewClient(ServiceInfo, options...) + + kc, err := client.NewClient(serviceInfoForClient(), options...) if err != nil { return nil, err } - kc := &kClient{streamer: cli.(client.StreamX), caller: cli.(client.Client)} - return kc, nil + return &kStreamServerClient{ + kClient: newServiceClient(kc), + }, nil } -var _ Client = (*kClient)(nil) +// MustNewClient creates a client for the service defined in IDL. It panics if any error occurs. +func MustNewClient(destService string, opts ...client.Option) Client { + kc, err := NewClient(destService, opts...) + if err != nil { + panic(err) + } + return kc +} -type kClient struct { - caller client.Client - streamer client.StreamX +type kStreamServerClient struct { + *kClient } -func (c *kClient) Echo(ctx context.Context, callOptions ...streamxcallopt.CallOption) (context.Context, streamx.BidiStreamingClient[echo.Request, echo.Response], error) { - return streamxclient.InvokeStream[echo.Request, echo.Response]( - ctx, c.streamer, serviceinfo.StreamingBidirectional, "Echo", nil, nil, callOptions...) +func (p *kStreamServerClient) Echo(ctx context.Context, callOptions ...streamxcallopt.CallOption) (context.Context, streamx.BidiStreamingClient[echo.Request, echo.Response], error) { + return p.kClient.Echo(ctx, callOptions...) } diff --git a/codec/thrift/kitex_gen/echo/streamserver/server.go b/codec/thrift/kitex_gen/echo/streamserver/server.go index 2ddeb66..f770b09 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/server.go +++ b/codec/thrift/kitex_gen/echo/streamserver/server.go @@ -1,36 +1,31 @@ -// Code generated by Kitex v0.11.0. DO NOT EDIT. +// Code generated by Kitex v0.11.3. DO NOT EDIT. package streamserver import ( "context" - echo "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" "github.com/cloudwego/kitex/pkg/streamx" - "github.com/cloudwego/kitex/pkg/streamx/provider/ttstream" server "github.com/cloudwego/kitex/server" - "github.com/cloudwego/kitex/server/streamxserver" ) -type Server interface { +type StreamServer interface { Echo(ctx context.Context, stream streamx.BidiStreamingServer[echo.Request, echo.Response]) error } -func RegisterService(svr server.Server, handler Server, opts ...server.RegisterOption) error { - return svr.RegisterService(ServiceInfo, handler, opts...) -} - -func NewServer(handler Server, opts ...server.Option) server.Server { +// NewServer creates a server.Server with the given handler and options. +func NewServer(handler StreamServer, opts ...server.Option) server.Server { var options []server.Option - sp, _ := ttstream.NewServerProvider(ServiceInfo) - options = append(options, streamxserver.WithProvider(sp)) - options = append(options, opts...) options = append(options, server.WithCompatibleMiddlewareForUnary()) svr := server.NewServer(options...) - if err := svr.RegisterService(ServiceInfo, handler); err != nil { + if err := svr.RegisterService(serviceInfo(), handler); err != nil { panic(err) } return svr } + +func RegisterService(svr server.Server, handler StreamServer, opts ...server.RegisterOption) error { + return svr.RegisterService(serviceInfo(), handler, opts...) +} diff --git a/codec/thrift/kitex_gen/echo/streamserver/streamserver.go b/codec/thrift/kitex_gen/echo/streamserver/streamserver.go index 8891dd0..0df5cba 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/streamserver.go +++ b/codec/thrift/kitex_gen/echo/streamserver/streamserver.go @@ -1,32 +1,124 @@ -// Code generated by Kitex v0.11.0. DO NOT EDIT. +// Code generated by Kitex v0.11.3. DO NOT EDIT. + package streamserver import ( "context" - + "errors" echo "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" - "github.com/cloudwego/kitex/pkg/serviceinfo" + client "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/client/streamxclient" + "github.com/cloudwego/kitex/client/streamxclient/streamxcallopt" + kitex "github.com/cloudwego/kitex/pkg/serviceinfo" "github.com/cloudwego/kitex/pkg/streamx" "github.com/cloudwego/kitex/server/streamxserver" ) -var ServiceInfo = &serviceinfo.ServiceInfo{ - ServiceName: "StreamServer", - Methods: map[string]serviceinfo.MethodInfo{ - "Echo": serviceinfo.NewMethodInfo( - func(ctx context.Context, handler, reqArgs, resArgs interface{}) error { - return streamxserver.InvokeBidiStreamHandler[echo.Request, echo.Response]( - ctx, reqArgs.(streamx.StreamReqArgs), resArgs.(streamx.StreamResArgs), - func(ctx context.Context, stream streamx.BidiStreamingServer[echo.Request, echo.Response]) error { - return handler.(Server).Echo(ctx, stream) - }, - ) - }, - nil, - nil, - false, - serviceinfo.WithStreamingMode(serviceinfo.StreamingBidirectional), - serviceinfo.WithMethodExtra("streamx", "true"), - ), - }, +var errInvalidMessageType = errors.New("invalid message type for service method handler") + +var serviceMethods = map[string]kitex.MethodInfo{ + "Echo": kitex.NewMethodInfo( + echoHandler, + newStreamServerEchoArgs, + newStreamServerEchoResult, + false, + kitex.WithStreamingMode(kitex.StreamingBidirectional), + kitex.WithMethodExtra("streamx", "true"), + ), +} + +var ( + streamServerServiceInfo = NewServiceInfo() + streamServerServiceInfoForClient = NewServiceInfoForClient() + streamServerServiceInfoForStreamClient = NewServiceInfoForStreamClient() +) + +// for server +func serviceInfo() *kitex.ServiceInfo { + return streamServerServiceInfo +} + +// for stream client +func serviceInfoForStreamClient() *kitex.ServiceInfo { + return streamServerServiceInfoForStreamClient +} + +// for client +func serviceInfoForClient() *kitex.ServiceInfo { + return streamServerServiceInfoForClient +} + +// NewServiceInfo creates a new ServiceInfo containing all methods +func NewServiceInfo() *kitex.ServiceInfo { + return newServiceInfo(true, true, true) +} + +// NewServiceInfo creates a new ServiceInfo containing non-streaming methods +func NewServiceInfoForClient() *kitex.ServiceInfo { + return newServiceInfo(false, false, true) +} +func NewServiceInfoForStreamClient() *kitex.ServiceInfo { + return newServiceInfo(true, true, false) +} + +func newServiceInfo(hasStreaming bool, keepStreamingMethods bool, keepNonStreamingMethods bool) *kitex.ServiceInfo { + serviceName := "StreamServer" + handlerType := (*echo.StreamServer)(nil) + methods := map[string]kitex.MethodInfo{} + for name, m := range serviceMethods { + if m.IsStreaming() && !keepStreamingMethods { + continue + } + if !m.IsStreaming() && !keepNonStreamingMethods { + continue + } + methods[name] = m + } + extra := map[string]interface{}{ + "PackageName": "echo", + } + if hasStreaming { + extra["streaming"] = hasStreaming + } + svcInfo := &kitex.ServiceInfo{ + ServiceName: serviceName, + HandlerType: handlerType, + Methods: methods, + PayloadCodec: kitex.Thrift, + KiteXGenVersion: "v0.11.3", + Extra: extra, + } + return svcInfo +} + +func echoHandler(ctx context.Context, handler interface{}, arg, result interface{}) error { + return streamxserver.InvokeBidiStreamHandler[echo.Request, echo.Response]( + ctx, arg.(streamx.StreamReqArgs), result.(streamx.StreamResArgs), func(ctx context.Context, stream streamx.BidiStreamingServer[echo.Request, echo.Response]) error { + return handler.(StreamServer).Echo(ctx, stream) + }, + ) +} +func newStreamServerEchoArgs() interface{} { + return echo.NewStreamServerEchoArgs() +} + +func newStreamServerEchoResult() interface{} { + return echo.NewStreamServerEchoResult() +} + +type kClient struct { + c client.Client +} + +func newServiceClient(c client.Client) *kClient { + return &kClient{ + c: c, + } +} + +func (p *kClient) Echo(ctx context.Context, callOptions ...streamxcallopt.CallOption) ( + context.Context, streamx.BidiStreamingClient[echo.Request, echo.Response], error, +) { + return streamxclient.InvokeStream[echo.Request, echo.Response]( + ctx, p.c, kitex.StreamingBidirectional, "Echo", nil, nil, callOptions...) } diff --git a/go.mod b/go.mod index 0a3fa5c..bd38ff0 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,10 @@ require ( github.com/bytedance/gopkg v0.1.1 github.com/cloudfoundry/gosigar v1.3.3 github.com/cloudwego/fastpb v0.0.5 - github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682 - github.com/cloudwego/kitex v0.11.3-0.20241112064056-c0dad5a05e03 + github.com/cloudwego/gopkg v0.1.3-0.20241118053554-db5d7d475e7e + github.com/cloudwego/kitex v0.11.3-0.20241119060422-2a368b5a48ba github.com/cloudwego/kitex-tests v0.1.0 + github.com/cloudwego/kitex/pkg/protocol/bthrift v0.0.0-20241120040446-fa081214728e github.com/gogo/protobuf v1.3.2 github.com/juju/ratelimit v1.0.1 github.com/lesismal/arpc v1.2.4 @@ -30,7 +31,7 @@ require ( github.com/cheekybits/genny v1.0.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/configmanager v0.2.2 // indirect - github.com/cloudwego/dynamicgo v0.4.4 // indirect + github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128 // indirect github.com/cloudwego/frugal v0.2.0 // indirect github.com/cloudwego/iasm v0.2.0 // indirect github.com/cloudwego/localsession v0.1.1 // indirect @@ -65,7 +66,6 @@ require ( github.com/hashicorp/serf v0.9.5 // indirect github.com/iancoleman/strcase v0.2.0 // indirect github.com/jhump/protoreflect v1.8.2 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/kavu/go_reuseport v1.5.0 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect @@ -78,7 +78,6 @@ require ( github.com/miekg/dns v1.1.26 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nxadm/tail v1.4.8 // indirect diff --git a/go.sum b/go.sum index 2dbc981..e85c4e3 100644 --- a/go.sum +++ b/go.sum @@ -61,20 +61,22 @@ github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/ github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/configmanager v0.2.2 h1:sVrJB8gWYTlPV2OS3wcgJSO9F2/9Zbkmcm1Z7jempOU= github.com/cloudwego/configmanager v0.2.2/go.mod h1:ppiyU+5TPLonE8qMVi/pFQk2eL3Q4P7d4hbiNJn6jwI= -github.com/cloudwego/dynamicgo v0.4.4 h1:RuHhjy44Ajy2PLjrwOhI9EY874t9srhgwd/rkKTUKfQ= -github.com/cloudwego/dynamicgo v0.4.4/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY= +github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128 h1:StnQNfU+fb3PavTNydupCP/8Ges3/DDYRjL8HmLwOMI= +github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY= github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZU= github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk= github.com/cloudwego/frugal v0.2.0 h1:0ETSzQYoYqVvdl7EKjqJ9aJnDoG6TzvNKV3PMQiQTS8= github.com/cloudwego/frugal v0.2.0/go.mod h1:cpnV6kdRMjN3ylxRo63RNbZ9rBK6oxs70Zk6QZ4Enj4= -github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682 h1:hj/AhlEngERp5Tjt864veEvyK6RglXKcXpxkIOSRfug= -github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw= +github.com/cloudwego/gopkg v0.1.3-0.20241118053554-db5d7d475e7e h1:fRZIRv5bgpF/9TBlQMYwryV6d2BXDLw2MEghdzFecXY= +github.com/cloudwego/gopkg v0.1.3-0.20241118053554-db5d7d475e7e/go.mod h1:FQuXsRWRsSqJLsMVd5SYzp8/Z1y5gXKnVvRrWUOsCMI= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= -github.com/cloudwego/kitex v0.11.3-0.20241112064056-c0dad5a05e03 h1:MlgUHc+caxiEUeFz0xSrb04tQI7WcpxDPv8yPMXi19g= -github.com/cloudwego/kitex v0.11.3-0.20241112064056-c0dad5a05e03/go.mod h1:/beJ9Uxjzxv+tOu8JR404PMG7pObWoTR3UtYeytXjug= +github.com/cloudwego/kitex v0.11.3-0.20241119060422-2a368b5a48ba h1:YHhZN8ngQ07r4Z3Syh5LC/60+WJdu35akQrtkjzLe0g= +github.com/cloudwego/kitex v0.11.3-0.20241119060422-2a368b5a48ba/go.mod h1:FJ/tsHnA2RYH+U4TvXdlh4ybVkEnI1hf4wppyTS+r08= github.com/cloudwego/kitex-tests v0.1.0 h1:ot96dU4j0mUkav1C+k7kfZgyC/yCSMYDCZwMnKxA4C8= github.com/cloudwego/kitex-tests v0.1.0/go.mod h1:tu/EAaw3/UQq82ejp9n0DOyNE6GG6UGguMdWLdQPkRg= +github.com/cloudwego/kitex/pkg/protocol/bthrift v0.0.0-20241120040446-fa081214728e h1:LAQEoFCNYDZ+yvEuXBzKbw0mTSqAZM4AZ4mxK4Q1An0= +github.com/cloudwego/kitex/pkg/protocol/bthrift v0.0.0-20241120040446-fa081214728e/go.mod h1:OP63V8YwwSlPVFqHZblV3mJXLPIjcIdwkT6ZYjEggcI= github.com/cloudwego/localsession v0.1.1 h1:tbK7laDVrYfFDXoBXo4uCGMAxU4qmz2dDm8d4BGBnDo= github.com/cloudwego/localsession v0.1.1/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8= github.com/cloudwego/netpoll v0.6.5-0.20240911104114-8a1f5597a920 h1:WT7vsDDb+ammyB7XLmNSS4vKGpPvM2JDl6h34Jj7mY4= @@ -241,8 +243,6 @@ github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY= github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= @@ -316,7 +316,6 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 h1:uiS4zKYKJVj5F3ID+5iylfKPsEQmBEOucSD9Vgmn0i0= github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5/go.mod h1:I8AX+yW//L8Hshx6+a1m3bYkwXkpsVjA2795vP4f4oQ= diff --git a/streaming/kitex_tts_lconn/client/client.go b/streaming/kitex_tts_lconn/client/client.go index 38d3720..aec19ad 100644 --- a/streaming/kitex_tts_lconn/client/client.go +++ b/streaming/kitex_tts_lconn/client/client.go @@ -38,7 +38,7 @@ func NewKClient(opt *runner.Options) runner.Client { klog.SetLevel(klog.LevelWarn) cp, _ := ttstream.NewClientProvider( - streamserver.ServiceInfo, + streamserver.NewServiceInfo(), ttstream.WithClientLongConnPool(ttstream.DefaultLongConnConfig), ) c, err := streamserver.NewClient( diff --git a/streaming/kitex_tts_lconn/main.go b/streaming/kitex_tts_lconn/main.go index af8e16c..bef6f56 100644 --- a/streaming/kitex_tts_lconn/main.go +++ b/streaming/kitex_tts_lconn/main.go @@ -29,14 +29,16 @@ import ( "github.com/cloudwego/kitex-benchmark/perf" "github.com/cloudwego/kitex-benchmark/runner" "github.com/cloudwego/kitex/pkg/streamx" + "github.com/cloudwego/kitex/pkg/streamx/provider/ttstream" "github.com/cloudwego/kitex/server" + "github.com/cloudwego/kitex/server/streamxserver" ) const port = 8002 var ( - _ streamserver.Server = &StreamServerImpl{} - recorder = perf.NewRecorder("KITEX_TTS_LCONN@Server") + _ streamserver.StreamServer = &StreamServerImpl{} + recorder = perf.NewRecorder("KITEX_TTS_LCONN@Server") ) type StreamServerImpl struct{} @@ -73,9 +75,13 @@ func main() { perf.ServeMonitor(fmt.Sprintf(":%d", port+10000)) }() + sp, _ := ttstream.NewServerProvider( + streamserver.NewServiceInfo(), + ) svr := streamserver.NewServer( new(StreamServerImpl), server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port}), + streamxserver.WithProvider(sp), ) if err := svr.Run(); err != nil { log.Println(err.Error()) diff --git a/streaming/kitex_tts_mux/client/client.go b/streaming/kitex_tts_mux/client/client.go index 168db9d..18e156f 100644 --- a/streaming/kitex_tts_mux/client/client.go +++ b/streaming/kitex_tts_mux/client/client.go @@ -38,7 +38,7 @@ func NewKClient(opt *runner.Options) runner.Client { klog.SetLevel(klog.LevelWarn) cp, _ := ttstream.NewClientProvider( - streamserver.ServiceInfo, + streamserver.NewServiceInfo(), ttstream.WithClientMuxConnPool(ttstream.MuxConnConfig{PoolSize: 4}), ) c, err := streamserver.NewClient( diff --git a/streaming/kitex_tts_mux/main.go b/streaming/kitex_tts_mux/main.go index e931a7b..d4f7171 100644 --- a/streaming/kitex_tts_mux/main.go +++ b/streaming/kitex_tts_mux/main.go @@ -29,14 +29,16 @@ import ( "github.com/cloudwego/kitex-benchmark/perf" "github.com/cloudwego/kitex-benchmark/runner" "github.com/cloudwego/kitex/pkg/streamx" + "github.com/cloudwego/kitex/pkg/streamx/provider/ttstream" "github.com/cloudwego/kitex/server" + "github.com/cloudwego/kitex/server/streamxserver" ) const port = 8003 var ( - _ streamserver.Server = &StreamServerImpl{} - recorder = perf.NewRecorder("KITEX_TTS_MUX@Server") + _ streamserver.StreamServer = &StreamServerImpl{} + recorder = perf.NewRecorder("KITEX_TTS_MUX@Server") ) type StreamServerImpl struct{} @@ -73,9 +75,13 @@ func main() { perf.ServeMonitor(fmt.Sprintf(":%d", port+10000)) }() + sp, _ := ttstream.NewServerProvider( + streamserver.NewServiceInfo(), + ) svr := streamserver.NewServer( new(StreamServerImpl), server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port}), + streamxserver.WithProvider(sp), ) if err := svr.Run(); err != nil { log.Println(err.Error())