From eb205db5b405a4f57cdd204c802d6c6efbfe4a5d Mon Sep 17 00:00:00 2001
From: Claire McGinty
Date: Thu, 19 Sep 2024 15:43:56 -0400
Subject: [PATCH] Cleanup

---
 .../scala/magnolify/jmh/MagnolifyBench.scala  | 106 +-----------------
 .../jmh/ParquetInMemoryPageStore.scala        |  77 -------------
 .../parquet/MagnolifyParquetProperties.scala  |   7 --
 3 files changed, 2 insertions(+), 188 deletions(-)
 delete mode 100644 jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala

diff --git a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
index 9935027c9..ccf745160 100644
--- a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
+++ b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
@@ -16,18 +16,13 @@
 
 package magnolify.jmh
 
-import magnolify.parquet.ParquetType.WriteSupport
-import magnolify.parquet.{MagnolifyParquetProperties, ParquetType}
-
 import java.util.concurrent.TimeUnit
+
 import magnolify.scalacheck.auto._
 import magnolify.test.Simple._
-import org.apache.hadoop.conf.Configuration
 import org.scalacheck._
 import org.openjdk.jmh.annotations._
 
-import scala.jdk.CollectionConverters._
-
 object MagnolifyBench {
   val seed: rng.Seed = rng.Seed(0)
   val prms: Gen.Parameters = Gen.Parameters.default
@@ -92,103 +87,6 @@ class AvroBench {
   @Benchmark def avroSchema: Schema = AvroType[Nested].schema
 }
 
-@State(Scope.Benchmark)
-class ParquetReadState(pt: ParquetType[Nested]) {
-  import org.apache.parquet.io._
-  import org.apache.parquet.column.impl.ColumnWriteStoreV1
-  import org.apache.parquet.column.ParquetProperties
-  import org.apache.parquet.hadoop.api.InitContext
-
-  var reader: RecordReader[Nested] = null
-
-  @Setup(Level.Invocation)
-  def setup(): Unit = {
-    // Write page
-    val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema)
-    val memPageStore = new ParquetInMemoryPageStore(1)
-    val columns = new ColumnWriteStoreV1(
-      pt.schema,
-      memPageStore,
-      ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
-    )
-    val writeSupport = pt.writeSupport
-    val recordWriter = columnIO.getRecordWriter(columns)
-    writeSupport.prepareForWrite(recordWriter)
-    writeSupport.write(MagnolifyBench.nested)
-    recordWriter.flush()
-    columns.flush()
-
-    // Read and convert page
-    val conf = new Configuration()
-    val readSupport = pt.readSupport
-    reader = columnIO.getRecordReader(
-      memPageStore,
-      readSupport.prepareForRead(
-        conf,
-        new java.util.HashMap,
-        pt.schema,
-        readSupport.init(new InitContext(conf, new java.util.HashMap, pt.schema)))
-    )
-  }
-}
-
-@State(Scope.Benchmark)
-class ParquetWriteState(pt: ParquetType[Nested]) {
-  import org.apache.parquet.io._
-  import org.apache.parquet.column.impl.ColumnWriteStoreV1
-  import org.apache.parquet.column.ParquetProperties
-
-  var writer: WriteSupport[Nested] = null
-
-  @Setup(Level.Invocation)
-  def setup(): Unit = {
-    val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema)
-    val memPageStore = new ParquetInMemoryPageStore(1)
-    val columns = new ColumnWriteStoreV1(
-      pt.schema,
-      memPageStore,
-      ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
-    )
-    val writeSupport = pt.writeSupport
-    val recordWriter = columnIO.getRecordWriter(columns)
-    writeSupport.prepareForWrite(recordWriter)
-    this.writer = writeSupport
-  }
-}
-
-object ParquetStates {
-  def confWithGroupedArraysProp(propValue: Boolean): Configuration = {
-    val conf = new Configuration()
-    conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, propValue)
-    conf
-  }
-  class DefaultParquetReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(false)))
-  class DefaultParquetWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(false)))
-
-  class ParquetAvroCompatReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(true)))
-  class ParquetAvroCompatWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(true)))
-}
-
-@BenchmarkMode(Array(Mode.AverageTime))
-@OutputTimeUnit(TimeUnit.NANOSECONDS)
-@State(Scope.Thread)
-class ParquetBench {
-  import MagnolifyBench._
-
-  @Benchmark def parquetWrite(state: ParquetStates.DefaultParquetWriteState): Unit = state.writer.write(nested)
-  @Benchmark def parquetRead(state: ParquetStates.DefaultParquetReadState): Nested = state.reader.read()
-}
-
-@BenchmarkMode(Array(Mode.AverageTime))
-@OutputTimeUnit(TimeUnit.NANOSECONDS)
-@State(Scope.Thread)
-class ParquetAvroCompatBench {
-  import MagnolifyBench._
-
-  @Benchmark def parquetWrite(state: ParquetStates.ParquetAvroCompatWriteState): Unit = state.writer.write(nested)
-  @Benchmark def parquetRead(state: ParquetStates.ParquetAvroCompatReadState): Nested = state.reader.read()
-}
-
 @BenchmarkMode(Array(Mode.AverageTime))
 @OutputTimeUnit(TimeUnit.NANOSECONDS)
 @State(Scope.Thread)
