Skip to content

Commit

Permalink
Merge pull request #856 from iamgd67/brokerConfigs
Browse files Browse the repository at this point in the history
Broker dynamic configs support
  • Loading branch information
patelh authored Dec 12, 2022
2 parents 15c7777 + 5a050fb commit 65b0b11
Show file tree
Hide file tree
Showing 17 changed files with 612 additions and 20 deletions.
104 changes: 102 additions & 2 deletions app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@

package controllers

import java.util.Properties

import features.{ApplicationFeatures, KMClusterManagerFeature}
import kafka.manager.ApiError
import kafka.manager.features.ClusterFeatures
import kafka.manager.model.ActorModel.BrokerIdentity
import kafka.manager.model._
import kafka.manager.utils.BrokerConfigs
import models.FollowLink
import models.form._
import models.navigation.Menus
Expand Down Expand Up @@ -205,8 +210,103 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
}

def broker(c: String, b: Int) = Action.async { implicit request: RequestHeader =>
kafkaManager.getBrokerView(c,b).map { errorOrBrokerView =>
Ok(views.html.broker.brokerView(c,b,errorOrBrokerView)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
val futureErrorOrBrokerIdentity = kafkaManager.getBrokerIdentity(c,b)
kafkaManager.getBrokerView(c,b).zip(futureErrorOrBrokerIdentity).map {
case (errorOrBrokerView,errorOrBrokerIdentity) =>
var newRst = errorOrBrokerView
errorOrBrokerIdentity.map(bi=>{
newRst = errorOrBrokerView.map(x=>x.copy(broker=Option(bi)))
})
Ok(views.html.broker.brokerView(c,b,newRst)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
}
}

val defaultUpdateBrokerConfigForm = Form(
mapping(
"broker" -> number,
"configs" -> list(
mapping(
"name" -> nonEmptyText,
"value" -> optional(text),
"help" -> optional(text),
)(BConfig.apply)(BConfig.unapply)
),
"readVersion" -> number(min = -1)
)(UpdateBrokerConfig.apply)(UpdateBrokerConfig.unapply)
)

private def updateBrokerConfigForm(clusterName: String, broker: BrokerIdentity) = {
kafkaManager.getClusterConfig(clusterName).map { errorOrConfig =>
errorOrConfig.map { clusterConfig =>
val defaultConfigs = clusterConfig.version match {
//todo add other version configs
case Kafka_0_10_1_1 => BrokerConfigs.configNamesAndDoc(Kafka_0_10_1_1).map { case (n, h) => (n,BConfig(n,None, Option(h))) }
case _=> BrokerConfigs.configNamesAndDoc(Kafka_0_10_1_1).map { case (n, h) => (n,BConfig(n,None, Option(h))) }
}
val updatedConfigMap = broker.config.toMap
val updatedConfigList = defaultConfigs.map {
case (n, cfg) =>
if(updatedConfigMap.contains(n)) {
cfg.copy(value = Option(updatedConfigMap(n)))
} else {
cfg
}
}
(defaultUpdateBrokerConfigForm.fill(UpdateBrokerConfig(broker.id,updatedConfigList.toList,broker.configReadVersion)),
clusterName)
}
}
}

def updateBrokerConfig(clusterName: String, broker: Int) = Action.async { implicit request:RequestHeader =>
featureGate(KMClusterManagerFeature) {
val errorOrFormFuture = kafkaManager.getBrokerIdentity(clusterName, broker).flatMap { errorOrBrokerIdentity =>
errorOrBrokerIdentity.fold(e => Future.successful(-\/(e)), { brokerIdentity =>
updateBrokerConfigForm(clusterName, brokerIdentity)
})
}
errorOrFormFuture.map { errorOrForm =>
Ok(views.html.broker.updateConfig(clusterName, broker, errorOrForm)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
}
}
}

def handleUpdateBrokerConfig(clusterName: String, broker: Int) = Action.async { implicit request:Request[AnyContent] =>
featureGate(KMClusterManagerFeature) {

defaultUpdateBrokerConfigForm.bindFromRequest.fold(
formWithErrors => {
kafkaManager.getClusterContext(clusterName).map { clusterContext =>
BadRequest(views.html.broker.updateConfig(clusterName, broker,clusterContext.map(c =>(formWithErrors,clusterName))))
}.recover {
case t =>
implicit val clusterFeatures = ClusterFeatures.default
Ok(views.html.common.resultOfCommand(
views.html.navigation.clusterMenu(clusterName, "Broker", "Brokers View", menus.clusterMenus(clusterName)),
models.navigation.BreadCrumbs.withNamedViewAndCluster("Broker View", clusterName, "Update Config"),
-\/(ApiError(s"Unknown error : ${t.getMessage}")),
"Update Config",
FollowLink("Try again.", routes.Cluster.updateBrokerConfig(clusterName, broker).toString()),
FollowLink("Try again.", routes.Cluster.updateBrokerConfig(clusterName, broker).toString())
)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
}
},
updateBrokerConfig => {
val props = new Properties()
updateBrokerConfig.configs.filter(_.value.isDefined).foreach(c => props.setProperty(c.name, c.value.get))
kafkaManager.updateBrokerConfig(clusterName, updateBrokerConfig.broker, props, updateBrokerConfig.readVersion).map { errorOrSuccess =>
implicit val clusterFeatures = errorOrSuccess.toOption.map(_.clusterFeatures).getOrElse(ClusterFeatures.default)
Ok(views.html.common.resultOfCommand(
views.html.navigation.clusterMenu(clusterName, "Topic", "Topic View", menus.clusterMenus(clusterName)),
models.navigation.BreadCrumbs.withNamedViewAndCluster("Broker View", clusterName, "Update Config"),
errorOrSuccess,
"Update Config",
FollowLink("Go to Broker view.", routes.Cluster.broker(clusterName, updateBrokerConfig.broker).toString()),
FollowLink("Try again.", routes.Cluster.updateBrokerConfig(clusterName, broker).toString())
)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
}
}
)
}
}

Expand Down
44 changes: 44 additions & 0 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,25 @@ class KafkaManager(akkaConfig: Config) extends Logging {
}
}

def updateBrokerConfig(
clusterName: String,
broker: Int,
config: Properties,
readVersion: Int
) =
{
implicit val ec = apiExecutionContext
withKafkaManagerActor(
KMClusterCommandRequest(
clusterName,
CMUpdateBrokerConfig(broker, config, readVersion)
)
) {
result: Future[CMCommandResult] =>
result.map(cmr => toDisjunction(cmr.result))
}
}

def updateTopicConfig(
clusterName: String,
topic: String,
Expand Down Expand Up @@ -777,6 +796,31 @@ class KafkaManager(akkaConfig: Config) extends Logging {
}
}

def getBrokerIdentity(clusterName: String, broker: Int): Future[ApiError \/ BrokerIdentity] = {
val futureCMBrokerIdentity = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, CMGetBrokerIdentity(broker)))(
identity[Option[CMBrokerIdentity]]
)
implicit val ec = apiExecutionContext
futureCMBrokerIdentity.map[ApiError \/ BrokerIdentity] { errOrTI =>
errOrTI.fold[ApiError \/ BrokerIdentity](
{ err: ApiError =>
-\/[ApiError](err)
}, { tiOption: Option[CMBrokerIdentity] =>
tiOption.fold[ApiError \/ BrokerIdentity] {
-\/(ApiError(s"Broker not found $broker for cluster $clusterName"))
} { cmBrokerIdentity =>
cmBrokerIdentity.brokerIdentity match {
case scala.util.Failure(t) =>
-\/[ApiError](t)
case scala.util.Success(ti) =>
\/-(ti)
}
}
}
)
}
}

def getTopicIdentity(clusterName: String, topic: String): Future[ApiError \/ TopicIdentity] = {
val futureCMTopicIdentity = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, CMGetTopicIdentity(topic)))(
identity[Option[CMTopicIdentity]]
Expand Down
41 changes: 41 additions & 0 deletions app/kafka/manager/actor/cluster/ClusterManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,27 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
} yield CMView(tl.list.size, bl.list.size, clusterContext)
result pipeTo sender

case CMGetBrokerIdentity(id) =>
implicit val ec = context.dispatcher
val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
val eventualBrokerConfig = withKafkaStateActor(KSGetBrokerDescription(id))(identity[Option[BrokerDescription]])
val result: Future[Option[CMBrokerIdentity]] = for {
bl <- eventualBrokerList
bc <- eventualBrokerConfig
} yield bl.list.find(x => x.id == id)
.flatMap { x =>
if(bc.isEmpty){
Option(CMBrokerIdentity(Try(x)))
}
else{
bc.map(c => TopicIdentity.parseCofig(c.config))
.map(c => BrokerIdentity(x.id, x.host, x.jmxPort, x.secure, x.nonSecure, x.endpoints, c._2.toList, c._1))
.map(b => CMBrokerIdentity(Try(b)))
}

}
result pipeTo sender

case CMGetTopicIdentity(topic) =>
implicit val ec = context.dispatcher
val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
Expand Down Expand Up @@ -447,6 +468,26 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}
} pipeTo sender()

