Skip to content

Commit

Permalink
🐛 fix: dangling ongoing task error. (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
Okanmercan99 authored Oct 3, 2024
1 parent 2267849 commit 9edcfe5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ object ExecutionLogger {
* @param status The status to log (e.g., STARTED, SKIPPED, STOPPED, FAILED)
* @param mappingTaskName The optional name of the mapping
* @param exception The optional exception that occurred
* @param isChunkResult Indicates whether the log is the result of a chunk
*/
def logExecutionStatus(mappingJobExecution: FhirMappingJobExecution,
status: String,
mappingTaskName: Option[String] = None,
exception: Option[Throwable] = None): Unit = {
exception: Option[Throwable] = None,
isChunkResult: Boolean = true): Unit = {
// create the job result
val jobResult = FhirMappingJobResult(mappingJobExecution, mappingTaskName, status = Some(status))
val jobResult = FhirMappingJobResult(mappingJobExecution, mappingTaskName, status = Some(status), chunkResult = isChunkResult)
// log the status with either info or error based on the presence of an exception
exception match {
case Some(e) => logger.error(jobResult.toMapMarker, jobResult.toString, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,24 @@ class FhirMappingJobManager(
// Check whether the job is stopped
case se: SparkThrowable if se.getMessage.contains("cancelled part of cancelled job group") =>
// log the execution status as "STOPPED"
ExecutionLogger.logExecutionStatus(mappingJobExecution, FhirMappingJobResult.STOPPED, Some(task.name))
ExecutionLogger.logExecutionStatus(mappingJobExecution, FhirMappingJobResult.STOPPED, Some(task.name), isChunkResult = false)
throw FhirMappingJobStoppedException(s"Execution '${mappingJobExecution.id}' of job '${mappingJobExecution.jobId}' in project ${mappingJobExecution.projectId}' terminated manually!")
// Exceptions from Spark executors are wrapped inside a SparkException, which are caught below
case se: SparkThrowable =>
se.getCause match {
// log the mapping job result and exception for the errors encountered while reading the schema or writing the FHIR Resources
case _ =>
// log the execution status as "FAILURE"
ExecutionLogger.logExecutionStatus(mappingJobExecution, FhirMappingJobResult.FAILURE, Some(task.name), Some(se))
ExecutionLogger.logExecutionStatus(mappingJobExecution, FhirMappingJobResult.FAILURE, Some(task.name), Some(se), isChunkResult = false)
}
// Pass the stop exception to the upstream Futures in the chain laid out by foldLeft above
case t: FhirMappingJobStoppedException =>
// log the execution status as "SKIPPED"
ExecutionLogger.logExecutionStatus(mappingJobExecution, FhirMappingJobResult.SKIPPED, Some(task.name))
ExecutionLogger.logExecutionStatus(mappingJobExecution, FhirMappingJobResult.SKIPPED, Some(task.name), isChunkResult = false)
throw t
case e: Throwable =>
// log the execution status as "FAILURE"
ExecutionLogger.logExecutionStatus(mappingJobExecution, FhirMappingJobResult.FAILURE, Some(task.name), Some(e))
ExecutionLogger.logExecutionStatus(mappingJobExecution, FhirMappingJobResult.FAILURE, Some(task.name), Some(e), isChunkResult = false)
}
} map { _ => logger.debug(s"MappingJob execution finished for MappingJob: ${mappingJobExecution.jobId}.") }
}
Expand Down

0 comments on commit 9edcfe5

Please sign in to comment.