Refactor file format to content type #226

Merged: 2 commits, Sep 23, 2024
README.md (33 changes: 23 additions & 10 deletions)
@@ -547,20 +547,24 @@ Example of a Mapping Job definition file with csv source type:
},
"mappings": [
{
"name": "patient-mapping",
"mappingRef": "https://aiccelerate.eu/fhir/mappings/project1/patient-mapping",
"sourceBinding": {
"patient": {
"jsonClass": "FileSystemSource",
"path": "patients.csv"
"path": "patients.csv",
"contentType": "csv"
}
}
},
{
"name": "practitioner-mapping",
"mappingRef": "https://aiccelerate.eu/fhir/mappings/project1/practitioner-mapping",
"sourceBinding": {
"practitioner": {
"jsonClass": "FileSystemSource",
"path": "practitioners.csv"
"path": "practitioners.csv",
"contentType": "csv"
}
}
}
@@ -632,12 +636,13 @@ Example of a Mapping Job definition file with csv source type in streaming mode:
},
"mappings": [
{
"name": "patient-mapping",
"mappingRef": "https://aiccelerate.eu/fhir/mappings/project1/patient-mapping",
"sourceBinding": {
"patient": {
"jsonClass": "FileSystemSource",
"path": "patients",
"fileFormat": "csv"
"contentType": "csv"
}
}
}
@@ -649,7 +654,6 @@ The json snippet above illustrates the structure of an example mapping job in st
Similar to the batch mode, most of the fields are the same. The only differences are:
- `asStream` field in the source settings
- `path` in the source binding of the mapping. `path` should be the name of the **folder** this time, and it is where toFHIR will monitor the changes.
-- `fileFormat` in the source binding of the mapping. `fileFormat` field is mandatory for streams and filters to process only files with the given format.
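Putting these pieces together, a minimal streaming job skeleton could look like the sketch below. The exact shape of the `sourceSettings` block is not visible in this diff, so the `FileSystemSourceSettings` entry and its alias are assumptions; the remaining field values are copied from the streaming example above.

```json
{
  "sourceSettings": {
    "source": {
      "jsonClass": "FileSystemSourceSettings",
      "asStream": true
    }
  },
  "mappings": [
    {
      "name": "patient-mapping",
      "mappingRef": "https://aiccelerate.eu/fhir/mappings/project1/patient-mapping",
      "sourceBinding": {
        "patient": {
          "jsonClass": "FileSystemSource",
          "path": "patients",
          "contentType": "csv"
        }
      }
    }
  ]
}
```

Compared with the batch example, only `asStream` and the folder-valued `path` change; `contentType` now plays the role the removed `fileFormat` field used to.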

##### SQL

@@ -670,6 +674,7 @@ Similarly, if we had a source with SQL type, `sourceSettings` and `mappings` par
```
```json
{
"name": "location-sql-mapping",
"mappingRef": "https://aiccelerate.eu/fhir/mappings/location-sql-mapping",
"sourceBinding": {
"source": {
@@ -682,6 +687,7 @@ Similarly, if we had a source with SQL type, `sourceSettings` and `mappings` par
We can give a table name with the `tableName` field, as well as write a query with the `query` field:
```json
{
"name": "location-sql-mapping",
"mappingRef": "https://aiccelerate.eu/fhir/mappings/location-sql-mapping",
"sourceBinding": {
"source": {
@@ -709,6 +715,7 @@ Mapping job and mapping examples shown below for the streaming type of sources l
```
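The SQL snippet above is cut off in this diff view. As a rough illustration of the two alternatives it mentions, a source binding can carry either a `tableName` or a `query`; in the sketch below the `jsonClass` value (`SqlSource`) and the query text are assumptions rather than content shown in this PR:

```json
{
  "name": "location-sql-mapping",
  "mappingRef": "https://aiccelerate.eu/fhir/mappings/location-sql-mapping",
  "sourceBinding": {
    "source": {
      "jsonClass": "SqlSource",
      "query": "SELECT * FROM location"
    }
  }
}
```

Replacing the `query` field with something like `"tableName": "location"` would read the whole table instead.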
```json
{
"name": "location-sql-mapping",
"mappingRef": "https://aiccelerate.eu/fhir/mappings/location-sql-mapping",
"sourceBinding": {
"source": {
@@ -778,6 +785,7 @@ In addition to specifying the server URL (**serverUrl**), you can configure secu
Within the mapping source, you can define the resource type (e.g., Patient, Observation) and apply filters using a query string:
```json
{
"name": "patient-mapping",
"mappingRef" : "https://aiccelerate.eu/fhir/mappings/pilot1/patient-mapping",
"sourceBinding" : {
"source" : {
@@ -803,12 +811,13 @@ To give any spark option, you can use the `options` field in the source binding

```json
{
"name": "patient-mapping",
"mappingRef": "https://aiccelerate.eu/fhir/mappings/project1/patient-mapping",
"sourceBinding": {
"source": {
"jsonClass": "FileSystemSource",
"path": "patients",
"fileFormat": "csv",
"contentType": "csv",
"options": {
"sep": "\\t" // tab separated file
}
@@ -885,12 +894,13 @@ Next, specify the source bindings for your mappings in the job. Here's an exampl
```json
{
"mappings" : [ {
"name": "patient-mapping-with-two-sources",
"mappingRef" : "http://patient-mapping-with-two-sources",
"sourceBinding" : {
"patient" : {
"jsonClass" : "FileSystemSource",
"path" : "patient-simple.csv",
"fileFormat" : "csv",
"contentType" : "csv",
"options" : { },
"sourceRef": "patientSource"
},
@@ -915,19 +925,20 @@ If the `genderSource` was connected to file system in the job definition, the `s
```json
{
"mappings" : [ {
"name": "patient-mapping-with-two-sources",
"mappingRef" : "http://patient-mapping-with-two-sources",
"sourceBinding" : {
"patient" : {
"jsonClass" : "FileSystemSource",
"path" : "patient-simple.csv",
"fileFormat" : "csv",
"contentType" : "csv",
"options" : { },
"sourceRef": "patientSource"
},
"patientGender" : {
"jsonClass" : "FileSystemSource",
"path" : "patient-gender-simple.csv",
"fileFormat" : "csv",
"contentType" : "csv",
"options" : { },
"sourceRef": "genderSource"
}
@@ -1001,7 +1012,8 @@ Or you can use a local file system to persist the generated FHIR resources:
{
"sinkSettings": {
"jsonClass": "FileSystemSinkSettings",
"path": "sink/project1"
"path": "sink/project1",
"contentType": "csv"
}
}
```
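The writer changes further down also branch on `sinkSettings.partitionByResourceType`. Assuming that flag is serialized under the same name (its JSON spelling is not shown in this diff), a sink configuration that partitions the output per FHIR resource type might look like this sketch:

```json
{
  "sinkSettings": {
    "jsonClass": "FileSystemSinkSettings",
    "path": "sink/project1",
    "contentType": "parquet",
    "partitionByResourceType": true
  }
}
```

With the flag set, each resource type is written to its own sub-folder under `path`, as the partitioning branch in FileSystemWriter shows.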
@@ -1133,7 +1145,8 @@ you can specify the initial time in your mapping job definition as follows:
`mapping.json`
```json
{
-...
+...,
+"name": "procedure-occurrence-mapping",
"mappingRef": "https://aiccelerate.eu/fhir/mappings/omop/procedure-occurrence-mapping",
"sourceBinding": {
"source": {
FileDataSourceReader.scala
@@ -1,7 +1,7 @@
package io.tofhir.engine.data.read

import com.typesafe.scalalogging.Logger
-import io.tofhir.engine.model.{FileSystemSource, FileSystemSourceSettings, SourceFileFormats}
+import io.tofhir.engine.model.{FileSystemSource, FileSystemSourceSettings, SourceContentTypes}
import io.tofhir.engine.util.{FileUtils, SparkUtil}
import org.apache.spark.sql.functions.{input_file_name, udf}
import org.apache.spark.sql.types.StructType
@@ -34,8 +34,8 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
* @throws NotImplementedError If the specified source format is not implemented.
*/
override def read(mappingSourceBinding: FileSystemSource, mappingJobSourceSettings:FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
-// get the format of the file
-val sourceType = mappingSourceBinding.inferFileFormat
+// get the content type for the file
+val contentType = mappingSourceBinding.contentType
// check whether it is a zip file
val isZipFile = mappingSourceBinding.path.endsWith(".zip");
// determine the final path
@@ -55,22 +55,14 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
// keeps the names of processed files by Spark
val processedFiles: mutable.HashSet[String] =mutable.HashSet.empty
//Based on source type
-val resultDf = sourceType match {
-case SourceFileFormats.CSV | SourceFileFormats.TSV | SourceFileFormats.TXT_CSV =>
-val updatedOptions = sourceType match {
-case SourceFileFormats.TSV =>
+val resultDf = contentType match {
+case SourceContentTypes.CSV | SourceContentTypes.TSV =>
+val updatedOptions = contentType match {
+case SourceContentTypes.TSV =>
// If the file format is tsv, use tab (\t) as separator by default if it is not set explicitly
mappingSourceBinding.options +
("sep" -> mappingSourceBinding.options.getOrElse("sep", "\\t"),
// use *.tsv as pathGlobFilter by default if it is not set explicitly to ignore files without tsv extension
"pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.TSV}"))
case SourceFileFormats.CSV =>
mappingSourceBinding.options +
// use *.csv as pathGlobFilter by default if it is not set explicitly to ignore files without csv extension
("pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.CSV}"))
case SourceFileFormats.TXT_CSV => mappingSourceBinding.options +
// use *.txt as pathGlobFilter by default if it is not set explicitly to ignore files without txt extension
("pathGlobFilter" -> mappingSourceBinding.options.getOrElse("pathGlobFilter", s"*.${SourceFileFormats.TXT}"))
("sep" -> mappingSourceBinding.options.getOrElse("sep", "\\t"))
case SourceContentTypes.CSV => mappingSourceBinding.options
}

//Options that we infer for csv
@@ -110,7 +102,7 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
.schema(csvSchema.orNull)
.csv(finalPath)
// assume that each line in the txt files contains a separate JSON object.
-case SourceFileFormats.JSON | SourceFileFormats.TXT_NDJSON =>
+case SourceContentTypes.JSON | SourceContentTypes.NDJSON =>
if(mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
@@ -122,7 +114,7 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
}
else
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
-case SourceFileFormats.PARQUET =>
+case SourceContentTypes.PARQUET =>
if(mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).parquet(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
FileSystemWriter.scala
@@ -1,7 +1,7 @@
package io.tofhir.engine.data.write

import com.typesafe.scalalogging.Logger
-import io.tofhir.engine.data.write.FileSystemWriter.SinkFileFormats
+import FileSystemWriter.SinkContentTypes
import io.tofhir.engine.model.{FhirMappingResult, FileSystemSinkSettings}
import org.apache.spark.sql.functions.{col, collect_list}
import org.apache.spark.sql.types.{ArrayType, StructType}
@@ -18,10 +18,10 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri
override def write(spark:SparkSession, df: Dataset[FhirMappingResult], problemsAccumulator:CollectionAccumulator[FhirMappingResult]): Unit = {
import spark.implicits._
logger.debug("Created FHIR resources will be written to the given URL:{}", sinkSettings.path)
-sinkSettings.sinkType match {
-// Handle cases where the file format is either NDJSON, PARQUET, or DELTA_LAKE,
+sinkSettings.contentType match {
+// Handle cases where the content type is either NDJSON, PARQUET, or DELTA_LAKE,
// and the output needs to be partitioned by FHIR resource type.
-case SinkFileFormats.NDJSON | SinkFileFormats.PARQUET | SinkFileFormats.DELTA_LAKE if sinkSettings.partitionByResourceType =>
+case SinkContentTypes.NDJSON | SinkContentTypes.PARQUET | SinkContentTypes.DELTA_LAKE if sinkSettings.partitionByResourceType =>
// Group the DataFrame by resourceType to aggregate all resources of the same type.
// groupedDFs will have the following structure:
// +------------+--------------------+
@@ -41,8 +41,8 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri

// Generate the DataFrame that will be written to the file system.
// If the sink type is NDJSON, the DataFrame should have a single column containing the JSON strings.
-// For other formats, the DataFrame should have multiple columns corresponding to the keys in the JSON objects.
-val resourcesDF = if(sinkSettings.sinkType.contentEquals(SinkFileFormats.NDJSON)) {
+// For other content types, the DataFrame should have multiple columns corresponding to the keys in the JSON objects.
+val resourcesDF = if(sinkSettings.contentType.contentEquals(SinkContentTypes.NDJSON)) {
// Convert the list of JSON strings into a DataFrame with a single column named "mappedResourceJson".
// The resulting DataFrame will contain one row per JSON string, where each row is a single JSON object.
// The structure of this DataFrame will be as follows:
@@ -91,7 +91,7 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri

// Define the output path based on the resourceType, ensuring that each resource type is saved in its own folder.
val outputPath = s"${sinkSettings.path}/$resourceType"
-// Write the resources to the specified path based on the chosen format.
+// Write the resources to the specified path based on the chosen content type.
val writer = getWriter(resourcesDF, sinkSettings)

// Apply partitioning if partition columns are specified
@@ -101,30 +101,30 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri
writer
}

-// Handle the specific formats
-sinkSettings.sinkType match {
-case SinkFileFormats.NDJSON => partitionedWriter.text(outputPath)
-case SinkFileFormats.PARQUET => partitionedWriter.parquet(outputPath)
-case SinkFileFormats.DELTA_LAKE => partitionedWriter.format(SinkFileFormats.DELTA_LAKE).save(outputPath)
+// Handle the specific content types
+sinkSettings.contentType match {
+case SinkContentTypes.NDJSON => partitionedWriter.text(outputPath)
+case SinkContentTypes.PARQUET => partitionedWriter.parquet(outputPath)
+case SinkContentTypes.DELTA_LAKE => partitionedWriter.format(SinkContentTypes.DELTA_LAKE).save(outputPath)
}
})
-case SinkFileFormats.NDJSON =>
+case SinkContentTypes.NDJSON =>
getWriter(df.map(_.mappedResource.get), sinkSettings).text(sinkSettings.path)
-case SinkFileFormats.PARQUET =>
+case SinkContentTypes.PARQUET =>
// Convert the DataFrame to a Dataset of JSON strings
val jsonDS = df.select("mappedResource").as[String]
// Create a DataFrame from the Dataset of JSON strings
val jsonDF = spark.read.json(jsonDS)
getWriter(jsonDF, sinkSettings).parquet(sinkSettings.path)
-case SinkFileFormats.DELTA_LAKE =>
+case SinkContentTypes.DELTA_LAKE =>
// Convert the DataFrame to a Dataset of JSON strings
val jsonDS = df.select("mappedResource").as[String]
// Create a DataFrame from the Dataset of JSON strings
val jsonDF = spark.read.json(jsonDS)
getWriter(jsonDF, sinkSettings)
-.format(SinkFileFormats.DELTA_LAKE) // Specify Delta Lake format
+.format(SinkContentTypes.DELTA_LAKE) // Specify Delta Lake content type
.save(sinkSettings.path)
-case SinkFileFormats.CSV =>
+case SinkContentTypes.CSV =>
// read the mapped resource json column and load it to a new data frame
val mappedResourceDF = spark.read.json(df.select("mappedResource").as[String])
// select the columns that are not array type or struct type
@@ -169,7 +169,7 @@ class FileSystemWriter(sinkSettings: FileSystemSinkSettings) extends BaseFhirWri
}

object FileSystemWriter {
-object SinkFileFormats {
+object SinkContentTypes {
final val NDJSON = "ndjson"
final val CSV = "csv"
final val PARQUET = "parquet"