Skip to content

Commit

Permalink
Fixes #2170 - Replaced ErrorMessages and Mapping with spark-commons ErrorMessages and Mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Feb 24, 2023
1 parent 7cd5e3e commit eff8ef0
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@
<apache.commons.configuration2.version>2.4</apache.commons.configuration2.version>
<abris.version>6.2.0</abris.version>
<absa.commons.version>1.1.0</absa.commons.version>
<absa.spark.commons.version>0.4.0</absa.spark.commons.version>
<absa.spark.commons.version>0.5.0</absa.spark.commons.version>
<atum.version>3.9.0</atum.version>
<bower.chart.js.version>2.7.3</bower.chart.js.version>
<bson.codec.jsr310.version>3.5.4</bson.codec.jsr310.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{expr, udf}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SparkSession}
import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage

object BroadcastUtils {
// scalastyle:off null
Expand Down Expand Up @@ -108,7 +109,7 @@ object BroadcastUtils {
*/
def getErrorUdf(mappingTable: Broadcast[LocalMappingTable],
outputColumns: Seq[String],
mappings: Seq[Mapping])(implicit spark: SparkSession): UserDefinedFunction = {
mappings: Seq[ErrorMessage.Mapping])(implicit spark: SparkSession): UserDefinedFunction = {

val numberOfArguments = mappingTable.value.keyTypes.size

Expand All @@ -117,7 +118,7 @@ object BroadcastUtils {
null
} else {
val strings: Seq[String] = key.map(a => safeToString(a))
ErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings)
EnceladusErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings)
}
}
val errorMessageType = ScalaReflection.schemaFor[ErrorMessage].dataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.enceladus.utils.error
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import za.co.absa.standardization.config.DefaultErrorCodesConfig
import za.co.absa.spark.commons.errorhandling.ErrorMessage

/**
* Case class to represent an error message
Expand All @@ -29,13 +30,13 @@ import za.co.absa.standardization.config.DefaultErrorCodesConfig
* @param rawValues - Sequence of raw values (which are the potential culprits of the error)
* @param mappings - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
*/
case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)
//case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
//case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)

object ErrorMessage {
val errorColumnName = "errCol"
object EnceladusErrorMessage {
// val errorColumnName = "errCol"

def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]): ErrorMessage = ErrorMessage(
def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]): ErrorMessage = ErrorMessage(
errType = "confMapError",
errCode = ErrorCodes.ConfMapError,
errMsg = "Conformance Error - Null produced by mapping conformance rule",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package za.co.absa.enceladus.utils.schema
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.udf.ConformanceUDFLibrary
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.spark.hats.transformations.NestedArrayTransformations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package za.co.absa.enceladus.utils.udf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.types.ArrayType
import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.enceladus.utils.udf.ConformanceUDFNames._
import za.co.absa.spark.commons.OncePerSparkSession

Expand All @@ -28,20 +29,20 @@ import scala.collection.mutable
class ConformanceUDFLibrary()(implicit sparkToRegisterTo: SparkSession) extends OncePerSparkSession {

override protected def register(implicit spark: SparkSession): Unit = {
spark.udf.register(confMappingErr, { (errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]) =>
ErrorMessage.confMappingErr(errCol, rawValues, mappings)
spark.udf.register(confMappingErr, { (errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]) =>
EnceladusErrorMessage.confMappingErr(errCol, rawValues, mappings)
})

spark.udf.register(confCastErr, { (errCol: String, rawValue: String) =>
ErrorMessage.confCastErr(errCol, rawValue)
EnceladusErrorMessage.confCastErr(errCol, rawValue)
})

spark.udf.register(confNegErr, { (errCol: String, rawValue: String) =>
ErrorMessage.confNegErr(errCol, rawValue)
EnceladusErrorMessage.confNegErr(errCol, rawValue)
})

spark.udf.register(confLitErr, { (errCol: String, rawValue: String) =>
ErrorMessage.confLitErr(errCol, rawValue)
EnceladusErrorMessage.confLitErr(errCol, rawValue)
})

spark.udf.register(arrayDistinctErrors, // this UDF is registered for _spark-hats_ library sake
Expand Down

0 comments on commit eff8ef0

Please sign in to comment.