Skip to content

Commit

Permalink
Make parquet rows normalizer to respect given schema (#931)
Browse files Browse the repository at this point in the history
* Make parquet rows normalizer to respect given schema

* CS Fixes
  • Loading branch information
norberttech authored Jan 27, 2024
1 parent d63a877 commit c652989
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Loader\Closure;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Row\Schema;
use Flow\ETL\Rows;
use Flow\Parquet\Options;
Expand All @@ -32,7 +33,7 @@ public function __construct(
private readonly ?Schema $schema = null,
) {
$this->converter = new SchemaConverter();
$this->normalizer = new RowsNormalizer();
$this->normalizer = new RowsNormalizer(Caster::default());

if ($this->path->isPattern()) {
throw new \InvalidArgumentException("ParquetLoader path can't be pattern, given: " . $this->path->path());
Expand Down Expand Up @@ -77,7 +78,7 @@ public function load(Rows $rows, FlowContext $context) : void
$this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema()));
}

$this->writers[$stream->path()->uri()]->writeBatch($this->normalizer->normalize($rows));
$this->writers[$stream->path()->uri()]->writeBatch($this->normalizer->normalize($rows, $this->schema()));
} else {
$stream = $streams->open($this->path, 'parquet', $context->appendSafe());

Expand All @@ -90,7 +91,7 @@ public function load(Rows $rows, FlowContext $context) : void
$this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema()));
}

$this->writers[$stream->path()->uri()]->writeBatch($this->normalizer->normalize($rows));
$this->writers[$stream->path()->uri()]->writeBatch($this->normalizer->normalize($rows, $this->schema()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@

namespace Flow\ETL\Adapter\Parquet;

use function Flow\ETL\DSL\type_string;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Row\Entry\UuidEntry;
use Flow\ETL\Row\Schema;
use Flow\ETL\Rows;

final class RowsNormalizer
{
public function __construct(private readonly Caster $caster)
{
}

/**
* @return array<mixed, array<string, mixed>>
*/
public function normalize(Rows $rows) : array
public function normalize(Rows $rows, Schema $schema) : array
{
$normalizedRows = [];

Expand All @@ -19,8 +26,8 @@ public function normalize(Rows $rows) : array

foreach ($row->entries() as $entry) {
$columns[$entry->name()] = match ($entry::class) {
UuidEntry::class => $entry->value()->toString(),
default => $entry->value(),
UuidEntry::class => $this->caster->to(type_string())->value($entry->value()),
default => $this->caster->to($schema->getDefinition($entry->ref())->type())->value($entry->value()),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
use function Flow\ETL\DSL\datetime_entry;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\float_entry;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\from_rows;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\json_entry;
use function Flow\ETL\DSL\json_object_entry;
use function Flow\ETL\DSL\json_schema;
use function Flow\ETL\DSL\list_entry;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\schema;
use function Flow\ETL\DSL\str_entry;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\struct_element;
use function Flow\ETL\DSL\struct_entry;
use function Flow\ETL\DSL\struct_type;
Expand Down Expand Up @@ -44,10 +48,10 @@ public function test_writing_to_file() : void
->run();

$this->assertEquals(
$rows,
$rows->toArray(),
(new Flow())
->read(from_parquet($path))
->fetch()
->fetch()->toArray()
);

$parquetFile = (new Reader())->read($path);
Expand Down Expand Up @@ -96,6 +100,41 @@ public function test_writing_with_partitioning() : void
$this->cleanDirectory($path);
}

public function test_writing_with_provided_schema() : void
{
$path = \sys_get_temp_dir() . '/file_schema.snappy.parquet';
$this->removeFile($path);

df()
->read(from_array([
['id' => 1, 'name' => 'test', 'uuid' => Uuid::fromString('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'],
['id' => 2, 'name' => 'test', 'uuid' => Uuid::fromString('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'],
]))
->write(
to_parquet($path, schema: schema(
str_schema('id'),
str_schema('name'),
str_schema('uuid'),
json_schema('json'),
))
)
->run();

$this->assertEquals(
[
['id' => '1', 'name' => 'test', 'uuid' => new Row\Entry\Type\Uuid('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'],
['id' => '2', 'name' => 'test', 'uuid' => new Row\Entry\Type\Uuid('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'],
],
df()
->read(from_parquet($path))
->fetch()
->toArray()
);

$this->assertFileExists($path);
$this->removeFile($path);
}

/**
* @param string $path
*/
Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/Partition.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static function fromUri(string $uri) : Partitions

$partitions = [];

foreach (\array_filter(\explode('/', $uri), 'strlen') as $uriPart) {
foreach (\array_filter(\explode('/', $uri), static fn (string $s) : bool => (bool) \strlen($s)) as $uriPart) {
if (\preg_match($regex, $uriPart, $matches)) {
$partitions[] = new self($matches[1], $matches[2]);
}
Expand Down
6 changes: 5 additions & 1 deletion src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ public function create(string $entryName, mixed $value, ?Schema $schema = null)
}

if ($valueType instanceof UuidType) {
return uuid_entry($entryName, $value);
if ($value instanceof Entry\Type\Uuid) {
return uuid_entry($entryName, $value);
}

return uuid_entry($entryName, (string) $value);
}

if ($valueType instanceof DateTimeType) {
Expand Down

0 comments on commit c652989

Please sign in to comment.