Skip to content

Commit

Permalink
Add support for ZSTD compression
Browse files Browse the repository at this point in the history
  • Loading branch information
flavioheleno committed Jul 3, 2024
1 parent 8ae179f commit 45a8846
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 1 deletion.
91 changes: 91 additions & 0 deletions .github/workflows/test-extensions.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Runs the PHPUnit group "zstd-extension" with the zstd PHP extension enabled,
# across every combination of PHP version and dependency set in the matrix below.
name: Extensions Tests

on:
  pull_request:
    paths:
      - '.github/workflows/**'
      - 'src/adapter/**'
      - 'src/core/**'
      - 'src/lib/**'
      - 'tools/**'
      - 'examples/**'
      - 'composer.lock'
  push:
    branches: [ 1.x ]
    paths-ignore:
      - 'CHANGELOG.md'

# Cancel a still-running workflow for the same pull request (or ref) when a newer one starts.
# See https://stackoverflow.com/a/72408109
concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

jobs:
  tests:
    name: "Tests"

    runs-on: ${{ matrix.operating-system }}

    strategy:
      # Let every matrix combination finish even if one of them fails.
      fail-fast: false
      matrix:
        dependencies:
          - "locked"
          - "lowest"
          - "highest"
        php-version:
          - "8.1"
          - "8.2"
          - "8.3"
        operating-system:
          - "ubuntu-latest"

    steps:
      - name: "Checkout"
        uses: "actions/checkout@v4"

      # System library for Zstandard, installed before PHP is set up.
      - name: "Install dependencies"
        run: |
          sudo apt-get update && sudo apt-get install libzstd1 --assume-yes

      - name: "Install PHP"
        uses: "shivammathur/setup-php@v2"
        with:
          tools: composer:v2
          php-version: "${{ matrix.php-version }}"
          ini-values: memory_limit=-1
          # NOTE(review): in setup-php a leading ":" disables the named extension (psr here) - confirm against its README.
          extensions: :psr, zstd

      - name: "List PHP Extensions"
        run: php -m

      - name: "List PHP configuration"
        run: php -i

      - name: "Get Composer Cache Directory"
        id: composer-cache
        run: |
          echo "dir=$(composer config cache-files-dir)" >> "$GITHUB_OUTPUT"

      - name: "Cache Composer dependencies"
        uses: "actions/cache@v3"
        with:
          path: "${{ steps.composer-cache.outputs.dir }}"
          key: "php-${{ matrix.php-version }}-${{ matrix.dependencies }}-composer-${{ hashFiles('**/composer.lock') }}"
          restore-keys: |
            php-${{ matrix.php-version }}-${{ matrix.dependencies }}-composer-

      # Exactly one of the three install steps below runs per matrix entry.
      # "--no-suggest" is intentionally not passed: it is deprecated and has no effect in Composer 2.
      - name: "Install lowest dependencies"
        if: ${{ matrix.dependencies == 'lowest' }}
        run: "composer update --prefer-lowest --no-interaction --no-progress"

      - name: "Install highest dependencies"
        if: ${{ matrix.dependencies == 'highest' }}
        run: "composer update --no-interaction --no-progress"

      - name: "Install locked dependencies"
        if: ${{ matrix.dependencies == 'locked' }}
        run: "composer install --no-interaction --no-progress"

      # Only the tests tagged with the "zstd-extension" group are executed here.
      - name: "Test ZSTD"
        run: "composer test -- --group zstd-extension"
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"src/core/etl/src/Flow/ETL/DSL/functions.php",
"src/lib/array-dot/src/Flow/ArrayDot/array_dot.php",
"src/lib/parquet/src/Flow/Parquet/functions.php",
"src/lib/parquet/src/stubs.php",
"src/lib/snappy/polyfill.php"
],
"psr-4": {
Expand Down
3 changes: 2 additions & 1 deletion src/lib/parquet/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
]
},
"files": [
"src/Flow/Parquet/functions.php"
"src/Flow/Parquet/functions.php",
"src/stubs.php"
]
},
"autoload-dev": {
Expand Down
8 changes: 8 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Option.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,12 @@ enum Option
* Default 1
*/
case WRITER_VERSION;

/**
* Compression level for the ZSTD codec. The value is passed to the zstd_compress function when Compression is set to ZSTD.
* Values smaller than 0 select faster compression levels (requires Zstandard library 1.3.4 or later).
*
* Default value is 3
*/
case ZSTD_COMPRESSION_LEVEL;
}
1 change: 1 addition & 0 deletions src/lib/parquet/src/Flow/Parquet/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public function __construct()
Option::DICTIONARY_PAGE_SIZE->name => Consts::MB_SIZE,
Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION->name => 0.4,
Option::GZIP_COMPRESSION_LEVEL->name => 9,
Option::ZSTD_COMPRESSION_LEVEL->name => 3,
Option::WRITER_VERSION->name => 1,
Option::VALIDATE_DATA->name => true,
];
Expand Down
2 changes: 2 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public function compress(string $data, Compressions $compression) : string
Compressions::SNAPPY => \snappy_compress($data),
/** @phpstan-ignore-next-line */
Compressions::GZIP => \gzencode($data, $this->options->get(Option::GZIP_COMPRESSION_LEVEL)),
Compressions::ZSTD => \zstd_compress($data, $this->options->getInt(Option::ZSTD_COMPRESSION_LEVEL)),
default => throw new RuntimeException('Compression ' . $compression->name . ' is not supported yet')
};

