Skip to content

Commit

Permalink
remove cluster id
Browse files Browse the repository at this point in the history
Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Sep 11, 2024
1 parent 471dbaa commit b8f2c9e
Show file tree
Hide file tree
Showing 23 changed files with 149 additions and 180 deletions.
4 changes: 2 additions & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client) {
tsoServiceKey := discovery.TSOPath(keypath.ClusterID())
tsoServiceKey := keypath.TSOPath()

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down Expand Up @@ -1151,7 +1151,7 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
return "", ErrKeyspaceGroupNotExists(id)
}

rootPath := keypath.TSOSvcRootPath(keypath.ClusterID())
rootPath := keypath.TSOSvcRootPath()
primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id)
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
Expand Down
14 changes: 4 additions & 10 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@
package discovery

import (
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/global"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := ServicePath(clusterID, serviceName)
func Discover(cli *clientv3.Client, serviceName string) ([]string, error) {
key := keypath.ServicePath(serviceName)
endKey := clientv3.GetPrefixRangeEnd(key)

withRange := clientv3.WithRange(endKey)
Expand All @@ -49,11 +47,7 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch serviceName {
case constant.TSOServiceName, constant.SchedulingServiceName, constant.ResourceManagerServiceName:
clusterID, err := global.GetClusterIDFromEtcd(client)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName)
servicePath := keypath.ServicePath(serviceName)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
Expand Down
16 changes: 8 additions & 8 deletions pkg/mcs/discovery/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func TestDiscover(t *testing.T) {
re := require.New(t)
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
err := sr1.Register()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "12345", "test_service")
endpoints, err := Discover(client, "test_service")
re.NoError(err)
re.Len(endpoints, 2)
re.Equal("127.0.0.1:1", endpoints[0])
Expand All @@ -43,7 +43,7 @@ func TestDiscover(t *testing.T) {
sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "12345", "test_service")
endpoints, err = Discover(client, "test_service")
re.NoError(err)
re.Empty(endpoints)
}
Expand All @@ -55,17 +55,17 @@ func TestServiceRegistryEntry(t *testing.T) {
entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"}
s1, err := entry1.Serialize()
re.NoError(err)
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", s1, 1)
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", s1, 1)
err = sr1.Register()
re.NoError(err)
entry2 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:2"}
s2, err := entry2.Serialize()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", s2, 1)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", s2, 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "12345", "test_service")
endpoints, err := Discover(client, "test_service")
re.NoError(err)
re.Len(endpoints, 2)
returnedEntry1 := &ServiceRegistryEntry{}
Expand All @@ -78,7 +78,7 @@ func TestServiceRegistryEntry(t *testing.T) {
sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "12345", "test_service")
endpoints, err = Discover(client, "test_service")
re.NoError(err)
re.Empty(endpoints)
}
41 changes: 0 additions & 41 deletions pkg/mcs/discovery/key_path.go

This file was deleted.

5 changes: 3 additions & 2 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand All @@ -40,9 +41,9 @@ type ServiceRegister struct {
}

