diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/FileDataSourceReader.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/FileDataSourceReader.scala index 30b6bf09..a5e81650 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/FileDataSourceReader.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/FileDataSourceReader.scala @@ -92,7 +92,8 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil .options(otherOptions) .schema(csvSchema.orNull) .csv(finalPath) - case SourceFileFormats.JSON => + // assume that each line in the txt files contains a separate JSON object. + case SourceFileFormats.JSON | SourceFileFormats.TXT=> if(sourceSettings.asStream) spark.readStream.options(mappingSource.options).schema(schema.orNull).json(finalPath) // add a dummy column called 'filename' to print a log when the data reading is started for a file diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala index 43714ed6..c8709acb 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala @@ -496,6 +496,34 @@ class FhirMappingJobManager( .toSeq // Convert to Seq[Resource] } } + + override def executeMappingJobAndReturn(mappingJobExecution: FhirMappingJobExecution, + sourceSettings: Map[String, DataSourceSettings], + terminologyServiceSettings: Option[TerminologyServiceSettings], + identityServiceSettings: Option[IdentityServiceSettings], + taskCompletionCallback: () => Unit): Future[Dataset[FhirMappingResult]] = { + import spark.implicits._ + // create an initial empty DataFrame + val initialDataFrame: Dataset[FhirMappingResult] = spark.emptyDataset[FhirMappingResult] + // fold over the mapping tasks to chain the futures sequentially + mappingJobExecution.mappingTasks.foldLeft(Future.successful(initialDataFrame)) { (accFuture, task) => + accFuture.flatMap { accDataFrame => + logger.info(s"Executing mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}") + readSourceAndExecuteTask(mappingJobExecution.job.id, task, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId)) + .map { dataFrame => + logger.info(s"Completed the execution of mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}") + // notify the caller that the mapping task execution is complete by invoking the taskCompletionCallback function + taskCompletionCallback() + // combine the accumulated DataFrame with the current task's DataFrame + accDataFrame.union(dataFrame) + }.recover { + case e: Throwable => + logger.error(s"Failed to execute mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}",e) + throw e + } + } + } + } } diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/IFhirMappingJobManager.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/IFhirMappingJobManager.scala index dcd6d642..f3a58f95 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/IFhirMappingJobManager.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/IFhirMappingJobManager.scala @@ -3,6 +3,7 @@ package io.tofhir.engine.mapping import java.time.LocalDateTime import io.tofhir.engine.model._ +import org.apache.spark.sql.Dataset import org.apache.spark.sql.streaming.StreamingQuery import scala.concurrent.Future @@ -97,4 +98,22 @@ trait IFhirMappingJobManager { terminologyServiceSettings: Option[TerminologyServiceSettings] = None, identityServiceSettings: Option[IdentityServiceSettings] = None ): Future[Seq[FhirMappingResult]] + + /** + * Executes the specified FHIR mapping job and returns the resulting FhirMappingResult dataset. + * + * @param mappingJobExecution The FHIR Mapping Job execution details. + * @param sourceSettings A map containing settings for the source system(s). + * @param terminologyServiceSettings Optional settings for the terminology service to use within mappings (e.g., for lookupDisplay). + * @param identityServiceSettings Optional settings for the identity service to use within mappings (e.g., for resolveIdentifier). + * @param taskCompletionCallback A callback function to be invoked when a mapping task execution is completed. + * @return A Future containing a Dataset of FhirMappingResult representing the outcome of the mapping job. + */ + def executeMappingJobAndReturn(mappingJobExecution: FhirMappingJobExecution, + sourceSettings: Map[String, DataSourceSettings], + terminologyServiceSettings: Option[TerminologyServiceSettings] = None, + identityServiceSettings: Option[IdentityServiceSettings] = None, + taskCompletionCallback: () => Unit + ): Future[Dataset[FhirMappingResult]] + } diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala index a88aa844..4fe18279 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala @@ -17,6 +17,8 @@ import java.util.regex.Pattern * @param mappingTasks List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job) * @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs */ +// TODO: The FhirMappingJobExecution model currently includes the entire FhirMappingJob ('job' field), which is unnecessary. +// We should remove the 'job' field from the model. Instead, add only the necessary fields to the model. case class FhirMappingJobExecution(id: String = UUID.randomUUID().toString, projectId: String = "", job: FhirMappingJob, diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala index 02cc7d84..99144acb 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala @@ -126,5 +126,6 @@ object SourceFileFormats { final val PARQUET = "parquet" final val JSON = "json" final val AVRO = "avro" + final val TXT = "txt" }