[kv] Fix out of order exception after deleting a non-existent row (#312)
swuferhong authored Jan 17, 2025
1 parent bda3589 commit d614576
Showing 11 changed files with 278 additions and 43 deletions.
@@ -18,6 +18,7 @@

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.exception.CorruptMessageException;
import com.alibaba.fluss.exception.OutOfOrderSequenceException;
import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.row.arrow.ArrowReader;
@@ -50,6 +51,7 @@
* <li>CRC => Uint32
* <li>SchemaId => Int16
* <li>Attributes => Int8
* <li>LastOffsetDelta => Int32
* <li>WriterID => Int64
* <li>SequenceID => Int32
* <li>RecordCount => Int32
@@ -62,6 +64,15 @@
* byte. The CRC-32C (Castagnoli) polynomial is used for the computation. CommitTimestamp is also
* located before the CRC, because it is determined in server side.
*
* <p>The field 'lastOffsetDelta' is used to calculate the lastOffset of the current batch as
* [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset = baseOffset + recordCount -
* 1]. The reason for introducing this field is that there might be cases where the offset delta in
* the batch does not match the recordCount. For example, when generating CDC logs for a kv table
* and sending a batch that only contains deletions of non-existent keys, no CDC logs are
* generated. However, we still need to increment the batchSequence for the corresponding writerId
* to make sure no {@link OutOfOrderSequenceException} will be thrown. In such a case, we generate a
* logRecordBatch with a LastOffsetDelta of 0 even though the recordCount is 0 (rather than the
* usual recordCount - 1).
*
* <p>The current attributes are given below:
*
* <pre>
@@ -82,6 +93,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
static final int CRC_LENGTH = 4;
static final int SCHEMA_ID_LENGTH = 2;
static final int ATTRIBUTE_LENGTH = 1;
static final int LAST_OFFSET_DELTA_LENGTH = 4;
static final int WRITE_CLIENT_ID_LENGTH = 8;
static final int BATCH_SEQUENCE_LENGTH = 4;
static final int RECORDS_COUNT_LENGTH = 4;
@@ -93,7 +105,8 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
public static final int CRC_OFFSET = COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH;
protected static final int SCHEMA_ID_OFFSET = CRC_OFFSET + CRC_LENGTH;
static final int ATTRIBUTES_OFFSET = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
static final int WRITE_CLIENT_ID_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
static final int WRITE_CLIENT_ID_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
static final int BATCH_SEQUENCE_OFFSET = WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
public static final int RECORDS_COUNT_OFFSET = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
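Taken together with the field list in the format javadoc above, these constants fix the on-disk header layout. A minimal sketch of the resulting header size follows, assuming the widths of the fields not visible in this hunk (BaseOffset 8, Length 4, Magic 1, CommitTimestamp 8 bytes); with the new LastOffsetDelta field the header grows from 44 to 48 bytes, which is why the size assertions in the builder tests further down change:

// Illustrative only: header size after this change. The first four widths are
// assumptions (not shown in this hunk); the rest come from the constants above.
public class RecordBatchHeaderSizeSketch {
    public static void main(String[] args) {
        int baseOffset = 8, length = 4, magic = 1, commitTimestamp = 8; // assumed widths
        int crc = 4, schemaId = 2, attributes = 1, lastOffsetDelta = 4;
        int writerId = 8, batchSequence = 4, recordCount = 4;
        int headerSize = baseOffset + length + magic + commitTimestamp + crc + schemaId
                + attributes + lastOffsetDelta + writerId + batchSequence + recordCount;
        System.out.println(headerSize); // 48 (was 44 without lastOffsetDelta)
    }
}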
@@ -201,7 +214,7 @@ public long lastLogOffset() {
}

private int lastOffsetDelta() {
return getRecordCount() - 1;
return segment.getInt(LAST_OFFSET_DELTA_OFFSET + position);
}

@Override
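The format javadoc above explains why the new field exists; the change to lastOffsetDelta() shows where it is read. A minimal sketch of how a reader derives offsets from it (method names other than lastOffsetDelta() are illustrative), matching the assertions in the tests added below:

// Illustrative only: offset derivation with the new header field.
long lastLogOffset(long baseLogOffset, int lastOffsetDelta) {
    return baseLogOffset + lastOffsetDelta; // was: baseLogOffset + recordCount - 1
}

long nextLogOffset(long baseLogOffset, int lastOffsetDelta) {
    return lastLogOffset(baseLogOffset, lastOffsetDelta) + 1;
}

// Empty batch produced by deleting only non-existent keys:
//   recordCount = 0, lastOffsetDelta = 0
//   lastLogOffset = baseLogOffset (e.g. 100), nextLogOffset = baseLogOffset + 1 (e.g. 101)
// The batch still carries writerId and batchSequence, so the writer's sequence is
// advanced and no OutOfOrderSequenceException is thrown for the next batch.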
@@ -31,9 +31,9 @@
import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LAST_OFFSET_DELTA_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
@@ -216,7 +216,7 @@ private void writeBatchHeader() throws IOException {

outputView.writeShort((short) schemaId);
// skip write attributes
outputView.setPosition(WRITE_CLIENT_ID_OFFSET);
outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
if (recordCount > 0) {
outputView.writeInt(recordCount - 1);
} else {
// If there is no record, we write 0 for the field lastOffsetDelta, see the comments about
// the field 'lastOffsetDelta' in DefaultLogRecordBatch.
outputView.writeInt(0);
}
outputView.writeLong(writerId);
outputView.writeInt(batchSequence);
outputView.writeInt(recordCount);
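As the format javadoc notes, the CRC covers everything from the attributes byte to the end of the batch, so it also protects the lastOffsetDelta value written above. A rough sketch of that computation using the JDK's CRC32C; Fluss presumably has its own CRC utility, and the byte[] handling here is an assumption:

import java.util.zip.CRC32C;

public class BatchCrcSketch {
    // CRC-32C (Castagnoli) over [attributesOffset, end) of the serialized batch bytes.
    static long computeBatchCrc(byte[] batchBytes, int attributesOffset) {
        CRC32C crc = new CRC32C();
        crc.update(batchBytes, attributesOffset, batchBytes.length - attributesOffset);
        return crc.getValue(); // stored as an unsigned 32-bit value at CRC_OFFSET
    }
}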
@@ -31,10 +31,10 @@

import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LAST_OFFSET_DELTA_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
@@ -191,7 +191,14 @@ private void writeBatchHeader() throws IOException {

outputView.writeShort((short) schemaId);
// skip write attribute byte for now.
outputView.setPosition(WRITE_CLIENT_ID_OFFSET);
outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
if (currentRecordNumber > 0) {
outputView.writeInt(currentRecordNumber - 1);
} else {
// If there is no record, we write 0 for the field lastOffsetDelta, see the comments about
// the field 'lastOffsetDelta' in DefaultLogRecordBatch.
outputView.writeInt(0);
}
outputView.writeLong(writerId);
outputView.writeInt(batchSequence);
outputView.writeInt(currentRecordNumber);
@@ -96,4 +96,59 @@ void testIndexedRowWriteAndReadBatch() throws Exception {

builder.close();
}

@Test
void testNoRecordAppend() throws Exception {
// 1. no record append with baseOffset as 0.
MemoryLogRecordsIndexedBuilder builder =
MemoryLogRecordsIndexedBuilder.builder(
0L, schemaId, Integer.MAX_VALUE, magic, new UnmanagedPagedOutputView(100));
MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
Iterator<LogRecordBatch> iterator = memoryLogRecords.batches().iterator();
// only contains batch header.
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);

assertThat(iterator.hasNext()).isTrue();
LogRecordBatch logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();

logRecordBatch.ensureValid();
assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
try (LogRecordReadContext readContext =
LogRecordReadContext.createIndexedReadContext(baseRowType, schemaId);
CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
assertThat(iter.hasNext()).isFalse();
}

// 2. no record append with baseOffset as 100.
builder =
MemoryLogRecordsIndexedBuilder.builder(
100L,
schemaId,
Integer.MAX_VALUE,
magic,
new UnmanagedPagedOutputView(100));
memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
iterator = memoryLogRecords.batches().iterator();
// only contains batch header.
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);

assertThat(iterator.hasNext()).isTrue();
logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();

logRecordBatch.ensureValid();
assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
assertThat(logRecordBatch.lastLogOffset()).isEqualTo(100);
assertThat(logRecordBatch.nextLogOffset()).isEqualTo(101);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(100);
try (LogRecordReadContext readContext =
LogRecordReadContext.createIndexedReadContext(baseRowType, schemaId);
CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
assertThat(iter.hasNext()).isFalse();
}
}
}
@@ -26,6 +26,7 @@
import com.alibaba.fluss.row.arrow.ArrowWriterPool;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
import com.alibaba.fluss.utils.CloseableIterator;

import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.AfterEach;
@@ -71,7 +72,8 @@ void testAppendWithEmptyRecord() throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 100);
MemoryLogRecordsArrowBuilder builder =
createMemoryLogRecordsArrowBuilder(0, writer, 10, 100);
assertThat(builder.isFull()).isFalse();
assertThat(builder.getWriteLimitInBytes())
.isEqualTo((int) (maxSizeInBytes * BUFFER_USAGE_RATIO));
@@ -82,7 +84,7 @@
assertThat(iterator.hasNext()).isTrue();
LogRecordBatch batch = iterator.next();
assertThat(batch.getRecordCount()).isEqualTo(0);
assertThat(batch.sizeInBytes()).isEqualTo(44);
assertThat(batch.sizeInBytes()).isEqualTo(48);
assertThat(iterator.hasNext()).isFalse();
}

@@ -91,7 +93,8 @@ void testAppend() throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
MemoryLogRecordsArrowBuilder builder =
createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
List<RowKind> rowKinds =
DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
List<InternalRow> rows =
@@ -132,20 +135,21 @@ void testIllegalArgument() {
DEFAULT_SCHEMA_ID,
maxSizeInBytes,
DATA1_ROW_TYPE)) {
createMemoryLogRecordsArrowBuilder(writer, 10, 30);
createMemoryLogRecordsArrowBuilder(0, writer, 10, 30);
}
})
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"The size of first segment of pagedOutputView is too small, need at least 44 bytes.");
"The size of first segment of pagedOutputView is too small, need at least 48 bytes.");
}

@Test
void testClose() throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE);
MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
MemoryLogRecordsArrowBuilder builder =
createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
List<RowKind> rowKinds =
DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
List<InternalRow> rows =
Expand Down Expand Up @@ -179,15 +183,68 @@ void testClose() throws Exception {
writer1.close();
}

@Test
void testNoRecordAppend() throws Exception {
// 1. no record append with base offset as 0.
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE);
MemoryLogRecordsArrowBuilder builder =
createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10);
MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
// only contains batch header.
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
Iterator<LogRecordBatch> iterator = memoryLogRecords.batches().iterator();
assertThat(iterator.hasNext()).isTrue();
LogRecordBatch logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();

logRecordBatch.ensureValid();
assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
try (LogRecordReadContext readContext =
LogRecordReadContext.createArrowReadContext(
DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
assertThat(iter.hasNext()).isFalse();
}

// 2. no record append with base offset as 100.
ArrowWriter writer2 =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE);
builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 10);
memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
// only contains batch header.
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
iterator = memoryLogRecords.batches().iterator();
assertThat(iterator.hasNext()).isTrue();
logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();

logRecordBatch.ensureValid();
assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
assertThat(logRecordBatch.lastLogOffset()).isEqualTo(100);
assertThat(logRecordBatch.nextLogOffset()).isEqualTo(101);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(100);
try (LogRecordReadContext readContext =
LogRecordReadContext.createArrowReadContext(
DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
assertThat(iter.hasNext()).isFalse();
}
}

private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
ArrowWriter writer, int maxPages, int pageSizeInBytes) throws IOException {
int baseOffset, ArrowWriter writer, int maxPages, int pageSizeInBytes)
throws IOException {
conf.set(
ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE,
new MemorySize((long) maxPages * pageSizeInBytes));
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(pageSizeInBytes));
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(pageSizeInBytes));
return MemoryLogRecordsArrowBuilder.builder(
0L,
baseOffset,
DEFAULT_SCHEMA_ID,
writer,
new ManagedPagedOutputView(new TestingMemorySegmentPool(pageSizeInBytes)));
@@ -508,6 +508,21 @@ public static void assertMemoryRecordsEqualsWithRowKind(
assertThat(iterator.hasNext()).isFalse();
}

public static void assertLogRecordBatchEqualsWithRowKind(
RowType rowType,
LogRecordBatch logRecordBatch,
List<Tuple2<RowKind, Object[]>> expected) {
try (LogRecordReadContext readContext = createArrowReadContext(rowType, DEFAULT_SCHEMA_ID);
CloseableIterator<LogRecord> logIterator = logRecordBatch.records(readContext)) {
for (Tuple2<RowKind, Object[]> expectedFieldAndRowKind : expected) {
assertThat(logIterator.hasNext()).isTrue();
assertLogRecordsEqualsWithRowKind(
rowType, logIterator.next(), expectedFieldAndRowKind);
}
assertThat(logIterator.hasNext()).isFalse();
}
}

public static void assertLogRecordsEquals(LogRecords actual, LogRecords expected) {
assertLogRecordsEquals(DATA1_ROW_TYPE, actual, expected);
}
@@ -387,7 +387,16 @@ void testFirstRowMergeEngine() throws Exception {
// insert again
tEnv.executeSql("insert into first_row_source(a, b) VALUES (3, 'v33'), (4, 'v44')").await();
expectedRows = Collections.singletonList("+I[4, v44]");
assertResultsIgnoreOrder(rowIter, expectedRows, false);

// insert where all keys already exist.
tEnv.executeSql("insert into first_row_source(a, b) VALUES (3, 'v333'), (4, 'v444')")
.await();

tEnv.executeSql("insert into first_row_source(a, b) VALUES (5, 'v5')").await();
expectedRows = Collections.singletonList("+I[5, v5]");
assertResultsIgnoreOrder(rowIter, expectedRows, true);

insertJobClient.cancel().get();
}

@@ -531,7 +540,7 @@ void testDeleteAndUpdateStmtOnPkTable() throws Exception {
+ " primary key (a) not enforced"
+ ")",
tableName));
// test delete without data.
// test deleting data with a non-existent key.
tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE a = 5").await();

List<String> insertValues =
Expand All @@ -557,6 +566,9 @@ void testDeleteAndUpdateStmtOnPkTable() throws Exception {
.collect();
assertThat(rowIter.hasNext()).isFalse();

// test deleting data with a non-existent key.
tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE a = 15").await();

// test update row4
tBatchEnv.executeSql("UPDATE " + tableName + " SET c = 'New York' WHERE a = 4").await();
CloseableIterator<Row> row4 =