✨ Extend FhirMappingJobManager with a function to execute a job and return its results without writing them to a sink
dogukan10 committed Jun 24, 2024
1 parent 9d0e467 commit 8891fd7
Showing 5 changed files with 52 additions and 1 deletion.
@@ -92,7 +92,8 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
        .options(otherOptions)
        .schema(csvSchema.orNull)
        .csv(finalPath)
      case SourceFileFormats.JSON =>
      // Assume that each line in a .txt file contains a separate JSON object (i.e., JSON Lines).
      case SourceFileFormats.JSON | SourceFileFormats.TXT =>
        if (sourceSettings.asStream)
          spark.readStream.options(mappingSource.options).schema(schema.orNull).json(finalPath)
        // add a dummy column called 'filename' to log when data reading starts for a file
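With this change, .txt sources go through Spark's JSON reader, which parses any text file line by line (one JSON object per line, i.e., JSON Lines). A minimal standalone sketch of that behavior, assuming a local SparkSession and an illustrative data.txt file, neither of which is part of this commit:

import org.apache.spark.sql.SparkSession

// Assumptions: a local SparkSession and a file named data.txt whose lines each
// hold one JSON object, e.g. {"id":"p1","name":"Ada"}. Spark's JSON source does
// not care about the file extension, so .txt files need no extra handling.
val spark = SparkSession.builder().appName("txt-as-json-lines").master("local[*]").getOrCreate()
val df = spark.read.json("data.txt")
df.printSchema()
df.show()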
@@ -496,6 +496,34 @@ class FhirMappingJobManager(
        .toSeq // Convert to Seq[Resource]
    }
  }

  override def executeMappingJobAndReturn(mappingJobExecution: FhirMappingJobExecution,
                                          sourceSettings: Map[String, DataSourceSettings],
                                          terminologyServiceSettings: Option[TerminologyServiceSettings],
                                          identityServiceSettings: Option[IdentityServiceSettings],
                                          taskCompletionCallback: () => Unit): Future[Dataset[FhirMappingResult]] = {
    import spark.implicits._
    // create an initial empty Dataset to accumulate the results
    val initialDataFrame: Dataset[FhirMappingResult] = spark.emptyDataset[FhirMappingResult]
    // fold over the mapping tasks to chain their futures sequentially
    mappingJobExecution.mappingTasks.foldLeft(Future.successful(initialDataFrame)) { (accFuture, task) =>
      accFuture.flatMap { accDataFrame =>
        logger.info(s"Executing mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}")
        readSourceAndExecuteTask(mappingJobExecution.job.id, task, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
          .map { dataFrame =>
            logger.info(s"Completed the execution of mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}")
            // notify the caller that this mapping task has completed
            taskCompletionCallback()
            // combine the accumulated Dataset with the current task's results
            accDataFrame.union(dataFrame)
          }.recover {
            case e: Throwable =>
              logger.error(s"Failed to execute mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}", e)
              throw e
          }
      }
    }
  }
}
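The foldLeft above is what makes the tasks run strictly one after another: each task's Future is created only inside the previous Future's flatMap, and each task's Dataset is unioned into the accumulator. A stripped-down sketch of the same chaining pattern, where runTask and the Int payloads are placeholders for readSourceAndExecuteTask and FhirMappingResult:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Placeholder for readSourceAndExecuteTask: runs one task asynchronously.
def runTask(id: Int): Future[Seq[Int]] = Future(Seq(id))

// Tasks are chained sequentially: runTask(id) is invoked only after the
// previous Future completes, mirroring the Dataset accumulation above.
val combined: Future[Seq[Int]] =
  Seq(1, 2, 3).foldLeft(Future.successful(Seq.empty[Int])) { (acc, id) =>
    acc.flatMap(results => runTask(id).map(results ++ _))
  }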


@@ -3,6 +3,7 @@ package io.tofhir.engine.mapping
import java.time.LocalDateTime

import io.tofhir.engine.model._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery

import scala.concurrent.Future
@@ -97,4 +98,22 @@ trait IFhirMappingJobManager {
                                 terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
                                 identityServiceSettings: Option[IdentityServiceSettings] = None
                                ): Future[Seq[FhirMappingResult]]

  /**
   * Executes the given FHIR mapping job and returns the resulting dataset of FhirMappingResults
   * instead of writing them to a sink.
   *
   * @param mappingJobExecution        The FHIR mapping job execution details.
   * @param sourceSettings             A map containing settings for the source system(s).
   * @param terminologyServiceSettings Optional settings for the terminology service to use within mappings (e.g., for lookupDisplay).
   * @param identityServiceSettings    Optional settings for the identity service to use within mappings (e.g., for resolveIdentifier).
   * @param taskCompletionCallback     A callback function invoked each time a mapping task completes.
   * @return A Future containing a Dataset of FhirMappingResult representing the outcome of the mapping job.
   */
  def executeMappingJobAndReturn(mappingJobExecution: FhirMappingJobExecution,
                                 sourceSettings: Map[String, DataSourceSettings],
                                 terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
                                 identityServiceSettings: Option[IdentityServiceSettings] = None,
                                 taskCompletionCallback: () => Unit
                                ): Future[Dataset[FhirMappingResult]]

}
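A hypothetical call site for the new method; jobManager, execution, and sourceSettings are assumed to exist already and are not part of this commit:

import scala.concurrent.ExecutionContext.Implicits.global

// Run the whole job in memory and inspect the results rather than writing
// them to a sink; the callback fires once per completed mapping task.
jobManager
  .executeMappingJobAndReturn(
    mappingJobExecution = execution,
    sourceSettings = sourceSettings,
    taskCompletionCallback = () => println("A mapping task completed")
  )
  .foreach(results => println(s"Job produced ${results.count()} mapping results"))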
@@ -17,6 +17,8 @@ import java.util.regex.Pattern
* @param mappingTasks List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job)
* @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs
*/
// TODO: The FhirMappingJobExecution model currently includes the entire FhirMappingJob ('job' field), which is unnecessary.
//  We should remove the 'job' field and instead add only the fields that are actually needed.
case class FhirMappingJobExecution(id: String = UUID.randomUUID().toString,
projectId: String = "",
job: FhirMappingJob,
@@ -126,5 +126,6 @@ object SourceFileFormats {
  final val PARQUET = "parquet"
  final val JSON = "json"
  final val AVRO = "avro"
  final val TXT = "txt"
}
