Skip to content

Commit

Permalink
Entry factory schema (#928)
Browse files Browse the repository at this point in the history
* Added type Caster

* use Type in Scalar Cast function

* Unified and reorganized casting/type detection logic

* Allow to pass schema into CSV/Json extractors

* Casting Lists/Maps/Structures

* Microoptimization
  • Loading branch information
norberttech authored Jan 21, 2024
1 parent e1d3519 commit 42f03f7
Show file tree
Hide file tree
Showing 59 changed files with 1,915 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Row\Schema;

final class CSVExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor
{
Expand All @@ -31,7 +32,8 @@ public function __construct(
private readonly string|null $separator = null,
private readonly string|null $enclosure = null,
private readonly string|null $escape = null,
private readonly int $charactersReadInLine = 1000
private readonly int $charactersReadInLine = 1000,
private readonly Schema|null $schema = null
) {
$this->resetLimit();
}
Expand Down Expand Up @@ -98,7 +100,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $stream->path()->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $path->partitions());
$signal = yield array_to_rows($row, $context->entryFactory(), $path->partitions(), $this->schema);
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;
use Flow\ETL\Row\Schema;

/**
* @param int<0, max> $characters_read_in_line
Expand All @@ -21,7 +22,8 @@ function from_csv(
string|null $delimiter = null,
string|null $enclosure = null,
string|null $escape = null,
int $characters_read_in_line = 1000
int $characters_read_in_line = 1000,
Schema|null $schema = null
) : Extractor {
if (\is_array($path)) {
$extractors = [];
Expand All @@ -35,6 +37,7 @@ function from_csv(
$enclosure,
$escape,
$characters_read_in_line,
$schema
);
}

Expand All @@ -49,6 +52,7 @@ function from_csv(
$enclosure,
$escape,
$characters_read_in_line,
$schema
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\print_schema;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\CSV\CSVExtractor;
use Flow\ETL\Config;
Expand Down Expand Up @@ -143,6 +144,61 @@ public function test_extracting_csv_files_with_header() : void
$this->assertSame(998, $rows->count());
}

public function test_extracting_csv_files_with_schema() : void
{
$path = __DIR__ . '/../Fixtures/annual-enterprise-survey-2019-financial-year-provisional-csv.csv';

$rows = df()
->read(
from_csv($path, schema: $schema = df()
->read(from_csv($path))
->autoCast()
->schema())
)
->fetch();

foreach ($rows as $row) {
$this->assertSame(
[
'Year',
'Industry_aggregation_NZSIOC',
'Industry_code_NZSIOC',
'Industry_name_NZSIOC',
'Units',
'Variable_code',
'Variable_name',
'Variable_category',
'Value',
'Industry_code_ANZSIC06',

],
\array_keys($row->toArray())
);
}

$this->assertSame(998, $rows->count());
$this->assertEquals($schema, $rows->schema());

$this->assertSame(
<<<'SCHEMA'
schema
|-- Year: integer
|-- Industry_aggregation_NZSIOC: string
|-- Industry_code_NZSIOC: string
|-- Industry_name_NZSIOC: string
|-- Units: string
|-- Variable_code: string
|-- Variable_name: string
|-- Variable_category: string
|-- Value: string
|-- Industry_code_ANZSIC06: string

SCHEMA,
print_schema($rows->schema())
);

}

public function test_extracting_csv_files_without_header() : void
{
$extractor = from_csv(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Row\Schema;
use JsonMachine\Items;
use JsonMachine\JsonDecoder\ExtJsonDecoder;

Expand All @@ -26,6 +27,7 @@ final class JsonExtractor implements Extractor, FileExtractor, LimitableExtracto
public function __construct(
private readonly Path $path,
private readonly ?string $pointer = null,
private readonly Schema|null $schema = null,
) {
$this->resetLimit();
}
Expand All @@ -46,7 +48,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $filePath->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions());
$signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions(), $this->schema);
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;
use Flow\ETL\Row\Schema;

/**
* @param array<Path|string>|Path|string $path - string is internally turned into stream
Expand All @@ -19,6 +20,7 @@
function from_json(
string|Path|array $path,
?string $pointer = null,
Schema|null $schema = null,
) : Extractor {
if (\is_array($path)) {
$extractors = [];
Expand All @@ -27,6 +29,7 @@ function from_json(
$extractors[] = new JsonExtractor(
\is_string($file) ? Path::realpath($file) : $file,
$pointer,
$schema
);
}

Expand All @@ -36,6 +39,7 @@ function from_json(
return new JsonExtractor(
\is_string($path) ? Path::realpath($path) : $path,
$pointer,
$schema
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

use function Flow\ETL\Adapter\JSON\from_json;
use function Flow\ETL\Adapter\JSON\to_json;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\print_schema;
use Flow\ETL\Adapter\JSON\JSONMachine\JsonExtractor;
use Flow\ETL\Config;
use Flow\ETL\Extractor\Signal;
Expand Down Expand Up @@ -65,6 +67,48 @@ public function test_extracting_json_from_local_file_stream_using_pointer() : vo
$this->assertSame(247, $rows->count());
}

public function test_extracting_json_from_local_file_stream_with_schema() : void
{
$rows = df()
->read(from_json(
__DIR__ . '/../../Fixtures/timezones.json',
schema: $schema = df()
->read(from_json(__DIR__ . '/../../Fixtures/timezones.json'))
->autoCast()
->schema()
))
->fetch();

foreach ($rows as $row) {
$this->assertSame(
[
'timezones',
'latlng',
'name',
'country_code',
'capital',
],
\array_keys($row->toArray())
);
}

$this->assertSame(247, $rows->count());
$this->assertEquals($schema, $rows->schema());
$this->assertSame(
<<<'SCHEMA'
schema
|-- timezones: list<string>
|-- latlng: list<float>
|-- name: string
|-- country_code: string
|-- capital: ?string

SCHEMA
,
print_schema($schema)
);
}

public function test_extracting_json_from_local_file_string_uri() : void
{
$extractor = new JsonExtractor(Path::realpath(__DIR__ . '/../../Fixtures/timezones.json'));
Expand Down
48 changes: 42 additions & 6 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
use Flow\ETL\PHP\Type\Native\ResourceType;
use Flow\ETL\PHP\Type\Native\ScalarType;
use Flow\ETL\PHP\Type\Type;
use Flow\ETL\PHP\Type\TypeDetector;
use Flow\ETL\Pipeline;
use Flow\ETL\Row;
use Flow\ETL\Row\EntryFactory;
Expand Down Expand Up @@ -357,6 +358,11 @@ function struct_entry(string $name, array $value, StructureType $type) : Row\Ent
return new Row\Entry\StructureEntry($name, $value, $type);
}

function structure_entry(string $name, array $value, StructureType $type) : Row\Entry\StructureEntry
{
return new Row\Entry\StructureEntry($name, $value, $type);
}

/**
* @param array<string, StructureElement> $elements
*/
Expand All @@ -365,11 +371,21 @@ function struct_type(array $elements, bool $nullable = false) : StructureType
return new StructureType($elements, $nullable);
}

