Skip to content

Commit

Permalink
Partition pruning optimizer (#887)
Browse files Browse the repository at this point in the history
* Partitions cleanup & fixes

* Removed ScalarFunctions chain

* Introduced FunctionChain interface

* Moved all scalar function methods to abstract class

* Removed partitions filter from FlowContext

* Removed double partitioning & removed partitions from FlowContext

* Added CompositeScalarFunctionIterator

* Added GroupedDataFrame

* Apply filter to partitions through partition pruning optimizer

* Added partition pruning optimization tests

* Reduced MSI

* Updated examples

* Fixed broken examples

* Reverted PartitionPruningOptimization

* Fixed exampleS

* Removed tracking scalar function roots
  • Loading branch information
norberttech authored Dec 28, 2023
1 parent a89ec97 commit 55c4d75
Show file tree
Hide file tree
Showing 139 changed files with 1,020 additions and 957 deletions.
2 changes: 1 addition & 1 deletion UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ Transformers are a really powerful tool that was used in Flow since the beginnin

We reworked most of the internal transformers to new scalar functions and entry scalar functions (based on the built-in functions), and we still internally use that powerful tool, but we don't expose it to end users, instead, we provide easy-to-use, covering all user needs functions.

All available functions can be found in [`ETL\Row\Function` folder](src/core/etl/src/Flow/ETL/Function) or in [`ETL\DSL\functions` file](src/core/etl/src/Flow/ETL/DSL/functions.php), and entry scalar functions are defined in [`EntryScalarFunction` trait](src/core/etl/src/Flow/ETL/Function/EntryScalarFunction.php).
All available functions can be found in [`ETL\Row\Function` folder](src/core/etl/src/Flow/ETL/Function) or in [`ETL\DSL\functions` file](src/core/etl/src/Flow/ETL/DSL/functions.php), and entry scalar functions are defined in `EntryScalarFunction`.

To see what transformers are available see [`ETL\DSL\Transform` class](src/core/etl/src/Flow/ETL/DSL/Transform.php).

Expand Down
30 changes: 15 additions & 15 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions examples/topics/types/csv/csv_read_partitioned.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_output;
use Aeon\Calendar\Stopwatch;
use Flow\ETL\Flow;
Expand All @@ -12,8 +11,6 @@

$flow = (new Flow())
->read(from_csv(__FLOW_DATA__ . '/partitioned'))
->collect()
->sortBy(ref('id'))
->write(to_output());

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
Expand Down
3 changes: 2 additions & 1 deletion examples/topics/types/csv/csv_read_partitioned_filter.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@

$flow = (new Flow())
->read(from_csv(__FLOW_DATA__ . '/partitioned'))
->collect()
->filterPartitions(ref('country')->equals(lit('pl')))
->filterPartitions(ref('t_shirt_color')->equals(lit('green')))
->collect()
->sortBy(ref('id'))
->write(to_output());

Expand Down
4 changes: 2 additions & 2 deletions infection.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,6 @@
},
"tmpDir": "var/infection/cache",
"testFrameworkOptions": "--testsuite=unit",
"minMsi": 50,
"minCoveredMsi": 50
"minMsi": 40,
"minCoveredMsi": 40
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Extractor;
use Flow\ETL\Extractor\FileExtractor;
use Flow\ETL\Extractor\Limitable;
use Flow\ETL\Extractor\LimitableExtractor;
use Flow\ETL\Extractor\PartitionFiltering;
use Flow\ETL\Extractor\PartitionsExtractor;
use Flow\ETL\Extractor\Signal;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;

