Skip to content

Commit

Permalink
fix message remote registry & string errcode
Browse files Browse the repository at this point in the history
  • Loading branch information
yejunwei-weimob committed Mar 31, 2023
1 parent dde3b82 commit a4f9d82
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 50 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ go 1.18

require (
github.com/smartystreets/goconvey v1.7.2
github.com/weimob-tech/go-project-base v0.0.0-20230330065218-51c9a4de86ef
github.com/weimob-tech/go-project-base v0.0.0-20230331091325-ab23161af9f4
)

require (
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/smartystreets/assertions v1.2.0 // indirect
)
19 changes: 17 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg=
github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM=
github.com/weimob-tech/go-project-base v0.0.0-20230330065218-51c9a4de86ef h1:xvNdk3idCzzIzrc/q6gS9JBd0H4pHft9sWEm+OetTEI=
github.com/weimob-tech/go-project-base v0.0.0-20230330065218-51c9a4de86ef/go.mod h1:VwOcxZJgqvjidGISxdIpFOrbCDJeix9oxqJnSo67WF4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/weimob-tech/go-project-base v0.0.0-20230331091325-ab23161af9f4 h1:7emiC8uVU9OFmsyix1sAdKYwVBT6w3w8Vq0tkV/MGv4=
github.com/weimob-tech/go-project-base v0.0.0-20230331091325-ab23161af9f4/go.mod h1:T3h4JjbG6L7x88RY4bjEjw3mEiClcynm4b7m1+75MEI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
4 changes: 2 additions & 2 deletions pkg/sdk/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (client *Client) DoAction(request RpcRequest, response RpcResponse) (err er
case RPCTypeMultiPart:
return client.doMultipartAction(request, response)
default:
return NewRpcError(90500, "unknown rpc type")
return NewRpcError("90500", "unknown rpc type")
}
}

