Skip to content

Commit

Permalink
[kv] Avoid duplicate bucket key encoding for Lookup and PrefixLookup (#…
Browse files Browse the repository at this point in the history
…317)

This also makes Lookuper and PrefixLookuper initialize and validate themselves. This makes the logic much cleaner and avoids mis-reuse of some variables (e.g., lakeTableBucketAssigner).

This also introduces a UnifiedLookuper in the Flink connector to unify the different APIs of Lookuper and PrefixLookup. This makes the logic of AsyncLookupFunction and LookupFunction simpler.
  • Loading branch information
wuchong committed Jan 26, 2025
1 parent 02b547e commit 55d6e98
Show file tree
Hide file tree
Showing 15 changed files with 337 additions and 412 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ public Table getTable(TablePath tablePath) {
tablePath,
rpcClient,
metadataUpdater,
this::maybeCreateWriter,
this::maybeCreateLookupClient,
this::getOrCreateWriterClient,
this::getOrCreateLookupClient,
clientMetricGroup);
}

private WriterClient maybeCreateWriter() {
public WriterClient getOrCreateWriterClient() {
if (writerClient == null) {
synchronized (this) {
if (writerClient == null) {
Expand All @@ -94,7 +94,7 @@ private WriterClient maybeCreateWriter() {
return writerClient;
}

private LookupClient maybeCreateLookupClient() {
public LookupClient getOrCreateLookupClient() {
if (lookupClient == null) {
synchronized (this) {
if (lookupClient == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@
import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner;
import com.alibaba.fluss.client.metadata.MetadataUpdater;
import com.alibaba.fluss.client.table.getter.PartitionGetter;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.encode.KeyEncoder;
import com.alibaba.fluss.row.encode.ValueDecoder;
import com.alibaba.fluss.types.RowType;

import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

import static com.alibaba.fluss.client.utils.ClientUtils.getBucketId;
import static com.alibaba.fluss.client.utils.ClientUtils.getPartitionId;
import static com.alibaba.fluss.utils.Preconditions.checkArgument;

/**
* The default impl of {@link Lookuper}.
Expand All @@ -49,7 +53,10 @@ public class FlussLookuper implements Lookuper {

private final KeyEncoder primaryKeyEncoder;

/** Extract bucket key from lookup key row. */
/**
* Extracts the bucket key from the lookup key row; this is {@link #primaryKeyEncoder} itself
* when the table uses the default bucket key (bucket key = physical primary key).
*/
private final KeyEncoder bucketKeyEncoder;

private final boolean isDataLakeEnable;
Expand All @@ -69,20 +76,39 @@ public FlussLookuper(
int numBuckets,
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
KeyEncoder primaryKeyEncoder,
KeyEncoder bucketKeyEncoder,
LakeTableBucketAssigner lakeTableBucketAssigner,
@Nullable PartitionGetter partitionGetter,
ValueDecoder kvValueDecoder) {
checkArgument(
tableInfo.getTableDescriptor().hasPrimaryKey(),
"Log table %s doesn't support lookup",
tableInfo.getTablePath());
this.tableInfo = tableInfo;
this.numBuckets = numBuckets;
this.metadataUpdater = metadataUpdater;
this.lookupClient = lookupClient;
this.primaryKeyEncoder = primaryKeyEncoder;
this.bucketKeyEncoder = bucketKeyEncoder;

TableDescriptor tableDescriptor = tableInfo.getTableDescriptor();
Schema schema = tableDescriptor.getSchema();
RowType primaryKeyRowType = schema.toRowType().project(schema.getPrimaryKeyIndexes());
this.primaryKeyEncoder =
KeyEncoder.createKeyEncoder(
primaryKeyRowType,
primaryKeyRowType.getFieldNames(),
tableDescriptor.getPartitionKeys());
if (tableDescriptor.isDefaultBucketKey()) {
this.bucketKeyEncoder = primaryKeyEncoder;
} else {
// bucket key doesn't contain partition key, so no need to exclude partition keys
this.bucketKeyEncoder =
new KeyEncoder(primaryKeyRowType, tableDescriptor.getBucketKeyIndexes());
}
this.isDataLakeEnable = tableInfo.getTableDescriptor().isDataLakeEnabled();
this.lakeTableBucketAssigner = lakeTableBucketAssigner;
this.partitionGetter = partitionGetter;
this.lakeTableBucketAssigner =
new LakeTableBucketAssigner(
primaryKeyRowType, tableDescriptor.getBucketKey(), numBuckets);
this.partitionGetter =
tableDescriptor.isPartitioned()
? new PartitionGetter(primaryKeyRowType, tableDescriptor.getPartitionKeys())
: null;
this.kvValueDecoder = kvValueDecoder;
}

Expand All @@ -91,7 +117,10 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
// encode the key row in a compacted way consistent with how the key is encoded when putting
// a row
byte[] pkBytes = primaryKeyEncoder.encode(lookupKey);
byte[] bkBytes = bucketKeyEncoder.encode(lookupKey);
byte[] bkBytes =
bucketKeyEncoder == primaryKeyEncoder
? pkBytes
: bucketKeyEncoder.encode(lookupKey);
Long partitionId =
partitionGetter == null
? null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@
import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner;
import com.alibaba.fluss.client.metadata.MetadataUpdater;
import com.alibaba.fluss.client.table.getter.PartitionGetter;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.encode.KeyEncoder;
import com.alibaba.fluss.row.encode.ValueDecoder;
import com.alibaba.fluss.types.RowType;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static com.alibaba.fluss.client.utils.ClientUtils.getBucketId;
Expand Down Expand Up @@ -71,27 +76,91 @@ public FlussPrefixLookuper(
int numBuckets,
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
KeyEncoder bucketKeyEncoder,
LakeTableBucketAssigner lakeTableBucketAssigner,
@Nullable PartitionGetter partitionGetter,
List<String> lookupColumnNames,
ValueDecoder kvValueDecoder) {
// sanity check
validatePrefixLookup(tableInfo, lookupColumnNames);
// initialization
this.tableInfo = tableInfo;
this.numBuckets = numBuckets;
this.metadataUpdater = metadataUpdater;
this.lookupClient = lookupClient;
TableDescriptor tableDescriptor = tableInfo.getTableDescriptor();
Schema schema = tableDescriptor.getSchema();
RowType prefixKeyRowType =
schema.toRowType().project(schema.getColumnIndexes(lookupColumnNames));
this.bucketKeyEncoder =
KeyEncoder.createKeyEncoder(
prefixKeyRowType,
tableDescriptor.getBucketKey(),
tableDescriptor.getPartitionKeys());
this.isDataLakeEnable = tableInfo.getTableDescriptor().isDataLakeEnabled();
this.lakeTableBucketAssigner = lakeTableBucketAssigner;
this.bucketKeyEncoder = bucketKeyEncoder;
this.partitionGetter = partitionGetter;
this.lakeTableBucketAssigner =
new LakeTableBucketAssigner(
prefixKeyRowType, tableDescriptor.getBucketKey(), numBuckets);
this.partitionGetter =
tableDescriptor.isPartitioned()
? new PartitionGetter(prefixKeyRowType, tableDescriptor.getPartitionKeys())
: null;
this.kvValueDecoder = kvValueDecoder;
}

/**
 * Validates that a prefix lookup using the given lookup columns can be performed on the table.
 *
 * <p>The following checks are applied in order, each failing with an {@link
 * IllegalArgumentException}:
 *
 * <ol>
 *   <li>the table schema declares a primary key;
 *   <li>the bucket keys are a prefix of the physical primary keys (primary key columns with the
 *       partition keys removed);
 *   <li>for a partitioned table, the lookup columns contain every partition key;
 *   <li>the lookup columns with the partition keys removed are equal to the bucket keys, in the
 *       same order.
 * </ol>
 *
 * @param tableInfo the table to perform the prefix lookup on
 * @param lookupColumns the names of the columns that make up the prefix lookup key
 * @throws IllegalArgumentException if any of the checks above fails
 */
private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumns) {
    // verify is primary key table
    Schema schema = tableInfo.getTableDescriptor().getSchema();
    if (!schema.getPrimaryKey().isPresent()) {
        throw new IllegalArgumentException(
                String.format(
                        "Log table %s doesn't support prefix lookup",
                        tableInfo.getTablePath()));
    }

    // verify the bucket keys are the prefix subset of physical primary keys
    // Work on a private copy: whether getColumnNames() hands out the schema's own list (or an
    // unmodifiable one) is not visible here, so removeAll() must never touch it directly.
    List<String> physicalPrimaryKeys =
            new ArrayList<>(schema.getPrimaryKey().get().getColumnNames());
    physicalPrimaryKeys.removeAll(tableInfo.getTableDescriptor().getPartitionKeys());
    List<String> bucketKeys = tableInfo.getTableDescriptor().getBucketKey();
    for (int i = 0; i < bucketKeys.size(); i++) {
        // More bucket keys than physical primary keys cannot be a prefix either; report it as
        // the same validation failure instead of failing with an index-out-of-bounds error.
        if (i >= physicalPrimaryKeys.size()
                || !bucketKeys.get(i).equals(physicalPrimaryKeys.get(i))) {
            throw new IllegalArgumentException(
                    String.format(
                            "Can not perform prefix lookup on table '%s', "
                                    + "because the bucket keys %s is not a prefix subset of the "
                                    + "physical primary keys %s (excluded partition fields if present).",
                            tableInfo.getTablePath(), bucketKeys, physicalPrimaryKeys));
        }
    }

    // verify the lookup columns must contain all partition fields if this is partitioned table
    if (tableInfo.getTableDescriptor().isPartitioned()) {
        List<String> partitionKeys = tableInfo.getTableDescriptor().getPartitionKeys();
        Set<String> lookupColumnsSet = new HashSet<>(lookupColumns);
        if (!lookupColumnsSet.containsAll(partitionKeys)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Can not perform prefix lookup on table '%s', "
                                    + "because the lookup columns %s must contain all partition fields %s.",
                            tableInfo.getTablePath(), lookupColumns, partitionKeys));
        }
    }

    // verify the lookup columns must contain all bucket keys **in order**
    // (copy first so the caller's lookupColumns list is left untouched)
    List<String> physicalLookupColumns = new ArrayList<>(lookupColumns);
    physicalLookupColumns.removeAll(tableInfo.getTableDescriptor().getPartitionKeys());
    if (!physicalLookupColumns.equals(bucketKeys)) {
        throw new IllegalArgumentException(
                String.format(
                        "Can not perform prefix lookup on table '%s', "
                                + "because the lookup columns %s must contain all bucket keys %s in order.",
                        tableInfo.getTablePath(), lookupColumns, bucketKeys));
    }
}

@Override
public CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow prefixKey) {
byte[] prefixKeyBytes = bucketKeyEncoder.encode(prefixKey);
byte[] bucketKeyBytes = bucketKeyEncoder.encode(prefixKey);
int bucketId =
getBucketId(
prefixKeyBytes,
bucketKeyBytes,
prefixKey,
lakeTableBucketAssigner,
isDataLakeEnable,
Expand All @@ -107,15 +176,15 @@ public CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow prefixKey)

TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
return lookupClient
.prefixLookup(tableBucket, prefixKeyBytes)
.prefixLookup(tableBucket, bucketKeyBytes)
.thenApply(
result -> {
List<InternalRow> rowList = new ArrayList<>();
List<InternalRow> rowList = new ArrayList<>(result.size());
for (byte[] valueBytes : result) {
rowList.add(
valueBytes == null
? null
: kvValueDecoder.decodeValue(valueBytes).row);
if (valueBytes == null) {
continue;
}
rowList.add(kvValueDecoder.decodeValue(valueBytes).row);
}
return new PrefixLookupResult(rowList);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.CompletableFuture;

/**
* The lookup-er is used to lookup data of specify kv table from Fluss.
* The lookup-er is used to lookup row of a primary key table by primary key.
*
* @since 0.6
*/
Expand All @@ -33,7 +33,7 @@ public interface Lookuper {
* Lookups certain row from the given table primary keys.
*
* @param lookupKey the given table primary keys.
* @return the result of get.
* @return the result of lookup.
*/
CompletableFuture<LookupResult> lookup(InternalRow lookupKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,32 @@
import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.row.InternalRow;

import java.util.List;

/**
* Used to describe the operation to prefix lookup by {@link PrefixLookuper} to a kv table.
* Used to describe the operation to prefix lookup by {@link PrefixLookuper} to a primary key table.
*
* @since 0.6
*/
@PublicEvolving
public class PrefixLookup {

/**
* Currently, For none-partition table, the lookupColumnNames can only be the field of bucket
* Currently, For non-partitioned table, the lookupColumnNames can only be the field of bucket
* key.
*
* <p>For partition table, the lookupColumnNames exclude partition fields should be a prefix of
* primary key exclude partition fields.
* <p>For a partitioned table, the lookupColumnNames excluding partition fields should be a
* prefix of the primary key excluding partition fields.
*
* <p>See {@link PrefixLookuper#prefixLookup(InternalRow)} for more details.
*/
private final String[] lookupColumnNames;
private final List<String> lookupColumnNames;

public PrefixLookup(String[] lookupColumnNames) {
public PrefixLookup(List<String> lookupColumnNames) {
this.lookupColumnNames = lookupColumnNames;
}

public String[] getLookupColumnNames() {
public List<String> getLookupColumnNames() {
return lookupColumnNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import java.util.concurrent.CompletableFuture;

/**
* The prefix lookup-er is used to prefix lookup data of specify kv table from Fluss.
* The prefix lookup-er is used to lookup rows of a primary key table by a prefix of primary key.
*
* @since 0.6
*/
@PublicEvolving
public interface PrefixLookuper {

/**
* Prefix lookup certain rows from the given table by prefix key.
* Prefix lookup certain rows from the given table by a prefix of primary key.
*
* <p>Only available for Primary Key Table. Will throw exception when the table isn't a Primary
* Key Table.
Expand All @@ -45,7 +45,7 @@ public interface PrefixLookuper {
* <p>TODO: currently, the interface only support bucket key as the prefixKey to lookup.
* Generalize the prefix lookup to support any prefixKey including bucket key.
*
* <p>We also support prefix lookup for partition table. The schema of the prefixKey should
* <p>We also support prefix lookup for partitioned table. The schema of the prefixKey should
* contain partition fields and bucket key. In addition, the schema of the prefixKey exclude
* partition fields should be a prefix of primary key exclude partition fields.
*
Expand Down
Loading

0 comments on commit 55d6e98

Please sign in to comment.