Skip to content

Commit

Permalink
Optimize Window monoid a bit (#652)
Browse files Browse the repository at this point in the history
* Optimize Window monoid a bit

* avoid takeRight
  • Loading branch information
johnynek authored Feb 22, 2018
1 parent b5cd424 commit 9ea7772
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 44 deletions.
105 changes: 72 additions & 33 deletions algebird-core/src/main/scala/com/twitter/algebird/Window.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package com.twitter.algebird

import scala.collection.immutable.Queue
import java.io.Serializable
import Operators._

/**
Expand All @@ -41,7 +42,7 @@ import Operators._
*
* implicit def w28Monoid[T](implicit p: Priority[Group[T], Monoid[T]]): Monoid[W28[T]] =
* new Monoid[W28[T]] {
* private val WT: Monoid[Window[T]] = WindowMonoid[T](windowSize)
* private val WT: Monoid[Window[T]] = Window.monoid[T](windowSize)
* def zero = W28[T](WT.zero)
* def plus(a: W28[T], b: W28[T]): W28[T] =
* W28[T](WT.plus(a.window, b.window))
Expand All @@ -57,34 +58,36 @@ import Operators._
*/

case class Window[T](total: T, items: Queue[T]) {
def size = this.items.size
def size: Int = items.size
}

object Window {
object Window extends Serializable {
def apply[T](v: T): Window[T] = Window[T](v, Queue[T](v))
def from[T](ts: Iterable[T])(implicit m: WindowMonoid[T]) = m.fromIterable(ts)
}

/**
* Provides a natural monoid for combining windows truncated to some window size.
*
* @param windowSize Upper limit of the number of items in a window.
*/


case class WindowMonoid[T](
windowSize: Int
)(implicit p: Priority[Group[T], Monoid[T]])
extends Monoid[Window[T]] {

require(windowSize >= 1, "Windows must have positive sizes")
/**
 * Build a window from `ts` by delegating to the implicit window monoid's
 * own `fromIterable`.
 *
 * @param ts items to load into the window
 * @param m  window monoid that performs the construction
 * @return the window produced by `m.fromIterable(ts)`
 */
def fromIterable[T](ts: Iterable[T])(implicit m: WindowMonoid[T]): Window[T] = m.fromIterable(ts)

/**
 * Select the window monoid implementation from whichever instance the
 * Priority carries: the Group-based monoid when a Group[T] is present,
 * otherwise the Monoid-based one.
 */
def monoid[T](size: Int)(implicit p: Priority[Group[T], Monoid[T]]): WindowMonoid[T] =
  p.fold[WindowMonoid[T]](grp => monoidFromGroup(size)(grp))(mon => monoidFromMonoid(size)(mon))

def zero = p.fold(g => Window[T](g.zero, Queue.empty[T]))(m => Window(m.zero, Queue.empty[T]))
/**
 * Build a window monoid backed by a Group[T].
 * This is the more efficient of the two constructions.
 */
def monoidFromGroup[T: Group](size: Int): WindowMonoid[T] = {
  val group = implicitly[Group[T]]
  WindowMonoidFromGroup[T](size)(group)
}

def plus(a: Window[T], b: Window[T]): Window[T] =
p.fold(g => plusG(a, b)(g))(m => plusM(a, b)(m))
/**
 * Build a window monoid backed only by a Monoid[T].
 */
def monoidFromMonoid[T: Monoid](size: Int): WindowMonoid[T] = {
  val mon = implicitly[Monoid[T]]
  WindowMonoidFromMonoid[T](size)(mon)
}

def plusG(a: Window[T], b: Window[T])(implicit g: Group[T]): Window[T] =
/**
* This is a faster way to combine two Windows if you
* have a group
*/
def combineWithGroup[T: Group](windowSize: Int, a: Window[T], b: Window[T]): Window[T] =
if (b.items.size >= windowSize) {
var total: T = b.total
var q = b.items
Expand All @@ -107,18 +110,30 @@ case class WindowMonoid[T](
Window(total, items)
}

/**
 * Combine two windows when only a Monoid is available for T.
 *
 * The result holds at most `windowSize` items: the items of `b` come last,
 * preceded by as many trailing items of `a` as are needed to fill the window.
 *
 * @param a the earlier window
 * @param b the later window
 * @return a window over the retained items, with `total` computed by summing
 */
def plusM(a: Window[T], b: Window[T])(implicit m: Monoid[T]): Window[T] =
def combineWithMonoid[T: Monoid](windowSize: Int, a: Window[T], b: Window[T]): Window[T] =
if (b.items.size >= windowSize) {
// b alone fills the window: keep its last windowSize items and re-sum them
val items = b.items.takeRight(windowSize)
val total = m.sum(items)
val total = Monoid.sum(items)
Window(total, items)
} else {
// we need windowSize - b.items.size from `a`
val fromA = a.items.takeRight(windowSize - b.items.size)
val items = fromA ++ b.items
// b is kept whole, so its existing total is reused; only the part of a is summed
val total = m.sum(fromA) + b.total
val total = Monoid.sum(fromA) + b.total
Window(total, items)
}
}

/**
* Provides a natural monoid for combining windows truncated to some window size.
*
* @param windowSize Upper limit of the number of items in a window.
*/
abstract class WindowMonoid[T](windowSize: Int) extends Monoid[Window[T]] {
require(windowSize >= 1, s"Windows must have positive sizes, found $windowSize")

def monoid: Monoid[T]
val zero = Window(monoid.zero, Queue.empty)

override def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] =
if (ws.isEmpty) None
Expand All @@ -128,17 +143,41 @@ case class WindowMonoid[T](
while (it.hasNext) {
queue = (queue ++ it.next.items).takeRight(windowSize)
}
val monT: Monoid[T] = p.join
Some(Window(monT.sum(queue), queue))
Some(Window(monoid.sum(queue), queue))
}

/**
 * Build a window holding the last `windowSize` elements of `ts`.
 *
 * @param ts elements to load, in order
 * @return `zero` when `ts` is empty, otherwise a window whose items are the
 *         trailing elements of `ts` and whose total is their sum
 */
def fromIterable(ts: Iterable[T]): Window[T] = {
if(ts.size == 0) zero
if (ts.isEmpty) zero
else {
val monT: Monoid[T] = p.join
val right = ts.toList.takeRight(windowSize)
val total = monT.sum(right)
Window(total, Queue(right: _*))
var queue = Queue.empty[T]
// number of elements currently held in `queue`
var size: Int = 0
val it = ts.iterator
while (it.hasNext) {
// append one element at a time so the whole input is never
// materialized in memory at once
queue = queue :+ it.next
size = size + 1
// over capacity: drop the oldest element from the front
if (size > windowSize) {
queue = queue.tail
size = size - 1
}
}
val total = monoid.sum(queue)
Window(total, queue)
}
}
}

/**
 * Window monoid for element types that only provide a Monoid.
 * Combining two windows delegates to `Window.combineWithMonoid`.
 *
 * @param windowSize upper limit of the number of items in a window
 */
final case class WindowMonoidFromMonoid[T](windowSize: Int)(implicit m: Monoid[T])
    extends WindowMonoid[T](windowSize) {

  def monoid: Monoid[T] = m

  override def plus(a: Window[T], b: Window[T]): Window[T] =
    Window.combineWithMonoid(windowSize, a, b)(m)
}

/**
 * Window monoid for element types that provide a Group.
 * The group doubles as the element monoid, and combining two windows
 * delegates to `Window.combineWithGroup`.
 *
 * @param windowSize upper limit of the number of items in a window
 */
final case class WindowMonoidFromGroup[T](windowSize: Int)(implicit val group: Group[T])
    extends WindowMonoid[T](windowSize) {

  def monoid: Monoid[T] = group

  def plus(a: Window[T], b: Window[T]): Window[T] =
    Window.combineWithGroup(windowSize, a, b)(group)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,36 @@ import org.scalacheck.Prop.forAll

class WindowLaws extends CheckProperties {

implicit val mon = WindowMonoid[Int](5)
implicit val wGen = Arbitrary {
for (
v <- Gen.choose(-1000, 1000)
) yield (Window[Int](v))
}
implicit val mon = Window.monoid[Int](5)

// Generator for windows: draw an arbitrary list of A and load it through
// Window.fromIterable using the supplied window monoid.
implicit def wGen[A: Arbitrary](implicit wm: WindowMonoid[A]): Arbitrary[Window[A]] =
  Arbitrary(
    Gen.listOf(Arbitrary.arbitrary[A]).map(as => Window.fromIterable(as)(wm))
  )

property("Window obeys monoid laws") { monoidLaws[Window[Int]] }
property("Window obeys monoid laws using a group") { monoidLaws[Window[Int]] }
property("Window obeys monoid laws using a monoid") {
implicit val mon = Window.monoid[String](5)
monoidLaws[Window[String]]
}
}

class WindowTest extends CheckProperties {
property("We aggregate over only n items") {
forAll { (ts0: List[Int], pn: PosNum[Int]) =>
val n = pn.value
val ts = ts0.takeRight(n)
val mon = WindowMonoid[Int](n)
val mon = Window.monoid[Int](n)
mon.sum(ts0.map(Window(_))).total == ts.sum
}
}

property("We correctly create a window from iterable") {
forAll { (ts0: List[Int], pn: PosNum[Int]) =>
val n = pn.value
val mon = WindowMonoid[Int](n)
val mon = Window.monoid[Int](n)
val right = Queue(ts0.takeRight(n): _*)
val expected = Window(right.sum, right)
val got = mon.fromIterable(ts0)
Expand All @@ -45,7 +51,7 @@ class WindowTest extends CheckProperties {
property("We correctly combine windows") {
forAll { (left: List[Int], right: List[Int], pn: PosNum[Int]) =>
val n = pn.value
val mon = WindowMonoid[Int](n)
val mon = Window.monoid[Int](n)
val trunc = Queue((left ::: right).takeRight(n): _*)
val expected = Window(trunc.sum, trunc)
val got = mon.plus(mon.fromIterable(left), mon.fromIterable(right))
Expand All @@ -56,7 +62,7 @@ class WindowTest extends CheckProperties {
property("We correctly overrode sumOption") {
forAll { (ts0: List[Int], pn: PosNum[Int]) =>
val n = pn.value
val mon = WindowMonoid[Int](n)
val mon = Window.monoid[Int](n)
val got = mon.sumOption(ts0.map { Window(_) })
val trunc = Queue(ts0.takeRight(n): _*)
val expected = if (ts0.size == 0) None else Some(Window(trunc.sum, trunc))
Expand Down

0 comments on commit 9ea7772

Please sign in to comment.