Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamic Router and Cluster from registry center #632

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/adapter/dubboregistry/registry/base/baseregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ func (s *SvcListeners) GetAllListener() map[string]registry.Listener {
}

type BaseRegistry struct {
RegisteredType registry.RegisteredType
svcListeners *SvcListeners
facadeRegistry FacadeRegistry
AdapterListener common.RegistryEventListener
}

func NewBaseRegistry(facade FacadeRegistry, adapterListener common.RegistryEventListener) *BaseRegistry {
func NewBaseRegistry(facade FacadeRegistry, adapterListener common.RegistryEventListener, registerType registry.RegisteredType) *BaseRegistry {
return &BaseRegistry{
RegisteredType: registerType,
facadeRegistry: facade,
svcListeners: &SvcListeners{
listeners: make(map[string]registry.Listener),
Expand Down
17 changes: 11 additions & 6 deletions pkg/adapter/dubboregistry/registry/nacos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type NacosRegistry struct {
}

func (n *NacosRegistry) DoSubscribe() error {
intfListener, ok := n.nacosListeners[registry.RegisteredTypeInterface]
intfListener, ok := n.nacosListeners[n.RegisteredType]
if !ok {
return errors.New("Listener for interface level registration does not initialized")
}
Expand Down Expand Up @@ -91,9 +91,14 @@ func newNacosRegistry(regConfig model.Registry, adapterListener common.RegistryE
client: client,
nacosListeners: make(map[registry.RegisteredType]registry.Listener),
}
nacosRegistry.nacosListeners[registry.RegisteredTypeInterface] = newNacosIntfListener(client, nacosRegistry, &regConfig, adapterListener)

baseReg := baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener)
nacosRegistry.BaseRegistry = baseReg
return baseReg, nil
nacosRegistry.BaseRegistry = baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType))
switch nacosRegistry.RegisteredType {
case registry.RegisteredTypeInterface:
nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = newNacosIntfListener(client, nacosRegistry, &regConfig, adapterListener)
//case registry.RegisteredTypeApplication:
//nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener)
default:
return nil, errors.Errorf("Unsupported registry type: %s", regConfig.RegistryType)
}
return nacosRegistry, nil
}
24 changes: 18 additions & 6 deletions pkg/adapter/dubboregistry/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,31 @@ import (

type RegisteredType int8

var RegisteredTypes = []string{"application", "interface"}

const (
RegisteredTypeApplication RegisteredType = iota
RegisteredTypeInterface

RegisteredTypeApplicationName = "application"
RegisteredTypeInterfaceName = "interface"
)

var registryMap = make(map[string]func(model.Registry, common2.RegistryEventListener) (Registry, error), 8)

func (t *RegisteredType) String() string {
return []string{"application", "interface"}[*t]
return RegisteredTypes[*t]
}

func RegisterTypeFromName(name string) RegisteredType {
switch name {
case RegisteredTypeApplicationName:
return RegisteredTypeApplication
case RegisteredTypeInterfaceName:
return RegisteredTypeInterface
default:
return RegisteredTypeInterface
}
}

// Registry interface defines the basic features of a registry
Expand Down Expand Up @@ -134,11 +150,7 @@ func ParseDubboString(urlString string) (config.DubboBackendConfig, []string, st

// GetAPIPattern generate the API path pattern. /application/interface/version
func GetAPIPattern(bkConfig config.DubboBackendConfig) string {
if bkConfig.Version == "" {
// if the version is empty, make sure the url path is valid.
return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface}, constant.PathSlash)
}
return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface, bkConfig.Version}, constant.PathSlash)
return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface}, constant.PathSlash)
}

