forked from robinjoseph08/redisqueue
-
Notifications
You must be signed in to change notification settings - Fork 1
/
redis.go
67 lines (58 loc) · 1.9 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package redisqueue
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
)
// redisVersionRE captures whatever follows "redis_version:" on the same line.
// redisPreflightChecks applies it to the output of the INFO server command and
// trims surrounding whitespace from the capture before parsing it.
var redisVersionRE = regexp.MustCompile(`redis_version:(.+)`)
// RedisOptions is an alias to redis.Options so that users can use this instead
// of having to import go-redis directly.
type RedisOptions = redis.Options
// newRedisClient builds a *redis.Client from options. A nil options pointer is
// treated as a zero-valued RedisOptions, leaving every setting at whatever
// go-redis chooses for an empty configuration.
func newRedisClient(options *RedisOptions) *redis.Client {
	opts := options
	if opts == nil {
		opts = new(RedisOptions)
	}
	return redis.NewClient(opts)
}
// redisPreflightChecks verifies that the Redis instance behind client can be
// reached and is new enough to support streams (major version 5 or later).
//
// It requests the "server" section of INFO, pulls the redis_version field out
// of the reply, and returns an error when the command fails, the version field
// cannot be found, its major component is not an integer, or the major version
// is below 5.
func redisPreflightChecks(client redis.UniversalClient) error {
	serverInfo, err := client.Info(context.TODO(), "server").Result()
	if err != nil {
		return err
	}

	// Only the first redis_version entry matters; index 1 is the capture group.
	found := redisVersionRE.FindStringSubmatch(serverInfo)
	if found == nil {
		return fmt.Errorf("could not extract redis version")
	}

	// The capture runs to the end of the line, so strip any trailing
	// whitespace before splitting off the major component.
	version := strings.TrimSpace(found[1])
	majorPart := strings.Split(version, ".")[0]
	major, err := strconv.Atoi(majorPart)
	if err != nil {
		return err
	}

	if major >= 5 {
		return nil
	}
	return fmt.Errorf("redis streams are not supported in version %q", version)
}
// incrementMessageID takes in a message ID (e.g. 1564886140363-0) and
// increments the index section (e.g. 1564886140363-1). This is the next valid
// ID value, and it can be used for paging through messages.
//
// An error is returned when id has no "-" separator or when the index section
// cannot be parsed as a base-10 int64.
func incrementMessageID(id string) (string, error) {
	parts := strings.Split(id, "-")
	// An ID without the separator (e.g. "" or "123") yields a single element;
	// report it as an error rather than panicking on parts[1].
	if len(parts) < 2 {
		return "", fmt.Errorf("error parsing message ID %q: missing index section", id)
	}

	parsed, err := strconv.ParseInt(parts[1], 10, 64)
	if err != nil {
		return "", errors.Wrapf(err, "error parsing message ID %q", id)
	}
	return fmt.Sprintf("%s-%d", parts[0], parsed+1), nil
}