Skip to content

Commit

Permalink
move place
Browse files Browse the repository at this point in the history
Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Sep 11, 2024
1 parent 0f85d4c commit 471dbaa
Show file tree
Hide file tree
Showing 28 changed files with 160 additions and 143 deletions.
35 changes: 6 additions & 29 deletions pkg/global/cluster_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,17 @@ package global
import (
"context"
"math/rand"
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// clusterID is the unique ID for the cluster.
var clusterID atomic.Value

// ClusterID returns the cluster ID.
func ClusterID() uint64 {
id := clusterID.Load()
if id == nil {
return 0
}
return id.(uint64)
}

// setClusterID sets the cluster ID.
func setClusterID(id uint64) {
clusterID.Store(id)
}

// ResetClusterID resets the cluster ID to 0. It's only used in tests.
func ResetClusterID() {
clusterID.Store(uint64(0))
}

// InitClusterID creates a cluster ID if it hasn't existed.
// This function assumes the cluster ID has already existed and always use a
// cheaper read to retrieve it; if it doesn't exist, invoke the more expensive
Expand All @@ -70,18 +47,18 @@ func InitClusterID(c *clientv3.Client) (uint64, error) {
if err != nil {
return 0, err
}
setClusterID(clusterID)
keypath.SetClusterID(clusterID)
log.Info("init cluster id", zap.Uint64("cluster-id", clusterID))
return clusterID, nil
}

// GetClusterIDFromEtcd gets the cluster ID from etcd if local cache is not set.
func GetClusterIDFromEtcd(c *clientv3.Client) (clusterID uint64, err error) {
if id := ClusterID(); id != 0 {
if id := keypath.ClusterID(); id != 0 {
return id, nil
}
// Get any cluster key to parse the cluster ID.
resp, err := etcdutil.EtcdKVGet(c, endpoint.ClusterIDPath)
resp, err := etcdutil.EtcdKVGet(c, keypath.ClusterIDPath)
if err != nil {
return 0, err
}
Expand All @@ -93,7 +70,7 @@ func GetClusterIDFromEtcd(c *clientv3.Client) (clusterID uint64, err error) {
if err != nil {
return 0, err
}
setClusterID(id)
keypath.SetClusterID(id)
return id, nil
}

Expand All @@ -109,7 +86,7 @@ func initOrGetClusterID(c *clientv3.Client) (uint64, error) {
ts = uint64(time.Now().Unix())
clusterID = (ts << 32) + uint64(r.Uint32())
value = typeutil.Uint64ToBytes(clusterID)
key = endpoint.ClusterIDPath
key = keypath.ClusterIDPath
)

// Multiple servers may try to init the cluster ID at the same time.
Expand Down
7 changes: 4 additions & 3 deletions pkg/global/cluster_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
)

func TestInitClusterID(t *testing.T) {
Expand All @@ -29,12 +30,12 @@ func TestInitClusterID(t *testing.T) {
id, err := GetClusterIDFromEtcd(client)
re.NoError(err)
re.Equal(uint64(0), id)
re.Equal(uint64(0), ClusterID())
re.Equal(uint64(0), keypath.ClusterID())

clusterID, err := InitClusterID(client)
re.NoError(err)
re.NotZero(clusterID)
re.Equal(clusterID, ClusterID())
re.Equal(clusterID, keypath.ClusterID())

clusterID1, err := InitClusterID(client)
re.NoError(err)
Expand All @@ -43,5 +44,5 @@ func TestInitClusterID(t *testing.T) {
id, err = GetClusterIDFromEtcd(client)
re.NoError(err)
re.Equal(clusterID, id)
re.Equal(clusterID, ClusterID())
re.Equal(clusterID, keypath.ClusterID())
}
5 changes: 2 additions & 3 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/balancer"
"github.com/tikv/pd/pkg/global"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/slice"
Expand Down Expand Up @@ -217,7 +216,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client) {
tsoServiceKey := discovery.TSOPath(global.ClusterID())
tsoServiceKey := discovery.TSOPath(keypath.ClusterID())

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down Expand Up @@ -1152,7 +1151,7 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
return "", ErrKeyspaceGroupNotExists(id)
}

rootPath := keypath.TSOSvcRootPath(global.ClusterID())
rootPath := keypath.TSOSvcRootPath(keypath.ClusterID())
primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id)
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
Expand Down
12 changes: 6 additions & 6 deletions pkg/mcs/metastorage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/global"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -147,7 +147,7 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb.
}
if len(events) > 0 {
if err := server.Send(&meta_storagepb.WatchResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: global.ClusterID(), Revision: res.Header.GetRevision()},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: res.Header.GetRevision()},
Events: events, CompactRevision: res.CompactRevision}); err != nil {
return err
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (s *Service) Get(ctx context.Context, req *meta_storagepb.GetRequest) (*met
return &meta_storagepb.GetResponse{Header: s.wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
}
resp := &meta_storagepb.GetResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: global.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},
Count: res.Count,
More: res.More,
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s *Service) Put(ctx context.Context, req *meta_storagepb.PutRequest) (*met
}

resp := &meta_storagepb.PutResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: global.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},
}
if res.PrevKv != nil {
resp.PrevKv = &meta_storagepb.KeyValue{Key: res.PrevKv.Key, Value: res.PrevKv.Value}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (s *Service) Delete(ctx context.Context, req *meta_storagepb.DeleteRequest)
}

