Skip to content

Commit

Permalink
Updated parquet thrift definitions (#1251)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Oct 25, 2024
1 parent c3e8671 commit f091008
Show file tree
Hide file tree
Showing 73 changed files with 5,781 additions and 512 deletions.
6 changes: 6 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@
"build:docs": [
"bin/docs.php dsl:dump web/landing/resources/dsl.json"
],
"build:parquet:thrift": [
"grep -q 'namespace php Flow.Parquet.Thrift' src/lib/parquet/src/Flow/Parquet/Resources/Thrift/parquet.thrift || { echo \"Flow php namespace not found in thrift definition!\"; exit 1; }\n",
"rm src/lib/parquet/src/Flow/Parquet/Thrift/*.php",
"thrift --gen php --out src/lib/parquet/src src/lib/parquet/src/Flow/Parquet/Resources/Thrift/parquet.thrift",
"@cs:php:fix"
],
"pre-autoload-dump": [
"Google\\Task\\Composer::cleanup"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ public function withSchema(Schema $schema) : self
private function inferSchema(Rows $rows) : void
{
if ($this->inferredSchema === null) {
$this->inferredSchema = $rows->schema();
$this->inferredSchema = $rows->schema()->makeNullable();
} else {
$this->inferredSchema = $this->inferredSchema->merge($rows->schema());
$this->inferredSchema = $this->inferredSchema->merge($rows->schema())->makeNullable();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Flow\ETL\PHP\Type\Logical\{DateTimeType, JsonType, ListType, MapType, StructureType, UuidType, XMLElementType, XMLType};
use Flow\ETL\PHP\Type\Native\{ObjectType, ScalarType};
use Flow\ETL\PHP\Type\Type;
use Flow\ETL\PHP\Value\Uuid;
use Flow\ETL\Row\{Schema};
use Flow\Parquet\ParquetFile\Schema as ParquetSchema;
use Flow\Parquet\ParquetFile\Schema\{Column, FlatColumn, ListElement, NestedColumn};
Expand Down Expand Up @@ -65,42 +66,43 @@ private function flowListToParquetList(ListType $type) : ListElement
case ScalarType::class:
switch ($element->type()) {
case ScalarType::FLOAT:
return ListElement::float();
return ListElement::float(!$type->nullable());
case ScalarType::INTEGER:
return ListElement::int64();
return ListElement::int64(!$type->nullable());
case ScalarType::STRING:
return ListElement::string();
return ListElement::string(!$type->nullable());
case ScalarType::BOOLEAN:
return ListElement::boolean();
return ListElement::boolean(!$type->nullable());
}

break;
case DateTimeType::class:
return ListElement::datetime();
return ListElement::datetime(!$type->nullable());
case UuidType::class:
return ListElement::uuid();
return ListElement::uuid(!$type->nullable());
case JsonType::class:
return ListElement::json();
return ListElement::json(!$type->nullable());
case XMLType::class:
case XMLElementType::class:
return ListElement::string();
return ListElement::string(!$type->nullable());
case ObjectType::class:
$class = $element->class;

if ($class === \DateInterval::class) {
return ListElement::time();
return ListElement::time(!$type->nullable());
}

throw new \Flow\Parquet\Exception\RuntimeException($class . ' can\'t be converted to any parquet columns.');
case ListType::class:
return ListElement::list($this->flowListToParquetList($element));
return ListElement::list($this->flowListToParquetList($element), !$type->nullable());
case MapType::class:
return ListElement::map(
$this->flowMapKeyToParquetMapKey($element->key()),
$this->flowMapValueToParquetMapValue($element->value())
$this->flowMapValueToParquetMapValue($element->value()),
!$type->nullable()
);
case StructureType::class:
return ListElement::structure($this->flowStructureToParquetStructureElements($element));
return ListElement::structure($this->flowStructureToParquetStructureElements($element), !$type->nullable());
}

throw new RuntimeException($element::class . ' is not supported.');
Expand Down Expand Up @@ -141,50 +143,51 @@ private function flowMapValueToParquetMapValue(MapValue $mapValue) : ParquetSche
case ScalarType::class:
switch ($mapValueType->type()) {
case ScalarType::FLOAT:
return ParquetSchema\MapValue::float();
return ParquetSchema\MapValue::float(!$mapValueType->nullable());
case ScalarType::INTEGER:
return ParquetSchema\MapValue::int64();
return ParquetSchema\MapValue::int64(!$mapValueType->nullable());
case ScalarType::STRING:
return ParquetSchema\MapValue::string();
return ParquetSchema\MapValue::string(!$mapValueType->nullable());
case ScalarType::BOOLEAN:
return ParquetSchema\MapValue::boolean();
return ParquetSchema\MapValue::boolean(!$mapValueType->nullable());
}

break;
case UuidType::class:
return ParquetSchema\MapValue::uuid();
return ParquetSchema\MapValue::uuid(!$mapValueType->nullable());
case DateTimeType::class:
return ParquetSchema\MapValue::datetime();
return ParquetSchema\MapValue::datetime(!$mapValueType->nullable());
case JsonType::class:
return ParquetSchema\MapValue::json();
return ParquetSchema\MapValue::json(!$mapValueType->nullable());
case XMLType::class:
case XMLElementType::class:
return ParquetSchema\MapValue::string();
return ParquetSchema\MapValue::string(!$mapValueType->nullable());
case ObjectType::class:
$class = $mapValueType->class;

if (\is_a($class, \DateTimeInterface::class, true)) {
return ParquetSchema\MapValue::datetime();
return ParquetSchema\MapValue::datetime(!$mapValueType->nullable());
}

if ($class === \Flow\ETL\PHP\Value\Uuid::class) {
return ParquetSchema\MapValue::string();
if ($class === Uuid::class) {
return ParquetSchema\MapValue::string(!$mapValueType->nullable());
}

if ($class === \DateInterval::class) {
return ParquetSchema\MapValue::time();
return ParquetSchema\MapValue::time(!$mapValueType->nullable());
}

throw new \Flow\Parquet\Exception\RuntimeException($class . ' can\'t be converted to any parquet columns.');
case ListType::class:
return ParquetSchema\MapValue::list($this->flowListToParquetList($mapValueType));
return ParquetSchema\MapValue::list($this->flowListToParquetList($mapValueType), !$mapValueType->nullable());
case MapType::class:
return ParquetSchema\MapValue::map(
$this->flowMapKeyToParquetMapKey($mapValueType->key()),
$this->flowMapValueToParquetMapValue($mapValueType->value())
$this->flowMapValueToParquetMapValue($mapValueType->value()),
!$mapValueType->nullable()
);
case StructureType::class:
return ParquetSchema\MapValue::structure(...$this->flowStructureToParquetStructureElements($mapValueType));
return ParquetSchema\MapValue::structure($this->flowStructureToParquetStructureElements($mapValueType), !$mapValueType->nullable());
}

throw new RuntimeException($mapValueType::class . ' is not supported.');
Expand All @@ -195,7 +198,7 @@ private function flowObjectToParquetFlat(ObjectType $type, string $name) : FlatC
$class = $type->class;

if ($class === \DateInterval::class) {
return FlatColumn::time($name);
return FlatColumn::time($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
}

throw new RuntimeException($type->toString() . ' can\'t be converted to any parquet columns.');
Expand All @@ -205,13 +208,13 @@ private function flowScalarToParquetFlat(ScalarType $type, string $name) : FlatC
{
switch ($type->type()) {
case ScalarType::FLOAT:
return FlatColumn::float($name);
return FlatColumn::float($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
case ScalarType::INTEGER:
return FlatColumn::int64($name);
return FlatColumn::int64($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
case ScalarType::STRING:
return FlatColumn::string($name);
return FlatColumn::string($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
case ScalarType::BOOLEAN:
return FlatColumn::boolean($name);
return FlatColumn::boolean($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);

default:
throw new RuntimeException($type->type() . ' is not supported.');
Expand All @@ -235,26 +238,27 @@ private function flowTypeToParquetType(string $name, Type $type) : Column
case ScalarType::class:
return $this->flowScalarToParquetFlat($type, $name);
case DateTimeType::class:
return FlatColumn::datetime($name);
return FlatColumn::datetime($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
case UuidType::class:
return FlatColumn::uuid($name);
return FlatColumn::uuid($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
case JsonType::class:
return FlatColumn::json($name);
return FlatColumn::json($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
case XMLType::class:
case XMLElementType::class:
return FlatColumn::string($name);
return FlatColumn::string($name, $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
case ObjectType::class:
return $this->flowObjectToParquetFlat($type, $name);
case ListType::class:
return NestedColumn::list($name, $this->flowListToParquetList($type));
return NestedColumn::list($name, $this->flowListToParquetList($type), $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
case MapType::class:
return NestedColumn::map(
$name,
$this->flowMapKeyToParquetMapKey($type->key()),
$this->flowMapValueToParquetMapValue($type->value())
$this->flowMapValueToParquetMapValue($type->value()),
$type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED
);
case StructureType::class:
return NestedColumn::struct($name, $this->flowStructureToParquetStructureElements($type));
return NestedColumn::struct($name, $this->flowStructureToParquetStructureElements($type), $type->nullable() ? ParquetSchema\Repetition::OPTIONAL : ParquetSchema\Repetition::REQUIRED);
}

throw new RuntimeException($type::class . ' is not supported.');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,23 @@ public function test_convert_etl_entries_to_parquet_fields() : void
{
self::assertEquals(
ParquetSchema::with(
FlatColumn::int64('integer'),
FlatColumn::boolean('boolean'),
FlatColumn::string('string'),
FlatColumn::float('float'),
FlatColumn::dateTime('datetime'),
FlatColumn::json('json'),
NestedColumn::list('list', ParquetSchema\ListElement::string()),
FlatColumn::int64('integer', ParquetSchema\Repetition::REQUIRED),
FlatColumn::boolean('boolean', ParquetSchema\Repetition::REQUIRED),
FlatColumn::string('string', ParquetSchema\Repetition::REQUIRED),
FlatColumn::float('float', ParquetSchema\Repetition::REQUIRED),
FlatColumn::dateTime('datetime', ParquetSchema\Repetition::REQUIRED),
FlatColumn::json('json', ParquetSchema\Repetition::REQUIRED),
NestedColumn::list('list', ParquetSchema\ListElement::string(true), ParquetSchema\Repetition::REQUIRED),
NestedColumn::list('list_of_structs', ParquetSchema\ListElement::structure(
[
FlatColumn::int64('integer'),
FlatColumn::boolean('boolean'),
]
)),
NestedColumn::struct('structure', [FlatColumn::string('a')]),
NestedColumn::map('map', ParquetSchema\MapKey::string(), ParquetSchema\MapValue::int64()),
FlatColumn::time('time')
FlatColumn::int64('integer', ParquetSchema\Repetition::REQUIRED),
FlatColumn::boolean('boolean', ParquetSchema\Repetition::REQUIRED),
],
true
), ParquetSchema\Repetition::REQUIRED),
NestedColumn::struct('structure', [FlatColumn::string('a', ParquetSchema\Repetition::REQUIRED)], ParquetSchema\Repetition::REQUIRED),
NestedColumn::map('map', ParquetSchema\MapKey::string(), ParquetSchema\MapValue::int64(true), ParquetSchema\Repetition::REQUIRED),
FlatColumn::time('time', ParquetSchema\Repetition::REQUIRED)
),
(new SchemaConverter())->toParquet(new Schema(
Schema\Definition::integer('integer'),
Expand All @@ -65,7 +66,7 @@ public function test_convert_etl_entries_to_parquet_fields() : void
))),
Schema\Definition::structure('structure', new StructureType([new StructureElement('a', type_string())])),
Schema\Definition::map('map', new MapType(MapKey::string(), MapValue::integer())),
Schema\Definition::object('time', type_object(\DateInterval::class, false))
Schema\Definition::object('time', type_object(\DateInterval::class))
))
);
}
Expand All @@ -76,7 +77,7 @@ public function test_convert_object_entry_to_parquet_array() : void
$this->expectExceptionMessage("object<stdClass> can't be converted to any parquet columns.");

(new SchemaConverter())->toParquet(new Schema(
Schema\Definition::object('object', type_object(\stdClass::class, false))
Schema\Definition::object('object', type_object(\stdClass::class))
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public function test_merging_nested_structures() : void
struct_element('id', type_string()),
struct_element('name', type_float()),
],
true
nullable: true
)),
]),
struct_type([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ protected function execute(InputInterface $input, OutputInterface $output) : int
if ($displayColumns) {
$columnsTable = $style->createTable();
$columnsTable->setStyle('box');
$columnsTable->setHeaderTitle('Flat Columns');
$columnsTable->setHeaderTitle('Columns');
$columnsTable->setHeaders(['path', 'type', 'logical type', 'repetition', 'max repetition', 'max definition']);

foreach ($parquetFile->schema()->columnsFlat() as $column) {
$columnsTable->addRow([
$column->flatPath(),
$column->type() ? $column->type()->name : 'group',
($column->type() ? $column->type()->name : 'group') . ($column->typeLength() ? '(' . $column->typeLength() . ')' : ''),
$column->logicalType() ? $column->logicalType()->name() : '-',
$column->repetition()?->name ?? 'N/A',
$column->maxRepetitionsLevel(),
Expand Down
6 changes: 5 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/ParquetFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ public function metadata() : Metadata
$metadata = $this->stream->read($metadataLength, -($metadataLength + 8));

$thriftMetadata = new FileMetaData();
$thriftMetadata->read(new TCompactProtocol(new TMemoryBuffer($metadata)));
$thriftMetadata->read(
new TCompactProtocol(
new TMemoryBuffer($metadata)
)
);

$this->metadata = Metadata::fromThrift($thriftMetadata, $this->options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public function unpack(FlatColumn $column, int $total) : array
PhysicalType::FIXED_LEN_BYTE_ARRAY => match ($column->logicalType()?->name()) {
/** @phpstan-ignore-next-line */
LogicalType::DECIMAL => $this->reader->readDecimals($total, $column->typeLength(), $column->logicalType()?->decimalData()?->precision(), $column->logicalType()?->decimalData()?->scale()),
LogicalType::UUID => $this->reader->readStrings($total),
default => throw new RuntimeException('Unsupported logical type ' . ($column->logicalType()?->name() ?: 'null') . ' for FIXED_LEN_BYTE_ARRAY'),
},
PhysicalType::BOOLEAN => $this->reader->readBooleans($total),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public function packValues(FlatColumn $column, array $values) : void
break;
case PhysicalType::FIXED_LEN_BYTE_ARRAY:
switch ($column->logicalType()?->name()) {
case LogicalType::UUID:
$this->writer->writeStrings($parquetValues);

break;
case LogicalType::DECIMAL:
/**
* @phpstan-ignore-next-line
Expand All @@ -87,7 +91,6 @@ public function packValues(FlatColumn $column, array $values) : void
break;
case PhysicalType::BYTE_ARRAY:
switch ($column->logicalType()?->name()) {
case LogicalType::UUID:
case LogicalType::JSON:
case LogicalType::STRING:
$this->writer->writeStrings($parquetValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ public static function time(string $name, Repetition $repetition = Repetition::O
return new self($name, PhysicalType::INT64, ConvertedType::TIME_MICROS, LogicalType::time(), $repetition);
}

public static function uuid(string $string, Repetition $repetition = Repetition::OPTIONAL) : self
public static function uuid(string $uuid, Repetition $repetition = Repetition::OPTIONAL) : self
{
return new self($string, PhysicalType::BYTE_ARRAY, null, LogicalType::uuid(), $repetition);
return new self($uuid, PhysicalType::FIXED_LEN_BYTE_ARRAY, null, LogicalType::uuid(), $repetition, typeLength: 16);
}

public function __debugInfo() : ?array
Expand Down
Loading

0 comments on commit f091008

Please sign in to comment.