Skip to content

Commit

Permalink
✨ Improve the log messages for the validation of required fields during data source reading.
Browse files Browse the repository at this point in the history
  • Loading branch information
sinaci committed Nov 21, 2024
1 parent be158d3 commit 881e5b8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.tofhir.engine.data.read

import io.tofhir.engine.model.exception.FhirMappingException
import io.tofhir.engine.model.{MappingJobSourceSettings, MappingSourceBinding}
import org.apache.spark.sql.functions.{col, lit, when}
import org.apache.spark.sql.functions.{col, lit, when, concat, concat_ws}
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

Expand Down Expand Up @@ -72,33 +72,38 @@ object SourceHandler {

// Find the required fields
// Collect the names of the non-nullable (required) fields into a set.
var requiredFields = sc.fields.filterNot(_.nullable).map(_.name).toSet
val requiredFieldsToCheckParents = sc.fields.filterNot(_.nullable).map(_.name).toSet
// Filter the set of required fields to ensure that their parents are also required
requiredFields = requiredFields.filter(field => {
val requiredFields = requiredFieldsToCheckParents.filter(field => {
// Split the field name into parts using dot (.) as the separator.
val parts = field.split("\\.")
// Check if each part's parent is also required.
parts.indices.forall { i =>
// Create the parent name by joining the parts up to index 'i' with dots.
val parentName = parts.take(i + 1).mkString(".")
// Check if the parent is required.
requiredFields.contains(parentName)
requiredFieldsToCheckParents.contains(parentName)
}
})
if (requiredFields.isEmpty)
finalSourceData.withColumn(INPUT_VALIDITY_ERROR, lit(null).cast(DataTypes.StringType))
else {
//TODO handle required fields for non-tabular data (deep fields)
//Check required columns
val nullCheck =
requiredFields
.map(f => col(f).isNull)
.reduce((c1, c2) => c1 || c2)
// TODO handle required fields for non-tabular data (deep fields)
// Check required columns
val nullChecksWithFields = requiredFields.map(f => (f, col(f).isNull))

val errorMessageColumn = concat_ws(", ",
nullChecksWithFields.map { case (field, isNullCheck) =>
when(isNullCheck, lit(field)) // Include field name if null
}.toSeq: _* // Convert to Seq and expand as varargs to pass them to concat_ws
)

finalSourceData
.withColumn(INPUT_VALIDITY_ERROR,
when(nullCheck, lit(s"One of the required column(s) (${requiredFields.mkString(", ")}) is missing or null"))
.otherwise(lit(null).cast(DataTypes.StringType))
when(
nullChecksWithFields.map(_._2).reduce(_ || _), // Adds a new column with an error message only if any one of the required field is null.
concat(lit("The following required columns are missing or null: "), errorMessageColumn)
).otherwise(lit(null).cast(DataTypes.StringType))
)
}
//If there is no schema or readers don't need validation, we assume all rows are valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ object MappingTaskExecutor {
jobId = fhirMappingService.jobId,
mappingTaskName = fhirMappingService.mappingTaskName,
timestamp = Timestamp.from(Instant.now()),
source = Serialization.write(jo),
source = Serialization.write(JObject("mainSource" -> jo) ~ otherObjectMap),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.INVALID_INPUT,
description = validationErrors.mkString("\n")
Expand Down

0 comments on commit 881e5b8

Please sign in to comment.