Skip to content

Commit

Permalink
Add in Java API for validations, add in matches list field validation…
Browse files Browse the repository at this point in the history
…, add test for reading all validations from YAML, clearer exception message when validating list of validations
  • Loading branch information
pflooky committed Dec 26, 2024
1 parent 09b044c commit 83a898f
Show file tree
Hide file tree
Showing 14 changed files with 385 additions and 48 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ object Constants {
lazy val VALIDATION_BETWEEN = "between"
lazy val VALIDATION_IN = "in"
lazy val VALIDATION_MATCHES = "matches"
lazy val VALIDATION_MATCHES_LIST = "matchesList"
lazy val VALIDATION_STARTS_WITH = "startsWith"
lazy val VALIDATION_ENDS_WITH = "endsWith"
lazy val VALIDATION_SIZE = "size"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ case class FieldValidations(
new Type(value = classOf[BetweenFieldValidation], name = "between"),
new Type(value = classOf[InFieldValidation], name = "in"),
new Type(value = classOf[MatchesFieldValidation], name = "matches"),
new Type(value = classOf[MatchesListFieldValidation], name = "matchesList"),
new Type(value = classOf[StartsWithFieldValidation], name = "startsWith"),
new Type(value = classOf[EndsWithFieldValidation], name = "endsWith"),
new Type(value = classOf[SizeFieldValidation], name = "size"),
Expand Down Expand Up @@ -215,6 +216,10 @@ case class MatchesFieldValidation(regex: String, negate: Boolean = false) extend
override val `type`: String = VALIDATION_MATCHES
}

// Field validation that checks a field's values against a list of regex patterns.
// NOTE(review): based on the builder call `matchesList(regexes, matchAll, negate)`,
// matchAll=true presumably requires every pattern to match while false requires at
// least one, and negate inverts the result — confirm against ValidationBuilder.
case class MatchesListFieldValidation(regexes: List[String], matchAll: Boolean = true, negate: Boolean = false) extends FieldValidation {
// Discriminator used by Jackson polymorphic deserialization ("matchesList" in YAML/JSON).
override val `type`: String = VALIDATION_MATCHES_LIST
}

// Field validation asserting a field's values start with the given prefix;
// negate=true inverts the check (values must NOT start with the prefix).
case class StartsWithFieldValidation(value: String, negate: Boolean = false) extends FieldValidation {
// Discriminator used by Jackson polymorphic deserialization ("startsWith" in YAML/JSON).
override val `type`: String = VALIDATION_STARTS_WITH
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.github.datacatering.datacaterer.core.exception

import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, FOREIGN_KEY_DELIMITER, FOREIGN_KEY_PLAN_FILE_DELIMITER}
import io.github.datacatering.datacaterer.api.model.{Count, Field, Step}
import io.github.datacatering.datacaterer.api.model.{Count, Field, Step, Validation}
import org.apache.spark.sql.types.{DataType, Metadata, StructField}

//files
Expand Down Expand Up @@ -94,6 +94,11 @@ case class DataValidationMissingUpstreamDataSourceException(name: String) extend
s"Failed to find upstream data source configuration, data-source-name=$name"
)

// Thrown when a single field-level validation fails to execute: wraps the underlying
// cause and names the offending field and validation so failures within a list of
// validations can be pinpointed (see FieldValidationsOps, which throws this on Failure).
case class FailedFieldDataValidationException(field: String, validation: Validation, throwable: Throwable) extends RuntimeException(
s"Failed to run data validation, field-name=$field, validation=$validation",
throwable
)

