Skip to content

Commit

Permalink
Fix bug in validation wait which would not use the correct sub class …
Browse files Browse the repository at this point in the history
…wait condition and always return true
  • Loading branch information
pflooky committed Jul 3, 2024
1 parent a254e78 commit 7444ff6
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, Expre
import io.github.datacatering.datacaterer.core.model.{DataSourceValidationResult, ValidationConfigResult, ValidationResult}
import io.github.datacatering.datacaterer.core.parser.PlanParser
import io.github.datacatering.datacaterer.core.validator.ValidationHelper.getValidationType
import io.github.datacatering.datacaterer.core.validator.ValidationWaitImplicits.WaitConditionOps
import io.github.datacatering.datacaterer.core.validator.ValidationWaitImplicits._
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SparkSession}

Expand Down Expand Up @@ -71,7 +71,7 @@ class ValidationProcessor(
} else {
LOGGER.debug(s"Waiting for validation condition to be successful before running validations, name=${vc.name}," +
s"data-source-name=$dataSourceName, details=${dataSourceValidation.options}, num-validations=${dataSourceValidation.validations.size}")
dataSourceValidation.waitCondition.waitForCondition(connectionConfigsByName)
dataSourceValidation.waitCondition.waitBeforeValidation(connectionConfigsByName)

val df = getDataFrame(dataSourceName, dataSourceValidation.options)
if (df.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package io.github.datacatering.datacaterer.core.validator

import io.github.datacatering.datacaterer.api.model.Constants.FORMAT
import io.github.datacatering.datacaterer.api.model.{DataExistsWaitCondition, FileExistsWaitCondition, PauseWaitCondition, WaitCondition, WebhookWaitCondition}
import io.github.datacatering.datacaterer.core.exception.InvalidWaitConditionException
import io.github.datacatering.datacaterer.core.util.ConfigUtil
import io.github.datacatering.datacaterer.core.util.HttpUtil.getAuthHeader
import org.apache.hadoop.fs.{FileSystem, Path}
Expand All @@ -13,94 +12,105 @@ import org.asynchttpclient.Dsl.asyncHttpClient
import scala.util.{Failure, Success, Try}


object ValidationWaitImplicits {
implicit class WaitConditionOps(waitCondition: WaitCondition = PauseWaitCondition()) {
private val LOGGER = Logger.getLogger(getClass.getName)
/**
 * Common contract for the checks that can gate the start of a validation run.
 *
 * The pause, file-exists, data-exists and webhook wait conditions each supply an
 * implementation of this class.
 */
abstract class WaitConditionOps {
/**
 * Evaluates the wait condition once.
 *
 * @param connectionConfigByName connection options, looked up by data source name
 * @param sparkSession           session made available to implementations that read
 *                               files or data
 * @return `true` when the condition is met; a `false` result is what makes the caller
 *         pause and retry when the wait condition is retryable
 */
def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean
}

def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = true
class PauseWaitConditionOps(pauseWaitCondition: PauseWaitCondition) extends WaitConditionOps {
private val LOGGER = Logger.getLogger(getClass.getName)

def waitForCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Unit = {
if (waitCondition.isRetryable) {
var retries = 0
while (retries < waitCondition.maxRetries) {
if (!checkCondition(connectionConfigByName)) {
LOGGER.debug(s"Wait condition failed, pausing before retrying, pause-before-retry-seconds=${waitCondition.waitBeforeRetrySeconds}, " +
s"num-retries=$retries, max-retries=${waitCondition.maxRetries}")
Thread.sleep(waitCondition.waitBeforeRetrySeconds * 1000)
retries += 1
} else {
return
}
}
LOGGER.warn(s"Max retries has been reached for validation wait condition, continuing to try validation, " +
s"max-retries=${waitCondition.maxRetries}")
} else {
checkCondition(connectionConfigByName)
}
}
/**
 * Pauses the calling thread for the configured number of seconds and then reports
 * the condition as met.
 *
 * @param connectionConfigByName not used by this wait condition
 * @param sparkSession           not used by this wait condition
 * @return always `true` once the pause has elapsed
 */
override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
  LOGGER.debug(s"Pausing execution before starting validation, pause-in-seconds=${pauseWaitCondition.pauseInSeconds}")
  // Multiply as Long: if pauseInSeconds is an Int (its declaration is not visible here),
  // `* 1000` would overflow for large values and hand Thread.sleep a wrong or negative duration.
  Thread.sleep(pauseWaitCondition.pauseInSeconds * 1000L)
  true
}
}

implicit class PauseWaitConditionOps(pauseWaitCondition: PauseWaitCondition) extends WaitConditionOps(pauseWaitCondition) {
private val LOGGER = Logger.getLogger(getClass.getName)
class FileExistsWaitConditionOps(fileExistsWaitCondition: FileExistsWaitCondition) extends WaitConditionOps {
private val LOGGER = Logger.getLogger(getClass.getName)

override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
LOGGER.debug(s"Pausing execution before starting validation, pause-in-seconds=${pauseWaitCondition.pauseInSeconds}")
Thread.sleep(pauseWaitCondition.pauseInSeconds * 1000)
true
}
/**
 * Checks whether the configured path currently exists.
 *
 * @param connectionConfigByName not used by this wait condition
 * @param sparkSession           supplies the Hadoop configuration used to reach the filesystem
 * @return `true` if the path exists, `false` otherwise
 */
override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
  LOGGER.debug(s"Checking if file exists before running validations, file-path=${fileExistsWaitCondition.path}")
  val filePath = new Path(fileExistsWaitCondition.path)
  // Resolve the filesystem from the path itself rather than FileSystem.get(conf):
  // the latter always returns the default filesystem, so a path with a different
  // scheme would be rejected instead of checked. Scheme-less paths still resolve
  // to the default filesystem.
  val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
  fs.exists(filePath)
}
}

implicit class FileExistsWaitConditionOps(fileExistsWaitCondition: FileExistsWaitCondition) extends WaitConditionOps(fileExistsWaitCondition) {
private val LOGGER = Logger.getLogger(getClass.getName)

override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
LOGGER.debug(s"Checking if file exists before running validations, file-path=${fileExistsWaitCondition.path}")
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
fs.exists(new Path(fileExistsWaitCondition.path))
}
/**
 * Wait condition that is met once the referenced data source contains at least one
 * row matching the configured expression.
 */
class DataExistsWaitConditionOps(dataExistsWaitCondition: DataExistsWaitCondition) extends WaitConditionOps {
  private val LOGGER = Logger.getLogger(getClass.getName)

  /**
   * Loads the data source with its connection options overlaid by the wait
   * condition's own options, filters by the expression and reports whether any
   * row remains.
   *
   * @param connectionConfigByName connection options keyed by data source name; the
   *                               entry for this condition's data source must be present
   *                               and must contain the format key
   * @param sparkSession           session used to read the data source
   * @return `true` if at least one row matches the expression
   */
  override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
    LOGGER.debug(s"Checking if data exists before running validations, data-source-name=${dataExistsWaitCondition.dataSourceName}," +
      s"data-source-options=${ConfigUtil.cleanseOptions(dataExistsWaitCondition.options)}, expression=${dataExistsWaitCondition.expr}")

    val sourceOptions = connectionConfigByName(dataExistsWaitCondition.dataSourceName)
    // Options on the wait condition take precedence over the connection's options.
    val readOptions = sourceOptions ++ dataExistsWaitCondition.options
    val reader = sparkSession.read.format(sourceOptions(FORMAT)).options(readOptions)
    val matchingRows = reader.load().where(dataExistsWaitCondition.expr)

    !matchingRows.isEmpty
  }
}

implicit class DataExistsWaitConditionOps(dataExistsWaitCondition: DataExistsWaitCondition) extends WaitConditionOps(dataExistsWaitCondition) {
private val LOGGER = Logger.getLogger(getClass.getName)

override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
LOGGER.debug(s"Checking if data exists before running validations, data-source-name=${dataExistsWaitCondition.dataSourceName}," +
s"data-source-options=${ConfigUtil.cleanseOptions(dataExistsWaitCondition.options)}, expression=${dataExistsWaitCondition.expr}")
val connectionOptions = connectionConfigByName(dataExistsWaitCondition.dataSourceName)
val loadData = sparkSession.read
.format(connectionOptions(FORMAT))
.options(connectionOptions ++ dataExistsWaitCondition.options)
.load()
.where(dataExistsWaitCondition.expr)
!loadData.isEmpty
/**
 * Wait condition that is met when an HTTP call to the configured URL answers with
 * one of the expected status codes.
 */
class WebhookWaitConditionOps(webhookWaitCondition: WebhookWaitCondition) extends WaitConditionOps {
  private val LOGGER = Logger.getLogger(getClass.getName)

  /**
   * Executes the configured HTTP request once and compares the response status
   * with the expected status codes.
   *
   * @param connectionConfigByName connection options keyed by data source name; the entry
   *                               for this condition's data source (if any) is used to
   *                               derive an auth header
   * @param sparkSession           not used by this wait condition
   * @return `true` if the response status is one of the expected status codes; `false`
   *         if the status differs or the request itself fails
   */
  override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
    val webhookOptions = connectionConfigByName.getOrElse(webhookWaitCondition.dataSourceName, Map())
    // A new client is created for every check, so it must also be closed for every
    // check; otherwise each retry leaves an unclosed client (and its resources) behind.
    val httpClient = asyncHttpClient()
    try {
      val request = httpClient.prepare(webhookWaitCondition.method, webhookWaitCondition.url)
      val authHeader = getAuthHeader(webhookOptions)
      // Only the first auth header entry is applied, when one is available.
      val requestWithAuth = if (authHeader.nonEmpty) request.setHeader(authHeader.head._1, authHeader.head._2) else request

      LOGGER.debug(s"Attempting HTTP request, url=${webhookWaitCondition.url}")
      val tryResponse = Try(requestWithAuth.execute().get())

      tryResponse match {
        case Failure(exception) =>
          LOGGER.error(s"Failed to execute HTTP wait condition request, url=${webhookWaitCondition.url}", exception)
          false
        case Success(value) =>
          if (webhookWaitCondition.statusCodes.contains(value.getStatusCode)) {
            true
          } else {
            LOGGER.debug(s"HTTP wait condition status code did not match expected status code, url=${webhookWaitCondition.url}, " +
              s"expected-status-code=${webhookWaitCondition.statusCodes}, actual-status-code=${value.getStatusCode}, " +
              s"response-body=${value.getResponseBody}")
            false
          }
      }
    } finally {
      // A failure to close must not mask the outcome of the check itself.
      Try(httpClient.close()).failed.foreach { closeError =>
        LOGGER.warn(s"Failed to close HTTP client after wait condition request, url=${webhookWaitCondition.url}", closeError)
      }
    }
  }
}

implicit class WebhookWaitConditionOps(webhookWaitCondition: WebhookWaitCondition) extends WaitConditionOps(webhookWaitCondition) {
private val LOGGER = Logger.getLogger(getClass.getName)
object ValidationWaitImplicits {

override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
val webhookOptions = connectionConfigByName.getOrElse(webhookWaitCondition.dataSourceName, Map())
val request = asyncHttpClient().prepare(webhookWaitCondition.method, webhookWaitCondition.url)
val authHeader = getAuthHeader(webhookOptions)
val requestWithAuth = if (authHeader.nonEmpty) request.setHeader(authHeader.head._1, authHeader.head._2) else request
private val LOGGER = Logger.getLogger(getClass.getName)

LOGGER.debug(s"Attempting HTTP request, url=${webhookWaitCondition.url}")
val tryResponse = Try(requestWithAuth.execute().get())
implicit class ValidationWaitConditionOps(waitCondition: WaitCondition)(implicit sparkSession: SparkSession) {
def waitBeforeValidation(connectionConfigByName: Map[String, Map[String, String]]): Unit = {
val waitOps = waitCondition match {
case x: DataExistsWaitCondition => new DataExistsWaitConditionOps(x)
case x: FileExistsWaitCondition => new FileExistsWaitConditionOps(x)
case x: PauseWaitCondition => new PauseWaitConditionOps(x)
case x: WebhookWaitCondition => new WebhookWaitConditionOps(x)
case x => throw new RuntimeException(s"Unknown type of validation wait condition, class=${x.getClass.getName}")
}

tryResponse match {
case Failure(exception) =>
LOGGER.error(s"Failed to execute HTTP wait condition request, url=${webhookWaitCondition.url}", exception)
false
case Success(value) =>
if (webhookWaitCondition.statusCodes.contains(value.getStatusCode)) {
true
if (waitCondition.isRetryable) {
var retries = 0
while (retries < waitCondition.maxRetries) {
if (!waitOps.checkCondition(connectionConfigByName)) {
LOGGER.debug(s"Wait condition failed, pausing before retrying, pause-before-retry-seconds=${waitCondition.waitBeforeRetrySeconds}, " +
s"num-retries=$retries, max-retries=${waitCondition.maxRetries}")
Thread.sleep(waitCondition.waitBeforeRetrySeconds * 1000)
retries += 1
} else {
LOGGER.debug(s"HTTP wait condition status code did not match expected status code, url=${webhookWaitCondition.url}, " +
s"expected-status-code=${webhookWaitCondition.statusCodes}, actual-status-code=${value.getStatusCode}, " +
s"response-body=${value.getResponseBody}")
false
return
}
}
LOGGER.warn(s"Max retries has been reached for validation wait condition, continuing to try validation, " +
s"max-retries=${waitCondition.maxRetries}")
} else {
waitOps.checkCondition(connectionConfigByName)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ dataSources:
json:
- options:
path: "/tmp/yaml-validation-json-test"
waitCondition:
path: "/tmp/yaml-validation-json-test"
validations:
- expr: "STARTSWITH(transaction_id, 'txn')"

0 comments on commit 7444ff6

Please sign in to comment.