Skip to content

Commit

Permalink
make core not depend on etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
maxmanuylov committed Mar 14, 2017
1 parent 3364f3e commit e97dcb6
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 79 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

VERSION="v0.8"
VERSION="v0.9"
DOCKER_REGISTRY="docker.io"

rm -rf bin
Expand Down
9 changes: 5 additions & 4 deletions item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
etcd "github.com/coreos/etcd/client"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/maxmanuylov/jongleur/utils"
"github.com/maxmanuylov/jongleur/utils/etcd"
"github.com/maxmanuylov/utils/application"
"log"
"net"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (config *Config) createRuntimeData(logger *log.Logger) (*runtimeData, error
},
healthUrl: config.Health.Value,
etcdClient: etcdClient,
etcdKey: fmt.Sprintf("%s/%s", utils.EtcdItemsKey(config.Type), config.Host),
etcdKey: fmt.Sprintf("%s/%s", etcd_utils.EtcdItemsKey(config.Type), config.Host),
ttl: periodDuration * time.Duration(config.Tolerance) + semiPeriodDuration,
}, nil
}
Expand Down Expand Up @@ -149,9 +150,9 @@ func isItemAlive(data *runtimeData) (bool, error) {
func refreshItem(data *runtimeData) error {
keys := etcd.NewKeysAPI(data.etcdClient)

context := context.Background()
backgroundContext := context.Background()

_, err := keys.Set(context, data.etcdKey, "42", &etcd.SetOptions{
_, err := keys.Set(backgroundContext, data.etcdKey, "42", &etcd.SetOptions{
PrevExist: etcd.PrevNoExist,
TTL: data.ttl,
Refresh: false,
Expand All @@ -161,7 +162,7 @@ func refreshItem(data *runtimeData) error {
return err
}

_, err = keys.Set(context, data.etcdKey, "", &etcd.SetOptions{
_, err = keys.Set(backgroundContext, data.etcdKey, "", &etcd.SetOptions{
PrevExist: etcd.PrevExist,
TTL: data.ttl,
Refresh: true,
Expand Down
38 changes: 20 additions & 18 deletions jongleur/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/maxmanuylov/jongleur/jongleur"
"github.com/maxmanuylov/jongleur/utils"
"github.com/maxmanuylov/jongleur/utils/etcd"
"net/http"
"net/url"
)
Expand Down Expand Up @@ -39,28 +40,29 @@ func (config *Config) ToJongleurConfig() (*jongleur.Config, error) {
return nil, err
}

return &jongleur.Config{
Verbose: config.Verbose,
Listen: config.Listen,
Period: config.Period,
Etcd: etcdCluster.ClientURLs(),
ItemsLoader: func (etcdClient _etcd.Client) ([]string, error) {
if err := etcdClient.Sync(context.Background()); err != nil {
return nil, err
}
itemsLoader, err := etcd_utils.NewEtcdItemsLoader(config.Period, etcdCluster.ClientURLs(), func (etcdClient _etcd.Client) ([]string, error) {
if err := etcdClient.Sync(context.Background()); err != nil {
return nil, err
}

newItems := make([]string, 0)
newItems := make([]string, 0)

for _, endpoint := range etcdClient.Endpoints() {
url, err := url.Parse(endpoint)
if err == nil {
newItems = append(newItems, url.Host)
}
for _, endpoint := range etcdClient.Endpoints() {
endpointUrl, err := url.Parse(endpoint)
if err == nil {
newItems = append(newItems, endpointUrl.Host)
}
}

return newItems, nil
},
return newItems, nil
})

return &jongleur.Config{
Verbose: config.Verbose,
Listen: config.Listen,
Period: config.Period,
ItemsLoader: itemsLoader,
RequestPatcher: jongleur.IDENTICAL_PATCHER,
ResponsePatcher: jongleur.IDENTICAL_PATCHER,
}, nil
}, err
}
30 changes: 7 additions & 23 deletions jongleur/jongleur.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package jongleur

import (
"errors"
etcd "github.com/coreos/etcd/client"
"github.com/maxmanuylov/jongleur/utils"
"github.com/maxmanuylov/jongleur/utils/cycle"
"github.com/maxmanuylov/utils/application"
Expand All @@ -15,7 +14,7 @@ import (
"time"
)

type ItemsLoader func (etcdClient etcd.Client) ([]string, error)
type ItemsLoader func () ([]string, error)

type Patcher func(io.Writer) io.Writer

Expand All @@ -27,7 +26,6 @@ type Config struct {
Verbose bool
Listen string // "[<network>@]<addr>"
Period int
Etcd []string
ItemsLoader ItemsLoader
RequestPatcher Patcher
ResponsePatcher Patcher
Expand All @@ -45,11 +43,11 @@ func Run(config *Config, logger *log.Logger) error {

defer data.mcycle.Stop()

etcdTicker := time.NewTicker(data.period)
defer etcdTicker.Stop()
syncTicker := time.NewTicker(data.period)
defer syncTicker.Stop()

go func() {
for range etcdTicker.C {
for range syncTicker.C {
syncItems(data)
}
}()
Expand All @@ -72,7 +70,6 @@ func Run(config *Config, logger *log.Logger) error {
type runtimeData struct {
period time.Duration
logger *log.Logger
etcdClient etcd.Client
loadItems ItemsLoader
mcycle *cycle.MutableCycle
hosts <-chan string
Expand All @@ -86,24 +83,11 @@ func (config *Config) createRuntimeData(logger *log.Logger) (*runtimeData, error
return nil, errors.New("Period must be positive")
}

periodDuration := time.Duration(config.Period) * time.Second
semiPeriodDuration := periodDuration / 2

etcdClient, err := etcd.New(etcd.Config{
Endpoints: config.Etcd,
Transport: etcd.DefaultTransport,
HeaderTimeoutPerRequest: semiPeriodDuration,
})
if err != nil {
return nil, err
}

hosts := make(chan string)

return &runtimeData{
period: periodDuration,
period: time.Duration(config.Period) * time.Second,
logger: logger,
etcdClient: etcdClient,
loadItems: config.ItemsLoader,
mcycle: cycle.NewMutableCycle(hosts, logger),
hosts: hosts,
Expand All @@ -114,9 +98,9 @@ func (config *Config) createRuntimeData(logger *log.Logger) (*runtimeData, error
}

func syncItems(data *runtimeData) {
newItems, err := data.loadItems(data.etcdClient)
newItems, err := data.loadItems()
if err != nil {
data.logger.Printf("Failed to get items from etcd: %v\n", err)
data.logger.Printf("Failed to load items: %v\n", err)
return
}

Expand Down
60 changes: 31 additions & 29 deletions jongleur/regular/regular.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/maxmanuylov/jongleur/jongleur"
"github.com/maxmanuylov/jongleur/utils"
"github.com/maxmanuylov/jongleur/utils/etcd"
"strconv"
"strings"
)
Expand All @@ -28,49 +29,50 @@ func (config *Config) ToJongleurConfig() (*jongleur.Config, error) {
return nil, errors.New("Invalid symbol in items: '/'")
}

etcdKey := utils.EtcdItemsKey(config.Items)
etcdKey := etcd_utils.EtcdItemsKey(config.Items)
remotePortStr := config.getRemotePortStr()

return &jongleur.Config{
Verbose: config.Verbose,
Listen: config.Listen,
Period: config.Period,
Etcd: []string{config.Etcd},
ItemsLoader: func (etcdClient etcd.Client) ([]string, error) {
keys := etcd.NewKeysAPI(etcdClient)
itemsLoader, err := etcd_utils.NewEtcdItemsLoader(config.Period, []string{config.Etcd}, func (etcdClient etcd.Client) ([]string, error) {
keys := etcd.NewKeysAPI(etcdClient)

response, err := keys.Get(context.Background(), etcdKey, nil)
if err != nil {
return nil, err
}
response, err := keys.Get(context.Background(), etcdKey, nil)
if err != nil {
return nil, err
}

if response.Node == nil {
return nil, nil
}
if response.Node == nil {
return nil, nil
}

newItems := make([]string, 0)
newItems := make([]string, 0)

if response.Node.Nodes != nil {
for _, node := range response.Node.Nodes {
if !node.Dir {
item := simpleKey(node.Key)
if response.Node.Nodes != nil {
for _, node := range response.Node.Nodes {
if !node.Dir {
item := simpleKey(node.Key)

if remotePortStr != "" {
item = strings.Replace(item, "*", remotePortStr, -1)
}
if remotePortStr != "" {
item = strings.Replace(item, "*", remotePortStr, -1)
}

if !strings.Contains(item, "*") {
newItems = append(newItems, item)
}
if !strings.Contains(item, "*") {
newItems = append(newItems, item)
}
}
}
}

return newItems, nil
})

return newItems, nil
},
return &jongleur.Config{
Verbose: config.Verbose,
Listen: config.Listen,
Period: config.Period,
ItemsLoader: itemsLoader,
RequestPatcher: jongleur.IDENTICAL_PATCHER,
ResponsePatcher: jongleur.IDENTICAL_PATCHER,
}, nil
}, err
}

func (config *Config) getRemotePortStr() string {
Expand Down
28 changes: 28 additions & 0 deletions utils/etcd/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package etcd_utils

import (
"fmt"
etcd_client "github.com/coreos/etcd/client"
"github.com/maxmanuylov/jongleur/jongleur"
"time"
)

func NewEtcdItemsLoader(period int, etcdEndpoints []string, loader func (etcd_client.Client) ([]string, error)) (jongleur.ItemsLoader, error) {
etcdClient, err := etcd_client.New(etcd_client.Config{
Endpoints: etcdEndpoints,
Transport: etcd_client.DefaultTransport,
HeaderTimeoutPerRequest: time.Duration(period) * time.Second / 2,
})

if err != nil {
return nil, err
}

return func() ([]string, error) {
return loader(etcdClient)
}, nil
}

func EtcdItemsKey(itemType string) string {
return fmt.Sprintf("/jongleur/items/%s", itemType)
}
4 changes: 0 additions & 4 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,3 @@ type UsageError struct {
func (err UsageError) Error() string {
return err.message
}

func EtcdItemsKey(itemType string) string {
return fmt.Sprintf("/jongleur/items/%s", itemType)
}

0 comments on commit e97dcb6

Please sign in to comment.