refactor(stream): format and cleanup code #874

Merged 1 commit on Jan 2, 2024
4 changes: 2 additions & 2 deletions s3stream/pom.xml
@@ -16,8 +16,8 @@
   ~ limitations under the License.
   -->

-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.automq.elasticstream</groupId>
@@ -18,7 +18,6 @@
 package com.automq.stream;

 import com.automq.stream.api.RecordBatch;
-
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Map;
@@ -19,7 +19,6 @@

 import com.automq.stream.api.RecordBatch;
 import com.automq.stream.api.RecordBatchWithContext;
-
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Map;
@@ -33,6 +32,12 @@ public RecordBatchWithContextWrapper(RecordBatch recordBatch, long baseOffset) {
         this.baseOffset = baseOffset;
     }

+    public static RecordBatchWithContextWrapper decode(ByteBuffer buffer) {
+        long baseOffset = buffer.getLong();
+        int count = buffer.getInt();
+        return new RecordBatchWithContextWrapper(new DefaultRecordBatch(count, 0, Collections.emptyMap(), buffer), baseOffset);
+    }
+
     @Override
     public long baseOffset() {
         return baseOffset;
@@ -65,16 +70,10 @@ public ByteBuffer rawPayload() {

     public byte[] encode() {
         ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + recordBatch.rawPayload().remaining())
-                .putLong(baseOffset)
-                .putInt(recordBatch.count())
-                .put(recordBatch.rawPayload().duplicate())
-                .flip();
+            .putLong(baseOffset)
+            .putInt(recordBatch.count())
+            .put(recordBatch.rawPayload().duplicate())
+            .flip();
         return buffer.array();
     }

-    public static RecordBatchWithContextWrapper decode(ByteBuffer buffer) {
-        long baseOffset = buffer.getLong();
-        int count = buffer.getInt();
-        return new RecordBatchWithContextWrapper(new DefaultRecordBatch(count, 0, Collections.emptyMap(), buffer), baseOffset);
-    }
 }
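
Note: decode() moved above the instance methods; the wire format is unchanged and mirrors encode(): an 8-byte base offset, a 4-byte record count, then the raw payload. A minimal standalone sketch of that framing using only java.nio (the class name and literal values below are illustrative, not part of this PR):

import java.nio.ByteBuffer;

// Round-trip of the wrapper's wire format: [long baseOffset][int count][payload]
public class WrapperCodecSketch {
    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + payload.length)
            .putLong(42L) // baseOffset, written first as in encode()
            .putInt(1)    // record count
            .put(payload)
            .flip();

        long baseOffset = buf.getLong(); // 42
        int count = buf.getInt();        // 1
        System.out.println(baseOffset + " " + count + " " + buf.remaining()); // 42 1 3
    }
}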
1 change: 1 addition & 0 deletions s3stream/src/main/java/com/automq/stream/api/Client.java
@@ -24,6 +24,7 @@ public interface Client {
     void start();

     void shutdown();
+
     /**
      * Get stream client.
      *
@@ -18,7 +18,6 @@
 package com.automq.stream.api;

 import com.automq.stream.s3.cache.CacheAccessType;
-
 import java.util.List;

 public interface FetchResult {
1 change: 1 addition & 0 deletions s3stream/src/main/java/com/automq/stream/api/KVClient.java
@@ -35,6 +35,7 @@ public interface KVClient {

     /**
      * Put key value, overwrite if key exist, return current key value after putting.
+     *
      * @param keyValue {@link KeyValue} k-v pair
      * @return async put result. {@link KeyValue} current value after putting.
      */
13 changes: 8 additions & 5 deletions s3stream/src/main/java/com/automq/stream/api/KeyValue.java
@@ -17,7 +17,6 @@

 package com.automq.stream.api;

-
 import java.nio.ByteBuffer;
 import java.util.Objects;

@@ -44,8 +43,10 @@ public Value value() {

     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
         KeyValue keyValue = (KeyValue) o;
         return Objects.equals(key, keyValue.key) && Objects.equals(value, keyValue.value);
     }
@@ -131,8 +132,10 @@ public boolean isNull() {

     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (!(o instanceof Value)) return false;
+        if (this == o)
+            return true;
+        if (!(o instanceof Value))
+            return false;
         Value value1 = (Value) o;
         return Objects.equals(value, value1.value);
     }
8 changes: 4 additions & 4 deletions s3stream/src/main/java/com/automq/stream/api/ReadOptions.java
@@ -25,6 +25,10 @@ public class ReadOptions {
     private boolean fastRead;
     private boolean pooledBuf;

+    public static Builder builder() {
+        return new Builder();
+    }
+
     public boolean fastRead() {
         return fastRead;
     }
@@ -33,10 +37,6 @@ public boolean pooledBuf() {
         return pooledBuf;
     }

-    public static Builder builder() {
-        return new Builder();
-    }
-
     public static class Builder {
         private final ReadOptions options = new ReadOptions();

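
Moving builder() above the accessors is a pure reorder, so call sites are unaffected. A hypothetical caller, assuming the Builder exposes setters named after the two accessors in this hunk (those setter names are not shown in the diff):

// Assumed API: Builder.fastRead(boolean), Builder.pooledBuf(boolean), Builder.build()
ReadOptions options = ReadOptions.builder()
    .fastRead(true)
    .pooledBuf(true)
    .build();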
2 changes: 0 additions & 2 deletions s3stream/src/main/java/com/automq/stream/api/Stream.java
@@ -20,7 +20,6 @@
 import com.automq.stream.api.exceptions.StreamClientException;
 import com.automq.stream.s3.context.AppendContext;
 import com.automq.stream.s3.context.FetchContext;
-
 import java.util.concurrent.CompletableFuture;

 /**
/**
Expand Down Expand Up @@ -48,7 +47,6 @@ public interface Stream {
*/
long nextOffset();


/**
* Append recordBatch to stream.
*
Expand Down
40 changes: 20 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/DeltaWALUploadTask.java
@@ -25,9 +25,6 @@
 import com.automq.stream.s3.operator.S3Operator;
 import com.automq.stream.utils.AsyncRateLimiter;
 import com.automq.stream.utils.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -37,30 +34,33 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID;

 public class DeltaWALUploadTask {
     private static final Logger LOGGER = LoggerFactory.getLogger(DeltaWALUploadTask.class);
-    private long startTimestamp;
+    final boolean forceSplit;
     private final Logger s3ObjectLogger;
     private final Map<Long, List<StreamRecordBatch>> streamRecordsMap;
     private final int objectBlockSize;
     private final int objectPartSize;
     private final int streamSplitSizeThreshold;
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
-    final boolean forceSplit;
     private final boolean s3ObjectLogEnable;
     private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
-    private volatile CommitStreamSetObjectRequest commitStreamSetObjectRequest;
     private final CompletableFuture<CommitStreamSetObjectRequest> uploadCf = new CompletableFuture<>();
     private final ExecutorService executor;
     private final double rate;
     private final AsyncRateLimiter limiter;
+    private long startTimestamp;
+    private volatile CommitStreamSetObjectRequest commitStreamSetObjectRequest;

-    public DeltaWALUploadTask(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
-                              ExecutorService executor, boolean forceSplit, double rate) {
+    public DeltaWALUploadTask(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap,
+        ObjectManager objectManager, S3Operator s3Operator,
+        ExecutorService executor, boolean forceSplit, double rate) {
         this.s3ObjectLogger = S3ObjectLogger.logger(String.format("[DeltaWALUploadTask id=%d] ", config.nodeId()));
         this.streamRecordsMap = streamRecordsMap;
         this.objectBlockSize = config.objectBlockSize();
@@ -85,12 +85,12 @@ public CompletableFuture<Long> prepare() {
             prepareCf.complete(NOOP_OBJECT_ID);
         } else {
             objectManager
-                    .prepareObject(1, TimeUnit.MINUTES.toMillis(60))
-                    .thenAcceptAsync(prepareCf::complete, executor)
-                    .exceptionally(ex -> {
-                        prepareCf.completeExceptionally(ex);
-                        return null;
-                    });
+                .prepareObject(1, TimeUnit.MINUTES.toMillis(60))
+                .thenAcceptAsync(prepareCf::complete, executor)
+                .exceptionally(ex -> {
+                    prepareCf.completeExceptionally(ex);
+                    return null;
+                });
         }
         return prepareCf;
     }
@@ -135,7 +135,7 @@ private void upload0(long objectId) {
         request.setObjectId(objectId);
         request.setOrderId(objectId);
         CompletableFuture<Void> streamSetObjectCf = CompletableFuture.allOf(streamSetWriteCfList.toArray(new CompletableFuture[0]))
-                .thenCompose(nil -> streamSetObject.close().thenAccept(nil2 -> request.setObjectSize(streamSetObject.size())));
+            .thenCompose(nil -> streamSetObject.close().thenAccept(nil2 -> request.setObjectSize(streamSetObject.size())));
         List<CompletableFuture<?>> allCf = new LinkedList<>(streamObjectCfList);
         allCf.add(streamSetObjectCf);
         CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> {
@@ -150,7 +150,7 @@ public CompletableFuture<Void> commit() {
     public CompletableFuture<Void> commit() {
         return uploadCf.thenCompose(request -> objectManager.commitStreamSetObject(request).thenAccept(resp -> {
             LOGGER.info("Upload delta WAL {}, cost {}ms, rate limiter {}bytes/s", commitStreamSetObjectRequest,
-                    System.currentTimeMillis() - startTimestamp, rate);
+                System.currentTimeMillis() - startTimestamp, rate);
             if (s3ObjectLogEnable) {
                 s3ObjectLogger.trace("{}", commitStreamSetObjectRequest);
             }
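
Across these hunks the task reads as a pipeline of futures: prepareCf is completed by prepare() with the reserved object id, uploadCf is completed when the upload finishes, and commit() composes on uploadCf. A stripped-down sketch of that staged-future pattern (all names and bodies here are placeholders, not the real implementation):

import java.util.concurrent.CompletableFuture;

// Staged pipeline mirroring prepareCf/uploadCf above: each stage's future is
// completed by the previous stage's callback, so callers can await any stage.
public class PipelineSketch {
    private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
    private final CompletableFuture<String> uploadCf = new CompletableFuture<>();

    public CompletableFuture<Long> prepare() {
        prepareCf.complete(42L); // stand-in for objectManager.prepareObject(...)
        return prepareCf;
    }

    public CompletableFuture<String> upload() {
        prepareCf.thenAccept(objectId -> uploadCf.complete("request-for-" + objectId));
        return uploadCf;
    }

    public CompletableFuture<Void> commit() {
        // same shape as commit() above: compose on the completed upload request
        return uploadCf.thenCompose(request -> CompletableFuture.completedFuture(null));
    }

    public static void main(String[] args) {
        PipelineSketch task = new PipelineSketch();
        task.prepare();
        task.upload();
        task.commit().join();
        System.out.println("committed");
    }
}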
@@ -227,10 +227,10 @@ public DeltaWALUploadTask build() {
             boolean forceSplit = streamRecordsMap.size() == 1;
             if (!forceSplit) {
                 Optional<Boolean> hasStreamSetData = streamRecordsMap.values()
-                        .stream()
-                        .map(records -> records.stream().mapToLong(StreamRecordBatch::size).sum() >= config.streamSplitSize())
-                        .filter(split -> !split)
-                        .findAny();
+                    .stream()
+                    .map(records -> records.stream().mapToLong(StreamRecordBatch::size).sum() >= config.streamSplitSize())
+                    .filter(split -> !split)
+                    .findAny();
                 if (hasStreamSetData.isEmpty()) {
                     forceSplit = true;
                 }
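
The build() hunk above only re-indents the stream chain, but the decision it encodes is worth spelling out: split every stream into its own stream object when there is a single stream, or when every stream's buffered records reach the split threshold. A standalone sketch of that logic (the 16 MiB threshold and the plain long sizes are stand-ins for config.streamSplitSize() and StreamRecordBatch::size):

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ForceSplitSketch {
    public static void main(String[] args) {
        long streamSplitSize = 16L * 1024 * 1024; // assumed threshold
        Map<Long, List<Long>> streamRecordSizes = Map.of(
            1L, List.of(20L * 1024 * 1024),
            2L, List.of(32L * 1024 * 1024));

        boolean forceSplit = streamRecordSizes.size() == 1;
        if (!forceSplit) {
            // look for any stream whose total payload is below the threshold;
            // if none exists, every stream is big enough to split out on its own
            Optional<Boolean> smallStream = streamRecordSizes.values().stream()
                .map(records -> records.stream().mapToLong(Long::longValue).sum() >= streamSplitSize)
                .filter(split -> !split)
                .findAny();
            if (smallStream.isEmpty()) {
                forceSplit = true;
            }
        }
        System.out.println("forceSplit = " + forceSplit); // true: both streams exceed 16 MiB
    }
}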
@@ -23,11 +23,10 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class DirectByteBufAlloc {
     private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
24 changes: 12 additions & 12 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -24,14 +24,13 @@
 import com.automq.stream.utils.CloseableIterator;
 import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes;
 import io.netty.buffer.ByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE;
 import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;
@@ -128,7 +127,8 @@ public void close0() {
  */
 public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount, int indexBlockSize) {

-    public static BasicObjectInfo parse(ByteBuf objectTailBuf, S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
+    public static BasicObjectInfo parse(ByteBuf objectTailBuf,
+        S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
         long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - FOOTER_SIZE);
         int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
         if (indexBlockPosition + objectTailBuf.readableBytes() < s3ObjectMetadata.objectSize()) {
@@ -217,14 +217,14 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
             int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
             int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
             rst.add(new StreamDataBlock(streamId, rangeStartOffset, rangeEndOffset, s3ObjectMetadata.objectId(),
-                    new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount)));
+                new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount)));

             // we consider first block as not matched because we do not know exactly how many bytes are within
             // the range in first block, as a result we may read one more block than expected.
             if (matched) {
                 int recordPayloadSize = blockSize
-                        - recordCount * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
-                        - ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
+                    - recordCount * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
+                    - ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
                 nextMaxBytes -= Math.min(nextMaxBytes, recordPayloadSize);
             }
             if ((endOffset != NOOP_OFFSET && nextStartOffset >= endOffset) || nextMaxBytes == 0) {
@@ -272,11 +272,11 @@ public long endPosition() {
     @Override
     public String toString() {
         return "DataBlockIndex{" +
-                "blockId=" + blockId +
-                ", startPosition=" + startPosition +
-                ", size=" + size +
-                ", recordCount=" + recordCount +
-                '}';
+            "blockId=" + blockId +
+            ", startPosition=" + startPosition +
+            ", size=" + size +
+            ", recordCount=" + recordCount +
+            '}';
     }
 }
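
The offsets in find() above imply a fixed 16-byte index entry: the block size is read at entry * 16 + 8 and the record count at entry * 16 + 12, which leaves room for an 8-byte block position at offset 0 (an inference; that read is outside this hunk). A sketch of decoding one entry under that assumed layout:

import java.nio.ByteBuffer;

// Assumed index entry layout: [long position][int size][int recordCount] = 16 bytes
public class IndexEntrySketch {
    public static void main(String[] args) {
        ByteBuffer blocks = ByteBuffer.allocate(16)
            .putLong(4096L) // block position within the object (assumed field)
            .putInt(1024)   // block size, matches blocks.getInt(id * 16 + 8)
            .putInt(10);    // record count, matches blocks.getInt(id * 16 + 12)

        int blockId = 0;
        long position = blocks.getLong(blockId * 16);
        int size = blocks.getInt(blockId * 16 + 8);
        int recordCount = blocks.getInt(blockId * 16 + 12);
        System.out.printf("pos=%d size=%d records=%d%n", position, size, recordCount);
    }
}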