function structure_type(array $elements, bool $nullable = false) : StructureType
{
return new StructureType($elements, $nullable);
}

function struct_element(string $name, Type $type) : StructureElement
{
return new StructureElement($name, $type);
}

function structure_element(string $name, Type $type) : StructureElement
{
return new StructureElement($name, $type);
}

function list_entry(string $name, array $value, ListType $type) : Row\Entry\ListEntry
{
return new Row\Entry\ListEntry($name, $value, $type);
Expand Down Expand Up @@ -420,6 +436,11 @@ function type_int(bool $nullable = false) : ScalarType
return ScalarType::integer($nullable);
}

function type_integer(bool $nullable = false) : ScalarType
{
return ScalarType::integer($nullable);
}

function type_string(bool $nullable = false) : ScalarType
{
return ScalarType::string($nullable);
Expand Down Expand Up @@ -661,7 +682,7 @@ function hash(ScalarFunction $function, string $algorithm = 'xxh128', bool $bina
return new Hash($function, $algorithm, $binary, $options);
}

function cast(ScalarFunction $function, string $type) : Cast
function cast(ScalarFunction $function, string|Type $type) : Cast
{
return new Cast($function, $type);
}
Expand Down Expand Up @@ -862,7 +883,7 @@ function number_format(ScalarFunction $function, ?ScalarFunction $decimals = nul
* @param array<array<mixed>>|array<mixed|string> $data
* @param array<Partition>|\Flow\ETL\Partitions $partitions
*/
function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntryFactory(), array|\Flow\ETL\Partitions $partitions = []) : Rows
function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntryFactory(), array|\Flow\ETL\Partitions $partitions = [], ?Schema $schema = null) : Rows
{
$partitions = \is_array($partitions) ? new \Flow\ETL\Partitions(...$partitions) : $partitions;

Expand All @@ -882,12 +903,12 @@ function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntry
foreach ($data as $key => $value) {
$name = \is_int($key) ? 'e' . \str_pad((string) $key, 2, '0', STR_PAD_LEFT) : $key;

$entries[$name] = $entryFactory->create($name, $value);
$entries[$name] = $entryFactory->create($name, $value, $schema);
}

foreach ($partitions as $partition) {
if (!\array_key_exists($partition->name, $entries)) {
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value);
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value, $schema);
}
}

Expand All @@ -901,12 +922,12 @@ function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntry

foreach ($row as $column => $value) {
$name = \is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column;
$entries[$name] = $entryFactory->create(\is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column, $value);
$entries[$name] = $entryFactory->create(\is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column, $value, $schema);
}

foreach ($partitions as $partition) {
if (!\array_key_exists($partition->name, $entries)) {
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value);
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value, $schema);
}
}

Expand Down Expand Up @@ -1108,3 +1129,18 @@ function append() : SaveMode
{
return SaveMode::Append;
}

function get_type(mixed $value) : Type
{
return (new TypeDetector())->detectType($value);
}

function print_schema(Schema $schema, ?SchemaFormatter $formatter = null) : string
{
return ($formatter ?? new ASCIISchemaFormatter())->format($schema);
}

function print_rows(Rows $rows, int|bool $truncate = false, ?Formatter $formatter = null) : string
{
return ($formatter ?? new Formatter\AsciiTableFormatter())->format($rows, $truncate);
}
4 changes: 3 additions & 1 deletion src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
use Flow\ETL\Loader\SchemaValidationLoader;
use Flow\ETL\Loader\StreamLoader\Output;
use Flow\ETL\Partition\ScalarFunctionFilter;
use Flow\ETL\PHP\Type\AutoCaster;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Pipeline\BatchingPipeline;
use Flow\ETL\Pipeline\CachingPipeline;
use Flow\ETL\Pipeline\CollectingPipeline;
Expand Down Expand Up @@ -151,7 +153,7 @@ public function appendSafe(bool $appendSafe = true) : self

public function autoCast() : self
{
$this->pipeline->add(new AutoCastTransformer());
$this->pipeline->add(new AutoCastTransformer(new AutoCaster(Caster::default())));

return $this;
}
Expand Down
Loading

0 comments on commit 42f03f7

Please sign in to comment.