diff --git a/pom.xml b/pom.xml
index e94cd7f02..e87af8813 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,28 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <version>1.22</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <version>1.22</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.datasketches</groupId>
+            <artifactId>datasketches-java</artifactId>
+            <version>1.1.0-incubating</version>
+            <scope>test</scope>
+        </dependency>
+
@@ -255,7 +277,7 @@
                 <artifactId>scoverage-maven-plugin</artifactId>
                 <version>1.3.0</version>
                 <configuration>
-                    <scalaVersion>${scala.major.version}</scalaVersion>
+                    <scalaVersion>${scala.version}</scalaVersion>
                 </configuration>
             </plugin>
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
index aa976d44f..aa9db3fac 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -104,7 +104,7 @@ trait Analyzer[S <: State[_], +M <: Metric[_]] {
 
   private[deequ] def toFailureMetric(failure: Exception): M
 
-  protected def calculateMetric(
+  def calculateMetric(
       state: Option[S],
      aggregateWith: Option[StateLoader] = None,
      saveStatesWith: Option[StatePersister] = None)
diff --git a/src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala b/src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala
index 8616503c8..502d59874 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala
@@ -31,7 +31,7 @@ import com.amazon.deequ.analyzers.runners.MetricCalculationException
 import org.apache.spark.sql.DeequFunctions.stateful_kll
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Column, Row}
-
+import org.apache.spark.sql.functions.col
 
 /**
  * State definition for KLL Sketches.
@@ -84,12 +84,12 @@ case class KLLParameters(sketchSize: Int, shrinkingFactor: Double, numberOfBucke
 /**
  * The KLL Sketch analyzer.
  * @param column the column to run the analyzer
- * @param where constraint expression on the column
+ * //@param where constraint expression on the column
  * @param kllParameters parameters of KLL Sketch
  */
 case class KLLSketch(
     column: String,
-    where: Option[String] = None,
+//    where: Option[String] = None,
     kllParameters: Option[KLLParameters] = None)
   extends ScanShareableAnalyzer[KLLState, KLLMetric] {
 
@@ -111,7 +111,8 @@ case class KLLSketch(
   }
 
   override def aggregationFunctions(): Seq[Column] = {
-    stateful_kll(conditionalSelection(column, where), sketchSize, shrinkingFactor) :: Nil
+    // stateful_kll(conditionalSelection(column, where), sketchSize, shrinkingFactor) :: Nil
+    stateful_kll(col(column), sketchSize, shrinkingFactor) :: Nil
   }
 
   override def fromAggregationResult(result: Row, offset: Int): Option[KLLState] = {
@@ -147,7 +148,6 @@ case class KLLSketch(
     val parameters = List[Double](finalSketch.shrinkingFactor, finalSketch.sketchSize.toDouble)
     val data = finalSketch.getCompactorItems
-
     BucketDistribution(bucketsList.toList, parameters, data)
   }
 
diff --git a/src/main/scala/com/amazon/deequ/analyzers/QuantileNonSample.scala b/src/main/scala/com/amazon/deequ/analyzers/QuantileNonSample.scala
index b3e13615b..a39b2d672 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/QuantileNonSample.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/QuantileNonSample.scala
@@ -26,7 +26,7 @@ class QuantileNonSample[T](
     var sketchSize: Int,
     var shrinkingFactor: Double = 0.64)
   (implicit ordering: Ordering[T], ct: ClassTag[T])
-  extends Serializable{
+  extends Serializable {
 
   /** Current Number of levels in compactors */
   var curNumOfCompactors = 0
@@ -244,6 +244,11 @@ class QuantileNonSample[T](
    * @return quantiles 1/q through (q-1)/q
    */
  def quantiles(q: Int) : Array[T] = {
+
+    if (output.isEmpty) {
+      return Array.empty
+    }
+
     val sortedItems = output.sortBy({ case (item, _) => item })
diff --git a/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala b/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala
index c285b3852..5f1efec80 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala
@@ -145,9 +145,20 @@ object AnalysisRunner {
     val preconditionFailures = computePreconditionFailureMetrics(failedAnalyzers, data.schema)
 
     /* Identify analyzers which require us to group the data */
-    val (groupingAnalyzers, scanningAnalyzers) =
+    val (groupingAnalyzers, allScanningAnalyzers) =
       passedAnalyzers.partition { _.isInstanceOf[GroupingAnalyzer[State[_], Metric[_]]] }
 
+    val (kllAnalyzers, scanningAnalyzers) =
+      allScanningAnalyzers.partition { _.isInstanceOf[KLLSketch] }
+
+    val kllMetrics =
+      if (kllAnalyzers.nonEmpty) {
+        KLLRunner.computeKLLSketchesInExtraPass(data, kllAnalyzers, aggregateWith, saveStatesWith)
+      } else {
+        AnalyzerContext.empty
+      }
+
     /* Run the analyzers which do not require grouping in a single pass over the data */
     val nonGroupedMetrics =
       runScanningAnalyzers(data, scanningAnalyzers, aggregateWith, saveStatesWith)
@@ -179,7 +190,7 @@ object AnalysisRunner {
     }
 
     val resultingAnalyzerContext = resultsComputedPreviously ++ preconditionFailures ++
-      nonGroupedMetrics ++ groupedMetrics
+      nonGroupedMetrics ++ groupedMetrics ++ kllMetrics
 
     saveOrAppendResultsIfNecessary(
       resultingAnalyzerContext,
diff --git a/src/main/scala/com/amazon/deequ/analyzers/runners/KLLRunner.scala b/src/main/scala/com/amazon/deequ/analyzers/runners/KLLRunner.scala
new file mode 100644
index 000000000..64a008009
--- /dev/null
+++ b/src/main/scala/com/amazon/deequ/analyzers/runners/KLLRunner.scala
@@ -0,0 +1,179 @@
+/**
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.analyzers.runners
+
+import com.amazon.deequ.analyzers.{Analyzer, KLLParameters, KLLSketch, KLLState, QuantileNonSample, State, StateLoader, StatePersister}
+import com.amazon.deequ.metrics.Metric
+import org.apache.spark.sql.types.{ByteType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructType}
+import org.apache.spark.sql.{DataFrame, Row}
+
+@SerialVersionUID(1L)
+abstract class UntypedQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
+  extends Serializable {
+
+  var min: Double = Int.MaxValue.toDouble
+  var max: Double = Int.MinValue.toDouble
+  var sketch: QuantileNonSample[Double] = new QuantileNonSample[Double](sketchSize, shrinkingFactor)
+
+  def itemAsDouble(item: Any): Double
+
+  def updateUntyped(item: Any): Unit = {
+    this.min = math.min(this.min, itemAsDouble(item))
+    this.max = math.max(this.max, itemAsDouble(item))
+    sketch.update(itemAsDouble(item))
+  }
+
+  def mergeUntyped(other: UntypedQuantileNonSample): Unit = {
+    this.min = math.min(this.min, other.min)
+    this.max = math.max(this.max, other.max)
+    this.sketch = this.sketch.merge(other.sketch)
+  }
+
+  def asKLLState(): KLLState = {
+    KLLState(sketch, max, min)
+  }
+}
+
+@SerialVersionUID(1L)
+class LongQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
+  extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
+  override def itemAsDouble(item: Any): Double = item.asInstanceOf[Long].toDouble
+}
+
+@SerialVersionUID(1L)
+class IntQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
+  extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
+  override def itemAsDouble(item: Any): Double = item.asInstanceOf[Int].toDouble
+}
+
+@SerialVersionUID(1L)
+class ShortQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
+  extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
+  override def itemAsDouble(item: Any): Double = item.asInstanceOf[Short].toDouble
+}
+
+@SerialVersionUID(1L)
+class ByteQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
+  extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
+  override def itemAsDouble(item: Any): Double = item.asInstanceOf[Byte].toDouble
+}
+
+@SerialVersionUID(1L)
+class DoubleQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
+  extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
+  override def itemAsDouble(item: Any): Double = item.asInstanceOf[Double]
+}
+
+@SerialVersionUID(1L)
+class FloatQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
+  extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
+  override def itemAsDouble(item: Any): Double = item.asInstanceOf[Float].toDouble
+}
+
+object KLLRunner {
+
+  def computeKLLSketchesInExtraPass(
+      data: DataFrame,
+      analyzers: Seq[Analyzer[State[_], Metric[_]]],
+      aggregateWith: Option[StateLoader] = None,
+      saveStatesTo: Option[StatePersister] = None)
+    : AnalyzerContext = {
+
+    val kllAnalyzers = analyzers.map { _.asInstanceOf[KLLSketch] }
+
+    val columnsAndParameters = kllAnalyzers
+      .map { analyzer => (analyzer.column, analyzer.kllParameters) }
+      .toMap
+
+    val sketching = sketchPartitions(columnsAndParameters, data.schema)_
+
+    val sketchPerColumn =
+      data.rdd
+        .mapPartitions(sketching, preservesPartitioning = true)
+        .treeReduce { case (columnAndSketchesA, columnAndSketchesB) =>
+          columnAndSketchesA.map { case (column, sketch) =>
+            sketch.mergeUntyped(columnAndSketchesB(column))
+            column -> sketch
+          }
+        }
+
+    val metricsByAnalyzer = kllAnalyzers.map { analyzer =>
+      val kllState = sketchPerColumn(analyzer.column).asKLLState()
+      val metric = analyzer.calculateMetric(Some(kllState), aggregateWith, saveStatesTo)
+
+      analyzer -> metric
+    }
+
+    AnalyzerContext(metricsByAnalyzer.toMap[Analyzer[_, Metric[_]], Metric[_]])
+  }
+
+  private[this] def emptySketches(
+      columnsAndParameters: Map[String, Option[KLLParameters]],
+      schema: StructType): Map[String, UntypedQuantileNonSample] = {
+
+    columnsAndParameters.map { case (column, parameters) =>
+
+      val (sketchSize, shrinkingFactor) = parameters match {
+        case Some(kllParameters) => (kllParameters.sketchSize, kllParameters.shrinkingFactor)
+        case _ => (KLLSketch.DEFAULT_SKETCH_SIZE, KLLSketch.DEFAULT_SHRINKING_FACTOR)
+      }
+
+      val sketch: UntypedQuantileNonSample = schema(column).dataType match {
+        case DoubleType => new DoubleQuantileNonSample(sketchSize, shrinkingFactor)
+        case FloatType => new FloatQuantileNonSample(sketchSize, shrinkingFactor)
+        case ByteType => new ByteQuantileNonSample(sketchSize, shrinkingFactor)
+        case ShortType => new ShortQuantileNonSample(sketchSize, shrinkingFactor)
+        case IntegerType => new IntQuantileNonSample(sketchSize, shrinkingFactor)
+        case LongType => new LongQuantileNonSample(sketchSize, shrinkingFactor)
+        // TODO at the moment, we will throw exceptions for Decimals
+        case _ => throw new IllegalArgumentException(s"Cannot handle ${schema(column).dataType}")
+      }
+
+      column -> sketch
+    }
+  }
+
+  private[this] def sketchPartitions(
+      columnsAndParameters: Map[String, Option[KLLParameters]],
+      schema: StructType)(rows: Iterator[Row])
+    : Iterator[Map[String, UntypedQuantileNonSample]] = {
+
+    val columnsAndSketches = emptySketches(columnsAndParameters, schema)
+
+    val namesToIndexes = schema.fields
+      .map { _.name }
+      .zipWithIndex
+      .toMap
+
+    // Include the index to avoid a lookup per row
+    val indexesAndSketches = columnsAndSketches.map { case (column, sketch) =>
+      (namesToIndexes(column), sketch)
+    }
+
+    while (rows.hasNext) {
+      val row = rows.next()
+      indexesAndSketches.foreach { case (index, sketch) =>
+        if (!row.isNullAt(index)) {
+          sketch.updateUntyped(row.get(index))
+        }
+      }
+    }
+
+    Iterator.single(columnsAndSketches)
+  }
+
+}
diff --git a/src/main/scala/com/amazon/deequ/examples/KLLExample.scala b/src/main/scala/com/amazon/deequ/examples/KLLExample.scala
index 412a91db0..3a2b897eb 100644
--- a/src/main/scala/com/amazon/deequ/examples/KLLExample.scala
+++ b/src/main/scala/com/amazon/deequ/examples/KLLExample.scala
@@ -55,8 +55,8 @@ private[examples] object KLLExample extends App {
     output += s"\t\t\tbuckets: [\n"
     val kllMetric = numProfile.kll.get
     kllMetric.buckets.foreach { item =>
-      output += s"\t\t\t\t{\n \t\t\t\t\tlow_value: ${item.low_value} \n " +
-        s"\t\t\t\t\thigh_value: ${item.high_value} \n " +
+      output += s"\t\t\t\t{\n \t\t\t\t\tlow_value: ${item.lowValue} \n " +
+        s"\t\t\t\t\thigh_value: ${item.highValue} \n " +
         s"\t\t\t\t\tcount: ${item.count}\n\t\t\t\t}\n"
     }
     output += s"\t\t\t],\n"
diff --git a/src/main/scala/com/amazon/deequ/metrics/KLLMetric.scala b/src/main/scala/com/amazon/deequ/metrics/KLLMetric.scala
index b86f97ca1..868eca6a3 100644
--- a/src/main/scala/com/amazon/deequ/metrics/KLLMetric.scala
+++ b/src/main/scala/com/amazon/deequ/metrics/KLLMetric.scala
@@ -16,16 +16,29 @@
 
 package com.amazon.deequ.metrics
 
+import com.amazon.deequ.analyzers.QuantileNonSample
+
 import scala.util.{Failure, Success, Try}
 import scala.util.control.Breaks._
 
-case class BucketValue(low_value: Double, high_value: Double, count: Long)
+case class BucketValue(lowValue: Double, highValue: Double, count: Long)
 
 case class BucketDistribution(
     buckets: List[BucketValue],
     parameters: List[Double],
     data: Array[Array[Double]]) {
 
+  def computePercentiles(): Array[Double] = {
+
+    val sketchSize = parameters(0).toInt
+    val shrinkingFactor = parameters(1)
+
+    val quantileNonSample = new QuantileNonSample[Double](sketchSize, shrinkingFactor)
+    quantileNonSample.reconstruct(sketchSize, shrinkingFactor, data)
+
+    quantileNonSample.quantiles(100)
+  }
+
   /**
    * Get relevant bucketValue with index of bucket.
    * @param key index of bucket
@@ -76,6 +89,7 @@ case class BucketDistribution(
     }
   }
 
+  // TODO not sure if that's correct...
   override def hashCode(): Int = super.hashCode()
 }
@@ -94,8 +108,8 @@ case class KLLMetric(column: String, value: Try[BucketDistribution])
     val details = distribution.buckets
       .flatMap { distValue =>
-        DoubleMetric(entity, s"$name.low", instance, Success(distValue.low_value)) ::
-        DoubleMetric(entity, s"$name.high", instance, Success(distValue.high_value)) ::
+        DoubleMetric(entity, s"$name.low", instance, Success(distValue.lowValue)) ::
+        DoubleMetric(entity, s"$name.high", instance, Success(distValue.highValue)) ::
           DoubleMetric(entity, s"$name.count", instance, Success(distValue.count)) :: Nil
       }
 
     numberOfBuckets ++ details
diff --git a/src/main/scala/com/amazon/deequ/profiles/ColumnProfile.scala b/src/main/scala/com/amazon/deequ/profiles/ColumnProfile.scala
index 1847d7cbb..7c36d4093 100644
--- a/src/main/scala/com/amazon/deequ/profiles/ColumnProfile.scala
+++ b/src/main/scala/com/amazon/deequ/profiles/ColumnProfile.scala
@@ -130,8 +130,8 @@ object ColumnProfiles {
       val tmp = new JsonArray()
       kllSketch.buckets.foreach{bucket =>
         val entry = new JsonObject()
-        entry.addProperty("low_value", bucket.low_value)
-        entry.addProperty("high_value", bucket.high_value)
+        entry.addProperty("low_value", bucket.lowValue)
+        entry.addProperty("high_value", bucket.highValue)
         entry.addProperty("count", bucket.count)
         tmp.add(entry)
       }
diff --git a/src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala b/src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala
index cfe7476bd..2e6fbe95a 100644
--- a/src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala
+++ b/src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala
@@ -235,15 +235,8 @@ object ColumnProfiler {
     relevantColumnNames
       .filter { name => Set(Integral, Fractional).contains(genericStatistics.typeOf(name)) }
       .flatMap { name =>
-
-        val percentiles = (1 to 100).map {
-          _.toDouble / 100
-        }
-
         Seq(Minimum(name), Maximum(name), Mean(name), StandardDeviation(name),
-          Sum(name), KLLSketch(name, kllParameters = kllParameters),
-          ApproxQuantiles(name, percentiles))
+          Sum(name), KLLSketch(name, kllParameters = kllParameters))
       }
   }
@@ -485,21 +478,30 @@ object ColumnProfiler {
     val kll = results.metricMap
       .collect { case (analyzer: KLLSketch, metric: KLLMetric) if metric.value.isSuccess =>
-        analyzer.column -> metric.value.get
+        metric.value match {
+          case Success(bucketDistribution) =>
+            Some(analyzer.column -> bucketDistribution)
+          case _ => None
+        }
       }
+      .flatten
+      .toMap
 
     val approxPercentiles = results.metricMap
-      .collect { case (analyzer: ApproxQuantiles, metric: KeyedDoubleMetric) =>
+      .collect { case (analyzer: KLLSketch, metric: KLLMetric) =>
         metric.value match {
-          case Success(metricValue) =>
-            val percentiles = metricValue.values.toSeq.sorted
-            Some(analyzer.column -> percentiles)
+          case Success(bucketDistribution) =>
+
+            val percentiles = bucketDistribution.computePercentiles()
+
+            Some(analyzer.column -> percentiles.toSeq.sorted)
           case _ => None
         }
       }
       .flatten
       .toMap
 
+
     NumericColumnStatistics(means, stdDevs, minima, maxima, sums, kll, approxPercentiles)
   }
diff --git a/src/test/scala/com/amazon/deequ/KLL/KLLBenchmark.java b/src/test/scala/com/amazon/deequ/KLL/KLLBenchmark.java
new file mode 100644
index 000000000..44773c36d
--- /dev/null
+++ b/src/test/scala/com/amazon/deequ/KLL/KLLBenchmark.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.KLL;
+
+import com.amazon.deequ.analyzers.QuantileNonSample;
+import org.apache.datasketches.kll.KllFloatsSketch;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
+public class KLLBenchmark {
+
+    private static final int N = 10_000_000;
+
+    private static float[] DATA_FOR_TESTING = createData();
+
+    public static void main(String[] args) throws RunnerException {
+
+        Options opt = new OptionsBuilder()
+                .include(KLLBenchmark.class.getSimpleName())
+                .forks(1)
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    private static float[] createData() {
+        Random prng = new Random();
+        float[] numbers = new float[N];
+        for (int i = 0; i < N; i++) {
+            numbers[i] = prng.nextFloat();
+        }
+        return numbers;
+    }
+
+    @Benchmark
+    public void sumArray(Blackhole bh) {
+        float sum = 0.0f;
+        for (int i = 0; i < N; i++) {
+            sum += DATA_FOR_TESTING[i];
+        }
+        bh.consume(sum);
+    }
+
+    @Benchmark
+    public void sketchArrayWithKLL(Blackhole bh) {
+        QuantileNonSample sketch = KLLBenchmarkHelper.floatSketch();
+        for (int i = 0; i < N; i++) {
+            sketch.update(DATA_FOR_TESTING[i]);
+        }
+        bh.consume(sketch);
+    }
+
+    @Benchmark
+    public void sketchArrayWithJavaSketchesKLL(Blackhole bh) {
+        KllFloatsSketch sketch = new KllFloatsSketch();
+        for (int i = 0; i < N; i++) {
+            sketch.update(DATA_FOR_TESTING[i]);
+        }
+        bh.consume(sketch);
+    }
+}
diff --git a/src/test/scala/com/amazon/deequ/KLL/KLLBenchmarkHelper.scala b/src/test/scala/com/amazon/deequ/KLL/KLLBenchmarkHelper.scala
new file mode 100644
index 000000000..554ab1648
--- /dev/null
+++ b/src/test/scala/com/amazon/deequ/KLL/KLLBenchmarkHelper.scala
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.KLL
+
+import com.amazon.deequ.analyzers.{KLLSketch, QuantileNonSample}
+
+object KLLBenchmarkHelper {
+
+  def floatSketch(): QuantileNonSample[java.lang.Float] = {
+    new QuantileNonSample[java.lang.Float](KLLSketch.DEFAULT_SKETCH_SIZE,
+      KLLSketch.DEFAULT_SHRINKING_FACTOR)
+  }
+
+}
diff --git a/src/test/scala/com/amazon/deequ/profiles/ColumnProfilerRunnerTest.scala b/src/test/scala/com/amazon/deequ/profiles/ColumnProfilerRunnerTest.scala
index ab4e610b5..ea8e01cee 100644
--- a/src/test/scala/com/amazon/deequ/profiles/ColumnProfilerRunnerTest.scala
+++ b/src/test/scala/com/amazon/deequ/profiles/ColumnProfilerRunnerTest.scala
@@ -60,7 +60,8 @@ class ColumnProfilerRunnerTest extends WordSpec with Matchers with SparkContextS
       (results, stat.jobCount)
     }
 
-    assert(jobNumberAllCalculations == 3)
+    // assert(jobNumberAllCalculations == 3)
+    assert(jobNumberAllCalculations == 4)
     assert(jobNumberReusing == 0)
     assertConstraintSuggestionResultsEquals(separateResults, resultsReusingMetrics)
   }
diff --git a/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunnerTest.scala b/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunnerTest.scala
index 5171bd5b4..e6768a506 100644
--- a/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunnerTest.scala
+++ b/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunnerTest.scala
@@ -78,7 +78,8 @@ class ConstraintSuggestionRunnerTest extends WordSpec with Matchers with SparkCo
       (results, stat.jobCount)
     }
 
-    assert(jobNumberAllCalculations == 3)
+    // assert(jobNumberAllCalculations == 3)
+    assert(jobNumberAllCalculations == 4)
     assert(jobNumberReusing == 0)
     assertConstraintSuggestionResultsEquals(separateResults, resultsReusingMetrics)
   }