Add in plan results, clean up imports
pflooky committed Dec 26, 2024
1 parent 2adfc7b commit 4f8395a
Showing 43 changed files with 455 additions and 308 deletions.
@@ -1,9 +1,9 @@
package io.github.datacatering.datacaterer.api

-import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
-import io.github.datacatering.datacaterer.api.model.Constants._
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.connection.{CassandraBuilder, ConnectionTaskBuilder, FileBuilder, HttpBuilder, KafkaBuilder, MySqlBuilder, NoopBuilder, PostgresBuilder, SolaceBuilder}
+import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
+import io.github.datacatering.datacaterer.api.model.Constants._
import io.github.datacatering.datacaterer.api.model.DataCatererConfiguration

import scala.annotation.varargs
@@ -1,8 +1,8 @@
package io.github.datacatering.datacaterer.api

+import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY_ID, CONFLUENT_SCHEMA_REGISTRY_SUBJECT, CONFLUENT_SCHEMA_REGISTRY_VERSION, DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
-import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.model.{ConfluentSchemaRegistrySource, DataContractCliSource, GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}

case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadataSource()) {
@@ -2,9 +2,9 @@ package io.github.datacatering.datacaterer.api

import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api
+import io.github.datacatering.datacaterer.api.connection.ConnectionTaskBuilder
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaList
import io.github.datacatering.datacaterer.api.model.Constants.METADATA_SOURCE_TYPE
-import io.github.datacatering.datacaterer.api.connection.ConnectionTaskBuilder
import io.github.datacatering.datacaterer.api.model.{ForeignKeyRelation, Plan}

import scala.annotation.varargs
@@ -3,7 +3,7 @@ package io.github.datacatering.datacaterer.api
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
import io.github.datacatering.datacaterer.api.model.Constants._
-import io.github.datacatering.datacaterer.api.model.{Count, DataType, Field, PerFieldCount, Step, StringType, Task, TaskSummary}
+import io.github.datacatering.datacaterer.api.model.{Count, DataType, Field, PerFieldCount, Step, Task, TaskSummary}

import scala.annotation.varargs

@@ -1,6 +1,6 @@
package io.github.datacatering.datacaterer.api.model

-import Constants._
+import io.github.datacatering.datacaterer.api.model.Constants._

case class FlagsConfig(
enableCount: Boolean = DEFAULT_ENABLE_COUNT,
@@ -14,7 +14,8 @@ case class FlagsConfig(
enableSaveReports: Boolean = DEFAULT_ENABLE_SAVE_REPORTS,
enableValidation: Boolean = DEFAULT_ENABLE_VALIDATION,
enableGenerateValidations: Boolean = DEFAULT_ENABLE_SUGGEST_VALIDATIONS,
-enableAlerts: Boolean = DEFAULT_ENABLE_ALERTS
+enableAlerts: Boolean = DEFAULT_ENABLE_ALERTS,
+enableTrackActivity: Boolean = DEFAULT_ENABLE_TRACK_ACTIVITY
)

case class FoldersConfig(
@@ -189,6 +189,7 @@ object Constants {
lazy val DEFAULT_ENABLE_VALIDATION = true
lazy val DEFAULT_ENABLE_SUGGEST_VALIDATIONS = false
lazy val DEFAULT_ENABLE_ALERTS = true
+lazy val DEFAULT_ENABLE_TRACK_ACTIVITY = true

//folders defaults
lazy val DEFAULT_PLAN_FILE_PATH = "/opt/app/plan/customer-create-plan.yaml"
@@ -552,4 +553,10 @@
lazy val PLAN_RUN_EXECUTION_DELIMITER_REGEX = "\\|\\|"
lazy val PLAN_RUN_SUMMARY_DELIMITER = "&&"

+//source of plan run
+lazy val DATA_CATERER_INTERFACE_JAVA = "java"
+lazy val DATA_CATERER_INTERFACE_SCALA = "scala"
+lazy val DATA_CATERER_INTERFACE_UI = "ui"
+lazy val DATA_CATERER_INTERFACE_YAML = "yaml"

}
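These interface constants line up with the new optional interface field added to Plan further down in this commit. A minimal sketch of how a caller might tag a plan with the interface that produced it, assuming only that Plan remains a plain case class (the commit does not show where the constants are consumed):

import io.github.datacatering.datacaterer.api.model.Constants.DATA_CATERER_INTERFACE_SCALA
import io.github.datacatering.datacaterer.api.model.Plan

// Hypothetical helper: record that a plan run was triggered via the Scala API.
// copy leaves every other field (tasks, sinkOptions, validations, runId) untouched.
def tagInterface(plan: Plan): Plan =
  plan.copy(interface = Some(DATA_CATERER_INTERFACE_SCALA))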
@@ -1,6 +1,6 @@
package io.github.datacatering.datacaterer.api.model

-import Constants.{CONFLUENT_SCHEMA_REGISTRY, DATA_CONTRACT_CLI, GREAT_EXPECTATIONS, MARQUEZ, METADATA_SOURCE_HAS_OPEN_LINEAGE_SUPPORT, METADATA_SOURCE_TYPE, OPEN_API, OPEN_DATA_CONTRACT_STANDARD, OPEN_METADATA}
+import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY, DATA_CONTRACT_CLI, GREAT_EXPECTATIONS, MARQUEZ, METADATA_SOURCE_HAS_OPEN_LINEAGE_SUPPORT, METADATA_SOURCE_TYPE, OPEN_API, OPEN_DATA_CONTRACT_STANDARD, OPEN_METADATA}

trait MetadataSource {

@@ -1,7 +1,7 @@
package io.github.datacatering.datacaterer.api.model

import com.fasterxml.jackson.databind.annotation.JsonDeserialize
-import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD_NAME, DEFAULT_FIELD_NULLABLE, DEFAULT_FIELD_TYPE, DEFAULT_GENERATOR_TYPE, DEFAULT_PER_FIELD_COUNT_RECORDS, DEFAULT_STEP_ENABLED, DEFAULT_STEP_NAME, DEFAULT_STEP_TYPE, DEFAULT_TASK_NAME, DEFAULT_TASK_SUMMARY_ENABLE, FOREIGN_KEY_DELIMITER}
+import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD_NAME, DEFAULT_FIELD_NULLABLE, DEFAULT_FIELD_TYPE, DEFAULT_PER_FIELD_COUNT_RECORDS, DEFAULT_STEP_ENABLED, DEFAULT_STEP_NAME, DEFAULT_STEP_TYPE, DEFAULT_TASK_NAME, DEFAULT_TASK_SUMMARY_ENABLE, FOREIGN_KEY_DELIMITER}

import scala.language.implicitConversions

@@ -11,7 +11,8 @@ case class Plan(
tasks: List[TaskSummary] = List(),
sinkOptions: Option[SinkOptions] = None,
validations: List[String] = List(),
-runId: Option[String] = None
+runId: Option[String] = None,
+interface: Option[String] = None
)

case class SinkOptions(
@@ -0,0 +1,162 @@
package io.github.datacatering.datacaterer.api.model

import io.github.datacatering.datacaterer.api.model.Constants.DEFAULT_DATA_SOURCE_NAME
import io.github.datacatering.datacaterer.api.util.ConfigUtil.cleanseOptions
import io.github.datacatering.datacaterer.api.util.ResultWriterUtil.getSuccessSymbol

import java.time.{Duration, LocalDateTime}
import scala.math.BigDecimal.RoundingMode


case class DataSourceResultSummary(
  name: String,
  numRecords: Long,
  isSuccess: Boolean,
  dataSourceResults: List[DataSourceResult]
)

case class DataSourceResult(
  name: String = DEFAULT_DATA_SOURCE_NAME,
  task: Task = Task(),
  step: Step = Step(),
  sinkResult: SinkResult = SinkResult(),
  batchNum: Int = 0
) {

  def summarise: List[String] = {
    val format = sinkResult.format
    val isSuccess = getSuccessSymbol(sinkResult.isSuccess)
    val numRecords = sinkResult.count.toString
    List(name, format, isSuccess, numRecords)
  }

  def jsonSummary: Map[String, Any] = {
    Map(
      "name" -> name,
      "options" -> step.options,
      "isSuccess" -> sinkResult.isSuccess,
      "numRecords" -> sinkResult.count
    )
  }
}

case class TaskResultSummary(
  task: Task,
  numRecords: Long,
  isSuccess: Boolean,
  stepResults: List[StepResultSummary]
)

case class StepResultSummary(
  step: Step,
  numRecords: Long,
  isSuccess: Boolean,
  dataSourceResults: List[DataSourceResult]
)

case class SinkResult(
  name: String = DEFAULT_DATA_SOURCE_NAME,
  format: String = "json",
  saveMode: String = "append",
  options: Map[String, String] = Map(),
  count: Long = -1,
  isSuccess: Boolean = true,
  sample: Array[String] = Array(),
  startTime: LocalDateTime = LocalDateTime.now(),
  endTime: LocalDateTime = LocalDateTime.now(),
  generatedMetadata: Array[Field] = Array(),
  exception: Option[Throwable] = None
) {

  def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
}


case class ValidationConfigResult(
  name: String = "default_validation_result",
  description: String = "Validation result for data sources",
  dataSourceValidationResults: List[DataSourceValidationResult] = List(),
  startTime: LocalDateTime = LocalDateTime.now(),
  endTime: LocalDateTime = LocalDateTime.now()
) {
  def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds

  def summarise: List[String] = {
    val validationRes = dataSourceValidationResults.flatMap(_.validationResults)
    if (validationRes.nonEmpty) {
      val (numSuccess, successRate, isSuccess) = baseSummary(validationRes)
      val successRateVisual = s"$numSuccess/${validationRes.size} ($successRate%)"
      List(name, description, getSuccessSymbol(isSuccess), successRateVisual)
    } else List()
  }

  def jsonSummary: Map[String, Any] = {
    val validationRes = dataSourceValidationResults.flatMap(dsv =>
      dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v))
    )
    if (validationRes.nonEmpty) {
      val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3))
      val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => {
        val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap
        Map(
          "dataSourceName" -> res._1,
          "options" -> cleanseOptions(res._2),
          "validation" -> validationDetails,
          "numErrors" -> res._3.numErrors,
          "sampleErrorValues" -> res._3.sampleErrorValues.getOrElse(Array())
        )
      })
      val baseValidationMap = Map(
        "name" -> name,
        "description" -> description,
        "isSuccess" -> isSuccess,
        "numSuccess" -> numSuccess,
        "numValidations" -> validationRes.size,
        "successRate" -> successRate
      )
      if (errorMap.nonEmpty) {
        baseValidationMap ++ Map("errorValidations" -> errorMap)
      } else baseValidationMap
    } else Map()
  }

  private def baseSummary(validationRes: List[ValidationResult]): (Int, BigDecimal, Boolean) = {
    val validationSuccess = validationRes.map(_.isSuccess)
    val numSuccess = validationSuccess.count(x => x)
    val successRate = BigDecimal(numSuccess.toDouble / validationRes.size * 100).setScale(2, RoundingMode.HALF_UP)
    val isSuccess = validationSuccess.forall(x => x)
    (numSuccess, successRate, isSuccess)
  }
}

case class DataSourceValidationResult(
  dataSourceName: String = "default_data_source",
  options: Map[String, String] = Map(),
  validationResults: List[ValidationResult] = List()
)

case class ValidationResult(
  validation: Validation = ExpressionValidation(),
  isSuccess: Boolean = true,
  numErrors: Long = 0,
  total: Long = 0,
  sampleErrorValues: Option[Array[Map[String, Any]]] = None
)

object ValidationResult {
  def fromValidationWithBaseResult(validation: Validation, validationResult: ValidationResult): ValidationResult = {
    ValidationResult(validation, validationResult.isSuccess, validationResult.numErrors, validationResult.total, validationResult.sampleErrorValues)
  }
}

case class PlanResults(
  plan: Plan,
  generationResult: List[DataSourceResult],
  validationResults: List[ValidationConfigResult]
)

case class PlanRunSummary(
  plan: Plan,
  tasks: List[Task],
  validations: List[ValidationConfiguration]
)
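As a rough illustration of how the new result models compose (this example is not part of the commit), the sketch below builds a ValidationConfigResult with one passing and one failing validation and reads back its summaries; the success symbol itself comes from ResultWriterUtil.getSuccessSymbol, which sits outside this diff:

import io.github.datacatering.datacaterer.api.model.{DataSourceValidationResult, ValidationConfigResult, ValidationResult}

// One data source with two validations: one passing, one failing with 3 errors out of 10 records.
val validationSummary = ValidationConfigResult(
  name = "account_checks",
  dataSourceValidationResults = List(
    DataSourceValidationResult(
      dataSourceName = "accounts_json",
      validationResults = List(
        ValidationResult(isSuccess = true, total = 10),
        ValidationResult(isSuccess = false, numErrors = 3, total = 10)
      )
    )
  )
)

// summarise gives List(name, description, success symbol, "1/2 (50.00%)");
// jsonSummary adds numSuccess, successRate and an errorValidations entry for the failure.
val tableRow = validationSummary.summarise
val jsonMap = validationSummary.jsonSummary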
@@ -1,4 +1,4 @@
-package io.github.datacatering.datacaterer.core.util
+package io.github.datacatering.datacaterer.api.util

object ConfigUtil {

@@ -1,4 +1,4 @@
-package io.github.datacatering.datacaterer.core.util
+package io.github.datacatering.datacaterer.api.util

object ResultWriterUtil {

@@ -0,0 +1,46 @@
package io.github.datacatering.datacaterer.core.activity

import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, PlanResults, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
import io.github.datacatering.datacaterer.core.util.ManagementUtil.getDataCatererManagementUrl
import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
import org.apache.log4j.Logger
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.Dsl.asyncHttpClient

import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

class PlanRunPostPlanProcessor(val dataCatererConfiguration: DataCatererConfiguration) extends PostPlanProcessor {

  override val enabled: Boolean = dataCatererConfiguration.flagsConfig.enableTrackActivity

  private val LOGGER = Logger.getLogger(getClass.getName)
  private val dataCatererManagementUrl = getDataCatererManagementUrl
  private val http: AsyncHttpClient = asyncHttpClient

  override def apply(
    plan: Plan,
    sparkRecordListener: SparkRecordListener,
    generationResult: List[DataSourceResult],
    validationResults: List[ValidationConfigResult]
  ): Unit = {
    val planResults = PlanResults(plan, generationResult, validationResults)
    val jsonBody = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(planResults)
    val url = s"$dataCatererManagementUrl/plan/finish"
    val prepareRequest = http.prepare("POST", url)
      .setBody(jsonBody)

    val futureResp = prepareRequest.execute().toCompletableFuture.toScala
    futureResp.onComplete {
      case Success(_) =>
        LOGGER.debug(s"Successfully posted run results, url=$url")
        http.close()
      case Failure(exception) =>
        LOGGER.debug(s"Failed to post run results, url=$url", exception)
        http.close()
    }
  }
}
@@ -0,0 +1,44 @@
package io.github.datacatering.datacaterer.core.activity

import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, PlanRunSummary, Task, ValidationConfiguration}
import io.github.datacatering.datacaterer.core.plan.PrePlanProcessor
import io.github.datacatering.datacaterer.core.util.ManagementUtil.getDataCatererManagementUrl
import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
import org.apache.log4j.Logger
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.Dsl.asyncHttpClient

import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

class PlanRunPrePlanProcessor(val dataCatererConfiguration: DataCatererConfiguration) extends PrePlanProcessor {

  override val enabled: Boolean = dataCatererConfiguration.flagsConfig.enableTrackActivity

  private val LOGGER = Logger.getLogger(getClass.getName)
  private val dataCatererManagementUrl = getDataCatererManagementUrl
  private val http: AsyncHttpClient = asyncHttpClient

  override def apply(
    plan: Plan,
    tasks: List[Task],
    validations: List[ValidationConfiguration]
  ): Unit = {
    val planRunSummary = PlanRunSummary(plan, tasks, validations)
    val jsonBody = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(planRunSummary)
    val url = s"$dataCatererManagementUrl/plan/start"
    val prepareRequest = http.prepare("POST", url)
      .setBody(jsonBody)

    val futureResp = prepareRequest.execute().toCompletableFuture.toScala
    futureResp.onComplete {
      case Success(_) =>
        LOGGER.debug(s"Successfully posted run start, url=$url")
        http.close()
      case Failure(exception) =>
        LOGGER.debug(s"Failed to post run start, url=$url", exception)
        http.close()
    }
  }
}
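Both new activity processors derive their enabled value from the enableTrackActivity flag introduced above, so switching the /plan/start and /plan/finish calls off is a configuration change. A minimal sketch, assuming DataCatererConfiguration is a case class whose other fields are defaulted and whose flags sit under a flagsConfig parameter (consistent with the dataCatererConfiguration.flagsConfig accessor used in both processors):

import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, FlagsConfig}
import io.github.datacatering.datacaterer.core.activity.PlanRunPostPlanProcessor

// With tracking disabled, both processors report enabled = false,
// which the plan run can use to skip the HTTP calls entirely.
val quietConfig = DataCatererConfiguration(flagsConfig = FlagsConfig(enableTrackActivity = false))
val postProcessor = new PlanRunPostPlanProcessor(quietConfig)
assert(!postProcessor.enabled)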
@@ -1,9 +1,8 @@
package io.github.datacatering.datacaterer.core.alert

import io.github.datacatering.datacaterer.api.model.Constants.{ALERT_TRIGGER_ON_ALL, ALERT_TRIGGER_ON_FAILURE, ALERT_TRIGGER_ON_GENERATION_FAILURE, ALERT_TRIGGER_ON_GENERATION_SUCCESS, ALERT_TRIGGER_ON_SUCCESS, ALERT_TRIGGER_ON_VALIDATION_FAILURE, ALERT_TRIGGER_ON_VALIDATION_SUCCESS}
-import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan}
+import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, DataSourceResult, Plan, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
import org.apache.log4j.Logger

@@ -3,9 +3,8 @@ package io.github.datacatering.datacaterer.core.alert
import com.slack.api.Slack
import com.slack.api.methods.MethodsClient
import com.slack.api.methods.request.chat.ChatPostMessageRequest
-import io.github.datacatering.datacaterer.api.model.SlackAlertConfig
+import io.github.datacatering.datacaterer.api.model.{DataSourceResult, SlackAlertConfig, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.model.Constants.REPORT_HOME_HTML
-import io.github.datacatering.datacaterer.core.model.{DataSourceResult, ValidationConfigResult}
import org.apache.log4j.Logger

import scala.util.{Failure, Success, Try}