✨ Add partitioning functionality while writing the parquet files
dogukan10 committed Sep 13, 2024
1 parent 2f10c30 commit 344a415
Showing 3 changed files with 129 additions and 11 deletions.
@@ -4,9 +4,9 @@ import com.typesafe.scalalogging.Logger
import io.tofhir.common.model.Json4sSupport.formats
import io.tofhir.engine.data.write.FileSystemWriter.SinkFileFormats
import io.tofhir.engine.model.{FhirMappingResult, FileSystemSinkSettings}
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.{col, collect_list}
import org.apache.spark.sql.types.{ArrayType, StructType}
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.{DataFrameWriter, Dataset, SaveMode, SparkSession}
import org.apache.spark.util.CollectionAccumulator
import org.json4s.jackson.JsonMethods

@@ -56,6 +56,8 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri
val resourceType = rDf.getAs[String]("resourceType")
// Convert the mutable ArraySeq (default in Spark) to an immutable List
val resourcesSeq = rDf.getAs[Seq[String]]("resources").toList
// Get the partition columns for the given resourceType
val partitionColumns = sinkSettings.getPartitioningColumns(resourceType)

// Generate the DataFrame that will be written to the file system.
// If the sink type is NDJSON, the DataFrame should have a single column containing the JSON strings.
@@ -84,19 +86,46 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri
// | NULL | NULL | [ {problem-list-item} ] | {active, http: //...| {J13, Pneumonia} | NULL | 2faab6373e7c3bba4...| [ {https://aiccele...| 2012-10-15 | Condition | {Patient/34dc88d5... | {confirmed, http... |
// | 2013-05-22 | NULL | [ {encounter-diagnosis} ] | {inactive, http...| {G40, Parkinson's disease} | Encounter/bb7134...| 63058b87a718e66d4...| [ {https://aiccele...| 2013-05-07 | Condition | {Patient/0b3a0b23... | NULL |
// +-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+
spark.read.json(resourcesDS)
val resourcesDF = spark.read.json(resourcesDS)

// Extend resourcesDF to handle partitioning if required.
// Some partition columns may not exist in the DataFrame (e.g., nested fields like `subject.reference`),
// so this extension adds the missing columns, allowing Spark to partition the DataFrame accordingly.
if(partitionColumns.isEmpty){
// If no partition columns are defined, return the DataFrame as-is
resourcesDF
} else {
// Obtain all existing columns
val existingColumns = resourcesDF.columns
// Filter out partition columns that are already in existingColumns
val filteredPartitionColumns = partitionColumns.filterNot(pc => existingColumns.exists(_.contentEquals(pc)))
// Merge existingColumns with the filtered partition columns
// This ensures that partition columns, which may not be part of the original data, are included.
val allColumnsWithPartition = existingColumns.map(col) ++ filteredPartitionColumns.map(c => col(c).as(c))

// Create a new DataFrame by selecting all existing columns along with the added partition columns
resourcesDF
.select(allColumnsWithPartition: _*)
}
}

// Define the output path based on the resourceType, ensuring that each resource type is saved in its own folder.
val outputPath = s"${sinkSettings.path}/$resourceType"
// Write the resources to the specified path based on the chosen format.
val writer = getWriter(resourcesDF, sinkSettings)

// Apply partitioning if partition columns are specified
val partitionedWriter = if (partitionColumns.nonEmpty) {
writer.partitionBy(partitionColumns: _*)
} else {
writer
}

// Handle the specific formats
sinkSettings.sinkType match {
case SinkFileFormats.NDJSON => writer.text(outputPath)
case SinkFileFormats.PARQUET => writer.parquet(outputPath)
case SinkFileFormats.DELTA_LAKE => writer.format(SinkFileFormats.DELTA_LAKE).save(outputPath)
case SinkFileFormats.NDJSON => partitionedWriter.text(outputPath)
case SinkFileFormats.PARQUET => partitionedWriter.parquet(outputPath)
case SinkFileFormats.DELTA_LAKE => partitionedWriter.format(SinkFileFormats.DELTA_LAKE).save(outputPath)
}
})
case SinkFileFormats.NDJSON =>
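For orientation, here is a minimal, self-contained sketch of the idea behind the change above (not the committed code; the object name, sample data and output path are illustrative). A nested field such as subject.reference is selected as an additional top-level column so that Spark's partitionBy can use it when writing the Parquet output:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitionedWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("partition-sketch").getOrCreate()
    import spark.implicits._

    // Two FHIR Condition resources as JSON strings (illustrative data)
    val resourcesDS = Seq(
      """{"resourceType":"Condition","id":"c1","subject":{"reference":"Patient/p1"}}""",
      """{"resourceType":"Condition","id":"c2","subject":{"reference":"Patient/p2"}}"""
    ).toDS()
    val resourcesDF = spark.read.json(resourcesDS)

    // Partition columns requested for this resource type; subject.reference is a nested field
    val partitionColumns = List("subject.reference")

    // Add missing partition columns as top-level columns so that partitionBy can resolve them
    val existingColumns = resourcesDF.columns
    val missingColumns = partitionColumns.filterNot(pc => existingColumns.contains(pc))
    val extendedDF = resourcesDF.select(existingColumns.map(col) ++ missingColumns.map(c => col(c).as(c)): _*)

    // Produces one directory per partition value, e.g. .../subject.reference=Patient%2Fp1
    extendedDF.write.partitionBy(partitionColumns: _*).parquet("output-parquet-by-partition/Condition")

    spark.stop()
  }
}

The committed FileSystemWriter applies the same selection per resource type and only adds columns that are not already present, so flat partition columns such as gender pass through unchanged.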
@@ -2,6 +2,7 @@ package io.tofhir.engine.model

import akka.actor.ActorSystem
import io.onfhir.client.OnFhirNetworkClient
import io.tofhir.engine.data.write.FileSystemWriter.SinkFileFormats
import io.tofhir.engine.util.FhirClientUtil

/**
@@ -12,20 +13,40 @@ trait FhirSinkSettings
/**
* Settings to write mapped FHIR resources to file system
*
* @param path Path to the folder or file to write the resources
* @param fileFormat File format if not inferred from the path
* @param numOfPartitions Number of partitions for the file (for distributed fs)
* @param options Further options (Spark data source write options)
* @param path Path to the folder or file to write the resources
* @param fileFormat File format if not inferred from the path
* @param numOfPartitions Number of partitions for the file (for distributed fs)
* @param options Further options (Spark data source write options)
* @param partitionByResourceType Flag to determine whether to partition the output files by FHIR resource type.
* When enabled, each resource type will be written to a separate directory.
* Supported file formats: {@link SinkFileFormats.NDJSON}, {@link SinkFileFormats.PARQUET}
* and {@link SinkFileFormats.DELTA_LAKE}
* @param partitioningColumns Keeps partitioning columns for specific resource types.
* Applicable only when data is partitioned by resource type (via "partitionByResourceType").
* Supported file formats: {@link SinkFileFormats.PARQUET} and {@link SinkFileFormats.DELTA_LAKE}
*/
case class FileSystemSinkSettings(path: String,
fileFormat: Option[String] = None,
numOfPartitions: Int = 1,
options: Map[String, String] = Map.empty[String, String],
partitionByResourceType: Boolean = false) extends FhirSinkSettings {
partitionByResourceType: Boolean = false,
partitioningColumns: Map[String, List[String]] = Map.empty[String, List[String]]) extends FhirSinkSettings {
/**
* Determines the file format to use, inferred from the path if not explicitly provided.
*
* @return The file format as a string, either inferred from the file extension or provided explicitly.
*/
def sinkType: String = fileFormat.getOrElse(path.split('.').last)

/**
* Retrieves the partition columns for a given resource type.
*
* @param resourceType The FHIR resource type for which to retrieve the partitioning columns.
* @return A list of columns to partition by, or an empty list if no partitioning is defined for the resource type.
*/
def getPartitioningColumns(resourceType: String): List[String] = {
partitioningColumns.getOrElse(resourceType, List.empty[String])
}
}

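As a brief usage illustration of the extended settings (hypothetical values): sinkType falls back to the path's extension when fileFormat is not given, and getPartitioningColumns returns an empty list for resource types that have no entry.

import io.tofhir.engine.model.FileSystemSinkSettings

// Hypothetical sink configuration: one folder per resource type, Patients
// partitioned by gender and Conditions by subject.reference.
val sinkSettings = FileSystemSinkSettings(
  path = "output/fhir-resources.parquet",
  partitionByResourceType = true,
  partitioningColumns = Map(
    "Patient" -> List("gender"),
    "Condition" -> List("subject.reference")
  )
)

sinkSettings.sinkType                              // "parquet", inferred from the path extension
sinkSettings.getPartitioningColumns("Patient")     // List("gender")
sinkSettings.getPartitioningColumns("Observation") // List(), no partitioning defined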
@@ -187,6 +187,74 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll {
patientDf.count() shouldBe 10
}

/**
* Tests whether FileSystemWriter can write a DataFrame into a parquet file, partitioned by different columns.
*
* The test uses the following FileSystemSinkSettings:
* {
* "path": "output-parquet-by-partition",
* "fileFormat": "parquet",
* "partitionByResourceType": true,
* "partitioningColumns": {
* "Patient": ["gender"],
* "Condition": ["subject.reference"]
* }
* }
*
* The expected output structure is:
*
* > output-parquet-by-partition
* > Condition
* > subject.reference=Patient%2F0b3a0b23a0c6e223b941e63787f15a6a
* > .part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet.crc
* > part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet
* > subject.reference=Patient%2F0bbad2343eb86d5cdc16a1b292537576
* > .part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet.crc
* > part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet
* > subject.reference=Patient%2F7b650be0176d6d29351f84314a5efbe3
* > .part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet.crc
* > part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet
* > subject.reference=Patient%2F34dc88d5972fd5472a942fc80f69f35c
* > .part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet.crc
* > part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet
* > subject.reference=Patient%2F49d3c335681ab7fb2d4cdf19769655db
* > .part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet.crc
* > part-00000-4a3c7bc0-164c-471c-9ddf-e8117aa445af.c000.snappy.parquet
* > Patient
* > gender=female
* > .part-00000-84ddd340-5ee0-41f8-a566-0e480e36870a.c000.snappy.parquet.crc
* > part-00000-84ddd340-5ee0-41f8-a566-0e480e36870a.c000.snappy.parquet
* > gender=male
* > .part-00000-84ddd340-5ee0-41f8-a566-0e480e36870a.c000.snappy.parquet.crc
* > part-00000-84ddd340-5ee0-41f8-a566-0e480e36870a.c000.snappy.parquet
* */
it should "write DataFrame as partitioned parquet files based on Patient's gender and Condition's reference of subject" in {
// Define the output path for the parquet files
val outputFolderPath = s"${ToFhirConfig.engineConfig.contextPath}/output-parquet-by-partition"
// Instantiate the FileSystemWriter with parquet file format and partitioning
val fileSystemWriter = new FileSystemWriter(sinkSettings = FileSystemSinkSettings(
path = outputFolderPath, fileFormat = Some(SinkFileFormats.PARQUET), partitionByResourceType = true,
partitioningColumns = Map("Patient" -> List("gender"), "Condition" -> List("subject.reference"))
))
// Write the DataFrame using the FileSystemWriter
fileSystemWriter.write(sparkSession, df, sparkSession.sparkContext.collectionAccumulator[FhirMappingResult])

// Verify that the data was correctly written and partitioned under "Condition"
val conditionDf = sparkSession.read
.parquet(s"$outputFolderPath/Condition")
conditionDf.count() shouldBe 5
val patientConditionDf = sparkSession.read
.parquet(s"$outputFolderPath/Condition/subject.reference=Patient%2F49d3c335681ab7fb2d4cdf19769655db")
patientConditionDf.count() shouldBe 1
// Verify that the data was correctly written and partitioned under "Patient"
val femalePatientDf = sparkSession.read
.parquet(s"$outputFolderPath/Patient/gender=female")
femalePatientDf.count() shouldBe 5
val malePatientDf = sparkSession.read
.parquet(s"$outputFolderPath/Patient/gender=male")
malePatientDf.count() shouldBe 5
}
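As an aside (not part of the committed test), Spark reconstructs partition columns from the directory names when the partitioned output is read back, so they can be used directly in filters. A hypothetical read-back using the test's sparkSession and outputFolderPath:

import org.apache.spark.sql.functions.col

// Reading the whole Patient folder restores the gender partition column,
// so this filter can be pruned down to the gender=male directory.
val malePatients = sparkSession.read
  .parquet(s"$outputFolderPath/Patient")
  .where(col("gender") === "male")
malePatients.count() shouldBe 5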

/**
* Tests whether FileSystemWriter can write a DataFrame into a Delta Lake file.
*
