Skip to content

Commit

Permalink
fix(issues751): support quota throttling
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx committed Feb 2, 2024
1 parent 256ca54 commit 443ad89
Showing 1 changed file with 66 additions and 29 deletions.
95 changes: 66 additions & 29 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ClientInformation, KafkaChannel, ListenerName, ListenerReconfigurable, NetworkSend, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, RequestHeader}
Expand Down Expand Up @@ -921,7 +920,7 @@ private[kafka] class Processor(
() => apiVersionManager.apiVersionResponse(throttleTimeMs = 0)
)
)
private val orderedResponses = new ConcurrentHashMap[String, OrderedResponse]()
private val channelContexts = new ConcurrentHashMap[String, ChannelContext]()

// Visible to override for testing
protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
Expand Down Expand Up @@ -1020,12 +1019,17 @@ private[kafka] class Processor(
trace("Closing socket connection actively according to the response code.")
close(channelId)
case _: StartThrottlingResponse =>
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
val channelContext = channelContexts.get(channelId)
if (channelContext != null) {
channelContext.markThrottle()
selector.mute(channelId)
}
case _: EndThrottlingResponse =>
// Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
// the client.
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
tryUnmuteChannel(channelId)
val channelContext = channelContexts.get(channelId)
val unmute = channelContext == null || channelContext.clearThrottle()
if (unmute) {
selector.unmute(channelId)
}
case _ =>
throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
}
Expand Down Expand Up @@ -1112,18 +1116,18 @@ private[kafka] class Processor(
apiVersionsRequest.data.clientSoftwareVersion))
}
}
val ordering = orderedResponses.computeIfAbsent(connectionId, _ => new OrderedResponse(new ConcurrentLinkedQueue[Int](), new ConcurrentHashMap[Int, RequestChannel.Response]()))
ordering.nextCorrelationId.add(req.context.correlationId())
val channelContext = channelContexts.computeIfAbsent(connectionId, _ => new ChannelContext(new ConcurrentLinkedQueue[Int](), new ConcurrentHashMap[Int, RequestChannel.Response]()))
channelContext.nextCorrelationId.add(req.context.correlationId())
requestChannel.sendRequest(req)

// AutoMQ for Kafka inject start
// AutoMQ will pipeline the requests to accelerate the performance and also keep the request order.

// Mute the channel if the inflight requests exceed the threshold.
if (ordering.nextCorrelationId.size() >= 8 && !channel.isMuted) {
info(s"Mute channel ${channel.id} because the inflight requests exceed the threshold, inflight count is ${ordering.nextCorrelationId.size()}.")
if (channelContext.nextCorrelationId.size() >= 8 && !channel.isMuted) {
info(s"Mute channel ${channel.id} because the inflight requests exceed the threshold, inflight count is ${channelContext.nextCorrelationId.size()}.")
channelContext.markQueueFull()
selector.mute(connectionId)
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
}

// AutoMQ for Kafka inject end
Expand Down Expand Up @@ -1161,16 +1165,22 @@ private[kafka] class Processor(
// Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
// it will be unmuted immediately. If the channel has been throttled, it will unmuted only if the throttling
// delay has already passed by now.

val orderedResponse = orderedResponses.get(send.destinationId)
val channelContext = channelContexts.get(send.destinationId)
openOrClosingChannel(send.destinationId).foreach(channel => {
if (channel.isMuted && (orderedResponse == null || orderedResponse.nextCorrelationId.size() < 8)) {
info(s"Unmute channel ${send.destinationId} because the inflight requests are below the threshold.")
channel.handleChannelMuteEvent(ChannelMuteEvent.RESPONSE_SENT)
selector.unmute(channel.id)
if (channel.isMuted) {
val unmute = if (channelContext == null) {
true
} else if (channelContext.nextCorrelationId.size() < 8 && channelContext.clearQueueFull()) {
info(s"Unmute channel ${send.destinationId} because the inflight requests are below the threshold.")
true
} else {
false
}
if (unmute) {
selector.unmute(channel.id)
}
}
})

// AutoMQ for Kafka inject end
} catch {
case e: Throwable => processChannelException(send.destinationId,
Expand Down Expand Up @@ -1235,7 +1245,7 @@ private[kafka] class Processor(
}
remove
})
orderedResponses.remove(connectionId)
channelContexts.remove(connectionId)
// inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
// AutoMQ for Kafka inject end
}
Expand Down Expand Up @@ -1315,6 +1325,13 @@ private[kafka] class Processor(


private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
response match {
case _: StartThrottlingResponse | _: EndThrottlingResponse =>
responseQueue.put(response)
return
case _ => // continue
}

// AutoMQ for Kafka inject start
val connectionId = response.request.context.connectionId
val originHeader = response.request.context.originHeader()
Expand All @@ -1323,7 +1340,7 @@ private[kafka] class Processor(
} else {
response.request.header.correlationId()
}
val orderedResponse = orderedResponses.get(connectionId)
val orderedResponse = channelContexts.get(connectionId)
if (orderedResponse == null) {
// connection closed
responseQueue.put(response)
Expand Down Expand Up @@ -1365,13 +1382,13 @@ private[kafka] class Processor(

// Indicate the specified channel that the specified channel mute-related event has happened so that it can change its
// mute state.
private def handleChannelMuteEvent(connectionId: String, event: ChannelMuteEvent): Unit = {
openOrClosingChannel(connectionId).foreach(c => c.handleChannelMuteEvent(event))
}

private def tryUnmuteChannel(connectionId: String): Unit = {
openOrClosingChannel(connectionId).foreach(c => selector.unmute(c.id))
}
// private def handleChannelMuteEvent(connectionId: String, event: ChannelMuteEvent): Unit = {
// openOrClosingChannel(connectionId).foreach(c => c.handleChannelMuteEvent(event))
// }
//
// private def tryUnmuteChannel(connectionId: String): Unit = {
// openOrClosingChannel(connectionId).foreach(c => selector.unmute(c.id))
// }

/* For test usage */
private[network] def channel(connectionId: String): Option[KafkaChannel] =
Expand Down Expand Up @@ -1401,7 +1418,27 @@ private[kafka] class Processor(
}
}

class OrderedResponse(val nextCorrelationId: util.Queue[Int], val responses: util.Map[Int, RequestChannel.Response])
class ChannelContext(val nextCorrelationId: util.Queue[Int], val responses: util.Map[Int, RequestChannel.Response]) {

var muteFlag: Int = 0
def markThrottle(): Unit = {
muteFlag = muteFlag | 1
}

def clearThrottle(): Boolean = {
muteFlag = muteFlag & (~1)
muteFlag == 0
}

def markQueueFull(): Unit = {
muteFlag = muteFlag | 2
}

def clearQueueFull(): Boolean = {
muteFlag = muteFlag & (~2)
muteFlag == 0
}
}

/**
* Interface for connection quota configuration. Connection quotas can be configured at the
Expand Down

0 comments on commit 443ad89

Please sign in to comment.