Skip to content

Commit

Permalink
Unified behavior of first/last aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Oct 20, 2024
1 parent f06a032 commit 5900095
Show file tree
Hide file tree
Showing 17 changed files with 278 additions and 81 deletions.
55 changes: 30 additions & 25 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/First.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public function aggregate(Row $row) : void

/**
 * Builds the entry returned by the "first" aggregation.
 *
 * NOTE(review): this is a rendered diff hunk (7 additions, 1 deletion), so the
 * old return statement and the lines that presumably replace it appear back to
 * back. Read as-is, the first return would make the rest unreachable.
 *
 * @return Entry the stored entry renamed to the output name, or a null-valued
 *               StringEntry under that name when nothing is stored
 */
public function result() : Entry
{
// Presumably the deleted pre-change line: returned the stored entry unchanged, or a
// null StringEntry named exactly like the reference.
return $this->first ?? new Entry\StringEntry($this->ref->name(), null);
// Output name: the reference name when the reference has an alias, otherwise
// the reference name with the "_first" suffix appended.
$name = $this->ref->hasAlias() ? $this->ref->name() : $this->ref->name() . '_first';

if ($this->first) {
// An entry is stored: hand it back under the computed output name.
return $this->first->rename($name);
}

// No entry stored: fall back to a null-valued StringEntry under the output name.
return new Entry\StringEntry($name, null);
}
}
8 changes: 7 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/Last.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public function aggregate(Row $row) : void

/**
 * Builds the entry returned by the "last" aggregation.
 *
 * NOTE(review): this is a rendered diff hunk (7 additions, 1 deletion), so the
 * old return statement and the lines that presumably replace it appear back to
 * back. Read as-is, the first return would make the rest unreachable.
 *
 * @return Entry the stored entry renamed to the output name, or a null-valued
 *               StringEntry under that name when nothing is stored
 */
