diff --git a/src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/DbalLoader.php b/src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/DbalLoader.php index 7598893eb..bed468934 100644 --- a/src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/DbalLoader.php +++ b/src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/DbalLoader.php @@ -16,7 +16,7 @@ /** * @implements Loader, * connection_params: array, * operation: string, * operation_options: array{ @@ -35,8 +35,7 @@ final class DbalLoader implements Loader private string $operation; /** - * @param string $tableName - * @param int $chunkSize + * @param int<1, max> $chunkSize * @param array $connectionParams * @param array{ * skip_conflicts?: boolean, @@ -45,7 +44,6 @@ final class DbalLoader implements Loader * update_columns?: array, * primary_key_columns?: array * } $operationOptions - * @param string $operation * * @throws InvalidArgumentException */ @@ -66,9 +64,7 @@ public function __construct( * Since Connection::getParams() is marked as an internal method, please * use this constructor with caution. * - * @param Connection $connection - * @param string $tableName - * @param int $chunkSize + * @param int<1, max> $chunkSize * @param array{ * skip_conflicts?: boolean, * constraint?: string, @@ -76,7 +72,6 @@ public function __construct( * update_columns?: array, * primary_key_columns?: array * } $operationOptions - * @param string $operation * * @throws InvalidArgumentException */ diff --git a/src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php b/src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php index 988ee4a26..07731e9f3 100644 --- a/src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php +++ b/src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php @@ -71,8 +71,6 @@ final public static function from_limit_offset( * @param int $page_size * @param null|int $maximum * - * @throws InvalidArgumentException - * * @return Extractor */ final public static function from_limit_offset_qb( @@ -135,8 +133,7 @@ final public static function from_query( /** * @param array|Connection $connection - * @param string $table - * @param int $chunk_size + * @param int<1, max> $chunk_size * @param array{ * skip_conflicts?: boolean, * constraint?: string, @@ -146,8 +143,6 @@ final public static function from_query( * } $options * * @throws InvalidArgumentException - * - * @return Loader */ final public static function to_table_insert( array|Connection $connection, @@ -162,8 +157,7 @@ final public static function to_table_insert( /** * @param array|Connection $connection - * @param string $table - * @param int $chunk_size + * @param int<1, max> $chunk_size * @param array{ * skip_conflicts?: boolean, * constraint?: string, diff --git a/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/ElasticsearchLoader.php b/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/ElasticsearchLoader.php index e34b4cc28..33528f3a7 100644 --- a/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/ElasticsearchLoader.php +++ b/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/ElasticsearchLoader.php @@ -25,7 +25,7 @@ * elasticMetaHeader?: boolean, * includePortInHostHeader?: boolean * }, - * chunk_size: int, + * chunk_size: int<1, max>, * index: string, * id_factory: IdFactory, * parameters: array, @@ -41,6 +41,7 @@ final class ElasticsearchLoader implements Loader /** * @param array{hosts?: array, connectionParams?: array, retries?: int, sniffOnStart?: boolean, sslCert?: array, sslKey?: array, sslVerification?: (boolean|string), elasticMetaHeader?: boolean, includePortInHostHeader?: boolean} $config + * @param int<1, max> $chunkSize * @param array $parameters */ public function __construct( @@ -66,6 +67,7 @@ public function __construct( * elasticMetaHeader?: boolean, * includePortInHostHeader?: boolean * } $clientConfig + * @param int<1, max> $chunkSize * @param array $parameters */ public static function update(array $clientConfig, int $chunkSize, string $index, IdFactory $idFactory, array $parameters = []) : self diff --git a/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php b/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php index 4d9350cf9..af98f6b86 100644 --- a/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php +++ b/src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/DSL/Elasticsearch.php @@ -29,7 +29,7 @@ class Elasticsearch * elasticMetaHeader?: boolean, * includePortInHostHeader?: boolean * } $config - * @param int $chunk_size + * @param int<1, max> $chunk_size * @param string $index * @param IdFactory $id_factory * @param array $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html @@ -60,7 +60,7 @@ final public static function bulk_index( * elasticMetaHeader?: boolean, * includePortInHostHeader?: boolean * } $config - * @param int $chunk_size + * @param int<1, max> $chunk_size * @param string $index * @param IdFactory $id_factory * @param array $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html diff --git a/src/core/etl/src/Flow/ETL/DSL/From.php b/src/core/etl/src/Flow/ETL/DSL/From.php index 253e55237..173f8769a 100644 --- a/src/core/etl/src/Flow/ETL/DSL/From.php +++ b/src/core/etl/src/Flow/ETL/DSL/From.php @@ -35,6 +35,9 @@ final public static function array(array $array, int $batch_size = 100) : Extrac return new MemoryExtractor(new ArrayMemory($array), $batch_size); } + /** + * @param int<1, max> $max_row_size + */ final public static function buffer(Extractor $extractor, int $max_row_size) : Extractor { return new Extractor\BufferExtractor($extractor, $max_row_size); @@ -50,6 +53,9 @@ final public static function chain(Extractor ...$extractors) : Extractor return new Extractor\ChainExtractor(...$extractors); } + /** + * @param int<1, max> $chunk_size + */ final public static function chunks_from(Extractor $extractor, int $chunk_size) : Extractor { return new Extractor\ChunkExtractor($extractor, $chunk_size); diff --git a/src/core/etl/src/Flow/ETL/DSL/To.php b/src/core/etl/src/Flow/ETL/DSL/To.php index 1f908ea5f..68c59d4e8 100644 --- a/src/core/etl/src/Flow/ETL/DSL/To.php +++ b/src/core/etl/src/Flow/ETL/DSL/To.php @@ -18,6 +18,9 @@ */ class To { + /** + * @param int<1, max> $bufferSize + */ final public static function buffer(Loader $overflowLoader, int $bufferSize) : Loader { return new Loader\BufferLoader($overflowLoader, $bufferSize); diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index c8137fc2c..4ec766908 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -416,7 +416,7 @@ public function onError(ErrorHandler $handler) : self * * @lazy * - * @throws InvalidArgumentException + * @param int<1, max> $chunks */ public function parallelize(int $chunks) : self { diff --git a/src/core/etl/src/Flow/ETL/ExternalSort/BufferCache.php b/src/core/etl/src/Flow/ETL/ExternalSort/BufferCache.php index 4bde76c54..633dd60ee 100644 --- a/src/core/etl/src/Flow/ETL/ExternalSort/BufferCache.php +++ b/src/core/etl/src/Flow/ETL/ExternalSort/BufferCache.php @@ -17,6 +17,9 @@ final class BufferCache */ private array $buffers = []; + /** + * @param int<1, max> $bufferSize + */ public function __construct( private readonly Cache $overflowCache, private readonly int $bufferSize diff --git a/src/core/etl/src/Flow/ETL/ExternalSort/CacheExternalSort.php b/src/core/etl/src/Flow/ETL/ExternalSort/CacheExternalSort.php index 9a6e123a7..9fa7f4c5c 100644 --- a/src/core/etl/src/Flow/ETL/ExternalSort/CacheExternalSort.php +++ b/src/core/etl/src/Flow/ETL/ExternalSort/CacheExternalSort.php @@ -34,7 +34,7 @@ public function sortBy(EntryReference ...$refs) : Extractor { /** @var array> $cachedPartsArray */ $cachedPartsArray = []; - $maxRowsSize = 0; + $maxRowsSize = 1; /** @var int $i */ foreach ($this->cache->read($this->id) as $i => $rows) { diff --git a/src/core/etl/src/Flow/ETL/ExternalSort/MemorySort.php b/src/core/etl/src/Flow/ETL/ExternalSort/MemorySort.php index 11d946f0a..cd7b6500c 100644 --- a/src/core/etl/src/Flow/ETL/ExternalSort/MemorySort.php +++ b/src/core/etl/src/Flow/ETL/ExternalSort/MemorySort.php @@ -31,7 +31,7 @@ public function __construct( private readonly Cache $cache, private Unit $maximumMemory ) { - $this->configuration = new Configuration($safetyBufferPercentage = 10); + $this->configuration = new Configuration(10); if ($this->configuration->isLessThan($maximumMemory) && !$this->configuration->isInfinite()) { /** @@ -48,7 +48,7 @@ public function sortBy(EntryReference ...$refs) : Extractor $memoryConsumption = new Consumption(); $mergedRows = new Rows(); - $maxSize = 0; + $maxSize = 1; foreach ($this->cache->read($this->cacheId) as $rows) { $maxSize = \max($rows->count(), $maxSize); @@ -64,6 +64,6 @@ public function sortBy(EntryReference ...$refs) : Extractor $this->cache->clear($this->cacheId); - return new Extractor\ProcessExtractor(...$mergedRows->sortBy(...$refs)->chunks($maxSize)); + return new Extractor\ProcessExtractor(...\iterator_to_array($mergedRows->sortBy(...$refs)->chunks($maxSize))); } } diff --git a/src/core/etl/src/Flow/ETL/Extractor/BufferExtractor.php b/src/core/etl/src/Flow/ETL/Extractor/BufferExtractor.php index 6b21697ba..aca0c9f72 100644 --- a/src/core/etl/src/Flow/ETL/Extractor/BufferExtractor.php +++ b/src/core/etl/src/Flow/ETL/Extractor/BufferExtractor.php @@ -10,6 +10,9 @@ final class BufferExtractor implements Extractor, OverridingExtractor { + /** + * @param int<1, max> $maxRowsSize + */ public function __construct( private readonly Extractor $extractor, private readonly int $maxRowsSize diff --git a/src/core/etl/src/Flow/ETL/Extractor/ChunkExtractor.php b/src/core/etl/src/Flow/ETL/Extractor/ChunkExtractor.php index ce8ea4fb7..c2407356e 100644 --- a/src/core/etl/src/Flow/ETL/Extractor/ChunkExtractor.php +++ b/src/core/etl/src/Flow/ETL/Extractor/ChunkExtractor.php @@ -9,6 +9,9 @@ final class ChunkExtractor implements Extractor, OverridingExtractor { + /** + * @param int<1, max> $chunkSize + */ public function __construct( private readonly Extractor $extractor, private readonly int $chunkSize diff --git a/src/core/etl/src/Flow/ETL/Loader/BufferLoader.php b/src/core/etl/src/Flow/ETL/Loader/BufferLoader.php index 1cc1d53c5..fc86eb94c 100644 --- a/src/core/etl/src/Flow/ETL/Loader/BufferLoader.php +++ b/src/core/etl/src/Flow/ETL/Loader/BufferLoader.php @@ -10,12 +10,15 @@ use Flow\ETL\Rows; /** - * @implements Loader + * @implements Loader}> */ final class BufferLoader implements Closure, Loader, OverridingLoader { private Rows $buffer; + /** + * @param int<1, max> $bufferSize + */ public function __construct(private readonly Loader $overflowLoader, private readonly int $bufferSize) { $this->buffer = new Rows(); diff --git a/src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php b/src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php index 7dc38a1f0..aea050821 100644 --- a/src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php +++ b/src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php @@ -5,7 +5,6 @@ namespace Flow\ETL\Pipeline; use Flow\ETL\DSL\From; -use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Extractor; use Flow\ETL\FlowContext; use Flow\ETL\Loader; @@ -20,14 +19,13 @@ final class ParallelizingPipeline implements Pipeline { private readonly Pipeline $nextPipeline; + /** + * @param int<1, max> $parallel + */ public function __construct( private readonly Pipeline $pipeline, private readonly int $parallel ) { - if ($parallel < 1) { - throw new InvalidArgumentException("Parallel value can't be lower than 1."); - } - $this->nextPipeline = $pipeline->cleanCopy(); } diff --git a/src/core/etl/src/Flow/ETL/Rows.php b/src/core/etl/src/Flow/ETL/Rows.php index 6aa406118..8d912619e 100644 --- a/src/core/etl/src/Flow/ETL/Rows.php +++ b/src/core/etl/src/Flow/ETL/Rows.php @@ -59,26 +59,21 @@ public function __unserialize(array $data) : void public function add(Row ...$rows) : self { return new self( - ...\array_merge($this->rows, $rows) + ...$this->rows, + ...$rows ); } /** - * @return Rows[] + * @param int<1, max> $size + * + * @return \Generator */ - public function chunks(int $size) : array + public function chunks(int $size) : \Generator { - if ($size < 1) { - throw InvalidArgumentException::because('Chunk size must be greater than 0'); - } - - $chunks = []; - foreach (\array_chunk($this->rows, $size) as $chunk) { - $chunks[] = new self(...$chunk); + yield new self(...$chunk); } - - return $chunks; } public function count() : int diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/RowsTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/RowsTest.php index 518c582ba..24b1afbfd 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/RowsTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/RowsTest.php @@ -189,13 +189,6 @@ public function test_building_rows_from_array() : void ); } - public function test_chunks_smaller_than_1() : void - { - $this->expectException(InvalidArgumentException::class); - - (new Rows())->chunks(-1); - } - public function test_chunks_with_less() : void { $rows = new Rows( @@ -208,8 +201,10 @@ public function test_chunks_with_less() : void Row::create(new IntegerEntry('id', 7)), ); - $this->assertCount(1, $rows->chunks(10)); - $this->assertSame([1, 2, 3, 4, 5, 6, 7], $rows->chunks(10)[0]->reduceToArray('id')); + $chunk = \iterator_to_array($rows->chunks(10)); + + $this->assertCount(1, $chunk); + $this->assertSame([1, 2, 3, 4, 5, 6, 7], $chunk[0]->reduceToArray('id')); } public function test_chunks_with_more_than_expected_in_chunk_rows() : void @@ -227,9 +222,11 @@ public function test_chunks_with_more_than_expected_in_chunk_rows() : void Row::create(new IntegerEntry('id', 10)), ); - $this->assertCount(2, $rows->chunks(5)); - $this->assertSame([1, 2, 3, 4, 5], $rows->chunks(5)[0]->reduceToArray('id')); - $this->assertSame([6, 7, 8, 9, 10], $rows->chunks(5)[1]->reduceToArray('id')); + $chunk = \iterator_to_array($rows->chunks(5)); + + $this->assertCount(2, $chunk); + $this->assertSame([1, 2, 3, 4, 5], $chunk[0]->reduceToArray('id')); + $this->assertSame([6, 7, 8, 9, 10], $chunk[1]->reduceToArray('id')); } public function test_drop() : void