Skip to content

Commit

Permalink
Fixed handling negative numbers in parquet binary reader/writer (#1096)
Browse files Browse the repository at this point in the history
* Fixed handling negative numbrers in parquet binary reader/writer

* Fixed failing tests
  • Loading branch information
norberttech authored Jun 10, 2024
1 parent f9fd4c1 commit 9f175e3
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 18 deletions.
5 changes: 5 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/BinaryReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public function readDoubles(int $total) : array;
*/
public function readFloats(int $total) : array;

/**
* @return array<int>
*/
public function readInts16(int $total) : array;

/**
* @return array<int>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,30 @@ public function readFloats(int $total) : array
return $floats;
}

public function readInt32() : int
/**
* @return array<int>
*/
public function readInts16(int $total) : array
{
$bytes = $this->readBytes(4)->toArray();
$intBytes = \array_chunk($this->readBytes(2 * $total)->toArray(), 2);
$ints = [];

foreach ($intBytes as $bytes) {

if ($this->byteOrder === ByteOrder::LITTLE_ENDIAN) {
$integer = $bytes[0] | ($bytes[1] << 8);
} else {
$integer = ($bytes[0] << 24) | ($bytes[1] << 16);
}

if ($integer & 0x8000) {
$integer = -((~$integer & 0xFFFF) + 1);
}

if ($this->byteOrder === ByteOrder::LITTLE_ENDIAN) {
return $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24);
$ints[] = $integer;
}

return ($bytes[0] << 24) | ($bytes[1] << 16) | ($bytes[2] << 8) | $bytes[3];
return $ints;
}

/**
Expand All @@ -182,10 +197,16 @@ public function readInts32(int $total) : array

foreach ($intBytes as $bytes) {
if ($this->byteOrder === ByteOrder::LITTLE_ENDIAN) {
$ints[] = $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24);
$int = $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24);
} else {
$ints[] = ($bytes[0] << 24) | ($bytes[1] << 16) | ($bytes[2] << 8) | $bytes[3];
$ints = ($bytes[0] << 24) | ($bytes[1] << 16) | ($bytes[2] << 8) | $bytes[3];
}

if ($int & 0x80000000) {
$int = -((~$int & 0xFFFFFFFF) + 1); // Two's complement
}

$ints[] = $int;
}

return $ints;
Expand All @@ -199,12 +220,22 @@ public function readInts64(int $total) : array

foreach ($intBytes as $bytes) {
if ($this->byteOrder === ByteOrder::LITTLE_ENDIAN) {
$ints[] = $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24) |
$int = $bytes[0] | ($bytes[1] << 8) | ($bytes[2] << 16) | ($bytes[3] << 24) |
($bytes[4] << 32) | ($bytes[5] << 40) | ($bytes[6] << 48) | ($bytes[7] << 56);
$sign = $bytes[7];
} else {
$ints[] = ($bytes[0] << 56) | ($bytes[1] << 48) | ($bytes[2] << 40) | ($bytes[3] << 32) |
$int = ($bytes[0] << 56) | ($bytes[1] << 48) | ($bytes[2] << 40) | ($bytes[3] << 32) |
($bytes[4] << 24) | ($bytes[5] << 16) | ($bytes[6] << 8) | $bytes[7];
$sign = $bytes[7];
}

if ($sign & 0x80) {
$int |= (-1 ^ 0xFFFFFFFFFFFFFFFF) << 56;
} else {
$int |= $sign << 56;
}

$ints[] = $int;
}

return $ints;
Expand Down
5 changes: 5 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/BinaryWriter.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public function writeDoubles(array $doubles) : void;
*/
public function writeFloats(array $floats) : void;

/**
* @param array<int> $ints
*/
public function writeInts16(array $ints) : void;

/**
* @param array<int> $ints
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ public function writeFloats(array $floats) : void
}
}

public function writeInts16(array $ints) : void
{
$format = $this->byteOrder === ByteOrder::BIG_ENDIAN ? 'n' : 'v';

foreach ($ints as $int) {
$this->buffer .= \pack($format, $int);
}
}

public function writeInts32(array $ints) : void
{
$format = $this->byteOrder === ByteOrder::BIG_ENDIAN ? 'N' : 'V';
Expand Down
9 changes: 8 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@

namespace Flow\Parquet\Data;

use Flow\Parquet\Data\Converter\{BytesStringConverter, Int32DateConverter, Int32DateTimeConverter, Int64DateTimeConverter, Int96DateTimeConverter, JsonConverter, TimeConverter, UuidConverter};
use Flow\Parquet\Data\Converter\{BytesStringConverter,
Int32DateConverter,
Int32DateTimeConverter,
Int64DateTimeConverter,
Int96DateTimeConverter,
JsonConverter,
TimeConverter,
UuidConverter};
use Flow\Parquet\Exception\DataConversionException;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Flow\Parquet\BinaryReader;
use Flow\Parquet\Exception\RuntimeException;
use Flow\Parquet\ParquetFile\Schema\{FlatColumn, LogicalType, PhysicalType};
use Flow\Parquet\ParquetFile\Schema\{ConvertedType, FlatColumn, LogicalType, PhysicalType};

final class PlainValueUnpacker
{
Expand All @@ -23,7 +23,10 @@ public function __construct(private readonly BinaryReader $reader)
public function unpack(FlatColumn $column, int $total) : array
{
return match ($column->type()) {
PhysicalType::INT32 => $this->reader->readInts32($total),
PhysicalType::INT32 => match ($column->convertedType()) {
ConvertedType::INT_16 => $this->reader->readInts16($total),
default => $this->reader->readInts32($total),
},
PhysicalType::INT64 => $this->reader->readInts64($total),
PhysicalType::INT96 => $this->reader->readInts96($total),
PhysicalType::FLOAT => $this->reader->readFloats($total),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ public static function decimalProvider() : array
];
}

public function test_writing_and_reading_big_integers() : void
{
$buffer = '';
$ints = [];

for ($i = 0; $i < 10000; $i++) {
$ints[] = $i;
$ints[] = -$i;
}

(new BinaryBufferWriter($buffer))->writeInts64($ints);
self::assertEquals(
$ints,
(new BinaryBufferReader($buffer))->readInts64(\count($ints)),
);
}

#[DataProvider('decimalProvider')]
public function test_writing_and_reading_decimals(array $decimals, int $precision, int $scale) : void
{
Expand Down Expand Up @@ -64,6 +81,40 @@ public function test_writing_and_reading_floats() : void
);
}

public function test_writing_and_reading_integers() : void
{
$buffer = '';
$ints = [];

for ($i = 0; $i < 10000; $i++) {
$ints[] = $i;
$ints[] = -$i;
}

(new BinaryBufferWriter($buffer))->writeInts32($ints);
self::assertEquals(
$ints,
(new BinaryBufferReader($buffer))->readInts32(\count($ints)),
);
}

public function test_writing_and_reading_small_integers() : void
{
$buffer = '';
$ints = [];

for ($i = 0; $i < 1000; $i++) {
$ints[] = $i;
$ints[] = -$i;
}

(new BinaryBufferWriter($buffer))->writeInts16($ints);
self::assertEquals(
$ints,
(new BinaryBufferReader($buffer))->readInts16(\count($ints)),
);
}

public function test_writing_and_reading_strings() : void
{
$buffer = '';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public function test_nonullable_impala() : void
'ID' => 8,
'Int_Array' => [
'list' => [
'element' => 4294967295,
'element' => -1,
],
],
'int_array_array' => [
'list' => [
'element' => [
'list' => [
'element' => [
[4294967295, 4294967294],
[-1, -2],
[null],
],
],
Expand All @@ -45,7 +45,7 @@ public function test_nonullable_impala() : void
'Int_Map' => [
'map' => [
'key' => 'k1',
'value' => 4294967295,
'value' => -1,
],
],
'int_map_array' => [
Expand All @@ -58,10 +58,10 @@ public function test_nonullable_impala() : void
],
],
'nested_Struct' => [
'a' => 4294967295,
'a' => -1,
'B' => [
'list' => [
'element' => 4294967295,
'element' => -1,
],
],
'c' => [
Expand All @@ -70,7 +70,7 @@ public function test_nonullable_impala() : void
'element' => [
'list' => [
'element' => [
'e' => 4294967295,
'e' => -1,
'f' => 'nonnullable',
],
],
Expand Down

0 comments on commit 9f175e3

Please sign in to comment.