Skip to content

Commit

Permalink
Extended the component with the ability to input your own topic names…
Browse files Browse the repository at this point in the history
… for Kafka Consumers and Producers.
  • Loading branch information
ArisKonidaris committed Apr 15, 2020
1 parent f57bbde commit 7528541
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 15 deletions.
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ $ kafka/bin/kafka-topics.sh --create --bootstrap-server ip:port --partitions 36
$ kafka/bin/kafka-topics.sh --create --bootstrap-server ip:port --partitions 36 --replication-factor 4 --topic psMessages
$ flink/bin/flink run /path/to/Online_Machine_Learning_via_Flink/target/oml1.2-0.0.1-SNAPSHOT.jar \
--parallelism 36 \
--parallelism 16 \
--trainingDataTopic <training data topic name> \
--trainingDataAddr <training data BrokerList> \
--forecastingDataTopic <forecasting data topic name> \
--forecastingDataAddr <forecasting data BrokerList> \
--requestsAddr <requests BrokerList>
--responsesAddr <responses BrokerList>
--predictionsAddr <predictions BrokerList>
--psMessagesAddr <psMessages BrokerList> \
--requestsTopic <requests data topic name> \
--requestsAddr <requests BrokerList> \
--responsesTopic <responses topic name> \
--responsesAddr <responses BrokerList> \
--predictionsTopic <predictions topic name> \
--predictionsAddr <predictions BrokerList> \
--psMessagesTopic <psMessages topic name> \
--psMessagesAddr <psMessages BrokerList>
```
54 changes: 44 additions & 10 deletions src/main/scala/oml/OML_Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package oml

import java.util.Properties

import oml.logic.{ParameterServer, Predictor, Trainer}
import oml.message.mtypes.{ControlMessage, workerMessage}
import oml.mlAPI.mlworkers.MLNodeGenerator
Expand All @@ -33,13 +35,22 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{ConnectedStreams, _}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.api.common.serialization.{SimpleStringSchema, TypeInformationSerializationSchema}
import org.apache.flink.util.Collector

/**
* Interactive Online Machine Learning Flink Streaming Job.
*/
object OML_Job {

/**
  * Builds the Kafka client configuration used by one of the job's Kafka consumers.
  *
  * @param brokerList the NAME of the job parameter that holds the broker list
  *                   (for example "trainingDataAddr"), not the broker list itself.
  *                   When that parameter is not set, "localhost:9092" is used.
  * @param group_id   the Kafka consumer group id to register the consumer under.
  * @param params     the parsed job parameters the broker list is looked up in.
  * @return a fresh [[java.util.Properties]] holding "bootstrap.servers" and "group.id".
  */
def createProperties(brokerList: String, group_id: String)(implicit params: ParameterTool): Properties = {
  val properties: Properties = new Properties()
  properties.setProperty("bootstrap.servers", params.get(brokerList, "localhost:9092"))
  // Kafka reads the consumer group from "group.id". The previous key,
  // "group.flink_worker_id", is not a Kafka setting, so the group id was never applied.
  properties.setProperty("group.id", group_id)
  properties
}

val queryResponse: OutputTag[QueryResponse] = OutputTag[QueryResponse]("QueryResponse")

def main(args: Array[String]) {
Expand All @@ -60,24 +71,38 @@ object OML_Job {

/** The parameter server messages. */
val psMessages: DataStream[ControlMessage] = env
.addSource(KafkaUtils.KafkaTypeConsumer[ControlMessage]("psMessages"))
.addSource(new FlinkKafkaConsumer[ControlMessage](
params.get("psMessagesTopic", "psMessages"),
new TypeInformationSerializationSchema(createTypeInformation[ControlMessage], env.getConfig),
createProperties("psMessagesAddr", "psMessagesConsumer"))
.setStartFromLatest())
.name("FeedBackLoopSource")

/** The incoming training data. */
val trainingSource: DataStream[DataInstance] = env.addSource(
KafkaUtils.KafkaStringConsumer("trainingData")
).flatMap(DataInstanceParser())
new FlinkKafkaConsumer[String](params.get("trainingDataTopic", "trainingData"),
new SimpleStringSchema(),
createProperties("trainingDataAddr", "trainingDataConsumer"))
.setStartFromEarliest())
.flatMap(DataInstanceParser())
.name("TrainingSource")

/** The incoming forecasting data. */
val forecastingSource: DataStream[DataInstance] = env.addSource(
KafkaUtils.KafkaStringConsumer("forecastingData")
).flatMap(DataInstanceParser())
new FlinkKafkaConsumer[String](params.get("forecastingDataTopic", "forecastingData"),
new SimpleStringSchema(),
createProperties("forecastingDataAddr", "forecastingDataConsumer"))
.setStartFromEarliest())
.flatMap(DataInstanceParser())
.name("ForecastingSource")

/** The incoming requests. */
val requests: DataStream[Request] = env.addSource(
KafkaUtils.KafkaStringConsumer("requests")
).flatMap(RequestParser())
new FlinkKafkaConsumer[String](params.get("requestsTopic", "requests"),
new SimpleStringSchema(),
createProperties("requestsAddr", "requestsConsumer"))
.setStartFromEarliest())
.flatMap(RequestParser())
.name("RequestSource")


Expand Down Expand Up @@ -127,7 +152,10 @@ object OML_Job {

/** The Kafka iteration for emulating parameter server messages. */
coordinator
.addSink(KafkaUtils.kafkaTypeProducer[ControlMessage]("psMessages"))
.addSink(new FlinkKafkaProducer[ControlMessage](
params.get("psMessagesAddr", "localhost:9092"), // broker list
params.get("psMessagesTopic", "psMessages"), // target topic
new TypeInformationSerializationSchema(createTypeInformation[ControlMessage], env.getConfig)))
.name("FeedbackLoop")


Expand Down Expand Up @@ -185,13 +213,19 @@ object OML_Job {
/** A Kafka sink for the predictions. */
predictionStream
.map(x => x.toString)
.addSink(KafkaUtils.kafkaStringProducer("predictions"))
.addSink(new FlinkKafkaProducer[String](
params.get("predictionsAddr", "localhost:9092"), // broker list
params.get("predictionsTopic", "predictions"), // target topic
new SimpleStringSchema()))
.name("PredictionsSink")

/** A Kafka Sink for the query responses. */
worker.getSideOutput(queryResponse)
.map(x => x.toString)
.addSink(KafkaUtils.kafkaStringProducer("responses"))
.addSink(new FlinkKafkaProducer[String](
params.get("responsesAddr", "localhost:9092"), // broker list
params.get("responsesTopic", "responses"), // target topic
new SimpleStringSchema()))
.name("ResponsesSink")


Expand Down

0 comments on commit 7528541

Please sign in to comment.