Skip to content

Commit

Permalink
Replaced nested loop join with hash join algorithm (#1055)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Apr 25, 2024
1 parent 3958083 commit f742666
Show file tree
Hide file tree
Showing 34 changed files with 1,083 additions and 927 deletions.
18 changes: 18 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,24 @@ Please follow the instructions for your specific version to ensure a smooth upgr

---

## Upgrading from 0.7.x to 0.8.x

### 1) Joins

In order to support joining bigger datasets, we had to move from the initial Nested Loop join algorithm to the Hash Join algorithm.

- the only supported join expression is `=` (equals), which can be grouped with `AND` and `OR` operators.
- `joinPrefix` is now always required, and by default is set to 'joined_'
- join will always return all columns from both datasets; columns used in the join condition will be prefixed with `joinPrefix`.

Other than that, API stays the same.

The above changes were introduced in all 3 types of joins:

- `DataFrame::join()`
- `DataFrame::joinEach()`
- `DataFrame::crossJoin()`

## Upgrading from 0.6.x to 0.7.x

### 1) DataFrame::appendSafe() method was removed
Expand Down
29 changes: 0 additions & 29 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@
Formatter,
Join\Comparison,
Join\Comparison\Equal,
Join\Comparison\GreaterThan,
Join\Comparison\GreaterThanEqual,
Join\Comparison\Identical,
Join\Comparison\LessThan,
Join\Comparison\LessThanEqual,
Join\Expression,
Loader,
Partition,
Expand Down Expand Up @@ -1163,31 +1159,6 @@ function compare_any(Comparison ...$comparisons) : Comparison\Any
return new Comparison\Any(...$comparisons);
}

/**
 * Creates a GreaterThan join comparison from the given left and right references.
 *
 * Both arguments are passed to the GreaterThan constructor unchanged.
 */
function greater_than(Reference|string $left, Reference|string $right) : GreaterThan
{
return new GreaterThan($left, $right);
}

/**
 * Creates a GreaterThanEqual join comparison from the given left and right references.
 *
 * Both arguments are passed to the GreaterThanEqual constructor unchanged.
 */
function greater_than_equal(Reference|string $left, Reference|string $right) : GreaterThanEqual
{
return new GreaterThanEqual($left, $right);
}

/**
 * Creates a LessThan join comparison from the given left and right references.
 *
 * Both arguments are passed to the LessThan constructor unchanged.
 */
function less_than(Reference|string $left, Reference|string $right) : LessThan
{
return new LessThan($left, $right);
}

/**
 * Creates a LessThanEqual join comparison from the given left and right references.
 *
 * Both arguments are passed to the LessThanEqual constructor unchanged.
 */
function less_than_equal(Reference|string $left, Reference|string $right) : LessThanEqual
{
return new LessThanEqual($left, $right);
}

/**
 * Wraps the given comparison in a Comparison\Not instance.
 */
function negation(Comparison $comparison) : Comparison\Not
{
return new Comparison\Not($comparison);
}

