diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/streaming/BaseDatasetProcessFunction.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/streaming/BaseDatasetProcessFunction.scala
index fede2b54..9c454ec0 100644
--- a/dataset-registry/src/main/scala/org/sunbird/obsrv/streaming/BaseDatasetProcessFunction.scala
+++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/streaming/BaseDatasetProcessFunction.scala
@@ -28,11 +28,11 @@ trait SystemEventHandler {
}
private def getTime(timespans: Map[String, AnyRef], producer: Producer): Option[Long] = {
- timespans.get(producer.toString).map(f => f.asInstanceOf[Long])
+ timespans.get(producer.toString).map(f => f.asInstanceOf[Number].longValue())
}
private def getStat(obsrvMeta: Map[String, AnyRef], stat: Stats): Option[Long] = {
- obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Long])
+ obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Number].longValue())
}
def getError(error: ErrorConstants.Error, producer: Producer, functionalError: FunctionalError): Option[ErrorLog] = {
diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala b/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala
index 370353c7..d68b924a 100644
--- a/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala
+++ b/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala
@@ -46,6 +46,40 @@ class MapDeserializationSchema extends KafkaRecordDeserializationSchema[mutable.
}
+class TopicDeserializationSchema extends KafkaRecordDeserializationSchema[mutable.Map[String, AnyRef]] {
+
+ private val serialVersionUID = -3224825136576915426L
+
+ override def getProducedType: TypeInformation[mutable.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[mutable.Map[String, AnyRef]])
+
+ override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[mutable.Map[String, AnyRef]]): Unit = {
+ val msg = try {
+ val event = JSONUtil.deserialize[Map[String, AnyRef]](record.value())
+ mutable.Map[String, AnyRef](
+ "dataset" -> record.topic(),
+ "event" -> event
+ )
+ } catch {
+ case _: Exception =>
+ mutable.Map[String, AnyRef](Constants.INVALID_JSON -> new String(record.value, "UTF-8"))
+ }
+ initObsrvMeta(msg, record)
+ out.collect(msg)
+ }
+
+ private def initObsrvMeta(msg: mutable.Map[String, AnyRef], record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
+ if (!msg.contains("obsrv_meta")) {
+ msg.put("obsrv_meta", Map(
+ "syncts" -> record.timestamp(),
+ "processingStartTime" -> System.currentTimeMillis(),
+ "flags" -> Map(),
+ "timespans" -> Map(),
+ "error" -> Map()
+ ))
+ }
+ }
+}
+
class StringDeserializationSchema extends KafkaRecordDeserializationSchema[String] {
private val serialVersionUID = -3224825136576915426L
diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseStreamTask.scala b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseStreamTask.scala
index 8ebdb8a7..bdc897da 100644
--- a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseStreamTask.scala
+++ b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseStreamTask.scala
@@ -38,6 +38,13 @@ abstract class BaseStreamTask[T] extends BaseStreamTaskSink[T] {
.rebalance()
}
+ def getTopicMapDataStream(env: StreamExecutionEnvironment, config: BaseJobConfig[T], kafkaTopics: List[String],
+ consumerSourceName: String, kafkaConnector: FlinkKafkaConnector): DataStream[mutable.Map[String, AnyRef]] = {
+ env.fromSource(kafkaConnector.kafkaTopicMapSource(kafkaTopics), WatermarkStrategy.noWatermarks[mutable.Map[String, AnyRef]](), consumerSourceName)
+ .uid(consumerSourceName).setParallelism(config.kafkaConsumerParallelism)
+ .rebalance()
+ }
+
def getStringDataStream(env: StreamExecutionEnvironment, config: BaseJobConfig[T], kafkaConnector: FlinkKafkaConnector): DataStream[String] = {
env.fromSource(kafkaConnector.kafkaStringSource(config.inputTopic()), WatermarkStrategy.noWatermarks[String](), config.inputConsumer())
.uid(config.inputConsumer()).setParallelism(config.kafkaConsumerParallelism)
diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/FlinkKafkaConnector.scala b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/FlinkKafkaConnector.scala
index 508e1e7c..39552dd7 100644
--- a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/FlinkKafkaConnector.scala
+++ b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/FlinkKafkaConnector.scala
@@ -47,6 +47,15 @@ class FlinkKafkaConnector(config: BaseJobConfig[_]) extends Serializable {
.build()
}
+ def kafkaTopicMapSource(kafkaTopics: List[String]): KafkaSource[mutable.Map[String, AnyRef]] = {
+ KafkaSource.builder[mutable.Map[String, AnyRef]]()
+ .setTopics(kafkaTopics.asJava)
+ .setDeserializer(new TopicDeserializationSchema)
+ .setProperties(config.kafkaConsumerProperties())
+ .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
+ .build()
+ }
+
def kafkaMapDynamicSink(): KafkaSink[mutable.Map[String, AnyRef]] = {
KafkaSink.builder[mutable.Map[String, AnyRef]]()
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
diff --git a/pipeline/cache-indexer/pom.xml b/pipeline/cache-indexer/pom.xml
index 7d9ed5a8..36d76208 100644
--- a/pipeline/cache-indexer/pom.xml
+++ b/pipeline/cache-indexer/pom.xml
@@ -37,24 +37,12 @@
org.sunbird.obsrv
dataset-registry
1.0.0
-
-
- org.apache.kafka
- kafka-clients
-
-
org.json4s
json4s-native_${scala.maj.version}
4.0.6
-
- org.apache.kafka
- kafka-clients
- ${kafka.version}
- test
-
org.apache.kafka
kafka_${scala.maj.version}
diff --git a/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/streaming/CacheIndexerStreamTask.scala b/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/streaming/CacheIndexerStreamTask.scala
index 61b9ddec..735440b7 100644
--- a/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/streaming/CacheIndexerStreamTask.scala
+++ b/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/streaming/CacheIndexerStreamTask.scala
@@ -32,7 +32,7 @@ class CacheIndexerStreamTask(config: CacheIndexerConfig, kafkaConnector: FlinkKa
val datasets = DatasetRegistry.getAllDatasets(Some(DatasetType.master.toString))
val datasetIds = datasets.map(f => f.id)
- val dataStream = getMapDataStream(env, config, datasetIds, config.kafkaConsumerProperties(), consumerSourceName = s"cache-indexer-consumer", kafkaConnector)
+ val dataStream = getTopicMapDataStream(env, config, datasetIds, consumerSourceName = s"cache-indexer-consumer", kafkaConnector)
processStream(dataStream)
}
diff --git a/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/util/MasterDataCache.scala b/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/util/MasterDataCache.scala
index c3365255..c5f95f32 100644
--- a/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/util/MasterDataCache.scala
+++ b/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/util/MasterDataCache.scala
@@ -1,9 +1,10 @@
package org.sunbird.obsrv.util
import org.json4s.native.JsonMethods._
-import org.json4s.{JNothing, JValue}
+import org.json4s.{JField, JNothing, JValue}
import org.slf4j.LoggerFactory
import org.sunbird.obsrv.core.cache.RedisConnect
+import org.sunbird.obsrv.core.model.Constants.OBSRV_META
import org.sunbird.obsrv.model.DatasetModels.Dataset
import org.sunbird.obsrv.pipeline.task.CacheIndexerConfig
import redis.clients.jedis.Jedis
@@ -37,7 +38,11 @@ class MasterDataCache(val config: CacheIndexerConfig) {
def process(dataset: Dataset, key: String, event: JValue): (Int, Int) = {
val jedis = this.datasetPipelineMap(dataset.id)
val dataFromCache = getDataFromCache(dataset, key, jedis)
- updateCache(dataset, dataFromCache, key, event, jedis)
+ val updatedEvent = event.removeField {
+ case JField(OBSRV_META, _) => true
+ case _ => false
+ }
+ updateCache(dataset, dataFromCache, key, updatedEvent, jedis)
(if (dataFromCache == null) 1 else 0, if (dataFromCache == null) 0 else 1)
}
diff --git a/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/fixture/EventFixture.scala b/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/fixture/EventFixture.scala
index cf28aec5..078cde33 100644
--- a/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/fixture/EventFixture.scala
+++ b/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/fixture/EventFixture.scala
@@ -2,9 +2,9 @@ package org.sunbird.obsrv.fixture
object EventFixture {
- val VALID_BATCH_EVENT_D3_INSERT = """{"dataset":"dataset3","event":{"code":"HYUN-CRE-D6","manufacturer":"Hyundai","model":"Creta","variant":"SX(O)","modelYear":"2023","price":"2200000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"}}}"""
- val VALID_BATCH_EVENT_D3_INSERT_2 = """{"dataset":"dataset3","event":{"code":"HYUN-TUC-D6","manufacturer":"Hyundai","model":"Tucson","variant":"Signature","modelYear":"2023","price":"4000000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"admin.hyun@gmail.com","locationId":"KUN134567"}}}"""
- val VALID_BATCH_EVENT_D3_UPDATE = """{"dataset":"dataset3","event":{"code":"HYUN-CRE-D6","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"},"safety":"3 Star (Global NCAP)","seatingCapacity":5}}"""
- val VALID_BATCH_EVENT_D4 = """{"dataset":"dataset4","event":{"code":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}}"""
- val INVALID_BATCH_EVENT_D4 = """{"dataset":"dataset4","event":{"code1":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}}"""
+ val VALID_BATCH_EVENT_D3_INSERT = """{"code":"HYUN-CRE-D6","manufacturer":"Hyundai","model":"Creta","variant":"SX(O)","modelYear":"2023","price":"2200000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"}}"""
+ val VALID_BATCH_EVENT_D3_INSERT_2 = """{"code":"HYUN-TUC-D6","manufacturer":"Hyundai","model":"Tucson","variant":"Signature","modelYear":"2023","price":"4000000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"admin.hyun@gmail.com","locationId":"KUN134567"}}"""
+ val VALID_BATCH_EVENT_D3_UPDATE = """{"code":"HYUN-CRE-D6","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"},"safety":"3 Star (Global NCAP)","seatingCapacity":5}"""
+ val VALID_BATCH_EVENT_D4 = """{"code":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}"""
+ val INVALID_BATCH_EVENT_D4 = """{"code1":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}"""
}