Skip to content

Commit

Permalink
address jark's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
swuferhong committed Jan 16, 2025
1 parent f6cfb55 commit 82dac7e
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@
* byte. The CRC-32C (Castagnoli) polynomial is used for the computation. CommitTimestamp is also
* located before the CRC, because it is determined in server side.
*
* <p>The filed 'lastOffsetDelta is used to calculate the lastOffset of the current batch as:
 * <p>The field 'lastOffsetDelta' is used to calculate the lastOffset of the current batch as:
* [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset = baseOffset + recordCount -
* 1]. The reason for introducing this field is that there might be cases where the offset delta in
* batch does not match the recordCount. For example, when generating CDC logs for a kv table and
* sending a batch that only contains the deletion of non-existent kvs, no CDC logs would be
* generated. However, we need to increment the batchSequence for the corresponding writerId to make
* sure no {@link OutOfOrderSequenceException} will be thrown. In such a case, we would generate a
* logRecordBatch with a LastOffsetDelta of 1 but a recordCount of 0.
 * logRecordBatch with a LastOffsetDelta of 0 and a recordCount of 0.
*
* <p>The current attributes are given below:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.alibaba.fluss.record.bytesview.MultiBytesView;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.arrow.ArrowWriter;
import com.alibaba.fluss.utils.Preconditions;
import com.alibaba.fluss.utils.crc.Crc32C;

