Skip to content

Commit

Permalink
feat(controller): add has opening streams method (#1707)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Aug 2, 2024
1 parent 756fe41 commit be31452
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public NodeState state(int nodeId) {

@Override
public boolean hasOpeningStreams(int nodeId) {
return !streamControlManager.getOpeningStreams(nodeId).isEmpty();
return streamControlManager.hasOpeningStreams(nodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Uuid;
Expand Down Expand Up @@ -1077,6 +1079,29 @@ public List<StreamRuntimeMetadata> getOpeningStreams(int nodeId) {
return streams;
}

public boolean hasOpeningStreams(int nodeId) {
DeltaList<Long> streamIdList = node2streams.get(nodeId);
if (streamIdList == null) {
return false;
}
AtomicBoolean hasOpeningStreams = new AtomicBoolean(false);
streamIdList.reverseForEachWithBreak(new Function<Long, Boolean>() {
@Override
public Boolean apply(Long streamId) {
StreamRuntimeMetadata streamRuntimeMetadata = streamsMetadata.get(streamId);
if (streamRuntimeMetadata == null) {
return false;
}
if (streamRuntimeMetadata.currentState() == StreamState.OPENED) {
hasOpeningStreams.set(true);
return true;
}
return false;
}
});
return hasOpeningStreams.get();
}

/**
* Check whether this node is the owner of this stream.
*/
Expand Down
24 changes: 24 additions & 0 deletions metadata/src/main/java/org/apache/kafka/image/DeltaList.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DeltaList<T> {
Expand Down Expand Up @@ -115,6 +116,29 @@ public void reverseForEach(Consumer<T> consumer) {
}
}

/**
* Reverse iterate the list
*
* @param func accept a list element, return break loop mark.
*/
public void reverseForEachWithBreak(Function<T, Boolean> func) {
synchronized (operations) {
List<Operation<T>> removedList = new ArrayList<>();
for (int i = snapshotIndex - 1; i >= 0; i--) {
Operation<T> operation = operations.get(i);
if (operation.tombstoneMatcher != null) {
removedList.add(operation);
} else {
if (!isRemoved(operation, removedList)) {
if (func.apply(operation.t)) {
return;
}
}
}
}
}
}

public void forEach(Consumer<T> consumer) {
synchronized (operations) {
List<Operation<T>> removedList = new ArrayList<>();
Expand Down

0 comments on commit be31452

Please sign in to comment.