Commit

♻️ refactor: Do not append the file format to given path of FileSystemSource while reading the data
dogukan10 committed Jun 13, 2024
1 parent ae35de6 commit 9d0e467
Showing 2 changed files with 12 additions and 18 deletions.
@@ -9,7 +9,6 @@ import org.apache.spark.sql.{DataFrame, SparkSession}

import java.io.File
import java.net.URI
-import java.nio.file.Paths
import java.time.LocalDateTime
import scala.collection.mutable

@@ -35,18 +34,12 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
  override def read(mappingSource: FileSystemSource, sourceSettings:FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty,jobId: Option[String] = Option.empty): DataFrame = {
    // get the format of the file
    val sourceType = mappingSource.inferFileFormat
-   // append the file format to the path if not exists
-   // for streaming jobs, we expect a directory name, so simply use the given path
-   val sourcePath =
-     if (sourceSettings.asStream || mappingSource.path.endsWith(s".${sourceType}"))
-       mappingSource.path
-     else mappingSource.path.concat(s".${sourceType}")
    // determine the final path
    // if it is a Hadoop path (starts with "hdfs://"), construct the URI directly without adding the context path
    val finalPath = if (sourceSettings.dataFolderPath.startsWith("hdfs://")) {
-     new URI(s"${sourceSettings.dataFolderPath.stripSuffix("/")}/${sourcePath.stripPrefix("/")}").toString
+     new URI(s"${sourceSettings.dataFolderPath.stripSuffix("/")}/${mappingSource.path.stripPrefix("/")}").toString
    } else {
-     FileUtils.getPath(sourceSettings.dataFolderPath, sourcePath).toAbsolutePath.toString
+     FileUtils.getPath(sourceSettings.dataFolderPath, mappingSource.path).toAbsolutePath.toString
    }
    // validate whether the provided path is a directory when streaming is enabled in the source settings
    if(sourceSettings.asStream && !new File(finalPath).isDirectory){
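For reference, a minimal, self-contained sketch of the path resolution after this change: the path given in the mapping source is used as-is, and the inferred file format is no longer appended to it. The object and method names and the sample paths below are hypothetical, and java.nio.file.Paths stands in for the project's FileUtils.getPath purely for illustration.

import java.net.URI
import java.nio.file.Paths

object PathResolutionSketch {
  // Sketch of the final-path logic, not the project's code
  def resolveFinalPath(dataFolderPath: String, sourcePath: String): String =
    if (dataFolderPath.startsWith("hdfs://"))
      // Hadoop location: build the URI directly, without prepending a local context path
      new URI(s"${dataFolderPath.stripSuffix("/")}/${sourcePath.stripPrefix("/")}").toString
    else
      // Local location: resolve the given path against the configured data folder
      Paths.get(dataFolderPath, sourcePath).toAbsolutePath.toString
}

// PathResolutionSketch.resolveFinalPath("hdfs://namenode:9000/tofhir-data", "patients.csv")
//   => "hdfs://namenode:9000/tofhir-data/patients.csv"
// PathResolutionSketch.resolveFinalPath("./test-data", "patients")
//   => an absolute path ending in ".../test-data/patients" (a folder, whose files are read)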
@@ -38,21 +38,22 @@ trait FhirMappingSourceContext extends Serializable {
/**
 * Context/configuration for one of the sources of the mapping that will read the source data from the file system.
 *
- * For batch jobs, you should either provide a file name in the "path" field with a file extension or provide the file name
- * without an extension in the "path" field and indicate the extension in the "fileFormat" field.
+ * For batch jobs, you should either provide a file name in the "path" field with a file extension or provide the folder name
+ * in the "path" field and indicate the extension in the "fileFormat" field. In the latter case, it reads the files with
+ * the specified file format in the given folder.
 *
- * Examples:
+ * Examples for Batch Jobs:
 * - With extension in path:
- *   FileSystemSource(path = "data/patients.csv") => Will read "data/patients.csv" file
- * - Without extension in path (specifying file format separately):
- *   FileSystemSource(path = "data/patients", fileFormat = Some("csv")) => Will read "data/patients.csv" file
+ *   FileSystemSource(path = "data/patients.csv") => Will read the "data/patients.csv" file.
+ * - Providing folder name and file format:
+ *   FileSystemSource(path = "data/patients", fileFormat = Some("csv")) => Will read all "csv" files in the "data/patients" folder.
 *
 * For streaming jobs, you should provide a folder name in the "path" field and a file format in the "fileFormat" field so that
- * it can read the files with specified file formats in the given folder.
+ * it can read the files with the specified file format in the given folder.
 *
- * Examples:
+ * Examples for Streaming Jobs:
 * - Providing folder name and file format:
- *   FileSystemSource(path = "data/streaming/patients", fileFormat = Some("json")) => Will read "json" files in "data/streaming/patients" folder
+ *   FileSystemSource(path = "data/streaming/patients", fileFormat = Some("json")) => Will read all "json" files in the "data/streaming/patients" folder.
 * @param path File path to the source file or folder, e.g., "patients.csv" or "patients".
 * @param fileFormat Format of the file (csv | json | parquet) if it cannot be inferred from the path, e.g., csv.
 * @param options Further options for the format (Spark Data source options for the format, e.g., for csv -> https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option).
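A consolidated usage sketch of the three configurations described in the updated scaladoc. The val names are illustrative only, and FileSystemSource is assumed to be in scope from the project's model classes.

// Batch job, single file: the format is inferred from the ".csv" extension
val singleCsvFile = FileSystemSource(path = "data/patients.csv")

// Batch job, folder plus explicit format: all "csv" files under "data/patients" are read
val csvFolder = FileSystemSource(path = "data/patients", fileFormat = Some("csv"))

// Streaming job: the path must be a folder and the format must be given explicitly
val streamedJsonFolder = FileSystemSource(path = "data/streaming/patients", fileFormat = Some("json"))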
