From 9d0e46786c9f61bb4a371f6167f6e2520db8b88d Mon Sep 17 00:00:00 2001 From: dogukan10 Date: Thu, 13 Jun 2024 16:25:51 +0300 Subject: [PATCH] :recycle: refactor: Do not append the file format to given path of FileSystemSource while reading the data --- .../data/read/FileDataSourceReader.scala | 11 ++--------- .../tofhir/engine/model/FhirMappingTask.scala | 19 ++++++++++--------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/FileDataSourceReader.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/FileDataSourceReader.scala index c393350b..30b6bf09 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/FileDataSourceReader.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/data/read/FileDataSourceReader.scala @@ -9,7 +9,6 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import java.io.File import java.net.URI -import java.nio.file.Paths import java.time.LocalDateTime import scala.collection.mutable @@ -35,18 +34,12 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil override def read(mappingSource: FileSystemSource, sourceSettings:FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty,jobId: Option[String] = Option.empty): DataFrame = { // get the format of the file val sourceType = mappingSource.inferFileFormat - // append the file format to the path if not exists - // for streaming jobs, we expect a directory name, so simply use the given path - val sourcePath = - if (sourceSettings.asStream || mappingSource.path.endsWith(s".${sourceType}")) - mappingSource.path - else mappingSource.path.concat(s".${sourceType}") // determine the final path // if it is a Hadoop path (starts with "hdfs://"), construct the URI directly without adding the context path val finalPath = if (sourceSettings.dataFolderPath.startsWith("hdfs://")) { - new URI(s"${sourceSettings.dataFolderPath.stripSuffix("/")}/${sourcePath.stripPrefix("/")}").toString + new URI(s"${sourceSettings.dataFolderPath.stripSuffix("/")}/${mappingSource.path.stripPrefix("/")}").toString } else { - FileUtils.getPath(sourceSettings.dataFolderPath, sourcePath).toAbsolutePath.toString + FileUtils.getPath(sourceSettings.dataFolderPath, mappingSource.path).toAbsolutePath.toString } // validate whether the provided path is a directory when streaming is enabled in the source settings if(sourceSettings.asStream && !new File(finalPath).isDirectory){ diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala index 198fd577..02cc7d84 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala @@ -38,21 +38,22 @@ trait FhirMappingSourceContext extends Serializable { /** * Context/configuration for one of the sources of the mapping that will read the source data from the file system. * - * For batch jobs, you should either provide a file name in the "path" field with a file extension or provide the file name - * without an extension in the "path" field and indicate the extension in the "fileFormat" field. + * For batch jobs, you should either provide a file name in the "path" field with a file extension or provide the folder name + * in the "path" field and indicate the extension in the "fileFormat" field. In the latter case, it reads the files with + * the specified file format in the given folder. * - * Examples: + * Examples for Batch Jobs: * - With extension in path: - * FileSystemSource(path = "data/patients.csv") => Will read "data/patients.csv" file - * - Without extension in path (specifying file format separately): - * FileSystemSource(path = "data/patients", fileFormat = Some("csv")) => Will read "data/patients.csv" file + * FileSystemSource(path = "data/patients.csv") => Will read the "data/patients.csv" file. + * - Providing folder name and file format: + * FileSystemSource(path = "data/patients", fileFormat = Some("csv")) => Will read all "csv" files in the "data/patients" folder. * * For streaming jobs, you should provide a folder name in the "path" field and a file format in the "fileFormat" field so that - * it can read the files with specified file formats in the given folder. + * it can read the files with the specified file format in the given folder. * - * Examples: + * Examples for Streaming Jobs: * - Providing folder name and file format: - * FileSystemSource(path = "data/streaming/patients", fileFormat = Some("json")) => Will read "json" files in "data/streaming/patients" folder + * FileSystemSource(path = "data/streaming/patients", fileFormat = Some("json")) => Will read all "json" files in the "data/streaming/patients" folder. * @param path File path to the source file or folder, e.g., "patients.csv" or "patients". * @param fileFormat Format of the file (csv | json | parquet) if it cannot be inferred from the path, e.g., csv. * @param options Further options for the format (Spark Data source options for the format, e.g., for csv -> https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option).