Skip to content

Commit

Permalink
Merge branch 'release/0.4.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Coveney committed Feb 6, 2014
2 parents eae6503 + d8a0c15 commit 5f26bc7
Show file tree
Hide file tree
Showing 21 changed files with 494 additions and 233 deletions.
16 changes: 16 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Algebird #

### Version 0.4.0 ###
* Make SketchMap1 the only SketchMap implementation: https://github.com/twitter/algebird/pull/256
* Use semigroup in StateWithError: https://github.com/twitter/algebird/pull/255
* Don't iterate through everything in unit monoid: https://github.com/twitter/algebird/pull/253
* Factor as much logic as possible into SketchmapMonoid1: https://github.com/twitter/algebird/pull/251
* Moving Trampoline flatMap implementation into trait: https://github.com/twitter/algebird/pull/249
* Integrate Caliper: https://github.com/twitter/algebird/pull/248
* Adds append method to MonoidAggregator and RingAggregator: https://github.com/twitter/algebird/pull/246
* Make the map monoid more performant for mutable maps: https://github.com/twitter/algebird/pull/245
* Make BFHash take on non negative values only: https://github.com/twitter/algebird/pull/243
* Fixed DecayedValue: https://github.com/twitter/algebird/pull/238
* Updated scaladoc to 0.3.0: https://github.com/twitter/algebird/pull/237
* Add Incrementable and tests: https://github.com/twitter/algebird/pull/234
* Updates sbt runner: https://github.com/twitter/algebird/pull/232
* Upgrade sbt, specs, add a build.properties, and bump travis's target: https://github.com/twitter/algebird/pull/231

