Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified behavior of first/last aggregations #1248

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/First.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public function aggregate(Row $row) : void

public function result() : Entry
{
return $this->first ?? new Entry\StringEntry($this->ref->name(), null);
$name = $this->ref->hasAlias() ? $this->ref->name() : $this->ref->name() . '_first';

if ($this->first) {
return $this->first->rename($name);
}

return new Entry\StringEntry($name, null);
}
}
8 changes: 7 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/Last.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public function aggregate(Row $row) : void

public function result() : Entry
{
return $this->last ?? new Entry\StringEntry($this->ref->name(), null);
$name = $this->ref->hasAlias() ? $this->ref->name() : $this->ref->name() . '_last';

if ($this->last) {
return $this->last->rename($name);
}

return new Entry\StringEntry($name, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Function;

use function Flow\ETL\DSL\{df, first, from_array};
use PHPUnit\Framework\TestCase;

final class FirstTest extends TestCase
{
public function test_first_aggregation() : void
{
$first = df()
->read(
from_array([
['id' => 1, 'value' => 10],
['id' => 2, 'value' => 20],
['id' => 3, 'value' => 30],
['id' => 4, 'value' => 40],
['id' => 5, 'value' => 50],
])
)
->aggregate(first('value'))
->fetch()
->toArray();

self::assertSame(
[
['value_first' => 10],
],
$first
);
}

public function test_first_aggregation_with_grouping() : void
{
$first = df()
->read(
from_array([
['id' => 1, 'value' => 10, 'group' => 'A'],
['id' => 2, 'value' => 20, 'group' => 'A'],
['id' => 3, 'value' => 30, 'group' => 'B'],
['id' => 4, 'value' => 40, 'group' => 'C'],
['id' => 5, 'value' => 50, 'group' => 'B'],
])
)
->groupBy('group')->aggregate(first('value'))
->fetch()
->toArray();

self::assertSame(
[
['group' => 'A', 'value_first' => 10],
['group' => 'B', 'value_first' => 30],
['group' => 'C', 'value_first' => 40],
],
$first
);
}

public function test_first_aggregation_with_on_aggregated_column() : void
{
$first = df()
->read(
from_array([
['id' => 1, 'value' => 10, 'group' => 'A'],
['id' => 2, 'value' => 20, 'group' => 'A'],
['id' => 3, 'value' => 30, 'group' => 'B'],
['id' => 4, 'value' => 40, 'group' => 'C'],
['id' => 5, 'value' => 50, 'group' => 'B'],
])
)
->groupBy('value')->aggregate(first('value'))
->fetch()
->toArray();

self::assertSame(
[
['value' => 10, 'value_first' => 10],
['value' => 20, 'value_first' => 20],
['value' => 30, 'value_first' => 30],
['value' => 40, 'value_first' => 40],
['value' => 50, 'value_first' => 50],
],
$first
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Function;

use function Flow\ETL\DSL\{df, from_array, last};
use PHPUnit\Framework\TestCase;

final class LastTest extends TestCase
{
public function test_first_aggregation_with_grouping() : void
{
$first = df()
->read(
from_array([
['id' => 1, 'value' => 10, 'group' => 'A'],
['id' => 2, 'value' => 20, 'group' => 'A'],
['id' => 3, 'value' => 30, 'group' => 'B'],
['id' => 4, 'value' => 40, 'group' => 'C'],
['id' => 5, 'value' => 50, 'group' => 'B'],
])
)
->groupBy('group')->aggregate(last('value'))
->fetch()
->toArray();

self::assertSame(
[
['group' => 'A', 'value_last' => 20],
['group' => 'B', 'value_last' => 50],
['group' => 'C', 'value_last' => 40],
],
$first
);
}

public function test_first_aggregation_with_on_aggregated_column() : void
{
$first = df()
->read(
from_array([
['id' => 1, 'value' => 10, 'group' => 'A'],
['id' => 2, 'value' => 20, 'group' => 'A'],
['id' => 3, 'value' => 30, 'group' => 'B'],
['id' => 4, 'value' => 40, 'group' => 'C'],
['id' => 5, 'value' => 50, 'group' => 'B'],
])
)
->groupBy('value')->aggregate(last('value'))
->fetch()
->toArray();

self::assertSame(
[
['value' => 10, 'value_last' => 10],
['value' => 20, 'value_last' => 20],
['value' => 30, 'value_last' => 30],
['value' => 40, 'value_last' => 40],
['value' => 50, 'value_last' => 50],
],
$first
);
}

public function test_last_aggregation() : void
{
$first = df()
->read(
from_array([
['id' => 1, 'value' => 10],
['id' => 2, 'value' => 20],
['id' => 3, 'value' => 30],
['id' => 4, 'value' => 40],
['id' => 5, 'value' => 50],
])
)
->aggregate(last('value'))
->fetch()
->toArray();

self::assertSame(
[
['value_last' => 50],
],
$first
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public function __construct(string $name)
parent::__construct($name);

$this->baseMemoryLimit = (\ini_get('memory_limit')) ?: '-1';

$this->cacheDir = Path::realpath(\getenv(CacheConfig::CACHE_DIR_ENV));
$this->fs = new NativeLocalFilesystem();
$this->fstab = new FilesystemTable($this->fs, new StdOutFilesystem());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function test_aggregation_firs_value_when_nothing_aggregated() : void
$aggregator = first(ref('int'));

self::assertEquals(
new Row\Entry\StringEntry('int', null),
new Row\Entry\StringEntry('int_first', null),
$aggregator->result()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function test_aggregation_last_value_when_nothing_aggregated() : void
$aggregator = last(ref('int'));

self::assertEquals(
new Row\Entry\StringEntry('int', null),
new Row\Entry\StringEntry('int_last', null),
$aggregator->result()
);
}
Expand Down
Loading