Skip to content

Commit

Permalink
Add support for LZ4 compression (#1107)
Browse files Browse the repository at this point in the history
  • Loading branch information
flavioheleno authored Jul 4, 2024
1 parent 03cf4af commit 9436986
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 2 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/test-extensions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ jobs:

- name: "Install dependencies"
run: |
sudo apt-get update && sudo apt-get install libzstd1 --assume-yes
sudo apt-get update && sudo apt-get install liblz4-1 libzstd1 --assume-yes
- name: "Install PHP"
uses: "shivammathur/setup-php@v2"
with:
tools: composer:v2
php-version: "${{ matrix.php-version }}"
ini-values: memory_limit=-1
extensions: :psr, zstd
extensions: :psr, lz4, zstd

- name: "List PHP Extensions"
run: php -m
Expand Down Expand Up @@ -87,5 +87,8 @@ jobs:
if: ${{ matrix.dependencies == 'locked' }}
run: "composer install --no-interaction --no-progress --no-suggest"

- name: "Test LZ4"
run: "composer test -- --group lz4-extension"

- name: "Test ZSTD"
run: "composer test -- --group zstd-extension"
8 changes: 8 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Option.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ enum Option
*/
case INT_96_AS_DATETIME;

/**
* Compression level for the LZ4 codec. This value is passed to the lz4_compress function when Compression is set to LZ4 or LZ4_RAW.
* Valid levels range from 1 to 12; recommended values are between 4 and 9.
*
* Default value is 4
*/
case LZ4_COMPRESSION_LEVEL;

/**
* PageBuilder is going to use this value to determine how many rows should be stored in one page.
* PageBuilder is not going to make it precisely equal to this value, but it will try to make it as close as possible.
Expand Down
1 change: 1 addition & 0 deletions src/lib/parquet/src/Flow/Parquet/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public function __construct()
Option::DICTIONARY_PAGE_SIZE->name => SizeUnits::MiB_SIZE,
Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION->name => 0.4,
Option::GZIP_COMPRESSION_LEVEL->name => 9,
Option::LZ4_COMPRESSION_LEVEL->name => 4,
Option::ZSTD_COMPRESSION_LEVEL->name => 3,
Option::WRITER_VERSION->name => 1,
Option::VALIDATE_DATA->name => true,
Expand Down
4 changes: 4 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public function compress(string $data, Compressions $compression) : string
Compressions::UNCOMPRESSED => $data,
Compressions::SNAPPY => \snappy_compress($data),
Compressions::GZIP => \gzencode($data, $this->options->getInt(Option::GZIP_COMPRESSION_LEVEL)),
Compressions::LZ4 => \lz4_compress($data, $this->options->getInt(Option::LZ4_COMPRESSION_LEVEL)),
Compressions::LZ4_RAW => \lz4_compress($data, $this->options->getInt(Option::LZ4_COMPRESSION_LEVEL)),
Compressions::ZSTD => \zstd_compress($data, $this->options->getInt(Option::ZSTD_COMPRESSION_LEVEL)),
default => throw new RuntimeException('Compression ' . $compression->name . ' is not supported yet')
};
Expand All @@ -41,6 +43,8 @@ public function decompress(string $data, Compressions $compression) : string
Compressions::UNCOMPRESSED => $data,
Compressions::SNAPPY => \snappy_uncompress($data),
Compressions::GZIP => \gzdecode($data),
Compressions::LZ4 => \lz4_uncompress($data),
Compressions::LZ4_RAW => \lz4_uncompress($data),
Compressions::ZSTD => \zstd_uncompress($data),
default => throw new RuntimeException('Compression ' . $compression->name . ' is not supported yet')
};
Expand Down
2 changes: 2 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Writer.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public function __construct(
case Compressions::UNCOMPRESSED:
case Compressions::SNAPPY:
case Compressions::GZIP:
case Compressions::LZ4:
case Compressions::LZ4_RAW:
case Compressions::ZSTD:
break;

Expand Down
14 changes: 14 additions & 0 deletions src/lib/parquet/src/stubs.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

declare(strict_types=1);

if (false === \function_exists('lz4_compress')) {
    /**
     * Fallback declared only when no lz4_compress function is already defined.
     *
     * It lets references to lz4_compress resolve without the extension, but
     * every call fails immediately. The parameters are accepted and ignored.
     * NOTE(review): the signature presumably mirrors the one exposed by the
     * lz4 extension - confirm against the extension's documentation.
     *
     * @throws RuntimeException always
     */
    function lz4_compress(string $data, int $level = 0, ?string $extra = null) : string
    {
        $message = 'The lz4 extension is not available';

        throw new RuntimeException($message);
    }
}

if (false === \function_exists('lz4_uncompress')) {
    /**
     * Fallback declared only when no lz4_uncompress function is already defined.
     *
     * It lets references to lz4_uncompress resolve without the extension, but
     * every call fails immediately. The parameters are accepted and ignored.
     * NOTE(review): the signature presumably mirrors the one exposed by the
     * lz4 extension - confirm against the extension's documentation.
     *
     * @throws RuntimeException always
     */
    function lz4_uncompress(string $data, int $maxsize = -1, int $offset = -1) : string
    {
        $message = 'The lz4 extension is not available';

        throw new RuntimeException($message);
    }
}

if (!\function_exists('zstd_compress')) {
function zstd_compress(string $data, int $level = 3) : string
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,110 @@ public function test_writing_and_reading_file_with_gzip_compression() : void
\unlink($path);
}

/**
 * Writes 100 randomly generated rows to a temporary Parquet file using LZ4
 * compression, reads them back and asserts that the values are identical.
 *
 * Skipped when the lz4 extension is not loaded.
 */
