From 82dac7eb187a1e44f3c40516a5dcc94596005af5 Mon Sep 17 00:00:00 2001
From: yunhong <337361684@qq.com>
Date: Thu, 16 Jan 2025 20:11:51 +0800
Subject: [PATCH] address jark's comments
---
.../fluss/record/DefaultLogRecordBatch.java | 4 +-
.../record/MemoryLogRecordsArrowBuilder.java | 17 +---
.../MemoryLogRecordsIndexedBuilder.java | 16 +---
.../record/DefaultLogRecordBatchTest.java | 64 +++++---------
.../MemoryLogRecordsArrowBuilderTest.java | 85 +++++++------------
.../com/alibaba/fluss/server/kv/KvTablet.java | 26 +++---
.../fluss/server/kv/wal/ArrowWalBuilder.java | 5 --
.../fluss/server/kv/wal/IndexWalBuilder.java | 5 --
.../fluss/server/kv/wal/WalBuilder.java | 2 -
9 files changed, 75 insertions(+), 149 deletions(-)
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java
index efe9ddd3c..4ca51747d 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java
@@ -64,14 +64,14 @@
* byte. The CRC-32C (Castagnoli) polynomial is used for the computation. CommitTimestamp is also
* located before the CRC, because it is determined in server side.
*
- *
The filed 'lastOffsetDelta is used to calculate the lastOffset of the current batch as:
+ *
The field 'lastOffsetDelta is used to calculate the lastOffset of the current batch as:
* [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset = baseOffset + recordCount -
* 1]. The reason for introducing this field is that there might be cases where the offset delta in
* batch does not match the recordCount. For example, when generating CDC logs for a kv table and
* sending a batch that only contains the deletion of non-existent kvs, no CDC logs would be
* generated. However, we need to increment the batchSequence for the corresponding writerId to make
* sure no {@link OutOfOrderSequenceException} will be thrown. In such a case, we would generate a
- * logRecordBatch with a LastOffsetDelta of 1 but a recordCount of 0.
+ * logRecordBatch with a LastOffsetDelta of 0 but a recordCount of 0.
*
*
The current attributes are given below:
*
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
index e0f4718cf..a84e76025 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
@@ -24,7 +24,6 @@
import com.alibaba.fluss.record.bytesview.MultiBytesView;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.arrow.ArrowWriter;
-import com.alibaba.fluss.utils.Preconditions;
import com.alibaba.fluss.utils.crc.Crc32C;
import java.io.IOException;
@@ -60,7 +59,6 @@ public class MemoryLogRecordsArrowBuilder implements AutoCloseable {
private int sizeInBytes;
private int recordCount;
private boolean isClosed;
- private long lastLogOffset;
private boolean reCalculateSizeInBytes = false;
private MemoryLogRecordsArrowBuilder(
@@ -80,7 +78,6 @@ private MemoryLogRecordsArrowBuilder(
this.writerId = NO_WRITER_ID;
this.batchSequence = NO_BATCH_SEQUENCE;
- this.lastLogOffset = -1L;
this.isClosed = false;
this.pagedOutputView = pagedOutputView;
@@ -170,14 +167,6 @@ public void resetWriterState(long writerId, int batchSequence) {
this.batchSequence = batchSequence;
}
- public void overrideLastLogOffset(long lastLogOffset) {
- Preconditions.checkArgument(
- lastLogOffset >= arrowWriter.getRecordsCount() + baseLogOffset,
- "The override lastLogOffset is less than recordCount + baseLogOffset, "
- + "which will cause the logOffsetDelta to be negative");
- this.lastLogOffset = lastLogOffset;
- }
-
public boolean isClosed() {
return isClosed;
}
@@ -228,10 +217,12 @@ private void writeBatchHeader() throws IOException {
outputView.writeShort((short) schemaId);
// skip write attributes
outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
- if (lastLogOffset < 0) {
+ if (recordCount > 0) {
outputView.writeInt(recordCount - 1);
} else {
- outputView.writeInt((int) (lastLogOffset - baseLogOffset));
+ // If there is no record, we write 0 for filed lastOffsetDelta, see the comments about
+ // the field 'lastOffsetDelta' in DefaultLogRecordBatch.
+ outputView.writeInt(0);
}
outputView.writeLong(writerId);
outputView.writeInt(batchSequence);
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java
index 8a0d98bc0..79f152d64 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java
@@ -59,7 +59,6 @@ public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
private int currentRecordNumber;
private int sizeInBytes;
private boolean isClosed;
- private long lastLogOffset;
private MemoryLogRecordsIndexedBuilder(
long baseLogOffset,
@@ -79,7 +78,6 @@ private MemoryLogRecordsIndexedBuilder(
this.writerId = NO_WRITER_ID;
this.batchSequence = NO_BATCH_SEQUENCE;
this.currentRecordNumber = 0;
- this.lastLogOffset = -1L;
this.isClosed = false;
// We don't need to write header information while the builder creating,
@@ -154,14 +152,6 @@ public void resetWriterState(long writerId, int batchSequence) {
this.batchSequence = batchSequence;
}
- public void overrideLastLogOffset(long lastLogOffset) {
- Preconditions.checkArgument(
- lastLogOffset >= currentRecordNumber + baseLogOffset,
- "The override lastLogOffset is less than recordCount + baseLogOffset, "
- + "which will cause the logOffsetDelta to be negative");
- this.lastLogOffset = lastLogOffset;
- }
-
public long writerId() {
return writerId;
}
@@ -202,10 +192,12 @@ private void writeBatchHeader() throws IOException {
outputView.writeShort((short) schemaId);
// skip write attribute byte for now.
outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
- if (lastLogOffset < 0) {
+ if (currentRecordNumber > 0) {
outputView.writeInt(currentRecordNumber - 1);
} else {
- outputView.writeInt((int) (lastLogOffset - baseLogOffset));
+ // If there is no record, we write 0 for filed lastOffsetDelta, see the comments about
+ // the field 'lastOffsetDelta' in DefaultLogRecordBatch.
+ outputView.writeInt(0);
}
outputView.writeLong(writerId);
outputView.writeInt(batchSequence);
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java
index e6f30fdb8..ee3673c83 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java
@@ -22,17 +22,13 @@
import com.alibaba.fluss.testutils.DataTestUtils;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.CloseableIterator;
-
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import static com.alibaba.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link DefaultLogRecordBatch}. */
public class DefaultLogRecordBatchTest extends LogTestBase {
@@ -101,59 +97,43 @@ void testIndexedRowWriteAndReadBatch() throws Exception {
}
@Test
- void testOverrideLastLogOffset() throws Exception {
- List rows1 =
- Arrays.asList(
- row(baseRowType, new Object[] {1, "1"}),
- row(baseRowType, new Object[] {2, "2"}),
- row(baseRowType, new Object[] {3, "3"}));
-
+ void testNoRecordAppend() throws Exception {
+ // 1. no record append with baseOffset as 0.
MemoryLogRecordsIndexedBuilder builder =
MemoryLogRecordsIndexedBuilder.builder(
- 0L, schemaId, Integer.MAX_VALUE, magic, outputView);
- for (IndexedRow row : rows1) {
- builder.append(RowKind.APPEND_ONLY, row);
- }
-
- // override lastLogOffset smaller than record counts.
- assertThatThrownBy(() -> builder.overrideLastLogOffset(1L))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "The override lastLogOffset is less than recordCount + baseLogOffset,"
- + " which will cause the logOffsetDelta to be negative");
-
- // override lastLogOffset larger than record counts.
- builder.overrideLastLogOffset(5L);
- MemoryLogRecords memoryLogRecords = builder.build();
+ 0L, schemaId, Integer.MAX_VALUE, magic, new UnmanagedPagedOutputView(100));
+ MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
Iterator iterator = memoryLogRecords.batches().iterator();
+ // only contains batch header.
+ assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
assertThat(iterator.hasNext()).isTrue();
LogRecordBatch logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();
logRecordBatch.ensureValid();
- assertThat(logRecordBatch.getRecordCount()).isEqualTo(3);
- assertThat(logRecordBatch.lastLogOffset()).isEqualTo(5);
- assertThat(logRecordBatch.nextLogOffset()).isEqualTo(6);
+ assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
+ assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
+ assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
try (LogRecordReadContext readContext =
LogRecordReadContext.createIndexedReadContext(baseRowType, schemaId);
CloseableIterator iter = logRecordBatch.records(readContext)) {
- for (IndexedRow row : rows1) {
- assertThat(iter.hasNext()).isTrue();
- LogRecord record = iter.next();
- assertThat(record.getRow()).isEqualTo(row);
- }
assertThat(iter.hasNext()).isFalse();
}
- // test empty record batch.
- MemoryLogRecordsIndexedBuilder builder2 =
+ // 2. no record append with baseOffset as 100.
+ builder =
MemoryLogRecordsIndexedBuilder.builder(
- 0L, schemaId, Integer.MAX_VALUE, magic, outputView);
- builder2.overrideLastLogOffset(0);
- memoryLogRecords = builder2.build();
+ 100L,
+ schemaId,
+ Integer.MAX_VALUE,
+ magic,
+ new UnmanagedPagedOutputView(100));
+ memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
iterator = memoryLogRecords.batches().iterator();
+ // only contains batch header.
+ assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
assertThat(iterator.hasNext()).isTrue();
logRecordBatch = iterator.next();
@@ -161,9 +141,9 @@ void testOverrideLastLogOffset() throws Exception {
logRecordBatch.ensureValid();
assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
- assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
- assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
- assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
+ assertThat(logRecordBatch.lastLogOffset()).isEqualTo(100);
+ assertThat(logRecordBatch.nextLogOffset()).isEqualTo(101);
+ assertThat(logRecordBatch.baseLogOffset()).isEqualTo(100);
try (LogRecordReadContext readContext =
LogRecordReadContext.createIndexedReadContext(baseRowType, schemaId);
CloseableIterator iter = logRecordBatch.records(readContext)) {
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
index aa5a3e993..c4bb9e2d0 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
@@ -27,7 +27,6 @@
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
import com.alibaba.fluss.utils.CloseableIterator;
-
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -35,7 +34,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -45,7 +43,6 @@
import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static com.alibaba.fluss.row.arrow.ArrowWriter.BUFFER_USAGE_RATIO;
import static com.alibaba.fluss.testutils.DataTestUtils.assertLogRecordsEquals;
-import static com.alibaba.fluss.testutils.DataTestUtils.assertMemoryRecordsEquals;
import static com.alibaba.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -74,7 +71,8 @@ void testAppendWithEmptyRecord() throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
- MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 100);
+ MemoryLogRecordsArrowBuilder builder =
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 100);
assertThat(builder.isFull()).isFalse();
assertThat(builder.getWriteLimitInBytes())
.isEqualTo((int) (maxSizeInBytes * BUFFER_USAGE_RATIO));
@@ -94,7 +92,8 @@ void testAppend() throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
- MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
+ MemoryLogRecordsArrowBuilder builder =
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
List rowKinds =
DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
List rows =
@@ -135,7 +134,7 @@ void testIllegalArgument() {
DEFAULT_SCHEMA_ID,
maxSizeInBytes,
DATA1_ROW_TYPE)) {
- createMemoryLogRecordsArrowBuilder(writer, 10, 30);
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 30);
}
})
.isInstanceOf(IllegalArgumentException.class)
@@ -148,7 +147,8 @@ void testClose() throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE);
- MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
+ MemoryLogRecordsArrowBuilder builder =
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
List rowKinds =
DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
List rows =
@@ -183,69 +183,49 @@ void testClose() throws Exception {
}
@Test
- void testOverrideLastOffset() throws Exception {
+ void testNoRecordAppend() throws Exception {
+ // 1. no record append with base offset as 0.
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE);
MemoryLogRecordsArrowBuilder builder =
- createMemoryLogRecordsArrowBuilder(writer, 10, 1024 * 10);
- List rowKinds =
- DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
- List rows =
- DATA1.stream()
- .map(object -> row(DATA1_ROW_TYPE, object))
- .collect(Collectors.toList());
- for (int i = 0; i < DATA1.size(); i++) {
- builder.append(rowKinds.get(i), rows.get(i));
- }
-
- // override lastLogOffset smaller than record counts.
- assertThatThrownBy(() -> builder.overrideLastLogOffset(3L))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "The override lastLogOffset is less than recordCount + baseLogOffset,"
- + " which will cause the logOffsetDelta to be negative");
-
- // override lastLogOffset larger than record counts.
- builder.overrideLastLogOffset(15L);
- builder.close();
- builder.serialize();
- MemoryLogRecords memoryLogRecords =
- MemoryLogRecords.pointToByteBuffer(builder.build().getByteBuf().nioBuffer());
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10);
+ MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
+ // only contains batch header.
+ assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
Iterator iterator = memoryLogRecords.batches().iterator();
-
assertThat(iterator.hasNext()).isTrue();
LogRecordBatch logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();
logRecordBatch.ensureValid();
- assertThat(logRecordBatch.getRecordCount()).isEqualTo(10);
- assertThat(logRecordBatch.lastLogOffset()).isEqualTo(15);
- assertThat(logRecordBatch.nextLogOffset()).isEqualTo(16);
+ assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
+ assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
+ assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
- assertMemoryRecordsEquals(
- DATA1_ROW_TYPE, memoryLogRecords, Collections.singletonList(DATA1));
+ try (LogRecordReadContext readContext =
+ LogRecordReadContext.createArrowReadContext(
+ DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
+ CloseableIterator iter = logRecordBatch.records(readContext)) {
+ assertThat(iter.hasNext()).isFalse();
+ }
- // test empty record batch.
+ // 2. no record append with base offset as 0.
ArrowWriter writer2 =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE);
- MemoryLogRecordsArrowBuilder builder2 =
- createMemoryLogRecordsArrowBuilder(writer2, 10, 1024 * 10);
- builder2.overrideLastLogOffset(0);
- builder2.close();
- builder2.serialize();
- memoryLogRecords =
- MemoryLogRecords.pointToByteBuffer(builder2.build().getByteBuf().nioBuffer());
+ builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 10);
+ memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
+ // only contains batch header.
+ assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
iterator = memoryLogRecords.batches().iterator();
-
assertThat(iterator.hasNext()).isTrue();
logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();
logRecordBatch.ensureValid();
assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
- assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
- assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
- assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
+ assertThat(logRecordBatch.lastLogOffset()).isEqualTo(100);
+ assertThat(logRecordBatch.nextLogOffset()).isEqualTo(101);
+ assertThat(logRecordBatch.baseLogOffset()).isEqualTo(100);
try (LogRecordReadContext readContext =
LogRecordReadContext.createArrowReadContext(
DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
@@ -255,14 +235,15 @@ void testOverrideLastOffset() throws Exception {
}
private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
- ArrowWriter writer, int maxPages, int pageSizeInBytes) throws IOException {
+ int baseOffset, ArrowWriter writer, int maxPages, int pageSizeInBytes)
+ throws IOException {
conf.set(
ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE,
new MemorySize((long) maxPages * pageSizeInBytes));
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(pageSizeInBytes));
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(pageSizeInBytes));
return MemoryLogRecordsArrowBuilder.builder(
- 0L,
+ baseOffset,
DEFAULT_SCHEMA_ID,
writer,
new ManagedPagedOutputView(new TestingMemorySegmentPool(pageSizeInBytes)));
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
index e92cb49cc..39ea873d3 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
@@ -60,14 +60,12 @@
import com.alibaba.fluss.utils.FileUtils;
import com.alibaba.fluss.utils.FlussPaths;
import com.alibaba.fluss.utils.types.Tuple2;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
-
import java.io.File;
import java.io.IOException;
import java.util.Collection;
@@ -340,20 +338,16 @@ public LogAppendInfo putAsLeader(
}
}
- if (appendedRecordCount <= 0) {
- // This situation indicates that these batches of kvRecordBatch have not
- // generated any CDC logs, for example, when client attempts to delete
- // some non-existent keys or MergeEngine set to FIRST_ROW. In this case,
- // we cannot simply return, as doing so would cause a
- // OutOfOrderSequenceException problem. Therefore, here we override the
- // lastLogOffset to 0L as the baseLogOffset is 0L. As doing that, the
- // logOffsetDelta in logRecordBatch will be set to 0L instead of -1L.
- // So, we will put a batch into file with recordCount 0 and offset plus
- // 1L, it will update the batchSequence corresponding to the writerId
- // and also increment the CDC log offset by 1.
- walBuilder.overrideLastLogOffset(0L);
- }
-
+ // There will be a situation that these batches of kvRecordBatch have not
+ // generated any CDC logs, for example, when client attempts to delete
+ // some non-existent keys or MergeEngine set to FIRST_ROW. In this case,
+ // we cannot simply return, as doing so would cause a
+ // OutOfOrderSequenceException problem. Therefore, here we will build an
+ // empty batch with lastLogOffset to 0L as the baseLogOffset is 0L. As doing
+ // that, the logOffsetDelta in logRecordBatch will be set to 0L instead of
+ // -1L. So, we will put a batch into file with recordCount 0 and offset plus
+ // 1L, it will update the batchSequence corresponding to the writerId
+ // and also increment the CDC log offset by 1.
return logTablet.appendAsLeader(walBuilder.build());
} catch (Throwable t) {
// While encounter error here, the CDC logs may fail writing to disk,
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilder.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilder.java
index 6d87cfe83..ed9774248 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilder.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilder.java
@@ -61,11 +61,6 @@ public void setWriterState(long writerId, int batchSequence) {
recordsBuilder.setWriterState(writerId, batchSequence);
}
- @Override
- public void overrideLastLogOffset(long lastLogOffset) {
- recordsBuilder.overrideLastLogOffset(lastLogOffset);
- }
-
@Override
public void deallocate() {
recordsBuilder.recycleArrowWriter();
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/IndexWalBuilder.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/IndexWalBuilder.java
index 7fd773d7b..27e904c61 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/IndexWalBuilder.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/IndexWalBuilder.java
@@ -61,11 +61,6 @@ public void setWriterState(long writerId, int batchSequence) {
recordsBuilder.setWriterState(writerId, batchSequence);
}
- @Override
- public void overrideLastLogOffset(long lastLogOffset) {
- recordsBuilder.overrideLastLogOffset(lastLogOffset);
- }
-
@Override
public void deallocate() {
memorySegmentPool.returnAll(outputView.allocatedPooledSegments());
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/WalBuilder.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/WalBuilder.java
index f04413a9d..4f3ddd1c2 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/WalBuilder.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/WalBuilder.java
@@ -29,7 +29,5 @@ public interface WalBuilder {
void setWriterState(long writerId, int batchSequence);
- void overrideLastLogOffset(long lastLogOffset);
-
void deallocate();
}