feat(kafka_issues642): object format support data block group
Signed-off-by: Robin Han <[email protected]>
superhx committed Jan 9, 2024
1 parent 1dab15a commit 4a7acb8
Showing 25 changed files with 409 additions and 808 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.14.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.15.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
2 changes: 1 addition & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.14.0-SNAPSHOT</version>
<version>0.15.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
44 changes: 44 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3;

import io.netty.buffer.ByteBuf;

public record DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition,
int size) {

public static final int BLOCK_INDEX_SIZE = 8/* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */
+ 4 /* record count */ + 8 /* block position */ + 4 /* block size */;

public long endOffset() {
return startOffset + endOffsetDelta;
}

public long endPosition() {
return startPosition + size;
}

public void encode(ByteBuf buf) {
buf.writeLong(streamId);
buf.writeLong(startOffset);
buf.writeInt(endOffsetDelta);
buf.writeInt(recordCount);
buf.writeLong(startPosition);
buf.writeInt(size);
}
}
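
Each index entry written by encode is a fixed 36-byte record: BLOCK_INDEX_SIZE = 8 + 8 + 4 + 4 + 8 + 4. For illustration, a minimal decode sketch (a hypothetical helper, not part of this commit) would read the fields back in the order encode wrote them:

import io.netty.buffer.ByteBuf;

// Hypothetical inverse of DataBlockIndex#encode; consumes one 36-byte entry.
public static DataBlockIndex decode(ByteBuf buf) {
    long streamId = buf.readLong();        // 8 bytes
    long startOffset = buf.readLong();     // 8 bytes
    int endOffsetDelta = buf.readInt();    // 4 bytes
    int recordCount = buf.readInt();       // 4 bytes
    long startPosition = buf.readLong();   // 8 bytes
    int size = buf.readInt();              // 4 bytes
    return new DataBlockIndex(streamId, startOffset, endOffsetDelta, recordCount, startPosition, size);
}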
164 changes: 74 additions & 90 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -70,7 +70,7 @@ public CompletableFuture<FindIndexResult> find(long streamId, long startOffset,

public CompletableFuture<DataBlock> read(DataBlockIndex block) {
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.THROTTLE_1);
return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
return rangeReadCf.thenApply(DataBlock::new);
}

void asyncGetBasicObjectInfo() {
@@ -121,12 +121,10 @@ public void close0() {
}

/**
* @param dataBlockSize The total size of the data blocks, which equals to index start position.
* @param indexBlock raw index data.
* @param blockCount The number of data blocks in the object.
* @param indexBlockSize The size of the index blocks.
* @param dataBlockSize The total size of the data blocks, which equals to index start position.
* @param indexBlock raw index data.
*/
public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount, int indexBlockSize) {
public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) {

public static BasicObjectInfo parse(ByteBuf objectTailBuf,
S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
@@ -145,13 +143,7 @@ public static BasicObjectInfo parse(ByteBuf objectTailBuf,
indexBlockBuf.readBytes(copy, indexBlockBuf.readableBytes());
objectTailBuf.release();
indexBlockBuf = copy;

int blockCount = indexBlockBuf.readInt();
ByteBuf blocks = indexBlockBuf.retainedSlice(indexBlockBuf.readerIndex(), blockCount * 16);
indexBlockBuf.skipBytes(blockCount * 16);
ByteBuf streamRanges = indexBlockBuf.retainedSlice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes());
indexBlockBuf.release();
return new BasicObjectInfo(indexBlockPosition, new IndexBlock(s3ObjectMetadata, blocks, streamRanges), blockCount, indexBlockSize);
return new BasicObjectInfo(indexBlockPosition, new IndexBlock(s3ObjectMetadata, indexBlockBuf));
}
}

@@ -165,48 +157,47 @@ void close() {
}

public static class IndexBlock {
public static final int INDEX_BLOCK_UNIT_SIZE = 8/* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */
+ 4 /* record count */ + 8 /* block position */ + 4 /* block size */;
private final S3ObjectMetadata s3ObjectMetadata;
private final ByteBuf blocks;
private final ByteBuf streamRanges;
private final ByteBuf buf;
private final int size;
private final int count;

public IndexBlock(S3ObjectMetadata s3ObjectMetadata, ByteBuf blocks, ByteBuf streamRanges) {
public IndexBlock(S3ObjectMetadata s3ObjectMetadata, ByteBuf buf) {
this.s3ObjectMetadata = s3ObjectMetadata;
this.blocks = blocks;
this.streamRanges = streamRanges;
this.size = blocks.readableBytes() + streamRanges.readableBytes();
this.buf = buf;
this.size = buf.readableBytes();
this.count = buf.readableBytes() / INDEX_BLOCK_UNIT_SIZE;
}

public Iterator<StreamDataBlock> iterator() {
ByteBuf blocks = this.blocks.slice();
ByteBuf ranges = this.streamRanges.slice();
public Iterator<DataBlockIndex> iterator() {
AtomicInteger getIndex = new AtomicInteger(0);
return new Iterator<>() {
@Override
public boolean hasNext() {
return ranges.readableBytes() != 0;
return getIndex.get() < count;
}

@Override
public StreamDataBlock next() {
long streamId = ranges.readLong();
long startOffset = ranges.readLong();
long endOffset = startOffset + ranges.readInt();
int rangeBlockId = ranges.readInt();
long blockPosition = blocks.getLong(rangeBlockId * 16);
int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
return new StreamDataBlock(streamId, startOffset, endOffset, rangeBlockId, s3ObjectMetadata.objectId(),
blockPosition, blockSize, recordCount);
public DataBlockIndex next() {
return get(getIndex.getAndIncrement());
}
};
}

public ByteBuf blocks() {
return blocks.slice();
}

public ByteBuf streamRanges() {
return streamRanges.slice();
public DataBlockIndex get(int index) {
if (index < 0 || index >= count) {
throw new IllegalArgumentException("index " + index + " is out of range [0, " + count + ")");
}
int base = index * INDEX_BLOCK_UNIT_SIZE;
long streamId = buf.getLong(base);
long startOffset = buf.getLong(base + 8);
int endOffsetDelta = buf.getInt(base + 16);
int recordCount = buf.getInt(base + 20);
long blockPosition = buf.getLong(base + 24);
int blockSize = buf.getInt(base + 32);
return new DataBlockIndex(streamId, startOffset, endOffsetDelta, recordCount, blockPosition, blockSize);
}

public FindIndexResult find(long streamId, long startOffset, long endOffset) {
@@ -219,37 +210,30 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
boolean matched = false;
boolean isFulfilled = false;
List<StreamDataBlock> rst = new LinkedList<>();
IndexBlockOrderedBytes indexBlockOrderedBytes = new IndexBlockOrderedBytes(streamRanges);
IndexBlockOrderedBytes indexBlockOrderedBytes = new IndexBlockOrderedBytes(this);
int startIndex = indexBlockOrderedBytes.search(new IndexBlockOrderedBytes.TargetStreamOffset(streamId, startOffset));
if (startIndex == -1) {
// mismatched
return new FindIndexResult(false, nextStartOffset, nextMaxBytes, rst);
}
for (int i = startIndex * 24; i < streamRanges.readableBytes(); i += 24) {
long rangeStreamId = streamRanges.getLong(i);
long rangeStartOffset = streamRanges.getLong(i + 8);
long rangeEndOffset = rangeStartOffset + streamRanges.getInt(i + 16);
int rangeBlockId = streamRanges.getInt(i + 20);
if (rangeStreamId == streamId) {
if (nextStartOffset < rangeStartOffset) {
for (int i = startIndex; i < count(); i++) {
DataBlockIndex index = get(i);
if (index.streamId() == streamId) {
if (nextStartOffset < index.startOffset()) {
break;
}
if (rangeEndOffset <= nextStartOffset) {
if (index.endOffset() <= nextStartOffset) {
continue;
}
matched = nextStartOffset == rangeStartOffset;
nextStartOffset = rangeEndOffset;
long blockPosition = blocks.getLong(rangeBlockId * 16);
int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
rst.add(new StreamDataBlock(streamId, rangeStartOffset, rangeEndOffset, s3ObjectMetadata.objectId(),
new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount)));
matched = nextStartOffset == index.startOffset();
nextStartOffset = index.endOffset();
rst.add(new StreamDataBlock(s3ObjectMetadata.objectId(), index));

// we consider the first block as not matched because we do not know exactly how many bytes are within
// the range in the first block; as a result we may read one more block than expected.
if (matched) {
int recordPayloadSize = blockSize
- recordCount * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
int recordPayloadSize = index.size()
- index.recordCount() * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
- ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
nextMaxBytes -= Math.min(nextMaxBytes, recordPayloadSize);
}
@@ -264,13 +248,16 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
return new FindIndexResult(isFulfilled, nextStartOffset, nextMaxBytes, rst);
}

int size() {
public int size() {
return size;
}

public int count() {
return count;
}

void close() {
blocks.release();
streamRanges.release();
buf.release();
}
}

@@ -288,45 +275,36 @@ public IndexBlockParseException(long indexBlockPosition) {

}

public record DataBlockIndex(int blockId, long startPosition, int size, int recordCount) {
public static final int BLOCK_INDEX_SIZE = 8 + 4 + 4;

public long endPosition() {
return startPosition + size;
}

@Override
public String toString() {
return "DataBlockIndex{" +
"blockId=" + blockId +
", startPosition=" + startPosition +
", size=" + size +
", recordCount=" + recordCount +
'}';
}
}

public static class DataBlock implements AutoCloseable {
private final ByteBuf buf;
private final int recordCount;

public DataBlock(ByteBuf buf, int recordCount) {
this.buf = buf;
this.recordCount = recordCount;
public DataBlock(ByteBuf buf) {
this.buf = buf.duplicate();
this.recordCount = check(buf);
}

private static int check(ByteBuf buf) {
buf = buf.duplicate();
int recordCount = 0;
while (buf.readableBytes() > 0) {
byte magicCode = buf.readByte();
if (magicCode != ObjectWriter.DATA_BLOCK_MAGIC) {
LOGGER.error("magic code mismatch, expected {}, actual {}", ObjectWriter.DATA_BLOCK_MAGIC, magicCode);
throw new RuntimeException("[FATAL] magic code mismatch, data is corrupted");
}
buf.readByte(); // flag
recordCount += buf.readInt();
int dataLength = buf.readInt();
buf.skipBytes(dataLength);
}
return recordCount;
}

public CloseableIterator<StreamRecordBatch> iterator() {
ByteBuf buf = this.buf.duplicate();
AtomicInteger currentBlockRecordCount = new AtomicInteger(0);
AtomicInteger remainingRecordCount = new AtomicInteger(recordCount);
// skip magic and flag
byte magicCode = buf.readByte();
buf.readByte();

if (magicCode != ObjectWriter.DATA_BLOCK_MAGIC) {
LOGGER.error("magic code mismatch, expected {}, actual {}", ObjectWriter.DATA_BLOCK_MAGIC, magicCode);
throw new RuntimeException("[FATAL] magic code mismatch, data is corrupted");
}
// TODO: check flag, use uncompressed stream or compressed stream.
return new CloseableIterator<>() {
@Override
public boolean hasNext() {
@@ -339,6 +317,12 @@ public StreamRecordBatch next() {
if (remainingRecordCount.decrementAndGet() < 0) {
throw new NoSuchElementException();
}
if (currentBlockRecordCount.get() == 0) {
buf.skipBytes(1 /* magic */ + 1 /* flag */);
currentBlockRecordCount.set(buf.readInt());
buf.skipBytes(4);
}
currentBlockRecordCount.decrementAndGet();
return StreamRecordBatchCodec.duplicateDecode(buf);
}

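With this change a data block is a self-describing group: each sub-block is prefixed by magic (1 byte), flag (1 byte), record count (4 bytes) and data length (4 bytes), so DataBlock no longer needs the record count passed in; check() walks the group to validate the magic and sum the records. A standalone sketch of that walk (hypothetical method name, same layout assumptions as the commit):

import io.netty.buffer.ByteBuf;

// Hypothetical walker over a data block group; mirrors the logic of DataBlock#check.
static int countRecords(ByteBuf group) {
    ByteBuf buf = group.duplicate();       // leave the caller's reader index untouched
    int records = 0;
    while (buf.readableBytes() > 0) {
        byte magic = buf.readByte();       // must be ObjectWriter.DATA_BLOCK_MAGIC
        if (magic != ObjectWriter.DATA_BLOCK_MAGIC) {
            throw new IllegalStateException("data corrupted, unexpected magic: " + magic);
        }
        buf.readByte();                    // flag (compression bit, currently unchecked)
        records += buf.readInt();          // record count of this sub-block
        int dataLength = buf.readInt();    // payload length that follows the header
        buf.skipBytes(dataLength);         // jump to the next sub-block header
    }
    return records;
}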
31 changes: 8 additions & 23 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -200,33 +200,15 @@ class IndexBlock {

public IndexBlock() {
long nextPosition = 0;
int indexBlockSize = 4 + (8 + 4 + 4 + 8 + 8 + 4 + 4) * completedBlocks.size();
int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * completedBlocks.size();
buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, "write_index_block");
buf.writeInt(completedBlocks.size()); // block count
// block index
for (DataBlock block : completedBlocks) {
// start position in the object
buf.writeLong(nextPosition);
// byte size of the block
buf.writeInt(block.size());
// how many ranges in the block
buf.writeInt(block.recordCount());
ObjectStreamRange streamRange = block.getStreamRange();
new DataBlockIndex(streamRange.getStreamId(), streamRange.getStartOffset(), (int) (streamRange.getEndOffset() - streamRange.getStartOffset()),
block.recordCount(), nextPosition, block.size()).encode(buf);
nextPosition += block.size();
}
position = nextPosition;
// object stream range
for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) {
DataBlock block = completedBlocks.get(blockIndex);
ObjectStreamRange range = block.getStreamRange();
// stream id of this range
buf.writeLong(range.getStreamId());
// start offset of the related stream
buf.writeLong(range.getStartOffset());
// record count of the related stream in this range
buf.writeInt((int) (range.getEndOffset() - range.getStartOffset()));
// the index of block where this range is in
buf.writeInt(blockIndex);
}
}

public ByteBuf buffer() {
@@ -244,7 +226,7 @@ public int size() {
}

class DataBlock {
public static final int BLOCK_HEADER_SIZE = 2;
public static final int BLOCK_HEADER_SIZE = 1 /* magic */ + 1/* flag */ + 4 /* record count*/ + 4 /* data length */;
private final CompositeByteBuf encodedBuf;
private final ObjectStreamRange streamRange;
private final int recordCount;
@@ -256,9 +238,12 @@ public DataBlock(long streamId, List<StreamRecordBatch> records) {
ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE);
header.writeByte(DATA_BLOCK_MAGIC);
header.writeByte(DATA_BLOCK_DEFAULT_FLAG);
header.writeInt(recordCount);
header.writeInt(0); // data length
encodedBuf.addComponent(true, header);
records.forEach(r -> encodedBuf.addComponent(true, r.encoded().retain()));
this.size = encodedBuf.readableBytes();
encodedBuf.setInt(BLOCK_HEADER_SIZE - 4, size - BLOCK_HEADER_SIZE);
this.streamRange = new ObjectStreamRange(streamId, records.get(0).getEpoch(), records.get(0).getBaseOffset(), records.get(records.size() - 1).getLastOffset(), size);
}

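Note the write-then-back-patch pattern above: the header is written with a zero placeholder for the data length, and once every record payload has been appended the writer fills it in with setInt(BLOCK_HEADER_SIZE - 4, size - BLOCK_HEADER_SIZE). A self-contained sketch of the same pattern (hypothetical helper and values, plain Netty API):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

// Hypothetical builder for one data block with the 10-byte header introduced here.
static ByteBuf buildBlock(byte magic, byte flag, int recordCount, ByteBuf payload) {
    final int headerSize = 1 /* magic */ + 1 /* flag */ + 4 /* record count */ + 4 /* data length */;
    CompositeByteBuf block = ByteBufAllocator.DEFAULT.compositeBuffer();
    ByteBuf header = ByteBufAllocator.DEFAULT.buffer(headerSize);
    header.writeByte(magic);
    header.writeByte(flag);
    header.writeInt(recordCount);
    header.writeInt(0);                              // data length placeholder, patched below
    block.addComponent(true, header);
    block.addComponent(true, payload.retain());
    int size = block.readableBytes();
    block.setInt(headerSize - 4, size - headerSize); // back-patch: data length = total - header
    return block;
}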
… (diffs for the remaining 20 changed files are not shown)
