diff --git a/src/adapter/etl-adapter-filesystem/tests/Flow/ETL/Adapter/Filesystem/Tests/Integration/FlysystemFSTest.php b/src/adapter/etl-adapter-filesystem/tests/Flow/ETL/Adapter/Filesystem/Tests/Integration/FlysystemFSTest.php index 89f3f8a05..433e0e9eb 100644 --- a/src/adapter/etl-adapter-filesystem/tests/Flow/ETL/Adapter/Filesystem/Tests/Integration/FlysystemFSTest.php +++ b/src/adapter/etl-adapter-filesystem/tests/Flow/ETL/Adapter/Filesystem/Tests/Integration/FlysystemFSTest.php @@ -9,6 +9,7 @@ use Flow\ETL\Filesystem\Path; use Flow\ETL\Filesystem\Stream\Mode; use Flow\ETL\Partition\{NoopFilter, ScalarFunctionFilter}; +use Flow\ETL\PHP\Type\{AutoCaster, Caster}; use Flow\ETL\Row\Factory\NativeEntryFactory; use PHPUnit\Framework\TestCase; @@ -90,7 +91,8 @@ public function test_reading_multi_partitioned_path() : void ref('date')->cast('date')->lessThan(lit(new \DateTimeImmutable('2022-01-04 00:00:00'))) ) ), - new NativeEntryFactory() + new NativeEntryFactory(), + new AutoCaster(Caster::default()) ) ) ); @@ -129,7 +131,7 @@ public function test_reading_partitioned_folder_with_partitions_filtering() : vo (new FlysystemFS()) ->scan( new Path(__DIR__ . '/Fixtures/partitioned/**/*.txt'), - new ScalarFunctionFilter(ref('partition_01')->equals(lit('b')), new NativeEntryFactory()) + new ScalarFunctionFilter(ref('partition_01')->equals(lit('b')), new NativeEntryFactory(), new AutoCaster(Caster::default())) ) ) ); diff --git a/src/core/etl/src/Flow/ETL/Config.php b/src/core/etl/src/Flow/ETL/Config.php index 0d12bbef9..68361efb5 100644 --- a/src/core/etl/src/Flow/ETL/Config.php +++ b/src/core/etl/src/Flow/ETL/Config.php @@ -6,6 +6,7 @@ use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Filesystem\FilesystemStreams; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Pipeline\Optimizer; use Flow\ETL\Row\EntryFactory; use Flow\Serializer\Serializer; @@ -30,6 +31,7 @@ public function __construct( private readonly ExternalSort $externalSort, private readonly FilesystemStreams $filesystemStreams, private readonly Optimizer $optimizer, + private readonly Caster $caster, private readonly bool $putInputIntoRows, private readonly EntryFactory $entryFactory, private readonly int $cacheBatchSize @@ -62,6 +64,11 @@ public function cacheBatchSize() : int return $this->cacheBatchSize; } + public function caster() : Caster + { + return $this->caster; + } + public function entryFactory() : EntryFactory { return $this->entryFactory; diff --git a/src/core/etl/src/Flow/ETL/ConfigBuilder.php b/src/core/etl/src/Flow/ETL/ConfigBuilder.php index 206f7046a..ad0c5c136 100644 --- a/src/core/etl/src/Flow/ETL/ConfigBuilder.php +++ b/src/core/etl/src/Flow/ETL/ConfigBuilder.php @@ -9,6 +9,7 @@ use Flow\ETL\ExternalSort\MemorySort; use Flow\ETL\Filesystem\{FilesystemStreams, LocalFilesystem}; use Flow\ETL\Monitoring\Memory\Unit; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Pipeline\Optimizer; use Flow\ETL\Row\Factory\NativeEntryFactory; use Flow\Serializer\{Base64Serializer, NativePHPSerializer, Serializer}; @@ -22,6 +23,8 @@ final class ConfigBuilder */ private int $cacheBatchSize = 1000; + private ?Caster $caster; + private ?ExternalSort $externalSort; private ?Filesystem $filesystem; @@ -43,6 +46,7 @@ public function __construct() $this->filesystem = null; $this->putInputIntoRows = false; $this->optimizer = null; + $this->caster = null; } /** @@ -89,6 +93,8 @@ public function build() : Config new Optimizer\BatchSizeOptimization(batchSize: 1000) ); + $this->caster ??= Caster::default(); + return new Config( $this->id, $this->serializer, @@ -96,6 +102,7 @@ public function build() : Config $this->externalSort, new FilesystemStreams($this->filesystem), $this->optimizer, + $this->caster, $this->putInputIntoRows, $entryFactory, $this->cacheBatchSize diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index 3115b9865..8ac3f91db 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -16,7 +16,7 @@ use Flow\ETL\Loader\SchemaValidationLoader; use Flow\ETL\Loader\StreamLoader\Output; use Flow\ETL\Partition\ScalarFunctionFilter; -use Flow\ETL\PHP\Type\{AutoCaster, Caster}; +use Flow\ETL\PHP\Type\{AutoCaster}; use Flow\ETL\Pipeline\{BatchingPipeline, CachingPipeline, CollectingPipeline, @@ -137,7 +137,7 @@ public function aggregate(AggregatingFunction ...$aggregations) : self public function autoCast() : self { - $this->pipeline->add(new AutoCastTransformer(new AutoCaster(Caster::default()))); + $this->pipeline->add(new AutoCastTransformer(new AutoCaster($this->context->config->caster()))); return $this; } @@ -374,7 +374,13 @@ public function filterPartitions(Partition\PartitionFilter|ScalarFunction $filte return $this; } - $extractor->addPartitionFilter(new ScalarFunctionFilter($filter, $this->context->entryFactory())); + $extractor->addPartitionFilter( + new ScalarFunctionFilter( + $filter, + $this->context->entryFactory(), + new AutoCaster($this->context->config->caster()) + ) + ); return $this; } diff --git a/src/core/etl/src/Flow/ETL/Function/Comparison/Comparable.php b/src/core/etl/src/Flow/ETL/Function/Comparison/Comparable.php new file mode 100644 index 000000000..75e27b7b6 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Function/Comparison/Comparable.php @@ -0,0 +1,22 @@ +detectType($base); + $nextType = $detector->detectType($next); + + if (!$baseType->isComparableWith($nextType)) { + throw new InvalidArgumentException(\sprintf("Can't compare '(%s %s %s)' due to data type mismatch.", $baseType->toString(), $symbol, $nextType->toString())); + } + } +} diff --git a/src/core/etl/src/Flow/ETL/Function/Concat.php b/src/core/etl/src/Flow/ETL/Function/Concat.php index 97472e016..89eec0737 100644 --- a/src/core/etl/src/Flow/ETL/Function/Concat.php +++ b/src/core/etl/src/Flow/ETL/Function/Concat.php @@ -4,6 +4,8 @@ namespace Flow\ETL\Function; +use function Flow\ETL\DSL\type_string; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row; final class Concat extends ScalarFunctionChain @@ -22,7 +24,7 @@ public function __construct( public function eval(Row $row) : mixed { $values = \array_map(function (ScalarFunction $ref) use ($row) : mixed { - return (new Cast($ref, 'string'))->eval($row); + return Caster::default()->to(type_string(true))->value($ref->eval($row)); }, $this->refs); foreach ($values as $value) { diff --git a/src/core/etl/src/Flow/ETL/Function/Equals.php b/src/core/etl/src/Flow/ETL/Function/Equals.php index 1de450037..ddbda6800 100644 --- a/src/core/etl/src/Flow/ETL/Function/Equals.php +++ b/src/core/etl/src/Flow/ETL/Function/Equals.php @@ -4,10 +4,13 @@ namespace Flow\ETL\Function; +use Flow\ETL\Function\Comparison\Comparable; use Flow\ETL\Row; final class Equals extends ScalarFunctionChain { + use Comparable; + public function __construct( private readonly ScalarFunction $base, private readonly ScalarFunction $next @@ -19,6 +22,8 @@ public function eval(Row $row) : bool $base = $this->base->eval($row); $next = $this->next->eval($row); + $this->assertComparable($base, $next, '=='); + return $base == $next; } } diff --git a/src/core/etl/src/Flow/ETL/Function/GreaterThan.php b/src/core/etl/src/Flow/ETL/Function/GreaterThan.php index f8471b8e3..a03f3968d 100644 --- a/src/core/etl/src/Flow/ETL/Function/GreaterThan.php +++ b/src/core/etl/src/Flow/ETL/Function/GreaterThan.php @@ -4,10 +4,13 @@ namespace Flow\ETL\Function; +use Flow\ETL\Function\Comparison\Comparable; use Flow\ETL\Row; final class GreaterThan extends ScalarFunctionChain { + use Comparable; + public function __construct( private readonly ScalarFunction $base, private readonly ScalarFunction $next @@ -19,6 +22,8 @@ public function eval(Row $row) : bool $base = $this->base->eval($row); $next = $this->next->eval($row); + $this->assertComparable($base, $next, '>'); + return $base > $next; } } diff --git a/src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php b/src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php index e91e5216c..912b7796a 100644 --- a/src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php +++ b/src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php @@ -4,10 +4,13 @@ namespace Flow\ETL\Function; +use Flow\ETL\Function\Comparison\Comparable; use Flow\ETL\Row; final class GreaterThanEqual extends ScalarFunctionChain { + use Comparable; + public function __construct( private readonly ScalarFunction $base, private readonly ScalarFunction $next @@ -19,6 +22,8 @@ public function eval(Row $row) : bool $base = $this->base->eval($row); $next = $this->next->eval($row); + $this->assertComparable($base, $next, '>='); + return $base >= $next; } } diff --git a/src/core/etl/src/Flow/ETL/Function/LessThan.php b/src/core/etl/src/Flow/ETL/Function/LessThan.php index 4c385487b..3ab7b60a1 100644 --- a/src/core/etl/src/Flow/ETL/Function/LessThan.php +++ b/src/core/etl/src/Flow/ETL/Function/LessThan.php @@ -4,10 +4,13 @@ namespace Flow\ETL\Function; +use Flow\ETL\Function\Comparison\Comparable; use Flow\ETL\Row; final class LessThan extends ScalarFunctionChain { + use Comparable; + public function __construct( private readonly ScalarFunction $base, private readonly ScalarFunction $next @@ -19,6 +22,8 @@ public function eval(Row $row) : bool $base = $this->base->eval($row); $next = $this->next->eval($row); + $this->assertComparable($base, $next, '<'); + return $base < $next; } } diff --git a/src/core/etl/src/Flow/ETL/Function/LessThanEqual.php b/src/core/etl/src/Flow/ETL/Function/LessThanEqual.php index c040a6c55..f328b63f0 100644 --- a/src/core/etl/src/Flow/ETL/Function/LessThanEqual.php +++ b/src/core/etl/src/Flow/ETL/Function/LessThanEqual.php @@ -4,10 +4,13 @@ namespace Flow\ETL\Function; +use Flow\ETL\Function\Comparison\Comparable; use Flow\ETL\Row; final class LessThanEqual extends ScalarFunctionChain { + use Comparable; + public function __construct( private readonly ScalarFunction $base, private readonly ScalarFunction $next @@ -19,6 +22,8 @@ public function eval(Row $row) : bool $base = $this->base->eval($row); $next = $this->next->eval($row); + $this->assertComparable($base, $next, '<='); + return $base <= $next; } } diff --git a/src/core/etl/src/Flow/ETL/Function/NotEquals.php b/src/core/etl/src/Flow/ETL/Function/NotEquals.php index e593e2e7a..1135dfcc3 100644 --- a/src/core/etl/src/Flow/ETL/Function/NotEquals.php +++ b/src/core/etl/src/Flow/ETL/Function/NotEquals.php @@ -4,10 +4,13 @@ namespace Flow\ETL\Function; +use Flow\ETL\Function\Comparison\Comparable; use Flow\ETL\Row; final class NotEquals extends ScalarFunctionChain { + use Comparable; + public function __construct( private readonly ScalarFunction $base, private readonly ScalarFunction $next @@ -19,6 +22,8 @@ public function eval(Row $row) : bool $base = $this->base->eval($row); $next = $this->next->eval($row); + $this->assertComparable($base, $next, '!='); + return $base != $next; } } diff --git a/src/core/etl/src/Flow/ETL/Function/NotSame.php b/src/core/etl/src/Flow/ETL/Function/NotSame.php index 3cacf822b..70116cbbe 100644 --- a/src/core/etl/src/Flow/ETL/Function/NotSame.php +++ b/src/core/etl/src/Flow/ETL/Function/NotSame.php @@ -4,10 +4,13 @@ namespace Flow\ETL\Function; +use Flow\ETL\Function\Comparison\Comparable; use Flow\ETL\Row; final class NotSame extends ScalarFunctionChain { + use Comparable; + public function __construct( private readonly ScalarFunction $base, private readonly ScalarFunction $next @@ -19,6 +22,8 @@ public function eval(Row $row) : bool $base = $this->base->eval($row); $next = $this->next->eval($row); + $this->assertComparable($base, $next, '!=='); + return $base !== $next; } } diff --git a/src/core/etl/src/Flow/ETL/Function/Same.php b/src/core/etl/src/Flow/ETL/Function/Same.php index 3fad94283..e8a480ac4 100644 --- a/src/core/etl/src/Flow/ETL/Function/Same.php +++ b/src/core/etl/src/Flow/ETL/Function/Same.php @@ -4,10 +4,13 @@ namespace Flow\ETL\Function; +use Flow\ETL\Function\Comparison\Comparable; use Flow\ETL\Row; final class Same extends ScalarFunctionChain { + use Comparable; + public function __construct( private readonly ScalarFunction $base, private readonly ScalarFunction $next @@ -19,6 +22,8 @@ public function eval(Row $row) : bool $base = $this->base->eval($row); $next = $this->next->eval($row); + $this->assertComparable($base, $next, '==='); + return $base === $next; } } diff --git a/src/core/etl/src/Flow/ETL/Function/Sanitize.php b/src/core/etl/src/Flow/ETL/Function/Sanitize.php index 95e7f805e..97b4e92fd 100644 --- a/src/core/etl/src/Flow/ETL/Function/Sanitize.php +++ b/src/core/etl/src/Flow/ETL/Function/Sanitize.php @@ -4,6 +4,8 @@ namespace Flow\ETL\Function; +use function Flow\ETL\DSL\{type_int, type_string}; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row; final class Sanitize extends ScalarFunctionChain @@ -24,8 +26,8 @@ public function eval(Row $row) : ?string return null; } - $placeholder = (string) $this->placeholder->eval($row); - $skipCharacters = (int) $this->skipCharacters->eval($row); + $placeholder = Caster::default()->to(type_string(true))->value($this->placeholder->eval($row)); + $skipCharacters = Caster::default()->to(type_int(true))->value($this->skipCharacters->eval($row)); $size = \mb_strlen($val); diff --git a/src/core/etl/src/Flow/ETL/Function/StartsWith.php b/src/core/etl/src/Flow/ETL/Function/StartsWith.php index f1d327990..94c1ff1ad 100644 --- a/src/core/etl/src/Flow/ETL/Function/StartsWith.php +++ b/src/core/etl/src/Flow/ETL/Function/StartsWith.php @@ -4,6 +4,8 @@ namespace Flow\ETL\Function; +use function Flow\ETL\DSL\type_string; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row; final class StartsWith extends ScalarFunctionChain @@ -16,8 +18,8 @@ public function __construct( public function eval(Row $row) : bool { - $haystack = $this->haystack->eval($row); - $needle = $this->needle->eval($row); + $haystack = Caster::default()->to(type_string(true))->value($this->haystack->eval($row)); + $needle = Caster::default()->to(type_string(true))->value($this->needle->eval($row)); if (!\is_string($needle) || !\is_string($haystack)) { return false; diff --git a/src/core/etl/src/Flow/ETL/Function/StrPad.php b/src/core/etl/src/Flow/ETL/Function/StrPad.php index 4dc491a2d..1c6d743b3 100644 --- a/src/core/etl/src/Flow/ETL/Function/StrPad.php +++ b/src/core/etl/src/Flow/ETL/Function/StrPad.php @@ -4,6 +4,8 @@ namespace Flow\ETL\Function; +use function Flow\ETL\DSL\type_string; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row; final class StrPad extends ScalarFunctionChain @@ -18,8 +20,8 @@ public function __construct( public function eval(Row $row) : mixed { - /** @var mixed $val */ - $val = $this->ref->eval($row); + /** @var null|string $val */ + $val = Caster::default()->to(type_string(true))->value($this->ref->eval($row)); if (!\is_string($val)) { return null; diff --git a/src/core/etl/src/Flow/ETL/Function/StrReplace.php b/src/core/etl/src/Flow/ETL/Function/StrReplace.php index b3931bfdc..f6c5a34aa 100644 --- a/src/core/etl/src/Flow/ETL/Function/StrReplace.php +++ b/src/core/etl/src/Flow/ETL/Function/StrReplace.php @@ -4,6 +4,8 @@ namespace Flow\ETL\Function; +use function Flow\ETL\DSL\type_string; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row; final class StrReplace extends ScalarFunctionChain @@ -21,8 +23,8 @@ public function __construct( public function eval(Row $row) : mixed { - /** @var mixed $val */ - $val = $this->ref->eval($row); + /** @var null|string $val */ + $val = Caster::default()->to(type_string(true))->value($this->ref->eval($row)); if (!\is_string($val)) { return null; diff --git a/src/core/etl/src/Flow/ETL/Function/ToMoney.php b/src/core/etl/src/Flow/ETL/Function/ToMoney.php index 8023788ea..bc1db6407 100644 --- a/src/core/etl/src/Flow/ETL/Function/ToMoney.php +++ b/src/core/etl/src/Flow/ETL/Function/ToMoney.php @@ -4,7 +4,9 @@ namespace Flow\ETL\Function; +use function Flow\ETL\DSL\type_string; use Flow\ETL\Exception\RuntimeException; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row; use Money\Currencies\ISOCurrencies; use Money\Parser\DecimalMoneyParser; @@ -25,7 +27,7 @@ public function __construct( public function eval(Row $row) : ?Money { - $currency = $this->currencyRef->eval($row); + $currency = Caster::default()->to(type_string(true))->value($this->currencyRef->eval($row)); if (!\is_string($currency)) { return null; diff --git a/src/core/etl/src/Flow/ETL/Function/Trim.php b/src/core/etl/src/Flow/ETL/Function/Trim.php index 622bc8fa9..dbe043450 100644 --- a/src/core/etl/src/Flow/ETL/Function/Trim.php +++ b/src/core/etl/src/Flow/ETL/Function/Trim.php @@ -4,7 +4,9 @@ namespace Flow\ETL\Function; +use function Flow\ETL\DSL\type_string; use Flow\ETL\Function\Trim\Type; +use Flow\ETL\PHP\Type\Caster; use Flow\ETL\Row; final class Trim extends ScalarFunctionChain @@ -18,8 +20,8 @@ public function __construct( public function eval(Row $row) : mixed { - /** @var mixed $value */ - $value = $this->ref->eval($row); + /** @var null|string $value */ + $value = Caster::default()->to(type_string(true))->value($this->ref->eval($row)); if (!\is_string($value)) { return null; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/DateTimeType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/DateTimeType.php index 7dd79aed5..60cd89757 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/DateTimeType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/DateTimeType.php @@ -19,6 +19,19 @@ public static function fromArray(array $data) : self return new self($data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/JsonType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/JsonType.php index c31d66511..a1e9feda2 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/JsonType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/JsonType.php @@ -20,6 +20,19 @@ public static function fromArray(array $data) : self return new self($data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/ListType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/ListType.php index 7a1d76b29..7b489a04f 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/ListType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/ListType.php @@ -25,6 +25,19 @@ public function element() : ListElement return $this->element; } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { if (!$type instanceof self) { diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/MapType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/MapType.php index 1db25781e..789563b67 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/MapType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/MapType.php @@ -20,6 +20,19 @@ public static function fromArray(array $data) : self return new self(MapKey::fromArray($data['key']), MapValue::fromArray($data['value']), $data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { if (!$type instanceof self) { diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/StructureType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/StructureType.php index a7303993d..393080fec 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/StructureType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/StructureType.php @@ -52,6 +52,19 @@ public function elements() : array return $this->elements; } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { if (!$type instanceof self) { diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/UuidType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/UuidType.php index feb6070d9..05c006b27 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/UuidType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/UuidType.php @@ -20,6 +20,19 @@ public static function fromArray(array $data) : Type return new self($data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/XMLElementType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/XMLElementType.php index 06307c70a..44529fa9c 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/XMLElementType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/XMLElementType.php @@ -19,6 +19,15 @@ public static function fromArray(array $data) : self return new self($data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/XMLType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/XMLType.php index 0fa33d3c3..d6be1eebb 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Logical/XMLType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Logical/XMLType.php @@ -19,6 +19,15 @@ public static function fromArray(array $data) : self return new self($data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Native/ArrayType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Native/ArrayType.php index 987917b9f..743c8f2af 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Native/ArrayType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Native/ArrayType.php @@ -23,6 +23,19 @@ public static function fromArray(array $data) : self return new self($data['empty'] ?? false, $data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof NullType) { + return true; + } + + if ($type instanceof self) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self && $this->empty === $type->empty; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Native/CallableType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Native/CallableType.php index 72197a728..e0a9fc7eb 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Native/CallableType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Native/CallableType.php @@ -19,6 +19,15 @@ public static function fromArray(array $data) : self return new self($data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self && $this->nullable === $type->nullable; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Native/EnumType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Native/EnumType.php index b9265e079..e779acd8e 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Native/EnumType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Native/EnumType.php @@ -38,6 +38,23 @@ public static function of(string $class, bool $nullable = false) : self return new self($class, $nullable); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + if ($type instanceof NullType) { + return true; + } + + if ($type instanceof ScalarType && \is_a($this->class, \BackedEnum::class, true)) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self && $this->class === $type->class && $this->nullable === $type->nullable; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Native/NullType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Native/NullType.php index 697516754..d351d9e01 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Native/NullType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Native/NullType.php @@ -13,6 +13,11 @@ public static function fromArray(array $data) : self return new self(); } + public function isComparableWith(Type $type) : bool + { + return true; + } + public function isEqual(Type $type) : bool { return $type instanceof self; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Native/ObjectType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Native/ObjectType.php index 1aefdbe5c..f63843926 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Native/ObjectType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Native/ObjectType.php @@ -30,6 +30,15 @@ public static function fromArray(array $data) : self return new self($data['class'], $nullable); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self && $this->class === $type->class && $this->nullable === $type->nullable; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Native/ResourceType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Native/ResourceType.php index 5ef9fc042..51e6059e1 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Native/ResourceType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Native/ResourceType.php @@ -19,6 +19,15 @@ public static function fromArray(array $data) : self return new self($data['nullable'] ?? false); } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self && $this->nullable === $type->nullable; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Native/ScalarType.php b/src/core/etl/src/Flow/ETL/PHP/Type/Native/ScalarType.php index c35d674f5..9b6f9ed1f 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Native/ScalarType.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Native/ScalarType.php @@ -66,6 +66,19 @@ public function isBoolean() : bool return $this->type === self::BOOLEAN; } + public function isComparableWith(Type $type) : bool + { + if ($type instanceof self) { + return true; + } + + if ($type instanceof NullType) { + return true; + } + + return false; + } + public function isEqual(Type $type) : bool { return $type instanceof self && $type->type === $this->type && $this->nullable === $type->nullable; diff --git a/src/core/etl/src/Flow/ETL/PHP/Type/Type.php b/src/core/etl/src/Flow/ETL/PHP/Type/Type.php index b91b5ed30..d084c0dd1 100644 --- a/src/core/etl/src/Flow/ETL/PHP/Type/Type.php +++ b/src/core/etl/src/Flow/ETL/PHP/Type/Type.php @@ -8,6 +8,8 @@ interface Type { public static function fromArray(array $data) : self; + public function isComparableWith(self $type) : bool; + public function isEqual(self $type) : bool; public function isValid(mixed $value) : bool; diff --git a/src/core/etl/src/Flow/ETL/Partition/ScalarFunctionFilter.php b/src/core/etl/src/Flow/ETL/Partition/ScalarFunctionFilter.php index 23c5e05e0..7b4dae62e 100644 --- a/src/core/etl/src/Flow/ETL/Partition/ScalarFunctionFilter.php +++ b/src/core/etl/src/Flow/ETL/Partition/ScalarFunctionFilter.php @@ -7,29 +7,27 @@ use function Flow\ETL\DSL\row; use Flow\ETL\Function\ScalarFunction; use Flow\ETL\Partition; +use Flow\ETL\PHP\Type\AutoCaster; use Flow\ETL\Row\EntryFactory; final class ScalarFunctionFilter implements PartitionFilter { public function __construct( private readonly ScalarFunction $function, - private readonly EntryFactory $entryFactory + private readonly EntryFactory $entryFactory, + private readonly AutoCaster $caster ) { } public function keep(Partition ...$partitions) : bool { - try { - return (bool) $this->function->eval( - row( - ...\array_map( - fn (Partition $partition) => $this->entryFactory->create($partition->name, $partition->value), - $partitions - ) + return (bool) $this->function->eval( + row( + ...\array_map( + fn (Partition $partition) => $this->entryFactory->create($partition->name, $this->caster->cast($partition->value)), + $partitions ) - ); - } catch (\Exception $e) { - return false; - } + ) + ); } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php index 69f1a1a5e..0a1d788f9 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php @@ -191,7 +191,12 @@ public function test_pruning_single_partition() : void { $rows = df() ->read(from_text(__DIR__ . '/Fixtures/Partitioning/multi_partition_pruning_test/year=*/month=*/day=*/*.txt')) - ->filterPartitions(ref('year')->concat(lit('-'), ref('month')->strPadLeft(2, '0'), lit('-'), ref('day')->strPadLeft(2, '0'))->cast('date')->greaterThanEqual(lit(new \DateTimeImmutable('2023-01-01')))) + ->filterPartitions( + ref('year') + ->concat(lit('-'), ref('month')->strPadLeft(2, '0'), lit('-'), ref('day')->strPadLeft(2, '0')) + ->cast('date') + ->greaterThanEqual(lit(new \DateTimeImmutable('2023-01-01'))) + ) ->collect() ->select('year') ->withEntry('year', ref('year')->cast('int')) diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/LocalFilesystemTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/LocalFilesystemTest.php index 583681dd0..c705caf65 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/LocalFilesystemTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/LocalFilesystemTest.php @@ -8,6 +8,7 @@ use Flow\ETL\Filesystem\Stream\Mode; use Flow\ETL\Filesystem\{LocalFilesystem, Path}; use Flow\ETL\Partition\{NoopFilter, ScalarFunctionFilter}; +use Flow\ETL\PHP\Type\{AutoCaster, Caster}; use Flow\ETL\Row\Factory\NativeEntryFactory; use PHPUnit\Framework\TestCase; @@ -89,7 +90,8 @@ public function test_reading_multi_partitioned_path() : void ref('date')->cast('date')->lessThan(lit(new \DateTimeImmutable('2022-01-04'))) ) ), - new NativeEntryFactory() + new NativeEntryFactory(), + new AutoCaster(Caster::default()) ) ) ); @@ -128,7 +130,7 @@ public function test_reading_partitioned_folder_with_partitions_filtering() : vo (new LocalFilesystem()) ->scan( new Path(__DIR__ . '/Fixtures/partitioned/**/*.txt'), - new ScalarFunctionFilter(ref('partition_01')->equals(lit('b')), new NativeEntryFactory()) + new ScalarFunctionFilter(ref('partition_01')->equals(lit('b')), new NativeEntryFactory(), new AutoCaster(Caster::default())) ) ) ); diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StartsWithTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StartsWithTest.php index 127fdd8d4..02f528e4d 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StartsWithTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StartsWithTest.php @@ -49,7 +49,7 @@ public function test_starts_with_on_non_string_key() : void self::assertSame( [ - ['id' => 1, 'starts_with' => false], + ['id' => 1, 'starts_with' => true], ], $memory->dump() ); @@ -71,7 +71,7 @@ public function test_starts_with_on_non_string_value() : void self::assertSame( [ - ['id' => '1', 'starts_with' => false], + ['id' => '1', 'starts_with' => true], ], $memory->dump() ); diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StrPadTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StrPadTest.php index 25bcc173c..e881221df 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StrPadTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StrPadTest.php @@ -49,7 +49,7 @@ public function test_strpad_on_non_string_key() : void self::assertSame( [ - ['id' => 1, 'strpad' => null], + ['id' => 1, 'strpad' => '1 '], ], $memory->dump() ); diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StrReplaceTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StrReplaceTest.php index 43212f4a8..2d15b487c 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StrReplaceTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/StrReplaceTest.php @@ -49,7 +49,7 @@ public function test_str_replace_on_non_string_key() : void self::assertSame( [ - ['id' => 1, 'str_replace' => null], + ['id' => 1, 'str_replace' => '1'], ], $memory->dump() ); diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/TrimTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/TrimTest.php index 15b5e55f7..60cdb325c 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/TrimTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/TrimTest.php @@ -94,7 +94,7 @@ public function test_trim_on_non_string_key() : void self::assertSame( [ - ['id' => 1, 'trim' => null], + ['id' => 1, 'trim' => '1'], ], $memory->dump() ); diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/StrPadTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/StrPadTest.php index 2459646ba..8ba32ce96 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/StrPadTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/StrPadTest.php @@ -12,7 +12,8 @@ final class StrPadTest extends TestCase { public function test_str_pad_on_non_string_value() : void { - self::assertNull( + self::assertSame( + '-1000', ref('value')->strPad(5, '-', \STR_PAD_LEFT)->eval(Row::create(int_entry('value', 1000))), ); } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/StrReplaceTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/StrReplaceTest.php index 53ed863c5..eb2955004 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/StrReplaceTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/StrReplaceTest.php @@ -12,7 +12,8 @@ final class StrReplaceTest extends TestCase { public function test_str_replace_on_non_string_value() : void { - self::assertNull( + self::assertSame( + '1000', ref('value')->strReplace('test', '1')->eval(Row::create(int_entry('value', 1000))), ); } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/TrimTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/TrimTest.php index 86971726e..cd5538c6b 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/TrimTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/TrimTest.php @@ -21,7 +21,8 @@ public function test_trim_both_valid_string() : void public function test_trim_integer() : void { - self::assertNull( + self::assertSame( + '1', ref('integer')->trim()->eval(Row::create(int_entry('integer', 1))) ); } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Partition/ScalarFunctionFilterTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Partition/ScalarFunctionFilterTest.php index 8813d1d2e..221cafa9b 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Partition/ScalarFunctionFilterTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Partition/ScalarFunctionFilterTest.php @@ -7,6 +7,7 @@ use function Flow\ETL\DSL\{all, any, lit, ref}; use Flow\ETL\Partition; use Flow\ETL\Partition\ScalarFunctionFilter; +use Flow\ETL\PHP\Type\{AutoCaster, Caster}; use Flow\ETL\Row\Factory\NativeEntryFactory; use PHPUnit\Framework\TestCase; @@ -16,20 +17,47 @@ public function test_filtering() : void { $filter = new ScalarFunctionFilter( ref('foo')->greaterThan(lit(10)), - new NativeEntryFactory() + new NativeEntryFactory(), + new AutoCaster(Caster::default()) ); self::assertTrue($filter->keep(new Partition('foo', '100'))); self::assertFalse($filter->keep(new Partition('foo', '5'))); } + public function test_filtering_datetime_partitions() : void + { + $filter = new ScalarFunctionFilter( + ref('foo')->greaterThan(lit(new \DateTimeImmutable('2021-01-01'))), + new NativeEntryFactory(), + new AutoCaster(Caster::default()) + ); + + self::assertTrue($filter->keep(new Partition('foo', '2021-01-02'))); + self::assertFalse($filter->keep(new Partition('foo', '2020-12-31'))); + } + + public function test_filtering_datetime_partitions_by_string_value() : void + { + $filter = new ScalarFunctionFilter( + ref('foo')->greaterThan(lit('2021-01-01')), + new NativeEntryFactory(), + new AutoCaster(Caster::default()) + ); + + $this->expectExceptionMessage("Can't compare '(datetime > string)' due to data type mismatch."); + self::assertTrue($filter->keep(new Partition('foo', '2021-01-02'))); + } + public function test_filtering_when_partition_is_not_covered_by_any_filter() : void { $filter = new ScalarFunctionFilter( ref('foo')->greaterThan(lit(10)), - new NativeEntryFactory() + new NativeEntryFactory(), + new AutoCaster(Caster::default()) ); + $this->expectExceptionMessage('Entry "foo" does not exist. Did you mean one of the following? ["bar"]'); self::assertFalse($filter->keep(new Partition('bar', '100'))); } @@ -40,7 +68,8 @@ public function test_filtering_with_multiple_partitions_and_condition() : void ref('foo')->greaterThanEqual(lit(100)), ref('bar')->greaterThanEqual(lit(100)) ), - new NativeEntryFactory() + new NativeEntryFactory(), + new AutoCaster(Caster::default()) ); self::assertTrue($filter->keep(new Partition('foo', '100'), new Partition('bar', '100'))); @@ -56,7 +85,8 @@ public function test_filtering_with_multiple_partitions_or_condition() : void ref('foo')->greaterThanEqual(lit(100)), ref('bar')->greaterThanEqual(lit(100)) ), - new NativeEntryFactory() + new NativeEntryFactory(), + new AutoCaster(Caster::default()) ); self::assertTrue($filter->keep(new Partition('foo', '100'), new Partition('bar', '100')));