diff --git a/modules/firehose/config.go b/modules/firehose/config.go index 3428c81b..9f17e939 100644 --- a/modules/firehose/config.go +++ b/modules/firehose/config.go @@ -50,6 +50,9 @@ type Config struct { // EnvVariables contains all the firehose environment config values. EnvVariables map[string]string `json:"env_variables,omitempty"` + // ResetOffset represents the value to which kafka consumer offset was set to + ResetOffset string `json:"reset_offset,omitempty"` + Limits UsageSpec `json:"limits,omitempty"` Requests UsageSpec `json:"requests,omitempty"` Telegraf *Telegraf `json:"telegraf,omitempty"` diff --git a/modules/firehose/driver_plan.go b/modules/firehose/driver_plan.go index 6b590abb..3fafb940 100644 --- a/modules/firehose/driver_plan.go +++ b/modules/firehose/driver_plan.go @@ -130,6 +130,14 @@ func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.Acti immediately := fd.timeNow() + curConf, err := readConfig(exr.Resource, exr.Resource.Spec.Configs, fd.conf) + if err != nil { + return nil, err + } + + curConf.ResetOffset = resetValue + + exr.Resource.Spec.Configs = mustJSON(curConf) exr.Resource.State = resource.State{ Status: resource.StatusPending, Output: exr.Resource.State.Output, diff --git a/modules/firehose/driver_plan_test.go b/modules/firehose/driver_plan_test.go index 7233dde7..fcb8b8ab 100644 --- a/modules/firehose/driver_plan_test.go +++ b/modules/firehose/driver_plan_test.go @@ -368,6 +368,7 @@ func TestFirehoseDriver_Plan(t *testing.T) { "SOURCE_KAFKA_BROKERS": "localhost:9092", "SOURCE_KAFKA_TOPIC": "foo-log", }, + "reset_offset": "latest", "limits": map[string]any{ "cpu": "200m", "memory": "512Mi", @@ -376,6 +377,7 @@ func TestFirehoseDriver_Plan(t *testing.T) { "cpu": "200m", "memory": "512Mi", }, + "stopped": false, }), }, State: resource.State{