diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/FileStreamInputArchiver.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/FileStreamInputArchiver.scala index f2f5e689..51b39d24 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/FileStreamInputArchiver.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/FileStreamInputArchiver.scala @@ -92,9 +92,8 @@ object FileStreamInputArchiver { try { val archiveMode: ArchiveModes = execution.archiveMode if (archiveMode != ArchiveModes.OFF) { - val fileSystemSourceSettings = execution.sourceSettings.head._2.asInstanceOf[FileSystemSourceSettings] // get data folder path from data source settings - val dataFolderPath = FileUtils.getPath(fileSystemSourceSettings.dataFolderPath).toString + val dataFolderPath = FileUtils.getPath(execution.fileSystemSourceDataFolderPath).toString // Get paths of the input files referred by the mapping tasks paths = execution.mappingTasks.flatMap(mapping => { @@ -181,10 +180,10 @@ class StreamingArchiverTask(archiver: FileStreamInputArchiver, runningJobRegistr override def run(): Unit = { // Get executions with streaming queries and file system sources and apply val executions = runningJobRegistry.getRunningExecutionsWithCompleteMetadata() - .filter(execution => execution.isStreaming) - .filter(execution => execution.sourceSettings.head._2.isInstanceOf[FileSystemSourceSettings]) + .filter(execution => execution.isStreamingJob) + .filter(execution => execution.hasFileSystemSource) executions.foreach(execution => { - if (execution.isStreaming) { + if (execution.isStreamingJob) { execution.getStreamingQueryMap().keys.foreach(mappingUrl => { archiver.applyArchivingOnStreamingJob(execution, mappingUrl) }) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/RunningJobRegistry.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/RunningJobRegistry.scala index 1803ed57..27867c30 100644 --- 
a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/RunningJobRegistry.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/RunningJobRegistry.scala @@ -270,7 +270,7 @@ class RunningJobRegistry(spark: SparkSession) { val execution: FhirMappingJobExecution = jobMapping(executionId) var removedMappingEntry: Option[Either[String, StreamingQuery]] = None // If it is a batch job do nothing but warn user about the situation - if (!execution.isStreaming) { + if (!execution.isStreamingJob) { logger.warn(s"Execution with $jobId: $jobId, executionId: $executionId, mappingUrl: $mappingUrl won't be stopped with a specific mapping as this is a batch job." + s"Stop execution by providing only the jobId and executionId") @@ -308,7 +308,7 @@ class RunningJobRegistry(spark: SparkSession) { case None => true // We know we have an execution at this point case Some(url) => // For streaming jobs, we check whether there is a streaming query for the given mapping - if (runningTasks(jobId)(executionId).isStreaming) { + if (runningTasks(jobId)(executionId).isStreamingJob) { runningTasks(jobId)(executionId).getStreamingQueryMap().contains(url) // For batch jobs, we don't differentiate mapping tasks. So, returning true directly (which indicates that the job execution is in progress) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala index 39be4844..01893c96 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobExecution.scala @@ -12,29 +12,27 @@ import java.util.regex.Pattern /** * Represents the execution of mapping tasks included in a mapping job. 
* - * @param id Unique identifier for the execution - * @param projectId Unique identifier of project to which mapping job belongs - * @param job Fhir mapping job - * @param mappingTasks List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job) - * @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs + * @param id Unique identifier for the execution + * @param projectId Unique identifier of project to which mapping job belongs + * @param mappingTasks List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job) + * @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs + * @param isStreamingJob Whether the execution is streaming or not + * @param hasFileSystemSource Whether the source of the execution is a file system source or not + * @param fileSystemSourceDataFolderPath Data folder path of the file system source, if the execution has one + * @param archiveMode Archive mode of execution + * @param saveErroneousRecords Whether to save erroneous records or not */ case class FhirMappingJobExecution(id: String, projectId: String, jobId: String, mappingTasks: Seq[FhirMappingTask], jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]], - sourceSettings: Map[String,DataSourceSettings], + isStreamingJob: Boolean, + hasFileSystemSource: Boolean, + fileSystemSourceDataFolderPath: String, archiveMode: ArchiveModes, saveErroneousRecords: Boolean ) { - /** - * Returns whether the execution is streaming or not - * @return - */ - def isStreaming: Boolean = { - sourceSettings.exists(source => source._2.asStream) - } - /** * Returns the map of streaming queries i.e. 
map of (mapping url -> streaming query) * @return @@ -150,7 +148,30 @@ object FhirMappingJobExecution { mappingTasks: Seq[FhirMappingTask] = Seq.empty, jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]] = None ): FhirMappingJobExecution = { - FhirMappingJobExecution(id, projectId, job.id, mappingTasks, jobGroupIdOrStreamingQuery, job.sourceSettings, job.dataProcessingSettings.archiveMode, job.dataProcessingSettings.saveErroneousRecords) + val jobId = if (job != null && job.id != null) job.id else "" + + val sourceSettings = Option(job.sourceSettings) + var isStreamingJob = false + var hasFileSystemSource = false + var dataFolderPath = "" + if(sourceSettings.nonEmpty) { + isStreamingJob = sourceSettings.get.exists(source => source._2.asStream) + hasFileSystemSource = sourceSettings.get.head._2.isInstanceOf[FileSystemSourceSettings] + dataFolderPath = sourceSettings.get.head._2 match { + case settings: FileSystemSourceSettings => settings.dataFolderPath + case _ => "" + } + } + + val dataProcessingSettings = job.dataProcessingSettings + var archiveMode = ArchiveModes.OFF + var saveErroneousRecords = false + if(dataProcessingSettings != null) { + archiveMode = dataProcessingSettings.archiveMode + saveErroneousRecords = dataProcessingSettings.saveErroneousRecords + } + + FhirMappingJobExecution(id, projectId, jobId, mappingTasks, jobGroupIdOrStreamingQuery, isStreamingJob, hasFileSystemSource, dataFolderPath, archiveMode, saveErroneousRecords) } } diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/SinkHandlerTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/SinkHandlerTest.scala index a0b2d4d9..6ab84a69 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/SinkHandlerTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/SinkHandlerTest.scala @@ -2,7 +2,7 @@ package io.tofhir.test.engine.data.write import 
io.tofhir.engine.config.ToFhirConfig import io.tofhir.engine.data.write.{BaseFhirWriter, SinkHandler} -import io.tofhir.engine.model.{DataProcessingSettings, FhirMappingJob, FhirMappingJobExecution, FhirMappingResult} +import io.tofhir.engine.model.{DataProcessingSettings, FhirMappingJob, FhirMappingJobExecution, FhirMappingResult, FileSystemSourceSettings} import org.apache.spark.sql.{Dataset, SparkSession} import org.mockito.ArgumentMatchers import org.mockito.MockitoSugar._ @@ -21,6 +21,7 @@ class SinkHandlerTest extends AnyFlatSpec { val mockJob: FhirMappingJob = mock[FhirMappingJob] when(mockJob.id).thenReturn("jobId") when(mockJob.dataProcessingSettings).thenReturn(DataProcessingSettings.apply()) + when(mockJob.sourceSettings).thenReturn(Map("0" -> FileSystemSourceSettings.apply("name", "sourceUri", "dataFolderPath"))) val execution: FhirMappingJobExecution = FhirMappingJobExecution("executionId", "projectId", mockJob) diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/engine/execution/FileStreamInputArchiverTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/engine/execution/FileStreamInputArchiverTest.scala index 5b8c7827..d3cfadcc 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/engine/execution/FileStreamInputArchiverTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/engine/execution/FileStreamInputArchiverTest.scala @@ -37,7 +37,7 @@ class FileStreamInputArchiverTest extends AnyFlatSpec with Matchers { // Create test objects for off mode val jobId3 = "mocked_job_id_3" - val testExecutionWithOff: FhirMappingJobExecution = testExecution.copy(id = jobId3, ArchiveModes.OFF) + val testExecutionWithOff: FhirMappingJobExecution = testExecution.copy(id = jobId3, archiveMode = ArchiveModes.OFF) "FileStreamInputArchiver" should "not apply archiving/deletion for a streaming job with archive mode is off" in { // Initialize spark files for this test