Skip to content

Commit

Permalink
support primary key table
Browse files Browse the repository at this point in the history
  • Loading branch information
hdygxsj committed Jan 26, 2025
1 parent e1ab8dc commit ef3a906
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,21 @@ private List<String> getPrimaryKeysFromIndexes(Index[] indexes) {
if (indexes == null || indexes.length == 0) {
return Collections.emptyList();
}

Preconditions.checkArgument(
indexes.length == 1, "Paimon only supports no more than one Index.");

Index primaryKeyIndex = indexes[0];
return Arrays.stream(primaryKeyIndex.fieldNames()).map(e -> e[0]).collect(Collectors.toList());
Arrays.stream(primaryKeyIndex.fieldNames())
.forEach(
filedName ->
Preconditions.checkArgument(
filedName != null && filedName.length == 1,
"The primary key columns should not be nested."));

return Arrays.stream(primaryKeyIndex.fieldNames())
.map(fieldName -> fieldName[0])
.collect(Collectors.toList());
}

private static Index[] constructIndexesFromPrimaryKeys(Table table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig

try {

Index[] indices = getIndices(resolvedTable);
Index[] indices = getGrivatinoIndeics(resolvedTable);
catalog()
.asTableCatalog()
.createTable(
Expand All @@ -308,14 +308,16 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private static Index[] getIndices(ResolvedCatalogBaseTable<?> resolvedTable) {
private static Index[] getGrivatinoIndeics(ResolvedCatalogBaseTable<?> resolvedTable) {
Optional<UniqueConstraint> primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey();
List<String> primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null);
if (primaryColumns == null) {
return new Index[0];
}
String[][] primaryField =
primaryColumns.stream().map(e -> new String[] {e}).toArray(String[][]::new);
primaryColumns.stream()
.map(primaryColumn -> new String[] {primaryColumn})
.toArray(String[][]::new);
Index primary = Indexes.primary("primary", primaryField);
return new Index[] {primary};
}
Expand Down Expand Up @@ -552,17 +554,30 @@ protected CatalogBaseTable toFlinkTable(Table table) {
.column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull())
.withComment(column.comment());
}
Index[] indices = table.index();
if (indices != null && indices.length == 1) {
builder.primaryKey(
Arrays.stream(indices[0].fieldNames()).map(arr -> arr[0]).collect(Collectors.toList()));
}
handleFlinkPrimaryKey(table, builder);
Map<String, String> flinkTableProperties =
propertiesConverter.toFlinkTableProperties(table.properties());
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
}

private static void handleFlinkPrimaryKey(
Table table, org.apache.flink.table.api.Schema.Builder builder) {
List<Index> primaryKeyList =
Arrays.stream(table.index())
.filter(index -> index.type() == Index.IndexType.PRIMARY_KEY)
.collect(Collectors.toList());
if (primaryKeyList.isEmpty()) {
return;
}
Preconditions.checkArgument(
primaryKeyList.size() == 1, "More than one primary key is not supported.");
builder.primaryKey(
Arrays.stream(primaryKeyList.get(0).fieldNames())
.map(fieldNames -> fieldNames[0])
.collect(Collectors.toList()));
}

private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
return Column.of(
column.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected boolean supportGetSchemaWithoutCommentAndOption() {

protected abstract boolean supportDropCascade();

protected boolean supportCreateTableWithPrimaryKey() {
protected boolean supportsPrimaryKey() {
return true;
}

Expand Down Expand Up @@ -286,7 +286,7 @@ public void testCreateSimpleTable() {
}

@Test
@EnabledIf("supportCreateTableWithPrimaryKey")
@EnabledIf("supportsPrimaryKey")
public void testCreateTableWithPrimaryKey() {
String databaseName = "test_create_table_with_primary_key_db";
String tableName = "test_create_primary_key_table";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
private static org.apache.gravitino.Catalog hiveCatalog;

@Override
protected boolean supportCreateTableWithPrimaryKey() {
protected boolean supportsPrimaryKey() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT {
private static org.apache.gravitino.Catalog icebergCatalog;

@Override
protected boolean supportCreateTableWithPrimaryKey() {
protected boolean supportsPrimaryKey() {
return false;
}

Expand Down

0 comments on commit ef3a906

Please sign in to comment.