Skip to content

Commit

Permalink
[client] Fix some error which will let LookupSender hang if leader an…
Browse files Browse the repository at this point in the history
…d bucket not ready (#174)

* [client] Fix some error which will let LookupSender hang if leader and bucket not ready

* address yuxia's comments
  • Loading branch information
swuferhong authored Dec 13, 2024
1 parent 0bd16ef commit 246cdc5
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,17 @@ private Map<Integer, List<Lookup>> groupByLeader(List<Lookup> lookups) {
for (Lookup lookup : lookups) {
// get the leader node
TableBucket tb = lookup.tableBucket();
int leader = metadataUpdater.leaderFor(tb);

int leader;
try {
// TODO this can be a re-triable operation. We should retry here instead of throwing
// exception.
leader = metadataUpdater.leaderFor(tb);
} catch (Exception e) {
lookup.future().completeExceptionally(e);
continue;
}

lookupBatchesByLeader.computeIfAbsent(leader, k -> new ArrayList<>()).add(lookup);
}
return lookupBatchesByLeader;
Expand Down Expand Up @@ -188,16 +198,27 @@ private void handleLookupResponse(
? pbLookupRespForBucket.getPartitionId()
: null,
pbLookupRespForBucket.getBucketId());
List<PbValue> pbValues = pbLookupRespForBucket.getValuesList();
LookupBatch lookupBatch = lookupsByBucket.get(tableBucket);
lookupBatch.complete(pbValues);
if (pbLookupRespForBucket.hasErrorCode()) {
// TODO for re-triable error, we should retry here instead of throwing exception.
ApiError error = ApiError.fromErrorMessage(pbLookupRespForBucket);
LOG.warn(
"Get error lookup response on table bucket {}, fail. Error: {}",
tableBucket,
error.formatErrMsg());
lookupBatch.completeExceptionally(error.exception());
} else {
List<PbValue> pbValues = pbLookupRespForBucket.getValuesList();
lookupBatch.complete(pbValues);
}
}
}

