Skip to content

Commit

Permalink
DSL refactoring (#852)
Browse files Browse the repository at this point in the history
* Deprecated From/To/Transform/Handler DSL classes in favor of functions.php

* Deprecated all DSL static classes from adapters

* Added a ScalarFunctionFilter implementation of Partition Filter in order to deprecate the Partitions DSL and simplify partition filtering

* Covered list/map/struct in DSL

* Renamed read to df()

* Extract adapter specific dsl functions to adapters

* Fixed invalid autoload paths

* Fixed wrong import statements

* CS Fixes

* Fixed incorrect import
  • Loading branch information
norberttech authored Nov 29, 2023
1 parent 6f400a4 commit 591529a
Show file tree
Hide file tree
Showing 337 changed files with 4,910 additions and 3,144 deletions.
23 changes: 23 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,29 @@ Class `Sha1IdFactory` was removed, use `HashIdFactory` class:
(new HashIdFactory('entry_name'))->withAlgorithm('sha1');
```

### 15) Deprecate DSL Static classes

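DSL static classes were deprecated in favor of plain functions. Core functions are defined in the `src/core/etl/src/Flow/ETL/DSL/functions.php` file; adapter-specific functions are defined in each adapter's own `functions.php` file (for example `src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php`).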

Deprecated classes:

- `src/core/etl/src/Flow/ETL/DSL/From.php`
- `src/core/etl/src/Flow/ETL/DSL/Handler.php`
- `src/core/etl/src/Flow/ETL/DSL/To.php`
- `src/core/etl/src/Flow/ETL/DSL/Transform.php`
- `src/core/etl/src/Flow/ETL/DSL/Partitions.php`
- `src/adapter/etl-adapter-avro/src/Flow/ETL/DSL/Avro.php`
- `src/adapter/etl-adapter-chartjs/src/Flow/ETL/DSL/ChartJS.php`
- `src/adapter/etl-adapter-csv/src/Flow/ETL/DSL/CSV.php`
- `src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php`
- `src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php`
- `src/adapter/etl-adapter-google-sheet/src/Flow/ETL/DSL/GoogleSheet.php`
- `src/adapter/etl-adapter-json/src/Flow/ETL/DSL/Json.php`
- `src/adapter/etl-adapter-meilisearch/src/Flow/ETL/DSL/Meilisearch.php`
- `src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php`
- `src/adapter/etl-adapter-text/src/Flow/ETL/DSL/Text.php`
- `src/adapter/etl-adapter-xml/src/Flow/ETL/DSL/XML.php`

---

## Upgrading from 0.3.x to 0.4.x
Expand Down
11 changes: 11 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
},
"autoload": {
"files": [
"src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/functions.php",
"src/adapter/etl-adapter-chartjs/src/Flow/ETL/Adapter/ChartJS/functions.php",
"src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php",
"src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/functions.php",
"src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/functions.php",
"src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/functions.php",
"src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/functions.php",
"src/adapter/etl-adapter-meilisearch/src/Flow/ETL/Adapter/Meilisearch/functions.php",
"src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/functions.php",
"src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/functions.php",
"src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/functions.php",
"src/core/etl/src/Flow/ETL/DSL/functions.php",
"src/lib/array-dot/src/Flow/ArrayDot/array_dot.php",
"src/lib/parquet/src/Flow/Parquet/functions.php",
Expand Down
20 changes: 10 additions & 10 deletions examples/data/orders_flow.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<?php declare(strict_types=1);

use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\ref;
use Flow\ETL\DSL\Avro;
use Flow\ETL\DSL\CSV;
use Flow\ETL\DSL\From;
use Flow\ETL\DSL\Json;
use Flow\ETL\DSL\Parquet;
use function Flow\ETL\DSL\to_avro;
use function Flow\ETL\DSL\to_csv;
use function Flow\ETL\DSL\to_json;
use function Flow\ETL\DSL\to_parquet;
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\Flow;

Expand Down Expand Up @@ -42,14 +42,14 @@
);

(new Flow())
->read(From::array($orders))
->read(from_array($orders))
->mode(SaveMode::Overwrite)
->write(Parquet::to(__DIR__ . '/orders_flow.parquet'))
->write(Json::to(__DIR__ . '/orders_flow.json'))
->write(Avro::to(__DIR__ . '/orders_flow.avro'))
->write(to_parquet(__DIR__ . '/orders_flow.parquet'))
->write(to_json(__DIR__ . '/orders_flow.json'))
->write(to_avro(__DIR__ . '/orders_flow.avro'))
->withEntry('order_id', ref('order_id')->cast('string'))
->withEntry('customer', ref('customer')->cast('string'))
->withEntry('address', ref('customer')->cast('string'))
->withEntry('notes', ref('customer')->cast('string'))
->write(CSV::to(__DIR__ . '/orders_flow.csv'))
->write(to_csv(__DIR__ . '/orders_flow.csv'))
->run();
4 changes: 2 additions & 2 deletions examples/setup/php_to_csv.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\to_csv;
use Aeon\Calendar\Stopwatch;
use Flow\ETL\DSL\CSV;
use Flow\ETL\Flow;
use Flow\ETL\Monitoring\Memory\Consumption;

Expand All @@ -17,7 +17,7 @@

$flow = (new Flow())
->read($extractor)
->write(CSV::to(__FLOW_OUTPUT__ . '/dataset.csv'));
->write(to_csv(__FLOW_OUTPUT__ . '/dataset.csv'));

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
return $flow;
Expand Down
11 changes: 6 additions & 5 deletions examples/topics/aggregations/daily_revenue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@

declare(strict_types=1);

use function Flow\ETL\Adapter\Parquet\from_parquet;
use function Flow\ETL\Adapter\Parquet\to_parquet;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\sum;
use Flow\ETL\DSL\Parquet;
use Flow\ETL\DSL\To;
use function Flow\ETL\DSL\to_output;
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\Flow;

require __DIR__ . '/../../bootstrap.php';

$flow = (new Flow())
->read(Parquet::from(__FLOW_DATA__ . '/orders_flow.parquet'))
->read(from_parquet(__FLOW_DATA__ . '/orders_flow.parquet'))
->select('created_at', 'total_price', 'discount')
->withEntry('created_at', ref('created_at')->toDate()->dateFormat('Y/m'))
->withEntry('revenue', ref('total_price')->minus(ref('discount')))
Expand All @@ -23,10 +24,10 @@
->sortBy(ref('created_at')->desc())
->withEntry('daily_revenue', ref('revenue_sum')->round(lit(2))->numberFormat(lit(2)))
->drop('revenue_sum')
->write(To::output(truncate: false))
->write(to_output(truncate: false))
->withEntry('created_at', ref('created_at')->toDate('Y/m'))
->mode(SaveMode::Overwrite)
->write(Parquet::to(__FLOW_OUTPUT__ . '/daily_revenue.parquet'));
->write(to_parquet(__FLOW_OUTPUT__ . '/daily_revenue.parquet'));

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
return $flow;
Expand Down
17 changes: 8 additions & 9 deletions examples/topics/aggregations/power_plant.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\average;
use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\data_frame;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\max;
use function Flow\ETL\DSL\min;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\sum;
use Flow\ETL\DSL\CSV;
use Flow\ETL\DSL\To;
use Flow\ETL\Flow;
use function Flow\ETL\DSL\to_output;

require __DIR__ . '/../../bootstrap.php';

$flow = (new Flow())
->read(CSV::from(__FLOW_DATA__ . '/power-plant-daily.csv', delimiter: ';'))
$df = data_frame()
->read(from_csv(__FLOW_DATA__ . '/power-plant-daily.csv', delimiter: ';'))
->withEntry('production_kwh', ref('Produkcja(kWh)'))
->withEntry('consumption_kwh', ref('Zużycie(kWh)'))
->withEntry('date', ref('Zaktualizowany czas')->toDate('Y/m/d')->dateFormat('Y/m'))
Expand All @@ -32,7 +32,6 @@
sum(ref('production_kwh')),
sum(ref('consumption_kwh'))
)

->withEntry('production_kwh_avg', ref('production_kwh_avg')->round(lit(2)))
->withEntry('consumption_kwh_avg', ref('consumption_kwh_avg')->round(lit(2)))
->withEntry('production_kwh_min', ref('production_kwh_min')->round(lit(2)))
Expand All @@ -44,10 +43,10 @@
->withEntry('consumption', ref('consumption_kwh_sum')->divide(ref('production_kwh_sum')))
->withEntry('consumption', ref('consumption')->multiply(lit(100))->round(lit(2)))
->withEntry('consumption', concat(ref('consumption'), lit('%')))
->write(To::output(truncate: false));
->write(to_output(truncate: false));

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
return $flow;
return $df;
}

$flow->run();
$df->run();
20 changes: 11 additions & 9 deletions examples/topics/aggregations/power_plant_bar_chart.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

declare(strict_types=1);

use function Flow\ETL\Adapter\ChartJS\bar_chart;
use function Flow\ETL\Adapter\ChartJS\to_chartjs_file;
use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\average;
use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\max;
use function Flow\ETL\DSL\min;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\refs;
use function Flow\ETL\DSL\sum;
use Flow\ETL\DSL\ChartJS;
use Flow\ETL\DSL\CSV;
use Flow\ETL\Flow;

require __DIR__ . '/../../bootstrap.php';

$flow = (new Flow)
->read(CSV::from(__FLOW_DATA__ . '/power-plant-daily.csv', delimiter: ';'))
$df = df()
->read(from_csv(__FLOW_DATA__ . '/power-plant-daily.csv', delimiter: ';'))
->withEntry('production_kwh', ref('Produkcja(kWh)'))
->withEntry('consumption_kwh', ref('Zużycie(kWh)'))
->withEntry('date', ref('Zaktualizowany czas')->toDate('Y/m/d')->dateFormat('Y/m'))
Expand Down Expand Up @@ -45,15 +47,15 @@
->withEntry('consumption', ref('consumption')->multiply(lit(100))->round(lit(2)))
->withEntry('consumption', concat(ref('consumption'), lit('%')))
->write(
ChartJS::to_file(
ChartJS::bar(label: ref('date'), datasets: [ref('production_kwh_avg'), ref('consumption_kwh_avg')])
to_chartjs_file(
bar_chart(label: ref('date'), datasets: refs(ref('production_kwh_avg'), ref('consumption_kwh_avg')))
->setOptions(['indexAxis' => 'y']),
output: __FLOW_OUTPUT__ . '/power_plant_bar_chart.html'
)
);

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
return $flow;
return $df;
}

$flow->run();
$df->run();
19 changes: 10 additions & 9 deletions examples/topics/dataframe/get.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

declare(strict_types=1);

use Flow\ETL\DSL\Entry;
use Flow\ETL\DSL\From;
use function Flow\ETL\DSL\from_rows;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\str_entry;
use Flow\ETL\Flow;
use Flow\ETL\Row;
use Flow\ETL\Rows;
Expand All @@ -12,14 +13,14 @@

$etl = (new Flow())
->read(
From::rows(
from_rows(
new Rows(
Row::create(Entry::integer('id', 1), Entry::string('name', 'foo')),
Row::create(Entry::integer('id', 2), Entry::string('name', 'bar')),
Row::create(Entry::integer('id', 3), Entry::string('name', 'baz')),
Row::create(Entry::integer('id', 4), Entry::string('name', 'foo')),
Row::create(Entry::integer('id', 5), Entry::string('name', 'bar')),
Row::create(Entry::integer('id', 6), Entry::string('name', 'baz')),
Row::create(int_entry('id', 1), str_entry('name', 'foo')),
Row::create(int_entry('id', 2), str_entry('name', 'bar')),
Row::create(int_entry('id', 3), str_entry('name', 'baz')),
Row::create(int_entry('id', 4), str_entry('name', 'foo')),
Row::create(int_entry('id', 5), str_entry('name', 'bar')),
Row::create(int_entry('id', 6), str_entry('name', 'baz')),
),
)
);
Expand Down
4 changes: 2 additions & 2 deletions examples/topics/db/db_source.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
exit(1);
}

use function Flow\ETL\Adapter\CSV\from_csv;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Flow\ETL\Adapter\Doctrine\DbalLoader;
use Flow\ETL\DSL\CSV;
use Flow\ETL\Flow;

$dbConnectionString = 'postgresql://postgres:[email protected]:5432/postgres?serverVersion=11%26charset=utf8';
Expand All @@ -36,7 +36,7 @@
);

(new Flow())
->read(CSV::from($path = __FLOW_OUTPUT__ . '/dataset.csv', 10_000))
->read(from_csv($path = __FLOW_OUTPUT__ . '/dataset.csv', 10_000))
->rename('last name', 'last_name')
->limit(1_000_000)
->load(DbalLoader::fromConnection($dbConnection, 'source_dataset_table', 1000))
Expand Down
7 changes: 4 additions & 3 deletions examples/topics/db/db_to_db_sync.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
exit(1);
}

use function Flow\ETL\Adapter\Doctrine\from_dbal_limit_offset;
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use Aeon\Calendar\Stopwatch;
use Flow\ETL\Adapter\Doctrine\Order;
use Flow\ETL\Adapter\Doctrine\OrderBy;
use Flow\ETL\DSL\Dbal;
use Flow\ETL\Flow;

require __DIR__ . '/../../bootstrap.php';
Expand All @@ -31,7 +32,7 @@

(new Flow())
->read(
Dbal::from_limit_offset(
from_dbal_limit_offset(
$sourceDbConnection,
'source_dataset_table',
new OrderBy('id', Order::DESC)
Expand All @@ -40,7 +41,7 @@
->withEntry('id', ref('id')->cast('int'))
->withEntry('name', concat(ref('name'), lit(' '), ref('last name')))
->drop('last_name')
->write(Dbal::to_table_insert($dbConnection, 'flow_dataset_table'))
->write(to_dbal_table_insert($dbConnection, 'flow_dataset_table'))
->run();

$stopwatch->stop();
Expand Down
7 changes: 4 additions & 3 deletions examples/topics/fs/remote/json_remote_stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
exit(1);
}

use function Flow\ETL\Adapter\JSON\from_json;
use function Flow\ETL\Adapter\JSON\to_json;
use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\Filesystem\AwsS3Stream;
use Flow\ETL\Adapter\Filesystem\AzureBlobStream;
use Flow\ETL\DSL\Json;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Flow;
use Symfony\Component\Dotenv\Dotenv;
Expand Down Expand Up @@ -45,9 +46,9 @@
AzureBlobStream::register();

(new Flow())
->read(Json::from(new Path('flow-aws-s3://dataset.json', $s3_client_option)))
->read(from_json(new Path('flow-aws-s3://dataset.json', $s3_client_option)))
->withEntry('id', ref('id')->cast('integer'))
->withEntry('name', concat(ref('name'), lit(' '), ref('last name')))
->drop('last name')
->write(Json::to(new Path('flow-azure-blob://dataset_test.json', $azure_blob_connection_string)))
->write(to_json(new Path('flow-azure-blob://dataset_test.json', $azure_blob_connection_string)))
->run();
7 changes: 4 additions & 3 deletions examples/topics/fs/remote/json_remote_stream_glob.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
exit(1);
}

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\Filesystem\AwsS3Stream;
use Flow\ETL\Adapter\Filesystem\AzureBlobStream;
use Flow\ETL\DSL\CSV;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Flow;
use Symfony\Component\Dotenv\Dotenv;
Expand Down Expand Up @@ -45,9 +46,9 @@
AzureBlobStream::register();

(new Flow())
->read(CSV::from(new Path('flow-aws-s3://nested/**/*.csv', $s3_client_option)))
->read(from_csv(new Path('flow-aws-s3://nested/**/*.csv', $s3_client_option)))
->withEntry('id', ref('id')->cast('int'))
->withEntry('name', concat(ref('name'), lit(' '), ref('last name')))
->drop('last name')
->write(CSV::to(new Path('flow-azure-blob://output.csv', $azure_blob_connection_string)))
->write(to_csv(new Path('flow-azure-blob://output.csv', $azure_blob_connection_string)))
->run();
Loading

0 comments on commit 591529a

Please sign in to comment.