All Kafka Connectors read and write String values. This is done for cross-platform compatibility reasons.
ArisKonidaris committed Apr 4, 2020
1 parent e536d12 commit f57bbde
Showing 9 changed files with 62 additions and 57 deletions.
12 changes: 0 additions & 12 deletions oml1.2.iml
@@ -14,18 +14,6 @@
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" exported="" name="scala-sdk-2.12.8" level="application" />
<orderEntry type="library" name="Maven: org.jetbrains:annotations:18.0.0" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware.kryo:kryo:2.24.0" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware.minlog:minlog:1.2" level="project" />
<orderEntry type="library" name="Maven: org.objenesis:objenesis:2.1" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.15" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-math3:3.5" level="project" />
<orderEntry type="library" name="Maven: com.twitter:chill-java:0.7.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro:1.8.2" level="project" />
<orderEntry type="library" name="Maven: com.github.fommil.netlib:core:1.1.2" level="project" />
<orderEntry type="library" name="Maven: net.sourceforge.f2j:arpack_combined_all:0.1" level="project" />
<orderEntry type="library" name="Maven: net.sf.opencsv:opencsv:2.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-scala_2.12:1.10.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-core:1.10.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-annotations:1.10.0" level="project" />
7 changes: 2 additions & 5 deletions src/main/java/oml/POJOs/DataInstance.java
@@ -13,11 +13,8 @@
*/
public class DataInstance implements Serializable {

- /**
-  * A unique id for the data point needed only for the prediction of an unlabeled data point.
-  */
@JsonInclude(JsonInclude.Include.NON_NULL)
- public Long id;
+ public Long id; // A unique id for a data point.

public List<Double> numericFeatures; // The numerical features of the data point.

@@ -125,7 +122,7 @@ public boolean isValid() {
(numericFeatures == null || numericFeatures.size() == 0) &&
(discreteFeatures == null || discreteFeatures.size() == 0)
) return false;
- if (operation.equals("forecasting") && target != null && id == null) return false;
+ if (operation.equals("forecasting") && target != null) return false;
return true;
}
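
Since a Prediction now carries the whole DataInstance (see Prediction.java below), an unlabeled forecasting point no longer needs an id to be valid; the new check instead rejects forecasting points that arrive with a target. A minimal sketch of the rule in Scala, assuming the JSON field names match the POJO above and no other checks interfere:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import oml.POJOs.DataInstance

val mapper = new ObjectMapper()

// Unlabeled forecasting point without an id: now valid.
val ok = mapper.readValue(
  """{"operation": "forecasting", "numericFeatures": [0.5, 1.2]}""",
  classOf[DataInstance])

// Forecasting point carrying a target: rejected by the new check.
val labeled = mapper.readValue(
  """{"operation": "forecasting", "numericFeatures": [0.5, 1.2], "target": 1.0}""",
  classOf[DataInstance])

// ok.isValid == true, labeled.isValid == false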

14 changes: 7 additions & 7 deletions src/main/java/oml/POJOs/Prediction.java
@@ -9,16 +9,16 @@ public class Prediction implements Serializable {

Integer mlPipeline;

- Long dataInstanceId;
+ DataInstance dataPoint;

Double prediction;

public Prediction() {
}

- public Prediction(Integer mlPipeline, Long dataInstanceId, Double prediction) {
+ public Prediction(Integer mlPipeline, DataInstance dataPoint, Double prediction) {
this.mlPipeline = mlPipeline;
- this.dataInstanceId = dataInstanceId;
+ this.dataPoint = dataPoint;
this.prediction = prediction;
}

@@ -51,12 +51,12 @@ public void setPrediction(Double prediction) {
this.prediction = prediction;
}

- public Long getDataInstanceId() {
-     return dataInstanceId;
+ public DataInstance getDataPoint() {
+     return dataPoint;
}

- public void setDataInstanceId(Long dataInstanceId) {
-     this.dataInstanceId = dataInstanceId;
+ public void setDataInstanceId(DataInstance dataPoint) {
+     this.dataPoint = dataPoint;
}

}
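
Embedding the full data point makes every prediction self-describing: a consumer on another platform reads the prediction together with the features that produced it, with no join on an id. An illustrative construction in Scala (hypothetical values; the real call site is in MLPredictor.scala below):

import oml.POJOs.{DataInstance, Prediction}

val point = new DataInstance() // assumes the POJO's no-arg constructor, as Jackson requires
val p = new Prediction(1, point, 0.87)
// OML_Job turns this into a String via toString before writing it to the "predictions" topic.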
43 changes: 15 additions & 28 deletions src/main/scala/oml/OML_Job.scala
@@ -22,20 +22,17 @@ import oml.logic.{ParameterServer, Predictor, Trainer}
import oml.message.mtypes.{ControlMessage, workerMessage}
import oml.mlAPI.mlworkers.MLNodeGenerator
import oml.utils.{CommonUtils, KafkaUtils}
- import oml.utils.KafkaUtils.createProperties
import oml.POJOs.{DataInstance, Prediction, QueryResponse, Request}
import oml.math.Point
import oml.parameters.ParameterDescriptor
- import oml.utils.deserializers.{DataInstanceDeserializer, RequestDeserializer}
+ import oml.utils.parsers.{DataInstanceParser, RequestParser}
import oml.utils.parsers.dataStream.DataPointParser
import oml.utils.parsers.requestStream.PipelineMap
import oml.utils.partitioners.random_partitioner
- import oml.utils.serializers.{PredictionSerializer, QueryResponseSerializer}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{ConnectedStreams, _}
- import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.util.Collector

/**
@@ -61,32 +58,26 @@ object OML_Job {
////////////////////////////////////////////// Kafka Connectors ////////////////////////////////////////////////////


- /** The parameter server messages */
+ /** The parameter server messages. */
val psMessages: DataStream[ControlMessage] = env
.addSource(KafkaUtils.KafkaTypeConsumer[ControlMessage]("psMessages"))

- /** The incoming training data */
+ /** The incoming training data. */
val trainingSource: DataStream[DataInstance] = env.addSource(
new FlinkKafkaConsumer[DataInstance]("trainingData",
new DataInstanceDeserializer(true),
createProperties("trainingDataAddr", "trainingDataConsumer"))
.setStartFromEarliest())
KafkaUtils.KafkaStringConsumer("trainingData")
).flatMap(DataInstanceParser())
.name("TrainingSource")

- /** The incoming forecasting data */
+ /** The incoming forecasting data. */
val forecastingSource: DataStream[DataInstance] = env.addSource(
new FlinkKafkaConsumer[DataInstance]("forecastingData",
new DataInstanceDeserializer(true),
createProperties("forecastingDataAddr", "forecastingDataConsumer"))
.setStartFromEarliest())
KafkaUtils.KafkaStringConsumer("forecastingData")
).flatMap(DataInstanceParser())
.name("ForecastingSource")

/** The incoming requests */
val requests: DataStream[Request] = env.addSource(
new FlinkKafkaConsumer[Request]("requests",
new RequestDeserializer(true),
createProperties("requestsAddr", "requestsConsumer"))
.setStartFromEarliest())
KafkaUtils.KafkaStringConsumer("requests")
).flatMap(RequestParser())
.name("RequestSource")


@@ -192,19 +183,15 @@ object OML_Job {


/** A Kafka sink for the predictions. */
- predictionStream.addSink(
-   new FlinkKafkaProducer[Prediction](params.get("predictionsAddr", "localhost:9092"),
-     "predictions",
-     new PredictionSerializer))
+ predictionStream
+   .map(x => x.toString)
+   .addSink(KafkaUtils.kafkaStringProducer("predictions"))
.name("PredictionsSink")

/** A Kafka Sink for the query responses. */
worker.getSideOutput(queryResponse)
- .addSink(
-   new FlinkKafkaProducer[QueryResponse](params.get("responsesAddr", "localhost:9092"),
-     "responses",
-     new QueryResponseSerializer)
- ).setParallelism(1)
+ .map(x => x.toString)
+ .addSink(KafkaUtils.kafkaStringProducer("responses"))
.name("ResponsesSink")


5 changes: 2 additions & 3 deletions src/main/scala/oml/mlAPI/mlworkers/worker/MLPredictor.scala
@@ -1,6 +1,6 @@
package oml.mlAPI.mlworkers.worker

- import oml.POJOs.{DataInstance, Prediction, QueryResponse, Request}
+ import oml.POJOs.{DataInstance, Prediction, Request}
import oml.StarTopologyAPI.{Inject, MergeOp, QueryOp, ReceiveTuple}
import oml.mlAPI.Investigator
import oml.math.{DenseVector, Point, UnlabeledPoint}
Expand All @@ -10,7 +10,6 @@ import oml.parameters.{LearningParameters, ParameterDescriptor}

import scala.collection.mutable
import scala.collection.JavaConverters._
- import scala.collection.mutable.ListBuffer

class MLPredictor() extends Serializable with MLWorkerRemote {

@@ -101,7 +100,7 @@ class MLPredictor() extends Serializable with MLWorkerRemote {
}
}

- querier.sendPrediction(new Prediction(nodeId, data.id, prediction))
+ querier.sendPrediction(new Prediction(nodeId, data, prediction))

}

2 changes: 1 addition & 1 deletion src/main/scala/oml/utils/DefaultJobParameters.scala
@@ -2,7 +2,7 @@ package oml.utils

object DefaultJobParameters {
val defaultJobName: String = "OML_job_1"
val defaultParallelism: String = "36"
val defaultParallelism: String = "16"
val defaultInputFile: String = "hdfs://clu01.softnet.tuc.gr:8020/user/vkonidaris/lin_class_mil_e10.txt"
val defaultOutputFile: String = "hdfs://clu01.softnet.tuc.gr:8020/user/vkonidaris/output"
}
2 changes: 1 addition & 1 deletion src/main/scala/oml/utils/KafkaUtils.scala
@@ -24,7 +24,7 @@ object KafkaUtils {
: FlinkKafkaConsumerBase[T] = {
new FlinkKafkaConsumer[T](topic,
new TypeInformationSerializationSchema(createTypeInformation[T], env.getConfig),
createProperties(topic + "Addr", topic + "_Consumer"))
createProperties(topic + "Addr", topic + "Consumer"))
.setStartFromLatest()
}
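
With the group-id rename, the names derived from a topic line up exactly with the explicit ones the removed OML_Job code passed, e.g. for the "trainingData" topic:

// Broker address read from the "trainingDataAddr" key; consumer group
// "trainingDataConsumer" (previously "trainingData_Consumer").
createProperties("trainingData" + "Addr", "trainingData" + "Consumer")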

17 changes: 17 additions & 0 deletions src/main/scala/oml/utils/parsers/DataInstanceParser.scala
@@ -0,0 +1,17 @@
package oml.utils.parsers

import oml.POJOs.DataInstance
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.util.Collector

case class DataInstanceParser() extends RichFlatMapFunction[String, DataInstance] {

private val mapper: ObjectMapper = new ObjectMapper()

override def flatMap(record: String, collector: Collector[DataInstance]): Unit = {
val dataInstance = mapper.readValue(record, classOf[DataInstance])
if (dataInstance.isValid) collector.collect(dataInstance)
}

}
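
Usage, as wired up in OML_Job above. Note that readValue throws on malformed JSON, so only well-formed records that fail isValid are dropped silently; a syntactically broken record would fail the task:

// Strings from Kafka become validated POJOs; invalid instances are filtered out.
val training: DataStream[DataInstance] =
  env.addSource(KafkaUtils.KafkaStringConsumer("trainingData"))
    .flatMap(DataInstanceParser())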
17 changes: 17 additions & 0 deletions src/main/scala/oml/utils/parsers/RequestParser.scala
@@ -0,0 +1,17 @@
package oml.utils.parsers

import oml.POJOs.Request
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.util.Collector

case class RequestParser() extends RichFlatMapFunction[String, Request] {

private val mapper: ObjectMapper = new ObjectMapper()

override def flatMap(record: String, collector: Collector[Request]): Unit = {
val request = mapper.readValue(record, classOf[Request])
if (request.isValid) collector.collect(request)
}

}
