From a7250cd750dbf0716959128e334cbc323134d8ea Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Fri, 1 Nov 2024 10:19:22 +0800 Subject: [PATCH] perf: limit the inflight requests (#2100) * docs: add todos Signed-off-by: Ning Yu * perf(network): limit the inflight requests by size Signed-off-by: Ning Yu * perf(ReplicaManager): limit the queue size of the `fetchExecutor`s Signed-off-by: Ning Yu * perf(KafkaApis): limit the queue size of async request handlers Signed-off-by: Ning Yu * refactor(network): make "queued.max.requests.size.bytes" configurable Signed-off-by: Ning Yu * style: fix lint Signed-off-by: Ning Yu * fix(network): limit the min queued request size per queue Signed-off-by: Ning Yu --------- Signed-off-by: Ning Yu --- core/src/main/resources/jmx/rules/broker.yaml | 9 +++- .../scala/kafka/network/RequestChannel.scala | 48 +++++++++++++++++-- .../scala/kafka/network/SocketServer.scala | 5 +- .../main/scala/kafka/server/KafkaConfig.scala | 5 +- .../kafka/server/KafkaRequestHandler.scala | 2 +- .../streamaspect/ElasticKafkaApis.scala | 11 +++-- .../streamaspect/ElasticReplicaManager.scala | 5 +- .../kafka/network/SocketServerConfigs.java | 9 ++++ 8 files changed, 79 insertions(+), 15 deletions(-) diff --git a/core/src/main/resources/jmx/rules/broker.yaml b/core/src/main/resources/jmx/rules/broker.yaml index ee2b066388..95576f5413 100644 --- a/core/src/main/resources/jmx/rules/broker.yaml +++ b/core/src/main/resources/jmx/rules/broker.yaml @@ -207,6 +207,13 @@ rules: type: gauge desc: Size of the request queue + - bean: kafka.network:type=RequestChannel,name=AvailableRequestSize + mapping: + Value: + metric: kafka.available.request.size + type: gauge + desc: Remaining permitted request size in the request queue + - bean: kafka.network:type=RequestChannel,name=ResponseQueueSize mapping: Value: @@ -370,4 +377,4 @@ rules: connection-accept-throttle-time: metric: kafka.listener.connection.accept.throttle.time type: gauge - desc: The average throttle-time pre listener \ No newline at end of file + desc: The average throttle-time pre listener diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 3ec88d807a..1ff64338f3 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -50,6 +50,10 @@ object RequestChannel extends Logging { private val ResponseQueueSizeMetric = "ResponseQueueSize" val ProcessorMetricTag = "processor" + // AutoMQ inject start + private val AvailableRequestSizeMetric = "AvailableRequestSize" + // AutoMQ inject end + private def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled sealed trait BaseRequest @@ -347,6 +351,9 @@ object RequestChannel extends Logging { } class RequestChannel(val queueSize: Int, + // AutoMQ inject start + val queuedRequestSize: Int, + // AutoMQ inject end val metricNamePrefix: String, time: Time, val metrics: RequestChannel.Metrics) { @@ -355,12 +362,23 @@ class RequestChannel(val queueSize: Int, private val metricsGroup = new KafkaMetricsGroup(this.getClass) private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) + // AutoMQ inject start + /** + * Queue of requests to be handled, in the order they arrived. + * Note: Before any request enters this queue, it needs to acquire {@link multiQueuedRequestSizeSemaphore} + */ private val multiRequestQueue = new java.util.ArrayList[ArrayBlockingQueue[BaseRequest]]() - + /** + * Semaphore to limit the total size of requests in the {@link multiRequestQueue}. + */ + private val multiQueuedRequestSizeSemaphore = new java.util.ArrayList[Semaphore]() + private val availableRequestSizeMetricName = metricNamePrefix.concat(AvailableRequestSizeMetric) + // AutoMQ inject end private val processors = new ConcurrentHashMap[Int, Processor]() private val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric) private val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric) private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize) + // AutoMQ inject start private val multiCallbackQueue = new java.util.ArrayList[ArrayBlockingQueue[BaseRequest]]() private var notifiedShutdown = false @@ -371,6 +389,14 @@ class RequestChannel(val queueSize: Int, requestQueue.size() } }) + metricsGroup.newGauge(availableRequestSizeMetricName, () => { + multiQueuedRequestSizeSemaphore.stream().mapToInt(s => s.availablePermits()).sum() + }) + + def this(queueSize: Int, metricNamePrefix: String, time: Time, metrics: RequestChannel.Metrics) { + this(queueSize, Integer.MAX_VALUE, metricNamePrefix, time, metrics) + } + // AutoMQ inject end metricsGroup.newGauge(responseQueueSizeMetricName, () => { processors.values.asScala.foldLeft(0) {(total, processor) => @@ -386,14 +412,21 @@ class RequestChannel(val queueSize: Int, Map(ProcessorMetricTag -> processor.id.toString).asJava) } - def registerNRequestHandler(count: Int): util.List[BlockingQueue[BaseRequest]] = { + // AutoMQ inject start + def registerNRequestHandler(count: Int): Unit = { val queueSize = math.max(this.queueSize / count, 1) - for (i <- 0 until count) { + // TODO: maxQueuedRequestSize will be 100 / 8 = 12.5 MiB as a default. + // However, if the request size is too large, it will block at the semaphore. + // Currently, the max request size is 1 MiB (max.request.size) by default, so it is not very problematic. + val maxQueuedRequestSize = math.max(this.queuedRequestSize / count, 10 * 1024 * 1024) + for (_ <- 0 until count) { multiRequestQueue.add(new ArrayBlockingQueue[BaseRequest](queueSize)) + multiQueuedRequestSizeSemaphore.add(new Semaphore(maxQueuedRequestSize)) multiCallbackQueue.add(new ArrayBlockingQueue[BaseRequest](queueSize)) } Collections.unmodifiableList(multiRequestQueue) } + // AutoMQ inject end def removeProcessor(processorId: Int): Unit = { processors.remove(processorId) @@ -403,6 +436,8 @@ class RequestChannel(val queueSize: Int, /** Send a request to be handled, potentially blocking until there is room in the queue for the request */ def sendRequest(request: RequestChannel.Request): Unit = { if (multiRequestQueue.size() != 0) { + val requestSizeSemaphore = multiQueuedRequestSizeSemaphore.get(math.abs(request.context.connectionId.hashCode % multiQueuedRequestSizeSemaphore.size())) + requestSizeSemaphore.acquire(request.sizeInBytes) val requestQueue = multiRequestQueue.get(math.abs(request.context.connectionId.hashCode % multiRequestQueue.size())) requestQueue.put(request) } else { @@ -505,9 +540,11 @@ class RequestChannel(val queueSize: Int, } } + // AutoMQ inject start def receiveRequest(timeout: Long, id: Int): RequestChannel.BaseRequest = { val callbackQueue = multiCallbackQueue.get(id) val requestQueue = multiRequestQueue.get(id) + val requestSizeSemaphore = multiQueuedRequestSizeSemaphore.get(id) val callbackRequest = callbackQueue.poll() if (callbackRequest != null) callbackRequest @@ -515,11 +552,14 @@ class RequestChannel(val queueSize: Int, val request = requestQueue.poll(timeout, TimeUnit.MILLISECONDS) request match { case WakeupRequest => callbackQueue.poll() + case request: Request => + requestSizeSemaphore.release(request.sizeInBytes) + request case _ => request } } } - + // AutoMQ inject end /** Get the next request or block until there is one */ @Deprecated def receiveRequest(): RequestChannel.BaseRequest = diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index cc9018e171..8ddd9f5f01 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -97,7 +97,10 @@ class SocketServer(val config: KafkaConfig, private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE // data-plane private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]() - val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics) + // AutoMQ inject start + // val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics) + val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, config.queuedMaxRequestSize, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics) + // AutoMQ inject end // control-plane private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 407aa1b9eb..5b167f8959 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -100,7 +100,7 @@ object KafkaConfig { AutoBalancerControllerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key)) AutoBalancerMetricsReporterConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key)) // AutoMQ inject end - + def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala @@ -435,6 +435,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val connectionsMaxIdleMs = getLong(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG) val failedAuthenticationDelayMs = getInt(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG) val queuedMaxRequests = getInt(SocketServerConfigs.QUEUED_MAX_REQUESTS_CONFIG) + // AutoMQ inject start + val queuedMaxRequestSize = getInt(SocketServerConfigs.QUEUED_MAX_REQUESTS_SIZE_BYTES_CONFIG) + // AutoMQ inject end val queuedMaxBytes = getLong(SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG) def numNetworkThreads = getInt(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG) diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 5976653cf0..f5291637a3 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -216,7 +216,7 @@ class KafkaRequestHandlerPool( this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], " val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) - var multiRequestQueue = requestChannel.registerNRequestHandler(numThreads) + requestChannel.registerNRequestHandler(numThreads) for (i <- 0 until numThreads) { createHandler(i) diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala index 40f02ad9bd..a1d432def2 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala @@ -1,6 +1,7 @@ package kafka.server.streamaspect import com.automq.stream.s3.metrics.TimerUtil +import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor import com.yammer.metrics.core.Histogram import kafka.automq.zonerouter.{ClientIdMetadata, NoopProduceRouter, ProduceRouter} import kafka.coordinator.transaction.TransactionCoordinator @@ -29,7 +30,7 @@ import org.apache.kafka.common.requests.s3.AutomqZoneRouterRequest import org.apache.kafka.common.requests.{AbstractResponse, DeleteTopicsRequest, DeleteTopicsResponse, FetchRequest, FetchResponse, ProduceRequest, ProduceResponse, RequestUtils} import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, TRANSACTIONAL_ID} -import org.apache.kafka.common.utils.{ThreadUtils, Time} +import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.authorizer.Authorizer @@ -40,7 +41,7 @@ import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, Fetc import java.util import java.util.{Collections, Optional} import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.{ExecutorService, Executors, TimeUnit} +import java.util.concurrent.{ExecutorService, TimeUnit} import java.util.stream.IntStream import scala.annotation.nowarn import scala.collection.{Map, Seq, mutable} @@ -76,8 +77,8 @@ class ElasticKafkaApis( tokenManager: DelegationTokenManager, apiVersionManager: ApiVersionManager, clientMetricsManager: Option[ClientMetricsManager], - val asyncHandleExecutor: ExecutorService = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("kafka-apis-async-handle-executor-%d", true)), - val listOffsetHandleExecutor: ExecutorService = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("kafka-apis-list-offset-handle-executor-%d", true)) + val deleteTopicHandleExecutor: ExecutorService = S3StreamThreadPoolMonitor.createAndMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, "kafka-apis-delete-topic-handle-executor", true, 1000), + val listOffsetHandleExecutor: ExecutorService = S3StreamThreadPoolMonitor.createAndMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, "kafka-apis-list-offset-handle-executor", true, 1000) ) extends KafkaApis(requestChannel, metadataSupport, replicaManager, groupCoordinator, txnCoordinator, autoTopicCreationManager, brokerId, config, configRepository, metadataCache, metrics, authorizer, quotas, fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager, clientMetricsManager) { @@ -131,7 +132,7 @@ class ElasticKafkaApis( response.asInstanceOf[DeleteTopicsResponse].data().responses().forEach(result => { if (result.errorCode() == Errors.NONE.code()) { if (!metadataCache.autoMQVersion().isTopicCleanupByControllerSupported) { - asyncHandleExecutor.submit(new Runnable { + deleteTopicHandleExecutor.submit(new Runnable { override def run(): Unit = { topicNameToPartitionEpochsMap.get(result.name()).foreach(partitionEpochs => { ElasticLogManager.destroyLog(new TopicPartition(result.name(), partitionEpochs._1), result.topicId(), partitionEpochs._2) diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index 966efbcfba..78084ef7cb 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -2,6 +2,7 @@ package kafka.server.streamaspect import com.automq.stream.api.exceptions.FastReadFailFastException import com.automq.stream.utils.FutureUtil +import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor import kafka.cluster.Partition import kafka.log.remote.RemoteLogManager import kafka.log.streamaspect.{ElasticLogManager, PartitionStatusTracker, ReadHint} @@ -83,8 +84,8 @@ class ElasticReplicaManager( brokerEpochSupplier: () => Long = () => -1, addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None, directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP, - private val fastFetchExecutor: ExecutorService = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fast-fetch-executor-%d", true)), - private val slowFetchExecutor: ExecutorService = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true)), + private val fastFetchExecutor: ExecutorService = S3StreamThreadPoolMonitor.createAndMonitor(4, 4, 0L, TimeUnit.MILLISECONDS, "kafka-apis-fast-fetch-executor", true, 10000), + private val slowFetchExecutor: ExecutorService = S3StreamThreadPoolMonitor.createAndMonitor(12, 12, 0L, TimeUnit.MILLISECONDS, "kafka-apis-slow-fetch-executor", true, 10000), private val partitionMetricsCleanerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("kafka-partition-metrics-cleaner", true)), ) extends ReplicaManager(config, metrics, time, scheduler, logManager, remoteLogManager, quotaManagers, metadataCache, logDirFailureChannel, alterPartitionManager, brokerTopicStats, isShuttingDown, zkClient, delayedProducePurgatoryParam, diff --git a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java index 72c4bb53d1..b4696a9f4f 100644 --- a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java +++ b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java @@ -158,6 +158,12 @@ public class SocketServerConfigs { public static final int QUEUED_MAX_REQUESTS_DEFAULT = 500; public static final String QUEUED_MAX_REQUESTS_DOC = "The number of queued requests allowed for data-plane, before blocking the network threads"; + // AutoMQ inject start + public static final String QUEUED_MAX_REQUESTS_SIZE_BYTES_CONFIG = "queued.max.requests.size.bytes"; + public static final int QUEUED_MAX_REQUESTS_SIZE_BYTES_DEFAULT = 100 * 1024 * 1024; + public static final String QUEUED_MAX_REQUESTS_SIZE_BYTES_DOC = "The number of queued requests size in total allowed for data-plane, before blocking the network threads"; + // AutoMQ inject end + public static final String QUEUED_MAX_BYTES_CONFIG = "queued.max.request.bytes"; public static final int QUEUED_MAX_REQUEST_BYTES_DEFAULT = -1; public static final String QUEUED_MAX_REQUEST_BYTES_DOC = "The number of queued bytes allowed before no more requests are read"; @@ -182,6 +188,9 @@ public class SocketServerConfigs { .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, LONG, CONNECTIONS_MAX_IDLE_MS_DEFAULT, MEDIUM, CONNECTIONS_MAX_IDLE_MS_DOC) .define(FAILED_AUTHENTICATION_DELAY_MS_CONFIG, INT, FAILED_AUTHENTICATION_DELAY_MS_DEFAULT, atLeast(0), LOW, FAILED_AUTHENTICATION_DELAY_MS_DOC) .define(QUEUED_MAX_REQUESTS_CONFIG, INT, QUEUED_MAX_REQUESTS_DEFAULT, atLeast(1), HIGH, QUEUED_MAX_REQUESTS_DOC) + // AutoMQ inject start + .define(QUEUED_MAX_REQUESTS_SIZE_BYTES_CONFIG, INT, QUEUED_MAX_REQUESTS_SIZE_BYTES_DEFAULT, atLeast(1024 * 1024), HIGH, QUEUED_MAX_REQUESTS_SIZE_BYTES_DOC) + // AutoMQ inject end .define(QUEUED_MAX_BYTES_CONFIG, LONG, QUEUED_MAX_REQUEST_BYTES_DEFAULT, MEDIUM, QUEUED_MAX_REQUEST_BYTES_DOC) .define(NUM_NETWORK_THREADS_CONFIG, INT, NUM_NETWORK_THREADS_DEFAULT, atLeast(1), HIGH, NUM_NETWORK_THREADS_DOC); }