diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/SchemaConverter.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/SchemaConverter.php index d0539f438..2ffb2afa4 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/SchemaConverter.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/SchemaConverter.php @@ -4,7 +4,22 @@ namespace Flow\ETL\Adapter\Parquet; -use function Flow\ETL\DSL\{list_schema, map_schema, struct_schema, struct_type, structure_element, type_list, type_map, type_object}; +use function Flow\ETL\DSL\{bool_schema, + datetime_schema, + float_schema, + int_schema, + json_schema, + list_schema, + map_schema, + object_schema, + str_schema, + struct_schema, + struct_type, + structure_element, + type_list, + type_map, + type_object, + uuid_schema}; use Flow\ETL\Exception\RuntimeException; use Flow\ETL\PHP\Type\Logical\Map\{MapKey, MapValue}; use Flow\ETL\PHP\Type\Logical\Structure\StructureElement; @@ -259,39 +274,46 @@ private function parquetFlatToFlowType(FlatColumn $column) : Schema\Definition { $logicalType = $column->logicalType(); + $nullable = $column->repetition() === ParquetSchema\Repetition::OPTIONAL; + if ($logicalType === null) { return match ($column->type()) { - ParquetSchema\PhysicalType::INT32 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\PhysicalType::INT64 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\PhysicalType::BOOLEAN => Schema\Definition::boolean($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\PhysicalType::DOUBLE => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\PhysicalType::FLOAT => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\PhysicalType::BYTE_ARRAY => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\PhysicalType::INT32 => match ($column->convertedType()) { + ParquetSchema\ConvertedType::DATE => datetime_schema($column->name(), $nullable), + default => int_schema($column->name(), $nullable) + }, + ParquetSchema\PhysicalType::INT64 => int_schema($column->name(), $nullable), + ParquetSchema\PhysicalType::BOOLEAN => bool_schema($column->name(), $nullable), + ParquetSchema\PhysicalType::DOUBLE => float_schema($column->name(), $nullable), + ParquetSchema\PhysicalType::FLOAT => float_schema($column->name(), $nullable), + ParquetSchema\PhysicalType::BYTE_ARRAY => str_schema($column->name(), $nullable), default => throw new RuntimeException($column->type()->name . ' is not supported.') }; } return match ($logicalType->name()) { - ParquetSchema\LogicalType::STRING => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\LogicalType::DATE => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\LogicalType::TIME => Schema\Definition::object($column->name(), type_object(\DateInterval::class, $column->repetition() === ParquetSchema\Repetition::OPTIONAL)), - ParquetSchema\LogicalType::TIMESTAMP => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\LogicalType::UUID => Schema\Definition::uuid($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\LogicalType::JSON => Schema\Definition::json($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\LogicalType::DECIMAL => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), - ParquetSchema\LogicalType::INTEGER => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\LogicalType::STRING => str_schema($column->name(), $nullable), + ParquetSchema\LogicalType::DATE => datetime_schema($column->name(), $nullable), + ParquetSchema\LogicalType::TIME => object_schema($column->name(), type_object(\DateInterval::class, $nullable)), + ParquetSchema\LogicalType::TIMESTAMP => datetime_schema($column->name(), $nullable), + ParquetSchema\LogicalType::UUID => uuid_schema($column->name(), $nullable), + ParquetSchema\LogicalType::JSON => json_schema($column->name(), $nullable), + ParquetSchema\LogicalType::DECIMAL => float_schema($column->name(), $nullable), + ParquetSchema\LogicalType::INTEGER => int_schema($column->name(), $nullable), default => throw new RuntimeException($logicalType->name() . ' is not supported.') }; } private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definition { + $nullable = $column->repetition() === ParquetSchema\Repetition::OPTIONAL; + if ($column->isList()) { return list_schema( $column->name(), type_list( $this->fromParquetColumnToFlowDefinition($column->getListElement())->type(), - $column->repetition() === ParquetSchema\Repetition::OPTIONAL + $nullable ) ); } @@ -308,7 +330,7 @@ private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definiti type_map( $keyType, $this->fromParquetColumnToFlowDefinition($column->getMapValueColumn())->type(), - $column->repetition() === ParquetSchema\Repetition::OPTIONAL + $nullable ) ); } @@ -323,6 +345,6 @@ private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definiti ); } - return struct_schema($column->name(), struct_type($elements, $column->repetition() === ParquetSchema\Repetition::OPTIONAL)); + return struct_schema($column->name(), struct_type($elements, $nullable)); } }