Skip to content

Commit

Permalink
Added type comparable assertion to scalar comparison functions to avo…
Browse files Browse the repository at this point in the history
…id unpredictable comparisons between different types (#1081)
  • Loading branch information
norberttech authored May 22, 2024
1 parent 6d5d8db commit 4c3a553
Show file tree
Hide file tree
Showing 47 changed files with 353 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\Partition\{NoopFilter, ScalarFunctionFilter};
use Flow\ETL\PHP\Type\{AutoCaster, Caster};
use Flow\ETL\Row\Factory\NativeEntryFactory;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -90,7 +91,8 @@ public function test_reading_multi_partitioned_path() : void
ref('date')->cast('date')->lessThan(lit(new \DateTimeImmutable('2022-01-04 00:00:00')))
)
),
new NativeEntryFactory()
new NativeEntryFactory(),
new AutoCaster(Caster::default())
)
)
);
Expand Down Expand Up @@ -129,7 +131,7 @@ public function test_reading_partitioned_folder_with_partitions_filtering() : vo
(new FlysystemFS())
->scan(
new Path(__DIR__ . '/Fixtures/partitioned/**/*.txt'),
new ScalarFunctionFilter(ref('partition_01')->equals(lit('b')), new NativeEntryFactory())
new ScalarFunctionFilter(ref('partition_01')->equals(lit('b')), new NativeEntryFactory(), new AutoCaster(Caster::default()))
)
)
);
Expand Down
7 changes: 7 additions & 0 deletions src/core/etl/src/Flow/ETL/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Pipeline\Optimizer;
use Flow\ETL\Row\EntryFactory;
use Flow\Serializer\Serializer;
Expand All @@ -30,6 +31,7 @@ public function __construct(
private readonly ExternalSort $externalSort,
private readonly FilesystemStreams $filesystemStreams,
private readonly Optimizer $optimizer,
private readonly Caster $caster,
private readonly bool $putInputIntoRows,
private readonly EntryFactory $entryFactory,
private readonly int $cacheBatchSize
Expand Down Expand Up @@ -62,6 +64,11 @@ public function cacheBatchSize() : int
return $this->cacheBatchSize;
}

/**
 * Returns the Caster instance this Config was constructed with.
 */
public function caster() : Caster
{
return $this->caster;
}

public function entryFactory() : EntryFactory
{
return $this->entryFactory;
Expand Down
7 changes: 7 additions & 0 deletions src/core/etl/src/Flow/ETL/ConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Flow\ETL\ExternalSort\MemorySort;
use Flow\ETL\Filesystem\{FilesystemStreams, LocalFilesystem};
use Flow\ETL\Monitoring\Memory\Unit;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Pipeline\Optimizer;
use Flow\ETL\Row\Factory\NativeEntryFactory;
use Flow\Serializer\{Base64Serializer, NativePHPSerializer, Serializer};
Expand All @@ -22,6 +23,8 @@ final class ConfigBuilder
*/
private int $cacheBatchSize = 1000;

private ?Caster $caster;

private ?ExternalSort $externalSort;

private ?Filesystem $filesystem;
Expand All @@ -43,6 +46,7 @@ public function __construct()
$this->filesystem = null;
$this->putInputIntoRows = false;
$this->optimizer = null;
$this->caster = null;
}

