Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplemented sorting algorithms #1146

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
## [Unreleased] - 2024-07-22
## [Unreleased] - 2024-07-26

### Added
- [#1144](https://github.com/flow-php/flow/pull/1144) - **Filesystem - added getSystemTmpDir() method** - [@norberttech](https://github.com/norberttech)
- [#1144](https://github.com/flow-php/flow/pull/1144) - **Path - added suffix(string $suffix): Path method** - [@norberttech](https://github.com/norberttech)
- [#1144](https://github.com/flow-php/flow/pull/1144) - **Path - removed setExtension()** - [@norberttech](https://github.com/norberttech)
- [#1144](https://github.com/flow-php/flow/pull/1144) - **Path - removed startsWith()** - [@norberttech](https://github.com/norberttech)
- [#1131](https://github.com/flow-php/flow/pull/1131) - **Dependabot for github actions dependencies** - [@norberttech](https://github.com/norberttech)
- [#1130](https://github.com/flow-php/flow/pull/1130) - **typed functions to array-dot library** - [@norberttech](https://github.com/norberttech)
- [#1127](https://github.com/flow-php/flow/pull/1127) - **Explain the “ETL” acronym** - [@alexislefebvre](https://github.com/alexislefebvre)
Expand Down
2 changes: 1 addition & 1 deletion phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ parameters:

excludePaths:
- src/core/etl/src/Flow/ETL/Formatter/ASCII/ASCIITable.php
- src/core/etl/src/Flow/ETL/ExternalSort/RowsMinHeap.php
- src/core/etl/src/Flow/ETL/Sort/ExternalSort/RowsMinHeap.php
- src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/SearchResults.php
- src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/SearchParams.php
- src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/PointInTime.php
Expand Down
2 changes: 1 addition & 1 deletion psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<directory name="src/adapter/etl-adapter-avro/src" />

<file name="src/core/etl/src/Flow/ETL/Formatter/ASCII/ASCIITable.php" />
<file name="src/core/etl/src/Flow/ETL/ExternalSort/RowsMinHeap.php" />
<file name="src/core/etl/src/Flow/ETL/Sort/ExternalSort/RowsMinHeap.php" />

<file name="src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/SearchResults.php"/>
<file name="src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/PointInTime.php"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,25 @@
use Flow\Azure\SDK\BlobServiceInterface;
use Flow\Filesystem\Path\Filter;
use Flow\Filesystem\Path\Filter\KeepAll;
use Flow\Filesystem\{DestinationStream, FileStatus, Filesystem, Path, Protocol, SourceStream};
use Flow\Filesystem\{DestinationStream,
Exception\RuntimeException,
FileStatus,
Filesystem,
Path,
Protocol,
SourceStream};

final class AzureBlobFilesystem implements Filesystem
{
/**
 * Keeps the Azure blob service client and the filesystem options as
 * private, read-only properties of this filesystem.
 */
public function __construct(
    private readonly BlobServiceInterface $blobService,
    private readonly Options $options,
) {
}

/**
 * Returns the temporary directory path taken from the Options instance
 * this filesystem was constructed with.
 */
public function getSystemTmpDir() : Path
{
return $this->options->tmpDir();
}

public function list(Path $path, Filter $pathFilter = new KeepAll()) : \Generator
{
$this->protocol()->validateScheme($path);
Expand Down Expand Up @@ -68,6 +79,10 @@ public function readFrom(Path $path) : SourceStream

public function rm(Path $path) : bool
{
if ($path->isEqual($this->getSystemTmpDir())) {
return false;
}

$this->protocol()->validateScheme($path);

if ($path->isPattern()) {
Expand Down Expand Up @@ -111,6 +126,10 @@ public function rm(Path $path) : bool

public function status(Path $path) : ?FileStatus
{
if ($path->isEqual($this->getSystemTmpDir())) {
return new FileStatus($path, false);
}

$this->protocol()->validateScheme($path);

if (!$path->isPattern()) {
Expand Down Expand Up @@ -147,6 +166,10 @@ public function status(Path $path) : ?FileStatus

public function writeTo(Path $path) : DestinationStream
{
if ($path->isEqual($this->getSystemTmpDir())) {
throw new RuntimeException('Cannot write to system tmp directory');
}

$this->protocol()->validateScheme($path);

return AzureBlobDestinationStream::openBlank(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow\Filesystem\Bridge\Azure;

use Flow\Azure\SDK\BlobService\ListBlobs\{ListBlobOptions, OptionInclude, OptionShowOnly};
use Flow\Filesystem\Path;
use Flow\Filesystem\Stream\Block\NativeLocalFileBlocksFactory;
use Flow\Filesystem\Stream\BlockFactory;

Expand All @@ -23,9 +24,12 @@ final class Options

private ?OptionShowOnly $listBlobShowOnly = null;

private Path $tmpDir;

/**
 * Initialises the defaults: the `azure-blob://_$azure_flow_tmp$/` temporary
 * directory and a NativeLocalFileBlocksFactory as the block factory.
 */
public function __construct()
{
    $this->tmpDir = new Path('azure-blob://_$azure_flow_tmp$/');
    $this->blockFactory = new NativeLocalFileBlocksFactory();
}

public function blockFactory() : BlockFactory
Expand Down Expand Up @@ -57,6 +61,11 @@ public function listBlobOptions() : ListBlobOptions
return $listBlobOptions;
}

/**
 * Returns the temporary directory path currently held by these options.
 */
public function tmpDir() : Path
{
return $this->tmpDir;
}

public function withBlockFactory(BlockFactory $blockFactory) : self
{
$this->blockFactory = $blockFactory;
Expand Down Expand Up @@ -91,4 +100,11 @@ public function withListBlobShowOnly(OptionShowOnly $listBlobShowOnly) : self

return $this;
}

/**
 * Replaces the temporary directory path held by these options.
 *
 * The change is applied to this instance, which is then returned so that
 * calls can be chained.
 *
 * @param Path $tmpDir path to use as the temporary directory
 */
public function withTmpDir(Path $tmpDir) : self
{
$this->tmpDir = $tmpDir;

return $this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\Filesystem\Bridge\Azure\Tests\Integration;

use function Flow\Filesystem\Bridge\Azure\DSL\azure_filesystem;
use function Flow\Filesystem\Bridge\Azure\DSL\{azure_filesystem, azure_filesystem_options};
use Flow\Filesystem\Path;

final class AzureBlobFilesystemTest extends AzureBlobServiceTestCase
Expand Down Expand Up @@ -137,6 +137,64 @@ public function test_removing_folder_pattern() : void
self::assertNull($fs->status(new Path('azure-blob://nested/orders/orders_01.csv')));
}

/**
 * rm() must report false when asked to remove the system tmp directory itself.
 */
public function test_rm_tmp_dir() : void
{
    $filesystem = azure_filesystem($this->blobService('flow-php'));
    $tmpDir = $filesystem->getSystemTmpDir();

    $removed = $filesystem->rm($tmpDir);

    self::assertFalse($removed);
}

/**
 * Without custom options the system tmp directory is `azure-blob://_$azure_flow_tmp$/`.
 */
public function test_tmp_dir() : void
{
    $filesystem = azure_filesystem($this->blobService('flow-php'));
    $tmpDirUri = $filesystem->getSystemTmpDir()->uri();

    self::assertSame('azure-blob://_$azure_flow_tmp$/', $tmpDirUri);
}

/**
 * status() of the system tmp directory must describe a directory.
 */
public function test_tmp_dir_status() : void
{
    $filesystem = azure_filesystem($this->blobService('flow-php'));
    $tmpDirStatus = $filesystem->status($filesystem->getSystemTmpDir());

    self::assertTrue($tmpDirStatus->isDirectory());
}

/**
 * A file written under a custom tmp directory (set through the options) must
 * be visible and readable at the corresponding `azure-blob://custom-tmp-dir/` path.
 */
public function test_write_to_custom_tmp_dir() : void
{
    $fs = azure_filesystem(
        $this->blobService('flow-php'),
        azure_filesystem_options()->withTmpDir(new Path('azure-blob://custom-tmp-dir/'))
    );

    $stream = $fs->writeTo($fs->getSystemTmpDir()->suffix('file.txt'));
    $stream->append('Hello, World!');
    $stream->close();

    try {
        // status() is nullable: fail with a clear assertion instead of a null dereference.
        $status = $fs->status(new Path('azure-blob://custom-tmp-dir/file.txt'));
        self::assertNotNull($status);
        self::assertTrue($status->isFile());
        self::assertSame('Hello, World!', $fs->readFrom(new Path('azure-blob://custom-tmp-dir/file.txt'))->content());
    } finally {
        // Clean up even when an assertion fails so the blob does not leak into later runs.
        $fs->rm($fs->getSystemTmpDir()->suffix('file.txt'));
    }
}

/**
 * A file written under the default system tmp directory must be visible
 * as a file and return the written content when read back.
 */
public function test_write_to_tmp_dir() : void
{
    $fs = azure_filesystem($this->blobService('flow-php'));

    $stream = $fs->writeTo($fs->getSystemTmpDir()->suffix('file.txt'));
    $stream->append('Hello, World!');
    $stream->close();

    try {
        // status() is nullable: fail with a clear assertion instead of a null dereference.
        $status = $fs->status($fs->getSystemTmpDir()->suffix('file.txt'));
        self::assertNotNull($status);
        self::assertTrue($status->isFile());
        self::assertSame('Hello, World!', $fs->readFrom($fs->getSystemTmpDir()->suffix('file.txt'))->content());
    } finally {
        // Clean up even when an assertion fails so the blob does not leak into later runs.
        $fs->rm($fs->getSystemTmpDir()->suffix('file.txt'));
    }
}

/**
 * Opening the system tmp directory itself for writing must be rejected
 * with the filesystem RuntimeException.
 */
public function test_write_to_tmp_dir_as_to_a_file() : void
{
    $fs = azure_filesystem($this->blobService('flow-php'));

    // Pin the exception type too: a message-only expectation would accept any throwable with this text.
    $this->expectException(\Flow\Filesystem\Exception\RuntimeException::class);
    $this->expectExceptionMessage('Cannot write to system tmp directory');

    $fs->writeTo($fs->getSystemTmpDir());
}

public function test_writing_to_azure_blob_storage() : void
{
$fs = azure_filesystem($this->blobService('flow-php'));
Expand Down
19 changes: 10 additions & 9 deletions src/core/etl/src/Flow/ETL/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\ETL\Monitoring\Memory\Unit;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Pipeline\Optimizer;
use Flow\ETL\Row\EntryFactory;
use Flow\Filesystem\FilesystemTable;
use Flow\Filesystem\{FilesystemTable};
use Flow\Serializer\Serializer;

/**
Expand All @@ -20,7 +21,7 @@ final class Config
{
public const CACHE_DIR_ENV = 'FLOW_LOCAL_FILESYSTEM_CACHE_DIR';

public const EXTERNAL_SORT_MAX_MEMORY_ENV = 'FLOW_EXTERNAL_SORT_MAX_MEMORY';
public const SORT_MAX_MEMORY_ENV = 'FLOW_SORT_MAX_MEMORY';

/**
* @param int<1, max> $cacheBatchSize
Expand All @@ -29,14 +30,14 @@ public function __construct(
private readonly string $id,
private readonly Serializer $serializer,
private readonly Cache $cache,
private readonly ExternalSort $externalSort,
private readonly Unit $sortMemoryLimit,
private readonly FilesystemTable $filesystemTable,
private readonly FilesystemStreams $filesystemStreams,
private readonly Optimizer $optimizer,
private readonly Caster $caster,
private readonly bool $putInputIntoRows,
private readonly EntryFactory $entryFactory,
private readonly int $cacheBatchSize
private readonly int $cacheBatchSize,
) {
if ($this->cacheBatchSize < 1) {
throw new InvalidArgumentException('Cache batch size must be greater than 0');
Expand Down Expand Up @@ -76,11 +77,6 @@ public function entryFactory() : EntryFactory
return $this->entryFactory;
}

public function externalSort() : ExternalSort
{
return $this->externalSort;
}

public function filesystemStreams() : FilesystemStreams
{
return $this->filesystemStreams;
Expand Down Expand Up @@ -110,4 +106,9 @@ public function shouldPutInputIntoRows() : bool
{
return $this->putInputIntoRows;
}

/**
 * Returns the sort memory limit this config was constructed with.
 */
public function sortMemoryLimit() : Unit
{
return $this->sortMemoryLimit;
}
}
43 changes: 26 additions & 17 deletions src/core/etl/src/Flow/ETL/ConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use function Flow\Filesystem\DSL\fstab;
use Flow\ETL\Cache\LocalFilesystemCache;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\ExternalSort\MemorySort;
use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\ETL\Monitoring\Memory\Unit;
use Flow\ETL\PHP\Type\Caster;
Expand All @@ -18,6 +17,8 @@

final class ConfigBuilder
{
public const DEFAULT_SORT_MEMORY_PERCENTAGE = 70;

private ?Cache $cache;

/**
Expand All @@ -27,8 +28,6 @@ final class ConfigBuilder

private ?Caster $caster;

private ?ExternalSort $externalSort;

private ?FilesystemTable $fstab;

private ?string $id;
Expand All @@ -39,12 +38,14 @@ final class ConfigBuilder

private ?Serializer $serializer;

private ?Unit $sortMemoryLimit;

public function __construct()
{
$this->id = null;
$this->serializer = null;
$this->cache = null;
$this->externalSort = null;
$this->sortMemoryLimit = null;
$this->fstab = null;
$this->putInputIntoRows = false;
$this->optimizer = null;
Expand All @@ -71,11 +72,19 @@ public function build() : Config
$this->cache = new LocalFilesystemCache($cachePath, $this->serializer);
}

$this->externalSort ??= new MemorySort(
$this->id,
$this->cache,
\is_string(\getenv(Config::EXTERNAL_SORT_MAX_MEMORY_ENV)) ? Unit::fromString(\getenv(Config::EXTERNAL_SORT_MAX_MEMORY_ENV)) : Unit::fromMb(200)
);
if ($this->sortMemoryLimit === null) {
if (\is_string(\getenv(Config::SORT_MAX_MEMORY_ENV))) {
$this->sortMemoryLimit = Unit::fromString(\getenv(Config::SORT_MAX_MEMORY_ENV));
} else {
$memoryLimit = \ini_get('memory_limit');

if ($memoryLimit === '-1') {
$this->sortMemoryLimit = Unit::fromBytes(\PHP_INT_MAX);
} else {
$this->sortMemoryLimit = Unit::fromString($memoryLimit)->percentage(self::DEFAULT_SORT_MEMORY_PERCENTAGE);
}
}
}

$this->optimizer ??= new Optimizer(
new Optimizer\LimitOptimization(),
Expand All @@ -88,7 +97,7 @@ public function build() : Config
$this->id,
$this->serializer,
$this->cache,
$this->externalSort,
$this->sortMemoryLimit,
$this->fstab(),
new FilesystemStreams($this->fstab()),
$this->optimizer,
Expand Down Expand Up @@ -127,13 +136,6 @@ public function dontPutInputIntoRows() : self
return $this;
}

public function externalSort(ExternalSort $externalSort) : self
{
$this->externalSort = $externalSort;

return $this;
}

public function id(string $id) : self
{
$this->id = $id;
Expand Down Expand Up @@ -178,6 +180,13 @@ public function serializer(Serializer $serializer) : self
return $this;
}

/**
 * Sets an explicit sort memory limit on this builder.
 *
 * Returns the same builder instance so calls can be chained.
 *
 * @param Unit $unit memory limit to store as the sort memory limit
 */
public function sortMemoryLimit(Unit $unit) : self
{
$this->sortMemoryLimit = $unit;

return $this;
}

public function unmount(Filesystem $filesystem) : self
{
$this->fstab()->unmount($filesystem);
Expand Down
Loading
Loading