Skip to content

Commit

Permalink
add gearpump benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
huafengw committed Jan 28, 2016
1 parent a140e1c commit b27b901
Show file tree
Hide file tree
Showing 5 changed files with 351 additions and 0 deletions.
116 changes: 116 additions & 0 deletions gearpump-benchmarks/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2015, Yahoo Inc.
Licensed under the terms of the Apache License 2.0. Please see LICENSE file in the project root for terms.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yahoo-low-latency-bechmarks</artifactId>
<groupId>com.yahoo.stream</groupId>
<version>0.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>gearpump-benchmarks</artifactId>

<dependencies>
<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-streaming_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-external-kafka_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</dependency>
<dependency>
<groupId>com.yahoo.stream</groupId>
<artifactId>streaming-benchmark-common</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>eclipse-add-source</id>
<goals>
<goal>add-source</goal>
</goals>
</execution>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<recompileMode>incremental</recompileMode>
<useZincServer>true</useZincServer>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
<javacArgs>
<javacArg>-source</javacArg>
<javacArg>${java.version}</javacArg>
<javacArg>-target</javacArg>
<javacArg>${java.version}</javacArg>
<javacArg>-Xlint:all,-serial,-path</javacArg>
</javacArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<minimizeJar>false</minimizeJar>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
5 changes: 5 additions & 0 deletions gearpump-benchmarks/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
gearpump {
serializers {
"scala.Tuple7" = ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package gearpump.benchmark

import akka.actor.ActorSystem
import benchmark.common.Utils
import benchmark.common.advertising.{CampaignProcessorCommon, RedisAdCampaignCache}
import io.gearpump.Message
import io.gearpump.partitioner.{Partitioner, UnicastPartitioner, HashPartitioner}
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.streaming.kafka.lib.StringMessageDecoder
import io.gearpump.streaming.{StreamApplication, Processor}
import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
import io.gearpump.streaming.source.{DefaultTimeStampFilter, DataSourceProcessor}
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.util.{AkkaApp, Graph}
import org.json.JSONObject
import io.gearpump.util.Graph.Node
import scala.collection.JavaConverters._

object Advertising extends AkkaApp{

def application(args: Array[String], system: ActorSystem) : StreamApplication = {
implicit val actorSystem = system
val commonConfig = Utils.findAndReadConfigFile(args(0), true).asInstanceOf[java.util.Map[String, Any]]

val cores = commonConfig.get("process.cores").asInstanceOf[Int]
val topic = commonConfig.get("kafka.topic").asInstanceOf[String]
val partitions = commonConfig.get("kafka.partitions").asInstanceOf[Int]
val redisHost = commonConfig.get("redis.host").asInstanceOf[String]

val zookeeperHosts = commonConfig.get("zookeeper.servers").asInstanceOf[java.util.List[String]] match {
case l: java.util.List[String] => l.asScala.toSeq
case other => throw new ClassCastException(other + " not a List[String]")
}
val zookeeperPort = commonConfig.get("zookeeper.port").asInstanceOf[Int]
val zookeeperConnect = zookeeperHosts.map(_ + ":" + zookeeperPort).mkString(",")

val kafkaHosts = commonConfig.get("kafka.brokers").asInstanceOf[java.util.List[String]] match {
case l: java.util.List[String] => l.asScala.toSeq
case other => throw new ClassCastException(other + " not a List[String]")
}
val kafkaPort = commonConfig.get("kafka.port").asInstanceOf[Int]
val brokerList = kafkaHosts.map(_ + ":" + kafkaPort).mkString(",")

val parallel = Math.max(1, cores / 7)
val gearConfig = UserConfig.empty.withString("redis.host", redisHost)
val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList)
val source = new KafkaSource(topic, zookeeperConnect, offsetStorageFactory, new StringMessageDecoder, new DefaultTimeStampFilter)
val sourceProcessor = DataSourceProcessor(source, partitions)
val deserializer = Processor[DeserializeTask](parallel)
val filter = Processor[EventFilterTask](parallel)
val projection = Processor[EventProjectionTask](parallel)
val join = Processor[RedisJoinTask](parallel)
val campaign = Processor[CampaignProcessorTask](parallel * 2)
val partitioner = new AdPartitioner

val graph = Graph(sourceProcessor ~ new HashPartitioner ~> deserializer ~> filter ~> projection ~> join ~ partitioner ~> campaign)
StreamApplication("Advertising", graph, gearConfig)
}

override def main(akkaConf: Advertising.Config, args: Array[String]): Unit = {
val context = ClientContext(akkaConf)
context.submit(application(args, context.system))
context.close()
}

override def help: Unit = {}
}

class AdPartitioner extends UnicastPartitioner {
override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
(msg.msg.asInstanceOf[(String, String, String)]._1.hashCode & Integer.MAX_VALUE) % partitionNum
}
}

class DeserializeTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
override def onNext(msg : Message) : Unit = {
val jsonObj = new JSONObject(msg.msg.asInstanceOf[String])
val tuple = (
jsonObj.getString("user_id"),
jsonObj.getString("page_id"),
jsonObj.getString("ad_id"),
jsonObj.getString("ad_type"),
jsonObj.getString("event_type"),
jsonObj.getString("event_time"),
jsonObj.getString("ip_address")
)
taskContext.output(Message(tuple, msg.timestamp))
}
}

class EventFilterTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
override def onNext(msg: Message): Unit = {
val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)]
if(tuple._5 == "view") {
taskContext.output(msg)
}
}
}

class EventProjectionTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
override def onNext(msg: Message): Unit = {
val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)]
taskContext.output(Message((tuple._3, tuple._6), msg.timestamp))
}
}

class RedisJoinTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
private val redisHost = conf.getString("redis.host").get
private val redisAdCampaignCache = new RedisAdCampaignCache(redisHost)

override def onStart(startTime : StartTime) : Unit = {
redisAdCampaignCache.prepare()
}

override def onNext(msg: Message): Unit = {
val (ad_id, event_time) = msg.msg.asInstanceOf[(String, String)]
val campaign_id = redisAdCampaignCache.execute(ad_id)
if(campaign_id != null) {
val result = (campaign_id, ad_id, event_time)
taskContext.output(Message(result, msg.timestamp))
}
}
}

class CampaignProcessorTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
private val redisHost = conf.getString("redis.host").get
private val campaignProcessorCommon = new CampaignProcessorCommon(redisHost)

override def onStart(startTime : StartTime) : Unit = {
campaignProcessorCommon.prepare()
}

override def onNext(msg: Message): Unit = {
val (campaign_id, _, event_time) = msg.msg.asInstanceOf[(String, String, String)]
campaignProcessorCommon.execute(campaign_id, event_time)
}
}
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,28 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
<name>yahoo-low-latency-bechmarks</name>
<version>0.1.0</version>

<repositories>
<repository>
<id>releases-oss.sonatype.org</id>
<name>Sonatype Releases Repository</name>
<url>http://oss.sonatype.org/content/repositories/releases/</url>
</repository>

<repository>
<id>gearpump-shaded-repo</id>
<name>Vincent at Bintray</name>
<url>http://dl.bintray.com/fvunicorn/maven</url>
</repository>
</repositories>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spark.version>1.5.1</spark.version>
<kafka.version>0.8.2.1</kafka.version>
<flink.version>0.10.1</flink.version>
<storm.version>0.10.0</storm.version>
<gearpump.version>0.7.5</gearpump.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<json.version>20141113</json.version>
Expand Down Expand Up @@ -65,6 +80,21 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-core_${scala.binary.version}</artifactId>
<version>${gearpump.version}</version>
</dependency>
<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-streaming_${scala.binary.version}</artifactId>
<version>${gearpump.version}</version>
</dependency>
<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-external-kafka_${scala.binary.version}</artifactId>
<version>${gearpump.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.stream</groupId>
<artifactId>streaming-benchmark-common</artifactId>
Expand Down Expand Up @@ -128,5 +158,6 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
<module>storm-benchmarks</module>
<module>flink-benchmarks</module>
<module>spark-benchmarks</module>
<module>gearpump-benchmarks</module>
</modules>
</project>
Loading

0 comments on commit b27b901

Please sign in to comment.