//delete data
case class UnsupportedJdbcDeleteDataType(dataType: DataType, table: String) extends RuntimeException(
s"Unsupported data type for deleting from JDBC data source: type=$dataType, table=$table"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package io.github.datacatering.datacaterer.core.validator

import io.github.datacatering.datacaterer.api.ValidationBuilder
import io.github.datacatering.datacaterer.api.model.Constants.{AGGREGATION_COUNT, FORMAT, VALIDATION_FIELD_NAME_COUNT_BETWEEN, VALIDATION_FIELD_NAME_COUNT_EQUAL, VALIDATION_FIELD_NAME_MATCH_ORDER, VALIDATION_FIELD_NAME_MATCH_SET, VALIDATION_PREFIX_JOIN_EXPRESSION, VALIDATION_UNIQUE}
import io.github.datacatering.datacaterer.api.model.{BetweenFieldValidation, ContainsFieldValidation, DistinctContainsSetFieldValidation, DistinctEqualFieldValidation, DistinctInSetFieldValidation, EndsWithFieldValidation, EqualFieldValidation, ExpressionValidation, FieldNamesValidation, FieldValidations, GreaterThanFieldValidation, GreaterThanSizeFieldValidation, GroupByValidation, HasTypeFieldValidation, HasTypesFieldValidation, InFieldValidation, IsDecreasingFieldValidation, IsIncreasingFieldValidation, IsJsonParsableFieldValidation, LengthBetweenFieldValidation, LengthEqualFieldValidation, LessThanFieldValidation, LessThanSizeFieldValidation, LuhnCheckFieldValidation, MatchDateTimeFormatFieldValidation, MatchJsonSchemaFieldValidation, MatchesFieldValidation, MaxBetweenFieldValidation, MeanBetweenFieldValidation, MedianBetweenFieldValidation, MinBetweenFieldValidation, MostCommonValueInSetFieldValidation, NullFieldValidation, QuantileValuesBetweenFieldValidation, SizeFieldValidation, StartsWithFieldValidation, StdDevBetweenFieldValidation, SumBetweenFieldValidation, UniqueFieldValidation, UniqueValuesProportionBetweenFieldValidation, UpstreamDataSourceValidation, Validation, YamlUpstreamDataSourceValidation}
import io.github.datacatering.datacaterer.core.exception.UnsupportedDataValidationTypeException
import io.github.datacatering.datacaterer.api.model._
import io.github.datacatering.datacaterer.core.exception.{FailedFieldDataValidationException, UnsupportedDataValidationTypeException}
import io.github.datacatering.datacaterer.core.model.ValidationResult
import io.github.datacatering.datacaterer.core.validator.ValidationHelper.getValidationType
import org.apache.log4j.Logger
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession}

import scala.util.{Failure, Success, Try}

abstract class ValidationOps(validation: Validation) {

private val LOGGER = Logger.getLogger(getClass.getName)
Expand Down Expand Up @@ -80,6 +82,7 @@ class FieldValidationsOps(fieldValidations: FieldValidations) extends Validation
case BetweenFieldValidation(min, max, negate) => build.between(min, max, negate)
case InFieldValidation(values, negate) => build.in(values, negate)
case MatchesFieldValidation(regex, negate) => build.matches(regex, negate)
case MatchesListFieldValidation(regexes, matchAll, negate) => build.matchesList(regexes, matchAll, negate)
case StartsWithFieldValidation(value, negate) => build.startsWith(value, negate)
case EndsWithFieldValidation(value, negate) => build.endsWith(value, negate)
case SizeFieldValidation(size, negate) => build.size(size, negate)
Expand Down Expand Up @@ -113,9 +116,13 @@ class FieldValidationsOps(fieldValidations: FieldValidations) extends Validation
val validationWithDescription = v.description.map(d => baseValidation.description(d)).getOrElse(validationWithThreshold)
val validationWithPreFilter = v.preFilter.map(f => validationWithDescription.preFilter(f)).getOrElse(validationWithDescription)

validationWithPreFilter.validation match {
val tryRunValidation = Try(validationWithPreFilter.validation match {
case e: ExpressionValidation => new ExpressionValidationOps(e).validate(df, dfCount)
case g: GroupByValidation => new GroupByValidationOps(g).validate(df, dfCount)
})
tryRunValidation match {
case Success(value) => value
case Failure(exception) => throw FailedFieldDataValidationException(field, validationWithPreFilter.validation, exception)
}
})
}
Expand Down
126 changes: 125 additions & 1 deletion app/src/test/resources/sample/validation/json/json-validation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,128 @@ dataSources:
- options:
path: "/tmp/yaml-validation-json-test"
validations:
- expr: "STARTSWITH(transaction_id, 'txn')"
- expr: "STARTSWITH(transaction_id, 'txn')"
- field: name
validation:
- type: "equal"
value: 2021
negate: true
- type: "null"
negate: true
- type: "contains"
value: "peter"
negate: true
- type: "in"
values: [ "open", "closed" ]
negate: true
- type: "matches"
regex: "ACC[0-9]{8}"
negate: true
- type: "matchesList"
regexes: [ "ACC[0-9]{8}", "ACC[0-9]{10}" ]
matchAll: true
negate: true
- type: "startsWith"
value: "ACC"
negate: true
- type: "endsWith"
value: "ACC"
negate: true
- type: "luhnCheck"
negate: true
- type: "hasType"
value: "string"
negate: true
- type: "hasType"
values: [ "string", "double" ]
negate: true
- type: "distinctInSet"
values: [ "peter", "john" ]
negate: true
- type: "distinctContainsSet"
values: [ "peter", "john" ]
negate: true
- type: "distinctEqual"
values: [ "peter", "john" ]
negate: true
- type: "lengthBetween"
min: 1
max: 10
negate: true
- type: "lengthEqual"
value: 5
negate: true
- type: "isJsonParsable"
negate: true
- type: "matchJsonSchema"
schema: "id STRING, amount DOUBLE"
negate: true
- field: created_date
validation:
- type: "matchDateTimeFormat"
format: "yyyy-MM-dd"
negate: true
- field: links
validation:
- type: "size"
value: 5
negate: true
- type: "lessThanSize"
value: 5
strictly: false
- type: "greaterThanSize"
value: 5
strictly: false
- field: amount
validation:
- type: "lessThan"
value: 100
strictly: false
- type: "greaterThan"
value: 100
strictly: false
- type: "between"
min: 100
max: 200
negate: true
- type: "maxBetween"
min: 1
max: 100
negate: true
- type: "meanBetween"
min: 1
max: 100
negate: true
- type: "medianBetween"
min: 1
max: 100
negate: true
- type: "minBetween"
min: 1
max: 100
negate: true
- type: "stdDevBetween"
min: 1
max: 100
negate: true
- type: "sumBetween"
min: 1
max: 100
negate: true
- type: "isDecreasing"
strictly: false
- type: "isIncreasing"
strictly: false
- type: "mostCommonValueInSet"
values: [ "200.0", "100.0" ]
negate: true
- type: "uniqueValuesProportionBetween"
min: 0.1
max: 0.3
negate: true
- type: "quantileValuesBetween"
quantileRanges:
"0.1":
- 1.0
- 10.0
negate: true
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ dataSources:
max: 10
description: "hello"
errorThreshold: 2
- type: quantileValuesBetween
quantileRanges:
0.1:
- - 1.0
- 10.0
- expr: "amount < 100"
- expr: "year == 2021"
errorThreshold: 0.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.github.datacatering.datacaterer.core.model

