♻️ refactor(FileSystemSinkSettings, FileSystemSource): Change fileFormat with contentType.
Okanmercan99 committed Sep 20, 2024
1 parent 4673bee commit 2087da4
Showing 22 changed files with 229 additions and 292 deletions.
12 changes: 6 additions & 6 deletions README.md
@@ -637,7 +637,7 @@ Example of a Mapping Job definition file with csv source type in streaming mode:
"patient": {
"jsonClass": "FileSystemSource",
"path": "patients",
"fileFormat": "csv"
"contentType": "csv"
}
}
}
@@ -649,7 +649,7 @@ The json snippet above illustrates the structure of an example mapping job in streaming mode.
Similar to the batch mode, most of the fields are the same. The only differences are:
- `asStream` field in the source settings
- `path` in the source binding of the mapping. `path` should be the name of the **folder** this time, and it is where toFHIR will monitor the changes.
- `fileFormat` in the source binding of the mapping. `fileFormat` field is mandatory for streams and filters to process only files with the given format.
- `contentType` in the source binding of the mapping. `contentType` field is mandatory.
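For orientation, here is a minimal Scala sketch of the same streaming source binding, expressed against the model this commit introduces (FileSystemSource with a mandatory contentType); the path and content type are illustrative values, not defaults:

import io.tofhir.engine.model.{FileSystemSource, SourceContentTypes}

// Streaming source binding sketch: `path` names the folder that toFHIR monitors,
// and the mandatory `contentType` tells the reader how to parse every file in it.
val streamingPatientSource = FileSystemSource(
  path = "patients",                    // a folder, not a single file, in streaming mode
  contentType = SourceContentTypes.CSV  // "csv"
)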

##### SQL

@@ -808,7 +808,7 @@ To give any spark option, you can use the `options` field in the source binding
"source": {
"jsonClass": "FileSystemSource",
"path": "patients",
"fileFormat": "csv",
"contentType": "csv",
"options": {
"sep": "\\t" // tab separated file
}
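The same binding can be sketched directly in Scala; any Spark csv data-source option is forwarded through the `options` map (the values below are illustrative):

import io.tofhir.engine.model.{FileSystemSource, SourceContentTypes}

// Tab-separated files handled by the csv reader: the "sep" option is passed to Spark as-is.
val tabSeparatedSource = FileSystemSource(
  path = "patients",
  contentType = SourceContentTypes.CSV,
  options = Map("sep" -> "\t")  // in a JSON job definition this appears as "\\t"
)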
@@ -890,7 +890,7 @@ Next, specify the source bindings for your mappings in the job. Here's an example:
"patient" : {
"jsonClass" : "FileSystemSource",
"path" : "patient-simple.csv",
"fileFormat" : "csv",
"contentType" : "csv",
"options" : { },
"sourceRef": "patientSource"
},
@@ -920,14 +920,14 @@ If the `genderSource` was connected to file system in the job definition, the `s
"patient" : {
"jsonClass" : "FileSystemSource",
"path" : "patient-simple.csv",
"fileFormat" : "csv",
"contentType" : "csv",
"options" : { },
"sourceRef": "patientSource"
},
"patientGender" : {
"jsonClass" : "FileSystemSource",
"path" : "patient-gender-simple.csv",
"fileFormat" : "csv",
"contentType" : "csv",
"options" : { },
"sourceRef": "genderSource"
}
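As a hedged Scala sketch of the two bindings above, `sourceRef` is what ties each binding to a named source (`patientSource`, `genderSource`) declared in the job's source settings:

import io.tofhir.engine.model.{FileSystemSource, SourceContentTypes}

// Two source bindings of one mapping, each resolved against a different named source in the job.
val patientBinding = FileSystemSource(
  path = "patient-simple.csv",
  contentType = SourceContentTypes.CSV,
  sourceRef = Some("patientSource")
)
val genderBinding = FileSystemSource(
  path = "patient-gender-simple.csv",
  contentType = SourceContentTypes.CSV,
  sourceRef = Some("genderSource")
)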
@@ -1,7 +1,7 @@
package io.tofhir.engine.data.read

import com.typesafe.scalalogging.Logger
import io.tofhir.engine.model.{FileSystemSource, FileSystemSourceSettings, SourceFileFormats}
import io.tofhir.engine.model.{FileSystemSource, FileSystemSourceSettings, SourceContentTypes}
import io.tofhir.engine.util.{FileUtils, SparkUtil}
import org.apache.spark.sql.functions.{input_file_name, udf}
import org.apache.spark.sql.types.StructType
@@ -34,8 +34,8 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
* @throws NotImplementedError If the specified source format is not implemented.
*/
override def read(mappingSourceBinding: FileSystemSource, mappingJobSourceSettings:FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
// get the format of the file
val sourceType = mappingSourceBinding.inferFileFormat
// get the content type for the file
val contentType = mappingSourceBinding.contentType
// check whether it is a zip file
val isZipFile = mappingSourceBinding.path.endsWith(".zip");
// determine the final path
@@ -55,26 +55,14 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
// keeps the names of processed files by Spark
val processedFiles: mutable.HashSet[String] =mutable.HashSet.empty
//Based on source type
val resultDf = sourceType match {
case SourceFileFormats.CSV | SourceFileFormats.TSV | SourceFileFormats.TXT_CSV | SourceFileFormats.TXT_TSV =>
val updatedOptions = sourceType match {
case SourceFileFormats.TSV =>
val resultDf = contentType match {
case SourceContentTypes.CSV | SourceContentTypes.TSV =>
val updatedOptions = contentType match {
case SourceContentTypes.TSV =>
// If the file format is tsv, use tab (\t) as separator by default if it is not set explicitly
mappingSourceBinding.options +
("sep" -> mappingSourceBinding.options.getOrElse("sep", "\\t"),
// use *.tsv as pathGlobFilter by default if it is not set explicitly to ignore files without tsv extension
"pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.TSV}"))
case SourceFileFormats.CSV =>
mappingSourceBinding.options +
// use *.csv as pathGlobFilter by default if it is not set explicitly to ignore files without csv extension
("pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.CSV}"))
case SourceFileFormats.TXT_CSV => mappingSourceBinding.options +
// use *.txt as pathGlobFilter by default if it is not set explicitly to ignore files without txt extension
("pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.TXT}"))
case SourceFileFormats.TXT_TSV => mappingSourceBinding.options +
("sep" -> mappingSourceBinding.options.getOrElse("sep", "\\t"),
// use *.txt as pathGlobFilter by default if it is not set explicitly to ignore files without txt extension
"pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.TXT}"))
("sep" -> mappingSourceBinding.options.getOrElse("sep", "\\t"))
case SourceContentTypes.CSV => mappingSourceBinding.options
}

//Options that we infer for csv
@@ -114,7 +102,7 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
.schema(csvSchema.orNull)
.csv(finalPath)
// assume that each line in the txt files contains a separate JSON object.
case SourceFileFormats.JSON | SourceFileFormats.TXT_NDJSON =>
case SourceContentTypes.JSON | SourceContentTypes.NDJSON =>
if(mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
@@ -126,7 +114,7 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
}
else
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
case SourceFileFormats.PARQUET =>
case SourceContentTypes.PARQUET =>
if(mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).parquet(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
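As a standalone illustration of what the batch csv branch above boils down to, the following sketch reads a folder of csv files with Spark; the local master, the `header` option and the folder path are assumptions for the example, not toFHIR defaults:

import org.apache.spark.sql.SparkSession

// Minimal batch read sketch mirroring the csv case of FileDataSourceReader:
// merge the binding's options into the reader and load every matching file in the folder.
val spark = SparkSession.builder().appName("csv-read-sketch").master("local[*]").getOrCreate()
val patientsDf = spark.read
  .options(Map(
    "header" -> "true",          // illustrative option, as it would arrive via FileSystemSource.options
    "pathGlobFilter" -> "*.csv"  // optional filter to skip files without the csv extension
  ))
  .csv("data/patients")          // folder taken from FileSystemSource.path
patientsDf.show(5)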
@@ -1,7 +1,7 @@
package io.tofhir.engine.data.write

import com.typesafe.scalalogging.Logger
import io.tofhir.engine.data.write.FileSystemWriter.SinkFileFormats
import FileSystemWriter.SinkContentTypes
import io.tofhir.engine.model.{FhirMappingResult, FileSystemSinkSettings}
import org.apache.spark.sql.functions.{col, collect_list}
import org.apache.spark.sql.types.{ArrayType, StructType}
@@ -18,10 +18,10 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri
override def write(spark:SparkSession, df: Dataset[FhirMappingResult], problemsAccumulator:CollectionAccumulator[FhirMappingResult]): Unit = {
import spark.implicits._
logger.debug("Created FHIR resources will be written to the given URL:{}", sinkSettings.path)
sinkSettings.sinkType match {
// Handle cases where the file format is either NDJSON, PARQUET, or DELTA_LAKE,
sinkSettings.contentType match {
// Handle cases where the content type is either NDJSON, PARQUET, or DELTA_LAKE,
// and the output needs to be partitioned by FHIR resource type.
case SinkFileFormats.NDJSON | SinkFileFormats.PARQUET | SinkFileFormats.DELTA_LAKE if sinkSettings.partitionByResourceType =>
case SinkContentTypes.NDJSON | SinkContentTypes.PARQUET | SinkContentTypes.DELTA_LAKE if sinkSettings.partitionByResourceType =>
// Group the DataFrame by resourceType to aggregate all resources of the same type.
// groupedDFs will have the following structure:
// +------------+--------------------+
@@ -41,8 +41,8 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri

// Generate the DataFrame that will be written to the file system.
// If the sink type is NDJSON, the DataFrame should have a single column containing the JSON strings.
// For other formats, the DataFrame should have multiple columns corresponding to the keys in the JSON objects.
val resourcesDF = if(sinkSettings.sinkType.contentEquals(SinkFileFormats.NDJSON)) {
// For other content types, the DataFrame should have multiple columns corresponding to the keys in the JSON objects.
val resourcesDF = if(sinkSettings.contentType.contentEquals(SinkContentTypes.NDJSON)) {
// Convert the list of JSON strings into a DataFrame with a single column named "mappedResourceJson".
// The resulting DataFrame will contain one row per JSON string, where each row is a single JSON object.
// The structure of this DataFrame will be as follows:
@@ -91,7 +91,7 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri

// Define the output path based on the resourceType, ensuring that each resource type is saved in its own folder.
val outputPath = s"${sinkSettings.path}/$resourceType"
// Write the resources to the specified path based on the chosen format.
// Write the resources to the specified path based on the chosen content type.
val writer = getWriter(resourcesDF, sinkSettings)

// Apply partitioning if partition columns are specified
@@ -101,30 +101,30 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri
writer
}

// Handle the specific formats
sinkSettings.sinkType match {
case SinkFileFormats.NDJSON => partitionedWriter.text(outputPath)
case SinkFileFormats.PARQUET => partitionedWriter.parquet(outputPath)
case SinkFileFormats.DELTA_LAKE => partitionedWriter.format(SinkFileFormats.DELTA_LAKE).save(outputPath)
// Handle the specific content types
sinkSettings.contentType match {
case SinkContentTypes.NDJSON => partitionedWriter.text(outputPath)
case SinkContentTypes.PARQUET => partitionedWriter.parquet(outputPath)
case SinkContentTypes.DELTA_LAKE => partitionedWriter.format(SinkContentTypes.DELTA_LAKE).save(outputPath)
}
})
case SinkFileFormats.NDJSON =>
case SinkContentTypes.NDJSON =>
getWriter(df.map(_.mappedResource.get), sinkSettings).text(sinkSettings.path)
case SinkFileFormats.PARQUET =>
case SinkContentTypes.PARQUET =>
// Convert the DataFrame to a Dataset of JSON strings
val jsonDS = df.select("mappedResource").as[String]
// Create a DataFrame from the Dataset of JSON strings
val jsonDF = spark.read.json(jsonDS)
getWriter(jsonDF, sinkSettings).parquet(sinkSettings.path)
case SinkFileFormats.DELTA_LAKE =>
case SinkContentTypes.DELTA_LAKE =>
// Convert the DataFrame to a Dataset of JSON strings
val jsonDS = df.select("mappedResource").as[String]
// Create a DataFrame from the Dataset of JSON strings
val jsonDF = spark.read.json(jsonDS)
getWriter(jsonDF, sinkSettings)
.format(SinkFileFormats.DELTA_LAKE) // Specify Delta Lake format
.format(SinkContentTypes.DELTA_LAKE) // Specify Delta Lake content type
.save(sinkSettings.path)
case SinkFileFormats.CSV =>
case SinkContentTypes.CSV =>
// read the mapped resource json column and load it to a new data frame
val mappedResourceDF = spark.read.json(df.select("mappedResource").as[String])
// select the columns that are not array type or struct type
@@ -169,7 +169,7 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri
}

object FileSystemWriter {
object SinkFileFormats {
object SinkContentTypes {
final val NDJSON = "ndjson"
final val CSV = "csv"
final val PARQUET = "parquet"
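For reference, a condensed sketch of the per-content-type dispatch performed above; it is simplified (partitioning, csv flattening and Delta Lake configuration are omitted) and takes a ready DataFrameWriter instead of going through getWriter, so it is not the full FileSystemWriter:

import io.tofhir.engine.data.write.FileSystemWriter.SinkContentTypes
import org.apache.spark.sql.{DataFrameWriter, Row}

// Persist an already-prepared DataFrame according to the configured sink content type.
def writeByContentType(writer: DataFrameWriter[Row], contentType: String, outputPath: String): Unit =
  contentType match {
    case SinkContentTypes.NDJSON     => writer.text(outputPath)  // expects a single string column, one JSON resource per line
    case SinkContentTypes.PARQUET    => writer.parquet(outputPath)
    case SinkContentTypes.DELTA_LAKE => writer.format(SinkContentTypes.DELTA_LAKE).save(outputPath)
    case other => throw new NotImplementedError(s"Sink content type '$other' is not covered in this sketch")
  }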
@@ -1,18 +1,5 @@
package io.tofhir.engine.model

/**
* Custom exception class for signaling errors related to missing or invalid file formats.
*
* This exception is thrown when a required file format is not provided or is determined
* to be invalid during processing.
*
* @param message A descriptive message providing more information about the exception.
* @throws RuntimeException This exception extends the RuntimeException class for signaling runtime errors.
*
*/
class MissingFileFormatException(message: String) extends RuntimeException(message)


/**
* FHIR Mapping task instance
* {@link mapping} will be executed if it is provided. Otherwise, the mapping referenced by {@link mappingRef} is
@@ -51,58 +38,24 @@ trait MappingSourceBinding extends Serializable {
/**
* Context/configuration for one of the sources of the mapping that will read the source data from the file system.
*
* For batch jobs, you should either provide a file name in the "path" field with a file extension or provide the folder name
* in the "path" field and indicate the extension in the "fileFormat" field. In the latter case, it reads the files with
* the specified file format in the given folder.
* For batch jobs, you should provide the folder name in the "path" field and indicate the content type in the "contentType"
* field. It reads the files with the specified content type in the given folder.
*
* Examples for Batch Jobs:
* - With extension in path:
* FileSystemSource(path = "data/patients.csv") => Will read the "data/patients.csv" file.
* - Providing folder name and file format:
* FileSystemSource(path = "data/patients", fileFormat = Some("csv")) => Will read all "csv" files in the "data/patients" folder.
* Example for Batch Jobs:
* - Providing folder name and content type:
* FileSystemSource(path = "data/patients", contentType = "csv") => Will read all files in the "data/patients" folder and interpret them as a csv.
*
* For streaming jobs, you should provide a folder name in the "path" field and a file format in the "fileFormat" field so that
* it can read the files with the specified file format in the given folder.
* For streaming jobs, you should provide a folder name in the "path" field and a content type in the "contentType" field so that
* it can read the files with the specified content type in the given folder.
*
* Examples for Streaming Jobs:
* - Providing folder name and file format:
* FileSystemSource(path = "data/streaming/patients", fileFormat = Some("json")) => Will read all "json" files in the "data/streaming/patients" folder.
* - Providing folder name and content type:
* FileSystemSource(path = "data/streaming/patients", contentType = "json") => Will read all files in the "data/streaming/patients" folder in json content type.
* @param path File path to the source file or folder, e.g., "patients.csv" or "patients".
* @param fileFormat Format of the file (csv | json | parquet) if it cannot be inferred from the path, e.g., csv.
* @param options Further options for the format (Spark Data source options for the format, e.g., for csv -> https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option).
* @param contentType Content of the file
* @param options Further options for the content type (Spark Data source options for the content type, e.g., for csv -> https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option).
*/
case class FileSystemSource(path: String, fileFormat:Option[String] = None, options:Map[String, String] = Map.empty[String, String], override val preprocessSql: Option[String] = None, override val sourceRef: Option[String] = None) extends MappingSourceBinding {
/**
* Infers the file format based on the provided path and file format.
*
* This method attempts to determine the file format using the following logic:
* 1. If `fileFormat` is provided, it is used as the determined format.
* 2. If `fileFormat` is not provided and the `path` contains an extension (determined by the presence of a period),
* the extension of the `path` is used as the format.
* 3. If neither `fileFormat` is provided nor an extension is found in the `path`, a `MissingFileFormatException` is thrown.
*
* @return The inferred file format.
* @throws MissingFileFormatException If the file format cannot be determined.
*/
def inferFileFormat: String = {
var format = fileFormat
// split the path into segments based on the period ('.')
val pathSegments = path.split('.')

// if the file format is empty and path is a file i.e. there are at least two segments in the path,
// set the file format based on the last segment of the path
if (format.isEmpty && pathSegments.length > 1) {
format = Some(pathSegments.last)
}

// if the file format is empty, throw an exception
if (format.isEmpty) {
throw new MissingFileFormatException("File format is missing for FileSystemSource. Please provide a valid file format.")
}
format.get
}

}
case class FileSystemSource(path: String, contentType:String, options:Map[String, String] = Map.empty[String, String], override val preprocessSql: Option[String] = None, override val sourceRef: Option[String] = None) extends MappingSourceBinding {}
/**
* Context/configuration for one of the source of the mapping that will read the source data from an SQL database
* Any of tableName and query must be defined. Not both, not neither
@@ -133,17 +86,14 @@ case class KafkaSource(topicName: String, groupId: String, startingOffsets: Stri
case class FhirServerSource(resourceType: String, query: Option[String] = None, override val preprocessSql: Option[String] = None, override val sourceRef: Option[String] = None) extends MappingSourceBinding

/**
* List of source file formats supported by tofhir
* List of source content types supported by tofhir
*/
object SourceFileFormats {
object SourceContentTypes {
final val CSV = "csv"
final val TSV = "tsv"
final val PARQUET = "parquet"
final val JSON = "json"
final val AVRO = "avro"
final val TXT_NDJSON = "txt-ndjson"
final val TXT_CSV = "txt-csv"
final val TXT_TSV = "txt-tsv"
final val TXT = "txt"
final val NDJSON = "ndjson"
}
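To summarise how these constants are consumed, a hedged sketch of the dispatch seen in FileDataSourceReader earlier in this diff; only the branches visible in the diff are covered (avro and txt handling is not shown there):

import io.tofhir.engine.model.SourceContentTypes

// Which Spark reader each source content type is routed to by the file system reader.
def sparkFormatFor(contentType: String): String = contentType match {
  case SourceContentTypes.CSV | SourceContentTypes.TSV     => "csv"   // TSV additionally defaults "sep" to tab
  case SourceContentTypes.JSON | SourceContentTypes.NDJSON => "json"  // one JSON object per line
  case SourceContentTypes.PARQUET                          => "parquet"
  case other => throw new NotImplementedError(s"Content type '$other' is not implemented in this sketch")
}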
