Skip to content

Commit

Permalink
Improve performance of merging rows into given batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Apr 12, 2024
1 parent 7fff268 commit 662943c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
39 changes: 29 additions & 10 deletions src/core/etl/src/Flow/ETL/Extractor/ChunkExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,53 @@ public function __construct(

public function extract(FlowContext $context) : \Generator
{
$chunk = new Rows();
$chunk = [];
$chunkSize = 0;

foreach ($this->extractor->extract($context) as $rows) {
foreach ($rows->chunks($this->chunkSize) as $rowsChunk) {
$chunk = $chunk->merge($rowsChunk);
$chunk[] = $rowsChunk->all();
$chunkSize += $rowsChunk->count();

if ($chunk->count() === $this->chunkSize) {
$signal = yield $chunk;
if ($chunkSize === $this->chunkSize) {
$signal = yield new Rows(
...\array_merge(
...$chunk
)
);

if ($signal === Signal::STOP) {
return;
}
$chunk = new Rows();
$chunkSize = 0;
$chunk = [];
}

if ($chunk->count() > $this->chunkSize) {
$signal = yield $chunk->dropRight($chunk->count() - $this->chunkSize);
if ($chunkSize > $this->chunkSize) {
$allRows = new Rows(
...\array_merge(
...$chunk
)
);

$signal = yield $allRows->dropRight($allRows->count() - $this->chunkSize);

if ($signal === Signal::STOP) {
return;
}
$chunk = $chunk->takeRight($chunk->count() - $this->chunkSize);
$leftover = $allRows->takeRight($allRows->count() - $this->chunkSize);
$chunk = [$leftover->all()];
$chunkSize = $leftover->count();
}
}
}

if ($chunk->count()) {
yield $chunk;
if ($chunkSize) {
yield new Rows(
...\array_merge(
...$chunk
)
);
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/core/etl/src/Flow/ETL/Rows.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ public function add(Row ...$rows) : self
);
}

/**
* Returns the rows held by this instance as a plain PHP array.
*
* @return array<Row>
*/
public function all() : array
{
return $this->rows;
}

/**
* @param int<1, max> $size
*
Expand Down

0 comments on commit 662943c

Please sign in to comment.