From 4feeeb700e8a2b21fcda4a27f107448f08ed0e0f Mon Sep 17 00:00:00 2001 From: Jovan Kostovski Date: Tue, 21 Nov 2023 00:59:44 +0100 Subject: [PATCH] Legacy syslog bindings code cleanup With the introduction of mTLS for Syslog Drains we have changed the internal structure of the bindings. This PR cleans up the code of the legacy syslog bindings and removes the http calls to the old Syslog Binding Cache API endpoint. From now on, only the `/v2/binding` and `/v2/aggregate-bindings` API endpoints of the Syslog Binding Cache will be used. --- .../app/syslog_agent_mtls_test.go | 4 - src/cmd/syslog-agent/app/syslog_agent_test.go | 106 ++--- .../app/syslog_binding_cache.go | 5 +- .../app/syslog_binding_cache_test.go | 50 -- src/pkg/binding/poller.go | 129 +----- src/pkg/binding/poller_test.go | 429 +----------------- src/pkg/binding/store.go | 40 -- src/pkg/binding/store_test.go | 47 -- src/pkg/cache/client.go | 31 -- src/pkg/cache/client_test.go | 73 --- src/pkg/cache/handler.go | 25 - src/pkg/cache/handler_test.go | 68 +-- src/pkg/ingress/api/client.go | 7 +- src/pkg/ingress/bindings/aggregate_fetcher.go | 24 +- .../bindings/aggregate_fetcher_test.go | 52 +-- src/pkg/ingress/bindings/binding_fetcher.go | 41 +- .../ingress/bindings/binding_fetcher_test.go | 103 +---- 17 files changed, 76 insertions(+), 1158 deletions(-) diff --git a/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go b/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go index 501d3aed8..eecbd2023 100644 --- a/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go +++ b/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go @@ -298,12 +298,8 @@ func (f *fakeBindingCache) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/v2/bindings": results, err = json.Marshal(f.bindings) - case "/bindings": - results, err = json.Marshal(binding.ToLegacyBindings(f.bindings)) case "/v2/aggregate": results, err = json.Marshal(f.aggregate) - case "/aggregate": - results, err = json.Marshal(binding.ToLegacyBindings(f.aggregate)) default: w.WriteHeader(500) return diff --git a/src/cmd/syslog-agent/app/syslog_agent_test.go b/src/cmd/syslog-agent/app/syslog_agent_test.go index a2525a1a6..7f87ad117 100644 --- a/src/cmd/syslog-agent/app/syslog_agent_test.go +++ b/src/cmd/syslog-agent/app/syslog_agent_test.go @@ -3,7 +3,6 @@ package app_test import ( "context" "crypto/tls" - "encoding/json" "fmt" "io" "log" @@ -42,7 +41,7 @@ var _ = Describe("SyslogAgent", func() { aggregateDrain *syslogTCPServer appIDs []string cacheCerts *testhelper.TestCerts - bindingCache *fakeLegacyBindingCache + bindingCache *fakeBindingCache agentCerts *testhelper.TestCerts agentCfg app.Config @@ -63,26 +62,38 @@ var _ = Describe("SyslogAgent", func() { appIDs = []string{"app-1", "app-2"} cacheCerts = testhelper.GenerateCerts("binding-cache-ca") - bindingCache = &fakeLegacyBindingCache{ - bindings: []binding.LegacyBinding{ + bindingCache = &fakeBindingCache{ + bindings: []binding.Binding{ { - AppID: appIDs[0], - Hostname: fmt.Sprintf("%s.example.com", appIDs[0]), - Drains: []string{appHTTPSDrain.server.URL}, + Url: appHTTPSDrain.server.URL, + Credentials: []binding.Credentials{ + { + Apps: []binding.App{ + { + Hostname: fmt.Sprintf("%s.example.com", appIDs[0]), + AppID: appIDs[0], + }, + }, + }, + }, }, { - AppID: appIDs[1], - Hostname: fmt.Sprintf("%s.example.com", appIDs[1]), - Drains: []string{ - fmt.Sprintf("syslog-tls://localhost:%s", appTLSDrain.port()), + Url: fmt.Sprintf("syslog-tls://localhost:%s", appTLSDrain.port()), + Credentials: []binding.Credentials{ + { + Apps: []binding.App{ + { + Hostname: fmt.Sprintf("%s.example.com", appIDs[1]), + AppID: appIDs[1], + }, + }, + }, }, }, }, - aggregate: []binding.LegacyBinding{ + aggregate: []binding.Binding{ { - Drains: []string{ - fmt.Sprintf("syslog-tls://localhost:%s", aggregateDrain.port()), - }, + Url: fmt.Sprintf("syslog-tls://localhost:%s", aggregateDrain.port()), }, }, } @@ -216,7 +227,7 @@ var _ = Describe("SyslogAgent", func() { labels: map[string]string{ "direction": "egress", "drain_scope": "aggregate", - "drain_url": bindingCache.aggregate[0].Drains[0], + "drain_url": bindingCache.aggregate[0].Url, }, }, } @@ -284,8 +295,8 @@ var _ = Describe("SyslogAgent", func() { BeforeEach(func() { agentCfg.DefaultDrainMetadata = false - oldURL := bindingCache.aggregate[0].Drains[0] - bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=false", oldURL) + oldURL := bindingCache.aggregate[0].Url + bindingCache.aggregate[0].Url = fmt.Sprintf("%s?disable-metadata=false", oldURL) }) It("does not include tags in drains that do not set disable-metadata to false", func() { @@ -311,10 +322,10 @@ var _ = Describe("SyslogAgent", func() { BeforeEach(func() { agentCfg.DefaultDrainMetadata = true - oldURL := bindingCache.aggregate[0].Drains[0] - bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=true", oldURL) - oldURL = bindingCache.bindings[0].Drains[0] - bindingCache.bindings[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=true", oldURL) + oldURL := bindingCache.aggregate[0].Url + bindingCache.aggregate[0].Url = fmt.Sprintf("%s?disable-metadata=true", oldURL) + oldURL = bindingCache.bindings[0].Url + bindingCache.bindings[0].Url = fmt.Sprintf("%s?disable-metadata=true", oldURL) }) It("does not send tags to those drains", func() { @@ -353,8 +364,8 @@ var _ = Describe("SyslogAgent", func() { Context("when the ssl-strict-internal param is set in that drain URL", func() { BeforeEach(func() { - oldURL := bindingCache.aggregate[0].Drains[0] - bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?ssl-strict-internal=true", oldURL) + oldURL := bindingCache.aggregate[0].Url + bindingCache.aggregate[0].Url = fmt.Sprintf("%s?ssl-strict-internal=true", oldURL) }) It("uses internal TLS settings to communicate with that drain", func() { @@ -465,53 +476,6 @@ func emitLogs(ctx context.Context, appIDs []string, grpcPort int, testCerts *tes }() } -type fakeLegacyBindingCache struct { - *httptest.Server - bindings []binding.LegacyBinding - aggregate []binding.LegacyBinding -} - -func (f *fakeLegacyBindingCache) startTLS(testCerts *testhelper.TestCerts) { - tlsConfig, err := tlsconfig.Build( - tlsconfig.WithInternalServiceDefaults(), - tlsconfig.WithIdentityFromFile( - testCerts.Cert("binding-cache"), - testCerts.Key("binding-cache"), - ), - ).Server( - tlsconfig.WithClientAuthenticationFromFile(testCerts.CA()), - ) - - Expect(err).ToNot(HaveOccurred()) - - f.Server = httptest.NewUnstartedServer(f) - f.Server.TLS = tlsConfig - f.Server.StartTLS() -} - -func (f *fakeLegacyBindingCache) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var results []binding.LegacyBinding - if r.URL.Path == "/bindings" { - results = f.bindings - } else if r.URL.Path == "/aggregate" { - results = f.aggregate - } else { - w.WriteHeader(500) - return - } - - resultData, err := json.Marshal(results) - if err != nil { - w.WriteHeader(500) - return - } - - _, err = w.Write(resultData) - if err != nil { - w.WriteHeader(500) - } -} - type syslogHTTPSServer struct { receivedMessages chan *rfc5424.Message server *httptest.Server diff --git a/src/cmd/syslog-binding-cache/app/syslog_binding_cache.go b/src/cmd/syslog-binding-cache/app/syslog_binding_cache.go index 781f5acc3..ad2dba59d 100644 --- a/src/cmd/syslog-binding-cache/app/syslog_binding_cache.go +++ b/src/cmd/syslog-binding-cache/app/syslog_binding_cache.go @@ -53,16 +53,13 @@ func (sbc *SyslogBindingCache) Run() { go func() { sbc.log.Println("PPROF SERVER STOPPED " + sbc.pprofServer.ListenAndServe().Error()) }() } store := binding.NewStore(sbc.metrics) - legacyStore := binding.NewLegacyStore() aggregateStore := binding.NewAggregateStore(sbc.config.AggregateDrainsFile) - poller := binding.NewPoller(sbc.apiClient(), sbc.config.APIPollingInterval, store, legacyStore, sbc.metrics, sbc.log) + poller := binding.NewPoller(sbc.apiClient(), sbc.config.APIPollingInterval, store, sbc.metrics, sbc.log) go poller.Poll() router := chi.NewRouter() - router.Get("/bindings", cache.LegacyHandler(legacyStore)) router.Get("/v2/bindings", cache.Handler(store)) - router.Get("/aggregate", cache.LegacyAggregateHandler(aggregateStore)) router.Get("/v2/aggregate", cache.AggregateHandler(aggregateStore)) sbc.startServer(router) diff --git a/src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go b/src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go index 5845c2de7..3561ec5c6 100644 --- a/src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go +++ b/src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go @@ -244,56 +244,6 @@ var _ = Describe("App", func() { })) }) - It("has an HTTP endpoint that returns legacy aggregate drains", func() { - addr := fmt.Sprintf("https://localhost:%d/aggregate", sbcPort) - - var resp *http.Response - Eventually(func() error { - var err error - resp, err = client.Get(addr) - return err - }).Should(Succeed()) - defer resp.Body.Close() - - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - - body, err := io.ReadAll(resp.Body) - Expect(err).ToNot(HaveOccurred()) - - var result []binding.LegacyBinding - err = json.Unmarshal(body, &result) - Expect(err).ToNot(HaveOccurred()) - - Expect(result).To(HaveLen(1)) - Expect(result[0].Drains).To(ConsistOf("syslog://test-hostname:1000", "syslog://test2:1000")) - }) - - It("has an HTTP endpoint that returns legacy bindings", func() { - addr := fmt.Sprintf("https://localhost:%d/bindings", sbcPort) - - var resp *http.Response - Eventually(func() error { - var err error - resp, err = client.Get(addr) - return err - }).Should(Succeed()) - defer resp.Body.Close() - - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - - body, err := io.ReadAll(resp.Body) - Expect(err).ToNot(HaveOccurred()) - - var results []binding.LegacyBinding - err = json.Unmarshal(body, &results) - Expect(err).ToNot(HaveOccurred()) - - Expect(results).To(HaveLen(2)) - result1 := binding.LegacyBinding{AppID: "app-id-1", Drains: []string{"syslog://drain-a", "syslog://drain-b"}, Hostname: "org.space.app-name-1", V2Available: true} - result2 := binding.LegacyBinding{AppID: "app-id-2", Drains: []string{"syslog://drain-c", "syslog://drain-d"}, Hostname: "org.space.app-name-2", V2Available: true} - Expect(results).Should(ConsistOf(result1, result2)) - }) - Context("when debug configuration is enabled", func() { BeforeEach(func() { sbcCfg.MetricsServer.DebugMetrics = true diff --git a/src/pkg/binding/poller.go b/src/pkg/binding/poller.go index c2fcdba2a..30aa1e2cb 100644 --- a/src/pkg/binding/poller.go +++ b/src/pkg/binding/poller.go @@ -13,7 +13,6 @@ type Poller struct { apiClient client pollingInterval time.Duration store Setter - legacyStore LegacySetter logger *log.Logger bindingRefreshErrorCounter metrics.Counter @@ -22,7 +21,6 @@ type Poller struct { type client interface { Get(int) (*http.Response, error) - LegacyGet(int) (*http.Response, error) } type Credentials struct { @@ -49,27 +47,15 @@ type AggBinding struct { CA string `yaml:"ca"` } -type LegacyBinding struct { - AppID string `json:"app_id"` - Drains []string `json:"drains"` - Hostname string `json:"hostname"` - V2Available bool `json:"v2_available"` -} - type Setter interface { Set(bindings []Binding, bindingCount int) } -type LegacySetter interface { - Set([]LegacyBinding) -} - -func NewPoller(ac client, pi time.Duration, s Setter, legacyStore LegacySetter, m Metrics, logger *log.Logger) *Poller { +func NewPoller(ac client, pi time.Duration, s Setter, m Metrics, logger *log.Logger) *Poller { p := &Poller{ apiClient: ac, pollingInterval: pi, store: s, - legacyStore: legacyStore, logger: logger, bindingRefreshErrorCounter: m.NewCounter( "binding_refresh_error", @@ -103,16 +89,14 @@ func (p *Poller) poll() { return } if resp.StatusCode != http.StatusOK { - p.logger.Printf("unexpected response from internal bindings endpoint. status code: %d, falling back to legacy endpoint", resp.StatusCode) - p.pollLegacyFallback() + p.logger.Printf("unexpected response from internal bindings endpoint. status code: %d", resp.StatusCode) return } var aResp apiResponse err = json.NewDecoder(resp.Body).Decode(&aResp) if err != nil { - p.logger.Printf("failed to decode JSON: %s, falling back to legacy endpoint", err) - p.pollLegacyFallback() + p.logger.Printf("failed to decode JSON: %s", err) return } @@ -127,54 +111,6 @@ func (p *Poller) poll() { bindingCount := CalculateBindingCount(bindings) p.lastBindingCount.Set(float64(bindingCount)) p.store.Set(bindings, bindingCount) - p.legacyStore.Set(ToLegacyBindings(bindings)) -} - -func (p *Poller) pollLegacyFallback() { - nextID := 0 - var legacyBindings []LegacyBinding - - for { - resp, err := p.apiClient.LegacyGet(nextID) - if err != nil { - p.bindingRefreshErrorCounter.Add(1) - p.logger.Printf("failed to get page %d from internal legacy bindings endpoint: %s", nextID, err) - return - } - if resp.StatusCode != http.StatusOK { - p.logger.Printf("unexpected response from internal legacy bindings endpoint. status code: %d", resp.StatusCode) - return - } - - var aRespLegacy legacyApiResponse - err = json.NewDecoder(resp.Body).Decode(&aRespLegacy) - if err != nil { - p.logger.Printf("failed to decode legacy JSON: %s", err) - return - } - if aRespLegacy.V5Available { - p.logger.Printf("V4 endpoint is deprecated, skipping v4 result parsing.") - return - } - for k, v := range aRespLegacy.Results { - legacyBindings = append(legacyBindings, LegacyBinding{ - AppID: k, - Drains: v.Drains, - Hostname: v.Hostname, - V2Available: true, - }) - } - nextID = aRespLegacy.NextID - - if nextID == 0 { - break - } - } - bindings := ToBindings(legacyBindings) - bindingCount := CalculateBindingCount(bindings) - p.lastBindingCount.Set(float64(bindingCount)) - p.store.Set(bindings, bindingCount) - p.legacyStore.Set(legacyBindings) } func CalculateBindingCount(bindings []Binding) int { @@ -189,66 +125,7 @@ func CalculateBindingCount(bindings []Binding) int { return len(apps) } -type legacyMold struct { - drains []string - hostname string -} - -func ToBindings(legacyBindings []LegacyBinding) []Binding { - var bindings []Binding - var remodel = make(map[string]Credentials) - for _, lb := range legacyBindings { - for _, d := range lb.Drains { - if val, ok := remodel[d]; ok { - app := App{AppID: lb.AppID, Hostname: lb.Hostname} - remodel[d] = Credentials{Apps: append(val.Apps, app)} - } else { - app := App{AppID: lb.AppID, Hostname: lb.Hostname} - remodel[d] = Credentials{Apps: []App{app}} - } - } - } - - for url, credentials := range remodel { - binding := Binding{Url: url, Credentials: []Credentials{credentials}} - bindings = append(bindings, binding) - } - return bindings -} - -func ToLegacyBindings(bindings []Binding) []LegacyBinding { - var legacyBindings []LegacyBinding - remodel := make(map[string]legacyMold) - for _, b := range bindings { - drain := b.Url - for _, c := range b.Credentials { - for _, a := range c.Apps { - if val, ok := remodel[a.AppID]; ok { - remodel[a.AppID] = legacyMold{drains: append(val.drains, drain), hostname: a.Hostname} - } else { - remodel[a.AppID] = legacyMold{drains: []string{drain}, hostname: a.Hostname} - } - } - } - } - - for appID, app := range remodel { - legacyBinding := LegacyBinding{appID, app.drains, app.hostname, true} - legacyBindings = append(legacyBindings, legacyBinding) - } - return legacyBindings -} - type apiResponse struct { Results []Binding NextID int `json:"next_id"` } - -type legacyApiResponse struct { - Results map[string]struct { - Drains []string - Hostname string - } - NextID int `json:"next_id"` - V5Available bool `json:"v5_available"` -} diff --git a/src/pkg/binding/poller_test.go b/src/pkg/binding/poller_test.go index 9d6f6c754..bc6212e37 100644 --- a/src/pkg/binding/poller_test.go +++ b/src/pkg/binding/poller_test.go @@ -20,22 +20,20 @@ import ( var _ = Describe("Poller", func() { var ( - apiClient *fakeAPIClient - store *fakeStore - legacyStore *fakeLegacyStore - metrics *metricsHelpers.SpyMetricsRegistry - logger = log.New(GinkgoWriter, "", 0) + apiClient *fakeAPIClient + store *fakeStore + metrics *metricsHelpers.SpyMetricsRegistry + logger = log.New(GinkgoWriter, "", 0) ) BeforeEach(func() { apiClient = newFakeAPIClient() store = newFakeStore() - legacyStore = newFakeLegacyStore() metrics = metricsHelpers.NewMetricsRegistry() }) It("polls for bindings on an interval", func() { - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) + p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) go p.Poll() Eventually(apiClient.called).Should(BeNumerically(">=", 2)) @@ -69,7 +67,7 @@ var _ = Describe("Poller", func() { }, } - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) + p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) go p.Poll() var expectedBindings []binding.Binding @@ -98,29 +96,6 @@ var _ = Describe("Poller", func() { }, }, })) - - var expectedLegacyBindings []binding.LegacyBinding - Eventually(legacyStore.bindings).Should(Receive(&expectedLegacyBindings)) - Expect(expectedLegacyBindings).To(ConsistOf([]binding.LegacyBinding{ - { - AppID: "app-id-0", - Drains: []string{"drain-0", "drain-1"}, - Hostname: "app-hostname0", - V2Available: true, - }, - { - AppID: "app-id-1", - Drains: []string{"drain-0"}, - Hostname: "app-hostname1", - V2Available: true, - }, - { - AppID: "app-id-2", - Drains: []string{"drain-0"}, - Hostname: "app-hostname2", - V2Available: true, - }, - })) }) It("fetches the next page of bindings and stores the result", func() { @@ -167,7 +142,7 @@ var _ = Describe("Poller", func() { }, } - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) + p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) go p.Poll() var expectedBindings []binding.Binding @@ -209,40 +184,11 @@ var _ = Describe("Poller", func() { }, )) - var expectedLegacyBindings []binding.LegacyBinding - Eventually(legacyStore.bindings).Should(Receive(&expectedLegacyBindings)) - Expect(expectedLegacyBindings).To(ConsistOf([]binding.LegacyBinding{ - { - AppID: "app-id-0", - Drains: []string{"drain-0"}, - Hostname: "app-hostname0", - V2Available: true, - }, - { - AppID: "app-id-1", - Drains: []string{"drain-1"}, - Hostname: "app-hostname1", - V2Available: true, - }, - { - AppID: "app-id-2", - Drains: []string{"drain-2"}, - Hostname: "app-hostname2", - V2Available: true, - }, - { - AppID: "app-id-3", - Drains: []string{"drain-3"}, - Hostname: "app-hostname3", - V2Available: true, - }, - })) - Expect(apiClient.requestedIDs).To(ConsistOf(0, 2)) }) It("tracks the number of API errors", func() { - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) + p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) go p.Poll() apiClient.errors <- errors.New("expected") @@ -252,161 +198,13 @@ var _ = Describe("Poller", func() { }).Should(BeNumerically("==", 1)) }) - It("tracks the number of API errors if fallback fails", func() { - apiClient.statusCode <- 404 - apiClient.legacyErrors <- errors.New("expected") - - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) - go p.Poll() - - Eventually(func() float64 { - return metrics.GetMetric("binding_refresh_error", nil).Value() - }).Should(BeNumerically("==", 1)) - }) - - It("fetches results with legacy fallback functionality if CAPI v5 endpoint is unavailable", func() { - + It("does not update the stores if the response code is bad", func() { apiClient.statusCode <- 404 - apiClient.legacyBindings <- legacyResponse{ - Results: map[string]struct { - Drains []string - Hostname string - }{"app-id-0": {Drains: []string{"drain-0", "drain-1"}, Hostname: "app-hostname0"}}, - } - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) - go p.Poll() - - var expectedBindings []binding.Binding - Eventually(store.bindings).Should(Receive(&expectedBindings)) - Expect(expectedBindings).To(ConsistOf([]binding.Binding{ - { - Url: "drain-0", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", CA: "", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, - }, - }, - }, - { - Url: "drain-1", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", CA: "", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, - }, - }, - }, - })) - - var expectedLegacyBindings []binding.LegacyBinding - Eventually(legacyStore.bindings).Should(Receive(&expectedLegacyBindings)) - Expect(expectedLegacyBindings).To(ConsistOf([]binding.LegacyBinding{ - { - AppID: "app-id-0", - Drains: []string{"drain-0", "drain-1"}, - Hostname: "app-hostname0", - V2Available: true, - }, - })) - - }) - - It("fetches the next page with legacy fallback functionality and stores the result", func() { - - apiClient.statusCode <- 404 - apiClient.legacyBindings <- legacyResponse{ - NextID: 2, - Results: map[string]struct { - Drains []string - Hostname string - }{"app-id-0": {Drains: []string{"drain-0", "drain-1"}, Hostname: "app-hostname0"}}, - } - apiClient.legacyBindings <- legacyResponse{ - Results: map[string]struct { - Drains []string - Hostname string - }{"app-id-1": {Drains: []string{"drain-1", "drain-2"}, Hostname: "app-hostname1"}}, - } - - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) - go p.Poll() - - var expectedBindings []binding.Binding - Eventually(store.bindings).Should(Receive(&expectedBindings)) - Expect(expectedBindings).To(ConsistOf([]binding.Binding{ - { - Url: "drain-0", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", CA: "", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, - }, - }, - }, - { - Url: "drain-1", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", CA: "", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}, - {Hostname: "app-hostname1", AppID: "app-id-1"}}, - }, - }, - }, - { - Url: "drain-2", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", CA: "", Apps: []binding.App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, - }, - }, - }, - })) - - var expectedLegacyBindings []binding.LegacyBinding - Eventually(legacyStore.bindings).Should(Receive(&expectedLegacyBindings)) - Expect(expectedLegacyBindings).To(ConsistOf([]binding.LegacyBinding{ - { - AppID: "app-id-0", - Drains: []string{"drain-0", "drain-1"}, - Hostname: "app-hostname0", - V2Available: true, - }, - { - AppID: "app-id-1", - Drains: []string{"drain-1", "drain-2"}, - Hostname: "app-hostname1", - V2Available: true, - }, - })) - - }) - - It("skips parsing v4 results if CAPI v5 endpoint is unavailable but CAPI is already updated", func() { - - apiClient.statusCode <- 404 - apiClient.legacyBindings <- legacyResponse{ - Results: map[string]struct { - Drains []string - Hostname string - }{"app-id-0": {Drains: []string{"drain-0", "drain-1"}, Hostname: "app-hostname0"}}, - V5Available: true, - } - - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) + p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) go p.Poll() Eventually(store.bindings).Should(BeEmpty()) - Eventually(legacyStore.bindings).Should(BeEmpty()) - }) - - It("does not update the stores if both response codes are bad", func() { - apiClient.statusCode <- 404 - apiClient.legacyStatusCode <- 404 - - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, legacyStore, metrics, logger) - go p.Poll() - - Eventually(store.bindings).Should(BeEmpty()) - Eventually(legacyStore.bindings).Should(BeEmpty()) }) It("tracks the number of bindings returned from CAPI", func() { @@ -430,7 +228,7 @@ var _ = Describe("Poller", func() { }, }, } - binding.NewPoller(apiClient, time.Hour, store, legacyStore, metrics, logger) + binding.NewPoller(apiClient, time.Hour, store, metrics, logger) Expect(metrics.GetMetric("last_binding_refresh_count", nil).Value()). To(BeNumerically("==", 2)) @@ -481,160 +279,21 @@ var _ = Describe("Poller", func() { Expect(binding.CalculateBindingCount(multipleBindings)). To(BeNumerically("==", 2)) }) - - It("tracks the correct transformation to LegacyBindings", func() { - noBinding := []binding.Binding{} - singleBinding := []binding.Binding{ - { - Url: "drain-0", - Credentials: []binding.Credentials{ - { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, - }, - }, - }, - { - Url: "drain-1", - Credentials: []binding.Credentials{ - { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, - }, - }, - }, - } - multipleBindings := []binding.Binding{ - { - Url: "drain-0", - Credentials: []binding.Credentials{ - { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, - }, - }, - }, - { - Url: "drain-1", - Credentials: []binding.Credentials{ - { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, - }, - }, - }, - } - expectedSingleLegacyBindings := []binding.LegacyBinding{ - { - AppID: "app-id-0", - Drains: []string{"drain-0", "drain-1"}, - Hostname: "app-hostname0", - V2Available: true, - }, - } - expectedMultiLegacyBindings := []binding.LegacyBinding{ - { - AppID: "app-id-0", - Drains: []string{"drain-0"}, - Hostname: "app-hostname0", - V2Available: true, - }, - { - AppID: "app-id-1", - Drains: []string{"drain-1"}, - Hostname: "app-hostname1", - V2Available: true, - }, - } - - Expect(binding.ToLegacyBindings(noBinding)).To(ConsistOf([]binding.LegacyBinding{})) - Expect(binding.ToLegacyBindings(singleBinding)).To(ConsistOf(expectedSingleLegacyBindings)) - Expect(binding.ToLegacyBindings(multipleBindings)).To(ConsistOf(expectedMultiLegacyBindings)) - - }) - - It("tracks the correct transformation from LegacyBindings to Bindings", func() { - noBinding := []binding.LegacyBinding{} - singleLegacyBinding := []binding.LegacyBinding{ - { - AppID: "app-id-0", - Drains: []string{"drain-0"}, - Hostname: "app-hostname0", - }, - } - multipleLegacyBindings := []binding.LegacyBinding{ - { - AppID: "app-id-0", - Drains: []string{"drain-0", "drain-1"}, - Hostname: "app-hostname0", - }, - { - AppID: "app-id-1", - Drains: []string{"drain-1", "drain-2"}, - Hostname: "app-hostname1", - }, - } - expectedSingleBinding := []binding.Binding{ - { - Url: "drain-0", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, - }, - }, - }, - } - expectedMultiBindings := []binding.Binding{ - { - Url: "drain-0", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", CA: "", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, - }, - }, - }, - { - Url: "drain-1", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}, - {Hostname: "app-hostname1", AppID: "app-id-1"}}, - }, - }, - }, - { - Url: "drain-2", - Credentials: []binding.Credentials{ - { - Cert: "", Key: "", Apps: []binding.App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, - }, - }, - }, - } - - Expect(binding.ToBindings(noBinding)).To(ConsistOf([]binding.LegacyBinding{})) - Expect(binding.ToBindings(singleLegacyBinding)).To(ConsistOf(expectedSingleBinding)) - Expect(binding.ToBindings(multipleLegacyBindings)).To(ConsistOf(expectedMultiBindings)) - - }) - }) type fakeAPIClient struct { - numRequests int64 - bindings chan response - errors chan error - legacyErrors chan error - legacyBindings chan legacyResponse - statusCode chan int - legacyStatusCode chan int - requestedIDs []int + numRequests int64 + bindings chan response + errors chan error + statusCode chan int + requestedIDs []int } func newFakeAPIClient() *fakeAPIClient { return &fakeAPIClient{ - bindings: make(chan response, 100), - legacyBindings: make(chan legacyResponse, 100), - errors: make(chan error, 100), - legacyErrors: make(chan error, 100), - legacyStatusCode: make(chan int, 100), - statusCode: make(chan int, 100), + bindings: make(chan response, 100), + errors: make(chan error, 100), + statusCode: make(chan int, 100), } } @@ -665,33 +324,6 @@ func (c *fakeAPIClient) Get(nextID int) (*http.Response, error) { return resp, err } -func (c *fakeAPIClient) LegacyGet(nextID int) (*http.Response, error) { - atomic.AddInt64(&c.numRequests, 1) - - var legacyBinding legacyResponse - var statusCode = 200 - select { - case err := <-c.legacyErrors: - return nil, err - case legacyBinding = <-c.legacyBindings: - c.requestedIDs = append(c.requestedIDs, nextID) - case injectedStatusCode := <-c.legacyStatusCode: - statusCode = injectedStatusCode - default: - } - - var body []byte - b, err := json.Marshal(&legacyBinding) - Expect(err).ToNot(HaveOccurred()) - body = b - resp := &http.Response{ - Body: io.NopCloser(bytes.NewReader(body)), - } - resp.StatusCode = statusCode - - return resp, nil -} - func (c *fakeAPIClient) called() int64 { return atomic.LoadInt64(&c.numRequests) } @@ -710,30 +342,7 @@ func (c *fakeStore) Set(b []binding.Binding, bindingCount int) { c.bindings <- b } -type fakeLegacyStore struct { - bindings chan []binding.LegacyBinding -} - -func newFakeLegacyStore() *fakeLegacyStore { - return &fakeLegacyStore{ - bindings: make(chan []binding.LegacyBinding, 100), - } -} - -func (c *fakeLegacyStore) Set(b []binding.LegacyBinding) { - c.bindings <- b -} - type response struct { Results []binding.Binding NextID int `json:"next_id"` } - -type legacyResponse struct { - Results map[string]struct { - Drains []string - Hostname string - } - NextID int `json:"next_id"` - V5Available bool `json:"v5_available"` -} diff --git a/src/pkg/binding/store.go b/src/pkg/binding/store.go index 68a3a28cd..367c31c25 100644 --- a/src/pkg/binding/store.go +++ b/src/pkg/binding/store.go @@ -43,32 +43,6 @@ func (s *Store) Set(bindings []Binding, bindingCount int) { s.mu.Unlock() } -type LegacyStore struct { - mu sync.Mutex - legacyBindings []LegacyBinding -} - -func NewLegacyStore() *LegacyStore { - return &LegacyStore{ - legacyBindings: make([]LegacyBinding, 0), - } -} - -func (s *LegacyStore) Get() []LegacyBinding { - s.mu.Lock() - defer s.mu.Unlock() - return s.legacyBindings -} - -func (s *LegacyStore) Set(bindings []LegacyBinding) { - if bindings == nil { - bindings = []LegacyBinding{} - } - s.mu.Lock() - s.legacyBindings = bindings - s.mu.Unlock() -} - type AggregateStore struct { Drains []Binding } @@ -107,17 +81,3 @@ func NewAggregateStore(drainFileName string) *AggregateStore { func (store *AggregateStore) Get() []Binding { return store.Drains } - -func (store *AggregateStore) LegacyGet() []LegacyBinding { - var drains []string - for _, binding := range store.Drains { - drains = append(drains, binding.Url) - } - return []LegacyBinding{ - { - AppID: "", - Drains: drains, - V2Available: true, - }, - } -} diff --git a/src/pkg/binding/store_test.go b/src/pkg/binding/store_test.go index 876efea9c..efd6867c5 100644 --- a/src/pkg/binding/store_test.go +++ b/src/pkg/binding/store_test.go @@ -27,31 +27,11 @@ var _ = Describe("Store", func() { Expect(store.Get()).To(Equal(bindings)) }) - It("should store and retrieve legacy bindings", func() { - legacyStore := binding.NewLegacyStore() - bindings := []binding.LegacyBinding{ - { - AppID: "app-1", - Drains: []string{"drain-1"}, - Hostname: "host-1", - }, - } - - legacyStore.Set(bindings) - Expect(legacyStore.Get()).To(Equal(bindings)) - - }) - It("should not return nil bindings", func() { store := binding.NewStore(metricsHelpers.NewMetricsRegistry()) Expect(store.Get()).ToNot(BeNil()) }) - It("should not return nil legacy bindings", func() { - store := binding.NewLegacyStore() - Expect(store.Get()).ToNot(BeNil()) - }) - It("should not allow setting of bindings to nil", func() { store := binding.NewStore(metricsHelpers.NewMetricsRegistry()) @@ -74,25 +54,6 @@ var _ = Describe("Store", func() { Expect(storedBindings).To(BeEmpty()) }) - It("should not allow setting of legacy bindings to nil", func() { - store := binding.NewLegacyStore() - - bindings := []binding.LegacyBinding{ - { - AppID: "app-1", - Drains: []string{"drain-1"}, - Hostname: "host-1", - }, - } - - store.Set(bindings) - store.Set(nil) - - storedBindings := store.Get() - Expect(storedBindings).ToNot(BeNil()) - Expect(storedBindings).To(BeEmpty()) - }) - // The race detector will cause a failure here // if the store is not thread safe It("should be thread safe", func() { @@ -165,14 +126,6 @@ var _ = Describe("Store", func() { }, }, )) - Expect(aggStore.LegacyGet()).To(ConsistOf( - binding.LegacyBinding{ - AppID: "", - Drains: []string{"syslog://test-hostname:1000", "syslog://test2:1000"}, - Hostname: "", - V2Available: true, - }, - )) }) }) diff --git a/src/pkg/cache/client.go b/src/pkg/cache/client.go index 5fe969978..87a8fcc77 100644 --- a/src/pkg/cache/client.go +++ b/src/pkg/cache/client.go @@ -29,18 +29,10 @@ func (c *CacheClient) Get() ([]binding.Binding, error) { return c.get("v2/bindings") } -func (c *CacheClient) LegacyGet() ([]binding.LegacyBinding, error) { - return c.legacyGet("bindings") -} - func (c *CacheClient) GetAggregate() ([]binding.Binding, error) { return c.get("v2/aggregate") } -func (c *CacheClient) GetLegacyAggregate() ([]binding.LegacyBinding, error) { - return c.legacyGet("aggregate") -} - func (c *CacheClient) get(path string) ([]binding.Binding, error) { var bindings []binding.Binding resp, err := c.h.Get(fmt.Sprintf("%s/"+path, c.cacheAddr)) @@ -63,26 +55,3 @@ func (c *CacheClient) get(path string) ([]binding.Binding, error) { return bindings, nil } - -func (c *CacheClient) legacyGet(path string) ([]binding.LegacyBinding, error) { - var bindings []binding.LegacyBinding - resp, err := c.h.Get(fmt.Sprintf("%s/"+path, c.cacheAddr)) - if err != nil { - return nil, err - } - defer func() { - _, _ = io.Copy(io.Discard, resp.Body) - resp.Body.Close() - }() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected http response from binding cache: %d", resp.StatusCode) - } - - err = json.NewDecoder(resp.Body).Decode(&bindings) - if err != nil { - return nil, err - } - - return bindings, nil -} diff --git a/src/pkg/cache/client_test.go b/src/pkg/cache/client_test.go index d0a64b102..94c87ff73 100644 --- a/src/pkg/cache/client_test.go +++ b/src/pkg/cache/client_test.go @@ -51,27 +51,6 @@ var _ = Describe("Client", func() { Expect(spyHTTPClient.requestURL).To(Equal("https://cache.address.com/v2/bindings")) }) - It("returns legacy bindings from the cache", func() { - bindings := []binding.LegacyBinding{ - { - AppID: "app-id-1", - Drains: []string{"drain-1"}, - Hostname: "host-1", - V2Available: true, - }, - } - - j, err := json.Marshal(bindings) - Expect(err).ToNot(HaveOccurred()) - spyHTTPClient.response = &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader(j)), - } - - Expect(client.LegacyGet()).To(Equal(bindings)) - Expect(spyHTTPClient.requestURL).To(Equal("https://cache.address.com/bindings")) - }) - It("returns aggregate drains from the cache", func() { bindings := []binding.Binding{ { @@ -97,26 +76,6 @@ var _ = Describe("Client", func() { Expect(spyHTTPClient.requestURL).To(Equal("https://cache.address.com/v2/aggregate")) }) - It("returns legacy aggregate drains from the cache", func() { - bindings := []binding.LegacyBinding{ - { - AppID: "app-id-1", - Drains: []string{"drain-1"}, - Hostname: "host-1", - }, - } - - j, err := json.Marshal(bindings) - Expect(err).ToNot(HaveOccurred()) - spyHTTPClient.response = &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader(j)), - } - - Expect(client.GetLegacyAggregate()).To(Equal(bindings)) - Expect(spyHTTPClient.requestURL).To(Equal("https://cache.address.com/aggregate")) - }) - It("returns empty bindings if an HTTP error occurs", func() { spyHTTPClient.err = errors.New("http error") @@ -129,18 +88,6 @@ var _ = Describe("Client", func() { Expect(err).To(MatchError("http error")) }) - It("returns empty legacy bindings if an HTTP error occurs", func() { - spyHTTPClient.err = errors.New("http error") - - _, err := client.Get() - - Expect(err).To(MatchError("http error")) - - _, err = client.GetLegacyAggregate() - - Expect(err).To(MatchError("http error")) - }) - It("returns empty bindings if cache returns a non-OK status code", func() { spyHTTPClient.response = &http.Response{ StatusCode: http.StatusInternalServerError, @@ -155,21 +102,6 @@ var _ = Describe("Client", func() { Expect(err).To(MatchError("unexpected http response from binding cache: 500")) }) - - It("returns empty legacy bindings if cache returns a non-OK status code", func() { - spyHTTPClient.response = &http.Response{ - StatusCode: http.StatusInternalServerError, - Body: io.NopCloser(strings.NewReader("")), - } - - _, err := client.Get() - - Expect(err).To(MatchError("unexpected http response from binding cache: 500")) - - _, err = client.GetLegacyAggregate() - - Expect(err).To(MatchError("unexpected http response from binding cache: 500")) - }) }) type spyHTTPClient struct { @@ -187,11 +119,6 @@ func (s *spyHTTPClient) Get(url string) (*http.Response, error) { return s.response, s.err } -func (s *spyHTTPClient) LegacyGet(url string) (*http.Response, error) { - s.requestURL = url - return s.response, s.err -} - func (s *spyHTTPClient) GetAggregate(url string) (*http.Response, error) { s.requestURL = url return s.response, s.err diff --git a/src/pkg/cache/handler.go b/src/pkg/cache/handler.go index 9f4a7da5a..f3631b52f 100644 --- a/src/pkg/cache/handler.go +++ b/src/pkg/cache/handler.go @@ -12,13 +12,8 @@ type Getter interface { Get() []binding.Binding } -type LegacyGetter interface { - Get() []binding.LegacyBinding -} - type AggregateGetter interface { Get() []binding.Binding - LegacyGet() []binding.LegacyBinding } func Handler(store Getter) http.HandlerFunc { @@ -31,16 +26,6 @@ func Handler(store Getter) http.HandlerFunc { } } -func LegacyHandler(store LegacyGetter) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - err := json.NewEncoder(w).Encode(store.Get()) - if err != nil { - log.Printf("failed to encode response body: %s", err) - return - } - } -} - func AggregateHandler(store AggregateGetter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { err := json.NewEncoder(w).Encode(store.Get()) @@ -50,13 +35,3 @@ func AggregateHandler(store AggregateGetter) http.HandlerFunc { } } } - -func LegacyAggregateHandler(store AggregateGetter) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - err := json.NewEncoder(w).Encode(store.LegacyGet()) - if err != nil { - log.Printf("failed to encode response body: %s", err) - return - } - } -} diff --git a/src/pkg/cache/handler_test.go b/src/pkg/cache/handler_test.go index b37111a41..28bcbfaed 100644 --- a/src/pkg/cache/handler_test.go +++ b/src/pkg/cache/handler_test.go @@ -37,27 +37,6 @@ var _ = Describe("Handler", func() { Expect(rw.Body.String()).To(MatchJSON(j)) }) - It("should write results from the legacy store", func() { - bindings := []binding.LegacyBinding{ - { - AppID: "app-1", - Drains: []string{"drain-1"}, - Hostname: "host-1", - }, - } - - handler := cache.LegacyHandler(newStubLegacyStore(bindings)) - rw := httptest.NewRecorder() - req, err := http.NewRequest(http.MethodGet, "/bindings", nil) - Expect(err).ToNot(HaveOccurred()) - handler.ServeHTTP(rw, req) - - j, err := json.Marshal(&bindings) - Expect(err).ToNot(HaveOccurred()) - - Expect(rw.Body.String()).To(MatchJSON(j)) - }) - It("should write results from the v2 aggregateStore", func() { aggregateDrains := []binding.Binding{ { @@ -72,7 +51,7 @@ var _ = Describe("Handler", func() { }, } - handler := cache.AggregateHandler(newStubAggregateStore(nil, aggregateDrains)) + handler := cache.AggregateHandler(newStubAggregateStore(aggregateDrains)) rw := httptest.NewRecorder() req, err := http.NewRequest(http.MethodGet, "/v2/aggregate", nil) Expect(err).ToNot(HaveOccurred()) @@ -82,39 +61,14 @@ var _ = Describe("Handler", func() { Expect(err).ToNot(HaveOccurred()) Expect(rw.Body.String()).To(MatchJSON(j)) }) - - It("should write results from the legacy aggregateStore", func() { - aggregateDrains := []binding.LegacyBinding{ - { - AppID: "", - Drains: []string{"drain-1", "drain-2"}, - V2Available: true, - }, - } - - handler := cache.LegacyAggregateHandler(newStubAggregateStore(aggregateDrains, nil)) - rw := httptest.NewRecorder() - req, err := http.NewRequest(http.MethodGet, "/aggregate", nil) - Expect(err).ToNot(HaveOccurred()) - handler.ServeHTTP(rw, req) - - j, err := json.Marshal(&aggregateDrains) - Expect(err).ToNot(HaveOccurred()) - Expect(rw.Body.String()).To(MatchJSON(j)) - }) }) type stubStore struct { bindings []binding.Binding } -type stubLegacyStore struct { - bindings []binding.LegacyBinding -} - type stubAggregateStore struct { - AggregateDrains []binding.Binding - LegacyAggregateDrains []binding.LegacyBinding + AggregateDrains []binding.Binding } func newStubStore(bindings []binding.Binding) *stubStore { @@ -127,24 +81,10 @@ func (s *stubStore) Get() []binding.Binding { return s.bindings } -func newStubLegacyStore(bindings []binding.LegacyBinding) *stubLegacyStore { - return &stubLegacyStore{ - bindings: bindings, - } -} - -func (s *stubLegacyStore) Get() []binding.LegacyBinding { - return s.bindings -} - -func newStubAggregateStore(legacyAggregateDrains []binding.LegacyBinding, aggregateDrains []binding.Binding) *stubAggregateStore { - return &stubAggregateStore{LegacyAggregateDrains: legacyAggregateDrains, AggregateDrains: aggregateDrains} +func newStubAggregateStore(aggregateDrains []binding.Binding) *stubAggregateStore { + return &stubAggregateStore{AggregateDrains: aggregateDrains} } func (as *stubAggregateStore) Get() []binding.Binding { return as.AggregateDrains } - -func (as *stubAggregateStore) LegacyGet() []binding.LegacyBinding { - return as.LegacyAggregateDrains -} diff --git a/src/pkg/ingress/api/client.go b/src/pkg/ingress/api/client.go index 8ca378353..e9b56c68f 100644 --- a/src/pkg/ingress/api/client.go +++ b/src/pkg/ingress/api/client.go @@ -6,8 +6,7 @@ import ( ) var ( - legacyPathTemplate = "%s/internal/v4/syslog_drain_urls?batch_size=%d&next_id=%d" - pathTemplate = "%s/internal/v5/syslog_drain_urls?batch_size=%d&next_id=%d" + pathTemplate = "%s/internal/v5/syslog_drain_urls?batch_size=%d&next_id=%d" ) type Client struct { @@ -19,7 +18,3 @@ type Client struct { func (w Client) Get(nextID int) (*http.Response, error) { return w.Client.Get(fmt.Sprintf(pathTemplate, w.Addr, w.BatchSize, nextID)) } - -func (w Client) LegacyGet(nextID int) (*http.Response, error) { - return w.Client.Get(fmt.Sprintf(legacyPathTemplate, w.Addr, w.BatchSize, nextID)) -} diff --git a/src/pkg/ingress/bindings/aggregate_fetcher.go b/src/pkg/ingress/bindings/aggregate_fetcher.go index d1bfafa84..7e53fc846 100644 --- a/src/pkg/ingress/bindings/aggregate_fetcher.go +++ b/src/pkg/ingress/bindings/aggregate_fetcher.go @@ -1,15 +1,12 @@ package bindings import ( - "errors" - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" ) type CacheFetcher interface { GetAggregate() ([]binding.Binding, error) - GetLegacyAggregate() ([]binding.LegacyBinding, error) } type AggregateDrainFetcher struct { @@ -19,7 +16,7 @@ type AggregateDrainFetcher struct { func NewAggregateDrainFetcher(bindings []string, cf CacheFetcher) *AggregateDrainFetcher { drainFetcher := &AggregateDrainFetcher{cf: cf} - parsedDrains := constructLegacyBindings(bindings) + parsedDrains := constructBindings(bindings) drainFetcher.bindings = parsedDrains return drainFetcher } @@ -32,7 +29,7 @@ func (a *AggregateDrainFetcher) FetchBindings() ([]syslog.Binding, error) { } else if a.cf != nil { aggregate, err := a.cf.GetAggregate() if err != nil { - return a.FetchBindingsLegacyFallback() + return []syslog.Binding{}, err } syslogBindings := []syslog.Binding{} for _, i := range aggregate { @@ -58,22 +55,7 @@ func (a *AggregateDrainFetcher) FetchBindings() ([]syslog.Binding, error) { } } -func (a *AggregateDrainFetcher) FetchBindingsLegacyFallback() ([]syslog.Binding, error) { - aggregateLegacy, err := a.cf.GetLegacyAggregate() - if err != nil { - return []syslog.Binding{}, err - } - syslogBindings := []syslog.Binding{} - for _, i := range aggregateLegacy { - if i.V2Available { - return nil, errors.New("v2 is available") - } - syslogBindings = append(syslogBindings, constructLegacyBindings(i.Drains)...) - } - return syslogBindings, nil -} - -func constructLegacyBindings(urls []string) []syslog.Binding { +func constructBindings(urls []string) []syslog.Binding { syslogBindings := []syslog.Binding{} for _, u := range urls { if u == "" { diff --git a/src/pkg/ingress/bindings/aggregate_fetcher_test.go b/src/pkg/ingress/bindings/aggregate_fetcher_test.go index f3488e7b0..3365c5c13 100644 --- a/src/pkg/ingress/bindings/aggregate_fetcher_test.go +++ b/src/pkg/ingress/bindings/aggregate_fetcher_test.go @@ -45,7 +45,7 @@ var _ = Describe("Aggregate Drain Binding Fetcher", func() { "syslog://aggregate-drain1.url.com", "syslog://aggregate-drain2.url.com", } - cacheFetcher := mockCacheFetcher{legacyBindings: []binding.LegacyBinding{{Drains: []string{"syslog://drain.url.com"}}}} + cacheFetcher := mockCacheFetcher{} fetcher := bindings.NewAggregateDrainFetcher(bs, &cacheFetcher) b, err := fetcher.FetchBindings() @@ -152,64 +152,22 @@ var _ = Describe("Aggregate Drain Binding Fetcher", func() { }, )) }) - It("returns results from legacy cache if regular cache fails", func() { - bs := []string{""} - cacheFetcher := mockCacheFetcher{ - legacyBindings: []binding.LegacyBinding{{Drains: []string{ - "syslog://aggregate-drain1.url.com", - "syslog://aggregate-drain2.url.com", - }}}, - err: errors.New("error"), - } - fetcher := bindings.NewAggregateDrainFetcher(bs, &cacheFetcher) - - b, err := fetcher.FetchBindings() - Expect(err).ToNot(HaveOccurred()) - - Expect(b).To(ConsistOf( - syslog.Binding{ - AppId: "", - Drain: syslog.Drain{Url: "syslog://aggregate-drain1.url.com"}, - }, - syslog.Binding{ - AppId: "", - Drain: syslog.Drain{Url: "syslog://aggregate-drain2.url.com"}, - }, - )) - }) It("returns error if fetching fails", func() { bs := []string{""} - cacheFetcher := mockCacheFetcher{legacyErr: errors.New("error2"), err: errors.New("error")} + cacheFetcher := mockCacheFetcher{err: errors.New("error")} fetcher := bindings.NewAggregateDrainFetcher(bs, &cacheFetcher) _, err := fetcher.FetchBindings() - Expect(err).To(MatchError("error2")) - }) - It("returns error if v2 available and fall back", func() { - bs := []string{""} - cacheFetcher := mockCacheFetcher{ - legacyBindings: []binding.LegacyBinding{{V2Available: true, Drains: []string{"syslog://aggregate-drain1.url.com"}}}, - err: errors.New("error"), - } - fetcher := bindings.NewAggregateDrainFetcher(bs, &cacheFetcher) - - _, err := fetcher.FetchBindings() - Expect(err).To(MatchError("v2 is available")) + Expect(err).To(MatchError("error")) }) }) }) type mockCacheFetcher struct { - legacyBindings []binding.LegacyBinding - bindings []binding.Binding - legacyErr error - err error + bindings []binding.Binding + err error } func (m *mockCacheFetcher) GetAggregate() ([]binding.Binding, error) { return m.bindings, m.err } - -func (m *mockCacheFetcher) GetLegacyAggregate() ([]binding.LegacyBinding, error) { - return m.legacyBindings, m.legacyErr -} diff --git a/src/pkg/ingress/bindings/binding_fetcher.go b/src/pkg/ingress/bindings/binding_fetcher.go index f2fee2e82..00627d327 100644 --- a/src/pkg/ingress/bindings/binding_fetcher.go +++ b/src/pkg/ingress/bindings/binding_fetcher.go @@ -1,7 +1,6 @@ package bindings import ( - "errors" "log" "math" "sort" @@ -22,7 +21,6 @@ type Metrics interface { // Getter is configured to fetch HTTP responses type Getter interface { Get() ([]binding.Binding, error) - LegacyGet() ([]binding.LegacyBinding, error) } // BindingFetcher uses a Getter to fetch and decode Bindings @@ -68,20 +66,8 @@ func (f *BindingFetcher) FetchBindings() ([]syslog.Binding, error) { start := time.Now() bindings, err := f.getter.Get() if err != nil { - f.logger.Printf("fetching v2/bindings failed: %s . Falling back to /bindings endpoint", err) - var legacySyslogBindings []binding.LegacyBinding - legacySyslogBindings, err = f.getter.LegacyGet() - if err != nil { - return nil, err - } - if len(legacySyslogBindings) == 0 { - return []syslog.Binding{}, nil - } - if legacySyslogBindings[0].V2Available { - return nil, errors.New("legacy endpoint is deprecated: skipping result parsing") - } - latency = time.Since(start).Nanoseconds() - return f.legacyToSyslogBindings(legacySyslogBindings, f.limit), nil + f.logger.Printf("fetching v2/bindings failed: %s", err) + return nil, err } latency = time.Since(start).Nanoseconds() return f.toSyslogBindings(bindings, f.limit), nil @@ -147,29 +133,6 @@ func (f *BindingFetcher) toSyslogBindings(bs []binding.Binding, perAppLimit int) return bindings } -func (f *BindingFetcher) legacyToSyslogBindings(bs []binding.LegacyBinding, perAppLimit int) []syslog.Binding { - var bindings []syslog.Binding - for _, b := range bs { - drains := b.Drains - sort.Strings(drains) - - if perAppLimit < len(drains) { - drains = drains[:perAppLimit] - } - - for _, d := range drains { - binding := syslog.Binding{ - AppId: b.AppID, - Hostname: b.Hostname, - Drain: syslog.Drain{Url: d}, - } - bindings = append(bindings, binding) - } - } - - return bindings -} - // toMilliseconds truncates the calculated milliseconds float to microsecond // precision. func toMilliseconds(num int64) float64 { diff --git a/src/pkg/ingress/bindings/binding_fetcher_test.go b/src/pkg/ingress/bindings/binding_fetcher_test.go index 75af99e1b..447b559a2 100644 --- a/src/pkg/ingress/bindings/binding_fetcher_test.go +++ b/src/pkg/ingress/bindings/binding_fetcher_test.go @@ -68,33 +68,6 @@ var _ = Describe("BindingFetcher", func() { Credentials: []binding.Credentials{{Apps: []binding.App{{Hostname: "org.space.logspinner", AppID: "testAppID"}}}}, }, } - - getter.legacyBindings = []binding.LegacyBinding{ - { - AppID: "9be15160-4845-4f05-b089-40e827ba61f1", - Drains: []string{ - "syslog://zzz-not-included.url-legacy", - "syslog://other.url-legacy", - "syslog://zzz-not-included-again.url-legacy", - "https://other.url-legacy", - "syslog://other-included.url-legacy"}, - Hostname: "org.space.logspinner-legacy", - V2Available: true, - }, - { - AppID: "testAppID", - Drains: []string{ - "syslog://zzz-not-included.url-legacy", - "syslog://other.url-legacy", - "syslog://zzz-not-included-again.url-legacy", - "https://other.url-legacy", - "syslog://other-included.url-legacy", - }, - Hostname: "org.space.logspinner-legacy", - V2Available: true, - }, - } - }) It("returns the max number of v2 bindings by app id", func() { @@ -141,61 +114,6 @@ var _ = Describe("BindingFetcher", func() { Expect(fetchedBindings).To(ConsistOf(expectedSyslogBindings)) }) - It("returns the max number of syslog bindings by app id in case of v2 endpoint failing", func() { - getter.err = errors.New("getter error occurred") - getter.legacyBindings[0].V2Available = false - getter.legacyBindings[1].V2Available = false - fetcher = bindings.NewBindingFetcher(maxDrains, getter, metrics, logger) - fetchedBindings, err := fetcher.FetchBindings() - Expect(err).ToNot(HaveOccurred()) - - expectedSyslogBindings := []syslog.Binding{ - { - AppId: "9be15160-4845-4f05-b089-40e827ba61f1", - Hostname: "org.space.logspinner-legacy", - Drain: syslog.Drain{Url: "https://other.url-legacy"}, - }, - { - AppId: "9be15160-4845-4f05-b089-40e827ba61f1", - Hostname: "org.space.logspinner-legacy", - Drain: syslog.Drain{Url: "syslog://other-included.url-legacy"}, - }, - { - AppId: "9be15160-4845-4f05-b089-40e827ba61f1", - Hostname: "org.space.logspinner-legacy", - Drain: syslog.Drain{Url: "syslog://other.url-legacy"}, - }, - { - AppId: "testAppID", - Hostname: "org.space.logspinner-legacy", - Drain: syslog.Drain{Url: "https://other.url-legacy"}, - }, - { - AppId: "testAppID", - Hostname: "org.space.logspinner-legacy", - Drain: syslog.Drain{Url: "syslog://other-included.url-legacy"}, - }, - { - AppId: "testAppID", - Hostname: "org.space.logspinner-legacy", - Drain: syslog.Drain{Url: "syslog://other.url-legacy"}, - }, - } - Expect(fetchedBindings).To(ConsistOf(expectedSyslogBindings)) - - }) - - It("returns an empty array of syslog bindings if v1 endpoint sends no results", func() { - getter.err = errors.New("getter error occurred") - getter.legacyBindings = []binding.LegacyBinding{} - fetcher = bindings.NewBindingFetcher(maxDrains, getter, metrics, logger) - fetchedBindings, err := fetcher.FetchBindings() - Expect(err).To(Not(HaveOccurred())) - - var expectedSyslogBindings []syslog.Binding - Expect(fetchedBindings).To(ConsistOf(expectedSyslogBindings)) - }) - It("tracks the number of binding refreshes", func() { _, err := fetcher.FetchBindings() Expect(err).ToNot(HaveOccurred()) @@ -237,33 +155,18 @@ var _ = Describe("BindingFetcher", func() { It("returns an error if the Getter returns an error", func() { getter.err = errors.New("boom") - getter.legacyError = errors.New("boom-legacy") - - _, err := fetcher.FetchBindings() - Expect(err).To(MatchError("boom-legacy")) - }) - - It("returns an error if the Getter returns an error and V2 is available", func() { - getter.err = errors.New("boom") _, err := fetcher.FetchBindings() - Expect(err).To(MatchError("legacy endpoint is deprecated: skipping result parsing")) + Expect(err).To(MatchError("boom")) }) }) type SpyGetter struct { - bindings []binding.Binding - legacyBindings []binding.LegacyBinding - err error - legacyError error + bindings []binding.Binding + err error } func (s *SpyGetter) Get() ([]binding.Binding, error) { time.Sleep(10 * time.Millisecond) return s.bindings, s.err } - -func (s *SpyGetter) LegacyGet() ([]binding.LegacyBinding, error) { - time.Sleep(10 * time.Millisecond) - return s.legacyBindings, s.legacyError -}