From 86e1285bf7edad2b157dc352cd50407b8e757dbd Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Mon, 20 Jan 2025 15:03:43 +0800 Subject: [PATCH] address jark's comments --- .../client/lookup/AbstractLookupBatch.java | 8 +- ...ctLookup.java => AbstractLookupQuery.java} | 4 +- .../fluss/client/lookup/FlussLookuper.java | 123 +++++++ .../client/lookup/FlussPrefixLookuper.java | 123 +++++++ .../fluss/client/lookup/LookupBatch.java | 10 +- .../fluss/client/lookup/LookupClient.java | 4 +- .../lookup/{Lookup.java => LookupQuery.java} | 4 +- .../fluss/client/lookup/LookupQueue.java | 22 +- .../fluss/client/lookup/LookupResult.java | 3 +- .../fluss/client/lookup/LookupSender.java | 30 +- .../alibaba/fluss/client/lookup/Lookuper.java | 39 +++ .../fluss/client/lookup/PrefixLookup.java | 41 +-- .../client/lookup/PrefixLookupBatch.java | 10 +- .../client/lookup/PrefixLookupQuery.java | 47 +++ .../client/lookup/PrefixLookupResult.java | 5 +- .../fluss/client/lookup/PrefixLookuper.java | 56 ++++ .../client/scanner/log/FlussLogScanner.java | 2 +- .../fluss/client/table/FlussTable.java | 307 ++++++++---------- .../com/alibaba/fluss/client/table/Table.java | 50 +-- .../client/table/writer/UpsertWriter.java | 9 +- .../fluss/client/utils/ClientUtils.java | 43 +++ .../admin/ClientToServerITCaseBase.java | 8 +- .../fluss/client/lookup/LookupQueueTest.java | 2 +- .../table/FlussPartitionedTableITCase.java | 50 +-- .../fluss/client/table/FlussTableITCase.java | 93 ++++-- .../com/alibaba/fluss/metadata/Schema.java | 9 + .../fluss/metadata/TableDescriptor.java | 19 +- .../fluss/metadata/TableDescriptorTest.java | 24 ++ .../lookup/FlinkAsyncLookupFunction.java | 30 +- .../source/lookup/FlinkLookupFunction.java | 28 +- .../flink/source/FlinkTableSourceITCase.java | 21 ++ 31 files changed, 886 insertions(+), 338 deletions(-) rename fluss-client/src/main/java/com/alibaba/fluss/client/lookup/{AbstractLookup.java => AbstractLookupQuery.java} (91%) create mode 100644 
fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussLookuper.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussPrefixLookuper.java rename fluss-client/src/main/java/com/alibaba/fluss/client/lookup/{Lookup.java => LookupQuery.java} (91%) create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookuper.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupQuery.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookuper.java diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java index 0dcd52aa6..2b2399add 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java @@ -26,7 +26,7 @@ @Internal public abstract class AbstractLookupBatch { - protected final List> lookups; + protected final List> lookups; private final TableBucket tableBucket; public AbstractLookupBatch(TableBucket tableBucket) { @@ -37,11 +37,11 @@ public AbstractLookupBatch(TableBucket tableBucket) { /** Complete the lookup operations using given values . */ public abstract void complete(List values); - public void addLookup(AbstractLookup lookup) { + public void addLookup(AbstractLookupQuery lookup) { lookups.add(lookup); } - public List> lookups() { + public List> lookups() { return lookups; } @@ -51,7 +51,7 @@ public TableBucket tableBucket() { /** Complete the get operations with given exception. 
*/ public void completeExceptionally(Exception exception) { - for (AbstractLookup lookup : lookups) { + for (AbstractLookupQuery lookup : lookups) { lookup.future().completeExceptionally(exception); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupQuery.java similarity index 91% rename from fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java rename to fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupQuery.java index 9d1e9a2a5..9f50fe95b 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupQuery.java @@ -23,12 +23,12 @@ /** Abstract Class to represent a lookup operation. */ @Internal -public abstract class AbstractLookup { +public abstract class AbstractLookupQuery { private final TableBucket tableBucket; private final byte[] key; - public AbstractLookup(TableBucket tableBucket, byte[] key) { + public AbstractLookupQuery(TableBucket tableBucket, byte[] key) { this.tableBucket = tableBucket; this.key = key; } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussLookuper.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussLookuper.java new file mode 100644 index 000000000..dbe6fc39b --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussLookuper.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; +import com.alibaba.fluss.client.metadata.MetadataUpdater; +import com.alibaba.fluss.client.table.getter.PartitionGetter; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.row.encode.ValueDecoder; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +import static com.alibaba.fluss.client.utils.ClientUtils.getBucketId; +import static com.alibaba.fluss.client.utils.ClientUtils.getPartitionId; + +/** + * The default impl of {@link Lookuper}. + * + * @since 0.6 + */ +@PublicEvolving +public class FlussLookuper implements Lookuper { + + private final TableInfo tableInfo; + + private final MetadataUpdater metadataUpdater; + + private final LookupClient lookupClient; + + private final KeyEncoder primaryKeyEncoder; + + /** Extract bucket key from lookup key row. */ + private final KeyEncoder bucketKeyEncoder; + + private final boolean isDataLakeEnable; + + private final int numBuckets; + + private final LakeTableBucketAssigner lakeTableBucketAssigner; + + /** a getter to extract partition from lookup key row, null when it's not a partitioned. */ + private @Nullable final PartitionGetter partitionGetter; + + /** Decode the lookup bytes to result row. 
*/ + private final ValueDecoder kvValueDecoder; + + public FlussLookuper( + TableInfo tableInfo, + int numBuckets, + MetadataUpdater metadataUpdater, + LookupClient lookupClient, + KeyEncoder primaryKeyEncoder, + KeyEncoder bucketKeyEncoder, + LakeTableBucketAssigner lakeTableBucketAssigner, + @Nullable PartitionGetter partitionGetter, + ValueDecoder kvValueDecoder) { + this.tableInfo = tableInfo; + this.numBuckets = numBuckets; + this.metadataUpdater = metadataUpdater; + this.lookupClient = lookupClient; + this.primaryKeyEncoder = primaryKeyEncoder; + this.bucketKeyEncoder = bucketKeyEncoder; + this.isDataLakeEnable = tableInfo.getTableDescriptor().isDataLakeEnabled(); + this.lakeTableBucketAssigner = lakeTableBucketAssigner; + this.partitionGetter = partitionGetter; + this.kvValueDecoder = kvValueDecoder; + } + + @Override + public CompletableFuture lookup(InternalRow lookupKey) { + // encoding the key row using a compacted way consisted with how the key is encoded when put + // a row + byte[] pkBytes = primaryKeyEncoder.encode(lookupKey); + byte[] bkBytes = bucketKeyEncoder.encode(lookupKey); + Long partitionId = + partitionGetter == null + ? null + : getPartitionId( + lookupKey, + partitionGetter, + tableInfo.getTablePath(), + metadataUpdater); + int bucketId = + getBucketId( + bkBytes, + lookupKey, + lakeTableBucketAssigner, + isDataLakeEnable, + numBuckets, + metadataUpdater); + TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId); + return lookupClient + .lookup(tableBucket, pkBytes) + .thenApply( + valueBytes -> { + InternalRow row = + valueBytes == null + ? 
null + : kvValueDecoder.decodeValue(valueBytes).row; + return new LookupResult(row); + }); + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussPrefixLookuper.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussPrefixLookuper.java new file mode 100644 index 000000000..6c3dc8288 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussPrefixLookuper.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; +import com.alibaba.fluss.client.metadata.MetadataUpdater; +import com.alibaba.fluss.client.table.getter.PartitionGetter; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.row.encode.ValueDecoder; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static com.alibaba.fluss.client.utils.ClientUtils.getBucketId; +import static com.alibaba.fluss.client.utils.ClientUtils.getPartitionId; + +/** + * The default impl of {@link PrefixLookuper}. 
+ * + * @since 0.6 + */ +@PublicEvolving +public class FlussPrefixLookuper implements PrefixLookuper { + + private final TableInfo tableInfo; + + private final MetadataUpdater metadataUpdater; + + private final LookupClient lookupClient; + + /** Extract bucket key from prefix lookup key row. */ + private final KeyEncoder bucketKeyEncoder; + + private final boolean isDataLakeEnable; + + private final int numBuckets; + + private final LakeTableBucketAssigner lakeTableBucketAssigner; + + /** + * a getter to extract partition from prefix lookup key row, null when it's not a partitioned. + */ + private @Nullable final PartitionGetter partitionGetter; + + /** Decode the lookup bytes to result row. */ + private final ValueDecoder kvValueDecoder; + + public FlussPrefixLookuper( + TableInfo tableInfo, + int numBuckets, + MetadataUpdater metadataUpdater, + LookupClient lookupClient, + KeyEncoder bucketKeyEncoder, + LakeTableBucketAssigner lakeTableBucketAssigner, + @Nullable PartitionGetter partitionGetter, + ValueDecoder kvValueDecoder) { + this.tableInfo = tableInfo; + this.numBuckets = numBuckets; + this.metadataUpdater = metadataUpdater; + this.lookupClient = lookupClient; + this.isDataLakeEnable = tableInfo.getTableDescriptor().isDataLakeEnabled(); + this.lakeTableBucketAssigner = lakeTableBucketAssigner; + this.bucketKeyEncoder = bucketKeyEncoder; + this.partitionGetter = partitionGetter; + this.kvValueDecoder = kvValueDecoder; + } + + @Override + public CompletableFuture prefixLookup(InternalRow prefixKey) { + byte[] prefixKeyBytes = bucketKeyEncoder.encode(prefixKey); + int bucketId = + getBucketId( + prefixKeyBytes, + prefixKey, + lakeTableBucketAssigner, + isDataLakeEnable, + numBuckets, + metadataUpdater); + + Long partitionId = null; + if (partitionGetter != null) { + partitionId = + getPartitionId( + prefixKey, partitionGetter, tableInfo.getTablePath(), metadataUpdater); + } + + TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, 
bucketId); + return lookupClient + .prefixLookup(tableBucket, prefixKeyBytes) + .thenApply( + result -> { + List rowList = new ArrayList<>(); + for (byte[] valueBytes : result) { + rowList.add( + valueBytes == null + ? null + : kvValueDecoder.decodeValue(valueBytes).row); + } + return new PrefixLookupResult(rowList); + }); + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java index 751c6eea8..0705636c6 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java @@ -30,18 +30,18 @@ public class LookupBatch { /** The table bucket that the lookup operations should fall into. */ private final TableBucket tableBucket; - private final List lookups; + private final List lookups; public LookupBatch(TableBucket tableBucket) { this.tableBucket = tableBucket; this.lookups = new ArrayList<>(); } - public void addLookup(Lookup lookup) { + public void addLookup(LookupQuery lookup) { lookups.add(lookup); } - public List lookups() { + public List lookups() { return lookups; } @@ -62,7 +62,7 @@ public void complete(List values) { values.size(), lookups.size()))); } else { for (int i = 0; i < values.size(); i++) { - AbstractLookup lookup = lookups.get(i); + AbstractLookupQuery lookup = lookups.get(i); // single value. lookup.future().complete(values.get(i)); } @@ -71,7 +71,7 @@ public void complete(List values) { /** Complete the lookup operations with given exception. 
*/ public void completeExceptionally(Exception exception) { - for (Lookup lookup : lookups) { + for (LookupQuery lookup : lookups) { lookup.future().completeExceptionally(exception); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java index e6db0daa9..69d59034f 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java @@ -77,13 +77,13 @@ private ExecutorService createThreadPool() { } public CompletableFuture lookup(TableBucket tableBucket, byte[] keyBytes) { - Lookup lookup = new Lookup(tableBucket, keyBytes); + LookupQuery lookup = new LookupQuery(tableBucket, keyBytes); lookupQueue.appendLookup(lookup); return lookup.future(); } public CompletableFuture> prefixLookup(TableBucket tableBucket, byte[] keyBytes) { - PrefixLookup prefixLookup = new PrefixLookup(tableBucket, keyBytes); + PrefixLookupQuery prefixLookup = new PrefixLookupQuery(tableBucket, keyBytes); lookupQueue.appendLookup(prefixLookup); return prefixLookup.future(); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQuery.java similarity index 91% rename from fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java rename to fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQuery.java index 8d340a9d6..f8533e7f6 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQuery.java @@ -26,11 +26,11 @@ * from, the bytes of the key, and a future for the lookup operation. 
*/ @Internal -public class Lookup extends AbstractLookup { +public class LookupQuery extends AbstractLookupQuery { private final CompletableFuture future; - Lookup(TableBucket tableBucket, byte[] key) { + LookupQuery(TableBucket tableBucket, byte[] key) { super(tableBucket, key); this.future = new CompletableFuture<>(); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java index 7cc20e393..ca9419257 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java @@ -28,8 +28,8 @@ import java.util.concurrent.TimeUnit; /** - * A queue that buffers the pending lookup operations and provides a list of {@link Lookup} when - * call method {@link #drain()}. + * A queue that buffers the pending lookup operations and provides a list of {@link LookupQuery} + * when call method {@link #drain()}. */ @ThreadSafe @Internal @@ -37,7 +37,7 @@ class LookupQueue { private volatile boolean closed; // buffering both the Lookup and PrefixLookup. - private final ArrayBlockingQueue> lookupQueue; + private final ArrayBlockingQueue> lookupQueue; private final int maxBatchSize; private final long batchTimeoutNanos; @@ -49,7 +49,7 @@ class LookupQueue { this.closed = false; } - void appendLookup(AbstractLookup lookup) { + void appendLookup(AbstractLookupQuery lookup) { if (closed) { throw new IllegalStateException( "Can not append lookup operation since the LookupQueue is closed."); @@ -66,10 +66,10 @@ boolean hasUnDrained() { return !lookupQueue.isEmpty(); } - /** Drain a batch of {@link Lookup}s from the lookup queue. */ - List> drain() throws Exception { + /** Drain a batch of {@link LookupQuery}s from the lookup queue. 
*/ + List> drain() throws Exception { final long startNanos = System.nanoTime(); - List> lookupOperations = new ArrayList<>(maxBatchSize); + List> lookupOperations = new ArrayList<>(maxBatchSize); int count = 0; while (true) { long waitNanos = batchTimeoutNanos - (System.nanoTime() - startNanos); @@ -77,7 +77,7 @@ List> drain() throws Exception { break; } - AbstractLookup lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS); + AbstractLookupQuery lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS); if (lookup == null) { break; } @@ -92,9 +92,9 @@ List> drain() throws Exception { return lookupOperations; } - /** Drain all the {@link Lookup}s from the lookup queue. */ - List> drainAll() { - List> lookupOperations = new ArrayList<>(lookupQueue.size()); + /** Drain all the {@link LookupQuery}s from the lookup queue. */ + List> drainAll() { + List> lookupOperations = new ArrayList<>(lookupQueue.size()); lookupQueue.drainTo(lookupOperations); return lookupOperations; } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java index d31f7bbeb..ab446a575 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java @@ -17,7 +17,6 @@ package com.alibaba.fluss.client.lookup; import com.alibaba.fluss.annotation.PublicEvolving; -import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.row.InternalRow; import javax.annotation.Nullable; @@ -25,7 +24,7 @@ import java.util.Objects; /** - * The result of {@link Table#lookup(InternalRow)}. + * The result of {@link Lookuper#lookup(InternalRow)}. 
* * @since 0.1 */ diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java index cd3be7be3..668e3f138 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java @@ -102,28 +102,29 @@ public void run() { /** Run a single iteration of sending. */ private void runOnce(boolean drainAll) throws Exception { - List> lookups = drainAll ? lookupQueue.drainAll() : lookupQueue.drain(); + List> lookups = + drainAll ? lookupQueue.drainAll() : lookupQueue.drain(); sendLookups(lookups); } - private void sendLookups(List> lookups) { + private void sendLookups(List> lookups) { if (lookups.isEmpty()) { return; } // group by to lookup batches - Map, List>> lookupBatches = + Map, List>> lookupBatches = groupByLeaderAndType(lookups); // now, send the batches lookupBatches.forEach( (destAndType, batch) -> sendLookups(destAndType.f0, destAndType.f1, batch)); } - private Map, List>> groupByLeaderAndType( - List> lookups) { + private Map, List>> groupByLeaderAndType( + List> lookups) { // -> lookup batches - Map, List>> lookupBatchesByLeader = + Map, List>> lookupBatchesByLeader = new HashMap<>(); - for (AbstractLookup lookup : lookups) { + for (AbstractLookupQuery lookup : lookups) { int leader; // lookup the leader node TableBucket tb = lookup.tableBucket(); @@ -143,7 +144,7 @@ private Map, List>> groupByLeaderA } private void sendLookups( - int destination, LookupType lookupType, List> lookupBatches) { + int destination, LookupType lookupType, List> lookupBatches) { TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination); if (lookupType == LookupType.LOOKUP) { @@ -155,11 +156,12 @@ private void sendLookups( } } - private void sendLookupRequest(TabletServerGateway gateway, List> lookups) { + private void sendLookupRequest( + 
TabletServerGateway gateway, List> lookups) { // table id -> (bucket -> lookups) Map> lookupByTableId = new HashMap<>(); - for (AbstractLookup abstractLookup : lookups) { - Lookup lookup = (Lookup) abstractLookup; + for (AbstractLookupQuery abstractLookupQuery : lookups) { + LookupQuery lookup = (LookupQuery) abstractLookupQuery; TableBucket tb = lookup.tableBucket(); long tableId = tb.getTableId(); lookupByTableId @@ -178,11 +180,11 @@ private void sendLookupRequest(TabletServerGateway gateway, List> prefixLookups) { + TabletServerGateway gateway, List> prefixLookups) { // table id -> (bucket -> lookups) Map> lookupByTableId = new HashMap<>(); - for (AbstractLookup abstractLookup : prefixLookups) { - PrefixLookup prefixLookup = (PrefixLookup) abstractLookup; + for (AbstractLookupQuery abstractLookupQuery : prefixLookups) { + PrefixLookupQuery prefixLookup = (PrefixLookupQuery) abstractLookupQuery; TableBucket tb = prefixLookup.tableBucket(); long tableId = tb.getTableId(); lookupByTableId diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookuper.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookuper.java new file mode 100644 index 000000000..6d6e5de20 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookuper.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.row.InternalRow; + +import java.util.concurrent.CompletableFuture; + +/** + * The lookup-er is used to lookup data of specify kv table from Fluss. + * + * @since 0.6 + */ +@PublicEvolving +public interface Lookuper { + + /** + * Lookups certain row from the given table primary keys. + * + * @param lookupKey the given table primary keys. + * @return the result of get. + */ + CompletableFuture lookup(InternalRow lookupKey); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java index a9f1050d9..a35f5351c 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java @@ -16,32 +16,33 @@ package com.alibaba.fluss.client.lookup; -import com.alibaba.fluss.annotation.Internal; -import com.alibaba.fluss.metadata.TableBucket; - -import java.util.List; -import java.util.concurrent.CompletableFuture; +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.row.InternalRow; /** - * Class to represent a prefix lookup operation, it contains the table id, bucketNums and related - * CompletableFuture. + * Used to describe the operation to prefix lookup by {@link PrefixLookuper} to a kv table. + * + * @since 0.6 */ -@Internal -public class PrefixLookup extends AbstractLookup> { - private final CompletableFuture> future; +@PublicEvolving +public class PrefixLookup { - PrefixLookup(TableBucket tableBucket, byte[] prefixKey) { - super(tableBucket, prefixKey); - this.future = new CompletableFuture<>(); - } + /** + * Currently, For none-partition table, the lookupColumnNames can only be the field of bucket + * key. + * + *

<p>For a partitioned table, the lookupColumnNames excluding partition fields should be a + * prefix of the primary key excluding partition fields. + * + *

See {@link PrefixLookuper#prefixLookup(InternalRow)} for more details. + */ + private final String[] lookupColumnNames; - @Override - public CompletableFuture> future() { - return future; + public PrefixLookup(String[] lookupColumnNames) { + this.lookupColumnNames = lookupColumnNames; } - @Override - public LookupType lookupType() { - return LookupType.PREFIX_LOOKUP; + public String[] getLookupColumnNames() { + return lookupColumnNames; } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java index 6c9736816..b234175e8 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java @@ -33,18 +33,18 @@ public class PrefixLookupBatch { /** The table bucket that the lookup operations should fall into. */ private final TableBucket tableBucket; - private final List prefixLookups; + private final List prefixLookups; public PrefixLookupBatch(TableBucket tableBucket) { this.tableBucket = tableBucket; this.prefixLookups = new ArrayList<>(); } - public void addLookup(PrefixLookup lookup) { + public void addLookup(PrefixLookupQuery lookup) { prefixLookups.add(lookup); } - public List lookups() { + public List lookups() { return prefixLookups; } @@ -62,7 +62,7 @@ public void complete(List> values) { values.size(), prefixLookups.size()))); } else { for (int i = 0; i < values.size(); i++) { - AbstractLookup> lookup = prefixLookups.get(i); + AbstractLookupQuery> lookup = prefixLookups.get(i); lookup.future().complete(values.get(i)); } } @@ -70,7 +70,7 @@ public void complete(List> values) { /** Complete the get operations with given exception. 
*/ public void completeExceptionally(Exception exception) { - for (PrefixLookup prefixLookup : prefixLookups) { + for (PrefixLookupQuery prefixLookup : prefixLookups) { prefixLookup.future().completeExceptionally(exception); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupQuery.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupQuery.java new file mode 100644 index 000000000..0aba576e8 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupQuery.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; +import com.alibaba.fluss.metadata.TableBucket; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Class to represent a prefix lookup operation, it contains the table id, bucketNums and related + * CompletableFuture. 
+ */ +@Internal +public class PrefixLookupQuery extends AbstractLookupQuery> { + private final CompletableFuture> future; + + PrefixLookupQuery(TableBucket tableBucket, byte[] prefixKey) { + super(tableBucket, prefixKey); + this.future = new CompletableFuture<>(); + } + + @Override + public CompletableFuture> future() { + return future; + } + + @Override + public LookupType lookupType() { + return LookupType.PREFIX_LOOKUP; + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java index 3a6e31869..8b93e8092 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java @@ -16,16 +16,17 @@ package com.alibaba.fluss.client.lookup; -import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.row.InternalRow; import java.util.List; /** - * The result of {@link Table#prefixLookup(InternalRow)}}. + * The result of {@link PrefixLookuper#prefixLookup(InternalRow)}}. * * @since 0.6 */ +@PublicEvolving public class PrefixLookupResult { private final List rowList; diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookuper.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookuper.java new file mode 100644 index 000000000..49a26b079 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookuper.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.row.InternalRow; + +import java.util.concurrent.CompletableFuture; + +/** + * The prefix lookup-er is used to prefix lookup data of specify kv table from Fluss. + * + * @since 0.6 + */ +@PublicEvolving +public interface PrefixLookuper { + + /** + * Prefix lookup certain rows from the given table by prefix key. + * + *

<p>Only available for primary key tables. Throws an exception when the table isn't a primary + * key table. + * + *

Note: Currently, to use prefix lookup, the table you created must define both + * the primary key and the bucket key. In addition, the prefixKey must be equal to the bucket + * key, and must be a part of the primary key and a prefix of the primary key. For + * example, if a table has fields [a,b,c,d], and the primary key is set to [a, b, c], with the + * bucket key set to [a, b], then the schema of the prefixKey would also be [a, b]. Such a table + * can use PrefixLookup to look up rows by prefix scan. + * + *

TODO: currently, the interface only supports the bucket key as the prefixKey for lookup. + * Generalize the prefix lookup to support any prefixKey, including the bucket key. + * + *

We also support prefix lookup for partitioned tables. The schema of the prefixKey should + * contain the partition fields and the bucket key. In addition, the schema of the prefixKey excluding + * partition fields should be a prefix of the primary key excluding partition fields. + * + * @param prefixKey the given prefix key to do prefix lookup. + * @return the result of prefix lookup. + */ + CompletableFuture prefixLookup(InternalRow prefixKey); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java index 0d98129e6..1dd395133 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java @@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong; /** - * The default impl of {@link FlussLogScanner}. + * The default impl of {@link LogScanner}. * *

The {@link FlussLogScanner} is NOT thread-safe. It is the responsibility of the user to ensure * that multithreaded access is properly synchronized. Un-synchronized access will result in {@link diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 6ec40a57a..88e0d320d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -18,9 +18,12 @@ import com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; +import com.alibaba.fluss.client.lookup.FlussLookuper; +import com.alibaba.fluss.client.lookup.FlussPrefixLookuper; import com.alibaba.fluss.client.lookup.LookupClient; -import com.alibaba.fluss.client.lookup.LookupResult; -import com.alibaba.fluss.client.lookup.PrefixLookupResult; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.metadata.MetadataUpdater; import com.alibaba.fluss.client.scanner.RemoteFileDownloader; import com.alibaba.fluss.client.scanner.ScanRecord; @@ -37,13 +40,10 @@ import com.alibaba.fluss.client.token.DefaultSecurityTokenProvider; import com.alibaba.fluss.client.token.SecurityTokenManager; import com.alibaba.fluss.client.token.SecurityTokenProvider; -import com.alibaba.fluss.client.write.HashBucketAssigner; import com.alibaba.fluss.client.write.WriterClient; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.FlussRuntimeException; -import com.alibaba.fluss.exception.PartitionNotExistException; -import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import 
com.alibaba.fluss.metadata.TableDescriptor; @@ -74,12 +74,12 @@ import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.CloseableIterator; -import com.alibaba.fluss.utils.Preconditions; import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -87,7 +87,6 @@ import java.util.function.Supplier; import static com.alibaba.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode; -import static com.alibaba.fluss.metadata.Schema.getKeyRowType; import static com.alibaba.fluss.utils.Preconditions.checkArgument; /** @@ -99,26 +98,21 @@ public class FlussTable implements Table { private final Configuration conf; - private final long tableId; private final TablePath tablePath; private final RpcClient rpcClient; private final MetadataUpdater metadataUpdater; private final TableInfo tableInfo; private final boolean hasPrimaryKey; private final int numBuckets; + private final Schema schema; + private final TableDescriptor tableDescriptor; + private final RowType rowType; private final RowType primaryKeyRowType; private final @Nullable RowType bucketKeyRowType; // decode the lookup bytes to result row private final ValueDecoder kvValueDecoder; - // a getter to extract partition from primary key row, null when it's not a partitioned primary - // key table - private final @Nullable PartitionGetter primaryKeyRowPartitionGetter; - private final List partitionKeyNames; - private final KeyEncoder bucketKeyEncoder; - private final KeyEncoder primaryKeyEncoder; - private final Supplier writerSupplier; private final Supplier lookupClientSupplier; private final AtomicBoolean closed; @@ -127,12 +121,7 @@ public class FlussTable implements Table { private volatile RemoteFileDownloader remoteFileDownloader; private volatile SecurityTokenManager securityTokenManager; - - 
private @Nullable LakeTableBucketAssigner lakeTableBucketAssigner; - private volatile boolean prefixLookupInitialized; - // a getter to extract partition from bucket key row, null when it's not a partitioned primary - // key table. - private @Nullable PartitionGetter bucketKeyRowPartitionGetter; + private volatile LakeTableBucketAssigner lakeTableBucketAssigner; public FlussTable( Configuration conf, @@ -153,17 +142,12 @@ public FlussTable( metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath)); this.tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath); - this.tableId = tableInfo.getTableId(); - TableDescriptor tableDescriptor = tableInfo.getTableDescriptor(); - Schema schema = tableDescriptor.getSchema(); - RowType rowType = schema.toRowType(); + this.tableDescriptor = tableInfo.getTableDescriptor(); + this.schema = tableDescriptor.getSchema(); + this.rowType = schema.toRowType(); this.hasPrimaryKey = tableDescriptor.hasPrimaryKey(); this.numBuckets = metadataUpdater.getBucketCount(tablePath); - this.primaryKeyRowType = getKeyRowType(schema, schema.getPrimaryKeyIndexes()); - this.primaryKeyRowPartitionGetter = - tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey() - ? 
new PartitionGetter(primaryKeyRowType, tableDescriptor.getPartitionKeys()) - : null; + this.primaryKeyRowType = rowType.project(schema.getPrimaryKeyIndexes()); this.closed = new AtomicBoolean(false); this.kvValueDecoder = new ValueDecoder( @@ -171,29 +155,17 @@ public FlussTable( tableDescriptor.getKvFormat(), rowType.getChildren().toArray(new DataType[0]))); - this.primaryKeyEncoder = - KeyEncoder.createKeyEncoder( - primaryKeyRowType, - primaryKeyRowType.getFieldNames(), - tableDescriptor.getPartitionKeys()); int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes(); if (bucketKeyIndexes.length != 0) { - this.bucketKeyRowType = getKeyRowType(schema, bucketKeyIndexes); - this.bucketKeyEncoder = - KeyEncoder.createKeyEncoder( - bucketKeyRowType, - bucketKeyRowType.getFieldNames(), - tableDescriptor.getPartitionKeys()); + this.bucketKeyRowType = rowType.project(bucketKeyIndexes); } else { this.bucketKeyRowType = null; - this.bucketKeyEncoder = primaryKeyEncoder; } this.partitionKeyNames = tableDescriptor.isPartitioned() ? tableDescriptor.getPartitionKeys() : new ArrayList<>(); - prefixLookupInitialized = false; } @Override @@ -201,77 +173,6 @@ public TableDescriptor getDescriptor() { return tableInfo.getTableDescriptor(); } - @Override - public CompletableFuture lookup(InternalRow key) { - if (!hasPrimaryKey) { - throw new FlussRuntimeException( - String.format("none-pk table %s not support lookup()", tablePath)); - } - // encoding the key row using a compacted way consisted with how the key is encoded when put - // a row - byte[] pkBytes = primaryKeyEncoder.encode(key); - byte[] bkBytes = bucketKeyEncoder.encode(key); - Long partitionId = primaryKeyRowPartitionGetter == null ? 
null : getPartitionId(key); - int bucketId = getBucketId(bkBytes, key); - TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); - return lookupClientSupplier - .get() - .lookup(tableBucket, pkBytes) - .thenApply( - valueBytes -> { - InternalRow row = - valueBytes == null - ? null - : kvValueDecoder.decodeValue(valueBytes).row; - return new LookupResult(row); - }); - } - - @Override - public CompletableFuture prefixLookup(InternalRow bucketKey) { - maybeInitPrefixLookup(); - - byte[] prefixKeyBytes = bucketKeyEncoder.encode(bucketKey); - int bucketId = getBucketId(prefixKeyBytes, bucketKey); - - Long partitionId = null; - if (bucketKeyRowPartitionGetter != null) { - partitionId = getPartitionId(bucketKey); - } - - TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); - return lookupClientSupplier - .get() - .prefixLookup(tableBucket, prefixKeyBytes) - .thenApply( - result -> { - List rowList = new ArrayList<>(); - for (byte[] valueBytes : result) { - rowList.add( - valueBytes == null - ? null - : kvValueDecoder.decodeValue(valueBytes).row); - } - return new PrefixLookupResult(rowList); - }); - } - - private int getBucketId(byte[] keyBytes, InternalRow key) { - if (!tableInfo.getTableDescriptor().isDataLakeEnabled()) { - return HashBucketAssigner.bucketForRowKey(keyBytes, numBuckets); - } else { - if (lakeTableBucketAssigner == null) { - lakeTableBucketAssigner = - new LakeTableBucketAssigner( - primaryKeyRowType, - tableInfo.getTableDescriptor().getBucketKey(), - numBuckets); - } - return lakeTableBucketAssigner.assignBucket( - keyBytes, key, metadataUpdater.getCluster()); - } - } - @Override public CompletableFuture> limitScan( TableBucket tableBucket, int limit, @Nullable int[] projectedFields) { @@ -386,20 +287,6 @@ private void addScanRecord( } } - /** - * Return the id of the partition the row belongs to. It'll try to update the metadata if the - * partition doesn't exist. 
If the partition doesn't exist yet after update metadata, it'll - * throw {@link PartitionNotExistException}. - */ - private Long getPartitionId(InternalRow row) { - Preconditions.checkNotNull( - primaryKeyRowPartitionGetter, "partitionGetter shouldn't be null."); - String partitionName = primaryKeyRowPartitionGetter.getPartition(row); - PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); - metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath); - return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath); - } - @Override public AppendWriter getAppendWriter() { if (hasPrimaryKey) { @@ -456,6 +343,68 @@ public SnapshotScanner getSnapshotScanner(SnapshotScan snapshotScan) { snapshotScan); } + @Override + public Lookuper getLookuper() { + // lookup keys equals with primary keys. + if (!hasPrimaryKey) { + throw new FlussRuntimeException( + String.format("none-pk table %s not support lookup()", tablePath)); + } + maybeCreateLakeTableBucketAssigner(); + + PartitionGetter partitionGetter = + tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey() + ? 
new PartitionGetter(primaryKeyRowType, tableDescriptor.getPartitionKeys()) + : null; + KeyEncoder primaryKeyEncoder = + KeyEncoder.createKeyEncoder( + primaryKeyRowType, + primaryKeyRowType.getFieldNames(), + tableDescriptor.getPartitionKeys()); + checkArgument(bucketKeyRowType != null, "bucketKeyRowType shouldn't be null."); + KeyEncoder bucketKeyEncoder = + KeyEncoder.createKeyEncoder( + primaryKeyRowType, bucketKeyRowType.getFieldNames(), partitionKeyNames); + return new FlussLookuper( + tableInfo, + numBuckets, + metadataUpdater, + lookupClientSupplier.get(), + primaryKeyEncoder, + bucketKeyEncoder, + lakeTableBucketAssigner, + partitionGetter, + kvValueDecoder); + } + + @Override + public PrefixLookuper getPrefixLookuper(PrefixLookup prefixLookup) { + validatePrefixLookup(prefixLookup); + maybeCreateLakeTableBucketAssigner(); + + RowType prefixKeyRowType = + rowType.project( + schema.getColumnIndexes( + Arrays.asList(prefixLookup.getLookupColumnNames()))); + PartitionGetter partitionGetter = + partitionKeyNames.size() > 0 + ? 
new PartitionGetter(prefixKeyRowType, partitionKeyNames) + : null; + checkArgument(bucketKeyRowType != null, "bucketKeyRowType shouldn't be null."); + KeyEncoder bucketKeyEncoder = + KeyEncoder.createKeyEncoder( + prefixKeyRowType, bucketKeyRowType.getFieldNames(), partitionKeyNames); + return new FlussPrefixLookuper( + tableInfo, + numBuckets, + metadataUpdater, + lookupClientSupplier.get(), + bucketKeyEncoder, + lakeTableBucketAssigner, + partitionGetter, + kvValueDecoder); + } + @Override public void close() throws Exception { if (closed.compareAndSet(false, true)) { @@ -510,51 +459,73 @@ private void mayPrepareRemoteFileDownloader() { } } - private void maybeInitPrefixLookup() { - if (!prefixLookupInitialized) { + private void maybeCreateLakeTableBucketAssigner() { + if (lakeTableBucketAssigner == null) { synchronized (this) { - if (!prefixLookupInitialized) { - if (!hasPrimaryKey) { - throw new FlussRuntimeException( - String.format( - "None-pk table %s don't support prefix lookup", tablePath)); - } + if (lakeTableBucketAssigner == null) { + lakeTableBucketAssigner = + new LakeTableBucketAssigner( + primaryKeyRowType, + tableInfo.getTableDescriptor().getBucketKey(), + numBuckets); + } + } + } + } - checkArgument(bucketKeyRowType != null, "bucketKeyRowType shouldn't be null."); - List primaryKeyNames = primaryKeyRowType.getFieldNames(); - List bucketKeyNames = bucketKeyRowType.getFieldNames(); - - for (int i = 0; i < bucketKeyNames.size(); i++) { - if (!bucketKeyNames.get(i).equals(primaryKeyNames.get(i))) { - throw new FlussRuntimeException( - String.format( - "To do prefix lookup, the bucket keys must be the prefix subset of " - + "primary keys, but the bucket keys are %s and the primary " - + "keys are %s for table %s", - bucketKeyNames, primaryKeyNames, tablePath)); - } - } + private void validatePrefixLookup(PrefixLookup prefixLookup) { + // 1. verify table descriptor. 
+ if (!hasPrimaryKey) { + throw new FlussRuntimeException( + String.format("None-pk table %s don't support prefix lookup", tablePath)); + } + + checkArgument(bucketKeyRowType != null, "bucketKeyRowType shouldn't be null."); + List pkRemovePartitionFields = primaryKeyRowType.getFieldNames(); + pkRemovePartitionFields.removeAll(partitionKeyNames); + List bucketKeyNames = bucketKeyRowType.getFieldNames(); + + for (int i = 0; i < bucketKeyNames.size(); i++) { + if (!bucketKeyNames.get(i).equals(pkRemovePartitionFields.get(i))) { + throw new FlussRuntimeException( + String.format( + "To do prefix lookup, the bucket keys must be the prefix subset of " + + "primary keys exclude partition fields (if partition table), but " + + "the bucket keys are %s and the primary keys are %s and the primary " + + "key exclude partition fields are %s for table %s", + bucketKeyNames, + primaryKeyRowType.getFieldNames(), + pkRemovePartitionFields, + tablePath)); + } + } - partitionKeyNames.forEach( - partitionKey -> { - if (!bucketKeyNames.contains(partitionKey)) { - throw new FlussRuntimeException( - String.format( - "To do prefix lookup for partitioned primary key table, the " - + "partition keys must be in bucket keys, but the bucket " - + "keys are %s and the partition keys are %s for table %s", - bucketKeyNames, partitionKeyNames, tablePath)); - } - }); - - this.bucketKeyRowPartitionGetter = - partitionKeyNames.size() > 0 - ? new PartitionGetter(bucketKeyRowType, partitionKeyNames) - : null; - - prefixLookupInitialized = true; + // verify PrefixLookup. 
+ String[] prefixLookupColumns = prefixLookup.getLookupColumnNames(); + List prefixLookupColumnList = new ArrayList<>(Arrays.asList(prefixLookupColumns)); + if (!partitionKeyNames.isEmpty()) { + for (int i = 0; i < partitionKeyNames.size(); i++) { + if (!prefixLookupColumnList.contains(partitionKeyNames.get(i))) { + throw new FlussRuntimeException( + String.format( + "To do prefix lookup for partitioned primary key table, the " + + "partition keys must be in lookup columns, but the lookup " + + "columns are %s and the partition keys are %s for table %s", + prefixLookupColumnList, partitionKeyNames, tablePath)); } } + prefixLookupColumnList.removeAll(partitionKeyNames); } + checkArgument( + prefixLookupColumnList.size() == bucketKeyNames.size() + && Arrays.equals( + prefixLookupColumnList.toArray(new String[0]), + bucketKeyNames.toArray(new String[0])), + "To do prefix lookup, the lookup columns must be the bucket key with " + + "partition fields (if partition table), but the lookup columns are %s and the " + + "bucket keys are %s for table %s", + Arrays.asList(prefixLookup.getLookupColumnNames()), + bucketKeyNames, + tablePath); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java index 62fd8ef18..ac3f96b38 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java @@ -18,8 +18,9 @@ import com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.client.Connection; -import com.alibaba.fluss.client.lookup.LookupResult; -import com.alibaba.fluss.client.lookup.PrefixLookupResult; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import 
com.alibaba.fluss.client.scanner.log.LogScanner; @@ -30,7 +31,6 @@ import com.alibaba.fluss.client.table.writer.UpsertWriter; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; -import com.alibaba.fluss.row.InternalRow; import javax.annotation.Nullable; @@ -57,35 +57,6 @@ public interface Table extends AutoCloseable { */ TableDescriptor getDescriptor(); - /** - * Lookups certain row from the given table primary keys. - * - * @param key the given table primary keys. - * @return the result of get. - */ - CompletableFuture lookup(InternalRow key); - - /** - * Prefix lookup certain rows from the given table by prefix key. - * - *

Only available for Primary Key Table. Will throw exception when the table isn't a Primary - * Key Table. - * - *

Note: Currently, if you want to use prefix lookup, the table you created must both define - * the primary key and the bucket key, in addition, the bucket key needs to be part of the - * primary key and must be a prefix of the primary key. For example, if a table has fields - * [a,b,c,d], and the primary key is set to [a, b, c], with the bucket key set to [a, b], then - * the prefix schema would also be [a, b]. This pattern can use PrefixLookup to lookup by prefix - * scan. - * - *

TODO: currently, the interface only support bucket key as the prefix key to lookup. - * Generalize the prefix lookup to support any prefix key including bucket key. - * - * @param bucketKey the given bucket key to do prefix lookup. - * @return the result of prefix lookup. - */ - CompletableFuture prefixLookup(InternalRow bucketKey); - /** * Extracts limit number of rows from the given table bucket. * @@ -135,4 +106,19 @@ CompletableFuture> limitScan( * @return the {@link SnapshotScanner} to scan data from this table. */ SnapshotScanner getSnapshotScanner(SnapshotScan snapshotScan); + + /** + * Get a {@link Lookuper} to do lookup. + * + * @return the {@link Lookuper} to do lookup. + */ + Lookuper getLookuper(); + + /** + * Get a {@link PrefixLookuper} to do prefix lookup. + * + * @param prefixLookup the given prefix lookup. + * @return the {@link PrefixLookuper} to do prefix lookup. + */ + PrefixLookuper getPrefixLookuper(PrefixLookup prefixLookup); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java index 1482318ef..fe84f76e0 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java @@ -33,8 +33,6 @@ import java.util.BitSet; import java.util.concurrent.CompletableFuture; -import static com.alibaba.fluss.metadata.Schema.getKeyRowType; - /** * The writer to write data to the primary key table. 
* @@ -71,12 +69,7 @@ public UpsertWriter( this.bucketKeyEncoder = null; } else { int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes(); - RowType bucketKeyRowType = getKeyRowType(schema, bucketKeyIndexes); - this.bucketKeyEncoder = - KeyEncoder.createKeyEncoder( - rowType, - bucketKeyRowType.getFieldNames(), - tableDescriptor.getPartitionKeys()); + this.bucketKeyEncoder = new KeyEncoder(rowType, bucketKeyIndexes); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java index e4d381dbf..fb8111481 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java @@ -16,12 +16,23 @@ package com.alibaba.fluss.client.utils; +import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; +import com.alibaba.fluss.client.metadata.MetadataUpdater; +import com.alibaba.fluss.client.table.getter.PartitionGetter; +import com.alibaba.fluss.client.write.HashBucketAssigner; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.exception.IllegalConfigurationException; +import com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -105,4 +116,36 @@ public static Integer getPort(String address) { Matcher matcher = HOST_PORT_PATTERN.matcher(address); return matcher.matches() ? Integer.parseInt(matcher.group(2)) : null; } + + /** + * Return the id of the partition the row belongs to. It'll try to update the metadata if the + * partition doesn't exist. 
If the partition doesn't exist yet after update metadata, it'll + * throw {@link PartitionNotExistException}. + */ + public static Long getPartitionId( + InternalRow row, + @Nullable PartitionGetter partitionGetter, + TablePath tablePath, + MetadataUpdater metadataUpdater) { + Preconditions.checkNotNull(partitionGetter, "partitionGetter shouldn't be null."); + String partitionName = partitionGetter.getPartition(row); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); + metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath); + return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath); + } + + public static int getBucketId( + byte[] keyBytes, + InternalRow key, + LakeTableBucketAssigner lakeTableBucketAssigner, + boolean isDataLakeEnable, + int numBuckets, + MetadataUpdater metadataUpdater) { + if (!isDataLakeEnable) { + return HashBucketAssigner.bucketForRowKey(keyBytes, numBuckets); + } else { + return lakeTableBucketAssigner.assignBucket( + keyBytes, key, metadataUpdater.getCluster()); + } + } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java index 8cb2472d2..84096cce9 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.admin.OffsetSpec.LatestSpec; import com.alibaba.fluss.client.admin.OffsetSpec.TimestampSpec; +import com.alibaba.fluss.client.lookup.Lookuper; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -252,12 +253,13 @@ protected static void verifyPutAndLookup(Table table, Schema tableSchema, 
Object upsertWriter.upsert(row); upsertWriter.flush(); // lookup this key. + Lookuper lookuper = table.getLookuper(); IndexedRow keyRow = keyRow(tableSchema, fields); - assertThat(lookupRow(table, keyRow)).isEqualTo(row); + assertThat(lookupRow(lookuper, keyRow)).isEqualTo(row); } - protected static InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception { + protected static InternalRow lookupRow(Lookuper lookuper, IndexedRow keyRow) throws Exception { // lookup this key. - return table.lookup(keyRow).get().getRow(); + return lookuper.lookup(keyRow).get().getRow(); } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java index 93fc48ee5..04d7299a8 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java @@ -59,7 +59,7 @@ void testDrainMaxBatchSize() throws Exception { private static void appendLookups(LookupQueue queue, int count) { for (int i = 0; i < count; i++) { - queue.appendLookup(new Lookup(new TableBucket(1, 1), new byte[] {0})); + queue.appendLookup(new LookupQuery(new TableBucket(1, 1), new byte[] {0})); } } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java index 096837d96..7696124a1 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java @@ -17,7 +17,10 @@ package com.alibaba.fluss.client.table; import com.alibaba.fluss.client.admin.ClientToServerITCaseBase; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; import 
com.alibaba.fluss.client.lookup.PrefixLookupResult; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -80,13 +83,14 @@ void testPartitionedPrimaryKeyTable() throws Exception { } upsertWriter.flush(); + Lookuper lookuper = table.getLookuper(); // now, let's lookup the written data by look up for (String partition : partitionIdByNames.keySet()) { for (int i = 0; i < recordsPerPartition; i++) { InternalRow actualRow = compactedRow(schema.toRowType(), new Object[] {i, "a" + i, partition}); InternalRow lookupRow = - table.lookup(keyRow(schema, new Object[] {i, null, partition})) + lookuper.lookup(keyRow(schema, new Object[] {i, null, partition})) .get() .getRow(); assertThat(lookupRow).isEqualTo(actualRow); @@ -112,7 +116,7 @@ void testPartitionedTablePrefixLookup() throws Exception { TableDescriptor descriptor = TableDescriptor.builder() .schema(schema) - .distributedBy(3, "a", "b") + .distributedBy(3, "a", "c") .partitionedBy("b") .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) .property( @@ -130,30 +134,34 @@ void testPartitionedTablePrefixLookup() throws Exception { verifyPutAndLookup(table, schema, new Object[] {1, partition, 2L, "value2"}); } - // test prefix lookup. + // test prefix lookup with partition key (a, b, c). 
Schema prefixKeySchema = Schema.newBuilder() .column("a", DataTypes.INT()) .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) .build(); + RowType prefixKeyRowType = prefixKeySchema.toRowType(); + PrefixLookuper prefixLookuper = + table.getPrefixLookuper( + new PrefixLookup(prefixKeyRowType.getFieldNames().toArray(new String[0]))); for (String partition : partitionIdByNames.keySet()) { CompletableFuture result = - table.prefixLookup( - compactedRow(prefixKeySchema.toRowType(), new Object[] {1, partition})); + prefixLookuper.prefixLookup( + compactedRow( + prefixKeySchema.toRowType(), new Object[] {1, partition, 1L})); PrefixLookupResult prefixLookupResult = result.get(); assertThat(prefixLookupResult).isNotNull(); List rowList = prefixLookupResult.getRowList(); - assertThat(rowList.size()).isEqualTo(2); + assertThat(rowList.size()).isEqualTo(1); assertRowValueEquals( rowType, rowList.get(0), new Object[] {1, partition, 1L, "value1"}); - assertRowValueEquals( - rowType, rowList.get(1), new Object[] {1, partition, 2L, "value2"}); } } @Test - void testPrefixLookupWithPartitionKeysNotInBucketKeys() throws Exception { - // This case partition key 'c' only in pk but not in bucket key (prefix key). + void testPrefixLookupWithPartitionKeysNotInPrefixLookupKeys() throws Exception { + // This case partition key 'c' only in pk but not in prefix key. TablePath tablePath = TablePath.of("test_db_1", "test_partitioned_table_prefix_lookup2"); Schema schema = Schema.newBuilder() @@ -182,24 +190,25 @@ void testPrefixLookupWithPartitionKeysNotInBucketKeys() throws Exception { verifyPutAndLookup(table, schema, new Object[] {1, 1L, partition, "value1"}); } - // test prefix lookup. + // test prefix lookup with (a, b). 
Schema prefixKeySchema = Schema.newBuilder() .column("a", DataTypes.INT()) .column("b", DataTypes.BIGINT()) .build(); + RowType prefixKeyRowType = prefixKeySchema.toRowType(); assertThatThrownBy( () -> - table.prefixLookup( - compactedRow( - prefixKeySchema.toRowType(), - new Object[] {1, 1L})) - .get()) + table.getPrefixLookuper( + new PrefixLookup( + prefixKeyRowType + .getFieldNames() + .toArray(new String[0])))) .isInstanceOf(FlussRuntimeException.class) .hasMessageContaining( - "To do prefix lookup for partitioned primary key table, the partition keys" - + " must be in bucket keys, but the bucket keys are [a, b] and the " - + "partition keys are [c] for table test_db_1.test_partitioned_table_prefix_lookup2"); + "To do prefix lookup for partitioned primary key table, the partition keys " + + "must be in lookup columns, but the lookup columns are [a, b] and the partition " + + "keys are [c] for table test_db_1.test_partitioned_table_prefix_lookup2"); } @Test @@ -306,11 +315,12 @@ private Map> pollRecords( void testOperateNotExistPartitionShouldThrowException() throws Exception { Schema schema = createPartitionedTable(DATA1_TABLE_PATH_PK, true); Table table = conn.getTable(DATA1_TABLE_PATH_PK); + Lookuper lookuper = table.getLookuper(); // test get for a not exist partition assertThatThrownBy( () -> - table.lookup( + lookuper.lookup( keyRow( schema, new Object[] { diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index cd82b8c78..d7a8f1932 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -19,7 +19,10 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.admin.ClientToServerITCaseBase; +import com.alibaba.fluss.client.lookup.Lookuper; 
+import com.alibaba.fluss.client.lookup.PrefixLookup; import com.alibaba.fluss.client.lookup.PrefixLookupResult; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -248,9 +251,12 @@ void testPutAndPrefixLookup() throws Exception { .column("a", DataTypes.INT()) .column("b", DataTypes.STRING()) .build(); + RowType prefixKeyRowType = prefixKeySchema.toRowType(); + PrefixLookuper prefixLookuper = + table.getPrefixLookuper( + new PrefixLookup(prefixKeyRowType.getFieldNames().toArray(new String[0]))); CompletableFuture result = - table.prefixLookup( - compactedRow(prefixKeySchema.toRowType(), new Object[] {1, "a"})); + prefixLookuper.prefixLookup(compactedRow(prefixKeyRowType, new Object[] {1, "a"})); PrefixLookupResult prefixLookupResult = result.get(); assertThat(prefixLookupResult).isNotNull(); List rowList = prefixLookupResult.getRowList(); @@ -260,18 +266,14 @@ void testPutAndPrefixLookup() throws Exception { rowType, rowList.get(i), new Object[] {1, "a", i + 1L, "value" + (i + 1)}); } - result = - table.prefixLookup( - compactedRow(prefixKeySchema.toRowType(), new Object[] {2, "a"})); + result = prefixLookuper.prefixLookup(compactedRow(prefixKeyRowType, new Object[] {2, "a"})); prefixLookupResult = result.get(); assertThat(prefixLookupResult).isNotNull(); rowList = prefixLookupResult.getRowList(); assertThat(rowList.size()).isEqualTo(1); assertRowValueEquals(rowType, rowList.get(0), new Object[] {2, "a", 4L, "value4"}); - result = - table.prefixLookup( - compactedRow(prefixKeySchema.toRowType(), new Object[] {3, "a"})); + result = prefixLookuper.prefixLookup(compactedRow(prefixKeyRowType, new Object[] {3, "a"})); prefixLookupResult = result.get(); assertThat(prefixLookupResult).isNotNull(); rowList = prefixLookupResult.getRowList(); @@ -300,18 +302,53 @@ void testInvalidPrefixLookup() throws 
Exception { .column("a", DataTypes.INT()) .column("c", DataTypes.STRING()) .build(); + RowType prefixKeyRowType = prefixKeySchema.toRowType(); assertThatThrownBy( () -> - table.prefixLookup( - compactedRow( - prefixKeySchema.toRowType(), - new Object[] {1, "hello"})) - .get()) + table.getPrefixLookuper( + new PrefixLookup( + prefixKeyRowType + .getFieldNames() + .toArray(new String[0])))) .isInstanceOf(FlussRuntimeException.class) .hasMessageContaining( - "To do prefix lookup, the bucket keys must be the prefix subset of primary keys, " - + "but the bucket keys are [a, c] and the primary keys are [a, b, c] for " - + "table test_db_1.test_invalid_prefix_lookup_1"); + "To do prefix lookup, the bucket keys must be the prefix subset of primary keys " + + "exclude partition fields (if partition table), but the bucket keys are [a, c] " + + "and the primary keys are [a, b, c] and the primary key exclude partition " + + "fields are [a, b, c] for table test_db_1.test_invalid_prefix_lookup_1"); + + // Second, test the lookup column names in PrefixLookup not a subset of primary keys. 
+ tablePath = TablePath.of("test_db_1", "test_invalid_prefix_lookup_2"); + schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + descriptor = TableDescriptor.builder().schema(schema).distributedBy(3, "a", "b").build(); + createTable(tablePath, descriptor, true); + Table table2 = conn.getTable(tablePath); + + prefixKeySchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("d", DataTypes.STRING()) + .build(); + RowType prefixKeyRowType2 = prefixKeySchema.toRowType(); + assertThatThrownBy( + () -> + table2.getPrefixLookuper( + new PrefixLookup( + prefixKeyRowType2 + .getFieldNames() + .toArray(new String[0])))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "To do prefix lookup, the lookup columns must be the bucket key " + + "with partition fields (if partition table), but the lookup columns are [a, d] and " + + "the bucket keys are [a, b] for table test_db_1.test_invalid_prefix_lookup_2"); } @Test @@ -325,7 +362,8 @@ void testLookupForNotReadyTable() throws Exception { // if you want to test the lookup for not ready table, you can comment the following line. 
waitAllReplicasReady(tableId, descriptor); Table table = conn.getTable(tablePath); - assertThat(lookupRow(table, rowKey)).isNull(); + Lookuper lookuper = table.getLookuper(); + assertThat(lookupRow(lookuper, rowKey)).isNull(); } @Test @@ -459,10 +497,11 @@ void testPartialPutAndDelete() throws Exception { upsertWriter .upsert(compactedRow(schema.toRowType(), new Object[] {1, "aaa", null, null})) .get(); + Lookuper lookuper = table.getLookuper(); // check the row IndexedRow rowKey = row(pkRowType, new Object[] {1}); - assertThat(lookupRow(table, rowKey)) + assertThat(lookupRow(lookuper, rowKey)) .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "aaa", 1, true})); // partial update columns columns: a,b,c @@ -473,14 +512,14 @@ void testPartialPutAndDelete() throws Exception { .get(); // lookup the row - assertThat(lookupRow(table, rowKey)) + assertThat(lookupRow(lookuper, rowKey)) .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, true})); // test partial delete, target column is a,b,c upsertWriter .delete(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, null})) .get(); - assertThat(lookupRow(table, rowKey)) + assertThat(lookupRow(lookuper, rowKey)) .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, null, null, true})); // partial delete, target column is d @@ -491,7 +530,7 @@ void testPartialPutAndDelete() throws Exception { .get(); // the row should be deleted, shouldn't get the row again - assertThat(lookupRow(table, rowKey)).isNull(); + assertThat(lookupRow(lookuper, rowKey)).isNull(); table.close(); } @@ -544,15 +583,16 @@ void testDelete() throws Exception { try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) { UpsertWriter upsertWriter = table.getUpsertWriter(); upsertWriter.upsert(row).get(); + Lookuper lookuper = table.getLookuper(); // lookup this key. 
IndexedRow keyRow = keyRow(DATA1_SCHEMA_PK, new Object[] {1, "a"}); - assertThat(lookupRow(table, keyRow)).isEqualTo(row); + assertThat(lookupRow(lookuper, keyRow)).isEqualTo(row); // delete this key. upsertWriter.delete(row).get(); // lookup this key again, will return null. - assertThat(lookupRow(table, keyRow)).isNull(); + assertThat(lookupRow(lookuper, keyRow)).isNull(); } } @@ -923,10 +963,11 @@ void testFirstRowMergeEngine() throws Exception { } upsertWriter.flush(); + Lookuper lookuper = table.getLookuper(); // now, get rows by lookup for (int id = 0; id < rows; id++) { InternalRow gotRow = - table.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {id, "dumpy"})) + lookuper.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {id, "dumpy"})) .get() .getRow(); assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id)); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java index 56da1505f..35f143f76 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java @@ -125,6 +125,15 @@ public List getColumnNames(int[] columnIndexes) { return columnNames; } + /** Returns the indexes of the fields in the schema. 
*/ + public int[] getColumnIndexes(List keyNames) { + int[] keyIndexes = new int[keyNames.size()]; + for (int i = 0; i < keyNames.size(); i++) { + keyIndexes[i] = rowType.getFieldIndex(keyNames.get(i)); + } + return keyIndexes; + } + @Override public String toString() { final List components = new ArrayList<>(columns); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index 69d8684d3..378d0721b 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -187,11 +187,15 @@ public List getBucketKey() { * Returns the indexes of the bucket key fields in the schema, empty if no bucket key is set. */ public int[] getBucketKeyIndexes() { - List bucketKey = getBucketKey(); + return schema.getColumnIndexes(getBucketKey()); + } + + /** Returns the indexes of the key fields in the schema. 
*/ + public int[] getKeyIndexes(List keyNames) { RowType rowType = schema.toRowType(); - int[] bucketKeyIndex = new int[bucketKey.size()]; - for (int i = 0; i < bucketKey.size(); i++) { - bucketKeyIndex[i] = rowType.getFieldIndex(bucketKey.get(i)); + int[] bucketKeyIndex = new int[keyNames.size()]; + for (int i = 0; i < keyNames.size(); i++) { + bucketKeyIndex[i] = rowType.getFieldIndex(keyNames.get(i)); } return bucketKeyIndex; } @@ -409,6 +413,13 @@ private static TableDistribution normalizeDistribution( if (originDistribution != null) { // we may need to check and normalize bucket key List bucketKeys = originDistribution.getBucketKeys(); + // bucket key shouldn't include partition key + if (bucketKeys.stream().anyMatch(partitionKeys::contains)) { + throw new IllegalArgumentException( + String.format( + "Bucket key %s shouldn't include any column in partition keys %s.", + bucketKeys, partitionKeys)); + } // if primary key set if (schema.getPrimaryKey().isPresent()) { diff --git a/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java b/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java index 35ea95dcb..cec8894a6 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java @@ -174,6 +174,18 @@ void testPrimaryKeyDifferentWithBucketKeys() { "Bucket keys must be a subset of primary keys excluding partition keys for primary-key tables. 
" + "The primary keys are [f0, f3], the partition keys are [], " + "but the user-defined bucket keys are [f0, f1]."); + + // bucket key shouldn't include partition key + assertThatThrownBy( + () -> + TableDescriptor.builder() + .schema(SCHEMA_1) + .partitionedBy("f0") + .distributedBy(3, "f0", "f3") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Bucket key [f0, f3] shouldn't include any column in partition keys [f0]."); } @Test @@ -307,6 +319,18 @@ void testPartitionedTable() { TableDescriptor.builder().schema(SCHEMA_1).partitionedBy("f0").build(); assertThat(tableDescriptor.getTableDistribution().get().getBucketKeys()) .isEqualTo(Collections.singletonList("f3")); + + // bucket key contains partitioned key, throw exception + assertThatThrownBy( + () -> + TableDescriptor.builder() + .schema(SCHEMA_1) + .partitionedBy("f0") + .distributedBy(1, "f0", "f3") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Bucket key [f0, f3] shouldn't include any column in partition keys [f0]."); } @Test diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java index ae53093d7..a35771b06 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -20,7 +20,10 @@ import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.lookup.LookupType; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; import com.alibaba.fluss.client.lookup.PrefixLookupResult; +import 
com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.connector.flink.source.lookup.LookupNormalizer.RemainingFilter; @@ -48,6 +51,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import static com.alibaba.fluss.utils.Preconditions.checkArgument; + /** A flink async lookup function for fluss. */ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { @@ -67,6 +72,8 @@ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter; private transient Connection connection; private transient Table table; + @Nullable private transient PrefixLookuper prefixLookuper; + @Nullable private transient Lookuper lookuper; public FlinkAsyncLookupFunction( Configuration flussConfig, @@ -91,10 +98,10 @@ public void open(FunctionContext context) { table = connection.getTable(tablePath); // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); + RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes); flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes), - table.getDescriptor().getKvFormat()); + lookupKeyRowType, table.getDescriptor().getKvFormat()); final RowType outputRowType; if (projection == null) { @@ -104,6 +111,18 @@ public void open(FunctionContext context) { } flussRowToFlinkRowConverter = new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType)); + + if (flussLookupType == LookupType.PREFIX_LOOKUP) { + prefixLookuper = + table.getPrefixLookuper( + new PrefixLookup( + lookupKeyRowType.getFieldNames().toArray(new String[0]))); + lookuper = null; + } else if (flussLookupType == LookupType.LOOKUP) { + prefixLookuper = null; + lookuper = 
table.getLookuper(); + } + LOG.info("end open."); } @@ -137,7 +156,8 @@ private void fetchResult( InternalRow keyRow, @Nullable RemainingFilter remainingFilter) { if (flussLookupType == LookupType.LOOKUP) { - table.lookup(keyRow) + checkArgument(lookuper != null, "Lookuper should not be null"); + lookuper.lookup(keyRow) .whenComplete( (result, throwable) -> { if (throwable != null) { @@ -152,7 +172,9 @@ private void fetchResult( } }); } else { - table.prefixLookup(keyRow) + checkArgument(prefixLookuper != null, "prefixLookuper should not be null"); + prefixLookuper + .prefixLookup(keyRow) .whenComplete( (result, throwable) -> { if (throwable != null) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java index 20788e53d..0872264cc 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java @@ -19,6 +19,9 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.lookup.LookupType; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.connector.flink.utils.FlinkConversions; @@ -43,6 +46,8 @@ import java.util.Collections; import java.util.List; +import static com.alibaba.fluss.utils.Preconditions.checkArgument; + /** A flink lookup function for fluss. 
*/ public class FlinkLookupFunction extends LookupFunction { @@ -62,6 +67,8 @@ public class FlinkLookupFunction extends LookupFunction { private transient Connection connection; private transient Table table; @Nullable private transient ProjectedRow projectedRow; + @Nullable private transient PrefixLookuper prefixLookuper; + @Nullable private transient Lookuper lookuper; public FlinkLookupFunction( Configuration flussConfig, @@ -86,10 +93,10 @@ public void open(FunctionContext context) { table = connection.getTable(tablePath); // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); + RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes); flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes), - table.getDescriptor().getKvFormat()); + lookupKeyRowType, table.getDescriptor().getKvFormat()); final RowType outputRowType; if (projection == null) { @@ -103,6 +110,17 @@ public void open(FunctionContext context) { flussRowToFlinkRowConverter = new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType)); + if (flussLookupType == LookupType.PREFIX_LOOKUP) { + prefixLookuper = + table.getPrefixLookuper( + new PrefixLookup( + lookupKeyRowType.getFieldNames().toArray(new String[0]))); + lookuper = null; + } else if (flussLookupType == LookupType.LOOKUP) { + prefixLookuper = null; + lookuper = table.getLookuper(); + } + LOG.info("end open."); } @@ -125,7 +143,8 @@ public Collection lookup(RowData keyRow) { for (int retry = 0; retry <= maxRetryTimes; retry++) { try { if (flussLookupType == LookupType.LOOKUP) { - InternalRow row = table.lookup(flussKeyRow).get().getRow(); + checkArgument(lookuper != null, "Lookuper should not be null"); + InternalRow row = lookuper.lookup(flussKeyRow).get().getRow(); if (row != null) { RowData flinkRow = 
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); @@ -137,8 +156,9 @@ public Collection lookup(RowData keyRow) { } } else { List projectedRows = new ArrayList<>(); + checkArgument(prefixLookuper != null, "prefixLookuper should not be null"); List lookupRows = - table.prefixLookup(flussKeyRow).get().getRowList(); + prefixLookuper.prefixLookup(flussKeyRow).get().getRowList(); for (InternalRow row : lookupRows) { if (row != null) { RowData flinkRow = diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java index 34450ba64..064a74d46 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java @@ -908,6 +908,27 @@ void testPrefixLookup(Caching caching, boolean async) throws Exception { assertResultsIgnoreOrder(collected, expected, true); } + @ParameterizedTest + @MethodSource("lookupArgs") + void testPrefixLookupPartitionedTable(Caching caching, boolean async) throws Exception { + String dim = + prepareDimTableAndSourceTable( + caching, + async, + new String[] {"name", "id"}, + new String[] {"name"}, + "p_date"); + String dimJoinQuery = + String.format( + "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" + + " ON src.b = h.name AND src.p_date = h.p_date", + dim); + + CloseableIterator collected = tEnv.executeSql(dimJoinQuery).collect(); + List expected = Arrays.asList("+I[1, name1, address1]", "+I[1, name1, address5]"); + assertResultsIgnoreOrder(collected, expected, true); + } + @ParameterizedTest @MethodSource("lookupArgs") void testPrefixLookupWithCondition(Caching caching, boolean async) throws Exception {