Skip to content

Commit

Permalink
Bypass schema infering when reading from parquet files (#975)
Browse files Browse the repository at this point in the history
* Bypass schema infering when reading from parquet files

* Fixed failing benchmarks
  • Loading branch information
norberttech authored Feb 9, 2024
1 parent 21f82ee commit 3aed55d
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtra
use Limitable;
use PartitionFiltering;

private SchemaConverter $schemaConverter;

/**
* @param Path $path
* @param array<string> $columns
Expand All @@ -37,6 +39,7 @@ public function __construct(
private readonly ?int $offset = null
) {
$this->resetLimit();
$this->schemaConverter = new SchemaConverter();

if ($this->path->isPattern() && $this->offset !== null) {
throw new InvalidArgumentException('Offset can be used only with single file path, not with pattern');
Expand All @@ -48,12 +51,14 @@ public function extract(FlowContext $context) : \Generator
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($this->readers($context) as $fileData) {
$flowSchema = $this->schemaConverter->fromParquet($fileData['file']->schema());

foreach ($fileData['file']->values($this->columns, $this->limit(), $this->offset) as $row) {
if ($shouldPutInputIntoRows) {
$row['_input_file_uri'] = $fileData['uri'];
}

$signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions']);
$signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions'], $flowSchema);

$this->countRow();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@

namespace Flow\ETL\Adapter\Parquet;

use function Flow\ETL\DSL\list_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\ETL\DSL\struct_schema;
use function Flow\ETL\DSL\struct_type;
use function Flow\ETL\DSL\structure_element;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_map;
use function Flow\ETL\DSL\type_object;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
use Flow\ETL\PHP\Type\Logical\JsonType;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Logical\Map\MapKey;
use Flow\ETL\PHP\Type\Logical\Map\MapValue;
use Flow\ETL\PHP\Type\Logical\MapType;
use Flow\ETL\PHP\Type\Logical\Structure\StructureElement;
use Flow\ETL\PHP\Type\Logical\StructureType;
use Flow\ETL\PHP\Type\Logical\UuidType;
use Flow\ETL\PHP\Type\Logical\XMLNodeType;
Expand All @@ -26,6 +35,17 @@

final class SchemaConverter
{
public function fromParquet(ParquetSchema $schema) : Schema
{
$definitions = [];

foreach ($schema->columns() as $column) {
$definitions[] = $this->fromParquetColumnToFlowDefinition($column);
}

return \Flow\ETL\DSL\schema(...$definitions);
}

public function toParquet(Schema $schema) : ParquetSchema
{
$columns = [];
Expand Down Expand Up @@ -242,4 +262,85 @@ private function flowTypeToParquetType(string $name, Type $type) : Column

throw new RuntimeException($type::class . ' is not supported.');
}

private function fromParquetColumnToFlowDefinition(Column $column) : Schema\Definition
{
if ($column instanceof FlatColumn) {
return $this->parquetFlatToFlowType($column);
}

/** @var NestedColumn $column */
return $this->parquetNestedToFlowType($column);
}

