Skip to content

Commit

Permalink
fix: kafka source overriding
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishan Arya committed Oct 7, 2024
1 parent 4301fdc commit 5e79f32
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions modules/dagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,15 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
if source[i].SourceKafkaConsumerConfigGroupID == "" {
source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
}
source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
if source[i].SourceKafkaConsumerConfigAutoCommitEnable == "" {
source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
}
if source[i].SourceKafkaConsumerConfigAutoOffsetReset == "" {
source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
}
if source[i].SourceKafkaConsumerConfigBootstrapServers == "" {
source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
}
}
}

Expand Down Expand Up @@ -247,10 +253,6 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
return nil, errors.ErrInvalid.WithMsgf("deployment_id must not have more than 53 chars")
}

//transformation #7
cfg.EnvVariables[keySinkInfluxURL] = flinkOut.Influx.URL
cfg.EnvVariables[keySinkInfluxPassword] = flinkOut.Influx.Password
cfg.EnvVariables[keySinkInfluxUsername] = flinkOut.Influx.Username
//SINK_INFLUX_DB_NAME is added by client
//SINK_INFLUX_MEASUREMENT_NAME is added by client
//REDIS_SERVER is skipped
Expand All @@ -272,6 +274,10 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
cfg.EnvVariables[keySinkKafkaKey] = cfg.Sink.SinkKafka.SinkKafkaProtoKey
cfg.EnvVariables[keySinkKafkaLingerMs] = cfg.Sink.SinkKafka.SinkKafkaLingerMs
} else if cfg.SinkType == SinkTypeInflux {
//transformation #7
cfg.EnvVariables[keySinkInfluxURL] = flinkOut.Influx.URL
cfg.EnvVariables[keySinkInfluxPassword] = flinkOut.Influx.Password
cfg.EnvVariables[keySinkInfluxUsername] = flinkOut.Influx.Username
cfg.EnvVariables[keySinkInfluxDBName] = cfg.Sink.SinkInflux.SinkInfluxDBName
cfg.EnvVariables[keySinkInfluxMeasurementName] = cfg.Sink.SinkInflux.SinkInfluxMeasurementName
} else if cfg.SinkType == SinkTypeBigquery {
Expand Down

0 comments on commit 5e79f32

Please sign in to comment.