Expand All @@ -81,7 +81,7 @@ func (client *Client) doJsonAction(request RpcRequest, response RpcResponse) (er

payload, err := codec.Json.Marshal(request)
if err != nil {
return NewRpcError(90500, fmt.Sprintf("json marshal error, %s", err.Error()))
return NewRpcError("90500", fmt.Sprintf("json marshal error, %s", err.Error()))
}
req.SetBody(payload)

Expand Down
4 changes: 2 additions & 2 deletions pkg/sdk/api/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package api
import "fmt"

type RpcError struct {
Errcode int `json:"errcode,omitempty"`
Errcode string `json:"errcode,omitempty"`
Errmsg string `json:"errmsg,omitempty"`
}

func (r RpcError) Error() string {
return fmt.Sprintf("RpcError{errcode: %d, errmsg: %s}", r.Errcode, r.Errmsg)
}

func NewRpcError(code int, msg string) error {
func NewRpcError(code, msg string) error {
return &RpcError{code, msg}
}
5 changes: 2 additions & 3 deletions pkg/sdk/api/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type RpcResponse interface {
}

type Code struct {
Errcode int `json:"errcode,omitempty"`
Errcode string `json:"errcode,omitempty"`
Errmsg string `json:"errmsg,omitempty"`
}

Expand All @@ -27,7 +27,6 @@ type BaseResponse[T any] struct {
httpStatus int
httpContentBytes []byte
Data *T
// httpHeaders map[string][]string
}

func CreateResponse[T any](data *T) (response *BaseResponse[T]) {
Expand Down Expand Up @@ -61,7 +60,7 @@ func (response *BaseResponse[T]) IsSuccess() bool {
}

func (response *BaseResponse[T]) GetData() (data *T, err error) {
if response.Code.Errcode == 0 {
if response.Code.Errcode == "0" {
data = response.Data
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdk/msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package msg
// OnRegistryHook 注册扩展点时的回调方法
type Config struct {
ListenerNotFound func(path ...string) (GenericListener, error)
OnRegistryHook func(listener GenericListener, path ...string)
OnRegistryHook func(listener GenericListener, specType SpecType)
}

func NewConfig() *Config {
Expand Down
44 changes: 33 additions & 11 deletions pkg/sdk/msg/registry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package msg

import (
"fmt"
"strings"
"sync"

Expand All @@ -13,35 +14,36 @@ import (
var (
registryInitialize sync.Once
globalRegistry *registry
msgRegistryInfo []RegisterMsgInfo
msgRegistryInfo = map[SpecType]RegisterMsgInfo{}

defaultListenerNotfound = func(path ...string) (GenericListener, error) {
// 返回空,不抛异常,开放平台会不断重试并将消息放入失败队列
return nil, nil
}

defaultOnRegistryHook = func(listener GenericListener, path ...string) {
defaultOnRegistryHook = func(listener GenericListener, specType SpecType) {
// 注册到扩展点实现注册中心
msgRegistryInfo = append(msgRegistryInfo, RegisterMsgInfo{
ClientId: "",
HostAddress: "",
Path: "",
SpecsType: 0,
})
if _, ok := msgRegistryInfo[specType]; !ok {
msgRegistryInfo[specType] = RegisterMsgInfo{
Path: fmt.Sprintf("weimob/cloud/%s/message/receive", specType.String()),
SpecsType: specType,
}
}
}
)

// 提供扩展点服务注册查找等生命周期管理
type registry struct {
store sync.Map
onRegistryHook func(listener GenericListener, path ...string)
onRegistryHook func(listener GenericListener, specType SpecType)
listenerNotFound func(path ...string) (GenericListener, error)
}

func initRegistry(config *Config) *registry {
registryInitialize.Do(func() {
globalRegistry = &registry{
listenerNotFound: config.ListenerNotFound,
onRegistryHook: config.OnRegistryHook,
}
})
return globalRegistry
Expand All @@ -61,12 +63,12 @@ func (reg *registry) Lookup(path ...string) (listener GenericListener, err error
}

// Register 将扩展点注册到注册中心
func (reg *registry) Register(listener GenericListener, path ...string) {
func (reg *registry) Register(listener GenericListener, spec SpecType, path ...string) {
// 注册
reg.store.Store(strings.Join(path, "/"), listener)
// 调用钩子
if reg.onRegistryHook != nil {
reg.onRegistryHook(listener, path...)
reg.onRegistryHook(listener, spec)
}
}

Expand Down Expand Up @@ -108,3 +110,23 @@ func (reg *registry) Dispatch(c *http.ExtendCallbackContext) (result x.Result, e

return listener.OnMessage(ctx, message)
}

type msgMeta struct {
Topic string `json:"topic"`
Event string `json:"event"`
}

func messageMetaFrom(data []byte) (topic string, event string, err error) {
// get topic
var meta msgMeta
err = codec.Json.Unmarshal(data, &meta)
if err != nil {
wlog.Errorf("[weimob_msg]: unmarshal message meta failed, payload: %s, err: %s", string(data), err)
return "", "", err
}
if meta.Topic == "" || meta.Event == "" {
err = x.Error("90400", fmt.Sprintf("消息Topic与Event不能为空,Topic=%s, Event=%s", meta.Topic, meta.Event))
return
}
return meta.Topic, meta.Event, nil
}
2 changes: 1 addition & 1 deletion pkg/sdk/msg/registry_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func RegistryRemote() (err error) {
for _, clientInfo := range clientInfoMap {
// 为多个 client 注册扩展点实现
for _, msgInfo := range msgRegistryInfo {
register := RegisterInfo{RegisterMsgInfo: msgInfo}
register := RegisterInfo{RegisterMsgInfo: msgInfo, InterfacePathVos: []any{}}
err = RegistryRemoteForClient(register, *clientInfo)
if err != nil {
return
Expand Down
32 changes: 32 additions & 0 deletions pkg/sdk/msg/registry_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,35 @@ func TestRegisterMsgServiceRemote(t *testing.T) {
So(err, ShouldBeNil)
})
}

func TestUnmarshalMessageMeta(t *testing.T) {
Convey("Unmarshal message meta", t, func(c C) {
good := `{"topic":"foo","event":"bar"}`
bad := `{"topic":"foo","event":"bar"`
onlyTopic := `{"topic":"foo"}`
onlyOnlyEvent := `{"event":"bar"}`

topic, event, err := messageMetaFrom([]byte(good))
So(err, ShouldBeNil)
So(topic, ShouldEqual, "foo")
So(event, ShouldEqual, "bar")

topic, event, err = messageMetaFrom([]byte(bad))
So(err, ShouldNotBeNil)
_, _ = c.Println(err)
So(topic, ShouldBeEmpty)
So(event, ShouldBeEmpty)

topic, event, err = messageMetaFrom([]byte(onlyTopic))
So(err, ShouldNotBeNil)
_, _ = c.Println(err)
So(topic, ShouldBeEmpty)
So(event, ShouldBeEmpty)

topic, event, err = messageMetaFrom([]byte(onlyOnlyEvent))
So(err, ShouldNotBeNil)
_, _ = c.Println(err)
So(topic, ShouldBeEmpty)
So(event, ShouldBeEmpty)
})
}
41 changes: 16 additions & 25 deletions pkg/sdk/msg/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,10 @@ import (
"context"
"github.com/weimob-tech/go-project-base/pkg/wlog"

"github.com/weimob-tech/go-project-base/pkg/codec"
"github.com/weimob-tech/go-project-base/pkg/http"
"github.com/weimob-tech/go-project-base/pkg/x"
)

const (
_topic = "topic"
_event = "event"
)

type Dispatcher func(ctx context.Context, payload []byte) (x.Result, error)

// Service 提供扩展点基础服务,例如鉴权、加解密、调用链等
Expand All @@ -24,16 +18,6 @@ type Service struct {
config *Config
}

// XinyunService 提供扩展点基础服务,例如鉴权、加解密、调用链等
type XinyunService struct {
Service
}

// WosService 提供扩展点基础服务,例如鉴权、加解密、调用链等
type WosService struct {
Service
}

func (service *Service) Setup() (err error) {
config := service.InitConfig()

Expand All @@ -55,15 +39,22 @@ func (service *Service) InitWithOptions(config *Config) error {
return nil
}

func messageMetaFrom(data []byte) (topic string, event string, err error) {
// get topic
topic, err = codec.Json.GetString(data, _topic)
if err != nil {
return
}
// get event
event, err = codec.Json.GetString(data, _event)
return
// XinyunService 提供扩展点基础服务,例如鉴权、加解密、调用链等
type XinyunService struct {
Service
}

func (service *XinyunService) Register(listener GenericListener, path ...string) {
service.registry.Register(listener, SpecTypeXinyun, path...)
}

// WosService 提供扩展点基础服务,例如鉴权、加解密、调用链等
type WosService struct {
Service
}

func (service *WosService) Register(listener GenericListener, path ...string) {
service.registry.Register(listener, SpecTypeWos, path...)
}

func NewWosCallbackConfig() *http.ExtendCallbackConfig {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sdk/spi/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ var SuccessCode = InvocationCode{
}

type InvocationResponse[T any] struct {
Code InvocationCode
Data *T
Code InvocationCode `json:"code,omitempty"`
Data *T `json:"data,omitempty"`
}

func Ok[T any](v *T) InvocationResponse[T] {
Expand Down

0 comments on commit a4f9d82

Please sign in to comment.