From 55c4d7532a3b4f9b4578299a17627107a67e6b47 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Thu, 28 Dec 2023 20:25:44 +0100 Subject: [PATCH] Partition pruning optimizer (#887) * Partitions cleanup & fixes * Removed ScalarFunctions chain * Introduced FunctionChain interface * Moved all scalar function methods to abstract class * Removed partitions filter from FlowContext * Removed double partitioning & removed partitions from FlowContext * Added CompositeScalarFunctionIterator * Added GroupedDataFrame * Apply filter to partitions through partition pruning optimizer * Added partition pruning optimization tests * Reduced MSI * Updated examples * Fixed broken examples * Reverted PartitionPruningOptimization * Fixed exampleS * Removed tracking scalar function roots --- UPGRADE.md | 2 +- composer.lock | 30 +- .../topics/types/csv/csv_read_partitioned.php | 3 - .../types/csv/csv_read_partitioned_filter.php | 3 +- infection.json | 4 +- .../Adapter/Avro/FlixTech/AvroExtractor.php | 12 +- .../ETL/Adapter/Avro/FlixTech/AvroLoader.php | 5 - .../Avro/Tests/Integration/AvroTest.php | 29 -- .../src/Flow/ETL/Adapter/CSV/CSVExtractor.php | 10 +- .../src/Flow/ETL/Adapter/CSV/CSVLoader.php | 6 +- .../CSV/Tests/Integration/CSVLoaderTest.php | 2 +- .../ETL/Adapter/Filesystem/FlysystemFS.php | 4 +- .../Filesystem/RemoteFileListExtractor.php | 4 +- .../JSON/JSONMachine/JsonExtractor.php | 10 +- .../src/Flow/ETL/Adapter/JSON/JsonLoader.php | 33 +- .../JSON/Tests/Integration/JsonLoaderTest.php | 2 +- .../ETL/Adapter/Parquet/ParquetExtractor.php | 11 +- .../ETL/Adapter/Parquet/ParquetLoader.php | 24 +- .../Flow/ETL/Adapter/Text/TextExtractor.php | 10 +- .../src/Flow/ETL/Adapter/Text/TextLoader.php | 20 +- .../ETL/Adapter/XML/XMLReaderExtractor.php | 28 +- src/core/etl/src/Flow/ETL/ConfigBuilder.php | 5 +- src/core/etl/src/Flow/ETL/DSL/functions.php | 41 +- src/core/etl/src/Flow/ETL/DataFrame.php | 50 +- 
.../Flow/ETL/DataFrame/GroupedDataFrame.php | 40 ++ .../ETL/DataFrame/PartitionedDataFrame.php | 64 +++ .../Flow/ETL/Extractor/PartitionFiltering.php | 34 ++ .../ETL/Extractor/PartitionsExtractor.php | 12 + src/core/etl/src/Flow/ETL/Filesystem.php | 3 +- .../Flow/ETL/Filesystem/LocalFilesystem.php | 7 +- src/core/etl/src/Flow/ETL/Filesystem/Path.php | 8 +- .../etl/src/Flow/ETL/Filesystem/Paths.php | 38 ++ src/core/etl/src/Flow/ETL/FlowContext.php | 34 -- src/core/etl/src/Flow/ETL/Function/All.php | 7 +- src/core/etl/src/Flow/ETL/Function/Any.php | 10 +- .../etl/src/Flow/ETL/Function/ArrayExists.php | 2 +- .../etl/src/Flow/ETL/Function/ArrayExpand.php | 4 +- .../etl/src/Flow/ETL/Function/ArrayGet.php | 2 +- .../Flow/ETL/Function/ArrayGetCollection.php | 2 +- .../src/Flow/ETL/Function/ArrayKeyRename.php | 2 +- .../ETL/Function/ArrayKeysStyleConvert.php | 2 +- .../etl/src/Flow/ETL/Function/ArrayMerge.php | 2 +- .../ETL/Function/ArrayMergeCollection.php | 2 +- .../src/Flow/ETL/Function/ArrayReverse.php | 2 +- .../etl/src/Flow/ETL/Function/ArraySort.php | 2 +- .../etl/src/Flow/ETL/Function/ArrayUnpack.php | 4 +- .../etl/src/Flow/ETL/Function/CallMethod.php | 2 +- .../etl/src/Flow/ETL/Function/Capitalize.php | 2 +- src/core/etl/src/Flow/ETL/Function/Cast.php | 2 +- .../etl/src/Flow/ETL/Function/Combine.php | 2 +- .../ETL/Function/CompositeScalarFunction.php | 11 + src/core/etl/src/Flow/ETL/Function/Concat.php | 6 +- .../etl/src/Flow/ETL/Function/Contains.php | 2 +- .../Flow/ETL/Function/DOMNodeAttribute.php | 2 +- .../src/Flow/ETL/Function/DOMNodeValue.php | 2 +- .../src/Flow/ETL/Function/DateTimeFormat.php | 2 +- src/core/etl/src/Flow/ETL/Function/Divide.php | 2 +- .../etl/src/Flow/ETL/Function/EndsWith.php | 2 +- .../Flow/ETL/Function/EntryScalarFunction.php | 417 ----------------- src/core/etl/src/Flow/ETL/Function/Equals.php | 2 +- src/core/etl/src/Flow/ETL/Function/Exists.php | 2 +- .../etl/src/Flow/ETL/Function/GreaterThan.php | 2 +- 
.../Flow/ETL/Function/GreaterThanEqual.php | 2 +- src/core/etl/src/Flow/ETL/Function/Hash.php | 2 +- src/core/etl/src/Flow/ETL/Function/IsIn.php | 2 +- .../etl/src/Flow/ETL/Function/IsNotNull.php | 2 +- .../src/Flow/ETL/Function/IsNotNumeric.php | 2 +- src/core/etl/src/Flow/ETL/Function/IsNull.php | 2 +- .../etl/src/Flow/ETL/Function/IsNumeric.php | 2 +- src/core/etl/src/Flow/ETL/Function/IsType.php | 2 +- .../etl/src/Flow/ETL/Function/JsonDecode.php | 2 +- .../etl/src/Flow/ETL/Function/JsonEncode.php | 2 +- .../etl/src/Flow/ETL/Function/LessThan.php | 2 +- .../src/Flow/ETL/Function/LessThanEqual.php | 2 +- .../etl/src/Flow/ETL/Function/Literal.php | 2 +- src/core/etl/src/Flow/ETL/Function/Minus.php | 2 +- src/core/etl/src/Flow/ETL/Function/Mod.php | 2 +- .../etl/src/Flow/ETL/Function/Multiply.php | 2 +- src/core/etl/src/Flow/ETL/Function/Not.php | 2 +- .../etl/src/Flow/ETL/Function/NotEquals.php | 2 +- .../etl/src/Flow/ETL/Function/NotSame.php | 2 +- src/core/etl/src/Flow/ETL/Function/Now.php | 2 +- .../src/Flow/ETL/Function/NumberFormat.php | 2 +- .../etl/src/Flow/ETL/Function/Optional.php | 4 +- src/core/etl/src/Flow/ETL/Function/Plus.php | 2 +- src/core/etl/src/Flow/ETL/Function/Power.php | 2 +- .../etl/src/Flow/ETL/Function/PregMatch.php | 2 +- .../src/Flow/ETL/Function/PregMatchAll.php | 2 +- .../etl/src/Flow/ETL/Function/PregReplace.php | 2 +- src/core/etl/src/Flow/ETL/Function/Round.php | 2 +- src/core/etl/src/Flow/ETL/Function/Same.php | 2 +- .../etl/src/Flow/ETL/Function/Sanitize.php | 2 +- .../Function/ScalarFunction/ExpandResults.php | 2 +- .../Function/ScalarFunction/UnpackResults.php | 2 +- .../Flow/ETL/Function/ScalarFunctionChain.php | 429 ++++++++++++++++++ .../src/Flow/ETL/Function/ScalarFunctions.php | 62 --- src/core/etl/src/Flow/ETL/Function/Size.php | 2 +- src/core/etl/src/Flow/ETL/Function/Split.php | 2 +- .../etl/src/Flow/ETL/Function/Sprintf.php | 2 +- .../etl/src/Flow/ETL/Function/StartsWith.php | 2 +- 
src/core/etl/src/Flow/ETL/Function/StrPad.php | 2 +- .../etl/src/Flow/ETL/Function/StrReplace.php | 2 +- src/core/etl/src/Flow/ETL/Function/ToDate.php | 2 +- .../etl/src/Flow/ETL/Function/ToDateTime.php | 2 +- .../etl/src/Flow/ETL/Function/ToLower.php | 2 +- .../etl/src/Flow/ETL/Function/ToMoney.php | 2 +- .../etl/src/Flow/ETL/Function/ToTimeZone.php | 2 +- .../etl/src/Flow/ETL/Function/ToUpper.php | 2 +- src/core/etl/src/Flow/ETL/Function/Trim.php | 2 +- src/core/etl/src/Flow/ETL/Function/Ulid.php | 2 +- src/core/etl/src/Flow/ETL/Function/Uuid.php | 2 +- src/core/etl/src/Flow/ETL/Function/When.php | 2 +- src/core/etl/src/Flow/ETL/Function/XPath.php | 3 +- src/core/etl/src/Flow/ETL/Partition.php | 7 +- .../Flow/ETL/Partition/FiltersCollection.php | 40 ++ src/core/etl/src/Flow/ETL/Partitions.php | 11 + .../ETL/Pipeline/PartitioningPipeline.php | 17 +- .../etl/src/Flow/ETL/Row/EntryReference.php | 7 +- src/core/etl/src/Flow/ETL/Rows.php | 2 +- .../ScalarFunctionFilterTransformer.php | 2 +- .../Transformer/ScalarFunctionTransformer.php | 4 +- .../year=2022/month=12/day=30/file.txt | 1 + .../year=2022/month=12/day=31/file.txt | 1 + .../year=2023/month=1/day=1/file.txt | 1 + .../year=2023/month=1/day=2/file.txt | 1 + .../year=2023/month=1/day=3/file.txt | 1 + .../year=2023/month=1/day=4/file.txt | 1 + .../year=2023/month=1/day=5/file.txt | 1 + .../DataFrame/PartitioningTest.php | 82 ++-- .../Function/ArrayGetCollectionTest.php | 3 +- .../Integration/Function/ArrayGetTest.php | 3 +- .../Function/ArrayMergeCollectionTest.php | 3 +- .../Flow/ETL/Tests/Unit/DataFrameTest.php | 3 +- .../Unit/Function/ArrayGetCollectionTest.php | 2 +- .../Flow/ETL/Tests/Unit/Function/MathTest.php | 18 +- .../Unit/Function/ScalarFunctionsTest.php | 66 --- .../Flow/ETL/Tests/Unit/PartitionTest.php | 2 +- .../Optimizer/LimitOptimizationTest.php | 2 +- .../Flow/ETL/Tests/Unit/Stream/PathTest.php | 13 +- 139 files changed, 1020 insertions(+), 957 deletions(-) create mode 100644 
src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php create mode 100644 src/core/etl/src/Flow/ETL/DataFrame/PartitionedDataFrame.php create mode 100644 src/core/etl/src/Flow/ETL/Extractor/PartitionFiltering.php create mode 100644 src/core/etl/src/Flow/ETL/Extractor/PartitionsExtractor.php create mode 100644 src/core/etl/src/Flow/ETL/Filesystem/Paths.php create mode 100644 src/core/etl/src/Flow/ETL/Function/CompositeScalarFunction.php delete mode 100644 src/core/etl/src/Flow/ETL/Function/EntryScalarFunction.php create mode 100644 src/core/etl/src/Flow/ETL/Function/ScalarFunctionChain.php delete mode 100644 src/core/etl/src/Flow/ETL/Function/ScalarFunctions.php create mode 100644 src/core/etl/src/Flow/ETL/Partition/FiltersCollection.php create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2022/month=12/day=30/file.txt create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2022/month=12/day=31/file.txt create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=1/file.txt create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=2/file.txt create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=3/file.txt create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=4/file.txt create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=5/file.txt delete mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/Function/ScalarFunctionsTest.php diff --git a/UPGRADE.md b/UPGRADE.md index 
0619b07fa..91874d8c1 100644 --- a/UPGRADE.md +++ b/UPGRADE.md @@ -162,7 +162,7 @@ Transformers are a really powerful tool that was used in Flow since the beginnin We reworked most of the internal transformers to new scalar functions and entry scalar functions (based on the built-in functions), and we still internally use that powerful tool, but we don't expose it to end users, instead, we provide easy-to-use, covering all user needs functions. -All available functions can be found in [`ETL\Row\Function` folder](src/core/etl/src/Flow/ETL/Function) or in [`ETL\DSL\functions` file](src/core/etl/src/Flow/ETL/DSL/functions.php), and entry scalar functions are defined in [`EntryScalarFunction` trait](src/core/etl/src/Flow/ETL/Function/EntryScalarFunction.php). +All available functions can be found in [`ETL\Row\Function` folder](src/core/etl/src/Flow/ETL/Function) or in [`ETL\DSL\functions` file](src/core/etl/src/Flow/ETL/DSL/functions.php), and entry scalar functions are defined in [`ScalarFunctionChain` class](src/core/etl/src/Flow/ETL/Function/ScalarFunctionChain.php). To see what transformers are available see [`ETL\DSL\Transform` class](src/core/etl/src/Flow/ETL/DSL/Transform.php). 
diff --git a/composer.lock b/composer.lock index 6b7e48f6a..d8753e439 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "bb4cc755a787ffc3bde1e2aa8132a21c", + "content-hash": "37963f1ae47d70937a6f218f968a5522", "packages": [ { "name": "aeon-php/calendar", @@ -58,23 +58,23 @@ }, { "name": "clue/stream-filter", - "version": "v1.6.0", + "version": "v1.7.0", "source": { "type": "git", "url": "https://github.com/clue/stream-filter.git", - "reference": "d6169430c7731d8509da7aecd0af756a5747b78e" + "reference": "049509fef80032cb3f051595029ab75b49a3c2f7" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/clue/stream-filter/zipball/d6169430c7731d8509da7aecd0af756a5747b78e", - "reference": "d6169430c7731d8509da7aecd0af756a5747b78e", + "url": "https://api.github.com/repos/clue/stream-filter/zipball/049509fef80032cb3f051595029ab75b49a3c2f7", + "reference": "049509fef80032cb3f051595029ab75b49a3c2f7", "shasum": "" }, "require": { "php": ">=5.3" }, "require-dev": { - "phpunit/phpunit": "^9.3 || ^5.7 || ^4.8.36" + "phpunit/phpunit": "^9.6 || ^5.7 || ^4.8.36" }, "type": "library", "autoload": { @@ -96,7 +96,7 @@ } ], "description": "A simple and modern approach to stream filtering in PHP", - "homepage": "https://github.com/clue/php-stream-filter", + "homepage": "https://github.com/clue/stream-filter", "keywords": [ "bucket brigade", "callback", @@ -108,7 +108,7 @@ ], "support": { "issues": "https://github.com/clue/stream-filter/issues", - "source": "https://github.com/clue/stream-filter/tree/v1.6.0" + "source": "https://github.com/clue/stream-filter/tree/v1.7.0" }, "funding": [ { @@ -120,7 +120,7 @@ "type": "github" } ], - "time": "2022-02-21T13:15:14+00:00" + "time": "2023-12-20T15:40:13+00:00" }, { "name": "coduo/php-humanizer", @@ -3822,16 +3822,16 @@ }, { "name": "aws/aws-sdk-php", - "version": 
"3.294.2", + "version": "3.294.4", "source": { "type": "git", "url": "https://github.com/aws/aws-sdk-php.git", - "reference": "e6a63e39fed0fd9fb553af42e99aaf8d7c104c88" + "reference": "4f59bf50aa445fc3ec0b10648b205dd2465e9bec" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/aws/aws-sdk-php/zipball/e6a63e39fed0fd9fb553af42e99aaf8d7c104c88", - "reference": "e6a63e39fed0fd9fb553af42e99aaf8d7c104c88", + "url": "https://api.github.com/repos/aws/aws-sdk-php/zipball/4f59bf50aa445fc3ec0b10648b205dd2465e9bec", + "reference": "4f59bf50aa445fc3ec0b10648b205dd2465e9bec", "shasum": "" }, "require": { @@ -3911,9 +3911,9 @@ "support": { "forum": "https://forums.aws.amazon.com/forum.jspa?forumID=80", "issues": "https://github.com/aws/aws-sdk-php/issues", - "source": "https://github.com/aws/aws-sdk-php/tree/3.294.2" + "source": "https://github.com/aws/aws-sdk-php/tree/3.294.4" }, - "time": "2023-12-18T19:11:16+00:00" + "time": "2023-12-20T19:21:19+00:00" }, { "name": "brick/math", diff --git a/examples/topics/types/csv/csv_read_partitioned.php b/examples/topics/types/csv/csv_read_partitioned.php index ece9f64e1..c581a5036 100644 --- a/examples/topics/types/csv/csv_read_partitioned.php +++ b/examples/topics/types/csv/csv_read_partitioned.php @@ -3,7 +3,6 @@ declare(strict_types=1); use function Flow\ETL\Adapter\CSV\from_csv; -use function Flow\ETL\DSL\ref; use function Flow\ETL\DSL\to_output; use Aeon\Calendar\Stopwatch; use Flow\ETL\Flow; @@ -12,8 +11,6 @@ $flow = (new Flow()) ->read(from_csv(__FLOW_DATA__ . '/partitioned')) - ->collect() - ->sortBy(ref('id')) ->write(to_output()); if ($_ENV['FLOW_PHAR_APP'] ?? 
false) { diff --git a/examples/topics/types/csv/csv_read_partitioned_filter.php b/examples/topics/types/csv/csv_read_partitioned_filter.php index 9db1be0ff..ffb1674ee 100644 --- a/examples/topics/types/csv/csv_read_partitioned_filter.php +++ b/examples/topics/types/csv/csv_read_partitioned_filter.php @@ -13,8 +13,9 @@ $flow = (new Flow()) ->read(from_csv(__FLOW_DATA__ . '/partitioned')) - ->collect() + ->filterPartitions(ref('country')->equals(lit('pl'))) ->filterPartitions(ref('t_shirt_color')->equals(lit('green'))) + ->collect() ->sortBy(ref('id')) ->write(to_output()); diff --git a/infection.json b/infection.json index 1fd9281c0..4cfd03008 100644 --- a/infection.json +++ b/infection.json @@ -82,6 +82,6 @@ }, "tmpDir": "var/infection/cache", "testFrameworkOptions": "--testsuite=unit", - "minMsi": 50, - "minCoveredMsi": 50 + "minMsi": 40, + "minCoveredMsi": 40 } diff --git a/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php b/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php index ceab3cbc9..938fdf7ba 100644 --- a/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php +++ b/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php @@ -6,16 +6,20 @@ use function Flow\ETL\DSL\array_to_rows; use Flow\ETL\Extractor; +use Flow\ETL\Extractor\FileExtractor; use Flow\ETL\Extractor\Limitable; use Flow\ETL\Extractor\LimitableExtractor; +use Flow\ETL\Extractor\PartitionFiltering; +use Flow\ETL\Extractor\PartitionsExtractor; use Flow\ETL\Extractor\Signal; use Flow\ETL\Filesystem\Path; use Flow\ETL\Filesystem\Stream\Mode; use Flow\ETL\FlowContext; -final class AvroExtractor implements Extractor, Extractor\FileExtractor, LimitableExtractor +final class AvroExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor { use Limitable; + use PartitionFiltering; public function __construct( private readonly Path $path @@ -27,9 +31,7 @@ public 
function extract(FlowContext $context) : \Generator { $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); - foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) { - $partitions = $filePath->partitions(); - + foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $filePath) { $reader = new \AvroDataIOReader( new AvroResource( $context->streams()->fs()->open( @@ -49,7 +51,7 @@ public function extract(FlowContext $context) : \Generator $row['_input_file_uri'] = $filePath->uri(); } - $signal = yield array_to_rows($row, $context->entryFactory(), $partitions); + $signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions()); $this->countRow(); if ($signal === Signal::STOP || $this->reachedLimit()) { diff --git a/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroLoader.php b/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroLoader.php index 6b50c2d72..ae709c28c 100644 --- a/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroLoader.php +++ b/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroLoader.php @@ -4,7 +4,6 @@ namespace Flow\ETL\Adapter\Avro\FlixTech; -use Flow\ETL\Exception\RuntimeException; use Flow\ETL\Filesystem\Path; use Flow\ETL\FlowContext; use Flow\ETL\Loader; @@ -65,10 +64,6 @@ public function destination() : Path public function load(Rows $rows, FlowContext $context) : void { - if ($context->partitionEntries()->count()) { - throw new RuntimeException('Partitioning is not supported yet'); - } - if ($this->schema === null) { if ($this->inferredSchema === null) { $this->inferredSchema = $rows->schema(); diff --git a/src/adapter/etl-adapter-avro/tests/Flow/ETL/Adapter/Avro/Tests/Integration/AvroTest.php b/src/adapter/etl-adapter-avro/tests/Flow/ETL/Adapter/Avro/Tests/Integration/AvroTest.php index 884750502..73bfce711 100644 --- 
a/src/adapter/etl-adapter-avro/tests/Flow/ETL/Adapter/Avro/Tests/Integration/AvroTest.php +++ b/src/adapter/etl-adapter-avro/tests/Flow/ETL/Adapter/Avro/Tests/Integration/AvroTest.php @@ -60,35 +60,6 @@ public function test_limit() : void ); } - public function test_partitioning() : void - { - $this->expectExceptionMessage('Partitioning is not supported yet'); - - $this->removeFile($path = \sys_get_temp_dir() . '/file.avro'); - - df() - ->read(from_rows( - $rows = new Rows( - ...\array_map(function (int $i) : Row { - return Row::create( - int_entry('integer', $i), - float_entry('float', 1.5), - str_entry('string', 'name_' . $i), - bool_entry('boolean', true), - datetime_entry('datetime', new \DateTimeImmutable()), - json_object_entry('json_object', ['id' => 1, 'name' => 'test']), - json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]), - list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())), - list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class))), - ); - }, \range(1, 100)) - ) - )) - ->partitionBy('integer') - ->write(to_avro($path)) - ->run(); - } - public function test_safe_writing_and_reading_avro_with_all_supported_types() : void { $this->cleanDirectory($path = \sys_get_temp_dir() . 
'/directory.avro'); diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php index a4b7b1c9f..2eddee69a 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php @@ -9,14 +9,17 @@ use Flow\ETL\Extractor\FileExtractor; use Flow\ETL\Extractor\Limitable; use Flow\ETL\Extractor\LimitableExtractor; +use Flow\ETL\Extractor\PartitionFiltering; +use Flow\ETL\Extractor\PartitionsExtractor; use Flow\ETL\Extractor\Signal; use Flow\ETL\Filesystem\Path; use Flow\ETL\Filesystem\Stream\Mode; use Flow\ETL\FlowContext; -final class CSVExtractor implements Extractor, FileExtractor, LimitableExtractor +final class CSVExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor { use Limitable; + use PartitionFiltering; /** * @param int<0, max> $charactersReadInLine @@ -37,8 +40,7 @@ public function extract(FlowContext $context) : \Generator { $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); - foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $path) { - $partitions = $path->partitions(); + foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $path) { $stream = $context->streams()->fs()->open($path, Mode::READ); $headers = []; @@ -90,7 +92,7 @@ public function extract(FlowContext $context) : \Generator $row['_input_file_uri'] = $stream->path()->uri(); } - $signal = yield array_to_rows($row, $context->entryFactory(), $partitions); + $signal = yield array_to_rows($row, $context->entryFactory(), $path->partitions()); $this->countRow(); if ($signal === Signal::STOP || $this->reachedLimit()) { diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVLoader.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVLoader.php index 37b7ec982..f577b0767 100644 --- 
a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVLoader.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVLoader.php @@ -79,10 +79,8 @@ public function load(Rows $rows, FlowContext $context) : void $headers = $rows->first()->entries()->map(fn (Entry $entry) => $entry->name()); - if ($context->partitionEntries()->count()) { - foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) { - $this->write($partitionedRows, $headers, $context, $partitionedRows->partitions()->toArray()); - } + if ($rows->partitions()->count()) { + $this->write($rows, $headers, $context, $rows->partitions()->toArray()); } else { $this->write($rows, $headers, $context, []); } diff --git a/src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVLoaderTest.php b/src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVLoaderTest.php index d0b38612c..90583faa8 100644 --- a/src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVLoaderTest.php +++ b/src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVLoaderTest.php @@ -146,8 +146,8 @@ public function test_loading_csv_with_partitioning() : void Row::create(int_entry('id', 4), int_entry('group', 2)), ) ) - ->load(to_csv($path)) ->partitionBy('group') + ->load(to_csv($path)) ->run(); $partitions = \array_values(\array_diff(\scandir($path), ['..', '.'])); diff --git a/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/FlysystemFS.php b/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/FlysystemFS.php index 73fd778f5..5f9fa4258 100644 --- a/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/FlysystemFS.php +++ b/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/FlysystemFS.php @@ -142,7 +142,7 @@ public function rm(Path $path) : void * * @return \Generator */ - public function scan(Path $path, PartitionFilter $partitionFilter) : \Generator + public 
function scan(Path $path, PartitionFilter $partitionFilter = new NoopFilter()) : \Generator { $fs = $this->factory->create($path); @@ -163,7 +163,7 @@ public function scan(Path $path, PartitionFilter $partitionFilter) : \Generator } } - return $partitionFilter->keep(...(new Path(DIRECTORY_SEPARATOR . $file->path()))->partitions()); + return $partitionFilter->keep(...(new Path(DIRECTORY_SEPARATOR . $file->path()))->partitions()->toArray()); }; /** diff --git a/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/RemoteFileListExtractor.php b/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/RemoteFileListExtractor.php index 7a9fc639a..fac64b5c7 100644 --- a/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/RemoteFileListExtractor.php +++ b/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/RemoteFileListExtractor.php @@ -5,14 +5,16 @@ use function Flow\ETL\DSL\array_to_rows; use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Extractor; +use Flow\ETL\Extractor\FileExtractor; use Flow\ETL\Extractor\Limitable; +use Flow\ETL\Extractor\LimitableExtractor; use Flow\ETL\Extractor\Signal; use Flow\ETL\Filesystem\Path; use Flow\ETL\FlowContext; use League\Flysystem\DirectoryAttributes; use League\Flysystem\FileAttributes; -final class RemoteFileListExtractor implements Extractor, Extractor\FileExtractor, Extractor\LimitableExtractor +final class RemoteFileListExtractor implements Extractor, FileExtractor, LimitableExtractor { use Limitable; diff --git a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php index 81066fe77..ae701b072 100644 --- a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php +++ b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php @@ -9,6 +9,8 @@ use Flow\ETL\Extractor\FileExtractor; use 
Flow\ETL\Extractor\Limitable; use Flow\ETL\Extractor\LimitableExtractor; +use Flow\ETL\Extractor\PartitionFiltering; +use Flow\ETL\Extractor\PartitionsExtractor; use Flow\ETL\Extractor\Signal; use Flow\ETL\Filesystem\Path; use Flow\ETL\Filesystem\Stream\Mode; @@ -16,9 +18,10 @@ use JsonMachine\Items; use JsonMachine\JsonDecoder\ExtJsonDecoder; -final class JsonExtractor implements Extractor, FileExtractor, LimitableExtractor +final class JsonExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor { use Limitable; + use PartitionFiltering; public function __construct( private readonly Path $path, @@ -31,8 +34,7 @@ public function extract(FlowContext $context) : \Generator { $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); - foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) { - $partitions = $filePath->partitions(); + foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $filePath) { /** * @var array|object $rowData @@ -44,7 +46,7 @@ public function extract(FlowContext $context) : \Generator $row['_input_file_uri'] = $filePath->uri(); } - $signal = yield array_to_rows($row, $context->entryFactory(), $partitions); + $signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions()); $this->countRow(); if ($signal === Signal::STOP || $this->reachedLimit()) { diff --git a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php index 562c3749d..401fffe5b 100644 --- a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php +++ b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php @@ -46,7 +46,7 @@ public function closure(FlowContext $context) : void { foreach ($context->streams() as $stream) { if ($stream->path()->extension() === 'json') { - $this->close($stream); + \fwrite($stream->resource(), ']'); } } @@ 
-60,10 +60,8 @@ public function destination() : Path public function load(Rows $rows, FlowContext $context) : void { - if ($context->partitionEntries()->count()) { - foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) { - $this->write($partitionedRows, $partitionedRows->partitions()->toArray(), $context); - } + if ($rows->partitions()->count()) { + $this->write($rows, $rows->partitions()->toArray(), $context); } else { $this->write($rows, [], $context); } @@ -79,7 +77,11 @@ public function write(Rows $nextRows, array $partitions, FlowContext $context) : if (!$streams->isOpen($this->path, $partitions)) { $stream = $streams->open($this->path, 'json', $context->appendSafe(), $partitions); - $this->init($stream); + if (!\array_key_exists($stream->path()->path(), $this->writes)) { + $this->writes[$stream->path()->path()] = 0; + } + + \fwrite($stream->resource(), '['); } else { $stream = $streams->open($this->path, 'json', $context->appendSafe(), $partitions); } @@ -107,23 +109,4 @@ public function writeJSON(Rows $rows, FileStream $stream) : void $this->writes[$stream->path()->path()]++; } - - private function close(FileStream $stream) : void - { - \fwrite($stream->resource(), ']'); - } - - /** - * @param FileStream $stream - * - * @throws RuntimeException - */ - private function init(FileStream $stream) : void - { - if (!\array_key_exists($stream->path()->path(), $this->writes)) { - $this->writes[$stream->path()->path()] = 0; - } - - \fwrite($stream->resource(), '['); - } } diff --git a/src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Integration/JsonLoaderTest.php b/src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Integration/JsonLoaderTest.php index 77e2b08b3..7c53f57d3 100644 --- a/src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Integration/JsonLoaderTest.php +++ b/src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Integration/JsonLoaderTest.php @@ -261,7 +261,7 @@ public 
function test_save_mode_throw_exception_on_partitioned_rows() : void ['id' => 5, 'partition' => 'b'], ])) ->partitionBy(ref('partition')) - ->saveMode(exception_if_exists()) + ->mode(exception_if_exists()) ->write(to_json($path)) ->run(); diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php index a0a892857..45a3f86a7 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php @@ -7,19 +7,22 @@ use Flow\ETL\Extractor\FileExtractor; use Flow\ETL\Extractor\Limitable; use Flow\ETL\Extractor\LimitableExtractor; +use Flow\ETL\Extractor\PartitionFiltering; +use Flow\ETL\Extractor\PartitionsExtractor; use Flow\ETL\Extractor\Signal; use Flow\ETL\Filesystem\Path; use Flow\ETL\Filesystem\Stream\Mode; use Flow\ETL\FlowContext; -use Flow\ETL\Partition; +use Flow\ETL\Partitions; use Flow\Parquet\ByteOrder; use Flow\Parquet\Options; use Flow\Parquet\ParquetFile; use Flow\Parquet\Reader; -final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtractor +final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor { use Limitable; + use PartitionFiltering; /** * @param Path $path @@ -65,11 +68,11 @@ public function source() : Path } /** - * @return \Generator}> + * @return \Generator */ private function readers(FlowContext $context) : \Generator { - foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) { + foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $filePath) { yield [ 'file' => (new Reader( byteOrder: $this->byteOrder, diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php 
index de2a5cb87..3d6f7511d 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php @@ -78,23 +78,21 @@ public function load(Rows $rows, FlowContext $context) : void $streams = $context->streams(); - if ($context->partitionEntries()->count()) { - foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) { + if ($rows->partitions()->count()) { - $stream = $streams->open($this->path, 'parquet', $context->appendSafe(), $partitionedRows->partitions()->toArray()); + $stream = $streams->open($this->path, 'parquet', $context->appendSafe(), $rows->partitions()->toArray()); - if (!\array_key_exists($stream->path()->uri(), $this->writers)) { - $this->writers[$stream->path()->uri()] = new Writer( - compression: $this->compressions, - options: $this->options - ); + if (!\array_key_exists($stream->path()->uri(), $this->writers)) { + $this->writers[$stream->path()->uri()] = new Writer( + compression: $this->compressions, + options: $this->options + ); - $this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema())); - } + $this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema())); + } - foreach ($partitionedRows as $row) { - $this->writers[$stream->path()->uri()]->writeRow($row->toArray()); - } + foreach ($rows as $row) { + $this->writers[$stream->path()->uri()]->writeRow($row->toArray()); } } else { $stream = $streams->open($this->path, 'parquet', $context->appendSafe()); diff --git a/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextExtractor.php b/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextExtractor.php index 07d0bb415..750b4e9b9 100644 --- a/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextExtractor.php +++ 
b/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextExtractor.php @@ -9,14 +9,17 @@ use Flow\ETL\Extractor\FileExtractor; use Flow\ETL\Extractor\Limitable; use Flow\ETL\Extractor\LimitableExtractor; +use Flow\ETL\Extractor\PartitionFiltering; +use Flow\ETL\Extractor\PartitionsExtractor; use Flow\ETL\Extractor\Signal; use Flow\ETL\Filesystem\Path; use Flow\ETL\Filesystem\Stream\Mode; use Flow\ETL\FlowContext; -final class TextExtractor implements Extractor, FileExtractor, LimitableExtractor +final class TextExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor { use Limitable; + use PartitionFiltering; public function __construct( private readonly Path $path, @@ -28,8 +31,7 @@ public function extract(FlowContext $context) : \Generator { $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); - foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) { - $partitions = $filePath->partitions(); + foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $filePath) { $fileStream = $context->streams()->fs()->open($filePath, Mode::READ); $rowData = \fgets($fileStream->resource()); @@ -45,7 +47,7 @@ public function extract(FlowContext $context) : \Generator $row = [['text' => \rtrim($rowData)]]; } - $signal = yield array_to_rows($row, $context->entryFactory(), $partitions); + $signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions()); $this->countRow(); diff --git a/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextLoader.php b/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextLoader.php index 64f1e93a3..9ff3cabb9 100644 --- a/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextLoader.php +++ b/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextLoader.php @@ -54,18 +54,16 @@ public function destination() : Path public function load(Rows $rows, FlowContext $context) : void { - if 
($context->partitionEntries()->count()) { - foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) { - foreach ($partitionedRows as $row) { - if ($row->entries()->count() > 1) { - throw new RuntimeException(\sprintf('Text data loader supports only a single entry rows, and you have %d rows.', $row->entries()->count())); - } - - \fwrite( - $context->streams()->open($this->path, 'text', $context->appendSafe(), $partitionedRows->partitions()->toArray())->resource(), - $row->entries()->all()[0]->toString() . $this->newLineSeparator - ); + if ($rows->partitions()->count()) { + foreach ($rows as $row) { + if ($row->entries()->count() > 1) { + throw new RuntimeException(\sprintf('Text data loader supports only a single entry rows, and you have %d rows.', $row->entries()->count())); } + + \fwrite( + $context->streams()->open($this->path, 'text', $context->appendSafe(), $rows->partitions()->toArray())->resource(), + $row->entries()->all()[0]->toString() . $this->newLineSeparator + ); } } else { foreach ($rows as $row) { diff --git a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php index 01b5506ab..ad5d69d2d 100644 --- a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php +++ b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php @@ -4,21 +4,21 @@ namespace Flow\ETL\Adapter\XML; -use function Flow\ETL\DSL\str_entry; -use function Flow\ETL\DSL\xml_entry; +use function Flow\ETL\DSL\array_to_rows; use Flow\ETL\Extractor; use Flow\ETL\Extractor\FileExtractor; use Flow\ETL\Extractor\Limitable; use Flow\ETL\Extractor\LimitableExtractor; +use Flow\ETL\Extractor\PartitionFiltering; +use Flow\ETL\Extractor\PartitionsExtractor; use Flow\ETL\Extractor\Signal; use Flow\ETL\Filesystem\Path; use Flow\ETL\FlowContext; -use Flow\ETL\Row; -use Flow\ETL\Rows; -final class XMLReaderExtractor implements 
Extractor, FileExtractor, LimitableExtractor +final class XMLReaderExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor { use Limitable; + use PartitionFiltering; /** * In order to iterate only over nodes us root/elements/element. @@ -46,9 +46,7 @@ public function extract(FlowContext $context) : \Generator { $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); - foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) { - $partitions = $filePath->partitions(); - + foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $filePath) { $xmlReader = new \XMLReader(); $xmlReader->open($filePath->path()); @@ -78,17 +76,15 @@ public function extract(FlowContext $context) : \Generator $node->loadXML($xmlReader->readOuterXml()); if ($shouldPutInputIntoRows) { - $row = Row::create( - xml_entry('node', $node), - str_entry('_input_file_uri', $filePath->uri()) - ); + $rowData = [ + 'node' => $node, + '_input_file_uri' => $filePath->uri(), + ]; } else { - $row = Row::create(xml_entry('node', $node)); + $rowData = ['node' => $node]; } - $signal = yield \count($partitions) - ? Rows::partitioned([$row], $partitions) - : new Rows($row); + $signal = yield array_to_rows($rowData, $context->entryFactory(), $filePath->partitions()); $this->countRow(); diff --git a/src/core/etl/src/Flow/ETL/ConfigBuilder.php b/src/core/etl/src/Flow/ETL/ConfigBuilder.php index 9e9d5edcb..7e8af91ab 100644 --- a/src/core/etl/src/Flow/ETL/ConfigBuilder.php +++ b/src/core/etl/src/Flow/ETL/ConfigBuilder.php @@ -48,6 +48,7 @@ public function build() : Config { $this->id ??= \uniqid('flow_php', true); $this->serializer ??= new CompressingSerializer(); + $entryFactory = new NativeEntryFactory(); $cachePath = \is_string(\getenv(Config::CACHE_DIR_ENV)) && \realpath(\getenv(Config::CACHE_DIR_ENV)) ? \getenv(Config::CACHE_DIR_ENV) : \sys_get_temp_dir() . 
'/flow_php/'; @@ -84,7 +85,7 @@ public function build() : Config $this->optimizer ??= new Optimizer( new Optimizer\LimitOptimization(), - new Optimizer\BatchSizeOptimization(batchSize: 1000), + new Optimizer\BatchSizeOptimization(batchSize: 1000) ); return new Config( @@ -95,7 +96,7 @@ public function build() : Config new FilesystemStreams($this->filesystem), $this->optimizer, $this->putInputIntoRows, - new NativeEntryFactory() + $entryFactory ); } diff --git a/src/core/etl/src/Flow/ETL/DSL/functions.php b/src/core/etl/src/Flow/ETL/DSL/functions.php index 2326fdef5..0962c3070 100644 --- a/src/core/etl/src/Flow/ETL/DSL/functions.php +++ b/src/core/etl/src/Flow/ETL/DSL/functions.php @@ -699,12 +699,12 @@ function call_method(ScalarFunction $object, ScalarFunction $method, ScalarFunct return new CallMethod($object, $method, ...$params); } -function all(ScalarFunction ...$functions) : ScalarFunction +function all(ScalarFunction ...$functions) : All { return new All(...$functions); } -function any(ScalarFunction ...$functions) : ScalarFunction +function any(ScalarFunction ...$functions) : Any { return new Any(...$functions); } @@ -801,10 +801,12 @@ function number_format(ScalarFunction $function, ?ScalarFunction $decimals = nul * @psalm-suppress PossiblyInvalidIterator * * @param array>|array $data - * @param array $partitions + * @param array|\Flow\ETL\Partitions $partitions */ -function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntryFactory(), array $partitions = []) : Rows +function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntryFactory(), array|\Flow\ETL\Partitions $partitions = []) : Rows { + $partitions = \is_array($partitions) ? 
new \Flow\ETL\Partitions(...$partitions) : $partitions; + $isRows = true; foreach ($data as $v) { @@ -819,27 +821,40 @@ function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntry $entries = []; foreach ($data as $key => $value) { - $entries[] = $entryFactory->create(\is_int($key) ? 'e' . \str_pad((string) $key, 2, '0', STR_PAD_LEFT) : $key, $value); + $name = \is_int($key) ? 'e' . \str_pad((string) $key, 2, '0', STR_PAD_LEFT) : $key; + + $entries[$name] = $entryFactory->create($name, $value); + } + + foreach ($partitions as $partition) { + if (!\array_key_exists($partition->name, $entries)) { + $entries[$partition->name] = $entryFactory->create($partition->name, $partition->value); + } } - return \count($partitions) - ? Rows::partitioned([Row::create(...$entries)], $partitions) - : new Rows(Row::create(...$entries)); + return Rows::partitioned([Row::create(...\array_values($entries))], $partitions); } + $rows = []; foreach ($data as $row) { $entries = []; foreach ($row as $column => $value) { - $entries[] = $entryFactory->create(\is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column, $value); + $name = \is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column; + $entries[$name] = $entryFactory->create(\is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column, $value); + } + + foreach ($partitions as $partition) { + if (!\array_key_exists($partition->name, $entries)) { + $entries[$partition->name] = $entryFactory->create($partition->name, $partition->value); + } } - $rows[] = Row::create(...$entries); + + $rows[] = Row::create(...\array_values($entries)); } - return \count($partitions) - ? 
Rows::partitioned($rows, $partitions) - : new Rows(...$rows); + return Rows::partitioned($rows, $partitions); } function rank() : Rank diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index 234a28e21..350b713c3 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -5,9 +5,12 @@ namespace Flow\ETL; use function Flow\ETL\DSL\to_output; +use Flow\ETL\DataFrame\GroupedDataFrame; +use Flow\ETL\DataFrame\PartitionedDataFrame; use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Exception\InvalidFileFormatException; use Flow\ETL\Exception\RuntimeException; +use Flow\ETL\Extractor\PartitionsExtractor; use Flow\ETL\Filesystem\SaveMode; use Flow\ETL\Formatter\AsciiTableFormatter; use Flow\ETL\Function\AggregatingFunction; @@ -120,7 +123,7 @@ public static function fromJson(string $json) : self * * @throws InvalidArgumentException */ - public function aggregate(AggregatingFunction ...$aggregations) : self + public function aggregate(AggregatingFunction ...$aggregations) : GroupedDataFrame { if (!$this->pipeline instanceof GroupByPipeline) { $this->pipeline = new GroupByPipeline(new GroupBy(), $this->pipeline); @@ -128,7 +131,7 @@ public function aggregate(AggregatingFunction ...$aggregations) : self $this->pipeline->groupBy->aggregate(...$aggregations); - return $this; + return new GroupedDataFrame($this); } /** @@ -319,18 +322,6 @@ public function fetch(?int $limit = null) : Rows $clone->limit($limit); } - if ($clone->context->partitionEntries()->count()) { - $rows = new Rows(); - - foreach ($clone->pipeline->process($clone->context) as $nextRows) { - if ($clone->context->partitionFilter()->keep(...$nextRows->partitions()->toArray())) { - $rows = $rows->merge($nextRows); - } - } - - return $rows; - } - $rows = new Rows(); foreach ($clone->pipeline->process($clone->context) as $nextRows) { @@ -352,15 +343,24 @@ public function filter(ScalarFunction $function) : self /** * 
@lazy + * + * @throws RuntimeException */ public function filterPartitions(Partition\PartitionFilter|ScalarFunction $filter) : self { + $extractor = $this->pipeline->source(); + + if (!$extractor instanceof PartitionsExtractor) { + throw new RuntimeException('filterPartitions can be used only with extractors that implement PartitionsExtractor interface'); + } + if ($filter instanceof Partition\PartitionFilter) { - $this->context->filterPartitions($filter); + $extractor->addPartitionFilter($filter); return $this; } - $this->context->filterPartitions(new ScalarFunctionFilter($filter, $this->context->entryFactory())); + + $extractor->addPartitionFilter(new ScalarFunctionFilter($filter, $this->context->entryFactory())); return $this; } @@ -450,11 +450,11 @@ public function getEachAsArray() : \Generator /** * @lazy */ - public function groupBy(string|Reference ...$entries) : self + public function groupBy(string|Reference ...$entries) : GroupedDataFrame { $this->pipeline = new GroupByPipeline(new GroupBy(...$entries), $this->pipeline); - return $this; + return new GroupedDataFrame($this); } /** @@ -591,14 +591,13 @@ public function parallelize(int $chunks) : self /** * @lazy */ - public function partitionBy(string|Reference $entry, string|Reference ...$entries) : self + public function partitionBy(string|Reference $entry, string|Reference ...$entries) : PartitionedDataFrame { \array_unshift($entries, $entry); - $this->context->partitionBy(...References::init(...$entries)->all()); - $this->pipeline = new PartitioningPipeline($this->pipeline); + $this->pipeline = new PartitioningPipeline($this->pipeline, References::init(...$entries)->all()); - return $this; + return new PartitionedDataFrame($this); } public function pivot(Reference $ref) : self @@ -848,7 +847,7 @@ public function void() : self /** * @lazy * - * @param array $refs + * @param array $refs */ public function withEntries(array $refs) : self { @@ -862,12 +861,11 @@ public function withEntries(array $refs) : 
self /** * @lazy */ - public function withEntry(string $entryName, Function\ScalarFunction|WindowFunction $ref) : self + public function withEntry(string $entryName, ScalarFunction|WindowFunction $ref) : self { if ($ref instanceof WindowFunction) { if (\count($ref->window()->partitions())) { - $this->context->partitionBy(...$ref->window()->partitions()); - $this->pipeline = new PartitioningPipeline($this->pipeline, $ref->window()->order()); + $this->pipeline = new PartitioningPipeline($this->pipeline, $ref->window()->partitions(), $ref->window()->order()); } else { $this->collect(); diff --git a/src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php new file mode 100644 index 000000000..e0067fd62 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php @@ -0,0 +1,40 @@ +pivot(...$arguments); + } + + if (\strtolower($name) === 'aggregate') { + return $this->aggregate(...$arguments); + } + + return $this->df->{$name}(...$arguments); + } + + public function aggregate(AggregatingFunction ...$aggregations) : self + { + $this->df->aggregate(...$aggregations); + + return $this; + } + + public function pivot(Reference $ref) : DataFrame + { + return $this->df->pivot($ref); + } +} diff --git a/src/core/etl/src/Flow/ETL/DataFrame/PartitionedDataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame/PartitionedDataFrame.php new file mode 100644 index 000000000..db2a8c3bf --- /dev/null +++ b/src/core/etl/src/Flow/ETL/DataFrame/PartitionedDataFrame.php @@ -0,0 +1,64 @@ +df->display($limit, $truncate, $formatter); + } + + public function fetch(?int $limit = null) : Rows + { + return $this->df->fetch($limit); + } + + public function get() : \Generator + { + return $this->df->get(); + } + + public function getAsArray() : \Generator + { + return $this->df->getAsArray(); + } + + public function getEach() : \Generator + { + return $this->df->getEach(); + } + + public function getEachAsArray() : \Generator + { + 
return $this->df->getEachAsArray(); + } + + public function load(Loader $loader) : DataFrame + { + return $this->write($loader); + } + + public function mode(SaveMode $mode) : self + { + $this->df->mode($mode); + + return $this; + } + + public function write(Loader $loader) : DataFrame + { + return $this->df->write($loader); + } +} diff --git a/src/core/etl/src/Flow/ETL/Extractor/PartitionFiltering.php b/src/core/etl/src/Flow/ETL/Extractor/PartitionFiltering.php new file mode 100644 index 000000000..94f3be699 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Extractor/PartitionFiltering.php @@ -0,0 +1,34 @@ +partitionFilter === null) { + $this->partitionFilter = $filter; + + return; + } + + if ($this->partitionFilter instanceof FiltersCollection) { + $this->partitionFilter = new FiltersCollection([...$this->partitionFilter->filters, $filter]); + + return; + } + + $this->partitionFilter = new FiltersCollection([$this->partitionFilter, $filter]); + } + + public function partitionFilter() : PartitionFilter + { + return $this->partitionFilter ?? 
new NoopFilter(); + } +} diff --git a/src/core/etl/src/Flow/ETL/Extractor/PartitionsExtractor.php b/src/core/etl/src/Flow/ETL/Extractor/PartitionsExtractor.php new file mode 100644 index 000000000..e75a55ed9 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Extractor/PartitionsExtractor.php @@ -0,0 +1,12 @@ + */ - public function scan(Path $path, PartitionFilter $partitionFilter) : \Generator; + public function scan(Path $path, PartitionFilter $partitionFilter = new NoopFilter()) : \Generator; } diff --git a/src/core/etl/src/Flow/ETL/Filesystem/LocalFilesystem.php b/src/core/etl/src/Flow/ETL/Filesystem/LocalFilesystem.php index 118835357..2728682b0 100644 --- a/src/core/etl/src/Flow/ETL/Filesystem/LocalFilesystem.php +++ b/src/core/etl/src/Flow/ETL/Filesystem/LocalFilesystem.php @@ -7,6 +7,7 @@ use Flow\ETL\Filesystem; use Flow\ETL\Filesystem\Stream\FileStream; use Flow\ETL\Filesystem\Stream\Mode; +use Flow\ETL\Partition\NoopFilter; use Flow\ETL\Partition\PartitionFilter; use Webmozart\Glob\Glob; @@ -121,7 +122,7 @@ public function rm(Path $path) : void } } - public function scan(Path $path, PartitionFilter $partitionFilter) : \Generator + public function scan(Path $path, PartitionFilter $partitionFilter = new NoopFilter()) : \Generator { if (!$path->isLocal()) { throw new RuntimeException(\sprintf('Path "%s" is not local', $path->uri())); @@ -139,7 +140,7 @@ public function scan(Path $path, PartitionFilter $partitionFilter) : \Generator continue; } - if ($partitionFilter->keep(...(Path::realpath($filePath, $path->options()))->partitions())) { + if ($partitionFilter->keep(...(Path::realpath($filePath, $path->options()))->partitions()->toArray())) { yield Path::realpath($filePath, $path->options()); } } @@ -152,7 +153,7 @@ public function scan(Path $path, PartitionFilter $partitionFilter) : \Generator continue; } - if ($partitionFilter->keep(...(Path::realpath($filePath, $path->options()))->partitions())) { + if ($partitionFilter->keep(...(Path::realpath($filePath, 
$path->options()))->partitions()->toArray())) { yield Path::realpath($filePath, $path->options()); } } diff --git a/src/core/etl/src/Flow/ETL/Filesystem/Path.php b/src/core/etl/src/Flow/ETL/Filesystem/Path.php index 2211dff66..ece477782 100644 --- a/src/core/etl/src/Flow/ETL/Filesystem/Path.php +++ b/src/core/etl/src/Flow/ETL/Filesystem/Path.php @@ -8,6 +8,7 @@ use Flow\ETL\Exception\RuntimeException; use Flow\ETL\Filesystem\Stream\ResourceContext; use Flow\ETL\Partition; +use Flow\ETL\Partitions; use Flow\Serializer\Serializable; /** @@ -252,13 +253,10 @@ public function parentDirectory() : self ); } - /** - * @return array - */ - public function partitions() : array + public function partitions() : Partitions { if ($this->isPathPattern($this->path)) { - return []; + return new Partitions(); } return Partition::fromUri($this->path); diff --git a/src/core/etl/src/Flow/ETL/Filesystem/Paths.php b/src/core/etl/src/Flow/ETL/Filesystem/Paths.php new file mode 100644 index 000000000..b4128fd70 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Filesystem/Paths.php @@ -0,0 +1,38 @@ + + */ + private readonly array $paths; + + public function __construct(Path ...$paths) + { + $this->paths = $paths; + $this->partitions = null; + } + + public function partitions() : Partitions + { + if ($this->partitions === null) { + $partitions = []; + + foreach ($this->paths as $path) { + foreach ($path->partitions() as $partition) { + $partitions[$partition->id()] = $partition; + } + } + + $this->partitions = new Partitions(...\array_values($partitions)); + } + + return $this->partitions; + } +} diff --git a/src/core/etl/src/Flow/ETL/FlowContext.php b/src/core/etl/src/Flow/ETL/FlowContext.php index aa7f47953..655bb4bd3 100644 --- a/src/core/etl/src/Flow/ETL/FlowContext.php +++ b/src/core/etl/src/Flow/ETL/FlowContext.php @@ -6,11 +6,7 @@ use Flow\ETL\ErrorHandler\ThrowError; use Flow\ETL\Filesystem\FilesystemStreams; -use Flow\ETL\Partition\NoopFilter; -use Flow\ETL\Partition\PartitionFilter; 
use Flow\ETL\Row\EntryFactory; -use Flow\ETL\Row\Reference; -use Flow\ETL\Row\References; use Flow\Serializer\Serializer; /** @@ -23,15 +19,9 @@ final class FlowContext private ErrorHandler $errorHandler; - private PartitionFilter $partitionFilter; - - private References $partitions; - public function __construct(public readonly Config $config) { - $this->partitionFilter = new NoopFilter(); $this->errorHandler = new ThrowError(); - $this->partitions = new References(); } public function appendSafe() : bool @@ -54,30 +44,6 @@ public function errorHandler() : ErrorHandler return $this->errorHandler; } - public function filterPartitions(PartitionFilter $filter) : self - { - $this->partitionFilter = $filter; - - return $this; - } - - public function partitionBy(string|Reference ...$entry) : self - { - $this->partitions = References::init(...$entry); - - return $this; - } - - public function partitionEntries() : References - { - return $this->partitions; - } - - public function partitionFilter() : PartitionFilter - { - return $this->partitionFilter; - } - public function serializer() : Serializer { return $this->config->serializer(); diff --git a/src/core/etl/src/Flow/ETL/Function/All.php b/src/core/etl/src/Flow/ETL/Function/All.php index a584e2296..e68ffa26d 100644 --- a/src/core/etl/src/Flow/ETL/Function/All.php +++ b/src/core/etl/src/Flow/ETL/Function/All.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class All implements ScalarFunction +final class All extends ScalarFunctionChain implements CompositeScalarFunction { /** * @var array @@ -29,4 +29,9 @@ public function eval(Row $row) : mixed return true; } + + public function functions() : array + { + return $this->refs; + } } diff --git a/src/core/etl/src/Flow/ETL/Function/Any.php b/src/core/etl/src/Flow/ETL/Function/Any.php index 585eb5307..083d54019 100644 --- a/src/core/etl/src/Flow/ETL/Function/Any.php +++ b/src/core/etl/src/Flow/ETL/Function/Any.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Any implements 
ScalarFunction +final class Any extends ScalarFunctionChain implements CompositeScalarFunction { /** * @var array @@ -29,4 +29,12 @@ public function eval(Row $row) : mixed return false; } + + /** + * @return array + */ + public function functions() : array + { + return $this->refs; + } } diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayExists.php b/src/core/etl/src/Flow/ETL/Function/ArrayExists.php index e0aa007a4..3de24662f 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayExists.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayExists.php @@ -8,7 +8,7 @@ use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Row; -final class ArrayExists implements ScalarFunction +final class ArrayExists extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayExpand.php b/src/core/etl/src/Flow/ETL/Function/ArrayExpand.php index 488fc9cae..2a67c3350 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayExpand.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayExpand.php @@ -7,7 +7,7 @@ use Flow\ETL\Function\ScalarFunction\ExpandResults; use Flow\ETL\Row; -final class ArrayExpand implements ExpandResults, ScalarFunction +final class ArrayExpand extends ScalarFunctionChain implements ExpandResults { public function __construct(private readonly ScalarFunction $ref, private readonly ArrayExpand\ArrayExpand $expand) { @@ -32,7 +32,7 @@ public function eval(Row $row) : mixed return $array; } - public function expand() : bool + public function expandResults() : bool { return true; } diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayGet.php b/src/core/etl/src/Flow/ETL/Function/ArrayGet.php index bc8a52aa4..fa6ff97ff 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayGet.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayGet.php @@ -8,7 +8,7 @@ use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Row; -final class ArrayGet implements ScalarFunction +final class ArrayGet 
extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayGetCollection.php b/src/core/etl/src/Flow/ETL/Function/ArrayGetCollection.php index d6d4f9e2f..f17f31613 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayGetCollection.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayGetCollection.php @@ -9,7 +9,7 @@ use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Row; -final class ArrayGetCollection implements ScalarFunction +final class ArrayGetCollection extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayKeyRename.php b/src/core/etl/src/Flow/ETL/Function/ArrayKeyRename.php index 8ca5114e3..aeb95921e 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayKeyRename.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayKeyRename.php @@ -7,7 +7,7 @@ use function Flow\ArrayDot\array_dot_rename; use Flow\ETL\Row; -final class ArrayKeyRename implements ScalarFunction +final class ArrayKeyRename extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayKeysStyleConvert.php b/src/core/etl/src/Flow/ETL/Function/ArrayKeysStyleConvert.php index 0df18dbeb..2a553c616 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayKeysStyleConvert.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayKeysStyleConvert.php @@ -9,7 +9,7 @@ use Flow\ETL\Row; use Jawira\CaseConverter\Convert; -final class ArrayKeysStyleConvert implements ScalarFunction +final class ArrayKeysStyleConvert extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayMerge.php b/src/core/etl/src/Flow/ETL/Function/ArrayMerge.php index 9353a71c3..889fc93c6 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayMerge.php +++ 
b/src/core/etl/src/Flow/ETL/Function/ArrayMerge.php @@ -9,7 +9,7 @@ /** * Scalar function that takes two other functions, checks if both of them are arrays and merges them. */ -final class ArrayMerge implements ScalarFunction +final class ArrayMerge extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $left, private readonly ScalarFunction $right) { diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayMergeCollection.php b/src/core/etl/src/Flow/ETL/Function/ArrayMergeCollection.php index 7e65799bd..da6aaa4ed 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayMergeCollection.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayMergeCollection.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class ArrayMergeCollection implements ScalarFunction +final class ArrayMergeCollection extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref) { diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayReverse.php b/src/core/etl/src/Flow/ETL/Function/ArrayReverse.php index 9e75eacf3..51c22e1c2 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayReverse.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayReverse.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class ArrayReverse implements ScalarFunction +final class ArrayReverse extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $left, private readonly bool $preserveKeys) { diff --git a/src/core/etl/src/Flow/ETL/Function/ArraySort.php b/src/core/etl/src/Flow/ETL/Function/ArraySort.php index 0c6e827e5..2633a69f5 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArraySort.php +++ b/src/core/etl/src/Flow/ETL/Function/ArraySort.php @@ -7,7 +7,7 @@ use Flow\ETL\Function\ArraySort\Sort; use Flow\ETL\Row; -final class ArraySort implements ScalarFunction +final class ArraySort extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ArrayUnpack.php 
b/src/core/etl/src/Flow/ETL/Function/ArrayUnpack.php index 948ae6762..4c38c0827 100644 --- a/src/core/etl/src/Flow/ETL/Function/ArrayUnpack.php +++ b/src/core/etl/src/Flow/ETL/Function/ArrayUnpack.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class ArrayUnpack implements ScalarFunction, ScalarFunction\UnpackResults +final class ArrayUnpack extends ScalarFunctionChain implements ScalarFunction\UnpackResults { public function __construct( private readonly ScalarFunction $ref, @@ -46,7 +46,7 @@ public function eval(Row $row) : mixed return $values; } - public function unpack() : bool + public function unpackResults() : bool { return true; } diff --git a/src/core/etl/src/Flow/ETL/Function/CallMethod.php b/src/core/etl/src/Flow/ETL/Function/CallMethod.php index 62e423a7d..a8dea0cda 100644 --- a/src/core/etl/src/Flow/ETL/Function/CallMethod.php +++ b/src/core/etl/src/Flow/ETL/Function/CallMethod.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class CallMethod implements ScalarFunction +final class CallMethod extends ScalarFunctionChain { /** * @var ScalarFunction[] diff --git a/src/core/etl/src/Flow/ETL/Function/Capitalize.php b/src/core/etl/src/Flow/ETL/Function/Capitalize.php index 11016f193..d356729ca 100644 --- a/src/core/etl/src/Flow/ETL/Function/Capitalize.php +++ b/src/core/etl/src/Flow/ETL/Function/Capitalize.php @@ -4,7 +4,7 @@ use Flow\ETL\Row; -final class Capitalize implements ScalarFunction +final class Capitalize extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref) { diff --git a/src/core/etl/src/Flow/ETL/Function/Cast.php b/src/core/etl/src/Flow/ETL/Function/Cast.php index a1c49eb20..5195c8fee 100644 --- a/src/core/etl/src/Flow/ETL/Function/Cast.php +++ b/src/core/etl/src/Flow/ETL/Function/Cast.php @@ -7,7 +7,7 @@ use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Row; -final class Cast implements ScalarFunction +final class Cast extends ScalarFunctionChain { public function __construct( private readonly 
ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/Combine.php b/src/core/etl/src/Flow/ETL/Function/Combine.php index 6b5c269c1..1381a034b 100644 --- a/src/core/etl/src/Flow/ETL/Function/Combine.php +++ b/src/core/etl/src/Flow/ETL/Function/Combine.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Combine implements ScalarFunction +final class Combine extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $keys, diff --git a/src/core/etl/src/Flow/ETL/Function/CompositeScalarFunction.php b/src/core/etl/src/Flow/ETL/Function/CompositeScalarFunction.php new file mode 100644 index 000000000..edc66f2bb --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Function/CompositeScalarFunction.php @@ -0,0 +1,11 @@ + + */ + public function functions() : array; +} diff --git a/src/core/etl/src/Flow/ETL/Function/Concat.php b/src/core/etl/src/Flow/ETL/Function/Concat.php index 75887c88b..97472e016 100644 --- a/src/core/etl/src/Flow/ETL/Function/Concat.php +++ b/src/core/etl/src/Flow/ETL/Function/Concat.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Concat implements ScalarFunction +final class Concat extends ScalarFunctionChain { /** * @var array @@ -22,9 +22,7 @@ public function __construct( public function eval(Row $row) : mixed { $values = \array_map(function (ScalarFunction $ref) use ($row) : mixed { - $ref = new ScalarFunctions(new Cast($ref, 'string')); - - return $ref->eval($row); + return (new Cast($ref, 'string'))->eval($row); }, $this->refs); foreach ($values as $value) { diff --git a/src/core/etl/src/Flow/ETL/Function/Contains.php b/src/core/etl/src/Flow/ETL/Function/Contains.php index da2a9cd93..0d674a342 100644 --- a/src/core/etl/src/Flow/ETL/Function/Contains.php +++ b/src/core/etl/src/Flow/ETL/Function/Contains.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Contains implements ScalarFunction +final class Contains extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $haystack, 
private readonly ScalarFunction $needle) { diff --git a/src/core/etl/src/Flow/ETL/Function/DOMNodeAttribute.php b/src/core/etl/src/Flow/ETL/Function/DOMNodeAttribute.php index 8ae6e4536..226386d5c 100644 --- a/src/core/etl/src/Flow/ETL/Function/DOMNodeAttribute.php +++ b/src/core/etl/src/Flow/ETL/Function/DOMNodeAttribute.php @@ -4,7 +4,7 @@ use Flow\ETL\Row; -final class DOMNodeAttribute implements ScalarFunction +final class DOMNodeAttribute extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref, private readonly string $attribute) { diff --git a/src/core/etl/src/Flow/ETL/Function/DOMNodeValue.php b/src/core/etl/src/Flow/ETL/Function/DOMNodeValue.php index 72a70601a..c385b35e1 100644 --- a/src/core/etl/src/Flow/ETL/Function/DOMNodeValue.php +++ b/src/core/etl/src/Flow/ETL/Function/DOMNodeValue.php @@ -4,7 +4,7 @@ use Flow\ETL\Row; -final class DOMNodeValue implements ScalarFunction +final class DOMNodeValue extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref) { diff --git a/src/core/etl/src/Flow/ETL/Function/DateTimeFormat.php b/src/core/etl/src/Flow/ETL/Function/DateTimeFormat.php index 5a5a08df7..f708be8b1 100644 --- a/src/core/etl/src/Flow/ETL/Function/DateTimeFormat.php +++ b/src/core/etl/src/Flow/ETL/Function/DateTimeFormat.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class DateTimeFormat implements ScalarFunction +final class DateTimeFormat extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/Divide.php b/src/core/etl/src/Flow/ETL/Function/Divide.php index d8a1226b7..cab6ab445 100644 --- a/src/core/etl/src/Flow/ETL/Function/Divide.php +++ b/src/core/etl/src/Flow/ETL/Function/Divide.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Divide implements ScalarFunction +final class Divide extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $leftRef, 
diff --git a/src/core/etl/src/Flow/ETL/Function/EndsWith.php b/src/core/etl/src/Flow/ETL/Function/EndsWith.php index 4672761a7..505256597 100644 --- a/src/core/etl/src/Flow/ETL/Function/EndsWith.php +++ b/src/core/etl/src/Flow/ETL/Function/EndsWith.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class EndsWith implements ScalarFunction +final class EndsWith extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $haystack, diff --git a/src/core/etl/src/Flow/ETL/Function/EntryScalarFunction.php b/src/core/etl/src/Flow/ETL/Function/EntryScalarFunction.php deleted file mode 100644 index 987402079..000000000 --- a/src/core/etl/src/Flow/ETL/Function/EntryScalarFunction.php +++ /dev/null @@ -1,417 +0,0 @@ - ...$entryClass - */ - public function isType(string ...$entryClass) : ScalarFunction|EntryReference - { - if ([] === $entryClass) { - throw new InvalidArgumentException('isType expression requires at least one entryClass'); - } - - return new ScalarFunctions(new IsType($this, ...$entryClass)); - } - - public function jsonDecode(int $flags = JSON_THROW_ON_ERROR) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\JsonDecode($this, $flags)); - } - - public function jsonEncode(int $flags = JSON_THROW_ON_ERROR) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\JsonEncode($this, $flags)); - } - - public function lessThan(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new LessThan($this, $ref)); - } - - public function lessThanEqual(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new LessThanEqual($this, $ref)); - } - - public function literal(mixed $value) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Literal($value)); - } - - public function lower() : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\ToLower($this)); - } - - public function method(ScalarFunction 
$method, ScalarFunction ...$params) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\CallMethod($this, $method, ...$params)); - } - - public function minus(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Minus($this, $ref)); - } - - public function mod(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Mod($this, $ref)); - } - - public function multiply(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Multiply($this, $ref)); - } - - public function notEquals(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new NotEquals($this, $ref)); - } - - public function notSame(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new NotSame($this, $ref)); - } - - public function numberFormat(?ScalarFunction $decimals = null, ?ScalarFunction $decimalSeparator = null, ?ScalarFunction $thousandsSeparator = null) : ScalarFunction|EntryReference - { - if ($decimals === null) { - $decimals = lit(0); - } - - if ($decimalSeparator === null) { - $decimalSeparator = lit('.'); - } - - if ($thousandsSeparator === null) { - $thousandsSeparator = lit(','); - } - - return new ScalarFunctions(new Function\NumberFormat($this, $decimals, $decimalSeparator, $thousandsSeparator)); - } - - public function plus(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Plus($this, $ref)); - } - - public function power(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Power($this, $ref)); - } - - public function regexMatch(ScalarFunction $pattern) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\PregMatch($pattern, $this)); - } - - public function regexMatchAll(ScalarFunction $pattern, ?ScalarFunction $flags = null) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new 
Function\PregMatchAll($pattern, $this, $flags)); - } - - public function regexReplace(ScalarFunction $pattern, ScalarFunction $replacement) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\PregReplace($pattern, $replacement, $this)); - } - - /** - * @param ScalarFunction $precision - * @param int<0, max> $mode - * - * @return ScalarFunction - */ - public function round(ScalarFunction $precision, int $mode = PHP_ROUND_HALF_UP) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\Round($this, $precision, $mode)); - } - - public function same(ScalarFunction $ref) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Same($this, $ref)); - } - - public function sanitize(?ScalarFunction $placeholder = null, ?ScalarFunction $skipCharacters = null) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\Sanitize($this, $placeholder ?: new Function\Literal('*'), $skipCharacters ?: new Function\Literal(0))); - } - - public function size() : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\Size($this)); - } - - public function sprintf(ScalarFunction ...$params) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\Sprintf($this, ...$params)); - } - - public function startsWith(ScalarFunction $needle) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new StartsWith($this, $needle)); - } - - public function strPad(int $length, string $pad_string = ' ', int $type = STR_PAD_RIGHT) : ScalarFunction|EntryReference - { - return new ScalarFunctions((new Function\StrPad($this, $length, $pad_string, $type))); - } - - /** - * @param string|string[] $search - * @param string|string[] $replace - */ - public function strReplace(string|array $search, string|array $replace) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\StrReplace($this, $search, $replace)); - } - - /** - * @param string $format - current 
format of the date that will be used to create DateTimeImmutable instance - * @param \DateTimeZone $timeZone - * - * @return ScalarFunction - */ - public function toDate(string $format = \DateTimeInterface::RFC3339, \DateTimeZone $timeZone = new \DateTimeZone('UTC')) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\ToDate($this, $format, $timeZone)); - } - - /** - * @param string $format - current format of the date that will be used to create DateTimeImmutable instance - * @param \DateTimeZone $timeZone - * - * @return ScalarFunction - */ - public function toDateTime(string $format = 'Y-m-d H:i:s', \DateTimeZone $timeZone = new \DateTimeZone('UTC')) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\ToDateTime($this, $format, $timeZone)); - } - - public function trim(Trim\Type $type = Trim\Type::BOTH, string $characters = " \t\n\r\0\x0B") : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\Trim($this, $type, $characters)); - } - - /** - * Unpacks each element of an array into a new entry, using the array key as the entry name. 
- * - * Before: - * +--+-------------------+ - * |id| array| - * +--+-------------------+ - * | 1|{"a":1,"b":2,"c":3}| - * | 2|{"d":4,"e":5,"f":6}| - * +--+-------------------+ - * - * After: - * +--+-----+-----+-----+-----+-----+ - * |id|arr.b|arr.c|arr.d|arr.e|arr.f| - * +--+-----+-----+-----+-----+-----+ - * | 1| 2| 3| | | | - * | 2| | | 4| 5| 6| - * +--+-----+-----+-----+-----+-----+ - */ - public function unpack(array $skipKeys = [], ?string $entryPrefix = null) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\ArrayUnpack($this, $skipKeys, $entryPrefix)); - } - - public function upper() : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\ToUpper($this)); - } - - public function xpath(string $string) : ScalarFunction|EntryReference - { - return new ScalarFunctions(new Function\XPath($this, $string)); - } -} diff --git a/src/core/etl/src/Flow/ETL/Function/Equals.php b/src/core/etl/src/Flow/ETL/Function/Equals.php index 66ad3cae7..1de450037 100644 --- a/src/core/etl/src/Flow/ETL/Function/Equals.php +++ b/src/core/etl/src/Flow/ETL/Function/Equals.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Equals implements ScalarFunction +final class Equals extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $base, diff --git a/src/core/etl/src/Flow/ETL/Function/Exists.php b/src/core/etl/src/Flow/ETL/Function/Exists.php index 1c5a27e67..b30b51e38 100644 --- a/src/core/etl/src/Flow/ETL/Function/Exists.php +++ b/src/core/etl/src/Flow/ETL/Function/Exists.php @@ -7,7 +7,7 @@ use Flow\ETL\Row; use Flow\ETL\Row\Reference; -final class Exists implements ScalarFunction +final class Exists extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref) { diff --git a/src/core/etl/src/Flow/ETL/Function/GreaterThan.php b/src/core/etl/src/Flow/ETL/Function/GreaterThan.php index eef962406..f8471b8e3 100644 --- a/src/core/etl/src/Flow/ETL/Function/GreaterThan.php 
+++ b/src/core/etl/src/Flow/ETL/Function/GreaterThan.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class GreaterThan implements ScalarFunction +final class GreaterThan extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $base, diff --git a/src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php b/src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php index a0cee80f5..e91e5216c 100644 --- a/src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php +++ b/src/core/etl/src/Flow/ETL/Function/GreaterThanEqual.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class GreaterThanEqual implements ScalarFunction +final class GreaterThanEqual extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $base, diff --git a/src/core/etl/src/Flow/ETL/Function/Hash.php b/src/core/etl/src/Flow/ETL/Function/Hash.php index df231c710..96aa58395 100644 --- a/src/core/etl/src/Flow/ETL/Function/Hash.php +++ b/src/core/etl/src/Flow/ETL/Function/Hash.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Hash implements ScalarFunction +final class Hash extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/IsIn.php b/src/core/etl/src/Flow/ETL/Function/IsIn.php index fa1ece96a..a4c7888a4 100644 --- a/src/core/etl/src/Flow/ETL/Function/IsIn.php +++ b/src/core/etl/src/Flow/ETL/Function/IsIn.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class IsIn implements ScalarFunction +final class IsIn extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $haystack, diff --git a/src/core/etl/src/Flow/ETL/Function/IsNotNull.php b/src/core/etl/src/Flow/ETL/Function/IsNotNull.php index 8eb512115..4d5232c54 100644 --- a/src/core/etl/src/Flow/ETL/Function/IsNotNull.php +++ b/src/core/etl/src/Flow/ETL/Function/IsNotNull.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class IsNotNull implements ScalarFunction +final class IsNotNull 
extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref diff --git a/src/core/etl/src/Flow/ETL/Function/IsNotNumeric.php b/src/core/etl/src/Flow/ETL/Function/IsNotNumeric.php index 42863f876..937c04f5f 100644 --- a/src/core/etl/src/Flow/ETL/Function/IsNotNumeric.php +++ b/src/core/etl/src/Flow/ETL/Function/IsNotNumeric.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class IsNotNumeric implements ScalarFunction +final class IsNotNumeric extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref diff --git a/src/core/etl/src/Flow/ETL/Function/IsNull.php b/src/core/etl/src/Flow/ETL/Function/IsNull.php index 04d545dc5..a85d4c046 100644 --- a/src/core/etl/src/Flow/ETL/Function/IsNull.php +++ b/src/core/etl/src/Flow/ETL/Function/IsNull.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class IsNull implements ScalarFunction +final class IsNull extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref diff --git a/src/core/etl/src/Flow/ETL/Function/IsNumeric.php b/src/core/etl/src/Flow/ETL/Function/IsNumeric.php index 54583b3ec..7a5fe90e6 100644 --- a/src/core/etl/src/Flow/ETL/Function/IsNumeric.php +++ b/src/core/etl/src/Flow/ETL/Function/IsNumeric.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class IsNumeric implements ScalarFunction +final class IsNumeric extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref diff --git a/src/core/etl/src/Flow/ETL/Function/IsType.php b/src/core/etl/src/Flow/ETL/Function/IsType.php index 6c99a99eb..d0b13a755 100644 --- a/src/core/etl/src/Flow/ETL/Function/IsType.php +++ b/src/core/etl/src/Flow/ETL/Function/IsType.php @@ -9,7 +9,7 @@ use Flow\ETL\Row\Entry; use Flow\ETL\Row\Reference; -final class IsType implements ScalarFunction +final class IsType extends ScalarFunctionChain { /** * @var array> diff --git a/src/core/etl/src/Flow/ETL/Function/JsonDecode.php 
b/src/core/etl/src/Flow/ETL/Function/JsonDecode.php index efe39de2f..c79cd8303 100644 --- a/src/core/etl/src/Flow/ETL/Function/JsonDecode.php +++ b/src/core/etl/src/Flow/ETL/Function/JsonDecode.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class JsonDecode implements ScalarFunction +final class JsonDecode extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref, private readonly int $flags = JSON_THROW_ON_ERROR) { diff --git a/src/core/etl/src/Flow/ETL/Function/JsonEncode.php b/src/core/etl/src/Flow/ETL/Function/JsonEncode.php index e3884762f..78367b5ed 100644 --- a/src/core/etl/src/Flow/ETL/Function/JsonEncode.php +++ b/src/core/etl/src/Flow/ETL/Function/JsonEncode.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class JsonEncode implements ScalarFunction +final class JsonEncode extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref, private readonly int $flags = JSON_THROW_ON_ERROR) { diff --git a/src/core/etl/src/Flow/ETL/Function/LessThan.php b/src/core/etl/src/Flow/ETL/Function/LessThan.php index 76bc8f519..4c385487b 100644 --- a/src/core/etl/src/Flow/ETL/Function/LessThan.php +++ b/src/core/etl/src/Flow/ETL/Function/LessThan.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class LessThan implements ScalarFunction +final class LessThan extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $base, diff --git a/src/core/etl/src/Flow/ETL/Function/LessThanEqual.php b/src/core/etl/src/Flow/ETL/Function/LessThanEqual.php index 47845926b..c040a6c55 100644 --- a/src/core/etl/src/Flow/ETL/Function/LessThanEqual.php +++ b/src/core/etl/src/Flow/ETL/Function/LessThanEqual.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class LessThanEqual implements ScalarFunction +final class LessThanEqual extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $base, diff --git a/src/core/etl/src/Flow/ETL/Function/Literal.php 
b/src/core/etl/src/Flow/ETL/Function/Literal.php index 8ee132229..b7388e7e1 100644 --- a/src/core/etl/src/Flow/ETL/Function/Literal.php +++ b/src/core/etl/src/Flow/ETL/Function/Literal.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Literal implements ScalarFunction +final class Literal extends ScalarFunctionChain { public function __construct( private readonly mixed $value diff --git a/src/core/etl/src/Flow/ETL/Function/Minus.php b/src/core/etl/src/Flow/ETL/Function/Minus.php index 76b25ede0..cb20df209 100644 --- a/src/core/etl/src/Flow/ETL/Function/Minus.php +++ b/src/core/etl/src/Flow/ETL/Function/Minus.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Minus implements ScalarFunction +final class Minus extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $leftRef, diff --git a/src/core/etl/src/Flow/ETL/Function/Mod.php b/src/core/etl/src/Flow/ETL/Function/Mod.php index ac37f010a..8e087c02e 100644 --- a/src/core/etl/src/Flow/ETL/Function/Mod.php +++ b/src/core/etl/src/Flow/ETL/Function/Mod.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Mod implements ScalarFunction +final class Mod extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $leftRef, diff --git a/src/core/etl/src/Flow/ETL/Function/Multiply.php b/src/core/etl/src/Flow/ETL/Function/Multiply.php index 14aa6e1f0..0ca23773f 100644 --- a/src/core/etl/src/Flow/ETL/Function/Multiply.php +++ b/src/core/etl/src/Flow/ETL/Function/Multiply.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Multiply implements ScalarFunction +final class Multiply extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $leftRef, diff --git a/src/core/etl/src/Flow/ETL/Function/Not.php b/src/core/etl/src/Flow/ETL/Function/Not.php index 30fc724ff..943624bcd 100644 --- a/src/core/etl/src/Flow/ETL/Function/Not.php +++ b/src/core/etl/src/Flow/ETL/Function/Not.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Not 
implements ScalarFunction +final class Not extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $function) { diff --git a/src/core/etl/src/Flow/ETL/Function/NotEquals.php b/src/core/etl/src/Flow/ETL/Function/NotEquals.php index 5c838db7f..e593e2e7a 100644 --- a/src/core/etl/src/Flow/ETL/Function/NotEquals.php +++ b/src/core/etl/src/Flow/ETL/Function/NotEquals.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class NotEquals implements ScalarFunction +final class NotEquals extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $base, diff --git a/src/core/etl/src/Flow/ETL/Function/NotSame.php b/src/core/etl/src/Flow/ETL/Function/NotSame.php index f6d3b079f..3cacf822b 100644 --- a/src/core/etl/src/Flow/ETL/Function/NotSame.php +++ b/src/core/etl/src/Flow/ETL/Function/NotSame.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class NotSame implements ScalarFunction +final class NotSame extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $base, diff --git a/src/core/etl/src/Flow/ETL/Function/Now.php b/src/core/etl/src/Flow/ETL/Function/Now.php index 16630dcfd..52f502b8c 100644 --- a/src/core/etl/src/Flow/ETL/Function/Now.php +++ b/src/core/etl/src/Flow/ETL/Function/Now.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Now implements ScalarFunction +final class Now extends ScalarFunctionChain { public function __construct(private readonly \DateTimeZone $timeZone = new \DateTimeZone('UTC')) { diff --git a/src/core/etl/src/Flow/ETL/Function/NumberFormat.php b/src/core/etl/src/Flow/ETL/Function/NumberFormat.php index fb49823f5..0369ae43b 100644 --- a/src/core/etl/src/Flow/ETL/Function/NumberFormat.php +++ b/src/core/etl/src/Flow/ETL/Function/NumberFormat.php @@ -4,7 +4,7 @@ use Flow\ETL\Row; -final class NumberFormat implements ScalarFunction +final class NumberFormat extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff 
--git a/src/core/etl/src/Flow/ETL/Function/Optional.php b/src/core/etl/src/Flow/ETL/Function/Optional.php index 5b58c0201..7964d6268 100644 --- a/src/core/etl/src/Flow/ETL/Function/Optional.php +++ b/src/core/etl/src/Flow/ETL/Function/Optional.php @@ -4,10 +4,8 @@ use Flow\ETL\Row; -final class Optional implements ScalarFunction +final class Optional extends ScalarFunctionChain { - use EntryScalarFunction; - public function __construct(private readonly ScalarFunction $function) { } diff --git a/src/core/etl/src/Flow/ETL/Function/Plus.php b/src/core/etl/src/Flow/ETL/Function/Plus.php index abc599309..340d01071 100644 --- a/src/core/etl/src/Flow/ETL/Function/Plus.php +++ b/src/core/etl/src/Flow/ETL/Function/Plus.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Plus implements ScalarFunction +final class Plus extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $leftRef, diff --git a/src/core/etl/src/Flow/ETL/Function/Power.php b/src/core/etl/src/Flow/ETL/Function/Power.php index 5d3850b0e..a1dbb8128 100644 --- a/src/core/etl/src/Flow/ETL/Function/Power.php +++ b/src/core/etl/src/Flow/ETL/Function/Power.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Power implements ScalarFunction +final class Power extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $leftRef, diff --git a/src/core/etl/src/Flow/ETL/Function/PregMatch.php b/src/core/etl/src/Flow/ETL/Function/PregMatch.php index ff048aac7..2f4e97cda 100644 --- a/src/core/etl/src/Flow/ETL/Function/PregMatch.php +++ b/src/core/etl/src/Flow/ETL/Function/PregMatch.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class PregMatch implements ScalarFunction +final class PregMatch extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $pattern, diff --git a/src/core/etl/src/Flow/ETL/Function/PregMatchAll.php b/src/core/etl/src/Flow/ETL/Function/PregMatchAll.php index afd9a9389..08ec86274 100644 --- 
a/src/core/etl/src/Flow/ETL/Function/PregMatchAll.php +++ b/src/core/etl/src/Flow/ETL/Function/PregMatchAll.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class PregMatchAll implements ScalarFunction +final class PregMatchAll extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $pattern, diff --git a/src/core/etl/src/Flow/ETL/Function/PregReplace.php b/src/core/etl/src/Flow/ETL/Function/PregReplace.php index db6e5d8cd..2995375ce 100644 --- a/src/core/etl/src/Flow/ETL/Function/PregReplace.php +++ b/src/core/etl/src/Flow/ETL/Function/PregReplace.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class PregReplace implements ScalarFunction +final class PregReplace extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $pattern, diff --git a/src/core/etl/src/Flow/ETL/Function/Round.php b/src/core/etl/src/Flow/ETL/Function/Round.php index 684cd6741..eaf67d768 100644 --- a/src/core/etl/src/Flow/ETL/Function/Round.php +++ b/src/core/etl/src/Flow/ETL/Function/Round.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Round implements ScalarFunction +final class Round extends ScalarFunctionChain { /** * @param ScalarFunction $entry diff --git a/src/core/etl/src/Flow/ETL/Function/Same.php b/src/core/etl/src/Flow/ETL/Function/Same.php index da246924a..3fad94283 100644 --- a/src/core/etl/src/Flow/ETL/Function/Same.php +++ b/src/core/etl/src/Flow/ETL/Function/Same.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Same implements ScalarFunction +final class Same extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $base, diff --git a/src/core/etl/src/Flow/ETL/Function/Sanitize.php b/src/core/etl/src/Flow/ETL/Function/Sanitize.php index 6eca16364..95e7f805e 100644 --- a/src/core/etl/src/Flow/ETL/Function/Sanitize.php +++ b/src/core/etl/src/Flow/ETL/Function/Sanitize.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Sanitize implements ScalarFunction +final class Sanitize 
extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ScalarFunction/ExpandResults.php b/src/core/etl/src/Flow/ETL/Function/ScalarFunction/ExpandResults.php index 4c34c3962..411ddc88f 100644 --- a/src/core/etl/src/Flow/ETL/Function/ScalarFunction/ExpandResults.php +++ b/src/core/etl/src/Flow/ETL/Function/ScalarFunction/ExpandResults.php @@ -6,5 +6,5 @@ interface ExpandResults { - public function expand() : bool; + public function expandResults() : bool; } diff --git a/src/core/etl/src/Flow/ETL/Function/ScalarFunction/UnpackResults.php b/src/core/etl/src/Flow/ETL/Function/ScalarFunction/UnpackResults.php index be2b584d7..7e7f38f8c 100644 --- a/src/core/etl/src/Flow/ETL/Function/ScalarFunction/UnpackResults.php +++ b/src/core/etl/src/Flow/ETL/Function/ScalarFunction/UnpackResults.php @@ -6,5 +6,5 @@ interface UnpackResults { - public function unpack() : bool; + public function unpackResults() : bool; } diff --git a/src/core/etl/src/Flow/ETL/Function/ScalarFunctionChain.php b/src/core/etl/src/Flow/ETL/Function/ScalarFunctionChain.php new file mode 100644 index 000000000..2b16b7acb --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Function/ScalarFunctionChain.php @@ -0,0 +1,429 @@ + ...$entryClass + */ + public function isType(string ...$entryClass) : self + { + if ([] === $entryClass) { + throw new InvalidArgumentException('isType expression requires at least one entryClass'); + } + + return new IsType($this, ...$entryClass); + } + + public function jsonDecode(int $flags = JSON_THROW_ON_ERROR) : self + { + return new Function\JsonDecode($this, $flags); + } + + public function jsonEncode(int $flags = JSON_THROW_ON_ERROR) : self + { + return new Function\JsonEncode($this, $flags); + } + + public function lessThan(ScalarFunction $ref) : self + { + return new LessThan($this, $ref); + } + + public function lessThanEqual(ScalarFunction $ref) : self + { + return new LessThanEqual($this, 
$ref); + } + + public function literal(mixed $value) : self + { + return new Literal($value); + } + + public function lower() : self + { + return new Function\ToLower($this); + } + + public function method(ScalarFunction $method, ScalarFunction ...$params) : self + { + return new Function\CallMethod($this, $method, ...$params); + } + + public function minus(ScalarFunction $ref) : self + { + return new Minus($this, $ref); + } + + public function mod(ScalarFunction $ref) : self + { + return new Mod($this, $ref); + } + + public function multiply(ScalarFunction $ref) : self + { + return new Multiply($this, $ref); + } + + public function notEquals(ScalarFunction $ref) : self + { + return new NotEquals($this, $ref); + } + + public function notSame(ScalarFunction $ref) : self + { + return new NotSame($this, $ref); + } + + public function numberFormat(?ScalarFunction $decimals = null, ?ScalarFunction $decimalSeparator = null, ?ScalarFunction $thousandsSeparator = null) : self + { + if ($decimals === null) { + $decimals = lit(0); + } + + if ($decimalSeparator === null) { + $decimalSeparator = lit('.'); + } + + if ($thousandsSeparator === null) { + $thousandsSeparator = lit(','); + } + + return new Function\NumberFormat($this, $decimals, $decimalSeparator, $thousandsSeparator); + } + + public function plus(ScalarFunction $ref) : self + { + return new Plus($this, $ref); + } + + public function power(ScalarFunction $ref) : self + { + return new Power($this, $ref); + } + + public function regexMatch(ScalarFunction $pattern) : self + { + return new Function\PregMatch($pattern, $this); + } + + public function regexMatchAll(ScalarFunction $pattern, ?ScalarFunction $flags = null) : self + { + return new Function\PregMatchAll($pattern, $this, $flags); + } + + public function regexReplace(ScalarFunction $pattern, ScalarFunction $replacement) : self + { + return new Function\PregReplace($pattern, $replacement, $this); + } + + /** + * @param ScalarFunction $precision + * @param int<0, 
max> $mode + */ + public function round(ScalarFunction $precision, int $mode = PHP_ROUND_HALF_UP) : self + { + return new Function\Round($this, $precision, $mode); + } + + public function same(ScalarFunction $ref) : self + { + return new Same($this, $ref); + } + + public function sanitize(?ScalarFunction $placeholder = null, ?ScalarFunction $skipCharacters = null) : self + { + return new Function\Sanitize($this, $placeholder ?: new Function\Literal('*'), $skipCharacters ?: new Function\Literal(0)); + } + + public function size() : self + { + return new Function\Size($this); + } + + public function sprintf(ScalarFunction ...$params) : self + { + return new Function\Sprintf($this, ...$params); + } + + public function startsWith(ScalarFunction $needle) : self + { + return new StartsWith($this, $needle); + } + + public function strPad(int $length, string $pad_string = ' ', int $type = STR_PAD_RIGHT) : self + { + return new Function\StrPad($this, $length, $pad_string, $type); + } + + public function strPadBoth(int $length, string $pad_string = ' ') : self + { + return new Function\StrPad($this, $length, $pad_string, STR_PAD_BOTH); + } + + public function strPadLeft(int $length, string $pad_string = ' ') : self + { + return new Function\StrPad($this, $length, $pad_string, STR_PAD_LEFT); + } + + public function strPadRight(int $length, string $pad_string = ' ') : self + { + return new Function\StrPad($this, $length, $pad_string, STR_PAD_RIGHT); + } + + /** + * @param string|string[] $search + * @param string|string[] $replace + */ + public function strReplace(string|array $search, string|array $replace) : self + { + return new Function\StrReplace($this, $search, $replace); + } + + /** + * @param string $format - current format of the date that will be used to create DateTimeImmutable instance + * @param \DateTimeZone $timeZone + */ + public function toDate(string $format = \DateTimeInterface::RFC3339, \DateTimeZone $timeZone = new \DateTimeZone('UTC')) : self + { + return 
new Function\ToDate($this, $format, $timeZone); + } + + /** + * @param string $format - current format of the date that will be used to create DateTimeImmutable instance + * @param \DateTimeZone $timeZone + */ + public function toDateTime(string $format = 'Y-m-d H:i:s', \DateTimeZone $timeZone = new \DateTimeZone('UTC')) : self + { + return new Function\ToDateTime($this, $format, $timeZone); + } + + public function trim(Trim\Type $type = Trim\Type::BOTH, string $characters = " \t\n\r\0\x0B") : self + { + return new Function\Trim($this, $type, $characters); + } + + /** + * Unpacks each element of an array into a new entry, using the array key as the entry name. + * + * Before: + * +--+-------------------+ + * |id| array| + * +--+-------------------+ + * | 1|{"a":1,"b":2,"c":3}| + * | 2|{"d":4,"e":5,"f":6}| + * +--+-------------------+ + * + * After: + * +--+-----+-----+-----+-----+-----+ + * |id|arr.b|arr.c|arr.d|arr.e|arr.f| + * +--+-----+-----+-----+-----+-----+ + * | 1| 2| 3| | | | + * | 2| | | 4| 5| 6| + * +--+-----+-----+-----+-----+-----+ + */ + public function unpack(array $skipKeys = [], ?string $entryPrefix = null) : self + { + return new Function\ArrayUnpack($this, $skipKeys, $entryPrefix); + } + + public function upper() : self + { + return new Function\ToUpper($this); + } + + public function xpath(string $string) : self + { + return new Function\XPath($this, $string); + } +} diff --git a/src/core/etl/src/Flow/ETL/Function/ScalarFunctions.php b/src/core/etl/src/Flow/ETL/Function/ScalarFunctions.php deleted file mode 100644 index 3837d0d1c..000000000 --- a/src/core/etl/src/Flow/ETL/Function/ScalarFunctions.php +++ /dev/null @@ -1,62 +0,0 @@ - - */ - private array $functions; - - public function __construct(ScalarFunction ...$functions) - { - $this->functions = $functions; - } - - public function eval(Row $row) : mixed - { - $lastValue = null; - - foreach ($this->functions as $function) { - $lastValue = $function->eval($row); - - if ($function instanceof 
Reference) { - $row = $row->set((new Row\Factory\NativeEntryFactory())->create($function->to(), $lastValue)); - } - } - - return $lastValue; - } - - public function expand() : bool - { - foreach ($this->functions as $function) { - if ($function instanceof ExpandResults) { - return $function->expand(); - } - } - - return false; - } - - public function unpack() : bool - { - foreach ($this->functions as $function) { - if ($function instanceof UnpackResults) { - return $function->unpack(); - } - } - - return false; - } -} diff --git a/src/core/etl/src/Flow/ETL/Function/Size.php b/src/core/etl/src/Flow/ETL/Function/Size.php index 1edf0e476..a9d1812e9 100644 --- a/src/core/etl/src/Flow/ETL/Function/Size.php +++ b/src/core/etl/src/Flow/ETL/Function/Size.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Size implements ScalarFunction +final class Size extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref) { diff --git a/src/core/etl/src/Flow/ETL/Function/Split.php b/src/core/etl/src/Flow/ETL/Function/Split.php index d109fa03f..36ef4ef4e 100644 --- a/src/core/etl/src/Flow/ETL/Function/Split.php +++ b/src/core/etl/src/Flow/ETL/Function/Split.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Split implements ScalarFunction +final class Split extends ScalarFunctionChain { /** * @param non-empty-string $separator diff --git a/src/core/etl/src/Flow/ETL/Function/Sprintf.php b/src/core/etl/src/Flow/ETL/Function/Sprintf.php index 6a3a6e487..c77f153a3 100644 --- a/src/core/etl/src/Flow/ETL/Function/Sprintf.php +++ b/src/core/etl/src/Flow/ETL/Function/Sprintf.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class Sprintf implements ScalarFunction +final class Sprintf extends ScalarFunctionChain { /** * @var array diff --git a/src/core/etl/src/Flow/ETL/Function/StartsWith.php b/src/core/etl/src/Flow/ETL/Function/StartsWith.php index b4d607560..f1d327990 100644 --- a/src/core/etl/src/Flow/ETL/Function/StartsWith.php +++ 
b/src/core/etl/src/Flow/ETL/Function/StartsWith.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class StartsWith implements ScalarFunction +final class StartsWith extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $haystack, diff --git a/src/core/etl/src/Flow/ETL/Function/StrPad.php b/src/core/etl/src/Flow/ETL/Function/StrPad.php index 84e60a4cc..4dc491a2d 100644 --- a/src/core/etl/src/Flow/ETL/Function/StrPad.php +++ b/src/core/etl/src/Flow/ETL/Function/StrPad.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class StrPad implements ScalarFunction +final class StrPad extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/StrReplace.php b/src/core/etl/src/Flow/ETL/Function/StrReplace.php index 4ba9f2d3e..b3931bfdc 100644 --- a/src/core/etl/src/Flow/ETL/Function/StrReplace.php +++ b/src/core/etl/src/Flow/ETL/Function/StrReplace.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class StrReplace implements ScalarFunction +final class StrReplace extends ScalarFunctionChain { /** * @param string|string[] $search diff --git a/src/core/etl/src/Flow/ETL/Function/ToDate.php b/src/core/etl/src/Flow/ETL/Function/ToDate.php index 85e363163..2e272f0dc 100644 --- a/src/core/etl/src/Flow/ETL/Function/ToDate.php +++ b/src/core/etl/src/Flow/ETL/Function/ToDate.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class ToDate implements ScalarFunction +final class ToDate extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ToDateTime.php b/src/core/etl/src/Flow/ETL/Function/ToDateTime.php index 843c8a5bc..cc360b865 100644 --- a/src/core/etl/src/Flow/ETL/Function/ToDateTime.php +++ b/src/core/etl/src/Flow/ETL/Function/ToDateTime.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class ToDateTime implements ScalarFunction +final class ToDateTime extends ScalarFunctionChain { public function 
__construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/ToLower.php b/src/core/etl/src/Flow/ETL/Function/ToLower.php index 1e5129d89..f5ee2b24d 100644 --- a/src/core/etl/src/Flow/ETL/Function/ToLower.php +++ b/src/core/etl/src/Flow/ETL/Function/ToLower.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class ToLower implements ScalarFunction +final class ToLower extends ScalarFunctionChain { public function __construct(private ScalarFunction $ref) { diff --git a/src/core/etl/src/Flow/ETL/Function/ToMoney.php b/src/core/etl/src/Flow/ETL/Function/ToMoney.php index 66692cae0..e4fa50b38 100644 --- a/src/core/etl/src/Flow/ETL/Function/ToMoney.php +++ b/src/core/etl/src/Flow/ETL/Function/ToMoney.php @@ -14,7 +14,7 @@ throw new RuntimeException("Money\MoneyParser class not found, please add moneyphp/money dependency to the project first."); } -final class ToMoney implements ScalarFunction +final class ToMoney extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $amountRef, diff --git a/src/core/etl/src/Flow/ETL/Function/ToTimeZone.php b/src/core/etl/src/Flow/ETL/Function/ToTimeZone.php index 747515e1c..4c9a5892e 100644 --- a/src/core/etl/src/Flow/ETL/Function/ToTimeZone.php +++ b/src/core/etl/src/Flow/ETL/Function/ToTimeZone.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class ToTimeZone implements ScalarFunction +final class ToTimeZone extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $function, diff --git a/src/core/etl/src/Flow/ETL/Function/ToUpper.php b/src/core/etl/src/Flow/ETL/Function/ToUpper.php index 82ab1d741..00c9075e0 100644 --- a/src/core/etl/src/Flow/ETL/Function/ToUpper.php +++ b/src/core/etl/src/Flow/ETL/Function/ToUpper.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class ToUpper implements ScalarFunction +final class ToUpper extends ScalarFunctionChain { public function __construct(private ScalarFunction $ref) { diff --git 
a/src/core/etl/src/Flow/ETL/Function/Trim.php b/src/core/etl/src/Flow/ETL/Function/Trim.php index 67a3bbb6d..622bc8fa9 100644 --- a/src/core/etl/src/Flow/ETL/Function/Trim.php +++ b/src/core/etl/src/Flow/ETL/Function/Trim.php @@ -7,7 +7,7 @@ use Flow\ETL\Function\Trim\Type; use Flow\ETL\Row; -final class Trim implements ScalarFunction +final class Trim extends ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/Ulid.php b/src/core/etl/src/Flow/ETL/Function/Ulid.php index 7b111f77f..5c1d53090 100644 --- a/src/core/etl/src/Flow/ETL/Function/Ulid.php +++ b/src/core/etl/src/Flow/ETL/Function/Ulid.php @@ -11,7 +11,7 @@ throw new RuntimeException("\Symfony\Component\Uid\Ulid class not found, please add 'symfony/uid' as a dependency to the project first."); } -final class Ulid implements ScalarFunction +final class Ulid extends ScalarFunctionChain { public function __construct(private readonly ?ScalarFunction $ref = null) { diff --git a/src/core/etl/src/Flow/ETL/Function/Uuid.php b/src/core/etl/src/Flow/ETL/Function/Uuid.php index 3d5c7f01b..a2441599d 100644 --- a/src/core/etl/src/Flow/ETL/Function/Uuid.php +++ b/src/core/etl/src/Flow/ETL/Function/Uuid.php @@ -11,7 +11,7 @@ throw new RuntimeException("\Ramsey\Uuid\Uuid nor \Symfony\Component\Uid\Uuid class not found, please add 'ramsey/uuid' or 'symfony/uid' as a dependency to the project first."); } -final class Uuid implements ScalarFunction +final class Uuid extends ScalarFunctionChain { private function __construct(private readonly string $uuidVersion, private readonly ?ScalarFunction $ref = null) { diff --git a/src/core/etl/src/Flow/ETL/Function/When.php b/src/core/etl/src/Flow/ETL/Function/When.php index a20241626..760f3cb5b 100644 --- a/src/core/etl/src/Flow/ETL/Function/When.php +++ b/src/core/etl/src/Flow/ETL/Function/When.php @@ -6,7 +6,7 @@ use Flow\ETL\Row; -final class When implements ScalarFunction +final class When extends 
ScalarFunctionChain { public function __construct( private readonly ScalarFunction $ref, diff --git a/src/core/etl/src/Flow/ETL/Function/XPath.php b/src/core/etl/src/Flow/ETL/Function/XPath.php index 9767b9a12..97a2b3d89 100644 --- a/src/core/etl/src/Flow/ETL/Function/XPath.php +++ b/src/core/etl/src/Flow/ETL/Function/XPath.php @@ -4,7 +4,7 @@ use Flow\ETL\Row; -final class XPath implements ScalarFunction +final class XPath extends ScalarFunctionChain { public function __construct(private readonly ScalarFunction $ref, private readonly string $path) { @@ -12,7 +12,6 @@ public function __construct(private readonly ScalarFunction $ref, private readon /** * @psalm-suppress InvalidReturnStatement - * @psalm-suppress InvalidReturnType */ public function eval(Row $row) : null|\DOMNode|array { diff --git a/src/core/etl/src/Flow/ETL/Partition.php b/src/core/etl/src/Flow/ETL/Partition.php index 39d9ceffc..426de9f0a 100644 --- a/src/core/etl/src/Flow/ETL/Partition.php +++ b/src/core/etl/src/Flow/ETL/Partition.php @@ -54,10 +54,7 @@ public static function fromArray(array $data) : array return $partitions; } - /** - * @return array - */ - public static function fromUri(string $uri) : array + public static function fromUri(string $uri) : Partitions { $regex = '/^([^\/\\\=:><|"?*]+)=([^\/\\\=:><|"?*]+)$/'; @@ -69,7 +66,7 @@ public static function fromUri(string $uri) : array } } - return $partitions; + return new Partitions(...$partitions); } public function __serialize() : array diff --git a/src/core/etl/src/Flow/ETL/Partition/FiltersCollection.php b/src/core/etl/src/Flow/ETL/Partition/FiltersCollection.php new file mode 100644 index 000000000..aee78b76c --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Partition/FiltersCollection.php @@ -0,0 +1,40 @@ +}> + */ +final class FiltersCollection implements PartitionFilter +{ + /** + * @param array $filters + */ + public function __construct(public readonly array $filters) + { + + } + + public function __serialize() : array + { + return 
['filters' => $this->filters]; + } + + public function __unserialize(array $data) : void + { + $this->filters = $data['filters']; + } + + public function keep(Partition ...$partitions) : bool + { + foreach ($this->filters as $filter) { + if (!$filter->keep(...$partitions)) { + return false; + } + } + + return true; + } +} diff --git a/src/core/etl/src/Flow/ETL/Partitions.php b/src/core/etl/src/Flow/ETL/Partitions.php index 0a84381de..00b0f7186 100644 --- a/src/core/etl/src/Flow/ETL/Partitions.php +++ b/src/core/etl/src/Flow/ETL/Partitions.php @@ -43,6 +43,17 @@ public function getIterator() : \Traversable return new \ArrayIterator($this->partitions); } + public function has(string $name) : bool + { + foreach ($this->partitions as $partition) { + if ($partition->name === $name) { + return true; + } + } + + return false; + } + public function id() : string { $id = '|'; diff --git a/src/core/etl/src/Flow/ETL/Pipeline/PartitioningPipeline.php b/src/core/etl/src/Flow/ETL/Pipeline/PartitioningPipeline.php index 294173fb9..636a8300c 100644 --- a/src/core/etl/src/Flow/ETL/Pipeline/PartitioningPipeline.php +++ b/src/core/etl/src/Flow/ETL/Pipeline/PartitioningPipeline.php @@ -6,6 +6,7 @@ use function Flow\ETL\DSL\from_all; use function Flow\ETL\DSL\from_cache; +use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Extractor; use Flow\ETL\Extractor\CollectingExtractor; use Flow\ETL\FlowContext; @@ -21,10 +22,20 @@ final class PartitioningPipeline implements OverridingPipeline, Pipeline /** * @param Pipeline $pipeline + * @param array $partitionBy * @param array $orderBy + * + * @throws InvalidArgumentException */ - public function __construct(private readonly Pipeline $pipeline, private readonly array $orderBy = []) - { + public function __construct( + private readonly Pipeline $pipeline, + private readonly array $partitionBy = [], + private readonly array $orderBy = [] + ) { + if (!\count($this->partitionBy)) { + throw new 
InvalidArgumentException('PartitioningPipeline requires at least one partitionBy entry'); + } + $this->nextPipeline = $this->pipeline->cleanCopy(); } @@ -80,7 +91,7 @@ public function process(FlowContext $context) : \Generator $partitionIds = []; foreach ($this->pipeline->process($context) as $rows) { - foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) { + foreach ($rows->partitionBy(...$this->partitionBy) as $partitionedRows) { $rows = $partitionedRows->sortBy(...$this->orderBy); diff --git a/src/core/etl/src/Flow/ETL/Row/EntryReference.php b/src/core/etl/src/Flow/ETL/Row/EntryReference.php index cb4628966..474e7938b 100644 --- a/src/core/etl/src/Flow/ETL/Row/EntryReference.php +++ b/src/core/etl/src/Flow/ETL/Row/EntryReference.php @@ -4,17 +4,14 @@ namespace Flow\ETL\Row; -use Flow\ETL\Function\EntryScalarFunction; -use Flow\ETL\Function\ScalarFunction; +use Flow\ETL\Function\ScalarFunctionChain; use Flow\ETL\Row; /** * @implements Reference */ -final class EntryReference implements Reference, ScalarFunction +final class EntryReference extends ScalarFunctionChain implements Reference { - use EntryScalarFunction; - private ?string $alias = null; private SortOrder $sort = SortOrder::ASC; diff --git a/src/core/etl/src/Flow/ETL/Rows.php b/src/core/etl/src/Flow/ETL/Rows.php index 52f7019f7..63ea54947 100644 --- a/src/core/etl/src/Flow/ETL/Rows.php +++ b/src/core/etl/src/Flow/ETL/Rows.php @@ -466,7 +466,7 @@ public function map(callable $callable) : self $rows[] = $callable($row); } - return new self(...$rows); + return self::partitioned($rows, $this->partitions); } public function merge(self $rows) : self diff --git a/src/core/etl/src/Flow/ETL/Transformer/ScalarFunctionFilterTransformer.php b/src/core/etl/src/Flow/ETL/Transformer/ScalarFunctionFilterTransformer.php index 013163fc2..5d23a45cc 100644 --- a/src/core/etl/src/Flow/ETL/Transformer/ScalarFunctionFilterTransformer.php +++ 
b/src/core/etl/src/Flow/ETL/Transformer/ScalarFunctionFilterTransformer.php @@ -16,7 +16,7 @@ final class ScalarFunctionFilterTransformer implements Transformer { public function __construct( - private readonly ScalarFunction $function + public readonly ScalarFunction $function ) { } diff --git a/src/core/etl/src/Flow/ETL/Transformer/ScalarFunctionTransformer.php b/src/core/etl/src/Flow/ETL/Transformer/ScalarFunctionTransformer.php index 12bfa529d..ece6eef32 100644 --- a/src/core/etl/src/Flow/ETL/Transformer/ScalarFunctionTransformer.php +++ b/src/core/etl/src/Flow/ETL/Transformer/ScalarFunctionTransformer.php @@ -39,7 +39,7 @@ public function __unserialize(array $data) : void public function transform(Rows $rows, FlowContext $context) : Rows { - if ($this->function instanceof ExpandResults && $this->function->expand()) { + if ($this->function instanceof ExpandResults && $this->function->expandResults()) { return $rows->flatMap( fn (Row $r) : array => \array_map( fn ($val) : Row => new Row( @@ -57,7 +57,7 @@ function (Row $r) use ($context) : Row { $value = $this->function->eval($r); if (\is_array($value)) { - if ($this->function instanceof ScalarFunction\UnpackResults && $this->function->unpack()) { + if ($this->function instanceof ScalarFunction\UnpackResults && $this->function->unpackResults()) { /** * @var array-key $key * @var mixed $val diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2022/month=12/day=30/file.txt b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2022/month=12/day=30/file.txt new file mode 100644 index 000000000..a2b48737d --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2022/month=12/day=30/file.txt @@ -0,0 +1 @@ +2022-12-30 \ No newline at end of file diff --git 
a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2022/month=12/day=31/file.txt b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2022/month=12/day=31/file.txt new file mode 100644 index 000000000..2b53b8b1a --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2022/month=12/day=31/file.txt @@ -0,0 +1 @@ +2022-12-31 \ No newline at end of file diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=1/file.txt b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=1/file.txt new file mode 100644 index 000000000..fd78aceae --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=1/file.txt @@ -0,0 +1 @@ +2023-01-01 \ No newline at end of file diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=2/file.txt b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=2/file.txt new file mode 100644 index 000000000..f5e66c56a --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=2/file.txt @@ -0,0 +1 @@ +2023-01-02 \ No newline at end of file diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=3/file.txt b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=3/file.txt new file mode 100644 index 
000000000..d4074cd8c --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=3/file.txt @@ -0,0 +1 @@ +2023-01-03 \ No newline at end of file diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=4/file.txt b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=4/file.txt new file mode 100644 index 000000000..19cef5c5f --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=4/file.txt @@ -0,0 +1 @@ +2023-01-04 \ No newline at end of file diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=5/file.txt b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=5/file.txt new file mode 100644 index 000000000..d947eec20 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/Fixtures/Partitioning/multi_partition_pruning_test/year=2023/month=1/day=5/file.txt @@ -0,0 +1 @@ +2023-01-05 \ No newline at end of file diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php index a3e51ffa2..d253191b6 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/PartitioningTest.php @@ -2,6 +2,7 @@ namespace Flow\ETL\Tests\Integration\DataFrame; +use function Flow\ETL\Adapter\Text\from_text; use function Flow\ETL\DSL\df; use function Flow\ETL\DSL\from_rows; use function Flow\ETL\DSL\int_entry; @@ -16,41 +17,6 @@ final class PartitioningTest extends 
IntegrationTestCase { - public function test_filter_partitions() : void - { - $partitionedRows = df() - ->read(from_rows( - rows( - row(int_entry('id', 1), str_entry('country', 'PL'), int_entry('age', 20)), - row(int_entry('id', 2), str_entry('country', 'PL'), int_entry('age', 20)), - row(int_entry('id', 3), str_entry('country', 'PL'), int_entry('age', 25)), - row(int_entry('id', 4), str_entry('country', 'PL'), int_entry('age', 30)), - row(int_entry('id', 5), str_entry('country', 'US'), int_entry('age', 40)), - row(int_entry('id', 6), str_entry('country', 'US'), int_entry('age', 40)), - row(int_entry('id', 7), str_entry('country', 'US'), int_entry('age', 45)), - row(int_entry('id', 9), str_entry('country', 'US'), int_entry('age', 50)), - ) - )) - ->partitionBy('country') - ->filterPartitions(ref('country')->equals(lit('US'))) - ->fetch(); - - $this->assertEquals( - rows_partitioned( - [ - row(int_entry('id', 5), str_entry('country', 'US'), int_entry('age', 40)), - row(int_entry('id', 6), str_entry('country', 'US'), int_entry('age', 40)), - row(int_entry('id', 7), str_entry('country', 'US'), int_entry('age', 45)), - row(int_entry('id', 9), str_entry('country', 'US'), int_entry('age', 50)), - ], - [ - partition('country', 'US'), - ] - ), - $partitionedRows - ); - } - public function test_partition_by() : void { $rows = df() @@ -67,7 +33,6 @@ public function test_partition_by() : void ) )) ->partitionBy(ref('country')) - ->batchSize(2) // split each partition into two ->get(); $this->assertEquals( @@ -76,13 +41,6 @@ public function test_partition_by() : void [ row(int_entry('id', 1), str_entry('country', 'PL'), int_entry('age', 20)), row(int_entry('id', 2), str_entry('country', 'PL'), int_entry('age', 20)), - ], - [ - partition('country', 'PL'), - ] - ), - rows_partitioned( - [ row(int_entry('id', 3), str_entry('country', 'PL'), int_entry('age', 25)), row(int_entry('id', 4), str_entry('country', 'PL'), int_entry('age', 30)), ], @@ -94,13 +52,6 @@ public function 
test_partition_by() : void [ row(int_entry('id', 5), str_entry('country', 'US'), int_entry('age', 40)), row(int_entry('id', 6), str_entry('country', 'US'), int_entry('age', 40)), - ], - [ - partition('country', 'US'), - ] - ), - rows_partitioned( - [ row(int_entry('id', 7), str_entry('country', 'US'), int_entry('age', 45)), row(int_entry('id', 9), str_entry('country', 'US'), int_entry('age', 50)), ], @@ -112,4 +63,35 @@ public function test_partition_by() : void \iterator_to_array($rows) ); } + + public function test_pruning_multiple_partitions() : void + { + $rows = df() + ->read(from_text(__DIR__ . '/Fixtures/Partitioning/multi_partition_pruning_test/year=*/month=*/day=*/*.txt')) + ->filterPartitions(ref('year')->cast('int')->greaterThanEqual(lit(2023))) + ->filterPartitions(ref('month')->cast('int')->greaterThanEqual(lit(1))) + ->filterPartitions(ref('day')->cast('int')->lessThan(lit(3))) + ->filter(ref('text')->notEquals(lit('dupa'))) + ->withEntry('day', ref('day')->cast('int')) + ->collect() + ->fetch(); + + $this->assertCount(2, $rows); + $this->assertSame([1, 2], $rows->reduceToArray('day')); + } + + public function test_pruning_single_partition() : void + { + $rows = df() + ->read(from_text(__DIR__ . 
'/Fixtures/Partitioning/multi_partition_pruning_test/year=*/month=*/day=*/*.txt')) + ->filterPartitions(ref('year')->concat(lit('-'), ref('month')->strPadLeft(2, '0'), lit('-'), ref('day')->strPadLeft(2, '0'))->cast('date')->greaterThanEqual(lit(new \DateTimeImmutable('2023-01-01')))) + ->collect() + ->select('year') + ->withEntry('year', ref('year')->cast('int')) + ->groupBy(ref('year')) + ->fetch(); + + $this->assertCount(1, $rows); + $this->assertSame(2023, $rows->first()->valueOf('year')); + } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayGetCollectionTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayGetCollectionTest.php index 6b65a6fe7..42e1e19b7 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayGetCollectionTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayGetCollectionTest.php @@ -4,7 +4,6 @@ namespace Flow\ETL\Tests\Integration\Function; -use function Flow\ETL\DSL\array_get_collection; use function Flow\ETL\DSL\from_array; use function Flow\ETL\DSL\ref; use function Flow\ETL\DSL\to_memory; @@ -29,7 +28,7 @@ public function test_array_get_collection() : void ] ) ) - ->withEntry('result', array_get_collection(ref('array'), 'a', 'c')) + ->withEntry('result', ref('array')->arrayGetCollection('a', 'c')) ->drop('array') ->write(to_memory($memory = new ArrayMemory())) ->run(); diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayGetTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayGetTest.php index 1eca08a37..defd16c22 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayGetTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayGetTest.php @@ -4,7 +4,6 @@ namespace Flow\ETL\Tests\Integration\Function; -use function Flow\ETL\DSL\array_get; use function Flow\ETL\DSL\from_array; use function Flow\ETL\DSL\ref; use function Flow\ETL\DSL\to_memory; @@ -25,7 +24,7 @@ public function 
test_array_get() : void ] ) ) - ->withEntry('result', array_get(ref('array'), 'b')) + ->withEntry('result', ref('array')->arrayGet('b')) ->drop('array') ->write(to_memory($memory = new ArrayMemory())) ->run(); diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayMergeCollectionTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayMergeCollectionTest.php index 9faa61edc..a4bf1883e 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayMergeCollectionTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Function/ArrayMergeCollectionTest.php @@ -4,7 +4,6 @@ namespace Flow\ETL\Tests\Integration\Function; -use function Flow\ETL\DSL\array_merge_collection; use function Flow\ETL\DSL\from_array; use function Flow\ETL\DSL\optional; use function Flow\ETL\DSL\ref; @@ -30,7 +29,7 @@ public function test_array_merge_collection() : void ] ) ) - ->withEntry('result', optional(array_merge_collection(ref('array')))) + ->withEntry('result', optional(ref('array')->arrayMergeCollection())) ->drop('array') ->write(to_memory($memory = new ArrayMemory())) ->run(); diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/DataFrameTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/DataFrameTest.php index 24bcf2c65..c35f4b21f 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/DataFrameTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/DataFrameTest.php @@ -20,7 +20,6 @@ use function Flow\ETL\DSL\str_entry; use function Flow\ETL\DSL\to_callable; use Flow\ETL\DataFrame; -use Flow\ETL\DSL\Transform; use Flow\ETL\ErrorHandler\IgnoreError; use Flow\ETL\Extractor; use Flow\ETL\Flow; @@ -625,7 +624,7 @@ public function test_void() : void ->rename('country', 'country_code') ->void() ->aggregate(average(ref('age'))) - ->rows(Transform::rename('age_avg', 'average_age')) + ->rename('age_avg', 'average_age') ->fetch(); $this->assertEquals( diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/ArrayGetCollectionTest.php 
b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/ArrayGetCollectionTest.php index e4a8cf93e..d29140221 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/ArrayGetCollectionTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/ArrayGetCollectionTest.php @@ -99,7 +99,7 @@ public function test_getting_specific_keys_from_first_element_in_collection_of_a [ 'parent_id' => 1, ], - array_get_collection_first(ref('array_entry'), 'parent_id')->eval($row) + ref('array_entry')->arrayGetCollectionFirst('parent_id')->eval($row) ); } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/MathTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/MathTest.php index 0d3bb3fc5..0a40b3d02 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/MathTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/MathTest.php @@ -8,6 +8,7 @@ use function Flow\ETL\DSL\int_entry; use function Flow\ETL\DSL\lit; use function Flow\ETL\DSL\ref; +use function Flow\ETL\DSL\row; use Flow\ETL\Function\Divide; use Flow\ETL\Function\Minus; use Flow\ETL\Function\Mod; @@ -15,14 +16,13 @@ use Flow\ETL\Function\Plus; use Flow\ETL\Function\Power; use Flow\ETL\Function\Round; -use Flow\ETL\Row; use PHPUnit\Framework\TestCase; final class MathTest extends TestCase { public function test_divide() : void { - $row = Row::create(int_entry('a', 100), int_entry('b', 10)); + $row = row(int_entry('a', 100), int_entry('b', 10)); $this->assertSame( 10, @@ -32,7 +32,7 @@ public function test_divide() : void public function test_minus() : void { - $row = Row::create(int_entry('a', 100), int_entry('b', 100)); + $row = row(int_entry('a', 100), int_entry('b', 100)); $this->assertSame( 0, @@ -42,7 +42,7 @@ public function test_minus() : void public function test_modulo() : void { - $row = Row::create(int_entry('a', 110), int_entry('b', 100)); + $row = row(int_entry('a', 110), int_entry('b', 100)); $this->assertSame( 10, @@ -54,13 +54,13 @@ public function test_multiple_operations() : 
void { $this->assertSame( 200, - ref('a')->plus(lit(100))->plus(lit(100))->minus(ref('b'))->eval(Row::create(int_entry('a', 100), int_entry('b', 100))) + ref('a')->plus(lit(100))->plus(lit(100))->minus(ref('b'))->eval(row(int_entry('a', 100), int_entry('b', 100))) ); } public function test_multiply() : void { - $row = Row::create(int_entry('a', 100), int_entry('b', 100)); + $row = row(int_entry('a', 100), int_entry('b', 100)); $this->assertSame( 10_000, @@ -70,7 +70,7 @@ public function test_multiply() : void public function test_plus() : void { - $row = Row::create(int_entry('a', 100), int_entry('b', 100)); + $row = row(int_entry('a', 100), int_entry('b', 100)); $this->assertSame( 200, @@ -80,7 +80,7 @@ public function test_plus() : void public function test_power() : void { - $row = Row::create(int_entry('a', 1), int_entry('b', 2)); + $row = row(int_entry('a', 1), int_entry('b', 2)); $this->assertSame( 1, @@ -90,7 +90,7 @@ public function test_power() : void public function test_round() : void { - $row = Row::create(float_entry('a', 1.009), int_entry('b', 2)); + $row = row(float_entry('a', 1.009), int_entry('b', 2)); $this->assertSame( 1.01, diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/ScalarFunctionsTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/ScalarFunctionsTest.php deleted file mode 100644 index 26a0d32c6..000000000 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/ScalarFunctionsTest.php +++ /dev/null @@ -1,66 +0,0 @@ -assertTrue( - (new ScalarFunctions( - ref('entry'), - new Cast(ref('entry'), 'string'), - new Equals(ref('entry'), lit(1)) - ))->eval(Row::create( - int_entry('entry', 1), - )) - ); - } - - public function test_evaluating_multiple_literal_functions() : void - { - $this->assertSame( - 'value3', - (new ScalarFunctions( - lit('value1'), - lit('value2'), - lit('value3'), - ))->eval(Row::create()) - ); - } - - public function test_evaluation_cast_expression() : void - { - $this->assertSame( - 1, - (new 
ScalarFunctions(new Cast(ref('entry'), 'int')))->eval(Row::create(str_entry('entry', '1'))) - ); - } - - public function test_evaluation_empty_expression() : void - { - $this->assertNull( - (new ScalarFunctions())->eval(Row::create(str_entry('entry', 'value'))) - ); - } - - public function test_evaluation_equals_expression() : void - { - $this->assertTrue( - (new ScalarFunctions(ref('entry')->equals(lit('1')))) - ->eval(Row::create(str_entry('entry', '1'))) - ); - } -} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/PartitionTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/PartitionTest.php index 236b66edf..43a631e04 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/PartitionTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/PartitionTest.php @@ -45,7 +45,7 @@ public function test_creating_partitions_from_uri_with_partitions() : void new Partition('country', 'US'), new Partition('age-range', '20-45'), ], - $partitions + $partitions->toArray() ); } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/Optimizer/LimitOptimizationTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/Optimizer/LimitOptimizationTest.php index a4b2fce53..f14ca96b3 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/Optimizer/LimitOptimizationTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/Optimizer/LimitOptimizationTest.php @@ -26,7 +26,7 @@ public function test_optimization_against_pipelines() : void (new LimitOptimization())->isFor(new LimitTransformer(10), new GroupByPipeline(new GroupBy(), new SynchronousPipeline())) ); $this->assertFalse( - (new LimitOptimization())->isFor(new LimitTransformer(10), new PartitioningPipeline(new SynchronousPipeline())) + (new LimitOptimization())->isFor(new LimitTransformer(10), new PartitioningPipeline(new SynchronousPipeline(), [ref('group')])) ); // Pipeline without extractor $this->assertFalse( diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Stream/PathTest.php 
b/src/core/etl/tests/Flow/ETL/Tests/Unit/Stream/PathTest.php index a4d878455..62f273969 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Stream/PathTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Stream/PathTest.php @@ -7,6 +7,7 @@ use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Filesystem\Path; use Flow\ETL\Partition; +use Flow\ETL\Partitions; use PHPUnit\Framework\TestCase; final class PathTest extends TestCase @@ -42,11 +43,11 @@ public static function paths_pattern_matching() : \Generator public static function paths_with_partitions() : \Generator { - yield '/' => ['/', []]; - yield 'file://path/without/partitions/file.csv' => ['file://path/without/partitions/file.csv', []]; - yield 'file://path/country=US/file.csv' => ['file://path/country=US/file.csv', [new Partition('country', 'US')]]; - yield 'file://path/country=US/region=america/file.csv' => ['file://path/country=US/region=america/file.csv', [new Partition('country', 'US'), new Partition('region', 'america')]]; - yield 'file://path/country=*/file.csv' => ['file://path/country=*/file.csv', []]; + yield '/' => ['/', new Partitions()]; + yield 'file://path/without/partitions/file.csv' => ['file://path/without/partitions/file.csv', new Partitions()]; + yield 'file://path/country=US/file.csv' => ['file://path/country=US/file.csv', new Partitions(new Partition('country', 'US'))]; + yield 'file://path/country=US/region=america/file.csv' => ['file://path/country=US/region=america/file.csv', new Partitions(new Partition('country', 'US'), new Partition('region', 'america'))]; + yield 'file://path/country=*/file.csv' => ['file://path/country=*/file.csv', new Partitions()]; } public static function paths_with_static_parts() : \Generator @@ -145,7 +146,7 @@ public function test_parsing_path(string $uri, string $schema, string $parsedUri /** * @dataProvider paths_with_partitions */ - public function test_partitions_in_path(string $uri, array $partitions) : void + public function 
test_partitions_in_path(string $uri, Partitions $partitions) : void { $this->assertEquals($partitions, (new Path($uri))->partitions()); }