Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ feat: Log the Kafka topic name in the Kafka topic-not-found error. #264

Merged
merged 4 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.tofhir.engine.execution
import it.sauronsoftware.cron4j.{Scheduler, SchedulerListener, TaskExecutor}
import com.typesafe.scalalogging.Logger
import io.tofhir.engine.Execution.actorSystem.dispatcher
import io.tofhir.engine.model.{FhirMappingJobExecution, FhirMappingJobResult}
import io.tofhir.engine.model.{FhirMappingJobExecution, FhirMappingJobResult, KafkaSource}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

Expand All @@ -15,6 +15,8 @@ import io.tofhir.engine.Execution.actorSystem
import io.tofhir.engine.data.write.SinkHandler
import io.tofhir.engine.execution.log.ExecutionLogger
import io.tofhir.engine.execution.processing.FileStreamInputArchiver
import io.tofhir.engine.model.exception.FhirMappingException
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
/**
* Execution manager that keeps track of running and scheduled mapping tasks in-memory.
* This registry is designed to maintain the execution status of both Streaming and Batch mapping jobs.
Expand Down Expand Up @@ -96,9 +98,21 @@ class RunningJobRegistry(spark: SparkSession) {
// wait for StreamingQuery to terminate
updatedExecution.getStreamingQuery(mappingTaskName).awaitTermination()
}catch{
case exception: StreamingQueryException =>{
ExecutionLogger.logExecutionStatus(execution, FhirMappingJobResult.FAILURE, Some(mappingTaskName), Some(exception), isChunkResult = false)
}
case exception: StreamingQueryException =>
Option(exception.getCause) match {
case None =>
ExecutionLogger.logExecutionStatus(execution, FhirMappingJobResult.FAILURE, Some(mappingTaskName), Some(exception), isChunkResult = false)
case Some(cause) =>
Option(cause.getCause) match {
// special handling of UnknownTopicOrPartitionException: report the names of all Kafka topics configured in the mapping task's source bindings
case Some(_: UnknownTopicOrPartitionException) =>
val topicNames = execution.mappingTasks.find(mappingTask => mappingTask.name.contentEquals(mappingTaskName)).get.sourceBinding.map(source => source._2.asInstanceOf[KafkaSource].topicName).mkString(", ")
val unknownTopicError = FhirMappingException(s"The following Kafka topic(s) specified in the mapping task do not exist: $topicNames")
ExecutionLogger.logExecutionStatus(execution,FhirMappingJobResult.FAILURE,Some(mappingTaskName),Some(unknownTopicError),isChunkResult = false)
case _ =>
ExecutionLogger.logExecutionStatus(execution,FhirMappingJobResult.FAILURE,Some(mappingTaskName),Some(exception),isChunkResult = false)
}
}
}finally{
// Remove the mapping execution from the running tasks after the query is terminated
stopMappingExecution(jobId, executionId, mappingTaskName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import io.tofhir.server.repository.mapping.IMappingRepository
import io.tofhir.server.repository.schema.ISchemaRepository
import io.tofhir.server.util.DataFrameUtil
import org.apache.commons.io
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.spark.sql.KeyValueGroupedDataset
import org.json4s.jackson.JsonMethods
import org.json4s.{JArray, JBool, JObject, JString, JValue}

import java.io.File
import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, ExecutionException, Future}

/**
* Service to handle all execution related operations
Expand Down Expand Up @@ -245,6 +246,21 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
.groupByKey((result: FhirMappingResult) => result.source)
// Map each group to FhirMappingResultForInput
grouped.mapGroups(FhirMappingResultConverter.convertToFhirMappingResultsForInput).collect().toSeq
}.recover{
case ee: ExecutionException =>
Option(ee.getCause) match {
// special handling of UnknownTopicOrPartitionException: report the names of all Kafka topics configured in the requested mapping task's source bindings
case Some(_: UnknownTopicOrPartitionException) =>
val topicNames: Seq[String] = testResourceCreationRequest.fhirMappingTask.sourceBinding.map(source => source._2.asInstanceOf[KafkaSource].topicName).toSeq
throw BadRequest(
"Invalid Kafka Topics",
s"The following Kafka topic(s) specified in the mapping task do not exist: ${topicNames.mkString(", ")}."
)
case _ =>
throw ee
}
case e: Exception =>
throw e
}
}
}
Expand Down
Loading