diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala index 5e002cca3..1d423ab81 100644 --- a/app/controllers/Cluster.scala +++ b/app/controllers/Cluster.scala @@ -5,9 +5,14 @@ package controllers +import java.util.Properties + import features.{ApplicationFeatures, KMClusterManagerFeature} import kafka.manager.ApiError +import kafka.manager.features.ClusterFeatures +import kafka.manager.model.ActorModel.BrokerIdentity import kafka.manager.model._ +import kafka.manager.utils.BrokerConfigs import models.FollowLink import models.form._ import models.navigation.Menus @@ -205,8 +210,103 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag } def broker(c: String, b: Int) = Action.async { implicit request: RequestHeader => - kafkaManager.getBrokerView(c,b).map { errorOrBrokerView => - Ok(views.html.broker.brokerView(c,b,errorOrBrokerView)).withHeaders("X-Frame-Options" -> "SAMEORIGIN") + val futureErrorOrBrokerIdentity = kafkaManager.getBrokerIdentity(c,b) + kafkaManager.getBrokerView(c,b).zip(futureErrorOrBrokerIdentity).map { + case (errorOrBrokerView,errorOrBrokerIdentity) => + var newRst = errorOrBrokerView + errorOrBrokerIdentity.map(bi=>{ + newRst = errorOrBrokerView.map(x=>x.copy(broker=Option(bi))) + }) + Ok(views.html.broker.brokerView(c,b,newRst)).withHeaders("X-Frame-Options" -> "SAMEORIGIN") + } + } + + val defaultUpdateBrokerConfigForm = Form( + mapping( + "broker" -> number, + "configs" -> list( + mapping( + "name" -> nonEmptyText, + "value" -> optional(text), + "help" -> optional(text), + )(BConfig.apply)(BConfig.unapply) + ), + "readVersion" -> number(min = -1) + )(UpdateBrokerConfig.apply)(UpdateBrokerConfig.unapply) + ) + + private def updateBrokerConfigForm(clusterName: String, broker: BrokerIdentity) = { + kafkaManager.getClusterConfig(clusterName).map { errorOrConfig => + errorOrConfig.map { clusterConfig => + val defaultConfigs = clusterConfig.version match { + //todo add other version configs + case Kafka_0_10_1_1 => BrokerConfigs.configNamesAndDoc(Kafka_0_10_1_1).map { case (n, h) => (n,BConfig(n,None, Option(h))) } + case _=> BrokerConfigs.configNamesAndDoc(Kafka_0_10_1_1).map { case (n, h) => (n,BConfig(n,None, Option(h))) } + } + val updatedConfigMap = broker.config.toMap + val updatedConfigList = defaultConfigs.map { + case (n, cfg) => + if(updatedConfigMap.contains(n)) { + cfg.copy(value = Option(updatedConfigMap(n))) + } else { + cfg + } + } + (defaultUpdateBrokerConfigForm.fill(UpdateBrokerConfig(broker.id,updatedConfigList.toList,broker.configReadVersion)), + clusterName) + } + } + } + + def updateBrokerConfig(clusterName: String, broker: Int) = Action.async { implicit request:RequestHeader => + featureGate(KMClusterManagerFeature) { + val errorOrFormFuture = kafkaManager.getBrokerIdentity(clusterName, broker).flatMap { errorOrBrokerIdentity => + errorOrBrokerIdentity.fold(e => Future.successful(-\/(e)), { brokerIdentity => + updateBrokerConfigForm(clusterName, brokerIdentity) + }) + } + errorOrFormFuture.map { errorOrForm => + Ok(views.html.broker.updateConfig(clusterName, broker, errorOrForm)).withHeaders("X-Frame-Options" -> "SAMEORIGIN") + } + } + } + + def handleUpdateBrokerConfig(clusterName: String, broker: Int) = Action.async { implicit request:Request[AnyContent] => + featureGate(KMClusterManagerFeature) { + + defaultUpdateBrokerConfigForm.bindFromRequest.fold( + formWithErrors => { + kafkaManager.getClusterContext(clusterName).map { clusterContext => + BadRequest(views.html.broker.updateConfig(clusterName, broker,clusterContext.map(c =>(formWithErrors,clusterName)))) + }.recover { + case t => + implicit val clusterFeatures = ClusterFeatures.default + Ok(views.html.common.resultOfCommand( + views.html.navigation.clusterMenu(clusterName, "Broker", "Brokers View", menus.clusterMenus(clusterName)), + models.navigation.BreadCrumbs.withNamedViewAndCluster("Broker View", clusterName, "Update Config"), + -\/(ApiError(s"Unknown error : ${t.getMessage}")), + "Update Config", + FollowLink("Try again.", routes.Cluster.updateBrokerConfig(clusterName, broker).toString()), + FollowLink("Try again.", routes.Cluster.updateBrokerConfig(clusterName, broker).toString()) + )).withHeaders("X-Frame-Options" -> "SAMEORIGIN") + } + }, + updateBrokerConfig => { + val props = new Properties() + updateBrokerConfig.configs.filter(_.value.isDefined).foreach(c => props.setProperty(c.name, c.value.get)) + kafkaManager.updateBrokerConfig(clusterName, updateBrokerConfig.broker, props, updateBrokerConfig.readVersion).map { errorOrSuccess => + implicit val clusterFeatures = errorOrSuccess.toOption.map(_.clusterFeatures).getOrElse(ClusterFeatures.default) + Ok(views.html.common.resultOfCommand( + views.html.navigation.clusterMenu(clusterName, "Topic", "Topic View", menus.clusterMenus(clusterName)), + models.navigation.BreadCrumbs.withNamedViewAndCluster("Broker View", clusterName, "Update Config"), + errorOrSuccess, + "Update Config", + FollowLink("Go to Broker view.", routes.Cluster.broker(clusterName, updateBrokerConfig.broker).toString()), + FollowLink("Try again.", routes.Cluster.updateBrokerConfig(clusterName, broker).toString()) + )).withHeaders("X-Frame-Options" -> "SAMEORIGIN") + } + } + ) } } diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala index 4ccf3b8b3..88901b120 100644 --- a/app/kafka/manager/KafkaManager.scala +++ b/app/kafka/manager/KafkaManager.scala @@ -557,6 +557,25 @@ class KafkaManager(akkaConfig: Config) extends Logging { } } + def updateBrokerConfig( + clusterName: String, + broker: Int, + config: Properties, + readVersion: Int + ) = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor( + KMClusterCommandRequest( + clusterName, + CMUpdateBrokerConfig(broker, config, readVersion) + ) + ) { + result: Future[CMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } + def updateTopicConfig( clusterName: String, topic: String, @@ -777,6 +796,31 @@ class KafkaManager(akkaConfig: Config) extends Logging { } } + def getBrokerIdentity(clusterName: String, broker: Int): Future[ApiError \/ BrokerIdentity] = { + val futureCMBrokerIdentity = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, CMGetBrokerIdentity(broker)))( + identity[Option[CMBrokerIdentity]] + ) + implicit val ec = apiExecutionContext + futureCMBrokerIdentity.map[ApiError \/ BrokerIdentity] { errOrTI => + errOrTI.fold[ApiError \/ BrokerIdentity]( + { err: ApiError => + -\/[ApiError](err) + }, { tiOption: Option[CMBrokerIdentity] => + tiOption.fold[ApiError \/ BrokerIdentity] { + -\/(ApiError(s"Broker not found $broker for cluster $clusterName")) + } { cmBrokerIdentity => + cmBrokerIdentity.brokerIdentity match { + case scala.util.Failure(t) => + -\/[ApiError](t) + case scala.util.Success(ti) => + \/-(ti) + } + } + } + ) + } + } + def getTopicIdentity(clusterName: String, topic: String): Future[ApiError \/ TopicIdentity] = { val futureCMTopicIdentity = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, CMGetTopicIdentity(topic)))( identity[Option[CMTopicIdentity]] diff --git a/app/kafka/manager/actor/cluster/ClusterManagerActor.scala b/app/kafka/manager/actor/cluster/ClusterManagerActor.scala index 0268d632b..e8b0479e0 100644 --- a/app/kafka/manager/actor/cluster/ClusterManagerActor.scala +++ b/app/kafka/manager/actor/cluster/ClusterManagerActor.scala @@ -276,6 +276,27 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig) } yield CMView(tl.list.size, bl.list.size, clusterContext) result pipeTo sender + case CMGetBrokerIdentity(id) => + implicit val ec = context.dispatcher + val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList]) + val eventualBrokerConfig = withKafkaStateActor(KSGetBrokerDescription(id))(identity[Option[BrokerDescription]]) + val result: Future[Option[CMBrokerIdentity]] = for { + bl <- eventualBrokerList + bc <- eventualBrokerConfig + } yield bl.list.find(x => x.id == id) + .flatMap { x => + if(bc.isEmpty){ + Option(CMBrokerIdentity(Try(x))) + } + else{ + bc.map(c => TopicIdentity.parseCofig(c.config)) + .map(c => BrokerIdentity(x.id, x.host, x.jmxPort, x.secure, x.nonSecure, x.endpoints, c._2.toList, c._1)) + .map(b => CMBrokerIdentity(Try(b))) + } + + } + result pipeTo sender + case CMGetTopicIdentity(topic) => implicit val ec = context.dispatcher val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList]) @@ -447,6 +468,26 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig) } } pipeTo sender() + case CMUpdateBrokerConfig(broker, config, readVersion) => + implicit val ec = longRunningExecutionContext + val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList]) + eventualBrokerList.map{ + bl=>{ + val exist = bl.list.exists(x=>x.id==broker) + if(!exist){ + Future.successful(CMCommandResult(Failure(new IllegalArgumentException(s"Broker doesn't exist : $broker")))) + } + else{ + withKafkaCommandActor(KCUpdateBrokerConfig(broker, config, readVersion)) + { + kcResponse: KCCommandResult => + CMCommandResult(kcResponse.result) + } + } + } + } pipeTo sender() + + case CMUpdateTopicConfig(topic, config, readVersion) => implicit val ec = longRunningExecutionContext val eventualTopicDescription = withKafkaStateActor(KSGetTopicDescription(topic))(identity[Option[TopicDescription]]) diff --git a/app/kafka/manager/actor/cluster/KafkaCommandActor.scala b/app/kafka/manager/actor/cluster/KafkaCommandActor.scala index d9986e5e2..b0b5535e3 100644 --- a/app/kafka/manager/actor/cluster/KafkaCommandActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaCommandActor.scala @@ -106,6 +106,14 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend }) } } + case KCUpdateBrokerConfig(broker, config, readVersion) => + longRunning { + Future { + KCCommandResult(Try { + kafkaCommandActorConfig.adminUtils.changeBrokerConfig(kafkaCommandActorConfig.curator, broker, config, readVersion) + }) + } + } case KCUpdateTopicConfig(topic, config, readVersion) => longRunning { Future { diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 081d09902..4a240c406 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -996,6 +996,8 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom private[this] val topicsConfigPathCache = new PathChildrenCache(config.curator,ZkUtils.TopicConfigPath,true) + private[this] val brokerConfigPathCache = new PathChildrenCache(config.curator,ZkUtils.BrokerConfigPath,true) + private[this] val brokersPathCache = new PathChildrenCache(config.curator,ZkUtils.BrokerIdsPath,true) private[this] val adminPathCache = new PathChildrenCache(config.curator,ZkUtils.AdminPath,true) @@ -1113,6 +1115,8 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom topicsTreeCache.start() log.info("Starting topics config path cache...") topicsConfigPathCache.start(StartMode.BUILD_INITIAL_CACHE) + log.info("Starting brokers config path cache...") + brokerConfigPathCache.start(StartMode.BUILD_INITIAL_CACHE) log.info("Starting brokers path cache...") brokersPathCache.start(StartMode.BUILD_INITIAL_CACHE) log.info("Starting admin path cache...") @@ -1162,6 +1166,8 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom Try(brokersPathCache.close()) log.info("Shutting down topics config path cache...") Try(topicsConfigPathCache.close()) + log.info("Shutting down brokers config path cache...") + Try(brokerConfigPathCache.close()) log.info("Shutting down topics tree cache...") Try(topicsTreeCache.close()) @@ -1185,6 +1191,11 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom partitionOffsets } + def getBrokerDescription(broker:Int): Option[BrokerDescription] ={ + val brokerConfig = getBrokerConfigString(broker) + brokerConfig.map(c=> BrokerDescription(broker,Option(c))) + } + def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = { for { description <- getTopicZookeeperData(topic) @@ -1229,6 +1240,12 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom result.map(cd => (cd.getStat.getVersion,asString(cd.getData))) } + private[this] def getBrokerConfigString(broker: Int) : Option[(Int,String)] = { + val data: mutable.Buffer[ChildData] = brokerConfigPathCache.getCurrentData.asScala + val result: Option[ChildData] = data.find(p => p.getPath.endsWith("/" + broker.toString)) + result.map(cd => (cd.getStat.getVersion,asString(cd.getData))) + } + override def processActorResponse(response: ActorResponse): Unit = { response match { case any: Any => log.warning("ksa : processActorResponse : Received unknown message: {}", any.toString) @@ -1291,6 +1308,9 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom case KSGetTopicDescription(topic) => sender ! getTopicDescription(topic, false) + case KSGetBrokerDescription(broker)=> + sender ! getBrokerDescription(broker) + case KSGetTopicDescriptions(topics) => sender ! TopicDescriptions(topics.toIndexedSeq.flatMap(getTopicDescription(_, false)), topicsTreeCacheLastUpdateMillis) diff --git a/app/kafka/manager/model/ActorModel.scala b/app/kafka/manager/model/ActorModel.scala index 1d4a328da..40d1de4c1 100644 --- a/app/kafka/manager/model/ActorModel.scala +++ b/app/kafka/manager/model/ActorModel.scala @@ -52,7 +52,9 @@ object ActorModel { case class BVView(topicPartitions: Map[TopicIdentity, BrokerTopicInfo], clusterContext: ClusterContext, metrics: Option[BrokerMetrics] = None, messagesPerSecCountHistory: Option[Queue[BrokerMessagesPerSecCount]] = None, - stats: Option[BrokerClusterStats] = None) extends QueryResponse { + stats: Option[BrokerClusterStats] = None, + broker: Option[BrokerIdentity] = None + ) extends QueryResponse { def numTopics : Int = topicPartitions.size def numPartitions : Int = topicPartitions.values.foldLeft(0)((acc,i) => acc + i.partitions.size) def numPartitionsAsLeader : Int = topicPartitions.values.foldLeft(0)((acc,i) => acc + i.partitionsAsLeader.size) @@ -64,11 +66,13 @@ object ActorModel { case object CMGetView extends QueryRequest case class CMGetTopicIdentity(topic: String) extends QueryRequest + case class CMGetBrokerIdentity(broker: Int) extends QueryRequest case object CMGetClusterContext extends QueryRequest case class CMView(topicsCount: Int, brokersCount: Int, clusterContext: ClusterContext) extends QueryResponse case class CMGetConsumerIdentity(consumer: String, consumerType: ConsumerType) extends QueryRequest case class CMGetConsumedTopicState(consumer: String, topic: String, consumerType: ConsumerType) extends QueryRequest case class CMTopicIdentity(topicIdentity: Try[TopicIdentity]) extends QueryResponse + case class CMBrokerIdentity(brokerIdentity: Try[BrokerIdentity]) extends QueryResponse case class CMConsumerIdentity(consumerIdentity: Try[ConsumerIdentity]) extends QueryResponse case class CMConsumedTopic(ctIdentity: Try[ConsumedTopicState]) extends QueryResponse case class CMGetGeneratedPartitionAssignments(topic: String) extends QueryRequest @@ -87,6 +91,7 @@ object ActorModel { partitions: Int, readVersions: Map[String,Int]) extends CommandRequest case class CMUpdateTopicConfig(topic: String, config: Properties, readVersion: Int) extends CommandRequest + case class CMUpdateBrokerConfig(broker: Int, config: Properties, readVersion: Int) extends CommandRequest case class CMDeleteTopic(topic: String) extends CommandRequest case class CMRunPreferredLeaderElection(topics: Set[String]) extends CommandRequest case class CMSchedulePreferredLeaderElection(schedule: Map[String, Int]) extends CommandRequest @@ -128,6 +133,7 @@ object ActorModel { partitions: Int, readVersions: Map[String, Int]) extends CommandRequest case class KCUpdateTopicConfig(topic: String, config: Properties, readVersion: Int) extends CommandRequest + case class KCUpdateBrokerConfig(broker: Int, config: Properties, readVersion: Int) extends CommandRequest case class KCDeleteTopic(topic: String) extends CommandRequest case class KCPreferredReplicaLeaderElection(topicAndPartition: Set[TopicPartition]) extends CommandRequest case class KCReassignPartition(currentTopicIdentity: Map[String, TopicIdentity] @@ -161,6 +167,7 @@ object ActorModel { case object KSGetConsumers extends KSRequest case class KSGetTopicConfig(topic: String) extends KSRequest case class KSGetTopicDescription(topic: String) extends KSRequest + case class KSGetBrokerDescription(broker: Int) extends KSRequest case class KSGetAllTopicDescriptions(lastUpdateMillis: Option[Long]= None) extends KSRequest case class KSGetTopicDescriptions(topics: Set[String]) extends KSRequest case class KSGetConsumerDescription(consumer: String, consumerType: ConsumerType) extends KSRequest @@ -199,6 +206,9 @@ object ActorModel { case class ConsumerNameAndType(name: String, consumerType: ConsumerType) case class ConsumerList(list: IndexedSeq[ConsumerNameAndType], clusterContext: ClusterContext) extends QueryResponse + case class BrokerDescription(broker: Int, + config:Option[(Int,String)] + )extends QueryResponse case class TopicDescription(topic: String, description: (Int,String), partitionState: Option[Map[String, String]], @@ -237,7 +247,7 @@ object ActorModel { case class GeneratedPartitionAssignments(topic: String, assignments: Map[Int, Seq[Int]], nonExistentBrokers: Set[Int]) - case class BrokerIdentity(id: Int, host: String, jmxPort: Int, secure: Boolean, nonSecure:Boolean, endpoints: Map[SecurityProtocol, Int]) { + case class BrokerIdentity(id: Int, host: String, jmxPort: Int, secure: Boolean, nonSecure:Boolean, endpoints: Map[SecurityProtocol, Int], config: List[(String,String)]=List(),configReadVersion: Int= -1) { def endpointsString: String = endpoints.toList.map(tpl => s"${tpl._1.stringId}:${tpl._2}").mkString(",") } @@ -466,6 +476,27 @@ import scala.language.reflectiveCalls import scala.language.reflectiveCalls import scala.concurrent.duration._ + def parseCofig(config: Option[(Int,String)]): (Int,Map[String, String]) ={ + import org.json4s.jackson.JsonMethods._ + import org.json4s.scalaz.JsonScalaz._ + import org.json4s._ + try { + val resultOption: Option[(Int,Map[String, String])] = config.map { configString => + val configJson = parse(configString._2) + val configMap : Map[String, String] = field[Map[String,String]]("config")(configJson).fold({ e => + logger.error(s"Failed to parse topic config ${configString._2}") + Map.empty + }, identity) + (configString._1,configMap) + } + resultOption.getOrElse((-1,Map.empty[String, String])) + } catch { + case e: Exception => + logger.error(s"[Failed to parse config : ${config.getOrElse("")}",e) + (-1,Map.empty[String, String]) + } + } + implicit val formats = Serialization.formats(FullTypeHints(List(classOf[TopicIdentity]))) // Adding a write method to transform/sort the partitionsIdentity to be more readable in JSON and include Topic Identity vals implicit def topicIdentityJSONW: JSONW[TopicIdentity] = new JSONW[TopicIdentity] { @@ -548,21 +579,7 @@ import scala.language.reflectiveCalls val partMap = getPartitionReplicaMap(td) val tpi : Map[Int,TopicPartitionIdentity] = getTopicPartitionIdentity(td, partMap, tdPrevious, tpSizes.getOrElse(Map.empty)) val config : (Int,Map[String, String]) = { - try { - val resultOption: Option[(Int,Map[String, String])] = td.config.map { configString => - val configJson = parse(configString._2) - val configMap : Map[String, String] = field[Map[String,String]]("config")(configJson).fold({ e => - logger.error(s"Failed to parse topic config ${configString._2}") - Map.empty - }, identity) - (configString._1,configMap) - } - resultOption.getOrElse((-1,Map.empty[String, String])) - } catch { - case e: Exception => - logger.error(s"[topic=${td.topic}] Failed to parse topic config : ${td.config.getOrElse("")}",e) - (-1,Map.empty[String, String]) - } + parseCofig(td.config) } val size = tpi.flatMap(_._2.leaderSize).reduceLeftOption{ _ + _ }.map(FormatMetric.sizeFormat(_)) TopicIdentity(td.topic,td.description._1,partMap.size,tpi,brokers,config._1,config._2.toList, clusterContext, tm, size) diff --git a/app/kafka/manager/utils/AdminUtils.scala b/app/kafka/manager/utils/AdminUtils.scala index 382a33225..a2d11feea 100644 --- a/app/kafka/manager/utils/AdminUtils.scala +++ b/app/kafka/manager/utils/AdminUtils.scala @@ -134,6 +134,15 @@ class AdminUtils(version: KafkaVersion) extends Logging { writeTopicPartitionAssignment(curator, topic, partitionReplicaAssignment, update, readVersion) } + private def writeBrokerConfig(curator: CuratorFramework, broker: Int, config: Properties, readVersion: Int): Unit ={ + val configMap: mutable.Map[String, String] = { + import scala.collection.JavaConverters._ + config.asScala + } + val map : Map[String, Any] = Map("version" -> 1, "config" -> configMap) + ZkUtils.updatePersistentPath(curator, ZkUtils.getBrokerConfigPath(broker), toJson(map), readVersion) + } + /** * Write out the topic config to zk, if there is any */ @@ -247,6 +256,12 @@ class AdminUtils(version: KafkaVersion) extends Logging { } } + def changeBrokerConfig(curator: CuratorFramework, broker: Int, config: Properties, readVersion: Int): Unit ={ + BrokerConfigs.validate(version,config) + + writeBrokerConfig(curator, broker, config, readVersion) + } + /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers * @param curator: The zk client handle used to write the new config to zookeeper diff --git a/app/kafka/manager/utils/BrokerConfigs.scala b/app/kafka/manager/utils/BrokerConfigs.scala new file mode 100644 index 000000000..1b4f89888 --- /dev/null +++ b/app/kafka/manager/utils/BrokerConfigs.scala @@ -0,0 +1,55 @@ +/** + * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 + * See accompanying LICENSE file. + */ + +package kafka.manager.utils + +import java.util.Properties + +import kafka.manager.model.{KafkaVersion, Kafka_0_10_1_1} + + +trait BrokerConfigs { + def configNames: Seq[String] + + def validate(props: Properties) + + def configNamesAndDoc: Seq[(String, String)] +} +object BrokerConfigs{ + val brokerConfigsByVersion: Map[KafkaVersion, BrokerConfigs] = Map( + Kafka_0_10_1_1 -> zero11.BrokerConfig, + ) + + def configNames(version: KafkaVersion): Seq[String] = { + brokerConfigsByVersion.get(version) match { + case Some(tc) => tc.configNames + case None => throw new IllegalArgumentException(s"Undefined topic configs for version : $version, cannot get config names") + } + } + + def validate(version: KafkaVersion, props: Properties): Unit = { + brokerConfigsByVersion.get(version) match { + case Some(tc) => tc.validate(props) + case None =>{ + if(version==Kafka_0_10_1_1){ + throw new IllegalArgumentException(s"Undefined broker configs for version : $version, cannot validate config") + } + else { + //use default 0.10.1.1 to check, other versions likely to be the same, avoid many same config class + validate(Kafka_0_10_1_1,props) + } + } + } + } + + def configNamesAndDoc(version: KafkaVersion): Seq[(String, String)] = { + brokerConfigsByVersion.get(version) match { + case Some(tc) => tc.configNamesAndDoc + case None => throw new IllegalArgumentException(s"Undefined topic configs for version : $version, cannot get config names and doc") + } + } +} + + diff --git a/app/kafka/manager/utils/ZkUtils.scala b/app/kafka/manager/utils/ZkUtils.scala index 9be87f37a..dadb6d71b 100644 --- a/app/kafka/manager/utils/ZkUtils.scala +++ b/app/kafka/manager/utils/ZkUtils.scala @@ -34,6 +34,7 @@ object ZkUtils { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" + val BrokerConfigPath = "/config/brokers" val TopicConfigPath = "/config/topics" val TopicConfigChangesPath = "/config/changes" val ControllerPath = "/controller" @@ -55,6 +56,9 @@ object ZkUtils { def getTopicConfigPath(topic: String): String = TopicConfigPath + "/" + topic + def getBrokerConfigPath(broker: Int): String = + BrokerConfigPath + "/" + broker.toString + def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic diff --git a/app/kafka/manager/utils/zero11/BrokerConfig.scala b/app/kafka/manager/utils/zero11/BrokerConfig.scala new file mode 100644 index 000000000..b2081f802 --- /dev/null +++ b/app/kafka/manager/utils/zero11/BrokerConfig.scala @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.manager.utils.zero11 + +import java.util.Properties + +import kafka.manager.utils.BrokerConfigs +import kafka.server.ReplicationQuotaManagerConfig +import org.apache.kafka.common.config.ConfigDef.{ConfigKey, Validator} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} +import org.apache.kafka.common.errors.InvalidConfigurationException + +import scala.collection.JavaConverters._ +import scala.collection.mutable + + + +case class BrokerConfig(props: java.util.Map[_, _]) extends AbstractConfig(BrokerConfig.configDef, props, false) { + +} + +object BrokerConfig extends BrokerConfigs { + + //Properties + val LeaderReplicationThrottledRateProp = "leader.replication.throttled.rate" + val FollowerReplicationThrottledRateProp = "follower.replication.throttled.rate" + + //Defaults + val DefaultReplicationThrottledRate = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault + + //Documentation + val LeaderReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec) on replication traffic for leaders enumerated in the " + + s"property ${LogConfig.LeaderReplicationThrottledReplicasProp} (for each topic). This property can be only set dynamically. It is suggested that the " + + s"limit be kept above 1MB/s for accurate behaviour." + val FollowerReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec) on replication traffic for followers enumerated in the " + + s"property ${LogConfig.FollowerReplicationThrottledReplicasProp} (for each topic). This property can be only set dynamically. It is suggested that the " + + s"limit be kept above 1MB/s for accurate behaviour." + + + private class BrokerConfigDef extends ConfigDef { + private final val serverDefaultConfigNames = mutable.Map[String, String]() + + def define(name: String, defType: ConfigDef.Type, defaultValue: Any, validator: Validator, + importance: ConfigDef.Importance, doc: String, serverDefaultConfigName: String): BrokerConfigDef = { + super.define(name, defType, defaultValue, validator, importance, doc) + serverDefaultConfigNames.put(name, serverDefaultConfigName) + this + } + + def define(name: String, defType: ConfigDef.Type, defaultValue: Any, importance: ConfigDef.Importance, + documentation: String, serverDefaultConfigName: String): BrokerConfigDef = { + super.define(name, defType, defaultValue, importance, documentation) + serverDefaultConfigNames.put(name, serverDefaultConfigName) + this + } + + def define(name: String, defType: ConfigDef.Type, importance: ConfigDef.Importance, documentation: String, + serverDefaultConfigName: String): BrokerConfigDef = { + super.define(name, defType, importance, documentation) + serverDefaultConfigNames.put(name, serverDefaultConfigName) + this + } + + override def headers = List("Name", "Description", "Type", "Default", "Valid Values", "Server Default Property", "Importance").asJava + + override def getConfigValue(key: ConfigKey, headerName: String): String = { + headerName match { + case "Server Default Property" => serverDefaultConfigNames.get(key.name).get + case _ => super.getConfigValue(key, headerName) + } + } + + def serverConfigName(configName: String): Option[String] = serverDefaultConfigNames.get(configName) + } + private val configDef: BrokerConfigDef = { + import org.apache.kafka.common.config.ConfigDef.Importance._ + import org.apache.kafka.common.config.ConfigDef.Range._ + import org.apache.kafka.common.config.ConfigDef.Type._ + new BrokerConfigDef() + .define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc + ,LeaderReplicationThrottledRateProp) + .define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc + ,LeaderReplicationThrottledRateProp) + } + + def configNames: Seq[String] = configDef.names.asScala.toSeq.sorted + /** + * Check that property names are valid + */ + def validateNames(props: Properties) { + val names = configNames + for(name <- props.asScala.keys) + if (!names.contains(name)) + throw new InvalidConfigurationException(s"Unknown Log configuration $name.") + } + /** + * Check that the given properties contain only valid log config names and that all values can be parsed and are valid + */ + def validate(props: Properties) { + validateNames(props) + configDef.parse(props) + } + + def configNamesAndDoc: Seq[(String, String)] = { + Option(configDef).fold { + configNames.map(n => n -> "") + } { + configDef => + val keyMap = configDef.configKeys() + configNames.map(n => n -> Option(keyMap.get(n)).map(_.documentation).flatMap(Option.apply).getOrElse("")) + } + } +} diff --git a/app/models/form/BrokerOperation.scala b/app/models/form/BrokerOperation.scala new file mode 100644 index 000000000..7301eba2b --- /dev/null +++ b/app/models/form/BrokerOperation.scala @@ -0,0 +1,17 @@ +/** + * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 + * See accompanying LICENSE file. + */ + +package models.form + +/** + * @author liu qiang + */ + +sealed trait BrokerOperation + +case class BConfig(name: String, value: Option[String], help: Option[String]) + +case class UpdateBrokerConfig(broker: Int, configs: List[BConfig], readVersion: Int) extends BrokerOperation + diff --git a/app/models/navigation/BreadCrumbs.scala b/app/models/navigation/BreadCrumbs.scala index 40c5ef086..6cfb2114d 100644 --- a/app/models/navigation/BreadCrumbs.scala +++ b/app/models/navigation/BreadCrumbs.scala @@ -96,6 +96,15 @@ object BreadCrumbs { ) ) + val brokerBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map( + "Broker View" -> IndexedSeq( + "Clusters".baseRouteBreadCrumb, + BCDynamicNamedLink(identity,"Summary".clusterRoute), + "Brokers".clusterRouteBreadCrumb, + BCDynamicMultiNamedLink(identity,"Broker View".brokerRoute) + ) + ) + val consumerBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map( "Consumer View" -> IndexedSeq( "Clusters".baseRouteBreadCrumb, @@ -150,6 +159,17 @@ object BreadCrumbs { } } + private[this] def renderWithClusterAndBroker(s: String, clusterName: String, broker: Int) : IndexedSeq[BreadCrumbRendered] = { + brokerBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map { + case BCStaticLink(n,c) => BCLink(n,c.toString()) + case BCDynamicNamedLink(cn, cl) => BCLink(cn(clusterName),cl(clusterName).toString()) + case BCDynamicMultiNamedLink(cn, cl) => BCLink(cn(broker.toString),cl(clusterName,List(broker.toString)).toString()) + case BCDynamicLink(cn, cl) => BCLink(cn,cl(clusterName).toString()) + case BCDynamicText(cn) => BCText(cn(clusterName)) + case any => throw new UnsupportedOperationException(s"Unhandled breadcrumb : $any") + } + } + private[this] def renderWithClusterAndConsumer(s: String, clusterName: String, consumer: String, consumerType: String, topic: String = "") : IndexedSeq[BreadCrumbRendered] = { consumerBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map { case BCStaticLink(n,c) => BCLink(n,c.toString()) @@ -166,6 +186,10 @@ object BreadCrumbs { renderWithClusterAndTopic(s, clusterName,topic) :+ BCActive(name) } + def withNamedViewAndClusterAndBroker(s: String, clusterName: String, broker: Int, name: String) : IndexedSeq[BreadCrumbRendered] = { + renderWithClusterAndBroker(s, clusterName, broker) :+ BCActive(name) + } + private[this] def renderWithClusterAndLogkafka(s: String, clusterName: String, logkafka_id: String, log_path: String) : IndexedSeq[BreadCrumbRendered] = { val hl = logkafka_id + "?" + log_path logkafkaBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map { diff --git a/app/models/navigation/QuickRoutes.scala b/app/models/navigation/QuickRoutes.scala index 6c539bcc9..1a0b357f6 100644 --- a/app/models/navigation/QuickRoutes.scala +++ b/app/models/navigation/QuickRoutes.scala @@ -33,6 +33,11 @@ object QuickRoutes { "List Logkafka" -> controllers.routes.Logkafka.logkafkas, "Create Logkafka" -> controllers.routes.Logkafka.createLogkafka ) + + val brokerRoutes : Map[String, (String, Int) => Call] = Map( + "Broker View" -> ((c,t)=>controllers.routes.Cluster.broker(c,t)), + ) + val topicRoutes : Map[String, (String, String) => Call] = Map( "Topic View" -> ((c, t) => controllers.routes.Topic.topic(c, t, force=false)), "Add Partitions" -> controllers.routes.Topic.addPartitions, @@ -85,6 +90,12 @@ object QuickRoutes { } } + implicit class BrokerRoute(s: String) { + def brokerRoute(c: String, t: List[String]): Call = { + brokerRoutes(s)(c,t.head.toInt) + } + } + implicit class ConsumerRoute(s: String) { def consumerRouteMenuItem(cluster: String, consumer: String, consumerType: String): (String, Call) = { s -> consumerRoutes(s)(cluster,consumer,consumerType) diff --git a/app/views/broker/brokerViewContent.scala.html b/app/views/broker/brokerViewContent.scala.html index 2d08fe314..4b35fa5e3 100644 --- a/app/views/broker/brokerViewContent.scala.html +++ b/app/views/broker/brokerViewContent.scala.html @@ -2,7 +2,7 @@ * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 * See accompanying LICENSE file. *@ -@(cluster: String, brokerId: Int, brokerView :kafka.manager.model.ActorModel.BVView)(implicit messages: play.api.i18n.Messages, request:RequestHeader) +@(cluster: String, brokerId: Int, brokerView :kafka.manager.model.ActorModel.BVView)(implicit af: features.ApplicationFeatures, messages: play.api.i18n.Messages, request:RequestHeader) @renderBrokerMetrics = { @if(brokerView.clusterContext.clusterFeatures.features(kafka.manager.features.KMJMXMetricsFeature)) { @@ -31,10 +31,33 @@ } + @if(!brokerView.broker.isEmpty && !brokerView.broker.get.config.isEmpty) { + + + + + + @for( (k,v) <- brokerView.broker.get.config) { + + + + + } + +
ConfigValue
@k@v
+ }
+ @features.app(features.KMClusterManagerFeature) { +
+

Operations

+ +
+ }

Metrics

diff --git a/app/views/broker/updateConfig.scala.html b/app/views/broker/updateConfig.scala.html new file mode 100644 index 000000000..9eadeac85 --- /dev/null +++ b/app/views/broker/updateConfig.scala.html @@ -0,0 +1,57 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@import scalaz.{\/} +@(cluster: String, brokerId: Int, errorOrForm: kafka.manager.ApiError \/ (Form[models.form.UpdateBrokerConfig], String) +)(implicit af: features.ApplicationFeatures, messages: play.api.i18n.Messages, menus: models.navigation.Menus, request:RequestHeader) + +@import helper._ +@import controllers.routes + + +@theMenu = { + @views.html.navigation.clusterMenu(cluster,"Broker","Update Config",menus.clusterMenus(cluster)( + kafka.manager.features.ClusterFeatures.default)) +} + +@renderForm(updateBrokerConfigForm: Form[models.form.UpdateBrokerConfig]) = { + @b4.vertical.form(routes.Cluster.handleUpdateBrokerConfig(cluster, brokerId)) { implicit fc => + + + + + + + + + +
Update Config +
notice: replication.throttled.rate need set topic level replication.throttled.replicas to work.
+ @b4.text(updateBrokerConfigForm("broker"), '_label -> "Broker", 'placeholder -> "", 'autofocus -> true ) + @b4.hidden(updateBrokerConfigForm("readVersion").name,updateBrokerConfigForm("readVersion").value.getOrElse(-1)) + @helper.repeat(updateBrokerConfigForm("configs"), min = 1) { configsForm => + @b4.hidden(configsForm("name").name, configsForm("name").value.getOrElse("")) + @b4.hidden(configsForm("help").name, configsForm("help").value.getOrElse("")) + @b4.text(configsForm("value"), '_label -> configsForm("name").value.getOrElse(""), '_help -> configsForm("help").value.getOrElse("")) + } +
+ @b4.submit('class -> "submit-button btn btn-primary"){ Update Config } + Cancel + } +} + +@main( + "Update Config", + menu = theMenu, + breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withNamedViewAndClusterAndBroker("Broker View",cluster,brokerId,"Update Config"))) { +
+
+

Update Config

+
+ @errorOrForm.fold( views.html.errors.onApiError(_), t => renderForm(t._1)) +
+
+
+} +
diff --git a/conf/routes b/conf/routes index 27ec9de98..ba8b1e70c 100644 --- a/conf/routes +++ b/conf/routes @@ -16,6 +16,8 @@ GET /clusters/:c/logkafkas controllers.Logkafka GET /clusters/:c/logkafkas/:h/:l controllers.Logkafka.logkafka(c:String, h:String, l:String) GET /clusters/:c/brokers controllers.Cluster.brokers(c: String) GET /clusters/:c/brokers/:b controllers.Cluster.broker(c: String, b:Int) +GET /clusters/:c/brokers/:b/updateConfig controllers.Cluster.updateBrokerConfig(c:String,b: Int) +POST /clusters/:c/brokers/:b/updateConfig controllers.Cluster.handleUpdateBrokerConfig(c:String,b: Int) GET /clusters/:c/consumers controllers.Consumer.consumers(c: String) GET /clusters/:c/consumers/:g/type/:ct controllers.Consumer.consumer(c: String, g:String, ct: String) GET /clusters/:c/consumers/:g/topic/:t/type/:ct controllers.Consumer.consumerAndTopic(c: String, g:String, t:String, ct: String) diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala index ca070c319..94fe80e41 100644 --- a/test/kafka/manager/TestKafkaManager.scala +++ b/test/kafka/manager/TestKafkaManager.scala @@ -391,6 +391,32 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } } + test("update broker config") { + val tiFuture= kafkaManager.getBrokerIdentity("dev",0) + val tiOrError = Await.result(tiFuture, duration) + assert(tiOrError.isRight, "Failed to get broker identity!") + val ti = tiOrError.toOption.get + val config = new Properties() + config.put(kafka.manager.utils.zero11.BrokerConfig.FollowerReplicationThrottledRateProp,"10000000") + val configReadVersion = ti.configReadVersion + val future = kafkaManager.updateBrokerConfig("dev",0,config,configReadVersion) + val result = Await.result(future,duration) + assert(result.isRight === true) + Thread.sleep(2000) + + //check new config + { + val tiFuture= kafkaManager.getBrokerIdentity("dev",0) + val tiOrError = Await.result(tiFuture, duration) + assert(tiOrError.isRight, "Failed to get broker identity!") + val ti = tiOrError.toOption.get + assert(ti.configReadVersion > configReadVersion) + assert(ti.config.toMap.apply(kafka.manager.utils.zero11.BrokerConfig.FollowerReplicationThrottledRateProp) === "10000000") + } + } + + + test("delete topic") { val futureA = kafkaManager.deleteTopic("dev",createTopicNameA) val resultA = Await.result(futureA,duration)