diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a8008f0e58b..16db23a1f62 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027] - Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008] - Fix handling of region name configuration in awss3 input {pull}36034[36034] +- Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107] *Heartbeat* diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 24c9684def1..56eadf2f894 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -76,7 +76,17 @@ func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { } } -type input struct{} +type input struct { + time func() time.Time +} + +// now is time.Now with a modifiable time source. +func (i input) now() time.Time { + if i.time == nil { + return time.Now() + } + return i.time() +} func (input) Name() string { return inputName } @@ -110,7 +120,7 @@ func sanitizeFileName(name string) string { return strings.ReplaceAll(name, string(filepath.Separator), "_") } -func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error { +func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error { cfg := src.cfg log := env.Logger.With("input_url", cfg.Resource.URL) @@ -221,8 +231,8 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub // Process a set of event requests. log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete}) metrics.executions.Add(1) - start := time.Now() - state, err = evalWith(ctx, prg, state) + start := i.now() + state, err = evalWith(ctx, prg, state, start) log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete}) if err != nil { switch { @@ -899,8 +909,20 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi return prg, nil } -func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) { - out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state}) +func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}, now time.Time) (map[string]interface{}, error) { + out, _, err := prg.ContextEval(ctx, map[string]interface{}{ + // Replace global program "now" with current time. This is necessary + // as the lib.Time now global is static at program instantiation time + // which will persist over multiple evaluations. The lib.Time behaviour + // is correct for mito where CEL program instances live for only a + // single evaluation. Rather than incurring the cost of creating a new + // cel.Program for each evaluation, shadow lib.Time's now with a new + // value for each eval. We retain the lib.Time now global for + // compatibility between CEL programs developed in mito with programs + // run in the input. + "now": now, + root: state, + }) if e := ctx.Err(); e != nil { err = e } diff --git a/x-pack/filebeat/input/cel/input_test.go b/x-pack/filebeat/input/cel/input_test.go index 0ab48bbfe21..4db00b26ad4 100644 --- a/x-pack/filebeat/input/cel/input_test.go +++ b/x-pack/filebeat/input/cel/input_test.go @@ -36,6 +36,7 @@ var inputTests = []struct { server func(*testing.T, http.HandlerFunc, map[string]interface{}) handler http.HandlerFunc config map[string]interface{} + time func() time.Time persistCursor map[string]interface{} want []map[string]interface{} wantCursor []map[string]interface{} @@ -57,6 +58,23 @@ var inputTests = []struct { {"message": "Hello, World!"}, }, }, + { + name: "hello_world_time", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":{"Hello, World!": now}}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + }, + time: func() time.Time { return time.Date(2010, 2, 8, 0, 0, 0, 0, time.UTC) }, + want: []map[string]interface{}{{ + "message": map[string]interface{}{ + "Hello, World!": "2010-02-08T00:00:00Z", + }, + }}, + }, { name: "bad_events_type", config: map[string]interface{}{ @@ -1238,7 +1256,7 @@ func TestInput(t *testing.T) { cancel() } } - err = input{}.run(v2Ctx, src, test.persistCursor, &client) + err = input{test.time}.run(v2Ctx, src, test.persistCursor, &client) if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr) }