Skip to content

Commit

Permalink
Added CSV RowsNormalizer to automate writing deeply nested data structures to CSV without manual casting
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Aug 9, 2024
1 parent a7c3654 commit 32be7f8
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 44 deletions.
30 changes: 8 additions & 22 deletions src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Loader\Closure;
use Flow\ETL\Row\Entry;
use Flow\ETL\{FlowContext, Loader, Rows};
use Flow\ETL\{Adapter\CSV\RowsNormalizer\EntryNormalizer, FlowContext, Loader, Rows};
use Flow\Filesystem\{DestinationStream, Partition, Path};

final class CSVLoader implements Closure, Loader, Loader\FileLoader
Expand All @@ -20,9 +20,6 @@ public function __construct(
private string $escape = '\\',
private string $newLineSeparator = PHP_EOL
) {
if ($this->path->isPattern()) {
throw new \InvalidArgumentException("CSVLoader path can't be pattern, given: " . $this->path->path());
}
}

public function closure(FlowContext $context) : void
Expand All @@ -41,19 +38,21 @@ public function load(Rows $rows, FlowContext $context) : void
return;
}

$normalizer = new RowsNormalizer(new EntryNormalizer($context->config->caster()));

$headers = $rows->first()->entries()->map(fn (Entry $entry) => $entry->name());

if ($rows->partitions()->count()) {
$this->write($rows, $headers, $context, $rows->partitions()->toArray());
$this->write($rows, $headers, $context, $rows->partitions()->toArray(), $normalizer);
} else {
$this->write($rows, $headers, $context, []);
$this->write($rows, $headers, $context, [], $normalizer);
}
}

