diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java
index 9be052b1f..4ca51747d 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java
@@ -18,6 +18,7 @@
 import com.alibaba.fluss.annotation.PublicEvolving;
 import com.alibaba.fluss.exception.CorruptMessageException;
+import com.alibaba.fluss.exception.OutOfOrderSequenceException;
 import com.alibaba.fluss.memory.MemorySegment;
 import com.alibaba.fluss.metadata.LogFormat;
 import com.alibaba.fluss.row.arrow.ArrowReader;
@@ -50,6 +51,7 @@
 *   • CRC => Uint32
 *   • SchemaId => Int16
 *   • Attributes => Int8
+*   • LastOffsetDelta => Int32
 *   • WriterID => Int64
 *   • SequenceID => Int32
 *   • RecordCount => Int32
@@ -62,6 +64,15 @@
 * byte. The CRC-32C (Castagnoli) polynomial is used for the computation. CommitTimestamp is also
 * located before the CRC, because it is determined on the server side.
 *
+* The field 'lastOffsetDelta' is used to calculate the lastOffset of the current batch as
+* [lastOffset = baseOffset + lastOffsetDelta] instead of [lastOffset = baseOffset + recordCount
+* - 1]. This field is introduced because there are cases where the offset delta of a batch does
+* not match its recordCount. For example, when generating CDC logs for a kv table and sending a
+* batch that only contains deletions of non-existent kvs, no CDC logs are generated. However,
+* the batchSequence for the corresponding writerId still needs to be incremented to make sure
+* no {@link OutOfOrderSequenceException} is thrown. In such a case, we generate a
+* logRecordBatch with a lastOffsetDelta of 0 even though its recordCount is 0.
+*
 * The current attributes are given below:
 *
    @@ -82,6 +93,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
         static final int CRC_LENGTH = 4;
         static final int SCHEMA_ID_LENGTH = 2;
         static final int ATTRIBUTE_LENGTH = 1;
    +    static final int LAST_OFFSET_DELTA_LENGTH = 4;
         static final int WRITE_CLIENT_ID_LENGTH = 8;
         static final int BATCH_SEQUENCE_LENGTH = 4;
         static final int RECORDS_COUNT_LENGTH = 4;
    @@ -93,7 +105,8 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
         public static final int CRC_OFFSET = COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH;
         protected static final int SCHEMA_ID_OFFSET = CRC_OFFSET + CRC_LENGTH;
         static final int ATTRIBUTES_OFFSET = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
    -    static final int WRITE_CLIENT_ID_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
    +    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
    +    static final int WRITE_CLIENT_ID_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
         static final int BATCH_SEQUENCE_OFFSET = WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
         public static final int RECORDS_COUNT_OFFSET = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
         static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
    @@ -201,7 +214,7 @@ public long lastLogOffset() {
         }
     
         private int lastOffsetDelta() {
    -        return getRecordCount() - 1;
    +        return segment.getInt(LAST_OFFSET_DELTA_OFFSET + position);
         }
     
         @Override
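
To make the new header arithmetic concrete, here is a small standalone sketch (illustration only, not part of the patch; the widths of the fields ahead of CRC, i.e. base offset, length, magic, and commit timestamp, are assumed from the batch format described above). It reproduces the 48-byte header size asserted by the tests below, and the empty-batch offset computation:

    public class HeaderMathSketch {
        // Assumed widths for the fields ahead of CRC; the rest mirror the
        // *_LENGTH constants in this diff.
        static final int BASE_OFFSET = 8, LENGTH = 4, MAGIC = 1, COMMIT_TIMESTAMP = 8;
        static final int CRC = 4, SCHEMA_ID = 2, ATTRIBUTES = 1;
        static final int LAST_OFFSET_DELTA = 4; // the newly added field
        static final int WRITER_ID = 8, BATCH_SEQUENCE = 4, RECORD_COUNT = 4;

        public static void main(String[] args) {
            int headerSize = BASE_OFFSET + LENGTH + MAGIC + COMMIT_TIMESTAMP + CRC
                    + SCHEMA_ID + ATTRIBUTES + LAST_OFFSET_DELTA
                    + WRITER_ID + BATCH_SEQUENCE + RECORD_COUNT;
            System.out.println(headerSize); // 48; was 44 before LAST_OFFSET_DELTA

            // An empty batch written at baseOffset 100 stores lastOffsetDelta = 0,
            // so lastOffset = 100 and nextOffset = 101 although recordCount is 0.
            // The old formula (baseOffset + recordCount - 1) would yield 99.
            long baseOffset = 100L;
            int lastOffsetDelta = 0;
            long lastOffset = baseOffset + lastOffsetDelta;
            long nextOffset = lastOffset + 1;
            System.out.println(lastOffset + " -> " + nextOffset); // 100 -> 101
        }
    }
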
    diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
    index 24e37ac05..a84e76025 100644
    --- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
    +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
    @@ -31,9 +31,9 @@
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
    +import static com.alibaba.fluss.record.DefaultLogRecordBatch.LAST_OFFSET_DELTA_OFFSET;
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
    -import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
     import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
     import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
     import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
    @@ -216,7 +216,14 @@ private void writeBatchHeader() throws IOException {
     
             outputView.writeShort((short) schemaId);
             // skip write attributes
    -        outputView.setPosition(WRITE_CLIENT_ID_OFFSET);
    +        outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
    +        if (recordCount > 0) {
    +            outputView.writeInt(recordCount - 1);
    +        } else {
    +            // If there are no records, we write 0 for the field 'lastOffsetDelta'; see the
    +            // comments on the field 'lastOffsetDelta' in DefaultLogRecordBatch.
    +            outputView.writeInt(0);
    +        }
             outputView.writeLong(writerId);
             outputView.writeInt(batchSequence);
             outputView.writeInt(recordCount);
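
MemoryLogRecordsIndexedBuilder below applies the same guard. Distilled, the value written for the new field comes down to this branch (a sketch; the helper name is hypothetical, both builders inline it in writeBatchHeader()):

    static int lastOffsetDeltaToWrite(int recordCount) {
        // recordCount - 1 for non-empty batches; 0 (rather than -1) for empty
        // batches, so that lastOffset = baseOffset instead of baseOffset - 1.
        return recordCount > 0 ? recordCount - 1 : 0;
    }
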
    diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java
    index 5ef18dba0..79f152d64 100644
    --- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java
    +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java
    @@ -31,10 +31,10 @@
     
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
    +import static com.alibaba.fluss.record.DefaultLogRecordBatch.LAST_OFFSET_DELTA_OFFSET;
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
     import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
    -import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
     import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
     import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
     import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
    @@ -191,7 +191,14 @@ private void writeBatchHeader() throws IOException {
     
             outputView.writeShort((short) schemaId);
             // skip write attribute byte for now.
    -        outputView.setPosition(WRITE_CLIENT_ID_OFFSET);
    +        outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
    +        if (currentRecordNumber > 0) {
    +            outputView.writeInt(currentRecordNumber - 1);
    +        } else {
    +            // If there are no records, we write 0 for the field 'lastOffsetDelta'; see the
    +            // comments on the field 'lastOffsetDelta' in DefaultLogRecordBatch.
    +            outputView.writeInt(0);
    +        }
             outputView.writeLong(writerId);
             outputView.writeInt(batchSequence);
             outputView.writeInt(currentRecordNumber);
    diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java
    index 7e1aae3b2..e1dd641c5 100644
    --- a/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java
    +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java
    @@ -96,4 +96,59 @@ void testIndexedRowWriteAndReadBatch() throws Exception {
     
             builder.close();
         }
    +
    +    @Test
    +    void testNoRecordAppend() throws Exception {
    +        // 1. no record append with baseOffset as 0.
    +        MemoryLogRecordsIndexedBuilder builder =
    +                MemoryLogRecordsIndexedBuilder.builder(
    +                        0L, schemaId, Integer.MAX_VALUE, magic, new UnmanagedPagedOutputView(100));
    +        MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
    +        Iterator<LogRecordBatch> iterator = memoryLogRecords.batches().iterator();
    +        // only contains batch header.
    +        assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
    +
    +        assertThat(iterator.hasNext()).isTrue();
    +        LogRecordBatch logRecordBatch = iterator.next();
    +        assertThat(iterator.hasNext()).isFalse();
    +
    +        logRecordBatch.ensureValid();
    +        assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
    +        assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
    +        assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
    +        assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
    +        try (LogRecordReadContext readContext =
    +                        LogRecordReadContext.createIndexedReadContext(baseRowType, schemaId);
    +                CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
    +            assertThat(iter.hasNext()).isFalse();
    +        }
    +
    +        // 2. no record append with baseOffset as 100.
    +        builder =
    +                MemoryLogRecordsIndexedBuilder.builder(
    +                        100L,
    +                        schemaId,
    +                        Integer.MAX_VALUE,
    +                        magic,
    +                        new UnmanagedPagedOutputView(100));
    +        memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
    +        iterator = memoryLogRecords.batches().iterator();
    +        // only contains batch header.
    +        assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
    +
    +        assertThat(iterator.hasNext()).isTrue();
    +        logRecordBatch = iterator.next();
    +        assertThat(iterator.hasNext()).isFalse();
    +
    +        logRecordBatch.ensureValid();
    +        assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
    +        assertThat(logRecordBatch.lastLogOffset()).isEqualTo(100);
    +        assertThat(logRecordBatch.nextLogOffset()).isEqualTo(101);
    +        assertThat(logRecordBatch.baseLogOffset()).isEqualTo(100);
    +        try (LogRecordReadContext readContext =
    +                        LogRecordReadContext.createIndexedReadContext(baseRowType, schemaId);
    +                CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
    +            assertThat(iter.hasNext()).isFalse();
    +        }
    +    }
     }
    diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
    index 94da99d4b..b6e153324 100644
    --- a/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
    +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
    @@ -26,6 +26,7 @@
     import com.alibaba.fluss.row.arrow.ArrowWriterPool;
     import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
     import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
    +import com.alibaba.fluss.utils.CloseableIterator;
     
     import org.apache.commons.lang3.RandomUtils;
     import org.junit.jupiter.api.AfterEach;
    @@ -71,7 +72,8 @@ void testAppendWithEmptyRecord() throws Exception {
             int maxSizeInBytes = 1024;
             ArrowWriter writer =
                     provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
    -        MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 100);
    +        MemoryLogRecordsArrowBuilder builder =
    +                createMemoryLogRecordsArrowBuilder(0, writer, 10, 100);
             assertThat(builder.isFull()).isFalse();
             assertThat(builder.getWriteLimitInBytes())
                     .isEqualTo((int) (maxSizeInBytes * BUFFER_USAGE_RATIO));
    @@ -82,7 +84,7 @@ void testAppendWithEmptyRecord() throws Exception {
             assertThat(iterator.hasNext()).isTrue();
             LogRecordBatch batch = iterator.next();
             assertThat(batch.getRecordCount()).isEqualTo(0);
    -        assertThat(batch.sizeInBytes()).isEqualTo(44);
    +        assertThat(batch.sizeInBytes()).isEqualTo(48);
             assertThat(iterator.hasNext()).isFalse();
         }
     
    @@ -91,7 +93,8 @@ void testAppend() throws Exception {
             int maxSizeInBytes = 1024;
             ArrowWriter writer =
                     provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
    -        MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
    +        MemoryLogRecordsArrowBuilder builder =
    +                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
             List<RowKind> rowKinds =
                     DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
             List rows =
    @@ -132,12 +135,12 @@ void testIllegalArgument() {
                                                 DEFAULT_SCHEMA_ID,
                                                 maxSizeInBytes,
                                                 DATA1_ROW_TYPE)) {
    -                                createMemoryLogRecordsArrowBuilder(writer, 10, 30);
    +                                createMemoryLogRecordsArrowBuilder(0, writer, 10, 30);
                                 }
                             })
                     .isInstanceOf(IllegalArgumentException.class)
                     .hasMessage(
    -                        "The size of first segment of pagedOutputView is too small, need at least 44 bytes.");
    +                        "The size of first segment of pagedOutputView is too small, need at least 48 bytes.");
         }
     
         @Test
    @@ -145,7 +148,8 @@ void testClose() throws Exception {
             int maxSizeInBytes = 1024;
             ArrowWriter writer =
                     provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE);
    -        MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
    +        MemoryLogRecordsArrowBuilder builder =
    +                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
             List<RowKind> rowKinds =
                     DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
             List rows =
    @@ -179,15 +183,68 @@ void testClose() throws Exception {
             writer1.close();
         }
     
    +    @Test
    +    void testNoRecordAppend() throws Exception {
    +        // 1. no record append with base offset as 0.
    +        ArrowWriter writer =
    +                provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE);
    +        MemoryLogRecordsArrowBuilder builder =
    +                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10);
    +        MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
    +        // only contains batch header.
    +        assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
    +        Iterator<LogRecordBatch> iterator = memoryLogRecords.batches().iterator();
    +        assertThat(iterator.hasNext()).isTrue();
    +        LogRecordBatch logRecordBatch = iterator.next();
    +        assertThat(iterator.hasNext()).isFalse();
    +
    +        logRecordBatch.ensureValid();
    +        assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
    +        assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
    +        assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
    +        assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
    +        try (LogRecordReadContext readContext =
    +                        LogRecordReadContext.createArrowReadContext(
    +                                DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
    +                CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
    +            assertThat(iter.hasNext()).isFalse();
    +        }
    +
    +        // 2. no record append with base offset as 100.
    +        ArrowWriter writer2 =
    +                provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE);
    +        builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 10);
    +        memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
    +        // only contains batch header.
    +        assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
    +        iterator = memoryLogRecords.batches().iterator();
    +        assertThat(iterator.hasNext()).isTrue();
    +        logRecordBatch = iterator.next();
    +        assertThat(iterator.hasNext()).isFalse();
    +
    +        logRecordBatch.ensureValid();
    +        assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
    +        assertThat(logRecordBatch.lastLogOffset()).isEqualTo(100);
    +        assertThat(logRecordBatch.nextLogOffset()).isEqualTo(101);
    +        assertThat(logRecordBatch.baseLogOffset()).isEqualTo(100);
    +        try (LogRecordReadContext readContext =
    +                        LogRecordReadContext.createArrowReadContext(
    +                                DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
    +                CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
    +            assertThat(iter.hasNext()).isFalse();
    +        }
    +    }
    +
         private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
    -            ArrowWriter writer, int maxPages, int pageSizeInBytes) throws IOException {
    +            int baseOffset, ArrowWriter writer, int maxPages, int pageSizeInBytes)
    +            throws IOException {
             conf.set(
                     ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE,
                     new MemorySize((long) maxPages * pageSizeInBytes));
             conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(pageSizeInBytes));
             conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(pageSizeInBytes));
             return MemoryLogRecordsArrowBuilder.builder(
    -                0L,
    +                baseOffset,
                     DEFAULT_SCHEMA_ID,
                     writer,
                     new ManagedPagedOutputView(new TestingMemorySegmentPool(pageSizeInBytes)));
    diff --git a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java
    index 80c0242f8..42d4587a3 100644
    --- a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java
    +++ b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java
    @@ -508,6 +508,21 @@ public static void assertMemoryRecordsEqualsWithRowKind(
             assertThat(iterator.hasNext()).isFalse();
         }
     
    +    public static void assertLogRecordBatchEqualsWithRowKind(
    +            RowType rowType,
    +            LogRecordBatch logRecordBatch,
    +            List<Tuple2<RowKind, Object[]>> expected) {
    +        try (LogRecordReadContext readContext = createArrowReadContext(rowType, DEFAULT_SCHEMA_ID);
    +                CloseableIterator<LogRecord> logIterator = logRecordBatch.records(readContext)) {
    +            for (Tuple2<RowKind, Object[]> expectedFieldAndRowKind : expected) {
    +                assertThat(logIterator.hasNext()).isTrue();
    +                assertLogRecordsEqualsWithRowKind(
    +                        rowType, logIterator.next(), expectedFieldAndRowKind);
    +            }
    +            assertThat(logIterator.hasNext()).isFalse();
    +        }
    +    }
    +
         public static void assertLogRecordsEquals(LogRecords actual, LogRecords expected) {
             assertLogRecordsEquals(DATA1_ROW_TYPE, actual, expected);
         }
    diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
    index 1d6568c77..29a61ddbd 100644
    --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
    +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
    @@ -387,7 +387,16 @@ void testFirstRowMergeEngine() throws Exception {
             // insert again
             tEnv.executeSql("insert into first_row_source(a, b) VALUES (3, 'v33'), (4, 'v44')").await();
             expectedRows = Collections.singletonList("+I[4, v44]");
    +        assertResultsIgnoreOrder(rowIter, expectedRows, false);
    +
    +        // insert where all keys already exist.
    +        tEnv.executeSql("insert into first_row_source(a, b) VALUES (3, 'v333'), (4, 'v444')")
    +                .await();
    +
    +        tEnv.executeSql("insert into first_row_source(a, b) VALUES (5, 'v5')").await();
    +        expectedRows = Collections.singletonList("+I[5, v5]");
             assertResultsIgnoreOrder(rowIter, expectedRows, true);
    +
             insertJobClient.cancel().get();
         }
     
    @@ -531,7 +540,7 @@ void testDeleteAndUpdateStmtOnPkTable() throws Exception {
                                     + " primary key (a) not enforced"
                                     + ")",
                             tableName));
    -        // test delete without data.
    +        // test deleting data with a non-existent key.
             tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE a = 5").await();
     
             List insertValues =
    @@ -557,6 +566,9 @@ void testDeleteAndUpdateStmtOnPkTable() throws Exception {
                             .collect();
             assertThat(rowIter.hasNext()).isFalse();
     
    +        // test deleting data with a non-existent key.
    +        tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE a = 15").await();
    +
             // test update row4
             tBatchEnv.executeSql("UPDATE " + tableName + " SET c = 'New York' WHERE a = 4").await();
             CloseableIterator row4 =
    diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
    index 89eeeebac..6fa493cc8 100644
    --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
    +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
    @@ -267,7 +267,6 @@ public LogAppendInfo putAsLeader(
                             ValueDecoder valueDecoder =
                                     new ValueDecoder(readContext.getRowDecoder(schemaId));
     
    -                        int appendedRecordCount = 0;
                             for (KvRecord kvRecord : kvRecords.records(readContext)) {
                                 byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
                                 KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
    @@ -289,13 +288,11 @@ public LogAppendInfo putAsLeader(
                                         // if newRow is null, it means the row should be deleted
                                         if (newRow == null) {
                                             walBuilder.append(RowKind.DELETE, oldRow);
    -                                        appendedRecordCount += 1;
                                             kvPreWriteBuffer.delete(key, logOffset++);
                                         } else {
                                             // otherwise, it's a partial update, should produce -U,+U
                                             walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
                                             walBuilder.append(RowKind.UPDATE_AFTER, newRow);
    -                                        appendedRecordCount += 2;
                                             kvPreWriteBuffer.put(
                                                     key,
                                                     ValueEncoder.encodeValue(schemaId, newRow),
    @@ -317,7 +314,6 @@ public LogAppendInfo putAsLeader(
                                                 updateRow(oldRow, kvRecord.getRow(), partialUpdater);
                                         walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
                                         walBuilder.append(RowKind.UPDATE_AFTER, newRow);
    -                                    appendedRecordCount += 2;
                                         // logOffset is for -U, logOffset + 1 is for +U, we need to use
                                         // the log offset for +U
                                         kvPreWriteBuffer.put(
    @@ -331,7 +327,6 @@ public LogAppendInfo putAsLeader(
                                         //  of the input row are set to null.
                                         BinaryRow newRow = kvRecord.getRow();
                                         walBuilder.append(RowKind.INSERT, newRow);
    -                                    appendedRecordCount += 1;
                                         kvPreWriteBuffer.put(
                                                 key,
                                                 ValueEncoder.encodeValue(schemaId, newRow),
    @@ -340,21 +335,17 @@ public LogAppendInfo putAsLeader(
                                 }
                             }
     
    -                        // if appendedRecordCount is 0, it means there is no record to append, we
    -                        // should not append.
    -                        if (appendedRecordCount > 0) {
    -                            // now, we can build the full log.
    -                            return logTablet.appendAsLeader(walBuilder.build());
    -                        } else {
    -                            return new LogAppendInfo(
    -                                    logEndOffsetOfPrevBatch - 1,
    -                                    logEndOffsetOfPrevBatch - 1,
    -                                    0L,
    -                                    0L,
    -                                    0,
    -                                    0,
    -                                    false);
    -                        }
    +                        // There are cases where a kvRecordBatch generates no CDC logs at
    +                        // all, for example when the client attempts to delete non-existent
    +                        // keys, or when the MergeEngine is set to FIRST_ROW. In such cases
    +                        // we cannot simply return, as doing so would cause an
    +                        // OutOfOrderSequenceException. Instead, we build an empty batch
    +                        // whose lastOffsetDelta is 0, so its lastLogOffset equals its
    +                        // baseLogOffset (0L here). The batch is written to the log with a
    +                        // recordCount of 0 and advances the offset by 1, which updates the
    +                        // batchSequence for the corresponding writerId and also increments
    +                        // the CDC log offset by 1.
    +                        return logTablet.appendAsLeader(walBuilder.build());
                         } catch (Throwable t) {
                             // While encounter error here, the CDC logs may fail writing to disk,
                             // and the client probably will resend the batch. If we do not remove the
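
As a worked example of the behavior described in the comment above (offsets assumed to start at 0, mirroring testPutKvWithDeleteNonExistsKey further down):

    public class EmptyBatchOffsetSketch {
        public static void main(String[] args) {
            // Ten consecutive "delete non-existent key" batches: each yields an
            // empty CDC batch that still advances the log end offset by one,
            // keeping the writer's batchSequence progression intact.
            long logEndOffset = 0L;
            for (int batchSequence = 0; batchSequence < 10; batchSequence++) {
                long baseOffset = logEndOffset; // batch i starts at offset i
                long lastOffset = baseOffset;   // lastOffsetDelta == 0
                logEndOffset = lastOffset + 1;  // next batch starts at i + 1
            }
            System.out.println(logEndOffset); // 10: offsets 0..9, one per empty batch
        }
    }
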
    diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
    index dbae560fe..92ceafc9e 100644
    --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
    +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
    @@ -690,8 +690,7 @@ private AssignResult assignOffsetAndTimestamp(
                             "Currently, we only support DefaultLogRecordBatch.");
                 }
     
    -            int recordCount = batch.getRecordCount();
    -            initialOffset += recordCount;
    +            initialOffset = batch.nextLogOffset();
             }
     
             return new AssignResult(initialOffset - 1, commitTimestamp, baseLogOffset);
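
This one-line change matters precisely for empty batches: with a recordCount of 0, the old code never advanced initialOffset, so the next batch would have been assigned an overlapping offset. A sketch of the difference:

    public class AssignOffsetSketch {
        public static void main(String[] args) {
            // Assign offsets to [empty batch, batch of 3 records], starting at 5.
            int[] recordCounts = {0, 3};
            long oldOffset = 5L;
            long newOffset = 5L;
            for (int recordCount : recordCounts) {
                // old: initialOffset += recordCount (an empty batch does not advance)
                oldOffset += recordCount;
                // new: initialOffset = batch.nextLogOffset()
                //      = baseOffset + lastOffsetDelta + 1
                int lastOffsetDelta = recordCount > 0 ? recordCount - 1 : 0;
                newOffset = newOffset + lastOffsetDelta + 1;
            }
            System.out.println(oldOffset + " vs " + newOffset); // 8 vs 9
        }
    }
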
    diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java
    index 32990148d..65954851f 100644
    --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java
    +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java
    @@ -24,7 +24,9 @@
     import com.alibaba.fluss.record.DefaultValueRecordBatch;
     import com.alibaba.fluss.record.KvRecord;
     import com.alibaba.fluss.record.KvRecordBatch;
    +import com.alibaba.fluss.record.LogRecord;
     import com.alibaba.fluss.record.LogRecordBatch;
    +import com.alibaba.fluss.record.LogRecordReadContext;
     import com.alibaba.fluss.record.LogRecords;
     import com.alibaba.fluss.record.MemoryLogRecords;
     import com.alibaba.fluss.record.RowKind;
    @@ -53,6 +55,7 @@
     import com.alibaba.fluss.types.DataField;
     import com.alibaba.fluss.types.DataTypes;
     import com.alibaba.fluss.types.RowType;
    +import com.alibaba.fluss.utils.CloseableIterator;
     import com.alibaba.fluss.utils.types.Tuple2;
     
     import org.junit.jupiter.api.Test;
    @@ -65,11 +68,13 @@
     import java.util.Arrays;
     import java.util.Collections;
     import java.util.HashMap;
    +import java.util.Iterator;
     import java.util.List;
     import java.util.Map;
     import java.util.concurrent.CompletableFuture;
     import java.util.concurrent.TimeUnit;
     
    +import static com.alibaba.fluss.record.LogRecordReadContext.createArrowReadContext;
     import static com.alibaba.fluss.record.TestData.ANOTHER_DATA1;
     import static com.alibaba.fluss.record.TestData.DATA1;
     import static com.alibaba.fluss.record.TestData.DATA1_KEY_TYPE;
    @@ -81,9 +86,11 @@
     import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
     import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE;
     import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID;
    +import static com.alibaba.fluss.record.TestData.EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK;
     import static com.alibaba.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH;
     import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH;
     import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH;
    +import static com.alibaba.fluss.testutils.DataTestUtils.assertLogRecordBatchEqualsWithRowKind;
     import static com.alibaba.fluss.testutils.DataTestUtils.assertLogRecordsEquals;
     import static com.alibaba.fluss.testutils.DataTestUtils.assertLogRecordsEqualsWithRowKind;
     import static com.alibaba.fluss.testutils.DataTestUtils.assertMemoryRecordsEquals;
    @@ -562,6 +569,73 @@ void testPutKvWithOutOfBatchSequence() throws Exception {
                     DATA1_ROW_TYPE, records, Arrays.asList(expectedLogForData1, expectedLogForData2));
         }
     
    +    @Test
    +    void testPutKvWithDeleteNonExistsKey() throws Exception {
    +        TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1);
    +        makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket());
    +
    +        // 1. put 10 batches that only delete non-existent keys to the kv store.
    +        CompletableFuture<List<PutKvResultForBucket>> future;
    +        List<Tuple2<Object[], Object[]>> deleteList =
    +                Arrays.asList(
    +                        Tuple2.of(new Object[] {1}, null),
    +                        Tuple2.of(new Object[] {2}, null),
    +                        Tuple2.of(new Object[] {3}, null),
    +                        Tuple2.of(new Object[] {4}, null));
    +        for (int i = 0; i < 10; i++) {
    +            future = new CompletableFuture<>();
    +            replicaManager.putRecordsToKv(
    +                    20000,
    +                    1,
    +                    Collections.singletonMap(tb, genKvRecordBatch(deleteList)),
    +                    null,
    +                    future::complete);
    +            assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, i + 1));
    +        }
    +
    +        // 2. write a normal batch.
    +        future = new CompletableFuture<>();
    +        replicaManager.putRecordsToKv(
    +                20000,
    +                1,
    +                Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
    +                null,
    +                future::complete);
    +        assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 18));
    +
    +        // 3. get the cdc-log of these batches.
    +        CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> future1 =
    +                new CompletableFuture<>();
    +        replicaManager.fetchLogRecords(
    +                buildFetchParams(-1),
    +                Collections.singletonMap(tb, new FetchData(tb.getTableId(), 0L, Integer.MAX_VALUE)),
    +                future1::complete);
    +        FetchLogResultForBucket resultForBucket = future1.get().get(tb);
    +        assertThat(resultForBucket.getHighWatermark()).isEqualTo(18L);
    +        LogRecords logRecords = resultForBucket.recordsOrEmpty();
    +        Iterator<LogRecordBatch> iterator = logRecords.batches().iterator();
    +        for (int i = 0; i < 10; i++) {
    +            assertThat(iterator.hasNext()).isTrue();
    +            LogRecordBatch logRecordBatch = iterator.next();
    +            assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
    +            assertThat(logRecordBatch.baseLogOffset()).isEqualTo(i);
    +            assertThat(logRecordBatch.lastLogOffset()).isEqualTo(i);
    +            assertThat(logRecordBatch.nextLogOffset()).isEqualTo(i + 1);
    +            try (LogRecordReadContext readContext =
    +                            createArrowReadContext(DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
    +                    CloseableIterator<LogRecord> logIterator =
    +                            logRecordBatch.records(readContext)) {
    +                assertThat(logIterator.hasNext()).isFalse();
    +            }
    +        }
    +
    +        assertThat(iterator.hasNext()).isTrue();
    +        LogRecordBatch logRecordBatch = iterator.next();
    +        assertThat(iterator.hasNext()).isFalse();
    +        assertLogRecordBatchEqualsWithRowKind(
    +                DATA1_ROW_TYPE, logRecordBatch, EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK);
    +    }
    +
         @Test
         void testLookup() throws Exception {
             TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1);
    diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
    index 1d3e0e472..521a9796e 100644
    --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
    +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
    @@ -23,6 +23,7 @@
     import com.alibaba.fluss.metadata.TablePath;
     import com.alibaba.fluss.record.KvRecordBatch;
     import com.alibaba.fluss.record.KvRecordTestUtils;
    +import com.alibaba.fluss.record.LogRecordBatch;
     import com.alibaba.fluss.record.LogRecords;
     import com.alibaba.fluss.record.MemoryLogRecords;
     import com.alibaba.fluss.record.RowKind;
    @@ -49,6 +50,7 @@
     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.Collections;
    +import java.util.Iterator;
     import java.util.List;
     
     import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
    @@ -273,13 +275,16 @@ void testPutRecordsToLeader() throws Exception {
                     .isEqualTo(expected);
             currentOffset += 3;
     
    -        // delete k2 again, shouldn't produce any log records
    +        // delete k2 again, which now produces an empty batch with no records.
             kvRecords = kvRecordBatchFactory.ofRecords(kvRecordFactory.ofRecord("k2", null));
             logAppendInfo = putRecordsToLeader(kvReplica, kvRecords);
    -        assertThat(logAppendInfo.lastOffset()).isEqualTo(11);
    -        assertThatLogRecords(fetchRecords(kvReplica, currentOffset))
    -                .withSchema(DATA1_ROW_TYPE)
    -                .isEqualTo(MemoryLogRecords.EMPTY);
    +        assertThat(logAppendInfo.lastOffset()).isEqualTo(12);
    +        LogRecords logRecords = fetchRecords(kvReplica, currentOffset);
    +        Iterator<LogRecordBatch> iterator = logRecords.batches().iterator();
    +        assertThat(iterator.hasNext()).isTrue();
    +        LogRecordBatch batch = iterator.next();
    +        assertThat(batch.getRecordCount()).isEqualTo(0);
    +        currentOffset += 1;
     
             // delete k1 and put k1 again, should produce -D, +I
             kvRecords =
    @@ -287,7 +292,7 @@ void testPutRecordsToLeader() throws Exception {
                             kvRecordFactory.ofRecord("k1", null),
                             kvRecordFactory.ofRecord("k1", new Object[] {1, "aaa"}));
             logAppendInfo = putRecordsToLeader(kvReplica, kvRecords);
    -        assertThat(logAppendInfo.lastOffset()).isEqualTo(13);
    +        assertThat(logAppendInfo.lastOffset()).isEqualTo(14);
             expected =
                     logRecords(
                             currentOffset,