Skip to content

Commit

Permalink
Decide when to apply dictionary encoding (#654)
Browse files Browse the repository at this point in the history
* Decide when to apply dictionary encoding

* Allow to save as a dictionary any data type
  • Loading branch information
norberttech authored Oct 27, 2023
1 parent 212f07a commit d1f7c27
Show file tree
Hide file tree
Showing 22 changed files with 324 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
use Flow\ETL\Flow;
use Flow\ETL\Row;
use Flow\ETL\Rows;
use Flow\Parquet\Option;
use Flow\Parquet\Options;
use PHPUnit\Framework\TestCase;

final class ParquetTest extends TestCase
Expand Down Expand Up @@ -121,12 +119,7 @@ public function test_writing_and_reading_parquet_with_all_supported_types() : vo
$rows,
(new Flow())
->read(
Parquet::from(
$path,
options: (new Options())
->set(Option::BYTE_ARRAY_TO_STRING)
->set(Option::INT_96_AS_DATETIME)
)
Parquet::from($path)
)
->fetch()
);
Expand Down Expand Up @@ -172,9 +165,6 @@ public function test_writing_safe_and_reading_parquet_with_all_supported_types()
(new Flow())
->read(Parquet::from(
$paths,
options: (new Options())
->set(Option::BYTE_ARRAY_TO_STRING)
->set(Option::INT_96_AS_DATETIME)
))
->sortBy(ref('integer'))
->fetch()
Expand Down
19 changes: 0 additions & 19 deletions src/lib/dremel/src/Flow/Dremel/DataShredded.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace Flow\Dremel;

use Flow\Parquet\Exception\RuntimeException;

final class DataShredded
{
public function __construct(
Expand All @@ -12,21 +10,4 @@ public function __construct(
public readonly array $values
) {
}

public function indices(array $dictionary) : array
{
$indices = [];

foreach ($this->values as $value) {
$index = \array_search($value, $dictionary, true);

if (!\is_int($index)) {
throw new RuntimeException('Value "' . $value . '" not found in dictionary');
}

$indices[] = $index;
}

return $indices;
}
}
6 changes: 6 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Consts.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

final class Consts
{
public const GB_SIZE = 1073741824;

public const KB_SIZE = 1024;

public const MB_SIZE = 1048576;

public const PHP_INT32_MAX = 2147483647;

public const PHP_INT64_MAX = 9223372036854775807;
Expand Down
27 changes: 27 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Option.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,28 @@ enum Option
/**
* Some parquet writers might not properly use LogicalTyp for storing Strings or JSON's.
* This option would tell the reader to treat all BYTE_ARRAY's as UTF-8 strings.
*
* Default value is true;
*/
case BYTE_ARRAY_TO_STRING;

/**
* Whenever cardinality ration of the dictionary goes below this value, PagesBuilders is going to fallback to PLAIN encoding.
* Cardinality ration is calculated as distinct values / total values.
* Please notice that even when cardinality ration is above this value, PageBuilder will still fallback to PLAIN encoding
* when dictionary size gets above DICTIONARY_PAGE_SIZE.
*
* Default value 0.4 (40% of the total values is distinct)
*/
case DICTIONARY_PAGE_MIN_CARDINALITY_RATION;

/**
* Whenever size of the dictionary goes above this value, PagesBuilders is going to fallback to PLAIN encoding.
*
* Default value is 1Mb
*/
case DICTIONARY_PAGE_SIZE;

/**
* When this option is set to true, reader will try to convert INT96 logical type to DateTimeImmutable object.
* Some parquet writers due to historical reasons might still use INT96 to store timestamps with nanoseconds precision
Expand All @@ -19,6 +38,8 @@ enum Option
*
* INT96 in general is not supported anymore, this option should be set to true by default, otherwise it will
* return array of bytes (12) that represents INT96.
*
* Default value is true
*/
case INT_96_AS_DATETIME;

Expand All @@ -27,20 +48,26 @@ enum Option
* PageBuilder is not going to make it precisely equal to this value, but it will try to make it as close as possible.
* This should be considered as a threshold rather than a strict value.
*
* Default value is 128Mb
*
* https://parquet.apache.org/docs/file-format/configurations/#data-page--size
*/
case PAGE_SIZE_BYTES;

/**
* Since PHP does not support nanoseconds precision for DateTime objects, when this options is set to true,
* reader will round nanoseconds to microseconds.
*
* Default value is false
*/
case ROUND_NANOSECONDS;

/**
* RowGroupBuilder is going to use this value to determine for how long it should keep adding rows to the buffer
* before flushing it on disk.
*
* Default value is 8Kb
*
* https://parquet.apache.org/docs/file-format/configurations/#row-group-size
*/
case ROW_GROUP_SIZE_BYTES;
Expand Down
12 changes: 7 additions & 5 deletions src/lib/parquet/src/Flow/Parquet/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
final class Options
{
/**
* @var array<string, bool|int>
* @var array<string, bool|float|int>
*/
private array $options;

Expand All @@ -15,8 +15,10 @@ public function __construct()
Option::BYTE_ARRAY_TO_STRING->name => true,
Option::ROUND_NANOSECONDS->name => false,
Option::INT_96_AS_DATETIME->name => true,
Option::PAGE_SIZE_BYTES->name => 1024 * 8,
Option::ROW_GROUP_SIZE_BYTES->name => 1024 * 1024 * 128,
Option::PAGE_SIZE_BYTES->name => Consts::KB_SIZE * 8,
Option::ROW_GROUP_SIZE_BYTES->name => Consts::MB_SIZE * 128,
Option::DICTIONARY_PAGE_SIZE->name => Consts::MB_SIZE,
Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION->name => 0.4,
];
}

Expand All @@ -25,12 +27,12 @@ public static function default() : self
return new self;
}

