diff --git a/pom.xml b/pom.xml
index b446ba8f8..2fb387616 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
32.1.3-jre
2.0.9
2.2
- 0.14.0-SNAPSHOT
+ 0.15.0-SNAPSHOT
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index 5e532e9b3..7a1317758 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
4.0.0
com.automq.elasticstream
s3stream
- 0.14.0-SNAPSHOT
+ 0.15.0-SNAPSHOT
5.5.0
5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/api/CreateStreamOptions.java b/s3stream/src/main/java/com/automq/stream/api/CreateStreamOptions.java
index d4c0e7879..470dd15b6 100644
--- a/s3stream/src/main/java/com/automq/stream/api/CreateStreamOptions.java
+++ b/s3stream/src/main/java/com/automq/stream/api/CreateStreamOptions.java
@@ -42,7 +42,7 @@ public static class Builder {
private final CreateStreamOptions options = new CreateStreamOptions();
public Builder replicaCount(int replicaCount) {
- Arguments.check(replicaCount > 0, "replica count should larger than 0");
+ Arguments.check(replicaCount > 0, "replica count should be larger than 0");
options.replicaCount = replicaCount;
return this;
}
diff --git a/s3stream/src/main/java/com/automq/stream/api/RecordBatch.java b/s3stream/src/main/java/com/automq/stream/api/RecordBatch.java
index dbe3d7892..39789184c 100644
--- a/s3stream/src/main/java/com/automq/stream/api/RecordBatch.java
+++ b/s3stream/src/main/java/com/automq/stream/api/RecordBatch.java
@@ -26,9 +26,9 @@
public interface RecordBatch {
/**
- * Get payload record count.
+ * Get the payload record count.
*
- * @return record count.
+ * @return the record count.
*/
int count();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java b/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java
new file mode 100644
index 000000000..71c669d2e
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3;
+
+import io.netty.buffer.ByteBuf;
+
+public record DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition,
+ int size) {
+
+ public static final int BLOCK_INDEX_SIZE = 8/* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */
+ + 4 /* record count */ + 8 /* block position */ + 4 /* block size */;
+
+ public long endOffset() {
+ return startOffset + endOffsetDelta;
+ }
+
+ public long endPosition() {
+ return startPosition + size;
+ }
+
+ public void encode(ByteBuf buf) {
+ buf.writeLong(streamId);
+ buf.writeLong(startOffset);
+ buf.writeInt(endOffsetDelta);
+ buf.writeInt(recordCount);
+ buf.writeLong(startPosition);
+ buf.writeInt(size);
+ }
+}
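Each index entry above is a fixed 36-byte record (8 + 8 + 4 + 4 + 8 + 4 = BLOCK_INDEX_SIZE), so the index block becomes a flat, binary-searchable array. A minimal round-trip sketch of the layout, assuming Netty's Unpooled allocator (illustration only, not part of the patch):

import com.automq.stream.s3.DataBlockIndex;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

class DataBlockIndexLayoutDemo {
    public static void main(String[] args) {
        DataBlockIndex index = new DataBlockIndex(233L, 100L, 10, 3, 4096L, 1024);
        ByteBuf buf = Unpooled.buffer(DataBlockIndex.BLOCK_INDEX_SIZE);
        index.encode(buf); // writes exactly 36 bytes

        // Decoding mirrors the field order of encode() (and IndexBlock#get below).
        DataBlockIndex decoded = new DataBlockIndex(
            buf.readLong(), // streamId
            buf.readLong(), // startOffset
            buf.readInt(),  // endOffsetDelta
            buf.readInt(),  // recordCount
            buf.readLong(), // startPosition
            buf.readInt()); // size
        assert decoded.equals(index);
        assert decoded.endOffset() == 110L && decoded.endPosition() == 5120L;
    }
}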
diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
index 964aa81ed..095410070 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -70,7 +70,7 @@ public CompletableFuture<FindIndexResult> find(long streamId, long startOffset,
public CompletableFuture<DataBlock> read(DataBlockIndex block) {
CompletableFuture rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.THROTTLE_1);
- return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
+ return rangeReadCf.thenApply(DataBlock::new);
}
void asyncGetBasicObjectInfo() {
@@ -121,12 +121,10 @@ public void close0() {
}
/**
- * @param dataBlockSize The total size of the data blocks, which equals to index start position.
- * @param indexBlock raw index data.
- * @param blockCount The number of data blocks in the object.
- * @param indexBlockSize The size of the index blocks.
+ * @param dataBlockSize The total size of the data blocks, which equals to index start position.
+ * @param indexBlock raw index data.
*/
- public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount, int indexBlockSize) {
+ public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) {
public static BasicObjectInfo parse(ByteBuf objectTailBuf,
S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
@@ -145,13 +143,7 @@ public static BasicObjectInfo parse(ByteBuf objectTailBuf,
indexBlockBuf.readBytes(copy, indexBlockBuf.readableBytes());
objectTailBuf.release();
indexBlockBuf = copy;
-
- int blockCount = indexBlockBuf.readInt();
- ByteBuf blocks = indexBlockBuf.retainedSlice(indexBlockBuf.readerIndex(), blockCount * 16);
- indexBlockBuf.skipBytes(blockCount * 16);
- ByteBuf streamRanges = indexBlockBuf.retainedSlice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes());
- indexBlockBuf.release();
- return new BasicObjectInfo(indexBlockPosition, new IndexBlock(s3ObjectMetadata, blocks, streamRanges), blockCount, indexBlockSize);
+ return new BasicObjectInfo(indexBlockPosition, new IndexBlock(s3ObjectMetadata, indexBlockBuf));
}
}
@@ -165,48 +157,47 @@ void close() {
}
public static class IndexBlock {
+ public static final int INDEX_BLOCK_UNIT_SIZE = 8/* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */
+ + 4 /* record count */ + 8 /* block position */ + 4 /* block size */;
private final S3ObjectMetadata s3ObjectMetadata;
- private final ByteBuf blocks;
- private final ByteBuf streamRanges;
+ private final ByteBuf buf;
private final int size;
+ private final int count;
- public IndexBlock(S3ObjectMetadata s3ObjectMetadata, ByteBuf blocks, ByteBuf streamRanges) {
+ public IndexBlock(S3ObjectMetadata s3ObjectMetadata, ByteBuf buf) {
this.s3ObjectMetadata = s3ObjectMetadata;
- this.blocks = blocks;
- this.streamRanges = streamRanges;
- this.size = blocks.readableBytes() + streamRanges.readableBytes();
+ this.buf = buf;
+ this.size = buf.readableBytes();
+ this.count = buf.readableBytes() / INDEX_BLOCK_UNIT_SIZE;
}
- public Iterator<StreamDataBlock> iterator() {
- ByteBuf blocks = this.blocks.slice();
- ByteBuf ranges = this.streamRanges.slice();
+ public Iterator<DataBlockIndex> iterator() {
+ AtomicInteger getIndex = new AtomicInteger(0);
return new Iterator<>() {
@Override
public boolean hasNext() {
- return ranges.readableBytes() != 0;
+ return getIndex.get() < count;
}
@Override
- public StreamDataBlock next() {
- long streamId = ranges.readLong();
- long startOffset = ranges.readLong();
- long endOffset = startOffset + ranges.readInt();
- int rangeBlockId = ranges.readInt();
- long blockPosition = blocks.getLong(rangeBlockId * 16);
- int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
- int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
- return new StreamDataBlock(streamId, startOffset, endOffset, rangeBlockId, s3ObjectMetadata.objectId(),
- blockPosition, blockSize, recordCount);
+ public DataBlockIndex next() {
+ return get(getIndex.getAndIncrement());
}
};
}
- public ByteBuf blocks() {
- return blocks.slice();
- }
-
- public ByteBuf streamRanges() {
- return streamRanges.slice();
+ public DataBlockIndex get(int index) {
+ if (index < 0 || index >= count) {
+ throw new IllegalArgumentException("index " + index + " is out of range [0, " + count + ")");
+ }
+ int base = index * INDEX_BLOCK_UNIT_SIZE;
+ long streamId = buf.getLong(base);
+ long startOffset = buf.getLong(base + 8);
+ int endOffsetDelta = buf.getInt(base + 16);
+ int recordCount = buf.getInt(base + 20);
+ long blockPosition = buf.getLong(base + 24);
+ int blockSize = buf.getInt(base + 32);
+ return new DataBlockIndex(streamId, startOffset, endOffsetDelta, recordCount, blockPosition, blockSize);
}
public FindIndexResult find(long streamId, long startOffset, long endOffset) {
@@ -219,37 +210,30 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
boolean matched = false;
boolean isFulfilled = false;
List<StreamDataBlock> rst = new LinkedList<>();
- IndexBlockOrderedBytes indexBlockOrderedBytes = new IndexBlockOrderedBytes(streamRanges);
+ IndexBlockOrderedBytes indexBlockOrderedBytes = new IndexBlockOrderedBytes(this);
int startIndex = indexBlockOrderedBytes.search(new IndexBlockOrderedBytes.TargetStreamOffset(streamId, startOffset));
if (startIndex == -1) {
// mismatched
return new FindIndexResult(false, nextStartOffset, nextMaxBytes, rst);
}
- for (int i = startIndex * 24; i < streamRanges.readableBytes(); i += 24) {
- long rangeStreamId = streamRanges.getLong(i);
- long rangeStartOffset = streamRanges.getLong(i + 8);
- long rangeEndOffset = rangeStartOffset + streamRanges.getInt(i + 16);
- int rangeBlockId = streamRanges.getInt(i + 20);
- if (rangeStreamId == streamId) {
- if (nextStartOffset < rangeStartOffset) {
+ for (int i = startIndex; i < count(); i++) {
+ DataBlockIndex index = get(i);
+ if (index.streamId() == streamId) {
+ if (nextStartOffset < index.startOffset()) {
break;
}
- if (rangeEndOffset <= nextStartOffset) {
+ if (index.endOffset() <= nextStartOffset) {
continue;
}
- matched = nextStartOffset == rangeStartOffset;
- nextStartOffset = rangeEndOffset;
- long blockPosition = blocks.getLong(rangeBlockId * 16);
- int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
- int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
- rst.add(new StreamDataBlock(streamId, rangeStartOffset, rangeEndOffset, s3ObjectMetadata.objectId(),
- new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount)));
+ matched = nextStartOffset == index.startOffset();
+ nextStartOffset = index.endOffset();
+ rst.add(new StreamDataBlock(s3ObjectMetadata.objectId(), index));
// we consider first block as not matched because we do not know exactly how many bytes are within
// the range in first block, as a result we may read one more block than expected.
if (matched) {
- int recordPayloadSize = blockSize
- - recordCount * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
+ int recordPayloadSize = index.size()
+ - index.recordCount() * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
- ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
nextMaxBytes -= Math.min(nextMaxBytes, recordPayloadSize);
}
@@ -264,13 +248,16 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
return new FindIndexResult(isFulfilled, nextStartOffset, nextMaxBytes, rst);
}
- int size() {
+ public int size() {
return size;
}
+ public int count() {
+ return count;
+ }
+
void close() {
- blocks.release();
- streamRanges.release();
+ buf.release();
}
}
@@ -288,45 +275,36 @@ public IndexBlockParseException(long indexBlockPosition) {
}
- public record DataBlockIndex(int blockId, long startPosition, int size, int recordCount) {
- public static final int BLOCK_INDEX_SIZE = 8 + 4 + 4;
-
- public long endPosition() {
- return startPosition + size;
- }
-
- @Override
- public String toString() {
- return "DataBlockIndex{" +
- "blockId=" + blockId +
- ", startPosition=" + startPosition +
- ", size=" + size +
- ", recordCount=" + recordCount +
- '}';
- }
- }
-
public static class DataBlock implements AutoCloseable {
private final ByteBuf buf;
private final int recordCount;
- public DataBlock(ByteBuf buf, int recordCount) {
- this.buf = buf;
- this.recordCount = recordCount;
+ public DataBlock(ByteBuf buf) {
+ this.buf = buf.duplicate();
+ this.recordCount = check(buf);
+ }
+
+ private static int check(ByteBuf buf) {
+ buf = buf.duplicate();
+ int recordCount = 0;
+ while (buf.readableBytes() > 0) {
+ byte magicCode = buf.readByte();
+ if (magicCode != ObjectWriter.DATA_BLOCK_MAGIC) {
+ LOGGER.error("magic code mismatch, expected {}, actual {}", ObjectWriter.DATA_BLOCK_MAGIC, magicCode);
+ throw new RuntimeException("[FATAL] magic code mismatch, data is corrupted");
+ }
+ buf.readByte(); // flag
+ recordCount += buf.readInt();
+ int dataLength = buf.readInt();
+ buf.skipBytes(dataLength);
+ }
+ return recordCount;
}
public CloseableIterator<StreamRecordBatch> iterator() {
ByteBuf buf = this.buf.duplicate();
+ AtomicInteger currentBlockRecordCount = new AtomicInteger(0);
AtomicInteger remainingRecordCount = new AtomicInteger(recordCount);
- // skip magic and flag
- byte magicCode = buf.readByte();
- buf.readByte();
-
- if (magicCode != ObjectWriter.DATA_BLOCK_MAGIC) {
- LOGGER.error("magic code mismatch, expected {}, actual {}", ObjectWriter.DATA_BLOCK_MAGIC, magicCode);
- throw new RuntimeException("[FATAL] magic code mismatch, data is corrupted");
- }
- // TODO: check flag, use uncompressed stream or compressed stream.
return new CloseableIterator<>() {
@Override
public boolean hasNext() {
@@ -339,6 +317,12 @@ public StreamRecordBatch next() {
if (remainingRecordCount.decrementAndGet() < 0) {
throw new NoSuchElementException();
}
+ if (currentBlockRecordCount.get() == 0) {
+ buf.skipBytes(1 /* magic */ + 1 /* flag */);
+ currentBlockRecordCount.set(buf.readInt());
+ buf.skipBytes(4);
+ }
+ currentBlockRecordCount.decrementAndGet();
return StreamRecordBatchCodec.duplicateDecode(buf);
}
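After this refactor a fetched range may contain several concatenated data blocks (a "block group"), each prefixed with the 10-byte header, and DataBlock#check derives the record count by walking those headers instead of receiving it from the index. A standalone sketch of that header walk; DATA_BLOCK_MAGIC's value is internal to ObjectWriter, so the constant here is a stand-in:

import io.netty.buffer.ByteBuf;

final class BlockGroupScanner {
    static final byte MAGIC = (byte) 0x8e; // assumption: stands in for ObjectWriter.DATA_BLOCK_MAGIC

    // Counts records across all concatenated blocks without copying payloads.
    static int countRecords(ByteBuf group) {
        ByteBuf buf = group.duplicate(); // do not disturb the caller's reader index
        int recordCount = 0;
        while (buf.readableBytes() > 0) {
            byte magic = buf.readByte();
            if (magic != MAGIC) {
                throw new IllegalStateException("magic mismatch, data is corrupted");
            }
            buf.readByte();                 // flag (compression etc.)
            recordCount += buf.readInt();   // records in this block
            int dataLength = buf.readInt(); // payload bytes that follow the header
            buf.skipBytes(dataLength);      // jump to the next block header
        }
        return recordCount;
    }
}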
diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
index 7753188f7..4fb595c6a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -200,33 +200,15 @@ class IndexBlock {
public IndexBlock() {
long nextPosition = 0;
- int indexBlockSize = 4 + (8 + 4 + 4 + 8 + 8 + 4 + 4) * completedBlocks.size();
+ int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * completedBlocks.size();
buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, "write_index_block");
- buf.writeInt(completedBlocks.size()); // block count
- // block index
for (DataBlock block : completedBlocks) {
- // start position in the object
- buf.writeLong(nextPosition);
- // byte size of the block
- buf.writeInt(block.size());
- // how many ranges in the block
- buf.writeInt(block.recordCount());
+ ObjectStreamRange streamRange = block.getStreamRange();
+ new DataBlockIndex(streamRange.getStreamId(), streamRange.getStartOffset(), (int) (streamRange.getEndOffset() - streamRange.getStartOffset()),
+ block.recordCount(), nextPosition, block.size()).encode(buf);
nextPosition += block.size();
}
position = nextPosition;
- // object stream range
- for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) {
- DataBlock block = completedBlocks.get(blockIndex);
- ObjectStreamRange range = block.getStreamRange();
- // stream id of this range
- buf.writeLong(range.getStreamId());
- // start offset of the related stream
- buf.writeLong(range.getStartOffset());
- // record count of the related stream in this range
- buf.writeInt((int) (range.getEndOffset() - range.getStartOffset()));
- // the index of block where this range is in
- buf.writeInt(blockIndex);
- }
}
public ByteBuf buffer() {
@@ -244,7 +226,7 @@ public int size() {
}
class DataBlock {
- public static final int BLOCK_HEADER_SIZE = 2;
+ public static final int BLOCK_HEADER_SIZE = 1 /* magic */ + 1/* flag */ + 4 /* record count*/ + 4 /* data length */;
private final CompositeByteBuf encodedBuf;
private final ObjectStreamRange streamRange;
private final int recordCount;
@@ -256,9 +238,12 @@ public DataBlock(long streamId, List<StreamRecordBatch> records) {
ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE);
header.writeByte(DATA_BLOCK_MAGIC);
header.writeByte(DATA_BLOCK_DEFAULT_FLAG);
+ header.writeInt(recordCount);
+ header.writeInt(0);
encodedBuf.addComponent(true, header);
records.forEach(r -> encodedBuf.addComponent(true, r.encoded().retain()));
this.size = encodedBuf.readableBytes();
+ encodedBuf.setInt(BLOCK_HEADER_SIZE - 4, size - BLOCK_HEADER_SIZE);
this.streamRange = new ObjectStreamRange(streamId, records.get(0).getEpoch(), records.get(0).getBaseOffset(), records.get(records.size() - 1).getLastOffset(), size);
}
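On the write side the header is emitted before the payload size is known, so the 4-byte data-length slot is zeroed and back-patched afterwards (the setInt call above). A condensed sketch of the same pattern, again with a stand-in magic value:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

final class BlockHeaderDemo {
    static final int BLOCK_HEADER_SIZE = 1 + 1 + 4 + 4; // magic, flag, record count, data length
    static final byte MAGIC = (byte) 0x8e;              // assumption, see above
    static final byte FLAG = 0;

    static CompositeByteBuf encodeBlock(ByteBuf payload, int recordCount) {
        CompositeByteBuf block = Unpooled.compositeBuffer();
        ByteBuf header = Unpooled.buffer(BLOCK_HEADER_SIZE);
        header.writeByte(MAGIC);
        header.writeByte(FLAG);
        header.writeInt(recordCount);
        header.writeInt(0); // placeholder, patched below
        block.addComponent(true, header);
        block.addComponent(true, payload.retain());
        int size = block.readableBytes();
        // back-patch the data length once the total block size is known
        block.setInt(BLOCK_HEADER_SIZE - 4, size - BLOCK_HEADER_SIZE);
        return block;
    }
}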
diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
index 3fb8ed413..a0710a91b 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
@@ -27,52 +27,35 @@ public class StreamDataBlock {
public static final Comparator<StreamDataBlock> STREAM_OFFSET_COMPARATOR = Comparator.comparingLong(StreamDataBlock::getStartOffset);
public static final Comparator<StreamDataBlock> BLOCK_POSITION_COMPARATOR = Comparator.comparingLong(StreamDataBlock::getBlockStartPosition);
private final long objectId;
-
- // Stream attributes
- private final long streamId;
- private final long startOffset;
- private final long endOffset;
-
- private final ObjectReader.DataBlockIndex dataBlockIndex;
+ private final DataBlockIndex dataBlockIndex;
private final CompletableFuture<ByteBuf> dataCf = new CompletableFuture<>();
private final AtomicInteger refCount = new AtomicInteger(1);
- public StreamDataBlock(long streamId, long startOffset, long endOffset, long objectId,
- ObjectReader.DataBlockIndex dataBlockIndex) {
- this.streamId = streamId;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
+ public StreamDataBlock(long objectId, DataBlockIndex dataBlockIndex) {
this.dataBlockIndex = dataBlockIndex;
this.objectId = objectId;
}
- public StreamDataBlock(long streamId, long startOffset, long endOffset, int blockId,
+ public StreamDataBlock(long streamId, long startOffset, long endOffset,
long objectId, long blockPosition, int blockSize, int recordCount) {
- this.streamId = streamId;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
this.objectId = objectId;
- this.dataBlockIndex = new ObjectReader.DataBlockIndex(blockId, blockPosition, blockSize, recordCount);
+ this.dataBlockIndex = new DataBlockIndex(streamId, startOffset, (int) (endOffset - startOffset), recordCount, blockPosition, blockSize);
}
public long getStreamId() {
- return streamId;
+ return dataBlockIndex.streamId();
}
public long getStartOffset() {
- return startOffset;
+ return dataBlockIndex.startOffset();
}
public long getEndOffset() {
- return endOffset;
+ return dataBlockIndex.endOffset();
}
public long getStreamRangeSize() {
- return endOffset - startOffset;
- }
-
- public int getBlockId() {
- return dataBlockIndex.blockId();
+ return dataBlockIndex.endOffsetDelta();
}
public long getObjectId() {
@@ -91,11 +74,7 @@ public int getBlockSize() {
return dataBlockIndex.size();
}
- public int getRecordCount() {
- return dataBlockIndex.recordCount();
- }
-
- public ObjectReader.DataBlockIndex dataBlockIndex() {
+ public DataBlockIndex dataBlockIndex() {
return dataBlockIndex;
}
@@ -121,9 +100,6 @@ public void release() {
public String toString() {
return "StreamDataBlock{" +
"objectId=" + objectId +
- ", streamId=" + streamId +
- ", startOffset=" + startOffset +
- ", endOffset=" + endOffset +
", dataBlockIndex=" + dataBlockIndex +
'}';
}
@@ -135,13 +111,12 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass())
return false;
StreamDataBlock that = (StreamDataBlock) o;
- return streamId == that.streamId && startOffset == that.startOffset && endOffset == that.endOffset
- && objectId == that.objectId && dataBlockIndex.equals(that.dataBlockIndex);
+ return objectId == that.objectId && dataBlockIndex.equals(that.dataBlockIndex);
}
@Override
public int hashCode() {
- return Objects.hash(streamId, startOffset, endOffset, objectId, dataBlockIndex);
+ return Objects.hash(objectId, dataBlockIndex);
}
}
\ No newline at end of file
diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java
index 903d4be63..2a286d8ce 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java
@@ -26,7 +26,6 @@
import com.automq.stream.s3.operator.Writer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -42,8 +41,8 @@
*/
public class StreamObjectCompactor {
/**
- * max object count in one group, the group count will limit the compact request size to kraft and multi-part object
- * part count (less than {@code Writer.MAX_PART_COUNT}).
+ * max object count in one group; the group count limits the compact request size to kraft and the multi-part object
+ * part count (less than {@code Writer.MAX_PART_COUNT}).
*/
private static final int MAX_OBJECT_GROUP_COUNT = Math.min(5000, Writer.MAX_PART_COUNT / 2);
private static final Logger LOGGER = LoggerFactory.getLogger(StreamObjectCompactor.class);
@@ -114,52 +113,38 @@ public Optional compact() throws ExecutionException,
return Optional.empty();
}
long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
- int blockId = 0;
long nextBlockPosition = 0;
long objectSize = 0;
long compactedStartOffset = objectGroup.get(0).startOffset();
long compactedEndOffset = objectGroup.get(objectGroup.size() - 1).endOffset();
List<Long> compactedObjectIds = new LinkedList<>();
- CompositeByteBuf blocksOfIndex = DirectByteBufAlloc.compositeByteBuffer();
- CompositeByteBuf rangesOfIndex = DirectByteBufAlloc.compositeByteBuffer();
+ CompositeByteBuf indexes = DirectByteBufAlloc.compositeByteBuffer();
Writer writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
for (S3ObjectMetadata object : objectGroup) {
try (ObjectReader reader = new ObjectReader(object, s3Operator)) {
ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().get();
- ByteBuf subBlocks = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().blocks().readableBytes());
- ByteBuf subRanges = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().streamRanges().readableBytes());
- Iterator<StreamDataBlock> it = basicObjectInfo.indexBlock().iterator();
+ ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE);
+ Iterator<DataBlockIndex> it = basicObjectInfo.indexBlock().iterator();
long validDataBlockStartPosition = 0;
while (it.hasNext()) {
- StreamDataBlock dataBlock = it.next();
- if (dataBlock.getEndOffset() <= startOffset) {
- validDataBlockStartPosition = dataBlock.getBlockEndPosition();
- compactedStartOffset = dataBlock.getEndOffset();
+ DataBlockIndex dataBlock = it.next();
+ if (dataBlock.endOffset() <= startOffset) {
+ validDataBlockStartPosition = dataBlock.endPosition();
+ compactedStartOffset = dataBlock.endOffset();
continue;
}
- subBlocks.writeLong(nextBlockPosition);
- subBlocks.writeInt(dataBlock.getBlockSize());
- subBlocks.writeInt(dataBlock.getRecordCount());
- subRanges.writeLong(dataBlock.getStreamId());
- subRanges.writeLong(dataBlock.getStartOffset());
- subRanges.writeInt((int) (dataBlock.getEndOffset() - dataBlock.getStartOffset()));
- subRanges.writeInt(blockId);
- blockId += 1;
- nextBlockPosition += dataBlock.getBlockSize();
+ new DataBlockIndex(streamId, dataBlock.startOffset(), dataBlock.endOffsetDelta(),
+ dataBlock.recordCount(), nextBlockPosition, dataBlock.size()).encode(subIndexes);
+ nextBlockPosition += dataBlock.size();
}
writer.copyWrite(ObjectUtils.genKey(0, object.objectId()), validDataBlockStartPosition, basicObjectInfo.dataBlockSize());
objectSize += basicObjectInfo.dataBlockSize() - validDataBlockStartPosition;
- blocksOfIndex.addComponent(true, subBlocks);
- rangesOfIndex.addComponent(true, subRanges);
+ indexes.addComponent(true, subIndexes);
compactedObjectIds.add(object.objectId());
}
}
CompositeByteBuf indexBlockAndFooter = DirectByteBufAlloc.compositeByteBuffer();
- ByteBuf blockCount = Unpooled.buffer(4);
- blockCount.writeInt(blockId);
- indexBlockAndFooter.addComponent(true, blockCount);
- indexBlockAndFooter.addComponent(true, blocksOfIndex);
- indexBlockAndFooter.addComponent(true, rangesOfIndex);
+ indexBlockAndFooter.addComponent(true, indexes);
indexBlockAndFooter.addComponent(true, new ObjectWriter.Footer(nextBlockPosition, indexBlockAndFooter.readableBytes()).buffer());
objectSize += indexBlockAndFooter.readableBytes();
@@ -188,7 +173,7 @@ static List<List<S3ObjectMetadata>> group0(List<S3ObjectMetadata> objects, long
if (groupNextOffset != object.startOffset()
// the group object size is less than maxStreamObjectSize
|| (groupSize + object.objectSize() > maxStreamObjectSize && !group.isEmpty())
- // object count in group is larger than MAX_OBJECT_GROUP_COUNT
+ // the object count in the group is larger than MAX_OBJECT_GROUP_COUNT
|| group.size() >= MAX_OBJECT_GROUP_COUNT
|| partCount + objectPartCount > Writer.MAX_PART_COUNT
) {
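The compaction loop above carries each surviving index entry over unchanged except for its start position, which is rebased into the new object's coordinate space; stream offsets are preserved, so endOffset() stays valid. A sketch of that rebasing rule:

import com.automq.stream.s3.DataBlockIndex;

final class IndexRebaseDemo {
    // Rebase a surviving source entry onto the compacted object's layout:
    // offsets, record count, and size are copied, only the physical position moves.
    static DataBlockIndex rebase(DataBlockIndex src, long streamId, long nextBlockPosition) {
        return new DataBlockIndex(streamId, src.startOffset(), src.endOffsetDelta(),
            src.recordCount(), nextBlockPosition, src.size());
    }
}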
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java
index 1a51712a1..3162a2f0a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.StreamDataBlock;
import java.util.ArrayList;
@@ -34,15 +35,15 @@
*/
public class DataBlockReadAccumulator {
private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReadAccumulator.class);
- private final Map<Pair<String, Integer>, DataBlockRecords> inflightDataBlockReads = new ConcurrentHashMap<>();
+ private final Map<Pair<String, Long>, DataBlockRecords> inflightDataBlockReads = new ConcurrentHashMap<>();
public List<ReserveResult> reserveDataBlock(List<Pair<ObjectReader, StreamDataBlock>> dataBlockPairList) {
List reserveResults = new ArrayList<>();
synchronized (inflightDataBlockReads) {
for (Pair<ObjectReader, StreamDataBlock> pair : dataBlockPairList) {
ObjectReader reader = pair.getLeft();
- ObjectReader.DataBlockIndex blockIndex = pair.getRight().dataBlockIndex();
- Pair<String, Integer> key = Pair.of(reader.objectKey(), blockIndex.blockId());
+ DataBlockIndex blockIndex = pair.getRight().dataBlockIndex();
+ Pair<String, Long> key = Pair.of(reader.objectKey(), blockIndex.startPosition());
DataBlockRecords records = inflightDataBlockReads.get(key);
CompletableFuture<DataBlockRecords> cf = new CompletableFuture<>();
BiConsumer<DataBlockRecords, Throwable> listener = (rst, ex) -> {
@@ -69,8 +70,8 @@ public List<ReserveResult> reserveDataBlock(List<Pair<ObjectReader, StreamDataBlock>> dataBlockPairList) {
- public void readDataBlock(ObjectReader reader, ObjectReader.DataBlockIndex blockIndex) {
- Pair<String, Integer> key = Pair.of(reader.objectKey(), blockIndex.blockId());
+ public void readDataBlock(ObjectReader reader, DataBlockIndex blockIndex) {
+ Pair<String, Long> key = Pair.of(reader.objectKey(), blockIndex.startPosition());
synchronized (inflightDataBlockReads) {
DataBlockRecords records = inflightDataBlockReads.get(key);
if (records != null) {
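With block ids gone, in-flight reads are deduplicated by (objectKey, startPosition), which is just as unique per data block within an object. A minimal sketch of that keying with a hypothetical helper (not the accumulator itself):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.tuple.Pair;

final class InflightReadDedupDemo {
    private final ConcurrentMap<Pair<String, Long>, CompletableFuture<Void>> inflight = new ConcurrentHashMap<>();

    CompletableFuture<Void> readOnce(String objectKey, long startPosition) {
        // Identical (objectKey, startPosition) requests share one in-flight future.
        return inflight.computeIfAbsent(Pair.of(objectKey, startPosition),
            key -> CompletableFuture.runAsync(() -> { /* range read would go here */ }));
    }
}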
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
index 6855d4fc1..490f0860b 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -180,10 +180,10 @@ CompletableFuture<List<StreamRecordBatch>> handleSyncReadAhead(TraceContext trac
StreamDataBlock streamDataBlock = pair.getRight();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] sync ra, stream={}, {}-{}, read data block {} from {} [{}, {}), size={}",
- streamId, startOffset, endOffset, streamDataBlock.getBlockId(), objectReader.objectKey(),
+ streamId, startOffset, endOffset, streamDataBlock, objectReader.objectKey(),
streamDataBlock.getBlockStartPosition(), streamDataBlock.getBlockEndPosition(), streamDataBlock.getBlockSize());
}
- String dataBlockKey = streamDataBlock.getObjectId() + "-" + streamDataBlock.getBlockId();
+ String dataBlockKey = streamDataBlock.getObjectId() + "-" + streamDataBlock.getBlockStartPosition();
sortedDataBlockKeyList.add(dataBlockKey);
DataBlockReadAccumulator.ReserveResult reserveResult = reserveResults.get(i);
DefaultS3BlockCache.ReadAheadTaskKey taskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, streamDataBlock.getStartOffset());
@@ -327,7 +327,7 @@ CompletableFuture<Void> handleAsyncReadAhead(long streamId, long startOffset, lo
StreamDataBlock streamDataBlock = pair.getRight();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] async ra, stream={}, {}-{}, read data block {} from {} [{}, {}), size={}",
- streamId, startOffset, endOffset, streamDataBlock.getBlockId(), objectReader.objectKey(),
+ streamId, startOffset, endOffset, streamDataBlock, objectReader.objectKey(),
streamDataBlock.getBlockStartPosition(), streamDataBlock.getBlockEndPosition(), streamDataBlock.getBlockSize());
}
UUID uuid = UUID.randomUUID();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
index 470846f29..814c96c34 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
@@ -17,7 +17,9 @@
package com.automq.stream.s3.compact.operator;
+import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metrics.MetricsLevel;
@@ -28,6 +30,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -69,24 +72,21 @@ public CompletableFuture<List<StreamDataBlock>> getDataBlockIndex() {
}
public void parseDataBlockIndex() {
- parseDataBlockIndex(Math.max(0, metadata.objectSize() - 1024 * 1024));
- }
-
- public void parseDataBlockIndex(long startPosition) {
- s3Operator.rangeRead(objectKey, startPosition, metadata.objectSize(), ThrottleStrategy.THROTTLE_2)
- .thenAccept(buf -> {
- try {
- indexBlockCf.complete(IndexBlock.parse(buf, metadata.objectSize(), metadata.objectId()));
- } catch (IndexBlockParseException ex) {
- parseDataBlockIndex(ex.indexBlockPosition);
- }
- }).exceptionally(ex -> {
- // unrecoverable error, possibly read on a deleted object
- LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.objectSize(), ex);
- indexBlockCf.completeExceptionally(ex);
- return null;
- });
-
+ // TODO: throttle level
+ @SuppressWarnings("resource") ObjectReader objectReader = new ObjectReader(metadata, s3Operator);
+ objectReader.basicObjectInfo().thenAccept(info -> {
+ List<StreamDataBlock> blocks = new ArrayList<>(info.indexBlock().count());
+ Iterator<DataBlockIndex> it = info.indexBlock().iterator();
+ while (it.hasNext()) {
+ blocks.add(new StreamDataBlock(metadata.objectId(), it.next()));
+ }
+ indexBlockCf.complete(blocks);
+ }).exceptionally(ex -> {
+ // unrecoverable error, possibly read on a deleted object
+ LOGGER.warn("object {} index parse fail", objectKey, ex);
+ indexBlockCf.completeExceptionally(ex);
+ return null;
+ }).whenComplete((nil, ex) -> objectReader.release());
}
public void readBlocks(List<StreamDataBlock> streamDataBlocks) {
@@ -229,48 +229,4 @@ private void failDataBlocks(List<StreamDataBlock> streamDataBlocks, Throwable ex
streamDataBlock.getDataCf().completeExceptionally(ex);
}
}
-
- static class IndexBlock {
- static List<StreamDataBlock> parse(ByteBuf objectTailBuf, long objectSize,
- long objectId) throws IndexBlockParseException {
- try {
- long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - 48);
- int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
- if (indexBlockPosition + objectTailBuf.readableBytes() < objectSize) {
- throw new IndexBlockParseException(indexBlockPosition);
- } else {
- int indexRelativePosition = objectTailBuf.readableBytes() - (int) (objectSize - indexBlockPosition);
- ByteBuf indexBlockBuf = objectTailBuf.slice(objectTailBuf.readerIndex() + indexRelativePosition, indexBlockSize);
- int blockCount = indexBlockBuf.readInt();
- ByteBuf blocks = indexBlockBuf.slice(indexBlockBuf.readerIndex(), blockCount * 16);
- indexBlockBuf.skipBytes(blockCount * 16);
- ByteBuf streamRanges = indexBlockBuf.slice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes());
- List<StreamDataBlock> streamDataBlocks = new ArrayList<>();
- for (int i = 0; i < blockCount; i++) {
- long blockPosition = blocks.readLong();
- int blockSize = blocks.readInt();
- int recordCount = blocks.readInt();
- long streamId = streamRanges.readLong();
- long startOffset = streamRanges.readLong();
- int rangeSize = streamRanges.readInt();
- int blockIndex = streamRanges.readInt();
- streamDataBlocks.add(new StreamDataBlock(streamId, startOffset, startOffset + rangeSize,
- blockIndex, objectId, blockPosition, blockSize, recordCount));
- }
- return streamDataBlocks;
- }
- } finally {
- objectTailBuf.release();
- }
- }
- }
-
- static class IndexBlockParseException extends Exception {
- long indexBlockPosition;
-
- public IndexBlockParseException(long indexBlockPosition) {
- this.indexBlockPosition = indexBlockPosition;
- }
-
- }
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
index aabf9595a..822f36ce7 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.compact.operator;
+import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.ObjectUtils;
@@ -152,28 +153,16 @@ class IndexBlock {
public IndexBlock() {
position = nextDataBlockPosition;
buf = DirectByteBufAlloc.byteBuffer(calculateIndexBlockSize(), "write_index_block");
- buf.writeInt(completedBlocks.size()); // block count
long nextPosition = 0;
- // block index
for (StreamDataBlock block : completedBlocks) {
- buf.writeLong(nextPosition);
- buf.writeInt(block.getBlockSize());
- buf.writeInt(block.getRecordCount());
+ new DataBlockIndex(block.getStreamId(), block.getStartOffset(), (int) (block.getEndOffset() - block.getStartOffset()),
+ block.dataBlockIndex().recordCount(), nextPosition, block.getBlockSize()).encode(buf);
nextPosition += block.getBlockSize();
}
-
- // object stream range
- for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) {
- StreamDataBlock block = completedBlocks.get(blockIndex);
- buf.writeLong(block.getStreamId());
- buf.writeLong(block.getStartOffset());
- buf.writeInt((int) (block.getEndOffset() - block.getStartOffset()));
- buf.writeInt(blockIndex);
- }
}
private int calculateIndexBlockSize() {
- return 4 + completedBlocks.size() * 40;
+ return completedBlocks.size() * DataBlockIndex.BLOCK_INDEX_SIZE;
}
public ByteBuf buffer() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
index da9cdf5cf..8dc78503b 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
@@ -98,7 +98,7 @@ public static void initMetrics(Meter meter, String prefix) {
.setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES)
.build();
objectNumInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.OBJECT_COUNT_METRIC_NAME)
- .setDescription("Objects count")
+ .setDescription("Objects endOffsetDelta")
.build();
objectStageCost = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME)
.setDescription("Objects stage cost")
@@ -240,7 +240,7 @@ public static void initMetrics(Meter meter, String prefix) {
}
});
inflightWALUploadTasksCount = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME)
- .setDescription("Inflight upload WAL tasks count")
+ .setDescription("Inflight upload WAL tasks endOffsetDelta")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/objects/ObjectManager.java b/s3stream/src/main/java/com/automq/stream/s3/objects/ObjectManager.java
index 99325b682..9b59dd89f 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/objects/ObjectManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/objects/ObjectManager.java
@@ -29,7 +29,7 @@ public interface ObjectManager {
/**
* Prepare object id for write, if the objects is not committed in ttl, then delete it.
*
- * @param count object id count.
+ * @param count the object id count.
* @param ttl ttl in milliseconds.
* @return object id range start.
*/
@@ -64,7 +64,7 @@ public interface ObjectManager {
* @param streamId stream id.
* @param startOffset get range start offset.
* @param endOffset get range end offset. NOOP_OFFSET represent endOffset is unlimited.
- * @param limit max object range count.
+ * @param limit the max object range count.
* @return {@link S3ObjectMetadata}
*/
CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long startOffset, long endOffset, int limit);
@@ -86,7 +86,7 @@ public interface ObjectManager {
* @param streamId stream id.
* @param startOffset get range start offset.
* @param endOffset get range end offset.
- * @param limit max object count.
+ * @param limit the max object count.
* @return {@link S3ObjectMetadata}
*/
CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId, long startOffset, long endOffset,
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index 65a04cfbd..8f8a22ff5 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -412,11 +412,11 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
// TODO: handle not exist object, should we regard it as deleted or ignore it.
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS);
- LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ LOGGER.info("[ControllerS3Operator]: Delete objects finished, endOffsetDelta: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS, false);
- LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
+ LOGGER.info("[ControllerS3Operator]: Delete objects failed, endOffsetDelta: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
}
diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java
index 0b17b0c16..5b8abc052 100644
--- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java
+++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java
@@ -17,47 +17,42 @@
package com.automq.stream.utils.biniarysearch;
-import io.netty.buffer.ByteBuf;
+import com.automq.stream.s3.DataBlockIndex;
+import com.automq.stream.s3.ObjectReader;
public class IndexBlockOrderedBytes extends AbstractOrderedCollection<IndexBlockOrderedBytes.TargetStreamOffset> {
- private final ByteBuf byteBuf;
+ private final ObjectReader.IndexBlock indexBlock;
- public IndexBlockOrderedBytes(ByteBuf byteBuf) {
- this.byteBuf = byteBuf;
+ public IndexBlockOrderedBytes(ObjectReader.IndexBlock indexBlock) {
+ this.indexBlock = indexBlock;
}
@Override
protected int size() {
- return this.byteBuf.readableBytes() / ComparableStreamRange.SIZE;
+ return this.indexBlock.count();
}
@Override
protected ComparableItem<TargetStreamOffset> get(int index) {
- int start = index * ComparableStreamRange.SIZE;
- long streamId = this.byteBuf.getLong(start);
- long startOffset = this.byteBuf.getLong(start + 8);
- int recordCount = this.byteBuf.getInt(start + 16);
- int blockId = this.byteBuf.getInt(start + 20);
- return new ComparableStreamRange(streamId, startOffset, recordCount, blockId);
+ return new ComparableStreamRange(indexBlock.get(index));
}
public record TargetStreamOffset(long streamId, long offset) {
}
- private record ComparableStreamRange(long streamId, long startOffset, int recordCount, int blockIndex)
+ private record ComparableStreamRange(DataBlockIndex index)
implements ComparableItem<TargetStreamOffset> {
- private static final int SIZE = 8 + 8 + 4 + 4;
public long endOffset() {
- return startOffset + recordCount;
+ return index.endOffset();
}
@Override
public boolean isLessThan(TargetStreamOffset value) {
- if (this.streamId < value.streamId) {
+ if (this.index().streamId() < value.streamId) {
return true;
- } else if (this.streamId > value.streamId) {
+ } else if (this.index().streamId() > value.streamId) {
return false;
} else {
return this.endOffset() <= value.offset;
@@ -66,12 +61,12 @@ public boolean isLessThan(TargetStreamOffset value) {
@Override
public boolean isGreaterThan(TargetStreamOffset value) {
- if (this.streamId > value.streamId) {
+ if (this.index().streamId() > value.streamId) {
return true;
- } else if (this.streamId < value.streamId) {
+ } else if (this.index().streamId() < value.streamId) {
return false;
} else {
- return this.startOffset > value.offset;
+ return this.index().startOffset() > value.offset;
}
}
}
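The rewritten search relies on index entries being ordered by (streamId, startOffset); an entry matches a target when it is neither less than nor greater than it, i.e. when the target offset falls inside [startOffset, endOffset). A condensed sketch of the comparison the binary search applies:

final class StreamOffsetCompareDemo {
    // < 0: entry before target; > 0: entry after target; 0: target falls inside the entry.
    static int compare(long entryStreamId, long entryStartOffset, long entryEndOffset,
        long targetStreamId, long targetOffset) {
        if (entryStreamId != targetStreamId) {
            return Long.compare(entryStreamId, targetStreamId);
        }
        if (entryEndOffset <= targetOffset) {
            return -1; // entry ends at or before the target offset (isLessThan)
        }
        if (entryStartOffset > targetOffset) {
            return 1;  // entry starts after the target offset (isGreaterThan)
        }
        return 0;      // entryStartOffset <= targetOffset < entryEndOffset
    }
}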
diff --git a/s3stream/src/test/java/com/automq/stream/s3/DeltaWALUploadTaskTest.java b/s3stream/src/test/java/com/automq/stream/s3/DeltaWALUploadTaskTest.java
index 5671580d0..5adefdce2 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/DeltaWALUploadTaskTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/DeltaWALUploadTaskTest.java
@@ -117,7 +117,7 @@ public void testUpload() throws Exception {
{
S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(request.getObjectId(), request.getObjectSize(), S3ObjectType.STREAM_SET);
ObjectReader objectReader = new ObjectReader(s3ObjectMetadata, s3Operator);
- ObjectReader.DataBlockIndex blockIndex = objectReader.find(234, 20, 24).get()
+ DataBlockIndex blockIndex = objectReader.find(234, 20, 24).get()
.streamDataBlocks().get(0).dataBlockIndex();
ObjectReader.DataBlock dataBlock = objectReader.read(blockIndex).get();
try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
@@ -132,7 +132,7 @@ record = it.next();
{
S3ObjectMetadata streamObjectMetadata = new S3ObjectMetadata(11, request.getStreamObjects().get(0).getObjectSize(), S3ObjectType.STREAM);
ObjectReader objectReader = new ObjectReader(streamObjectMetadata, s3Operator);
- ObjectReader.DataBlockIndex blockIndex = objectReader.find(233, 10, 16).get()
+ DataBlockIndex blockIndex = objectReader.find(233, 10, 16).get()
.streamDataBlocks().get(0).dataBlockIndex();
ObjectReader.DataBlock dataBlock = objectReader.read(blockIndex).get();
try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
diff --git a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
index d4b35bcc1..666b8f752 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3;
+import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.model.StreamRecordBatch;
@@ -24,6 +25,7 @@
import com.automq.stream.s3.operator.S3Operator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Tag;
@@ -56,54 +58,20 @@ public void testIndexBlock() {
int blockSize3 = recordCntToBlockSize(recordCnt3, bodySize);
long streamId1 = 1;
long streamId2 = 2;
- ByteBuf blocks = Unpooled.buffer(3 * ObjectReader.DataBlockIndex.BLOCK_INDEX_SIZE);
- blocks.writeLong(0);
- blocks.writeInt(blockSize1);
- blocks.writeInt(recordCnt1);
+ ByteBuf indexBuf = Unpooled.buffer(4 * DataBlockIndex.BLOCK_INDEX_SIZE);
+ new DataBlockIndex(streamId1, 0, recordCnt1, recordCnt1, 0, blockSize1).encode(indexBuf);
+ new DataBlockIndex(streamId1, recordCnt1, recordCnt2, recordCnt2, blockSize1, blockSize2).encode(indexBuf);
+ new DataBlockIndex(streamId1, recordCnt1 + recordCnt2, recordCnt3, recordCnt3, blockSize1 + blockSize2, blockSize3).encode(indexBuf);
+ new DataBlockIndex(streamId2, 110, recordCnt3, recordCnt3, blockSize1 + blockSize2 + blockSize3, blockSize3).encode(indexBuf);
- blocks.writeLong(blockSize1);
- blocks.writeInt(blockSize2);
- blocks.writeInt(recordCnt2);
-
- blocks.writeLong((long) blockSize1 + blockSize2);
- blocks.writeInt(blockSize2);
- blocks.writeInt(recordCnt2);
-
- blocks.writeLong((long) blockSize1 + blockSize2 + blockSize2);
- blocks.writeInt(blockSize3);
- blocks.writeInt(recordCnt3);
-
- ByteBuf streamRanges = Unpooled.buffer(3 * (8 + 8 + 4 + 4));
- streamRanges.writeLong(streamId1);
- streamRanges.writeLong(0);
- streamRanges.writeInt(recordCnt1);
- streamRanges.writeInt(0);
-
- streamRanges.writeLong(streamId1);
- streamRanges.writeLong(recordCnt1);
- streamRanges.writeInt(recordCnt2);
- streamRanges.writeInt(1);
-
- streamRanges.writeLong(streamId1);
- streamRanges.writeLong(recordCnt1 + recordCnt2);
- streamRanges.writeInt(recordCnt2);
- streamRanges.writeInt(2);
-
- streamRanges.writeLong(streamId2);
- streamRanges.writeLong(110);
- streamRanges.writeInt(recordCnt3);
- streamRanges.writeInt(3);
-
- ObjectReader.IndexBlock indexBlock = new ObjectReader.IndexBlock(Mockito.mock(S3ObjectMetadata.class), blocks, streamRanges);
+ ObjectReader.IndexBlock indexBlock = new ObjectReader.IndexBlock(Mockito.mock(S3ObjectMetadata.class), indexBuf);
ObjectReader.FindIndexResult rst = indexBlock.find(1, 10, 150, 100000);
assertTrue(rst.isFulfilled());
List<StreamDataBlock> streamDataBlocks = rst.streamDataBlocks();
assertEquals(2, streamDataBlocks.size());
- assertEquals(0, streamDataBlocks.get(0).getBlockId());
assertEquals(0, streamDataBlocks.get(0).getBlockStartPosition());
assertEquals(blockSize1, streamDataBlocks.get(0).getBlockEndPosition());
- assertEquals(1, streamDataBlocks.get(1).getBlockId());
assertEquals(blockSize1, streamDataBlocks.get(1).getBlockStartPosition());
assertEquals((long) blockSize1 + blockSize2, streamDataBlocks.get(1).getBlockEndPosition());
@@ -138,7 +106,38 @@ public void testGetBasicObjectInfo() throws ExecutionException, InterruptedExcep
S3ObjectMetadata metadata = new S3ObjectMetadata(233L, objectWriter.size(), S3ObjectType.STREAM_SET);
try (ObjectReader objectReader = new ObjectReader(metadata, s3Operator)) {
ObjectReader.BasicObjectInfo info = objectReader.basicObjectInfo().get();
- assertEquals(streamCount, info.blockCount());
+ assertEquals(streamCount, info.indexBlock().count());
+ }
+ }
+
+ @Test
+ public void testReadBlockGroup() throws ExecutionException, InterruptedException {
+ S3Operator s3Operator = new MemoryS3Operator();
+ ByteBuf buf = DirectByteBufAlloc.byteBuffer(0);
+ buf.writeBytes(new ObjectWriter.DataBlock(233L, List.of(
+ new StreamRecordBatch(233L, 0, 10, 1, TestUtils.random(100)),
+ new StreamRecordBatch(233L, 0, 11, 2, TestUtils.random(100))
+ )).buffer());
+ buf.writeBytes(new ObjectWriter.DataBlock(233L, List.of(
+ new StreamRecordBatch(233L, 0, 13, 1, TestUtils.random(100))
+ )).buffer());
+ int indexPosition = buf.readableBytes();
+ new DataBlockIndex(233L, 10, 4, 3, 0, buf.readableBytes()).encode(buf);
+ int indexSize = buf.readableBytes() - indexPosition;
+ buf.writeBytes(new ObjectWriter.Footer(indexPosition, indexSize).buffer());
+ int objectSize = buf.readableBytes();
+ s3Operator.write(ObjectUtils.genKey(0, 1L), buf);
+ buf.release();
+ try (ObjectReader reader = new ObjectReader(new S3ObjectMetadata(1L, objectSize, S3ObjectType.STREAM), s3Operator)) {
+ ObjectReader.FindIndexResult rst = reader.find(233L, 10L, 14L, 1024).get();
+ assertEquals(1, rst.streamDataBlocks().size());
+ try (ObjectReader.DataBlock dataBlock = reader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get()) {
+ assertEquals(3, dataBlock.recordCount());
+ Iterator<StreamRecordBatch> it = dataBlock.iterator();
+ assertEquals(10, it.next().getBaseOffset());
+ assertEquals(11, it.next().getBaseOffset());
+ assertEquals(13, it.next().getBaseOffset());
+ }
}
}
diff --git a/s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java
index 8459c8548..53b43b717 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java
@@ -95,7 +95,7 @@ public void testWrite() throws ExecutionException, InterruptedException {
streamDataBlocks = objectReader.find(234, 1, 2).get().streamDataBlocks();
assertEquals(1, streamDataBlocks.size());
- assertEquals(2, streamDataBlocks.get(0).getBlockId());
+ assertEquals(0, streamDataBlocks.get(0).getStartOffset());
{
Iterator<StreamRecordBatch> it = objectReader.read(streamDataBlocks.get(0).dataBlockIndex()).get().iterator();
StreamRecordBatch r = it.next();
diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StreamMemoryTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StreamMemoryTest.java
deleted file mode 100644
index b43816330..000000000
--- a/s3stream/src/test/java/com/automq/stream/s3/S3StreamMemoryTest.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3;
-
-//import com.automq.stream.api.AppendResult;
-//import com.automq.stream.api.CreateStreamOptions;
-//import com.automq.stream.api.FetchResult;
-//import com.automq.stream.api.OpenStreamOptions;
-//import com.automq.stream.api.RecordBatch;
-//import com.automq.stream.api.RecordBatchWithContext;
-//import com.automq.stream.api.Stream;
-//import com.automq.stream.s3.cache.DefaultS3BlockCache;
-//import com.automq.stream.s3.cache.S3BlockCache;
-//import com.automq.stream.s3.memory.MemoryMetadataManager;
-//import com.automq.stream.s3.objects.ObjectManager;
-//import com.automq.stream.s3.operator.MemoryS3Operator;
-//import com.automq.stream.s3.operator.S3Operator;
-//import com.automq.stream.s3.streams.StreamManager;
-//import com.automq.stream.s3.wal.MemoryWriteAheadLog;
-//import org.junit.jupiter.api.BeforeEach;
-
-import org.junit.jupiter.api.Tag;
-//import org.junit.jupiter.api.Test;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import java.nio.ByteBuffer;
-//import java.util.Collections;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Random;
-//import java.util.concurrent.BrokenBarrierException;
-//import java.util.concurrent.CompletableFuture;
-//import java.util.concurrent.CountDownLatch;
-//import java.util.concurrent.CyclicBarrier;
-//import java.util.concurrent.ExecutionException;
-//import java.util.concurrent.atomic.AtomicLong;
-
-@Tag("S3Unit")
-public class S3StreamMemoryTest {
-//
-// static class MockRecordBatch implements RecordBatch {
-//
-// ByteBuffer byteBuffer;
-//
-// int count;
-//
-// public MockRecordBatch(String payload, int count) {
-// this.byteBuffer = ByteBuffer.wrap(payload.getBytes());
-// this.count = count;
-// }
-//
-// @Override
-// public int count() {
-// return count;
-// }
-//
-// @Override
-// public long baseTimestamp() {
-// return 0;
-// }
-//
-// @Override
-// public Map properties() {
-// return Collections.emptyMap();
-// }
-//
-// @Override
-// public ByteBuffer rawPayload() {
-// return this.byteBuffer.duplicate();
-// }
-// }
-//
-// private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamMemoryTest.class);
-// MemoryMetadataManager manager;
-// Storage storage;
-// S3BlockCache blockCache;
-// S3Operator operator;
-// StreamManager streamManager;
-// ObjectManager objectManager;
-// S3StreamClient streamClient;
-// private final static long MAX_APPENDED_OFFSET = 200;
-//
-// Random random = new Random();
-//
-// @BeforeEach
-// public void setUp() {
-// manager = new MemoryMetadataManager();
-// manager.start();
-// streamManager = manager;
-// objectManager = manager;
-// operator = new MemoryS3Operator();
-// blockCache = new DefaultS3BlockCache(0L, objectManager, operator);
-// storage = new S3Storage(Config.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), streamManager, objectManager, blockCache, operator);
-// streamClient = new S3StreamClient(streamManager, storage, objectManager, operator, Config.fromProps(TestUtils.defaultBrokerConfig()));
-// }
-//
-// @Test
-// public void testOpenAndClose() throws Exception {
-// CreateStreamOptions options = CreateStreamOptions.newBuilder().epoch(0L).build();
-// Stream stream = this.streamClient.createAndOpenStream(options).get();
-// long streamId = stream.streamId();
-// Assertions.assertNotNull(stream);
-// // duplicate open
-// stream = this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(0L).build()).get();
-// Assertions.assertNotNull(stream);
-// // open with new epoch but current epoch is not closed
-// Assertions.assertThrows(ExecutionException.class,
-// () -> this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(1L).build()).get());
-// stream.close().get();
-// // duplicate close
-// stream.close().get();
-// // reopen with stale epoch
-// Assertions.assertThrows(ExecutionException.class,
-// () -> this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(0L).build()).get());
-// // reopen with new epoch
-// Stream newStream = this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(1L).build()).get();
-// Assertions.assertEquals(streamId, newStream.streamId());
-// // close with stale epoch
-// final Stream oldStream = stream;
-// Assertions.assertThrows(ExecutionException.class, () -> oldStream.close().get());
-// }
-//
-// @Test
-// public void testFetch() throws Exception {
-// CreateStreamOptions options = CreateStreamOptions.newBuilder().epoch(1L).build();
-// Stream stream = this.streamClient.createAndOpenStream(options).get();
-// RecordBatch recordBatch = new MockRecordBatch("hello", 1);
-// CompletableFuture append0 = stream.append(recordBatch);
-// RecordBatch recordBatch1 = new MockRecordBatch("world", 1);
-// CompletableFuture append1 = stream.append(recordBatch1);
-// CompletableFuture.allOf(append0, append1).get();
-// // fetch
-// FetchResult result0 = stream.fetch(0, 1, 100).get();
-// Assertions.assertEquals(1, result0.recordBatchList().size());
-// RecordBatchWithContext record0 = result0.recordBatchList().get(0);
-// Assertions.assertEquals(0, record0.baseOffset());
-// Assertions.assertEquals(1, record0.lastOffset());
-// Assertions.assertEquals("hello", new String(buf2array(record0.rawPayload())));
-// FetchResult result1 = stream.fetch(1, 2, 100).get();
-// Assertions.assertEquals(1, result1.recordBatchList().size());
-// RecordBatchWithContext record1 = result1.recordBatchList().get(0);
-// Assertions.assertEquals(1, record1.baseOffset());
-// Assertions.assertEquals(2, record1.lastOffset());
-// Assertions.assertEquals("world", new String(buf2array(record1.rawPayload())));
-// // fetch all
-// FetchResult result = stream.fetch(0, 2, 100000).get();
-// Assertions.assertEquals(2, result.recordBatchList().size());
-// RecordBatchWithContext record = result.recordBatchList().get(0);
-// Assertions.assertEquals("hello", new String(buf2array(record.rawPayload())));
-// RecordBatchWithContext record2 = result.recordBatchList().get(1);
-// Assertions.assertEquals("world", new String(buf2array(record2.rawPayload())));
-// }
-//
-// @Test
-// public void testPressure() throws Exception {
-// CreateStreamOptions options = CreateStreamOptions.newBuilder().epoch(1L).build();
-// Stream stream0 = this.streamClient.createAndOpenStream(options).get();
-// Stream stream1 = this.streamClient.createAndOpenStream(options).get();
-// Stream stream2 = this.streamClient.createAndOpenStream(options).get();
-// List<Stream> streams = List.of(stream0, stream1, stream2);
-// CountDownLatch latch = new CountDownLatch(1 * 3 + 5 * 3);
-// CyclicBarrier barrier = new CyclicBarrier(1 * 3 + 5 * 3);
-// for (int i = 0; i < 3; i++) {
-// AtomicLong appendedOffset = new AtomicLong(-1);
-// final Stream stream = streams.get(i);
-// new Thread(() -> {
-// Producer producer = new Producer(stream, latch, appendedOffset);
-// try {
-// barrier.await();
-// producer.run();
-// } catch (InterruptedException e) {
-// throw new RuntimeException(e);
-// } catch (BrokenBarrierException e) {
-// throw new RuntimeException(e);
-// }
-//
-// }).start();
-// for (int j = 0; j < 5; j++) {
-// final int id = j;
-// new Thread(() -> {
-// Consumer consumer = new Consumer(id, stream, latch, appendedOffset);
-// try {
-// barrier.await();
-// consumer.run();
-// } catch (InterruptedException e) {
-// throw new RuntimeException(e);
-// } catch (BrokenBarrierException e) {
-// throw new RuntimeException(e);
-// }
-//
-// }).start();
-// }
-// }
-// latch.await();
-// }
-//
-// private static byte[] buf2array(ByteBuffer buffer) {
-// byte[] array = new byte[buffer.remaining()];
-// buffer.get(array);
-// return array;
-// }
-//
-// static class Producer implements Runnable {
-//
-// private long nextAppendOffset = 0;
-// private Stream stream;
-// private CountDownLatch latch;
-// private AtomicLong appendedOffset;
-// private Random random = new Random();
-// private volatile boolean start = true;
-//
-// public Producer(Stream stream, CountDownLatch latch, AtomicLong appendedOffset) {
-// this.stream = stream;
-// this.latch = latch;
-// this.appendedOffset = appendedOffset;
-// }
-//
-//
-// @Override
-// public void run() {
-// while (start) {
-// try {
-// append();
-// } catch (Exception e) {
-// LOGGER.error("Error in producer", e);
-// }
-// }
-// latch.countDown();
-// }
-//
-// public void append() throws InterruptedException {
-// if (nextAppendOffset > MAX_APPENDED_OFFSET) {
-// start = false;
-// return;
-// }
-// MockRecordBatch recordBatch = new MockRecordBatch("hello[" + stream.streamId() + "][" + nextAppendOffset++ + "]", 1);
-// stream.append(recordBatch).whenCompleteAsync((result, error) -> {
-// Assertions.assertNull(error);
-// LOGGER.info("[Producer-{}]: produce: {}", stream.streamId(), result.baseOffset());
-// this.appendedOffset.incrementAndGet();
-// });
-// Thread.sleep(random.nextInt(30));
-// }
-// }
-//
-// static class Consumer implements Runnable {
-//
-// private long consumeOffset = 0;
-// private int id;
-// private Stream stream;
-// private CountDownLatch latch;
-// private AtomicLong appendedOffset;
-// private Random random = new Random();
-// private volatile boolean start = true;
-//
-// public Consumer(int id, Stream stream, CountDownLatch latch, AtomicLong appendedOffset) {
-// this.id = id;
-// this.stream = stream;
-// this.latch = latch;
-// this.appendedOffset = appendedOffset;
-// }
-//
-// @Override
-// public void run() {
-// while (start) {
-// try {
-// fetch();
-// } catch (Exception e) {
-// LOGGER.error("Error in consumer", e);
-// }
-// }
-// latch.countDown();
-// }
-//
-// public void fetch() throws InterruptedException, ExecutionException {
-// if (consumeOffset >= MAX_APPENDED_OFFSET) {
-// start = false;
-// return;
-// }
-// Thread.sleep(random.nextInt(200));
-// long appendEndOffset = appendedOffset.get();
-// if (consumeOffset > appendEndOffset) {
-// return;
-// }
-// FetchResult result = stream.fetch(consumeOffset, appendEndOffset + 1, Integer.MAX_VALUE).get();
-// LOGGER.info("[Consumer-{}-{}] fetch records: {}", stream.streamId(), id, result.recordBatchList().size());
-// result.recordBatchList().forEach(
-// record -> {
-// long offset = record.baseOffset();
-// Assertions.assertEquals("hello[" + stream.streamId() + "][" + offset + "]", new String(buf2array(record.rawPayload())));
-// LOGGER.info("[Consumer-{}-{}] consume: {}", stream.streamId(), id, offset);
-// consumeOffset = Math.max(consumeOffset, offset + 1);
-// }
-// );
-// }
-//
-// }
-
-}
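Note: the block removed above was long-dead commented-out code, but it was also the only in-repo sketch of the stream epoch-fencing contract. Condensed from its assertions (same API; these are the behaviors the removed test asserted, not a normative spec):

// Fencing rules condensed from the removed S3StreamMemoryTest assertions:
// - openStream(id, epoch == current) while open       -> succeeds (duplicate open is allowed)
// - openStream(id, epoch >  current) while still open -> ExecutionException
// - close() twice                                     -> second close is a no-op
// - openStream(id, epoch <  current) after close      -> ExecutionException (stale epoch)
// - openStream(id, epoch >  current) after close      -> succeeds and fences older handles,
//   whose subsequent close() fails with ExecutionException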
diff --git a/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java b/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java
index 2209a7e13..74574193b 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java
@@ -154,7 +154,7 @@ public void testCompact() throws ExecutionException, InterruptedException {
// verify compacted object record
{
ObjectReader objectReader = new ObjectReader(new S3ObjectMetadata(5, req1.getObjectSize(), S3ObjectType.STREAM), s3Operator);
- assertEquals(3, objectReader.basicObjectInfo().get().blockCount());
+ assertEquals(3, objectReader.basicObjectInfo().get().indexBlock().count());
ObjectReader.FindIndexResult rst = objectReader.find(streamId, 13L, 18L).get();
assertEquals(3, rst.streamDataBlocks().size());
ObjectReader.DataBlock dataBlock1 = objectReader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get();
@@ -182,7 +182,7 @@ public void testCompact() throws ExecutionException, InterruptedException {
}
{
ObjectReader objectReader = new ObjectReader(new S3ObjectMetadata(6, req2.getObjectSize(), S3ObjectType.STREAM), s3Operator);
- assertEquals(3, objectReader.basicObjectInfo().get().blockCount());
+ assertEquals(3, objectReader.basicObjectInfo().get().indexBlock().count());
ObjectReader.FindIndexResult rst = objectReader.find(streamId, 30L, 33L).get();
assertEquals(3, rst.streamDataBlocks().size());
ObjectReader.DataBlock dataBlock1 = objectReader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get();
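Note: the two assertions above track an accessor move; the data block count that used to hang directly off basicObjectInfo() is now read from its index block. A minimal sketch of the new access path, under the assumption (consistent with the size changes later in this patch) that the index block is an array of fixed-width DataBlockIndex entries:

var info = objectReader.basicObjectInfo().get();   // same call chain as in the test
int blockCount = info.indexBlock().count();        // replaces the old info.blockCount()
// assumed invariant: the index block holds blockCount * DataBlockIndex.BLOCK_INDEX_SIZE bytes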
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/DataBlockReadAccumulatorTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/DataBlockReadAccumulatorTest.java
index 33d7e3590..be9e4c02a 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/DataBlockReadAccumulatorTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/DataBlockReadAccumulatorTest.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.TestUtils;
@@ -51,8 +52,8 @@ public void test() throws ExecutionException, InterruptedException, TimeoutExcep
DataBlockReadAccumulator accumulator = new DataBlockReadAccumulator();
ObjectReader reader = mock(ObjectReader.class);
- ObjectReader.DataBlockIndex dataBlockIndex = new ObjectReader.DataBlockIndex(10, 10, 100, 2);
- StreamDataBlock streamDataBlock = new StreamDataBlock(233L, 0, 12, 1, dataBlockIndex);
+ DataBlockIndex dataBlockIndex = new DataBlockIndex(10, 0, 12, 2, 10, 100);
+ StreamDataBlock streamDataBlock = new StreamDataBlock(1, dataBlockIndex);
CompletableFuture<ObjectReader.DataBlock> readerCf = new CompletableFuture<>();
when(reader.read(eq(dataBlockIndex))).thenReturn(readerCf);
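Note: this pair of rewritten lines shows the shape of the whole refactor. The stream id and offset range move off StreamDataBlock into the new top-level DataBlockIndex, and the before/after pairs across these tests imply the six-argument order (streamId, startOffset, endOffsetDelta, recordCount, startPosition, size). A hedged mapping sketch using the values above:

// Old split representation (fields spread over two objects):
//   new ObjectReader.DataBlockIndex(10, 10, 100, 2)      // blockId, position, size, recordCount
//   new StreamDataBlock(233L, 0, 12, 1, dataBlockIndex)  // streamId, startOffset, endOffset, objectId
// New single-record representation; StreamDataBlock keeps only objectId plus the index:
DataBlockIndex idx = new DataBlockIndex(
        10,    // streamId
        0,     // startOffset
        12,    // endOffsetDelta: logical end offset = 0 + 12
        2,     // recordCount
        10,    // startPosition within the object
        100);  // size: physical end position = 10 + 100
StreamDataBlock block = new StreamDataBlock(1 /* objectId */, idx);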
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/ObjectReaderLRUCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/ObjectReaderLRUCacheTest.java
index e9c8e56e2..65d6d2aff 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/ObjectReaderLRUCacheTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/ObjectReaderLRUCacheTest.java
@@ -58,9 +58,9 @@ public void testGetPut() throws ExecutionException, InterruptedException {
ObjectReader objectReader = new ObjectReader(new S3ObjectMetadata(233L, objectWriter.size(), S3ObjectType.STREAM_SET), s3Operator);
ObjectReader objectReader2 = new ObjectReader(new S3ObjectMetadata(234L, objectWriter2.size(), S3ObjectType.STREAM_SET), s3Operator);
ObjectReader objectReader3 = new ObjectReader(new S3ObjectMetadata(235L, objectWriter3.size(), S3ObjectType.STREAM_SET), s3Operator);
- Assertions.assertEquals(40000, objectReader.basicObjectInfo().get().size());
- Assertions.assertEquals(80000, objectReader2.basicObjectInfo().get().size());
- Assertions.assertEquals(120000, objectReader3.basicObjectInfo().get().size());
+ Assertions.assertEquals(36000, objectReader.basicObjectInfo().get().size());
+ Assertions.assertEquals(72000, objectReader2.basicObjectInfo().get().size());
+ Assertions.assertEquals(108000, objectReader3.basicObjectInfo().get().size());
ObjectReaderLRUCache cache = new ObjectReaderLRUCache(100000);
cache.put(235L, objectReader3);
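Note: the revised size expectations line up with each index entry shrinking from 40 to 36 bytes. Assuming the three writers above emitted 1000, 2000 and 3000 data blocks (an inference from the old 40-byte totals, not something this hunk states), the arithmetic checks out:

// 36 = 8 (streamId) + 8 (startOffset) + 4 (endOffsetDelta) + 4 (recordCount)
//    + 8 (startPosition) + 4 (size) = DataBlockIndex.BLOCK_INDEX_SIZE
int entrySize = 36;
assert 1000 * entrySize == 36000;   // was 1000 * 40 == 40000
assert 2000 * entrySize == 72000;   // was 2000 * 40 == 80000
assert 3000 * entrySize == 108000;  // was 3000 * 40 == 120000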
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
index bf5a988f8..01df04771 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.ObjectWriter;
import com.automq.stream.s3.StreamDataBlock;
@@ -109,10 +110,10 @@ public void testSyncReadAheadInflight() {
.when(objectManager).getObjects(eq(streamId), eq(startOffset), anyLong(), anyInt());
ObjectReader reader = Mockito.mock(ObjectReader.class);
- ObjectReader.DataBlockIndex index1 = new ObjectReader.DataBlockIndex(0, 0, 256, 128);
+ DataBlockIndex index1 = new DataBlockIndex(0, 64, 128, 128, 0, 256);
doReturn(reader).when(streamReader).getObjectReader(metadata);
doAnswer(invocation -> CompletableFuture.completedFuture(new ObjectReader.FindIndexResult(true, -1, -1,
- List.of(new StreamDataBlock(streamId, 64, 128, objectId, index1))))).when(reader).find(eq(streamId), eq(startOffset), anyLong(), eq(maxBytes));
+ List.of(new StreamDataBlock(objectId, index1))))).when(reader).find(eq(streamId), eq(startOffset), anyLong(), eq(maxBytes));
doReturn(new CompletableFuture<>()).when(reader).read(index1);
streamReader.syncReadAhead(TraceContext.DEFAULT, streamId, startOffset, endOffset, maxBytes, Mockito.mock(ReadAheadAgent.class), UUID.randomUUID());
@@ -136,10 +137,10 @@ public void testSyncReadAhead() {
StreamReader streamReader = new StreamReader(s3Operator, objectManager, blockCache, cache, accumulator, new HashMap<>(), new InflightReadThrottle());
StreamReader.ReadContext context = new StreamReader.ReadContext(0, 256);
- ObjectReader.DataBlockIndex index1 = new ObjectReader.DataBlockIndex(0, 0, 256, 128);
+ DataBlockIndex index1 = new DataBlockIndex(0, 0, 128, 128, 0, 256);
context.streamDataBlocksPair = List.of(
new ImmutablePair<>(1L, List.of(
- new StreamDataBlock(233L, 0, 128, 1, index1))));
+ new StreamDataBlock(1, index1))));
ObjectReader reader = Mockito.mock(ObjectReader.class);
ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
@@ -195,10 +196,10 @@ public void testSyncReadAheadNotAlign() {
long startOffset = 32;
StreamReader.ReadContext context = new StreamReader.ReadContext(startOffset, 256);
- ObjectReader.DataBlockIndex index1 = new ObjectReader.DataBlockIndex(0, 0, 256, 128);
+ DataBlockIndex index1 = new DataBlockIndex(0, 0, 128, 128, 0, 256);
context.streamDataBlocksPair = List.of(
new ImmutablePair<>(1L, List.of(
- new StreamDataBlock(233L, 0, 128, 1, index1))));
+ new StreamDataBlock(1, index1))));
ObjectReader reader = Mockito.mock(ObjectReader.class);
ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
@@ -256,12 +257,12 @@ public void testSyncReadAheadException() {
StreamReader streamReader = new StreamReader(s3Operator, objectManager, blockCache, cache, accumulator, new HashMap<>(), new InflightReadThrottle());
StreamReader.ReadContext context = new StreamReader.ReadContext(0, 512);
- ObjectReader.DataBlockIndex index1 = new ObjectReader.DataBlockIndex(0, 0, 256, 128);
- ObjectReader.DataBlockIndex index2 = new ObjectReader.DataBlockIndex(1, 256, 256, 128);
+ DataBlockIndex index1 = new DataBlockIndex(0, 0, 128, 128, 0, 256);
+ DataBlockIndex index2 = new DataBlockIndex(1, 128, 236, 128, 256, 256);
context.streamDataBlocksPair = List.of(
new ImmutablePair<>(1L, List.of(
- new StreamDataBlock(233L, 0, 128, 1, index1),
- new StreamDataBlock(233L, 128, 256, 1, index2))));
+ new StreamDataBlock(1, index1),
+ new StreamDataBlock(1, index2))));
ObjectReader reader = Mockito.mock(ObjectReader.class);
ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
@@ -323,10 +324,10 @@ public void testAsyncReadAhead() {
StreamReader streamReader = new StreamReader(s3Operator, objectManager, blockCache, cache, accumulator, new HashMap<>(), new InflightReadThrottle());
StreamReader.ReadContext context = new StreamReader.ReadContext(0, 256);
- ObjectReader.DataBlockIndex index1 = new ObjectReader.DataBlockIndex(0, 0, 256, 128);
+ DataBlockIndex index1 = new DataBlockIndex(0, 0, 128, 128, 0, 256);
context.streamDataBlocksPair = List.of(
new ImmutablePair<>(1L, List.of(
- new StreamDataBlock(233L, 0, 128, 1, index1))));
+ new StreamDataBlock(1, index1))));
ObjectReader reader = Mockito.mock(ObjectReader.class);
ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
@@ -378,12 +379,12 @@ public void testAsyncReadAheadException() {
StreamReader streamReader = new StreamReader(s3Operator, objectManager, blockCache, cache, accumulator, new HashMap<>(), new InflightReadThrottle());
StreamReader.ReadContext context = new StreamReader.ReadContext(0, 256);
- ObjectReader.DataBlockIndex index1 = new ObjectReader.DataBlockIndex(0, 0, 256, 128);
- ObjectReader.DataBlockIndex index2 = new ObjectReader.DataBlockIndex(1, 256, 256, 128);
+ DataBlockIndex index1 = new DataBlockIndex(0, 0, 128, 128, 0, 256);
+ DataBlockIndex index2 = new DataBlockIndex(1, 128, 256, 128, 256, 256);
context.streamDataBlocksPair = List.of(
new ImmutablePair<>(1L, List.of(
- new StreamDataBlock(233L, 0, 128, 1, index1),
- new StreamDataBlock(233L, 128, 256, 1, index2))));
+ new StreamDataBlock(1, index1),
+ new StreamDataBlock(1, index2))));
ObjectReader reader = Mockito.mock(ObjectReader.class);
ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java
index 37f49edc5..061c00f99 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java
@@ -50,15 +50,15 @@ public class CompactionAnalyzerTest extends CompactionTestBase {
private static Map<Long, List<StreamDataBlock>> generateStreamDataBlocks() {
return Map.of(
OBJECT_0, List.of(
- new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, 20, 1),
- new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, 30, 1),
- new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, 30, 1)),
+ new StreamDataBlock(STREAM_0, 0, 20, OBJECT_0, -1, 20, 1),
+ new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, 30, 1),
+ new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, 30, 1)),
OBJECT_1, List.of(
- new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, 5, 1),
- new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, 60, 1)),
+ new StreamDataBlock(STREAM_0, 20, 25, OBJECT_1, -1, 5, 1),
+ new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, 60, 1)),
OBJECT_2, List.of(
- new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, 100, 1),
- new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, 40, 1))
+ new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, 100, 1),
+ new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, 40, 1))
);
}
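Note: every StreamDataBlock in this file loses its former fourth argument, the block's ordinal position inside the object; presumably that ordinal is redundant once DataBlockIndex carries the physical position directly. Reading the first rewritten line above, the remaining seven arguments keep their old order:

// Before: streamId, startOffset, endOffset, blockIdx, objectId, startPosition, size, recordCount
//   new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, 20, 1)
// After (blockIdx dropped; -1 apparently marks a position not yet resolved):
StreamDataBlock block = new StreamDataBlock(STREAM_0, 0, 20, OBJECT_0, -1, 20, 1);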
@@ -78,16 +78,16 @@ public void testReadObjectIndices() {
Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, s3Operator, null);
Map<Long, List<StreamDataBlock>> expectedBlocksMap = Map.of(
OBJECT_0, List.of(
- new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)),
OBJECT_1, List.of(
- new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
OBJECT_2, List.of(
- new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)));
+ new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1)));
assertTrue(compare(streamDataBlocksMap, expectedBlocksMap));
}
@@ -101,15 +101,15 @@ public void testReadObjectIndicesWithTrimmedData() {
Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, s3Operator);
Map<Long, List<StreamDataBlock>> expectedBlocksMap = Map.of(
OBJECT_0, List.of(
- new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)),
+ new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)),
OBJECT_1, List.of(
- new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
OBJECT_2, List.of(
- new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)));
+ new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1)));
assertTrue(compare(streamDataBlocksMap, expectedBlocksMap));
}
@@ -127,23 +127,23 @@ public void testFilterBlocksToCompact2() {
CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, STREAM_SPLIT_SIZE, MAX_STREAM_NUM_IN_WAL, MAX_STREAM_OBJECT_NUM);
Map<Long, List<StreamDataBlock>> streamDataBlocksMap = Map.of(
OBJECT_0, List.of(
- new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 0, 20, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1)),
OBJECT_1, List.of(
- new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 20, 25, OBJECT_1, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
OBJECT_2, List.of(
- new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)),
+ new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1)),
OBJECT_3, List.of(
- new StreamDataBlock(STREAM_3, 0, 50, 1, OBJECT_3, -1, -1, 1)));
+ new StreamDataBlock(STREAM_3, 0, 50, OBJECT_3, -1, -1, 1)));
Map<Long, List<StreamDataBlock>> result = compactionAnalyzer.filterBlocksToCompact(streamDataBlocksMap);
Map<Long, List<StreamDataBlock>> expectedBlocksMap = Map.of(
OBJECT_0, List.of(
- new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 0, 20, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1)),
OBJECT_1, List.of(
- new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)));
+ new StreamDataBlock(STREAM_0, 20, 25, OBJECT_1, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)));
assertTrue(compare(result, expectedBlocksMap));
}
@@ -154,14 +154,14 @@ public void testSortStreamRangePositions() {
Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, s3Operator);
List<StreamDataBlock> sortedStreamDataBlocks = compactionAnalyzer.sortStreamRangePositions(streamDataBlocksMap);
List<StreamDataBlock> expectedBlocks = List.of(
- new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1));
+ new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1));
for (int i = 0; i < sortedStreamDataBlocks.size(); i++) {
assertTrue(compare(sortedStreamDataBlocks.get(i), expectedBlocks.get(i)));
}
@@ -178,22 +178,22 @@ public void testGroupObjectWithLimit() {
List<CompactedObjectBuilder> expectedCompactedObject = List.of(
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)));
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1)));
for (int i = 0; i < compactedObjectBuilders.size(); i++) {
assertTrue(compare(compactedObjectBuilders.get(i), expectedCompactedObject.get(i)));
}
@@ -210,22 +210,22 @@ public void testGroupObjectWithLimit2() {
List<CompactedObjectBuilder> expectedCompactedObject = List.of(
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)));
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1)));
for (int i = 0; i < compactedObjectBuilders.size(); i++) {
assertTrue(compare(compactedObjectBuilders.get(i), expectedCompactedObject.get(i)));
}
@@ -242,16 +242,16 @@ public void testGroupObjectWithLimit3() {
List<CompactedObjectBuilder> expectedCompactedObject = List.of(
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)));
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)));
for (int i = 0; i < compactedObjectBuilders.size(); i++) {
assertTrue(compare(compactedObjectBuilders.get(i), expectedCompactedObject.get(i)));
}
@@ -266,15 +266,15 @@ public void testGroupObjectWithLimit4() {
List<CompactedObjectBuilder> expectedCompactedObject = List.of(
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 20, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 20, 25, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)));
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)));
for (int i = 0; i < compactedObjectBuilders.size(); i++) {
assertTrue(compare(compactedObjectBuilders.get(i), expectedCompactedObject.get(i)));
}
@@ -298,15 +298,15 @@ public void testGroupObjectWithLimit6() {
List<CompactedObjectBuilder> expectedCompactedObject = List.of(
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 20, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 20, 25, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)));
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)));
for (int i = 0; i < compactedObjectBuilders.size(); i++) {
assertTrue(compare(compactedObjectBuilders.get(i), expectedCompactedObject.get(i)));
}
@@ -331,36 +331,36 @@ public void testCompactionPlans1() {
List<CompactedObject> expectCompactedObjects = List.of(
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1))
.build(),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1))
.build(),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1))
.build(),
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1))
.build());
Map<Long, List<StreamDataBlock>> expectObjectStreamDataBlocks = Map.of(
OBJECT_0, List.of(
- new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)),
OBJECT_1, List.of(
- new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)),
OBJECT_2, List.of(
- new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)));
+ new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1)));
CompactionPlan compactionPlan = compactionPlans.get(0);
for (int i = 0; i < compactionPlan.compactedObjects().size(); i++) {
assertTrue(compare(compactionPlan.compactedObjects().get(i), expectCompactedObjects.get(i)));
@@ -377,23 +377,23 @@ private void checkCompactionPlan2(List<CompactionPlan> compactionPlans) {
List<CompactedObject> expectCompactedObjects = List.of(
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1))
.build(),
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1))
- .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1))
.build());
Map<Long, List<StreamDataBlock>> expectObjectStreamDataBlocks = Map.of(
OBJECT_0, List.of(
- new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 25, 30, 1, OBJECT_0, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 30, 60, 2, OBJECT_0, -1, -1, 1)),
+ new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 25, 30, OBJECT_0, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 30, 60, OBJECT_0, -1, -1, 1)),
OBJECT_1, List.of(
- new StreamDataBlock(STREAM_0, 15, 20, 0, OBJECT_1, -1, -1, 1),
- new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)));
+ new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1),
+ new StreamDataBlock(STREAM_1, 60, 120, OBJECT_1, -1, -1, 1)));
CompactionPlan compactionPlan = compactionPlans.get(0);
for (int i = 0; i < compactionPlan.compactedObjects().size(); i++) {
assertTrue(compare(compactionPlan.compactedObjects().get(i), expectCompactedObjects.get(i)));
@@ -406,22 +406,22 @@ private void checkCompactionPlan2(List<CompactionPlan> compactionPlans) {
expectCompactedObjects = List.of(
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1))
.build(),
new CompactedObjectBuilder()
.setType(CompactionType.SPLIT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1))
.build(),
new CompactedObjectBuilder()
.setType(CompactionType.COMPACT)
- .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1))
+ .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1))
.build());
expectObjectStreamDataBlocks = Map.of(
OBJECT_0, List.of(
- new StreamDataBlock(STREAM_2, 30, 60, 3, OBJECT_0, -1, -1, 1)),
+ new StreamDataBlock(STREAM_2, 30, 60, OBJECT_0, -1, -1, 1)),
OBJECT_2, List.of(
- new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1),
- new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)));
+ new StreamDataBlock(STREAM_1, 400, 500, OBJECT_2, -1, -1, 1),
+ new StreamDataBlock(STREAM_2, 230, 270, OBJECT_2, -1, -1, 1)));
compactionPlan = compactionPlans.get(1);
for (int i = 0; i < compactionPlan.compactedObjects().size(); i++) {
assertTrue(compare(compactionPlan.compactedObjects().get(i), expectCompactedObjects.get(i)));
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
index 46e2137cb..83f1d1167 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
@@ -18,7 +18,7 @@
package com.automq.stream.s3.compact;
import com.automq.stream.s3.Config;
-import com.automq.stream.s3.ObjectReader;
+import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectWriter;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.TestUtils;
@@ -76,14 +76,14 @@ public class CompactionManagerTest extends CompactionTestBase {
private Config config;
private static Map<Long, List<StreamDataBlock>> getStreamDataBlockMap() {
- StreamDataBlock block1 = new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, new ObjectReader.DataBlockIndex(0, 0, 15, 15));
- StreamDataBlock block2 = new StreamDataBlock(STREAM_1, 0, 20, OBJECT_0, new ObjectReader.DataBlockIndex(1, 15, 50, 20));
+ StreamDataBlock block1 = new StreamDataBlock(OBJECT_0, new DataBlockIndex(0, 0, 15, 15, 0, 15));
+ StreamDataBlock block2 = new StreamDataBlock(OBJECT_0, new DataBlockIndex(1, 0, 20, 20, 15, 50));
- StreamDataBlock block3 = new StreamDataBlock(STREAM_0, 15, 27, OBJECT_1, new ObjectReader.DataBlockIndex(0, 0, 20, 12));
- StreamDataBlock block4 = new StreamDataBlock(STREAM_1, 20, 45, OBJECT_1, new ObjectReader.DataBlockIndex(1, 20, 60, 25));
+ StreamDataBlock block3 = new StreamDataBlock(OBJECT_1, new DataBlockIndex(0, 15, 12, 12, 0, 20));
+ StreamDataBlock block4 = new StreamDataBlock(OBJECT_1, new DataBlockIndex(1, 20, 25, 25, 20, 60));
- StreamDataBlock block5 = new StreamDataBlock(STREAM_0, 27, 40, OBJECT_2, new ObjectReader.DataBlockIndex(0, 0, 20, 20));
- StreamDataBlock block6 = new StreamDataBlock(STREAM_3, 0, 30, OBJECT_2, new ObjectReader.DataBlockIndex(1, 20, 30, 30));
+ StreamDataBlock block5 = new StreamDataBlock(OBJECT_2, new DataBlockIndex(0, 27, 13, 20, 0, 20));
+ StreamDataBlock block6 = new StreamDataBlock(OBJECT_2, new DataBlockIndex(3, 0, 30, 30, 20, 30));
return Map.of(
OBJECT_0, List.of(
block1,
@@ -418,7 +418,7 @@ public void testCompactNoneExistObjects2() {
public void testCompactWithLimit() {
when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(70L);
when(config.maxStreamNumPerStreamSetObject()).thenReturn(MAX_STREAM_NUM_IN_WAL);
- when(config.maxStreamObjectNumPerCommit()).thenReturn(2);
+ when(config.maxStreamObjectNumPerCommit()).thenReturn(4);
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
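Note: the rewritten fixture above preserves every logical range from the old code; only the encoding changes. As a worked check on block5, which previously spanned offsets 27..40 of STREAM_0 explicitly:

DataBlockIndex block5Idx = new DataBlockIndex(0, 27, 13, 20, 0, 20);
long logicalEnd = 27 + 13;   // 40, matching the old StreamDataBlock(STREAM_0, 27, 40, ...)
long physicalEnd = 0 + 20;   // startPosition + size, matching the old (0, 0, 20, 20) index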
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java
index 87095f497..ce368a3d5 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.compact;
+import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectWriter;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.TestUtils;
@@ -76,10 +77,10 @@ public void setUp() throws Exception {
objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> {
assertEquals(OBJECT_0, objectId);
ObjectWriter objectWriter = ObjectWriter.writer(objectId, s3Operator, 1024, 1024);
- StreamRecordBatch r1 = new StreamRecordBatch(STREAM_0, 0, 0, 15, TestUtils.random(10));
- StreamRecordBatch r2 = new StreamRecordBatch(STREAM_1, 0, 25, 5, TestUtils.random(10));
- StreamRecordBatch r3 = new StreamRecordBatch(STREAM_1, 0, 30, 30, TestUtils.random(30));
- StreamRecordBatch r4 = new StreamRecordBatch(STREAM_2, 0, 30, 30, TestUtils.random(30));
+ StreamRecordBatch r1 = new StreamRecordBatch(STREAM_0, 0, 0, 15, TestUtils.random(2));
+ StreamRecordBatch r2 = new StreamRecordBatch(STREAM_1, 0, 25, 5, TestUtils.random(2));
+ StreamRecordBatch r3 = new StreamRecordBatch(STREAM_1, 0, 30, 30, TestUtils.random(22));
+ StreamRecordBatch r4 = new StreamRecordBatch(STREAM_2, 0, 30, 30, TestUtils.random(22));
objectWriter.write(STREAM_0, List.of(r1));
objectWriter.write(STREAM_1, List.of(r2));
objectWriter.write(STREAM_1, List.of(r3));
@@ -101,8 +102,8 @@ public void setUp() throws Exception {
objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> {
assertEquals(OBJECT_1, objectId);
ObjectWriter objectWriter = ObjectWriter.writer(OBJECT_1, s3Operator, 1024, 1024);
- StreamRecordBatch r5 = new StreamRecordBatch(STREAM_0, 0, 15, 5, TestUtils.random(5));
- StreamRecordBatch r6 = new StreamRecordBatch(STREAM_1, 0, 60, 60, TestUtils.random(60));
+ StreamRecordBatch r5 = new StreamRecordBatch(STREAM_0, 0, 15, 5, TestUtils.random(1));
+ StreamRecordBatch r6 = new StreamRecordBatch(STREAM_1, 0, 60, 60, TestUtils.random(52));
objectWriter.write(STREAM_0, List.of(r5));
objectWriter.write(STREAM_1, List.of(r6));
objectWriter.close().join();
@@ -120,8 +121,8 @@ public void setUp() throws Exception {
objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> {
assertEquals(OBJECT_2, objectId);
ObjectWriter objectWriter = ObjectWriter.writer(OBJECT_2, s3Operator, 1024, 1024);
- StreamRecordBatch r8 = new StreamRecordBatch(STREAM_1, 0, 400, 100, TestUtils.random(100));
- StreamRecordBatch r9 = new StreamRecordBatch(STREAM_2, 0, 230, 40, TestUtils.random(40));
+ StreamRecordBatch r8 = new StreamRecordBatch(STREAM_1, 0, 400, 100, TestUtils.random(92));
+ StreamRecordBatch r9 = new StreamRecordBatch(STREAM_2, 0, 230, 40, TestUtils.random(32));
objectWriter.write(STREAM_1, List.of(r8));
objectWriter.write(STREAM_2, List.of(r9));
objectWriter.close().join();
@@ -145,7 +146,7 @@ protected boolean compare(StreamDataBlock block1, StreamDataBlock block2) {
boolean attr = block1.getStreamId() == block2.getStreamId() &&
block1.getStartOffset() == block2.getStartOffset() &&
block1.getEndOffset() == block2.getEndOffset() &&
- block1.getRecordCount() == block2.getRecordCount();
+ block1.dataBlockIndex().recordCount() == block2.dataBlockIndex().recordCount();
if (!attr) {
return false;
}
@@ -204,8 +205,8 @@ protected boolean compare(CompactedObject compactedObject1, CompactedObject comp
protected long calculateObjectSize(List<StreamDataBlock> streamDataBlocks) {
long bodySize = streamDataBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum();
- long indexBlockSize = 4 + 40L * streamDataBlocks.size();
- long tailSize = 48;
+ int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * streamDataBlocks.size();
+ long tailSize = ObjectWriter.Footer.FOOTER_SIZE;
return bodySize + indexBlockSize + tailSize;
}
}
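Note: calculateObjectSize now derives its index and footer sizes from the writer's constants instead of hard-coding them. A worked comparison for an object with n data blocks, treating ObjectWriter.Footer.FOOTER_SIZE as the old 48-byte tail (an assumption; this patch only references the constant):

int n = 4;                    // data blocks in the object
long bodySize = 100;          // sum of StreamDataBlock::getBlockSize
long oldSize = bodySize + (4 + 40L * n) + 48;                                // 312
long newSize = bodySize + (long) DataBlockIndex.BLOCK_INDEX_SIZE * n + 48;   // 292 at 36 bytes/entry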
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java
index 13accbd53..6d9f21939 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java
@@ -60,10 +60,10 @@ public void setUp() throws Exception {
@Test
public void testWriteWALObject() {
List<StreamDataBlock> streamDataBlocks = List.of(
- new StreamDataBlock(STREAM_0, 0, 20, 2, 1, 30, 20, 1),
- new StreamDataBlock(STREAM_0, 20, 25, 3, 0, 10, 5, 1),
- new StreamDataBlock(STREAM_2, 40, 120, 0, 2, 100, 80, 1),
- new StreamDataBlock(STREAM_2, 120, 150, 1, 3, 0, 30, 1));
+ new StreamDataBlock(STREAM_0, 0, 20, 1, 30, 20, 1),
+ new StreamDataBlock(STREAM_0, 20, 25, 0, 10, 5, 1),
+ new StreamDataBlock(STREAM_2, 40, 120, 2, 100, 80, 1),
+ new StreamDataBlock(STREAM_2, 120, 150, 3, 0, 30, 1));
CompactedObject compactedObject = new CompactedObject(CompactionType.COMPACT, streamDataBlocks);
CompactionUploader uploader = new CompactionUploader(objectManager, s3Operator, config);
CompletableFuture<Void> cf = uploader.chainWriteStreamSetObject(null, compactedObject);
@@ -93,15 +93,15 @@ public void testWriteWALObject() {
@Test
public void testWriteWALObject2() {
List<StreamDataBlock> streamDataBlocks1 = List.of(
- new StreamDataBlock(STREAM_0, 0, 20, 2, 1, 30, 20, 1),
- new StreamDataBlock(STREAM_0, 20, 25, 3, 0, 10, 5, 1),
- new StreamDataBlock(STREAM_2, 40, 120, 0, 2, 100, 80, 1),
- new StreamDataBlock(STREAM_2, 120, 150, 1, 3, 0, 30, 1));
+ new StreamDataBlock(STREAM_0, 0, 20, 1, 30, 20, 1),
+ new StreamDataBlock(STREAM_0, 20, 25, 0, 10, 5, 1),
+ new StreamDataBlock(STREAM_2, 40, 120, 2, 100, 80, 1),
+ new StreamDataBlock(STREAM_2, 120, 150, 3, 0, 30, 1));
CompactedObject compactedObject = new CompactedObject(CompactionType.COMPACT, streamDataBlocks1);
List<StreamDataBlock> streamDataBlocks2 = List.of(
- new StreamDataBlock(STREAM_3, 0, 15, 2, 4, 0, 15, 1),
- new StreamDataBlock(STREAM_3, 15, 20, 1, 5, 20, 5, 1));
+ new StreamDataBlock(STREAM_3, 0, 15, 4, 0, 15, 1),
+ new StreamDataBlock(STREAM_3, 15, 20, 5, 20, 5, 1));
CompactedObject compactedObject2 = new CompactedObject(CompactionType.COMPACT, streamDataBlocks2);
CompactionUploader uploader = new CompactionUploader(objectManager, s3Operator, config);
@@ -141,8 +141,8 @@ public void testWriteWALObject2() {
@Test
public void testWriteStreamObject() {
List<StreamDataBlock> streamDataBlocks = List.of(
- new StreamDataBlock(STREAM_0, 0, 60, 1, 0, 23, 60, 1),
- new StreamDataBlock(STREAM_0, 60, 120, 0, 1, 45, 60, 1));
+ new StreamDataBlock(STREAM_0, 0, 60, 0, 23, 60, 1),
+ new StreamDataBlock(STREAM_0, 60, 120, 1, 45, 60, 1));
CompactedObject compactedObject = new CompactedObject(CompactionType.SPLIT, streamDataBlocks);
CompactionUploader uploader = new CompactionUploader(objectManager, s3Operator, config);
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java
index 2c9d93d47..cf86392a1 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java
@@ -36,10 +36,10 @@ public class CompactionUtilTest extends CompactionTestBase {
@Test
public void testBuildObjectStreamRanges() {
List<StreamDataBlock> streamDataBlocks = List.of(
- new StreamDataBlock(STREAM_0, 0, 20, 2, 1, 0, 20, 1),
- new StreamDataBlock(STREAM_0, 20, 25, 3, 0, 20, 5, 1),
- new StreamDataBlock(STREAM_2, 40, 120, 0, 2, 25, 80, 1),
- new StreamDataBlock(STREAM_2, 120, 150, 1, 3, 105, 30, 1));
+ new StreamDataBlock(STREAM_0, 0, 20, 1, 0, 20, 1),
+ new StreamDataBlock(STREAM_0, 20, 25, 0, 20, 5, 1),
+ new StreamDataBlock(STREAM_2, 40, 120, 2, 25, 80, 1),
+ new StreamDataBlock(STREAM_2, 120, 150, 3, 105, 30, 1));
CompactedObject compactedObject = new CompactedObject(CompactionType.COMPACT, streamDataBlocks);
List<ObjectStreamRange> result = CompactionUtils.buildObjectStreamRange(compactedObject.streamDataBlocks());
assertEquals(2, result.size());
@@ -54,11 +54,11 @@ public void testBuildObjectStreamRanges() {
@Test
public void testMergeStreamDataBlocks() {
List<StreamDataBlock> streamDataBlocks = List.of(
- new StreamDataBlock(STREAM_0, 0, 15, 0, 1, 0, 20, 1),
- new StreamDataBlock(STREAM_0, 15, 30, 1, 1, 20, 5, 1),
- new StreamDataBlock(STREAM_0, 30, 100, 2, 1, 25, 80, 1),
- new StreamDataBlock(STREAM_2, 40, 100, 3, 1, 105, 80, 1),
- new StreamDataBlock(STREAM_2, 120, 150, 4, 1, 185, 30, 1));
+ new StreamDataBlock(STREAM_0, 0, 15, 1, 0, 20, 1),
+ new StreamDataBlock(STREAM_0, 15, 30, 1, 20, 5, 1),
+ new StreamDataBlock(STREAM_0, 30, 100, 1, 25, 80, 1),
+ new StreamDataBlock(STREAM_2, 40, 100, 1, 105, 80, 1),
+ new StreamDataBlock(STREAM_2, 120, 150, 1, 185, 30, 1));
List<List<StreamDataBlock>> result = CompactionUtils.groupStreamDataBlocks(streamDataBlocks);
assertEquals(3, result.size());
Assertions.assertEquals(List.of(streamDataBlocks.get(0), streamDataBlocks.get(1), streamDataBlocks.get(2)), result.get(0));
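Note: the expectations in testMergeStreamDataBlocks pin down the grouping rule: blocks chain only while they belong to the same stream and each block starts exactly where the previous one ends, so STREAM_0's 0-15/15-30/30-100 collapse into one group while STREAM_2's 100 -> 120 gap forces a split, giving three groups. A sketch of that contiguity predicate (a hypothetical helper; the real CompactionUtils.groupStreamDataBlocks may differ in detail):

static boolean contiguous(StreamDataBlock prev, StreamDataBlock next) {
    return prev.getStreamId() == next.getStreamId()
            && prev.getEndOffset() == next.getStartOffset();
}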
diff --git a/store/src/main/java/com/automq/rocketmq/store/api/StreamStore.java b/store/src/main/java/com/automq/rocketmq/store/api/StreamStore.java
index ab08b2e55..0a72aa726 100644
--- a/store/src/main/java/com/automq/rocketmq/store/api/StreamStore.java
+++ b/store/src/main/java/com/automq/rocketmq/store/api/StreamStore.java
@@ -52,7 +52,7 @@ public interface StreamStore extends Lifecycle {
*
* @param streamId the target stream id.
* @param startOffset the start offset of the fetch.
- * @param maxCount the max return count of the fetch.
+ * @param maxCount the maximum number of records to fetch.
* @return the future of fetch result.
*/
CompletableFuture<FetchResult> fetch(StoreContext context, long streamId, long startOffset, int maxCount);
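Note: a minimal usage sketch of the fetch contract (stream id and bounds are illustrative; obtaining a StoreContext is out of scope for this hunk):

// maxCount bounds the number of records returned, per the corrected javadoc.
CompletableFuture<FetchResult> fut = streamStore.fetch(context, streamId, 0L, 32);
fut.thenAccept(result -> result.recordBatchList()
        .forEach(batch -> System.out.println(batch.baseOffset())));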