Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/provider/okta: add request trac…
Browse files Browse the repository at this point in the history
…ing support
  • Loading branch information
efd6 committed Jun 6, 2024
1 parent 68371a0 commit 2714dab
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.ndjson
20 changes: 19 additions & 1 deletion x-pack/filebeat/input/entityanalytics/provider/okta/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

Expand Down Expand Up @@ -62,6 +64,9 @@ type conf struct {
// Request is the configuration for establishing
// HTTP requests to the API.
Request *requestConfig `config:"request"`

// Tracer allows configuration of request trace logging.
Tracer *lumberjack.Logger `config:"tracer"`
}

type requestConfig struct {
Expand Down Expand Up @@ -163,10 +168,23 @@ func (c *conf) Validate() error {
}
switch strings.ToLower(c.Dataset) {
case "", "all", "users", "devices":
return nil
default:
return errors.New("dataset must be 'all', 'users', 'devices' or empty")
}

if c.Tracer == nil {
return nil
}
if c.Tracer.Filename == "" {
return errors.New("request tracer must have a filename if used")
}
if c.Tracer.MaxSize == 0 {
// By default Lumberjack caps file sizes at 100MB which
// is excessive for a debugging logger, so default to 1MB
// which is the minimum.
c.Tracer.MaxSize = 1
}
return nil
}

func (c *conf) wantUsers() bool {
Expand Down
52 changes: 49 additions & 3 deletions x-pack/filebeat/input/entityanalytics/provider/okta/okta.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,22 @@ import (
"io"
"net/http"
"net/url"
"path/filepath"
"strings"
"time"

"github.com/hashicorp/go-retryablehttp"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/kvstore"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -105,8 +110,13 @@ func (p *oktaInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C
// Allow a single fetch operation to obtain limits from the API.
p.lim = rate.NewLimiter(1, 1)

if p.cfg.Tracer != nil {
id := sanitizeFileName(inputCtx.ID)
p.cfg.Tracer.Filename = strings.ReplaceAll(p.cfg.Tracer.Filename, "*", id)
}

var err error
p.client, err = newClient(p.cfg, p.logger)
p.client, err = newClient(ctxtool.FromCanceller(inputCtx.Cancelation), p.cfg, p.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -152,12 +162,14 @@ func (p *oktaInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C
}
}

func newClient(cfg conf, log *logp.Logger) (*http.Client, error) {
func newClient(ctx context.Context, cfg conf, log *logp.Logger) (*http.Client, error) {
c, err := cfg.Request.Transport.Client(clientOptions(cfg.Request.KeepAlive.settings())...)
if err != nil {
return nil, err
}

c = requestTrace(ctx, c, cfg, log)

c.CheckRedirect = checkRedirect(cfg.Request, log)

client := &retryablehttp.Client{
Expand All @@ -169,10 +181,44 @@ func newClient(cfg conf, log *logp.Logger) (*http.Client, error) {
CheckRetry: retryablehttp.DefaultRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}

return client.StandardClient(), nil
}

// requestTrace decorates cli with an httplog.LoggingRoundTripper if cfg.Trace
// is non-nil.
func requestTrace(ctx context.Context, cli *http.Client, cfg conf, log *logp.Logger) *http.Client {
if cfg.Tracer == nil {
return cli
}
w := zapcore.AddSync(cfg.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
cfg.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

const margin = 1e3 // 1OkB ought to be enough room for all the remainder of the trace details.
maxSize := cfg.Tracer.MaxSize * 1e6
cli.Transport = httplog.NewLoggingRoundTripper(cli.Transport, traceLogger, max(0, maxSize-margin), log)
return cli
}

// sanitizeFileName returns name with ":" and "/" replaced with "_", removing
// repeated instances. The request.tracer.filename may have ":" when an input
// has cursor config and the macOS Finder will treat this as path-separator and
// causes to show up strange filepaths.
func sanitizeFileName(name string) string {
name = strings.ReplaceAll(name, ":", string(filepath.Separator))
name = filepath.Clean(name)
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
Expand Down
14 changes: 14 additions & 0 deletions x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package okta
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -17,11 +18,14 @@ import (
"time"

"golang.org/x/time/rate"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta"
"github.com/elastic/elastic-agent-libs/logp"
)

var trace = flag.Bool("request_trace", false, "enable request tracing during tests")

func TestOktaDoFetch(t *testing.T) {
tests := []struct {
dataset string
Expand Down Expand Up @@ -153,6 +157,16 @@ func TestOktaDoFetch(t *testing.T) {
lim: rate.NewLimiter(1, 1),
logger: logp.L(),
}
if *trace {
name := test.dataset
if name == "" {
name = "default"
}
a.cfg.Tracer = &lumberjack.Logger{
Filename: fmt.Sprintf("test_trace_%s.ndjson", name),
}
}
a.client = requestTrace(context.Background(), a.client, a.cfg, a.logger)

ss, err := newStateStore(store)
if err != nil {
Expand Down

0 comments on commit 2714dab

Please sign in to comment.