From be314520a331cbe6b7fa893f45b9bdcd5a8cd60f Mon Sep 17 00:00:00 2001 From: "Xu Han@AutoMQ" Date: Fri, 2 Aug 2024 09:45:47 +0800 Subject: [PATCH] feat(controller): add has opening streams method (#1707) Signed-off-by: Robin Han --- .../stream/DefaultNodeRuntimeInfoGetter.java | 2 +- .../stream/StreamControlManager.java | 25 +++++++++++++++++++ .../org/apache/kafka/image/DeltaList.java | 24 ++++++++++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java b/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java index 4505dc95a9..e71d391afa 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java @@ -40,6 +40,6 @@ public NodeState state(int nodeId) { @Override public boolean hasOpeningStreams(int nodeId) { - return !streamControlManager.getOpeningStreams(nodeId).isEmpty(); + return streamControlManager.hasOpeningStreams(nodeId); } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 5bf16b64b8..36bbc6d8a5 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -37,6 +37,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.common.Uuid; @@ -1077,6 +1079,29 @@ public List getOpeningStreams(int nodeId) { return streams; } + public boolean hasOpeningStreams(int nodeId) { + DeltaList streamIdList = node2streams.get(nodeId); + if (streamIdList == null) { + return false; + } + AtomicBoolean hasOpeningStreams = new AtomicBoolean(false); + streamIdList.reverseForEachWithBreak(new Function() { + @Override + public Boolean apply(Long streamId) { + StreamRuntimeMetadata streamRuntimeMetadata = streamsMetadata.get(streamId); + if (streamRuntimeMetadata == null) { + return false; + } + if (streamRuntimeMetadata.currentState() == StreamState.OPENED) { + hasOpeningStreams.set(true); + return true; + } + return false; + } + }); + return hasOpeningStreams.get(); + } + /** * Check whether this node is the owner of this stream. */ diff --git a/metadata/src/main/java/org/apache/kafka/image/DeltaList.java b/metadata/src/main/java/org/apache/kafka/image/DeltaList.java index ebb6085d73..b97aa39770 100644 --- a/metadata/src/main/java/org/apache/kafka/image/DeltaList.java +++ b/metadata/src/main/java/org/apache/kafka/image/DeltaList.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Objects; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; public class DeltaList { @@ -115,6 +116,29 @@ public void reverseForEach(Consumer consumer) { } } + /** + * Reverse iterate the list + * + * @param func accept a list element, return break loop mark. + */ + public void reverseForEachWithBreak(Function func) { + synchronized (operations) { + List> removedList = new ArrayList<>(); + for (int i = snapshotIndex - 1; i >= 0; i--) { + Operation operation = operations.get(i); + if (operation.tombstoneMatcher != null) { + removedList.add(operation); + } else { + if (!isRemoved(operation, removedList)) { + if (func.apply(operation.t)) { + return; + } + } + } + } + } + } + public void forEach(Consumer consumer) { synchronized (operations) { List> removedList = new ArrayList<>();