Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Entry factory schema #928

Merged
merged 6 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Row\Schema;

final class CSVExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor
{
Expand All @@ -31,7 +32,8 @@ public function __construct(
private readonly string|null $separator = null,
private readonly string|null $enclosure = null,
private readonly string|null $escape = null,
private readonly int $charactersReadInLine = 1000
private readonly int $charactersReadInLine = 1000,
private readonly Schema|null $schema = null
) {
$this->resetLimit();
}
Expand Down Expand Up @@ -98,7 +100,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $stream->path()->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $path->partitions());
$signal = yield array_to_rows($row, $context->entryFactory(), $path->partitions(), $this->schema);
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;
use Flow\ETL\Row\Schema;

/**
* @param int<0, max> $characters_read_in_line
Expand All @@ -21,7 +22,8 @@ function from_csv(
string|null $delimiter = null,
string|null $enclosure = null,
string|null $escape = null,
int $characters_read_in_line = 1000
int $characters_read_in_line = 1000,
Schema|null $schema = null
) : Extractor {
if (\is_array($path)) {
$extractors = [];
Expand All @@ -35,6 +37,7 @@ function from_csv(
$enclosure,
$escape,
$characters_read_in_line,
$schema
);
}

Expand All @@ -49,6 +52,7 @@ function from_csv(
$enclosure,
$escape,
$characters_read_in_line,
$schema
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\print_schema;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\CSV\CSVExtractor;
use Flow\ETL\Config;
Expand Down Expand Up @@ -143,6 +144,61 @@ public function test_extracting_csv_files_with_header() : void
$this->assertSame(998, $rows->count());
}

/**
 * Reads the fixture CSV once with auto casting to obtain a schema, then reads
 * it again passing that schema to from_csv(), and verifies column order, row
 * count and the schema reported by the fetched rows.
 */
public function test_extracting_csv_files_with_schema() : void
{
    $path = __DIR__ . '/../Fixtures/annual-enterprise-survey-2019-financial-year-provisional-csv.csv';

    // First pass: let autoCast() decide the entry types and capture the resulting schema.
    $schema = df()
        ->read(from_csv($path))
        ->autoCast()
        ->schema();

    // Second pass: hand the captured schema to the extractor.
    $rows = df()
        ->read(from_csv($path, schema: $schema))
        ->fetch();

    $expectedColumns = [
        'Year',
        'Industry_aggregation_NZSIOC',
        'Industry_code_NZSIOC',
        'Industry_name_NZSIOC',
        'Units',
        'Variable_code',
        'Variable_name',
        'Variable_category',
        'Value',
        'Industry_code_ANZSIC06',
    ];

    foreach ($rows as $row) {
        $this->assertSame($expectedColumns, \array_keys($row->toArray()));
    }

    $this->assertSame(998, $rows->count());
    $this->assertEquals($schema, $rows->schema());

    // The nowdoc body is kept flush-left so no indentation is stripped from the expected text.
    $this->assertSame(
        <<<'SCHEMA'
schema
|-- Year: integer
|-- Industry_aggregation_NZSIOC: string
|-- Industry_code_NZSIOC: string
|-- Industry_name_NZSIOC: string
|-- Units: string
|-- Variable_code: string
|-- Variable_name: string
|-- Variable_category: string
|-- Value: string
|-- Industry_code_ANZSIC06: string

SCHEMA,
        print_schema($rows->schema())
    );
}

public function test_extracting_csv_files_without_header() : void
{
$extractor = from_csv(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Row\Schema;
use JsonMachine\Items;
use JsonMachine\JsonDecoder\ExtJsonDecoder;

Expand All @@ -26,6 +27,7 @@ final class JsonExtractor implements Extractor, FileExtractor, LimitableExtracto
public function __construct(
private readonly Path $path,
private readonly ?string $pointer = null,
private readonly Schema|null $schema = null,
) {
$this->resetLimit();
}
Expand All @@ -46,7 +48,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $filePath->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions());
$signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions(), $this->schema);
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;
use Flow\ETL\Row\Schema;

