diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index f2e185cc1e..0da71922b7 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -43,7 +43,7 @@ jobs: # 📚 https://git.io/JvXDl # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines - # and modify them (or add more) to build your code if your project + # and modify them (or add more) to build your codes if your project # uses a compiled language #- run: | diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 975a2a7520..b00b30f857 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -31,7 +31,7 @@ jobs: go-version: ${{ matrix.go_version }} id: go - - name: Check out code into the Go module directory + - name: Check out codes into the Go module directory uses: actions/checkout@v2 - name: Cache dependencies diff --git a/filter/filter_impl/hystrix_filter_test.go b/filter/filter_impl/hystrix_filter_test.go index 4973ce7f70..8cbc242916 100644 --- a/filter/filter_impl/hystrix_filter_test.go +++ b/filter/filter_impl/hystrix_filter_test.go @@ -186,7 +186,7 @@ func TestHystricFilterInvokeCircuitBreak(t *testing.T) { resChan <- result }() } - //This can not always pass the test when on travis due to concurrency, you can uncomment the code below and test it locally + //This can not always pass the test when on travis due to concurrency, you can uncomment the codes below and test it locally //var lastRest bool //for i := 0; i < 50; i++ { @@ -215,7 +215,7 @@ func TestHystricFilterInvokeCircuitBreakOmitException(t *testing.T) { resChan <- result }() } - //This can not always pass the test when on travis due to concurrency, you can uncomment the code below and test it locally + //This can not always pass the test when on travis due to concurrency, you can uncomment the codes below and test it locally //time.Sleep(time.Second * 6) //var lastRest bool diff --git a/go.mod b/go.mod index 39f5e843b0..2a2ce21023 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( go.uber.org/atomic v1.6.0 go.uber.org/zap v1.16.0 golang.org/x/net v0.0.0-20200822124328-c89045814202 + google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1 google.golang.org/grpc v1.26.0 gopkg.in/yaml.v2 v2.2.8 k8s.io/api v0.16.9 diff --git a/metadata/service/service.go b/metadata/service/service.go index 1d90f8a516..62d0f7d590 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -41,7 +41,7 @@ type MetadataService interface { SubscribeURL(url *common.URL) (bool, error) // UnsubscribeURL will delete the subscribed url in metadata UnsubscribeURL(url *common.URL) error - // PublishServiceDefinition will generate the target url's code info + // PublishServiceDefinition will generate the target url's codes info PublishServiceDefinition(url *common.URL) error // GetExportedURLs will get the target exported url in metadata // the url should be unique diff --git a/protocol/dubbo/hessian2/const.go b/protocol/dubbo/hessian2/const.go index 74a00b601d..ddbfb661b3 100644 --- a/protocol/dubbo/hessian2/const.go +++ b/protocol/dubbo/hessian2/const.go @@ -184,7 +184,7 @@ const ( /** * the dubbo protocol header length is 16 Bytes. - * the first 2 Bytes is magic code '0xdabb' + * the first 2 Bytes is magic codes '0xdabb' * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag * the next 1 Bytes is response state. * the next 8 Bytes is package DI. diff --git a/protocol/dubbo/impl/const.go b/protocol/dubbo/impl/const.go index 70d8bae6ca..d3622d24ac 100644 --- a/protocol/dubbo/impl/const.go +++ b/protocol/dubbo/impl/const.go @@ -184,7 +184,7 @@ const ( /** * the dubbo protocol header length is 16 Bytes. - * the first 2 Bytes is magic code '0xdabb' + * the first 2 Bytes is magic codes '0xdabb' * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag * the next 1 Bytes is response state. * the next 8 Bytes is package DI. diff --git a/protocol/dubbo3/impl/header.go b/protocol/dubbo3/impl/header.go index e34f901fea..91054db6b4 100644 --- a/protocol/dubbo3/impl/header.go +++ b/protocol/dubbo3/impl/header.go @@ -47,6 +47,8 @@ type TripleHeader struct { TracingRPCID string TracingContext string ClusterInfo string + GrpcStatus string + GrpcMessage string } func (t *TripleHeader) GetMethod() string { @@ -65,6 +67,9 @@ func (t *TripleHeader) FieldToCtx() context.Context { ctx = context.WithValue(ctx, "tri-trace-rpcid", t.TracingRPCID) ctx = context.WithValue(ctx, "tri-trace-proto-bin", t.TracingContext) ctx = context.WithValue(ctx, "tri-unit-info", t.ClusterInfo) + ctx = context.WithValue(ctx, "grpc-status", t.GrpcStatus) + ctx = context.WithValue(ctx, "grpc-message", t.GrpcMessage) + return ctx } @@ -94,6 +99,9 @@ func (t TripleHeaderHandler) WriteHeaderField(url *common.URL, ctx context.Conte headerFields = append(headerFields, hpack.HeaderField{Name: "tri-trace-rpcid", Value: getCtxVaSave(ctx, "tri-trace-rpcid")}) headerFields = append(headerFields, hpack.HeaderField{Name: "tri-trace-proto-bin", Value: getCtxVaSave(ctx, "tri-trace-proto-bin")}) headerFields = append(headerFields, hpack.HeaderField{Name: "tri-unit-info", Value: getCtxVaSave(ctx, "tri-unit-info")}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: getCtxVaSave(ctx, "grpc-status")}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: getCtxVaSave(ctx, "grpc-message")}) + return headerFields } @@ -132,8 +140,10 @@ func (t TripleHeaderHandler) ReadFromH2MetaHeader(frame *http2.MetaHeadersFrame) tripleHeader.Method = f.Value // todo: usage of these part of fields needs to be discussed later //case "grpc-encoding": - //case "grpc-status": - //case "grpc-message": + case "grpc-status": + tripleHeader.GrpcStatus = f.Value + case "grpc-message": + tripleHeader.GrpcMessage = f.Value //case "grpc-status-details-bin": //case "grpc-timeout": //case ":status": diff --git a/protocol/dubbo3/protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go b/protocol/dubbo3/protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go index 9873654d44..649eb19961 100644 --- a/protocol/dubbo3/protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go +++ b/protocol/dubbo3/protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go @@ -28,13 +28,13 @@ import ( "github.com/golang/protobuf/protoc-gen-go/generator" ) -// generatedCodeVersion indicates a version of the generated code. -// It is incremented whenever an incompatibility between the generated code and -// the grpc package is introduced; the generated code references +// generatedCodeVersion indicates a version of the generated codes. +// It is incremented whenever an incompatibility between the generated codes and +// the grpc package is introduced; the generated codes references // a constant, grpc.SupportPackageIsVersionN (where N is generatedCodeVersion). const generatedCodeVersion = 4 -// Paths for packages used by code generated in this file, +// Paths for packages used by codes generated in this file, // relative to the import_prefix of the generator.Generator. const ( contextPkgPath = "context" @@ -58,7 +58,7 @@ func (g *dubboGrpc) Name() string { return "dubbo" } -// The names for packages imported in the generated code. +// The names for packages imported in the generated codes. // They may vary from the final path component of the import path // if the name is used by other packages. var ( @@ -86,7 +86,7 @@ func (g *dubboGrpc) typeName(str string) string { // P forwards to g.gen.P. func (g *dubboGrpc) P(args ...interface{}) { g.gen.P(args...) } -// Generate generates code for the services in the given file. +// Generate generates codes for the services in the given file. // be consistent with grpc plugin func (g *dubboGrpc) Generate(file *generator.FileDescriptor) { if len(file.FileDescriptorProto.Service) == 0 { @@ -117,7 +117,7 @@ func unexport(s string) string { return strings.ToLower(s[:1]) + s[1:] } // messages, fields, enums, and enum values. var deprecationComment = "// Deprecated: Do not use." -// generateService generates all the code for the named service. +// generateService generates all the codes for the named service. func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb.ServiceDescriptorProto, index int) { path := fmt.Sprintf("6,%d", index) // 6 means service. diff --git a/protocol/grpc/common_test.go b/protocol/grpc/common_test.go index b732283a9b..1672d9d3ce 100644 --- a/protocol/grpc/common_test.go +++ b/protocol/grpc/common_test.go @@ -57,7 +57,7 @@ func (g *greeterProvider) Reference() string { return "GrpcGreeterImpl" } -// code generated by greeter.go +// codes generated by greeter.go type greeterProviderBase struct { proxyImpl protocol.Invoker } diff --git a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go index a9f50e8287..40ac5951f3 100644 --- a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go +++ b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go @@ -28,13 +28,13 @@ import ( "github.com/golang/protobuf/protoc-gen-go/generator" ) -// generatedCodeVersion indicates a version of the generated code. -// It is incremented whenever an incompatibility between the generated code and -// the grpc package is introduced; the generated code references +// generatedCodeVersion indicates a version of the generated codes. +// It is incremented whenever an incompatibility between the generated codes and +// the grpc package is introduced; the generated codes references // a constant, grpc.SupportPackageIsVersionN (where N is generatedCodeVersion). const generatedCodeVersion = 4 -// Paths for packages used by code generated in this file, +// Paths for packages used by codes generated in this file, // relative to the import_prefix of the generator.Generator. const ( contextPkgPath = "context" @@ -58,7 +58,7 @@ func (g *dubboGrpc) Name() string { return "dubbo" } -// The names for packages imported in the generated code. +// The names for packages imported in the generated codes. // They may vary from the final path component of the import path // if the name is used by other packages. var ( @@ -86,7 +86,7 @@ func (g *dubboGrpc) typeName(str string) string { // P forwards to g.gen.P. func (g *dubboGrpc) P(args ...interface{}) { g.gen.P(args...) } -// Generate generates code for the services in the given file. +// Generate generates codes for the services in the given file. // be consistent with grpc plugin func (g *dubboGrpc) Generate(file *generator.FileDescriptor) { if len(file.FileDescriptorProto.Service) == 0 { @@ -116,7 +116,7 @@ func unexport(s string) string { return strings.ToLower(s[:1]) + s[1:] } // messages, fields, enums, and enum values. var deprecationComment = "// Deprecated: Do not use." -// generateService generates all the code for the named service. +// generateService generates all the codes for the named service. func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb.ServiceDescriptorProto, index int) { path := fmt.Sprintf("6,%d", index) // 6 means service. diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 4a9645e828..1867bca230 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -110,7 +110,7 @@ func TestHTTPClientCall(t *testing.T) { reply = &User{} err = client.Call(ctx, url, req, reply) assert.True(t, strings.Contains(err.Error(), "500 Internal Server Error")) - assert.True(t, strings.Contains(err.Error(), "\\\"result\\\":{},\\\"error\\\":{\\\"code\\\":-32000,\\\"message\\\":\\\"error\\\"}")) + assert.True(t, strings.Contains(err.Error(), "\\\"result\\\":{},\\\"error\\\":{\\\"codes\\\":-32000,\\\"message\\\":\\\"error\\\"}")) // call GetUser2 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ diff --git a/protocol/jsonrpc/json.go b/protocol/jsonrpc/json.go index 506c4c953b..e142406eab 100644 --- a/protocol/jsonrpc/json.go +++ b/protocol/jsonrpc/json.go @@ -59,7 +59,7 @@ const ( // Error response Error type Error struct { - Code int `json:"code"` + Code int `json:"codes"` Message string `json:"message"` Data interface{} `json:"data,omitempty"` } @@ -72,7 +72,7 @@ func (e *Error) Error() string { if retryErr != nil { msg = []byte("jsonrpc2.Error: json.Marshal failed") } - return fmt.Sprintf(`{"code":%d,"message":%s}`, -32001, string(msg)) + return fmt.Sprintf(`{"codes":%d,"message":%s}`, -32001, string(msg)) } return string(buf) } @@ -362,7 +362,7 @@ func (c *ServerCodec) ReadBody(x interface{}) error { return nil } -// NewError creates a error with @code and @message +// NewError creates a error with @codes and @message func NewError(code int, message string) *Error { return &Error{Code: code, Message: message} } diff --git a/protocol/jsonrpc/json_test.go b/protocol/jsonrpc/json_test.go index a3814e9ad5..d3dafad7eb 100644 --- a/protocol/jsonrpc/json_test.go +++ b/protocol/jsonrpc/json_test.go @@ -56,8 +56,8 @@ func TestJsonClientCodecRead(t *testing.T) { //error codec.pending[1] = "GetUser" - err = codec.Read([]byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"error\":{\"code\":-32000,\"message\":\"error\"}}\n"), rsp) - assert.EqualError(t, err, "{\"code\":-32000,\"message\":\"error\"}") + err = codec.Read([]byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"error\":{\"codes\":-32000,\"message\":\"error\"}}\n"), rsp) + assert.EqualError(t, err, "{\"codes\":-32000,\"message\":\"error\"}") } func TestServerCodecWrite(t *testing.T) { @@ -66,18 +66,18 @@ func TestServerCodecWrite(t *testing.T) { codec.req = serverRequest{Version: "1.0", Method: "GetUser", ID: &a} data, err := codec.Write("error", &TestData{Test: "test"}) assert.NoError(t, err) - assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"code\":-32000,\"message\":\"error\"}}\n", string(data)) + assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"codes\":-32000,\"message\":\"error\"}}\n", string(data)) - data, err = codec.Write("{\"code\":-32000,\"message\":\"error\"}", &TestData{Test: "test"}) + data, err = codec.Write("{\"codes\":-32000,\"message\":\"error\"}", &TestData{Test: "test"}) assert.NoError(t, err) - assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"code\":-32000,\"message\":\"error\"}}\n", string(data)) + assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"codes\":-32000,\"message\":\"error\"}}\n", string(data)) } func TestServerCodecRead(t *testing.T) { codec := newServerCodec() header := map[string]string{} err := codec.ReadHeader(header, []byte("{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n")) - assert.EqualError(t, err, "{\"code\":-32601,\"message\":\"Method not found\"}") + assert.EqualError(t, err, "{\"codes\":-32601,\"message\":\"Method not found\"}") header["HttpMethod"] = "POST" err = codec.ReadHeader(header, []byte("{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n")) diff --git a/remoting/dubbo3/codes/codes.go b/remoting/dubbo3/codes/codes.go new file mode 100644 index 0000000000..7816c04945 --- /dev/null +++ b/remoting/dubbo3/codes/codes.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codes + +import ( + "fmt" + "strconv" +) + +// A Code is an unsigned 32-bit error codes. +type Code uint32 + +const ( + // OK is returned on success. + OK Code = 0 + + // Canceled indicates the operation was canceled (typically by the caller). + Canceled Code = 1 + + // Unknown error. An example of where this error may be returned is + // if a Status value received from another address space belongs to + // an error-space that is not known in this address space. Also + // errors raised by APIs that do not return enough error information + // may be converted to this error. + Unknown Code = 2 + + + + // PermissionDenied indicates the caller does not have permission to + // execute the specified operation. It must not be used for rejections + // caused by exhausting some resource (use ResourceExhausted + // instead for those errors). It must not be + // used if the caller cannot be identified (use Unauthenticated + // instead for those errors). + PermissionDenied Code = 7 + + // ResourceExhausted indicates some resource has been exhausted, perhaps + // a per-user quota, or perhaps the entire file system is out of space. + ResourceExhausted Code = 8 + + + + // Unimplemented indicates operation is not implemented or not + // supported/enabled in this service. + // + Unimplemented Code = 12 + + // Internal errors. Means some invariants expected by underlying + // system has been broken. If you see one of these errors, + // something is very broken. + // + Internal Code = 13 + + // Unavailable indicates the service is currently unavailable. + // This is a most likely a transient condition and may be corrected + // by retrying with a backoff. Note that it is not always safe to retry + // non-idempotent operations. + // + // See litmus test above for deciding between FailedPrecondition, + // Aborted, and Unavailable. + // + Unavailable Code = 14 + + + // Unauthenticated indicates the request does not have valid + // authentication credentials for the operation. + // + Unauthenticated Code = 16 + + _maxCode = 17 +) + +var strToCode = map[string]Code{ + `"OK"`: OK, + `"CANCELLED"`:/* [sic] */ Canceled, + `"UNKNOWN"`: Unknown, + `"PERMISSION_DENIED"`: PermissionDenied, + `"RESOURCE_EXHAUSTED"`: ResourceExhausted, + `"UNIMPLEMENTED"`: Unimplemented, + `"INTERNAL"`: Internal, + `"UNAVAILABLE"`: Unavailable, + `"UNAUTHENTICATED"`: Unauthenticated, +} + +// UnmarshalJSON unmarshals b into the Code. +func (c *Code) UnmarshalJSON(b []byte) error { + // From json.Unmarshaler: By convention, to approximate the behavior of + // Unmarshal itself, Unmarshalers implement UnmarshalJSON([]byte("null")) as + // a no-op. + if string(b) == "null" { + return nil + } + if c == nil { + return fmt.Errorf("nil receiver passed to UnmarshalJSON") + } + + if ci, err := strconv.ParseUint(string(b), 10, 32); err == nil { + if ci >= _maxCode { + return fmt.Errorf("invalid codes: %q", ci) + } + + *c = Code(ci) + return nil + } + + if jc, ok := strToCode[string(b)]; ok { + *c = jc + return nil + } + return fmt.Errorf("invalid codes: %q", string(b)) +} diff --git a/remoting/dubbo3/dubbo3_client.go b/remoting/dubbo3/dubbo3_client.go index 8a45d4ed74..a3c991209f 100644 --- a/remoting/dubbo3/dubbo3_client.go +++ b/remoting/dubbo3/dubbo3_client.go @@ -51,7 +51,9 @@ type TripleConn struct { // Invoke called when unary rpc 's pb.go file func (t *TripleConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { - t.client.Request(ctx, method, args, reply) + if err := t.client.Request(ctx, method, args, reply); err != nil { + return err + } return nil } diff --git a/remoting/dubbo3/http2.go b/remoting/dubbo3/http2.go index 7f210f1dfd..2c9da1d18c 100644 --- a/remoting/dubbo3/http2.go +++ b/remoting/dubbo3/http2.go @@ -24,6 +24,7 @@ import ( "github.com/apache/dubbo-go/protocol/dubbo3/impl" "io" "net" + "strconv" "strings" "sync" "sync/atomic" @@ -230,6 +231,29 @@ func (h *H2Controller) runSendUnaryRsp(stream *serverStream) { sendChan := stream.getSend() for { sendMsg := <-sendChan + + if sendMsg.GetMsgType() == ServerStreamCloseMsgType { + var buf bytes.Buffer + enc := hpack.NewEncoder(&buf) + st := sendMsg.st + headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) + for _, f := range headerFields { + if err := enc.WriteField(f); err != nil { + logger.Error("error: enc.WriteField err = ", err) + } + } + hfData := buf.Next(buf.Len()) + h.sendChan <- h2.HeadersFrameParam{ + StreamID: stream.getID(), + EndHeaders: true, + BlockFragment: hfData, + EndStream: true, + } + return + } + sendData := sendMsg.buffer.Bytes() // header headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. @@ -291,9 +315,10 @@ func (h *H2Controller) runSendStreamRsp(stream *serverStream) { if sendMsg.GetMsgType() == ServerStreamCloseMsgType { var buf bytes.Buffer enc := hpack.NewEncoder(&buf) + st := sendMsg.st headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. - headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: "0"}) - headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: ""}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) for _, f := range headerFields { if err := enc.WriteField(f); err != nil { logger.Error("error: enc.WriteField err = ", err) @@ -344,6 +369,9 @@ func (h *H2Controller) runSendStreamRsp(stream *serverStream) { } } + + + func (h *H2Controller) clientRunRecv() { for { fm, err := h.rawFramer.ReadFrame() @@ -355,13 +383,9 @@ func (h *H2Controller) clientRunRecv() { } switch fm := fm.(type) { case *h2.MetaHeadersFrame: - id := fm.StreamID - logger.Debug("MetaHeader frame = ", fm.String(), "id = ", id) - if fm.Flags.Has(h2.FlagDataEndStream) { - // todo graceful close - h.streamMap.Delete(id) - continue - } + + h.operateHeaders(fm) + case *h2.DataFrame: id := fm.StreamID logger.Debug("DataFrame = ", fm.String(), "id = ", id) @@ -383,6 +407,40 @@ func (h *H2Controller) clientRunRecv() { } } +func (h *H2Controller) operateHeaders(fm *h2.MetaHeadersFrame) { + + + id := fm.StreamID + logger.Debug("MetaHeader frame = ", fm.String(), "id = ", id) + + val, ok := h.streamMap.Load(id) + + if !ok { + logger.Errorf("operateHeaders not get stream") + return + } + + s := val.(stream) + + state := &decodeState{} + state.data.isGRPC = true + + + for _, hf := range fm.Fields { + state.processHeaderField(hf) + } + if state.data.rawStatusCode != nil { + s.putRecvErr(state.status().Err()) + } + + if fm.Flags.Has(h2.FlagDataEndStream) { + // todo graceful close + h.streamMap.Delete(id) + } + +} + + // serverRun start a loop, server start listening h2 metaheader func (h *H2Controller) serverRunRecv() { for { @@ -563,8 +621,8 @@ func (h *H2Controller) UnaryInvoke(ctx context.Context, method string, addr stri // recv rsp recvChan := clientStream.getRecv() recvData := <-recvChan - if recvData.GetMsgType() != DataMsgType { - return perrors.New("get data from req not data msg type") + if recvData.GetMsgType() == ServerStreamCloseMsgType { + return toRPCErr(recvData.err) } if err := proto.Unmarshal(h.pkgHandler.Frame2PkgData(recvData.buffer.Bytes()), reply.(proto.Message)); err != nil { diff --git a/remoting/dubbo3/http_util.go b/remoting/dubbo3/http_util.go new file mode 100644 index 0000000000..e2720c4f38 --- /dev/null +++ b/remoting/dubbo3/http_util.go @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo3 + +import ( + "bytes" + "github.com/apache/dubbo-go/remoting/dubbo3/codes" + "github.com/apache/dubbo-go/remoting/dubbo3/status" + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" + "net/http" + "strconv" + "strings" +) + + +import ( + "fmt" + "unicode/utf8" +) + + + +var ( + clientPreface = []byte(http2.ClientPreface) + http2ErrConvTab = map[http2.ErrCode]codes.Code{ + http2.ErrCodeNo: codes.Internal, + http2.ErrCodeProtocol: codes.Internal, + http2.ErrCodeInternal: codes.Internal, + http2.ErrCodeFlowControl: codes.ResourceExhausted, + http2.ErrCodeSettingsTimeout: codes.Internal, + http2.ErrCodeStreamClosed: codes.Internal, + http2.ErrCodeFrameSize: codes.Internal, + http2.ErrCodeRefusedStream: codes.Unavailable, + http2.ErrCodeCancel: codes.Canceled, + http2.ErrCodeCompression: codes.Internal, + http2.ErrCodeConnect: codes.Internal, + http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted, + http2.ErrCodeInadequateSecurity: codes.PermissionDenied, + http2.ErrCodeHTTP11Required: codes.Internal, + } + // HTTPStatusConvTab is the HTTP status codes to triple error codes conversion table. + HTTPStatusConvTab = map[int]codes.Code{ + // 400 Bad Request - INTERNAL. + http.StatusBadRequest: codes.Internal, + // 401 Unauthorized - UNAUTHENTICATED. + http.StatusUnauthorized: codes.Unauthenticated, + // 403 Forbidden - PERMISSION_DENIED. + http.StatusForbidden: codes.PermissionDenied, + // 404 Not Found - UNIMPLEMENTED. + http.StatusNotFound: codes.Unimplemented, + // 429 Too Many Requests - UNAVAILABLE. + http.StatusTooManyRequests: codes.Unavailable, + // 502 Bad Gateway - UNAVAILABLE. + http.StatusBadGateway: codes.Unavailable, + // 503 Service Unavailable - UNAVAILABLE. + http.StatusServiceUnavailable: codes.Unavailable, + // 504 Gateway timeout - UNAVAILABLE. + http.StatusGatewayTimeout: codes.Unavailable, + } +) + + + +const ( + spaceByte = ' ' + tildeByte = '~' + percentByte = '%' +) + + + + +// decodeState configures decoding criteria and records the decoded data. +type decodeState struct { + // whether decoding on server side or not + serverSide bool + + // Records the states during HPACK decoding. It will be filled with info parsed from HTTP HEADERS + // frame once decodeHeader function has been invoked and returned. + data parsedHeaderData +} + + +func (d *decodeState) status() *status.Status { + if d.data.statusGen == nil { + // No status-details were provided; generate status using codes/msg. + d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) + } + return d.data.statusGen +} + + +func (d *decodeState) processHeaderField(f hpack.HeaderField) { + switch f.Name { + case "grpc-status": + code, err := strconv.Atoi(f.Value) + if err != nil { + d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + return + } + d.data.rawStatusCode = &code + case "grpc-message": + d.data.rawStatusMsg = decodeGrpcMessage(f.Value) + + default: + + } +} + +// constructErrMsg constructs error message to be returned in HTTP fallback mode. +// Format: HTTP status codes and its corresponding message + content-type error message. +func (d *decodeState) constructHTTPErrMsg() string { + var errMsgs []string + + if d.data.httpStatus == nil { + errMsgs = append(errMsgs, "malformed header: missing HTTP status") + } else { + errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status codes %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus)) + } + + if d.data.contentTypeErr == "" { + errMsgs = append(errMsgs, "transport: missing content-type field") + } else { + errMsgs = append(errMsgs, d.data.contentTypeErr) + } + + return strings.Join(errMsgs, "; ") +} + + + +// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage. +func decodeGrpcMessage(msg string) string { + if msg == "" { + return "" + } + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + if msg[i] == percentByte && i+2 < lenMsg { + return decodeGrpcMessageUnchecked(msg) + } + } + return msg +} + +func decodeGrpcMessageUnchecked(msg string) string { + var buf bytes.Buffer + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + c := msg[i] + if c == percentByte && i+2 < lenMsg { + parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8) + if err != nil { + buf.WriteByte(c) + } else { + buf.WriteByte(byte(parsed)) + i += 2 + } + } else { + buf.WriteByte(c) + } + } + return buf.String() +} + + + +type parsedHeaderData struct { + encoding string + // statusGen caches the stream status received from the trailer the server + // sent. Client side only. Do not access directly. After all trailers are + // parsed, use the status method to retrieve the status. + statusGen *status.Status + rawStatusCode *int + rawStatusMsg string + httpStatus *int + isGRPC bool + grpcErr error + httpErr error + contentTypeErr string +} + + + +func encodeGrpcMessage(msg string) string { + if msg == "" { + return "" + } + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + c := msg[i] + if !(c >= spaceByte && c <= tildeByte && c != percentByte) { + return encodeGrpcMessageUnchecked(msg) + } + } + return msg +} + +func encodeGrpcMessageUnchecked(msg string) string { + var buf bytes.Buffer + for len(msg) > 0 { + r, size := utf8.DecodeRuneInString(msg) + for _, b := range []byte(string(r)) { + if size > 1 { + // If size > 1, r is not ascii. Always do percent encoding. + buf.WriteString(fmt.Sprintf("%%%02X", b)) + continue + } + + // The for loop is necessary even if size == 1. r could be + // utf8.RuneError. + // + // fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD". + if b >= spaceByte && b <= tildeByte && b != percentByte { + buf.WriteByte(b) + } else { + buf.WriteString(fmt.Sprintf("%%%02X", b)) + } + } + msg = msg[size:] + } + return buf.String() +} + + + + diff --git a/remoting/dubbo3/processor.go b/remoting/dubbo3/processor.go index 6f445a1242..f213a1713e 100644 --- a/remoting/dubbo3/processor.go +++ b/remoting/dubbo3/processor.go @@ -19,6 +19,8 @@ package dubbo3 import ( "bytes" + "github.com/apache/dubbo-go/remoting/dubbo3/codes" + "github.com/apache/dubbo-go/remoting/dubbo3/status" ) import ( "github.com/golang/protobuf/proto" @@ -100,17 +102,31 @@ func (s *unaryProcessor) runRPC() { go func() { recvMsg := <-recvChan if recvMsg.err != nil { + logger.Error("error ,s.processUnaryRPC err = ", recvMsg.err) return } rspData, err := s.processUnaryRPC(*recvMsg.buffer, s.stream.getService(), s.stream.getHeader()) + if err != nil { - logger.Error("error ,s.processUnaryRPC err = ", err) + s.handleUnaryRPCErr(err) return } + // TODO: status sendResponse should has err, then writeStatus(err) use one function and defer s.stream.putSend(rspData, DataMsgType) }() } + +func (s *unaryProcessor) handleUnaryRPCErr(err error) { + logger.Error("error ,s.processUnaryRPC err = ", err) + appStatus, ok := status.FromError(err) + if !ok { + err = status.Errorf(codes.Unknown, err.Error()) + appStatus, _ = status.FromError(err) + } + s.stream.WriteStatus(appStatus) +} + // streamingProcessor used to process streaming invocation type streamingProcessor struct { baseProcessor diff --git a/remoting/dubbo3/rpc_util.go b/remoting/dubbo3/rpc_util.go new file mode 100644 index 0000000000..08a691024c --- /dev/null +++ b/remoting/dubbo3/rpc_util.go @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo3 + +import ( + "github.com/apache/dubbo-go/remoting/dubbo3/codes" + "github.com/apache/dubbo-go/remoting/dubbo3/status" + "io" +) + +// toRPCErr converts an error into an error from the status package. +func toRPCErr(err error) error { + if err == nil || err == io.EOF { + return err + } + if err == io.ErrUnexpectedEOF { + return status.Errorf(codes.Internal, err.Error()) + } + if _, ok := status.FromError(err); ok { + return err + } + return status.Errorf(codes.Unknown, err.Error()) +} diff --git a/remoting/dubbo3/status/status.go b/remoting/dubbo3/status/status.go new file mode 100644 index 0000000000..852eabe8d7 --- /dev/null +++ b/remoting/dubbo3/status/status.go @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package status + +import ( + "errors" + "fmt" + "github.com/apache/dubbo-go/remoting/dubbo3/codes" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + spb "google.golang.org/genproto/googleapis/rpc/status" +) + +// Status represents an RPC status codes, message, and details. It is immutable +// and should be created with New, Newf, or FromProto. +type Status struct { + s *spb.Status +} + + +func FromError(err error) (s *Status, ok bool) { + if err == nil { + return nil, true + } + if se, ok := err.(interface { + GRPCStatus() *Status + }); ok { + return se.GRPCStatus(), true + } + return New(codes.Unknown, err.Error()), false +} + + +// New returns a Status representing c and msg. +func New(c codes.Code, msg string) *Status { + return &Status{s: &spb.Status{Code: int32(c), Message: msg}} +} + +// Newf returns New(c, fmt.Sprintf(format, a...)). +func Newf(c codes.Code, format string, a ...interface{}) *Status { + return New(c, fmt.Sprintf(format, a...)) +} + +// FromProto returns a Status representing s. +func FromProto(s *spb.Status) *Status { + return &Status{s: proto.Clone(s).(*spb.Status)} +} + +// Err returns an error representing c and msg. If c is OK, returns nil. +func Err(c codes.Code, msg string) error { + return New(c, msg).Err() +} + +// Errorf returns Error(c, fmt.Sprintf(format, a...)). +func Errorf(c codes.Code, format string, a ...interface{}) error { + return Err(c, fmt.Sprintf(format, a...)) +} + +// Code returns the status codes contained in s. +func (s *Status) Code() codes.Code { + if s == nil || s.s == nil { + return codes.OK + } + return codes.Code(s.s.Code) +} + +// Message returns the message contained in s. +func (s *Status) Message() string { + if s == nil || s.s == nil { + return "" + } + return s.s.Message +} + +// Proto returns s's status as an spb.Status proto message. +func (s *Status) Proto() *spb.Status { + if s == nil { + return nil + } + return proto.Clone(s.s).(*spb.Status) +} + +// Err returns an immutable error representing s; returns nil if s.Code() is OK. +func (s *Status) Err() error { + if s.Code() == codes.OK { + return nil + } + return &Error{e: s.Proto()} +} + +// WithDetails returns a new status with the provided details messages appended to the status. +// If any errors are encountered, it returns nil and the first error encountered. +func (s *Status) WithDetails(details ...proto.Message) (*Status, error) { + if s.Code() == codes.OK { + return nil, errors.New("no error details for status with codes OK") + } + // s.Code() != OK implies that s.Proto() != nil. + p := s.Proto() + for _, detail := range details { + any, err := ptypes.MarshalAny(detail) + if err != nil { + return nil, err + } + p.Details = append(p.Details, any) + } + return &Status{s: p}, nil +} + +// Details returns a slice of details messages attached to the status. +// If a detail cannot be decoded, the error is returned in place of the detail. +func (s *Status) Details() []interface{} { + if s == nil || s.s == nil { + return nil + } + details := make([]interface{}, 0, len(s.s.Details)) + for _, any := range s.s.Details { + detail := &ptypes.DynamicAny{} + if err := ptypes.UnmarshalAny(any, detail); err != nil { + details = append(details, err) + continue + } + details = append(details, detail.Message) + } + return details +} + +// Error wraps a pointer of a status proto. It implements error and Status, +// and a nil *Error should never be returned by this package. +type Error struct { + e *spb.Status +} + +func (e *Error) Error() string { + return fmt.Sprintf("rpc error: codes = %s desc = %s", codes.Code(e.e.GetCode()), e.e.GetMessage()) +} + + +// Is implements future error.Is functionality. +// A Error is equivalent if the codes and message are identical. +func (e *Error) Is(target error) bool { + tse, ok := target.(*Error) + if !ok { + return false + } + return proto.Equal(e.e, tse.e) +} diff --git a/remoting/dubbo3/stream.go b/remoting/dubbo3/stream.go index e5c1968a88..2d1669b63e 100644 --- a/remoting/dubbo3/stream.go +++ b/remoting/dubbo3/stream.go @@ -19,6 +19,7 @@ package dubbo3 import ( "bytes" + "github.com/apache/dubbo-go/remoting/dubbo3/status" ) import ( "google.golang.org/grpc" @@ -41,6 +42,7 @@ const ( type BufferMsg struct { buffer *bytes.Buffer msgType MsgType + st *status.Status err error } @@ -73,6 +75,7 @@ func (b *MsgBuffer) get() <-chan BufferMsg { type stream interface { putRecv(data []byte, msgType MsgType) putSend(data []byte, msgType MsgType) + putRecvErr(err error) getSend() <-chan BufferMsg getRecv() <-chan BufferMsg } @@ -85,6 +88,17 @@ type baseStream struct { url *common.URL header remoting.ProtocolHeader service common.RPCService + + // On client-side it is the status error received from the server. + // On server-side it is unused. + status *status.Status +} + +func (s *baseStream) WriteStatus(st *status.Status) { + s.sendBuf.put(BufferMsg{ + st : st, + msgType: ServerStreamCloseMsgType, + }) } func (s *baseStream) putRecv(data []byte, msgType MsgType) { @@ -94,6 +108,13 @@ func (s *baseStream) putRecv(data []byte, msgType MsgType) { }) } +func (s *baseStream) putRecvErr(err error) { + s.recvBuf.put(BufferMsg{ + err: err, + msgType: ServerStreamCloseMsgType, + }) +} + func (s *baseStream) putSend(data []byte, msgType MsgType) { s.sendBuf.put(BufferMsg{ buffer: bytes.NewBuffer(data), @@ -109,6 +130,8 @@ func (s *baseStream) getSend() <-chan BufferMsg { return s.sendBuf.get() } + + func newBaseStream(streamID uint32, url *common.URL, service common.RPCService) *baseStream { // stream and pkgHeader are the same level return &baseStream{