diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index da63b4565b67..562aba8b9559 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -282,6 +282,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of infinite rate values in CEL rate limit handling logic. {pull}39940[39940] - Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929] - Add ability to remove request trace logs from CEL input. {pull}39969[39969] +- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 6acceb3bd501..251163fe4324 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -349,17 +349,21 @@ The HTTP method handled by the endpoint. If specified, `method` must be `POST`, The default method is `POST`. If `PUT` or `PATCH` are specified, requests using those method types are accepted, but are treated as `POST` requests and are expected to have a request body containing the request data. [float] -==== `tracer.filename` +==== `tracer.enabled` It is possible to log HTTP requests to a local file-system for debugging configurations. -This option is enabled by setting the `tracer.filename` value. Additional options are available to -tune log rotation behavior. - -To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id. -For Example, `http-request-trace-*.ndjson`. +This option is enabled by setting `tracer.enabled` to true and setting the `tracer.filename` value. +Additional options are available to tune log rotation behavior. To delete existing logs, set `tracer.enabled` +to false without unsetting the filename option. 
Enabling this option compromises security and should only be used for debugging. +[float] +==== `tracer.filename` + +To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the +filename and will be replaced with the input instance id. For example, `http-request-trace-*.ndjson`. + [float] ==== `tracer.maxsize` diff --git a/x-pack/filebeat/input/http_endpoint/0001-review-changes.patch b/x-pack/filebeat/input/http_endpoint/0001-review-changes.patch new file mode 100644 index 000000000000..c6eca9fa6ebe --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/0001-review-changes.patch @@ -0,0 +1,221 @@ +From 4c19d586bba2ae0f37547f6411572d7da322e62c Mon Sep 17 00:00:00 2001 +From: Dan Kortschak +Date: Wed, 15 May 2024 07:17:13 +0930 +Subject: [PATCH] review changes + +--- + x-pack/filebeat/input/cel/input.go | 13 ++--- + x-pack/filebeat/input/cel/integration_test.go | 48 +++++++------------ + 2 files changed, 21 insertions(+), 40 deletions(-) + +diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go +index fc4a7e7e3d..89488ec172 100644 +--- a/x-pack/filebeat/input/cel/input.go ++++ b/x-pack/filebeat/input/cel/input.go +@@ -115,13 +115,12 @@ func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor + updateStatus(env.StatusReporter, status.Failed, "failed to run: "+err.Error()) + return err + } +- + updateStatus(env.StatusReporter, status.Stopped, "") + return nil + } + + // sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances. +-// The request.tracer.filename may have ":" when a httpjson input has cursor config and ++// The request.tracer.filename may have ":" when a cel input has cursor config and + // the macOS Finder will treat this as path-separator and causes to show up strange filepaths. 
+ func sanitizeFileName(name string) string { + name = strings.ReplaceAll(name, ":", string(filepath.Separator)) +@@ -181,6 +180,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p + goodURL := cfg.Resource.URL.String() + state["url"] = goodURL + metrics.resource.Set(goodURL) ++ updateStatus(rep, status.Running, "") + // On entry, state is expected to be in the shape: + // + // { +@@ -214,18 +214,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p + // In addition to this and the functions and globals available + // from mito/lib, a global, useragent, is available to use + // in requests. +- +- updateStatus(rep, status.Running, "") + err = periodically(ctx, cfg.Interval, func() error { + log.Info("process repeated request") + var ( + budget = *cfg.MaxExecutions + waitUntil time.Time + ) ++ // Keep track of whether CEL is degraded for this periodic run. ++ var isDegraded bool + for { +- // keep track if CEL is degraded for this iteration +- isDegraded := false +- + if wait := time.Until(waitUntil); wait > 0 { + // We have a special-case wait for when we have a zero limit. 
+ // x/time/rate allow a burst through even when the limit is zero +@@ -259,8 +256,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p + } + log.Errorw("failed evaluation", "error", err) + updateStatus(rep, status.Degraded, "failed evaluation: "+err.Error()) +- isDegraded = true + } ++ isDegraded = err != nil + metrics.celProcessingTime.Update(time.Since(start).Nanoseconds()) + if trace != nil { + log.Debugw("final transaction", "transaction.id", trace.TxID()) +diff --git a/x-pack/filebeat/input/cel/integration_test.go b/x-pack/filebeat/input/cel/integration_test.go +index 38e48991ee..709bb5d7b9 100644 +--- a/x-pack/filebeat/input/cel/integration_test.go ++++ b/x-pack/filebeat/input/cel/integration_test.go +@@ -56,7 +56,7 @@ func TestCheckinV2(t *testing.T) { + defer svrTwo.Close() + + // allStreams is an elastic-agent configuration with an ES output and one CEL +- // input with two streams ++ // input with two streams. + allStreams := []*proto.UnitExpected{ + { + Id: "output-unit", +@@ -138,7 +138,7 @@ func TestCheckinV2(t *testing.T) { + + // oneStream is an elastic-agent configuration with an ES output and one CEL + // input with one stream. Effectively this is the same as allStreams with +- // stream cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2 removed ++ // stream cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2 removed. + oneStream := []*proto.UnitExpected{ + { + Id: "output-unit", +@@ -204,7 +204,7 @@ func TestCheckinV2(t *testing.T) { + }, + } + +- // noStream is an elastic-agent configuration with just an ES output ++ // noStream is an elastic-agent configuration with just an ES output. 
+ noStream := []*proto.UnitExpected{ + { + Id: "output-unit", +@@ -253,9 +253,9 @@ func TestCheckinV2(t *testing.T) { + } + defer server.Stop() + ++ // It's necessary to change os.Args so filebeat.Filebeat() can read the ++ // appropriate args at beat.Execute() + initialOSArgs := os.Args +- // necessary to change this so filebeat.Filebeat() can read the appropriate args +- // at beat.Execute() + os.Args = []string{ + "filebeat", + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), +@@ -278,7 +278,7 @@ func TestCheckinV2(t *testing.T) { + // of units expected for the server to respond with. + checks := []func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected){ + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all healthy ++ // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams +@@ -304,7 +304,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for one degraded ++ // Wait for one degraded. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams +@@ -329,7 +329,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all degraded ++ // Wait for all degraded. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams +@@ -355,7 +355,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all healthy ++ // Wait for all healthy. 
+ unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams +@@ -380,7 +380,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all healthy ++ // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams +@@ -436,7 +436,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all healthy ++ // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams +@@ -473,39 +473,24 @@ func TestCheckinV2(t *testing.T) { + }, + } + +- timer := time.NewTimer(3 * time.Minute) ++ const wait = 3 * time.Minute ++ timer := time.NewTimer(wait) + defer timer.Stop() +- for { ++ for len(checks) == 0 { + select { + case observed := <-observedStates: + matched, expected := checks[0](t, observed) +- + expectedUnits <- expected +- +- // if not matched, do not proceed to the next check + if !matched { + continue + } +- +- // check returned true, so reset the timer +- timer.Reset(3 * time.Minute) +- +- // proceed to the next check +- if len(checks) > 0 { +- checks = checks[1:] +- } +- +- // if no more checks, return +- if len(checks) == 0 { +- return +- } ++ timer.Reset(wait) ++ checks = checks[1:] + case err := <-beatRunErr: + if err != nil { + t.Fatalf("beat run err: %v", err) + } + case <-timer.C: +- // a check hasn't returned true for the whole timeout +- // so fail + t.Fatal("timeout waiting for checkin") + } + } +@@ -513,7 +498,6 @@ func TestCheckinV2(t *testing.T) { + } + + func extractStateAndPayload(observed *proto.CheckinObserved, inputID string) (proto.State, 
map[string]interface{}) { +- + for _, unit := range observed.GetUnits() { + if unit.Id == inputID { + return unit.GetState(), unit.Payload.AsMap() +-- +2.34.1 + diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 1618dc907583..5e1c93d2bc36 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -48,7 +48,16 @@ type config struct { CRCSecret string `config:"crc.secret"` IncludeHeaders []string `config:"include_headers"` PreserveOriginalEvent bool `config:"preserve_original_event"` - Tracer *lumberjack.Logger `config:"tracer"` + Tracer *tracerConfig `config:"tracer"` +} + +type tracerConfig struct { + Enabled *bool `config:"enabled"` + lumberjack.Logger `config:",inline"` +} + +func (t *tracerConfig) enabled() bool { + return t != nil && (t.Enabled == nil || *t.Enabled) } func defaultConfig() config { diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index b41d20e72c2b..f108420fd638 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -473,7 +473,7 @@ func Test_apiResponse(t *testing.T) { pub := new(publisher) metrics := newInputMetrics("") defer metrics.Close() - apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub.Publish, logp.NewLogger("http_endpoint.test"), metrics) + apiHandler := newHandler(ctx, newTracerConfig(tc.name, tc.conf, *withTraces), nil, pub.Publish, logp.NewLogger("http_endpoint.test"), metrics) // Execute handler. 
respRec := httptest.NewRecorder() @@ -491,12 +491,12 @@ func Test_apiResponse(t *testing.T) { } } -func tracerConfig(name string, cfg config, withTrace bool) config { +func newTracerConfig(name string, cfg config, withTrace bool) config { if !withTrace { return cfg } - cfg.Tracer = &lumberjack.Logger{ + cfg.Tracer = &tracerConfig{Logger: lumberjack.Logger{ Filename: filepath.Join(traceLogsDir, name+".ndjson"), - } + }} return cfg } diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 7f0440deb601..263188a481ee 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -13,9 +13,11 @@ import ( "encoding/json" "errors" "fmt" + "io/fs" "net" "net/http" "net/url" + "os" "path/filepath" "reflect" "strings" @@ -353,7 +355,7 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event preserveOriginalEvent: c.PreserveOriginalEvent, crc: newCRC(c.CRCProvider, c.CRCSecret), } - if c.Tracer != nil { + if c.Tracer.enabled() { w := zapcore.AddSync(c.Tracer) go func() { // Close the logger when we are done. @@ -372,10 +374,34 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event } else { h.scheme = "http" } + } else if c.Tracer != nil { + // We have a trace log name, but we are not enabled, + // so remove all trace logs we own. 
+ err := os.Remove(c.Tracer.Filename) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("failed to remove request trace log", "path", c.Tracer.Filename, "error", err) + } + ext := filepath.Ext(c.Tracer.Filename) + base := strings.TrimSuffix(c.Tracer.Filename, ext) + paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext) + if err != nil { + log.Errorw("failed to collect request trace log path names", "error", err) + } + for _, p := range paths { + err = os.Remove(p) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("failed to remove request trace log", "path", p, "error", err) + } + } } return h } +// lumberjackTimestamp is a glob expression matching the time format string used +// by lumberjack when rolling over logs, "2006-01-02T15-04-05.000". +// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39 +const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]" + func htmlEscape(s string) string { var buf bytes.Buffer json.HTMLEscape(&buf, []byte(s))