🏷️ feat(SourceFileFormats): Separate the txt format as txt-csv a… (#216)
* 🏷️ feat(SourceFileFormats): Separate the txt format as txt-csv and txt-ndjson formats.

* ✨ feat(FileDataSourceReader): Read zip data using Spark's binary files functionality. Add a zip file reading test.

* ✨ feat(FileDataSourceReader): Read txt files from a folder.
Okanmercan99 authored Sep 3, 2024
1 parent 4b0736c commit 61c0e2d
Showing 5 changed files with 89 additions and 5 deletions.
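Before the diff itself, a quick illustration of what the split into txt-csv and txt-ndjson means at the source-binding level. This is a hypothetical sketch, not taken from the commit: the file names are made up and FileSystemSource's remaining parameters are assumed to have defaults, as in the test this commit adds. A txt-csv source parses txt (or zipped) content with the CSV reader, while a txt-ndjson source treats each line of a txt file as one JSON object.

import io.tofhir.engine.model.FileSystemSource

// Hypothetical bindings; file names are illustrative.
// txt-csv: the txt content is parsed with the CSV reader.
val txtCsvSource = FileSystemSource(path = "patients.zip", fileFormat = Some("txt-csv"))
// txt-ndjson: every line of the txt content is expected to be one JSON object.
val txtNdjsonSource = FileSystemSource(path = "observations.txt", fileFormat = Some("txt-ndjson"))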
@@ -2,7 +2,7 @@ package io.tofhir.engine.data.read

import com.typesafe.scalalogging.Logger
import io.tofhir.engine.model.{FileSystemSource, FileSystemSourceSettings, SourceFileFormats}
import io.tofhir.engine.util.FileUtils
import io.tofhir.engine.util.{FileUtils, SparkUtil}
import org.apache.spark.sql.functions.{input_file_name, udf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -34,6 +34,8 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
override def read(mappingSourceBinding: FileSystemSource, mappingJobSourceSettings:FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
// get the format of the file
val sourceType = mappingSourceBinding.inferFileFormat
// check whether it is a zip file
val isZipFile = mappingSourceBinding.path.endsWith(".zip");
// determine the final path
// if it is a Hadoop path (starts with "hdfs://"), construct the URI directly without adding the context path
val finalPath = if (mappingJobSourceSettings.dataFolderPath.startsWith("hdfs://")) {
@@ -52,7 +54,7 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
val processedFiles: mutable.HashSet[String] =mutable.HashSet.empty
//Based on source type
val resultDf = sourceType match {
case SourceFileFormats.CSV | SourceFileFormats.TSV =>
case SourceFileFormats.CSV | SourceFileFormats.TSV | SourceFileFormats.TXT_CSV =>
val updatedOptions = sourceType match {
case SourceFileFormats.TSV =>
// If the file format is tsv, use tab (\t) as separator by default if it is not set explicitly
@@ -64,6 +66,9 @@
mappingSourceBinding.options +
// use *.csv as pathGlobFilter by default if it is not set explicitly to ignore files without csv extension
("pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.CSV}"))
case SourceFileFormats.TXT_CSV => mappingSourceBinding.options +
// use *.txt as pathGlobFilter by default if it is not set explicitly to ignore files without txt extension
("pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.TXT}"))
}

//Options that we infer for csv
@@ -84,7 +89,17 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
// add a dummy column called 'filename' using a udf function to print a log when the data reading is
// started for a file.
.withColumn("filename",logStartOfDataReading(processedFiles,logger = logger,jobId = jobId)(input_file_name))
else
else if(isZipFile) {
import spark.implicits._
val unzippedFile = SparkUtil.readZip(finalPath, spark.sparkContext);
spark.read
.option("enforceSchema", false) //Enforce schema should be false
.option("header", includeHeader)
.option("inferSchema", inferSchema)
.options(otherOptions)
.schema(csvSchema.orNull)
.csv(unzippedFile.toDS())
} else
spark.read
.option("enforceSchema", false) //Enforce schema should be false
.option("header", includeHeader)
Expand All @@ -93,11 +108,16 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
.schema(csvSchema.orNull)
.csv(finalPath)
// assume that each line in the txt files contains a separate JSON object.
case SourceFileFormats.JSON | SourceFileFormats.TXT=>
case SourceFileFormats.JSON | SourceFileFormats.TXT_NDJON=>
if(mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
.withColumn("filename", logStartOfDataReading(processedFiles, logger = logger, jobId = jobId)(input_file_name))
else if(isZipFile){
import spark.implicits._
val unzippedFile = SparkUtil.readZip(finalPath, spark.sparkContext);
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).json(unzippedFile.toDS())
}
else
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
case SourceFileFormats.PARQUET =>
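For the new txt-csv branch above, the reader defaults pathGlobFilter to *.txt, so an entire folder can be read while files with other extensions are skipped. A standalone sketch of that Spark behavior follows; the folder path and reader options are illustrative, not taken from the commit.

import org.apache.spark.sql.SparkSession

object TxtFolderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("txt-folder-sketch").getOrCreate()
    // pathGlobFilter keeps only *.txt files in the folder; other extensions are ignored.
    val df = spark.read
      .option("header", "true")
      .option("pathGlobFilter", "*.txt")
      .csv("/data/patient-folder")
    df.show()
    spark.stop()
  }
}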
@@ -140,6 +140,8 @@ object SourceFileFormats {
final val PARQUET = "parquet"
final val JSON = "json"
final val AVRO = "avro"
final val TXT_NDJON = "txt-ndjson"
final val TXT_CSV = "txt-csv"
final val TXT = "txt"
}
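As the reader comment above notes, a txt-ndjson source assumes each line of the file is a self-contained JSON object. A minimal standalone sketch of that assumption using plain Spark; the sample records are made up and only illustrate the line-delimited layout.

import org.apache.spark.sql.SparkSession

object NdjsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ndjson-sketch").getOrCreate()
    import spark.implicits._
    // Each element stands for one line of a txt-ndjson file.
    val lines = Seq(
      """{"pid":"p1","gender":"male"}""",
      """{"pid":"p2","gender":"female"}"""
    ).toDS()
    val df = spark.read.json(lines) // one row per JSON line
    df.show()
    spark.stop()
  }
}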

@@ -1,11 +1,15 @@
package io.tofhir.engine.util

import io.tofhir.engine.util.FhirMappingJobFormatter.formats
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.json4s.jackson.JsonMethods

import java.io.{File, PrintWriter}
import java.io.{BufferedReader, File, InputStreamReader, PrintWriter}
import java.net.URI
import java.nio.file.{Path, Paths}
import java.util.zip.ZipInputStream
import scala.io.Source

/**
@@ -87,4 +91,25 @@ object SparkUtil {
.max
}

/**
* Read a zip file using the Spark context.
* @param path Path to the zip file
* @param sparkContext The Spark context
* @return An RDD whose elements are the lines read from all entries of the zip file
*/
def readZip(path: String, sparkContext: SparkContext): RDD[String] = {
sparkContext.binaryFiles(path)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open);
LazyList.continually(zis.getNextEntry)
.takeWhile {
case null => zis.close(); false
case _ => true
}
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
LazyList.continually(br.readLine()).takeWhile(_ != null)
}
}
}
}
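A hypothetical end-to-end use of readZip, mirroring how FileDataSourceReader consumes it: the unzipped lines come back as an RDD[String], are converted to a Dataset, and parsed with the CSV reader. The path and reader options below are illustrative only.

import org.apache.spark.sql.SparkSession
import io.tofhir.engine.util.SparkUtil

object ReadZipSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("zip-sketch").getOrCreate()
    import spark.implicits._
    // Lines of every entry inside the zip archive, read lazily via binaryFiles.
    val lines = SparkUtil.readZip("/data/patients.zip", spark.sparkContext)
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(lines.toDS())
    df.show()
    spark.stop()
  }
}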
Binary file not shown.
@@ -68,6 +68,11 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit
sourceBinding = Map("source" -> FileSystemSource(path = "patients.tsv"))
)

val patientZipFileMappingTask: FhirMappingTask = FhirMappingTask(
mappingRef = "https://aiccelerate.eu/fhir/mappings/patient-mapping",
sourceBinding = Map("source" -> FileSystemSource(path = "patients.zip", fileFormat = Option.apply("txt-csv")))
)

val testMappingJobFilePath: String = getClass.getResource("/test-mappingjob.json").toURI.getPath
val testMappingJobWithIdentityServiceFilePath: String = getClass.getResource("/test-mappingjob-using-services.json").toURI.getPath

@@ -246,6 +251,38 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit
}
}

it should "execute the patient mapping task with given zip file and return the results" in {
val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map.empty, sparkSession)
fhirMappingJobManager.executeMappingTaskAndReturn(mappingJobExecution = FhirMappingJobExecution(
job = fhirMappingJob,
mappingTasks = Seq(patientZipFileMappingTask)), mappingJobSourceSettings = mappingJobSourceSettings) map { mappingResults =>
val results = mappingResults.map(r => {
r.mappedResource should not be None
val resource = r.mappedResource.get.parseJson
resource shouldBe a[Resource]
resource
})

results.size shouldBe 10
val patient1 = results.head
FHIRUtil.extractResourceType(patient1) shouldBe "Patient"
FHIRUtil.extractIdFromResource(patient1) shouldBe FhirMappingUtility.getHashedId("Patient", "p1")
FHIRUtil.extractValue[String](patient1, "gender") shouldBe "male"

val patient2 = results(1)
FHIRUtil.extractResourceType(patient2) shouldBe "Patient"
FHIRUtil.extractIdFromResource(patient2) shouldBe FhirMappingUtility.getHashedId("Patient", "p2")
FHIRUtil.extractValue[String](patient2, "deceasedDateTime") shouldBe "2017-03-10"

val patient10 = results.last
FHIRUtil.extractResourceType(patient10) shouldBe "Patient"
FHIRUtil.extractIdFromResource(patient10) shouldBe FhirMappingUtility.getHashedId("Patient", "p10")
FHIRUtil.extractValue[String](patient10, "gender") shouldBe "female"
FHIRUtil.extractValue[String](patient10, "birthDate") shouldBe "2003-11"
FHIRUtil.extractValueOption[String](patient10, "deceasedDateTime").isEmpty shouldBe true
}
}

it should "execute the other observation mapping task and return the results" in {
val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map(FhirPathUtilFunctionsFactory.defaultPrefix -> FhirPathUtilFunctionsFactory), sparkSession)
fhirMappingJobManager.executeMappingTaskAndReturn(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(otherObservationMappingTask), job = fhirMappingJob) , mappingJobSourceSettings = mappingJobSourceSettings) map { mappingResults =>
