From 8e51014a6cd57af695dba7df75547c6b98c94acc Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Wed, 14 Feb 2024 21:49:01 +0100 Subject: [PATCH] Fixed writing parquet to remote locations (#989) --- .../Adapter/Filesystem/FlysystemWrapper.php | 38 +++++++++++++++++++ .../src/Flow/ETL/Filesystem/LocalBuffer.php | 4 ++ .../ETL/Filesystem/Stream/StreamWrapper.php | 4 ++ .../Filesystem/Stream/VoidStreamWrapper.php | 10 +++++ .../src/Flow/ETL/Filesystem/TmpfileBuffer.php | 10 +++++ src/lib/parquet/src/Flow/Parquet/Writer.php | 5 ++- 6 files changed, 70 insertions(+), 1 deletion(-) diff --git a/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/FlysystemWrapper.php b/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/FlysystemWrapper.php index b5e003e29..13c98386f 100644 --- a/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/FlysystemWrapper.php +++ b/src/adapter/etl-adapter-filesystem/src/Flow/ETL/Adapter/Filesystem/FlysystemWrapper.php @@ -28,6 +28,8 @@ abstract class FlysystemWrapper implements StreamWrapper */ protected $stream; + protected ?array $streamMedata = null; + /** * @var null|array{path?: string, host?: string} */ @@ -53,6 +55,10 @@ public function stream_close() : void public function stream_eof() : bool { + if ($this->stream === null) { + return false; + } + $this->openRead(); /** @@ -97,6 +103,10 @@ public function stream_open(string $path, string $mode, int $options, ?string &$ default => null }; + if ($this->stream) { + $this->streamMedata = \stream_get_meta_data($this->stream); + } + return true; } @@ -112,6 +122,25 @@ public function stream_read(int $count) : string|false return \fread($this->stream, $count); } + public function stream_seek(int $offset, int $whence = SEEK_SET) : bool + { + if ($this->stream === null) { + return false; + } + + if ($this->streamMedata === null) { + return false; + } + + if ($this->streamMedata['seekable'] === false) { + throw new RuntimeException('Remote streams are not seekable'); + } + + $this->buffer()->seek($offset, $whence); + + return true; + } + public function stream_stat() : array|false { if (!$this->filesystem()->fileExists($this->path())) { @@ -134,6 +163,15 @@ public function stream_stat() : array|false ]; } + public function stream_tell() : int|false + { + if ($this->stream === null) { + return $this->buffer()->tell(); + } + + return \ftell($this->stream); + } + public function stream_write(string $data) : int { $this->buffer()->write($data); diff --git a/src/core/etl/src/Flow/ETL/Filesystem/LocalBuffer.php b/src/core/etl/src/Flow/ETL/Filesystem/LocalBuffer.php index dd7b0448c..a208a5e63 100644 --- a/src/core/etl/src/Flow/ETL/Filesystem/LocalBuffer.php +++ b/src/core/etl/src/Flow/ETL/Filesystem/LocalBuffer.php @@ -6,10 +6,14 @@ interface LocalBuffer { public function release() : void; + public function seek(int $offset, int $whence = SEEK_SET) : void; + /** * @return resource */ public function stream(); + public function tell() : int|false; + public function write(string $data) : void; } diff --git a/src/core/etl/src/Flow/ETL/Filesystem/Stream/StreamWrapper.php b/src/core/etl/src/Flow/ETL/Filesystem/Stream/StreamWrapper.php index 2daecc9fe..5f4a67c4e 100644 --- a/src/core/etl/src/Flow/ETL/Filesystem/Stream/StreamWrapper.php +++ b/src/core/etl/src/Flow/ETL/Filesystem/Stream/StreamWrapper.php @@ -25,11 +25,15 @@ public function stream_open(string $path, string $mode, int $options, ?string &$ public function stream_read(int $count) : string|false; + public function stream_seek(int $offset, int $whence = SEEK_SET) : bool; + /** * @return array|false */ public function stream_stat() : array|false; + public function stream_tell() : int|false; + public function stream_write(string $data) : int; /** diff --git a/src/core/etl/src/Flow/ETL/Filesystem/Stream/VoidStreamWrapper.php b/src/core/etl/src/Flow/ETL/Filesystem/Stream/VoidStreamWrapper.php index 48100b3dc..e354b5fa2 100644 --- a/src/core/etl/src/Flow/ETL/Filesystem/Stream/VoidStreamWrapper.php +++ b/src/core/etl/src/Flow/ETL/Filesystem/Stream/VoidStreamWrapper.php @@ -42,11 +42,21 @@ public function stream_read(int $count) : string|false return false; } + public function stream_seek(int $offset, int $whence = SEEK_SET) : bool + { + return false; + } + public function stream_stat() : array|false { return false; } + public function stream_tell() : int + { + return 0; + } + public function stream_write(string $data) : int { return 0; diff --git a/src/core/etl/src/Flow/ETL/Filesystem/TmpfileBuffer.php b/src/core/etl/src/Flow/ETL/Filesystem/TmpfileBuffer.php index da104d0ff..bdbbbd845 100644 --- a/src/core/etl/src/Flow/ETL/Filesystem/TmpfileBuffer.php +++ b/src/core/etl/src/Flow/ETL/Filesystem/TmpfileBuffer.php @@ -21,6 +21,11 @@ public function release() : void } } + public function seek(int $offset, int $whence = SEEK_SET) : void + { + \fseek($this->stream(), $offset, $whence); + } + /** * @return resource */ @@ -35,6 +40,11 @@ public function stream() return $this->stream; } + public function tell() : int|false + { + return \ftell($this->stream()); + } + public function write(string $data) : void { \fwrite($this->stream(), $data); diff --git a/src/lib/parquet/src/Flow/Parquet/Writer.php b/src/lib/parquet/src/Flow/Parquet/Writer.php index fdb167a84..ee0a1aa81 100644 --- a/src/lib/parquet/src/Flow/Parquet/Writer.php +++ b/src/lib/parquet/src/Flow/Parquet/Writer.php @@ -150,7 +150,10 @@ public function openForStream($resource, Schema $schema) : void $this->stream = $resource; - \fseek($this->stream(), 0); + if (\ftell($this->stream()) !== 0) { + \fseek($this->stream(), 0); + } + \fwrite($this->stream(), ParquetFile::PARQUET_MAGIC_NUMBER); $this->fileOffset = \strlen(ParquetFile::PARQUET_MAGIC_NUMBER);