final class AvroExtractor implements Extractor, Extractor\FileExtractor, LimitableExtractor
final class AvroExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor
{
use Limitable;
use PartitionFiltering;

public function __construct(
private readonly Path $path
Expand All @@ -27,9 +31,7 @@ public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) {
$partitions = $filePath->partitions();

foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $filePath) {
$reader = new \AvroDataIOReader(
new AvroResource(
$context->streams()->fs()->open(
Expand All @@ -49,7 +51,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $filePath->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $partitions);
$signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions());
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Flow\ETL\Adapter\Avro\FlixTech;

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
Expand Down Expand Up @@ -65,10 +64,6 @@ public function destination() : Path

public function load(Rows $rows, FlowContext $context) : void
{
if ($context->partitionEntries()->count()) {
throw new RuntimeException('Partitioning is not supported yet');
}

if ($this->schema === null) {
if ($this->inferredSchema === null) {
$this->inferredSchema = $rows->schema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,6 @@ public function test_limit() : void
);
}

public function test_partitioning() : void
{
$this->expectExceptionMessage('Partitioning is not supported yet');

$this->removeFile($path = \sys_get_temp_dir() . '/file.avro');

df()
->read(from_rows(
$rows = new Rows(
...\array_map(function (int $i) : Row {
return Row::create(
int_entry('integer', $i),
float_entry('float', 1.5),
str_entry('string', 'name_' . $i),
bool_entry('boolean', true),
datetime_entry('datetime', new \DateTimeImmutable()),
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class))),
);
}, \range(1, 100))
)
))
->partitionBy('integer')
->write(to_avro($path))
->run();
}

public function test_safe_writing_and_reading_avro_with_all_supported_types() : void
{
$this->cleanDirectory($path = \sys_get_temp_dir() . '/directory.avro');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
use Flow\ETL\Extractor\FileExtractor;
use Flow\ETL\Extractor\Limitable;
use Flow\ETL\Extractor\LimitableExtractor;
use Flow\ETL\Extractor\PartitionFiltering;
use Flow\ETL\Extractor\PartitionsExtractor;
use Flow\ETL\Extractor\Signal;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;

final class CSVExtractor implements Extractor, FileExtractor, LimitableExtractor
final class CSVExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor
{
use Limitable;
use PartitionFiltering;

/**
* @param int<0, max> $charactersReadInLine
Expand All @@ -37,8 +40,7 @@ public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $path) {
$partitions = $path->partitions();
foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $path) {
$stream = $context->streams()->fs()->open($path, Mode::READ);

$headers = [];
Expand Down Expand Up @@ -90,7 +92,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $stream->path()->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $partitions);
$signal = yield array_to_rows($row, $context->entryFactory(), $path->partitions());
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,8 @@ public function load(Rows $rows, FlowContext $context) : void

$headers = $rows->first()->entries()->map(fn (Entry $entry) => $entry->name());

if ($context->partitionEntries()->count()) {
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) {
$this->write($partitionedRows, $headers, $context, $partitionedRows->partitions()->toArray());
}
if ($rows->partitions()->count()) {
$this->write($rows, $headers, $context, $rows->partitions()->toArray());
} else {
$this->write($rows, $headers, $context, []);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public function test_loading_csv_with_partitioning() : void
Row::create(int_entry('id', 4), int_entry('group', 2)),
)
)
->load(to_csv($path))
->partitionBy('group')
->load(to_csv($path))
->run();

$partitions = \array_values(\array_diff(\scandir($path), ['..', '.']));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public function rm(Path $path) : void
*
* @return \Generator<Path>
*/
public function scan(Path $path, PartitionFilter $partitionFilter) : \Generator
public function scan(Path $path, PartitionFilter $partitionFilter = new NoopFilter()) : \Generator
{
$fs = $this->factory->create($path);

Expand All @@ -163,7 +163,7 @@ public function scan(Path $path, PartitionFilter $partitionFilter) : \Generator
}
}

return $partitionFilter->keep(...(new Path(DIRECTORY_SEPARATOR . $file->path()))->partitions());
return $partitionFilter->keep(...(new Path(DIRECTORY_SEPARATOR . $file->path()))->partitions()->toArray());
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\Extractor\FileExtractor;
use Flow\ETL\Extractor\Limitable;
use Flow\ETL\Extractor\LimitableExtractor;
use Flow\ETL\Extractor\Signal;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\FlowContext;
use League\Flysystem\DirectoryAttributes;
use League\Flysystem\FileAttributes;

final class RemoteFileListExtractor implements Extractor, Extractor\FileExtractor, Extractor\LimitableExtractor
final class RemoteFileListExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Limitable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@
use Flow\ETL\Extractor\FileExtractor;
use Flow\ETL\Extractor\Limitable;
use Flow\ETL\Extractor\LimitableExtractor;
use Flow\ETL\Extractor\PartitionFiltering;
use Flow\ETL\Extractor\PartitionsExtractor;
use Flow\ETL\Extractor\Signal;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use JsonMachine\Items;
use JsonMachine\JsonDecoder\ExtJsonDecoder;

final class JsonExtractor implements Extractor, FileExtractor, LimitableExtractor
final class JsonExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor
{
use Limitable;
use PartitionFiltering;

public function __construct(
private readonly Path $path,
Expand All @@ -31,8 +34,7 @@ public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) {
$partitions = $filePath->partitions();
foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $filePath) {

/**
* @var array|object $rowData
Expand All @@ -44,7 +46,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $filePath->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $partitions);
$signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions());
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Loading

0 comments on commit 55c4d75

Please sign in to comment.