Skip to content

Commit

Permalink
CSV options detection (#918)
Browse files Browse the repository at this point in the history
* Checkpoint

* Added automated detection of CSV separator and enclosure
  • Loading branch information
norberttech authored Jan 18, 2024
1 parent 0b21012 commit 179cbe0
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV;

use Flow\ETL\Adapter\CSV\Detector\Option;
use Flow\ETL\Adapter\CSV\Detector\Options;
use Flow\ETL\Adapter\CSV\Exception\CantDetectCSVOptions;
use Flow\ETL\Exception\InvalidArgumentException;

/**
 * Detects CSV parsing options (separator, enclosure) by sampling lines from an open stream.
 *
 * Every candidate held by Options parses the sampled lines; the valid candidate with the
 * highest score is returned. The stream position observed at construction time is restored
 * when the detector is destroyed.
 */
final class CSVDetector
{
    private ?Option $fallback;

    private Options $options;

    /**
     * @var resource
     */
    private $resource;

    private int $startingPosition;

    /**
     * @param resource $resource stream to sample lines from
     * @param null|Option $fallback option returned by detect() when no best option can be found, null rethrows the detection error
     * @param null|Options $options candidates to evaluate, Options::all() when null
     *
     * @throws InvalidArgumentException when $resource is not a resource or its current position can't be read
     */
    public function __construct($resource, ?Option $fallback = new Option(',', '"', '\\'), ?Options $options = null)
    {
        /** @psalm-suppress DocblockTypeContradiction */
        if (!\is_resource($resource)) {
            throw new InvalidArgumentException('Argument must be a valid resource');
        }

        $position = \ftell($resource);

        // ftell() reports failure with false, which must never end up in the int property.
        if ($position === false) {
            throw new InvalidArgumentException('Unable to read current position of the resource');
        }

        $this->resource = $resource;
        $this->startingPosition = $position;
        $this->options = $options ?? Options::all();
        $this->fallback = $fallback;
    }

    public function __destruct()
    {
        // The stream may have been closed before this object is collected; fseek() on a closed
        // resource throws in PHP 8 and an exception must not escape from a destructor.
        if (\is_resource($this->resource)) {
            \fseek($this->resource, $this->startingPosition);
        }
    }

    /**
     * Reads up to $lines lines from the current stream position and returns the best matching option.
     *
     * @param int $lines maximum number of lines to sample, must be at least 1
     *
     * @throws CantDetectCSVOptions|InvalidArgumentException
     */
    public function detect(int $lines = 5) : Option
    {
        if ($lines < 1) {
            throw new InvalidArgumentException('Lines must be greater than 0');
        }

        try {
            for ($readLines = 0; $readLines < $lines; $readLines++) {
                $line = \fgets($this->resource);

                // Compare with false explicitly: a last line holding only "0" is falsy but is still data.
                if ($line === false) {
                    break;
                }

                $this->options->parse($line);
            }

            try {
                return $this->options->onlyValid()->best();
            } catch (CantDetectCSVOptions $e) {
                if ($this->fallback !== null) {
                    return $this->fallback;
                }

                throw $e;
            }
        } finally {
            // Drop sampled rows on every exit path (success, fallback, failure) so that
            // a subsequent detect() call does not mix in rows from this one.
            $this->options = $this->options->reset();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public function __construct(
private readonly Path $path,
private readonly bool $withHeader = true,
private readonly bool $emptyToNull = true,
private readonly string $separator = ',',
private readonly string $enclosure = '"',
private readonly string $escape = '\\',
private readonly string|null $separator = null,
private readonly string|null $enclosure = null,
private readonly string|null $escape = null,
private readonly int $charactersReadInLine = 1000
) {
$this->resetLimit();
Expand All @@ -43,15 +43,21 @@ public function extract(FlowContext $context) : \Generator
foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $path) {
$stream = $context->streams()->fs()->open($path, Mode::READ);

$option = \Flow\ETL\Adapter\CSV\csv_detect_separator($stream->resource());

$separator = $this->separator ?? $option->separator;
$enclosure = $this->enclosure ?? $option->enclosure;
$escape = $this->escape ?? $option->escape;

$headers = [];

if ($this->withHeader && \count($headers) === 0) {
/** @var array<string> $headers */
$headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape);
$headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape);
}

/** @var array<mixed> $rowData */
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape);
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape);

if (!\count($headers)) {
$headers = \array_map(fn (int $e) : string => 'e' . \str_pad((string) $e, 2, '0', STR_PAD_LEFT), \range(0, \count($rowData) - 1));
Expand Down Expand Up @@ -81,7 +87,7 @@ public function extract(FlowContext $context) : \Generator
}

if (\count($headers) !== \count($rowData)) {
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape);
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape);

