diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/GenerateStartCmdCmd.java b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateStartCmdCmd.java index 1149592f20..7f58bb4f9c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/GenerateStartCmdCmd.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateStartCmdCmd.java @@ -17,6 +17,10 @@ package org.apache.kafka.tools.automq; import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; @@ -109,36 +113,46 @@ public void run() throws IOException { for (int controllerNodeId : controllerGroupConfig.getNodeIdList()) { if (parameter.controllerOnlyMode) { - System.out.println(String.format("bin/kafka-server-start.sh " + System.out.printf("bin/kafka-server-start.sh " + "--s3-url=\"%s\" " + "--override process.roles=controller " + "--override node.id=%s " + "--override controller.quorum.voters=%s " - + "--override listeners=%s ", parameter.s3Url, controllerNodeId, controllerGroupConfig.getQuorumVoters(), controllerGroupConfig.getListenerMap().get(controllerNodeId))); + + "--override listeners=%s %n", parameter.s3Url, controllerNodeId, controllerGroupConfig.getQuorumVoters(), controllerGroupConfig.getListenerMap().get(controllerNodeId)); } else { - System.out.println(String.format("bin/kafka-server-start.sh " + System.out.printf("bin/kafka-server-start.sh " + "--s3-url=\"%s\" " + "--override process.roles=broker,controller " + "--override node.id=%s " + "--override controller.quorum.voters=%s " + "--override listeners=%s " - + "--override advertised.listeners=%s ", parameter.s3Url, controllerNodeId, controllerGroupConfig.getQuorumVoters(), controllerGroupConfig.getListenerMap().get(controllerNodeId), controllerGroupConfig.getAdvertisedListenerMap().get(controllerNodeId))); + + "--override advertised.listeners=%s %n", parameter.s3Url, controllerNodeId, controllerGroupConfig.getQuorumVoters(), controllerGroupConfig.getListenerMap().get(controllerNodeId), controllerGroupConfig.getAdvertisedListenerMap().get(controllerNodeId)); } System.out.println(); } - - for (int brokerNodeId : brokerGroupConfig.getNodeIdList()) { - System.out.println(String.format("bin/kafka-server-start.sh " - + "--s3-url=\"%s\" " - + "--override process.roles=broker " - + "--override node.id=%s " - + "--override controller.quorum.voters=%s " - + "--override listeners=%s " - + "--override advertised.listeners=%s ", parameter.s3Url, brokerNodeId, brokerGroupConfig.getQuorumVoters(), brokerGroupConfig.getListenerMap().get(brokerNodeId), brokerGroupConfig.getAdvertisedListenerMap().get(brokerNodeId))); - System.out.println(); + Set nodes = convert2Nodes(parameter.brokerList); + if (!parameter.controllerOnlyMode) { + Optional.ofNullable(convert2Nodes(parameter.controllerList)).ifPresent(nodes::removeAll); + } + if (!nodes.isEmpty()) { + for (int brokerNodeId : brokerGroupConfig.getNodeIdList()) { + System.out.printf("bin/kafka-server-start.sh " + + "--s3-url=\"%s\" " + + "--override process.roles=broker " + + "--override node.id=%s " + + "--override controller.quorum.voters=%s " + + "--override listeners=%s " + + "--override advertised.listeners=%s %n", parameter.s3Url, brokerNodeId, brokerGroupConfig.getQuorumVoters(), brokerGroupConfig.getListenerMap().get(brokerNodeId), brokerGroupConfig.getAdvertisedListenerMap().get(brokerNodeId)); + System.out.println(); + } } System.out.println(); System.out.println("TIPS: Start controllers first and then the brokers."); System.out.println(); } + + private Set convert2Nodes(String list) { + return Arrays.stream(list.split(";")).map(broker -> broker.split(":")[0]).collect(Collectors.toSet()); + } + }