Skip to content

Commit

Permalink
Legacy syslog bindings code cleanup
Browse files Browse the repository at this point in the history
With the introduction of mTLS for Syslog Drains we have changed the
internal structure of the bindings. This PR cleans up the code of the
legacy syslog bindings and removes the http calls to the old Syslog
Binding Cache API endpoint. From now on, only the `/v2/binding` and
`/v2/aggregate-bindings` API endpoints of the Syslog Binding Cache will
be used.
  • Loading branch information
chombium authored and mkocher committed Jan 17, 2024
1 parent bed24df commit 6c78221
Show file tree
Hide file tree
Showing 17 changed files with 76 additions and 1,158 deletions.
4 changes: 0 additions & 4 deletions src/cmd/syslog-agent/app/syslog_agent_mtls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,8 @@ func (f *fakeBindingCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/v2/bindings":
results, err = json.Marshal(f.bindings)
case "/bindings":
results, err = json.Marshal(binding.ToLegacyBindings(f.bindings))
case "/v2/aggregate":
results, err = json.Marshal(f.aggregate)
case "/aggregate":
results, err = json.Marshal(binding.ToLegacyBindings(f.aggregate))
default:
w.WriteHeader(500)
return
Expand Down
106 changes: 35 additions & 71 deletions src/cmd/syslog-agent/app/syslog_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package app_test
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -42,7 +41,7 @@ var _ = Describe("SyslogAgent", func() {
aggregateDrain *syslogTCPServer
appIDs []string
cacheCerts *testhelper.TestCerts
bindingCache *fakeLegacyBindingCache
bindingCache *fakeBindingCache

agentCerts *testhelper.TestCerts
agentCfg app.Config
Expand All @@ -63,26 +62,38 @@ var _ = Describe("SyslogAgent", func() {

appIDs = []string{"app-1", "app-2"}
cacheCerts = testhelper.GenerateCerts("binding-cache-ca")
bindingCache = &fakeLegacyBindingCache{
bindings: []binding.LegacyBinding{
bindingCache = &fakeBindingCache{
bindings: []binding.Binding{
{
AppID: appIDs[0],
Hostname: fmt.Sprintf("%s.example.com", appIDs[0]),
Drains: []string{appHTTPSDrain.server.URL},
Url: appHTTPSDrain.server.URL,
Credentials: []binding.Credentials{
{
Apps: []binding.App{
{
Hostname: fmt.Sprintf("%s.example.com", appIDs[0]),
AppID: appIDs[0],
},
},
},
},
},
{
AppID: appIDs[1],
Hostname: fmt.Sprintf("%s.example.com", appIDs[1]),
Drains: []string{
fmt.Sprintf("syslog-tls://localhost:%s", appTLSDrain.port()),
Url: fmt.Sprintf("syslog-tls://localhost:%s", appTLSDrain.port()),
Credentials: []binding.Credentials{
{
Apps: []binding.App{
{
Hostname: fmt.Sprintf("%s.example.com", appIDs[1]),
AppID: appIDs[1],
},
},
},
},
},
},
aggregate: []binding.LegacyBinding{
aggregate: []binding.Binding{
{
Drains: []string{
fmt.Sprintf("syslog-tls://localhost:%s", aggregateDrain.port()),
},
Url: fmt.Sprintf("syslog-tls://localhost:%s", aggregateDrain.port()),
},
},
}
Expand Down Expand Up @@ -216,7 +227,7 @@ var _ = Describe("SyslogAgent", func() {
labels: map[string]string{
"direction": "egress",
"drain_scope": "aggregate",
"drain_url": bindingCache.aggregate[0].Drains[0],
"drain_url": bindingCache.aggregate[0].Url,
},
},
}
Expand Down Expand Up @@ -284,8 +295,8 @@ var _ = Describe("SyslogAgent", func() {
BeforeEach(func() {
agentCfg.DefaultDrainMetadata = false

oldURL := bindingCache.aggregate[0].Drains[0]
bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=false", oldURL)
oldURL := bindingCache.aggregate[0].Url
bindingCache.aggregate[0].Url = fmt.Sprintf("%s?disable-metadata=false", oldURL)
})

It("does not include tags in drains that do not set disable-metadata to false", func() {
Expand All @@ -311,10 +322,10 @@ var _ = Describe("SyslogAgent", func() {
BeforeEach(func() {
agentCfg.DefaultDrainMetadata = true

oldURL := bindingCache.aggregate[0].Drains[0]
bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=true", oldURL)
oldURL = bindingCache.bindings[0].Drains[0]
bindingCache.bindings[0].Drains[0] = fmt.Sprintf("%s?disable-metadata=true", oldURL)
oldURL := bindingCache.aggregate[0].Url
bindingCache.aggregate[0].Url = fmt.Sprintf("%s?disable-metadata=true", oldURL)
oldURL = bindingCache.bindings[0].Url
bindingCache.bindings[0].Url = fmt.Sprintf("%s?disable-metadata=true", oldURL)
})

It("does not send tags to those drains", func() {
Expand Down Expand Up @@ -353,8 +364,8 @@ var _ = Describe("SyslogAgent", func() {

Context("when the ssl-strict-internal param is set in that drain URL", func() {
BeforeEach(func() {
oldURL := bindingCache.aggregate[0].Drains[0]
bindingCache.aggregate[0].Drains[0] = fmt.Sprintf("%s?ssl-strict-internal=true", oldURL)
oldURL := bindingCache.aggregate[0].Url
bindingCache.aggregate[0].Url = fmt.Sprintf("%s?ssl-strict-internal=true", oldURL)
})

It("uses internal TLS settings to communicate with that drain", func() {
Expand Down Expand Up @@ -465,53 +476,6 @@ func emitLogs(ctx context.Context, appIDs []string, grpcPort int, testCerts *tes
}()
}

type fakeLegacyBindingCache struct {
*httptest.Server
bindings []binding.LegacyBinding
aggregate []binding.LegacyBinding
}

func (f *fakeLegacyBindingCache) startTLS(testCerts *testhelper.TestCerts) {
tlsConfig, err := tlsconfig.Build(
tlsconfig.WithInternalServiceDefaults(),
tlsconfig.WithIdentityFromFile(
testCerts.Cert("binding-cache"),
testCerts.Key("binding-cache"),
),
).Server(
tlsconfig.WithClientAuthenticationFromFile(testCerts.CA()),
)

Expect(err).ToNot(HaveOccurred())

f.Server = httptest.NewUnstartedServer(f)
f.Server.TLS = tlsConfig
f.Server.StartTLS()
}

func (f *fakeLegacyBindingCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var results []binding.LegacyBinding
if r.URL.Path == "/bindings" {
results = f.bindings
} else if r.URL.Path == "/aggregate" {
results = f.aggregate
} else {
w.WriteHeader(500)
return
}

resultData, err := json.Marshal(results)
if err != nil {
w.WriteHeader(500)
return
}

_, err = w.Write(resultData)
if err != nil {
w.WriteHeader(500)
}
}

type syslogHTTPSServer struct {
receivedMessages chan *rfc5424.Message
server *httptest.Server
Expand Down
5 changes: 1 addition & 4 deletions src/cmd/syslog-binding-cache/app/syslog_binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,13 @@ func (sbc *SyslogBindingCache) Run() {
go func() { sbc.log.Println("PPROF SERVER STOPPED " + sbc.pprofServer.ListenAndServe().Error()) }()
}
store := binding.NewStore(sbc.metrics)
legacyStore := binding.NewLegacyStore()
aggregateStore := binding.NewAggregateStore(sbc.config.AggregateDrainsFile)
poller := binding.NewPoller(sbc.apiClient(), sbc.config.APIPollingInterval, store, legacyStore, sbc.metrics, sbc.log)
poller := binding.NewPoller(sbc.apiClient(), sbc.config.APIPollingInterval, store, sbc.metrics, sbc.log)

go poller.Poll()

router := chi.NewRouter()
router.Get("/bindings", cache.LegacyHandler(legacyStore))
router.Get("/v2/bindings", cache.Handler(store))
router.Get("/aggregate", cache.LegacyAggregateHandler(aggregateStore))
router.Get("/v2/aggregate", cache.AggregateHandler(aggregateStore))

sbc.startServer(router)
Expand Down
50 changes: 0 additions & 50 deletions src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,56 +244,6 @@ var _ = Describe("App", func() {
}))
})

It("has an HTTP endpoint that returns legacy aggregate drains", func() {
addr := fmt.Sprintf("https://localhost:%d/aggregate", sbcPort)

var resp *http.Response
Eventually(func() error {
var err error
resp, err = client.Get(addr)
return err
}).Should(Succeed())
defer resp.Body.Close()

Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())

var result []binding.LegacyBinding
err = json.Unmarshal(body, &result)
Expect(err).ToNot(HaveOccurred())

Expect(result).To(HaveLen(1))
Expect(result[0].Drains).To(ConsistOf("syslog://test-hostname:1000", "syslog://test2:1000"))
})

It("has an HTTP endpoint that returns legacy bindings", func() {
addr := fmt.Sprintf("https://localhost:%d/bindings", sbcPort)

var resp *http.Response
Eventually(func() error {
var err error
resp, err = client.Get(addr)
return err
}).Should(Succeed())
defer resp.Body.Close()

Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())

var results []binding.LegacyBinding
err = json.Unmarshal(body, &results)
Expect(err).ToNot(HaveOccurred())

Expect(results).To(HaveLen(2))
result1 := binding.LegacyBinding{AppID: "app-id-1", Drains: []string{"syslog://drain-a", "syslog://drain-b"}, Hostname: "org.space.app-name-1", V2Available: true}
result2 := binding.LegacyBinding{AppID: "app-id-2", Drains: []string{"syslog://drain-c", "syslog://drain-d"}, Hostname: "org.space.app-name-2", V2Available: true}
Expect(results).Should(ConsistOf(result1, result2))
})

Context("when debug configuration is enabled", func() {
BeforeEach(func() {
sbcCfg.MetricsServer.DebugMetrics = true
Expand Down
Loading

0 comments on commit 6c78221

Please sign in to comment.