diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index fc87fee..3dd7738 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -23,7 +23,7 @@ import common.Util._ import play.api.libs.json._ import play.api.libs.json.JsObject import play.api.libs.iteratee.{Concurrent, Iteratee} -import common.Registry +import common.{Message, Registry} import common.Registry.PropertyConstants import java.util import kafka.javaapi.consumer.EventHandler @@ -36,9 +36,25 @@ import com.twitter.zk.{ZNode, ZkClient} import scala.util.Random import okapies.finagle.Kafka import kafka.api.OffsetRequest +import kafka.utils.{ZkUtils, ZKStringSerializer} +import play.api.data.{Forms, Form} +import play.api.libs.concurrent.Akka +import play.api.data.Forms._ +import play.api.libs.json.JsArray +import scala.Some +import kafka.message.MessageAndMetadata +import play.api.libs.json.JsObject object Topic extends Controller { + val topicForm = Forms.tuple( + "name" -> Forms.text, + "group" -> Forms.text, + "zookeeper" -> Forms.text, + "partitions" -> Forms.number, + "replications" -> Forms.number + ) + object TopicsWrites extends Writes[List[Map[String, Object]]] { def writes(l: List[Map[String, Object]]) = { val topics = l.map { t => @@ -145,6 +161,55 @@ object Topic extends Controller { (in, out) } + def create() = Action { implicit request => + val result = Form(topicForm).bindFromRequest.fold( + formFailure => BadRequest, + formSuccess => { + + val name: String = formSuccess._1 + val group: String = formSuccess._2 + val zookeeper: String = formSuccess._3 + val partitions: Int = formSuccess._4 + val replications: Int = formSuccess._5 + + import org.I0Itec.zkclient.ZkClient + + val zk = models.Zookeeper.findById(zookeeper).get + + val zkClient = new ZkClient(zk.host+":"+zk.port, 30000, 30000, ZKStringSerializer) + try { + kafka.admin.AdminUtils.createTopic(zkClient, name, partitions, replications) + + } finally { + zkClient.close() + } + + Ok + } + + ) + + result + } + + def delete(name: String, zookeeper: String) = Action { + + import org.I0Itec.zkclient.ZkClient + + val zk = models.Zookeeper.findById(zookeeper).get + + val zkClient = new ZkClient(zk.host+":"+zk.port, 30000, 30000, ZKStringSerializer) + try { + //How it is deleted in the DeleteTopicCommand + zkClient.deleteRecursive(ZkUtils.getTopicPath(name)) + //kafka.admin.AdminUtils.deleteTopic(zkClient, name) + + } finally { + zkClient.close() + } + Ok + } + private def createConsumerConfig(zookeeperAddress: String, gid: String): ConsumerConfig = { val props = new Properties() props.put("zookeeper.connect", zookeeperAddress) diff --git a/conf/routes b/conf/routes index 95c106e..8a4865a 100644 --- a/conf/routes +++ b/conf/routes @@ -15,6 +15,8 @@ GET /topics controllers.As GET /topics.json controllers.Topic.index() GET /topics.json/:name/:zookeeper controllers.Topic.show(name, zookeeper) GET /topics.json/:name/:zookeeper/feed controllers.Topic.feed(name, zookeeper) +POST /topics.json controllers.Topic.create() +DELETE /topics.json/:name/:zookeeper controllers.Topic.delete(name, zookeeper) GET /topics/:name/:zookeeper controllers.IgnoreParamAssets.at(path="/public", file="html/partials/topic/show.html", name, zookeeper) GET /consumergroups.json/:name/:topic/:zookeeper controllers.ConsumerGroup.show(name, topic, zookeeper) diff --git a/public/html/partials/topic/index.html b/public/html/partials/topic/index.html index afeab06..2e69642 100644 --- a/public/html/partials/topic/index.html +++ b/public/html/partials/topic/index.html @@ -1,30 +1,91 @@ -
- - - - - - - - - - - - - - - - - - - - - - - - -
ZookeeperTopicPartitionLog Size
- - - {{topic.zookeeper}}{{topic.name}}{{topic.logSize}}
{{$index}}{{partition.logSize}}
-
\ No newline at end of file + + +
+
+
+ + + + + + + + + + + + + + + + + + + + + + + + + +
ZookeeperTopicPartitionLog Size
+ + + {{topic.zookeeper}}{{topic.name}}{{topic.logSize}} + +
{{$index}}{{partition.logSize}}
+
+
+
+
+
+ + + +

A name + for Topic is required.

+

+ Topic must be in a valid format.

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+

+ +

+
+
+
diff --git a/public/javascripts/topics-controller.js b/public/javascripts/topics-controller.js index d0c472a..06766ec 100644 --- a/public/javascripts/topics-controller.js +++ b/public/javascripts/topics-controller.js @@ -12,7 +12,42 @@ app.controller("TopicsController", function ($scope, $location, $http, $filter) }); }); + $scope.ctopic = {}; + $scope.ctopic.partitions = 1; + $scope.ctopic.replications = 1; + $scope.groups = [ + {name:'All'}, + {name:'Development'}, + {name:'Production'}, + {name:'Staging'}, + {name:'Test'}]; + + $scope.zookeepers = {}; + $scope.getTopic = function (topic) { $location.path('/topics/' + topic.name + '/' + topic.zookeeper); }; -}); \ No newline at end of file + $scope.getZookeepers = function (group) { + $http.get('/zookeepers.json/' + group). + success(function (data) { + $scope.zookeepers = data + }); + }; + + $scope.selectGroup = function () { + $scope.getZookeepers($scope.ctopic.group.name) + }; + + $scope.createTopic = function (ctopic) { + console.log(ctopic) + $http.post('/topics.json', { name: ctopic.name, group: ctopic.group.name, zookeeper: ctopic.zookeeper.name, partitions: ctopic.partitions, replications: ctopic.replications}).success(function () { + location.reload(); + }); + }; + + $scope.removeTopic = function (topic) { + $http.delete('/topics.json/' + topic.name + '/'+topic.zookeeper).success(function () { + location.reload(); + }); + }; +});