diff --git a/modules/dagger/config.go b/modules/dagger/config.go index 6107d8f..bc82405 100644 --- a/modules/dagger/config.go +++ b/modules/dagger/config.go @@ -213,9 +213,15 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo if source[i].SourceKafkaConsumerConfigGroupID == "" { source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i) } - source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable] - source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset] - source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers] + if source[i].SourceKafkaConsumerConfigAutoCommitEnable == "" { + source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable] + } + if source[i].SourceKafkaConsumerConfigAutoOffsetReset == "" { + source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset] + } + if source[i].SourceKafkaConsumerConfigBootstrapServers == "" { + source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers] + } } } @@ -247,10 +253,6 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo return nil, errors.ErrInvalid.WithMsgf("deployment_id must not have more than 53 chars") } - //transformation #7 - cfg.EnvVariables[keySinkInfluxURL] = flinkOut.Influx.URL - cfg.EnvVariables[keySinkInfluxPassword] = flinkOut.Influx.Password - cfg.EnvVariables[keySinkInfluxUsername] = flinkOut.Influx.Username //SINK_INFLUX_DB_NAME is added by client //SINK_INFLUX_MEASUREMENT_NAME is added by client //REDIS_SERVER is skipped @@ -272,6 +274,10 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo cfg.EnvVariables[keySinkKafkaKey] = cfg.Sink.SinkKafka.SinkKafkaProtoKey cfg.EnvVariables[keySinkKafkaLingerMs] = cfg.Sink.SinkKafka.SinkKafkaLingerMs } else if cfg.SinkType == SinkTypeInflux { + //transformation #7 + cfg.EnvVariables[keySinkInfluxURL] = flinkOut.Influx.URL + cfg.EnvVariables[keySinkInfluxPassword] = flinkOut.Influx.Password + cfg.EnvVariables[keySinkInfluxUsername] = flinkOut.Influx.Username cfg.EnvVariables[keySinkInfluxDBName] = cfg.Sink.SinkInflux.SinkInfluxDBName cfg.EnvVariables[keySinkInfluxMeasurementName] = cfg.Sink.SinkInflux.SinkInfluxMeasurementName } else if cfg.SinkType == SinkTypeBigquery {