/**
Expand Down Expand Up @@ -89,13 +93,16 @@ public function build() : Config
new Optimizer\BatchSizeOptimization(batchSize: 1000)
);

$this->caster ??= Caster::default();

return new Config(
$this->id,
$this->serializer,
$this->cache,
$this->externalSort,
new FilesystemStreams($this->filesystem),
$this->optimizer,
$this->caster,
$this->putInputIntoRows,
$entryFactory,
$this->cacheBatchSize
Expand Down
12 changes: 9 additions & 3 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use Flow\ETL\Loader\SchemaValidationLoader;
use Flow\ETL\Loader\StreamLoader\Output;
use Flow\ETL\Partition\ScalarFunctionFilter;
use Flow\ETL\PHP\Type\{AutoCaster, Caster};
use Flow\ETL\PHP\Type\{AutoCaster};
use Flow\ETL\Pipeline\{BatchingPipeline,
CachingPipeline,
CollectingPipeline,
Expand Down Expand Up @@ -137,7 +137,7 @@ public function aggregate(AggregatingFunction ...$aggregations) : self

public function autoCast() : self
{
$this->pipeline->add(new AutoCastTransformer(new AutoCaster(Caster::default())));
$this->pipeline->add(new AutoCastTransformer(new AutoCaster($this->context->config->caster())));

return $this;
}
Expand Down Expand Up @@ -374,7 +374,13 @@ public function filterPartitions(Partition\PartitionFilter|ScalarFunction $filte
return $this;
}

$extractor->addPartitionFilter(new ScalarFunctionFilter($filter, $this->context->entryFactory()));
$extractor->addPartitionFilter(
new ScalarFunctionFilter(
$filter,
$this->context->entryFactory(),
new AutoCaster($this->context->config->caster())
)
);

return $this;
}
Expand Down
22 changes: 22 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/Comparison/Comparable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Function\Comparison;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\PHP\Type\TypeDetector;

trait Comparable
{
    /**
     * Guards a comparison between two already-evaluated values.
     *
     * The type of each operand is detected from its runtime value. When the type of
     * the left operand reports that it is not comparable with the type of the right
     * operand, the comparison is rejected with an exception instead of being evaluated.
     *
     * @param mixed $base value standing on the left-hand side of the operator
     * @param mixed $next value standing on the right-hand side of the operator
     * @param string $symbol operator symbol, used only when building the exception message
     *
     * @throws InvalidArgumentException when the detected types are not comparable
     */
    public function assertComparable(mixed $base, mixed $next, string $symbol) : void
    {
        $types = new TypeDetector();
        $left = $types->detectType($base);
        $right = $types->detectType($next);

        // Happy path first: comparable operands need no further work.
        if ($left->isComparableWith($right)) {
            return;
        }

        throw new InvalidArgumentException(
            \sprintf("Can't compare '(%s %s %s)' due to data type mismatch.", $left->toString(), $symbol, $right->toString())
        );
    }
}
4 changes: 3 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/Concat.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\type_string;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Row;

