Skip to content
This repository has been archived by the owner on May 27, 2020. It is now read-only.

Commit

Permalink
[Khermes #98]Show config content command. (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
eruizgar authored and albertostratio committed Apr 23, 2017
1 parent ac4dcc9 commit 0d00f90
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
14 changes: 13 additions & 1 deletion src/main/resources/web/js/console.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ jQuery(document).ready(function($) {
else if (command == 'show twirl-template') {
sendMessage('[command]\nshow twirl-template')
}
else if (command.startsWith('show kafka-config ')) {
sendMessage('[command]\nshow kafka-config\n[name]\n'+command.split(" ").pop(2)+'\n')
}
else if (command.startsWith('show generator-config')) {
sendMessage('[command]\nshow generator-config\n[name]\n'+command.split(" ").pop(2)+'\n')
}
else if (command.startsWith('show avro-config')) {
sendMessage('[command]\nshow avro-config\n[name]\n'+command.split(" ").pop(2)+'\n')
}
else if (command.startsWith('show twirl-template')) {
sendMessage('[command]\nshow twirl-template\n[name]\n'+command.split(" ").pop(2)+'\n')
}
else {
term.echo("unknown command " + command);
}
Expand Down Expand Up @@ -143,7 +155,7 @@ function setupWebSocket(endpoint, name, term) {
ws.onmessage = function(event) {
console.log(event);
data = event.data;
if (data.indexOf("-") != -1)
if (data.indexOf("|") != -1)
term.echo(parseLs(event.data))
else
term.echo(parseOkResponse(event.data));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import com.stratio.khermes.commons.constants.AppConstants
import com.stratio.khermes.commons.implicits.AppImplicits._

import scala.concurrent.duration._
import scala.util.Try
import scala.util.{Failure, Success, Try}

class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result] with ActorLogging {

Expand Down Expand Up @@ -72,17 +72,17 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result]
case WSProtocolMessage(WsProtocolCommand.CreateAvroConfig, args) =>
createConfig(args, WsProtocolCommand.CreateAvroConfig, AppConstants.AvroConfigPath)

case WSProtocolMessage(WsProtocolCommand.ShowTwirlTemplate, _) =>
showConfig(AppConstants.TwirlTemplatePath)
case WSProtocolMessage(WsProtocolCommand.ShowTwirlTemplate, args) =>
showConfig(args, WsProtocolCommand.ShowTwirlTemplate, AppConstants.TwirlTemplatePath)

case WSProtocolMessage(WsProtocolCommand.ShowGeneratorConfig, _) =>
showConfig(AppConstants.GeneratorConfigPath)
case WSProtocolMessage(WsProtocolCommand.ShowGeneratorConfig, args) =>
showConfig(args, WsProtocolCommand.ShowGeneratorConfig, AppConstants.GeneratorConfigPath)

case WSProtocolMessage(WsProtocolCommand.ShowKafkaConfig, _) =>
showConfig(AppConstants.KafkaConfigPath)
case WSProtocolMessage(WsProtocolCommand.ShowKafkaConfig, args) =>
showConfig(args, WsProtocolCommand.ShowKafkaConfig, AppConstants.KafkaConfigPath)

case WSProtocolMessage(WsProtocolCommand.ShowAvroConfig, _) =>
showConfig(AppConstants.AvroConfigPath)
case WSProtocolMessage(WsProtocolCommand.ShowAvroConfig, args) =>
showConfig(args, WsProtocolCommand.ShowAvroConfig, AppConstants.AvroConfigPath)

case result: NodeSupervisorActor.Result =>
collectResult(result)
Expand Down Expand Up @@ -115,11 +115,11 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result]
val argsAvroConfigOption = args.get(WsProtocolCommand.ArgsAvroConfig)
val nodeIds = args.get(WsProtocolCommand.ArgsNodeIds).map(value => value.split(" ")).toSeq.flatten

val twirlTemplate = configDAO.read(s"${AppConstants.TwirlTemplatePath}/${argsTwirlTemplate}")
val kafkaConfig = configDAO.read(s"${AppConstants.KafkaConfigPath}/${argsKafkaConfig}")
val generatorConfig = configDAO.read(s"${AppConstants.GeneratorConfigPath}/${argsGeneratorConfig}")
val twirlTemplate = configDAO.read(s"${AppConstants.TwirlTemplatePath}/$argsTwirlTemplate")
val kafkaConfig = configDAO.read(s"${AppConstants.KafkaConfigPath}/$argsKafkaConfig")
val generatorConfig = configDAO.read(s"${AppConstants.GeneratorConfigPath}/$argsGeneratorConfig")
val avroConfig = argsAvroConfigOption.map(
argsAvroConfig => configDAO.read(s"${AppConstants.AvroConfigPath}/${argsAvroConfig}"))
argsAvroConfig => configDAO.read(s"${AppConstants.AvroConfigPath}/$argsAvroConfig"))

mediator ! Publish("content",
NodeSupervisorActor.Start(nodeIds, AppConfig(generatorConfig, kafkaConfig, twirlTemplate, avroConfig)))
Expand All @@ -132,19 +132,31 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result]
self ! Result("OK", s"Sending Stop signal to nodes ${nodeIds.mkString(" ")}")
}

def createConfig(args: Map[String,String], protocolCommand: WsProtocolCommandValue, basePath: String): Unit = {
def createConfig(args: Map[String, String], protocolCommand: WsProtocolCommandValue, basePath: String): Unit = {
val name = args.get(WsProtocolCommand.ArgsName).getOrElse(
throw new IllegalArgumentException(s"Not found name for ${protocolCommand.toString}"))
val content = args.get(WsProtocolCommand.ArgsContent).getOrElse(
throw new IllegalArgumentException(s"not found content for ${protocolCommand.toString}"))

configDAO.create(s"${basePath}/${name}", content)
self ! Result("OK", s"Created node in ZK: ${basePath}/${name}")
configDAO.create(s"$basePath/$name", content)
self ! Result("OK", s"Created node in ZK: $basePath/$name")
}

def showConfig(basePath: String): Unit ={
val list = configDAO.list(s"$basePath")
self ! Result(s"OK \n$list", s"Show config of $basePath")
def showConfig(args: Map[String, String], protocolCommand: WsProtocolCommandValue, basePath: String): Unit = {
val name = args.getOrElse(WsProtocolCommand.ArgsName, "")
if (name == "") {
val list = Try(configDAO.list(s"$basePath")) match {
case Success(list) => s"OK \n$list"
case Failure(e) => s"There is none $basePath stored."
}
self ! Result(s"$list", s"Show config of $basePath")
} else {
val read = Try(configDAO.read(s"$basePath/$name")) match {
case Success(config) => s"OK \n$config"
case Failure(e) => s"$name config does not exist."
}
self ! Result(s"$read", s"Show config of $basePath/$name")
}
}

def checkCommandHasEnd(): Unit = {
Expand All @@ -159,7 +171,7 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result]
}

def performOnNext(message: CommandCollectorActor.Result): Unit = {
if(totalDemand > 0 && isActive) {
if (totalDemand > 0 && isActive) {
onNext(message)
}
}
Expand All @@ -169,7 +181,10 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result]
}

object CommandCollectorActor {

case object CheckCommandHasEnd

case class Result(value: String)

def props: Props = Props[CommandCollectorActor]
}

0 comments on commit 0d00f90

Please sign in to comment.