private void handleLookupRequestException(
Throwable t, Map<TableBucket, LookupBatch> lookupsByBucket) {
ApiError error = ApiError.fromThrowable(t);
for (LookupBatch lookupBatch : lookupsByBucket.values()) {
// TODO for re-triable error, we should retry here instead of throwing exception.
LOG.warn(
"Get error lookup response on table bucket {}, fail. Error: {}",
lookupBatch.tableBucket(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ public static void verifyPartitionLogs(
verifyRows(rowType, actualRows, expectPartitionsRows);
}

/**
 * Waits, bucket by bucket, until the replicas of the given table are ready.
 *
 * <p>The number of buckets is taken from the distribution configured on {@code
 * tableDescriptor}; both the distribution and its bucket count are unwrapped with {@code
 * get()}, so the descriptor is expected to define them. Bucket ids {@code 0} (inclusive) to
 * the bucket count (exclusive) are checked in ascending order.
 *
 * <p>NOTE(review): buckets are addressed by table id and bucket id only, so this presumably
 * targets non-partitioned tables - confirm against callers.
 *
 * @param tableId id of the table whose buckets are waited on
 * @param tableDescriptor descriptor that supplies the expected bucket count
 */
public static void waitAllReplicasReady(long tableId, TableDescriptor tableDescriptor) {
    int bucketCount = tableDescriptor.getTableDistribution().get().getBucketCount().get();
    for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
        // Delegate the actual waiting to the cluster extension, one bucket at a time.
        TableBucket bucket = new TableBucket(tableId, bucketId);
        FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(bucket);
    }
}

protected static void verifyRows(
RowType rowType,
Map<Long, List<InternalRow>> actualRows,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ void testUpsertWithSmallBuffer() throws Exception {
void testPutAndLookup() throws Exception {
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_lookup_table");
createTable(tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
verifyPutAndLookup(tablePath, DATA1_SCHEMA_PK, new Object[] {1, "a"});

Table table = conn.getTable(tablePath);
verifyPutAndLookup(table, DATA1_SCHEMA_PK, new Object[] {1, "a"});

// test put/lookup data for primary table with pk index is not 0
Schema schema =
Expand All @@ -207,8 +209,24 @@ void testPutAndLookup() throws Exception {
TablePath data1PkTablePath2 =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_2");
createTable(data1PkTablePath2, tableDescriptor, true);

// now, check put/lookup data
verifyPutAndLookup(data1PkTablePath2, schema, new Object[] {"a", 1});
Table table2 = conn.getTable(data1PkTablePath2);
verifyPutAndLookup(table2, schema, new Object[] {"a", 1});
}

/**
 * Looks up a key that was never written in a freshly created table with 10 buckets and expects
 * the lookup to complete with no row.
 */
@Test
void testLookupForNotReadyTable() throws Exception {
    TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1");
    TableDescriptor descriptor =
            TableDescriptor.builder().schema(DATA1_SCHEMA_PK).distributedBy(10).build();
    long tableId = createTable(tablePath, descriptor, true);
    IndexedRow rowKey = keyRow(DATA1_SCHEMA_PK, new Object[] {1, "a"});
    // Wait until all replicas are ready; otherwise the lookup may fail and make this test
    // unstable. To exercise a lookup against a table that is not ready yet, comment out the
    // following line.
    waitAllReplicasReady(tableId, descriptor);
    // Use try-with-resources so the table is closed even if the assertion fails.
    try (Table table = conn.getTable(tablePath)) {
        assertThat(lookupRow(table, rowKey)).isNull();
    }
}

@Test
Expand All @@ -232,8 +250,10 @@ void testLimitScanPrimaryTable() throws Exception {
}
}
upsertWriter.flush();

TableBucket tb = new TableBucket(tableId, 0);
List<InternalRow> actualRows =
table.limitScan(new TableBucket(tableId, 0), limitSize, null).get().stream()
table.limitScan(tb, limitSize, null).get().stream()
.map(ScanRecord::getRow)
.collect(Collectors.toList());
assertThat(actualRows.size()).isEqualTo(limitSize);
Expand All @@ -248,8 +268,7 @@ void testLimitScanPrimaryTable() throws Exception {
expectedRows.set(i, new Object[] {expectedRows.get(i)[1]});
}
actualRows =
table.limitScan(new TableBucket(tableId, 0), limitSize, projectedFields).get()
.stream()
table.limitScan(tb, limitSize, projectedFields).get().stream()
.map(ScanRecord::getRow)
.collect(Collectors.toList());
assertThat(actualRows.size()).isEqualTo(limitSize);
Expand Down Expand Up @@ -285,8 +304,10 @@ void testLimitScanLogTable() throws Exception {
}
}
appendWriter.flush();

TableBucket tb = new TableBucket(tableId, 0);
List<InternalRow> actualRows =
table.limitScan(new TableBucket(tableId, 0), limitSize, null).get().stream()
table.limitScan(tb, limitSize, null).get().stream()
.map(ScanRecord::getRow)
.collect(Collectors.toList());
assertThat(actualRows.size()).isEqualTo(limitSize);
Expand All @@ -301,8 +322,7 @@ void testLimitScanLogTable() throws Exception {
expectedRows.set(i, new Object[] {expectedRows.get(i)[1]});
}
actualRows =
table.limitScan(new TableBucket(tableId, 0), limitSize, projectedFields).get()
.stream()
table.limitScan(tb, limitSize, projectedFields).get().stream()
.map(ScanRecord::getRow)
.collect(Collectors.toList());
assertThat(actualRows.size()).isEqualTo(limitSize);
Expand All @@ -315,26 +335,21 @@ void testLimitScanLogTable() throws Exception {
}
}

void verifyPutAndLookup(TablePath tablePath, Schema tableSchema, Object[] fields)
throws Exception {
void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) throws Exception {
// put data.
InternalRow row = compactedRow(tableSchema.toRowType(), fields);
try (Table table = conn.getTable(tablePath)) {
UpsertWriter upsertWriter = table.getUpsertWriter();
// put data.
upsertWriter.upsert(row);
upsertWriter.flush();
}
UpsertWriter upsertWriter = table.getUpsertWriter();
// put data.
upsertWriter.upsert(row);
upsertWriter.flush();
// lookup this key.
IndexedRow keyRow = keyRow(tableSchema, fields);
assertThat(lookupRow(tablePath, keyRow)).isEqualTo(row);
assertThat(lookupRow(table, keyRow)).isEqualTo(row);
}

private InternalRow lookupRow(TablePath tablePath, IndexedRow keyRow) throws Exception {
try (Table table = conn.getTable(tablePath)) {
// lookup this key.
return table.lookup(keyRow).get().getRow();
}
private InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception {
// lookup this key.
return table.lookup(keyRow).get().getRow();
}

@Test
Expand All @@ -353,19 +368,19 @@ void testPartialPutAndDelete() throws Exception {
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, true);

// test put a full row
verifyPutAndLookup(DATA1_TABLE_PATH_PK, schema, new Object[] {1, "a", 1, true});
Table table = conn.getTable(DATA1_TABLE_PATH_PK);
verifyPutAndLookup(table, schema, new Object[] {1, "a", 1, true});

// partial update columns: a, b
UpsertWrite partialUpdate = new UpsertWrite().withPartialUpdate(new int[] {0, 1});
Table table = conn.getTable(DATA1_TABLE_PATH_PK);
UpsertWriter upsertWriter = table.getUpsertWriter(partialUpdate);
upsertWriter
.upsert(compactedRow(schema.toRowType(), new Object[] {1, "aaa", null, null}))
.get();

// check the row
IndexedRow rowKey = row(pkRowType, new Object[] {1});
assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
assertThat(lookupRow(table, rowKey))
.isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "aaa", 1, true}));

// partial update columns columns: a,b,c
Expand All @@ -376,14 +391,14 @@ void testPartialPutAndDelete() throws Exception {
.get();

// lookup the row
assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
assertThat(lookupRow(table, rowKey))
.isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, true}));