### Version 0.3.1 ###
* Make a field transient in BloomFilter serialization: https://github.com/twitter/algebird/pull/209
* Add the TunnelMonoid to Util (like async function monoid): https://github.com/twitter/algebird/pull/213
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Discussion occurs primarily on the [Algebird mailing list](https://groups.google

## Maven

Algebird modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.3.0`.
Algebird modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.4.0`.

Current published artifacts are

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.twitter.algebird.caliper

import com.google.caliper.{Param, SimpleBenchmark}
import com.twitter.algebird.HyperLogLogMonoid

import java.nio.ByteBuffer

class HllBatchCreateBenchmark extends SimpleBenchmark {
@Param(Array("5", "10", "17", "25"))
val bits: Int = 0

@Param(Array("10", "20", "30"))
val max: Long = 0

var set: Set[Long] = _

/* Don't use twitter bijection to reduce dependencies on other projects */
implicit def injection(value: Long) = {
val size = 8
val buf = ByteBuffer.allocate(size)
buf.putLong(value)
buf.array
}

override def setUp {
set = (0L until max).toSet
}

def timeBatchCreate(reps: Int): Int = {
val hllMonoid = new HyperLogLogMonoid(bits)
var dummy = 0
while (dummy < reps) {
val hll = hllMonoid.batchCreate(set)(injection)
dummy += 1
}
dummy
}
}
23 changes: 23 additions & 0 deletions algebird-caliper/src/main/scala/com/twitter/algebird/Runner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.twiter.algebird.caliper

/** Adapted from @link https://github.com/sirthias/scala-benchmarking-template/blob/master/src/main/scala/org/example/Runner.scala
*/
import com.google.caliper.{Benchmark, Runner => CaliperRunner}
import com.google.common.collect.ObjectArrays.concat

import java.io.PrintWriter

object Runner {

import com.twitter.algebird.caliper._

def main(args: Array[String]) {
val outWriter = new PrintWriter(System.out, true)
val errWriter = new PrintWriter(System.err, true)
/* We should probably call to the Caliper command line
tool if we want to scale up? */
CaliperRunner.main(classOf[HllBatchCreateBenchmark], args)
}

}

21 changes: 17 additions & 4 deletions algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ object Aggregator extends java.io.Serializable {
def reduce(l : T, r : T) = sg.plus(l, r)
def present(reduction : T) = reduction
}
def fromMonoid[T](implicit mon: Monoid[T]): MonoidAggregator[T,T,T] = new MonoidAggregator[T,T,T] {
def prepare(input : T) = input
def fromMonoid[T](implicit mon: Monoid[T]): MonoidAggregator[T,T,T] = fromMonoid[T,T](mon,identity[T])
// Uses the product from the ring
def fromRing[T](implicit rng: Ring[T]): RingAggregator[T,T,T] = fromRing[T,T](rng,identity[T])

def fromMonoid[F,T](implicit mon: Monoid[T],prep: F=>T): MonoidAggregator[F,T,T] = new MonoidAggregator[F,T,T] {
def prepare(input : F) = prep(input)
def monoid = mon
def present(reduction : T) = reduction
}
// Uses the product from the ring
def fromRing[T](implicit rng: Ring[T]): RingAggregator[T,T,T] = new RingAggregator[T,T,T] {
def prepare(input : T) = input
def fromRing[F,T](implicit rng: Ring[T],prep: F=>T): RingAggregator[F,T,T] = new RingAggregator[F,T,T] {
def prepare(input : F) = prep(input)
def ring = rng
def present(reduction : T) = reduction
}
Expand All @@ -41,6 +45,11 @@ trait Aggregator[-A,B,+C] extends Function1[TraversableOnce[A], C] with java.io.
def reduce(items : TraversableOnce[B]) : B = items.reduce{reduce(_,_)}
def apply(inputs : TraversableOnce[A]) : C = present(reduce(inputs.map{prepare(_)}))

def append(l: B,r: A): B=reduce(l,prepare(r))

def appendAll(old: B, items: TraversableOnce[A]): B =
if (items.isEmpty) old else reduce(old, reduce(items.map(prepare)))

/** Like calling andThen on the present function */
def andThenPresent[D](present2: C => D): Aggregator[A,B,D] =
new Aggregator[A,B,D] {
Expand All @@ -62,6 +71,8 @@ trait MonoidAggregator[-A,B,+C] extends Aggregator[A,B,C] {
final def reduce(l : B, r : B) : B = monoid.plus(l, r)
final override def reduce(items : TraversableOnce[B]) : B =
monoid.sum(items)

def appendAll(items:TraversableOnce[A]): B=appendAll(monoid.zero,items)
}

trait RingAggregator[-A,B,+C] extends Aggregator[A,B,C] {
Expand All @@ -70,4 +81,6 @@ trait RingAggregator[-A,B,+C] extends Aggregator[A,B,C] {
final override def reduce(items : TraversableOnce[B]) : B =
if(items.isEmpty) ring.one // There are several pseudo-rings, so avoid one if you can
else items.reduceLeft(reduce _)

def appendAll(items:TraversableOnce[A]): B=appendAll(ring.one,items)
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,13 @@ case class BFHash(numHashes: Int, width: Int, seed: Long = 0L) extends Function1
def apply(s: String) = nextHash(s.getBytes, numHashes)

private def splitLong(x: Long) = {
val upper = math.abs(x >> 32).toInt
val lower = math.abs((x << 32) >> 32).toInt
def toNonNegativeInt(x: Long) = {
val y = math.abs(x).toInt // y may be negative (Interger.MIN_VALUE)
y & 0x7fffffff // no change for positive numbers, converts Interger.MIN_VALUE to positive number
}

val upper = toNonNegativeInt(x >> 32)
val lower = toNonNegativeInt((x << 32) >> 32)
(upper, lower)
}

Expand Down
3 changes: 3 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Group.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class ConstantGroup[T](constant: T) extends Group[T] {
override def zero = constant
override def negate(u : T) = constant
override def plus(l : T, r : T) = constant
override def sumOption(iter: TraversableOnce[T]): Option[T] =
if(iter.isEmpty) None
else Some(constant)
}

// Trivial group, but possibly useful to make a group of (Unit, T) for some T.
Expand Down
70 changes: 51 additions & 19 deletions algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,32 @@ limitations under the License.

package com.twitter.algebird

import scala.collection.BitSet

import java.nio.ByteBuffer

/** A super lightweight (hopefully) version of BitSet */
case class BitSetLite(in: Array[Byte]) {
def contains(x: Int): Boolean = {
/** Pretend 'in' is little endian so that the bitstring b0b1b2b3 is such that if b0 == 1, then
* 0 is in the bitset, if b1 == 1, then 1 is in the bitset.
*/
val arrayIdx = x/8
val remainder = x%8
((in(arrayIdx) >> (7 - remainder)) & 1) == 1
}
}

/** Implementation of the HyperLogLog approximate counting as a Monoid
* @link http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
*
* HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm
* Philippe Flajolet and Éric Fusy and Olivier Gandouet and Frédéric Meunier
*/
object HyperLogLog {

/* Size of the hash in bits */
val hashSize = 128

def hash(input : Array[Byte]) : Array[Byte] = {
val seed = 12345678
val (l0, l1) = MurmurHash128(seed)(input)
Expand Down Expand Up @@ -56,30 +71,47 @@ object HyperLogLog {

def twopow(i : Int) : Double = scala.math.pow(2.0, i)

def bytesToBitSet(in : Array[Byte]) : BitSet = {
BitSet(in.zipWithIndex.map { bi => (bi._1, bi._2 * 8) }
.flatMap { byteToIndicator(_) } : _*)
}
def byteToIndicator(bi : (Byte,Int)) : Seq[Int] = {
(0 to 7).flatMap { i =>
if (((bi._1 >> (7 - i)) & 1) == 1) {
Vector(bi._2 + i)
}
else {
Vector[Int]()
/** the value 'j' is equal to <w_0, w_1 ... w_(bits-1)>
* TODO: We could read in a byte at a time.
*/
def j(bsl: BitSetLite, bits: Int): Int = {
@annotation.tailrec
def loop(pos: Int, accum: Int): Int = {
if (pos >= bits) {
accum
} else if (bsl.contains(pos)) {
loop(pos + 1, accum + (1 << pos))
} else {
loop(pos + 1, accum)
}
}
loop(0, 0)
}

/** The value 'w' is equal to <w_bits ... w_n>. The function rho counts the number of leading
* zeroes in 'w'. We can calculate rho(w) at once with the method rhoW.
*/
def rhoW(bsl: BitSetLite, bits: Int): Byte = {
@annotation.tailrec
def loop(pos: Int, zeros: Int): Int =
if (bsl.contains(pos)) zeros
else loop(pos + 1, zeros + 1)
loop(bits, 1).toByte
}

// We are computing j and \rho(w) from the paper,
// sorry for the name, but it allows someone to compare to the paper
// extremely low probability rhow (position of the leftmost one bit) is > 127, so we use a Byte to store it
def jRhoW(in : Array[Byte], bits: Int) : (Int,Byte) = {
val onBits = HyperLogLog.bytesToBitSet(in)
(onBits.filter { _ < bits }.map { 1 << _ }.sum,
(onBits.filter { _ >= bits }.min - bits + 1).toByte)
/** We are computing j and \rho(w) from the paper,
* sorry for the name, but it allows someone to compare to the paper extremely low probability
* rhow (position of the leftmost one bit) is > 127, so we use a Byte to store it
* Given a hash <w_0, w_1, w_2 ... w_n> the value 'j' is equal to <w_0, w_1 ... w_(bits-1)> and
* the value 'w' is equal to <w_bits ... w_n>. The function rho counts the number of leading
* zeroes in 'w'. We can calculate rho(w) at once with the method rhoW.
*/
def jRhoW(in: Array[Byte], bits: Int): (Int, Byte) = {
val onBits = BitSetLite(in)
(j(onBits, bits), rhoW(onBits, bits))
}


def toBytes(h : HLL) : Array[Byte] = {
h match {
case SparseHLL(bits,maxRhow) =>
Expand Down
30 changes: 26 additions & 4 deletions algebird-core/src/main/scala/com/twitter/algebird/MapAlgebra.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ limitations under the License.
*/
package com.twitter.algebird

import scala.annotation.tailrec
import scala.collection.{Map => ScMap}
import scala.collection.mutable.{Map => MMap}

Expand Down Expand Up @@ -45,6 +44,16 @@ abstract class GenericMapMonoid[K, V, M <: ScMap[K, V]](implicit val semigroup:
// Scala maps can reuse internal structure, so don't copy just add into the bigger one:
// This really saves computation when adding lots of small maps into big ones (common)
val (big, small, bigOnLeft) = if(x.size > y.size) { (x,y,true) } else { (y,x,false) }
small match {
// Mutable maps create new copies of the underlying data on add so don't use the
// handleImmutable method.
// Cannot have a None so 'get' is safe here.
case mmap: MMap[_,_] => sumOption(Seq(big, small)).get
case _ => handleImmutable(big, small, bigOnLeft)
}
}

private def handleImmutable(big: M, small:M, bigOnLeft: Boolean) = {
small.foldLeft(big) { (oldMap, kv) =>
val newV = big
.get(kv._1)
Expand All @@ -70,7 +79,10 @@ abstract class GenericMapMonoid[K, V, M <: ScMap[K, V]](implicit val semigroup:
val oldVOpt = mutable.get(k)
// sorry for the micro optimization here: avoiding a closure
val newV = if(oldVOpt.isEmpty) v else Semigroup.plus(oldVOpt.get, v)
mutable.update(k, newV)
if (nonZero(newV))
mutable.update(k, newV)
else
mutable.remove(k)
}
}
Some(fromMutable(mutable))
Expand All @@ -81,14 +93,24 @@ class MapMonoid[K,V](implicit semigroup: Semigroup[V]) extends GenericMapMonoid[
override lazy val zero = Map[K,V]()
override def add(oldMap: Map[K,V], kv: (K, V)) = oldMap + kv
override def remove(oldMap: Map[K,V], k: K) = oldMap - k
override def fromMutable(mut: MMap[K, V]) = Map(mut.toSeq: _*)
override def fromMutable(mut: MMap[K, V]): Map[K,V] = new MutableBackedMap(mut)
}

class ScMapMonoid[K,V](implicit semigroup: Semigroup[V]) extends GenericMapMonoid[K, V, ScMap[K,V]] {
override lazy val zero = ScMap[K,V]()
override def add(oldMap: ScMap[K,V], kv: (K, V)) = oldMap + kv
override def remove(oldMap: ScMap[K,V], k: K) = oldMap - k
override def fromMutable(mut: MMap[K, V]) = mut.toMap
override def fromMutable(mut: MMap[K, V]): ScMap[K,V] = new MutableBackedMap(mut)
}

private[this] class MutableBackedMap[K,V](val backingMap: MMap[K,V]) extends Map[K,V] {
def get(key: K) = backingMap.get(key)

def iterator = backingMap.iterator

def +[B1 >: V](kv: (K, B1)) = backingMap.toMap + kv

def -(key: K) = backingMap.toMap - key
}

/** You can think of this as a Sparse vector group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ limitations under the License.
*/
package com.twitter.algebird

import scala.annotation.tailrec
import scala.annotation.implicitNotFound
import scala.math.Equiv

import java.lang.{Integer => JInt, Short => JShort, Long => JLong, Float => JFloat, Double => JDouble, Boolean => JBool}
import java.util.{List => JList, Map => JMap}
import java.util.{List => JList}

/**
* Monoid (take a deep breath, and relax about the weird name):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ limitations under the License.
package com.twitter.algebird

import java.lang.{Integer => JInt, Short => JShort, Long => JLong, Float => JFloat, Double => JDouble, Boolean => JBool}
import java.util.{List => JList, Map => JMap}

import scala.annotation.implicitNotFound
/**
Expand Down
Loading

0 comments on commit 5f26bc7

Please sign in to comment.