diff --git a/src/lib/parquet/src/Flow/Parquet/BinaryReader.php b/src/lib/parquet/src/Flow/Parquet/BinaryReader.php index bfec4166b..593446d63 100644 --- a/src/lib/parquet/src/Flow/Parquet/BinaryReader.php +++ b/src/lib/parquet/src/Flow/Parquet/BinaryReader.php @@ -41,6 +41,11 @@ public function readDoubles(int $total) : array; */ public function readFloats(int $total) : array; + /** + * @return array + */ + public function readInts16(int $total) : array; + /** * @return array */ diff --git a/src/lib/parquet/src/Flow/Parquet/BinaryReader/BinaryBufferReader.php b/src/lib/parquet/src/Flow/Parquet/BinaryReader/BinaryBufferReader.php index f93c8ab19..43b85816d 100644 --- a/src/lib/parquet/src/Flow/Parquet/BinaryReader/BinaryBufferReader.php +++ b/src/lib/parquet/src/Flow/Parquet/BinaryReader/BinaryBufferReader.php @@ -161,15 +161,30 @@ public function readFloats(int $total) : array return $floats; } - public function readInt32() : int + /** + * @return array + */ + public function readInts16(int $total) : array { - $bytes = $this->readBytes(4)->toArray(); + $intBytes = \array_chunk($this->readBytes(2 * $total)->toArray(), 2); + $ints = []; + + foreach ($intBytes as $bytes) { + + if ($this->byteOrder === ByteOrder::LITTLE_ENDIAN) { + $integer = $bytes[0] | ($bytes[1] << 8); + } else { + $integer = ($bytes[0] << 24) | ($bytes[1] << 16); + } + + if ($integer & 0x8000) { + $integer = -((~$integer & 0xFFFF) + 1); + } - if ($this->byteOrder === ByteOrder::LITTLE_ENDIAN) { - return $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24); + $ints[] = $integer; } - return ($bytes[0] << 24) | ($bytes[1] << 16) | ($bytes[2] << 8) | $bytes[3]; + return $ints; } /** @@ -182,10 +197,16 @@ public function readInts32(int $total) : array foreach ($intBytes as $bytes) { if ($this->byteOrder === ByteOrder::LITTLE_ENDIAN) { - $ints[] = $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24); + $int = $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24); } else { - $ints[] = ($bytes[0] << 24) | ($bytes[1] << 16) | ($bytes[2] << 8) | $bytes[3]; + $ints = ($bytes[0] << 24) | ($bytes[1] << 16) | ($bytes[2] << 8) | $bytes[3]; } + + if ($int & 0x80000000) { + $int = -((~$int & 0xFFFFFFFF) + 1); // Two's complement + } + + $ints[] = $int; } return $ints; @@ -199,12 +220,22 @@ public function readInts64(int $total) : array foreach ($intBytes as $bytes) { if ($this->byteOrder === ByteOrder::LITTLE_ENDIAN) { - $ints[] = $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24) | + $int = $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24) | ($bytes[4] << 32) | ($bytes[5] << 40) | ($bytes[6] << 48) | ($bytes[7] << 56); + $sign = $bytes[7]; } else { - $ints[] = ($bytes[0] << 56) | ($bytes[1] << 48) | ($bytes[2] << 40) | ($bytes[3] << 32) | + $int = ($bytes[0] << 56) | ($bytes[1] << 48) | ($bytes[2] << 40) | ($bytes[3] << 32) | ($bytes[4] << 24) | ($bytes[5] << 16) | ($bytes[6] << 8) | $bytes[7]; + $sign = $bytes[7]; + } + + if ($sign & 0x80) { + $int |= (-1 ^ 0xFFFFFFFFFFFFFFFF) << 56; + } else { + $int |= $sign << 56; } + + $ints[] = $int; } return $ints; diff --git a/src/lib/parquet/src/Flow/Parquet/BinaryWriter.php b/src/lib/parquet/src/Flow/Parquet/BinaryWriter.php index fa644af75..c159d207a 100644 --- a/src/lib/parquet/src/Flow/Parquet/BinaryWriter.php +++ b/src/lib/parquet/src/Flow/Parquet/BinaryWriter.php @@ -40,6 +40,11 @@ public function writeDoubles(array $doubles) : void; */ public function writeFloats(array $floats) : void; + /** + * @param array $ints + */ + public function writeInts16(array $ints) : void; + /** * @param array $ints */ diff --git a/src/lib/parquet/src/Flow/Parquet/BinaryWriter/BinaryBufferWriter.php b/src/lib/parquet/src/Flow/Parquet/BinaryWriter/BinaryBufferWriter.php index 95639c464..b89f131bd 100644 --- a/src/lib/parquet/src/Flow/Parquet/BinaryWriter/BinaryBufferWriter.php +++ b/src/lib/parquet/src/Flow/Parquet/BinaryWriter/BinaryBufferWriter.php @@ -108,6 +108,15 @@ public function writeFloats(array $floats) : void } } + public function writeInts16(array $ints) : void + { + $format = $this->byteOrder === ByteOrder::BIG_ENDIAN ? 'n' : 'v'; + + foreach ($ints as $int) { + $this->buffer .= \pack($format, $int); + } + } + public function writeInts32(array $ints) : void { $format = $this->byteOrder === ByteOrder::BIG_ENDIAN ? 'N' : 'V'; diff --git a/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php b/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php index cc84cb3b0..420a6691f 100644 --- a/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php +++ b/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php @@ -4,7 +4,14 @@ namespace Flow\Parquet\Data; -use Flow\Parquet\Data\Converter\{BytesStringConverter, Int32DateConverter, Int32DateTimeConverter, Int64DateTimeConverter, Int96DateTimeConverter, JsonConverter, TimeConverter, UuidConverter}; +use Flow\Parquet\Data\Converter\{BytesStringConverter, + Int32DateConverter, + Int32DateTimeConverter, + Int64DateTimeConverter, + Int96DateTimeConverter, + JsonConverter, + TimeConverter, + UuidConverter}; use Flow\Parquet\Exception\DataConversionException; use Flow\Parquet\Options; use Flow\Parquet\ParquetFile\Schema\FlatColumn; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValueUnpacker.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValueUnpacker.php index a827dad32..9fb994773 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValueUnpacker.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValueUnpacker.php @@ -6,7 +6,7 @@ use Flow\Parquet\BinaryReader; use Flow\Parquet\Exception\RuntimeException; -use Flow\Parquet\ParquetFile\Schema\{FlatColumn, LogicalType, PhysicalType}; +use Flow\Parquet\ParquetFile\Schema\{ConvertedType, FlatColumn, LogicalType, PhysicalType}; final class PlainValueUnpacker { @@ -23,7 +23,10 @@ public function __construct(private readonly BinaryReader $reader) public function unpack(FlatColumn $column, int $total) : array { return match ($column->type()) { - PhysicalType::INT32 => $this->reader->readInts32($total), + PhysicalType::INT32 => match ($column->convertedType()) { + ConvertedType::INT_16 => $this->reader->readInts16($total), + default => $this->reader->readInts32($total), + }, PhysicalType::INT64 => $this->reader->readInts64($total), PhysicalType::INT96 => $this->reader->readInts96($total), PhysicalType::FLOAT => $this->reader->readFloats($total), diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/Binary/BinaryReaderWriterTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/Binary/BinaryReaderWriterTest.php index a82a375e8..a8a2e56e4 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/Binary/BinaryReaderWriterTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/Binary/BinaryReaderWriterTest.php @@ -37,6 +37,23 @@ public static function decimalProvider() : array ]; } + public function test_writing_and_reading_big_integers() : void + { + $buffer = ''; + $ints = []; + + for ($i = 0; $i < 10000; $i++) { + $ints[] = $i; + $ints[] = -$i; + } + + (new BinaryBufferWriter($buffer))->writeInts64($ints); + self::assertEquals( + $ints, + (new BinaryBufferReader($buffer))->readInts64(\count($ints)), + ); + } + #[DataProvider('decimalProvider')] public function test_writing_and_reading_decimals(array $decimals, int $precision, int $scale) : void { @@ -64,6 +81,57 @@ public function test_writing_and_reading_floats() : void ); } + public function test_writing_and_reading_integers() : void + { + $buffer = ''; + $ints = []; + + for ($i = 0; $i < 10000; $i++) { + $ints[] = $i; + $ints[] = -$i; + } + + (new BinaryBufferWriter($buffer))->writeInts32($ints); + self::assertEquals( + $ints, + (new BinaryBufferReader($buffer))->readInts32(\count($ints)), + ); + } + + public function test_writing_and_reading_large_integers() : void + { + $buffer = ''; + $ints = []; + + for ($i = 0; $i < 10000; $i++) { + $ints[] = $i; + $ints[] = -$i; + } + + (new BinaryBufferWriter($buffer))->writeInts96($ints); + self::assertEquals( + $ints, + (new BinaryBufferReader($buffer))->readInts96(\count($ints)), + ); + } + + public function test_writing_and_reading_small_integers() : void + { + $buffer = ''; + $ints = []; + + for ($i = 0; $i < 1000; $i++) { + $ints[] = $i; + $ints[] = -$i; + } + + (new BinaryBufferWriter($buffer))->writeInts16($ints); + self::assertEquals( + $ints, + (new BinaryBufferReader($buffer))->readInts16(\count($ints)), + ); + } + public function test_writing_and_reading_strings() : void { $buffer = '';