diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java b/fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java index 5cfa29fd1..45083b25e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java @@ -78,12 +78,12 @@ public Table getTable(TablePath tablePath) { tablePath, rpcClient, metadataUpdater, - this::maybeCreateWriter, - this::maybeCreateLookupClient, + this::getOrCreateWriterClient, + this::getOrCreateLookupClient, clientMetricGroup); } - private WriterClient maybeCreateWriter() { + public WriterClient getOrCreateWriterClient() { if (writerClient == null) { synchronized (this) { if (writerClient == null) { @@ -94,7 +94,7 @@ private WriterClient maybeCreateWriter() { return writerClient; } - private LookupClient maybeCreateLookupClient() { + public LookupClient getOrCreateLookupClient() { if (lookupClient == null) { synchronized (this) { if (lookupClient == null) { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java index 0dcd52aa6..2b2399add 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java @@ -26,7 +26,7 @@ @Internal public abstract class AbstractLookupBatch { - protected final List> lookups; + protected final List> lookups; private final TableBucket tableBucket; public AbstractLookupBatch(TableBucket tableBucket) { @@ -37,11 +37,11 @@ public AbstractLookupBatch(TableBucket tableBucket) { /** Complete the lookup operations using given values . */ public abstract void complete(List values); - public void addLookup(AbstractLookup lookup) { + public void addLookup(AbstractLookupQuery lookup) { lookups.add(lookup); } - public List> lookups() { + public List> lookups() { return lookups; } @@ -51,7 +51,7 @@ public TableBucket tableBucket() { /** Complete the get operations with given exception. */ public void completeExceptionally(Exception exception) { - for (AbstractLookup lookup : lookups) { + for (AbstractLookupQuery lookup : lookups) { lookup.future().completeExceptionally(exception); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupQuery.java similarity index 91% rename from fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java rename to fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupQuery.java index 9d1e9a2a5..9f50fe95b 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupQuery.java @@ -23,12 +23,12 @@ /** Abstract Class to represent a lookup operation. 
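Note on the rename: the base class is generic over the lookup result type — LookupQuery binds it to byte[] (a single value per key) and PrefixLookupQuery to List<byte[]> (all values sharing the prefix). A sketch of the renamed class with its type parameter spelled out; accessor names beyond tableBucket(), future(), and lookupType() are assumptions, not confirmed by this diff:

    @Internal
    public abstract class AbstractLookupQuery<T> {

        private final TableBucket tableBucket;
        private final byte[] key;

        public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
            this.tableBucket = tableBucket;
            this.key = key;
        }

        public TableBucket tableBucket() {
            return tableBucket;
        }

        public byte[] key() {
            return key;
        }

        // each subclass supplies its own future type and lookup kind
        public abstract CompletableFuture<T> future();

        public abstract LookupType lookupType();
    }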
*/ @Internal -public abstract class AbstractLookup { +public abstract class AbstractLookupQuery { private final TableBucket tableBucket; private final byte[] key; - public AbstractLookup(TableBucket tableBucket, byte[] key) { + public AbstractLookupQuery(TableBucket tableBucket, byte[] key) { this.tableBucket = tableBucket; this.key = key; } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussLookuper.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussLookuper.java new file mode 100644 index 000000000..8b0d3b75b --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussLookuper.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; +import com.alibaba.fluss.client.metadata.MetadataUpdater; +import com.alibaba.fluss.client.table.getter.PartitionGetter; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.row.encode.ValueDecoder; +import com.alibaba.fluss.types.RowType; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +import static com.alibaba.fluss.client.utils.ClientUtils.getBucketId; +import static com.alibaba.fluss.client.utils.ClientUtils.getPartitionId; +import static com.alibaba.fluss.utils.Preconditions.checkArgument; + +/** + * The default impl of {@link Lookuper}. + * + * @since 0.6 + */ +@PublicEvolving +public class FlussLookuper implements Lookuper { + + private final TableInfo tableInfo; + + private final MetadataUpdater metadataUpdater; + + private final LookupClient lookupClient; + + private final KeyEncoder primaryKeyEncoder; + + /** + * Extract bucket key from lookup key row, use {@link #primaryKeyEncoder} if is default bucket + * key (bucket key = physical primary key). + */ + private final KeyEncoder bucketKeyEncoder; + + private final boolean isDataLakeEnable; + + private final int numBuckets; + + private final LakeTableBucketAssigner lakeTableBucketAssigner; + + /** a getter to extract partition from lookup key row, null when it's not a partitioned. */ + private @Nullable final PartitionGetter partitionGetter; + + /** Decode the lookup bytes to result row. 
*/ + private final ValueDecoder kvValueDecoder; + + public FlussLookuper( + TableInfo tableInfo, + int numBuckets, + MetadataUpdater metadataUpdater, + LookupClient lookupClient, + ValueDecoder kvValueDecoder) { + checkArgument( + tableInfo.getTableDescriptor().hasPrimaryKey(), + "Log table %s doesn't support lookup", + tableInfo.getTablePath()); + this.tableInfo = tableInfo; + this.numBuckets = numBuckets; + this.metadataUpdater = metadataUpdater; + this.lookupClient = lookupClient; + + TableDescriptor tableDescriptor = tableInfo.getTableDescriptor(); + Schema schema = tableDescriptor.getSchema(); + RowType primaryKeyRowType = schema.toRowType().project(schema.getPrimaryKeyIndexes()); + this.primaryKeyEncoder = + KeyEncoder.createKeyEncoder( + primaryKeyRowType, + primaryKeyRowType.getFieldNames(), + tableDescriptor.getPartitionKeys()); + if (tableDescriptor.isDefaultBucketKey()) { + this.bucketKeyEncoder = primaryKeyEncoder; + } else { + // bucket key doesn't contain partition key, so no need exclude partition keys + this.bucketKeyEncoder = + new KeyEncoder(primaryKeyRowType, tableDescriptor.getBucketKeyIndexes()); + } + this.isDataLakeEnable = tableInfo.getTableDescriptor().isDataLakeEnabled(); + this.lakeTableBucketAssigner = + new LakeTableBucketAssigner( + primaryKeyRowType, tableDescriptor.getBucketKey(), numBuckets); + this.partitionGetter = + tableDescriptor.isPartitioned() + ? new PartitionGetter(primaryKeyRowType, tableDescriptor.getPartitionKeys()) + : null; + this.kvValueDecoder = kvValueDecoder; + } + + @Override + public CompletableFuture lookup(InternalRow lookupKey) { + // encoding the key row using a compacted way consisted with how the key is encoded when put + // a row + byte[] pkBytes = primaryKeyEncoder.encode(lookupKey); + byte[] bkBytes = + bucketKeyEncoder == primaryKeyEncoder + ? pkBytes + : bucketKeyEncoder.encode(lookupKey); + Long partitionId = + partitionGetter == null + ? null + : getPartitionId( + lookupKey, + partitionGetter, + tableInfo.getTablePath(), + metadataUpdater); + int bucketId = + getBucketId( + bkBytes, + lookupKey, + lakeTableBucketAssigner, + isDataLakeEnable, + numBuckets, + metadataUpdater); + TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId); + return lookupClient + .lookup(tableBucket, pkBytes) + .thenApply( + valueBytes -> { + InternalRow row = + valueBytes == null + ? null + : kvValueDecoder.decodeValue(valueBytes).row; + return new LookupResult(row); + }); + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussPrefixLookuper.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussPrefixLookuper.java new file mode 100644 index 000000000..a22ae8ad4 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/FlussPrefixLookuper.java @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
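With FlussLookuper in place, a point lookup goes through the table handle rather than the Table methods directly — a minimal sketch (conn, tablePath, and keyRow are placeholders; see the integration tests later in this diff for full setup):

    Table table = conn.getTable(tablePath);
    Lookuper lookuper = table.getLookuper();
    // the key row is encoded, routed to the owning bucket, and resolved asynchronously
    CompletableFuture<LookupResult> future = lookuper.lookup(keyRow);
    InternalRow result = future.get().getRow(); // null when the key has no match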
+ */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; +import com.alibaba.fluss.client.metadata.MetadataUpdater; +import com.alibaba.fluss.client.table.getter.PartitionGetter; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.row.encode.ValueDecoder; +import com.alibaba.fluss.types.RowType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static com.alibaba.fluss.client.utils.ClientUtils.getBucketId; +import static com.alibaba.fluss.client.utils.ClientUtils.getPartitionId; + +/** + * The default impl of {@link PrefixLookuper}. + * + * @since 0.6 + */ +@PublicEvolving +public class FlussPrefixLookuper implements PrefixLookuper { + + private final TableInfo tableInfo; + + private final MetadataUpdater metadataUpdater; + + private final LookupClient lookupClient; + + /** Extract bucket key from prefix lookup key row. */ + private final KeyEncoder bucketKeyEncoder; + + private final boolean isDataLakeEnable; + + private final int numBuckets; + + private final LakeTableBucketAssigner lakeTableBucketAssigner; + + /** + * a getter to extract partition from prefix lookup key row, null when it's not a partitioned. + */ + private @Nullable final PartitionGetter partitionGetter; + + /** Decode the lookup bytes to result row. */ + private final ValueDecoder kvValueDecoder; + + public FlussPrefixLookuper( + TableInfo tableInfo, + int numBuckets, + MetadataUpdater metadataUpdater, + LookupClient lookupClient, + List lookupColumnNames, + ValueDecoder kvValueDecoder) { + // sanity check + validatePrefixLookup(tableInfo, lookupColumnNames); + // initialization + this.tableInfo = tableInfo; + this.numBuckets = numBuckets; + this.metadataUpdater = metadataUpdater; + this.lookupClient = lookupClient; + TableDescriptor tableDescriptor = tableInfo.getTableDescriptor(); + Schema schema = tableDescriptor.getSchema(); + RowType prefixKeyRowType = + schema.toRowType().project(schema.getColumnIndexes(lookupColumnNames)); + this.bucketKeyEncoder = + KeyEncoder.createKeyEncoder( + prefixKeyRowType, + tableDescriptor.getBucketKey(), + tableDescriptor.getPartitionKeys()); + this.isDataLakeEnable = tableInfo.getTableDescriptor().isDataLakeEnabled(); + this.lakeTableBucketAssigner = + new LakeTableBucketAssigner( + prefixKeyRowType, tableDescriptor.getBucketKey(), numBuckets); + this.partitionGetter = + tableDescriptor.isPartitioned() + ? 
new PartitionGetter(prefixKeyRowType, tableDescriptor.getPartitionKeys()) + : null; + this.kvValueDecoder = kvValueDecoder; + } + + private void validatePrefixLookup(TableInfo tableInfo, List lookupColumns) { + // verify is primary key table + Schema schema = tableInfo.getTableDescriptor().getSchema(); + if (!schema.getPrimaryKey().isPresent()) { + throw new IllegalArgumentException( + String.format( + "Log table %s doesn't support prefix lookup", + tableInfo.getTablePath())); + } + + // verify the bucket keys are the prefix subset of physical primary keys + List physicalPrimaryKeys = schema.getPrimaryKey().get().getColumnNames(); + physicalPrimaryKeys.removeAll(tableInfo.getTableDescriptor().getPartitionKeys()); + List bucketKeys = tableInfo.getTableDescriptor().getBucketKey(); + for (int i = 0; i < bucketKeys.size(); i++) { + if (!bucketKeys.get(i).equals(physicalPrimaryKeys.get(i))) { + throw new IllegalArgumentException( + String.format( + "Can not perform prefix lookup on table '%s', " + + "because the bucket keys %s is not a prefix subset of the " + + "physical primary keys %s (excluded partition fields if present).", + tableInfo.getTablePath(), bucketKeys, physicalPrimaryKeys)); + } + } + + // verify the lookup columns must contain all partition fields if this is partitioned table + if (tableInfo.getTableDescriptor().isPartitioned()) { + List partitionKeys = tableInfo.getTableDescriptor().getPartitionKeys(); + Set lookupColumnsSet = new HashSet<>(lookupColumns); + if (!lookupColumnsSet.containsAll(partitionKeys)) { + throw new IllegalArgumentException( + String.format( + "Can not perform prefix lookup on table '%s', " + + "because the lookup columns %s must contain all partition fields %s.", + tableInfo.getTablePath(), lookupColumns, partitionKeys)); + } + } + + // verify the lookup columns must contain all bucket keys **in order** + List physicalLookupColumns = new ArrayList<>(lookupColumns); + physicalLookupColumns.removeAll(tableInfo.getTableDescriptor().getPartitionKeys()); + if (!physicalLookupColumns.equals(bucketKeys)) { + throw new IllegalArgumentException( + String.format( + "Can not perform prefix lookup on table '%s', " + + "because the lookup columns %s must contain all bucket keys %s in order.", + tableInfo.getTablePath(), lookupColumns, bucketKeys)); + } + } + + @Override + public CompletableFuture prefixLookup(InternalRow prefixKey) { + byte[] bucketKeyBytes = bucketKeyEncoder.encode(prefixKey); + int bucketId = + getBucketId( + bucketKeyBytes, + prefixKey, + lakeTableBucketAssigner, + isDataLakeEnable, + numBuckets, + metadataUpdater); + + Long partitionId = null; + if (partitionGetter != null) { + partitionId = + getPartitionId( + prefixKey, partitionGetter, tableInfo.getTablePath(), metadataUpdater); + } + + TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId); + return lookupClient + .prefixLookup(tableBucket, bucketKeyBytes) + .thenApply( + result -> { + List rowList = new ArrayList<>(result.size()); + for (byte[] valueBytes : result) { + if (valueBytes == null) { + continue; + } + rowList.add(kvValueDecoder.decodeValue(valueBytes).row); + } + return new PrefixLookupResult(rowList); + }); + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java index 751c6eea8..0705636c6 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java +++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java @@ -30,18 +30,18 @@ public class LookupBatch { /** The table bucket that the lookup operations should fall into. */ private final TableBucket tableBucket; - private final List lookups; + private final List lookups; public LookupBatch(TableBucket tableBucket) { this.tableBucket = tableBucket; this.lookups = new ArrayList<>(); } - public void addLookup(Lookup lookup) { + public void addLookup(LookupQuery lookup) { lookups.add(lookup); } - public List lookups() { + public List lookups() { return lookups; } @@ -62,7 +62,7 @@ public void complete(List values) { values.size(), lookups.size()))); } else { for (int i = 0; i < values.size(); i++) { - AbstractLookup lookup = lookups.get(i); + AbstractLookupQuery lookup = lookups.get(i); // single value. lookup.future().complete(values.get(i)); } @@ -71,7 +71,7 @@ public void complete(List values) { /** Complete the lookup operations with given exception. */ public void completeExceptionally(Exception exception) { - for (Lookup lookup : lookups) { + for (LookupQuery lookup : lookups) { lookup.future().completeExceptionally(exception); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java index 72927a368..69d59034f 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java @@ -77,15 +77,13 @@ private ExecutorService createThreadPool() { } public CompletableFuture lookup(TableBucket tableBucket, byte[] keyBytes) { - Lookup lookup = new Lookup(tableBucket, keyBytes); + LookupQuery lookup = new LookupQuery(tableBucket, keyBytes); lookupQueue.appendLookup(lookup); return lookup.future(); } - public CompletableFuture> prefixLookup( - long tableId, int bucketId, byte[] keyBytes) { - // TODO prefix lookup support partition table (#266) - PrefixLookup prefixLookup = new PrefixLookup(new TableBucket(tableId, bucketId), keyBytes); + public CompletableFuture> prefixLookup(TableBucket tableBucket, byte[] keyBytes) { + PrefixLookupQuery prefixLookup = new PrefixLookupQuery(tableBucket, keyBytes); lookupQueue.appendLookup(prefixLookup); return prefixLookup.future(); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQuery.java similarity index 91% rename from fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java rename to fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQuery.java index 8d340a9d6..f8533e7f6 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQuery.java @@ -26,11 +26,11 @@ * from, the bytes of the key, and a future for the lookup operation. 
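The client keeps one future per query: LookupClient wraps each key in a LookupQuery (or PrefixLookupQuery), appends it to the shared LookupQueue, and returns the query's future to the caller; the LookupSender later drains a batch, issues one request per leader, and LookupBatch.complete(...) resolves the futures. The shape of the pattern, as used above:

    // inside LookupClient.lookup(...); prefix lookups follow the same shape
    LookupQuery lookup = new LookupQuery(tableBucket, keyBytes);
    lookupQueue.appendLookup(lookup); // buffered until a batch is drained
    return lookup.future();           // completed when the batch response arrives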
*/ @Internal -public class Lookup extends AbstractLookup { +public class LookupQuery extends AbstractLookupQuery { private final CompletableFuture future; - Lookup(TableBucket tableBucket, byte[] key) { + LookupQuery(TableBucket tableBucket, byte[] key) { super(tableBucket, key); this.future = new CompletableFuture<>(); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java index 7cc20e393..ca9419257 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java @@ -28,8 +28,8 @@ import java.util.concurrent.TimeUnit; /** - * A queue that buffers the pending lookup operations and provides a list of {@link Lookup} when - * call method {@link #drain()}. + * A queue that buffers the pending lookup operations and provides a list of {@link LookupQuery} + * when call method {@link #drain()}. */ @ThreadSafe @Internal @@ -37,7 +37,7 @@ class LookupQueue { private volatile boolean closed; // buffering both the Lookup and PrefixLookup. - private final ArrayBlockingQueue> lookupQueue; + private final ArrayBlockingQueue> lookupQueue; private final int maxBatchSize; private final long batchTimeoutNanos; @@ -49,7 +49,7 @@ class LookupQueue { this.closed = false; } - void appendLookup(AbstractLookup lookup) { + void appendLookup(AbstractLookupQuery lookup) { if (closed) { throw new IllegalStateException( "Can not append lookup operation since the LookupQueue is closed."); @@ -66,10 +66,10 @@ boolean hasUnDrained() { return !lookupQueue.isEmpty(); } - /** Drain a batch of {@link Lookup}s from the lookup queue. */ - List> drain() throws Exception { + /** Drain a batch of {@link LookupQuery}s from the lookup queue. */ + List> drain() throws Exception { final long startNanos = System.nanoTime(); - List> lookupOperations = new ArrayList<>(maxBatchSize); + List> lookupOperations = new ArrayList<>(maxBatchSize); int count = 0; while (true) { long waitNanos = batchTimeoutNanos - (System.nanoTime() - startNanos); @@ -77,7 +77,7 @@ List> drain() throws Exception { break; } - AbstractLookup lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS); + AbstractLookupQuery lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS); if (lookup == null) { break; } @@ -92,9 +92,9 @@ List> drain() throws Exception { return lookupOperations; } - /** Drain all the {@link Lookup}s from the lookup queue. */ - List> drainAll() { - List> lookupOperations = new ArrayList<>(lookupQueue.size()); + /** Drain all the {@link LookupQuery}s from the lookup queue. */ + List> drainAll() { + List> lookupOperations = new ArrayList<>(lookupQueue.size()); lookupQueue.drainTo(lookupOperations); return lookupOperations; } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java index d31f7bbeb..ab446a575 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java @@ -17,7 +17,6 @@ package com.alibaba.fluss.client.lookup; import com.alibaba.fluss.annotation.PublicEvolving; -import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.row.InternalRow; import javax.annotation.Nullable; @@ -25,7 +24,7 @@ import java.util.Objects; /** - * The result of {@link Table#lookup(InternalRow)}. 
+ * The result of {@link Lookuper#lookup(InternalRow)}. * * @since 0.1 */ diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java index cd3be7be3..668e3f138 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java @@ -102,28 +102,29 @@ public void run() { /** Run a single iteration of sending. */ private void runOnce(boolean drainAll) throws Exception { - List> lookups = drainAll ? lookupQueue.drainAll() : lookupQueue.drain(); + List> lookups = + drainAll ? lookupQueue.drainAll() : lookupQueue.drain(); sendLookups(lookups); } - private void sendLookups(List> lookups) { + private void sendLookups(List> lookups) { if (lookups.isEmpty()) { return; } // group by to lookup batches - Map, List>> lookupBatches = + Map, List>> lookupBatches = groupByLeaderAndType(lookups); // now, send the batches lookupBatches.forEach( (destAndType, batch) -> sendLookups(destAndType.f0, destAndType.f1, batch)); } - private Map, List>> groupByLeaderAndType( - List> lookups) { + private Map, List>> groupByLeaderAndType( + List> lookups) { // -> lookup batches - Map, List>> lookupBatchesByLeader = + Map, List>> lookupBatchesByLeader = new HashMap<>(); - for (AbstractLookup lookup : lookups) { + for (AbstractLookupQuery lookup : lookups) { int leader; // lookup the leader node TableBucket tb = lookup.tableBucket(); @@ -143,7 +144,7 @@ private Map, List>> groupByLeaderA } private void sendLookups( - int destination, LookupType lookupType, List> lookupBatches) { + int destination, LookupType lookupType, List> lookupBatches) { TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination); if (lookupType == LookupType.LOOKUP) { @@ -155,11 +156,12 @@ private void sendLookups( } } - private void sendLookupRequest(TabletServerGateway gateway, List> lookups) { + private void sendLookupRequest( + TabletServerGateway gateway, List> lookups) { // table id -> (bucket -> lookups) Map> lookupByTableId = new HashMap<>(); - for (AbstractLookup abstractLookup : lookups) { - Lookup lookup = (Lookup) abstractLookup; + for (AbstractLookupQuery abstractLookupQuery : lookups) { + LookupQuery lookup = (LookupQuery) abstractLookupQuery; TableBucket tb = lookup.tableBucket(); long tableId = tb.getTableId(); lookupByTableId @@ -178,11 +180,11 @@ private void sendLookupRequest(TabletServerGateway gateway, List> prefixLookups) { + TabletServerGateway gateway, List> prefixLookups) { // table id -> (bucket -> lookups) Map> lookupByTableId = new HashMap<>(); - for (AbstractLookup abstractLookup : prefixLookups) { - PrefixLookup prefixLookup = (PrefixLookup) abstractLookup; + for (AbstractLookupQuery abstractLookupQuery : prefixLookups) { + PrefixLookupQuery prefixLookup = (PrefixLookupQuery) abstractLookupQuery; TableBucket tb = prefixLookup.tableBucket(); long tableId = tb.getTableId(); lookupByTableId diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookuper.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookuper.java new file mode 100644 index 000000000..a6e18fc10 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookuper.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.row.InternalRow; + +import java.util.concurrent.CompletableFuture; + +/** + * The lookup-er is used to lookup row of a primary key table by primary key. + * + * @since 0.6 + */ +@PublicEvolving +public interface Lookuper { + + /** + * Lookups certain row from the given table primary keys. + * + * @param lookupKey the given table primary keys. + * @return the result of lookup. + */ + CompletableFuture lookup(InternalRow lookupKey); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java index a9f1050d9..fcf944d41 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java @@ -16,32 +16,35 @@ package com.alibaba.fluss.client.lookup; -import com.alibaba.fluss.annotation.Internal; -import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.row.InternalRow; import java.util.List; -import java.util.concurrent.CompletableFuture; /** - * Class to represent a prefix lookup operation, it contains the table id, bucketNums and related - * CompletableFuture. + * Used to describe the operation to prefix lookup by {@link PrefixLookuper} to a primary key table. + * + * @since 0.6 */ -@Internal -public class PrefixLookup extends AbstractLookup> { - private final CompletableFuture> future; +@PublicEvolving +public class PrefixLookup { - PrefixLookup(TableBucket tableBucket, byte[] prefixKey) { - super(tableBucket, prefixKey); - this.future = new CompletableFuture<>(); - } + /** + * Currently, For non-partitioned table, the lookupColumnNames can only be the field of bucket + * key. + * + *

For partitioned tables, the lookupColumnNames excluding partition fields should be a prefix + * of the primary key excluding partition fields. + * + *

See {@link PrefixLookuper#prefixLookup(InternalRow)} for more details. + */ + private final List lookupColumnNames; - @Override - public CompletableFuture> future() { - return future; + public PrefixLookup(List lookupColumnNames) { + this.lookupColumnNames = lookupColumnNames; } - @Override - public LookupType lookupType() { - return LookupType.PREFIX_LOOKUP; + public List getLookupColumnNames() { + return lookupColumnNames; } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java index 6c9736816..b234175e8 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java @@ -33,18 +33,18 @@ public class PrefixLookupBatch { /** The table bucket that the lookup operations should fall into. */ private final TableBucket tableBucket; - private final List prefixLookups; + private final List prefixLookups; public PrefixLookupBatch(TableBucket tableBucket) { this.tableBucket = tableBucket; this.prefixLookups = new ArrayList<>(); } - public void addLookup(PrefixLookup lookup) { + public void addLookup(PrefixLookupQuery lookup) { prefixLookups.add(lookup); } - public List lookups() { + public List lookups() { return prefixLookups; } @@ -62,7 +62,7 @@ public void complete(List> values) { values.size(), prefixLookups.size()))); } else { for (int i = 0; i < values.size(); i++) { - AbstractLookup> lookup = prefixLookups.get(i); + AbstractLookupQuery> lookup = prefixLookups.get(i); lookup.future().complete(values.get(i)); } } @@ -70,7 +70,7 @@ public void complete(List> values) { /** Complete the get operations with given exception. */ public void completeExceptionally(Exception exception) { - for (PrefixLookup prefixLookup : prefixLookups) { + for (PrefixLookupQuery prefixLookup : prefixLookups) { prefixLookup.future().completeExceptionally(exception); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupQuery.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupQuery.java new file mode 100644 index 000000000..0aba576e8 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupQuery.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; +import com.alibaba.fluss.metadata.TableBucket; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Class to represent a prefix lookup operation, it contains the table id, bucketNums and related + * CompletableFuture. 
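PrefixLookup is now just a descriptor of the lookup columns and carries no bucket or future state; the bucket routing happens inside FlussPrefixLookuper. Obtaining a lookuper from it (a sketch, assuming a Table table whose bucket key is [a, b]):

    // the prefix key rows passed to prefixLookup(...) must carry exactly these columns, in order
    PrefixLookup prefixLookup = new PrefixLookup(Arrays.asList("a", "b"));
    PrefixLookuper prefixLookuper = table.getPrefixLookuper(prefixLookup);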
+ */ +@Internal +public class PrefixLookupQuery extends AbstractLookupQuery<List<byte[]>> { + private final CompletableFuture<List<byte[]>> future; + + PrefixLookupQuery(TableBucket tableBucket, byte[] prefixKey) { + super(tableBucket, prefixKey); + this.future = new CompletableFuture<>(); + } + + @Override + public CompletableFuture<List<byte[]>> future() { + return future; + } + + @Override + public LookupType lookupType() { + return LookupType.PREFIX_LOOKUP; + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java index 3a6e31869..8b93e8092 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java @@ -16,16 +16,17 @@ package com.alibaba.fluss.client.lookup; -import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.row.InternalRow; import java.util.List; /** - * The result of {@link Table#prefixLookup(InternalRow)}}. + * The result of {@link PrefixLookuper#prefixLookup(InternalRow)}. * * @since 0.6 */ +@PublicEvolving public class PrefixLookupResult { private final List<InternalRow> rowList; diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookuper.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookuper.java new file mode 100644 index 000000000..1a771067e --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookuper.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.row.InternalRow; + +import java.util.concurrent.CompletableFuture; + +/** + * The prefix lookuper is used to look up rows of a primary key table by a prefix of the primary key. + * + * @since 0.6 + */ +@PublicEvolving +public interface PrefixLookuper { + + /** + * Looks up certain rows from the given table by a prefix of the primary key. + * + *

Only available for Primary Key Tables. An exception will be thrown if the table isn't a + * Primary Key Table. + * + *

Note: Currently, to use prefix lookup, the table you create must define both a primary key + * and a bucket key; the prefixKey must be equal to the bucket key, which in turn must be a + * prefix of the primary key. For example, if a table has fields [a,b,c,d] and the primary key + * is set to [a, b, c], with the bucket key set to [a, b], then the schema of the prefixKey + * would also be [a, b]. Tables following this pattern can use PrefixLookup to look up rows by + * prefix scan (see the builder sketch below). + * + *
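The [a,b,c,d] example above, spelled out with the builder API this PR's tests use (table and column names are illustrative):

    Schema schema = Schema.newBuilder()
            .column("a", DataTypes.INT())
            .column("b", DataTypes.STRING())
            .column("c", DataTypes.BIGINT())
            .column("d", DataTypes.STRING())
            .primaryKey("a", "b", "c")
            .build();
    TableDescriptor descriptor = TableDescriptor.builder()
            .schema(schema)
            // bucket key [a, b] is a prefix of the primary key [a, b, c]
            .distributedBy(3, "a", "b")
            .build();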

TODO: currently, the interface only supports the bucket key as the prefixKey for lookup. + * Generalize the prefix lookup to support any prefixKey, not only the bucket key. + * + *

We also support prefix lookup for partitioned tables. The schema of the prefixKey should + * contain the partition fields and the bucket key. In addition, the schema of the prefixKey + * excluding partition fields should be a prefix of the primary key excluding partition fields. + * + * @param prefixKey the given prefix key to do prefix lookup. + * @return the result of prefix lookup. + */ + CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow prefixKey); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java index 0d98129e6..1dd395133 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java @@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong; /** - * The default impl of {@link FlussLogScanner}. + * The default impl of {@link LogScanner}. * *
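A complete prefix lookup call against such a table (a sketch; compactedRow is the test helper used elsewhere in this diff, and prefixKeyRowType projects the lookup columns [a, b]):

    PrefixLookuper prefixLookuper =
            table.getPrefixLookuper(new PrefixLookup(Arrays.asList("a", "b")));
    PrefixLookupResult result =
            prefixLookuper
                    .prefixLookup(compactedRow(prefixKeyRowType, new Object[] {1, "x"}))
                    .get();
    for (InternalRow row : result.getRowList()) {
        // every returned row shares the [a, b] prefix (1, "x")
    }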

The {@link FlussLogScanner} is NOT thread-safe. It is the responsibility of the user to ensure * that multithreaded access is properly synchronized. Un-synchronized access will result in {@link diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 779fb63ee..a634d2780 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -17,10 +17,12 @@ package com.alibaba.fluss.client.table; import com.alibaba.fluss.annotation.PublicEvolving; -import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; +import com.alibaba.fluss.client.lookup.FlussLookuper; +import com.alibaba.fluss.client.lookup.FlussPrefixLookuper; import com.alibaba.fluss.client.lookup.LookupClient; -import com.alibaba.fluss.client.lookup.LookupResult; -import com.alibaba.fluss.client.lookup.PrefixLookupResult; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.metadata.MetadataUpdater; import com.alibaba.fluss.client.scanner.RemoteFileDownloader; import com.alibaba.fluss.client.scanner.ScanRecord; @@ -29,7 +31,6 @@ import com.alibaba.fluss.client.scanner.log.LogScanner; import com.alibaba.fluss.client.scanner.snapshot.SnapshotScan; import com.alibaba.fluss.client.scanner.snapshot.SnapshotScanner; -import com.alibaba.fluss.client.table.getter.PartitionGetter; import com.alibaba.fluss.client.table.writer.AppendWriter; import com.alibaba.fluss.client.table.writer.UpsertWrite; import com.alibaba.fluss.client.table.writer.UpsertWriter; @@ -37,14 +38,10 @@ import com.alibaba.fluss.client.token.DefaultSecurityTokenProvider; import com.alibaba.fluss.client.token.SecurityTokenManager; import com.alibaba.fluss.client.token.SecurityTokenProvider; -import com.alibaba.fluss.client.write.HashBucketAssigner; import com.alibaba.fluss.client.write.WriterClient; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.FlussRuntimeException; -import com.alibaba.fluss.exception.PartitionNotExistException; -import com.alibaba.fluss.metadata.PhysicalTablePath; -import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; @@ -61,7 +58,6 @@ import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.row.ProjectedRow; import com.alibaba.fluss.row.decode.RowDecoder; -import com.alibaba.fluss.row.encode.KeyEncoder; import com.alibaba.fluss.row.encode.ValueDecoder; import com.alibaba.fluss.rpc.GatewayClientProxy; import com.alibaba.fluss.rpc.RpcClient; @@ -71,11 +67,9 @@ import com.alibaba.fluss.rpc.messages.LimitScanResponse; import com.alibaba.fluss.rpc.metrics.ClientMetricGroup; import com.alibaba.fluss.rpc.protocol.ApiError; -import com.alibaba.fluss.types.DataField; import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.CloseableIterator; -import com.alibaba.fluss.utils.Preconditions; import javax.annotation.Nullable; @@ -98,22 +92,14 @@ public class FlussTable implements Table { private final Configuration conf; - private final long tableId; private final TablePath tablePath; private final RpcClient rpcClient; private final 
MetadataUpdater metadataUpdater; private final TableInfo tableInfo; private final boolean hasPrimaryKey; private final int numBuckets; - private final RowType keyRowType; // decode the lookup bytes to result row private final ValueDecoder kvValueDecoder; - // a getter to extract partition from key row, null when it's not a partitioned primary key - // table - private final @Nullable PartitionGetter keyRowPartitionGetter; - - private final KeyEncoder bucketKeyEncoder; - private final KeyEncoder primaryKeyEncoder; private final Supplier writerSupplier; private final Supplier lookupClientSupplier; @@ -124,8 +110,6 @@ public class FlussTable implements Table { private volatile RemoteFileDownloader remoteFileDownloader; private volatile SecurityTokenManager securityTokenManager; - private @Nullable LakeTableBucketAssigner lakeTableBucketAssigner; - public FlussTable( Configuration conf, TablePath tablePath, @@ -145,33 +129,16 @@ public FlussTable( metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath)); this.tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath); - this.tableId = tableInfo.getTableId(); TableDescriptor tableDescriptor = tableInfo.getTableDescriptor(); - Schema schema = tableDescriptor.getSchema(); - RowType rowType = schema.toRowType(); + RowType rowType = tableDescriptor.getSchema().toRowType(); this.hasPrimaryKey = tableDescriptor.hasPrimaryKey(); this.numBuckets = metadataUpdater.getBucketCount(tablePath); - this.keyRowType = getKeyRowType(schema, schema.getPrimaryKeyIndexes()); - this.keyRowPartitionGetter = - tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey() - ? new PartitionGetter(keyRowType, tableDescriptor.getPartitionKeys()) - : null; this.closed = new AtomicBoolean(false); this.kvValueDecoder = new ValueDecoder( RowDecoder.create( tableDescriptor.getKvFormat(), rowType.getChildren().toArray(new DataType[0]))); - - this.primaryKeyEncoder = - KeyEncoder.createKeyEncoder( - keyRowType, keyRowType.getFieldNames(), tableDescriptor.getPartitionKeys()); - int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes(); - if (bucketKeyIndexes.length != 0) { - this.bucketKeyEncoder = new KeyEncoder(getKeyRowType(schema, bucketKeyIndexes)); - } else { - this.bucketKeyEncoder = primaryKeyEncoder; - } } @Override @@ -179,74 +146,6 @@ public TableDescriptor getDescriptor() { return tableInfo.getTableDescriptor(); } - @Override - public CompletableFuture lookup(InternalRow key) { - if (!hasPrimaryKey) { - throw new FlussRuntimeException( - String.format("none-pk table %s not support lookup()", tablePath)); - } - // encoding the key row using a compacted way consisted with how the key is encoded when put - // a row - byte[] pkBytes = primaryKeyEncoder.encode(key); - byte[] bkBytes = bucketKeyEncoder.encode(key); - Long partitionId = keyRowPartitionGetter == null ? null : getPartitionId(key); - int bucketId = getBucketId(bkBytes, key); - TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); - return lookupClientSupplier - .get() - .lookup(tableBucket, pkBytes) - .thenApply( - valueBytes -> { - InternalRow row = - valueBytes == null - ? 
null - : kvValueDecoder.decodeValue(valueBytes).row; - return new LookupResult(row); - }); - } - - @Override - public CompletableFuture prefixLookup(InternalRow bucketKey) { - if (!hasPrimaryKey) { - throw new FlussRuntimeException( - String.format("None-pk table %s don't support prefix lookup", tablePath)); - } - // TODO: add checks the bucket key is prefix of primary key - - byte[] prefixKeyBytes = bucketKeyEncoder.encode(bucketKey); - int bucketId = getBucketId(prefixKeyBytes, bucketKey); - return lookupClientSupplier - .get() - .prefixLookup(tableId, bucketId, prefixKeyBytes) - .thenApply( - result -> { - List rowList = new ArrayList<>(); - for (byte[] valueBytes : result) { - rowList.add( - valueBytes == null - ? null - : kvValueDecoder.decodeValue(valueBytes).row); - } - return new PrefixLookupResult(rowList); - }); - } - - private int getBucketId(byte[] keyBytes, InternalRow key) { - if (!tableInfo.getTableDescriptor().isDataLakeEnabled()) { - return HashBucketAssigner.bucketForRowKey(keyBytes, numBuckets); - } else { - if (lakeTableBucketAssigner == null) { - lakeTableBucketAssigner = - new LakeTableBucketAssigner( - keyRowType, - tableInfo.getTableDescriptor().getBucketKey(), - numBuckets); - } - return lakeTableBucketAssigner.assignBucket( - keyBytes, key, metadataUpdater.getCluster()); - } - } - @Override public CompletableFuture> limitScan( TableBucket tableBucket, int limit, @Nullable int[] projectedFields) { @@ -361,28 +260,6 @@ private void addScanRecord( } } - /** - * Return the id of the partition the row belongs to. It'll try to update the metadata if the - * partition doesn't exist. If the partition doesn't exist yet after update metadata, it'll - * throw {@link PartitionNotExistException}. - */ - private Long getPartitionId(InternalRow row) { - Preconditions.checkNotNull(keyRowPartitionGetter, "partitionGetter shouldn't be null."); - String partitionName = keyRowPartitionGetter.getPartition(row); - PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); - metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath); - return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath); - } - - private RowType getKeyRowType(Schema schema, int[] keyIndexes) { - List keyRowFields = new ArrayList<>(keyIndexes.length); - List rowFields = schema.toRowType().getFields(); - for (int index : keyIndexes) { - keyRowFields.add(rowFields.get(index)); - } - return new RowType(keyRowFields); - } - @Override public AppendWriter getAppendWriter() { if (hasPrimaryKey) { @@ -439,6 +316,23 @@ public SnapshotScanner getSnapshotScanner(SnapshotScan snapshotScan) { snapshotScan); } + @Override + public Lookuper getLookuper() { + return new FlussLookuper( + tableInfo, numBuckets, metadataUpdater, lookupClientSupplier.get(), kvValueDecoder); + } + + @Override + public PrefixLookuper getPrefixLookuper(PrefixLookup prefixLookup) { + return new FlussPrefixLookuper( + tableInfo, + numBuckets, + metadataUpdater, + lookupClientSupplier.get(), + prefixLookup.getLookupColumnNames(), + kvValueDecoder); + } + @Override public void close() throws Exception { if (closed.compareAndSet(false, true)) { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java index 62fd8ef18..ac3f96b38 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java @@ -18,8 +18,9 @@ import 
com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.client.Connection; -import com.alibaba.fluss.client.lookup.LookupResult; -import com.alibaba.fluss.client.lookup.PrefixLookupResult; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -30,7 +31,6 @@ import com.alibaba.fluss.client.table.writer.UpsertWriter; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; -import com.alibaba.fluss.row.InternalRow; import javax.annotation.Nullable; @@ -57,35 +57,6 @@ public interface Table extends AutoCloseable { */ TableDescriptor getDescriptor(); - /** - * Lookups certain row from the given table primary keys. - * - * @param key the given table primary keys. - * @return the result of get. - */ - CompletableFuture lookup(InternalRow key); - - /** - * Prefix lookup certain rows from the given table by prefix key. - * - *

Only available for Primary Key Table. Will throw exception when the table isn't a Primary - * Key Table. - * - *

Note: Currently, if you want to use prefix lookup, the table you created must both define - * the primary key and the bucket key, in addition, the bucket key needs to be part of the - * primary key and must be a prefix of the primary key. For example, if a table has fields - * [a,b,c,d], and the primary key is set to [a, b, c], with the bucket key set to [a, b], then - * the prefix schema would also be [a, b]. This pattern can use PrefixLookup to lookup by prefix - * scan. - * - *

TODO: currently, the interface only support bucket key as the prefix key to lookup. - * Generalize the prefix lookup to support any prefix key including bucket key. - * - * @param bucketKey the given bucket key to do prefix lookup. - * @return the result of prefix lookup. - */ - CompletableFuture prefixLookup(InternalRow bucketKey); - /** * Extracts limit number of rows from the given table bucket. * @@ -135,4 +106,19 @@ CompletableFuture> limitScan( * @return the {@link SnapshotScanner} to scan data from this table. */ SnapshotScanner getSnapshotScanner(SnapshotScan snapshotScan); + + /** + * Get a {@link Lookuper} to do lookup. + * + * @return the {@link Lookuper} to do lookup. + */ + Lookuper getLookuper(); + + /** + * Get a {@link PrefixLookuper} to do prefix lookup. + * + * @param prefixLookup the given prefix lookup. + * @return the {@link PrefixLookuper} to do prefix lookup. + */ + PrefixLookuper getPrefixLookuper(PrefixLookup prefixLookup); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java index fe84f76e0..6a4452a12 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java @@ -42,8 +42,8 @@ public class UpsertWriter extends TableWriter { private final KeyEncoder primaryKeyEncoder; - // null if the bucket key is the same to the primary key - private final @Nullable KeyEncoder bucketKeyEncoder; + // same to primaryKeyEncoder if the bucket key is the same to the primary key + private final KeyEncoder bucketKeyEncoder; private final @Nullable int[] targetColumns; public UpsertWriter( @@ -66,7 +66,7 @@ public UpsertWriter( tableDescriptor.getPartitionKeys()); if (tableDescriptor.isDefaultBucketKey()) { - this.bucketKeyEncoder = null; + this.bucketKeyEncoder = primaryKeyEncoder; } else { int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes(); this.bucketKeyEncoder = new KeyEncoder(rowType, bucketKeyIndexes); @@ -121,7 +121,8 @@ private static void sanityCheck(Schema schema, @Nullable int[] targetColumns) { */ public CompletableFuture upsert(InternalRow row) { byte[] key = primaryKeyEncoder.encode(row); - byte[] bucketKey = bucketKeyEncoder != null ? bucketKeyEncoder.encode(row) : key; + byte[] bucketKey = + bucketKeyEncoder == primaryKeyEncoder ? key : bucketKeyEncoder.encode(row); return send( new WriteRecord( getPhysicalPath(row), WriteKind.PUT, key, bucketKey, row, targetColumns)); @@ -136,7 +137,8 @@ public CompletableFuture upsert(InternalRow row) { */ public CompletableFuture delete(InternalRow row) { byte[] key = primaryKeyEncoder.encode(row); - byte[] bucketKey = bucketKeyEncoder != null ? bucketKeyEncoder.encode(row) : key; + byte[] bucketKey = + bucketKeyEncoder == primaryKeyEncoder ? 
key : bucketKeyEncoder.encode(row); return send( new WriteRecord( getPhysicalPath(row), diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java index e4d381dbf..f2e1d765d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java @@ -16,8 +16,16 @@ package com.alibaba.fluss.client.utils; +import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; +import com.alibaba.fluss.client.metadata.MetadataUpdater; +import com.alibaba.fluss.client.table.getter.PartitionGetter; +import com.alibaba.fluss.client.write.HashBucketAssigner; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.exception.IllegalConfigurationException; +import com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.InternalRow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +36,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + /** Utils for Fluss Client. */ public final class ClientUtils { @@ -105,4 +115,36 @@ public static Integer getPort(String address) { Matcher matcher = HOST_PORT_PATTERN.matcher(address); return matcher.matches() ? Integer.parseInt(matcher.group(2)) : null; } + + /** + * Return the id of the partition the row belongs to. It'll try to update the metadata if the + * partition doesn't exist. If the partition doesn't exist yet after update metadata, it'll + * throw {@link PartitionNotExistException}. 
+ */ + public static Long getPartitionId( + InternalRow row, + PartitionGetter partitionGetter, + TablePath tablePath, + MetadataUpdater metadataUpdater) { + checkNotNull(partitionGetter, "partitionGetter shouldn't be null."); + String partitionName = partitionGetter.getPartition(row); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); + metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath); + return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath); + } + + public static int getBucketId( + byte[] keyBytes, + InternalRow key, + LakeTableBucketAssigner lakeTableBucketAssigner, + boolean isDataLakeEnable, + int numBuckets, + MetadataUpdater metadataUpdater) { + if (!isDataLakeEnable) { + return HashBucketAssigner.bucketForRowKey(keyBytes, numBuckets); + } else { + return lakeTableBucketAssigner.assignBucket( + keyBytes, key, metadataUpdater.getCluster()); + } + } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java index 3de534c53..84096cce9 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java @@ -20,19 +20,23 @@ import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.admin.OffsetSpec.LatestSpec; import com.alibaba.fluss.client.admin.OffsetSpec.TimestampSpec; +import com.alibaba.fluss.client.lookup.Lookuper; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; import com.alibaba.fluss.client.scanner.log.ScanRecords; import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.writer.UpsertWriter; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.MemorySize; import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.indexed.IndexedRow; import com.alibaba.fluss.server.testutils.FlussClusterExtension; import com.alibaba.fluss.types.RowType; @@ -48,6 +52,8 @@ import java.util.List; import java.util.Map; +import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; +import static com.alibaba.fluss.testutils.DataTestUtils.keyRow; import static com.alibaba.fluss.testutils.InternalRowAssert.assertThatRow; import static org.assertj.core.api.Assertions.assertThat; @@ -237,4 +243,23 @@ protected static void verifyRows( } } } + + protected static void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) + throws Exception { + // put data. + InternalRow row = compactedRow(tableSchema.toRowType(), fields); + UpsertWriter upsertWriter = table.getUpsertWriter(); + // put data. + upsertWriter.upsert(row); + upsertWriter.flush(); + // lookup this key. + Lookuper lookuper = table.getLookuper(); + IndexedRow keyRow = keyRow(tableSchema, fields); + assertThat(lookupRow(lookuper, keyRow)).isEqualTo(row); + } + + protected static InternalRow lookupRow(Lookuper lookuper, IndexedRow keyRow) throws Exception { + // lookup this key. 
+ return lookuper.lookup(keyRow).get().getRow(); + } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java index 93fc48ee5..04d7299a8 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java @@ -59,7 +59,7 @@ void testDrainMaxBatchSize() throws Exception { private static void appendLookups(LookupQueue queue, int count) { for (int i = 0; i < count; i++) { - queue.appendLookup(new Lookup(new TableBucket(1, 1), new byte[] {0})); + queue.appendLookup(new LookupQuery(new TableBucket(1, 1), new byte[] {0})); } } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java index 09e16589d..c1ddbf3a4 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java @@ -17,6 +17,10 @@ package com.alibaba.fluss.client.table; import com.alibaba.fluss.client.admin.ClientToServerITCaseBase; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; +import com.alibaba.fluss.client.lookup.PrefixLookupResult; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -33,17 +37,23 @@ import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.types.RowType; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static com.alibaba.fluss.testutils.DataTestUtils.keyRow; import static com.alibaba.fluss.testutils.DataTestUtils.row; @@ -75,13 +85,14 @@ void testPartitionedPrimaryKeyTable() throws Exception { } upsertWriter.flush(); + Lookuper lookuper = table.getLookuper(); // now, let's lookup the written data by look up for (String partition : partitionIdByNames.keySet()) { for (int i = 0; i < recordsPerPartition; i++) { InternalRow actualRow = compactedRow(schema.toRowType(), new Object[] {i, "a" + i, partition}); InternalRow lookupRow = - table.lookup(keyRow(schema, new Object[] {i, null, partition})) + lookuper.lookup(keyRow(schema, new Object[] {i, null, partition})) .get() .getRow(); assertThat(lookupRow).isEqualTo(actualRow); @@ -92,6 +103,105 @@ void testPartitionedPrimaryKeyTable() throws Exception { verifyPartitionLogs(table, schema.toRowType(), expectPutRows); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testPartitionedTablePrefixLookup(boolean isDataLakeEnabled) throws Exception { + // This case 
tests the partition key 'b' appearing in both the pk and the prefix lookup key.
+        TablePath tablePath = TablePath.of("test_db_1", "test_partitioned_table_prefix_lookup");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.BIGINT())
+                        .column("d", DataTypes.STRING())
+                        .primaryKey("a", "b", "c")
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(3, "a", "c")
+                        .partitionedBy("b")
+                        .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
+                        .property(
+                                ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
+                                AutoPartitionTimeUnit.YEAR)
+                        // test the data lake bucket assigner for prefix lookup
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, isDataLakeEnabled)
+                        .build();
+        RowType rowType = schema.toRowType();
+        createTable(tablePath, descriptor, false);
+        Map<String, Long> partitionIdByNames =
+                FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
+
+        Table table = conn.getTable(tablePath);
+        for (String partition : partitionIdByNames.keySet()) {
+            verifyPutAndLookup(table, schema, new Object[] {1, partition, 1L, "value1"});
+            verifyPutAndLookup(table, schema, new Object[] {1, partition, 2L, "value2"});
+        }
+
+        for (int i = 0; i < 3; i++) {
+            // test prefix lookups with the partition field (b) in different
+            // positions of the lookup columns
+            List<String> lookupColumns =
+                    i == 0
+                            ? Arrays.asList("b", "a", "c")
+                            : i == 1 ? Arrays.asList("a", "b", "c") : Arrays.asList("a", "c", "b");
+            RowType prefixKeyRowType = rowType.project(schema.getColumnIndexes(lookupColumns));
+            PrefixLookuper prefixLookuper =
+                    table.getPrefixLookuper(new PrefixLookup(lookupColumns));
+            for (String partition : partitionIdByNames.keySet()) {
+                Object[] lookupRow =
+                        i == 0
+                                ? new Object[] {partition, 1, 1L}
+                                : i == 1
+                                        ? new Object[] {1, partition, 1L}
+                                        : new Object[] {1, 1L, partition};
+                CompletableFuture<PrefixLookupResult> result =
+                        prefixLookuper.prefixLookup(compactedRow(prefixKeyRowType, lookupRow));
+                PrefixLookupResult prefixLookupResult = result.get();
+                assertThat(prefixLookupResult).isNotNull();
+                List<InternalRow> rowList = prefixLookupResult.getRowList();
+                assertThat(rowList.size()).isEqualTo(1);
+                assertRowValueEquals(
+                        rowType, rowList.get(0), new Object[] {1, partition, 1L, "value1"});
+            }
+        }
+    }
+
+    @Test
+    void testInvalidPrefixLookupForPartitionedTable() throws Exception {
+        // This case has the partition key 'c' only in the pk, not in the prefix key.
+        TablePath tablePath = TablePath.of("test_db_1", "test_partitioned_table_prefix_lookup2");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.BIGINT())
+                        .column("c", DataTypes.STRING())
+                        .column("d", DataTypes.STRING())
+                        .primaryKey("a", "b", "c")
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(3, "a", "b")
+                        .partitionedBy("c")
+                        .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
+                        .property(
+                                ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
+                                AutoPartitionTimeUnit.YEAR)
+                        .build();
+        createTable(tablePath, descriptor, false);
+
+        Table table = conn.getTable(tablePath);
+
+        // test prefix lookup with (a, b).
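+        // (a, b) covers the bucket keys but omits the partition field 'c', so the
+        // client could not route such a lookup to a partition; creating the
+        // PrefixLookuper fails fast with the IllegalArgumentException asserted below.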
+ assertThatThrownBy(() -> table.getPrefixLookuper(new PrefixLookup(Arrays.asList("a", "b")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Can not perform prefix lookup on table 'test_db_1.test_partitioned_table_prefix_lookup2', " + + "because the lookup columns [a, b] must contain all partition fields [c]."); + } + @Test void testPartitionedLogTable() throws Exception { Schema schema = createPartitionedTable(DATA1_TABLE_PATH, false); @@ -196,11 +306,12 @@ private Map> pollRecords( void testOperateNotExistPartitionShouldThrowException() throws Exception { Schema schema = createPartitionedTable(DATA1_TABLE_PATH_PK, true); Table table = conn.getTable(DATA1_TABLE_PATH_PK); + Lookuper lookuper = table.getLookuper(); // test get for a not exist partition assertThatThrownBy( () -> - table.lookup( + lookuper.lookup( keyRow( schema, new Object[] { diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index e4cd9ca9c..ffd56890e 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -19,7 +19,10 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.admin.ClientToServerITCaseBase; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.lookup.PrefixLookup; import com.alibaba.fluss.client.lookup.PrefixLookupResult; +import com.alibaba.fluss.client.lookup.PrefixLookuper; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -60,6 +63,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -247,9 +251,11 @@ void testPutAndPrefixLookup() throws Exception { .column("a", DataTypes.INT()) .column("b", DataTypes.STRING()) .build(); + RowType prefixKeyRowType = prefixKeySchema.toRowType(); + PrefixLookuper prefixLookuper = + table.getPrefixLookuper(new PrefixLookup(prefixKeyRowType.getFieldNames())); CompletableFuture result = - table.prefixLookup( - compactedRow(prefixKeySchema.toRowType(), new Object[] {1, "a"})); + prefixLookuper.prefixLookup(compactedRow(prefixKeyRowType, new Object[] {1, "a"})); PrefixLookupResult prefixLookupResult = result.get(); assertThat(prefixLookupResult).isNotNull(); List rowList = prefixLookupResult.getRowList(); @@ -259,24 +265,76 @@ void testPutAndPrefixLookup() throws Exception { rowType, rowList.get(i), new Object[] {1, "a", i + 1L, "value" + (i + 1)}); } - result = - table.prefixLookup( - compactedRow(prefixKeySchema.toRowType(), new Object[] {2, "a"})); + result = prefixLookuper.prefixLookup(compactedRow(prefixKeyRowType, new Object[] {2, "a"})); prefixLookupResult = result.get(); assertThat(prefixLookupResult).isNotNull(); rowList = prefixLookupResult.getRowList(); assertThat(rowList.size()).isEqualTo(1); assertRowValueEquals(rowType, rowList.get(0), new Object[] {2, "a", 4L, "value4"}); - result = - table.prefixLookup( - compactedRow(prefixKeySchema.toRowType(), new Object[] {3, "a"})); + result = prefixLookuper.prefixLookup(compactedRow(prefixKeyRowType, new Object[] {3, "a"})); prefixLookupResult = result.get(); assertThat(prefixLookupResult).isNotNull(); 
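         // a prefix with no matching rows still completes normally: the result is
         // non-null and carries an empty row list rather than null or an exception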
rowList = prefixLookupResult.getRowList(); assertThat(rowList.size()).isEqualTo(0); } + @Test + void testInvalidPrefixLookup() throws Exception { + // First, test the bucket keys not a prefix subset of primary keys. + TablePath tablePath = TablePath.of("test_db_1", "test_invalid_prefix_lookup_1"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.BIGINT()) + .column("c", DataTypes.STRING()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + TableDescriptor descriptor = + TableDescriptor.builder().schema(schema).distributedBy(3, "a", "c").build(); + createTable(tablePath, descriptor, false); + Table table = conn.getTable(tablePath); + + assertThatThrownBy(() -> table.getPrefixLookuper(new PrefixLookup(Arrays.asList("a", "c")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Can not perform prefix lookup on table 'test_db_1.test_invalid_prefix_lookup_1', " + + "because the bucket keys [a, c] is not a prefix subset of the " + + "physical primary keys [a, b, c] (excluded partition fields if present)."); + + // Second, test the lookup column names in PrefixLookup not a subset of primary keys. + tablePath = TablePath.of("test_db_1", "test_invalid_prefix_lookup_2"); + schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + + descriptor = TableDescriptor.builder().schema(schema).distributedBy(3, "a", "b").build(); + createTable(tablePath, descriptor, true); + Table table2 = conn.getTable(tablePath); + + // not match bucket key + assertThatThrownBy( + () -> table2.getPrefixLookuper(new PrefixLookup(Arrays.asList("a", "d")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Can not perform prefix lookup on table 'test_db_1.test_invalid_prefix_lookup_2', " + + "because the lookup columns [a, d] must contain all bucket keys [a, b] in order."); + + // wrong bucket key order + assertThatThrownBy( + () -> table2.getPrefixLookuper(new PrefixLookup(Arrays.asList("b", "a")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Can not perform prefix lookup on table 'test_db_1.test_invalid_prefix_lookup_2', " + + "because the lookup columns [b, a] must contain all bucket keys [a, b] in order."); + } + @Test void testLookupForNotReadyTable() throws Exception { TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1"); @@ -288,7 +346,8 @@ void testLookupForNotReadyTable() throws Exception { // if you want to test the lookup for not ready table, you can comment the following line. waitAllReplicasReady(tableId, descriptor); Table table = conn.getTable(tablePath); - assertThat(lookupRow(table, rowKey)).isNull(); + Lookuper lookuper = table.getLookuper(); + assertThat(lookupRow(lookuper, rowKey)).isNull(); } @Test @@ -397,23 +456,6 @@ void testLimitScanLogTable() throws Exception { } } - void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) throws Exception { - // put data. - InternalRow row = compactedRow(tableSchema.toRowType(), fields); - UpsertWriter upsertWriter = table.getUpsertWriter(); - // put data. - upsertWriter.upsert(row); - upsertWriter.flush(); - // lookup this key. 
- IndexedRow keyRow = keyRow(tableSchema, fields); - assertThat(lookupRow(table, keyRow)).isEqualTo(row); - } - - private InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception { - // lookup this key. - return table.lookup(keyRow).get().getRow(); - } - @Test void testPartialPutAndDelete() throws Exception { Schema schema = @@ -439,10 +481,11 @@ void testPartialPutAndDelete() throws Exception { upsertWriter .upsert(compactedRow(schema.toRowType(), new Object[] {1, "aaa", null, null})) .get(); + Lookuper lookuper = table.getLookuper(); // check the row IndexedRow rowKey = row(pkRowType, new Object[] {1}); - assertThat(lookupRow(table, rowKey)) + assertThat(lookupRow(lookuper, rowKey)) .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "aaa", 1, true})); // partial update columns columns: a,b,c @@ -453,14 +496,14 @@ void testPartialPutAndDelete() throws Exception { .get(); // lookup the row - assertThat(lookupRow(table, rowKey)) + assertThat(lookupRow(lookuper, rowKey)) .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, true})); // test partial delete, target column is a,b,c upsertWriter .delete(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, null})) .get(); - assertThat(lookupRow(table, rowKey)) + assertThat(lookupRow(lookuper, rowKey)) .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, null, null, true})); // partial delete, target column is d @@ -471,7 +514,7 @@ void testPartialPutAndDelete() throws Exception { .get(); // the row should be deleted, shouldn't get the row again - assertThat(lookupRow(table, rowKey)).isNull(); + assertThat(lookupRow(lookuper, rowKey)).isNull(); table.close(); } @@ -524,15 +567,16 @@ void testDelete() throws Exception { try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) { UpsertWriter upsertWriter = table.getUpsertWriter(); upsertWriter.upsert(row).get(); + Lookuper lookuper = table.getLookuper(); // lookup this key. IndexedRow keyRow = keyRow(DATA1_SCHEMA_PK, new Object[] {1, "a"}); - assertThat(lookupRow(table, keyRow)).isEqualTo(row); + assertThat(lookupRow(lookuper, keyRow)).isEqualTo(row); // delete this key. upsertWriter.delete(row).get(); // lookup this key again, will return null. 
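             // (the delete removed the kv entry, so LookupResult.getRow() resolves
             // to null for the same key)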
-            assertThat(lookupRow(table, keyRow)).isNull();
+            assertThat(lookupRow(lookuper, keyRow)).isNull();
         }
     }
@@ -889,6 +933,9 @@ void testFirstRowMergeEngine() throws Exception {
                        .build();
        RowType rowType = DATA1_SCHEMA_PK.toRowType();
        createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
+
+        RowType lookupRowType = RowType.of(DataTypes.INT());
+
        int rows = 5;
        int duplicateNum = 3;
        try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
@@ -903,12 +950,11 @@ void testFirstRowMergeEngine() throws Exception {
            }
            upsertWriter.flush();

+            Lookuper lookuper = table.getLookuper();
            // now, get rows by lookup
            for (int id = 0; id < rows; id++) {
                InternalRow gotRow =
-                        table.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {id, "dumpy"}))
-                                .get()
-                                .getRow();
+                        lookuper.lookup(row(lookupRowType, new Object[] {id})).get().getRow();
                assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
            }
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java
index 4e60ae249..35f143f76 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/Schema.java
@@ -125,6 +125,15 @@ public List<String> getColumnNames(int[] columnIndexes) {
         return columnNames;
     }

+    /** Returns the indexes of the given fields in the schema. */
+    public int[] getColumnIndexes(List<String> keyNames) {
+        int[] keyIndexes = new int[keyNames.size()];
+        for (int i = 0; i < keyNames.size(); i++) {
+            keyIndexes[i] = rowType.getFieldIndex(keyNames.get(i));
+        }
+        return keyIndexes;
+    }
+
     @Override
     public String toString() {
         final List<Object> components = new ArrayList<>(columns);
@@ -491,4 +500,13 @@ private static Set<String> duplicate(List<String> names) {
                 .filter(name -> Collections.frequency(names, name) > 1)
                 .collect(Collectors.toSet());
     }
+
+    /** Returns a {@link RowType} composed of the schema fields at the given key indexes. */
+    public static RowType getKeyRowType(Schema schema, int[] keyIndexes) {
+        List<DataField> keyRowFields = new ArrayList<>(keyIndexes.length);
+        List<DataField> rowFields = schema.toRowType().getFields();
+        for (int index : keyIndexes) {
+            keyRowFields.add(rowFields.get(index));
+        }
+        return new RowType(keyRowFields);
+    }
 }
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
index 161d89013..6c1abe631 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
@@ -23,7 +23,6 @@
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.config.ConfigurationUtils;
-import com.alibaba.fluss.types.RowType;
 import com.alibaba.fluss.utils.AutoPartitionStrategy;
 import com.alibaba.fluss.utils.Preconditions;
 import com.alibaba.fluss.utils.json.JsonSerdeUtils;
@@ -113,8 +112,8 @@ private TableDescriptor(
                                                 f));
         }

-        if (tableDistribution != null) {
-            tableDistribution
+        if (this.tableDistribution != null) {
+            this.tableDistribution
                     .getBucketKeys()
                     .forEach(
                             f ->
@@ -187,13 +186,7 @@ public List<String> getBucketKey() {
      * Returns the indexes of the bucket key fields in the schema, empty if no bucket key is set.
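      *
      * <p>For example, for a schema (a, b, c) distributed by bucket keys (c, a), this returns
      * {@code [2, 0]}: one index per bucket key, in bucket key order.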
*/ public int[] getBucketKeyIndexes() { - List bucketKey = getBucketKey(); - RowType rowType = schema.toRowType(); - int[] bucketKeyIndex = new int[bucketKey.size()]; - for (int i = 0; i < bucketKey.size(); i++) { - bucketKeyIndex[i] = rowType.getFieldIndex(bucketKey.get(i)); - } - return bucketKeyIndex; + return schema.getColumnIndexes(getBucketKey()); } /** diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java index ae53093d7..461306b67 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -18,9 +18,6 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; -import com.alibaba.fluss.client.lookup.LookupResult; -import com.alibaba.fluss.client.lookup.LookupType; -import com.alibaba.fluss.client.lookup.PrefixLookupResult; import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.connector.flink.source.lookup.LookupNormalizer.RemainingFilter; @@ -61,12 +58,12 @@ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { private final RowType flinkRowType; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; - private final LookupType flussLookupType; private transient FlinkRowToFlussRowConverter flinkRowToFlussRowConverter; private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter; private transient Connection connection; private transient Table table; + private transient UnifiedLookuper lookuper; public FlinkAsyncLookupFunction( Configuration flussConfig, @@ -81,7 +78,6 @@ public FlinkAsyncLookupFunction( this.flinkRowType = flinkRowType; this.lookupNormalizer = lookupNormalizer; this.projection = projection; - this.flussLookupType = lookupNormalizer.getLookupType(); } @Override @@ -91,10 +87,10 @@ public void open(FunctionContext context) { table = connection.getTable(tablePath); // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); + RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes); flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes), - table.getDescriptor().getKvFormat()); + lookupKeyRowType, table.getDescriptor().getKvFormat()); final RowType outputRowType; if (projection == null) { @@ -104,6 +100,8 @@ public void open(FunctionContext context) { } flussRowToFlinkRowConverter = new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType)); + lookuper = UnifiedLookuper.of(lookupNormalizer.getLookupType(), lookupKeyRowType, table); + LOG.info("end open."); } @@ -136,38 +134,20 @@ private void fetchResult( int currentRetry, InternalRow keyRow, @Nullable RemainingFilter remainingFilter) { - if (flussLookupType == LookupType.LOOKUP) { - table.lookup(keyRow) - .whenComplete( - (result, throwable) -> { - if (throwable != null) { - handleLookupFailed( - resultFuture, - throwable, - currentRetry, - keyRow, - remainingFilter); - } else { - handleLookupSuccess(resultFuture, result, 
remainingFilter); - } - }); - } else { - table.prefixLookup(keyRow) - .whenComplete( - (result, throwable) -> { - if (throwable != null) { - handleLookupFailed( - resultFuture, - throwable, - currentRetry, - keyRow, - remainingFilter); - } else { - handlePrefixLookupSuccess( - resultFuture, result, remainingFilter); - } - }); - } + lookuper.lookup(keyRow) + .whenComplete( + (result, throwable) -> { + if (throwable != null) { + handleLookupFailed( + resultFuture, + throwable, + currentRetry, + keyRow, + remainingFilter); + } else { + handleLookupSuccess(resultFuture, result, remainingFilter); + } + }); } private void handleLookupFailed( @@ -201,27 +181,15 @@ private void handleLookupFailed( private void handleLookupSuccess( CompletableFuture> resultFuture, - LookupResult result, + List lookupResult, @Nullable RemainingFilter remainingFilter) { - InternalRow row = result.getRow(); - if (row == null) { + if (lookupResult.isEmpty()) { resultFuture.complete(Collections.emptyList()); - } else { - RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); - if (remainingFilter != null && !remainingFilter.isMatch(flinkRow)) { - resultFuture.complete(Collections.emptyList()); - } else { - resultFuture.complete(Collections.singletonList(flinkRow)); - } + return; } - } - private void handlePrefixLookupSuccess( - CompletableFuture> resultFuture, - PrefixLookupResult result, - @Nullable RemainingFilter remainingFilter) { List projectedRow = new ArrayList<>(); - for (InternalRow row : result.getRowList()) { + for (InternalRow row : lookupResult) { if (row != null) { RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java index 20788e53d..f63b1ad42 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java @@ -18,7 +18,6 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; -import com.alibaba.fluss.client.lookup.LookupType; import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.connector.flink.utils.FlinkConversions; @@ -55,12 +54,12 @@ public class FlinkLookupFunction extends LookupFunction { private final RowType flinkRowType; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; - private final LookupType flussLookupType; private transient FlinkRowToFlussRowConverter flinkRowToFlussRowConverter; private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter; private transient Connection connection; private transient Table table; + private transient UnifiedLookuper lookuper; @Nullable private transient ProjectedRow projectedRow; public FlinkLookupFunction( @@ -76,7 +75,6 @@ public FlinkLookupFunction( this.flinkRowType = flinkRowType; this.lookupNormalizer = lookupNormalizer; this.projection = projection; - this.flussLookupType = lookupNormalizer.getLookupType(); } @Override @@ -86,10 +84,10 @@ public void open(FunctionContext context) { table = 
connection.getTable(tablePath); // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); + RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes); flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes), - table.getDescriptor().getKvFormat()); + lookupKeyRowType, table.getDescriptor().getKvFormat()); final RowType outputRowType; if (projection == null) { @@ -102,6 +100,7 @@ public void open(FunctionContext context) { } flussRowToFlinkRowConverter = new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType)); + lookuper = UnifiedLookuper.of(lookupNormalizer.getLookupType(), lookupKeyRowType, table); LOG.info("end open."); } @@ -124,32 +123,21 @@ public Collection lookup(RowData keyRow) { InternalRow flussKeyRow = flinkRowToFlussRowConverter.toInternalRow(normalizedKeyRow); for (int retry = 0; retry <= maxRetryTimes; retry++) { try { - if (flussLookupType == LookupType.LOOKUP) { - InternalRow row = table.lookup(flussKeyRow).get().getRow(); + List lookupRows = lookuper.lookup(flussKeyRow).get(); + if (lookupRows.isEmpty()) { + return Collections.emptyList(); + } + List projectedRows = new ArrayList<>(); + for (InternalRow row : lookupRows) { if (row != null) { RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { - return Collections.singletonList(flinkRow); - } else { - return Collections.emptyList(); - } - } - } else { - List projectedRows = new ArrayList<>(); - List lookupRows = - table.prefixLookup(flussKeyRow).get().getRowList(); - for (InternalRow row : lookupRows) { - if (row != null) { - RowData flinkRow = - flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); - if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { - projectedRows.add(flinkRow); - } + projectedRows.add(flinkRow); } } - return projectedRows; } + return projectedRows; } catch (Exception e) { LOG.error(String.format("Fluss lookup error, retry times = %d", retry), e); if (retry >= maxRetryTimes) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/UnifiedLookuper.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/UnifiedLookuper.java new file mode 100644 index 000000000..a67404441 --- /dev/null +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/UnifiedLookuper.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.alibaba.fluss.connector.flink.source.lookup;
+
+import com.alibaba.fluss.client.lookup.LookupType;
+import com.alibaba.fluss.client.lookup.Lookuper;
+import com.alibaba.fluss.client.lookup.PrefixLookup;
+import com.alibaba.fluss.client.lookup.PrefixLookupResult;
+import com.alibaba.fluss.client.lookup.PrefixLookuper;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A unified view of the Fluss lookupers to simplify the implementation of the Flink lookup
+ * connector.
+ */
+public abstract class UnifiedLookuper {
+
+    /** Looks up the matched rows for the given key. */
+    public abstract CompletableFuture<List<InternalRow>> lookup(InternalRow key);
+
+    /** Creates a unified lookup function that looks up rows by primary key or prefix key. */
+    public static UnifiedLookuper of(
+            LookupType lookupType, RowType lookupKeyRowType, Table flussTable) {
+        if (lookupType == LookupType.LOOKUP) {
+            return new PrimaryKeyLookuper(flussTable.getLookuper());
+        } else {
+            PrefixLookuper prefixLookuper =
+                    flussTable.getPrefixLookuper(
+                            new PrefixLookup(lookupKeyRowType.getFieldNames()));
+            return new PrefixKeyLookuper(prefixLookuper);
+        }
+    }
+
+    /** An implementation that looks up by the primary key of the table. */
+    private static class PrimaryKeyLookuper extends UnifiedLookuper {
+
+        private final Lookuper lookuper;
+
+        private PrimaryKeyLookuper(Lookuper lookuper) {
+            this.lookuper = lookuper;
+        }
+
+        @Override
+        public CompletableFuture<List<InternalRow>> lookup(InternalRow key) {
+            return lookuper.lookup(key)
+                    .thenApply(
+                            result ->
+                                    result == null
+                                            ? Collections.emptyList()
+                                            : Collections.singletonList(result.getRow()));
+        }
+    }
+
+    /** An implementation that looks up by a prefix of the primary key of the table.
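+     *
+     * <p>A prefix key may match several rows, so the future completes with the whole row list of
+     * the {@code PrefixLookupResult} instead of a single nullable row.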
+     */
+    private static class PrefixKeyLookuper extends UnifiedLookuper {
+
+        private final PrefixLookuper lookuper;
+
+        private PrefixKeyLookuper(PrefixLookuper lookuper) {
+            this.lookuper = lookuper;
+        }
+
+        @Override
+        public CompletableFuture<List<InternalRow>> lookup(InternalRow key) {
+            return lookuper.prefixLookup(key).thenApply(PrefixLookupResult::getRowList);
+        }
+    }
+}
diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java
index 34450ba64..064a74d46 100644
--- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java
+++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java
@@ -908,6 +908,27 @@ void testPrefixLookup(Caching caching, boolean async) throws Exception {
         assertResultsIgnoreOrder(collected, expected, true);
     }

+    @ParameterizedTest
+    @MethodSource("lookupArgs")
+    void testPrefixLookupPartitionedTable(Caching caching, boolean async) throws Exception {
+        String dim =
+                prepareDimTableAndSourceTable(
+                        caching,
+                        async,
+                        new String[] {"name", "id"},
+                        new String[] {"name"},
+                        "p_date");
+        String dimJoinQuery =
+                String.format(
+                        "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h"
+                                + " ON src.b = h.name AND src.p_date = h.p_date",
+                        dim);
+
+        CloseableIterator<Row> collected = tEnv.executeSql(dimJoinQuery).collect();
+        List<String> expected = Arrays.asList("+I[1, name1, address1]", "+I[1, name1, address5]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
     @ParameterizedTest
     @MethodSource("lookupArgs")
     void testPrefixLookupWithCondition(Caching caching, boolean async) throws Exception {