Skip to content

Commit

Permalink
Merge pull request #508 from ComputeIO/main
Browse files Browse the repository at this point in the history
feat(common): add redis cluster support
  • Loading branch information
yedf2 authored May 31, 2024
2 parents 46bcac5 + 8f7cf1a commit 741b6ab
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 22 deletions.
11 changes: 6 additions & 5 deletions client/dtmcli/barrier_redis.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package dtmcli

import (
"context"
"fmt"

"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/logger"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)

// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error {
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd redis.Cmdable, key string, amount int, barrierExpire int) error {
bid := bb.newBarrierID()
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, bb.BranchID, bb.Op, bid)
originOp := map[string]string{
dtmimp.OpCancel: dtmimp.OpTry,
dtmimp.OpCompensate: dtmimp.OpAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, bb.BranchID, originOp, bid)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount
v, err := rd.Eval(context.Background(), ` -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
local e1 = redis.call('GET', KEYS[2])
Expand Down Expand Up @@ -54,9 +55,9 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
}

// RedisQueryPrepared query prepared for redis
func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error {
func (bb *BranchBarrier) RedisQueryPrepared(rd redis.Cmdable, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1)
v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared
v, err := rd.Eval(context.Background(), ` -- RedisQueryPrepared
local v = redis.call('GET', KEYS[1])
if v == false then
redis.call('SET', KEYS[1], 'rollback', 'EX', ARGV[1])
Expand Down
4 changes: 2 additions & 2 deletions conf.sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
# Driver: 'boltdb' # default store engine

# Driver: 'redis'
# Host: 'localhost'
# Host: 'localhost' # host1:port1,host2:port2 for cluster connection
# User: ''
# Password: ''
# Port: 6379
# Port: 6379 # required but won't be used for cluster connection

# Driver: 'postgres'
# Host: 'localhost'
Expand Down
26 changes: 18 additions & 8 deletions dtmsvr/storage/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -19,7 +20,7 @@ import (
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/logger"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)

// TODO: optimize this, it's very strange to use pointer to dtmutil.Config
Expand Down Expand Up @@ -506,18 +507,27 @@ redis.call('SET', KEYS[1], ARGV[1])
}

var (
rdb *redis.Client
rdb redis.Cmdable
once sync.Once
)

func redisGet() *redis.Client {
func redisGet() redis.Cmdable {
once.Do(func() {
logger.Debugf("connecting to redis: %v", conf.Store)
rdb = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", conf.Store.Host, conf.Store.Port),
Username: conf.Store.User,
Password: conf.Store.Password,
})
endpoints := strings.Split(conf.Store.Host, ",")
if len(endpoints) == 1 {
rdb = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", conf.Store.Host, conf.Store.Port),
Username: conf.Store.User,
Password: conf.Store.Password,
})
} else {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: endpoints,
Username: conf.Store.User,
Password: conf.Store.Password,
})
}
})
return rdb
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/dtm-labs/dtmdriver-springcloud v1.2.3
github.com/dtm-labs/logger v0.0.2
github.com/gin-gonic/gin v1.9.1
github.com/go-redis/redis/v8 v8.11.5
github.com/redis/go-redis/v9 v9.5.1
github.com/go-resty/resty/v2 v2.7.0
github.com/go-sql-driver/mysql v1.7.0
github.com/lib/pq v1.10.7
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2110,6 +2110,8 @@ github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0V
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
Expand Down
2 changes: 1 addition & 1 deletion test/busi/base_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (*UserAccount) TableName() string {
func GetBalanceByUID(uid int, store string) int {
if store == "redis" {
rd := RedisGet()
accA, err := rd.Get(rd.Context(), GetRedisAccountKey(uid)).Result()
accA, err := rd.Get(context.Background(), GetRedisAccountKey(uid)).Result()
dtmimp.E2P(err)
return dtmimp.MustAtoi(accA)
} else if store == "mongo" {
Expand Down
10 changes: 5 additions & 5 deletions test/busi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/logger"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis/v8"
"github.com/go-resty/resty/v2"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand Down Expand Up @@ -131,12 +131,12 @@ func oldWrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc
}

var (
rdb *redis.Client
rdb redis.Cmdable
once sync.Once
)

// RedisGet 1
func RedisGet() *redis.Client {
func RedisGet() redis.Cmdable {
once.Do(func() {
logger.Debugf("connecting to client redis")
rdb = redis.NewClient(&redis.Options{
Expand Down Expand Up @@ -170,9 +170,9 @@ func MongoGet() *mongo.Client {
// SetRedisBothAccount 1
func SetRedisBothAccount(amountA int, ammountB int) {
rd := RedisGet()
_, err := rd.Set(rd.Context(), GetRedisAccountKey(TransOutUID), amountA, 0).Result()
_, err := rd.Set(context.Background(), GetRedisAccountKey(TransOutUID), amountA, 0).Result()
dtmimp.E2P(err)
_, err = rd.Set(rd.Context(), GetRedisAccountKey(TransInUID), ammountB, 0).Result()
_, err = rd.Set(context.Background(), GetRedisAccountKey(TransInUID), ammountB, 0).Result()
dtmimp.E2P(err)
}

Expand Down

0 comments on commit 741b6ab

Please sign in to comment.