From 49af75567d14fb057d18270814b8b0fecbc5d54b Mon Sep 17 00:00:00 2001 From: "Xu Han@AutoMQ" Date: Wed, 14 Aug 2024 15:50:45 +0800 Subject: [PATCH] chore(logger): move controller commit sso log to broker (#1794) Signed-off-by: Robin Han --- .../main/java/kafka/automq/AutoMQConfig.java | 1 - .../kafka/log/stream/s3/ConfigUtils.java | 1 - .../main/scala/kafka/server/KafkaConfig.scala | 1 - .../stream/StreamControlManager.java | 10 ------- .../java/com/automq/stream/s3/Config.java | 10 ------- .../automq/stream/s3/DeltaWALUploadTask.java | 8 +++--- .../stream/s3/compact/CompactionManager.java | 10 ++----- .../objects/CommitStreamSetObjectRequest.java | 27 ++++++++++++------- .../stream/s3/objects/StreamObject.java | 14 +++++----- 9 files changed, 30 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/kafka/automq/AutoMQConfig.java b/core/src/main/java/kafka/automq/AutoMQConfig.java index 68b238dc70..aca8d5bcfb 100644 --- a/core/src/main/java/kafka/automq/AutoMQConfig.java +++ b/core/src/main/java/kafka/automq/AutoMQConfig.java @@ -245,7 +245,6 @@ public static void define(ConfigDef configDef) { .define(AutoMQConfig.S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT_CONFIG, INT, S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT, MEDIUM, AutoMQConfig.S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT_DOC) .define(AutoMQConfig.S3_MOCK_ENABLE_CONFIG, BOOLEAN, false, LOW, AutoMQConfig.S3_MOCK_ENABLE_DOC) .define(AutoMQConfig.S3_OBJECT_DELETION_MINUTES_CONFIG, LONG, S3_OBJECT_DELETE_RETENTION_MINUTES, MEDIUM, AutoMQConfig.S3_OBJECT_DELETION_MINUTES_DOC) - .define(AutoMQConfig.S3_OBJECT_LOG_ENABLE_CONFIG, BOOLEAN, false, LOW, AutoMQConfig.S3_OBJECT_LOG_ENABLE_DOC) .define(AutoMQConfig.S3_NETWORK_BASELINE_BANDWIDTH_CONFIG, LONG, S3_NETWORK_BASELINE_BANDWIDTH, MEDIUM, AutoMQConfig.S3_NETWORK_BASELINE_BANDWIDTH_DOC) .define(AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_CONFIG, INT, S3_REFILL_PERIOD_MS, MEDIUM, AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_DOC) .define(AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_CONFIG, STRING, "INFO", MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_DOC) diff --git a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java index fa9e5f5c33..a052767863 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java @@ -41,7 +41,6 @@ public static Config to(KafkaConfig s) { .streamSetObjectCompactionForceSplitPeriod(s.s3StreamSetObjectCompactionForceSplitMinutes()) .streamSetObjectCompactionMaxObjectNum(s.s3StreamSetObjectCompactionMaxObjectNum()) .mockEnable(s.s3MockEnable()) - .objectLogEnable(s.s3ObjectLogEnable()) .networkBaselineBandwidth(s.s3NetworkBaselineBandwidthProp()) .refillPeriodMs(s.s3RefillPeriodMsProp()) .objectRetentionTimeInSecond(s.s3ObjectDeleteRetentionTimeInSecond()); diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4c99312d13..2e2eb1116e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1166,7 +1166,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3MaxStreamObjectNumPerCommit = getInt(AutoMQConfig.S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT_CONFIG) val s3MockEnable = getBoolean(AutoMQConfig.S3_MOCK_ENABLE_CONFIG) val s3ObjectDeleteRetentionTimeInSecond = getLong(AutoMQConfig.S3_OBJECT_DELETION_MINUTES_CONFIG) * 60 - val s3ObjectLogEnable = getBoolean(AutoMQConfig.S3_OBJECT_LOG_ENABLE_CONFIG) val s3NetworkBaselineBandwidthProp = getLong(AutoMQConfig.S3_NETWORK_BASELINE_BANDWIDTH_CONFIG) val s3RefillPeriodMsProp = getInt(AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_CONFIG) val s3MetricsLevel = getString(AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_CONFIG) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 41ebee5bf0..1aabd889bf 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -1492,16 +1492,6 @@ private void logCommitStreamSetObject(CommitStreamSetObjectRequestData req) { sb.append("[CommitStreamSetObject]: successfully commit stream set object, "); sb.append("streamSetObjectId=").append(req.objectId()).append(", nodeId=").append(req.nodeId()); sb.append(", nodeEpoch=").append(req.nodeEpoch()).append(", compactedObjects=").append(req.compactedObjectIds()); - sb.append(", \n\tstreamRanges="); - req.objectStreamRanges().forEach(range -> { - sb.append("(si=").append(range.streamId()).append(", so=").append(range.startOffset()).append(", eo=") - .append(range.endOffset()).append("), "); - }); - sb.append(", \n\tstreamObjects="); - req.streamObjects().forEach(obj -> { - sb.append("(si=").append(obj.streamId()).append(", so=").append(obj.startOffset()).append(", eo=") - .append(obj.endOffset()).append(", oi=").append(obj.objectId()).append("), "); - }); log.info(sb.toString()); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/Config.java b/s3stream/src/main/java/com/automq/stream/s3/Config.java index 2ed6025ec7..0d0719648c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/Config.java +++ b/s3stream/src/main/java/com/automq/stream/s3/Config.java @@ -43,7 +43,6 @@ public class Config { private int maxStreamNumPerStreamSetObject = 100000; private int maxStreamObjectNumPerCommit = 10000; private boolean mockEnable = false; - private boolean objectLogEnable = false; // 100MB/s private long networkBaselineBandwidth = 100 * 1024 * 1024; private int refillPeriodMs = 10; @@ -149,10 +148,6 @@ public boolean mockEnable() { return mockEnable; } - public boolean objectLogEnable() { - return objectLogEnable; - } - public long networkBaselineBandwidth() { return networkBaselineBandwidth; } @@ -281,11 +276,6 @@ public Config mockEnable(boolean s3MockEnable) { return this; } - public Config objectLogEnable(boolean s3ObjectLogEnable) { - this.objectLogEnable = s3ObjectLogEnable; - return this; - } - public Config networkBaselineBandwidth(long networkBaselineBandwidth) { this.networkBaselineBandwidth = networkBaselineBandwidth; return this; diff --git a/s3stream/src/main/java/com/automq/stream/s3/DeltaWALUploadTask.java b/s3stream/src/main/java/com/automq/stream/s3/DeltaWALUploadTask.java index d9b1aa7cff..02b2f29a2c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/DeltaWALUploadTask.java +++ b/s3stream/src/main/java/com/automq/stream/s3/DeltaWALUploadTask.java @@ -45,7 +45,6 @@ public class DeltaWALUploadTask { private final int streamSplitSizeThreshold; private final ObjectManager objectManager; private final ObjectStorage objectStorage; - private final boolean s3ObjectLogEnable; private final CompletableFuture prepareCf = new CompletableFuture<>(); private final CompletableFuture uploadCf = new CompletableFuture<>(); private final ExecutorService executor; @@ -65,7 +64,6 @@ public DeltaWALUploadTask(Config config, Map> stre this.objectBlockSize = config.objectBlockSize(); this.objectPartSize = config.objectPartSize(); this.streamSplitSizeThreshold = config.streamSplitSize(); - this.s3ObjectLogEnable = config.objectLogEnable(); this.objectManager = objectManager; this.objectStorage = objectStorage; this.forceSplit = forceSplit; @@ -175,13 +173,13 @@ public CompletableFuture commit() { commitTimestamp = System.currentTimeMillis(); return objectManager.commitStreamSetObject(request).thenAccept(resp -> { long now = System.currentTimeMillis(); - LOGGER.info("Upload delta WAL finished, cost {}ms, prepare {}ms, upload {}ms, commit {}ms, rate limiter {}bytes/s, request: {}", + LOGGER.info("Upload delta WAL finished, cost {}ms, prepare {}ms, upload {}ms, commit {}ms, rate limiter {}bytes/s", now - startTimestamp, uploadTimestamp - startTimestamp, commitTimestamp - uploadTimestamp, now - commitTimestamp, - rate, - commitStreamSetObjectRequest); + rate); + s3ObjectLogger.info("[UPLOAD_WAL] {}", request); }).whenComplete((nil, ex) -> limiter.close()); }); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 4f7bd6b284..ee4d72df89 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -87,7 +87,6 @@ public class CompactionManager { private final int maxStreamNumPerStreamSetObject; private final int maxStreamObjectNumPerCommit; private final long networkBandwidth; - private final boolean s3ObjectLogEnable; private final long compactionCacheSize; private final AtomicBoolean running = new AtomicBoolean(false); private volatile CompletableFuture forceSplitCf = null; @@ -109,7 +108,6 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag this.compactionInterval = config.streamSetObjectCompactionInterval(); this.forceSplitObjectPeriod = config.streamSetObjectCompactionForceSplitPeriod(); this.maxObjectNumToCompact = config.streamSetObjectCompactionMaxObjectNum(); - this.s3ObjectLogEnable = config.objectLogEnable(); this.networkBandwidth = config.networkBaselineBandwidth(); this.uploader = new CompactionUploader(objectManager, objectStorage, config); this.compactionCacheSize = config.streamSetObjectCompactionCacheSize(); @@ -334,9 +332,7 @@ void forceSplitObjects(List streamMetadataList, List { logger.info("Commit force split request succeed, time cost: {} ms", timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); - if (s3ObjectLogEnable) { - s3ObjectLogger.trace("[Compact] {}", request); - } + s3ObjectLogger.info("[COMPACT] {}", request); }) .exceptionally(ex -> { logger.error("Commit force split request failed, ex: ", ex); @@ -383,9 +379,7 @@ private void compactObjects(List streamMetadataList, List { logger.info("Commit compact request succeed, time cost: {} ms", timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); - if (s3ObjectLogEnable) { - s3ObjectLogger.trace("[Compact] {}", request); - } + s3ObjectLogger.info("[COMPACT] {}", request); }) .exceptionally(ex -> { logger.error("Commit compact request failed, ex: ", ex); diff --git a/s3stream/src/main/java/com/automq/stream/s3/objects/CommitStreamSetObjectRequest.java b/s3stream/src/main/java/com/automq/stream/s3/objects/CommitStreamSetObjectRequest.java index b7ae6d39b8..496e488e97 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/objects/CommitStreamSetObjectRequest.java +++ b/s3stream/src/main/java/com/automq/stream/s3/objects/CommitStreamSetObjectRequest.java @@ -136,14 +136,23 @@ public void setAttributes(int attributes) { @Override public String toString() { - return "CommitStreamSetObjectRequest{" + - "objectId=" + objectId + - ", orderId=" + orderId + - ", objectSize=" + objectSize + - ", streamRanges=" + streamRanges + - ", streamObjects=" + streamObjects + - ", compactedObjectIds=" + compactedObjectIds + - ", attributes=" + attributes + - '}'; + StringBuilder sb = new StringBuilder(); + sb.append("[CommitStreamSetObjectRequest]:"); + sb.append("streamSetObjectId=").append(objectId).append(", attr=").append(attributes).append(", compactedObjects=").append(compactedObjectIds); + sb.append(", \n\tstreamRanges="); + getStreamRanges().forEach(range -> { + sb.append("(si=").append(range.getStreamId()).append(", so=").append(range.getStartOffset()).append(", eo=") + .append(range.getEndOffset()).append("), "); + }); + sb.append(", \n\tstreamObjects="); + getStreamObjects().forEach(obj -> + sb.append("(si=").append(obj.getStreamId()) + .append(", so=").append(obj.getStartOffset()) + .append(", eo=").append(obj.getEndOffset()) + .append(", oi=").append(obj.getObjectId()) + .append(", size=").append(obj.getObjectSize()) + .append(", attr=").append(obj.getAttributes()) + .append("), ")); + return sb.toString(); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/objects/StreamObject.java b/s3stream/src/main/java/com/automq/stream/s3/objects/StreamObject.java index de8f57cf68..d01f7f1e00 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/objects/StreamObject.java +++ b/s3stream/src/main/java/com/automq/stream/s3/objects/StreamObject.java @@ -70,13 +70,13 @@ public void setAttributes(int attributes) { @Override public String toString() { - return "StreamObject{" + - "objectId=" + objectId + - ", objectSize=" + objectSize + - ", streamId=" + streamId + - ", startOffset=" + startOffset + - ", endOffset=" + endOffset + - ", attributes=" + attributes + + return "so{" + + "oi=" + objectId + + ", size=" + objectSize + + ", si=" + streamId + + ", so=" + startOffset + + ", eo=" + endOffset + + ", attr=" + attributes + '}'; } }