function join_on(array|Comparison $comparisons, string $joinPrefix = '') : Expression
{
return Expression::on($comparisons, $joinPrefix);
Expand Down
24 changes: 11 additions & 13 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
use Flow\ETL\Loader\StreamLoader\Output;
use Flow\ETL\Partition\ScalarFunctionFilter;
use Flow\ETL\PHP\Type\{AutoCaster, Caster};
use Flow\ETL\Pipeline\{BatchingPipeline, CachingPipeline, CollectingPipeline, GroupByPipeline, PartitioningPipeline, VoidPipeline};
use Flow\ETL\Pipeline\{BatchingPipeline,
CachingPipeline,
CollectingPipeline,
GroupByPipeline,
HashJoinPipeline,
PartitioningPipeline,
VoidPipeline};
use Flow\ETL\Row\{Reference, References, Schema};
use Flow\ETL\Transformer\StyleConverter\StringStyles;
use Flow\ETL\Transformer\{AutoCastTransformer, CallbackRowTransformer, CrossJoinRowsTransformer, DropDuplicatesTransformer, DropPartitionsTransformer, EntryNameStyleConverterTransformer, JoinEachRowsTransformer, JoinRowsTransformer, KeepEntriesTransformer, LimitTransformer, RemoveEntriesTransformer, RenameAllCaseTransformer, RenameEntryTransformer, RenameStrReplaceAllEntriesTransformer, ScalarFunctionFilterTransformer, ScalarFunctionTransformer, UntilTransformer, WindowFunctionTransformer};
use Flow\ETL\Transformer\{AutoCastTransformer, CallbackRowTransformer, CrossJoinRowsTransformer, DropDuplicatesTransformer, DropPartitionsTransformer, EntryNameStyleConverterTransformer, JoinEachRowsTransformer, KeepEntriesTransformer, LimitTransformer, RemoveEntriesTransformer, RenameAllCaseTransformer, RenameEntryTransformer, RenameStrReplaceAllEntriesTransformer, ScalarFunctionFilterTransformer, ScalarFunctionTransformer, UntilTransformer, WindowFunctionTransformer};
use Flow\RDSL\AccessControl\{AllowAll, AllowList, DenyAll};
use Flow\RDSL\Attribute\DSLMethod;
use Flow\RDSL\{Builder, DSLNamespace, Executor, Finder};
Expand Down Expand Up @@ -452,19 +458,11 @@ public function groupBy(string|Reference ...$entries) : GroupedDataFrame
*/
public function join(self $dataFrame, Expression $on, string|Join $type = Join::left) : self
{
if ($type instanceof Join) {
$type = $type->name;
if (\is_string($type)) {
$type = Join::from($type);
}

$transformer = match ($type) {
Join::left->value => JoinRowsTransformer::left($dataFrame, $on),
Join::left_anti->value => JoinRowsTransformer::leftAnti($dataFrame, $on),
Join::right->value => JoinRowsTransformer::right($dataFrame, $on),
Join::inner->value => JoinRowsTransformer::inner($dataFrame, $on),
default => throw new InvalidArgumentException('Unsupported join type')
};

$this->pipeline->add($transformer);
$this->pipeline = new HashJoinPipeline($this->pipeline, $dataFrame, $on, $type);

return $this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Exception;

/**
 * Exception type for reporting duplicated entries; a specialization of this
 * namespace's InvalidArgumentException that adds no behavior of its own.
 *
 * NOTE(review): the places that throw it are not visible here — confirm the exact trigger.
 */
final class DuplicatedEntriesException extends InvalidArgumentException
{
}
9 changes: 9 additions & 0 deletions src/core/etl/src/Flow/ETL/Exception/JoinException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Exception;

/**
 * Exception type for join-related failures (per its name); a specialization of this
 * namespace's RuntimeException that adds no behavior of its own.
 *
 * NOTE(review): the places that throw it are not visible here — confirm the exact trigger.
 */
final class JoinException extends RuntimeException
{
}
10 changes: 10 additions & 0 deletions src/core/etl/src/Flow/ETL/Hash/Algorithm.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Hash;

/**
 * Contract for turning a string value into its string hash representation.
 */
interface Algorithm
{
/**
 * Returns the hash of the given value as a string.
 */
public function hash(string $value) : string;
}
20 changes: 20 additions & 0 deletions src/core/etl/src/Flow/ETL/Hash/NativePHPHash.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Hash;

/**
 * Hash algorithm backed by PHP's native `hash()` function.
 *
 * The algorithm name is validated against `hash_algos()` when the object is created.
 * Configuration is immutable after construction.
 */
final class NativePHPHash implements Algorithm
{
    /**
     * @param string $algorithm name of a hashing algorithm listed by `hash_algos()`
     * @param bool $binary forwarded to `hash()`; when true raw binary data is returned instead of lowercase hex
     * @param array $options algorithm-specific options forwarded unchanged to `hash()`
     *
     * @throws \InvalidArgumentException when $algorithm is not listed by `hash_algos()`
     */
    public function __construct(
        private readonly string $algorithm = 'xxh128',
        private readonly bool $binary = false,
        private readonly array $options = [],
    ) {
        if (!\in_array($algorithm, \hash_algos(), true)) {
            throw new \InvalidArgumentException(\sprintf('Hashing algorithm "%s" is not supported', $algorithm));
        }
    }

    /**
     * Hashes the given value using the algorithm, output mode and options set in the constructor.
     */
    public function hash(string $value) : string
    {
        return \hash($this->algorithm, $value, $this->binary, $this->options);
    }
}
13 changes: 13 additions & 0 deletions src/core/etl/src/Flow/ETL/Hash/PlainText.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Hash;

/**
 * Algorithm implementation that performs no hashing: the input is returned as-is.
 */
final class PlainText implements Algorithm
{
/**
 * Returns the given value unchanged.
 */
public function hash(string $value) : string
{
return $value;
}
}
39 changes: 0 additions & 39 deletions src/core/etl/src/Flow/ETL/Join/Comparison/GreaterThan.php

This file was deleted.

39 changes: 0 additions & 39 deletions src/core/etl/src/Flow/ETL/Join/Comparison/GreaterThanEqual.php

This file was deleted.

39 changes: 0 additions & 39 deletions src/core/etl/src/Flow/ETL/Join/Comparison/LessThan.php

This file was deleted.

39 changes: 0 additions & 39 deletions src/core/etl/src/Flow/ETL/Join/Comparison/LessThanEqual.php

This file was deleted.

6 changes: 3 additions & 3 deletions src/core/etl/src/Flow/ETL/Join/Expression.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Flow\ETL\Join;

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Join\Comparison\{All, Identical};
use Flow\ETL\Join\Comparison\{All, Equal};
use Flow\ETL\Row;
use Flow\ETL\Row\Reference;

Expand All @@ -20,7 +20,7 @@ public function __construct(
/**
* @param array<Comparison>|array<string, string>|Comparison $comparison
*/
public static function on(array|Comparison $comparison, string $joinPrefix = '') : self
public static function on(array|Comparison $comparison, string $joinPrefix = 'joined_') : self
{
if (\is_array($comparison)) {
/** @var array<Comparison> $comparisons */
Expand All @@ -41,7 +41,7 @@ public static function on(array|Comparison $comparison, string $joinPrefix = '')
throw new RuntimeException('Expected right entry name to be string, got ' . \gettype($right) . ". Example: ['id' => 'id']");
}

$comparisons[] = new Identical($left, $right);
$comparisons[] = new Equal($left, $right);
}

return new self(new All(...$comparisons), $joinPrefix);
Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final class BatchingPipeline implements OverridingPipeline, Pipeline
*/
public function __construct(private readonly Pipeline $pipeline, private readonly int $size)
{
$this->nextPipeline = $pipeline->cleanCopy();
$this->nextPipeline = new SynchronousPipeline();

if ($this->size <= 0) {
throw new InvalidArgumentException('Batch size must be greater than 0, given: ' . $this->size);
Expand Down
Loading

0 comments on commit f742666

Please sign in to comment.