From a0234b3f7c91d42fc433a1ac7787ea22cc6e2470 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Sun, 4 Aug 2024 09:40:17 +0200 Subject: [PATCH] Use basenamePrefix instead of suffix when creating temporary file in overwrite save mode (#1165) --- .../JSON/Tests/Integration/JsonTest.php | 29 +++++++++++++++- .../Flow/ETL/Filesystem/FilesystemStreams.php | 8 ++--- .../NotPartitioned/OverwriteModeTest.php | 2 +- .../filesystem/src/Flow/Filesystem/Path.php | 8 +++++ .../Flow/Filesystem/Tests/Unit/PathTest.php | 33 +++++++++++++++++++ 5 files changed, 74 insertions(+), 6 deletions(-) diff --git a/src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Integration/JsonTest.php b/src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Integration/JsonTest.php index e5cd62741..b8f7e818a 100644 --- a/src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Integration/JsonTest.php +++ b/src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Integration/JsonTest.php @@ -6,7 +6,7 @@ use function Flow\ETL\Adapter\JSON\from_json; use function Flow\ETL\Adapter\Json\to_json; -use function Flow\ETL\DSL\df; +use function Flow\ETL\DSL\{df, overwrite}; use function Flow\Filesystem\DSL\path; use Flow\ETL\Adapter\JSON\JsonLoader; use Flow\ETL\Tests\Double\FakeExtractor; @@ -53,4 +53,31 @@ public function test_json_loader_loading_empty_string() : void \unlink($path); } } + + public function test_json_loader_overwrite_mode() : void + { + + df() + ->read(new FakeExtractor(100)) + ->write(to_json($path = __DIR__ . '/var/test_json_loader.json')) + ->run(); + + df() + ->read(new FakeExtractor(100)) + ->mode(overwrite()) + ->write(to_json($path = __DIR__ . '/var/test_json_loader.json')) + ->run(); + + $content = \file_get_contents($path); + self::stringEndsWith(']', $content); + + self::assertEquals( + 100, + df()->read(from_json($path))->count() + ); + + if (\file_exists($path)) { + \unlink($path); + } + } } diff --git a/src/core/etl/src/Flow/ETL/Filesystem/FilesystemStreams.php b/src/core/etl/src/Flow/ETL/Filesystem/FilesystemStreams.php index f5de49c0e..b53078588 100644 --- a/src/core/etl/src/Flow/ETL/Filesystem/FilesystemStreams.php +++ b/src/core/etl/src/Flow/ETL/Filesystem/FilesystemStreams.php @@ -13,7 +13,7 @@ */ final class FilesystemStreams implements \Countable, \IteratorAggregate { - public const FLOW_TMP_SUFFIX = '._flow_tmp'; + public const FLOW_TMP_FILE_PREFIX = '._flow_php_tmp.'; private SaveMode $saveMode; @@ -47,7 +47,7 @@ public function closeWriters(Path $path) : void $partitionFilesPatter = new Path($fileStream->path()->parentDirectory()->path() . '/*', $fileStream->path()->options()); foreach ($fs->list($partitionFilesPatter) as $partitionFile) { - if (\str_ends_with($partitionFile->path->path(), self::FLOW_TMP_SUFFIX)) { + if (\str_contains($partitionFile->path->path(), self::FLOW_TMP_FILE_PREFIX)) { continue; } @@ -58,7 +58,7 @@ public function closeWriters(Path $path) : void $fs->mv( $fileStream->path(), new Path( - \str_replace(self::FLOW_TMP_SUFFIX, '', $fileStream->path()->uri()), + \str_replace(self::FLOW_TMP_FILE_PREFIX, '', $fileStream->path()->uri()), $fileStream->path()->options() ) ); @@ -199,7 +199,7 @@ public function writeTo(Path $path, array $partitions = []) : DestinationStream } if ($this->saveMode === SaveMode::Overwrite) { - $outputPath = new Path($outputPath->uri() . self::FLOW_TMP_SUFFIX, $outputPath->options()); + $outputPath = $outputPath->basenamePrefix(self::FLOW_TMP_FILE_PREFIX); } if ($this->saveMode === SaveMode::ExceptionIfExists) { diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/FilesystemStreams/NotPartitioned/OverwriteModeTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/FilesystemStreams/NotPartitioned/OverwriteModeTest.php index 00d309fdb..d53a68c03 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/FilesystemStreams/NotPartitioned/OverwriteModeTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/FilesystemStreams/NotPartitioned/OverwriteModeTest.php @@ -27,7 +27,7 @@ public function test_open_stream_for_existing_file() : void ]); $fileStream = $streams->writeTo($path = $this->getPath(__FUNCTION__ . '/existing-file.txt')); - self::assertStringEndsWith(FilesystemStreams::FLOW_TMP_SUFFIX, $fileStream->path()->path()); + self::assertStringContainsString(FilesystemStreams::FLOW_TMP_FILE_PREFIX, $fileStream->path()->path()); $fileStream->append('some other content'); self::assertSame('some content', \file_get_contents($path->path())); diff --git a/src/lib/filesystem/src/Flow/Filesystem/Path.php b/src/lib/filesystem/src/Flow/Filesystem/Path.php index e338e06dd..47c76ed3c 100644 --- a/src/lib/filesystem/src/Flow/Filesystem/Path.php +++ b/src/lib/filesystem/src/Flow/Filesystem/Path.php @@ -150,6 +150,14 @@ public function basename() : string return $this->basename; } + public function basenamePrefix(string $prefix) : self + { + return new self( + $this->parentDirectory()->uri() . DIRECTORY_SEPARATOR . $prefix . $this->basename(), + $this->options() + ); + } + public function context() : ResourceContext { return ResourceContext::from($this); diff --git a/src/lib/filesystem/tests/Flow/Filesystem/Tests/Unit/PathTest.php b/src/lib/filesystem/tests/Flow/Filesystem/Tests/Unit/PathTest.php index d15808edf..ca70a5f08 100644 --- a/src/lib/filesystem/tests/Flow/Filesystem/Tests/Unit/PathTest.php +++ b/src/lib/filesystem/tests/Flow/Filesystem/Tests/Unit/PathTest.php @@ -125,6 +125,39 @@ public function test_extension() : void self::assertFalse((new Path(__DIR__))->extension()); } + public function test_file_prefix() : void + { + $path = new Path('flow-file://var/dir/file.csv', []); + + self::assertSame( + 'flow-file://var/dir/._flow_tmp.file.csv', + $path->basenamePrefix('._flow_tmp.')->uri() + ); + self::assertSame('csv', $path->extension()); + } + + public function test_file_prefix_on_directory() : void + { + $path = new Path('flow-file://var/dir/', []); + + self::assertSame( + 'flow-file://var/._flow_tmp.dir', + $path->basenamePrefix('._flow_tmp.')->uri() + ); + self::assertFalse($path->extension()); + } + + public function test_file_prefix_on_root_directory() : void + { + $path = new Path('flow-file://', []); + + self::assertSame( + 'flow-file://._flow_tmp.', + $path->basenamePrefix('._flow_tmp.')->uri() + ); + self::assertFalse($path->extension()); + } + #[DataProvider('paths_with_static_parts')] public function test_finding_static_part_of_the_path(string $staticPart, string $uri) : void {