Skip to content

Commit

Permalink
fix: both replicas in a OceanBase Cluster have the primary role assig…
Browse files Browse the repository at this point in the history
…ned (#7390)

Co-authored-by: xuriwuyun <[email protected]>
  • Loading branch information
kizuna-lek and xuriwuyun authored May 23, 2024
1 parent fa1066f commit 70f3065
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 25 deletions.
24 changes: 2 additions & 22 deletions pkg/lorry/engines/redis/get_replica_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ package redis

import (
"context"
"encoding/json"
"strings"
"time"

"github.com/apecloud/kubeblocks/pkg/common"
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines/models"
)
Expand Down Expand Up @@ -71,7 +69,7 @@ func (mgr *Manager) GetReplicaRole(ctx context.Context, _ *dcs.Cluster) (string,
return models.PRIMARY, nil
}

func (mgr *Manager) SubscribeRoleChange(ctx context.Context, cluster *dcs.Cluster) {
func (mgr *Manager) SubscribeRoleChange(ctx context.Context) {
pubSub := mgr.sentinelClient.Subscribe(ctx, "+switch-master")

// go-redis periodically sends ping messages to test connection health
Expand All @@ -83,26 +81,8 @@ func (mgr *Manager) SubscribeRoleChange(ctx context.Context, cluster *dcs.Cluste
masterAddr := strings.Split(msg.Payload, " ")
masterName := strings.Split(masterAddr[3], ".")[0]

// When network partition occurs, the new primary needs to send global role change information to the controller.
if masterName == mgr.CurrentMemberName {
roleSnapshot := &common.GlobalRoleSnapshot{}
oldMasterName := strings.Split(masterAddr[1], ".")[0]
roleSnapshot.PodRoleNamePairs = []common.PodRoleNamePair{
{
PodName: oldMasterName,
RoleName: models.SECONDARY,
PodUID: cluster.GetMemberWithName(oldMasterName).UID,
},
{
PodName: masterName,
RoleName: models.PRIMARY,
PodUID: cluster.GetMemberWithName(masterName).UID,
},
}
roleSnapshot.Version = time.Now().Format(time.RFC3339Nano)

b, _ := json.Marshal(roleSnapshot)
mgr.role = string(b)
mgr.role = models.PRIMARY
} else {
mgr.role = models.SECONDARY
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/lorry/engines/redis/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)
Expand Down Expand Up @@ -89,7 +88,7 @@ func NewManager(properties engines.Properties) (engines.DBManager, error) {

mgr.ctx, mgr.cancel = context.WithCancel(context.Background())

go mgr.SubscribeRoleChange(mgr.ctx, dcs.GetStore().GetClusterFromCache())
go mgr.SubscribeRoleChange(mgr.ctx)
return mgr, nil
}

Expand Down
58 changes: 57 additions & 1 deletion pkg/lorry/operations/replica/checkrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ import (
"strings"
"time"

"github.com/apecloud/kubeblocks/pkg/lorry/engines"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/spf13/viper"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/apecloud/kubeblocks/pkg/common"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines/register"
Expand Down Expand Up @@ -155,10 +158,27 @@ func (s *CheckRole) Do(ctx context.Context, _ *operations.OpsRequest) (*operatio
return resp, nil
}

resp.Data["role"] = role
if s.OriRole == role {
return nil, nil
}

// When network partition occurs, the new primary needs to send global role change information to the controller.
isLeader, err := manager.IsLeader(ctx, cluster)
if err != nil {
return nil, err
}
if isLeader {
// we need to get latest member info to build global role snapshot
members, err := s.dcsStore.GetMembers()
if err != nil {
return nil, err
}
cluster.Members = members
resp.Data["role"] = s.buildGlobalRoleSnapshot(cluster, manager, role)
} else {
resp.Data["role"] = role
}

resp.Data["event"] = util.OperationSuccess
s.OriRole = role
err = util.SentEventForProbe(ctx, resp.Data)
Expand Down Expand Up @@ -191,3 +211,39 @@ func (s *CheckRole) roleValidate(role string) (bool, string) {
}
return isValid, msg
}

// buildGlobalRoleSnapshot returns a JSON-encoded common.GlobalRoleSnapshot that
// describes the role change observed by the current member.
//
// The snapshot always contains an entry assigning role to the current member
// (as reported by mgr.GetCurrentMemberName). In addition, every other member in
// cluster.Members that is still recorded with the same role is probed with
// mgr.IsLeaderMember: if the probe returns an error, an entry with an empty
// role is appended for that member; if the probe succeeds, the member is left
// out of the snapshot so that its own lorry can report its role.
//
// Parameters:
//   - cluster: cluster view whose Members list is scanned for stale role holders.
//   - mgr:     engine manager used to identify the current member and probe others.
//   - role:    the role the current member has just been observed to hold.
//
// If the snapshot cannot be marshalled, the failure is logged and the plain
// role string is returned instead of an empty value.
func (s *CheckRole) buildGlobalRoleSnapshot(cluster *dcs.Cluster, mgr engines.DBManager, role string) string {
	currentMemberName := mgr.GetCurrentMemberName()

	current := common.PodRoleNamePair{
		PodName:  currentMemberName,
		RoleName: role,
	}
	// The lookup result used to be dereferenced unconditionally, which panics
	// when the current pod is absent from the member list.
	// NOTE(review): assumes GetMemberWithName returns a nil pointer when no
	// member matches — confirm against dcs.Cluster.
	if m := cluster.GetMemberWithName(currentMemberName); m != nil {
		current.PodUID = m.UID
	} else {
		s.logger.Info("current member not found in cluster members, pod UID left empty", "member", currentMemberName)
	}

	roleSnapshot := &common.GlobalRoleSnapshot{
		Version:          time.Now().Format(time.RFC3339Nano),
		PodRoleNamePairs: []common.PodRoleNamePair{current},
	}

	for _, member := range cluster.Members {
		// Per-iteration copy: the address taken below must not alias the range
		// variable on toolchains older than Go 1.22.
		member := member

		// Only other members still recorded with the same role are candidates
		// for being the old holder of that role.
		if member.Name == currentMemberName || member.Role != role {
			continue
		}

		// NOTE(review): no request context reaches this helper, so the probe
		// runs on context.Background() and is bounded only by the manager's
		// own timeouts.
		_, err := mgr.IsLeaderMember(context.Background(), cluster, &member)
		if err == nil {
			// The old leader is still reachable; skip it and let its own lorry
			// handle the role change.
			continue
		}

		s.logger.Info("old leader member access failed and reset it's role", "member", member.Name, "error", err.Error())
		// The member is already in hand, so use its UID directly instead of
		// looking it up by name again.
		roleSnapshot.PodRoleNamePairs = append(roleSnapshot.PodRoleNamePairs, common.PodRoleNamePair{
			PodName:  member.Name,
			RoleName: "",
			PodUID:   member.UID,
		})
	}

	b, err := json.Marshal(roleSnapshot)
	if err != nil {
		// Do not publish an empty role: fall back to the plain role string.
		s.logger.Info("marshal global role snapshot failed", "error", err.Error())
		return role
	}
	return string(b)
}

0 comments on commit 70f3065

Please sign in to comment.