
Commit

perf(s3stream): not use byte arrays to reduce heap mem alloc when decoding (#823)

Signed-off-by: Ning Yu <[email protected]>
Chillax-0v0 authored Dec 12, 2023
1 parent 52f9ecb commit 19549be
Showing 2 changed files with 23 additions and 38 deletions.
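The change below replaces the DataInputStream/byte[] decode path with direct ByteBuf-to-ByteBuf reads. A minimal sketch of the two allocation patterns, using plain Netty buffers; the method names and the PooledByteBufAllocator stand-in (the real code uses DirectByteBufAlloc) are illustrative, not part of the project:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

class PayloadCopySketch {
    // Old style: the payload is first copied into a temporary byte[] on the heap,
    // then copied again into the destination buffer; the byte[] becomes garbage.
    static ByteBuf decodeOldStyle(ByteBuf in) {
        int payloadLength = in.readInt();
        byte[] tmp = new byte[payloadLength];   // per-record heap allocation
        in.readBytes(tmp);
        ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(payloadLength);
        payload.writeBytes(tmp);                // second copy
        return payload;
    }

    // New style: bytes move straight from the source buffer into the destination
    // buffer, so no intermediate array is allocated.
    static ByteBuf decodeNewStyle(ByteBuf in) {
        int payloadLength = in.readInt();
        ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(payloadLength);
        in.readBytes(payload);                  // single copy, fills payload to capacity
        return payload;
    }
}

The before variant allocates and discards one byte[] per record; the after variant writes the payload once, straight into the pooled destination buffer, which is the pattern the hunks below adopt.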
23 changes: 6 additions & 17 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -17,21 +17,16 @@

 package com.automq.stream.s3;
 
-import com.automq.stream.s3.network.ThrottleStrategy;
-import com.automq.stream.utils.CloseableIterator;
-import com.automq.stream.api.exceptions.ErrorCode;
-import com.automq.stream.api.exceptions.StreamClientException;
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.model.StreamRecordBatch;
+import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.operator.S3Operator;
-import com.automq.stream.utils.ByteBufInputStream;
+import com.automq.stream.utils.CloseableIterator;
 import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes;
 import io.netty.buffer.ByteBuf;
-import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -250,7 +245,8 @@ void close() {
         }
     }
 
-    public record FindIndexResult(boolean isFulfilled, long nextStartOffset, int nextMaxBytes, List<StreamDataBlock> streamDataBlocks) {
+    public record FindIndexResult(boolean isFulfilled, long nextStartOffset, int nextMaxBytes,
+                                  List<StreamDataBlock> streamDataBlocks) {
 
     }
 
@@ -302,8 +298,6 @@ public CloseableIterator<StreamRecordBatch> iterator() {
                 throw new RuntimeException("[FATAL] magic code mismatch, data is corrupted");
             }
             // TODO: check flag, use uncompressed stream or compressed stream.
-            // DataInputStream in = new DataInputStream(ZstdFactory.wrapForInput(buf.nioBuffer(), (byte) 0, BufferSupplier.NO_CACHING));
-            DataInputStream in = new DataInputStream(new ByteBufInputStream(buf.duplicate()));
             return new CloseableIterator<>() {
                 @Override
                 public boolean hasNext() {
@@ -316,16 +310,11 @@ public StreamRecordBatch next() {
                     if (remainingRecordCount.decrementAndGet() < 0) {
                         throw new NoSuchElementException();
                     }
-                    return StreamRecordBatchCodec.decode(in);
+                    return StreamRecordBatchCodec.duplicateDecode(buf);
                 }
 
                 @Override
                 public void close() {
-                    try {
-                        in.close();
-                    } catch (IOException e) {
-                        throw new StreamClientException(ErrorCode.UNEXPECTED, "Failed to close object block stream ", e);
-                    }
                 }
             };
         }
38 changes: 17 additions & 21 deletions s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java
@@ -20,9 +20,6 @@
 import com.automq.stream.s3.model.StreamRecordBatch;
 import io.netty.buffer.ByteBuf;
 
-import java.io.DataInputStream;
-import java.io.IOException;
-
 public class StreamRecordBatchCodec {
     public static final byte MAGIC_V0 = 0x22;
 
@@ -48,28 +45,27 @@ public static ByteBuf encode(StreamRecordBatch streamRecord) {

     /**
      * Decode a stream record batch from a byte buffer and move the reader index.
+     * The returned stream record batch does NOT share the payload buffer with the input buffer.
      */
-    public static StreamRecordBatch decode(DataInputStream in) {
-        try {
-            byte magic = in.readByte(); // magic
-            if (magic != MAGIC_V0) {
-                throw new RuntimeException("Invalid magic byte " + magic);
-            }
-            long streamId = in.readLong();
-            long epoch = in.readLong();
-            long baseOffset = in.readLong();
-            int lastOffsetDelta = in.readInt();
-            int payloadLength = in.readInt();
-            byte[] payloadBytes = new byte[payloadLength];
-            in.readFully(payloadBytes);
-            ByteBuf payload = DirectByteBufAlloc.byteBuffer(payloadLength);
-            payload.writeBytes(payloadBytes);
-            return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+    public static StreamRecordBatch duplicateDecode(ByteBuf buf) {
+        byte magic = buf.readByte(); // magic
+        if (magic != MAGIC_V0) {
+            throw new RuntimeException("Invalid magic byte " + magic);
         }
+        long streamId = buf.readLong();
+        long epoch = buf.readLong();
+        long baseOffset = buf.readLong();
+        int lastOffsetDelta = buf.readInt();
+        int payloadLength = buf.readInt();
+        ByteBuf payload = DirectByteBufAlloc.byteBuffer(payloadLength);
+        buf.readBytes(payload);
+        return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
     }
 
+    /**
+     * Decode a stream record batch from a byte buffer and move the reader index.
+     * The returned stream record batch shares the payload buffer with the input buffer.
+     */
     public static StreamRecordBatch decode(ByteBuf buf) {
         buf.readByte(); // magic
         long streamId = buf.readLong();
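After this commit StreamRecordBatchCodec keeps two decode flavors with different payload-ownership semantics. A hedged usage sketch: the encode/duplicateDecode/decode signatures and the StreamRecordBatch constructor shape are taken from the diff above, while the package names, constructor visibility, and the lifetime comments are assumptions rather than guarantees of this commit.

import com.automq.stream.s3.StreamRecordBatchCodec; // assumed package: ObjectReader uses the codec without an import
import com.automq.stream.s3.model.StreamRecordBatch;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class DecodeFlavorsSketch {
    public static void main(String[] args) {
        // Build and encode a record; the constructor shape appears in the diff above.
        ByteBuf payload = Unpooled.wrappedBuffer(new byte[] {1, 2, 3});
        StreamRecordBatch original = new StreamRecordBatch(1L, 0L, 100L, 1, payload);
        ByteBuf encoded = StreamRecordBatchCodec.encode(original);

        // duplicateDecode copies the payload into a freshly allocated buffer, so the decoded
        // record does not depend on `encoded` staying alive (per the javadoc in the diff).
        // Reading from a duplicate leaves `encoded`'s reader index untouched.
        StreamRecordBatch independentCopy = StreamRecordBatchCodec.duplicateDecode(encoded.duplicate());

        // decode shares the payload with `encoded` (per its javadoc), so `encoded` must
        // outlive any use of this record.
        StreamRecordBatch sharedView = StreamRecordBatchCodec.decode(encoded);
    }
}

ObjectReader's record iterator calls duplicateDecode, so every StreamRecordBatch it returns owns an independent copy of its payload rather than a view into the block buffer.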