import io.github.datacatering.datacaterer.api.ValidationBuilder
import io.github.datacatering.datacaterer.api.model.{DistinctContainsSetFieldValidation, DistinctEqualFieldValidation, DistinctInSetFieldValidation, ExpressionValidation, FieldNamesValidation, FieldValidations, IsDecreasingFieldValidation, IsIncreasingFieldValidation, IsJsonParsableFieldValidation, LengthBetweenFieldValidation, LengthEqualFieldValidation, MatchDateTimeFormatFieldValidation, MatchJsonSchemaFieldValidation, MaxBetweenFieldValidation, MeanBetweenFieldValidation, MedianBetweenFieldValidation, MinBetweenFieldValidation, MostCommonValueInSetFieldValidation, QuantileValuesBetweenFieldValidation, StdDevBetweenFieldValidation, SumBetweenFieldValidation, UniqueValuesProportionBetweenFieldValidation}
import io.github.datacatering.datacaterer.api.model.{DistinctContainsSetFieldValidation, DistinctEqualFieldValidation, DistinctInSetFieldValidation, ExpressionValidation, FieldNamesValidation, FieldValidations, IsDecreasingFieldValidation, IsIncreasingFieldValidation, IsJsonParsableFieldValidation, LengthBetweenFieldValidation, LengthEqualFieldValidation, MatchDateTimeFormatFieldValidation, MatchJsonSchemaFieldValidation, MatchesListFieldValidation, MaxBetweenFieldValidation, MeanBetweenFieldValidation, MedianBetweenFieldValidation, MinBetweenFieldValidation, MostCommonValueInSetFieldValidation, QuantileValuesBetweenFieldValidation, StdDevBetweenFieldValidation, SumBetweenFieldValidation, UniqueValuesProportionBetweenFieldValidation}
import io.github.datacatering.datacaterer.core.util.{SparkSuite, Transaction}
import io.github.datacatering.datacaterer.core.validator.{ExpressionValidationOps, FieldNamesValidationOps, FieldValidationsOps}
import org.junit.runner.RunWith
Expand Down Expand Up @@ -38,6 +38,15 @@ class ValidationOperationsTest extends SparkSuite {
assert(result.head.sampleErrorValues.isEmpty)
}

test("Can define matches list field validation") {
// Single-pattern list: every "name" value in the fixture data should match "peter".
val matchesListValidation = MatchesListFieldValidation(List("peter"))
val validationResults = new FieldValidationsOps(FieldValidations("name", List(matchesListValidation))).validate(df, 4)

// Exactly one validation ran, it passed, and no sample error rows were captured.
assertResult(1)(validationResults.size)
assertResult(true)(validationResults.head.isSuccess)
assertResult(true)(validationResults.head.sampleErrorValues.isEmpty)
}

