Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Segment Indices transformer #536

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/scala/com/spotify/featran/CanBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,8 @@ object CanBuild {
implicit def arrayCB[T: ClassTag]: CanBuild[T, Array] = new CanBuild[T, Array] {
override def apply(): mutable.Builder[T, Array[T]] = Array.newBuilder[T]
}

/** [[CanBuild]] instance for `Array[Int]`, backed by `Array.newBuilder[Int]`. */
implicit def intArrayCB: CanBuild[Int, Array] = new CanBuild[Int, Array] {
override def apply(): mutable.Builder[Int, Array[Int]] = Array.newBuilder[Int]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ private class CrossingFeatureBuilder[F] private (
}
fb.add(names, values)
}

/**
 * Add multiple integer feature values. When crossing is enabled, every (name, value) pair is
 * also queued as a `CrossValue` at the current offset, and the offset advances by one per pair.
 * Pairing stops at the shorter of `names` and `values`. The call is always forwarded to the
 * wrapped builder afterwards.
 */
override def addInts[M[_]](names: Iterable[String], values: M[Int])(implicit
  ev: M[Int] => Seq[Int]
): Unit = {
  if (xEnabled) {
    names.iterator.zip(ev(values).iterator).foreach { case (featureName, featureValue) =>
      xQueue.enqueue(CrossValue(featureName, xOffset, featureValue))
      xOffset += 1
    }
  }
  fb.addInts(names, values)
}

override def skip(): Unit = {
xOffset += 1
fb.skip()
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/com/spotify/featran/FeatureBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ object FeatureRejection {
case class Unseen(labels: Set[String]) extends FeatureRejection
case class WrongDimension(expected: Int, actual: Int) extends FeatureRejection
case class Outlier(actual: Double) extends FeatureRejection
case class InvalidInput(reason: String) extends FeatureRejection
case object Collision extends FeatureRejection
}

Expand Down Expand Up @@ -109,6 +110,16 @@ object FeatureRejection {
}
}

/**
 * Add multiple integer feature values by calling `add` once per (name, value) pair, in order.
 * Pairing stops at the shorter of `names` and `values`; surplus entries on either side are
 * ignored.
 *
 * @param names  feature names
 * @param values integer feature values, viewable as a `Seq[Int]`
 */
def addInts[M[_]](names: Iterable[String], values: M[Int])(implicit
  ev: M[Int] => Seq[Int]
): Unit =
  names.iterator.zip(ev(values).iterator).foreach { case (featureName, featureValue) =>
    add(featureName, featureValue)
  }

/**
* Skip multiple feature values. The total number of values added and skipped should equal to
* dimension in [[init]].
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/com/spotify/featran/FlatConverter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import scala.annotation.implicitNotFound

def writeStrings(name: String): Option[Seq[String]] => IF

def writeIntArray(name: String): Option[Array[Int]] => IF

def writer: Seq[IF] => T
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/com/spotify/featran/FlatExtractor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import scala.annotation.implicitNotFound
def readString(name: String): T => Option[String]

def readStrings(name: String): T => Option[Seq[String]]

def readIntArray(name: String): T => Option[Array[Int]]
}

object FlatReader {
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/com/spotify/featran/FloatingPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.annotation.implicitNotFound

/** Type class for floating point primitives. */
@implicitNotFound("Could not find an instance of FloatingPoint for ${T}")
@typeclass trait FloatingPoint[@specialized(Float, Double) T] extends Serializable {
@typeclass trait FloatingPoint[@specialized(Float, Double, Int) T] extends Serializable {
def fromDouble(x: Double): T
}

Expand All @@ -34,6 +34,9 @@ object FloatingPoint {
implicit val doubleFP: FloatingPoint[Double] = new FloatingPoint[Double] {
override def fromDouble(x: Double): Double = x
}
// Int instance: `fromDouble` uses `Double.toInt`, which drops the fractional part
// (truncation toward zero) instead of rounding.
implicit val intFP: FloatingPoint[Int] = new FloatingPoint[Int] {
override def fromDouble(x: Double): Int = x.toInt
}

/* ======================================================================== */
/* THE FOLLOWING CODE IS MANAGED BY SIMULACRUM; PLEASE DO NOT EDIT!!!! */
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/com/spotify/featran/json/Implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ private[featran] trait Implicits extends Serializable {

override def readStrings(name: String): String => Option[Seq[String]] =
toFeature[Seq[String]](name)

// Reader for the field `name` as an optional `Array[Int]`; decoding is delegated to `toFeature`.
override def readIntArray(name: String): String => Option[Array[Int]] =
toFeature[Array[Int]](name)
}

implicit val jsonFlatWriter: FlatWriter[String] = new FlatWriter[String] {
Expand Down Expand Up @@ -138,6 +141,9 @@ private[featran] trait Implicits extends Serializable {
override def writeStrings(name: String): Option[Seq[String]] => (String, Option[Json]) =
(v: Option[Seq[String]]) => (name, v.map(_.asJson))

// Writer for an optional `Array[Int]`: pairs `name` with the JSON encoding of the array when a
// value is present, or with `None` when it is absent.
override def writeIntArray(name: String): Option[Array[Int]] => (String, Option[Json]) = {
  maybeInts => name -> maybeInts.map(ints => ints.asJson)
}

override def writer: Seq[(String, Option[Json])] => String =
_.asJson.noSpaces
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object Normalizer extends SettingsBuilder {
new Normalizer(name, p, expectedLength)

/**
* Create a new [[OneHotEncoder]] from a settings object
* Create a new [[Normalizer]] from a settings object
* @param setting Settings object
*/
def fromSettings(setting: Settings): Transformer[Array[Double], Int, Int] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.spotify.featran.transformers

import com.spotify.featran.{FeatureBuilder, FeatureRejection, FlatReader, FlatWriter}
import com.twitter.algebird.Aggregator

/**
 * Factory for the [[SegmentIndices]] transformer, which maps a non-decreasing `Array[Int]` to the
 * position of each element within its run of equal values.
 */
object SegmentIndices extends SettingsBuilder {

  /**
   * Create a new [[SegmentIndices]] instance.
   * @param name name of the transformer
   * @param expectedLength expected length of the input vectors, or 0 to infer from data
   */
  def apply(
    name: String,
    expectedLength: Int = 0
  ): Transformer[Array[Int], Int, Int] =
    new SegmentIndices(name = name, expectedLength = expectedLength)

  /**
   * Create a new [[SegmentIndices]] from a settings object. The `expectedLength` entry of the
   * settings parameters is parsed as an integer.
   * @param setting Settings object
   */
  def fromSettings(setting: Settings): Transformer[Array[Int], Int, Int] =
    SegmentIndices(setting.name, setting.params("expectedLength").toInt)
}

/**
 * Transform a non-decreasing `Array[Int]` into the position of each element within its run of
 * equal values, e.g. `[0, 0, 1, 1, 1, 2]` becomes `[0, 1, 0, 1, 2, 0]`.
 *
 * Missing values are skipped. Arrays whose length differs from the feature dimension are
 * rejected with [[FeatureRejection.WrongDimension]]; arrays that decrease anywhere are rejected
 * with [[FeatureRejection.InvalidInput]].
 */
private[featran] class SegmentIndices(name: String, expectedLength: Int = 0)
    extends Transformer[Array[Int], Int, Int](name) {

  override val aggregator: Aggregator[Array[Int], Int, Int] =
    Aggregators.seqLength(expectedLength)
  override def featureDimension(c: Int): Int = c
  override def featureNames(c: Int): Seq[String] = names(c)

  override def buildFeatures(a: Option[Array[Int]], c: Int, fb: FeatureBuilder[_]): Unit =
    a match {
      case Some(x) if x.length != c =>
        fb.skip(c)
        fb.reject(this, FeatureRejection.WrongDimension(c, x.length))
      case Some(x) if !isMonotonic(x) =>
        fb.skip(c)
        fb.reject(
          this,
          FeatureRejection.InvalidInput(
            "Require an increasing sequence of numbers to use SegmentIndices."
          )
        )
      case Some(x) =>
        fb.addInts(names = names(c), values = segmentIndices(x))
      case None => fb.skip(c)
    }

  override def encodeAggregator(c: Int): String = c.toString
  override def decodeAggregator(s: String): Int = s.toInt
  override def params: Map[String, String] =
    Map("expectedLength" -> expectedLength.toString)

  override def flatRead[T: FlatReader]: T => Option[Any] = FlatReader[T].readIntArray(name)

  override def flatWriter[T](implicit fw: FlatWriter[T]): Option[Array[Int]] => fw.IF =
    fw.writeIntArray(name)

  /**
   * Position of every element inside its run of equal neighbours, computed in a single pass.
   * The first element (and the first element of every new run) gets index 0.
   */
  private def segmentIndices(xs: Array[Int]): Array[Int] = {
    // A fresh Int array is zero-filled, so slot 0 already holds the correct value.
    val out = new Array[Int](xs.length)
    var i = 1
    while (i < xs.length) {
      out(i) = if (xs(i) == xs(i - 1)) out(i - 1) + 1 else 0
      i += 1
    }
    out
  }

  /** True when no element is smaller than its predecessor (empty and singleton arrays pass). */
  private def isMonotonic(arr: Array[Int]): Boolean =
    (1 until arr.length).forall(i => arr(i - 1) <= arr(i))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.spotify.featran.transformers

import com.spotify.featran.FeatureSpec
import org.scalacheck.{Arbitrary, Prop}

object SegmentIndicesSpec extends TransformerProp("SegmentIndices") {

  /**
   * Generates non-decreasing arrays of length 10 that start at 0 and grow by 0 or 1 at each
   * step. The steps are drawn from ScalaCheck generators rather than `math.random()`, so the
   * data follows the property's seed and failures can be reproduced.
   */
  implicit lazy val randomIncreasingArray: Arbitrary[Array[Int]] = Arbitrary {
    org.scalacheck.Gen
      // 9 steps after the initial 0 give 10 elements in total.
      .listOfN(9, org.scalacheck.Gen.oneOf(0, 1))
      .map(steps => steps.scanLeft(0)(_ + _).toArray)
  }

  property("default") = Prop.forAll { (xs: List[Array[Int]]) =>
    val segmentIndicesSpec = FeatureSpec
      .of[Array[Int]]
      .required(identity)(SegmentIndices("segmented"))

    // Since inputs are sorted, grouping by value and listing each group's indices in value
    // order reproduces the expected per-run positions.
    val expected = xs.map { testCase =>
      testCase.groupBy(identity).toSeq.sortBy(_._1).flatMap { case (_, sameNumber) =>
        sameNumber.indices.toList
      }
    }

    val result = segmentIndicesSpec
      .extract(xs)
      .featureValues[Array[Int]]
      .map(_.toSeq)

    Prop.all(result == expected)
  }
}
11 changes: 11 additions & 0 deletions examples/src/main/scala/Examples.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ object Examples {
// Extract from a single record
recordExtractor.featureResult(recordGen.sample.get)

// This example applies the SegmentIndices transformer, which requires a non-decreasing array of
// integers (repeated values are allowed, as in the input below).
val segmentIndicesSpec = FeatureSpec
.of[Array[Int]]
.required(identity)(SegmentIndices("segmented"))

val f3: FeatureExtractor[List, Array[Int]] = segmentIndicesSpec.extract(List(Array(0,0,1,1,2,2)))

// Print the feature names, then each record's feature values as `Array[Int]`
println(f3.featureNames.head)
f3.featureValues[Array[Int]].foreach(println)

// # Extraction with Scio

// Create input `SCollection[Record]`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ package object tensorflow {
def fromStrings(xs: Seq[String]): tf.Feature.Builder =
fromByteStrings(xs.map(ByteString.copyFromUtf8))

// Reads the feature through `toFloats` and converts each value with `toInt` (fractional part
// dropped).
// NOTE(review): values pass through a float representation, so large ints may not round-trip
// exactly — confirm the expected value range.
def toInts(f: tf.Feature): Seq[Int] = toFloats(f).map(_.toInt)

// Encodes ints by converting each one to Float and delegating to `fromFloats`.
// NOTE(review): Int -> Float is exact only for magnitudes up to 2^24; larger values are rounded.
// Confirm inputs stay in range, or consider an integer-typed feature encoding.
def fromInts(xs: Seq[Int]): tf.Feature.Builder = fromFloats(xs.map(_.toFloat))
}

/** [[FeatureBuilder]] for output as TensorFlow `Example` type. */
Expand Down Expand Up @@ -147,6 +150,8 @@ package object tensorflow {

def readStrings(name: String): Example => Option[Seq[String]] =
(ex: Example) => toFeature(name, ex).map(v => toStrings(v))

// Reader for the feature `name` as an optional `Array[Int]`, decoded via `toInts`.
def readIntArray(name: String): Example => Option[Array[Int]] =
  (example: Example) =>
    toFeature(name, example).map { feature =>
      val ints = toInts(feature)
      ints.toArray
    }
}

implicit val exampleFlatWriter: FlatWriter[Example] = new FlatWriter[tf.Example] {
Expand Down Expand Up @@ -196,6 +201,11 @@ package object tensorflow {
v.toList.flatMap(values => List(NamedTFFeature(name, fromStrings(values).build())))
}

// Writer for an optional `Array[Int]`: yields a single named feature built with `fromInts`
// when a value is present, and an empty list otherwise.
override def writeIntArray(name: String): Option[Array[Int]] => List[NamedTFFeature] =
  (maybeInts: Option[Array[Int]]) =>
    maybeInts
      .map(ints => NamedTFFeature(name, fromInts(ints).build()))
      .toList

override def writer: Seq[List[NamedTFFeature]] => Example =
(fns: Seq[List[NamedTFFeature]]) => {
val builder = Features.newBuilder()
Expand Down