Skip to content

Commit

Permalink
Merge pull request #246 from srdc/kafka-reader-update
Browse files Browse the repository at this point in the history
Kafka reader update
  • Loading branch information
dogukan10 authored Nov 8, 2024
2 parents e9f5ea8 + 1e8022b commit 8ce54bf
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 26 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,9 @@ Mapping job and mapping examples shown below for the streaming type of sources l
"source": {
"jsonClass": "KafkaSource",
"topicName": "patients",
"groupId": "tofhir",
"startingOffsets": "earliest"
"options": {
"startingOffsets": "earliest"
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ class KafkaSourceReader(spark: SparkSession) extends BaseDataSourceReader[KafkaS
.format("kafka")
.option("kafka.bootstrap.servers", mappingJobSourceSettings.bootstrapServers)
.option("subscribe", mappingSourceBinding.topicName)
.option("startingOffsets", mappingSourceBinding.startingOffsets)
.option("inferSchema", value = true)
.options(mappingSourceBinding.options)
.load()
Expand All @@ -143,13 +142,15 @@ class KafkaSourceReader(spark: SparkSession) extends BaseDataSourceReader[KafkaS
.select("record.*")
}
else {
      // Filter out the 'startingOffsets' option as it is always supposed to be "earliest" for batch Kafka reads
val filteredOptions = mappingSourceBinding.options.filterNot(o => o._1 == "startingOffsets")
spark
.read // Use read instead of readStream for a batch read
.format("kafka")
.option("kafka.bootstrap.servers", mappingJobSourceSettings.bootstrapServers)
.option("subscribe", mappingSourceBinding.topicName)
.option("inferSchema", value = true)
.options(mappingSourceBinding.options)
.options(filteredOptions)
.load()
.select($"value".cast(StringType)) // change the type of message from binary to string
.withColumn("value", processDataUDF(col("value"))) // replace 'value' column with the processed data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,9 @@ case class SqlSource(tableName: Option[String] = None, query: Option[String] = N
* Context/configuration for one of the source of the mapping that will read the source data from a kafka as stream
*
 * @param topicName       The topic(s) to subscribe, may be a comma-separated string list (topic1,topic2)
* @param groupId The Kafka group id to use in Kafka consumer while reading from Kafka
* @param startingOffsets The start point when a query is started
* @param options Further options for Kafka source (Spark Kafka Guide -> https://spark.apache.org/docs/3.4.1/structured-streaming-kafka-integration.html)
* @param options Further options for Kafka source (Spark Kafka Guide -> https://spark.apache.org/docs/3.5.1/structured-streaming-kafka-integration.html)
*/
case class KafkaSource(topicName: String, groupId: String, startingOffsets: String, options:Map[String, String] = Map.empty[String, String], override val preprocessSql: Option[String] = None, override val sourceRef: Option[String] = None) extends MappingSourceBinding
case class KafkaSource(topicName: String, options:Map[String, String] = Map.empty[String, String], override val preprocessSql: Option[String] = None, override val sourceRef: Option[String] = None) extends MappingSourceBinding

/**
* Represents a mapping source binding for FHIR server data.
Expand Down
5 changes: 3 additions & 2 deletions tofhir-engine/src/test/resources/test-mapping-job-kafka.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
"source": {
"jsonClass": "KafkaSource",
"topicName": "patients",
"groupId": "tofhir",
"startingOffsets": "earliest"
"options": {
"startingOffsets": "earliest"
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ class KafkaSourceIntegrationTest extends AnyFlatSpec with ToFhirTestSpec with Be
val patientMappingTask: FhirMappingTask = FhirMappingTask(
name = "patient-mapping",
mappingRef = "https://aiccelerate.eu/fhir/mappings/patient-mapping",
sourceBinding = Map("source" -> KafkaSource(topicName = "patients", groupId = "tofhir", startingOffsets = "earliest", sourceRef = Some("kafka-source")))
sourceBinding = Map("source" -> KafkaSource(topicName = "patients", sourceRef = Some("kafka-source"), options = Map("startingOffsets" -> "earliest")))
)
val otherObservationMappingTask: FhirMappingTask = FhirMappingTask(
name = "other-observation-mapping",
mappingRef = "https://aiccelerate.eu/fhir/mappings/other-observation-mapping",
sourceBinding = Map("source" -> KafkaSource(topicName = "observations", groupId = "tofhir", startingOffsets = "earliest", sourceRef = Some("kafka-source")))
sourceBinding = Map("source" -> KafkaSource(topicName = "observations", sourceRef = Some("kafka-source"), options = Map("startingOffsets" -> "earliest")))
)
val familyMemberHistoryMappingTask: FhirMappingTask = FhirMappingTask(
name = "family-member-history-mapping",
mappingRef = "https://aiccelerate.eu/fhir/mappings/family-member-history-mapping",
sourceBinding = Map("source" -> KafkaSource(topicName = "familyMembers", groupId = "tofhir", startingOffsets = "earliest", sourceRef = Some("kafka-source")))
sourceBinding = Map("source" -> KafkaSource(topicName = "familyMembers", sourceRef = Some("kafka-source"), options = Map("startingOffsets" -> "earliest")))
)

val fhirMappingJob: FhirMappingJob = FhirMappingJob(
Expand Down Expand Up @@ -241,7 +241,7 @@ class KafkaSourceIntegrationTest extends AnyFlatSpec with ToFhirTestSpec with Be
})
consumer.unsubscribe()
// modify familyMemberHistoryMappingTask to listen to familyMembersCorrupted topic
val mappingTask = familyMemberHistoryMappingTask.copy(sourceBinding = Map("source" -> KafkaSource(topicName = "familyMembersCorrupted", groupId = "tofhir", startingOffsets = "earliest")))
val mappingTask = familyMemberHistoryMappingTask.copy(sourceBinding = Map("source" -> KafkaSource(topicName = "familyMembersCorrupted", options = Map("startingOffsets" -> "earliest"))))
val execution: FhirMappingJobExecution = FhirMappingJobExecution(job = fhirMappingJob, mappingTasks = Seq(mappingTask))
val streamingQueryFutures: Map[String, Future[StreamingQuery]] = fhirMappingJobManager.startMappingJobStream(
mappingJobExecution = execution,
Expand Down
13 changes: 2 additions & 11 deletions tofhir-server/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2197,18 +2197,10 @@ components:
topicName:
type: string
description: "Topic(s) to subscribe to, may be comma-separated string list (topic1,topic2)"
groupId:
type: string
description: "Kafka group id to use in Kafka consumer while reading from Kafka"
startingOffsets:
type: string
description: "The start point when a query is started"
options:
$ref: "#/components/schemas/SparkOptions"
required:
- topicName
- groupId
- startingOffsets

FhirServerSource:
description: "Defines FHIR server source configurations of a mapping in the mapping-job"
Expand Down Expand Up @@ -3542,11 +3534,10 @@ components:
source:
sourceRef: "source3"
jsonClass: "KafkaSource"
topicName: "topic"
options:
startingOffsets: "latest"
maxFilesPerTrigger: "1"
topicName: "topic"
groupId: "group"
startingOffsets: "latest"
- name: "example-mapping-4"
mappingRef: "https://aiccelerate.eu/fhir/mappings/pilot1/preoperative-risks-mapping"
sourceBinding:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class JobEndpointTest extends BaseEndpointTest {
val kafkaSourceSettings: KafkaSourceSettings = KafkaSourceSettings(name = "kafka-source", sourceUri = "http://example.com/kafka", bootstrapServers = "http://some-kafka-server:9092")
val mappingJobSourceSettings: Map[String, MappingJobSourceSettings] =
Map("source1" -> kafkaSourceSettings)
val kafkaMappingTask: Seq[FhirMappingTask] = Seq(FhirMappingTask("mappingTest1", "mappingRef1", Map("sourceBinding1" -> KafkaSource(topicName = "topic", sourceRef = Some("source1"), groupId = "group", startingOffsets = "latest"))))
val kafkaMappingTask: Seq[FhirMappingTask] = Seq(FhirMappingTask("mappingTest1", "mappingRef1", Map("sourceBinding1" -> KafkaSource(topicName = "topic", sourceRef = Some("source1"), options = Map("startingOffsets" -> "latest")))))
val kafkaSourceJob: FhirMappingJob = FhirMappingJob(name = Some("mappingJob2"), sourceSettings = mappingJobSourceSettings, sinkSettings = sinkSettings, mappings = kafkaMappingTask, dataProcessingSettings = DataProcessingSettings())
// a malformed job with a source reference to a missing data source in the mapping tasks, to be rejected
val malformedMappingTasks: Seq[FhirMappingTask] = Seq(FhirMappingTask("mappingTest2","mappingRef1", Map("sourceBinding1" -> SqlSource(tableName = Some("table"), sourceRef = Some("source2")))))
Expand Down

0 comments on commit 8ce54bf

Please sign in to comment.