From 8ca8432747e31eb6177fe46a953b512a840b1252 Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Wed, 11 Sep 2024 12:50:00 +0530 Subject: [PATCH] fix: read stream from config root --- modules/dagger/config.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/modules/dagger/config.go b/modules/dagger/config.go index d106669..67d2d0d 100644 --- a/modules/dagger/config.go +++ b/modules/dagger/config.go @@ -25,8 +25,8 @@ const keySinkInfluxMeasurementName = "SINK_INFLUX_MEASUREMENT_NAME" const keyRedisServer = "REDIS_SERVER" const SourceKafkaConsumerConfigAutoCommitEnable = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE" const SourceKafkaConsumerConfigAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET" -const SinkTypeInflux = "influx" -const SinkTypeKafka = "kafka" +const SinkTypeInflux = "INFLUX" +const SinkTypeKafka = "KAFKA" const keySinkKafkaBrokers = "SINK_KAFKA_BROKERS" const keySinkKafkaStream = "SINK_KAFKA_STREAM" @@ -83,6 +83,7 @@ type Stream struct { SourceParquetFileDateRange interface{} `json:"SOURCE_PARQUET_FILE_DATE_RANGE"` SourceParquetFilePaths interface{} `json:"SOURCE_PARQUET_FILE_PATHS"` SourceKafkaConsumerConfigGroupID string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID"` + SourceKafkaConsumerConfigBootstrapServers string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"` } func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverConf) (*Config, error) { @@ -93,12 +94,7 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo } //transformation #1 - streamsJSON := cfg.EnvVariables[keyStreams] - var streams []Stream - err = json.Unmarshal([]byte(streamsJSON), &streams) - if err != nil { - return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error()) - } + streams := cfg.Streams for i := range streams { if len(streams[i].SourceDetails) == 0 { @@ -150,6 +146,7 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo //Longbow related transformation skipped //transformation #9 and #11 + cfg.Streams = []Stream{} for i := range streams { streams[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i) streams[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]