Skip to content

Commit

Permalink
💄 refactor: Resolve MR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dogukan10 committed Sep 20, 2024
1 parent 59cadb2 commit 6629b97
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import scala.concurrent.Future
* Mapping service for a specific FhirMapping together with contextual data and mapping scripts
*
* @param jobId Identifier of the job referring to the mapping
* @param mappingUrl Url of the mapping being executed
* @param mappingTaskName The name of the mapping task being executed
* @param sources List of source aliases
* @param context Context data for mappings
* @param mappings Mapping scripts
Expand All @@ -30,7 +30,7 @@ import scala.concurrent.Future
*/
class FhirMappingService(
val jobId: String,
val mappingUrl: String,
val mappingTaskName: String,
val sources: Seq[String],
context: Map[String, FhirMappingContext],
mappings: Seq[FhirMappingExpression],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,16 @@ object MappingTaskExecutor {
* Executing the mapping and returning the dataframe for FHIR resources
*
* @param spark Spark session
* @param mappingTaskName Name of the mappingTask
* @param df DataFrame to be mapped
* @param fhirMappingService Mapping service for a specific FhirMapping together with contextual data and mapping scripts
* @param executionId Id of FhirMappingJobExecution object
* @return
*/
def executeMapping(spark: SparkSession, mappingTaskName: String, df: DataFrame, fhirMappingService: FhirMappingService, executionId: Option[String] = None): Dataset[FhirMappingResult] = {
def executeMapping(spark: SparkSession, df: DataFrame, fhirMappingService: FhirMappingService, executionId: Option[String] = None): Dataset[FhirMappingResult] = {
fhirMappingService.sources match {
case Seq(_) => executeMappingOnSingleSource(spark, mappingTaskName, df, fhirMappingService, executionId)
case Seq(_) => executeMappingOnSingleSource(spark, df, fhirMappingService, executionId)
//Executing on multiple sources
case oth => executeMappingOnMultipleSources(spark, mappingTaskName, df, fhirMappingService, oth, executionId)
case oth => executeMappingOnMultipleSources(spark, df, fhirMappingService, oth, executionId)
}
}

Expand All @@ -66,7 +65,6 @@ object MappingTaskExecutor {
* @return
*/
private def executeMappingOnSingleSource(spark: SparkSession,
mappingTaskName: String,
df: DataFrame,
fhirMappingService: FhirMappingService,
executionId: Option[String] = None): Dataset[FhirMappingResult] = {
Expand All @@ -81,12 +79,12 @@ object MappingTaskExecutor {

Option(row.getAs[String](SourceHandler.INPUT_VALIDITY_ERROR)) match {
//If input is valid
case None => executeMappingOnInput(jo, mappingTaskName, Map.empty[String, JValue], fhirMappingService, executionId)
case None => executeMappingOnInput(jo, Map.empty[String, JValue], fhirMappingService, executionId)
//If the input is not valid, return the error
case Some(validationError) =>
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = mappingTaskName,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
Expand All @@ -103,7 +101,6 @@ object MappingTaskExecutor {
}

private def executeMappingOnMultipleSources(spark: SparkSession,
mappingTaskName: String,
df: DataFrame,
fhirMappingService: FhirMappingService,
sources: Seq[String],
Expand Down Expand Up @@ -144,12 +141,12 @@ object MappingTaskExecutor {

validationErrors match {
//If input is valid
case Nil => executeMappingOnInput(jo, mappingTaskName, otherObjectMap, fhirMappingService, executionId)
case Nil => executeMappingOnInput(jo, otherObjectMap, fhirMappingService, executionId)
//If the input is not valid, return the error
case _ =>
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = mappingTaskName,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
Expand All @@ -169,13 +166,11 @@ object MappingTaskExecutor {
* Execute the mapping on an JSON converted input
*
* @param jo Input object
* @param mappingTaskName Name of the mappingTask
* @param fhirMappingService Mapping service
* @param executionId Id of FhirMappingJobExecution object
* @return
*/
private def executeMappingOnInput(jo: JObject,
mappingTaskName: String,
otherInputs: Map[String, JValue],
fhirMappingService: FhirMappingService,
executionId: Option[String] = None): Seq[FhirMappingResult] = {
Expand All @@ -188,7 +183,7 @@ object MappingTaskExecutor {
case (mappingExpr, resources, fhirInteraction) if fhirInteraction.exists(_.`type` == "patch") && resources.length > 1 =>
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = mappingTaskName,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
Expand All @@ -202,7 +197,7 @@ object MappingTaskExecutor {
resources.map(r =>
FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = mappingTaskName,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
Expand Down Expand Up @@ -234,7 +229,7 @@ object MappingTaskExecutor {
}
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = mappingTaskName,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
Expand All @@ -249,7 +244,7 @@ object MappingTaskExecutor {
case e: FhirMappingException =>
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = mappingTaskName,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
Expand All @@ -263,7 +258,7 @@ object MappingTaskExecutor {
logger.debug("Mapping timeout, continuing the processing of mappings...")
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = mappingTaskName,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
Expand All @@ -277,7 +272,7 @@ object MappingTaskExecutor {
logger.error("Unexpected problem while executing the mappings...", oth)
Seq(FhirMappingResult(
jobId = fhirMappingService.jobId,
mappingTaskName = mappingTaskName,
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ class FhirMappingJobManager(
terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
identityServiceSettings: Option[IdentityServiceSettings] = None,
executionId: Option[String] = None,
projectId: Option[String] = None,
projectId: Option[String] = None
): Future[Dataset[FhirMappingResult]] = {
//Load the contextual data for the mapping
Future
Expand All @@ -443,8 +443,8 @@ class FhirMappingJobManager(
//Get configuration context
val configurationContext = mainSourceSettings.toConfigurationContext
//Construct the mapping service
val fhirMappingService = new FhirMappingService(jobId, fhirMapping.url, fhirMapping.source.map(_.alias), (loadedContextMap :+ configurationContext).toMap, fhirMapping.mapping, fhirMapping.variable, terminologyServiceSettings, identityServiceSettings, functionLibraries, projectId)
MappingTaskExecutor.executeMapping(spark, mappingTaskName, df, fhirMappingService, executionId)
val fhirMappingService = new FhirMappingService(jobId, mappingTaskName, fhirMapping.source.map(_.alias), (loadedContextMap :+ configurationContext).toMap, fhirMapping.mapping, fhirMapping.variable, terminologyServiceSettings, identityServiceSettings, functionLibraries, projectId)
MappingTaskExecutor.executeMapping(spark, df, fhirMappingService, executionId)
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ case class FhirMappingJob(id: String = UUID.randomUUID().toString,
}

// Check names of the mappingTasks, if a duplicate name is found, throw an error
if(!FhirMappingJobFormatter.checkMappingTaskNamesUnique(mappings)){
throw new BadRequestException("Duplicate mappingTask name. Ensure each MappingTask has a unique name!")
val duplicateMappingTasks = FhirMappingJobFormatter.findDuplicateMappingTaskNames(mappings)
if(duplicateMappingTasks.nonEmpty){
throw new BadRequestException(s"Duplicate MappingTask name(s) found: ${duplicateMappingTasks.mkString(", ")}. Each MappingTask must have a unique name.")
}

// Check mapping tasks of the job, if a data source of a mapping task is missing, throw an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ object FhirMappingJobFormatter {
val fileContent = try replaceEnvironmentVariables(source.mkString) finally source.close()
val mappingJob = org.json4s.jackson.JsonMethods.parse(fileContent).extract[FhirMappingJob]
// check there are no duplicate name on mappingTasks of the job
if(!this.checkMappingTaskNamesUnique(mappingJob.mappings)){
throw new MappingException("Duplicate 'name' fields found in the MappingTasks within the MappingJob! Ensure each MappingTask has a unique name.")
val duplicateMappingTasks = FhirMappingJobFormatter.findDuplicateMappingTaskNames(mappingJob.mappings)
if (duplicateMappingTasks.nonEmpty) {
throw new MappingException(s"Duplicate 'name' fields detected in the MappingTasks of the MappingJob: ${duplicateMappingTasks.mkString(", ")}. Please ensure that each MappingTask has a unique name.")
}
mappingJob
}
Expand All @@ -84,13 +85,13 @@ object FhirMappingJobFormatter {
}

/**
 * Check for duplicate names in the mappingTask array from the mappingJob definition.
 *
 * @param mappingTasks mappingTask array from the mappingJob definition.
 * @return A sequence of duplicate names, if any. Returns an empty sequence if all names are unique.
 */
def findDuplicateMappingTaskNames(mappingTasks: Seq[FhirMappingTask]): Seq[String] = {
  // A name is a duplicate when more than one task is grouped under it.
  // collect filters and maps in one pass, avoiding the view/mapValues/keys chain.
  mappingTasks
    .groupBy(_.name)
    .collect { case (name, tasks) if tasks.sizeIs > 1 => name }
    .toSeq
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito
try {
val job = JsonMethods.parse(fileContent).extract[FhirMappingJob]
// check there are no duplicate name on mappingTasks of the job
if(!FhirMappingJobFormatter.checkMappingTaskNamesUnique(job.mappings)){
throw new MappingException(s"Duplicate 'name' fields found in the MappingTasks within the ${job.id}! Ensure each MappingTask has an unique name.")
val duplicateMappingTasks = FhirMappingJobFormatter.findDuplicateMappingTaskNames(job.mappings)
if (duplicateMappingTasks.nonEmpty) {
throw new MappingException(s"Duplicate 'name' fields detected in the MappingTasks of job '${job.id}': ${duplicateMappingTasks.mkString(", ")}. Please ensure that each MappingTask has a unique name.")
}
// discard if the job id and file name not match
if (FileOperations.checkFileNameMatchesEntityId(job.id, file, "job")) {
Expand Down

0 comments on commit 6629b97

Please sign in to comment.