// test partial delete, target column is a,b,c
upsertWriter
.delete(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, null}))
.get();
assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
assertThat(lookupRow(table, rowKey))
.isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, null, null, true}));

// partial delete, target column is d
Expand All @@ -394,7 +409,7 @@ void testPartialPutAndDelete() throws Exception {
.get();

// the row should be deleted, shouldn't get the row again
assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey)).isNull();
assertThat(lookupRow(table, rowKey)).isNull();

table.close();
}
Expand Down Expand Up @@ -450,12 +465,12 @@ void testDelete() throws Exception {

// lookup this key.
IndexedRow keyRow = keyRow(DATA1_SCHEMA_PK, new Object[] {1, "a"});
assertThat(lookupRow(DATA1_TABLE_PATH_PK, keyRow)).isEqualTo(row);
assertThat(lookupRow(table, keyRow)).isEqualTo(row);

// delete this key.
upsertWriter.delete(row).get();
// lookup this key again, will return null.
assertThat(lookupRow(DATA1_TABLE_PATH_PK, keyRow)).isNull();
assertThat(lookupRow(table, keyRow)).isNull();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,19 @@ public void waitUtilAllReplicaReady(TableBucket tableBucket) {
() -> {
Optional<LeaderAndIsr> leaderAndIsrOpt = zkClient.getLeaderAndIsr(tableBucket);
assertThat(leaderAndIsrOpt).isPresent();
List<Integer> isr = leaderAndIsrOpt.get().isr();
LeaderAndIsr leaderAndIsr = leaderAndIsrOpt.get();
List<Integer> isr = leaderAndIsr.isr();
for (int replicaId : isr) {
ReplicaManager replicaManager =
getTabletServerById(replicaId).getReplicaManager();
assertThat(replicaManager.getReplica(tableBucket))
.isInstanceOf(ReplicaManager.OnlineReplica.class);
}

int leader = leaderAndIsr.leader();
ReplicaManager replicaManager = getTabletServerById(leader).getReplicaManager();
assertThat(replicaManager.getReplicaOrException(tableBucket).isLeader())
.isTrue();
});
}

Expand Down

0 comments on commit 246cdc5

Please sign in to comment.