From d98bf96c87ff4d94279b0d5da7d35dfb858f780f Mon Sep 17 00:00:00 2001 From: Robin Han Date: Thu, 15 Aug 2024 18:02:16 +0800 Subject: [PATCH] fix(cli): add cli utils back Signed-off-by: Robin Han --- .../java/com/automq/shell/util/CLIUtils.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 automq-shell/src/main/java/com/automq/shell/util/CLIUtils.java diff --git a/automq-shell/src/main/java/com/automq/shell/util/CLIUtils.java b/automq-shell/src/main/java/com/automq/shell/util/CLIUtils.java new file mode 100644 index 0000000000..d34d3f5cdf --- /dev/null +++ b/automq-shell/src/main/java/com/automq/shell/util/CLIUtils.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.shell.util; + +import java.util.Collections; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.ManualMetadataUpdater; +import org.apache.kafka.clients.MetadataRecoveryStrategy; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; + +public class CLIUtils { + public static NetworkClient buildNetworkClient( + String prefix, + AdminClientConfig config, + Metrics metrics, + Time time, + LogContext logContext) { + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); + + String metricGroupPrefix = prefix + "-channel"; + + Selector selector = new Selector( + NetworkReceive.UNLIMITED, + config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), + metrics, + time, + metricGroupPrefix, + Collections.emptyMap(), + false, + channelBuilder, + logContext + ); + + String clientId = prefix + "-network-client"; + return new NetworkClient( + selector, + new ManualMetadataUpdater(), + clientId, + 100, + config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG), + config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG), + config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG), + config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG), + config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), + config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), + config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG), + time, + false, + new ApiVersions(), + logContext, + MetadataRecoveryStrategy.NONE + ); + } +} \ No newline at end of file