[kv] Partition table support PrefixLookup
swuferhong committed Jan 20, 2025
1 parent 2658ea0 commit 7566cb3
Showing 9 changed files with 285 additions and 84 deletions.
@@ -82,10 +82,8 @@ public CompletableFuture<byte[]> lookup(TableBucket tableBucket, byte[] keyBytes
         return lookup.future();
     }
 
-    public CompletableFuture<List<byte[]>> prefixLookup(
-            long tableId, int bucketId, byte[] keyBytes) {
-        // TODO prefix lookup support partition table (#266)
-        PrefixLookup prefixLookup = new PrefixLookup(new TableBucket(tableId, bucketId), keyBytes);
+    public CompletableFuture<List<byte[]>> prefixLookup(TableBucket tableBucket, byte[] keyBytes) {
+        PrefixLookup prefixLookup = new PrefixLookup(tableBucket, keyBytes);
         lookupQueue.appendLookup(prefixLookup);
         return prefixLookup.future();
     }
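
The client-side entry point now takes a fully resolved TableBucket instead of a bare (tableId, bucketId) pair, so a partition id can travel with the request. A minimal sketch of the caller side after this change (variable names and the result-handling sink are illustrative, not from the commit):

    // For a partitioned table the partition id is resolved first and threaded
    // through the TableBucket; for a non-partitioned table it stays null.
    Long partitionId = isPartitioned ? getPartitionId(bucketKeyRow) : null;
    TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
    lookupClient
            .prefixLookup(tableBucket, prefixKeyBytes)
            .thenAccept(values -> values.forEach(this::collectValueBytes)); // placeholder sink

The hunks that follow are from FlussTable, where this client API is called.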
@@ -71,7 +71,6 @@
 import com.alibaba.fluss.rpc.messages.LimitScanResponse;
 import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
 import com.alibaba.fluss.rpc.protocol.ApiError;
-import com.alibaba.fluss.types.DataField;
 import com.alibaba.fluss.types.DataType;
 import com.alibaba.fluss.types.RowType;
 import com.alibaba.fluss.utils.CloseableIterator;
@@ -88,6 +87,8 @@
 import java.util.function.Supplier;
 
 import static com.alibaba.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
+import static com.alibaba.fluss.metadata.Schema.getKeyRowType;
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
 
 /**
  * The base impl of {@link Table}.
@@ -105,12 +106,15 @@ public class FlussTable implements Table {
     private final TableInfo tableInfo;
     private final boolean hasPrimaryKey;
     private final int numBuckets;
-    private final RowType keyRowType;
+    private final RowType primaryKeyRowType;
+    private final @Nullable RowType bucketKeyRowType;
     // decode the lookup bytes to result row
     private final ValueDecoder kvValueDecoder;
-    // a getter to extract partition from key row, null when it's not a partitioned primary key
-    // table
-    private final @Nullable PartitionGetter keyRowPartitionGetter;
+    // a getter to extract partition from primary key row, null when it's not a partitioned primary
+    // key table
+    private final @Nullable PartitionGetter primaryKeyRowPartitionGetter;
 
+    private final List<String> partitionKeyNames;
+
     private final KeyEncoder bucketKeyEncoder;
     private final KeyEncoder primaryKeyEncoder;
@@ -125,6 +129,10 @@ public class FlussTable implements Table {
     private volatile SecurityTokenManager securityTokenManager;
 
     private @Nullable LakeTableBucketAssigner lakeTableBucketAssigner;
+    private volatile boolean prefixLookupInitialized;
+    // a getter to extract partition from bucket key row, null when it's not a partitioned primary
+    // key table.
+    private @Nullable PartitionGetter bucketKeyRowPartitionGetter;
 
     public FlussTable(
             Configuration conf,
@@ -151,10 +159,10 @@ public FlussTable(
         RowType rowType = schema.toRowType();
         this.hasPrimaryKey = tableDescriptor.hasPrimaryKey();
         this.numBuckets = metadataUpdater.getBucketCount(tablePath);
-        this.keyRowType = getKeyRowType(schema, schema.getPrimaryKeyIndexes());
-        this.keyRowPartitionGetter =
+        this.primaryKeyRowType = getKeyRowType(schema, schema.getPrimaryKeyIndexes());
+        this.primaryKeyRowPartitionGetter =
                 tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey()
-                        ? new PartitionGetter(keyRowType, tableDescriptor.getPartitionKeys())
+                        ? new PartitionGetter(primaryKeyRowType, tableDescriptor.getPartitionKeys())
                         : null;
         this.closed = new AtomicBoolean(false);
         this.kvValueDecoder =
@@ -165,13 +173,27 @@
 
         this.primaryKeyEncoder =
                 KeyEncoder.createKeyEncoder(
-                        keyRowType, keyRowType.getFieldNames(), tableDescriptor.getPartitionKeys());
+                        primaryKeyRowType,
+                        primaryKeyRowType.getFieldNames(),
+                        tableDescriptor.getPartitionKeys());
         int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes();
         if (bucketKeyIndexes.length != 0) {
-            this.bucketKeyEncoder = new KeyEncoder(getKeyRowType(schema, bucketKeyIndexes));
+            this.bucketKeyRowType = getKeyRowType(schema, bucketKeyIndexes);
+            this.bucketKeyEncoder =
+                    KeyEncoder.createKeyEncoder(
+                            bucketKeyRowType,
+                            bucketKeyRowType.getFieldNames(),
+                            tableDescriptor.getPartitionKeys());
         } else {
+            this.bucketKeyRowType = null;
             this.bucketKeyEncoder = primaryKeyEncoder;
         }
+
+        this.partitionKeyNames =
+                tableDescriptor.isPartitioned()
+                        ? tableDescriptor.getPartitionKeys()
+                        : new ArrayList<>();
+        prefixLookupInitialized = false;
     }
 
     @Override
@@ -189,7 +211,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow key) {
         // a row
         byte[] pkBytes = primaryKeyEncoder.encode(key);
         byte[] bkBytes = bucketKeyEncoder.encode(key);
-        Long partitionId = keyRowPartitionGetter == null ? null : getPartitionId(key);
+        Long partitionId = primaryKeyRowPartitionGetter == null ? null : getPartitionId(key);
         int bucketId = getBucketId(bkBytes, key);
         TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
         return lookupClientSupplier
@@ -207,17 +229,20 @@ public CompletableFuture<LookupResult> lookup(InternalRow key) {
 
     @Override
     public CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow bucketKey) {
-        if (!hasPrimaryKey) {
-            throw new FlussRuntimeException(
-                    String.format("None-pk table %s don't support prefix lookup", tablePath));
-        }
-        // TODO: add checks the bucket key is prefix of primary key
+        maybeInitPrefixLookup();
 
         byte[] prefixKeyBytes = bucketKeyEncoder.encode(bucketKey);
         int bucketId = getBucketId(prefixKeyBytes, bucketKey);
+
+        Long partitionId = null;
+        if (bucketKeyRowPartitionGetter != null) {
+            partitionId = getPartitionId(bucketKey);
+        }
+
+        TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
         return lookupClientSupplier
                 .get()
-                .prefixLookup(tableId, bucketId, prefixKeyBytes)
+                .prefixLookup(tableBucket, prefixKeyBytes)
                 .thenApply(
                         result -> {
                             List<InternalRow> rowList = new ArrayList<>();
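
With the old TODO resolved, prefix lookup now works on partitioned primary key tables: the partition id is extracted from the bucket-key row and folded into the TableBucket before the request is routed. A user-facing sketch (the table handle and bucket-key row come from surrounding code, and the result accessor name is assumed):

    // 'bucketKeyRow' must carry a value for every bucket-key column,
    // including the partition-key column on a partitioned table.
    table.prefixLookup(bucketKeyRow)
            .thenAccept(
                    result -> {
                        // the diff shows the result wraps a List<InternalRow>
                        for (InternalRow matched : result.getRowList()) { // accessor name assumed
                            System.out.println(matched);
                        }
                    });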
@@ -238,7 +263,7 @@ private int getBucketId(byte[] keyBytes, InternalRow key) {
         if (lakeTableBucketAssigner == null) {
             lakeTableBucketAssigner =
                     new LakeTableBucketAssigner(
-                            keyRowType,
+                            primaryKeyRowType,
                             tableInfo.getTableDescriptor().getBucketKey(),
                             numBuckets);
         }
@@ -367,22 +392,14 @@ private void addScanRecord(
      * throw {@link PartitionNotExistException}.
      */
     private Long getPartitionId(InternalRow row) {
-        Preconditions.checkNotNull(keyRowPartitionGetter, "partitionGetter shouldn't be null.");
-        String partitionName = keyRowPartitionGetter.getPartition(row);
+        Preconditions.checkNotNull(
+                primaryKeyRowPartitionGetter, "partitionGetter shouldn't be null.");
+        String partitionName = primaryKeyRowPartitionGetter.getPartition(row);
         PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);
         metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
         return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath);
     }
 
-    private RowType getKeyRowType(Schema schema, int[] keyIndexes) {
-        List<DataField> keyRowFields = new ArrayList<>(keyIndexes.length);
-        List<DataField> rowFields = schema.toRowType().getFields();
-        for (int index : keyIndexes) {
-            keyRowFields.add(rowFields.get(index));
-        }
-        return new RowType(keyRowFields);
-    }
-
     @Override
     public AppendWriter getAppendWriter() {
         if (hasPrimaryKey) {
@@ -492,4 +509,52 @@ private void mayPrepareRemoteFileDownloader() {
             }
         }
     }
+
+    private void maybeInitPrefixLookup() {
+        if (!prefixLookupInitialized) {
+            synchronized (this) {
+                if (!prefixLookupInitialized) {
+                    if (!hasPrimaryKey) {
+                        throw new FlussRuntimeException(
+                                String.format(
+                                        "Non-pk table %s doesn't support prefix lookup", tablePath));
+                    }
+
+                    checkArgument(bucketKeyRowType != null, "bucketKeyRowType shouldn't be null.");
+                    List<String> primaryKeyNames = primaryKeyRowType.getFieldNames();
+                    List<String> bucketKeyNames = bucketKeyRowType.getFieldNames();
+
+                    for (int i = 0; i < bucketKeyNames.size(); i++) {
+                        if (!bucketKeyNames.get(i).equals(primaryKeyNames.get(i))) {
+                            throw new FlussRuntimeException(
+                                    String.format(
+                                            "To do prefix lookup, the bucket keys must be a prefix of the "
+                                                    + "primary keys, but the bucket keys are %s and the primary "
+                                                    + "keys are %s for table %s",
+                                            bucketKeyNames, primaryKeyNames, tablePath));
+                        }
+                    }
+
+                    partitionKeyNames.forEach(
+                            partitionKey -> {
+                                if (!bucketKeyNames.contains(partitionKey)) {
+                                    throw new FlussRuntimeException(
+                                            String.format(
+                                                    "To do prefix lookup on a partitioned primary key table, the "
+                                                            + "partition keys must be included in the bucket keys, but "
+                                                            + "the bucket keys are %s and the partition keys are %s for table %s",
+                                                    bucketKeyNames, partitionKeyNames, tablePath));
+                                }
+                            });
+
+                    this.bucketKeyRowPartitionGetter =
+                            !partitionKeyNames.isEmpty()
+                                    ? new PartitionGetter(bucketKeyRowType, partitionKeyNames)
+                                    : null;
+
+                    prefixLookupInitialized = true;
+                }
+            }
+        }
+    }
 }
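
The new maybeInitPrefixLookup() defers validation to the first prefixLookup call and guards it with double-checked locking on the volatile prefixLookupInitialized flag, so the constructor stays cheap and concurrent callers validate at most once. Two rules are enforced: the bucket key must be a position-wise prefix of the primary key, and on a partitioned table every partition key must appear among the bucket keys. A standalone illustration of the rules (column names invented for the example):

    import java.util.Arrays;
    import java.util.List;

    public class PrefixLookupRules {
        // Mirrors the two checks in maybeInitPrefixLookup(), outside of Fluss.
        static boolean supportsPrefixLookup(
                List<String> primaryKeys, List<String> bucketKeys, List<String> partitionKeys) {
            for (int i = 0; i < bucketKeys.size(); i++) {
                // bucket key must be a position-wise prefix of the primary key
                if (i >= primaryKeys.size() || !bucketKeys.get(i).equals(primaryKeys.get(i))) {
                    return false;
                }
            }
            // every partition key must also be a bucket key
            return bucketKeys.containsAll(partitionKeys);
        }

        public static void main(String[] args) {
            List<String> pk = Arrays.asList("dt", "user_id", "item_id");
            // PRIMARY KEY (dt, user_id, item_id), PARTITIONED BY (dt), bucket key (dt, user_id)
            System.out.println(
                    supportsPrefixLookup(pk, Arrays.asList("dt", "user_id"), Arrays.asList("dt"))); // true
            // bucket key (user_id) is not a prefix and drops the partition key dt
            System.out.println(
                    supportsPrefixLookup(pk, Arrays.asList("user_id"), Arrays.asList("dt"))); // false
        }
    }

The next file aligns the write path with the same key-encoding scheme.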
@@ -33,6 +33,8 @@
 import java.util.BitSet;
 import java.util.concurrent.CompletableFuture;
 
+import static com.alibaba.fluss.metadata.Schema.getKeyRowType;
+
 /**
  * The writer to write data to the primary key table.
  *
@@ -69,7 +71,12 @@ public UpsertWriter(
             this.bucketKeyEncoder = null;
         } else {
             int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes();
-            this.bucketKeyEncoder = new KeyEncoder(rowType, bucketKeyIndexes);
+            RowType bucketKeyRowType = getKeyRowType(schema, bucketKeyIndexes);
+            this.bucketKeyEncoder =
+                    KeyEncoder.createKeyEncoder(
+                            rowType,
+                            bucketKeyRowType.getFieldNames(),
+                            tableDescriptor.getPartitionKeys());
         }
     }
 
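
UpsertWriter now builds its bucket-key encoder through the same KeyEncoder.createKeyEncoder factory as FlussTable, passing the partition keys so that bucket-key encoding agrees between the write path and the new prefix-lookup read path. A minimal write-then-flush sketch, mirroring the test helper added below (connection setup is assumed, and Table is assumed to be closeable per the client API):

    try (Table table = conn.getTable(tablePath)) {
        UpsertWriter writer = table.getUpsertWriter();
        writer.upsert(row); // 'row' is an InternalRow matching the table schema
        writer.flush();     // flush before reading the key back
    }

The last file shown extends the test base class with put-and-lookup helpers.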
@@ -25,14 +25,17 @@
 import com.alibaba.fluss.client.scanner.log.LogScanner;
 import com.alibaba.fluss.client.scanner.log.ScanRecords;
 import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.config.MemorySize;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
+import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.indexed.IndexedRow;
 import com.alibaba.fluss.server.testutils.FlussClusterExtension;
 import com.alibaba.fluss.types.RowType;
@@ -48,6 +51,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
+import static com.alibaba.fluss.testutils.DataTestUtils.keyRow;
 import static com.alibaba.fluss.testutils.InternalRowAssert.assertThatRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -237,4 +242,22 @@ protected static void verifyRows(
             }
         }
     }
+
+    protected static void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields)
+            throws Exception {
+        // build the row to put.
+        InternalRow row = compactedRow(tableSchema.toRowType(), fields);
+        UpsertWriter upsertWriter = table.getUpsertWriter();
+        // put data.
+        upsertWriter.upsert(row);
+        upsertWriter.flush();
+        // lookup this key.
+        IndexedRow keyRow = keyRow(tableSchema, fields);
+        assertThat(lookupRow(table, keyRow)).isEqualTo(row);
+    }
+
+    protected static InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception {
+        return table.lookup(keyRow).get().getRow();
+    }
 }
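
These helpers give derived test classes a one-line put-then-lookup assertion. A hypothetical call site (schema and field values invented for illustration):

    verifyPutAndLookup(table, schema, new Object[] {"2024-01-01", 1L, "v1"});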
(Diffs for the remaining changed files are not shown.)