From f3b5b23b477de16f0b593d085254a84dfc15d200 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Thu, 18 Jan 2024 13:12:01 +0100 Subject: [PATCH] Added automated detection of CSV separator and enclosure --- .../src/Flow/ETL/Adapter/CSV/CSVDetector.php | 74 +++++-------- .../src/Flow/ETL/Adapter/CSV/CSVExtractor.php | 18 ++-- .../Flow/ETL/Adapter/CSV/Detector/Option.php | 94 ++++++++++++++++ .../Flow/ETL/Adapter/CSV/Detector/Options.php | 85 +++++++++++++++ .../CSV/Exception/CantDetectCSVOptions.php | 11 ++ .../src/Flow/ETL/Adapter/CSV/functions.php | 18 +++- .../CSV/Tests/Integration/CSVDetectorTest.php | 101 +++++++----------- .../CSV/Tests/Unit/Detector/OptionTest.php | 37 +++++++ 8 files changed, 320 insertions(+), 118 deletions(-) create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Exception/CantDetectCSVOptions.php create mode 100644 src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Unit/Detector/OptionTest.php diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php index 693678520..8da757b54 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php @@ -4,26 +4,39 @@ namespace Flow\ETL\Adapter\CSV; +use Flow\ETL\Adapter\CSV\Detector\Option; +use Flow\ETL\Adapter\CSV\Detector\Options; +use Flow\ETL\Adapter\CSV\Exception\CantDetectCSVOptions; use Flow\ETL\Exception\InvalidArgumentException; -use Flow\ETL\Exception\RuntimeException; final class CSVDetector { + private ?Option $fallback; + + private Options $options; + + /** + * @var resource + */ private $resource; private int $startingPosition; /** - * @param $resource + * @param resource $resource */ 
- public function __construct($resource) + public function __construct($resource, ?Option $fallback = new Option(',', '"', '\\'), ?Options $options = null) { + /** @psalm-suppress DocblockTypeContradiction */ if (!\is_resource($resource)) { throw new InvalidArgumentException('Argument must be a valid resource'); } $this->resource = $resource; + /** @phpstan-ignore-next-line */ $this->startingPosition = \ftell($resource); + $this->options = $options ?? Options::all(); + $this->fallback = $fallback; } public function __destruct() @@ -32,69 +45,36 @@ public function __destruct() } /** - * @throws InvalidArgumentException - * @throws RuntimeException + * @throws CantDetectCSVOptions|InvalidArgumentException */ - public function separator(int $lines = 5) : string + public function detect(int $lines = 5) : Option { if ($lines < 1) { throw new InvalidArgumentException('Lines must be greater than 0'); } - $delimiters = [ - ',' => [], - "\t" => [], - ';' => [], - '|' => [], - ' ' => [], - '_' => [], - '-' => [], - ':' => [], - ]; - $readLines = 1; while ($line = \fgets($this->resource)) { - foreach ($delimiters as $delimiter => $count) { - $row = \str_getcsv($line, $delimiter); - $delimiters[$delimiter][] = \count($row); - } + $this->options->parse($line); if ($readLines++ >= $lines) { break; } } - foreach ($delimiters as $delimiter => $rows) { - $columnsCount = null; - - foreach ($rows as $rowColumns) { - if ($columnsCount === null) { - $columnsCount = $rowColumns; - } - - if ($columnsCount !== $rowColumns) { - unset($delimiters[$delimiter]); - - break; - } + try { + $bestOption = $this->options->onlyValid()->best(); + } catch (CantDetectCSVOptions $e) { + if ($this->fallback) { + return $this->fallback; } - } - - $delimiters = \array_map(fn (array $rows) : int => \array_sum($rows), $delimiters); - - \arsort($delimiters); - $delimiters = \array_filter($delimiters, fn (int $count) : bool => $count > $lines); - - if (!\count($delimiters)) { - \fseek($this->resource, 
$this->startingPosition); - - throw new RuntimeException('Cannot detect delimiter'); + throw $e; } - \fseek($this->resource, $this->startingPosition); + $this->options = $this->options->reset(); - return \array_key_first($delimiters); + return $bestOption; } } diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php index 4abbab8a0..acbdd5903 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php @@ -29,8 +29,8 @@ public function __construct( private readonly bool $withHeader = true, private readonly bool $emptyToNull = true, private readonly string|null $separator = null, - private readonly string $enclosure = '"', - private readonly string $escape = '\\', + private readonly string|null $enclosure = null, + private readonly string|null $escape = null, private readonly int $charactersReadInLine = 1000 ) { $this->resetLimit(); @@ -43,15 +43,21 @@ public function extract(FlowContext $context) : \Generator foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $path) { $stream = $context->streams()->fs()->open($path, Mode::READ); + $option = \Flow\ETL\Adapter\CSV\csv_detect_separator($stream->resource()); + + $separator = $this->separator ?? $option->separator; + $enclosure = $this->enclosure ?? $option->enclosure; + $escape = $this->escape ?? 
$option->escape; + $headers = []; if ($this->withHeader && \count($headers) === 0) { /** @var array $headers */ - $headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); } /** @var array $rowData */ - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); if (!\count($headers)) { $headers = \array_map(fn (int $e) : string => 'e' . \str_pad((string) $e, 2, '0', STR_PAD_LEFT), \range(0, \count($rowData) - 1)); @@ -81,7 +87,7 @@ public function extract(FlowContext $context) : \Generator } if (\count($headers) !== \count($rowData)) { - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); continue; } @@ -101,7 +107,7 @@ public function extract(FlowContext $context) : \Generator return; } - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); } } diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php new file mode 100644 index 000000000..56906b504 --- /dev/null +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php @@ -0,0 +1,94 @@ + + */ + private array $rows; + + public function __construct( + public string $separator, + public string $enclosure, + public string $escape = '\\' + ) { + if (\mb_strlen($this->separator) !== 1) { + throw new 
InvalidArgumentException('Separator must be a single character'); + } + + if (\mb_strlen($this->enclosure) !== 1) { + throw new InvalidArgumentException('Enclosure must be a single character'); + } + + $this->rows = []; + } + + public function isValid() : bool + { + $columnsCount = null; + + foreach ($this->rows as $row) { + if ($columnsCount === null) { + $columnsCount = \count($row); + + continue; + } + + if ($columnsCount !== \count($row)) { + return false; + } + } + + if ($columnsCount === 1) { + return false; + } + + return true; + } + + public function parse(string $line) : void + { + $this->rows[] = \str_getcsv($line, $this->separator, $this->enclosure); + } + + public function reset() : self + { + return new self($this->separator, $this->enclosure); + } + + public function score() : int + { + if (!$this->isValid()) { + return 0; + } + + if (!\count($this->rows)) { + return 0; + } + + $columnScore = \count($this->rows[0]) * self::COLUMN_SCORE_WEIGHT; + $totalLength = \array_reduce( + $this->rows, + static fn (int $carry, array $row) : int => $carry + \array_reduce( + $row, + static fn (int $carry, $column) : int => $carry + (\is_string($column) ? \mb_strlen($column) : 0), + 0 + ), + 0 + ); + + $lengthScore = (int) \round((1 / ($totalLength + 1) * self::COLUMNS_LENGTH_WEIGHT)); + + return $columnScore + $lengthScore; + } +} diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php new file mode 100644 index 000000000..63eefb848 --- /dev/null +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php @@ -0,0 +1,85 @@ + + */ + private array $options; + + /** + * @param array