Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in data validation for HTTP requests and responses #83

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.github.datacatering.datacaterer.api

import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.ValidationHelper.cleanColumnName
import io.github.datacatering.datacaterer.api.connection.{ConnectionTaskBuilder, FileBuilder}
import io.github.datacatering.datacaterer.api.model.ConditionType.ConditionType
import io.github.datacatering.datacaterer.api.model.Constants.{AGGREGATION_AVG, AGGREGATION_COUNT, AGGREGATION_MAX, AGGREGATION_MIN, AGGREGATION_STDDEV, AGGREGATION_SUM, DEFAULT_VALIDATION_JOIN_TYPE, DEFAULT_VALIDATION_WEBHOOK_HTTP_DATA_SOURCE_NAME, VALIDATION_COLUMN_NAME_COUNT_BETWEEN, VALIDATION_COLUMN_NAME_COUNT_EQUAL, VALIDATION_COLUMN_NAME_MATCH_ORDER, VALIDATION_COLUMN_NAME_MATCH_SET, VALIDATION_PREFIX_JOIN_EXPRESSION, VALIDATION_UNIQUE}
Expand Down Expand Up @@ -164,7 +165,7 @@ case class ValidationBuilder(validation: Validation = ExpressionValidation(), op
* @return ColumnValidationBuilder
*/
def col(column: String): ColumnValidationBuilder = {
ColumnValidationBuilder(this, column)
ColumnValidationBuilder(this, cleanColumnName(column))
}

/**
Expand Down Expand Up @@ -932,4 +933,8 @@ case class CombinationPreFilterBuilder(
}.mkString(" ")
)
}
}
}

/**
 * Helpers for normalising column references before they are used in validations.
 */
object ValidationHelper {

  // Regex matching a dot that sits OUTSIDE a backtick-quoted section: the lookahead
  // only succeeds when an even number of backticks remains after the dot. For input
  // without any backticks this matches every dot, i.e. it behaves like "\\.".
  private val unquotedDot: String = """\.(?=(?:[^`]*`[^`]*`)*[^`]*$)"""

  /**
   * Quote each dot-separated part of a column reference with backticks so that
   * nested fields can be referenced safely, e.g. `customer.name` becomes
   * `` `customer`.`name` ``.
   *
   * Parts that are already wrapped in backticks are left untouched, and dots inside
   * a backtick-quoted part are not treated as separators. This keeps the function
   * idempotent and lets callers pass a pre-quoted name such as `` `my.col` ``.
   *
   * As with `String.split`, trailing empty parts are dropped; an empty input
   * yields a pair of backticks.
   *
   * @param column column reference, optionally nested via dots and optionally pre-quoted
   * @return the column reference with every part wrapped in backticks
   */
  def cleanColumnName(column: String): String =
    column
      .split(unquotedDot)
      .map(part => if (isQuoted(part)) part else s"`$part`")
      .mkString(".")

  // True when the part is already fully wrapped in backticks (at least the two quote characters).
  private def isQuoted(part: String): Boolean =
    part.length >= 2 && part.startsWith("`") && part.endsWith("`")
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ object Constants {
lazy val STATIC = "static"
lazy val CLUSTERING_POSITION = "clusteringPos"
lazy val METADATA_IDENTIFIER = "metadataIdentifier"
lazy val VALIDATION_IDENTIFIER = "validationIdentifier"
lazy val FIELD_LABEL = "label"
lazy val IS_PII = "isPII"
lazy val HTTP_PARAMETER_TYPE = "httpParamType"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD
import scala.language.implicitConversions

case class Plan(
name: String = "Default plan",
name: String = "default_plan",
description: String = "Data generation plan",
tasks: List[TaskSummary] = List(),
sinkOptions: Option[SinkOptions] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
test("Can create basic configuration with defaults") {
val result = DataCatererConfigurationBuilder().build

assert(result.flagsConfig == FlagsConfig())
assert(result.foldersConfig == FoldersConfig())
assert(result.metadataConfig == MetadataConfig())
assert(result.generationConfig == GenerationConfig())
assertResult(FlagsConfig())(result.flagsConfig)
assertResult(FoldersConfig())(result.foldersConfig)
assertResult(MetadataConfig())(result.metadataConfig)
assertResult(GenerationConfig())(result.generationConfig)
assert(result.connectionConfigByName.isEmpty)
assert(result.runtimeConfig.size == 16)
assert(result.master == "local[*]")
assertResult(16)(result.runtimeConfig.size)
assertResult("local[*]")(result.master)
}

test("Can create postgres connection configuration") {
Expand All @@ -27,14 +27,14 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_postgres"))
val config = result("my_postgres")
assert(config("url") == DEFAULT_POSTGRES_URL)
assert(config("user") == DEFAULT_POSTGRES_USERNAME)
assert(config("password") == DEFAULT_POSTGRES_PASSWORD)
assert(config("format") == "jdbc")
assert(config("driver") == "org.postgresql.Driver")
assertResult(DEFAULT_POSTGRES_URL)(config("url"))
assertResult(DEFAULT_POSTGRES_USERNAME)(config("user"))
assertResult(DEFAULT_POSTGRES_PASSWORD)(config("password"))
assertResult("jdbc")(config("format"))
assertResult("org.postgresql.Driver")(config("driver"))
}

test("Can create postgres connection with custom configuration") {
Expand All @@ -43,12 +43,12 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_postgres"))
val config = result("my_postgres")
assert(config.size == 6)
assert(config("url") == "jdbc:postgresql://localhost:5432/customer")
assert(config("stringtype") == "undefined")
assertResult(6)(config.size)
assertResult("jdbc:postgresql://localhost:5432/customer")(config("url"))
assertResult("undefined")(config("stringtype"))
}

test("Can create mysql connection configuration") {
Expand All @@ -57,14 +57,14 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_mysql"))
val config = result("my_mysql")
assert(config("url") == DEFAULT_MYSQL_URL)
assert(config("user") == DEFAULT_MYSQL_USERNAME)
assert(config("password") == DEFAULT_MYSQL_PASSWORD)
assert(config("format") == "jdbc")
assert(config("driver") == "com.mysql.cj.jdbc.Driver")
assertResult(DEFAULT_MYSQL_URL)(config("url"))
assertResult(DEFAULT_MYSQL_USERNAME)(config("user"))
assertResult(DEFAULT_MYSQL_PASSWORD)(config("password"))
assertResult("jdbc")(config("format"))
assertResult("com.mysql.cj.jdbc.Driver")(config("driver"))
}

test("Can create cassandra connection configuration") {
Expand All @@ -73,14 +73,14 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_cassandra"))
val config = result("my_cassandra")
assert(config("spark.cassandra.connection.host") == "cassandraserver")
assert(config("spark.cassandra.connection.port") == "9042")
assert(config("spark.cassandra.auth.username") == DEFAULT_CASSANDRA_USERNAME)
assert(config("spark.cassandra.auth.password") == DEFAULT_CASSANDRA_PASSWORD)
assert(config("format") == "org.apache.spark.sql.cassandra")
assertResult("cassandraserver")(config("spark.cassandra.connection.host"))
assertResult("9042")(config("spark.cassandra.connection.port"))
assertResult(DEFAULT_CASSANDRA_USERNAME)(config("spark.cassandra.auth.username"))
assertResult(DEFAULT_CASSANDRA_PASSWORD)(config("spark.cassandra.auth.password"))
assertResult("org.apache.spark.sql.cassandra")(config("format"))
}

test("Can create solace connection configuration") {
Expand All @@ -89,16 +89,16 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_solace"))
val config = result("my_solace")
assert(config("url") == DEFAULT_SOLACE_URL)
assert(config("user") == DEFAULT_SOLACE_USERNAME)
assert(config("password") == DEFAULT_SOLACE_PASSWORD)
assert(config("format") == "jms")
assert(config("vpnName") == DEFAULT_SOLACE_VPN_NAME)
assert(config("connectionFactory") == DEFAULT_SOLACE_CONNECTION_FACTORY)
assert(config("initialContextFactory") == DEFAULT_SOLACE_INITIAL_CONTEXT_FACTORY)
assertResult(DEFAULT_SOLACE_URL)(config("url"))
assertResult(DEFAULT_SOLACE_USERNAME)(config("user"))
assertResult(DEFAULT_SOLACE_PASSWORD)(config("password"))
assertResult("jms")(config("format"))
assertResult(DEFAULT_SOLACE_VPN_NAME)(config("vpnName"))
assertResult(DEFAULT_SOLACE_CONNECTION_FACTORY)(config("connectionFactory"))
assertResult(DEFAULT_SOLACE_INITIAL_CONTEXT_FACTORY)(config("initialContextFactory"))
}

test("Can create kafka connection configuration") {
Expand All @@ -107,11 +107,11 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_kafka"))
val config = result("my_kafka")
assert(config("kafka.bootstrap.servers") == DEFAULT_KAFKA_URL)
assert(config("format") == "kafka")
assertResult(DEFAULT_KAFKA_URL)(config("kafka.bootstrap.servers"))
assertResult("kafka")(config("format"))
}

test("Can create http connection configuration") {
Expand All @@ -120,11 +120,11 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_http"))
val config = result("my_http")
assert(config("user") == "user")
assert(config("password") == "pw")
assertResult("user")(config("user"))
assertResult("pw")(config("password"))
}

test("Can enable/disable flags") {
Expand Down Expand Up @@ -165,12 +165,12 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.foldersConfig

assert(result.planFilePath == "/my_plan")
assert(result.taskFolderPath == "/my_task")
assert(result.recordTrackingFolderPath == "/my_record_tracking")
assert(result.validationFolderPath == "/my_validation")
assert(result.generatedReportsFolderPath == "/my_generation_results")
assert(result.generatedPlanAndTaskFolderPath == "/my_generated_plan_tasks")
assertResult("/my_plan")(result.planFilePath)
assertResult("/my_task")(result.taskFolderPath)
assertResult("/my_record_tracking")(result.recordTrackingFolderPath)
assertResult("/my_validation")(result.validationFolderPath)
assertResult("/my_generation_results")(result.generatedReportsFolderPath)
assertResult("/my_generated_plan_tasks")(result.generatedPlanAndTaskFolderPath)
}

test("Can alter metadata configurations") {
Expand All @@ -183,11 +183,11 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.metadataConfig

assert(result.numRecordsFromDataSource == 1)
assert(result.numRecordsForAnalysis == 2)
assert(result.numGeneratedSamples == 3)
assert(result.oneOfMinCount == 100)
assert(result.oneOfDistinctCountVsCountThreshold == 0.3)
assertResult(1)(result.numRecordsFromDataSource)
assertResult(2)(result.numRecordsForAnalysis)
assertResult(3)(result.numGeneratedSamples)
assertResult(100)(result.oneOfMinCount)
assertResult(0.3)(result.oneOfDistinctCountVsCountThreshold)
}

test("Can alter generation configurations") {
Expand All @@ -197,7 +197,7 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.generationConfig

assert(result.numRecordsPerBatch == 100)
assertResult(100)(result.numRecordsPerBatch)
assert(result.numRecordsPerStep.contains(10))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ class MetadataSourceBuilderTest extends AnyFunSuite {
val result = MetadataSourceBuilder().openApi("localhost:8080").metadataSource

assert(result.isInstanceOf[OpenAPISource])
assert(result.asInstanceOf[OpenAPISource].connectionOptions == Map(SCHEMA_LOCATION -> "localhost:8080"))
assertResult(Map(SCHEMA_LOCATION -> "localhost:8080"))(result.asInstanceOf[OpenAPISource].connectionOptions)
}

test("Can create Great Expectations metadata source") {
val result = MetadataSourceBuilder().greatExpectations("/tmp/expectations").metadataSource

assert(result.isInstanceOf[GreatExpectationsSource])
assert(result.asInstanceOf[GreatExpectationsSource].connectionOptions == Map(GREAT_EXPECTATIONS_FILE -> "/tmp/expectations"))
assertResult(Map(GREAT_EXPECTATIONS_FILE -> "/tmp/expectations"))(result.asInstanceOf[GreatExpectationsSource].connectionOptions)
}

test("Can create Open Data Contract Standard metadata source") {
val result = MetadataSourceBuilder().openDataContractStandard("/tmp/odcs").metadataSource

assert(result.isInstanceOf[OpenDataContractStandardSource])
assert(result.asInstanceOf[OpenDataContractStandardSource].connectionOptions == Map(DATA_CONTRACT_FILE -> "/tmp/odcs"))
assertResult(Map(DATA_CONTRACT_FILE -> "/tmp/odcs"))(result.asInstanceOf[OpenDataContractStandardSource].connectionOptions)
}

test("Can create Open Data Contract Standard metadata source with schema name") {
Expand All @@ -80,7 +80,7 @@ class MetadataSourceBuilderTest extends AnyFunSuite {
val result = MetadataSourceBuilder().dataContractCli("/tmp/datacli").metadataSource

assert(result.isInstanceOf[DataContractCliSource])
assert(result.asInstanceOf[DataContractCliSource].connectionOptions == Map(DATA_CONTRACT_FILE -> "/tmp/datacli"))
assertResult(Map(DATA_CONTRACT_FILE -> "/tmp/datacli"))(result.asInstanceOf[DataContractCliSource].connectionOptions)
}

test("Can create Data Contract CLI metadata source with schema name") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ class PlanBuilderTest extends AnyFunSuite {
.description(desc)
.taskSummaries(taskSummaries)

assert(result.plan.name == name)
assert(result.plan.description == desc)
assert(result.plan.tasks.size == 1)
assert(result.plan.tasks.head == taskSummaries.taskSummary)
assertResult(name)(result.plan.name)
assertResult(desc)(result.plan.description)
assertResult(1)(result.plan.tasks.size)
assertResult(taskSummaries.taskSummary)(result.plan.tasks.head)
}

test("Can implement PlanRun") {
Expand Down Expand Up @@ -73,14 +73,14 @@ class PlanBuilderTest extends AnyFunSuite {
execute(List(t), p, c, List(v))
}

assert(result._tasks.size == 1)
assert(result._tasks.head.name == "my task")
assert(result._tasks.head.steps.head.schema.fields.get.head.name == "account_id")
assertResult(1)(result._tasks.size)
assertResult("my task")(result._tasks.head.name)
assertResult("account_id")(result._tasks.head.steps.head.schema.fields.get.head.name)

assert(result._plan.name == "my plan")
assert(result._plan.tasks.size == 1)
assert(result._plan.tasks.head.name == "my task")
assert(result._plan.tasks.head.dataSourceName == "account_json")
assertResult("my plan")(result._plan.name)
assertResult(1)(result._plan.tasks.size)
assertResult("my task")(result._plan.tasks.head.name)
assertResult("account_json")(result._plan.tasks.head.dataSourceName)
assert(result._plan.tasks.head.enabled)
assert(result._plan.sinkOptions.get.seed.contains("1"))
assert(result._plan.sinkOptions.get.locale.contains("en"))
Expand All @@ -106,26 +106,26 @@ class PlanBuilderTest extends AnyFunSuite {
assert(!result._configuration.flagsConfig.enableSinkMetadata)
assert(result._configuration.flagsConfig.enableSaveReports)
assert(result._configuration.flagsConfig.enableValidation)
assert(result._configuration.connectionConfigByName.size == 2)
assertResult(2)(result._configuration.connectionConfigByName.size)
assert(result._configuration.connectionConfigByName.contains("account_json"))
assert(result._configuration.connectionConfigByName("account_json") == Map("format" -> "json"))
assertResult(Map("format" -> "json"))(result._configuration.connectionConfigByName("account_json"))
assert(result._configuration.connectionConfigByName.contains("txn_db"))
assert(result._configuration.connectionConfigByName("txn_db") == Map("format" -> "postgres"))
assert(result._configuration.runtimeConfig == DataCatererConfiguration().runtimeConfig ++ Map("spark.sql.shuffle.partitions" -> "2"))
assertResult(Map("format" -> "postgres"))(result._configuration.connectionConfigByName("txn_db"))
assertResult(DataCatererConfiguration().runtimeConfig ++ Map("spark.sql.shuffle.partitions" -> "2"))(result._configuration.runtimeConfig)

assert(result._validations.size == 1)
assert(result._validations.head.dataSources.size == 1)
assertResult(1)(result._validations.size)
assertResult(1)(result._validations.head.dataSources.size)
val dataSourceHead = result._validations.head.dataSources.head
assert(dataSourceHead._1 == "account_json")
assert(dataSourceHead._2.size == 1)
assert(dataSourceHead._2.head.validations.size == 1)
assertResult("account_json")(dataSourceHead._1)
assertResult(1)(dataSourceHead._2.size)
assertResult(1)(dataSourceHead._2.head.validations.size)
val validationHead = dataSourceHead._2.head.validations.head.validation
assert(validationHead.description.contains("name is equal to Peter"))
assert(validationHead.errorThreshold.contains(0.1))
assert(validationHead.isInstanceOf[ExpressionValidation])
assert(validationHead.asInstanceOf[ExpressionValidation].expr == "name == 'Peter'")
assert(dataSourceHead._2.head.options == Map("path" -> "test/path/json"))
assert(dataSourceHead._2.head.waitCondition == PauseWaitCondition())
assertResult("name == 'Peter'")(validationHead.asInstanceOf[ExpressionValidation].expr)
assertResult(Map("path" -> "test/path/json"))(dataSourceHead._2.head.options)
assertResult(PauseWaitCondition())(dataSourceHead._2.head.waitCondition)
}

test("Can define random seed and locale that get used across all data generators") {
Expand All @@ -149,7 +149,7 @@ class PlanBuilderTest extends AnyFunSuite {
assert(result.sinkOptions.isDefined)
val fk = result.sinkOptions.get.foreignKeys
assert(fk.nonEmpty)
assert(fk.size == 1)
assertResult(1)(fk.size)
assert(fk.exists(f => f._1.startsWith("my_json") && f._1.endsWith("account_id") &&
f._2.size == 1 && f._2.head.startsWith("my_csv") && f._2.head.endsWith("account_id")
))
Expand All @@ -162,7 +162,7 @@ class PlanBuilderTest extends AnyFunSuite {
assert(result2.sinkOptions.isDefined)
val fk2 = result2.sinkOptions.get.foreignKeys
assert(fk2.nonEmpty)
assert(fk2.size == 1)
assertResult(1)(fk2.size)
}

test("Throw runtime exception when foreign key column is not defined in data sources") {
Expand Down Expand Up @@ -196,7 +196,7 @@ class PlanBuilderTest extends AnyFunSuite {
assert(result.sinkOptions.isDefined)
val fk = result.sinkOptions.get.foreignKeys
assert(fk.nonEmpty)
assert(fk.size == 1)
assertResult(1)(fk.size)
}

test("Don't throw runtime exception when delete foreign key column, defined by SQL, is not defined in data sources") {
Expand All @@ -211,9 +211,9 @@ class PlanBuilderTest extends AnyFunSuite {
assert(result.sinkOptions.isDefined)
val fk = result.sinkOptions.get.foreignKeys
assert(fk.nonEmpty)
assert(fk.size == 1)
assertResult(1)(fk.size)
assert(fk.head._2.isEmpty)
assert(fk.head._3.size == 1)
assertResult(1)(fk.head._3.size)
}

test("Can create a step that will generate records for all combinations") {
Expand Down
Loading
Loading