/**
* @param array<Path|string>|Path|string $path - string is internally turned into stream
Expand All @@ -19,6 +20,7 @@
function from_json(
string|Path|array $path,
?string $pointer = null,
Schema|null $schema = null,
) : Extractor {
if (\is_array($path)) {
$extractors = [];
Expand All @@ -27,6 +29,7 @@ function from_json(
$extractors[] = new JsonExtractor(
\is_string($file) ? Path::realpath($file) : $file,
$pointer,
$schema
);
}

Expand All @@ -36,6 +39,7 @@ function from_json(
return new JsonExtractor(
\is_string($path) ? Path::realpath($path) : $path,
$pointer,
$schema
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

use function Flow\ETL\Adapter\JSON\from_json;
use function Flow\ETL\Adapter\JSON\to_json;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\print_schema;
use Flow\ETL\Adapter\JSON\JSONMachine\JsonExtractor;
use Flow\ETL\Config;
use Flow\ETL\Extractor\Signal;
Expand Down Expand Up @@ -65,6 +67,48 @@ public function test_extracting_json_from_local_file_stream_using_pointer() : vo
$this->assertSame(247, $rows->count());
}

/**
 * Reads the timezones fixture with auto casting to obtain a schema, then reads
 * it again passing that schema to from_json(), and verifies entry names, row
 * count and the printed form of the schema.
 */
public function test_extracting_json_from_local_file_stream_with_schema() : void
{
    $path = __DIR__ . '/../../Fixtures/timezones.json';

    // First pass: capture the schema produced by autoCast().
    $schema = df()
        ->read(from_json($path))
        ->autoCast()
        ->schema();

    // Second pass: hand the captured schema to the extractor.
    $rows = df()
        ->read(from_json($path, schema: $schema))
        ->fetch();

    $expectedKeys = [
        'timezones',
        'latlng',
        'name',
        'country_code',
        'capital',
    ];

    foreach ($rows as $row) {
        $this->assertSame($expectedKeys, \array_keys($row->toArray()));
    }

    $this->assertSame(247, $rows->count());
    $this->assertEquals($schema, $rows->schema());

    // The nowdoc body is kept flush-left so no indentation is stripped from the expected text.
    $this->assertSame(
        <<<'SCHEMA'
schema
|-- timezones: list<string>
|-- latlng: list<float>
|-- name: string
|-- country_code: string
|-- capital: ?string

SCHEMA,
        print_schema($schema)
    );
}

public function test_extracting_json_from_local_file_string_uri() : void
{
$extractor = new JsonExtractor(Path::realpath(__DIR__ . '/../../Fixtures/timezones.json'));
Expand Down
48 changes: 42 additions & 6 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
use Flow\ETL\PHP\Type\Native\ResourceType;
use Flow\ETL\PHP\Type\Native\ScalarType;
use Flow\ETL\PHP\Type\Type;
use Flow\ETL\PHP\Type\TypeDetector;
use Flow\ETL\Pipeline;
use Flow\ETL\Row;
use Flow\ETL\Row\EntryFactory;
Expand Down Expand Up @@ -357,6 +358,11 @@ function struct_entry(string $name, array $value, StructureType $type) : Row\Ent
return new Row\Entry\StructureEntry($name, $value, $type);
}

/**
 * Creates a structure entry with the given name, value and structure type.
 *
 * Builds the same Row\Entry\StructureEntry that struct_entry() builds.
 */
function structure_entry(string $name, array $value, StructureType $type) : Row\Entry\StructureEntry
{
    $entry = new Row\Entry\StructureEntry($name, $value, $type);

    return $entry;
}

/**
* @param array<string, StructureElement> $elements
*/
Expand All @@ -365,11 +371,21 @@ function struct_type(array $elements, bool $nullable = false) : StructureType
return new StructureType($elements, $nullable);
}

/**
 * Creates a StructureType from the given elements and nullability flag.
 *
 * @param array<string, StructureElement> $elements same shape struct_type() documents for its identical constructor call
 */
function structure_type(array $elements, bool $nullable = false) : StructureType
{
    $type = new StructureType($elements, $nullable);

    return $type;
}

/**
 * Creates a StructureElement pairing an element name with its type.
 */
function struct_element(string $name, Type $type) : StructureElement
{
    $element = new StructureElement($name, $type);

    return $element;
}

/**
 * Creates a StructureElement pairing an element name with its type.
 *
 * Builds the same object that struct_element() builds.
 */
function structure_element(string $name, Type $type) : StructureElement
{
    $element = new StructureElement($name, $type);

    return $element;
}

function list_entry(string $name, array $value, ListType $type) : Row\Entry\ListEntry
{
return new Row\Entry\ListEntry($name, $value, $type);
Expand Down Expand Up @@ -420,6 +436,11 @@ function type_int(bool $nullable = false) : ScalarType
return ScalarType::integer($nullable);
}

