Skip to content

Commit

Permalink
context propagation: pkg/apiserver (#3272)
Browse files Browse the repository at this point in the history
* context propagation: apic.Push()

* context propagation: NewServer()

* lint
  • Loading branch information
mmetc authored Oct 9, 2024
1 parent 40021b6 commit b9bccfa
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ issues:
# `err` is often shadowed, we may continue to do it
- linters:
- govet
text: "shadow: declaration of \"err\" shadows declaration"
text: "shadow: declaration of \"(err|ctx)\" shadows declaration"

- linters:
- errcheck
Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec-cli/clipapi/papi.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (cli *cliPapi) sync(ctx context.Context, out io.Writer, db *database.Client
return fmt.Errorf("unable to initialize API client: %w", err)
}

t.Go(apic.Push)
t.Go(func() error { return apic.Push(ctx) })

papi, err := apiserver.NewPAPI(apic, db, cfg.API.Server.ConsoleConfig, log.GetLevel())
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions cmd/crowdsec/api.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"errors"
"fmt"
"runtime"
Expand All @@ -14,12 +15,12 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
)

func initAPIServer(cConfig *csconfig.Config) (*apiserver.APIServer, error) {
func initAPIServer(ctx context.Context, cConfig *csconfig.Config) (*apiserver.APIServer, error) {
if cConfig.API.Server.OnlineClient == nil || cConfig.API.Server.OnlineClient.Credentials == nil {
log.Info("push and pull to Central API disabled")
}

apiServer, err := apiserver.NewServer(cConfig.API.Server)
apiServer, err := apiserver.NewServer(ctx, cConfig.API.Server)
if err != nil {
return nil, fmt.Errorf("unable to run local API: %w", err)
}
Expand Down Expand Up @@ -58,11 +59,14 @@ func initAPIServer(cConfig *csconfig.Config) (*apiserver.APIServer, error) {

func serveAPIServer(apiServer *apiserver.APIServer) {
apiReady := make(chan bool, 1)

apiTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/serveAPIServer")

go func() {
defer trace.CatchPanic("crowdsec/runAPIServer")
log.Debugf("serving API after %s ms", time.Since(crowdsecT0))

if err := apiServer.Run(apiReady); err != nil {
log.Fatal(err)
}
Expand All @@ -76,6 +80,7 @@ func serveAPIServer(apiServer *apiserver.APIServer) {
<-apiTomb.Dying() // lock until go routine is dying
pluginTomb.Kill(nil)
log.Infof("serve: shutting down api server")

return apiServer.Shutdown()
})
<-apiReady
Expand All @@ -87,5 +92,6 @@ func hasPlugins(profiles []*csconfig.ProfileCfg) bool {
return true
}
}

return false
}
10 changes: 6 additions & 4 deletions cmd/crowdsec/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func debugHandler(sig os.Signal, cConfig *csconfig.Config) error {
func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
var tmpFile string

ctx := context.TODO()

// re-initialize tombs
acquisTomb = tomb.Tomb{}
parsersTomb = tomb.Tomb{}
Expand All @@ -74,7 +76,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
cConfig.API.Server.OnlineClient = nil
}

apiServer, err := initAPIServer(cConfig)
apiServer, err := initAPIServer(ctx, cConfig)
if err != nil {
return nil, fmt.Errorf("unable to init api server: %w", err)
}
Expand All @@ -88,7 +90,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
return nil, err
}

if err := hub.Load(); err != nil {
if err = hub.Load(); err != nil {
return nil, err
}

Expand Down Expand Up @@ -374,7 +376,7 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
cConfig.API.Server.OnlineClient = nil
}

apiServer, err := initAPIServer(cConfig)
apiServer, err := initAPIServer(ctx, cConfig)
if err != nil {
return fmt.Errorf("api server init: %w", err)
}
Expand All @@ -390,7 +392,7 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
return err
}

