Skip to content

Commit

Permalink
Adjust Rows::chunks() to work on generators instead of arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
stloyd committed Oct 27, 2023
1 parent 212f07a commit 6defcaa
Show file tree
Hide file tree
Showing 16 changed files with 56 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* @implements Loader<array{
* table_name: string,
* chunk_size: int,
* chunk_size: int<1, max>,
* connection_params: array<string, mixed>,
* operation: string,
* operation_options: array{
Expand All @@ -35,8 +35,7 @@ final class DbalLoader implements Loader
private string $operation;

/**
* @param string $tableName
* @param int $chunkSize
* @param int<1, max> $chunkSize
* @param array<string, mixed> $connectionParams
* @param array{
* skip_conflicts?: boolean,
Expand All @@ -45,7 +44,6 @@ final class DbalLoader implements Loader
* update_columns?: array<string>,
* primary_key_columns?: array<string>
* } $operationOptions
* @param string $operation
*
* @throws InvalidArgumentException
*/
Expand All @@ -66,17 +64,14 @@ public function __construct(
* Since Connection::getParams() is marked as an internal method, please
* use this constructor with caution.
*
* @param Connection $connection
* @param string $tableName
* @param int $chunkSize
* @param int<1, max> $chunkSize
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
* conflict_columns?: array<string>,
* update_columns?: array<string>,
* primary_key_columns?: array<string>
* } $operationOptions
* @param string $operation
*
* @throws InvalidArgumentException
*/
Expand Down
10 changes: 2 additions & 8 deletions src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ final public static function from_limit_offset(
* @param int $page_size
* @param null|int $maximum
*
* @throws InvalidArgumentException
*
* @return Extractor
*/
final public static function from_limit_offset_qb(
Expand Down Expand Up @@ -135,8 +133,7 @@ final public static function from_query(

/**
* @param array<string, mixed>|Connection $connection
* @param string $table
* @param int $chunk_size
* @param int<1, max> $chunk_size
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
Expand All @@ -146,8 +143,6 @@ final public static function from_query(
* } $options
*
* @throws InvalidArgumentException
*
* @return Loader
*/
final public static function to_table_insert(
array|Connection $connection,
Expand All @@ -162,8 +157,7 @@ final public static function to_table_insert(

/**
* @param array<string, mixed>|Connection $connection
* @param string $table
* @param int $chunk_size
* @param int<1, max> $chunk_size
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* elasticMetaHeader?: boolean,
* includePortInHostHeader?: boolean
* },
* chunk_size: int,
* chunk_size: int<1, max>,
* index: string,
* id_factory: IdFactory,
* parameters: array<mixed>,
Expand All @@ -41,6 +41,7 @@ final class ElasticsearchLoader implements Loader

/**
* @param array{hosts?: array<string>, connectionParams?: array<mixed>, retries?: int, sniffOnStart?: boolean, sslCert?: array<string>, sslKey?: array<string>, sslVerification?: (boolean|string), elasticMetaHeader?: boolean, includePortInHostHeader?: boolean} $config
* @param int<1, max> $chunkSize
* @param array<mixed> $parameters
*/
public function __construct(
Expand All @@ -66,6 +67,7 @@ public function __construct(
* elasticMetaHeader?: boolean,
* includePortInHostHeader?: boolean
* } $clientConfig
* @param int<1, max> $chunkSize
* @param array<mixed> $parameters
*/
public static function update(array $clientConfig, int $chunkSize, string $index, IdFactory $idFactory, array $parameters = []) : self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Elasticsearch
* elasticMetaHeader?: boolean,
* includePortInHostHeader?: boolean
* } $config
* @param int $chunk_size
* @param int<1, max> $chunk_size
* @param string $index
* @param IdFactory $id_factory
* @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html
Expand Down Expand Up @@ -60,7 +60,7 @@ final public static function bulk_index(
* elasticMetaHeader?: boolean,
* includePortInHostHeader?: boolean
* } $config
* @param int $chunk_size
* @param int<1, max> $chunk_size
* @param string $index
* @param IdFactory $id_factory
* @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html
Expand Down
6 changes: 6 additions & 0 deletions src/core/etl/src/Flow/ETL/DSL/From.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ final public static function array(array $array, int $batch_size = 100) : Extrac
return new MemoryExtractor(new ArrayMemory($array), $batch_size);
}

/**
* @param int<1, max> $max_row_size
*/
final public static function buffer(Extractor $extractor, int $max_row_size) : Extractor
{
return new Extractor\BufferExtractor($extractor, $max_row_size);
Expand All @@ -50,6 +53,9 @@ final public static function chain(Extractor ...$extractors) : Extractor
return new Extractor\ChainExtractor(...$extractors);
}

/**
* @param int<1, max> $chunk_size
*/
final public static function chunks_from(Extractor $extractor, int $chunk_size) : Extractor
{
return new Extractor\ChunkExtractor($extractor, $chunk_size);
Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/DSL/To.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
class To
{
/**
* @param int<1, max> $bufferSize
*/
final public static function buffer(Loader $overflowLoader, int $bufferSize) : Loader
{
return new Loader\BufferLoader($overflowLoader, $bufferSize);
Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public function onError(ErrorHandler $handler) : self
*
* @lazy
*
* @throws InvalidArgumentException
* @param int<1, max> $chunks
*/
public function parallelize(int $chunks) : self
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/ExternalSort/BufferCache.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ final class BufferCache
*/
private array $buffers = [];

/**
* @param int<1, max> $bufferSize
*/
public function __construct(
private readonly Cache $overflowCache,
private readonly int $bufferSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function sortBy(EntryReference ...$refs) : Extractor
{
/** @var array<string, \Generator<Rows>> $cachedPartsArray */
$cachedPartsArray = [];
$maxRowsSize = 0;
$maxRowsSize = 1;

/** @var int $i */
foreach ($this->cache->read($this->id) as $i => $rows) {
Expand Down
6 changes: 3 additions & 3 deletions src/core/etl/src/Flow/ETL/ExternalSort/MemorySort.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function __construct(
private readonly Cache $cache,
private Unit $maximumMemory
) {
$this->configuration = new Configuration($safetyBufferPercentage = 10);
$this->configuration = new Configuration(10);

if ($this->configuration->isLessThan($maximumMemory) && !$this->configuration->isInfinite()) {
/**
Expand All @@ -48,7 +48,7 @@ public function sortBy(EntryReference ...$refs) : Extractor
$memoryConsumption = new Consumption();

$mergedRows = new Rows();
$maxSize = 0;
$maxSize = 1;

foreach ($this->cache->read($this->cacheId) as $rows) {
$maxSize = \max($rows->count(), $maxSize);
Expand All @@ -64,6 +64,6 @@ public function sortBy(EntryReference ...$refs) : Extractor

$this->cache->clear($this->cacheId);

return new Extractor\ProcessExtractor(...$mergedRows->sortBy(...$refs)->chunks($maxSize));
return new Extractor\ProcessExtractor(...\iterator_to_array($mergedRows->sortBy(...$refs)->chunks($maxSize)));
}
}
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/Extractor/BufferExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

final class BufferExtractor implements Extractor, OverridingExtractor
{
/**
* @param int<1, max> $maxRowsSize
*/
public function __construct(
private readonly Extractor $extractor,
private readonly int $maxRowsSize
Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/Extractor/ChunkExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

final class ChunkExtractor implements Extractor, OverridingExtractor
{
/**
* @param int<1, max> $chunkSize
*/
public function __construct(
private readonly Extractor $extractor,
private readonly int $chunkSize
Expand Down
5 changes: 4 additions & 1 deletion src/core/etl/src/Flow/ETL/Loader/BufferLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
use Flow\ETL\Rows;

/**
* @implements Loader<array{overflow_loader: Loader, buffer_size: int}>
* @implements Loader<array{overflow_loader: Loader, buffer_size: int<1, max>}>
*/
final class BufferLoader implements Closure, Loader, OverridingLoader
{
private Rows $buffer;

/**
* @param int<1, max> $bufferSize
*/
public function __construct(private readonly Loader $overflowLoader, private readonly int $bufferSize)
{
$this->buffer = new Rows();
Expand Down
8 changes: 3 additions & 5 deletions src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Flow\ETL\Pipeline;

use Flow\ETL\DSL\From;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
Expand All @@ -20,14 +19,13 @@ final class ParallelizingPipeline implements Pipeline
{
private readonly Pipeline $nextPipeline;

/**
* @param int<1, max> $parallel
*/
public function __construct(
private readonly Pipeline $pipeline,
private readonly int $parallel
) {
if ($parallel < 1) {
throw new InvalidArgumentException("Parallel value can't be lower than 1.");
}

$this->nextPipeline = $pipeline->cleanCopy();
}

Expand Down
19 changes: 7 additions & 12 deletions src/core/etl/src/Flow/ETL/Rows.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,21 @@ public function __unserialize(array $data) : void
public function add(Row ...$rows) : self
{
return new self(
...\array_merge($this->rows, $rows)
...$this->rows,
...$rows
);
}

/**
* @return Rows[]
* @param int<1, max> $size
*
* @return \Generator<Rows>
*/
public function chunks(int $size) : array
public function chunks(int $size) : \Generator
{
if ($size < 1) {
throw InvalidArgumentException::because('Chunk size must be greater than 0');
}

$chunks = [];

foreach (\array_chunk($this->rows, $size) as $chunk) {
$chunks[] = new self(...$chunk);
yield new self(...$chunk);
}

return $chunks;
}

public function count() : int
Expand Down
21 changes: 9 additions & 12 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/RowsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,6 @@ public function test_building_rows_from_array() : void
);
}

public function test_chunks_smaller_than_1() : void
{
$this->expectException(InvalidArgumentException::class);

(new Rows())->chunks(-1);
}

public function test_chunks_with_less() : void
{
$rows = new Rows(
Expand All @@ -208,8 +201,10 @@ public function test_chunks_with_less() : void
Row::create(new IntegerEntry('id', 7)),
);

$this->assertCount(1, $rows->chunks(10));
$this->assertSame([1, 2, 3, 4, 5, 6, 7], $rows->chunks(10)[0]->reduceToArray('id'));
$chunk = \iterator_to_array($rows->chunks(10));

$this->assertCount(1, $chunk);
$this->assertSame([1, 2, 3, 4, 5, 6, 7], $chunk[0]->reduceToArray('id'));
}

public function test_chunks_with_more_than_expected_in_chunk_rows() : void
Expand All @@ -227,9 +222,11 @@ public function test_chunks_with_more_than_expected_in_chunk_rows() : void
Row::create(new IntegerEntry('id', 10)),
);

$this->assertCount(2, $rows->chunks(5));
$this->assertSame([1, 2, 3, 4, 5], $rows->chunks(5)[0]->reduceToArray('id'));
$this->assertSame([6, 7, 8, 9, 10], $rows->chunks(5)[1]->reduceToArray('id'));
$chunk = \iterator_to_array($rows->chunks(5));

$this->assertCount(2, $chunk);
$this->assertSame([1, 2, 3, 4, 5], $chunk[0]->reduceToArray('id'));
$this->assertSame([6, 7, 8, 9, 10], $chunk[1]->reduceToArray('id'));
}

public function test_drop() : void
Expand Down

0 comments on commit 6defcaa

Please sign in to comment.