From b27b901b997a5ba804b408fb65992a54f3da5db9 Mon Sep 17 00:00:00 2001
From: huafengw
Date: Wed, 6 Jan 2016 14:26:02 +0800
Subject: [PATCH] add gearpump benchmark

---
 gearpump-benchmarks/pom.xml                      | 116 +++++++++++++++
 .../src/main/resources/reference.conf            |   5 +
 .../gearpump/benchmark/Advertising.scala         | 138 ++++++++++++++++++
 pom.xml                                          |  31 ++++
 stream-bench.sh                                  |  61 ++++++++
 5 files changed, 351 insertions(+)
 create mode 100644 gearpump-benchmarks/pom.xml
 create mode 100644 gearpump-benchmarks/src/main/resources/reference.conf
 create mode 100644 gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala

diff --git a/gearpump-benchmarks/pom.xml b/gearpump-benchmarks/pom.xml
new file mode 100644
index 000000000..b78ec09a3
--- /dev/null
+++ b/gearpump-benchmarks/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>yahoo-low-latency-bechmarks</artifactId>
+        <groupId>com.yahoo.stream</groupId>
+        <version>0.1.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>gearpump-benchmarks</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.github.intel-hadoop</groupId>
+            <artifactId>gearpump-streaming_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.intel-hadoop</groupId>
+            <artifactId>gearpump-core_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.intel-hadoop</groupId>
+            <artifactId>gearpump-external-kafka_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.yahoo.stream</groupId>
+            <artifactId>streaming-benchmark-common</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.2</version>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <recompileMode>incremental</recompileMode>
+                    <useZincServer>true</useZincServer>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms1024m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path</javacArg>
+                    </javacArgs>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <shadedArtifactAttached>false</shadedArtifactAttached>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/gearpump-benchmarks/src/main/resources/reference.conf b/gearpump-benchmarks/src/main/resources/reference.conf
new file mode 100644
index 000000000..3a245f0f6
--- /dev/null
+++ b/gearpump-benchmarks/src/main/resources/reference.conf
@@ -0,0 +1,5 @@
+gearpump {
+  serializers {
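+    # The deserialized ad event travels between tasks as a scala.Tuple7; an
+    # empty value here registers the class with Gearpump's default (Kryo)
+    # serializer rather than pointing at a custom serializer class.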
+    "scala.Tuple7" = ""
+  }
+}
\ No newline at end of file
diff --git a/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala b/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala
new file mode 100644
index 000000000..ff42c73bb
--- /dev/null
+++ b/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala
@@ -0,0 +1,138 @@
+package gearpump.benchmark
+
+import akka.actor.ActorSystem
+import benchmark.common.Utils
+import benchmark.common.advertising.{CampaignProcessorCommon, RedisAdCampaignCache}
+import io.gearpump.Message
+import io.gearpump.partitioner.{HashPartitioner, UnicastPartitioner}
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.streaming.kafka.lib.StringMessageDecoder
+import io.gearpump.streaming.{Processor, StreamApplication}
+import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
+import io.gearpump.streaming.source.{DataSourceProcessor, DefaultTimeStampFilter}
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import io.gearpump.util.{AkkaApp, Graph}
+import io.gearpump.util.Graph.Node
+import org.json.JSONObject
+
+import scala.collection.JavaConverters._
+
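+// Benchmark pipeline: Kafka source -> deserialize JSON ad events -> keep only
+// "view" events -> project (ad_id, event_time) -> join campaign_id from
+// Redis -> update per-campaign stats in Redis via CampaignProcessorCommon.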
+object Advertising extends AkkaApp {
+
+  def application(args: Array[String], system: ActorSystem): StreamApplication = {
+    implicit val actorSystem = system
+    val commonConfig = Utils.findAndReadConfigFile(args(0), true).asInstanceOf[java.util.Map[String, Any]]
+
+    val cores = commonConfig.get("process.cores").asInstanceOf[Int]
+    val topic = commonConfig.get("kafka.topic").asInstanceOf[String]
+    val partitions = commonConfig.get("kafka.partitions").asInstanceOf[Int]
+    val redisHost = commonConfig.get("redis.host").asInstanceOf[String]
+
+    val zookeeperHosts = commonConfig.get("zookeeper.servers").asInstanceOf[java.util.List[String]] match {
+      case l: java.util.List[String] => l.asScala.toSeq
+      case other => throw new ClassCastException(other + " not a List[String]")
+    }
+    val zookeeperPort = commonConfig.get("zookeeper.port").asInstanceOf[Int]
+    val zookeeperConnect = zookeeperHosts.map(_ + ":" + zookeeperPort).mkString(",")
+
+    val kafkaHosts = commonConfig.get("kafka.brokers").asInstanceOf[java.util.List[String]] match {
+      case l: java.util.List[String] => l.asScala.toSeq
+      case other => throw new ClassCastException(other + " not a List[String]")
+    }
+    val kafkaPort = commonConfig.get("kafka.port").asInstanceOf[Int]
+    val brokerList = kafkaHosts.map(_ + ":" + kafkaPort).mkString(",")
+
+    val parallel = Math.max(1, cores / 7)
+    val gearConfig = UserConfig.empty.withString("redis.host", redisHost)
+    val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList)
+    val source = new KafkaSource(topic, zookeeperConnect, offsetStorageFactory, new StringMessageDecoder, new DefaultTimeStampFilter)
+    val sourceProcessor = DataSourceProcessor(source, partitions)
+    val deserializer = Processor[DeserializeTask](parallel)
+    val filter = Processor[EventFilterTask](parallel)
+    val projection = Processor[EventProjectionTask](parallel)
+    val join = Processor[RedisJoinTask](parallel)
+    val campaign = Processor[CampaignProcessorTask](parallel * 2)
+    val partitioner = new AdPartitioner
+
+    val graph = Graph(sourceProcessor ~ new HashPartitioner ~> deserializer ~> filter ~> projection ~> join ~ partitioner ~> campaign)
+    StreamApplication("Advertising", graph, gearConfig)
+  }
+
+  override def main(akkaConf: Advertising.Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    context.submit(application(args, context.system))
+    context.close()
+  }
+
+  override def help: Unit = {}
+}
+
+class AdPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    (msg.msg.asInstanceOf[(String, String, String)]._1.hashCode & Integer.MAX_VALUE) % partitionNum
+  }
+}
+
+class DeserializeTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  override def onNext(msg: Message): Unit = {
+    val jsonObj = new JSONObject(msg.msg.asInstanceOf[String])
+    val tuple = (
+      jsonObj.getString("user_id"),
+      jsonObj.getString("page_id"),
+      jsonObj.getString("ad_id"),
+      jsonObj.getString("ad_type"),
+      jsonObj.getString("event_type"),
+      jsonObj.getString("event_time"),
+      jsonObj.getString("ip_address")
+    )
+    taskContext.output(Message(tuple, msg.timestamp))
+  }
+}
+
+class EventFilterTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  override def onNext(msg: Message): Unit = {
+    val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)]
+    if (tuple._5 == "view") {
+      taskContext.output(msg)
+    }
+  }
+}
+
+class EventProjectionTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  override def onNext(msg: Message): Unit = {
+    val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)]
+    taskContext.output(Message((tuple._3, tuple._6), msg.timestamp))
+  }
+}
+
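+// Looks up each ad's campaign_id in Redis and emits (campaign_id, ad_id,
+// event_time); events whose ad_id has no campaign mapping are dropped.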
+class RedisJoinTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  private val redisHost = conf.getString("redis.host").get
+  private val redisAdCampaignCache = new RedisAdCampaignCache(redisHost)
+
+  override def onStart(startTime: StartTime): Unit = {
+    redisAdCampaignCache.prepare()
+  }
+
+  override def onNext(msg: Message): Unit = {
+    val (ad_id, event_time) = msg.msg.asInstanceOf[(String, String)]
+    val campaign_id = redisAdCampaignCache.execute(ad_id)
+    if (campaign_id != null) {
+      val result = (campaign_id, ad_id, event_time)
+      taskContext.output(Message(result, msg.timestamp))
+    }
+  }
+}
+
+class CampaignProcessorTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  private val redisHost = conf.getString("redis.host").get
+  private val campaignProcessorCommon = new CampaignProcessorCommon(redisHost)
+
+  override def onStart(startTime: StartTime): Unit = {
+    campaignProcessorCommon.prepare()
+  }
+
+  override def onNext(msg: Message): Unit = {
+    val (campaign_id, _, event_time) = msg.msg.asInstanceOf[(String, String, String)]
+    campaignProcessorCommon.execute(campaign_id, event_time)
+  }
+}
diff --git a/pom.xml b/pom.xml
index dec8e7fff..4ee31ea2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,6 +12,20 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
     <artifactId>yahoo-low-latency-bechmarks</artifactId>
     <version>0.1.0</version>

+    <repositories>
+        <repository>
+            <id>releases-oss.sonatype.org</id>
+            <name>Sonatype Releases Repository</name>
+            <url>http://oss.sonatype.org/content/repositories/releases/</url>
+        </repository>
+
+        <repository>
+            <id>gearpump-shaded-repo</id>
+            <name>Vincent at Bintray</name>
+            <url>http://dl.bintray.com/fvunicorn/maven</url>
+        </repository>
+    </repositories>
+
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -19,6 +33,7 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
         <kafka.version>0.8.2.1</kafka.version>
         <flink.version>0.10.1</flink.version>
         <storm.version>0.10.0</storm.version>
+        <gearpump.version>0.7.5</gearpump.version>
         <scala.binary.version>2.10</scala.binary.version>
         <scala.version>2.10.4</scala.version>
         <json.version>20141113</json.version>
@@ -65,6 +80,21 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
                 <artifactId>flink-connector-kafka</artifactId>
                 <version>${flink.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.intel-hadoop</groupId>
+                <artifactId>gearpump-core_${scala.binary.version}</artifactId>
+                <version>${gearpump.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.github.intel-hadoop</groupId>
+                <artifactId>gearpump-streaming_${scala.binary.version}</artifactId>
+                <version>${gearpump.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.github.intel-hadoop</groupId>
+                <artifactId>gearpump-external-kafka_${scala.binary.version}</artifactId>
+                <version>${gearpump.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.yahoo.stream</groupId>
                 <artifactId>streaming-benchmark-common</artifactId>
@@ -128,5 +158,6 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
         <module>storm-benchmarks</module>
        <module>flink-benchmarks</module>
         <module>spark-benchmarks</module>
+        <module>gearpump-benchmarks</module>
     </modules>
 </project>
diff --git a/stream-bench.sh b/stream-bench.sh
index d2cb3d57c..9c52cb228 100755
--- a/stream-bench.sh
+++ b/stream-bench.sh
@@ -18,12 +18,14 @@ SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"4"}
 STORM_VERSION=${STORM_VERSION:-"0.10.0"}
 FLINK_VERSION=${FLINK_VERSION:-"0.10.1"}
 SPARK_VERSION=${SPARK_VERSION:-"1.5.1"}
+GEARPUMP_VERSION=${GEARPUMP_VERSION:-"0.7.5"}

 STORM_DIR="apache-storm-$STORM_VERSION"
 REDIS_DIR="redis-$REDIS_VERSION"
 KAFKA_DIR="kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION"
 FLINK_DIR="flink-$FLINK_VERSION"
 SPARK_DIR="spark-$SPARK_VERSION-bin-hadoop2.6"
+GEARPUMP_DIR="gearpump-$SCALA_BIN_VERSION-$GEARPUMP_VERSION"

 #Get one of the closest apache mirrors
 APACHE_MIRROR=$(curl 'https://www.apache.org/dyn/closer.cgi' | grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -1)
@@ -150,6 +152,10 @@ run() {
     SPARK_FILE="$SPARK_DIR.tgz"
     fetch_untar_file "$SPARK_FILE" "$APACHE_MIRROR/spark/spark-$SPARK_VERSION/$SPARK_FILE"

+    #Fetch Gearpump
+    GEARPUMP_FILE="$GEARPUMP_DIR.tar.gz"
+    fetch_untar_file "$GEARPUMP_FILE" "https://github.com/gearpump/gearpump/releases/download/$GEARPUMP_VERSION/$GEARPUMP_FILE"
+
   elif [ "START_ZK" = "$OPERATION" ];
   then
     start_if_needed dev_zookeeper ZooKeeper 10 "$STORM_DIR/bin/storm" dev-zookeeper
@@ -203,6 +209,20 @@ run() {
     stop_if_needed org.apache.spark.deploy.master.Master SparkMaster
     stop_if_needed org.apache.spark.deploy.worker.Worker SparkSlave
     sleep 3
+
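+  # Single-node Gearpump cluster: one master bound to 127.0.0.1:3000, one
+  # worker, and the dashboard (Services). The benchmark job itself is
+  # submitted separately via the START_GEARPUMP_APP operation.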
+ elif [ "START_STORM_TOPOLOGY" = "$OPERATION" ]; then "$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology test-topo -conf $CONF_FILE @@ -222,6 +243,7 @@ run() { then "$STORM_DIR/bin/storm" kill -w 0 test-topo || true sleep 10 + elif [ "START_SPARK_PROCESSING" = "$OPERATION" ]; then "$SPARK_DIR/bin/spark-submit" --master spark://localhost:7077 --class spark.benchmark.KafkaRedisAdvertisingStream ./spark-benchmarks/target/spark-benchmarks-0.1.0.jar "$CONF_FILE" & @@ -229,6 +251,7 @@ run() { elif [ "STOP_SPARK_PROCESSING" = "$OPERATION" ]; then stop_if_needed spark.benchmark.KafkaRedisAdvertisingStream "Spark Client Process" + elif [ "START_FLINK_PROCESSING" = "$OPERATION" ]; then "$FLINK_DIR/bin/flink" run ./flink-benchmarks/target/flink-benchmarks-0.1.0.jar --confPath $CONF_FILE & @@ -243,6 +266,22 @@ run() { "$FLINK_DIR/bin/flink" cancel $FLINK_ID sleep 3 fi + + elif [ "START_GEARPUMP_APP" = "$OPERATION" ]; + then + "$GEARPUMP_DIR/bin/gear" app -jar ./gearpump-benchmarks/target/gearpump-benchmarks-0.1.0.jar gearpump.benchmark.Advertising $CONF_FILE & + sleep 5 + elif [ "STOP_GEARPUMP_APP" = "$OPERATION" ]; + then + APP_ID=`"$GEARPUMP_DIR/bin/gear" info | grep "application:" | awk -F ',' '{print $1}' | awk '{print $2}'` + if [ "$APP_ID" == "" ]; + then + echo "Could not find Gearpump application to kill" + else + "$GEARPUMP_DIR/bin/gear" kill -appid $APP_ID + sleep 5 + fi + elif [ "STORM_TEST" = "$OPERATION" ]; then run "START_ZK" @@ -288,6 +327,21 @@ run() { run "STOP_KAFKA" run "STOP_REDIS" run "STOP_ZK" + elif [ "GEARPUMP_TEST" = "$OPERATION" ]; + then + run "START_ZK" + run "START_REDIS" + run "START_KAFKA" + run "START_GEARPUMP" + run "START_GEARPUMP_APP" + run "START_LOAD" + sleep $TEST_TIME + run "STOP_LOAD" + run "STOP_GEARPUMP_APP" + run "STOP_GEARPUMP" + run "STOP_KAFKA" + run "STOP_REDIS" + run "STOP_ZK" elif [ "STOP_ALL" = "$OPERATION" ]; then run "STOP_LOAD" @@ -297,6 +351,8 @@ run() { run "STOP_FLINK" run "STOP_STORM_TOPOLOGY" run "STOP_STORM" + run "STOP_GEARPUMP_APP" + run "STOP_GEARPUMP" run "STOP_KAFKA" run "STOP_REDIS" run "STOP_ZK" @@ -322,6 +378,8 @@ run() { echo "STOP_FLINK: kill flink processes" echo "START_SPARK: run spark processes" echo "STOP_SPARK: kill spark processes" + echo "START_GEARPUMP: run gearpump processes" + echo "STOP_GEARPUMP: kill gearpump processes" echo echo "START_STORM_TOPOLOGY: run the storm test topology" echo "STOP_STORM_TOPOLOGY: kill the storm test topology" @@ -329,10 +387,13 @@ run() { echo "STOP_FLINK_PROCESSSING: kill the flink test processing" echo "START_SPARK_PROCESSING: run the spark test processing" echo "STOP_SPARK_PROCESSSING: kill the spark test processing" + echo "START_GEARPUMP_PROCESSING: run the gearpump test processing" + echo "STOP_GEARPUMP_PROCESSSING: kill the gearpump test processing" echo echo "STORM_TEST: run storm test (assumes SETUP is done)" echo "FLINK_TEST: run flink test (assumes SETUP is done)" echo "SPARK_TEST: run spark test (assumes SETUP is done)" + echo "GEARPUMP_TEST: run gearpump test (assumes SETUP is done)" echo "STOP_ALL: stop everything" echo echo "HELP: print out this message"