#105: Refactoring Measurement, from Agent side #104

Merged: 44 commits, merged on Mar 8, 2024
Changes from 38 commits
Commits (44)
7b013f9
Refactoring Measurement and a lot of stuff from Agent side
lsulak Nov 1, 2023
0b3b5b4
Better validation handling - Set for measurements and measures as the…
lsulak Nov 1, 2023
e9b771d
Merge branch 'master' into feature/measurement-refactoring
lsulak Nov 7, 2023
bb72f40
Merge branch 'master' into feature/measurement-refactoring
lsulak Nov 8, 2023
fe62225
Merge branch 'master' into feature/measurement-refactoring
lsulak Nov 9, 2023
6a299b6
Merge branch 'master' into feature/measurement-refactoring
lsulak Nov 10, 2023
a6bc965
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Nov 15, 2023
093c6e0
post-merge fix, set can't be understood by the fa-db
lsulak Nov 15, 2023
8e68cf1
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Nov 21, 2023
7a0b52e
fixing the build, after merge conflict resolution
lsulak Nov 21, 2023
5f1c422
Tiny doc improvement
lsulak Nov 21, 2023
bce5d81
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Nov 23, 2023
3a2b928
doc update, post-merge
lsulak Nov 23, 2023
9f82011
doc update, post-merge
lsulak Nov 23, 2023
ebf6731
Merge branch 'master' into feature/measurement-refactoring
lsulak Dec 27, 2023
b2c9aa2
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Jan 22, 2024
0a5e2ad
post-merge fixes
lsulak Jan 22, 2024
2ab9fd5
post-merge fixes
lsulak Jan 22, 2024
fe918df
post-merge fixes
lsulak Jan 22, 2024
c1efab2
post-merge fixes (still not finished completely)
lsulak Jan 22, 2024
f1be1a8
ControlCols -> MeasuredCols, refactoring and tests
lsulak Jan 24, 2024
64da68a
consistency with DTO
lsulak Jan 24, 2024
21b2454
yet more refactoring, this is cleaner (I'll consider using Shapeless)
lsulak Jan 24, 2024
f6c916d
removing basically redundant code
lsulak Jan 24, 2024
bd53781
anti-pattern removal & removal of useless parameter for the Record Count
lsulak Jan 25, 2024
62eb239
post-review comments
lsulak Jan 26, 2024
c5190d2
post-review comments
lsulak Jan 31, 2024
8a92e94
post-review changes
lsulak Feb 1, 2024
1805a53
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Feb 1, 2024
d85c85e
post-merge fixes
lsulak Feb 1, 2024
a80956a
post-merge fixes
lsulak Feb 1, 2024
fcfe3fb
#105: refactoring to have a generic type of the measurement result
lsulak Feb 8, 2024
cae400d
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 9, 2024
23ff72b
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 12, 2024
e6df0d7
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 13, 2024
bdf8906
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 16, 2024
8beceef
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 26, 2024
830be6e
#105: dropping Measurement in agent and thus simplifying the code eve…
lsulak Feb 27, 2024
098d40b
#105: changing Enumeration to case objects for easier SerDe in the fu…
lsulak Feb 28, 2024
6d838d7
fix
lsulak Feb 28, 2024
79a0bed
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 29, 2024
4a34b7f
#105: getSimpleName is using reflection, and reflection in Java 8 and…
lsulak Feb 29, 2024
98d10e0
Merge branch 'master' into feature/measurement-refactoring
lsulak Mar 5, 2024
595a972
Merge branch 'master' into feature/measurement-refactoring
lsulak Mar 7, 2024
@@ -64,7 +64,7 @@ class AtumAgent private[agent] () {
}

/**
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
*
* Note: if partitioning doesn't exist in the store yet, a new one will be created with the author stored in
* `AtumAgent.currentUser`. If partitioning already exists, this attribute will be ignored because there
20 changes: 10 additions & 10 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
@@ -18,7 +18,6 @@ package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.Measurement.MeasurementByAtum
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._

@@ -56,10 +55,11 @@ class AtumContext private[agent] (
agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

private def takeMeasurements(df: DataFrame): Set[MeasurementByAtum] = {
private def takeMeasurements(df: DataFrame): Set[MeasurementDTO] = {
measures.map { m =>
val measurementResult = m.function(df)
MeasurementByAtum(m, measurementResult.result, measurementResult.resultType)
// TODO group measurements together: https://github.com/AbsaOSS/atum-service/issues/98
val measureResult = m.function(df)
Contributor comment: Tiny (do only if any reasonable change is needed too): add a TODO message referencing the optimization ticket that would group the execution into one place. I think this is the right starting point. #98

MeasurementBuilder.buildMeasurementDTO(m, measureResult)
}
}
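For context on the TODO above, here is a rough, illustrative sketch of what the grouping proposed in #98 could look like inside AtumContext; the columnExpression accessor on AtumMeasure is hypothetical and not part of this PR:

// Illustrative only (issue #98): evaluate all measures in a single Spark action
// instead of one job per measure. Assumes a hypothetical columnExpression: Column
// accessor on AtumMeasure and the imports already present in AtumContext.scala.
private def takeMeasurementsGrouped(df: DataFrame): Set[MeasurementDTO] = {
  val orderedMeasures = measures.toSeq
  // one Spark job computing every aggregation column at once
  val row = df.select(orderedMeasures.map(_.columnExpression): _*).collect().head
  orderedMeasures.zipWithIndex.map { case (m, i) =>
    MeasurementBuilder.buildMeasurementDTO(m, MeasureResult(row(i).toString, m.resultValueType))
  }.toSet
}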

@@ -78,7 +78,7 @@
*/
def createCheckpoint(checkpointName: String, dataToMeasure: DataFrame): AtumContext = {
val startTime = ZonedDateTime.now()
val measurements = takeMeasurements(dataToMeasure)
val measurementDTOs = takeMeasurements(dataToMeasure)
val endTime = ZonedDateTime.now()

val checkpointDTO = CheckpointDTO(
@@ -89,7 +89,7 @@
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = startTime,
processEndTime = Some(endTime),
measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO).toSeq
measurements = measurementDTOs
)

agent.saveCheckpoint(checkpointDTO)
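For orientation, a minimal usage sketch of the refactored checkpoint flow; the context, DataFrame, and column name are illustrative and not part of this diff:

// Sketch: measures are registered on the context and materialized into
// MeasurementDTOs only when a checkpoint is created.
val measuredContext = atumContext
  .addMeasure(AtumMeasure.RecordCount())
  .addMeasure(AtumMeasure.SumOfValuesOfColumn("amount")) // illustrative column name
  .createCheckpoint("after-load", inputDf)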
@@ -103,7 +103,7 @@
* @param measurements the measurements to be included in the checkpoint
* @return the AtumContext after the checkpoint has been created
*/
def createCheckpointOnProvidedData(checkpointName: String, measurements: Seq[Measurement]): AtumContext = {
def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[AtumMeasure, MeasureResult]): AtumContext = {
val dateTimeNow = ZonedDateTime.now()

val checkpointDTO = CheckpointDTO(
@@ -113,7 +113,7 @@
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = dateTimeNow,
processEndTime = Some(dateTimeNow),
measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO)
measurements = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)
)

agent.saveCheckpoint(checkpointDTO)
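A usage sketch of the new Map-based signature, assuming MeasureResult keeps the (value, result type) shape visible in the Measure diff below; values and column names are illustrative:

// Sketch: measurements computed outside of Spark are provided directly, keyed by their measure.
val providedMeasurements: Map[AtumMeasure, MeasureResult] = Map(
  AtumMeasure.RecordCount() -> MeasureResult("1024", ResultValueType.Long),
  AtumMeasure.SumOfValuesOfColumn("amount") -> MeasureResult("20.5", ResultValueType.BigDecimal)
)
atumContext.createCheckpointOnProvidedData("manually-measured-checkpoint", providedMeasurements)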
@@ -154,7 +154,7 @@ class AtumContext private[agent] (
/**
* Adds a measure to the AtumContext.
*
* @param measure the measure to be added
* @param newMeasure the measure to be added
*/
def addMeasure(newMeasure: AtumMeasure): AtumContext = {
measures = measures + newMeasure
@@ -164,7 +164,7 @@
/**
* Adds multiple measures to the AtumContext.
*
* @param measures set sequence of measures to be added
* @param newMeasures set sequence of measures to be added
*/
def addMeasures(newMeasures: Set[AtumMeasure]): AtumContext = {
measures = measures ++ newMeasures
@@ -18,7 +18,7 @@ package za.co.absa.atum.agent.core

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.atum.agent.model.MeasureResult

/**
* This trait provides a contract for different measurement processors
@@ -27,29 +27,19 @@ trait MeasurementProcessor {

/**
* This method is used to compute measure on Spark `Dataframe`.
* @param df: Spark `Dataframe` to be measured.
* @return Result of measurement.
*/
def function: MeasurementFunction

}

/**
* This companion object provides a set of types for measurement processors
*/
object MeasurementProcessor {
/**
* The raw result of measurement is always gonna be string, because we want to avoid some floating point issues
* (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more),
* but the actual type is stored alongside the computation because we don't want to lost this information.
* This type alias describes a function that is used to compute measure on Spark `Dataframe`.
* It receives a Spark `Dataframe` to be measured on its input.
*
* @return Result of measurement.
*/
final case class ResultOfMeasurement(result: String, resultType: ResultValueType.ResultValueType)

/**
* This type alias describes a function that is used to compute measure on Spark `Dataframe`.
* @param df: Spark `Dataframe` to be measured.
* @return Result of measurement.
*/
type MeasurementFunction = DataFrame => ResultOfMeasurement

type MeasurementFunction = DataFrame => MeasureResult
}
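A minimal sketch of a function conforming to the new alias; the column name is illustrative:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.agent.model.MeasureResult
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

// Sketch: any DataFrame => MeasureResult function satisfies the new contract.
val nullCountFn: MeasurementFunction = (df: DataFrame) =>
  MeasureResult(df.filter(col("id").isNull).count().toString, ResultValueType.Long)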
@@ -24,8 +24,6 @@ import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.utils.SerializationUtils

import scala.util.{Failure, Success, Try}

class HttpDispatcher(config: Config) extends Dispatcher with Logging {

private val serverUrl = config.getString("url")
@@ -51,7 +49,7 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging {
val response = backend.send(request)

SerializationUtils.fromJson[AtumContextDTO](
safeResponseBody(response).get
handleResponseBody(response)
)
}

@@ -62,7 +60,7 @@

val response = backend.send(request)

safeResponseBody(response).get
handleResponseBody(response)
}

override def saveAdditionalData(additionalDataSubmitDTO: AdditionalDataSubmitDTO): Unit = {
@@ -72,13 +70,13 @@

val response = backend.send(request)

safeResponseBody(response).get
handleResponseBody(response)
}

def safeResponseBody(response: Response[Either[String, String]]): Try[String] = {
private def handleResponseBody(response: Response[Either[String, String]]): String = {
response.body match {
case Left(body) => Failure(HttpException(response.code.code, body))
case Right(body) => Success(body)
case Left(body) => throw HttpException(response.code.code, body)
case Right(body) => body
}
}
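Because the response body handling now throws instead of returning a Try, a caller that wants the previous Try-style behaviour can wrap the dispatcher call itself; a sketch with an illustrative dispatcher instance and DTO:

import scala.util.{Failure, Success, Try}

// Sketch: surface HTTP errors explicitly at the call site.
Try(dispatcher.saveAdditionalData(additionalDataDto)) match {
  case Failure(e: HttpException) => println(s"Server rejected the request: ${e.getMessage}")
  case Failure(other) => throw other
  case Success(_) => ()
}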

@@ -32,7 +32,7 @@ object AtumAgentException {
*
* @param message A message describing the exception.
*/
case class MeasurementProvidedException(message: String) extends AtumAgentException(message)
case class MeasurementException(message: String) extends AtumAgentException(message)

/**
* This type represents an exception thrown when a measure is not supported by the Atum Agent.
54 changes: 27 additions & 27 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala
@@ -20,15 +20,15 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, DecimalType, LongType, StringType}
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.atum.agent.core.MeasurementProcessor
import za.co.absa.atum.agent.core.MeasurementProcessor.{MeasurementFunction, ResultOfMeasurement}
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

/**
* Type of different measures to be applied to the columns.
*/
sealed trait Measure {
val measureName: String
def controlColumns: Seq[String]
def measuredColumns: Seq[String]
}

trait AtumMeasure extends Measure with MeasurementProcessor {
@@ -51,83 +51,83 @@ object AtumMeasure {
override def function: MeasurementFunction =
(ds: DataFrame) => {
val resultValue = ds.select(columnExpression).collect()
ResultOfMeasurement(resultValue(0).toString, resultValueType)
MeasureResult(resultValue(0).toString, resultValueType)
}

override def controlColumns: Seq[String] = Seq.empty
override def measuredColumns: Seq[String] = Seq.empty
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}
object RecordCount {
private[agent] val measureName: String = "count"
def apply(): RecordCount = RecordCount(measureName)
}

case class DistinctRecordCount private (measureName: String, controlCols: Seq[String]) extends AtumMeasure {
require(controlCols.nonEmpty, "At least one control column has to be defined.")
case class DistinctRecordCount private (measureName: String, measuredCols: Seq[String]) extends AtumMeasure {
require(measuredCols.nonEmpty, "At least one measured column has to be defined.")

private val columnExpression = countDistinct(col(controlCols.head), controlCols.tail.map(col): _*)
private val columnExpression = countDistinct(col(measuredCols.head), measuredCols.tail.map(col): _*)

override def function: MeasurementFunction =
(ds: DataFrame) => {
val resultValue = ds.select(columnExpression).collect()
ResultOfMeasurement(resultValue(0)(0).toString, resultValueType)
MeasureResult(resultValue(0)(0).toString, resultValueType)
}

override def controlColumns: Seq[String] = controlCols
override def measuredColumns: Seq[String] = measuredCols
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}
object DistinctRecordCount {
private[agent] val measureName: String = "distinctCount"
def apply(controlCols: Seq[String]): DistinctRecordCount = DistinctRecordCount(measureName, controlCols)
def apply(measuredCols: Seq[String]): DistinctRecordCount = DistinctRecordCount(measureName, measuredCols)
}

case class SumOfValuesOfColumn private (measureName: String, controlCol: String) extends AtumMeasure {
case class SumOfValuesOfColumn private (measureName: String, measuredCol: String) extends AtumMeasure {
private val columnAggFn: Column => Column = column => sum(column)

override def function: MeasurementFunction = (ds: DataFrame) => {
val dataType = ds.select(controlCol).schema.fields(0).dataType
val resultValue = ds.select(columnAggFn(castForAggregation(dataType, col(controlCol)))).collect()
ResultOfMeasurement(handleAggregationResult(dataType, resultValue(0)(0)), resultValueType)
val dataType = ds.select(measuredCol).schema.fields(0).dataType
val resultValue = ds.select(columnAggFn(castForAggregation(dataType, col(measuredCol)))).collect()
MeasureResult(handleAggregationResult(dataType, resultValue(0)(0)), resultValueType)
}

override def controlColumns: Seq[String] = Seq(controlCol)
override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal
}
object SumOfValuesOfColumn {
private[agent] val measureName: String = "aggregatedTotal"
def apply(controlCol: String): SumOfValuesOfColumn = SumOfValuesOfColumn(measureName, controlCol)
def apply(measuredCol: String): SumOfValuesOfColumn = SumOfValuesOfColumn(measureName, measuredCol)
}

case class AbsSumOfValuesOfColumn private (measureName: String, controlCol: String) extends AtumMeasure {
case class AbsSumOfValuesOfColumn private (measureName: String, measuredCol: String) extends AtumMeasure {
private val columnAggFn: Column => Column = column => sum(abs(column))

override def function: MeasurementFunction = (ds: DataFrame) => {
val dataType = ds.select(controlCol).schema.fields(0).dataType
val resultValue = ds.select(columnAggFn(castForAggregation(dataType, col(controlCol)))).collect()
ResultOfMeasurement(handleAggregationResult(dataType, resultValue(0)(0)), resultValueType)
val dataType = ds.select(measuredCol).schema.fields(0).dataType
val resultValue = ds.select(columnAggFn(castForAggregation(dataType, col(measuredCol)))).collect()
MeasureResult(handleAggregationResult(dataType, resultValue(0)(0)), resultValueType)
}

override def controlColumns: Seq[String] = Seq(controlCol)
override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal
}
object AbsSumOfValuesOfColumn {
private[agent] val measureName: String = "absAggregatedTotal"
def apply(controlCol: String): AbsSumOfValuesOfColumn = AbsSumOfValuesOfColumn(measureName, controlCol)
def apply(measuredCol: String): AbsSumOfValuesOfColumn = AbsSumOfValuesOfColumn(measureName, measuredCol)
}

case class SumOfHashesOfColumn private (measureName: String, controlCol: String) extends AtumMeasure {
private val columnExpression: Column = sum(crc32(col(controlCol).cast("String")))
case class SumOfHashesOfColumn private (measureName: String, measuredCol: String) extends AtumMeasure {
private val columnExpression: Column = sum(crc32(col(measuredCol).cast("String")))
override def function: MeasurementFunction = (ds: DataFrame) => {
val resultValue = ds.select(columnExpression).collect()
ResultOfMeasurement(Option(resultValue(0)(0)).getOrElse("").toString, resultValueType)
MeasureResult(Option(resultValue(0)(0)).getOrElse("").toString, resultValueType)
}

override def controlColumns: Seq[String] = Seq(controlCol)
override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.String
}
object SumOfHashesOfColumn {
private[agent] val measureName: String = "hashCrc32"
def apply(controlCol: String): SumOfHashesOfColumn = SumOfHashesOfColumn(measureName, controlCol)
def apply(measuredCol: String): SumOfHashesOfColumn = SumOfHashesOfColumn(measureName, measuredCol)
}

private def castForAggregation(
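To illustrate the rename at call sites, a short sketch constructing the built-in measures with the new measuredCol/measuredCols parameters; column names are illustrative:

import za.co.absa.atum.agent.model.AtumMeasure

// Sketch: building the built-in measures after the controlCols -> measuredCols rename.
val allMeasures: Set[AtumMeasure] = Set(
  AtumMeasure.RecordCount(),
  AtumMeasure.DistinctRecordCount(Seq("id")),
  AtumMeasure.SumOfValuesOfColumn("amount"),
  AtumMeasure.AbsSumOfValuesOfColumn("amount"),
  AtumMeasure.SumOfHashesOfColumn("id")
)
allMeasures.foreach(m => println(s"${m.measureName}: ${m.measuredColumns.mkString(", ")}"))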