From 8549dbc58d0e79881ce8b17768dfbb477d30144c Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Thu, 25 Apr 2024 17:09:13 +0200 Subject: [PATCH] Replaced nested loop join with hash join algorithm --- UPGRADE.md | 18 + src/core/etl/src/Flow/ETL/DSL/functions.php | 29 -- src/core/etl/src/Flow/ETL/DataFrame.php | 24 +- .../Exception/DuplicatedEntriesException.php | 9 + .../src/Flow/ETL/Exception/JoinException.php | 9 + src/core/etl/src/Flow/ETL/Hash/Algorithm.php | 10 + .../etl/src/Flow/ETL/Hash/NativePHPHash.php | 20 + src/core/etl/src/Flow/ETL/Hash/PlainText.php | 13 + .../Flow/ETL/Join/Comparison/GreaterThan.php | 39 -- .../ETL/Join/Comparison/GreaterThanEqual.php | 39 -- .../src/Flow/ETL/Join/Comparison/LessThan.php | 39 -- .../ETL/Join/Comparison/LessThanEqual.php | 39 -- src/core/etl/src/Flow/ETL/Join/Expression.php | 6 +- .../Flow/ETL/Pipeline/BatchingPipeline.php | 2 +- .../src/Flow/ETL/Pipeline/HashJoin/Bucket.php | 83 ++++ .../Flow/ETL/Pipeline/HashJoin/HashTable.php | 77 ++++ .../Flow/ETL/Pipeline/HashJoinPipeline.php | 152 +++++++ src/core/etl/src/Flow/ETL/Row/Entries.php | 4 +- .../etl/src/Flow/ETL/Row/EntryFactory.php | 3 +- .../ETL/Row/Factory/NativeEntryFactory.php | 12 +- src/core/etl/src/Flow/ETL/Rows.php | 78 ++-- .../ETL/Transformer/JoinRowsTransformer.php | 59 --- .../Integration/DataFrame/JoinCrossTest.php | 54 +++ .../Integration/DataFrame/JoinEachTest.php | 65 +++ .../Tests/Integration/DataFrame/JoinTest.php | 291 ++++++++---- .../Unit/Join/Comparison/GreaterTest.php | 39 -- .../Join/Comparison/GreaterThanEqualTest.php | 40 -- .../Tests/Unit/Join/Comparison/LessTest.php | 39 -- .../Join/Comparison/LessThanEqualTest.php | 40 -- .../Tests/Unit/Join/Comparison/NotTest.php | 33 -- .../Unit/Pipeline/HashJoin/HashTableTest.php | 48 ++ .../Flow/ETL/Tests/Unit/RowsJoinTest.php | 417 +++++++++--------- .../JoinEachRowsTransformerTest.php | 86 ++-- .../Transformer/JoinRowsTransformerTest.php | 94 ---- 34 files changed, 1083 insertions(+), 927 deletions(-) create mode 100644 src/core/etl/src/Flow/ETL/Exception/DuplicatedEntriesException.php create mode 100644 src/core/etl/src/Flow/ETL/Exception/JoinException.php create mode 100644 src/core/etl/src/Flow/ETL/Hash/Algorithm.php create mode 100644 src/core/etl/src/Flow/ETL/Hash/NativePHPHash.php create mode 100644 src/core/etl/src/Flow/ETL/Hash/PlainText.php delete mode 100644 src/core/etl/src/Flow/ETL/Join/Comparison/GreaterThan.php delete mode 100644 src/core/etl/src/Flow/ETL/Join/Comparison/GreaterThanEqual.php delete mode 100644 src/core/etl/src/Flow/ETL/Join/Comparison/LessThan.php delete mode 100644 src/core/etl/src/Flow/ETL/Join/Comparison/LessThanEqual.php create mode 100644 src/core/etl/src/Flow/ETL/Pipeline/HashJoin/Bucket.php create mode 100644 src/core/etl/src/Flow/ETL/Pipeline/HashJoin/HashTable.php create mode 100644 src/core/etl/src/Flow/ETL/Pipeline/HashJoinPipeline.php delete mode 100644 src/core/etl/src/Flow/ETL/Transformer/JoinRowsTransformer.php create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinCrossTest.php create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinEachTest.php delete mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/GreaterTest.php delete mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/GreaterThanEqualTest.php delete mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/LessTest.php delete mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/LessThanEqualTest.php delete mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/NotTest.php create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/HashJoin/HashTableTest.php delete mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/JoinRowsTransformerTest.php diff --git a/UPGRADE.md b/UPGRADE.md index 33a2ec42e..6eaaae6eb 100644 --- a/UPGRADE.md +++ b/UPGRADE.md @@ -5,6 +5,24 @@ Please follow the instructions for your specific version to ensure a smooth upgr --- +## Upgrading from 0.7.x to 0.8.x + +### 1) Joins + +In order to support joining bigger datasets, we had to move from initial NestedLoop join algorithm into Hash Join algorithm. + +- the only supported coin expression is `=` (equals) that can be grouped with `AND` and `OR` operators. +- `joinPrefix` is now always required, and by default is set to 'joined_' +- join will always result all columns from both datasets, columns used in join condition will be prefixed with `joinPrefix`. + +Other than that, API stays the same. + +Above changes were introduced in all 3 types of joins: + +- `DataFrame::join()` +- `DataFrame::joinEach()` +- `DataFrame::crossJoin()` + ## Upgrading from 0.6.x to 0.7.x ### 1) DataFrame::appendSafe() method was removed diff --git a/src/core/etl/src/Flow/ETL/DSL/functions.php b/src/core/etl/src/Flow/ETL/DSL/functions.php index 06d66f928..64f13f968 100644 --- a/src/core/etl/src/Flow/ETL/DSL/functions.php +++ b/src/core/etl/src/Flow/ETL/DSL/functions.php @@ -36,11 +36,7 @@ Formatter, Join\Comparison, Join\Comparison\Equal, - Join\Comparison\GreaterThan, - Join\Comparison\GreaterThanEqual, Join\Comparison\Identical, - Join\Comparison\LessThan, - Join\Comparison\LessThanEqual, Join\Expression, Loader, Partition, @@ -1163,31 +1159,6 @@ function compare_any(Comparison ...$comparisons) : Comparison\Any return new Comparison\Any(...$comparisons); } -function greater_than(Reference|string $left, Reference|string $right) : GreaterThan -{ - return new GreaterThan($left, $right); -} - -function greater_than_equal(Reference|string $left, Reference|string $right) : GreaterThanEqual -{ - return new GreaterThanEqual($left, $right); -} - -function less_than(Reference|string $left, Reference|string $right) : LessThan -{ - return new LessThan($left, $right); -} - -function less_than_equal(Reference|string $left, Reference|string $right) : LessThanEqual -{ - return new LessThanEqual($left, $right); -} - -function negation(Comparison $comparison) : Comparison\Not -{ - return new Comparison\Not($comparison); -} - function join_on(array|Comparison $comparisons, string $joinPrefix = '') : Expression { return Expression::on($comparisons, $joinPrefix); diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index 2178f9216..40e8de309 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -17,10 +17,16 @@ use Flow\ETL\Loader\StreamLoader\Output; use Flow\ETL\Partition\ScalarFunctionFilter; use Flow\ETL\PHP\Type\{AutoCaster, Caster}; -use Flow\ETL\Pipeline\{BatchingPipeline, CachingPipeline, CollectingPipeline, GroupByPipeline, PartitioningPipeline, VoidPipeline}; +use Flow\ETL\Pipeline\{BatchingPipeline, + CachingPipeline, + CollectingPipeline, + GroupByPipeline, + HashJoinPipeline, + PartitioningPipeline, + VoidPipeline}; use Flow\ETL\Row\{Reference, References, Schema}; use Flow\ETL\Transformer\StyleConverter\StringStyles; -use Flow\ETL\Transformer\{AutoCastTransformer, CallbackRowTransformer, CrossJoinRowsTransformer, DropDuplicatesTransformer, DropPartitionsTransformer, EntryNameStyleConverterTransformer, JoinEachRowsTransformer, JoinRowsTransformer, KeepEntriesTransformer, LimitTransformer, RemoveEntriesTransformer, RenameAllCaseTransformer, RenameEntryTransformer, RenameStrReplaceAllEntriesTransformer, ScalarFunctionFilterTransformer, ScalarFunctionTransformer, UntilTransformer, WindowFunctionTransformer}; +use Flow\ETL\Transformer\{AutoCastTransformer, CallbackRowTransformer, CrossJoinRowsTransformer, DropDuplicatesTransformer, DropPartitionsTransformer, EntryNameStyleConverterTransformer, JoinEachRowsTransformer, KeepEntriesTransformer, LimitTransformer, RemoveEntriesTransformer, RenameAllCaseTransformer, RenameEntryTransformer, RenameStrReplaceAllEntriesTransformer, ScalarFunctionFilterTransformer, ScalarFunctionTransformer, UntilTransformer, WindowFunctionTransformer}; use Flow\RDSL\AccessControl\{AllowAll, AllowList, DenyAll}; use Flow\RDSL\Attribute\DSLMethod; use Flow\RDSL\{Builder, DSLNamespace, Executor, Finder}; @@ -452,19 +458,11 @@ public function groupBy(string|Reference ...$entries) : GroupedDataFrame */ public function join(self $dataFrame, Expression $on, string|Join $type = Join::left) : self { - if ($type instanceof Join) { - $type = $type->name; + if (\is_string($type)) { + $type = Join::from($type); } - $transformer = match ($type) { - Join::left->value => JoinRowsTransformer::left($dataFrame, $on), - Join::left_anti->value => JoinRowsTransformer::leftAnti($dataFrame, $on), - Join::right->value => JoinRowsTransformer::right($dataFrame, $on), - Join::inner->value => JoinRowsTransformer::inner($dataFrame, $on), - default => throw new InvalidArgumentException('Unsupported join type') - }; - - $this->pipeline->add($transformer); + $this->pipeline = new HashJoinPipeline($this->pipeline, $dataFrame, $on, $type); return $this; } diff --git a/src/core/etl/src/Flow/ETL/Exception/DuplicatedEntriesException.php b/src/core/etl/src/Flow/ETL/Exception/DuplicatedEntriesException.php new file mode 100644 index 000000000..f72a2c3f0 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Exception/DuplicatedEntriesException.php @@ -0,0 +1,9 @@ +algorithm, $value, $this->binary, $this->options); + } +} diff --git a/src/core/etl/src/Flow/ETL/Hash/PlainText.php b/src/core/etl/src/Flow/ETL/Hash/PlainText.php new file mode 100644 index 000000000..a96a94952 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Hash/PlainText.php @@ -0,0 +1,13 @@ +valueOf($this->entryLeft) > $right->valueOf($this->entryRight); - } - - /** - * @return array - */ - public function left() : array - { - return [\is_string($this->entryLeft) ? EntryReference::init($this->entryLeft) : $this->entryLeft]; - } - - /** - * @return array - */ - public function right() : array - { - return [\is_string($this->entryRight) ? EntryReference::init($this->entryRight) : $this->entryRight]; - } -} diff --git a/src/core/etl/src/Flow/ETL/Join/Comparison/GreaterThanEqual.php b/src/core/etl/src/Flow/ETL/Join/Comparison/GreaterThanEqual.php deleted file mode 100644 index 109c8f087..000000000 --- a/src/core/etl/src/Flow/ETL/Join/Comparison/GreaterThanEqual.php +++ /dev/null @@ -1,39 +0,0 @@ -valueOf($this->entryLeft) >= $right->valueOf($this->entryRight); - } - - /** - * @return array - */ - public function left() : array - { - return [\is_string($this->entryLeft) ? EntryReference::init($this->entryLeft) : $this->entryLeft]; - } - - /** - * @return array - */ - public function right() : array - { - return [\is_string($this->entryRight) ? EntryReference::init($this->entryRight) : $this->entryRight]; - } -} diff --git a/src/core/etl/src/Flow/ETL/Join/Comparison/LessThan.php b/src/core/etl/src/Flow/ETL/Join/Comparison/LessThan.php deleted file mode 100644 index d3038de78..000000000 --- a/src/core/etl/src/Flow/ETL/Join/Comparison/LessThan.php +++ /dev/null @@ -1,39 +0,0 @@ -valueOf($this->entryLeft) < $right->valueOf($this->entryRight); - } - - /** - * @return array - */ - public function left() : array - { - return [\is_string($this->entryLeft) ? EntryReference::init($this->entryLeft) : $this->entryLeft]; - } - - /** - * @return array - */ - public function right() : array - { - return [\is_string($this->entryRight) ? EntryReference::init($this->entryRight) : $this->entryRight]; - } -} diff --git a/src/core/etl/src/Flow/ETL/Join/Comparison/LessThanEqual.php b/src/core/etl/src/Flow/ETL/Join/Comparison/LessThanEqual.php deleted file mode 100644 index 073effdbf..000000000 --- a/src/core/etl/src/Flow/ETL/Join/Comparison/LessThanEqual.php +++ /dev/null @@ -1,39 +0,0 @@ -valueOf($this->entryLeft) <= $right->valueOf($this->entryRight); - } - - /** - * @return array - */ - public function left() : array - { - return [\is_string($this->entryLeft) ? EntryReference::init($this->entryLeft) : $this->entryLeft]; - } - - /** - * @return array - */ - public function right() : array - { - return [\is_string($this->entryRight) ? EntryReference::init($this->entryRight) : $this->entryRight]; - } -} diff --git a/src/core/etl/src/Flow/ETL/Join/Expression.php b/src/core/etl/src/Flow/ETL/Join/Expression.php index b618a06b0..4519ef321 100644 --- a/src/core/etl/src/Flow/ETL/Join/Expression.php +++ b/src/core/etl/src/Flow/ETL/Join/Expression.php @@ -5,7 +5,7 @@ namespace Flow\ETL\Join; use Flow\ETL\Exception\RuntimeException; -use Flow\ETL\Join\Comparison\{All, Identical}; +use Flow\ETL\Join\Comparison\{All, Equal}; use Flow\ETL\Row; use Flow\ETL\Row\Reference; @@ -20,7 +20,7 @@ public function __construct( /** * @param array|array|Comparison $comparison */ - public static function on(array|Comparison $comparison, string $joinPrefix = '') : self + public static function on(array|Comparison $comparison, string $joinPrefix = 'joined_') : self { if (\is_array($comparison)) { /** @var array $comparisons */ @@ -41,7 +41,7 @@ public static function on(array|Comparison $comparison, string $joinPrefix = '') throw new RuntimeException('Expected right entry name to be string, got ' . \gettype($right) . ". Example: ['id' => 'id']"); } - $comparisons[] = new Identical($left, $right); + $comparisons[] = new Equal($left, $right); } return new self(new All(...$comparisons), $joinPrefix); diff --git a/src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php b/src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php index e2afd9c77..043758132 100644 --- a/src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php +++ b/src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php @@ -20,7 +20,7 @@ final class BatchingPipeline implements OverridingPipeline, Pipeline */ public function __construct(private readonly Pipeline $pipeline, private readonly int $size) { - $this->nextPipeline = $pipeline->cleanCopy(); + $this->nextPipeline = new SynchronousPipeline(); if ($this->size <= 0) { throw new InvalidArgumentException('Batch size must be greater than 0, given: ' . $this->size); diff --git a/src/core/etl/src/Flow/ETL/Pipeline/HashJoin/Bucket.php b/src/core/etl/src/Flow/ETL/Pipeline/HashJoin/Bucket.php new file mode 100644 index 000000000..7535029e0 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Pipeline/HashJoin/Bucket.php @@ -0,0 +1,83 @@ + + */ + private array $rowsArray; + + /** + * @var array + */ + private array $rowsMatches = []; + + /** + * @param string $hash - hash of the bucket calculated from join expression columns and row + */ + public function __construct(public readonly string $hash) + { + $this->rowsArray = []; + $this->rows = null; + } + + public function add(Row $row) : void + { + $this->rowsArray[$rowHash = $row->hash()] = $row; + $this->rowsMatches[$rowHash] = 0; + $this->rows = null; + } + + public function count() : int + { + return \count($this->rowsArray); + } + + public function findMatch(Row $row, Expression $expression) : ?Row + { + foreach ($this->rowsArray as $hash => $bucketRow) { + if ($expression->meet($row, $bucketRow)) { + $this->rowsMatches[$hash]++; + + return $bucketRow; + } + } + + return null; + } + + public function rows() : Rows + { + if ($this->rows === null) { + $this->rows = rows(...$this->rowsArray); + } + + return $this->rows; + } + + /** + * @return array + */ + public function unmatchedRows() : array + { + $unmatchedRows = []; + + foreach ($this->rowsArray as $hash => $bucketRow) { + if ($this->rowsMatches[$hash] === 0) { + $unmatchedRows[$hash] = $bucketRow; + } + } + + return $unmatchedRows; + } +} diff --git a/src/core/etl/src/Flow/ETL/Pipeline/HashJoin/HashTable.php b/src/core/etl/src/Flow/ETL/Pipeline/HashJoin/HashTable.php new file mode 100644 index 000000000..3defede07 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Pipeline/HashJoin/HashTable.php @@ -0,0 +1,77 @@ + + */ + private array $buckets; + + /** + * @var array + */ + private array $bucketsMatches; + + public function __construct(private readonly Algorithm $hashAlgorithm) + { + $this->buckets = []; + $this->bucketsMatches = []; + } + + public function add(Row $row, References $hashBy) : void + { + $hash = $this->hash($hashBy, $row); + + if (!\array_key_exists($hash, $this->buckets)) { + $this->buckets[$hash] = new Bucket($hash); + $this->bucketsMatches[$hash] = 0; + } + + $this->buckets[$hash]->add($row); + } + + public function bucketFor(Row $row, References $hashBy) : ?Bucket + { + $hash = $this->hash($hashBy, $row); + + if (!\array_key_exists($hash, $this->buckets)) { + return null; + } + + $this->bucketsMatches[$hash]++; + + return $this->buckets[$hash]; + } + + public function unmatchedRows() : Rows + { + $rows = []; + + foreach ($this->buckets as $hash => $bucket) { + if ($this->bucketsMatches[$hash] === 0) { + $rows = \array_merge($rows, $bucket->unmatchedRows()); + } + } + + return new Rows(...$rows); + } + + private function hash(References $hashBy, Row $row) : string + { + $value = ''; + + foreach ($hashBy->all() as $reference) { + $value .= $row->get($reference)->toString(); + } + + return $this->hashAlgorithm->hash($value); + } +} diff --git a/src/core/etl/src/Flow/ETL/Pipeline/HashJoinPipeline.php b/src/core/etl/src/Flow/ETL/Pipeline/HashJoinPipeline.php new file mode 100644 index 000000000..7ba9819a7 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Pipeline/HashJoinPipeline.php @@ -0,0 +1,152 @@ +extractor = from_rows(rows()); + } + + public function add(Loader|Transformer $pipe) : Pipeline + { + $this->left->pipes()->add($pipe); + + return $this; + } + + public function cleanCopy() : Pipeline + { + return new self($this->left->cleanCopy(), $this->right, $this->expression, $this->join); + } + + public function closure(FlowContext $context) : void + { + foreach ($this->left->pipes()->all() as $pipe) { + if ($pipe instanceof Loader && $pipe instanceof Closure) { + $pipe->closure($context); + } + } + } + + public function has(string $transformerClass) : bool + { + return $this->left->pipes()->has($transformerClass); + } + + public function pipes() : Pipes + { + return $this->left->pipes(); + } + + public function process(FlowContext $context) : \Generator + { + $leftReferences = refs(...$this->expression->left()); + $rightReferences = refs(...$this->expression->right()); + + $hashTable = new HashTable(new NativePHPHash()); + + $rightSchema = schema(); + + foreach ($this->right->getEach() as $rightRow) { + $hashTable->add($rightRow, $rightReferences); + $rightSchema = $rightSchema->merge($rightRow->schema()); + } + + /** @var array $leftEntries */ + $leftEntries = []; + /** @var array $rightEntries */ + $rightEntries = []; + + if ($this->join === Join::left) { + foreach ($rightSchema->definitions() as $rightEntryDefinition) { + $rightEntries[] = $context->entryFactory()->create($rightEntryDefinition->entry()->name(), null, $rightEntryDefinition->nullable()); + } + } + + $leftSchema = schema(); + + /** @var Rows $leftRows */ + foreach ($this->left->process($context) as $leftRows) { + foreach ($leftRows as $leftRow) { + $bucket = $hashTable->bucketFor($leftRow, $leftReferences); + + if ($bucket === null) { + if ($this->join === Join::left) { + $rightEmptyRow = row(...$rightEntries); + yield $this->createRows($leftRow, $rightEmptyRow); + } + + if ($this->join === Join::left_anti) { + yield rows($leftRow); + } + + continue; + } + + $rightRow = $bucket->findMatch($leftRow, $this->expression); + + if ($this->join === Join::left_anti) { + continue; + } + + if ($rightRow !== null) { + yield $this->createRows($leftRow, $rightRow); + } + } + + $leftSchema = $leftSchema->merge($leftRows->schema()); + } + + if ($this->join === Join::right) { + foreach ($leftSchema->definitions() as $leftEntryDefinition) { + $leftEntries[] = $context->entryFactory()->create($leftEntryDefinition->entry()->name(), null, $leftEntryDefinition->nullable()); + } + + foreach ($hashTable->unmatchedRows() as $unmatchedRow) { + $leftEmptyRow = row(...$leftEntries); + yield $this->createRows($leftEmptyRow, $unmatchedRow); + } + } + } + + public function setSource(Extractor $extractor) : Pipeline + { + $this->extractor = $extractor; + + return $this; + } + + public function source() : Extractor + { + return $this->extractor; + } + + private function createRows(Row $leftRow, Row $rightRow) : Rows + { + try { + return rows($leftRow->merge($rightRow, $this->expression->prefix())); + } catch (DuplicatedEntriesException $e) { + throw new JoinException($e->getMessage() . ' try to use a different join prefix than: "' . $this->expression->prefix() . '"', $e->getCode(), $e); + } + } +} diff --git a/src/core/etl/src/Flow/ETL/Row/Entries.php b/src/core/etl/src/Flow/ETL/Row/Entries.php index 8034b2dd1..f5cff6955 100644 --- a/src/core/etl/src/Flow/ETL/Row/Entries.php +++ b/src/core/etl/src/Flow/ETL/Row/Entries.php @@ -4,7 +4,7 @@ namespace Flow\ETL\Row; -use Flow\ETL\Exception\{InvalidArgumentException, InvalidLogicException, RuntimeException}; +use Flow\ETL\Exception\{DuplicatedEntriesException, InvalidArgumentException, InvalidLogicException, RuntimeException}; /** * @implements \ArrayAccess @@ -170,7 +170,7 @@ public function merge(self $entries) : self $newEntries = \array_merge($this->entries, $entries->entries); if (\count($newEntries) !== $this->count() + $entries->count()) { - throw InvalidArgumentException::because( + throw new DuplicatedEntriesException( \sprintf( 'Merged entries names must be unique, given: [%s] + [%s]', \implode(', ', \array_map(fn (Entry $entry) => $entry->name(), $this->entries)), diff --git a/src/core/etl/src/Flow/ETL/Row/EntryFactory.php b/src/core/etl/src/Flow/ETL/Row/EntryFactory.php index b893aca87..4fc4769e9 100644 --- a/src/core/etl/src/Flow/ETL/Row/EntryFactory.php +++ b/src/core/etl/src/Flow/ETL/Row/EntryFactory.php @@ -5,6 +5,7 @@ namespace Flow\ETL\Row; use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException, SchemaDefinitionNotFoundException}; +use Flow\ETL\Row\Schema\Definition; interface EntryFactory { @@ -13,5 +14,5 @@ interface EntryFactory * @throws RuntimeException * @throws SchemaDefinitionNotFoundException */ - public function create(string $entryName, mixed $value, ?Schema $schema = null) : Entry; + public function create(string $entryName, mixed $value, Schema|Definition|null $schema = null) : Entry; } diff --git a/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php b/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php index 72e7da7b2..8cbcf8670 100644 --- a/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php +++ b/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php @@ -10,7 +10,7 @@ use Flow\ETL\PHP\Type\Logical\{DateTimeType, JsonType, ListType, MapType, StructureType, UuidType, XMLNodeType, XMLType}; use Flow\ETL\PHP\Type\Native\{ArrayType, EnumType, ObjectType, ScalarType}; use Flow\ETL\PHP\Type\{Caster, TypeDetector}; -use Flow\ETL\Row\{Entry, EntryFactory, Schema}; +use Flow\ETL\Row\{Entry, EntryFactory, Schema, Schema\Definition}; use Ramsey\Uuid\UuidInterface; use Symfony\Component\Uid\Uuid; @@ -28,9 +28,13 @@ public function __construct() * @throws RuntimeException * @throws SchemaDefinitionNotFoundException */ - public function create(string $entryName, mixed $value, ?Schema $schema = null) : Entry + public function create(string $entryName, mixed $value, Schema|Definition|null $schema = null) : Entry { - if ($schema !== null) { + if ($schema instanceof Definition) { + return $this->fromDefinition($schema, $value); + } + + if ($schema instanceof Schema) { return $this->fromDefinition($schema->getDefinition($entryName), $value); } @@ -143,7 +147,7 @@ public function create(string $entryName, mixed $value, ?Schema $schema = null) throw new InvalidArgumentException("{$valueType->toString()} can't be converted to any known Entry"); } - private function fromDefinition(Schema\Definition $definition, mixed $value) : Entry + private function fromDefinition(Definition $definition, mixed $value) : Entry { $type = $definition->type(); diff --git a/src/core/etl/src/Flow/ETL/Rows.php b/src/core/etl/src/Flow/ETL/Rows.php index 4a174f58e..02b1a7a68 100644 --- a/src/core/etl/src/Flow/ETL/Rows.php +++ b/src/core/etl/src/Flow/ETL/Rows.php @@ -4,13 +4,13 @@ namespace Flow\ETL; -use function Flow\ETL\DSL\{array_to_rows, string_entry}; -use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException}; +use function Flow\ETL\DSL\{array_to_rows, row}; +use Flow\ETL\Exception\{DuplicatedEntriesException, InvalidArgumentException, RuntimeException}; use Flow\ETL\Join\Expression; use Flow\ETL\Partition\CartesianProduct; use Flow\ETL\Row\Comparator\NativeComparator; use Flow\ETL\Row\Factory\NativeEntryFactory; -use Flow\ETL\Row\{Comparator, Entries, EntryFactory, Entry\StringEntry, Reference, References, Schema, SortOrder}; +use Flow\ETL\Row\{Comparator, Entries, EntryFactory, Reference, References, Schema, SortOrder}; /** * @implements \ArrayAccess @@ -286,7 +286,7 @@ public function isPartitioned() : bool return \count($this->partitions) > 0; } - public function joinCross(self $right, string $joinPrefix = '') : self + public function joinCross(self $right, string $joinPrefix = 'joined_') : self { /** * @var array $joined @@ -331,11 +331,9 @@ public function joinInner(self $right, Expression $expression) : self foreach ($right as $rightRow) { if ($expression->meet($leftRow, $rightRow)) { try { - $joinedRow = $leftRow - ->merge($rightRow, $expression->prefix()) - ->remove(...\array_map(static fn (Reference $e) : string => $expression->prefix() . $e->name(), $expression->right())); - } catch (InvalidArgumentException $e) { - throw new InvalidArgumentException($e->getMessage() . '. Please consider using Condition, join prefix option'); + $joinedRow = $leftRow->merge($rightRow, $expression->prefix()); + } catch (DuplicatedEntriesException $e) { + throw new DuplicatedEntriesException($e->getMessage() . ' try to use a different join prefix than: "' . $expression->prefix() . '"'); } break; @@ -360,7 +358,7 @@ public function joinLeft(self $right, Expression $expression) : self */ $joined = []; - $rightSchema = $right->schema()->gracefulRemove(...$expression->right()); + $rightSchema = $right->schema(); foreach ($this->rows as $leftRow) { /** @var ?Row $joinedRow */ @@ -369,29 +367,28 @@ public function joinLeft(self $right, Expression $expression) : self foreach ($right as $rightRow) { if ($expression->meet($leftRow, $rightRow)) { try { - $joinedRow = $leftRow - ->merge($rightRow, $expression->prefix()) - ->remove(...\array_map(static fn (Reference $e) : string => $expression->prefix() . $e->name(), $expression->right())); - } catch (InvalidArgumentException $e) { - throw new InvalidArgumentException($e->getMessage() . '. Please consider using Condition, join prefix option'); + $joinedRow = $leftRow->merge($rightRow, $expression->prefix()); + } catch (DuplicatedEntriesException $e) { + throw new DuplicatedEntriesException($e->getMessage() . ' try to use a different join prefix than: "' . $expression->prefix() . '"'); } break; } } - $joined[] = $joinedRow ?: $leftRow->merge( - Row::create( - ...\array_map( - static fn (string $e) : StringEntry => string_entry($e, null), - \array_map( - static fn (Reference $r) : string => $r->name(), - $rightSchema->entries() - ) - ) - ), - $expression->prefix() - ); + if ($joinedRow === null) { + $entryFactory = new NativeEntryFactory(); + + $entries = []; + + foreach ($rightSchema->definitions() as $definition) { + $entries[] = $entryFactory->create($definition->entry()->name(), null, $definition->nullable()); + } + + $joinedRow = $leftRow->merge(row(...$entries), $expression->prefix()); + } + + $joined[] = $joinedRow; } return new self(...$joined); @@ -435,7 +432,7 @@ public function joinRight(self $right, Expression $expression) : self */ $joined = []; - $leftSchema = $this->schema()->gracefulRemove(...$expression->left()); + $leftSchema = $this->schema(); foreach ($right->rows as $rightRow) { /** @var ?Row $joinedRow */ @@ -444,13 +441,9 @@ public function joinRight(self $right, Expression $expression) : self foreach ($this->rows as $leftRow) { if ($expression->meet($leftRow, $rightRow)) { try { - $joinedRow = $rightRow - ->merge($leftRow, $expression->prefix()) - ->remove( - ...\array_map(static fn (Reference $e) : string => $expression->prefix() . $e->name(), $expression->left()) - ); - } catch (InvalidArgumentException $e) { - throw new InvalidArgumentException($e->getMessage() . '. Please consider using Condition, join prefix option'); + $joinedRow = $leftRow->merge($rightRow, $expression->prefix()); + } catch (DuplicatedEntriesException $e) { + throw new DuplicatedEntriesException($e->getMessage() . ' try to use a different join prefix than: "' . $expression->prefix() . '"'); } $joined[] = $joinedRow; @@ -458,12 +451,15 @@ public function joinRight(self $right, Expression $expression) : self } if ($joinedRow === null) { - $joined[] = $rightRow->merge( - Row::create( - ...\array_map(static fn (Reference $e) : StringEntry => string_entry($e->name(), null), $leftSchema->entries()) - ), - $expression->prefix() - ); + $entryFactory = new NativeEntryFactory(); + + $entries = []; + + foreach ($leftSchema->definitions() as $definition) { + $entries[] = $entryFactory->create($definition->entry()->name(), null, $definition->nullable()); + } + + $joined[] = row(...$entries)->merge($rightRow, $expression->prefix()); } } diff --git a/src/core/etl/src/Flow/ETL/Transformer/JoinRowsTransformer.php b/src/core/etl/src/Flow/ETL/Transformer/JoinRowsTransformer.php deleted file mode 100644 index d290e6f3d..000000000 --- a/src/core/etl/src/Flow/ETL/Transformer/JoinRowsTransformer.php +++ /dev/null @@ -1,59 +0,0 @@ -type) { - Join::left => $rows->joinLeft($this->rows(), $this->condition), - Join::left_anti => $rows->joinLeftAnti($this->rows(), $this->condition), - Join::right => $rows->joinRight($this->rows(), $this->condition), - default => $rows->joinInner($this->rows(), $this->condition), - }; - } - - private function rows() : Rows - { - if ($this->rows === null) { - $this->rows = $this->dataFrame->fetch(); - } - - return $this->rows; - } -} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinCrossTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinCrossTest.php new file mode 100644 index 000000000..42dc96a59 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinCrossTest.php @@ -0,0 +1,54 @@ +createMock(Loader::class); + $loader->expects(self::exactly(2)) + ->method('load'); + + $rows = df() + ->from(from_rows( + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'PL')), + row(int_entry('id', 4), str_entry('country', 'PL')), + ) + )) + ->batchSize(2) + ->crossJoin( + (new Flow())->process( + rows( + row(int_entry('num', 1), bool_entry('active', true)), + row(int_entry('num', 2), bool_entry('active', false)), + ) + ), + ) + ->write($loader) + ->fetch(); + + self::assertEquals( + [ + ['id' => 1, 'country' => 'PL', 'num' => 1, 'active' => true], + ['id' => 1, 'country' => 'PL', 'num' => 2, 'active' => false], + ['id' => 2, 'country' => 'PL', 'num' => 1, 'active' => true], + ['id' => 2, 'country' => 'PL', 'num' => 2, 'active' => false], + ['id' => 3, 'country' => 'PL', 'num' => 1, 'active' => true], + ['id' => 3, 'country' => 'PL', 'num' => 2, 'active' => false], + ['id' => 4, 'country' => 'PL', 'num' => 1, 'active' => true], + ['id' => 4, 'country' => 'PL', 'num' => 2, 'active' => false], + ], + $rows->toArray() + ); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinEachTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinEachTest.php new file mode 100644 index 000000000..c13a9b92c --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinEachTest.php @@ -0,0 +1,65 @@ +createMock(Loader::class); + $loader->expects(self::exactly(2)) + ->method('load'); + + $rows = df() + ->read(from_rows( + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'PL')), + row(int_entry('id', 4), str_entry('country', 'PL')), + row(int_entry('id', 5), str_entry('country', 'US')), + row(int_entry('id', 6), str_entry('country', 'US')), + row(int_entry('id', 7), str_entry('country', 'US')), + row(int_entry('id', 9), str_entry('country', 'US')), + ) + )) + ->batchSize(4) + ->joinEach( + new class implements DataFrameFactory { + public function from(Rows $rows) : DataFrame + { + return (new Flow())->process( + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + ) + ); + } + }, + Expression::on(['country' => 'code']), + ) + ->write($loader) + ->fetch(); + + self::assertEquals( + [ + ['id' => 1, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 2, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 3, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 4, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 5, 'country' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => 6, 'country' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => 7, 'country' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => 9, 'country' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ], + $rows->toArray() + ); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinTest.php index cf4fdbf0d..8941e0a2a 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/JoinTest.php @@ -4,14 +4,14 @@ namespace Flow\ETL\Tests\Integration\DataFrame; -use function Flow\ETL\DSL\{bool_entry, df, from_rows, int_entry, str_entry, string_entry}; +use function Flow\ETL\DSL\{datetime_entry, df, from_rows, int_entry, row, rows, str_entry}; use Flow\ETL\Join\Expression; use Flow\ETL\Tests\Integration\IntegrationTestCase; -use Flow\ETL\{DataFrame, DataFrameFactory, Flow, Loader, Row, Rows}; +use Flow\ETL\{Flow, Join\Join, Loader}; final class JoinTest extends IntegrationTestCase { - public function test_cross_join() : void + public function test_join_inner() : void { $loader = $this->createMock(Loader::class); $loader->expects(self::exactly(2)) @@ -19,41 +19,41 @@ public function test_cross_join() : void $rows = df() ->from(from_rows( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'PL')), - Row::create(int_entry('id', 4), str_entry('country', 'PL')), + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), + row(int_entry('id', 4), str_entry('country', 'UK')), + row(int_entry('id', 5), str_entry('country', 'GB')), ) )) - ->batchSize(2) - ->crossJoin( + ->batchSize(4) + ->join( (new Flow())->process( - new Rows( - Row::create(int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('num', 2), bool_entry('active', false)), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'FR'), str_entry('name', 'France')), + row(str_entry('code', 'CN'), str_entry('name', 'Canada')), ) ), + Expression::on(['country' => 'code']), + Join::inner ) ->write($loader) ->fetch(); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('id', 1), str_entry('country', 'PL'), int_entry('num', 2), bool_entry('active', false)), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), int_entry('num', 2), bool_entry('active', false)), - Row::create(int_entry('id', 3), str_entry('country', 'PL'), int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('id', 3), str_entry('country', 'PL'), int_entry('num', 2), bool_entry('active', false)), - Row::create(int_entry('id', 4), str_entry('country', 'PL'), int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('id', 4), str_entry('country', 'PL'), int_entry('num', 2), bool_entry('active', false)), - ), - $rows + [ + ['id' => 1, 'joined_name' => 'Poland', 'country' => 'PL', 'joined_code' => 'PL'], + ['id' => 2, 'joined_name' => 'United States', 'country' => 'US', 'joined_code' => 'US'], + ['id' => 3, 'joined_name' => 'France', 'country' => 'FR', 'joined_code' => 'FR'], + ], + $rows->toArray() ); } - public function test_join() : void + public function test_join_left() : void { $loader = $this->createMock(Loader::class); $loader->expects(self::exactly(2)) @@ -61,94 +61,221 @@ public function test_join() : void $rows = df() ->from(from_rows( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'PL')), - Row::create(int_entry('id', 4), str_entry('country', 'PL')), - Row::create(int_entry('id', 5), str_entry('country', 'US')), - Row::create(int_entry('id', 6), str_entry('country', 'US')), - Row::create(int_entry('id', 7), str_entry('country', 'US')), - Row::create(int_entry('id', 9), str_entry('country', 'US')), + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'PL')), + row(int_entry('id', 4), str_entry('country', 'PL')), + row(int_entry('id', 5), str_entry('country', 'US')), + row(int_entry('id', 6), str_entry('country', 'US')), + row(int_entry('id', 7), str_entry('country', 'US')), + row(int_entry('id', 9), str_entry('country', 'US')), ) )) ->batchSize(4) ->join( (new Flow())->process( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + ) + ), + Expression::on(['country' => 'code']), + ) + ->write($loader) + ->fetch(); + + self::assertEquals( + [ + ['id' => 1, 'country' => 'PL', 'joined_name' => 'Poland', 'joined_code' => 'PL'], + ['id' => 2, 'country' => 'PL', 'joined_name' => 'Poland', 'joined_code' => 'PL'], + ['id' => 3, 'country' => 'PL', 'joined_name' => 'Poland', 'joined_code' => 'PL'], + ['id' => 4, 'country' => 'PL', 'joined_name' => 'Poland', 'joined_code' => 'PL'], + ['id' => 5, 'country' => 'US', 'joined_name' => 'United States', 'joined_code' => 'US'], + ['id' => 6, 'country' => 'US', 'joined_name' => 'United States', 'joined_code' => 'US'], + ['id' => 7, 'country' => 'US', 'joined_name' => 'United States', 'joined_code' => 'US'], + ['id' => 9, 'country' => 'US', 'joined_name' => 'United States', 'joined_code' => 'US'], + ], + $rows->toArray() + ); + } + + public function test_join_left_anti() : void + { + $loader = $this->createMock(Loader::class); + $loader->expects(self::exactly(1)) + ->method('load'); + + $rows = df() + ->from(from_rows( + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), + row(int_entry('id', 5), str_entry('country', 'GB')), + row(int_entry('id', 7), str_entry('country', 'CN')), + ) + )) + ->join( + (new Flow())->process( + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'FR'), str_entry('name', 'France')), + row(str_entry('code', 'CA'), str_entry('name', 'Canada')), ) ), Expression::on(['country' => 'code']), + Join::left_anti ) + ->batchSize(2) ->write($loader) ->fetch(); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), string_entry('country', 'PL'), string_entry('name', 'Poland')), - Row::create(int_entry('id', 2), string_entry('country', 'PL'), string_entry('name', 'Poland')), - Row::create(int_entry('id', 3), string_entry('country', 'PL'), string_entry('name', 'Poland')), - Row::create(int_entry('id', 4), string_entry('country', 'PL'), string_entry('name', 'Poland')), - Row::create(int_entry('id', 5), string_entry('country', 'US'), string_entry('name', 'United States')), - Row::create(int_entry('id', 6), string_entry('country', 'US'), string_entry('name', 'United States')), - Row::create(int_entry('id', 7), string_entry('country', 'US'), string_entry('name', 'United States')), - Row::create(int_entry('id', 9), string_entry('country', 'US'), string_entry('name', 'United States')), - ), - $rows + [ + ['id' => 5, 'country' => 'GB'], + ['id' => 7, 'country' => 'CN'], + ], + $rows->toArray() ); } - public function test_join_each() : void + public function test_join_left_on_date_time_entry() : void { $loader = $this->createMock(Loader::class); $loader->expects(self::exactly(2)) ->method('load'); $rows = df() - ->read(from_rows( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'PL')), - Row::create(int_entry('id', 4), str_entry('country', 'PL')), - Row::create(int_entry('id', 5), str_entry('country', 'US')), - Row::create(int_entry('id', 6), str_entry('country', 'US')), - Row::create(int_entry('id', 7), str_entry('country', 'US')), - Row::create(int_entry('id', 9), str_entry('country', 'US')), + ->from(from_rows( + rows( + row(int_entry('id', 1), datetime_entry('date', new \DateTimeImmutable('2024-01-01 00:00:00'))), + row(int_entry('id', 2), datetime_entry('date', new \DateTimeImmutable('2024-01-01 00:00:00'))), + row(int_entry('id', 3), datetime_entry('date', new \DateTimeImmutable('2024-01-02 00:00:00'))), + row(int_entry('id', 4), datetime_entry('date', new \DateTimeImmutable('2024-01-03 00:00:00'))), + row(int_entry('id', 5), datetime_entry('date', new \DateTimeImmutable('2024-01-04 00:00:00'))), + row(int_entry('id', 6), datetime_entry('date', new \DateTimeImmutable('2024-01-04 00:00:00'))), + row(int_entry('id', 7), datetime_entry('date', new \DateTimeImmutable('2024-01-05 00:00:00'))), + row(int_entry('id', 9), datetime_entry('date', new \DateTimeImmutable('2024-01-05 00:00:00'))), ) )) ->batchSize(4) - ->joinEach( - new class implements DataFrameFactory { - public function from(Rows $rows) : DataFrame - { - return (new Flow())->process( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - ) - ); - } - }, + ->join( + (new Flow())->process( + rows( + row(datetime_entry('date', new \DateTimeImmutable('2024-01-01 00:00:00')), int_entry('events', 1)), + row(datetime_entry('date', new \DateTimeImmutable('2024-01-05 00:00:00')), int_entry('events', 5)), + ) + ), + Expression::on(['date' => 'date']), + Join::left + ) + ->write($loader) + ->fetch(); + + self::assertEquals( + [ + ['id' => 1, 'date' => new \DateTimeImmutable('2024-01-01 00:00:00'), 'joined_date' => new \DateTimeImmutable('2024-01-01 00:00:00'), 'joined_events' => 1], + ['id' => 2, 'date' => new \DateTimeImmutable('2024-01-01 00:00:00'), 'joined_date' => new \DateTimeImmutable('2024-01-01 00:00:00'), 'joined_events' => 1], + ['id' => 3, 'date' => new \DateTimeImmutable('2024-01-02 00:00:00'), 'joined_date' => null, 'joined_events' => null], + ['id' => 4, 'date' => new \DateTimeImmutable('2024-01-03 00:00:00'), 'joined_date' => null, 'joined_events' => null], + ['id' => 5, 'date' => new \DateTimeImmutable('2024-01-04 00:00:00'), 'joined_date' => null, 'joined_events' => null], + ['id' => 6, 'date' => new \DateTimeImmutable('2024-01-04 00:00:00'), 'joined_date' => null, 'joined_events' => null], + ['id' => 7, 'date' => new \DateTimeImmutable('2024-01-05 00:00:00'), 'joined_date' => new \DateTimeImmutable('2024-01-05 00:00:00'), 'joined_events' => 5], + ['id' => 9, 'date' => new \DateTimeImmutable('2024-01-05 00:00:00'), 'joined_date' => new \DateTimeImmutable('2024-01-05 00:00:00'), 'joined_events' => 5], + ], + $rows->toArray() + ); + } + + public function test_join_on_same_column_name() : void + { + $loader = $this->createMock(Loader::class); + $loader->expects(self::exactly(2)) + ->method('load'); + + $rows = df() + ->from(from_rows( + rows( + row(int_entry('id', 1), str_entry('code', 'PL')), + row(int_entry('id', 2), str_entry('code', 'PL')), + row(int_entry('id', 3), str_entry('code', 'PL')), + row(int_entry('id', 4), str_entry('code', 'PL')), + row(int_entry('id', 5), str_entry('code', 'US')), + row(int_entry('id', 6), str_entry('code', 'US')), + row(int_entry('id', 7), str_entry('code', 'US')), + row(int_entry('id', 9), str_entry('code', 'US')), + ) + )) + ->join( + (new Flow())->process( + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + ) + ), + Expression::on(['code' => 'code']), + ) + ->batchSize(4) + ->write($loader) + ->fetch(); + + self::assertEquals( + [ + ['id' => 1, 'code' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 2, 'code' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 3, 'code' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 4, 'code' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 5, 'code' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => 6, 'code' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => 7, 'code' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => 9, 'code' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ], + $rows->toArray() + ); + } + + public function test_join_right() : void + { + $loader = $this->createMock(Loader::class); + $loader->expects(self::exactly(2)) + ->method('load'); + + $rows = df() + ->from(from_rows( + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), + row(int_entry('id', 4), str_entry('country', 'UK')), + row(int_entry('id', 5), str_entry('country', 'GB')), + ) + )) + ->join( + (new Flow())->process( + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'FR'), str_entry('name', 'France')), + row(str_entry('code', 'CA'), str_entry('name', 'Canada')), + ) + ), Expression::on(['country' => 'code']), + Join::right ) + ->batchSize(2) ->write($loader) ->fetch(); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 3), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 4), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 5), str_entry('country', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 6), str_entry('country', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 7), str_entry('country', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 9), str_entry('country', 'US'), str_entry('name', 'United States')), - ), - $rows + [ + ['id' => 1, 'joined_code' => 'PL', 'joined_name' => 'Poland', 'country' => 'PL'], + ['id' => 2, 'joined_code' => 'US', 'joined_name' => 'United States', 'country' => 'US'], + ['id' => 3, 'joined_code' => 'FR', 'joined_name' => 'France', 'country' => 'FR'], + ['id' => null, 'joined_code' => 'CA', 'joined_name' => 'Canada', 'country' => null], + ], + $rows->toArray() ); } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/GreaterTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/GreaterTest.php deleted file mode 100644 index 8c497f810..000000000 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/GreaterTest.php +++ /dev/null @@ -1,39 +0,0 @@ -compare( - Row::create(int_entry('id', 1)), - Row::create(int_entry('id', 2)), - ) - ); - self::assertFalse( - (new GreaterThan('id', 'id'))->compare( - Row::create(int_entry('id', 1)), - Row::create(int_entry('id', 1)), - ) - ); - } - - public function test_success() : void - { - self::assertTrue( - (new GreaterThan('id', 'id'))->compare( - Row::create(int_entry('id', 5)), - Row::create(int_entry('id', 1)), - ) - ); - } -} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/GreaterThanEqualTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/GreaterThanEqualTest.php deleted file mode 100644 index 8016927fc..000000000 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/GreaterThanEqualTest.php +++ /dev/null @@ -1,40 +0,0 @@ -compare( - Row::create(int_entry('id', 1)), - Row::create(int_entry('id', 2)), - ) - ); - } - - public function test_success() : void - { - self::assertTrue( - (new GreaterThanEqual('id', 'id'))->compare( - Row::create(int_entry('id', 5)), - Row::create(int_entry('id', 1)), - ) - ); - - self::assertTrue( - (new GreaterThanEqual('id', 'id'))->compare( - Row::create(int_entry('id', 1)), - Row::create(int_entry('id', 1)), - ) - ); - } -} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/LessTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/LessTest.php deleted file mode 100644 index 24f1b3c6c..000000000 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/LessTest.php +++ /dev/null @@ -1,39 +0,0 @@ -compare( - Row::create(int_entry('id', 2)), - Row::create(int_entry('id', 1)), - ) - ); - self::assertFalse( - (new LessThan('id', 'id'))->compare( - Row::create(int_entry('id', 1)), - Row::create(int_entry('id', 1)), - ) - ); - } - - public function test_success() : void - { - self::assertTrue( - (new LessThan('id', 'id'))->compare( - Row::create(int_entry('id', 1)), - Row::create(int_entry('id', 5)), - ) - ); - } -} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/LessThanEqualTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/LessThanEqualTest.php deleted file mode 100644 index c632bac08..000000000 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/LessThanEqualTest.php +++ /dev/null @@ -1,40 +0,0 @@ -compare( - Row::create(int_entry('id', 2)), - Row::create(int_entry('id', 1)), - ) - ); - } - - public function test_success() : void - { - self::assertTrue( - (new LessThanEqual('id', 'id'))->compare( - Row::create(int_entry('id', 1)), - Row::create(int_entry('id', 5)), - ) - ); - - self::assertTrue( - (new LessThanEqual('id', 'id'))->compare( - Row::create(int_entry('id', 1)), - Row::create(int_entry('id', 1)), - ) - ); - } -} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/NotTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/NotTest.php deleted file mode 100644 index de0246c95..000000000 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Join/Comparison/NotTest.php +++ /dev/null @@ -1,33 +0,0 @@ -compare( - Row::create(datetime_entry('datetime', $datetime = new \DateTimeImmutable('2022-10-01 00:00:00'))), - Row::create(datetime_entry('datetime', $datetime)), - ) - ); - } - - public function test_success() : void - { - self::assertTrue( - (new Not(new Equal('datetime', 'datetime')))->compare( - Row::create(datetime_entry('datetime', new \DateTimeImmutable('2022-10-01 00:00:00'))), - Row::create(datetime_entry('datetime', new \DateTimeImmutable('2022-10-01 01:00:00'))), - ) - ); - } -} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/HashJoin/HashTableTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/HashJoin/HashTableTest.php new file mode 100644 index 000000000..4408b73e3 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/HashJoin/HashTableTest.php @@ -0,0 +1,48 @@ +add(row(int_entry('id', 1), str_entry('value', '1')), refs('id')); + $hashTable->add(row(int_entry('id', 1), str_entry('value', '2')), refs('id')); + $hashTable->add(row(int_entry('id', 1), str_entry('value', '3')), refs('id')); + + $hashTable->add(row(int_entry('id', 2)), refs('id')); + $hashTable->add(row(int_entry('id', 2)), refs('id')); + + $hashTable->add(row(int_entry('id', 3), str_entry('value', '1')), refs('id')); + $hashTable->add(row(int_entry('id', 3), str_entry('value', '2')), refs('id')); + $hashTable->add(row(int_entry('id', 3), str_entry('value', '1')), refs('id')); + + self::assertCount(3, $hashTable->bucketFor(row(int_entry('id', 1)), refs('id'))); + self::assertCount(1, $hashTable->bucketFor(row(int_entry('id', 2)), refs('id'))); + self::assertCount(2, $hashTable->bucketFor(row(int_entry('id', 3)), refs('id'))); + self::assertNull($hashTable->bucketFor(row(int_entry('id', 4)), refs('id'))); + } + + public function test_using_different_references_to_hash_row() : void + { + $hashTable = new HashTable(new PlainText()); + + $hashTable->add(row(int_entry('id', 1)), refs('id')); + $hashTable->add(row(int_entry('id', 1)), refs('id')); + + $hashTable->add(row(int_entry('id', 2), str_entry('value', '1')), refs('id')); + $hashTable->add(row(int_entry('id', 2), str_entry('value', '2')), refs('id')); + + self::assertCount(1, $hashTable->bucketFor(row(int_entry('identifier', 1)), refs('identifier'))); + self::assertCount(2, $hashTable->bucketFor(row(int_entry('identifier', 2)), refs('identifier'))); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/RowsJoinTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/RowsJoinTest.php index 6b24bc41e..d96297be4 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/RowsJoinTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/RowsJoinTest.php @@ -4,90 +4,90 @@ namespace Flow\ETL\Tests\Unit; -use function Flow\ETL\DSL\{bool_entry, int_entry, str_entry}; -use Flow\ETL\Exception\InvalidArgumentException; +use function Flow\ETL\DSL\{bool_entry, int_entry, row, rows, str_entry}; +use Flow\ETL\Exception\{DuplicatedEntriesException, InvalidArgumentException}; use Flow\ETL\Join\Expression; -use Flow\ETL\{Row, Rows}; use PHPUnit\Framework\TestCase; final class RowsJoinTest extends TestCase { public function test_cross_join() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ); $joined = $left->joinCross( - new Rows( - Row::create(int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('num', 2), bool_entry('active', false)), + rows( + row(int_entry('num', 1), bool_entry('active', true)), + row(int_entry('num', 2), bool_entry('active', false)), ), ); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('id', 1), str_entry('country', 'PL'), int_entry('num', 2), bool_entry('active', false)), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), int_entry('num', 2), bool_entry('active', false)), - Row::create(int_entry('id', 3), str_entry('country', 'US'), int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('id', 3), str_entry('country', 'US'), int_entry('num', 2), bool_entry('active', false)), - Row::create(int_entry('id', 4), str_entry('country', 'FR'), int_entry('num', 1), bool_entry('active', true)), - Row::create(int_entry('id', 4), str_entry('country', 'FR'), int_entry('num', 2), bool_entry('active', false)), - ), - $joined, + [ + ['id' => 1, 'country' => 'PL', 'joined_num' => 1, 'joined_active' => true], + ['id' => 1, 'country' => 'PL', 'joined_num' => 2, 'joined_active' => false], + ['id' => 2, 'country' => 'PL', 'joined_num' => 1, 'joined_active' => true], + ['id' => 2, 'country' => 'PL', 'joined_num' => 2, 'joined_active' => false], + ['id' => 3, 'country' => 'US', 'joined_num' => 1, 'joined_active' => true], + ['id' => 3, 'country' => 'US', 'joined_num' => 2, 'joined_active' => false], + ['id' => 4, 'country' => 'FR', 'joined_num' => 1, 'joined_active' => true], + ['id' => 4, 'country' => 'FR', 'joined_num' => 2, 'joined_active' => false], + + ], + $joined->toArray(), ); } public function test_cross_join_empty() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ); $joined = $left->joinCross( - new Rows(), + rows(), ); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), - ), - $joined + [ + ['id' => 1, 'country' => 'PL'], + ['id' => 2, 'country' => 'PL'], + ['id' => 3, 'country' => 'US'], + ['id' => 4, 'country' => 'FR'], + ], + $joined->toArray() ); } public function test_cross_join_left_empty() : void { - $left = new Rows(); + $left = rows(); $joined = $left->joinCross( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ), ); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), - ), - $joined + [ + ['id' => 1, 'country' => 'PL'], + ['id' => 2, 'country' => 'PL'], + ['id' => 3, 'country' => 'US'], + ['id' => 4, 'country' => 'FR'], + ], + $joined->toArray() ); } @@ -95,155 +95,156 @@ public function test_cross_join_left_with_name_conflict() : void { $this->expectExceptionMessage('Merged entries names must be unique, given: [id, country, active] + [active]. Please consider using join prefix option'); - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), bool_entry('active', false)), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), bool_entry('active', false)), - Row::create(int_entry('id', 3), str_entry('country', 'US'), bool_entry('active', false)), - Row::create(int_entry('id', 4), str_entry('country', 'FR'), bool_entry('active', false)), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL'), bool_entry('active', false)), + row(int_entry('id', 2), str_entry('country', 'PL'), bool_entry('active', false)), + row(int_entry('id', 3), str_entry('country', 'US'), bool_entry('active', false)), + row(int_entry('id', 4), str_entry('country', 'FR'), bool_entry('active', false)), ); $joined = $left->joinCross( - new Rows( - Row::create(bool_entry('active', true)) + rows( + row(bool_entry('active', true)) ), + '' ); } public function test_cross_join_left_with_name_conflict_with_prefix() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), bool_entry('active', false)), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), bool_entry('active', false)), - Row::create(int_entry('id', 3), str_entry('country', 'US'), bool_entry('active', false)), - Row::create(int_entry('id', 4), str_entry('country', 'FR'), bool_entry('active', false)), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL'), bool_entry('active', false)), + row(int_entry('id', 2), str_entry('country', 'PL'), bool_entry('active', false)), + row(int_entry('id', 3), str_entry('country', 'US'), bool_entry('active', false)), + row(int_entry('id', 4), str_entry('country', 'FR'), bool_entry('active', false)), ); $joined = $left->joinCross( - new Rows( - Row::create(bool_entry('active', true)) + rows( + row(bool_entry('active', true)) ), '_' ); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), bool_entry('active', false), bool_entry('_active', true)), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), bool_entry('active', false), bool_entry('_active', true)), - Row::create(int_entry('id', 3), str_entry('country', 'US'), bool_entry('active', false), bool_entry('_active', true)), - Row::create(int_entry('id', 4), str_entry('country', 'FR'), bool_entry('active', false), bool_entry('_active', true)), - ), - $joined + [ + ['id' => 1, 'country' => 'PL', 'active' => false, '_active' => true], + ['id' => 2, 'country' => 'PL', 'active' => false, '_active' => true], + ['id' => 3, 'country' => 'US', 'active' => false, '_active' => true], + ['id' => 4, 'country' => 'FR', 'active' => false, '_active' => true], + ], + $joined->toArray() ); } public function test_inner_empty() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ); $joined = $left->joinInner( - new Rows(), + rows(), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows(), + rows(), $joined ); } public function test_inner_join() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ); $joined = $left->joinInner( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 2), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 3), str_entry('country', 'US'), str_entry('name', 'United States')), - ), - $joined + [ + ['id' => 1, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 2, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 3, 'country' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ], + $joined->toArray() ); } public function test_inner_join_into_empty() : void { - $left = new Rows(); + $left = rows(); $joined = $left->joinInner( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows(), + rows(), $joined ); } public function test_inner_join_with_duplicated_entries() : void { - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('Merged entries names must be unique, given: [id, country] + [id, code, name]. Please consider using Condition, join prefix option'); + $this->expectException(DuplicatedEntriesException::class); + $this->expectExceptionMessage('Merged entries names must be unique, given: [id, country] + [id, code, name] try to use a different join prefix than: ""'); - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ); $left->joinInner( - new Rows( - Row::create(int_entry('id', 101), str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 102), str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 103), str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(int_entry('id', 101), str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(int_entry('id', 102), str_entry('code', 'US'), str_entry('name', 'United States')), + row(int_entry('id', 103), str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), - Expression::on(['country' => 'code']) + Expression::on(['country' => 'code'], joinPrefix: '') ); } public function test_left_anti_join() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ); $joined = $left->joinLeftAnti( - new Rows( - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'FR'), str_entry('name', 'France')), + rows( + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'FR'), str_entry('name', 'France')), ), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), ), $joined ); @@ -251,14 +252,14 @@ public function test_left_anti_join() : void public function test_left_anti_join_on_empty() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ); $joined = $left->joinLeftAnti( - new Rows(), + rows(), Expression::on(['country' => 'code']) ); @@ -270,49 +271,49 @@ public function test_left_anti_join_on_empty() : void public function test_left_join() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ); $joined = $left->joinLeft( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 2), str_entry('country', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 3), str_entry('country', 'FR'), str_entry('name', null)), - ), - $joined + [ + ['id' => 1, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 2, 'country' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => 3, 'country' => 'FR', 'joined_code' => null, 'joined_name' => null], + ], + $joined->toArray() ); } public function test_left_join_empty() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ); $joined = $left->joinLeft( - new Rows(), + rows(), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ), $joined ); @@ -320,19 +321,19 @@ public function test_left_join_empty() : void public function test_left_join_to_empty() : void { - $left = new Rows(); + $left = rows(); $joined = $left->joinLeft( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows(), + rows(), $joined ); } @@ -340,115 +341,115 @@ public function test_left_join_to_empty() : void public function test_left_join_with_the_duplicated_columns() : void { $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('Merged entries names must be unique, given: [id, country] + [id, code, name]. Please consider using Condition, join prefix option'); + $this->expectExceptionMessage('Merged entries names must be unique, given: [id, country] + [id, code, name] try to use a different join prefix than: ""'); - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ); $left->joinLeft( - new Rows( - Row::create(int_entry('id', 100), str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 101), str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 102), str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(int_entry('id', 100), str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(int_entry('id', 101), str_entry('code', 'US'), str_entry('name', 'United States')), + row(int_entry('id', 102), str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), - Expression::on(['country' => 'code']) + Expression::on(['country' => 'code'], '') ); } public function test_right_join() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ); $joined = $left->joinRight( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland'), int_entry('id', 1)), - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland'), int_entry('id', 2)), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States'), int_entry('id', 3)), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain'), str_entry('id', null)), - ), - $joined + [ + ['id' => 1, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 2, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 3, 'country' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => null, 'country' => null, 'joined_code' => 'GB', 'joined_name' => 'Great Britain'], + ], + $joined->toArray() ); } public function test_right_join_empty() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ); $joined = $left->joinRight( - new Rows(), + rows(), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows(), + rows(), $joined ); } public function test_right_join_to_empty() : void { - $left = new Rows(); + $left = rows(); $joined = $left->joinRight( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), Expression::on(['country' => 'code']) ); self::assertEquals( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), - ), - $joined + [ + ['joined_code' => 'PL', 'joined_name' => 'Poland'], + ['joined_code' => 'US', 'joined_name' => 'United States'], + ['joined_code' => 'GB', 'joined_name' => 'Great Britain'], + ], + $joined->toArray() ); } public function test_right_join_with_duplicated_entry_names() : void { - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('Merged entries names must be unique, given: [id, code, name] + [id, country]. Please consider using Condition, join prefix option'); + $this->expectException(DuplicatedEntriesException::class); + $this->expectExceptionMessage('erged entries names must be unique, given: [id, country] + [id, code, name] try to use a different join prefix than: ""'); - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'PL')), - Row::create(int_entry('id', 3), str_entry('country', 'US')), - Row::create(int_entry('id', 4), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'PL')), + row(int_entry('id', 3), str_entry('country', 'US')), + row(int_entry('id', 4), str_entry('country', 'FR')), ); $left->joinRight( - new Rows( - Row::create(int_entry('id', 101), str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 102), str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 103), str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(int_entry('id', 101), str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(int_entry('id', 102), str_entry('code', 'US'), str_entry('name', 'United States')), + row(int_entry('id', 103), str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ), - Expression::on(['country' => 'code']) + Expression::on(['country' => 'code'], '') ); } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/JoinEachRowsTransformerTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/JoinEachRowsTransformerTest.php index 346f010f4..2e6a06b09 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/JoinEachRowsTransformerTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/JoinEachRowsTransformerTest.php @@ -4,30 +4,30 @@ namespace Flow\ETL\Tests\Unit\Transformer; -use function Flow\ETL\DSL\{int_entry, str_entry}; +use function Flow\ETL\DSL\{int_entry, row, rows, str_entry}; use Flow\ETL\Join\Expression; use Flow\ETL\Transformer\JoinEachRowsTransformer; -use Flow\ETL\{Config, DataFrame, DataFrameFactory, Flow, FlowContext, Row, Rows}; +use Flow\ETL\{Config, DataFrame, DataFrameFactory, Flow, FlowContext, Rows}; use PHPUnit\Framework\TestCase; final class JoinEachRowsTransformerTest extends TestCase { public function test_inner_join_rows() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ); $right = new class implements DataFrameFactory { public function from(Rows $rows) : DataFrame { return (new Flow())->process( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ) ); } @@ -36,29 +36,29 @@ public function from(Rows $rows) : DataFrame $transformer = JoinEachRowsTransformer::inner($right, Expression::on(['country' => 'code'])); self::assertEquals( - new Rows( - Row::create(str_entry('name', 'Poland'), int_entry('id', 1), str_entry('country', 'PL')), - Row::create(str_entry('name', 'United States'), int_entry('id', 2), str_entry('country', 'US')), - ), - $transformer->transform($left, new FlowContext(Config::default())) + [ + ['id' => 1, 'country' => 'PL', 'joined_name' => 'Poland', 'joined_code' => 'PL'], + ['id' => 2, 'country' => 'US', 'joined_name' => 'United States', 'joined_code' => 'US'], + ], + $transformer->transform($left, new FlowContext(Config::default()))->toArray() ); } public function test_left_join_rows() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ); $right = new class implements DataFrameFactory { public function from(Rows $rows) : DataFrame { return (new Flow())->process( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ) ); } @@ -67,30 +67,30 @@ public function from(Rows $rows) : DataFrame $transformer = JoinEachRowsTransformer::left($right, Expression::on(['country' => 'code'])); self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 2), str_entry('country', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 3), str_entry('country', 'FR'), str_entry('name', null)), - ), - $transformer->transform($left, new FlowContext(Config::default())) + [ + ['id' => 1, 'country' => 'PL', 'joined_name' => 'Poland', 'joined_code' => 'PL'], + ['id' => 2, 'country' => 'US', 'joined_name' => 'United States', 'joined_code' => 'US'], + ['id' => 3, 'country' => 'FR', 'joined_name' => null, 'joined_code' => null], + ], + $transformer->transform($left, new FlowContext(Config::default()))->toArray() ); } public function test_right_join_rows() : void { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), + $left = rows( + row(int_entry('id', 1), str_entry('country', 'PL')), + row(int_entry('id', 2), str_entry('country', 'US')), + row(int_entry('id', 3), str_entry('country', 'FR')), ); $right = new class implements DataFrameFactory { public function from(Rows $rows) : DataFrame { return (new Flow())->process( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), + rows( + row(str_entry('code', 'PL'), str_entry('name', 'Poland')), + row(str_entry('code', 'US'), str_entry('name', 'United States')), + row(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), ) ); } @@ -99,12 +99,12 @@ public function from(Rows $rows) : DataFrame $transformer = JoinEachRowsTransformer::right($right, Expression::on(['country' => 'code'])); self::assertEquals( - new Rows( - Row::create(str_entry('name', 'Poland'), str_entry('code', 'PL'), int_entry('id', 1)), - Row::create(str_entry('name', 'United States'), str_entry('code', 'US'), int_entry('id', 2)), - Row::create(str_entry('name', 'Great Britain'), str_entry('code', 'GB'), str_entry('id', null)), - ), - $transformer->transform($left, new FlowContext(Config::default())) + [ + ['id' => 1, 'country' => 'PL', 'joined_code' => 'PL', 'joined_name' => 'Poland'], + ['id' => 2, 'country' => 'US', 'joined_code' => 'US', 'joined_name' => 'United States'], + ['id' => null, 'country' => null, 'joined_code' => 'GB', 'joined_name' => 'Great Britain'], + ], + $transformer->transform($left, new FlowContext(Config::default()))->toArray() ); } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/JoinRowsTransformerTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/JoinRowsTransformerTest.php deleted file mode 100644 index 6195cabe5..000000000 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/JoinRowsTransformerTest.php +++ /dev/null @@ -1,94 +0,0 @@ -process( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), - ) - ); - - $transformer = JoinRowsTransformer::inner($right, Expression::on(['country' => 'code'])); - - self::assertEquals( - new Rows( - Row::create(str_entry('name', 'Poland'), int_entry('id', 1), str_entry('country', 'PL')), - Row::create(str_entry('name', 'United States'), int_entry('id', 2), str_entry('country', 'US')), - ), - $transformer->transform($left, new FlowContext(Config::default())) - ); - } - - public function test_left_join_rows() : void - { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), - ); - $right = (new Flow())->process( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), - ) - ); - - $transformer = JoinRowsTransformer::left($right, Expression::on(['country' => 'code'])); - - self::assertEquals( - new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL'), str_entry('name', 'Poland')), - Row::create(int_entry('id', 2), str_entry('country', 'US'), str_entry('name', 'United States')), - Row::create(int_entry('id', 3), str_entry('country', 'FR'), str_entry('name', null)), - ), - $transformer->transform($left, new FlowContext(Config::default())) - ); - } - - public function test_right_join_rows() : void - { - $left = new Rows( - Row::create(int_entry('id', 1), str_entry('country', 'PL')), - Row::create(int_entry('id', 2), str_entry('country', 'US')), - Row::create(int_entry('id', 3), str_entry('country', 'FR')), - ); - $right = (new Flow())->process( - new Rows( - Row::create(str_entry('code', 'PL'), str_entry('name', 'Poland')), - Row::create(str_entry('code', 'US'), str_entry('name', 'United States')), - Row::create(str_entry('code', 'GB'), str_entry('name', 'Great Britain')), - ) - ); - - $transformer = JoinRowsTransformer::right($right, Expression::on(['country' => 'code'])); - - self::assertEquals( - new Rows( - Row::create(str_entry('name', 'Poland'), str_entry('code', 'PL'), int_entry('id', 1)), - Row::create(str_entry('name', 'United States'), str_entry('code', 'US'), int_entry('id', 2)), - Row::create(str_entry('name', 'Great Britain'), str_entry('code', 'GB'), str_entry('id', null)), - ), - $transformer->transform($left, new FlowContext(Config::default())) - ); - } -}