diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index 350b713c3..0895e3e4f 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -34,6 +34,7 @@ use Flow\ETL\Transformer\CallbackRowTransformer; use Flow\ETL\Transformer\CrossJoinRowsTransformer; use Flow\ETL\Transformer\DropDuplicatesTransformer; +use Flow\ETL\Transformer\DropPartitionsTransformer; use Flow\ETL\Transformer\EntryNameStyleConverterTransformer; use Flow\ETL\Transformer\JoinEachRowsTransformer; use Flow\ETL\Transformer\JoinRowsTransformer; @@ -300,6 +301,13 @@ public function dropDuplicates(string|Reference ...$entries) : self return $this; } + public function dropPartitions() : self + { + $this->pipeline->add(new DropPartitionsTransformer()); + + return $this; + } + /** * Be aware that fetch is not memory safe and will load all rows into memory. * If you want to safely iterate over Rows use oe of the following methods:. diff --git a/src/core/etl/src/Flow/ETL/Rows.php b/src/core/etl/src/Flow/ETL/Rows.php index fc7af5fda..44d748502 100644 --- a/src/core/etl/src/Flow/ETL/Rows.php +++ b/src/core/etl/src/Flow/ETL/Rows.php @@ -142,6 +142,11 @@ public function drop(int $size) : self return self::partitioned(\array_slice($this->rows, $size), $this->partitions); } + public function dropPartitions() : self + { + return new self(...$this->rows); + } + public function dropRight(int $size) : self { if ($size === 0) { diff --git a/src/core/etl/src/Flow/ETL/Transformer/DropPartitionsTransformer.php b/src/core/etl/src/Flow/ETL/Transformer/DropPartitionsTransformer.php new file mode 100644 index 000000000..e0f0328a7 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Transformer/DropPartitionsTransformer.php @@ -0,0 +1,21 @@ +isPartitioned()) { + return $rows->dropPartitions(); + } + + return $rows; + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php index 8fa12ed69..990f6c5cc 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php @@ -20,6 +20,28 @@ final class PartitioningTest extends IntegrationTestCase { + public function test_dropping_partitions() : void + { + $rows = df() + ->read(from_rows( + rows_partitioned( + [ + row(int_entry('id', 1), str_entry('country', 'PL'), int_entry('age', 20)), + row(int_entry('id', 2), str_entry('country', 'PL'), int_entry('age', 20)), + row(int_entry('id', 3), str_entry('country', 'PL'), int_entry('age', 25)), + row(int_entry('id', 4), str_entry('country', 'PL'), int_entry('age', 30)), + ], + [ + partition('country', 'PL'), + ] + ) + )) + ->dropPartitions() + ->fetch(); + + $this->assertFalse($rows->isPartitioned()); + } + public function test_partition_by() : void { $rows = df() diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/DropPartitionsTransformerTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/DropPartitionsTransformerTest.php new file mode 100644 index 000000000..767ee2b71 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/DropPartitionsTransformerTest.php @@ -0,0 +1,59 @@ + 1, 'name' => 'one', 'category' => 'a'], + ['id' => 2, 'name' => 'two', 'category' => 'a'], + ['id' => 3, 'name' => 'three', 'category' => 'a'], + ['id' => 4, 'name' => 'four', 'category' => 'a'], + ['id' => 5, 'name' => 'five', 'category' => 'a'], + ['id' => 6, 'name' => 'six', 'category' => 'b'], + ['id' => 7, 'name' => 'seven', 'category' => 'b'], + ['id' => 8, 'name' => 'eight', 'category' => 'b'], + ['id' => 9, 'name' => 'nine', 'category' => 'b'], + ['id' => 10, 'name' => 'ten', 'category' => 'b'], + ])->partitionBy(ref('category')); + + foreach ($partitioned as $rows) { + $this->assertTrue($rows->isPartitioned()); + + $notPartitioned = (new DropPartitionsTransformer())->transform($rows, flow_context()); + + $this->assertFalse($notPartitioned->isPartitioned()); + } + } + + public function test_transforming_not_partitioned_rows() : void + { + $rows = array_to_rows([ + ['id' => 1, 'name' => 'one', 'category' => 'a'], + ['id' => 2, 'name' => 'two', 'category' => 'a'], + ['id' => 3, 'name' => 'three', 'category' => 'a'], + ['id' => 4, 'name' => 'four', 'category' => 'a'], + ['id' => 5, 'name' => 'five', 'category' => 'a'], + ['id' => 6, 'name' => 'six', 'category' => 'b'], + ['id' => 7, 'name' => 'seven', 'category' => 'b'], + ['id' => 8, 'name' => 'eight', 'category' => 'b'], + ['id' => 9, 'name' => 'nine', 'category' => 'b'], + ['id' => 10, 'name' => 'ten', 'category' => 'b'], + ]); + + $this->assertSame( + $rows, + (new DropPartitionsTransformer())->transform($rows, flow_context()) + ); + } +}