Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(issues751): support quota throttling #758

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 66 additions & 29 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ClientInformation, KafkaChannel, ListenerName, ListenerReconfigurable, NetworkSend, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, RequestHeader}
Expand Down Expand Up @@ -921,7 +920,7 @@ private[kafka] class Processor(
() => apiVersionManager.apiVersionResponse(throttleTimeMs = 0)
)
)
private val orderedResponses = new ConcurrentHashMap[String, OrderedResponse]()
private val channelContexts = new ConcurrentHashMap[String, ChannelContext]()

// Visible to override for testing
protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
Expand Down Expand Up @@ -1020,12 +1019,17 @@ private[kafka] class Processor(
trace("Closing socket connection actively according to the response code.")
close(channelId)
case _: StartThrottlingResponse =>
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
val channelContext = channelContexts.get(channelId)
if (channelContext != null) {
channelContext.markThrottle()
selector.mute(channelId)
}
case _: EndThrottlingResponse =>
// Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
// the client.
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
tryUnmuteChannel(channelId)
val channelContext = channelContexts.get(channelId)
val unmute = channelContext == null || channelContext.clearThrottle()
if (unmute) {
selector.unmute(channelId)
}
case _ =>
throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
}
Expand Down Expand Up @@ -1112,18 +1116,18 @@ private[kafka] class Processor(
apiVersionsRequest.data.clientSoftwareVersion))
}
}
val ordering = orderedResponses.computeIfAbsent(connectionId, _ => new OrderedResponse(new ConcurrentLinkedQueue[Int](), new ConcurrentHashMap[Int, RequestChannel.Response]()))
ordering.nextCorrelationId.add(req.context.correlationId())
val channelContext = channelContexts.computeIfAbsent(connectionId, _ => new ChannelContext(new ConcurrentLinkedQueue[Int](), new ConcurrentHashMap[Int, RequestChannel.Response]()))
channelContext.nextCorrelationId.add(req.context.correlationId())
requestChannel.sendRequest(req)

// AutoMQ for Kafka inject start
// AutoMQ will pipeline the requests to accelerate the performance and also keep the request order.

// Mute the channel if the inflight requests exceed the threshold.
if (ordering.nextCorrelationId.size() >= 8 && !channel.isMuted) {
info(s"Mute channel ${channel.id} because the inflight requests exceed the threshold, inflight count is ${ordering.nextCorrelationId.size()}.")
if (channelContext.nextCorrelationId.size() >= 8 && !channel.isMuted) {
info(s"Mute channel ${channel.id} because the inflight requests exceed the threshold, inflight count is ${channelContext.nextCorrelationId.size()}.")
channelContext.markQueueFull()
selector.mute(connectionId)
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
}

// AutoMQ for Kafka inject end
Expand Down Expand Up @@ -1161,16 +1165,22 @@ private[kafka] class Processor(
// Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
// it will be unmuted immediately. If the channel has been throttled, it will unmuted only if the throttling
// delay has already passed by now.

val orderedResponse = orderedResponses.get(send.destinationId)
val channelContext = channelContexts.get(send.destinationId)
openOrClosingChannel(send.destinationId).foreach(channel => {
if (channel.isMuted && (orderedResponse == null || orderedResponse.nextCorrelationId.size() < 8)) {
info(s"Unmute channel ${send.destinationId} because the inflight requests are below the threshold.")
channel.handleChannelMuteEvent(ChannelMuteEvent.RESPONSE_SENT)
selector.unmute(channel.id)
if (channel.isMuted) {
val unmute = if (channelContext == null) {
true
} else if (channelContext.nextCorrelationId.size() < 8 && channelContext.clearQueueFull()) {
info(s"Unmute channel ${send.destinationId} because the inflight requests are below the threshold.")
true
} else {
false
}
if (unmute) {
selector.unmute(channel.id)
}
}
})

// AutoMQ for Kafka inject end
} catch {
case e: Throwable => processChannelException(send.destinationId,
Expand Down Expand Up @@ -1235,7 +1245,7 @@ private[kafka] class Processor(
}
remove
})
orderedResponses.remove(connectionId)
channelContexts.remove(connectionId)
// inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
// AutoMQ for Kafka inject end
}
Expand Down Expand Up @@ -1315,6 +1325,13 @@ private[kafka] class Processor(


private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
response match {
case _: StartThrottlingResponse | _: EndThrottlingResponse =>
responseQueue.put(response)
return
case _ => // continue
}

// AutoMQ for Kafka inject start
val connectionId = response.request.context.connectionId
val originHeader = response.request.context.originHeader()
Expand All @@ -1323,7 +1340,7 @@ private[kafka] class Processor(
} else {
response.request.header.correlationId()
}
val orderedResponse = orderedResponses.get(connectionId)
val orderedResponse = channelContexts.get(connectionId)
if (orderedResponse == null) {
// connection closed
responseQueue.put(response)
Expand Down Expand Up @@ -1365,13 +1382,13 @@ private[kafka] class Processor(

// Indicate the specified channel that the specified channel mute-related event has happened so that it can change its
// mute state.
private def handleChannelMuteEvent(connectionId: String, event: ChannelMuteEvent): Unit = {
openOrClosingChannel(connectionId).foreach(c => c.handleChannelMuteEvent(event))
}

private def tryUnmuteChannel(connectionId: String): Unit = {
openOrClosingChannel(connectionId).foreach(c => selector.unmute(c.id))
}
// private def handleChannelMuteEvent(connectionId: String, event: ChannelMuteEvent): Unit = {
// openOrClosingChannel(connectionId).foreach(c => c.handleChannelMuteEvent(event))
// }
//
// private def tryUnmuteChannel(connectionId: String): Unit = {
// openOrClosingChannel(connectionId).foreach(c => selector.unmute(c.id))
// }

/* For test usage */
private[network] def channel(connectionId: String): Option[KafkaChannel] =
Expand Down Expand Up @@ -1401,7 +1418,27 @@ private[kafka] class Processor(
}
}

class OrderedResponse(val nextCorrelationId: util.Queue[Int], val responses: util.Map[Int, RequestChannel.Response])
class ChannelContext(val nextCorrelationId: util.Queue[Int], val responses: util.Map[Int, RequestChannel.Response]) {

var muteFlag: Int = 0
def markThrottle(): Unit = {
muteFlag = muteFlag | 1
}

def clearThrottle(): Boolean = {
muteFlag = muteFlag & (~1)
muteFlag == 0
}

def markQueueFull(): Unit = {
muteFlag = muteFlag | 2
}

def clearQueueFull(): Boolean = {
muteFlag = muteFlag & (~2)
muteFlag == 0
}
}

/**
* Interface for connection quota configuration. Connection quotas can be configured at the
Expand Down
Loading