Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/cel: add ability to remove request trace logs #39969

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add request trace support for Okta and EntraID entity analytics providers. {pull}39821[39821]
- Fix handling of infinite rate values in CEL rate limit handling logic. {pull}39940[39940]
- Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929]
- Add ability to remove request trace logs from CEL input. {pull}39969[39969]

*Auditbeat*

Expand Down
18 changes: 12 additions & 6 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -675,17 +675,23 @@ The value of the response that specifies the maximum overall resource request ra
The maximum burst size. Burst is the maximum number of resource requests that can be made above the overall rate limit.

[float]
==== `resource.tracer.filename`
==== `resource.tracer.enable`

It is possible to log HTTP requests and responses in a CEL program to a local file-system for debugging configurations.
This option is enabled by setting the `resource.tracer.filename` value. Additional options are available to
tune log rotation behavior.

To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id.
For Example, `http-request-trace-*.ndjson`.
This option is enabled by setting `resource.tracer.enabled` to true and setting the `resource.tracer.filename` value.
Additional options are available to tune log rotation behavior. To delete existing logs, set `resource.tracer.enabled`
to false without unsetting the filename option.

Enabling this option compromises security and should only be used for debugging.

[float]
==== `resource.tracer.filename`

To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the
filename and will be replaced with the input instance id. For Example, `http-request-trace-*.ndjson`. Setting
`resource.tracer.filename` with `resource.tracer.enable` set to false will cause any existing trace logs matching
the filename option to be deleted.

[float]
==== `resource.tracer.maxsize`

Expand Down
11 changes: 10 additions & 1 deletion x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,16 @@ type ResourceConfig struct {

Transport httpcommon.HTTPTransportSettings `config:",inline"`

Tracer *lumberjack.Logger `config:"tracer"`
Tracer *tracerConfig `config:"tracer"`
}

type tracerConfig struct {
Enabled *bool `config:"enabled"`
lumberjack.Logger `config:",inline"`
}

func (t *tracerConfig) enabled() bool {
return t != nil && (t.Enabled == nil || *t.Enabled)
}

type urlConfig struct {
Expand Down
28 changes: 27 additions & 1 deletion x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
"errors"
"fmt"
"io"
"io/fs"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
"regexp"
Expand Down Expand Up @@ -717,6 +719,11 @@
return limit, true
}

// lumberjackTimestamp is a glob expression matching the time format string used
// by lumberjack when rolling over logs, "2006-01-02T15-04-05.000".
// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39
const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]"

func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry) (*http.Client, *httplog.LoggingRoundTripper, error) {
if !wantClient(cfg) {
return nil, nil, nil
Expand All @@ -740,7 +747,7 @@
}

var trace *httplog.LoggingRoundTripper
if cfg.Resource.Tracer != nil {
if cfg.Resource.Tracer.enabled() {
w := zapcore.AddSync(cfg.Resource.Tracer)
go func() {
// Close the logger when we are done.
Expand All @@ -758,6 +765,25 @@
maxSize := cfg.Resource.Tracer.MaxSize * 1e6
trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger, max(0, maxSize-margin), log)
c.Transport = trace
} else if cfg.Resource.Tracer != nil {
chrisberkhout marked this conversation as resolved.
Show resolved Hide resolved
// We have a trace log name, but we are not enabled,
// so remove all trace logs we own.
err = os.Remove(cfg.Resource.Tracer.Filename)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", cfg.Resource.Tracer.Filename, "error", err)
}
ext := filepath.Ext(cfg.Resource.Tracer.Filename)
base := strings.TrimSuffix(cfg.Resource.Tracer.Filename, ext)
paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext)
if err != nil {
log.Errorw("failed to collect request trace log path names", "error", err)
}
for _, p := range paths {
err = os.Remove(p)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", p, "error", err)
}
}
}

