diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java index 72927a368..e6db0daa9 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java @@ -82,10 +82,8 @@ public CompletableFuture lookup(TableBucket tableBucket, byte[] keyBytes return lookup.future(); } - public CompletableFuture> prefixLookup( - long tableId, int bucketId, byte[] keyBytes) { - // TODO prefix lookup support partition table (#266) - PrefixLookup prefixLookup = new PrefixLookup(new TableBucket(tableId, bucketId), keyBytes); + public CompletableFuture> prefixLookup(TableBucket tableBucket, byte[] keyBytes) { + PrefixLookup prefixLookup = new PrefixLookup(tableBucket, keyBytes); lookupQueue.appendLookup(prefixLookup); return prefixLookup.future(); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 779fb63ee..6ec40a57a 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -71,7 +71,6 @@ import com.alibaba.fluss.rpc.messages.LimitScanResponse; import com.alibaba.fluss.rpc.metrics.ClientMetricGroup; import com.alibaba.fluss.rpc.protocol.ApiError; -import com.alibaba.fluss.types.DataField; import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.CloseableIterator; @@ -88,6 +87,8 @@ import java.util.function.Supplier; import static com.alibaba.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode; +import static com.alibaba.fluss.metadata.Schema.getKeyRowType; +import static com.alibaba.fluss.utils.Preconditions.checkArgument; /** * The base impl of {@link Table}. @@ -105,12 +106,15 @@ public class FlussTable implements Table { private final TableInfo tableInfo; private final boolean hasPrimaryKey; private final int numBuckets; - private final RowType keyRowType; + private final RowType primaryKeyRowType; + private final @Nullable RowType bucketKeyRowType; // decode the lookup bytes to result row private final ValueDecoder kvValueDecoder; - // a getter to extract partition from key row, null when it's not a partitioned primary key - // table - private final @Nullable PartitionGetter keyRowPartitionGetter; + // a getter to extract partition from primary key row, null when it's not a partitioned primary + // key table + private final @Nullable PartitionGetter primaryKeyRowPartitionGetter; + + private final List partitionKeyNames; private final KeyEncoder bucketKeyEncoder; private final KeyEncoder primaryKeyEncoder; @@ -125,6 +129,10 @@ public class FlussTable implements Table { private volatile SecurityTokenManager securityTokenManager; private @Nullable LakeTableBucketAssigner lakeTableBucketAssigner; + private volatile boolean prefixLookupInitialized; + // a getter to extract partition from bucket key row, null when it's not a partitioned primary + // key table. + private @Nullable PartitionGetter bucketKeyRowPartitionGetter; public FlussTable( Configuration conf, @@ -151,10 +159,10 @@ public FlussTable( RowType rowType = schema.toRowType(); this.hasPrimaryKey = tableDescriptor.hasPrimaryKey(); this.numBuckets = metadataUpdater.getBucketCount(tablePath); - this.keyRowType = getKeyRowType(schema, schema.getPrimaryKeyIndexes()); - this.keyRowPartitionGetter = + this.primaryKeyRowType = getKeyRowType(schema, schema.getPrimaryKeyIndexes()); + this.primaryKeyRowPartitionGetter = tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey() - ? new PartitionGetter(keyRowType, tableDescriptor.getPartitionKeys()) + ? new PartitionGetter(primaryKeyRowType, tableDescriptor.getPartitionKeys()) : null; this.closed = new AtomicBoolean(false); this.kvValueDecoder = @@ -165,13 +173,27 @@ public FlussTable( this.primaryKeyEncoder = KeyEncoder.createKeyEncoder( - keyRowType, keyRowType.getFieldNames(), tableDescriptor.getPartitionKeys()); + primaryKeyRowType, + primaryKeyRowType.getFieldNames(), + tableDescriptor.getPartitionKeys()); int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes(); if (bucketKeyIndexes.length != 0) { - this.bucketKeyEncoder = new KeyEncoder(getKeyRowType(schema, bucketKeyIndexes)); + this.bucketKeyRowType = getKeyRowType(schema, bucketKeyIndexes); + this.bucketKeyEncoder = + KeyEncoder.createKeyEncoder( + bucketKeyRowType, + bucketKeyRowType.getFieldNames(), + tableDescriptor.getPartitionKeys()); } else { + this.bucketKeyRowType = null; this.bucketKeyEncoder = primaryKeyEncoder; } + + this.partitionKeyNames = + tableDescriptor.isPartitioned() + ? tableDescriptor.getPartitionKeys() + : new ArrayList<>(); + prefixLookupInitialized = false; } @Override @@ -189,7 +211,7 @@ public CompletableFuture lookup(InternalRow key) { // a row byte[] pkBytes = primaryKeyEncoder.encode(key); byte[] bkBytes = bucketKeyEncoder.encode(key); - Long partitionId = keyRowPartitionGetter == null ? null : getPartitionId(key); + Long partitionId = primaryKeyRowPartitionGetter == null ? null : getPartitionId(key); int bucketId = getBucketId(bkBytes, key); TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); return lookupClientSupplier @@ -207,17 +229,20 @@ public CompletableFuture lookup(InternalRow key) { @Override public CompletableFuture prefixLookup(InternalRow bucketKey) { - if (!hasPrimaryKey) { - throw new FlussRuntimeException( - String.format("None-pk table %s don't support prefix lookup", tablePath)); - } - // TODO: add checks the bucket key is prefix of primary key + maybeInitPrefixLookup(); byte[] prefixKeyBytes = bucketKeyEncoder.encode(bucketKey); int bucketId = getBucketId(prefixKeyBytes, bucketKey); + + Long partitionId = null; + if (bucketKeyRowPartitionGetter != null) { + partitionId = getPartitionId(bucketKey); + } + + TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); return lookupClientSupplier .get() - .prefixLookup(tableId, bucketId, prefixKeyBytes) + .prefixLookup(tableBucket, prefixKeyBytes) .thenApply( result -> { List rowList = new ArrayList<>(); @@ -238,7 +263,7 @@ private int getBucketId(byte[] keyBytes, InternalRow key) { if (lakeTableBucketAssigner == null) { lakeTableBucketAssigner = new LakeTableBucketAssigner( - keyRowType, + primaryKeyRowType, tableInfo.getTableDescriptor().getBucketKey(), numBuckets); } @@ -367,22 +392,14 @@ private void addScanRecord( * throw {@link PartitionNotExistException}. */ private Long getPartitionId(InternalRow row) { - Preconditions.checkNotNull(keyRowPartitionGetter, "partitionGetter shouldn't be null."); - String partitionName = keyRowPartitionGetter.getPartition(row); + Preconditions.checkNotNull( + primaryKeyRowPartitionGetter, "partitionGetter shouldn't be null."); + String partitionName = primaryKeyRowPartitionGetter.getPartition(row); PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath); return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath); } - private RowType getKeyRowType(Schema schema, int[] keyIndexes) { - List keyRowFields = new ArrayList<>(keyIndexes.length); - List rowFields = schema.toRowType().getFields(); - for (int index : keyIndexes) { - keyRowFields.add(rowFields.get(index)); - } - return new RowType(keyRowFields); - } - @Override public AppendWriter getAppendWriter() { if (hasPrimaryKey) { @@ -492,4 +509,52 @@ private void mayPrepareRemoteFileDownloader() { } } } + + private void maybeInitPrefixLookup() { + if (!prefixLookupInitialized) { + synchronized (this) { + if (!prefixLookupInitialized) { + if (!hasPrimaryKey) { + throw new FlussRuntimeException( + String.format( + "None-pk table %s don't support prefix lookup", tablePath)); + } + + checkArgument(bucketKeyRowType != null, "bucketKeyRowType shouldn't be null."); + List primaryKeyNames = primaryKeyRowType.getFieldNames(); + List bucketKeyNames = bucketKeyRowType.getFieldNames(); + + for (int i = 0; i < bucketKeyNames.size(); i++) { + if (!bucketKeyNames.get(i).equals(primaryKeyNames.get(i))) { + throw new FlussRuntimeException( + String.format( + "To do prefix lookup, the bucket keys must be the prefix subset of " + + "primary keys, but the bucket keys are %s and the primary " + + "keys are %s for table %s", + bucketKeyNames, primaryKeyNames, tablePath)); + } + } + + partitionKeyNames.forEach( + partitionKey -> { + if (!bucketKeyNames.contains(partitionKey)) { + throw new FlussRuntimeException( + String.format( + "To do prefix lookup for partitioned primary key table, the " + + "partition keys must be in bucket keys, but the bucket " + + "keys are %s and the partition keys are %s for table %s", + bucketKeyNames, partitionKeyNames, tablePath)); + } + }); + + this.bucketKeyRowPartitionGetter = + partitionKeyNames.size() > 0 + ? new PartitionGetter(bucketKeyRowType, partitionKeyNames) + : null; + + prefixLookupInitialized = true; + } + } + } + } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java index fe84f76e0..1482318ef 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java @@ -33,6 +33,8 @@ import java.util.BitSet; import java.util.concurrent.CompletableFuture; +import static com.alibaba.fluss.metadata.Schema.getKeyRowType; + /** * The writer to write data to the primary key table. * @@ -69,7 +71,12 @@ public UpsertWriter( this.bucketKeyEncoder = null; } else { int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes(); - this.bucketKeyEncoder = new KeyEncoder(rowType, bucketKeyIndexes); + RowType bucketKeyRowType = getKeyRowType(schema, bucketKeyIndexes); + this.bucketKeyEncoder = + KeyEncoder.createKeyEncoder( + rowType, + bucketKeyRowType.getFieldNames(), + tableDescriptor.getPartitionKeys()); } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java index 3de534c53..8cb2472d2 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java @@ -25,14 +25,17 @@ import com.alibaba.fluss.client.scanner.log.LogScanner; import com.alibaba.fluss.client.scanner.log.ScanRecords; import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.writer.UpsertWriter; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.MemorySize; import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.indexed.IndexedRow; import com.alibaba.fluss.server.testutils.FlussClusterExtension; import com.alibaba.fluss.types.RowType; @@ -48,6 +51,8 @@ import java.util.List; import java.util.Map; +import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; +import static com.alibaba.fluss.testutils.DataTestUtils.keyRow; import static com.alibaba.fluss.testutils.InternalRowAssert.assertThatRow; import static org.assertj.core.api.Assertions.assertThat; @@ -237,4 +242,22 @@ protected static void verifyRows( } } } + + protected static void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) + throws Exception { + // put data. + InternalRow row = compactedRow(tableSchema.toRowType(), fields); + UpsertWriter upsertWriter = table.getUpsertWriter(); + // put data. + upsertWriter.upsert(row); + upsertWriter.flush(); + // lookup this key. + IndexedRow keyRow = keyRow(tableSchema, fields); + assertThat(lookupRow(table, keyRow)).isEqualTo(row); + } + + protected static InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception { + // lookup this key. + return table.lookup(keyRow).get().getRow(); + } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java index 09e16589d..096837d96 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.client.table; import com.alibaba.fluss.client.admin.ClientToServerITCaseBase; +import com.alibaba.fluss.client.lookup.PrefixLookupResult; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -25,6 +26,7 @@ import com.alibaba.fluss.client.table.writer.UpsertWriter; import com.alibaba.fluss.config.AutoPartitionTimeUnit; import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.exception.PartitionNotExistException; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; @@ -33,6 +35,7 @@ import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.types.RowType; import org.junit.jupiter.api.Test; @@ -41,9 +44,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static com.alibaba.fluss.testutils.DataTestUtils.keyRow; import static com.alibaba.fluss.testutils.DataTestUtils.row; @@ -92,6 +97,111 @@ void testPartitionedPrimaryKeyTable() throws Exception { verifyPartitionLogs(table, schema.toRowType(), expectPutRows); } + @Test + void testPartitionedTablePrefixLookup() throws Exception { + // This case partition key 'b' in both pk and bucket key (prefix key). + TablePath tablePath = TablePath.of("test_db_1", "test_partitioned_table_prefix_lookup"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(3, "a", "b") + .partitionedBy("b") + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, + AutoPartitionTimeUnit.YEAR) + .build(); + RowType rowType = schema.toRowType(); + createTable(tablePath, descriptor, false); + Map partitionIdByNames = + FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath); + + Table table = conn.getTable(tablePath); + for (String partition : partitionIdByNames.keySet()) { + verifyPutAndLookup(table, schema, new Object[] {1, partition, 1L, "value1"}); + verifyPutAndLookup(table, schema, new Object[] {1, partition, 2L, "value2"}); + } + + // test prefix lookup. + Schema prefixKeySchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + for (String partition : partitionIdByNames.keySet()) { + CompletableFuture result = + table.prefixLookup( + compactedRow(prefixKeySchema.toRowType(), new Object[] {1, partition})); + PrefixLookupResult prefixLookupResult = result.get(); + assertThat(prefixLookupResult).isNotNull(); + List rowList = prefixLookupResult.getRowList(); + assertThat(rowList.size()).isEqualTo(2); + assertRowValueEquals( + rowType, rowList.get(0), new Object[] {1, partition, 1L, "value1"}); + assertRowValueEquals( + rowType, rowList.get(1), new Object[] {1, partition, 2L, "value2"}); + } + } + + @Test + void testPrefixLookupWithPartitionKeysNotInBucketKeys() throws Exception { + // This case partition key 'c' only in pk but not in bucket key (prefix key). + TablePath tablePath = TablePath.of("test_db_1", "test_partitioned_table_prefix_lookup2"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.BIGINT()) + .column("c", DataTypes.STRING()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(3, "a", "b") + .partitionedBy("c") + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, + AutoPartitionTimeUnit.YEAR) + .build(); + createTable(tablePath, descriptor, false); + Map partitionIdByNames = + FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath); + + Table table = conn.getTable(tablePath); + for (String partition : partitionIdByNames.keySet()) { + verifyPutAndLookup(table, schema, new Object[] {1, 1L, partition, "value1"}); + } + + // test prefix lookup. + Schema prefixKeySchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.BIGINT()) + .build(); + assertThatThrownBy( + () -> + table.prefixLookup( + compactedRow( + prefixKeySchema.toRowType(), + new Object[] {1, 1L})) + .get()) + .isInstanceOf(FlussRuntimeException.class) + .hasMessageContaining( + "To do prefix lookup for partitioned primary key table, the partition keys" + + " must be in bucket keys, but the bucket keys are [a, b] and the " + + "partition keys are [c] for table test_db_1.test_partitioned_table_prefix_lookup2"); + } + @Test void testPartitionedLogTable() throws Exception { Schema schema = createPartitionedTable(DATA1_TABLE_PATH, false); diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index e4cd9ca9c..cd82b8c78 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -31,6 +31,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.MemorySize; +import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.metadata.KvFormat; import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.MergeEngine; @@ -277,6 +278,42 @@ void testPutAndPrefixLookup() throws Exception { assertThat(rowList.size()).isEqualTo(0); } + @Test + void testInvalidPrefixLookup() throws Exception { + // First, test the bucket keys not a prefix subset of primary keys. + TablePath tablePath = TablePath.of("test_db_1", "test_invalid_prefix_lookup_1"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.BIGINT()) + .column("c", DataTypes.STRING()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + TableDescriptor descriptor = + TableDescriptor.builder().schema(schema).distributedBy(3, "a", "c").build(); + createTable(tablePath, descriptor, false); + Table table = conn.getTable(tablePath); + + Schema prefixKeySchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("c", DataTypes.STRING()) + .build(); + assertThatThrownBy( + () -> + table.prefixLookup( + compactedRow( + prefixKeySchema.toRowType(), + new Object[] {1, "hello"})) + .get()) + .isInstanceOf(FlussRuntimeException.class) + .hasMessageContaining( + "To do prefix lookup, the bucket keys must be the prefix subset of primary keys, " + + "but the bucket keys are [a, c] and the primary keys are [a, b, c] for " + + "table test_db_1.test_invalid_prefix_lookup_1"); + } + @Test void testLookupForNotReadyTable() throws Exception { TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1"); @@ -397,23 +434,6 @@ void testLimitScanLogTable() throws Exception { } } - void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) throws Exception { - // put data. - InternalRow row = compactedRow(tableSchema.toRowType(), fields); - UpsertWriter upsertWriter = table.getUpsertWriter(); - // put data. - upsertWriter.upsert(row); - upsertWriter.flush(); - // lookup this key. - IndexedRow keyRow = keyRow(tableSchema, fields); - assertThat(lookupRow(table, keyRow)).isEqualTo(row); - } - - private InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception { - // lookup this key. - return table.lookup(keyRow).get().getRow(); - } - @Test void testPartialPutAndDelete() throws Exception { Schema schema = diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java index 4e60ae249..56da1505f 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java @@ -491,4 +491,13 @@ private static Set duplicate(List names) { .filter(name -> Collections.frequency(names, name) > 1) .collect(Collectors.toSet()); } + + public static RowType getKeyRowType(Schema schema, int[] keyIndexes) { + List keyRowFields = new ArrayList<>(keyIndexes.length); + List rowFields = schema.toRowType().getFields(); + for (int index : keyIndexes) { + keyRowFields.add(rowFields.get(index)); + } + return new RowType(keyRowFields); + } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index 161d89013..69d8684d3 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -113,8 +113,8 @@ private TableDescriptor( f)); } - if (tableDistribution != null) { - tableDistribution + if (this.tableDistribution != null) { + this.tableDistribution .getBucketKeys() .forEach( f -> @@ -409,13 +409,6 @@ private static TableDistribution normalizeDistribution( if (originDistribution != null) { // we may need to check and normalize bucket key List bucketKeys = originDistribution.getBucketKeys(); - // bucket key shouldn't include partition key - if (bucketKeys.stream().anyMatch(partitionKeys::contains)) { - throw new IllegalArgumentException( - String.format( - "Bucket key %s shouldn't include any column in partition keys %s.", - bucketKeys, partitionKeys)); - } // if primary key set if (schema.getPrimaryKey().isPresent()) { diff --git a/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java b/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java index cec8894a6..35ea95dcb 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java @@ -174,18 +174,6 @@ void testPrimaryKeyDifferentWithBucketKeys() { "Bucket keys must be a subset of primary keys excluding partition keys for primary-key tables. " + "The primary keys are [f0, f3], the partition keys are [], " + "but the user-defined bucket keys are [f0, f1]."); - - // bucket key shouldn't include partition key - assertThatThrownBy( - () -> - TableDescriptor.builder() - .schema(SCHEMA_1) - .partitionedBy("f0") - .distributedBy(3, "f0", "f3") - .build()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage( - "Bucket key [f0, f3] shouldn't include any column in partition keys [f0]."); } @Test @@ -319,18 +307,6 @@ void testPartitionedTable() { TableDescriptor.builder().schema(SCHEMA_1).partitionedBy("f0").build(); assertThat(tableDescriptor.getTableDistribution().get().getBucketKeys()) .isEqualTo(Collections.singletonList("f3")); - - // bucket key contains partitioned key, throw exception - assertThatThrownBy( - () -> - TableDescriptor.builder() - .schema(SCHEMA_1) - .partitionedBy("f0") - .distributedBy(1, "f0", "f3") - .build()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage( - "Bucket key [f0, f3] shouldn't include any column in partition keys [f0]."); } @Test