import java.io.IOException;
Expand Down Expand Up @@ -60,7 +59,6 @@ public class MemoryLogRecordsArrowBuilder implements AutoCloseable {
private int sizeInBytes;
private int recordCount;
private boolean isClosed;
private long lastLogOffset;
private boolean reCalculateSizeInBytes = false;

private MemoryLogRecordsArrowBuilder(
Expand All @@ -80,7 +78,6 @@ private MemoryLogRecordsArrowBuilder(

this.writerId = NO_WRITER_ID;
this.batchSequence = NO_BATCH_SEQUENCE;
this.lastLogOffset = -1L;
this.isClosed = false;

this.pagedOutputView = pagedOutputView;
Expand Down Expand Up @@ -170,14 +167,6 @@ public void resetWriterState(long writerId, int batchSequence) {
this.batchSequence = batchSequence;
}

public void overrideLastLogOffset(long lastLogOffset) {
Preconditions.checkArgument(
lastLogOffset >= arrowWriter.getRecordsCount() + baseLogOffset,
"The override lastLogOffset is less than recordCount + baseLogOffset, "
+ "which will cause the logOffsetDelta to be negative");
this.lastLogOffset = lastLogOffset;
}

public boolean isClosed() {
return isClosed;
}
Expand Down Expand Up @@ -228,10 +217,12 @@ private void writeBatchHeader() throws IOException {
outputView.writeShort((short) schemaId);
// skip write attributes
outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
if (lastLogOffset < 0) {
if (recordCount > 0) {
outputView.writeInt(recordCount - 1);
} else {
outputView.writeInt((int) (lastLogOffset - baseLogOffset));
            // If there is no record, we write 0 for the field 'lastOffsetDelta', see the comments
            // about the field 'lastOffsetDelta' in DefaultLogRecordBatch.
outputView.writeInt(0);
}
outputView.writeLong(writerId);
outputView.writeInt(batchSequence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
private int currentRecordNumber;
private int sizeInBytes;
private boolean isClosed;
private long lastLogOffset;

private MemoryLogRecordsIndexedBuilder(
long baseLogOffset,
Expand All @@ -79,7 +78,6 @@ private MemoryLogRecordsIndexedBuilder(
this.writerId = NO_WRITER_ID;
this.batchSequence = NO_BATCH_SEQUENCE;
this.currentRecordNumber = 0;
this.lastLogOffset = -1L;
this.isClosed = false;

// We don't need to write header information while the builder creating,
Expand Down Expand Up @@ -154,14 +152,6 @@ public void resetWriterState(long writerId, int batchSequence) {
this.batchSequence = batchSequence;
}

public void overrideLastLogOffset(long lastLogOffset) {
Preconditions.checkArgument(
lastLogOffset >= currentRecordNumber + baseLogOffset,
"The override lastLogOffset is less than recordCount + baseLogOffset, "
+ "which will cause the logOffsetDelta to be negative");
this.lastLogOffset = lastLogOffset;
}

public long writerId() {
return writerId;
}
Expand Down Expand Up @@ -202,10 +192,12 @@ private void writeBatchHeader() throws IOException {
outputView.writeShort((short) schemaId);
// skip write attribute byte for now.
outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
if (lastLogOffset < 0) {
if (currentRecordNumber > 0) {
outputView.writeInt(currentRecordNumber - 1);
} else {
outputView.writeInt((int) (lastLogOffset - baseLogOffset));
            // If there is no record, we write 0 for the field 'lastOffsetDelta', see the comments
            // about the field 'lastOffsetDelta' in DefaultLogRecordBatch.
outputView.writeInt(0);
}
outputView.writeLong(writerId);
outputView.writeInt(batchSequence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@
import com.alibaba.fluss.testutils.DataTestUtils;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.CloseableIterator;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static com.alibaba.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link DefaultLogRecordBatch}. */
public class DefaultLogRecordBatchTest extends LogTestBase {
Expand Down Expand Up @@ -101,69 +97,53 @@ void testIndexedRowWriteAndReadBatch() throws Exception {
}

@Test
void testOverrideLastLogOffset() throws Exception {
List<IndexedRow> rows1 =
Arrays.asList(
row(baseRowType, new Object[] {1, "1"}),
row(baseRowType, new Object[] {2, "2"}),
row(baseRowType, new Object[] {3, "3"}));

void testNoRecordAppend() throws Exception {
// 1. no record append with baseOffset as 0.
MemoryLogRecordsIndexedBuilder builder =
MemoryLogRecordsIndexedBuilder.builder(
0L, schemaId, Integer.MAX_VALUE, magic, outputView);
for (IndexedRow row : rows1) {
builder.append(RowKind.APPEND_ONLY, row);
}

// override lastLogOffset smaller than record counts.
assertThatThrownBy(() -> builder.overrideLastLogOffset(1L))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"The override lastLogOffset is less than recordCount + baseLogOffset,"
+ " which will cause the logOffsetDelta to be negative");

// override lastLogOffset larger than record counts.
builder.overrideLastLogOffset(5L);
MemoryLogRecords memoryLogRecords = builder.build();
0L, schemaId, Integer.MAX_VALUE, magic, new UnmanagedPagedOutputView(100));
MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
Iterator<LogRecordBatch> iterator = memoryLogRecords.batches().iterator();
// only contains batch header.
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);

assertThat(iterator.hasNext()).isTrue();
LogRecordBatch logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();

logRecordBatch.ensureValid();
assertThat(logRecordBatch.getRecordCount()).isEqualTo(3);
assertThat(logRecordBatch.lastLogOffset()).isEqualTo(5);
assertThat(logRecordBatch.nextLogOffset()).isEqualTo(6);
assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
try (LogRecordReadContext readContext =
LogRecordReadContext.createIndexedReadContext(baseRowType, schemaId);
CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
for (IndexedRow row : rows1) {
assertThat(iter.hasNext()).isTrue();
LogRecord record = iter.next();
assertThat(record.getRow()).isEqualTo(row);
}
assertThat(iter.hasNext()).isFalse();
}

// test empty record batch.
MemoryLogRecordsIndexedBuilder builder2 =
// 2. no record append with baseOffset as 100.
builder =
MemoryLogRecordsIndexedBuilder.builder(
0L, schemaId, Integer.MAX_VALUE, magic, outputView);
builder2.overrideLastLogOffset(0);
memoryLogRecords = builder2.build();
100L,
schemaId,
Integer.MAX_VALUE,
magic,
new UnmanagedPagedOutputView(100));
memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
iterator = memoryLogRecords.batches().iterator();
// only contains batch header.
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);

assertThat(iterator.hasNext()).isTrue();
logRecordBatch = iterator.next();
assertThat(iterator.hasNext()).isFalse();

logRecordBatch.ensureValid();
assertThat(logRecordBatch.getRecordCount()).isEqualTo(0);
assertThat(logRecordBatch.lastLogOffset()).isEqualTo(0);
assertThat(logRecordBatch.nextLogOffset()).isEqualTo(1);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(0);
assertThat(logRecordBatch.lastLogOffset()).isEqualTo(100);
assertThat(logRecordBatch.nextLogOffset()).isEqualTo(101);
assertThat(logRecordBatch.baseLogOffset()).isEqualTo(100);
try (LogRecordReadContext readContext =
LogRecordReadContext.createIndexedReadContext(baseRowType, schemaId);
CloseableIterator<LogRecord> iter = logRecordBatch.records(readContext)) {
Expand Down
Loading

0 comments on commit 82dac7e

Please sign in to comment.