Skip to content

Commit

Permalink
Added to_branch loader (#937)
Browse files Browse the repository at this point in the history
* Added to_branch loader

* Added Closure interface to BranchingLoader
  • Loading branch information
norberttech authored Jan 28, 2024
1 parent a66a3cc commit 406b35c
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ function to_transformation(Transformer $transformer, Loader $loader) : Transform
return new TransformerLoader($transformer, $loader);
}

function to_branch(ScalarFunction $condition, Loader $loader) : Loader
{
return new Loader\BranchingLoader($condition, $loader);
}

/**
* @param array<mixed> $data
*/
Expand Down
42 changes: 42 additions & 0 deletions src/core/etl/src/Flow/ETL/Loader/BranchingLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Loader;

use Flow\ETL\FlowContext;
use Flow\ETL\Function\ScalarFunction;
use Flow\ETL\Loader;
use Flow\ETL\Rows;
use Flow\ETL\Transformer\ScalarFunctionFilterTransformer;

final class BranchingLoader implements Closure, Loader, OverridingLoader
{
public function __construct(
private readonly ScalarFunction $condition,
private readonly Loader $loader
) {
}

public function closure(FlowContext $context) : void
{
if ($this->loader instanceof Closure) {
$this->loader->closure($context);
}
}

public function load(Rows $rows, FlowContext $context) : void
{
$this->loader->load(
(new ScalarFunctionFilterTransformer($this->condition))->transform($rows, $context),
$context
);
}

public function loaders() : array
{
return [
$this->loader,
];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\DataFrame;

use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_branch;
use function Flow\ETL\DSL\to_memory;
use Flow\ETL\Memory\ArrayMemory;
use Flow\ETL\Tests\Integration\IntegrationTestCase;

final class BranchingTest extends IntegrationTestCase
{
public function test_branching() : void
{
df()
->read(from_array([
['id' => 1, 'group' => 'A'],
['id' => 2, 'group' => 'B'],
['id' => 3, 'group' => 'A'],
['id' => 4, 'group' => 'B'],
['id' => 5, 'group' => 'A'],
['id' => 6, 'group' => 'C'],
]))
->write(
to_branch(
ref('group')->equals(lit('A')),
to_memory($memoryA = new ArrayMemory()),
)
)
->write(
to_branch(
ref('group')->isIn(lit(['B', 'C'])),
to_memory($memoryBC = new ArrayMemory()),
)
)
->run();

$this->assertSame(
[
['id' => 1, 'group' => 'A'],
['id' => 3, 'group' => 'A'],
['id' => 5, 'group' => 'A'],
],
$memoryA->dump(),
);
$this->assertSame(
[
['id' => 2, 'group' => 'B'],
['id' => 4, 'group' => 'B'],
['id' => 6, 'group' => 'C'],
],
$memoryBC->dump(),
);
}
}

0 comments on commit 406b35c

Please sign in to comment.