Skip to content

Commit

Permalink
add feature about perfer connect to replica node
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 committed Dec 26, 2023
1 parent f7d72a2 commit 961376f
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 23 deletions.
2 changes: 1 addition & 1 deletion internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type scanClusterReader struct {
}

func NewScanClusterReader(opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica)

rd := &scanClusterReader{}
for _, address := range addresses {
Expand Down
15 changes: 8 additions & 7 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
)

type ScanReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
DBS []int `mapstructure:"dbs"`
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
DBS []int `mapstructure:"dbs"`
PerferReplica bool `mapstructure:"perfer_replica" default:"true"`
}

type dbKey struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type syncClusterReader struct {
}

func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica)
log.Debugf("get redis cluster nodes:")
for _, address := range addresses {
log.Debugf("%s", address)
Expand Down
15 changes: 8 additions & 7 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import (
)

type SyncReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
SyncRdb bool `mapstructure:"sync_rdb" default:"true"`
SyncAof bool `mapstructure:"sync_aof" default:"true"`
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
SyncRdb bool `mapstructure:"sync_rdb" default:"true"`
SyncAof bool `mapstructure:"sync_aof" default:"true"`
PerferReplica bool `mapstructure:"perfer_replica" default:"true"`
}

type State string
Expand Down
38 changes: 32 additions & 6 deletions internal/utils/cluster_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (
"RedisShake/internal/log"
)

func GetRedisClusterNodes(address string, username string, password string, Tls bool) (addresses []string, slots [][]int) {
func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) {
c := client.NewRedisClient(address, username, password, Tls)
reply := c.DoWithStringReply("cluster", "nodes")
reply = strings.TrimSpace(reply)
slotsCount := 0
log.Infof("address=%v, reply=%v", address, reply)
masters := make(map[string]string)
replicas := make(map[string][]string)
for _, line := range strings.Split(reply, "\n") {
line = strings.TrimSpace(line)
words := strings.Split(line, " ")
if !strings.Contains(words[2], "master") {
continue
}
isMaster := strings.Contains(words[2], "master")
if len(words) < 8 {
log.Panicf("invalid cluster nodes line: %s", line)
}
Expand All @@ -36,11 +36,22 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
ipv6Addr := strings.Join(tok[:len(tok)-1], ":")
address = fmt.Sprintf("[%s]:%s", ipv6Addr, port)
}
if len(words) < 9 {
if isMaster && len(words) < 9 {
log.Warnf("the current master node does not hold any slots. address=[%v]", address)
continue
}
addresses = append(addresses, address)

nodeId := words[0]
if isMaster {
masters[nodeId] = address
} else {
if strings.Contains(words[2], "fail") || strings.Contains(words[2], "noaddr") {
continue
}
masterId := words[3]
replicas[masterId] = append(replicas[masterId], address)
continue
}

// parse slots
slot := make([]int, 0)
Expand Down Expand Up @@ -81,5 +92,20 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
if slotsCount != 16384 {
log.Panicf("invalid cluster nodes slots. slots_count=%v, address=%v", slotsCount, address)
}

if perferReplica && len(replicas) > 0 {
for masterId, replicaAddr := range replicas {
if len(replicaAddr) > 0 {
addresses = append(addresses, replicaAddr[0])
} else {
addresses = append(addresses, masters[masterId])
}
}
} else {
for _, v := range masters {
addresses = append(addresses, v)
}
}

return addresses, slots
}
2 changes: 1 addition & 1 deletion internal/writer/redis_cluster_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *RedisClusterWriter) Close() {
}

func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) {
addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false)
r.addresses = addresses
for i, address := range addresses {
theOpts := *opts
Expand Down
2 changes: 2 additions & 0 deletions shake.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ password = "" # keep empty if no authentication is required
tls = false
sync_rdb = true # set to false if you don't want to sync rdb
sync_aof = true # set to false if you don't want to sync aof
prefer_replica = true # set to true if you want to sync from replica node

# [scan_reader]
# cluster = false # set to true if source is a redis cluster
Expand All @@ -18,6 +19,7 @@ sync_aof = true # set to false if you don't want to sync aof
# ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription
# tls = false
# dbs = [] # set you want to scan dbs such as [1,5,7], if you don't want to scan all
# prefer_replica = true # set to true if you want to sync from replica node

# [rdb_reader]
# filepath = "/tmp/dump.rdb"
Expand Down

0 comments on commit 961376f

Please sign in to comment.