Skip to content

Commit

Permalink
KLL Sketch Optimisations (#171)
Browse files Browse the repository at this point in the history
Several optimisation w.r.t the new KLL sketches
  • Loading branch information
sscdotopen authored Nov 19, 2019
1 parent 0436d5a commit 9c69f00
Show file tree
Hide file tree
Showing 14 changed files with 384 additions and 32 deletions.
24 changes: 23 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.22</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.22</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<version>1.1.0-incubating</version>
<scope>test</scope>
</dependency>


</dependencies>


Expand Down Expand Up @@ -255,7 +277,7 @@
<artifactId>scoverage-maven-plugin</artifactId>
<version>1.3.0</version>
<configuration>
<scalaCompatVersion>${scala.major.version}</scalaCompatVersion>
<!--scalaCompatVersion>${scala.major.version}</scalaCompatVersion-->
<scalaVersion>${scala.version}</scalaVersion>
<!--<minimumCoverage>80</minimumCoverage>-->
<!--<failOnMinimumCoverage>true</failOnMinimumCoverage>-->
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ trait Analyzer[S <: State[_], +M <: Metric[_]] {

private[deequ] def toFailureMetric(failure: Exception): M

protected def calculateMetric(
def calculateMetric(
state: Option[S],
aggregateWith: Option[StateLoader] = None,
saveStatesWith: Option[StatePersister] = None)
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.amazon.deequ.analyzers.runners.MetricCalculationException
import org.apache.spark.sql.DeequFunctions.stateful_kll
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, Row}

import org.apache.spark.sql.functions.col

/**
* State definition for KLL Sketches.
Expand Down Expand Up @@ -84,12 +84,12 @@ case class KLLParameters(sketchSize: Int, shrinkingFactor: Double, numberOfBucke
/**
* The KLL Sketch analyzer.
* @param column the column to run the analyzer
* @param where constraint expression on the column
* //@param where constraint expression on the column
* @param kllParameters parameters of KLL Sketch
*/
case class KLLSketch(
column: String,
where: Option[String] = None,
// where: Option[String] = None,
kllParameters: Option[KLLParameters] = None)
extends ScanShareableAnalyzer[KLLState, KLLMetric] {

Expand All @@ -111,7 +111,8 @@ case class KLLSketch(
}

override def aggregationFunctions(): Seq[Column] = {
stateful_kll(conditionalSelection(column, where), sketchSize, shrinkingFactor) :: Nil
// stateful_kll(conditionalSelection(column, where), sketchSize, shrinkingFactor) :: Nil
stateful_kll(col(column), sketchSize, shrinkingFactor) :: Nil
}

override def fromAggregationResult(result: Row, offset: Int): Option[KLLState] = {
Expand Down Expand Up @@ -147,7 +148,6 @@ case class KLLSketch(
val parameters = List[Double](finalSketch.shrinkingFactor,
finalSketch.sketchSize.toDouble)
val data = finalSketch.getCompactorItems

BucketDistribution(bucketsList.toList, parameters, data)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class QuantileNonSample[T](
var sketchSize: Int,
var shrinkingFactor: Double = 0.64)
(implicit ordering: Ordering[T], ct: ClassTag[T])
extends Serializable{
extends Serializable {

/** Current Number of levels in compactors */
var curNumOfCompactors = 0
Expand Down Expand Up @@ -244,6 +244,11 @@ class QuantileNonSample[T](
* @return quantiles 1/q through (q-1)/q
*/
def quantiles(q: Int) : Array[T] = {

if (output.isEmpty) {
return Array.empty
}

val sortedItems = output.sortBy({
case (item, _) => item
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,20 @@ object AnalysisRunner {
val preconditionFailures = computePreconditionFailureMetrics(failedAnalyzers, data.schema)

/* Identify analyzers which require us to group the data */
val (groupingAnalyzers, scanningAnalyzers) =
val (groupingAnalyzers, allScanningAnalyzers) =
passedAnalyzers.partition { _.isInstanceOf[GroupingAnalyzer[State[_], Metric[_]]] }


val (kllAnalyzers, scanningAnalyzers) =
allScanningAnalyzers.partition { _.isInstanceOf[KLLSketch] }

val kllMetrics =
if (kllAnalyzers.nonEmpty) {
KLLRunner.computeKLLSketchesInExtraPass(data, kllAnalyzers, aggregateWith, saveStatesWith)
} else {
AnalyzerContext.empty
}

/* Run the analyzers which do not require grouping in a single pass over the data */
val nonGroupedMetrics =
runScanningAnalyzers(data, scanningAnalyzers, aggregateWith, saveStatesWith)
Expand Down Expand Up @@ -179,7 +190,7 @@ object AnalysisRunner {
}

val resultingAnalyzerContext = resultsComputedPreviously ++ preconditionFailures ++
nonGroupedMetrics ++ groupedMetrics
nonGroupedMetrics ++ groupedMetrics ++ kllMetrics

saveOrAppendResultsIfNecessary(
resultingAnalyzerContext,
Expand Down
179 changes: 179 additions & 0 deletions src/main/scala/com/amazon/deequ/analyzers/runners/KLLRunner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.analyzers.runners

import com.amazon.deequ.analyzers.{Analyzer, KLLParameters, KLLSketch, KLLState, QuantileNonSample, State, StateLoader, StatePersister}
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.types.{ByteType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructType}
import org.apache.spark.sql.{DataFrame, Row}

@SerialVersionUID(1L)
abstract class UntypedQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
extends Serializable {

var min: Double = Int.MaxValue.toDouble
var max: Double = Int.MinValue.toDouble
var sketch: QuantileNonSample[Double] = new QuantileNonSample[Double](sketchSize, shrinkingFactor)

def itemAsDouble(item: Any): Double

def updateUntyped(item: Any): Unit = {
this.min = math.min(this.min, itemAsDouble(item))
this.max = math.max(this.max, itemAsDouble(item))
sketch.update(itemAsDouble(item))
}

def mergeUntyped(other: UntypedQuantileNonSample): Unit = {
this.min = math.min(this.min, other.min)
this.max = math.max(this.max, other.max)
this.sketch = this.sketch.merge(other.sketch)
}

def asKLLState(): KLLState = {
KLLState(sketch, max, min)
}
}

@SerialVersionUID(1L)
class LongQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
override def itemAsDouble(item: Any): Double = item.asInstanceOf[Long].toDouble
}

@SerialVersionUID(1L)
class IntQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
override def itemAsDouble(item: Any): Double = item.asInstanceOf[Int].toDouble
}

@SerialVersionUID(1L)
class ShortQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
override def itemAsDouble(item: Any): Double = item.asInstanceOf[Short].toDouble
}

@SerialVersionUID(1L)
class ByteQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
override def itemAsDouble(item: Any): Double = item.asInstanceOf[Byte].toDouble
}

@SerialVersionUID(1L)
class DoubleQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
override def itemAsDouble(item: Any): Double = item.asInstanceOf[Double]
}

@SerialVersionUID(1L)
class FloatQuantileNonSample(sketchSize: Int, shrinkingFactor: Double)
extends UntypedQuantileNonSample(sketchSize, shrinkingFactor) with Serializable {
override def itemAsDouble(item: Any): Double = item.asInstanceOf[Float].toDouble
}

object KLLRunner {

def computeKLLSketchesInExtraPass(
data: DataFrame,
analyzers: Seq[Analyzer[State[_], Metric[_]]],
aggregateWith: Option[StateLoader] = None,
saveStatesTo: Option[StatePersister] = None)
: AnalyzerContext = {

val kllAnalyzers = analyzers.map { _.asInstanceOf[KLLSketch] }

val columnsAndParameters = kllAnalyzers
.map { analyzer => (analyzer.column, analyzer.kllParameters) }
.toMap

val sketching = sketchPartitions(columnsAndParameters, data.schema)_

val sketchPerColumn =
data.rdd
.mapPartitions(sketching, preservesPartitioning = true)
.treeReduce { case (columnAndSketchesA, columnAndSketchesB) =>
columnAndSketchesA.map { case (column, sketch) =>
sketch.mergeUntyped(columnAndSketchesB(column))
column -> sketch
}
}

val metricsByAnalyzer = kllAnalyzers.map { analyzer =>
val kllState = sketchPerColumn(analyzer.column).asKLLState()
val metric = analyzer.calculateMetric(Some(kllState), aggregateWith, saveStatesTo)

analyzer -> metric
}

AnalyzerContext(metricsByAnalyzer.toMap[Analyzer[_, Metric[_]], Metric[_]])
}

private[this] def emptySketches(
columnsAndParameters: Map[String, Option[KLLParameters]],
schema: StructType): Map[String, UntypedQuantileNonSample] = {

columnsAndParameters.map { case (column, parameters) =>

val (sketchSize, shrinkingFactor) = parameters match {
case Some(kllParameters) => (kllParameters.sketchSize, kllParameters.shrinkingFactor)
case _ => (KLLSketch.DEFAULT_SKETCH_SIZE, KLLSketch.DEFAULT_SHRINKING_FACTOR)
}

val sketch: UntypedQuantileNonSample = schema(column).dataType match {
case DoubleType => new DoubleQuantileNonSample(sketchSize, shrinkingFactor)
case FloatType => new FloatQuantileNonSample(sketchSize, shrinkingFactor)
case ByteType => new ByteQuantileNonSample(sketchSize, shrinkingFactor)
case ShortType => new ShortQuantileNonSample(sketchSize, shrinkingFactor)
case IntegerType => new IntQuantileNonSample(sketchSize, shrinkingFactor)
case LongType => new LongQuantileNonSample(sketchSize, shrinkingFactor)
// TODO at the moment, we will throw exceptions for Decimals
case _ => throw new IllegalArgumentException(s"Cannot handle ${schema(column).dataType}")
}

column -> sketch
}
}

private[this] def sketchPartitions(
columnsAndParameters: Map[String, Option[KLLParameters]],
schema: StructType)(rows: Iterator[Row])
: Iterator[Map[String, UntypedQuantileNonSample]] = {

val columnsAndSketches = emptySketches(columnsAndParameters, schema)

val namesToIndexes = schema.fields
.map { _.name }
.zipWithIndex
.toMap

// Include the index to avoid a lookup per row
val indexesAndSketches = columnsAndSketches.map { case (column, sketch) =>
(namesToIndexes(column), sketch )
}

while (rows.hasNext) {
val row = rows.next()
indexesAndSketches.foreach { case (index, sketch) =>
if (!row.isNullAt(index)) {
sketch.updateUntyped(row.get(index))
}
}
}

Iterator.single(columnsAndSketches)
}

}
4 changes: 2 additions & 2 deletions src/main/scala/com/amazon/deequ/examples/KLLExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ private[examples] object KLLExample extends App {
output += s"\t\t\tbuckets: [\n"
val kllMetric = numProfile.kll.get
kllMetric.buckets.foreach { item =>
output += s"\t\t\t\t{\n \t\t\t\t\tlow_value: ${item.low_value} \n " +
s"\t\t\t\t\thigh_value: ${item.high_value} \n " +
output += s"\t\t\t\t{\n \t\t\t\t\tlow_value: ${item.lowValue} \n " +
s"\t\t\t\t\thigh_value: ${item.highValue} \n " +
s"\t\t\t\t\tcount: ${item.count}\n\t\t\t\t}\n"
}
output += s"\t\t\t],\n"
Expand Down
20 changes: 17 additions & 3 deletions src/main/scala/com/amazon/deequ/metrics/KLLMetric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,29 @@

package com.amazon.deequ.metrics

import com.amazon.deequ.analyzers.QuantileNonSample

import scala.util.{Failure, Success, Try}
import scala.util.control.Breaks._

case class BucketValue(low_value: Double, high_value: Double, count: Long)
case class BucketValue(lowValue: Double, highValue: Double, count: Long)

case class BucketDistribution(
buckets: List[BucketValue],
parameters: List[Double],
data: Array[Array[Double]]) {

def computePercentiles(): Array[Double] = {

val sketchSize = parameters(0).toInt
val shrinkingFactor = parameters(1)

val quantileNonSample = new QuantileNonSample[Double](sketchSize, shrinkingFactor)
quantileNonSample.reconstruct(sketchSize, shrinkingFactor, data)

quantileNonSample.quantiles(100)
}

/**
* Get relevant bucketValue with index of bucket.
* @param key index of bucket
Expand Down Expand Up @@ -76,6 +89,7 @@ case class BucketDistribution(
}
}

// TODO not sure if thats correct...
override def hashCode(): Int = super.hashCode()
}

Expand All @@ -94,8 +108,8 @@ case class KLLMetric(column: String, value: Try[BucketDistribution])

val details = distribution.buckets
.flatMap { distValue =>
DoubleMetric(entity, s"$name.low", instance, Success(distValue.low_value)) ::
DoubleMetric(entity, s"$name.high", instance, Success(distValue.high_value)) ::
DoubleMetric(entity, s"$name.low", instance, Success(distValue.lowValue)) ::
DoubleMetric(entity, s"$name.high", instance, Success(distValue.highValue)) ::
DoubleMetric(entity, s"$name.count", instance, Success(distValue.count)) :: Nil
}
numberOfBuckets ++ details
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/amazon/deequ/profiles/ColumnProfile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ object ColumnProfiles {
val tmp = new JsonArray()
kllSketch.buckets.foreach{bucket =>
val entry = new JsonObject()
entry.addProperty("low_value", bucket.low_value)
entry.addProperty("high_value", bucket.high_value)
entry.addProperty("low_value", bucket.lowValue)
entry.addProperty("high_value", bucket.highValue)
entry.addProperty("count", bucket.count)
tmp.add(entry)
}
Expand Down
Loading

0 comments on commit 9c69f00

Please sign in to comment.