diff --git a/docker-compose.yml b/docker-compose.yml
index 07a2a09..491475a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -39,8 +39,40 @@ services:
ports:
- "0.0.0.0:8082:8082"
+ kafka-create-topics:
+ image: confluentinc/cp-kafka:5.4.0
+ depends_on:
+ - kafka
+ command: "bash -c 'echo Waiting for Kafka to be ready... && \
+ cub kafka-ready -b kafka:29092 1 20 && \
+ kafka-topics --create --topic bench1_feature --if-not-exists --zookeeper zookeeper:2181 --partitions 37 --replication-factor 1 && \
+ kafka-topics --create --topic bench1_makinage --if-not-exists --zookeeper zookeeper:2181 --partitions 37 --replication-factor 1 && \
+ kafka-topics --create --topic bench1_kafka_dsl --if-not-exists --zookeeper zookeeper:2181 --partitions 37 --replication-factor 1 && \
+ sleep infinity'"
+ environment:
+ KAFKA_BROKER_ID: ignored
+ KAFKA_ZOOKEEPER_CONNECT: ignored
+
makinage-benchmark:
image: makinage-benchmark:latest
depends_on:
- kafka
- command: makinage --config /opt/makinage/config.yaml
\ No newline at end of file
+ - kafka-create-topics
+ command: makinage --config /opt/makinage/config.yaml
+
+ kafka-dsl-benchmark1:
+ image: kafka-dsl-benchmark1:latest
+ depends_on:
+ - kafka
+ - kafka-create-topics
+ environment:
+ APPLICATION_ID: kafka-dsl-benchmark1
+
+ kafka-dsl-benchmark1-scaled:
+ image: kafka-dsl-benchmark1:latest
+ depends_on:
+ - kafka
+ - kafka-create-topics
+ environment:
+ APPLICATION_ID: kafka-dsl-benchmark1-scaled
+
diff --git a/java/.gitignore b/java/.gitignore
new file mode 100644
index 0000000..02cf2e1
--- /dev/null
+++ b/java/.gitignore
@@ -0,0 +1,4 @@
+.idea/
+kafka-dsl-benchmark1.iml
+target/
+
diff --git a/java/kafka-dsl-benchmark1/Dockerfile b/java/kafka-dsl-benchmark1/Dockerfile
new file mode 100644
index 0000000..3d84023
--- /dev/null
+++ b/java/kafka-dsl-benchmark1/Dockerfile
@@ -0,0 +1,5 @@
+FROM openjdk:8-jre
+
+ADD ./target/kafka-dsl-benchmark1-*-with-dependencies.jar /usr/src/kafka-dsl-benchmark1.jar
+WORKDIR /usr/src
+ENTRYPOINT exec java -jar /usr/src/kafka-dsl-benchmark1.jar ${APPLICATION_ID}
diff --git a/java/kafka-dsl-benchmark1/pom.xml b/java/kafka-dsl-benchmark1/pom.xml
new file mode 100644
index 0000000..d26c61a
--- /dev/null
+++ b/java/kafka-dsl-benchmark1/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.makinage.kafka.dsl.benchmark</groupId>
+    <artifactId>kafka-dsl-benchmark1</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>2.13.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>2.13.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.7.21</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>com.makinage.kafka.dsl.benchmark.Bench1</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Bench1.java b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Bench1.java
new file mode 100644
index 0000000..0c71799
--- /dev/null
+++ b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Bench1.java
@@ -0,0 +1,121 @@
+package com.makinage.kafka.dsl.benchmark;
+
+import com.google.gson.Gson;
+import com.makinage.kafka.dsl.benchmark.utils.FeatureTimestampExtractor;
+import com.makinage.kafka.dsl.benchmark.utils.JsonSerde;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Date;
+import java.util.Properties;
+
+
+public class Bench1 {
+
+ static final Logger LOG = LogManager.getLogger(Bench1.class.getName());
+
+ static final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+
+ static final Gson gson = new Gson();
+
+ public static void main(final String[] args) throws Exception, StreamsException {
+
+ String applicationId = "kafka-dsl-benchmark1";
+ if (args.length == 1 && !args[0].isEmpty())
+ applicationId = args[0];
+
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, applicationId + "-client");
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
+ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FeatureTimestampExtractor.class);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/bench1");
+ streamsConfiguration.put(ProducerConfig.ACKS_CONFIG, "all");
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 50);
+
+ final KafkaStreams streams = createStreams(streamsConfiguration);
+ streams.cleanUp();
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ streams.close();
+ } catch (final Exception e) {
+ // ignored
+ }
+ }));
+ }
+
+ static KafkaStreams createStreams(final Properties streamsConfiguration) {
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, Feature> featuresStream =
+ builder.stream("bench1_feature", Consumed.with(Serdes.String(), JsonSerde.FeatureSerde()));
+
+ featuresStream.groupByKey()
+ .windowedBy(TimeWindows.of(Duration.ofHours(2)).grace(Duration.ofMinutes(0)))
+ .aggregate(
+ Mean::new,
+ (group_id, newValue, mean) -> {
+
+ if (mean.event_id == -1L) {
+ // only store the first one
+ mean.event_id = newValue.event_id;
+ }
+ mean.value = ((mean.value * mean.count) + newValue.data.field2) / (mean.count+1);
+ mean.count++;
+
+ LOG.debug(group_id
+ + " / " + dateFormat.format(new Date(newValue.getEventTime().toEpochMilli()))
+ + " / new value=" + newValue.data.field2
+ + " / " + mean.toString() + "\n");
+
+ return mean;
+ },
+ Materialized.with(Serdes.String(), JsonSerde.MeanSerde())
+ )
+ .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
+ .toStream()
+ .map((groupIddWindowed, mean) -> {
+
+ if (mean == null)
+ return null;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(groupIddWindowed.key()
+ + " / win[" + dateFormat.format(new Date(groupIddWindowed.window().start()))
+ + "," + dateFormat.format(new Date(groupIddWindowed.window().end())) + "]"
+ + " / " + mean.toString() + "\n");
+
+ }
+
+ FeatureOutput feature = new FeatureOutput(mean.event_id,
+ groupIddWindowed.window().start(),
+ groupIddWindowed.key(),
+ mean.value);
+
+ LOG.info("Stream to bench1_kafka_dsl" + " - " + gson.toJson(groupIddWindowed.key()) + "/ "
+ + gson.toJson(feature) + "\n");
+
+ return new KeyValue<>(groupIddWindowed.key(), gson.toJson(feature));
+ })
+ .to("bench1_kafka_dsl");
+
+ return new KafkaStreams(builder.build(), streamsConfiguration);
+ }
+}
diff --git a/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Data.java b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Data.java
new file mode 100644
index 0000000..0920928
--- /dev/null
+++ b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Data.java
@@ -0,0 +1,23 @@
+package com.makinage.kafka.dsl.benchmark;
+
+public class Data {
+
+ public Data(Long field1, Double field2, Boolean field3) {
+ this.field1 = field1;
+ this.field2 = field2;
+ this.field3 = field3;
+ }
+
+ public Long field1;
+ public Double field2;
+ public Boolean field3;
+
+ @Override
+ public String toString() {
+ return "Data{" +
+ "field1=" + field1 +
+ ", field2=" + field2 +
+ ", field3=" + field3 +
+ '}';
+ }
+}
diff --git a/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Feature.java b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Feature.java
new file mode 100644
index 0000000..285a640
--- /dev/null
+++ b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Feature.java
@@ -0,0 +1,43 @@
+package com.makinage.kafka.dsl.benchmark;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+
+public class Feature {
+
+ static DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+
+ public Feature(Long event_id, String event_time, String group_id, Data data) {
+ this.event_id = event_id;
+ this.event_time = event_time;
+ this.group_id = group_id;
+ this.data = data;
+ }
+
+ public Long event_id;
+ public String event_time;
+ public String group_id;
+ public Data data;
+
+ public Instant getEventTime() {
+
+ try {
+ Instant instant = df.parse(event_time).toInstant();
+ return instant;
+ } catch (ParseException exception) {
+ return Instant.EPOCH;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Feature{" +
+ "event_id=" + event_id +
+ ", event_time='" + event_time + '\'' +
+ ", group_id='" + group_id + '\'' +
+ ", data=" + data +
+ '}';
+ }
+}
diff --git a/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/FeatureOutput.java b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/FeatureOutput.java
new file mode 100644
index 0000000..28f99d8
--- /dev/null
+++ b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/FeatureOutput.java
@@ -0,0 +1,26 @@
+package com.makinage.kafka.dsl.benchmark;
+
+public class FeatureOutput {
+
+ public FeatureOutput(Long event_id, Long event_time, String group_id, Double field2_mean) {
+ this.event_id = event_id;
+ this.event_time = event_time;
+ this.group_id = group_id;
+ this.field2_mean = field2_mean;
+ }
+
+ public Long event_id;
+ public Long event_time;
+ public String group_id;
+ public Double field2_mean;
+
+ @Override
+ public String toString() {
+ return "FeatureOutput{" +
+ "event_id=" + event_id +
+ ", event_time=" + event_time +
+ ", group_id='" + group_id + '\'' +
+ ", field2_mean=" + field2_mean +
+ '}';
+ }
+}
diff --git a/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Mean.java b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Mean.java
new file mode 100644
index 0000000..b6dd33e
--- /dev/null
+++ b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/Mean.java
@@ -0,0 +1,31 @@
+package com.makinage.kafka.dsl.benchmark;
+
+public class Mean {
+
+ public Mean() {
+ value = new Double(0);
+ count = new Long(0);
+ event_id = new Long(-1);
+
+ }
+
+ public Mean(Double mean, Long count) {
+ this.value = mean;
+ this.count = count;
+ this.event_id = -1L;
+ }
+
+ public Double value;
+ public Long count;
+ public Long event_id;
+
+
+ @Override
+ public String toString() {
+ return "Mean{" +
+ "value=" + value +
+ ", count=" + count +
+ ", event_id=" + event_id +
+ '}';
+ }
+}
diff --git a/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/utils/FeatureTimestampExtractor.java b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/utils/FeatureTimestampExtractor.java
new file mode 100644
index 0000000..c37100e
--- /dev/null
+++ b/java/kafka-dsl-benchmark1/src/main/java/com/makinage/kafka/dsl/benchmark/utils/FeatureTimestampExtractor.java
@@ -0,0 +1,21 @@
+package com.makinage.kafka.dsl.benchmark.utils;
+
+import com.makinage.kafka.dsl.benchmark.Feature;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+public class FeatureTimestampExtractor implements TimestampExtractor {
+
+ static final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+
+ @Override
+ public long extract(ConsumerRecord