[WIP] Descriptive statistics #167

Open · wants to merge 1 commit into master
13 changes: 12 additions & 1 deletion README.md
@@ -13,6 +13,7 @@ A tool for random data sampling and generation
- [ScalaCheck Generators](https://github.com/spotify/ratatool/tree/master/ratatool-scalacheck) - [ScalaCheck](http://scalacheck.org/) generators (`Gen[T]`) for property-based testing for [Avro](https://avro.apache.org/), [Protocol Buffers](https://developers.google.com/protocol-buffers/) and [BigQuery](https://cloud.google.com/bigquery/) [TableRow](https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableRow.html)
- [IO](https://github.com/spotify/ratatool/tree/master/ratatool-sampling/src/main/scala/com/spotify/ratatool/io) - utilities for reading and writing records in Avro, [Parquet](http://parquet.apache.org/) (via Avro GenericRecord), BigQuery and TableRow JSON files. Local file system, HDFS and [Google Cloud Storage](https://cloud.google.com/storage/) are supported.
- [Samplers](https://github.com/spotify/ratatool/tree/master/ratatool-sampling) - random data samplers for Avro, BigQuery and Parquet. True random sampling is supported for Avro only while head mode (sampling from the start) is supported for all sources.
- [Describe](https://github.com/spotify/ratatool/tree/master/ratatool-describe) - descriptive statistics for Avro.
- [Diffy](https://github.com/spotify/ratatool/tree/master/ratatool-diffy) - field-level record diff tool for Avro, Protobuf and BigQuery TableRow.
- [BigDiffy](https://github.com/spotify/ratatool/blob/master/ratatool-diffy) - [Scio](https://github.com/spotify/scio) library for pairwise field-level statistical diff of data sets. See [slides](http://www.lyh.me/slides/bigdiffy.html) for more.
- [Command line tool](https://github.com/spotify/ratatool/tree/master/ratatool-cli/src/main/scala/com/spotify/ratatool/tool) - command line tool for running the local sampler or executing BigDiffy and BigSampler
@@ -72,8 +73,18 @@ bin/ratatool bigDiffy --mode avro --key record.key \
--runner DataflowRunner ....
```

# Contribution

To prepare ratatool for a release or for manual testing, run:

```bash
sbt package pack
```

The packaged CLI will then be available in `./ratatool-cli/target/pack/`.
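
To smoke-test the packed build, you can invoke the generated launcher directly. This is a sketch under the usual sbt-pack layout; the `bin/ratatool` launcher path below is an assumption, not something spelled out in this README:

```bash
# Run the packed CLI with no arguments to print the usage banner.
# Assumes sbt-pack generated a bin/ratatool launcher under target/pack/.
./ratatool-cli/target/pack/bin/ratatool
```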

# License

Copyright 2016-2018 Spotify AB.
Copyright 2016-2019 Spotify AB.

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
35 changes: 33 additions & 2 deletions build.sbt
@@ -100,7 +100,7 @@ lazy val ratatoolCommon = project
"org.slf4j" % "slf4j-simple" % slf4jVersion,
"com.google.apis" % "google-api-services-bigquery" % bigqueryVersion % "provided",
"com.google.guava" % "guava" % guavaVersion
),
),
// In case of scalacheck failures print more info
testOptions in Test += Tests.Argument(TestFrameworks.ScalaCheck, "-verbosity", "3")
)
@@ -161,6 +161,35 @@ lazy val ratatoolDiffy = project
)
.settings(protoBufSettings)

lazy val ratatoolDescribe = project
.in(file("ratatool-describe"))
.settings(commonSettings)
.settings(
name := "ratatool-describe",
libraryDependencies ++= Seq(
"com.spotify" %% "scio-core" % scioVersion,
"com.spotify" %% "scio-avro" % scioVersion,
"com.spotify" %% "scio-test" % scioVersion % "test",
"org.apache.beam" % "beam-runners-direct-java" % beamVersion,
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
"com.twitter" %% "algebird-core" % algebirdVersion
),
libraryDependencies ++= Seq(
"io.circe" %% "circe-core",
"io.circe" %% "circe-generic",
"io.circe" %% "circe-parser"
).map(_ % "0.11.1"),
// In case of scalacheck failures print more info
testOptions in Test += Tests.Argument(TestFrameworks.ScalaCheck, "-verbosity", "3"),
parallelExecution in Test := false,
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full)
)
.dependsOn(
ratatoolCommon % "compile->compile;test->test",
ratatoolSampling,
ratatoolScalacheck % "test"
)

lazy val ratatoolShapeless = project
.in(file("ratatool-shapeless"))
.settings(commonSettings)
@@ -216,7 +245,8 @@ lazy val ratatoolCli = project
.dependsOn(
ratatoolCommon % "compile->compile;test->test",
ratatoolSampling,
ratatoolDiffy
ratatoolDiffy,
ratatoolDescribe
)
.settings(protoBufSettings)

@@ -260,6 +290,7 @@ val root = project.in(file("."))
.aggregate(
ratatoolCommon,
ratatoolScalacheck,
ratatoolDescribe,
ratatoolDiffy,
ratatoolSampling,
ratatoolShapeless,
ratatool-cli/src/main/scala/com/spotify/ratatool/tool/Ratatool.scala
@@ -18,19 +18,20 @@
package com.spotify.ratatool.tool

import com.spotify.ratatool.Command
import com.spotify.ratatool.describe.BigDescribe
import com.spotify.ratatool.diffy.BigDiffy
import com.spotify.ratatool.io.{AvroIO, BigQueryIO, TableRowJsonIO}
import com.spotify.ratatool.samplers.{AvroSampler, BigQuerySampler, BigSampler}

object Ratatool {
private def commandSet[T <: Command](xs: T*): Set[String] = xs.map(_.command).toSet
private val commands = commandSet(DirectSamplerParser, BigDiffy, BigSampler)
private val commands = commandSet(BigDescribe, DirectSamplerParser, BigDiffy, BigSampler)

//scalastyle:off cyclomatic.complexity
def main(args: Array[String]): Unit = {
val usage = """
| Ratatool - a tool for random data generation, sampling, and diff-ing
| Usage: ratatool [bigDiffy|bigSampler|directSampler] [args]
| Usage: ratatool [bigDescribe|bigDiffy|bigSampler|directSampler] [args]
""".stripMargin

if (args.isEmpty || !commands.contains(args.head)) {
@@ -39,6 +40,7 @@ object Ratatool {
}
else {
args.head match {
case BigDescribe.command => BigDescribe.run(args.tail)
case BigDiffy.command => BigDiffy.run(args.tail)
case BigSampler.command => BigSampler.run(args.tail)
case DirectSamplerParser.command =>
6 changes: 6 additions & 0 deletions ratatool-describe/README.md
@@ -0,0 +1,6 @@
Describer
=========

Descriptive statistics of your Avro datasets.

# BigDescribe
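
A minimal invocation sketch via the ratatool CLI, based on the usage text in `BigDescribe.scala`; the bucket, paths and project ID are placeholders:

```bash
# Compute per-field descriptive statistics for an Avro dataset and write them as JSON.
# All gs:// paths and the project ID below are placeholder values.
bin/ratatool bigDescribe \
  --input=gs://<bucket>/path/to/dataset/*.avro \
  --output=gs://<bucket>/path/to/stats \
  --project=<gcp-project-id> \
  --runner=DataflowRunner \
  --tempLocation=gs://<bucket>/temp
```

The pipeline writes the statistics as `.json` text files under the output path; pass `--fields=<field1,field2,...>` to restrict the description to a subset of fields.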
ratatool-describe/src/main/scala/com/spotify/ratatool/describe/BigDescribe.scala (new file)
@@ -0,0 +1,76 @@
package com.spotify.ratatool.describe

import com.spotify.ratatool.Command
import com.spotify.ratatool.io.AvroIO
import com.spotify.scio.avro._
import com.spotify.scio.{ContextAndArgs, ScioContext}
import io.circe.syntax._
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.options.PipelineOptions
import org.slf4j.LoggerFactory


object BigDescribe extends Command {
val command: String = "bigDescribe"

private val log = LoggerFactory.getLogger(BigDescribe.getClass)

private def usage(): Unit = {
// scalastyle:off regex line.size.limit
println(
s"""BigDescribe - descriptive statistics of big datasets
|Usage: ratatool $command [dataflow_options] [options]
|
| --input=<path> Input file path
| --output=<path> Output file path
| [--fields=<field1,field2,...>] An optional list of fields to describe, e.g. when not all field types are supported
|
|Since this runs a Scio/Beam pipeline, Dataflow options will have to be provided. At a
|minimum, the following should be specified:
|
| --project=<gcp-project-id> GCP Project used to run your job
| --runner=DataflowRunner Executes the job on Google Cloud Dataflow
| --tempLocation=<gcs-path> Location for temporary files. GCS bucket must be created prior to running job.
|
|The following options are recommended, but may not be necessary.
|
| --serviceAccount=<your-service-account> Service account used on Dataflow workers. Useful to avoid permissions issues.
| --workerMachineType=<machine-type> Can be tweaked based on your specific needs, but is not necessary.
| --maxNumWorkers=<num-workers> Limits the number of workers (machines) used in the job to avoid using up quota.
|
|For more details regarding Dataflow options see here: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
""".stripMargin)
// scalastyle:on regex line.size.limit
sys.exit(1)
}

def main(cmdlineArgs: Array[String]): Unit = run(cmdlineArgs)

def run(argv: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(argv)
val (opts, _) = ScioContext.parseArguments[PipelineOptions](argv)
FileSystems.setDefaultPipelineOptions(opts)

val (inputArg, outputArg) = try {
(args("input"), args("output"))
} catch {
case e: Throwable =>
usage()
throw e
}

val schema = AvroIO.getAvroSchemaFromFile(inputArg)
val input = sc
.withName("Read Avro Input")
.avroFile[GenericRecord](inputArg, schema)

BigDescribeAvro
.pipeline(input)
.withName("to JSON")
.map(_.asJson.spaces2)
.saveAsTextFile(outputArg, suffix = ".json")

sc.close().waitUntilDone()
}
}
ratatool-describe/src/main/scala/com/spotify/ratatool/describe/BigDescribeAvro.scala (new file)
@@ -0,0 +1,19 @@
package com.spotify.ratatool.describe

import com.spotify.scio.values.SCollection
import com.twitter.algebird.{Semigroup => AlgebirdSemigroup}
import org.apache.avro.generic.GenericRecord

object BigDescribeAvro {

def pipeline(input: SCollection[GenericRecord]): SCollection[Record] = {
implicit val recordSemigroup: AlgebirdSemigroup[Record] =
AlgebirdSemigroup.from(Record.recordSemigroup.combine)

input
.withName("Map")
.map(Record.fromGenericRecord)
.withName("Reduce")
.sum
}
}
ratatool-describe/src/main/scala/com/spotify/ratatool/describe/Count.scala (new file)
@@ -0,0 +1,36 @@
package com.spotify.ratatool.describe

import cats.kernel.Semigroup
import io.circe.{Encoder, Json}

case class Count(total: Long, nulled: Long) {
val present: Long = total - nulled
val isNullable: Boolean = nulled > 0
}

object Count {
implicit val countSemigroup: Semigroup[Count] = new Semigroup[Count] {
override def combine(x: Count, y: Count): Count = {
Count(
x.total + y.total,
x.nulled + y.nulled
)
}
}

implicit val countEncoder: Encoder[Count] = new Encoder[Count] {

import io.circe.syntax._

override def apply(a: Count): Json = Json.obj(
"total" -> a.total.asJson,
"nullable" -> a.nulled.asJson,
"present" -> a.present.asJson
)
}

def fromOption[T](value: Option[T]): Count = {
val nullable = if (value.isEmpty) 1 else 0
Count(1, nullable)
}
}
ratatool-describe/src/main/scala/com/spotify/ratatool/describe/Factor.scala (new file)
@@ -0,0 +1,54 @@
package com.spotify.ratatool.describe

import cats.kernel.Semigroup
import io.circe.{Encoder, Json}

sealed trait Factor[+X]

object Factor {
val ReasonableThreshold: Int = 32

class FactorSemigroup[X] extends Semigroup[Factor[X]] {
override def combine(x: Factor[X], y: Factor[X]): Factor[X] = (x, y) match {
case (x: Discrete[X], y: Discrete[X]) =>
val values = x.values ++ y.values

if (values.size > x.threshold || values.size > y.threshold) {
Continuous
} else {
Discrete(
values,
math.min(x.threshold, y.threshold)
)
}
case _ => Continuous
}
}

class FactorEncoder[X: Encoder] extends Encoder[Factor[X]] {
import io.circe.syntax._

override def apply(a: Factor[X]): Json = a match {
case Discrete(values, threshold) => Json.obj(
"type" -> "discrete".asJson,
"values" -> values.asJson,
"threshold" -> threshold.asJson
)

case Continuous => Json.obj(
"type" -> "continuous".asJson
)
}
}

def fromValue[X](value: X, threshold: Int = ReasonableThreshold): Factor[X] = {
Discrete(Set(value), threshold)
}
}

case class Discrete[X](
values: Set[X],
threshold: Int
) extends Factor[X]

case object Continuous extends Factor[Nothing]
ratatool-describe/src/main/scala/com/spotify/ratatool/describe/MinMaxAvg.scala (new file)
@@ -0,0 +1,52 @@
package com.spotify.ratatool.describe

import cats.kernel.Semigroup
import io.circe.{Encoder, Json}

case class MinMaxAvg(
min: Long,
max: Long,
total: Long,
count: Long
) {
// Use floating-point division so the mean is not truncated to a whole number.
val mean: Option[Double] = if (count == 0) None else Some(total.toDouble / count)
}

object MinMaxAvg {
implicit val minMaxSemigroup: Semigroup[MinMaxAvg] = new Semigroup[MinMaxAvg] {
override def combine(x: MinMaxAvg, y: MinMaxAvg): MinMaxAvg = {
MinMaxAvg(
min = math.min(x.min, y.min),
max = math.max(x.max, y.max),
total = x.total + y.total,
count = x.count + y.count
)
}
}

implicit val encodeMinMaxAvg: Encoder[MinMaxAvg] = new Encoder[MinMaxAvg] {

import io.circe.syntax._

override def apply(a: MinMaxAvg): Json = Json.obj(
"min" -> a.min.asJson,
"max" -> a.max.asJson,
"total" -> a.total.asJson,
"count" -> a.count.asJson,
"mean" -> a.mean.asJson
)
}

// Identity element: start from the opposite extremes so that combining
// with any real value yields that value's min and max.
val empty: MinMaxAvg = MinMaxAvg(Long.MaxValue, Long.MinValue, 0, 0)

def fromLong(value: Option[Long]): MinMaxAvg = value match {
case Some(x) =>
MinMaxAvg(
min = x,
max = x,
total = x,
count = 1)
case None =>
MinMaxAvg.empty
}
}