continue;
}
Expand All @@ -101,7 +107,7 @@ public function extract(FlowContext $context) : \Generator
return;
}

$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape);
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV\Detector;

use Flow\ETL\Exception\InvalidArgumentException;

/**
 * Single candidate combination of CSV separator, enclosure and escape characters.
 *
 * Lines passed to parse() are split with this combination and kept, so that isValid()
 * and score() can tell how well the combination fits the sampled data.
 */
final class Option
{
    private const COLUMN_SCORE_WEIGHT = 100_000;

    private const COLUMNS_LENGTH_WEIGHT = 10_000;

    /**
     * Rows produced by parse(), one entry per parsed line.
     *
     * @var array<mixed>
     */
    private array $rows;

    /**
     * @throws InvalidArgumentException when separator or enclosure is not exactly one character,
     *                                  or when escape is longer than one character
     */
    public function __construct(
        public string $separator,
        public string $enclosure,
        public string $escape = '\\'
    ) {
        if (\mb_strlen($this->separator) !== 1) {
            throw new InvalidArgumentException('Separator must be a single character');
        }

        if (\mb_strlen($this->enclosure) !== 1) {
            throw new InvalidArgumentException('Enclosure must be a single character');
        }

        // The escape is now handed to str_getcsv(), which accepts an empty string or one character only.
        if (\mb_strlen($this->escape) > 1) {
            throw new InvalidArgumentException('Escape must be a single character or an empty string');
        }

        $this->rows = [];
    }

    /**
     * An option is valid when every parsed row has the same number of columns and that number is not 1.
     * An option that has not parsed any row yet is reported as valid.
     */
    public function isValid() : bool
    {
        $columnsCount = null;

        foreach ($this->rows as $row) {
            if ($columnsCount === null) {
                $columnsCount = \count($row);

                continue;
            }

            if ($columnsCount !== \count($row)) {
                return false;
            }
        }

        // A single column means the separator never split anything.
        return $columnsCount !== 1;
    }

    /**
     * Splits the line with this option's separator, enclosure and escape and stores the resulting row.
     */
    public function parse(string $line) : void
    {
        $this->rows[] = \str_getcsv($line, $this->separator, $this->enclosure, $this->escape);
    }

    /**
     * Returns a fresh option with the same separator, enclosure and escape but without parsed rows.
     */
    public function reset() : self
    {
        return new self($this->separator, $this->enclosure, $this->escape);
    }

