Skip to content

Commit

Permalink
Drop partition columns (#987)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Feb 13, 2024
1 parent 458bcc0 commit 7efdf8f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 5 deletions.
9 changes: 7 additions & 2 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,14 @@ public function dropDuplicates(string|Reference ...$entries) : self
return $this;
}

public function dropPartitions() : self
/**
 * Drop all partitions from Rows. When $dropPartitionColumns is set to true,
 * the partition columns are removed as well.
 *
 * The operation is not executed here; a DropPartitionsTransformer is only
 * registered in the pipeline.
 *
 * @lazy
 *
 * @param bool $dropPartitionColumns passed as-is to DropPartitionsTransformer; defaults to false
 *
 * @return self the same DataFrame instance, so calls can be chained
 */
public function dropPartitions(bool $dropPartitionColumns = false) : self
{
$this->pipeline->add(new DropPartitionsTransformer());
$this->pipeline->add(new DropPartitionsTransformer($dropPartitionColumns));

return $this;
}
Expand Down
15 changes: 13 additions & 2 deletions src/core/etl/src/Flow/ETL/Rows.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,20 @@ public function drop(int $size) : self
return self::partitioned(\array_slice($this->rows, $size), $this->partitions);
}

public function dropPartitions() : self
/**
 * Build a new Rows instance from the current rows, without passing the
 * partitions along.
 *
 * @param bool $dropPartitionColumns when true, every entry referenced by one of
 *                                   the current partitions is also removed from each row
 *
 * @return self new Rows instance; $this is not modified by this method
 */
public function dropPartitions(bool $dropPartitionColumns = false) : self
{
return new self(...$this->rows);
$rows = new self(...$this->rows);

if ($dropPartitionColumns) {
// Each partition is translated into an entry reference, and those references are removed from every row.
// NOTE(review): the reference list is rebuilt inside the callback for every row although it does not
// depend on $row - it looks safe to compute it once before calling map(); confirm before changing.
return $rows->map(fn (Row $row) : Row => $row->remove(
...\array_map(
static fn (Partition $partition) : Reference => $partition->reference(),
$this->partitions->toArray()
)
));
}

return $rows;
}

public function dropRight(int $size) : self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@

final class DropPartitionsTransformer implements Transformer
{
/**
 * @param bool $dropPartitionColumns flag forwarded to Rows::dropPartitions() when partitioned rows are transformed
 */
public function __construct(
    private readonly bool $dropPartitionColumns = false,
) {
}

public function transform(Rows $rows, FlowContext $context) : Rows
{
if ($rows->isPartitioned()) {
return $rows->dropPartitions();
return $rows->dropPartitions($this->dropPartitionColumns);
}

return $rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@ public function test_dropping_partitions() : void
}
}

public function test_dropping_partitions_with_columns() : void
{
    // Ten rows: ids 1-5 belong to category "a", ids 6-10 to category "b".
    $names = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten'];
    $data = [];

    foreach ($names as $index => $name) {
        $data[] = ['id' => $index + 1, 'name' => $name, 'category' => $index < 5 ? 'a' : 'b'];
    }

    // The transformer only holds an immutable flag, so a single instance serves every partition.
    $transformer = new DropPartitionsTransformer(true);

    foreach (array_to_rows($data)->partitionBy(ref('category')) as $partition) {
        $this->assertTrue($partition->isPartitioned());

        $stripped = $transformer->transform($partition, flow_context());

        // Both the partition metadata and the partition column itself must be gone.
        $this->assertFalse($stripped->isPartitioned());
        $this->assertFalse($stripped->first()->has('category'));
    }
}

public function test_transforming_not_partitioned_rows() : void
{
$rows = array_to_rows([
Expand Down

0 comments on commit 7efdf8f

Please sign in to comment.