
added conversions from timeSeriesRDD to distributed MLlib matrix data structures
josepablocam authored and sryza committed Aug 24, 2015
1 parent a75a385 commit 65c0772
Showing 2 changed files with 76 additions and 0 deletions.
40 changes: 40 additions & 0 deletions src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala
@@ -26,6 +26,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkContext._
import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

@@ -276,6 +278,44 @@ class TimeSeriesRDD(val index: DateTimeIndex, parent: RDD[(String, Vector[Double]
    }
  }

  /**
   * Converts a TimeSeriesRDD into a distributed IndexedRowMatrix, which is useful for
   * taking advantage of Spark MLlib's statistical functions on matrices in a distributed
   * fashion. This is only supported for uniform time series indices.
   * @param nPartitions number of partitions, defaults to -1, which uses the same number
   *                    of partitions as the TimeSeriesRDD
   * @return an equivalent IndexedRowMatrix
   */
  def toIndexedRowMatrix(nPartitions: Int = -1): IndexedRowMatrix = {
    if (!index.isInstanceOf[UniformDateTimeIndex]) {
      throw new UnsupportedOperationException("only supported for uniform indices")
    }
    // Each record contains a value per time series, in the original order,
    // and records are ordered by time.
    val uniformIndex = index.asInstanceOf[UniformDateTimeIndex]
    val instants = this.toInstants(nPartitions)
    val start = uniformIndex.first()
    val rows = instants.map { case (dt, values) =>
      // The row index is the number of periods between the index start and this instant
      val rowIndex = uniformIndex.frequency.difference(start, dt)
      IndexedRow(rowIndex, Vectors.dense(values.toArray))
    }
    new IndexedRowMatrix(rows)
  }
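
A minimal usage sketch (hypothetical: `tsRdd` stands for any TimeSeriesRDD over a uniform index, such as the one built in the tests below; the conversion and summary calls are standard MLlib RowMatrix/IndexedRowMatrix API):

  // Convert to a distributed matrix and compute per-series column statistics.
  val matrix = tsRdd.toIndexedRowMatrix()
  // Row i holds the values of every series at the i-th instant of the index.
  val summary = matrix.toRowMatrix().computeColumnSummaryStatistics()
  println(summary.mean) // one mean per time series, computed in a distributed fashion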

  /**
   * Converts a TimeSeriesRDD into a distributed RowMatrix. Note that row indices in
   * a RowMatrix are not significant, so this is a valid operation regardless of the
   * type of time index.
   * @param nPartitions number of partitions, defaults to -1, which uses the same number
   *                    of partitions as the TimeSeriesRDD
   * @return an equivalent RowMatrix
   */
  def toRowMatrix(nPartitions: Int = -1): RowMatrix = {
    val instants = this.toInstants(nPartitions)
    // Drop the timestamps; each row holds one instant's values across all series
    val rows = instants.map { case (_, values) => Vectors.dense(values.toArray) }
    new RowMatrix(rows)
  }
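
Because a RowMatrix carries no row ordering, this conversion also works for irregular indices; a sketch under the same hypothetical `tsRdd` as above:

  // Covariance between series (columns), returned as a local series-by-series matrix.
  val rowMatrix = tsRdd.toRowMatrix()
  val cov = rowMatrix.computeCovariance()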

  def compute(split: Partition, context: TaskContext): Iterator[(String, Vector[Double])] = {
    parent.iterator(split, context)
  }
36 changes: 36 additions & 0 deletions src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala
@@ -25,6 +25,7 @@ import com.cloudera.sparkts.DateTimeIndex._
import com.github.nscala_time.time.Imports._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.distributed.IndexedRow

import org.scalatest.{FunSuite, ShouldMatchers}

@@ -107,4 +108,39 @@ class TimeSeriesRDDSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
      new File(path).delete()
    }
  }

test("toIndexedRowMatrix") {
val conf = new SparkConf().setMaster("local").setAppName(getClass.getName)
TimeSeriesKryoRegistrator.registerKryoClasses(conf)
sc = new SparkContext(conf)
val seriesVecs = (0 until 20 by 4).map(
x => new DenseVector((x until x + 4).map(_.toDouble).toArray))
val labels = Array("a", "b", "c", "d", "e")
val start = new DateTime("2015-4-9")
val index = uniform(start, 4, 1.days)
val rdd = sc.parallelize(labels.zip(seriesVecs.map(_.asInstanceOf[Vector[Double]])), 3)
val tsRdd = new TimeSeriesRDD(index, rdd)
val indexedMatrix = tsRdd.toIndexedRowMatrix()
val (rowIndices, rowData) = indexedMatrix.rows.collect().map { case IndexedRow(ix, data) =>
(ix, data.toArray)
}.unzip
rowData.toArray should be ((0.0 to 3.0 by 1.0).map(x => (x until 20.0 by 4.0).toArray).toArray)
rowIndices.toArray should be (Array(0, 1, 2, 3))
}
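
To spell out the expected layout: series "a" through "e" hold (0,1,2,3), (4,5,6,7), ..., (16,17,18,19), and row t of the resulting matrix collects the t-th value of each series, so row 0 is (0, 4, 8, 12, 16), row 1 is (1, 5, 9, 13, 17), and so on through row 3.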

test("toRowMatrix") {
val conf = new SparkConf().setMaster("local").setAppName(getClass.getName)
TimeSeriesKryoRegistrator.registerKryoClasses(conf)
sc = new SparkContext(conf)
val seriesVecs = (0 until 20 by 4).map(
x => new DenseVector((x until x + 4).map(_.toDouble).toArray))
val labels = Array("a", "b", "c", "d", "e")
val start = new DateTime("2015-4-9")
val index = uniform(start, 4, 1.days)
val rdd = sc.parallelize(labels.zip(seriesVecs.map(_.asInstanceOf[Vector[Double]])), 3)
val tsRdd = new TimeSeriesRDD(index, rdd)
val matrix = tsRdd.toRowMatrix()
val rowData = matrix.rows.collect().map(_.toArray)
rowData.toArray should be ((0.0 to 3.0 by 1.0).map(x => (x until 20.0 by 4.0).toArray).toArray)
}
}
