[kv] Partition table support PrefixLookup
swuferhong committed Jan 9, 2025
1 parent d30ea3f commit 7e9df81
Showing 8 changed files with 133 additions and 53 deletions.
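In short: the client-side prefixLookup previously addressed a bucket as (tableId, bucketId) and carried a TODO for partition tables (#266); this commit threads a full TableBucket through the lookup path, deriving the partitionId from the bucket-key row, and drops the rule that bucket keys must not contain partition keys. A minimal usage sketch in Java, assuming an open Fluss Connection `conn` and the table created in the new testPartitionedTablePrefixLookup test below; the partition value "2025" and the compactedRow helper (borrowed from the test utilities) are illustrative only:

    // Table: primary key (a, b, c), partitioned by b, distributed by (a, b).
    Table table =
            conn.getTable(TablePath.of("test_db_1", "test_partitioned_table_prefix_lookup"));

    // The prefix row carries the bucket key (a, b); since b is also the partition
    // key, the client can now resolve the partition id from the same row.
    Schema prefixKeySchema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.STRING())
                    .build();
    InternalRow prefixKey =
            compactedRow(prefixKeySchema.toRowType(), new Object[] {1, "2025"});

    // Returns every row in that partition/bucket whose primary key starts with (1, "2025").
    PrefixLookupResult result = table.prefixLookup(prefixKey).get();
    for (InternalRow row : result.getRowList()) {
        // handle matched rows
    }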
@@ -82,10 +82,8 @@ public CompletableFuture<byte[]> lookup(TableBucket tableBucket, byte[] keyBytes
         return lookup.future();
     }
 
-    public CompletableFuture<List<byte[]>> prefixLookup(
-            long tableId, int bucketId, byte[] keyBytes) {
-        // TODO prefix lookup support partition table (#266)
-        PrefixLookup prefixLookup = new PrefixLookup(new TableBucket(tableId, bucketId), keyBytes);
+    public CompletableFuture<List<byte[]>> prefixLookup(TableBucket tableBucket, byte[] keyBytes) {
+        PrefixLookup prefixLookup = new PrefixLookup(tableBucket, keyBytes);
         lookupQueue.appendLookup(prefixLookup);
         return prefixLookup.future();
     }

@@ -71,7 +71,6 @@
 import com.alibaba.fluss.rpc.messages.LimitScanResponse;
 import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
 import com.alibaba.fluss.rpc.protocol.ApiError;
-import com.alibaba.fluss.types.DataField;
 import com.alibaba.fluss.types.DataType;
 import com.alibaba.fluss.types.RowType;
 import com.alibaba.fluss.utils.CloseableIterator;
@@ -88,6 +87,7 @@
 import java.util.function.Supplier;
 
 import static com.alibaba.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
+import static com.alibaba.fluss.metadata.Schema.getKeyRowType;
 
 /**
  * The base impl of {@link Table}.
@@ -105,12 +105,15 @@ public class FlussTable implements Table {
     private final TableInfo tableInfo;
     private final boolean hasPrimaryKey;
     private final int numBuckets;
-    private final RowType keyRowType;
+    private final RowType primaryKeyRowType;
     // decode the lookup bytes to result row
     private final ValueDecoder kvValueDecoder;
-    // a getter to extract partition from key row, null when it's not a partitioned primary key
-    // table
-    private final @Nullable PartitionGetter keyRowPartitionGetter;
+    // a getter to extract partition from primary key row, null when it's not a partitioned primary
+    // key table
+    private final @Nullable PartitionGetter primaryKeyRowPartitionGetter;
+    // a getter to extract partition from bucket key row, null when it's not a partitioned primary
+    // key table.
+    private final @Nullable PartitionGetter bucketKeyRowPartitionGetter;
 
     private final KeyEncoder bucketKeyEncoder;
     private final KeyEncoder primaryKeyEncoder;
@@ -151,10 +154,10 @@ public FlussTable(
         RowType rowType = schema.toRowType();
         this.hasPrimaryKey = tableDescriptor.hasPrimaryKey();
         this.numBuckets = metadataUpdater.getBucketCount(tablePath);
-        this.keyRowType = getKeyRowType(schema, schema.getPrimaryKeyIndexes());
-        this.keyRowPartitionGetter =
+        this.primaryKeyRowType = getKeyRowType(schema, schema.getPrimaryKeyIndexes());
+        this.primaryKeyRowPartitionGetter =
                 tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey()
-                        ? new PartitionGetter(keyRowType, tableDescriptor.getPartitionKeys())
+                        ? new PartitionGetter(primaryKeyRowType, tableDescriptor.getPartitionKeys())
                         : null;
         this.closed = new AtomicBoolean(false);
         this.kvValueDecoder =
@@ -165,12 +168,25 @@
 
         this.primaryKeyEncoder =
                 KeyEncoder.createKeyEncoder(
-                        keyRowType, keyRowType.getFieldNames(), tableDescriptor.getPartitionKeys());
+                        primaryKeyRowType,
+                        primaryKeyRowType.getFieldNames(),
+                        tableDescriptor.getPartitionKeys());
         int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes();
         if (bucketKeyIndexes.length != 0) {
-            this.bucketKeyEncoder = new KeyEncoder(getKeyRowType(schema, bucketKeyIndexes));
+            RowType bucketKeyRowType = getKeyRowType(schema, bucketKeyIndexes);
+            this.bucketKeyEncoder =
+                    KeyEncoder.createKeyEncoder(
+                            bucketKeyRowType,
+                            bucketKeyRowType.getFieldNames(),
+                            tableDescriptor.getPartitionKeys());
+            this.bucketKeyRowPartitionGetter =
+                    tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey()
+                            ? new PartitionGetter(
+                                    bucketKeyRowType, tableDescriptor.getPartitionKeys())
+                            : null;
         } else {
             this.bucketKeyEncoder = primaryKeyEncoder;
+            this.bucketKeyRowPartitionGetter = primaryKeyRowPartitionGetter;
         }
     }
 
@@ -189,7 +205,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow key) {
         // a row
         byte[] pkBytes = primaryKeyEncoder.encode(key);
         byte[] bkBytes = bucketKeyEncoder.encode(key);
-        Long partitionId = keyRowPartitionGetter == null ? null : getPartitionId(key);
+        Long partitionId = primaryKeyRowPartitionGetter == null ? null : getPartitionId(key);
         int bucketId = getBucketId(bkBytes, key);
         TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
         return lookupClientSupplier
@@ -215,9 +231,11 @@ public CompletableFuture<PrefixLookupResult> prefixLookup(InternalRow bucketKey)
 
         byte[] prefixKeyBytes = bucketKeyEncoder.encode(bucketKey);
         int bucketId = getBucketId(prefixKeyBytes, bucketKey);
+        Long partitionId = bucketKeyRowPartitionGetter == null ? null : getPartitionId(bucketKey);
+        TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
         return lookupClientSupplier
                 .get()
-                .prefixLookup(tableId, bucketId, prefixKeyBytes)
+                .prefixLookup(tableBucket, prefixKeyBytes)
                 .thenApply(
                         result -> {
                             List<InternalRow> rowList = new ArrayList<>();
@@ -238,7 +256,7 @@ private int getBucketId(byte[] keyBytes, InternalRow key) {
         if (lakeTableBucketAssigner == null) {
             lakeTableBucketAssigner =
                     new LakeTableBucketAssigner(
-                            keyRowType,
+                            primaryKeyRowType,
                             tableInfo.getTableDescriptor().getBucketKey(),
                             numBuckets);
         }
@@ -367,22 +385,14 @@ private void addScanRecord(
      * throw {@link PartitionNotExistException}.
      */
     private Long getPartitionId(InternalRow row) {
-        Preconditions.checkNotNull(keyRowPartitionGetter, "partitionGetter shouldn't be null.");
-        String partitionName = keyRowPartitionGetter.getPartition(row);
+        Preconditions.checkNotNull(
+                primaryKeyRowPartitionGetter, "partitionGetter shouldn't be null.");
+        String partitionName = primaryKeyRowPartitionGetter.getPartition(row);
         PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);
         metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
         return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath);
     }
 
-    private RowType getKeyRowType(Schema schema, int[] keyIndexes) {
-        List<DataField> keyRowFields = new ArrayList<>(keyIndexes.length);
-        List<DataField> rowFields = schema.toRowType().getFields();
-        for (int index : keyIndexes) {
-            keyRowFields.add(rowFields.get(index));
-        }
-        return new RowType(keyRowFields);
-    }
-
     @Override
     public AppendWriter getAppendWriter() {
         if (hasPrimaryKey) {

@@ -33,6 +33,8 @@
 import java.util.BitSet;
 import java.util.concurrent.CompletableFuture;
 
+import static com.alibaba.fluss.metadata.Schema.getKeyRowType;
+
 /**
  * The writer to write data to the primary key table.
  *
@@ -69,7 +71,12 @@ public UpsertWriter(
             this.bucketKeyEncoder = null;
         } else {
             int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes();
-            this.bucketKeyEncoder = new KeyEncoder(rowType, bucketKeyIndexes);
+            RowType bucketKeyRowType = getKeyRowType(schema, bucketKeyIndexes);
+            this.bucketKeyEncoder =
+                    KeyEncoder.createKeyEncoder(
+                            rowType,
+                            bucketKeyRowType.getFieldNames(),
+                            tableDescriptor.getPartitionKeys());
         }
     }
 

@@ -25,14 +25,17 @@
 import com.alibaba.fluss.client.scanner.log.LogScanner;
 import com.alibaba.fluss.client.scanner.log.ScanRecords;
 import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.config.MemorySize;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
+import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.indexed.IndexedRow;
 import com.alibaba.fluss.server.testutils.FlussClusterExtension;
 import com.alibaba.fluss.types.RowType;
@@ -48,6 +51,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
+import static com.alibaba.fluss.testutils.DataTestUtils.keyRow;
 import static com.alibaba.fluss.testutils.InternalRowAssert.assertThatRow;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -237,4 +242,22 @@ protected static void verifyRows(
             }
         }
     }
+
+    protected static void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields)
+            throws Exception {
+        // put data.
+        InternalRow row = compactedRow(tableSchema.toRowType(), fields);
+        UpsertWriter upsertWriter = table.getUpsertWriter();
+        // put data.
+        upsertWriter.upsert(row);
+        upsertWriter.flush();
+        // lookup this key.
+        IndexedRow keyRow = keyRow(tableSchema, fields);
+        assertThat(lookupRow(table, keyRow)).isEqualTo(row);
+    }
+
+    protected static InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception {
+        // lookup this key.
+        return table.lookup(keyRow).get().getRow();
+    }
 }

@@ -17,6 +17,7 @@
 package com.alibaba.fluss.client.table;
 
 import com.alibaba.fluss.client.admin.ClientToServerITCaseBase;
+import com.alibaba.fluss.client.lookup.PrefixLookupResult;
 import com.alibaba.fluss.client.scanner.ScanRecord;
 import com.alibaba.fluss.client.scanner.log.LogScan;
 import com.alibaba.fluss.client.scanner.log.LogScanner;
@@ -33,6 +34,7 @@
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.InternalRow;
 import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
@@ -41,9 +43,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals;
 import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
 import static com.alibaba.fluss.testutils.DataTestUtils.keyRow;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
@@ -92,6 +96,59 @@ void testPartitionedPrimaryKeyTable() throws Exception {
         verifyPartitionLogs(table, schema.toRowType(), expectPutRows);
     }
 
+    @Test
+    void testPartitionedTablePrefixLookup() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_partitioned_table_prefix_lookup");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.BIGINT())
+                        .column("d", DataTypes.STRING())
+                        .primaryKey("a", "b", "c")
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(3, "a", "b")
+                        .partitionedBy("b")
+                        .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
+                        .property(
+                                ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
+                                AutoPartitionTimeUnit.YEAR)
+                        .build();
+        RowType rowType = schema.toRowType();
+        createTable(tablePath, descriptor, false);
+        Map<String, Long> partitionIdByNames =
+                FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
+
+        Table table = conn.getTable(tablePath);
+        for (String partition : partitionIdByNames.keySet()) {
+            verifyPutAndLookup(table, schema, new Object[] {1, partition, 1L, "value1"});
+            verifyPutAndLookup(table, schema, new Object[] {1, partition, 2L, "value2"});
+        }
+
+        // test prefix lookup.
+        Schema prefixKeySchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .build();
+        for (String partition : partitionIdByNames.keySet()) {
+            CompletableFuture<PrefixLookupResult> result =
+                    table.prefixLookup(
+                            compactedRow(prefixKeySchema.toRowType(), new Object[] {1, partition}));
+            PrefixLookupResult prefixLookupResult = result.get();
+            assertThat(prefixLookupResult).isNotNull();
+            List<InternalRow> rowList = prefixLookupResult.getRowList();
+            assertThat(rowList.size()).isEqualTo(2);
+            assertRowValueEquals(
+                    rowType, rowList.get(0), new Object[] {1, partition, 1L, "value1"});
+            assertRowValueEquals(
                    rowType, rowList.get(1), new Object[] {1, partition, 2L, "value2"});
+        }
+    }
+
     @Test
     void testPartitionedLogTable() throws Exception {
         Schema schema = createPartitionedTable(DATA1_TABLE_PATH, false);

@@ -396,23 +396,6 @@ void testLimitScanLogTable() throws Exception {
         }
     }
 
-    void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) throws Exception {
-        // put data.
-        InternalRow row = compactedRow(tableSchema.toRowType(), fields);
-        UpsertWriter upsertWriter = table.getUpsertWriter();
-        // put data.
-        upsertWriter.upsert(row);
-        upsertWriter.flush();
-        // lookup this key.
-        IndexedRow keyRow = keyRow(tableSchema, fields);
-        assertThat(lookupRow(table, keyRow)).isEqualTo(row);
-    }
-
-    private InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception {
-        // lookup this key.
-        return table.lookup(keyRow).get().getRow();
-    }
-
     @Test
     void testPartialPutAndDelete() throws Exception {
         Schema schema =

@@ -491,4 +491,13 @@ private static Set<String> duplicate(List<String> names) {
                 .filter(name -> Collections.frequency(names, name) > 1)
                 .collect(Collectors.toSet());
     }
+
+    public static RowType getKeyRowType(Schema schema, int[] keyIndexes) {
+        List<DataField> keyRowFields = new ArrayList<>(keyIndexes.length);
+        List<DataField> rowFields = schema.toRowType().getFields();
+        for (int index : keyIndexes) {
+            keyRowFields.add(rowFields.get(index));
+        }
+        return new RowType(keyRowFields);
+    }
 }
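For reference, getKeyRowType (made public on Schema above, and now shared by FlussTable and UpsertWriter) simply projects a schema's row type onto the given field indexes. A small illustration with a made-up schema:

    // Hypothetical schema (a INT, b STRING, c BIGINT); indexes {0, 1} select (a, b).
    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.STRING())
                    .column("c", DataTypes.BIGINT())
                    .build();
    RowType bucketKeyRowType = Schema.getKeyRowType(schema, new int[] {0, 1});
    // bucketKeyRowType.getFieldNames() -> ["a", "b"]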

@@ -389,13 +389,6 @@ private static TableDistribution normalizeDistribution(
         if (originDistribution != null) {
             // we may need to check and normalize bucket key
             List<String> bucketKeys = originDistribution.getBucketKeys();
-            // bucket key shouldn't include partition key
-            if (bucketKeys.stream().anyMatch(partitionKeys::contains)) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "Bucket key %s shouldn't include any column in partition keys %s.",
-                                bucketKeys, partitionKeys));
-            }
 
             // if primary key set
             if (schema.getPrimaryKey().isPresent()) {