final class Concat extends ScalarFunctionChain
Expand All @@ -22,7 +24,7 @@ public function __construct(
public function eval(Row $row) : mixed
{
$values = \array_map(function (ScalarFunction $ref) use ($row) : mixed {
return (new Cast($ref, 'string'))->eval($row);
return Caster::default()->to(type_string(true))->value($ref->eval($row));
}, $this->refs);

foreach ($values as $value) {
Expand Down
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/Equals.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Function;

use Flow\ETL\Function\Comparison\Comparable;
use Flow\ETL\Row;

final class Equals extends ScalarFunctionChain
{
use Comparable;

public function __construct(
private readonly ScalarFunction $base,
private readonly ScalarFunction $next
Expand All @@ -19,6 +22,8 @@ public function eval(Row $row) : bool
$base = $this->base->eval($row);
$next = $this->next->eval($row);

$this->assertComparable($base, $next, '==');

return $base == $next;
}
}
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/GreaterThan.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Function;

use Flow\ETL\Function\Comparison\Comparable;
use Flow\ETL\Row;

final class GreaterThan extends ScalarFunctionChain
{
use Comparable;

public function __construct(
private readonly ScalarFunction $base,
private readonly ScalarFunction $next
Expand All @@ -19,6 +22,8 @@ public function eval(Row $row) : bool
$base = $this->base->eval($row);
$next = $this->next->eval($row);

$this->assertComparable($base, $next, '>');

return $base > $next;
}
}
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Function;

use Flow\ETL\Function\Comparison\Comparable;
use Flow\ETL\Row;

final class GreaterThanEqual extends ScalarFunctionChain
{
use Comparable;

public function __construct(
private readonly ScalarFunction $base,
private readonly ScalarFunction $next
Expand All @@ -19,6 +22,8 @@ public function eval(Row $row) : bool
$base = $this->base->eval($row);
$next = $this->next->eval($row);

$this->assertComparable($base, $next, '>=');

return $base >= $next;
}
}
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/LessThan.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Function;

use Flow\ETL\Function\Comparison\Comparable;
use Flow\ETL\Row;

final class LessThan extends ScalarFunctionChain
{
use Comparable;

public function __construct(
private readonly ScalarFunction $base,
private readonly ScalarFunction $next
Expand All @@ -19,6 +22,8 @@ public function eval(Row $row) : bool
$base = $this->base->eval($row);
$next = $this->next->eval($row);

$this->assertComparable($base, $next, '<');

return $base < $next;
}
}
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/LessThanEqual.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Function;

use Flow\ETL\Function\Comparison\Comparable;
use Flow\ETL\Row;

final class LessThanEqual extends ScalarFunctionChain
{
use Comparable;

public function __construct(
private readonly ScalarFunction $base,
private readonly ScalarFunction $next
Expand All @@ -19,6 +22,8 @@ public function eval(Row $row) : bool
$base = $this->base->eval($row);
$next = $this->next->eval($row);

$this->assertComparable($base, $next, '<=');

return $base <= $next;
}
}
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/NotEquals.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Function;

use Flow\ETL\Function\Comparison\Comparable;
use Flow\ETL\Row;

final class NotEquals extends ScalarFunctionChain
{
use Comparable;

public function __construct(
private readonly ScalarFunction $base,
private readonly ScalarFunction $next
Expand All @@ -19,6 +22,8 @@ public function eval(Row $row) : bool
$base = $this->base->eval($row);
$next = $this->next->eval($row);

$this->assertComparable($base, $next, '!=');

return $base != $next;
}
}
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/NotSame.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Function;

use Flow\ETL\Function\Comparison\Comparable;
use Flow\ETL\Row;

final class NotSame extends ScalarFunctionChain
{
use Comparable;

public function __construct(
private readonly ScalarFunction $base,
private readonly ScalarFunction $next
Expand All @@ -19,6 +22,8 @@ public function eval(Row $row) : bool
$base = $this->base->eval($row);
$next = $this->next->eval($row);

$this->assertComparable($base, $next, '!==');

return $base !== $next;
}
}
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/Same.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Function;

use Flow\ETL\Function\Comparison\Comparable;
use Flow\ETL\Row;

final class Same extends ScalarFunctionChain
{
use Comparable;

public function __construct(
private readonly ScalarFunction $base,
private readonly ScalarFunction $next
Expand All @@ -19,6 +22,8 @@ public function eval(Row $row) : bool
$base = $this->base->eval($row);
$next = $this->next->eval($row);

$this->assertComparable($base, $next, '===');

return $base === $next;
}
}
6 changes: 4 additions & 2 deletions src/core/etl/src/Flow/ETL/Function/Sanitize.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\{type_int, type_string};
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Row;

final class Sanitize extends ScalarFunctionChain
Expand All @@ -24,8 +26,8 @@ public function eval(Row $row) : ?string
return null;
}

$placeholder = (string) $this->placeholder->eval($row);
$skipCharacters = (int) $this->skipCharacters->eval($row);
$placeholder = Caster::default()->to(type_string(true))->value($this->placeholder->eval($row));
$skipCharacters = Caster::default()->to(type_int(true))->value($this->skipCharacters->eval($row));

$size = \mb_strlen($val);

Expand Down
6 changes: 4 additions & 2 deletions src/core/etl/src/Flow/ETL/Function/StartsWith.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\type_string;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Row;

final class StartsWith extends ScalarFunctionChain
Expand All @@ -16,8 +18,8 @@ public function __construct(

public function eval(Row $row) : bool
{
$haystack = $this->haystack->eval($row);
$needle = $this->needle->eval($row);
$haystack = Caster::default()->to(type_string(true))->value($this->haystack->eval($row));
$needle = Caster::default()->to(type_string(true))->value($this->needle->eval($row));

if (!\is_string($needle) || !\is_string($haystack)) {
return false;
Expand Down
Loading

0 comments on commit 4c3a553

Please sign in to comment.