From b317ee0b490cdb13b4430d5378683860aae912d9 Mon Sep 17 00:00:00 2001 From: foghost Date: Tue, 19 Mar 2024 09:53:29 +0800 Subject: [PATCH 1/5] dynamic Router and Cluster from registry center --- .../registry/zookeeper/service_listener.go | 15 +++--- .../adapter/dubboregistry/registrycenter.go | 50 +++++++++++++------ pixiu/pkg/model/router.go | 13 +++++ pixiu/pkg/server/api_config_manager.go | 6 +-- pixiu/pkg/server/router_manager.go | 3 ++ 5 files changed, 60 insertions(+), 27 deletions(-) diff --git a/pixiu/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go b/pixiu/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go index 93d874486..66cdb22c1 100644 --- a/pixiu/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go +++ b/pixiu/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go @@ -18,7 +18,6 @@ package zookeeper import ( - "strings" "sync" "time" ) @@ -33,7 +32,6 @@ import ( common2 "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/common" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/registry" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/remoting/zookeeper" - "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger" ) @@ -128,16 +126,15 @@ func (zkl *serviceListener) waitEventAndHandlePeriod(children []string, e <-chan // whenever it is called, the children node changed and refresh the api configuration. func (zkl *serviceListener) handleEvent() { + // get all children of provider, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService/providers children, err := zkl.client.GetChildren(zkl.path) if err != nil { - // disable the API - bkConf, methods, _, _ := registry.ParseDubboString(zkl.url.String()) + // disable the service all methods + bkConf, _, _, _ := registry.ParseDubboString(zkl.url.String()) apiPattern := registry.GetAPIPattern(bkConf) - for i := range methods { - path := strings.Join([]string{apiPattern, methods[i]}, constant.PathSlash) - if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: path}); err != nil { - logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), path) - } + // delete all config of an interface, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService + if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: apiPattern}); err != nil { + logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), apiPattern) } return } diff --git a/pixiu/pkg/adapter/dubboregistry/registrycenter.go b/pixiu/pkg/adapter/dubboregistry/registrycenter.go index 7b6472749..724214c53 100644 --- a/pixiu/pkg/adapter/dubboregistry/registrycenter.go +++ b/pixiu/pkg/adapter/dubboregistry/registrycenter.go @@ -19,11 +19,8 @@ package dubboregistry import ( "os" -) - -import ( - "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" - "github.com/dubbo-go-pixiu/pixiu-api/pkg/router" + "strconv" + "strings" ) import ( @@ -35,6 +32,9 @@ import ( "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/server" + + "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" + "github.com/dubbo-go-pixiu/pixiu-api/pkg/router" ) func init() { @@ -65,19 +65,19 @@ func (p Plugin) Kind() string { func (p *Plugin) CreateAdapter(a *model.Adapter) (adapter.Adapter, error) { adapter := &Adapter{id: a.ID, registries: make(map[string]registry.Registry), - cfg: AdaptorConfig{Registries: make(map[string]model.Registry)}} + cfg: &AdaptorConfig{Registries: make(map[string]model.Registry)}} return adapter, nil } // Adapter to monitor dubbo services on registry center type Adapter struct { id string - cfg AdaptorConfig + cfg *AdaptorConfig registries map[string]registry.Registry } // Start starts the adaptor -func (a Adapter) Start() { +func (a *Adapter) Start() { for _, reg := range a.registries { if err := reg.Subscribe(); err != nil { logger.Errorf("Subscribe fail, error is {%s}", err.Error()) @@ -113,21 +113,41 @@ func (a *Adapter) Apply() error { } // Config returns the config of the adaptor -func (a Adapter) Config() interface{} { +func (a *Adapter) Config() interface{} { return a.cfg } func (a *Adapter) OnAddAPI(r router.API) error { - acm := server.GetApiConfigManager() - return acm.AddAPI(a.id, r) + ipPort := strings.Split(r.IntegrationRequest.URL, ":") + port, err := strconv.Atoi(ipPort[1]) + if err != nil { + return err + } + cluster := strings.Join([]string{r.ApplicationName, r.Interface, r.Version, r.Group}, constant.PathSlash) + server.GetClusterManager().SetEndpoint(cluster, &model.Endpoint{ + ID: r.IntegrationRequest.URL, + Address: model.SocketAddress{ + Address: ipPort[0], + Port: port, + }}, + ) + prefix := strings.Join([]string{"/" + r.ApplicationName, r.Interface}, constant.PathSlash) + match := model.RouterMatch{Prefix: prefix, Methods: []string{string(r.HTTPVerb)}} + route := model.RouteAction{Cluster: cluster} + added := &model.Router{ID: r.URLPattern, Match: match, Route: route} + server.GetRouterManager().AddRouter(added) + return server.GetApiConfigManager().AddAPI(a.id, r) } func (a *Adapter) OnRemoveAPI(r router.API) error { - acm := server.GetApiConfigManager() - return acm.RemoveAPI(a.id, r) + cluster := strings.Join([]string{r.ApplicationName, r.Interface, r.Version, r.Group}, constant.PathSlash) + server.GetClusterManager().DeleteEndpoint(cluster, r.IntegrationRequest.URL) + return server.GetApiConfigManager().RemoveAPI(a.id, r) } func (a *Adapter) OnDeleteRouter(r config.Resource) error { - acm := server.GetApiConfigManager() - return acm.DeleteRouter(a.id, r) + empty := &model.ClusterConfig{Name: r.Path, LbStr: model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{}} + server.GetClusterManager().UpdateCluster(empty) + server.GetRouterManager().DeleteRouter(&model.Router{Match: model.RouterMatch{Prefix: r.Path}}) + return server.GetApiConfigManager().DeleteRouter(a.id, r) } diff --git a/pixiu/pkg/model/router.go b/pixiu/pkg/model/router.go index ce41669de..a33dfa867 100644 --- a/pixiu/pkg/model/router.go +++ b/pixiu/pkg/model/router.go @@ -20,6 +20,7 @@ package model import ( stdHttp "net/http" "regexp" + "strings" ) import ( @@ -141,3 +142,15 @@ func (hm *HeaderMatcher) SetValueRegex(regex string) error { hm.Regex = false return err } + +func (r *Router) String() string { + var builder strings.Builder + builder.WriteString("[" + strings.Join(r.Match.Methods, ",") + "] ") + if r.Match.Prefix != "" { + builder.WriteString("prefix " + r.Match.Prefix) + } else { + builder.WriteString("path " + r.Match.Path) + } + builder.WriteString(" to cluster " + r.Route.Cluster) + return builder.String() +} diff --git a/pixiu/pkg/server/api_config_manager.go b/pixiu/pkg/server/api_config_manager.go index 616e555fb..b656b6fda 100644 --- a/pixiu/pkg/server/api_config_manager.go +++ b/pixiu/pkg/server/api_config_manager.go @@ -58,10 +58,10 @@ func (acm *ApiConfigManager) AddApiConfigListener(adapterID string, l ApiConfigL func (acm *ApiConfigManager) AddAPI(adapterID string, r router.API) error { l, existed := acm.als[adapterID] - if !existed { - return errors.Errorf("no listener found") + if existed { + return l.OnAddAPI(r) } - return l.OnAddAPI(r) + return nil } func (acm *ApiConfigManager) RemoveAPI(adapterID string, r router.API) error { diff --git a/pixiu/pkg/server/router_manager.go b/pixiu/pkg/server/router_manager.go index 1b1d4f734..65d7444be 100644 --- a/pixiu/pkg/server/router_manager.go +++ b/pixiu/pkg/server/router_manager.go @@ -18,6 +18,7 @@ package server import ( + "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model" ) @@ -42,12 +43,14 @@ func (rm *RouterManager) AddRouterListener(l RouterListener) { } func (rm *RouterManager) AddRouter(r *model.Router) { + logger.Infof("add router: %v", r) for _, l := range rm.rls { l.OnAddRouter(r) } } func (rm *RouterManager) DeleteRouter(r *model.Router) { + logger.Infof("del router: %v", r) for _, l := range rm.rls { l.OnDeleteRouter(r) } From c58d42eaf230f2e4850c1376ccd4dbaa8152ec32 Mon Sep 17 00:00:00 2001 From: foghost Date: Sun, 24 Mar 2024 09:35:32 +0800 Subject: [PATCH 2/5] change cluster identity to ApplicationName+Interface+Method+Version+Group change path to /ApplicationName/Interface/Method --- .../dubboregistry/registry/registry.go | 6 +----- .../adapter/dubboregistry/registrycenter.go | 20 ++++++++++--------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/pixiu/pkg/adapter/dubboregistry/registry/registry.go b/pixiu/pkg/adapter/dubboregistry/registry/registry.go index 3d8d3b12c..7ddf918e8 100644 --- a/pixiu/pkg/adapter/dubboregistry/registry/registry.go +++ b/pixiu/pkg/adapter/dubboregistry/registry/registry.go @@ -134,11 +134,7 @@ func ParseDubboString(urlString string) (config.DubboBackendConfig, []string, st // GetAPIPattern generate the API path pattern. /application/interface/version func GetAPIPattern(bkConfig config.DubboBackendConfig) string { - if bkConfig.Version == "" { - // if the version is empty, make sure the url path is valid. - return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface}, constant.PathSlash) - } - return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface, bkConfig.Version}, constant.PathSlash) + return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface}, constant.PathSlash) } func GetRouter() model.Router { diff --git a/pixiu/pkg/adapter/dubboregistry/registrycenter.go b/pixiu/pkg/adapter/dubboregistry/registrycenter.go index 724214c53..5ab6c4739 100644 --- a/pixiu/pkg/adapter/dubboregistry/registrycenter.go +++ b/pixiu/pkg/adapter/dubboregistry/registrycenter.go @@ -123,7 +123,7 @@ func (a *Adapter) OnAddAPI(r router.API) error { if err != nil { return err } - cluster := strings.Join([]string{r.ApplicationName, r.Interface, r.Version, r.Group}, constant.PathSlash) + cluster := getClusterName(r) server.GetClusterManager().SetEndpoint(cluster, &model.Endpoint{ ID: r.IntegrationRequest.URL, Address: model.SocketAddress{ @@ -131,23 +131,25 @@ func (a *Adapter) OnAddAPI(r router.API) error { Port: port, }}, ) - prefix := strings.Join([]string{"/" + r.ApplicationName, r.Interface}, constant.PathSlash) - match := model.RouterMatch{Prefix: prefix, Methods: []string{string(r.HTTPVerb)}} + path := strings.Join([]string{r.ApplicationName, r.Interface, r.Method.Method}, constant.PathSlash) + match := model.RouterMatch{Path: path, Methods: []string{string(r.HTTPVerb)}} route := model.RouteAction{Cluster: cluster} - added := &model.Router{ID: r.URLPattern, Match: match, Route: route} + added := &model.Router{ID: path, Match: match, Route: route} server.GetRouterManager().AddRouter(added) return server.GetApiConfigManager().AddAPI(a.id, r) } func (a *Adapter) OnRemoveAPI(r router.API) error { - cluster := strings.Join([]string{r.ApplicationName, r.Interface, r.Version, r.Group}, constant.PathSlash) + cluster := getClusterName(r) server.GetClusterManager().DeleteEndpoint(cluster, r.IntegrationRequest.URL) return server.GetApiConfigManager().RemoveAPI(a.id, r) } func (a *Adapter) OnDeleteRouter(r config.Resource) error { - empty := &model.ClusterConfig{Name: r.Path, LbStr: model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{}} - server.GetClusterManager().UpdateCluster(empty) - server.GetRouterManager().DeleteRouter(&model.Router{Match: model.RouterMatch{Prefix: r.Path}}) - return server.GetApiConfigManager().DeleteRouter(a.id, r) + acm := server.GetApiConfigManager() + return acm.DeleteRouter(a.id, r) +} + +func getClusterName(r router.API) string { + return strings.Join([]string{r.ApplicationName, r.Interface, r.Method.Method, r.Version, r.Group}, constant.PathSlash) } From 3d44f85c824814fdbfd033ac8a40d9ac22a80378 Mon Sep 17 00:00:00 2001 From: foghost Date: Fri, 17 May 2024 11:26:23 +0800 Subject: [PATCH 3/5] add RegisteredType to registry --- .../registry/base/baseregistry.go | 4 +- .../dubboregistry/registry/nacos/registry.go | 7 ++-- .../dubboregistry/registry/registry.go | 18 +++++++- .../zookeeper/application_service_listener.go | 42 +++++++++---------- .../registry/zookeeper/registry.go | 24 +++++++---- .../registry/zookeeper/service_listener.go | 1 - pkg/adapter/dubboregistry/registrycenter.go | 20 +++++---- pkg/common/constant/key.go | 3 -- pkg/model/cluster.go | 15 +++---- 9 files changed, 80 insertions(+), 54 deletions(-) diff --git a/pkg/adapter/dubboregistry/registry/base/baseregistry.go b/pkg/adapter/dubboregistry/registry/base/baseregistry.go index 731312fc9..19370f48c 100644 --- a/pkg/adapter/dubboregistry/registry/base/baseregistry.go +++ b/pkg/adapter/dubboregistry/registry/base/baseregistry.go @@ -71,13 +71,15 @@ func (s *SvcListeners) GetAllListener() map[string]registry.Listener { } type BaseRegistry struct { + RegisteredType registry.RegisteredType svcListeners *SvcListeners facadeRegistry FacadeRegistry AdapterListener common.RegistryEventListener } -func NewBaseRegistry(facade FacadeRegistry, adapterListener common.RegistryEventListener) *BaseRegistry { +func NewBaseRegistry(facade FacadeRegistry, adapterListener common.RegistryEventListener, registerType registry.RegisteredType) *BaseRegistry { return &BaseRegistry{ + RegisteredType: registerType, facadeRegistry: facade, svcListeners: &SvcListeners{ listeners: make(map[string]registry.Listener), diff --git a/pkg/adapter/dubboregistry/registry/nacos/registry.go b/pkg/adapter/dubboregistry/registry/nacos/registry.go index 8d1356a25..5ef48a8c8 100644 --- a/pkg/adapter/dubboregistry/registry/nacos/registry.go +++ b/pkg/adapter/dubboregistry/registry/nacos/registry.go @@ -45,7 +45,7 @@ type NacosRegistry struct { } func (n *NacosRegistry) DoSubscribe() error { - intfListener, ok := n.nacosListeners[registry.RegisteredTypeInterface] + intfListener, ok := n.nacosListeners[n.RegisteredType] if !ok { return errors.New("Listener for interface level registration does not initialized") } @@ -91,9 +91,10 @@ func newNacosRegistry(regConfig model.Registry, adapterListener common.RegistryE client: client, nacosListeners: make(map[registry.RegisteredType]registry.Listener), } - nacosRegistry.nacosListeners[registry.RegisteredTypeInterface] = newNacosIntfListener(client, nacosRegistry, ®Config, adapterListener) + nacosRegistry.BaseRegistry = baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType)) + nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = newNacosIntfListener(client, nacosRegistry, ®Config, adapterListener) - baseReg := baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener) + baseReg := baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType)) nacosRegistry.BaseRegistry = baseReg return baseReg, nil } diff --git a/pkg/adapter/dubboregistry/registry/registry.go b/pkg/adapter/dubboregistry/registry/registry.go index e1864e2e8..74dc48abc 100644 --- a/pkg/adapter/dubboregistry/registry/registry.go +++ b/pkg/adapter/dubboregistry/registry/registry.go @@ -37,15 +37,31 @@ import ( type RegisteredType int8 +var RegisteredTypes = []string{"application", "interface"} + const ( RegisteredTypeApplication RegisteredType = iota RegisteredTypeInterface + + RegisteredTypeApplicationName = "application" + RegisteredTypeInterfaceName = "interface" ) var registryMap = make(map[string]func(model.Registry, common2.RegistryEventListener) (Registry, error), 8) func (t *RegisteredType) String() string { - return []string{"application", "interface"}[*t] + return RegisteredTypes[*t] +} + +func RegisterTypeFromName(name string) RegisteredType { + switch name { + case RegisteredTypeApplicationName: + return RegisteredTypeApplication + case RegisteredTypeInterfaceName: + return RegisteredTypeInterface + default: + return RegisteredTypeInterface + } } // Registry interface defines the basic features of a registry diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go b/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go index aeb5fc603..22ff1b6f3 100644 --- a/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go +++ b/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go @@ -18,6 +18,7 @@ package zookeeper import ( + "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery" "encoding/json" "fmt" "strings" @@ -27,7 +28,7 @@ import ( import ( dubboCommon "dubbo.apache.org/dubbo-go/v3/common" - ex "dubbo.apache.org/dubbo-go/v3/common/extension" + dubboConst "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/metadata/definition" dr "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery" @@ -154,8 +155,7 @@ func (asl *applicationServiceListener) handleEvent(children []string) { } methods, err := asl.getMethods(bkConfig.Interface) if err != nil { - logger.Warnf("Get methods of interface %s failed; due to %s", bkConfig.Interface, err.Error()) - continue + logger.Warnf("Get methods of interface %s failed; use prefix pattern to match url, due to %s", bkConfig.Interface, err.Error()) } apiPattern := registry.GetAPIPattern(bkConfig) @@ -169,8 +169,16 @@ func (asl *applicationServiceListener) handleEvent(children []string) { MapTo: "opt.types", }, } - for i := range methods { - api := registry.CreateAPIConfig(apiPattern, location, bkConfig, methods[i], mappingParams) + if methods != nil && len(methods) != 0 { + for i := range methods { + api := registry.CreateAPIConfig(apiPattern, location, bkConfig, methods[i], mappingParams) + if err := asl.adapterListener.OnAddAPI(api); err != nil { + logger.Errorf("Error={%s} happens when try to add api %s", err.Error(), api.Path) + } + } + } else { + // can't fetch methods, use http prefix pattern + api := registry.CreateAPIConfig(apiPattern, location, bkConfig, constant.AnyValue, mappingParams) if err := asl.adapterListener.OnAddAPI(api); err != nil { logger.Errorf("Error={%s} happens when try to add api %s", err.Error(), api.Path) } @@ -197,26 +205,16 @@ func (asl *applicationServiceListener) getUrls(path string) []*dubboCommon.URL { instance := toZookeeperInstance(iss) metaData := instance.GetMetadata() - metadataStorageType, ok := metaData[constant.MetadataStorageTypeKey] - if !ok { - metadataStorageType = constant.DefaultMetadataStorageType - } - // get metadata service proxy factory according to the metadataStorageType - proxyFactory := ex.GetMetadataServiceProxyFactory(metadataStorageType) - if proxyFactory == nil { - return nil - } - metadataService := proxyFactory.GetProxy(instance) - if metadataService == nil { - logger.Warnf("Get metadataService of instance %s failed", instance) - return nil - } - // call GetExportedURLs to get the exported urls - urls, err := metadataService.GetExportedURLs(constant.AnyValue, constant.AnyValue, constant.AnyValue, constant.AnyValue) + metadataInfo, err := servicediscovery.GetMetadataInfo(instance.GetServiceName(), instance, metaData[dubboConst.ExportedServicesRevisionPropertyName]) if err != nil { - logger.Errorf("Get exported urls of instance %s failed; due to %s", instance, err.Error()) + logger.Errorf("get instance %s metadata info error %v", insPath, err.Error()) return nil } + instance.SetServiceMetadata(metadataInfo) + urls := make([]*dubboCommon.URL, 0) + for _, service := range metadataInfo.Services { + urls = append(urls, instance.ToURLs(service)...) + } return urls } diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/registry.go b/pkg/adapter/dubboregistry/registry/zookeeper/registry.go index 1a2ff1860..b8c90c1e3 100644 --- a/pkg/adapter/dubboregistry/registry/zookeeper/registry.go +++ b/pkg/adapter/dubboregistry/registry/zookeeper/registry.go @@ -18,6 +18,7 @@ package zookeeper import ( + hessian "github.com/apache/dubbo-go-hessian2" "strings" "time" ) @@ -27,6 +28,7 @@ import ( ) import ( + dubboCommon "dubbo.apache.org/dubbo-go/v3/common" "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common" "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry" baseRegistry "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry/base" @@ -49,6 +51,9 @@ const ( func init() { registry.SetRegistry(constant.Zookeeper, newZKRegistry) + hessian.RegisterPOJO(&dubboCommon.MetadataInfo{}) + hessian.RegisterPOJO(&dubboCommon.ServiceInfo{}) + hessian.RegisterPOJO(&dubboCommon.URL{}) } type ZKRegistry struct { @@ -61,7 +66,7 @@ var _ registry.Registry = new(ZKRegistry) func newZKRegistry(regConfig model.Registry, adapterListener common.RegistryEventListener) (registry.Registry, error) { var zkReg = &ZKRegistry{} - baseReg := baseRegistry.NewBaseRegistry(zkReg, adapterListener) + baseReg := baseRegistry.NewBaseRegistry(zkReg, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType)) timeout, err := time.ParseDuration(regConfig.Timeout) if err != nil { return nil, errors.Errorf("Incorrect timeout configuration: %s", regConfig.Timeout) @@ -73,15 +78,16 @@ func newZKRegistry(regConfig model.Registry, adapterListener common.RegistryEven client.RegisterHandler(eventChan) zkReg.BaseRegistry = baseReg zkReg.client = client - initZKListeners(zkReg) + zkReg.zkListeners = make(map[registry.RegisteredType]registry.Listener) + switch zkReg.RegisteredType { + case registry.RegisteredTypeInterface: + zkReg.zkListeners[zkReg.RegisteredType] = newZKIntfListener(zkReg.client, zkReg, zkReg.AdapterListener) + case registry.RegisteredTypeApplication: + zkReg.zkListeners[zkReg.RegisteredType] = newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener) + } return zkReg, nil } -func initZKListeners(reg *ZKRegistry) { - reg.zkListeners = make(map[registry.RegisteredType]registry.Listener) - reg.zkListeners[registry.RegisteredTypeInterface] = newZKIntfListener(reg.client, reg, reg.AdapterListener) -} - func (r *ZKRegistry) GetClient() *zk.ZooKeeperClient { return r.client } @@ -96,7 +102,7 @@ func (r *ZKRegistry) DoSubscribe() error { // To subscribe service level service discovery func (r *ZKRegistry) interfaceSubscribe() error { - intfListener, ok := r.zkListeners[registry.RegisteredTypeInterface] + intfListener, ok := r.zkListeners[r.RegisteredType] if !ok { return errors.New("Listener for interface level registration does not initialized") } @@ -106,7 +112,7 @@ func (r *ZKRegistry) interfaceSubscribe() error { // DoUnsubscribe stops monitoring the target registry. func (r *ZKRegistry) DoUnsubscribe() error { - intfListener, ok := r.zkListeners[registry.RegisteredTypeInterface] + intfListener, ok := r.zkListeners[r.RegisteredType] if !ok { return errors.New("Listener for interface level registration does not initialized") } diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go b/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go index c6787a2e4..c6697426b 100644 --- a/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go +++ b/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go @@ -32,7 +32,6 @@ import ( common2 "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common" "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry" "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper" - "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/logger" ) diff --git a/pkg/adapter/dubboregistry/registrycenter.go b/pkg/adapter/dubboregistry/registrycenter.go index aadc15e45..ff5c93563 100644 --- a/pkg/adapter/dubboregistry/registrycenter.go +++ b/pkg/adapter/dubboregistry/registrycenter.go @@ -23,11 +23,6 @@ import ( "strings" ) -import ( - "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" - "github.com/dubbo-go-pixiu/pixiu-api/pkg/router" -) - import ( "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry" _ "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry/nacos" @@ -37,6 +32,9 @@ import ( "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" "github.com/apache/dubbo-go-pixiu/pkg/server" + + "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" + "github.com/dubbo-go-pixiu/pixiu-api/pkg/router" ) func init() { @@ -133,8 +131,16 @@ func (a *Adapter) OnAddAPI(r router.API) error { Port: port, }}, ) - path := strings.Join([]string{r.ApplicationName, r.Interface, r.Method.Method}, constant.PathSlash) - match := model.RouterMatch{Path: path, Methods: []string{string(r.HTTPVerb)}} + + var match model.RouterMatch + var path string + if r.DubboBackendConfig.Method == constant.AnyValue { + path = strings.Join([]string{r.ApplicationName, r.Interface}, constant.PathSlash) + match = model.RouterMatch{Prefix: path, Methods: []string{string(r.HTTPVerb)}} + } else { + path = strings.Join([]string{r.ApplicationName, r.Interface, r.Method.Method}, constant.PathSlash) + match = model.RouterMatch{Path: path, Methods: []string{string(r.HTTPVerb)}} + } route := model.RouteAction{Cluster: cluster} added := &model.Router{ID: path, Match: match, Route: route} server.GetRouterManager().AddRouter(added) diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go index 236415ca9..5de033fca 100644 --- a/pkg/common/constant/key.go +++ b/pkg/common/constant/key.go @@ -84,7 +84,4 @@ const ( NameKey = "name" // RetriesKey retry times RetriesKey = "retries" - // MetadataStorageTypeKey the storage type of metadata - MetadataStorageTypeKey = "dubbo.metadata.storage-type" - DefaultMetadataStorageType = "local" ) diff --git a/pkg/model/cluster.go b/pkg/model/cluster.go index 2040305e8..a16e699e2 100644 --- a/pkg/model/cluster.go +++ b/pkg/model/cluster.go @@ -74,13 +74,14 @@ type ( // so any modification to the config, should apply to both `pkg/client/dubbo/dubbo.go` // and `pkg\adapter\dubboregistry\registry` Registry struct { - Protocol string `default:"zookeeper" yaml:"protocol" json:"protocol"` - Timeout string `yaml:"timeout" json:"timeout"` - Address string `yaml:"address" json:"address"` - Username string `yaml:"username" json:"username"` - Password string `yaml:"password" json:"password"` - Group string `default:"DEFAULT_GROUP" yaml:"group" json:"group"` - Namespace string `yaml:"namespace" json:"namespace"` + Protocol string `default:"zookeeper" yaml:"protocol" json:"protocol"` + Timeout string `yaml:"timeout" json:"timeout"` + Address string `yaml:"address" json:"address"` + Username string `yaml:"username" json:"username"` + Password string `yaml:"password" json:"password"` + Group string `default:"DEFAULT_GROUP" yaml:"group" json:"group"` + Namespace string `yaml:"namespace" json:"namespace"` + RegistryType string `yaml:"registry_type" json:"registry_type"` // "application", "interface" } // DiscoveryType From fb700a9abb230e26dcd1184c006887b7fcb5832d Mon Sep 17 00:00:00 2001 From: foghost Date: Sat, 18 May 2024 21:04:26 +0800 Subject: [PATCH 4/5] fmt --- .../registry/zookeeper/application_service_listener.go | 10 +++++----- .../dubboregistry/registry/zookeeper/registry.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go b/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go index 22ff1b6f3..8ae17827d 100644 --- a/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go +++ b/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go @@ -18,7 +18,6 @@ package zookeeper import ( - "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery" "encoding/json" "fmt" "strings" @@ -31,17 +30,18 @@ import ( dubboConst "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/metadata/definition" dr "dubbo.apache.org/dubbo-go/v3/registry" + "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery" "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery" - "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" - "github.com/dubbogo/go-zookeeper/zk" -) -import ( "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common" "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry" "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper" "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/logger" + + "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" + + "github.com/dubbogo/go-zookeeper/zk" ) var _ registry.Listener = new(applicationServiceListener) diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/registry.go b/pkg/adapter/dubboregistry/registry/zookeeper/registry.go index b8c90c1e3..67f11e4bb 100644 --- a/pkg/adapter/dubboregistry/registry/zookeeper/registry.go +++ b/pkg/adapter/dubboregistry/registry/zookeeper/registry.go @@ -18,23 +18,23 @@ package zookeeper import ( - hessian "github.com/apache/dubbo-go-hessian2" "strings" "time" ) -import ( - "github.com/pkg/errors" -) - import ( dubboCommon "dubbo.apache.org/dubbo-go/v3/common" + + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common" "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry" baseRegistry "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry/base" zk "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper" "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/model" + + "github.com/pkg/errors" ) var ( From e81bf83a98df1e84799652d2c266fc2c86af78a7 Mon Sep 17 00:00:00 2001 From: foghost Date: Mon, 23 Sep 2024 18:45:53 +0800 Subject: [PATCH 5/5] add check for registry type --- .../dubboregistry/registry/nacos/registry.go | 14 +++++++++----- .../dubboregistry/registry/zookeeper/registry.go | 2 ++ pkg/model/cluster.go | 2 +- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/adapter/dubboregistry/registry/nacos/registry.go b/pkg/adapter/dubboregistry/registry/nacos/registry.go index 5ef48a8c8..72eb15f4b 100644 --- a/pkg/adapter/dubboregistry/registry/nacos/registry.go +++ b/pkg/adapter/dubboregistry/registry/nacos/registry.go @@ -92,9 +92,13 @@ func newNacosRegistry(regConfig model.Registry, adapterListener common.RegistryE nacosListeners: make(map[registry.RegisteredType]registry.Listener), } nacosRegistry.BaseRegistry = baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType)) - nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = newNacosIntfListener(client, nacosRegistry, ®Config, adapterListener) - - baseReg := baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType)) - nacosRegistry.BaseRegistry = baseReg - return baseReg, nil + switch nacosRegistry.RegisteredType { + case registry.RegisteredTypeInterface: + nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = newNacosIntfListener(client, nacosRegistry, ®Config, adapterListener) + //case registry.RegisteredTypeApplication: + //nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener) + default: + return nil, errors.Errorf("Unsupported registry type: %s", regConfig.RegistryType) + } + return nacosRegistry, nil } diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/registry.go b/pkg/adapter/dubboregistry/registry/zookeeper/registry.go index 67f11e4bb..d7ad96ba5 100644 --- a/pkg/adapter/dubboregistry/registry/zookeeper/registry.go +++ b/pkg/adapter/dubboregistry/registry/zookeeper/registry.go @@ -84,6 +84,8 @@ func newZKRegistry(regConfig model.Registry, adapterListener common.RegistryEven zkReg.zkListeners[zkReg.RegisteredType] = newZKIntfListener(zkReg.client, zkReg, zkReg.AdapterListener) case registry.RegisteredTypeApplication: zkReg.zkListeners[zkReg.RegisteredType] = newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener) + default: + return nil, errors.Errorf("Unsupported registry type: %s", regConfig.RegistryType) } return zkReg, nil } diff --git a/pkg/model/cluster.go b/pkg/model/cluster.go index a16e699e2..4fd66ba9f 100644 --- a/pkg/model/cluster.go +++ b/pkg/model/cluster.go @@ -81,7 +81,7 @@ type ( Password string `yaml:"password" json:"password"` Group string `default:"DEFAULT_GROUP" yaml:"group" json:"group"` Namespace string `yaml:"namespace" json:"namespace"` - RegistryType string `yaml:"registry_type" json:"registry_type"` // "application", "interface" + RegistryType string `default:"interface" yaml:"registry_type" json:"registry_type"` // "application", "interface" } // DiscoveryType