From f89ae3a0f470f259d8301a4d38739de72336cca6 Mon Sep 17 00:00:00 2001
From: emrecam
Date: Wed, 26 Jun 2024 16:20:02 +0300
Subject: [PATCH] :recycle: Refactor(FhirMappingJobExecution): Add an apply method to simplify FhirMappingJobExecution by removing job from definition

---
 .../main/scala/io/tofhir/engine/cli/Run.scala |  4 +-
 .../engine/data/write/SinkHandler.scala       |  4 +-
 .../execution/ErroneousRecordWriter.scala     |  2 +-
 .../execution/FileStreamInputArchiver.scala   | 14 +++---
 .../engine/execution/RunningJobRegistry.scala | 18 ++++----
 .../mapping/FhirMappingJobManager.scala       | 44 +++++++++----------
 .../model/FhirMappingJobExecution.scala       | 39 +++++++++++-----
 .../engine/model/FhirMappingJobResult.scala   |  4 +-
 .../server/service/ExecutionService.scala     |  4 +-
 9 files changed, 74 insertions(+), 59 deletions(-)

diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/cli/Run.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/cli/Run.scala
index 8ae9d9ca..2dc9badf 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/cli/Run.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/cli/Run.scala
@@ -38,7 +38,7 @@
         context.toFhirEngine.runningJobRegistry.registerBatchJob(
           mappingJobExecution,
           Some(f),
-          s"Spark job for job: ${mappingJobExecution.job.id} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
+          s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
         )
       } else {
         // Understand whether the argument is the name or the URL of the mapping and then find/execute it.
@@ -71,7 +71,7 @@
         context.toFhirEngine.runningJobRegistry.registerBatchJob(
           mappingJobExecution,
           Some(f),
-          s"Spark job for job: ${mappingJobExecution.job.id} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
+          s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
         )
       }
     }
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala
index f58a6109..5b29bacf 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala
@@ -28,7 +28,7 @@
     val mappingErrors = df.filter(_.error.exists(_.code != FhirMappingErrorCodes.INVALID_INPUT))
     val mappedResults = df.filter(_.mappedResource.isDefined)
     //Create an accumulator to accumulate the results that cannot be written
-    val accumName = s"${mappingJobExecution.job.id}:${mappingUrl.map(u => s"$u:").getOrElse("")}fhirWritingProblems"
+    val accumName = s"${mappingJobExecution.jobId}:${mappingUrl.map(u => s"$u:").getOrElse("")}fhirWritingProblems"
     val fhirWriteProblemsAccum: CollectionAccumulator[FhirMappingResult] = spark.sparkContext.collectionAccumulator[FhirMappingResult](accumName)
     fhirWriteProblemsAccum.reset()
     //Write the FHIR resources
@@ -53,7 +53,7 @@
       writeBatch(spark, mappingJobExecution, Some(mappingUrl), dataset, resourceWriter)
     } catch {
       case e: Throwable =>
-        logger.error(s"Streaming batch resulted in error for project: ${mappingJobExecution.projectId}, job: ${mappingJobExecution.job.id}, execution: ${mappingJobExecution.id}, mapping: $mappingUrl", e.getMessage)
+        logger.error(s"Streaming batch resulted in error for project: ${mappingJobExecution.projectId}, job: ${mappingJobExecution.jobId}, execution: ${mappingJobExecution.id}, mapping: $mappingUrl", e.getMessage)
     }
     df
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/ErroneousRecordWriter.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/ErroneousRecordWriter.scala
index ac951ef9..aa1e4da9 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/ErroneousRecordWriter.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/ErroneousRecordWriter.scala
@@ -30,7 +30,7 @@
                            notWrittenResources: util.List[FhirMappingResult],
                            mappingErrors: Dataset[FhirMappingResult],
                            invalidInputs: Dataset[FhirMappingResult]): Unit = {
-    if (mappingJobExecution.job.dataProcessingSettings.saveErroneousRecords) {
+    if (mappingJobExecution.saveErroneousRecords) {
      if (!invalidInputs.isEmpty) {
        this.writeErroneousDataset(mappingJobExecution, invalidInputs, mappingUrl.get, FhirMappingErrorCodes.INVALID_INPUT)
      }
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/FileStreamInputArchiver.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/FileStreamInputArchiver.scala
index d85ace2f..f2f5e689 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/FileStreamInputArchiver.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/FileStreamInputArchiver.scala
@@ -42,7 +42,7 @@
    * @param taskExecution
    */
  def applyArchivingOnStreamingJob(taskExecution: FhirMappingJobExecution, mappingUrl: String): Unit = {
-    val archiveMode: ArchiveModes = taskExecution.job.dataProcessingSettings.archiveMode
+    val archiveMode: ArchiveModes = taskExecution.archiveMode
    if (archiveMode != ArchiveModes.OFF) {
      // Get the commit file directory for this execution
      val commitDirectory: File = new File(taskExecution.getCommitDirectory(mappingUrl))
@@ -90,9 +90,9 @@
    // Putting the archiving logic for the batch file inside within a try block so that it would not affect the caller in case of any exception.
    // That means archiving works in best-effort mode.
    try {
-      val archiveMode: ArchiveModes = execution.job.dataProcessingSettings.archiveMode
+      val archiveMode: ArchiveModes = execution.archiveMode
      if (archiveMode != ArchiveModes.OFF) {
-        val fileSystemSourceSettings = execution.job.sourceSettings.head._2.asInstanceOf[FileSystemSourceSettings]
+        val fileSystemSourceSettings = execution.sourceSettings.head._2.asInstanceOf[FileSystemSourceSettings]
        // get data folder path from data source settings
        val dataFolderPath = FileUtils.getPath(fileSystemSourceSettings.dataFolderPath).toString
@@ -111,7 +111,7 @@
        })
      }
    } catch {
-      case t:Throwable => logger.warn(s"Failed to apply archiving for job: ${execution.job.id}, execution: ${execution.id}")
+      case t:Throwable => logger.warn(s"Failed to apply archiving for job: ${execution.jobId}, execution: ${execution.id}")
    }
  }
@@ -181,10 +181,10 @@ class StreamingArchiverTask(archiver: FileStreamInputArchiver, runningJobRegistr
  override def run(): Unit = {
    // Get executions with streaming queries and file system sources and apply
    val executions = runningJobRegistry.getRunningExecutionsWithCompleteMetadata()
-      .filter(execution => execution.isStreaming())
-      .filter(execution => execution.job.sourceSettings.head._2.isInstanceOf[FileSystemSourceSettings])
+      .filter(execution => execution.isStreaming)
+      .filter(execution => execution.sourceSettings.head._2.isInstanceOf[FileSystemSourceSettings])
    executions.foreach(execution => {
-      if (execution.isStreaming()) {
+      if (execution.isStreaming) {
        execution.getStreamingQueryMap().keys.foreach(mappingUrl => {
          archiver.applyArchivingOnStreamingJob(execution, mappingUrl)
        })
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/RunningJobRegistry.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/RunningJobRegistry.scala
index 0a7dd62d..1803ed57 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/RunningJobRegistry.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/RunningJobRegistry.scala
@@ -51,10 +51,10 @@ class RunningJobRegistry(spark: SparkSession) {
    // If there is an error in the streaming query execution, call 'stopMappingExecution' function,
    // which is responsible for removing it from the registry. Without that, the registry might contain incorrect
    // information such as indicating that the job is still running when it has encountered an error.
-    streamingQueryFuture.recover(_ => stopMappingExecution(execution.job.id, execution.id, mappingUrl))
+    streamingQueryFuture.recover(_ => stopMappingExecution(execution.jobId, execution.id, mappingUrl))
    // Wait for the initial Future to be resolved
    val streamingQuery: StreamingQuery = Await.result(streamingQueryFuture, Duration.Inf)
-    val jobId: String = execution.job.id
+    val jobId: String = execution.jobId
    val executionId: String = execution.id
    // Multiple threads can update the global task map. So, updates are synchronized.
@@ -106,7 +106,7 @@
  def registerBatchJob(execution: FhirMappingJobExecution, jobFuture: Option[Future[Unit]], jobDescription: String = ""): Unit = {
    val jobGroup: String = setSparkJobGroup(jobDescription)
    val executionWithJobGroupId = execution.copy(jobGroupIdOrStreamingQuery = Some(Left(jobGroup)))
-    val jobId: String = executionWithJobGroupId.job.id
+    val jobId: String = executionWithJobGroupId.jobId
    val executionId: String = executionWithJobGroupId.id
    runningTasks.synchronized {
@@ -132,13 +132,13 @@
  def registerSchedulingJob(mappingJobExecution: FhirMappingJobExecution, scheduler: Scheduler): Unit = {
    // add it to the scheduledTasks map
    scheduledTasks
-      .getOrElseUpdate(mappingJobExecution.job.id, collection.mutable.Map[String, Scheduler]())
+      .getOrElseUpdate(mappingJobExecution.jobId, collection.mutable.Map[String, Scheduler]())
      .put(mappingJobExecution.id, scheduler)
-    logger.debug(s"Scheduling job ${mappingJobExecution.job.id} has been registered")
+    logger.debug(s"Scheduling job ${mappingJobExecution.jobId} has been registered")
    // add a scheduler listener to monitor task events
    scheduler.addSchedulerListener(new SchedulerListener {
      override def taskLaunching(executor: TaskExecutor): Unit = {
-        registerBatchJob(mappingJobExecution,None,s"Spark job for job: ${mappingJobExecution.job.id} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}")
+        registerBatchJob(mappingJobExecution,None,s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}")
      }
      override def taskSucceeded(executor: TaskExecutor): Unit = {
@@ -270,7 +270,7 @@ class RunningJobRegistry(spark: SparkSession) {
        val execution: FhirMappingJobExecution = jobMapping(executionId)
        var removedMappingEntry: Option[Either[String, StreamingQuery]] = None
        // If it is a batch job do nothing but warn user about the situation
-        if (!execution.isStreaming()) {
+        if (!execution.isStreaming) {
          logger.warn(s"Execution with $jobId: $jobId, executionId: $executionId, mappingUrl: $mappingUrl won't be stopped with a specific mapping as this is a batch job." +
            s"Stop execution by providing only the jobId and executionId")
@@ -308,7 +308,7 @@ class RunningJobRegistry(spark: SparkSession) {
          case None => true // We know we have an execution at this point
          case Some(url) =>
            // For streaming jobs, we check whether there is a streaming query for the given mapping
-            if (runningTasks(jobId)(executionId).isStreaming()) {
+            if (runningTasks(jobId)(executionId).isStreaming) {
              runningTasks(jobId)(executionId).getStreamingQueryMap().contains(url)
            // For batch jobs, we don't differentiate mapping tasks. So, returning true directly (which indicates that the job execution is in progress)
@@ -411,6 +411,6 @@ class RunningJobRegistry(spark: SparkSession) {
    */
  private def handleCompletedBatchJob(execution: FhirMappingJobExecution): Unit = {
    FileStreamInputArchiver.applyArchivingOnBatchJob(execution)
-    removeExecutionFromRunningTasks(execution.job.id, execution.id)
+    removeExecutionFromRunningTasks(execution.jobId, execution.id)
  }
 }
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala
index c8709acb..9ebb2343 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala
@@ -71,7 +71,7 @@ class FhirMappingJobManager(
      case se: SparkThrowable if se.getMessage.contains("cancelled part of cancelled job group") =>
        val jobResult = FhirMappingJobResult(mappingJobExecution, Some(task.mappingRef), status = Some(FhirMappingJobResult.STOPPED))
        logger.info(jobResult.toMapMarker, jobResult.toString)
-        throw FhirMappingJobStoppedException(s"Execution '${mappingJobExecution.id}' of job '${mappingJobExecution.job.id}' in project ${mappingJobExecution.projectId}' terminated manually!")
+        throw FhirMappingJobStoppedException(s"Execution '${mappingJobExecution.id}' of job '${mappingJobExecution.jobId}' in project ${mappingJobExecution.projectId}' terminated manually!")
      // Exceptions from Spark executors are wrapped inside a SparkException, which are caught below
      case se: SparkThrowable =>
        se.getCause match {
@@ -90,7 +90,7 @@ class FhirMappingJobManager(
        val jobResult = FhirMappingJobResult(mappingJobExecution, Some(task.mappingRef), status = Some(FhirMappingJobResult.FAILURE))
        logger.error(jobResult.toMapMarker, jobResult.toString, e)
      }
-    } map { _ => logger.debug(s"MappingJob execution finished for MappingJob: ${mappingJobExecution.job.id}.") }
+    } map { _ => logger.debug(s"MappingJob execution finished for MappingJob: ${mappingJobExecution.jobId}.") }
  }
  /**
@@ -113,13 +113,13 @@ class FhirMappingJobManager(
    fhirWriter.validate()
    mappingJobExecution.mappingTasks
      .map(t => {
-        logger.debug(s"Streaming mapping job ${mappingJobExecution.job.id}, mapping url ${t.mappingRef} is started and waiting for the data...")
+        logger.debug(s"Streaming mapping job ${mappingJobExecution.jobId}, mapping url ${t.mappingRef} is started and waiting for the data...")
        val jobResult = FhirMappingJobResult(mappingJobExecution, Some(t.mappingRef))
        logger.info(jobResult.toMapMarker, jobResult.toString)
        // Construct a tuple of (mapping url, Future[StreamingQuery])
        t.mappingRef ->
-          readSourceAndExecuteTask(mappingJobExecution.job.id, t, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
+          readSourceAndExecuteTask(mappingJobExecution.jobId, t, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
            .map(ts => {
              SinkHandler.writeStream(spark, mappingJobExecution, ts, fhirWriter, t.mappingRef)
            })
@@ -198,12 +198,12 @@ class FhirMappingJobManager(
                             terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
                             identityServiceSettings: Option[IdentityServiceSettings] = None,
                             schedulingSettings: BaseSchedulingSettings): Future[Unit] = {
-    val timeRange = getScheduledTimeRange(mappingJobExecution.job.id, mappingJobScheduler.get.folderUri, startTime)
+    val timeRange = getScheduledTimeRange(mappingJobExecution.jobId, mappingJobScheduler.get.folderUri, startTime)
    logger.info(s"Running scheduled job with the expression: ${schedulingSettings.cronExpression}")
    logger.info(s"Synchronizing data between ${timeRange._1} and ${timeRange._2}")
    executeMappingJob(mappingJobExecution, sourceSettings, sinkSettings, terminologyServiceSettings, identityServiceSettings, Some(timeRange))
      .map(_ => {
-        val writer = new FileWriter(s"${mappingJobScheduler.get.folderUri.getPath}/${mappingJobExecution.job.id}.txt", true)
+        val writer = new FileWriter(s"${mappingJobScheduler.get.folderUri.getPath}/${mappingJobExecution.jobId}.txt", true)
        try writer.write(timeRange._2.toString + "\n") finally writer.close() //write last sync time to the file
      })
  }
@@ -250,7 +250,7 @@ class FhirMappingJobManager(
    val fhirWriter = FhirWriterFactory.apply(sinkSettings)
    fhirWriter.validate()
-    readSourceAndExecuteTask(mappingJobExecution.job.id, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
+    readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
      .map { dataset =>
        SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingJobExecution.mappingTasks.head.mappingRef), dataset, fhirWriter)
@@ -305,33 +305,33 @@ class FhirMappingJobManager(
                              identityServiceSettings: Option[IdentityServiceSettings] = None,
                              timeRange: Option[(LocalDateTime, LocalDateTime)] = None): Future[Unit] = {
    val mappingTask = mappingJobExecution.mappingTasks.head
-    logger.debug(s"Reading source data for mapping ${mappingTask.mappingRef} within mapping job ${mappingJobExecution.job.id} ...")
-    val (fhirMapping, mds, df) = readJoinSourceData(mappingTask, sourceSettings, timeRange, jobId = Some(mappingJobExecution.job.id)) // FIXME: Why reading again below?
+    logger.debug(s"Reading source data for mapping ${mappingTask.mappingRef} within mapping job ${mappingJobExecution.jobId} ...")
+    val (fhirMapping, mds, df) = readJoinSourceData(mappingTask, sourceSettings, timeRange, jobId = Some(mappingJobExecution.jobId)) // FIXME: Why reading again below?
    val sizeOfDf: Long = df.count()
-    logger.debug(s"$sizeOfDf records read for mapping ${mappingTask.mappingRef} within mapping job ${mappingJobExecution.job.id} ...")
+    logger.debug(s"$sizeOfDf records read for mapping ${mappingTask.mappingRef} within mapping job ${mappingJobExecution.jobId} ...")
    ToFhirConfig.engineConfig.maxBatchSizeForMappingJobs match {
      //If not specify run it as single batch
      case None =>
-        logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.job.id} ...")
-        readSourceAndExecuteTask(mappingJobExecution.job.id, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId)) // Retrieve the source data and execute the mapping
+        logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} ...")
+        readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId)) // Retrieve the source data and execute the mapping
          .map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
      case Some(batchSize) if sizeOfDf < batchSize =>
-        logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.job} ...")
-        readSourceAndExecuteTask(mappingJobExecution.job.id, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId)) // Retrieve the source data and execute the mapping
+        logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} ...")
+        readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId)) // Retrieve the source data and execute the mapping
          .map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
      //Otherwise divide the data into batches
      case Some(batchSize) =>
        val numOfBatch: Int = Math.ceil(sizeOfDf * 1.0 / batchSize * 1.0).toInt
-        logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.job.id} in $numOfBatch batches ...")
+        logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} in $numOfBatch batches ...")
        val splitDf = df.randomSplit((1 to numOfBatch).map(_ => 1.0).toArray[Double])
        splitDf
          .zipWithIndex
          .foldLeft(Future.apply(())) {
            case (fj, (df, i)) => fj.flatMap(_ =>
-              executeTask(mappingJobExecution.job.id, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
+              executeTask(mappingJobExecution.jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
                .map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter))
-                .map(_ => logger.debug(s"Batch ${i + 1} is completed for mapping ${mappingTask.mappingRef} within MappingJob: ${mappingJobExecution.job.id}..."))
+                .map(_ => logger.debug(s"Batch ${i + 1} is completed for mapping ${mappingTask.mappingRef} within MappingJob: ${mappingJobExecution.jobId}..."))
            )
          }
    }
@@ -489,7 +489,7 @@ class FhirMappingJobManager(
                    terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
                    identityServiceSettings: Option[IdentityServiceSettings] = None,
                   ): Future[Seq[FhirMappingResult]] = {
-    readSourceAndExecuteTask(mappingJobExecution.job.id, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
+    readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
      .map { dataFrame =>
        dataFrame
          .collect() // Collect into an Array[String]
@@ -508,17 +508,17 @@ class FhirMappingJobManager(
    // fold over the mapping tasks to chain the futures sequentially
    mappingJobExecution.mappingTasks.foldLeft(Future.successful(initialDataFrame)) { (accFuture, task) =>
      accFuture.flatMap { accDataFrame =>
-        logger.info(s"Executing mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}")
-        readSourceAndExecuteTask(mappingJobExecution.job.id, task, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
+        logger.info(s"Executing mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.jobId}")
+        readSourceAndExecuteTask(mappingJobExecution.jobId, task, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
          .map { dataFrame =>
-            logger.info(s"Completed the execution of mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}")
+            logger.info(s"Completed the execution of mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.jobId}")
            // notify the caller that the mapping task execution is complete by invoking the taskCompletionCallback function
            taskCompletionCallback()
            // combine the accumulated DataFrame with the current task's DataFrame
            accDataFrame.union(dataFrame)
          }.recover {
            case e: Throwable =>
-              logger.error(s"Failed to execute mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.job.id}",e)
+              logger.error(s"Failed to execute mapping task ${task.mappingRef} within mapping job: ${mappingJobExecution.jobId}",e)
              throw e
          }
      }
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala
index 4fe18279..39be4844 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala
@@ -1,6 +1,7 @@
 package io.tofhir.engine.model

 import io.tofhir.engine.config.ToFhirConfig
+import io.tofhir.engine.model.ArchiveModes.ArchiveModes
 import io.tofhir.engine.util.SparkUtil
 import org.apache.spark.sql.streaming.StreamingQuery
@@ -17,21 +18,21 @@ import java.util.regex.Pattern
 * @param mappingTasks               List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job)
 * @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs
 */
-// TODO: The FhirMappingJobExecution model currently includes the entire FhirMappingJob ('job' field), which is unnecessary.
-// We should remove the 'job' field from the model. Instead, add only the necessary fields to the model.
-case class FhirMappingJobExecution(id: String = UUID.randomUUID().toString,
-                                   projectId: String = "",
-                                   job: FhirMappingJob,
-                                   mappingTasks: Seq[FhirMappingTask] = Seq.empty,
-                                   jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]] = None
+case class FhirMappingJobExecution(id: String,
+                                   projectId: String,
+                                   jobId: String,
+                                   mappingTasks: Seq[FhirMappingTask],
+                                   jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]],
+                                   sourceSettings: Map[String,DataSourceSettings],
+                                   archiveMode: ArchiveModes,
+                                   saveErroneousRecords: Boolean
                                   ) {
-
  /**
   * Returns whether the execution is streaming or not
   * @return
   */
-  def isStreaming(): Boolean = {
-    job.sourceSettings.exists(source => source._2.asStream)
+  def isStreaming: Boolean = {
+    sourceSettings.exists(source => source._2.asStream)
  }

  /**
@@ -80,7 +81,7 @@ case class FhirMappingJobExecution(id: String = UUID.randomUUID().toString,
   * @return Directory path in which the checkpoints will be managed
   */
  def getCheckpointDirectory(mappingUrl: String): String =
-    Paths.get(ToFhirConfig.sparkCheckpointDirectory, job.id, mappingUrl.hashCode.toString).toString
+    Paths.get(ToFhirConfig.sparkCheckpointDirectory, jobId, mappingUrl.hashCode.toString).toString

  /**
   * Creates a commit directory for a mapping included in a job
@@ -112,7 +113,7 @@ case class FhirMappingJobExecution(id: String = UUID.randomUUID().toString,
   * @return
   */
  def getErrorOutputDirectory(mappingUrl: String, errorType: String): String =
-    Paths.get(ToFhirConfig.engineConfig.erroneousRecordsFolder, errorType, s"job-${job.id}", s"execution-${id}",
+    Paths.get(ToFhirConfig.engineConfig.erroneousRecordsFolder, errorType, s"job-${jobId}", s"execution-${id}",
      this.convertUrlToAlphaNumeric(mappingUrl)).toString
@@ -137,5 +138,19 @@ case class FhirMappingJobExecution(id: String = UUID.randomUUID().toString,
    // Combine the transformed words to create a folder name
    extractedWords.mkString("-")
  }
+}
+/**
+ * An object to create FhirMappingJobExecution instances
+ */
+object FhirMappingJobExecution {
+  def apply(id: String = UUID.randomUUID().toString,
+            projectId: String = "",
+            job: FhirMappingJob,
+            mappingTasks: Seq[FhirMappingTask] = Seq.empty,
+            jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]] = None
+           ): FhirMappingJobExecution = {
+    FhirMappingJobExecution(id, projectId, job.id, mappingTasks, jobGroupIdOrStreamingQuery, job.sourceSettings, job.dataProcessingSettings.archiveMode, job.dataProcessingSettings.saveErroneousRecords)
+  }
 }
+
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobResult.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobResult.scala
index 2dd1202d..c8bc0358 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobResult.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobResult.scala
@@ -42,7 +42,7 @@ case class FhirMappingJobResult(mappingJobExecution: FhirMappingJobExecution,
  }

  override def toString: String = {
-    s"toFHIR batch mapping result ($result) for execution '${mappingJobExecution.id}' of job '${mappingJobExecution.job.id}' in project '${mappingJobExecution.projectId}'${mappingUrl.map(u => s" for mapping '$u'").getOrElse("")}!\n" +
+    s"toFHIR batch mapping result ($result) for execution '${mappingJobExecution.id}' of job '${mappingJobExecution.jobId}' in project '${mappingJobExecution.projectId}'${mappingUrl.map(u => s" for mapping '$u'").getOrElse("")}!\n" +
      s"\t# of Invalid Rows: \t$numOfInvalids\n" +
      s"\t# of Not Mapped: \t$numOfNotMapped\n" +
      s"\t# of Failed writes:\t$numOfFailedWrites\n" +
@@ -58,7 +58,7 @@ case class FhirMappingJobResult(mappingJobExecution: FhirMappingJobExecution,
    // create a new HashMap to store the marker attributes
    val markerMap: java.util.Map[String, Any] = new java.util.HashMap[String, Any]()
    // add attributes to the marker map
-    markerMap.put("jobId", mappingJobExecution.job.id)
+    markerMap.put("jobId", mappingJobExecution.jobId)
    markerMap.put("projectId", mappingJobExecution.projectId)
    markerMap.put("executionId", mappingJobExecution.id)
    markerMap.put("mappingUrl", mappingUrl.orNull)
diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
index a18d7021..9057bdfd 100644
--- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
+++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
@@ -127,7 +127,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
          val checkpointDirectory: File = new File(mappingJobExecution.getCheckpointDirectory(mapping.mappingRef))
          io.FileUtils.deleteDirectory(checkpointDirectory)
-          logger.debug(s"Deleted checkpoint directory for jobId: ${mappingJobExecution.job.id}, executionId: ${mappingJobExecution.id}, mappingUrl: ${mapping.mappingRef}, path: ${checkpointDirectory.getAbsolutePath}")
+          logger.debug(s"Deleted checkpoint directory for jobId: ${mappingJobExecution.jobId}, executionId: ${mappingJobExecution.id}, mappingUrl: ${mapping.mappingRef}, path: ${checkpointDirectory.getAbsolutePath}")
        })
      }
@@ -180,7 +180,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
        toFhirEngine.runningJobRegistry.registerBatchJob(
          mappingJobExecution,
          Some(executionFuture),
-          s"Spark job for job: ${mappingJobExecution.job.id} mappings: ${mappingTasks.map(_.mappingRef).mkString(" ")}"
+          s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingTasks.map(_.mappingRef).mkString(" ")}"
        )
      }
    }
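
Note on the pattern introduced above: construction-time extraction of the job's fields moves into a companion-object apply, so the execution model no longer carries the whole FhirMappingJob. Below is a minimal, self-contained Scala sketch of that idea; the Simple* names are hypothetical stand-ins rather than the real toFHIR classes, and the real model additionally keeps mappingTasks and jobGroupIdOrStreamingQuery, which are omitted here for brevity.

import java.util.UUID

// Hypothetical, simplified stand-ins for FhirMappingJob / DataSourceSettings / ArchiveModes.
case class SimpleSourceSettings(asStream: Boolean)
case class SimpleDataProcessingSettings(archiveMode: String, saveErroneousRecords: Boolean)
case class SimpleMappingJob(id: String,
                            sourceSettings: Map[String, SimpleSourceSettings],
                            dataProcessingSettings: SimpleDataProcessingSettings)

// The slimmed-down execution model keeps only the job fields it actually uses
// (jobId, sourceSettings, archiveMode, saveErroneousRecords), mirroring the patch.
case class SimpleJobExecution(id: String,
                              projectId: String,
                              jobId: String,
                              sourceSettings: Map[String, SimpleSourceSettings],
                              archiveMode: String,
                              saveErroneousRecords: Boolean) {
  // Derived from the stored source settings, as isStreaming is after the refactor.
  def isStreaming: Boolean = sourceSettings.exists(_._2.asStream)
}

object SimpleJobExecution {
  // Companion apply: callers still hand over the whole job; the factory extracts the needed fields.
  def apply(id: String = UUID.randomUUID().toString,
            projectId: String = "",
            job: SimpleMappingJob): SimpleJobExecution =
    SimpleJobExecution(id, projectId, job.id, job.sourceSettings,
      job.dataProcessingSettings.archiveMode, job.dataProcessingSettings.saveErroneousRecords)
}

object CompanionApplyDemo extends App {
  val job = SimpleMappingJob(
    id = "job-1",
    sourceSettings = Map("patients" -> SimpleSourceSettings(asStream = true)),
    dataProcessingSettings = SimpleDataProcessingSettings(archiveMode = "archive", saveErroneousRecords = true))
  // The three-argument call resolves to the companion apply; the six-field constructor stays untouched.
  val execution = SimpleJobExecution(UUID.randomUUID().toString, "project-1", job)
  println(s"jobId=${execution.jobId}, streaming=${execution.isStreaming}, archive=${execution.archiveMode}")
}

Keeping the default arguments on the companion apply rather than on the case-class constructor also sidesteps Scala's rule that only one of two overloaded apply alternatives may define defaults, which appears consistent with how the patch relocates the defaults.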