diff --git a/pkg/rules/grpc/grpc_config.go b/pkg/rules/grpc/grpc_config.go index 8974e345..f97fc965 100644 --- a/pkg/rules/grpc/grpc_config.go +++ b/pkg/rules/grpc/grpc_config.go @@ -94,15 +94,19 @@ func (c *grpcOtelConfig) handleRPC(ctx context.Context, rs stats.RPCStats, isSer } } else { + methodName := "" + if gctx != nil { + methodName = gctx.methodName + } if isServer { grpcServerInstrument.End(ctx, grpcRequest{ - methodName: gctx.methodName, + methodName: methodName, }, grpcResponse{ statusCode: 200, }, nil) } else { grpcClientInstrument.End(ctx, grpcRequest{ - methodName: gctx.methodName, + methodName: methodName, }, grpcResponse{ statusCode: 200, }, nil) diff --git a/test/grpc/v1.44.0/grpc.pb.go b/test/grpc/v1.44.0/grpc.pb.go index e95f8f48..92bc71cf 100644 --- a/test/grpc/v1.44.0/grpc.pb.go +++ b/test/grpc/v1.44.0/grpc.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.33.0 -// protoc v5.28.2 +// protoc v5.28.3 // source: grpc.proto package main @@ -135,9 +135,11 @@ var file_grpc_proto_rawDesc = []byte{ 0x52, 0x65, 0x71, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x20, 0x0a, 0x04, 0x52, 0x65, 0x73, 0x70, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x23, 0x0a, 0x09, 0x48, + 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x43, 0x0a, 0x09, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x47, 0x72, 0x70, 0x63, 0x12, 0x16, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x04, 0x2e, 0x52, 0x65, 0x71, 0x1a, 0x05, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, + 0x12, 0x1e, 0x0a, 0x0b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, + 0x04, 0x2e, 0x52, 0x65, 0x71, 0x1a, 0x05, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } @@ -161,9 +163,11 @@ var file_grpc_proto_goTypes = []interface{}{ } var file_grpc_proto_depIdxs = []int32{ 0, // 0: HelloGrpc.Hello:input_type -> Req - 1, // 1: HelloGrpc.Hello:output_type -> Resp - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type + 0, // 1: HelloGrpc.StreamHello:input_type -> Req + 1, // 2: HelloGrpc.Hello:output_type -> Resp + 1, // 3: HelloGrpc.StreamHello:output_type -> Resp + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/test/grpc/v1.44.0/grpc.proto b/test/grpc/v1.44.0/grpc.proto index d5202d46..0aaf5454 100644 --- a/test/grpc/v1.44.0/grpc.proto +++ b/test/grpc/v1.44.0/grpc.proto @@ -4,6 +4,7 @@ option go_package = "/pkgs"; service HelloGrpc { rpc Hello(Req) returns (Resp) {} + rpc StreamHello(Req) returns (stream Resp) {} } message Req { diff --git a/test/grpc/v1.44.0/grpc_common.go b/test/grpc/v1.44.0/grpc_common.go index 6ef25a2c..2e4956d3 100644 --- a/test/grpc/v1.44.0/grpc_common.go +++ b/test/grpc/v1.44.0/grpc_common.go @@ -19,6 +19,7 @@ import ( "errors" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "io" "log" "net" ) @@ -34,6 +35,19 @@ func (s *service) Hello(ctx context.Context, req *Req) (*Resp, error) { return &Resp{Message: "Hello Gprc"}, nil } +func (s *service) StreamHello(req *Req, res HelloGrpc_StreamHelloServer) error { + if req.Error { + return errors.New("error Grpc") + } + for i := 0; i < 2; i++ { + err := res.Send(&Resp{Message: "Hello Gprc"}) + if err != nil { + return err + } + } + return nil +} + func setupGRPC() { lis, err := net.Listen("tcp", "0.0.0.0:9003") if err != nil { @@ -74,3 +88,25 @@ func sendErrReq(ctx context.Context) string { } return "error resp" } + +func sendStreamReq(ctx context.Context) string { + conn, err := grpc.NewClient("localhost:9003", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + panic(err) + } + c := NewHelloGrpcClient(conn) + stream, err := c.StreamHello(ctx, &Req{}) + if err != nil { + panic(err) + } + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + panic(err) + } + } + return "stream success" +} diff --git a/test/grpc/v1.44.0/grpc_grpc.pb.go b/test/grpc/v1.44.0/grpc_grpc.pb.go index e0724edf..f0156786 100644 --- a/test/grpc/v1.44.0/grpc_grpc.pb.go +++ b/test/grpc/v1.44.0/grpc_grpc.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v3.21.12 +// - protoc v5.28.3 // source: grpc.proto package main @@ -33,7 +33,8 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - HelloGrpc_Hello_FullMethodName = "/HelloGrpc/Hello" + HelloGrpc_Hello_FullMethodName = "/HelloGrpc/Hello" + HelloGrpc_StreamHello_FullMethodName = "/HelloGrpc/StreamHello" ) // HelloGrpcClient is the client API for HelloGrpc service. @@ -41,6 +42,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type HelloGrpcClient interface { Hello(ctx context.Context, in *Req, opts ...grpc.CallOption) (*Resp, error) + StreamHello(ctx context.Context, in *Req, opts ...grpc.CallOption) (HelloGrpc_StreamHelloClient, error) } type helloGrpcClient struct { @@ -60,11 +62,44 @@ func (c *helloGrpcClient) Hello(ctx context.Context, in *Req, opts ...grpc.CallO return out, nil } +func (c *helloGrpcClient) StreamHello(ctx context.Context, in *Req, opts ...grpc.CallOption) (HelloGrpc_StreamHelloClient, error) { + stream, err := c.cc.NewStream(ctx, &HelloGrpc_ServiceDesc.Streams[0], HelloGrpc_StreamHello_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &helloGrpcStreamHelloClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type HelloGrpc_StreamHelloClient interface { + Recv() (*Resp, error) + grpc.ClientStream +} + +type helloGrpcStreamHelloClient struct { + grpc.ClientStream +} + +func (x *helloGrpcStreamHelloClient) Recv() (*Resp, error) { + m := new(Resp) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // HelloGrpcServer is the server API for HelloGrpc service. // All implementations must embed UnimplementedHelloGrpcServer // for forward compatibility type HelloGrpcServer interface { Hello(context.Context, *Req) (*Resp, error) + StreamHello(*Req, HelloGrpc_StreamHelloServer) error mustEmbedUnimplementedHelloGrpcServer() } @@ -75,6 +110,9 @@ type UnimplementedHelloGrpcServer struct { func (UnimplementedHelloGrpcServer) Hello(context.Context, *Req) (*Resp, error) { return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented") } +func (UnimplementedHelloGrpcServer) StreamHello(*Req, HelloGrpc_StreamHelloServer) error { + return status.Errorf(codes.Unimplemented, "method StreamHello not implemented") +} func (UnimplementedHelloGrpcServer) mustEmbedUnimplementedHelloGrpcServer() {} // UnsafeHelloGrpcServer may be embedded to opt out of forward compatibility for this service. @@ -106,6 +144,27 @@ func _HelloGrpc_Hello_Handler(srv interface{}, ctx context.Context, dec func(int return interceptor(ctx, in, info, handler) } +func _HelloGrpc_StreamHello_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Req) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(HelloGrpcServer).StreamHello(m, &helloGrpcStreamHelloServer{stream}) +} + +type HelloGrpc_StreamHelloServer interface { + Send(*Resp) error + grpc.ServerStream +} + +type helloGrpcStreamHelloServer struct { + grpc.ServerStream +} + +func (x *helloGrpcStreamHelloServer) Send(m *Resp) error { + return x.ServerStream.SendMsg(m) +} + // HelloGrpc_ServiceDesc is the grpc.ServiceDesc for HelloGrpc service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -118,6 +177,12 @@ var HelloGrpc_ServiceDesc = grpc.ServiceDesc{ Handler: _HelloGrpc_Hello_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamHello", + Handler: _HelloGrpc_StreamHello_Handler, + ServerStreams: true, + }, + }, Metadata: "grpc.proto", } diff --git a/test/grpc/v1.44.0/test_grpc_client_stream.go b/test/grpc/v1.44.0/test_grpc_client_stream.go new file mode 100644 index 00000000..27e96a8b --- /dev/null +++ b/test/grpc/v1.44.0/test_grpc_client_stream.go @@ -0,0 +1,37 @@ +// Copyright (c) 2025 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "github.com/alibaba/opentelemetry-go-auto-instrumentation/test/verifier" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + "time" +) + +func main() { + // starter server + go setupGRPC() + time.Sleep(3 * time.Second) + // use a http client to request to the server + sendStreamReq(context.Background()) + // verify trace + verifier.WaitAndAssertTraces(func(stubs []tracetest.SpanStubs) { + // filter out client + verifier.Assert(len(stubs[0]) == 1, "Except client grpc system to be 1, got %d", len(stubs)) + verifier.Assert(stubs[0][0].SpanKind == trace.SpanKindServer, "Except client grpc system to be server, got %v", stubs[0][0].SpanKind) + }, 1) +} diff --git a/test/grpc_tests.go b/test/grpc_tests.go index b9ec0204..bfcf8d86 100644 --- a/test/grpc_tests.go +++ b/test/grpc_tests.go @@ -25,6 +25,7 @@ func init() { NewGeneralTestCase("grpc-fail-status-test", grpc_module_name, "v1.44.0", "", "1.21", "", TestGrpcStatus), NewLatestDepthTestCase("grpc-latest-depth", grpc_dependency_name, grpc_module_name, "v1.44.0", "v1.69.2", "1.21", "", TestBasicGrpc), NewMuzzleTestCase("grpc-muzzle", grpc_dependency_name, grpc_module_name, "v1.44.0", "", "1.21", "", []string{"go", "build", "test_grpc_basic.go", "grpc_common.go", "grpc.pb.go", "grpc_grpc.pb.go"}), + NewGeneralTestCase("grpc-client-stream-test", grpc_module_name, "v1.44.0", "", "1.21", "", TestGrpcClientStream), ) } @@ -41,3 +42,10 @@ func TestGrpcStatus(t *testing.T, env ...string) { env = append(env, "GOLANG_PROTOBUF_REGISTRATION_CONFLICT=warn") RunApp(t, "test_grpc_fail_status", env...) } + +func TestGrpcClientStream(t *testing.T, env ...string) { + UseApp("grpc/v1.44.0") + RunGoBuild(t, "go", "build", "test_grpc_client_stream.go", "grpc_common.go", "grpc.pb.go", "grpc_grpc.pb.go") + env = append(env, "GOLANG_PROTOBUF_REGISTRATION_CONFLICT=warn") + RunApp(t, "test_grpc_client_stream", env...) +}