From 179cbe0ea4e3258c06fdfe4d0b9cb198a237f418 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Thu, 18 Jan 2024 13:22:41 +0100 Subject: [PATCH] CSV options detection (#918) * Checkpoint * Added automated detection of CSV separator and enclousure --- .../src/Flow/ETL/Adapter/CSV/CSVDetector.php | 80 ++++++++++++++++ .../src/Flow/ETL/Adapter/CSV/CSVExtractor.php | 20 ++-- .../Flow/ETL/Adapter/CSV/Detector/Option.php | 94 ++++++++++++++++++ .../Flow/ETL/Adapter/CSV/Detector/Options.php | 85 ++++++++++++++++ .../CSV/Exception/CantDetectCSVOptions.php | 11 +++ .../src/Flow/ETL/Adapter/CSV/functions.php | 19 +++- .../CSV/Tests/Integration/CSVDetectorTest.php | 96 +++++++++++++++++++ .../CSV/Tests/Unit/Detector/OptionTest.php | 37 +++++++ 8 files changed, 432 insertions(+), 10 deletions(-) create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Exception/CantDetectCSVOptions.php create mode 100644 src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVDetectorTest.php create mode 100644 src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Unit/Detector/OptionTest.php diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php new file mode 100644 index 000000000..8da757b54 --- /dev/null +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php @@ -0,0 +1,80 @@ +resource = $resource; + /** @phpstan-ignore-next-line */ + $this->startingPosition = \ftell($resource); + $this->options = $options ?? Options::all(); + $this->fallback = $fallback; + } + + public function __destruct() + { + \fseek($this->resource, $this->startingPosition); + } + + /** + * @throws CantDetectCSVOptions|InvalidArgumentException + */ + public function detect(int $lines = 5) : Option + { + if ($lines < 1) { + throw new InvalidArgumentException('Lines must be greater than 0'); + } + + $readLines = 1; + + while ($line = \fgets($this->resource)) { + $this->options->parse($line); + + if ($readLines++ >= $lines) { + break; + } + } + + try { + $bestOption = $this->options->onlyValid()->best(); + } catch (CantDetectCSVOptions $e) { + if ($this->fallback) { + return $this->fallback; + } + + throw $e; + } + + $this->options = $this->options->reset(); + + return $bestOption; + } +} diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php index 2eddee69a..acbdd5903 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php @@ -28,9 +28,9 @@ public function __construct( private readonly Path $path, private readonly bool $withHeader = true, private readonly bool $emptyToNull = true, - private readonly string $separator = ',', - private readonly string $enclosure = '"', - private readonly string $escape = '\\', + private readonly string|null $separator = null, + private readonly string|null $enclosure = null, + private readonly string|null $escape = null, private readonly int $charactersReadInLine = 1000 ) { $this->resetLimit(); @@ -43,15 +43,21 @@ public function extract(FlowContext $context) : \Generator foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $path) { $stream = $context->streams()->fs()->open($path, Mode::READ); + $option = \Flow\ETL\Adapter\CSV\csv_detect_separator($stream->resource()); + + $separator = $this->separator ?? $option->separator; + $enclosure = $this->enclosure ?? $option->enclosure; + $escape = $this->escape ?? $option->escape; + $headers = []; if ($this->withHeader && \count($headers) === 0) { /** @var array $headers */ - $headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); } /** @var array $rowData */ - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); if (!\count($headers)) { $headers = \array_map(fn (int $e) : string => 'e' . \str_pad((string) $e, 2, '0', STR_PAD_LEFT), \range(0, \count($rowData) - 1)); @@ -81,7 +87,7 @@ public function extract(FlowContext $context) : \Generator } if (\count($headers) !== \count($rowData)) { - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); continue; } @@ -101,7 +107,7 @@ public function extract(FlowContext $context) : \Generator return; } - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); } } diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php new file mode 100644 index 000000000..56906b504 --- /dev/null +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php @@ -0,0 +1,94 @@ + + */ + private array $rows; + + public function __construct( + public string $separator, + public string $enclosure, + public string $escape = '\\' + ) { + if (\mb_strlen($this->separator) !== 1) { + throw new InvalidArgumentException('Separator must be a single character'); + } + + if (\mb_strlen($this->enclosure) !== 1) { + throw new InvalidArgumentException('Enclosure must be a single character'); + } + + $this->rows = []; + } + + public function isValid() : bool + { + $columnsCount = null; + + foreach ($this->rows as $row) { + if ($columnsCount === null) { + $columnsCount = \count($row); + + continue; + } + + if ($columnsCount !== \count($row)) { + return false; + } + } + + if ($columnsCount === 1) { + return false; + } + + return true; + } + + public function parse(string $line) : void + { + $this->rows[] = \str_getcsv($line, $this->separator, $this->enclosure); + } + + public function reset() : self + { + return new self($this->separator, $this->enclosure); + } + + public function score() : int + { + if (!$this->isValid()) { + return 0; + } + + if (!\count($this->rows)) { + return 0; + } + + $columnScore = \count($this->rows[0]) * self::COLUMN_SCORE_WEIGHT; + $totalLength = \array_reduce( + $this->rows, + static fn (int $carry, array $row) : int => $carry + \array_reduce( + $row, + static fn (int $carry, $column) : int => $carry + (\is_string($column) ? \mb_strlen($column) : 0), + 0 + ), + 0 + ); + + $lengthScore = (int) \round((1 / ($totalLength + 1) * self::COLUMNS_LENGTH_WEIGHT)); + + return $columnScore + $lengthScore; + } +} diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php new file mode 100644 index 000000000..63eefb848 --- /dev/null +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php @@ -0,0 +1,85 @@ + + */ + private array $options; + + /** + * @param array