From e97dcb6b1eb938a6d39f293de61402f0da63864c Mon Sep 17 00:00:00 2001 From: "maxim.manuylov" Date: Tue, 14 Mar 2017 14:55:52 +0300 Subject: [PATCH] make core not depend on etcd --- build.sh | 2 +- item/item.go | 9 +++--- jongleur/etcd/etcd.go | 38 ++++++++++++----------- jongleur/jongleur.go | 30 +++++-------------- jongleur/regular/regular.go | 60 +++++++++++++++++++------------------ utils/etcd/etcd.go | 28 +++++++++++++++++ utils/utils.go | 4 --- 7 files changed, 92 insertions(+), 79 deletions(-) create mode 100644 utils/etcd/etcd.go diff --git a/build.sh b/build.sh index 5b56021..62f8bd8 100644 --- a/build.sh +++ b/build.sh @@ -1,6 +1,6 @@ #!/bin/bash -VERSION="v0.8" +VERSION="v0.9" DOCKER_REGISTRY="docker.io" rm -rf bin diff --git a/item/item.go b/item/item.go index 9e9fb50..4417889 100644 --- a/item/item.go +++ b/item/item.go @@ -6,6 +6,7 @@ import ( etcd "github.com/coreos/etcd/client" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/maxmanuylov/jongleur/utils" + "github.com/maxmanuylov/jongleur/utils/etcd" "github.com/maxmanuylov/utils/application" "log" "net" @@ -105,7 +106,7 @@ func (config *Config) createRuntimeData(logger *log.Logger) (*runtimeData, error }, healthUrl: config.Health.Value, etcdClient: etcdClient, - etcdKey: fmt.Sprintf("%s/%s", utils.EtcdItemsKey(config.Type), config.Host), + etcdKey: fmt.Sprintf("%s/%s", etcd_utils.EtcdItemsKey(config.Type), config.Host), ttl: periodDuration * time.Duration(config.Tolerance) + semiPeriodDuration, }, nil } @@ -149,9 +150,9 @@ func isItemAlive(data *runtimeData) (bool, error) { func refreshItem(data *runtimeData) error { keys := etcd.NewKeysAPI(data.etcdClient) - context := context.Background() + backgroundContext := context.Background() - _, err := keys.Set(context, data.etcdKey, "42", &etcd.SetOptions{ + _, err := keys.Set(backgroundContext, data.etcdKey, "42", &etcd.SetOptions{ PrevExist: etcd.PrevNoExist, TTL: data.ttl, Refresh: false, @@ -161,7 +162,7 @@ func refreshItem(data *runtimeData) error { return err } - _, err = keys.Set(context, data.etcdKey, "", &etcd.SetOptions{ + _, err = keys.Set(backgroundContext, data.etcdKey, "", &etcd.SetOptions{ PrevExist: etcd.PrevExist, TTL: data.ttl, Refresh: true, diff --git a/jongleur/etcd/etcd.go b/jongleur/etcd/etcd.go index bb48e12..967fa24 100644 --- a/jongleur/etcd/etcd.go +++ b/jongleur/etcd/etcd.go @@ -8,6 +8,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/maxmanuylov/jongleur/jongleur" "github.com/maxmanuylov/jongleur/utils" + "github.com/maxmanuylov/jongleur/utils/etcd" "net/http" "net/url" ) @@ -39,28 +40,29 @@ func (config *Config) ToJongleurConfig() (*jongleur.Config, error) { return nil, err } - return &jongleur.Config{ - Verbose: config.Verbose, - Listen: config.Listen, - Period: config.Period, - Etcd: etcdCluster.ClientURLs(), - ItemsLoader: func (etcdClient _etcd.Client) ([]string, error) { - if err := etcdClient.Sync(context.Background()); err != nil { - return nil, err - } + itemsLoader, err := etcd_utils.NewEtcdItemsLoader(config.Period, etcdCluster.ClientURLs(), func (etcdClient _etcd.Client) ([]string, error) { + if err := etcdClient.Sync(context.Background()); err != nil { + return nil, err + } - newItems := make([]string, 0) + newItems := make([]string, 0) - for _, endpoint := range etcdClient.Endpoints() { - url, err := url.Parse(endpoint) - if err == nil { - newItems = append(newItems, url.Host) - } + for _, endpoint := range etcdClient.Endpoints() { + endpointUrl, err := url.Parse(endpoint) + if err == nil { + newItems = append(newItems, endpointUrl.Host) } + } - return newItems, nil - }, + return newItems, nil + }) + + return &jongleur.Config{ + Verbose: config.Verbose, + Listen: config.Listen, + Period: config.Period, + ItemsLoader: itemsLoader, RequestPatcher: jongleur.IDENTICAL_PATCHER, ResponsePatcher: jongleur.IDENTICAL_PATCHER, - }, nil + }, err } diff --git a/jongleur/jongleur.go b/jongleur/jongleur.go index da28125..baf942c 100644 --- a/jongleur/jongleur.go +++ b/jongleur/jongleur.go @@ -2,7 +2,6 @@ package jongleur import ( "errors" - etcd "github.com/coreos/etcd/client" "github.com/maxmanuylov/jongleur/utils" "github.com/maxmanuylov/jongleur/utils/cycle" "github.com/maxmanuylov/utils/application" @@ -15,7 +14,7 @@ import ( "time" ) -type ItemsLoader func (etcdClient etcd.Client) ([]string, error) +type ItemsLoader func () ([]string, error) type Patcher func(io.Writer) io.Writer @@ -27,7 +26,6 @@ type Config struct { Verbose bool Listen string // "[@]" Period int - Etcd []string ItemsLoader ItemsLoader RequestPatcher Patcher ResponsePatcher Patcher @@ -45,11 +43,11 @@ func Run(config *Config, logger *log.Logger) error { defer data.mcycle.Stop() - etcdTicker := time.NewTicker(data.period) - defer etcdTicker.Stop() + syncTicker := time.NewTicker(data.period) + defer syncTicker.Stop() go func() { - for range etcdTicker.C { + for range syncTicker.C { syncItems(data) } }() @@ -72,7 +70,6 @@ func Run(config *Config, logger *log.Logger) error { type runtimeData struct { period time.Duration logger *log.Logger - etcdClient etcd.Client loadItems ItemsLoader mcycle *cycle.MutableCycle hosts <-chan string @@ -86,24 +83,11 @@ func (config *Config) createRuntimeData(logger *log.Logger) (*runtimeData, error return nil, errors.New("Period must be positive") } - periodDuration := time.Duration(config.Period) * time.Second - semiPeriodDuration := periodDuration / 2 - - etcdClient, err := etcd.New(etcd.Config{ - Endpoints: config.Etcd, - Transport: etcd.DefaultTransport, - HeaderTimeoutPerRequest: semiPeriodDuration, - }) - if err != nil { - return nil, err - } - hosts := make(chan string) return &runtimeData{ - period: periodDuration, + period: time.Duration(config.Period) * time.Second, logger: logger, - etcdClient: etcdClient, loadItems: config.ItemsLoader, mcycle: cycle.NewMutableCycle(hosts, logger), hosts: hosts, @@ -114,9 +98,9 @@ func (config *Config) createRuntimeData(logger *log.Logger) (*runtimeData, error } func syncItems(data *runtimeData) { - newItems, err := data.loadItems(data.etcdClient) + newItems, err := data.loadItems() if err != nil { - data.logger.Printf("Failed to get items from etcd: %v\n", err) + data.logger.Printf("Failed to load items: %v\n", err) return } diff --git a/jongleur/regular/regular.go b/jongleur/regular/regular.go index 6cba8ba..4d61152 100644 --- a/jongleur/regular/regular.go +++ b/jongleur/regular/regular.go @@ -6,6 +6,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/maxmanuylov/jongleur/jongleur" "github.com/maxmanuylov/jongleur/utils" + "github.com/maxmanuylov/jongleur/utils/etcd" "strconv" "strings" ) @@ -28,49 +29,50 @@ func (config *Config) ToJongleurConfig() (*jongleur.Config, error) { return nil, errors.New("Invalid symbol in items: '/'") } - etcdKey := utils.EtcdItemsKey(config.Items) + etcdKey := etcd_utils.EtcdItemsKey(config.Items) remotePortStr := config.getRemotePortStr() - return &jongleur.Config{ - Verbose: config.Verbose, - Listen: config.Listen, - Period: config.Period, - Etcd: []string{config.Etcd}, - ItemsLoader: func (etcdClient etcd.Client) ([]string, error) { - keys := etcd.NewKeysAPI(etcdClient) + itemsLoader, err := etcd_utils.NewEtcdItemsLoader(config.Period, []string{config.Etcd}, func (etcdClient etcd.Client) ([]string, error) { + keys := etcd.NewKeysAPI(etcdClient) - response, err := keys.Get(context.Background(), etcdKey, nil) - if err != nil { - return nil, err - } + response, err := keys.Get(context.Background(), etcdKey, nil) + if err != nil { + return nil, err + } - if response.Node == nil { - return nil, nil - } + if response.Node == nil { + return nil, nil + } - newItems := make([]string, 0) + newItems := make([]string, 0) - if response.Node.Nodes != nil { - for _, node := range response.Node.Nodes { - if !node.Dir { - item := simpleKey(node.Key) + if response.Node.Nodes != nil { + for _, node := range response.Node.Nodes { + if !node.Dir { + item := simpleKey(node.Key) - if remotePortStr != "" { - item = strings.Replace(item, "*", remotePortStr, -1) - } + if remotePortStr != "" { + item = strings.Replace(item, "*", remotePortStr, -1) + } - if !strings.Contains(item, "*") { - newItems = append(newItems, item) - } + if !strings.Contains(item, "*") { + newItems = append(newItems, item) } } } + } + + return newItems, nil + }) - return newItems, nil - }, + return &jongleur.Config{ + Verbose: config.Verbose, + Listen: config.Listen, + Period: config.Period, + ItemsLoader: itemsLoader, RequestPatcher: jongleur.IDENTICAL_PATCHER, ResponsePatcher: jongleur.IDENTICAL_PATCHER, - }, nil + }, err } func (config *Config) getRemotePortStr() string { diff --git a/utils/etcd/etcd.go b/utils/etcd/etcd.go new file mode 100644 index 0000000..62d2511 --- /dev/null +++ b/utils/etcd/etcd.go @@ -0,0 +1,28 @@ +package etcd_utils + +import ( + "fmt" + etcd_client "github.com/coreos/etcd/client" + "github.com/maxmanuylov/jongleur/jongleur" + "time" +) + +func NewEtcdItemsLoader(period int, etcdEndpoints []string, loader func (etcd_client.Client) ([]string, error)) (jongleur.ItemsLoader, error) { + etcdClient, err := etcd_client.New(etcd_client.Config{ + Endpoints: etcdEndpoints, + Transport: etcd_client.DefaultTransport, + HeaderTimeoutPerRequest: time.Duration(period) * time.Second / 2, + }) + + if err != nil { + return nil, err + } + + return func() ([]string, error) { + return loader(etcdClient) + }, nil +} + +func EtcdItemsKey(itemType string) string { + return fmt.Sprintf("/jongleur/items/%s", itemType) +} \ No newline at end of file diff --git a/utils/utils.go b/utils/utils.go index 4afe389..d91ea26 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -48,7 +48,3 @@ type UsageError struct { func (err UsageError) Error() string { return err.message } - -func EtcdItemsKey(itemType string) string { - return fmt.Sprintf("/jongleur/items/%s", itemType) -} \ No newline at end of file