Skip to content

Commit

Permalink
Merge pull request kirk-enterprise#2 from HeavyHorst/kelseyhightower-…
Browse files Browse the repository at this point in the history
…master

Kelseyhightower master
  • Loading branch information
HeavyHorst committed Mar 6, 2016
2 parents 195691f + 2806f52 commit 1265714
Show file tree
Hide file tree
Showing 20 changed files with 723 additions and 198 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ before_install:
- unzip vault_0.4.1_linux_amd64.zip
- sudo mv vault /bin/
- vault server -dev &
# Install zookeeper
- wget http://www.eu.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz
- tar xzf zookeeper-3.4.8.tar.gz
- mkdir /tmp/zookeeper && cp integration/zookeeper/zoo.cfg zookeeper-3.4.8/conf/zoo.cfg
- zookeeper-3.4.8/bin/zkServer.sh start
install:
- sudo pip install awscli
- go get golang.org/x/tools/cmd/cover
Expand All @@ -44,3 +49,4 @@ script:
- bash integration/rancher/test.sh
- bash integration/vault/test.sh
- bash integration/file/test.sh
- bash integration/zookeeper/test.sh
4 changes: 2 additions & 2 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
`confd` is a lightweight configuration management tool focused on:

* keeping local configuration files up-to-date using data stored in [etcd](https://github.com/coreos/etcd),
[consul](http://consul.io), [dynamodb](http://aws.amazon.com/dynamodb/), [redis](http://redis.io), [zookeeper](https://zookeeper.apache.org) or env vars and processing [template resources](docs/template-resources.md).
[consul](http://consul.io), [dynamodb](http://aws.amazon.com/dynamodb/), [redis](http://redis.io),
[vault](https://vaultproject.io), [zookeeper](https://zookeeper.apache.org) or env vars and processing [template resources](docs/template-resources.md).
* reloading applications to pick up new config file changes

## Community
Expand Down
2 changes: 1 addition & 1 deletion backends/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func New(config Config) (StoreClient, error) {
case "rancher":
return rancher.NewRancherClient(backendNodes)
case "redis":
return redis.NewRedisClient(backendNodes)
return redis.NewRedisClient(backendNodes, config.ClientKey)
case "env":
return env.NewEnvClient()
case "file":
Expand Down
62 changes: 35 additions & 27 deletions backends/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"net"
"net/http"
"strings"
"time"

"github.com/coreos/etcd/client"
Expand Down Expand Up @@ -115,41 +116,48 @@ func nodeWalk(node *client.Node, vars map[string]string) error {
}

func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) {
if prefix == "" {
prefix = "/"
}

// return something > 0 to trigger a key retrieval from the store
if waitIndex == 0 {
return 1, nil
}

// Setting AfterIndex to 0 (default) means that the Watcher
// should start watching for events starting at the current
// index, whatever that may be.
watcher := c.client.Watcher(prefix, &client.WatcherOptions{AfterIndex: uint64(0), Recursive: true})
ctx, cancel := context.WithCancel(context.Background())
cancelRoutine := make(chan bool)
defer close(cancelRoutine)

go func() {
select {
case <-stopChan:
cancel()
case <-cancelRoutine:
return
for {
// Setting AfterIndex to 0 (default) means that the Watcher
// should start watching for events starting at the current
// index, whatever that may be.
watcher := c.client.Watcher(prefix, &client.WatcherOptions{AfterIndex: uint64(0), Recursive: true})
ctx, cancel := context.WithCancel(context.Background())
cancelRoutine := make(chan bool)
defer close(cancelRoutine)

go func() {
select {
case <-stopChan:
cancel()
case <-cancelRoutine:
return
}
}()

resp, err := watcher.Next(ctx)
if err != nil {
switch e := err.(type) {
case *client.Error:
if e.Code == 401 {
return 0, nil
}
}
return waitIndex, err
}
}()

resp, err := watcher.Next(ctx)
if err != nil {
switch e := err.(type) {
case *client.Error:
if e.Code == 401 {
return 0, nil
// Only return if we have a key prefix we care about.
// This is not an exact match on the key so there is a chance
// we will still pickup on false positives. The net win here
// is reducing the scope of keys that can trigger updates.
for _, k := range keys {
if strings.HasPrefix(resp.Node.Key, k) {
return resp.Node.ModifiedIndex, err
}
}
return waitIndex, err
}
return resp.Node.ModifiedIndex, err
}
25 changes: 19 additions & 6 deletions backends/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
type Client struct {
client redis.Conn
machines []string
password string
}

// Iterate through `machines`, trying to connect to each in turn.
// Returns the first successful connection or the last error encountered.
// Assumes that `machines` is non-empty.
func tryConnect(machines []string) (redis.Conn, error) {
func tryConnect(machines []string, password string) (redis.Conn, error) {
var err error
for _, address := range machines {
var conn redis.Conn
Expand All @@ -27,7 +28,19 @@ func tryConnect(machines []string) (redis.Conn, error) {
network = "unix"
}
log.Debug(fmt.Sprintf("Trying to connect to redis node %s", address))
conn, err = redis.DialTimeout(network, address, time.Second, time.Second, time.Second)

dialops := []redis.DialOption{
redis.DialConnectTimeout(time.Second),
redis.DialReadTimeout(time.Second),
redis.DialWriteTimeout(time.Second),
}

if password != "" {
dialops = append(dialops, redis.DialPassword(password))
}

conn, err = redis.Dial(network, address, dialops...)

if err != nil {
continue
}
Expand All @@ -54,7 +67,7 @@ func (c *Client) connectedClient() (redis.Conn, error) {
// Existing client could have been deleted by previous block
if c.client == nil {
var err error
c.client, err = tryConnect(c.machines)
c.client, err = tryConnect(c.machines, c.password)
if err != nil {
return nil, err
}
Expand All @@ -65,10 +78,10 @@ func (c *Client) connectedClient() (redis.Conn, error) {

// NewRedisClient returns an *redis.Client with a connection to named machines.
// It returns an error if a connection to the cluster cannot be made.
func NewRedisClient(machines []string) (*Client, error) {
func NewRedisClient(machines []string, password string) (*Client, error) {
var err error
clientWrapper := &Client{ machines : machines, client: nil }
clientWrapper.client, err = tryConnect(machines)
clientWrapper := &Client{ machines : machines, password: password, client: nil }
clientWrapper.client, err = tryConnect(machines, password)
return clientWrapper, err
}

Expand Down
94 changes: 85 additions & 9 deletions backends/zookeeper/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package zookeeper

import (
"path/filepath"
"strings"
"time"

"github.com/kelseyhightower/confd/log"
zk "github.com/samuel/go-zookeeper/zk"
)

Expand Down Expand Up @@ -73,15 +75,89 @@ func (c *Client) GetValues(keys []string) (map[string]string, error) {
return vars, nil
}

// WatchPrefix is not yet implemented. There's a WIP.
// Since zookeeper doesn't handle recursive watch, we need to create a *lot* of watches.
// Implementation should take care of this.
// A good start is bamboo
// URL https://github.com/QubitProducts/bamboo/blob/master/qzk/qzk.go
// We also need to encourage users to set prefix and add a flag to enale support for "" prefix (aka "/")
//
type watchResponse struct {
waitIndex uint64
err error
}

func (c *Client) watch(key string, respChan chan watchResponse, cancelRoutine chan bool) {
_, _, keyEventCh, err := c.client.GetW(key)
if err != nil {
respChan <- watchResponse{0, err}
}
_, _, childEventCh, err := c.client.ChildrenW(key)
if err != nil {
respChan <- watchResponse{0, err}
}

for {
select {
case e := <-keyEventCh:
if e.Type == zk.EventNodeDataChanged {
respChan <- watchResponse{1, e.Err}
}
case e := <-childEventCh:
if e.Type == zk.EventNodeChildrenChanged {
respChan <- watchResponse{1, e.Err}
}
case <-cancelRoutine:
log.Debug("Stop watching: " + key)
// There is no way to stop GetW/ChildrenW so just quit
return
}
}
}

func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) {
<-stopChan
return 0, nil
// return something > 0 to trigger a key retrieval from the store
if waitIndex == 0 {
return 1, nil
}

// List the childrens first
entries, err := c.GetValues([]string{prefix})
if err != nil {
return 0, err
}

respChan := make(chan watchResponse)
cancelRoutine := make(chan bool)
defer close(cancelRoutine)

//watch all subfolders for changes
watchMap := make(map[string]string)
for k, _ := range entries {
for _, v := range keys {
if strings.HasPrefix(k, v) {
for dir := filepath.Dir(k); dir != "/"; dir = filepath.Dir(dir) {
if _, ok := watchMap[dir]; !ok {
watchMap[dir] = ""
log.Debug("Watching: " + dir)
go c.watch(dir, respChan, cancelRoutine)
}
}
break
}
}
}

//watch all keys in prefix for changes
for k, _ := range entries {
for _, v := range keys {
if strings.HasPrefix(k, v) {
log.Debug("Watching: " + k)
go c.watch(k, respChan, cancelRoutine)
break
}
}
}

for {
select {
case <-stopChan:
return waitIndex, nil
case r := <-respChan:
return r.waitIndex, r.err
}
}
}
7 changes: 3 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,9 @@ func initConfig() error {

if config.Watch {
unsupportedBackends := map[string]bool{
"zookeeper": true,
"redis": true,
"dynamodb": true,
"rancher": true,
"redis": true,
"dynamodb": true,
"rancher": true,
}

if unsupportedBackends[config.Backend] {
Expand Down
15 changes: 15 additions & 0 deletions docs/quick-start-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ confd supports the following backends:

* etcd
* consul
* vault
* environment variables
* redis
* zookeeper
Expand All @@ -33,6 +34,12 @@ curl -X PUT -d 'db.example.com' http://localhost:8500/v1/kv/myapp/database/url
curl -X PUT -d 'rob' http://localhost:8500/v1/kv/myapp/database/user
```

####vault
```
vault mount -path myapp generic
vault write myapp/database url=db.example.com user=rob
```

#### environment variables

```
Expand Down Expand Up @@ -135,6 +142,14 @@ confd -onetime -backend etcd -node http://127.0.0.1:4001
confd -onetime -backend consul -node 127.0.0.1:8500
```

#### vault
```
ROOT_TOKEN=$(vault read -field id auth/token/lookup-self)
confd -onetime -backend vault -node http://127.0.0.1:8200 \
-auth-type token -auth-token $ROOT_TOKEN
```

#### dynamodb

```
Expand Down
7 changes: 1 addition & 6 deletions integration/zookeeper/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,5 @@
export ZK_PATH="`dirname \"$0\"`"
sh -c "cd $ZK_PATH ; go run main.go"

# Run confd with --watch, expecting it to fail
# Run confd
confd --onetime --log-level debug --confdir ./integration/confdir --interval 5 --backend zookeeper --node 127.0.0.1:2181 -watch
if [ $? -eq 0 ]
then
exit 1
fi
confd --onetime --log-level debug --confdir ./integration/confdir --interval 5 --backend zookeeper --node 127.0.0.1:2181
3 changes: 3 additions & 0 deletions integration/zookeeper/zoo.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181
Loading

0 comments on commit 1265714

Please sign in to comment.