
#105: Refactoring Measurement, from Agent side #104

Merged
44 commits, merged Mar 8, 2024
Changes from 4 commits
7b013f9
Refactoring Measurement and a lot of stuff from Agent side
lsulak Nov 1, 2023
0b3b5b4
Better validation handling - Set for measurements and measures as the…
lsulak Nov 1, 2023
e9b771d
Merge branch 'master' into feature/measurement-refactoring
lsulak Nov 7, 2023
bb72f40
Merge branch 'master' into feature/measurement-refactoring
lsulak Nov 8, 2023
fe62225
Merge branch 'master' into feature/measurement-refactoring
lsulak Nov 9, 2023
6a299b6
Merge branch 'master' into feature/measurement-refactoring
lsulak Nov 10, 2023
a6bc965
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Nov 15, 2023
093c6e0
post-merge fix, set can't be understood by the fa-db
lsulak Nov 15, 2023
8e68cf1
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Nov 21, 2023
7a0b52e
fixing the build, after merge conflict resolution
lsulak Nov 21, 2023
5f1c422
Tiny doc improvement
lsulak Nov 21, 2023
bce5d81
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Nov 23, 2023
3a2b928
doc update, post-merge
lsulak Nov 23, 2023
9f82011
doc update, post-merge
lsulak Nov 23, 2023
ebf6731
Merge branch 'master' into feature/measurement-refactoring
lsulak Dec 27, 2023
b2c9aa2
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Jan 22, 2024
0a5e2ad
post-merge fixes
lsulak Jan 22, 2024
2ab9fd5
post-merge fixes
lsulak Jan 22, 2024
fe918df
post-merge fixes
lsulak Jan 22, 2024
c1efab2
post-merge fixes (still not finished completely)
lsulak Jan 22, 2024
f1be1a8
ControlCols -> MeasuredCols, refactoring and tests
lsulak Jan 24, 2024
64da68a
consistency with DTO
lsulak Jan 24, 2024
21b2454
yet more refactoring, this is cleaner (I'll consider using Shapeless)
lsulak Jan 24, 2024
f6c916d
removing basically redundant code
lsulak Jan 24, 2024
bd53781
anti-pattern removal & removal of useless parameter for the Record Count
lsulak Jan 25, 2024
62eb239
post-review comments
lsulak Jan 26, 2024
c5190d2
post-review comments
lsulak Jan 31, 2024
8a92e94
post-review changes
lsulak Feb 1, 2024
1805a53
Merge remote-tracking branch 'origin/master' into feature/measurement…
lsulak Feb 1, 2024
d85c85e
post-merge fixes
lsulak Feb 1, 2024
a80956a
post-merge fixes
lsulak Feb 1, 2024
fcfe3fb
#105: refactoring to have a generic type of the measurement result
lsulak Feb 8, 2024
cae400d
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 9, 2024
23ff72b
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 12, 2024
e6df0d7
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 13, 2024
bdf8906
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 16, 2024
8beceef
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 26, 2024
830be6e
#105: dropping Measurement in agent and thus simplifying the code eve…
lsulak Feb 27, 2024
098d40b
#105: changing Enumeration to case objects for easier SerDe in the fu…
lsulak Feb 28, 2024
6d838d7
fix
lsulak Feb 28, 2024
79a0bed
Merge branch 'master' into feature/measurement-refactoring
lsulak Feb 29, 2024
4a34b7f
#105: getSimpleName is using reflection, and reflection in Java 8 and…
lsulak Feb 29, 2024
98d10e0
Merge branch 'master' into feature/measurement-refactoring
lsulak Mar 5, 2024
595a972
Merge branch 'master' into feature/measurement-refactoring
lsulak Mar 7, 2024
18 changes: 7 additions & 11 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
@@ -18,7 +18,6 @@ package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.Measurement.MeasurementByAtum
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._

@@ -28,9 +27,6 @@ import scala.collection.immutable.ListMap

/**
* This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures.
* @param atumPartitions
* @param agent
* @param measures
*/
class AtumContext private[agent] (
val atumPartitions: AtumPartitions,
@@ -45,16 +41,16 @@ class AtumContext private[agent] (
agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

private def takeMeasurements(df: DataFrame): Set[MeasurementByAtum] = {
private def takeMeasurements(df: DataFrame): Set[MeasurementDTO] = {
measures.map { m =>
val measurementResult = m.function(df)
MeasurementByAtum(m, measurementResult.result, measurementResult.resultType)
val measureResult = m.function(df)
Contributor:
Tiny: (do only if any reasonable change is needed too)
Add a TODO message referencing the optimization ticket that would group the execution into one place. I think this is the right starting point. #98

MeasurementBuilder.buildMeasurementDTO(m, measureResult)
}
}

def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = {
val startTime = OffsetDateTime.now()
val measurements = takeMeasurements(dataToMeasure)
val measurementDTOs = takeMeasurements(dataToMeasure)
val endTime = OffsetDateTime.now()

val checkpointDTO = CheckpointDTO(
@@ -65,14 +61,14 @@ class AtumContext private[agent] (
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = startTime,
processEndTime = Some(endTime),
measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO).toSeq
measurements = measurementDTOs
)

agent.saveCheckpoint(checkpointDTO)
this
}

def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Seq[Measurement]): AtumContext = {
def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Set[Measurement]): AtumContext = {
val offsetDateTimeNow = OffsetDateTime.now()

val checkpointDTO = CheckpointDTO(
@@ -82,7 +78,7 @@ class AtumContext private[agent] (
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = offsetDateTimeNow,
processEndTime = Some(offsetDateTimeNow),
measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO)
measurements = MeasurementBuilder.buildMeasurementDTO(measurements)
)

agent.saveCheckpoint(checkpointDTO)
@@ -18,22 +18,12 @@ package za.co.absa.atum.agent.core

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.atum.agent.model.MeasureResult

trait MeasurementProcessor {

def function: MeasurementFunction

}

object MeasurementProcessor {
/**
* The raw result of measurement is always gonna be string, because we want to avoid some floating point issues
* (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more),
* but the actual type is stored alongside the computation because we don't want to lost this information.
*/
final case class ResultOfMeasurement(result: String, resultType: ResultValueType.ResultValueType)

type MeasurementFunction = DataFrame => ResultOfMeasurement

type MeasurementFunction = DataFrame => MeasureResult
}
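After this refactor a measure's function yields a `MeasureResult` directly, instead of the removed `ResultOfMeasurement`. The shape of the new alias can be illustrated with a self-contained sketch; `DataFrameStub` and `MeasureResultStub` are stand-ins invented here for illustration, not the real Spark or agent types:

```scala
// Sketch only: stubs standing in for Spark's DataFrame and the agent's MeasureResult.
final case class DataFrameStub(rows: Seq[Map[String, Any]])
final case class MeasureResultStub(resultValue: String, resultType: String)

object MeasurementProcessorSketch {
  // Mirrors the new alias: type MeasurementFunction = DataFrame => MeasureResult
  type MeasurementFunction = DataFrameStub => MeasureResultStub

  // A record-count measure expressed against the stub, analogous to RecordCount:
  // the raw result is stringified, with the declared type carried alongside.
  val recordCount: MeasurementFunction =
    df => MeasureResultStub(df.rows.size.toString, "Long")
}
```

The point of the alias is that every `Measure` supplies one such function, and `takeMeasurements` simply maps it over the measures of the context.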
@@ -18,5 +18,5 @@ package za.co.absa.atum.agent.exception

sealed abstract class AtumAgentException extends Exception

case class MeasurementProvidedException(msg: String) extends AtumAgentException
case class MeasurementException(msg: String) extends AtumAgentException
case class MeasureException(msg: String) extends AtumAgentException
21 changes: 10 additions & 11 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala
@@ -20,7 +20,6 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DecimalType, LongType, StringType}
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.atum.agent.core.MeasurementProcessor
import za.co.absa.atum.agent.core.MeasurementProcessor.{MeasurementFunction, ResultOfMeasurement}
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

@@ -55,10 +54,10 @@ object Measure {
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction =
override def function: MeasurementProcessor.MeasurementFunction =
(ds: DataFrame) => {
val resultValue = ds.select(col(controlCol)).count().toString
ResultOfMeasurement(resultValue, resultValueType)
MeasureResult(resultValue, resultValueType)
}
}
object RecordCount extends MeasureType {
@@ -74,10 +73,10 @@ object Measure {
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction =
override def function: MeasurementProcessor.MeasurementFunction =
(ds: DataFrame) => {
val resultValue = ds.select(col(controlCol)).distinct().count().toString
ResultOfMeasurement(resultValue, resultValueType)
MeasureResult(resultValue, resultValueType)
}
}
object DistinctRecordCount extends MeasureType {
@@ -95,10 +94,10 @@ object Measure {
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
override def function: MeasurementProcessor.MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(col(valueColumnName))
val resultValue = aggregateColumn(ds, controlCol, aggCol)
ResultOfMeasurement(resultValue, resultValueType)
MeasureResult(resultValue, resultValueType)
}
}
object SumOfValuesOfColumn extends MeasureType {
@@ -116,10 +115,10 @@ object Measure {
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
override def function: MeasurementProcessor.MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(abs(col(valueColumnName)))
val resultValue = aggregateColumn(ds, controlCol, aggCol)
ResultOfMeasurement(resultValue, resultValueType)
MeasureResult(resultValue, resultValueType)
}
}
object AbsSumOfValuesOfColumn extends MeasureType {
@@ -137,15 +136,15 @@ object Measure {
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
override def function: MeasurementProcessor.MeasurementFunction = (ds: DataFrame) => {

val aggregatedColumnName = ds.schema.getClosestUniqueName("sum_of_hashes")
val value = ds
.withColumn(aggregatedColumnName, crc32(col(controlCol).cast("String")))
.agg(sum(col(aggregatedColumnName)))
.collect()(0)(0)
val resultValue = if (value == null) "" else value.toString
ResultOfMeasurement(resultValue, ResultValueType.String)
MeasureResult(resultValue, ResultValueType.String)
}
}
object SumOfHashesOfColumn extends MeasureType {
@@ -0,0 +1,65 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.exception.MeasurementException
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

trait MeasureResult {
val resultValue: Any
salamonpavel (Collaborator), Jan 26, 2024:
Isn't it better to use a generic T instead of Any? If we were to use a generic T we would preserve type safety (Any can be any type at all) and flexibility (you cannot apply constraints like context bounds to Any).

lsulak (Collaborator, Author):
Yes, I'm absolutely aware of that. The thing is that if this were a generic type, then this trait would need to be generic as well, and every usage of it would need to take that into account. I gave it a try (1-2 hours) twice in the past and I didn't like the solution at all. I even wanted to include Shapeless here and this would be the first step, but as I said, I decided to keep it like this for now.

salamonpavel (Collaborator):
@TebaleloS @benedeki
I am interested in the opinion of others on this matter.

Contributor:
I really don't like to use Any. On the other hand, I can imagine that using a generic there leads to a cascade of changes that complicates things. There's a third option to consider:

trait MeasureResult {
  type ResultType
  val resultValue: ResultType
}

salamonpavel (Collaborator):
@lsulak
Can you maybe elaborate a bit more on the problems the generic would bring? I understand that the type parameter will be reflected in signatures throughout the code, but when you create an instance of a class you don't need to specify the type as it's inferred.

lsulak (Collaborator, Author):
trait MeasureResult[T] {
  val resultValue: T
  val resultValueType: ResultValueType.ResultValueType
}

Basically this ^ would mean that I would need to adjust code in MeasurementProcessor and its use cases, Measurement and its use cases, MeasurementBuilder and its use cases, including things like AtumContext -> takeMeasurements, and then also unit tests and who knows what else :D

I really like David's solution though; the whole 'generic' problem is nicely 'encapsulated' within the whole module.

salamonpavel (Collaborator), Feb 8, 2024:
I think you are actually right about this one. Anyway, I have one suggestion: replace Enumeration with an ADT as shown below. Using a sealed trait and case classes for ResultValueType is more idiomatic Scala and provides more flexibility. In this setup, ResultValueType is a sealed trait, which means it can only be extended in the same file. StringResult, LongResult, BigDecimalResult and DoubleResult are case classes that extend ResultValueType. This limits the possible types of ResultValueType to these four case classes.

This approach provides more flexibility because you can add methods and fields to the case classes if needed. It also provides more type safety because the compiler can check that you've handled all possible cases when you use pattern matching.

sealed trait ResultValueType
case class StringResult(value: String) extends ResultValueType
case class LongResult(value: Long) extends ResultValueType
case class BigDecimalResult(value: BigDecimal) extends ResultValueType
case class DoubleResult(value: Double) extends ResultValueType

trait MeasureResult {
  val resultValue: ResultValueType
}

lsulak (Collaborator, Author):
I don't know. I know that this is a common Scala pattern, but the original solution with the Enumeration is extremely small, and really easy to refactor and extend if needed:

  object ResultValueType extends Enumeration {
    type ResultValueType = Value
    val String, Long, BigDecimal, Double = Value
  }

salamonpavel (Collaborator):
I leave it up to you to consider these options and decide. My personal preference would be the idiomatic solution with a sealed trait and case classes. The reasoning was mentioned above, plus you wouldn't have to have a separate field just to keep track of the type.

val resultType: ResultValueType.ResultValueType
}

object MeasureResult {
private final case class MeasureResultWithType[T](resultValue: T, resultType: ResultValueType.ResultValueType)
extends MeasureResult

/**
* When the Atum Agent itself performs the measurements, using Spark, then in some cases some adjustments are
* needed - thus we are converting the results to strings always - but we need to keep the information about
* the actual type as well.
*
* These adjustments are needed to be performed - to avoid some floating point issues
* (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more).
*/
def apply(resultValue: String, resultType: ResultValueType.ResultValueType): MeasureResult = {
MeasureResultWithType[String](resultValue, resultType)
}

/**
* When the application/user of Atum Agent provides actual results by himself, the type is precise and we don't need
* to do any adjustments.
*/
def apply(resultValue: Any): MeasureResult = {
resultValue match {

case l: Long =>
MeasureResultWithType[Long](l, ResultValueType.Long)
case d: Double =>
MeasureResultWithType[Double](d, ResultValueType.Double)
case bd: BigDecimal =>
MeasureResultWithType[BigDecimal](bd, ResultValueType.BigDecimal)
case s: String =>
MeasureResultWithType[String](s, ResultValueType.String)

case unsupportedType =>
val className = unsupportedType.getClass.getSimpleName
throw MeasurementException(
s"Unsupported type of measurement: $className for provided result: $resultValue")
}
}
}
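The two `apply` overloads above distinguish agent-computed results (always stringified, with the type kept alongside) from user-provided results (type inferred from the value). The runtime-type dispatch of the second overload can be sketched stand-alone; the `Sketch`-suffixed names are invented here and are not the agent's actual code:

```scala
// Sketch of the supported result types, mirroring the ResultValueType Enumeration.
object ResultValueTypeSketch extends Enumeration {
  val String, Long, BigDecimal, Double = Value
}

final case class TypedResultSketch[T](resultValue: T, resultType: ResultValueTypeSketch.Value)

object TypedResultSketch {
  // Mirrors MeasureResult.apply(resultValue: Any): infer the declared result
  // type from the runtime class; anything outside the four cases is rejected.
  def from(resultValue: Any): TypedResultSketch[_] = resultValue match {
    case l: Long        => TypedResultSketch(l, ResultValueTypeSketch.Long)
    case d: Double      => TypedResultSketch(d, ResultValueTypeSketch.Double)
    case bd: BigDecimal => TypedResultSketch(bd, ResultValueTypeSketch.BigDecimal)
    case s: String      => TypedResultSketch(s, ResultValueTypeSketch.String)
    case other =>
      throw new IllegalArgumentException(
        s"Unsupported type of measurement: ${other.getClass.getName}")
  }
}
```

Note that a plain `Int` falls into the error branch: only the four listed types are matched, consistent with the pattern match in the companion object.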
@@ -16,65 +16,4 @@

package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.exception.MeasurementProvidedException
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

trait Measurement {
val measure: Measure
val resultValue: Any
val resultType: ResultValueType.ResultValueType
}

object Measurement {

/**
* When the application/user of Atum Agent provides actual results by himself, the type is precise and we don't need
* to do any adjustments.
*/
case class MeasurementProvided[T](measure: Measure, resultValue: T, resultType: ResultValueType.ResultValueType)
extends Measurement

object MeasurementProvided {

private def handleSpecificType[T](
measure: Measure, resultValue: T, requiredType: ResultValueType.ResultValueType
): MeasurementProvided[T] = {

val actualType = measure.resultValueType
if (actualType != requiredType)
throw MeasurementProvidedException(
s"Type of a given provided measurement result and type that a given measure supports are not compatible! " +
s"Got $actualType but should be $requiredType"
)
MeasurementProvided[T](measure, resultValue, requiredType)
}

def apply[T](measure: Measure, resultValue: T): Measurement = {
resultValue match {
case l: Long =>
handleSpecificType[Long](measure, l, ResultValueType.Long)
case d: Double =>
handleSpecificType[Double](measure, d, ResultValueType.Double)
case bd: BigDecimal =>
handleSpecificType[BigDecimal](measure, bd, ResultValueType.BigDecimal)
case s: String =>
handleSpecificType[String](measure, s, ResultValueType.String)

case unsupportedType =>
val className = unsupportedType.getClass.getSimpleName
throw MeasurementProvidedException(
s"Unsupported type of measurement for measure ${measure.measureName}: $className " +
s"for provided result: $resultValue"
)
}
}
}

/**
* When the Atum Agent itself performs the measurements, using Spark, then in some cases some adjustments are
* needed - thus we are converting the results to strings always - but we need to keep the information about
* the actual type as well.
*/
case class MeasurementByAtum(measure: Measure, resultValue: String, resultType: ResultValueType.ResultValueType)
extends Measurement
}
final case class Measurement(measure: Measure, result: MeasureResult)
@@ -16,18 +16,53 @@

package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.exception.MeasurementException
import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}
import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue

private [agent] object MeasurementBuilder {

private [agent] def buildMeasurementDTO(measurement: Measurement): MeasurementDTO = {
val measureName = measurement.measure.measureName
val controlCols = Seq(measurement.measure.controlCol)
val measureDTO = MeasureDTO(measureName, controlCols)
private def validateMeasurement(measure: Measure, result: MeasureResult): Unit = {
val actualType = result.resultType
val requiredType = measure.resultValueType

if (actualType != requiredType)
throw MeasurementException(
s"Type of a given provided measurement result and type that a given measure supports are not compatible! " +
s"Got $actualType but should be $requiredType"
)
}

private def validateMeasurementUniqueness(measurements: Set[Measurement]): Unit = {
val originalMeasurementsCnt = measurements.size
val uniqueMeasuresCnt = measurements.toSeq.map(m =>
Tuple2(m.measure.measureName, m.measure.controlCol) // there can't be 2 same measures defined on the same column
).distinct.size

val areMeasuresUnique = originalMeasurementsCnt == uniqueMeasuresCnt

require(areMeasuresUnique, s"Measures must be unique, i.e. they cannot repeat! Got: ${measurements.map(_.measure)}")
}

val measureResultDTO = MeasureResultDTO(TypedValue(measurement.resultValue.toString, measurement.resultType))
private[agent] def buildMeasurementDTO(measurements: Set[Measurement]): Set[MeasurementDTO] = {
validateMeasurementUniqueness(measurements)

measurements.map(m => buildMeasurementDTO(m.measure, m.result))
}

private[agent] def buildMeasurementDTO(measurement: Measurement): MeasurementDTO = {
buildMeasurementDTO(measurement.measure, measurement.result)
}

private[agent] def buildMeasurementDTO(measure: Measure, measureResult: MeasureResult): MeasurementDTO = {
val measureName = measure.measureName
val controlCols = Seq(measure.controlCol)

validateMeasurement(measure, measureResult)

val measureDTO = MeasureDTO(measureName, controlCols)
val measureResultDTO = MeasureResultDTO(
MeasureResultDTO.TypedValue(measureResult.resultValue.toString, measureResult.resultType)
)
MeasurementDTO(measureDTO, measureResultDTO)
}
}
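The uniqueness rule in `validateMeasurementUniqueness` reduces to: no two measurements may share the same `(measureName, controlCol)` pair, regardless of their results. A simplified, self-contained sketch of that check; the field names mirror the PR, but `MeasurementSketch` is a stand-in type invented for this example:

```scala
object MeasurementUniquenessSketch {
  // Stand-in for Measurement: a measure identified by name and measured column,
  // plus its (possibly differing) result.
  final case class MeasurementSketch(measureName: String, controlCol: String, result: String)

  // Mirrors validateMeasurementUniqueness: count distinct (measureName, controlCol)
  // pairs and fail if any pair repeats within the set.
  def validateUniqueness(measurements: Set[MeasurementSketch]): Unit = {
    val uniquePairs = measurements.toSeq.map(m => (m.measureName, m.controlCol)).distinct.size
    require(
      measurements.size == uniquePairs,
      s"Measures must be unique, i.e. they cannot repeat! " +
        s"Got: ${measurements.map(m => (m.measureName, m.controlCol))}"
    )
  }
}
```

Two record counts on the same column with different results fail this check (the `Set` does not deduplicate them, since the results differ), while the same measure on two different columns passes.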
@@ -24,8 +24,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.Measure.{RecordCount, SumOfValuesOfColumn}
import za.co.absa.atum.agent.model.MeasurementBuilder
import za.co.absa.atum.agent.model.Measurement.MeasurementProvided
import za.co.absa.atum.agent.model.{Measurement, MeasurementBuilder, MeasureResult}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
@@ -106,9 +105,9 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
val atumPartitions = AtumPartitions("key" -> "value")
val atumContext: AtumContext = new AtumContext(atumPartitions, mockAgent)

val measurements = Seq(
MeasurementProvided(RecordCount("col"), 1L),
MeasurementProvided(SumOfValuesOfColumn("col"), BigDecimal(1))
val measurements = Set(
Measurement(RecordCount("col"), MeasureResult(1L)),
Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1)))
)

atumContext.createCheckpointOnProvidedData(
@@ -192,5 +191,4 @@

assert(atumContext.currentAdditionalData == expectedAdditionalData)
}

}