public function get(Option $option) : bool|int
public function get(Option $option) : bool|int|float
{
return $this->options[$option->name];
}

public function set(Option $option, bool|int $value = true) : self
public function set(Option $option, bool|int|float $value) : self
{
$this->options[$option->name] = $value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private function createColumnChunkBuilders(Schema $schema) : array
$builders = [];

foreach ($schema->columnsFlat() as $column) {
$builders[$column->flatPath()] = new ColumnChunkBuilder($column, $this->dataConverter, $this->calculator);
$builders[$column->flatPath()] = new ColumnChunkBuilder($column, $this->dataConverter, $this->calculator, $this->options);
}

return $builders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Flow\Parquet\ParquetFile\RowGroupBuilder;

use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
Expand All @@ -16,7 +17,8 @@ final class ColumnChunkBuilder
public function __construct(
private readonly FlatColumn $column,
private readonly DataConverter $dataConverter,
private readonly PageSizeCalculator $calculator
private readonly PageSizeCalculator $calculator,
private readonly Options $options
) {
$this->statistics = new ColumnChunkStatistics($column);
}
Expand All @@ -29,7 +31,7 @@ public function addRow(mixed $row) : void

public function flush(int $fileOffset) : ColumnChunkContainer
{
$pageContainers = (new PagesBuilder($this->dataConverter, $this->calculator))->build($this->column, $this->rows, $this->statistics);
$pageContainers = (new PagesBuilder($this->dataConverter, $this->calculator, $this->options))->build($this->column, $this->rows, $this->statistics);

$this->statistics->reset();

Expand All @@ -45,7 +47,7 @@ public function flush(int $fileOffset) : ColumnChunkContainer
totalCompressedSize: $pageContainers->size(),
totalUncompressedSize: $pageContainers->size(),
dictionaryPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset : null,
dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->size() : $fileOffset,
dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->totalSize() : $fileOffset,
indexPageOffset: null,
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@

final class ColumnChunkStatistics
{
private int $distinctCount;

private int $nullCount;

private int $totalStringLength;

private array $values = [];

private int $valuesCount;

public function __construct(private readonly FlatColumn $column)
{
$this->nullCount = 0;
$this->distinctCount = 0;
$this->valuesCount = 0;
$this->totalStringLength = 0;
}
Expand All @@ -39,6 +38,14 @@ public function add(string|int|float|null|array|bool|object $value) : void
return;
}

if (\is_array($value)) {
foreach ($value as $val) {
$this->values[] = \is_object($val) ? \serialize($val) : $val;
}
} else {
$this->values[] = \is_object($value) ? \serialize($value) : $value;
}

if ((\is_string($value) || \is_array($value)) && ColumnPrimitiveType::isString($this->column)) {
if (\is_string($value)) {
$this->totalStringLength += \strlen($value);
Expand All @@ -59,9 +66,14 @@ public function avgStringLength() : int
return (int) \ceil($this->totalStringLength / $this->notNullCount());
}

public function cardinalityRation() : float
{
return \round($this->distinctCount() / $this->notNullCount(), 2);
}

public function distinctCount() : int
{
return $this->distinctCount;
return \count(\array_unique($this->values));
}

public function notNullCount() : int
Expand All @@ -77,9 +89,9 @@ public function nullCount() : int
public function reset() : void
{
$this->nullCount = 0;
$this->distinctCount = 0;
$this->valuesCount = 0;
$this->totalStringLength = 0;
$this->values = [];
}

public function totalStringLength() : int
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,19 @@
use Flow\Parquet\ParquetFile\Page\Header\DataPageHeader;
use Flow\Parquet\ParquetFile\Page\Header\Type;
use Flow\Parquet\ParquetFile\Page\PageHeader;
use Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder;
use Flow\Parquet\ParquetFile\RowGroupBuilder\PageContainer;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Thrift\Protocol\TCompactProtocol;
use Thrift\Transport\TMemoryBuffer;

final class DataPageBuilder implements PageBuilder
final class DataPageBuilder
{
public function __construct(
private readonly DataConverter $dataConverter,
private readonly ?array $dictionary = null
) {
}

public function build(FlatColumn $column, array $rows) : PageContainer
public function build(FlatColumn $column, array $rows, ?array $dictionary = null, ?array $indices = null) : PageContainer
{
$shredded = (new Dremel())->shred($rows, $column->maxDefinitionsLevel());

Expand All @@ -41,8 +39,8 @@ public function build(FlatColumn $column, array $rows) : PageContainer
$pageWriter->append((new RLEBitPackedPacker($rleBitPackedHybrid))->pack($shredded->definitions));
}

if ($this->dictionary) {
$pageWriter->append((new RLEBitPackedPacker($rleBitPackedHybrid))->packWithBitWidth($shredded->indices($this->dictionary)));
if ($dictionary && $indices) {
$pageWriter->append((new RLEBitPackedPacker($rleBitPackedHybrid))->packWithBitWidth($indices));
} else {
$pageWriter->append((new PlainValuesPacker($this->dataConverter))->packValues($column, $shredded->values));
}
Expand All @@ -52,8 +50,8 @@ public function build(FlatColumn $column, array $rows) : PageContainer
\strlen($pageBuffer),
\strlen($pageBuffer),
dataPageHeader: new DataPageHeader(
$this->dictionary ? Encodings::PLAIN_DICTIONARY : Encodings::PLAIN,
\count($shredded->values),
$dictionary && $indices ? Encodings::RLE_DICTIONARY : Encodings::PLAIN,
$dictionary && $indices ? \count($indices) : \count($shredded->values),
),
dataPageHeaderV2: null,
dictionaryPageHeader: null,
Expand All @@ -64,6 +62,7 @@ public function build(FlatColumn $column, array $rows) : PageContainer
$pageHeaderBuffer->getBuffer(),
$pageBuffer,
$shredded->values,
null,
$pageHeader
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder;

final class Dictionary
{
/**
* @param array<int, mixed> $dictionary
* @param array<int, int> $indices
*/
public function __construct(
public readonly array $dictionary,
public readonly array $indices,
) {
}
}
Loading

0 comments on commit d1f7c27

Please sign in to comment.