/**
* @param array<Partition> $partitions
*/
public function write(Rows $nextRows, array $headers, FlowContext $context, array $partitions) : void
public function write(Rows $nextRows, array $headers, FlowContext $context, array $partitions, RowsNormalizer $normalizer) : void
{
if ($this->header && !$context->streams()->isOpen($this->path, $partitions)) {
$this->writeCSV(
Expand All @@ -62,26 +61,13 @@ public function write(Rows $nextRows, array $headers, FlowContext $context, arra
);
}

foreach ($nextRows as $row) {
$this->writeCSV(
$row->toArray(),
$context->streams()->writeTo($this->path, $partitions)
);
foreach ($normalizer->normalize($nextRows) as $normalizedRow) {
$this->writeCSV($normalizedRow, $context->streams()->writeTo($this->path, $partitions));
}
}

private function writeCSV(array $row, DestinationStream $stream) : void
{
/**
* @var string $entry
* @var mixed $value
*/
foreach ($row as $entry => $value) {
if (\is_array($value)) {
throw new RuntimeException("Entry \"{$entry}\" is an list|array, please cast to string before writing to CSV. Easiest way to cast arrays to string is to use Transform::to_json transformer.");
}
}

$tmpHandle = fopen('php://temp/maxmemory:' . (5 * 1024 * 1024), 'rb+');

if ($tmpHandle === false) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV;

use Flow\ETL\Adapter\CSV\RowsNormalizer\EntryNormalizer;
use Flow\ETL\Rows;

/**
 * Converts Rows into plain positional arrays of values, one array per row,
 * delegating the conversion of each individual entry to an EntryNormalizer.
 */
final class RowsNormalizer
{
    public function __construct(private readonly EntryNormalizer $entryNormalizer)
    {
    }

    /**
     * Lazily walks the given rows and yields, for each of them, a list holding
     * the normalized value of every entry in iteration order. Entry names are
     * not used as keys; values are appended positionally.
     *
     * @return \Generator<array<null|bool|float|int|string>>
     */
    public function normalize(Rows $rows) : \Generator
    {
        foreach ($rows as $row) {
            yield $this->normalizeEntries($row->entries());
        }
    }

    /**
     * Normalizes every entry of a single row into a positional list of values.
     *
     * @param iterable<\Flow\ETL\Row\Entry> $entries
     *
     * @return array<null|bool|float|int|string>
     */
    private function normalizeEntries(iterable $entries) : array
    {
        $values = [];

        foreach ($entries as $entry) {
            $values[] = $this->entryNormalizer->normalize($entry);
        }

        return $values;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV\RowsNormalizer;

use function Flow\ETL\DSL\type_json;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Row\Entry;

/**
 * Reduces a single row entry to a value of type string|float|int|bool|null,
 * choosing the conversion strategy from the exact class of the entry.
 */
final class EntryNormalizer
{
    /**
     * Entry classes whose normalized form is whatever their toString() returns.
     */
    private const STRING_RENDERED = [
        Entry\UuidEntry::class,
        Entry\XMLElementEntry::class,
        Entry\XMLEntry::class,
    ];

    /**
     * Entry classes whose value is handed to the caster with the JSON type.
     */
    private const JSON_CASTED = [
        Entry\ArrayEntry::class,
        Entry\ListEntry::class,
        Entry\MapEntry::class,
        Entry\StructureEntry::class,
        Entry\JsonEntry::class,
        Entry\ObjectEntry::class,
    ];

    /**
     * @param Caster $caster used to cast values of nested/object-like entries to the JSON type
     * @param string $dateTimeFormat format passed to format() for DateTimeEntry values, ATOM by default
     */
    public function __construct(
        private readonly Caster $caster,
        private readonly string $dateTimeFormat = \DateTimeInterface::ATOM
    ) {
    }

    /**
     * Normalizes the given entry.
     *
     * The comparison is done on the exact class name of the entry (strict
     * equality), so only the classes listed here get special handling; every
     * other entry falls through to its raw value().
     */
    public function normalize(Entry $entry) : string|float|int|bool|null
    {
        $entryClass = $entry::class;

        if (\in_array($entryClass, self::STRING_RENDERED, true)) {
            return $entry->toString();
        }

        if ($entryClass === Entry\DateTimeEntry::class) {
            // Nullsafe call: a null value stays null instead of being formatted.
            return $entry->value()?->format($this->dateTimeFormat);
        }

        if ($entryClass === Entry\EnumEntry::class) {
            // The enum case name is used; a null value stays null.
            return $entry->value()?->name;
        }

        if (\in_array($entryClass, self::JSON_CASTED, true)) {
            return $this->caster->to(type_json())->value($entry->value());
        }

        return $entry->value();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
namespace Flow\ETL\Adapter\CSV\Tests\Integration;

use function Flow\ETL\Adapter\CSV\{from_csv, to_csv};
use function Flow\ETL\DSL\{array_entry, df, int_entry, overwrite, ref, row, rows};
use Flow\ETL\Flow;
use function Flow\ETL\DSL\{df, overwrite, ref};
use Flow\ETL\Tests\Double\FakeExtractor;
use Flow\Filesystem\Path;
use PHPUnit\Framework\TestCase;

final class CSVTest extends TestCase
Expand All @@ -20,17 +18,6 @@ protected function setUp() : void
}
}

public function test_loading_array_entry() : void
{
$this->expectExceptionMessage('Entry "data" is an list|array, please cast to string before writing to CSV. Easiest way to cast arrays to string is to use Transform::to_json transformer.');

(new Flow())
->process(rows(row(int_entry('id', 1), array_entry('data', ['foo' => 'bar']))))
->saveMode(overwrite())
->write(to_csv(__DIR__ . '/var/test_loading_array_entry.csv'))
->run();
}

public function test_loading_csv_files() : void
{

Expand All @@ -51,11 +38,4 @@ public function test_loading_csv_files() : void
\unlink($path);
}
}

public function test_using_pattern_path() : void
{
$this->expectExceptionMessage("CSVLoader path can't be pattern, given: /path/*/pattern.csv");

to_csv(new Path('/path/*/pattern.csv'));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public function supports(Type $type) : bool
return $type instanceof JsonType;
}

public function value(mixed $value, Type $type, Caster $caster) : mixed
public function value(mixed $value, Type $type, Caster $caster) : string
{
try {
if (\is_string($value)) {
Expand Down

0 comments on commit 32be7f8

Please sign in to comment.