FHIR data can be read from a FHIR endpoint and joined during mappings. #196

Merged (3 commits, Jul 3, 2024)
1 change: 1 addition & 0 deletions .gitignore
@@ -10,6 +10,7 @@
 tofhir-db/
 projects.json
 terminology-systems.json
+testcontext/
 
 # Spark
 checkpoint/
10 changes: 10 additions & 0 deletions pom.xml
@@ -275,6 +275,16 @@
 <artifactId>fhir-api-spark-source_${scala.binary.version}</artifactId>
 <version>${spark-on-fhir.version}</version>
 </dependency>
+<!-- <dependency>-->
+<!-- <groupId>io.onfhir</groupId>-->
+<!-- <artifactId>fhir-spark-util</artifactId>-->
+<!-- <version>${spark-on-fhir.version}</version>-->
+<!-- </dependency>-->
+<!-- <dependency>-->
+<!-- <groupId>io.onfhir</groupId>-->
+<!-- <artifactId>fhir-path-spark_${scala.binary.version}</artifactId>-->
+<!-- <version>${spark-on-fhir.version}</version>-->
+<!-- </dependency>-->
 
 <!-- JSON4S -->
 <dependency>
1 change: 1 addition & 0 deletions tofhir-engine/pom.xml
@@ -190,6 +190,7 @@
 <groupId>io.onfhir</groupId>
 <artifactId>fhir-api-spark-source_${scala.binary.version}</artifactId>
 </dependency>
+
 <!-- toFHIR dependencies -->
 <dependency>
 <groupId>io.onfhir</groupId>
FhirServerDataSourceReader.scala
@@ -1,7 +1,7 @@
 package io.tofhir.engine.data.read
 
 import io.onfhir.spark.reader.FhirApiReader.OPTIONS
-import io.tofhir.engine.model.{BasicAuthenticationSettings, BearerTokenAuthorizationSettings, FhirServerSource, FhirServerSourceSettings, FixedTokenAuthenticationSettings}
+import io.tofhir.engine.model._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
@@ -35,15 +35,24 @@ class FhirServerDataSourceReader(spark: SparkSession) extends BaseDataSourceReader
   override def read(mappingSource: FhirServerSource, sourceSettings: FhirServerSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
     // extract Spark option for the authentication from the given source settings
     val authenticationOptions = extractAuthenticationOptions(sourceSettings)
-    // read data from a FHIR Server using a custom Spark data source i.e. io.onfhir.spark.reader.FhirApiTableProvider
-    spark
+
+    /*
+    val fhirConfig = SparkFhirConfig.apply("R5")
+    val schemaUtil = new SparkSchemaUtil(fhirConfig)
+    //Get the schema of the resource type from the provided FHIR release (R4 or R5)
+    val resourceSchema = schemaUtil.getSparkSchemaForResourceType(mappingSource.resourceType).get
+    */
+
+    import io.onfhir.spark.reader.FhirApiReader._
+    implicit val implicitSpark: SparkSession = spark
+    implicitSpark
       .read
-      .format("io.onfhir.spark.reader.FhirApiTableProvider")
-      .option("url", sourceSettings.serverUrl)
-      .option("rtype", mappingSource.resourceType)
-      .option("query", s"?${mappingSource.query.getOrElse("")}")
+      .fhir(sourceSettings.serverUrl)
+      .on(mappingSource.resourceType)
       .options(authenticationOptions)
-      .load()
+      .load(/*resourceSchema*/)
+    // If we use the schema of a resource type, spark processing takes too long, probably because of the huge size of the schema.
+    // Hence, we load without a schema; FhirApiReader internally utilizes spark.read.json to parse the retrieved FHIR resources.
   }
 
   /**
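The rewritten reader drops the option-based .format("io.onfhir.spark.reader.FhirApiTableProvider") configuration in favor of the fluent .fhir(...)/.on(...) builders that the implicits in io.onfhir.spark.reader.FhirApiReader make available when an implicit SparkSession is in scope. Below is a minimal sketch of that usage outside the engine; the endpoint URL and resource type are illustrative, and it assumes only the imports and calls visible in the diff above.

```scala
import io.onfhir.spark.reader.FhirApiReader._
import org.apache.spark.sql.{DataFrame, SparkSession}

object FhirReadSketch {
  def main(args: Array[String]): Unit = {
    // The implicit SparkSession is required by FhirApiReader's fluent builders.
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("fhir-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // Read Observation resources from a (hypothetical) FHIR endpoint.
    // load() is deliberately called without a schema: as the PR notes, supplying
    // the full resource-type schema makes Spark processing very slow, so
    // FhirApiReader instead parses the retrieved resources via spark.read.json.
    val observations: DataFrame = spark.read
      .fhir("http://localhost:8080/fhir")
      .on("Observation")
      .load()

    observations.printSchema()
    spark.stop()
  }
}
```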
FhirMappingJobManager.scala
@@ -8,7 +8,7 @@ import io.tofhir.engine.data.write.{BaseFhirWriter, FhirWriterFactory, SinkHandler}
 import io.tofhir.engine.model._
 import it.sauronsoftware.cron4j.SchedulingPattern
 import org.apache.spark.SparkThrowable
-import org.apache.spark.sql.functions.{collect_list, struct}
+import org.apache.spark.sql.functions.{collect_list, struct, udf}
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

@@ -33,7 +33,7 @@ class FhirMappingJobManager(
   fhirMappingRepository: IFhirMappingRepository,
   contextLoader: IMappingContextLoader,
   schemaLoader: IFhirSchemaLoader,
-  functionLibraries : Map[String, IFhirPathFunctionLibraryFactory],
+  functionLibraries: Map[String, IFhirPathFunctionLibraryFactory],
   spark: SparkSession,
   mappingJobScheduler: Option[MappingJobScheduler] = Option.empty
 )(implicit ec: ExecutionContext) extends IFhirMappingJobManager {
@@ -126,7 +126,7 @@ class FhirMappingJobManager(
       .recover {
         case e: Throwable =>
           val jobResult = FhirMappingJobResult(mappingJobExecution, Some(t.mappingRef), status = Some(FhirMappingJobResult.FAILURE))
-          logger.error(jobResult.toMapMarker, jobResult.toString,e)
+          logger.error(jobResult.toMapMarker, jobResult.toString, e)
           throw e
       }
   })
@@ -185,10 +185,10 @@ class FhirMappingJobManager(
    * Runnable for scheduled periodic mapping job
    *
    * @param mappingJobExecution Mapping job execution
-   * @param startTime Initial start time for source data
-   * @param sourceSettings Settings for the source system
-   * @param sinkSettings FHIR sink settings/configurations
-   * @param schedulingSettings Scheduling information
+   * @param startTime           Initial start time for source data
+   * @param sourceSettings      Settings for the source system
+   * @param sinkSettings        FHIR sink settings/configurations
+   * @param schedulingSettings  Scheduling information
    * @return
    */
  private def runnableMappingJob(mappingJobExecution: FhirMappingJobExecution,
@@ -282,8 +282,8 @@ class FhirMappingJobManager(
     // Using Future.apply to convert the result of readJoinSourceData into a Future
     // ensuring that if there's an error in readJoinSourceData, it will be propagated as a failed future
     Future.apply(readJoinSourceData(task, sourceSettings, timeRange, jobId = Some(jobId))) flatMap {
-      case (fhirMapping, mds, df) => executeTask(jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, executionId, projectId = projectId)
-    }
+      case (fhirMapping, mds, df) => executeTask(jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, executionId, projectId = projectId)
+    }
   }
 
   /**
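The comment in this hunk captures a subtle point: Future.apply(expr) evaluates expr on an execution context, so an exception thrown by readJoinSourceData becomes a failed Future instead of escaping synchronously from the enclosing method. A minimal sketch of the difference, with a made-up failing function:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object FuturePropagationSketch {
  def readSomething(): Int = throw new IllegalStateException("source not readable")

  def main(args: Array[String]): Unit = {
    // Wrapped: the exception is captured as a failed Future and can be recovered.
    val safe: Future[Int] = Future.apply(readSomething())
    safe.recover { case e => println(s"propagated as failed future: ${e.getMessage}"); -1 }

    // Unwrapped: readSomething() would throw right here, before any Future exists:
    // val unsafe: Future[Int] = Future.successful(readSomething())

    Thread.sleep(500) // allow the recover callback to run before the JVM exits
  }
}
```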
@@ -306,19 +306,19 @@ class FhirMappingJobManager(
                             timeRange: Option[(LocalDateTime, LocalDateTime)] = None): Future[Unit] = {
     val mappingTask = mappingJobExecution.mappingTasks.head
     logger.debug(s"Reading source data for mapping ${mappingTask.mappingRef} within mapping job ${mappingJobExecution.jobId} ...")
-    val (fhirMapping, mds, df) = readJoinSourceData(mappingTask, sourceSettings, timeRange, jobId = Some(mappingJobExecution.jobId)) // FIXME: Why reading again below?
+    val (fhirMapping, mds, df) = readJoinSourceData(mappingTask, sourceSettings, timeRange, jobId = Some(mappingJobExecution.jobId))
     val sizeOfDf: Long = df.count()
     logger.debug(s"$sizeOfDf records read for mapping ${mappingTask.mappingRef} within mapping job ${mappingJobExecution.jobId} ...")
 
     ToFhirConfig.engineConfig.maxBatchSizeForMappingJobs match {
       //If not specify run it as single batch
       case None =>
         logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} ...")
-        readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId)) // Retrieve the source data and execute the mapping
+        executeTask(mappingJobExecution.jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), Some(mappingJobExecution.projectId))
           .map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
       case Some(batchSize) if sizeOfDf < batchSize =>
         logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} ...")
-        readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId)) // Retrieve the source data and execute the mapping
+        executeTask(mappingJobExecution.jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), Some(mappingJobExecution.projectId))
           .map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
       //Otherwise divide the data into batches
       case Some(batchSize) =>
@@ -377,7 +377,7 @@ class FhirMappingJobManager(
       sources.map {
         case (alias, schema, sourceContext, sourceStt, timeRange) =>
           alias ->
-            SourceHandler.readSource( alias, spark, sourceContext, sourceStt, schema, timeRange, jobId = jobId)
+            SourceHandler.readSource(alias, spark, sourceContext, sourceStt, schema, timeRange, jobId = jobId)
       }
 
     val df = handleJoin(fhirMapping.source, sourceDataFrames)
@@ -421,12 +421,12 @@ class FhirMappingJobManager(
         .toSeq
         .map(cdef => contextLoader.retrieveContext(cdef._2).map(context => cdef._1 -> context))
     ).map(loadedContextMap => {
-      //Get configuration context
-      val configurationContext = mainSourceSettings.toConfigurationContext
-      //Construct the mapping service
-      val fhirMappingService = new FhirMappingService(jobId, fhirMapping.url, fhirMapping.source.map(_.alias), (loadedContextMap :+ configurationContext).toMap, fhirMapping.mapping, fhirMapping.variable, mainSourceSettings.columnToConvert ,terminologyServiceSettings, identityServiceSettings, functionLibraries,projectId)
-      MappingTaskExecutor.executeMapping(spark, df, fhirMappingService, executionId)
-    })
+      //Get configuration context
+      val configurationContext = mainSourceSettings.toConfigurationContext
+      //Construct the mapping service
+      val fhirMappingService = new FhirMappingService(jobId, fhirMapping.url, fhirMapping.source.map(_.alias), (loadedContextMap :+ configurationContext).toMap, fhirMapping.mapping, fhirMapping.variable, terminologyServiceSettings, identityServiceSettings, functionLibraries, projectId)
+      MappingTaskExecutor.executeMapping(spark, df, fhirMappingService, executionId)
+    })
   }
 
   /**
@@ -437,31 +437,66 @@ class FhirMappingJobManager(
    * @return
    */
   private def handleJoin(sources: Seq[FhirMappingSource], sourceDataFrames: Seq[(String, DataFrame)]): DataFrame = {
+
+    // Rename a DataFrame's column name from dotted version to camelcase since dots have special meaning in Spark's column names.
+    def toCamelCase(input: String): String = {
+      input.split("\\.").toList match {
+        case Nil => ""
+        case head :: tail =>
+          head + tail.map(_.capitalize).mkString("")
+      }
+    }
+
+    // For each column which is a FHIR reference, remove FHIR resource name so that the joins can work.
+    // Patient/1234 -> 1234
+    def transformFhirReferenceColumns(joinCols: Seq[String], df: DataFrame): DataFrame = {
+      // This is a Spark UDF to remove FHIR resource names from values of FHIR references.
+      val fhirReferenceResourceNameRemoverUDF = udf((reference: String) => {
+        if (reference.matches("^[A-Z].*/.*$")) {
+          reference.substring(reference.indexOf('/') + 1)
+        } else {
+          reference
+        }
+      })
+      joinCols.filter(_.contains(".reference")).foldLeft(df) {
+        case (df, refColumn) => df.withColumn(toCamelCase(refColumn), fhirReferenceResourceNameRemoverUDF(df.col(toCamelCase(refColumn))))
+      }
+    }
+
     sourceDataFrames match {
       case Seq(_ -> df) => df
-      //If we have multiple sources
-      case _ =>
-        val mainSource = sourceDataFrames.head._1
+      case _ => //If we have multiple sources
+        val mainSource = sourceDataFrames.head._1 // We accept the 1st source as the main source and left-join the other sources on this main source.
         val mainJoinOnColumns = sources.find(_.alias == mainSource).get.joinOn
+
+        // Add the JSON object of the whole Row as a column to the DataFrame of the main source
         var mainDf = sourceDataFrames.head._2.withColumn(s"__$mainSource", struct("*"))
-        mainDf = mainDf.select((mainJoinOnColumns :+ s"__$mainSource").map(mainDf.col): _*)
+
+        // Find the values addressed by each column (they can be subject.reference or identifier.value (Spark navigates the DataFrame accordingly, like FHIRPath)),
+        // add them to the DataFrame with an alias for each column to convert subject.reference to subjectReference with toCamelCase function.
+        // Construct a DataFrame with these join columns and the JSON object of the Row.
+        mainDf = mainDf.select(
+          mainJoinOnColumns.map(c => mainDf.col(c).as(toCamelCase(c))) :+ mainDf.col(s"__$mainSource"): _*)
+
+        // This is a hack to remove the FHIR resource names from reference fields so that join can work!
+        mainDf = transformFhirReferenceColumns(mainJoinOnColumns, mainDf)
+
         //Group other dataframes on join columns and rename their join columns
         val otherDfs: Seq[(DataFrame, Seq[String])] =
           sourceDataFrames
-            .tail
+            .tail // The first source is the main and the rest are others to be left-joined
             .map {
               case (alias, df) =>
                 val joinColumnStmts = sources.find(_.alias == alias).get.joinOn
                 val colsToJoinOn = joinColumnStmts.filter(_ != null)
-                val groupedDf =
-                  df
-                    .groupBy(colsToJoinOn.map(df.col): _*)
-                    .agg(collect_list(struct("*")).as(s"__$alias"))
+
+                var groupedDf = df
+                  .groupBy(colsToJoinOn.map(c => df.col(c).as(toCamelCase(c))): _*)
+                  .agg(collect_list(struct("*")).as(s"__$alias"))
+                groupedDf = transformFhirReferenceColumns(colsToJoinOn, groupedDf)
                 if (colsToJoinOn.toSet.subsetOf(mainJoinOnColumns.toSet))
                   groupedDf -> colsToJoinOn
                 else {
-                  val actualJoinColumns = joinColumnStmts.zip(mainJoinOnColumns).filter(_._1 != null)
+                  val actualJoinColumns = joinColumnStmts.map(toCamelCase).zip(mainJoinOnColumns.map(toCamelCase)).filter(_._1 != null)
                   actualJoinColumns
                     .foldLeft(groupedDf) {
                       case (gdf, (c1, r1)) => gdf.withColumnRenamed(c1, r1)
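In summary, handleJoin now prepares every source so that a plain Spark equi-join can implement the FHIR-style join: dotted join paths such as subject.reference are aliased to camelCase column names (dots would otherwise be parsed as struct navigation), reference values like Patient/1234 are trimmed to 1234 by a UDF, and each non-main source is grouped into a list of rows per join key before being left-joined onto the main source. The following is a self-contained sketch of those steps on made-up data; it reuses the PR's toCamelCase and regex logic but none of the toFHIR classes.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, struct, udf}

object HandleJoinSketch {
  // Same rule as the PR's toCamelCase: "subject.reference" -> "subjectReference"
  def toCamelCase(input: String): String =
    input.split("\\.").toList match {
      case Nil => ""
      case head :: tail => head + tail.map(_.capitalize).mkString("")
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("join-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Same rule as the PR's UDF: "Patient/p1" matches "^[A-Z].*/.*$", so only "p1" is kept.
    val trimReference = udf((reference: String) =>
      if (reference != null && reference.matches("^[A-Z].*/.*$"))
        reference.substring(reference.indexOf('/') + 1)
      else reference)

    // Main source: Observations with a nested subject.reference field.
    val observations = Seq(("obs1", "Patient/p1"), ("obs2", "Patient/p2"))
      .toDF("obsId", "ref")
      .select($"obsId", struct($"ref".as("reference")).as("subject"))

    // Keep the whole row as a struct column, alias the dotted join path to camelCase,
    // then trim the resource name from the reference values.
    val mainDf = observations
      .withColumn("__obs", struct("*"))
      .select(col("subject.reference").as(toCamelCase("subject.reference")), col("__obs"))
      .withColumn("subjectReference", trimReference(col("subjectReference")))

    // Other source: Patients, grouped so each join key maps to the list of matching rows.
    val groupedPatients = Seq(("p1", "Alice"), ("p2", "Bob"))
      .toDF("id", "name")
      .groupBy(col("id"))
      .agg(collect_list(struct("*")).as("__patient"))

    // Left join the grouped source onto the main source, as handleJoin does.
    mainDf
      .join(groupedPatients, mainDf("subjectReference") === groupedPatients("id"), "left")
      .show(truncate = false)
  }
}
```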
FhirMappingService.scala
@@ -20,8 +20,6 @@ import scala.concurrent.Future
  * @param context Context data for mappings
  * @param mappings Mapping scripts
  * @param variables Variables defined in the mapping
- * @param columnToConvert Optional column name in the source DataFrame to be converted to a JObject i.e.
- *                        input to the mapping executor. If not provided, the entire row will be converted.
  * @param terminologyServiceSettings Settings for terminology service to use within mappings (e.g. lookupDisplay)
  * @param identityServiceSettings Settings for identity service to use within mappings (e.g. resolveIdentifier)
  * @param functionLibraries External function libraries containing functions to use in FHIRPath expressions
@@ -34,7 +32,6 @@ class FhirMappingService(
   context: Map[String, FhirMappingContext],
   mappings: Seq[FhirMappingExpression],
   variables: Seq[FhirExpression],
-  val columnToConvert: Option[String],
   terminologyServiceSettings: Option[TerminologyServiceSettings],
   identityServiceSettings: Option[IdentityServiceSettings],
   functionLibraries: Map[String, IFhirPathFunctionLibraryFactory],
LocalTerminologyService.scala
@@ -1,6 +1,7 @@
 package io.tofhir.engine.mapping
 
 import com.fasterxml.jackson.dataformat.csv.CsvReadException
+import io.onfhir.api.Resource
 import io.onfhir.api.service.IFhirTerminologyService
 import io.onfhir.api.util.{FHIRUtil, IOUtil}
 import io.tofhir.engine.mapping.LocalTerminologyService.{CodeSystemFileColumns, ConceptMapFileColumns, equivalenceCodes}
@@ -424,6 +425,13 @@ class LocalTerminologyService(settings:LocalFhirTerminologyServiceSettings) extends IFhirTerminologyService
           throw FhirMappingException(s"Invalid tofhir CodeSystem CSV file $filePath! Columns ${Set(CodeSystemFileColumns.CODE, CodeSystemFileColumns.DISPLAY).mkString(",")} are mandatory for code system definitions!",t)
       }
   }
+
+  /**
+   * These methods are not implemented for the LocalTerminologyService and will not be implemented in the future.
+   */
+  override def expandWithId(id: String, filter: Option[String], offset: Option[Long], count: Option[Long]): Future[JObject] = ???
+  override def expand(url: String, version: Option[String], filter: Option[String], offset: Option[Long], count: Option[Long]): Future[JObject] = ???
+  override def expandWithValueSet(valueSet: Resource, offset: Option[Long], count: Option[Long]): Future[JObject] = ???
 }
 
 object LocalTerminologyService {
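The three expand* overloads added above are stubbed with ???, which in Scala throws scala.NotImplementedError as soon as the method is entered, i.e. before any Future is constructed. A small illustrative sketch, independent of the service itself, of what a caller therefore has to expect:

```scala
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object ExpandStubSketch {
  // Mimics a stubbed overload such as LocalTerminologyService.expand.
  def expand(url: String): Future[String] = ???

  def main(args: Array[String]): Unit = {
    // ??? throws synchronously, so the failure surfaces here, not inside the Future.
    Try(expand("http://example.org/fhir/ValueSet/vs1")) match {
      case Failure(_: NotImplementedError) =>
        println("ValueSet expansion is not supported by this terminology service")
      case Failure(other) => throw other
      case Success(_)     => println("unexpected: expansion succeeded")
    }
  }
}
```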