scaladoc, type param order, Scala 2.13 compat
mjakubowski84 committed Sep 27, 2020
1 parent 64e1ea7 commit ce9e77c
Showing 15 changed files with 96 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -9,3 +9,4 @@
target
spark-warehouse
project/.plugins.sbt.swp
project/project
2 changes: 1 addition & 1 deletion build.sbt
@@ -7,7 +7,7 @@ lazy val fs2ScalaVersions = Seq("2.12.12", "2.13.3")
ThisBuild / organization := "com.github.mjakubowski84"
ThisBuild / version := "1.5.0-SNAPSHOT"
ThisBuild / isSnapshot := true
ThisBuild / scalaVersion := "2.13.3"
ThisBuild / scalaVersion := "2.12.12"
ThisBuild / scalacOptions ++= Seq("-deprecation", "-target:jvm-1.8")
ThisBuild / javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-unchecked", "-deprecation", "-feature")
ThisBuild / resolvers := Seq(
2 changes: 1 addition & 1 deletion core/build.sbt
@@ -6,7 +6,7 @@ libraryDependencies ++= Seq(
"com.chuusai" %% "shapeless" % "2.3.3",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % Provided,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.6",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.2.0",

// tests
"org.mockito" %% "mockito-scala-scalatest" % "1.13.11" % "test",

@@ -10,6 +10,8 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, Inspectors}

import scala.collection.compat._
import immutable.LazyList
import scala.util.Random

class FilteringByListSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll with Inspectors {
@@ -46,8 +48,8 @@ class FilteringByListSpec extends AnyFlatSpec with Matchers with BeforeAndAfterA
override def compare(x: Date, y: Date): Int = x.compareTo(y)
}

def data: Stream[Data] =
Stream.range(0, dataSize).map { i =>
def data: LazyList[Data] =
LazyList.range(0, dataSize).map { i =>
Data(
idx = i,
short = (i % Short.MaxValue).toShort,
@@ -65,7 +67,7 @@ class FilteringByListSpec extends AnyFlatSpec with Matchers with BeforeAndAfterA
)
}

def everyOtherDatum: Stream[Data] = data.filter(_.idx % 2 == 0)
def everyOtherDatum: LazyList[Data] = data.filter(_.idx % 2 == 0)

override def beforeAll(): Unit = {
super.beforeAll()
@@ -126,15 +128,15 @@ class FilteringByListSpec extends AnyFlatSpec with Matchers with BeforeAndAfterA
it should "filter data by a list of embedded values" in genericFilterTest("embedded.x", _.idx)

it should "filter data by a hard-coded list of values" in {
val filteredRecords = ParquetReader.read[Data](filePath, filter = Col("idx") in(1, 2, 3))
val filteredRecords = ParquetReader.read[Data](filePath, filter = Col("idx").in(1, 2, 3))
try {
filteredRecords.size should equal(3)
filteredRecords.map(_.idx) should contain allOf(1, 2, 3)
} finally filteredRecords.close()
}

it should "reject an empty set of keys" in {
a[IllegalArgumentException] should be thrownBy (Col("idx") in(Seq.empty:_*))
a[IllegalArgumentException] should be thrownBy Col("idx").in(Seq.empty:_*)
a[IllegalArgumentException] should be thrownBy (Col("idx") in Set.empty[Int])
}
}

@@ -10,6 +10,8 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, Inspectors}

import scala.util.Random
import scala.collection.compat._
import immutable.LazyList

class FilteringSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll with Inspectors {

@@ -36,8 +38,8 @@ class FilteringSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll wit
override def compare(x: LocalDate, y: LocalDate): Int = x.compareTo(y)
}

def data: Stream[Data] =
Stream.range(0, dataSize).map { i =>
def data: LazyList[Data] =
LazyList.range(0, dataSize).map { i =>
Data(
idx = i,
float = (BigDecimal("0.01") * BigDecimal(i)).toFloat,

@@ -8,7 +8,7 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.io.api._
import org.apache.parquet.schema._

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._


private[parquet4s] class ParquetReadSupport extends ReadSupport[RowParquetRecord] {

@@ -5,7 +5,7 @@ import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, MessageType, Type}

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable

/**

@@ -13,7 +13,7 @@ import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageType
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._


/**

@@ -42,7 +42,7 @@ class FilterSpec extends AnyFlatSpec with Matchers {
}

it should "build in predicate with varargs" in {
val predicate = (Col("i") in(1, 2, 3)).toPredicate(valueCodecConfiguration)
val predicate = Col("i").in(1, 2, 3).toPredicate(valueCodecConfiguration)
predicate.toString should be("userdefinedbyinstance(i, in(1, 2, 3))")
}

@@ -95,24 +95,24 @@ class ParquetIterableSpec extends AnyFlatSpec with Matchers with IdiomaticMockit
val reader = mock[HadoopParquetReader[RowParquetRecord]]
reader.read() returns testRecord(1)

newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator.next should be(TestRow(1))
newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator.next() should be(TestRow(1))
}

it should "throw NoSuchElementException for empty resource" in {
val reader = mock[HadoopParquetReader[RowParquetRecord]]
reader.read() returns null

a[NoSuchElementException] should be thrownBy newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator.next
a[NoSuchElementException] should be thrownBy newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator.next()
}

it should "try to read record only once in case of sequential calls for missing record" in {
val reader = mock[HadoopParquetReader[RowParquetRecord]]
reader.read() returns null

val iterator = newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator
a[NoSuchElementException] should be thrownBy iterator.next
a[NoSuchElementException] should be thrownBy iterator.next
a[NoSuchElementException] should be thrownBy iterator.next
a[NoSuchElementException] should be thrownBy iterator.next()
a[NoSuchElementException] should be thrownBy iterator.next()
a[NoSuchElementException] should be thrownBy iterator.next()

reader.read() wasCalled once
}
@@ -122,10 +122,10 @@ class ParquetIterableSpec extends AnyFlatSpec with Matchers with IdiomaticMockit
reader.read() returns testRecord(1) andThen testRecord(2) andThen testRecord(3) andThen null

val iterator = newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator
iterator.next should be(TestRow(1))
iterator.next should be(TestRow(2))
iterator.next should be(TestRow(3))
a[NoSuchElementException] should be thrownBy iterator.next
iterator.next() should be(TestRow(1))
iterator.next() should be(TestRow(2))
iterator.next() should be(TestRow(3))
a[NoSuchElementException] should be thrownBy iterator.next()
}

it should "not call 'read' if 'hasNext' already did it (and throw exception)" in {
@@ -134,7 +134,7 @@ class ParquetIterableSpec extends AnyFlatSpec with Matchers with IdiomaticMockit

val iterator = newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator
iterator.hasNext should be(false)
a[NoSuchElementException] should be thrownBy iterator.next
a[NoSuchElementException] should be thrownBy iterator.next()

reader.read() wasCalled once
}
@@ -145,7 +145,7 @@ class ParquetIterableSpec extends AnyFlatSpec with Matchers with IdiomaticMockit

val iterator = newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator
iterator.hasNext should be(true)
iterator.next should be(TestRow(1))
iterator.next() should be(TestRow(1))

reader.read() wasCalled once
}
@@ -156,9 +156,9 @@ class ParquetIterableSpec extends AnyFlatSpec with Matchers with IdiomaticMockit

val iterator = newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator
iterator.hasNext should be(true)
iterator.next should be(TestRow(1))
iterator.next() should be(TestRow(1))
iterator.hasNext should be(false)
a[NoSuchElementException] should be thrownBy iterator.next
a[NoSuchElementException] should be thrownBy iterator.next()

reader.read() wasCalled twice
}
@@ -169,11 +169,11 @@ class ParquetIterableSpec extends AnyFlatSpec with Matchers with IdiomaticMockit

val iterator = newParquetIterable[TestRow](mockTestBuilder(reader), options).iterator
iterator.hasNext should be(true)
iterator.next should be(TestRow(1))
iterator.next() should be(TestRow(1))
iterator.hasNext should be(true)
iterator.next should be(TestRow(2))
iterator.next() should be(TestRow(2))
iterator.hasNext should be(false)
a[NoSuchElementException] should be thrownBy iterator.next
a[NoSuchElementException] should be thrownBy iterator.next()

reader.read() wasCalled 3.times
}

@@ -25,7 +25,7 @@ object WriteAndReadFS2App extends IOApp {
_ <- Stream.range[IO](start = 0, stopExclusive = Count)
.map { i => Data(id = i, text = Random.nextString(4)) }
.through(writeSingleFile(blocker, path.resolve("data.parquet").toString))
.append(read[Data, IO](blocker, path.toString).showLinesStdOut.drain)
.append(read[IO, Data](blocker, path.toString).showLinesStdOut.drain)
} yield ()

stream.compile.drain.as(ExitCode.Success)

@@ -37,9 +37,9 @@ object WriteAndReadFilteredFS2App extends IOApp {
.map { i => Data(id = i, dict = Dict.random) }
.through(writeSingleFile(blocker, path.resolve("data.parquet").toString))
.append(Stream.eval_(IO(println("""dict == "A""""))))
.append(read[Data, IO](blocker, path.toString, filter = Col("dict") === Dict.A).showLinesStdOut.drain)
.append(read[IO, Data](blocker, path.toString, filter = Col("dict") === Dict.A).showLinesStdOut.drain)
.append(Stream.eval_(IO(println("""id >= 20 && id < 40"""))))
.append(read[Data, IO](blocker, path.toString, filter = Col("id") >= 20 && Col("id") < 40).showLinesStdOut.drain)
.append(read[IO, Data](blocker, path.toString, filter = Col("id") >= 20 && Col("id") < 40).showLinesStdOut.drain)
} yield ()

stream.compile.drain.as(ExitCode.Success)

@@ -51,7 +51,7 @@ object WriteAndReadGenericFS2App extends IOApp {
path <- tempDirectoryStream[IO](blocker, dir = TmpPath)
_ <- Stream.iterable[IO, RowParquetRecord](users)
.through(writeSingleFile(blocker, path.resolve("data.parquet").toString))
.append(read[RowParquetRecord, IO](blocker, path.toString).showLinesStdOut.drain)
.append(read[IO, RowParquetRecord](blocker, path.toString).showLinesStdOut.drain)
} yield ()

stream.compile.drain.as(ExitCode.Success)

@@ -59,7 +59,7 @@ class Fs2ParquetItSpec extends AsyncFlatSpec with Matchers with Inspectors {
))

def read[T: ParquetRecordDecoder](blocker: Blocker, path: Path): Stream[IO, Vector[T]] =
parquet.read[T, IO](blocker, path.toString).fold(Vector.empty[T])(_ :+ _)
parquet.read[IO, T](blocker, path.toString).fold(Vector.empty[T])(_ :+ _)

def listParquetFiles(blocker: Blocker, path: Path): Stream[IO, Vector[Path]] =
directoryStream[IO](blocker, path)
@@ -71,7 +71,7 @@ class Fs2ParquetItSpec extends AsyncFlatSpec with Matchers with Inspectors {
def write(blocker: Blocker, path: Path): Stream[IO, Unit] =
Stream
.iterable(data)
.through(parquet.writeSingleFile[Data, IO](blocker, path.resolve(outputFileName).toString, writeOptions))
.through(parquet.writeSingleFile[IO, Data](blocker, path.resolve(outputFileName).toString, writeOptions))

val testStream =
for {
@@ -208,7 +208,7 @@ class Fs2ParquetItSpec extends AsyncFlatSpec with Matchers with Inspectors {

def read(blocker: Blocker, path: Path): Stream[IO, Map[String, Vector[Data]]] =
parquet
.read[DataTransformed, IO](blocker, path.toString)
.read[IO, DataTransformed](blocker, path.toString)
.map { case DataTransformed(i, s, partition) => Map(partition -> Vector(Data(i, s))) }
.reduceSemigroup


@@ -14,20 +14,72 @@ package object parquet {
val DefaultMaxCount: Long = HadoopParquetWriter.DEFAULT_BLOCK_SIZE
val DefaultMaxDuration: FiniteDuration = FiniteDuration(1, TimeUnit.MINUTES)

def read[T: ParquetRecordDecoder, F[_]: Sync: ContextShift](blocker: Blocker,

/**
* Creates a [[fs2.Stream]] that reads Parquet data from the specified path.
* If there are multiple files at path then the order in which files are loaded is determined by underlying
* filesystem.
* <br/>
* Path can refer to local file, HDFS, AWS S3, Google Storage, Azure, etc.
* Please refer to Hadoop client documentation or your data provider in order to know how to configure the connection.
* <br/>
* Can read also <b>partitioned</b> directories. Filter applies also to partition values. Partition values are set
* as fields in read entities at path defined by partition name. Path can be a simple column name or a dot-separated
* path to nested field. Missing intermediate fields are automatically created for each read record.
* <br/>
* <br/>
*
* @param blocker used to perform blocking operations
* @param path URI to Parquet files, e.g.: {{{ "file:///data/users" }}}
* @param options configuration of how Parquet files should be read
* @param filter optional before-read filter; no filtering is applied by default; check [[Filter]] for more details
* @tparam F effect type
* @tparam T type of data that represent the schema of the Parquet data, e.g.:
* {{{ case class MyData(id: Long, name: String, created: java.sql.Timestamp) }}}
* @return The stream of Parquet data
*/
def read[F[_]: Sync: ContextShift, T: ParquetRecordDecoder](blocker: Blocker,
path: String,
options: ParquetReader.Options = ParquetReader.Options(),
filter: Filter = Filter.noopFilter
): Stream[F, T] =
reader.read(blocker, path, options, filter)
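
A minimal usage sketch of the read API documented above (illustrative, not part of this commit), showing the swapped type-parameter order with the effect type first. The User case class, the data path, and the filter value are assumptions; decoder derivation and the Blocker setup follow the examples elsewhere in this repository.

import cats.effect.{Blocker, ExitCode, IO, IOApp}
import com.github.mjakubowski84.parquet4s.Col
import com.github.mjakubowski84.parquet4s.parquet.read
import fs2.Stream

object ReadSketchApp extends IOApp {

  // Hypothetical schema; any case class with a derivable ParquetRecordDecoder works.
  case class User(id: Long, name: String)

  def run(args: List[String]): IO[ExitCode] =
    Stream.resource(Blocker[IO])
      .flatMap { blocker =>
        // Effect type comes first now: read[IO, User] rather than read[User, IO].
        read[IO, User](blocker, "file:///data/users", filter = Col("id") >= 0L)
      }
      .evalMap(user => IO(println(user)))
      .compile
      .drain
      .as(ExitCode.Success)
}

Listing the effect type before the value type matches the usual fs2/cats convention of F[_] preceding data types, which appears to be the motivation for the reordering in this commit.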


def writeSingleFile[T : ParquetRecordEncoder : ParquetSchemaResolver, F[_]: Sync: ContextShift](blocker: Blocker,
/**
* Creates a [[fs2.Pipe]] that writes Parquet data to single file at the specified path (including
* file name).
* <br/>
* Path can refer to local file, HDFS, AWS S3, Google Storage, Azure, etc.
* Please refer to Hadoop client documentation or your data provider in order to know how to configure the connection.
*
* @param path URI to Parquet files, e.g.: {{{ "file:///data/users/users-2019-01-01.parquet" }}}
* @param options set of options that define how Parquet files will be created
* @tparam F effect type
* @tparam T type of data that represent the schema of the Parquet data, e.g.:
* {{{ case class MyData(id: Long, name: String, created: java.sql.Timestamp) }}}
* @return The pipe that writes Parquet file
*/
def writeSingleFile[F[_]: Sync: ContextShift, T : ParquetRecordEncoder : ParquetSchemaResolver](blocker: Blocker,
path: String,
options: ParquetWriter.Options = ParquetWriter.Options()
): Pipe[F, T, Unit] =
writer.write(blocker, path, options)
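
A similar hedged sketch (not from this commit) of writeSingleFile with the new type-parameter order; the User case class and the output path are assumptions, and encoder and schema derivation are expected to be in implicit scope.

import cats.effect.{Blocker, ExitCode, IO, IOApp}
import com.github.mjakubowski84.parquet4s.parquet.writeSingleFile
import fs2.Stream

object WriteSketchApp extends IOApp {

  // Hypothetical schema used only for illustration.
  case class User(id: Long, name: String)

  def run(args: List[String]): IO[ExitCode] =
    Stream.resource(Blocker[IO])
      .flatMap { blocker =>
        Stream.iterable[IO, User](Seq(User(1L, "a"), User(2L, "b")))
          // Effect type first here as well: writeSingleFile[IO, User].
          .through(writeSingleFile[IO, User](blocker, "file:///tmp/users/users.parquet"))
      }
      .compile
      .drain
      .as(ExitCode.Success)
}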

/**
* Builds a [[fs2.Pipe]] that:
* <ol>
* <li>Is designed to write Parquet files indefinitely</li>
* <li>Is able to (optionally) partition data by a list of provided fields</li>
* <li>Flushes and rotates files after given number of rows is written or given time period elapses</li>
* <li>Outputs incoming message after it is written but can write an effect of provided message transformation.</li>
* </ol>
*
* @tparam F effect type
* @tparam T type of data that represent the schema of the Parquet data, e.g.:
* {{{ case class MyData(id: Long, name: String, created: java.sql.Timestamp) }}}
* @return
*/
def viaParquet[F[_], T]: Builder[F, T, T] =
BuilderImpl[F, T, T](
maxCount = DefaultMaxCount,
@@ -84,7 +136,7 @@ package object parquet {
*/
def withPreWriteTransformation[X](transformation: T => Stream[F, X]): Builder[F, T, X]
/**
* Builds final writer.
* Builds final writer pipe.
*/
def write(blocker: Blocker, basePath: String)(implicit
schemaResolver: SkippingParquetSchemaResolver[W],
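
Finally, an illustrative sketch (not part of the commit) of the viaParquet builder documented above: the resulting pipe writes incoming records to rotating Parquet files under a base path and re-emits each record once it has been written. The Event case class, the base path, and reliance on the default rotation settings and derived codecs are assumptions.

import cats.effect.{Blocker, ExitCode, IO, IOApp}
import com.github.mjakubowski84.parquet4s.parquet.viaParquet
import fs2.Stream

object ViaParquetSketchApp extends IOApp {

  // Hypothetical schema used only for illustration.
  case class Event(id: Long, text: String)

  def run(args: List[String]): IO[ExitCode] =
    Stream.resource(Blocker[IO])
      .flatMap { blocker =>
        Stream.iterable[IO, Event](Seq(Event(1L, "a"), Event(2L, "b")))
          // Writes to rotating files under the base path using the default
          // maxCount/maxDuration, then emits each record after it is written.
          .through(viaParquet[IO, Event].write(blocker, "file:///tmp/events"))
          .evalMap(e => IO(println(s"written: $e")))
      }
      .compile
      .drain
      .as(ExitCode.Success)
}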