if reg != nil {
Expand Down Expand Up @@ -1213,7 +1239,7 @@
// walkMap walks to all ends of the provided path in m and applies fn to the
// final element of each walk. Nested arrays are not handled.
func walkMap(m mapstr.M, path string, fn func(parent mapstr.M, key string)) {
key, rest, more := strings.Cut(path, ".")

Check failure on line 1242 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

rest declared and not used (typecheck)
v, ok := m[key]
if !ok {
return
Expand Down
106 changes: 106 additions & 0 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var inputTests = []struct {
wantCursor []map[string]interface{}
wantErr error
wantFile string
wantNoFile string
}{
// Autonomous tests (no FS or net dependency).
{
Expand Down Expand Up @@ -1141,6 +1142,102 @@ var inputTests = []struct {
},
wantFile: filepath.Join("logs", "http-request-trace-test_id_tracer_filename_sanitization.ndjson"),
},
{
name: "tracer_filename_sanitization_enabled",
server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
server := httptest.NewServer(h)
config["resource.url"] = server.URL
t.Cleanup(server.Close)
},
config: map[string]interface{}{
"interval": 1,
"resource.tracer.enabled": true,
"resource.tracer.filename": "logs/http-request-trace-*.ndjson",
"state": map[string]interface{}{
"fake_now": "2002-10-02T15:00:00Z",
},
"program": `
// Use terse non-standard check for presence of timestamp. The standard
// alternative is to use has(state.cursor) && has(state.cursor.timestamp).
(!is_error(state.cursor.timestamp) ?
state.cursor.timestamp
:
timestamp(state.fake_now)-duration('10m')
).as(time_cursor,
string(state.url).parse_url().with_replace({
"RawQuery": {"$filter": ["alertCreationTime ge "+string(time_cursor)]}.format_query()
}).format_url().as(url, bytes(get(url).Body)).decode_json().as(event, {
"events": [event],
// Get the timestamp from the event if it exists, otherwise advance a little to break a request loop.
// Due to the name of the @timestamp field, we can't use has(), so use is_error().
"cursor": [{"timestamp": !is_error(event["@timestamp"]) ? event["@timestamp"] : time_cursor+duration('1s')}],

// Just for testing, cycle this back into the next state.
"fake_now": state.fake_now
}))
`,
chrisberkhout marked this conversation as resolved.
Show resolved Hide resolved
},
handler: dateCursorHandler(),
want: []map[string]interface{}{
{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"},
{"@timestamp": "2002-10-02T15:00:02Z", "foo": "bar"},
},
wantCursor: []map[string]interface{}{
{"timestamp": "2002-10-02T15:00:00Z"},
{"timestamp": "2002-10-02T15:00:01Z"},
{"timestamp": "2002-10-02T15:00:02Z"},
},
wantFile: filepath.Join("logs", "http-request-trace-test_id_tracer_filename_sanitization_enabled.ndjson"),
},
{
name: "tracer_filename_sanitization_disabled",
server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
server := httptest.NewServer(h)
config["resource.url"] = server.URL
t.Cleanup(server.Close)
},
config: map[string]interface{}{
"interval": 1,
"resource.tracer.enabled": false,
"resource.tracer.filename": "logs/http-request-trace-*.ndjson",
"state": map[string]interface{}{
"fake_now": "2002-10-02T15:00:00Z",
},
"program": `
// Use terse non-standard check for presence of timestamp. The standard
// alternative is to use has(state.cursor) && has(state.cursor.timestamp).
(!is_error(state.cursor.timestamp) ?
state.cursor.timestamp
:
timestamp(state.fake_now)-duration('10m')
).as(time_cursor,
string(state.url).parse_url().with_replace({
"RawQuery": {"$filter": ["alertCreationTime ge "+string(time_cursor)]}.format_query()
}).format_url().as(url, bytes(get(url).Body)).decode_json().as(event, {
"events": [event],
// Get the timestamp from the event if it exists, otherwise advance a little to break a request loop.
// Due to the name of the @timestamp field, we can't use has(), so use is_error().
"cursor": [{"timestamp": !is_error(event["@timestamp"]) ? event["@timestamp"] : time_cursor+duration('1s')}],

// Just for testing, cycle this back into the next state.
"fake_now": state.fake_now
}))
`,
},
handler: dateCursorHandler(),
want: []map[string]interface{}{
{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"},
{"@timestamp": "2002-10-02T15:00:02Z", "foo": "bar"},
},
wantCursor: []map[string]interface{}{
{"timestamp": "2002-10-02T15:00:00Z"},
{"timestamp": "2002-10-02T15:00:01Z"},
{"timestamp": "2002-10-02T15:00:02Z"},
},
wantNoFile: filepath.Join("logs", "http-request-trace-test_id_tracer_filename_sanitization_disabled*"),
},
{
name: "pagination_cursor_object",
server: newTestServer(httptest.NewServer),
Expand Down Expand Up @@ -1625,6 +1722,15 @@ func TestInput(t *testing.T) {
t.Errorf("expected log file not found: %v", err)
}
}
if test.wantNoFile != "" {
paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile))
if err != nil {
t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err)
}
if len(paths) != 0 {
t.Errorf("unexpected files found: %v", paths)
}
}
})
}
}
Expand Down
Loading