    /**
     * Scores the option: 0 when invalid or without rows, otherwise the column count of the first row
     * weighted by COLUMN_SCORE_WEIGHT plus a bonus that shrinks as the total length of all values grows.
     */
    public function score() : int
    {
        if (!$this->isValid()) {
            return 0;
        }

        if (!\count($this->rows)) {
            return 0;
        }

        $columnScore = \count($this->rows[0]) * self::COLUMN_SCORE_WEIGHT;
        $totalLength = \array_reduce(
            $this->rows,
            static fn (int $carry, array $row) : int => $carry + \array_reduce(
                $row,
                static fn (int $carry, $column) : int => $carry + (\is_string($column) ? \mb_strlen($column) : 0),
                0
            ),
            0
        );

        // +1 keeps the division defined when every parsed value is empty.
        $lengthScore = (int) \round((1 / ($totalLength + 1) * self::COLUMNS_LENGTH_WEIGHT));

        return $columnScore + $lengthScore;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV\Detector;

use Flow\ETL\Adapter\CSV\Exception\CantDetectCSVOptions;

/**
 * Collection of Option candidates evaluated together against the same sampled lines.
 */
final class Options
{
    /**
     * Enclosure characters combined with every separator by all().
     */
    private const ENCLOSURES = ['"', "'"];

    /**
     * Separator characters tried by all(), in order of precedence when scores are equal.
     */
    private const SEPARATORS = [',', "\t", ';', '|', ' ', '_', '-', ':', '~', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '?', '!', '\\', '/', '.', '>', '<'];

    /**
     * @var array<Option>
     */
    private array $options;

    /**
     * @param array<Option> $options
     */
    public function __construct(array $options)
    {
        $this->options = $options;
    }

    /**
     * Builds one candidate for every separator/enclosure pair, separators in the outer loop.
     */
    public static function all() : self
    {
        $candidates = [];

        foreach (self::SEPARATORS as $separatorChar) {
            foreach (self::ENCLOSURES as $enclosureChar) {
                $candidates[] = new Option($separatorChar, $enclosureChar);
            }
        }

        return new self($candidates);
    }

    /**
     * Returns the candidate with the highest score; on equal scores the earliest candidate wins.
     *
     * @throws CantDetectCSVOptions when the collection holds no candidates
     */
    public function best() : Option
    {
        $winner = null;
        $winnerScore = 0;

        foreach ($this->options as $candidate) {
            $candidateScore = $candidate->score();

            // The first candidate always becomes the initial winner, later ones must score strictly higher.
            if ($winner === null || $candidateScore > $winnerScore) {
                $winner = $candidate;
                $winnerScore = $candidateScore;
            }
        }

        return $winner ?? throw new CantDetectCSVOptions('No best option found');
    }

    /**
     * Returns a new collection holding only the candidates whose isValid() is true.
     */
    public function onlyValid() : self
    {
        $valid = \array_filter(
            $this->options,
            static fn (Option $candidate) : bool => $candidate->isValid()
        );

        return new self($valid);
    }

    /**
     * Passes the line to every candidate in the collection.
     */
    public function parse(string $line) : void
    {
        foreach ($this->options as $candidate) {
            $candidate->parse($line);
        }
    }

    /**
     * Returns a new collection made of reset copies of every candidate.
     */
    public function reset() : self
    {
        return new self(\array_map(
            static fn (Option $candidate) : Option => $candidate->reset(),
            \array_values($this->options)
        ));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV\Exception;

use Flow\ETL\Exception\RuntimeException;

/**
 * Thrown when no CSV option can be selected as the best match,
 * for example by Options::best() when the collection holds no candidates.
 */
final class CantDetectCSVOptions extends RuntimeException
{
}
19 changes: 16 additions & 3 deletions src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace Flow\ETL\Adapter\CSV;

use function Flow\ETL\DSL\from_all;
use Flow\ETL\Adapter\CSV\Detector\Option;
use Flow\ETL\Adapter\CSV\Detector\Options;
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;
Expand All @@ -16,9 +18,9 @@ function from_csv(
string|Path|array $path,
bool $with_header = true,
bool $empty_to_null = true,
string $delimiter = ',',
string $enclosure = '"',
string $escape = '\\',
string|null $delimiter = null,
string|null $enclosure = null,
string|null $escape = null,
int $characters_read_in_line = 1000
) : Extractor {
if (\is_array($path)) {
Expand Down Expand Up @@ -67,3 +69,14 @@ function to_csv(
$new_line_separator
);
}

/**
 * Samples the beginning of a CSV stream and returns the option (separator, enclosure, escape) that fits it best.
 *
 * @param resource $resource - valid resource to CSV file opened with 'r' mode
 * @param int<1, max> $lines - how many lines to sample, default 5; a bigger sample is more accurate but slower
 * @param null|Option $fallback - returned when no best option can be detected, default is Option(',', '"', '\\')
 * @param null|Options $options - candidates to evaluate, default is Options::all()
 */
function csv_detect_separator($resource, int $lines = 5, ?Option $fallback = new Option(',', '"', '\\'), ?Options $options = null) : Option
{
    $detector = new CSVDetector(resource: $resource, fallback: $fallback, options: $options);

    return $detector->detect($lines);
}
Loading

0 comments on commit 179cbe0

Please sign in to comment.