Skip to content

Commit

Permalink
Schema definition type (#927)
Browse files Browse the repository at this point in the history
* Removed union types from schema definitions

* Move merging schema definitions logic to types

* Make impossible to merge two schema definitions that are pointing to a different entry

* Removed development leftovers

* Improved failed Definition::merge exception

* Removed schema narrowing
  • Loading branch information
norberttech authored Jan 21, 2024
1 parent f82b2ce commit e81e228
Show file tree
Hide file tree
Showing 49 changed files with 10,915 additions and 63,557 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
},
"autoload": {
"files": [
"src/functions.php",
"src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/functions.php",
"src/adapter/etl-adapter-chartjs/src/Flow/ETL/Adapter/ChartJS/functions.php",
"src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php",
Expand Down
2 changes: 1 addition & 1 deletion examples/topics/db/db_clean.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env php
<?php
<?php declare(strict_types=1);

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
print "This example cannot be run in PHAR, please use CLI approach.\n";
Expand Down
2 changes: 1 addition & 1 deletion examples/topics/db/db_source.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env php
<?php
<?php declare(strict_types=1);

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
print "This example cannot be run in PHAR, please use CLI approach.\n";
Expand Down
3 changes: 0 additions & 3 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@
<directory>src/adapter/**/**/**/**/**/**/Tests/Integration</directory>
<directory>src/core/etl/tests/Flow/ETL/Tests/Integration</directory>
<directory>src/lib/**/**/**/**/Tests/Integration</directory>
</testsuite>
<testsuite name="integration-services">
<directory>src/adapter/**/**/**/**/**/**/Tests/Integration</directory>
<directory>src/lib/doctrine-dbal-bulk/tests/Flow/Doctrine/Bulk/Tests/Integration</directory>
</testsuite>
</testsuites>
Expand Down
2 changes: 2 additions & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@
</projectFiles>
<issueHandlers>
<RiskyTruthyFalsyComparison errorLevel="suppress" />
<LessSpecificReturnStatement errorLevel="suppress" />
<MoreSpecificReturnType errorLevel="suppress" />
</issueHandlers>
</psalm>
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public function load(Rows $rows, FlowContext $context) : void
private function listEntryToValues(Row\Entry\ListEntry $entry) : array
{
/** @var ListType $listType */
$listType = $entry->definition()->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);
$listType = $entry->definition()->type();
$listElement = $listType->element();

if ($listElement->type() instanceof UuidType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,20 @@

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
use Flow\ETL\PHP\Type\Logical\JsonType;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Logical\MapType;
use Flow\ETL\PHP\Type\Logical\Structure\StructureElement;
use Flow\ETL\PHP\Type\Logical\StructureType;
use Flow\ETL\PHP\Type\Logical\UuidType;
use Flow\ETL\PHP\Type\Logical\XMLNodeType;
use Flow\ETL\PHP\Type\Logical\XMLType;
use Flow\ETL\PHP\Type\Native\ArrayType;
use Flow\ETL\PHP\Type\Native\EnumType;
use Flow\ETL\PHP\Type\Native\ObjectType;
use Flow\ETL\PHP\Type\Native\ScalarType;
use Flow\ETL\Row\Entry;
use Flow\ETL\Row\Entry\ListEntry;
use Flow\ETL\Row\Entry\NullEntry;
use Flow\ETL\Row\Entry\StructureEntry;
use Flow\ETL\Row\Schema;
use Flow\ETL\Row\Schema\Definition;
use Flow\ETL\Row\Schema\FlowMetadata;

final class SchemaConverter
{
Expand All @@ -46,12 +45,10 @@ public function toAvroJsonSchema(Schema $schema) : string
*/
private function convert(Definition $definition) : array
{
$type = $this->typeFromDefinition($definition);
$type = $definition->type();

if ($type === ListEntry::class) {
/** @var ListType $listType */
$listType = $definition->metadata()->get(FlowMetadata::METADATA_LIST_ENTRY_TYPE);
$listElement = $listType->element();
if ($type instanceof ListType) {
$listElement = $type->element();

if ($listElement->type() instanceof ScalarType) {
return match ($listElement->type()->toString()) {
Expand All @@ -70,23 +67,17 @@ private function convert(Definition $definition) : array
throw new RuntimeException("List of {$listElement->toString()} is not supported yet supported.");
}

if ($type === Entry\MapEntry::class) {
/** @var MapType $mapType */
$mapType = $definition->metadata()->get(FlowMetadata::METADATA_MAP_ENTRY_TYPE);

return match ($mapType->value()->type()->toString()) {
if ($type instanceof MapType) {
return match ($type->value()->type()->toString()) {
ScalarType::STRING => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::STRING_TYPE]],
ScalarType::INTEGER => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::INT_TYPE]],
ScalarType::FLOAT => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::FLOAT_TYPE]],
ScalarType::BOOLEAN => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::BOOLEAN_TYPE]],
default => throw new RuntimeException('Map ' . $mapType->toString() . ' is not supported yet supported.'),
default => throw new RuntimeException('Map ' . $type->toString() . ' is not supported yet supported.'),
};
}

if ($type === StructureEntry::class) {
/** @var StructureType $structureType */
$structureType = $definition->metadata()->get(FlowMetadata::METADATA_STRUCTURE_ENTRY_TYPE);

if ($type instanceof StructureType) {
$structConverter = function (array $definitions) use (&$structConverter) : array {
$structureFields = [];

Expand All @@ -113,30 +104,31 @@ private function convert(Definition $definition) : array

return [
'name' => $definition->entry()->name(),
'type' => ['name' => \ucfirst($definition->entry()->name()), 'type' => \AvroSchema::RECORD_SCHEMA, 'fields' => $structConverter($structureType->elements())],
'type' => ['name' => \ucfirst($definition->entry()->name()), 'type' => \AvroSchema::RECORD_SCHEMA, 'fields' => $structConverter($type->elements())],
];
}

$avroType = match ($type) {
Entry\StringEntry::class, Entry\JsonEntry::class, Entry\UuidEntry::class, Entry\XMLEntry::class, Entry\XMLNodeEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
Entry\EnumEntry::class => [
$avroType = match ($type::class) {
JsonType::class, UuidType::class, XMLType::class, XMLNodeType::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
EnumType::class => [
'name' => $definition->entry()->name(),
'type' => [
'name' => $definition->entry()->name(),
'type' => \AvroSchema::ENUM_SCHEMA,
'symbols' => \array_map(
fn (\UnitEnum $e) => $e->name,
$definition->metadata()->get(FlowMetadata::METADATA_ENUM_CASES)
$definition->type()->class::cases()
),
],
],
Entry\IntegerEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::INT_TYPE],
Entry\FloatEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::FLOAT_TYPE],
Entry\BooleanEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::BOOLEAN_TYPE],
Entry\ArrayEntry::class => throw new RuntimeException("ArrayEntry entry can't be saved in Avro file, try convert it to ListEntry"),
Entry\DateTimeEntry::class => ['name' => $definition->entry()->name(), 'type' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros'],
NullEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::NULL_TYPE],
default => throw new RuntimeException($type . ' is not yet supported.')
DateTimeType::class => ['name' => $definition->entry()->name(), 'type' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros'],
ScalarType::class => match ($type->type()) {
ScalarType::STRING => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
ScalarType::INTEGER => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::INT_TYPE],
ScalarType::FLOAT => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::FLOAT_TYPE],
ScalarType::BOOLEAN => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::BOOLEAN_TYPE],
},
default => throw new RuntimeException($type::class . ' is not yet supported.')
};

if ($definition->isNullable()) {
Expand Down Expand Up @@ -198,18 +190,4 @@ private function structureElementToArvo(StructureElement $element) : array

throw new RuntimeException($element->toString() . ' is not yet supported.');
}

private function typeFromDefinition(Definition $definition) : string
{
if ($definition->isNullable() && \count($definition->types()) === 2) {
/** @var class-string<Entry> $type */
$type = \current(\array_diff($definition->types(), [NullEntry::class]));
} elseif (\count($definition->types()) === 1) {
$type = \current($definition->types());
} else {
throw new RuntimeException('Union types are not supported by Avro file format. Invalid type: ' . $definition->entry()->name());
}

return $type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function __construct()

public function bench_extract_10k() : void
{
foreach (from_csv(__DIR__ . '/../Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
foreach (from_csv(__DIR__ . '/Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function __construct()
$this->outputPath = \tempnam(\sys_get_temp_dir(), 'etl_csv_loader_bench') . '.csv';
$this->rows = new Rows();

foreach (from_csv(__DIR__ . '/../Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
foreach (from_csv(__DIR__ . '/Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
$this->rows = $this->rows->merge($rows);
}
}
Expand Down
Loading

0 comments on commit e81e228

Please sign in to comment.