From 6a5b42c1ed30a95ec5298d769278c422d8ec9839 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 25 Jun 2024 10:42:44 +0930 Subject: [PATCH] x-pack/filebeat/input/http_endpoint: add ability to remove request trace logs This is essentially a replay of #39969, but for the http_endpoint input. The previous configuration system did not allow users to remove trace logs from agents after they are no longer needed. This is potential security risk as it leaves potentially sensitive information on the file system beyond its required lifetime. The mechanism for communicating to the input whether to write logs is not currently powerful enough to indicate that existing logs should be removed without deleting logs from other instances. So add an enabled configuration option to allow the target name to be sent independently of whether the files should be written or removed. The new option is optional, defaulting to the previous behaviour so that it can be opted into via progressive repair in the client integrations. --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-http-endpoint.asciidoc | 16 +- .../http_endpoint/0001-review-changes.patch | 221 ++++++++++++++++++ x-pack/filebeat/input/http_endpoint/config.go | 11 +- .../input/http_endpoint/handler_test.go | 8 +- x-pack/filebeat/input/http_endpoint/input.go | 28 ++- 6 files changed, 273 insertions(+), 12 deletions(-) create mode 100644 x-pack/filebeat/input/http_endpoint/0001-review-changes.patch diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index da63b4565b67..562aba8b9559 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -282,6 +282,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of infinite rate values in CEL rate limit handling logic. {pull}39940[39940] - Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929] - Add ability to remove request trace logs from CEL input. {pull}39969[39969] +- Add ability to remove request trace logs from http_endpoint input. {pull}[] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 6acceb3bd501..251163fe4324 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -349,17 +349,21 @@ The HTTP method handled by the endpoint. If specified, `method` must be `POST`, The default method is `POST`. If `PUT` or `PATCH` are specified, requests using those method types are accepted, but are treated as `POST` requests and are expected to have a request body containing the request data. [float] -==== `tracer.filename` +==== `tracer.enabled` It is possible to log HTTP requests to a local file-system for debugging configurations. -This option is enabled by setting the `tracer.filename` value. Additional options are available to -tune log rotation behavior. - -To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id. -For Example, `http-request-trace-*.ndjson`. +This option is enabled by setting `tracer.enabled` to true and setting the `tracer.filename` value. +Additional options are available to tune log rotation behavior. To delete existing logs, set `tracer.enabled` +to false without unsetting the filename option. Enabling this option compromises security and should only be used for debugging. +[float] +==== `tracer.filename` + +To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the +filename and will be replaced with the input instance id. For Example, `http-request-trace-*.ndjson`. + [float] ==== `tracer.maxsize` diff --git a/x-pack/filebeat/input/http_endpoint/0001-review-changes.patch b/x-pack/filebeat/input/http_endpoint/0001-review-changes.patch new file mode 100644 index 000000000000..c6eca9fa6ebe --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/0001-review-changes.patch @@ -0,0 +1,221 @@ +From 4c19d586bba2ae0f37547f6411572d7da322e62c Mon Sep 17 00:00:00 2001 +From: Dan Kortschak +Date: Wed, 15 May 2024 07:17:13 +0930 +Subject: [PATCH] review changes + +--- + x-pack/filebeat/input/cel/input.go | 13 ++--- + x-pack/filebeat/input/cel/integration_test.go | 48 +++++++------------ + 2 files changed, 21 insertions(+), 40 deletions(-) + +diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go +index fc4a7e7e3d..89488ec172 100644 +--- a/x-pack/filebeat/input/cel/input.go ++++ b/x-pack/filebeat/input/cel/input.go +@@ -115,13 +115,12 @@ func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor + updateStatus(env.StatusReporter, status.Failed, "failed to run: "+err.Error()) + return err + } +- + updateStatus(env.StatusReporter, status.Stopped, "") + return nil + } + + // sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances. +-// The request.tracer.filename may have ":" when a httpjson input has cursor config and ++// The request.tracer.filename may have ":" when a cel input has cursor config and + // the macOS Finder will treat this as path-separator and causes to show up strange filepaths. + func sanitizeFileName(name string) string { + name = strings.ReplaceAll(name, ":", string(filepath.Separator)) +@@ -181,6 +180,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p + goodURL := cfg.Resource.URL.String() + state["url"] = goodURL + metrics.resource.Set(goodURL) ++ updateStatus(rep, status.Running, "") + // On entry, state is expected to be in the shape: + // + // { +@@ -214,18 +214,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p + // In addition to this and the functions and globals available + // from mito/lib, a global, useragent, is available to use + // in requests. +- +- updateStatus(rep, status.Running, "") + err = periodically(ctx, cfg.Interval, func() error { + log.Info("process repeated request") + var ( + budget = *cfg.MaxExecutions + waitUntil time.Time + ) ++ // Keep track of whether CEL is degraded for this periodic run. ++ var isDegraded bool + for { +- // keep track if CEL is degraded for this iteration +- isDegraded := false +- + if wait := time.Until(waitUntil); wait > 0 { + // We have a special-case wait for when we have a zero limit. + // x/time/rate allow a burst through even when the limit is zero +@@ -259,8 +256,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p + } + log.Errorw("failed evaluation", "error", err) + updateStatus(rep, status.Degraded, "failed evaluation: "+err.Error()) +- isDegraded = true + } ++ isDegraded = err != nil + metrics.celProcessingTime.Update(time.Since(start).Nanoseconds()) + if trace != nil { + log.Debugw("final transaction", "transaction.id", trace.TxID()) +diff --git a/x-pack/filebeat/input/cel/integration_test.go b/x-pack/filebeat/input/cel/integration_test.go +index 38e48991ee..709bb5d7b9 100644 +--- a/x-pack/filebeat/input/cel/integration_test.go ++++ b/x-pack/filebeat/input/cel/integration_test.go +@@ -56,7 +56,7 @@ func TestCheckinV2(t *testing.T) { + defer svrTwo.Close() + + // allStreams is an elastic-agent configuration with an ES output and one CEL +- // input with two streams ++ // input with two streams. + allStreams := []*proto.UnitExpected{ + { + Id: "output-unit", +@@ -138,7 +138,7 @@ func TestCheckinV2(t *testing.T) { + + // oneStream is an elastic-agent configuration with an ES output and one CEL + // input with one stream. Effectively this is the same as allStreams with +- // stream cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2 removed ++ // stream cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2 removed. + oneStream := []*proto.UnitExpected{ + { + Id: "output-unit", +@@ -204,7 +204,7 @@ func TestCheckinV2(t *testing.T) { + }, + } + +- // noStream is an elastic-agent configuration with just an ES output ++ // noStream is an elastic-agent configuration with just an ES output. + noStream := []*proto.UnitExpected{ + { + Id: "output-unit", +@@ -253,9 +253,9 @@ func TestCheckinV2(t *testing.T) { + } + defer server.Stop() + ++ // It's necessary to change os.Args so filebeat.Filebeat() can read the ++ // appropriate args at beat.Execute() + initialOSArgs := os.Args +- // necessary to change this so filebeat.Filebeat() can read the appropriate args +- // at beat.Execute() + os.Args = []string{ + "filebeat", + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), +@@ -278,7 +278,7 @@ func TestCheckinV2(t *testing.T) { + // of units expected for the server to respond with. + checks := []func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected){ + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all healthy ++ // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams +@@ -304,7 +304,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for one degraded ++ // Wait for one degraded. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams +@@ -329,7 +329,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all degraded ++ // Wait for all degraded. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams +@@ -355,7 +355,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all healthy ++ // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams +@@ -380,7 +380,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all healthy ++ // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams +@@ -436,7 +436,7 @@ func TestCheckinV2(t *testing.T) { + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { +- // wait for all healthy ++ // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams +@@ -473,39 +473,24 @@ func TestCheckinV2(t *testing.T) { + }, + } + +- timer := time.NewTimer(3 * time.Minute) ++ const wait = 3 * time.Minute ++ timer := time.NewTimer(wait) + defer timer.Stop() +- for { ++ for len(checks) == 0 { + select { + case observed := <-observedStates: + matched, expected := checks[0](t, observed) +- + expectedUnits <- expected +- +- // if not matched, do not proceed to the next check + if !matched { + continue + } +- +- // check returned true, so reset the timer +- timer.Reset(3 * time.Minute) +- +- // proceed to the next check +- if len(checks) > 0 { +- checks = checks[1:] +- } +- +- // if no more checks, return +- if len(checks) == 0 { +- return +- } ++ timer.Reset(wait) ++ checks = checks[1:] + case err := <-beatRunErr: + if err != nil { + t.Fatalf("beat run err: %v", err) + } + case <-timer.C: +- // a check hasn't returned true for the whole timeout +- // so fail + t.Fatal("timeout waiting for checkin") + } + } +@@ -513,7 +498,6 @@ func TestCheckinV2(t *testing.T) { + } + + func extractStateAndPayload(observed *proto.CheckinObserved, inputID string) (proto.State, map[string]interface{}) { +- + for _, unit := range observed.GetUnits() { + if unit.Id == inputID { + return unit.GetState(), unit.Payload.AsMap() +-- +2.34.1 + diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 1618dc907583..5e1c93d2bc36 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -48,7 +48,16 @@ type config struct { CRCSecret string `config:"crc.secret"` IncludeHeaders []string `config:"include_headers"` PreserveOriginalEvent bool `config:"preserve_original_event"` - Tracer *lumberjack.Logger `config:"tracer"` + Tracer *tracerConfig `config:"tracer"` +} + +type tracerConfig struct { + Enabled *bool `config:"enabled"` + lumberjack.Logger `config:",inline"` +} + +func (t *tracerConfig) enabled() bool { + return t != nil && (t.Enabled == nil || *t.Enabled) } func defaultConfig() config { diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index b41d20e72c2b..f108420fd638 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -473,7 +473,7 @@ func Test_apiResponse(t *testing.T) { pub := new(publisher) metrics := newInputMetrics("") defer metrics.Close() - apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub.Publish, logp.NewLogger("http_endpoint.test"), metrics) + apiHandler := newHandler(ctx, newTracerConfig(tc.name, tc.conf, *withTraces), nil, pub.Publish, logp.NewLogger("http_endpoint.test"), metrics) // Execute handler. respRec := httptest.NewRecorder() @@ -491,12 +491,12 @@ func Test_apiResponse(t *testing.T) { } } -func tracerConfig(name string, cfg config, withTrace bool) config { +func newTracerConfig(name string, cfg config, withTrace bool) config { if !withTrace { return cfg } - cfg.Tracer = &lumberjack.Logger{ + cfg.Tracer = &tracerConfig{Logger: lumberjack.Logger{ Filename: filepath.Join(traceLogsDir, name+".ndjson"), - } + }} return cfg } diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 7f0440deb601..263188a481ee 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -13,9 +13,11 @@ import ( "encoding/json" "errors" "fmt" + "io/fs" "net" "net/http" "net/url" + "os" "path/filepath" "reflect" "strings" @@ -353,7 +355,7 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event preserveOriginalEvent: c.PreserveOriginalEvent, crc: newCRC(c.CRCProvider, c.CRCSecret), } - if c.Tracer != nil { + if c.Tracer.enabled() { w := zapcore.AddSync(c.Tracer) go func() { // Close the logger when we are done. @@ -372,10 +374,34 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event } else { h.scheme = "http" } + } else if c.Tracer != nil { + // We have a trace log name, but we are not enabled, + // so remove all trace logs we own. + err := os.Remove(c.Tracer.Filename) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("failed to remove request trace log", "path", c.Tracer.Filename, "error", err) + } + ext := filepath.Ext(c.Tracer.Filename) + base := strings.TrimSuffix(c.Tracer.Filename, ext) + paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext) + if err != nil { + log.Errorw("failed to collect request trace log path names", "error", err) + } + for _, p := range paths { + err = os.Remove(p) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("failed to remove request trace log", "path", p, "error", err) + } + } } return h } +// lumberjackTimestamp is a glob expression matching the time format string used +// by lumberjack when rolling over logs, "2006-01-02T15-04-05.000". +// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39 +const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]" + func htmlEscape(s string) string { var buf bytes.Buffer json.HTMLEscape(&buf, []byte(s))