@@ -259,7 +157,7 @@ class ExampleBench {
   private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get
   private val example = exampleType.to(exampleNested).build()
   @Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested)
-  @Benchmark def exampleFrom: ExampleNested = exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap)
+  @Benchmark def exampleFrom: ExampleNested = exampleType.from(example)
 }
 
 // Collections are not supported
diff --git a/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala b/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
deleted file mode 100644
index 5bb596d4b..000000000
--- a/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-package magnolify.jmh
-
-import org.apache.parquet.bytes.{ByteBufferReleaser, BytesInput, HeapByteBufferAllocator}
-import org.apache.parquet.column.{ColumnDescriptor, Encoding}
-import org.apache.parquet.column.page._
-import org.apache.parquet.column.statistics._
-
-import scala.collection.mutable
-
-/**
- * An in-memory Parquet page store modeled after parquet-java's MemPageStore, used to benchmark
- * ParquetType conversion between Parquet Groups and Scala case classes
- */
-class ParquetInMemoryPageStore(rowCount: Long) extends PageReadStore with PageWriteStore {
-  lazy val writers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryWriter]()
-  lazy val readers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryReader]()
-
-  override def getPageReader(path: ColumnDescriptor): PageReader =
-    readers.getOrElseUpdate(path, {
-      val writer = writers(path)
-      new ParquetInMemoryReader(writer.numValues, writer.pages.toList, writer.dictionaryPage)
-    })
-
-  override def getPageWriter(path: ColumnDescriptor): PageWriter =
-    writers.getOrElseUpdate(path, new ParquetInMemoryWriter())
-
-  override def getRowCount: Long = rowCount
-}
-
-class ParquetInMemoryReader(valueCount: Long, pages: List[DataPage], dictionaryPage: DictionaryPage) extends PageReader {
-  lazy val pagesIt = pages.iterator
-  override def readDictionaryPage(): DictionaryPage = dictionaryPage
-  override def getTotalValueCount: Long = valueCount
-  override def readPage(): DataPage = pagesIt.next()
-}
-
-class ParquetInMemoryWriter extends PageWriter {
-  var numRows = 0
-  var numValues: Long = 0
-  var memSize: Long = 0
-  val pages = new mutable.ListBuffer[DataPage]()
-  var dictionaryPage: DictionaryPage = null
-
-  override def writePage(bytesInput: BytesInput, valueCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = {
-    writePage(bytesInput, valueCount, 1, statistics, rlEncoding, dlEncoding, valuesEncoding)
-  }
-
-  override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], sizeStatistics: SizeStatistics, rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = {
-    writePage(bytesInput, valueCount, rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding)
-  }
-
-  override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = {
-    pages.addOne(new DataPageV1(
-      bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)),
-      valueCount,
-      bytesInput.size().toInt,
-      statistics,
-      rlEncoding,
-      dlEncoding,
-      valuesEncoding))
-    memSize += bytesInput.size()
-    numRows += rowCount
-    numValues += valueCount
-  }
-
-  override def writePageV2(rowCount: Int, nullCount: Int, valueCount: Int, repetitionLevels: BytesInput, definitionLevels: BytesInput, dataEncoding: Encoding, data: BytesInput, statistics: Statistics[_]): Unit = ???
-
-  override def getMemSize: Long = memSize
-
-  override def allocatedSize(): Long = memSize
-
-  override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit = {
-    this.dictionaryPage = dictionaryPage
-  }
-
-  override def memUsageString(prefix: String): String = s"$prefix $memSize bytes"
-}
diff --git a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala
index 34a51a91d..a5123a47b 100644
--- a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala
+++ b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala
@@ -31,11 +31,4 @@ object MagnolifyParquetProperties {
 
   val ReadTypeKey = "parquet.type.read.type"
   val WriteTypeKey = "parquet.type.write.type"
-
-  // Hash any Configuration values that might affect schema creation to use as part of Schema cache key
-  private[parquet] def hashValues(conf: Configuration): Int =
-    Option(conf.get(WriteGroupedArrays))
-      .map(_.toBoolean)
-      .getOrElse(WriteGroupedArraysDefault)
-      .hashCode()
 }