♻️ Refactor(FhirMappingJobExecution): Add an apply method to simplify FhirMappingJobExecution by removing job from definition

camemre49 committed Jun 26, 2024
1 parent 8891fd7 · commit f89ae3a
Showing 9 changed files with 74 additions and 59 deletions.
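The definition of FhirMappingJobExecution itself is not among the hunks captured below, so the following is only a minimal sketch of the refactor's shape, inferred from the call-site changes: the execution stops carrying the whole FhirMappingJob and instead keeps just the fields it reads (jobId, sourceSettings, archiveMode, saveErroneousRecords), with a companion apply that extracts them from the job. Every type below is a hypothetical stand-in, not the actual toFHIR model.

// All names below are assumptions for illustration; only jobId, archiveMode,
// saveErroneousRecords, sourceSettings and isStreaming are attested by the diff.
object ArchiveModes extends Enumeration {
  type ArchiveModes = Value
  val OFF, DELETE, ARCHIVE = Value // only OFF appears in the hunks; the rest are assumed
}

// Hypothetical stand-ins for the real toFHIR model classes.
case class DataProcessingSettings(archiveMode: ArchiveModes.ArchiveModes,
                                  saveErroneousRecords: Boolean)
case class FhirMappingTask(mappingRef: String)
case class FhirMappingJob(id: String,
                          sourceSettings: Map[String, AnyRef],
                          dataProcessingSettings: DataProcessingSettings)

// Before: the execution stored the whole job, so call sites reached through it
// (execution.job.id, execution.job.dataProcessingSettings.archiveMode, ...).
// After: it stores only what it needs.
case class FhirMappingJobExecution(id: String,
                                   projectId: String,
                                   jobId: String,
                                   mappingTasks: Seq[FhirMappingTask],
                                   sourceSettings: Map[String, AnyRef],
                                   archiveMode: ArchiveModes.ArchiveModes,
                                   saveErroneousRecords: Boolean)

object FhirMappingJobExecution {
  // The "apply method" from the commit title, as it plausibly looks:
  // build an execution from a job by copying the needed fields out of it.
  def apply(id: String, projectId: String, job: FhirMappingJob,
            mappingTasks: Seq[FhirMappingTask]): FhirMappingJobExecution =
    FhirMappingJobExecution(id, projectId, job.id, mappingTasks,
      job.sourceSettings,
      job.dataProcessingSettings.archiveMode,
      job.dataProcessingSettings.saveErroneousRecords)
}

Flattening the job into the execution keeps registry entries and any serialized execution state lighter and removes the execution's dependency on the full job definition; each hunk below is then the same mechanical substitution at a call site.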
4 changes: 2 additions & 2 deletions tofhir-engine/src/main/scala/io/tofhir/engine/cli/Run.scala
@@ -38,7 +38,7 @@ class Run extends Command {
context.toFhirEngine.runningJobRegistry.registerBatchJob(
mappingJobExecution,
Some(f),
s"Spark job for job: ${mappingJobExecution.job.id} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
)
} else {
// Understand whether the argument is the name or the URL of the mapping and then find/execute it.
@@ -71,7 +71,7 @@ class Run extends Command {
context.toFhirEngine.runningJobRegistry.registerBatchJob(
mappingJobExecution,
Some(f),
s"Spark job for job: ${mappingJobExecution.job.id} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}"
)
}
}

@@ -28,7 +28,7 @@ object SinkHandler {
val mappingErrors = df.filter(_.error.exists(_.code != FhirMappingErrorCodes.INVALID_INPUT))
val mappedResults = df.filter(_.mappedResource.isDefined)
//Create an accumulator to accumulate the results that cannot be written
val accumName = s"${mappingJobExecution.job.id}:${mappingUrl.map(u => s"$u:").getOrElse("")}fhirWritingProblems"
val accumName = s"${mappingJobExecution.jobId}:${mappingUrl.map(u => s"$u:").getOrElse("")}fhirWritingProblems"
val fhirWriteProblemsAccum: CollectionAccumulator[FhirMappingResult] = spark.sparkContext.collectionAccumulator[FhirMappingResult](accumName)
fhirWriteProblemsAccum.reset()
//Write the FHIR resources
@@ -53,7 +53,7 @@ object SinkHandler {
writeBatch(spark, mappingJobExecution, Some(mappingUrl), dataset, resourceWriter)
} catch {
case e: Throwable =>
logger.error(s"Streaming batch resulted in error for project: ${mappingJobExecution.projectId}, job: ${mappingJobExecution.job.id}, execution: ${mappingJobExecution.id}, mapping: $mappingUrl", e.getMessage)
logger.error(s"Streaming batch resulted in error for project: ${mappingJobExecution.projectId}, job: ${mappingJobExecution.jobId}, execution: ${mappingJobExecution.id}, mapping: $mappingUrl", e.getMessage)
}

df
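For context, the accumulator those hunks rename is a plain Spark CollectionAccumulator keyed by its name string; a minimal standalone sketch of the new naming pattern follows (the session setup and sample values are illustrative, not from the source):

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator

val spark = SparkSession.builder().master("local[*]").appName("accum-name-demo").getOrCreate()

// After the refactor, the name starts with the execution's own jobId field.
val jobId = "pilot-job"                                        // illustrative
val mappingUrl: Option[String] = Some("https://example.com/m") // illustrative
val accumName = s"$jobId:${mappingUrl.map(u => s"$u:").getOrElse("")}fhirWritingProblems"

val accum: CollectionAccumulator[String] =
  spark.sparkContext.collectionAccumulator[String](accumName)
accum.reset() // start each batch with an empty accumulator, as SinkHandler does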

@@ -30,7 +30,7 @@ object ErroneousRecordWriter {
notWrittenResources: util.List[FhirMappingResult],
mappingErrors: Dataset[FhirMappingResult],
invalidInputs: Dataset[FhirMappingResult]): Unit = {
- if (mappingJobExecution.job.dataProcessingSettings.saveErroneousRecords) {
+ if (mappingJobExecution.saveErroneousRecords) {
if (!invalidInputs.isEmpty) {
this.writeErroneousDataset(mappingJobExecution, invalidInputs, mappingUrl.get, FhirMappingErrorCodes.INVALID_INPUT)
}

@@ -42,7 +42,7 @@ class FileStreamInputArchiver(runningJobRegistry: RunningJobRegistry) {
* @param taskExecution
*/
def applyArchivingOnStreamingJob(taskExecution: FhirMappingJobExecution, mappingUrl: String): Unit = {
- val archiveMode: ArchiveModes = taskExecution.job.dataProcessingSettings.archiveMode
+ val archiveMode: ArchiveModes = taskExecution.archiveMode
if (archiveMode != ArchiveModes.OFF) {
// Get the commit file directory for this execution
val commitDirectory: File = new File(taskExecution.getCommitDirectory(mappingUrl))
@@ -90,9 +90,9 @@ object FileStreamInputArchiver {
// Putting the archiving logic for the batch file inside within a try block so that it would not affect the caller in case of any exception.
// That means archiving works in best-effort mode.
try {
- val archiveMode: ArchiveModes = execution.job.dataProcessingSettings.archiveMode
+ val archiveMode: ArchiveModes = execution.archiveMode
if (archiveMode != ArchiveModes.OFF) {
- val fileSystemSourceSettings = execution.job.sourceSettings.head._2.asInstanceOf[FileSystemSourceSettings]
+ val fileSystemSourceSettings = execution.sourceSettings.head._2.asInstanceOf[FileSystemSourceSettings]
// get data folder path from data source settings
val dataFolderPath = FileUtils.getPath(fileSystemSourceSettings.dataFolderPath).toString

@@ -111,7 +111,7 @@ object FileStreamInputArchiver {
})
}
} catch {
- case t:Throwable => logger.warn(s"Failed to apply archiving for job: ${execution.job.id}, execution: ${execution.id}")
+ case t:Throwable => logger.warn(s"Failed to apply archiving for job: ${execution.jobId}, execution: ${execution.id}")
}
}

@@ -181,10 +181,10 @@ class StreamingArchiverTask(archiver: FileStreamInputArchiver, runningJobRegistr
override def run(): Unit = {
// Get executions with streaming queries and file system sources and apply
val executions = runningJobRegistry.getRunningExecutionsWithCompleteMetadata()
- .filter(execution => execution.isStreaming())
- .filter(execution => execution.job.sourceSettings.head._2.isInstanceOf[FileSystemSourceSettings])
+ .filter(execution => execution.isStreaming)
+ .filter(execution => execution.sourceSettings.head._2.isInstanceOf[FileSystemSourceSettings])
executions.foreach(execution => {
- if (execution.isStreaming()) {
+ if (execution.isStreaming) {
execution.getStreamingQueryMap().keys.foreach(mappingUrl => {
archiver.applyArchivingOnStreamingJob(execution, mappingUrl)
})

@@ -51,10 +51,10 @@ class RunningJobRegistry(spark: SparkSession) {
// If there is an error in the streaming query execution, call 'stopMappingExecution' function,
// which is responsible for removing it from the registry. Without that, the registry might contain incorrect
// information such as indicating that the job is still running when it has encountered an error.
- streamingQueryFuture.recover(_ => stopMappingExecution(execution.job.id, execution.id, mappingUrl))
+ streamingQueryFuture.recover(_ => stopMappingExecution(execution.jobId, execution.id, mappingUrl))
// Wait for the initial Future to be resolved
val streamingQuery: StreamingQuery = Await.result(streamingQueryFuture, Duration.Inf)
- val jobId: String = execution.job.id
+ val jobId: String = execution.jobId
val executionId: String = execution.id

// Multiple threads can update the global task map. So, updates are synchronized.
@@ -106,7 +106,7 @@ class RunningJobRegistry(spark: SparkSession) {
def registerBatchJob(execution: FhirMappingJobExecution, jobFuture: Option[Future[Unit]], jobDescription: String = ""): Unit = {
val jobGroup: String = setSparkJobGroup(jobDescription)
val executionWithJobGroupId = execution.copy(jobGroupIdOrStreamingQuery = Some(Left(jobGroup)))
- val jobId: String = executionWithJobGroupId.job.id
+ val jobId: String = executionWithJobGroupId.jobId
val executionId: String = executionWithJobGroupId.id

runningTasks.synchronized {
@@ -132,13 +132,13 @@ class RunningJobRegistry(spark: SparkSession) {
def registerSchedulingJob(mappingJobExecution: FhirMappingJobExecution, scheduler: Scheduler): Unit = {
// add it to the scheduledTasks map
scheduledTasks
- .getOrElseUpdate(mappingJobExecution.job.id, collection.mutable.Map[String, Scheduler]())
+ .getOrElseUpdate(mappingJobExecution.jobId, collection.mutable.Map[String, Scheduler]())
.put(mappingJobExecution.id, scheduler)
logger.debug(s"Scheduling job ${mappingJobExecution.job.id} has been registered")
logger.debug(s"Scheduling job ${mappingJobExecution.jobId} has been registered")
// add a scheduler listener to monitor task events
scheduler.addSchedulerListener(new SchedulerListener {
override def taskLaunching(executor: TaskExecutor): Unit = {
- registerBatchJob(mappingJobExecution,None,s"Spark job for job: ${mappingJobExecution.job.id} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}")
+ registerBatchJob(mappingJobExecution,None,s"Spark job for job: ${mappingJobExecution.jobId} mappings: ${mappingJobExecution.mappingTasks.map(_.mappingRef).mkString(" ")}")
}

override def taskSucceeded(executor: TaskExecutor): Unit = {
@@ -270,7 +270,7 @@ class RunningJobRegistry(spark: SparkSession) {
val execution: FhirMappingJobExecution = jobMapping(executionId)
var removedMappingEntry: Option[Either[String, StreamingQuery]] = None
// If it is a batch job do nothing but warn user about the situation
- if (!execution.isStreaming()) {
+ if (!execution.isStreaming) {
logger.warn(s"Execution with $jobId: $jobId, executionId: $executionId, mappingUrl: $mappingUrl won't be stopped with a specific mapping as this is a batch job." +
s"Stop execution by providing only the jobId and executionId")

@@ -308,7 +308,7 @@ class RunningJobRegistry(spark: SparkSession) {
case None => true // We know we have an execution at this point
case Some(url) =>
// For streaming jobs, we check whether there is a streaming query for the given mapping
- if (runningTasks(jobId)(executionId).isStreaming()) {
+ if (runningTasks(jobId)(executionId).isStreaming) {
runningTasks(jobId)(executionId).getStreamingQueryMap().contains(url)

// For batch jobs, we don't differentiate mapping tasks. So, returning true directly (which indicates that the job execution is in progress)
@@ -411,6 +411,6 @@ class RunningJobRegistry(spark: SparkSession) {
*/
private def handleCompletedBatchJob(execution: FhirMappingJobExecution): Unit = {
FileStreamInputArchiver.applyArchivingOnBatchJob(execution)
- removeExecutionFromRunningTasks(execution.job.id, execution.id)
+ removeExecutionFromRunningTasks(execution.jobId, execution.id)
}
}
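
Taken together, the hunks above are one mechanical substitution repeated across call sites. Under the hypothetical model sketched near the top of this page, constructing and using an execution would look roughly like this (all names and values illustrative):

import java.util.UUID

val job = FhirMappingJob(
  id = "pilot-job",
  sourceSettings = Map("patientSource" -> "file:///data"), // stand-in value
  dataProcessingSettings = DataProcessingSettings(ArchiveModes.OFF, saveErroneousRecords = true)
)

// The new companion apply copies the needed fields out of the job...
val execution = FhirMappingJobExecution(
  id = UUID.randomUUID().toString,
  projectId = "pilot-project",
  job = job,
  mappingTasks = Seq(FhirMappingTask("https://example.com/fhir/mappings/patient"))
)

// ...so call sites no longer reach through a nested job object:
println(execution.jobId)                // was execution.job.id
println(execution.saveErroneousRecords) // was execution.job.dataProcessingSettings.saveErroneousRecords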