Skip to content

Commit

Permalink
chore(logger): move controller commit sso log to broker (#1794)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Aug 14, 2024
1 parent 3133390 commit 49af755
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 52 deletions.
1 change: 0 additions & 1 deletion core/src/main/java/kafka/automq/AutoMQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ public static void define(ConfigDef configDef) {
.define(AutoMQConfig.S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT_CONFIG, INT, S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT, MEDIUM, AutoMQConfig.S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT_DOC)
.define(AutoMQConfig.S3_MOCK_ENABLE_CONFIG, BOOLEAN, false, LOW, AutoMQConfig.S3_MOCK_ENABLE_DOC)
.define(AutoMQConfig.S3_OBJECT_DELETION_MINUTES_CONFIG, LONG, S3_OBJECT_DELETE_RETENTION_MINUTES, MEDIUM, AutoMQConfig.S3_OBJECT_DELETION_MINUTES_DOC)
.define(AutoMQConfig.S3_OBJECT_LOG_ENABLE_CONFIG, BOOLEAN, false, LOW, AutoMQConfig.S3_OBJECT_LOG_ENABLE_DOC)
.define(AutoMQConfig.S3_NETWORK_BASELINE_BANDWIDTH_CONFIG, LONG, S3_NETWORK_BASELINE_BANDWIDTH, MEDIUM, AutoMQConfig.S3_NETWORK_BASELINE_BANDWIDTH_DOC)
.define(AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_CONFIG, INT, S3_REFILL_PERIOD_MS, MEDIUM, AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_CONFIG, STRING, "INFO", MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_DOC)
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public static Config to(KafkaConfig s) {
.streamSetObjectCompactionForceSplitPeriod(s.s3StreamSetObjectCompactionForceSplitMinutes())
.streamSetObjectCompactionMaxObjectNum(s.s3StreamSetObjectCompactionMaxObjectNum())
.mockEnable(s.s3MockEnable())
.objectLogEnable(s.s3ObjectLogEnable())
.networkBaselineBandwidth(s.s3NetworkBaselineBandwidthProp())
.refillPeriodMs(s.s3RefillPeriodMsProp())
.objectRetentionTimeInSecond(s.s3ObjectDeleteRetentionTimeInSecond());
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val s3MaxStreamObjectNumPerCommit = getInt(AutoMQConfig.S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT_CONFIG)
val s3MockEnable = getBoolean(AutoMQConfig.S3_MOCK_ENABLE_CONFIG)
val s3ObjectDeleteRetentionTimeInSecond = getLong(AutoMQConfig.S3_OBJECT_DELETION_MINUTES_CONFIG) * 60
val s3ObjectLogEnable = getBoolean(AutoMQConfig.S3_OBJECT_LOG_ENABLE_CONFIG)
val s3NetworkBaselineBandwidthProp = getLong(AutoMQConfig.S3_NETWORK_BASELINE_BANDWIDTH_CONFIG)
val s3RefillPeriodMsProp = getInt(AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_CONFIG)
val s3MetricsLevel = getString(AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_CONFIG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1492,16 +1492,6 @@ private void logCommitStreamSetObject(CommitStreamSetObjectRequestData req) {
sb.append("[CommitStreamSetObject]: successfully commit stream set object, ");
sb.append("streamSetObjectId=").append(req.objectId()).append(", nodeId=").append(req.nodeId());
sb.append(", nodeEpoch=").append(req.nodeEpoch()).append(", compactedObjects=").append(req.compactedObjectIds());
sb.append(", \n\tstreamRanges=");
req.objectStreamRanges().forEach(range -> {
sb.append("(si=").append(range.streamId()).append(", so=").append(range.startOffset()).append(", eo=")
.append(range.endOffset()).append("), ");
});
sb.append(", \n\tstreamObjects=");
req.streamObjects().forEach(obj -> {
sb.append("(si=").append(obj.streamId()).append(", so=").append(obj.startOffset()).append(", eo=")
.append(obj.endOffset()).append(", oi=").append(obj.objectId()).append("), ");
});
log.info(sb.toString());
}

Expand Down
10 changes: 0 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class Config {
private int maxStreamNumPerStreamSetObject = 100000;
private int maxStreamObjectNumPerCommit = 10000;
private boolean mockEnable = false;
private boolean objectLogEnable = false;
// 100MB/s
private long networkBaselineBandwidth = 100 * 1024 * 1024;
private int refillPeriodMs = 10;
Expand Down Expand Up @@ -149,10 +148,6 @@ public boolean mockEnable() {
return mockEnable;
}

public boolean objectLogEnable() {
return objectLogEnable;
}

public long networkBaselineBandwidth() {
return networkBaselineBandwidth;
}
Expand Down Expand Up @@ -281,11 +276,6 @@ public Config mockEnable(boolean s3MockEnable) {
return this;
}

public Config objectLogEnable(boolean s3ObjectLogEnable) {
this.objectLogEnable = s3ObjectLogEnable;
return this;
}

public Config networkBaselineBandwidth(long networkBaselineBandwidth) {
this.networkBaselineBandwidth = networkBaselineBandwidth;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class DeltaWALUploadTask {
private final int streamSplitSizeThreshold;
private final ObjectManager objectManager;
private final ObjectStorage objectStorage;
private final boolean s3ObjectLogEnable;
private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
private final CompletableFuture<CommitStreamSetObjectRequest> uploadCf = new CompletableFuture<>();
private final ExecutorService executor;
Expand All @@ -65,7 +64,6 @@ public DeltaWALUploadTask(Config config, Map<Long, List<StreamRecordBatch>> stre
this.objectBlockSize = config.objectBlockSize();
this.objectPartSize = config.objectPartSize();
this.streamSplitSizeThreshold = config.streamSplitSize();
this.s3ObjectLogEnable = config.objectLogEnable();
this.objectManager = objectManager;
this.objectStorage = objectStorage;
this.forceSplit = forceSplit;
Expand Down Expand Up @@ -175,13 +173,13 @@ public CompletableFuture<Void> commit() {
commitTimestamp = System.currentTimeMillis();
return objectManager.commitStreamSetObject(request).thenAccept(resp -> {
long now = System.currentTimeMillis();
LOGGER.info("Upload delta WAL finished, cost {}ms, prepare {}ms, upload {}ms, commit {}ms, rate limiter {}bytes/s, request: {}",
LOGGER.info("Upload delta WAL finished, cost {}ms, prepare {}ms, upload {}ms, commit {}ms, rate limiter {}bytes/s",
now - startTimestamp,
uploadTimestamp - startTimestamp,
commitTimestamp - uploadTimestamp,
now - commitTimestamp,
rate,
commitStreamSetObjectRequest);
rate);
s3ObjectLogger.info("[UPLOAD_WAL] {}", request);
}).whenComplete((nil, ex) -> limiter.close());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public class CompactionManager {
private final int maxStreamNumPerStreamSetObject;
private final int maxStreamObjectNumPerCommit;
private final long networkBandwidth;
private final boolean s3ObjectLogEnable;
private final long compactionCacheSize;
private final AtomicBoolean running = new AtomicBoolean(false);
private volatile CompletableFuture<Void> forceSplitCf = null;
Expand All @@ -109,7 +108,6 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag
this.compactionInterval = config.streamSetObjectCompactionInterval();
this.forceSplitObjectPeriod = config.streamSetObjectCompactionForceSplitPeriod();
this.maxObjectNumToCompact = config.streamSetObjectCompactionMaxObjectNum();
this.s3ObjectLogEnable = config.objectLogEnable();
this.networkBandwidth = config.networkBaselineBandwidth();
this.uploader = new CompactionUploader(objectManager, objectStorage, config);
this.compactionCacheSize = config.streamSetObjectCompactionCacheSize();
Expand Down Expand Up @@ -334,9 +332,7 @@ void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMet
objectManager.commitStreamSetObject(request)
.thenAccept(resp -> {
logger.info("Commit force split request succeed, time cost: {} ms", timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
if (s3ObjectLogEnable) {
s3ObjectLogger.trace("[Compact] {}", request);
}
s3ObjectLogger.info("[COMPACT] {}", request);
})
.exceptionally(ex -> {
logger.error("Commit force split request failed, ex: ", ex);
Expand Down Expand Up @@ -383,9 +379,7 @@ private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3Obje
objectManager.commitStreamSetObject(request)
.thenAccept(resp -> {
logger.info("Commit compact request succeed, time cost: {} ms", timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
if (s3ObjectLogEnable) {
s3ObjectLogger.trace("[Compact] {}", request);
}
s3ObjectLogger.info("[COMPACT] {}", request);
})
.exceptionally(ex -> {
logger.error("Commit compact request failed, ex: ", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,23 @@ public void setAttributes(int attributes) {

@Override
public String toString() {
return "CommitStreamSetObjectRequest{" +
"objectId=" + objectId +
", orderId=" + orderId +
", objectSize=" + objectSize +
", streamRanges=" + streamRanges +
", streamObjects=" + streamObjects +
", compactedObjectIds=" + compactedObjectIds +
", attributes=" + attributes +
'}';
StringBuilder sb = new StringBuilder();
sb.append("[CommitStreamSetObjectRequest]:");
sb.append("streamSetObjectId=").append(objectId).append(", attr=").append(attributes).append(", compactedObjects=").append(compactedObjectIds);
sb.append(", \n\tstreamRanges=");
getStreamRanges().forEach(range -> {
sb.append("(si=").append(range.getStreamId()).append(", so=").append(range.getStartOffset()).append(", eo=")
.append(range.getEndOffset()).append("), ");
});
sb.append(", \n\tstreamObjects=");
getStreamObjects().forEach(obj ->
sb.append("(si=").append(obj.getStreamId())
.append(", so=").append(obj.getStartOffset())
.append(", eo=").append(obj.getEndOffset())
.append(", oi=").append(obj.getObjectId())
.append(", size=").append(obj.getObjectSize())
.append(", attr=").append(obj.getAttributes())
.append("), "));
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ public void setAttributes(int attributes) {

@Override
public String toString() {
return "StreamObject{" +
"objectId=" + objectId +
", objectSize=" + objectSize +
", streamId=" + streamId +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", attributes=" + attributes +
return "so{" +
"oi=" + objectId +
", size=" + objectSize +
", si=" + streamId +
", so=" + startOffset +
", eo=" + endOffset +
", attr=" + attributes +
'}';
}
}

0 comments on commit 49af755

Please sign in to comment.