Skip to content

Commit

Permalink
Fixed Flow to Parquet Schema converter not respecting deprecated parquet converted types
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Jul 5, 2024
1 parent d8d5b01 commit 760e697
Showing 1 changed file with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,22 @@

namespace Flow\ETL\Adapter\Parquet;

use function Flow\ETL\DSL\{list_schema, map_schema, struct_schema, struct_type, structure_element, type_list, type_map, type_object};
use function Flow\ETL\DSL\{bool_schema,
datetime_schema,
float_schema,
int_schema,
json_schema,
list_schema,
map_schema,
object_schema,
str_schema,
struct_schema,
struct_type,
structure_element,
type_list,
type_map,
type_object,
uuid_schema};
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\Logical\Map\{MapKey, MapValue};
use Flow\ETL\PHP\Type\Logical\Structure\StructureElement;
Expand Down Expand Up @@ -259,39 +274,46 @@ private function parquetFlatToFlowType(FlatColumn $column) : Schema\Definition
{
$logicalType = $column->logicalType();

$nullable = $column->repetition() === ParquetSchema\Repetition::OPTIONAL;

if ($logicalType === null) {
return match ($column->type()) {
ParquetSchema\PhysicalType::INT32 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::INT64 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::BOOLEAN => Schema\Definition::boolean($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::DOUBLE => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::FLOAT => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::BYTE_ARRAY => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::INT32 => match ($column->convertedType()) {
ParquetSchema\ConvertedType::DATE => datetime_schema($column->name(), $nullable),
default => int_schema($column->name(), $nullable)
},
ParquetSchema\PhysicalType::INT64 => int_schema($column->name(), $nullable),
ParquetSchema\PhysicalType::BOOLEAN => bool_schema($column->name(), $nullable),
ParquetSchema\PhysicalType::DOUBLE => float_schema($column->name(), $nullable),
ParquetSchema\PhysicalType::FLOAT => float_schema($column->name(), $nullable),
ParquetSchema\PhysicalType::BYTE_ARRAY => str_schema($column->name(), $nullable),
default => throw new RuntimeException($column->type()->name . ' is not supported.')
};
}

return match ($logicalType->name()) {
ParquetSchema\LogicalType::STRING => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::DATE => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::TIME => Schema\Definition::object($column->name(), type_object(\DateInterval::class, $column->repetition() === ParquetSchema\Repetition::OPTIONAL)),
ParquetSchema\LogicalType::TIMESTAMP => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::UUID => Schema\Definition::uuid($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::JSON => Schema\Definition::json($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::DECIMAL => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::INTEGER => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::STRING => str_schema($column->name(), $nullable),
ParquetSchema\LogicalType::DATE => datetime_schema($column->name(), $nullable),
ParquetSchema\LogicalType::TIME => object_schema($column->name(), type_object(\DateInterval::class, $nullable)),
ParquetSchema\LogicalType::TIMESTAMP => datetime_schema($column->name(), $nullable),
ParquetSchema\LogicalType::UUID => uuid_schema($column->name(), $nullable),
ParquetSchema\LogicalType::JSON => json_schema($column->name(), $nullable),
ParquetSchema\LogicalType::DECIMAL => float_schema($column->name(), $nullable),
ParquetSchema\LogicalType::INTEGER => int_schema($column->name(), $nullable),
default => throw new RuntimeException($logicalType->name() . ' is not supported.')
};
}

private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definition
{
$nullable = $column->repetition() === ParquetSchema\Repetition::OPTIONAL;

if ($column->isList()) {
return list_schema(
$column->name(),
type_list(
$this->fromParquetColumnToFlowDefinition($column->getListElement())->type(),
$column->repetition() === ParquetSchema\Repetition::OPTIONAL
$nullable
)
);
}
Expand All @@ -308,7 +330,7 @@ private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definiti
type_map(
$keyType,
$this->fromParquetColumnToFlowDefinition($column->getMapValueColumn())->type(),
$column->repetition() === ParquetSchema\Repetition::OPTIONAL
$nullable
)
);
}
Expand All @@ -323,6 +345,6 @@ private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definiti
);
}

return struct_schema($column->name(), struct_type($elements, $column->repetition() === ParquetSchema\Repetition::OPTIONAL));
return struct_schema($column->name(), struct_type($elements, $nullable));
}
}

0 comments on commit 760e697

Please sign in to comment.