case CMUpdateBrokerConfig(broker, config, readVersion) =>
implicit val ec = longRunningExecutionContext
val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
eventualBrokerList.map{
bl=>{
val exist = bl.list.exists(x=>x.id==broker)
if(!exist){
Future.successful(CMCommandResult(Failure(new IllegalArgumentException(s"Broker doesn't exist : $broker"))))
}
else{
withKafkaCommandActor(KCUpdateBrokerConfig(broker, config, readVersion))
{
kcResponse: KCCommandResult =>
CMCommandResult(kcResponse.result)
}
}
}
} pipeTo sender()


case CMUpdateTopicConfig(topic, config, readVersion) =>
implicit val ec = longRunningExecutionContext
val eventualTopicDescription = withKafkaStateActor(KSGetTopicDescription(topic))(identity[Option[TopicDescription]])
Expand Down
8 changes: 8 additions & 0 deletions app/kafka/manager/actor/cluster/KafkaCommandActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend
})
}
}
case KCUpdateBrokerConfig(broker, config, readVersion) =>
longRunning {
Future {
KCCommandResult(Try {
kafkaCommandActorConfig.adminUtils.changeBrokerConfig(kafkaCommandActorConfig.curator, broker, config, readVersion)
})
}
}
case KCUpdateTopicConfig(topic, config, readVersion) =>
longRunning {
Future {
Expand Down
20 changes: 20 additions & 0 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,8 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom

private[this] val topicsConfigPathCache = new PathChildrenCache(config.curator,ZkUtils.TopicConfigPath,true)

private[this] val brokerConfigPathCache = new PathChildrenCache(config.curator,ZkUtils.BrokerConfigPath,true)

private[this] val brokersPathCache = new PathChildrenCache(config.curator,ZkUtils.BrokerIdsPath,true)

private[this] val adminPathCache = new PathChildrenCache(config.curator,ZkUtils.AdminPath,true)
Expand Down Expand Up @@ -1113,6 +1115,8 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
topicsTreeCache.start()
log.info("Starting topics config path cache...")
topicsConfigPathCache.start(StartMode.BUILD_INITIAL_CACHE)
log.info("Starting brokers config path cache...")
brokerConfigPathCache.start(StartMode.BUILD_INITIAL_CACHE)
log.info("Starting brokers path cache...")
brokersPathCache.start(StartMode.BUILD_INITIAL_CACHE)
log.info("Starting admin path cache...")
Expand Down Expand Up @@ -1162,6 +1166,8 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
Try(brokersPathCache.close())
log.info("Shutting down topics config path cache...")
Try(topicsConfigPathCache.close())
log.info("Shutting down brokers config path cache...")
Try(brokerConfigPathCache.close())
log.info("Shutting down topics tree cache...")
Try(topicsTreeCache.close())

Expand All @@ -1185,6 +1191,11 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
partitionOffsets
}

def getBrokerDescription(broker:Int): Option[BrokerDescription] ={
val brokerConfig = getBrokerConfigString(broker)
brokerConfig.map(c=> BrokerDescription(broker,Option(c)))
}

def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = {
for {
description <- getTopicZookeeperData(topic)
Expand Down Expand Up @@ -1229,6 +1240,12 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
result.map(cd => (cd.getStat.getVersion,asString(cd.getData)))
}

private[this] def getBrokerConfigString(broker: Int) : Option[(Int,String)] = {
val data: mutable.Buffer[ChildData] = brokerConfigPathCache.getCurrentData.asScala
val result: Option[ChildData] = data.find(p => p.getPath.endsWith("/" + broker.toString))
result.map(cd => (cd.getStat.getVersion,asString(cd.getData)))
}

override def processActorResponse(response: ActorResponse): Unit = {
response match {
case any: Any => log.warning("ksa : processActorResponse : Received unknown message: {}", any.toString)
Expand Down Expand Up @@ -1291,6 +1308,9 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
case KSGetTopicDescription(topic) =>
sender ! getTopicDescription(topic, false)

case KSGetBrokerDescription(broker)=>
sender ! getBrokerDescription(broker)

case KSGetTopicDescriptions(topics) =>
sender ! TopicDescriptions(topics.toIndexedSeq.flatMap(getTopicDescription(_, false)), topicsTreeCacheLastUpdateMillis)

Expand Down
Loading

0 comments on commit 65b0b11

Please sign in to comment.