diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/Codename/ParquetTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/Codename/ParquetTest.php index 09e3e2378..a3896c074 100644 --- a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/Codename/ParquetTest.php +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/Codename/ParquetTest.php @@ -12,8 +12,6 @@ use Flow\ETL\Flow; use Flow\ETL\Row; use Flow\ETL\Rows; -use Flow\Parquet\Option; -use Flow\Parquet\Options; use PHPUnit\Framework\TestCase; final class ParquetTest extends TestCase @@ -121,12 +119,7 @@ public function test_writing_and_reading_parquet_with_all_supported_types() : vo $rows, (new Flow()) ->read( - Parquet::from( - $path, - options: (new Options()) - ->set(Option::BYTE_ARRAY_TO_STRING) - ->set(Option::INT_96_AS_DATETIME) - ) + Parquet::from($path) ) ->fetch() ); @@ -172,9 +165,6 @@ public function test_writing_safe_and_reading_parquet_with_all_supported_types() (new Flow()) ->read(Parquet::from( $paths, - options: (new Options()) - ->set(Option::BYTE_ARRAY_TO_STRING) - ->set(Option::INT_96_AS_DATETIME) )) ->sortBy(ref('integer')) ->fetch() diff --git a/src/lib/dremel/src/Flow/Dremel/DataShredded.php b/src/lib/dremel/src/Flow/Dremel/DataShredded.php index d0085a908..12668ccdc 100644 --- a/src/lib/dremel/src/Flow/Dremel/DataShredded.php +++ b/src/lib/dremel/src/Flow/Dremel/DataShredded.php @@ -2,8 +2,6 @@ namespace Flow\Dremel; -use Flow\Parquet\Exception\RuntimeException; - final class DataShredded { public function __construct( @@ -12,21 +10,4 @@ public function __construct( public readonly array $values ) { } - - public function indices(array $dictionary) : array - { - $indices = []; - - foreach ($this->values as $value) { - $index = \array_search($value, $dictionary, true); - - if (!\is_int($index)) { - throw new RuntimeException('Value "' . $value . '" not found in dictionary'); - } - - $indices[] = $index; - } - - return $indices; - } } diff --git a/src/lib/parquet/src/Flow/Parquet/Consts.php b/src/lib/parquet/src/Flow/Parquet/Consts.php index cf4f3a6e3..3240c1f43 100644 --- a/src/lib/parquet/src/Flow/Parquet/Consts.php +++ b/src/lib/parquet/src/Flow/Parquet/Consts.php @@ -4,6 +4,12 @@ final class Consts { + public const GB_SIZE = 1073741824; + + public const KB_SIZE = 1024; + + public const MB_SIZE = 1048576; + public const PHP_INT32_MAX = 2147483647; public const PHP_INT64_MAX = 9223372036854775807; diff --git a/src/lib/parquet/src/Flow/Parquet/Option.php b/src/lib/parquet/src/Flow/Parquet/Option.php index 8650a5a59..3fd4ca636 100644 --- a/src/lib/parquet/src/Flow/Parquet/Option.php +++ b/src/lib/parquet/src/Flow/Parquet/Option.php @@ -7,9 +7,28 @@ enum Option /** * Some parquet writers might not properly use LogicalTyp for storing Strings or JSON's. * This option would tell the reader to treat all BYTE_ARRAY's as UTF-8 strings. + * + * Default value is true; */ case BYTE_ARRAY_TO_STRING; + /** + * Whenever cardinality ration of the dictionary goes below this value, PagesBuilders is going to fallback to PLAIN encoding. + * Cardinality ration is calculated as distinct values / total values. + * Please notice that even when cardinality ration is above this value, PageBuilder will still fallback to PLAIN encoding + * when dictionary size gets above DICTIONARY_PAGE_SIZE. + * + * Default value 0.4 (40% of the total values is distinct) + */ + case DICTIONARY_PAGE_MIN_CARDINALITY_RATION; + + /** + * Whenever size of the dictionary goes above this value, PagesBuilders is going to fallback to PLAIN encoding. + * + * Default value is 1Mb + */ + case DICTIONARY_PAGE_SIZE; + /** * When this option is set to true, reader will try to convert INT96 logical type to DateTimeImmutable object. * Some parquet writers due to historical reasons might still use INT96 to store timestamps with nanoseconds precision @@ -19,6 +38,8 @@ enum Option * * INT96 in general is not supported anymore, this option should be set to true by default, otherwise it will * return array of bytes (12) that represents INT96. + * + * Default value is true */ case INT_96_AS_DATETIME; @@ -27,6 +48,8 @@ enum Option * PageBuilder is not going to make it precisely equal to this value, but it will try to make it as close as possible. * This should be considered as a threshold rather than a strict value. * + * Default value is 128Mb + * * https://parquet.apache.org/docs/file-format/configurations/#data-page--size */ case PAGE_SIZE_BYTES; @@ -34,6 +57,8 @@ enum Option /** * Since PHP does not support nanoseconds precision for DateTime objects, when this options is set to true, * reader will round nanoseconds to microseconds. + * + * Default value is false */ case ROUND_NANOSECONDS; @@ -41,6 +66,8 @@ enum Option * RowGroupBuilder is going to use this value to determine for how long it should keep adding rows to the buffer * before flushing it on disk. * + * Default value is 8Kb + * * https://parquet.apache.org/docs/file-format/configurations/#row-group-size */ case ROW_GROUP_SIZE_BYTES; diff --git a/src/lib/parquet/src/Flow/Parquet/Options.php b/src/lib/parquet/src/Flow/Parquet/Options.php index 609420ac0..4567ad1cc 100644 --- a/src/lib/parquet/src/Flow/Parquet/Options.php +++ b/src/lib/parquet/src/Flow/Parquet/Options.php @@ -5,7 +5,7 @@ final class Options { /** - * @var array + * @var array */ private array $options; @@ -15,8 +15,10 @@ public function __construct() Option::BYTE_ARRAY_TO_STRING->name => true, Option::ROUND_NANOSECONDS->name => false, Option::INT_96_AS_DATETIME->name => true, - Option::PAGE_SIZE_BYTES->name => 1024 * 8, - Option::ROW_GROUP_SIZE_BYTES->name => 1024 * 1024 * 128, + Option::PAGE_SIZE_BYTES->name => Consts::KB_SIZE * 8, + Option::ROW_GROUP_SIZE_BYTES->name => Consts::MB_SIZE * 128, + Option::DICTIONARY_PAGE_SIZE->name => Consts::MB_SIZE, + Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION->name => 0.4, ]; } @@ -25,12 +27,12 @@ public static function default() : self return new self; } - public function get(Option $option) : bool|int + public function get(Option $option) : bool|int|float { return $this->options[$option->name]; } - public function set(Option $option, bool|int $value = true) : self + public function set(Option $option, bool|int|float $value) : self { $this->options[$option->name] = $value; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php index b48ed7556..b0db6fc15 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php @@ -107,7 +107,7 @@ private function createColumnChunkBuilders(Schema $schema) : array $builders = []; foreach ($schema->columnsFlat() as $column) { - $builders[$column->flatPath()] = new ColumnChunkBuilder($column, $this->dataConverter, $this->calculator); + $builders[$column->flatPath()] = new ColumnChunkBuilder($column, $this->dataConverter, $this->calculator, $this->options); } return $builders; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php index 507c98bdc..bb223920e 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php @@ -3,6 +3,7 @@ namespace Flow\Parquet\ParquetFile\RowGroupBuilder; use Flow\Parquet\Data\DataConverter; +use Flow\Parquet\Options; use Flow\Parquet\ParquetFile\Compressions; use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk; use Flow\Parquet\ParquetFile\Schema\FlatColumn; @@ -16,7 +17,8 @@ final class ColumnChunkBuilder public function __construct( private readonly FlatColumn $column, private readonly DataConverter $dataConverter, - private readonly PageSizeCalculator $calculator + private readonly PageSizeCalculator $calculator, + private readonly Options $options ) { $this->statistics = new ColumnChunkStatistics($column); } @@ -29,7 +31,7 @@ public function addRow(mixed $row) : void public function flush(int $fileOffset) : ColumnChunkContainer { - $pageContainers = (new PagesBuilder($this->dataConverter, $this->calculator))->build($this->column, $this->rows, $this->statistics); + $pageContainers = (new PagesBuilder($this->dataConverter, $this->calculator, $this->options))->build($this->column, $this->rows, $this->statistics); $this->statistics->reset(); @@ -45,7 +47,7 @@ public function flush(int $fileOffset) : ColumnChunkContainer totalCompressedSize: $pageContainers->size(), totalUncompressedSize: $pageContainers->size(), dictionaryPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset : null, - dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->size() : $fileOffset, + dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->totalSize() : $fileOffset, indexPageOffset: null, ) ); diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkStatistics.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkStatistics.php index 1a1146448..d3d57c2b1 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkStatistics.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkStatistics.php @@ -9,18 +9,17 @@ final class ColumnChunkStatistics { - private int $distinctCount; - private int $nullCount; private int $totalStringLength; + private array $values = []; + private int $valuesCount; public function __construct(private readonly FlatColumn $column) { $this->nullCount = 0; - $this->distinctCount = 0; $this->valuesCount = 0; $this->totalStringLength = 0; } @@ -39,6 +38,14 @@ public function add(string|int|float|null|array|bool|object $value) : void return; } + if (\is_array($value)) { + foreach ($value as $val) { + $this->values[] = \is_object($val) ? \serialize($val) : $val; + } + } else { + $this->values[] = \is_object($value) ? \serialize($value) : $value; + } + if ((\is_string($value) || \is_array($value)) && ColumnPrimitiveType::isString($this->column)) { if (\is_string($value)) { $this->totalStringLength += \strlen($value); @@ -59,9 +66,14 @@ public function avgStringLength() : int return (int) \ceil($this->totalStringLength / $this->notNullCount()); } + public function cardinalityRation() : float + { + return \round($this->distinctCount() / $this->notNullCount(), 2); + } + public function distinctCount() : int { - return $this->distinctCount; + return \count(\array_unique($this->values)); } public function notNullCount() : int @@ -77,9 +89,9 @@ public function nullCount() : int public function reset() : void { $this->nullCount = 0; - $this->distinctCount = 0; $this->valuesCount = 0; $this->totalStringLength = 0; + $this->values = []; } public function totalStringLength() : int diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder.php deleted file mode 100644 index f474721e4..000000000 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder.php +++ /dev/null @@ -1,10 +0,0 @@ -shred($rows, $column->maxDefinitionsLevel()); @@ -41,8 +39,8 @@ public function build(FlatColumn $column, array $rows) : PageContainer $pageWriter->append((new RLEBitPackedPacker($rleBitPackedHybrid))->pack($shredded->definitions)); } - if ($this->dictionary) { - $pageWriter->append((new RLEBitPackedPacker($rleBitPackedHybrid))->packWithBitWidth($shredded->indices($this->dictionary))); + if ($dictionary && $indices) { + $pageWriter->append((new RLEBitPackedPacker($rleBitPackedHybrid))->packWithBitWidth($indices)); } else { $pageWriter->append((new PlainValuesPacker($this->dataConverter))->packValues($column, $shredded->values)); } @@ -52,8 +50,8 @@ public function build(FlatColumn $column, array $rows) : PageContainer \strlen($pageBuffer), \strlen($pageBuffer), dataPageHeader: new DataPageHeader( - $this->dictionary ? Encodings::PLAIN_DICTIONARY : Encodings::PLAIN, - \count($shredded->values), + $dictionary && $indices ? Encodings::RLE_DICTIONARY : Encodings::PLAIN, + $dictionary && $indices ? \count($indices) : \count($shredded->values), ), dataPageHeaderV2: null, dictionaryPageHeader: null, @@ -64,6 +62,7 @@ public function build(FlatColumn $column, array $rows) : PageContainer $pageHeaderBuffer->getBuffer(), $pageBuffer, $shredded->values, + null, $pageHeader ); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/Dictionary.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/Dictionary.php new file mode 100644 index 000000000..9dacafc54 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/Dictionary.php @@ -0,0 +1,16 @@ + $dictionary + * @param array $indices + */ + public function __construct( + public readonly array $dictionary, + public readonly array $indices, + ) { + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder.php new file mode 100644 index 000000000..984a28393 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder.php @@ -0,0 +1,54 @@ +type()) { + case PhysicalType::INT64: + case PhysicalType::INT32: + switch ($column->logicalType()?->name()) { + case LogicalType::DATE: + case LogicalType::TIME: + case LogicalType::TIMESTAMP: + return (new ObjectDictionaryBuilder())->build($rows); + } + + return (new ScalarDictionaryBuilder())->build($rows); + case PhysicalType::BOOLEAN: + return (new ScalarDictionaryBuilder())->build($rows); + case PhysicalType::FLOAT: + case PhysicalType::DOUBLE: + return (new FloatDictionaryBuilder())->build($rows); + case PhysicalType::BYTE_ARRAY: + switch ($column->logicalType()?->name()) { + case LogicalType::STRING: + case LogicalType::JSON: + case LogicalType::BSON: + case LogicalType::UUID: + case LogicalType::ENUM: + return (new ScalarDictionaryBuilder())->build($rows); + case LogicalType::DECIMAL: + return (new FloatDictionaryBuilder())->build($rows); + case LogicalType::DATE: + case LogicalType::TIME: + case LogicalType::TIMESTAMP: + return (new ObjectDictionaryBuilder())->build($rows); + } + + throw new \RuntimeException('Building dictionary for "' . $column->logicalType()?->name() . '" is not supported'); + + default: + throw new \RuntimeException('Building dictionary for "' . $column->type()->name . '" is not supported'); + } + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/FloatDictionaryBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/FloatDictionaryBuilder.php new file mode 100644 index 000000000..e7bdb30a3 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/FloatDictionaryBuilder.php @@ -0,0 +1,35 @@ + $value) { + $dictionary[$index] = @\unserialize($value, ['allowed_classes' => []]); + } + + return new Dictionary($dictionary, $indices); + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ObjectDictionaryBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ObjectDictionaryBuilder.php new file mode 100644 index 000000000..711050830 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ObjectDictionaryBuilder.php @@ -0,0 +1,35 @@ + $value) { + $dictionary[$index] = @\unserialize($value, ['allowed_classes' => [\DateTimeImmutable::class, \DateInterval::class]]); + } + + return new Dictionary($dictionary, $indices); + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ScalarDictionaryBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ScalarDictionaryBuilder.php new file mode 100644 index 000000000..c76abc84b --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ScalarDictionaryBuilder.php @@ -0,0 +1,33 @@ + $value) { + $dictionary[$index] = $value; + } + + return new Dictionary($dictionary, $indices); + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryPageBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryPageBuilder.php index 6bcfe0c53..28befd020 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryPageBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryPageBuilder.php @@ -2,20 +2,18 @@ namespace Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder; -use function Flow\Parquet\array_flatten; use Flow\Parquet\BinaryWriter\BinaryBufferWriter; use Flow\Parquet\Data\DataConverter; use Flow\Parquet\ParquetFile\Encodings; use Flow\Parquet\ParquetFile\Page\Header\DictionaryPageHeader; use Flow\Parquet\ParquetFile\Page\Header\Type; use Flow\Parquet\ParquetFile\Page\PageHeader; -use Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder; use Flow\Parquet\ParquetFile\RowGroupBuilder\PageContainer; use Flow\Parquet\ParquetFile\Schema\FlatColumn; use Thrift\Protocol\TCompactProtocol; use Thrift\Transport\TMemoryBuffer; -final class DictionaryPageBuilder implements PageBuilder +final class DictionaryPageBuilder { public function __construct(private readonly DataConverter $dataConverter) { @@ -23,18 +21,11 @@ public function __construct(private readonly DataConverter $dataConverter) public function build(FlatColumn $column, array $rows) : PageContainer { - $dictionary = []; - - foreach (array_flatten($rows) as $value) { - if (!\array_key_exists($value, $dictionary)) { - $dictionary[$value] = $value; - } - } - $dictionary = \array_values($dictionary); + $dictionary = (new DictionaryBuilder())->build($column, $rows); $pageBuffer = ''; $pageWriter = new BinaryBufferWriter($pageBuffer); - $pageWriter->append((new PlainValuesPacker($this->dataConverter))->packValues($column, $dictionary)); + $pageWriter->append((new PlainValuesPacker($this->dataConverter))->packValues($column, $dictionary->dictionary)); $pageHeader = new PageHeader( Type::DICTIONARY_PAGE, @@ -44,7 +35,7 @@ public function build(FlatColumn $column, array $rows) : PageContainer dataPageHeaderV2: null, dictionaryPageHeader: new DictionaryPageHeader( Encodings::PLAIN, - \count($dictionary) + \count($dictionary->dictionary) ), ); $pageHeader->toThrift()->write(new TCompactProtocol($pageHeaderBuffer = new TMemoryBuffer())); @@ -52,7 +43,8 @@ public function build(FlatColumn $column, array $rows) : PageContainer return new PageContainer( $pageHeaderBuffer->getBuffer(), $pageBuffer, - $dictionary, + $dictionary->indices, + $dictionary->dictionary, $pageHeader ); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainer.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainer.php index c2db97412..28d097b6e 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainer.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainer.php @@ -6,16 +6,34 @@ final class PageContainer { + /** + * @param string $pageHeaderBuffer + * @param string $pageBuffer + * @param array $values - when dictionary is present values are indices + * @param null|array $dictionary + * @param PageHeader $pageHeader + */ public function __construct( public readonly string $pageHeaderBuffer, public readonly string $pageBuffer, public readonly array $values, + public readonly ?array $dictionary, public readonly PageHeader $pageHeader ) { } - public function size() : int + public function dataSize() : int { - return \strlen($this->pageHeaderBuffer) + \strlen($this->pageBuffer); + return \strlen($this->pageBuffer); + } + + public function headerSize() : int + { + return \strlen($this->pageHeaderBuffer); + } + + public function totalSize() : int + { + return $this->headerSize() + $this->dataSize(); } } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainers.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainers.php index ce40a56ad..c47076346 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainers.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainers.php @@ -87,11 +87,11 @@ public function size() : int $size = 0; if ($this->dictionaryPageContainer) { - $size += $this->dictionaryPageContainer->size(); + $size += $this->dictionaryPageContainer->totalSize(); } foreach ($this->dataPageContainers as $pageContainer) { - $size += $pageContainer->size(); + $size += $pageContainer->totalSize(); } return $size; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PagesBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PagesBuilder.php index b42356b0e..72b0f281f 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PagesBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PagesBuilder.php @@ -3,16 +3,18 @@ namespace Flow\Parquet\ParquetFile\RowGroupBuilder; use Flow\Parquet\Data\DataConverter; +use Flow\Parquet\Option; +use Flow\Parquet\Options; use Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder\DataPageBuilder; use Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder\DictionaryPageBuilder; -use Flow\Parquet\ParquetFile\Schema\ColumnPrimitiveType; use Flow\Parquet\ParquetFile\Schema\FlatColumn; final class PagesBuilder { public function __construct( private readonly DataConverter $dataConverter, - private readonly PageSizeCalculator $pageSizeCalculator + private readonly PageSizeCalculator $pageSizeCalculator, + private readonly Options $options ) { } @@ -20,17 +22,19 @@ public function build(FlatColumn $column, array $rows, ColumnChunkStatistics $st { $containers = new PageContainers(); - if (ColumnPrimitiveType::isString($column)) { + if ($statistics->cardinalityRation() <= $this->options->get(Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION)) { $dictionaryPageContainer = (new DictionaryPageBuilder($this->dataConverter))->build($column, $rows); - $containers->add($dictionaryPageContainer); + if ($dictionaryPageContainer->dataSize() <= $this->options->get(Option::DICTIONARY_PAGE_SIZE)) { + $containers->add($dictionaryPageContainer); - /* @phpstan-ignore-next-line */ - foreach (\array_chunk($rows, $this->pageSizeCalculator->rowsPerPage($column, $statistics)) as $rowsChunk) { - $containers->add((new DataPageBuilder($this->dataConverter, $dictionaryPageContainer->values))->build($column, $rowsChunk)); - } + $containers->add( + (new DataPageBuilder($this->dataConverter))->build($column, $rows, $dictionaryPageContainer->dictionary, $dictionaryPageContainer->values) + ); - return $containers; + return $containers; + } + $dictionaryPageContainer = null; } /* @phpstan-ignore-next-line */ diff --git a/src/lib/parquet/src/Flow/Parquet/Writer.php b/src/lib/parquet/src/Flow/Parquet/Writer.php index accb49be5..39dc2b108 100644 --- a/src/lib/parquet/src/Flow/Parquet/Writer.php +++ b/src/lib/parquet/src/Flow/Parquet/Writer.php @@ -47,12 +47,10 @@ public function write(string $path, Schema $schema, iterable $rows) : void foreach ($rows as $row) { $this->rowGroupBuilder($schema)->addRow($row); - if ($this->rowGroupBuilder($schema)->isFull()) { - $rowGroupContainer = $this->rowGroupBuilder($schema)->flush($fileOffset); - \fwrite($stream, $rowGroupContainer->binaryBuffer); - $metadata->rowGroups()->add($rowGroupContainer->rowGroup); - $fileOffset += \strlen($rowGroupContainer->binaryBuffer); - } + $rowGroupContainer = $this->rowGroupBuilder($schema)->flush($fileOffset); + \fwrite($stream, $rowGroupContainer->binaryBuffer); + $metadata->rowGroups()->add($rowGroupContainer->rowGroup); + $fileOffset += \strlen($rowGroupContainer->binaryBuffer); } if (!$this->rowGroupBuilder($schema)->isEmpty()) { diff --git a/src/lib/parquet/src/Flow/Parquet/functions.php b/src/lib/parquet/src/Flow/Parquet/functions.php index 93ae93f6d..75a39bf77 100644 --- a/src/lib/parquet/src/Flow/Parquet/functions.php +++ b/src/lib/parquet/src/Flow/Parquet/functions.php @@ -47,13 +47,17 @@ function array_flatten(array $array) : array { $result = []; - foreach ($array as $item) { - if (\is_array($item)) { - $result = \array_merge($result, array_flatten($item)); - } else { - $result[] = $item; + $flatten = function (array $arr) use (&$result, &$flatten) : void { + foreach ($arr as $item) { + if (\is_array($item)) { + $flatten($item); + } else { + $result[] = $item; + } } - } + }; + + $flatten($array); return $result; } diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/ParquetFile/RowGroupBuilder/PagesBuilderTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/ParquetFile/RowGroupBuilder/PagesBuilderTest.php index d7ee80016..015332ba5 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/ParquetFile/RowGroupBuilder/PagesBuilderTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/ParquetFile/RowGroupBuilder/PagesBuilderTest.php @@ -31,7 +31,8 @@ public function test_building_multiple_pages_for_large_int32_column() : void foreach ($values as $value) { $statistics->add($value); } - $pages = (new PagesBuilder(DataConverter::initialize($options), new PageSizeCalculator($options)))->build($column, $values, $statistics); + $pages = (new PagesBuilder(DataConverter::initialize($options), new PageSizeCalculator($options), $options)) + ->build($column, $values, $statistics); $this->assertCount(4, $pages->dataPageContainers()); $this->assertEquals( @@ -65,7 +66,8 @@ public function test_building_pages_for_enum_columns() : void $statistics->add($value); } - $pages = (new PagesBuilder(DataConverter::initialize(new Options()), new PageSizeCalculator(new Options()))) + $options = new Options(); + $pages = (new PagesBuilder(DataConverter::initialize($options), new PageSizeCalculator($options), $options)) ->build($column, $values, $statistics); $this->assertEquals( @@ -77,7 +79,7 @@ public function test_building_pages_for_enum_columns() : void null, new DictionaryPageHeader( Encodings::PLAIN, - $pages->valuesCount(), + \count($enum), ) ), $pages->dictionaryPageContainer()->pageHeader @@ -88,7 +90,7 @@ public function test_building_pages_for_enum_columns() : void \strlen($pages->dataPageContainers()[0]->pageBuffer), \strlen($pages->dataPageContainers()[0]->pageBuffer), new DataPageHeader( - Encodings::PLAIN_DICTIONARY, + Encodings::RLE_DICTIONARY, \count($values), ), null, @@ -109,7 +111,9 @@ public function test_building_pages_for_integer_column() : void $statistics->add($value); } - $pages = (new PagesBuilder(DataConverter::initialize(new Options()), new PageSizeCalculator(new Options())))->build($column, $values, $statistics); + $options = new Options(); + $pages = (new PagesBuilder(DataConverter::initialize($options), new PageSizeCalculator($options), $options)) + ->build($column, $values, $statistics); $this->assertCount(1, $pages->dataPageContainers()); $this->assertEquals( @@ -138,29 +142,19 @@ public function test_building_pages_for_json_columns() : void foreach ($values as $value) { $statistics->add($value); } - $pages = (new PagesBuilder(DataConverter::initialize(new Options()), new PageSizeCalculator(new Options())))->build($column, $values, $statistics); - $this->assertEquals( - new PageHeader( - Type::DICTIONARY_PAGE, - \strlen($pages->dictionaryPageContainer()->pageBuffer), - \strlen($pages->dictionaryPageContainer()->pageBuffer), - null, - null, - new DictionaryPageHeader( - Encodings::PLAIN, - $pages->valuesCount(), - ) - ), - $pages->dictionaryPageContainer()->pageHeader - ); + $options = new Options(); + $pages = (new PagesBuilder(DataConverter::initialize($options), new PageSizeCalculator($options), $options)) + ->build($column, $values, $statistics); + + $this->assertNull($pages->dictionaryPageContainer()); $this->assertEquals( new PageHeader( Type::DATA_PAGE, \strlen($pages->dataPageContainers()[0]->pageBuffer), \strlen($pages->dataPageContainers()[0]->pageBuffer), new DataPageHeader( - Encodings::PLAIN_DICTIONARY, + Encodings::PLAIN, \count($values), ), null, @@ -170,7 +164,7 @@ public function test_building_pages_for_json_columns() : void ); } - public function test_building_pages_for_string_columns() : void + public function test_building_pages_for_string_columns_with_very_low_cardinality() : void { $column = FlatColumn::string('string'); $values = \array_map(static fn ($i) => 'abcdefghij', \range(0, 99)); @@ -180,7 +174,9 @@ public function test_building_pages_for_string_columns() : void foreach ($values as $value) { $statistics->add($value); } - $pages = (new PagesBuilder(DataConverter::initialize($options), new PageSizeCalculator($options)))->build($column, $values, $statistics); + + $pages = (new PagesBuilder(DataConverter::initialize($options), new PageSizeCalculator($options), $options)) + ->build($column, $values, $statistics); $this->assertCount(1, $pages->dataPageContainers()); $this->assertEquals( @@ -203,7 +199,7 @@ public function test_building_pages_for_string_columns() : void \strlen($pages->dataPageContainers()[0]->pageBuffer), \strlen($pages->dataPageContainers()[0]->pageBuffer), new DataPageHeader( - Encodings::PLAIN_DICTIONARY, + Encodings::RLE_DICTIONARY, 100, ), null, @@ -223,29 +219,18 @@ public function test_building_pages_for_uuid_columns() : void foreach ($values as $value) { $statistics->add($value); } - $pages = (new PagesBuilder(DataConverter::initialize(new Options()), new PageSizeCalculator(new Options())))->build($column, $values, $statistics); + $options = new Options(); + $pages = (new PagesBuilder(DataConverter::initialize($options), new PageSizeCalculator($options), $options)) + ->build($column, $values, $statistics); - $this->assertEquals( - new PageHeader( - Type::DICTIONARY_PAGE, - \strlen($pages->dictionaryPageContainer()->pageBuffer), - \strlen($pages->dictionaryPageContainer()->pageBuffer), - null, - null, - new DictionaryPageHeader( - Encodings::PLAIN, - $pages->valuesCount(), - ) - ), - $pages->dictionaryPageContainer()->pageHeader - ); + $this->assertNull($pages->dictionaryPageContainer()); $this->assertEquals( new PageHeader( Type::DATA_PAGE, \strlen($pages->dataPageContainers()[0]->pageBuffer), \strlen($pages->dataPageContainers()[0]->pageBuffer), new DataPageHeader( - Encodings::PLAIN_DICTIONARY, + Encodings::PLAIN, \count($values), ), null,