diff --git a/src/core/etl/src/Flow/ETL/DSL/functions.php b/src/core/etl/src/Flow/ETL/DSL/functions.php index ea37f6324..f3b8a9cc4 100644 --- a/src/core/etl/src/Flow/ETL/DSL/functions.php +++ b/src/core/etl/src/Flow/ETL/DSL/functions.php @@ -248,6 +248,11 @@ function to_transformation(Transformer $transformer, Loader $loader) : Transform return new TransformerLoader($transformer, $loader); } +function to_branch(ScalarFunction $condition, Loader $loader) : Loader +{ + return new Loader\BranchingLoader($condition, $loader); +} + /** * @param array $data */ diff --git a/src/core/etl/src/Flow/ETL/Loader/BranchingLoader.php b/src/core/etl/src/Flow/ETL/Loader/BranchingLoader.php new file mode 100644 index 000000000..0e1166dac --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Loader/BranchingLoader.php @@ -0,0 +1,42 @@ +loader instanceof Closure) { + $this->loader->closure($context); + } + } + + public function load(Rows $rows, FlowContext $context) : void + { + $this->loader->load( + (new ScalarFunctionFilterTransformer($this->condition))->transform($rows, $context), + $context + ); + } + + public function loaders() : array + { + return [ + $this->loader, + ]; + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/BranchingTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/BranchingTest.php new file mode 100644 index 000000000..e406fe03c --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/BranchingTest.php @@ -0,0 +1,60 @@ +read(from_array([ + ['id' => 1, 'group' => 'A'], + ['id' => 2, 'group' => 'B'], + ['id' => 3, 'group' => 'A'], + ['id' => 4, 'group' => 'B'], + ['id' => 5, 'group' => 'A'], + ['id' => 6, 'group' => 'C'], + ])) + ->write( + to_branch( + ref('group')->equals(lit('A')), + to_memory($memoryA = new ArrayMemory()), + ) + ) + ->write( + to_branch( + ref('group')->isIn(lit(['B', 'C'])), + to_memory($memoryBC = new ArrayMemory()), + ) + ) + ->run(); + + $this->assertSame( + [ + ['id' => 1, 'group' => 'A'], + ['id' => 3, 'group' => 'A'], + ['id' => 5, 'group' => 'A'], + ], + $memoryA->dump(), + ); + $this->assertSame( + [ + ['id' => 2, 'group' => 'B'], + ['id' => 4, 'group' => 'B'], + ['id' => 6, 'group' => 'C'], + ], + $memoryBC->dump(), + ); + } +}