diff --git a/go.mod b/go.mod index febfa2a..4e06d42 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,14 @@ go 1.18 require ( github.com/smartystreets/goconvey v1.7.2 - github.com/weimob-tech/go-project-base v0.0.0-20230330065218-51c9a4de86ef + github.com/weimob-tech/go-project-base v0.0.0-20230331091325-ab23161af9f4 ) require ( github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/smartystreets/assertions v1.2.0 // indirect ) diff --git a/go.sum b/go.sum index 4a2e48b..71e0c8e 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,28 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= -github.com/weimob-tech/go-project-base v0.0.0-20230330065218-51c9a4de86ef h1:xvNdk3idCzzIzrc/q6gS9JBd0H4pHft9sWEm+OetTEI= -github.com/weimob-tech/go-project-base v0.0.0-20230330065218-51c9a4de86ef/go.mod h1:VwOcxZJgqvjidGISxdIpFOrbCDJeix9oxqJnSo67WF4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/weimob-tech/go-project-base v0.0.0-20230331091325-ab23161af9f4 h1:7emiC8uVU9OFmsyix1sAdKYwVBT6w3w8Vq0tkV/MGv4= +github.com/weimob-tech/go-project-base v0.0.0-20230331091325-ab23161af9f4/go.mod h1:T3h4JjbG6L7x88RY4bjEjw3mEiClcynm4b7m1+75MEI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/sdk/api/client.go b/pkg/sdk/api/client.go index 4eceed4..2b49f8c 100644 --- a/pkg/sdk/api/client.go +++ b/pkg/sdk/api/client.go @@ -71,7 +71,7 @@ func (client *Client) DoAction(request RpcRequest, response RpcResponse) (err er case RPCTypeMultiPart: return client.doMultipartAction(request, response) default: - return NewRpcError(90500, "unknown rpc type") + return NewRpcError("90500", "unknown rpc type") } } @@ -81,7 +81,7 @@ func (client *Client) doJsonAction(request RpcRequest, response RpcResponse) (er payload, err := codec.Json.Marshal(request) if err != nil { - return NewRpcError(90500, fmt.Sprintf("json marshal error, %s", err.Error())) + return NewRpcError("90500", fmt.Sprintf("json marshal error, %s", err.Error())) } req.SetBody(payload) diff --git a/pkg/sdk/api/error.go b/pkg/sdk/api/error.go index 11239b4..380569f 100644 --- a/pkg/sdk/api/error.go +++ b/pkg/sdk/api/error.go @@ -3,7 +3,7 @@ package api import "fmt" type RpcError struct { - Errcode int `json:"errcode,omitempty"` + Errcode string `json:"errcode,omitempty"` Errmsg string `json:"errmsg,omitempty"` } @@ -11,6 +11,6 @@ func (r RpcError) Error() string { return fmt.Sprintf("RpcError{errcode: %d, errmsg: %s}", r.Errcode, r.Errmsg) } -func NewRpcError(code int, msg string) error { +func NewRpcError(code, msg string) error { return &RpcError{code, msg} } diff --git a/pkg/sdk/api/response.go b/pkg/sdk/api/response.go index 9b7396f..a03a167 100644 --- a/pkg/sdk/api/response.go +++ b/pkg/sdk/api/response.go @@ -13,7 +13,7 @@ type RpcResponse interface { } type Code struct { - Errcode int `json:"errcode,omitempty"` + Errcode string `json:"errcode,omitempty"` Errmsg string `json:"errmsg,omitempty"` } @@ -27,7 +27,6 @@ type BaseResponse[T any] struct { httpStatus int httpContentBytes []byte Data *T - // httpHeaders map[string][]string } func CreateResponse[T any](data *T) (response *BaseResponse[T]) { @@ -61,7 +60,7 @@ func (response *BaseResponse[T]) IsSuccess() bool { } func (response *BaseResponse[T]) GetData() (data *T, err error) { - if response.Code.Errcode == 0 { + if response.Code.Errcode == "0" { data = response.Data return } diff --git a/pkg/sdk/msg/config.go b/pkg/sdk/msg/config.go index 915ef7a..c245a64 100644 --- a/pkg/sdk/msg/config.go +++ b/pkg/sdk/msg/config.go @@ -5,7 +5,7 @@ package msg // OnRegistryHook 注册扩展点时的回调方法 type Config struct { ListenerNotFound func(path ...string) (GenericListener, error) - OnRegistryHook func(listener GenericListener, path ...string) + OnRegistryHook func(listener GenericListener, specType SpecType) } func NewConfig() *Config { diff --git a/pkg/sdk/msg/registry.go b/pkg/sdk/msg/registry.go index 810c785..d65e5a3 100644 --- a/pkg/sdk/msg/registry.go +++ b/pkg/sdk/msg/registry.go @@ -1,6 +1,7 @@ package msg import ( + "fmt" "strings" "sync" @@ -13,28 +14,28 @@ import ( var ( registryInitialize sync.Once globalRegistry *registry - msgRegistryInfo []RegisterMsgInfo + msgRegistryInfo = map[SpecType]RegisterMsgInfo{} defaultListenerNotfound = func(path ...string) (GenericListener, error) { // 返回空,不抛异常,开放平台会不断重试并将消息放入失败队列 return nil, nil } - defaultOnRegistryHook = func(listener GenericListener, path ...string) { + defaultOnRegistryHook = func(listener GenericListener, specType SpecType) { // 注册到扩展点实现注册中心 - msgRegistryInfo = append(msgRegistryInfo, RegisterMsgInfo{ - ClientId: "", - HostAddress: "", - Path: "", - SpecsType: 0, - }) + if _, ok := msgRegistryInfo[specType]; !ok { + msgRegistryInfo[specType] = RegisterMsgInfo{ + Path: fmt.Sprintf("weimob/cloud/%s/message/receive", specType.String()), + SpecsType: specType, + } + } } ) // 提供扩展点服务注册查找等生命周期管理 type registry struct { store sync.Map - onRegistryHook func(listener GenericListener, path ...string) + onRegistryHook func(listener GenericListener, specType SpecType) listenerNotFound func(path ...string) (GenericListener, error) } @@ -42,6 +43,7 @@ func initRegistry(config *Config) *registry { registryInitialize.Do(func() { globalRegistry = ®istry{ listenerNotFound: config.ListenerNotFound, + onRegistryHook: config.OnRegistryHook, } }) return globalRegistry @@ -61,12 +63,12 @@ func (reg *registry) Lookup(path ...string) (listener GenericListener, err error } // Register 将扩展点注册到注册中心 -func (reg *registry) Register(listener GenericListener, path ...string) { +func (reg *registry) Register(listener GenericListener, spec SpecType, path ...string) { // 注册 reg.store.Store(strings.Join(path, "/"), listener) // 调用钩子 if reg.onRegistryHook != nil { - reg.onRegistryHook(listener, path...) + reg.onRegistryHook(listener, spec) } } @@ -108,3 +110,23 @@ func (reg *registry) Dispatch(c *http.ExtendCallbackContext) (result x.Result, e return listener.OnMessage(ctx, message) } + +type msgMeta struct { + Topic string `json:"topic"` + Event string `json:"event"` +} + +func messageMetaFrom(data []byte) (topic string, event string, err error) { + // get topic + var meta msgMeta + err = codec.Json.Unmarshal(data, &meta) + if err != nil { + wlog.Errorf("[weimob_msg]: unmarshal message meta failed, payload: %s, err: %s", string(data), err) + return "", "", err + } + if meta.Topic == "" || meta.Event == "" { + err = x.Error("90400", fmt.Sprintf("消息Topic与Event不能为空,Topic=%s, Event=%s", meta.Topic, meta.Event)) + return + } + return meta.Topic, meta.Event, nil +} diff --git a/pkg/sdk/msg/registry_remote.go b/pkg/sdk/msg/registry_remote.go index c468b7a..7b6e7a5 100644 --- a/pkg/sdk/msg/registry_remote.go +++ b/pkg/sdk/msg/registry_remote.go @@ -83,7 +83,7 @@ func RegistryRemote() (err error) { for _, clientInfo := range clientInfoMap { // 为多个 client 注册扩展点实现 for _, msgInfo := range msgRegistryInfo { - register := RegisterInfo{RegisterMsgInfo: msgInfo} + register := RegisterInfo{RegisterMsgInfo: msgInfo, InterfacePathVos: []any{}} err = RegistryRemoteForClient(register, *clientInfo) if err != nil { return diff --git a/pkg/sdk/msg/registry_remote_test.go b/pkg/sdk/msg/registry_remote_test.go index 9214720..6780ed1 100644 --- a/pkg/sdk/msg/registry_remote_test.go +++ b/pkg/sdk/msg/registry_remote_test.go @@ -31,3 +31,35 @@ func TestRegisterMsgServiceRemote(t *testing.T) { So(err, ShouldBeNil) }) } + +func TestUnmarshalMessageMeta(t *testing.T) { + Convey("Unmarshal message meta", t, func(c C) { + good := `{"topic":"foo","event":"bar"}` + bad := `{"topic":"foo","event":"bar"` + onlyTopic := `{"topic":"foo"}` + onlyOnlyEvent := `{"event":"bar"}` + + topic, event, err := messageMetaFrom([]byte(good)) + So(err, ShouldBeNil) + So(topic, ShouldEqual, "foo") + So(event, ShouldEqual, "bar") + + topic, event, err = messageMetaFrom([]byte(bad)) + So(err, ShouldNotBeNil) + _, _ = c.Println(err) + So(topic, ShouldBeEmpty) + So(event, ShouldBeEmpty) + + topic, event, err = messageMetaFrom([]byte(onlyTopic)) + So(err, ShouldNotBeNil) + _, _ = c.Println(err) + So(topic, ShouldBeEmpty) + So(event, ShouldBeEmpty) + + topic, event, err = messageMetaFrom([]byte(onlyOnlyEvent)) + So(err, ShouldNotBeNil) + _, _ = c.Println(err) + So(topic, ShouldBeEmpty) + So(event, ShouldBeEmpty) + }) +} diff --git a/pkg/sdk/msg/service.go b/pkg/sdk/msg/service.go index e440a91..6eedcf4 100644 --- a/pkg/sdk/msg/service.go +++ b/pkg/sdk/msg/service.go @@ -4,16 +4,10 @@ import ( "context" "github.com/weimob-tech/go-project-base/pkg/wlog" - "github.com/weimob-tech/go-project-base/pkg/codec" "github.com/weimob-tech/go-project-base/pkg/http" "github.com/weimob-tech/go-project-base/pkg/x" ) -const ( - _topic = "topic" - _event = "event" -) - type Dispatcher func(ctx context.Context, payload []byte) (x.Result, error) // Service 提供扩展点基础服务,例如鉴权、加解密、调用链等 @@ -24,16 +18,6 @@ type Service struct { config *Config } -// XinyunService 提供扩展点基础服务,例如鉴权、加解密、调用链等 -type XinyunService struct { - Service -} - -// WosService 提供扩展点基础服务,例如鉴权、加解密、调用链等 -type WosService struct { - Service -} - func (service *Service) Setup() (err error) { config := service.InitConfig() @@ -55,15 +39,22 @@ func (service *Service) InitWithOptions(config *Config) error { return nil } -func messageMetaFrom(data []byte) (topic string, event string, err error) { - // get topic - topic, err = codec.Json.GetString(data, _topic) - if err != nil { - return - } - // get event - event, err = codec.Json.GetString(data, _event) - return +// XinyunService 提供扩展点基础服务,例如鉴权、加解密、调用链等 +type XinyunService struct { + Service +} + +func (service *XinyunService) Register(listener GenericListener, path ...string) { + service.registry.Register(listener, SpecTypeXinyun, path...) +} + +// WosService 提供扩展点基础服务,例如鉴权、加解密、调用链等 +type WosService struct { + Service +} + +func (service *WosService) Register(listener GenericListener, path ...string) { + service.registry.Register(listener, SpecTypeWos, path...) } func NewWosCallbackConfig() *http.ExtendCallbackConfig { diff --git a/pkg/sdk/spi/response.go b/pkg/sdk/spi/response.go index 0ce8f7f..8b20e7e 100644 --- a/pkg/sdk/spi/response.go +++ b/pkg/sdk/spi/response.go @@ -11,8 +11,8 @@ var SuccessCode = InvocationCode{ } type InvocationResponse[T any] struct { - Code InvocationCode - Data *T + Code InvocationCode `json:"code,omitempty"` + Data *T `json:"data,omitempty"` } func Ok[T any](v *T) InvocationResponse[T] {