From 6bf7b22913e053c52ace62502d3f7edc0c8ff4b4 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Sun, 18 Feb 2024 21:35:49 +0100 Subject: [PATCH 1/3] Removed deprecated code --- .github/workflows/build-release.yml | 1 - .github/workflows/build.yml | 1 - .../src/Flow/ETL/DSL/Avro.php | 58 ---- .../src/Flow/ETL/DSL/ChartJS.php | 56 ---- .../etl-adapter-csv/src/Flow/ETL/DSL/CSV.php | 80 ----- .../src/Flow/ETL/DSL/Dbal.php | 188 ----------- .../src/Flow/ETL/DSL/Elasticsearch.php | 123 ------- .../src/Flow/ETL/DSL/GoogleSheet.php | 91 ------ .../src/Flow/ETL/DSL/Json.php | 58 ---- .../src/Flow/ETL/DSL/Meilisearch.php | 56 ---- .../src/Flow/ETL/DSL/Parquet.php | 82 ----- .../src/Flow/ETL/DSL/Text.php | 58 ---- .../etl-adapter-xml/src/Flow/ETL/DSL/XML.php | 45 --- src/core/etl/src/Flow/ETL/DSL/Entry.php | 306 ------------------ src/core/etl/src/Flow/ETL/DSL/From.php | 123 ------- src/core/etl/src/Flow/ETL/DSL/Handler.php | 30 -- src/core/etl/src/Flow/ETL/DSL/Partitions.php | 207 ------------ src/core/etl/src/Flow/ETL/DSL/To.php | 57 ---- src/core/etl/src/Flow/ETL/DSL/Transform.php | 68 ---- src/core/etl/src/Flow/ETL/DataFrame.php | 19 -- .../Flow/ETL/DataFrame/GroupedDataFrame.php | 5 + .../src/Flow/ETL/Partition/CallableFilter.php | 31 -- .../Optimizer/BatchSizeOptimization.php | 4 - .../Pipeline/Optimizer/LimitOptimization.php | 5 - .../ETL/Pipeline/ParallelizingPipeline.php | 99 ------ src/core/etl/src/Flow/ETL/Row/Schema.php | 8 - .../Integration/DataFrame/GroupByTest.php | 3 +- .../Pipeline/ParallelizingPipelineTest.php | 63 ---- .../Unit/Pipeline/NestedPipelineTest.php | 4 +- .../Optimizer/BatchSizeOptimizationTest.php | 10 - .../Flow/ETL/Tests/Unit/Row/SchemaTest.php | 9 - .../KeepEntriesTransformerTest.php | 6 +- .../RemoveEntriesTransformerTest.php | 6 +- .../RenameEntryTransformerTest.php | 6 +- 34 files changed, 17 insertions(+), 1949 deletions(-) delete mode 100644 src/adapter/etl-adapter-avro/src/Flow/ETL/DSL/Avro.php delete mode 100644 src/adapter/etl-adapter-chartjs/src/Flow/ETL/DSL/ChartJS.php delete mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/DSL/CSV.php delete mode 100644 src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php delete mode 100644 src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php delete mode 100644 src/adapter/etl-adapter-google-sheet/src/Flow/ETL/DSL/GoogleSheet.php delete mode 100644 src/adapter/etl-adapter-json/src/Flow/ETL/DSL/Json.php delete mode 100644 src/adapter/etl-adapter-meilisearch/src/Flow/ETL/DSL/Meilisearch.php delete mode 100644 src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php delete mode 100644 src/adapter/etl-adapter-text/src/Flow/ETL/DSL/Text.php delete mode 100644 src/adapter/etl-adapter-xml/src/Flow/ETL/DSL/XML.php delete mode 100644 src/core/etl/src/Flow/ETL/DSL/Entry.php delete mode 100644 src/core/etl/src/Flow/ETL/DSL/From.php delete mode 100644 src/core/etl/src/Flow/ETL/DSL/Handler.php delete mode 100644 src/core/etl/src/Flow/ETL/DSL/Partitions.php delete mode 100644 src/core/etl/src/Flow/ETL/DSL/To.php delete mode 100644 src/core/etl/src/Flow/ETL/DSL/Transform.php delete mode 100644 src/core/etl/src/Flow/ETL/Partition/CallableFilter.php delete mode 100644 src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php delete mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/Pipeline/ParallelizingPipelineTest.php diff --git a/.github/workflows/build-release.yml b/.github/workflows/build-release.yml index bf3ee16bb..0e48e1c2f 100644 --- a/.github/workflows/build-release.yml +++ b/.github/workflows/build-release.yml @@ -52,7 +52,6 @@ jobs: - name: "Validate Flow PHAR" run: | ./build/flow.phar --version - ./build/flow.phar run examples/topics/phar/data_frame/code.php - name: "Import GPG Key" uses: crazy-max/ghaction-import-gpg@72b6676b71ab476b77e676928516f6982eef7a41 diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a7aa4c2fc..31113e758 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -56,7 +56,6 @@ jobs: - name: "Validate Flow PHAR" run: | ./build/flow.phar --version - ./build/flow.phar run examples/topics/phar/data_frame/code.php - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 diff --git a/src/adapter/etl-adapter-avro/src/Flow/ETL/DSL/Avro.php b/src/adapter/etl-adapter-avro/src/Flow/ETL/DSL/Avro.php deleted file mode 100644 index f683ef895..000000000 --- a/src/adapter/etl-adapter-avro/src/Flow/ETL/DSL/Avro.php +++ /dev/null @@ -1,58 +0,0 @@ -|Path|string $path - */ - final public static function from( - Path|string|array $path - ) : Extractor { - if (\is_array($path)) { - /** @var array $extractors */ - $extractors = []; - - foreach ($path as $next_path) { - $extractors[] = new AvroExtractor( - \is_string($next_path) ? Path::realpath($next_path) : $next_path, - ); - } - - return new Extractor\ChainExtractor(...$extractors); - } - - return new AvroExtractor( - \is_string($path) ? Path::realpath($path) : $path - ); - } - - /** - * @param Path|string $path - * @param null|Schema $schema - * - * @return Loader - */ - final public static function to(Path|string $path, ?Schema $schema = null) : Loader - { - return new AvroLoader( - \is_string($path) ? Path::realpath($path) : $path, - $schema - ); - } -} diff --git a/src/adapter/etl-adapter-chartjs/src/Flow/ETL/DSL/ChartJS.php b/src/adapter/etl-adapter-chartjs/src/Flow/ETL/DSL/ChartJS.php deleted file mode 100644 index c44e6178c..000000000 --- a/src/adapter/etl-adapter-chartjs/src/Flow/ETL/DSL/ChartJS.php +++ /dev/null @@ -1,56 +0,0 @@ -|Path|string $path - * @param int<0, max> $characters_read_in_line - * - * @throws InvalidArgumentException - */ - final public static function from( - string|Path|array $path, - bool $with_header = true, - bool $empty_to_null = true, - string $delimiter = ',', - string $enclosure = '"', - string $escape = '\\', - int $characters_read_in_line = 1000 - ) : Extractor { - if (\is_array($path)) { - $extractors = []; - - foreach ($path as $file_path) { - $extractors[] = new CSVExtractor( - \is_string($file_path) ? Path::realpath($file_path) : $file_path, - $with_header, - $empty_to_null, - $delimiter, - $enclosure, - $escape, - $characters_read_in_line, - ); - } - - return new Extractor\ChainExtractor(...$extractors); - } - - return new CSVExtractor( - \is_string($path) ? Path::realpath($path) : $path, - $with_header, - $empty_to_null, - $delimiter, - $enclosure, - $escape, - $characters_read_in_line, - ); - } - - final public static function to( - string|Path $uri, - bool $with_header = true, - string $separator = ',', - string $enclosure = '"', - string $escape = '\\', - string $new_line_separator = PHP_EOL - ) : Loader { - return new CSVLoader( - \is_string($uri) ? Path::realpath($uri) : $uri, - $with_header, - $separator, - $enclosure, - $escape, - $new_line_separator - ); - } -} diff --git a/src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php b/src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php deleted file mode 100644 index 3440310e0..000000000 --- a/src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php +++ /dev/null @@ -1,188 +0,0 @@ -|Connection $connection - * @param string $query - * @param QueryParameter ...$parameters - * - * @return DataFrameFactory - */ - final public static function dataframe_factory( - array|Connection $connection, - string $query, - QueryParameter ...$parameters - ) : DataFrameFactory { - return \is_array($connection) - ? new DbalDataFrameFactory($connection, $query, ...$parameters) - : DbalDataFrameFactory::fromConnection($connection, $query, ...$parameters); - } - - /** - * @param Connection $connection - * @param string|Table $table - * @param array|OrderBy $order_by - * @param int $page_size - * @param null|int $maximum - * - * @throws InvalidArgumentException - * - * @return Extractor - */ - final public static function from_limit_offset( - Connection $connection, - string|Table $table, - array|OrderBy $order_by, - int $page_size = 1000, - ?int $maximum = null, - ) : Extractor { - return DbalLimitOffsetExtractor::table( - $connection, - \is_string($table) ? new Table($table) : $table, - $order_by instanceof OrderBy ? [$order_by] : $order_by, - $page_size, - $maximum, - ); - } - - /** - * @param Connection $connection - * @param int $page_size - * @param null|int $maximum - * - * @return Extractor - */ - final public static function from_limit_offset_qb( - Connection $connection, - QueryBuilder $queryBuilder, - int $page_size = 1000, - ?int $maximum = null, - ) : Extractor { - return new DbalLimitOffsetExtractor( - $connection, - $queryBuilder, - $page_size, - $maximum, - ); - } - - /** - * @param Connection $connection - * @param string $query - * @param null|ParametersSet $parameters_set - each one parameters array will be evaluated as new query - * @param array|string, ArrayParameterType|ParameterType|string|Type> $types - * - * @return Extractor - */ - final public static function from_queries( - Connection $connection, - string $query, - ?ParametersSet $parameters_set = null, - array $types = [], - ) : Extractor { - return new DbalQueryExtractor( - $connection, - $query, - $parameters_set, - $types, - ); - } - - /** - * @param Connection $connection - * @param string $query - * @param array|list $parameters - * @param array|string, ArrayParameterType|ParameterType|string|Type> $types - * - * @return Extractor - */ - final public static function from_query( - Connection $connection, - string $query, - array $parameters = [], - array $types = [], - ) : Extractor { - return DbalQueryExtractor::single( - $connection, - $query, - $parameters, - $types, - ); - } - - /** - * In order to control the size of the single insert, use DataFrame::chunkSize() method just before calling DataFrame::load(). - * - * @param array|Connection $connection - * @param array{ - * skip_conflicts?: boolean, - * constraint?: string, - * conflict_columns?: array, - * update_columns?: array, - * primary_key_columns?: array - * } $options - * - * @throws InvalidArgumentException - */ - final public static function to_table_insert( - array|Connection $connection, - string $table, - array $options = [], - ) : Loader { - return \is_array($connection) - ? new DbalLoader($table, $connection, $options, 'insert') - : DbalLoader::fromConnection($connection, $table, $options, 'insert'); - } - - /** - * In order to control the size of the single request, use DataFrame::chunkSize() method just before calling DataFrame::load(). - * - * @param array|Connection $connection - * @param array{ - * skip_conflicts?: boolean, - * constraint?: string, - * conflict_columns?: array, - * update_columns?: array, - * primary_key_columns?: array - * } $options - * - * @throws InvalidArgumentException - * - * @return Loader - */ - final public static function to_table_update( - array|Connection $connection, - string $table, - array $options = [], - ) : Loader { - return \is_array($connection) - ? new DbalLoader($table, $connection, $options, 'update') - : DbalLoader::fromConnection($connection, $table, $options, 'update'); - } -} diff --git a/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php b/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php deleted file mode 100644 index 5bb4690d6..000000000 --- a/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php +++ /dev/null @@ -1,123 +0,0 @@ -, - * connectionParams?: array, - * retries?: int, - * sniffOnStart?: boolean, - * sslCert?: array, - * sslKey?: array, - * sslVerification?: boolean|string, - * elasticMetaHeader?: boolean, - * includePortInHostHeader?: boolean - * } $config - * @param string $index - * @param IdFactory $id_factory - * @param array $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html - * - * @return Loader - */ - final public static function bulk_index( - array $config, - string $index, - IdFactory $id_factory, - array $parameters = [] - ) : Loader { - return new ElasticsearchLoader($config, $index, $id_factory, $parameters); - } - - /** - * https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html. - * - * In order to control the size of the single request, use DataFrame::chunkSize() method just before calling DataFrame::load(). - * - * @param array{ - * hosts?: array, - * connectionParams?: array, - * retries?: int, - * sniffOnStart?: boolean, - * sslCert?: array, - * sslKey?: array, - * sslVerification?: boolean|string, - * elasticMetaHeader?: boolean, - * includePortInHostHeader?: boolean - * } $config - * @param string $index - * @param IdFactory $id_factory - * @param array $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html - * - * @return Loader - */ - final public static function bulk_update( - array $config, - string $index, - IdFactory $id_factory, - array $parameters = [] - ) : Loader { - return ElasticsearchLoader::update($config, $index, $id_factory, $parameters); - } - - /** - * Transforms elasticsearch results into clear Flow Rows using ['hits']['hits'][x]['_source']. - * - * @return Transformer - */ - final public static function hits_to_rows(DocumentDataSource $source = DocumentDataSource::source) : Transformer - { - return new HitsIntoRowsTransformer($source); - } - - /** - * Extractor will automatically try to iterate over whole index using one of the two iteration methods:. - * - * - from/size - * - search_after - * - * Search after is selected when you provide define sort parameters in query, otherwise it will fallback to from/size. - * - * @param array{ - * hosts?: array, - * connectionParams?: array, - * retries?: int, - * sniffOnStart?: boolean, - * sslCert?: array, - * sslKey?: array, - * sslVerification?: boolean|string, - * elasticMetaHeader?: boolean, - * includePortInHostHeader?: boolean - * } $config - * @param array $params - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-search.html - * @param ?array $pit_params - when used extractor will create point in time to stabilize search results. Point in time is automatically closed when last element is extracted. https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html - */ - final public static function search(array $config, array $params, ?array $pit_params = null) : Extractor - { - return new ElasticsearchExtractor( - $config, - $params, - $pit_params, - ); - } -} diff --git a/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/DSL/GoogleSheet.php b/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/DSL/GoogleSheet.php deleted file mode 100644 index a94541128..000000000 --- a/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/DSL/GoogleSheet.php +++ /dev/null @@ -1,91 +0,0 @@ -setScopes(Sheets::SPREADSHEETS_READONLY); - $client->setAuthConfig($auth_config); - $sheets = new Sheets($client); - } - - return new GoogleSheetExtractor( - $sheets, - $spreadsheet_id, - new Columns($sheet_name, 'A', 'Z'), - $with_header, - $rows_per_page, - $options, - ); - } - - /** - * @param array{type: string, project_id: string, private_key_id: string, private_key: string, client_email: string, client_id: string, auth_uri: string, token_uri: string, auth_provider_x509_cert_url: string, client_x509_cert_url: string}|Sheets $auth_config - * @param string $spreadsheet_id - * @param string $sheet_name - * @param string $start_range_column - * @param string $end_range_column - * @param bool $with_header - * @param int $rows_per_page - how many rows per page to fetch from Google Sheets API - * @param array{dateTimeRenderOption?: string, majorDimension?: string, valueRenderOption?: string} $options - */ - public static function from_columns( - array|Sheets $auth_config, - string $spreadsheet_id, - string $sheet_name, - string $start_range_column, - string $end_range_column, - bool $with_header = true, - int $rows_per_page = 1000, - array $options = [], - ) : Extractor { - if ($auth_config instanceof Sheets) { - $sheets = $auth_config; - } else { - $client = new Client(); - $client->setScopes(Sheets::SPREADSHEETS_READONLY); - $client->setAuthConfig($auth_config); - $sheets = new Sheets($client); - } - - return new GoogleSheetExtractor( - $sheets, - $spreadsheet_id, - new Columns($sheet_name, $start_range_column, $end_range_column), - $with_header, - $rows_per_page, - $options, - ); - } -} diff --git a/src/adapter/etl-adapter-json/src/Flow/ETL/DSL/Json.php b/src/adapter/etl-adapter-json/src/Flow/ETL/DSL/Json.php deleted file mode 100644 index b181d82d2..000000000 --- a/src/adapter/etl-adapter-json/src/Flow/ETL/DSL/Json.php +++ /dev/null @@ -1,58 +0,0 @@ -|Path|string $path - string is internally turned into stream - * @param ?string $pointer - if you want to iterate only results of a subtree, use a pointer, read more at https://github.com/halaxa/json-machine#parsing-a-subtree - * - * @return Extractor - */ - public static function from( - string|Path|array $path, - ?string $pointer = null, - ) : Extractor { - if (\is_array($path)) { - $extractors = []; - - foreach ($path as $file) { - $extractors[] = new JsonExtractor( - \is_string($file) ? Path::realpath($file) : $file, - $pointer, - ); - } - - return new Extractor\ChainExtractor(...$extractors); - } - - return new JsonExtractor( - \is_string($path) ? Path::realpath($path) : $path, - $pointer, - ); - } - - /** - * @param Path|string $path - * - * @return Loader - */ - public static function to(string|Path $path) : Loader - { - return new JsonLoader( - \is_string($path) ? Path::realpath($path) : $path, - ); - } -} diff --git a/src/adapter/etl-adapter-meilisearch/src/Flow/ETL/DSL/Meilisearch.php b/src/adapter/etl-adapter-meilisearch/src/Flow/ETL/DSL/Meilisearch.php deleted file mode 100644 index 529a3d01b..000000000 --- a/src/adapter/etl-adapter-meilisearch/src/Flow/ETL/DSL/Meilisearch.php +++ /dev/null @@ -1,56 +0,0 @@ -, sort: ?array} $params - */ - final public static function search(array $config, array $params, string $index) : Extractor - { - return new MeilisearchExtractor($config, $params, $index); - } -} diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php deleted file mode 100644 index d82b627d0..000000000 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php +++ /dev/null @@ -1,82 +0,0 @@ -|Path|string $uri - * @param array $columns - * - * @return Extractor - */ - final public static function from( - string|Path|array $uri, - array $columns = [], - Options $options = new Options(), - ByteOrder $byte_order = ByteOrder::LITTLE_ENDIAN, - ) : Extractor { - if (\is_array($uri)) { - $extractors = []; - - foreach ($uri as $filePath) { - $extractors[] = new ParquetExtractor( - $filePath, - $options, - $byte_order, - $columns - ); - } - - return new Extractor\ChainExtractor(...$extractors); - } - - return new ParquetExtractor( - \is_string($uri) ? Path::realpath($uri) : $uri, - $options, - $byte_order, - $columns - ); - } - - /** - * @param Path|string $path - * @param null|Schema $schema - * - * @return Loader - */ - final public static function to( - string|Path $path, - ?Options $options = null, - Compressions $compressions = Compressions::SNAPPY, - ?Schema $schema = null, - ) : Loader { - if ($options === null) { - $options = Options::default(); - } - - return new ParquetLoader( - \is_string($path) ? Path::realpath($path) : $path, - $options, - $compressions, - $schema, - ); - } -} diff --git a/src/adapter/etl-adapter-text/src/Flow/ETL/DSL/Text.php b/src/adapter/etl-adapter-text/src/Flow/ETL/DSL/Text.php deleted file mode 100644 index 803e2b29d..000000000 --- a/src/adapter/etl-adapter-text/src/Flow/ETL/DSL/Text.php +++ /dev/null @@ -1,58 +0,0 @@ -|Path|string $path - * - * @return Extractor - */ - final public static function from( - string|Path|array $path, - ) : Extractor { - if (\is_array($path)) { - $extractors = []; - - foreach ($path as $file_path) { - $extractors[] = new TextExtractor( - \is_string($file_path) ? Path::realpath($file_path) : $file_path, - ); - } - - return new Extractor\ChainExtractor(...$extractors); - } - - return new TextExtractor( - \is_string($path) ? Path::realpath($path) : $path, - ); - } - - /** - * @param Path|string $path - * @param string $new_line_separator - * - * @return Loader - */ - final public static function to( - string|Path $path, - string $new_line_separator = PHP_EOL - ) : Loader { - return new TextLoader( - \is_string($path) ? Path::realpath($path) : $path, - $new_line_separator - ); - } -} diff --git a/src/adapter/etl-adapter-xml/src/Flow/ETL/DSL/XML.php b/src/adapter/etl-adapter-xml/src/Flow/ETL/DSL/XML.php deleted file mode 100644 index 30f6fd747..000000000 --- a/src/adapter/etl-adapter-xml/src/Flow/ETL/DSL/XML.php +++ /dev/null @@ -1,45 +0,0 @@ -|Path|string $path - * @param string $xml_node_path - * - * @return Extractor - */ - final public static function from( - string|Path|array $path, - string $xml_node_path = '' - ) : Extractor { - if (\is_array($path)) { - /** @var array $extractors */ - $extractors = []; - - foreach ($path as $next_path) { - $extractors[] = new XMLReaderExtractor( - \is_string($next_path) ? Path::realpath($next_path) : $next_path, - $xml_node_path - ); - } - - return new Extractor\ChainExtractor(...$extractors); - } - - return new XMLReaderExtractor( - \is_string($path) ? Path::realpath($path) : $path, - $xml_node_path - ); - } -} diff --git a/src/core/etl/src/Flow/ETL/DSL/Entry.php b/src/core/etl/src/Flow/ETL/DSL/Entry.php deleted file mode 100644 index c1107214d..000000000 --- a/src/core/etl/src/Flow/ETL/DSL/Entry.php +++ /dev/null @@ -1,306 +0,0 @@ - $data - * - * @return RowEntry\ArrayEntry - */ - final public static function array(string $name, array $data) : RowEntry - { - return new RowEntry\ArrayEntry($name, $data); - } - - /** - * @throws InvalidArgumentException - */ - final public static function bool(string $name, bool $value) : RowEntry - { - return self::boolean($name, $value); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\BooleanEntry - */ - final public static function boolean(string $name, bool $value) : RowEntry - { - return new RowEntry\BooleanEntry($name, $value); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\DateTimeEntry - */ - final public static function datetime(string $name, \DateTimeInterface|string $value) : RowEntry - { - return new RowEntry\DateTimeEntry($name, $value); - } - - /** - * @throws InvalidArgumentException - */ - final public static function entries(RowEntry ...$entries) : Entries - { - return new Entries(...$entries); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\EnumEntry - */ - final public static function enum(string $name, \UnitEnum $enum) : RowEntry - { - return new RowEntry\EnumEntry($name, $enum); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\FloatEntry - */ - final public static function float(string $name, float $value) : RowEntry - { - return new RowEntry\FloatEntry($name, $value); - } - - final public static function int(string $name, int $value) : RowEntry - { - return self::integer($name, $value); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\IntegerEntry - */ - final public static function integer(string $name, int $value) : RowEntry - { - return new RowEntry\IntegerEntry($name, $value); - } - - /** - * @return RowEntry\JsonEntry - */ - final public static function json(string $name, array|string $data) : RowEntry - { - return new RowEntry\JsonEntry($name, $data); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\JsonEntry - */ - final public static function json_object(string $name, array|string $data) : RowEntry - { - if (\is_string($data)) { - return new RowEntry\JsonEntry($name, $data); - } - - return RowEntry\JsonEntry::object($name, $data); - } - - /** - * @param array $value - * - * @throws InvalidArgumentException - * - * @return RowEntry\ListEntry - */ - final public static function list_of_boolean(string $name, array $value) : RowEntry - { - return new RowEntry\ListEntry($name, $value, new ListType(ListElement::boolean())); - } - - /** - * @param array<\DateTimeInterface> $value - * - * @throws InvalidArgumentException - * - * @return RowEntry\ListEntry - */ - final public static function list_of_datetime(string $name, array $value) : RowEntry - { - return new RowEntry\ListEntry($name, $value, new ListType(ListElement::datetime())); - } - - /** - * @param array $value - * - * @throws InvalidArgumentException - * - * @return RowEntry\ListEntry - */ - final public static function list_of_float(string $name, array $value) : RowEntry - { - return new RowEntry\ListEntry($name, $value, new ListType(ListElement::float())); - } - - /** - * @param array $value - * - * @throws InvalidArgumentException - * - * @return RowEntry\ListEntry - */ - final public static function list_of_int(string $name, array $value) : RowEntry - { - return new RowEntry\ListEntry($name, $value, new ListType(ListElement::integer())); - } - - /** - * @param array<\DateTimeInterface> $value - * @param class-string $class - * - * @throws InvalidArgumentException - * - * @return RowEntry\ListEntry - */ - final public static function list_of_objects(string $name, string $class, array $value) : RowEntry - { - if (\is_a($class, \DateTimeInterface::class, true)) { - return new RowEntry\ListEntry($name, $value, new ListType(ListElement::datetime())); - } - - return new RowEntry\ListEntry($name, $value, new ListType(ListElement::object($class))); - } - - /** - * @param array $value - * - * @throws InvalidArgumentException - * - * @return RowEntry\ListEntry - */ - final public static function list_of_string(string $name, array $value) : RowEntry - { - return new RowEntry\ListEntry($name, $value, new ListType(ListElement::string())); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\MapEntry - */ - final public static function map(string $name, array $values, MapType $mapType) : RowEntry - { - return new RowEntry\MapEntry($name, $values, $mapType); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\NullEntry - */ - final public static function null(string $name) : RowEntry - { - return new RowEntry\NullEntry($name); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\ObjectEntry - */ - final public static function object(string $name, object $object) : RowEntry - { - return new RowEntry\ObjectEntry($name, $object); - } - - final public static function str(string $name, string $value) : RowEntry - { - return self::string($name, $value); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\StringEntry - */ - final public static function string(string $name, string $value) : RowEntry - { - return new RowEntry\StringEntry($name, $value); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\StringEntry - */ - final public static function string_lower(string $name, string $value) : RowEntry - { - return RowEntry\StringEntry::lowercase($name, $value); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\StringEntry - */ - final public static function string_upper(string $name, string $value) : RowEntry - { - return RowEntry\StringEntry::uppercase($name, $value); - } - - final public static function struct(string $name, array $values, StructureType $structureType) : RowEntry - { - return self::structure($name, $values, $structureType); - } - - /** - * @throws InvalidArgumentException - * - * @return RowEntry\StructureEntry - */ - final public static function structure(string $name, array $values, StructureType $structureType) : RowEntry - { - return new RowEntry\StructureEntry($name, $values, $structureType); - } - - /** - * @return RowEntry\UuidEntry - */ - final public static function uuid(string $name, Uuid|string $value) : RowEntry - { - return new RowEntry\UuidEntry($name, $value); - } - - /** - * @return RowEntry\XMLEntry - */ - final public static function xml(string $name, \DOMDocument|string $data) : RowEntry - { - return new RowEntry\XMLEntry($name, $data); - } - - /** - * @return RowEntry\XMLNodeEntry - */ - final public static function xml_node(string $name, \DOMNode $data) : RowEntry - { - return new RowEntry\XMLNodeEntry($name, $data); - } -} diff --git a/src/core/etl/src/Flow/ETL/DSL/From.php b/src/core/etl/src/Flow/ETL/DSL/From.php deleted file mode 100644 index b55e22dde..000000000 --- a/src/core/etl/src/Flow/ETL/DSL/From.php +++ /dev/null @@ -1,123 +0,0 @@ -> $array - */ - final public static function array(array $array) : Extractor - { - return new MemoryExtractor(new ArrayMemory($array)); - } - - final public static function cache(string $id, ?Extractor $fallback_extractor = null, bool $clear = false) : Extractor - { - return new Extractor\CacheExtractor($id, $fallback_extractor, $clear); - } - - final public static function chain(Extractor ...$extractors) : Extractor - { - return new Extractor\ChainExtractor(...$extractors); - } - - /** - * @param int<1, max> $chunk_size - */ - final public static function chunks_from(Extractor $extractor, int $chunk_size) : Extractor - { - return new Extractor\ChunkExtractor($extractor, $chunk_size); - } - - final public static function data_frame(DataFrame $data_frame) : Extractor - { - return new Extractor\DataFrameExtractor($data_frame); - } - - /** - * @param Memory $memory - * - * @return Extractor - */ - final public static function memory(Memory $memory) : Extractor - { - return new MemoryExtractor($memory); - } - - final public static function pipeline(Pipeline $pipeline) : Extractor - { - return new Extractor\PipelineExtractor($pipeline); - } - - final public static function rows(Rows ...$rows) : Extractor - { - return new ProcessExtractor(...$rows); - } - - /** - * @param string $entry_name - * @param \DateTimeInterface $start - * @param \DateInterval $interval - * @param \DateTimeInterface $end - * @param 0|1 $options - * - * @return Extractor - */ - final public static function sequence_date_period(string $entry_name, \DateTimeInterface $start, \DateInterval $interval, \DateTimeInterface $end, int $options = 0) : Extractor - { - return new Extractor\SequenceExtractor( - new Extractor\SequenceGenerator\DatePeriodSequenceGenerator(new \DatePeriod($start, $interval, $end, $options)), - $entry_name - ); - } - - /** - * @param string $entry_name - * @param \DateTimeInterface $start - * @param \DateInterval $interval - * @param int<1, max> $recurrences - * @param 0|1 $options - * - * @return Extractor - */ - final public static function sequence_date_period_recurrences(string $entry_name, \DateTimeInterface $start, \DateInterval $interval, int $recurrences, int $options = 0) : Extractor - { - return new Extractor\SequenceExtractor( - new Extractor\SequenceGenerator\DatePeriodSequenceGenerator(new \DatePeriod($start, $interval, $recurrences, $options)), - $entry_name - ); - } - - final public static function sequence_number(string $entry_name, string|int|float $start, string|int|float $end, int|float $step = 1) : Extractor - { - return new Extractor\SequenceExtractor( - new Extractor\SequenceGenerator\NumberSequenceGenerator($start, $end, $step), - $entry_name - ); - } -} diff --git a/src/core/etl/src/Flow/ETL/DSL/Handler.php b/src/core/etl/src/Flow/ETL/DSL/Handler.php deleted file mode 100644 index f69ad34f4..000000000 --- a/src/core/etl/src/Flow/ETL/DSL/Handler.php +++ /dev/null @@ -1,30 +0,0 @@ -keep(...$partitions)) { - return false; - } - } - - return true; - }); - } - - public static function date_after(string $partition, \DateTimeInterface $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - if ($p->name === $partition && new \DateTimeImmutable($p->value) > $value) { - return true; - } - } - - return false; - }); - } - - public static function date_after_or_equal(string $partition, \DateTimeInterface $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - if ($p->name === $partition && new \DateTimeImmutable($p->value) >= $value) { - return true; - } - } - - return false; - }); - } - - public static function date_before(string $partition, \DateTimeInterface $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - if ($p->name === $partition && new \DateTimeImmutable($p->value) < $value) { - return true; - } - } - - return false; - }); - } - - public static function date_before_or_equal(string $partition, \DateTimeInterface $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - if ($p->name === $partition && new \DateTimeImmutable($p->value) <= $value) { - return true; - } - } - - return false; - }); - } - - public static function date_between(string $partition, \DateTimeInterface $start, \DateTimeInterface $end) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $start, $end) : bool { - foreach ($partitions as $p) { - if ($p->name === $partition && new \DateTimeImmutable($p->value) >= $start && new \DateTimeImmutable($p->value) < $end) { - return true; - } - } - - return false; - }); - } - - public static function greater(string $partition, int|float $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - $castedValue = \is_int($value) ? (int) $p->value : (float) $p->value; - - if ($p->name === $partition && $castedValue > $value) { - return true; - } - } - - return false; - }); - } - - public static function greater_or_equal(string $partition, int|float $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - $castedValue = \is_int($value) ? (int) $p->value : (float) $p->value; - - if ($p->name === $partition && $castedValue >= $value) { - return true; - } - } - - return false; - }); - } - - public static function lower(string $partition, int|float $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - $castedValue = \is_int($value) ? (int) $p->value : (float) $p->value; - - if ($p->name === $partition && $castedValue < $value) { - return true; - } - } - - return false; - }); - } - - public static function lower_or_equal(string $partition, int|float $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - $castedValue = \is_int($value) ? (int) $p->value : (float) $p->value; - - if ($p->name === $partition && $castedValue <= $value) { - return true; - } - } - - return false; - }); - } - - public static function not(PartitionFilter $filter) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static fn (FlowPartition ...$partitions) : bool => !$filter->keep(...$partitions)); - } - - /** - * @param string $partition - * @param array $values - * - * @return PartitionFilter - */ - public static function one_of(string $partition, array $values) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter( - static function (FlowPartition ...$partitions) use ($partition, $values) : bool { - foreach ($partitions as $p) { - if ($p->name === $partition && \in_array($p->value, $values, true)) { - return true; - } - } - - return false; - } - ); - } - - public static function only(string $partition, string $value) : PartitionFilter - { - /** @psalm-suppress DeprecatedClass */ - return new CallableFilter(static function (FlowPartition ...$partitions) use ($partition, $value) : bool { - foreach ($partitions as $p) { - if ($p->name === $partition && $p->value === $value) { - return true; - } - } - - return false; - }); - } -} diff --git a/src/core/etl/src/Flow/ETL/DSL/To.php b/src/core/etl/src/Flow/ETL/DSL/To.php deleted file mode 100644 index 00a8122ff..000000000 --- a/src/core/etl/src/Flow/ETL/DSL/To.php +++ /dev/null @@ -1,57 +0,0 @@ - $chunks - */ - public function parallelize(int $chunks) : self - { - $this->pipeline = new ParallelizingPipeline($this->pipeline, $chunks); - - return $this; - } - /** * @lazy */ diff --git a/src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php index b38453574..64b429c25 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php @@ -41,4 +41,9 @@ public function pivot(Reference $ref) : DataFrame { return $this->df->pivot($ref); } + + public function rename(string $from, string $to) : DataFrame + { + return $this->df->rename($from, $to); + } } diff --git a/src/core/etl/src/Flow/ETL/Partition/CallableFilter.php b/src/core/etl/src/Flow/ETL/Partition/CallableFilter.php deleted file mode 100644 index cb45514d1..000000000 --- a/src/core/etl/src/Flow/ETL/Partition/CallableFilter.php +++ /dev/null @@ -1,31 +0,0 @@ -filter = $filter; - } - - public function keep(Partition ...$partitions) : bool - { - return ($this->filter)(...$partitions); - } -} diff --git a/src/core/etl/src/Flow/ETL/Pipeline/Optimizer/BatchSizeOptimization.php b/src/core/etl/src/Flow/ETL/Pipeline/Optimizer/BatchSizeOptimization.php index d1b70de10..24b526e16 100644 --- a/src/core/etl/src/Flow/ETL/Pipeline/Optimizer/BatchSizeOptimization.php +++ b/src/core/etl/src/Flow/ETL/Pipeline/Optimizer/BatchSizeOptimization.php @@ -7,7 +7,6 @@ use Flow\ETL\Pipeline\BatchingPipeline; use Flow\ETL\Pipeline\CollectingPipeline; use Flow\ETL\Pipeline\OverridingPipeline; -use Flow\ETL\Pipeline\ParallelizingPipeline; use Flow\ETL\Pipeline\PartitioningPipeline; use Flow\ETL\Transformer; @@ -22,14 +21,11 @@ final class BatchSizeOptimization implements Optimization { /** - * @psalm-suppress DeprecatedClass - * * @var array> */ private array $batchingPipelines = [ BatchingPipeline::class, CollectingPipeline::class, - ParallelizingPipeline::class, PartitioningPipeline::class, ]; diff --git a/src/core/etl/src/Flow/ETL/Pipeline/Optimizer/LimitOptimization.php b/src/core/etl/src/Flow/ETL/Pipeline/Optimizer/LimitOptimization.php index 96f98eed8..31d64b1d0 100644 --- a/src/core/etl/src/Flow/ETL/Pipeline/Optimizer/LimitOptimization.php +++ b/src/core/etl/src/Flow/ETL/Pipeline/Optimizer/LimitOptimization.php @@ -9,7 +9,6 @@ use Flow\ETL\Pipeline\BatchingPipeline; use Flow\ETL\Pipeline\CollectingPipeline; use Flow\ETL\Pipeline\NestedPipeline; -use Flow\ETL\Pipeline\ParallelizingPipeline; use Flow\ETL\Pipeline\SynchronousPipeline; use Flow\ETL\Pipeline\VoidPipeline; use Flow\ETL\Transformer; @@ -25,15 +24,11 @@ final class LimitOptimization implements Optimization { - /** - * @psalm-suppress DeprecatedClass - */ private array $nonExpandingPipelines = [ SynchronousPipeline::class, CollectingPipeline::class, BatchingPipeline::class, NestedPipeline::class, - ParallelizingPipeline::class, VoidPipeline::class, ]; diff --git a/src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php b/src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php deleted file mode 100644 index 2b2bf4c30..000000000 --- a/src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php +++ /dev/null @@ -1,99 +0,0 @@ - $parallel - */ - public function __construct( - private readonly Pipeline $pipeline, - private readonly int $parallel - ) { - $this->nextPipeline = $pipeline->cleanCopy(); - } - - public function add(Loader|Transformer $pipe) : self - { - $this->nextPipeline->add($pipe); - - return $this; - } - - public function cleanCopy() : Pipeline - { - return new self($this->pipeline, $this->parallel); - } - - public function closure(FlowContext $context) : void - { - $this->pipeline->closure($context); - } - - public function has(string $transformerClass) : bool - { - return $this->pipeline->has($transformerClass); - } - - /** - * @return array - */ - public function pipelines() : array - { - $pipelines = []; - - if ($this->pipeline instanceof OverridingPipeline) { - $pipelines = $this->pipeline->pipelines(); - } - $pipelines[] = $this->pipeline; - - return $pipelines; - } - - public function pipes() : Pipes - { - return $this->pipeline->pipes()->merge($this->nextPipeline->pipes()); - } - - public function process(FlowContext $context) : \Generator - { - $this->nextPipeline->setSource( - chunks_from( - from_pipeline($this->pipeline), - $this->parallel - ) - ); - - return $this->nextPipeline->process($context); - } - - public function setSource(Extractor $extractor) : self - { - $this->pipeline->setSource($extractor); - - return $this; - } - - public function source() : Extractor - { - return $this->pipeline->source(); - } -} diff --git a/src/core/etl/src/Flow/ETL/Row/Schema.php b/src/core/etl/src/Flow/ETL/Row/Schema.php index 40a44600a..9c625ebd4 100644 --- a/src/core/etl/src/Flow/ETL/Row/Schema.php +++ b/src/core/etl/src/Flow/ETL/Row/Schema.php @@ -239,14 +239,6 @@ public function replace(string|Reference $entry, Definition $definition) : self return $this; } - /** - * @deprecated Use `remove` instead - */ - public function without(string|Reference ...$entries) : self - { - return $this->remove(...$entries); - } - private function setDefinitions(Definition ...$definitions) : void { $uniqueDefinitions = []; diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/GroupByTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/GroupByTest.php index ccd6e9698..86cf9baa0 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/GroupByTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/GroupByTest.php @@ -19,7 +19,6 @@ use function Flow\ETL\DSL\str_entry; use function Flow\ETL\DSL\sum; use function Flow\ETL\DSL\window; -use Flow\ETL\DSL\Transform; use Flow\ETL\Loader; use Flow\ETL\Memory\ArrayMemory; use Flow\ETL\Row; @@ -290,7 +289,7 @@ public function test_standalone_avg_aggregation() : void ) )) ->aggregate(average(ref('age'))) - ->rows(Transform::rename('age_avg', 'average_age')) + ->rename('age_avg', 'average_age') ->fetch(); $this->assertEquals( diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Pipeline/ParallelizingPipelineTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Pipeline/ParallelizingPipelineTest.php deleted file mode 100644 index c66e5f115..000000000 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Pipeline/ParallelizingPipelineTest.php +++ /dev/null @@ -1,63 +0,0 @@ -setSource(from_array([ - ['id' => 1], - ['id' => 2], - ['id' => 3], - ['id' => 4], - ['id' => 5], - ['id' => 6], - ['id' => 7], - ['id' => 8], - ['id' => 9], - ['id' => 10], - ])); - - $this->assertCount( - 2, - \iterator_to_array($pipeline->process(new FlowContext(Config::default()))) - ); - } - - public function test_parallelizing_pipeline_with_batch_size_greater_than_total_number_of_rows() : void - { - $pipeline = new ParallelizingPipeline( - new SynchronousPipeline(), - 15 - ); - $pipeline->setSource(from_array([ - ['id' => 1], - ['id' => 2], - ['id' => 3], - ['id' => 4], - ['id' => 5], - ['id' => 6], - ['id' => 7], - ['id' => 8], - ['id' => 9], - ['id' => 10], - ])); - - $this->assertCount( - 1, - \iterator_to_array($pipeline->process(new FlowContext(Config::default()))) - ); - } -} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php index 639197530..1cf252af2 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php @@ -10,8 +10,8 @@ use Flow\ETL\Config; use Flow\ETL\Extractor\ProcessExtractor; use Flow\ETL\FlowContext; +use Flow\ETL\Pipeline\BatchingPipeline; use Flow\ETL\Pipeline\NestedPipeline; -use Flow\ETL\Pipeline\ParallelizingPipeline; use Flow\ETL\Pipeline\SynchronousPipeline; use Flow\ETL\Row; use Flow\ETL\Rows; @@ -24,7 +24,7 @@ public function test_nested_pipelines() : void { $pipeline = new NestedPipeline( (new SynchronousPipeline())->add(new ScalarFunctionTransformer('active', lit(true))), - new ParallelizingPipeline(new SynchronousPipeline(), 1) + new BatchingPipeline(new SynchronousPipeline(), 1) ); $pipeline->setSource(new ProcessExtractor( diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/Optimizer/BatchSizeOptimizationTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/Optimizer/BatchSizeOptimizationTest.php index daa2daa3a..8aa89a5f1 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/Optimizer/BatchSizeOptimizationTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Pipeline/Optimizer/BatchSizeOptimizationTest.php @@ -8,7 +8,6 @@ use Flow\ETL\Pipeline\CollectingPipeline; use Flow\ETL\Pipeline\NestedPipeline; use Flow\ETL\Pipeline\Optimizer\BatchSizeOptimization; -use Flow\ETL\Pipeline\ParallelizingPipeline; use Flow\ETL\Pipeline\SynchronousPipeline; use Flow\ETL\Transformer; use PHPUnit\Framework\TestCase; @@ -71,13 +70,4 @@ public function test_is_for_collecting_pipeline() : void (new BatchSizeOptimization())->isFor(new DbalLoader('test', [], []), $pipeline) ); } - - public function test_is_for_parallelizing_pipeline() : void - { - $pipeline = new ParallelizingPipeline(new SynchronousPipeline(), 10); - - $this->assertFalse( - (new BatchSizeOptimization())->isFor(new DbalLoader('test', [], []), $pipeline) - ); - } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/SchemaTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/SchemaTest.php index d684ac76a..09e47acca 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/SchemaTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/SchemaTest.php @@ -216,15 +216,6 @@ public function test_remove_non_existing_definition() : void public function test_removing_elements_from_schema() : void { - $this->assertEquals( - schema( - int_schema('id'), - ), - schema( - int_schema('id'), - str_schema('name'), - )->without('name') - ); $this->assertEquals( schema( int_schema('id'), diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/KeepEntriesTransformerTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/KeepEntriesTransformerTest.php index ad11e8be3..0d8501fea 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/KeepEntriesTransformerTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/KeepEntriesTransformerTest.php @@ -5,10 +5,10 @@ namespace Flow\ETL\Tests\Unit\Transformer; use Flow\ETL\Config; -use Flow\ETL\DSL\Transform; use Flow\ETL\FlowContext; use Flow\ETL\Row; use Flow\ETL\Rows; +use Flow\ETL\Transformer\KeepEntriesTransformer; use PHPUnit\Framework\TestCase; final class KeepEntriesTransformerTest extends TestCase @@ -23,7 +23,7 @@ public function test_keeping_entries() : void ) ); - $transformer = Transform::keep('name'); + $transformer = new KeepEntriesTransformer('name'); $this->assertSame( [ ['name' => 'Row Name'], @@ -42,7 +42,7 @@ public function test_keeping_not_existing_entries() : void ) ); - $transformer = Transform::keep('not_existing'); + $transformer = new KeepEntriesTransformer('not_existing'); $this->assertSame( [['not_existing' => null]], $transformer->transform($rows, new FlowContext(Config::default()))->toArray() diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/RemoveEntriesTransformerTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/RemoveEntriesTransformerTest.php index 81e619e3a..d5e298a10 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/RemoveEntriesTransformerTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/RemoveEntriesTransformerTest.php @@ -5,10 +5,10 @@ namespace Flow\ETL\Tests\Unit\Transformer; use Flow\ETL\Config; -use Flow\ETL\DSL\Transform; use Flow\ETL\FlowContext; use Flow\ETL\Row; use Flow\ETL\Rows; +use Flow\ETL\Transformer\RemoveEntriesTransformer; use PHPUnit\Framework\TestCase; final class RemoveEntriesTransformerTest extends TestCase @@ -23,7 +23,7 @@ public function test_removing_entries() : void ) ); - $transformer = Transform::remove('id', 'array'); + $transformer = new RemoveEntriesTransformer('id', 'array'); $this->assertSame( [ ['name' => 'Row Name'], @@ -42,7 +42,7 @@ public function test_removing_not_existing_entries() : void ) ); - $transformer = Transform::remove('not_existing'); + $transformer = new RemoveEntriesTransformer('not_existing'); $this->assertSame( [ ['id' => 1, 'name' => 'Row Name', 'array' => ['test']], diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/RenameEntryTransformerTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/RenameEntryTransformerTest.php index 5766179a3..eea38f8df 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/RenameEntryTransformerTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/RenameEntryTransformerTest.php @@ -5,18 +5,18 @@ namespace Flow\ETL\Tests\Unit\Transformer; use Flow\ETL\Config; -use Flow\ETL\DSL\Transform; use Flow\ETL\FlowContext; use Flow\ETL\Row; use Flow\ETL\Rows; +use Flow\ETL\Transformer\RenameEntryTransformer; use PHPUnit\Framework\TestCase; final class RenameEntryTransformerTest extends TestCase { public function test_renaming_entries() : void { - $renameTransformerOne = Transform::rename('old_int', 'new_int'); - $renameTransformerTwo = Transform::rename('null', 'nothing'); + $renameTransformerOne = new RenameEntryTransformer('old_int', 'new_int'); + $renameTransformerTwo = new RenameEntryTransformer('null', 'nothing'); $rows = $renameTransformerOne->transform( new Rows( From bf8859d96703a211276aa73702942bd8c2076a92 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Sun, 18 Feb 2024 21:38:09 +0100 Subject: [PATCH 2/3] Removed serialization leftover --- .../Elasticsearch/EntryIdFactory/EntryIdFactory.php | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/EntryIdFactory/EntryIdFactory.php b/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/EntryIdFactory/EntryIdFactory.php index 414cef635..bfbccb634 100644 --- a/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/EntryIdFactory/EntryIdFactory.php +++ b/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/EntryIdFactory/EntryIdFactory.php @@ -14,18 +14,6 @@ public function __construct(private string $entryName) { } - public function __serialize() : array - { - return [ - 'entry_name' => $this->entryName, - ]; - } - - public function __unserialize(array $data) : void - { - $this->entryName = $data['entry_name']; - } - public function create(Row $row) : Entry { return $row->get($this->entryName)->rename('id'); From a696ac10a0278912f16146108e1c7ffad1c9118c Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Sun, 18 Feb 2024 21:44:25 +0100 Subject: [PATCH 3/3] Fixed readme --- UPGRADE.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/UPGRADE.md b/UPGRADE.md index 4f1b54d5f..33a2ec42e 100644 --- a/UPGRADE.md +++ b/UPGRADE.md @@ -170,8 +170,6 @@ We reworked most of the internal transformers to new scalar functions and entry All available functions can be found in [`ETL\Row\Function` folder](src/core/etl/src/Flow/ETL/Function) or in [`ETL\DSL\functions` file](src/core/etl/src/Flow/ETL/DSL/functions.php), and entry scalar functions are defined in `EntryScalarFunction`. -To see what transformers are available see [`ETL\DSL\Transform` class](src/core/etl/src/Flow/ETL/DSL/Transform.php). - Before: ```php