private function parquetFlatToFlowType(FlatColumn $column) : Schema\Definition
{
$logicalType = $column->logicalType();

if ($logicalType === null) {
return match ($column->type()) {
ParquetSchema\PhysicalType::INT32 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::INT64 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::BOOLEAN => Schema\Definition::boolean($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::DOUBLE => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::FLOAT => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::BYTE_ARRAY => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
default => throw new RuntimeException($column->type()->name . ' is not supported.')
};
}

return match ($logicalType->name()) {
ParquetSchema\LogicalType::STRING => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::DATE => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::TIME => Schema\Definition::object($column->name(), type_object(\DateInterval::class, $column->repetition() === ParquetSchema\Repetition::OPTIONAL)),
ParquetSchema\LogicalType::TIMESTAMP => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::UUID => Schema\Definition::uuid($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::JSON => Schema\Definition::json($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::DECIMAL => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::INTEGER => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
default => throw new RuntimeException($logicalType->name() . ' is not supported.')
};
}

private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definition
{
if ($column->isList()) {
return list_schema(
$column->name(),
type_list(
$this->fromParquetColumnToFlowDefinition($column->getListElement())->type(),
$column->repetition() === ParquetSchema\Repetition::OPTIONAL
)
);
}

if ($column->isMap()) {
$keyType = $this->fromParquetColumnToFlowDefinition($column->getMapKeyColumn())->type();

if (!$keyType instanceof ScalarType) {
throw new RuntimeException('Flow expects map key type to be scalar type.');
}

return map_schema(
$column->name(),
type_map(
$keyType,
$this->fromParquetColumnToFlowDefinition($column->getMapValueColumn())->type(),
$column->repetition() === ParquetSchema\Repetition::OPTIONAL
)
);
}

/** @var array<StructureElement> $elements */
$elements = [];

foreach ($column->children() as $structColumn) {
$elements[] = structure_element(
$structColumn->name(),
$this->fromParquetColumnToFlowDefinition($structColumn)->type()
);
}

return struct_schema($column->name(), struct_type($elements, $column->repetition() === ParquetSchema\Repetition::OPTIONAL));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

use function Flow\ETL\Adapter\Parquet\from_parquet;
use function Flow\ETL\Adapter\Parquet\to_parquet;
use function Flow\ETL\DSL\str_entry;
use Flow\ETL\Config;
use Flow\ETL\FlowContext;
use Flow\ETL\Row;
use Flow\ETL\Rows;
use PhpBench\Attributes\Groups;

Expand All @@ -27,10 +25,6 @@ public function __construct()
$this->rows = new Rows();

foreach (from_parquet(__DIR__ . '/../Fixtures/orders_flow.parquet')->extract($this->context) as $rows) {
$rows = $rows->map(static function (Row $row) : Row {
return $row->set(str_entry('order_id', $row->valueOf('order_id')->toString()));
});

$this->rows = $this->rows->merge($rows);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ public function test_writing_with_partitioning() : void
->run();

$this->assertEquals(
$rows,
$rows->toArray(),
(new Flow())
->read(from_parquet($path . '/**/*.parquet'))
->drop('date')
->sortBy(ref('datetime')->asc())
->fetch()
->toArray()
);

$this->assertSame(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use Flow\Parquet\ParquetFile\Schema\NestedColumn;
use PHPUnit\Framework\TestCase;

final class SchemaConverterTest extends TestCase
final class FlowToParquetSchemaTest extends TestCase
{
public function test_convert_array_entry_to_parquet_array() : void
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Parquet\Tests\Unit;

use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\datetime_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\json_schema;
use function Flow\ETL\DSL\list_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\ETL\DSL\object_schema;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\struct_element;
use function Flow\ETL\DSL\struct_schema;
use function Flow\ETL\DSL\type_boolean;
use function Flow\ETL\DSL\type_int;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_map;
use function Flow\ETL\DSL\type_object;
use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\type_structure;
use function Flow\ETL\DSL\type_uuid;
use function Flow\ETL\DSL\uuid_schema;
use Flow\ETL\Adapter\Parquet\SchemaConverter;
use Flow\Parquet\ParquetFile\Schema;
use Flow\Parquet\ParquetFile\Schema\MapKey;
use Flow\Parquet\ParquetFile\Schema\MapValue;
use PHPUnit\Framework\TestCase;

final class ParquetToFlowSchemaTest extends TestCase
{
public function test_converting_flat_fields_to_flow_schema() : void
{
$converted = new SchemaConverter();

$flowSchema = $converted->fromParquet(Schema::with(
Schema\FlatColumn::int32('int32'),
Schema\FlatColumn::int64('int64'),
Schema\FlatColumn::string('string'),
Schema\FlatColumn::float('float'),
Schema\FlatColumn::double('double'),
Schema\FlatColumn::decimal('decimal'),
Schema\FlatColumn::boolean('boolean'),
Schema\FlatColumn::date('date'),
Schema\FlatColumn::time('time'),
Schema\FlatColumn::dateTime('datetime'),
Schema\FlatColumn::uuid('uuid'),
Schema\FlatColumn::json('json'),
));

$this->assertEquals(
\Flow\ETL\DSL\schema(
int_schema('int32', true),
int_schema('int64', true),
str_schema('string', true),
float_schema('float', true),
float_schema('double', true),
float_schema('decimal', true),
bool_schema('boolean', true),
datetime_schema('date', true),
object_schema('time', type_object(\DateInterval::class, true)),
datetime_schema('datetime', true),
uuid_schema('uuid', true),
json_schema('json', true),
),
$flowSchema
);
}

public function test_converting_list_to_flow_schema() : void
{
$converted = new SchemaConverter();

$flowSchema = $converted->fromParquet(Schema::with(
Schema\NestedColumn::list('list', Schema\ListElement::string()),
));

$this->assertEquals(
\Flow\ETL\DSL\schema(
list_schema('list', type_list(type_string(true), true))
),
$flowSchema,
);
}

public function test_converting_map_to_flow_schema() : void
{
$converted = new SchemaConverter();

$flowSchema = $converted->fromParquet(Schema::with(
Schema\NestedColumn::map('map', MapKey::string(), MapValue::int64()),
));

$this->assertEquals(
\Flow\ETL\DSL\schema(
map_schema('map', type_map(type_string(), type_int(true), true))
),
$flowSchema,
);
}

public function test_converting_struct_to_flow_schema() : void
{
$converted = new SchemaConverter();

$flowSchema = $converted->fromParquet(Schema::with(
Schema\NestedColumn::struct(
'struct',
[
Schema\FlatColumn::uuid('uuid'),
Schema\FlatColumn::string('name'),
Schema\FlatColumn::boolean('active'),
]
),
));

$this->assertEquals(
\Flow\ETL\DSL\schema(
struct_schema(
'struct',
type_structure(
[
struct_element('uuid', type_uuid(true)),
struct_element('name', type_string(true)),
struct_element('active', type_boolean(true)),
],
true
),
)
),
$flowSchema,
);
}
}
7 changes: 5 additions & 2 deletions src/lib/parquet/src/Flow/Parquet/BinaryReader/Bytes.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

final class Bytes implements \ArrayAccess, \Countable, \IteratorAggregate
{
private readonly \ArrayIterator $iterator;
private \ArrayIterator|null $iterator = null;

private readonly DataSize $size;

Expand All @@ -18,7 +18,6 @@ public function __construct(
private readonly ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN
) {
$this->size = new DataSize(\count($this->bytes) * 8);
$this->iterator = new \ArrayIterator($this->bytes);
}

public static function fromString(string $string, ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN) : self
Expand All @@ -36,6 +35,10 @@ public function count() : int
// IteratorAggregate methods
public function getIterator() : \ArrayIterator
{
if ($this->iterator === null) {
$this->iterator = new \ArrayIterator($this->bytes);
}

return $this->iterator;
}

Expand Down
Loading

0 comments on commit 3aed55d

Please sign in to comment.