Skip to content

Commit

Permalink
fix: read stream from config root
Browse files Browse the repository at this point in the history
  • Loading branch information
ishanarya0 committed Sep 11, 2024
1 parent 0fe1f26 commit 8ca8432
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions modules/dagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ const keySinkInfluxMeasurementName = "SINK_INFLUX_MEASUREMENT_NAME"
const keyRedisServer = "REDIS_SERVER"
const SourceKafkaConsumerConfigAutoCommitEnable = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"
const SourceKafkaConsumerConfigAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
const SinkTypeInflux = "influx"
const SinkTypeKafka = "kafka"
const SinkTypeInflux = "INFLUX"
const SinkTypeKafka = "KAFKA"
const keySinkKafkaBrokers = "SINK_KAFKA_BROKERS"
const keySinkKafkaStream = "SINK_KAFKA_STREAM"

Expand Down Expand Up @@ -83,6 +83,7 @@ type Stream struct {
SourceParquetFileDateRange interface{} `json:"SOURCE_PARQUET_FILE_DATE_RANGE"`
SourceParquetFilePaths interface{} `json:"SOURCE_PARQUET_FILE_PATHS"`
SourceKafkaConsumerConfigGroupID string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID"`
SourceKafkaConsumerConfigBootstrapServers string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"`
}

func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverConf) (*Config, error) {
Expand All @@ -93,12 +94,7 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
}

//transformation #1
streamsJSON := cfg.EnvVariables[keyStreams]
var streams []Stream
err = json.Unmarshal([]byte(streamsJSON), &streams)
if err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error())
}
streams := cfg.Streams

for i := range streams {
if len(streams[i].SourceDetails) == 0 {
Expand Down Expand Up @@ -150,6 +146,7 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
//Longbow related transformation skipped

//transformation #9 and #11
cfg.Streams = []Stream{}
for i := range streams {
streams[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
streams[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
Expand Down

0 comments on commit 8ca8432

Please sign in to comment.