public function result() : Entry
{
// Presumably the deleted pre-change line: returned the stored entry unchanged, or a
// null StringEntry named exactly like the reference.
return $this->last ?? new Entry\StringEntry($this->ref->name(), null);
// Output name: the reference name when the reference has an alias, otherwise
// the reference name with the "_last" suffix appended.
$name = $this->ref->hasAlias() ? $this->ref->name() : $this->ref->name() . '_last';

if ($this->last) {
// An entry is stored: hand it back under the computed output name.
return $this->last->rename($name);
}

// No entry stored: fall back to a null-valued StringEntry under the output name.
return new Entry\StringEntry($name, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Function;

use function Flow\ETL\DSL\{df, first, from_array};
use PHPUnit\Framework\TestCase;

/**
 * Integration tests for the first() aggregation run through a full data frame:
 * without grouping, grouped by another column, and grouped by the very column
 * being aggregated. In every case the result column is expected to be named
 * "value_first".
 */
final class FirstTest extends TestCase
{
    public function test_first_aggregation() : void
    {
        $dataset = [
            ['id' => 1, 'value' => 10],
            ['id' => 2, 'value' => 20],
            ['id' => 3, 'value' => 30],
            ['id' => 4, 'value' => 40],
            ['id' => 5, 'value' => 50],
        ];

        $rows = df()
            ->read(from_array($dataset))
            ->aggregate(first('value'))
            ->fetch()
            ->toArray();

        // Without grouping a single row comes back, holding the first value seen.
        self::assertSame([['value_first' => 10]], $rows);
    }

    public function test_first_aggregation_with_grouping() : void
    {
        $expected = [
            ['group' => 'A', 'value_first' => 10],
            ['group' => 'B', 'value_first' => 30],
            ['group' => 'C', 'value_first' => 40],
        ];

        $rows = df()
            ->read(from_array($this->groupedDataset()))
            ->groupBy('group')
            ->aggregate(first('value'))
            ->fetch()
            ->toArray();

        self::assertSame($expected, $rows);
    }

    public function test_first_aggregation_with_on_aggregated_column() : void
    {
        // Grouping by "value" itself: the grouping column and the aggregated
        // column must both be present, the latter under its suffixed name.
        $expected = [
            ['value' => 10, 'value_first' => 10],
            ['value' => 20, 'value_first' => 20],
            ['value' => 30, 'value_first' => 30],
            ['value' => 40, 'value_first' => 40],
            ['value' => 50, 'value_first' => 50],
        ];

        $rows = df()
            ->read(from_array($this->groupedDataset()))
            ->groupBy('value')
            ->aggregate(first('value'))
            ->fetch()
            ->toArray();

        self::assertSame($expected, $rows);
    }

    /**
     * Five rows spread over groups A, B and C, shared by the grouping tests.
     *
     * @return array<int, array<string, int|string>>
     */
    private function groupedDataset() : array
    {
        return [
            ['id' => 1, 'value' => 10, 'group' => 'A'],
            ['id' => 2, 'value' => 20, 'group' => 'A'],
            ['id' => 3, 'value' => 30, 'group' => 'B'],
            ['id' => 4, 'value' => 40, 'group' => 'C'],
            ['id' => 5, 'value' => 50, 'group' => 'B'],
        ];
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Function;

use function Flow\ETL\DSL\{df, from_array, last};
use PHPUnit\Framework\TestCase;

/**
 * Integration tests for the last() aggregation run through a full data frame:
 * without grouping, grouped by another column, and grouped by the very column
 * being aggregated. In every case the result column is expected to be named
 * "value_last".
 */
final class LastTest extends TestCase
{
    public function test_last_aggregation() : void
    {
        $rows = df()
            ->read(
                from_array([
                    ['id' => 1, 'value' => 10],
                    ['id' => 2, 'value' => 20],
                    ['id' => 3, 'value' => 30],
                    ['id' => 4, 'value' => 40],
                    ['id' => 5, 'value' => 50],
                ])
            )
            ->aggregate(last('value'))
            ->fetch()
            ->toArray();

        // Without grouping a single row comes back, holding the last value seen.
        self::assertSame(
            [
                ['value_last' => 50],
            ],
            $rows
        );
    }

    public function test_last_aggregation_with_grouping() : void
    {
        $rows = df()
            ->read(
                from_array([
                    ['id' => 1, 'value' => 10, 'group' => 'A'],
                    ['id' => 2, 'value' => 20, 'group' => 'A'],
                    ['id' => 3, 'value' => 30, 'group' => 'B'],
                    ['id' => 4, 'value' => 40, 'group' => 'C'],
                    ['id' => 5, 'value' => 50, 'group' => 'B'],
                ])
            )
            ->groupBy('group')->aggregate(last('value'))
            ->fetch()
            ->toArray();

        // Each group reports the value from its final row in input order.
        self::assertSame(
            [
                ['group' => 'A', 'value_last' => 20],
                ['group' => 'B', 'value_last' => 50],
                ['group' => 'C', 'value_last' => 40],
            ],
            $rows
        );
    }

    public function test_last_aggregation_with_on_aggregated_column() : void
    {
        $rows = df()
            ->read(
                from_array([
                    ['id' => 1, 'value' => 10, 'group' => 'A'],
                    ['id' => 2, 'value' => 20, 'group' => 'A'],
                    ['id' => 3, 'value' => 30, 'group' => 'B'],
                    ['id' => 4, 'value' => 40, 'group' => 'C'],
                    ['id' => 5, 'value' => 50, 'group' => 'B'],
                ])
            )
            ->groupBy('value')->aggregate(last('value'))
            ->fetch()
            ->toArray();

        // Grouping by "value" itself: the grouping column and the aggregated
        // column must both be present, the latter under its suffixed name.
        self::assertSame(
            [
                ['value' => 10, 'value_last' => 10],
                ['value' => 20, 'value_last' => 20],
                ['value' => 30, 'value_last' => 30],
                ['value' => 40, 'value_last' => 40],
                ['value' => 50, 'value_last' => 50],
            ],
            $rows
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public function __construct(string $name)
parent::__construct($name);

$this->baseMemoryLimit = (\ini_get('memory_limit')) ?: '-1';

$this->cacheDir = Path::realpath(\getenv(CacheConfig::CACHE_DIR_ENV));
$this->fs = new NativeLocalFilesystem();
$this->fstab = new FilesystemTable($this->fs, new StdOutFilesystem());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function test_aggregation_firs_value_when_nothing_aggregated() : void
$aggregator = first(ref('int'));

self::assertEquals(
new Row\Entry\StringEntry('int', null),
new Row\Entry\StringEntry('int_first', null),
$aggregator->result()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function test_aggregation_last_value_when_nothing_aggregated() : void
$aggregator = last(ref('int'));

self::assertEquals(
new Row\Entry\StringEntry('int', null),
new Row\Entry\StringEntry('int_last', null),
$aggregator->result()
);
}
Expand Down
Loading

0 comments on commit 5900095

Please sign in to comment.