Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka-issues447): WAL record nodeId & epoch metadata #636

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.3.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.4.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.3.0-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
20 changes: 10 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

// TODO: rename & init
public class Config {
private int brokerId;
private int nodeId;
private String endpoint;
private String region;
private String bucket;
Expand All @@ -45,7 +45,7 @@ public class Config {
private int streamObjectCompactionLivingTimeMinutes = 60;
private int controllerRequestRetryMaxCount = Integer.MAX_VALUE;
private long controllerRequestRetryBaseDelayMs = 500;
private long brokerEpoch = 0L;
private long nodeEpoch = 0L;
private int streamSetObjectCompactionInterval = 20;
private long streamSetObjectCompactionCacheSize = 200 * 1024 * 1024;
private int streamSetObjectCompactionUploadConcurrency = 8;
Expand All @@ -60,8 +60,8 @@ public class Config {
private long networkBaselineBandwidth = 100 * 1024 * 1024;
private int refillPeriodMs = 1000;

public int brokerId() {
return brokerId;
public int nodeId() {
return nodeId;
}

public String endpoint() {
Expand Down Expand Up @@ -156,8 +156,8 @@ public long controllerRequestRetryBaseDelayMs() {
return controllerRequestRetryBaseDelayMs;
}

public long brokerEpoch() {
return brokerEpoch;
public long nodeEpoch() {
return nodeEpoch;
}

public int streamSetObjectCompactionInterval() {
Expand Down Expand Up @@ -216,8 +216,8 @@ public int refillPeriodMs() {
return refillPeriodMs;
}

public Config brokerId(int brokerId) {
this.brokerId = brokerId;
public Config nodeId(int brokerId) {
this.nodeId = brokerId;
return this;
}

Expand Down Expand Up @@ -336,8 +336,8 @@ public Config controllerRequestRetryBaseDelayMs(long s3ControllerRequestRetryBas
return this;
}

public Config brokerEpoch(long brokerEpoch) {
this.brokerEpoch = brokerEpoch;
public Config nodeEpoch(long brokerEpoch) {
this.nodeEpoch = brokerEpoch;
return this;
}

Expand Down
22 changes: 22 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.automq.stream.s3;

public class Constants {
public static final int NOOP_NODE_ID = -1;
public static final long NOOP_EPOCH = -1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class DeltaWALUploadTask {

public DeltaWALUploadTask(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
ExecutorService executor, boolean forceSplit) {
this.s3ObjectLogger = S3ObjectLogger.logger(String.format("[DeltaWALUploadTask id=%d] ", config.brokerId()));
this.s3ObjectLogger = S3ObjectLogger.logger(String.format("[DeltaWALUploadTask id=%d] ", config.nodeId()));
this.streamRecordsMap = streamRecordsMap;
this.objectBlockSize = config.objectBlockSize();
this.objectPartSize = config.objectPartSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class CompactionManager {
private Bucket compactionBucket = null;

public CompactionManager(Config config, ObjectManager objectManager, StreamManager streamManager, S3Operator s3Operator) {
String logPrefix = String.format("[CompactionManager id=%d] ", config.brokerId());
String logPrefix = String.format("[CompactionManager id=%d] ", config.nodeId());
this.logger = new LogContext(logPrefix).logger(CompactionManager.class);
this.s3ObjectLogger = S3ObjectLogger.logger(logPrefix);
this.kafkaConfig = config;
Expand All @@ -102,7 +102,7 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag
maxStreamNumPerStreamSetObject = config.maxStreamNumPerStreamSetObject();
maxStreamObjectNumPerCommit = config.maxStreamObjectNumPerCommit();
this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, streamSplitSize, maxStreamNumPerStreamSetObject,
maxStreamObjectNumPerCommit, new LogContext(String.format("[CompactionAnalyzer id=%d] ", config.brokerId())));
maxStreamObjectNumPerCommit, new LogContext(String.format("[CompactionAnalyzer id=%d] ", config.nodeId())));
this.compactScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("schedule-compact-executor-%d", true), logger);
this.bucketCallbackScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
Expand Down
Loading