diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariable.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariable.scala index 70b7c684..5f75b9d1 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariable.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariable.scala @@ -4,4 +4,6 @@ object EnvironmentVariable extends Enumeration { type EnvironmentVariable = Value final val FHIR_REPO_URL = Value("FHIR_REPO_URL") final val DATA_FOLDER_PATH= Value("DATA_FOLDER_PATH") + final val SOURCE_URL = Value("SOURCE_URL") + final val REDCAP_PROJECT_ID= Value("REDCAP_PROJECT_ID") } diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariableResolver.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariableResolver.scala index e03bf208..97fe77d1 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariableResolver.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariableResolver.scala @@ -1,6 +1,6 @@ package io.tofhir.engine.env -import io.tofhir.engine.model.{FhirMappingJob, FhirRepositorySinkSettings, FhirSinkSettings, FileSystemSourceSettings, MappingJobSourceSettings} +import io.tofhir.engine.model.{FhirMappingJob, FhirMappingTask, FhirRepositorySinkSettings, FhirSinkSettings, FileSystemSourceSettings, KafkaSource, KafkaSourceSettings, MappingJobSourceSettings} /** * Provide functions to find and resolve the environment variables used within FhirMappingJob definitions. @@ -49,7 +49,8 @@ object EnvironmentVariableResolver { def resolveFhirMappingJob(job: FhirMappingJob): FhirMappingJob = { job.copy( sourceSettings = resolveSourceSettings(job.sourceSettings), - sinkSettings = resolveSinkSettings(job.sinkSettings) + sinkSettings = resolveSinkSettings(job.sinkSettings), + mappings = resolveMappings(job.mappings) ) } @@ -64,11 +65,32 @@ object EnvironmentVariableResolver { key -> (value match { case fs: FileSystemSourceSettings => fs.copy(dataFolderPath = resolveEnvironmentVariables(fs.dataFolderPath)) + case ks: KafkaSourceSettings if ks.asRedCap => + ks.copy(sourceUri = resolveEnvironmentVariables(ks.sourceUri)) case other => other // No changes for other types }) } } + /** + * Resolves environment variables in mappings. + * + * @param mappings The list of mapping tasks + * @return The resolved mappings + */ + private def resolveMappings(mappings: Seq[FhirMappingTask]): Seq[FhirMappingTask] = { + mappings.map { mapping => + val updatedSourceBinding = mapping.sourceBinding.map { case (key, source) => + key -> (source match { + case kafkaSource: KafkaSource => + kafkaSource.copy(topicName = resolveEnvironmentVariables(kafkaSource.topicName)) + case other => other // No changes for other source types + }) + } + + mapping.copy(sourceBinding = updatedSourceBinding) + } + } /** * Resolves environment variables in FhirRepositorySinkSettings.