Skip to content

Commit

Permalink
Sort topics in consumer view
Browse files Browse the repository at this point in the history
This will sort the list of topics a consumer is assigned to.
  • Loading branch information
bjoernhaeuser committed Jun 20, 2020
1 parent 2b3ca92 commit 5446a78
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
23 changes: 12 additions & 11 deletions app/kafka/manager/model/ActorModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import scala.collection.immutable.Queue
import scala.util.Try
import scalaz.{NonEmptyList, Validation}

import scala.collection.immutable.SortedMap
import scala.collection.immutable.Map

/**
Expand Down Expand Up @@ -101,8 +102,8 @@ object ActorModel {
log_path: String,
config: Properties = new Properties
) extends CommandRequest
case class CMUpdateLogkafkaConfig(logkafka_id: String,
log_path: String,
case class CMUpdateLogkafkaConfig(logkafka_id: String,
log_path: String,
config: Properties,
checkConfig: Boolean = true
) extends CommandRequest
Expand Down Expand Up @@ -200,22 +201,22 @@ object ActorModel {

case class TopicDescription(topic: String,
description: (Int,String),
partitionState: Option[Map[String, String]],
partitionState: Option[Map[String, String]],
partitionOffsets: PartitionOffsetsCapture,
config:Option[(Int,String)]) extends QueryResponse
case class TopicDescriptions(descriptions: IndexedSeq[TopicDescription], lastUpdateMillis: Long) extends QueryResponse

case class BrokerList(list: IndexedSeq[BrokerIdentity], clusterContext: ClusterContext) extends QueryResponse

case class PreferredReplicaElection(startTime: DateTime,
case class PreferredReplicaElection(startTime: DateTime,
topicAndPartition: Set[TopicPartition],
endTime: Option[DateTime],
endTime: Option[DateTime],
clusterContext: ClusterContext) extends QueryResponse {
def sortedTopicPartitionList: List[(String, Int)] = topicAndPartition.toList.map(tp => (tp.topic(), tp.partition())).sortBy(_._1)
}
case class ReassignPartitions(startTime: DateTime,
case class ReassignPartitions(startTime: DateTime,
partitionsToBeReassigned: Map[TopicPartition, Seq[Int]],
endTime: Option[DateTime],
endTime: Option[DateTime],
clusterContext: ClusterContext) extends QueryResponse {
def sortedTopicPartitionAssignmentList : List[((String, Int), Seq[Int])] = partitionsToBeReassigned.toList.sortBy(partition => (partition._1.topic(), partition._1.partition())).map { case (tp, a) => ((tp.topic(), tp.partition()), a)}
}
Expand Down Expand Up @@ -371,7 +372,7 @@ import scala.language.reflectiveCalls

object PartitionOffsetsCapture {
val ZERO : Option[Double] = Option(0D)

val EMPTY : PartitionOffsetsCapture = PartitionOffsetsCapture(0, Map.empty)

def getRate(part: Int, currentOffsets: PartitionOffsetsCapture, previousOffsets: PartitionOffsetsCapture): Option[Double] = {
Expand Down Expand Up @@ -656,7 +657,7 @@ import scala.language.reflectiveCalls

case class ConsumerIdentity(consumerGroup:String,
consumerType: ConsumerType,
topicMap: Map[String, ConsumedTopicState],
topicMap: collection.Map[String, ConsumedTopicState],
clusterContext: ClusterContext)
object ConsumerIdentity extends Logging {
import scala.language.reflectiveCalls
Expand All @@ -669,7 +670,7 @@ import scala.language.reflectiveCalls
} yield (topic, cts)
ConsumerIdentity(cd.consumer,
cd.consumerType,
topicMap.toMap,
SortedMap(topicMap: _*),
clusterContext)
}

Expand Down Expand Up @@ -713,7 +714,7 @@ import scala.language.reflectiveCalls
}

case class BrokerClusterStats(perMessages: BigDecimal, perIncoming: BigDecimal, perOutgoing: BigDecimal)

sealed trait LKVRequest extends QueryRequest

case object LKVForceUpdate extends CommandRequest
Expand Down
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ logLevel := Level.Info

resolvers += Resolver.url(
"bintray-sbt-plugin-releases",
url("http://dl.bintray.com/content/sbt/sbt-plugin-releases"))(
url("https://dl.bintray.com/content/sbt/sbt-plugin-releases"))(
Resolver.ivyStylePatterns)

addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.4")

// The Typesafe repository
resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += "Typesafe repository" at "https://repo.typesafe.com/typesafe/releases/"

// Use the Play sbt plugin for Play projects
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.21")
Expand Down

0 comments on commit 5446a78

Please sign in to comment.