Skip to content

Commit

Permalink
Allow to use max/min aggregations on date time columns (#1083)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored May 24, 2024
1 parent c2bc5ad commit e5bbefe
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 44 deletions.
12 changes: 10 additions & 2 deletions src/core/etl/src/Flow/ETL/Function/Max.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\{float_entry, int_entry};
use function Flow\ETL\DSL\{datetime_entry, float_entry, int_entry};
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Row;
use Flow\ETL\Row\{Entry, Reference};

final class Max implements AggregatingFunction
{
private ?float $max;
private float|\DateTimeInterface|null $max;

public function __construct(private readonly Reference $ref)
{
Expand All @@ -27,10 +27,14 @@ public function aggregate(Row $row) : void
if ($this->max === null) {
if (\is_numeric($value)) {
$this->max = (float) $value;
} elseif ($value instanceof \DateTimeInterface) {
$this->max = $value;
}
} else {
if (\is_numeric($value)) {
$this->max = \max($this->max, (float) $value);
} elseif ($value instanceof \DateTimeInterface) {
$this->max = \max($this->max, $value);
}
}
} catch (InvalidArgumentException) {
Expand All @@ -48,6 +52,10 @@ public function result() : Entry
return int_entry($this->ref->name(), null);
}

if ($this->max instanceof \DateTimeInterface) {
return datetime_entry($this->ref->name(), $this->max);
}

$resultInt = (int) $this->max;

if ($this->max - $resultInt === 0.0) {
Expand Down
16 changes: 12 additions & 4 deletions src/core/etl/src/Flow/ETL/Function/Min.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\{float_entry, int_entry};
use function Flow\ETL\DSL\{datetime_entry, float_entry, int_entry};
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Row;
use Flow\ETL\Row\{Entry, Reference};

final class Min implements AggregatingFunction
{
private ?float $min;
private float|\DateTimeInterface|null $min;

public function __construct(private readonly Reference $ref)
{
Expand All @@ -27,10 +27,14 @@ public function aggregate(Row $row) : void
if ($this->min === null) {
if (\is_numeric($value)) {
$this->min = (float) $value;
} elseif ($value instanceof \DateTimeInterface) {
$this->min = $value;
}
} else {
if (\is_numeric($value)) {
$this->min = \min($this->min, (float) $value);
} elseif ($value instanceof \DateTimeInterface) {
$this->min = \min($this->min, $value);
}
}
} catch (InvalidArgumentException) {
Expand All @@ -44,12 +48,16 @@ public function result() : Entry
$this->ref->as($this->ref->to() . '_min');
}

$resultInt = (int) $this->min;

if ($this->min === null) {
return int_entry($this->ref->name(), null);
}

if ($this->min instanceof \DateTimeInterface) {
return datetime_entry($this->ref->name(), $this->min);
}

$resultInt = (int) $this->min;

if ($this->min - $resultInt === 0.0) {
return int_entry($this->ref->name(), (int) $this->min);
}
Expand Down
52 changes: 33 additions & 19 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/Function/MaxTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

namespace Flow\ETL\Tests\Unit\Function;

use function Flow\ETL\DSL\{float_entry, int_entry, ref, str_entry};
use Flow\ETL\Row;
use function Flow\ETL\DSL\{datetime_entry, float_entry, int_entry, ref, row, str_entry};
use PHPUnit\Framework\TestCase;

final class MaxTest extends TestCase
Expand All @@ -14,11 +13,11 @@ public function test_aggregation_max_from_numeric_values() : void
{
$aggregator = \Flow\ETL\DSL\max(ref('int'));

$aggregator->aggregate(Row::create(str_entry('int', '10')));
$aggregator->aggregate(Row::create(str_entry('int', '20')));
$aggregator->aggregate(Row::create(str_entry('int', '55')));
$aggregator->aggregate(Row::create(str_entry('int', '25')));
$aggregator->aggregate(Row::create(str_entry('not_int', null)));
$aggregator->aggregate(row(str_entry('int', '10')));
$aggregator->aggregate(row(str_entry('int', '20')));
$aggregator->aggregate(row(str_entry('int', '55')));
$aggregator->aggregate(row(str_entry('int', '25')));
$aggregator->aggregate(row(str_entry('not_int', null)));

self::assertSame(
55,
Expand All @@ -30,25 +29,40 @@ public function test_aggregation_max_including_null_value() : void
{
$aggregator = \Flow\ETL\DSL\max(ref('int'));

$aggregator->aggregate(Row::create(int_entry('int', 10)));
$aggregator->aggregate(Row::create(int_entry('int', 20)));
$aggregator->aggregate(Row::create(int_entry('int', 30)));
$aggregator->aggregate(Row::create(str_entry('int', null)));
$aggregator->aggregate(row(int_entry('int', 10)));
$aggregator->aggregate(row(int_entry('int', 20)));
$aggregator->aggregate(row(int_entry('int', 30)));
$aggregator->aggregate(row(str_entry('int', null)));

self::assertSame(
30,
$aggregator->result()->value()
);
}

public function test_aggregation_max_with_datetime_values() : void
{
$aggregator = \Flow\ETL\DSL\max(ref('datetime'));

$aggregator->aggregate(row(datetime_entry('datetime', '2021-01-01 00:00:00')));
$aggregator->aggregate(row(datetime_entry('datetime', '2021-01-02 00:00:00')));
$aggregator->aggregate(row(datetime_entry('datetime', '2021-01-03 00:00:00')));
$aggregator->aggregate(row(datetime_entry('datetime', '2021-01-04 00:00:00')));

self::assertEquals(
new \DateTimeImmutable('2021-01-04 00:00:00'),
$aggregator->result()->value()
);
}

public function test_aggregation_max_with_float_result() : void
{
$aggregator = \Flow\ETL\DSL\max(ref('int'));

$aggregator->aggregate(Row::create(int_entry('int', 10)));
$aggregator->aggregate(Row::create(int_entry('int', 20)));
$aggregator->aggregate(Row::create(float_entry('int', 30.5)));
$aggregator->aggregate(Row::create(int_entry('int', 25)));
$aggregator->aggregate(row(int_entry('int', 10)));
$aggregator->aggregate(row(int_entry('int', 20)));
$aggregator->aggregate(row(float_entry('int', 30.5)));
$aggregator->aggregate(row(int_entry('int', 25)));

self::assertSame(
30.5,
Expand All @@ -60,10 +74,10 @@ public function test_aggregation_max_with_integer_result() : void
{
$aggregator = \Flow\ETL\DSL\max(ref('int'));

$aggregator->aggregate(Row::create(int_entry('int', 10)));
$aggregator->aggregate(Row::create(int_entry('int', 20)));
$aggregator->aggregate(Row::create(int_entry('int', 30)));
$aggregator->aggregate(Row::create(int_entry('int', 40)));
$aggregator->aggregate(row(int_entry('int', 10)));
$aggregator->aggregate(row(int_entry('int', 20)));
$aggregator->aggregate(row(int_entry('int', 30)));
$aggregator->aggregate(row(int_entry('int', 40)));

self::assertSame(
40,
Expand Down
52 changes: 33 additions & 19 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/Function/MinTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

namespace Flow\ETL\Tests\Unit\Function;

use function Flow\ETL\DSL\{float_entry, int_entry, min, ref, str_entry};
use Flow\ETL\Row;
use function Flow\ETL\DSL\{datetime_entry, float_entry, int_entry, min, ref, row, str_entry};
use PHPUnit\Framework\TestCase;

final class MinTest extends TestCase
Expand All @@ -14,11 +13,11 @@ public function test_aggregation_min_from_numeric_values() : void
{
$aggregator = min(ref('int'));

$aggregator->aggregate(Row::create(str_entry('int', '10')));
$aggregator->aggregate(Row::create(str_entry('int', '20')));
$aggregator->aggregate(Row::create(str_entry('int', '55')));
$aggregator->aggregate(Row::create(str_entry('int', '25')));
$aggregator->aggregate(Row::create(str_entry('not_int', null)));
$aggregator->aggregate(row(str_entry('int', '10')));
$aggregator->aggregate(row(str_entry('int', '20')));
$aggregator->aggregate(row(str_entry('int', '55')));
$aggregator->aggregate(row(str_entry('int', '25')));
$aggregator->aggregate(row(str_entry('not_int', null)));

self::assertSame(
10,
Expand All @@ -30,25 +29,40 @@ public function test_aggregation_min_including_null_value() : void
{
$aggregator = min(ref('int'));

$aggregator->aggregate(Row::create(int_entry('int', 10)));
$aggregator->aggregate(Row::create(int_entry('int', 20)));
$aggregator->aggregate(Row::create(int_entry('int', 30)));
$aggregator->aggregate(Row::create(str_entry('int', null)));
$aggregator->aggregate(row(int_entry('int', 10)));
$aggregator->aggregate(row(int_entry('int', 20)));
$aggregator->aggregate(row(int_entry('int', 30)));
$aggregator->aggregate(row(str_entry('int', null)));

self::assertSame(
10,
$aggregator->result()->value()
);
}

public function test_aggregation_min_with_datetime_values() : void
{
$aggregator = min(ref('datetime'));

$aggregator->aggregate(row(datetime_entry('datetime', '2021-01-01 00:00:00')));
$aggregator->aggregate(row(datetime_entry('datetime', '2021-01-02 00:00:00')));
$aggregator->aggregate(row(datetime_entry('datetime', '2021-01-03 00:00:00')));
$aggregator->aggregate(row(datetime_entry('datetime', '2021-01-04 00:00:00')));

self::assertEquals(
new \DateTimeImmutable('2021-01-01 00:00:00'),
$aggregator->result()->value()
);
}

public function test_aggregation_min_with_float_result() : void
{
$aggregator = min(ref('int'));

$aggregator->aggregate(Row::create(float_entry('int', 10.25)));
$aggregator->aggregate(Row::create(int_entry('int', 20)));
$aggregator->aggregate(Row::create(int_entry('int', 305)));
$aggregator->aggregate(Row::create(int_entry('int', 25)));
$aggregator->aggregate(row(float_entry('int', 10.25)));
$aggregator->aggregate(row(int_entry('int', 20)));
$aggregator->aggregate(row(int_entry('int', 305)));
$aggregator->aggregate(row(int_entry('int', 25)));

self::assertSame(
10.25,
Expand All @@ -60,10 +74,10 @@ public function test_aggregation_min_with_integer_result() : void
{
$aggregator = min(ref('int'));

$aggregator->aggregate(Row::create(int_entry('int', 10)));
$aggregator->aggregate(Row::create(int_entry('int', 20)));
$aggregator->aggregate(Row::create(int_entry('int', 30)));
$aggregator->aggregate(Row::create(int_entry('int', 40)));
$aggregator->aggregate(row(int_entry('int', 10)));
$aggregator->aggregate(row(int_entry('int', 20)));
$aggregator->aggregate(row(int_entry('int', 30)));
$aggregator->aggregate(row(int_entry('int', 40)));

self::assertSame(
10,
Expand Down

0 comments on commit e5bbefe

Please sign in to comment.