diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php index 63daa0891..ffd329526 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php @@ -6,6 +6,7 @@ use Flow\ETL\FlowContext; use Flow\ETL\Loader; use Flow\ETL\Loader\Closure; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row\Schema; use Flow\ETL\Rows; use Flow\Parquet\Options; @@ -32,7 +33,7 @@ public function __construct( private readonly ?Schema $schema = null, ) { $this->converter = new SchemaConverter(); - $this->normalizer = new RowsNormalizer(); + $this->normalizer = new RowsNormalizer(Caster::default()); if ($this->path->isPattern()) { throw new \InvalidArgumentException("ParquetLoader path can't be pattern, given: " . $this->path->path()); @@ -77,7 +78,7 @@ public function load(Rows $rows, FlowContext $context) : void $this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema())); } - $this->writers[$stream->path()->uri()]->writeBatch($this->normalizer->normalize($rows)); + $this->writers[$stream->path()->uri()]->writeBatch($this->normalizer->normalize($rows, $this->schema())); } else { $stream = $streams->open($this->path, 'parquet', $context->appendSafe()); @@ -90,7 +91,7 @@ public function load(Rows $rows, FlowContext $context) : void $this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema())); } - $this->writers[$stream->path()->uri()]->writeBatch($this->normalizer->normalize($rows)); + $this->writers[$stream->path()->uri()]->writeBatch($this->normalizer->normalize($rows, $this->schema())); } } diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/RowsNormalizer.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/RowsNormalizer.php index 2b320bba9..e3d0646be 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/RowsNormalizer.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/RowsNormalizer.php @@ -2,15 +2,22 @@ namespace Flow\ETL\Adapter\Parquet; +use function Flow\ETL\DSL\type_string; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row\Entry\UuidEntry; +use Flow\ETL\Row\Schema; use Flow\ETL\Rows; final class RowsNormalizer { + public function __construct(private readonly Caster $caster) + { + } + /** * @return array> */ - public function normalize(Rows $rows) : array + public function normalize(Rows $rows, Schema $schema) : array { $normalizedRows = []; @@ -19,8 +26,8 @@ public function normalize(Rows $rows) : array foreach ($row->entries() as $entry) { $columns[$entry->name()] = match ($entry::class) { - UuidEntry::class => $entry->value()->toString(), - default => $entry->value(), + UuidEntry::class => $this->caster->to(type_string())->value($entry->value()), + default => $this->caster->to($schema->getDefinition($entry->ref())->type())->value($entry->value()), }; } diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php index 0fd34faa6..59b2a1bec 100644 --- a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php @@ -8,13 +8,17 @@ use function Flow\ETL\DSL\datetime_entry; use function Flow\ETL\DSL\df; use function Flow\ETL\DSL\float_entry; +use function Flow\ETL\DSL\from_array; use function Flow\ETL\DSL\from_rows; use function Flow\ETL\DSL\int_entry; use function Flow\ETL\DSL\json_entry; use function Flow\ETL\DSL\json_object_entry; +use function Flow\ETL\DSL\json_schema; use function Flow\ETL\DSL\list_entry; use function Flow\ETL\DSL\ref; +use function Flow\ETL\DSL\schema; use function Flow\ETL\DSL\str_entry; +use function Flow\ETL\DSL\str_schema; use function Flow\ETL\DSL\struct_element; use function Flow\ETL\DSL\struct_entry; use function Flow\ETL\DSL\struct_type; @@ -44,10 +48,10 @@ public function test_writing_to_file() : void ->run(); $this->assertEquals( - $rows, + $rows->toArray(), (new Flow()) ->read(from_parquet($path)) - ->fetch() + ->fetch()->toArray() ); $parquetFile = (new Reader())->read($path); @@ -96,6 +100,41 @@ public function test_writing_with_partitioning() : void $this->cleanDirectory($path); } + public function test_writing_with_provided_schema() : void + { + $path = \sys_get_temp_dir() . '/file_schema.snappy.parquet'; + $this->removeFile($path); + + df() + ->read(from_array([ + ['id' => 1, 'name' => 'test', 'uuid' => Uuid::fromString('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'], + ['id' => 2, 'name' => 'test', 'uuid' => Uuid::fromString('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'], + ])) + ->write( + to_parquet($path, schema: schema( + str_schema('id'), + str_schema('name'), + str_schema('uuid'), + json_schema('json'), + )) + ) + ->run(); + + $this->assertEquals( + [ + ['id' => '1', 'name' => 'test', 'uuid' => new Row\Entry\Type\Uuid('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'], + ['id' => '2', 'name' => 'test', 'uuid' => new Row\Entry\Type\Uuid('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'], + ], + df() + ->read(from_parquet($path)) + ->fetch() + ->toArray() + ); + + $this->assertFileExists($path); + $this->removeFile($path); + } + /** * @param string $path */ diff --git a/src/core/etl/src/Flow/ETL/Partition.php b/src/core/etl/src/Flow/ETL/Partition.php index de45013f8..5a5d5e483 100644 --- a/src/core/etl/src/Flow/ETL/Partition.php +++ b/src/core/etl/src/Flow/ETL/Partition.php @@ -68,7 +68,7 @@ public static function fromUri(string $uri) : Partitions $partitions = []; - foreach (\array_filter(\explode('/', $uri), 'strlen') as $uriPart) { + foreach (\array_filter(\explode('/', $uri), static fn (string $s) : bool => (bool) \strlen($s)) as $uriPart) { if (\preg_match($regex, $uriPart, $matches)) { $partitions[] = new self($matches[1], $matches[2]); } diff --git a/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php b/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php index 7e288d56e..22c3639ea 100644 --- a/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php +++ b/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php @@ -106,7 +106,11 @@ public function create(string $entryName, mixed $value, ?Schema $schema = null) } if ($valueType instanceof UuidType) { - return uuid_entry($entryName, $value); + if ($value instanceof Entry\Type\Uuid) { + return uuid_entry($entryName, $value); + } + + return uuid_entry($entryName, (string) $value); } if ($valueType instanceof DateTimeType) {