Expand All @@ -43,6 +44,7 @@ public function decompress(string $data, Compressions $compression) : string
Compressions::UNCOMPRESSED => $data,
Compressions::SNAPPY => \snappy_uncompress($data),
Compressions::GZIP => \gzdecode($data),
Compressions::ZSTD => \zstd_uncompress($data),
default => throw new RuntimeException('Compression ' . $compression->name . ' is not supported yet')
};

Expand Down
1 change: 1 addition & 0 deletions src/lib/parquet/src/Flow/Parquet/Writer.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public function __construct(
case Compressions::UNCOMPRESSED:
case Compressions::SNAPPY:
case Compressions::GZIP:
case Compressions::ZSTD:
break;

default:
Expand Down
17 changes: 17 additions & 0 deletions src/lib/parquet/src/stubs.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

// Register a placeholder only when no zstd_compress() function is defined yet.
if (false === \function_exists('zstd_compress')) {
    /**
     * Placeholder for zstd_compress() that never produces a result.
     *
     * Both arguments are accepted but ignored.
     *
     * @throws \RuntimeException on every call
     */
    function zstd_compress(string $data, int $level = 3) : string
    {
        throw new \RuntimeException('The Zstd extension is not available');
    }
}

// Register a placeholder only when no zstd_uncompress() function is defined yet.
if (false === \function_exists('zstd_uncompress')) {
    /**
     * Placeholder for zstd_uncompress() that never produces a result.
     *
     * The argument is accepted but ignored.
     *
     * @throws \RuntimeException on every call
     */
    function zstd_uncompress(string $data) : string
    {
        throw new \RuntimeException('The Zstd extension is not available');
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Flow\Parquet\ParquetFile\Schema\{FlatColumn, ListElement, NestedColumn};
use Flow\Parquet\ParquetFile\{Compressions, Schema};
use Flow\Parquet\{Consts, Reader, Writer};
use PHPUnit\Framework\Attributes\Group;
use PHPUnit\Framework\TestCase;

final class CompressionTest extends TestCase
Expand Down Expand Up @@ -152,4 +153,56 @@ public function test_writing_and_reading_file_with_uncompressed_compression() :
self::assertFileExists($path);
\unlink($path);
}

/**
 * Writes 100 randomly generated rows to a ZSTD-compressed parquet file and
 * verifies that reading the file back yields exactly the same rows.
 *
 * The test is skipped when the zstd extension is not loaded.
 */
#[Group('zstd-extension')]
public function test_writing_and_reading_file_with_zstd_compression() : void
{
    if (!\extension_loaded('zstd')) {
        self::markTestSkipped('The Zstd extension is not available');
    }

    $path = \sys_get_temp_dir() . '/test-writer-parquet-test-' . \bin2hex(\random_bytes(16)) . '.parquet';

    $writer = new Writer(Compressions::ZSTD);

    $schema = Schema::with(NestedColumn::struct('struct', [
        FlatColumn::int64('int64'),
        FlatColumn::boolean('boolean'),
        FlatColumn::string('string'),
        FlatColumn::int32('int32'),
        NestedColumn::list('list_of_int', ListElement::int32()),
        NestedColumn::list('list_of_string', ListElement::string()),
    ]));

    $faker = Factory::create();
    // Each callback invocation returns a one-row list; array_merge flattens them into a single list of rows.
    $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array {
        return [
            [
                'struct' => [
                    'int64' => $faker->numberBetween(0, Consts::PHP_INT64_MAX),
                    'boolean' => $faker->boolean,
                    'string' => $faker->text(150),
                    'int32' => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
                    'list_of_int' => \array_map(
                        static fn ($i) => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
                        \range(1, \random_int(2, 10))
                    ),
                    'list_of_string' => \array_map(
                        static fn ($i) => $faker->text(10),
                        \range(1, \random_int(2, 10))
                    ),
                ],
            ],
        ];
    }, \range(1, 100)));

    try {
        $writer->write($path, $schema, $inputData);

        self::assertSame(
            $inputData,
            \iterator_to_array((new Reader())->read($path)->values())
        );
        self::assertFileExists($path);
    } finally {
        // Remove the temporary file even when writing, reading or an assertion fails,
        // so failed runs do not leave files behind in the system temp directory.
        if (\file_exists($path)) {
            \unlink($path);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Flow\Parquet\ParquetFile\{Codec, Compressions};
use Flow\Parquet\{Option, Options};
use PHPUnit\Framework\Attributes\Group;
use PHPUnit\Framework\TestCase;

final class CodecTest extends TestCase
Expand Down Expand Up @@ -45,4 +46,21 @@ public function test_uncompressed() : void
$codec->decompress($codec->compress($data, Compressions::UNCOMPRESSED), Compressions::UNCOMPRESSED)
);
}

/**
 * Round-trips a short string through the codec using ZSTD compression and
 * expects the decompressed result to be identical to the input.
 *
 * The test is skipped when the zstd extension is not loaded.
 */
#[Group('zstd-extension')]
public function test_zstd() : void
{
    if (!\extension_loaded('zstd')) {
        self::markTestSkipped('The Zstd extension is not available');
    }

    $codec = new Codec(new Options());
    $payload = 'this is some test data to be compressed';

    $compressed = $codec->compress($payload, Compressions::ZSTD);
    $restored = $codec->decompress($compressed, Compressions::ZSTD);

    self::assertSame($payload, $restored);
}
}

0 comments on commit 45a8846

Please sign in to comment.