test("Can define distinct in set field validation") {
val validation = DistinctInSetFieldValidation(List("peter"))
val result = new FieldValidationsOps(FieldValidations("name", List(validation))).validate(df, 4)
Expand Down Expand Up @@ -253,7 +262,7 @@ class ValidationOperationsTest extends SparkSuite {
}

test("Can check field names count is equal") {
val validation = new ValidationBuilder().fieldNames.countEqual(5).validation.asInstanceOf[FieldNamesValidation]
val validation = new ValidationBuilder().fieldNames.countEqual(6).validation.asInstanceOf[FieldNamesValidation]
val result = new FieldNamesValidationOps(validation).validate(df, 4)

assertResult(1)(result.size)
Expand All @@ -264,7 +273,7 @@ class ValidationOperationsTest extends SparkSuite {
}

test("Can check field names count is between") {
val validation = new ValidationBuilder().fieldNames.countBetween(3, 5).validation.asInstanceOf[FieldNamesValidation]
val validation = new ValidationBuilder().fieldNames.countBetween(3, 6).validation.asInstanceOf[FieldNamesValidation]
val result = new FieldNamesValidationOps(validation).validate(df, 4)

assertResult(1)(result.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class PlanProcessorTest extends SparkSuite {
assert(csvData.forall(r => r.getAs[String]("time").substring(0, 10) == r.getAs[String]("date")))
}

ignore("Write YAML for plan") {
test("Write YAML for plan") {
val docPlanRun = new TestValidation()
val planWrite = ObjectMapperUtil.yamlObjectMapper.writeValueAsString(docPlanRun._validations)
println(planWrite)
Expand Down Expand Up @@ -385,6 +385,7 @@ class PlanProcessorTest extends SparkSuite {
.count(count.records(10).recordsPerField(3, "account_id"))
.validations(
validation.field("account_id").isNull(true),
validation.field("amount").quantileValuesBetween(Map(0.1 -> (1.0 -> 10.0))),
validation.groupBy("account_id").count().isEqual(1),
validation.fieldNames.countEqual(3),
validation.fieldNames.countBetween(1, 2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SinkFactoryTest extends SparkSuite {
ignore("Can overwrite existing Iceberg data") {
sparkSession.sql("DELETE FROM iceberg.account.transactions_overwrite").count()
val sinkFactory = new SinkFactory(FlagsConfig(), MetadataConfig(), FoldersConfig())
val options = Map(FORMAT -> ICEBERG, TABLE -> "account.transactions_overwrite", PATH -> "/tmp/iceberg-test")
val options = Map(FORMAT -> ICEBERG, TABLE -> "account.transactions_overwrite", PATH -> "/tmp/iceberg-test-overwrite")
val step = Step(options = options)
val existingDataRes = sinkFactory.pushToSink(df, "iceberg-data-source", step, LocalDateTime.now())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,4 @@ class ForeignKeyUtilTest extends SparkSuite {

case class Account(account_id: String = "acc123", name: String = "peter", open_date: Date = Date.valueOf("2023-01-31"), age: Int = 10, debitCredit: String = "D")

case class Transaction(account_id: String, name: String, transaction_id: String, created_date: Date, amount: Double)
case class Transaction(account_id: String, name: String, transaction_id: String, created_date: Date, amount: Double, links: List[String] = List())
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE
import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, FoldersConfig, ValidationConfig, ValidationConfiguration}
import io.github.datacatering.datacaterer.api.{PreFilterBuilder, ValidationBuilder}
import io.github.datacatering.datacaterer.core.model.ValidationConfigResult
import io.github.datacatering.datacaterer.core.util.{SparkSuite, Transaction}
import io.github.datacatering.datacaterer.core.util.{ObjectMapperUtil, SparkSuite, Transaction}
import org.junit.runner.RunWith
import org.scalatestplus.junit.JUnitRunner

Expand Down Expand Up @@ -81,8 +81,7 @@ class ValidationProcessorTest extends SparkSuite {
ValidationConfig(),
FoldersConfig(validationFolderPath = "src/test/resources/sample/validation/json")
)
val result = validationProcessor.executeValidations
validateResult(result)
validationProcessor.executeValidations
}

private def setupValidationProcessor(connectingConfig: Map[String, String]): ValidationProcessor = {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
groupId=io.github.data-catering
version=0.13.0
version=0.13.1

scalaVersion=2.12
scalaSpecificVersion=2.12.19
Expand Down
7 changes: 7 additions & 0 deletions misc/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## 0.13.1

- Clearer exception message indicating which validation failed to run when executing a list of validations
- Add test for all field validations being read from YAML
- Create ValidationBuilders for Java API
- Add in missing Matches List field validation

## 0.13.0

Following major changes to API were made:
Expand Down

0 comments on commit 83a898f

Please sign in to comment.