From 4f8395acd3a1e7b23aadca93c51a2ca2cd120243 Mon Sep 17 00:00:00 2001
From: Flook Peter
Date: Thu, 26 Dec 2024 17:41:47 +0800
Subject: [PATCH] Add in plan results, clean up imports

---
 .../api/DataCatererConfigurationBuilder.scala |   4 +-
 .../api/MetadataSourceBuilder.scala           |   2 +-
 .../datacaterer/api/PlanBuilder.scala         |   2 +-
 .../datacaterer/api/TaskBuilder.scala         |   2 +-
 .../datacaterer/api/model/ConfigModels.scala  |   5 +-
 .../datacaterer/api/model/Constants.scala     |   7 +
 .../api/model/MetadataSourceModels.scala      |   2 +-
 .../datacaterer/api/model/PlanModels.scala    |   5 +-
 .../datacaterer/api/model/ResultModels.scala  | 162 ++++++++++++++++++
 .../datacaterer/api}/util/ConfigUtil.scala    |   2 +-
 .../api}/util/ResultWriterUtil.scala          |   2 +-
 .../activity/PlanRunPostPlanProcessor.scala   |  46 +++++
 .../activity/PlanRunPrePlanProcessor.scala    |  44 +++++
 .../core/alert/AlertProcessor.scala           |   3 +-
 .../core/alert/SlackAlertProcessor.scala      |   3 +-
 .../core/exception/Exceptions.scala           |   2 +-
 .../core/generator/BatchDataProcessor.scala   |   3 +-
 .../generator/DataGeneratorProcessor.scala    |  11 +-
 .../delete/DeleteRecordProcessor.scala        |   4 +-
 .../delete/JdbcDeleteRecordService.scala      |   1 -
 .../DataSourceMetadataFactory.scala           |   3 +-
 .../datasource/http/OpenAPIConverter.scala    |   2 +-
 .../openlineage/OpenLineageMetadata.scala     |   2 +-
 .../result/DataGenerationResultWriter.scala   |  10 +-
 .../generator/result/ResultHtmlWriter.scala   |  14 +-
 .../datacaterer/core/model/ResultModels.scala |  69 +-------
 .../core/model/ValidationModels.scala         | 117 +------------
 .../datacaterer/core/plan/PlanProcessor.scala |  48 ++++--
 .../core/plan/PostPlanProcessor.scala         |   3 +-
 .../core/plan/PrePlanProcessor.scala          |  12 ++
 .../datacaterer/core/sink/SinkFactory.scala   |   6 +-
 .../datacaterer/core/sink/SinkProcessor.scala |   3 +-
 .../core/ui/plan/PlanRepository.scala         |   4 +-
 .../core/util/ManagementUtil.scala            |   9 +
 .../datacaterer/core/util/MetadataUtil.scala  |  21 +--
 .../datacaterer/core/util/ProtobufUtil.scala  |   4 -
 .../core/validator/ValidationOperations.scala |  84 +++++----
 .../core/validator/ValidationProcessor.scala  |  12 +-
 .../validator/ValidationWaitImplicits.scala   |   2 +-
 .../core/alert/AlertProcessorTest.scala       |   3 +-
 .../core/model/ValidationOperationsTest.scala |  16 +-
 .../validator/ValidationProcessorTest.scala   |   5 +-
 gradle.properties                             |   2 +-
 43 files changed, 455 insertions(+), 308 deletions(-)
 create mode 100644 api/src/main/scala/io/github/datacatering/datacaterer/api/model/ResultModels.scala
 rename {app/src/main/scala/io/github/datacatering/datacaterer/core => api/src/main/scala/io/github/datacatering/datacaterer/api}/util/ConfigUtil.scala (87%)
 rename {app/src/main/scala/io/github/datacatering/datacaterer/core => api/src/main/scala/io/github/datacatering/datacaterer/api}/util/ResultWriterUtil.scala (70%)
 create mode 100644 app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPostPlanProcessor.scala
 create mode 100644 app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPrePlanProcessor.scala
 create mode 100644 app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PrePlanProcessor.scala
 create mode 100644 app/src/main/scala/io/github/datacatering/datacaterer/core/util/ManagementUtil.scala
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala
index 82c994df..4c501135 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala
@@ -1,9 +1,9 @@
 package io.github.datacatering.datacaterer.api
 
-import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
-import io.github.datacatering.datacaterer.api.model.Constants._
 import com.softwaremill.quicklens.ModifyPimp
 import io.github.datacatering.datacaterer.api.connection.{CassandraBuilder, ConnectionTaskBuilder, FileBuilder, HttpBuilder, KafkaBuilder, MySqlBuilder, NoopBuilder, PostgresBuilder, SolaceBuilder}
+import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
+import io.github.datacatering.datacaterer.api.model.Constants._
 import io.github.datacatering.datacaterer.api.model.DataCatererConfiguration
 
 import scala.annotation.varargs
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala
index 6578d945..c7309919 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala
@@ -1,8 +1,8 @@
 package io.github.datacatering.datacaterer.api
 
+import com.softwaremill.quicklens.ModifyPimp
 import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
 import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY_ID, CONFLUENT_SCHEMA_REGISTRY_SUBJECT, CONFLUENT_SCHEMA_REGISTRY_VERSION, DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
-import com.softwaremill.quicklens.ModifyPimp
 import io.github.datacatering.datacaterer.api.model.{ConfluentSchemaRegistrySource, DataContractCliSource, GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}
 
 case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadataSource()) {
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala
index 401b57bf..877a893e 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala
@@ -2,9 +2,9 @@ package io.github.datacatering.datacaterer.api
 
 import com.softwaremill.quicklens.ModifyPimp
 import io.github.datacatering.datacaterer.api
+import io.github.datacatering.datacaterer.api.connection.ConnectionTaskBuilder
 import io.github.datacatering.datacaterer.api.converter.Converters.toScalaList
 import io.github.datacatering.datacaterer.api.model.Constants.METADATA_SOURCE_TYPE
-import io.github.datacatering.datacaterer.api.connection.ConnectionTaskBuilder
 import io.github.datacatering.datacaterer.api.model.{ForeignKeyRelation, Plan}
 
 import scala.annotation.varargs
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala
index 86408e60..cbe7b94d 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala
@@ -3,7 +3,7 @@ package io.github.datacatering.datacaterer.api
 import com.softwaremill.quicklens.ModifyPimp
 import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
 import io.github.datacatering.datacaterer.api.model.Constants._
-import io.github.datacatering.datacaterer.api.model.{Count, DataType, Field, PerFieldCount, Step, StringType, Task, TaskSummary}
+import io.github.datacatering.datacaterer.api.model.{Count, DataType, Field, PerFieldCount, Step, Task, TaskSummary}
 
 import scala.annotation.varargs
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ConfigModels.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ConfigModels.scala
index 254b58d1..cbc49f04 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ConfigModels.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ConfigModels.scala
@@ -1,6 +1,6 @@
 package io.github.datacatering.datacaterer.api.model
 
-import Constants._
+import io.github.datacatering.datacaterer.api.model.Constants._
 
 case class FlagsConfig(
                         enableCount: Boolean = DEFAULT_ENABLE_COUNT,
@@ -14,7 +14,8 @@ case class FlagsConfig(
                         enableSaveReports: Boolean = DEFAULT_ENABLE_SAVE_REPORTS,
                         enableValidation: Boolean = DEFAULT_ENABLE_VALIDATION,
                         enableGenerateValidations: Boolean = DEFAULT_ENABLE_SUGGEST_VALIDATIONS,
-                        enableAlerts: Boolean = DEFAULT_ENABLE_ALERTS
+                        enableAlerts: Boolean = DEFAULT_ENABLE_ALERTS,
+                        enableTrackActivity: Boolean = DEFAULT_ENABLE_TRACK_ACTIVITY
                       )
 
 case class FoldersConfig(
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala
index b55b910f..e6e73010 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala
@@ -189,6 +189,7 @@
   lazy val DEFAULT_ENABLE_VALIDATION = true
   lazy val DEFAULT_ENABLE_SUGGEST_VALIDATIONS = false
   lazy val DEFAULT_ENABLE_ALERTS = true
+  lazy val DEFAULT_ENABLE_TRACK_ACTIVITY = true
 
   //folders defaults
   lazy val DEFAULT_PLAN_FILE_PATH = "/opt/app/plan/customer-create-plan.yaml"
@@ -552,4 +553,10 @@
   lazy val PLAN_RUN_EXECUTION_DELIMITER_REGEX = "\\|\\|"
   lazy val PLAN_RUN_SUMMARY_DELIMITER = "&&"
 
+  //source of plan run
+  lazy val DATA_CATERER_INTERFACE_JAVA = "java"
+  lazy val DATA_CATERER_INTERFACE_SCALA = "scala"
+  lazy val DATA_CATERER_INTERFACE_UI = "ui"
+  lazy val DATA_CATERER_INTERFACE_YAML = "yaml"
+
 }
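The new enableTrackActivity flag defaults to true (DEFAULT_ENABLE_TRACK_ACTIVITY above), so plan-run activity is reported unless it is explicitly switched off. A minimal sketch of opting out by building FlagsConfig directly; how the flag is surfaced through DataCatererConfigurationBuilder is not shown in this patch:

import io.github.datacatering.datacaterer.api.model.FlagsConfig

object DisableTrackingExample {
  // All other flags keep their defaults, e.g. enableAlerts stays true.
  val flagsWithoutTracking: FlagsConfig = FlagsConfig(enableTrackActivity = false)
}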
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala
index 505b42df..a547aaf8 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala
@@ -1,6 +1,6 @@
 package io.github.datacatering.datacaterer.api.model
 
-import Constants.{CONFLUENT_SCHEMA_REGISTRY, DATA_CONTRACT_CLI, GREAT_EXPECTATIONS, MARQUEZ, METADATA_SOURCE_HAS_OPEN_LINEAGE_SUPPORT, METADATA_SOURCE_TYPE, OPEN_API, OPEN_DATA_CONTRACT_STANDARD, OPEN_METADATA}
+import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY, DATA_CONTRACT_CLI, GREAT_EXPECTATIONS, MARQUEZ, METADATA_SOURCE_HAS_OPEN_LINEAGE_SUPPORT, METADATA_SOURCE_TYPE, OPEN_API, OPEN_DATA_CONTRACT_STANDARD, OPEN_METADATA}
 
 trait MetadataSource {
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/PlanModels.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/PlanModels.scala
index 8edae571..1448868c 100644
--- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/PlanModels.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/PlanModels.scala
@@ -1,7 +1,7 @@
 package io.github.datacatering.datacaterer.api.model
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
-import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD_NAME, DEFAULT_FIELD_NULLABLE, DEFAULT_FIELD_TYPE, DEFAULT_GENERATOR_TYPE, DEFAULT_PER_FIELD_COUNT_RECORDS, DEFAULT_STEP_ENABLED, DEFAULT_STEP_NAME, DEFAULT_STEP_TYPE, DEFAULT_TASK_NAME, DEFAULT_TASK_SUMMARY_ENABLE, FOREIGN_KEY_DELIMITER}
+import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD_NAME, DEFAULT_FIELD_NULLABLE, DEFAULT_FIELD_TYPE, DEFAULT_PER_FIELD_COUNT_RECORDS, DEFAULT_STEP_ENABLED, DEFAULT_STEP_NAME, DEFAULT_STEP_TYPE, DEFAULT_TASK_NAME, DEFAULT_TASK_SUMMARY_ENABLE, FOREIGN_KEY_DELIMITER}
 
 import scala.language.implicitConversions
 
@@ -11,7 +11,8 @@ case class Plan(
                  tasks: List[TaskSummary] = List(),
                  sinkOptions: Option[SinkOptions] = None,
                  validations: List[String] = List(),
-                 runId: Option[String] = None
+                 runId: Option[String] = None,
+                 interface: Option[String] = None
                )
 
 case class SinkOptions(
diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ResultModels.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ResultModels.scala
new file mode 100644
index 00000000..5914e436
--- /dev/null
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/ResultModels.scala
@@ -0,0 +1,162 @@
+package io.github.datacatering.datacaterer.api.model
+
+import io.github.datacatering.datacaterer.api.model.Constants.DEFAULT_DATA_SOURCE_NAME
+import io.github.datacatering.datacaterer.api.util.ConfigUtil.cleanseOptions
+import io.github.datacatering.datacaterer.api.util.ResultWriterUtil.getSuccessSymbol
+
+import java.time.{Duration, LocalDateTime}
+import scala.math.BigDecimal.RoundingMode
+
+
+case class DataSourceResultSummary(
+                                    name: String,
+                                    numRecords: Long,
+                                    isSuccess: Boolean,
+                                    dataSourceResults: List[DataSourceResult]
+                                  )
+
+case class DataSourceResult(
+                             name: String = DEFAULT_DATA_SOURCE_NAME,
+                             task: Task = Task(),
+                             step: Step = Step(),
+                             sinkResult: SinkResult = SinkResult(),
+                             batchNum: Int = 0
+                           ) {
+
+  def summarise: List[String] = {
+    val format = sinkResult.format
+    val isSuccess = getSuccessSymbol(sinkResult.isSuccess)
+    val numRecords = sinkResult.count.toString
+    List(name, format, isSuccess, numRecords)
+  }
+
+  def jsonSummary: Map[String, Any] = {
+    Map(
+      "name" -> name,
+      "options" -> step.options,
+      "isSuccess" -> sinkResult.isSuccess,
+      "numRecords" -> sinkResult.count
+    )
+  }
+}
+
+case class TaskResultSummary(
+                              task: Task,
+                              numRecords: Long,
+                              isSuccess: Boolean,
+                              stepResults: List[StepResultSummary]
+                            )
+
+case class StepResultSummary(
+                              step: Step,
+                              numRecords: Long,
+                              isSuccess: Boolean,
+                              dataSourceResults: List[DataSourceResult]
+                            )
+
+case class SinkResult(
+                       name: String = DEFAULT_DATA_SOURCE_NAME,
+                       format: String = "json",
+                       saveMode: String = "append",
+                       options: Map[String, String] = Map(),
+                       count: Long = -1,
+                       isSuccess: Boolean = true,
+                       sample: Array[String] = Array(),
+                       startTime: LocalDateTime = LocalDateTime.now(),
+                       endTime: LocalDateTime = LocalDateTime.now(),
+                       generatedMetadata: Array[Field] = Array(),
+                       exception: Option[Throwable] = None
+                     ) {
+
+  def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
+}
+
+
+case class ValidationConfigResult(
+                                   name: String = "default_validation_result",
+                                   description: String = "Validation result for data sources",
+                                   dataSourceValidationResults: List[DataSourceValidationResult] = List(),
+                                   startTime: LocalDateTime = LocalDateTime.now(),
+                                   endTime: LocalDateTime = LocalDateTime.now()
+                                 ) {
+  def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
+
+  def summarise: List[String] = {
+    val validationRes = dataSourceValidationResults.flatMap(_.validationResults)
+    if (validationRes.nonEmpty) {
+      val (numSuccess, successRate, isSuccess) = baseSummary(validationRes)
+      val successRateVisual = s"$numSuccess/${validationRes.size} ($successRate%)"
+      List(name, description, getSuccessSymbol(isSuccess), successRateVisual)
+    } else List()
+  }
+
+  def jsonSummary: Map[String, Any] = {
+    val validationRes = dataSourceValidationResults.flatMap(dsv =>
+      dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v))
+    )
+    if (validationRes.nonEmpty) {
+      val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3))
+      val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => {
+        val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap
+        Map(
+          "dataSourceName" -> res._1,
+          "options" -> cleanseOptions(res._2),
+          "validation" -> validationDetails,
+          "numErrors" -> res._3.numErrors,
+          "sampleErrorValues" -> res._3.sampleErrorValues.getOrElse(Array())
+        )
+      })
+      val baseValidationMap = Map(
+        "name" -> name,
+        "description" -> description,
+        "isSuccess" -> isSuccess,
+        "numSuccess" -> numSuccess,
+        "numValidations" -> validationRes.size,
+        "successRate" -> successRate
+      )
+      if (errorMap.nonEmpty) {
+        baseValidationMap ++ Map("errorValidations" -> errorMap)
+      } else baseValidationMap
+    } else Map()
+  }
+
+  private def baseSummary(validationRes: List[ValidationResult]): (Int, BigDecimal, Boolean) = {
+    val validationSuccess = validationRes.map(_.isSuccess)
+    val numSuccess = validationSuccess.count(x => x)
+    val successRate = BigDecimal(numSuccess.toDouble / validationRes.size * 100).setScale(2, RoundingMode.HALF_UP)
+    val isSuccess = validationSuccess.forall(x => x)
+    (numSuccess, successRate, isSuccess)
+  }
+}
+
+case class DataSourceValidationResult(
+                                       dataSourceName: String = "default_data_source",
+                                       options: Map[String, String] = Map(),
+                                       validationResults: List[ValidationResult] = List()
+                                     )
+
+case class ValidationResult(
+                             validation: Validation = ExpressionValidation(),
+                             isSuccess: Boolean = true,
+                             numErrors: Long = 0,
+                             total: Long = 0,
+                             sampleErrorValues: Option[Array[Map[String, Any]]] = None
+                           )
+
+object ValidationResult {
+  def fromValidationWithBaseResult(validation: Validation, validationResult: ValidationResult): ValidationResult = {
+    ValidationResult(validation, validationResult.isSuccess, validationResult.numErrors, validationResult.total, validationResult.sampleErrorValues)
+  }
+}
+
+case class PlanResults(
+                        plan: Plan,
+                        generationResult: List[DataSourceResult],
+                        validationResults: List[ValidationConfigResult]
+                      )
+
+case class PlanRunSummary(
+                           plan: Plan,
+                           tasks: List[Task],
+                           validations: List[ValidationConfiguration]
+                         )
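The result models moved into the api module keep their summarisation helpers, so consumers of the api jar can render run summaries without depending on the app module. A small sketch under the definitions above; the exact string returned by getSuccessSymbol is not shown in this patch, so it is left symbolic in the comments:

import io.github.datacatering.datacaterer.api.model.{DataSourceResult, SinkResult}

object ResultSummaryExample {
  // Hypothetical result for one batch written to a JSON data source.
  val result = DataSourceResult(
    name = "my_json",
    sinkResult = SinkResult(name = "my_json", format = "json", count = 1000)
  )

  val tableRow = result.summarise // List("my_json", "json", <success symbol>, "1000")
  val asJson = result.jsonSummary // Map("name" -> "my_json", "options" -> ..., "isSuccess" -> true, "numRecords" -> 1000)
}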
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ConfigUtil.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/util/ConfigUtil.scala
similarity index 87%
rename from app/src/main/scala/io/github/datacatering/datacaterer/core/util/ConfigUtil.scala
rename to api/src/main/scala/io/github/datacatering/datacaterer/api/util/ConfigUtil.scala
index 1cf8a19c..80d4c07a 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ConfigUtil.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/util/ConfigUtil.scala
@@ -1,4 +1,4 @@
-package io.github.datacatering.datacaterer.core.util
+package io.github.datacatering.datacaterer.api.util
 
 object ConfigUtil {
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ResultWriterUtil.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/util/ResultWriterUtil.scala
similarity index 70%
rename from app/src/main/scala/io/github/datacatering/datacaterer/core/util/ResultWriterUtil.scala
rename to api/src/main/scala/io/github/datacatering/datacaterer/api/util/ResultWriterUtil.scala
index e7e9b3c5..93261f89 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ResultWriterUtil.scala
+++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/util/ResultWriterUtil.scala
@@ -1,4 +1,4 @@
-package io.github.datacatering.datacaterer.core.util
+package io.github.datacatering.datacaterer.api.util
 
 object ResultWriterUtil {
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPostPlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPostPlanProcessor.scala
new file mode 100644
index 00000000..e1942e2a
--- /dev/null
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPostPlanProcessor.scala
@@ -0,0 +1,46 @@
+package io.github.datacatering.datacaterer.core.activity
+
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, PlanResults, ValidationConfigResult}
+import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
+import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
+import io.github.datacatering.datacaterer.core.util.ManagementUtil.getDataCatererManagementUrl
+import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
+import org.apache.log4j.Logger
+import org.asynchttpclient.AsyncHttpClient
+import org.asynchttpclient.Dsl.asyncHttpClient
+
+import scala.compat.java8.FutureConverters.CompletionStageOps
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
+
+class PlanRunPostPlanProcessor(val dataCatererConfiguration: DataCatererConfiguration) extends PostPlanProcessor {
+
+  override val enabled: Boolean = dataCatererConfiguration.flagsConfig.enableTrackActivity
+
+  private val LOGGER = Logger.getLogger(getClass.getName)
+  private val dataCatererManagementUrl = getDataCatererManagementUrl
+  private val http: AsyncHttpClient = asyncHttpClient
+
+  override def apply(
+                      plan: Plan,
+                      sparkRecordListener: SparkRecordListener,
+                      generationResult: List[DataSourceResult],
+                      validationResults: List[ValidationConfigResult]
+                    ): Unit = {
+    val planResults = PlanResults(plan, generationResult, validationResults)
+    val jsonBody = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(planResults)
+    val url = s"$dataCatererManagementUrl/plan/finish"
+    val prepareRequest = http.prepare("POST", url)
+      .setBody(jsonBody)
+
+    val futureResp = prepareRequest.execute().toCompletableFuture.toScala
+    futureResp.onComplete {
+      case Success(_) =>
+        LOGGER.debug(s"Successfully posted run results, url=$url")
+        http.close()
+      case Failure(exception) =>
+        LOGGER.debug(s"Failed to post run results, url=$url", exception)
+        http.close()
+    }
+  }
+}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPrePlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPrePlanProcessor.scala
new file mode 100644
index 00000000..feff55c7
--- /dev/null
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/activity/PlanRunPrePlanProcessor.scala
@@ -0,0 +1,44 @@
+package io.github.datacatering.datacaterer.core.activity
+
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, PlanRunSummary, Task, ValidationConfiguration}
+import io.github.datacatering.datacaterer.core.plan.PrePlanProcessor
+import io.github.datacatering.datacaterer.core.util.ManagementUtil.getDataCatererManagementUrl
+import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
+import org.apache.log4j.Logger
+import org.asynchttpclient.AsyncHttpClient
+import org.asynchttpclient.Dsl.asyncHttpClient
+
+import scala.compat.java8.FutureConverters.CompletionStageOps
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
+
+class PlanRunPrePlanProcessor(val dataCatererConfiguration: DataCatererConfiguration) extends PrePlanProcessor {
+
+  override val enabled: Boolean = dataCatererConfiguration.flagsConfig.enableTrackActivity
+
+  private val LOGGER = Logger.getLogger(getClass.getName)
+  private val dataCatererManagementUrl = getDataCatererManagementUrl
+  private val http: AsyncHttpClient = asyncHttpClient
+
+  override def apply(
+                      plan: Plan,
+                      tasks: List[Task],
+                      validations: List[ValidationConfiguration]
+                    ): Unit = {
+    val planRunSummary = PlanRunSummary(plan, tasks, validations)
+    val jsonBody = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(planRunSummary)
+    val url = s"$dataCatererManagementUrl/plan/start"
+    val prepareRequest = http.prepare("POST", url)
+      .setBody(jsonBody)
+
+    val futureResp = prepareRequest.execute().toCompletableFuture.toScala
+    futureResp.onComplete {
+      case Success(_) =>
+        LOGGER.debug(s"Successfully posted run start, url=$url")
+        http.close()
+      case Failure(exception) =>
+        LOGGER.debug(s"Failed to post run start, url=$url", exception)
+        http.close()
+    }
+  }
+}
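Both activity processors are fire-and-forget: the POST outcome is only logged at debug level, so an unreachable management service never fails a plan run. The target host comes from ManagementUtil (added later in this patch), which falls back to http://localhost:8082 when DATA_CATERER_MANAGEMENT_URL is unset. A small sketch of the endpoints involved:

import io.github.datacatering.datacaterer.core.util.ManagementUtil.getDataCatererManagementUrl

object ActivityEndpointsExample {
  val baseUrl: String = getDataCatererManagementUrl // env override or http://localhost:8082
  val startUrl: String = s"$baseUrl/plan/start"     // posted by PlanRunPrePlanProcessor
  val finishUrl: String = s"$baseUrl/plan/finish"   // posted by PlanRunPostPlanProcessor
}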
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessor.scala
index b031aab4..2d59252e 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessor.scala
@@ -1,9 +1,8 @@
 package io.github.datacatering.datacaterer.core.alert
 
 import io.github.datacatering.datacaterer.api.model.Constants.{ALERT_TRIGGER_ON_ALL, ALERT_TRIGGER_ON_FAILURE, ALERT_TRIGGER_ON_GENERATION_FAILURE, ALERT_TRIGGER_ON_GENERATION_SUCCESS, ALERT_TRIGGER_ON_SUCCESS, ALERT_TRIGGER_ON_VALIDATION_FAILURE, ALERT_TRIGGER_ON_VALIDATION_SUCCESS}
-import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan}
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, ValidationConfigResult}
 import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, ValidationConfigResult}
 import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
 import org.apache.log4j.Logger
 
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/SlackAlertProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/SlackAlertProcessor.scala
index fd4c2d2c..7c8f6c40 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/SlackAlertProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/alert/SlackAlertProcessor.scala
@@ -3,9 +3,8 @@ package io.github.datacatering.datacaterer.core.alert
 import com.slack.api.Slack
 import com.slack.api.methods.MethodsClient
 import com.slack.api.methods.request.chat.ChatPostMessageRequest
-import io.github.datacatering.datacaterer.api.model.SlackAlertConfig
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, SlackAlertConfig, ValidationConfigResult}
 import io.github.datacatering.datacaterer.core.model.Constants.REPORT_HOME_HTML
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, ValidationConfigResult}
 import org.apache.log4j.Logger
 
 import scala.util.{Failure, Success, Try}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/exception/Exceptions.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/exception/Exceptions.scala
index 2361567c..5659200c 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/exception/Exceptions.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/exception/Exceptions.scala
@@ -1,7 +1,7 @@
 package io.github.datacatering.datacaterer.core.exception
 
 import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, FOREIGN_KEY_DELIMITER, FOREIGN_KEY_PLAN_FILE_DELIMITER}
-import io.github.datacatering.datacaterer.api.model.{Count, Field, Step, Validation}
+import io.github.datacatering.datacaterer.api.model.{Field, Step, Validation}
 import org.apache.spark.sql.types.{DataType, Metadata, StructField}
 
 //files
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
index a8cbb571..9875caab 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
@@ -1,10 +1,9 @@
 package io.github.datacatering.datacaterer.core.generator
 
 import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_ENABLE_GENERATE_DATA, ENABLE_DATA_GENERATION, SAVE_MODE}
-import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, GenerationConfig, MetadataConfig, Plan, Step, Task, TaskSummary, UpstreamDataSourceValidation, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, FlagsConfig, FoldersConfig, GenerationConfig, MetadataConfig, Plan, Step, Task, TaskSummary, UpstreamDataSourceValidation, ValidationConfiguration}
 import io.github.datacatering.datacaterer.core.exception.InvalidRandomSeedException
 import io.github.datacatering.datacaterer.core.generator.track.RecordTrackingProcessor
-import io.github.datacatering.datacaterer.core.model.DataSourceResult
 import io.github.datacatering.datacaterer.core.sink.SinkFactory
 import io.github.datacatering.datacaterer.core.util.GeneratorUtil.getDataSourceName
 import io.github.datacatering.datacaterer.core.util.PlanImplicits.PerFieldCountOps
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala
index 67290f4a..cc0785d8 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala
@@ -1,11 +1,12 @@
 package io.github.datacatering.datacaterer.core.generator
 
-import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, Task, TaskSummary, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, Task, TaskSummary, ValidationConfigResult, ValidationConfiguration}
+import io.github.datacatering.datacaterer.core.activity.PlanRunPostPlanProcessor
 import io.github.datacatering.datacaterer.core.alert.AlertProcessor
 import io.github.datacatering.datacaterer.core.generator.delete.DeleteRecordProcessor
 import io.github.datacatering.datacaterer.core.generator.result.DataGenerationResultWriter
 import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, PlanRunResults, ValidationConfigResult}
+import io.github.datacatering.datacaterer.core.model.PlanRunResults
 import io.github.datacatering.datacaterer.core.parser.PlanParser
 import io.github.datacatering.datacaterer.core.util.PlanImplicits.TaskOps
 import io.github.datacatering.datacaterer.core.validator.ValidationProcessor
@@ -80,7 +81,11 @@ class DataGeneratorProcessor(dataCatererConfiguration: DataCatererConfiguration)
   private def applyPostPlanProcessors(plan: Plan, sparkRecordListener: SparkRecordListener, generationResult: List[DataSourceResult],
                                       validationResults: List[ValidationConfigResult]): Unit = {
-    val postPlanProcessors = List(new DataGenerationResultWriter(dataCatererConfiguration), new AlertProcessor(dataCatererConfiguration))
+    val postPlanProcessors = List(
+      new DataGenerationResultWriter(dataCatererConfiguration),
+      new AlertProcessor(dataCatererConfiguration),
+      new PlanRunPostPlanProcessor(dataCatererConfiguration),
+    )
     postPlanProcessors.foreach(postPlanProcessor => {
       if (postPlanProcessor.enabled) postPlanProcessor.apply(plan, sparkRecordListener, generationResult, validationResults)
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/DeleteRecordProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/DeleteRecordProcessor.scala
index 0ed2cbc9..dd2ac402 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/DeleteRecordProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/DeleteRecordProcessor.scala
@@ -2,8 +2,8 @@ package io.github.datacatering.datacaterer.core.generator.delete
 
 import io.github.datacatering.datacaterer.api.model.Constants.{CASSANDRA, CSV, DELTA, FOREIGN_KEY_DELIMITER, FORMAT, JDBC, JSON, ORC, PARQUET, PATH}
 import io.github.datacatering.datacaterer.api.model.{ForeignKeyRelation, Plan, Step, Task, TaskSummary}
+import io.github.datacatering.datacaterer.api.util.ConfigUtil.cleanseOptions
 import io.github.datacatering.datacaterer.core.model.Constants.RECORD_TRACKING_VALIDATION_FORMAT
-import io.github.datacatering.datacaterer.core.util.ConfigUtil.cleanseOptions
 import io.github.datacatering.datacaterer.core.util.MetadataUtil.getSubDataSourcePath
 import io.github.datacatering.datacaterer.core.util.PlanImplicits.SinkOptionsOps
 import io.github.datacatering.datacaterer.core.util.{ForeignKeyRelationHelper, ForeignKeyUtil}
@@ -82,7 +82,7 @@ class DeleteRecordProcessor(connectionConfigsByName: Map[String, Map[String, Str
   }
 
   private def deleteRecords(dataSourceName: String, plan: Plan, step: Step, stepsByName: Map[String, Step] = Map(),
-                             optSourceForeignKey: Option[String] = None, optFullForeignKey: Option[(ForeignKeyRelation, String)] = None): Unit = {
+                            optSourceForeignKey: Option[String] = None, optFullForeignKey: Option[(ForeignKeyRelation, String)] = None): Unit = {
     val format = step.options(FORMAT)
     val subDataSourcePath = getSubDataSourcePath(dataSourceName, plan.name, step, recordTrackingFolderPath)
     val optDeleteRecordService = getDeleteRecordService(format)
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/JdbcDeleteRecordService.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/JdbcDeleteRecordService.scala
index bc119d73..54c9b713 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/JdbcDeleteRecordService.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/delete/JdbcDeleteRecordService.scala
@@ -2,7 +2,6 @@ package io.github.datacatering.datacaterer.core.generator.delete
 
 import io.github.datacatering.datacaterer.api.model.Constants.{DRIVER, JDBC_TABLE, MYSQL_DRIVER, PASSWORD, URL, USERNAME}
 import io.github.datacatering.datacaterer.core.exception.{InvalidDataSourceOptions, UnsupportedJdbcDeleteDataType}
-import org.apache.log4j.Logger
 import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/DataSourceMetadataFactory.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/DataSourceMetadataFactory.scala
index 6402f805..08fbc166 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/DataSourceMetadataFactory.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/DataSourceMetadataFactory.scala
@@ -2,11 +2,12 @@ package io.github.datacatering.datacaterer.core.generator.metadata.datasource
 
 import io.github.datacatering.datacaterer.api.model.Constants.METADATA_SOURCE_TYPE
 import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceValidation, Field, Plan, Task, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.util.ConfigUtil
 import io.github.datacatering.datacaterer.api.{PlanRun, ValidationBuilder}
 import io.github.datacatering.datacaterer.core.generator.metadata.PlanGenerator.writeToFiles
 import io.github.datacatering.datacaterer.core.model.{ForeignKeyRelationship, ValidationConfigurationHelper}
 import io.github.datacatering.datacaterer.core.util.MetadataUtil.getMetadataFromConnectionConfig
-import io.github.datacatering.datacaterer.core.util.{ConfigUtil, ForeignKeyUtil, MetadataUtil, SchemaHelper, TaskHelper}
+import io.github.datacatering.datacaterer.core.util.{ForeignKeyUtil, MetadataUtil, SchemaHelper, TaskHelper}
 import org.apache.log4j.Logger
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Dataset, SparkSession}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/http/OpenAPIConverter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/http/OpenAPIConverter.scala
index 7a9f482f..f7d0d501 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/http/OpenAPIConverter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/http/OpenAPIConverter.scala
@@ -4,7 +4,7 @@ import io.github.datacatering.datacaterer.api.model.Constants.{ARRAY_MAXIMUM_LEN
 import io.github.datacatering.datacaterer.api.model.{ArrayType, BinaryType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
 import io.github.datacatering.datacaterer.core.exception.UnsupportedOpenApiDataTypeException
 import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
-import io.github.datacatering.datacaterer.core.model.Constants.{HTTP_HEADER_FIELD_PREFIX, HTTP_PATH_PARAM_FIELD_PREFIX, HTTP_QUERY_PARAM_FIELD_PREFIX, REAL_TIME_BODY_FIELD, REAL_TIME_BODY_CONTENT_FIELD, REAL_TIME_CONTENT_TYPE_FIELD, REAL_TIME_METHOD_FIELD, REAL_TIME_URL_FIELD}
+import io.github.datacatering.datacaterer.core.model.Constants.{HTTP_HEADER_FIELD_PREFIX, HTTP_PATH_PARAM_FIELD_PREFIX, HTTP_QUERY_PARAM_FIELD_PREFIX, REAL_TIME_BODY_CONTENT_FIELD, REAL_TIME_BODY_FIELD, REAL_TIME_CONTENT_TYPE_FIELD, REAL_TIME_METHOD_FIELD, REAL_TIME_URL_FIELD}
 import io.swagger.v3.oas.models.PathItem.HttpMethod
 import io.swagger.v3.oas.models.media.Schema
 import io.swagger.v3.oas.models.parameters.Parameter
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/openlineage/OpenLineageMetadata.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/openlineage/OpenLineageMetadata.scala
index 437e7d59..70665fe3 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/openlineage/OpenLineageMetadata.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/openlineage/OpenLineageMetadata.scala
@@ -1,6 +1,6 @@
 package io.github.datacatering.datacaterer.core.generator.metadata.datasource.openlineage
 
-import io.github.datacatering.datacaterer.api.model.Constants.{DATA_SOURCE_NAME, DEFAULT_FIELD_TYPE, DEFAULT_STEP_NAME, FACET_DATA_SOURCE, FIELD_DATA_TYPE, FIELD_DESCRIPTION, JDBC, JDBC_TABLE, METADATA_IDENTIFIER, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, URI}
+import io.github.datacatering.datacaterer.api.model.Constants.{DATA_SOURCE_NAME, DEFAULT_FIELD_TYPE, FACET_DATA_SOURCE, FIELD_DATA_TYPE, FIELD_DESCRIPTION, JDBC, JDBC_TABLE, METADATA_IDENTIFIER, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, URI}
 import io.github.datacatering.datacaterer.core.exception.{FailedMarquezHttpCallException, InvalidMarquezResponseException}
 import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
 import io.github.datacatering.datacaterer.core.generator.metadata.datasource.{DataSourceMetadata, SubDataSourceMetadata}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
index b7ba64ca..5a51bb84 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
@@ -1,19 +1,17 @@
 package io.github.datacatering.datacaterer.core.generator.result
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
-import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_GENERATED_REPORTS_FOLDER_PATH, SPECIFIC_DATA_SOURCE_OPTIONS}
-import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Field, Plan, Step, Task}
+import io.github.datacatering.datacaterer.api.model.Constants.SPECIFIC_DATA_SOURCE_OPTIONS
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, DataSourceResultSummary, Field, Plan, Step, StepResultSummary, Task, TaskResultSummary, ValidationConfigResult}
 import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
 import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_CATERING_SVG, REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_MAIN_CSS, REPORT_RESULT_JSON, REPORT_TASK_HTML, REPORT_VALIDATIONS_HTML}
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult}
 import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
 import io.github.datacatering.datacaterer.core.util.FileUtil.writeStringToFile
 import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
 import org.apache.log4j.Logger
 import org.apache.spark.sql.SparkSession
 
-import scala.util.{Failure, Success, Try}
 import scala.xml.Node
 
 class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfiguration)
@@ -107,7 +105,7 @@
   private def resultsAsJson(generationResult: List[DataSourceResult], validationResults: List[ValidationConfigResult]): String = {
     val resultMap = Map(
       "generation" -> getGenerationJsonSummary(generationResult),
-      "validation" -> validationResults.map(_.jsonSummary(validationConfig.numSampleErrorRecords))
+      "validation" -> validationResults.map(_.jsonSummary)
     )
     OBJECT_MAPPER.writeValueAsString(resultMap)
   }
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
index de8a8b6b..ba0553df 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
@@ -1,10 +1,10 @@
 package io.github.datacatering.datacaterer.core.generator.result
 
 import io.github.datacatering.datacaterer.api.model.Constants.HISTOGRAM
-import io.github.datacatering.datacaterer.api.model.{FlagsConfig, Plan, Step, Validation, ValidationConfig}
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, DataSourceResultSummary, FlagsConfig, Plan, Step, StepResultSummary, TaskResultSummary, Validation, ValidationConfig, ValidationConfigResult}
 import io.github.datacatering.datacaterer.core.listener.{SparkRecordListener, SparkTaskRecordSummary}
 import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_VALIDATIONS_HTML}
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult}
+import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
 import io.github.datacatering.datacaterer.core.util.PlanImplicits.CountOps
 import org.joda.time.DateTime
 
@@ -467,7 +467,15 @@
 
           {keyValueTable(getValidationOptions(validationRes.validation))}
 
-          {if (validationRes.isSuccess) "" else keyValueTable(validationRes.sampleErrorValues.get.take(validationConfig.numSampleErrorRecords).map(e => List(e.json)).toList)}
+          {if (validationRes.isSuccess) {
+          ""
+        } else {
+          keyValueTable(
+            validationRes.sampleErrorValues.getOrElse(Array())
+              .map(e => List(ObjectMapperUtil.jsonObjectMapper.writeValueAsString(e)))
+              .toList
+          )
+        }}
 
       })
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ResultModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ResultModels.scala
index a1b70831..0bf1d20f 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ResultModels.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ResultModels.scala
@@ -1,10 +1,6 @@
 package io.github.datacatering.datacaterer.core.model
 
-import io.github.datacatering.datacaterer.api.model.Constants.DEFAULT_DATA_SOURCE_NAME
-import io.github.datacatering.datacaterer.api.model.{Field, Step, Task}
-import io.github.datacatering.datacaterer.core.util.ResultWriterUtil.getSuccessSymbol
-
-import java.time.{Duration, LocalDateTime}
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, ValidationConfigResult}
 
 case class PlanRunResults(
                            generationResults: List[DataSourceResult] = List(),
@@ -25,67 +21,4 @@
   }
 }
 
-case class DataSourceResultSummary(
-                                    name: String,
-                                    numRecords: Long,
-                                    isSuccess: Boolean,
-                                    dataSourceResults: List[DataSourceResult]
-                                  )
-
-case class DataSourceResult(
-                             name: String = DEFAULT_DATA_SOURCE_NAME,
-                             task: Task = Task(),
-                             step: Step = Step(),
-                             sinkResult: SinkResult = SinkResult(),
-                             batchNum: Int = 0
-                           ) {
-
-  def summarise: List[String] = {
-    val format = sinkResult.format
-    val isSuccess = getSuccessSymbol(sinkResult.isSuccess)
-    val numRecords = sinkResult.count.toString
-    List(name, format, isSuccess, numRecords)
-  }
-
-  def jsonSummary: Map[String, Any] = {
-    Map(
-      "name" -> name,
-      "options" -> step.options,
-      "isSuccess" -> sinkResult.isSuccess,
-      "numRecords" -> sinkResult.count
-    )
-  }
-}
-
-case class TaskResultSummary(
-                              task: Task,
-                              numRecords: Long,
-                              isSuccess: Boolean,
-                              stepResults: List[StepResultSummary]
-                            )
-
-case class StepResultSummary(
-                              step: Step,
-                              numRecords: Long,
-                              isSuccess: Boolean,
-                              dataSourceResults: List[DataSourceResult]
-                            )
-
-case class SinkResult(
-                       name: String = DEFAULT_DATA_SOURCE_NAME,
-                       format: String = "json",
-                       saveMode: String = "append",
-                       options: Map[String, String] = Map(),
-                       count: Long = -1,
-                       isSuccess: Boolean = true,
-                       sample: Array[String] = Array(),
-                       startTime: LocalDateTime = LocalDateTime.now(),
-                       endTime: LocalDateTime = LocalDateTime.now(),
-                       generatedMetadata: Array[Field] = Array(),
-                       exception: Option[Throwable] = None
-                     ) {
-
-  def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
-}
-
 case class RealTimeSinkResult(result: String = "")
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
index a10c9090..d44dce49 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
@@ -1,114 +1,11 @@
 package io.github.datacatering.datacaterer.core.model
 
 import io.github.datacatering.datacaterer.api.model.Constants.{AGGREGATION_AVG, AGGREGATION_COUNT, AGGREGATION_MAX, AGGREGATION_MIN, AGGREGATION_STDDEV, AGGREGATION_SUM, SPECIFIC_DATA_SOURCE_OPTIONS}
-import io.github.datacatering.datacaterer.api.model.{ExpressionValidation, Validation, ValidationConfiguration}
+import io.github.datacatering.datacaterer.api.model.ValidationConfiguration
 import io.github.datacatering.datacaterer.api.{FieldValidationBuilder, ValidationBuilder}
 import io.github.datacatering.datacaterer.core.exception.UnsupportedDataValidationAggregateFunctionException
-import io.github.datacatering.datacaterer.core.util.ConfigUtil.cleanseOptions
-import io.github.datacatering.datacaterer.core.util.ResultWriterUtil.getSuccessSymbol
 import org.apache.log4j.Logger
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 
-import java.time.{Duration, LocalDateTime}
-import scala.collection.mutable
-import scala.math.BigDecimal.RoundingMode
-
-case class ValidationConfigResult(
-                                   name: String = "default_validation_result",
-                                   description: String = "Validation result for data sources",
-                                   dataSourceValidationResults: List[DataSourceValidationResult] = List(),
-                                   startTime: LocalDateTime = LocalDateTime.now(),
-                                   endTime: LocalDateTime = LocalDateTime.now()
-                                 ) {
-  def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
-
-  def summarise: List[String] = {
-    val validationRes = dataSourceValidationResults.flatMap(_.validationResults)
-    if (validationRes.nonEmpty) {
-      val (numSuccess, successRate, isSuccess) = baseSummary(validationRes)
-      val successRateVisual = s"$numSuccess/${validationRes.size} ($successRate%)"
-      List(name, description, getSuccessSymbol(isSuccess), successRateVisual)
-    } else List()
-  }
-
-  def jsonSummary(numErrorSamples: Int): Map[String, Any] = {
-    val validationRes = dataSourceValidationResults.flatMap(dsv =>
-      dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v))
-    )
-    if (validationRes.nonEmpty) {
-      val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3))
-      val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => {
-        val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap
-        Map(
-          "dataSourceName" -> res._1,
-          "options" -> cleanseOptions(res._2),
-          "validation" -> validationDetails,
-          "numErrors" -> res._3.numErrors,
-          "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res._3)
-        )
-      })
-      val baseValidationMap = Map(
-        "name" -> name,
-        "description" -> description,
-        "isSuccess" -> isSuccess,
-        "numSuccess" -> numSuccess,
-        "numValidations" -> validationRes.size,
-        "successRate" -> successRate
-      )
-      if (errorMap.nonEmpty) {
-        baseValidationMap ++ Map("errorValidations" -> errorMap)
-      } else baseValidationMap
-    } else Map()
-  }
-
-  private def getErrorSamplesAsMap(numErrorSamples: Int, res: ValidationResult): Array[Map[String, Any]] = {
-    def parseValueMap[T](valMap: (String, T)): Map[String, Any] = {
-      valMap._2 match {
-        case genericRow: GenericRowWithSchema =>
-          val innerValueMap = genericRow.getValuesMap[T](genericRow.schema.fields.map(_.name))
-          Map(valMap._1 -> innerValueMap.flatMap(parseValueMap[T]))
-        case wrappedArray: mutable.WrappedArray[_] =>
-          Map(valMap._1 -> wrappedArray.map {
-            case genericRowWithSchema: GenericRowWithSchema =>
-              val innerValueMap = genericRowWithSchema.getValuesMap[T](genericRowWithSchema.schema.fields.map(_.name))
-              innerValueMap.flatMap(parseValueMap[T])
-            case x => x
-          })
-        case _ =>
-          Map(valMap)
-      }
-    }
-
-    res.sampleErrorValues.get.take(numErrorSamples)
-      .map(row => {
-        val valuesMap = row.getValuesMap(res.sampleErrorValues.get.columns)
-        valuesMap.flatMap(valMap => parseValueMap(valMap))
-      })
-  }
-
-  private def baseSummary(validationRes: List[ValidationResult]): (Int, BigDecimal, Boolean) = {
-    val validationSuccess = validationRes.map(_.isSuccess)
-    val numSuccess = validationSuccess.count(x => x)
-    val successRate = BigDecimal(numSuccess.toDouble / validationRes.size * 100).setScale(2, RoundingMode.HALF_UP)
-    val isSuccess = validationSuccess.forall(x => x)
-    (numSuccess, successRate, isSuccess)
-  }
-}
-
-case class DataSourceValidationResult(
-                                       dataSourceName: String = "default_data_source",
-                                       options: Map[String, String] = Map(),
-                                       validationResults: List[ValidationResult] = List()
-                                     )
-
-case class ValidationResult(
-                             validation: Validation = ExpressionValidation(),
-                             isSuccess: Boolean = true,
-                             numErrors: Long = 0,
-                             total: Long = 0,
-                             sampleErrorValues: Option[DataFrame] = None
-                           )
 
 trait ExternalDataValidation {
   val LOGGER: Logger = Logger.getLogger(getClass.getName)
@@ -141,12 +38,6 @@ trait ExternalDataValidation {
   }
 }
 
-object ValidationResult {
-  def fromValidationWithBaseResult(validation: Validation, validationResult: ValidationResult): ValidationResult = {
-    ValidationResult(validation, validationResult.isSuccess, validationResult.numErrors, validationResult.total, validationResult.sampleErrorValues)
-  }
-}
-
 object ValidationConfigurationHelper {
 
   private val LOGGER = Logger.getLogger(getClass.getName)
@@ -173,9 +64,9 @@ object ValidationConfigurationHelper {
       //need to match only on the data source specific options (i.e. table name, file path)
       val genDataSourceOpts = genV.options.filter(o => SPECIFIC_DATA_SOURCE_OPTIONS.contains(o._1))
       val optMatchingValidations = currentUserDsValid.find(userV => {
-          val userDataSourceOpts = userV.options.filter(o => SPECIFIC_DATA_SOURCE_OPTIONS.contains(o._1))
-          genDataSourceOpts.forall(genOpt => userDataSourceOpts.get(genOpt._1).contains(genOpt._2))
-        })
+        val userDataSourceOpts = userV.options.filter(o => SPECIFIC_DATA_SOURCE_OPTIONS.contains(o._1))
+        genDataSourceOpts.forall(genOpt => userDataSourceOpts.get(genOpt._1).contains(genOpt._2))
+      })
 
       if (optMatchingValidations.isDefined) {
         LOGGER.debug(s"Found matching data source with manual validations, merging list of validations together, " +
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala
index 62bcbf43..e56c012c 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala
@@ -1,8 +1,9 @@
 package io.github.datacatering.datacaterer.core.plan
 
 import io.github.datacatering.datacaterer.api.PlanRun
-import io.github.datacatering.datacaterer.api.model.Constants.PLAN_CLASS
+import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CATERER_INTERFACE_JAVA, DATA_CATERER_INTERFACE_SCALA, DATA_CATERER_INTERFACE_YAML, PLAN_CLASS}
 import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, Task, ValidationConfiguration}
+import io.github.datacatering.datacaterer.core.activity.PlanRunPrePlanProcessor
 import io.github.datacatering.datacaterer.core.config.ConfigParser
 import io.github.datacatering.datacaterer.core.exception.PlanRunClassNotFoundException
 import io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor
@@ -10,7 +11,6 @@ import io.github.datacatering.datacaterer.core.generator.metadata.datasource.Dat
 import io.github.datacatering.datacaterer.core.model.Constants.METADATA_CONNECTION_OPTIONS
 import io.github.datacatering.datacaterer.core.model.PlanRunResults
 import io.github.datacatering.datacaterer.core.parser.PlanParser
-import io.github.datacatering.datacaterer.core.ui.model.PlanRunRequest
 import io.github.datacatering.datacaterer.core.util.SparkProvider
 import org.apache.spark.sql.SparkSession
 
@@ -18,7 +18,10 @@ import scala.util.{Success, Try}
 
 object PlanProcessor {
 
-  def determineAndExecutePlan(optPlanRun: Option[PlanRun] = None): PlanRunResults = {
+  def determineAndExecutePlan(
+                               optPlanRun: Option[PlanRun] = None,
+                               interface: String = DATA_CATERER_INTERFACE_SCALA
+                             ): PlanRunResults = {
     val optPlanClass = getPlanClass
     optPlanClass.map(Class.forName)
       .map(cls => {
@@ -26,41 +29,46 @@ object PlanProcessor {
         val tryScalaPlan = Try(cls.getDeclaredConstructor().newInstance().asInstanceOf[PlanRun])
         val tryJavaPlan = Try(cls.getDeclaredConstructor().newInstance().asInstanceOf[io.github.datacatering.datacaterer.javaapi.api.PlanRun])
         (tryScalaPlan, tryJavaPlan) match {
-          case (Success(value), _) => value
-          case (_, Success(value)) => value.getPlan
+          case (Success(value), _) => (value, DATA_CATERER_INTERFACE_SCALA)
+          case (_, Success(value)) => (value.getPlan, DATA_CATERER_INTERFACE_JAVA)
           case _ => throw PlanRunClassNotFoundException(optPlanClass.get)
         }
       })
-      .map(executePlan)
+      .map(p => executePlan(p._1, p._2))
       .getOrElse(
-        optPlanRun.map(executePlan)
-          .getOrElse(executePlan)
+        optPlanRun.map(p => executePlan(p, interface))
+          .getOrElse(executePlan(interface))
       )
   }
 
   def determineAndExecutePlanJava(planRun: io.github.datacatering.datacaterer.javaapi.api.PlanRun): PlanRunResults =
     determineAndExecutePlan(Some(planRun.getPlan))
 
-  private def executePlan(planRun: PlanRun): PlanRunResults = {
+  private def executePlan(planRun: PlanRun, interface: String): PlanRunResults = {
     val dataCatererConfiguration = planRun._configuration
-    executePlanWithConfig(dataCatererConfiguration, Some(planRun))
+    executePlanWithConfig(dataCatererConfiguration, Some(planRun), interface)
   }
 
-  private def executePlan: PlanRunResults = {
+  private def executePlan(interface: String): PlanRunResults = {
     val dataCatererConfiguration = ConfigParser.toDataCatererConfiguration
-    executePlanWithConfig(dataCatererConfiguration, None)
+    executePlanWithConfig(dataCatererConfiguration, None, interface)
   }
 
-  private def executePlanWithConfig(dataCatererConfiguration: DataCatererConfiguration, optPlan: Option[PlanRun]): PlanRunResults = {
+  private def executePlanWithConfig(
+                                     dataCatererConfiguration: DataCatererConfiguration,
+                                     optPlan: Option[PlanRun],
+                                     interface: String
+                                   ): PlanRunResults = {
     val connectionConf = optPlan.map(_._connectionTaskBuilders.flatMap(_.connectionConfigWithTaskBuilder.options).toMap).getOrElse(Map())
     implicit val sparkSession: SparkSession = new SparkProvider(dataCatererConfiguration.master, dataCatererConfiguration.runtimeConfig ++ connectionConf).getSparkSession
 
-    val planRun = if (optPlan.isDefined) {
-      optPlan.get
+    val (planRun, resolvedInterface) = if (optPlan.isDefined) {
+      (optPlan.get, interface)
     } else {
       val (parsedPlan, enabledTasks, validations) = PlanParser.getPlanTasksFromYaml(dataCatererConfiguration)
-      new YamlPlanRun(parsedPlan, enabledTasks, validations, dataCatererConfiguration)
+      (new YamlPlanRun(parsedPlan, enabledTasks, validations, dataCatererConfiguration), DATA_CATERER_INTERFACE_YAML)
     }
+    applyPrePlanProcessors(planRun, dataCatererConfiguration, resolvedInterface)
 
     val optPlanWithTasks = new DataSourceMetadataFactory(dataCatererConfiguration).extractAllDataSourceMetadata(planRun)
     val dataGeneratorProcessor = new DataGeneratorProcessor(dataCatererConfiguration)
@@ -80,6 +88,14 @@
       case _ => None
     }
   }
+
+  private def applyPrePlanProcessors(planRun: PlanRun, dataCatererConfiguration: DataCatererConfiguration, interface: String): Unit = {
+    val prePlanProcessors = List(new PlanRunPrePlanProcessor(dataCatererConfiguration))
+
+    prePlanProcessors.foreach(prePlanProcessor => {
+      if (prePlanProcessor.enabled) prePlanProcessor.apply(planRun._plan.copy(interface = Some(interface)), planRun._tasks, planRun._validations)
+    })
+  }
 }
 
 class YamlPlanRun(
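The interface value is resolved once per run: scala is the default, java is chosen when reflection yields a javaapi PlanRun, yaml when execution falls back to YAML plan parsing, and ui is passed in explicitly by PlanRepository (see its diff below). A sketch of how a caller would thread the interface through, where myPlanRun stands in for any existing PlanRun instance:

import io.github.datacatering.datacaterer.api.PlanRun
import io.github.datacatering.datacaterer.api.model.Constants.DATA_CATERER_INTERFACE_UI
import io.github.datacatering.datacaterer.core.plan.PlanProcessor

object RunWithInterfaceExample {
  // Executes the plan and tags the run as having been started from the UI.
  def run(myPlanRun: PlanRun) =
    PlanProcessor.determineAndExecutePlan(Some(myPlanRun), DATA_CATERER_INTERFACE_UI)
}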
ValidationConfigResult} trait PostPlanProcessor { diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PrePlanProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PrePlanProcessor.scala new file mode 100644 index 00000000..68b4e3f0 --- /dev/null +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PrePlanProcessor.scala @@ -0,0 +1,12 @@ +package io.github.datacatering.datacaterer.core.plan + +import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, Task, ValidationConfiguration} + +trait PrePlanProcessor { + + val dataCatererConfiguration: DataCatererConfiguration + val enabled: Boolean + + def apply(plan: Plan, tasks: List[Task], validations: List[ValidationConfiguration]): Unit + +} diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala index 0a23e579..8ea11246 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala @@ -2,11 +2,11 @@ package io.github.datacatering.datacaterer.core.sink import com.google.common.util.concurrent.RateLimiter import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, DRIVER, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, JDBC, OMIT, PARTITIONS, PARTITION_BY, POSTGRES_DRIVER, RATE, ROWS_PER_SECOND, SAVE_MODE, TABLE} -import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, MetadataConfig, Step} +import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, MetadataConfig, SinkResult, Step} +import io.github.datacatering.datacaterer.api.util.ConfigUtil import io.github.datacatering.datacaterer.core.exception.{FailedSaveDataDataFrameV2Exception, FailedSaveDataException} import io.github.datacatering.datacaterer.core.model.Constants.{BATCH, DEFAULT_ROWS_PER_SECOND, FAILED, FINISHED, PER_FIELD_INDEX_FIELD, STARTED} -import io.github.datacatering.datacaterer.core.model.{RealTimeSinkResult, SinkResult} -import io.github.datacatering.datacaterer.core.util.ConfigUtil +import io.github.datacatering.datacaterer.core.model.RealTimeSinkResult import io.github.datacatering.datacaterer.core.util.GeneratorUtil.determineSaveTiming import io.github.datacatering.datacaterer.core.util.MetadataUtil.getFieldMetadata import io.github.datacatering.datacaterer.core.util.ValidationUtil.cleanValidationIdentifier diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkProcessor.scala index 5996b405..092d5a6a 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkProcessor.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkProcessor.scala @@ -5,9 +5,8 @@ import io.github.datacatering.datacaterer.api.model.Step import io.github.datacatering.datacaterer.core.exception.UnsupportedRealTimeDataSourceFormat import io.github.datacatering.datacaterer.core.model.RealTimeSinkResult import io.github.datacatering.datacaterer.core.sink.http.HttpSinkProcessor -import io.github.datacatering.datacaterer.core.sink.http.model.HttpResult import io.github.datacatering.datacaterer.core.sink.jms.JmsSinkProcessor -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType 
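The new PrePlanProcessor trait above mirrors PostPlanProcessor but fires before any data is generated. A minimal sketch of what a custom implementation could look like; the class name, package, and logging behaviour are illustrative only and not part of this patch:

package io.github.datacatering.datacaterer.core.activity

import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, Task, ValidationConfiguration}
import io.github.datacatering.datacaterer.core.plan.PrePlanProcessor
import org.apache.log4j.Logger

// Hypothetical processor that logs a summary of the plan before generation starts
class LoggingPrePlanProcessor(override val dataCatererConfiguration: DataCatererConfiguration) extends PrePlanProcessor {

  private val LOGGER = Logger.getLogger(getClass.getName)

  // Could instead be driven from a flag in dataCatererConfiguration
  override val enabled: Boolean = true

  override def apply(plan: Plan, tasks: List[Task], validations: List[ValidationConfiguration]): Unit = {
    // plan.interface is populated by PlanProcessor via plan.copy(interface = Some(...))
    LOGGER.info(s"Plan about to run, name=${plan.name}, interface=${plan.interface.getOrElse("unknown")}, num-tasks=${tasks.size}")
  }
}

Note that PlanProcessor.applyPrePlanProcessors currently instantiates a fixed list containing only PlanRunPrePlanProcessor, so a processor like this would also need to be registered there before it runs.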
import java.util.concurrent.LinkedBlockingQueue diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala index 64619b19..bd5a71db 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala @@ -1,6 +1,6 @@ package io.github.datacatering.datacaterer.core.ui.plan -import io.github.datacatering.datacaterer.api.model.Constants.{CONFIG_FLAGS_DELETE_GENERATED_RECORDS, CONFIG_FLAGS_GENERATE_DATA, CONFIG_FLAGS_GENERATE_VALIDATIONS, DEFAULT_MASTER, DEFAULT_RUNTIME_CONFIG, FORMAT, METADATA_SOURCE_NAME} +import io.github.datacatering.datacaterer.api.model.Constants.{CONFIG_FLAGS_DELETE_GENERATED_RECORDS, CONFIG_FLAGS_GENERATE_DATA, CONFIG_FLAGS_GENERATE_VALIDATIONS, DATA_CATERER_INTERFACE_UI, DEFAULT_MASTER, DEFAULT_RUNTIME_CONFIG, FORMAT, METADATA_SOURCE_NAME} import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, Task, ValidationConfiguration, YamlUpstreamDataSourceValidation} import io.github.datacatering.datacaterer.api.{DataCatererConfigurationBuilder, ValidationBuilder} import io.github.datacatering.datacaterer.core.exception.SaveFileException @@ -139,7 +139,7 @@ object PlanRepository { case Success(planAsYaml) => updatePlanRunExecution(planRunExecution, PARSED_PLAN) val runPlanFuture = Future { - PlanProcessor.determineAndExecutePlan(Some(planAsYaml)) + PlanProcessor.determineAndExecutePlan(Some(planAsYaml), DATA_CATERER_INTERFACE_UI) } runPlanFuture.onComplete { diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ManagementUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ManagementUtil.scala new file mode 100644 index 00000000..96918122 --- /dev/null +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ManagementUtil.scala @@ -0,0 +1,9 @@ +package io.github.datacatering.datacaterer.core.util + +object ManagementUtil { + + def getDataCatererManagementUrl: String = { + val envVar = System.getenv("DATA_CATERER_MANAGEMENT_URL") + if (envVar == null) "http://localhost:8082" else envVar + } +} diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/MetadataUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/MetadataUtil.scala index 9af11c98..52bad39a 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/MetadataUtil.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/MetadataUtil.scala @@ -2,6 +2,7 @@ package io.github.datacatering.datacaterer.core.util import io.github.datacatering.datacaterer.api.model.Constants._ import io.github.datacatering.datacaterer.api.model.{Field, MetadataConfig, Step} +import io.github.datacatering.datacaterer.api.util.ConfigUtil import io.github.datacatering.datacaterer.core.exception.UnsupportedDataFormatForTrackingException import io.github.datacatering.datacaterer.core.generator.metadata.ExpressionPredictor import io.github.datacatering.datacaterer.core.generator.metadata.datasource.DataSourceMetadata @@ -130,11 +131,11 @@ object MetadataUtil { } private def computeFieldStatistics( - sourceData: DataFrame, - dataSourceReadOptions: Map[String, String], - dataSourceName: String, - dataSourceFormat: String - )(implicit sparkSession: SparkSession): Unit = { + sourceData: DataFrame, + dataSourceReadOptions: Map[String, String], 
+ dataSourceName: String, + dataSourceFormat: String + )(implicit sparkSession: SparkSession): Unit = { //have to create temp view then analyze the field stats which can be found in the cached data sourceData.createOrReplaceTempView(TEMP_CACHED_TABLE_NAME) if (!sparkSession.catalog.isCached(TEMP_CACHED_TABLE_NAME)) sparkSession.catalog.cacheTable(TEMP_CACHED_TABLE_NAME) @@ -152,11 +153,11 @@ object MetadataUtil { } def determineIfOneOfField( - sourceData: DataFrame, - fieldName: String, - statisticsMap: Map[String, String], - metadataConfig: MetadataConfig - ): Option[Array[String]] = { + sourceData: DataFrame, + fieldName: String, + statisticsMap: Map[String, String], + metadataConfig: MetadataConfig + ): Option[Array[String]] = { val fieldDataType = sourceData.schema.fields.find(_.name == fieldName).map(_.dataType) val count = statisticsMap(ROW_COUNT).toLong (fieldDataType, count) match { diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ProtobufUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ProtobufUtil.scala index 1eab20cf..ea1d280e 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ProtobufUtil.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/ProtobufUtil.scala @@ -1,14 +1,10 @@ package io.github.datacatering.datacaterer.core.util -import com.google.protobuf.DescriptorProtos.FieldDescriptorProto import com.google.protobuf.Descriptors.FieldDescriptor -import com.google.protobuf.Descriptors.FieldDescriptor.JavaType import com.google.protobuf.{BoolValue, BytesValue, DescriptorProtos, DoubleValue, FloatValue, Int32Value, Int64Value, StringValue, UInt32Value, UInt64Value, WireFormat} import io.github.datacatering.datacaterer.api.model.{ArrayType, BinaryType, BooleanType, DataType, DecimalType, DoubleType, Field, FloatType, IntegerType, LongType, MapType, StringType, StructType, TimestampType} import io.github.datacatering.datacaterer.core.exception.UnsupportedProtobufType import org.apache.log4j.Logger -import org.apache.spark.sql.protobuf.utils.SchemaConverters -import org.apache.spark.sql.types.{DataTypes, StructField} import java.io.{BufferedInputStream, FileInputStream} import scala.collection.JavaConverters.asScalaBufferConverter diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationOperations.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationOperations.scala index d941e627..a6179987 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationOperations.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationOperations.scala @@ -4,19 +4,20 @@ import io.github.datacatering.datacaterer.api.ValidationBuilder import io.github.datacatering.datacaterer.api.model.Constants.{AGGREGATION_COUNT, FORMAT, VALIDATION_FIELD_NAME_COUNT_BETWEEN, VALIDATION_FIELD_NAME_COUNT_EQUAL, VALIDATION_FIELD_NAME_MATCH_ORDER, VALIDATION_FIELD_NAME_MATCH_SET, VALIDATION_PREFIX_JOIN_EXPRESSION, VALIDATION_UNIQUE} import io.github.datacatering.datacaterer.api.model._ import io.github.datacatering.datacaterer.core.exception.{FailedFieldDataValidationException, UnsupportedDataValidationTypeException} -import io.github.datacatering.datacaterer.core.model.ValidationResult import io.github.datacatering.datacaterer.core.validator.ValidationHelper.getValidationType import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import 
org.apache.spark.sql.functions.{col, expr} -import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import scala.collection.mutable import scala.util.{Failure, Success, Try} abstract class ValidationOps(validation: Validation) { private val LOGGER = Logger.getLogger(getClass.getName) - def validate(df: DataFrame, dfCount: Long): List[ValidationResult] + def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int = 10): List[ValidationResult] def filterData(df: DataFrame): DataFrame = { validation.preFilter.map(preFilter => { @@ -32,13 +33,13 @@ abstract class ValidationOps(validation: Validation) { }).getOrElse(df) } - def validateWithExpression(df: DataFrame, dfCount: Long, expression: String): ValidationResult = { + def validateWithExpression(df: DataFrame, dfCount: Long, expression: String, numErrorSamples: Int): ValidationResult = { val notEqualDf = df.where(s"!($expression)") - val (isSuccess, sampleErrors, numErrors) = getIsSuccessAndSampleErrors(notEqualDf, dfCount) + val (isSuccess, sampleErrors, numErrors) = getIsSuccessAndSampleErrors(notEqualDf, dfCount, numErrorSamples) ValidationResult(validation, isSuccess, numErrors, dfCount, sampleErrors) } - private def getIsSuccessAndSampleErrors(notEqualDf: Dataset[Row], dfCount: Long): (Boolean, Option[DataFrame], Long) = { + private def getIsSuccessAndSampleErrors(notEqualDf: Dataset[Row], dfCount: Long, numErrorSamples: Int): (Boolean, Option[Array[Map[String, Any]]], Long) = { val numErrors = notEqualDf.count() val (isSuccess, sampleErrors) = (numErrors, validation.errorThreshold) match { case (c, Some(threshold)) if c > 0 => @@ -48,7 +49,31 @@ abstract class ValidationOps(validation: Validation) { case (c, None) if c > 0 => (false, Some(notEqualDf)) case _ => (true, None) } - (isSuccess, sampleErrors, numErrors) + val errorsAsMap = sampleErrors.map(ds => { + ds.take(numErrorSamples) + .map(r => { + val valuesMap = r.getValuesMap(ds.columns) + valuesMap.flatMap(parseValueMap) + }) + }) + (isSuccess, errorsAsMap, numErrors) + } + + private def parseValueMap[T](valMap: (String, T)): Map[String, Any] = { + valMap._2 match { + case genericRow: GenericRowWithSchema => + val innerValueMap = genericRow.getValuesMap[T](genericRow.schema.fields.map(_.name)) + Map(valMap._1 -> innerValueMap.flatMap(parseValueMap[T])) + case wrappedArray: mutable.WrappedArray[_] => + Map(valMap._1 -> wrappedArray.map { + case genericRowWithSchema: GenericRowWithSchema => + val innerValueMap = genericRowWithSchema.getValuesMap[T](genericRowWithSchema.schema.fields.map(_.name)) + innerValueMap.flatMap(parseValueMap[T]) + case x => x + }) + case _ => + Map(valMap) + } } } @@ -68,7 +93,7 @@ object ValidationHelper { } class FieldValidationsOps(fieldValidations: FieldValidations) extends ValidationOps(fieldValidations) { - override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = { + override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = { val field = fieldValidations.field val build = ValidationBuilder().field(field) fieldValidations.validation.flatMap(v => { @@ -117,8 +142,8 @@ class FieldValidationsOps(fieldValidations: FieldValidations) extends Validation val validationWithPreFilter = v.preFilter.map(f => validationWithDescription.preFilter(f)).getOrElse(validationWithDescription) val tryRunValidation = Try(validationWithPreFilter.validation match { - case e: ExpressionValidation => new 
ExpressionValidationOps(e).validate(df, dfCount) - case g: GroupByValidation => new GroupByValidationOps(g).validate(df, dfCount) + case e: ExpressionValidation => new ExpressionValidationOps(e).validate(df, dfCount, numErrorSamples) + case g: GroupByValidation => new GroupByValidationOps(g).validate(df, dfCount, numErrorSamples) }) tryRunValidation match { case Success(value) => value @@ -129,15 +154,15 @@ class FieldValidationsOps(fieldValidations: FieldValidations) extends Validation } class ExpressionValidationOps(expressionValidation: ExpressionValidation) extends ValidationOps(expressionValidation) { - override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = { + override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = { //TODO allow for pre-filter? can technically be done via custom sql validation using CASE WHERE ... ELSE true END val dfWithSelectExpr = df.selectExpr(expressionValidation.selectExpr: _*) - List(validateWithExpression(dfWithSelectExpr, dfCount, expressionValidation.expr)) + List(validateWithExpression(dfWithSelectExpr, dfCount, expressionValidation.expr, numErrorSamples)) } } class GroupByValidationOps(groupByValidation: GroupByValidation) extends ValidationOps(groupByValidation) { - override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = { + override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = { //TODO allow for pre and post group filter? val groupByDf = df.groupBy(groupByValidation.groupByFields.map(col): _*) val (aggregateDf, validationCount) = if ((groupByValidation.aggField == VALIDATION_UNIQUE || groupByValidation.aggField.isEmpty) && groupByValidation.aggType == AGGREGATION_COUNT) { @@ -149,7 +174,7 @@ class GroupByValidationOps(groupByValidation: GroupByValidation) extends Validat )) (aggDf, aggDf.count()) } - List(validateWithExpression(aggregateDf, validationCount, groupByValidation.aggExpr)) + List(validateWithExpression(aggregateDf, validationCount, groupByValidation.aggExpr, numErrorSamples)) } } @@ -157,14 +182,14 @@ class UpstreamDataSourceValidationOps( upstreamDataSourceValidation: UpstreamDataSourceValidation, recordTrackingForValidationFolderPath: String ) extends ValidationOps(upstreamDataSourceValidation) { - override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = { + override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = { val upstreamDf = getUpstreamData(df.sparkSession) val joinedDf = getJoinedDf(df, upstreamDf) val joinedCount = joinedDf.count() upstreamDataSourceValidation.validations.flatMap(v => { val baseValidationOp = getValidationType(v.validation, recordTrackingForValidationFolderPath) - val result = baseValidationOp.validate(joinedDf, joinedCount) + val result = baseValidationOp.validate(joinedDf, joinedCount, numErrorSamples) result.map(r => ValidationResult.fromValidationWithBaseResult(upstreamDataSourceValidation, r)) }) } @@ -203,37 +228,34 @@ class FieldNamesValidationOps(fieldNamesValidation: FieldNamesValidation) extend private val LOGGER = Logger.getLogger(getClass.getName) - override def validate(df: DataFrame, dfCount: Long): List[ValidationResult] = { - implicit val stringEncoder: Encoder[CustomErrorSample] = Encoders.kryo[CustomErrorSample] - + override def validate(df: DataFrame, dfCount: Long, numErrorSamples: Int): List[ValidationResult] = { val (isSuccess, errorSamples, total) = fieldNamesValidation.fieldNameType match { 
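        // Each branch below returns (isSuccess, errorSamples, total), where error samples are now
        // plain Scala maps rather than CustomErrorSample rows, so no DataFrame is built for samples.
        // Hypothetical examples of the shapes produced:
        //   VALIDATION_FIELD_NAME_MATCH_ORDER -> Array(Map("field_index_2" -> "created_ts -> updated_ts"))
        //   VALIDATION_FIELD_NAME_MATCH_SET   -> Array(Map("missing_field" -> "account_id"))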
case VALIDATION_FIELD_NAME_COUNT_EQUAL =>
         val isEqualLength = df.columns.length == fieldNamesValidation.count
-        val sample = if (isEqualLength) List() else List(CustomErrorSample(df.columns.length.toString))
+        val sample = if (isEqualLength) Array[Map[String, Any]]() else Array(Map[String, Any]("columnLength" -> df.columns.length.toString))
         (isEqualLength, sample, 1)
       case VALIDATION_FIELD_NAME_COUNT_BETWEEN =>
         val colLength = df.columns.length
         val isBetween = colLength >= fieldNamesValidation.min && colLength <= fieldNamesValidation.max
-        val sample = if (isBetween) List() else List(CustomErrorSample(colLength.toString))
+        val sample = if (isBetween) Array[Map[String, Any]]() else Array(Map[String, Any]("columnLength" -> colLength.toString))
         (isBetween, sample, 1)
       case VALIDATION_FIELD_NAME_MATCH_ORDER =>
         val zippedNames = df.columns.zip(fieldNamesValidation.names).zipWithIndex
         val misalignedNames = zippedNames.filter(n => n._1._1 != n._1._2)
-        (misalignedNames.isEmpty, misalignedNames.map(n => CustomErrorSample(s"${n._2}: ${n._1._1} -> ${n._1._2}")).toList, zippedNames.length)
+        val errorSample = misalignedNames.map(n => Map[String, Any](s"field_index_${n._2}" -> s"${n._1._1} -> ${n._1._2}"))
+        (misalignedNames.isEmpty, errorSample, zippedNames.length)
       case VALIDATION_FIELD_NAME_MATCH_SET =>
-        val missingNames = fieldNamesValidation.names.filter(n => !df.columns.contains(n)).map(CustomErrorSample)
-        (missingNames.isEmpty, missingNames.toList, fieldNamesValidation.names.length)
+        val missingNames = fieldNamesValidation.names.filter(n => !df.columns.contains(n))
+          .map(n => Map[String, Any]("missing_field" -> n))
+        (missingNames.isEmpty, missingNames, fieldNamesValidation.names.length)
       case x =>
         LOGGER.error(s"Unknown field name validation type, returning as a failed validation, type=$x")
-        (false, List(), 1)
+        (false, Array[Map[String, Any]](), 1)
     }
-    val optErrorSample = if (isSuccess) {
-      None
-    } else {
-      Some(df.sparkSession.createDataFrame(errorSamples))
-    }
-    List(ValidationResult(fieldNamesValidation, isSuccess, errorSamples.size, total, optErrorSample))
+    val optErrorSample = if (isSuccess) None else Some(errorSamples)
+    List(ValidationResult(fieldNamesValidation, isSuccess, errorSamples.length, total, optErrorSample))
   }
 }
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala
index 256937ff..f7242e09 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala
@@ -2,9 +2,9 @@ package io.github.datacatering.datacaterer.core.validator

 import io.github.datacatering.datacaterer.api.ValidationBuilder
 import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_ENABLE_VALIDATION, DELTA, DELTA_LAKE_SPARK_CONF, ENABLE_DATA_VALIDATION, FORMAT, HTTP, ICEBERG, ICEBERG_SPARK_CONF, JMS, TABLE, VALIDATION_IDENTIFIER}
-import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, ExpressionValidation, FoldersConfig, GroupByValidation, UpstreamDataSourceValidation, ValidationConfig, ValidationConfiguration}
-import io.github.datacatering.datacaterer.core.model.{DataSourceValidationResult, ValidationConfigResult, ValidationResult}
+import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, DataSourceValidationResult, ExpressionValidation,
FoldersConfig, GroupByValidation, UpstreamDataSourceValidation, ValidationConfig, ValidationConfigResult, ValidationConfiguration, ValidationResult} import io.github.datacatering.datacaterer.core.parser.PlanParser +import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil import io.github.datacatering.datacaterer.core.util.ValidationUtil.cleanValidationIdentifier import io.github.datacatering.datacaterer.core.validator.ValidationHelper.getValidationType import io.github.datacatering.datacaterer.core.validator.ValidationWaitImplicits.ValidationWaitConditionOps @@ -102,10 +102,10 @@ class ValidationProcessor( val preFilterData = validation.filterData(df) if (!preFilterData.storageLevel.useMemory) preFilterData.cache() val count = preFilterData.count() - Try(validation.validate(preFilterData, count)) match { + Try(validation.validate(preFilterData, count, validationConfig.numSampleErrorRecords)) match { case Failure(exception) => LOGGER.error(s"Failed to run data validation, $validationDescription", exception) - List(ValidationResult(validBuilder.validation, false, count, count, Some(sparkSession.createDataFrame(Seq(CustomErrorSample(exception.getLocalizedMessage)))))) + List(ValidationResult(validBuilder.validation, false, count, count, Some(Array(Map("exception" -> exception.getLocalizedMessage))))) case Success(value) => LOGGER.debug(s"Successfully ran data validation, $validationDescription") value @@ -186,7 +186,9 @@ class ValidationProcessor( case UpstreamDataSourceValidation(_, upstreamDataSource, _, _, _) => ("upstreamDataSource", upstreamDataSource.connectionConfigWithTaskBuilder.dataSourceName) case _ => ("Unknown", "") } - val sampleErrors = validationRes.sampleErrorValues.get.take(validationConfig.numSampleErrorRecords).map(_.json).mkString(",") + val sampleErrors = validationRes.sampleErrorValues.getOrElse(Array()) + .map(ObjectMapperUtil.jsonObjectMapper.writeValueAsString) + .mkString(",") LOGGER.error(s"Failed validation: validation-name=${vcr.name}, description=${vcr.description}, data-source-name=${dsr.dataSourceName}, " + s"data-source-options=${dsr.options}, is-success=${validationRes.isSuccess}, validation-type=$validationType, " + s"check=$validationCheck, sample-errors=$sampleErrors") diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationWaitImplicits.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationWaitImplicits.scala index ea50215b..ea79d897 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationWaitImplicits.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationWaitImplicits.scala @@ -2,8 +2,8 @@ package io.github.datacatering.datacaterer.core.validator import io.github.datacatering.datacaterer.api.model.Constants.FORMAT import io.github.datacatering.datacaterer.api.model.{DataExistsWaitCondition, FileExistsWaitCondition, PauseWaitCondition, WaitCondition, WebhookWaitCondition} +import io.github.datacatering.datacaterer.api.util.ConfigUtil import io.github.datacatering.datacaterer.core.exception.InvalidWaitConditionException -import io.github.datacatering.datacaterer.core.util.ConfigUtil import io.github.datacatering.datacaterer.core.util.HttpUtil.getAuthHeader import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.log4j.Logger diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessorTest.scala 
b/app/src/test/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessorTest.scala index 548c3173..c0d5f90a 100644 --- a/app/src/test/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessorTest.scala +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/alert/AlertProcessorTest.scala @@ -1,8 +1,7 @@ package io.github.datacatering.datacaterer.core.alert import io.github.datacatering.datacaterer.api.model.Constants.{ALERT_TRIGGER_ON_ALL, ALERT_TRIGGER_ON_FAILURE, ALERT_TRIGGER_ON_GENERATION_FAILURE, ALERT_TRIGGER_ON_GENERATION_SUCCESS, ALERT_TRIGGER_ON_SUCCESS, ALERT_TRIGGER_ON_VALIDATION_FAILURE, ALERT_TRIGGER_ON_VALIDATION_SUCCESS} -import io.github.datacatering.datacaterer.api.model.{AlertConfig, DataCatererConfiguration} -import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceValidationResult, SinkResult, ValidationConfigResult, ValidationResult} +import io.github.datacatering.datacaterer.api.model.{AlertConfig, DataCatererConfiguration, DataSourceResult, DataSourceValidationResult, SinkResult, ValidationConfigResult, ValidationResult} import org.junit.runner.RunWith import org.scalatest.funsuite.AnyFunSuite import org.scalatestplus.junit.JUnitRunner diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/model/ValidationOperationsTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/model/ValidationOperationsTest.scala index 1e64c780..7ca82083 100644 --- a/app/src/test/scala/io/github/datacatering/datacaterer/core/model/ValidationOperationsTest.scala +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/model/ValidationOperationsTest.scala @@ -235,8 +235,8 @@ class ValidationOperationsTest extends SparkSuite { assertResult(1)(result.size) assert(!result.head.isSuccess) assert(result.head.sampleErrorValues.isDefined) - assertResult(2)(result.head.sampleErrorValues.get.count()) - assertResult(2)(result.head.sampleErrorValues.get.filter(r => r.getAs[Double]("amount") >= 100).count()) + assertResult(2)(result.head.sampleErrorValues.get.length) + assertResult(2)(result.head.sampleErrorValues.get.count(r => r("amount").asInstanceOf[Double] >= 100)) } test("Can get sample rows when validation is not successful by error threshold greater than 1") { @@ -246,8 +246,8 @@ class ValidationOperationsTest extends SparkSuite { assertResult(1)(result.size) assert(!result.head.isSuccess) assert(result.head.sampleErrorValues.isDefined) - assertResult(3)(result.head.sampleErrorValues.get.count()) - assertResult(3)(result.head.sampleErrorValues.get.filter(r => r.getAs[Double]("amount") >= 20).count()) + assertResult(3)(result.head.sampleErrorValues.get.length) + assertResult(3)(result.head.sampleErrorValues.get.count(r => r("amount").asInstanceOf[Double] >= 20)) } test("Can get sample rows when validation is not successful by error threshold less than 1") { @@ -257,8 +257,8 @@ class ValidationOperationsTest extends SparkSuite { assertResult(1)(result.size) assert(!result.head.isSuccess) assert(result.head.sampleErrorValues.isDefined) - assertResult(2)(result.head.sampleErrorValues.get.count()) - assertResult(2)(result.head.sampleErrorValues.get.filter(r => r.getAs[Double]("amount") >= 100).count()) + assertResult(2)(result.head.sampleErrorValues.get.length) + assertResult(2)(result.head.sampleErrorValues.get.count(r => r("amount").asInstanceOf[Double] >= 100)) } test("Can check field names count is equal") { @@ -292,7 +292,7 @@ class ValidationOperationsTest extends SparkSuite { 
assertResult(5)(result.head.total) assertResult(2)(result.head.numErrors) assert(result.head.sampleErrorValues.isDefined) - assertResult(2)(result.head.sampleErrorValues.get.count()) + assertResult(2)(result.head.sampleErrorValues.get.length) } test("Can show error when field name not in set") { @@ -304,6 +304,6 @@ class ValidationOperationsTest extends SparkSuite { assertResult(4)(result.head.total) assertResult(1)(result.head.numErrors) assert(result.head.sampleErrorValues.isDefined) - assertResult(1)(result.head.sampleErrorValues.get.count()) + assertResult(1)(result.head.sampleErrorValues.get.length) } } diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala index 1921d895..2ddc2f32 100644 --- a/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala @@ -1,10 +1,9 @@ package io.github.datacatering.datacaterer.core.validator import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, PATH, TABLE} -import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, FoldersConfig, ValidationConfig, ValidationConfiguration} +import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, FoldersConfig, ValidationConfig, ValidationConfigResult, ValidationConfiguration} import io.github.datacatering.datacaterer.api.{PreFilterBuilder, ValidationBuilder} -import io.github.datacatering.datacaterer.core.model.ValidationConfigResult -import io.github.datacatering.datacaterer.core.util.{ObjectMapperUtil, SparkSuite, Transaction} +import io.github.datacatering.datacaterer.core.util.{SparkSuite, Transaction} import org.junit.runner.RunWith import org.scalatestplus.junit.JUnitRunner diff --git a/gradle.properties b/gradle.properties index b30dcd70..22ef1e4b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ groupId=io.github.data-catering -version=0.13.1 +version=0.13.2 scalaVersion=2.12 scalaSpecificVersion=2.12.19
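Since sample errors now travel as Array[Map[String, Any]] instead of Spark Datasets, they can be rendered to JSON without a SparkSession. A minimal sketch of the kind of formatting ValidationProcessor performs with ObjectMapperUtil, assuming only jackson-module-scala on the classpath; SampleErrorFormatter and its API are hypothetical, not part of this patch:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object SampleErrorFormatter {

  // Stand-in for the project's ObjectMapperUtil.jsonObjectMapper
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Render up to `limit` error samples as a comma-separated JSON string for log messages
  def format(sampleErrorValues: Option[Array[Map[String, Any]]], limit: Int = 10): String =
    sampleErrorValues.getOrElse(Array())
      .take(limit)
      .map(sample => mapper.writeValueAsString(sample))
      .mkString(",")
}

For example, format(Some(Array(Map("amount" -> 150.5)))) would yield {"amount":150.5}, matching the map shape the validators above now produce for failed records.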