func GetRouter() model.Router {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ import (

import (
dubboCommon "dubbo.apache.org/dubbo-go/v3/common"
ex "dubbo.apache.org/dubbo-go/v3/common/extension"
dubboConst "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metadata/definition"
dr "dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery"
"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
"github.com/dubbogo/go-zookeeper/zk"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/logger"

"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"

"github.com/dubbogo/go-zookeeper/zk"
)

var _ registry.Listener = new(applicationServiceListener)
Expand Down Expand Up @@ -154,8 +155,7 @@ func (asl *applicationServiceListener) handleEvent(children []string) {
}
methods, err := asl.getMethods(bkConfig.Interface)
if err != nil {
logger.Warnf("Get methods of interface %s failed; due to %s", bkConfig.Interface, err.Error())
continue
logger.Warnf("Get methods of interface %s failed; use prefix pattern to match url, due to %s", bkConfig.Interface, err.Error())
}

apiPattern := registry.GetAPIPattern(bkConfig)
Expand All @@ -169,8 +169,16 @@ func (asl *applicationServiceListener) handleEvent(children []string) {
MapTo: "opt.types",
},
}
for i := range methods {
api := registry.CreateAPIConfig(apiPattern, location, bkConfig, methods[i], mappingParams)
if methods != nil && len(methods) != 0 {
for i := range methods {
api := registry.CreateAPIConfig(apiPattern, location, bkConfig, methods[i], mappingParams)
if err := asl.adapterListener.OnAddAPI(api); err != nil {
logger.Errorf("Error={%s} happens when try to add api %s", err.Error(), api.Path)
}
}
} else {
// can't fetch methods, use http prefix pattern
api := registry.CreateAPIConfig(apiPattern, location, bkConfig, constant.AnyValue, mappingParams)
if err := asl.adapterListener.OnAddAPI(api); err != nil {
logger.Errorf("Error={%s} happens when try to add api %s", err.Error(), api.Path)
}
Expand All @@ -197,26 +205,16 @@ func (asl *applicationServiceListener) getUrls(path string) []*dubboCommon.URL {
instance := toZookeeperInstance(iss)

metaData := instance.GetMetadata()
metadataStorageType, ok := metaData[constant.MetadataStorageTypeKey]
if !ok {
metadataStorageType = constant.DefaultMetadataStorageType
}
// get metadata service proxy factory according to the metadataStorageType
proxyFactory := ex.GetMetadataServiceProxyFactory(metadataStorageType)
if proxyFactory == nil {
return nil
}
metadataService := proxyFactory.GetProxy(instance)
if metadataService == nil {
logger.Warnf("Get metadataService of instance %s failed", instance)
return nil
}
// call GetExportedURLs to get the exported urls
urls, err := metadataService.GetExportedURLs(constant.AnyValue, constant.AnyValue, constant.AnyValue, constant.AnyValue)
metadataInfo, err := servicediscovery.GetMetadataInfo(instance.GetServiceName(), instance, metaData[dubboConst.ExportedServicesRevisionPropertyName])
if err != nil {
logger.Errorf("Get exported urls of instance %s failed; due to %s", instance, err.Error())
logger.Errorf("get instance %s metadata info error %v", insPath, err.Error())
return nil
}
instance.SetServiceMetadata(metadataInfo)
urls := make([]*dubboCommon.URL, 0)
for _, service := range metadataInfo.Services {
urls = append(urls, instance.ToURLs(service)...)
}
return urls
}

Expand Down
32 changes: 20 additions & 12 deletions pkg/adapter/dubboregistry/registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ import (
)

import (
"github.com/pkg/errors"
)
dubboCommon "dubbo.apache.org/dubbo-go/v3/common"

hessian "github.com/apache/dubbo-go-hessian2"

import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
baseRegistry "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry/base"
zk "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/model"

"github.com/pkg/errors"
)

var (
Expand All @@ -49,6 +51,9 @@ const (

func init() {
registry.SetRegistry(constant.Zookeeper, newZKRegistry)
hessian.RegisterPOJO(&dubboCommon.MetadataInfo{})
hessian.RegisterPOJO(&dubboCommon.ServiceInfo{})
hessian.RegisterPOJO(&dubboCommon.URL{})
}

type ZKRegistry struct {
Expand All @@ -61,7 +66,7 @@ var _ registry.Registry = new(ZKRegistry)

func newZKRegistry(regConfig model.Registry, adapterListener common.RegistryEventListener) (registry.Registry, error) {
var zkReg = &ZKRegistry{}
baseReg := baseRegistry.NewBaseRegistry(zkReg, adapterListener)
baseReg := baseRegistry.NewBaseRegistry(zkReg, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType))
timeout, err := time.ParseDuration(regConfig.Timeout)
if err != nil {
return nil, errors.Errorf("Incorrect timeout configuration: %s", regConfig.Timeout)
Expand All @@ -73,15 +78,18 @@ func newZKRegistry(regConfig model.Registry, adapterListener common.RegistryEven
client.RegisterHandler(eventChan)
zkReg.BaseRegistry = baseReg
zkReg.client = client
initZKListeners(zkReg)
zkReg.zkListeners = make(map[registry.RegisteredType]registry.Listener)
switch zkReg.RegisteredType {
case registry.RegisteredTypeInterface:
zkReg.zkListeners[zkReg.RegisteredType] = newZKIntfListener(zkReg.client, zkReg, zkReg.AdapterListener)
case registry.RegisteredTypeApplication:
zkReg.zkListeners[zkReg.RegisteredType] = newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener)
default:
return nil, errors.Errorf("Unsupported registry type: %s", regConfig.RegistryType)
}
return zkReg, nil
}

func initZKListeners(reg *ZKRegistry) {
reg.zkListeners = make(map[registry.RegisteredType]registry.Listener)
reg.zkListeners[registry.RegisteredTypeInterface] = newZKIntfListener(reg.client, reg, reg.AdapterListener)
}

func (r *ZKRegistry) GetClient() *zk.ZooKeeperClient {
return r.client
}
Expand All @@ -96,7 +104,7 @@ func (r *ZKRegistry) DoSubscribe() error {

// To subscribe service level service discovery
func (r *ZKRegistry) interfaceSubscribe() error {
intfListener, ok := r.zkListeners[registry.RegisteredTypeInterface]
intfListener, ok := r.zkListeners[r.RegisteredType]
if !ok {
return errors.New("Listener for interface level registration does not initialized")
}
Expand All @@ -106,7 +114,7 @@ func (r *ZKRegistry) interfaceSubscribe() error {

// DoUnsubscribe stops monitoring the target registry.
func (r *ZKRegistry) DoUnsubscribe() error {
intfListener, ok := r.zkListeners[registry.RegisteredTypeInterface]
intfListener, ok := r.zkListeners[r.RegisteredType]
if !ok {
return errors.New("Listener for interface level registration does not initialized")
}
Expand Down
15 changes: 6 additions & 9 deletions pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package zookeeper

import (
"strings"
"sync"
"time"
)
Expand All @@ -33,7 +32,6 @@ import (
common2 "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)

Expand Down Expand Up @@ -128,16 +126,15 @@ func (zkl *serviceListener) waitEventAndHandlePeriod(children []string, e <-chan

// whenever it is called, the children node changed and refresh the api configuration.
func (zkl *serviceListener) handleEvent() {
// get all children of provider, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService/providers
children, err := zkl.client.GetChildren(zkl.path)
if err != nil {
// disable the API
bkConf, methods, _, _ := registry.ParseDubboString(zkl.url.String())
// disable the service all methods
bkConf, _, _, _ := registry.ParseDubboString(zkl.url.String())
apiPattern := registry.GetAPIPattern(bkConf)
for i := range methods {
path := strings.Join([]string{apiPattern, methods[i]}, constant.PathSlash)
if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: path}); err != nil {
logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), path)
}
// delete all config of an interface, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService
if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: apiPattern}); err != nil {
logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), apiPattern)
}
return
}
Expand Down
Loading
Loading