// NewServiceRegister creates a new ServiceRegister.
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
cctx, cancel := context.WithCancel(ctx)
serviceKey := RegistryPath(clusterID, serviceName, serviceAddr)
serviceKey := keypath.RegistryPath(serviceName, serviceAddr)
return &ServiceRegister{
ctx: cctx,
cancel: cancel,
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestRegister(t *testing.T) {
etcd, cfg := servers[0], servers[0].Config()

// Test register with http prefix.
sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
sr := NewServiceRegister(context.Background(), client, "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
err := sr.Register()
re.NoError(err)
re.Equal("/ms/12345/test_service/registry/http://127.0.0.1:1", sr.key)
Expand All @@ -51,14 +51,14 @@ func TestRegister(t *testing.T) {
re.Empty(resp.Kvs)

// Test the case that ctx is canceled.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr = NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
sr.cancel()
re.Empty(getKeyAfterLeaseExpired(re, client, sr.key))

// Test the case that keepalive is failed when the etcd is restarted.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr = NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
fname := testutil.InitTempFileLogger("info")
Expand Down
7 changes: 3 additions & 4 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

cfg *Config
clusterID uint64
cfg *Config

// for the primary election of resource manager
participant *member.Participant
Expand Down Expand Up @@ -113,7 +112,7 @@ func (s *Server) Run() (err error) {
return err
}

if s.clusterID, s.serviceID, s.serviceRegister, err = utils.Register(s, constant.ResourceManagerServiceName); err != nil {
if s.serviceID, s.serviceRegister, err = utils.Register(s, constant.ResourceManagerServiceName); err != nil {
return err
}

Expand Down Expand Up @@ -310,7 +309,7 @@ func (s *Server) startServer() (err error) {
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, keypath.ResourceManagerSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")
s.participant.InitInfo(p, keypath.ResourceManagerSvcRootPath(), constant.PrimaryKey, "primary election")

s.service = &Service{
ctx: s.Context(),
Expand Down
14 changes: 10 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
)
Expand All @@ -53,7 +54,6 @@ type Cluster struct {
coordinator *schedule.Coordinator
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
clusterID uint64
running atomic.Bool

// heartbeatRunner is used to process the subtree update task asynchronously.
Expand All @@ -78,7 +78,14 @@ const (
var syncRunner = ratelimit.NewSyncRunner()

// NewCluster creates a new cluster.
func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) {
func NewCluster(
parentCtx context.Context,
persistConfig *config.PersistConfig,
storage storage.Storage,
basicCluster *core.BasicCluster,
hbStreams *hbstream.HeartbeatStreams,
checkMembershipCh chan struct{},
) (*Cluster, error) {
ctx, cancel := context.WithCancel(parentCtx)
labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
if err != nil {
Expand All @@ -97,7 +104,6 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
labelStats: statistics.NewLabelStatistics(),
regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
storage: storage,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
Expand Down Expand Up @@ -228,7 +234,7 @@ func (c *Cluster) AllocID() (uint64, error) {
}
ctx, cancel := context.WithTimeout(c.ctx, requestTimeout)
defer cancel()
resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}})
if err != nil {
c.triggerMembershipCheck()
return 0, err
Expand Down
5 changes: 2 additions & 3 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,16 @@ type persistedConfig struct {
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
clusterID uint64,
persistConfig *PersistConfig,
storage storage.Storage,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
cw := &Watcher{
ctx: ctx,
cancel: cancel,
configPath: keypath.ConfigPath(clusterID),
configPath: keypath.ConfigPath(),
ttlConfigPrefix: sc.TTLConfigPrefix,
schedulerConfigPathPrefix: keypath.SchedulerConfigPathPrefix(clusterID),
schedulerConfigPathPrefix: keypath.SchedulerConfigPathPrefix(),
etcdClient: etcdClient,
PersistConfig: persistConfig,
storage: storage,
Expand Down
9 changes: 5 additions & 4 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.uber.org/zap"
Expand Down Expand Up @@ -188,7 +189,7 @@ func (s *Service) StoreHeartbeat(_ context.Context, request *schedulingpb.StoreH
if err := c.HandleStoreHeartbeat(request); err != nil {
log.Error("handle store heartbeat failed", zap.Error(err))
}
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: keypath.ClusterID()}}, nil
}

// SplitRegions split regions by the given split keys
Expand Down Expand Up @@ -345,7 +346,7 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler

func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader {
return &schedulingpb.ResponseHeader{
ClusterId: s.clusterID,
ClusterId: keypath.ClusterID(),
Error: err,
}
}
Expand All @@ -358,10 +359,10 @@ func (s *Service) notBootstrappedHeader() *schedulingpb.ResponseHeader {
}

func (s *Service) header() *schedulingpb.ResponseHeader {
if s.clusterID == 0 {
if keypath.ClusterID() == 0 {
return s.wrapErrorToHeader(schedulingpb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready")
}
return &schedulingpb.ResponseHeader{ClusterId: s.clusterID}
return &schedulingpb.ResponseHeader{ClusterId: keypath.ClusterID()}
}

func (s *Service) wrapErrorToHeader(
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewWatcher(
w := &Watcher{
ctx: ctx,
cancel: cancel,
storePathPrefix: keypath.StorePathPrefix(keypath.ClusterID()),
storePathPrefix: keypath.StorePathPrefix(),
etcdClient: etcdClient,
basicCluster: basicCluster,
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func (w *Watcher) initializeStoreWatcher() error {
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
storeID, err := keypath.ExtractStoreIDFromPath(keypath.ClusterID(), key)
storeID, err := keypath.ExtractStoreIDFromPath(key)
if err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ type Watcher struct {
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
clusterID uint64,
ruleStorage endpoint.RuleStorage,
checkerController *checker.Controller,
ruleManager *placement.RuleManager,
Expand All @@ -86,10 +85,10 @@ func NewWatcher(
rw := &Watcher{
ctx: ctx,
cancel: cancel,
rulesPathPrefix: keypath.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: keypath.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: keypath.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: keypath.RegionLabelPathPrefix(clusterID),
rulesPathPrefix: keypath.RulesPathPrefix(),
ruleCommonPathPrefix: keypath.RuleCommonPathPrefix(),
ruleGroupPathPrefix: keypath.RuleGroupPathPrefix(),
regionLabelPathPrefix: keypath.RegionLabelPathPrefix(),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
Expand Down
Loading

0 comments on commit b8f2c9e

Please sign in to comment.