Fixed parquet writer performance degradation #657

Merged
@@ -29,6 +29,7 @@ public function build(FlatColumn $column, array $rows) : Dictionary
case PhysicalType::FLOAT:
case PhysicalType::DOUBLE:
return (new FloatDictionaryBuilder())->build($rows);
case PhysicalType::FIXED_LEN_BYTE_ARRAY:
case PhysicalType::BYTE_ARRAY:
switch ($column->logicalType()?->name()) {
case LogicalType::STRING:
@@ -3,7 +3,6 @@
namespace Flow\Parquet\ThriftStream;

use Thrift\Exception\TException;
- use Thrift\Factory\TStringFuncFactory;
use Thrift\Transport\TTransport;

final class TPhpFileStream extends TTransport
@@ -63,15 +62,16 @@ public function read($len)

public function write($buf) : void
{
- while (TStringFuncFactory::create()->strlen($buf) > 0) {
+ while ($buf != '') {
Member Author

I just noticed how terrible those default classes from Thrift are 🤦 Instead of calling strlen directly, they create an object that calls strlen internally 🤦 🤦 🤦 I might need to look into the other classes and write our own, slightly optimized implementations, or maybe even optimize them directly in the Thrift repo.

Member

Shouldn't this be a strict check?

Member Author

ah shit 🤦 The IDE replaced this for me and I didn't notice it's != instead of !==. It won't affect anything, as there can't be anything other than a string here; I'll correct that later. Do you know if there is a rule in phpstan/psalm that could enforce strict checks?

Member

There is a cs-fixer rule: https://cs.symfony.com/doc/rules/strict/strict_comparison.html but it's a really dumb one: it replaces every == with === without checking the code, so object comparisons get replaced too and will fail after applying it.
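
To make the point of this thread concrete, here is a small sketch of how loose and strict comparison differ in PHP 8. It first checks the empty-string case from the `while` condition, then shows why blindly rewriting `==` to `===` changes the result of an object comparison. The `Point` class is a hypothetical example and not part of this PR.

```php
<?php

declare(strict_types=1);

// For real strings, != '' and !== '' agree.
var_dump('abc' != '', 'abc' !== ''); // bool(true) bool(true)
var_dump('0' != '', '0' !== '');     // bool(true) bool(true)
var_dump('' != '', '' !== '');       // bool(false) bool(false)

// They diverge once a non-string value sneaks in.
var_dump(null != '', null !== '');   // bool(false) bool(true)
var_dump(false != '', false !== ''); // bool(false) bool(true)

// Hypothetical value object, used only to illustrate object comparison.
final class Point
{
    public function __construct(public int $x, public int $y)
    {
    }
}

$a = new Point(1, 2);
$b = new Point(1, 2);

// == compares class and property values, === compares instance identity.
var_dump($a == $b);  // bool(true)
var_dump($a === $b); // bool(false)
```

Code that relies on `==` to compare two value objects therefore flips from true to false once the operator is replaced mechanically.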

$got = @\fwrite($this->stream, $buf);

if ($got === 0 || $got === false) {
throw new TException(
- 'TPhpStream: Could not write ' . TStringFuncFactory::create()->strlen($buf) . ' bytes'
+ 'TPhpStream: Could not write ' . \strlen($buf) . ' bytes'
);
}
- $buf = TStringFuncFactory::create()->substr($buf, $got);
+
+ $buf = \substr($buf, $got);
}
}
}
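
The loop in this hunk can be tried in isolation. Below is a minimal sketch that reproduces the same partial-write handling with plain `fwrite`, `strlen` and `substr` against an in-memory stream. `writeAll()` is a hypothetical helper name, it throws `RuntimeException` rather than Thrift's `TException` to stay self-contained, and it uses the strict `!==` form discussed in the review.

```php
<?php

declare(strict_types=1);

/**
 * Writes the whole buffer to a stream, retrying after partial writes.
 * Hypothetical helper for illustration; not part of the PR.
 *
 * @param resource $stream
 */
function writeAll($stream, string $buf) : void
{
    while ($buf !== '') {
        $written = @\fwrite($stream, $buf);

        // fwrite() returns false on failure; 0 means no progress was made.
        if ($written === false || $written === 0) {
            throw new \RuntimeException('Could not write ' . \strlen($buf) . ' bytes');
        }

        // Drop the bytes that were accepted and retry with the remainder.
        $buf = \substr($buf, $written);
    }
}

$stream = \fopen('php://memory', 'w+b');
writeAll($stream, 'hello parquet');
\rewind($stream);
echo \stream_get_contents($stream), \PHP_EOL; // hello parquet
\fclose($stream);
```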
10 changes: 6 additions & 4 deletions src/lib/parquet/src/Flow/Parquet/Writer.php
@@ -47,10 +47,12 @@ public function write(string $path, Schema $schema, iterable $rows) : void
foreach ($rows as $row) {
$this->rowGroupBuilder($schema)->addRow($row);

- $rowGroupContainer = $this->rowGroupBuilder($schema)->flush($fileOffset);
- \fwrite($stream, $rowGroupContainer->binaryBuffer);
- $metadata->rowGroups()->add($rowGroupContainer->rowGroup);
- $fileOffset += \strlen($rowGroupContainer->binaryBuffer);
+ if ($this->rowGroupBuilder($schema)->isFull()) {
+     $rowGroupContainer = $this->rowGroupBuilder($schema)->flush($fileOffset);
+     \fwrite($stream, $rowGroupContainer->binaryBuffer);
+     $metadata->rowGroups()->add($rowGroupContainer->rowGroup);
+     $fileOffset += \strlen($rowGroupContainer->binaryBuffer);
+ }
}

if (!$this->rowGroupBuilder($schema)->isEmpty()) {
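
The Writer.php change boils down to flushing a row group only once the builder reports it is full, plus a final flush for whatever is left. Here is a rough, self-contained sketch of that pattern. `RowBuffer` and `writeInGroups()` are hypothetical stand-ins, and the row-count capacity is an assumption: the real row group builder may decide fullness differently, for example by size.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for a row group builder: buffers rows and reports
 * itself full once an assumed fixed row count is reached.
 */
final class RowBuffer
{
    /** @var list<array<string, mixed>> */
    private array $rows = [];

    public function __construct(private int $capacity)
    {
    }

    public function add(array $row) : void
    {
        $this->rows[] = $row;
    }

    public function isFull() : bool
    {
        return \count($this->rows) >= $this->capacity;
    }

    public function isEmpty() : bool
    {
        return $this->rows === [];
    }

    /** Returns the buffered rows and resets the buffer. */
    public function flush() : array
    {
        $rows = $this->rows;
        $this->rows = [];

        return $rows;
    }
}

/** Groups rows into batches instead of emitting one group per row. */
function writeInGroups(iterable $rows, int $capacity) : array
{
    $buffer = new RowBuffer($capacity);
    $groups = [];

    foreach ($rows as $row) {
        $buffer->add($row);

        // Flush only when the buffer is full, not after every row.
        if ($buffer->isFull()) {
            $groups[] = $buffer->flush();
        }
    }

    // Flush the remainder that never reached capacity.
    if (!$buffer->isEmpty()) {
        $groups[] = $buffer->flush();
    }

    return $groups;
}

$groups = writeInGroups([['id' => 1], ['id' => 2], ['id' => 3], ['id' => 4], ['id' => 5]], 2);
echo \count($groups), \PHP_EOL; // 3
```

With a capacity of 1 the sketch degenerates to one group per row, which mirrors the behaviour the PR removes.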