Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/provider/azuread: add request t…
Browse files Browse the repository at this point in the history
…racing support
  • Loading branch information
efd6 committed Jun 6, 2024
1 parent 2714dab commit 78f2fe1
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type azure struct {
logger *logp.Logger
auth authenticator.Authenticator
fetcher fetcher.Fetcher

ctx v2.Context
}

// Name returns the name of this provider.
Expand All @@ -71,6 +73,7 @@ func (p *azure) Test(testCtx v2.TestContext) error {
// Run will start data collection on this provider.
func (p *azure) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
p.logger = inputCtx.Logger.With("tenant_id", p.conf.TenantID, "provider", Name)
p.ctx = inputCtx
p.auth.SetLogger(p.logger)
p.fetcher.SetLogger(p.logger)
p.metrics = newMetrics(inputCtx.ID, nil)
Expand Down Expand Up @@ -575,7 +578,7 @@ func (p *azure) configure(cfg *config.C) (kvstore.Input, error) {
if p.auth, err = oauth2.New(cfg, p.Manager.Logger); err != nil {
return nil, fmt.Errorf("unable to create authenticator: %w", err)
}
if p.fetcher, err = graph.New(cfg, p.Manager.Logger, p.auth); err != nil {
if p.fetcher, err = graph.New(ctxtool.FromCanceller(p.ctx.Cancelation), p.ctx.ID, cfg, p.Manager.Logger, p.auth); err != nil {
return nil, fmt.Errorf("unable to create fetcher: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.ndjson
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ import (
"io"
"net/http"
"net/url"
"path/filepath"
"strings"

"github.com/google/uuid"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/collections"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/authenticator"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/fetcher"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -104,6 +110,9 @@ type graphConf struct {
Select selection `config:"select"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

// Tracer allows configuration of request trace logging.
Tracer *lumberjack.Logger `config:"tracer"`
}

type selection struct {
Expand Down Expand Up @@ -329,16 +338,22 @@ func (f *graph) doRequest(ctx context.Context, method, url string, body io.Reade
}

// New creates a new instance of the graph fetcher.
func New(cfg *config.C, logger *logp.Logger, auth authenticator.Authenticator) (fetcher.Fetcher, error) {
func New(ctx context.Context, id string, cfg *config.C, logger *logp.Logger, auth authenticator.Authenticator) (fetcher.Fetcher, error) {
var c graphConf
if err := cfg.Unpack(&c); err != nil {
return nil, fmt.Errorf("unable to unpack Graph API Fetcher config: %w", err)
}

if c.Tracer != nil {
id = sanitizeFileName(id)
c.Tracer.Filename = strings.ReplaceAll(c.Tracer.Filename, "*", id)
}

client, err := c.Transport.Client()
if err != nil {
return nil, fmt.Errorf("unable to create HTTP client: %w", err)
}
client = requestTrace(ctx, client, c, logger)

f := graph{
conf: c,
Expand Down Expand Up @@ -383,6 +398,41 @@ func New(cfg *config.C, logger *logp.Logger, auth authenticator.Authenticator) (
return &f, nil
}

// requestTrace decorates cli with an httplog.LoggingRoundTripper if cfg.Trace
// is non-nil.
func requestTrace(ctx context.Context, cli *http.Client, cfg graphConf, log *logp.Logger) *http.Client {
if cfg.Tracer == nil {
return cli
}
w := zapcore.AddSync(cfg.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
cfg.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

const margin = 1e3 // 1OkB ought to be enough room for all the remainder of the trace details.
maxSize := cfg.Tracer.MaxSize * 1e6
cli.Transport = httplog.NewLoggingRoundTripper(cli.Transport, traceLogger, max(0, maxSize-margin), log)
return cli
}

// sanitizeFileName returns name with ":" and "/" replaced with "_", removing
// repeated instances. The request.tracer.filename may have ":" when an input
// has cursor config and the macOS Finder will treat this as path-separator and
// causes to show up strange filepaths.
func sanitizeFileName(name string) string {
name = strings.ReplaceAll(name, ":", string(filepath.Separator))
name = filepath.Clean(name)
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

func formatQuery(name string, query []string, dflt string) string {
q := dflt
if len(query) != 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package graph
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/collections"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/authenticator/mock"
Expand All @@ -27,6 +29,8 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

var trace = flag.Bool("request_trace", false, "enable request tracing during tests")

var usersResponse1 = apiUserResponse{
Users: []userAPI{
{
Expand Down Expand Up @@ -313,11 +317,16 @@ func TestGraph_Groups(t *testing.T) {
rawConf := graphConf{
APIEndpoint: "http://" + testSrv.addr,
}
if *trace {
rawConf.Tracer = &lumberjack.Logger{
Filename: "test_trace-*.ndjson",
}
}
c, err := config.NewConfigFrom(&rawConf)
require.NoError(t, err)
auth := mock.New(mock.DefaultTokenValue)

f, err := New(c, logp.L(), auth)
f, err := New(context.Background(), t.Name(), c, logp.L(), auth)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down Expand Up @@ -372,11 +381,16 @@ func TestGraph_Users(t *testing.T) {
rawConf := graphConf{
APIEndpoint: "http://" + testSrv.addr,
}
if *trace {
rawConf.Tracer = &lumberjack.Logger{
Filename: "test_trace-*.ndjson",
}
}
c, err := config.NewConfigFrom(&rawConf)
require.NoError(t, err)
auth := mock.New(mock.DefaultTokenValue)

f, err := New(c, logp.L(), auth)
f, err := New(context.Background(), t.Name(), c, logp.L(), auth)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down Expand Up @@ -477,11 +491,16 @@ func TestGraph_Devices(t *testing.T) {
APIEndpoint: "http://" + testSrv.addr,
Select: test.selection,
}
if *trace {
rawConf.Tracer = &lumberjack.Logger{
Filename: "test_trace-*.ndjson",
}
}
c, err := config.NewConfigFrom(&rawConf)
require.NoError(t, err)
auth := mock.New(mock.DefaultTokenValue)

f, err := New(c, logp.L(), auth)
f, err := New(context.Background(), t.Name(), c, logp.L(), auth)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down

0 comments on commit 78f2fe1

Please sign in to comment.