resp := &meta_storagepb.DeleteResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: global.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},
}
resp.PrevKvs = make([]*meta_storagepb.KeyValue, len(res.PrevKvs))
for i, kv := range res.PrevKvs {
Expand All @@ -274,7 +274,7 @@ func (s *Service) wrapErrorAndRevision(revision int64, errorType meta_storagepb.

func (s *Service) errorHeader(revision int64, err *meta_storagepb.Error) *meta_storagepb.ResponseHeader {
return &meta_storagepb.ResponseHeader{
ClusterId: global.ClusterID(),
ClusterId: keypath.ClusterID(),
Revision: revision,
Error: err,
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/global"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
Expand Down Expand Up @@ -57,7 +56,7 @@ func NewWatcher(
w := &Watcher{
ctx: ctx,
cancel: cancel,
storePathPrefix: keypath.StorePathPrefix(global.ClusterID()),
storePathPrefix: keypath.StorePathPrefix(keypath.ClusterID()),
etcdClient: etcdClient,
basicCluster: basicCluster,
}
Expand Down Expand Up @@ -93,7 +92,7 @@ func (w *Watcher) initializeStoreWatcher() error {
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
storeID, err := keypath.ExtractStoreIDFromPath(global.ClusterID(), key)
storeID, err := keypath.ExtractStoreIDFromPath(keypath.ClusterID(), key)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/hbstream/heartbeat_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/global"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -90,7 +90,7 @@ func newHbStreams(ctx context.Context, typ string, storeInformer core.StoreSetIn
hs := &HeartbeatStreams{
hbStreamCtx: hbStreamCtx,
hbStreamCancel: hbStreamCancel,
clusterID: global.ClusterID(),
clusterID: keypath.ClusterID(),
streams: make(map[uint64]HeartbeatStream),
msgCh: make(chan core.RegionHeartbeatResponse, heartbeatChanCapacity),
streamCh: make(chan streamUpdate, 1),
Expand Down
4 changes: 2 additions & 2 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/global"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -68,7 +68,7 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C
return nil, err
}
err = syncStream.Send(&pdpb.SyncRegionRequest{
Header: &pdpb.RequestHeader{ClusterId: global.ClusterID()},
Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()},
Member: s.server.GetMemberInfo(),
StartIndex: s.history.getNextIndex(),
})
Expand Down
16 changes: 8 additions & 8 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/global"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -153,7 +153,7 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
s.history.record(region)
}
regions := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: global.ClusterID()},
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
Regions: requests,
StartIndex: startIndex,
RegionStats: stats,
Expand All @@ -163,7 +163,7 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
s.broadcast(regions)
case <-ticker.C:
alive := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: global.ClusterID()},
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
StartIndex: s.history.getNextIndex(),
}
s.broadcast(alive)
Expand Down Expand Up @@ -205,8 +205,8 @@ func (s *RegionSyncer) Sync(ctx context.Context, stream pdpb.PD_SyncRegionsServe
return errors.WithStack(err)
}
clusterID := request.GetHeader().GetClusterId()
if clusterID != global.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", global.ClusterID(), clusterID)
if clusterID != keypath.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", keypath.ClusterID(), clusterID)
}
log.Info("establish sync region stream",
zap.String("requested-server", request.GetMember().GetName()),
Expand All @@ -230,7 +230,7 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex))
// still send a response to follower to show the history region sync.
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: global.ClusterID()},
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
Regions: nil,
StartIndex: startIndex,
RegionStats: nil,
Expand Down Expand Up @@ -275,7 +275,7 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync
continue
}
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: global.ClusterID()},
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
Regions: metas,
StartIndex: uint64(lastIndex),
RegionStats: stats,
Expand Down Expand Up @@ -327,7 +327,7 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync
}
}
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: global.ClusterID()},
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
Regions: regions,
StartIndex: startIndex,
RegionStats: stats,
Expand Down
3 changes: 2 additions & 1 deletion pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/global"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -1045,7 +1046,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() {

var err error
defaultPriority := constant.DefaultKeyspaceGroupReplicaPriority
clusterID, err := etcdutil.InitOrGetClusterID(suite.etcdClient, "/pd/cluster_id")
clusterID, err := global.InitClusterID(suite.etcdClient)
re.NoError(err)
clusterIDStr := strconv.FormatUint(clusterID, 10)
rootPath := path.Join("/pd", clusterIDStr)
Expand Down
40 changes: 40 additions & 0 deletions pkg/utils/keypath/cluster_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package keypath

import "sync/atomic"

// clusterID is the unique ID for the cluster. We put it in this package is
// because it is always used with key path.
var clusterID atomic.Value

// ClusterID returns the cluster ID.
func ClusterID() uint64 {
id := clusterID.Load()
if id == nil {
return 0
}
return id.(uint64)
}

// SetClusterID sets the cluster ID.
func SetClusterID(id uint64) {
clusterID.Store(id)
}

// ResetClusterID resets the cluster ID to 0. It's only used in tests.
func ResetClusterID() {
clusterID.Store(uint64(0))
}
Loading

0 comments on commit 471dbaa

Please sign in to comment.