💥 feat(FhirMappingTask): Add name field into FhirMappingTask. Change FhirMappingTask url usages with the new name field.
Okanmercan99 committed Sep 11, 2024
1 parent 4537c2b commit 0d1b1c2
Showing 42 changed files with 393 additions and 329 deletions.
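The gist of the change: FhirMappingTask now carries an explicit name field, and everything that previously identified a task by its mapping URL (CLI commands, Spark job descriptions, accumulator names, checkpoint directories, log lines) is switched over to that name. As a rough sketch of the model after this commit — only name and mappingRef are confirmed by the diffs below; any other fields are omitted:

```scala
// Sketch only; the real case class lives in the toFHIR engine and has more fields.
case class FhirMappingTask(
  name: String,      // new in this commit: human-readable, unique task identifier
  mappingRef: String // URL of the referenced FhirMapping definition
)
```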
@@ -29,7 +29,7 @@ object CommandLineInterface {
       val mappingJob = FhirMappingJobFormatter.readMappingJobFromFile(mappingJobFilePath.get)
       CommandExecutionContext(toFhirEngine = toFhirEngine,
         fhirMappingJob = Some(mappingJob),
-        mappingNameUrlMap = Load.getMappingNameUrlTuples(mappingJob.mappings, toFhirEngine.mappingRepo))
+        mappingNameUrlMap = Load.getTaskNameUrlTuples(mappingJob.mappings, toFhirEngine.mappingRepo))
     } catch {
       case _: FileNotFoundException =>
         println(s"The file cannot be found at the specified path found in the config:${mappingJobFilePath.get}")
@@ -21,7 +21,7 @@ class Load extends Command {
       try {
         val mappingJob = FhirMappingJobFormatter.readMappingJobFromFile(filePath)
         println("The following FhirMappingJob successfully loaded.")
-        val newContext = command.CommandExecutionContext(context.toFhirEngine, Some(mappingJob), Load.getMappingNameUrlTuples(mappingJob.mappings, context.toFhirEngine.mappingRepo))
+        val newContext = command.CommandExecutionContext(context.toFhirEngine, Some(mappingJob), Load.getTaskNameUrlTuples(mappingJob.mappings, context.toFhirEngine.mappingRepo))
         println(Load.serializeMappingJobToCommandLine(newContext))
         newContext
       } catch {
@@ -37,9 +37,9 @@ class Load extends Command {
 }
 
 object Load {
-  def getMappingNameUrlTuples(tasks: Seq[FhirMappingTask], mappingRepository: IFhirMappingRepository): Map[String, String] = {
+  def getTaskNameUrlTuples(tasks: Seq[FhirMappingTask], mappingRepository: IFhirMappingRepository): Map[String, String] = {
     tasks.foldLeft(Map.empty[String, String]) { (map, task) => // Convert to tuple (name -> url)
-      map + (mappingRepository.getFhirMappingByUrl(task.mappingRef).name -> task.mappingRef)
+      map + (task.name -> task.mappingRef)
     }
   }
 
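A quick illustration of the renamed helper with hypothetical tasks. Note that after this change the mappingRepository parameter is no longer consulted: the name comes straight from the task definition instead of a repository lookup by URL.

```scala
// Hypothetical tasks, for illustration only.
val tasks = Seq(
  FhirMappingTask(name = "patient-mapping-task", mappingRef = "https://example.com/fhir/mappings/patient-mapping"),
  FhirMappingTask(name = "lab-mapping-task", mappingRef = "https://example.com/fhir/mappings/lab-mapping")
)
val mappingRepository: IFhirMappingRepository = ??? // any repository; unused by the new implementation
Load.getTaskNameUrlTuples(tasks, mappingRepository)
// => Map("patient-mapping-task" -> ".../patient-mapping",
//        "lab-mapping-task"     -> ".../lab-mapping")
```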
@@ -38,26 +38,18 @@ class Run extends Command {
       context.toFhirEngine.runningJobRegistry.registerBatchJob(
         mappingJobExecution,
         Some(f),
-        s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
+        s"Spark job for job: ${mappingJobExecution.jobId} mappingTasks: ${mappingJobExecution.mappingTasks.map(_.name).mkString(" ")}"
       )
     } else {
-      // Understand whether the argument is the name or the URL of the mapping and then find/execute it.
-      if (args.length > 2) {
-        println(s"There are more than one arguments to run command. I will only process: ${args.head} and optionaly second")
+      // Find the mappingTask with given name and execute it
+      if (args.length > 1) {
+        println(s"There are more than one arguments to run command. I will only process: ${args.head}")
       }
-      val mappingUrl = if (context.mappingNameUrlMap.contains(args.head)) {
-        context.mappingNameUrlMap(args.head)
-      } else {
-        args.head
-      }
-
-      val indexAmongMappingToRun = args.drop(1).headOption.flatMap(ind => Try(ind.toInt).toOption).getOrElse(1)
-
-      val task = mappingJob.mappings.filter(_.mappingRef == mappingUrl).drop(indexAmongMappingToRun - 1).headOption
-      if (task.isEmpty) {
-        println(s"There is no such mapping: $mappingUrl with index $indexAmongMappingToRun")
 
+      if(!context.mappingNameUrlMap.contains(args.head)){
+        println(s"There are no mappingTask with name ${args.head}!")
       } else {
+        val task = mappingJob.mappings.find(_.name == args.head)
         val mappingJobExecution: FhirMappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(task.get), job = mappingJob)
         val f =
           fhirMappingJobManager
@@ -71,7 +63,7 @@ class Run extends Command {
       context.toFhirEngine.runningJobRegistry.registerBatchJob(
         mappingJobExecution,
         Some(f),
-        s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
+        s"Spark job for job: ${mappingJobExecution.jobId} mappingTasks: ${mappingJobExecution.mappingTasks.map(_.name).mkString(" ")}"
       )
     }
   }
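In CLI terms, run no longer takes a mapping URL plus an optional index to disambiguate tasks sharing that URL; it takes the task's unique name. The name and URL below are hypothetical:

```
# before this commit: run <mappingUrl> [<index>]
run https://example.com/fhir/mappings/patient-mapping 2

# after this commit: run <mappingTaskName>
run patient-mapping-task
```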
@@ -3,23 +3,23 @@ package io.tofhir.engine.cli.command
 import scala.util.Try
 
 /**
- * Stop one or more running mappings. Its usage is stop <jobID> [<mappingUrl>].
+ * Stop one or more running mappings. Its usage is stop <jobID> [<mappingTaskName>].
  */
 class Stop extends Command {
   override def execute(args: Seq[String], context: CommandExecutionContext): CommandExecutionContext = {
     if (args.isEmpty || args.size < 2) {
-      println("stop command requires job id. Usage stop <jobId> <executionId> [<mappingUrl>]")
+      println("stop command requires job id. Usage stop <jobId> <executionId> [<mappingTaskName>]")
     } else {
       val jobId: String = args.head
       val executionId: String = args(1)
-      val mappingUrl: Option[String] = Try(args(2)).toOption
-      mappingUrl match {
+      val mappingTaskName: Option[String] = Try(args(2)).toOption
+      mappingTaskName match {
         case None =>
-          // Stop all the mappings running inside a job
+          // Stop all the mappingTasks running inside a job
           context.toFhirEngine.runningJobRegistry.stopJobExecution(jobId, executionId)
-        case Some(url) =>
-          // Stop a single mapping
-          context.toFhirEngine.runningJobRegistry.stopMappingExecution(jobId, executionId, url)
+        case Some(name) =>
+          // Stop a single mappingTask
+          context.toFhirEngine.runningJobRegistry.stopMappingExecution(jobId, executionId, name)
       }
       println(s"Processed the command: stop ${args.mkString(" ")}")
     }
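For example (job, execution, and task identifiers are hypothetical):

```
# stop every mappingTask in a running execution
stop job-1 execution-42

# stop a single mappingTask by its name (previously: by its mapping URL)
stop job-1 execution-42 patient-mapping-task
```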
@@ -18,25 +18,25 @@ object SinkHandler {
   *
   * @param spark               The SparkSession instance.
   * @param mappingJobExecution The execution context of the FHIR mapping job.
-  * @param mappingUrl          An optional URL for the mapping.
+  * @param mappingTaskName     An optional name for the mapping.
   * @param df                  The DataFrame containing FHIR mapping results.
   * @param resourceWriter      The writer instance to write the FHIR resources.
   */
-  def writeMappingResult(spark: SparkSession, mappingJobExecution: FhirMappingJobExecution, mappingUrl: Option[String], df: Dataset[FhirMappingResult], resourceWriter: BaseFhirWriter): Unit = {
+  def writeMappingResult(spark: SparkSession, mappingJobExecution: FhirMappingJobExecution, mappingTaskName: Option[String], df: Dataset[FhirMappingResult], resourceWriter: BaseFhirWriter): Unit = {
     //Cache the dataframe
     df.cache()
     //Filter out the errors
     val invalidInputs = df.filter(_.error.map(_.code).contains(FhirMappingErrorCodes.INVALID_INPUT))
     val mappingErrors = df.filter(_.error.exists(_.code != FhirMappingErrorCodes.INVALID_INPUT))
     val mappedResults = df.filter(_.mappedResource.isDefined)
     //Create an accumulator to accumulate the results that cannot be written
-    val accumName = s"${mappingJobExecution.jobId}:${mappingUrl.map(u => s"$u:").getOrElse("")}fhirWritingProblems"
+    val accumName = s"${mappingJobExecution.jobId}:${mappingTaskName.map(u => s"$u:").getOrElse("")}fhirWritingProblems"
     val fhirWriteProblemsAccum: CollectionAccumulator[FhirMappingResult] = spark.sparkContext.collectionAccumulator[FhirMappingResult](accumName)
     fhirWriteProblemsAccum.reset()
     //Write the FHIR resources
     resourceWriter.write(spark, mappedResults, fhirWriteProblemsAccum)
-    logMappingJobResult(mappingJobExecution,mappingUrl,mappedResults,fhirWriteProblemsAccum.value,mappingErrors,invalidInputs)
-    ErroneousRecordWriter.saveErroneousRecords(spark, mappingJobExecution, mappingUrl, fhirWriteProblemsAccum.value, mappingErrors, invalidInputs)
+    logMappingJobResult(mappingJobExecution,mappingTaskName,mappedResults,fhirWriteProblemsAccum.value,mappingErrors,invalidInputs)
+    ErroneousRecordWriter.saveErroneousRecords(spark, mappingJobExecution, mappingTaskName, fhirWriteProblemsAccum.value, mappingErrors, invalidInputs)
     //Unpersist the data frame
     df.unpersist()
   }
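One visible effect of the rename: the accumulator that collects unwritable resources is now keyed by task name instead of mapping URL. A small sketch mirroring the accumName expression above, with hypothetical values:

```scala
val jobId = "job-1"
val mappingTaskName: Option[String] = Some("patient-mapping-task")
val accumName = s"$jobId:${mappingTaskName.map(n => s"$n:").getOrElse("")}fhirWritingProblems"
// accumName == "job-1:patient-mapping-task:fhirWritingProblems"
// with mappingTaskName = None it collapses to "job-1:fhirWritingProblems"
```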
@@ -48,22 +48,22 @@ object SinkHandler {
   * @param mappingJobExecution The execution context of the FHIR mapping job.
   * @param df                  The DataFrame containing FHIR mapping results.
   * @param resourceWriter      The writer instance to write the FHIR resources.
-  * @param mappingUrl          The URL for the mapping.
+  * @param mappingTaskName     The name for the mappingTask.
   * @return                    The StreamingQuery instance representing the streaming query.
   */
-  def writeStream(spark: SparkSession, mappingJobExecution: FhirMappingJobExecution, df: Dataset[FhirMappingResult], resourceWriter: BaseFhirWriter, mappingUrl: String): StreamingQuery = {
+  def writeStream(spark: SparkSession, mappingJobExecution: FhirMappingJobExecution, df: Dataset[FhirMappingResult], resourceWriter: BaseFhirWriter, mappingTaskName: String): StreamingQuery = {
     val datasetWrite = (dataset: Dataset[FhirMappingResult], _: Long) => try {
-      writeMappingResult(spark, mappingJobExecution, Some(mappingUrl), dataset, resourceWriter)
+      writeMappingResult(spark, mappingJobExecution, Some(mappingTaskName), dataset, resourceWriter)
     } catch {
       case e: Throwable =>
-        logger.error(s"Streaming chunk resulted in error for project: ${mappingJobExecution.projectId}, job: ${mappingJobExecution.jobId}, execution: ${mappingJobExecution.id}, mapping: $mappingUrl", e.getMessage)
+        logger.error(s"Streaming chunk resulted in error for project: ${mappingJobExecution.projectId}, job: ${mappingJobExecution.jobId}, execution: ${mappingJobExecution.id}, mappingTask: $mappingTaskName", e.getMessage)
     }
 
     df
       .writeStream
       // We need to provide explicit checkpoints. If not, Spark will use the same checkpoint directory, which mixes up the offsets for different streams.
       // We create a new checkpoint directory per job and per mapping task included in the jobs.
-      .option("checkpointLocation", mappingJobExecution.getCheckpointDirectory(mappingUrl))
+      .option("checkpointLocation", mappingJobExecution.getCheckpointDirectory(mappingTaskName))
       .foreachBatch(datasetWrite)
       .start()
   }
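Since the checkpoint location is now derived from the task name, each streaming mappingTask keeps its own Spark offsets even when several tasks point at the same mapping URL. The diff does not show getCheckpointDirectory itself; purely as an assumption, a layout along these lines would satisfy the comment above:

```scala
// Assumed layout, not taken from this commit: a directory per job, per
// execution, and per mappingTask name, so streams never share offsets.
class FhirMappingJobExecutionSketch(jobId: String, id: String) {
  def getCheckpointDirectory(mappingTaskName: String): String =
    s"checkpoints/$jobId/$id/$mappingTaskName"
}
```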
@@ -72,14 +72,14 @@ object SinkHandler {
   * Logs mapping job results including the problems regarding to source data, mapping and generated FHIR resources.
   *
   * @param mappingJobExecution The mapping job execution
-  * @param mappingUrl          The url of executed mapping
+  * @param mappingTaskName     The name of executed mappingTask
   * @param fhirResources       written FHIR resources to the configured server
   * @param notWrittenResources The FHIR resource errors
   * @param mappingErrors       The mapping errors
   * @param invalidInputs       The source data errors
   * */
  private def logMappingJobResult(mappingJobExecution:FhirMappingJobExecution,
-                                 mappingUrl:Option[String],
+                                 mappingTaskName:Option[String],
                                  fhirResources: Dataset[FhirMappingResult],
                                  notWrittenResources:util.List[FhirMappingResult],
                                  mappingErrors:Dataset[FhirMappingResult],
@@ -94,10 +94,10 @@ object SinkHandler {
     // Log the job result
     if(mappingJobExecution.isStreamingJob){
       // Log the result for streaming mapping task execution
-      ExecutionLogger.logExecutionResultForStreamingMappingTask(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfWritten, numOfNotWritten)
+      ExecutionLogger.logExecutionResultForStreamingMappingTask(mappingJobExecution, mappingTaskName, numOfInvalids, numOfNotMapped, numOfWritten, numOfNotWritten)
     } else {
       // Log the result for batch execution
-      ExecutionLogger.logExecutionResultForChunk(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfWritten, numOfNotWritten)
+      ExecutionLogger.logExecutionResultForChunk(mappingJobExecution, mappingTaskName, numOfInvalids, numOfNotMapped, numOfWritten, numOfNotWritten)
     }
 
     // Log the mapping and invalid input errors
(Diffs for the remaining changed files in this commit are not shown here.)