/**
 * Returns the integer scalar type, optionally nullable.
 *
 * Makes the same ScalarType::integer() call that type_int() makes.
 */
function type_integer(bool $nullable = false) : ScalarType
{
    $integerType = ScalarType::integer($nullable);

    return $integerType;
}

function type_string(bool $nullable = false) : ScalarType
{
return ScalarType::string($nullable);
Expand Down Expand Up @@ -661,7 +682,7 @@ function hash(ScalarFunction $function, string $algorithm = 'xxh128', bool $bina
return new Hash($function, $algorithm, $binary, $options);
}

function cast(ScalarFunction $function, string $type) : Cast
function cast(ScalarFunction $function, string|Type $type) : Cast
{
return new Cast($function, $type);
}
Expand Down Expand Up @@ -862,7 +883,7 @@ function number_format(ScalarFunction $function, ?ScalarFunction $decimals = nul
* @param array<array<mixed>>|array<mixed|string> $data
* @param array<Partition>|\Flow\ETL\Partitions $partitions
*/
function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntryFactory(), array|\Flow\ETL\Partitions $partitions = []) : Rows
function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntryFactory(), array|\Flow\ETL\Partitions $partitions = [], ?Schema $schema = null) : Rows
{
$partitions = \is_array($partitions) ? new \Flow\ETL\Partitions(...$partitions) : $partitions;

Expand All @@ -882,12 +903,12 @@ function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntry
foreach ($data as $key => $value) {
$name = \is_int($key) ? 'e' . \str_pad((string) $key, 2, '0', STR_PAD_LEFT) : $key;

$entries[$name] = $entryFactory->create($name, $value);
$entries[$name] = $entryFactory->create($name, $value, $schema);
}

foreach ($partitions as $partition) {
if (!\array_key_exists($partition->name, $entries)) {
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value);
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value, $schema);
}
}

Expand All @@ -901,12 +922,12 @@ function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntry

foreach ($row as $column => $value) {
$name = \is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column;
$entries[$name] = $entryFactory->create(\is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column, $value);
$entries[$name] = $entryFactory->create(\is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column, $value, $schema);
}

foreach ($partitions as $partition) {
if (!\array_key_exists($partition->name, $entries)) {
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value);
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value, $schema);
}
}

Expand Down Expand Up @@ -1108,3 +1129,18 @@ function append() : SaveMode
{
return SaveMode::Append;
}

/**
 * Detects the Type of an arbitrary value using a fresh TypeDetector.
 */
function get_type(mixed $value) : Type
{
    $detector = new TypeDetector();

    return $detector->detectType($value);
}

/**
 * Formats a schema as a string.
 *
 * Uses the given formatter, or an ASCIISchemaFormatter when none is passed.
 */
function print_schema(Schema $schema, ?SchemaFormatter $formatter = null) : string
{
    // Only instantiate the default formatter when the caller did not supply one.
    $formatter ??= new ASCIISchemaFormatter();

    return $formatter->format($schema);
}

/**
 * Formats rows as a string, forwarding the truncate setting to the formatter.
 *
 * Uses the given formatter, or a Formatter\AsciiTableFormatter when none is passed.
 */
function print_rows(Rows $rows, int|bool $truncate = false, ?Formatter $formatter = null) : string
{
    // Only instantiate the default formatter when the caller did not supply one.
    $formatter ??= new Formatter\AsciiTableFormatter();

    return $formatter->format($rows, $truncate);
}
4 changes: 3 additions & 1 deletion src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
use Flow\ETL\Loader\SchemaValidationLoader;
use Flow\ETL\Loader\StreamLoader\Output;
use Flow\ETL\Partition\ScalarFunctionFilter;
use Flow\ETL\PHP\Type\AutoCaster;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Pipeline\BatchingPipeline;
use Flow\ETL\Pipeline\CachingPipeline;
use Flow\ETL\Pipeline\CollectingPipeline;
Expand Down Expand Up @@ -151,7 +153,7 @@ public function appendSafe(bool $appendSafe = true) : self

public function autoCast() : self
{
$this->pipeline->add(new AutoCastTransformer());
$this->pipeline->add(new AutoCastTransformer(new AutoCaster(Caster::default())));

return $this;
}
Expand Down
Loading