Skip to content

Commit

Permalink
DataFrame::run method can now return execution report with Schema and…
Browse files Browse the repository at this point in the history
… Statistics
  • Loading branch information
norberttech committed Jan 28, 2024
1 parent 9bc119e commit be1b369
Show file tree
Hide file tree
Showing 5 changed files with 2,657 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use function Flow\ETL\DSL\to_output;
use Flow\ETL\DataFrame\GroupedDataFrame;
use Flow\ETL\DataFrame\PartitionedDataFrame;
use Flow\ETL\Dataset\Report;
use Flow\ETL\Dataset\Statistics;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\InvalidFileFormatException;
use Flow\ETL\Exception\RuntimeException;
Expand Down Expand Up @@ -748,17 +750,32 @@ public function rows(Transformer|Transformation $transformer) : self
* @trigger
*
* @param null|callable(Rows $rows): void $callback
* @param bool $analyze - when set to true, run will return Report
*/
#[DSLMethod(exclude: true)]
public function run(?callable $callback = null) : void
public function run(?callable $callback = null, bool $analyze = false) : null|Report
{
$clone = clone $this;

$totalRows = 0;
$schema = new Schema();

foreach ($clone->pipeline->process($clone->context) as $rows) {
if ($callback !== null) {
$callback($rows);
}

if ($analyze) {
$schema = $schema->merge($rows->schema());
$totalRows += $rows->count();
}
}

if ($analyze) {
return new Report($schema, new Statistics($totalRows));
}

return null;
}

/**
Expand Down
27 changes: 27 additions & 0 deletions src/core/etl/src/Flow/ETL/Dataset/Report.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Dataset;

use Flow\ETL\Row\Schema;

final class Report
{
public function __construct(
private readonly Schema $schema,
private readonly Statistics $statistics,
) {

}

public function schema() : Schema
{
return $this->schema;
}

public function statistics() : Statistics
{
return $this->statistics;
}
}
18 changes: 18 additions & 0 deletions src/core/etl/src/Flow/ETL/Dataset/Statistics.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Dataset;

final class Statistics
{
public function __construct(
private readonly int $totalRows,
) {
}

public function totalRows() : int
{
return $this->totalRows;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\DataFrame;

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\Text\from_text;
use function Flow\ETL\DSL\datetime_schema;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\schema;
use function Flow\ETL\DSL\str_schema;
use Flow\ETL\Tests\Integration\IntegrationTestCase;

final class AnalyzeTest extends IntegrationTestCase
{
public function test_analyzing_csv_file_with_auto_cast() : void
{
$report = df()
->read(from_csv(__DIR__ . '/Fixtures/Analyze/goldstock.csv'))
->autoCast()
->run(analyze: true);

$this->assertSame(2511, $report->statistics()->totalRows());
$this->assertEquals(
schema(
int_schema('Index'),
datetime_schema('Date'),
float_schema('Close'),
float_schema('Volume'),
float_schema('Open'),
float_schema('High'),
float_schema('Low'),
),
$report->schema()
);
$this->assertSame(7, $report->schema()->count());
}

public function test_analyzing_csv_file_with_limit() : void
{
$report = df()
->read(from_csv(__DIR__ . '/Fixtures/Analyze/goldstock.csv'))
->limit(100)
->run(analyze: true);

$this->assertSame(100, $report->statistics()->totalRows());
$this->assertEquals(
schema(
str_schema('Index'),
str_schema('Date'),
str_schema('Close'),
str_schema('Volume'),
str_schema('Open'),
str_schema('High'),
str_schema('Low'),
),
$report->schema()
);
$this->assertSame(7, $report->schema()->count());
}

public function test_analyzing_partitioned_datasets() : void
{
$report = df()
->read(from_text(__DIR__ . '/Fixtures/Partitioning/multi_partition_pruning_test/year=*/month=*/day=*/*.txt'))
->run(analyze: true);

$this->assertSame(7, $report->statistics()->totalRows());
$this->assertEquals(
schema(
str_schema('year'),
str_schema('month'),
str_schema('day'),
str_schema('text'),
),
$report->schema()
);
}
}
Loading

0 comments on commit be1b369

Please sign in to comment.