#[Group('lz4-extension')]
public function test_writing_and_reading_file_with_lz4_compression() : void
{
    if (!\extension_loaded('lz4')) {
        self::markTestSkipped('The lz4 extension is not available');
    }

    $path = \sys_get_temp_dir() . '/test-writer-parquet-test-' . bin2hex(random_bytes(16)) . '.parquet';

    $writer = new Writer(Compressions::LZ4);

    $schema = Schema::with(NestedColumn::struct('struct', [
        FlatColumn::int64('int64'),
        FlatColumn::boolean('boolean'),
        FlatColumn::string('string'),
        FlatColumn::int32('int32'),
        NestedColumn::list('list_of_int', ListElement::int32()),
        NestedColumn::list('list_of_string', ListElement::string()),
    ]));

    $faker = Factory::create();
    $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array {
        return [
            [
                'struct' => [
                    'int64' => $faker->numberBetween(0, Consts::PHP_INT64_MAX),
                    'boolean' => $faker->boolean,
                    'string' => $faker->text(150),
                    'int32' => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
                    'list_of_int' => \array_map(
                        static fn ($i) => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
                        \range(1, \random_int(2, 10))
                    ),
                    'list_of_string' => \array_map(
                        static fn ($i) => $faker->text(10),
                        \range(1, \random_int(2, 10))
                    ),
                ],
            ],
        ];
    }, \range(1, 100)));

    try {
        $writer->write($path, $schema, $inputData);

        // Check the file exists before reading it, so a missing file is reported
        // as such rather than as a reader failure.
        self::assertFileExists($path);
        self::assertSame(
            $inputData,
            \iterator_to_array((new Reader())->read($path)->values())
        );
    } finally {
        // Always remove the temporary file, even when writing, reading or an
        // assertion fails, so failed runs do not leave files in the temp directory.
        if (\file_exists($path)) {
            \unlink($path);
        }
    }
}

/**
 * Writes 100 randomly generated rows to a temporary Parquet file using LZ4_RAW
 * compression, reads them back and asserts that the values are identical.
 *
 * Skipped when the lz4 extension is not loaded.
 */
#[Group('lz4-extension')]
public function test_writing_and_reading_file_with_lz4_raw_compression() : void
{
    if (!\extension_loaded('lz4')) {
        self::markTestSkipped('The lz4 extension is not available');
    }

    $path = \sys_get_temp_dir() . '/test-writer-parquet-test-' . bin2hex(random_bytes(16)) . '.parquet';

    $writer = new Writer(Compressions::LZ4_RAW);

    $schema = Schema::with(NestedColumn::struct('struct', [
        FlatColumn::int64('int64'),
        FlatColumn::boolean('boolean'),
        FlatColumn::string('string'),
        FlatColumn::int32('int32'),
        NestedColumn::list('list_of_int', ListElement::int32()),
        NestedColumn::list('list_of_string', ListElement::string()),
    ]));

    $faker = Factory::create();
    $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array {
        return [
            [
                'struct' => [
                    'int64' => $faker->numberBetween(0, Consts::PHP_INT64_MAX),
                    'boolean' => $faker->boolean,
                    'string' => $faker->text(150),
                    'int32' => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
                    'list_of_int' => \array_map(
                        static fn ($i) => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
                        \range(1, \random_int(2, 10))
                    ),
                    'list_of_string' => \array_map(
                        static fn ($i) => $faker->text(10),
                        \range(1, \random_int(2, 10))
                    ),
                ],
            ],
        ];
    }, \range(1, 100)));

    try {
        $writer->write($path, $schema, $inputData);

        // Check the file exists before reading it, so a missing file is reported
        // as such rather than as a reader failure.
        self::assertFileExists($path);
        self::assertSame(
            $inputData,
            \iterator_to_array((new Reader())->read($path)->values())
        );
    } finally {
        // Always remove the temporary file, even when writing, reading or an
        // assertion fails, so failed runs do not leave files in the temp directory.
        if (\file_exists($path)) {
            \unlink($path);
        }
    }
}

public function test_writing_and_reading_file_with_snappy_compression() : void
{
$path = __DIR__ . '/var/test-writer-parquet-test-' . bin2hex(random_bytes(16)) . '.parquet';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,40 @@ public function test_gzip() : void
);
}

/**
 * Compresses a short string with the LZ4 codec and asserts that decompressing
 * the result yields the original string. Skipped when the lz4 extension is not loaded.
 */
#[Group('lz4-extension')]
public function test_lz4() : void
{
    if (false === \extension_loaded('lz4')) {
        self::markTestSkipped('The lz4 extension is not available');
    }

    $original = 'this is some test data to be compressed';
    $codec = new Codec(new Options());

    $compressed = $codec->compress($original, Compressions::LZ4);
    $restored = $codec->decompress($compressed, Compressions::LZ4);

    self::assertSame($original, $restored);
}

/**
 * Compresses a short string with the LZ4_RAW codec and asserts that decompressing
 * the result yields the original string. Skipped when the lz4 extension is not loaded.
 */
#[Group('lz4-extension')]
public function test_lz4_raw() : void
{
    if (false === \extension_loaded('lz4')) {
        self::markTestSkipped('The lz4 extension is not available');
    }

    $original = 'this is some test data to be compressed';
    $codec = new Codec(new Options());

    $compressed = $codec->compress($original, Compressions::LZ4_RAW);
    $restored = $codec->decompress($compressed, Compressions::LZ4_RAW);

    self::assertSame($original, $restored);
}

public function test_snappy() : void
{
$data = 'this is some test data to be compressed';
Expand Down

0 comments on commit 9436986

Please sign in to comment.