Skip to content

Commit

Permalink
✨ when testing mappings, group FHIR resources by each row in the inpu…
Browse files Browse the repository at this point in the history
…t, and also show joined secondary inputs (#236)
  • Loading branch information
YemreGurses authored Oct 3, 2024
1 parent 9edcfe5 commit f46a240
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ import scala.concurrent.Future
* @param identityServiceSettings Settings for identity service to use within mappings (e.g. resolveIdentifier)
* @param functionLibraries External function libraries containing functions to use in FHIRPath expressions
* @param projectId Project identifier associated with the mapping job
* @param isForTesting Flag indicating whether the mapping is being tested
* (if true, mapped FHIR resources are grouped by input row in the FhirMappingResult)
*/
class FhirMappingService(
val jobId: String,
val mappingTaskName: String,
val sources: Seq[String],
context: Map[String, FhirMappingContext],
mappings: Seq[FhirMappingExpression],
variables: Seq[FhirExpression],
terminologyServiceSettings: Option[TerminologyServiceSettings],
identityServiceSettings: Option[IdentityServiceSettings],
functionLibraries: Map[String, IFhirPathFunctionLibraryFactory],
val projectId: Option[String]
class FhirMappingService(val jobId: String,
val mappingTaskName: String,
val sources: Seq[String],
context: Map[String, FhirMappingContext],
mappings: Seq[FhirMappingExpression],
variables: Seq[FhirExpression],
terminologyServiceSettings: Option[TerminologyServiceSettings],
identityServiceSettings: Option[IdentityServiceSettings],
functionLibraries: Map[String, IFhirPathFunctionLibraryFactory],
val projectId: Option[String],
val isForTesting: Boolean = false
) extends IFhirMappingService {

lazy val terminologyService = terminologyServiceSettings.map(setting => IntegratedServiceFactory.createTerminologyService(setting))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import io.tofhir.common.util.ExceptionUtil
import io.tofhir.engine.config.ToFhirConfig
import io.tofhir.engine.data.read.SourceHandler
import io.tofhir.engine.model.exception.FhirMappingException
import io.tofhir.engine.model.{FhirMappingError, FhirMappingErrorCodes, FhirMappingResult}
import io.tofhir.engine.model.{FhirMappingError, FhirMappingErrorCodes, FhirMappingResult, MappedFhirResource}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.json4s.JsonAST.{JArray, JObject, JValue}
import org.json4s.JsonDSL._

import java.sql.Timestamp
import java.time.Instant
Expand Down Expand Up @@ -177,36 +178,53 @@ object MappingTaskExecutor {
val results =
try {
val mappedResources = Await.result(fhirMappingService.mapToFhir(jo, otherInputs), ToFhirConfig.engineConfig.mappingTimeout)
mappedResources.flatMap {
//If this is a JSON Patch, the resources are patches so return it as single result
case (mappingExpr, resources, fhirInteraction) if fhirInteraction.exists(_.`type` == "patch") && resources.length > 1 =>
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
mappedResource = Some(Serialization.write(JArray(resources.toList))),
fhirInteraction = fhirInteraction,
executionId = executionId,
projectId = fhirMappingService.projectId
))
//Otherwise return each resource as a separate mapping result
case (mappingExpr, resources, fhirInteraction) =>
resources.map(r =>
FhirMappingResult(
// we don't want to lose information about which FHIR resource was generated by which input row during testing
if(fhirMappingService.isForTesting) {
val flattenedResources = mappedResources.flatMap {
case (mappingExpr, resources, fhirInteraction) =>
resources.map(resource => (mappingExpr, resource, fhirInteraction))
}
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = fhirMappingService.mappingTaskName,
mappedFhirResources = flattenedResources.map(r => MappedFhirResource(Some(r._1), Some(Serialization.write(r._2)), r._3)),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)),
executionId = executionId,
projectId = fhirMappingService.projectId,
))
} else {
mappedResources.flatMap {
//If this is a JSON Patch, the resources are patches so return it as single result
case (mappingExpr, resources, fhirInteraction) if fhirInteraction.exists(_.`type` == "patch") && resources.length > 1 =>
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
mappedResource = Some(Serialization.write(r)),
mappedResource = Some(Serialization.write(JArray(resources.toList))),
fhirInteraction = fhirInteraction,
executionId = executionId,
projectId = fhirMappingService.projectId,
resourceType = (r\ "resourceType").extractOpt[String]
projectId = fhirMappingService.projectId
))
//Otherwise return each resource as a separate mapping result
case (mappingExpr, resources, fhirInteraction) =>
resources.map(r =>
FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
mappedResource = Some(Serialization.write(r)),
fhirInteraction = fhirInteraction,
executionId = executionId,
projectId = fhirMappingService.projectId,
resourceType = (r\ "resourceType").extractOpt[String]
)
)
)
}
}
} catch {
// Exception in expression evaluation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ class FhirMappingJobManager(
* @param identityServiceSettings Identity service settings
* @param executionId Id of FhirMappingJobExecution object
* @param projectId Project identifier associated with the mapping job
* @param isForTesting Flag indicating whether the mapping is being tested
* (if true, mapped FHIR resources are grouped by input row in the FhirMappingResult)
* @return
*/
def executeTask(jobId: String,
Expand All @@ -438,7 +440,8 @@ class FhirMappingJobManager(
terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
identityServiceSettings: Option[IdentityServiceSettings] = None,
executionId: Option[String] = None,
projectId: Option[String] = None
projectId: Option[String] = None,
isForTesting: Boolean = false
): Future[Dataset[FhirMappingResult]] = {
//Load the contextual data for the mapping
Future
Expand All @@ -451,7 +454,7 @@ class FhirMappingJobManager(
//Get configuration context
val configurationContext = mainSourceSettings.toConfigurationContext
//Construct the mapping service
val fhirMappingService = new FhirMappingService(jobId, mappingTaskName, fhirMapping.source.map(_.alias), (loadedContextMap :+ configurationContext).toMap, fhirMapping.mapping, fhirMapping.variable, terminologyServiceSettings, identityServiceSettings, functionLibraries, projectId)
val fhirMappingService = new FhirMappingService(jobId, mappingTaskName, fhirMapping.source.map(_.alias), (loadedContextMap :+ configurationContext).toMap, fhirMapping.mapping, fhirMapping.variable, terminologyServiceSettings, identityServiceSettings, functionLibraries, projectId, isForTesting)
MappingTaskExecutor.executeMapping(spark, df, fhirMappingService, executionId)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import java.sql.Timestamp

/**
* Mapping process result
* @param jobId Identifier of the job that this mapping is performed within
* @param mappingTaskName Name of the mappingTask that is executed
* @param mappingExpr Name of the mapping expression (FhirMapping.mapping.expression.name) that this mapping is based on
* @param timestamp Timestamp of the result
* @param mappedResource If successful, JSON serialization of the FHIR resource generated via the mapping
* @param source If there is a problem in the process, the JSON serialization of the source data
* @param error If there is a problem in the process, description of the problem
* @param fhirInteraction FHIR interaction details to persist the mapped result
* @param executionId Id of FhirMappingJobExecution object
* @param projectId Project identifier associated with the mapping job
* @param resourceType The type of FHIR resource generated by the mapping. This field is optional, as the
* FHIR mapping result may represent a flat structure without a specific FHIR resource type.
*
* @param jobId Identifier of the job that this mapping is performed within
* @param mappingTaskName Name of the mappingTask that is executed
* @param mappingExpr Name of the mapping expression (FhirMapping.mapping.expression.name) that this mapping is based on
* @param timestamp Timestamp of the result
* @param mappedResource If successful, JSON serialization of the FHIR resource generated via the mapping
* @param source If there is a problem in the process, the JSON serialization of the source data
* @param error If there is a problem in the process, description of the problem
* @param fhirInteraction FHIR interaction details to persist the mapped result
* @param executionId Id of FhirMappingJobExecution object
* @param projectId Project identifier associated with the mapping job
* @param resourceType The type of FHIR resource generated by the mapping. This field is optional, as the
* FHIR mapping result may represent a flat structure without a specific FHIR resource type.
* @param mappedFhirResources List of mapped FHIR resources. This is used to group mapped resources by input row for testing purposes.
*/
case class FhirMappingResult(
jobId:String,
Expand All @@ -31,7 +33,8 @@ case class FhirMappingResult(
fhirInteraction:Option[FhirInteraction] = None,
executionId: Option[String] = None,
projectId: Option[String] = None,
resourceType: Option[String] = None
resourceType: Option[String] = None,
mappedFhirResources: Seq[MappedFhirResource] = Seq.empty,
) {
final val eventId:String = "MAPPING_RESULT"
override def toString: String = {
Expand Down Expand Up @@ -103,3 +106,14 @@ object FhirMappingErrorCodes {

final val UNEXPECTED_PROBLEM = "unexpected_problem"
}

/**
* Mapped FHIR resources model after the mapping process, only used for mapping testing feature
*
* @param mappingExpr Mapping expression name that this mapping is based on
* @param mappedResource JSON serialization of the FHIR resource generated via the mapping
* @param fhirInteraction FHIR interaction details
*/
case class MappedFhirResource(mappingExpr:Option[String] = None,
mappedResource: Option[String] = None,
fhirInteraction: Option[FhirInteraction])
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
)
val (fhirMapping, mappingJobSourceSettings, dataFrame) = fhirMappingJobManager.readJoinSourceData(mappingTask, mappingJob.sourceSettings, jobId = Some(jobId), isTestExecution = true)
val selected = DataFrameUtil.applyResourceFilter(dataFrame, testResourceCreationRequest.resourceFilter)
fhirMappingJobManager.executeTask(mappingJob.id, mappingTask.name, fhirMapping, selected, mappingJobSourceSettings, mappingJob.terminologyServiceSettings, mappingJob.getIdentityServiceSettings(), projectId = Some(projectId))
fhirMappingJobManager.executeTask(mappingJob.id, mappingTask.name, fhirMapping, selected, mappingJobSourceSettings, mappingJob.terminologyServiceSettings, mappingJob.getIdentityServiceSettings(), projectId = Some(projectId), isForTesting = true)
.map { dataFrame =>
dataFrame
.collect() // Collect into an Array[String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
val results: Seq[FhirMappingResult] = JsonMethods.parse(responseAs[String]).extract[Seq[FhirMappingResult]]
results.length shouldEqual 3
results.head.mappingTaskName shouldEqual "patient-mapping"
results.head.mappedResource.get shouldEqual "{\"resourceType\":\"Patient\"," +
results.head.mappedFhirResources.head.mappedResource.get shouldEqual "{\"resourceType\":\"Patient\"," +
"\"id\":\"34dc88d5972fd5472a942fc80f69f35c\"," +
"\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"]," +
"\"source\":\"https://aiccelerate.eu/data-integration-suite/test-data\"}," +
Expand All @@ -256,7 +256,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
val results: Seq[FhirMappingResult] = JsonMethods.parse(responseAs[String]).extract[Seq[FhirMappingResult]]
results.length shouldEqual 3
results.head.mappingTaskName shouldEqual "patient-mapping2"
results.head.mappedResource.get shouldEqual "{\"resourceType\":\"Patient\"," +
results.head.mappedFhirResources.head.mappedResource.get shouldEqual "{\"resourceType\":\"Patient\"," +
"\"id\":\"34dc88d5972fd5472a942fc80f69f35c\"," +
"\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"]," +
"\"source\":\"https://aiccelerate.eu/data-integration-suite/test-data\"}," +
Expand All @@ -283,7 +283,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
results.length shouldEqual 3
results.head.mappingTaskName shouldEqual "other-observation-mapping"

val result: JObject = JsonMethods.parse(results.head.mappedResource.get).asInstanceOf[JObject]
val result: JObject = JsonMethods.parse(results.head.mappedFhirResources.head.mappedResource.get).asInstanceOf[JObject]
(result \ "meta" \ "profile").asInstanceOf[JArray].arr.head.extract[String] shouldEqual "https://aiccelerate.eu/fhir/StructureDefinition/AIC-IntraOperativeObservation"
(result \ "effectiveDateTime").extract[String] startsWith "2007-10-12T10:00:00"
(result \ "valueQuantity" \ "value").extract[Int] shouldEqual 450
Expand Down

0 comments on commit f46a240

Please sign in to comment.