diff --git a/lib/stream-2.5.0-SNAPSHOT.jar b/lib/stream-2.5.0-SNAPSHOT.jar new file mode 100644 index 00000000..5cc2425a Binary files /dev/null and b/lib/stream-2.5.0-SNAPSHOT.jar differ diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala index fc989885..a50b69a9 100644 --- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala +++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala @@ -87,6 +87,8 @@ class MemoryStoreSinkOperator extends TerminalOperator { } if (builder != null) { + // Generate cardinality estimate now that all rows for current part are serialized + builder.asInstanceOf[TablePartitionBuilder].stats.estimateCardinality() statsAcc += Tuple2(partitionIndex, builder.asInstanceOf[TablePartitionBuilder].stats) Iterator(builder.asInstanceOf[TablePartitionBuilder].build) } else { diff --git a/src/main/scala/shark/memstore2/TablePartitionStats.scala b/src/main/scala/shark/memstore2/TablePartitionStats.scala index c2f20cbc..3d5bbd32 100644 --- a/src/main/scala/shark/memstore2/TablePartitionStats.scala +++ b/src/main/scala/shark/memstore2/TablePartitionStats.scala @@ -32,4 +32,8 @@ class TablePartitionStats(val stats: Array[ColumnStats[_]], val numRows: Long) " column " + index + " " + { if (column != null) column.toString else "no column statistics" } }.mkString("\n") + + def estimateCardinality() { + stats.foreach(_.estimateCardinality()) + } } diff --git a/src/main/scala/shark/memstore2/column/ColumnStats.scala b/src/main/scala/shark/memstore2/column/ColumnStats.scala index 31270fa3..695bab46 100644 --- a/src/main/scala/shark/memstore2/column/ColumnStats.scala +++ b/src/main/scala/shark/memstore2/column/ColumnStats.scala @@ -21,6 +21,7 @@ import java.io.ObjectInput import java.io.ObjectOutput import java.io.Externalizable import java.sql.Timestamp +import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.hadoop.io.Text @@ -35,18 +36,29 @@ sealed trait ColumnStats[@specialized(Boolean, Byte, Short, Int, Long, Float, Do protected def _min: T protected def _max: T - def min: T = _min def max: T = _max override def toString = "[" + min + ", " + max + "]" - + def :><(l: Any, r: Any): Boolean = (this :>= l) && (this :<= r) def :<=(v: Any): Boolean = (this := v) || (this :< v) def :>=(v: Any): Boolean = (this := v) || (this :> v) def :=(v: Any): Boolean def :>(v: Any): Boolean def :<(v: Any): Boolean + + + // Use Streamlib's HyperLogLog and 32-bit Murmurhash implemenatations for + // cardinality estimation, with 16 bits as the basis for this HLL instance. + @transient protected var _hyperLogLog: HyperLogLog = new HyperLogLog(16) + protected var _numDistinct: Long = 0 + + def numDistinct: Long = _numDistinct + + def estimateCardinality() { + _numDistinct = _hyperLogLog.cardinality() + } } @@ -58,7 +70,9 @@ object ColumnStats { class NoOpStats[T] extends ColumnStats[T] { protected var _max = null.asInstanceOf[T] protected var _min = null.asInstanceOf[T] - override def append(v: T) {} + override def append(v: T) { + _hyperLogLog.offer(v) + } override def :=(v: Any): Boolean = true override def :>(v: Any): Boolean = true override def :<(v: Any): Boolean = true @@ -68,9 +82,11 @@ object ColumnStats { protected var _max = false protected var _min = true override def append(v: Boolean) { + _hyperLogLog.offer(v) if (v) _max = v else _min = v } + def :=(v: Any): Boolean = { v match { case u:Boolean => _min <= u && _max >= u @@ -98,6 +114,7 @@ object ColumnStats { protected var _max = Byte.MinValue protected var _min = Byte.MaxValue override def append(v: Byte) { + _hyperLogLog.offer(v) if (v > _max) _max = v if (v < _min) _min = v } @@ -128,9 +145,11 @@ object ColumnStats { protected var _max = Short.MinValue protected var _min = Short.MaxValue override def append(v: Short) { + _hyperLogLog.offer(v) if (v > _max) _max = v if (v < _min) _min = v } + def :=(v: Any): Boolean = { v match { case u:Short => _min <= u && _max >= u @@ -197,6 +216,7 @@ object ColumnStats { } override def append(v: Int) { + _hyperLogLog.offer(v) if (v > _max) _max = v if (v < _min) _min = v @@ -229,9 +249,11 @@ object ColumnStats { protected var _max = Long.MinValue protected var _min = Long.MaxValue override def append(v: Long) { + _hyperLogLog.offer(v) if (v > _max) _max = v if (v < _min) _min = v } + def :=(v: Any): Boolean = { v match { case u:Long => _min <= u && _max >= u @@ -258,6 +280,7 @@ object ColumnStats { protected var _max = Float.MinValue protected var _min = Float.MaxValue override def append(v: Float) { + _hyperLogLog.offer(v) if (v > _max) _max = v if (v < _min) _min = v } @@ -287,6 +310,7 @@ object ColumnStats { protected var _max = Double.MinValue protected var _min = Double.MaxValue override def append(v: Double) { + _hyperLogLog.offer(v) if (v > _max) _max = v if (v < _min) _min = v } @@ -316,6 +340,7 @@ object ColumnStats { protected var _max = new Timestamp(0) protected var _min = new Timestamp(Long.MaxValue) override def append(v: Timestamp) { + _hyperLogLog.offer(v) if (v.compareTo(_max) > 0) _max = v if (v.compareTo(_min) < 0) _min = v } @@ -345,7 +370,7 @@ object ColumnStats { // Note: this is not Java serializable because Text is not Java serializable. protected var _max: Text = null protected var _min: Text = null - + def :=(v: Any): Boolean = { v match { case u: Text => _min.compareTo(u) <= 0 && _max.compareTo(u) >= 0 @@ -371,6 +396,7 @@ object ColumnStats { } override def append(v: Text) { + _hyperLogLog.offer(v) // Need to make a copy of Text since Text is not immutable and we reuse // the same Text object in serializer to mitigate frequent GC. if (_max == null) { diff --git a/src/test/scala/shark/memstore2/ColumnStatsSuite.scala b/src/test/scala/shark/memstore2/ColumnStatsSuite.scala index 21e55b21..fe5c783e 100644 --- a/src/test/scala/shark/memstore2/ColumnStatsSuite.scala +++ b/src/test/scala/shark/memstore2/ColumnStatsSuite.scala @@ -51,6 +51,10 @@ class ColumnStatsSuite extends FunSuite { assert(c :>= false) assert(!(c :<= false)) assert(c :>= true) + + c.append(true) + c.estimateCardinality() + assert(c.numDistinct == 1) } test("ByteColumnStats") { @@ -69,6 +73,10 @@ class ColumnStatsSuite extends FunSuite { assert(c :> 0.toByte) assert(c :<= -1.toByte) assert(!(c :<= -3.toByte)) + + c.append(0) + c.estimateCardinality() + assert(c.numDistinct == 5) } test("ShortColumnStats") { @@ -83,6 +91,10 @@ class ColumnStatsSuite extends FunSuite { assert(c.min == -1 && c.max == 1024) c.append(-1024) assert(c.min == -1024 && c.max == 1024) + + c.append(0) + c.estimateCardinality() + assert(c.numDistinct == 5) } test("IntColumnStats") { @@ -120,6 +132,10 @@ class ColumnStatsSuite extends FunSuite { c = new ColumnStats.IntColumnStats Array(22, 1, 24).foreach(c.append) assert(!c.isOrdered && !c.isAscending && !c.isDescending) + + c.append(22) + c.estimateCardinality() + assert(c.numDistinct == 3) } test("LongColumnStats") { @@ -137,6 +153,10 @@ class ColumnStatsSuite extends FunSuite { assert(c := 0.toLong) assert(c :> -2.toLong) assert(c :< 0.toLong) + + c.append(0) + c.estimateCardinality() + assert(c.numDistinct == 5) } test("FloatColumnStats") { @@ -154,6 +174,10 @@ class ColumnStatsSuite extends FunSuite { assert(c := 20.5F) assert(c :< 20.6F) assert(c :> -20.6F) + + c.append(0) + c.estimateCardinality() + assert(c.numDistinct == 5) } test("DoubleColumnStats") { @@ -171,6 +195,10 @@ class ColumnStatsSuite extends FunSuite { assert(c := 20.5) assert(!(c :> 20.6)) assert(c :< 20.6) + + c.append(0) + c.estimateCardinality() + assert(c.numDistinct == 5) } test("TimestampColumnStats") { @@ -192,7 +220,10 @@ class ColumnStatsSuite extends FunSuite { assert(c.min.equals(ts1) && c.max.equals(ts2)) c.append(ts4) assert(c.min.equals(ts1) && c.max.equals(ts4)) - + + c.append(ts4) + c.estimateCardinality() + assert(c.numDistinct == 4) } test("StringColumnStats") { @@ -213,5 +244,18 @@ class ColumnStatsSuite extends FunSuite { c.append("0987") assert(c.min.equals(T("0987")) && c.max.equals(T("cccc"))) + c.estimateCardinality() + assert(c.numDistinct == 4) + } + + test("HighCardinality") { + val size = 10000000 + var c = new ColumnStats.IntColumnStats + for (i <- 1 to size) { + c.append(i) + } + c.estimateCardinality() + val err = scala.math.abs(c.numDistinct - size) / size.asInstanceOf[Double] + assert(err < 0.1) } }