Skip to content

Commit

Permalink
♻️ Refactor(FhirMappingJobExecution): Further simplify the definition…
Browse files Browse the repository at this point in the history
… by removing sourceSettings
  • Loading branch information
camemre49 committed Jun 28, 2024
1 parent 97934ab commit 91924d1
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ object FileStreamInputArchiver {
try {
val archiveMode: ArchiveModes = execution.archiveMode
if (archiveMode != ArchiveModes.OFF) {
val fileSystemSourceSettings = execution.sourceSettings.head._2.asInstanceOf[FileSystemSourceSettings]
// get data folder path from data source settings
val dataFolderPath = FileUtils.getPath(fileSystemSourceSettings.dataFolderPath).toString
val dataFolderPath = FileUtils.getPath(execution.fileSystemSourceDataFolderPath).toString

// Get paths of the input files referred by the mapping tasks
paths = execution.mappingTasks.flatMap(mapping => {
Expand Down Expand Up @@ -181,10 +180,10 @@ class StreamingArchiverTask(archiver: FileStreamInputArchiver, runningJobRegistr
override def run(): Unit = {
// Get executions with streaming queries and file system sources and apply
val executions = runningJobRegistry.getRunningExecutionsWithCompleteMetadata()
.filter(execution => execution.isStreaming)
.filter(execution => execution.sourceSettings.head._2.isInstanceOf[FileSystemSourceSettings])
.filter(execution => execution.isStreamingJob)
.filter(execution => execution.hasFileSystemSource)
executions.foreach(execution => {
if (execution.isStreaming) {
if (execution.isStreamingJob) {
execution.getStreamingQueryMap().keys.foreach(mappingUrl => {
archiver.applyArchivingOnStreamingJob(execution, mappingUrl)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class RunningJobRegistry(spark: SparkSession) {
val execution: FhirMappingJobExecution = jobMapping(executionId)
var removedMappingEntry: Option[Either[String, StreamingQuery]] = None
// If it is a batch job do nothing but warn user about the situation
if (!execution.isStreaming) {
if (!execution.isStreamingJob) {
logger.warn(s"Execution with $jobId: $jobId, executionId: $executionId, mappingUrl: $mappingUrl won't be stopped with a specific mapping as this is a batch job." +
s"Stop execution by providing only the jobId and executionId")

Expand Down Expand Up @@ -308,7 +308,7 @@ class RunningJobRegistry(spark: SparkSession) {
case None => true // We know we have an execution at this point
case Some(url) =>
// For streaming jobs, we check whether there is a streaming query for the given mapping
if (runningTasks(jobId)(executionId).isStreaming) {
if (runningTasks(jobId)(executionId).isStreamingJob) {
runningTasks(jobId)(executionId).getStreamingQueryMap().contains(url)

// For batch jobs, we don't differentiate mapping tasks. So, returning true directly (which indicates that the job execution is in progress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,27 @@ import java.util.regex.Pattern
/**
* Represents the execution of mapping tasks included in a mapping job.
*
* @param id Unique identifier for the execution
* @param projectId Unique identifier of project to which mapping job belongs
* @param job Fhir mapping job
* @param mappingTasks List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job)
* @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs
* @param id Unique identifier for the execution
* @param projectId Unique identifier of project to which mapping job belongs
* @param mappingTasks List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job)
* @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs
* @param isStreamingJob Whether the execution is streaming or not
* @param hasFileSystemSource Whether the execution source is file system or not
* @param fileSystemSourceDataFolderPath If execution has a file system source, this is data folder path of it
* @param archiveMode Archive mode of execution
* @param saveErroneousRecords Whether to save erroneous records or not
*/
case class FhirMappingJobExecution(id: String,
projectId: String,
jobId: String,
mappingTasks: Seq[FhirMappingTask],
jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]],
sourceSettings: Map[String,DataSourceSettings],
isStreamingJob: Boolean,
hasFileSystemSource: Boolean,
fileSystemSourceDataFolderPath: String,
archiveMode: ArchiveModes,
saveErroneousRecords: Boolean
) {
/**
* Returns whether the execution is streaming or not
* @return
*/
def isStreaming: Boolean = {
sourceSettings.exists(source => source._2.asStream)
}

/**
* Returns the map of streaming queries i.e. map of (mapping url -> streaming query)
* @return
Expand Down Expand Up @@ -150,7 +148,30 @@ object FhirMappingJobExecution {
mappingTasks: Seq[FhirMappingTask] = Seq.empty,
jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]] = None
): FhirMappingJobExecution = {
FhirMappingJobExecution(id, projectId, job.id, mappingTasks, jobGroupIdOrStreamingQuery, job.sourceSettings, job.dataProcessingSettings.archiveMode, job.dataProcessingSettings.saveErroneousRecords)
val jobId = if (job != null && job.id != null) job.id else ""

val sourceSettings = Option(job.sourceSettings)
var isStreamingJob = false
var hasFileSystemSource = false
var dataFolderPath = ""
if(sourceSettings.nonEmpty) {
isStreamingJob = sourceSettings.get.exists(source => source._2.asStream)
hasFileSystemSource = sourceSettings.get.head._2.isInstanceOf[FileSystemSourceSettings]
dataFolderPath = sourceSettings.get.head._2 match {
case settings: FileSystemSourceSettings => settings.dataFolderPath
case _ => ""
}
}

val dataProcessingSettings = job.dataProcessingSettings
var archiveMode = ArchiveModes.OFF
var saveErroneousRecords = false
if(dataProcessingSettings != null) {
archiveMode = dataProcessingSettings.archiveMode
saveErroneousRecords = dataProcessingSettings.saveErroneousRecords
}

FhirMappingJobExecution(id, projectId, jobId, mappingTasks, jobGroupIdOrStreamingQuery, isStreamingJob, hasFileSystemSource, dataFolderPath, archiveMode, saveErroneousRecords)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.tofhir.test.engine.data.write

import io.tofhir.engine.config.ToFhirConfig
import io.tofhir.engine.data.write.{BaseFhirWriter, SinkHandler}
import io.tofhir.engine.model.{DataProcessingSettings, FhirMappingJob, FhirMappingJobExecution, FhirMappingResult}
import io.tofhir.engine.model.{DataProcessingSettings, FhirMappingJob, FhirMappingJobExecution, FhirMappingResult, FileSystemSourceSettings}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.mockito.ArgumentMatchers
import org.mockito.MockitoSugar._
Expand All @@ -21,6 +21,7 @@ class SinkHandlerTest extends AnyFlatSpec {
val mockJob: FhirMappingJob = mock[FhirMappingJob]
when(mockJob.id).thenReturn("jobId")
when(mockJob.dataProcessingSettings).thenReturn(DataProcessingSettings.apply())
when(mockJob.sourceSettings).thenReturn(Map("0" -> FileSystemSourceSettings.apply("name", "sourceUri", "dataFolderPath")))

val execution: FhirMappingJobExecution = FhirMappingJobExecution("executionId", "projectId", mockJob)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class FileStreamInputArchiverTest extends AnyFlatSpec with Matchers {

// Create test objects for off mode
val jobId3 = "mocked_job_id_3"
val testExecutionWithOff: FhirMappingJobExecution = testExecution.copy(id = jobId3, ArchiveModes.OFF)
val testExecutionWithOff: FhirMappingJobExecution = testExecution.copy(id = jobId3, archiveMode = ArchiveModes.OFF)

"FileStreamInputArchiver" should "not apply archiving/deletion for a streaming job with archive mode is off" in {
// Initialize spark files for this test
Expand Down

0 comments on commit 91924d1

Please sign in to comment.