diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala
index 82c994df..4c501135 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala
@@ -1,9 +1,9 @@
package io.github.datacatering.datacaterer.api
-import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
-import io.github.datacatering.datacaterer.api.model.Constants._
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.connection.{CassandraBuilder, ConnectionTaskBuilder, FileBuilder, HttpBuilder, KafkaBuilder, MySqlBuilder, NoopBuilder, PostgresBuilder, SolaceBuilder}
+import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
+import io.github.datacatering.datacaterer.api.model.Constants._
import io.github.datacatering.datacaterer.api.model.DataCatererConfiguration
import scala.annotation.varargs
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala
index 6578d945..c7309919 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala
@@ -1,8 +1,8 @@
package io.github.datacatering.datacaterer.api
+import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY_ID, CONFLUENT_SCHEMA_REGISTRY_SUBJECT, CONFLUENT_SCHEMA_REGISTRY_VERSION, DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
-import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.model.{ConfluentSchemaRegistrySource, DataContractCliSource, GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}
case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadataSource()) {
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala
index 401b57bf..877a893e 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala
@@ -2,9 +2,9 @@ package io.github.datacatering.datacaterer.api
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api
+import io.github.datacatering.datacaterer.api.connection.ConnectionTaskBuilder
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaList
import io.github.datacatering.datacaterer.api.model.Constants.METADATA_SOURCE_TYPE
-import io.github.datacatering.datacaterer.api.connection.ConnectionTaskBuilder
import io.github.datacatering.datacaterer.api.model.{ForeignKeyRelation, Plan}
import scala.annotation.varargs
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala
index 86408e60..cbe7b94d 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala
@@ -3,7 +3,7 @@ package io.github.datacatering.datacaterer.api
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
import io.github.datacatering.datacaterer.api.model.Constants._
-import io.github.datacatering.datacaterer.api.model.{Count, DataType, Field, PerFieldCount, Step, StringType, Task, TaskSummary}
+import io.github.datacatering.datacaterer.api.model.{Count, DataType, Field, PerFieldCount, Step, Task, TaskSummary}
import scala.annotation.varargs
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ConfigModels.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ConfigModels.scala
index 254b58d1..cbc49f04 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ConfigModels.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ConfigModels.scala
@@ -1,6 +1,6 @@
package io.github.datacatering.datacaterer.api.model
-import Constants._
+import io.github.datacatering.datacaterer.api.model.Constants._
case class FlagsConfig(
enableCount: Boolean = DEFAULT_ENABLE_COUNT,
@@ -14,7 +14,8 @@ case class FlagsConfig(
enableSaveReports: Boolean = DEFAULT_ENABLE_SAVE_REPORTS,
enableValidation: Boolean = DEFAULT_ENABLE_VALIDATION,
enableGenerateValidations: Boolean = DEFAULT_ENABLE_SUGGEST_VALIDATIONS,
- enableAlerts: Boolean = DEFAULT_ENABLE_ALERTS
+ enableAlerts: Boolean = DEFAULT_ENABLE_ALERTS,
+ enableTrackActivity: Boolean = DEFAULT_ENABLE_TRACK_ACTIVITY
)
case class FoldersConfig(
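
The new enableTrackActivity flag slots in alongside the existing flags and defaults to DEFAULT_ENABLE_TRACK_ACTIVITY. A minimal sketch of opting out by constructing the case class directly; any builder-level toggle is an assumption, since none appears in this diff:

// Sketch: disable activity tracking, leaving every other flag on its
// Constants-backed default from the case class definition above.
val flags = FlagsConfig(enableTrackActivity = false)
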
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala
index b55b910f..e6e73010 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala
@@ -189,6 +189,7 @@ object Constants {
lazy val DEFAULT_ENABLE_VALIDATION = true
lazy val DEFAULT_ENABLE_SUGGEST_VALIDATIONS = false
lazy val DEFAULT_ENABLE_ALERTS = true
+ lazy val DEFAULT_ENABLE_TRACK_ACTIVITY = true
//folders defaults
lazy val DEFAULT_PLAN_FILE_PATH = "/opt/app/plan/customer-create-plan.yaml"
@@ -552,4 +553,10 @@ object Constants {
lazy val PLAN_RUN_EXECUTION_DELIMITER_REGEX = "\\|\\|"
lazy val PLAN_RUN_SUMMARY_DELIMITER = "&&"
+ //source of plan run
+ lazy val DATA_CATERER_INTERFACE_JAVA = "java"
+ lazy val DATA_CATERER_INTERFACE_SCALA = "scala"
+ lazy val DATA_CATERER_INTERFACE_UI = "ui"
+ lazy val DATA_CATERER_INTERFACE_YAML = "yaml"
+
}
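
The four interface constants identify where a plan run was triggered from. A minimal sketch of branching on them; the describeInterface helper is hypothetical:

import io.github.datacatering.datacaterer.api.model.Constants._

// Hypothetical helper: map an interface constant to a short label.
def describeInterface(interface: String): String = interface match {
  case DATA_CATERER_INTERFACE_JAVA  => "Java API"
  case DATA_CATERER_INTERFACE_SCALA => "Scala API"
  case DATA_CATERER_INTERFACE_UI    => "UI"
  case DATA_CATERER_INTERFACE_YAML  => "YAML"
  case other                        => s"unknown interface: $other"
}
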
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala
index 505b42df..a547aaf8 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala
@@ -1,6 +1,6 @@
package io.github.datacatering.datacaterer.api.model
-import Constants.{CONFLUENT_SCHEMA_REGISTRY, DATA_CONTRACT_CLI, GREAT_EXPECTATIONS, MARQUEZ, METADATA_SOURCE_HAS_OPEN_LINEAGE_SUPPORT, METADATA_SOURCE_TYPE, OPEN_API, OPEN_DATA_CONTRACT_STANDARD, OPEN_METADATA}
+import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY, DATA_CONTRACT_CLI, GREAT_EXPECTATIONS, MARQUEZ, METADATA_SOURCE_HAS_OPEN_LINEAGE_SUPPORT, METADATA_SOURCE_TYPE, OPEN_API, OPEN_DATA_CONTRACT_STANDARD, OPEN_METADATA}
trait MetadataSource {
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/PlanModels.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/PlanModels.scala
index 8edae571..1448868c 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/PlanModels.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/PlanModels.scala
@@ -1,7 +1,7 @@
package io.github.datacatering.datacaterer.api.model
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
-import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD_NAME, DEFAULT_FIELD_NULLABLE, DEFAULT_FIELD_TYPE, DEFAULT_GENERATOR_TYPE, DEFAULT_PER_FIELD_COUNT_RECORDS, DEFAULT_STEP_ENABLED, DEFAULT_STEP_NAME, DEFAULT_STEP_TYPE, DEFAULT_TASK_NAME, DEFAULT_TASK_SUMMARY_ENABLE, FOREIGN_KEY_DELIMITER}
+import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD_NAME, DEFAULT_FIELD_NULLABLE, DEFAULT_FIELD_TYPE, DEFAULT_PER_FIELD_COUNT_RECORDS, DEFAULT_STEP_ENABLED, DEFAULT_STEP_NAME, DEFAULT_STEP_TYPE, DEFAULT_TASK_NAME, DEFAULT_TASK_SUMMARY_ENABLE, FOREIGN_KEY_DELIMITER}
import scala.language.implicitConversions
@@ -11,7 +11,8 @@ case class Plan(
tasks: List[TaskSummary] = List(),
sinkOptions: Option[SinkOptions] = None,
validations: List[String] = List(),
- runId: Option[String] = None
+ runId: Option[String] = None,
+ interface: Option[String] = None
)
case class SinkOptions(
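
Plan now carries the optional interface tag populated by the run entry points. A minimal sketch of tagging a plan, mirroring the copy in PlanProcessor.applyPrePlanProcessors later in this diff; the plan value is assumed to be in scope:

// Sketch: record that this plan was triggered via the Scala API.
val taggedPlan: Plan = plan.copy(interface = Some(Constants.DATA_CATERER_INTERFACE_SCALA))
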
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ResultModels.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ResultModels.scala
new file mode 100644
index 00000000..5914e436
--- /dev/null
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ResultModels.scala
@@ -0,0 +1,162 @@
+package io.github.datacatering.datacaterer.api.model
+
+import io.github.datacatering.datacaterer.api.model.Constants.DEFAULT_DATA_SOURCE_NAME
+import io.github.datacatering.datacaterer.api.util.ConfigUtil.cleanseOptions
+import io.github.datacatering.datacaterer.api.util.ResultWriterUtil.getSuccessSymbol
+
+import java.time.{Duration, LocalDateTime}
+import scala.math.BigDecimal.RoundingMode
+
+
+case class DataSourceResultSummary(
+ name: String,
+ numRecords: Long,
+ isSuccess: Boolean,
+ dataSourceResults: List[DataSourceResult]
+ )
+
+case class DataSourceResult(
+ name: String = DEFAULT_DATA_SOURCE_NAME,
+ task: Task = Task(),
+ step: Step = Step(),
+ sinkResult: SinkResult = SinkResult(),
+ batchNum: Int = 0
+ ) {
+
+ def summarise: List[String] = {
+ val format = sinkResult.format
+ val isSuccess = getSuccessSymbol(sinkResult.isSuccess)
+ val numRecords = sinkResult.count.toString
+ List(name, format, isSuccess, numRecords)
+ }
+
+ def jsonSummary: Map[String, Any] = {
+ Map(
+ "name" -> name,
+ "options" -> step.options,
+ "isSuccess" -> sinkResult.isSuccess,
+ "numRecords" -> sinkResult.count
+ )
+ }
+}
+
+case class TaskResultSummary(
+ task: Task,
+ numRecords: Long,
+ isSuccess: Boolean,
+ stepResults: List[StepResultSummary]
+ )
+
+case class StepResultSummary(
+ step: Step,
+ numRecords: Long,
+ isSuccess: Boolean,
+ dataSourceResults: List[DataSourceResult]
+ )
+
+case class SinkResult(
+ name: String = DEFAULT_DATA_SOURCE_NAME,
+ format: String = "json",
+ saveMode: String = "append",
+ options: Map[String, String] = Map(),
+ count: Long = -1,
+ isSuccess: Boolean = true,
+ sample: Array[String] = Array(),
+ startTime: LocalDateTime = LocalDateTime.now(),
+ endTime: LocalDateTime = LocalDateTime.now(),
+ generatedMetadata: Array[Field] = Array(),
+ exception: Option[Throwable] = None
+ ) {
+
+ def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
+}
+
+
+case class ValidationConfigResult(
+ name: String = "default_validation_result",
+ description: String = "Validation result for data sources",
+ dataSourceValidationResults: List[DataSourceValidationResult] = List(),
+ startTime: LocalDateTime = LocalDateTime.now(),
+ endTime: LocalDateTime = LocalDateTime.now()
+ ) {
+ def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
+
+ def summarise: List[String] = {
+ val validationRes = dataSourceValidationResults.flatMap(_.validationResults)
+ if (validationRes.nonEmpty) {
+ val (numSuccess, successRate, isSuccess) = baseSummary(validationRes)
+ val successRateVisual = s"$numSuccess/${validationRes.size} ($successRate%)"
+ List(name, description, getSuccessSymbol(isSuccess), successRateVisual)
+ } else List()
+ }
+
+ def jsonSummary: Map[String, Any] = {
+ val validationRes = dataSourceValidationResults.flatMap(dsv =>
+ dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v))
+ )
+ if (validationRes.nonEmpty) {
+ val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3))
+ val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => {
+ val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap
+ Map(
+ "dataSourceName" -> res._1,
+ "options" -> cleanseOptions(res._2),
+ "validation" -> validationDetails,
+ "numErrors" -> res._3.numErrors,
+ "sampleErrorValues" -> res._3.sampleErrorValues.getOrElse(Array())
+ )
+ })
+ val baseValidationMap = Map(
+ "name" -> name,
+ "description" -> description,
+ "isSuccess" -> isSuccess,
+ "numSuccess" -> numSuccess,
+ "numValidations" -> validationRes.size,
+ "successRate" -> successRate
+ )
+ if (errorMap.nonEmpty) {
+ baseValidationMap ++ Map("errorValidations" -> errorMap)
+ } else baseValidationMap
+ } else Map()
+ }
+
+ private def baseSummary(validationRes: List[ValidationResult]): (Int, BigDecimal, Boolean) = {
+ val validationSuccess = validationRes.map(_.isSuccess)
+ val numSuccess = validationSuccess.count(x => x)
+ val successRate = BigDecimal(numSuccess.toDouble / validationRes.size * 100).setScale(2, RoundingMode.HALF_UP)
+ val isSuccess = validationSuccess.forall(x => x)
+ (numSuccess, successRate, isSuccess)
+ }
+}
+
+case class DataSourceValidationResult(
+ dataSourceName: String = "default_data_source",
+ options: Map[String, String] = Map(),
+ validationResults: List[ValidationResult] = List()
+ )
+
+case class ValidationResult(
+ validation: Validation = ExpressionValidation(),
+ isSuccess: Boolean = true,
+ numErrors: Long = 0,
+ total: Long = 0,
+ sampleErrorValues: Option[Array[Map[String, Any]]] = None
+ )
+
+object ValidationResult {
+ def fromValidationWithBaseResult(validation: Validation, validationResult: ValidationResult): ValidationResult = {
+ ValidationResult(validation, validationResult.isSuccess, validationResult.numErrors, validationResult.total, validationResult.sampleErrorValues)
+ }
+}
+
+case class PlanResults(
+ plan: Plan,
+ generationResult: List[DataSourceResult],
+ validationResults: List[ValidationConfigResult]
+ )
+
+case class PlanRunSummary(
+ plan: Plan,
+ tasks: List[Task],
+ validations: List[ValidationConfiguration]
+ )
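
Moving these result models from core into the api module lets both the report writers and the new activity processors share them. A minimal sketch of building and summarising one result; all values are illustrative:

// Sketch: one successful sink result summarised for reporting.
val result = DataSourceResult(
  name = "my_json_sink", // illustrative data source name
  sinkResult = SinkResult(format = "json", count = 100, isSuccess = true)
)
val tableRow: List[String] = result.summarise // name, format, success symbol, record count
val asJson: Map[String, Any] = result.jsonSummary
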
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ConfigUtil.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/util/ConfigUtil.scala
similarity index 87%
rename from app/src/main/scala/io/github/datacatering/datacaterer/core/util/ConfigUtil.scala
rename to api/src/main/scala/io/github/datacatering/datacaterer/api/util/ConfigUtil.scala
index 1cf8a19c..80d4c07a 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ConfigUtil.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/util/ConfigUtil.scala
@@ -1,4 +1,4 @@
-package io.github.datacatering.datacaterer.core.util
+package io.github.datacatering.datacaterer.api.util
object ConfigUtil {
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ResultWriterUtil.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/util/ResultWriterUtil.scala
similarity index 70%
rename from app/src/main/scala/io/github/datacatering/datacaterer/core/util/ResultWriterUtil.scala
rename to api/src/main/scala/io/github/datacatering/datacaterer/api/util/ResultWriterUtil.scala
index e7e9b3c5..93261f89 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ResultWriterUtil.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/util/ResultWriterUtil.scala
@@ -1,4 +1,4 @@
-package io.github.datacatering.datacaterer.core.util
+package io.github.datacatering.datacaterer.api.util
object ResultWriterUtil {
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPostPlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPostPlanProcessor.scala
new file mode 100644
index 00000000..e1942e2a
--- /dev/null
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPostPlanProcessor.scala
@@ -0,0 +1,46 @@
+package io.github.datacatering.datacaterer.core.activity
+
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, PlanResults, ValidationConfigResult}
+import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
+import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
+import io.github.datacatering.datacaterer.core.util.ManagementUtil.getDataCatererManagementUrl
+import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
+import org.apache.log4j.Logger
+import org.asynchttpclient.AsyncHttpClient
+import org.asynchttpclient.Dsl.asyncHttpClient
+
+import scala.compat.java8.FutureConverters.CompletionStageOps
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
+
+class PlanRunPostPlanProcessor(val dataCatererConfiguration: DataCatererConfiguration) extends PostPlanProcessor {
+
+ override val enabled: Boolean = dataCatererConfiguration.flagsConfig.enableTrackActivity
+
+ private val LOGGER = Logger.getLogger(getClass.getName)
+ private val dataCatererManagementUrl = getDataCatererManagementUrl
+ private val http: AsyncHttpClient = asyncHttpClient
+
+ override def apply(
+ plan: Plan,
+ sparkRecordListener: SparkRecordListener,
+ generationResult: List[DataSourceResult],
+ validationResults: List[ValidationConfigResult]
+ ): Unit = {
+ val planResults = PlanResults(plan, generationResult, validationResults)
+ val jsonBody = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(planResults)
+ val url = s"$dataCatererManagementUrl/plan/finish"
+ val prepareRequest = http.prepare("POST", url)
+ .setBody(jsonBody)
+
+ val futureResp = prepareRequest.execute().toCompletableFuture.toScala
+ futureResp.onComplete {
+ case Success(_) =>
+ LOGGER.debug(s"Successfully posted run results, url=$url")
+ http.close()
+ case Failure(exception) =>
+ LOGGER.debug(s"Failed to post run results, url=$url", exception)
+ http.close()
+ }
+ }
+}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPrePlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPrePlanProcessor.scala
new file mode 100644
index 00000000..feff55c7
--- /dev/null
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPrePlanProcessor.scala
@@ -0,0 +1,44 @@
+package io.github.datacatering.datacaterer.core.activity
+
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, PlanRunSummary, Task, ValidationConfiguration}
+import io.github.datacatering.datacaterer.core.plan.PrePlanProcessor
+import io.github.datacatering.datacaterer.core.util.ManagementUtil.getDataCatererManagementUrl
+import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
+import org.apache.log4j.Logger
+import org.asynchttpclient.AsyncHttpClient
+import org.asynchttpclient.Dsl.asyncHttpClient
+
+import scala.compat.java8.FutureConverters.CompletionStageOps
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
+
+class PlanRunPrePlanProcessor(val dataCatererConfiguration: DataCatererConfiguration) extends PrePlanProcessor {
+
+ override val enabled: Boolean = dataCatererConfiguration.flagsConfig.enableTrackActivity
+
+ private val LOGGER = Logger.getLogger(getClass.getName)
+ private val dataCatererManagementUrl = getDataCatererManagementUrl
+ private val http: AsyncHttpClient = asyncHttpClient
+
+ override def apply(
+ plan: Plan,
+ tasks: List[Task],
+ validations: List[ValidationConfiguration]
+ ): Unit = {
+ val planRunSummary = PlanRunSummary(plan, tasks, validations)
+ val jsonBody = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(planRunSummary)
+ val url = s"$dataCatererManagementUrl/plan/start"
+ val prepareRequest = http.prepare("POST", url)
+ .setBody(jsonBody)
+
+ val futureResp = prepareRequest.execute().toCompletableFuture.toScala
+ futureResp.onComplete {
+ case Success(_) =>
+ LOGGER.debug(s"Successfully posted run start, url=$url")
+ http.close()
+ case Failure(exception) =>
+ LOGGER.debug(s"Failed to post run start, url=$url", exception)
+ http.close()
+ }
+ }
+}
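
Together the two activity processors bracket a run: /plan/start receives a serialised PlanRunSummary before generation and /plan/finish a serialised PlanResults afterwards, both fire-and-forget. A minimal sketch of the shared serialisation step, assuming plan, tasks, validations and the result lists are in scope:

// Sketch: both processors serialise their payload with the shared Jackson mapper.
val startBody = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(PlanRunSummary(plan, tasks, validations))
val finishBody = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(PlanResults(plan, generationResult, validationResults))
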
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessor.scala
index b031aab4..2d59252e 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessor.scala
@@ -1,9 +1,8 @@
package io.github.datacatering.datacaterer.core.alert
import io.github.datacatering.datacaterer.api.model.Constants.{ALERT_TRIGGER_ON_ALL, ALERT_TRIGGER_ON_FAILURE, ALERT_TRIGGER_ON_GENERATION_FAILURE, ALERT_TRIGGER_ON_GENERATION_SUCCESS, ALERT_TRIGGER_ON_SUCCESS, ALERT_TRIGGER_ON_VALIDATION_FAILURE, ALERT_TRIGGER_ON_VALIDATION_SUCCESS}
-import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan}
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
import org.apache.log4j.Logger
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/SlackAlertProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/SlackAlertProcessor.scala
index fd4c2d2c..7c8f6c40 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/SlackAlertProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/SlackAlertProcessor.scala
@@ -3,9 +3,8 @@ package io.github.datacatering.datacaterer.core.alert
import com.slack.api.Slack
import com.slack.api.methods.MethodsClient
import com.slack.api.methods.request.chat.ChatPostMessageRequest
-import io.github.datacatering.datacaterer.api.model.SlackAlertConfig
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, SlackAlertConfig, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.model.Constants.REPORT_HOME_HTML
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, ValidationConfigResult}
import org.apache.log4j.Logger
import scala.util.{Failure, Success, Try}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/exception/Exceptions.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/exception/Exceptions.scala
index 2361567c..5659200c 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/exception/Exceptions.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/exception/Exceptions.scala
@@ -1,7 +1,7 @@
package io.github.datacatering.datacaterer.core.exception
import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, FOREIGN_KEY_DELIMITER, FOREIGN_KEY_PLAN_FILE_DELIMITER}
-import io.github.datacatering.datacaterer.api.model.{Count, Field, Step, Validation}
+import io.github.datacatering.datacaterer.api.model.{Field, Step, Validation}
import org.apache.spark.sql.types.{DataType, Metadata, StructField}
//files
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
index a8cbb571..9875caab 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
@@ -1,10 +1,9 @@
package io.github.datacatering.datacaterer.core.generator
import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_ENABLE_GENERATE_DATA, ENABLE_DATA_GENERATION, SAVE_MODE}
-import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, GenerationConfig, MetadataConfig, Plan, Step, Task, TaskSummary, UpstreamDataSourceValidation, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, FlagsConfig, FoldersConfig, GenerationConfig, MetadataConfig, Plan, Step, Task, TaskSummary, UpstreamDataSourceValidation, ValidationConfiguration}
import io.github.datacatering.datacaterer.core.exception.InvalidRandomSeedException
import io.github.datacatering.datacaterer.core.generator.track.RecordTrackingProcessor
-import io.github.datacatering.datacaterer.core.model.DataSourceResult
import io.github.datacatering.datacaterer.core.sink.SinkFactory
import io.github.datacatering.datacaterer.core.util.GeneratorUtil.getDataSourceName
import io.github.datacatering.datacaterer.core.util.PlanImplicits.PerFieldCountOps
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala
index 67290f4a..cc0785d8 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala
@@ -1,11 +1,12 @@
package io.github.datacatering.datacaterer.core.generator
-import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, Task, TaskSummary, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, Task, TaskSummary, ValidationConfigResult, ValidationConfiguration}
+import io.github.datacatering.datacaterer.core.activity.PlanRunPostPlanProcessor
import io.github.datacatering.datacaterer.core.alert.AlertProcessor
import io.github.datacatering.datacaterer.core.generator.delete.DeleteRecordProcessor
import io.github.datacatering.datacaterer.core.generator.result.DataGenerationResultWriter
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, PlanRunResults, ValidationConfigResult}
+import io.github.datacatering.datacaterer.core.model.PlanRunResults
import io.github.datacatering.datacaterer.core.parser.PlanParser
import io.github.datacatering.datacaterer.core.util.PlanImplicits.TaskOps
import io.github.datacatering.datacaterer.core.validator.ValidationProcessor
@@ -80,7 +81,11 @@ class DataGeneratorProcessor(dataCatererConfiguration: DataCatererConfiguration)
private def applyPostPlanProcessors(plan: Plan, sparkRecordListener: SparkRecordListener,
generationResult: List[DataSourceResult], validationResults: List[ValidationConfigResult]): Unit = {
- val postPlanProcessors = List(new DataGenerationResultWriter(dataCatererConfiguration), new AlertProcessor(dataCatererConfiguration))
+ val postPlanProcessors = List(
+ new DataGenerationResultWriter(dataCatererConfiguration),
+ new AlertProcessor(dataCatererConfiguration),
+ new PlanRunPostPlanProcessor(dataCatererConfiguration)
+ )
postPlanProcessors.foreach(postPlanProcessor => {
if (postPlanProcessor.enabled) postPlanProcessor.apply(plan, sparkRecordListener, generationResult, validationResults)
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/DeleteRecordProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/DeleteRecordProcessor.scala
index 0ed2cbc9..dd2ac402 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/DeleteRecordProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/DeleteRecordProcessor.scala
@@ -2,8 +2,8 @@ package io.github.datacatering.datacaterer.core.generator.delete
import io.github.datacatering.datacaterer.api.model.Constants.{CASSANDRA, CSV, DELTA, FOREIGN_KEY_DELIMITER, FORMAT, JDBC, JSON, ORC, PARQUET, PATH}
import io.github.datacatering.datacaterer.api.model.{ForeignKeyRelation, Plan, Step, Task, TaskSummary}
+import io.github.datacatering.datacaterer.api.util.ConfigUtil.cleanseOptions
import io.github.datacatering.datacaterer.core.model.Constants.RECORD_TRACKING_VALIDATION_FORMAT
-import io.github.datacatering.datacaterer.core.util.ConfigUtil.cleanseOptions
import io.github.datacatering.datacaterer.core.util.MetadataUtil.getSubDataSourcePath
import io.github.datacatering.datacaterer.core.util.PlanImplicits.SinkOptionsOps
import io.github.datacatering.datacaterer.core.util.{ForeignKeyRelationHelper, ForeignKeyUtil}
@@ -82,7 +82,7 @@ class DeleteRecordProcessor(connectionConfigsByName: Map[String, Map[String, Str
}
private def deleteRecords(dataSourceName: String, plan: Plan, step: Step, stepsByName: Map[String, Step] = Map(),
- optSourceForeignKey: Option[String] = None, optFullForeignKey: Option[(ForeignKeyRelation, String)] = None): Unit = {
+ optSourceForeignKey: Option[String] = None, optFullForeignKey: Option[(ForeignKeyRelation, String)] = None): Unit = {
val format = step.options(FORMAT)
val subDataSourcePath = getSubDataSourcePath(dataSourceName, plan.name, step, recordTrackingFolderPath)
val optDeleteRecordService = getDeleteRecordService(format)
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/JdbcDeleteRecordService.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/JdbcDeleteRecordService.scala
index bc119d73..54c9b713 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/JdbcDeleteRecordService.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/JdbcDeleteRecordService.scala
@@ -2,7 +2,6 @@ package io.github.datacatering.datacaterer.core.generator.delete
import io.github.datacatering.datacaterer.api.model.Constants.{DRIVER, JDBC_TABLE, MYSQL_DRIVER, PASSWORD, URL, USERNAME}
import io.github.datacatering.datacaterer.core.exception.{InvalidDataSourceOptions, UnsupportedJdbcDeleteDataType}
-import org.apache.log4j.Logger
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/DataSourceMetadataFactory.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/DataSourceMetadataFactory.scala
index 6402f805..08fbc166 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/DataSourceMetadataFactory.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/DataSourceMetadataFactory.scala
@@ -2,11 +2,12 @@ package io.github.datacatering.datacaterer.core.generator.metadata.datasource
import io.github.datacatering.datacaterer.api.model.Constants.METADATA_SOURCE_TYPE
import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceValidation, Field, Plan, Task, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.util.ConfigUtil
import io.github.datacatering.datacaterer.api.{PlanRun, ValidationBuilder}
import io.github.datacatering.datacaterer.core.generator.metadata.PlanGenerator.writeToFiles
import io.github.datacatering.datacaterer.core.model.{ForeignKeyRelationship, ValidationConfigurationHelper}
import io.github.datacatering.datacaterer.core.util.MetadataUtil.getMetadataFromConnectionConfig
-import io.github.datacatering.datacaterer.core.util.{ConfigUtil, ForeignKeyUtil, MetadataUtil, SchemaHelper, TaskHelper}
+import io.github.datacatering.datacaterer.core.util.{ForeignKeyUtil, MetadataUtil, SchemaHelper, TaskHelper}
import org.apache.log4j.Logger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, SparkSession}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/http/OpenAPIConverter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/http/OpenAPIConverter.scala
index 7a9f482f..f7d0d501 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/http/OpenAPIConverter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/http/OpenAPIConverter.scala
@@ -4,7 +4,7 @@ import io.github.datacatering.datacaterer.api.model.Constants.{ARRAY_MAXIMUM_LEN
import io.github.datacatering.datacaterer.api.model.{ArrayType, BinaryType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
import io.github.datacatering.datacaterer.core.exception.UnsupportedOpenApiDataTypeException
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
-import io.github.datacatering.datacaterer.core.model.Constants.{HTTP_HEADER_FIELD_PREFIX, HTTP_PATH_PARAM_FIELD_PREFIX, HTTP_QUERY_PARAM_FIELD_PREFIX, REAL_TIME_BODY_FIELD, REAL_TIME_BODY_CONTENT_FIELD, REAL_TIME_CONTENT_TYPE_FIELD, REAL_TIME_METHOD_FIELD, REAL_TIME_URL_FIELD}
+import io.github.datacatering.datacaterer.core.model.Constants.{HTTP_HEADER_FIELD_PREFIX, HTTP_PATH_PARAM_FIELD_PREFIX, HTTP_QUERY_PARAM_FIELD_PREFIX, REAL_TIME_BODY_CONTENT_FIELD, REAL_TIME_BODY_FIELD, REAL_TIME_CONTENT_TYPE_FIELD, REAL_TIME_METHOD_FIELD, REAL_TIME_URL_FIELD}
import io.swagger.v3.oas.models.PathItem.HttpMethod
import io.swagger.v3.oas.models.media.Schema
import io.swagger.v3.oas.models.parameters.Parameter
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/openlineage/OpenLineageMetadata.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/openlineage/OpenLineageMetadata.scala
index 437e7d59..70665fe3 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/openlineage/OpenLineageMetadata.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/openlineage/OpenLineageMetadata.scala
@@ -1,6 +1,6 @@
package io.github.datacatering.datacaterer.core.generator.metadata.datasource.openlineage
-import io.github.datacatering.datacaterer.api.model.Constants.{DATA_SOURCE_NAME, DEFAULT_FIELD_TYPE, DEFAULT_STEP_NAME, FACET_DATA_SOURCE, FIELD_DATA_TYPE, FIELD_DESCRIPTION, JDBC, JDBC_TABLE, METADATA_IDENTIFIER, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, URI}
+import io.github.datacatering.datacaterer.api.model.Constants.{DATA_SOURCE_NAME, DEFAULT_FIELD_TYPE, FACET_DATA_SOURCE, FIELD_DATA_TYPE, FIELD_DESCRIPTION, JDBC, JDBC_TABLE, METADATA_IDENTIFIER, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, URI}
import io.github.datacatering.datacaterer.core.exception.{FailedMarquezHttpCallException, InvalidMarquezResponseException}
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.{DataSourceMetadata, SubDataSourceMetadata}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
index b7ba64ca..5a51bb84 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
@@ -1,19 +1,17 @@
package io.github.datacatering.datacaterer.core.generator.result
import com.fasterxml.jackson.annotation.JsonInclude.Include
-import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_GENERATED_REPORTS_FOLDER_PATH, SPECIFIC_DATA_SOURCE_OPTIONS}
-import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Field, Plan, Step, Task}
+import io.github.datacatering.datacaterer.api.model.Constants.SPECIFIC_DATA_SOURCE_OPTIONS
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, DataSourceResultSummary, Field, Plan, Step, StepResultSummary, Task, TaskResultSummary, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_CATERING_SVG, REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_MAIN_CSS, REPORT_RESULT_JSON, REPORT_TASK_HTML, REPORT_VALIDATIONS_HTML}
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
import io.github.datacatering.datacaterer.core.util.FileUtil.writeStringToFile
import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
-import scala.util.{Failure, Success, Try}
import scala.xml.Node
class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfiguration)
@@ -107,7 +105,7 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig
private def resultsAsJson(generationResult: List[DataSourceResult], validationResults: List[ValidationConfigResult]): String = {
val resultMap = Map(
"generation" -> getGenerationJsonSummary(generationResult),
- "validation" -> validationResults.map(_.jsonSummary(validationConfig.numSampleErrorRecords))
+ "validation" -> validationResults.map(_.jsonSummary)
)
OBJECT_MAPPER.writeValueAsString(resultMap)
}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
index de8a8b6b..ba0553df 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
@@ -1,10 +1,10 @@
package io.github.datacatering.datacaterer.core.generator.result
import io.github.datacatering.datacaterer.api.model.Constants.HISTOGRAM
-import io.github.datacatering.datacaterer.api.model.{FlagsConfig, Plan, Step, Validation, ValidationConfig}
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, DataSourceResultSummary, FlagsConfig, Plan, Step, StepResultSummary, TaskResultSummary, Validation, ValidationConfig, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.listener.{SparkRecordListener, SparkTaskRecordSummary}
import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_VALIDATIONS_HTML}
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult}
+import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
import io.github.datacatering.datacaterer.core.util.PlanImplicits.CountOps
import org.joda.time.DateTime
@@ -467,7 +467,15 @@
{keyValueTable(getValidationOptions(validationRes.validation))}
- {if (validationRes.isSuccess) "" else keyValueTable(validationRes.sampleErrorValues.get.take(validationConfig.numSampleErrorRecords).map(e => List(e.json)).toList)}
+ {if (validationRes.isSuccess) {
+ ""
+ } else {
+ keyValueTable(
+ validationRes.sampleErrorValues.getOrElse(Array())
+ .map(e => List(ObjectMapperUtil.jsonObjectMapper.writeValueAsString(e)))
+ .toList
+ )
+ }}
|
})
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ResultModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ResultModels.scala
index a1b70831..0bf1d20f 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ResultModels.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ResultModels.scala
@@ -1,10 +1,6 @@
package io.github.datacatering.datacaterer.core.model
-import io.github.datacatering.datacaterer.api.model.Constants.DEFAULT_DATA_SOURCE_NAME
-import io.github.datacatering.datacaterer.api.model.{Field, Step, Task}
-import io.github.datacatering.datacaterer.core.util.ResultWriterUtil.getSuccessSymbol
-
-import java.time.{Duration, LocalDateTime}
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, ValidationConfigResult}
case class PlanRunResults(
generationResults: List[DataSourceResult] = List(),
@@ -25,67 +21,4 @@ case class PlanRunResults(
}
}
-case class DataSourceResultSummary(
- name: String,
- numRecords: Long,
- isSuccess: Boolean,
- dataSourceResults: List[DataSourceResult]
- )
-
-case class DataSourceResult(
- name: String = DEFAULT_DATA_SOURCE_NAME,
- task: Task = Task(),
- step: Step = Step(),
- sinkResult: SinkResult = SinkResult(),
- batchNum: Int = 0
- ) {
-
- def summarise: List[String] = {
- val format = sinkResult.format
- val isSuccess = getSuccessSymbol(sinkResult.isSuccess)
- val numRecords = sinkResult.count.toString
- List(name, format, isSuccess, numRecords)
- }
-
- def jsonSummary: Map[String, Any] = {
- Map(
- "name" -> name,
- "options" -> step.options,
- "isSuccess" -> sinkResult.isSuccess,
- "numRecords" -> sinkResult.count
- )
- }
-}
-
-case class TaskResultSummary(
- task: Task,
- numRecords: Long,
- isSuccess: Boolean,
- stepResults: List[StepResultSummary]
- )
-
-case class StepResultSummary(
- step: Step,
- numRecords: Long,
- isSuccess: Boolean,
- dataSourceResults: List[DataSourceResult]
- )
-
-case class SinkResult(
- name: String = DEFAULT_DATA_SOURCE_NAME,
- format: String = "json",
- saveMode: String = "append",
- options: Map[String, String] = Map(),
- count: Long = -1,
- isSuccess: Boolean = true,
- sample: Array[String] = Array(),
- startTime: LocalDateTime = LocalDateTime.now(),
- endTime: LocalDateTime = LocalDateTime.now(),
- generatedMetadata: Array[Field] = Array(),
- exception: Option[Throwable] = None
- ) {
-
- def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
-}
-
case class RealTimeSinkResult(result: String = "")
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
index a10c9090..d44dce49 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
@@ -1,114 +1,11 @@
package io.github.datacatering.datacaterer.core.model
import io.github.datacatering.datacaterer.api.model.Constants.{AGGREGATION_AVG, AGGREGATION_COUNT, AGGREGATION_MAX, AGGREGATION_MIN, AGGREGATION_STDDEV, AGGREGATION_SUM, SPECIFIC_DATA_SOURCE_OPTIONS}
-import io.github.datacatering.datacaterer.api.model.{ExpressionValidation, Validation, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.model.ValidationConfiguration
import io.github.datacatering.datacaterer.api.{FieldValidationBuilder, ValidationBuilder}
import io.github.datacatering.datacaterer.core.exception.UnsupportedDataValidationAggregateFunctionException
-import io.github.datacatering.datacaterer.core.util.ConfigUtil.cleanseOptions
-import io.github.datacatering.datacaterer.core.util.ResultWriterUtil.getSuccessSymbol
import org.apache.log4j.Logger
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-import java.time.{Duration, LocalDateTime}
-import scala.collection.mutable
-import scala.math.BigDecimal.RoundingMode
-
-case class ValidationConfigResult(
- name: String = "default_validation_result",
- description: String = "Validation result for data sources",
- dataSourceValidationResults: List[DataSourceValidationResult] = List(),
- startTime: LocalDateTime = LocalDateTime.now(),
- endTime: LocalDateTime = LocalDateTime.now()
- ) {
- def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
-
- def summarise: List[String] = {
- val validationRes = dataSourceValidationResults.flatMap(_.validationResults)
- if (validationRes.nonEmpty) {
- val (numSuccess, successRate, isSuccess) = baseSummary(validationRes)
- val successRateVisual = s"$numSuccess/${validationRes.size} ($successRate%)"
- List(name, description, getSuccessSymbol(isSuccess), successRateVisual)
- } else List()
- }
-
- def jsonSummary(numErrorSamples: Int): Map[String, Any] = {
- val validationRes = dataSourceValidationResults.flatMap(dsv =>
- dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v))
- )
- if (validationRes.nonEmpty) {
- val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3))
- val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => {
- val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap
- Map(
- "dataSourceName" -> res._1,
- "options" -> cleanseOptions(res._2),
- "validation" -> validationDetails,
- "numErrors" -> res._3.numErrors,
- "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res._3)
- )
- })
- val baseValidationMap = Map(
- "name" -> name,
- "description" -> description,
- "isSuccess" -> isSuccess,
- "numSuccess" -> numSuccess,
- "numValidations" -> validationRes.size,
- "successRate" -> successRate
- )
- if (errorMap.nonEmpty) {
- baseValidationMap ++ Map("errorValidations" -> errorMap)
- } else baseValidationMap
- } else Map()
- }
-
- private def getErrorSamplesAsMap(numErrorSamples: Int, res: ValidationResult): Array[Map[String, Any]] = {
- def parseValueMap[T](valMap: (String, T)): Map[String, Any] = {
- valMap._2 match {
- case genericRow: GenericRowWithSchema =>
- val innerValueMap = genericRow.getValuesMap[T](genericRow.schema.fields.map(_.name))
- Map(valMap._1 -> innerValueMap.flatMap(parseValueMap[T]))
- case wrappedArray: mutable.WrappedArray[_] =>
- Map(valMap._1 -> wrappedArray.map {
- case genericRowWithSchema: GenericRowWithSchema =>
- val innerValueMap = genericRowWithSchema.getValuesMap[T](genericRowWithSchema.schema.fields.map(_.name))
- innerValueMap.flatMap(parseValueMap[T])
- case x => x
- })
- case _ =>
- Map(valMap)
- }
- }
-
- res.sampleErrorValues.get.take(numErrorSamples)
- .map(row => {
- val valuesMap = row.getValuesMap(res.sampleErrorValues.get.columns)
- valuesMap.flatMap(valMap => parseValueMap(valMap))
- })
- }
-
- private def baseSummary(validationRes: List[ValidationResult]): (Int, BigDecimal, Boolean) = {
- val validationSuccess = validationRes.map(_.isSuccess)
- val numSuccess = validationSuccess.count(x => x)
- val successRate = BigDecimal(numSuccess.toDouble / validationRes.size * 100).setScale(2, RoundingMode.HALF_UP)
- val isSuccess = validationSuccess.forall(x => x)
- (numSuccess, successRate, isSuccess)
- }
-}
-
-case class DataSourceValidationResult(
- dataSourceName: String = "default_data_source",
- options: Map[String, String] = Map(),
- validationResults: List[ValidationResult] = List()
- )
-
-case class ValidationResult(
- validation: Validation = ExpressionValidation(),
- isSuccess: Boolean = true,
- numErrors: Long = 0,
- total: Long = 0,
- sampleErrorValues: Option[DataFrame] = None
- )
trait ExternalDataValidation {
val LOGGER: Logger = Logger.getLogger(getClass.getName)
@@ -141,12 +38,6 @@ trait ExternalDataValidation {
}
}
-object ValidationResult {
- def fromValidationWithBaseResult(validation: Validation, validationResult: ValidationResult): ValidationResult = {
- ValidationResult(validation, validationResult.isSuccess, validationResult.numErrors, validationResult.total, validationResult.sampleErrorValues)
- }
-}
-
object ValidationConfigurationHelper {
private val LOGGER = Logger.getLogger(getClass.getName)
@@ -173,9 +64,9 @@ object ValidationConfigurationHelper {
//need to match only on the data source specific options (i.e. table name, file path)
val genDataSourceOpts = genV.options.filter(o => SPECIFIC_DATA_SOURCE_OPTIONS.contains(o._1))
val optMatchingValidations = currentUserDsValid.find(userV => {
- val userDataSourceOpts = userV.options.filter(o => SPECIFIC_DATA_SOURCE_OPTIONS.contains(o._1))
- genDataSourceOpts.forall(genOpt => userDataSourceOpts.get(genOpt._1).contains(genOpt._2))
- })
+ val userDataSourceOpts = userV.options.filter(o => SPECIFIC_DATA_SOURCE_OPTIONS.contains(o._1))
+ genDataSourceOpts.forall(genOpt => userDataSourceOpts.get(genOpt._1).contains(genOpt._2))
+ })
if (optMatchingValidations.isDefined) {
LOGGER.debug(s"Found matching data source with manual validations, merging list of validations together, " +
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala
index 62bcbf43..e56c012c 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala
@@ -1,8 +1,9 @@
package io.github.datacatering.datacaterer.core.plan
import io.github.datacatering.datacaterer.api.PlanRun
-import io.github.datacatering.datacaterer.api.model.Constants.PLAN_CLASS
+import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CATERER_INTERFACE_JAVA, DATA_CATERER_INTERFACE_SCALA, DATA_CATERER_INTERFACE_YAML, PLAN_CLASS}
import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, Task, ValidationConfiguration}
+import io.github.datacatering.datacaterer.core.activity.PlanRunPrePlanProcessor
import io.github.datacatering.datacaterer.core.config.ConfigParser
import io.github.datacatering.datacaterer.core.exception.PlanRunClassNotFoundException
import io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor
@@ -10,7 +11,6 @@ import io.github.datacatering.datacaterer.core.generator.metadata.datasource.Dat
import io.github.datacatering.datacaterer.core.model.Constants.METADATA_CONNECTION_OPTIONS
import io.github.datacatering.datacaterer.core.model.PlanRunResults
import io.github.datacatering.datacaterer.core.parser.PlanParser
-import io.github.datacatering.datacaterer.core.ui.model.PlanRunRequest
import io.github.datacatering.datacaterer.core.util.SparkProvider
import org.apache.spark.sql.SparkSession
@@ -18,7 +18,10 @@ import scala.util.{Success, Try}
object PlanProcessor {
- def determineAndExecutePlan(optPlanRun: Option[PlanRun] = None): PlanRunResults = {
+ def determineAndExecutePlan(
+ optPlanRun: Option[PlanRun] = None,
+ interface: String = DATA_CATERER_INTERFACE_SCALA
+ ): PlanRunResults = {
val optPlanClass = getPlanClass
optPlanClass.map(Class.forName)
.map(cls => {
@@ -26,41 +29,46 @@ object PlanProcessor {
val tryScalaPlan = Try(cls.getDeclaredConstructor().newInstance().asInstanceOf[PlanRun])
val tryJavaPlan = Try(cls.getDeclaredConstructor().newInstance().asInstanceOf[io.github.datacatering.datacaterer.javaapi.api.PlanRun])
(tryScalaPlan, tryJavaPlan) match {
- case (Success(value), _) => value
- case (_, Success(value)) => value.getPlan
+ case (Success(value), _) => (value, DATA_CATERER_INTERFACE_SCALA)
+ case (_, Success(value)) => (value.getPlan, DATA_CATERER_INTERFACE_JAVA)
case _ => throw PlanRunClassNotFoundException(optPlanClass.get)
}
})
- .map(executePlan)
+ .map(p => executePlan(p._1, p._2))
.getOrElse(
- optPlanRun.map(executePlan)
- .getOrElse(executePlan)
+ optPlanRun.map(p => executePlan(p, interface))
+ .getOrElse(executePlan(interface))
)
}
-  def determineAndExecutePlanJava(planRun: io.github.datacatering.datacaterer.javaapi.api.PlanRun): PlanRunResults =
-    determineAndExecutePlan(Some(planRun.getPlan))
+  def determineAndExecutePlanJava(planRun: io.github.datacatering.datacaterer.javaapi.api.PlanRun): PlanRunResults =
+    determineAndExecutePlan(Some(planRun.getPlan), DATA_CATERER_INTERFACE_JAVA)
- private def executePlan(planRun: PlanRun): PlanRunResults = {
+ private def executePlan(planRun: PlanRun, interface: String): PlanRunResults = {
val dataCatererConfiguration = planRun._configuration
- executePlanWithConfig(dataCatererConfiguration, Some(planRun))
+ executePlanWithConfig(dataCatererConfiguration, Some(planRun), interface)
}
- private def executePlan: PlanRunResults = {
+ private def executePlan(interface: String): PlanRunResults = {
val dataCatererConfiguration = ConfigParser.toDataCatererConfiguration
- executePlanWithConfig(dataCatererConfiguration, None)
+ executePlanWithConfig(dataCatererConfiguration, None, interface)
}
- private def executePlanWithConfig(dataCatererConfiguration: DataCatererConfiguration, optPlan: Option[PlanRun]): PlanRunResults = {
+ private def executePlanWithConfig(
+ dataCatererConfiguration: DataCatererConfiguration,
+ optPlan: Option[PlanRun],
+ interface: String
+ ): PlanRunResults = {
val connectionConf = optPlan.map(_._connectionTaskBuilders.flatMap(_.connectionConfigWithTaskBuilder.options).toMap).getOrElse(Map())
implicit val sparkSession: SparkSession = new SparkProvider(dataCatererConfiguration.master, dataCatererConfiguration.runtimeConfig ++ connectionConf).getSparkSession
- val planRun = if (optPlan.isDefined) {
- optPlan.get
+ val (planRun, resolvedInterface) = if (optPlan.isDefined) {
+ (optPlan.get, interface)
} else {
val (parsedPlan, enabledTasks, validations) = PlanParser.getPlanTasksFromYaml(dataCatererConfiguration)
- new YamlPlanRun(parsedPlan, enabledTasks, validations, dataCatererConfiguration)
+ (new YamlPlanRun(parsedPlan, enabledTasks, validations, dataCatererConfiguration), DATA_CATERER_INTERFACE_YAML)
}
+ applyPrePlanProcessors(planRun, dataCatererConfiguration, resolvedInterface)
val optPlanWithTasks = new DataSourceMetadataFactory(dataCatererConfiguration).extractAllDataSourceMetadata(planRun)
val dataGeneratorProcessor = new DataGeneratorProcessor(dataCatererConfiguration)
@@ -80,6 +88,14 @@ object PlanProcessor {
case _ => None
}
}
+
+ private def applyPrePlanProcessors(planRun: PlanRun, dataCatererConfiguration: DataCatererConfiguration, interface: String): Unit = {
+ val prePlanProcessors = List(new PlanRunPrePlanProcessor(dataCatererConfiguration))
+
+ prePlanProcessors.foreach(prePlanProcessor => {
+ if (prePlanProcessor.enabled) prePlanProcessor.apply(planRun._plan.copy(interface = Some(interface)), planRun._tasks, planRun._validations)
+ })
+ }
}
class YamlPlanRun(
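
determineAndExecutePlan now threads the triggering interface through to applyPrePlanProcessors, defaulting to the Scala constant. A minimal sketch of a caller tagging a run as UI-triggered; the UI's PlanRepository import below suggests this usage, but the exact call site is not part of this diff:

import io.github.datacatering.datacaterer.api.model.Constants.DATA_CATERER_INTERFACE_UI

// Sketch: resolve the plan from the class path or YAML, tagged as a UI run.
val results = PlanProcessor.determineAndExecutePlan(None, DATA_CATERER_INTERFACE_UI)
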
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PostPlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PostPlanProcessor.scala
index e24d38fb..4f3c1331 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PostPlanProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PostPlanProcessor.scala
@@ -1,8 +1,7 @@
package io.github.datacatering.datacaterer.core.plan
-import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan}
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, ValidationConfigResult}
trait PostPlanProcessor {
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PrePlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PrePlanProcessor.scala
new file mode 100644
index 00000000..68b4e3f0
--- /dev/null
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PrePlanProcessor.scala
@@ -0,0 +1,12 @@
+package io.github.datacatering.datacaterer.core.plan
+
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, Task, ValidationConfiguration}
+
+trait PrePlanProcessor {
+
+ val dataCatererConfiguration: DataCatererConfiguration
+ val enabled: Boolean
+
+ def apply(plan: Plan, tasks: List[Task], validations: List[ValidationConfiguration]): Unit
+
+}
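
PrePlanProcessor mirrors PostPlanProcessor but fires before generation with the plan, tasks, and validation configurations. A minimal sketch of an implementation; note that applyPrePlanProcessors above hard-codes its processor list, so wiring in a custom one is an assumption:

// Sketch: a pre-plan processor that only logs what is about to run.
class LoggingPrePlanProcessor(val dataCatererConfiguration: DataCatererConfiguration) extends PrePlanProcessor {

  override val enabled: Boolean = dataCatererConfiguration.flagsConfig.enableTrackActivity

  override def apply(plan: Plan, tasks: List[Task], validations: List[ValidationConfiguration]): Unit =
    println(s"Starting plan '${plan.name}' with ${tasks.size} task(s) and ${validations.size} validation config(s)")
}
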
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala
index 0a23e579..8ea11246 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala
@@ -2,11 +2,11 @@ package io.github.datacatering.datacaterer.core.sink
import com.google.common.util.concurrent.RateLimiter
import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, DRIVER, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, JDBC, OMIT, PARTITIONS, PARTITION_BY, POSTGRES_DRIVER, RATE, ROWS_PER_SECOND, SAVE_MODE, TABLE}
-import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, MetadataConfig, Step}
+import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, MetadataConfig, SinkResult, Step}
+import io.github.datacatering.datacaterer.api.util.ConfigUtil
import io.github.datacatering.datacaterer.core.exception.{FailedSaveDataDataFrameV2Exception, FailedSaveDataException}
import io.github.datacatering.datacaterer.core.model.Constants.{BATCH, DEFAULT_ROWS_PER_SECOND, FAILED, FINISHED, PER_FIELD_INDEX_FIELD, STARTED}
-import io.github.datacatering.datacaterer.core.model.{RealTimeSinkResult, SinkResult}
-import io.github.datacatering.datacaterer.core.util.ConfigUtil
+import io.github.datacatering.datacaterer.core.model.RealTimeSinkResult
import io.github.datacatering.datacaterer.core.util.GeneratorUtil.determineSaveTiming
import io.github.datacatering.datacaterer.core.util.MetadataUtil.getFieldMetadata
import io.github.datacatering.datacaterer.core.util.ValidationUtil.cleanValidationIdentifier
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkProcessor.scala
index 5996b405..092d5a6a 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkProcessor.scala
@@ -5,9 +5,8 @@ import io.github.datacatering.datacaterer.api.model.Step
import io.github.datacatering.datacaterer.core.exception.UnsupportedRealTimeDataSourceFormat
import io.github.datacatering.datacaterer.core.model.RealTimeSinkResult
import io.github.datacatering.datacaterer.core.sink.http.HttpSinkProcessor
-import io.github.datacatering.datacaterer.core.sink.http.model.HttpResult
import io.github.datacatering.datacaterer.core.sink.jms.JmsSinkProcessor
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import java.util.concurrent.LinkedBlockingQueue
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala
index 64619b19..bd5a71db 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala
@@ -1,6 +1,6 @@
package io.github.datacatering.datacaterer.core.ui.plan
-import io.github.datacatering.datacaterer.api.model.Constants.{CONFIG_FLAGS_DELETE_GENERATED_RECORDS, CONFIG_FLAGS_GENERATE_DATA, CONFIG_FLAGS_GENERATE_VALIDATIONS, DEFAULT_MASTER, DEFAULT_RUNTIME_CONFIG, FORMAT, METADATA_SOURCE_NAME}
+import io.github.datacatering.datacaterer.api.model.Constants.{CONFIG_FLAGS_DELETE_GENERATED_RECORDS, CONFIG_FLAGS_GENERATE_DATA, CONFIG_FLAGS_GENERATE_VALIDATIONS, DATA_CATERER_INTERFACE_UI, DEFAULT_MASTER, DEFAULT_RUNTIME_CONFIG, FORMAT, METADATA_SOURCE_NAME}
import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, Task, ValidationConfiguration, YamlUpstreamDataSourceValidation}
import io.github.datacatering.datacaterer.api.{DataCatererConfigurationBuilder, ValidationBuilder}
import io.github.datacatering.datacaterer.core.exception.SaveFileException
@@ -139,7 +139,7 @@ object PlanRepository {
case Success(planAsYaml) =>
updatePlanRunExecution(planRunExecution, PARSED_PLAN)
val runPlanFuture = Future {
- PlanProcessor.determineAndExecutePlan(Some(planAsYaml))
+ PlanProcessor.determineAndExecutePlan(Some(planAsYaml), DATA_CATERER_INTERFACE_UI)
}
runPlanFuture.onComplete {
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ManagementUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ManagementUtil.scala
new file mode 100644
index 00000000..96918122
--- /dev/null
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ManagementUtil.scala
@@ -0,0 +1,9 @@
+package io.github.datacatering.datacaterer.core.util
+
+object ManagementUtil {
+
+ def getDataCatererManagementUrl: String = {
+ Option(System.getenv("DATA_CATERER_MANAGEMENT_URL"))
+ .getOrElse("http://localhost:8082")
+ }
+}
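
Usage is a plain environment lookup with a hardcoded fallback:

// Returns DATA_CATERER_MANAGEMENT_URL when set in the environment,
// otherwise "http://localhost:8082".
val managementUrl = ManagementUtil.getDataCatererManagementUrl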
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/MetadataUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/MetadataUtil.scala
index 9af11c98..52bad39a 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/MetadataUtil.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/MetadataUtil.scala
@@ -2,6 +2,7 @@ package io.github.datacatering.datacaterer.core.util
import io.github.datacatering.datacaterer.api.model.Constants._
import io.github.datacatering.datacaterer.api.model.{Field, MetadataConfig, Step}
+import io.github.datacatering.datacaterer.api.util.ConfigUtil
import io.github.datacatering.datacaterer.core.exception.UnsupportedDataFormatForTrackingException
import io.github.datacatering.datacaterer.core.generator.metadata.ExpressionPredictor
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.DataSourceMetadata
@@ -130,11 +131,11 @@ object MetadataUtil {
}
private def computeFieldStatistics(
- sourceData: DataFrame,
- dataSourceReadOptions: Map[String, String],
- dataSourceName: String,
- dataSourceFormat: String
- )(implicit sparkSession: SparkSession): Unit = {
+ sourceData: DataFrame,
+ dataSourceReadOptions: Map[String, String],
+ dataSourceName: String,
+ dataSourceFormat: String
+ )(implicit sparkSession: SparkSession): Unit = {
//have to create temp view then analyze the field stats which can be found in the cached data
sourceData.createOrReplaceTempView(TEMP_CACHED_TABLE_NAME)
if (!sparkSession.catalog.isCached(TEMP_CACHED_TABLE_NAME)) sparkSession.catalog.cacheTable(TEMP_CACHED_TABLE_NAME)
@@ -152,11 +153,11 @@ object MetadataUtil {
}
def determineIfOneOfField(
- sourceData: DataFrame,
- fieldName: String,
- statisticsMap: Map[String, String],
- metadataConfig: MetadataConfig
- ): Option[Array[String]] = {
+ sourceData: DataFrame,
+ fieldName: String,
+ statisticsMap: Map[String, String],
+ metadataConfig: MetadataConfig
+ ): Option[Array[String]] = {
val fieldDataType = sourceData.schema.fields.find(_.name == fieldName).map(_.dataType)
val count = statisticsMap(ROW_COUNT).toLong
(fieldDataType, count) match {
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ProtobufUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ProtobufUtil.scala
index 1eab20cf..ea1d280e 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ProtobufUtil.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ProtobufUtil.scala
@@ -1,14 +1,10 @@
package io.github.datacatering.datacaterer.core.util
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto
import com.google.protobuf.Descriptors.FieldDescriptor
-import com.google.protobuf.Descriptors.FieldDescriptor.JavaType
import com.google.protobuf.{BoolValue, BytesValue, DescriptorProtos, DoubleValue, FloatValue, Int32Value, Int64Value, StringValue, UInt32Value, UInt64Value, WireFormat}
import io.github.datacatering.datacaterer.api.model.{ArrayType, BinaryType, BooleanType, DataType, DecimalType, DoubleType, Field, FloatType, IntegerType, LongType, MapType, StringType, StructType, TimestampType}
import io.github.datacatering.datacaterer.core.exception.UnsupportedProtobufType
import org.apache.log4j.Logger
-import org.apache.spark.sql.protobuf.utils.SchemaConverters
-import org.apache.spark.sql.types.{DataTypes, StructField}
import java.io.{BufferedInputStream, FileInputStream}
import scala.collection.JavaConverters.asScalaBufferConverter
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationOperations.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationOperations.scala
index d941e627..a6179987 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationOperations.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationOperations.scala
@@ -4,19 +4,20 @@ import io.github.datacatering.datacaterer.api.ValidationBuilder
import io.github.datacatering.datacaterer.api.model.Constants.{AGGREGATION_COUNT, FORMAT, VALIDATION_FIELD_NAME_COUNT_BETWEEN, VALIDATION_FIELD_NAME_COUNT_EQUAL, VALIDATION_FIELD_NAME_MATCH_ORDER, VALIDATION_FIELD_NAME_MATCH_SET, VALIDATION_PREFIX_JOIN_EXPRESSION, VALIDATION_UNIQUE}
import io.github.datacatering.datacaterer.api.model._
import io.github.datacatering.datacaterer.core.exception.{FailedFieldDataValidationException, UnsupportedDataValidationTypeException}
-import io.github.datacatering.datacaterer.core.model.ValidationResult
import io.github.datacatering.datacaterer.core.validator.ValidationHelper.getValidationType
import org.apache.log4j.Logger
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions.{col, expr}
-import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import scala.collection.mutable
import scala.util.{Failure, Success, Try}
abstract class ValidationOps(validation: Validation) {
private val LOGGER = Logger.getLogger(getClass.getName)
- def validate(df: DataFrame, dfCount: Long): List[ValidationResult]
+ def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int = 10): List[ValidationResult]
def filterData(df: DataFrame): DataFrame = {
validation.preFilter.map(preFilter => {
@@ -32,13 +33,13 @@ abstract class ValidationOps(validation: Validation) {
}).getOrElse(df)
}
- def validateWithExpression(df: DataFrame, dfCount: Long, expression: String): ValidationResult = {
+ def validateWithExpression(df: DataFrame, dfCount: Long, expression: String, numErrorSamples: Int): ValidationResult = {
val notEqualDf = df.where(s"!($expression)")
- val (isSuccess, sampleErrors, numErrors) = getIsSuccessAndSampleErrors(notEqualDf, dfCount)
+ val (isSuccess, sampleErrors, numErrors) = getIsSuccessAndSampleErrors(notEqualDf, dfCount, numErrorSamples)
ValidationResult(validation, isSuccess, numErrors, dfCount, sampleErrors)
}
- private def getIsSuccessAndSampleErrors(notEqualDf: Dataset[Row], dfCount: Long): (Boolean, Option[DataFrame], Long) = {
+ private def getIsSuccessAndSampleErrors(notEqualDf: Dataset[Row], dfCount: Long, numErrorSamples: Int): (Boolean, Option[Array[Map[String, Any]]], Long) = {
val numErrors = notEqualDf.count()
val (isSuccess, sampleErrors) = (numErrors, validation.errorThreshold) match {
case (c, Some(threshold)) if c > 0 =>
@@ -48,7 +49,31 @@ abstract class ValidationOps(validation: Validation) {
case (c, None) if c > 0 => (false, Some(notEqualDf))
case _ => (true, None)
}
- (isSuccess, sampleErrors, numErrors)
+ val errorsAsMap = sampleErrors.map(ds => {
+ ds.take(numErrorSamples)
+ .map(r => {
+ val valuesMap = r.getValuesMap(ds.columns)
+ valuesMap.flatMap(parseValueMap)
+ })
+ })
+ (isSuccess, errorsAsMap, numErrors)
+ }
+
+ private def parseValueMap[T](valMap: (String, T)): Map[String, Any] = {
+ valMap._2 match {
+ case genericRow: GenericRowWithSchema =>
+ val innerValueMap = genericRow.getValuesMap[T](genericRow.schema.fields.map(_.name))
+ Map(valMap._1 -> innerValueMap.flatMap(parseValueMap[T]))
+ case wrappedArray: mutable.WrappedArray[_] =>
+ Map(valMap._1 -> wrappedArray.map {
+ case genericRowWithSchema: GenericRowWithSchema =>
+ val innerValueMap = genericRowWithSchema.getValuesMap[T](genericRowWithSchema.schema.fields.map(_.name))
+ innerValueMap.flatMap(parseValueMap[T])
+ case x => x
+ })
+ case _ =>
+ Map(valMap)
+ }
}
}
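
parseValueMap recursively flattens Spark's GenericRowWithSchema values, and arrays of them, into plain Scala maps, so error samples can be carried and serialized without a live DataFrame. A small illustration of the flattening, assuming the private helper above:

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val addressSchema = StructType(Seq(StructField("city", StringType)))
val address = new GenericRowWithSchema(Array[Any]("Sydney"), addressSchema)

// A sample row entry ("address" -> address) would flatten to:
//   Map("address" -> Map("city" -> "Sydney"))
// WrappedArray elements that are rows flatten the same way, element by element;
// plain values pass through unchanged.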
@@ -68,7 +93,7 @@ object ValidationHelper {
}
class FieldValidationsOps(fieldValidations: FieldValidations) extends ValidationOps(fieldValidations) {
- override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = {
+ override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = {
val field = fieldValidations.field
val build = ValidationBuilder().field(field)
fieldValidations.validation.flatMap(v => {
@@ -117,8 +142,8 @@ class FieldValidationsOps(fieldValidations: FieldValidations) extends Validation
val validationWithPreFilter = v.preFilter.map(f => validationWithDescription.preFilter(f)).getOrElse(validationWithDescription)
val tryRunValidation = Try(validationWithPreFilter.validation match {
- case e: ExpressionValidation => new ExpressionValidationOps(e).validate(df, dfCount)
- case g: GroupByValidation => new GroupByValidationOps(g).validate(df, dfCount)
+ case e: ExpressionValidation => new ExpressionValidationOps(e).validate(df, dfCount, numErrorSamples)
+ case g: GroupByValidation => new GroupByValidationOps(g).validate(df, dfCount, numErrorSamples)
})
tryRunValidation match {
case Success(value) => value
@@ -129,15 +154,15 @@ class FieldValidationsOps(fieldValidations: FieldValidations) extends Validation
}
class ExpressionValidationOps(expressionValidation: ExpressionValidation) extends ValidationOps(expressionValidation) {
- override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = {
+ override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = {
//TODO allow for pre-filter? can technically be done via custom sql validation using CASE WHEN ... ELSE true END
val dfWithSelectExpr = df.selectExpr(expressionValidation.selectExpr: _*)
- List(validateWithExpression(dfWithSelectExpr, dfCount, expressionValidation.expr))
+ List(validateWithExpression(dfWithSelectExpr, dfCount, expressionValidation.expr, numErrorSamples))
}
}
class GroupByValidationOps(groupByValidation: GroupByValidation) extends ValidationOps(groupByValidation) {
- override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = {
+ override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = {
//TODO allow for pre and post group filter?
val groupByDf = df.groupBy(groupByValidation.groupByFields.map(col): _*)
val (aggregateDf, validationCount) = if ((groupByValidation.aggField == VALIDATION_UNIQUE || groupByValidation.aggField.isEmpty) && groupByValidation.aggType == AGGREGATION_COUNT) {
@@ -149,7 +174,7 @@ class GroupByValidationOps(groupByValidation: GroupByValidation) extends Validat
))
(aggDf, aggDf.count())
}
- List(validateWithExpression(aggregateDf, validationCount, groupByValidation.aggExpr))
+ List(validateWithExpression(aggregateDf, validationCount, groupByValidation.aggExpr, numErrorSamples))
}
}
@@ -157,14 +182,14 @@ class UpstreamDataSourceValidationOps(
upstreamDataSourceValidation: UpstreamDataSourceValidation,
recordTrackingForValidationFolderPath: String
) extends ValidationOps(upstreamDataSourceValidation) {
- override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = {
+ override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = {
val upstreamDf = getUpstreamData(df.sparkSession)
val joinedDf = getJoinedDf(df, upstreamDf)
val joinedCount = joinedDf.count()
upstreamDataSourceValidation.validations.flatMap(v => {
val baseValidationOp = getValidationType(v.validation, recordTrackingForValidationFolderPath)
- val result = baseValidationOp.validate(joinedDf, joinedCount)
+ val result = baseValidationOp.validate(joinedDf, joinedCount, numErrorSamples)
result.map(r => ValidationResult.fromValidationWithBaseResult(upstreamDataSourceValidation, r))
})
}
@@ -203,37 +228,34 @@ class FieldNamesValidationOps(fieldNamesValidation: FieldNamesValidation) extend
private val LOGGER = Logger.getLogger(getClass.getName)
- override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = {
- implicit val stringEncoder: Encoder[CustomErrorSample] = Encoders.kryo[CustomErrorSample]
-
+ override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = {
val (isSuccess, errorSamples, total) = fieldNamesValidation.fieldNameType match {
case VALIDATION_FIELD_NAME_COUNT_EQUAL =>
val isEqualLength = df.columns.length == fieldNamesValidation.count
- val sample = if (isEqualLength) List() else List(CustomErrorSample(df.columns.length.toString))
+ val sample = if (isEqualLength) Array[Map[String, Any]]() else Array(Map[String, Any]("columnLength" -> df.columns.length.toString))
(isEqualLength, sample, 1)
case VALIDATION_FIELD_NAME_COUNT_BETWEEN =>
val colLength = df.columns.length
val isBetween = colLength >= fieldNamesValidation.min && colLength <= fieldNamesValidation.max
- val sample = if (isBetween) List() else List(CustomErrorSample(colLength.toString))
+ val sample = if (isBetween) Array[Map[String, Any]]() else Array(Map[String, Any]("columnLength" -> colLength.toString))
(isBetween, sample, 1)
case VALIDATION_FIELD_NAME_MATCH_ORDER =>
val zippedNames = df.columns.zip(fieldNamesValidation.names).zipWithIndex
val misalignedNames = zippedNames.filter(n => n._1._1 != n._1._2)
- (misalignedNames.isEmpty, misalignedNames.map(n => CustomErrorSample(s"${n._2}: ${n._1._1} -> ${n._1._2}")).toList, zippedNames.length)
+ val errorSample = misalignedNames.map(n => Map[String, Any](s"field_index_${n._2}" -> s"${n._1._1} -> ${n._1._2}"))
+ (misalignedNames.isEmpty, errorSample, zippedNames.length)
case VALIDATION_FIELD_NAME_MATCH_SET =>
- val missingNames = fieldNamesValidation.names.filter(n => !df.columns.contains(n)).map(CustomErrorSample)
- (missingNames.isEmpty, missingNames.toList, fieldNamesValidation.names.length)
+ val missingNames = fieldNamesValidation.names
+ .filter(n => !df.columns.contains(n))
+ .map(n => Map[String, Any]("missing_field" -> n))
+ (missingNames.isEmpty, missingNames, fieldNamesValidation.names.length)
case x =>
LOGGER.error(s"Unknown field name validation type, returning as a failed validation, type=$x")
- (false, List(), 1)
+ (false, Array[Map[String, Any]](), 1)
}
- val optErrorSample = if (isSuccess) {
- None
- } else {
- Some(df.sparkSession.createDataFrame(errorSamples))
- }
- List(ValidationResult(fieldNamesValidation, isSuccess, errorSamples.size, total, optErrorSample))
+ val optErrorSample = if (isSuccess) None else Some(errorSamples)
+ List(ValidationResult(fieldNamesValidation, isSuccess, errorSamples.length, total, optErrorSample))
}
}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala
index 256937ff..f7242e09 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala
@@ -2,9 +2,9 @@ package io.github.datacatering.datacaterer.core.validator
import io.github.datacatering.datacaterer.api.ValidationBuilder
import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_ENABLE_VALIDATION, DELTA, DELTA_LAKE_SPARK_CONF, ENABLE_DATA_VALIDATION, FORMAT, HTTP, ICEBERG, ICEBERG_SPARK_CONF, JMS, TABLE, VALIDATION_IDENTIFIER}
-import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, ExpressionValidation, FoldersConfig, GroupByValidation, UpstreamDataSourceValidation, ValidationConfig, ValidationConfiguration}
-import io.github.datacatering.datacaterer.core.model.{DataSourceValidationResult, ValidationConfigResult, ValidationResult}
+import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, DataSourceValidationResult, ExpressionValidation, FoldersConfig, GroupByValidation, UpstreamDataSourceValidation, ValidationConfig, ValidationConfigResult, ValidationConfiguration, ValidationResult}
import io.github.datacatering.datacaterer.core.parser.PlanParser
+import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
import io.github.datacatering.datacaterer.core.util.ValidationUtil.cleanValidationIdentifier
import io.github.datacatering.datacaterer.core.validator.ValidationHelper.getValidationType
import io.github.datacatering.datacaterer.core.validator.ValidationWaitImplicits.ValidationWaitConditionOps
@@ -102,10 +102,10 @@ class ValidationProcessor(
val preFilterData = validation.filterData(df)
if (!preFilterData.storageLevel.useMemory) preFilterData.cache()
val count = preFilterData.count()
- Try(validation.validate(preFilterData, count)) match {
+ Try(validation.validate(preFilterData, count, validationConfig.numSampleErrorRecords)) match {
case Failure(exception) =>
LOGGER.error(s"Failed to run data validation, $validationDescription", exception)
- List(ValidationResult(validBuilder.validation, false, count, count, Some(sparkSession.createDataFrame(Seq(CustomErrorSample(exception.getLocalizedMessage))))))
+ List(ValidationResult(validBuilder.validation, false, count, count, Some(Array(Map("exception" -> exception.getLocalizedMessage)))))
case Success(value) =>
LOGGER.debug(s"Successfully ran data validation, $validationDescription")
value
@@ -186,7 +186,9 @@ class ValidationProcessor(
case UpstreamDataSourceValidation(_, upstreamDataSource, _, _, _) => ("upstreamDataSource", upstreamDataSource.connectionConfigWithTaskBuilder.dataSourceName)
case _ => ("Unknown", "")
}
- val sampleErrors = validationRes.sampleErrorValues.get.take(validationConfig.numSampleErrorRecords).map(_.json).mkString(",")
+ val sampleErrors = validationRes.sampleErrorValues.getOrElse(Array())
+ .map(ObjectMapperUtil.jsonObjectMapper.writeValueAsString)
+ .mkString(",")
LOGGER.error(s"Failed validation: validation-name=${vcr.name}, description=${vcr.description}, data-source-name=${dsr.dataSourceName}, " +
s"data-source-options=${dsr.options}, is-success=${validationRes.isSuccess}, validation-type=$validationType, " +
s"check=$validationCheck, sample-errors=$sampleErrors")
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationWaitImplicits.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationWaitImplicits.scala
index ea50215b..ea79d897 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationWaitImplicits.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationWaitImplicits.scala
@@ -2,8 +2,8 @@ package io.github.datacatering.datacaterer.core.validator
import io.github.datacatering.datacaterer.api.model.Constants.FORMAT
import io.github.datacatering.datacaterer.api.model.{DataExistsWaitCondition, FileExistsWaitCondition, PauseWaitCondition, WaitCondition, WebhookWaitCondition}
+import io.github.datacatering.datacaterer.api.util.ConfigUtil
import io.github.datacatering.datacaterer.core.exception.InvalidWaitConditionException
-import io.github.datacatering.datacaterer.core.util.ConfigUtil
import io.github.datacatering.datacaterer.core.util.HttpUtil.getAuthHeader
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Logger
diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessorTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessorTest.scala
index 548c3173..c0d5f90a 100644
--- a/app/src/test/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessorTest.scala
+++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessorTest.scala
@@ -1,8 +1,7 @@
package io.github.datacatering.datacaterer.core.alert
import io.github.datacatering.datacaterer.api.model.Constants.{ALERT_TRIGGER_ON_ALL, ALERT_TRIGGER_ON_FAILURE, ALERT_TRIGGER_ON_GENERATION_FAILURE, ALERT_TRIGGER_ON_GENERATION_SUCCESS, ALERT_TRIGGER_ON_SUCCESS, ALERT_TRIGGER_ON_VALIDATION_FAILURE, ALERT_TRIGGER_ON_VALIDATION_SUCCESS}
-import io.github.datacatering.datacaterer.api.model.{AlertConfig, DataCatererConfiguration}
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceValidationResult, SinkResult, ValidationConfigResult, ValidationResult}
+import io.github.datacatering.datacaterer.api.model.{AlertConfig, DataCatererConfiguration, DataSourceResult, DataSourceValidationResult, SinkResult, ValidationConfigResult, ValidationResult}
import org.junit.runner.RunWith
import org.scalatest.funsuite.AnyFunSuite
import org.scalatestplus.junit.JUnitRunner
diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/model/ValidationOperationsTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/model/ValidationOperationsTest.scala
index 1e64c780..7ca82083 100644
--- a/app/src/test/scala/io/github/datacatering/datacaterer/core/model/ValidationOperationsTest.scala
+++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/model/ValidationOperationsTest.scala
@@ -235,8 +235,8 @@ class ValidationOperationsTest extends SparkSuite {
assertResult(1)(result.size)
assert(!result.head.isSuccess)
assert(result.head.sampleErrorValues.isDefined)
- assertResult(2)(result.head.sampleErrorValues.get.count())
- assertResult(2)(result.head.sampleErrorValues.get.filter(r => r.getAs[Double]("amount") >= 100).count())
+ assertResult(2)(result.head.sampleErrorValues.get.length)
+ assertResult(2)(result.head.sampleErrorValues.get.count(r => r("amount").asInstanceOf[Double] >= 100))
}
test("Can get sample rows when validation is not successful by error threshold greater than 1") {
@@ -246,8 +246,8 @@ class ValidationOperationsTest extends SparkSuite {
assertResult(1)(result.size)
assert(!result.head.isSuccess)
assert(result.head.sampleErrorValues.isDefined)
- assertResult(3)(result.head.sampleErrorValues.get.count())
- assertResult(3)(result.head.sampleErrorValues.get.filter(r => r.getAs[Double]("amount") >= 20).count())
+ assertResult(3)(result.head.sampleErrorValues.get.length)
+ assertResult(3)(result.head.sampleErrorValues.get.count(r => r("amount").asInstanceOf[Double] >= 20))
}
test("Can get sample rows when validation is not successful by error threshold less than 1") {
@@ -257,8 +257,8 @@ class ValidationOperationsTest extends SparkSuite {
assertResult(1)(result.size)
assert(!result.head.isSuccess)
assert(result.head.sampleErrorValues.isDefined)
- assertResult(2)(result.head.sampleErrorValues.get.count())
- assertResult(2)(result.head.sampleErrorValues.get.filter(r => r.getAs[Double]("amount") >= 100).count())
+ assertResult(2)(result.head.sampleErrorValues.get.length)
+ assertResult(2)(result.head.sampleErrorValues.get.count(r => r("amount").asInstanceOf[Double] >= 100))
}
test("Can check field names count is equal") {
@@ -292,7 +292,7 @@ class ValidationOperationsTest extends SparkSuite {
assertResult(5)(result.head.total)
assertResult(2)(result.head.numErrors)
assert(result.head.sampleErrorValues.isDefined)
- assertResult(2)(result.head.sampleErrorValues.get.count())
+ assertResult(2)(result.head.sampleErrorValues.get.length)
}
test("Can show error when field name not in set") {
@@ -304,6 +304,6 @@ class ValidationOperationsTest extends SparkSuite {
assertResult(4)(result.head.total)
assertResult(1)(result.head.numErrors)
assert(result.head.sampleErrorValues.isDefined)
- assertResult(1)(result.head.sampleErrorValues.get.count())
+ assertResult(1)(result.head.sampleErrorValues.get.length)
}
}
diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala
index 1921d895..2ddc2f32 100644
--- a/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala
+++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala
@@ -1,10 +1,9 @@
package io.github.datacatering.datacaterer.core.validator
import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, PATH, TABLE}
-import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, FoldersConfig, ValidationConfig, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, FoldersConfig, ValidationConfig, ValidationConfigResult, ValidationConfiguration}
import io.github.datacatering.datacaterer.api.{PreFilterBuilder, ValidationBuilder}
-import io.github.datacatering.datacaterer.core.model.ValidationConfigResult
-import io.github.datacatering.datacaterer.core.util.{ObjectMapperUtil, SparkSuite, Transaction}
+import io.github.datacatering.datacaterer.core.util.{SparkSuite, Transaction}
import org.junit.runner.RunWith
import org.scalatestplus.junit.JUnitRunner
diff --git a/gradle.properties b/gradle.properties
index b30dcd70..22ef1e4b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,5 +1,5 @@
groupId=io.github.data-catering
-version=0.13.1
+version=0.13.2
scalaVersion=2.12
scalaSpecificVersion=2.12.19