Skip to content

Commit

Permalink
Rework GroupBy::result() method to not recreate entries & rows in l…
Browse files Browse the repository at this point in the history
…oop (#655)

* Rework `GroupBy::result()` method to not recreate entries & rows in loop

* Use `FlowContext` in `GroupBy`

* Fixed broken `GroupByTest`
  • Loading branch information
stloyd authored Oct 27, 2023
1 parent 1068d39 commit 0fa2b7c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 16 deletions.
24 changes: 10 additions & 14 deletions src/core/etl/src/Flow/ETL/GroupBy.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\GroupBy\Aggregation;
use Flow\ETL\GroupBy\Aggregator;
use Flow\ETL\Row\Entries;
use Flow\ETL\Row\Factory\NativeEntryFactory;
use Flow\ETL\Row\Reference;
use Flow\ETL\Row\References;

Expand Down Expand Up @@ -82,30 +80,28 @@ public function group(Rows $rows) : void
}
}

public function result() : Rows
public function result(FlowContext $context) : Rows
{
$rows = new Rows();
$rows = [];

foreach ($this->groups as $group) {
$entries = new Entries();
$entries = [];

if (\array_key_exists('values', $group)) {
/** @var mixed $value */
foreach ($group['values'] as $entry => $value) {
$entries = $entries->add((new NativeEntryFactory)->create($entry, $value));
}
/** @var mixed $value */
foreach ($group['values'] ?? [] as $entry => $value) {
$entries[] = $context->entryFactory()->create($entry, $value);
}

foreach ($group['aggregators'] as $aggregator) {
$entries = $entries->add($aggregator->result());
$entries[] = $aggregator->result();
}

if ($entries->count()) {
$rows = $rows->add(new Row($entries));
if (\count($entries)) {
$rows[] = Row::create(...$entries);
}
}

return $rows;
return new Rows(...$rows);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/Pipeline/GroupByPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function process(FlowContext $context) : \Generator
$this->groupBy->group($nextRows);
}

$this->nextPipeline->source(new Extractor\ProcessExtractor($this->groupBy->result()));
$this->nextPipeline->source(new Extractor\ProcessExtractor($this->groupBy->result($context)));

foreach ($this->nextPipeline->process($context) as $nextRows) {
yield $nextRows;
Expand Down
4 changes: 3 additions & 1 deletion src/core/etl/tests/Flow/ETL/Tests/Unit/GroupByTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

namespace Flow\ETL\Tests\Unit;

use Flow\ETL\Config;
use Flow\ETL\DSL\Entry;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\FlowContext;
use Flow\ETL\GroupBy;
use Flow\ETL\Row;
use Flow\ETL\Rows;
Expand Down Expand Up @@ -44,7 +46,7 @@ public function test_group_by_missing_entry() : void
Row::create(Entry::null('type')),
Row::create(Entry::string('type', 'c'))
),
$groupBy->result()
$groupBy->result(new FlowContext(Config::default()))
);
}

Expand Down

0 comments on commit 0fa2b7c

Please sign in to comment.