Add support for BROTLI compression (#1106)
* Add support for BROTLI compression

* Update Parquet documentation
flavioheleno authored Jul 4, 2024
1 parent dab9006 commit 9bbe385
Showing 9 changed files with 135 additions and 38 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/test-extensions.yml
@@ -46,15 +46,15 @@ jobs:

- name: "Install dependencies"
run: |
sudo apt-get update && sudo apt-get install libbrotli1 liblz4-1 libzstd1 --assume-yes
- name: "Install PHP"
uses: "shivammathur/setup-php@v2"
with:
tools: composer:v2
php-version: "${{ matrix.php-version }}"
ini-values: memory_limit=-1
extensions: :psr, brotli, lz4, zstd

- name: "List PHP Extensions"
run: php -m
@@ -87,6 +87,9 @@ jobs:
if: ${{ matrix.dependencies == 'locked' }}
run: "composer install --no-interaction --no-progress --no-suggest"

- name: "Test Brotli"
run: "composer test -- --group brotli-extension"

- name: "Test LZ4"
run: "composer test -- --group lz4-extension"

72 changes: 36 additions & 36 deletions docs/components/libs/parquet.md
@@ -8,15 +8,15 @@
```
composer require flow-php/parquet
```

## What is Parquet

Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval.
It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.
Parquet is available in multiple languages including Java, C++, Python, etc... **Now also in PHP!**

## Columnar Storage

Parquet stores data in a columnar format, but what does that mean?

Row-based format:
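
The original diagram is collapsed in this diff; as a simplified stand-in covering both layouts (illustrative values only):

```
Row-based:    [1, "bob", true] [2, "alice", false] [3, "carol", true]
Column-based: id: [1, 2, 3] | name: ["bob", "alice", "carol"] | active: [true, false, true]
```

Grouping all values of a column together is what makes reading only chosen columns and per-column compression effective.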

@@ -69,30 +69,30 @@ File Metadata

## Reading Parquet Files

The first thing we need to do is create a reader.

```php
use Flow\Parquet\Reader;

$reader = new Reader();
```

The Reader accepts two arguments:

- `$byteOrder` - by default set to `ByteOrder::LITTLE_ENDIAN`
- `$options` - a set of options that can be used to configure the reader.

All available options are described in the [Option](/src/lib/parquet/src/Flow/Parquet/Option.php) enum.

> Please be aware that not all options affect the reader.

### Reader Options

- `INT_96_AS_DATETIME` - default: `true` - if set to `true` then `INT96` values will be converted to `DateTime` objects.

### Reading a file

Once we have a reader, we can read a file.

```php
use Flow\Parquet\Reader;

$reader = new Reader();

$file = $reader->readStream(\fopen('path/to/file.parquet', 'rb'));
```

At this point, nothing is read yet. We just created a file object.

There are several things we can read from a parquet file (a short usage sketch follows the list):

- `ParquetFile::values(array $columns = [], ?int $limit = null, ?int $offset = null) : \Generator`
- `ParquetFile::metadata() : Metadata`
- `ParquetFile::schema() : Schema` - shortcut for `ParquetFile::metadata()->schema()`
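
A short usage sketch of these entry points (column names are placeholders):

```php
$metadata = $file->metadata();
$schema = $file->schema(); // shortcut for $file->metadata()->schema()

// values() is a generator, so rows are yielded lazily
foreach ($file->values(['column_1'], limit: 10) as $row) {
    var_dump($row);
}
```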

### Reading the whole file:

```php
use Flow\Parquet\Reader;

$reader = new Reader();
$file = $reader->read('path/to/file.parquet');

// pass column names to read only chosen columns
foreach ($file->values(["column_1", "column_2"]) as $row) {
    // process a single row - an associative array of column => value
}
```

### Pagination

> [!NOTE]
> Paginating over parquet file is a bit tricky, especially if we want to keep memory usage low.
> To achieve the best results, we will need to play a bit with Writer options (covered later).

```php
foreach ($file->values(["column_1", "column_2"], limit: 100, offset: 1000) as $row) {
    // process one page of 100 rows
}
```
## Writing Parquet Files

Since parquet is a binary format, we need to provide a schema for the writer so it knows how
to encode values in specific columns.

Here is how we can create a schema:

```php
use Flow\Parquet\ParquetFile\Schema;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;

// a minimal example - nested columns (lists, structs) are also supported, see the tests below
$schema = Schema::with(
    FlatColumn::int64('id'),
    FlatColumn::string('name')
);
```

Once we have a schema, we can create a writer.

```php
use Flow\Parquet\Writer;

$writer = new Writer();
```
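
The `Writer` also accepts a compression codec as its first constructor argument, exactly as the tests at the bottom of this diff do (the `Compressions` namespace is inferred from the file paths in this diff):

```php
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\Writer;

$writer = new Writer(Compressions::BROTLI);
```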

and write our data:

```php
$writer->write(
    $path,
    $schema,
    [
        [
            'id' => 1,
            // ... remaining column values ...
        ],
        // ... more rows ...
    ]
);
```

This approach will open a parquet file, create a group writer, write all data and close the file.
It requires keeping the whole dataset in memory, which is usually not the best approach.

### Writing data in chunks

Before we can write a batch of rows, we need to open a file.

```php
$writer->open($path, $schema);

// rows can now be written one by one as they are produced
$writer->writeRow($row);

$writer->close();
```

> [!WARNING]
> At this point, schema evolution is not yet supported.
> We need to make sure that the schema is the same as the one used to create the file.

### Writer Options
@@ -256,7 +256,7 @@ $writer->close();
- `GZIP_COMPRESSION_LEVEL` - default: `9` - compression level for GZIP compression (applied only when GZIP compression is enabled).
- `PAGE_SIZE_BYTES` - default: `8Kb` - maximum size of a data page.
- `ROUND_NANOSECONDS` - default: `false` - since PHP does not support nanosecond precision for DateTime objects, when this option is set to `true`, the reader will round nanoseconds to microseconds.
- `ROW_GROUP_SIZE_BYTES` - default: `8Mb` - maximum size of a row group.
- `ROW_GROUP_SIZE_CHECK_INTERVAL` - default: `1000` - number of rows to write before checking if the row group size limit is reached.
- `VALIDATE_DATA` - default: `true` - if set to `true`, the writer will validate data against the schema.
- `WRITER_VERSION` - default: `1` - tells the writer which version of the parquet format should be used.
@@ -269,36 +269,36 @@

The two most important options that can heavily affect memory usage are:

- `ROW_GROUP_SIZE_BYTES`
- `ROW_GROUP_SIZE_CHECK_INTERVAL`

Row group size pretty much defines how much data the writer (but also the reader) will need to keep in memory
before flushing it to the file.
The row group size check interval defines how often the writer checks whether the row group size limit has been reached.
If you set this value too high, the writer might exceed the row group size limit.

By default, tools like Spark or Hive use 128-512Mb as a row group size,
which is great for big data and quick in-memory processing, but not so great for PHP.

For example, if you need to paginate over a file with 1Gb of data and you set the row group size to 512Mb,
you will need to keep at least 512Mb of data in memory at once.

A much better approach is to reduce the row group size to something closer to 1Mb, and the row group size check interval to
whatever your typical page size should be - for example 100 or 500 (that obviously depends on your data).

This way you will keep memory usage low, and you will be able to paginate over big files without any issues.
But it will take a bit longer to write those files, since the writer will need to flush and calculate statistics
more frequently.

Unfortunately, there is no one-size-fits-all solution here.
You will need to play a bit with those values to find the best one for your use case.
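
As a starting point, here is a sketch of tuning those two options for pagination-friendly files. It assumes `Options::default()` and `Options::set()` exist and that `Writer` takes an `Options` instance next to the compression codec, which this diff does not show:

```php
use Flow\Parquet\Option;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\Writer;

$options = Options::default();
$options->set(Option::ROW_GROUP_SIZE_BYTES, 1024 * 1024); // ~1Mb instead of the Spark-style 128-512Mb
$options->set(Option::ROW_GROUP_SIZE_CHECK_INTERVAL, 500); // check the group size every 500 rows

$writer = new Writer(Compressions::SNAPPY, $options);
```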

## Compressions

Parquet supports several compression algorithms.

- `BROTLI` - supported if [Brotli Extension](https://github.com/kjdev/php-ext-brotli) is installed
- `GZIP` - supported out of the box
- `LZ4` - supported if [LZ4 Extension](https://github.com/kjdev/php-ext-lz4) is installed
- `LZ4_RAW` - supported if [LZ4 Extension](https://github.com/kjdev/php-ext-lz4) is installed
- `LZO` - not yet supported
- `SNAPPY` - supported - it's recommended to install the [Snappy Extension](https://github.com/kjdev/php-ext-snappy), otherwise a PHP implementation is used that is much slower than the extension
- `UNCOMPRESSED` - supported out of the box
- `ZSTD` - supported if [ZSTD Extension](https://github.com/kjdev/php-ext-zstd) is installed

Obviously, compression is a trade-off between speed and size.
If you want to achieve the best compression, you should use `GZIP`, or `SNAPPY`, which is the default compression algorithm.
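
The `BROTLI_COMPRESSION_LEVEL` option added by this commit can be tuned the same way (same assumptions about the `Options` API as in the sketch above); lower quality trades compression ratio for speed:

```php
use Flow\Parquet\Option;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\Writer;

$options = Options::default();
$options->set(Option::BROTLI_COMPRESSION_LEVEL, 5); // default is 11: best ratio, slowest

$writer = new Writer(Compressions::BROTLI, $options);
```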
7 changes: 7 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Option.php
@@ -6,6 +6,13 @@

enum Option
{
/**
 * Compression level for the Brotli codec. This option is passed to the brotli_compress function when Compression is set to Brotli.
 * The higher the quality, the slower the compression.
 *
 * Default value is 11 (BROTLI_COMPRESS_LEVEL_DEFAULT).
 */
case BROTLI_COMPRESSION_LEVEL;
/**
* Some parquet writers might not properly use LogicalType for storing Strings or JSONs.
* This option would tell the reader to treat all BYTE_ARRAYs as UTF-8 strings.
1 change: 1 addition & 0 deletions src/lib/parquet/src/Flow/Parquet/Options.php
@@ -25,6 +25,7 @@ public function __construct()
Option::ROW_GROUP_SIZE_CHECK_INTERVAL->name => 1000,
Option::DICTIONARY_PAGE_SIZE->name => SizeUnits::MiB_SIZE,
Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION->name => 0.4,
Option::BROTLI_COMPRESSION_LEVEL->name => 11,
Option::GZIP_COMPRESSION_LEVEL->name => 9,
Option::LZ4_COMPRESSION_LEVEL->name => 4,
Option::ZSTD_COMPRESSION_LEVEL->name => 3,
2 changes: 2 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php
@@ -22,6 +22,7 @@ public function compress(string $data, Compressions $compression) : string
$result = match ($compression) {
Compressions::UNCOMPRESSED => $data,
Compressions::SNAPPY => \snappy_compress($data),
Compressions::BROTLI => \brotli_compress($data, $this->options->getInt(Option::BROTLI_COMPRESSION_LEVEL)),
Compressions::GZIP => \gzencode($data, $this->options->getInt(Option::GZIP_COMPRESSION_LEVEL)),
Compressions::LZ4 => \lz4_compress($data, $this->options->getInt(Option::LZ4_COMPRESSION_LEVEL)),
Compressions::LZ4_RAW => \lz4_compress($data, $this->options->getInt(Option::LZ4_COMPRESSION_LEVEL)),
@@ -42,6 +43,7 @@ public function decompress(string $data, Compressions $compression) : string
$result = match ($compression) {
Compressions::UNCOMPRESSED => $data,
Compressions::SNAPPY => \snappy_uncompress($data),
Compressions::BROTLI => \brotli_uncompress($data),
Compressions::GZIP => \gzdecode($data),
Compressions::LZ4 => \lz4_uncompress($data),
Compressions::LZ4_RAW => \lz4_uncompress($data),
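
A minimal round-trip sketch of the codec change above, mirroring `CodecTest` at the bottom of this diff (namespaces inferred from the file paths):

```php
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Codec;
use Flow\Parquet\ParquetFile\Compressions;

$codec = new Codec(new Options());

// compress() reads the level from Option::BROTLI_COMPRESSION_LEVEL; decompress() needs no options
$compressed = $codec->compress('this is some test data to be compressed', Compressions::BROTLI);
$data = $codec->decompress($compressed, Compressions::BROTLI);

\assert($data === 'this is some test data to be compressed');
```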
1 change: 1 addition & 0 deletions src/lib/parquet/src/Flow/Parquet/Writer.php
@@ -31,6 +31,7 @@ public function __construct(
switch ($this->compression) {
case Compressions::UNCOMPRESSED:
case Compressions::SNAPPY:
case Compressions::BROTLI:
case Compressions::GZIP:
case Compressions::LZ4:
case Compressions::LZ4_RAW:
14 changes: 14 additions & 0 deletions src/lib/parquet/src/stubs.php
@@ -2,6 +2,20 @@

declare(strict_types=1);

if (!\function_exists('brotli_compress')) {
function brotli_compress(string $data, int $quality = 11, int $mode = 0) : string
{
throw new RuntimeException('The Brotli extension is not available');
}
}

if (!\function_exists('brotli_uncompress')) {
function brotli_uncompress(string $data, int $length = 0) : string
{
throw new RuntimeException('The Brotli extension is not available');
}
}

if (!\function_exists('lz4_compress')) {
function lz4_compress(string $data, int $level = 0, ?string $extra = null) : string
{
@@ -20,6 +20,58 @@ protected function setUp() : void
}
}

#[Group('brotli-extension')]
public function test_writing_and_reading_file_with_brotli_compression() : void
{
if (!\extension_loaded('brotli')) {
self::markTestSkipped('The Brotli extension is not available');
}

$path = \sys_get_temp_dir() . '/test-writer-parquet-test-' . bin2hex(random_bytes(16)) . '.parquet';

$writer = new Writer(Compressions::BROTLI);

$schema = Schema::with(NestedColumn::struct('struct', [
FlatColumn::int64('int64'),
FlatColumn::boolean('boolean'),
FlatColumn::string('string'),
FlatColumn::int32('int32'),
NestedColumn::list('list_of_int', ListElement::int32()),
NestedColumn::list('list_of_string', ListElement::string()),
]));

$faker = Factory::create();
$inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array {
return [
[
'struct' => [
'int64' => $faker->numberBetween(0, Consts::PHP_INT64_MAX),
'boolean' => $faker->boolean,
'string' => $faker->text(150),
'int32' => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
'list_of_int' => \array_map(
static fn ($i) => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
\range(1, \random_int(2, 10))
),
'list_of_string' => \array_map(
static fn ($i) => $faker->text(10),
\range(1, \random_int(2, 10))
),
],
],
];
}, \range(1, 100)));

$writer->write($path, $schema, $inputData);

self::assertSame(
$inputData,
\iterator_to_array((new Reader())->read($path)->values())
);
self::assertFileExists($path);
\unlink($path);
}

public function test_writing_and_reading_file_with_gzip_compression() : void
{
$path = __DIR__ . '/var/test-writer-parquet-test-' . bin2hex(random_bytes(16)) . '.parquet';
@@ -11,6 +11,23 @@

final class CodecTest extends TestCase
{
#[Group('brotli-extension')]
public function test_brotli() : void
{
if (!\extension_loaded('brotli')) {
self::markTestSkipped('The Brotli extension is not available');
}

$data = 'this is some test data to be compressed';

$codec = new Codec((new Options()));

self::assertSame(
$data,
$codec->decompress($codec->compress($data, Compressions::BROTLI), Compressions::BROTLI)
);
}

public function test_gzip() : void
{
$data = 'this is some test data to be compressed';