if err := hub.Load(); err != nil {
if err = hub.Load(); err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/apiserver/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (l *LAPI) RecordResponse(t *testing.T, ctx context.Context, verb string, ur
}

func InitMachineTest(t *testing.T, ctx context.Context) (*gin.Engine, models.WatcherAuthResponse, csconfig.Config) {
router, config := NewAPITest(t)
router, config := NewAPITest(t, ctx)
loginResp := LoginToTestAPI(t, ctx, router, config)

return router, loginResp, config
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestCreateAlert(t *testing.T) {

func TestCreateAlertChannels(t *testing.T) {
ctx := context.Background()
apiServer, config := NewAPIServer(t)
apiServer, config := NewAPIServer(t, ctx)
apiServer.controller.PluginChannel = make(chan csplugin.ProfileAlert)
apiServer.InitController()

Expand Down Expand Up @@ -437,7 +437,7 @@ func TestDeleteAlertTrustedIPS(t *testing.T) {
// cfg.API.Server.TrustedIPs = []string{"1.2.3.4", "1.2.4.0/24", "::"}
cfg.API.Server.TrustedIPs = []string{"1.2.3.4", "1.2.4.0/24"}
cfg.API.Server.ListenURI = "::8080"
server, err := NewServer(cfg.API.Server)
server, err := NewServer(ctx, cfg.API.Server)
require.NoError(t, err)

err = server.InitController()
Expand Down
3 changes: 1 addition & 2 deletions pkg/apiserver/api_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
)

func TestAPIKey(t *testing.T) {
router, config := NewAPITest(t)

ctx := context.Background()
router, config := NewAPITest(t, ctx)

APIKey := CreateTestBouncer(t, ctx, config.API.Server.DbConfig)

Expand Down
12 changes: 6 additions & 6 deletions pkg/apiserver/apic.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func NewAPIC(ctx context.Context, config *csconfig.OnlineApiClientCfg, dbClient
}

// keep track of all alerts in cache and push it to CAPI every PushInterval.
func (a *apic) Push() error {
func (a *apic) Push(ctx context.Context) error {
defer trace.CatchPanic("lapi/pushToAPIC")

var cache models.AddSignalsRequest
Expand All @@ -276,7 +276,7 @@ func (a *apic) Push() error {
return nil
}

go a.Send(&cache)
go a.Send(ctx, &cache)

return nil
case <-ticker.C:
Expand All @@ -289,7 +289,7 @@ func (a *apic) Push() error {
a.mu.Unlock()
log.Infof("Signal push: %d signals to push", len(cacheCopy))

go a.Send(&cacheCopy)
go a.Send(ctx, &cacheCopy)
}
case alerts := <-a.AlertsAddChan:
var signals []*models.AddSignalsRequestItem
Expand Down Expand Up @@ -351,7 +351,7 @@ func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig
return true
}

func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
func (a *apic) Send(ctx context.Context, cacheOrig *models.AddSignalsRequest) {
/*we do have a problem with this :
The apic.Push background routine reads from alertToPush chan.
This chan is filled by Controller.CreateAlert
Expand All @@ -375,7 +375,7 @@ func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
for {
if pageEnd >= len(cache) {
send = cache[pageStart:]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

defer cancel()

Expand All @@ -389,7 +389,7 @@ func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
}

send = cache[pageStart:pageEnd]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

defer cancel()

Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/apic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ func TestAPICPush(t *testing.T) {
api.Shutdown()
}()

err = api.Push()
err = api.Push(ctx)
require.NoError(t, err)
assert.Equal(t, tc.expectedCalls, httpmock.GetTotalCallCount())
})
Expand Down
10 changes: 4 additions & 6 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,9 @@ func newGinLogger(config *csconfig.LocalApiServerCfg) (*log.Logger, string, erro

// NewServer creates a LAPI server.
// It sets up a gin router, a database client, and a controller.
func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
func NewServer(ctx context.Context, config *csconfig.LocalApiServerCfg) (*APIServer, error) {
var flushScheduler *gocron.Scheduler

ctx := context.TODO()

dbClient, err := database.NewClient(ctx, config.DbConfig)
if err != nil {
return nil, fmt.Errorf("unable to init database client: %w", err)
Expand Down Expand Up @@ -300,8 +298,8 @@ func (s *APIServer) Router() (*gin.Engine, error) {
return s.router, nil
}

func (s *APIServer) apicPush() error {
if err := s.apic.Push(); err != nil {
func (s *APIServer) apicPush(ctx context.Context) error {
if err := s.apic.Push(ctx); err != nil {
log.Errorf("capi push: %s", err)
return err
}
Expand Down Expand Up @@ -337,7 +335,7 @@ func (s *APIServer) papiSync() error {
}

func (s *APIServer) initAPIC(ctx context.Context) {
s.apic.pushTomb.Go(s.apicPush)
s.apic.pushTomb.Go(func() error { return s.apicPush(ctx) })
s.apic.pullTomb.Go(func() error { return s.apicPull(ctx) })

// csConfig.API.Server.ConsoleConfig.ShareCustomScenarios
Expand Down
Loading

0 comments on commit b9bccfa

Please sign in to comment.