From 266d77392952d27d13451de91da682d744856f28 Mon Sep 17 00:00:00 2001 From: Randy Date: Wed, 6 Jan 2021 20:13:29 +0800 Subject: [PATCH 1/2] grpc status from server to client --- go.mod | 1 + protocol/dubbo3/impl/header.go | 14 +- remoting/dubbo3/codes.go | 244 +++++++++++++++++++++++++++++++ remoting/dubbo3/dubbo3_client.go | 4 +- remoting/dubbo3/http2.go | 80 ++++++++-- remoting/dubbo3/http_util.go | 230 +++++++++++++++++++++++++++++ remoting/dubbo3/processor.go | 19 ++- remoting/dubbo3/rpc_util.go | 21 +++ remoting/dubbo3/status/status.go | 176 ++++++++++++++++++++++ remoting/dubbo3/stream.go | 23 +++ remoting/dubbo3/user_stream.go | 1 + 11 files changed, 798 insertions(+), 15 deletions(-) create mode 100644 remoting/dubbo3/codes.go create mode 100644 remoting/dubbo3/http_util.go create mode 100644 remoting/dubbo3/rpc_util.go create mode 100644 remoting/dubbo3/status/status.go diff --git a/go.mod b/go.mod index 39f5e843b0..2a2ce21023 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( go.uber.org/atomic v1.6.0 go.uber.org/zap v1.16.0 golang.org/x/net v0.0.0-20200822124328-c89045814202 + google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1 google.golang.org/grpc v1.26.0 gopkg.in/yaml.v2 v2.2.8 k8s.io/api v0.16.9 diff --git a/protocol/dubbo3/impl/header.go b/protocol/dubbo3/impl/header.go index e34f901fea..91054db6b4 100644 --- a/protocol/dubbo3/impl/header.go +++ b/protocol/dubbo3/impl/header.go @@ -47,6 +47,8 @@ type TripleHeader struct { TracingRPCID string TracingContext string ClusterInfo string + GrpcStatus string + GrpcMessage string } func (t *TripleHeader) GetMethod() string { @@ -65,6 +67,9 @@ func (t *TripleHeader) FieldToCtx() context.Context { ctx = context.WithValue(ctx, "tri-trace-rpcid", t.TracingRPCID) ctx = context.WithValue(ctx, "tri-trace-proto-bin", t.TracingContext) ctx = context.WithValue(ctx, "tri-unit-info", t.ClusterInfo) + ctx = context.WithValue(ctx, "grpc-status", t.GrpcStatus) + ctx = context.WithValue(ctx, "grpc-message", t.GrpcMessage) + return ctx } @@ -94,6 +99,9 @@ func (t TripleHeaderHandler) WriteHeaderField(url *common.URL, ctx context.Conte headerFields = append(headerFields, hpack.HeaderField{Name: "tri-trace-rpcid", Value: getCtxVaSave(ctx, "tri-trace-rpcid")}) headerFields = append(headerFields, hpack.HeaderField{Name: "tri-trace-proto-bin", Value: getCtxVaSave(ctx, "tri-trace-proto-bin")}) headerFields = append(headerFields, hpack.HeaderField{Name: "tri-unit-info", Value: getCtxVaSave(ctx, "tri-unit-info")}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: getCtxVaSave(ctx, "grpc-status")}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: getCtxVaSave(ctx, "grpc-message")}) + return headerFields } @@ -132,8 +140,10 @@ func (t TripleHeaderHandler) ReadFromH2MetaHeader(frame *http2.MetaHeadersFrame) tripleHeader.Method = f.Value // todo: usage of these part of fields needs to be discussed later //case "grpc-encoding": - //case "grpc-status": - //case "grpc-message": + case "grpc-status": + tripleHeader.GrpcStatus = f.Value + case "grpc-message": + tripleHeader.GrpcMessage = f.Value //case "grpc-status-details-bin": //case "grpc-timeout": //case ":status": diff --git a/remoting/dubbo3/codes.go b/remoting/dubbo3/codes.go new file mode 100644 index 0000000000..906e835007 --- /dev/null +++ b/remoting/dubbo3/codes.go @@ -0,0 +1,244 @@ +/* + * + * Copyright 2014 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package codes defines the canonical error codes used by gRPC. It is +// consistent across various languages. +package dubbo3 // import "google.golang.org/grpc/codes" + +import ( + "fmt" + "strconv" +) + +// A Code is an unsigned 32-bit error code as defined in the gRPC spec. +type Code uint32 + +const ( + // OK is returned on success. + OK Code = 0 + + // Canceled indicates the operation was canceled (typically by the caller). + // + // The gRPC framework will generate this error code when cancellation + // is requested. + Canceled Code = 1 + + // Unknown error. An example of where this error may be returned is + // if a Status value received from another address space belongs to + // an error-space that is not known in this address space. Also + // errors raised by APIs that do not return enough error information + // may be converted to this error. + // + // The gRPC framework will generate this error code in the above two + // mentioned cases. + Unknown Code = 2 + + // InvalidArgument indicates client specified an invalid argument. + // Note that this differs from FailedPrecondition. It indicates arguments + // that are problematic regardless of the state of the system + // (e.g., a malformed file name). + // + // This error code will not be generated by the gRPC framework. + InvalidArgument Code = 3 + + // DeadlineExceeded means operation expired before completion. + // For operations that change the state of the system, this error may be + // returned even if the operation has completed successfully. For + // example, a successful response from a server could have been delayed + // long enough for the deadline to expire. + // + // The gRPC framework will generate this error code when the deadline is + // exceeded. + DeadlineExceeded Code = 4 + + // NotFound means some requested entity (e.g., file or directory) was + // not found. + // + // This error code will not be generated by the gRPC framework. + NotFound Code = 5 + + // AlreadyExists means an attempt to create an entity failed because one + // already exists. + // + // This error code will not be generated by the gRPC framework. + AlreadyExists Code = 6 + + // PermissionDenied indicates the caller does not have permission to + // execute the specified operation. It must not be used for rejections + // caused by exhausting some resource (use ResourceExhausted + // instead for those errors). It must not be + // used if the caller cannot be identified (use Unauthenticated + // instead for those errors). + // + // This error code will not be generated by the gRPC core framework, + // but expect authentication middleware to use it. + PermissionDenied Code = 7 + + // ResourceExhausted indicates some resource has been exhausted, perhaps + // a per-user quota, or perhaps the entire file system is out of space. + // + // This error code will be generated by the gRPC framework in + // out-of-memory and server overload situations, or when a message is + // larger than the configured maximum size. + ResourceExhausted Code = 8 + + // FailedPrecondition indicates operation was rejected because the + // system is not in a state required for the operation's execution. + // For example, directory to be deleted may be non-empty, an rmdir + // operation is applied to a non-directory, etc. + // + // A litmus test that may help a service implementor in deciding + // between FailedPrecondition, Aborted, and Unavailable: + // (a) Use Unavailable if the client can retry just the failing call. + // (b) Use Aborted if the client should retry at a higher-level + // (e.g., restarting a read-modify-write sequence). + // (c) Use FailedPrecondition if the client should not retry until + // the system state has been explicitly fixed. E.g., if an "rmdir" + // fails because the directory is non-empty, FailedPrecondition + // should be returned since the client should not retry unless + // they have first fixed up the directory by deleting files from it. + // (d) Use FailedPrecondition if the client performs conditional + // REST Get/Update/Delete on a resource and the resource on the + // server does not match the condition. E.g., conflicting + // read-modify-write on the same resource. + // + // This error code will not be generated by the gRPC framework. + FailedPrecondition Code = 9 + + // Aborted indicates the operation was aborted, typically due to a + // concurrency issue like sequencer check failures, transaction aborts, + // etc. + // + // See litmus test above for deciding between FailedPrecondition, + // Aborted, and Unavailable. + // + // This error code will not be generated by the gRPC framework. + Aborted Code = 10 + + // OutOfRange means operation was attempted past the valid range. + // E.g., seeking or reading past end of file. + // + // Unlike InvalidArgument, this error indicates a problem that may + // be fixed if the system state changes. For example, a 32-bit file + // system will generate InvalidArgument if asked to read at an + // offset that is not in the range [0,2^32-1], but it will generate + // OutOfRange if asked to read from an offset past the current + // file size. + // + // There is a fair bit of overlap between FailedPrecondition and + // OutOfRange. We recommend using OutOfRange (the more specific + // error) when it applies so that callers who are iterating through + // a space can easily look for an OutOfRange error to detect when + // they are done. + // + // This error code will not be generated by the gRPC framework. + OutOfRange Code = 11 + + // Unimplemented indicates operation is not implemented or not + // supported/enabled in this service. + // + // This error code will be generated by the gRPC framework. Most + // commonly, you will see this error code when a method implementation + // is missing on the server. It can also be generated for unknown + // compression algorithms or a disagreement as to whether an RPC should + // be streaming. + Unimplemented Code = 12 + + // Internal errors. Means some invariants expected by underlying + // system has been broken. If you see one of these errors, + // something is very broken. + // + // This error code will be generated by the gRPC framework in several + // internal error conditions. + Internal Code = 13 + + // Unavailable indicates the service is currently unavailable. + // This is a most likely a transient condition and may be corrected + // by retrying with a backoff. Note that it is not always safe to retry + // non-idempotent operations. + // + // See litmus test above for deciding between FailedPrecondition, + // Aborted, and Unavailable. + // + // This error code will be generated by the gRPC framework during + // abrupt shutdown of a server process or network connection. + Unavailable Code = 14 + + // DataLoss indicates unrecoverable data loss or corruption. + // + // This error code will not be generated by the gRPC framework. + DataLoss Code = 15 + + // Unauthenticated indicates the request does not have valid + // authentication credentials for the operation. + // + // The gRPC framework will generate this error code when the + // authentication metadata is invalid or a Credentials callback fails, + // but also expect authentication middleware to generate it. + Unauthenticated Code = 16 + + _maxCode = 17 +) + +var strToCode = map[string]Code{ + `"OK"`: OK, + `"CANCELLED"`:/* [sic] */ Canceled, + `"UNKNOWN"`: Unknown, + `"INVALID_ARGUMENT"`: InvalidArgument, + `"DEADLINE_EXCEEDED"`: DeadlineExceeded, + `"NOT_FOUND"`: NotFound, + `"ALREADY_EXISTS"`: AlreadyExists, + `"PERMISSION_DENIED"`: PermissionDenied, + `"RESOURCE_EXHAUSTED"`: ResourceExhausted, + `"FAILED_PRECONDITION"`: FailedPrecondition, + `"ABORTED"`: Aborted, + `"OUT_OF_RANGE"`: OutOfRange, + `"UNIMPLEMENTED"`: Unimplemented, + `"INTERNAL"`: Internal, + `"UNAVAILABLE"`: Unavailable, + `"DATA_LOSS"`: DataLoss, + `"UNAUTHENTICATED"`: Unauthenticated, +} + +// UnmarshalJSON unmarshals b into the Code. +func (c *Code) UnmarshalJSON(b []byte) error { + // From json.Unmarshaler: By convention, to approximate the behavior of + // Unmarshal itself, Unmarshalers implement UnmarshalJSON([]byte("null")) as + // a no-op. + if string(b) == "null" { + return nil + } + if c == nil { + return fmt.Errorf("nil receiver passed to UnmarshalJSON") + } + + if ci, err := strconv.ParseUint(string(b), 10, 32); err == nil { + if ci >= _maxCode { + return fmt.Errorf("invalid code: %q", ci) + } + + *c = Code(ci) + return nil + } + + if jc, ok := strToCode[string(b)]; ok { + *c = jc + return nil + } + return fmt.Errorf("invalid code: %q", string(b)) +} diff --git a/remoting/dubbo3/dubbo3_client.go b/remoting/dubbo3/dubbo3_client.go index 8a45d4ed74..a3c991209f 100644 --- a/remoting/dubbo3/dubbo3_client.go +++ b/remoting/dubbo3/dubbo3_client.go @@ -51,7 +51,9 @@ type TripleConn struct { // Invoke called when unary rpc 's pb.go file func (t *TripleConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { - t.client.Request(ctx, method, args, reply) + if err := t.client.Request(ctx, method, args, reply); err != nil { + return err + } return nil } diff --git a/remoting/dubbo3/http2.go b/remoting/dubbo3/http2.go index 7f210f1dfd..2c9da1d18c 100644 --- a/remoting/dubbo3/http2.go +++ b/remoting/dubbo3/http2.go @@ -24,6 +24,7 @@ import ( "github.com/apache/dubbo-go/protocol/dubbo3/impl" "io" "net" + "strconv" "strings" "sync" "sync/atomic" @@ -230,6 +231,29 @@ func (h *H2Controller) runSendUnaryRsp(stream *serverStream) { sendChan := stream.getSend() for { sendMsg := <-sendChan + + if sendMsg.GetMsgType() == ServerStreamCloseMsgType { + var buf bytes.Buffer + enc := hpack.NewEncoder(&buf) + st := sendMsg.st + headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) + for _, f := range headerFields { + if err := enc.WriteField(f); err != nil { + logger.Error("error: enc.WriteField err = ", err) + } + } + hfData := buf.Next(buf.Len()) + h.sendChan <- h2.HeadersFrameParam{ + StreamID: stream.getID(), + EndHeaders: true, + BlockFragment: hfData, + EndStream: true, + } + return + } + sendData := sendMsg.buffer.Bytes() // header headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. @@ -291,9 +315,10 @@ func (h *H2Controller) runSendStreamRsp(stream *serverStream) { if sendMsg.GetMsgType() == ServerStreamCloseMsgType { var buf bytes.Buffer enc := hpack.NewEncoder(&buf) + st := sendMsg.st headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. - headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: "0"}) - headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: ""}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) for _, f := range headerFields { if err := enc.WriteField(f); err != nil { logger.Error("error: enc.WriteField err = ", err) @@ -344,6 +369,9 @@ func (h *H2Controller) runSendStreamRsp(stream *serverStream) { } } + + + func (h *H2Controller) clientRunRecv() { for { fm, err := h.rawFramer.ReadFrame() @@ -355,13 +383,9 @@ func (h *H2Controller) clientRunRecv() { } switch fm := fm.(type) { case *h2.MetaHeadersFrame: - id := fm.StreamID - logger.Debug("MetaHeader frame = ", fm.String(), "id = ", id) - if fm.Flags.Has(h2.FlagDataEndStream) { - // todo graceful close - h.streamMap.Delete(id) - continue - } + + h.operateHeaders(fm) + case *h2.DataFrame: id := fm.StreamID logger.Debug("DataFrame = ", fm.String(), "id = ", id) @@ -383,6 +407,40 @@ func (h *H2Controller) clientRunRecv() { } } +func (h *H2Controller) operateHeaders(fm *h2.MetaHeadersFrame) { + + + id := fm.StreamID + logger.Debug("MetaHeader frame = ", fm.String(), "id = ", id) + + val, ok := h.streamMap.Load(id) + + if !ok { + logger.Errorf("operateHeaders not get stream") + return + } + + s := val.(stream) + + state := &decodeState{} + state.data.isGRPC = true + + + for _, hf := range fm.Fields { + state.processHeaderField(hf) + } + if state.data.rawStatusCode != nil { + s.putRecvErr(state.status().Err()) + } + + if fm.Flags.Has(h2.FlagDataEndStream) { + // todo graceful close + h.streamMap.Delete(id) + } + +} + + // serverRun start a loop, server start listening h2 metaheader func (h *H2Controller) serverRunRecv() { for { @@ -563,8 +621,8 @@ func (h *H2Controller) UnaryInvoke(ctx context.Context, method string, addr stri // recv rsp recvChan := clientStream.getRecv() recvData := <-recvChan - if recvData.GetMsgType() != DataMsgType { - return perrors.New("get data from req not data msg type") + if recvData.GetMsgType() == ServerStreamCloseMsgType { + return toRPCErr(recvData.err) } if err := proto.Unmarshal(h.pkgHandler.Frame2PkgData(recvData.buffer.Bytes()), reply.(proto.Message)); err != nil { diff --git a/remoting/dubbo3/http_util.go b/remoting/dubbo3/http_util.go new file mode 100644 index 0000000000..ad30157718 --- /dev/null +++ b/remoting/dubbo3/http_util.go @@ -0,0 +1,230 @@ +package dubbo3 + +import ( + "bytes" + status2 "github.com/apache/dubbo-go/remoting/dubbo3/status" + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "net/http" + "strconv" + "strings" +) + + +import ( + "fmt" + "unicode/utf8" +) + + + +var ( + clientPreface = []byte(http2.ClientPreface) + http2ErrConvTab = map[http2.ErrCode]codes.Code{ + http2.ErrCodeNo: codes.Internal, + http2.ErrCodeProtocol: codes.Internal, + http2.ErrCodeInternal: codes.Internal, + http2.ErrCodeFlowControl: codes.ResourceExhausted, + http2.ErrCodeSettingsTimeout: codes.Internal, + http2.ErrCodeStreamClosed: codes.Internal, + http2.ErrCodeFrameSize: codes.Internal, + http2.ErrCodeRefusedStream: codes.Unavailable, + http2.ErrCodeCancel: codes.Canceled, + http2.ErrCodeCompression: codes.Internal, + http2.ErrCodeConnect: codes.Internal, + http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted, + http2.ErrCodeInadequateSecurity: codes.PermissionDenied, + http2.ErrCodeHTTP11Required: codes.Internal, + } + // HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table. + HTTPStatusConvTab = map[int]codes.Code{ + // 400 Bad Request - INTERNAL. + http.StatusBadRequest: codes.Internal, + // 401 Unauthorized - UNAUTHENTICATED. + http.StatusUnauthorized: codes.Unauthenticated, + // 403 Forbidden - PERMISSION_DENIED. + http.StatusForbidden: codes.PermissionDenied, + // 404 Not Found - UNIMPLEMENTED. + http.StatusNotFound: codes.Unimplemented, + // 429 Too Many Requests - UNAVAILABLE. + http.StatusTooManyRequests: codes.Unavailable, + // 502 Bad Gateway - UNAVAILABLE. + http.StatusBadGateway: codes.Unavailable, + // 503 Service Unavailable - UNAVAILABLE. + http.StatusServiceUnavailable: codes.Unavailable, + // 504 Gateway timeout - UNAVAILABLE. + http.StatusGatewayTimeout: codes.Unavailable, + } +) + + + +const ( + spaceByte = ' ' + tildeByte = '~' + percentByte = '%' +) + + + + +// decodeState configures decoding criteria and records the decoded data. +type decodeState struct { + // whether decoding on server side or not + serverSide bool + + // Records the states during HPACK decoding. It will be filled with info parsed from HTTP HEADERS + // frame once decodeHeader function has been invoked and returned. + data parsedHeaderData +} + + +func (d *decodeState) status() *status2.Status { + if d.data.statusGen == nil { + // No status-details were provided; generate status using code/msg. + d.data.statusGen = status2.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) + } + return d.data.statusGen +} + + +func (d *decodeState) processHeaderField(f hpack.HeaderField) { + switch f.Name { + case "grpc-status": + code, err := strconv.Atoi(f.Value) + if err != nil { + d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + return + } + d.data.rawStatusCode = &code + case "grpc-message": + d.data.rawStatusMsg = decodeGrpcMessage(f.Value) + + default: + + } +} + + + + +// constructErrMsg constructs error message to be returned in HTTP fallback mode. +// Format: HTTP status code and its corresponding message + content-type error message. +func (d *decodeState) constructHTTPErrMsg() string { + var errMsgs []string + + if d.data.httpStatus == nil { + errMsgs = append(errMsgs, "malformed header: missing HTTP status") + } else { + errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus)) + } + + if d.data.contentTypeErr == "" { + errMsgs = append(errMsgs, "transport: missing content-type field") + } else { + errMsgs = append(errMsgs, d.data.contentTypeErr) + } + + return strings.Join(errMsgs, "; ") +} + + + +// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage. +func decodeGrpcMessage(msg string) string { + if msg == "" { + return "" + } + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + if msg[i] == percentByte && i+2 < lenMsg { + return decodeGrpcMessageUnchecked(msg) + } + } + return msg +} + +func decodeGrpcMessageUnchecked(msg string) string { + var buf bytes.Buffer + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + c := msg[i] + if c == percentByte && i+2 < lenMsg { + parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8) + if err != nil { + buf.WriteByte(c) + } else { + buf.WriteByte(byte(parsed)) + i += 2 + } + } else { + buf.WriteByte(c) + } + } + return buf.String() +} + + + +type parsedHeaderData struct { + encoding string + // statusGen caches the stream status received from the trailer the server + // sent. Client side only. Do not access directly. After all trailers are + // parsed, use the status method to retrieve the status. + statusGen *status2.Status + rawStatusCode *int + rawStatusMsg string + httpStatus *int + isGRPC bool + grpcErr error + httpErr error + contentTypeErr string +} + + + +func encodeGrpcMessage(msg string) string { + if msg == "" { + return "" + } + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + c := msg[i] + if !(c >= spaceByte && c <= tildeByte && c != percentByte) { + return encodeGrpcMessageUnchecked(msg) + } + } + return msg +} + +func encodeGrpcMessageUnchecked(msg string) string { + var buf bytes.Buffer + for len(msg) > 0 { + r, size := utf8.DecodeRuneInString(msg) + for _, b := range []byte(string(r)) { + if size > 1 { + // If size > 1, r is not ascii. Always do percent encoding. + buf.WriteString(fmt.Sprintf("%%%02X", b)) + continue + } + + // The for loop is necessary even if size == 1. r could be + // utf8.RuneError. + // + // fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD". + if b >= spaceByte && b <= tildeByte && b != percentByte { + buf.WriteByte(b) + } else { + buf.WriteString(fmt.Sprintf("%%%02X", b)) + } + } + msg = msg[size:] + } + return buf.String() +} + + + + diff --git a/remoting/dubbo3/processor.go b/remoting/dubbo3/processor.go index 6f445a1242..e256293372 100644 --- a/remoting/dubbo3/processor.go +++ b/remoting/dubbo3/processor.go @@ -19,6 +19,9 @@ package dubbo3 import ( "bytes" + status2 "github.com/apache/dubbo-go/remoting/dubbo3/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) import ( "github.com/golang/protobuf/proto" @@ -100,17 +103,31 @@ func (s *unaryProcessor) runRPC() { go func() { recvMsg := <-recvChan if recvMsg.err != nil { + logger.Error("error ,s.processUnaryRPC err = ", recvMsg.err) return } rspData, err := s.processUnaryRPC(*recvMsg.buffer, s.stream.getService(), s.stream.getHeader()) + if err != nil { - logger.Error("error ,s.processUnaryRPC err = ", err) + s.handleUnaryRPCErr(err) return } + // TODO: status sendResponse should has err, then writeStatus(err) use one function and defer s.stream.putSend(rspData, DataMsgType) }() } + +func (s *unaryProcessor) handleUnaryRPCErr(err error) { + logger.Error("error ,s.processUnaryRPC err = ", err) + appStatus, ok := status2.FromError(err) + if !ok { + err = status.Error(codes.Unknown, err.Error()) + appStatus, _ = status2.FromError(err) + } + s.stream.WriteStatus(appStatus) +} + // streamingProcessor used to process streaming invocation type streamingProcessor struct { baseProcessor diff --git a/remoting/dubbo3/rpc_util.go b/remoting/dubbo3/rpc_util.go new file mode 100644 index 0000000000..6b822bdc20 --- /dev/null +++ b/remoting/dubbo3/rpc_util.go @@ -0,0 +1,21 @@ +package dubbo3 + +import ( + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "io" +) + +// toRPCErr converts an error into an error from the status package. +func toRPCErr(err error) error { + if err == nil || err == io.EOF { + return err + } + if err == io.ErrUnexpectedEOF { + return status.Error(codes.Internal, err.Error()) + } + if _, ok := status.FromError(err); ok { + return err + } + return status.Error(codes.Unknown, err.Error()) +} diff --git a/remoting/dubbo3/status/status.go b/remoting/dubbo3/status/status.go new file mode 100644 index 0000000000..d71c837e27 --- /dev/null +++ b/remoting/dubbo3/status/status.go @@ -0,0 +1,176 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package status implements errors returned by gRPC. These errors are +// serialized and transmitted on the wire between server and client, and allow +// for additional data to be transmitted via the Details field in the status +// proto. gRPC service handlers should return an error created by this +// package, and gRPC clients should expect a corresponding error to be +// returned from the RPC call. +// +// This package upholds the invariants that a non-nil error may not +// contain an OK code, and an OK code must result in a nil error. +package status + +import ( + "errors" + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" +) + +// Status represents an RPC status code, message, and details. It is immutable +// and should be created with New, Newf, or FromProto. +type Status struct { + s *spb.Status +} + + +func FromError(err error) (s *Status, ok bool) { + if err == nil { + return nil, true + } + if se, ok := err.(interface { + GRPCStatus() *Status + }); ok { + return se.GRPCStatus(), true + } + return New(codes.Unknown, err.Error()), false +} + + +// New returns a Status representing c and msg. +func New(c codes.Code, msg string) *Status { + return &Status{s: &spb.Status{Code: int32(c), Message: msg}} +} + +// Newf returns New(c, fmt.Sprintf(format, a...)). +func Newf(c codes.Code, format string, a ...interface{}) *Status { + return New(c, fmt.Sprintf(format, a...)) +} + +// FromProto returns a Status representing s. +func FromProto(s *spb.Status) *Status { + return &Status{s: proto.Clone(s).(*spb.Status)} +} + +// Err returns an error representing c and msg. If c is OK, returns nil. +func Err(c codes.Code, msg string) error { + return New(c, msg).Err() +} + +// Errorf returns Error(c, fmt.Sprintf(format, a...)). +func Errorf(c codes.Code, format string, a ...interface{}) error { + return Err(c, fmt.Sprintf(format, a...)) +} + +// Code returns the status code contained in s. +func (s *Status) Code() codes.Code { + if s == nil || s.s == nil { + return codes.OK + } + return codes.Code(s.s.Code) +} + +// Message returns the message contained in s. +func (s *Status) Message() string { + if s == nil || s.s == nil { + return "" + } + return s.s.Message +} + +// Proto returns s's status as an spb.Status proto message. +func (s *Status) Proto() *spb.Status { + if s == nil { + return nil + } + return proto.Clone(s.s).(*spb.Status) +} + +// Err returns an immutable error representing s; returns nil if s.Code() is OK. +func (s *Status) Err() error { + if s.Code() == codes.OK { + return nil + } + return &Error{e: s.Proto()} +} + +// WithDetails returns a new status with the provided details messages appended to the status. +// If any errors are encountered, it returns nil and the first error encountered. +func (s *Status) WithDetails(details ...proto.Message) (*Status, error) { + if s.Code() == codes.OK { + return nil, errors.New("no error details for status with code OK") + } + // s.Code() != OK implies that s.Proto() != nil. + p := s.Proto() + for _, detail := range details { + any, err := ptypes.MarshalAny(detail) + if err != nil { + return nil, err + } + p.Details = append(p.Details, any) + } + return &Status{s: p}, nil +} + +// Details returns a slice of details messages attached to the status. +// If a detail cannot be decoded, the error is returned in place of the detail. +func (s *Status) Details() []interface{} { + if s == nil || s.s == nil { + return nil + } + details := make([]interface{}, 0, len(s.s.Details)) + for _, any := range s.s.Details { + detail := &ptypes.DynamicAny{} + if err := ptypes.UnmarshalAny(any, detail); err != nil { + details = append(details, err) + continue + } + details = append(details, detail.Message) + } + return details +} + +// Error wraps a pointer of a status proto. It implements error and Status, +// and a nil *Error should never be returned by this package. +type Error struct { + e *spb.Status +} + +func (e *Error) Error() string { + return fmt.Sprintf("rpc error: code = %s desc = %s", codes.Code(e.e.GetCode()), e.e.GetMessage()) +} + +// GRPCStatus returns the Status represented by se. +func (e *Error) GRPCStatus() *Status { + return FromProto(e.e) +} + +// Is implements future error.Is functionality. +// A Error is equivalent if the code and message are identical. +func (e *Error) Is(target error) bool { + tse, ok := target.(*Error) + if !ok { + return false + } + return proto.Equal(e.e, tse.e) +} diff --git a/remoting/dubbo3/stream.go b/remoting/dubbo3/stream.go index e5c1968a88..2d1669b63e 100644 --- a/remoting/dubbo3/stream.go +++ b/remoting/dubbo3/stream.go @@ -19,6 +19,7 @@ package dubbo3 import ( "bytes" + "github.com/apache/dubbo-go/remoting/dubbo3/status" ) import ( "google.golang.org/grpc" @@ -41,6 +42,7 @@ const ( type BufferMsg struct { buffer *bytes.Buffer msgType MsgType + st *status.Status err error } @@ -73,6 +75,7 @@ func (b *MsgBuffer) get() <-chan BufferMsg { type stream interface { putRecv(data []byte, msgType MsgType) putSend(data []byte, msgType MsgType) + putRecvErr(err error) getSend() <-chan BufferMsg getRecv() <-chan BufferMsg } @@ -85,6 +88,17 @@ type baseStream struct { url *common.URL header remoting.ProtocolHeader service common.RPCService + + // On client-side it is the status error received from the server. + // On server-side it is unused. + status *status.Status +} + +func (s *baseStream) WriteStatus(st *status.Status) { + s.sendBuf.put(BufferMsg{ + st : st, + msgType: ServerStreamCloseMsgType, + }) } func (s *baseStream) putRecv(data []byte, msgType MsgType) { @@ -94,6 +108,13 @@ func (s *baseStream) putRecv(data []byte, msgType MsgType) { }) } +func (s *baseStream) putRecvErr(err error) { + s.recvBuf.put(BufferMsg{ + err: err, + msgType: ServerStreamCloseMsgType, + }) +} + func (s *baseStream) putSend(data []byte, msgType MsgType) { s.sendBuf.put(BufferMsg{ buffer: bytes.NewBuffer(data), @@ -109,6 +130,8 @@ func (s *baseStream) getSend() <-chan BufferMsg { return s.sendBuf.get() } + + func newBaseStream(streamID uint32, url *common.URL, service common.RPCService) *baseStream { // stream and pkgHeader are the same level return &baseStream{ diff --git a/remoting/dubbo3/user_stream.go b/remoting/dubbo3/user_stream.go index 7716e8875a..d3d35e8402 100644 --- a/remoting/dubbo3/user_stream.go +++ b/remoting/dubbo3/user_stream.go @@ -60,6 +60,7 @@ func (ss *baseUserStream) SendMsg(m interface{}) error { func (ss *baseUserStream) RecvMsg(m interface{}) error { recvChan := ss.stream.getRecv() readBuf := <-recvChan + logger.Info("RecvMsg from recvChan") pkgData := ss.pkgHandler.Frame2PkgData(readBuf.buffer.Bytes()) if err := ss.serilizer.Unmarshal(pkgData, m); err != nil { return err From f0ddf360c19cd5e5624d2a16e26c2dd34fafd550 Mon Sep 17 00:00:00 2001 From: Randy Date: Thu, 7 Jan 2021 17:56:05 +0800 Subject: [PATCH 2/2] fix ci problem license error and refactor package structure --- .github/workflows/codeql-analysis.yml | 2 +- .github/workflows/github-actions.yml | 2 +- filter/filter_impl/hystrix_filter_test.go | 4 +- metadata/service/service.go | 2 +- protocol/dubbo/hessian2/const.go | 2 +- protocol/dubbo/impl/const.go | 2 +- .../protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go | 14 +- protocol/grpc/common_test.go | 2 +- .../protoc-gen-dubbo/plugin/dubbo/dubbo.go | 14 +- protocol/jsonrpc/http_test.go | 2 +- protocol/jsonrpc/json.go | 6 +- protocol/jsonrpc/json_test.go | 12 +- remoting/dubbo3/codes.go | 244 ------------------ remoting/dubbo3/codes/codes.go | 126 +++++++++ remoting/dubbo3/http_util.go | 39 ++- remoting/dubbo3/processor.go | 11 +- remoting/dubbo3/rpc_util.go | 25 +- remoting/dubbo3/status/status.go | 39 +-- remoting/dubbo3/user_stream.go | 1 - 19 files changed, 223 insertions(+), 326 deletions(-) delete mode 100644 remoting/dubbo3/codes.go create mode 100644 remoting/dubbo3/codes/codes.go diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index f2e185cc1e..0da71922b7 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -43,7 +43,7 @@ jobs: # 📚 https://git.io/JvXDl # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines - # and modify them (or add more) to build your code if your project + # and modify them (or add more) to build your codes if your project # uses a compiled language #- run: | diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 975a2a7520..b00b30f857 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -31,7 +31,7 @@ jobs: go-version: ${{ matrix.go_version }} id: go - - name: Check out code into the Go module directory + - name: Check out codes into the Go module directory uses: actions/checkout@v2 - name: Cache dependencies diff --git a/filter/filter_impl/hystrix_filter_test.go b/filter/filter_impl/hystrix_filter_test.go index 4973ce7f70..8cbc242916 100644 --- a/filter/filter_impl/hystrix_filter_test.go +++ b/filter/filter_impl/hystrix_filter_test.go @@ -186,7 +186,7 @@ func TestHystricFilterInvokeCircuitBreak(t *testing.T) { resChan <- result }() } - //This can not always pass the test when on travis due to concurrency, you can uncomment the code below and test it locally + //This can not always pass the test when on travis due to concurrency, you can uncomment the codes below and test it locally //var lastRest bool //for i := 0; i < 50; i++ { @@ -215,7 +215,7 @@ func TestHystricFilterInvokeCircuitBreakOmitException(t *testing.T) { resChan <- result }() } - //This can not always pass the test when on travis due to concurrency, you can uncomment the code below and test it locally + //This can not always pass the test when on travis due to concurrency, you can uncomment the codes below and test it locally //time.Sleep(time.Second * 6) //var lastRest bool diff --git a/metadata/service/service.go b/metadata/service/service.go index 1d90f8a516..62d0f7d590 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -41,7 +41,7 @@ type MetadataService interface { SubscribeURL(url *common.URL) (bool, error) // UnsubscribeURL will delete the subscribed url in metadata UnsubscribeURL(url *common.URL) error - // PublishServiceDefinition will generate the target url's code info + // PublishServiceDefinition will generate the target url's codes info PublishServiceDefinition(url *common.URL) error // GetExportedURLs will get the target exported url in metadata // the url should be unique diff --git a/protocol/dubbo/hessian2/const.go b/protocol/dubbo/hessian2/const.go index 74a00b601d..ddbfb661b3 100644 --- a/protocol/dubbo/hessian2/const.go +++ b/protocol/dubbo/hessian2/const.go @@ -184,7 +184,7 @@ const ( /** * the dubbo protocol header length is 16 Bytes. - * the first 2 Bytes is magic code '0xdabb' + * the first 2 Bytes is magic codes '0xdabb' * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag * the next 1 Bytes is response state. * the next 8 Bytes is package DI. diff --git a/protocol/dubbo/impl/const.go b/protocol/dubbo/impl/const.go index 70d8bae6ca..d3622d24ac 100644 --- a/protocol/dubbo/impl/const.go +++ b/protocol/dubbo/impl/const.go @@ -184,7 +184,7 @@ const ( /** * the dubbo protocol header length is 16 Bytes. - * the first 2 Bytes is magic code '0xdabb' + * the first 2 Bytes is magic codes '0xdabb' * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag * the next 1 Bytes is response state. * the next 8 Bytes is package DI. diff --git a/protocol/dubbo3/protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go b/protocol/dubbo3/protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go index 9873654d44..649eb19961 100644 --- a/protocol/dubbo3/protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go +++ b/protocol/dubbo3/protoc-gen-dubbo3/plugin/dubbo3/dubbo3.go @@ -28,13 +28,13 @@ import ( "github.com/golang/protobuf/protoc-gen-go/generator" ) -// generatedCodeVersion indicates a version of the generated code. -// It is incremented whenever an incompatibility between the generated code and -// the grpc package is introduced; the generated code references +// generatedCodeVersion indicates a version of the generated codes. +// It is incremented whenever an incompatibility between the generated codes and +// the grpc package is introduced; the generated codes references // a constant, grpc.SupportPackageIsVersionN (where N is generatedCodeVersion). const generatedCodeVersion = 4 -// Paths for packages used by code generated in this file, +// Paths for packages used by codes generated in this file, // relative to the import_prefix of the generator.Generator. const ( contextPkgPath = "context" @@ -58,7 +58,7 @@ func (g *dubboGrpc) Name() string { return "dubbo" } -// The names for packages imported in the generated code. +// The names for packages imported in the generated codes. // They may vary from the final path component of the import path // if the name is used by other packages. var ( @@ -86,7 +86,7 @@ func (g *dubboGrpc) typeName(str string) string { // P forwards to g.gen.P. func (g *dubboGrpc) P(args ...interface{}) { g.gen.P(args...) } -// Generate generates code for the services in the given file. +// Generate generates codes for the services in the given file. // be consistent with grpc plugin func (g *dubboGrpc) Generate(file *generator.FileDescriptor) { if len(file.FileDescriptorProto.Service) == 0 { @@ -117,7 +117,7 @@ func unexport(s string) string { return strings.ToLower(s[:1]) + s[1:] } // messages, fields, enums, and enum values. var deprecationComment = "// Deprecated: Do not use." -// generateService generates all the code for the named service. +// generateService generates all the codes for the named service. func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb.ServiceDescriptorProto, index int) { path := fmt.Sprintf("6,%d", index) // 6 means service. diff --git a/protocol/grpc/common_test.go b/protocol/grpc/common_test.go index b732283a9b..1672d9d3ce 100644 --- a/protocol/grpc/common_test.go +++ b/protocol/grpc/common_test.go @@ -57,7 +57,7 @@ func (g *greeterProvider) Reference() string { return "GrpcGreeterImpl" } -// code generated by greeter.go +// codes generated by greeter.go type greeterProviderBase struct { proxyImpl protocol.Invoker } diff --git a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go index a9f50e8287..40ac5951f3 100644 --- a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go +++ b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go @@ -28,13 +28,13 @@ import ( "github.com/golang/protobuf/protoc-gen-go/generator" ) -// generatedCodeVersion indicates a version of the generated code. -// It is incremented whenever an incompatibility between the generated code and -// the grpc package is introduced; the generated code references +// generatedCodeVersion indicates a version of the generated codes. +// It is incremented whenever an incompatibility between the generated codes and +// the grpc package is introduced; the generated codes references // a constant, grpc.SupportPackageIsVersionN (where N is generatedCodeVersion). const generatedCodeVersion = 4 -// Paths for packages used by code generated in this file, +// Paths for packages used by codes generated in this file, // relative to the import_prefix of the generator.Generator. const ( contextPkgPath = "context" @@ -58,7 +58,7 @@ func (g *dubboGrpc) Name() string { return "dubbo" } -// The names for packages imported in the generated code. +// The names for packages imported in the generated codes. // They may vary from the final path component of the import path // if the name is used by other packages. var ( @@ -86,7 +86,7 @@ func (g *dubboGrpc) typeName(str string) string { // P forwards to g.gen.P. func (g *dubboGrpc) P(args ...interface{}) { g.gen.P(args...) } -// Generate generates code for the services in the given file. +// Generate generates codes for the services in the given file. // be consistent with grpc plugin func (g *dubboGrpc) Generate(file *generator.FileDescriptor) { if len(file.FileDescriptorProto.Service) == 0 { @@ -116,7 +116,7 @@ func unexport(s string) string { return strings.ToLower(s[:1]) + s[1:] } // messages, fields, enums, and enum values. var deprecationComment = "// Deprecated: Do not use." -// generateService generates all the code for the named service. +// generateService generates all the codes for the named service. func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb.ServiceDescriptorProto, index int) { path := fmt.Sprintf("6,%d", index) // 6 means service. diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 4a9645e828..1867bca230 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -110,7 +110,7 @@ func TestHTTPClientCall(t *testing.T) { reply = &User{} err = client.Call(ctx, url, req, reply) assert.True(t, strings.Contains(err.Error(), "500 Internal Server Error")) - assert.True(t, strings.Contains(err.Error(), "\\\"result\\\":{},\\\"error\\\":{\\\"code\\\":-32000,\\\"message\\\":\\\"error\\\"}")) + assert.True(t, strings.Contains(err.Error(), "\\\"result\\\":{},\\\"error\\\":{\\\"codes\\\":-32000,\\\"message\\\":\\\"error\\\"}")) // call GetUser2 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ diff --git a/protocol/jsonrpc/json.go b/protocol/jsonrpc/json.go index 506c4c953b..e142406eab 100644 --- a/protocol/jsonrpc/json.go +++ b/protocol/jsonrpc/json.go @@ -59,7 +59,7 @@ const ( // Error response Error type Error struct { - Code int `json:"code"` + Code int `json:"codes"` Message string `json:"message"` Data interface{} `json:"data,omitempty"` } @@ -72,7 +72,7 @@ func (e *Error) Error() string { if retryErr != nil { msg = []byte("jsonrpc2.Error: json.Marshal failed") } - return fmt.Sprintf(`{"code":%d,"message":%s}`, -32001, string(msg)) + return fmt.Sprintf(`{"codes":%d,"message":%s}`, -32001, string(msg)) } return string(buf) } @@ -362,7 +362,7 @@ func (c *ServerCodec) ReadBody(x interface{}) error { return nil } -// NewError creates a error with @code and @message +// NewError creates a error with @codes and @message func NewError(code int, message string) *Error { return &Error{Code: code, Message: message} } diff --git a/protocol/jsonrpc/json_test.go b/protocol/jsonrpc/json_test.go index a3814e9ad5..d3dafad7eb 100644 --- a/protocol/jsonrpc/json_test.go +++ b/protocol/jsonrpc/json_test.go @@ -56,8 +56,8 @@ func TestJsonClientCodecRead(t *testing.T) { //error codec.pending[1] = "GetUser" - err = codec.Read([]byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"error\":{\"code\":-32000,\"message\":\"error\"}}\n"), rsp) - assert.EqualError(t, err, "{\"code\":-32000,\"message\":\"error\"}") + err = codec.Read([]byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"error\":{\"codes\":-32000,\"message\":\"error\"}}\n"), rsp) + assert.EqualError(t, err, "{\"codes\":-32000,\"message\":\"error\"}") } func TestServerCodecWrite(t *testing.T) { @@ -66,18 +66,18 @@ func TestServerCodecWrite(t *testing.T) { codec.req = serverRequest{Version: "1.0", Method: "GetUser", ID: &a} data, err := codec.Write("error", &TestData{Test: "test"}) assert.NoError(t, err) - assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"code\":-32000,\"message\":\"error\"}}\n", string(data)) + assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"codes\":-32000,\"message\":\"error\"}}\n", string(data)) - data, err = codec.Write("{\"code\":-32000,\"message\":\"error\"}", &TestData{Test: "test"}) + data, err = codec.Write("{\"codes\":-32000,\"message\":\"error\"}", &TestData{Test: "test"}) assert.NoError(t, err) - assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"code\":-32000,\"message\":\"error\"}}\n", string(data)) + assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"codes\":-32000,\"message\":\"error\"}}\n", string(data)) } func TestServerCodecRead(t *testing.T) { codec := newServerCodec() header := map[string]string{} err := codec.ReadHeader(header, []byte("{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n")) - assert.EqualError(t, err, "{\"code\":-32601,\"message\":\"Method not found\"}") + assert.EqualError(t, err, "{\"codes\":-32601,\"message\":\"Method not found\"}") header["HttpMethod"] = "POST" err = codec.ReadHeader(header, []byte("{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n")) diff --git a/remoting/dubbo3/codes.go b/remoting/dubbo3/codes.go deleted file mode 100644 index 906e835007..0000000000 --- a/remoting/dubbo3/codes.go +++ /dev/null @@ -1,244 +0,0 @@ -/* - * - * Copyright 2014 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package codes defines the canonical error codes used by gRPC. It is -// consistent across various languages. -package dubbo3 // import "google.golang.org/grpc/codes" - -import ( - "fmt" - "strconv" -) - -// A Code is an unsigned 32-bit error code as defined in the gRPC spec. -type Code uint32 - -const ( - // OK is returned on success. - OK Code = 0 - - // Canceled indicates the operation was canceled (typically by the caller). - // - // The gRPC framework will generate this error code when cancellation - // is requested. - Canceled Code = 1 - - // Unknown error. An example of where this error may be returned is - // if a Status value received from another address space belongs to - // an error-space that is not known in this address space. Also - // errors raised by APIs that do not return enough error information - // may be converted to this error. - // - // The gRPC framework will generate this error code in the above two - // mentioned cases. - Unknown Code = 2 - - // InvalidArgument indicates client specified an invalid argument. - // Note that this differs from FailedPrecondition. It indicates arguments - // that are problematic regardless of the state of the system - // (e.g., a malformed file name). - // - // This error code will not be generated by the gRPC framework. - InvalidArgument Code = 3 - - // DeadlineExceeded means operation expired before completion. - // For operations that change the state of the system, this error may be - // returned even if the operation has completed successfully. For - // example, a successful response from a server could have been delayed - // long enough for the deadline to expire. - // - // The gRPC framework will generate this error code when the deadline is - // exceeded. - DeadlineExceeded Code = 4 - - // NotFound means some requested entity (e.g., file or directory) was - // not found. - // - // This error code will not be generated by the gRPC framework. - NotFound Code = 5 - - // AlreadyExists means an attempt to create an entity failed because one - // already exists. - // - // This error code will not be generated by the gRPC framework. - AlreadyExists Code = 6 - - // PermissionDenied indicates the caller does not have permission to - // execute the specified operation. It must not be used for rejections - // caused by exhausting some resource (use ResourceExhausted - // instead for those errors). It must not be - // used if the caller cannot be identified (use Unauthenticated - // instead for those errors). - // - // This error code will not be generated by the gRPC core framework, - // but expect authentication middleware to use it. - PermissionDenied Code = 7 - - // ResourceExhausted indicates some resource has been exhausted, perhaps - // a per-user quota, or perhaps the entire file system is out of space. - // - // This error code will be generated by the gRPC framework in - // out-of-memory and server overload situations, or when a message is - // larger than the configured maximum size. - ResourceExhausted Code = 8 - - // FailedPrecondition indicates operation was rejected because the - // system is not in a state required for the operation's execution. - // For example, directory to be deleted may be non-empty, an rmdir - // operation is applied to a non-directory, etc. - // - // A litmus test that may help a service implementor in deciding - // between FailedPrecondition, Aborted, and Unavailable: - // (a) Use Unavailable if the client can retry just the failing call. - // (b) Use Aborted if the client should retry at a higher-level - // (e.g., restarting a read-modify-write sequence). - // (c) Use FailedPrecondition if the client should not retry until - // the system state has been explicitly fixed. E.g., if an "rmdir" - // fails because the directory is non-empty, FailedPrecondition - // should be returned since the client should not retry unless - // they have first fixed up the directory by deleting files from it. - // (d) Use FailedPrecondition if the client performs conditional - // REST Get/Update/Delete on a resource and the resource on the - // server does not match the condition. E.g., conflicting - // read-modify-write on the same resource. - // - // This error code will not be generated by the gRPC framework. - FailedPrecondition Code = 9 - - // Aborted indicates the operation was aborted, typically due to a - // concurrency issue like sequencer check failures, transaction aborts, - // etc. - // - // See litmus test above for deciding between FailedPrecondition, - // Aborted, and Unavailable. - // - // This error code will not be generated by the gRPC framework. - Aborted Code = 10 - - // OutOfRange means operation was attempted past the valid range. - // E.g., seeking or reading past end of file. - // - // Unlike InvalidArgument, this error indicates a problem that may - // be fixed if the system state changes. For example, a 32-bit file - // system will generate InvalidArgument if asked to read at an - // offset that is not in the range [0,2^32-1], but it will generate - // OutOfRange if asked to read from an offset past the current - // file size. - // - // There is a fair bit of overlap between FailedPrecondition and - // OutOfRange. We recommend using OutOfRange (the more specific - // error) when it applies so that callers who are iterating through - // a space can easily look for an OutOfRange error to detect when - // they are done. - // - // This error code will not be generated by the gRPC framework. - OutOfRange Code = 11 - - // Unimplemented indicates operation is not implemented or not - // supported/enabled in this service. - // - // This error code will be generated by the gRPC framework. Most - // commonly, you will see this error code when a method implementation - // is missing on the server. It can also be generated for unknown - // compression algorithms or a disagreement as to whether an RPC should - // be streaming. - Unimplemented Code = 12 - - // Internal errors. Means some invariants expected by underlying - // system has been broken. If you see one of these errors, - // something is very broken. - // - // This error code will be generated by the gRPC framework in several - // internal error conditions. - Internal Code = 13 - - // Unavailable indicates the service is currently unavailable. - // This is a most likely a transient condition and may be corrected - // by retrying with a backoff. Note that it is not always safe to retry - // non-idempotent operations. - // - // See litmus test above for deciding between FailedPrecondition, - // Aborted, and Unavailable. - // - // This error code will be generated by the gRPC framework during - // abrupt shutdown of a server process or network connection. - Unavailable Code = 14 - - // DataLoss indicates unrecoverable data loss or corruption. - // - // This error code will not be generated by the gRPC framework. - DataLoss Code = 15 - - // Unauthenticated indicates the request does not have valid - // authentication credentials for the operation. - // - // The gRPC framework will generate this error code when the - // authentication metadata is invalid or a Credentials callback fails, - // but also expect authentication middleware to generate it. - Unauthenticated Code = 16 - - _maxCode = 17 -) - -var strToCode = map[string]Code{ - `"OK"`: OK, - `"CANCELLED"`:/* [sic] */ Canceled, - `"UNKNOWN"`: Unknown, - `"INVALID_ARGUMENT"`: InvalidArgument, - `"DEADLINE_EXCEEDED"`: DeadlineExceeded, - `"NOT_FOUND"`: NotFound, - `"ALREADY_EXISTS"`: AlreadyExists, - `"PERMISSION_DENIED"`: PermissionDenied, - `"RESOURCE_EXHAUSTED"`: ResourceExhausted, - `"FAILED_PRECONDITION"`: FailedPrecondition, - `"ABORTED"`: Aborted, - `"OUT_OF_RANGE"`: OutOfRange, - `"UNIMPLEMENTED"`: Unimplemented, - `"INTERNAL"`: Internal, - `"UNAVAILABLE"`: Unavailable, - `"DATA_LOSS"`: DataLoss, - `"UNAUTHENTICATED"`: Unauthenticated, -} - -// UnmarshalJSON unmarshals b into the Code. -func (c *Code) UnmarshalJSON(b []byte) error { - // From json.Unmarshaler: By convention, to approximate the behavior of - // Unmarshal itself, Unmarshalers implement UnmarshalJSON([]byte("null")) as - // a no-op. - if string(b) == "null" { - return nil - } - if c == nil { - return fmt.Errorf("nil receiver passed to UnmarshalJSON") - } - - if ci, err := strconv.ParseUint(string(b), 10, 32); err == nil { - if ci >= _maxCode { - return fmt.Errorf("invalid code: %q", ci) - } - - *c = Code(ci) - return nil - } - - if jc, ok := strToCode[string(b)]; ok { - *c = jc - return nil - } - return fmt.Errorf("invalid code: %q", string(b)) -} diff --git a/remoting/dubbo3/codes/codes.go b/remoting/dubbo3/codes/codes.go new file mode 100644 index 0000000000..7816c04945 --- /dev/null +++ b/remoting/dubbo3/codes/codes.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codes + +import ( + "fmt" + "strconv" +) + +// A Code is an unsigned 32-bit error codes. +type Code uint32 + +const ( + // OK is returned on success. + OK Code = 0 + + // Canceled indicates the operation was canceled (typically by the caller). + Canceled Code = 1 + + // Unknown error. An example of where this error may be returned is + // if a Status value received from another address space belongs to + // an error-space that is not known in this address space. Also + // errors raised by APIs that do not return enough error information + // may be converted to this error. + Unknown Code = 2 + + + + // PermissionDenied indicates the caller does not have permission to + // execute the specified operation. It must not be used for rejections + // caused by exhausting some resource (use ResourceExhausted + // instead for those errors). It must not be + // used if the caller cannot be identified (use Unauthenticated + // instead for those errors). + PermissionDenied Code = 7 + + // ResourceExhausted indicates some resource has been exhausted, perhaps + // a per-user quota, or perhaps the entire file system is out of space. + ResourceExhausted Code = 8 + + + + // Unimplemented indicates operation is not implemented or not + // supported/enabled in this service. + // + Unimplemented Code = 12 + + // Internal errors. Means some invariants expected by underlying + // system has been broken. If you see one of these errors, + // something is very broken. + // + Internal Code = 13 + + // Unavailable indicates the service is currently unavailable. + // This is a most likely a transient condition and may be corrected + // by retrying with a backoff. Note that it is not always safe to retry + // non-idempotent operations. + // + // See litmus test above for deciding between FailedPrecondition, + // Aborted, and Unavailable. + // + Unavailable Code = 14 + + + // Unauthenticated indicates the request does not have valid + // authentication credentials for the operation. + // + Unauthenticated Code = 16 + + _maxCode = 17 +) + +var strToCode = map[string]Code{ + `"OK"`: OK, + `"CANCELLED"`:/* [sic] */ Canceled, + `"UNKNOWN"`: Unknown, + `"PERMISSION_DENIED"`: PermissionDenied, + `"RESOURCE_EXHAUSTED"`: ResourceExhausted, + `"UNIMPLEMENTED"`: Unimplemented, + `"INTERNAL"`: Internal, + `"UNAVAILABLE"`: Unavailable, + `"UNAUTHENTICATED"`: Unauthenticated, +} + +// UnmarshalJSON unmarshals b into the Code. +func (c *Code) UnmarshalJSON(b []byte) error { + // From json.Unmarshaler: By convention, to approximate the behavior of + // Unmarshal itself, Unmarshalers implement UnmarshalJSON([]byte("null")) as + // a no-op. + if string(b) == "null" { + return nil + } + if c == nil { + return fmt.Errorf("nil receiver passed to UnmarshalJSON") + } + + if ci, err := strconv.ParseUint(string(b), 10, 32); err == nil { + if ci >= _maxCode { + return fmt.Errorf("invalid codes: %q", ci) + } + + *c = Code(ci) + return nil + } + + if jc, ok := strToCode[string(b)]; ok { + *c = jc + return nil + } + return fmt.Errorf("invalid codes: %q", string(b)) +} diff --git a/remoting/dubbo3/http_util.go b/remoting/dubbo3/http_util.go index ad30157718..e2720c4f38 100644 --- a/remoting/dubbo3/http_util.go +++ b/remoting/dubbo3/http_util.go @@ -1,12 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package dubbo3 import ( "bytes" - status2 "github.com/apache/dubbo-go/remoting/dubbo3/status" + "github.com/apache/dubbo-go/remoting/dubbo3/codes" + "github.com/apache/dubbo-go/remoting/dubbo3/status" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "net/http" "strconv" "strings" @@ -38,7 +54,7 @@ var ( http2.ErrCodeInadequateSecurity: codes.PermissionDenied, http2.ErrCodeHTTP11Required: codes.Internal, } - // HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table. + // HTTPStatusConvTab is the HTTP status codes to triple error codes conversion table. HTTPStatusConvTab = map[int]codes.Code{ // 400 Bad Request - INTERNAL. http.StatusBadRequest: codes.Internal, @@ -81,10 +97,10 @@ type decodeState struct { } -func (d *decodeState) status() *status2.Status { +func (d *decodeState) status() *status.Status { if d.data.statusGen == nil { - // No status-details were provided; generate status using code/msg. - d.data.statusGen = status2.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) + // No status-details were provided; generate status using codes/msg. + d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) } return d.data.statusGen } @@ -107,18 +123,15 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) { } } - - - // constructErrMsg constructs error message to be returned in HTTP fallback mode. -// Format: HTTP status code and its corresponding message + content-type error message. +// Format: HTTP status codes and its corresponding message + content-type error message. func (d *decodeState) constructHTTPErrMsg() string { var errMsgs []string if d.data.httpStatus == nil { errMsgs = append(errMsgs, "malformed header: missing HTTP status") } else { - errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus)) + errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status codes %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus)) } if d.data.contentTypeErr == "" { @@ -173,7 +186,7 @@ type parsedHeaderData struct { // statusGen caches the stream status received from the trailer the server // sent. Client side only. Do not access directly. After all trailers are // parsed, use the status method to retrieve the status. - statusGen *status2.Status + statusGen *status.Status rawStatusCode *int rawStatusMsg string httpStatus *int diff --git a/remoting/dubbo3/processor.go b/remoting/dubbo3/processor.go index e256293372..f213a1713e 100644 --- a/remoting/dubbo3/processor.go +++ b/remoting/dubbo3/processor.go @@ -19,9 +19,8 @@ package dubbo3 import ( "bytes" - status2 "github.com/apache/dubbo-go/remoting/dubbo3/status" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "github.com/apache/dubbo-go/remoting/dubbo3/codes" + "github.com/apache/dubbo-go/remoting/dubbo3/status" ) import ( "github.com/golang/protobuf/proto" @@ -120,10 +119,10 @@ func (s *unaryProcessor) runRPC() { func (s *unaryProcessor) handleUnaryRPCErr(err error) { logger.Error("error ,s.processUnaryRPC err = ", err) - appStatus, ok := status2.FromError(err) + appStatus, ok := status.FromError(err) if !ok { - err = status.Error(codes.Unknown, err.Error()) - appStatus, _ = status2.FromError(err) + err = status.Errorf(codes.Unknown, err.Error()) + appStatus, _ = status.FromError(err) } s.stream.WriteStatus(appStatus) } diff --git a/remoting/dubbo3/rpc_util.go b/remoting/dubbo3/rpc_util.go index 6b822bdc20..08a691024c 100644 --- a/remoting/dubbo3/rpc_util.go +++ b/remoting/dubbo3/rpc_util.go @@ -1,8 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package dubbo3 import ( - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "github.com/apache/dubbo-go/remoting/dubbo3/codes" + "github.com/apache/dubbo-go/remoting/dubbo3/status" "io" ) @@ -12,10 +29,10 @@ func toRPCErr(err error) error { return err } if err == io.ErrUnexpectedEOF { - return status.Error(codes.Internal, err.Error()) + return status.Errorf(codes.Internal, err.Error()) } if _, ok := status.FromError(err); ok { return err } - return status.Error(codes.Unknown, err.Error()) + return status.Errorf(codes.Unknown, err.Error()) } diff --git a/remoting/dubbo3/status/status.go b/remoting/dubbo3/status/status.go index d71c837e27..852eabe8d7 100644 --- a/remoting/dubbo3/status/status.go +++ b/remoting/dubbo3/status/status.go @@ -1,10 +1,10 @@ /* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -13,31 +13,22 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -// Package status implements errors returned by gRPC. These errors are -// serialized and transmitted on the wire between server and client, and allow -// for additional data to be transmitted via the Details field in the status -// proto. gRPC service handlers should return an error created by this -// package, and gRPC clients should expect a corresponding error to be -// returned from the RPC call. -// -// This package upholds the invariants that a non-nil error may not -// contain an OK code, and an OK code must result in a nil error. + package status import ( "errors" "fmt" + "github.com/apache/dubbo-go/remoting/dubbo3/codes" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" spb "google.golang.org/genproto/googleapis/rpc/status" - "google.golang.org/grpc/codes" ) -// Status represents an RPC status code, message, and details. It is immutable +// Status represents an RPC status codes, message, and details. It is immutable // and should be created with New, Newf, or FromProto. type Status struct { s *spb.Status @@ -82,7 +73,7 @@ func Errorf(c codes.Code, format string, a ...interface{}) error { return Err(c, fmt.Sprintf(format, a...)) } -// Code returns the status code contained in s. +// Code returns the status codes contained in s. func (s *Status) Code() codes.Code { if s == nil || s.s == nil { return codes.OK @@ -118,7 +109,7 @@ func (s *Status) Err() error { // If any errors are encountered, it returns nil and the first error encountered. func (s *Status) WithDetails(details ...proto.Message) (*Status, error) { if s.Code() == codes.OK { - return nil, errors.New("no error details for status with code OK") + return nil, errors.New("no error details for status with codes OK") } // s.Code() != OK implies that s.Proto() != nil. p := s.Proto() @@ -157,16 +148,12 @@ type Error struct { } func (e *Error) Error() string { - return fmt.Sprintf("rpc error: code = %s desc = %s", codes.Code(e.e.GetCode()), e.e.GetMessage()) + return fmt.Sprintf("rpc error: codes = %s desc = %s", codes.Code(e.e.GetCode()), e.e.GetMessage()) } -// GRPCStatus returns the Status represented by se. -func (e *Error) GRPCStatus() *Status { - return FromProto(e.e) -} // Is implements future error.Is functionality. -// A Error is equivalent if the code and message are identical. +// A Error is equivalent if the codes and message are identical. func (e *Error) Is(target error) bool { tse, ok := target.(*Error) if !ok { diff --git a/remoting/dubbo3/user_stream.go b/remoting/dubbo3/user_stream.go index d3d35e8402..7716e8875a 100644 --- a/remoting/dubbo3/user_stream.go +++ b/remoting/dubbo3/user_stream.go @@ -60,7 +60,6 @@ func (ss *baseUserStream) SendMsg(m interface{}) error { func (ss *baseUserStream) RecvMsg(m interface{}) error { recvChan := ss.stream.getRecv() readBuf := <-recvChan - logger.Info("RecvMsg from recvChan") pkgData := ss.pkgHandler.Frame2PkgData(readBuf.buffer.Bytes()) if err := ss.serilizer.Unmarshal(pkgData, m); err != nil { return err