From 3569ef6e36145235cf16d74118407eca869f96f5 Mon Sep 17 00:00:00 2001 From: emrecam Date: Mon, 1 Jul 2024 18:21:11 +0300 Subject: [PATCH] :sparkles: Add spark options to kafka and sql mapping sources --- .../io/tofhir/engine/data/read/KafkaSourceReader.scala | 1 + .../scala/io/tofhir/engine/data/read/SqlSourceReader.scala | 1 + .../main/scala/io/tofhir/engine/model/FhirMappingTask.scala | 6 ++++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/KafkaSourceReader.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/KafkaSourceReader.scala index 8d79b40e..80cda29b 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/KafkaSourceReader.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/KafkaSourceReader.scala @@ -132,6 +132,7 @@ class KafkaSourceReader(spark: SparkSession) extends BaseDataSourceReader[KafkaS .option("subscribe", mappingSource.topicName) .option("startingOffsets", mappingSource.startingOffsets) .option("inferSchema", value = true) + .options(mappingSource.options) .load() .select($"value".cast(StringType)) // change the type of message from binary to string .withColumn("value", processDataUDF(col("value"))) // replace 'value' column with the processed data diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/SqlSourceReader.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/SqlSourceReader.scala index bf0bcca0..37ad69f2 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/SqlSourceReader.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/SqlSourceReader.scala @@ -65,6 +65,7 @@ class SqlSourceReader(spark: SparkSession) extends BaseDataSourceReader[SqlSourc .option("dbtable", dbTable) .option("user", sourceSettings.username) .option("password", sourceSettings.password) + .options(mappingSource.options) .load() } } diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala index 99144acb..923176b6 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala @@ -96,8 +96,9 @@ case class FileSystemSource(path: String, fileFormat:Option[String] = None, opti * * @param tableName Name of the table * @param query Query to execute in the database + * @param options Further options for SQL source (Spark SQL Guide -> https://spark.apache.org/docs/3.4.1/sql-data-sources-jdbc.html ). */ -case class SqlSource(tableName: Option[String] = None, query: Option[String] = None, override val preprocessSql: Option[String] = None) extends FhirMappingSourceContext +case class SqlSource(tableName: Option[String] = None, query: Option[String] = None, options:Map[String, String] = Map.empty[String, String], override val preprocessSql: Option[String] = None) extends FhirMappingSourceContext /** * Context/configuration for one of the source of the mapping that will read the source data from a kafka as stream @@ -105,8 +106,9 @@ case class SqlSource(tableName: Option[String] = None, query: Option[String] = N * @param topicName The topic(s) to subscribe, may be comma seperated string list (topic1,topic2) * @param groupId The Kafka group id to use in Kafka consumer while reading from Kafka * @param startingOffsets The start point when a query is started + * @param options Further options for Kafka source (Spark Kafka Guide -> https://spark.apache.org/docs/3.4.1/structured-streaming-kafka-integration.html) */ -case class KafkaSource(topicName: String, groupId: String, startingOffsets: String, override val preprocessSql: Option[String] = None) extends FhirMappingSourceContext +case class KafkaSource(topicName: String, groupId: String, startingOffsets: String, options:Map[String, String] = Map.empty[String, String], override val preprocessSql: Option[String] = None) extends FhirMappingSourceContext /** * Represents a mapping source context for FHIR server data.