Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Legacy syslog bindings code cleanup #438

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/cmd/syslog-agent/app/syslog_agent_mtls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,8 @@ func (f *fakeBindingCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/v2/bindings":
results, err = json.Marshal(f.bindings)
case "/bindings":
results, err = json.Marshal(binding.ToLegacyBindings(f.bindings))
case "/v2/aggregate":
results, err = json.Marshal(f.aggregate)
case "/aggregate":
results, err = json.Marshal(binding.ToLegacyBindings(f.aggregate))
default:
w.WriteHeader(500)
return
Expand Down
106 changes: 35 additions & 71 deletions src/cmd/syslog-agent/app/syslog_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package app_test
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -42,7 +41,7 @@ var _ = Describe("SyslogAgent", func() {
aggregateDrain *syslogTCPServer
appIDs []string
cacheCerts *testhelper.TestCerts
bindingCache *fakeLegacyBindingCache
bindingCache *fakeBindingCache

agentCerts *testhelper.TestCerts
agentCfg app.Config
Expand All @@ -63,26 +62,38 @@ var _ = Describe("SyslogAgent", func() {

appIDs = []string{"app-1", "app-2"}
cacheCerts = testhelper.GenerateCerts("binding-cache-ca")
bindingCache = &fakeLegacyBindingCache{
bindings: []binding.LegacyBinding{
bindingCache = &fakeBindingCache{
bindings: []binding.Binding{
{
AppID: appIDs[0],
Hostname: fmt.Sprintf("%s.example.com", appIDs[0]),
Drains: []string{appHTTPSDrain.server.URL},
Url: appHTTPSDrain.server.URL,
Credentials: []binding.Credentials{
{
Apps: []binding.App{
{
Hostname: fmt.Sprintf("%s.example.com", appIDs[0]),
AppID: appIDs[0],
},
},
},
},
},
{
AppID: appIDs[1],
Hostname: fmt.Sprintf("%s.example.com", appIDs[1]),
Drains: []string{
fmt.Sprintf("syslog-tls://localhost:%s", appTLSDrain.port()),
Url: fmt.Sprintf("syslog-tls://localhost:%s", appTLSDrain.port()),
Credentials: []binding.Credentials{
{
Apps: []binding.App{
{
Hostname: fmt.Sprintf("%s.example.com", appIDs[1]),
AppID: appIDs[1],
},
},
},
},
},
},
aggregate: []binding.LegacyBinding{
aggregate: []binding.Binding{
{
Drains: []string{
fmt.Sprintf("syslog-tls://localhost:%s", aggregateDrain.port()),
},
Url: fmt.Sprintf("syslog-tls://localhost:%s", aggregateDrain.port()),
},
},
}
Expand Down Expand Up @@ -216,7 +227,7 @@ var _ = Describe("SyslogAgent", func() {
labels: map[string]string{
"direction": "egress",
"drain_scope": "aggregate",
"drain_url": bindingCache.aggregate[0].Drains[0],
"drain_url": bindingCache.aggregate[0].Url,
},
},
}
Expand Down Expand Up @@ -284,8 +295,8 @@ var _ = Describe("SyslogAgent", func() {
BeforeEach(func() {
agentCfg.DefaultDrainMetadata = false

oldURL := bindingCache.aggregate[0].Drains[0]
bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=false", oldURL)
oldURL := bindingCache.aggregate[0].Url
bindingCache.aggregate[0].Url = fmt.Sprintf("%s?disable-metadata=false", oldURL)
})

It("does not include tags in drains that do not set disable-metadata to false", func() {
Expand All @@ -311,10 +322,10 @@ var _ = Describe("SyslogAgent", func() {
BeforeEach(func() {
agentCfg.DefaultDrainMetadata = true

oldURL := bindingCache.aggregate[0].Drains[0]
bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=true", oldURL)
oldURL = bindingCache.bindings[0].Drains[0]
bindingCache.bindings[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=true", oldURL)
oldURL := bindingCache.aggregate[0].Url
bindingCache.aggregate[0].Url = fmt.Sprintf("%s?disable-metadata=true", oldURL)
oldURL = bindingCache.bindings[0].Url
bindingCache.bindings[0].Url = fmt.Sprintf("%s?disable-metadata=true", oldURL)
})

It("does not send tags to those drains", func() {
Expand Down Expand Up @@ -353,8 +364,8 @@ var _ = Describe("SyslogAgent", func() {

Context("when the ssl-strict-internal param is set in that drain URL", func() {
BeforeEach(func() {
oldURL := bindingCache.aggregate[0].Drains[0]
bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?ssl-strict-internal=true", oldURL)
oldURL := bindingCache.aggregate[0].Url
bindingCache.aggregate[0].Url = fmt.Sprintf("%s?ssl-strict-internal=true", oldURL)
})

It("uses internal TLS settings to communicate with that drain", func() {
Expand Down Expand Up @@ -465,53 +476,6 @@ func emitLogs(ctx context.Context, appIDs []string, grpcPort int, testCerts *tes
}()
}

type fakeLegacyBindingCache struct {
*httptest.Server
bindings []binding.LegacyBinding
aggregate []binding.LegacyBinding
}

func (f *fakeLegacyBindingCache) startTLS(testCerts *testhelper.TestCerts) {
tlsConfig, err := tlsconfig.Build(
tlsconfig.WithInternalServiceDefaults(),
tlsconfig.WithIdentityFromFile(
testCerts.Cert("binding-cache"),
testCerts.Key("binding-cache"),
),
).Server(
tlsconfig.WithClientAuthenticationFromFile(testCerts.CA()),
)

Expect(err).ToNot(HaveOccurred())

f.Server = httptest.NewUnstartedServer(f)
f.Server.TLS = tlsConfig
f.Server.StartTLS()
}

func (f *fakeLegacyBindingCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var results []binding.LegacyBinding
if r.URL.Path == "/bindings" {
results = f.bindings
} else if r.URL.Path == "/aggregate" {
results = f.aggregate
} else {
w.WriteHeader(500)
return
}

resultData, err := json.Marshal(results)
if err != nil {
w.WriteHeader(500)
return
}

_, err = w.Write(resultData)
if err != nil {
w.WriteHeader(500)
}
}

type syslogHTTPSServer struct {
receivedMessages chan *rfc5424.Message
server *httptest.Server
Expand Down
5 changes: 1 addition & 4 deletions src/cmd/syslog-binding-cache/app/syslog_binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,13 @@ func (sbc *SyslogBindingCache) Run() {
go func() { sbc.log.Println("PPROF SERVER STOPPED " + sbc.pprofServer.ListenAndServe().Error()) }()
}
store := binding.NewStore(sbc.metrics)
legacyStore := binding.NewLegacyStore()
aggregateStore := binding.NewAggregateStore(sbc.config.AggregateDrainsFile)
poller := binding.NewPoller(sbc.apiClient(), sbc.config.APIPollingInterval, store, legacyStore, sbc.metrics, sbc.log)
poller := binding.NewPoller(sbc.apiClient(), sbc.config.APIPollingInterval, store, sbc.metrics, sbc.log)

go poller.Poll()

router := chi.NewRouter()
router.Get("/bindings", cache.LegacyHandler(legacyStore))
router.Get("/v2/bindings", cache.Handler(store))
router.Get("/aggregate", cache.LegacyAggregateHandler(aggregateStore))
router.Get("/v2/aggregate", cache.AggregateHandler(aggregateStore))

sbc.startServer(router)
Expand Down
50 changes: 0 additions & 50 deletions src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,56 +244,6 @@ var _ = Describe("App", func() {
}))
})

It("has an HTTP endpoint that returns legacy aggregate drains", func() {
addr := fmt.Sprintf("https://localhost:%d/aggregate", sbcPort)

var resp *http.Response
Eventually(func() error {
var err error
resp, err = client.Get(addr)
return err
}).Should(Succeed())
defer resp.Body.Close()

Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())

var result []binding.LegacyBinding
err = json.Unmarshal(body, &result)
Expect(err).ToNot(HaveOccurred())

Expect(result).To(HaveLen(1))
Expect(result[0].Drains).To(ConsistOf("syslog://test-hostname:1000", "syslog://test2:1000"))
})

It("has an HTTP endpoint that returns legacy bindings", func() {
addr := fmt.Sprintf("https://localhost:%d/bindings", sbcPort)

var resp *http.Response
Eventually(func() error {
var err error
resp, err = client.Get(addr)
return err
}).Should(Succeed())
defer resp.Body.Close()

Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())

var results []binding.LegacyBinding
err = json.Unmarshal(body, &results)
Expect(err).ToNot(HaveOccurred())

Expect(results).To(HaveLen(2))
result1 := binding.LegacyBinding{AppID: "app-id-1", Drains: []string{"syslog://drain-a", "syslog://drain-b"}, Hostname: "org.space.app-name-1", V2Available: true}
result2 := binding.LegacyBinding{AppID: "app-id-2", Drains: []string{"syslog://drain-c", "syslog://drain-d"}, Hostname: "org.space.app-name-2", V2Available: true}
Expect(results).Should(ConsistOf(result1, result2))
})

Context("when debug configuration is enabled", func() {
BeforeEach(func() {
sbcCfg.MetricsServer.DebugMetrics = true
Expand Down
Loading
Loading