-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcas.go
109 lines (86 loc) · 2.27 KB
/
cas.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package service
import (
"time"
"errors"
"github.com/go-yaml/yaml"
"github.com/gocql/gocql"
)
// Cassandra wrapper
type CassandraCfg struct {
Cluster []string `yaml:"cluster"`
Keyspace string `yaml:"keyspace"`
Username string `yaml:"username"`
Password string `yaml:"password"`
NumConns int `yaml:"numConns"`
}
// cas defines a Cassandra session and
// it's configuration.
type cas struct {
cfg CassandraCfg
Session *gocql.Session
}
// CasRows used for simple result sets not bound to a
// specific type
type CasRows []map[string]interface{}
func (c *cas) Query(query string) (CasRows, error) {
// call external libs for business logic here
q := c.Session.Query(query)
err := q.Exec()
if err != nil {
return nil, err
}
itr := q.Iter()
defer itr.Close()
ret := make(CasRows, 0)
for {
// New map each iteration
row := make(map[string]interface{})
if !itr.MapScan(row) {
break
}
ret = append(ret, row)
}
if err := itr.Close(); err != nil {
return nil, err
}
return ret, nil
}
// CassandraFromCfg takes a configuration map
func CassandraFromCfg(cfg Cfg) (*cas, error) {
cas := &cas{}
ccfg := CassandraCfg{}
if cfg["cassandra"] == nil {
return cas, errors.New("no cassandra configuration found")
}
// get yaml
d, err := yaml.Marshal(cfg["cassandra"])
if err != nil {
return cas, err
}
// use the yaml to hydrate the configuration
err = yaml.Unmarshal(d, &ccfg)
if err != nil {
return cas, err
}
return Cassandra(ccfg)
}
// Cassandra produces a cassandra object with session
func Cassandra(cfg CassandraCfg) (*cas, error) {
cluster := gocql.NewCluster(cfg.Cluster...)
cluster.DisableInitialHostLookup = true
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
cluster.Compressor = &gocql.SnappyCompressor{}
cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{NumRetries: 3}
cluster.Consistency = gocql.LocalQuorum
cluster.Timeout = 10 * time.Second
if cfg.Username != "" && cfg.Password != "" {
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: cfg.Username,
Password: cfg.Password,
}
}
cluster.Keyspace = cfg.Keyspace
cluster.NumConns = cfg.NumConns
session, err := cluster.CreateSession()
return &cas{cfg: cfg, Session: session}, err
}