diff --git a/.github/labeler.yml b/.github/labeler.yml index 19c5f69d3..934979e77 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -17,6 +17,8 @@ lib-parquet: - any: ["src/lib/parquet/**/*"] lib-dremel: - any: ["src/lib/dremel/**/*"] +lib-snappy: + - any: ["src/lib/snappy/**/*"] adapter-amphp: - any: ["src/adapter/etl-adapter-amphp/**/*"] diff --git a/.github/workflows/monorepo-split.yml b/.github/workflows/monorepo-split.yml index a0c966369..afbe38b27 100644 --- a/.github/workflows/monorepo-split.yml +++ b/.github/workflows/monorepo-split.yml @@ -30,6 +30,8 @@ jobs: split_repository: 'parquet' - local_path: 'src/lib/dremel' split_repository: 'dremel' + - local_path: 'src/lib/snappy' + split_repository: 'snappy' - local_path: 'src/adapter/etl-adapter-amphp' split_repository: 'etl-adapter-amphp' diff --git a/README.md b/README.md index de1168e63..51a39b62c 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,7 @@ this will reduce the number of unnecessary dependencies in your project (less ma - [doctrine-dbal-bulk](src/lib/doctrine-dbal-bulk/README.md) - [Google Dremel algorithm](src/lib/dremel/README.md) - [Parquet](src/lib/parquet/README.md) + - [Snappy](src/lib/snappy/README.md) For example, if you want to work with JSON/CSV files here are the dependencies you will need to install: diff --git a/composer.json b/composer.json index 94eedd8e9..c97eeef5a 100644 --- a/composer.json +++ b/composer.json @@ -60,7 +60,8 @@ "build/version.php", "src/core/etl/src/Flow/ETL/DSL/functions.php", "src/lib/array-dot/src/Flow/ArrayDot/array_dot.php", - "src/lib/parquet/src/Flow/Parquet/functions.php" + "src/lib/parquet/src/Flow/Parquet/functions.php", + "src/lib/snappy/polifil.php" ], "psr-4": { "Flow\\": [ @@ -83,7 +84,8 @@ "src/lib/array-dot/src/Flow", "src/lib/doctrine-dbal-bulk/src/Flow", "src/lib/dremel/src/Flow", - "src/lib/parquet/src/Flow" + "src/lib/parquet/src/Flow", + "src/lib/snappy/src/Flow" ], "Flow\\Doctrine\\Bulk\\": [ 
"src/lib/doctrine-dbal-bulk/src/Flow/Doctrine/Bulk" @@ -115,7 +117,8 @@ "src/lib/array-dot/tests/Flow", "src/lib/doctrine-dbal-bulk/tests/Flow", "src/lib/dremel/tests/Flow", - "src/lib/parquet/tests/Flow" + "src/lib/parquet/tests/Flow", + "src/lib/snappy/tests/Flow" ], "Flow\\Doctrine\\Bulk\\Tests\\": [ "src/lib/doctrine-dbal-bulk/tests/Flow/Doctrine/Bulk/Tests" @@ -153,7 +156,8 @@ "flow-php/etl-adapter-reactphp": "self.version", "flow-php/etl-adapter-text": "self.version", "flow-php/etl-adapter-xml": "self.version", - "flow-php/parquet": "self.version" + "flow-php/parquet": "self.version", + "flow-php/snappy": "self.version" }, "scripts": { "build": [ diff --git a/phpunit.xml b/phpunit.xml index 073979ff1..38ffb5a1d 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -54,6 +54,7 @@ src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration src/core/etl/tests/Flow/ETL/Tests/Integration src/lib/parquet/tests/Flow/Parquet/Tests/Integration + src/lib/snappy/tests/Flow/Snappy/Tests/Integration src/adapter/etl-adapter-doctrine/tests/Flow/ETL/Adapter/Doctrine/Tests/Integration diff --git a/src/lib/parquet/composer.json b/src/lib/parquet/composer.json index 6071fa917..9be45397e 100644 --- a/src/lib/parquet/composer.json +++ b/src/lib/parquet/composer.json @@ -13,7 +13,8 @@ "require": { "php": "~8.1 || ~8.2", "ext-bcmath": "*", - "flow-php/dremel": "1.x-dev" + "flow-php/dremel": "1.x-dev", + "flow-php/snappy": "1.x-dev" }, "config": { "optimize-autoloader": true, diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php index ea482275b..b8ab7ba49 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php @@ -4,14 +4,6 @@ use Flow\Parquet\Exception\RuntimeException; -if (!\function_exists('snappy_uncompress')) { - /** @psalm-suppress LessSpecificReturnType */ - function snappy_uncompress(string $data) : string|false - { - throw new 
\Flow\ETL\Exception\RuntimeException('snappy_uncompress() is not available. Please install php-snappy extension https://github.com/kjdev/php-ext-snappy'); - } -} - final class Codec { public function decompress(string $data, Compressions $compression) : string @@ -19,7 +11,7 @@ public function decompress(string $data, Compressions $compression) : string /** @var false|string $result */ $result = match ($compression) { Compressions::UNCOMPRESSED => $data, - Compressions::SNAPPY => snappy_uncompress($data), + Compressions::SNAPPY => \snappy_uncompress($data), Compressions::GZIP => \gzdecode($data), default => throw new RuntimeException('Compression ' . $compression->name . ' is not supported yet') }; diff --git a/src/lib/snappy/.gitattributes b/src/lib/snappy/.gitattributes new file mode 100644 index 000000000..e02097205 --- /dev/null +++ b/src/lib/snappy/.gitattributes @@ -0,0 +1,9 @@ +*.php text eol=lf + +/.github export-ignore +/tests export-ignore + +/README.md export-ignore + +/.gitattributes export-ignore +/.gitignore export-ignore diff --git a/src/lib/snappy/CONTRIBUTING.md b/src/lib/snappy/CONTRIBUTING.md new file mode 100644 index 000000000..a2d0671c7 --- /dev/null +++ b/src/lib/snappy/CONTRIBUTING.md @@ -0,0 +1,6 @@ +## Contributing + +This repo is **READ ONLY**. To contribute to the Flow PHP project, please +open a PR against the [flow](https://github.com/flow-php/flow) monorepo. + +Changes merged to the monorepo are automatically propagated to the sub-repositories. 
\ No newline at end of file diff --git a/src/lib/snappy/LICENSE b/src/lib/snappy/LICENSE new file mode 100644 index 000000000..da3e28da7 --- /dev/null +++ b/src/lib/snappy/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Flow PHP + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/src/lib/snappy/README.md b/src/lib/snappy/README.md new file mode 100644 index 000000000..e7898a228 --- /dev/null +++ b/src/lib/snappy/README.md @@ -0,0 +1,73 @@ +# Snappy + +Pure PHP implementation of the Google [Snappy](https://github.com/google/snappy) compression algorithm. + +This library is a port of the JavaScript [snappyjs](https://github.com/zhipeng-jia/snappyjs) library. +Whenever possible, it is recommended to install the [PHP extension](https://github.com/kjdev/php-ext-snappy); +otherwise this library registers polyfill functions. 
+ +## Installation + +``` +composer require flow-php/snappy:1.x@dev +``` + +## Usage + +```php +text($textSize); +} + +$snappy = new \Flow\Snappy\Snappy(); + +echo "Starting Benchmark\n\n"; + +$flowStart = microtime(true); +foreach ($texts as $text) { + if ($snappy->uncompress($snappy->compress($text)) !== $text) { + die('snappy flow failed'); + } +} +$flowEnd = microtime(true); +echo "Snappy Flow time: " . ($flowEnd - $flowStart) . "\n"; + +$extStart = microtime(true); +foreach ($texts as $text) { + if (\snappy_uncompress(\snappy_compress($text)) !== $text) { + die('snappy ext failed'); + } +} +$extEnd = microtime(true); +echo "Snappy PHP Extension time: " . ($extEnd - $extStart) . "\n"; +``` + +Output: + +```console +$ php benchmark_snappy.php +Starting Benchmark + +Snappy Flow time: 6.6838178634644 +Snappy PHP Extension time: 0.31190991401672 +``` \ No newline at end of file diff --git a/src/lib/snappy/composer.json b/src/lib/snappy/composer.json new file mode 100644 index 000000000..7737e523f --- /dev/null +++ b/src/lib/snappy/composer.json @@ -0,0 +1,38 @@ +{ + "name": "flow-php/snappy", + "type": "library", + "description": "PHP ETL - Google Snappy compression algorithm implementation", + "keywords": [ + "etl", + "extract", + "transform", + "load", + "filter", + "snappy", + "algorithm", + "compression" + ], + "require": { + "php": "~8.1 || ~8.2" + }, + "config": { + "optimize-autoloader": true, + "sort-packages": true + }, + "license": "MIT", + "autoload": { + "psr-4": { + "Flow\\": [ + "src/Flow" + ] + }, + "files": [ "polifil.php" ] + }, + "autoload-dev": { + "psr-4": { + "Flow\\": "tests/Flow" + } + }, + "minimum-stability": "dev", + "prefer-stable": true +} diff --git a/src/lib/snappy/polifil.php b/src/lib/snappy/polifil.php new file mode 100644 index 000000000..078f3af6f --- /dev/null +++ b/src/lib/snappy/polifil.php @@ -0,0 +1,20 @@ +compress($plainText); + } +} + +if (!\function_exists('snappy_uncompress')) { + function snappy_uncompress(string 
$compressedText) : string + { + return (new Snappy())->uncompress($compressedText); + } +} + diff --git a/src/lib/snappy/src/Flow/Snappy/Snappy.php b/src/lib/snappy/src/Flow/Snappy/Snappy.php new file mode 100644 index 000000000..7b39df219 --- /dev/null +++ b/src/lib/snappy/src/Flow/Snappy/Snappy.php @@ -0,0 +1,38 @@ +compressToBuffer($outputBuffer); + + return \pack('C*', ...$outputBuffer); + } + + public function uncompress(string $compressedText) : string + { + if ($compressedText === '') { + return $compressedText; + } + + $byteArray = \array_values(\unpack('C*', $compressedText)); + + $outputBuffer = []; + (new SnappyDecompressor($byteArray))->uncompressToBuffer($outputBuffer); + + return \pack('C*', ...$outputBuffer); + } +} diff --git a/src/lib/snappy/src/Flow/Snappy/SnappyCompressor.php b/src/lib/snappy/src/Flow/Snappy/SnappyCompressor.php new file mode 100644 index 000000000..775210277 --- /dev/null +++ b/src/lib/snappy/src/Flow/Snappy/SnappyCompressor.php @@ -0,0 +1,249 @@ +array = $uncompressed; + } + + public function compressToBuffer(array &$outBuffer) : int + { + $array = $this->array; + $length = \count($array); + $pos = 0; + $outPos = 0; + + $outPos = $this->putVarInt($length, $outBuffer, $outPos); + + while ($pos < $length) { + $fragmentSize = \min($length - $pos, self::BLOCK_SIZE); + $outPos = $this->compressFragment($array, $pos, $fragmentSize, $outBuffer, $outPos); + $pos += $fragmentSize; + } + + return $outPos; + } + + public function maxCompressedLength() : int + { + $sourceLen = \count($this->array); + + return 32 + $sourceLen + (int) \floor($sourceLen / 6); + } + + private function compressFragment(array $input, int $ip, int $inputSize, array &$output, int $op) : int + { + $hashTableBits = 1; + + while ((1 << $hashTableBits) <= $inputSize && $hashTableBits <= self::MAX_HASH_TABLE_BITS) { + $hashTableBits++; + } + $hashTableBits--; + + $hashFuncShift = 32 - $hashTableBits; + + if (!isset($this->globalHashTables[$hashTableBits])) { + 
$this->globalHashTables[$hashTableBits] = \array_fill(0, 1 << $hashTableBits, 0); + } + + $hashTable = $this->globalHashTables[$hashTableBits]; + + for ($i = 0; $i < \count($hashTable); $i++) { + $hashTable[$i] = 0; + } + + $ipEnd = $ip + $inputSize; + $baseIp = $ip; + $nextEmit = $ip; + + $flag = true; + + $inputMargin = 15; + + if ($inputSize >= $inputMargin) { + $ipLimit = $ipEnd - $inputMargin; + + $ip++; + $nextHash = $this->hashFunc($this->load32($input, $ip), $hashFuncShift); + + while ($flag) { + $skip = 32; + $nextIp = $ip; + + do { + $ip = $nextIp; + $hash = $nextHash; + $bytesBetweenHashLookups = (int) ($skip / 32); + $skip++; + $nextIp = $ip + $bytesBetweenHashLookups; + + if ($ip > $ipLimit) { + $flag = false; + + break; + } + + $nextHash = $this->hashFunc($this->load32($input, $nextIp), $hashFuncShift); + + $candidate = $baseIp + $hashTable[$hash]; + $hashTable[$hash] = $ip - $baseIp; + } while (!$this->equals32($input, $ip, $candidate)); + + if (!$flag) { + break; + } + + $op = $this->emitLiteral($input, $nextEmit, $ip - $nextEmit, $output, $op); + + do { + $base = $ip; + $matched = 4; + + while ($ip + $matched < $ipEnd && $input[$ip + $matched] === $input[$candidate + $matched]) { + $matched++; + } + + $ip += $matched; + $offset = $base - $candidate; + $op = $this->emitCopy($output, $op, $offset, $matched); + + $nextEmit = $ip; + + if ($ip >= $ipLimit) { + $flag = false; + + break; + } + + $prevHash = $this->hashFunc($this->load32($input, $ip - 1), $hashFuncShift); + $hashTable[$prevHash] = $ip - 1 - $baseIp; + $curHash = $this->hashFunc($this->load32($input, $ip), $hashFuncShift); + $candidate = $baseIp + $hashTable[$curHash]; + $hashTable[$curHash] = $ip - $baseIp; + } while ($this->equals32($input, $ip, $candidate)); + + if (!$flag) { + break; + } + + $ip++; + $nextHash = $this->hashFunc($this->load32($input, $ip), $hashFuncShift); + } + } + + if ($nextEmit < $ipEnd) { + $op = $this->emitLiteral($input, $nextEmit, $ipEnd - $nextEmit, $output, 
$op); + } + + return $op; + } + + private function copyBytes(array $fromArray, int $fromPos, array &$toArray, int $toPos, int $length) : void + { + for ($i = 0; $i < $length; $i++) { + $toArray[$toPos + $i] = $fromArray[$fromPos + $i]; + } + } + + private function emitCopy(array &$output, int $op, int $offset, int $len) : int + { + while ($len >= 68) { + $op = $this->emitCopyLessThan64($output, $op, $offset, 64); + $len -= 64; + } + + if ($len > 64) { + $op = $this->emitCopyLessThan64($output, $op, $offset, 60); + $len -= 60; + } + + return $this->emitCopyLessThan64($output, $op, $offset, $len); + } + + private function emitCopyLessThan64(array &$output, int $op, int $offset, int $len) : int + { + if ($len < 12 && $offset < 2048) { + $output[$op] = 1 + (($len - 4) << 2) + (($offset >> 8) << 5); + $output[$op + 1] = $offset & 0xff; + + return $op + 2; + } + $output[$op] = 2 + (($len - 1) << 2); + $output[$op + 1] = $offset & 0xff; + $output[$op + 2] = $offset >> 8; + + return $op + 3; + } + + private function emitLiteral(array &$input, int $ip, int $len, array &$output, int $op) + { + if ($len <= 60) { + $output[$op] = ($len - 1) << 2; + $op += 1; + } elseif ($len < 256) { + $output[$op] = 60 << 2; + $output[$op + 1] = $len - 1; + $op += 2; + } else { + $output[$op] = 61 << 2; + $output[$op + 1] = ($len - 1) & 0xff; + $output[$op + 2] = ($len - 1) >> 8; + $op += 3; + } + $this->copyBytes($input, $ip, $output, $op, $len); + + return $op + $len; + } + + private function equals32(array $array, int $pos1, int $pos2) + { + return $array[$pos1] === $array[$pos2] && + $array[$pos1 + 1] === $array[$pos2 + 1] && + $array[$pos1 + 2] === $array[$pos2 + 2] && + $array[$pos1 + 3] === $array[$pos2 + 3]; + } + + private function hashFunc(int $key, int $hashFuncShift) + { + $multiplied = $key * 0x1e35a7bd; + + // Emulate unsigned right shift in PHP + return ($multiplied >> $hashFuncShift) & ((1 << (32 - $hashFuncShift)) - 1); + } + + private function load32(array $array, int $pos) 
: int + { + return $array[$pos] + ($array[$pos + 1] << 8) + ($array[$pos + 2] << 16) + ($array[$pos + 3] << 24); + } + + private function putVarInt(int $value, array &$output, int $op) : int + { + do { + $output[$op] = $value & 0x7f; + $value = $value >> 7; + + if ($value > 0) { + $output[$op] += 0x80; + } + $op++; + } while ($value > 0); + + return $op; + } +} diff --git a/src/lib/snappy/src/Flow/Snappy/SnappyDecompressor.php b/src/lib/snappy/src/Flow/Snappy/SnappyDecompressor.php new file mode 100644 index 000000000..667e8379c --- /dev/null +++ b/src/lib/snappy/src/Flow/Snappy/SnappyDecompressor.php @@ -0,0 +1,131 @@ +array = $compressed; + $this->pos = 0; + } + + public function readUncompressedLength() : int + { + $result = 0; + $shift = 0; + + while ($shift < 32 && $this->pos < \count($this->array)) { + $c = $this->array[$this->pos]; + $this->pos++; + $val = $c & 0x7f; + + if (($val << $shift >> $shift) !== $val) { + return -1; + } + $result |= $val << $shift; + + if ($c < 128) { + return $result; + } + $shift += 7; + } + + return -1; + } + + public function uncompressToBuffer(array &$outBuffer) : bool + { + $array = $this->array; + $arrayLength = \count($array); + $outBuffer = \array_fill(0, $this->readUncompressedLength(), 0); + $pos = $this->pos; + $outPos = 0; + $len = $offset = 0; + + while ($pos < \count($array)) { + $c = $array[$pos]; + $pos++; + + if (($c & 0x3) === 0) { + // Literal + $len = ($c >> 2) + 1; + + if ($len > 60) { + if ($pos + 3 >= $arrayLength) { + return false; + } + $smallLen = $len - 60; + $len = $array[$pos] + ($array[$pos + 1] << 8) + ($array[$pos + 2] << 16) + ($array[$pos + 3] << 24); + $len = ($len & self::WORD_MASK[$smallLen]) + 1; + $pos += $smallLen; + } + + if ($pos + $len > $arrayLength) { + return false; + } + $this->copyBytes($array, $pos, $outBuffer, $outPos, $len); + $pos += $len; + $outPos += $len; + } else { + switch ($c & 0x3) { + case 1: + $len = (($c >> 2) & 0x7) + 4; + $offset = $array[$pos] + (($c >> 5) << 8); + 
$pos += 1; + + break; + case 2: + if ($pos + 1 >= $arrayLength) { + return false; + } + $len = ($c >> 2) + 1; + $offset = $array[$pos] + ($array[$pos + 1] << 8); + $pos += 2; + + break; + case 3: + if ($pos + 3 >= $arrayLength) { + return false; + } + $len = ($c >> 2) + 1; + $offset = $array[$pos] + ($array[$pos + 1] << 8) + ($array[$pos + 2] << 16) + ($array[$pos + 3] << 24); + $pos += 4; + + break; + } + + if ($offset === 0 || $offset > $outPos) { + return false; + } + $this->selfCopyBytes($outBuffer, $outPos, $offset, $len); + $outPos += $len; + } + } + + return true; + } + + private function copyBytes(array $fromArray, int $fromPos, array &$toArray, int $toPos, int $length) : void + { + for ($i = 0; $i < $length; $i++) { + $toArray[$toPos + $i] = $fromArray[$fromPos + $i]; + } + } + + private function selfCopyBytes(array &$array, int $pos, int $offset, int $length) : void + { + for ($i = 0; $i < $length; $i++) { + $array[$pos + $i] = $array[$pos - $offset + $i]; + } + } +} diff --git a/src/lib/snappy/tests/Flow/Snappy/Tests/Integration/SnappyTest.php b/src/lib/snappy/tests/Flow/Snappy/Tests/Integration/SnappyTest.php new file mode 100644 index 000000000..11cc15b52 --- /dev/null +++ b/src/lib/snappy/tests/Flow/Snappy/Tests/Integration/SnappyTest.php @@ -0,0 +1,93 @@ +markTestSkipped('Snappy extension is not installed'); + } + + $string = 'This is some random string with UTF-8 characters: ąęćźżół'; + + $snappy = new Snappy(); + + $this->assertSame( + $string, + $snappy->uncompress(\snappy_compress($string)) + ); + } + + public function test_decompress_with_extension_text_compressed_with_library() : void + { + if (!\function_exists('snappy_uncompress')) { + $this->markTestSkipped('Snappy extension is not installed'); + } + + $string = 'This is some random string with UTF-8 characters: ąęćźżół'; + + $snappy = new Snappy(); + + $this->assertSame( + $string, + \snappy_uncompress($snappy->compress($string)) + ); + } + + public function test_snappy_compression() : void 
+ { + $string = 'This is some random string'; + + $snappy = new Snappy(); + + $this->assertSame( + $string, + $snappy->uncompress($snappy->compress($string)) + ); + } + + public function test_snappy_compression_on_a_longer_text() : void + { + $string = 'Fuga dolorem cum ut voluptatem alias est. At et atque et voluptatem explicabo. Error rerum quia sit. Amet minima corporis occaecati. Numquam ea molestiae itaque est modi accusamus. Est totam iste et aut. Asperiores voluptatem occaecati quaerat omnis. Consequatur qui voluptas porro natus et fugit consectetur dolor. Iusto voluptatibus libero dolores reiciendis a. Aspernatur tempore sed veritatis modi quis dicta. Eos illum sed ipsum et voluptatum. Et vel perspiciatis magnam ut maiores vitae.'; + + $snappy = new Snappy(); + + $this->assertSame( + $string, + $snappy->uncompress($snappy->compress($string)) + ); + } + + public function test_snappy_compression_with_dynamically_generated_texts() : void + { + $snappy = new Snappy(); + + for ($iteration = 0; $iteration < 100; $iteration++) { + $string = Factory::create()->text(\random_int(10, 1000)); + + $this->assertSame( + $string, + $snappy->uncompress($snappy->compress($string)), + 'Snappy compression/decomression failed at ' . $iteration . ' iteration, with text: "' . $string . '"' + ); + } + } + + public function test_snappy_compression_with_utf_8_characters() : void + { + $string = 'This is some random string with UTF-8 characters: ąęćźżół'; + + $snappy = new Snappy(); + + $this->assertSame( + $string, + $snappy->uncompress($snappy->compress($string)) + ); + } +}