Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema definition type #927

Merged
merged 6 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
},
"autoload": {
"files": [
"src/functions.php",
"src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/functions.php",
"src/adapter/etl-adapter-chartjs/src/Flow/ETL/Adapter/ChartJS/functions.php",
"src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php",
Expand Down
2 changes: 1 addition & 1 deletion examples/topics/db/db_clean.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env php
<?php
<?php declare(strict_types=1);

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
print "This example cannot be run in PHAR, please use CLI approach.\n";
Expand Down
2 changes: 1 addition & 1 deletion examples/topics/db/db_source.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env php
<?php
<?php declare(strict_types=1);

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
print "This example cannot be run in PHAR, please use CLI approach.\n";
Expand Down
3 changes: 0 additions & 3 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@
<directory>src/adapter/**/**/**/**/**/**/Tests/Integration</directory>
<directory>src/core/etl/tests/Flow/ETL/Tests/Integration</directory>
<directory>src/lib/**/**/**/**/Tests/Integration</directory>
</testsuite>
<testsuite name="integration-services">
<directory>src/adapter/**/**/**/**/**/**/Tests/Integration</directory>
<directory>src/lib/doctrine-dbal-bulk/tests/Flow/Doctrine/Bulk/Tests/Integration</directory>
</testsuite>
</testsuites>
Expand Down
2 changes: 2 additions & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@
</projectFiles>
<issueHandlers>
<RiskyTruthyFalsyComparison errorLevel="suppress" />
<LessSpecificReturnStatement errorLevel="suppress" />
<MoreSpecificReturnType errorLevel="suppress" />
</issueHandlers>
</psalm>
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public function load(Rows $rows, FlowContext $context) : void
private function listEntryToValues(Row\Entry\ListEntry $entry) : array
{
/** @var ListType $listType */
$listType = $entry->definition()->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);
$listType = $entry->definition()->type();
$listElement = $listType->element();

if ($listElement->type() instanceof UuidType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,20 @@

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
use Flow\ETL\PHP\Type\Logical\JsonType;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Logical\MapType;
use Flow\ETL\PHP\Type\Logical\Structure\StructureElement;
use Flow\ETL\PHP\Type\Logical\StructureType;
use Flow\ETL\PHP\Type\Logical\UuidType;
use Flow\ETL\PHP\Type\Logical\XMLNodeType;
use Flow\ETL\PHP\Type\Logical\XMLType;
use Flow\ETL\PHP\Type\Native\ArrayType;
use Flow\ETL\PHP\Type\Native\EnumType;
use Flow\ETL\PHP\Type\Native\ObjectType;
use Flow\ETL\PHP\Type\Native\ScalarType;
use Flow\ETL\Row\Entry;
use Flow\ETL\Row\Entry\ListEntry;
use Flow\ETL\Row\Entry\NullEntry;
use Flow\ETL\Row\Entry\StructureEntry;
use Flow\ETL\Row\Schema;
use Flow\ETL\Row\Schema\Definition;
use Flow\ETL\Row\Schema\FlowMetadata;

final class SchemaConverter
{
Expand All @@ -46,12 +45,10 @@ public function toAvroJsonSchema(Schema $schema) : string
*/
private function convert(Definition $definition) : array
{
$type = $this->typeFromDefinition($definition);
$type = $definition->type();

if ($type === ListEntry::class) {
/** @var ListType $listType */
$listType = $definition->metadata()->get(FlowMetadata::METADATA_LIST_ENTRY_TYPE);
$listElement = $listType->element();
if ($type instanceof ListType) {
$listElement = $type->element();

if ($listElement->type() instanceof ScalarType) {
return match ($listElement->type()->toString()) {
Expand All @@ -70,23 +67,17 @@ private function convert(Definition $definition) : array
throw new RuntimeException("List of {$listElement->toString()} is not supported yet supported.");
}

if ($type === Entry\MapEntry::class) {
/** @var MapType $mapType */
$mapType = $definition->metadata()->get(FlowMetadata::METADATA_MAP_ENTRY_TYPE);

return match ($mapType->value()->type()->toString()) {
if ($type instanceof MapType) {
return match ($type->value()->type()->toString()) {
ScalarType::STRING => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::STRING_TYPE]],
ScalarType::INTEGER => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::INT_TYPE]],
ScalarType::FLOAT => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::FLOAT_TYPE]],
ScalarType::BOOLEAN => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::BOOLEAN_TYPE]],
default => throw new RuntimeException('Map ' . $mapType->toString() . ' is not supported yet supported.'),
default => throw new RuntimeException('Map ' . $type->toString() . ' is not supported yet supported.'),
};
}

if ($type === StructureEntry::class) {
/** @var StructureType $structureType */
$structureType = $definition->metadata()->get(FlowMetadata::METADATA_STRUCTURE_ENTRY_TYPE);

if ($type instanceof StructureType) {
$structConverter = function (array $definitions) use (&$structConverter) : array {
$structureFields = [];

Expand All @@ -113,30 +104,31 @@ private function convert(Definition $definition) : array

return [
'name' => $definition->entry()->name(),
'type' => ['name' => \ucfirst($definition->entry()->name()), 'type' => \AvroSchema::RECORD_SCHEMA, 'fields' => $structConverter($structureType->elements())],
'type' => ['name' => \ucfirst($definition->entry()->name()), 'type' => \AvroSchema::RECORD_SCHEMA, 'fields' => $structConverter($type->elements())],
];
}

$avroType = match ($type) {
Entry\StringEntry::class, Entry\JsonEntry::class, Entry\UuidEntry::class, Entry\XMLEntry::class, Entry\XMLNodeEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
Entry\EnumEntry::class => [
$avroType = match ($type::class) {
JsonType::class, UuidType::class, XMLType::class, XMLNodeType::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
EnumType::class => [
'name' => $definition->entry()->name(),
'type' => [
'name' => $definition->entry()->name(),
'type' => \AvroSchema::ENUM_SCHEMA,
'symbols' => \array_map(
fn (\UnitEnum $e) => $e->name,
$definition->metadata()->get(FlowMetadata::METADATA_ENUM_CASES)
$definition->type()->class::cases()
),
],
],
Entry\IntegerEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::INT_TYPE],
Entry\FloatEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::FLOAT_TYPE],
Entry\BooleanEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::BOOLEAN_TYPE],
Entry\ArrayEntry::class => throw new RuntimeException("ArrayEntry entry can't be saved in Avro file, try convert it to ListEntry"),
Entry\DateTimeEntry::class => ['name' => $definition->entry()->name(), 'type' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros'],
NullEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::NULL_TYPE],
default => throw new RuntimeException($type . ' is not yet supported.')
DateTimeType::class => ['name' => $definition->entry()->name(), 'type' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros'],
ScalarType::class => match ($type->type()) {
ScalarType::STRING => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
ScalarType::INTEGER => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::INT_TYPE],
ScalarType::FLOAT => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::FLOAT_TYPE],
ScalarType::BOOLEAN => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::BOOLEAN_TYPE],
},
default => throw new RuntimeException($type::class . ' is not yet supported.')
};

if ($definition->isNullable()) {
Expand Down Expand Up @@ -198,18 +190,4 @@ private function structureElementToArvo(StructureElement $element) : array

throw new RuntimeException($element->toString() . ' is not yet supported.');
}

private function typeFromDefinition(Definition $definition) : string
{
if ($definition->isNullable() && \count($definition->types()) === 2) {
/** @var class-string<Entry> $type */
$type = \current(\array_diff($definition->types(), [NullEntry::class]));
} elseif (\count($definition->types()) === 1) {
$type = \current($definition->types());
} else {
throw new RuntimeException('Union types are not supported by Avro file format. Invalid type: ' . $definition->entry()->name());
}

return $type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function __construct()

public function bench_extract_10k() : void
{
foreach (from_csv(__DIR__ . '/../Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
foreach (from_csv(__DIR__ . '/Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function __construct()
$this->outputPath = \tempnam(\sys_get_temp_dir(), 'etl_csv_loader_bench') . '.csv';
$this->rows = new Rows();

foreach (from_csv(__DIR__ . '/../Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
foreach (from_csv(__DIR__ . '/Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
$this->rows = $this->rows->merge($rows);
}
}
Expand Down
Loading