diff --git a/src/core/etl/src/Flow/ETL/Config/Cache/CacheConfig.php b/src/core/etl/src/Flow/ETL/Config/Cache/CacheConfig.php index 44dc44a3c..c502fc32d 100644 --- a/src/core/etl/src/Flow/ETL/Config/Cache/CacheConfig.php +++ b/src/core/etl/src/Flow/ETL/Config/Cache/CacheConfig.php @@ -12,12 +12,10 @@ final class CacheConfig public const CACHE_DIR_ENV = 'FLOW_LOCAL_FILESYSTEM_CACHE_DIR'; /** - * @param int<1, max> $cacheBatchSize * @param int<1, max> $externalSortBucketsCount */ public function __construct( public readonly Cache $cache, - public readonly int $cacheBatchSize, public readonly Path $localFilesystemCacheDir, public readonly int $externalSortBucketsCount, ) { diff --git a/src/core/etl/src/Flow/ETL/Config/Cache/CacheConfigBuilder.php b/src/core/etl/src/Flow/ETL/Config/Cache/CacheConfigBuilder.php index a6f1f1537..be9fdb557 100644 --- a/src/core/etl/src/Flow/ETL/Config/Cache/CacheConfigBuilder.php +++ b/src/core/etl/src/Flow/ETL/Config/Cache/CacheConfigBuilder.php @@ -5,6 +5,7 @@ namespace Flow\ETL\Config\Cache; use function Flow\Filesystem\DSL\protocol; +use Flow\ETL\Cache; use Flow\ETL\Cache\{Implementation\FilesystemCache}; use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException}; use Flow\Filesystem\{FilesystemTable, Path}; @@ -12,12 +13,7 @@ final class CacheConfigBuilder { - private ?\Flow\ETL\Cache $cache = null; - - /** - * @var int<1, max> - */ - private int $cacheBatchSize = 100; + private ?Cache $cache = null; /** * @var int<1, max> @@ -46,33 +42,18 @@ public function build(FilesystemTable $fstab, Serializer $serializer) : CacheCon $serializer, cacheDir: Path::realpath($cachePath) ), - cacheBatchSize: $this->cacheBatchSize, localFilesystemCacheDir: Path::realpath($cachePath), externalSortBucketsCount: $this->externalSortBucketsCount ); } - public function cache(\Flow\ETL\Cache $cache) : self + public function cache(Cache $cache) : self { $this->cache = $cache; return $this; } - /** - * @param int<1, max> $cacheBatchSize - */ - public function cacheBatchSize(int $cacheBatchSize) : self - { - if ($cacheBatchSize < 1) { - throw new InvalidArgumentException('Cache batch size must be greater than 0'); - } - - $this->cacheBatchSize = $cacheBatchSize; - - return $this; - } - /** * @param int<1, max> $externalSortBucketsCount */ diff --git a/src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php b/src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php index b9a5385d6..de4051c2d 100644 --- a/src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php +++ b/src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php @@ -7,7 +7,6 @@ use function Flow\Filesystem\DSL\fstab; use Flow\ETL\Config\Cache\CacheConfigBuilder; use Flow\ETL\Config\Sort\SortConfigBuilder; -use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Filesystem\FilesystemStreams; use Flow\ETL\Monitoring\Memory\Unit; use Flow\ETL\PHP\Type\Caster; @@ -84,18 +83,6 @@ public function cache(Cache $cache) : self return $this; } - /** - * @param int<1, max> $cacheBatchSize - * - * @throws InvalidArgumentException - */ - public function cacheBatchSize(int $cacheBatchSize) : self - { - $this->cache->cacheBatchSize($cacheBatchSize); - - return $this; - } - public function dontPutInputIntoRows() : self { $this->putInputIntoRows = false; diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index 864d1a6a7..5dc8ed83d 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -171,6 +171,10 @@ public function batchSize(int $size) : self * Cache type can be set through ConfigBuilder. * By default everything is cached in system tmp dir. * + * Important: cache batch size might significantly improve performance when processing large amount of rows. + * Larger batch size will increase memory consumption but will reduce number of IO operations. + * When not set, the batch size is taken from the last DataFrame::batchSize() call. + * * @lazy * * @param null|string $id @@ -183,8 +187,11 @@ public function cache(?string $id = null, ?int $cacheBatchSize = null) : self throw new InvalidArgumentException('Cache batch size must be greater than 0'); } - $this->batchSize($cacheBatchSize ?? $this->context->config->cache->cacheBatchSize); - $this->pipeline = new LinkedPipeline(new CachingPipeline($this->pipeline, $id)); + if ($cacheBatchSize) { + $this->pipeline = new LinkedPipeline(new CachingPipeline(new BatchingPipeline($this->pipeline, $cacheBatchSize), $id)); + } else { + $this->pipeline = new LinkedPipeline(new CachingPipeline($this->pipeline, $id)); + } return $this; } diff --git a/src/core/etl/src/Flow/ETL/Pipeline/PipelineMessage.php b/src/core/etl/src/Flow/ETL/Pipeline/PipelineMessage.php deleted file mode 100644 index 3fc6ad8d7..000000000 --- a/src/core/etl/src/Flow/ETL/Pipeline/PipelineMessage.php +++ /dev/null @@ -1,9 +0,0 @@ -extractions); self::assertFalse($cache->has('test_etl_cache')); } + + public function test_cache_with_previously_set_batch_size() : void + { + $cache = new InMemoryCache(); + + df(config_builder()->cache($cache)) + ->read( + from_array(\array_map( + fn (int $i) => ['id' => $i], + \range(1, 100) + )) + ) + ->batchSize(20) + ->cache('test') + ->run(); + + /** @var CacheIndex $cacheIndex */ + $cacheIndex = $cache->get('test'); + + self::assertCount(5, $cacheIndex->values()); + + foreach ($cacheIndex->values() as $index => $cacheRowsKey) { + $rows = $cache->get($cacheRowsKey); + self::assertInstanceOf(Rows::class, $rows); + self::assertCount(20, $rows); + } + } + + public function test_cache_without_previously_set_batch_size() : void + { + $cache = new InMemoryCache(); + + df(config_builder()->cache($cache)) + ->read( + from_array(\array_map( + fn (int $i) => ['id' => $i], + \range(1, 100) + )) + ) + ->cache('test') + ->run(); + + /** @var CacheIndex $cacheIndex */ + $cacheIndex = $cache->get('test'); + + self::assertCount(100, $cacheIndex->values()); + + foreach ($cacheIndex->values() as $index => $cacheRowsKey) { + $rows = $cache->get($cacheRowsKey); + self::assertInstanceOf(Rows::class, $rows); + self::assertCount(1, $rows); + } + } }